diff --git a/controlplane/configstore/models.go b/controlplane/configstore/models.go
index 57cf6516..21299132 100644
--- a/controlplane/configstore/models.go
+++ b/controlplane/configstore/models.go
@@ -157,8 +157,8 @@ type ManagedWarehouseIceberg struct {
 	// UUID. Iceberg REST clients pass this as the `warehouse` parameter to
 	// /v1/config and the server returns the UUID as a prefix for subsequent
 	// calls. PR2's worker-side ATTACH SQL uses this value directly.
-	LakekeeperWarehouse string `gorm:"size:128" json:"lakekeeper_warehouse,omitempty"`
-	LakekeeperClientID string `gorm:"size:128" json:"lakekeeper_client_id,omitempty"`
+	LakekeeperWarehouse string `gorm:"size:128" json:"lakekeeper_warehouse,omitempty"`
+	LakekeeperClientID string `gorm:"size:128" json:"lakekeeper_client_id,omitempty"`
 
 	// LakekeeperOAuth2ServerURI is the OAuth2 token endpoint URI for the
 	// duckling-side CREATE SECRET. Empty during PR1 (allowall mode);
@@ -360,6 +360,20 @@ const (
 	WorkerClaimMissReasonShuttingDown WorkerClaimMissReason = "shutting_down"
 )
 
+const WarmCapacityMissBucketSize = 10 * time.Second // bucket width; RecordWarmCapacityMiss truncates its timestamp to this
+
+// WarmCapacityMissBucket stores foreground warm-capacity misses in coarse time
+// buckets so every control-plane pod contributes to one shared demand signal.
+type WarmCapacityMissBucket struct {
+	Scope string `gorm:"primaryKey;type:text" json:"scope"` // key part 1: demand scope string; the worker pool writes "image:<worker image>"
+	Reason WorkerClaimMissReason `gorm:"primaryKey;size:64" json:"reason"` // key part 2: why the claim missed
+	BucketStart time.Time `gorm:"primaryKey;index" json:"bucket_start"` // key part 3: UTC start of the bucket; also the column pruning filters on
+	Count int64 `gorm:"not null" json:"count"` // misses accumulated in this bucket; grows via upsert
+	UpdatedAt time.Time `gorm:"not null;index" json:"updated_at"` // time of the most recent insert or increment
+}
+
+func (WarmCapacityMissBucket) TableName() string { return "warm_capacity_miss_buckets" } // unqualified name; store code prefixes the runtime schema
+
 // WorkerRecord is the durable runtime coordination record for one worker pod.
type WorkerRecord struct { WorkerID int `gorm:"primaryKey" json:"worker_id"` diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go index 8873209b..a08414d7 100644 --- a/controlplane/configstore/store.go +++ b/controlplane/configstore/store.go @@ -746,6 +746,7 @@ func autoMigrateRuntimeTables(db *gorm.DB, runtimeSchema string) error { {table: runtimeSchema + ".cp_instances", model: &ControlPlaneInstance{}}, {table: runtimeSchema + ".worker_records", model: &WorkerRecord{}}, {table: runtimeSchema + ".flight_session_records", model: &FlightSessionRecord{}}, + {table: runtimeSchema + ".warm_capacity_miss_buckets", model: &WarmCapacityMissBucket{}}, } { if err := db.Table(spec.table).AutoMigrate(spec.model); err != nil { return err @@ -772,6 +773,57 @@ func (cs *ConfigStore) runtimeTable(base string) string { return cs.runtimeSchema + "." + base } +// RecordWarmCapacityMiss increments the shared bucket for a foreground warm +// capacity miss. The insert/upsert is atomic so concurrent control-plane pods +// can all contribute to the same scope/reason/bucket row without coordination. 
+func (cs *ConfigStore) RecordWarmCapacityMiss(scope string, reason WorkerClaimMissReason, now time.Time) error {
+	scope = strings.TrimSpace(scope) // trim before validating so a whitespace-only scope is rejected
+	if scope == "" {
+		return fmt.Errorf("record warm capacity miss: scope is required")
+	}
+	if reason == WorkerClaimMissReasonNone { // the None reason is not accepted as a bucket key
+		return fmt.Errorf("record warm capacity miss: reason is required")
+	}
+	if now.IsZero() { // a zero timestamp falls back to the current wall clock
+		now = time.Now()
+	}
+	now = now.UTC() // bucket boundaries and updated_at are stored in UTC
+
+	bucket := WarmCapacityMissBucket{
+		Scope: scope,
+		Reason: reason,
+		BucketStart: now.Truncate(WarmCapacityMissBucketSize), // start of the bucket that contains now
+		Count: 1, // each call contributes exactly one miss
+		UpdatedAt: now,
+	}
+	if err := cs.db.Table(cs.runtimeTable(bucket.TableName())).Clauses(clause.OnConflict{ // insert, or update the existing row on key conflict
+		Columns: []clause.Column{ // conflict target is the composite primary key
+			{Name: "scope"},
+			{Name: "reason"},
+			{Name: "bucket_start"},
+		},
+		DoUpdates: clause.Assignments(map[string]any{
+			"count": gorm.Expr(`"warm_capacity_miss_buckets"."count" + EXCLUDED."count"`), // stored count plus the count of the row being inserted
+			"updated_at": now,
+		}),
+	}).Create(&bucket).Error; err != nil {
+		return fmt.Errorf("record warm capacity miss: %w", err)
+	}
+	return nil
+}
+
+// PruneWarmCapacityMissBuckets deletes every bucket row whose bucket_start is
+// strictly earlier than before (converted to UTC) and returns how many rows went.
+func (cs *ConfigStore) PruneWarmCapacityMissBuckets(before time.Time) (int64, error) {
+	result := cs.db.Table(cs.runtimeTable((&WarmCapacityMissBucket{}).TableName())). // schema-qualified table, same as the writer
+		Where("bucket_start < ?", before.UTC()).
+		Delete(&WarmCapacityMissBucket{})
+	if result.Error != nil {
+		return 0, fmt.Errorf("prune warm capacity miss buckets: %w", result.Error) // count is always 0 on failure
+	}
+	return result.RowsAffected, nil
+}
+
 // UpsertControlPlaneInstance inserts or updates a runtime control-plane instance row.
func (cs *ConfigStore) UpsertControlPlaneInstance(instance *ControlPlaneInstance) error { if err := cs.db.Table(cs.runtimeTable(instance.TableName())).Clauses(clause.OnConflict{ @@ -895,8 +947,11 @@ func (cs *ConfigStore) GetWorkerRecord(workerID int) (*WorkerRecord, error) { // ClaimIdleWorker atomically claims one idle worker row for a control-plane instance. // The selected row is locked with SKIP LOCKED and transitioned to reserved while // incrementing owner_epoch. When maxOrgWorkers is set, org claims are serialized -// under the same advisory lock used for spawn-slot allocation. -func (cs *ConfigStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers int) (*WorkerRecord, WorkerClaimMissReason, error) { +// under the same advisory lock used for spawn-slot allocation. maxGlobalWorkers +// is only used to classify an unfulfilled claim after no suitable idle worker +// exists; an existing idle worker remains claimable even when the global pool is +// at capacity. 
+func (cs *ConfigStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers, maxGlobalWorkers int) (*WorkerRecord, WorkerClaimMissReason, error) { var claimed *WorkerRecord missReason := WorkerClaimMissReasonNone err := cs.db.Transaction(func(tx *gorm.DB) error { @@ -928,6 +983,16 @@ func (cs *ConfigStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, m err := query.Order("worker_id ASC").Take(&current).Error if err != nil { if err == gorm.ErrRecordNotFound { + if maxGlobalWorkers > 0 { + count, err := cs.countActiveWorkers(tx) + if err != nil { + return err + } + if count >= int64(maxGlobalWorkers) { + missReason = WorkerClaimMissReasonGlobalCap + return nil + } + } missReason = WorkerClaimMissReasonNoIdle return nil } diff --git a/controlplane/control.go b/controlplane/control.go index cf9f8c39..0367df70 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -682,15 +682,7 @@ func sessionCreationErrorResponse(err error) (code string, message string) { var capacityErr *WarmCapacityExhaustedError switch { case errors.As(err, &capacityErr): - if capacityErr.missReason() == configstore.WorkerClaimMissReasonOrgCap { - return "53300", "Duckgres worker capacity for this organization is currently exhausted; retry later" - } - retryAfter := capacityErr.RetryAfter - if retryAfter <= 0 { - retryAfter = DefaultWarmCapacityRetryAfter - } - retrySeconds := int((retryAfter + time.Second - 1) / time.Second) - return "53300", fmt.Sprintf("no warm Duckgres worker is currently available; retry in about %d seconds", retrySeconds) + return "53300", warmCapacityMissPolicyForReason(capacityErr.missReason()).sqlMessage(capacityErr.RetryAfter) case errors.Is(err, context.Canceled): return "57014", "canceling authentication due to user request" case errors.Is(err, context.DeadlineExceeded): diff --git a/controlplane/control_cancel_test.go b/controlplane/control_cancel_test.go index 55a01b2f..78bcb44f 100644 --- a/controlplane/control_cancel_test.go +++
b/controlplane/control_cancel_test.go @@ -105,16 +105,37 @@ func TestSessionCreationErrorResponse(t *testing.T) { } }) - t.Run("org capacity exhausted", func(t *testing.T) { - code, message := sessionCreationErrorResponse(NewWarmCapacityExhaustedErrorForReason(configstore.WorkerClaimMissReasonOrgCap, 45*time.Second)) - if code != "53300" { - t.Fatalf("code = %q, want 53300", code) - } - want := "Duckgres worker capacity for this organization is currently exhausted; retry later" - if message != want { - t.Fatalf("message = %q, want %q", message, want) - } - }) + for _, tt := range []struct { + name string + reason configstore.WorkerClaimMissReason + message string + }{ + { + name: "org capacity exhausted", + reason: configstore.WorkerClaimMissReasonOrgCap, + message: "Duckgres worker capacity for this organization is currently exhausted; retry later", + }, + { + name: "global capacity exhausted", + reason: configstore.WorkerClaimMissReasonGlobalCap, + message: "Duckgres worker capacity is currently exhausted; retry later", + }, + { + name: "control plane shutting down", + reason: configstore.WorkerClaimMissReasonShuttingDown, + message: "Duckgres control plane is shutting down; retry later", + }, + } { + t.Run(tt.name, func(t *testing.T) { + code, message := sessionCreationErrorResponse(NewWarmCapacityExhaustedErrorForReason(tt.reason, 45*time.Second)) + if code != "53300" { + t.Fatalf("code = %q, want 53300", code) + } + if message != tt.message { + t.Fatalf("message = %q, want %q", message, tt.message) + } + }) + } t.Run("generic error", func(t *testing.T) { code, message := sessionCreationErrorResponse(errors.New("worker activation failed")) @@ -126,3 +147,28 @@ func TestSessionCreationErrorResponse(t *testing.T) { } }) } + +func TestWarmCapacityMissPolicyForKnownReasons(t *testing.T) { + for _, tt := range []struct { + name string + reason configstore.WorkerClaimMissReason + policyReason configstore.WorkerClaimMissReason + recordDynamicDemand bool + }{ + {name: 
"none defaults to no_idle", reason: configstore.WorkerClaimMissReasonNone, policyReason: configstore.WorkerClaimMissReasonNoIdle, recordDynamicDemand: true}, + {name: "no_idle", reason: configstore.WorkerClaimMissReasonNoIdle, policyReason: configstore.WorkerClaimMissReasonNoIdle, recordDynamicDemand: true}, + {name: "org_cap", reason: configstore.WorkerClaimMissReasonOrgCap, policyReason: configstore.WorkerClaimMissReasonOrgCap, recordDynamicDemand: false}, + {name: "global_cap", reason: configstore.WorkerClaimMissReasonGlobalCap, policyReason: configstore.WorkerClaimMissReasonGlobalCap, recordDynamicDemand: false}, + {name: "shutting_down", reason: configstore.WorkerClaimMissReasonShuttingDown, policyReason: configstore.WorkerClaimMissReasonShuttingDown, recordDynamicDemand: false}, + } { + t.Run(tt.name, func(t *testing.T) { + policy := warmCapacityMissPolicyForReason(tt.reason) + if policy.reason != tt.policyReason { + t.Fatalf("policy reason = %q, want %q", policy.reason, tt.policyReason) + } + if policy.recordDynamicDemand != tt.recordDynamicDemand { + t.Fatalf("recordDynamicDemand = %v, want %v", policy.recordDynamicDemand, tt.recordDynamicDemand) + } + }) + } +} diff --git a/controlplane/janitor.go b/controlplane/janitor.go index 7bf15683..6033b370 100644 --- a/controlplane/janitor.go +++ b/controlplane/janitor.go @@ -11,6 +11,7 @@ import ( const ( janitorRetireReasonOrphaned = "orphaned" janitorRetireReasonStuckActivating = "stuck_activating" + defaultWarmCapacityMissBucketTTL = 15 * time.Minute ) type controlPlaneExpiryStore interface { @@ -23,6 +24,10 @@ type controlPlaneExpiryStore interface { RetireHotIdleWorker(workerID int) (bool, error) } +type warmCapacityMissBucketPruner interface { + PruneWarmCapacityMissBuckets(before time.Time) (int64, error) +} + type ControlPlaneJanitor struct { store controlPlaneExpiryStore interval time.Duration @@ -162,6 +167,16 @@ func (j *ControlPlaneJanitor) runOnce() { slog.Warn("Janitor failed to expire stale Flight 
sessions.", "error", err) } + if pruner, ok := j.store.(warmCapacityMissBucketPruner); ok { + cutoff := j.now().Add(-defaultWarmCapacityMissBucketTTL) + pruned, err := pruner.PruneWarmCapacityMissBuckets(cutoff) + if err != nil { + slog.Warn("Janitor failed to prune warm capacity miss buckets.", "error", err) + } else if pruned > 0 { + slog.Info("Janitor pruned warm capacity miss buckets.", "count", pruned, "cutoff", cutoff) + } + } + // Gradual rolling replacement of warm workers whose Deployment version // differs from this CP's. Runs only when this CP holds the janitor // leader lease, so at most one CP at a time is retiring workers and the diff --git a/controlplane/janitor_test.go b/controlplane/janitor_test.go index b593b1aa..290b63a1 100644 --- a/controlplane/janitor_test.go +++ b/controlplane/janitor_test.go @@ -10,18 +10,21 @@ import ( ) type captureControlPlaneExpiryStore struct { - mu sync.Mutex - cutoffs []time.Time - count int64 - expireErr error - drainingCutoffs []time.Time - drainingCount int64 - orphanedBefore []time.Time - orphanedWorkers []configstore.WorkerRecord - stuckSpawningBefore []time.Time - stuckActivatingBefore []time.Time - stuckWorkers []configstore.WorkerRecord - expiredSessionsBefore []time.Time + mu sync.Mutex + cutoffs []time.Time + count int64 + expireErr error + drainingCutoffs []time.Time + drainingCount int64 + orphanedBefore []time.Time + orphanedWorkers []configstore.WorkerRecord + stuckSpawningBefore []time.Time + stuckActivatingBefore []time.Time + stuckWorkers []configstore.WorkerRecord + expiredSessionsBefore []time.Time + pruneMissBucketsBefore []time.Time + prunedMissBucketCount int64 + pruneMissBucketErr error } func (s *captureControlPlaneExpiryStore) ExpireControlPlaneInstances(cutoff time.Time) (int64, error) { @@ -84,6 +87,13 @@ func (s *captureControlPlaneExpiryStore) RetireHotIdleWorker(workerID int) (bool return true, nil } +func (s *captureControlPlaneExpiryStore) PruneWarmCapacityMissBuckets(before time.Time) 
(int64, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.pruneMissBucketsBefore = append(s.pruneMissBucketsBefore, before) + return s.prunedMissBucketCount, s.pruneMissBucketErr +} + func TestControlPlaneJanitorRunExpiresStaleInstances(t *testing.T) { store := &captureControlPlaneExpiryStore{} now := time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC) @@ -115,6 +125,23 @@ func TestControlPlaneJanitorRunExpiresStaleInstances(t *testing.T) { } } +func TestControlPlaneJanitorRunPrunesWarmCapacityMissBuckets(t *testing.T) { + store := &captureControlPlaneExpiryStore{} + now := time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC) + janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second) + janitor.now = func() time.Time { return now } + + janitor.runOnce() + + if len(store.pruneMissBucketsBefore) != 1 { + t.Fatalf("expected one warm capacity miss bucket prune call, got %d", len(store.pruneMissBucketsBefore)) + } + want := now.Add(-defaultWarmCapacityMissBucketTTL) + if got := store.pruneMissBucketsBefore[0]; !got.Equal(want) { + t.Fatalf("expected warm capacity miss bucket prune cutoff %v, got %v", want, got) + } +} + func TestControlPlaneJanitorRunRetiresOrphanedAndStuckWorkers(t *testing.T) { store := &captureControlPlaneExpiryStore{ orphanedWorkers: []configstore.WorkerRecord{ diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index cf918439..a7d3414e 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -1385,7 +1385,11 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor } } - claimed, missReason, err := p.runtimeStore.ClaimIdleWorker(p.cpInstanceID, assignment.OrgID, assignment.Image, assignment.MaxWorkers) + p.mu.Lock() + maxGlobalWorkers := p.maxWorkers + p.mu.Unlock() + + claimed, missReason, err := p.runtimeStore.ClaimIdleWorker(p.cpInstanceID, assignment.OrgID, assignment.Image, assignment.MaxWorkers, maxGlobalWorkers) if err != nil { return nil, err } @@ -1404,15 +1408,7 
@@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor continue } - // Surface unmet demand to the warm-pool reconciler. An OrgCap miss - // is a per-org cap, not a shared-pool shortage, so spawning more - // neutral warm pods won't help — skip those. Any other miss - // (NoIdle, etc.) means the shared warm pool was too small for the - // observed burst; bump the demand counter so the next reconcile - // tick scales toward `idle + recent_misses` in one shot. - if missReason != configstore.WorkerClaimMissReasonOrgCap { - p.warmCapacityMisses.Add(1) - } + p.recordWarmCapacityMiss(assignment, missReason) return nil, NewWarmCapacityExhaustedErrorForReason(missReason, DefaultWarmCapacityRetryAfter) } @@ -1478,14 +1474,38 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor p.mu.Unlock() - // Single-CP / no-runtime-store mode: same demand signal as above. The - // next janitor tick scales the warm pool toward absorbed demand - // instead of staying at the static SharedWarmTarget floor. 
- p.warmCapacityMisses.Add(1) + p.recordWarmCapacityMiss(assignment, configstore.WorkerClaimMissReasonNoIdle) return nil, NewWarmCapacityExhaustedError(DefaultWarmCapacityRetryAfter) } } +func (p *K8sWorkerPool) recordWarmCapacityMiss(assignment *WorkerAssignment, reason configstore.WorkerClaimMissReason) { + policy := warmCapacityMissPolicyForReason(reason) + if !policy.recordDynamicDemand { + return + } + p.warmCapacityMisses.Add(1) + if p.runtimeStore == nil { + return + } + + image := "" + if assignment != nil { + image = strings.TrimSpace(assignment.Image) + } + if image == "" { + image = strings.TrimSpace(p.workerImage) + } + if image == "" { + return + } + + scope := "image:" + image + if err := p.runtimeStore.RecordWarmCapacityMiss(scope, policy.reason, time.Now()); err != nil { + slog.Warn("Failed to record warm capacity miss.", "scope", scope, "reason", policy.reason, "error", err) + } +} + func (p *K8sWorkerPool) reserveClaimedWorker(ctx context.Context, claimed *configstore.WorkerRecord, assignment *WorkerAssignment) (*ManagedWorker, error) { p.mu.Lock() if p.shuttingDown { diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index e5d77ba8..a3e39c18 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -34,6 +34,7 @@ type captureRuntimeWorkerStore struct { claimOrgID string claimImage string claimMaxOrgWorkers int + claimMaxGlobalWorkers int spawned *configstore.WorkerRecord spawnErr error spawnCalls int @@ -66,6 +67,10 @@ type captureRuntimeWorkerStore struct { hotIdleClaimMissReason configstore.WorkerClaimMissReason hotIdleClaimCPID string hotIdleClaimOrgID string + recordMissCalls int + recordMissScopes []string + recordMissReasons []configstore.WorkerClaimMissReason + recordMissErr error takenOver *configstore.WorkerRecord takeOverErr error takeOverWorkerID int @@ -123,7 +128,7 @@ func (s *captureRuntimeWorkerStore) snapshot() []configstore.WorkerRecord { return out } -func (s 
*captureRuntimeWorkerStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers int) (*configstore.WorkerRecord, configstore.WorkerClaimMissReason, error) { +func (s *captureRuntimeWorkerStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers, maxGlobalWorkers int) (*configstore.WorkerRecord, configstore.WorkerClaimMissReason, error) { s.mu.Lock() defer s.mu.Unlock() s.claimCalls++ @@ -131,6 +136,7 @@ func (s *captureRuntimeWorkerStore) ClaimIdleWorker(ownerCPInstanceID, orgID, im s.claimOrgID = orgID s.claimImage = image s.claimMaxOrgWorkers = maxOrgWorkers + s.claimMaxGlobalWorkers = maxGlobalWorkers if s.claimErr != nil { return nil, configstore.WorkerClaimMissReasonNone, s.claimErr } @@ -153,6 +159,15 @@ func (s *captureRuntimeWorkerStore) ClaimHotIdleWorker(ownerCPInstanceID, orgID return nil, s.hotIdleClaimMissReason, nil } +func (s *captureRuntimeWorkerStore) RecordWarmCapacityMiss(scope string, reason configstore.WorkerClaimMissReason, _ time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + s.recordMissCalls++ + s.recordMissScopes = append(s.recordMissScopes, scope) + s.recordMissReasons = append(s.recordMissReasons, reason) + return s.recordMissErr +} + func (s *captureRuntimeWorkerStore) CreateSpawningWorkerSlot(ownerCPInstanceID, orgID, image string, ownerEpoch int64, podNamePrefix string, maxOrgWorkers, maxGlobalWorkers int) (*configstore.WorkerRecord, error) { s.mu.Lock() defer s.mu.Unlock() @@ -1417,6 +1432,145 @@ func TestK8sPoolReserveSharedWorkerBackpressuresWhenRuntimeClaimReturnsNil(t *te } } +func TestK8sPoolReserveSharedWorkerRecordsNoIdleMissByResolvedImage(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{ + claimMissReason: configstore.WorkerClaimMissReasonNoIdle, + } + pool.runtimeStore = store + + worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "analytics", + Image: "duckgres:v2", + }) + var capacityErr 
*WarmCapacityExhaustedError + if !errors.As(err, &capacityErr) { + t.Fatalf("expected warm capacity exhaustion, got worker=%#v err=%v", worker, err) + } + if worker != nil { + t.Fatalf("expected no worker on capacity miss, got %d", worker.ID) + } + if store.recordMissCalls != 1 { + t.Fatalf("expected one recorded miss, got %d", store.recordMissCalls) + } + if got := store.recordMissScopes[0]; got != "image:duckgres:v2" { + t.Fatalf("expected miss scope image:duckgres:v2, got %q", got) + } + if got := store.recordMissReasons[0]; got != configstore.WorkerClaimMissReasonNoIdle { + t.Fatalf("expected no-idle miss reason, got %q", got) + } + if store.spawnCalls != 0 || store.neutralSpawnCalls != 0 || store.perImageSpawnCalls != 0 { + t.Fatalf("did not expect foreground or async spawn, got spawn=%d neutral=%d per_image=%d", store.spawnCalls, store.neutralSpawnCalls, store.perImageSpawnCalls) + } +} + +func TestK8sPoolReserveSharedWorkerDoesNotRecordOrgCapMiss(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{ + claimMissReason: configstore.WorkerClaimMissReasonOrgCap, + } + pool.runtimeStore = store + + worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "analytics", + MaxWorkers: 1, + Image: "duckgres:v2", + }) + var capacityErr *WarmCapacityExhaustedError + if !errors.As(err, &capacityErr) { + t.Fatalf("expected warm capacity exhaustion, got worker=%#v err=%v", worker, err) + } + if worker != nil { + t.Fatalf("expected no worker on capacity miss, got %d", worker.ID) + } + if store.recordMissCalls != 0 { + t.Fatalf("expected no recorded miss for org-cap, got %d", store.recordMissCalls) + } +} + +func TestK8sPoolReserveSharedWorkerDoesNotRecordGlobalCapMiss(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{ + claimMissReason: configstore.WorkerClaimMissReasonGlobalCap, + } + pool.runtimeStore = store + pool.SetMaxWorkers(7) + + worker, err := 
pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "analytics", + Image: "duckgres:v2", + }) + var capacityErr *WarmCapacityExhaustedError + if !errors.As(err, &capacityErr) { + t.Fatalf("expected warm capacity exhaustion, got worker=%#v err=%v", worker, err) + } + if worker != nil { + t.Fatalf("expected no worker on capacity miss, got %d", worker.ID) + } + if capacityErr.Reason != configstore.WorkerClaimMissReasonGlobalCap { + t.Fatalf("expected global-cap miss reason, got %q", capacityErr.Reason) + } + if store.claimMaxGlobalWorkers != 7 { + t.Fatalf("expected claim max global workers 7, got %d", store.claimMaxGlobalWorkers) + } + if store.recordMissCalls != 0 { + t.Fatalf("expected no recorded miss for global-cap, got %d", store.recordMissCalls) + } +} + +func TestK8sPoolReserveSharedWorkerDoesNotRecordShuttingDownMiss(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{ + claimMissReason: configstore.WorkerClaimMissReasonShuttingDown, + } + pool.runtimeStore = store + + worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "analytics", + Image: "duckgres:v2", + }) + var capacityErr *WarmCapacityExhaustedError + if !errors.As(err, &capacityErr) { + t.Fatalf("expected warm capacity exhaustion, got worker=%#v err=%v", worker, err) + } + if worker != nil { + t.Fatalf("expected no worker on capacity miss, got %d", worker.ID) + } + if capacityErr.Reason != configstore.WorkerClaimMissReasonShuttingDown { + t.Fatalf("expected shutting-down miss reason, got %q", capacityErr.Reason) + } + if store.recordMissCalls != 0 { + t.Fatalf("expected no recorded miss for shutting-down, got %d", store.recordMissCalls) + } +} + +func TestK8sPoolReserveSharedWorkerIgnoresWarmCapacityMissRecordError(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{ + claimMissReason: configstore.WorkerClaimMissReasonNoIdle, + recordMissErr: errors.New("recording 
failed"), + } + pool.runtimeStore = store + + worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "analytics", + Image: "duckgres:v2", + }) + var capacityErr *WarmCapacityExhaustedError + if !errors.As(err, &capacityErr) { + t.Fatalf("expected warm capacity exhaustion despite record error, got worker=%#v err=%v", worker, err) + } + if worker != nil { + t.Fatalf("expected no worker on capacity miss, got %d", worker.ID) + } + if store.recordMissCalls != 1 { + t.Fatalf("expected one best-effort record attempt, got %d", store.recordMissCalls) + } + if capacityErr.Reason != configstore.WorkerClaimMissReasonNoIdle { + t.Fatalf("expected capacity miss reason no-idle, got %q", capacityErr.Reason) + } +} + func TestK8sPoolReserveSharedWorkerPropagatesRuntimeClaimMissReason(t *testing.T) { pool, _ := newTestK8sPool(t, 5) store := &captureRuntimeWorkerStore{ @@ -1471,6 +1625,28 @@ func TestK8sPoolReserveSharedWorkerPassesOrgCapToRuntimeClaim(t *testing.T) { } } +func TestK8sPoolReserveSharedWorkerPassesGlobalCapToRuntimeClaim(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{} + pool.runtimeStore = store + pool.SetMaxWorkers(11) + + worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "analytics", + Image: "duckgres:v2", + }) + var capacityErr *WarmCapacityExhaustedError + if !errors.As(err, &capacityErr) { + t.Fatalf("expected warm capacity exhaustion, got worker=%#v err=%v", worker, err) + } + if worker != nil { + t.Fatalf("expected no worker on capacity miss, got %d", worker.ID) + } + if store.claimMaxGlobalWorkers != 11 { + t.Fatalf("expected claim max global workers 11, got %d", store.claimMaxGlobalWorkers) + } +} + func TestK8sPoolClaimSpecificWorkerTakesOverRuntimeWorker(t *testing.T) { pool, _ := newTestK8sPool(t, 5) store := &captureRuntimeWorkerStore{ diff --git a/controlplane/warm_capacity_policy.go b/controlplane/warm_capacity_policy.go new file mode 
100644 index 00000000..aadf8603 --- /dev/null +++ b/controlplane/warm_capacity_policy.go @@ -0,0 +1,84 @@ +package controlplane + +import ( + "fmt" + "time" + + "github.com/posthog/duckgres/controlplane/configstore" +) + +type warmCapacityClientMessageKind int + +const ( + warmCapacityMessageNoIdle warmCapacityClientMessageKind = iota + warmCapacityMessageOrgCap + warmCapacityMessageGlobalCap + warmCapacityMessageShuttingDown + warmCapacityMessageGeneric +) + +type warmCapacityMissPolicy struct { + reason configstore.WorkerClaimMissReason + recordDynamicDemand bool + messageKind warmCapacityClientMessageKind +} + +func warmCapacityMissPolicyForReason(reason configstore.WorkerClaimMissReason) warmCapacityMissPolicy { + switch reason { + case configstore.WorkerClaimMissReasonNone, configstore.WorkerClaimMissReasonNoIdle: + return warmCapacityMissPolicy{ + reason: configstore.WorkerClaimMissReasonNoIdle, + recordDynamicDemand: true, + messageKind: warmCapacityMessageNoIdle, + } + case configstore.WorkerClaimMissReasonOrgCap: + return warmCapacityMissPolicy{reason: reason, messageKind: warmCapacityMessageOrgCap} + case configstore.WorkerClaimMissReasonGlobalCap: + return warmCapacityMissPolicy{reason: reason, messageKind: warmCapacityMessageGlobalCap} + case configstore.WorkerClaimMissReasonShuttingDown: + return warmCapacityMissPolicy{reason: reason, messageKind: warmCapacityMessageShuttingDown} + default: + return warmCapacityMissPolicy{reason: reason, messageKind: warmCapacityMessageGeneric} + } +} + +func (p warmCapacityMissPolicy) errorString(retryAfter time.Duration) string { + switch p.messageKind { + case warmCapacityMessageNoIdle: + return fmt.Sprintf("warm worker capacity exhausted; retry in about %s", normalizedWarmCapacityRetryAfter(retryAfter).Round(time.Second)) + case warmCapacityMessageOrgCap: + return "warm worker capacity exhausted for organization" + case warmCapacityMessageGlobalCap: + return "warm worker capacity exhausted by global pool limit" + 
case warmCapacityMessageShuttingDown: + return "warm worker capacity unavailable while control plane is shutting down" + default: + return "warm worker capacity exhausted" + } +} + +func (p warmCapacityMissPolicy) sqlMessage(retryAfter time.Duration) string { + switch p.messageKind { + case warmCapacityMessageNoIdle: + return fmt.Sprintf("no warm Duckgres worker is currently available; retry in about %d seconds", warmCapacityRetrySeconds(retryAfter)) + case warmCapacityMessageOrgCap: + return "Duckgres worker capacity for this organization is currently exhausted; retry later" + case warmCapacityMessageGlobalCap: + return "Duckgres worker capacity is currently exhausted; retry later" + case warmCapacityMessageShuttingDown: + return "Duckgres control plane is shutting down; retry later" + default: + return "Duckgres worker capacity is currently unavailable; retry later" + } +} + +func normalizedWarmCapacityRetryAfter(retryAfter time.Duration) time.Duration { + if retryAfter <= 0 { + return DefaultWarmCapacityRetryAfter + } + return retryAfter +} + +func warmCapacityRetrySeconds(retryAfter time.Duration) int { + return int((normalizedWarmCapacityRetryAfter(retryAfter) + time.Second - 1) / time.Second) +} diff --git a/controlplane/worker_pool.go b/controlplane/worker_pool.go index 672d3d7e..6fa69bb6 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -22,14 +22,7 @@ type WarmCapacityExhaustedError struct { } func (e *WarmCapacityExhaustedError) Error() string { - retryAfter := e.RetryAfter - if retryAfter <= 0 { - retryAfter = DefaultWarmCapacityRetryAfter - } - if e.missReason() == configstore.WorkerClaimMissReasonOrgCap { - return "warm worker capacity exhausted for organization" - } - return fmt.Sprintf("warm worker capacity exhausted; retry in about %s", retryAfter.Round(time.Second)) + return warmCapacityMissPolicyForReason(e.missReason()).errorString(e.RetryAfter) } func (e *WarmCapacityExhaustedError) missReason() 
configstore.WorkerClaimMissReason { @@ -119,8 +112,9 @@ type K8sWorkerPoolConfig struct { type RuntimeWorkerStore interface { UpsertWorkerRecord(record *configstore.WorkerRecord) error - ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers int) (*configstore.WorkerRecord, configstore.WorkerClaimMissReason, error) + ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers, maxGlobalWorkers int) (*configstore.WorkerRecord, configstore.WorkerClaimMissReason, error) ClaimHotIdleWorker(ownerCPInstanceID, orgID string) (*configstore.WorkerRecord, configstore.WorkerClaimMissReason, error) + RecordWarmCapacityMiss(scope string, reason configstore.WorkerClaimMissReason, now time.Time) error CreateSpawningWorkerSlot(ownerCPInstanceID, orgID, image string, ownerEpoch int64, podNamePrefix string, maxOrgWorkers, maxGlobalWorkers int) (*configstore.WorkerRecord, error) CreateNeutralWarmWorkerSlot(ownerCPInstanceID, podNamePrefix, image string, targetWarmWorkers, maxGlobalWorkers int) (*configstore.WorkerRecord, error) CreateNeutralWarmWorkerSlotForImage(ownerCPInstanceID, podNamePrefix, image string, perImageTarget, maxGlobalWorkers int) (*configstore.WorkerRecord, error) diff --git a/tests/configstore/helpers_test.go b/tests/configstore/helpers_test.go index c2a5b6bf..4a1abdd1 100644 --- a/tests/configstore/helpers_test.go +++ b/tests/configstore/helpers_test.go @@ -55,6 +55,42 @@ func newIsolatedConfigStore(t *testing.T) *cpconfigstore.ConfigStore { return store } +func newConfigStoreOnSameSchema(t *testing.T, store *cpconfigstore.ConfigStore) *cpconfigstore.ConfigStore { + t.Helper() + + var schema string + if err := store.DB().Raw("SELECT current_schema()").Scan(&schema).Error; err != nil { + t.Fatalf("current schema: %v", err) + } + other, err := cpconfigstore.NewConfigStore("host=127.0.0.1 port=35432 user=postgres password=postgres dbname=testdb sslmode=disable search_path="+schema, time.Hour) + if err != nil { + t.Fatalf("new config store on 
schema %s: %v", schema, err) + } + sqlDB, err := other.DB().DB() + if err != nil { + t.Fatalf("other store sql db: %v", err) + } + t.Cleanup(func() { + _ = sqlDB.Close() + }) + return other +} + +func assertWarmCapacityMissBucketCount(t *testing.T, store *cpconfigstore.ConfigStore, scope string, reason cpconfigstore.WorkerClaimMissReason, bucketStart time.Time, want int64) { + t.Helper() + + var got int64 + if err := store.DB().Table(store.RuntimeSchema()+".warm_capacity_miss_buckets"). + Where("scope = ? AND reason = ? AND bucket_start = ?", scope, string(reason), bucketStart). + Select("COALESCE(SUM(count), 0)"). + Scan(&got).Error; err != nil { + t.Fatalf("lookup warm capacity miss bucket: %v", err) + } + if got != want { + t.Fatalf("expected bucket count %d for scope=%q reason=%q bucket=%s, got %d", want, scope, reason, bucketStart.Format(time.RFC3339), got) + } +} + func ensureIntegrationPostgres(t *testing.T) { t.Helper() diff --git a/tests/configstore/runtime_store_postgres_test.go b/tests/configstore/runtime_store_postgres_test.go index a86cab8b..1da1f96c 100644 --- a/tests/configstore/runtime_store_postgres_test.go +++ b/tests/configstore/runtime_store_postgres_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strconv" + "sync" "testing" "time" @@ -20,7 +21,7 @@ func TestRuntimeStorePostgres(t *testing.T) { t.Fatal("expected runtime schema to be configured") } - for _, table := range []string{"cp_instances", "worker_records", "flight_session_records"} { + for _, table := range []string{"cp_instances", "worker_records", "flight_session_records", "warm_capacity_miss_buckets"} { var count int64 if err := store.DB().Raw( "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = ? 
AND table_name = ?", @@ -110,6 +111,83 @@ func TestRuntimeStorePostgres(t *testing.T) { } } +func TestRecordWarmCapacityMissAggregatesByBucketScopeAndReasonPostgres(t *testing.T) { + store := newIsolatedConfigStore(t) + other := newConfigStoreOnSameSchema(t, store) + + now := time.Date(2026, time.March, 26, 14, 0, 5, 0, time.UTC) + for _, recorder := range []*configstore.ConfigStore{store, other} { + if err := recorder.RecordWarmCapacityMiss("image:duckgres:default", configstore.WorkerClaimMissReasonNoIdle, now); err != nil { + t.Fatalf("RecordWarmCapacityMiss(default): %v", err) + } + } + if err := store.RecordWarmCapacityMiss("image:duckgres:pinned", configstore.WorkerClaimMissReasonNoIdle, now); err != nil { + t.Fatalf("RecordWarmCapacityMiss(pinned): %v", err) + } + if err := store.RecordWarmCapacityMiss("image:duckgres:default", configstore.WorkerClaimMissReasonGlobalCap, now); err != nil { + t.Fatalf("RecordWarmCapacityMiss(default/global-cap): %v", err) + } + + bucketStart := now.Truncate(configstore.WarmCapacityMissBucketSize) + assertWarmCapacityMissBucketCount(t, store, "image:duckgres:default", configstore.WorkerClaimMissReasonNoIdle, bucketStart, 2) + assertWarmCapacityMissBucketCount(t, store, "image:duckgres:default", configstore.WorkerClaimMissReasonGlobalCap, bucketStart, 1) + assertWarmCapacityMissBucketCount(t, store, "image:duckgres:pinned", configstore.WorkerClaimMissReasonNoIdle, bucketStart, 1) +} + +func TestRecordWarmCapacityMissConcurrentWritersPostgres(t *testing.T) { + store := newIsolatedConfigStore(t) + other := newConfigStoreOnSameSchema(t, store) + + now := time.Date(2026, time.March, 26, 14, 5, 5, 0, time.UTC) + recorders := []*configstore.ConfigStore{store, other} + errs := make(chan error, 40) + var wg sync.WaitGroup + for i := 0; i < cap(errs); i++ { + wg.Add(1) + recorder := recorders[i%len(recorders)] + go func() { + defer wg.Done() + errs <- recorder.RecordWarmCapacityMiss("image:duckgres:default", 
configstore.WorkerClaimMissReasonNoIdle, now) + }() + } + wg.Wait() + close(errs) + for err := range errs { + if err != nil { + t.Fatalf("RecordWarmCapacityMiss concurrent writer: %v", err) + } + } + + bucketStart := now.Truncate(configstore.WarmCapacityMissBucketSize) + assertWarmCapacityMissBucketCount(t, store, "image:duckgres:default", configstore.WorkerClaimMissReasonNoIdle, bucketStart, 40) +} + +func TestPruneWarmCapacityMissBucketsPostgres(t *testing.T) { + store := newIsolatedConfigStore(t) + + oldNow := time.Date(2026, time.March, 26, 14, 0, 5, 0, time.UTC) + newNow := oldNow.Add(15 * time.Minute) + if err := store.RecordWarmCapacityMiss("image:duckgres:default", configstore.WorkerClaimMissReasonNoIdle, oldNow); err != nil { + t.Fatalf("RecordWarmCapacityMiss(old): %v", err) + } + if err := store.RecordWarmCapacityMiss("image:duckgres:default", configstore.WorkerClaimMissReasonNoIdle, newNow); err != nil { + t.Fatalf("RecordWarmCapacityMiss(new): %v", err) + } + + pruned, err := store.PruneWarmCapacityMissBuckets(newNow.Add(-10 * time.Minute)) + if err != nil { + t.Fatalf("PruneWarmCapacityMissBuckets: %v", err) + } + if pruned != 1 { + t.Fatalf("expected one old bucket pruned, got %d", pruned) + } + + oldBucketStart := oldNow.Truncate(configstore.WarmCapacityMissBucketSize) + newBucketStart := newNow.Truncate(configstore.WarmCapacityMissBucketSize) + assertWarmCapacityMissBucketCount(t, store, "image:duckgres:default", configstore.WorkerClaimMissReasonNoIdle, oldBucketStart, 0) + assertWarmCapacityMissBucketCount(t, store, "image:duckgres:default", configstore.WorkerClaimMissReasonNoIdle, newBucketStart, 1) +} + func TestClaimIdleWorkerPostgres(t *testing.T) { store := newIsolatedConfigStore(t) @@ -126,7 +204,7 @@ func TestClaimIdleWorkerPostgres(t *testing.T) { t.Fatalf("UpsertWorkerRecord: %v", err) } - claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "analytics", "", 0) + claimed, missReason, err := 
store.ClaimIdleWorker("cp-new:boot-b", "analytics", "", 0, 1) if err != nil { t.Fatalf("ClaimIdleWorker: %v", err) } @@ -167,7 +245,7 @@ func TestClaimIdleWorkerPostgres(t *testing.T) { func TestClaimIdleWorkerReturnsNilWhenNoIdleWorkerExists(t *testing.T) { store := newIsolatedConfigStore(t) - claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "analytics", "", 0) + claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "analytics", "", 0, 0) if err != nil { t.Fatalf("ClaimIdleWorker: %v", err) } @@ -179,6 +257,131 @@ func TestClaimIdleWorkerReturnsNilWhenNoIdleWorkerExists(t *testing.T) { } } +func TestClaimIdleWorkerReturnsGlobalCapWhenNoIdleAndGlobalCapReached(t *testing.T) { + store := newIsolatedConfigStore(t) + + now := time.Date(2026, time.March, 26, 13, 15, 0, 0, time.UTC) + if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{ + WorkerID: 8, + PodName: "duckgres-worker-8", + State: configstore.WorkerStateHot, + OrgID: "analytics", + OwnerCPInstanceID: "cp-old:boot-a", + OwnerEpoch: 4, + LastHeartbeatAt: now, + }); err != nil { + t.Fatalf("UpsertWorkerRecord(hot): %v", err) + } + + claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "billing", "", 0, 1) + if err != nil { + t.Fatalf("ClaimIdleWorker: %v", err) + } + if claimed != nil { + t.Fatalf("expected no claim, got %#v", claimed) + } + if missReason != configstore.WorkerClaimMissReasonGlobalCap { + t.Fatalf("expected global-cap miss reason, got %q", missReason) + } +} + +func TestClaimIdleWorkerReturnsNoIdleWhenBelowGlobalCap(t *testing.T) { + store := newIsolatedConfigStore(t) + + now := time.Date(2026, time.March, 26, 13, 20, 0, 0, time.UTC) + if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{ + WorkerID: 8, + PodName: "duckgres-worker-8", + State: configstore.WorkerStateHot, + OrgID: "analytics", + OwnerCPInstanceID: "cp-old:boot-a", + OwnerEpoch: 4, + LastHeartbeatAt: now, + }); err != nil { + t.Fatalf("UpsertWorkerRecord(hot): %v", err) 
+ } + + claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "billing", "", 0, 2) + if err != nil { + t.Fatalf("ClaimIdleWorker: %v", err) + } + if claimed != nil { + t.Fatalf("expected no claim, got %#v", claimed) + } + if missReason != configstore.WorkerClaimMissReasonNoIdle { + t.Fatalf("expected no-idle miss reason below global cap, got %q", missReason) + } +} + +func TestClaimIdleWorkerReturnsGlobalCapForImageMissAtGlobalCap(t *testing.T) { + store := newIsolatedConfigStore(t) + + now := time.Date(2026, time.March, 26, 13, 25, 0, 0, time.UTC) + if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{ + WorkerID: 8, + PodName: "duckgres-worker-v1", + State: configstore.WorkerStateIdle, + Image: "duckgres:v1", + OwnerCPInstanceID: "cp-old:boot-a", + OwnerEpoch: 4, + LastHeartbeatAt: now, + }); err != nil { + t.Fatalf("UpsertWorkerRecord(idle): %v", err) + } + + claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "billing", "duckgres:v2", 0, 1) + if err != nil { + t.Fatalf("ClaimIdleWorker: %v", err) + } + if claimed != nil { + t.Fatalf("expected no claim, got %#v", claimed) + } + if missReason != configstore.WorkerClaimMissReasonGlobalCap { + t.Fatalf("expected global-cap miss reason for image miss, got %q", missReason) + } +} + +func TestClaimIdleWorkerClaimsMatchingIdleWorkerAtGlobalCap(t *testing.T) { + store := newIsolatedConfigStore(t) + + now := time.Date(2026, time.March, 26, 13, 27, 0, 0, time.UTC) + if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{ + WorkerID: 9, + PodName: "duckgres-worker-v2", + State: configstore.WorkerStateIdle, + Image: "duckgres:v2", + OwnerCPInstanceID: "cp-old:boot-a", + OwnerEpoch: 4, + LastHeartbeatAt: now, + }); err != nil { + t.Fatalf("UpsertWorkerRecord(idle): %v", err) + } + + claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "billing", "duckgres:v2", 0, 1) + if err != nil { + t.Fatalf("ClaimIdleWorker: %v", err) + } + if claimed == nil { + t.Fatal("expected 
matching idle worker claim to succeed at global cap") + return + } + if missReason != configstore.WorkerClaimMissReasonNone { + t.Fatalf("expected no miss reason on successful claim, got %q", missReason) + } + if claimed.WorkerID != 9 { + t.Fatalf("expected worker id 9, got %d", claimed.WorkerID) + } + if claimed.State != configstore.WorkerStateReserved { + t.Fatalf("expected reserved state, got %q", claimed.State) + } + if claimed.OrgID != "billing" { + t.Fatalf("expected org billing, got %q", claimed.OrgID) + } + if claimed.Image != "duckgres:v2" { + t.Fatalf("expected image duckgres:v2, got %q", claimed.Image) + } +} + func TestClaimIdleWorkerRespectsOrgCapPostgres(t *testing.T) { store := newIsolatedConfigStore(t) @@ -205,7 +408,7 @@ func TestClaimIdleWorkerRespectsOrgCapPostgres(t *testing.T) { t.Fatalf("UpsertWorkerRecord(hot): %v", err) } - claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "analytics", "", 1) + claimed, missReason, err := store.ClaimIdleWorker("cp-new:boot-b", "analytics", "", 1, 0) if err != nil { t.Fatalf("ClaimIdleWorker: %v", err) } @@ -249,7 +452,7 @@ func TestClaimIdleWorkerRespectsImageAffinity(t *testing.T) { } // Try claiming v2 - claimed, missReason, err := store.ClaimIdleWorker("cp-1", "org-1", "duckgres:v2", 0) + claimed, missReason, err := store.ClaimIdleWorker("cp-1", "org-1", "duckgres:v2", 0, 0) if err != nil { t.Fatalf("ClaimIdleWorker: %v", err) } @@ -261,7 +464,7 @@ func TestClaimIdleWorkerRespectsImageAffinity(t *testing.T) { } // Try claiming v3 (none exist) - claimed, missReason, err = store.ClaimIdleWorker("cp-1", "org-1", "duckgres:v3", 0) + claimed, missReason, err = store.ClaimIdleWorker("cp-1", "org-1", "duckgres:v3", 0, 0) if err != nil { t.Fatalf("ClaimIdleWorker: %v", err) } @@ -273,7 +476,7 @@ func TestClaimIdleWorkerRespectsImageAffinity(t *testing.T) { } // Neutral claim (no image filter) - should get v1 (lowest ID) - claimed, missReason, err = store.ClaimIdleWorker("cp-1", "org-1", "", 0) + 
claimed, missReason, err = store.ClaimIdleWorker("cp-1", "org-1", "", 0, 0) if err != nil { t.Fatalf("ClaimIdleWorker: %v", err) }