Skip to content

Commit

Permalink
Add explicit flags for ingressFwd and externalDNS
Browse files Browse the repository at this point in the history
for Internal listener - cross-K8s Kafka cluster usecase
  • Loading branch information
alungu committed Nov 10, 2020
1 parent 428022e commit 28339e8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 28 deletions.
14 changes: 13 additions & 1 deletion config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4185,20 +4185,32 @@ spec:
containerPort:
format: int32
type: integer
ingressForwarded:
description: If set to true, Envoy will create routes for
the internal listener ports. Required if `ingressForwarded`
or `useExternalHostname` is set.
type: boolean
internalStartingPort:
description: This following options are helpful when you want
to run a Kafka cluster over multiple Kubernetes clusters.
The broker internal ports are computed as the sum of the
internalStartingPort and the broker id.
format: int32
type: integer
name:
type: string
type:
type: string
useExternalHostname:
description: If set to true, the Kafka brokers will use the
external hostname for inter broker communication.
type: boolean
usedForControllerCommunication:
type: boolean
usedForInnerBrokerCommunication:
type: boolean
required:
- containerPort
- internalStartingPort
- name
- type
- usedForInnerBrokerCommunication
Expand Down
4 changes: 4 additions & 0 deletions config/samples/kafka-cluster-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,15 @@ spec:
internalListeners:
- containerPort: 29092
internalStartingPort: 9011
ingressForwarded: true
useExternalHostname: true
name: internal
type: plaintext
usedForInnerBrokerCommunication: true
- containerPort: 29093
internalStartingPort: 9021
ingressForwarded: true
useExternalHostname: true
name: controller
type: plaintext
usedForControllerCommunication: true
Expand Down
29 changes: 17 additions & 12 deletions pkg/resources/envoy/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,30 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, envoyConfig *v1beta1.EnvoyCon

listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-external", brokerId),
uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ExternalStartingPort+int32(brokerId))))

for _, internalListener := range kc.Spec.ListenersConfig.InternalListeners {
if internalListener.UsedForInnerBrokerCommunication {
listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-internal", brokerId),
uint32(internalListener.InternalStartingPort+int32(brokerId))))
} else if internalListener.UsedForControllerCommunication {
listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-controller", brokerId),
uint32(internalListener.InternalStartingPort+int32(brokerId))))
if internalListener.IngressForwarded && internalListener.InternalStartingPort > 0 {
if internalListener.UsedForInnerBrokerCommunication {
listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-internal", brokerId),
uint32(internalListener.InternalStartingPort+int32(brokerId))))
} else if internalListener.UsedForControllerCommunication {
listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-controller", brokerId),
uint32(internalListener.InternalStartingPort+int32(brokerId))))
}
}
}

clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-external", brokerId), uint32(brokerId),
uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ContainerPort)))
for _, internalListener := range kc.Spec.ListenersConfig.InternalListeners {
if internalListener.UsedForInnerBrokerCommunication {
clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-internal", brokerId), uint32(brokerId),
uint32(internalListener.ContainerPort)))
} else {
clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-controller", brokerId), uint32(brokerId),
uint32(internalListener.ContainerPort)))
if internalListener.IngressForwarded && internalListener.InternalStartingPort > 0 {
if internalListener.UsedForInnerBrokerCommunication {
clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-internal", brokerId), uint32(brokerId),
uint32(internalListener.ContainerPort)))
} else {
clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-controller", brokerId), uint32(brokerId),
uint32(internalListener.ContainerPort)))
}
}
}
}
Expand Down
38 changes: 26 additions & 12 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func (r *Reconciler) getConfigString(bConfig *v1beta1.BrokerConfig, id int32, se
"ListenerConfig": generateListenerSpecificConfig(&r.KafkaCluster.Spec.ListenersConfig, log),
"SSLEnabledForInternalCommunication": r.KafkaCluster.Spec.ListenersConfig.SSLSecrets != nil && util.IsSSLEnabledForInternalCommunication(r.KafkaCluster.Spec.ListenersConfig.InternalListeners),
"ZookeeperConnectString": zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath()),
"CruiseControlBootstrapServers": getInternalListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners, id, bConfig.HostnameOverride),
"CruiseControlBootstrapServers": getInternalListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners, id, bConfig.HostnameOverride, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled),
"StorageConfig": generateStorageConfig(bConfig.StorageConfigs),
"AdvertisedListenersConfig": generateAdvertisedListenerConfig(id, &r.KafkaCluster.Spec.ListenersConfig, bConfig.HostnameOverride),
"AdvertisedListenersConfig": generateAdvertisedListenerConfig(id, &r.KafkaCluster.Spec.ListenersConfig, bConfig.HostnameOverride, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled),
"SuperUsers": strings.Join(generateSuperUsers(superUsers), ";"),
"ServerKeystorePath": serverKeystorePath,
"ClientKeystorePath": clientKeystorePath,
Expand Down Expand Up @@ -127,15 +127,25 @@ func (r *Reconciler) configMap(id int32, brokerConfig *v1beta1.BrokerConfig, ser
return brokerConf
}

func generateAdvertisedListenerConfig(id int32, l *v1beta1.ListenersConfig, brokerHostname string) string {
func generateAdvertisedListenerConfig(id int32, l *v1beta1.ListenersConfig, brokerHostname, domain, namespace, crName string, headlessServiceEnabled bool) string {
advertisedListenerConfig := []string{}
for _, eListener := range l.ExternalListeners {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s:%d", strings.ToUpper(eListener.Name), brokerHostname, eListener.ExternalStartingPort+id))
}
for _, iListener := range l.InternalListeners {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id))
if iListener.UseExternalHostname && iListener.InternalStartingPort > 0 {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id))
} else {
if headlessServiceEnabled {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort))
} else {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort))
}
}
}
return fmt.Sprintf("advertised.listeners=%s\n", strings.Join(advertisedListenerConfig, ","))
}
Expand Down Expand Up @@ -190,17 +200,21 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, log logr.Logger)
"listeners=" + strings.Join(listenerConfig, ",") + "\n"
}

func getInternalListener(iListeners []v1beta1.InternalListenerConfig, id int32, brokerHostname string) string {

internalListener := ""

func getInternalListener(iListeners []v1beta1.InternalListenerConfig, id int32, brokerHostname, domain, namespace, crName string, headlessServiceEnabled bool) string {
for _, iListener := range iListeners {
if iListener.UsedForInnerBrokerCommunication {
internalListener = fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id)
if iListener.UseExternalHostname && iListener.InternalStartingPort > 0 {
return fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id)
} else {
if headlessServiceEnabled {
return fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort)
} else {
return fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort)
}
}
}
}

return internalListener
return ""
}

func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerConfig, serverPass, clientPass string, superUsers []string, log logr.Logger) string {
Expand Down
16 changes: 13 additions & 3 deletions pkg/sdk/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,19 @@ type ExternalListenerConfig struct {
// InternalListenerConfig defines the internal listener config for Kafka
type InternalListenerConfig struct {
CommonListenerSpec `json:",inline"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
InternalStartingPort int32 `json:"internalStartingPort"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
// This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters.
// The broker internal ports are computed as the sum of the internalStartingPort and the broker id.
// +optional
InternalStartingPort int32 `json:"internalStartingPort"`
// If set to true, Envoy will create routes for the internal listener ports.
// Required if `ingressForwarded` or `useExternalHostname` is set.
// +optional
IngressForwarded bool `json:"ingressForwarded,omitempty"`
// If set to true, the Kafka brokers will use the external hostname for inter broker communication.
// +optional
UseExternalHostname bool `json:"useExternalHostname,omitempty"`
}

// CommonListenerSpec defines the common building block for Listener type
Expand Down

0 comments on commit 28339e8

Please sign in to comment.