Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions controlplane/configstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ type ManagedWarehouseIceberg struct {
// UUID. Iceberg REST clients pass this as the `warehouse` parameter to
// /v1/config and the server returns the UUID as a prefix for subsequent
// calls. PR2's worker-side ATTACH SQL uses this value directly.
LakekeeperWarehouse string `gorm:"size:128" json:"lakekeeper_warehouse,omitempty"`
LakekeeperClientID string `gorm:"size:128" json:"lakekeeper_client_id,omitempty"`
LakekeeperWarehouse string `gorm:"size:128" json:"lakekeeper_warehouse,omitempty"`
LakekeeperClientID string `gorm:"size:128" json:"lakekeeper_client_id,omitempty"`

// LakekeeperOAuth2ServerURI is the OAuth2 token endpoint URI for the
// duckling-side CREATE SECRET. Empty during PR1 (allowall mode);
Expand Down Expand Up @@ -360,6 +360,20 @@ const (
WorkerClaimMissReasonShuttingDown WorkerClaimMissReason = "shutting_down"
)

// WarmCapacityMissBucketSize is the width of one warm-capacity miss bucket.
// RecordWarmCapacityMiss truncates the (UTC) miss timestamp to a multiple of
// this duration to derive the bucket_start key, so all misses observed inside
// the same window land on the same row.
const WarmCapacityMissBucketSize = 10 * time.Second

// WarmCapacityMissBucket stores foreground warm-capacity misses in coarse time
// buckets so every control-plane pod contributes to one shared demand signal.
//
// Scope, Reason and BucketStart together form the composite primary key, so
// there is at most one row per (scope, reason, bucket) combination.
type WarmCapacityMissBucket struct {
// Scope identifies what the miss is attributed to; stored as unbounded text.
// NOTE(review): the set of valid scope values is decided by callers that
// are not visible here — confirm before relying on a format.
Scope string `gorm:"primaryKey;type:text" json:"scope"`
// Reason is the claim-miss classification this bucket counts.
Reason WorkerClaimMissReason `gorm:"primaryKey;size:64" json:"reason"`
// BucketStart is the start of the time window this row covers. It carries
// its own index in addition to being part of the primary key.
BucketStart time.Time `gorm:"primaryKey;index" json:"bucket_start"`
// Count is the number of misses accumulated in this bucket.
Count int64 `gorm:"not null" json:"count"`
// UpdatedAt is the timestamp of the most recent write to this row.
UpdatedAt time.Time `gorm:"not null;index" json:"updated_at"`
}

// TableName returns the unqualified table name used for WarmCapacityMissBucket
// rows. It does not include a schema; callers add any schema prefix themselves.
func (WarmCapacityMissBucket) TableName() string {
	return "warm_capacity_miss_buckets"
}

// WorkerRecord is the durable runtime coordination record for one worker pod.
type WorkerRecord struct {
WorkerID int `gorm:"primaryKey" json:"worker_id"`
Expand Down
69 changes: 67 additions & 2 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ func autoMigrateRuntimeTables(db *gorm.DB, runtimeSchema string) error {
{table: runtimeSchema + ".cp_instances", model: &ControlPlaneInstance{}},
{table: runtimeSchema + ".worker_records", model: &WorkerRecord{}},
{table: runtimeSchema + ".flight_session_records", model: &FlightSessionRecord{}},
{table: runtimeSchema + ".warm_capacity_miss_buckets", model: &WarmCapacityMissBucket{}},
} {
if err := db.Table(spec.table).AutoMigrate(spec.model); err != nil {
return err
Expand All @@ -772,6 +773,57 @@ func (cs *ConfigStore) runtimeTable(base string) string {
return cs.runtimeSchema + "." + base
}

// RecordWarmCapacityMiss adds one foreground warm-capacity miss to the shared
// bucket row keyed by (scope, reason, bucket_start).
//
// scope is trimmed of surrounding whitespace and must be non-empty; reason
// must not be WorkerClaimMissReasonNone. A zero now is replaced by the current
// time, and the timestamp is normalised to UTC before it is truncated to
// WarmCapacityMissBucketSize to pick the bucket.
//
// The write is a single INSERT ... ON CONFLICT DO UPDATE statement, so
// concurrent control-plane pods can increment the same row without any
// coordination between them. Any database error is returned wrapped.
func (cs *ConfigStore) RecordWarmCapacityMiss(scope string, reason WorkerClaimMissReason, now time.Time) error {
	trimmedScope := strings.TrimSpace(scope)
	switch {
	case trimmedScope == "":
		return fmt.Errorf("record warm capacity miss: scope is required")
	case reason == WorkerClaimMissReasonNone:
		return fmt.Errorf("record warm capacity miss: reason is required")
	}

	observedAt := now
	if observedAt.IsZero() {
		observedAt = time.Now()
	}
	observedAt = observedAt.UTC()

	row := WarmCapacityMissBucket{
		Scope:       trimmedScope,
		Reason:      reason,
		BucketStart: observedAt.Truncate(WarmCapacityMissBucketSize),
		Count:       1,
		UpdatedAt:   observedAt,
	}

	// On a primary-key collision, fold the new count into the existing row
	// instead of failing, and refresh its updated_at.
	upsert := clause.OnConflict{
		Columns: []clause.Column{
			{Name: "scope"},
			{Name: "reason"},
			{Name: "bucket_start"},
		},
		DoUpdates: clause.Assignments(map[string]any{
			"count":      gorm.Expr(`"warm_capacity_miss_buckets"."count" + EXCLUDED."count"`),
			"updated_at": observedAt,
		}),
	}

	target := cs.runtimeTable(row.TableName())
	err := cs.db.Table(target).Clauses(upsert).Create(&row).Error
	if err != nil {
		return fmt.Errorf("record warm capacity miss: %w", err)
	}
	return nil
}

// PruneWarmCapacityMissBuckets deletes every bucket whose bucket_start is
// strictly earlier than before (compared in UTC) and reports how many rows
// were removed. On a database error it returns 0 and the wrapped error.
func (cs *ConfigStore) PruneWarmCapacityMissBuckets(before time.Time) (int64, error) {
	table := cs.runtimeTable(WarmCapacityMissBucket{}.TableName())
	cutoff := before.UTC()

	res := cs.db.Table(table).
		Where("bucket_start < ?", cutoff).
		Delete(&WarmCapacityMissBucket{})
	if err := res.Error; err != nil {
		return 0, fmt.Errorf("prune warm capacity miss buckets: %w", err)
	}
	return res.RowsAffected, nil
}

// UpsertControlPlaneInstance inserts or updates a runtime control-plane instance row.
func (cs *ConfigStore) UpsertControlPlaneInstance(instance *ControlPlaneInstance) error {
if err := cs.db.Table(cs.runtimeTable(instance.TableName())).Clauses(clause.OnConflict{
Expand Down Expand Up @@ -895,8 +947,11 @@ func (cs *ConfigStore) GetWorkerRecord(workerID int) (*WorkerRecord, error) {
// ClaimIdleWorker atomically claims one idle worker row for a control-plane instance.
// The selected row is locked with SKIP LOCKED and transitioned to reserved while
// incrementing owner_epoch. When maxOrgWorkers is set, org claims are serialized
// under the same advisory lock used for spawn-slot allocation.
func (cs *ConfigStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers int) (*WorkerRecord, WorkerClaimMissReason, error) {
// under the same advisory lock used for spawn-slot allocation. maxGlobalWorkers
// is only used to classify an unfulfilled claim after no suitable idle worker
// exists; an existing idle worker remains claimable even when the global pool is
// at capacity.
func (cs *ConfigStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, maxOrgWorkers, maxGlobalWorkers int) (*WorkerRecord, WorkerClaimMissReason, error) {
var claimed *WorkerRecord
missReason := WorkerClaimMissReasonNone
err := cs.db.Transaction(func(tx *gorm.DB) error {
Expand Down Expand Up @@ -928,6 +983,16 @@ func (cs *ConfigStore) ClaimIdleWorker(ownerCPInstanceID, orgID, image string, m
err := query.Order("worker_id ASC").Take(&current).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
if maxGlobalWorkers > 0 {
count, err := cs.countActiveWorkers(tx)
if err != nil {
return err
}
if count >= int64(maxGlobalWorkers) {
missReason = WorkerClaimMissReasonGlobalCap
return nil
}
}
missReason = WorkerClaimMissReasonNoIdle
return nil
}
Expand Down
10 changes: 1 addition & 9 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,15 +682,7 @@ func sessionCreationErrorResponse(err error) (code string, message string) {
var capacityErr *WarmCapacityExhaustedError
switch {
case errors.As(err, &capacityErr):
if capacityErr.missReason() == configstore.WorkerClaimMissReasonOrgCap {
return "53300", "Duckgres worker capacity for this organization is currently exhausted; retry later"
}
retryAfter := capacityErr.RetryAfter
if retryAfter <= 0 {
retryAfter = DefaultWarmCapacityRetryAfter
}
retrySeconds := int((retryAfter + time.Second - 1) / time.Second)
return "53300", fmt.Sprintf("no warm Duckgres worker is currently available; retry in about %d seconds", retrySeconds)
return "53300", warmCapacityMissPolicyForReason(capacityErr.missReason()).sqlMessage(capacityErr.RetryAfter)
case errors.Is(err, context.Canceled):
return "57014", "canceling authentication due to user request"
case errors.Is(err, context.DeadlineExceeded):
Expand Down
66 changes: 56 additions & 10 deletions controlplane/control_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,37 @@ func TestSessionCreationErrorResponse(t *testing.T) {
}
})

t.Run("org capacity exhausted", func(t *testing.T) {
code, message := sessionCreationErrorResponse(NewWarmCapacityExhaustedErrorForReason(configstore.WorkerClaimMissReasonOrgCap, 45*time.Second))
if code != "53300" {
t.Fatalf("code = %q, want 53300", code)
}
want := "Duckgres worker capacity for this organization is currently exhausted; retry later"
if message != want {
t.Fatalf("message = %q, want %q", message, want)
}
})
for _, tt := range []struct {
name string
reason configstore.WorkerClaimMissReason
message string
}{
{
name: "org capacity exhausted",
reason: configstore.WorkerClaimMissReasonOrgCap,
message: "Duckgres worker capacity for this organization is currently exhausted; retry later",
},
{
name: "global capacity exhausted",
reason: configstore.WorkerClaimMissReasonGlobalCap,
message: "Duckgres worker capacity is currently exhausted; retry later",
},
{
name: "control plane shutting down",
reason: configstore.WorkerClaimMissReasonShuttingDown,
message: "Duckgres control plane is shutting down; retry later",
},
} {
t.Run(tt.name, func(t *testing.T) {
code, message := sessionCreationErrorResponse(NewWarmCapacityExhaustedErrorForReason(tt.reason, 45*time.Second))
if code != "53300" {
t.Fatalf("code = %q, want 53300", code)
}
if message != tt.message {
t.Fatalf("message = %q, want %q", message, tt.message)
}
})
}

t.Run("generic error", func(t *testing.T) {
code, message := sessionCreationErrorResponse(errors.New("worker activation failed"))
Expand All @@ -126,3 +147,28 @@ func TestSessionCreationErrorResponse(t *testing.T) {
}
})
}

// TestWarmCapacityMissPolicyForKnownReasons checks, for each known claim-miss
// reason, which policy reason warmCapacityMissPolicyForReason resolves to and
// whether that policy asks for the miss to be recorded as dynamic demand.
func TestWarmCapacityMissPolicyForKnownReasons(t *testing.T) {
	type policyCase struct {
		name                string
		reason              configstore.WorkerClaimMissReason
		policyReason        configstore.WorkerClaimMissReason
		recordDynamicDemand bool
	}

	cases := []policyCase{
		{
			name:                "none defaults to no_idle",
			reason:              configstore.WorkerClaimMissReasonNone,
			policyReason:        configstore.WorkerClaimMissReasonNoIdle,
			recordDynamicDemand: true,
		},
		{
			name:                "no_idle",
			reason:              configstore.WorkerClaimMissReasonNoIdle,
			policyReason:        configstore.WorkerClaimMissReasonNoIdle,
			recordDynamicDemand: true,
		},
		{
			name:                "org_cap",
			reason:              configstore.WorkerClaimMissReasonOrgCap,
			policyReason:        configstore.WorkerClaimMissReasonOrgCap,
			recordDynamicDemand: false,
		},
		{
			name:                "global_cap",
			reason:              configstore.WorkerClaimMissReasonGlobalCap,
			policyReason:        configstore.WorkerClaimMissReasonGlobalCap,
			recordDynamicDemand: false,
		},
		{
			name:                "shutting_down",
			reason:              configstore.WorkerClaimMissReasonShuttingDown,
			policyReason:        configstore.WorkerClaimMissReasonShuttingDown,
			recordDynamicDemand: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := warmCapacityMissPolicyForReason(tc.reason)
			if got.reason != tc.policyReason {
				t.Fatalf("policy reason = %q, want %q", got.reason, tc.policyReason)
			}
			if got.recordDynamicDemand != tc.recordDynamicDemand {
				t.Fatalf("recordDynamicDemand = %v, want %v", got.recordDynamicDemand, tc.recordDynamicDemand)
			}
		})
	}
}
15 changes: 15 additions & 0 deletions controlplane/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
const (
// Retire-reason labels used by the janitor.
// NOTE(review): their call sites are outside this view — presumably they
// tag why a worker was retired (orphaned vs. stuck activating); confirm.
janitorRetireReasonOrphaned = "orphaned"
janitorRetireReasonStuckActivating = "stuck_activating"
// defaultWarmCapacityMissBucketTTL is how long warm-capacity miss buckets
// are kept: runOnce asks the store to prune buckets that start before the
// janitor's current time minus this duration.
defaultWarmCapacityMissBucketTTL = 15 * time.Minute
)

type controlPlaneExpiryStore interface {
Expand All @@ -23,6 +24,10 @@ type controlPlaneExpiryStore interface {
RetireHotIdleWorker(workerID int) (bool, error)
}

// warmCapacityMissBucketPruner is an optional capability of the janitor's
// store. runOnce discovers it with a type assertion, so stores that do not
// implement it are simply not pruned.
type warmCapacityMissBucketPruner interface {
// PruneWarmCapacityMissBuckets removes buckets older than before and
// returns the number of rows deleted.
PruneWarmCapacityMissBuckets(before time.Time) (int64, error)
}

type ControlPlaneJanitor struct {
store controlPlaneExpiryStore
interval time.Duration
Expand Down Expand Up @@ -162,6 +167,16 @@ func (j *ControlPlaneJanitor) runOnce() {
slog.Warn("Janitor failed to expire stale Flight sessions.", "error", err)
}

if pruner, ok := j.store.(warmCapacityMissBucketPruner); ok {
cutoff := j.now().Add(-defaultWarmCapacityMissBucketTTL)
pruned, err := pruner.PruneWarmCapacityMissBuckets(cutoff)
if err != nil {
slog.Warn("Janitor failed to prune warm capacity miss buckets.", "error", err)
} else if pruned > 0 {
slog.Info("Janitor pruned warm capacity miss buckets.", "count", pruned, "cutoff", cutoff)
}
}

// Gradual rolling replacement of warm workers whose Deployment version
// differs from this CP's. Runs only when this CP holds the janitor
// leader lease, so at most one CP at a time is retiring workers and the
Expand Down
51 changes: 39 additions & 12 deletions controlplane/janitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ import (
)

// captureControlPlaneExpiryStore is a test double for the janitor's store. It
// records the arguments each store method is called with and returns canned
// results configured through its fields. mu guards the recorded state.
type captureControlPlaneExpiryStore struct {
mu sync.Mutex
cutoffs []time.Time
count int64
expireErr error
drainingCutoffs []time.Time
drainingCount int64
orphanedBefore []time.Time
orphanedWorkers []configstore.WorkerRecord
stuckSpawningBefore []time.Time
stuckActivatingBefore []time.Time
stuckWorkers []configstore.WorkerRecord
expiredSessionsBefore []time.Time
mu sync.Mutex
cutoffs []time.Time
count int64
expireErr error
drainingCutoffs []time.Time
drainingCount int64
orphanedBefore []time.Time
orphanedWorkers []configstore.WorkerRecord
stuckSpawningBefore []time.Time
stuckActivatingBefore []time.Time
stuckWorkers []configstore.WorkerRecord
expiredSessionsBefore []time.Time
// pruneMissBucketsBefore collects the cutoff passed to each
// PruneWarmCapacityMissBuckets call, in call order.
pruneMissBucketsBefore []time.Time
// prunedMissBucketCount and pruneMissBucketErr are the canned results
// returned by PruneWarmCapacityMissBuckets.
prunedMissBucketCount int64
pruneMissBucketErr error
}

func (s *captureControlPlaneExpiryStore) ExpireControlPlaneInstances(cutoff time.Time) (int64, error) {
Expand Down Expand Up @@ -84,6 +87,13 @@ func (s *captureControlPlaneExpiryStore) RetireHotIdleWorker(workerID int) (bool
return true, nil
}

// PruneWarmCapacityMissBuckets records the requested cutoff under the store's
// mutex and returns the canned count and error configured on the test double.
func (s *captureControlPlaneExpiryStore) PruneWarmCapacityMissBuckets(before time.Time) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.pruneMissBucketsBefore = append(s.pruneMissBucketsBefore, before)

	pruned, err := s.prunedMissBucketCount, s.pruneMissBucketErr
	return pruned, err
}

func TestControlPlaneJanitorRunExpiresStaleInstances(t *testing.T) {
store := &captureControlPlaneExpiryStore{}
now := time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -115,6 +125,23 @@ func TestControlPlaneJanitorRunExpiresStaleInstances(t *testing.T) {
}
}

// TestControlPlaneJanitorRunPrunesWarmCapacityMissBuckets verifies that a
// single janitor pass asks the store to prune warm-capacity miss buckets
// exactly once, with a cutoff of the janitor's clock minus
// defaultWarmCapacityMissBucketTTL.
func TestControlPlaneJanitorRunPrunesWarmCapacityMissBuckets(t *testing.T) {
	fixedNow := time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC)
	store := &captureControlPlaneExpiryStore{}

	janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second)
	// Pin the janitor's clock so the expected cutoff is deterministic.
	janitor.now = func() time.Time { return fixedNow }

	janitor.runOnce()

	calls := store.pruneMissBucketsBefore
	if n := len(calls); n != 1 {
		t.Fatalf("expected one warm capacity miss bucket prune call, got %d", n)
	}

	wantCutoff := fixedNow.Add(-defaultWarmCapacityMissBucketTTL)
	if gotCutoff := calls[0]; !gotCutoff.Equal(wantCutoff) {
		t.Fatalf("expected warm capacity miss bucket prune cutoff %v, got %v", wantCutoff, gotCutoff)
	}
}

func TestControlPlaneJanitorRunRetiresOrphanedAndStuckWorkers(t *testing.T) {
store := &captureControlPlaneExpiryStore{
orphanedWorkers: []configstore.WorkerRecord{
Expand Down
Loading
Loading