Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to specify additional container ports for kafka clusters (#897) #898

Merged
merged 3 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
type KafkaClusterSpec struct {
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
// in the form hostname:port where host and port are the host and port of a ZooKeeper server.
ZKAddresses []string `json:"zkAddresses"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,44 @@ spec:
spec:
description: KafkaClusterSpec defines the desired state of KafkaCluster
properties:
additionalPorts:
description: 'Custom ports to expose in the container. Example use
case: a custom kafka distribution, that includes an integrated metrics
api endpoint'
items:
description: ContainerPort represents a network port in a single
container.
properties:
containerPort:
description: Number of port to expose on the pod's IP address.
This must be a valid port number, 0 < x < 65536.
format: int32
type: integer
hostIP:
description: What host IP to bind the external port to.
type: string
hostPort:
description: Number of port to expose on the host. If specified,
this must be a valid port number, 0 < x < 65536. If HostNetwork
is specified, this must match ContainerPort. Most containers
do not need this.
format: int32
type: integer
name:
description: If specified, this must be an IANA_SVC_NAME and
unique within the pod. Each named port in a pod must have
a unique name. Name for the port that can be referred to by
services.
type: string
protocol:
default: TCP
description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults
to "TCP".
type: string
required:
- containerPort
type: object
type: array
alertManagerConfig:
description: AlertManagerConfig defines configuration for alert manager
properties:
Expand Down
38 changes: 38 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,44 @@ spec:
spec:
description: KafkaClusterSpec defines the desired state of KafkaCluster
properties:
additionalPorts:
description: 'Custom ports to expose in the container. Example use
case: a custom kafka distribution, that includes an integrated metrics
api endpoint'
items:
description: ContainerPort represents a network port in a single
container.
properties:
containerPort:
description: Number of port to expose on the pod's IP address.
This must be a valid port number, 0 < x < 65536.
format: int32
type: integer
hostIP:
description: What host IP to bind the external port to.
type: string
hostPort:
description: Number of port to expose on the host. If specified,
this must be a valid port number, 0 < x < 65536. If HostNetwork
is specified, this must match ContainerPort. Most containers
do not need this.
format: int32
type: integer
name:
description: If specified, this must be an IANA_SVC_NAME and
unique within the pod. Each named port in a pod must have
a unique name. Name for the port that can be referred to by
services.
type: string
protocol:
default: TCP
description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults
to "TCP".
type: string
required:
- containerPort
type: object
type: array
alertManagerConfig:
description: AlertManagerConfig defines configuration for alert manager
properties:
Expand Down
5 changes: 5 additions & 0 deletions config/samples/banzaicloud_v1beta1_kafkacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ spec:
# Specify if the cluster should use headlessService for Kafka or individual services
# using service/broker may come in handy in case of service mesh
headlessServiceEnabled: true
# Specify additional broker ports to expose through services
additionalPorts:
- name: "remote-debug"
containerPort: 5005
protocol: "TCP"
  # Specify the ingress controller to use; only envoy and istioingress are supported (can be left blank)
ingressController: "envoy"
  # Specify the zookeeper addresses where the Kafka should store its metadata
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/kafka/allBrokerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (r *Reconciler) allBrokerService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...)

// Append additional ports
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

return &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(
fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/kafka/headlessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (r *Reconciler) headlessService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...)

// Append additional ports
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

// prometheus metrics port for servicemonitor
usedPorts = append(usedPorts, corev1.ServicePort{
Name: "metrics",
Expand Down
13 changes: 13 additions & 0 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,3 +1346,16 @@ func generateServicePortForEListeners(listeners []v1beta1.ExternalListenerConfig
}
return usedPorts
}

// generateServicePortForAdditionalPorts converts the user-configured additional
// container ports (KafkaClusterSpec.AdditionalPorts) into service ports so that
// they are also exposed through the broker Services. Each service port mirrors
// the container port's name, protocol, and number; the target port is set to
// the numeric container port. Returns an empty slice when no ports are given.
func generateServicePortForAdditionalPorts(containerPorts []corev1.ContainerPort) []corev1.ServicePort {
	// Pre-size to avoid repeated slice growth; the final length is known.
	usedPorts := make([]corev1.ServicePort, 0, len(containerPorts))
	for _, containerPort := range containerPorts {
		usedPorts = append(usedPorts, corev1.ServicePort{
			Name:       containerPort.Name,
			Protocol:   containerPort.Protocol,
			Port:       containerPort.ContainerPort,
			TargetPort: intstr.FromInt(int(containerPort.ContainerPort)),
		})
	}
	return usedPorts
}
2 changes: 2 additions & 0 deletions pkg/resources/kafka/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co
})
}

kafkaBrokerContainerPorts = append(kafkaBrokerContainerPorts, r.KafkaCluster.Spec.AdditionalPorts...)

for _, envVar := range r.KafkaCluster.Spec.Envs {
if envVar.Name == "JMX_PORT" {
port, err := strconv.ParseInt(envVar.Value, 10, 32)
Expand Down
3 changes: 3 additions & 0 deletions pkg/resources/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
// Append external listener ports
usedPorts = append(usedPorts,
generateServicePortForEListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners)...)
// Append additional ports
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

usedPorts = append(usedPorts, corev1.ServicePort{
Name: "metrics",
Expand Down