Skip to content

Commit

Permalink
[INTERNAL] Allow external listeners to be used for inner communication (
Browse files Browse the repository at this point in the history
  • Loading branch information
amuraru committed Oct 29, 2021
1 parent 7783695 commit d152a61
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 44 deletions.
15 changes: 8 additions & 7 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,8 @@ type IngressConfig struct {

// InternalListenerConfig defines the internal listener config for Kafka
type InternalListenerConfig struct {
CommonListenerSpec `json:",inline"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
CommonListenerSpec `json:",inline"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
// This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters.
// The broker internal ports are computed as the sum of the internalStartingPort and the broker id.
// +optional
Expand All @@ -440,10 +439,12 @@ type InternalListenerConfig struct {
// CommonListenerSpec defines the common building block for Listener type
type CommonListenerSpec struct {
// +kubebuilder:validation:Enum=ssl;plaintext;sasl_ssl;sasl_plaintext
Type SecurityProtocol `json:"type"`
// +kubebuilder:validation:Pattern=^[a-z0-9\-]+
Name string `json:"name"`
ContainerPort int32 `json:"containerPort"`
Type SecurityProtocol `json:"type"`
Name string `json:"name"`
ContainerPort int32 `json:"containerPort"`
// At least one of the listeners should have this flag enabled
// +optional
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
}

// ListenerStatuses holds information about the statuses of the configured listeners.
Expand Down
18 changes: 9 additions & 9 deletions controllers/tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
InternalListeners: []v1beta1.InternalListenerConfig{
{
CommonListenerSpec: v1beta1.CommonListenerSpec{
Type: "plaintext",
Name: "internal",
ContainerPort: 29092,
Type: "plaintext",
Name: "internal",
ContainerPort: 29092,
UsedForInnerBrokerCommunication: true,
},
UsedForInnerBrokerCommunication: true,
},
{
CommonListenerSpec: v1beta1.CommonListenerSpec{
Type: "plaintext",
Name: "controller",
ContainerPort: 29093,
Type: "plaintext",
Name: "controller",
ContainerPort: 29093,
UsedForInnerBrokerCommunication: false,
},
UsedForInnerBrokerCommunication: false,
UsedForControllerCommunication: true,
UsedForControllerCommunication: true,
},
},
},
Expand Down
8 changes: 4 additions & 4 deletions internal/alertmanager/currentalert/current_alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ func TestGetCurrentAlerts(t *testing.T) {
ListenersConfig: v1beta1.ListenersConfig{
InternalListeners: []v1beta1.InternalListenerConfig{
{CommonListenerSpec: v1beta1.CommonListenerSpec{
Type: "plaintext",
Name: "plaintext",
ContainerPort: 29092},
UsedForInnerBrokerCommunication: true,
Type: "plaintext",
Name: "plaintext",
ContainerPort: 29092,
UsedForInnerBrokerCommunication: true},
},
},
},
Expand Down
7 changes: 7 additions & 0 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, log logr.Logger)
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, iListener.ContainerPort))
}
for _, eListener := range l.ExternalListeners {
if eListener.UsedForInnerBrokerCommunication {
if interBrokerListenerName == "" {
interBrokerListenerName = strings.ToUpper(eListener.Name)
} else {
log.Error(errors.New("inter broker listener name already set"), "config error")
}
}
UpperedListenerType := eListener.Type.ToUpperString()
UpperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType))
Expand Down
8 changes: 4 additions & 4 deletions pkg/resources/kafka/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,11 @@ zookeeper.connect=example.zk:2181/`,
ListenersConfig: v1beta1.ListenersConfig{
InternalListeners: []v1beta1.InternalListenerConfig{{
CommonListenerSpec: v1beta1.CommonListenerSpec{
Type: v1beta1.SecurityProtocol(test.listenerType),
Name: "internal",
ContainerPort: 9092,
Type: v1beta1.SecurityProtocol(test.listenerType),
Name: "internal",
ContainerPort: 9092,
UsedForInnerBrokerCommunication: true,
},
UsedForInnerBrokerCommunication: true,
},
},
},
Expand Down
19 changes: 13 additions & 6 deletions pkg/util/client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@ import (
)

func UseSSL(cluster *v1beta1.KafkaCluster) bool {
return cluster.Spec.ListenersConfig.InternalListeners[determineInternalListenerForInnerCom(cluster.Spec.ListenersConfig.InternalListeners)].Type.IsSSL()
return false
}

func determineInternalListenerForInnerCom(internalListeners []v1beta1.InternalListenerConfig) int {
for id, val := range internalListeners {
func getContainerPortForInnerCom(internalListeners []v1beta1.InternalListenerConfig, extListeners []v1beta1.ExternalListenerConfig) int32 {
for _, val := range internalListeners {
if val.UsedForInnerBrokerCommunication {
return id
return val.ContainerPort
}
}

for _, val := range extListeners {
if val.UsedForInnerBrokerCommunication {
return val.ContainerPort
}
}

return 0
}

Expand All @@ -50,6 +57,6 @@ func GenerateKafkaAddressWithoutPort(cluster *v1beta1.KafkaCluster) string {
}

func GenerateKafkaAddress(cluster *v1beta1.KafkaCluster) string {
return fmt.Sprintf("%s:%d", GenerateKafkaAddressWithoutPort(cluster),
cluster.Spec.ListenersConfig.InternalListeners[determineInternalListenerForInnerCom(cluster.Spec.ListenersConfig.InternalListeners)].ContainerPort)
return fmt.Sprintf("%s:%d", GenerateKafkaAddressWithoutPort(cluster), getContainerPortForInnerCom(
cluster.Spec.ListenersConfig.InternalListeners, cluster.Spec.ListenersConfig.ExternalListeners))
}
5 changes: 4 additions & 1 deletion pkg/util/client/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func TestGenerateKafkaAddress(t *testing.T) {
ListenersConfig: v1beta1.ListenersConfig{
InternalListeners: []v1beta1.InternalListenerConfig{
{
CommonListenerSpec: v1beta1.CommonListenerSpec{ContainerPort: 80},
CommonListenerSpec: v1beta1.CommonListenerSpec{
ContainerPort: 80,
UsedForInnerBrokerCommunication: true,
},
},
},
},
Expand Down
16 changes: 12 additions & 4 deletions pkg/util/kafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,26 @@ func GetBootstrapServersService(cluster *v1beta1.KafkaCluster) (string, error) {

// GetBrokerContainerPort return broker container port
func GetBrokerContainerPort(cluster *v1beta1.KafkaCluster) (int32, error) {
var listener v1beta1.InternalListenerConfig
containerPort := int32(0)

for _, lc := range cluster.Spec.ListenersConfig.InternalListeners {
if lc.UsedForInnerBrokerCommunication && !lc.UsedForControllerCommunication {
listener = lc
containerPort = lc.ContainerPort
break
}
}
if listener.Name == "" {

for _, lc := range cluster.Spec.ListenersConfig.ExternalListeners {
if lc.UsedForInnerBrokerCommunication {
containerPort = lc.ContainerPort
break
}
}

if containerPort <= 0 {
return -1, errors.New("no suitable listener found for using as Kafka bootstrap server configuration")
}
return listener.ContainerPort, nil
return containerPort, nil
}

func getBootstrapServers(cluster *v1beta1.KafkaCluster, useService bool) (string, error) {
Expand Down
18 changes: 9 additions & 9 deletions pkg/util/kafka/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,20 @@ var MinimalKafkaCluster = &v1beta1.KafkaCluster{
InternalListeners: []v1beta1.InternalListenerConfig{
{
CommonListenerSpec: v1beta1.CommonListenerSpec{
Type: "plaintext",
Name: "internal",
ContainerPort: 29092,
Type: "plaintext",
Name: "internal",
ContainerPort: 29092,
UsedForInnerBrokerCommunication: true,
},
UsedForInnerBrokerCommunication: true,
},
{
CommonListenerSpec: v1beta1.CommonListenerSpec{
Type: "plaintext",
Name: "controller",
ContainerPort: 29093,
Type: "plaintext",
Name: "controller",
ContainerPort: 29093,
UsedForInnerBrokerCommunication: false,
},
UsedForInnerBrokerCommunication: false,
UsedForControllerCommunication: true,
UsedForControllerCommunication: true,
},
},
},
Expand Down

0 comments on commit d152a61

Please sign in to comment.