diff --git a/pkg/resources/kafka/allBrokerService.go b/pkg/resources/kafka/allBrokerService.go
index 155e047f6..ecfdd5b7b 100644
--- a/pkg/resources/kafka/allBrokerService.go
+++ b/pkg/resources/kafka/allBrokerService.go
@@ -15,19 +15,8 @@ package kafka
 
 import (
-    "context"
     "fmt"
 
-    "emperror.dev/errors"
-
-    "github.com/banzaicloud/koperator/api/v1beta1"
-
-    "k8s.io/apimachinery/pkg/labels"
-    "k8s.io/apimachinery/pkg/selection"
-
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "sigs.k8s.io/controller-runtime/pkg/client"
-
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/runtime"
@@ -64,67 +53,3 @@ func (r *Reconciler) allBrokerService() runtime.Object {
         },
     }
 }
-
-// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster
-// if there is any and also the service of each broker
-func (r *Reconciler) deleteNonHeadlessServices() error {
-    ctx := context.Background()
-
-    svc := corev1.Service{
-        TypeMeta: metav1.TypeMeta{
-            APIVersion: corev1.SchemeGroupVersion.String(),
-            Kind:       "Service",
-        },
-        ObjectMeta: metav1.ObjectMeta{
-            Namespace: r.KafkaCluster.GetNamespace(),
-            Name:      fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
-        },
-    }
-
-    err := r.Client.Delete(ctx, &svc)
-    if err != nil && client.IgnoreNotFound(err) != nil {
-        return err
-    }
-
-    // delete broker services
-    labelSelector := labels.NewSelector()
-    for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) {
-        req, err := labels.NewRequirement(k, selection.Equals, []string{v})
-        if err != nil {
-            return err
-        }
-        labelSelector = labelSelector.Add(*req)
-    }
-
-    // add "has label 'brokerId' to matching labels selector expression
-    req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil)
-    if err != nil {
-        return err
-    }
-    labelSelector = labelSelector.Add(*req)
-
-    var services corev1.ServiceList
-    err = r.Client.List(ctx, &services,
-        client.InNamespace(r.KafkaCluster.GetNamespace()),
-        client.MatchingLabelsSelector{Selector: labelSelector},
-    )
-
-    if err != nil {
-        return errors.WrapIfWithDetails(err, "failed to list services",
-            "namespace", r.KafkaCluster.GetNamespace(),
-            "label selector", labelSelector.String())
-    }
-
-    for i := range services.Items {
-        svc = services.Items[i]
-        if !svc.GetDeletionTimestamp().IsZero() {
-            continue
-        }
-        err = r.Client.Delete(ctx, &svc)
-        if err != nil && client.IgnoreNotFound(err) != nil {
-            return err
-        }
-    }
-
-    return nil
-}
diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go
index 0f91140e6..f86a5a105 100644
--- a/pkg/resources/kafka/headlessService.go
+++ b/pkg/resources/kafka/headlessService.go
@@ -18,12 +18,17 @@ import (
     "context"
     "fmt"
 
+    "emperror.dev/errors"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/selection"
     "k8s.io/apimachinery/pkg/util/intstr"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
+    "github.com/banzaicloud/koperator/api/v1beta1"
+
     apiutil "github.com/banzaicloud/koperator/api/util"
     "github.com/banzaicloud/koperator/pkg/resources/templates"
     kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
@@ -92,3 +97,94 @@ func (r *Reconciler) deleteHeadlessService() error {
 
     return err
 }
+
+// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster
+// if there is any and also the service of each broker
+func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error {
+    svc := corev1.Service{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: corev1.SchemeGroupVersion.String(),
+            Kind:       "Service",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Namespace: r.KafkaCluster.GetNamespace(),
+            Name:      fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
+        },
+    }
+
+    err := r.Client.Delete(ctx, &svc)
+    if err != nil && client.IgnoreNotFound(err) != nil {
+        return err
+    }
+
+    // delete broker services
+    labelSelector := labels.NewSelector()
+    for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) {
+        req, err := labels.NewRequirement(k, selection.Equals, []string{v})
+        if err != nil {
+            return err
+        }
+        labelSelector = labelSelector.Add(*req)
+    }
+
+    // add "has label 'brokerId' to matching labels selector expression
+    req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil)
+    if err != nil {
+        return err
+    }
+    labelSelector = labelSelector.Add(*req)
+
+    var services corev1.ServiceList
+    err = r.Client.List(ctx, &services,
+        client.InNamespace(r.KafkaCluster.GetNamespace()),
+        client.MatchingLabelsSelector{Selector: labelSelector},
+    )
+
+    if err != nil {
+        return errors.WrapIfWithDetails(err, "failed to list services",
+            "namespace", r.KafkaCluster.GetNamespace(),
+            "label selector", labelSelector.String())
+    }
+
+    // if NodePort is used for any of the external listeners, the corresponding services need to remain
+    // so that clients from outside the Kubernetes cluster can reach the brokers
+    filteredSvcsToDelete := services
+    if isNodePortAccessMethodInUseAmongExternalListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners) {
+        filteredSvcsToDelete = nonNodePortServices(services)
+    }
+
+    for _, svc := range filteredSvcsToDelete.Items {
+        if !svc.GetDeletionTimestamp().IsZero() {
+            continue
+        }
+        err = r.Client.Delete(ctx, &svc)
+        if err != nil && client.IgnoreNotFound(err) != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+// isNodePortAccessMethodInUseAmongExternalListeners returns true when users specify any of the external listeners to use NodePort
+func isNodePortAccessMethodInUseAmongExternalListeners(externalListeners []v1beta1.ExternalListenerConfig) bool {
+    for _, externalListener := range externalListeners {
+        if externalListener.GetAccessMethod() == corev1.ServiceTypeNodePort {
+            return true
+        }
+    }
+
+    return false
+}
+
+func nonNodePortServices(services corev1.ServiceList) corev1.ServiceList {
+    var nonNodePortSvc corev1.ServiceList
+
+    for _, svc := range services.Items {
+        if svc.Spec.Type != corev1.ServiceTypeNodePort {
+            nonNodePortSvc.Items = append(nonNodePortSvc.Items, svc)
+        }
+    }
+
+    return nonNodePortSvc
+}
diff --git a/pkg/resources/kafka/headlessService_test.go b/pkg/resources/kafka/headlessService_test.go
new file mode 100644
index 000000000..b8007fe71
--- /dev/null
+++ b/pkg/resources/kafka/headlessService_test.go
@@ -0,0 +1,298 @@
+// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/require"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    "github.com/banzaicloud/koperator/api/v1beta1"
+)
+
+func TestIsNodePortAccessMethodInUseAmongExternalListeners(t *testing.T) {
+    testCases := []struct {
+        testName          string
+        externalListeners []v1beta1.ExternalListenerConfig
+        expectedNodePortAccessMethodInUseAmongExternalListeners bool
+    }{
+        {
+            testName: "All external listeners use NodePort access method",
+            externalListeners: []v1beta1.ExternalListenerConfig{
+                {
+                    AccessMethod: corev1.ServiceTypeNodePort,
+                },
+                {
+                    AccessMethod: corev1.ServiceTypeNodePort,
+                },
+                {
+                    AccessMethod: corev1.ServiceTypeNodePort,
+                },
+            },
+            expectedNodePortAccessMethodInUseAmongExternalListeners: true,
+        },
+        {
+            testName: "All external listeners use non-NodePort access method",
+            externalListeners: []v1beta1.ExternalListenerConfig{
+                {
+                    AccessMethod: corev1.ServiceTypeLoadBalancer,
+                },
+                {
+                    AccessMethod: corev1.ServiceTypeClusterIP,
+                },
+            },
+            expectedNodePortAccessMethodInUseAmongExternalListeners: false,
+        },
+        {
+            testName: "External listeners with mixed usages of NodePort and non-NodePort access methods",
+            externalListeners: []v1beta1.ExternalListenerConfig{
+                {
+                    AccessMethod: corev1.ServiceTypeLoadBalancer,
+                },
+                {
+                    AccessMethod: corev1.ServiceTypeNodePort,
+                },
+            },
+            expectedNodePortAccessMethodInUseAmongExternalListeners: true,
+        },
+    }
+
+    for _, test := range testCases {
+        t.Run(test.testName, func(t *testing.T) {
+            actualNodePortAccessMethodInUseAmongExternalListeners := isNodePortAccessMethodInUseAmongExternalListeners(test.externalListeners)
+
+            require.Equal(t, test.expectedNodePortAccessMethodInUseAmongExternalListeners, actualNodePortAccessMethodInUseAmongExternalListeners)
+        })
+    }
+}
+
+func TestNonNodePortServices(t *testing.T) {
+    testCases := []struct {
+        testName                    string
+        services                    corev1.ServiceList
+        expectedNonNodePortServices corev1.ServiceList
+    }{
+        {
+            testName: "Services with mixed NodePort and non-NodePort services that are evenly distributed",
+            services: corev1.ServiceList{
+                Items: []corev1.Service{
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeNodePort,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeLoadBalancer,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeNodePort,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                },
+            },
+            expectedNonNodePortServices: corev1.ServiceList{
+                Items: []corev1.Service{
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeLoadBalancer,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                },
+            },
+        },
+        {
+            testName: "Services with mixed NodePort and non-NodePort services that are non-evenly distributed",
+            services: corev1.ServiceList{
+                Items: []corev1.Service{
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeLoadBalancer,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeNodePort,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                },
+            },
+            expectedNonNodePortServices: corev1.ServiceList{
+                Items: []corev1.Service{
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeLoadBalancer,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                },
+            },
+        },
+        {
+            testName: "Services with only NodePort services",
+            services: corev1.ServiceList{
+                Items: []corev1.Service{
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeNodePort,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeNodePort,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "NodePort service 3",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeNodePort,
+                        },
+                    },
+                },
+            },
+            expectedNonNodePortServices: corev1.ServiceList{},
+        },
+        {
+            testName: "Services with only non-NodePort services",
+            services: corev1.ServiceList{
+                Items: []corev1.Service{
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 1",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeLoadBalancer,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 3",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                },
+            },
+            expectedNonNodePortServices: corev1.ServiceList{
+                Items: []corev1.Service{{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name: "Non-NodePort service 1",
+                    },
+                    Spec: corev1.ServiceSpec{
+                        Type: corev1.ServiceTypeLoadBalancer,
+                    },
+                },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 2",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "Non-NodePort service 3",
+                        },
+                        Spec: corev1.ServiceSpec{
+                            Type: corev1.ServiceTypeClusterIP,
+                        },
+                    },
+                },
+            },
+        },
+    }
+
+    for _, test := range testCases {
+        t.Run(test.testName, func(t *testing.T) {
+            actualNonNodePortServices := nonNodePortServices(test.services)
+
+            require.Equal(t, test.expectedNonNodePortServices, actualNonNodePortServices)
+        })
+    }
+}
diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go
index c1aca4e70..4fcb9c486 100644
--- a/pkg/resources/kafka/kafka.go
+++ b/pkg/resources/kafka/kafka.go
@@ -352,7 +352,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
                 return err
             }
             // If dynamic configs can not be set then let the loop continue to the next broker,
-            // after the loop we return error. This solve that case when other brokers could get healthy,
+            // after the loop we return error. This solves that case when other brokers could get healthy,
             // but the loop exits too soon because dynamic configs can not be set.
             err = r.reconcilePerBrokerDynamicConfig(broker.Id, brokerConfig, configMap, log)
             if err != nil {
@@ -373,16 +373,15 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
         return err
     }
 
-    // in case HeadlessServiceEnabled is changed delete the service that was created by the previous
+    // in case HeadlessServiceEnabled is changed, delete the service that was created by the previous
     // reconcile flow. The services must be deleted at the end of the reconcile flow after the new services
     // were created and broker configurations reflecting the new services otherwise the Kafka brokers
     // won't be reachable by koperator.
     if r.KafkaCluster.Spec.HeadlessServiceEnabled {
-        // delete non headless services for all brokers
-        log.V(1).Info("deleting non headless services for all brokers")
+        log.V(1).Info("deleting non-headless services for all of the brokers")
 
-        if err := r.deleteNonHeadlessServices(); err != nil {
-            return errors.WrapIfWithDetails(err, "failed to delete non headless services for all brokers",
+        if err := r.deleteNonHeadlessServices(ctx); err != nil {
+            return errors.WrapIfWithDetails(err, "failed to delete non-headless services for all of the brokers",
                 "component", componentName, "clusterName", r.KafkaCluster.Name, "clusterNamespace", r.KafkaCluster.Namespace)