From 1839912f2fa9ffaf660a5f3bbfef5dd4ddf159e8 Mon Sep 17 00:00:00 2001 From: David Simon Date: Tue, 22 Nov 2022 15:17:40 +0100 Subject: [PATCH] Add ability to specify additional container ports for kafka clusters (#897) --- api/v1beta1/kafkacluster_types.go | 2 + api/v1beta1/zz_generated.deepcopy.go | 5 +++ charts/kafka-operator/templates/crds.yaml | 38 +++++++++++++++++++ .../kafka.banzaicloud.io_kafkaclusters.yaml | 38 +++++++++++++++++++ pkg/resources/kafka/allBrokerService.go | 4 ++ pkg/resources/kafka/headlessService.go | 4 ++ pkg/resources/kafka/kafka.go | 13 +++++++ pkg/resources/kafka/pod.go | 4 ++ pkg/resources/kafka/service.go | 3 ++ 9 files changed, 111 insertions(+) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index c8de5f0524..e8aa72bc80 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -60,6 +60,8 @@ const ( type KafkaClusterSpec struct { HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` ListenersConfig ListenersConfig `json:"listenersConfig"` + // Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint + AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"` // ZKAddresses specifies the ZooKeeper connection string // in the form hostname:port where host and port are the host and port of a ZooKeeper server. ZKAddresses []string `json:"zkAddresses"` diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index ee68054646..629fd328ea 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -796,6 +796,11 @@ func (in *KafkaClusterList) DeepCopyObject() runtime.Object { func (in *KafkaClusterSpec) DeepCopyInto(out *KafkaClusterSpec) { *out = *in in.ListenersConfig.DeepCopyInto(&out.ListenersConfig) + if in.AdditionalPorts != nil { + in, out := &in.AdditionalPorts, &out.AdditionalPorts + *out = make([]v1.ContainerPort, len(*in)) + copy(*out, *in) + } if in.ZKAddresses != nil { in, out := &in.ZKAddresses, &out.ZKAddresses *out = make([]string, len(*in)) diff --git a/charts/kafka-operator/templates/crds.yaml b/charts/kafka-operator/templates/crds.yaml index fa801775df..6b4746ecfe 100644 --- a/charts/kafka-operator/templates/crds.yaml +++ b/charts/kafka-operator/templates/crds.yaml @@ -214,6 +214,44 @@ spec: spec: description: KafkaClusterSpec defines the desired state of KafkaCluster properties: + additionalPorts: + description: 'Custom ports to expose in the container. Example use + case: a custom kafka distribution, that includes an integrated metrics + api endpoint' + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array alertManagerConfig: description: AlertManagerConfig defines configuration for alert manager properties: diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 816e953fe9..8316fb02de 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -51,6 +51,44 @@ spec: spec: description: KafkaClusterSpec defines the desired state of KafkaCluster properties: + additionalPorts: + description: 'Custom ports to expose in the container. Example use + case: a custom kafka distribution, that includes an integrated metrics + api endpoint' + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array alertManagerConfig: description: AlertManagerConfig defines configuration for alert manager properties: diff --git a/pkg/resources/kafka/allBrokerService.go b/pkg/resources/kafka/allBrokerService.go index 2ba23dd0e5..155e047f6b 100644 --- a/pkg/resources/kafka/allBrokerService.go +++ b/pkg/resources/kafka/allBrokerService.go @@ -46,6 +46,10 @@ func (r *Reconciler) allBrokerService() runtime.Object { usedPorts = append(usedPorts, generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...) + // Append additional ports + usedPorts = append(usedPorts, + generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...) + return &corev1.Service{ ObjectMeta: templates.ObjectMetaWithAnnotations( fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()), diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go index 595fa6dc49..0f91140e6c 100644 --- a/pkg/resources/kafka/headlessService.go +++ b/pkg/resources/kafka/headlessService.go @@ -39,6 +39,10 @@ func (r *Reconciler) headlessService() runtime.Object { usedPorts = append(usedPorts, generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...) + // Append additional ports + usedPorts = append(usedPorts, + generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...) + // prometheus metrics port for servicemonitor usedPorts = append(usedPorts, corev1.ServicePort{ Name: "metrics", diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 0518481259..c1aca4e705 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -1346,3 +1346,16 @@ func generateServicePortForEListeners(listeners []v1beta1.ExternalListenerConfig } return usedPorts } + +func generateServicePortForAdditionalPorts(containerPorts []corev1.ContainerPort) []corev1.ServicePort { + var usedPorts []corev1.ServicePort + for _, containerPort := range containerPorts { + usedPorts = append(usedPorts, corev1.ServicePort{ + Name: containerPort.Name, + Protocol: containerPort.Protocol, + Port: containerPort.ContainerPort, + TargetPort: intstr.FromInt(int(containerPort.ContainerPort)), + }) + } + return usedPorts +} diff --git a/pkg/resources/kafka/pod.go b/pkg/resources/kafka/pod.go index 4a80dce6a9..d9f48217cc 100644 --- a/pkg/resources/kafka/pod.go +++ b/pkg/resources/kafka/pod.go @@ -63,6 +63,10 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co }) } + for _, additionalPort := range r.KafkaCluster.Spec.AdditionalPorts { + kafkaBrokerContainerPorts = append(kafkaBrokerContainerPorts, additionalPort) + } + for _, envVar := range r.KafkaCluster.Spec.Envs { if envVar.Name == "JMX_PORT" { port, err := strconv.ParseInt(envVar.Value, 10, 32) diff --git a/pkg/resources/kafka/service.go b/pkg/resources/kafka/service.go index 3166b99742..fd53e5dc1e 100644 --- a/pkg/resources/kafka/service.go +++ b/pkg/resources/kafka/service.go @@ -35,6 +35,9 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object { // Append external listener ports usedPorts = append(usedPorts, generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...) + // Append additional ports + usedPorts = append(usedPorts, + generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...) usedPorts = append(usedPorts, corev1.ServicePort{ Name: "metrics",