From 062fc2f112d11738d9e517d62fd03162a294f22d Mon Sep 17 00:00:00 2001
From: Benjamin Knofe-Vider
Date: Thu, 30 Apr 2026 14:12:24 +0200
Subject: [PATCH 1/2] test(controlplane): protect active sessions from CP shutdown (RED)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Failing tests for the two pieces of state-store + worker-pool behavior
that need to change so a CP rollout doesn't kill in-flight customer
queries — extends the worker 40761 incident analysis to a regression
suite.

- TestShutdownAll_SparesWorkersWithActiveSessions: a worker with
  activeSessions > 0 must be skipped by ShutdownAll. Today the chain
  marks-draining and pod-deletes every owned worker, killing in-flight
  queries when the CP receives SIGTERM.

- TestListOrphanedWorkersExcludesWorkersWithActiveFlightSessions: a
  worker with a Flight session record in active state must be spared by
  ListOrphanedWorkers even if its owning CP has expired. Without this,
  peer CPs' janitors retire workers the customer can still reclaim by
  reconnecting.

- TestListOrphanedWorkersExcludesWorkersWithReconnectingFlightSessions:
  the same protection extends to records in reconnecting state — that's
  exactly when the customer is in the middle of picking the session
  back up.

- TestListOrphanedWorkersIncludesWorkersWithExpiredFlightSessions: once
  the Flight session record is terminal (expired/closed) the customer
  can't reclaim, so the worker should be retired normally. Bounds the
  protection so a stuck row can't pin a worker forever.

Tests fail with the current implementation; the next commit makes them
pass.
---
 controlplane/k8s_pool_test.go       |  43 +++++
 .../runtime_store_postgres_test.go  | 182 ++++++++++++++++++
 2 files changed, 225 insertions(+)

diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go
index fad0836..a1048f0 100644
--- a/controlplane/k8s_pool_test.go
+++ b/controlplane/k8s_pool_test.go
@@ -2790,6 +2790,49 @@ func TestShutdownAll_LeavesInDrainingWhenPodDeleteFails(t *testing.T) {
 	}
 }
 
+// TestShutdownAll_SparesWorkersWithActiveSessions: a CP receiving SIGTERM
+// must not pod-delete a worker that's mid-query. Today the chain runs for
+// every owned worker regardless of session count, which collapses every
+// in-flight query at the moment ShutdownAll fires (the failure mode the
+// production incident hit on a 15-minute drain wall).
+//
+// After the fix, busy workers (activeSessions > 0) are skipped — left in
+// hot/serving state, owned by the dying CP. Customer Flight clients can
+// reconnect via session token; the orphan janitor's flight-session JOIN
+// (Layer 3) prevents peer CPs from retiring them while a session record
+// is still active. Idle workers (activeSessions == 0) drain normally.
+func TestShutdownAll_SparesWorkersWithActiveSessions(t *testing.T) {
+	store := &captureRuntimeWorkerStore{}
+	pool, cs := shutdownTestPool(t, store)
+
+	busy := addShutdownWorker(t, pool, cs, 1)
+	busy.activeSessions = 2
+
+	addShutdownWorker(t, pool, cs, 2) // idle (activeSessions == 0)
+
+	pool.ShutdownAll()
+
+	// Busy worker: pod survives, no DB transitions on its row.
+	if !podExists(t, cs, "worker-1") {
+		t.Fatal("worker-1 has active sessions; ShutdownAll must not delete its pod")
+	}
+	for _, id := range store.markDrainingCalledIDs {
+		if id == 1 {
+			t.Fatalf("MarkWorkerDraining called for busy worker 1; ShutdownAll must skip it (calls=%v)", store.markDrainingCalledIDs)
+		}
+	}
+	for _, id := range store.retireDrainingCalledIDs {
+		if id == 1 {
+			t.Fatalf("RetireDrainingWorker called for busy worker 1; ShutdownAll must skip it (calls=%v)", store.retireDrainingCalledIDs)
+		}
+	}
+
+	// Idle worker: drained as before.
+	if podExists(t, cs, "worker-2") {
+		t.Fatal("worker-2 is idle; ShutdownAll should have deleted its pod")
+	}
+}
+
 func TestShutdownAll_TreatsPodNotFoundAsDeleteSuccess(t *testing.T) {
 	// NotFound means another actor already removed the pod (node eviction,
 	// a racing CP during split-brain, manual kubectl delete). The state
diff --git a/tests/configstore/runtime_store_postgres_test.go b/tests/configstore/runtime_store_postgres_test.go
index e672641..58d321a 100644
--- a/tests/configstore/runtime_store_postgres_test.go
+++ b/tests/configstore/runtime_store_postgres_test.go
@@ -1097,6 +1097,188 @@ func TestMarkCredentialsRefreshedFailsOnOwnerMismatch(t *testing.T) {
 	}
 }
 
+// TestListOrphanedWorkersExcludesWorkersWithActiveFlightSessions: a
+// worker whose owning CP has expired is normally an orphan-cleanup
+// candidate. But if a Flight client could still reconnect by session
+// token (record is in active or reconnecting state), the orphan retire
+// would kill an in-flight customer query the moment they reconnect. The
+// JOIN onto flight_session_records gives those workers a reprieve until
+// the session record itself is expired by ExpireFlightSessionRecords.
+func TestListOrphanedWorkersExcludesWorkersWithActiveFlightSessions(t *testing.T) {
+	store := newIsolatedConfigStore(t)
+	now := time.Date(2026, time.April, 30, 14, 0, 0, 0, time.UTC)
+
+	// Owner CP is expired long ago — easily past the 30s orphan grace.
+	if err := store.UpsertControlPlaneInstance(&configstore.ControlPlaneInstance{
+		ID:              "cp-old:boot-a",
+		PodName:         "duckgres-old",
+		PodUID:          "pod-old",
+		BootID:          "boot-a",
+		State:           configstore.ControlPlaneInstanceStateExpired,
+		StartedAt:       now.Add(-2 * time.Hour),
+		LastHeartbeatAt: now.Add(-1 * time.Hour),
+		ExpiredAt:       ptrTime(now.Add(-1 * time.Hour)),
+	}); err != nil {
+		t.Fatalf("UpsertControlPlaneInstance: %v", err)
+	}
+
+	// A hot worker owned by that expired CP — exactly the shape that
+	// today's orphan janitor would retire. With Layer 3, the active Flight
+	// session below should spare it.
+	if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{
+		WorkerID:          501,
+		PodName:           "duckgres-worker-501",
+		State:             configstore.WorkerStateHot,
+		OrgID:             "acme",
+		OwnerCPInstanceID: "cp-old:boot-a",
+		OwnerEpoch:        2,
+		LastHeartbeatAt:   now.Add(-30 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertWorkerRecord: %v", err)
+	}
+
+	// Reclaimable Flight session pointing at that worker.
+	if err := store.UpsertFlightSessionRecord(&configstore.FlightSessionRecord{
+		SessionToken: "tok-reclaim-501",
+		Username:     "postgres",
+		OrgID:        "acme",
+		WorkerID:     501,
+		OwnerEpoch:   2,
+		State:        configstore.FlightSessionStateActive,
+		ExpiresAt:    now.Add(30 * time.Minute), // not yet expired
+		LastSeenAt:   now.Add(-2 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertFlightSessionRecord: %v", err)
+	}
+
+	orphaned, err := store.ListOrphanedWorkers(now.Add(-30 * time.Second))
+	if err != nil {
+		t.Fatalf("ListOrphanedWorkers: %v", err)
+	}
+	for _, w := range orphaned {
+		if w.WorkerID == 501 {
+			t.Fatalf("worker 501 has an active Flight session; orphan janitor must spare it (got %#v)", orphaned)
+		}
+	}
+}
+
+// TestListOrphanedWorkersExcludesWorkersWithReconnectingFlightSessions:
+// the reconnecting state means a customer is mid-handshake from a Flight
+// client picking the session back up. Same protection applies — kill the
+// worker and you kill the resumption.
+func TestListOrphanedWorkersExcludesWorkersWithReconnectingFlightSessions(t *testing.T) {
+	store := newIsolatedConfigStore(t)
+	now := time.Date(2026, time.April, 30, 14, 0, 0, 0, time.UTC)
+
+	if err := store.UpsertControlPlaneInstance(&configstore.ControlPlaneInstance{
+		ID:              "cp-old:boot-a",
+		PodName:         "duckgres-old",
+		PodUID:          "pod-old",
+		BootID:          "boot-a",
+		State:           configstore.ControlPlaneInstanceStateExpired,
+		StartedAt:       now.Add(-2 * time.Hour),
+		LastHeartbeatAt: now.Add(-1 * time.Hour),
+		ExpiredAt:       ptrTime(now.Add(-1 * time.Hour)),
+	}); err != nil {
+		t.Fatalf("UpsertControlPlaneInstance: %v", err)
+	}
+	if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{
+		WorkerID:          502,
+		PodName:           "duckgres-worker-502",
+		State:             configstore.WorkerStateHot,
+		OrgID:             "acme",
+		OwnerCPInstanceID: "cp-old:boot-a",
+		OwnerEpoch:        2,
+		LastHeartbeatAt:   now.Add(-30 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertWorkerRecord: %v", err)
+	}
+	if err := store.UpsertFlightSessionRecord(&configstore.FlightSessionRecord{
+		SessionToken: "tok-reconnect-502",
+		Username:     "postgres",
+		OrgID:        "acme",
+		WorkerID:     502,
+		OwnerEpoch:   2,
+		State:        configstore.FlightSessionStateReconnecting,
+		ExpiresAt:    now.Add(30 * time.Minute),
+		LastSeenAt:   now.Add(-1 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertFlightSessionRecord: %v", err)
+	}
+
+	orphaned, err := store.ListOrphanedWorkers(now.Add(-30 * time.Second))
+	if err != nil {
+		t.Fatalf("ListOrphanedWorkers: %v", err)
+	}
+	for _, w := range orphaned {
+		if w.WorkerID == 502 {
+			t.Fatalf("worker 502 has a Flight session in reconnecting state; orphan janitor must spare it (got %#v)", orphaned)
+		}
+	}
+}
+
+// TestListOrphanedWorkersIncludesWorkersWithExpiredFlightSessions: once
+// the Flight session record has been moved to a terminal state (expired
+// or closed), the customer can no longer reclaim. The worker should
+// then be retired by the orphan janitor like any other unowned row.
+// Without this bound, a worker whose session had expired would linger
+// indefinitely, filtered out of the orphan list on every sweep.
+func TestListOrphanedWorkersIncludesWorkersWithExpiredFlightSessions(t *testing.T) {
+	store := newIsolatedConfigStore(t)
+	now := time.Date(2026, time.April, 30, 14, 0, 0, 0, time.UTC)
+
+	if err := store.UpsertControlPlaneInstance(&configstore.ControlPlaneInstance{
+		ID:              "cp-old:boot-a",
+		PodName:         "duckgres-old",
+		PodUID:          "pod-old",
+		BootID:          "boot-a",
+		State:           configstore.ControlPlaneInstanceStateExpired,
+		StartedAt:       now.Add(-2 * time.Hour),
+		LastHeartbeatAt: now.Add(-1 * time.Hour),
+		ExpiredAt:       ptrTime(now.Add(-1 * time.Hour)),
+	}); err != nil {
+		t.Fatalf("UpsertControlPlaneInstance: %v", err)
+	}
+	if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{
+		WorkerID:          503,
+		PodName:           "duckgres-worker-503",
+		State:             configstore.WorkerStateHot,
+		OrgID:             "acme",
+		OwnerCPInstanceID: "cp-old:boot-a",
+		OwnerEpoch:        2,
+		LastHeartbeatAt:   now.Add(-30 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertWorkerRecord: %v", err)
+	}
+	if err := store.UpsertFlightSessionRecord(&configstore.FlightSessionRecord{
+		SessionToken: "tok-gone-503",
+		Username:     "postgres",
+		OrgID:        "acme",
+		WorkerID:     503,
+		OwnerEpoch:   2,
+		State:        configstore.FlightSessionStateExpired,
+		ExpiresAt:    now.Add(-10 * time.Minute),
+		LastSeenAt:   now.Add(-2 * time.Hour),
+	}); err != nil {
+		t.Fatalf("UpsertFlightSessionRecord: %v", err)
+	}
+
+	orphaned, err := store.ListOrphanedWorkers(now.Add(-30 * time.Second))
+	if err != nil {
+		t.Fatalf("ListOrphanedWorkers: %v", err)
+	}
+	found := false
+	for _, w := range orphaned {
+		if w.WorkerID == 503 {
+			found = true
+			break
+		}
+	}
+	if !found {
+		t.Fatalf("worker 503's Flight session is expired; orphan janitor MUST return it for retirement (got %#v)", orphaned)
+	}
+}
+
 func TestExpireFlightSessionRecordsPostgres(t *testing.T) {
 	store := newIsolatedConfigStore(t)
 	now := time.Date(2026, time.March, 27, 15, 0, 0, 0, time.UTC)

From cf8121a7ad9a326592cfdd9360279c50796d8bf5 Mon Sep 17 00:00:00 2001
From: Benjamin Knofe-Vider
Date: Thu, 30 Apr 2026 14:16:54 +0200
Subject: [PATCH 2/2] feat(controlplane): protect active sessions across CP shutdowns (GREEN)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three interlocking changes that keep an in-flight customer query from
being killed by a CP rollout. Together they remove the timing race
behind the worker-40761 incident: a query running at the moment the old
CP gives up (whether at 15min, 8h, or any other timeout) survives.

Layer 1: CP doesn't exit while sessions are active.

- HandoverDrainTimeout default flips from 15m to 0 in remote mode
  (= unbounded; k8s terminationGracePeriodSeconds is the only wall).
  The previous self-imposed wall was the proximate cause of pod-deletes
  during long-tail drains. Process mode keeps 24h since there's no k8s
  safety net.
- waitForDrain treats timeout==0 as unbounded via context.Background().
- Janitor's ExpireDrainingControlPlaneInstances is now disabled in
  remote mode (it gates on j.maxDrainTimeout > 0). A draining CP that
  is still heartbeating stays "alive" to its peers; only stale
  heartbeat — handled by the existing ExpireControlPlaneInstances —
  marks a CP dead. This prevents peer CPs from forcibly expiring a
  draining CP and orphan-retiring its workers.

Layer 2: ShutdownAll skips workers with active sessions.

- Defense in depth for the case where the CP exits anyway (k8s SIGKILL
  after gracePeriod, or future code paths). A worker with
  activeSessions > 0 is left running in 'hot' state owned by the dying CP.
  The pod survives, the Flight client can reconnect by session token,
  and a peer CP can claim via TakeOverWorker.
- Workers preserved this way stay in the in-memory pool until process
  exit so any residual session bookkeeping during the shutdown window
  still finds them.

Layer 3: orphan janitor spares workers with reclaimable Flight sessions.

- ListOrphanedWorkers gains a NOT EXISTS filter against
  flight_session_records: a row with at least one session in active or
  reconnecting state is left alone. Bounds the protection — once
  ExpireFlightSessionRecords moves the session to expired/closed, the
  worker is retired normally on the next sweep.

Behavior with all three layers:

- Customer mid-query during CP roll → old CP's drainAndShutdown waits
  for sessions → if customer finishes, clean exit; if k8s SIGKILL
  fires, worker survives via Layer 2 + Layer 3.
- Customer Flight client reconnects within session TTL → peer CP claims
  via TakeOverWorker, query resumes.
- Customer never reconnects → flight_session_records expires (TTL
  default 1h) → next orphan sweep retires the worker normally.
- Pgwire customer connected to dying CP → connection dies when CP exits
  regardless. Worker preservation is moot for them but harmless;
  Layer 1 still extends the CP's life so most short queries finish.
---
 controlplane/configstore/store.go | 26 +++++++++++++++++++++++--
 controlplane/control.go           | 29 +++++++++++++++++++++++-----
 controlplane/k8s_pool.go          | 32 +++++++++++++++++++++++++++++--
 controlplane/k8s_pool_test.go     |  2 +-
 4 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go
index 155d0f5..a7625d5 100644
--- a/controlplane/configstore/store.go
+++ b/controlplane/configstore/store.go
@@ -1078,8 +1078,17 @@ func (cs *ConfigStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceID, podNamePre
 // Re-listing terminal rows here would loop the janitor on K8s 404s.
 //
 // The join switched from INNER to LEFT in Apr 2026 to handle (2)/(3);
-// the original implementation only handled (1). See
-// TestListOrphanedWorkers* in tests/configstore for the regression
+// the original implementation only handled (1).
+//
+// Apr 2026 also added an exclusion for workers with reclaimable Flight
+// sessions: a row with at least one flight_session_records entry in
+// active or reconnecting state is spared from orphan retirement so a
+// customer reconnecting by session token can still pick up their query
+// (see TakeOverWorker). Once the session record itself becomes terminal
+// (expired/closed via ExpireFlightSessionRecords), the worker is
+// retired normally on the next sweep.
+//
+// See TestListOrphanedWorkers* in tests/configstore for the regression
 // fixtures.
 func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) {
 	var workers []WorkerRecord
@@ -1092,8 +1101,13 @@ func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, er
 		WorkerStateHotIdle,
 		WorkerStateDraining,
 	}
+	reclaimableSessionStates := []FlightSessionState{
+		FlightSessionStateActive,
+		FlightSessionStateReconnecting,
+	}
 	workerTable := cs.runtimeTable((&WorkerRecord{}).TableName())
 	cpTable := cs.runtimeTable((&ControlPlaneInstance{}).TableName())
+	flightTable := cs.runtimeTable((&FlightSessionRecord{}).TableName())
 	err := cs.db.Table(workerTable+" AS w").
 		Select("w.*").
 		Joins("LEFT JOIN "+cpTable+" AS cp ON cp.id = w.owner_cp_instance_id").
@@ -1109,6 +1123,14 @@ func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, er
 			before,
 			before,
 		).
+		// Spare workers with at least one reclaimable Flight session: a
+		// retire here would kill the customer's mid-flight query at the
+		// moment they reconnect by session token. Bounded by
+		// ExpireFlightSessionRecords — once the session record is moved
+		// to a terminal state, the worker is no longer protected.
+		Where("NOT EXISTS (SELECT 1 FROM "+flightTable+" AS f "+
+			"WHERE f.worker_id = w.worker_id AND f.state IN ?)",
+			reclaimableSessionStates).
 		Order("w.worker_id ASC").
 		Find(&workers).Error
 	if err != nil {
diff --git a/controlplane/control.go b/controlplane/control.go
index ebcd50d..71d4a3d 100644
--- a/controlplane/control.go
+++ b/controlplane/control.go
@@ -39,7 +39,7 @@ type ControlPlaneConfig struct {
 	WorkerQueueTimeout   time.Duration // How long to wait for an available worker slot (default: 5m)
 	WorkerIdleTimeout    time.Duration // How long to keep an idle worker alive (default: 5m)
 	RetireOnSessionEnd   bool          // When true, process workers are retired immediately after their last session ends.
-	HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade (default: 24h)
+	HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode.
 	MetricsServer        *http.Server  // Optional metrics server to shut down during upgrade
 
 	// WorkerBackend selects the worker management backend.
@@ -187,8 +187,15 @@ func RunControlPlane(cfg ControlPlaneConfig) {
 		cfg.WorkerIdleTimeout = 5 * time.Minute
 	}
 	if cfg.HandoverDrainTimeout == 0 {
+		// Remote mode: no internal drain timeout. The CP waits for active
+		// sessions to finish for as long as it takes; k8s
+		// terminationGracePeriodSeconds is the only hard wall. The
+		// previous 15m default cut off in-flight customer queries that
+		// just happened to be running at the wall (see the worker-40761
+		// incident) — moving the wall doesn't fix that race, removing it
+		// does. Process mode keeps 24h since there's no k8s safety net.
 		if cfg.WorkerBackend == "remote" {
-			cfg.HandoverDrainTimeout = 15 * time.Minute
+			cfg.HandoverDrainTimeout = 0
 		} else {
 			cfg.HandoverDrainTimeout = 24 * time.Hour
 		}
@@ -1257,7 +1264,11 @@ func (cp *ControlPlane) drainAndShutdown(timeout time.Duration) {
 	if cp.flight != nil {
 		cp.flight.BeginDrain()
 	}
-	slog.Info("Waiting for planned shutdown drain.", "timeout", timeout)
+	if timeout > 0 {
+		slog.Info("Waiting for planned shutdown drain.", "timeout", timeout)
+	} else {
+		slog.Info("Waiting for planned shutdown drain (unbounded — k8s SIGKILL is the wall).")
+	}
 	if cp.waitForDrain(timeout) {
 		slog.Info("All pgwire connections and Flight sessions drained before shutdown.")
 	} else {
@@ -1280,9 +1291,17 @@ func (cp *ControlPlane) stopAcceptingPGConnections() {
 	}
 }
 
+// waitForDrain blocks until both the pgwire and Flight servers report
+// zero in-flight work, or the timeout fires. timeout == 0 means
+// unbounded — k8s terminationGracePeriodSeconds becomes the only wall.
+// Returns true on clean drain, false on timeout.
 func (cp *ControlPlane) waitForDrain(timeout time.Duration) bool {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
+	ctx := context.Background()
+	if timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, timeout)
+		defer cancel()
+	}
 
 	pgDone := make(chan struct{})
 	go func() {
diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go
index 04a9c7a..97af46d 100644
--- a/controlplane/k8s_pool.go
+++ b/controlplane/k8s_pool.go
@@ -2015,8 +2015,32 @@ func (p *K8sWorkerPool) ShutdownAll() {
 	close(p.stopInform)
 
 	ctx := context.Background()
+	preserved := make(map[int]*ManagedWorker)
 	for _, w := range workers {
 		podName := p.workerPodName(w)
+
+		// A worker that's serving sessions must not be torn down during
+		// CP shutdown — pod-deletion would kill in-flight customer queries
+		// at exactly the moment ShutdownAll runs (the failure mode hit by
+		// the production worker-40761 incident on a 15-minute drain wall).
+		// Leave the worker in `hot` state, owned by this dying CP. Two
+		// downstream guarantees keep this safe:
+		//   (1) Flight clients can reconnect by session token; a peer CP
+		//       claims via TakeOverWorker and the query resumes.
+		//   (2) ListOrphanedWorkers' JOIN against flight_session_records
+		//       prevents peer CPs' janitors from retiring the worker
+		//       while a session is still active or reconnecting; once the
+		//       session record is expired the worker is retired normally.
+		// pgwire customers connected to this CP have already lost their
+		// connection (the CP socket is going away) — protecting the
+		// worker doesn't help them, but it doesn't hurt either.
+		if w.activeSessions > 0 {
+			slog.Info("ShutdownAll: worker has active sessions; leaving pod alive for Flight reconnect.",
+				"id", w.ID, "worker_pod", podName, "active_sessions", w.activeSessions)
+			preserved[w.ID] = w
+			continue
+		}
+
 		slog.Info("Shutting down K8s worker.", "id", w.ID, "worker_pod", podName)
 
 		// Step 1: CAS to draining. Skip the worker on CAS miss or error —
@@ -2068,10 +2092,14 @@ func (p *K8sWorkerPool) ShutdownAll() {
 		p.mu.Unlock()
 	}
 
+	// Workers preserved due to active sessions stay in the in-memory map
+	// so any remaining session bookkeeping inside this CP still finds them
+	// during the residual shutdown window. The map is wiped on process
+	// exit; preservation is purely about not yanking the rug here.
 	p.mu.Lock()
-	p.workers = make(map[int]*ManagedWorker)
+	p.workers = preserved
 	p.mu.Unlock()
-	observeControlPlaneWorkers(0)
+	observeControlPlaneWorkers(len(preserved))
 }
 
 // retireWorkerPod closes the gRPC client and deletes the worker pod.
diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go
index a1048f0..e7d4d06 100644
--- a/controlplane/k8s_pool_test.go
+++ b/controlplane/k8s_pool_test.go
@@ -1502,7 +1502,7 @@ func TestK8sPoolActivateReservedWorkerPersistsActivatingThenHotWorkerRecord(t *t
 // TestK8sPoolWorkerRecordForIdleStampsOwnerCPInstanceID guards against the
 // warm-pool churn loop. workerRecordFor used to clear OwnerCPInstanceID
 // whenever state==Idle, which left every freshly-spawned warm worker
-// matching ListOrphanedWorkers case (2) (NULLIF(owner_cp_instance_id, ”) IS
+// matching ListOrphanedWorkers case (2) (NULLIF(owner_cp_instance_id, '') IS
 // NULL AND last_heartbeat_at <= before) the moment it crossed the orphan
 // grace. The janitor then retired it, the warm pool replenished, and the
 // loop repeated indefinitely. Stamping warm workers with the creating CP's