From 6d1ca64137f73ec28dc4121519d4509695b7b26f Mon Sep 17 00:00:00 2001 From: marbarta Date: Fri, 3 Jun 2022 12:36:10 +0200 Subject: [PATCH 01/13] Add forbid broker storage removal --- .../operator-deployment-with-webhook.yaml | 20 + pkg/resources/kafka/configmap.go | 75 +++- pkg/webhook/kafkacluster_validator.go | 79 ++++ pkg/webhook/kafkacluster_validator_test.go | 343 ++++++++++++++++++ pkg/webhook/request.go | 29 +- 5 files changed, 536 insertions(+), 10 deletions(-) create mode 100644 pkg/webhook/kafkacluster_validator.go create mode 100644 pkg/webhook/kafkacluster_validator_test.go diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index 75e54913f..ea6d46d11 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -51,6 +51,26 @@ webhooks: resources: - kafkatopics sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + caBundle: {{ $caCrt }} + service: + name: "{{ include "kafka-operator.fullname" . }}-operator" + namespace: {{ .Release.Namespace }} + path: /validate + failurePolicy: Fail + name: kafkaclusters.kafka.banzaicloud.io + rules: + - apiGroups: + - kafka.banzaicloud.io + apiVersions: + - v1beta1 + operations: + - UPDATE + resources: + - kafkaclusters + sideEffects: None --- apiVersion: v1 kind: Secret diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 5cf5655d5..74b257dc0 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -15,14 +15,18 @@ package kafka import ( + "context" "fmt" "sort" "strings" "emperror.dev/errors" "github.com/go-logr/logr" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiutil "github.com/banzaicloud/koperator/api/util" "github.com/banzaicloud/koperator/api/v1alpha1" @@ -34,6 +38,8 @@ import ( properties "github.com/banzaicloud/koperator/properties/pkg" ) +const logDirPropertyName = "log.dirs" + func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) *properties.Properties { @@ -106,10 +112,24 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 log.Error(err, "setting broker.id in broker configuration resulted an error") } - // Storage configuration - storageConf := generateStorageConfig(bConfig.StorageConfigs) - if storageConf != "" { - if err := config.Set("log.dirs", storageConf); err != nil { + // This logic prevents the removal of the mounthPath from the broker configmap + brokerConfigMapName := fmt.Sprintf(brokerConfigTemplate+"-%d", r.KafkaCluster.Name, id) + var brokerConfigMapOld v1.ConfigMap + err = r.Client.Get(context.Background(), client.ObjectKey{Name: brokerConfigMapName, Namespace: r.KafkaCluster.GetNamespace()}, &brokerConfigMapOld) + if err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "getting broker configmap from the Kubernetes API server resulted an error") + } + + mounthPathsOld := getMountPathsFromBrokerConfigMap(brokerConfigMapOld) + mounthPathsNew := generateStorageConfig(bConfig.StorageConfigs) + mounthPathsMerged, isMounthPathRemoved := mergeMounthPaths(mounthPathsOld, 
mounthPathsNew) + + if isMounthPathRemoved { + log.Error(errors.New("removing storage from a running broker is not supported"), "", "brokerID", id) + } + + if len(mounthPathsMerged) != 0 { + if err := config.Set("log.dirs", strings.Join(mounthPathsMerged, ",")); err != nil { log.Error(err, "setting log.dirs in broker configuration resulted an error") } } @@ -124,6 +144,36 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 return config } +// mergeMounthPaths is merges the new mounthPaths with the old. +// It returns the merged []string and a bool which true or false depend on mounthPathsNew contains or not all of the elements of the mounthPathsOld +func mergeMounthPaths(mounthPathsOld, mounthPathsNew []string) ([]string, bool) { + var mounthPathsMerged []string + mounthPathsMerged = append(mounthPathsMerged, mounthPathsOld...) + mounthPathsOldLen := len(mounthPathsOld) + // Merging the new mounthPaths with the old. If any of them is removed we can check the difference in the mounthPathsOldLen + for i := range mounthPathsNew { + found := false + for k := range mounthPathsOld { + if mounthPathsOld[k] == mounthPathsNew[i] { + found = true + mounthPathsOldLen-- + break + } + } + // if this is a new mounthPath then add it to te current + if !found { + mounthPathsMerged = append(mounthPathsMerged, mounthPathsNew[i]) + } + } + // If any of them is removed we can check the difference in the mounthPathsOldLen + isMounthPathRemoved := false + if mounthPathsOldLen > 0 { + isMounthPathRemoved = true + } + + return mounthPathsMerged, isMounthPathRemoved +} + func generateSuperUsers(users []string) (suStrings []string) { suStrings = make([]string, 0) for _, x := range users { @@ -180,12 +230,25 @@ func appendListenerConfigs(advertisedListenerConfig []string, id int32, return advertisedListenerConfig } -func generateStorageConfig(sConfig []v1beta1.StorageConfig) string { +func getMountPathsFromBrokerConfigMap(configMap v1.ConfigMap) []string { + brokerConfig := configMap.Data[kafkautils.ConfigPropertyName] + brokerConfigsLines := strings.Split(brokerConfig, "\n") + var mountPaths string + for i := range brokerConfigsLines { + keyVal := strings.Split(brokerConfigsLines[i], "=") + if len(keyVal) == 2 && keyVal[0] == logDirPropertyName { + mountPaths = keyVal[1] + } + } + return strings.Split(mountPaths, ",") +} + +func generateStorageConfig(sConfig []v1beta1.StorageConfig) []string { mountPaths := make([]string, 0, len(sConfig)) for _, storage := range sConfig { mountPaths = append(mountPaths, util.StorageConfigKafkaMountPath(storage.MountPath)) } - return strings.Join(mountPaths, ",") + return mountPaths } func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) string { diff --git a/pkg/webhook/kafkacluster_validator.go b/pkg/webhook/kafkacluster_validator.go new file mode 100644 index 000000000..752bd16a8 --- /dev/null +++ b/pkg/webhook/kafkacluster_validator.go @@ -0,0 +1,79 @@ +// Copyright © 2019 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
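To make the merge semantics above concrete, a table-style check along these lines could sit next to mergeMounthPaths (a hypothetical sketch, not part of this patch series; it assumes a _test.go file in the same kafka package and uses the identifiers as they are spelled here, since a later commit in the series renames mounth to mount):

func TestMergeMounthPaths(t *testing.T) {
	old := []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}

	// Adding a path: the merged list keeps the old entries and appends the new one,
	// and no removal is flagged.
	merged, removed := mergeMounthPaths(old, []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs3/kafka"})
	if removed || len(merged) != 3 {
		t.Errorf("expected 3 merged paths and no removal, got %v (removed=%v)", merged, removed)
	}

	// Dropping "/kafka-logs2/kafka": the merged list still contains it, so the broker
	// keeps the directory, but the removal flag is set so the caller can log the problem.
	merged, removed = mergeMounthPaths(old, []string{"/kafka-logs/kafka"})
	if !removed || len(merged) != 2 {
		t.Errorf("expected removal to be detected, got %v (removed=%v)", merged, removed)
	}
}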
+// See the License for the specific language governing permissions and +// limitations under the License. + +package webhook + +import ( + "context" + "fmt" + + admissionv1 "k8s.io/api/admission/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" +) + +func (s *webhookServer) validateKafkaCluster(kafkaClusterNew *banzaicloudv1beta1.KafkaCluster) *admissionv1.AdmissionResponse { + ctx := context.Background() + // get the Old kafkaCluster CR + kafkaClusterSpecOld := banzaicloudv1beta1.KafkaCluster{} + err := s.client.Get(ctx, client.ObjectKey{Name: kafkaClusterNew.GetName(), Namespace: kafkaClusterNew.GetNamespace()}, &kafkaClusterSpecOld) + if err != nil { + // New kafkaCluster has been added thus no need to check storage removal + if apierrors.IsNotFound(err) { + return nil + } + log.Error(err, "couldn't get KafkaCluster custom resource") + return notAllowed("API failure while retrieving KafkaCluster CR, please try again", metav1.StatusReasonServiceUnavailable) + } + + res := checkBrokerStorageRemoval(&kafkaClusterSpecOld.Spec, &kafkaClusterNew.Spec) + if res != nil { + return res + } + + // everything looks a-okay + return &admissionv1.AdmissionResponse{ + Allowed: true, + } +} + +// checkBrokerStorageRemoval checking is there any broker storage which has been removed. If yes, admission will be refused +func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaicloudv1beta1.KafkaClusterSpec) *admissionv1.AdmissionResponse { + for _, brokerOld := range kafkaClusterSpecOld.Brokers { + for _, brokerNew := range kafkaClusterSpecNew.Brokers { + if brokerOld.Id == brokerNew.Id { + brokerConfigsOld, _ := brokerOld.GetBrokerConfig(*kafkaClusterSpecOld) + brokerConfigsNew, _ := brokerNew.GetBrokerConfig(*kafkaClusterSpecNew) + for _, storageConfigOld := range brokerConfigsOld.StorageConfigs { + isStorageFound := false + + for _, storageConfigNew := range brokerConfigsNew.StorageConfigs { + if storageConfigOld.MountPath == storageConfigNew.MountPath { + isStorageFound = true + break + } + } + if !isStorageFound { + log.Info(fmt.Sprintf("Not allowed to remove broker storage with mountPath: %s from brokerID: %v", storageConfigOld.MountPath, brokerOld.Id)) + return notAllowed(fmt.Sprintf("Removing storage from a runnng broker is not supported! (mounthPath: %s, brokerID: %v)", storageConfigOld.MountPath, brokerOld.Id), metav1.StatusReasonInvalid) + } + } + } + } + } + return nil +} diff --git a/pkg/webhook/kafkacluster_validator_test.go b/pkg/webhook/kafkacluster_validator_test.go new file mode 100644 index 000000000..b8b625b52 --- /dev/null +++ b/pkg/webhook/kafkacluster_validator_test.go @@ -0,0 +1,343 @@ +// Copyright © 2019 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
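Both validation paths above funnel rejections through the package's notAllowed helper, whose body is not part of this diff. Judging from the call sites (a message plus a metav1.StatusReason) and from the Allowed and Result.Message fields read back elsewhere in the series, a plausible reconstruction is the following sketch; the real helper lives in pkg/webhook and may set additional Status fields:

// Hypothetical reconstruction of the notAllowed helper, for orientation only.
func notAllowed(message string, reason metav1.StatusReason) *admissionv1.AdmissionResponse {
	return &admissionv1.AdmissionResponse{
		Allowed: false,
		Result: &metav1.Status{
			Message: message,
			Reason:  reason,
		},
	}
}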
+ +package webhook + +import ( + "testing" + + "github.com/banzaicloud/koperator/api/v1beta1" +) + +func TestCheckBrokerStorageRemoval(t *testing.T) { + testCases := []struct { + testName string + kafkaClusterSpecNew v1beta1.KafkaClusterSpec + kafkaClusterSpecOld v1beta1.KafkaClusterSpec + isValid bool + }{ + { + testName: "1", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: true, + }, + { + testName: "2", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + "default2": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + // v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default2", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: false, + }, + { + testName: "3", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + //v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: false, + }, + { + testName: "4", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + 
v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + //v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: true, + }, + { + testName: "5", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs3"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs1"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: true, + }, + { + testName: "6", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs4"}, + v1beta1.StorageConfig{MountPath: "logs5"}, + v1beta1.StorageConfig{MountPath: "logs6"}, + }, + }, + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs4"}, + v1beta1.StorageConfig{MountPath: "logs5"}, + v1beta1.StorageConfig{MountPath: "logs6"}, + }, + }, + }, + }, + }, + isValid: true, + }, + { + testName: "7", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs4"}, + v1beta1.StorageConfig{MountPath: 
"logs5"}, + v1beta1.StorageConfig{MountPath: "logs6"}, + }, + }, + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs1"}, + v1beta1.StorageConfig{MountPath: "logs2"}, + v1beta1.StorageConfig{MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + v1beta1.Broker{ + Id: 1, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + v1beta1.StorageConfig{MountPath: "logs4"}, + v1beta1.StorageConfig{MountPath: "logs5"}, + v1beta1.StorageConfig{MountPath: "logs8"}, + }, + }, + }, + }, + }, + isValid: false, + }, + } + + for _, testCase := range testCases { + res := checkBrokerStorageRemoval(&testCase.kafkaClusterSpecOld, &testCase.kafkaClusterSpecNew) + if res != nil && testCase.isValid { + t.Errorf("Message: %s, testName: %s", res.Result.Message, testCase.testName) + } else if res == nil && !testCase.isValid { + t.Errorf("there should be storage removal, testName: %s", testCase.testName) + } + + } + +} diff --git a/pkg/webhook/request.go b/pkg/webhook/request.go index d36177335..a3f20031a 100644 --- a/pkg/webhook/request.go +++ b/pkg/webhook/request.go @@ -28,21 +28,21 @@ import ( "github.com/banzaicloud/koperator/pkg/util" "github.com/banzaicloud/koperator/api/v1alpha1" + "github.com/banzaicloud/koperator/api/v1beta1" ) var ( - kafkaTopic = reflect.TypeOf(v1alpha1.KafkaTopic{}).Name() + kafkaTopic = reflect.TypeOf(v1alpha1.KafkaTopic{}).Name() + kafkaCluster = reflect.TypeOf(v1beta1.KafkaCluster{}).Name() ) func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { req := ar.Request - l := log.WithValues("kind", req.Kind, "namespace", req.Namespace, "name", req.Name, "uid", req.UID, "operation", req.Operation, "user info", req.UserInfo) - l.Info("AdmissionReview") - switch req.Kind.Kind { case kafkaTopic: + l.Info("AdmissionReview") var topic v1alpha1.KafkaTopic if err := json.Unmarshal(req.Object.Raw, &topic); err != nil { l.Error(err, "Could not unmarshal raw object") @@ -55,8 +55,29 @@ func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.A } } return s.validateKafkaTopic(&topic) + case kafkaCluster: + // when the operator modifies the resource we dont do any validation + if req.UserInfo.Username == "system:serviceaccount:kafka:kafka-operator" { + return &admissionv1.AdmissionResponse{ + Allowed: true, + } + } + l.Info("AdmissionReview") + var kafkaCluster v1beta1.KafkaCluster + if err := json.Unmarshal(req.Object.Raw, &kafkaCluster); err != nil { + l.Error(err, "Could not unmarshal raw object") + return notAllowed(err.Error(), metav1.StatusReasonBadRequest) + } + if ok := util.ObjectManagedByClusterRegistry(kafkaCluster.GetObjectMeta()); ok { + l.Info("Skip validation as the resource is managed by Cluster Registry") + return &admissionv1.AdmissionResponse{ + Allowed: true, + } + } + return s.validateKafkaCluster(&kafkaCluster) default: + l.Info("AdmissionReview") return notAllowed(fmt.Sprintf("Unexpected resource kind: %s", req.Kind.Kind), metav1.StatusReasonBadRequest) } } From 2bca0495c66cac6540791dba7813a169eb3560b2 Mon Sep 17 00:00:00 2001 From: marbarta Date: Fri, 3 Jun 2022 12:37:41 +0200 Subject: [PATCH 02/13] Fix lint --- pkg/webhook/kafkacluster_validator_test.go | 168 ++++++++++----------- 1 file changed, 83 insertions(+), 85 deletions(-) diff --git 
a/pkg/webhook/kafkacluster_validator_test.go b/pkg/webhook/kafkacluster_validator_test.go index b8b625b52..9c9d879c3 100644 --- a/pkg/webhook/kafkacluster_validator_test.go +++ b/pkg/webhook/kafkacluster_validator_test.go @@ -31,16 +31,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { testName: "1", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -48,16 +48,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { }, kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -69,23 +69,23 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { testName: "2", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, - "default2": v1beta1.BrokerConfig{ + "default2": { StorageConfigs: []v1beta1.StorageConfig{ // v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default2", }, @@ -93,16 +93,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { }, kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -114,16 +114,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { testName: "3", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ //v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -131,16 +131,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { }, kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - 
"default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -152,16 +152,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { testName: "4", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -169,16 +169,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { }, kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ //v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -190,16 +190,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { testName: "5", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs3"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs1"}, + {MountPath: "logs3"}, + {MountPath: "logs2"}, + {MountPath: "logs1"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -207,16 +207,16 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { }, kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", }, @@ -228,23 +228,23 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { testName: "6", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", BrokerConfig: &v1beta1.BrokerConfig{ StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs4"}, - v1beta1.StorageConfig{MountPath: "logs5"}, - v1beta1.StorageConfig{MountPath: "logs6"}, + {MountPath: "logs4"}, + {MountPath: "logs5"}, + {MountPath: "logs6"}, }, }, 
}, @@ -252,23 +252,23 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { }, kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", BrokerConfig: &v1beta1.BrokerConfig{ StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs4"}, - v1beta1.StorageConfig{MountPath: "logs5"}, - v1beta1.StorageConfig{MountPath: "logs6"}, + {MountPath: "logs4"}, + {MountPath: "logs5"}, + {MountPath: "logs6"}, }, }, }, @@ -280,23 +280,23 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { testName: "7", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", BrokerConfig: &v1beta1.BrokerConfig{ StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs4"}, - v1beta1.StorageConfig{MountPath: "logs5"}, - v1beta1.StorageConfig{MountPath: "logs6"}, + {MountPath: "logs4"}, + {MountPath: "logs5"}, + {MountPath: "logs6"}, }, }, }, @@ -304,23 +304,23 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { }, kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": v1beta1.BrokerConfig{ + "default": { StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs1"}, - v1beta1.StorageConfig{MountPath: "logs2"}, - v1beta1.StorageConfig{MountPath: "logs3"}, + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, }, }, }, Brokers: []v1beta1.Broker{ - v1beta1.Broker{ + { Id: 1, BrokerConfigGroup: "default", BrokerConfig: &v1beta1.BrokerConfig{ StorageConfigs: []v1beta1.StorageConfig{ - v1beta1.StorageConfig{MountPath: "logs4"}, - v1beta1.StorageConfig{MountPath: "logs5"}, - v1beta1.StorageConfig{MountPath: "logs8"}, + {MountPath: "logs4"}, + {MountPath: "logs5"}, + {MountPath: "logs8"}, }, }, }, @@ -337,7 +337,5 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { } else if res == nil && !testCase.isValid { t.Errorf("there should be storage removal, testName: %s", testCase.testName) } - } - } From 284cd29334a6aa7a42400c5f2d9263a61c8a3b0b Mon Sep 17 00:00:00 2001 From: marbarta Date: Fri, 3 Jun 2022 12:42:43 +0200 Subject: [PATCH 03/13] Refactor 1 --- pkg/resources/kafka/configmap.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 74b257dc0..6f19a72ba 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -38,7 +38,7 @@ import ( properties "github.com/banzaicloud/koperator/properties/pkg" ) -const logDirPropertyName = "log.dirs" +const brokerLogDirPropertyName = "log.dirs" func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32, extListenerStatuses, intListenerStatuses, 
controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, @@ -120,7 +120,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 log.Error(err, "getting broker configmap from the Kubernetes API server resulted an error") } - mounthPathsOld := getMountPathsFromBrokerConfigMap(brokerConfigMapOld) + mounthPathsOld := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld) mounthPathsNew := generateStorageConfig(bConfig.StorageConfigs) mounthPathsMerged, isMounthPathRemoved := mergeMounthPaths(mounthPathsOld, mounthPathsNew) @@ -129,7 +129,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 } if len(mounthPathsMerged) != 0 { - if err := config.Set("log.dirs", strings.Join(mounthPathsMerged, ",")); err != nil { + if err := config.Set(brokerLogDirPropertyName, strings.Join(mounthPathsMerged, ",")); err != nil { log.Error(err, "setting log.dirs in broker configuration resulted an error") } } @@ -230,13 +230,13 @@ func appendListenerConfigs(advertisedListenerConfig []string, id int32, return advertisedListenerConfig } -func getMountPathsFromBrokerConfigMap(configMap v1.ConfigMap) []string { +func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) []string { brokerConfig := configMap.Data[kafkautils.ConfigPropertyName] brokerConfigsLines := strings.Split(brokerConfig, "\n") var mountPaths string for i := range brokerConfigsLines { keyVal := strings.Split(brokerConfigsLines[i], "=") - if len(keyVal) == 2 && keyVal[0] == logDirPropertyName { + if len(keyVal) == 2 && keyVal[0] == brokerLogDirPropertyName { mountPaths = keyVal[1] } } From a033fd8d60dd9e9a807dcac39408ef487fe96272 Mon Sep 17 00:00:00 2001 From: marbarta Date: Tue, 7 Jun 2022 11:34:44 +0200 Subject: [PATCH 04/13] Fix review 1 --- pkg/resources/kafka/configmap.go | 53 ++++++++++++++------------- pkg/resources/kafka/configmap_test.go | 29 +++++++++++++++ pkg/webhook/kafkacluster_validator.go | 6 +-- 3 files changed, 60 insertions(+), 28 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 6f19a72ba..34fa60528 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -112,7 +112,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 log.Error(err, "setting broker.id in broker configuration resulted an error") } - // This logic prevents the removal of the mounthPath from the broker configmap + // This logic prevents the removal of the mountPath from the broker configmap brokerConfigMapName := fmt.Sprintf(brokerConfigTemplate+"-%d", r.KafkaCluster.Name, id) var brokerConfigMapOld v1.ConfigMap err = r.Client.Get(context.Background(), client.ObjectKey{Name: brokerConfigMapName, Namespace: r.KafkaCluster.GetNamespace()}, &brokerConfigMapOld) @@ -120,16 +120,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 log.Error(err, "getting broker configmap from the Kubernetes API server resulted an error") } - mounthPathsOld := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld) - mounthPathsNew := generateStorageConfig(bConfig.StorageConfigs) - mounthPathsMerged, isMounthPathRemoved := mergeMounthPaths(mounthPathsOld, mounthPathsNew) + mountPathsOld := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld) + mountPathsNew := generateStorageConfig(bConfig.StorageConfigs) + mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew) - if isMounthPathRemoved { + if isMountPathRemoved { 
log.Error(errors.New("removing storage from a running broker is not supported"), "", "brokerID", id) } - if len(mounthPathsMerged) != 0 { - if err := config.Set(brokerLogDirPropertyName, strings.Join(mounthPathsMerged, ",")); err != nil { + if len(mountPathsMerged) != 0 { + if err := config.Set(brokerLogDirPropertyName, strings.Join(mountPathsMerged, ",")); err != nil { log.Error(err, "setting log.dirs in broker configuration resulted an error") } } @@ -144,34 +144,34 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 return config } -// mergeMounthPaths is merges the new mounthPaths with the old. -// It returns the merged []string and a bool which true or false depend on mounthPathsNew contains or not all of the elements of the mounthPathsOld -func mergeMounthPaths(mounthPathsOld, mounthPathsNew []string) ([]string, bool) { - var mounthPathsMerged []string - mounthPathsMerged = append(mounthPathsMerged, mounthPathsOld...) - mounthPathsOldLen := len(mounthPathsOld) - // Merging the new mounthPaths with the old. If any of them is removed we can check the difference in the mounthPathsOldLen - for i := range mounthPathsNew { +// mergeMountPaths is merges the new mountPaths with the old. +// It returns the merged []string and a bool which true or false depend on mountPathsNew contains or not all of the elements of the mountPathsOld +func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) { + var mountPathsMerged []string + mountPathsMerged = append(mountPathsMerged, mountPathsOld...) + mountPathsOldLen := len(mountPathsOld) + // Merging the new mountPaths with the old. If any of them is removed we can check the difference in the mountPathsOldLen + for i := range mountPathsNew { found := false - for k := range mounthPathsOld { - if mounthPathsOld[k] == mounthPathsNew[i] { + for k := range mountPathsOld { + if mountPathsOld[k] == mountPathsNew[i] { found = true - mounthPathsOldLen-- + mountPathsOldLen-- break } } - // if this is a new mounthPath then add it to te current + // if this is a new mountPath then add it to te current if !found { - mounthPathsMerged = append(mounthPathsMerged, mounthPathsNew[i]) + mountPathsMerged = append(mountPathsMerged, mountPathsNew[i]) } } - // If any of them is removed we can check the difference in the mounthPathsOldLen - isMounthPathRemoved := false - if mounthPathsOldLen > 0 { - isMounthPathRemoved = true + // If any of them is removed we can check the difference in the mountPathsOldLen + isMountPathRemoved := false + if mountPathsOldLen > 0 { + isMountPathRemoved = true } - return mounthPathsMerged, isMounthPathRemoved + return mountPathsMerged, isMountPathRemoved } func generateSuperUsers(users []string) (suStrings []string) { @@ -231,6 +231,9 @@ func appendListenerConfigs(advertisedListenerConfig []string, id int32, } func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) []string { + if configMap == nil { + return nil + } brokerConfig := configMap.Data[kafkautils.ConfigPropertyName] brokerConfigsLines := strings.Split(brokerConfig, "\n") var mountPaths string diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index 00bea33ca..049f57393 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -15,6 +15,7 @@ package kafka import ( + "reflect" "testing" "github.com/go-logr/logr" @@ -22,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/banzaicloud/koperator/pkg/util" + kafkautils 
"github.com/banzaicloud/koperator/pkg/util/kafka" properties "github.com/banzaicloud/koperator/properties/pkg" @@ -29,6 +31,33 @@ import ( "github.com/banzaicloud/koperator/pkg/resources" ) +func TestGetMountPathsFromBrokerConfigMap(t *testing.T) { + tests := []struct { + testName string + brokerConfigMap v1.ConfigMap + expectedLogDirs []string + }{ + { + testName: "1", + brokerConfigMap: v1.ConfigMap{ + Data: map[string]string{kafkautils.ConfigPropertyName: `inter.broker.listener.name=INTERNAL\nlistener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=INTERNAL://:29092,CONTROLLER://:29093 +log.dirs=/kafka-logs3/kafka,/kafka-logs/kafka,/kafka-logs2/kafka,/kafka-logs4/kafka +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter\noffsets.topic.replication.factor=2 +zookeeper.connect=zookeeper-server-client.zookeeper:2181/ +`}, + }, + expectedLogDirs: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"}, + }, + } + for _, test := range tests { + logDirs := getMountPathsFromBrokerConfigMap(&test.brokerConfigMap) + if !reflect.DeepEqual(logDirs, test.expectedLogDirs) { + t.Errorf("expected: %s, got: %s", test.expectedLogDirs, logDirs) + } + } +} + func TestGenerateBrokerConfig(t *testing.T) { //nolint funlen tests := []struct { testName string diff --git a/pkg/webhook/kafkacluster_validator.go b/pkg/webhook/kafkacluster_validator.go index 752bd16a8..8d118b6c8 100644 --- a/pkg/webhook/kafkacluster_validator.go +++ b/pkg/webhook/kafkacluster_validator.go @@ -37,7 +37,7 @@ func (s *webhookServer) validateKafkaCluster(kafkaClusterNew *banzaicloudv1beta1 return nil } log.Error(err, "couldn't get KafkaCluster custom resource") - return notAllowed("API failure while retrieving KafkaCluster CR, please try again", metav1.StatusReasonServiceUnavailable) + return notAllowed("API failure while retrieving KafkaCluster CR, please try again", metav1.StatusReasonInternalError) } res := checkBrokerStorageRemoval(&kafkaClusterSpecOld.Spec, &kafkaClusterNew.Spec) @@ -51,7 +51,7 @@ func (s *webhookServer) validateKafkaCluster(kafkaClusterNew *banzaicloudv1beta1 } } -// checkBrokerStorageRemoval checking is there any broker storage which has been removed. If yes, admission will be refused +// checkBrokerStorageRemoval checks if there is any broker storage which has been removed. If yes, admission will be rejected func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaicloudv1beta1.KafkaClusterSpec) *admissionv1.AdmissionResponse { for _, brokerOld := range kafkaClusterSpecOld.Brokers { for _, brokerNew := range kafkaClusterSpecNew.Brokers { @@ -69,7 +69,7 @@ func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaic } if !isStorageFound { log.Info(fmt.Sprintf("Not allowed to remove broker storage with mountPath: %s from brokerID: %v", storageConfigOld.MountPath, brokerOld.Id)) - return notAllowed(fmt.Sprintf("Removing storage from a runnng broker is not supported! (mounthPath: %s, brokerID: %v)", storageConfigOld.MountPath, brokerOld.Id), metav1.StatusReasonInvalid) + return notAllowed(fmt.Sprintf("Removing storage from a runnng broker is not supported! 
(mountPath: %s, brokerID: %v)", storageConfigOld.MountPath, brokerOld.Id), metav1.StatusReasonInvalid) } } } From 82f858070ab8e758f84274567a4cfb12877b740d Mon Sep 17 00:00:00 2001 From: marbarta Date: Tue, 7 Jun 2022 12:05:47 +0200 Subject: [PATCH 05/13] Add benchmark tests --- .../kafkaclusterbench_validator_test.go | 328 ++++++++++++++++++ 1 file changed, 328 insertions(+) create mode 100644 pkg/webhook/kafkaclusterbench_validator_test.go diff --git a/pkg/webhook/kafkaclusterbench_validator_test.go b/pkg/webhook/kafkaclusterbench_validator_test.go new file mode 100644 index 000000000..0f5af9330 --- /dev/null +++ b/pkg/webhook/kafkaclusterbench_validator_test.go @@ -0,0 +1,328 @@ +// Copyright © 2019 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webhook + +import ( + "fmt" + "testing" + + "github.com/banzaicloud/k8s-objectmatcher/patch" + "github.com/banzaicloud/koperator/api/v1beta1" + banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" + admissionv1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + json "github.com/json-iterator/go" +) + +func CalculatePatch(currentObject, modifiedObject runtimeClient.Object, scheme *runtime.Scheme, opts ...patch.CalculateOption) (*patch.PatchResult, error) { + + // // when we ignore fields when comparing old and new objects we should + // // ignore resourceVersion changes as well since that may be caused by changes to the ignored field + // opts = append(opts, ignoreResourceVersion()) + + // // ignore API server maintained 'managedFields' field as we are not interested in changes made to it + // opts = append(opts, ignoreManagedFields()) + var err error + + currentObject, err = convertTypedResourceToUnstructured(currentObject, scheme) + if err != nil { + return nil, err + } + + modifiedObject, err = convertTypedResourceToUnstructured(modifiedObject, scheme) + if err != nil { + return nil, err + } + + return patch.DefaultPatchMaker.Calculate(currentObject, modifiedObject) +} + +func convertTypedResourceToUnstructured(srcObj runtimeClient.Object, scheme *runtime.Scheme) (runtimeClient.Object, error) { + // runtime.DefaultUnstructuredConverter.ToUnstructured converts PeerAuthentication incorrectly + // thus we need to convert srcObj to map[string]interface{} first and than create Unstructured resource from it + src, err := json.Marshal(srcObj) + if err != nil { + return nil, err + } + + content := map[string]interface{}{} + err = json.Unmarshal(src, &content) + if err != nil { + return nil, err + } + + dst := &unstructured.Unstructured{Object: content} + gvk, err := apiutil.GVKForObject(srcObj, scheme) + if err != nil { + return nil, err + } + + dst.SetKind(gvk.Kind) + dst.SetAPIVersion(gvk.GroupVersion().String()) + + return dst, nil +} +func BenchmarkCheckBrokerStorageRemoval(t 
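For orientation, CalculatePatch above wraps the k8s-objectmatcher patch calculation that the BenchmarkCalculate case below exercises. A hedged usage sketch, assuming the PatchResult.IsEmpty and PatchResult.Patch API of github.com/banzaicloud/k8s-objectmatcher (a dependency this repository already uses) and hypothetical oldCluster, newCluster, and scheme variables:

res, err := CalculatePatch(oldCluster, newCluster, scheme)
if err != nil {
	return err // conversion to unstructured or patch calculation failed
}
if !res.IsEmpty() {
	// the two KafkaCluster objects differ; res.Patch holds the calculated patch
	fmt.Printf("detected spec change: %s\n", res.Patch)
}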
*testing.B) { + testCases := []struct { + testName string + kafkaClusterSpecNew v1beta1.KafkaClusterSpec + kafkaClusterSpecOld v1beta1.KafkaClusterSpec + isValid bool + }{ + { + testName: "1", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: true, + }, + } + for k := 2; k < 1000; k++ { + testCases[0].kafkaClusterSpecOld.Brokers = append(testCases[0].kafkaClusterSpecOld.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"}) + testCases[0].kafkaClusterSpecNew.Brokers = append(testCases[0].kafkaClusterSpecNew.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"}) + } + for i := 0; i < t.N; i++ { + _ = checkBrokerStorageRemoval(&testCases[0].kafkaClusterSpecOld, &testCases[0].kafkaClusterSpecNew) + } + +} + +func checkBrokerStorageRemovalMap(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaicloudv1beta1.KafkaClusterSpec) *admissionv1.AdmissionResponse { + idToBroker := make(map[int32]banzaicloudv1beta1.Broker) + + for _, broker := range kafkaClusterSpecNew.Brokers { + idToBroker[broker.Id] = broker + } + + for _, brokerOld := range kafkaClusterSpecOld.Brokers { + if brokerNew, ok := idToBroker[brokerOld.Id]; ok { + brokerConfigsOld, _ := brokerOld.GetBrokerConfig(*kafkaClusterSpecOld) + brokerConfigsNew, _ := brokerNew.GetBrokerConfig(*kafkaClusterSpecNew) + for _, storageConfigOld := range brokerConfigsOld.StorageConfigs { + isStorageFound := false + + for _, storageConfigNew := range brokerConfigsNew.StorageConfigs { + if storageConfigOld.MountPath == storageConfigNew.MountPath { + isStorageFound = true + break + } + } + if !isStorageFound { + log.Info(fmt.Sprintf("Not allowed to remove broker storage with mountPath: %s from brokerID: %v", storageConfigOld.MountPath, brokerOld.Id)) + return notAllowed(fmt.Sprintf("Removing storage from a runnng broker is not supported! 
(mountPath: %s, brokerID: %v)", storageConfigOld.MountPath, brokerOld.Id), metav1.StatusReasonInvalid) + } + } + } + } + return nil +} + +func BenchmarkCheckBrokerStorageRemovalMap(t *testing.B) { + testCases := []struct { + testName string + kafkaClusterSpecNew v1beta1.KafkaClusterSpec + kafkaClusterSpecOld v1beta1.KafkaClusterSpec + isValid bool + }{ + { + testName: "1", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: true, + }, + } + for k := 2; k < 1000; k++ { + testCases[0].kafkaClusterSpecOld.Brokers = append(testCases[0].kafkaClusterSpecOld.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"}) + testCases[0].kafkaClusterSpecNew.Brokers = append(testCases[0].kafkaClusterSpecNew.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"}) + } + for i := 0; i < t.N; i++ { + _ = checkBrokerStorageRemovalMap(&testCases[0].kafkaClusterSpecOld, &testCases[0].kafkaClusterSpecNew) + } + +} + +func BenchmarkCalculate(b *testing.B) { + testCases2 := []struct { + testName string + kafkaClusterNew v1beta1.KafkaCluster + kafkaClusterOld v1beta1.KafkaCluster + isValid bool + }{ + { + testName: "1", + kafkaClusterNew: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster", Namespace: "test-namespace"}, + Spec: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + { + Id: 2, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs4"}, + {MountPath: "logs5"}, + {MountPath: "logs6"}, + }, + }, + }, + { + Id: 3, + BrokerConfigGroup: "default", + }, + }, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{}, + CruiseControlTopicStatus: "", + State: "", + RollingUpgrade: v1beta1.RollingUpgradeStatus{}, + AlertCount: 0, + ListenerStatuses: v1beta1.ListenerStatuses{ + InternalListeners: map[string]v1beta1.ListenerStatusList{}, + ExternalListeners: map[string]v1beta1.ListenerStatusList{}, + }, + }, + }, + kafkaClusterOld: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster", Namespace: "test-namespace"}, + Spec: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs3"}, + {MountPath: "logs2"}, + {MountPath: "logs1"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + { + Id: 2, + BrokerConfigGroup: "default", + }, + { + Id: 3, + BrokerConfigGroup: "default", + }, + }, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{}, + CruiseControlTopicStatus: "", + State: "", + 
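A note on what is being measured here: checkBrokerStorageRemoval matches old and new brokers with nested loops, roughly O(old*new) in broker count, while the checkBrokerStorageRemovalMap variant above indexes the new brokers by ID first, bringing the comparison down to roughly O(old+new). The 1000-broker setup was presumably chosen to make that difference visible, and BenchmarkCalculate (started above) measures the cost of a full object diff for comparison. The standard Go tooling runs all three, e.g.:

go test ./pkg/webhook -run '^$' -bench 'BenchmarkC' -benchmem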
RollingUpgrade: v1beta1.RollingUpgradeStatus{}, + AlertCount: 0, + ListenerStatuses: v1beta1.ListenerStatuses{ + InternalListeners: map[string]v1beta1.ListenerStatusList{}, + ExternalListeners: map[string]v1beta1.ListenerStatusList{}, + }, + }, + }, + isValid: true, + }, + } + scheme := runtime.NewScheme() + v1beta1.AddToScheme(scheme) + for i := 0; i < b.N; i++ { + _, _ = CalculatePatch(&testCases2[0].kafkaClusterOld, &testCases2[0].kafkaClusterNew, scheme) + + } + +} From c8e0681bc211181b27740a05e80998f163f60e71 Mon Sep 17 00:00:00 2001 From: marbarta Date: Wed, 8 Jun 2022 12:29:48 +0200 Subject: [PATCH 06/13] Refactor 1 --- pkg/webhook/kafkacluster_validator.go | 14 +-- pkg/webhook/kafkacluster_validator_test.go | 110 +++++++++++++++++++-- 2 files changed, 105 insertions(+), 19 deletions(-) diff --git a/pkg/webhook/kafkacluster_validator.go b/pkg/webhook/kafkacluster_validator.go index 8d118b6c8..cd4dc528d 100644 --- a/pkg/webhook/kafkacluster_validator.go +++ b/pkg/webhook/kafkacluster_validator.go @@ -40,15 +40,7 @@ func (s *webhookServer) validateKafkaCluster(kafkaClusterNew *banzaicloudv1beta1 return notAllowed("API failure while retrieving KafkaCluster CR, please try again", metav1.StatusReasonInternalError) } - res := checkBrokerStorageRemoval(&kafkaClusterSpecOld.Spec, &kafkaClusterNew.Spec) - if res != nil { - return res - } - - // everything looks a-okay - return &admissionv1.AdmissionResponse{ - Allowed: true, - } + return checkBrokerStorageRemoval(&kafkaClusterSpecOld.Spec, &kafkaClusterNew.Spec) } // checkBrokerStorageRemoval checks if there is any broker storage which has been removed. If yes, admission will be rejected @@ -75,5 +67,7 @@ func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaic } } } - return nil + return &admissionv1.AdmissionResponse{ + Allowed: true, + } } diff --git a/pkg/webhook/kafkacluster_validator_test.go b/pkg/webhook/kafkacluster_validator_test.go index 9c9d879c3..e1562d32a 100644 --- a/pkg/webhook/kafkacluster_validator_test.go +++ b/pkg/webhook/kafkacluster_validator_test.go @@ -20,6 +20,7 @@ import ( "github.com/banzaicloud/koperator/api/v1beta1" ) +//nolint: funlen func TestCheckBrokerStorageRemoval(t *testing.T) { testCases := []struct { testName string @@ -28,7 +29,7 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { isValid bool }{ { - testName: "1", + testName: "there is no storage remove", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ "default": { @@ -66,7 +67,98 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { isValid: true, }, { - testName: "2", + testName: "there is no storage remove but there is broker remove", + kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + { + Id: 2, + BrokerConfigGroup: "default", + }, + }, + }, + isValid: true, + }, + { + testName: "when there is storage remove but there is broker remove also", + kafkaClusterSpecNew: 
v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + }, + }, + kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs1"}, + {MountPath: "logs2"}, + {MountPath: "logs3"}, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 1, + BrokerConfigGroup: "default", + }, + { + Id: 2, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{ + {MountPath: "logs4"}, + {MountPath: "logs5"}, + {MountPath: "logs6"}, + }, + }, + }, + }, + }, + isValid: true, + }, + { + testName: "when there is storage remove from another brokerConfigBroup", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ "default": { @@ -111,7 +203,7 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { isValid: false, }, { - testName: "3", + testName: "when there is storage remove", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ "default": { @@ -149,7 +241,7 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { isValid: false, }, { - testName: "4", + testName: "when added a new one", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ "default": { @@ -187,7 +279,7 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { isValid: true, }, { - testName: "5", + testName: "when only sequence has changed", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ "default": { @@ -225,7 +317,7 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { isValid: true, }, { - testName: "6", + testName: "when there is perBroker storageconfigs and there is no storage remove", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ "default": { @@ -277,7 +369,7 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { isValid: true, }, { - testName: "7", + testName: "when there is perBroker config and added new and removed old", kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ "default": { @@ -332,9 +424,9 @@ func TestCheckBrokerStorageRemoval(t *testing.T) { for _, testCase := range testCases { res := checkBrokerStorageRemoval(&testCase.kafkaClusterSpecOld, &testCase.kafkaClusterSpecNew) - if res != nil && testCase.isValid { + if !res.Allowed && testCase.isValid { t.Errorf("Message: %s, testName: %s", res.Result.Message, testCase.testName) - } else if res == nil && !testCase.isValid { + } else if res.Allowed && !testCase.isValid { t.Errorf("there should be storage removal, testName: %s", testCase.testName) } } From 3c935f3219cd49cf12f3d8294b6810946ec75219 Mon Sep 17 00:00:00 2001 From: marbarta Date: Thu, 9 Jun 2022 09:46:27 +0200 Subject: [PATCH 07/13] Remove benchmark --- .../kafkaclusterbench_validator_test.go | 328 ------------------ 1 file changed, 328 deletions(-) delete mode 100644 pkg/webhook/kafkaclusterbench_validator_test.go diff --git a/pkg/webhook/kafkaclusterbench_validator_test.go b/pkg/webhook/kafkaclusterbench_validator_test.go deleted file mode 100644 index 0f5af9330..000000000 --- 
From 3c935f3219cd49cf12f3d8294b6810946ec75219 Mon Sep 17 00:00:00 2001
From: marbarta
Date: Thu, 9 Jun 2022 09:46:27 +0200
Subject: [PATCH 07/13] Remove benchmark

---
 .../kafkaclusterbench_validator_test.go       | 328 ------------------
 1 file changed, 328 deletions(-)
 delete mode 100644 pkg/webhook/kafkaclusterbench_validator_test.go

diff --git a/pkg/webhook/kafkaclusterbench_validator_test.go b/pkg/webhook/kafkaclusterbench_validator_test.go
deleted file mode 100644
index 0f5af9330..000000000
--- a/pkg/webhook/kafkaclusterbench_validator_test.go
+++ /dev/null
@@ -1,328 +0,0 @@
-// Copyright © 2019 Banzai Cloud
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package webhook
-
-import (
-	"fmt"
-	"testing"
-
-	"github.com/banzaicloud/k8s-objectmatcher/patch"
-	"github.com/banzaicloud/koperator/api/v1beta1"
-	banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
-	admissionv1 "k8s.io/api/admission/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime"
-	runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
-
-	json "github.com/json-iterator/go"
-)
-
-func CalculatePatch(currentObject, modifiedObject runtimeClient.Object, scheme *runtime.Scheme, opts ...patch.CalculateOption) (*patch.PatchResult, error) {
-
-	// // when we ignore fields when comparing old and new objects we should
-	// // ignore resourceVersion changes as well since that may be caused by changes to the ignored field
-	// opts = append(opts, ignoreResourceVersion())
-
-	// // ignore API server maintained 'managedFields' field as we are not interested in changes made to it
-	// opts = append(opts, ignoreManagedFields())
-	var err error
-
-	currentObject, err = convertTypedResourceToUnstructured(currentObject, scheme)
-	if err != nil {
-		return nil, err
-	}
-
-	modifiedObject, err = convertTypedResourceToUnstructured(modifiedObject, scheme)
-	if err != nil {
-		return nil, err
-	}
-
-	return patch.DefaultPatchMaker.Calculate(currentObject, modifiedObject)
-}
-
-func convertTypedResourceToUnstructured(srcObj runtimeClient.Object, scheme *runtime.Scheme) (runtimeClient.Object, error) {
-	// runtime.DefaultUnstructuredConverter.ToUnstructured converts PeerAuthentication incorrectly
-	// thus we need to convert srcObj to map[string]interface{} first and than create Unstructured resource from it
-	src, err := json.Marshal(srcObj)
-	if err != nil {
-		return nil, err
-	}
-
-	content := map[string]interface{}{}
-	err = json.Unmarshal(src, &content)
-	if err != nil {
-		return nil, err
-	}
-
-	dst := &unstructured.Unstructured{Object: content}
-	gvk, err := apiutil.GVKForObject(srcObj, scheme)
-	if err != nil {
-		return nil, err
-	}
-
-	dst.SetKind(gvk.Kind)
-	dst.SetAPIVersion(gvk.GroupVersion().String())
-
-	return dst, nil
-}
-func BenchmarkCheckBrokerStorageRemoval(t *testing.B) {
-	testCases := []struct {
-		testName            string
-		kafkaClusterSpecNew v1beta1.KafkaClusterSpec
-		kafkaClusterSpecOld v1beta1.KafkaClusterSpec
-		isValid             bool
-	}{
-		{
-			testName: "1",
-			kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{
-				BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
-					"default": {
-						StorageConfigs: []v1beta1.StorageConfig{
-							{MountPath: "logs1"},
-							{MountPath: "logs2"},
-							{MountPath: "logs3"},
-						},
-					},
-				},
-				Brokers: []v1beta1.Broker{
-					{
-						Id:                1,
-						BrokerConfigGroup: "default",
-					},
-				},
-			},
-			kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{
-				BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
-					"default": {
-						StorageConfigs: []v1beta1.StorageConfig{
-							{MountPath: "logs1"},
-							{MountPath: "logs2"},
-							{MountPath: "logs3"},
-						},
-					},
-				},
-				Brokers: []v1beta1.Broker{
-					{
-						Id:                1,
-						BrokerConfigGroup: "default",
-					},
-				},
-			},
-			isValid: true,
-		},
-	}
-	for k := 2; k < 1000; k++ {
-		testCases[0].kafkaClusterSpecOld.Brokers = append(testCases[0].kafkaClusterSpecOld.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"})
-		testCases[0].kafkaClusterSpecNew.Brokers = append(testCases[0].kafkaClusterSpecNew.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"})
-	}
-	for i := 0; i < t.N; i++ {
-		_ = checkBrokerStorageRemoval(&testCases[0].kafkaClusterSpecOld, &testCases[0].kafkaClusterSpecNew)
-	}
-
-}
-
-func checkBrokerStorageRemovalMap(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaicloudv1beta1.KafkaClusterSpec) *admissionv1.AdmissionResponse {
-	idToBroker := make(map[int32]banzaicloudv1beta1.Broker)
-
-	for _, broker := range kafkaClusterSpecNew.Brokers {
-		idToBroker[broker.Id] = broker
-	}
-
-	for _, brokerOld := range kafkaClusterSpecOld.Brokers {
-		if brokerNew, ok := idToBroker[brokerOld.Id]; ok {
-			brokerConfigsOld, _ := brokerOld.GetBrokerConfig(*kafkaClusterSpecOld)
-			brokerConfigsNew, _ := brokerNew.GetBrokerConfig(*kafkaClusterSpecNew)
-			for _, storageConfigOld := range brokerConfigsOld.StorageConfigs {
-				isStorageFound := false
-
-				for _, storageConfigNew := range brokerConfigsNew.StorageConfigs {
-					if storageConfigOld.MountPath == storageConfigNew.MountPath {
-						isStorageFound = true
-						break
-					}
-				}
-				if !isStorageFound {
-					log.Info(fmt.Sprintf("Not allowed to remove broker storage with mountPath: %s from brokerID: %v", storageConfigOld.MountPath, brokerOld.Id))
-					return notAllowed(fmt.Sprintf("Removing storage from a runnng broker is not supported! (mountPath: %s, brokerID: %v)", storageConfigOld.MountPath, brokerOld.Id), metav1.StatusReasonInvalid)
-				}
-			}
-		}
-	}
-	return nil
-}
-
-func BenchmarkCheckBrokerStorageRemovalMap(t *testing.B) {
-	testCases := []struct {
-		testName            string
-		kafkaClusterSpecNew v1beta1.KafkaClusterSpec
-		kafkaClusterSpecOld v1beta1.KafkaClusterSpec
-		isValid             bool
-	}{
-		{
-			testName: "1",
-			kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{
-				BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
-					"default": {
-						StorageConfigs: []v1beta1.StorageConfig{
-							{MountPath: "logs1"},
-							{MountPath: "logs2"},
-							{MountPath: "logs3"},
-						},
-					},
-				},
-				Brokers: []v1beta1.Broker{
-					{
-						Id:                1,
-						BrokerConfigGroup: "default",
-					},
-				},
-			},
-			kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{
-				BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
-					"default": {
-						StorageConfigs: []v1beta1.StorageConfig{
-							{MountPath: "logs1"},
-							{MountPath: "logs2"},
-							{MountPath: "logs3"},
-						},
-					},
-				},
-				Brokers: []v1beta1.Broker{
-					{
-						Id:                1,
-						BrokerConfigGroup: "default",
-					},
-				},
-			},
-			isValid: true,
-		},
-	}
-	for k := 2; k < 1000; k++ {
-		testCases[0].kafkaClusterSpecOld.Brokers = append(testCases[0].kafkaClusterSpecOld.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"})
-		testCases[0].kafkaClusterSpecNew.Brokers = append(testCases[0].kafkaClusterSpecNew.Brokers, v1beta1.Broker{Id: int32(k), BrokerConfigGroup: "default"})
-	}
-	for i := 0; i < t.N; i++ {
-		_ = checkBrokerStorageRemovalMap(&testCases[0].kafkaClusterSpecOld, &testCases[0].kafkaClusterSpecNew)
-	}
-
-}
-
-func BenchmarkCalculate(b *testing.B) {
-	testCases2 := []struct {
-		testName        string
-		kafkaClusterNew v1beta1.KafkaCluster
-		kafkaClusterOld v1beta1.KafkaCluster
-		isValid         bool
-	}{
-		{
-			testName: "1",
-			kafkaClusterNew: v1beta1.KafkaCluster{
-				ObjectMeta: metav1.ObjectMeta{Name: "test-cluster", Namespace: "test-namespace"},
-				Spec: v1beta1.KafkaClusterSpec{
-					BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
-						"default": {
-							StorageConfigs: []v1beta1.StorageConfig{
-								{MountPath: "logs1"},
-								{MountPath: "logs2"},
-								{MountPath: "logs3"},
-							},
-						},
-					},
-					Brokers: []v1beta1.Broker{
-						{
-							Id:                1,
-							BrokerConfigGroup: "default",
-						},
-						{
-							Id:                2,
-							BrokerConfigGroup: "default",
-							BrokerConfig: &v1beta1.BrokerConfig{
-								StorageConfigs: []v1beta1.StorageConfig{
-									{MountPath: "logs4"},
-									{MountPath: "logs5"},
-									{MountPath: "logs6"},
-								},
-							},
-						},
-						{
-							Id:                3,
-							BrokerConfigGroup: "default",
-						},
-					},
-				},
-				Status: v1beta1.KafkaClusterStatus{
-					BrokersState:             map[string]v1beta1.BrokerState{},
-					CruiseControlTopicStatus: "",
-					State:                    "",
-					RollingUpgrade:           v1beta1.RollingUpgradeStatus{},
-					AlertCount:               0,
-					ListenerStatuses: v1beta1.ListenerStatuses{
-						InternalListeners: map[string]v1beta1.ListenerStatusList{},
-						ExternalListeners: map[string]v1beta1.ListenerStatusList{},
-					},
-				},
-			},
-			kafkaClusterOld: v1beta1.KafkaCluster{
-				ObjectMeta: metav1.ObjectMeta{Name: "test-cluster", Namespace: "test-namespace"},
-				Spec: v1beta1.KafkaClusterSpec{
-					BrokerConfigGroups: map[string]v1beta1.BrokerConfig{
-						"default": {
-							StorageConfigs: []v1beta1.StorageConfig{
-								{MountPath: "logs3"},
-								{MountPath: "logs2"},
-								{MountPath: "logs1"},
-							},
-						},
-					},
-					Brokers: []v1beta1.Broker{
-						{
-							Id:                1,
-							BrokerConfigGroup: "default",
-						},
-						{
-							Id:                2,
-							BrokerConfigGroup: "default",
-						},
-						{
-							Id:                3,
-							BrokerConfigGroup: "default",
-						},
-					},
-				},
-				Status: v1beta1.KafkaClusterStatus{
-					BrokersState:             map[string]v1beta1.BrokerState{},
-					CruiseControlTopicStatus: "",
-					State:                    "",
-					RollingUpgrade:           v1beta1.RollingUpgradeStatus{},
-					AlertCount:               0,
-					ListenerStatuses: v1beta1.ListenerStatuses{
-						InternalListeners: map[string]v1beta1.ListenerStatusList{},
-						ExternalListeners: map[string]v1beta1.ListenerStatusList{},
-					},
-				},
-			},
-			isValid: true,
-		},
-	}
-	scheme := runtime.NewScheme()
-	v1beta1.AddToScheme(scheme)
-	for i := 0; i < b.N; i++ {
-		_, _ = CalculatePatch(&testCases2[0].kafkaClusterOld, &testCases2[0].kafkaClusterNew, scheme)
-
-	}
-
-}
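The file deleted above compared the production nested-loop check against a map-based variant (checkBrokerStorageRemovalMap) over a spec inflated to roughly a thousand brokers. For readers who want to reproduce that kind of measurement, a minimal self-contained skeleton follows; it is not the removed file, and BenchmarkMountPathLookup is a hypothetical name used only for illustration.

	package webhook

	import (
		"fmt"
		"testing"
	)

	// Times the core of the map variant: membership checks of broker mount
	// paths against a precomputed set.
	func BenchmarkMountPathLookup(b *testing.B) {
		paths := make([]string, 0, 1000)
		for i := 0; i < 1000; i++ {
			paths = append(paths, fmt.Sprintf("/kafka-logs%d/kafka", i))
		}
		set := make(map[string]struct{}, len(paths))
		for _, p := range paths {
			set[p] = struct{}{}
		}
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			for _, p := range paths {
				// blank assignment: a bare map index is not a valid Go statement
				_, _ = set[p]
			}
		}
	}

Such a benchmark would be run with: go test -bench=MountPathLookup -benchmem ./pkg/webhook/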
From cbd7bac4e5a499a7713b1a90fcde58895db0629b Mon Sep 17 00:00:00 2001
From: marbarta
Date: Thu, 9 Jun 2022 12:10:49 +0200
Subject: [PATCH 08/13] Fix empty mountPath

---
 pkg/resources/kafka/configmap.go      |  9 +++-
 pkg/resources/kafka/configmap_test.go | 70 +++++++++++++++++++++++++--
 2 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go
index 34fa60528..61419e473 100644
--- a/pkg/resources/kafka/configmap.go
+++ b/pkg/resources/kafka/configmap.go
@@ -125,7 +125,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
 	mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)
 
 	if isMountPathRemoved {
-		log.Error(errors.New("removing storage from a running broker is not supported"), "", "brokerID", id)
+		log.Error(errors.New("removing storage from a running broker is not supported"), "", "brokerID", id, "old mountPaths", mountPathsOld, "new mountPaths", mountPathsNew)
 	}
 
 	if len(mountPathsMerged) != 0 {
@@ -160,7 +160,7 @@ func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) {
 			break
 		}
 	}
-	// if this is a new mountPath then add it to te current
+	// if this is a new mountPath then add it to the current
 	if !found {
 		mountPathsMerged = append(mountPathsMerged, mountPathsNew[i])
 	}
@@ -243,6 +243,11 @@ func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) []string {
 			mountPaths = keyVal[1]
 		}
 	}
+
+	if mountPaths == "" {
+		return nil
+	}
+
 	return strings.Split(mountPaths, ",")
 }
 
diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go
index 049f57393..6d7b6e8e7 100644
--- a/pkg/resources/kafka/configmap_test.go
+++ b/pkg/resources/kafka/configmap_test.go
@@ -38,7 +38,7 @@ func TestGetMountPathsFromBrokerConfigMap(t *testing.T) {
 		expectedLogDirs []string
 	}{
 		{
-			testName: "1",
+			testName: "simple case",
 			brokerConfigMap: v1.ConfigMap{
 				Data: map[string]string{kafkautils.ConfigPropertyName: `inter.broker.listener.name=INTERNAL\nlistener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
 listeners=INTERNAL://:29092,CONTROLLER://:29093
 log.dirs=/kafka-logs3/kafka,/kafka-logs/kafka,/kafka-logs2/kafka,/kafka-logs4/kafka
 zookeeper.connect=zookeeper-server-client.zookeeper:2181/
 `},
 			},
 			expectedLogDirs: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
 		},
+		{
+			testName: "no old configs",
+			brokerConfigMap: v1.ConfigMap{
+				Data: map[string]string{},
+			},
+			expectedLogDirs: []string{},
+		},
 	}
 	for _, test := range tests {
 		logDirs := getMountPathsFromBrokerConfigMap(&test.brokerConfigMap)
-		if !reflect.DeepEqual(logDirs, test.expectedLogDirs) {
-			t.Errorf("expected: %s, got: %s", test.expectedLogDirs, logDirs)
+		if len(logDirs) != 0 && len(test.expectedLogDirs) != 0 {
+			if !reflect.DeepEqual(logDirs, test.expectedLogDirs) {
+				t.Errorf("expected: %s, got: %s", test.expectedLogDirs, logDirs)
+			}
 		}
 	}
 }
+
+func TestMergeMountPaths(t *testing.T) {
+	tests := []struct {
+		testName                string
+		mountPathNew            []string
+		mountPathOld            []string
+		expectedMergedMountPath []string
+		expectedRemoved         bool
+	}{
+		{
+			testName:                "no old mountPath",
+			mountPathNew:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			mountPathOld:            []string{},
+			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedRemoved:         false,
+		},
+		{
+			testName:                "same",
+			mountPathNew:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedRemoved:         false,
+		},
+		{
+			testName:                "changed order",
+			mountPathNew:            []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedRemoved:         false,
+		},
+		{
+			testName:                "removed one",
+			mountPathNew:            []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedRemoved:         true,
+		},
+		{
+			testName:                "removed all",
+			mountPathNew:            []string{},
+			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedRemoved:         true,
+		},
+	}
+	for _, test := range tests {
+		mergedMountPaths, isRemoved := mergeMountPaths(test.mountPathOld, test.mountPathNew)
+		if !reflect.DeepEqual(mergedMountPaths, test.expectedMergedMountPath) {
+			t.Errorf("expected: %s, got: %s", test.expectedMergedMountPath, mergedMountPaths)
+		}
+		if isRemoved != test.expectedRemoved {
+			t.Errorf("expectedRemoved: %v, got: %v", test.expectedRemoved, isRemoved)
+		}
+	}
+}
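To make the merge semantics that TestMergeMountPaths pins down easier to see at a glance: at this point in the series the merged list keeps every old mount path in its original position, appends genuinely new paths, and trips a flag when an old path has vanished from the new spec (a later commit in the series reverses the ordering, as its updated test expectations show). The following is a simplified sketch under those assumptions, not the operator's mergeMountPaths:

	package main

	import "fmt"

	func mergePaths(oldPaths, newPaths []string) ([]string, bool) {
		merged := append([]string{}, oldPaths...)
		inOld := make(map[string]struct{}, len(oldPaths))
		for _, p := range oldPaths {
			inOld[p] = struct{}{}
		}
		inNew := make(map[string]struct{}, len(newPaths))
		for _, p := range newPaths {
			inNew[p] = struct{}{}
			if _, ok := inOld[p]; !ok {
				merged = append(merged, p) // genuinely new mount path
			}
		}
		removed := false
		for _, p := range oldPaths {
			if _, ok := inNew[p]; !ok {
				removed = true // an old mount path disappeared from the new spec
				break
			}
		}
		return merged, removed
	}

	func main() {
		merged, removed := mergePaths(
			[]string{"/kafka-logs3/kafka", "/kafka-logs/kafka"},
			[]string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
		)
		fmt.Println(merged, removed)
		// prints: [/kafka-logs3/kafka /kafka-logs/kafka /kafka-logs2/kafka] true
	}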
From fc1da37d5bf695ff3a814a6c0d3749338130eca4 Mon Sep 17 00:00:00 2001
From: marbarta
Date: Thu, 9 Jun 2022 14:44:26 +0200
Subject: [PATCH 09/13] Fix test

---
 pkg/resources/kafka/configmap_test.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go
index 6d7b6e8e7..de8745869 100644
--- a/pkg/resources/kafka/configmap_test.go
+++ b/pkg/resources/kafka/configmap_test.go
@@ -25,10 +25,11 @@ import (
 	"github.com/banzaicloud/koperator/pkg/util"
 	kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
 
-	properties "github.com/banzaicloud/koperator/properties/pkg"
-
 	"github.com/banzaicloud/koperator/api/v1beta1"
 	"github.com/banzaicloud/koperator/pkg/resources"
+	mocks "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks"
+	properties "github.com/banzaicloud/koperator/properties/pkg"
+	"github.com/stretchr/testify/mock"
 )
 
 func TestGetMountPathsFromBrokerConfigMap(t *testing.T) {
@@ -469,8 +470,11 @@ zookeeper.connect=example.zk:2181/`,
 
 		test := test
 		t.Run(test.testName, func(t *testing.T) {
+			mockClient := new(mocks.Client)
+			mockClient.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 			r := Reconciler{
 				Reconciler: resources.Reconciler{
+					Client: mockClient,
 					KafkaCluster: &v1beta1.KafkaCluster{
 						ObjectMeta: metav1.ObjectMeta{
 							Name: "kafka",
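The mock wired in above comes from a generated mocks package; the expectation pattern itself is plain testify. A hand-written stand-in illustrates how such an expectation is declared and asserted — FakeGetter and its Get signature are hypothetical, simplified from controller-runtime's client interface, and not the generated mocks.Client:

	package kafka

	import (
		"context"
		"testing"

		"github.com/stretchr/testify/mock"
	)

	type FakeGetter struct {
		mock.Mock
	}

	func (f *FakeGetter) Get(ctx context.Context, key string, obj interface{}) error {
		args := f.Called(ctx, key, obj)
		return args.Error(0)
	}

	func TestExpectationPattern(t *testing.T) {
		f := new(FakeGetter)
		// mock.Anything matches any argument; a later commit in this series
		// tightens the third matcher to mock.AnythingOfType("*v1.ConfigMap").
		f.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(nil)

		if err := f.Get(context.Background(), "kafka-config-1", nil); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		f.AssertExpectations(t)
	}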
"getting broker configmap from the Kubernetes API server resulted an error") } - mountPathsOld := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld) + mountPathsOld, err := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld) + if err != nil { + log.Error(err, "could not get mounthPaths from broker configmap", "brokerID", id) + } mountPathsNew := generateStorageConfig(bConfig.StorageConfigs) mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew) @@ -230,25 +233,21 @@ func appendListenerConfigs(advertisedListenerConfig []string, id int32, return advertisedListenerConfig } -func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) []string { +func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) ([]string, error) { if configMap == nil { - return nil + return nil, nil } brokerConfig := configMap.Data[kafkautils.ConfigPropertyName] - brokerConfigsLines := strings.Split(brokerConfig, "\n") - var mountPaths string - for i := range brokerConfigsLines { - keyVal := strings.Split(brokerConfigsLines[i], "=") - if len(keyVal) == 2 && keyVal[0] == brokerLogDirPropertyName { - mountPaths = keyVal[1] - } + brokerConfigProperties, err := properties.NewFromString(brokerConfig) + if err != nil { + return nil, err } - - if mountPaths == "" { - return nil + brokerLogDirProperty, found := brokerConfigProperties.Get(brokerLogDirPropertyName) + if !found || brokerLogDirProperty.Value() == "" { + return nil, nil } - return strings.Split(mountPaths, ",") + return strings.Split(brokerLogDirProperty.Value(), ","), nil } func generateStorageConfig(sConfig []v1beta1.StorageConfig) []string { diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index 6d797b486..cdde85605 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -60,7 +60,10 @@ zookeeper.connect=zookeeper-server-client.zookeeper:2181/ }, } for _, test := range tests { - logDirs := getMountPathsFromBrokerConfigMap(&test.brokerConfigMap) + logDirs, err := getMountPathsFromBrokerConfigMap(&test.brokerConfigMap) + if err != nil { + t.Errorf("err should be nil, got: %v", err) + } if len(logDirs) != 0 && len(test.expectedLogDirs) != 0 { if !reflect.DeepEqual(logDirs, test.expectedLogDirs) { t.Errorf("expected: %s, got: %s", test.expectedLogDirs, logDirs) diff --git a/pkg/webhook/request.go b/pkg/webhook/request.go index a3f20031a..11cfd6f96 100644 --- a/pkg/webhook/request.go +++ b/pkg/webhook/request.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "net/http" + "os" "reflect" admissionv1 "k8s.io/api/admission/v1" @@ -57,7 +58,8 @@ func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.A return s.validateKafkaTopic(&topic) case kafkaCluster: // when the operator modifies the resource we dont do any validation - if req.UserInfo.Username == "system:serviceaccount:kafka:kafka-operator" { + operatorUsername := fmt.Sprintf("system:serviceaccount:%v:%v", os.Getenv("POD_NAMESPACE"), os.Getenv("SERVICE_ACCOUNT")) + if req.UserInfo.Username == operatorUsername { return &admissionv1.AdmissionResponse{ Allowed: true, } From 83a64542134a5a81a58ae29b4f8804c81dccffd6 Mon Sep 17 00:00:00 2001 From: marbarta Date: Wed, 15 Jun 2022 13:10:28 +0200 Subject: [PATCH 13/13] Fix Stoader suggestions 2 --- main.go | 7 ++++++- pkg/resources/kafka/configmap.go | 19 +++++++------------ pkg/resources/kafka/configmap_test.go | 10 +++++----- pkg/webhook/request.go | 3 +-- pkg/webhook/server.go | 20 ++++++++++++-------- 
From 83a64542134a5a81a58ae29b4f8804c81dccffd6 Mon Sep 17 00:00:00 2001
From: marbarta
Date: Wed, 15 Jun 2022 13:10:28 +0200
Subject: [PATCH 13/13] Fix Stoader suggestions 2

---
 main.go                               |  7 ++++++-
 pkg/resources/kafka/configmap.go      | 19 +++++++------------
 pkg/resources/kafka/configmap_test.go | 10 +++++-----
 pkg/webhook/request.go                |  3 +--
 pkg/webhook/server.go                 | 20 ++++++++++++--------
 pkg/webhook/server_test.go            |  4 ++--
 6 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/main.go b/main.go
index 49acf44dc..ef4821221 100644
--- a/main.go
+++ b/main.go
@@ -77,6 +77,8 @@ func init() {
 
 func main() {
 	var (
+		podNamespace         string
+		podServiceAccount    string
 		namespaces           string
 		metricsAddr          string
 		enableLeaderElection bool
@@ -104,6 +106,9 @@ func main() {
 	flag.IntVar(&maxKafkaTopicConcurrentReconciles, "max-kafka-topic-concurrent-reconciles", 10, "Define max amount of concurrent KafkaTopic reconciles")
 	flag.Parse()
 
+	podNamespace = os.Getenv("POD_NAMESPACE")
+	podServiceAccount = os.Getenv("SERVICE_ACCOUNT")
+
 	ctrl.SetLogger(util.CreateLogger(verboseLogging, developmentLogging))
 
 	// adding indexers to KafkaTopics so that the KafkaTopic admission webhooks could work
@@ -192,7 +197,7 @@ func main() {
 	}
 
 	if !webhookDisabled {
-		webhook.SetupServerHandlers(mgr, webhookCertDir)
+		webhook.SetupServerHandlers(mgr, webhookCertDir, podNamespace, podServiceAccount)
 	}
 
 	// +kubebuilder:scaffold:builder
diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go
index cc6a50f47..8f634478f 100644
--- a/pkg/resources/kafka/configmap.go
+++ b/pkg/resources/kafka/configmap.go
@@ -151,28 +151,23 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
 // It returns the merged []string and a bool that is true when mountPathsNew does not contain every element of mountPathsOld
 func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) {
 	var mountPathsMerged []string
-	mountPathsMerged = append(mountPathsMerged, mountPathsOld...)
-	mountPathsOldLen := len(mountPathsOld)
+	mountPathsMerged = append(mountPathsMerged, mountPathsNew...)
+	isMountPathRemoved := false
 	// Merging the new mountPaths with the old: any old mountPath missing from the new ones marks a removal
-	for i := range mountPathsNew {
+	for i := range mountPathsOld {
 		found := false
-		for k := range mountPathsOld {
-			if mountPathsOld[k] == mountPathsNew[i] {
+		for k := range mountPathsNew {
+			if mountPathsOld[i] == mountPathsNew[k] {
 				found = true
-				mountPathsOldLen--
 				break
 			}
 		}
-		// if this is a new mountPath then add it to the current
+		// if an old mountPath is missing from the new ones, keep it and flag the removal
 		if !found {
-			mountPathsMerged = append(mountPathsMerged, mountPathsNew[i])
+			mountPathsMerged = append(mountPathsMerged, mountPathsOld[i])
+			isMountPathRemoved = true
 		}
 	}
-	// If any of them is removed we can check the difference in the mountPathsOldLen
-	isMountPathRemoved := false
-	if mountPathsOldLen > 0 {
-		isMountPathRemoved = true
-	}
 
 	return mountPathsMerged, isMountPathRemoved
 }
diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go
index cdde85605..24c99709e 100644
--- a/pkg/resources/kafka/configmap_test.go
+++ b/pkg/resources/kafka/configmap_test.go
@@ -98,14 +98,14 @@ func TestMergeMountPaths(t *testing.T) {
 			testName:                "changed order",
 			mountPathNew:            []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
 			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedMergedMountPath: []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
 			expectedRemoved:         false,
 		},
 		{
 			testName:                "removed one",
 			mountPathNew:            []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
 			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
+			expectedMergedMountPath: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka", "/kafka-logs3/kafka"},
 			expectedRemoved:         true,
 		},
 		{
@@ -119,10 +119,10 @@ func TestMergeMountPaths(t *testing.T) {
 	for _, test := range tests {
 		mergedMountPaths, isRemoved := mergeMountPaths(test.mountPathOld, test.mountPathNew)
 		if !reflect.DeepEqual(mergedMountPaths, test.expectedMergedMountPath) {
-			t.Errorf("expected: %s, got: %s", test.expectedMergedMountPath, mergedMountPaths)
+			t.Errorf("testName: %s, expected: %s, got: %s", test.testName, test.expectedMergedMountPath, mergedMountPaths)
 		}
 		if isRemoved != test.expectedRemoved {
-			t.Errorf("expectedRemoved: %v, got: %v", test.expectedRemoved, isRemoved)
+			t.Errorf("testName: %s, expectedRemoved: %v, got: %v", test.testName, test.expectedRemoved, isRemoved)
 		}
 	}
 }
@@ -475,7 +475,7 @@ zookeeper.connect=example.zk:2181/`,
 
 		t.Run(test.testName, func(t *testing.T) {
 			mockClient := new(mocks.Client)
-			mockClient.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+			mockClient.On("Get", mock.Anything, mock.Anything, mock.AnythingOfType("*v1.ConfigMap")).Return(nil)
 			r := Reconciler{
 				Reconciler: resources.Reconciler{
 					Client: mockClient,
diff --git a/pkg/webhook/request.go b/pkg/webhook/request.go
index 11cfd6f96..ee5235192 100644
--- a/pkg/webhook/request.go
+++ b/pkg/webhook/request.go
@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"net/http"
-	"os"
 	"reflect"
 
 	admissionv1 "k8s.io/api/admission/v1"
@@ -57,7 +57,7 @@ func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
 		return s.validateKafkaTopic(&topic)
 	case kafkaCluster:
 		// when the operator modifies the resource we don't do any validation
-		operatorUsername := fmt.Sprintf("system:serviceaccount:%v:%v", os.Getenv("POD_NAMESPACE"), os.Getenv("SERVICE_ACCOUNT"))
+		operatorUsername := fmt.Sprintf("system:serviceaccount:%v:%v", s.podNamespace, s.podServiceAccount)
 		if req.UserInfo.Username == operatorUsername {
 			return &admissionv1.AdmissionResponse{
 				Allowed: true,
diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go
index dadbb50db..3ef7ca259 100644
--- a/pkg/webhook/server.go
+++ b/pkg/webhook/server.go
@@ -32,34 +32,38 @@ var (
 )
 
 type webhookServer struct {
-	client       client.Client
-	scheme       *runtime.Scheme
-	deserializer runtime.Decoder
+	client            client.Client
+	scheme            *runtime.Scheme
+	deserializer      runtime.Decoder
+	podNamespace      string
+	podServiceAccount string
 
 	// For mocking - use kafkaclient.NewMockFromCluster
 	newKafkaFromCluster func(client.Client, *v1beta1.KafkaCluster) (kafkaclient.KafkaClient, func(), error)
 }
 
-func newWebHookServer(client client.Client, scheme *runtime.Scheme) *webhookServer {
+func newWebHookServer(client client.Client, scheme *runtime.Scheme, podNamespace, podServiceAccount string) *webhookServer {
 	return &webhookServer{
 		client:              client,
 		scheme:              scheme,
 		deserializer:        serializer.NewCodecFactory(scheme).UniversalDeserializer(),
+		podNamespace:        podNamespace,
+		podServiceAccount:   podServiceAccount,
 		newKafkaFromCluster: kafkaclient.NewFromCluster,
 	}
 }
 
-func newWebhookServerMux(client client.Client, scheme *runtime.Scheme) *http.ServeMux {
+func newWebhookServerMux(client client.Client, scheme *runtime.Scheme, podNamespace, podServiceAccount string) *http.ServeMux {
 	mux := http.NewServeMux()
-	webhookServer := newWebHookServer(client, scheme)
+	webhookServer := newWebHookServer(client, scheme, podNamespace, podServiceAccount)
 	mux.HandleFunc("/validate", webhookServer.serve)
 	return mux
 }
 
 // SetupServerHandlers sets up a webhook with the manager
-func SetupServerHandlers(mgr ctrl.Manager, certDir string) {
+func SetupServerHandlers(mgr ctrl.Manager, certDir, podNamespace, podServiceAccount string) {
 	server := mgr.GetWebhookServer()
 	server.CertDir = certDir
-	mux := newWebhookServerMux(mgr.GetClient(), mgr.GetScheme())
+	mux := newWebhookServerMux(mgr.GetClient(), mgr.GetScheme(), podNamespace, podServiceAccount)
 	server.Register("/validate", mux)
 }
diff --git a/pkg/webhook/server_test.go b/pkg/webhook/server_test.go
index d428a1eb5..63ddb1e2c 100644
--- a/pkg/webhook/server_test.go
+++ b/pkg/webhook/server_test.go
@@ -57,14 +57,14 @@ func newMockServerWithClients(c client.Client, kafkaClientProvider func(client c
 }
 
 func TestNewServer(t *testing.T) {
-	server := newWebHookServer(fake.NewClientBuilder().Build(), scheme.Scheme)
+	server := newWebHookServer(fake.NewClientBuilder().Build(), scheme.Scheme, "", "")
 	if reflect.ValueOf(server.newKafkaFromCluster).Pointer() != reflect.ValueOf(kafkaclient.NewFromCluster).Pointer() {
 		t.Error("Expected newKafkaFromCluster ptr -> kafkaclient.NewFromCluster")
 	}
 }
 
 func TestNewServerMux(t *testing.T) {
-	mux := newWebhookServerMux(fake.NewClientBuilder().Build(), scheme.Scheme)
+	mux := newWebhookServerMux(fake.NewClientBuilder().Build(), scheme.Scheme, "", "")
 	var buf bytes.Buffer
 	req, _ := http.NewRequest("POST", "/validate", &buf)
 	if _, pattern := mux.Handler(req); pattern == "" {