UN-3680 [FIX] PG queue — seam-audit fixes: poison identity, ack/release retries, callback duplicate guard, compose VT#2144
Conversation
…se retries, callback duplicate guard, compose VT Post-merge seam audit of the PG-queue epic surfaced interaction bugs between individually-reviewed features. Runtime (OSS) findings, all PG-gated with a Celery no-op proven by test. Chart-side fixes ship as a separate unstract-cloud PR (repo boundary). A1 (blocker) — poison drop couldn't identify orchestration messages. The backend dispatches async_execute_bin with execution_id POSITIONALLY (args[2]); the poison-drop identity reader only read kwargs, so a poisoned orchestration (a circuit-breaker-open drop, BEFORE the barrier is armed and BEFORE the claim is taken) had no barrier row and no claim row — invisible to every reaper sweep, a permanent silent strand. _pipeline_identity now takes a positional fallback keyed by task name (async_execute_bin: args[2]=execution_id, args[0]=org). A2 — consumer ack (client.delete) lacked the reconnect-retry every producer-side write already had. The consumer connection idles the whole task wall-clock, so the ack is the most likely statement to meet a PgBouncer-reaped connection; a lost ack redelivers an already-completed message (duplicate work / double-fired callback). One-shot reused-gate retry; the DELETE is idempotent so it is safe even in the ambiguous post-commit case (unlike send's at-least-once INSERT). A3 — release_orchestration_claim lacked the retry. It is a first-write-after-idle on the failure path whose raise the caller swallows, so an idle-reap left the claim committed and suppressed every redelivery. Wrapped in the existing _run_idempotent_pre_dispatch_write helper (the DELETE is idempotent). A4 — docker-compose PG consumers at the 30s vt default. The callback + both orchestrators + scheduler had no vt env → a task longer than 30s redelivered mid-run. Added vt/health-stale (env-overridable), mirroring the k8s chart: orchestrators 600/660, callback 3660/3720, scheduler 180/240. fileproc/executor left as-is (correctly sized to the executor RPC timeout, EXECUTOR_RESULT_TIMEOUT). L1 — orchestration-entry terminal skip (defense-in-depth). The claim tombstone normally blocks re-orchestrating a COMPLETED execution; if it was GC'd (e.g. a lowered stuck-timeout) a redelivery could re-arm/re-dispatch a finished execution (duplicate destination writes when use_file_history=False). Skip if already terminal, keeping the freshly-won claim so it re-establishes the tombstone. PG-gated; Celery has no redelivery so it is a no-op there. H1 (blocker) — the aggregating callback re-ran webhooks + subscription-usage billing wholesale on redelivery (the send commit-retry double-enqueue, an idle-reaped ack, a vt overrun). Added a terminal re-check that skips the side effects when a prior delivery already finalized the execution. PG-gated via a _pg_transport marker the barrier stamps on the PG callback dispatch (PG_TRANSPORT_CALLBACK_KWARG); the Celery .link path never injects it, so the guard is a strict no-op on Celery. Tests: 1223 unit + real-Postgres integration pass; pre-commit clean. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR review — PG-queue seam-audit (UN-3680)
Reviewed with the PR Review Toolkit (code review, silent-failure, type design, test coverage, comments, simplification). The runtime changes are correct, well PG-gated, and each seam has targeted Celery no-op coverage — the six agents found no blocker/high correctness defect. The inline comments below are hardening / test-quality / accuracy improvements, most Low, one Medium (the duplicate-guard's broad except). None need block merge; the Medium and the two delete() test-parity gaps are the ones most worth addressing.
Verified against the real dispatch sites: the positional-identity map (2,0) matches async_execute_bin's args=[schema, workflow_id, execution_id, hash], the delete() retry faithfully mirrors send(), and the terminal guard keeps the claim correctly.
| return False | ||
| status = (response.data.get("execution") or {}).get("status") | ||
| return status in ExecutionStatus.terminal_values() | ||
| except Exception: |
There was a problem hiding this comment.
[Medium — silent failure] The except Exception: … return False catches every error, so a programming error (e.g. response.data is None → AttributeError, or a future rename of terminal_values()) silently degrades this guard to a permanent no-op — re-enabling the exact double-webhook / double-billing this feature prevents, with only a WARNING that's indistinguishable from a transient "API down".
Fix: narrow the catch to the expected transport/HTTP failure types (so a genuinely unreadable backend still proceeds), and/or log the unexpected case at ERROR so a persistently-disabled guard is visible rather than blending into transient noise.
| # Skip a redelivered callback whose execution a prior delivery already | ||
| # finalized — don't re-fire webhooks / re-count billing. (Returns | ||
| # through the outer finally, which closes the api_client.) | ||
| if is_pg and _pg_execution_already_finalized( |
There was a problem hiding this comment.
[Low — design edge] _pg_execution_already_finalized treats any status in terminal_values() = {COMPLETED, STOPPED, ERROR} as "a prior callback finalized this". But STOPPED/ERROR can be set by other paths (external stop, upstream error) — so the first, legitimate callback delivery for such an execution would also skip its side effects, including _cleanup_execution_resources. The docstring's premise ("the callback is what SETS the terminal status") only strictly holds for COMPLETED.
Consider: gating the skip on COMPLETED-only, or confirm that skipping cleanup/notify for an externally-stopped/errored run is intended (re-firing webhooks on a stopped run is admittedly also undesirable — hence Low).
|
|
||
| # PG at-least-once duplicate guard (see _pg_execution_already_finalized). | ||
| # Popped BEFORE parameter extraction so the marker never flows into the context. | ||
| is_pg = bool(kwargs.pop(PG_TRANSPORT_CALLBACK_KWARG, False)) |
There was a problem hiding this comment.
[Low — test gap] The comment promises the marker is "Popped BEFORE parameter extraction so the marker never flows into the context", but no test asserts this: TestCoreCallbackGuard patches out _extract_callback_parameters entirely. A reorder of the pop (or forgetting it on the _api path at line 1632) would leak _pg_transport into downstream task kwargs and pass CI.
Fix: in test_pg_non_terminal_proceeds, assert PG_TRANSPORT_CALLBACK_KWARG not in _extract_callback_parameters.call_args.kwargs.
| f"(status update, subscription billing, webhooks)." | ||
| ) | ||
| return { | ||
| "status": "skipped_duplicate_callback", |
There was a problem hiding this comment.
[Low — DRY] This skipped_duplicate_callback result dict (+ the redundant duplicate_callback_skipped: True flag) is byte-identical to the one at line 1675, and general/tasks.py:328 builds a sibling skipped_terminal_execution dict — three hand-built contracts with magic-string statuses that can drift. Any monitoring keying on these fields would break on one path only.
Fix: a one-line helper _skipped_duplicate_callback(execution_id) (and/or module/enum constants for the status strings) shared by both callback sites.
| reused = self._conn is not None and self._owns_conn | ||
| try: | ||
| return self._delete_row(msg_id) | ||
| except _CONN_DEAD_ERRORS as exc: |
There was a problem hiding this comment.
[Low — test parity gap] send() has test_reused_conn_non_connection_error_not_retried pinning this narrowed except _CONN_DEAD_ERRORS, but the new TestDeleteReconnectRetry omits it. If someone broadened this to a bare except Exception, a logic bug would be silently retried with no test catching it.
Fix: add a test — reused conn raises RuntimeError("logic") → pytest.raises(RuntimeError), factory.assert_not_called(), sleep.assert_not_called().
|
|
||
| # Callback kwarg the PG barrier stamps on the aggregating callback's dispatch so | ||
| # the callback can PG-gate its at-least-once duplicate guard (see | ||
| # _fire_barrier_callback). Matched by the same literal in callback/tasks.py — a |
There was a problem hiding this comment.
[Low — comment accuracy] This says the kwarg is "Matched by the same literal in callback/tasks.py — a drift is caught by the callback's PG-guard test". But callback/tasks.py:11 does from queue_backend.pg_barrier import PG_TRANSPORT_CALLBACK_KWARG and uses that symbol at both sites — there is no second "_pg_transport" literal anywhere (grep-confirmed), so drift is structurally impossible and the described failure mode can't occur.
Fix: reword to reflect reality, e.g. "Imported (not re-typed) by callback/tasks.py so producer and consumer can't drift. Underscore-prefixed so it can't collide with a real task kwarg."
| # is invisible to every reaper sweep. Without recovering the execution_id here the | ||
| # poison drop can only bare-delete → the execution hangs EXECUTING forever with no | ||
| # handle. ``args[0]`` (org schema) doubles as the org. | ||
| _POSITIONAL_IDENTITY_ARGS: dict[str, tuple[int, int]] = { |
There was a problem hiding this comment.
[Low — invariant not in types] The load-bearing invariant "args[2] is execution_id, args[0] is org schema" lives only as this hand-maintained tuple + comment, unbound to the actual async_execute_bin dispatch site (args=[schema, workflow_id, execution_id, hash]). Verified correct today, but a future reorder of the dispatch args would silently make the poison-drop path mark the wrong execution ERROR (or swap org/exec) with no test or type failure.
Fix (optional hardening): derive the indices from a single shared arg-layout definition (NamedTuple / index enum) imported by both the enqueue site and here, so they can't drift independently.
| return (execution_id, organization_id) | ||
|
|
||
|
|
||
| def _pipeline_identity( |
There was a problem hiding this comment.
[Low — API design] task_name is threaded in as a second, defaulted parameter that duplicates payload["task_name"] (already present in the payload and used elsewhere). The two can diverge, and the None default silently makes the positional fallback inert — an implicit "caller must pass the same task_name that's in the payload" precondition that isn't enforced.
Fix: read payload.get("task_name") inside _positional_identity; if a decoupled override is genuinely wanted, name it explicitly (task_name_override) rather than a silently-defaulting positional. (Prod call site at line 555 does pass it, so no runtime bug today.)
| import pytest | ||
|
|
||
| with pytest.raises(AssertionError, match="side effects must not run"): | ||
| _, finalized, _ = self._run(is_pg=False, terminal=True) |
There was a problem hiding this comment.
[Low — test asserts less than it claims] self._run(...) raises the AssertionError sentinel before returning, so _, finalized, _ = self._run(...) never unpacks and the captured finalized mock is never asserted on — the comment's claim "finalized never called" is unverified. A regression where the Celery path performs the extra _pg_execution_already_finalized round-trip (but still proceeds) would pass.
Fix: patch _pg_execution_already_finalized at test scope, catch the sentinel, then finalized.assert_not_called(). (The test does still catch a wrongful Celery skip — this closes the narrower round-trip gap.)
| # update status, push destination, notify. Without an explicit vt it defaults | ||
| # to 30s and a >30s callback redelivers — re-firing webhooks + subscription | ||
| # billing. 3660/3720 mirrors the k8s chart. | ||
| - WORKER_PG_QUEUE_CONSUMER_VT_SECONDS=${WORKER_PG_CALLBACK_VT_SECONDS:-3660} |
There was a problem hiding this comment.
[Low — comment rot risk] Four comments assert the vt pairs (600/660, 3660/3720, 180/240) "mirror the k8s chart", but there's no Helm chart in this repo (the chart ships in a separate repo per the PR description), so a maintainer here can't cross-check the numbers and they'll silently rot if the chart's values change — with no test guarding the parity.
Fix: name the authoritative source once (external repo/path or values key) and reference it, or soften to "match our k8s deployment defaults" so it doesn't read as a checkable in-repo invariant.
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/consumer.py | Adds _positional_identity fallback for async_execute_bin poison drops; bounds/type-checked before indexing; correctly wired into _pipeline_identity as last-resort. |
| workers/queue_backend/pg_queue/client.py | Adds one-shot reconnect-retry to delete() mirroring the existing send() retry; correctly gated on reused-owned connection only; idempotent DELETE makes the retry safe even post-commit. |
| workers/queue_backend/pg_barrier.py | Wraps release_orchestration_claim in the existing idempotent retry helper; introduces PG_TRANSPORT_CALLBACK_KWARG constant shared between producer and consumer to prevent drift; injects marker on PG callback dispatch without mutating the shared descriptor. |
| workers/callback/tasks.py | Adds H1 duplicate guard to both callback entry points using cached execution_status (no extra round-trip); marker popped before context extraction; guard correctly skips COMPLETED only, leaving ERROR/STOPPED to the first legitimate callback. |
| workers/general/tasks.py | Adds L1 terminal-status guard after winning the orchestration claim; returns without releasing the claim (tombstone re-established); correctly PG-gated so the Celery path is unaffected. |
| docker/docker-compose.yaml | Adds VT/health-stale env vars to four previously unset services (orchestrator-api, orchestrator-general, callback, scheduler); values match k8s chart defaults and are env-overridable. |
| workers/tests/test_pg_callback_duplicate_guard.py | New test file covering H1: predicate unit tests, core callback guard (PG/Celery/status variants), API callback guard; correctly proves marker is popped before extraction and that Celery path is a strict no-op. |
| workers/tests/test_pg_queue_client.py | Adds TestDeleteReconnectRetry covering reused-conn retry, fresh-conn no-retry, injected-conn no-retry, non-connection error propagation, and bounded single-retry semantics. |
| workers/tests/test_pg_queue_consumer.py | Adds positional identity tests (happy path, short args, non-sequence args) and an end-to-end poison-drop test confirming exec-9 is recovered and marked ERROR from positional args. |
| workers/tests/test_pg_barrier.py | Adds TestReleaseOrchestrationClaimRetry (retry on reaped conn, propagation of non-connection errors) and two marker-injection tests for _fire_barrier_callback (PG tags, Celery omits). |
| workers/tests/test_orchestration_idempotency.py | Adds L1 tests: terminal status skips and keeps claim (tombstone re-established), and Celery path reaches past the guard (no regression). |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Backend
participant PGQueue as PG Queue
participant Consumer as PgQueueConsumer
participant Orchestrator as async_execute_bin
participant Barrier as PgBarrier
participant Callback as process_batch_callback
Backend->>PGQueue: "send(async_execute_bin, args=[org, wf, exec_id, files])"
Consumer->>PGQueue: "read(vt=600s)"
alt "Poison drop (read_ct > max_attempts)"
Consumer->>Consumer: _pipeline_identity positional fallback args[2] A1
Consumer->>Backend API: mark_execution_error(exec_id, org)
Consumer->>PGQueue: delete(msg_id) with retry on dead conn A2
else Normal dispatch
Consumer->>Orchestrator: run task in-process
Orchestrator->>Orchestrator: try_claim_orchestration(exec_id)
alt Already terminal L1 guard
Orchestrator-->>Orchestrator: return skipped_terminal_execution claim kept as tombstone
else Proceed
Orchestrator->>Barrier: arm_barrier(exec_id, N_files)
Orchestrator->>PGQueue: dispatch N x process_file_batch
Barrier->>Barrier: decrement on each file completion
Barrier->>PGQueue: "dispatch callback with _pg_transport=True H1 marker"
end
alt Error path
Orchestrator->>Barrier: release_orchestration_claim with retry on dead conn A3
end
Consumer->>PGQueue: delete(msg_id) with retry on dead conn A2
end
Callback->>Callback: pop _pg_transport marker
Callback->>Backend API: get_workflow_execution status check
alt Already COMPLETED H1 guard
Callback-->>Callback: return skipped_duplicate_callback
else Proceed
Callback->>Backend API: update status plus webhooks plus billing
end
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Backend
participant PGQueue as PG Queue
participant Consumer as PgQueueConsumer
participant Orchestrator as async_execute_bin
participant Barrier as PgBarrier
participant Callback as process_batch_callback
Backend->>PGQueue: "send(async_execute_bin, args=[org, wf, exec_id, files])"
Consumer->>PGQueue: "read(vt=600s)"
alt "Poison drop (read_ct > max_attempts)"
Consumer->>Consumer: _pipeline_identity positional fallback args[2] A1
Consumer->>Backend API: mark_execution_error(exec_id, org)
Consumer->>PGQueue: delete(msg_id) with retry on dead conn A2
else Normal dispatch
Consumer->>Orchestrator: run task in-process
Orchestrator->>Orchestrator: try_claim_orchestration(exec_id)
alt Already terminal L1 guard
Orchestrator-->>Orchestrator: return skipped_terminal_execution claim kept as tombstone
else Proceed
Orchestrator->>Barrier: arm_barrier(exec_id, N_files)
Orchestrator->>PGQueue: dispatch N x process_file_batch
Barrier->>Barrier: decrement on each file completion
Barrier->>PGQueue: "dispatch callback with _pg_transport=True H1 marker"
end
alt Error path
Orchestrator->>Barrier: release_orchestration_claim with retry on dead conn A3
end
Consumer->>PGQueue: delete(msg_id) with retry on dead conn A2
end
Callback->>Callback: pop _pg_transport marker
Callback->>Backend API: get_workflow_execution status check
alt Already COMPLETED H1 guard
Callback-->>Callback: return skipped_duplicate_callback
else Proceed
Callback->>Backend API: update status plus webhooks plus billing
end
Reviews (2): Last reviewed commit: "UN-3680 [FIX] Address PR review — Sonar ..." | Re-trigger Greptile
…rdening (COMPLETED-only, reuse status, narrow the catch), test parity SonarCloud (Critical) + PR Review Toolkit + Greptile on #2144. - Sonar (Critical) — _positional_identity could IndexError on a short/non-list args. Now bounds- AND type-checks (isinstance + len <= max(idx)) before ANY indexing, returning (None, None) instead. Also drops the redundant `task_name` param (toolkit): it reads the payload's own `task_name` (can't drift from it, no silently-inert None default). +test for non-sequence args. - Callback duplicate guard (Greptile double-fetch + toolkit Medium + Low): * Greptile — the core guard fetched status a SECOND time though _extract_callback_parameters already fetched the execution. Now stores it on context.execution_status (set once at the both-paths convergence) and the guard reads that — no extra round-trip. Replaces the fetch-helper (and its broad `except Exception` that could silently degrade the guard on a programming error — the Medium) with a pure predicate _callback_already_ran. * Low (design edge) — gate on COMPLETED ONLY, not all terminal_values(): ERROR/STOPPED can be set by other paths (external stop, reaper), so the first legitimate callback for such an execution must still run its side effects. COMPLETED is the unambiguous "a prior callback finalized this". * DRY — shared _skipped_duplicate_callback() result for both callback sites. - pg_barrier comment (Low) — PG_TRANSPORT_CALLBACK_KWARG is IMPORTED by callback/tasks.py, not a re-typed literal; reworded (no drift possible). - compose (Low) — "mirrors the k8s chart" softened (no chart in this repo). - Tests (Low parity gaps): delete() gains non-connection-error-not-retried + retry-reraises-once (send() parity); core guard asserts the marker is popped before extraction and that the Celery path never skips. 1230 unit + integration pass; pre-commit clean. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Review feedback addressed (commit 273e9dd)Thanks — all actionable items fixed. Highlights: SonarCloud (Critical) — Callback duplicate guard — combined Greptile's double-fetch + the Medium + the COMPLETED-only edge into one cleaner shape:
Comments ✅ Test parity (Low) ✅ 1230 unit + real-PG integration pass; pre-commit clean. (Deferred, noted in the PR: the concurrent-redelivery race and the cross-file status-string constants — small follow-ups, not blockers.) |
|
746f943
into
feat/UN-3445-pg-queue-integration



What
Post-merge seam audit of the PG-queue epic surfaced interaction bugs between individually-reviewed features — the class no single-PR review could see. This closes the runtime (OSS) findings before the UN-3675 load-test gate. Every runtime change is PG-gated with a Celery no-op proven by test. The chart-side vt/runbook fixes ship as a separate unstract-cloud PR (repo boundary).
Fixes
async_execute_binwithexecution_idpositionally (args[2]); the poison-drop identity reader only read kwargs. A poisoned orchestration (a circuit-breaker-open drop, before the barrier is armed and before the claim is taken) had no barrier row and no claim row → invisible to every reaper sweep, a permanent silent strand._pipeline_identitynow takes a positional fallback keyed by task name.client.delete) lacked the reconnect-retry every producer-side write already had. The consumer connection idles the whole task wall-clock, so the ack is the likeliest statement to meet a reaped connection; a lost ack redelivers an already-completed message. One-shot reused-gate retry; the DELETE is idempotent (safe even post-commit, unlikesend's at-least-once INSERT).release_orchestration_claimlacked the retry. A first-write-after-idle on the failure path whose raise the caller swallows → an idle-reap left the claim committed and suppressed every redelivery. Wrapped in the existing idempotent retry helper.EXECUTOR_RESULT_TIMEOUT).sendcommit-retry double-enqueue, an idle-reaped ack, a vt overrun). Added a terminal re-check that skips the side effects when a prior delivery already finalized the execution. PG-gated via a_pg_transportmarker the barrier stamps on the PG callback dispatch; the Celery.linkpath never injects it → strict no-op.Celery safety
Every runtime change gates on the PG transport (
is_pg_transport/ the barrier marker) and ships with a Celery-no-op test. The full workers unit lane is untouched.Testing
test_exactly_one_task_fires_the_callback) is the pre-existing UN-3676 shared-lane flaky (passes solo).Notes / follow-ups (filed separately, don't block)
process_file_batchiterates files serially atEXECUTOR_TIMEOUT, and the PG path'stask.apply()doesn't enforce the Celerytime_limitSIGKILL — so a batch can run toward the task time limit. This validates the separate chart PR's fileproc-vt bump.process_file_batchendpoint, CIREQUIRE_PG_TESTS+ real-PG tests for the newest guards, rollback-drain runbook.🤖 Generated with Claude Code