26 changes: 24 additions & 2 deletions controlplane/configstore/store.go
@@ -1078,8 +1078,17 @@ func (cs *ConfigStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceID, podNamePre
// Re-listing terminal rows here would loop the janitor on K8s 404s.
//
// The join switched from INNER to LEFT in Apr 2026 to handle (2)/(3);
// the original implementation only handled (1). See
// TestListOrphanedWorkers* in tests/configstore for the regression
// the original implementation only handled (1).
//
// Apr 2026 also added an exclusion for workers with reclaimable Flight
// sessions: a row with at least one flight_session_records entry in
// active or reconnecting state is spared from orphan retirement so a
// customer reconnecting by session token can still pick up their query
// (see TakeOverWorker). Once the session record itself becomes terminal
// (expired/closed via ExpireFlightSessionRecords), the worker is
// retired normally on the next sweep.
//
// See TestListOrphanedWorkers* in tests/configstore for the regression
// fixtures.
func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) {
var workers []WorkerRecord
@@ -1092,8 +1101,13 @@ func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) {
WorkerStateHotIdle,
WorkerStateDraining,
}
reclaimableSessionStates := []FlightSessionState{
FlightSessionStateActive,
FlightSessionStateReconnecting,
}
workerTable := cs.runtimeTable((&WorkerRecord{}).TableName())
cpTable := cs.runtimeTable((&ControlPlaneInstance{}).TableName())
flightTable := cs.runtimeTable((&FlightSessionRecord{}).TableName())
err := cs.db.Table(workerTable+" AS w").
Select("w.*").
Joins("LEFT JOIN "+cpTable+" AS cp ON cp.id = w.owner_cp_instance_id").
@@ -1109,6 +1123,14 @@ func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) {
before,
before,
).
// Spare workers with at least one reclaimable Flight session: a
// retire here would kill the customer's mid-flight query at the
// moment they reconnect by session token. Bounded by
// ExpireFlightSessionRecords — once the session record is moved
// to a terminal state, the worker is no longer protected.
Where("NOT EXISTS (SELECT 1 FROM "+flightTable+" AS f "+
"WHERE f.worker_id = w.worker_id AND f.state IN ?)",
reclaimableSessionStates).
Order("w.worker_id ASC").
Find(&workers).Error
if err != nil {
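For orientation, the chained builder above compiles to SQL of roughly the following shape (a sketch: the concrete table names come from cs.runtimeTable, the orphan predicates folded out of the diff are elided, and only the NOT EXISTS clause is new in this PR):

// Approximate SQL emitted by the builder in ListOrphanedWorkers.
// Table names are illustrative and the folded orphan predicates are
// elided; only the NOT EXISTS clause is new in this change.
const listOrphanedWorkersShape = `
SELECT w.*
FROM workers AS w
LEFT JOIN control_plane_instances AS cp
       ON cp.id = w.owner_cp_instance_id
WHERE w.state IN (?)          -- orphanable states, incl. hot_idle, draining
  AND (/* heartbeat / dead-owner predicates, folded out of the diff */)
  AND NOT EXISTS (
        SELECT 1
        FROM flight_session_records AS f
        WHERE f.worker_id = w.worker_id
          AND f.state IN (?)  -- active, reconnecting
      )
ORDER BY w.worker_id ASC`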
29 changes: 24 additions & 5 deletions controlplane/control.go
@@ -39,7 +39,7 @@ type ControlPlaneConfig struct {
WorkerQueueTimeout time.Duration // How long to wait for an available worker slot (default: 5m)
WorkerIdleTimeout time.Duration // How long to keep an idle worker alive (default: 5m)
RetireOnSessionEnd bool // When true, process workers are retired immediately after their last session ends.
HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade (default: 24h)
HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode.
MetricsServer *http.Server // Optional metrics server to shut down during upgrade

// WorkerBackend selects the worker management backend.
@@ -187,8 +187,15 @@ func RunControlPlane(cfg ControlPlaneConfig) {
cfg.WorkerIdleTimeout = 5 * time.Minute
}
if cfg.HandoverDrainTimeout == 0 {
// Remote mode: no internal drain timeout. The CP waits for active
// sessions to finish for as long as it takes; k8s
// terminationGracePeriodSeconds is the only hard wall. The
// previous 15m default cut off in-flight customer queries that
// just happened to be running at the wall (see the worker-40761
// incident) — moving the wall doesn't fix that race, removing it
// does. Process mode keeps 24h since there's no k8s safety net.
if cfg.WorkerBackend == "remote" {
cfg.HandoverDrainTimeout = 15 * time.Minute
cfg.HandoverDrainTimeout = 0
} else {
cfg.HandoverDrainTimeout = 24 * time.Hour
}
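As a usage note, only the zero value triggers the backend-dependent default above; a deployment that still wants a hard internal wall in remote mode opts in explicitly (a sketch, with an illustrative timeout value):

// Hypothetical operator override, within the controlplane package: an
// explicit non-zero timeout reinstates a hard internal wall even in
// remote mode. Leaving the field unset means unbounded in remote mode.
cfg := ControlPlaneConfig{
    WorkerBackend:        "remote",
    HandoverDrainTimeout: 30 * time.Minute, // hard wall; 0 resolves to the default
}
RunControlPlane(cfg)

One consequence of using 0 as the sentinel: an operator cannot explicitly request an unbounded drain in process mode, since 0 always resolves to the 24h default there.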
@@ -1257,7 +1264,11 @@ func (cp *ControlPlane) drainAndShutdown(timeout time.Duration) {
if cp.flight != nil {
cp.flight.BeginDrain()
}
slog.Info("Waiting for planned shutdown drain.", "timeout", timeout)
if timeout > 0 {
slog.Info("Waiting for planned shutdown drain.", "timeout", timeout)
} else {
slog.Info("Waiting for planned shutdown drain (unbounded — k8s SIGKILL is the wall).")
}
if cp.waitForDrain(timeout) {
slog.Info("All pgwire connections and Flight sessions drained before shutdown.")
} else {
@@ -1280,9 +1291,17 @@ func (cp *ControlPlane) stopAcceptingPGConnections() {
}
}

// waitForDrain blocks until both the pgwire and Flight servers report
// zero in-flight work, or the timeout fires. timeout == 0 means
// unbounded — k8s terminationGracePeriodSeconds becomes the only wall.
// Returns true on clean drain, false on timeout.
func (cp *ControlPlane) waitForDrain(timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ctx := context.Background()
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

pgDone := make(chan struct{})
go func() {
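The rest of waitForDrain is folded out of the diff. The wait presumably follows the usual two-channel pattern; a sketch of the folded remainder under that assumption (only pgDone appears above; flightDone and the blocking Wait* calls are hypothetical names):

pgDone := make(chan struct{})
go func() {
    cp.pgwire.WaitForZeroConnections() // hypothetical blocking wait
    close(pgDone)
}()
flightDone := make(chan struct{})
go func() {
    cp.flight.WaitForZeroSessions() // hypothetical blocking wait
    close(flightDone)
}()
for _, ch := range []chan struct{}{pgDone, flightDone} {
    select {
    case <-ch: // this side drained cleanly
    case <-ctx.Done():
        return false // wall hit; a nil Done() channel (timeout == 0) never fires
    }
}
return true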
32 changes: 30 additions & 2 deletions controlplane/k8s_pool.go
@@ -2015,8 +2015,32 @@ func (p *K8sWorkerPool) ShutdownAll() {
close(p.stopInform)

ctx := context.Background()
preserved := make(map[int]*ManagedWorker)
for _, w := range workers {
podName := p.workerPodName(w)

// A worker that's serving sessions must not be torn down during
// CP shutdown — pod-deletion would kill in-flight customer queries
// at exactly the moment ShutdownAll runs (the failure mode hit by
// the production worker-40761 incident on a 15-minute drain wall).
// Leave the worker in `hot` state, owned by this dying CP. Two
// downstream guarantees keep this safe:
// (1) Flight clients can reconnect by session token; a peer CP
// claims via TakeOverWorker and the query resumes.
// (2) ListOrphanedWorkers' JOIN against flight_session_records
// prevents peer CPs' janitors from retiring the worker
// while a session is still active or reconnecting; once the
// session record is expired the worker is retired normally.
// pgwire customers connected to this CP have already lost their
// connection (the CP socket is going away) — protecting the
// worker doesn't help them, but it doesn't hurt either.
if w.activeSessions > 0 {
slog.Info("ShutdownAll: worker has active sessions; leaving pod alive for Flight reconnect.",
"id", w.ID, "worker_pod", podName, "active_sessions", w.activeSessions)
preserved[w.ID] = w
continue
}

slog.Info("Shutting down K8s worker.", "id", w.ID, "worker_pod", podName)

// Step 1: CAS to draining. Skip the worker on CAS miss or error —
@@ -2068,10 +2092,14 @@
p.mu.Unlock()
}

// Workers preserved due to active sessions stay in the in-memory map
// so any remaining session bookkeeping inside this CP still finds them
// during the residual shutdown window. The map is wiped on process
// exit; preservation is purely about not yanking the rug here.
p.mu.Lock()
p.workers = make(map[int]*ManagedWorker)
p.workers = preserved
p.mu.Unlock()
observeControlPlaneWorkers(0)
observeControlPlaneWorkers(len(preserved))
}
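Stated on its own, the per-worker rule ShutdownAll now applies reduces to a single predicate (a sketch; the real code inlines the check in the loop above rather than calling a helper):

// shouldPreserveOnShutdown is the ShutdownAll decision rule, factored
// out here for clarity only. Any live session means a customer may
// reconnect by Flight session token, so the pod must outlive this CP.
func shouldPreserveOnShutdown(w *ManagedWorker) bool {
    return w.activeSessions > 0
}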

// retireWorkerPod closes the gRPC client and deletes the worker pod.
45 changes: 44 additions & 1 deletion controlplane/k8s_pool_test.go
@@ -1502,7 +1502,7 @@ func TestK8sPoolActivateReservedWorkerPersistsActivatingThenHotWorkerRecord(t *testing.T) {
// TestK8sPoolWorkerRecordForIdleStampsOwnerCPInstanceID guards against the
// warm-pool churn loop. workerRecordFor used to clear OwnerCPInstanceID
// whenever state==Idle, which left every freshly-spawned warm worker
// matching ListOrphanedWorkers case (2) (NULLIF(owner_cp_instance_id, '') IS
// NULL AND last_heartbeat_at <= before) the moment it crossed the orphan
// grace. The janitor then retired it, the warm pool replenished, and the
// loop repeated indefinitely. Stamping warm workers with the creating CP's
@@ -2790,6 +2790,49 @@ func TestShutdownAll_LeavesInDrainingWhenPodDeleteFails(t *testing.T) {
}
}

// TestShutdownAll_SparesWorkersWithActiveSessions: a CP receiving SIGTERM
// must not pod-delete a worker that's mid-query. Before this change, the
// teardown chain ran for every owned worker regardless of session count,
// collapsing every in-flight query the moment ShutdownAll fired (the
// failure mode the production incident hit on a 15-minute drain wall).
//
// After the fix, busy workers (activeSessions > 0) are skipped — left in
// hot/serving state, owned by the dying CP. Customer Flight clients can
// reconnect via session token; the orphan janitor's flight-session JOIN
// (Layer 3) prevents peer CPs from retiring them while a session record
// is still active. Idle workers (activeSessions == 0) drain normally.
func TestShutdownAll_SparesWorkersWithActiveSessions(t *testing.T) {
store := &captureRuntimeWorkerStore{}
pool, cs := shutdownTestPool(t, store)

busy := addShutdownWorker(t, pool, cs, 1)
busy.activeSessions = 2

addShutdownWorker(t, pool, cs, 2) // idle (activeSessions == 0)

pool.ShutdownAll()

// Busy worker: pod survives, no DB transitions on its row.
if !podExists(t, cs, "worker-1") {
t.Fatal("worker-1 has active sessions; ShutdownAll must not delete its pod")
}
for _, id := range store.markDrainingCalledIDs {
if id == 1 {
t.Fatalf("MarkWorkerDraining called for busy worker 1; ShutdownAll must skip it (calls=%v)", store.markDrainingCalledIDs)
}
}
for _, id := range store.retireDrainingCalledIDs {
if id == 1 {
t.Fatalf("RetireDrainingWorker called for busy worker 1; ShutdownAll must skip it (calls=%v)", store.retireDrainingCalledIDs)
}
}

// Idle worker: drained as before.
if podExists(t, cs, "worker-2") {
t.Fatal("worker-2 is idle; ShutdownAll should have deleted its pod")
}
}
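The janitor half of the guarantee (Layer 3) is exercised separately in tests/configstore; a minimal fixture presumably looks like this sketch (newTestConfigStore, insertOrphanableWorker, and insertFlightSession are hypothetical helper names standing in for the real fixtures):

// Sketch of the Layer-3 regression: a worker with a reclaimable Flight
// session must not be listed as orphaned, however stale its heartbeat.
// Helper names are hypothetical; see TestListOrphanedWorkers* for the
// real fixtures in tests/configstore.
func TestListOrphanedWorkers_SparesReclaimableSessions(t *testing.T) {
    cs := newTestConfigStore(t)           // hypothetical constructor
    w := insertOrphanableWorker(t, cs, 1) // stale heartbeat, hot state
    insertFlightSession(t, cs, w.WorkerID, FlightSessionStateReconnecting)

    got, err := cs.ListOrphanedWorkers(time.Now())
    if err != nil {
        t.Fatal(err)
    }
    for _, rec := range got {
        if rec.WorkerID == w.WorkerID {
            t.Fatalf("worker %d has a reconnecting session; must not be listed as orphaned", w.WorkerID)
        }
    }
}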

func TestShutdownAll_TreatsPodNotFoundAsDeleteSuccess(t *testing.T) {
// NotFound means another actor already removed the pod (node eviction,
// a racing CP during split-brain, manual kubectl delete). The state