From 7724bca8f3fd176fcc7eb4ff0b88b69424bef3a3 Mon Sep 17 00:00:00 2001 From: David Simon Date: Tue, 22 Nov 2022 15:17:40 +0100 Subject: [PATCH 1/3] Add ability to specify additional container ports for kafka clusters (#897) --- api/v1beta1/kafkacluster_types.go | 2 + api/v1beta1/zz_generated.deepcopy.go | 5 +++ charts/kafka-operator/templates/crds.yaml | 38 +++++++++++++++++++ .../kafka.banzaicloud.io_kafkaclusters.yaml | 38 +++++++++++++++++++ pkg/resources/kafka/allBrokerService.go | 4 ++ pkg/resources/kafka/headlessService.go | 4 ++ pkg/resources/kafka/kafka.go | 13 +++++++ pkg/resources/kafka/pod.go | 4 ++ pkg/resources/kafka/service.go | 3 ++ 9 files changed, 111 insertions(+) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index c8de5f052..e8aa72bc8 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -60,6 +60,8 @@ const ( type KafkaClusterSpec struct { HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` ListenersConfig ListenersConfig `json:"listenersConfig"` + // Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint + AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"` // ZKAddresses specifies the ZooKeeper connection string // in the form hostname:port where host and port are the host and port of a ZooKeeper server. ZKAddresses []string `json:"zkAddresses"` diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index ee6805464..629fd328e 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -796,6 +796,11 @@ func (in *KafkaClusterList) DeepCopyObject() runtime.Object { func (in *KafkaClusterSpec) DeepCopyInto(out *KafkaClusterSpec) { *out = *in in.ListenersConfig.DeepCopyInto(&out.ListenersConfig) + if in.AdditionalPorts != nil { + in, out := &in.AdditionalPorts, &out.AdditionalPorts + *out = make([]v1.ContainerPort, len(*in)) + copy(*out, *in) + } if in.ZKAddresses != nil { in, out := &in.ZKAddresses, &out.ZKAddresses *out = make([]string, len(*in)) diff --git a/charts/kafka-operator/templates/crds.yaml b/charts/kafka-operator/templates/crds.yaml index fa801775d..6b4746ecf 100644 --- a/charts/kafka-operator/templates/crds.yaml +++ b/charts/kafka-operator/templates/crds.yaml @@ -214,6 +214,44 @@ spec: spec: description: KafkaClusterSpec defines the desired state of KafkaCluster properties: + additionalPorts: + description: 'Custom ports to expose in the container. Example use + case: a custom kafka distribution, that includes an integrated metrics + api endpoint' + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array alertManagerConfig: description: AlertManagerConfig defines configuration for alert manager properties: diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 816e953fe..8316fb02d 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -51,6 +51,44 @@ spec: spec: description: KafkaClusterSpec defines the desired state of KafkaCluster properties: + additionalPorts: + description: 'Custom ports to expose in the container. Example use + case: a custom kafka distribution, that includes an integrated metrics + api endpoint' + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array alertManagerConfig: description: AlertManagerConfig defines configuration for alert manager properties: diff --git a/pkg/resources/kafka/allBrokerService.go b/pkg/resources/kafka/allBrokerService.go index 2ba23dd0e..155e047f6 100644 --- a/pkg/resources/kafka/allBrokerService.go +++ b/pkg/resources/kafka/allBrokerService.go @@ -46,6 +46,10 @@ func (r *Reconciler) allBrokerService() runtime.Object { usedPorts = append(usedPorts, generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...) + // Append additional ports + usedPorts = append(usedPorts, + generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...) + return &corev1.Service{ ObjectMeta: templates.ObjectMetaWithAnnotations( fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()), diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go index 595fa6dc4..0f91140e6 100644 --- a/pkg/resources/kafka/headlessService.go +++ b/pkg/resources/kafka/headlessService.go @@ -39,6 +39,10 @@ func (r *Reconciler) headlessService() runtime.Object { usedPorts = append(usedPorts, generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...) + // Append additional ports + usedPorts = append(usedPorts, + generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...) + // prometheus metrics port for servicemonitor usedPorts = append(usedPorts, corev1.ServicePort{ Name: "metrics", diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 051848125..c1aca4e70 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -1346,3 +1346,16 @@ func generateServicePortForEListeners(listeners []v1beta1.ExternalListenerConfig } return usedPorts } + +func generateServicePortForAdditionalPorts(containerPorts []corev1.ContainerPort) []corev1.ServicePort { + var usedPorts []corev1.ServicePort + for _, containerPort := range containerPorts { + usedPorts = append(usedPorts, corev1.ServicePort{ + Name: containerPort.Name, + Protocol: containerPort.Protocol, + Port: containerPort.ContainerPort, + TargetPort: intstr.FromInt(int(containerPort.ContainerPort)), + }) + } + return usedPorts +} diff --git a/pkg/resources/kafka/pod.go b/pkg/resources/kafka/pod.go index 4a80dce6a..d9f48217c 100644 --- a/pkg/resources/kafka/pod.go +++ b/pkg/resources/kafka/pod.go @@ -63,6 +63,10 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co }) } + for _, additionalPort := range r.KafkaCluster.Spec.AdditionalPorts { + kafkaBrokerContainerPorts = append(kafkaBrokerContainerPorts, additionalPort) + } + for _, envVar := range r.KafkaCluster.Spec.Envs { if envVar.Name == "JMX_PORT" { port, err := strconv.ParseInt(envVar.Value, 10, 32) diff --git a/pkg/resources/kafka/service.go b/pkg/resources/kafka/service.go index 3166b9974..fd53e5dc1 100644 --- a/pkg/resources/kafka/service.go +++ b/pkg/resources/kafka/service.go @@ -35,6 +35,9 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object { // Append external listener ports usedPorts = append(usedPorts, generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...) + // Append additional ports + usedPorts = append(usedPorts, + generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...) usedPorts = append(usedPorts, corev1.ServicePort{ Name: "metrics", From b0b76c876c4637a3d45ea3e335db7d70d699e715 Mon Sep 17 00:00:00 2001 From: David Simon Date: Wed, 23 Nov 2022 10:28:41 +0100 Subject: [PATCH 2/3] fix lint issue --- pkg/resources/kafka/pod.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/resources/kafka/pod.go b/pkg/resources/kafka/pod.go index d9f48217c..227cfdbc6 100644 --- a/pkg/resources/kafka/pod.go +++ b/pkg/resources/kafka/pod.go @@ -63,9 +63,7 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co }) } - for _, additionalPort := range r.KafkaCluster.Spec.AdditionalPorts { - kafkaBrokerContainerPorts = append(kafkaBrokerContainerPorts, additionalPort) - } + kafkaBrokerContainerPorts = append(kafkaBrokerContainerPorts, r.KafkaCluster.Spec.AdditionalPorts...) for _, envVar := range r.KafkaCluster.Spec.Envs { if envVar.Name == "JMX_PORT" { From 1358e82cb8750e271313ab511632d848c147d0a3 Mon Sep 17 00:00:00 2001 From: David Simon Date: Thu, 24 Nov 2022 13:47:14 +0100 Subject: [PATCH 3/3] add example --- config/samples/banzaicloud_v1beta1_kafkacluster.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/config/samples/banzaicloud_v1beta1_kafkacluster.yaml b/config/samples/banzaicloud_v1beta1_kafkacluster.yaml index 07f4362ef..ef3af5e6e 100644 --- a/config/samples/banzaicloud_v1beta1_kafkacluster.yaml +++ b/config/samples/banzaicloud_v1beta1_kafkacluster.yaml @@ -8,6 +8,11 @@ spec: # Specify if the cluster should use headlessService for Kafka or individual services # using service/broker may come in handy in case of service mesh headlessServiceEnabled: true + # Specify additional broker ports to expose through services + additionalPorts: + - name: "remote-debug" + containerPort: 5005 + protocol: "TCP" # Specify the usable ingress controller, only envoy and istioingress supported can be left blank ingressController: "envoy" # Specify the zookeeper addresses where the Kafka should store it's metadata