From 69fdc52ebbe3552854bae9c6411e9d5b9310a280 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sat, 2 May 2026 17:43:41 +1200 Subject: [PATCH 1/4] Reduce claimer heartbeat churn --- awa-model/src/queue_storage.rs | 102 +++++-- awa/tests/queue_storage_runtime_test.rs | 105 ++++++- docs/architecture.md | 375 +++++++++++++++++++----- 3 files changed, 477 insertions(+), 105 deletions(-) diff --git a/awa-model/src/queue_storage.rs b/awa-model/src/queue_storage.rs index 231ad97f..2df877a0 100644 --- a/awa-model/src/queue_storage.rs +++ b/awa-model/src/queue_storage.rs @@ -86,6 +86,40 @@ pub struct QueueClaimerLease { pub lease_epoch: i64, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::FromRow)] +struct QueueClaimerLeaseRow { + claimer_slot: i16, + lease_epoch: i64, + last_claimed_at: DateTime, + expires_at: DateTime, +} + +impl QueueClaimerLeaseRow { + fn lease(self) -> QueueClaimerLease { + QueueClaimerLease { + claimer_slot: self.claimer_slot, + lease_epoch: self.lease_epoch, + } + } + + fn needs_refresh( + self, + now: DateTime, + lease_ttl: Duration, + idle_threshold: Duration, + ) -> bool { + let Ok(idle_refresh_delta) = TimeDelta::from_std(idle_threshold / 2) else { + return true; + }; + let Ok(expiry_refresh_delta) = TimeDelta::from_std(lease_ttl / 2) else { + return true; + }; + + self.last_claimed_at <= now - idle_refresh_delta + || self.expires_at <= now + expiry_refresh_delta + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::FromRow)] pub struct QueueClaimerState { pub target_claimers: i16, @@ -2352,13 +2386,13 @@ impl QueueStorage { .await .map_err(map_sqlx_error)?; - // mark_queue_claimer_active updates last_claimed_at + expires_at - // every heartbeat (~30/sec/row at 4-replica scale). HOT updates - // require free space on the same page as the old tuple, which - // default fillfactor=100% denies. Without explicit fillfactor the - // 30-min repro saw n_tup_hot_upd=2 / n_tup_upd=266116 — every - // heartbeat spilled to a fresh page. Match the pattern of the - // other 1-row-per-(queue, slot) hot Warm tables. + // mark_queue_claimer_active refreshes last_claimed_at + expires_at + // when a claimer lease is nearing the idle-steal threshold. HOT + // updates require free space on the same page as the old tuple, + // which default fillfactor=100% denies. Without explicit + // fillfactor, high-replica repros spilled frequent lease refreshes + // to fresh pages. Match the pattern of the other + // 1-row-per-(queue, slot) hot Warm tables. sqlx::query(&format!( r#" ALTER TABLE {schema}.queue_claimer_leases SET ( @@ -2387,13 +2421,13 @@ impl QueueStorage { .await .map_err(map_sqlx_error)?; - // expires_at is updated on every heartbeat (mark_queue_claimer_active - // → SET expires_at = $now + ttl). Any column referenced by an - // index — INCLUDE columns count for HOT-blocking purposes on - // PG 17 — disqualifies the update from HOT. Empirically observed - // 0% HOT ratio at 4×8 with both `(queue, owner_instance_id, - // expires_at)` and `(queue, owner_instance_id) INCLUDE - // (expires_at)` index shapes. + // expires_at is updated by mark_queue_claimer_active when a + // claimer lease needs refresh. Any column referenced by an index — + // INCLUDE columns count for HOT-blocking purposes on PG 17 — + // disqualifies the update from HOT. Empirically observed 0% HOT + // ratio at 4×8 with both `(queue, owner_instance_id, expires_at)` + // and `(queue, owner_instance_id) INCLUDE (expires_at)` index + // shapes. // // Drop expires_at from the index entirely. The SELECT at // acquire_queue_claimer that filters `expires_at > $now` falls @@ -5128,6 +5162,28 @@ impl QueueStorage { lease_ttl: Duration, idle_threshold: Duration, ) -> Result, AwaError> { + Ok(self + .acquire_queue_claimer_row( + pool, + queue, + instance_id, + max_claimers, + lease_ttl, + idle_threshold, + ) + .await? + .map(QueueClaimerLeaseRow::lease)) + } + + async fn acquire_queue_claimer_row( + &self, + pool: &PgPool, + queue: &str, + instance_id: Uuid, + max_claimers: i16, + lease_ttl: Duration, + idle_threshold: Duration, + ) -> Result, AwaError> { if max_claimers <= 0 { return Ok(None); } @@ -5148,9 +5204,9 @@ impl QueueStorage { 0 }; - if let Some(owned) = sqlx::query_as::<_, QueueClaimerLease>(&format!( + if let Some(owned) = sqlx::query_as::<_, QueueClaimerLeaseRow>(&format!( r#" - SELECT claimer_slot, lease_epoch + SELECT claimer_slot, lease_epoch, last_claimed_at, expires_at FROM {schema}.queue_claimer_leases WHERE queue = $1 AND owner_instance_id = $2 @@ -5171,7 +5227,7 @@ impl QueueStorage { for offset in 0..max_claimers { let slot = (probe_start + offset) % max_claimers; - if let Some(updated) = sqlx::query_as::<_, QueueClaimerLease>(&format!( + if let Some(updated) = sqlx::query_as::<_, QueueClaimerLeaseRow>(&format!( r#" UPDATE {schema}.queue_claimer_leases SET owner_instance_id = $3, @@ -5189,7 +5245,7 @@ impl QueueStorage { OR expires_at <= $4 OR last_claimed_at <= $6 ) - RETURNING claimer_slot, lease_epoch + RETURNING claimer_slot, lease_epoch, last_claimed_at, expires_at "# )) .bind(queue) @@ -5205,7 +5261,7 @@ impl QueueStorage { return Ok(Some(updated)); } - if let Some(inserted) = sqlx::query_as::<_, QueueClaimerLease>(&format!( + if let Some(inserted) = sqlx::query_as::<_, QueueClaimerLeaseRow>(&format!( r#" INSERT INTO {schema}.queue_claimer_leases ( queue, @@ -5218,7 +5274,7 @@ impl QueueStorage { ) VALUES ($1, $2, $3, 0, $4, $4, $5) ON CONFLICT (queue, claimer_slot) DO NOTHING - RETURNING claimer_slot, lease_epoch + RETURNING claimer_slot, lease_epoch, last_claimed_at, expires_at "# )) .bind(queue) @@ -5438,7 +5494,7 @@ impl QueueStorage { .await?; let Some(lease) = self - .acquire_queue_claimer( + .acquire_queue_claimer_row( pool, queue, instance_id, @@ -5461,9 +5517,9 @@ impl QueueStorage { ) .await?; - if !claimed.is_empty() { + if !claimed.is_empty() && lease.needs_refresh(Utc::now(), lease_ttl, idle_threshold) { let _ = self - .mark_queue_claimer_active(pool, queue, instance_id, lease, lease_ttl) + .mark_queue_claimer_active(pool, queue, instance_id, lease.lease(), lease_ttl) .await?; } diff --git a/awa/tests/queue_storage_runtime_test.rs b/awa/tests/queue_storage_runtime_test.rs index 37d9f4b8..c4e37ef7 100644 --- a/awa/tests/queue_storage_runtime_test.rs +++ b/awa/tests/queue_storage_runtime_test.rs @@ -11,7 +11,7 @@ use awa::{ Client, InsertOpts, JobArgs, JobContext, JobError, JobResult, JobRow, JobState, QueueConfig, UniqueOpts, Worker, }; -use chrono::Utc; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; use std::collections::HashSet; @@ -4082,6 +4082,109 @@ async fn test_queue_storage_bounded_claimers_can_steal_idle_slot() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_storage_claimer_heartbeat_skips_fresh_lease() { + let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; + let pool = setup_pool(10).await; + let schema = "awa_qs_bounded_claimers_heartbeat"; + let store = create_store(&pool, schema).await; + let queue = "qs_bounded_claimers_heartbeat"; + let instance = Uuid::new_v4(); + let ttl = Duration::from_secs(3); + let idle_threshold = Duration::from_millis(500); + + let lease = store + .acquire_queue_claimer(&pool, queue, instance, 1, ttl, idle_threshold) + .await + .expect("instance should acquire claimer") + .expect("instance should get a claimer slot"); + + let before: DateTime = sqlx::query_scalar(&format!( + "SELECT last_claimed_at FROM {schema}.queue_claimer_leases WHERE queue = $1 AND claimer_slot = $2" + )) + .bind(queue) + .bind(lease.claimer_slot) + .fetch_one(&pool) + .await + .expect("failed to read initial heartbeat"); + + store + .enqueue_batch(&pool, queue, 1, 1) + .await + .expect("failed to enqueue fresh-lease claim job"); + let claimed = store + .claim_runtime_batch_with_aging_for_instance( + &pool, + queue, + 1, + Duration::from_secs(300), + Duration::from_secs(60), + instance, + 1, + ttl, + idle_threshold, + ) + .await + .expect("fresh lease claim should succeed"); + assert_eq!(claimed.len(), 1); + + let after_fresh: DateTime = sqlx::query_scalar(&format!( + "SELECT last_claimed_at FROM {schema}.queue_claimer_leases WHERE queue = $1 AND claimer_slot = $2" + )) + .bind(queue) + .bind(lease.claimer_slot) + .fetch_one(&pool) + .await + .expect("failed to read skipped heartbeat"); + assert_eq!( + after_fresh, before, + "fresh heartbeat should not rewrite queue_claimer_leases" + ); + + sqlx::query(&format!( + "UPDATE {schema}.queue_claimer_leases SET last_claimed_at = $1 WHERE queue = $2 AND claimer_slot = $3" + )) + .bind(Utc::now() - chrono::Duration::milliseconds(300)) + .bind(queue) + .bind(lease.claimer_slot) + .execute(&pool) + .await + .expect("failed to age claimer lease heartbeat"); + + store + .enqueue_batch(&pool, queue, 1, 1) + .await + .expect("failed to enqueue stale-lease claim job"); + let claimed = store + .claim_runtime_batch_with_aging_for_instance( + &pool, + queue, + 1, + Duration::from_secs(300), + Duration::from_secs(60), + instance, + 1, + ttl, + idle_threshold, + ) + .await + .expect("stale lease claim should succeed"); + assert_eq!(claimed.len(), 1); + + let after_stale: DateTime = sqlx::query_scalar(&format!( + "SELECT last_claimed_at FROM {schema}.queue_claimer_leases WHERE queue = $1 AND claimer_slot = $2" + )) + .bind(queue) + .bind(lease.claimer_slot) + .fetch_one(&pool) + .await + .expect("failed to read refreshed heartbeat"); + assert!( + after_stale > after_fresh, + "stale heartbeat should refresh queue_claimer_leases" + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_queue_storage_prune_oldest_blocks_on_reader_lock() { let _guard = QUEUE_STORAGE_RUNTIME_LOCK.lock().await; diff --git a/docs/architecture.md b/docs/architecture.md index 51df460b..d2015b89 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -1,63 +1,108 @@ # Awa Architecture Overview -## System Overview - -Awa (Māori: river) is a Postgres-native background job queue providing durable, transactional job processing for Rust and Python. Postgres is the sole infrastructure dependency -- there is no Redis, RabbitMQ, or other broker. All queue state lives in Postgres, and all coordination uses Postgres primitives: `FOR UPDATE SKIP LOCKED` for dispatch, advisory locks for leader election, `LISTEN/NOTIFY` for wakeup, and transactions for atomic enqueue. - -The Rust runtime owns all queue machinery -- polling, heartbeating, crash recovery, and dispatch. Python workers are callbacks invoked by this runtime via PyO3, inheriting Rust-grade reliability without reimplementing queue internals. - -## Storage Engine - -Queue storage is Awa's worker engine. It is organised into three planes — queue -data, execution state, and control metadata — with append-only tables on the -queue plane and rotating segments for lease churn. +Awa (Māori: river) is a Postgres-native background job queue for Rust and +Python. Postgres is the sole infrastructure dependency: there is no Redis, +RabbitMQ, or separate scheduler. Jobs can be inserted in the producer's own +transaction, the Rust runtime owns dispatch, leases, heartbeats, rescue, and +maintenance, and Python workers run on that same runtime through PyO3. + +This document mirrors the architecture deck. Each level zooms one step further +in: + +- **L1** System overview +- **L2** Storage planes +- **L3** Job lifecycle +- **L4** Dispatcher claim path +- **L5** Crash recovery +- **L6** Maintenance leader +- **L7** Callback orchestration + +## L1 - System Overview + +```mermaid +flowchart LR + subgraph Producers["Producers (Rust or Python)"] + P1["BEGIN
app writes
awa insert
COMMIT"] + end + + subgraph PG["Postgres - sole infrastructure dependency"] + direction TB + Queue["Queue plane
ready_entries
deferred_jobs
done_entries
dlq_entries"] + Exec["Execution plane
lease_claims
lease_claim_closures
leases
attempt_state"] + Ctrl["Control plane
queue_lanes
queue_enqueue_heads
queue_claim_heads
*_ring_state
*_ring_slots
job_unique_claims
cron_jobs
runtime_instances"] + end + + subgraph Workers["Workers"] + RW["Rust runtime
dispatch
leases
heartbeats
rescue
maintenance"] + PY["Python workers
PyO3 bindings
same runtime"] + HTTP["HTTP workers
Lambda / Cloud Run
HMAC-signed callbacks"] + end + + P1 -- "transactional enqueue" --> Queue + Queue -- "LISTEN/NOTIFY + poll fallback" --> RW + RW <--> Exec + RW <--> Ctrl + PY -. "PyO3" .-> RW + HTTP -- "X-Awa-Signature" --> RW +``` -```text - Producers (Rust, Python) - │ - ▼ - ┌─────────────────────────────────────────────────────────┐ - │ Queue plane (append-only) │ - │ ready_entries ──claim──► lease_claims (receipt) / active_leases │ - │ │ │ - │ ├──retry──► deferred_jobs │ - │ │ │ │ - │ │ └─promote──┤ - │ ├──fail────► dlq_entries │ - │ └──complete─► terminal_entries - └─────────────────────────┬───────────────────────────────┘ - │ - ┌───────────────┼───────────────┐ - ▼ ▼ ▼ - Execution plane Control plane Maintenance leader - lease_claims lane_state promote / rescue / - lease_claim_closures enqueue_head rotate / prune / - active_leases claim_head dlq retention - attempt_state *_ring_state - (optional) *_ring_slots +The maintenance service runs in every worker process, but only the instance +holding the session-scoped advisory lock executes cluster-wide maintenance +tasks. The heartbeat service also runs in every worker process and refreshes +the attempts owned by that process. + +## L2 - Storage Planes + +Queue storage is Awa's worker engine. It is organised into three table families: +queue data, execution state, and control metadata. + +```mermaid +flowchart TB + subgraph QP["Queue plane - append-first, segmented where hot"] + Ready["ready_entries"] + Deferred["deferred_jobs"] + Done["done_entries"] + DLQ["dlq_entries"] + end + + subgraph EP["Execution plane"] + direction LR + subgraph Receipts["Receipt path - default for short jobs"] + LC["lease_claims
(append)"] + LCC["lease_claim_closures
(append)"] + LiveR["open receipt = claims anti-join closures"] + LC --> LiveR + LCC --> LiveR + end + subgraph Leases["Materialized lease path"] + L["leases
(running / waiting_external)"] + AS["attempt_state
(lazy progress / callback state)"] + end + end + + subgraph CP["Control plane"] + Rings["queue / lease / claim ring state"] + Heads["queue_enqueue_heads
queue_claim_heads
queue_lanes"] + Meta["queue_meta
runtime_instances
descriptors"] + Uniq["job_unique_claims"] + Cron["cron_jobs"] + end + + QP --> EP + EP --> CP ``` -Each queue-plane family (`ready_entries`, `deferred_jobs`, `terminal_entries`, -`dlq_entries`) rotates through segmented partitions so long-lived history does -not sit in one mutable heap. The lease plane (ADR-019 / ADR-023) is the same -shape: - -- zero-deadline short jobs stay on append-only `lease_claims` (a partitioned - table reclaimed by ring rotation + `TRUNCATE`, not row-level vacuum) -- the "currently open" set is derived at query time as the anti-join - `lease_claims` ⨝̸ `lease_claim_closures` over the active claim-ring - partitions; both tables share the same `claim_slot` partition key so a - partition's claims and closures are reclaimed together by `TRUNCATE` -- stale zero-deadline short jobs can be rescued from `lease_claims` after the - grace window without first creating a mutable lease row -- heartbeat/progress-only receipt-backed jobs can materialize into - `attempt_state` without creating a mutable lease row -- callback/wait or lease-specific mutation paths still materialize into - `active_leases` -- `attempt_state` is created lazily and only for jobs that need mutable - per-attempt data (progress, callback state) -- queue storage applies effective priority aging at claim time, so old waiting - jobs rise in priority without physically rewriting ready rows +- **Receipt path.** A claim writes an append-only `lease_claims` row. Completion + writes an append-only `lease_claim_closures` row and then appends the terminal, + retry, or DLQ row. The live set is the bounded active-partition anti-join of + claims without closures. See [ADR-023](adr/023-receipt-plane-ring-partitioning.md). +- **Materialized lease path.** Jobs that need mutable live state materialize + into `leases`, with optional `attempt_state` for progress, callback filters, + callback results, and other per-attempt mutable data. +- **Reclamation.** Hot queue and receipt families use ring partitions and + `TRUNCATE`-based prune rather than relying on row-level vacuum for the hot + path. `deferred_jobs` and `dlq_entries` are backlog/history tables rather + than the steady completion hot path. ADR-019 is the storage-engine source of truth; ADR-023 supersedes it for the receipt plane (`lease_claims` / `lease_claim_closures` partitioning): @@ -66,29 +111,26 @@ receipt plane (`lease_claims` / `lease_claim_closures` partitioning): - [ADR-023: Receipt Plane Ring Partitioning](adr/023-receipt-plane-ring-partitioning.md) This overview focuses on the current runtime boundaries and subsystems. -Migration and compatibility surfaces for older SQL entry points are -documented in [migrations.md](migrations.md). +Migration and compatibility surfaces for older SQL entry points are documented +in [migrations.md](migrations.md). -### Queue storage at a glance +### Queue Storage At A Glance -- queue plane: append-only `ready_entries`, `deferred_jobs`, - `terminal_entries`, and `dlq_entries` +- queue plane: `ready_entries`, `deferred_jobs`, `done_entries`, and + `dlq_entries` - execution plane: partitioned `lease_claims` + `lease_claim_closures` - (claim-ring; ADR-023), narrow `active_leases`, plus optional per-attempt - `attempt_state` -- control plane: cold `lane_state`, hot `queue_enqueue_heads`, - hot `queue_claim_heads`, plus ready/lease segment cursor tables -- `lane_state` stays off the terminal-completion hot path: live completion - totals come from `terminal_entries`, and the cached cold rollup used to keep - counts stable across prune lives outside `lane_state` -- live availability reads `sum(queue_lanes.available_count)` — a - denormalized counter the queue-storage SQL functions keep in sync with - `ready_entries` inserts and claim head advances. A drift-detection test - pins the counter to the live-row count. -- maintenance leader: promotion, rescue, rotation, prune, DLQ retention, and - queue-health publication - -### Queue Striping and Claim Authority + (claim-ring; ADR-023), `leases`, plus optional per-attempt `attempt_state` +- control plane: `queue_lanes`, `queue_enqueue_heads`, `queue_claim_heads`, + ready / lease / claim ring tables, `queue_meta`, descriptor catalogs, runtime + snapshots, cron rows, and uniqueness claims +- live availability reads `sum(queue_lanes.available_count)`, a denormalized + counter the queue-storage SQL functions keep in sync with `ready_entries` + inserts and claim-head advances +- maintenance leader: promotion, stale-heartbeat rescue, deadline rescue, + callback-timeout rescue, rotation, prune, DLQ retention, descriptor cleanup, + cron evaluation, priority aging, and queue-health publication + +### Queue Striping And Claim Authority Two 0.6 control mechanisms keep hot logical queues from turning into one coordination point for every replica: @@ -109,6 +151,177 @@ These mechanisms do not change the job lifecycle. They only decide which coordination path receives a ready row and which runtime instances are allowed to hit the claim path concurrently. +## L3 - Job Lifecycle + +`run_lease` increments at claim time. Later runtime mutations include +`(job_id, run_lease)` so stale completions, retries, snoozes, and cancels lose +cleanly after rescue, cancel, or re-claim. + +```mermaid +stateDiagram-v2 + [*] --> scheduled : insert with run_at > now() + [*] --> available : insert immediate + scheduled --> available : promote run_at <= now() + available --> running : claim / run_lease++ + running --> completed : handler ok + running --> retryable : handler error / backoff + retryable --> available : backoff elapsed + running --> waiting_external : WaitForCallback or wait_for_callback + waiting_external --> running : resume_external(token, payload) + running --> cancelled : handler/admin/rescue cancellation + waiting_external --> cancelled : admin cancel + running --> failed : attempts exhausted + waiting_external --> failed : callback timeout exhausted + failed --> [*] : optional move to dlq_entries + completed --> [*] + cancelled --> [*] +``` + +- `progress` JSONB is cleared on successful completion and preserved across + retry, snooze, cancel, fail, and rescue. +- DLQ is opt-in per queue. It is stored in `dlq_entries`; it is not a + dispatchable `job_state`. +- Cancellation is cooperative for live handlers. Runtime-driven shutdown, + stale-heartbeat rescue, and deadline rescue signal the local in-memory + cancellation flag when the rescuing process still has that handler registered. + A process that has panicked has no live handler left to signal; storage rescue + and the `run_lease` guard are what let another worker recover the attempt. + +## L4 - Dispatcher Claim Path + +The hot path reserves runtime capacity before claiming, then claims with a +bounded SQL function. The per-lane serializer is `queue_claim_heads ... FOR +UPDATE SKIP LOCKED`; ring cursors are read at statement snapshot, and rotation +uses compare-and-swap plus busy checks to avoid pruning a partition that a +concurrent claim can still write. + +```mermaid +sequenceDiagram + autonumber + participant W as Worker dispatcher + participant N as LISTEN/NOTIFY + participant Q as Postgres claim SQL + participant R as Ring state + + W->>W: acquire local concurrency permit + N-->>W: NOTIFY awa:<queue>
(or poll fallback) + W->>Q: claim_runtime_batch_with_aging_for_instance + Q->>Q: lock one queue_claim_heads row
FOR UPDATE SKIP LOCKED + Q->>R: read lease_ring_state and claim_ring_state
at statement snapshot + Q->>Q: append lease_claims receipt
or materialize leases row + Q->>Q: advance queue_claim_heads + Q-->>W: claimed batch + run_lease + + Note over W,Q: Receipt-backed jobs avoid mutable lease rows until
progress, callback, wait, or other lease-specific state is needed. +``` + +- `queue_enqueue_heads` allocates dense lane sequences at enqueue time. +- `queue_claim_heads` advances monotonically during claim and is the authority + for which lane position may be claimed next. +- Queue striping and bounded claimers reduce how many worker instances compete + for the same logical queue's claim path, but they do not own jobs. + +## L5 - Crash Recovery + +Awa uses two complementary rescue families: stale heartbeat rescue and hard +deadline rescue. Both are run by the elected maintenance leader; heartbeat +refresh itself is every-worker. + +```mermaid +flowchart LR + subgraph H["Heartbeat staleness"] + HB["HeartbeatService
every worker refreshes
owned attempts"] + Miss["heartbeat_at older than
heartbeat_staleness"] + HB -. "process dies or stalls" .-> Miss + Miss --> Rescue1["maintenance rescue
retry / fail / DLQ"] + end + + subgraph D["Hard deadline"] + Receipt["lease_claims.deadline_at
for receipt attempts"] + Lease["leases.deadline_at
for materialized attempts"] + Receipt --> Rescue2["rescue_expired_receipt_deadlines_tx"] + Lease --> Rescue3["rescue_expired_deadlines"] + end + + Rescue1 --> Guard["close old attempt
stale writers rejected by run_lease"] + Rescue2 --> Guard + Rescue3 --> Guard +``` + +- Defaults are 30s heartbeat interval and 90s heartbeat staleness; the + `QueueConfig::deadline_duration` default is 5 minutes. +- `deadline_duration = 0` disables hard-deadline rescue for that queue. +- When rescue happens in a process that still has the handler registered, the + runtime flips `ctx.is_cancelled()` / `job.is_cancelled()`. If the old process + is gone, the storage transition is still sufficient for another worker to + retry or fail the attempt. + +## L6 - Maintenance Leader + +A single worker instance holds the cluster-wide maintenance role through a +session-scoped Postgres advisory lock. + +```mermaid +flowchart TB + subgraph Workers["All worker processes"] + W1["worker"] + W2["worker"] + W3["worker"] + end + + Lock["pg_try_advisory_lock(0x4157415f4d41494e)"] + W1 --> Lock + W2 --> Lock + W3 --> Lock + + Lock -- "held by one session" --> Leader["Maintenance leader"] + Lock -- "retry after election interval" --> Followers["Followers"] + + Leader --> Tasks["promotion
heartbeat rescue
deadline rescue
callback timeout rescue
cron sync/eval
priority aging
queue/lease/claim rotation
prune and cleanup
queue stats"] +``` + +- Lock key: `0x4157415f4d41494e` (`AWA_MAIN`). +- If the leader process or connection dies, Postgres releases the session lock + and another worker can win the next election attempt. +- Heartbeat sending is not leader-elected; only the rescue scan is. + +## L7 - Callback Orchestration + +Two callback patterns cover external work: park the job and release the task +slot, or keep the handler task suspended while waiting for sequential external +steps. + +```mermaid +sequenceDiagram + autonumber + participant H as Handler + participant A as Awa runtime + participant E as External system + + Note over H,A: Pattern A - release the task slot + H->>A: register_callback(token) + H->>A: return WaitForCallback(token) + A->>A: state = waiting_external
task slot freed + E-->>A: complete/fail/retry callback + A->>A: terminal or retry transition + + Note over H,A: Pattern B - sequential wait in one handler + H->>A: register_callback(token) + H->>A: wait_for_callback(token).await + A->>A: state = waiting_external
handler task suspended + E-->>A: resume_external(token, payload) + A->>A: state = running
store callback result + A-->>H: payload returned in place +``` + +- Callback tokens are attempt-specific; stale tokens are rejected after a newer + callback is registered. +- `resume_external` accepts both `running` and `waiting_external` so an early + callback can win the race before the handler has fully parked. +- The `awa-ui` HTTP callback receiver verifies `X-Awa-Signature` when + `AWA_CALLBACK_HMAC_SECRET` is configured. Deployments that mount their own + callback receiver must provide equivalent authentication. + ## Control-plane descriptors Awa keeps two operator-facing descriptor catalogs, distinct from per-job payload metadata: @@ -360,7 +573,7 @@ completion and rescue. Zero-deadline short jobs can stay on the append-only receipt path until they prove they need the richer runtime semantics; the first heartbeat or progress flush lazily materializes that receipt into `attempt_state` only, while callback registration or other lease-specific -mutation paths still escalate it into `active_leases`. The currently-live +mutation paths still escalate it into `leases`. The currently-live receipt-backed set is derived at query time as the anti-join `lease_claims` ⨝̸ `lease_claim_closures` over the active claim-ring partitions (ADR-023); the partitioning is on a shared `claim_slot` so each partition's claim rows @@ -399,7 +612,7 @@ tokio::spawn(async { ├── Ok(false): already rescued/cancelled → skip metrics └── Err: DB error → log error │ - ├── Ok(Completed) → append `terminal_entries`, delete the active lease + ├── Ok(Completed) → append `done_entries`, delete or close the active attempt ├── Ok(RetryAfter) → append `deferred_jobs`, delete the active lease ├── Ok(Snooze) → append `deferred_jobs` without advancing attempt ├── Ok(Cancel) → append terminal `cancelled` @@ -436,7 +649,7 @@ Three flush paths: 3. **Explicit flush** — `ctx.flush_progress()` upserts `{schema}.attempt_state(job_id, run_lease, progress)` guarded by the active attempt identity. Receipt-backed attempts can therefore flush progress - before they ever materialize into `active_leases`. + before they ever materialize into `leases`. ```text ctx.set_progress(50, "halfway") ──► ProgressState.latest updated, generation bumped @@ -837,15 +1050,15 @@ The segmented-storage family has four complementary specs: | Spec | Checks | Artifact | |---|---|---| -| `AwaSegmentedStorage` | lifecycle transitions across ready / deferred / waiting / active_leases / attempt_state / terminal / dlq families and their rotate/prune safety; DLQ round-trip with `run_lease` reset; short-job rescue via lease-level heartbeat freshness | [spec](../correctness/storage/AwaSegmentedStorage.tla) | +| `AwaSegmentedStorage` | lifecycle transitions across ready / deferred / waiting / leases / attempt_state / terminal / dlq families and their rotate/prune safety; DLQ round-trip with `run_lease` reset; short-job rescue via receipt heartbeat freshness | [spec](../correctness/storage/AwaSegmentedStorage.tla) | | `AwaSegmentedStorageRaces` | the claim-vs-rotate-and-prune race that would exist if claim snapshots lease rotation state without the current compare-and-swap / checked-commit discipline; paired with a checked-commit variant that closes it | [spec](../correctness/storage/AwaSegmentedStorageRaces.tla) | | `AwaStorageLockOrder` | Postgres lock-acquisition order across claim / rotate-leases / prune-leases / rotate-ready / prune-ready; waits-for cycle detector; paired with a cycle-creating demo config to prove the checker works | [spec](../correctness/storage/AwaStorageLockOrder.tla) | | `AwaSegmentedStorageTrace` | trace-replay harness that feeds hand-transcribed event sequences from real queue-storage runtime tests through the base spec; single-threaded validation that every observed transition is a legal spec action | [spec](../correctness/storage/AwaSegmentedStorageTrace.tla) | Invariants covered include: no duplicate claim after rescue, stale completions rejected via `run_lease` guard, pruned segments empty, `attempt_state` -existence implies active lease or waiting entry, DLQ and terminal families -disjoint, lock-order deadlock freedom. The TLA+ action → Rust function +existence implies a live attempt, DLQ and terminal families disjoint, +lock-order deadlock freedom. The TLA+ action → Rust function mapping lives at [`correctness/storage/MAPPING.md`](../correctness/storage/MAPPING.md). The worker-lifecycle specs `AwaCore`, `AwaExtended`, `AwaBatcher`, `AwaCbk`, From fd38608a4986a95486c56a4b287f1f0d9a0aa931 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sat, 2 May 2026 20:51:10 +1200 Subject: [PATCH 2/4] Clarify stale claimer heartbeat test --- awa/tests/queue_storage_runtime_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awa/tests/queue_storage_runtime_test.rs b/awa/tests/queue_storage_runtime_test.rs index c4e37ef7..26925c15 100644 --- a/awa/tests/queue_storage_runtime_test.rs +++ b/awa/tests/queue_storage_runtime_test.rs @@ -4144,7 +4144,7 @@ async fn test_queue_storage_claimer_heartbeat_skips_fresh_lease() { sqlx::query(&format!( "UPDATE {schema}.queue_claimer_leases SET last_claimed_at = $1 WHERE queue = $2 AND claimer_slot = $3" )) - .bind(Utc::now() - chrono::Duration::milliseconds(300)) + .bind(Utc::now() - chrono::Duration::milliseconds(600)) .bind(queue) .bind(lease.claimer_slot) .execute(&pool) From dfae2a220878c3715fa18a117f65c596ea96956c Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sat, 2 May 2026 21:00:38 +1200 Subject: [PATCH 3/4] Serialize receipt completion with heartbeat materialization --- awa-model/src/queue_storage.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/awa-model/src/queue_storage.rs b/awa-model/src/queue_storage.rs index 2df877a0..90eea689 100644 --- a/awa-model/src/queue_storage.rs +++ b/awa-model/src/queue_storage.rs @@ -5688,10 +5688,19 @@ impl QueueStorage { WITH completed(claim_slot, job_id, run_lease) AS ( SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[]) ), + locked_claims AS ( + SELECT claims.claim_slot, claims.job_id, claims.run_lease + FROM {schema}.lease_claims AS claims + JOIN completed + ON completed.claim_slot = claims.claim_slot + AND completed.job_id = claims.job_id + AND completed.run_lease = claims.run_lease + FOR UPDATE OF claims + ), inserted AS ( INSERT INTO {schema}.lease_claim_closures (claim_slot, job_id, run_lease, outcome, closed_at) - SELECT completed.claim_slot, completed.job_id, completed.run_lease, 'completed', clock_timestamp() - FROM completed + SELECT locked_claims.claim_slot, locked_claims.job_id, locked_claims.run_lease, 'completed', clock_timestamp() + FROM locked_claims ON CONFLICT (claim_slot, job_id, run_lease) DO NOTHING RETURNING job_id, run_lease ), From 411d12d87cc721450abb725969cc3ce49d9386ff Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sat, 2 May 2026 21:15:08 +1200 Subject: [PATCH 4/4] Harden mixed fleet chaos smoke --- awa/tests/chaos_suite_test.rs | 86 +++++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 34 deletions(-) diff --git a/awa/tests/chaos_suite_test.rs b/awa/tests/chaos_suite_test.rs index 2159c5b1..c3be89df 100644 --- a/awa/tests/chaos_suite_test.rs +++ b/awa/tests/chaos_suite_test.rs @@ -70,15 +70,15 @@ async fn clean_queue(pool: &sqlx::PgPool, queue: &str) { } async fn queue_state_counts(pool: &sqlx::PgPool, queue: &str) -> HashMap { - // The 0.6 default is queue_storage; jobs flow through the - // segmented schema (`ready_entries`, `leases`, `lease_claims`, - // `done_entries`, `dlq_entries`) rather than `awa.jobs`. Mirror - // the union the telemetry test uses so this helper sees the - // same set the dispatcher does — including open `lease_claims` - // (anti-joined with closures) projected as `running`. + // Transition-era chaos tests may run against either canonical storage or + // queue_storage depending on the runtime capabilities each process reports. + // When queue_storage exists, include both planes so a mixed-language smoke + // test does not accidentally wait on the wrong one. if let Some(schema) = queue_storage_schema_for_counts(pool).await { let sql = format!( "SELECT state::text, count(*)::bigint FROM ( \ + SELECT state FROM awa.jobs WHERE queue = $1 \ + UNION ALL \ SELECT 'available'::awa.job_state AS state \ FROM {schema}.ready_entries AS ready \ JOIN {schema}.queue_claim_heads AS claims \ @@ -1225,39 +1225,57 @@ async fn test_mixed_rust_and_python_workers_share_same_queue() { .expect("Failed to insert Rust-enqueued ChaosProbe"); } - let rust_processed_marker = - tokio::time::timeout(scaled_timeout(Duration::from_secs(10)), rx.recv()) - .await - .expect("Timed out waiting for Rust worker to process a shared-kind job") - .expect("Rust mixed-fleet receiver closed unexpectedly"); + let expected_completed = batch_size * 2; + let deadline = tokio::time::sleep(scaled_timeout(Duration::from_secs(20))); + tokio::pin!(deadline); + let mut rust_completed = 0_i64; + let mut python_completed = 0_i64; + let mut first_rust_marker: Option = None; + let mut first_python_line: Option = None; + + loop { + if rust_completed + python_completed == expected_completed { + break; + } + + tokio::select! { + marker = rx.recv() => { + let marker = marker.expect("Rust mixed-fleet receiver closed unexpectedly"); + assert!( + marker.starts_with("python-") || marker.starts_with("rust-"), + "Unexpected marker processed by Rust worker: {marker}" + ); + first_rust_marker.get_or_insert(marker); + rust_completed += 1; + } + line = python_worker.stdout_lines.recv() => { + let line = line.expect("Python mixed-fleet worker stdout closed unexpectedly"); + if line.contains("COMPLETE mode=worker_chaos_probe") { + assert!( + line.contains("marker=python-") || line.contains("marker=rust-"), + "Unexpected python worker completion line: {line}" + ); + first_python_line.get_or_insert(line); + python_completed += 1; + } + } + () = &mut deadline => { + panic!( + "Timed out waiting for mixed-fleet completions; rust_completed={rust_completed}, python_completed={python_completed}, expected={expected_completed}" + ); + } + } + } + assert!( - rust_processed_marker.starts_with("python-") - || rust_processed_marker.starts_with("rust-"), - "Unexpected marker processed by Rust worker: {rust_processed_marker}" + first_rust_marker.is_some(), + "Rust worker did not process any mixed-fleet jobs" ); - - let python_line = python_worker - .wait_for_line("COMPLETE mode=worker_chaos_probe", Duration::from_secs(10)) - .await; assert!( - python_line.contains("marker=python-") || python_line.contains("marker=rust-"), - "Unexpected python worker completion line: {python_line}" + first_python_line.is_some(), + "Python worker did not process any mixed-fleet jobs" ); - let counts = wait_for_counts( - &pool, - &queue, - |counts| { - state_count(counts, "completed") == batch_size * 2 - && state_count(counts, "failed") == 0 - && state_count(counts, "running") == 0 - && state_count(counts, "available") == 0 - }, - Duration::from_secs(20), - ) - .await; - assert_eq!(state_count(&counts, "completed"), batch_size * 2); - python_worker.stop().await; } .await;