diff --git a/pkg/k8sutil/status.go b/pkg/k8sutil/status.go index 028081d0ae..965aff1d3b 100644 --- a/pkg/k8sutil/status.go +++ b/pkg/k8sutil/status.go @@ -306,7 +306,7 @@ func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *v1bet return nil } -func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) { +func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster, externalListenerStatus map[string]v1beta1.ListenerStatusList) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) { intListenerStatuses := make(map[string]v1beta1.ListenerStatusList, len(kafkaCluster.Spec.ListenersConfig.InternalListeners)) controllerIntListenerStatuses := make(map[string]v1beta1.ListenerStatusList) @@ -326,13 +326,22 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str // add addresses per broker for _, broker := range kafkaCluster.Spec.Brokers { - var address string - if kafkaCluster.Spec.HeadlessServiceEnabled { - address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name, - kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) - } else { - address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace, - kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) + var address = "" + if iListener.ExternalListenerForHostname != "" && iListener.InternalStartingPort > 0 { + if eListenerStatus, ok := externalListenerStatus[iListener.ExternalListenerForHostname]; ok { + address = fmt.Sprintf("%s:%d", getHostnameForBrokerId(eListenerStatus, broker.Id), + iListener.InternalStartingPort+broker.Id) + } + } + + if address == "" { + if kafkaCluster.Spec.HeadlessServiceEnabled { + address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name, + kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) + } else { + address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace, + kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) + } } listenerStatusList = append(listenerStatusList, v1beta1.ListenerStatus{ Name: fmt.Sprintf("broker-%d", broker.Id), @@ -349,3 +358,12 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str return intListenerStatuses, controllerIntListenerStatuses } + +func getHostnameForBrokerId(eListenerStatusList v1beta1.ListenerStatusList, brokerId int32) string { + for _, eListenerStatus := range eListenerStatusList { + if eListenerStatus.Name == fmt.Sprintf("broker-%d", brokerId) { + return strings.Split(eListenerStatus.Address, ":")[0] + } + } + return "" +} diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 8dee2c45cf..9fb3b7595c 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -184,7 +184,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return errors.WrapIf(err, "could not update status for external listeners") } - intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster) + intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster, extListenerStatuses) err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, log, intListenerStatuses, extListenerStatuses) if err != nil { return errors.WrapIf(err, "failed to update listener statuses") diff --git a/pkg/sdk/v1beta1/kafkacluster_types.go b/pkg/sdk/v1beta1/kafkacluster_types.go index 511638810b..c1ee53d4b5 100644 --- a/pkg/sdk/v1beta1/kafkacluster_types.go +++ b/pkg/sdk/v1beta1/kafkacluster_types.go @@ -412,6 +412,14 @@ type InternalListenerConfig struct { CommonListenerSpec `json:",inline"` UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"` UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"` + // This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters. + // The broker internal ports are computed as the sum of the internalStartingPort and the broker id. + // +optional + InternalStartingPort int32 `json:"internalStartingPort"` + // If set to a non-empty value, the Kafka brokers will use the external hostname for inter broker communication. + // The internal lister will will share the same hostname with the external listener that is referenced here. + // +optional + ExternalListenerForHostname string `json:"externalListenerForHostname,omitempty"` } // CommonListenerSpec defines the common building block for Listener type