Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent kafka controller from running into NodePort service deletion and re-creation cycles indefinitely #928

Merged
merged 11 commits into from
Feb 16, 2023
75 changes: 0 additions & 75 deletions pkg/resources/kafka/allBrokerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,8 @@
package kafka

import (
"context"
"fmt"

"emperror.dev/errors"

"github.com/banzaicloud/koperator/api/v1beta1"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"

Expand Down Expand Up @@ -64,67 +53,3 @@ func (r *Reconciler) allBrokerService() runtime.Object {
},
}
}

// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster
// if there is any and also the service of each broker
func (r *Reconciler) deleteNonHeadlessServices() error {
panyuenlau marked this conversation as resolved.
Show resolved Hide resolved
ctx := context.Background()

svc := corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: r.KafkaCluster.GetNamespace(),
Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
},
}

err := r.Client.Delete(ctx, &svc)
if err != nil && client.IgnoreNotFound(err) != nil {
return err
}

// delete broker services
labelSelector := labels.NewSelector()
for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) {
req, err := labels.NewRequirement(k, selection.Equals, []string{v})
if err != nil {
return err
}
labelSelector = labelSelector.Add(*req)
}

// add "has label 'brokerId' to matching labels selector expression
req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil)
if err != nil {
return err
}
labelSelector = labelSelector.Add(*req)

var services corev1.ServiceList
err = r.Client.List(ctx, &services,
client.InNamespace(r.KafkaCluster.GetNamespace()),
client.MatchingLabelsSelector{Selector: labelSelector},
)

if err != nil {
return errors.WrapIfWithDetails(err, "failed to list services",
"namespace", r.KafkaCluster.GetNamespace(),
"label selector", labelSelector.String())
}

for i := range services.Items {
svc = services.Items[i]
if !svc.GetDeletionTimestamp().IsZero() {
continue
}
err = r.Client.Delete(ctx, &svc)
if err != nil && client.IgnoreNotFound(err) != nil {
return err
}
}

return nil
}
96 changes: 96 additions & 0 deletions pkg/resources/kafka/headlessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
"context"
"fmt"

"emperror.dev/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1beta1"

apiutil "github.com/banzaicloud/koperator/api/util"
"github.com/banzaicloud/koperator/pkg/resources/templates"
kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
Expand Down Expand Up @@ -92,3 +97,94 @@ func (r *Reconciler) deleteHeadlessService() error {

return err
}

// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster
// if there is any and also the service of each broker
func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error {
svc := corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: r.KafkaCluster.GetNamespace(),
Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
},
}

err := r.Client.Delete(ctx, &svc)
if err != nil && client.IgnoreNotFound(err) != nil {
return err
}

// delete broker services
labelSelector := labels.NewSelector()
for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) {
req, err := labels.NewRequirement(k, selection.Equals, []string{v})
if err != nil {
return err
}
labelSelector = labelSelector.Add(*req)
}

// add "has label 'brokerId' to matching labels selector expression
req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil)
if err != nil {
return err
}
labelSelector = labelSelector.Add(*req)

var services corev1.ServiceList
err = r.Client.List(ctx, &services,
client.InNamespace(r.KafkaCluster.GetNamespace()),
client.MatchingLabelsSelector{Selector: labelSelector},
)

if err != nil {
return errors.WrapIfWithDetails(err, "failed to list services",
"namespace", r.KafkaCluster.GetNamespace(),
"label selector", labelSelector.String())
}

// if NodePort is used for any of the external listeners, the corresponding services need to remain
// so that clients from outside the Kubernetes cluster can reach the brokers
filteredSvcsToDelete := services
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we dont have to use another variable for this.
we can use the services all way long.
later: services = nonNodePortServices(services)
...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea...but having this little extra variable named filteredSvcsToDelete would make the implementation more readable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is clear enough to me services = nonNodePortServices(services).
It is just my opinion and a suggestion. Your solution is ok for me.

Copy link
Contributor

@bartam1 bartam1 Feb 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also use here (

var services corev1.ServiceList
) name like: filteredSvcsToDelete
and filteredSvcsToDelete = nonNodePortBrokerServices(filteredSvcsToDelete).

This

filteredSvcsToDelete := services

"services" already filtered by label selector so it is equal filteredSvcsToDelete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"services" already filtered by label selector so it is equal filteredSvcsToDelete

I guess I have a slightly different interpretation - we use the label selectors to query all of the services that match the labels (and they are stored in the services variable), then we filter them based on the external listener configuration, which results in filteredSvcsToDelete and these are the services that we will actually remove at the end

Using a different variable with a more declarative name would make it more understandable to people that are not familiar with the existing codebase IMO - so I' d prefer to keep it as it is

if isNodePortAccessMethodInUseAmongExternalListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners) {
filteredSvcsToDelete = nonNodePortServices(services)
}

for _, svc := range filteredSvcsToDelete.Items {
if !svc.GetDeletionTimestamp().IsZero() {
continue
}
err = r.Client.Delete(ctx, &svc)
if err != nil && client.IgnoreNotFound(err) != nil {
return err
}
}

return nil
}
pregnor marked this conversation as resolved.
Show resolved Hide resolved

// isNodePortAccessMethodInUseAmongExternalListeners returns true when users specify any of the external listeners to use NodePort
func isNodePortAccessMethodInUseAmongExternalListeners(externalListeners []v1beta1.ExternalListenerConfig) bool {
for _, externalListener := range externalListeners {
if externalListener.GetAccessMethod() == corev1.ServiceTypeNodePort {
return true
}
}

return false
}

func nonNodePortServices(services corev1.ServiceList) corev1.ServiceList {
var nonNodePortSvc corev1.ServiceList
panyuenlau marked this conversation as resolved.
Show resolved Hide resolved

for _, svc := range services.Items {
if svc.Spec.Type != corev1.ServiceTypeNodePort {
nonNodePortSvc.Items = append(nonNodePortSvc.Items, svc)
}
}

return nonNodePortSvc
}