diff --git a/awa-cli/src/main.rs b/awa-cli/src/main.rs index 9f856492..937f8f09 100644 --- a/awa-cli/src/main.rs +++ b/awa-cli/src/main.rs @@ -327,6 +327,14 @@ enum StorageCommands { #[arg(long, value_name = "DURATION", num_args = 0..=1, default_missing_value = "")] wait: Option, }, + /// Rebuild `queue_terminal_live_counts` from `done_entries`. + /// + /// Use this after upgrading from a pre-#290 fleet, after any incident + /// that may have left the counter inconsistent with `done_entries`, + /// or as a routine drift-recovery step before relying on + /// counter-fed reads for billing-grade accuracy. Wraps the rebuild + /// in an advisory lock; best run on a quiesced fleet. + RebuildTerminalCounters, } #[derive(Subcommand)] @@ -940,6 +948,23 @@ async fn main() -> Result<(), Box> { println!("{}", serde_json::to_string_pretty(&report)?); } } + StorageCommands::RebuildTerminalCounters => { + // Resolve the live queue-storage schema from the + // transition state; no point letting the operator + // pass it as a flag and risk targeting an inactive + // engine. + let schema = awa_model::QueueStorage::active_schema(&pool) + .await? 
+ .ok_or_else(|| { + "no active queue-storage schema; nothing to rebuild".to_string() + })?; + let store = awa_model::QueueStorage::from_existing_schema(&schema)?; + let inserted = store.rebuild_terminal_counters(&pool).await?; + eprintln!( + "rebuilt queue_terminal_live_counts in schema '{schema}': \ + {inserted} counter row(s) populated from done_entries" + ); + } }, Commands::Queue { command } => match command { diff --git a/awa-model/migrations/v022_delete_compat_terminal_counter.sql b/awa-model/migrations/v022_delete_compat_terminal_counter.sql new file mode 100644 index 00000000..403ccfbe --- /dev/null +++ b/awa-model/migrations/v022_delete_compat_terminal_counter.sql @@ -0,0 +1,220 @@ +-- #290: the SQL compat layer's DELETE FROM awa.jobs WHERE id = $1 routes +-- to awa.delete_job_compat(p_id), which can land on `done_entries` for +-- terminal rows. The function previously deleted from done_entries +-- without touching `queue_terminal_live_counts`, so terminal-row deletes +-- via the SQL compat path drifted the counter from the underlying table. +-- +-- Reproduced on the v021 head: `DELETE FROM awa.jobs WHERE id = $1` +-- dropped done_entries from 5 → 4 while the live counter stayed at 5. +-- Once `queue_counts_exact` reads the counter (see #305) the drift becomes +-- operator-visible; before that, prune_oldest folding from the counter +-- could bake the drift into `queue_terminal_rollups` permanently. +-- +-- This migration reshapes the done_entries branch of delete_job_compat to +-- also return ready_slot + enqueue_shard, and decrement the counter for +-- the deleted row inside the same function call. Other branches +-- (ready_entries / deferred_jobs / leases / dlq_entries) are unchanged — +-- the counter is keyed on done_entries only. 
+ +CREATE OR REPLACE FUNCTION awa.delete_job_compat(p_id BIGINT) +RETURNS BOOLEAN AS $$ +DECLARE + v_schema TEXT; + v_queue TEXT; + v_priority SMALLINT; + v_lane_seq BIGINT; + v_enqueue_shard SMALLINT; + v_state awa.job_state; + v_unique_key BYTEA; + v_unique_states TEXT; + v_ready_slot INT; + v_rows INT; +BEGIN + v_schema := awa.active_queue_storage_schema(); + + IF v_schema IS NULL THEN + RAISE EXCEPTION 'queue storage is not active' + USING ERRCODE = '55000'; + END IF; + + EXECUTE format( + 'DELETE FROM %I.ready_entries + WHERE job_id = $1 + RETURNING queue, priority, lane_seq, enqueue_shard, + ''available''::awa.job_state, unique_key, unique_states', + v_schema + ) + INTO v_queue, v_priority, v_lane_seq, v_enqueue_shard, + v_state, v_unique_key, v_unique_states + USING p_id; + GET DIAGNOSTICS v_rows = ROW_COUNT; + + IF v_rows > 0 THEN + EXECUTE format( + 'UPDATE %I.queue_claim_heads + SET claim_seq = claim_seq + 1 + WHERE queue = $1 + AND priority = $2 + AND enqueue_shard = $3 + AND claim_seq = $4', + v_schema + ) + USING v_queue, v_priority, v_enqueue_shard, v_lane_seq; + PERFORM awa.release_queue_storage_unique_claim( + p_id, + v_unique_key, + v_unique_states, + v_state + ); + RETURN TRUE; + END IF; + + EXECUTE format( + 'DELETE FROM %I.deferred_jobs + WHERE job_id = $1 + RETURNING queue, priority, state, unique_key, unique_states', + v_schema + ) + INTO v_queue, v_priority, v_state, v_unique_key, v_unique_states + USING p_id; + GET DIAGNOSTICS v_rows = ROW_COUNT; + + IF v_rows > 0 THEN + PERFORM awa.release_queue_storage_unique_claim( + p_id, + v_unique_key, + v_unique_states, + v_state + ); + RETURN TRUE; + END IF; + + EXECUTE format( + 'WITH deleted AS ( + DELETE FROM %1$I.leases AS leases + WHERE job_id = $1 + RETURNING + leases.ready_slot, + leases.ready_generation, + leases.job_id, + leases.queue, + leases.priority, + leases.enqueue_shard, + leases.lane_seq, + leases.run_lease, + leases.state + ), + deleted_attempt AS ( + DELETE FROM %1$I.attempt_state 
AS attempt + USING deleted + WHERE attempt.job_id = deleted.job_id + AND attempt.run_lease = deleted.run_lease + ) + SELECT + deleted.queue, + deleted.priority, + deleted.state, + ready.unique_key, + ready.unique_states + FROM deleted + JOIN %1$I.ready_entries AS ready + ON ready.ready_slot = deleted.ready_slot + AND ready.ready_generation = deleted.ready_generation + AND ready.queue = deleted.queue + AND ready.priority = deleted.priority + AND ready.enqueue_shard = deleted.enqueue_shard + AND ready.lane_seq = deleted.lane_seq + AND ready.job_id = deleted.job_id', + v_schema + ) + INTO v_queue, v_priority, v_state, v_unique_key, v_unique_states + USING p_id; + GET DIAGNOSTICS v_rows = ROW_COUNT; + + IF v_rows > 0 THEN + PERFORM awa.release_queue_storage_unique_claim( + p_id, + v_unique_key, + v_unique_states, + v_state + ); + RETURN TRUE; + END IF; + + -- #290: done_entries branch now returns ready_slot + enqueue_shard + -- and decrements queue_terminal_live_counts for the deleted row. + -- This is the SQL-side mirror of the Rust delete-path wiring + -- (move_failed_to_dlq, bulk_move_failed_to_dlq, discard_failed_by_kind, + -- retry_job_tx). Without it, DELETE FROM awa.jobs WHERE id = $1 + -- drifts the counter from done_entries. + EXECUTE format( + 'DELETE FROM %I.done_entries + WHERE job_id = $1 + RETURNING queue, priority, state, unique_key, unique_states, + ready_slot, enqueue_shard', + v_schema + ) + INTO v_queue, v_priority, v_state, v_unique_key, v_unique_states, + v_ready_slot, v_enqueue_shard + USING p_id; + GET DIAGNOSTICS v_rows = ROW_COUNT; + + IF v_rows > 0 THEN + -- The counter table is created lazily by + -- QueueStorage::prepare_schema() on first runtime boot, which + -- can happen AFTER migrations have run (migrations don't + -- prepare the per-schema queue-storage tables). 
Guard the + -- decrement with `to_regclass` so a DELETE FROM awa.jobs that + -- arrives during the boot window degrades to "drift, then + -- rebuild" rather than "relation does not exist". Operators + -- recover with `awa storage rebuild-terminal-counters` once the + -- counter table catches up. + IF to_regclass(format('%I.queue_terminal_live_counts', v_schema)) IS NOT NULL THEN + EXECUTE format( + 'UPDATE %I.queue_terminal_live_counts AS counts + SET live_terminal_count = GREATEST(0, counts.live_terminal_count - 1) + WHERE counts.ready_slot = $1 + AND counts.queue = $2 + AND counts.priority = $3 + AND counts.enqueue_shard = $4', + v_schema + ) + USING v_ready_slot, v_queue, v_priority, v_enqueue_shard; + END IF; + PERFORM awa.release_queue_storage_unique_claim( + p_id, + v_unique_key, + v_unique_states, + v_state + ); + RETURN TRUE; + END IF; + + EXECUTE format( + 'DELETE FROM %I.dlq_entries + WHERE job_id = $1 + RETURNING queue, priority, state, unique_key, unique_states', + v_schema + ) + INTO v_queue, v_priority, v_state, v_unique_key, v_unique_states + USING p_id; + GET DIAGNOSTICS v_rows = ROW_COUNT; + + IF v_rows > 0 THEN + PERFORM awa.release_queue_storage_unique_claim( + p_id, + v_unique_key, + v_unique_states, + v_state + ); + RETURN TRUE; + END IF; + + RETURN FALSE; +END; +$$ LANGUAGE plpgsql +SET search_path = pg_catalog, awa, public; + +INSERT INTO awa.schema_version (version, description) +VALUES (22, 'delete_job_compat decrements queue_terminal_live_counts for done_entries deletes (#290)') +ON CONFLICT (version) DO NOTHING; diff --git a/awa-model/src/migrations.rs b/awa-model/src/migrations.rs index 625acf95..7d5800e1 100644 --- a/awa-model/src/migrations.rs +++ b/awa-model/src/migrations.rs @@ -4,7 +4,7 @@ use sqlx::PgPool; use tracing::info; /// Current schema version. -pub const CURRENT_VERSION: i32 = 21; +pub const CURRENT_VERSION: i32 = 22; /// All migrations in order. 
SQL lives in `awa-model/migrations/*.sql` /// for easy inspection by users who run their own migration tooling. @@ -93,6 +93,11 @@ const MIGRATIONS: &[(i32, &str, &[&str])] = &[ "Shard-aware lane indexes on ready_entries/done_entries/leases", &[V21_UP], ), + ( + 22, + "delete_job_compat decrements queue_terminal_live_counts for done_entries deletes", + &[V22_UP], + ), ]; const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql"); @@ -115,6 +120,7 @@ const V18_UP: &str = include_str!("../migrations/v018_insert_job_compat_ordering const V19_UP: &str = include_str!("../migrations/v019_queue_storage_jobs_compat_shard_joins.sql"); const V20_UP: &str = include_str!("../migrations/v020_active_queue_storage_schema_fallback.sql"); const V21_UP: &str = include_str!("../migrations/v021_shard_aware_lane_indexes.sql"); +const V22_UP: &str = include_str!("../migrations/v022_delete_compat_terminal_counter.sql"); /// Old version numbers from pre-0.4 releases that used V3/V4/V5 numbering. /// Also tolerates the unreleased inline-V6 branch numbering used during review. diff --git a/awa-model/src/queue_storage.rs b/awa-model/src/queue_storage.rs index cdef11fd..2e97f75a 100644 --- a/awa-model/src/queue_storage.rs +++ b/awa-model/src/queue_storage.rs @@ -3368,6 +3368,85 @@ impl QueueStorage { .map_err(map_sqlx_error)?; } + // queue_terminal_live_counts: per-(ready_slot, queue, priority, + // enqueue_shard) live count of rows currently in done_entries. + // Replaces the `count(*) FROM done_entries WHERE queue = ANY(...)` + // scan in queue_counts_exact (#290). Shard-aligned to inherit the + // ADR-025 row-lock spread; one counter row per group means a + // skewed batch of 512 terminal inserts costs one UPSERT. Prune + // deletes a slot's counter rows in the same transaction as the + // partition truncate; the rollup fold reads the partition itself, + // not this counter, so counter drift never reaches the rollups. 
+ // + // fillfactor=50 + tuned autovacuum: the upsert path is a HOT + // candidate (no secondary index touched). Without explicit + // fillfactor, the upserts spill to fresh pages under sustained + // completion load. + // + // Created here, after the `done_entries` partition loop above, + // because the backfill below SELECTs FROM `done_entries`. + sqlx::query(&format!( + r#" + CREATE TABLE IF NOT EXISTS {schema}.queue_terminal_live_counts ( + ready_slot INT NOT NULL, + queue TEXT NOT NULL, + priority SMALLINT NOT NULL, + enqueue_shard SMALLINT NOT NULL, + live_terminal_count BIGINT NOT NULL DEFAULT 0, + PRIMARY KEY (ready_slot, queue, priority, enqueue_shard) + ) + "# + )) + .execute(install_tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + + sqlx::query(&format!( + r#" + ALTER TABLE {schema}.queue_terminal_live_counts SET ( + fillfactor = 50, + autovacuum_vacuum_scale_factor = 0.0, + autovacuum_vacuum_threshold = 200, + autovacuum_vacuum_cost_limit = 2000, + autovacuum_vacuum_cost_delay = 2 + ) + "# + )) + .execute(install_tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + + // Backfill from existing done_entries in the same transaction as + // the table creation. On a fresh install this is a no-op + // (done_entries is empty). On an existing 0.6 cluster, this + // builds the live counts from the current done_entries + // population before any new code path tries to read them. + // ON CONFLICT DO NOTHING keeps the backfill idempotent: re-running + // prepare_schema leaves existing counter rows untouched. 
+ sqlx::query(&format!( + r#" + INSERT INTO {schema}.queue_terminal_live_counts AS counts ( + ready_slot, + queue, + priority, + enqueue_shard, + live_terminal_count + ) + SELECT + ready_slot, + queue, + priority, + enqueue_shard, + count(*)::bigint + FROM {schema}.done_entries + GROUP BY ready_slot, queue, priority, enqueue_shard + ON CONFLICT (ready_slot, queue, priority, enqueue_shard) DO NOTHING + "# + )) + .execute(install_tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + for slot in 0..self.lease_slot_count() { sqlx::query(&format!( r#" @@ -3902,6 +3981,7 @@ impl QueueStorage { {schema}.deferred_jobs, {schema}.queue_lanes, {schema}.queue_terminal_rollups, + {schema}.queue_terminal_live_counts, {schema}.queue_claimer_leases, {schema}.queue_claimer_state, {schema}.queue_ring_slots, @@ -4913,9 +4993,170 @@ impl QueueStorage { .map_err(map_sqlx_error)?; } + // #290: maintain queue_terminal_live_counts as a denormalised + // count of live done_entries rows, keyed by + // (ready_slot, queue, priority, enqueue_shard) to match the + // ring + shard identity. Aggregate this batch into one row per + // group so a skewed full batch of N terminal inserts costs at + // most one UPSERT per (slot, queue, priority, shard) tuple + // touched. The read side (queue_counts_exact, switched in a + // follow-up PR) then sums these counters instead of scanning + // done_entries. + self.increment_live_terminal_counters_tx(tx, rows).await?; + Ok(rows.len()) } + /// Aggregate `rows` by `(ready_slot, queue, priority, enqueue_shard)` + /// and UPSERT the per-group delta into `queue_terminal_live_counts`. + /// Called from every terminal-insert path so the counter stays in + /// lockstep with `done_entries` cardinality. The invariant tested in + /// `queue_storage_runtime_test::test_queue_terminal_live_counts_*` + /// is that `SUM(live_terminal_count)` for a queue equals + /// `count(*) FROM done_entries WHERE queue = ANY(...)`. 
+ async fn increment_live_terminal_counters_tx<'a>( + &self, + tx: &mut sqlx::Transaction<'a, sqlx::Postgres>, + rows: &[DoneJobRow], + ) -> Result<(), AwaError> { + if rows.is_empty() { + return Ok(()); + } + // BTreeMap, not HashMap: two concurrent transactions upserting the + // same set of counter keys must take row locks in the same order, + // or they can deadlock on the ON CONFLICT DO UPDATE. Sorted key + // iteration removes that race entirely. See also the matching + // ORDER BY in the fused CTE upsert. + let mut by_group: BTreeMap<(i32, String, i16, i16), i64> = BTreeMap::new(); + for row in rows { + let key = ( + row.ready_slot, + row.queue.clone(), + row.priority, + row.enqueue_shard, + ); + *by_group.entry(key).or_insert(0) += 1; + } + let mut ready_slots: Vec = Vec::with_capacity(by_group.len()); + let mut queues: Vec = Vec::with_capacity(by_group.len()); + let mut priorities: Vec = Vec::with_capacity(by_group.len()); + let mut enqueue_shards: Vec = Vec::with_capacity(by_group.len()); + let mut deltas: Vec = Vec::with_capacity(by_group.len()); + for ((slot, queue, prio, shard), delta) in by_group { + ready_slots.push(slot); + queues.push(queue); + priorities.push(prio); + enqueue_shards.push(shard); + deltas.push(delta); + } + let schema = self.schema(); + // ORDER BY pins the lock-acquisition order PG sees inside the + // INSERT ... ON CONFLICT, matching the sorted Rust iteration above. 
+ sqlx::query(&format!( + r#" + INSERT INTO {schema}.queue_terminal_live_counts AS counts ( + ready_slot, queue, priority, enqueue_shard, live_terminal_count + ) + SELECT ready_slot, queue, priority, enqueue_shard, delta + FROM unnest( + $1::int[], $2::text[], $3::smallint[], $4::smallint[], $5::bigint[] + ) AS d(ready_slot, queue, priority, enqueue_shard, delta) + ORDER BY ready_slot, queue, priority, enqueue_shard + ON CONFLICT (ready_slot, queue, priority, enqueue_shard) DO UPDATE + SET live_terminal_count = counts.live_terminal_count + EXCLUDED.live_terminal_count + "# + )) + .bind(&ready_slots) + .bind(&queues) + .bind(&priorities) + .bind(&enqueue_shards) + .bind(&deltas) + .execute(tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + Ok(()) + } + + /// Decrement `queue_terminal_live_counts` by the per-group magnitudes + /// implied by `rows`. Called from every terminal-delete path + /// (retry-from-terminal, DLQ move, discard). A group with no + /// counter row is skipped (the UPDATE matches nothing), and the + /// `GREATEST(0, ...)` floor keeps a count from going negative. + async fn decrement_live_terminal_counters_tx<'a>( + &self, + tx: &mut sqlx::Transaction<'a, sqlx::Postgres>, + rows: &[(i32, String, i16, i16)], + ) -> Result<(), AwaError> { + if rows.is_empty() { + return Ok(()); + } + // BTreeMap + ORDER BY: same deadlock-avoidance discipline as the + // increment helper. The UPDATE locks rows in the order PG sees + // its driving subquery; sorting the unnest input gives a stable + // (slot, queue, priority, shard) lock-acquisition order across + // concurrent transactions. 
+ let mut by_group: BTreeMap<(i32, String, i16, i16), i64> = BTreeMap::new(); + for key in rows { + *by_group.entry(key.clone()).or_insert(0) += 1; + } + let mut ready_slots: Vec = Vec::with_capacity(by_group.len()); + let mut queues: Vec = Vec::with_capacity(by_group.len()); + let mut priorities: Vec = Vec::with_capacity(by_group.len()); + let mut enqueue_shards: Vec = Vec::with_capacity(by_group.len()); + let mut deltas: Vec = Vec::with_capacity(by_group.len()); + for ((slot, queue, prio, shard), delta) in by_group { + ready_slots.push(slot); + queues.push(queue); + priorities.push(prio); + enqueue_shards.push(shard); + deltas.push(delta); + } + let schema = self.schema(); + sqlx::query(&format!( + r#" + UPDATE {schema}.queue_terminal_live_counts AS counts + SET live_terminal_count = GREATEST(0, counts.live_terminal_count - sorted.delta) + FROM ( + SELECT ready_slot, queue, priority, enqueue_shard, delta + FROM unnest( + $1::int[], $2::text[], $3::smallint[], $4::smallint[], $5::bigint[] + ) AS d(ready_slot, queue, priority, enqueue_shard, delta) + ORDER BY ready_slot, queue, priority, enqueue_shard + ) AS sorted + WHERE counts.ready_slot = sorted.ready_slot + AND counts.queue = sorted.queue + AND counts.priority = sorted.priority + AND counts.enqueue_shard = sorted.enqueue_shard + "# + )) + .bind(&ready_slots) + .bind(&queues) + .bind(&priorities) + .bind(&enqueue_shards) + .bind(&deltas) + .execute(tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + Ok(()) + } + + /// Build the row-key vector for `decrement_live_terminal_counters_tx` + /// from a slice of `DoneJobRow` (used by retry-from-terminal, + /// DLQ move, and discard, all of which DELETE FROM done_entries + /// with `RETURNING *` materialised into `DoneJobRow`). 
+ fn done_rows_to_counter_keys(rows: &[DoneJobRow]) -> Vec<(i32, String, i16, i16)> { + rows.iter() + .map(|row| { + ( + row.ready_slot, + row.queue.clone(), + row.priority, + row.enqueue_shard, + ) + }) + .collect() + } + async fn delete_ready_backing_rows_tx<'a>( &self, tx: &mut sqlx::Transaction<'a, sqlx::Postgres>, @@ -6256,7 +6497,40 @@ impl QueueStorage { JOIN closed ON closed.job_id = completed.job_id AND closed.run_lease = completed.run_lease - RETURNING job_id, run_lease + RETURNING ready_slot, queue, priority, enqueue_shard, job_id, run_lease + ), + -- #290: increment live terminal counters in lockstep with + -- the `terminal` INSERT above. Aggregate by group so a + -- skewed batch costs one UPSERT per touched + -- (slot, queue, priority, shard). Postgres executes + -- data-modifying CTEs unconditionally, so the outer SELECT + -- doesn't need to reference this stage. + -- + -- ORDER BY pins the lock-acquisition order on the + -- ON CONFLICT DO UPDATE. Two concurrent receipt-fast batches + -- touching overlapping (slot, queue, priority, shard) tuples + -- would otherwise deadlock if their group iteration order + -- differed. 
+ counter_upsert AS ( + INSERT INTO {schema}.queue_terminal_live_counts AS counts ( + ready_slot, queue, priority, enqueue_shard, live_terminal_count + ) + SELECT + ready_slot, queue, priority, enqueue_shard, delta + FROM ( + SELECT + terminal.ready_slot, + terminal.queue, + terminal.priority, + terminal.enqueue_shard, + count(*)::bigint AS delta + FROM terminal + GROUP BY terminal.ready_slot, terminal.queue, terminal.priority, terminal.enqueue_shard + ) AS grouped + ORDER BY ready_slot, queue, priority, enqueue_shard + ON CONFLICT (ready_slot, queue, priority, enqueue_shard) DO UPDATE + SET live_terminal_count = counts.live_terminal_count + EXCLUDED.live_terminal_count + RETURNING 1 ) SELECT job_id, run_lease FROM terminal @@ -7081,6 +7355,13 @@ impl QueueStorage { if let Some(terminal) = terminal { self.delete_ready_backing_rows_tx(tx, std::slice::from_ref(&terminal)) .await?; + // #290: the DELETE FROM done_entries above removes one terminal + // row; the counter must decrement in lockstep. + self.decrement_live_terminal_counters_tx( + tx, + &Self::done_rows_to_counter_keys(std::slice::from_ref(&terminal)), + ) + .await?; let ready_row = ExistingReadyRow { job_id: terminal.job_id, kind: terminal.kind, @@ -10302,6 +10583,12 @@ impl QueueStorage { self.delete_ready_backing_rows_tx(&mut tx, std::slice::from_ref(&moved)) .await?; + // #290: DLQ move deletes from done_entries — decrement counter. + self.decrement_live_terminal_counters_tx( + &mut tx, + &Self::done_rows_to_counter_keys(std::slice::from_ref(&moved)), + ) + .await?; let dlq_row = moved .clone() .into_dlq_row(dlq_reason.to_string(), Utc::now()); @@ -10353,6 +10640,10 @@ impl QueueStorage { } self.delete_ready_backing_rows_tx(&mut tx, &moved).await?; + // #290: bulk DLQ move deletes from done_entries — decrement counter + // by the per-group magnitudes of the moved rows. 
+ self.decrement_live_terminal_counters_tx(&mut tx, &Self::done_rows_to_counter_keys(&moved)) + .await?; let dlq_at = Utc::now(); let rows: Vec = moved .into_iter() @@ -10594,6 +10885,15 @@ impl QueueStorage { self.delete_ready_backing_rows_tx(&mut tx, &deleted_done) .await?; + // #290: discard removes terminal `failed` rows from done_entries + // (and `failed` DLQ rows from dlq_entries — counter is keyed on + // done_entries only). Decrement the counter by the per-group + // magnitudes of `deleted_done`. + self.decrement_live_terminal_counters_tx( + &mut tx, + &Self::done_rows_to_counter_keys(&deleted_done), + ) + .await?; for row in &deleted_done { self.sync_unique_claim( @@ -11219,6 +11519,73 @@ impl QueueStorage { }) } + /// Rebuild the `queue_terminal_live_counts` table from scratch by + /// truncating it and re-aggregating `done_entries` (per #290). Run + /// this when: + /// + /// - upgrading from a pre-#290 fleet, where in-flight binaries wrote + /// `done_entries` without maintaining the counter, + /// - after any incident that may have left the counter inconsistent + /// with `done_entries`, or + /// - as a routine drift-recovery step before relying on counter-fed + /// reads for billing-grade accuracy. + /// + /// The rebuild takes a transaction-scoped advisory lock keyed on the + /// queue-storage schema, which serialises concurrent rebuilds of the + /// same schema. The counter increment/decrement helpers do not take + /// that lock; they wait on the `TRUNCATE`'s ACCESS EXCLUSIVE table + /// lock instead, so writers block briefly rather than fail. + /// + /// **Operator note:** this is best run on a quiesced fleet (workers + /// paused or fully drained). Concurrent inserts during the rebuild + /// block until it commits; a slow rebuild can stall the fleet. The + /// rebuild itself is O(rows in `done_entries`). 
+ #[tracing::instrument(skip(self, pool), name = "queue_storage.rebuild_terminal_counters")] + pub async fn rebuild_terminal_counters(&self, pool: &PgPool) -> Result { + let schema = self.schema(); + let rebuild_lock_name = format!("awa.queue_storage.rebuild_terminal_counters:{schema}"); + let mut tx = pool.begin().await.map_err(map_sqlx_error)?; + + sqlx::query("SELECT pg_advisory_xact_lock(hashtextextended($1, 0))") + .bind(&rebuild_lock_name) + .execute(tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + + sqlx::query(&format!( + "TRUNCATE TABLE {schema}.queue_terminal_live_counts" + )) + .execute(tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + + let inserted: i64 = sqlx::query_scalar(&format!( + r#" + WITH inserted AS ( + INSERT INTO {schema}.queue_terminal_live_counts AS counts ( + ready_slot, queue, priority, enqueue_shard, live_terminal_count + ) + SELECT + ready_slot, + queue, + priority, + enqueue_shard, + count(*)::bigint + FROM {schema}.done_entries + GROUP BY ready_slot, queue, priority, enqueue_shard + RETURNING 1 + ) + SELECT COALESCE(count(*), 0)::bigint FROM inserted + "# + )) + .fetch_one(tx.as_mut()) + .await + .map_err(map_sqlx_error)?; + + tx.commit().await.map_err(map_sqlx_error)?; + Ok(inserted) + } + #[tracing::instrument(skip(self, pool), name = "queue_storage.prune_oldest")] pub async fn prune_oldest(&self, pool: &PgPool) -> Result { let schema = self.schema(); @@ -11325,6 +11692,19 @@ impl QueueStorage { }); } + // #290: scan the about-to-be-truncated partition for the rollup + // fold. The rollup column is *permanent* state, so we MUST fold + // from ground truth (the `{done_child}` partition itself), not + // from `queue_terminal_live_counts` — the counter can drift + // briefly during a rolling upgrade from a pre-counter binary + // and folding drift into the rollup would bake it in forever. 
+ // The counter rows for this slot are still cleaned up after the + // truncate; reads from the counter (queue_counts_exact in #305) + // may transiently disagree with the rollup until the operator + // runs `awa storage rebuild-terminal-counters`, but the rollup + // itself stays authoritative. See PR #304 reviewer finding + // "High: A1 can persist stale counter state before the + // read-switch PR" for the trade-off. let pruned_terminal_counts: Vec<(String, i16, i64)> = sqlx::query_as(&format!( r#" SELECT queue, priority, count(*)::bigint AS pruned_count @@ -11346,6 +11726,20 @@ impl QueueStorage { self.adjust_terminal_rollups_batch(&mut tx, pruned_terminal_counts.into_iter()) .await?; } + // #290: the live counter rows for this slot are about to + // be orphans (their underlying done_entries rows just got + // truncated). Delete them in the same transaction. The + // rollup fold above already captured ground-truth from + // the partition scan; this just cleans up the counter + // index entries so a future insert into a re-rotated + // slot starts from zero. + sqlx::query(&format!( + "DELETE FROM {schema}.queue_terminal_live_counts WHERE ready_slot = $1" + )) + .bind(slot) + .execute(tx.as_mut()) + .await + .map_err(map_sqlx_error)?; tx.commit().await.map_err(map_sqlx_error)?; Ok(PruneOutcome::Pruned { slot }) } diff --git a/awa/tests/queue_storage_runtime_test.rs b/awa/tests/queue_storage_runtime_test.rs index a5d8a5c0..b7aedd59 100644 --- a/awa/tests/queue_storage_runtime_test.rs +++ b/awa/tests/queue_storage_runtime_test.rs @@ -4755,6 +4755,488 @@ async fn test_queue_storage_queue_counts_fast_matches_exact_on_steady_state() { assert_eq!(post_prune_fast.completed, post_prune_exact.completed); } +/// #290 PR A1 invariant: every terminal-row insert path increments +/// `queue_terminal_live_counts` so that `SUM(live_terminal_count)` for a +/// queue equals `count(*) FROM done_entries WHERE queue = ANY(...)`. +/// +/// PR A1 only wires the increment side. 
The decrement side (DLQ moves, +/// discards, retry-from-terminal, SQL-compat deletes) lands in PR A2 and +/// the read switch lands in PR B; this test deliberately avoids any +/// terminal-delete path so the invariant holds on the increment side alone. +/// +/// The test exercises both insert paths: +/// - `insert_done_rows_tx` via `complete_batch` on the lease-materialisation +/// path (default `create_store` builds the store without the receipt +/// fast-complete candidate set). +/// - the fused receipt fast path is exercised in +/// `test_queue_terminal_live_counts_matches_done_entries_via_receipt_fast_path` +/// below. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_terminal_live_counts_matches_done_entries_via_insert_helper() { + let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; + let pool = setup_pool(10).await; + let queue = "qs_terminal_live_counts_helper"; + let schema = "awa_qs_terminal_live_counts_helper"; + let store = create_store(&pool, schema).await; + + assert_eq!( + live_count_sum(&pool, schema, queue).await, + done_entries_count(&pool, schema, queue).await, + "invariant holds on empty queue" + ); + + store + .enqueue_batch(&pool, queue, 1, 7) + .await + .expect("enqueue"); + let claimed = store.claim_batch(&pool, queue, 7).await.expect("claim"); + assert_eq!(claimed.len(), 7); + let completed = store + .complete_batch(&pool, &claimed) + .await + .expect("complete"); + assert_eq!(completed, 7); + + let live_sum = live_count_sum(&pool, schema, queue).await; + let done_count = done_entries_count(&pool, schema, queue).await; + assert_eq!(done_count, 7); + assert_eq!( + live_sum, done_count, + "live counter sum must equal done_entries cardinality after complete_batch" + ); + + // A second completion batch on the same queue must accumulate, not + // replace — guards against the UPSERT using = instead of += on + // conflict. 
+ store + .enqueue_batch(&pool, queue, 1, 3) + .await + .expect("enqueue 2"); + let claimed2 = store.claim_batch(&pool, queue, 3).await.expect("claim 2"); + store + .complete_batch(&pool, &claimed2) + .await + .expect("complete 2"); + let live_sum = live_count_sum(&pool, schema, queue).await; + let done_count = done_entries_count(&pool, schema, queue).await; + assert_eq!(done_count, 10); + assert_eq!(live_sum, done_count, "counter accumulates across batches"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_terminal_live_counts_matches_done_entries_via_receipt_fast_path() { + let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; + let pool = setup_pool(10).await; + let queue = "qs_terminal_live_counts_fast"; + let schema = "awa_qs_terminal_live_counts_fast"; + // lease_claim_receipts: true makes the receipt fast-complete path + // available (assuming the claimed jobs are otherwise eligible — + // see Self::receipt_fast_complete_candidate). + let store = create_store_with_config( + &pool, + QueueStorageConfig { + schema: schema.to_string(), + lease_claim_receipts: true, + ..Default::default() + }, + ) + .await; + + store + .enqueue_batch(&pool, queue, 1, 5) + .await + .expect("enqueue"); + let claimed = store + .claim_runtime_batch(&pool, queue, 5, std::time::Duration::from_secs(30)) + .await + .expect("claim"); + + // Guard the test against silent fast-path elision: if anything in + // `receipt_fast_complete_candidate`'s eligibility ever changes (or + // if `enqueue_batch` starts producing jobs with metadata/tags/ + // unique_keys that disqualify the fast path), this assertion fails + // loudly so the counter wiring stays under test. Without it, a + // future regression could silently fall through to + // `complete_claimed_batch` (which routes via `insert_done_rows_tx`) + // and the fused-CTE counter_upsert stage would go untested. 
+ for entry in &claimed { + assert!( + entry.claim.lease_claim_receipt, + "test setup must drive the receipt fast path: \ + entry has lease_claim_receipt={}", + entry.claim.lease_claim_receipt + ); + assert!( + entry.job.unique_key.is_none() + && entry.job.tags.is_empty() + && entry.job.errors.as_ref().is_none_or(Vec::is_empty), + "test setup must satisfy receipt_fast_complete_candidate: \ + unique_key.is_none={}, tags.empty={}, errors.empty={}", + entry.job.unique_key.is_none(), + entry.job.tags.is_empty(), + entry.job.errors.as_ref().is_none_or(Vec::is_empty), + ); + } + + let completed = store + .complete_runtime_batch(&pool, &claimed) + .await + .expect("complete"); + assert_eq!(completed.len(), 5); + + let live_sum = live_count_sum(&pool, schema, queue).await; + let done_count = done_entries_count(&pool, schema, queue).await; + assert_eq!(done_count, 5); + assert_eq!( + live_sum, done_count, + "fused receipt fast path must also increment the counter" + ); +} + +/// #290: every terminal-delete path keeps the counter invariant tight. +/// Exercises retry-from-terminal (`retry_job_tx`), single DLQ move +/// (`move_failed_to_dlq`), bulk DLQ move (`bulk_move_failed_to_dlq`), +/// and discard (`discard_failed_by_kind`). The invariant +/// `SUM(live_terminal_count) == count(*) FROM done_entries` must hold +/// after each operation. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_terminal_live_counts_decrement_on_terminal_delete_paths() { + let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; + let pool = setup_pool(10).await; + let queue = "qs_terminal_live_counts_delete"; + let schema = "awa_qs_terminal_live_counts_delete"; + let store = create_store(&pool, schema).await; + + // Seed a known terminal population: 3 completed, 3 failed, 3 cancelled, + // direct SQL into done_entries with matching counter rows. Going through + // claim/complete would also work but adds noise; the writer paths are + // already tested above. 
Here we want to focus the test on delete-path + // decrements. + seed_terminal_rows(&pool, schema, queue, "completed", 3).await; + seed_terminal_rows(&pool, schema, queue, "failed", 3).await; + seed_terminal_rows(&pool, schema, queue, "cancelled", 3).await; + + let live_sum = live_count_sum(&pool, schema, queue).await; + let done_count = done_entries_count(&pool, schema, queue).await; + assert_eq!(done_count, 9); + assert_eq!(live_sum, done_count, "invariant after seeding terminals"); + + // 1. Single DLQ move: pick one failed job and move it. + let failed_job_id = first_failed_job_id(&pool, schema, queue).await; + store + .move_failed_to_dlq(&pool, failed_job_id, "manual-test") + .await + .expect("move_failed_to_dlq"); + assert_invariant_holds(&pool, schema, queue, "after move_failed_to_dlq").await; + + // 2. Bulk DLQ move: drain all remaining failed rows. Seeded 3 + // `failed` rows; the single move in step 1 removed one, so 2 + // remain. Exact assertion catches a silent no-op on the + // single-move path (which would make bulk pick up all 3 and + // still leave the invariant balanced). + let moved = store + .bulk_move_failed_to_dlq(&pool, None, Some(queue), "bulk-test") + .await + .expect("bulk_move_failed_to_dlq"); + assert_eq!( + moved, 2, + "expected exactly 2 failed rows to remain after step 1" + ); + assert_invariant_holds(&pool, schema, queue, "after bulk_move_failed_to_dlq").await; + + // 3. Discard: seed fresh failed rows of a unique kind, then + // `discard_failed_by_kind`. We use a unique kind for this seeding + // to scope the discard precisely. 
+ seed_terminal_rows_with_kind(&pool, schema, queue, "failed", "discard_kind", 4).await; + assert_invariant_holds(&pool, schema, queue, "after reseed before discard").await; + let discarded = store + .discard_failed_by_kind(&pool, "discard_kind") + .await + .expect("discard_failed_by_kind"); + assert_eq!(discarded, 4, "discard should remove the 4 seeded rows"); + assert_invariant_holds(&pool, schema, queue, "after discard_failed_by_kind").await; + + // 4. Retry-from-terminal: pick a seeded cancelled row and retry it. + // retry_job_tx deletes the terminal row and inserts a fresh ready + // row. + let cancelled_job_id = first_cancelled_job_id(&pool, schema, queue).await; + let retried = store + .retry_job(&pool, cancelled_job_id) + .await + .expect("retry_job"); + assert!(retried.is_some(), "retry_job should succeed for cancelled"); + assert_invariant_holds(&pool, schema, queue, "after retry_job from terminal").await; +} + +/// #290: prune folds counter rows into queue_terminal_rollups and +/// clears the slot's live counter, keeping the invariant intact across +/// rotate + prune. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_terminal_live_counts_prune_folds_into_rollups() { + let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; + let pool = setup_pool(10).await; + let queue = "qs_terminal_live_counts_prune"; + let schema = "awa_qs_terminal_live_counts_prune"; + let store = create_store(&pool, schema).await; + + store + .enqueue_batch(&pool, queue, 1, 5) + .await + .expect("enqueue"); + let claimed = store.claim_batch(&pool, queue, 5).await.expect("claim"); + store + .complete_batch(&pool, &claimed) + .await + .expect("complete"); + + assert_eq!(done_entries_count(&pool, schema, queue).await, 5); + assert_eq!(live_count_sum(&pool, schema, queue).await, 5); + + match store.rotate(&pool).await.expect("rotate") { + awa_model::queue_storage::RotateOutcome::Rotated { .. 
} => {} + other => panic!("expected Rotated, got {other:?}"), + } + match store.prune_oldest(&pool).await.expect("prune_oldest") { + awa_model::queue_storage::PruneOutcome::Pruned { .. } => {} + other => panic!("expected Pruned, got {other:?}"), + } + + // After prune: + // - done_entries for this queue is empty (partition truncated) + // - queue_terminal_live_counts has no rows for this slot + // - queue_terminal_rollups.pruned_completed_count absorbed the 5 + assert_eq!(done_entries_count(&pool, schema, queue).await, 0); + assert_eq!( + live_count_sum(&pool, schema, queue).await, + 0, + "live counter for the pruned slot must be cleared" + ); + let rollup: i64 = sqlx::query_scalar::<_, i64>(&format!( + "SELECT COALESCE(SUM(pruned_completed_count), 0)::bigint \ + FROM {schema}.queue_terminal_rollups WHERE queue = $1" + )) + .bind(queue) + .fetch_one(&pool) + .await + .expect("rollup sum"); + assert_eq!(rollup, 5, "rollup absorbed the pruned slot's counter"); +} + +/// Direct-SQL seed for terminal-state test rows. Inserts into +/// `done_entries` AND `queue_terminal_live_counts` so the invariant +/// holds at seed time; the test then exercises delete paths to verify +/// they decrement the counter correctly. +async fn seed_terminal_rows(pool: &sqlx::PgPool, schema: &str, queue: &str, state: &str, n: i64) { + seed_terminal_rows_with_kind(pool, schema, queue, state, "chaos_job", n).await; +} + +async fn seed_terminal_rows_with_kind( + pool: &sqlx::PgPool, + schema: &str, + queue: &str, + state: &str, + kind: &str, + n: i64, +) { + // Insert N done_entries rows at (ready_slot=0, priority=2, + // enqueue_shard=0). lane_seq stays unique per call by reading the + // current max and adding rownums. 
+ let next_lane_seq: i64 = sqlx::query_scalar::<_, i64>(&format!( + "SELECT COALESCE(max(lane_seq), 0)::bigint + 1 \ + FROM {schema}.done_entries WHERE queue = $1" + )) + .bind(queue) + .fetch_one(pool) + .await + .expect("max lane_seq"); + let next_job_id: i64 = sqlx::query_scalar::<_, i64>( + "SELECT COALESCE(max(id), 1000000)::bigint + 1 FROM awa.jobs_hot", + ) + .fetch_one(pool) + .await + .unwrap_or(1_000_000); + sqlx::query(&format!( + r#" + INSERT INTO {schema}.done_entries ( + ready_slot, ready_generation, job_id, kind, queue, state, + priority, attempt, run_lease, lane_seq, enqueue_shard, + attempted_at, finalized_at, payload + ) + SELECT + 0, + 1, + $1::bigint + g - 1, + $2, + $3, + $4::awa.job_state, + 2::smallint, + 1::smallint, + 1::bigint, + $5::bigint + g - 1, + 0::smallint, + now(), + now(), + '{{}}'::jsonb + FROM generate_series(1, $6::int) AS g + "# + )) + .bind(next_job_id) + .bind(kind) + .bind(queue) + .bind(state) + .bind(next_lane_seq) + .bind(n as i32) + .execute(pool) + .await + .expect("seed done_entries"); + + sqlx::query(&format!( + r#" + INSERT INTO {schema}.queue_terminal_live_counts AS counts ( + ready_slot, queue, priority, enqueue_shard, live_terminal_count + ) + VALUES (0, $1, 2::smallint, 0::smallint, $2::bigint) + ON CONFLICT (ready_slot, queue, priority, enqueue_shard) DO UPDATE + SET live_terminal_count = counts.live_terminal_count + EXCLUDED.live_terminal_count + "# + )) + .bind(queue) + .bind(n) + .execute(pool) + .await + .expect("seed live counter"); +} + +async fn first_failed_job_id(pool: &sqlx::PgPool, schema: &str, queue: &str) -> i64 { + sqlx::query_scalar::<_, i64>(&format!( + "SELECT job_id FROM {schema}.done_entries \ + WHERE queue = $1 AND state = 'failed' LIMIT 1" + )) + .bind(queue) + .fetch_one(pool) + .await + .expect("first failed job id") +} + +async fn first_cancelled_job_id(pool: &sqlx::PgPool, schema: &str, queue: &str) -> i64 { + sqlx::query_scalar::<_, i64>(&format!( + "SELECT job_id FROM 
{schema}.done_entries \ + WHERE queue = $1 AND state = 'cancelled' LIMIT 1" + )) + .bind(queue) + .fetch_one(pool) + .await + .expect("first cancelled job id") +} + +async fn assert_invariant_holds(pool: &sqlx::PgPool, schema: &str, queue: &str, label: &str) { + let live = live_count_sum(pool, schema, queue).await; + let done = done_entries_count(pool, schema, queue).await; + assert_eq!( + live, done, + "#290 invariant ({label}): SUM(live_terminal_count) ({live}) \ + must equal count(*) FROM done_entries ({done})" + ); +} + +/// #290 — SQL compat `DELETE FROM awa.jobs WHERE id = $1` routes to +/// `awa.delete_job_compat()`. The v022 migration teaches the done_entries +/// branch of that function to decrement `queue_terminal_live_counts`. +/// Without it, deleting a terminal row via the SQL compat path drifts +/// the counter from the underlying table — and once #305's read switch +/// lands, the drift becomes operator-visible. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_terminal_live_counts_decrement_on_sql_compat_delete() { + let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; + let pool = setup_pool(10).await; + let queue = "qs_terminal_live_counts_sql_compat"; + let schema = "awa_qs_terminal_live_counts_sql_compat"; + // create_store activates this schema via the storage transition + // dance (prepare → enter_mixed_transition → finalize), so + // `awa.active_queue_storage_schema()` returns `schema` and + // `awa.delete_job_compat()` routes to it. 
+ let _store = create_store(&pool, schema).await; + + seed_terminal_rows(&pool, schema, queue, "completed", 5).await; + assert_eq!(done_entries_count(&pool, schema, queue).await, 5); + assert_eq!(live_count_sum(&pool, schema, queue).await, 5); + + let target_id: i64 = sqlx::query_scalar::<_, i64>(&format!( + "SELECT job_id FROM {schema}.done_entries WHERE queue = $1 LIMIT 1" + )) + .bind(queue) + .fetch_one(&pool) + .await + .expect("first done job id"); + + let deleted: bool = sqlx::query_scalar("SELECT awa.delete_job_compat($1)") + .bind(target_id) + .fetch_one(&pool) + .await + .expect("delete_job_compat call"); + assert!( + deleted, + "delete_job_compat should return true for an existing terminal row" + ); + + assert_eq!(done_entries_count(&pool, schema, queue).await, 4); + assert_invariant_holds(&pool, schema, queue, "after SQL compat delete").await; +} + +/// #290 — `rebuild_terminal_counters` truncates and re-aggregates from +/// `done_entries`, restoring the invariant after any drift. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_terminal_live_counts_rebuild_restores_invariant() { + let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; + let pool = setup_pool(10).await; + let queue = "qs_terminal_live_counts_rebuild"; + let schema = "awa_qs_terminal_live_counts_rebuild"; + let store = create_store(&pool, schema).await; + + // Seed 7 terminal rows with matching counter entries, then manually + // poison the counter to simulate rollover drift. + seed_terminal_rows(&pool, schema, queue, "completed", 7).await; + sqlx::query(&format!( + "UPDATE {schema}.queue_terminal_live_counts SET live_terminal_count = 999 WHERE queue = $1" + )) + .bind(queue) + .execute(&pool) + .await + .expect("poison counter"); + + // Sanity-check the drift is present. 
+ assert_eq!(live_count_sum(&pool, schema, queue).await, 999); + assert_eq!(done_entries_count(&pool, schema, queue).await, 7); + + let rebuilt = store + .rebuild_terminal_counters(&pool) + .await + .expect("rebuild"); + assert!(rebuilt >= 1, "rebuild should re-populate counter rows"); + assert_invariant_holds(&pool, schema, queue, "after rebuild").await; +} + +async fn live_count_sum(pool: &sqlx::PgPool, schema: &str, queue: &str) -> i64 { + sqlx::query_scalar::<_, i64>(&format!( + "SELECT COALESCE(SUM(live_terminal_count), 0)::bigint \ + FROM {schema}.queue_terminal_live_counts WHERE queue = $1" + )) + .bind(queue) + .fetch_one(pool) + .await + .expect("sum live_terminal_count") +} + +async fn done_entries_count(pool: &sqlx::PgPool, schema: &str, queue: &str) -> i64 { + sqlx::query_scalar::<_, i64>(&format!( + "SELECT count(*)::bigint FROM {schema}.done_entries WHERE queue = $1" + )) + .bind(queue) + .fetch_one(pool) + .await + .expect("count done_entries") +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_queue_storage_striped_claims_probe_stripes_round_robin() { let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; diff --git a/docs/adr/029-transactional-followup-jobs.md b/docs/adr/029-transactional-followup-jobs.md index 479fc541..818de57c 100644 --- a/docs/adr/029-transactional-followup-jobs.md +++ b/docs/adr/029-transactional-followup-jobs.md @@ -312,10 +312,14 @@ positioning (ADR-001). Out of scope. the correct mechanism for observation; this ADR adds the mechanism for delivery. - **ADR-027** (callback ingress, proposed). Addresses the open question - ADR-027 punts on: durable callback notifications. A callback-only - ingress process dispatches follow-ups best-effort after the - resolution commits; no in-process handler registry is required for - delivery. + ADR-027 punts on: durable callback notifications. The worker `Client` + drives resolution + follow-up dispatch in a single transaction via + `admin::*_external_in_tx`. 
A callback-only ingress process built per + ADR-027 will need its own registry hookup to reuse the same + `_in_tx` admin path; until that hookup lands, callback ingress + outside the worker `Client` is a separate transition with no + follow-up dispatch. See the "Open extensions" section below for the + current implementation boundary. - **ADR-028** (maintenance-only runtime, proposed). Gives rescue paths a way to dispatch durable lifecycle work without owning a handler registry. The dispatch is best-effort (separate transaction); the diff --git a/docs/callback-receivers.md b/docs/callback-receivers.md index c6663030..0efe6541 100644 --- a/docs/callback-receivers.md +++ b/docs/callback-receivers.md @@ -31,7 +31,7 @@ The callback receiver routes mutate job state. Specifically, `complete` and `fail` are terminal — once they succeed, the job leaves `waiting_external`. A receiver that does not verify `X-Awa-Signature` must be protected another way (mTLS at the load balancer, IP allow-list, -private VPC). See [ADR-027](./adr/027-callback-ingress.md) for the +private VPC). See [ADR-027](./adr/027-callback-ingress-surface.md) for the deployable-role model. ## Custom axum receiver (Rust) diff --git a/docs/http-callbacks.md b/docs/http-callbacks.md index fcde2069..b7574955 100644 --- a/docs/http-callbacks.md +++ b/docs/http-callbacks.md @@ -95,7 +95,7 @@ The runtime builds callback URLs as: built-in `awa serve` route layout. Override it when the callback receiver is mounted somewhere else — for example, a callback-only deployment behind a reverse proxy, or a user-owned API layer that hosts the routes inside -a FastAPI / axum application (see [ADR-027](./adr/027-callback-ingress.md)): +a FastAPI / axum application (see [ADR-027](./adr/027-callback-ingress-surface.md)): ```rust let client = Client::builder(pool) @@ -205,7 +205,7 @@ convenient for development but undesirable when callbacks must be externally reachable while the admin surface must stay private. 
For that case Awa ships a callback-only receiver as a deployable role -(see [ADR-027](./adr/027-callback-ingress.md)): +(see [ADR-027](./adr/027-callback-ingress-surface.md)): ```text awa callbacks serve \