diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go
index 155d0f5..a7625d5 100644
--- a/controlplane/configstore/store.go
+++ b/controlplane/configstore/store.go
@@ -1078,8 +1078,17 @@ func (cs *ConfigStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceID, podNamePre
 // Re-listing terminal rows here would loop the janitor on K8s 404s.
 //
 // The join switched from INNER to LEFT in Apr 2026 to handle (2)/(3);
-// the original implementation only handled (1). See
-// TestListOrphanedWorkers* in tests/configstore for the regression
+// the original implementation only handled (1).
+//
+// Apr 2026 also added an exclusion for workers with reclaimable Flight
+// sessions: a row with at least one flight_session_records entry in
+// active or reconnecting state is spared from orphan retirement so a
+// customer reconnecting by session token can still pick up their query
+// (see TakeOverWorker). Once the session record itself becomes terminal
+// (expired/closed via ExpireFlightSessionRecords), the worker is
+// retired normally on the next sweep.
+//
+// See TestListOrphanedWorkers* in tests/configstore for the regression
 // fixtures.
 func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) {
 	var workers []WorkerRecord
@@ -1092,8 +1101,13 @@ func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, er
 		WorkerStateHotIdle,
 		WorkerStateDraining,
 	}
+	reclaimableSessionStates := []FlightSessionState{
+		FlightSessionStateActive,
+		FlightSessionStateReconnecting,
+	}
 	workerTable := cs.runtimeTable((&WorkerRecord{}).TableName())
 	cpTable := cs.runtimeTable((&ControlPlaneInstance{}).TableName())
+	flightTable := cs.runtimeTable((&FlightSessionRecord{}).TableName())
 	err := cs.db.Table(workerTable+" AS w").
 		Select("w.*").
 		Joins("LEFT JOIN "+cpTable+" AS cp ON cp.id = w.owner_cp_instance_id").
@@ -1109,6 +1123,14 @@ func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, er
 			before, before,
 		).
+		// Spare workers with at least one reclaimable Flight session: a
+		// retire here would kill the customer's mid-flight query at the
+		// moment they reconnect by session token. Bounded by
+		// ExpireFlightSessionRecords — once the session record is moved
+		// to a terminal state, the worker is no longer protected.
+		Where("NOT EXISTS (SELECT 1 FROM "+flightTable+" AS f "+
+			"WHERE f.worker_id = w.worker_id AND f.state IN ?)",
+			reclaimableSessionStates).
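+		// Roughly the SQL this chain produces (a sketch: the
+		// pre-existing orphan predicates are elided, table names are
+		// placeholders, and placeholder expansion depends on the GORM
+		// dialect):
+		//
+		//   SELECT w.* FROM <workers> AS w
+		//   LEFT JOIN <cp_instances> AS cp ON cp.id = w.owner_cp_instance_id
+		//   WHERE <orphan cases (1)-(3)>
+		//     AND NOT EXISTS (
+		//       SELECT 1 FROM <flight_session_records> AS f
+		//       WHERE f.worker_id = w.worker_id
+		//         AND f.state IN ('active', 'reconnecting'))
+		//   ORDER BY w.worker_id ASC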
 		Order("w.worker_id ASC").
 		Find(&workers).Error
 	if err != nil {
diff --git a/controlplane/control.go b/controlplane/control.go
index ebcd50d..71d4a3d 100644
--- a/controlplane/control.go
+++ b/controlplane/control.go
@@ -39,7 +39,7 @@ type ControlPlaneConfig struct {
 	WorkerQueueTimeout   time.Duration // How long to wait for an available worker slot (default: 5m)
 	WorkerIdleTimeout    time.Duration // How long to keep an idle worker alive (default: 5m)
 	RetireOnSessionEnd   bool          // When true, process workers are retired immediately after their last session ends.
-	HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade (default: 24h)
+	HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode.
 	MetricsServer        *http.Server  // Optional metrics server to shut down during upgrade
 
 	// WorkerBackend selects the worker management backend.
@@ -187,8 +187,15 @@ func RunControlPlane(cfg ControlPlaneConfig) {
 		cfg.WorkerIdleTimeout = 5 * time.Minute
 	}
 	if cfg.HandoverDrainTimeout == 0 {
+		// Remote mode: no internal drain timeout. The CP waits for active
+		// sessions to finish for as long as it takes; k8s
+		// terminationGracePeriodSeconds is the only hard wall. The
+		// previous 15m default cut off in-flight customer queries that
+		// just happened to be running at the wall (see the worker-40761
+		// incident) — moving the wall doesn't fix that race, removing it
+		// does. Process mode keeps 24h since there's no k8s safety net.
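+		// For reference, the k8s-side wall lives in the pod spec. A
+		// sketch with an illustrative value, not what the deployment
+		// actually sets:
+		//
+		//   spec:
+		//     terminationGracePeriodSeconds: 3600  # SIGKILL this long after SIGTERM
+		//
+		// Whatever that value is, it bounds the otherwise unbounded
+		// drain below.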
 		if cfg.WorkerBackend == "remote" {
-			cfg.HandoverDrainTimeout = 15 * time.Minute
+			cfg.HandoverDrainTimeout = 0
 		} else {
 			cfg.HandoverDrainTimeout = 24 * time.Hour
 		}
@@ -1257,7 +1264,11 @@ func (cp *ControlPlane) drainAndShutdown(timeout time.Duration) {
 	if cp.flight != nil {
 		cp.flight.BeginDrain()
 	}
-	slog.Info("Waiting for planned shutdown drain.", "timeout", timeout)
+	if timeout > 0 {
+		slog.Info("Waiting for planned shutdown drain.", "timeout", timeout)
+	} else {
+		slog.Info("Waiting for planned shutdown drain (unbounded — k8s SIGKILL is the wall).")
+	}
 	if cp.waitForDrain(timeout) {
 		slog.Info("All pgwire connections and Flight sessions drained before shutdown.")
 	} else {
@@ -1280,9 +1291,17 @@ func (cp *ControlPlane) stopAcceptingPGConnections() {
 	}
 }
 
+// waitForDrain blocks until both the pgwire and Flight servers report
+// zero in-flight work, or the timeout fires. timeout == 0 means
+// unbounded — k8s terminationGracePeriodSeconds becomes the only wall.
+// Returns true on a clean drain, false on timeout.
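+//
+// The two call shapes, matching the backend defaults (a usage sketch,
+// not new call sites):
+//
+//	cp.waitForDrain(0)              // remote: wait until k8s kills the pod
+//	cp.waitForDrain(24 * time.Hour) // process: self-imposed wall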
 func (cp *ControlPlane) waitForDrain(timeout time.Duration) bool {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
+	ctx := context.Background()
+	if timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, timeout)
+		defer cancel()
+	}
 
 	pgDone := make(chan struct{})
 	go func() {
diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go
index 04a9c7a..97af46d 100644
--- a/controlplane/k8s_pool.go
+++ b/controlplane/k8s_pool.go
@@ -2015,8 +2015,32 @@ func (p *K8sWorkerPool) ShutdownAll() {
 	close(p.stopInform)
 	ctx := context.Background()
+	preserved := make(map[int]*ManagedWorker)
 	for _, w := range workers {
 		podName := p.workerPodName(w)
+
+		// A worker that's serving sessions must not be torn down during
+		// CP shutdown — pod-deletion would kill in-flight customer queries
+		// at exactly the moment ShutdownAll runs (the failure mode hit by
+		// the production worker-40761 incident on a 15-minute drain wall).
+		// Leave the worker in `hot` state, owned by this dying CP. Two
+		// downstream guarantees keep this safe:
+		//   (1) Flight clients can reconnect by session token; a peer CP
+		//       claims via TakeOverWorker and the query resumes.
+		//   (2) ListOrphanedWorkers' NOT EXISTS guard on
+		//       flight_session_records prevents peer CPs' janitors from
+		//       retiring the worker while a session is still active or
+		//       reconnecting; once the session record is expired the
+		//       worker is retired normally.
+		// pgwire customers connected to this CP have already lost their
+		// connection (the CP socket is going away) — protecting the
+		// worker doesn't help them, but it doesn't hurt either.
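+		//
+		// Intended sequence, sketched (timeline labels are illustrative):
+		//
+		//   t0  CP-A gets SIGTERM; busy worker stays hot, session active
+		//   t1  customer reconnects by token to CP-B -> TakeOverWorker
+		//   t2  the query resumes against the same worker pod
+		//   t1' (or) the customer never returns; ExpireFlightSessionRecords
+		//       marks the session terminal and the next janitor sweep
+		//       retires the worker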
+		if w.activeSessions > 0 {
+			slog.Info("ShutdownAll: worker has active sessions; leaving pod alive for Flight reconnect.",
+				"id", w.ID, "worker_pod", podName, "active_sessions", w.activeSessions)
+			preserved[w.ID] = w
+			continue
+		}
+
 		slog.Info("Shutting down K8s worker.", "id", w.ID, "worker_pod", podName)
 
 		// Step 1: CAS to draining. Skip the worker on CAS miss or error —
@@ -2068,10 +2092,14 @@
 		p.mu.Unlock()
 	}
 
+	// Workers preserved due to active sessions stay in the in-memory map
+	// so any remaining session bookkeeping inside this CP still finds them
+	// during the residual shutdown window. The map is wiped on process
+	// exit; preservation is purely about not yanking the rug here.
 	p.mu.Lock()
-	p.workers = make(map[int]*ManagedWorker)
+	p.workers = preserved
 	p.mu.Unlock()
-	observeControlPlaneWorkers(0)
+	observeControlPlaneWorkers(len(preserved))
 }
 
 // retireWorkerPod closes the gRPC client and deletes the worker pod.
diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go
index fad0836..e7d4d06 100644
--- a/controlplane/k8s_pool_test.go
+++ b/controlplane/k8s_pool_test.go
@@ -2790,6 +2790,49 @@ func TestShutdownAll_LeavesInDrainingWhenPodDeleteFails(t *testing.T) {
 	}
 }
 
+// TestShutdownAll_SparesWorkersWithActiveSessions: a CP receiving SIGTERM
+// must not pod-delete a worker that's mid-query. Before this change the
+// teardown chain ran for every owned worker regardless of session count,
+// collapsing every in-flight query at the moment ShutdownAll fired (the
+// failure mode the production incident hit on a 15-minute drain wall).
+//
+// With the fix, busy workers (activeSessions > 0) are skipped — left in
+// hot/serving state, owned by the dying CP. Customer Flight clients can
+// reconnect via session token; the orphan janitor's flight-session
+// NOT EXISTS guard (Layer 3) prevents peer CPs from retiring them while
+// a session record is still active. Idle workers (activeSessions == 0)
+// drain normally.
+func TestShutdownAll_SparesWorkersWithActiveSessions(t *testing.T) {
+	store := &captureRuntimeWorkerStore{}
+	pool, cs := shutdownTestPool(t, store)
+
+	busy := addShutdownWorker(t, pool, cs, 1)
+	busy.activeSessions = 2
+
+	addShutdownWorker(t, pool, cs, 2) // idle (activeSessions == 0)
+
+	pool.ShutdownAll()
+
+	// Busy worker: pod survives, no DB transitions on its row.
+	if !podExists(t, cs, "worker-1") {
+		t.Fatal("worker-1 has active sessions; ShutdownAll must not delete its pod")
+	}
+	for _, id := range store.markDrainingCalledIDs {
+		if id == 1 {
+			t.Fatalf("MarkWorkerDraining called for busy worker 1; ShutdownAll must skip it (calls=%v)", store.markDrainingCalledIDs)
+		}
+	}
+	for _, id := range store.retireDrainingCalledIDs {
+		if id == 1 {
+			t.Fatalf("RetireDrainingWorker called for busy worker 1; ShutdownAll must skip it (calls=%v)", store.retireDrainingCalledIDs)
+		}
+	}
+
+	// Idle worker: drained as before.
+	if podExists(t, cs, "worker-2") {
+		t.Fatal("worker-2 is idle; ShutdownAll should have deleted its pod")
+	}
+}
+
 func TestShutdownAll_TreatsPodNotFoundAsDeleteSuccess(t *testing.T) {
 	// NotFound means another actor already removed the pod (node eviction,
 	// a racing CP during split-brain, manual kubectl delete). The state
diff --git a/tests/configstore/runtime_store_postgres_test.go b/tests/configstore/runtime_store_postgres_test.go
index e672641..58d321a 100644
--- a/tests/configstore/runtime_store_postgres_test.go
+++ b/tests/configstore/runtime_store_postgres_test.go
@@ -1097,6 +1097,188 @@ func TestMarkCredentialsRefreshedFailsOnOwnerMismatch(t *testing.T) {
 	}
 }
 
+// TestListOrphanedWorkersExcludesWorkersWithActiveFlightSessions: a
+// worker whose owning CP has expired is normally an orphan-cleanup
+// candidate. But if a Flight client could still reconnect by session
+// token (record is in active or reconnecting state), the orphan retire
+// would kill an in-flight customer query the moment they reconnect. The
+// NOT EXISTS guard on flight_session_records gives those workers a
+// reprieve until the session record itself is expired by
+// ExpireFlightSessionRecords.
+func TestListOrphanedWorkersExcludesWorkersWithActiveFlightSessions(t *testing.T) {
+	store := newIsolatedConfigStore(t)
+	now := time.Date(2026, time.April, 30, 14, 0, 0, 0, time.UTC)
+
+	// The owner CP expired long ago — easily past the 30s orphan grace.
+	if err := store.UpsertControlPlaneInstance(&configstore.ControlPlaneInstance{
+		ID:              "cp-old:boot-a",
+		PodName:         "duckgres-old",
+		PodUID:          "pod-old",
+		BootID:          "boot-a",
+		State:           configstore.ControlPlaneInstanceStateExpired,
+		StartedAt:       now.Add(-2 * time.Hour),
+		LastHeartbeatAt: now.Add(-1 * time.Hour),
+		ExpiredAt:       ptrTime(now.Add(-1 * time.Hour)),
+	}); err != nil {
+		t.Fatalf("UpsertControlPlaneInstance: %v", err)
+	}
+
+	// A hot worker owned by that expired CP — exactly the shape the
+	// pre-change orphan janitor would retire. With Layer 3, the active
+	// Flight session below should spare it.
+	if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{
+		WorkerID:          501,
+		PodName:           "duckgres-worker-501",
+		State:             configstore.WorkerStateHot,
+		OrgID:             "acme",
+		OwnerCPInstanceID: "cp-old:boot-a",
+		OwnerEpoch:        2,
+		LastHeartbeatAt:   now.Add(-30 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertWorkerRecord: %v", err)
+	}
+
+	// Reclaimable Flight session pointing at that worker.
+	if err := store.UpsertFlightSessionRecord(&configstore.FlightSessionRecord{
+		SessionToken: "tok-reclaim-501",
+		Username:     "postgres",
+		OrgID:        "acme",
+		WorkerID:     501,
+		OwnerEpoch:   2,
+		State:        configstore.FlightSessionStateActive,
+		ExpiresAt:    now.Add(30 * time.Minute), // not yet expired
+		LastSeenAt:   now.Add(-2 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertFlightSessionRecord: %v", err)
+	}
+
+	orphaned, err := store.ListOrphanedWorkers(now.Add(-30 * time.Second))
+	if err != nil {
+		t.Fatalf("ListOrphanedWorkers: %v", err)
+	}
+	for _, w := range orphaned {
+		if w.WorkerID == 501 {
+			t.Fatalf("worker 501 has an active Flight session; orphan janitor must spare it (got %#v)", orphaned)
+		}
+	}
+}
+
+// TestListOrphanedWorkersExcludesWorkersWithReconnectingFlightSessions:
+// the reconnecting state means a customer is mid-handshake from a Flight
+// client picking the session back up. The same protection applies — kill
+// the worker and you kill the resumption.
+func TestListOrphanedWorkersExcludesWorkersWithReconnectingFlightSessions(t *testing.T) {
+	store := newIsolatedConfigStore(t)
+	now := time.Date(2026, time.April, 30, 14, 0, 0, 0, time.UTC)
+
+	if err := store.UpsertControlPlaneInstance(&configstore.ControlPlaneInstance{
+		ID:              "cp-old:boot-a",
+		PodName:         "duckgres-old",
+		PodUID:          "pod-old",
+		BootID:          "boot-a",
+		State:           configstore.ControlPlaneInstanceStateExpired,
+		StartedAt:       now.Add(-2 * time.Hour),
+		LastHeartbeatAt: now.Add(-1 * time.Hour),
+		ExpiredAt:       ptrTime(now.Add(-1 * time.Hour)),
+	}); err != nil {
+		t.Fatalf("UpsertControlPlaneInstance: %v", err)
+	}
+	if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{
+		WorkerID:          502,
+		PodName:           "duckgres-worker-502",
+		State:             configstore.WorkerStateHot,
+		OrgID:             "acme",
+		OwnerCPInstanceID: "cp-old:boot-a",
+		OwnerEpoch:        2,
+		LastHeartbeatAt:   now.Add(-30 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertWorkerRecord: %v", err)
+	}
+	if err := store.UpsertFlightSessionRecord(&configstore.FlightSessionRecord{
+		SessionToken: "tok-reconnect-502",
+		Username:     "postgres",
+		OrgID:        "acme",
+		WorkerID:     502,
+		OwnerEpoch:   2,
+		State:        configstore.FlightSessionStateReconnecting,
+		ExpiresAt:    now.Add(30 * time.Minute),
+		LastSeenAt:   now.Add(-1 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertFlightSessionRecord: %v", err)
+	}
+
+	orphaned, err := store.ListOrphanedWorkers(now.Add(-30 * time.Second))
+	if err != nil {
+		t.Fatalf("ListOrphanedWorkers: %v", err)
+	}
+	for _, w := range orphaned {
+		if w.WorkerID == 502 {
+			t.Fatalf("worker 502 has a Flight session in reconnecting state; orphan janitor must spare it (got %#v)", orphaned)
+		}
+	}
+}
+
+// TestListOrphanedWorkersIncludesWorkersWithExpiredFlightSessions: once
+// the Flight session record has been moved to a terminal state (expired
+// or closed), the customer can no longer reclaim it. The worker should
+// then be retired by the orphan janitor like any other unowned row.
+// Without this, a worker whose session had expired would be filtered
+// out of the orphan list indefinitely and linger forever.
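+//
+// The hand-back is two independent sweeps — a sketch; the exact
+// call-site signatures are assumed here rather than quoted:
+//
+//	store.ExpireFlightSessionRecords(now)  // session active -> expired
+//	store.ListOrphanedWorkers(cutoff)      // worker is listed again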
+func TestListOrphanedWorkersIncludesWorkersWithExpiredFlightSessions(t *testing.T) {
+	store := newIsolatedConfigStore(t)
+	now := time.Date(2026, time.April, 30, 14, 0, 0, 0, time.UTC)
+
+	if err := store.UpsertControlPlaneInstance(&configstore.ControlPlaneInstance{
+		ID:              "cp-old:boot-a",
+		PodName:         "duckgres-old",
+		PodUID:          "pod-old",
+		BootID:          "boot-a",
+		State:           configstore.ControlPlaneInstanceStateExpired,
+		StartedAt:       now.Add(-2 * time.Hour),
+		LastHeartbeatAt: now.Add(-1 * time.Hour),
+		ExpiredAt:       ptrTime(now.Add(-1 * time.Hour)),
+	}); err != nil {
+		t.Fatalf("UpsertControlPlaneInstance: %v", err)
+	}
+	if err := store.UpsertWorkerRecord(&configstore.WorkerRecord{
+		WorkerID:          503,
+		PodName:           "duckgres-worker-503",
+		State:             configstore.WorkerStateHot,
+		OrgID:             "acme",
+		OwnerCPInstanceID: "cp-old:boot-a",
+		OwnerEpoch:        2,
+		LastHeartbeatAt:   now.Add(-30 * time.Minute),
+	}); err != nil {
+		t.Fatalf("UpsertWorkerRecord: %v", err)
+	}
+	if err := store.UpsertFlightSessionRecord(&configstore.FlightSessionRecord{
+		SessionToken: "tok-gone-503",
+		Username:     "postgres",
+		OrgID:        "acme",
+		WorkerID:     503,
+		OwnerEpoch:   2,
+		State:        configstore.FlightSessionStateExpired,
+		ExpiresAt:    now.Add(-10 * time.Minute),
+		LastSeenAt:   now.Add(-2 * time.Hour),
+	}); err != nil {
+		t.Fatalf("UpsertFlightSessionRecord: %v", err)
+	}
+
+	orphaned, err := store.ListOrphanedWorkers(now.Add(-30 * time.Second))
+	if err != nil {
+		t.Fatalf("ListOrphanedWorkers: %v", err)
+	}
+	found := false
+	for _, w := range orphaned {
+		if w.WorkerID == 503 {
+			found = true
+			break
+		}
+	}
+	if !found {
+		t.Fatalf("worker 503's Flight session is expired; orphan janitor MUST return it for retirement (got %#v)", orphaned)
+	}
+}
+
 func TestExpireFlightSessionRecordsPostgres(t *testing.T) {
 	store := newIsolatedConfigStore(t)
 	now := time.Date(2026, time.March, 27, 15, 0, 0, 0, time.UTC)