Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ transitions live in [`docs/upgrade-0.5-to-0.6.md`](docs/upgrade-0.5-to-0.6.md).

## Unreleased

- **Cron schedule pause / resume.** `cron_jobs` carries `paused_at` and
`paused_by` columns; `POST /api/cron/{name}/pause` and `/resume` toggle
the state. The evaluator skips paused rows and the `atomic_enqueue`
CTE re-checks `paused_at IS NULL` so a pause asserted between the
leader's read and CAS still takes effect. `last_enqueued_at` is left
untouched while paused, so the schedule's `missed_fire_policy` decides
catch-up behaviour on resume. Manual `trigger_cron_job` bypasses pause.
The `/cron` UI page gains Pause/Resume controls and shows a
"queue paused" badge when the target queue is itself paused.

## [0.6.0-beta.1] — 2026-05-19

First beta of the 0.6 line. Frames the user-facing diff between
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ without Redis or RabbitMQ, Awa is built for you.
- **Unique jobs** — declare uniqueness by kind/queue/args; cancel by unique key without storing job IDs.
- **Priorities, retries, snoozes** — exponential backoff with jitter; priority aging for fairness.
- **Dead Letter Queue** — first-class DLQ with per-queue opt-in, retention, and operator retry/purge.
- **Periodic/cron jobs** — leader-elected scheduler with timezone support and atomic enqueue.
- **Periodic/cron jobs** — leader-elected scheduler with timezone support, atomic enqueue, and per-schedule pause/resume.
- **Sequential callbacks** — `wait_for_callback()` / `resume_external()` for multi-step orchestration within a single handler.
- **Webhook callbacks** — park jobs for external completion with optional CEL-expression filtering.

Expand Down
4 changes: 3 additions & 1 deletion awa-model/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ against the admin API.
- **Dead Letter Queue** (`dlq`) — `DlqRow`, `DlqMetadata`,
`ListDlqFilter`, `RetryFromDlqOpts`, list / retry / move / purge
helpers backing the `awa dlq` CLI and the DLQ admin UI tab.
- **Cron** (`cron`) — `PeriodicJob`, `PeriodicJobBuilder`, `CronJobRow`.
- **Cron** (`cron`) — `PeriodicJob`, `PeriodicJobBuilder`, `CronJobRow`
plus `pause_cron_job` / `resume_cron_job` operating on the
`paused_at` / `paused_by` columns.
- **Queue storage** (`queue_storage`) — `QueueStorage`,
`QueueStorageConfig`, `ClaimedRuntimeJob`, `RotateOutcome`,
`PruneOutcome`. The vacuum-aware engine introduced in 0.6.
Expand Down
18 changes: 18 additions & 0 deletions awa-model/migrations/v026_cron_jobs_pause.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Add pause state to awa.cron_jobs so individual cron schedules can be
-- paused without deleting them. Column shape mirrors awa.queue_meta
-- (paused_at / paused_by) so operators see one convention across the
-- two pause surfaces.
--
-- `paused_at IS NULL` means the schedule is active. When non-null, the
-- evaluator skips it and atomic_enqueue refuses to fire (belt-and-braces).
-- `last_enqueued_at` is left untouched while paused, so the existing
-- `missed_fire_policy` (coalesce | catch_up) decides catch-up behaviour
-- on resume.

ALTER TABLE awa.cron_jobs
ADD COLUMN IF NOT EXISTS paused_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS paused_by TEXT;

INSERT INTO awa.schema_version (version, description)
VALUES (26, 'Add paused_at + paused_by to cron_jobs for per-schedule pause')
ON CONFLICT (version) DO NOTHING;
69 changes: 68 additions & 1 deletion awa-model/src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,19 @@ pub struct CronJobRow {
pub last_enqueued_at: Option<DateTime<Utc>>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
/// When the schedule was paused, or NULL if active. While paused,
/// the evaluator skips this row and `atomic_enqueue` refuses to
/// fire. `last_enqueued_at` is preserved across pause, so the
/// existing `missed_fire_policy` decides catch-up on resume.
pub paused_at: Option<DateTime<Utc>>,
pub paused_by: Option<String>,
}

impl CronJobRow {
/// Whether the schedule is currently paused.
pub fn is_paused(&self) -> bool {
self.paused_at.is_some()
}
}

/// Upsert a periodic job schedule into `awa.cron_jobs`.
Expand Down Expand Up @@ -360,6 +373,58 @@ where
Ok(result.rows_affected() > 0)
}

/// Pause a cron schedule. The evaluator skips paused schedules and
/// `atomic_enqueue` refuses to fire while `paused_at IS NOT NULL`.
///
/// `last_enqueued_at` is left untouched so the schedule's existing
/// `missed_fire_policy` decides catch-up behaviour on resume.
///
/// Pausing an already-paused schedule refreshes `paused_at` and
/// `paused_by`. Returns `true` if a row was updated.
pub async fn pause_cron_job<'e, E>(
executor: E,
name: &str,
paused_by: Option<&str>,
) -> Result<bool, AwaError>
where
E: PgExecutor<'e>,
{
let result = sqlx::query(
r#"
UPDATE awa.cron_jobs
SET paused_at = now(), paused_by = $2, updated_at = now()
WHERE name = $1
"#,
)
.bind(name)
.bind(paused_by)
.execute(executor)
.await?;
Ok(result.rows_affected() > 0)
}

/// Resume a paused cron schedule. Clears `paused_at` and `paused_by`.
///
/// Resuming an already-active schedule is a no-op at the row level
/// (the UPDATE matches but the columns are already NULL). Returns
/// `true` if a row was updated.
pub async fn resume_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
where
E: PgExecutor<'e>,
{
let result = sqlx::query(
r#"
UPDATE awa.cron_jobs
SET paused_at = NULL, paused_by = NULL, updated_at = now()
WHERE name = $1
"#,
)
.bind(name)
.execute(executor)
.await?;
Ok(result.rows_affected() > 0)
}

/// Atomically mark a cron job as enqueued AND insert the resulting job.
///
/// Uses a single CTE so that both the UPDATE and INSERT happen in one
Expand All @@ -385,6 +450,7 @@ where
SET last_enqueued_at = $2, updated_at = now()
WHERE name = $1
AND (last_enqueued_at IS NOT DISTINCT FROM $3)
AND paused_at IS NULL
RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
)
SELECT inserted.*
Expand Down Expand Up @@ -417,7 +483,8 @@ where
///
/// Reads the cron job config from `awa.cron_jobs` and inserts a new job
/// directly. Does NOT update `last_enqueued_at` so the normal schedule
/// is unaffected.
/// is unaffected. Works on paused schedules — pause stops *automatic*
/// fires; manual trigger is an explicit operator action.
pub async fn trigger_cron_job<'e, E>(executor: E, name: &str) -> Result<JobRow, AwaError>
where
E: PgExecutor<'e>,
Expand Down
8 changes: 7 additions & 1 deletion awa-model/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use sqlx::PgPool;
use tracing::info;

/// Current schema version.
pub const CURRENT_VERSION: i32 = 25;
pub const CURRENT_VERSION: i32 = 26;

/// All migrations in order. SQL lives in `awa-model/migrations/*.sql`
/// for easy inspection by users who run their own migration tooling.
Expand Down Expand Up @@ -113,6 +113,11 @@ const MIGRATIONS: &[(i32, &str, &[&str])] = &[
"Drop idx_<schema>_leases_<slot>_state_hb on all AWA substrates",
&[V25_UP],
),
(
26,
"Add paused_at + paused_by to cron_jobs for per-schedule pause",
&[V26_UP],
),
];

const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql");
Expand All @@ -139,6 +144,7 @@ const V22_UP: &str = include_str!("../migrations/v022_delete_compat_terminal_cou
const V23_UP: &str = include_str!("../migrations/v023_install_queue_storage_substrate.sql");
const V24_UP: &str = include_str!("../migrations/v024_receipt_plane_fillfactor.sql");
const V25_UP: &str = include_str!("../migrations/v025_drop_leases_state_hb_index.sql");
const V26_UP: &str = include_str!("../migrations/v026_cron_jobs_pause.sql");

/// Old version numbers from pre-0.4 releases that used V3/V4/V5 numbering.
/// Also tolerates the unreleased inline-V6 branch numbering used during review.
Expand Down
4 changes: 3 additions & 1 deletion awa-ui/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ application.
the **storage-transition card** when the cluster is mid-migration:
prepared / mixed / finalized state, drain progress, and the gates
blocking finalization.
- **Cron** — registered periodic schedules with next-run preview.
- **Cron** — registered periodic schedules with next-run preview,
per-schedule pause / resume / trigger controls, and a target-queue
paused indicator.
- **DLQ** (`/dlq`) — Dead Letter Queue browser with kind / queue /
tag filters, single and bulk retry, move, and purge actions. The
DLQ tab is reachable from the Jobs tab via the row links on
Expand Down
60 changes: 60 additions & 0 deletions awa-ui/frontend/e2e/cron.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,66 @@ test.describe("Cron page", () => {

await expect(firstCronSummary(page, cronJobs[0].name)).toBeVisible();
await expect(page.getByRole("button", { name: "Trigger now" }).first()).toBeVisible();
// Every row shows either a Pause or a Resume action.
const hasPauseOrResume =
(await page.getByRole("button", { name: /Pause|Resume/ }).count()) > 0;
expect(hasPauseOrResume).toBe(true);
});

test("pause and resume buttons round-trip a schedule", async ({ page }) => {
const cronJobs = await loadCronPage(page);
if (cronJobs.length === 0) {
test.skip();
return;
}

const target = cronJobs[0];
const summary = firstCronSummary(page, target.name);
const row = summary.locator("xpath=ancestor::div[contains(@class, 'rounded-lg')][1]");

const pauseBtn = row.getByRole("button", { name: "Pause" });
const resumeBtn = row.getByRole("button", { name: "Resume" });

// Start state may be either paused or active; normalise to active.
if ((await resumeBtn.count()) > 0) {
const [resumeRes] = await Promise.all([
page.waitForResponse(
(r) =>
r.ok() &&
r.request().method() === "POST" &&
new URL(r.url()).pathname === `/api/cron/${target.name}/resume`,
),
resumeBtn.click(),
]);
expect(resumeRes.ok()).toBeTruthy();
await expect(pauseBtn).toBeVisible();
}

// Pause.
const [pauseRes] = await Promise.all([
page.waitForResponse(
(r) =>
r.ok() &&
r.request().method() === "POST" &&
new URL(r.url()).pathname === `/api/cron/${target.name}/pause`,
),
pauseBtn.click(),
]);
expect(pauseRes.ok()).toBeTruthy();
await expect(row.getByText(/^paused$/)).toBeVisible();
await expect(row.getByRole("button", { name: "Resume" })).toBeVisible();

// Resume to restore initial state.
await Promise.all([
page.waitForResponse(
(r) =>
r.ok() &&
r.request().method() === "POST" &&
new URL(r.url()).pathname === `/api/cron/${target.name}/resume`,
),
row.getByRole("button", { name: "Resume" }).click(),
]);
await expect(row.getByRole("button", { name: "Pause" })).toBeVisible();
});

test("clicking cron row toggles expand/collapse", async ({ page }) => {
Expand Down
18 changes: 18 additions & 0 deletions awa-ui/frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ export interface CronJobRow {
next_fire_at: string | null;
created_at: string;
updated_at: string;
paused_at: string | null;
paused_by: string | null;
}

export type StateCounts = Record<string, number>;
Expand Down Expand Up @@ -416,6 +418,22 @@ export function triggerCronJob(name: string): Promise<JobRow> {
});
}

export function pauseCronJob(
name: string,
pausedBy?: string,
): Promise<{ ok: boolean }> {
return apiFetch(`/cron/${encodeURIComponent(name)}/pause`, {
method: "POST",
body: JSON.stringify({ paused_by: pausedBy ?? null }),
});
}

export function resumeCronJob(name: string): Promise<{ ok: boolean }> {
return apiFetch(`/cron/${encodeURIComponent(name)}/resume`, {
method: "POST",
});
}

// Stats
export function fetchStats(): Promise<StateCounts> {
return apiFetch("/stats");
Expand Down
Loading
Loading