From df4f50dd3dfdd469b0800004aaea79bf9b8ff230 Mon Sep 17 00:00:00 2001 From: marbarta Date: Tue, 31 Jan 2023 09:39:59 -0800 Subject: [PATCH 1/5] Fix Kafka super.users override --- pkg/resources/kafka/configmap.go | 39 +++++++ pkg/resources/kafka/configmap_test.go | 146 +++++++++++++++++++++++++- 2 files changed, 181 insertions(+), 4 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 067a12588..ae222af99 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -346,6 +346,42 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl } } +// mergeSuperUsersPropertyValue merges the super.users property value from the source into the target properties +func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) { + sourceVal, foundSource := source.Get("super.users") + if !foundSource || sourceVal.IsEmpty() { + return + } + targetVal, foundTarget := target.Get("super.users") + if !foundTarget || targetVal.IsEmpty() { + return + } + + sourceSuperUsers := strings.Split(sourceVal.Value(), ";") + targetSuperUsers := strings.Split(targetVal.Value(), ";") + + inserted := false + for _, sourceSuperUser := range sourceSuperUsers { + found := false + for _, targetSuperUser := range targetSuperUsers { + if sourceSuperUser == targetSuperUser { + found = true + break + } + } + + if !found { + inserted = true + targetSuperUsers = append(targetSuperUsers, sourceSuperUser) + } + } + + if inserted { + mergedSuperUsers := strings.Join(targetSuperUsers, ";") + target.Set("super.users", mergedSuperUsers) + } +} + func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) string { @@ -356,6 +392,9 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC // Merge operator generated configuration to the final one if opGenConf != nil { + // When there is super.users configuration in the readOnly config we merge its value into the Koperator generated one. + // thus finalBrokerConfig contains the merged super.user value. + mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf) finalBrokerConfig.Merge(opGenConf) } diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index 40784ee55..f1f49c787 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -16,13 +16,13 @@ package kafka import ( "reflect" + "strings" "testing" "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/banzaicloud/koperator/pkg/util" kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka" "github.com/stretchr/testify/mock" @@ -464,6 +464,146 @@ listener.security.protocol.map=INTERNAL:SSL listeners=INTERNAL://:9092 metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter super.users=User:CN=kafka-headless.kafka.svc.cluster.local +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers1", + readOnlyConfig: `super.users=User:CN=custom-superuser1;User:CN=custom-superuser2`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local;User:CN=custom-superuser1;User:CN=custom-superuser2 +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers2", + readOnlyConfig: `super.users=User:CN=kafka-headless.kafka.svc.cluster.local`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers3", + readOnlyConfig: `super.users=User:CN=custom-superuser1;User:CN=custom-superuser2;User:CN=kafka-headless.kafka.svc.cluster.local`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local;User:CN=custom-superuser1;User:CN=custom-superuser2 +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers4", + readOnlyConfig: `super.users=`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local zookeeper.connect=example.zk:2181/`, }, } @@ -537,9 +677,7 @@ zookeeper.connect=example.zk:2181/`, superUsers []string ) - sslConfigTestNames := []string{"configWithSSL_SSLClientAuth_not_provided", "configWithSSL_SSLClientAuth_required", "configWithSSL_SSLClientAuth_requested", - "configWithSSL_SSLClientAuth_none"} - if util.StringSliceContains(sslConfigTestNames, test.testName) { + if strings.Contains(test.testName, "configWithSSL") { serverPasses = map[string]string{"internal": "keystore_serverpassword123"} clientPass = "keystore_clientpassword123" superUsers = []string{"CN=kafka-headless.kafka.svc.cluster.local"} From e9d0708dc0728a4dfc235d31de63730597ae88ea Mon Sep 17 00:00:00 2001 From: marbarta Date: Tue, 31 Jan 2023 09:55:37 -0800 Subject: [PATCH 2/5] Fix comment --- pkg/resources/kafka/configmap.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index ae222af99..1b7d3b3d9 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -392,8 +392,8 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC // Merge operator generated configuration to the final one if opGenConf != nil { - // When there is super.users configuration in the readOnly config we merge its value into the Koperator generated one. - // thus finalBrokerConfig contains the merged super.user value. + // When there is custom super.users configuration we merge its value into the Koperator generated one + // thus finalBrokerConfig contains merged super.users value. mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf) finalBrokerConfig.Merge(opGenConf) } From 15edcbd0cba75d88c70323cb7c2d90209e6d2755 Mon Sep 17 00:00:00 2001 From: marbarta Date: Tue, 31 Jan 2023 10:34:24 -0800 Subject: [PATCH 3/5] Fix lint --- pkg/resources/kafka/configmap.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 1b7d3b3d9..03b03efcb 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -378,6 +378,8 @@ func mergeSuperUsersPropertyValue(source *properties.Properties, target *propert if inserted { mergedSuperUsers := strings.Join(targetSuperUsers, ";") + // Setting string value for a property is not going to run into error, also we don't have error handling at upper levels + //nolint:errcheck target.Set("super.users", mergedSuperUsers) } } From 3cc50f81062e975b7730c60c3bebc744b7f9f72b Mon Sep 17 00:00:00 2001 From: marbarta Date: Mon, 13 Feb 2023 05:20:34 -0800 Subject: [PATCH 4/5] Fix review suggestion 1 --- pkg/resources/kafka/configmap.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 03b03efcb..80639ec6f 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -346,15 +346,16 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl } } -// mergeSuperUsersPropertyValue merges the super.users property value from the source into the target properties -func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) { +// mergeSuperUsersPropertyValue merges the target and source super.users property value, and returns it as string. +// It returns empty string when there were no updates or any of the super.users property value was empty. +func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) string { sourceVal, foundSource := source.Get("super.users") if !foundSource || sourceVal.IsEmpty() { - return + return "" } targetVal, foundTarget := target.Get("super.users") if !foundTarget || targetVal.IsEmpty() { - return + return "" } sourceSuperUsers := strings.Split(sourceVal.Value(), ";") @@ -377,11 +378,10 @@ func mergeSuperUsersPropertyValue(source *properties.Properties, target *propert } if inserted { - mergedSuperUsers := strings.Join(targetSuperUsers, ";") - // Setting string value for a property is not going to run into error, also we don't have error handling at upper levels - //nolint:errcheck - target.Set("super.users", mergedSuperUsers) + return strings.Join(targetSuperUsers, ";") } + + return "" } func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerConfig, extListenerStatuses, @@ -394,9 +394,13 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC // Merge operator generated configuration to the final one if opGenConf != nil { - // When there is custom super.users configuration we merge its value into the Koperator generated one - // thus finalBrokerConfig contains merged super.users value. - mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf) + // When there is custom super.users configuration we merge its value with the Koperator generated one + // to avoid overwrite that happens when the finalBrokerConfig.Merge(opGenConf) is called. + if suMerged := mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf); suMerged != "" { + // Setting string value for a property is not going to run into error, also we don't return error in this function + //nolint:errcheck + opGenConf.Set("super.users", suMerged) + } finalBrokerConfig.Merge(opGenConf) } From 9e4ab660455663e60679fa1f4ff9f921353be4eb Mon Sep 17 00:00:00 2001 From: marbarta Date: Wed, 15 Feb 2023 09:35:37 +0100 Subject: [PATCH 5/5] Fix empty space --- pkg/resources/kafka/configmap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 80639ec6f..cbd260c5f 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -347,7 +347,7 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl } // mergeSuperUsersPropertyValue merges the target and source super.users property value, and returns it as string. -// It returns empty string when there were no updates or any of the super.users property value was empty. +// It returns empty string when there were no updates or any of the super.users property value was empty. func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) string { sourceVal, foundSource := source.Get("super.users") if !foundSource || sourceVal.IsEmpty() {