From 754e59cdf48d8c5a2c29aeb7064ad8165ffc9fe7 Mon Sep 17 00:00:00 2001 From: Benjamin Knofe-Vider Date: Thu, 21 May 2026 10:27:33 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(controlplane):=20unbounded=20K8s=20wor?= =?UTF-8?q?ker=20scaling=20=E2=80=94=20remove=20memory-budget-derived=20Ma?= =?UTF-8?q?xWorkers=20cap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In K8s mode, workers run as separate pods on separate nodes, so the control plane's own memory budget tells us nothing about how many worker pods the cluster can host. Yet the CP was deriving `k8s.max_workers = memory_budget / 256MB` whenever the operator left the cap unset, and a 30-tenant / 900-qps load test stabilised at 11 workers / ~96 qps because the CP host had ~2.8 GB free — even though Karpenter had already spun up 12 worker nodes ready to accept pods. This change: * Drops the memory-budget derivation for `k8s.max_workers`. If `cfg.K8s.MaxWorkers` is 0, the pool is unbounded and the cluster's NodePool / autoscaler is the natural ceiling. Downstream call sites (`OrgReservedPool`, `K8sWorkerPool.canSpawn`, `ConfigStore.ClaimIdleWorker`, `CreateSpawningWorkerSlot`) already treat `MaxWorkers == 0` as "no cap", so no other code paths need changes. * Skips the "k8s.shared_warm_target exceeds k8s.max_workers" capping when MaxWorkers is unbounded — the warm target stands on its own. * Replaces the misleading "Derived k8s.max_workers from memory budget" startup log with one that names what's actually happening — `k8s.max_workers unset; worker pool is unbounded`. * Updates the `K8sConfig.MaxWorkers` field doc to match. Process / local mode keeps the existing derivation since process-mode workers share the CP process's memory. Adds `TestOrgReservedPoolAcquireUnboundedWhenMaxWorkersZero` driving `OrgReservedPool` with `maxWorkers=0` and asserting it can acquire many more workers than the previous cap would have allowed (30 in the test) without rejecting on max-workers grounds. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --- controlplane/control.go | 24 +++++++----- controlplane/org_reserved_pool_test.go | 52 ++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/controlplane/control.go b/controlplane/control.go index 3fd4cd4c..cf9f8c39 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -111,7 +111,7 @@ type K8sConfig struct { WorkerConfigMap string // ConfigMap name for duckgres.yaml ImagePullPolicy string // Image pull policy for worker pods (e.g., "Never", "IfNotPresent", "Always") ServiceAccount string // Neutral ServiceAccount name for worker pods (default: "duckgres-worker") - MaxWorkers int // Global cap for the shared K8s worker pool (0 = auto-derived) + MaxWorkers int // Global cap for the shared K8s worker pool (0 = unbounded; cluster autoscaler is the natural ceiling) SharedWarmTarget int // Neutral shared warm-worker target for K8s multi-tenant mode (0 = disabled) WorkerCPURequest string // CPU request for worker pods (e.g., "500m") WorkerMemoryRequest string // Memory request for worker pods (e.g., "1Gi") @@ -346,7 +346,12 @@ func RunControlPlane(cfg ControlPlaneConfig) { memBudget := server.ParseMemoryBytes(cfg.MemoryBudget) // Use a temporary rebalancer to auto-detect the budget and derive - // backend-specific default max_workers if not explicitly set. + // process-mode default max_workers if not explicitly set. The K8s + // backend does NOT derive max_workers from this budget — worker pods + // run on separate nodes, so the CP's RAM tells us nothing about how + // many worker pods the cluster can host. For K8s mode, MaxWorkers=0 + // means "unbounded" and the cluster's NodePool/autoscaler is the + // natural ceiling. tempRebalancer := NewMemoryRebalancer(memBudget, 0, nil, false) memBudget = tempRebalancer.memoryBudget // capture auto-detected value @@ -354,10 +359,8 @@ func RunControlPlane(cfg ControlPlaneConfig) { if processMaxWorkers == 0 { processMaxWorkers = tempRebalancer.DefaultMaxWorkers() } + // K8s max_workers: 0 = unbounded (no cap derived from CP memory). k8sMaxWorkers := cfg.K8s.MaxWorkers - if k8sMaxWorkers == 0 { - k8sMaxWorkers = tempRebalancer.DefaultMaxWorkers() - } rebalancer := NewMemoryRebalancer(memBudget, 0, nil, cfg.MemoryRebalance) @@ -366,10 +369,8 @@ func RunControlPlane(cfg ControlPlaneConfig) { "process_max_workers", processMaxWorkers, "memory_budget", formatBytes(memBudget)) } - if isK8s && cfg.K8s.MaxWorkers == 0 { - slog.Info("Derived k8s.max_workers from memory budget.", - "k8s_max_workers", k8sMaxWorkers, - "memory_budget", formatBytes(memBudget)) + if isK8s && k8sMaxWorkers == 0 { + slog.Info("k8s.max_workers unset; worker pool is unbounded — cluster autoscaler (e.g. Karpenter) is the ceiling.") } processMinWorkers := cfg.Process.MinWorkers @@ -381,7 +382,10 @@ func RunControlPlane(cfg ControlPlaneConfig) { } k8sSharedWarmTarget := cfg.K8s.SharedWarmTarget - if isK8s && k8sSharedWarmTarget > k8sMaxWorkers { + // Only cap the warm target if k8sMaxWorkers is an actual upper bound + // (>0). When k8sMaxWorkers == 0 the pool is unbounded and the warm + // target stands on its own. + if isK8s && k8sMaxWorkers > 0 && k8sSharedWarmTarget > k8sMaxWorkers { slog.Warn("k8s.shared_warm_target exceeds k8s.max_workers; capping to k8s.max_workers.", "k8s_shared_warm_target", k8sSharedWarmTarget, "k8s_max_workers", k8sMaxWorkers) diff --git a/controlplane/org_reserved_pool_test.go b/controlplane/org_reserved_pool_test.go index 923814e8..bfd189fd 100644 --- a/controlplane/org_reserved_pool_test.go +++ b/controlplane/org_reserved_pool_test.go @@ -209,6 +209,58 @@ func TestOrgReservedWorkerPoolAcquireDelegatesActivationWithoutCachedTenantRunti } } +// TestOrgReservedPoolAcquireUnboundedWhenMaxWorkersZero confirms that +// MaxWorkers == 0 means "no per-org cap" in K8s mode. The load test that +// motivated the cap removal stabilised at 11 workers because the CP +// derived MaxWorkers from CP host memory; the only thing keeping us from +// scaling further was this synthetic cap. With MaxWorkers == 0, the pool +// must keep handing out new workers as long as the shared (cluster) pool +// has room. +func TestOrgReservedPoolAcquireUnboundedWhenMaxWorkersZero(t *testing.T) { + shared, _ := newTestK8sPool(t, 0) // shared pool also unbounded + shared.healthCheckFunc = func(ctx context.Context, worker *ManagedWorker) error { + return nil + } + // Pre-seed many neutral warm workers so AcquireWorker can reserve + // each one in turn without blocking on a real spawn path. + const target = 30 + for i := 1; i <= target; i++ { + addNeutralWarmWorker(shared, i) + } + shared.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { + shared.mu.Lock() + shared.workers[id] = &ManagedWorker{ID: id, image: shared.workerImage, done: make(chan struct{})} + shared.mu.Unlock() + return nil + } + + // maxWorkers = 0 — the change under test. AcquireWorker must NOT + // reject on max-workers grounds. + pool := NewOrgReservedPool(shared, "analytics", 0, shared.workerImage, nil) + pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + seen := make(map[int]struct{}, target) + for i := 0; i < target; i++ { + w, err := pool.AcquireWorker(ctx) + if err != nil { + t.Fatalf("AcquireWorker[%d] failed with maxWorkers=0: %v", i, err) + } + if _, dup := seen[w.ID]; dup { + t.Fatalf("AcquireWorker[%d] returned duplicate worker ID %d", i, w.ID) + } + seen[w.ID] = struct{}{} + } + + if got := pool.assignedWorkerCountLocked(); got < target { + t.Fatalf("expected at least %d assigned workers, got %d", target, got) + } +} + func TestOrgReservedPoolAcquireWaitsWhenSharedWarmWorkerBusyAtCapacity(t *testing.T) { shared, _ := newTestK8sPool(t, 5) worker := &ManagedWorker{ID: 3, activeSessions: 1, done: make(chan struct{})} From abbb361ed2a321b7c01d6479961e6f54c73fd8d6 Mon Sep 17 00:00:00 2001 From: Benjamin Knofe-Vider Date: Thu, 21 May 2026 10:40:24 +0200 Subject: [PATCH 2/2] feat(controlplane): scale worker pool to demand in one tick MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A 30-tenant cold ramp at 30 qps revealed the warm-pool reconciler creeping up at ~2-3 workers per minute instead of closing the gap in one cycle: t=0 5 workers (initial warm) t=60s 8 (+3) t=120s 10 (+2) t=150s 11 (+1) t=600s still 11 never reached the desired 30 Karpenter was already provisioning nodes within ~30s; the bottleneck was on the CP side. Two throttles compounded: 1. `reconcileWarmCapacity` called `SpawnMinWorkers(target)` where `target = K8s.SharedWarmTarget` — a static configuration value. Every tick it filled the warm pool only up to that floor and never reacted to bursts of `WarmCapacityExhausted` retries. The observed pool growth came entirely from `triggerPerImageReplenish` refilling exactly one slot per consumed warm worker, which scales with successful activations — not with queued demand. 2. `K8sWorkerPool.spawnSem` was sized at 3, serialising even the parallel WaitGroup fan-out in `SpawnMinWorkers` down to 3 concurrent pod creates. The K8s client was already configured for QPS=50 / Burst=100, so the semaphore was the binding constraint. This commit threads observed demand through the reconciler: * `K8sWorkerPool.warmCapacityMisses` (atomic int64) is incremented every time `ReserveSharedWorker` returns `WarmCapacityExhausted` for any non-`OrgCap` reason. `OrgCap` is excluded because adding neutral warm pods doesn't help an org that has hit its own cap. * `ConsumeWarmCapacityDemand()` returns and atomically resets the counter. * Janitor's `reconcileWarmCapacity` now computes `effectiveTarget = staticTarget + observedDemand` and calls `SpawnMinWorkers(effectiveTarget)`, scaling to absorbed demand in a single tick. Pod creation already fans out via WaitGroup inside `SpawnMinWorkers`; with `spawnSem` raised to 50, the K8s API calls actually run in parallel. Scale-DOWN is intentionally untouched — the idle reaper keeps its slower cadence so steady-state idle dips don't thrash the pool. `TestK8sPoolWarmCapacityDemandScalesPoolInOneTick` drives 30 concurrent `ReserveSharedWorker` calls against a store that always misses, simulates one janitor tick, and asserts exactly 35 (`staticTarget=5 + demand=30`) spawn slots are allocated in a single pass — pinning the new behaviour against a regression to per-tick increments. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --- controlplane/k8s_pool.go | 47 ++++++++++++++- controlplane/k8s_pool_test.go | 107 ++++++++++++++++++++++++++++++++++ controlplane/multitenant.go | 22 ++++++- 3 files changed, 174 insertions(+), 2 deletions(-) diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 5947244e..cf918439 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -15,6 +15,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "crypto/x509" @@ -98,6 +99,14 @@ type K8sWorkerPool struct { runtimeStore RuntimeWorkerStore activatingTimeout time.Duration // max time a worker can stay in reserved/activating before being reaped + + // warmCapacityMisses counts ReserveSharedWorker calls that returned + // WarmCapacityExhaustedError since the last ConsumeWarmCapacityDemand call. + // The janitor's warm-capacity reconciler drains this counter each tick and + // scales the warm pool to absorb the observed demand in one shot, rather + // than creeping up at the static SharedWarmTarget floor while cold tenants + // retry on 45-second backoffs. Atomically accessed; no lock needed. + warmCapacityMisses atomic.Int64 } // NewK8sWorkerPool creates a K8sWorkerPool using in-cluster credentials. @@ -145,7 +154,14 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( } // Limit concurrent K8s API calls to avoid overwhelming the API server. - spawnConcurrency := 3 + // The K8s client above is configured for QPS=50 / Burst=100, so 50 in-flight + // pod creates fits comfortably within client-side throttling while letting + // the warm-pool reconciler close large demand gaps (e.g. 30-tenant cold + // ramp) in one tick instead of creeping up at 3-per-pod-startup. Pod + // scheduling latency on the node side is bounded by Karpenter (~30s) and + // the kubelet, neither of which cares about how many pods we create + // in parallel from the CP. + spawnConcurrency := 50 retireConcurrency := 5 pool := &K8sWorkerPool{ workers: make(map[int]*ManagedWorker), @@ -1388,6 +1404,15 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor continue } + // Surface unmet demand to the warm-pool reconciler. An OrgCap miss + // is a per-org cap, not a shared-pool shortage, so spawning more + // neutral warm pods won't help — skip those. Any other miss + // (NoIdle, etc.) means the shared warm pool was too small for the + // observed burst; bump the demand counter so the next reconcile + // tick scales toward `idle + recent_misses` in one shot. + if missReason != configstore.WorkerClaimMissReasonOrgCap { + p.warmCapacityMisses.Add(1) + } return nil, NewWarmCapacityExhaustedErrorForReason(missReason, DefaultWarmCapacityRetryAfter) } @@ -1453,6 +1478,10 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor p.mu.Unlock() + // Single-CP / no-runtime-store mode: same demand signal as above. The + // next janitor tick scales the warm pool toward absorbed demand + // instead of staying at the static SharedWarmTarget floor. + p.warmCapacityMisses.Add(1) return nil, NewWarmCapacityExhaustedError(DefaultWarmCapacityRetryAfter) } } @@ -2872,6 +2901,22 @@ func (p *K8sWorkerPool) WarmCapacityTarget() int { return p.minWorkers } +// ConsumeWarmCapacityDemand returns the number of ReserveSharedWorker calls +// that have hit WarmCapacityExhausted (excluding per-org cap misses) since +// the last call, atomically resetting the counter. The warm-pool reconciler +// adds this to the static target each tick so a cold burst of N tenants +// scales the pool toward N workers in one or two ticks, instead of creeping +// up while clients re-arrive on their 45-second retry hints. Scale-down +// stays under the idle reaper's slower cadence so steady-state idle dips +// don't thrash the pool. +func (p *K8sWorkerPool) ConsumeWarmCapacityDemand() int { + n := p.warmCapacityMisses.Swap(0) + if n < 0 { + return 0 + } + return int(n) +} + // SetPerImageWarmTargets replaces the per-image warm-worker floor. Each entry // asks the pool to keep at least N warm-idle workers running with the given // image. This is layered on top of SetWarmCapacityTarget — the per-image floor diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index 61cbc84f..e5d77ba8 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -9,6 +9,7 @@ import ( "regexp" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -3364,3 +3365,109 @@ func TestReapIdleWorkersSkipsWorkersWithinIdleTimeout(t *testing.T) { t.Errorf("expected both workers to survive (under idleTimeout); got %d remaining", len(pool.workers)) } } + +// TestK8sPoolWarmCapacityDemandScalesPoolInOneTick reproduces the 30-tenant +// cold-ramp regression: with the static SharedWarmTarget set low (5) and 30 +// orgs racing in simultaneously, the warm pool used to creep up by ~2-3 +// workers per janitor tick because the reconciler only ever filled to the +// static floor. The demand counter on ReserveSharedWorker now surfaces the +// observed shortage so a single reconcile pass scales to ~staticTarget + +// observed_demand = 35, closing the gap in one tick instead of many. +// +// Drives 30 concurrent ReserveSharedWorker calls against a store that +// always returns NoIdle, then simulates one janitor tick by calling +// SpawnMinWorkers(static + ConsumeWarmCapacityDemand()) and asserts the +// pool tried to spawn ~30 fresh slots in that single tick. +func TestK8sPoolWarmCapacityDemandScalesPoolInOneTick(t *testing.T) { + pool, _ := newTestK8sPool(t, 0) // unbounded — matches the post-cap K8s mode + store := &captureRuntimeWorkerStore{ + claimMissReason: configstore.WorkerClaimMissReasonNoIdle, + } + pool.runtimeStore = store + + // Allocate distinct worker IDs for each spawn slot the reconciler will + // request. Real ConfigStore generates these from a sequence; here we + // just hand back monotonically increasing ids so the parallel spawn + // fan-out has unique records to drive. + var slotIdx int64 + store.neutralSpawnedFunc = func() *configstore.WorkerRecord { + id := int(atomic.AddInt64(&slotIdx, 1)) + return &configstore.WorkerRecord{ + WorkerID: 1000 + id, + PodName: fmt.Sprintf("duckgres-worker-test-cp-%d", 1000+id), + State: configstore.WorkerStateSpawning, + OwnerCPInstanceID: pool.cpInstanceID, + } + } + + // Capture every spawn the reconciler kicks off. We don't actually start + // pods — the fake clientset can't, and the test only cares that the + // reconciler asked for ~30 spawns in one pass. + var spawnedMu sync.Mutex + var spawnedIDs []int + pool.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { + spawnedMu.Lock() + spawnedIDs = append(spawnedIDs, id) + spawnedMu.Unlock() + return nil + } + + // Step 1: a static warm-pool floor (the operator-configured + // SharedWarmTarget) — matches the production "5 initial warm" baseline. + const staticTarget = 5 + pool.SetWarmCapacityTarget(staticTarget) + + // Step 2: 30 cold tenants race in simultaneously. Each ReserveSharedWorker + // claim misses (store returns NoIdle), each call records one demand miss. + const burst = 30 + var reserveWG sync.WaitGroup + reserveWG.Add(burst) + for i := 0; i < burst; i++ { + go func(i int) { + defer reserveWG.Done() + worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: fmt.Sprintf("org-%d", i), + Image: pool.workerImage, + }) + var capacityErr *WarmCapacityExhaustedError + if !errors.As(err, &capacityErr) { + t.Errorf("org-%d: expected WarmCapacityExhausted, got worker=%v err=%v", i, worker, err) + } + }(i) + } + reserveWG.Wait() + + // Step 3: drain the demand counter (this is what the janitor's + // reconcileWarmCapacity closure does each tick). + demand := pool.ConsumeWarmCapacityDemand() + if demand != burst { + t.Fatalf("expected ConsumeWarmCapacityDemand=%d, got %d", burst, demand) + } + // And confirm the counter actually reset — a second read must see zero. + if again := pool.ConsumeWarmCapacityDemand(); again != 0 { + t.Fatalf("expected counter reset to 0 after consume, got %d", again) + } + + // Step 4: one janitor tick. Effective target = static + observed demand. + effectiveTarget := staticTarget + demand + if err := pool.SpawnMinWorkers(effectiveTarget); err != nil { + t.Fatalf("SpawnMinWorkers(%d): %v", effectiveTarget, err) + } + + // Step 5: the reconciler must have asked for ~`effectiveTarget` fresh + // slots in this single tick — not crept up by 2-3 per minute the way it + // did before the demand counter was wired up. The exact count depends + // on whether any pre-existing workers were idle (none here, so it + // should be exactly effectiveTarget = 35). + spawnedMu.Lock() + got := len(spawnedIDs) + spawnedMu.Unlock() + if got != effectiveTarget { + t.Fatalf("expected %d spawns in one tick (static %d + demand %d), got %d", + effectiveTarget, staticTarget, demand, got) + } + if store.neutralSpawnCalls != effectiveTarget { + t.Fatalf("expected %d neutral spawn slot allocations, got %d", + effectiveTarget, store.neutralSpawnCalls) + } +} diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index 8513914f..de7bc464 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -246,8 +246,28 @@ func SetupMultiTenant( return router.sharedPool.retireWorkerWithReason(workerID, reason) } janitor.reconcileWarmCapacity = func() { - target := router.sharedPool.WarmCapacityTarget() + // Demand-aware warm-pool scale-up: drain the warm-capacity miss + // counter and add it to the static target. A burst of N cold tenants + // (all returning WarmCapacityExhausted on first try) bumps the next + // tick's effective target by ~N, so the pool spawns toward demand + // in a single tick instead of creeping up at SharedWarmTarget while + // clients politely re-arrive on the 45-second retry hint. Karpenter + // (~30s) and pod scheduling are the real lower bounds on cold-start + // latency, not CP reconciler step size. + // + // Scale-DOWN is intentionally untouched here — the idle reaper still + // runs on its own slower cadence so dipping idle counts don't thrash + // the pool. We only ever scale UP off the demand counter. + staticTarget := router.sharedPool.WarmCapacityTarget() + demand := router.sharedPool.ConsumeWarmCapacityDemand() + target := staticTarget + demand if target > 0 { + if demand > 0 { + slog.Info("Scaling shared warm pool to absorb demand.", + "static_target", staticTarget, + "observed_demand", demand, + "effective_target", target) + } observeOrgWorkerSpawn("shared") if err := router.sharedPool.SpawnMinWorkers(target); err != nil { slog.Warn("Janitor failed to reconcile shared warm capacity.", "target", target, "error", err)