diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 2abfe7640..f47228dbf 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -471,6 +471,14 @@ type InternalListenerConfig struct { CommonListenerSpec `json:",inline"` UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"` UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"` + // This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters. + // The broker internal ports are computed as the sum of the internalStartingPort and the broker id. + // +optional + InternalStartingPort int32 `json:"internalStartingPort"` + // If set to a non-empty value, the Kafka brokers will use the external hostname for inter broker communication. + // The internal lister will will share the same hostname with the external listener that is referenced here. + // +optional + ExternalListenerForHostname string `json:"externalListenerForHostname,omitempty"` } // CommonListenerSpec defines the common building block for Listener type diff --git a/pkg/k8sutil/status.go b/pkg/k8sutil/status.go index a7abf4910..562558482 100644 --- a/pkg/k8sutil/status.go +++ b/pkg/k8sutil/status.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "github.com/banzaicloud/koperator/api/v1beta1" banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" clientutil "github.com/banzaicloud/koperator/pkg/util/client" ) @@ -306,7 +307,7 @@ func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *banza return nil } -func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluster) (map[string]banzaicloudv1beta1.ListenerStatusList, map[string]banzaicloudv1beta1.ListenerStatusList) { +func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluster, externalListenerStatus map[string]banzaicloudv1beta1.ListenerStatusList) (map[string]banzaicloudv1beta1.ListenerStatusList, map[string]banzaicloudv1beta1.ListenerStatusList) { intListenerStatuses := make(map[string]banzaicloudv1beta1.ListenerStatusList, len(kafkaCluster.Spec.ListenersConfig.InternalListeners)) controllerIntListenerStatuses := make(map[string]banzaicloudv1beta1.ListenerStatusList) @@ -326,13 +327,22 @@ func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluste // add addresses per broker for _, broker := range kafkaCluster.Spec.Brokers { - var address string - if kafkaCluster.Spec.HeadlessServiceEnabled { - address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name, - kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) - } else { - address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace, - kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) + var address = "" + if iListener.ExternalListenerForHostname != "" && iListener.InternalStartingPort > 0 { + if eListenerStatus, ok := externalListenerStatus[iListener.ExternalListenerForHostname]; ok { + address = fmt.Sprintf("%s:%d", getHostnameForBrokerId(eListenerStatus, broker.Id), + iListener.InternalStartingPort+broker.Id) + } + } + + if address == "" { + if kafkaCluster.Spec.HeadlessServiceEnabled { + address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name, + kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) + } else { + address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace, + kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort) + } } listenerStatusList = append(listenerStatusList, banzaicloudv1beta1.ListenerStatus{ Name: fmt.Sprintf("broker-%d", broker.Id), @@ -349,3 +359,12 @@ func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluste return intListenerStatuses, controllerIntListenerStatuses } + +func getHostnameForBrokerId(eListenerStatusList v1beta1.ListenerStatusList, brokerId int32) string { + for _, eListenerStatus := range eListenerStatusList { + if eListenerStatus.Name == fmt.Sprintf("broker-%d", brokerId) { + return strings.Split(eListenerStatus.Address, ":")[0] + } + } + return "" +} diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 89acafcbd..df9ca6918 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -185,7 +185,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return errors.WrapIf(err, "could not update status for external listeners") } - intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster) + intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster, extListenerStatuses) err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, intListenerStatuses, extListenerStatuses) if err != nil { return errors.WrapIf(err, "failed to update listener statuses")