Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/workspace-engine/pkg/reconcile/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (q *Queue) AckSuccess(
defer q.backend.mu.Unlock()

s, ok := q.backend.scopes[params.ItemID]
if !ok || s.ClaimedBy != params.WorkerID || s.UpdatedAt.After(params.ClaimedUpdatedAt) {
if !ok || s.ClaimedBy != params.WorkerID {
return reconcile.AckSuccessResult{}, reconcile.ErrClaimNotOwned
}

Expand Down
5 changes: 2 additions & 3 deletions apps/workspace-engine/pkg/reconcile/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ func TestQueue_EnqueueClaimAckLifecycle(t *testing.T) {
}

ack, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{
ItemID: items[0].ID,
WorkerID: "worker-a",
ClaimedUpdatedAt: items[0].UpdatedAt,
ItemID: items[0].ID,
WorkerID: "worker-a",
})
if err != nil {
t.Fatalf("ack failed: %v", err)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion apps/workspace-engine/pkg/reconcile/postgres/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ func (q *Queue) AckSuccess(
sqldb.DeleteClaimedReconcileWorkItemParams{
ID: params.ItemID,
ClaimedBy: pgtype.Text{String: params.WorkerID, Valid: true},
UpdatedAt: pgtype.Timestamptz{Time: params.ClaimedUpdatedAt, Valid: true},
},
)
if err != nil {
Expand Down
88 changes: 76 additions & 12 deletions apps/workspace-engine/pkg/reconcile/postgres/pg_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,8 @@ func TestQueue_EnqueueClaimAckLifecycle(t *testing.T) {
}

ack, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{
ItemID: item.ID,
WorkerID: "worker-a",
ClaimedUpdatedAt: item.UpdatedAt,
ItemID: item.ID,
WorkerID: "worker-a",
})
Comment on lines 354 to 357
if err != nil {
t.Fatalf("ack failed: %v", err)
Expand All @@ -376,6 +375,74 @@ func TestQueue_EnqueueClaimAckLifecycle(t *testing.T) {
}
}

// TestQueue_AckSucceedsAfterHeartbeat is a regression test for AckSuccess.
//
// A previous version of the DELETE required updated_at <= claim time. The
// lease heartbeat advances updated_at past the claim snapshot, so Ack was
// silently broken for any item whose Process exceeded one heartbeat tick.
// The test claims an item, extends its lease once (standing in for the
// heartbeat), and then requires that AckSuccess still deletes the row and
// that nothing is left to claim afterwards.
func TestQueue_AckSucceedsAfterHeartbeat(t *testing.T) {
	const workerID = "worker-a"

	pool := requireDB(t)
	queue := New(pool)
	workspaceID := uuid.NewString()
	t.Cleanup(func() { cleanupWorkspaceItems(t, pool, workspaceID) })

	ctx := context.Background()

	// Enqueue a single item whose NotBefore lies one second in the past.
	enqueueParams := reconcile.EnqueueParams{
		WorkspaceID: workspaceID,
		Kind:        "deploymentresourceselectoreval",
		ScopeType:   "deployment",
		ScopeID:     uuid.NewString(),
		NotBefore:   time.Now().Add(-time.Second),
	}
	if err := queue.Enqueue(ctx, enqueueParams); err != nil {
		t.Fatalf("enqueue failed: %v", err)
	}

	// Claim it as worker-a with a five second lease.
	claimed, err := queue.Claim(ctx, reconcile.ClaimParams{
		BatchSize:     1,
		WorkerID:      workerID,
		LeaseDuration: 5 * time.Second,
	})
	if err != nil {
		t.Fatalf("claim failed: %v", err)
	}
	if got := len(claimed); got != 1 {
		t.Fatalf("expected 1 claimed item, got %d", got)
	}
	claimedItem := claimed[0]

	// Extend the lease between claim and ack, as the heartbeat would do
	// during long-running processing.
	err = queue.ExtendLease(ctx, reconcile.ExtendLeaseParams{
		ItemID:        claimedItem.ID,
		WorkerID:      workerID,
		LeaseDuration: 5 * time.Second,
	})
	if err != nil {
		t.Fatalf("extend lease failed: %v", err)
	}

	// The ack must still succeed and report the row as deleted.
	result, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{
		ItemID:   claimedItem.ID,
		WorkerID: workerID,
	})
	if err != nil {
		t.Fatalf("ack failed after heartbeat: %v", err)
	}
	if !result.Deleted {
		t.Fatalf("expected Deleted=true after heartbeat, got %+v", result)
	}

	// A further claim must come back empty: the acked item is gone.
	remaining, err := queue.Claim(ctx, reconcile.ClaimParams{
		BatchSize:     1,
		WorkerID:      workerID,
		LeaseDuration: 2 * time.Second,
	})
	if err != nil {
		t.Fatalf("re-claim failed: %v", err)
	}
	if got := len(remaining); got != 0 {
		t.Fatalf("expected queue empty after ack, got %d items", got)
	}
}

func TestQueue_FilteredClaimAndRetry(t *testing.T) {
pool := requireDB(t)
workspaceID := uuid.NewString()
Expand Down Expand Up @@ -577,9 +644,8 @@ func TestQueue_ValidationAndOwnershipErrors(t *testing.T) {
t.Fatalf("expected ErrMissingWorkerID, got %v", err)
}
if _, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{
ItemID: 123456,
WorkerID: "w",
ClaimedUpdatedAt: time.Now(),
ItemID: 123456,
WorkerID: "w",
}); !errors.Is(err, reconcile.ErrClaimNotOwned) {
t.Fatalf("expected ErrClaimNotOwned for unknown ack item, got %v", err)
}
Expand Down Expand Up @@ -643,9 +709,8 @@ func TestQueue_DatabaseErrorPaths(t *testing.T) {
}

if _, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{
ItemID: 1,
WorkerID: "w",
ClaimedUpdatedAt: time.Now(),
ItemID: 1,
WorkerID: "w",
}); err == nil {
t.Fatal("expected ack db error on closed pool")
}
Expand Down Expand Up @@ -737,9 +802,8 @@ func TestQueue_ClaimAndAck(t *testing.T) {
}

ack, err := queue.AckSuccess(ctx, reconcile.AckSuccessParams{
ItemID: items[0].ID,
WorkerID: "worker-a",
ClaimedUpdatedAt: items[0].UpdatedAt,
ItemID: items[0].ID,
WorkerID: "worker-a",
})
if err != nil {
t.Fatalf("ack item failed: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,16 @@ WHERE id = sqlc.arg(id)
AND claimed_by = sqlc.arg(claimed_by);

-- name: DeleteClaimedReconcileWorkItem :one
-- Delete the scope row if it is still owned by the caller and unchanged since
-- the claim snapshot. Returns ownership and deletion status.
-- Delete the scope row if it is still owned by the caller. Ownership is
-- the only invariant we check; a prior version also required updated_at to
-- match the claim snapshot, but our own lease heartbeat advances updated_at
-- mid-processing, so that guard silently broke deletion for any item whose
-- processing exceeded one heartbeat tick.
--
-- claimed_by is re-checked inside the DELETE row-lock (not just on the
-- target_scope snapshot) to close a race where the lease could expire,
-- claim_cleanup releases, and another worker claims the row between the
-- CTE snapshot read and the row-lock acquisition.
WITH target_scope AS (
SELECT s.id
FROM reconcile_work_scope AS s
Expand All @@ -137,7 +145,7 @@ WITH target_scope AS (
DELETE FROM reconcile_work_scope AS s
USING target_scope AS t
WHERE s.id = t.id
AND s.updated_at <= sqlc.arg(updated_at)
AND s.claimed_by = sqlc.arg(claimed_by)
RETURNING s.id
)
SELECT
Expand Down
23 changes: 19 additions & 4 deletions apps/workspace-engine/pkg/reconcile/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,32 @@ func (w *Worker) processClaimedItem(ctx context.Context, item Item) {
return
}

_, ackErr := w.queue.AckSuccess(ctx, AckSuccessParams{
ItemID: item.ID,
WorkerID: w.cfg.WorkerID,
ClaimedUpdatedAt: item.UpdatedAt,
ackResult, ackErr := w.queue.AckSuccess(ctx, AckSuccessParams{
ItemID: item.ID,
WorkerID: w.cfg.WorkerID,
})
if ackErr != nil {
if w.cfg.Hooks.OnDropped != nil {
w.cfg.Hooks.OnDropped(item, ackErr)
}
return
}
if !ackResult.Deleted {
slog.ErrorContext(ctx,
"ack reported ownership but did not delete row — row will be re-claimed",
"item", item.ID,
"kind", item.Kind,
Comment on lines +299 to +303
"scopeType", item.ScopeType,
"scopeID", item.ScopeID,
)
if w.cfg.Hooks.OnDropped != nil {
w.cfg.Hooks.OnDropped(
item,
fmt.Errorf("ack success returned Deleted=false for item %d", item.ID),
)
}
return
}
if w.cfg.Hooks.OnProcessed != nil {
w.cfg.Hooks.OnProcessed(item)
}
Expand Down
33 changes: 33 additions & 0 deletions apps/workspace-engine/pkg/reconcile/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reconcile
import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -502,6 +503,38 @@ func TestProcessClaimedItemBranches(t *testing.T) {
t.Fatalf("expected AckPermanentFailure NOT called, got %d", q.ackPermCalls.Load())
}
})

// Safety net: if AckSuccess ever returns Owned=true with Deleted=false
// (which the current SQL shouldn't allow, but a future regression might),
// the worker must surface it via OnDropped rather than silently calling
// OnProcessed as if all is well.
t.Run("ack owned but not deleted fires OnDropped", func(t *testing.T) {
var dropped atomic.Int64
var processed atomic.Int64
var droppedErr error
cfg7 := cfg
cfg7.Hooks.OnDropped = func(_ Item, err error) {
dropped.Add(1)
droppedErr = err
}
cfg7.Hooks.OnProcessed = func(Item) { processed.Add(1) }
q := &fakeQueue{
ackFn: func(context.Context, AckSuccessParams) (AckSuccessResult, error) {
return AckSuccessResult{Deleted: false}, nil
},
Comment on lines +522 to +524
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Match the mocked ack result to the intended regression shape (Owned=true, Deleted=false).

This subtest targets the Owned=true/Deleted=false case, but the mock currently leaves Owned at its zero value (false), which weakens the regression coverage.

Suggested fix
 		q := &fakeQueue{
 			ackFn: func(context.Context, AckSuccessParams) (AckSuccessResult, error) {
-				return AckSuccessResult{Deleted: false}, nil
+				return AckSuccessResult{Owned: true, Deleted: false}, nil
 			},
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ackFn: func(context.Context, AckSuccessParams) (AckSuccessResult, error) {
return AckSuccessResult{Deleted: false}, nil
},
ackFn: func(context.Context, AckSuccessParams) (AckSuccessResult, error) {
return AckSuccessResult{Owned: true, Deleted: false}, nil
},
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/workspace-engine/pkg/reconcile/worker_test.go` around lines 522 - 524,
The mock ackFn in the subtest returns AckSuccessResult{Deleted: false} but
leaves Owned as the zero value; update the mock return to
AckSuccessResult{Owned: true, Deleted: false} in the ackFn used by this subtest
so it matches the intended regression shape (Owned=true, Deleted=false) when
invoking the function under test (ackFn with AckSuccessParams).

}
w, _ := NewWorker("workqueue-worker", q, fakeProcessor{}, cfg7)
w.processClaimedItem(context.Background(), item)
if dropped.Load() != 1 {
t.Fatalf("expected OnDropped called once, got %d", dropped.Load())
}
if processed.Load() != 0 {
t.Fatalf("expected OnProcessed NOT called, got %d", processed.Load())
}
if droppedErr == nil || !strings.Contains(droppedErr.Error(), "Deleted=false") {
t.Fatalf("expected error mentioning Deleted=false, got %v", droppedErr)
}
})
}

func TestRetryBackoff(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions apps/workspace-engine/pkg/reconcile/workqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ type ExtendLeaseParams struct {
}

type AckSuccessParams struct {
ItemID int64
WorkerID string
ClaimedUpdatedAt time.Time
ItemID int64
WorkerID string
}

type AckSuccessResult struct {
Expand Down
14 changes: 10 additions & 4 deletions apps/workspace-engine/test/controllers/harness/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package harness

import (
"context"
"fmt"
"time"

"workspace-engine/pkg/reconcile"
Expand Down Expand Up @@ -72,11 +73,16 @@ func DrainQueue(
})
}

_, _ = queue.AckSuccess(ctx, reconcile.AckSuccessParams{
ItemID: item.ID,
WorkerID: testWorkerID,
ClaimedUpdatedAt: item.UpdatedAt,
ackResult, ackErr := queue.AckSuccess(ctx, reconcile.AckSuccessParams{
ItemID: item.ID,
WorkerID: testWorkerID,
})
if ackErr != nil {
return res, ackErr
}
if !ackResult.Deleted {
return res, fmt.Errorf("ack reported owned but did not delete item %d", item.ID)
}
res.Processed++
}
}
Expand Down
Loading