diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f4085a4..30ea8e3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ transitions live in [`docs/upgrade-0.5-to-0.6.md`](docs/upgrade-0.5-to-0.6.md). ## Unreleased +- **Cron schedule pause / resume.** `cron_jobs` carries `paused_at` and + `paused_by` columns; `POST /api/cron/{name}/pause` and `/resume` toggle + the state. The evaluator skips paused rows and the `atomic_enqueue` + CTE re-checks `paused_at IS NULL` so a pause asserted between the + leader's read and CAS still takes effect. `last_enqueued_at` is left + untouched while paused, so the schedule's `missed_fire_policy` decides + catch-up behaviour on resume. Manual `trigger_cron_job` bypasses pause. + The `/cron` UI page gains Pause/Resume controls and shows a + "queue paused" badge when the target queue is itself paused. + ## [0.6.0-beta.1] — 2026-05-19 First beta of the 0.6 line. Frames the user-facing diff between diff --git a/README.md b/README.md index 67fdee3e..1261dc65 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ without Redis or RabbitMQ, Awa is built for you. - **Unique jobs** — declare uniqueness by kind/queue/args; cancel by unique key without storing job IDs. - **Priorities, retries, snoozes** — exponential backoff with jitter; priority aging for fairness. - **Dead Letter Queue** — first-class DLQ with per-queue opt-in, retention, and operator retry/purge. -- **Periodic/cron jobs** — leader-elected scheduler with timezone support and atomic enqueue. +- **Periodic/cron jobs** — leader-elected scheduler with timezone support, atomic enqueue, and per-schedule pause/resume. - **Sequential callbacks** — `wait_for_callback()` / `resume_external()` for multi-step orchestration within a single handler. - **Webhook callbacks** — park jobs for external completion with optional CEL-expression filtering. diff --git a/awa-model/README.md b/awa-model/README.md index 46210555..5c5ea6c0 100644 --- a/awa-model/README.md +++ b/awa-model/README.md @@ -30,7 +30,9 @@ against the admin API. - **Dead Letter Queue** (`dlq`) — `DlqRow`, `DlqMetadata`, `ListDlqFilter`, `RetryFromDlqOpts`, list / retry / move / purge helpers backing the `awa dlq` CLI and the DLQ admin UI tab. -- **Cron** (`cron`) — `PeriodicJob`, `PeriodicJobBuilder`, `CronJobRow`. +- **Cron** (`cron`) — `PeriodicJob`, `PeriodicJobBuilder`, `CronJobRow` + plus `pause_cron_job` / `resume_cron_job` operating on the + `paused_at` / `paused_by` columns. - **Queue storage** (`queue_storage`) — `QueueStorage`, `QueueStorageConfig`, `ClaimedRuntimeJob`, `RotateOutcome`, `PruneOutcome`. The vacuum-aware engine introduced in 0.6. diff --git a/awa-model/migrations/v026_cron_jobs_pause.sql b/awa-model/migrations/v026_cron_jobs_pause.sql new file mode 100644 index 00000000..b1093e63 --- /dev/null +++ b/awa-model/migrations/v026_cron_jobs_pause.sql @@ -0,0 +1,18 @@ +-- Add pause state to awa.cron_jobs so individual cron schedules can be +-- paused without deleting them. Column shape mirrors awa.queue_meta +-- (paused_at / paused_by) so operators see one convention across the +-- two pause surfaces. +-- +-- `paused_at IS NULL` means the schedule is active. When non-null, the +-- evaluator skips it and atomic_enqueue refuses to fire (belt-and-braces). +-- `last_enqueued_at` is left untouched while paused, so the existing +-- `missed_fire_policy` (coalesce | catch_up) decides catch-up behaviour +-- on resume. + +ALTER TABLE awa.cron_jobs + ADD COLUMN IF NOT EXISTS paused_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS paused_by TEXT; + +INSERT INTO awa.schema_version (version, description) +VALUES (26, 'Add paused_at + paused_by to cron_jobs for per-schedule pause') +ON CONFLICT (version) DO NOTHING; diff --git a/awa-model/src/cron.rs b/awa-model/src/cron.rs index fe07be10..3bd2c42f 100644 --- a/awa-model/src/cron.rs +++ b/awa-model/src/cron.rs @@ -270,6 +270,19 @@ pub struct CronJobRow { pub last_enqueued_at: Option>, pub created_at: DateTime, pub updated_at: DateTime, + /// When the schedule was paused, or NULL if active. While paused, + /// the evaluator skips this row and `atomic_enqueue` refuses to + /// fire. `last_enqueued_at` is preserved across pause, so the + /// existing `missed_fire_policy` decides catch-up on resume. + pub paused_at: Option>, + pub paused_by: Option, +} + +impl CronJobRow { + /// Whether the schedule is currently paused. + pub fn is_paused(&self) -> bool { + self.paused_at.is_some() + } } /// Upsert a periodic job schedule into `awa.cron_jobs`. @@ -360,6 +373,58 @@ where Ok(result.rows_affected() > 0) } +/// Pause a cron schedule. The evaluator skips paused schedules and +/// `atomic_enqueue` refuses to fire while `paused_at IS NOT NULL`. +/// +/// `last_enqueued_at` is left untouched so the schedule's existing +/// `missed_fire_policy` decides catch-up behaviour on resume. +/// +/// Pausing an already-paused schedule refreshes `paused_at` and +/// `paused_by`. Returns `true` if a row was updated. +pub async fn pause_cron_job<'e, E>( + executor: E, + name: &str, + paused_by: Option<&str>, +) -> Result +where + E: PgExecutor<'e>, +{ + let result = sqlx::query( + r#" + UPDATE awa.cron_jobs + SET paused_at = now(), paused_by = $2, updated_at = now() + WHERE name = $1 + "#, + ) + .bind(name) + .bind(paused_by) + .execute(executor) + .await?; + Ok(result.rows_affected() > 0) +} + +/// Resume a paused cron schedule. Clears `paused_at` and `paused_by`. +/// +/// Resuming an already-active schedule is a no-op at the row level +/// (the UPDATE matches but the columns are already NULL). Returns +/// `true` if a row was updated. +pub async fn resume_cron_job<'e, E>(executor: E, name: &str) -> Result +where + E: PgExecutor<'e>, +{ + let result = sqlx::query( + r#" + UPDATE awa.cron_jobs + SET paused_at = NULL, paused_by = NULL, updated_at = now() + WHERE name = $1 + "#, + ) + .bind(name) + .execute(executor) + .await?; + Ok(result.rows_affected() > 0) +} + /// Atomically mark a cron job as enqueued AND insert the resulting job. /// /// Uses a single CTE so that both the UPDATE and INSERT happen in one @@ -385,6 +450,7 @@ where SET last_enqueued_at = $2, updated_at = now() WHERE name = $1 AND (last_enqueued_at IS NOT DISTINCT FROM $3) + AND paused_at IS NULL RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata ) SELECT inserted.* @@ -417,7 +483,8 @@ where /// /// Reads the cron job config from `awa.cron_jobs` and inserts a new job /// directly. Does NOT update `last_enqueued_at` so the normal schedule -/// is unaffected. +/// is unaffected. Works on paused schedules — pause stops *automatic* +/// fires; manual trigger is an explicit operator action. pub async fn trigger_cron_job<'e, E>(executor: E, name: &str) -> Result where E: PgExecutor<'e>, diff --git a/awa-model/src/migrations.rs b/awa-model/src/migrations.rs index 4a1f17a4..fcd7f7d7 100644 --- a/awa-model/src/migrations.rs +++ b/awa-model/src/migrations.rs @@ -4,7 +4,7 @@ use sqlx::PgPool; use tracing::info; /// Current schema version. -pub const CURRENT_VERSION: i32 = 25; +pub const CURRENT_VERSION: i32 = 26; /// All migrations in order. SQL lives in `awa-model/migrations/*.sql` /// for easy inspection by users who run their own migration tooling. @@ -113,6 +113,11 @@ const MIGRATIONS: &[(i32, &str, &[&str])] = &[ "Drop idx__leases__state_hb on all AWA substrates", &[V25_UP], ), + ( + 26, + "Add paused_at + paused_by to cron_jobs for per-schedule pause", + &[V26_UP], + ), ]; const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql"); @@ -139,6 +144,7 @@ const V22_UP: &str = include_str!("../migrations/v022_delete_compat_terminal_cou const V23_UP: &str = include_str!("../migrations/v023_install_queue_storage_substrate.sql"); const V24_UP: &str = include_str!("../migrations/v024_receipt_plane_fillfactor.sql"); const V25_UP: &str = include_str!("../migrations/v025_drop_leases_state_hb_index.sql"); +const V26_UP: &str = include_str!("../migrations/v026_cron_jobs_pause.sql"); /// Old version numbers from pre-0.4 releases that used V3/V4/V5 numbering. /// Also tolerates the unreleased inline-V6 branch numbering used during review. diff --git a/awa-ui/README.md b/awa-ui/README.md index 16860d27..1f827e5f 100644 --- a/awa-ui/README.md +++ b/awa-ui/README.md @@ -25,7 +25,9 @@ application. the **storage-transition card** when the cluster is mid-migration: prepared / mixed / finalized state, drain progress, and the gates blocking finalization. -- **Cron** — registered periodic schedules with next-run preview. +- **Cron** — registered periodic schedules with next-run preview, + per-schedule pause / resume / trigger controls, and a target-queue + paused indicator. - **DLQ** (`/dlq`) — Dead Letter Queue browser with kind / queue / tag filters, single and bulk retry, move, and purge actions. The DLQ tab is reachable from the Jobs tab via the row links on diff --git a/awa-ui/frontend/e2e/cron.spec.ts b/awa-ui/frontend/e2e/cron.spec.ts index 0b730a4d..ea92d9aa 100644 --- a/awa-ui/frontend/e2e/cron.spec.ts +++ b/awa-ui/frontend/e2e/cron.spec.ts @@ -48,6 +48,66 @@ test.describe("Cron page", () => { await expect(firstCronSummary(page, cronJobs[0].name)).toBeVisible(); await expect(page.getByRole("button", { name: "Trigger now" }).first()).toBeVisible(); + // Every row shows either a Pause or a Resume action. + const hasPauseOrResume = + (await page.getByRole("button", { name: /Pause|Resume/ }).count()) > 0; + expect(hasPauseOrResume).toBe(true); + }); + + test("pause and resume buttons round-trip a schedule", async ({ page }) => { + const cronJobs = await loadCronPage(page); + if (cronJobs.length === 0) { + test.skip(); + return; + } + + const target = cronJobs[0]; + const summary = firstCronSummary(page, target.name); + const row = summary.locator("xpath=ancestor::div[contains(@class, 'rounded-lg')][1]"); + + const pauseBtn = row.getByRole("button", { name: "Pause" }); + const resumeBtn = row.getByRole("button", { name: "Resume" }); + + // Start state may be either paused or active; normalise to active. + if ((await resumeBtn.count()) > 0) { + const [resumeRes] = await Promise.all([ + page.waitForResponse( + (r) => + r.ok() && + r.request().method() === "POST" && + new URL(r.url()).pathname === `/api/cron/${target.name}/resume`, + ), + resumeBtn.click(), + ]); + expect(resumeRes.ok()).toBeTruthy(); + await expect(pauseBtn).toBeVisible(); + } + + // Pause. + const [pauseRes] = await Promise.all([ + page.waitForResponse( + (r) => + r.ok() && + r.request().method() === "POST" && + new URL(r.url()).pathname === `/api/cron/${target.name}/pause`, + ), + pauseBtn.click(), + ]); + expect(pauseRes.ok()).toBeTruthy(); + await expect(row.getByText(/^paused$/)).toBeVisible(); + await expect(row.getByRole("button", { name: "Resume" })).toBeVisible(); + + // Resume to restore initial state. + await Promise.all([ + page.waitForResponse( + (r) => + r.ok() && + r.request().method() === "POST" && + new URL(r.url()).pathname === `/api/cron/${target.name}/resume`, + ), + row.getByRole("button", { name: "Resume" }).click(), + ]); + await expect(row.getByRole("button", { name: "Pause" })).toBeVisible(); }); test("clicking cron row toggles expand/collapse", async ({ page }) => { diff --git a/awa-ui/frontend/src/lib/api.ts b/awa-ui/frontend/src/lib/api.ts index f60554b3..900ef3df 100644 --- a/awa-ui/frontend/src/lib/api.ts +++ b/awa-ui/frontend/src/lib/api.ts @@ -177,6 +177,8 @@ export interface CronJobRow { next_fire_at: string | null; created_at: string; updated_at: string; + paused_at: string | null; + paused_by: string | null; } export type StateCounts = Record; @@ -416,6 +418,22 @@ export function triggerCronJob(name: string): Promise { }); } +export function pauseCronJob( + name: string, + pausedBy?: string, +): Promise<{ ok: boolean }> { + return apiFetch(`/cron/${encodeURIComponent(name)}/pause`, { + method: "POST", + body: JSON.stringify({ paused_by: pausedBy ?? null }), + }); +} + +export function resumeCronJob(name: string): Promise<{ ok: boolean }> { + return apiFetch(`/cron/${encodeURIComponent(name)}/resume`, { + method: "POST", + }); +} + // Stats export function fetchStats(): Promise { return apiFetch("/stats"); diff --git a/awa-ui/frontend/src/routes/cron.tsx b/awa-ui/frontend/src/routes/cron.tsx index c1313c63..f0823d6a 100644 --- a/awa-ui/frontend/src/routes/cron.tsx +++ b/awa-ui/frontend/src/routes/cron.tsx @@ -1,13 +1,19 @@ -import { useState } from "react"; +import { useMemo, useState } from "react"; import { useQuery, useMutation, useQueryClient, } from "@tanstack/react-query"; -import { fetchCronJobs, triggerCronJob } from "@/lib/api"; +import { + fetchCronJobs, + fetchQueues, + pauseCronJob, + resumeCronJob, + triggerCronJob, +} from "@/lib/api"; import { useReadOnly } from "@/hooks/use-read-only"; import { toast } from "@/components/ui/toast"; -import type { CronJobRow } from "@/lib/api"; +import type { CronJobRow, QueueOverview } from "@/lib/api"; import { Heading } from "@/components/ui/heading"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; @@ -25,9 +31,28 @@ export function CronPage() { const cronQuery = useQuery({ queryKey: ["cron"], queryFn: fetchCronJobs, - refetchInterval: poll.interval, staleTime: poll.staleTime, + refetchInterval: poll.interval, + staleTime: poll.staleTime, + }); + + const queuesQuery = useQuery({ + queryKey: ["queues"], + queryFn: fetchQueues, + refetchInterval: poll.interval, + staleTime: poll.staleTime, }); + const pausedQueues = useMemo(() => { + const set = new Set(); + for (const q of queuesQuery.data ?? []) { + if (q.paused) set.add(q.queue); + } + return set; + }, [queuesQuery.data]); + + const invalidateCron = () => + queryClient.invalidateQueries({ queryKey: ["cron"] }); + const triggerMutation = useMutation({ mutationFn: (name: string) => triggerCronJob(name), onSuccess: (_data, name) => { @@ -39,6 +64,28 @@ export function CronPage() { }, }); + const pauseMutation = useMutation({ + mutationFn: (name: string) => pauseCronJob(name), + onSuccess: (_data, name) => { + void invalidateCron(); + toast.success(`Paused "${name}"`); + }, + onError: () => { + toast.error("Failed to pause cron job"); + }, + }); + + const resumeMutation = useMutation({ + mutationFn: (name: string) => resumeCronJob(name), + onSuccess: (_data, name) => { + void invalidateCron(); + toast.success(`Resumed "${name}"`); + }, + onError: () => { + toast.error("Failed to resume cron job"); + }, + }); + const cronJobs = cronQuery.data ?? []; return ( @@ -61,6 +108,10 @@ export function CronPage() { cj.metadata != null && typeof cj.metadata === "object" && Object.keys(cj.metadata as Record).length > 0; + const isPaused = cj.paused_at != null; + const targetQueuePaused = pausedQueues.has(cj.queue); + const mutating = + pauseMutation.isPending || resumeMutation.isPending; return (
{cj.name} + {isPaused && ( + + paused + + )} {cj.timezone !== "UTC" && ( @@ -102,7 +158,14 @@ export function CronPage() { )} {cj.kind} - → {cj.queue} + + → {cj.queue} + + {targetQueuePaused && ( + + queue paused + + )} {cj.priority !== 2 && ( - {cj.next_fire_at && ( + {cj.next_fire_at && !isPaused && ( + {isPaused ? ( + + ) : ( + + )} +