Skip to content

Commit

Permalink
[INTERNAL] Allow Kafka to use External DNS for inter-broker protocol (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
amuraru committed Jun 29, 2022
1 parent d9e323f commit 62dc472
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
8 changes: 8 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,14 @@ type InternalListenerConfig struct {
CommonListenerSpec `json:",inline"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
// This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters.
// The broker internal ports are computed as the sum of the internalStartingPort and the broker id.
// +optional
InternalStartingPort int32 `json:"internalStartingPort"`
// If set to a non-empty value, the Kafka brokers will use the external hostname for inter broker communication.
// The internal lister will will share the same hostname with the external listener that is referenced here.
// +optional
ExternalListenerForHostname string `json:"externalListenerForHostname,omitempty"`
}

// CommonListenerSpec defines the common building block for Listener type
Expand Down
35 changes: 27 additions & 8 deletions pkg/k8sutil/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/banzaicloud/koperator/api/v1beta1"
banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
clientutil "github.com/banzaicloud/koperator/pkg/util/client"
)
Expand Down Expand Up @@ -306,7 +307,7 @@ func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *banza
return nil
}

func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluster) (map[string]banzaicloudv1beta1.ListenerStatusList, map[string]banzaicloudv1beta1.ListenerStatusList) {
func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluster, externalListenerStatus map[string]banzaicloudv1beta1.ListenerStatusList) (map[string]banzaicloudv1beta1.ListenerStatusList, map[string]banzaicloudv1beta1.ListenerStatusList) {
intListenerStatuses := make(map[string]banzaicloudv1beta1.ListenerStatusList, len(kafkaCluster.Spec.ListenersConfig.InternalListeners))
controllerIntListenerStatuses := make(map[string]banzaicloudv1beta1.ListenerStatusList)

Expand All @@ -326,13 +327,22 @@ func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluste

// add addresses per broker
for _, broker := range kafkaCluster.Spec.Brokers {
var address string
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
var address = ""
if iListener.ExternalListenerForHostname != "" && iListener.InternalStartingPort > 0 {
if eListenerStatus, ok := externalListenerStatus[iListener.ExternalListenerForHostname]; ok {
address = fmt.Sprintf("%s:%d", getHostnameForBrokerId(eListenerStatus, broker.Id),
iListener.InternalStartingPort+broker.Id)
}
}

if address == "" {
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
}
}
listenerStatusList = append(listenerStatusList, banzaicloudv1beta1.ListenerStatus{
Name: fmt.Sprintf("broker-%d", broker.Id),
Expand All @@ -349,3 +359,12 @@ func CreateInternalListenerStatuses(kafkaCluster *banzaicloudv1beta1.KafkaCluste

return intListenerStatuses, controllerIntListenerStatuses
}

func getHostnameForBrokerId(eListenerStatusList v1beta1.ListenerStatusList, brokerId int32) string {
for _, eListenerStatus := range eListenerStatusList {
if eListenerStatus.Name == fmt.Sprintf("broker-%d", brokerId) {
return strings.Split(eListenerStatus.Address, ":")[0]
}
}
return ""
}
2 changes: 1 addition & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return errors.WrapIf(err, "could not update status for external listeners")
}
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster)
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster, extListenerStatuses)
err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, intListenerStatuses, extListenerStatuses)
if err != nil {
return errors.WrapIf(err, "failed to update listener statuses")
Expand Down

0 comments on commit 62dc472

Please sign in to comment.