diff --git a/controllers/tests/kafkacluster_controller_cruisecontrol_test.go b/controllers/tests/kafkacluster_controller_cruisecontrol_test.go index 1db4c9ad2..72a56d267 100644 --- a/controllers/tests/kafkacluster_controller_cruisecontrol_test.go +++ b/controllers/tests/kafkacluster_controller_cruisecontrol_test.go @@ -16,6 +16,8 @@ package tests import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" . "github.com/onsi/gomega" @@ -331,4 +333,33 @@ func expectCruiseControlDeployment(kafkaCluster *v1beta1.KafkaCluster) { // Check if the securityContext values are propagated correctly Expect(deployment.Spec.Template.Spec.SecurityContext.RunAsNonRoot).To(Equal(util.BoolPointer(false))) Expect(container.SecurityContext.Privileged).To(Equal(util.BoolPointer(true))) + + // Check config checksum annotations + configMap := &corev1.ConfigMap{} + Eventually(func() error { + return k8sClient.Get(context.Background(), types.NamespacedName{ + Namespace: kafkaCluster.Namespace, + Name: fmt.Sprintf("%s-cruisecontrol-config", kafkaCluster.Name), + }, configMap) + }).Should(Succeed()) + + userProvidedCCPodAnnotations := kafkaCluster.Spec.CruiseControlConfig.GetCruiseControlAnnotations() + ccConfigHash := sha256.Sum256([]byte(configMap.Data["cruisecontrol.properties"])) + ccClusterConfigHash := sha256.Sum256([]byte(configMap.Data["clusterConfigs.json"])) + ccLogConfigHash := sha256.Sum256([]byte(configMap.Data["log4j.properties"])) + ccBrokerCapacityConfigHash := sha256.Sum256([]byte(configMap.Data["capacity.json"])) + expectedPodAnnotations := util.MergeAnnotations( + userProvidedCCPodAnnotations, + map[string]string{ + "cruiseControlConfig.json": hex.EncodeToString(ccConfigHash[:]), + "cruiseControlClusterConfig.json": hex.EncodeToString(ccClusterConfigHash[:]), + "cruiseControlLogConfig.json": hex.EncodeToString(ccLogConfigHash[:]), + }, + ) + + if capacityConfigType, ok := userProvidedCCPodAnnotations["cruise-control.banzaicloud.com/broker-capacity-config"]; !ok || capacityConfigType == "static" { + expectedPodAnnotations["cruiseControlCapacity.json"] = hex.EncodeToString(ccBrokerCapacityConfigHash[:]) + } + + Expect(deployment.Spec.Template.GetAnnotations()).To(Equal(expectedPodAnnotations)) } diff --git a/controllers/tests/kafkacluster_controller_test.go b/controllers/tests/kafkacluster_controller_test.go index 4436d02c4..3307d529f 100644 --- a/controllers/tests/kafkacluster_controller_test.go +++ b/controllers/tests/kafkacluster_controller_test.go @@ -58,7 +58,8 @@ var _ = Describe("KafkaCluster", func() { Partitions: 7, ReplicationFactor: 2, }, - Config: "some.config=value", + Config: "some.config=value", + CruiseControlAnnotations: map[string]string{"test-cc-ann": "test-cc-ann-val"}, } kafkaCluster.Spec.ReadOnlyConfig = "" // Set some Kafka pod and container related SecurityContext values diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index 2c1fcde71..7923ed3db 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -138,7 +138,7 @@ type JBODInvariantCapacityConfig struct { Capacities []interface{} `json:"brokerCapacities"` } -// generateCapacityConfig generates a CC capacity config with default values or returns the manually overridden value if it exists +// GenerateCapacityConfig generates a CC capacity config with default values or returns the manually overridden value if it exists func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, config *corev1.ConfigMap) (string, error) { var err error diff --git a/pkg/resources/cruisecontrol/cruisecontrol.go b/pkg/resources/cruisecontrol/cruisecontrol.go index 2aca8ab00..4bb7d090b 100644 --- a/pkg/resources/cruisecontrol/cruisecontrol.go +++ b/pkg/resources/cruisecontrol/cruisecontrol.go @@ -129,7 +129,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return errors.WrapIfWithDetails(err, "failed to reconcile resource", "resource", o.GetObjectKind().GroupVersionKind()) } - podAnnotations := GeneratePodAnnotations(r.KafkaCluster, capacityConfig) + podAnnotations := GeneratePodAnnotations( + r.KafkaCluster.Spec.CruiseControlConfig.GetCruiseControlAnnotations(), + o.(*corev1.ConfigMap).Data, + ) o = r.deployment(podAnnotations) err = k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster) diff --git a/pkg/resources/cruisecontrol/deployment.go b/pkg/resources/cruisecontrol/deployment.go index c2421bf63..5d34a49dc 100644 --- a/pkg/resources/cruisecontrol/deployment.go +++ b/pkg/resources/cruisecontrol/deployment.go @@ -182,23 +182,22 @@ fi`}, } } -func GeneratePodAnnotations(kafkaCluster *v1beta1.KafkaCluster, capacityConfig string) map[string]string { - ccAnnotationsFromCR := kafkaCluster.Spec.CruiseControlConfig.GetCruiseControlAnnotations() - hashedCruiseControlConfigJson := sha256.Sum256([]byte(kafkaCluster.Spec.CruiseControlConfig.Config)) - hashedCruiseControlClusterConfigJson := sha256.Sum256([]byte(kafkaCluster.Spec.CruiseControlConfig.ClusterConfig)) - hashedCruiseControlLogConfigJson := sha256.Sum256([]byte(kafkaCluster.Spec.CruiseControlConfig.GetCCLog4jConfig())) +func GeneratePodAnnotations(cruiseControlAnnotations, cruiseControlConfig map[string]string) map[string]string { + hashedCruiseControlConfigJson := sha256.Sum256([]byte(cruiseControlConfig["cruisecontrol.properties"])) + hashedCruiseControlClusterConfigJson := sha256.Sum256([]byte(cruiseControlConfig["clusterConfigs.json"])) + hashedCruiseControlLogConfigJson := sha256.Sum256([]byte(cruiseControlConfig["log4j.properties"])) annotations := []map[string]string{ + cruiseControlAnnotations, { "cruiseControlConfig.json": hex.EncodeToString(hashedCruiseControlConfigJson[:]), "cruiseControlClusterConfig.json": hex.EncodeToString(hashedCruiseControlClusterConfigJson[:]), "cruiseControlLogConfig.json": hex.EncodeToString(hashedCruiseControlLogConfigJson[:]), }, - ccAnnotationsFromCR, } - if value, ok := ccAnnotationsFromCR[capacityConfigAnnotation]; !ok || + if value, ok := cruiseControlAnnotations[capacityConfigAnnotation]; !ok || value == string(staticCapacityConfig) { - hashedCruiseControlCapacityJson := sha256.Sum256([]byte(capacityConfig)) + hashedCruiseControlCapacityJson := sha256.Sum256([]byte(cruiseControlConfig["capacity.json"])) annotations = append(annotations, map[string]string{"cruiseControlCapacity.json": hex.EncodeToString(hashedCruiseControlCapacityJson[:])}) }