Skip to content

Commit

Permalink
Add ability to specify additional container ports for kafka clusters (b…
Browse files Browse the repository at this point in the history
  • Loading branch information
david-simon committed Nov 23, 2022
1 parent c51f8a7 commit 1839912
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 0 deletions.
2 changes: 2 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
type KafkaClusterSpec struct {
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
// in the form hostname:port where host and port are the host and port of a ZooKeeper server.
ZKAddresses []string `json:"zkAddresses"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,44 @@ spec:
spec:
description: KafkaClusterSpec defines the desired state of KafkaCluster
properties:
additionalPorts:
description: 'Custom ports to expose in the container. Example use
case: a custom kafka distribution, that includes an integrated metrics
api endpoint'
items:
description: ContainerPort represents a network port in a single
container.
properties:
containerPort:
description: Number of port to expose on the pod's IP address.
This must be a valid port number, 0 < x < 65536.
format: int32
type: integer
hostIP:
description: What host IP to bind the external port to.
type: string
hostPort:
description: Number of port to expose on the host. If specified,
this must be a valid port number, 0 < x < 65536. If HostNetwork
is specified, this must match ContainerPort. Most containers
do not need this.
format: int32
type: integer
name:
description: If specified, this must be an IANA_SVC_NAME and
unique within the pod. Each named port in a pod must have
a unique name. Name for the port that can be referred to by
services.
type: string
protocol:
default: TCP
description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults
to "TCP".
type: string
required:
- containerPort
type: object
type: array
alertManagerConfig:
description: AlertManagerConfig defines configuration for alert manager
properties:
Expand Down
38 changes: 38 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,44 @@ spec:
spec:
description: KafkaClusterSpec defines the desired state of KafkaCluster
properties:
additionalPorts:
description: 'Custom ports to expose in the container. Example use
case: a custom kafka distribution, that includes an integrated metrics
api endpoint'
items:
description: ContainerPort represents a network port in a single
container.
properties:
containerPort:
description: Number of port to expose on the pod's IP address.
This must be a valid port number, 0 < x < 65536.
format: int32
type: integer
hostIP:
description: What host IP to bind the external port to.
type: string
hostPort:
description: Number of port to expose on the host. If specified,
this must be a valid port number, 0 < x < 65536. If HostNetwork
is specified, this must match ContainerPort. Most containers
do not need this.
format: int32
type: integer
name:
description: If specified, this must be an IANA_SVC_NAME and
unique within the pod. Each named port in a pod must have
a unique name. Name for the port that can be referred to by
services.
type: string
protocol:
default: TCP
description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults
to "TCP".
type: string
required:
- containerPort
type: object
type: array
alertManagerConfig:
description: AlertManagerConfig defines configuration for alert manager
properties:
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/kafka/allBrokerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (r *Reconciler) allBrokerService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...)

// Append additional ports
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

return &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(
fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/kafka/headlessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (r *Reconciler) headlessService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...)

// Append additional ports
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

// prometheus metrics port for servicemonitor
usedPorts = append(usedPorts, corev1.ServicePort{
Name: "metrics",
Expand Down
13 changes: 13 additions & 0 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,3 +1346,16 @@ func generateServicePortForEListeners(listeners []v1beta1.ExternalListenerConfig
}
return usedPorts
}

func generateServicePortForAdditionalPorts(containerPorts []corev1.ContainerPort) []corev1.ServicePort {
var usedPorts []corev1.ServicePort
for _, containerPort := range containerPorts {
usedPorts = append(usedPorts, corev1.ServicePort{
Name: containerPort.Name,
Protocol: containerPort.Protocol,
Port: containerPort.ContainerPort,
TargetPort: intstr.FromInt(int(containerPort.ContainerPort)),
})
}
return usedPorts
}
4 changes: 4 additions & 0 deletions pkg/resources/kafka/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co
})
}

for _, additionalPort := range r.KafkaCluster.Spec.AdditionalPorts {
kafkaBrokerContainerPorts = append(kafkaBrokerContainerPorts, additionalPort)
}

for _, envVar := range r.KafkaCluster.Spec.Envs {
if envVar.Name == "JMX_PORT" {
port, err := strconv.ParseInt(envVar.Value, 10, 32)
Expand Down
3 changes: 3 additions & 0 deletions pkg/resources/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
// Append external listener ports
usedPorts = append(usedPorts,
generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...)
// Append additional ports
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

usedPorts = append(usedPorts, corev1.ServicePort{
Name: "metrics",
Expand Down

0 comments on commit 1839912

Please sign in to comment.