2 changes: 2 additions & 0 deletions openspec/changes/downscale-improvements/.openspec.yaml
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-05-13
79 changes: 79 additions & 0 deletions openspec/changes/downscale-improvements/design.md
@@ -0,0 +1,79 @@
## Context

The koperator manages Kafka cluster lifecycle via a set of reconcilers. Downscale (broker removal) touches three groups of reconcilers: the first two drive the downscale itself, and the third reacts to the spec change:

1. **KafkaCluster reconciler** (`pkg/resources/kafka/kafka.go`): Detects brokers removed from spec. Collects all such brokers in one pass and sets them all to `GracefulDownscaleRequired` via a single atomic `UpdateBrokerStatus` call.

2. **CruiseControlTask reconciler** (`controllers/cruisecontroltask_controller.go`): Picks up brokers in `GracefulDownscaleRequired` state and submits CC operations. The `add_broker` path already batches all pending brokers into one CC operation. The `remove_broker` path does not — it picks only the first task and breaks.

3. **External listener reconcilers** (`pkg/resources/envoy/`, `pkg/resources/istioingress/`): Gate broker inclusion in envoy/istio/contour config on `ShouldIncludeBroker()`. This function returns `false` when `brokerConfig == nil` (broker not in spec), causing draining brokers to vanish from external listener config immediately upon spec removal.

**Key invariant:** `GetActiveTasksByOp(OperationRemoveBroker)` only returns brokers in `GracefulDownscaleRequired` state (via `IsRequired()` → `IsRequiredState()`). Brokers already `Scheduled` or `Running` are not returned. Because the KafkaCluster reconciler transitions all removed brokers atomically, all brokers from a single manifest apply are guaranteed to be in `Required` state simultaneously when the CC task reconciler fires.
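The invariant can be illustrated with a minimal sketch. The `CCState` type, the constants, and `pendingRemovals` below are hypothetical stand-ins and not the operator's actual types; the sketch only shows that a selection keyed on the `Required` state skips brokers that are already `Scheduled` or `Running`.

```go
package main

import "sort"

// CCState is a hypothetical stand-in for the operator's CruiseControlState type.
type CCState string

// Assumed state values, named after the states mentioned in this document.
const (
	DownscaleRequired  CCState = "GracefulDownscaleRequired"
	DownscaleScheduled CCState = "GracefulDownscaleScheduled"
	DownscaleRunning   CCState = "GracefulDownscaleRunning"
)

// pendingRemovals returns, in sorted order, the IDs of brokers whose state is
// exactly Required. Brokers that are Scheduled or Running are skipped, which
// mirrors the behavior described for GetActiveTasksByOp(OperationRemoveBroker).
func pendingRemovals(states map[string]CCState) []string {
	ids := make([]string, 0, len(states))
	for id, s := range states {
		if s == DownscaleRequired {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids) // map iteration order is random, so sort for stable output
	return ids
}
```

If brokers 3 and 4 are `Required` while broker 2 is `Scheduled`, the function returns only 3 and 4, which is the set that would end up in one batch.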

## Goals / Non-Goals

**Goals:**
- All broker IDs removed in a single manifest apply are submitted as one CC `remove_broker` operation.
- Brokers removed from spec remain in external listener config until `GracefulDownscaleSucceeded`.
- Brokers stuck in `GracefulDownscaleCompletedWithError` or `GracefulDownscalePaused` remain in external listener config (broker still holds data; manual investigation needed).

**Non-Goals:**
- Guaranteed single-operation batching across multiple sequential manifest applies (best-effort; second apply may produce a second CC operation if first batch is already `Scheduled`).
- Changes to CRD schema or the CC REST API.
- KRaft controller-only node handling (controller-only nodes skip CC graceful downscale entirely; unchanged).

## Decisions

### D1: Mirror `addBrokers` pattern for `removeBrokers`

The `addBrokers` helper (lines 366-368) already accepts `[]string` and submits one CC operation. The `removeBroker` helper (lines 370-372) takes a single `string`.

**Decision:** Rename `removeBroker` → `removeBrokers`, change signature to `[]string`, and replace the early-break loop with the collect-all pattern.

**Why not a separate aggregation layer?** The batching boundary is already correct — `GetActiveTasksByOp` returns exactly the set to batch. No new abstraction needed.
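As a rough sketch of the collect-all pattern, assuming a simplified task record: `removeTask`, `paramBrokerID`, and `batchRemove` are hypothetical names for illustration, and the real `removeBrokers` helper and CC client call are not reproduced here. The point is only that every pending ID is gathered before a single parameter set is built, instead of stopping after the first task.

```go
package main

import "strings"

// removeTask is a hypothetical, trimmed-down task record.
type removeTask struct {
	BrokerID     string
	OperationRef string // name of the CC operation this task is attached to
	Scheduled    bool
}

// paramBrokerID is an assumed parameter key, used here only for illustration.
const paramBrokerID = "brokerid"

// batchRemove gathers every pending broker ID, builds one parameter map for a
// single remove_broker operation, and marks all tasks as scheduled against the
// same operation name. It returns nil when there is nothing to remove.
func batchRemove(tasks []*removeTask, opName string) map[string]string {
	if len(tasks) == 0 {
		return nil
	}
	ids := make([]string, 0, len(tasks))
	for _, t := range tasks {
		ids = append(ids, t.BrokerID) // collect all IDs; no early break
	}
	for _, t := range tasks {
		t.OperationRef = opName
		t.Scheduled = true
	}
	return map[string]string{paramBrokerID: strings.Join(ids, ",")}
}
```

With tasks for brokers 3 and 4, the returned map carries `"3,4"` under the assumed key, and both tasks reference the same operation name.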

### D2: Fix `ShouldIncludeBroker` as the single gatekeeper

`ShouldIncludeBroker` is called by all external listener reconcilers (envoy configmap, service, deployment; istio gateway, virtualservice, meshgateway; contour). When `brokerConfig == nil`, the function currently falls through to `return false`.

**Decision:** Add a fallback block for `brokerConfig == nil`: check the broker's `CruiseControlState` in status. If `IsDownscale() && !IsSucceeded()` and the broker has the requested `ingressConfigName` in its `ExternalListenerConfigNames`, return `true`.

**Why `ExternalListenerConfigNames` check?** A broker may have been associated with a specific ingress config. Re-using the persisted `ExternalListenerConfigNames` (set when the pod was created, never cleared until `GracefulDownscaleSucceeded`) ensures we only retain the broker for the listener configs it actually served.

**Why `IsDownscale() && !IsSucceeded()` instead of an explicit state list?**
`IsDownscale()` covers all 6 downscale states. Excluding `IsSucceeded()` retains the broker in the 5 remaining states, including `CompletedWithError` and `Paused`, where the broker may still hold data and manual investigation is needed. If new downscale states are added to the enum in the future, they are covered automatically.

**Alternatives rejected:**
- Fix each caller individually: more code, same logic duplicated.
- Add a new function: unnecessary indirection; `ShouldIncludeBroker` is the right seam.
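The fallback described in D2 might look roughly like the following. This is a sketch under stated assumptions: `ccState`, `brokerState`, and `retainDrainingBroker` are simplified, hypothetical stand-ins for the operator's types, and the real `ShouldIncludeBroker` signature is not reproduced.

```go
package main

// ccState is a hypothetical stand-in for the operator's CruiseControlState.
type ccState string

// Assumed values for the six downscale states named in this document.
const (
	required           ccState = "GracefulDownscaleRequired"
	scheduled          ccState = "GracefulDownscaleScheduled"
	running            ccState = "GracefulDownscaleRunning"
	completedWithError ccState = "GracefulDownscaleCompletedWithError"
	paused             ccState = "GracefulDownscalePaused"
	succeeded          ccState = "GracefulDownscaleSucceeded"
)

// isDownscale reports whether the state is one of the six downscale states.
func (s ccState) isDownscale() bool {
	switch s {
	case required, scheduled, running, completedWithError, paused, succeeded:
		return true
	}
	return false
}

// isSucceeded reports whether the downscale finished successfully.
func (s ccState) isSucceeded() bool { return s == succeeded }

// brokerState is a trimmed-down, hypothetical per-broker status entry.
type brokerState struct {
	CCState                     ccState
	ExternalListenerConfigNames []string
}

// retainDrainingBroker models the fallback for a broker that is no longer in
// the spec: keep it only if it is in a downscale state other than success and
// it previously served the requested ingress config.
func retainDrainingBroker(states map[string]brokerState, brokerID, ingressConfigName string) bool {
	st, ok := states[brokerID]
	if !ok {
		return false // unknown broker: nothing to retain
	}
	if !st.CCState.isDownscale() || st.CCState.isSucceeded() {
		return false
	}
	for _, name := range st.ExternalListenerConfigNames {
		if name == ingressConfigName {
			return true
		}
	}
	return false
}
```

Expressing the check as "downscale and not succeeded" rather than listing five states keeps the sketch aligned with the rationale above: a state added to the downscale group later is retained without touching the fallback.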

### D3: Task order to satisfy CI (green at every commit)

The original plan placed failing tests before implementation. With CI gating on green builds, the order must be:

```
Commit 1: Unit test for createCCOperation (passes immediately — tests downstream, not the wrapper)
Commit 2: Implement removeBrokers + integration test (both green together)
Commit 3: Implement ShouldIncludeBroker fix + unit test (both green together)
Commit 4: E2E test + sample manifest
```

## Risks / Trade-offs

**[Race: second manifest apply before first batch starts]** → If a user applies a second spec change (removing more brokers) before the first CC operation is created, those new brokers will be included in the same batch (still in `Required` state). If the first CC operation is already `Scheduled`, new brokers get a separate operation. Acceptable; same behavior as `add_broker`.

**[CompletedWithError brokers stay in envoy indefinitely]** → A broker that fails CC draining stays in external listener config. This is intentional (data is still present, clients need connectivity), but operators must monitor and manually recover. This is a change from the previous behavior, where the broker was dropped from envoy even with data present, which was worse.

**[ExternalListenerConfigNames populated assumption]** → The fix assumes `ExternalListenerConfigNames` is non-empty for any broker that was ever reconciled. This field is set on pod creation and is not cleared before `GracefulDownscaleSucceeded`. Brokers in clusters created before this field existed would not benefit from the fix, but all newly created or recently reconciled brokers are covered.

## Migration Plan

No migration required. Both fixes are backwards-compatible:
- `removeBrokers` produces the same CC API calls as `removeBroker` for single-broker downscales.
- `ShouldIncludeBroker` only changes behavior for brokers with `brokerConfig == nil` (already removed from spec); existing behavior for in-spec brokers is unchanged.

Rollback: revert the two implementation commits (commits 2 and 3 in D3). No persistent state is affected.

## Open Questions

None — all decisions resolved during exploration.
26 changes: 26 additions & 0 deletions openspec/changes/downscale-improvements/proposal.md
@@ -0,0 +1,26 @@
## Why

When downscaling a Kafka cluster, the operator creates one CruiseControl operation per removed broker, which causes unnecessary partition movements. It also drops removed brokers from external listener config (envoy/istio/contour) immediately, so clients lose connectivity while the brokers are still draining.

## What Changes

- **Batch broker removal**: Collect all broker IDs pending downscale and submit them as a single `remove_broker` CC operation, matching the existing `add_broker` batching behavior.
- **Retain draining brokers in external listeners**: Keep removed brokers in envoy/istio/contour config until CruiseControl finishes draining them (`GracefulDownscaleSucceeded`), so clients retain connectivity while data is being moved.

## Capabilities

### New Capabilities

- `batched-broker-removal`: Single CruiseControl `remove_broker` operation for all brokers removed in a manifest apply, eliminating redundant partition movements.
- `draining-broker-listener-retention`: Brokers removed from spec but still being drained by CruiseControl remain visible in all external listener resources until draining completes.

### Modified Capabilities

<!-- none — no existing specs to delta -->

## Impact

- `controllers/cruisecontroltask_controller.go`: Replace single-broker `removeBroker` logic with `removeBrokers` (plural), mirroring `addBrokers` pattern.
- `pkg/util/util.go`: `ShouldIncludeBroker` gains a fallback path for `brokerConfig == nil` that checks `CruiseControlState`.
- All external listener reconcilers (`pkg/resources/envoy/`, `pkg/resources/istioingress/`) benefit automatically — they all gate on `ShouldIncludeBroker`.
- No API or CRD changes. No breaking changes.
@@ -0,0 +1,20 @@
## ADDED Requirements

### Requirement: Broker removals from a single manifest apply are batched into one CC operation
When a KafkaCluster spec is applied that removes N brokers, the operator SHALL submit a single `remove_broker` CruiseControl operation containing all N broker IDs, rather than N separate operations.

#### Scenario: Multiple brokers removed simultaneously
- **WHEN** a KafkaCluster spec is applied removing brokers 3 and 4 from a 5-broker cluster
- **THEN** exactly one `CruiseControlOperation` of type `remove_broker` is created
- **THEN** the operation's broker ID parameter contains both broker IDs (e.g. `"3,4"`)
- **THEN** both brokers transition to `GracefulDownscaleScheduled` referencing the same operation

#### Scenario: Single broker removal (unchanged behavior)
- **WHEN** a KafkaCluster spec is applied removing one broker
- **THEN** exactly one `CruiseControlOperation` of type `remove_broker` is created
- **THEN** the broker transitions to `GracefulDownscaleScheduled`

#### Scenario: Brokers already scheduled are not re-batched
- **WHEN** broker 3 is already in `GracefulDownscaleScheduled` state and broker 4 enters `GracefulDownscaleRequired`
- **THEN** a new `CruiseControlOperation` is created for broker 4 only
- **THEN** broker 3's existing operation is not modified
@@ -0,0 +1,34 @@
## ADDED Requirements

### Requirement: Draining brokers remain in external listener config until CC completes
A broker removed from the KafkaCluster spec SHALL remain present in all external listener resources (envoy, istio, contour) while CruiseControl is actively draining it, and SHALL be removed only after `GracefulDownscaleSucceeded`.

#### Scenario: Broker retained during active draining
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleRunning`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker
- **THEN** the broker remains in envoy configmap, service, and deployment resources

#### Scenario: Broker retained when CC operation is scheduled but not yet running
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleScheduled`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker retained when CC has not yet started (Required state)
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleRequired`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker retained on CC error (manual investigation needed)
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleCompletedWithError`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker retained when CC operation is paused
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscalePaused`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker removed from listener config after successful drain
- **WHEN** a broker's `CruiseControlState` transitions to `GracefulDownscaleSucceeded`
- **THEN** `ShouldIncludeBroker` returns `false` for that broker
- **THEN** the broker is removed from all external listener resources on the next reconcile

#### Scenario: Unknown broker excluded
- **WHEN** a broker ID has no entry in `BrokersState` (unknown broker)
- **THEN** `ShouldIncludeBroker` returns `false` for that broker
26 changes: 26 additions & 0 deletions openspec/changes/downscale-improvements/tasks.md
@@ -0,0 +1,26 @@
## 1. Batched broker removal — unit test (passes immediately)

- [x] 1.1 Add multi-broker test case to `TestCreateCCOperation` in `controllers/cruisecontroltask_controller_test.go` (after line 345): `operationType: OperationRemoveBroker`, `brokerIDs: []string{"1","2","3"}`, assert params contain `"1,2,3"` for `ParamBrokerID`
- [x] 1.2 Commit: `test: add unit test for batched remove_broker CC operation params`

## 2. Batched broker removal — implementation + integration test

- [x] 2.1 Rename `removeBroker` → `removeBrokers` in `controllers/cruisecontroltask_controller.go` (line 370), change `brokerID string` parameter to `brokerIDs []string`
- [x] 2.2 Replace the early-break loop (lines 173-186) with collect-all pattern: gather all broker IDs from `GetActiveTasksByOp(OperationRemoveBroker)`, call `removeBrokers`, set `CruiseControlOperationRef` and `StateScheduled` on all tasks
- [x] 2.3 Add integration test `When("multiple brokers are removed", ...)` to `controllers/tests/cruisecontroltask_controller_test.go` (after line 458): set brokers "1" and "2" to `GracefulDownscaleRequired`, assert exactly 1 `CruiseControlOperation` created, both brokers reference same operation, both transition to `GracefulDownscaleScheduled`
- [x] 2.4 Run `go test ./controllers/ -run TestCreateCCOperation` and `go test ./controllers/tests/ -run CruiseControlTaskReconciler` — all green
- [ ] 2.5 Commit: `feat: batch remove_broker operations into single CruiseControl task`

## 3. Draining broker listener retention — implementation + unit test

- [x] 3.1 Add fallback block to `ShouldIncludeBroker` in `pkg/util/util.go` (after line 284): when `brokerConfig == nil`, look up `brokerState` in `status.BrokersState`; if `ccState.IsDownscale() && !ccState.IsSucceeded()` and `StringSliceContains(brokerState.ExternalListenerConfigNames, ingressConfigName)`, return `true`
- [x] 3.2 Add unit test cases to `pkg/util/util_test.go` for `ShouldIncludeBroker` with `brokerConfig=nil`: all 5 active downscale states return `true`, `GracefulDownscaleSucceeded` returns `false`, missing broker state returns `false`
- [x] 3.3 Run `go test ./pkg/util/ -run TestShouldIncludeBroker` — all green
- [x] 3.4 Commit: `fix: retain draining brokers in external listener config until CruiseControl completes`

## 4. E2E test + sample manifest

- [x] 4.1 Create `config/samples/simplekafkacluster_5broker.yaml`: copy `simplekafkacluster.yaml`, add brokers 3 and 4, adjust `cruise.control.metrics.topic.replication.factor=2` and `min.insync.replicas=2`
- [x] 4.2 Add `testBatchedBrokerRemoval()` to `tests/e2e/test_broker_removal.go` following `testMultiDiskRemoval` pattern: apply 5-broker manifest, then apply 3-broker manifest, assert exactly 1 `CruiseControlOperation` of type `remove_broker`, assert only 3 pods remain Ready
- [x] 4.3 Wire into suite in `tests/e2e/koperator_suite_test.go` (after multi-disk removal block, line 74): `testInstallKafkaCluster("../../config/samples/simplekafkacluster_5broker.yaml")`, `testBatchedBrokerRemoval()`, `testUninstallKafkaCluster()`
- [ ] 4.4 Commit: `test: add e2e test for batched broker removal`