From 53b326f68f30dd0723729a6c0ec5b63933c92d2c Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 9 Feb 2023 16:51:34 -0500 Subject: [PATCH 1/6] Prevent kafka controller from running into NodePort service deletion and re-creation cycles indefinitely --- pkg/resources/kafka/allBrokerService.go | 75 ------ pkg/resources/kafka/headlessService.go | 98 ++++++++ pkg/resources/kafka/headlessService_test.go | 242 ++++++++++++++++++++ pkg/resources/kafka/kafka.go | 6 +- 4 files changed, 343 insertions(+), 78 deletions(-) create mode 100644 pkg/resources/kafka/headlessService_test.go diff --git a/pkg/resources/kafka/allBrokerService.go b/pkg/resources/kafka/allBrokerService.go index 155e047f6..ecfdd5b7b 100644 --- a/pkg/resources/kafka/allBrokerService.go +++ b/pkg/resources/kafka/allBrokerService.go @@ -15,19 +15,8 @@ package kafka import ( - "context" "fmt" - "emperror.dev/errors" - - "github.com/banzaicloud/koperator/api/v1beta1" - - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -64,67 +53,3 @@ func (r *Reconciler) allBrokerService() runtime.Object { }, } } - -// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster -// if there is any and also the service of each broker -func (r *Reconciler) deleteNonHeadlessServices() error { - ctx := context.Background() - - svc := corev1.Service{ - TypeMeta: metav1.TypeMeta{ - APIVersion: corev1.SchemeGroupVersion.String(), - Kind: "Service", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: r.KafkaCluster.GetNamespace(), - Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()), - }, - } - - err := r.Client.Delete(ctx, &svc) - if err != nil && client.IgnoreNotFound(err) != nil { - return err - } - - // delete broker services - labelSelector := labels.NewSelector() - for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) { - req, err := labels.NewRequirement(k, selection.Equals, []string{v}) - if err != nil { - return err - } - labelSelector = labelSelector.Add(*req) - } - - // add "has label 'brokerId' to matching labels selector expression - req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil) - if err != nil { - return err - } - labelSelector = labelSelector.Add(*req) - - var services corev1.ServiceList - err = r.Client.List(ctx, &services, - client.InNamespace(r.KafkaCluster.GetNamespace()), - client.MatchingLabelsSelector{Selector: labelSelector}, - ) - - if err != nil { - return errors.WrapIfWithDetails(err, "failed to list services", - "namespace", r.KafkaCluster.GetNamespace(), - "label selector", labelSelector.String()) - } - - for i := range services.Items { - svc = services.Items[i] - if !svc.GetDeletionTimestamp().IsZero() { - continue - } - err = r.Client.Delete(ctx, &svc) - if err != nil && client.IgnoreNotFound(err) != nil { - return err - } - } - - return nil -} diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go index 0f91140e6..6bb3aa3d6 100644 --- a/pkg/resources/kafka/headlessService.go +++ b/pkg/resources/kafka/headlessService.go @@ -18,12 +18,17 @@ import ( "context" "fmt" + "emperror.dev/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" 
"k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/banzaicloud/koperator/api/v1beta1" + apiutil "github.com/banzaicloud/koperator/api/util" "github.com/banzaicloud/koperator/pkg/resources/templates" kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka" @@ -92,3 +97,96 @@ func (r *Reconciler) deleteHeadlessService() error { return err } + +// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster +// if there is any and also the service of each broker +func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error { + svc := corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: r.KafkaCluster.GetNamespace(), + Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()), + }, + } + + err := r.Client.Delete(ctx, &svc) + if err != nil && client.IgnoreNotFound(err) != nil { + return err + } + + // delete broker services + labelSelector := labels.NewSelector() + for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) { + req, err := labels.NewRequirement(k, selection.Equals, []string{v}) + if err != nil { + return err + } + labelSelector = labelSelector.Add(*req) + } + + // add "has label 'brokerId' to matching labels selector expression + req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil) + if err != nil { + return err + } + labelSelector = labelSelector.Add(*req) + + var services corev1.ServiceList + err = r.Client.List(ctx, &services, + client.InNamespace(r.KafkaCluster.GetNamespace()), + client.MatchingLabelsSelector{Selector: labelSelector}, + ) + + if err != nil { + return errors.WrapIfWithDetails(err, "failed to list services", + "namespace", r.KafkaCluster.GetNamespace(), + "label selector", labelSelector.String()) + } + + // if NodePort is used for any of the external listeners, the corresponding services need to remain + // so that clients from outside the Kubernetes cluster can reach the brokers + filteredServices := services + if r.checkIfNodePortSvcNeeded() { + filteredServices = getNonNodePortSvc(services) + } + + for i := range filteredServices.Items { + svc = filteredServices.Items[i] + if !svc.GetDeletionTimestamp().IsZero() { + continue + } + err = r.Client.Delete(ctx, &svc) + if err != nil && client.IgnoreNotFound(err) != nil { + return err + } + } + + return nil +} + +// checkIfNodePortSvcNeeded returns true when users specify any of the external listeners to use NodePort +func (r *Reconciler) checkIfNodePortSvcNeeded() bool { + for _, externalListener := range r.KafkaCluster.Spec.ListenersConfig.ExternalListeners { + if externalListener.GetAccessMethod() == corev1.ServiceTypeNodePort { + return true + } + } + return false +} + +func getNonNodePortSvc(services corev1.ServiceList) corev1.ServiceList { + var svc corev1.Service + var nonNodePortSvc corev1.ServiceList + + for i := range services.Items { + svc = services.Items[i] + if svc.Spec.Type != corev1.ServiceTypeNodePort { + nonNodePortSvc.Items = append(nonNodePortSvc.Items, svc) + } + } + + return nonNodePortSvc +} diff --git a/pkg/resources/kafka/headlessService_test.go b/pkg/resources/kafka/headlessService_test.go new file mode 100644 index 000000000..29de1eec5 --- /dev/null +++ b/pkg/resources/kafka/headlessService_test.go @@ -0,0 +1,242 @@ +// Copyright © 2023 Cisco Systems, Inc. 
and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/banzaicloud/koperator/api/v1beta1" +) + +func TestCheckIfNodePortSvcNeeded(t *testing.T) { + testReconciler := Reconciler{} + + testCases := []struct { + testName string + kafkaCluster *v1beta1.KafkaCluster + isNodePortSvcNeeded bool + }{ + { + testName: "KafkaCluster with all external listeners using NodePort", + kafkaCluster: &v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + AccessMethod: corev1.ServiceTypeNodePort, + }, + { + AccessMethod: corev1.ServiceTypeNodePort, + }, + }, + }, + }, + }, + isNodePortSvcNeeded: true, + }, + { + testName: "KafkaCluster with all external listeners using non-NodePort", + kafkaCluster: &v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + AccessMethod: corev1.ServiceTypeLoadBalancer, + }, + { + AccessMethod: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + isNodePortSvcNeeded: false, + }, + { + testName: "KafkaCluster with external listeners of mixed usages of NodePort and non-NodePort", + kafkaCluster: &v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + AccessMethod: corev1.ServiceTypeLoadBalancer, + }, + { + AccessMethod: corev1.ServiceTypeNodePort, + }, + }, + }, + }, + }, + isNodePortSvcNeeded: true, + }, + } + + for _, test := range testCases { + testReconciler.KafkaCluster = test.kafkaCluster + got := testReconciler.checkIfNodePortSvcNeeded() + t.Run(test.testName, func(t *testing.T) { + if got != test.isNodePortSvcNeeded { + t.Errorf("Expected: %v Got: %v", test.isNodePortSvcNeeded, got) + } + }) + } +} + +func TestGetNonNodePortSvc(t *testing.T) { + testCases := []struct { + testName string + services corev1.ServiceList + filteredServices corev1.ServiceList + }{ + { + testName: "Services with mixed NodePort and non-NodePort services", + services: corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "NodePort service 1", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "NodePort service 2", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "LoadBalancer service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ClusterIP service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + filteredServices: corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: 
metav1.ObjectMeta{ + Name: "LoadBalancer service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ClusterIP service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + { + testName: "Services with only NodePort services", + services: corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "NodePort service 1", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "NodePort service 2", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + }, + }, + }, + }, + filteredServices: corev1.ServiceList{}, + }, + { + testName: "Services with only non-NodePort services", + services: corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "LoadBalancer service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ClusterIP service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + filteredServices: corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "LoadBalancer service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ClusterIP service", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + } + + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + got := getNonNodePortSvc(test.services) + require.Equal(t, test.filteredServices, got) + }) + } +} diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index c1aca4e70..55a13a450 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -352,7 +352,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return err } // If dynamic configs can not be set then let the loop continue to the next broker, - // after the loop we return error. This solve that case when other brokers could get healthy, + // after the loop we return error. This solves that case when other brokers could get healthy, // but the loop exits too soon because dynamic configs can not be set. err = r.reconcilePerBrokerDynamicConfig(broker.Id, brokerConfig, configMap, log) if err != nil { @@ -373,7 +373,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return err } - // in case HeadlessServiceEnabled is changed delete the service that was created by the previous + // in case HeadlessServiceEnabled is changed, delete the service that was created by the previous // reconcile flow. The services must be deleted at the end of the reconcile flow after the new services // were created and broker configurations reflecting the new services otherwise the Kafka brokers // won't be reachable by koperator. 
@@ -381,7 +381,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { // delete non headless services for all brokers log.V(1).Info("deleting non headless services for all brokers") - if err := r.deleteNonHeadlessServices(); err != nil { + if err := r.deleteNonHeadlessServices(ctx); err != nil { return errors.WrapIfWithDetails(err, "failed to delete non headless services for all brokers", "component", componentName, "clusterName", r.KafkaCluster.Name, From f21a0be4f5da20a3f21ae8a05fbbc105b51d5083 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 9 Feb 2023 17:25:57 -0500 Subject: [PATCH 2/6] Rename variable --- pkg/resources/kafka/headlessService.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go index 6bb3aa3d6..e4d4df99b 100644 --- a/pkg/resources/kafka/headlessService.go +++ b/pkg/resources/kafka/headlessService.go @@ -148,13 +148,13 @@ func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error { // if NodePort is used for any of the external listeners, the corresponding services need to remain // so that clients from outside the Kubernetes cluster can reach the brokers - filteredServices := services + filteredSvcsToDelete := services if r.checkIfNodePortSvcNeeded() { - filteredServices = getNonNodePortSvc(services) + filteredSvcsToDelete = getNonNodePortSvc(services) } - for i := range filteredServices.Items { - svc = filteredServices.Items[i] + for i := range filteredSvcsToDelete.Items { + svc = filteredSvcsToDelete.Items[i] if !svc.GetDeletionTimestamp().IsZero() { continue } From 471c297c34cbd0eceb8b9d747d1c36e627a5f2ae Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Mon, 13 Feb 2023 14:56:02 -0500 Subject: [PATCH 3/6] Refactor existing implementation to improve readability --- pkg/resources/kafka/configmap.go | 2 +- pkg/resources/kafka/headlessService.go | 17 +- pkg/resources/kafka/headlessService_test.go | 202 +++++++++++++------- 3 files changed, 138 insertions(+), 83 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 067a12588..632e56a5c 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -129,7 +129,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew) if isMountPathRemoved { - log.Error(errors.New("removed storage is found in the KafkaCluster CR"), "removing storage from broker is not supported", v1beta1.BrokerIdLabelKey, id, "mountPaths", mountPathsOld, "mountPaths in kafkaCluster CR ", mountPathsNew) + log.Error(errors.New("removed storage is found in the KafkaCluster CR"), "removing storage from broker is not supported", v1beta1.BrokerIdLabelKey, id, "mountPaths", mountPathsOld, "mountPaths in externalListeners CR ", mountPathsNew) } if len(mountPathsMerged) != 0 { diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go index e4d4df99b..58cdcf18c 100644 --- a/pkg/resources/kafka/headlessService.go +++ b/pkg/resources/kafka/headlessService.go @@ -149,8 +149,8 @@ func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error { // if NodePort is used for any of the external listeners, the corresponding services need to remain // so that clients from outside the Kubernetes cluster can reach the brokers filteredSvcsToDelete := services - if r.checkIfNodePortSvcNeeded() { - filteredSvcsToDelete = 
getNonNodePortSvc(services) + if isNodePortAccessMethodInUseAmongExternalListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners) { + filteredSvcsToDelete = nonNodePortServices(services) } for i := range filteredSvcsToDelete.Items { @@ -167,22 +167,21 @@ func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error { return nil } -// checkIfNodePortSvcNeeded returns true when users specify any of the external listeners to use NodePort -func (r *Reconciler) checkIfNodePortSvcNeeded() bool { - for _, externalListener := range r.KafkaCluster.Spec.ListenersConfig.ExternalListeners { +// isNodePortAccessMethodInUseAmongExternalListeners returns true when users specify any of the external listeners to use NodePort +func isNodePortAccessMethodInUseAmongExternalListeners(externalListeners []v1beta1.ExternalListenerConfig) bool { + for _, externalListener := range externalListeners { if externalListener.GetAccessMethod() == corev1.ServiceTypeNodePort { return true } } + return false } -func getNonNodePortSvc(services corev1.ServiceList) corev1.ServiceList { - var svc corev1.Service +func nonNodePortServices(services corev1.ServiceList) corev1.ServiceList { var nonNodePortSvc corev1.ServiceList - for i := range services.Items { - svc = services.Items[i] + for _, svc := range services.Items { if svc.Spec.Type != corev1.ServiceTypeNodePort { nonNodePortSvc.Items = append(nonNodePortSvc.Items, svc) } diff --git a/pkg/resources/kafka/headlessService_test.go b/pkg/resources/kafka/headlessService_test.go index 29de1eec5..b8007fe71 100644 --- a/pkg/resources/kafka/headlessService_test.go +++ b/pkg/resources/kafka/headlessService_test.go @@ -24,89 +24,70 @@ import ( "github.com/banzaicloud/koperator/api/v1beta1" ) -func TestCheckIfNodePortSvcNeeded(t *testing.T) { - testReconciler := Reconciler{} - +func TestIsNodePortAccessMethodInUseAmongExternalListeners(t *testing.T) { testCases := []struct { - testName string - kafkaCluster *v1beta1.KafkaCluster - isNodePortSvcNeeded bool + testName string + externalListeners []v1beta1.ExternalListenerConfig + expectedNodePortAccessMethodInUseAmongExternalListeners bool }{ { - testName: "KafkaCluster with all external listeners using NodePort", - kafkaCluster: &v1beta1.KafkaCluster{ - Spec: v1beta1.KafkaClusterSpec{ - ListenersConfig: v1beta1.ListenersConfig{ - ExternalListeners: []v1beta1.ExternalListenerConfig{ - { - AccessMethod: corev1.ServiceTypeNodePort, - }, - { - AccessMethod: corev1.ServiceTypeNodePort, - }, - }, - }, + testName: "All external listeners use NodePort access method", + externalListeners: []v1beta1.ExternalListenerConfig{ + { + AccessMethod: corev1.ServiceTypeNodePort, + }, + { + AccessMethod: corev1.ServiceTypeNodePort, + }, + { + AccessMethod: corev1.ServiceTypeNodePort, }, }, - isNodePortSvcNeeded: true, + expectedNodePortAccessMethodInUseAmongExternalListeners: true, }, { - testName: "KafkaCluster with all external listeners using non-NodePort", - kafkaCluster: &v1beta1.KafkaCluster{ - Spec: v1beta1.KafkaClusterSpec{ - ListenersConfig: v1beta1.ListenersConfig{ - ExternalListeners: []v1beta1.ExternalListenerConfig{ - { - AccessMethod: corev1.ServiceTypeLoadBalancer, - }, - { - AccessMethod: corev1.ServiceTypeClusterIP, - }, - }, - }, + testName: "All external listeners use non-NodePort access method", + externalListeners: []v1beta1.ExternalListenerConfig{ + { + AccessMethod: corev1.ServiceTypeLoadBalancer, + }, + { + AccessMethod: corev1.ServiceTypeClusterIP, }, }, - isNodePortSvcNeeded: false, + 
expectedNodePortAccessMethodInUseAmongExternalListeners: false, }, { - testName: "KafkaCluster with external listeners of mixed usages of NodePort and non-NodePort", - kafkaCluster: &v1beta1.KafkaCluster{ - Spec: v1beta1.KafkaClusterSpec{ - ListenersConfig: v1beta1.ListenersConfig{ - ExternalListeners: []v1beta1.ExternalListenerConfig{ - { - AccessMethod: corev1.ServiceTypeLoadBalancer, - }, - { - AccessMethod: corev1.ServiceTypeNodePort, - }, - }, - }, + testName: "External listeners with mixed usages of NodePort and non-NodePort access methods", + externalListeners: []v1beta1.ExternalListenerConfig{ + { + AccessMethod: corev1.ServiceTypeLoadBalancer, + }, + { + AccessMethod: corev1.ServiceTypeNodePort, }, }, - isNodePortSvcNeeded: true, + expectedNodePortAccessMethodInUseAmongExternalListeners: true, }, } for _, test := range testCases { - testReconciler.KafkaCluster = test.kafkaCluster - got := testReconciler.checkIfNodePortSvcNeeded() t.Run(test.testName, func(t *testing.T) { - if got != test.isNodePortSvcNeeded { - t.Errorf("Expected: %v Got: %v", test.isNodePortSvcNeeded, got) - } + actualNodePortAccessMethodInUseAmongExternalListeners := isNodePortAccessMethodInUseAmongExternalListeners(test.externalListeners) + + require.Equal(t, test.expectedNodePortAccessMethodInUseAmongExternalListeners, actualNodePortAccessMethodInUseAmongExternalListeners) }) } } -func TestGetNonNodePortSvc(t *testing.T) { +func TestNonNodePortServices(t *testing.T) { testCases := []struct { - testName string - services corev1.ServiceList - filteredServices corev1.ServiceList + testName string + services corev1.ServiceList + expectedNonNodePortServices corev1.ServiceList }{ { - testName: "Services with mixed NodePort and non-NodePort services", + testName: "Services with mixed NodePort and non-NodePort services that are evenly distributed", services: corev1.ServiceList{ Items: []corev1.Service{ { @@ -117,6 +98,14 @@ func TestGetNonNodePortSvc(t *testing.T) { Type: corev1.ServiceTypeNodePort, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "Non-NodePort service 1", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + }, + }, { ObjectMeta: metav1.ObjectMeta{ Name: "NodePort service 2", @@ -127,7 +116,42 @@ func TestGetNonNodePortSvc(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: "LoadBalancer service", + Name: "Non-NodePort service 2", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + expectedNonNodePortServices: corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "Non-NodePort service 1", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "Non-NodePort service 2", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + }, + }, + }, + }, + { + testName: "Services with mixed NodePort and non-NodePort services that are non-evenly distributed", + services: corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "Non-NodePort service 1", }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeLoadBalancer, @@ -135,7 +159,15 @@ func TestGetNonNodePortSvc(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: "ClusterIP service", + Name: "NodePort service 1", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "Non-NodePort service 2", }, Spec: corev1.ServiceSpec{ Type: 
corev1.ServiceTypeClusterIP, @@ -143,11 +175,11 @@ func TestGetNonNodePortSvc(t *testing.T) { }, }, }, - filteredServices: corev1.ServiceList{ + expectedNonNodePortServices: corev1.ServiceList{ Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ - Name: "LoadBalancer service", + Name: "Non-NodePort service 1", }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeLoadBalancer, @@ -155,7 +187,7 @@ func TestGetNonNodePortSvc(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: "ClusterIP service", + Name: "Non-NodePort service 2", }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -184,9 +216,17 @@ func TestGetNonNodePortSvc(t *testing.T) { Type: corev1.ServiceTypeNodePort, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "NodePort service 3", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + }, + }, }, }, - filteredServices: corev1.ServiceList{}, + expectedNonNodePortServices: corev1.ServiceList{}, }, { testName: "Services with only non-NodePort services", @@ -194,7 +234,7 @@ func TestGetNonNodePortSvc(t *testing.T) { Items: []corev1.Service{ { ObjectMeta: metav1.ObjectMeta{ - Name: "LoadBalancer service", + Name: "Non-NodePort service 1", }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeLoadBalancer, @@ -202,7 +242,15 @@ func TestGetNonNodePortSvc(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: "ClusterIP service", + Name: "Non-NodePort service 2", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "Non-NodePort service 3", }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -210,19 +258,26 @@ func TestGetNonNodePortSvc(t *testing.T) { }, }, }, - filteredServices: corev1.ServiceList{ - Items: []corev1.Service{ + expectedNonNodePortServices: corev1.ServiceList{ + Items: []corev1.Service{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Non-NodePort service 1", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + }, + }, { ObjectMeta: metav1.ObjectMeta{ - Name: "LoadBalancer service", + Name: "Non-NodePort service 2", }, Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeLoadBalancer, + Type: corev1.ServiceTypeClusterIP, }, }, { ObjectMeta: metav1.ObjectMeta{ - Name: "ClusterIP service", + Name: "Non-NodePort service 3", }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -235,8 +290,9 @@ func TestGetNonNodePortSvc(t *testing.T) { for _, test := range testCases { t.Run(test.testName, func(t *testing.T) { - got := getNonNodePortSvc(test.services) - require.Equal(t, test.filteredServices, got) + actualNonNodePortServices := nonNodePortServices(test.services) + + require.Equal(t, test.expectedNonNodePortServices, actualNonNodePortServices) }) } } From 6c240bba1702ac873d305bf90d2c7401b819afa7 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Tue, 14 Feb 2023 08:59:03 -0500 Subject: [PATCH 4/6] Fix unintended change --- pkg/resources/kafka/configmap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 632e56a5c..067a12588 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -129,7 +129,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew) if isMountPathRemoved { - log.Error(errors.New("removed storage is found in the KafkaCluster CR"), "removing storage from broker is not 
supported", v1beta1.BrokerIdLabelKey, id, "mountPaths", mountPathsOld, "mountPaths in externalListeners CR ", mountPathsNew) + log.Error(errors.New("removed storage is found in the KafkaCluster CR"), "removing storage from broker is not supported", v1beta1.BrokerIdLabelKey, id, "mountPaths", mountPathsOld, "mountPaths in kafkaCluster CR ", mountPathsNew) } if len(mountPathsMerged) != 0 { From 33be6c94d64ea51f8e79b72d52a3d4b05f060cad Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Tue, 14 Feb 2023 15:00:35 -0500 Subject: [PATCH 5/6] Update logs and comments to reduce confusion --- pkg/resources/kafka/kafka.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 55a13a450..4fcb9c486 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -378,11 +378,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { // were created and broker configurations reflecting the new services otherwise the Kafka brokers // won't be reachable by koperator. if r.KafkaCluster.Spec.HeadlessServiceEnabled { - // delete non headless services for all brokers - log.V(1).Info("deleting non headless services for all brokers") + log.V(1).Info("deleting non-headless services for all of the brokers") if err := r.deleteNonHeadlessServices(ctx); err != nil { - return errors.WrapIfWithDetails(err, "failed to delete non headless services for all brokers", + return errors.WrapIfWithDetails(err, "failed to delete non-headless services for all of the brokers", "component", componentName, "clusterName", r.KafkaCluster.Name, "clusterNamespace", r.KafkaCluster.Namespace) From a4a8fb53ccba2bb8dd6be6d2c809ca778f005bb0 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Tue, 14 Feb 2023 15:27:21 -0500 Subject: [PATCH 6/6] Update for loop --- pkg/resources/kafka/headlessService.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go index 58cdcf18c..f86a5a105 100644 --- a/pkg/resources/kafka/headlessService.go +++ b/pkg/resources/kafka/headlessService.go @@ -153,8 +153,7 @@ func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error { filteredSvcsToDelete = nonNodePortServices(services) } - for i := range filteredSvcsToDelete.Items { - svc = filteredSvcsToDelete.Items[i] + for _, svc := range filteredSvcsToDelete.Items { if !svc.GetDeletionTimestamp().IsZero() { continue }