Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allow to delete volume from brokers #820

Merged
merged 15 commits into from Jun 16, 2022
Merged

Do not allow to delete volume from brokers #820

merged 15 commits into from Jun 16, 2022

Conversation

bartam1
Copy link
Contributor

@bartam1 bartam1 commented Jun 3, 2022

Q A
Bug fix? yes
New feature? no
API breaks? no
Deprecations? no
Related tickets #739
License Apache 2.0

What's in this PR?

  • There is kafkaCluster validation webhook that checks is there a broker storage removal. When there is then the validation will be failed.
  • There is another protection for this problem in this PR. When there is broker storage removal in the modified kafkaCluster CR, the koperator will not remove it in the reconcile flow from the broker "log.dirs".

Why?

Currently nothing prohibits the user to remove broker storage entry from KafkaCluster custom resource. This will trigger koperator to remove the storage from broker configuration. This should not be allowed because it can lead to data loss

Checklist

  • Implementation tested
  • Error handling code meets the guideline
  • Logging code meets the guideline

@bartam1 bartam1 requested a review from a team as a code owner June 3, 2022 10:58
Copy link
Member

@panyuenlau panyuenlau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw mounth is a typo and it should be mount

pkg/webhook/kafkacluster_validator.go Outdated Show resolved Hide resolved
pkg/webhook/kafkacluster_validator.go Outdated Show resolved Hide resolved
pkg/webhook/kafkacluster_validator.go Outdated Show resolved Hide resolved
Comment on lines 61 to 73
for _, storageConfigOld := range brokerConfigsOld.StorageConfigs {
isStorageFound := false

for _, storageConfigNew := range brokerConfigsNew.StorageConfigs {
if storageConfigOld.MountPath == storageConfigNew.MountPath {
isStorageFound = true
break
}
}
if !isStorageFound {
log.Info(fmt.Sprintf("Not allowed to remove broker storage with mountPath: %s from brokerID: %v", storageConfigOld.MountPath, brokerOld.Id))
return notAllowed(fmt.Sprintf("Removing storage from a runnng broker is not supported! (mounthPath: %s, brokerID: %v)", storageConfigOld.MountPath, brokerOld.Id), metav1.StatusReasonInvalid)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here looks a bit fishy to me: when we are at the first storageConfigOld and its MountPath doesn't match all the possible MountPath in the brokerConfigsNew.StorageConfigs, we immediately return notAllowed(...), is this intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because we have found a removed storage so this update should be rejected.
Do you think we should search all of the removed storages and writes out all of them and after reject.
What others think?

Copy link
Member

@panyuenlau panyuenlau Jun 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the intention here is to return immediately when the removed storage is found (e.g. isStorageFound is true)? But the implementation returns when isStorageFound is false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably the name here is ambiguous. If I can found the old storage in the new one then the isStorageFound will be true. But If I cant, then it will be false, and I will reject.

pkg/resources/kafka/configmap.go Outdated Show resolved Hide resolved
pkg/webhook/request.go Show resolved Hide resolved
BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
"default": {
StorageConfigs: []v1beta1.StorageConfig{
//v1beta1.StorageConfig{MountPath: "logs1"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: unnecessary comment? (unless it is to understand the case more easily)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be removed, but you can see also more easily that this one has been removed from the new Config. So this validation should be rejected

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wish to show that should we not have a comment indicating that instead? Like instead of this you could write in the comment that the removed MountPath is this.

Copy link
Member

@stoader stoader Jun 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @Kuvesz

cc @bartam1

pkg/webhook/kafkacluster_validator_test.go Outdated Show resolved Hide resolved
pkg/webhook/kafkacluster_validator.go Outdated Show resolved Hide resolved
pkg/webhook/kafkacluster_validator.go Show resolved Hide resolved
pregnor
pregnor previously approved these changes Jun 8, 2022
@bartam1 bartam1 requested review from panyuenlau and Kuvesz June 9, 2022 07:41
pregnor
pregnor previously approved these changes Jun 9, 2022
Kuvesz
Kuvesz previously approved these changes Jun 9, 2022
Copy link
Contributor

@Kuvesz Kuvesz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left some very minor comments.

BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
"default": {
StorageConfigs: []v1beta1.StorageConfig{
//v1beta1.StorageConfig{MountPath: "logs1"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wish to show that should we not have a comment indicating that instead? Like instead of this you could write in the comment that the removed MountPath is this.

pkg/webhook/request.go Show resolved Hide resolved
@bartam1 bartam1 dismissed stale reviews from Kuvesz and pregnor via cbd7bac June 9, 2022 10:10
@bartam1 bartam1 requested review from pregnor and Kuvesz June 9, 2022 11:29
Kuvesz
Kuvesz previously approved these changes Jun 9, 2022
Kuvesz
Kuvesz previously approved these changes Jun 9, 2022
pkg/webhook/request.go Outdated Show resolved Hide resolved
pkg/resources/kafka/configmap.go Outdated Show resolved Hide resolved
pkg/resources/kafka/configmap.go Outdated Show resolved Hide resolved
pregnor
pregnor previously approved these changes Jun 15, 2022
Copy link
Member

@pregnor pregnor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

switch req.Kind.Kind {
case kafkaTopic:
l.Info("AdmissionReview")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of l.Info("AdmissionReview") you could initialize the logger as

l := log.WithName("AdmissionReview").WithValues("kind", req.Kind, "namespace", req.Namespace, "name", req.Name, "uid", req.UID,
 		"operation", req.Operation, "user info", req.UserInfo)

@@ -55,8 +56,30 @@ func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.A
}
}
return s.validateKafkaTopic(&topic)
case kafkaCluster:
// when the operator modifies the resource we dont do any validation
operatorUsername := fmt.Sprintf("system:serviceaccount:%v:%v", os.Getenv("POD_NAMESPACE"), os.Getenv("SERVICE_ACCOUNT"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inject the namespace and service account instead of always reading from env var

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants