feat(queue_storage): queue_terminal_live_counts + full lifecycle wiring (#290 PR A) #304
…ring (#290 PR A1)

First slice of the #290 live terminal counter work. Adds the new `queue_terminal_live_counts` table and wires the increment side of the bookkeeping. Read switch (`queue_counts_exact`), decrement wiring at terminal-delete paths, prune fold, and benchmarks land in follow-on PRs (A2, B, C). Design decisions are recorded at #290 (comment).

### Schema

`queue_terminal_live_counts` is keyed by `(ready_slot, queue, priority, enqueue_shard)`, the same identity the ring + ADR-025 shards already use. One counter row per group means a skewed batch of N terminal inserts costs at most one UPSERT per touched tuple, not one per row. `fillfactor=50` + tuned autovacuum match the existing hot-counter pattern on `queue_claimer_leases`.

Backfill from existing `done_entries` runs in the same `prepare_schema` transaction that creates the table. On a fresh install the backfill is a no-op; on an upgrade it builds the counters from the current `done_entries` population before any new path reads them. `ON CONFLICT DO NOTHING` keeps the backfill idempotent across re-runs.

### Writes wired

Two surfaces, both increment via `increment_live_terminal_counters_tx`:

- `insert_done_rows_tx`: the canonical helper used by `complete_claimed_batch` (and by retry/cancel/fail terminal paths that hydrate through `into_done_row`). Aggregates the batch by group in memory, then one UPSERT.
- The fused receipt fast path in `complete_receipt_runtime_batch_fast`: adds a `counter_upsert` data-modifying CTE that GROUP BYs the `terminal` CTE's `RETURNING` and UPSERTs in the same statement. Postgres executes data-modifying CTEs unconditionally, so the outer SELECT doesn't need to reference it.

### Decrement helper

`decrement_live_terminal_counters_tx` is defined but `#[allow(dead_code)]` in this PR. PR A2 will wire it at every terminal-delete site (retry-from-terminal, single/bulk DLQ move, discard, SQL compat deletes). Placing the helper now means A2 is a pure wiring change.

### Truncate list

`queue_terminal_live_counts` added to the test-reset TRUNCATE block.

### Invariant test

Two new tests in `queue_storage_runtime_test.rs`:

- `test_queue_terminal_live_counts_matches_done_entries_via_insert_helper` exercises `insert_done_rows_tx` via `complete_batch` and asserts `SUM(live_terminal_count) FROM queue_terminal_live_counts WHERE queue = $1` equals `count(*) FROM done_entries WHERE queue = $1` after one and then two batches.
- `test_queue_terminal_live_counts_matches_done_entries_via_receipt_fast_path` configures `lease_claim_receipts: true` and exercises the fused CTE path, asserting the same invariant.

Both tests deliberately avoid any terminal-delete path because the decrement side hasn't landed yet; the invariant holds on the increment side alone in this PR.

Refs #290. Follow-ups in this series: PR A2 (decrement wiring), PR B (read switch + prune fold), PR C (bench + docs).
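The increment invariant described above can be modelled in memory to show why the tests check it after one and then two batches. This is a minimal sketch, not the SQL in this PR: `GroupKey`, the width assumed for `ready_slot`, and the functions `apply_increments` and `live_total` are hypothetical names introduced for illustration.

```rust
use std::collections::BTreeMap;

// Hypothetical in-memory stand-in for one `queue_terminal_live_counts` key:
// (ready_slot, queue, priority, enqueue_shard). The i32 width is an assumption.
type GroupKey = (i32, String, i16, i16);

/// Model of the increment UPSERT: each delta is added to the existing count
/// rather than overwriting it.
fn apply_increments(counter: &mut BTreeMap<GroupKey, i64>, deltas: &[(GroupKey, i64)]) {
    for (key, delta) in deltas {
        *counter.entry(key.clone()).or_insert(0) += *delta;
    }
}

/// Sum of live counts for one queue, mirroring
/// `SUM(live_terminal_count) ... WHERE queue = $1`.
fn live_total(counter: &BTreeMap<GroupKey, i64>, queue: &str) -> i64 {
    counter
        .iter()
        .filter(|(key, _)| key.1 == queue)
        .map(|(_, count)| *count)
        .sum()
}
```

If `apply_increments` overwrote the count instead of adding to it, a single batch would still satisfy the invariant; only the second batch against the same group would expose the difference.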
📝 Walkthrough
Adds a denormalized per-group counter table.
Changes: Queue Terminal Live Counts Counter
Sequence Diagrams
sequenceDiagram
participant Test as Integration Test / CLI
participant QueueStorage as QueueStorage
participant Done as done_entries
participant Counter as queue_terminal_live_counts
participant DB as Postgres
Test->>QueueStorage: complete_batch / complete_runtime_batch / RebuildTerminalCounters
QueueStorage->>DB: INSERT done_entries (batch or receipt CTE)
DB->>Done: persist rows
QueueStorage->>DB: aggregated UPSERT into queue_terminal_live_counts (same txn or helper)
DB->>Counter: increment live_terminal_count per group
QueueStorage->>DB: delete paths (retry / dlq / discard / prune) -> aggregated decrement
DB->>Counter: decrement live_terminal_count (clamped to >=0)
Test->>DB: SELECT SUM(live_terminal_count), SELECT COUNT(*) FROM done_entries
Test->>Test: assert equality or run rebuild -> re-aggregate from done_entries
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: db7314eabf
let mut priorities: Vec<i16> = Vec::with_capacity(by_group.len());
let mut enqueue_shards: Vec<i16> = Vec::with_capacity(by_group.len());
let mut deltas: Vec<i64> = Vec::with_capacity(by_group.len());
for ((slot, queue, prio, shard), delta) in by_group {
Sort counter upserts before locking rows
When a completion batch spans more than one counter key, this iterates a HashMap, so the rows passed to the ON CONFLICT DO UPDATE are in arbitrary order. PostgreSQL acquires the conflicting counter-row locks as it processes the input, so two concurrent completions touching the same keys can lock them in opposite orders and deadlock; the receipt fast path has the same issue because its GROUP BY source is unordered. Please sort the grouped counter keys before binding/upserting, and add an ORDER BY to the CTE path, so all writers acquire these hot-counter locks consistently.
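The ordering fix suggested here can be sketched as follows. This is an illustration under assumptions, not the helper in the PR: `GroupKey`, the `i32` width for `ready_slot`, and `aggregate_sorted` are hypothetical. A `BTreeMap` iterates in ascending key order, so every writer that builds its UPSERT input this way presents the counter keys in the same sequence.

```rust
use std::collections::BTreeMap;

// Hypothetical counter identity: (ready_slot, queue, priority, enqueue_shard).
// The i32 width for ready_slot is an assumption.
type GroupKey = (i32, String, i16, i16);

/// Aggregate one delta per group and return the groups in ascending key
/// order, so concurrent writers touch counter rows in a consistent order.
fn aggregate_sorted(rows: &[GroupKey]) -> Vec<(GroupKey, i64)> {
    let mut by_group: BTreeMap<GroupKey, i64> = BTreeMap::new();
    for key in rows {
        *by_group.entry(key.clone()).or_insert(0) += 1;
    }
    // BTreeMap iteration is sorted by key; HashMap iteration is not.
    by_group.into_iter().collect()
}
```

Sorting only helps if every writer that locks these rows uses the same order, which is why the review asks for an `ORDER BY` on the CTE path as well.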
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@awa-model/src/queue_storage.rs`:
- Around line 4996-5005: The insert-side increment implemented in
increment_live_terminal_counters_tx will drift because several deletion/prune
paths (retry_job_tx, move_failed_to_dlq, bulk_move_failed_to_dlq,
discard_failed_by_kind, prune_oldest) remove terminal rows without updating the
denormalised queue_terminal_live_counts; update each of these code paths to
decrement or adjust queue_terminal_live_counts consistently when they remove
terminal done_entries (or batch-adjust where appropriate), ensuring the same
grouping key logic (ready_slot, queue, priority, enqueue_shard) used by
increment_live_terminal_counters_tx is applied so UPSERTs/aggregates keep counts
in sync with deletes.
- Around line 3419-3448: The backfill INSERT into queue_terminal_live_counts
using ON CONFLICT DO NOTHING can leave counts permanently low during rolling
upgrades because older writers may add done_entries after this single snapshot;
change the INSERT conflict handling to upsert by adding the incoming count to
the existing live_terminal_count (e.g. ON CONFLICT (ready_slot, queue, priority,
enqueue_shard) DO UPDATE SET live_terminal_count =
queue_terminal_live_counts.live_terminal_count + EXCLUDED.live_terminal_count)
so prepare_schema() (the code block using install_tx and the INSERT from
done_entries) reconciles missed groups instead of ignoring them.
In `@awa/tests/queue_storage_runtime_test.rs`:
- Around line 4832-4857: The test enables lease_claim_receipts but never asserts
the fast-path was actually used; before calling complete_runtime_batch add an
assertion that the Claim/ClaimedBatch returned by claim_runtime_batch is
receipt-backed (e.g. check the receipt flag or variant on the claimed value) and
also assert that no lease row was materialized (e.g. verify lease is None or
that lease_id/lease_timestamp fields are unset) so the test fails if
complete_runtime_batch falls back to the generic insert path; locate these
checks around the returned value from claim_runtime_batch and before invoking
complete_runtime_batch (functions: create_store_with_config,
claim_runtime_batch, complete_runtime_batch, and any receipt/lease fields or
helper like receipt_fast_complete_candidate).
📒 Files selected for processing (2)
- awa-model/src/queue_storage.rs
- awa/tests/queue_storage_runtime_test.rs
#290)

Addresses three reviewer findings on the original PR A1:

**High: stale counter state risk between A1 and A2/B**

The original split (A1 = inserts only, A2 = deletes later) would have persisted counter state that drifts the moment any normal delete or prune ran. Folding the full lifecycle into this PR keeps the counter in lockstep with `done_entries` at all times. Reviewer's exact words: "I'd either wire decrement/prune-fold in this PR, defer writing counters until the full lifecycle is wired, or make the read-switch PR reset/rebuild from done_entries under a clear upgrade-safe boundary." This commit goes with option 1.

Decrement wired at the 5 terminal-removal surfaces:

- `retry_job_tx`: retry-from-terminal (`Some(DoneJobRow)` after the `DELETE FROM done_entries`).
- `move_failed_to_dlq`: single DLQ move (`Some(DoneJobRow)`).
- `bulk_move_failed_to_dlq`: bulk DLQ move (`Vec<DoneJobRow>`).
- `discard_failed_by_kind`: discard (`Vec<DoneJobRow>`).
- `prune_oldest`: folds `SUM(live_terminal_count) WHERE ready_slot = $slot` into `queue_terminal_rollups.pruned_completed_count` via the existing `adjust_terminal_rollups_batch` helper, then `DELETE FROM queue_terminal_live_counts WHERE ready_slot = $slot`, all in the same transaction as the partition `TRUNCATE`. The fold now reads from the counter rather than re-scanning `{done_child}`, which also removes one O(rows-in-slot) scan from the prune path.

SQL `DELETE FROM awa.jobs` compat layer was checked: there are no triggers/functions routing it into `done_entries`, so nothing else to wire on that side.

**Medium: counter upserts can deadlock under multi-key batches**

The Rust helper iterated a `HashMap` (non-deterministic key order) and the fused CTE's `GROUP BY` had no `ORDER BY`. Two concurrent batches upserting the same set of counter keys in different orders would acquire row locks in opposite order and deadlock on the `ON CONFLICT DO UPDATE`.

Fixed by sorting in both places: helper now uses `BTreeMap` for deterministic iteration; the fused CTE wraps its `GROUP BY` in a subquery with `ORDER BY ready_slot, queue, priority, enqueue_shard` before the `ON CONFLICT`. The decrement helper gets the same treatment.

**Low: receipt fast-path test could pass without exercising the fast path**

The original test enabled `lease_claim_receipts: true` and called `complete_runtime_batch`, but didn't assert that the claimed entries were actually receipt-backed. If `receipt_fast_complete_candidate`'s eligibility ever tightened, the test would silently fall through to the non-fused helper and the new CTE `counter_upsert` stage would go untested. Test now asserts on every claimed entry:

- `lease_claim_receipt == true`
- the full `receipt_fast_complete_candidate` predicate (`unique_key.is_none() && tags.empty() && errors.empty()`)

Two additional invariant tests:

- `test_queue_terminal_live_counts_decrement_on_terminal_delete_paths`: seeds terminal rows directly, then exercises `move_failed_to_dlq`, `bulk_move_failed_to_dlq`, `discard_failed_by_kind`, and `retry_job`, asserting `SUM(live_terminal_count) == count(*) FROM done_entries` after each.
- `test_queue_terminal_live_counts_prune_folds_into_rollups`: completes 5 rows, rotates, prunes; asserts `done_entries` empty, live counter empty for that slot, and `queue_terminal_rollups` absorbed the 5.

`decrement_live_terminal_counters_tx` is no longer `#[allow(dead_code)]`. Read-switch in `queue_counts_exact` + `QueueCounts.completed → .terminal` rename still come in a follow-on PR.
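The decrement side wired at the terminal-removal surfaces can be modelled the same way. The sketch below assumes the clamp-at-zero behaviour shown in the walkthrough diagram ("clamped to >=0"); `GroupKey`, the `i32` width for `ready_slot`, and `decrement_clamped` are hypothetical names, and the real helper works against Postgres rather than an in-memory map.

```rust
use std::collections::BTreeMap;

// Hypothetical counter identity: (ready_slot, queue, priority, enqueue_shard).
// The i32 width for ready_slot is an assumption.
type GroupKey = (i32, String, i16, i16);

/// Subtract `removed` terminal rows from one group's live count, clamping at
/// zero so a drifted counter can never go negative. Returns the new count.
fn decrement_clamped(counter: &mut BTreeMap<GroupKey, i64>, key: &GroupKey, removed: i64) -> i64 {
    let count = counter.entry(key.clone()).or_insert(0);
    *count = (*count - removed).max(0);
    *count
}
```

Clamping hides over-decrement rather than surfacing it, so in this model the invariant tests, not the clamp, are what detect a missed increment.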
…d CLI (#290)

Addresses two High findings on the prior #304 push:

**High 1: SQL compat deletes bypassed the counter**

`DELETE FROM awa.jobs WHERE id = $1` routes to `awa.delete_job_compat()`. The `done_entries` branch deleted directly from the table without decrementing `queue_terminal_live_counts`. Reviewer reproduced on v021 head: `DELETE FROM awa.jobs WHERE id = <terminal>` dropped `done_entries` from 5 → 4 while the live counter stayed at 5. Once `queue_counts_exact` (#305) reads the counter, the drift becomes operator-visible; before that, `prune_oldest` folding from the counter could bake the drift into `queue_terminal_rollups` permanently.

Fixed via a new v022 migration that reshapes the `done_entries` branch of `delete_job_compat` to return `ready_slot` + `enqueue_shard` alongside the existing columns and decrement the counter for the deleted row in the same statement.

**High 2: backfill not rollout-safe → prune folded from drifted counter**

The `prepare_schema` backfill uses `ON CONFLICT DO NOTHING`, so if older binaries insert terminal rows after the first new `prepare_schema` snapshot, later prepares won't reconcile existing counter groups. With #304's prune-fold-from-counter, missed rows would be folded as zero and disappear from completed counts after prune: a permanent rollup undercount.

The reviewer's preferred shape was "an explicit upgrade/rebuild boundary or a reconciliation path under write quiescence". This commit chooses the reconciliation path:

1. Revert prune fold to scan-based (the original safe path). `queue_terminal_rollups` is *permanent* state, so it MUST be folded from ground truth (`count(*) FROM {done_child}`), not from the potentially-drifted counter. The counter rows for the slot are still cleaned up after the partition truncate, so the counter table doesn't accumulate orphans.
2. Reads from the counter (`queue_counts_exact` in #305) may transiently disagree with the rollup until the operator runs the rebuild command below. That's acceptable: the visible-but-stale number is recoverable, vs. silent rollup corruption which is not.
3. New `QueueStorage::rebuild_terminal_counters` + CLI subcommand `awa storage rebuild-terminal-counters`: TRUNCATEs the counter table and re-aggregates from `done_entries` under an advisory lock scoped to the queue-storage schema. Operators run this after upgrading from a pre-#290 fleet, after any incident that may have left the counter inconsistent, or as routine drift-recovery before relying on counter-fed reads for billing-grade accuracy.

### Tests

Two new tests:

- `test_queue_terminal_live_counts_decrement_on_sql_compat_delete` exercises `SELECT awa.delete_job_compat(<id>)` after seeding 5 terminal rows in a test-only schema (the test temporarily points `awa.storage_transition_state` at its schema for the call, then restores; protected by `QUEUE_STORAGE_RUNTIME_LOCK` against interleaving). Asserts the invariant after the SQL-compat delete.
- `test_queue_terminal_live_counts_rebuild_restores_invariant` seeds 7 terminal rows, manually poisons the counter to simulate rollover drift (999 vs 7), runs rebuild, asserts the invariant is back.

All 6 #290 invariant tests pass (the 4 from prior commits plus these 2).

### Refs

Refs #290. With these fixes + #305's read switch, the headline regressor in #169's long_horizon evidence (the O(done_entries) scan inside `queue_counts_exact`) is gone, and the counter has both a SQL-layer maintenance path and an operator-driven recovery mechanism. PRs left in the series: bench (PR C) and docs (PR D).
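The rebuild described in item 3 amounts to discarding the counter and re-deriving it from ground truth. A minimal in-memory sketch follows, mirroring the "999 vs 7" scenario from the rebuild test; `GroupKey`, the `i32` width for `ready_slot`, and `rebuild_counter` are hypothetical, and the advisory lock the real command takes is not modelled.

```rust
use std::collections::BTreeMap;

// Hypothetical counter identity: (ready_slot, queue, priority, enqueue_shard).
// The i32 width for ready_slot is an assumption.
type GroupKey = (i32, String, i16, i16);

/// Throw away whatever the counter currently holds and re-aggregate it from
/// the terminal rows themselves (one `GroupKey` per row in `done_entries`).
fn rebuild_counter(counter: &mut BTreeMap<GroupKey, i64>, done_entries: &[GroupKey]) {
    counter.clear(); // stands in for TRUNCATE on the counter table
    for key in done_entries {
        *counter.entry(key.clone()).or_insert(0) += 1;
    }
}
```

Because the rebuild starts from an empty counter, it also drops groups that no longer have any terminal rows, which an additive reconciliation would leave behind.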
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
awa-model/src/queue_storage.rs (1)
3973-3989: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reset still leaves stale queue head state behind.

`reset()` now clears `queue_terminal_live_counts`, but it still keeps `queue_enqueue_heads` and `queue_claim_heads`. Because availability is derived from those tables, a reset can preserve phantom backlog and stale lane sequence state even though `ready_entries`/`done_entries` were emptied. Please truncate both head tables here as well so reset returns the queue store to a truly empty state.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@awa-model/src/queue_storage.rs` around lines 3973 - 3989, The reset() SQL TRUNCATE list currently clears many tables but omits the head tables; update the TRUNCATE statement in queue_storage.rs (the reset() implementation) to also include {schema}.queue_enqueue_heads and {schema}.queue_claim_heads so that resetting truncates both queue_enqueue_heads and queue_claim_heads along with the other tables, ensuring availability/queue head state is fully cleared.
♻️ Duplicate comments (1)
awa-model/src/queue_storage.rs (1)
3419-3443: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Serialize reconciliation with legacy `done_entries` writers.

These two code paths still rebuild the counter from a one-time `done_entries` snapshot without coordinating with pre-counter binaries that only write `done_entries`. `prepare_schema()` will miss rows committed after the backfill starts, and `rebuild_terminal_counters()` has the same hole because none of the write paths take the rebuild advisory lock. That leaves `queue_terminal_live_counts` permanently low until a later quiesced rebuild, so the “reconcile after upgrades/drift” contract is not actually guaranteed under a rolling deployment.

Also applies to: 11533-11586

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@awa-model/src/queue_storage.rs` around lines 3419 - 3443, prepare_schema() and rebuild_terminal_counters() perform a backfill from done_entries into queue_terminal_live_counts without taking the same rebuild advisory lock used elsewhere, allowing concurrent legacy writers to commit new done_entries that are missed; fix this by acquiring the same rebuild/advisory lock used by the counter rebuild code before running the INSERT backfill (and release after), and ensure the same advisory lock is taken in rebuild_terminal_counters() so both paths serialize with legacy done_entries writers; reference the INSERT block that writes into queue_terminal_live_counts, the prepare_schema() function, and rebuild_terminal_counters() when adding calls to the existing advisory lock helper (or implement a small acquire_rebuild_lock()/release_rebuild_lock() wrapper) so the backfill is atomic relative to done_entries writers.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@awa/tests/queue_storage_runtime_test.rs`:
- Around line 5147-5217: The test manually overrides
awa.storage_transition_state after calling create_store which is redundant and
also fails to snapshot/restore prepared_engine (it selects/sets only
current_engine, details, state), so either remove the entire manual
UPDATE/restore block and rely on create_store/activate_queue_storage_transition,
or if you must keep it, change the prior_state to capture prepared_engine as
well (select current_engine, details, state, prepared_engine into prior_state),
avoid setting prepared_engine = NULL in the intermediate UPDATE, and restore all
four fields when resetting awa.storage_transition_state so no cross-test state
is leaked; update references around create_store, the SQL UPDATE/SELECT queries,
and the restore bind calls accordingly.
- Around line 4933-4938: The test currently asserts moved >= 2 after calling
store.bulk_move_failed_to_dlq(...), which masks regressions where the single-row
path (move_failed_to_dlq) didn't run; change the assertion to require exactly 2
rows moved (e.g. assert_eq!(moved, 2, "...")) so the test proves one row was
moved earlier and the bulk step removed exactly the remaining two; update the
assertion near the variable moved returned from bulk_move_failed_to_dlq and keep
the surrounding invariant check call to ensure behavior is still validated.
📒 Files selected for processing (5)
- awa-cli/src/main.rs
- awa-model/migrations/v022_delete_compat_terminal_counter.sql
- awa-model/src/migrations.rs
- awa-model/src/queue_storage.rs
- awa/tests/queue_storage_runtime_test.rs
…ion + doc links (#290)

Addresses three reviewer findings on the previous push plus two collateral doc cleanups.

**High: v022 can break SQL compat deletes during the migration/startup window**

`queue_terminal_live_counts` is created lazily by `QueueStorage::prepare_schema()` on first runtime boot, not by migrations. Between `awa migrate` running and the new runtime preparing the active queue-storage schema, a `DELETE FROM awa.jobs` that landed on a terminal row would have called `awa.delete_job_compat()` and hit "relation does not exist". Guard the counter UPDATE in v022 with `IF to_regclass(format('%I.queue_terminal_live_counts', v_schema)) IS NOT NULL` so the DELETE silently degrades to "drift, then rebuild" rather than failing. Operators recover with `awa storage rebuild-terminal-counters` once the counter catches up.

**Low: SQL compat test mutated awa.storage_transition_state redundantly and restored only part of it**

`create_store()` already activates the test schema via the storage transition dance (prepare → enter_mixed_transition → finalize), so `awa.active_queue_storage_schema()` returns the test schema for the `delete_job_compat` call. The previous version's snapshot/override/restore block was redundant, and the restore only put back three of the four columns it mutated (`prepared_engine` was nulled but never restored). Removed the entire block; the test is now ~30 lines shorter.

**Low: delete-path test loosened the bulk move assertion**

Changed `assert!(moved >= 2, ...)` to `assert_eq!(moved, 2, ...)` so a silent no-op on the single-move step in part 1 doesn't get masked by part 2 picking up all 3 seeded `failed` rows.

**Collateral: broken ADR-027 links**

`docs/http-callbacks.md` (2 sites) and `docs/callback-receivers.md` (1 site) pointed at `027-callback-ingress.md`; the actual filename is `027-callback-ingress-surface.md`. Fixed all three.

**Collateral: ADR-029 contradiction about callback-only ingress follow-up dispatch**

The "Relationship to other ADRs" section said callback-only ingress "dispatches follow-ups best-effort after the resolution commits; no in-process handler registry is required for delivery." That contradicted the Open Extensions section, which correctly says callback ingress outside the worker `Client` has no follow-up dispatch yet: the worker `Client::*_external` paths drive it via `admin::*_external_in_tx`, and the callback-only ingress process will need its own registry hookup to reuse that path. Rewrote the ADR-027 cross-reference to match the implementation reality and point at the Open Extensions section.

### Tests

All 6 #290 invariant tests still pass after the simplification.
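The `to_regclass` guard from the first finding in this commit message can be pictured as "decrement only if the counter table exists". The sketch below models the not-yet-created table as `None`; it is an assumption-based illustration in Rust, not the PL/pgSQL in v022, and `GroupKey`, the `i32` width for `ready_slot`, and `decrement_if_table_exists` are hypothetical names.

```rust
use std::collections::BTreeMap;

// Hypothetical counter identity: (ready_slot, queue, priority, enqueue_shard).
// The i32 width for ready_slot is an assumption.
type GroupKey = (i32, String, i16, i16);

/// Decrement one group's live count if the counter table exists.
/// `None` models the window before `prepare_schema()` has created the table:
/// the delete still succeeds and the counter is left to a later rebuild.
/// Returns whether a counter table was available to update.
fn decrement_if_table_exists(
    counter: Option<&mut BTreeMap<GroupKey, i64>>,
    key: &GroupKey,
) -> bool {
    match counter {
        Some(table) => {
            if let Some(count) = table.get_mut(key) {
                *count = (*count - 1).max(0);
            }
            true
        }
        None => false,
    }
}
```

The `false` branch is the "drift, then rebuild" case: nothing fails, but the counter has to be reconciled afterwards.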
Summary

First substantive PR for #290. Lands the new `queue_terminal_live_counts` table with the complete write-side lifecycle (insert / delete / prune-fold) and a tight invariant test on every path. The read switch in `queue_counts_exact` and the `QueueCounts.completed → .terminal` rename land in a follow-on PR; the bench and docs in another.

Design recorded on the issue: #290 (comment)

Scope evolved from the original PR A1 review

The original PR A1 (schema + inserts only, decrements + prune deferred) was withdrawn after reviewer feedback that splitting writes from deletes would persist stale counter state between PRs: any normal DLQ move/discard/retry-from-terminal/prune between the merges would drift the counter. This PR keeps the full lifecycle atomic so the counter never reaches a state the read-switch PR would need to rebuild.

What ships

Schema

- `queue_terminal_live_counts` keyed by `(ready_slot, queue, priority, enqueue_shard)`, the same identity the ring + ADR-025 shards already use.
- `fillfactor=50` + tuned autovacuum, matching the `queue_claimer_leases` hot-counter pattern.
- Backfill from `done_entries` during `prepare_schema`. Fresh installs no-op; upgrades populate before any new path reads. `ON CONFLICT DO NOTHING` keeps it idempotent.

Increments

- `insert_done_rows_tx` aggregates the batch by group in Rust (`BTreeMap`, sorted) then one UPSERT via `increment_live_terminal_counters_tx`.
- `complete_receipt_runtime_batch_fast` adds a `counter_upsert` data-modifying CTE that `GROUP BY`s the `terminal` CTE and UPSERTs in the same statement, with `ORDER BY ready_slot, queue, priority, enqueue_shard` before the `ON CONFLICT`.

Decrements wired at the 5 terminal-removal surfaces

- `retry_job_tx` (retry-from-terminal)
- `move_failed_to_dlq` (single DLQ move)
- `bulk_move_failed_to_dlq` (bulk DLQ move)
- `discard_failed_by_kind` (discard)
- `prune_oldest` (folds `SUM(live_terminal_count) WHERE ready_slot = $slot` into `queue_terminal_rollups`, then DELETEs the slot's counter rows in the same tx as partition TRUNCATE; no scan of `done_entries` needed)

The SQL compat layer was checked: no triggers/functions route `DELETE FROM awa.jobs` into `done_entries`, so nothing else to wire.

Deadlock safety

The original counter upserts iterated `HashMap` (non-deterministic key order) and the fused CTE's `GROUP BY` had no `ORDER BY`. Two concurrent batches upserting overlapping keys could acquire row locks in opposite order and deadlock on `ON CONFLICT DO UPDATE`. Fixed by sorting:

- The helper uses `BTreeMap` for deterministic iteration.
- The fused CTE wraps its `GROUP BY` in a subquery with `ORDER BY ready_slot, queue, priority, enqueue_shard` before `ON CONFLICT`.

Truncate list

`queue_terminal_live_counts` added to the test-reset TRUNCATE block.

Invariant tests (4 total, all green)

- `test_queue_terminal_live_counts_matches_done_entries_via_insert_helper`: `insert_done_rows_tx` path. Asserts the invariant after one and two batches (catches `SET = EXCLUDED` vs `SET += EXCLUDED` regressions).
- `test_queue_terminal_live_counts_matches_done_entries_via_receipt_fast_path`: drives the fused CTE path. Now asserts on every claimed entry that `lease_claim_receipt = true` AND the full `receipt_fast_complete_candidate` predicate holds (`unique_key.is_none() && tags.empty() && errors.empty()`), so the test cannot silently fall through to the non-fused helper if eligibility ever tightens.
- `test_queue_terminal_live_counts_decrement_on_terminal_delete_paths`: exercises `move_failed_to_dlq` / `bulk_move_failed_to_dlq` / `discard_failed_by_kind` / `retry_job` against directly-seeded terminal rows, asserting `SUM(live_terminal_count) == count(*) FROM done_entries` after each.
- `test_queue_terminal_live_counts_prune_folds_into_rollups`: completes 5 rows, rotates, prunes; asserts `done_entries` empty, live counter empty for that slot, and `queue_terminal_rollups` absorbed the 5.

Refs

`QueueCounts.completed → .terminal` rename + MAPPING.md flip land in the next PR; single-shard worst-case bench in the one after that. None of those have any harder write-side correctness work since the counter is already authoritative.

Test plan
- `cargo fmt --all`
- `cargo clippy --workspace --all-targets --all-features -- -D warnings`
- `cargo build --workspace`
- `cargo test -p awa --test queue_storage_runtime_test test_queue_terminal_live_counts` against Postgres: all 4 tests pass.