Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 90 additions & 25 deletions awa-model/src/queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,40 @@ pub struct QueueClaimerLease {
pub lease_epoch: i64,
}

/// Private row shape decoded from `queue_claimer_leases` queries.
///
/// Carries the two lease timestamps alongside the slot/epoch pair so that
/// `needs_refresh` can decide whether the lease row has to be touched again.
/// Field names must match the selected/returned column names because the
/// struct is decoded via `sqlx::FromRow`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::FromRow)]
struct QueueClaimerLeaseRow {
// Slot number of the claimer lease within the queue.
claimer_slot: i16,
// Epoch counter of the lease; copied verbatim into `QueueClaimerLease`.
lease_epoch: i64,
// Timestamp compared against the idle threshold in `needs_refresh`.
last_claimed_at: DateTime<Utc>,
// Lease expiry timestamp compared against the TTL window in `needs_refresh`.
expires_at: DateTime<Utc>,
}

impl QueueClaimerLeaseRow {
    /// Projects the row down to the lease handle, dropping the timestamps.
    fn lease(self) -> QueueClaimerLease {
        QueueClaimerLease {
            claimer_slot: self.claimer_slot,
            lease_epoch: self.lease_epoch,
        }
    }

    /// Reports whether this lease row should be refreshed at `now`.
    ///
    /// A refresh is due when either:
    /// * `last_claimed_at` is at least half of `idle_threshold` in the past, or
    /// * `expires_at` falls within half of `lease_ttl` from `now`.
    ///
    /// Returns `true` (refresh conservatively) whenever a cutoff cannot be
    /// computed: either the halved duration does not fit in a `TimeDelta`, or
    /// applying it to `now` would leave the representable `DateTime` range.
    /// The checked arithmetic avoids the panic that the `-` / `+` operators on
    /// `DateTime` raise on overflow.
    fn needs_refresh(
        self,
        now: DateTime<Utc>,
        lease_ttl: Duration,
        idle_threshold: Duration,
    ) -> bool {
        let Ok(idle_refresh_delta) = TimeDelta::from_std(idle_threshold / 2) else {
            return true;
        };
        let Ok(expiry_refresh_delta) = TimeDelta::from_std(lease_ttl / 2) else {
            return true;
        };

        // Out-of-range cutoffs are treated like unconvertible durations.
        let Some(idle_cutoff) = now.checked_sub_signed(idle_refresh_delta) else {
            return true;
        };
        let Some(expiry_cutoff) = now.checked_add_signed(expiry_refresh_delta) else {
            return true;
        };

        self.last_claimed_at <= idle_cutoff || self.expires_at <= expiry_cutoff
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::FromRow)]
pub struct QueueClaimerState {
pub target_claimers: i16,
Expand Down Expand Up @@ -2352,13 +2386,13 @@ impl QueueStorage {
.await
.map_err(map_sqlx_error)?;

// mark_queue_claimer_active updates last_claimed_at + expires_at
// every heartbeat (~30/sec/row at 4-replica scale). HOT updates
// require free space on the same page as the old tuple, which
// default fillfactor=100% denies. Without explicit fillfactor the
// 30-min repro saw n_tup_hot_upd=2 / n_tup_upd=266116 — every
// heartbeat spilled to a fresh page. Match the pattern of the
// other 1-row-per-(queue, slot) hot Warm tables.
// mark_queue_claimer_active refreshes last_claimed_at + expires_at
// when a claimer lease is nearing the idle-steal threshold. HOT
// updates require free space on the same page as the old tuple,
// which default fillfactor=100% denies. Without explicit
// fillfactor, high-replica repros spilled frequent lease refreshes
// to fresh pages. Match the pattern of the other
// 1-row-per-(queue, slot) hot Warm tables.
sqlx::query(&format!(
r#"
ALTER TABLE {schema}.queue_claimer_leases SET (
Expand Down Expand Up @@ -2387,13 +2421,13 @@ impl QueueStorage {
.await
.map_err(map_sqlx_error)?;

// expires_at is updated on every heartbeat (mark_queue_claimer_active
// → SET expires_at = $now + ttl). Any column referenced by an
// index — INCLUDE columns count for HOT-blocking purposes on
// PG 17 — disqualifies the update from HOT. Empirically observed
// 0% HOT ratio at 4×8 with both `(queue, owner_instance_id,
// expires_at)` and `(queue, owner_instance_id) INCLUDE
// (expires_at)` index shapes.
// expires_at is updated by mark_queue_claimer_active when a
// claimer lease needs refresh. Any column referenced by an index —
// INCLUDE columns count for HOT-blocking purposes on PG 17 —
// disqualifies the update from HOT. Empirically observed 0% HOT
// ratio at 4×8 with both `(queue, owner_instance_id, expires_at)`
// and `(queue, owner_instance_id) INCLUDE (expires_at)` index
// shapes.
//
// Drop expires_at from the index entirely. The SELECT at
// acquire_queue_claimer that filters `expires_at > $now` falls
Expand Down Expand Up @@ -5128,6 +5162,28 @@ impl QueueStorage {
lease_ttl: Duration,
idle_threshold: Duration,
) -> Result<Option<QueueClaimerLease>, AwaError> {
Ok(self
.acquire_queue_claimer_row(
pool,
queue,
instance_id,
max_claimers,
lease_ttl,
idle_threshold,
)
.await?
.map(QueueClaimerLeaseRow::lease))
}

async fn acquire_queue_claimer_row(
&self,
pool: &PgPool,
queue: &str,
instance_id: Uuid,
max_claimers: i16,
lease_ttl: Duration,
idle_threshold: Duration,
) -> Result<Option<QueueClaimerLeaseRow>, AwaError> {
if max_claimers <= 0 {
return Ok(None);
}
Expand All @@ -5148,9 +5204,9 @@ impl QueueStorage {
0
};

if let Some(owned) = sqlx::query_as::<_, QueueClaimerLease>(&format!(
if let Some(owned) = sqlx::query_as::<_, QueueClaimerLeaseRow>(&format!(
r#"
SELECT claimer_slot, lease_epoch
SELECT claimer_slot, lease_epoch, last_claimed_at, expires_at
FROM {schema}.queue_claimer_leases
WHERE queue = $1
AND owner_instance_id = $2
Expand All @@ -5171,7 +5227,7 @@ impl QueueStorage {

for offset in 0..max_claimers {
let slot = (probe_start + offset) % max_claimers;
if let Some(updated) = sqlx::query_as::<_, QueueClaimerLease>(&format!(
if let Some(updated) = sqlx::query_as::<_, QueueClaimerLeaseRow>(&format!(
r#"
UPDATE {schema}.queue_claimer_leases
SET owner_instance_id = $3,
Expand All @@ -5189,7 +5245,7 @@ impl QueueStorage {
OR expires_at <= $4
OR last_claimed_at <= $6
)
RETURNING claimer_slot, lease_epoch
RETURNING claimer_slot, lease_epoch, last_claimed_at, expires_at
"#
))
.bind(queue)
Expand All @@ -5205,7 +5261,7 @@ impl QueueStorage {
return Ok(Some(updated));
}

if let Some(inserted) = sqlx::query_as::<_, QueueClaimerLease>(&format!(
if let Some(inserted) = sqlx::query_as::<_, QueueClaimerLeaseRow>(&format!(
r#"
INSERT INTO {schema}.queue_claimer_leases (
queue,
Expand All @@ -5218,7 +5274,7 @@ impl QueueStorage {
)
VALUES ($1, $2, $3, 0, $4, $4, $5)
ON CONFLICT (queue, claimer_slot) DO NOTHING
RETURNING claimer_slot, lease_epoch
RETURNING claimer_slot, lease_epoch, last_claimed_at, expires_at
"#
))
.bind(queue)
Expand Down Expand Up @@ -5438,7 +5494,7 @@ impl QueueStorage {
.await?;

let Some(lease) = self
.acquire_queue_claimer(
.acquire_queue_claimer_row(
pool,
queue,
instance_id,
Expand All @@ -5461,9 +5517,9 @@ impl QueueStorage {
)
.await?;

if !claimed.is_empty() {
if !claimed.is_empty() && lease.needs_refresh(Utc::now(), lease_ttl, idle_threshold) {
let _ = self
.mark_queue_claimer_active(pool, queue, instance_id, lease, lease_ttl)
.mark_queue_claimer_active(pool, queue, instance_id, lease.lease(), lease_ttl)
.await?;
}

Expand Down Expand Up @@ -5632,10 +5688,19 @@ impl QueueStorage {
WITH completed(claim_slot, job_id, run_lease) AS (
SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[])
),
locked_claims AS (
SELECT claims.claim_slot, claims.job_id, claims.run_lease
FROM {schema}.lease_claims AS claims
JOIN completed
ON completed.claim_slot = claims.claim_slot
AND completed.job_id = claims.job_id
AND completed.run_lease = claims.run_lease
FOR UPDATE OF claims
),
inserted AS (
INSERT INTO {schema}.lease_claim_closures (claim_slot, job_id, run_lease, outcome, closed_at)
SELECT completed.claim_slot, completed.job_id, completed.run_lease, 'completed', clock_timestamp()
FROM completed
SELECT locked_claims.claim_slot, locked_claims.job_id, locked_claims.run_lease, 'completed', clock_timestamp()
FROM locked_claims
ON CONFLICT (claim_slot, job_id, run_lease) DO NOTHING
RETURNING job_id, run_lease
),
Expand Down
86 changes: 52 additions & 34 deletions awa/tests/chaos_suite_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ async fn clean_queue(pool: &sqlx::PgPool, queue: &str) {
}

async fn queue_state_counts(pool: &sqlx::PgPool, queue: &str) -> HashMap<String, i64> {
// The 0.6 default is queue_storage; jobs flow through the
// segmented schema (`ready_entries`, `leases`, `lease_claims`,
// `done_entries`, `dlq_entries`) rather than `awa.jobs`. Mirror
// the union the telemetry test uses so this helper sees the
// same set the dispatcher does — including open `lease_claims`
// (anti-joined with closures) projected as `running`.
// Transition-era chaos tests may run against either canonical storage or
// queue_storage depending on the runtime capabilities each process reports.
// When queue_storage exists, include both planes so a mixed-language smoke
// test does not accidentally wait on the wrong one.
if let Some(schema) = queue_storage_schema_for_counts(pool).await {
let sql = format!(
"SELECT state::text, count(*)::bigint FROM ( \
SELECT state FROM awa.jobs WHERE queue = $1 \
UNION ALL \
SELECT 'available'::awa.job_state AS state \
FROM {schema}.ready_entries AS ready \
JOIN {schema}.queue_claim_heads AS claims \
Expand Down Expand Up @@ -1225,39 +1225,57 @@ async fn test_mixed_rust_and_python_workers_share_same_queue() {
.expect("Failed to insert Rust-enqueued ChaosProbe");
}

let rust_processed_marker =
tokio::time::timeout(scaled_timeout(Duration::from_secs(10)), rx.recv())
.await
.expect("Timed out waiting for Rust worker to process a shared-kind job")
.expect("Rust mixed-fleet receiver closed unexpectedly");
let expected_completed = batch_size * 2;
let deadline = tokio::time::sleep(scaled_timeout(Duration::from_secs(20)));
tokio::pin!(deadline);
let mut rust_completed = 0_i64;
let mut python_completed = 0_i64;
let mut first_rust_marker: Option<String> = None;
let mut first_python_line: Option<String> = None;

loop {
if rust_completed + python_completed == expected_completed {
break;
}

tokio::select! {
marker = rx.recv() => {
let marker = marker.expect("Rust mixed-fleet receiver closed unexpectedly");
assert!(
marker.starts_with("python-") || marker.starts_with("rust-"),
"Unexpected marker processed by Rust worker: {marker}"
);
first_rust_marker.get_or_insert(marker);
rust_completed += 1;
}
line = python_worker.stdout_lines.recv() => {
let line = line.expect("Python mixed-fleet worker stdout closed unexpectedly");
if line.contains("COMPLETE mode=worker_chaos_probe") {
assert!(
line.contains("marker=python-") || line.contains("marker=rust-"),
"Unexpected python worker completion line: {line}"
);
first_python_line.get_or_insert(line);
python_completed += 1;
}
}
() = &mut deadline => {
panic!(
"Timed out waiting for mixed-fleet completions; rust_completed={rust_completed}, python_completed={python_completed}, expected={expected_completed}"
);
}
}
}

assert!(
rust_processed_marker.starts_with("python-")
|| rust_processed_marker.starts_with("rust-"),
"Unexpected marker processed by Rust worker: {rust_processed_marker}"
first_rust_marker.is_some(),
"Rust worker did not process any mixed-fleet jobs"
);

let python_line = python_worker
.wait_for_line("COMPLETE mode=worker_chaos_probe", Duration::from_secs(10))
.await;
assert!(
python_line.contains("marker=python-") || python_line.contains("marker=rust-"),
"Unexpected python worker completion line: {python_line}"
first_python_line.is_some(),
"Python worker did not process any mixed-fleet jobs"
);

let counts = wait_for_counts(
&pool,
&queue,
|counts| {
state_count(counts, "completed") == batch_size * 2
&& state_count(counts, "failed") == 0
&& state_count(counts, "running") == 0
&& state_count(counts, "available") == 0
},
Duration::from_secs(20),
)
.await;
assert_eq!(state_count(&counts, "completed"), batch_size * 2);

python_worker.stop().await;
}
.await;
Expand Down
Loading
Loading