diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 067a12588..cbd260c5f 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -346,6 +346,44 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl } } +// mergeSuperUsersPropertyValue merges the target and source super.users property value, and returns it as string. +// It returns empty string when there were no updates or any of the super.users property value was empty. +func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) string { + sourceVal, foundSource := source.Get("super.users") + if !foundSource || sourceVal.IsEmpty() { + return "" + } + targetVal, foundTarget := target.Get("super.users") + if !foundTarget || targetVal.IsEmpty() { + return "" + } + + sourceSuperUsers := strings.Split(sourceVal.Value(), ";") + targetSuperUsers := strings.Split(targetVal.Value(), ";") + + inserted := false + for _, sourceSuperUser := range sourceSuperUsers { + found := false + for _, targetSuperUser := range targetSuperUsers { + if sourceSuperUser == targetSuperUser { + found = true + break + } + } + + if !found { + inserted = true + targetSuperUsers = append(targetSuperUsers, sourceSuperUser) + } + } + + if inserted { + return strings.Join(targetSuperUsers, ";") + } + + return "" +} + func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) string { @@ -356,6 +394,13 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC // Merge operator generated configuration to the final one if opGenConf != nil { + // When there is custom super.users configuration we merge its value with the Koperator generated one + // to avoid overwrite that happens when the finalBrokerConfig.Merge(opGenConf) is called. + if suMerged := mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf); suMerged != "" { + // Setting string value for a property is not going to run into error, also we don't return error in this function + //nolint:errcheck + opGenConf.Set("super.users", suMerged) + } finalBrokerConfig.Merge(opGenConf) } diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index 40784ee55..f1f49c787 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -16,13 +16,13 @@ package kafka import ( "reflect" + "strings" "testing" "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/banzaicloud/koperator/pkg/util" kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka" "github.com/stretchr/testify/mock" @@ -464,6 +464,146 @@ listener.security.protocol.map=INTERNAL:SSL listeners=INTERNAL://:9092 metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter super.users=User:CN=kafka-headless.kafka.svc.cluster.local +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers1", + readOnlyConfig: `super.users=User:CN=custom-superuser1;User:CN=custom-superuser2`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local;User:CN=custom-superuser1;User:CN=custom-superuser2 +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers2", + readOnlyConfig: `super.users=User:CN=kafka-headless.kafka.svc.cluster.local`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers3", + readOnlyConfig: `super.users=User:CN=custom-superuser1;User:CN=custom-superuser2;User:CN=kafka-headless.kafka.svc.cluster.local`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local;User:CN=custom-superuser1;User:CN=custom-superuser2 +zookeeper.connect=example.zk:2181/`, + }, + { + testName: "configWithSSL_with_readOnly-superUsers4", + readOnlyConfig: `super.users=`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "ssl", + sslClientAuth: "none", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +cruise.control.metrics.reporter.security.protocol=SSL +cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks +cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123 +cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks +cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123 +inter.broker.listener.name=INTERNAL +listener.name.internal.ssl.client.auth=none +listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks +listener.name.internal.ssl.keystore.password=keystore_serverpassword123 +listener.name.internal.ssl.keystore.type=JKS +listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks +listener.name.internal.ssl.truststore.password=keystore_serverpassword123 +listener.name.internal.ssl.truststore.type=JKS +listener.security.protocol.map=INTERNAL:SSL +listeners=INTERNAL://:9092 +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +super.users=User:CN=kafka-headless.kafka.svc.cluster.local zookeeper.connect=example.zk:2181/`, }, } @@ -537,9 +677,7 @@ zookeeper.connect=example.zk:2181/`, superUsers []string ) - sslConfigTestNames := []string{"configWithSSL_SSLClientAuth_not_provided", "configWithSSL_SSLClientAuth_required", "configWithSSL_SSLClientAuth_requested", - "configWithSSL_SSLClientAuth_none"} - if util.StringSliceContains(sslConfigTestNames, test.testName) { + if strings.Contains(test.testName, "configWithSSL") { serverPasses = map[string]string{"internal": "keystore_serverpassword123"} clientPass = "keystore_clientpassword123" superUsers = []string{"CN=kafka-headless.kafka.svc.cluster.local"}