From 2d55a6799b6d073b6ec70c83d8da677c5d3821d0 Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Wed, 13 May 2026 13:32:26 +0300 Subject: [PATCH] fix: retain draining brokers in external listener config until CruiseControl completes Brokers removed from the KafkaCluster spec were immediately excluded from envoy, istio, and contour config, even while CruiseControl was still draining them. Clients lost connectivity to brokers that still held partition leaders. Root cause: ShouldIncludeBroker() returned false when brokerConfig==nil (broker not in spec). Add a fallback path: when brokerConfig is nil, check the broker's CruiseControlState in status. If the state is an active downscale (IsDownscale && !IsSucceeded) and the broker was previously bound to the requested ingressConfig, keep it in the external listener resources. Brokers stuck in CompletedWithError or Paused are also retained, allowing manual investigation while keeping client connectivity. ShouldIncludeBroker is the single gatekeeper for all external listener reconcilers (envoy, istio, contour), so no other files need changes. Co-Authored-By: Claude Sonnet 4.6 --- pkg/util/util.go | 11 +++++++ pkg/util/util_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/pkg/util/util.go b/pkg/util/util.go index 6b3007201..a6a396eb1 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -281,6 +281,17 @@ func ShouldIncludeBroker(brokerConfig *v1beta1.BrokerConfig, status v1beta1.Kafk } } } + + // Broker removed from spec but still draining — keep in external listener config until CC finishes + if brokerConfig == nil { + if brokerState, ok := status.BrokersState[strconv.Itoa(brokerID)]; ok { + ccState := brokerState.GracefulActionState.CruiseControlState + if ccState.IsDownscale() && !ccState.IsSucceeded() && + apiutil.StringSliceContains(brokerState.ExternalListenerConfigNames, ingressConfigName) { + return true + } + } + } return false } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index db84482ed..22533352a 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -930,3 +930,72 @@ func TestGetMD5Hash(t *testing.T) { } } } + +func TestShouldIncludeBroker(t *testing.T) { + t.Parallel() + + const ingressConfig = "default" + brokerID := 5 + + makeStatus := func(state v1beta1.CruiseControlState) v1beta1.KafkaClusterStatus { + return v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "5": { + GracefulActionState: v1beta1.GracefulActionState{ + CruiseControlState: state, + }, + ExternalListenerConfigNames: v1beta1.ExternalListenerConfigNames{ingressConfig}, + }, + }, + } + } + + testCases := []struct { + name string + state v1beta1.CruiseControlState + expected bool + }{ + {"GracefulDownscaleRequired", v1beta1.GracefulDownscaleRequired, true}, + {"GracefulDownscaleScheduled", v1beta1.GracefulDownscaleScheduled, true}, + {"GracefulDownscaleRunning", v1beta1.GracefulDownscaleRunning, true}, + {"GracefulDownscaleCompletedWithError", v1beta1.GracefulDownscaleCompletedWithError, true}, + {"GracefulDownscalePaused", v1beta1.GracefulDownscalePaused, true}, + {"GracefulDownscaleSucceeded", v1beta1.GracefulDownscaleSucceeded, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := ShouldIncludeBroker(nil, makeStatus(tc.state), brokerID, ingressConfig, ingressConfig) + if result != tc.expected { + t.Errorf("state %s: expected %v, got %v", tc.name, tc.expected, result) + } + }) + } + + t.Run("no broker state entry", func(t *testing.T) { + emptyStatus := v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{}, + } + result := ShouldIncludeBroker(nil, emptyStatus, brokerID, ingressConfig, ingressConfig) + if result { + t.Error("expected false for broker with no status entry, got true") + } + }) + + t.Run("ingress config not in ExternalListenerConfigNames", func(t *testing.T) { + status := v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "5": { + GracefulActionState: v1beta1.GracefulActionState{ + CruiseControlState: v1beta1.GracefulDownscaleRunning, + }, + ExternalListenerConfigNames: v1beta1.ExternalListenerConfigNames{"other-ingress"}, + }, + }, + } + result := ShouldIncludeBroker(nil, status, brokerID, ingressConfig, ingressConfig) + if result { + t.Error("expected false when ingressConfig not in ExternalListenerConfigNames, got true") + } + }) +}