From 4d950097631b470476c7c726f27b41540e31c24f Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 21 May 2026 16:10:09 -0400 Subject: [PATCH 1/2] Wire dynamic warm capacity reconciliation --- README.md | 21 ++ configresolve/cliflags.go | 12 ++ configresolve/cliflags_test.go | 6 + configresolve/resolve.go | 156 +++++++++------ configresolve/resolve_k8s_test.go | 50 +++-- controlplane/k8s_pool.go | 25 --- controlplane/k8s_pool_test.go | 107 ----------- controlplane/multitenant.go | 110 ++++++----- .../multitenant_warm_capacity_test.go | 180 ++++++++++++++++++ duckgres.example.yaml | 9 + main.go | 6 + 11 files changed, 423 insertions(+), 259 deletions(-) create mode 100644 controlplane/multitenant_warm_capacity_test.go diff --git a/README.md b/README.md index 3631954..a83503d 100644 --- a/README.md +++ b/README.md @@ -224,6 +224,12 @@ Run with config file: | `DUCKGRES_MANAGED_HOSTNAME_SUFFIXES` | Comma-separated managed hostname suffixes such as `.dw.us.postwh.com` | - | | `DUCKGRES_K8S_MAX_WORKERS` | Global cap for shared K8s workers (`0` means Duckgres does not impose a cap) | `0` | | `DUCKGRES_K8S_SHARED_WARM_TARGET` | Default-image neutral shared warm-worker target for K8s multi-tenant mode (`0` disables prewarm; subject to `DUCKGRES_K8S_MAX_WORKERS`) | `0` | +| `DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED` | Enable configstore-driven dynamic warm-capacity target computation from recent no-idle misses | `true` | +| `DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW` | Recent no-idle miss window used for dynamic warm-capacity demand | `2m` | +| `DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER` | Recent misses required for one extra dynamic warm worker | `8` | +| `DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL` | Retention TTL for warm-capacity miss buckets; clamped to at least the miss window | `15m` | +| `DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING` | Max dynamic extra warm workers per image (`0` means unlimited) | `0` | +| `DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_TOTAL_CEILING` | Max dynamic extra warm workers across images (`0` means unlimited) | `0` | | `DUCKGRES_DUCKLAKE_METADATA_STORE` | DuckLake metadata connection string | - | | `DUCKGRES_DUCKLAKE_DELTA_CATALOG_ENABLED` | Attach a Delta Lake catalog/table during worker boot/activation | `false` | | `DUCKGRES_DUCKLAKE_DELTA_CATALOG_PATH` | Delta Lake catalog/table path; defaults to sibling `delta/` prefix at the DuckLake object-store root when enabled | Derived | @@ -283,6 +289,21 @@ Options: -sni-routing-mode string Hostname routing: off, passthrough, or enforce -managed-hostname-suffixes string Comma-separated managed tenant hostname suffixes + -k8s-max-workers int Max K8s workers in the shared pool, 0=unbounded + -k8s-shared-warm-target int + Neutral shared warm-worker target for K8s multi-tenant mode, 0=disabled + -k8s-dynamic-warm-capacity-enabled + Enable configstore-driven dynamic warm-capacity target computation (default true) + -k8s-warm-capacity-miss-window string + Recent no-idle miss window for dynamic warm-capacity demand (default 2m) + -k8s-warm-capacity-misses-per-worker int + Recent misses required for one extra dynamic warm worker (default 8) + -k8s-warm-capacity-demand-ttl string + Retention TTL for warm-capacity miss buckets (default 15m) + -k8s-warm-capacity-dynamic-image-ceiling int + Max dynamic extra warm workers per image, 0=unlimited + -k8s-warm-capacity-dynamic-total-ceiling int + Max dynamic extra warm workers across images, 0=unlimited ``` ## DuckDB Extensions diff --git a/configresolve/cliflags.go b/configresolve/cliflags.go index f4816f0..6fe2e5c 100644 --- a/configresolve/cliflags.go +++ b/configresolve/cliflags.go @@ -75,6 +75,12 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs { k8sWorkerServiceAccount := fs.String("k8s-worker-service-account", "", "Neutral ServiceAccount name for K8s worker pods (default: duckgres-worker) (env: DUCKGRES_K8S_WORKER_SERVICE_ACCOUNT)") k8sMaxWorkers := fs.Int("k8s-max-workers", 0, "Max K8s workers in the shared pool, 0=unbounded (env: DUCKGRES_K8S_MAX_WORKERS)") k8sSharedWarmTarget := fs.Int("k8s-shared-warm-target", 0, "Neutral shared warm-worker target for K8s multi-tenant mode, 0=disabled (env: DUCKGRES_K8S_SHARED_WARM_TARGET)") + k8sDynamicWarmCapacityEnabled := fs.Bool("k8s-dynamic-warm-capacity-enabled", true, "Enable configstore-driven dynamic warm-capacity target computation (default true; use --k8s-dynamic-warm-capacity-enabled=false to disable; env: DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED)") + k8sWarmCapacityMissWindow := fs.String("k8s-warm-capacity-miss-window", "", "Recent no-idle miss window for dynamic warm-capacity demand (default: 2m) (env: DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW)") + k8sWarmCapacityMissesPerWorker := fs.Int("k8s-warm-capacity-misses-per-worker", 0, "Recent misses required for one extra dynamic warm worker (default: 8) (env: DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER)") + k8sWarmCapacityDemandTTL := fs.String("k8s-warm-capacity-demand-ttl", "", "Retention TTL for warm-capacity miss buckets (default: 15m) (env: DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL)") + k8sWarmCapacityDynamicImageCeiling := fs.Int("k8s-warm-capacity-dynamic-image-ceiling", 0, "Max dynamic extra warm workers per image, 0=unlimited (env: DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING)") + k8sWarmCapacityDynamicTotalCeiling := fs.Int("k8s-warm-capacity-dynamic-total-ceiling", 0, "Max dynamic extra warm workers across images, 0=unlimited (env: DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_TOTAL_CEILING)") awsRegion := fs.String("aws-region", "", "AWS region for STS client (env: DUCKGRES_AWS_REGION)") queryLog := fs.Bool("query-log", true, "Enable/disable DuckLake query log (use --query-log=false to disable; env: DUCKGRES_QUERY_LOG_ENABLED)") @@ -136,6 +142,12 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs { cli.K8sWorkerServiceAccount = *k8sWorkerServiceAccount cli.K8sMaxWorkers = *k8sMaxWorkers cli.K8sSharedWarmTarget = *k8sSharedWarmTarget + cli.K8sDynamicWarmCapacityEnabled = *k8sDynamicWarmCapacityEnabled + cli.K8sWarmCapacityMissWindow = *k8sWarmCapacityMissWindow + cli.K8sWarmCapacityMissesPerWorker = *k8sWarmCapacityMissesPerWorker + cli.K8sWarmCapacityDemandTTL = *k8sWarmCapacityDemandTTL + cli.K8sWarmCapacityDynamicImageCeiling = *k8sWarmCapacityDynamicImageCeiling + cli.K8sWarmCapacityDynamicTotalCeiling = *k8sWarmCapacityDynamicTotalCeiling cli.AWSRegion = *awsRegion cli.QueryLog = *queryLog return cli diff --git a/configresolve/cliflags_test.go b/configresolve/cliflags_test.go index 4181e35..0e8c6b2 100644 --- a/configresolve/cliflags_test.go +++ b/configresolve/cliflags_test.go @@ -46,6 +46,12 @@ func fieldNameToFlagName(name string) string { {"K8sWorkerNamespace", "k8s-worker-namespace"}, {"K8sWorkerConfigMap", "k8s-worker-configmap"}, {"K8sSharedWarmTarget", "k8s-shared-warm-target"}, + {"K8sDynamicWarmCapacityEnabled", "k8s-dynamic-warm-capacity-enabled"}, + {"K8sWarmCapacityMissWindow", "k8s-warm-capacity-miss-window"}, + {"K8sWarmCapacityMissesPerWorker", "k8s-warm-capacity-misses-per-worker"}, + {"K8sWarmCapacityDemandTTL", "k8s-warm-capacity-demand-ttl"}, + {"K8sWarmCapacityDynamicImageCeiling", "k8s-warm-capacity-dynamic-image-ceiling"}, + {"K8sWarmCapacityDynamicTotalCeiling", "k8s-warm-capacity-dynamic-total-ceiling"}, {"K8sWorkerImage", "k8s-worker-image"}, {"K8sWorkerSecret", "k8s-worker-secret"}, {"K8sWorkerPort", "k8s-worker-port"}, diff --git a/configresolve/resolve.go b/configresolve/resolve.go index 7c0640a..a37a76d 100644 --- a/configresolve/resolve.go +++ b/configresolve/resolve.go @@ -27,67 +27,73 @@ import ( type CLIInputs struct { Set map[string]bool - Host string - Port int - FlightPort int - FlightSessionIdleTTL string - FlightSessionReapInterval string - FlightHandleIdleTTL string - FlightSessionTokenTTL string - DataDir string - CertFile string - KeyFile string - FilePersistence bool - ProcessIsolation bool - IdleTimeout string - SessionInitTimeout string - MemoryLimit string - Threads int - MemoryBudget string - MemoryRebalance bool - DuckLakeDeltaCatalogEnabled bool - DuckLakeDeltaCatalogPath string - DuckLakeDefaultSpecVersion string - IcebergEnabled bool - IcebergTableBucket string - IcebergRegion string - IcebergNamespace string - ProcessMinWorkers int - ProcessMaxWorkers int - ProcessRetireOnSessionEnd bool - WorkerQueueTimeout string - WorkerIdleTimeout string - HandoverDrainTimeout string - ACMEDomain string - ACMEEmail string - ACMECacheDir string - ACMEDNSProvider string - ACMEDNSZoneID string - MaxConnections int - ConfigStoreConn string - ConfigPollInterval string - InternalSecret string - SNIRoutingMode string - ManagedHostnameSuffixes string - WorkerBackend string - K8sWorkerImage string - K8sWorkerNamespace string - K8sControlPlaneID string - K8sWorkerPort int - K8sWorkerSecret string - K8sWorkerConfigMap string - K8sWorkerImagePullPolicy string - K8sWorkerServiceAccount string - K8sMaxWorkers int - K8sSharedWarmTarget int - K8sWorkerCPURequest string - K8sWorkerMemoryRequest string - K8sWorkerNodeSelector string - K8sWorkerTolerationKey string - K8sWorkerTolerationValue string - K8sWorkerExclusiveNode bool - AWSRegion string - QueryLog bool + Host string + Port int + FlightPort int + FlightSessionIdleTTL string + FlightSessionReapInterval string + FlightHandleIdleTTL string + FlightSessionTokenTTL string + DataDir string + CertFile string + KeyFile string + FilePersistence bool + ProcessIsolation bool + IdleTimeout string + SessionInitTimeout string + MemoryLimit string + Threads int + MemoryBudget string + MemoryRebalance bool + DuckLakeDeltaCatalogEnabled bool + DuckLakeDeltaCatalogPath string + DuckLakeDefaultSpecVersion string + IcebergEnabled bool + IcebergTableBucket string + IcebergRegion string + IcebergNamespace string + ProcessMinWorkers int + ProcessMaxWorkers int + ProcessRetireOnSessionEnd bool + WorkerQueueTimeout string + WorkerIdleTimeout string + HandoverDrainTimeout string + ACMEDomain string + ACMEEmail string + ACMECacheDir string + ACMEDNSProvider string + ACMEDNSZoneID string + MaxConnections int + ConfigStoreConn string + ConfigPollInterval string + InternalSecret string + SNIRoutingMode string + ManagedHostnameSuffixes string + WorkerBackend string + K8sWorkerImage string + K8sWorkerNamespace string + K8sControlPlaneID string + K8sWorkerPort int + K8sWorkerSecret string + K8sWorkerConfigMap string + K8sWorkerImagePullPolicy string + K8sWorkerServiceAccount string + K8sMaxWorkers int + K8sSharedWarmTarget int + K8sDynamicWarmCapacityEnabled bool + K8sWarmCapacityMissWindow string + K8sWarmCapacityMissesPerWorker int + K8sWarmCapacityDemandTTL string + K8sWarmCapacityDynamicImageCeiling int + K8sWarmCapacityDynamicTotalCeiling int + K8sWorkerCPURequest string + K8sWorkerMemoryRequest string + K8sWorkerNodeSelector string + K8sWorkerTolerationKey string + K8sWorkerTolerationValue string + K8sWorkerExclusiveNode bool + AWSRegion string + QueryLog bool } type Resolved struct { @@ -195,7 +201,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu var k8sWorkerSecret, k8sWorkerConfigMap, k8sWorkerImagePullPolicy string k8sWorkerServiceAccount := controlplane.DefaultK8sWorkerServiceAccount var k8sMaxWorkers, k8sSharedWarmTarget int - var k8sDynamicWarmCapacityEnabled bool + k8sDynamicWarmCapacityEnabled := true k8sWarmCapacityMissWindow := controlplane.DefaultWarmCapacityMissWindow k8sWarmCapacityMissesPerWorker := controlplane.DefaultWarmCapacityMissesPerWorker k8sWarmCapacityDemandTTL := controlplane.DefaultWarmCapacityDemandTTL @@ -1155,6 +1161,32 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu if cli.Set["k8s-shared-warm-target"] { k8sSharedWarmTarget = cli.K8sSharedWarmTarget } + if cli.Set["k8s-dynamic-warm-capacity-enabled"] { + k8sDynamicWarmCapacityEnabled = cli.K8sDynamicWarmCapacityEnabled + } + if cli.Set["k8s-warm-capacity-miss-window"] { + if d, err := time.ParseDuration(cli.K8sWarmCapacityMissWindow); err == nil { + k8sWarmCapacityMissWindow = d + } else { + warn("Invalid --k8s-warm-capacity-miss-window duration: " + err.Error()) + } + } + if cli.Set["k8s-warm-capacity-misses-per-worker"] { + k8sWarmCapacityMissesPerWorker = cli.K8sWarmCapacityMissesPerWorker + } + if cli.Set["k8s-warm-capacity-demand-ttl"] { + if d, err := time.ParseDuration(cli.K8sWarmCapacityDemandTTL); err == nil { + k8sWarmCapacityDemandTTL = d + } else { + warn("Invalid --k8s-warm-capacity-demand-ttl duration: " + err.Error()) + } + } + if cli.Set["k8s-warm-capacity-dynamic-image-ceiling"] { + k8sWarmCapacityDynamicImageCeiling = cli.K8sWarmCapacityDynamicImageCeiling + } + if cli.Set["k8s-warm-capacity-dynamic-total-ceiling"] { + k8sWarmCapacityDynamicTotalCeiling = cli.K8sWarmCapacityDynamicTotalCeiling + } if cli.Set["aws-region"] { awsRegion = cli.AWSRegion } diff --git a/configresolve/resolve_k8s_test.go b/configresolve/resolve_k8s_test.go index 2a94098..c290f59 100644 --- a/configresolve/resolve_k8s_test.go +++ b/configresolve/resolve_k8s_test.go @@ -33,8 +33,8 @@ func TestResolveEffectiveExposesDuckLakeDefaultSpecVersionForControlPlane(t *tes func TestResolveEffectiveDefaultsK8sDynamicWarmCapacityConfig(t *testing.T) { resolved := ResolveEffective(nil, CLIInputs{}, nil, nil) - if resolved.K8sDynamicWarmCapacityEnabled { - t.Fatal("expected dynamic warm capacity to default disabled") + if !resolved.K8sDynamicWarmCapacityEnabled { + t.Fatal("expected dynamic warm capacity to default enabled") } if resolved.K8sWarmCapacityMissWindow != controlplane.DefaultWarmCapacityMissWindow { t.Fatalf("expected miss window %s, got %s", controlplane.DefaultWarmCapacityMissWindow, resolved.K8sWarmCapacityMissWindow) @@ -53,8 +53,8 @@ func TestResolveEffectiveDefaultsK8sDynamicWarmCapacityConfig(t *testing.T) { } } -func TestResolveEffectiveK8sDynamicWarmCapacityEnvOverridesFile(t *testing.T) { - fileEnabled := false +func TestResolveEffectiveK8sDynamicWarmCapacityPrecedence(t *testing.T) { + fileEnabled := true fileCfg := &configloader.FileConfig{ K8s: configloader.K8sFileConfig{ DynamicWarmCapacityEnabled: &fileEnabled, @@ -73,28 +73,44 @@ func TestResolveEffectiveK8sDynamicWarmCapacityEnvOverridesFile(t *testing.T) { "DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING": "3", "DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_TOTAL_CEILING": "5", } + cli := CLIInputs{ + Set: map[string]bool{ + "k8s-dynamic-warm-capacity-enabled": true, + "k8s-warm-capacity-miss-window": true, + "k8s-warm-capacity-misses-per-worker": true, + "k8s-warm-capacity-demand-ttl": true, + "k8s-warm-capacity-dynamic-image-ceiling": true, + "k8s-warm-capacity-dynamic-total-ceiling": true, + }, + K8sDynamicWarmCapacityEnabled: false, + K8sWarmCapacityMissWindow: "5m", + K8sWarmCapacityMissesPerWorker: 7, + K8sWarmCapacityDemandTTL: "14m", + K8sWarmCapacityDynamicImageCeiling: 8, + K8sWarmCapacityDynamicTotalCeiling: 9, + } - resolved := ResolveEffective(fileCfg, CLIInputs{}, func(key string) string { + resolved := ResolveEffective(fileCfg, cli, func(key string) string { return env[key] }, nil) - if !resolved.K8sDynamicWarmCapacityEnabled { - t.Fatal("expected env true to override file false") + if resolved.K8sDynamicWarmCapacityEnabled { + t.Fatal("expected explicit CLI false to override file/env true") } - if resolved.K8sWarmCapacityMissWindow != 4*time.Minute { - t.Fatalf("expected miss window 4m, got %s", resolved.K8sWarmCapacityMissWindow) + if resolved.K8sWarmCapacityMissWindow != 5*time.Minute { + t.Fatalf("expected miss window 5m, got %s", resolved.K8sWarmCapacityMissWindow) } - if resolved.K8sWarmCapacityMissesPerWorker != 6 { - t.Fatalf("expected misses per worker 6, got %d", resolved.K8sWarmCapacityMissesPerWorker) + if resolved.K8sWarmCapacityMissesPerWorker != 7 { + t.Fatalf("expected misses per worker 7, got %d", resolved.K8sWarmCapacityMissesPerWorker) } - if resolved.K8sWarmCapacityDemandTTL != 13*time.Minute { - t.Fatalf("expected demand TTL 13m, got %s", resolved.K8sWarmCapacityDemandTTL) + if resolved.K8sWarmCapacityDemandTTL != 14*time.Minute { + t.Fatalf("expected demand TTL 14m, got %s", resolved.K8sWarmCapacityDemandTTL) } - if resolved.K8sWarmCapacityDynamicImageCeiling != 3 { - t.Fatalf("expected image ceiling 3, got %d", resolved.K8sWarmCapacityDynamicImageCeiling) + if resolved.K8sWarmCapacityDynamicImageCeiling != 8 { + t.Fatalf("expected image ceiling 8, got %d", resolved.K8sWarmCapacityDynamicImageCeiling) } - if resolved.K8sWarmCapacityDynamicTotalCeiling != 5 { - t.Fatalf("expected total ceiling 5, got %d", resolved.K8sWarmCapacityDynamicTotalCeiling) + if resolved.K8sWarmCapacityDynamicTotalCeiling != 9 { + t.Fatalf("expected total ceiling 9, got %d", resolved.K8sWarmCapacityDynamicTotalCeiling) } } diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index a7d3414..e897e06 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -15,7 +15,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "crypto/x509" @@ -100,13 +99,6 @@ type K8sWorkerPool struct { activatingTimeout time.Duration // max time a worker can stay in reserved/activating before being reaped - // warmCapacityMisses counts ReserveSharedWorker calls that returned - // WarmCapacityExhaustedError since the last ConsumeWarmCapacityDemand call. - // The janitor's warm-capacity reconciler drains this counter each tick and - // scales the warm pool to absorb the observed demand in one shot, rather - // than creeping up at the static SharedWarmTarget floor while cold tenants - // retry on 45-second backoffs. Atomically accessed; no lock needed. - warmCapacityMisses atomic.Int64 } // NewK8sWorkerPool creates a K8sWorkerPool using in-cluster credentials. @@ -1484,7 +1476,6 @@ func (p *K8sWorkerPool) recordWarmCapacityMiss(assignment *WorkerAssignment, rea if !policy.recordDynamicDemand { return } - p.warmCapacityMisses.Add(1) if p.runtimeStore == nil { return } @@ -2921,22 +2912,6 @@ func (p *K8sWorkerPool) WarmCapacityTarget() int { return p.minWorkers } -// ConsumeWarmCapacityDemand returns the number of ReserveSharedWorker calls -// that have hit WarmCapacityExhausted (excluding per-org cap misses) since -// the last call, atomically resetting the counter. The warm-pool reconciler -// adds this to the static target each tick so a cold burst of N tenants -// scales the pool toward N workers in one or two ticks, instead of creeping -// up while clients re-arrive on their 45-second retry hints. Scale-down -// stays under the idle reaper's slower cadence so steady-state idle dips -// don't thrash the pool. -func (p *K8sWorkerPool) ConsumeWarmCapacityDemand() int { - n := p.warmCapacityMisses.Swap(0) - if n < 0 { - return 0 - } - return int(n) -} - // SetPerImageWarmTargets replaces the per-image warm-worker floor. Each entry // asks the pool to keep at least N warm-idle workers running with the given // image. This is layered on top of SetWarmCapacityTarget — the per-image floor diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index a3e39c1..5f6ca44 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -9,7 +9,6 @@ import ( "regexp" "strconv" "sync" - "sync/atomic" "testing" "time" @@ -3541,109 +3540,3 @@ func TestReapIdleWorkersSkipsWorkersWithinIdleTimeout(t *testing.T) { t.Errorf("expected both workers to survive (under idleTimeout); got %d remaining", len(pool.workers)) } } - -// TestK8sPoolWarmCapacityDemandScalesPoolInOneTick reproduces the 30-tenant -// cold-ramp regression: with the static SharedWarmTarget set low (5) and 30 -// orgs racing in simultaneously, the warm pool used to creep up by ~2-3 -// workers per janitor tick because the reconciler only ever filled to the -// static floor. The demand counter on ReserveSharedWorker now surfaces the -// observed shortage so a single reconcile pass scales to ~staticTarget + -// observed_demand = 35, closing the gap in one tick instead of many. -// -// Drives 30 concurrent ReserveSharedWorker calls against a store that -// always returns NoIdle, then simulates one janitor tick by calling -// SpawnMinWorkers(static + ConsumeWarmCapacityDemand()) and asserts the -// pool tried to spawn ~30 fresh slots in that single tick. -func TestK8sPoolWarmCapacityDemandScalesPoolInOneTick(t *testing.T) { - pool, _ := newTestK8sPool(t, 0) // unbounded — matches the post-cap K8s mode - store := &captureRuntimeWorkerStore{ - claimMissReason: configstore.WorkerClaimMissReasonNoIdle, - } - pool.runtimeStore = store - - // Allocate distinct worker IDs for each spawn slot the reconciler will - // request. Real ConfigStore generates these from a sequence; here we - // just hand back monotonically increasing ids so the parallel spawn - // fan-out has unique records to drive. - var slotIdx int64 - store.neutralSpawnedFunc = func() *configstore.WorkerRecord { - id := int(atomic.AddInt64(&slotIdx, 1)) - return &configstore.WorkerRecord{ - WorkerID: 1000 + id, - PodName: fmt.Sprintf("duckgres-worker-test-cp-%d", 1000+id), - State: configstore.WorkerStateSpawning, - OwnerCPInstanceID: pool.cpInstanceID, - } - } - - // Capture every spawn the reconciler kicks off. We don't actually start - // pods — the fake clientset can't, and the test only cares that the - // reconciler asked for ~30 spawns in one pass. - var spawnedMu sync.Mutex - var spawnedIDs []int - pool.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { - spawnedMu.Lock() - spawnedIDs = append(spawnedIDs, id) - spawnedMu.Unlock() - return nil - } - - // Step 1: a static warm-pool floor (the operator-configured - // SharedWarmTarget) — matches the production "5 initial warm" baseline. - const staticTarget = 5 - pool.SetWarmCapacityTarget(staticTarget) - - // Step 2: 30 cold tenants race in simultaneously. Each ReserveSharedWorker - // claim misses (store returns NoIdle), each call records one demand miss. - const burst = 30 - var reserveWG sync.WaitGroup - reserveWG.Add(burst) - for i := 0; i < burst; i++ { - go func(i int) { - defer reserveWG.Done() - worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ - OrgID: fmt.Sprintf("org-%d", i), - Image: pool.workerImage, - }) - var capacityErr *WarmCapacityExhaustedError - if !errors.As(err, &capacityErr) { - t.Errorf("org-%d: expected WarmCapacityExhausted, got worker=%v err=%v", i, worker, err) - } - }(i) - } - reserveWG.Wait() - - // Step 3: drain the demand counter (this is what the janitor's - // reconcileWarmCapacity closure does each tick). - demand := pool.ConsumeWarmCapacityDemand() - if demand != burst { - t.Fatalf("expected ConsumeWarmCapacityDemand=%d, got %d", burst, demand) - } - // And confirm the counter actually reset — a second read must see zero. - if again := pool.ConsumeWarmCapacityDemand(); again != 0 { - t.Fatalf("expected counter reset to 0 after consume, got %d", again) - } - - // Step 4: one janitor tick. Effective target = static + observed demand. - effectiveTarget := staticTarget + demand - if err := pool.SpawnMinWorkers(effectiveTarget); err != nil { - t.Fatalf("SpawnMinWorkers(%d): %v", effectiveTarget, err) - } - - // Step 5: the reconciler must have asked for ~`effectiveTarget` fresh - // slots in this single tick — not crept up by 2-3 per minute the way it - // did before the demand counter was wired up. The exact count depends - // on whether any pre-existing workers were idle (none here, so it - // should be exactly effectiveTarget = 35). - spawnedMu.Lock() - got := len(spawnedIDs) - spawnedMu.Unlock() - if got != effectiveTarget { - t.Fatalf("expected %d spawns in one tick (static %d + demand %d), got %d", - effectiveTarget, staticTarget, demand, got) - } - if store.neutralSpawnCalls != effectiveTarget { - t.Fatalf("expected %d neutral spawn slot allocations, got %d", - effectiveTarget, store.neutralSpawnCalls) - } -} diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index 4890706..a550671 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -250,56 +250,22 @@ func SetupMultiTenant( return router.sharedPool.retireWorkerWithReason(workerID, reason) } janitor.reconcileWarmCapacity = func() { - // Demand-aware warm-pool scale-up: drain the warm-capacity miss - // counter and add it to the static target. A burst of N cold tenants - // (all returning WarmCapacityExhausted on first try) bumps the next - // tick's effective target by ~N, so the pool spawns toward demand - // in a single tick instead of creeping up at SharedWarmTarget while - // clients politely re-arrive on the 45-second retry hint. Karpenter - // (~30s) and pod scheduling are the real lower bounds on cold-start - // latency, not CP reconciler step size. - // - // Scale-DOWN is intentionally untouched here — the idle reaper still - // runs on its own slower cadence so dipping idle counts don't thrash - // the pool. We only ever scale UP off the demand counter. - staticTarget := router.sharedPool.WarmCapacityTarget() - demand := router.sharedPool.ConsumeWarmCapacityDemand() - target := staticTarget + demand - if target > 0 { - if demand > 0 { - slog.Info("Scaling shared warm pool to absorb demand.", - "static_target", staticTarget, - "observed_demand", demand, - "effective_target", target) - } - observeOrgWorkerSpawn("shared") - if err := router.sharedPool.SpawnMinWorkers(target); err != nil { - slog.Warn("Janitor failed to reconcile shared warm capacity.", "target", target, "error", err) - } + snap := store.Snapshot() + if snap == nil { + return } - - // Per-image warm floor: ensure at least one warm pod for each - // distinct DuckDB version image in active use, so per-org pins - // always find a hot pod waiting instead of paying a cold spawn. - // Run in parallel — SpawnMinWorkersForImage blocks on pod startup, - // and a slow image must not block the others (and the next janitor - // tick) for N×30s. - perImage := router.sharedPool.PerImageWarmTargets() - if len(perImage) > 0 { - var wg sync.WaitGroup - for image, count := range perImage { - wg.Add(1) - go func(image string, count int) { - defer wg.Done() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - if err := router.sharedPool.SpawnMinWorkersForImage(ctx, image, count); err != nil { - slog.Warn("Janitor failed to reconcile per-image warm capacity.", "image", image, "target", count, "error", err) - } - }(image, count) - } - wg.Wait() + baseTargets := router.computeBaseWarmCapacityTargets(snap) + targets, err := computeEffectiveWarmCapacityTargets( + baseTargets, + store, + cfg.K8s, + janitor.now(), + ) + if err != nil { + slog.Warn("Janitor failed to read dynamic warm-capacity demand; reconciling base warm targets only.", "error", err) } + router.sharedPool.SetPerImageWarmTargets(targets) + reconcileWarmCapacityImageTargets(router.sharedPool, targets) } janitor.retireMismatchedVersionWorker = func() { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -435,6 +401,54 @@ func SetupMultiTenant( return store, adpt, apiServer, runtimeTracker, janitorLeader, nil } +type warmCapacityMissAggregateLister interface { + ListWarmCapacityMissesSince(since time.Time, reasons ...configstore.WorkerClaimMissReason) ([]configstore.WarmCapacityMissAggregate, error) +} + +func computeEffectiveWarmCapacityTargets(baseTargets map[string]int, lister warmCapacityMissAggregateLister, cfg K8sConfig, now time.Time) (map[string]int, error) { + dynamicCfg := dynamicWarmCapacityConfigFromK8s(cfg) + if !dynamicCfg.Enabled { + return sanitizeWarmCapacityTargets(baseTargets), nil + } + if lister == nil { + return sanitizeWarmCapacityTargets(baseTargets), nil + } + window := cfg.WarmCapacityMissWindow + if window <= 0 { + window = DefaultWarmCapacityMissWindow + } + if now.IsZero() { + now = time.Now() + } + aggregates, err := lister.ListWarmCapacityMissesSince(now.Add(-window), configstore.WorkerClaimMissReasonNoIdle) + if err != nil { + return sanitizeWarmCapacityTargets(baseTargets), err + } + return computeDynamicWarmCapacityTargets(baseTargets, aggregates, dynamicCfg), nil +} + +func reconcileWarmCapacityImageTargets(pool *K8sWorkerPool, targets map[string]int) { + if pool == nil || len(targets) == 0 { + return + } + var wg sync.WaitGroup + for image, count := range targets { + if count <= 0 { + continue + } + wg.Add(1) + go func(image string, count int) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := pool.SpawnMinWorkersForImage(ctx, image, count); err != nil { + slog.Warn("Janitor failed to reconcile image warm capacity.", "image", image, "target", count, "error", err) + } + }(image, count) + } + wg.Wait() +} + func resolveK8sNamespace(namespace string) (string, error) { if namespace != "" { return namespace, nil diff --git a/controlplane/multitenant_warm_capacity_test.go b/controlplane/multitenant_warm_capacity_test.go new file mode 100644 index 0000000..01af29d --- /dev/null +++ b/controlplane/multitenant_warm_capacity_test.go @@ -0,0 +1,180 @@ +//go:build kubernetes + +package controlplane + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync" + "testing" + "time" + + "github.com/posthog/duckgres/controlplane/configstore" +) + +type captureWarmCapacityMissLister struct { + aggregates []configstore.WarmCapacityMissAggregate + err error + calls int + since time.Time + reasons []configstore.WorkerClaimMissReason +} + +func (l *captureWarmCapacityMissLister) ListWarmCapacityMissesSince(since time.Time, reasons ...configstore.WorkerClaimMissReason) ([]configstore.WarmCapacityMissAggregate, error) { + l.calls++ + l.since = since + l.reasons = append([]configstore.WorkerClaimMissReason(nil), reasons...) + if l.err != nil { + return nil, l.err + } + return l.aggregates, nil +} + +func TestComputeEffectiveWarmCapacityTargetsDisabledSkipsDemandRead(t *testing.T) { + lister := &captureWarmCapacityMissLister{ + err: errors.New("should not be called"), + } + + got, err := computeEffectiveWarmCapacityTargets( + map[string]int{"posthog/duckgres:default": 4}, + lister, + K8sConfig{DynamicWarmCapacityEnabled: false}, + time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC), + ) + if err != nil { + t.Fatalf("expected disabled dynamic mode to skip demand read, got err %v", err) + } + if lister.calls != 0 { + t.Fatalf("expected disabled dynamic mode not to read demand buckets, got %d calls", lister.calls) + } + want := map[string]int{"posthog/duckgres:default": 4} + if !reflect.DeepEqual(got, want) { + t.Fatalf("expected base targets %v, got %v", want, got) + } +} + +func TestComputeEffectiveWarmCapacityTargetsReadsBucketsForImageTargets(t *testing.T) { + now := time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC) + lister := &captureWarmCapacityMissLister{ + aggregates: []configstore.WarmCapacityMissAggregate{ + warmCapacityMissAggregate("image:posthog/duckgres:default", configstore.WorkerClaimMissReasonNoIdle, 8), + warmCapacityMissAggregate("image:posthog/duckgres:v1.5.1", configstore.WorkerClaimMissReasonNoIdle, 16), + }, + } + + got, err := computeEffectiveWarmCapacityTargets( + map[string]int{ + "posthog/duckgres:default": 4, + "posthog/duckgres:v1.5.1": 1, + }, + lister, + K8sConfig{ + DynamicWarmCapacityEnabled: true, + WarmCapacityMissWindow: 2 * time.Minute, + WarmCapacityMissesPerWorker: 8, + }, + now, + ) + if err != nil { + t.Fatalf("compute effective warm targets: %v", err) + } + if lister.calls != 1 { + t.Fatalf("expected one demand bucket read, got %d", lister.calls) + } + if wantSince := now.Add(-2 * time.Minute); !lister.since.Equal(wantSince) { + t.Fatalf("expected demand read since %v, got %v", wantSince, lister.since) + } + wantReasons := []configstore.WorkerClaimMissReason{configstore.WorkerClaimMissReasonNoIdle} + if !reflect.DeepEqual(lister.reasons, wantReasons) { + t.Fatalf("expected demand read reasons %v, got %v", wantReasons, lister.reasons) + } + want := map[string]int{ + "posthog/duckgres:default": 5, + "posthog/duckgres:v1.5.1": 3, + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("expected dynamic targets %v, got %v", want, got) + } +} + +func TestComputeEffectiveWarmCapacityTargetsFallsBackToBaseOnReadError(t *testing.T) { + lister := &captureWarmCapacityMissLister{ + err: errors.New("configstore read failed"), + } + + got, err := computeEffectiveWarmCapacityTargets( + map[string]int{"posthog/duckgres:default": 4}, + lister, + K8sConfig{ + DynamicWarmCapacityEnabled: true, + WarmCapacityMissWindow: 2 * time.Minute, + WarmCapacityMissesPerWorker: 8, + }, + time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC), + ) + if err == nil { + t.Fatal("expected demand read error") + } + want := map[string]int{"posthog/duckgres:default": 4} + if !reflect.DeepEqual(got, want) { + t.Fatalf("expected base targets on read error %v, got %v", want, got) + } +} + +func TestReconcileWarmCapacityImageTargetsSpawnsPerImageTargets(t *testing.T) { + pool, _ := newTestK8sPool(t, 0) + store := &captureRuntimeWorkerStore{} + pool.runtimeStore = store + + var mu sync.Mutex + nextID := 1000 + slotCounts := map[string]int{} + store.perImageSpawnedFunc = func(image string) *configstore.WorkerRecord { + mu.Lock() + defer mu.Unlock() + nextID++ + slotCounts[image]++ + return &configstore.WorkerRecord{ + WorkerID: nextID, + PodName: fmt.Sprintf("duckgres-worker-test-cp-%d", nextID), + Image: image, + State: configstore.WorkerStateSpawning, + OwnerCPInstanceID: pool.cpInstanceID, + } + } + var spawned int + pool.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { + mu.Lock() + defer mu.Unlock() + spawned++ + return nil + } + + reconcileWarmCapacityImageTargets(pool, map[string]int{ + "posthog/duckgres:default": 2, + "posthog/duckgres:v1.5.1": 1, + }) + + mu.Lock() + gotSlots := map[string]int{} + for image, count := range slotCounts { + gotSlots[image] = count + } + gotSpawned := spawned + mu.Unlock() + wantSlots := map[string]int{ + "posthog/duckgres:default": 2, + "posthog/duckgres:v1.5.1": 1, + } + if !reflect.DeepEqual(gotSlots, wantSlots) { + t.Fatalf("expected per-image slot requests %v, got %v", wantSlots, gotSlots) + } + if gotSpawned != 3 { + t.Fatalf("expected 3 per-image warm spawns, got %d", gotSpawned) + } + if store.neutralSpawnCalls != 0 { + t.Fatalf("expected no image-blind neutral warm spawns, got %d", store.neutralSpawnCalls) + } +} diff --git a/duckgres.example.yaml b/duckgres.example.yaml index c901cd5..5697852 100644 --- a/duckgres.example.yaml +++ b/duckgres.example.yaml @@ -124,6 +124,15 @@ ducklake: # # Neutral shared warm-worker target for the default worker image. Separate # # from per-org limits and process.min_workers. 0 disables shared prewarming. # shared_warm_target: 0 +# # Dynamic warm capacity uses shared configstore miss buckets to raise +# # image-specific warm targets after recent no-idle misses. +# dynamic_warm_capacity_enabled: true +# warm_capacity_miss_window: "2m" +# warm_capacity_misses_per_worker: 8 +# # Retain miss buckets at least as long as the miss window. +# warm_capacity_demand_ttl: "15m" +# warm_capacity_dynamic_image_ceiling: 0 +# warm_capacity_dynamic_total_ceiling: 0 # Process isolation (default: true) # Each client connection spawns a separate OS process, so a DuckDB crash diff --git a/main.go b/main.go index 808a12e..fc47d4d 100644 --- a/main.go +++ b/main.go @@ -137,6 +137,12 @@ func main() { fmt.Fprintf(os.Stderr, " DUCKGRES_INTERNAL_SECRET Shared secret for API authentication\n") fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_MAX_WORKERS Max K8s workers in the shared pool (0=unbounded)\n") fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_SHARED_WARM_TARGET Neutral shared warm-worker target for K8s multi-tenant mode\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED Enable dynamic warm-capacity targets (default: true)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW Recent no-idle miss window for dynamic targets (default: 2m)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER Misses per extra warm worker (default: 8)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL Retention TTL for warm-capacity miss buckets (default: 15m)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING Max dynamic extra warm workers per image (0=unlimited)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_TOTAL_CEILING Max dynamic extra warm workers across images (0=unlimited)\n") fmt.Fprintf(os.Stderr, " DUCKGRES_AWS_REGION AWS region for STS client\n") fmt.Fprintf(os.Stderr, " DUCKGRES_LOG_LEVEL Log level: debug, info, warn, error (default: info)\n") fmt.Fprintf(os.Stderr, "\nPrecedence: CLI flags > environment variables > config file > defaults\n") From fea8ac75a9dfeb46b37753c587b58ab90625af0a Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 21 May 2026 17:03:16 -0400 Subject: [PATCH 2/2] Fix warm capacity helper build tags --- controlplane/warm_capacity_targets.go | 25 ------------------- controlplane/warm_capacity_targets_k8s.go | 30 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 25 deletions(-) create mode 100644 controlplane/warm_capacity_targets_k8s.go diff --git a/controlplane/warm_capacity_targets.go b/controlplane/warm_capacity_targets.go index 8105bff..4def31c 100644 --- a/controlplane/warm_capacity_targets.go +++ b/controlplane/warm_capacity_targets.go @@ -15,31 +15,6 @@ type dynamicWarmCapacityConfig struct { MaxWorkers int } -func dynamicWarmCapacityConfigFromK8s(cfg K8sConfig) dynamicWarmCapacityConfig { - return dynamicWarmCapacityConfig{ - Enabled: cfg.DynamicWarmCapacityEnabled, - MissesPerWorker: cfg.WarmCapacityMissesPerWorker, - DynamicImageCeiling: cfg.WarmCapacityDynamicImageCeiling, - DynamicTotalCeiling: cfg.WarmCapacityDynamicTotalCeiling, - MaxWorkers: cfg.MaxWorkers, - } -} - -func computeBaseWarmCapacityTargets(defaultImage string, sharedWarmTarget int, perImageFloors map[string]int) map[string]int { - out := sanitizeWarmCapacityTargets(perImageFloors) - defaultImage = strings.TrimSpace(defaultImage) - if defaultImage == "" { - return out - } - if sharedWarmTarget < 0 { - sharedWarmTarget = 0 - } - if sharedWarmTarget > out[defaultImage] { - out[defaultImage] = sharedWarmTarget - } - return out -} - func computeDynamicWarmCapacityTargets(baseTargets map[string]int, aggregates []configstore.WarmCapacityMissAggregate, cfg dynamicWarmCapacityConfig) map[string]int { targets := sanitizeWarmCapacityTargets(baseTargets) if !cfg.Enabled { diff --git a/controlplane/warm_capacity_targets_k8s.go b/controlplane/warm_capacity_targets_k8s.go new file mode 100644 index 0000000..53b2a8b --- /dev/null +++ b/controlplane/warm_capacity_targets_k8s.go @@ -0,0 +1,30 @@ +//go:build kubernetes + +package controlplane + +import "strings" + +func dynamicWarmCapacityConfigFromK8s(cfg K8sConfig) dynamicWarmCapacityConfig { + return dynamicWarmCapacityConfig{ + Enabled: cfg.DynamicWarmCapacityEnabled, + MissesPerWorker: cfg.WarmCapacityMissesPerWorker, + DynamicImageCeiling: cfg.WarmCapacityDynamicImageCeiling, + DynamicTotalCeiling: cfg.WarmCapacityDynamicTotalCeiling, + MaxWorkers: cfg.MaxWorkers, + } +} + +func computeBaseWarmCapacityTargets(defaultImage string, sharedWarmTarget int, perImageFloors map[string]int) map[string]int { + out := sanitizeWarmCapacityTargets(perImageFloors) + defaultImage = strings.TrimSpace(defaultImage) + if defaultImage == "" { + return out + } + if sharedWarmTarget < 0 { + sharedWarmTarget = 0 + } + if sharedWarmTarget > out[defaultImage] { + out[defaultImage] = sharedWarmTarget + } + return out +}