Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type K8sConfig struct {
WorkerConfigMap string // ConfigMap name for duckgres.yaml
ImagePullPolicy string // Image pull policy for worker pods (e.g., "Never", "IfNotPresent", "Always")
ServiceAccount string // Neutral ServiceAccount name for worker pods (default: "duckgres-worker")
MaxWorkers int // Global cap for the shared K8s worker pool (0 = auto-derived)
MaxWorkers int // Global cap for the shared K8s worker pool (0 = unbounded; cluster autoscaler is the natural ceiling)
SharedWarmTarget int // Neutral shared warm-worker target for K8s multi-tenant mode (0 = disabled)
WorkerCPURequest string // CPU request for worker pods (e.g., "500m")
WorkerMemoryRequest string // Memory request for worker pods (e.g., "1Gi")
Expand Down Expand Up @@ -346,18 +346,21 @@ func RunControlPlane(cfg ControlPlaneConfig) {
memBudget := server.ParseMemoryBytes(cfg.MemoryBudget)

// Use a temporary rebalancer to auto-detect the budget and derive
// backend-specific default max_workers if not explicitly set.
// process-mode default max_workers if not explicitly set. The K8s
// backend does NOT derive max_workers from this budget — worker pods
// run on separate nodes, so the CP's RAM tells us nothing about how
// many worker pods the cluster can host. For K8s mode, MaxWorkers=0
// means "unbounded" and the cluster's NodePool/autoscaler is the
// natural ceiling.
tempRebalancer := NewMemoryRebalancer(memBudget, 0, nil, false)
memBudget = tempRebalancer.memoryBudget // capture auto-detected value

processMaxWorkers := cfg.Process.MaxWorkers
if processMaxWorkers == 0 {
processMaxWorkers = tempRebalancer.DefaultMaxWorkers()
}
// K8s max_workers: 0 = unbounded (no cap derived from CP memory).
k8sMaxWorkers := cfg.K8s.MaxWorkers
if k8sMaxWorkers == 0 {
k8sMaxWorkers = tempRebalancer.DefaultMaxWorkers()
}

rebalancer := NewMemoryRebalancer(memBudget, 0, nil, cfg.MemoryRebalance)

Expand All @@ -366,10 +369,8 @@ func RunControlPlane(cfg ControlPlaneConfig) {
"process_max_workers", processMaxWorkers,
"memory_budget", formatBytes(memBudget))
}
if isK8s && cfg.K8s.MaxWorkers == 0 {
slog.Info("Derived k8s.max_workers from memory budget.",
"k8s_max_workers", k8sMaxWorkers,
"memory_budget", formatBytes(memBudget))
if isK8s && k8sMaxWorkers == 0 {
slog.Info("k8s.max_workers unset; worker pool is unbounded — cluster autoscaler (e.g. Karpenter) is the ceiling.")
}

processMinWorkers := cfg.Process.MinWorkers
Expand All @@ -381,7 +382,10 @@ func RunControlPlane(cfg ControlPlaneConfig) {
}

k8sSharedWarmTarget := cfg.K8s.SharedWarmTarget
if isK8s && k8sSharedWarmTarget > k8sMaxWorkers {
// Only cap the warm target if k8sMaxWorkers is an actual upper bound
// (>0). When k8sMaxWorkers == 0 the pool is unbounded and the warm
// target stands on its own.
if isK8s && k8sMaxWorkers > 0 && k8sSharedWarmTarget > k8sMaxWorkers {
slog.Warn("k8s.shared_warm_target exceeds k8s.max_workers; capping to k8s.max_workers.",
"k8s_shared_warm_target", k8sSharedWarmTarget,
"k8s_max_workers", k8sMaxWorkers)
Expand Down
47 changes: 46 additions & 1 deletion controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"crypto/x509"
Expand Down Expand Up @@ -98,6 +99,14 @@ type K8sWorkerPool struct {
runtimeStore RuntimeWorkerStore

activatingTimeout time.Duration // max time a worker can stay in reserved/activating before being reaped

// warmCapacityMisses counts ReserveSharedWorker calls that returned
// WarmCapacityExhaustedError since the last ConsumeWarmCapacityDemand call.
// The janitor's warm-capacity reconciler drains this counter each tick and
// scales the warm pool to absorb the observed demand in one shot, rather
// than creeping up at the static SharedWarmTarget floor while cold tenants
// retry on 45-second backoffs. Atomically accessed; no lock needed.
warmCapacityMisses atomic.Int64
}

// NewK8sWorkerPool creates a K8sWorkerPool using in-cluster credentials.
Expand Down Expand Up @@ -145,7 +154,14 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) (
}

// Limit concurrent K8s API calls to avoid overwhelming the API server.
spawnConcurrency := 3
// The K8s client above is configured for QPS=50 / Burst=100, so 50 in-flight
// pod creates fits comfortably within client-side throttling while letting
// the warm-pool reconciler close large demand gaps (e.g. 30-tenant cold
// ramp) in one tick instead of creeping up at 3-per-pod-startup. Pod
// scheduling latency on the node side is bounded by Karpenter (~30s) and
// the kubelet, neither of which cares about how many pods we create
// in parallel from the CP.
spawnConcurrency := 50
retireConcurrency := 5
pool := &K8sWorkerPool{
workers: make(map[int]*ManagedWorker),
Expand Down Expand Up @@ -1388,6 +1404,15 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor
continue
}

// Surface unmet demand to the warm-pool reconciler. An OrgCap miss
// is a per-org cap, not a shared-pool shortage, so spawning more
// neutral warm pods won't help — skip those. Any other miss
// (NoIdle, etc.) means the shared warm pool was too small for the
// observed burst; bump the demand counter so the next reconcile
// tick scales toward `idle + recent_misses` in one shot.
if missReason != configstore.WorkerClaimMissReasonOrgCap {
p.warmCapacityMisses.Add(1)
}
return nil, NewWarmCapacityExhaustedErrorForReason(missReason, DefaultWarmCapacityRetryAfter)
}

Expand Down Expand Up @@ -1453,6 +1478,10 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor

p.mu.Unlock()

// Single-CP / no-runtime-store mode: same demand signal as above. The
// next janitor tick scales the warm pool toward absorbed demand
// instead of staying at the static SharedWarmTarget floor.
p.warmCapacityMisses.Add(1)
return nil, NewWarmCapacityExhaustedError(DefaultWarmCapacityRetryAfter)
}
}
Expand Down Expand Up @@ -2872,6 +2901,22 @@ func (p *K8sWorkerPool) WarmCapacityTarget() int {
return p.minWorkers
}

// ConsumeWarmCapacityDemand returns the number of ReserveSharedWorker calls
// that have hit WarmCapacityExhausted (excluding per-org cap misses) since
// the last call, atomically resetting the counter. The warm-pool reconciler
// adds this to the static target each tick so a cold burst of N tenants
// scales the pool toward N workers in one or two ticks, instead of creeping
// up while clients re-arrive on their 45-second retry hints. Scale-down
// stays under the idle reaper's slower cadence so steady-state idle dips
// don't thrash the pool.
func (p *K8sWorkerPool) ConsumeWarmCapacityDemand() int {
n := p.warmCapacityMisses.Swap(0)
if n < 0 {
return 0
}
return int(n)
}

// SetPerImageWarmTargets replaces the per-image warm-worker floor. Each entry
// asks the pool to keep at least N warm-idle workers running with the given
// image. This is layered on top of SetWarmCapacityTarget — the per-image floor
Expand Down
107 changes: 107 additions & 0 deletions controlplane/k8s_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"regexp"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -3364,3 +3365,109 @@ func TestReapIdleWorkersSkipsWorkersWithinIdleTimeout(t *testing.T) {
t.Errorf("expected both workers to survive (under idleTimeout); got %d remaining", len(pool.workers))
}
}

// TestK8sPoolWarmCapacityDemandScalesPoolInOneTick reproduces the 30-tenant
// cold-ramp regression: with the static SharedWarmTarget set low (5) and 30
// orgs racing in simultaneously, the warm pool used to creep up by ~2-3
// workers per janitor tick because the reconciler only ever filled to the
// static floor. The demand counter on ReserveSharedWorker now surfaces the
// observed shortage so a single reconcile pass scales to ~staticTarget +
// observed_demand = 35, closing the gap in one tick instead of many.
//
// Drives 30 concurrent ReserveSharedWorker calls against a store that
// always returns NoIdle, then simulates one janitor tick by calling
// SpawnMinWorkers(static + ConsumeWarmCapacityDemand()) and asserts the
// pool tried to spawn ~30 fresh slots in that single tick.
func TestK8sPoolWarmCapacityDemandScalesPoolInOneTick(t *testing.T) {
pool, _ := newTestK8sPool(t, 0) // unbounded — matches the post-cap K8s mode
store := &captureRuntimeWorkerStore{
claimMissReason: configstore.WorkerClaimMissReasonNoIdle,
}
pool.runtimeStore = store

// Allocate distinct worker IDs for each spawn slot the reconciler will
// request. Real ConfigStore generates these from a sequence; here we
// just hand back monotonically increasing ids so the parallel spawn
// fan-out has unique records to drive.
var slotIdx int64
store.neutralSpawnedFunc = func() *configstore.WorkerRecord {
id := int(atomic.AddInt64(&slotIdx, 1))
return &configstore.WorkerRecord{
WorkerID: 1000 + id,
PodName: fmt.Sprintf("duckgres-worker-test-cp-%d", 1000+id),
State: configstore.WorkerStateSpawning,
OwnerCPInstanceID: pool.cpInstanceID,
}
}

// Capture every spawn the reconciler kicks off. We don't actually start
// pods — the fake clientset can't, and the test only cares that the
// reconciler asked for ~30 spawns in one pass.
var spawnedMu sync.Mutex
var spawnedIDs []int
pool.spawnWarmWorkerFunc = func(ctx context.Context, id int) error {
spawnedMu.Lock()
spawnedIDs = append(spawnedIDs, id)
spawnedMu.Unlock()
return nil
}

// Step 1: a static warm-pool floor (the operator-configured
// SharedWarmTarget) — matches the production "5 initial warm" baseline.
const staticTarget = 5
pool.SetWarmCapacityTarget(staticTarget)

// Step 2: 30 cold tenants race in simultaneously. Each ReserveSharedWorker
// claim misses (store returns NoIdle), each call records one demand miss.
const burst = 30
var reserveWG sync.WaitGroup
reserveWG.Add(burst)
for i := 0; i < burst; i++ {
go func(i int) {
defer reserveWG.Done()
worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{
OrgID: fmt.Sprintf("org-%d", i),
Image: pool.workerImage,
})
var capacityErr *WarmCapacityExhaustedError
if !errors.As(err, &capacityErr) {
t.Errorf("org-%d: expected WarmCapacityExhausted, got worker=%v err=%v", i, worker, err)
}
}(i)
}
reserveWG.Wait()

// Step 3: drain the demand counter (this is what the janitor's
// reconcileWarmCapacity closure does each tick).
demand := pool.ConsumeWarmCapacityDemand()
if demand != burst {
t.Fatalf("expected ConsumeWarmCapacityDemand=%d, got %d", burst, demand)
}
// And confirm the counter actually reset — a second read must see zero.
if again := pool.ConsumeWarmCapacityDemand(); again != 0 {
t.Fatalf("expected counter reset to 0 after consume, got %d", again)
}

// Step 4: one janitor tick. Effective target = static + observed demand.
effectiveTarget := staticTarget + demand
if err := pool.SpawnMinWorkers(effectiveTarget); err != nil {
t.Fatalf("SpawnMinWorkers(%d): %v", effectiveTarget, err)
}

// Step 5: the reconciler must have asked for ~`effectiveTarget` fresh
// slots in this single tick — not crept up by 2-3 per minute the way it
// did before the demand counter was wired up. The exact count depends
// on whether any pre-existing workers were idle (none here, so it
// should be exactly effectiveTarget = 35).
spawnedMu.Lock()
got := len(spawnedIDs)
spawnedMu.Unlock()
if got != effectiveTarget {
t.Fatalf("expected %d spawns in one tick (static %d + demand %d), got %d",
effectiveTarget, staticTarget, demand, got)
}
if store.neutralSpawnCalls != effectiveTarget {
t.Fatalf("expected %d neutral spawn slot allocations, got %d",
effectiveTarget, store.neutralSpawnCalls)
}
}
22 changes: 21 additions & 1 deletion controlplane/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,28 @@ func SetupMultiTenant(
return router.sharedPool.retireWorkerWithReason(workerID, reason)
}
janitor.reconcileWarmCapacity = func() {
target := router.sharedPool.WarmCapacityTarget()
// Demand-aware warm-pool scale-up: drain the warm-capacity miss
// counter and add it to the static target. A burst of N cold tenants
// (all returning WarmCapacityExhausted on first try) bumps the next
// tick's effective target by ~N, so the pool spawns toward demand
// in a single tick instead of creeping up at SharedWarmTarget while
// clients politely re-arrive on the 45-second retry hint. Karpenter
// (~30s) and pod scheduling are the real lower bounds on cold-start
// latency, not CP reconciler step size.
//
// Scale-DOWN is intentionally untouched here — the idle reaper still
// runs on its own slower cadence so dipping idle counts don't thrash
// the pool. We only ever scale UP off the demand counter.
staticTarget := router.sharedPool.WarmCapacityTarget()
demand := router.sharedPool.ConsumeWarmCapacityDemand()
target := staticTarget + demand
if target > 0 {
if demand > 0 {
slog.Info("Scaling shared warm pool to absorb demand.",
"static_target", staticTarget,
"observed_demand", demand,
"effective_target", target)
}
observeOrgWorkerSpawn("shared")
if err := router.sharedPool.SpawnMinWorkers(target); err != nil {
slog.Warn("Janitor failed to reconcile shared warm capacity.", "target", target, "error", err)
Expand Down
52 changes: 52 additions & 0 deletions controlplane/org_reserved_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,58 @@ func TestOrgReservedWorkerPoolAcquireDelegatesActivationWithoutCachedTenantRunti
}
}

// TestOrgReservedPoolAcquireUnboundedWhenMaxWorkersZero confirms that
// MaxWorkers == 0 means "no per-org cap" in K8s mode. The load test that
// motivated the cap removal stabilised at 11 workers because the CP
// derived MaxWorkers from CP host memory; the only thing keeping us from
// scaling further was this synthetic cap. With MaxWorkers == 0, the pool
// must keep handing out new workers as long as the shared (cluster) pool
// has room.
func TestOrgReservedPoolAcquireUnboundedWhenMaxWorkersZero(t *testing.T) {
shared, _ := newTestK8sPool(t, 0) // shared pool also unbounded
shared.healthCheckFunc = func(ctx context.Context, worker *ManagedWorker) error {
return nil
}
// Pre-seed many neutral warm workers so AcquireWorker can reserve
// each one in turn without blocking on a real spawn path.
const target = 30
for i := 1; i <= target; i++ {
addNeutralWarmWorker(shared, i)
}
shared.spawnWarmWorkerFunc = func(ctx context.Context, id int) error {
shared.mu.Lock()
shared.workers[id] = &ManagedWorker{ID: id, image: shared.workerImage, done: make(chan struct{})}
shared.mu.Unlock()
return nil
}

// maxWorkers = 0 — the change under test. AcquireWorker must NOT
// reject on max-workers grounds.
pool := NewOrgReservedPool(shared, "analytics", 0, shared.workerImage, nil)
pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker) error {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

seen := make(map[int]struct{}, target)
for i := 0; i < target; i++ {
w, err := pool.AcquireWorker(ctx)
if err != nil {
t.Fatalf("AcquireWorker[%d] failed with maxWorkers=0: %v", i, err)
}
if _, dup := seen[w.ID]; dup {
t.Fatalf("AcquireWorker[%d] returned duplicate worker ID %d", i, w.ID)
}
seen[w.ID] = struct{}{}
}

if got := pool.assignedWorkerCountLocked(); got < target {
t.Fatalf("expected at least %d assigned workers, got %d", target, got)
}
}

func TestOrgReservedPoolAcquireWaitsWhenSharedWarmWorkerBusyAtCapacity(t *testing.T) {
shared, _ := newTestK8sPool(t, 5)
worker := &ManagedWorker{ID: 3, activeSessions: 1, done: make(chan struct{})}
Expand Down
Loading