diff --git a/apps/workspace-engine/pkg/reconcile/memory/memory.go b/apps/workspace-engine/pkg/reconcile/memory/memory.go index e61acbb15..d16664870 100644 --- a/apps/workspace-engine/pkg/reconcile/memory/memory.go +++ b/apps/workspace-engine/pkg/reconcile/memory/memory.go @@ -264,7 +264,7 @@ func (q *Queue) AckSuccess( defer q.backend.mu.Unlock() s, ok := q.backend.scopes[params.ItemID] - if !ok || s.ClaimedBy != params.WorkerID || s.UpdatedAt.After(params.ClaimedUpdatedAt) { + if !ok || s.ClaimedBy != params.WorkerID { return reconcile.AckSuccessResult{}, reconcile.ErrClaimNotOwned } diff --git a/apps/workspace-engine/pkg/reconcile/memory/memory_test.go b/apps/workspace-engine/pkg/reconcile/memory/memory_test.go index f08c16ba1..fe300364c 100644 --- a/apps/workspace-engine/pkg/reconcile/memory/memory_test.go +++ b/apps/workspace-engine/pkg/reconcile/memory/memory_test.go @@ -45,9 +45,8 @@ func TestQueue_EnqueueClaimAckLifecycle(t *testing.T) { } ack, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{ - ItemID: items[0].ID, - WorkerID: "worker-a", - ClaimedUpdatedAt: items[0].UpdatedAt, + ItemID: items[0].ID, + WorkerID: "worker-a", }) if err != nil { t.Fatalf("ack failed: %v", err) diff --git a/apps/workspace-engine/pkg/reconcile/postgres/db/reconcile_work_item.sql.go b/apps/workspace-engine/pkg/reconcile/postgres/db/reconcile_work_item.sql.go index 7426858e1..4336a668c 100644 --- a/apps/workspace-engine/pkg/reconcile/postgres/db/reconcile_work_item.sql.go +++ b/apps/workspace-engine/pkg/reconcile/postgres/db/reconcile_work_item.sql.go @@ -334,7 +334,7 @@ WITH target_scope AS ( DELETE FROM reconcile_work_scope AS s USING target_scope AS t WHERE s.id = t.id - AND s.updated_at <= $3 + AND s.claimed_by = $2 RETURNING s.id ) SELECT @@ -345,7 +345,6 @@ SELECT type DeleteClaimedReconcileWorkItemParams struct { ID int64 ClaimedBy pgtype.Text - UpdatedAt pgtype.Timestamptz } type DeleteClaimedReconcileWorkItemRow struct { @@ -353,10 +352,18 @@ type DeleteClaimedReconcileWorkItemRow struct { Deleted bool } -// Delete the scope row if it is still owned by the caller and unchanged since -// the claim snapshot. Returns ownership and deletion status. +// Delete the scope row if it is still owned by the caller. Ownership is +// the only invariant we check; a prior version also required updated_at to +// match the claim snapshot, but our own lease heartbeat advances updated_at +// mid-processing, so that guard silently broke deletion for any item whose +// processing exceeded one heartbeat tick. +// +// claimed_by is re-checked inside the DELETE row-lock (not just on the +// target_scope snapshot) to close a race where the lease could expire, +// claim_cleanup releases, and another worker claims the row between the +// CTE snapshot read and the row-lock acquisition. func (q *Queries) DeleteClaimedReconcileWorkItem(ctx context.Context, arg DeleteClaimedReconcileWorkItemParams) (DeleteClaimedReconcileWorkItemRow, error) { - row := q.db.QueryRow(ctx, deleteClaimedReconcileWorkItem, arg.ID, arg.ClaimedBy, arg.UpdatedAt) + row := q.db.QueryRow(ctx, deleteClaimedReconcileWorkItem, arg.ID, arg.ClaimedBy) var i DeleteClaimedReconcileWorkItemRow err := row.Scan(&i.Owned, &i.Deleted) return i, err diff --git a/apps/workspace-engine/pkg/reconcile/postgres/pg.go b/apps/workspace-engine/pkg/reconcile/postgres/pg.go index 88c7b140a..9accaf3ab 100644 --- a/apps/workspace-engine/pkg/reconcile/postgres/pg.go +++ b/apps/workspace-engine/pkg/reconcile/postgres/pg.go @@ -341,7 +341,6 @@ func (q *Queue) AckSuccess( sqldb.DeleteClaimedReconcileWorkItemParams{ ID: params.ItemID, ClaimedBy: pgtype.Text{String: params.WorkerID, Valid: true}, - UpdatedAt: pgtype.Timestamptz{Time: params.ClaimedUpdatedAt, Valid: true}, }, ) if err != nil { diff --git a/apps/workspace-engine/pkg/reconcile/postgres/pg_integration_test.go b/apps/workspace-engine/pkg/reconcile/postgres/pg_integration_test.go index 9af32a634..e9cf546f6 100644 --- a/apps/workspace-engine/pkg/reconcile/postgres/pg_integration_test.go +++ b/apps/workspace-engine/pkg/reconcile/postgres/pg_integration_test.go @@ -352,9 +352,8 @@ func TestQueue_EnqueueClaimAckLifecycle(t *testing.T) { } ack, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{ - ItemID: item.ID, - WorkerID: "worker-a", - ClaimedUpdatedAt: item.UpdatedAt, + ItemID: item.ID, + WorkerID: "worker-a", }) if err != nil { t.Fatalf("ack failed: %v", err) @@ -376,6 +375,74 @@ func TestQueue_EnqueueClaimAckLifecycle(t *testing.T) { } } +// Regression: AckSuccess must delete the row even when a heartbeat has +// advanced updated_at past the claim snapshot. A previous version of the +// DELETE required updated_at <= claim time, which silently broke Ack for any +// item whose Process exceeded one heartbeat tick. +func TestQueue_AckSucceedsAfterHeartbeat(t *testing.T) { + pool := requireDB(t) + queue := New(pool) + workspaceID := uuid.NewString() + t.Cleanup(func() { cleanupWorkspaceItems(t, pool, workspaceID) }) + + ctx := context.Background() + + err := queue.Enqueue(ctx, reconcile.EnqueueParams{ + WorkspaceID: workspaceID, + Kind: "deploymentresourceselectoreval", + ScopeType: "deployment", + ScopeID: uuid.NewString(), + NotBefore: time.Now().Add(-1 * time.Second), + }) + if err != nil { + t.Fatalf("enqueue failed: %v", err) + } + + items, err := queue.Claim(ctx, reconcile.ClaimParams{ + BatchSize: 1, + WorkerID: "worker-a", + LeaseDuration: 5 * time.Second, + }) + if err != nil { + t.Fatalf("claim failed: %v", err) + } + if len(items) != 1 { + t.Fatalf("expected 1 claimed item, got %d", len(items)) + } + item := items[0] + + if err := queue.ExtendLease(ctx, reconcile.ExtendLeaseParams{ + ItemID: item.ID, + WorkerID: "worker-a", + LeaseDuration: 5 * time.Second, + }); err != nil { + t.Fatalf("extend lease failed: %v", err) + } + + ack, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{ + ItemID: item.ID, + WorkerID: "worker-a", + }) + if err != nil { + t.Fatalf("ack failed after heartbeat: %v", err) + } + if !ack.Deleted { + t.Fatalf("expected Deleted=true after heartbeat, got %+v", ack) + } + + claimedAgain, err := queue.Claim(ctx, reconcile.ClaimParams{ + BatchSize: 1, + WorkerID: "worker-a", + LeaseDuration: 2 * time.Second, + }) + if err != nil { + t.Fatalf("re-claim failed: %v", err) + } + if len(claimedAgain) != 0 { + t.Fatalf("expected queue empty after ack, got %d items", len(claimedAgain)) + } +} + func TestQueue_FilteredClaimAndRetry(t *testing.T) { pool := requireDB(t) workspaceID := uuid.NewString() @@ -577,9 +644,8 @@ func TestQueue_ValidationAndOwnershipErrors(t *testing.T) { t.Fatalf("expected ErrMissingWorkerID, got %v", err) } if _, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{ - ItemID: 123456, - WorkerID: "w", - ClaimedUpdatedAt: time.Now(), + ItemID: 123456, + WorkerID: "w", }); !errors.Is(err, reconcile.ErrClaimNotOwned) { t.Fatalf("expected ErrClaimNotOwned for unknown ack item, got %v", err) } @@ -643,9 +709,8 @@ func TestQueue_DatabaseErrorPaths(t *testing.T) { } if _, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{ - ItemID: 1, - WorkerID: "w", - ClaimedUpdatedAt: time.Now(), + ItemID: 1, + WorkerID: "w", }); err == nil { t.Fatal("expected ack db error on closed pool") } @@ -737,9 +802,8 @@ func TestQueue_ClaimAndAck(t *testing.T) { } ack, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{ - ItemID: items[0].ID, - WorkerID: "worker-a", - ClaimedUpdatedAt: items[0].UpdatedAt, + ItemID: items[0].ID, + WorkerID: "worker-a", }) if err != nil { t.Fatalf("ack item failed: %v", err) diff --git a/apps/workspace-engine/pkg/reconcile/postgres/queries/reconcile_work_item.sql b/apps/workspace-engine/pkg/reconcile/postgres/queries/reconcile_work_item.sql index 107e1de06..6b74a69b0 100644 --- a/apps/workspace-engine/pkg/reconcile/postgres/queries/reconcile_work_item.sql +++ b/apps/workspace-engine/pkg/reconcile/postgres/queries/reconcile_work_item.sql @@ -126,8 +126,16 @@ WHERE id = sqlc.arg(id) AND claimed_by = sqlc.arg(claimed_by); -- name: DeleteClaimedReconcileWorkItem :one --- Delete the scope row if it is still owned by the caller and unchanged since --- the claim snapshot. Returns ownership and deletion status. +-- Delete the scope row if it is still owned by the caller. Ownership is +-- the only invariant we check; a prior version also required updated_at to +-- match the claim snapshot, but our own lease heartbeat advances updated_at +-- mid-processing, so that guard silently broke deletion for any item whose +-- processing exceeded one heartbeat tick. +-- +-- claimed_by is re-checked inside the DELETE row-lock (not just on the +-- target_scope snapshot) to close a race where the lease could expire, +-- claim_cleanup releases, and another worker claims the row between the +-- CTE snapshot read and the row-lock acquisition. WITH target_scope AS ( SELECT s.id FROM reconcile_work_scope AS s @@ -137,7 +145,7 @@ WITH target_scope AS ( DELETE FROM reconcile_work_scope AS s USING target_scope AS t WHERE s.id = t.id - AND s.updated_at <= sqlc.arg(updated_at) + AND s.claimed_by = sqlc.arg(claimed_by) RETURNING s.id ) SELECT diff --git a/apps/workspace-engine/pkg/reconcile/worker.go b/apps/workspace-engine/pkg/reconcile/worker.go index 61ec7de5d..327614b08 100644 --- a/apps/workspace-engine/pkg/reconcile/worker.go +++ b/apps/workspace-engine/pkg/reconcile/worker.go @@ -286,10 +286,9 @@ func (w *Worker) processClaimedItem(ctx context.Context, item Item) { return } - _, ackErr := w.queue.AckSuccess(ctx, AckSuccessParams{ - ItemID: item.ID, - WorkerID: w.cfg.WorkerID, - ClaimedUpdatedAt: item.UpdatedAt, + ackResult, ackErr := w.queue.AckSuccess(ctx, AckSuccessParams{ + ItemID: item.ID, + WorkerID: w.cfg.WorkerID, }) if ackErr != nil { if w.cfg.Hooks.OnDropped != nil { @@ -297,6 +296,22 @@ func (w *Worker) processClaimedItem(ctx context.Context, item Item) { } return } + if !ackResult.Deleted { + slog.ErrorContext(ctx, + "ack reported ownership but did not delete row — row will be re-claimed", + "item", item.ID, + "kind", item.Kind, + "scopeType", item.ScopeType, + "scopeID", item.ScopeID, + ) + if w.cfg.Hooks.OnDropped != nil { + w.cfg.Hooks.OnDropped( + item, + fmt.Errorf("ack success returned Deleted=false for item %d", item.ID), + ) + } + return + } if w.cfg.Hooks.OnProcessed != nil { w.cfg.Hooks.OnProcessed(item) } diff --git a/apps/workspace-engine/pkg/reconcile/worker_test.go b/apps/workspace-engine/pkg/reconcile/worker_test.go index 01eff795b..106c90437 100644 --- a/apps/workspace-engine/pkg/reconcile/worker_test.go +++ b/apps/workspace-engine/pkg/reconcile/worker_test.go @@ -3,6 +3,7 @@ package reconcile import ( "context" "errors" + "strings" "sync" "sync/atomic" "testing" @@ -502,6 +503,38 @@ func TestProcessClaimedItemBranches(t *testing.T) { t.Fatalf("expected AckPermanentFailure NOT called, got %d", q.ackPermCalls.Load()) } }) + + // Safety net: if AckSuccess ever returns Owned=true with Deleted=false + // (which the current SQL shouldn't allow, but a future regression might), + // the worker must surface it via OnDropped rather than silently calling + // OnProcessed as if all is well. + t.Run("ack owned but not deleted fires OnDropped", func(t *testing.T) { + var dropped atomic.Int64 + var processed atomic.Int64 + var droppedErr error + cfg7 := cfg + cfg7.Hooks.OnDropped = func(_ Item, err error) { + dropped.Add(1) + droppedErr = err + } + cfg7.Hooks.OnProcessed = func(Item) { processed.Add(1) } + q := &fakeQueue{ + ackFn: func(context.Context, AckSuccessParams) (AckSuccessResult, error) { + return AckSuccessResult{Deleted: false}, nil + }, + } + w, _ := NewWorker("workqueue-worker", q, fakeProcessor{}, cfg7) + w.processClaimedItem(context.Background(), item) + if dropped.Load() != 1 { + t.Fatalf("expected OnDropped called once, got %d", dropped.Load()) + } + if processed.Load() != 0 { + t.Fatalf("expected OnProcessed NOT called, got %d", processed.Load()) + } + if droppedErr == nil || !strings.Contains(droppedErr.Error(), "Deleted=false") { + t.Fatalf("expected error mentioning Deleted=false, got %v", droppedErr) + } + }) } func TestRetryBackoff(t *testing.T) { diff --git a/apps/workspace-engine/pkg/reconcile/workqueue.go b/apps/workspace-engine/pkg/reconcile/workqueue.go index 7f4fe937f..e0d67fde2 100644 --- a/apps/workspace-engine/pkg/reconcile/workqueue.go +++ b/apps/workspace-engine/pkg/reconcile/workqueue.go @@ -117,9 +117,8 @@ type ExtendLeaseParams struct { } type AckSuccessParams struct { - ItemID int64 - WorkerID string - ClaimedUpdatedAt time.Time + ItemID int64 + WorkerID string } type AckSuccessResult struct { diff --git a/apps/workspace-engine/test/controllers/harness/helpers.go b/apps/workspace-engine/test/controllers/harness/helpers.go index 5d8a600e5..e69c5620f 100644 --- a/apps/workspace-engine/test/controllers/harness/helpers.go +++ b/apps/workspace-engine/test/controllers/harness/helpers.go @@ -2,6 +2,7 @@ package harness import ( "context" + "fmt" "time" "workspace-engine/pkg/reconcile" @@ -72,11 +73,16 @@ func DrainQueue( }) } - _, _ = queue.AckSuccess(ctx, reconcile.AckSuccessParams{ - ItemID: item.ID, - WorkerID: testWorkerID, - ClaimedUpdatedAt: item.UpdatedAt, + ackResult, ackErr := queue.AckSuccess(ctx, reconcile.AckSuccessParams{ + ItemID: item.ID, + WorkerID: testWorkerID, }) + if ackErr != nil { + return res, ackErr + } + if !ackResult.Deleted { + return res, fmt.Errorf("ack reported owned but did not delete item %d", item.ID) + } res.Processed++ } }