From bc00be2ad59081d4668cd8c8255a0000723836ac Mon Sep 17 00:00:00 2001 From: Adrian Lungu Date: Tue, 10 Nov 2020 19:25:02 +0200 Subject: [PATCH] Allow Kafka to use External DNS for inter-broker protocol --- .../kafka.banzaicloud.io_kafkaclusters.yaml | 16 +++ pkg/resources/envoy/configmap.go | 130 +++++++++++------- pkg/resources/kafka/configmap.go | 33 +++-- pkg/sdk/v1beta1/kafkacluster_types.go | 11 ++ 4 files changed, 128 insertions(+), 62 deletions(-) diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 282efb2f9..598b14c24 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -4185,10 +4185,26 @@ spec: containerPort: format: int32 type: integer + ingressForwarded: + description: If set to true, Envoy will create routes for + the internal listener ports. Required if `ingressForwarded` + or `useExternalHostname` is set. + type: boolean + internalStartingPort: + description: This following options are helpful when you want + to run a Kafka cluster over multiple Kubernetes clusters. + The broker internal ports are computed as the sum of the + internalStartingPort and the broker id. + format: int32 + type: integer name: type: string type: type: string + useExternalHostname: + description: If set to true, the Kafka brokers will use the + external hostname for inter broker communication. + type: boolean usedForControllerCommunication: type: boolean usedForInnerBrokerCommunication: diff --git a/pkg/resources/envoy/configmap.go b/pkg/resources/envoy/configmap.go index 530171bd1..e5dc03493 100644 --- a/pkg/resources/envoy/configmap.go +++ b/pkg/resources/envoy/configmap.go @@ -82,57 +82,37 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, envoyConfig *v1beta1.EnvoyCon } } if kc.Spec.ListenersConfig.ExternalListeners != nil { - listeners = append(listeners, &envoyapi.Listener{ - Address: &envoycore.Address{ - Address: &envoycore.Address_SocketAddress{ - SocketAddress: &envoycore.SocketAddress{ - Address: "0.0.0.0", - PortSpecifier: &envoycore.SocketAddress_PortValue{ - PortValue: uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ExternalStartingPort + int32(brokerId)), - }, - }, - }, - }, - FilterChains: []*envoylistener.FilterChain{ - { - Filters: []*envoylistener.Filter{ - { - Name: wellknown.TCPProxy, - ConfigType: &envoylistener.Filter_Config{ - Config: &ptypesstruct.Struct{ - Fields: map[string]*ptypesstruct.Value{ - "stat_prefix": {Kind: &ptypesstruct.Value_StringValue{StringValue: fmt.Sprintf("broker_tcp-%d", brokerId)}}, - "cluster": {Kind: &ptypesstruct.Value_StringValue{StringValue: fmt.Sprintf("broker-%d", brokerId)}}, - }, - }, - }, - }, - }, - }, - }, - }) + listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-external", brokerId), + uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ExternalStartingPort+int32(brokerId)))) + } + + for _, internalListener := range kc.Spec.ListenersConfig.InternalListeners { + if internalListener.IngressForwarded && internalListener.InternalStartingPort > 0 { + if internalListener.UsedForInnerBrokerCommunication { + listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-internal", brokerId), + uint32(internalListener.InternalStartingPort+int32(brokerId)))) + } else if internalListener.UsedForControllerCommunication { + listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-controller", brokerId), + uint32(internalListener.InternalStartingPort+int32(brokerId)))) + } + } } if kc.Spec.ListenersConfig.ExternalListeners != nil { - clusters = append(clusters, &envoyapi.Cluster{ - Name: fmt.Sprintf("broker-%d", brokerId), - ConnectTimeout: &duration.Duration{Seconds: 1}, - ClusterDiscoveryType: &envoyapi.Cluster_Type{Type: envoyapi.Cluster_STRICT_DNS}, - LbPolicy: envoyapi.Cluster_ROUND_ROBIN, - Http2ProtocolOptions: &envoycore.Http2ProtocolOptions{}, - Hosts: []*envoycore.Address{ - { - Address: &envoycore.Address_SocketAddress{ - SocketAddress: &envoycore.SocketAddress{ - Address: fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s", kc.Name, brokerId, kc.Name, kc.Namespace, kc.Spec.GetKubernetesClusterDomain()), - PortSpecifier: &envoycore.SocketAddress_PortValue{ - PortValue: uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ContainerPort), - }, - }, - }, - }, - }, - }) + clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-external", brokerId), uint32(brokerId), + uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ContainerPort))) + } + + for _, internalListener := range kc.Spec.ListenersConfig.InternalListeners { + if internalListener.IngressForwarded && internalListener.InternalStartingPort > 0 { + if internalListener.UsedForInnerBrokerCommunication { + clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-internal", brokerId), uint32(brokerId), + uint32(internalListener.ContainerPort))) + } else { + clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-controller", brokerId), uint32(brokerId), + uint32(internalListener.ContainerPort))) + } + } } } @@ -159,6 +139,60 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, envoyConfig *v1beta1.EnvoyCon return string(marshalledConfig) } +func envoyListener(name string, containerPort uint32) *envoyapi.Listener { + return &envoyapi.Listener{ + Address: &envoycore.Address{ + Address: &envoycore.Address_SocketAddress{ + SocketAddress: &envoycore.SocketAddress{ + Address: "0.0.0.0", + PortSpecifier: &envoycore.SocketAddress_PortValue{ + PortValue: containerPort, + }, + }, + }, + }, + FilterChains: []*envoylistener.FilterChain{ + { + Filters: []*envoylistener.Filter{ + { + Name: wellknown.TCPProxy, + ConfigType: &envoylistener.Filter_Config{ + Config: &ptypesstruct.Struct{ + Fields: map[string]*ptypesstruct.Value{ + "stat_prefix": {Kind: &ptypesstruct.Value_StringValue{StringValue: fmt.Sprintf("tcp_%s", name)}}, + "cluster": {Kind: &ptypesstruct.Value_StringValue{StringValue: name}}, + }, + }, + }, + }, + }, + }, + }, + } +} + +func envoyCluster(kc *v1beta1.KafkaCluster, name string, brokerId, containerPort uint32) *envoyapi.Cluster { + return &envoyapi.Cluster{ + Name: name, + ConnectTimeout: &duration.Duration{Seconds: 1}, + ClusterDiscoveryType: &envoyapi.Cluster_Type{Type: envoyapi.Cluster_STRICT_DNS}, + LbPolicy: envoyapi.Cluster_ROUND_ROBIN, + Http2ProtocolOptions: &envoycore.Http2ProtocolOptions{}, + Hosts: []*envoycore.Address{ + { + Address: &envoycore.Address_SocketAddress{ + SocketAddress: &envoycore.SocketAddress{ + Address: fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s", kc.Name, brokerId, kc.Name, kc.Namespace, kc.Spec.GetKubernetesClusterDomain()), + PortSpecifier: &envoycore.SocketAddress_PortValue{ + PortValue: containerPort, + }, + }, + }, + }, + }, + } +} + func configName(envoyConfig *v1beta1.EnvoyConfig) string { if envoyConfig.Id == envoyGlobal { return envoyVolumeAndConfigName diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index b757b854d..a5734f328 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -84,7 +84,7 @@ func (r *Reconciler) getConfigString(bConfig *v1beta1.BrokerConfig, id int32, se "ListenerConfig": generateListenerSpecificConfig(&r.KafkaCluster.Spec.ListenersConfig, log), "SSLEnabledForInternalCommunication": r.KafkaCluster.Spec.ListenersConfig.SSLSecrets != nil && util.IsSSLEnabledForInternalCommunication(r.KafkaCluster.Spec.ListenersConfig.InternalListeners), "ZookeeperConnectString": zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath()), - "CruiseControlBootstrapServers": getInternalListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners, id, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled), + "CruiseControlBootstrapServers": getInternalListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners, id, bConfig.HostnameOverride, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled), "StorageConfig": generateStorageConfig(bConfig.StorageConfigs), "AdvertisedListenersConfig": generateAdvertisedListenerConfig(id, &r.KafkaCluster.Spec.ListenersConfig, bConfig.HostnameOverride, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled), "SuperUsers": strings.Join(generateSuperUsers(superUsers), ";"), @@ -134,12 +134,17 @@ func generateAdvertisedListenerConfig(id int32, l *v1beta1.ListenersConfig, brok fmt.Sprintf("%s://%s:%d", strings.ToUpper(eListener.Name), brokerHostname, eListener.ExternalStartingPort+id)) } for _, iListener := range l.InternalListeners { - if headlessServiceEnabled { + if iListener.UseExternalHostname && iListener.InternalStartingPort > 0 { advertisedListenerConfig = append(advertisedListenerConfig, - fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort)) + fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id)) } else { - advertisedListenerConfig = append(advertisedListenerConfig, - fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort)) + if headlessServiceEnabled { + advertisedListenerConfig = append(advertisedListenerConfig, + fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort)) + } else { + advertisedListenerConfig = append(advertisedListenerConfig, + fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort)) + } } } return fmt.Sprintf("advertised.listeners=%s\n", strings.Join(advertisedListenerConfig, ",")) @@ -195,21 +200,21 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, log logr.Logger) "listeners=" + strings.Join(listenerConfig, ",") + "\n" } -func getInternalListener(iListeners []v1beta1.InternalListenerConfig, id int32, domain, namespace, crName string, headlessServiceEnabled bool) string { - - internalListener := "" - +func getInternalListener(iListeners []v1beta1.InternalListenerConfig, id int32, brokerHostname, domain, namespace, crName string, headlessServiceEnabled bool) string { for _, iListener := range iListeners { if iListener.UsedForInnerBrokerCommunication { - if headlessServiceEnabled { - internalListener = fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort) + if iListener.UseExternalHostname && iListener.InternalStartingPort > 0 { + return fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id) } else { - internalListener = fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort) + if headlessServiceEnabled { + return fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort) + } else { + return fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort) + } } } } - - return internalListener + return "" } func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerConfig, serverPass, clientPass string, superUsers []string, log logr.Logger) string { diff --git a/pkg/sdk/v1beta1/kafkacluster_types.go b/pkg/sdk/v1beta1/kafkacluster_types.go index c49a5f5aa..563d17db8 100644 --- a/pkg/sdk/v1beta1/kafkacluster_types.go +++ b/pkg/sdk/v1beta1/kafkacluster_types.go @@ -324,6 +324,17 @@ type InternalListenerConfig struct { CommonListenerSpec `json:",inline"` UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"` UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"` + // This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters. + // The broker internal ports are computed as the sum of the internalStartingPort and the broker id. + // +optional + InternalStartingPort int32 `json:"internalStartingPort"` + // If set to true, Envoy will create routes for the internal listener ports. + // Required if `ingressForwarded` or `useExternalHostname` is set. + // +optional + IngressForwarded bool `json:"ingressForwarded,omitempty"` + // If set to true, the Kafka brokers will use the external hostname for inter broker communication. + // +optional + UseExternalHostname bool `json:"useExternalHostname,omitempty"` } // CommonListenerSpec defines the common building block for Listener type