Skip to content

Commit

Permalink
[INTERNAL] Allow Kafka to use External DNS for inter-broker protocol (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
amuraru committed Jul 28, 2021
1 parent 77e6bce commit 3dc18a8
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
34 changes: 26 additions & 8 deletions pkg/k8sutil/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *v1bet
return nil
}

func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) {
func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster, externalListenerStatus map[string]v1beta1.ListenerStatusList) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) {
intListenerStatuses := make(map[string]v1beta1.ListenerStatusList, len(kafkaCluster.Spec.ListenersConfig.InternalListeners))
controllerIntListenerStatuses := make(map[string]v1beta1.ListenerStatusList)

Expand All @@ -326,13 +326,22 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str

// add addresses per broker
for _, broker := range kafkaCluster.Spec.Brokers {
var address string
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
var address = ""
if iListener.ExternalListenerForHostname != "" && iListener.InternalStartingPort > 0 {
if eListenerStatus, ok := externalListenerStatus[iListener.ExternalListenerForHostname]; ok {
address = fmt.Sprintf("%s:%d", getHostnameForBrokerId(eListenerStatus, broker.Id),
iListener.InternalStartingPort+broker.Id)
}
}

if address == "" {
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
}
}
listenerStatusList = append(listenerStatusList, v1beta1.ListenerStatus{
Name: fmt.Sprintf("broker-%d", broker.Id),
Expand All @@ -349,3 +358,12 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str

return intListenerStatuses, controllerIntListenerStatuses
}

func getHostnameForBrokerId(eListenerStatusList v1beta1.ListenerStatusList, brokerId int32) string {
for _, eListenerStatus := range eListenerStatusList {
if eListenerStatus.Name == fmt.Sprintf("broker-%d", brokerId) {
return strings.Split(eListenerStatus.Address, ":")[0]
}
}
return ""
}
2 changes: 1 addition & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return errors.WrapIf(err, "could not update status for external listeners")
}
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster)
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster, extListenerStatuses)
err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, log, intListenerStatuses, extListenerStatuses)
if err != nil {
return errors.WrapIf(err, "failed to update listener statuses")
Expand Down
8 changes: 8 additions & 0 deletions pkg/sdk/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,14 @@ type InternalListenerConfig struct {
CommonListenerSpec `json:",inline"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
// This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters.
// The broker internal ports are computed as the sum of the internalStartingPort and the broker id.
// +optional
InternalStartingPort int32 `json:"internalStartingPort"`
// If set to a non-empty value, the Kafka brokers will use the external hostname for inter broker communication.
// The internal lister will will share the same hostname with the external listener that is referenced here.
// +optional
ExternalListenerForHostname string `json:"externalListenerForHostname,omitempty"`
}

// CommonListenerSpec defines the common building block for Listener type
Expand Down

0 comments on commit 3dc18a8

Please sign in to comment.