From f231559d7cf62af7a67abad8e6b1068cd1345ad9 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 08:16:00 -0700 Subject: [PATCH 1/8] feat!: add PauseReasons bitmask type, paused_groups migration, and pause_reasons column Foundation for group-level pause/resume (plan 042, step 1): - Migration 010: creates `paused_groups` table and adds `pause_reasons` bitmask column to `tasks` with backfill for existing paused rows - PauseReasons newtype with PREEMPTION/MODULE/GLOBAL/GROUP bit constants and contains/with/without/is_empty/bits/from_bits operations - TaskRecord gains `pause_reasons` field; row mapping reads it with fallback to 0 for backward compatibility --- migrations/010_paused_groups.sql | 16 ++++++++++++++ src/lib.rs | 2 +- src/store/row_mapping.rs | 5 +++-- src/task/mod.rs | 36 ++++++++++++++++++++++++++++++++ src/task/tests.rs | 1 + 5 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 migrations/010_paused_groups.sql diff --git a/migrations/010_paused_groups.sql b/migrations/010_paused_groups.sql new file mode 100644 index 0000000..b18b207 --- /dev/null +++ b/migrations/010_paused_groups.sql @@ -0,0 +1,16 @@ +-- Group-level pause state. +-- New columns use epoch milliseconds (INTEGER) per the epoch-for-storage convention. +CREATE TABLE IF NOT EXISTS paused_groups ( + group_key TEXT NOT NULL PRIMARY KEY, + paused_at INTEGER NOT NULL, -- bound from Rust: Utc::now().timestamp_millis() + resume_at INTEGER -- optional auto-resume deadline (epoch milliseconds) +); + +-- Per-task pause attribution: bitmask of active pause reasons. +-- Bit values: PREEMPTION=1, MODULE=2, GLOBAL=4, GROUP=8. +-- A task is paused when pause_reasons != 0. +ALTER TABLE tasks ADD COLUMN pause_reasons INTEGER NOT NULL DEFAULT 0; + +-- Backfill: any tasks currently in 'paused' status were paused by preemption +-- (module/global pauses don't survive restarts as they use in-memory flags). 
+UPDATE tasks SET pause_reasons = 1 WHERE status = 'paused';
diff --git a/src/lib.rs b/src/lib.rs
index 49cd729..470f9de 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -812,7 +812,7 @@ pub use scheduler::{
 pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
 pub use task::{
     generate_dedup_key, BackoffStrategy, BatchOutcome, BatchSubmission, DependencyFailurePolicy,
-    DuplicateStrategy, HistoryStatus, IoBudget, ParentResolution, RecurringSchedule,
+    DuplicateStrategy, HistoryStatus, IoBudget, ParentResolution, PauseReasons, RecurringSchedule,
     RecurringScheduleInfo, RetryPolicy, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup,
     TaskRecord, TaskStatus, TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK,
     MAX_TAG_KEY_LEN, MAX_TAG_VALUE_LEN,
diff --git a/src/store/row_mapping.rs b/src/store/row_mapping.rs
index 8325914..bbcfdad 100644
--- a/src/store/row_mapping.rs
+++ b/src/store/row_mapping.rs
@@ -5,8 +5,8 @@ use sqlx::Row;
 use crate::priority::Priority;
 use crate::task::{
-    DependencyFailurePolicy, HistoryStatus, IoBudget, TaskHistoryRecord, TaskRecord, TaskStatus,
-    TtlFrom,
+    DependencyFailurePolicy, HistoryStatus, IoBudget, PauseReasons, TaskHistoryRecord, TaskRecord,
+    TaskStatus, TtlFrom,
 };

 /// Convert a `DateTime` to epoch milliseconds for SQLite INTEGER storage.
@@ -76,6 +76,7 @@ pub(crate) fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord {
         tags: std::collections::HashMap::new(),
         max_retries: row.get("max_retries"),
         memo: row.get("memo"),
+        pause_reasons: PauseReasons::from_bits(row.try_get::<i64, _>("pause_reasons").unwrap_or(0)),
     }
 }
diff --git a/src/task/mod.rs b/src/task/mod.rs
index 1599a3f..a03ee76 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -185,6 +185,40 @@ impl std::str::FromStr for HistoryStatus {
     }
 }

+/// Bitmask tracking why a task is paused. Multiple reasons can be active
+/// simultaneously (e.g., both module and group paused).
+///
+/// Stored as INTEGER in SQLite. A value of 0 means the task is not paused.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
+pub struct PauseReasons(i64);
+
+impl PauseReasons {
+    pub const NONE: Self = Self(0);
+    pub const PREEMPTION: Self = Self(1);
+    pub const MODULE: Self = Self(2);
+    pub const GLOBAL: Self = Self(4);
+    pub const GROUP: Self = Self(8);
+
+    pub fn contains(self, flag: Self) -> bool {
+        self.0 & flag.0 != 0
+    }
+    pub fn with(self, flag: Self) -> Self {
+        Self(self.0 | flag.0)
+    }
+    pub fn without(self, flag: Self) -> Self {
+        Self(self.0 & !flag.0)
+    }
+    pub fn is_empty(self) -> bool {
+        self.0 == 0
+    }
+    pub fn bits(self) -> i64 {
+        self.0
+    }
+    pub fn from_bits(bits: i64) -> Self {
+        Self(bits)
+    }
+}
+
 /// A task in the active queue (pending, running, or paused).
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct TaskRecord {
@@ -246,6 +280,8 @@ pub struct TaskRecord {
     /// Serialized memo from `execute()`, delivered to `finalize()`.
     /// `None` when no memo was produced (e.g. `Memo = ()`).
     pub memo: Option<Vec<u8>>,
+    /// Bitmask of active pause reasons. 0 when the task is not paused.
+    pub pause_reasons: PauseReasons,
 }

 impl TaskRecord {
diff --git a/src/task/tests.rs b/src/task/tests.rs
index 05c83d4..be5cb1a 100644
--- a/src/task/tests.rs
+++ b/src/task/tests.rs
@@ -255,6 +255,7 @@ fn event_header_includes_tags() {
         on_dependency_failure: super::submission::DependencyFailurePolicy::Cancel,
         max_retries: None,
         memo: None,
+        pause_reasons: super::PauseReasons::NONE,
     };
     record.tags.insert("env".into(), "prod".into());
     record.tags.insert("owner".into(), "alice".into());

From db8bdc49fecdd7c87212f2b8461ea6ad5ddf9598 Mon Sep 17 00:00:00 2001
From: DJ Majumdar
Date: Mon, 23 Mar 2026 08:41:00 -0700
Subject: [PATCH 2/8] feat!: retrofit all pause/resume paths to use
 pause_reasons bitmask

Plan 042 step 2: plumb PauseReasons through every existing pause and
resume code path so multiple pause sources can coexist without
stranded-task bugs.
Store changes (cancel_expire.rs): - pause() now accepts a reason parameter and ORs the bit into pause_reasons - pause_pending_by_type_prefix() ORs the MODULE bit; also touches already-paused tasks so the bit accumulates - resume_paused_by_type_prefix() two-step: fully resume sole-reason tasks, clear bit from multi-reason tasks - New: preemption_paused_tasks(), resume_preempted(), clear_pause_bit() Scheduler changes: - cancel_pause_emit (dispatch.rs) accepts reason; callers pass PREEMPTION, MODULE, or GLOBAL respectively - Auto-resume (run_loop.rs) scoped to PREEMPTION bit only via preemption_paused_tasks + resume_preempted - resume_all (control.rs) clears GLOBAL bit from DB Schema: - pause_reasons column added to 001_tasks.sql base schema - Migration 010 registered in migrate() with idempotent handling Tests: 5 new store tests covering bitmask accumulation, preemption filtering, clear_pause_bit, and module pause/resume with assertions on pause_reasons values. --- migrations/001_tasks.sql | 1 + src/scheduler/control.rs | 10 +- src/scheduler/dispatch.rs | 14 +-- src/scheduler/run_loop.rs | 9 +- src/store/lifecycle/cancel_expire.rs | 132 +++++++++++++++++++++++---- src/store/lifecycle/tests.rs | 112 ++++++++++++++++++++++- src/store/mod.rs | 28 ++++++ 7 files changed, 272 insertions(+), 34 deletions(-) diff --git a/migrations/001_tasks.sql b/migrations/001_tasks.sql index c6d66f2..45df5a6 100644 --- a/migrations/001_tasks.sql +++ b/migrations/001_tasks.sql @@ -35,6 +35,7 @@ CREATE TABLE IF NOT EXISTS tasks ( recurring_max_executions INTEGER, recurring_execution_count INTEGER NOT NULL DEFAULT 0, recurring_paused INTEGER NOT NULL DEFAULT 0, + pause_reasons INTEGER NOT NULL DEFAULT 0, UNIQUE(key) ); diff --git a/src/scheduler/control.rs b/src/scheduler/control.rs index 5f9d884..87cce68 100644 --- a/src/scheduler/control.rs +++ b/src/scheduler/control.rs @@ -42,10 +42,16 @@ impl Scheduler { /// Resume the scheduler after a [`pause_all`](Self::pause_all). 
     ///
     /// Clears the pause flag so the run loop will resume dispatching on
-    /// its next poll tick. Tasks that were paused in the store will be
-    /// picked up automatically.
+    /// its next poll tick. Also clears the GLOBAL bit from all paused tasks
+    /// in the store — tasks with no remaining pause reasons transition back
+    /// to pending.
     pub async fn resume_all(&self) {
         self.inner.paused.store(false, AtomicOrdering::Release);
+        let _ = self
+            .inner
+            .store
+            .clear_pause_bit(crate::task::PauseReasons::GLOBAL)
+            .await;
         self.inner.work_notify.notify_one();
         emit_event(&self.inner.event_tx, SchedulerEvent::Resumed);
         tracing::info!("scheduler resumed");
diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs
index cb6cf90..c0f8304 100644
--- a/src/scheduler/dispatch.rs
+++ b/src/scheduler/dispatch.rs
@@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken;
 use crate::priority::Priority;
 use crate::registry::IoTracker;
 use crate::store::TaskStore;
-use crate::task::TaskRecord;
+use crate::task::{PauseReasons, TaskRecord};

 use super::{emit_event, SchedulerEvent};

@@ -166,7 +166,7 @@ impl ActiveTaskMap {
                 "preempting task for higher-priority work"
             );
         }
-        cancel_pause_emit(&drained, store, event_tx).await;
+        cancel_pause_emit(&drained, store, event_tx, PauseReasons::PREEMPTION).await;
         drained.into_iter().map(|(id, _)| id).collect()
     }

@@ -221,7 +221,7 @@ impl ActiveTaskMap {
         event_tx: &tokio::sync::broadcast::Sender<SchedulerEvent>,
     ) -> usize {
         let drained = self.drain_where(|at| at.record.task_type.starts_with(prefix));
-        cancel_pause_emit(&drained, store, event_tx).await;
+        cancel_pause_emit(&drained, store, event_tx, PauseReasons::MODULE).await;
         drained.len()
     }

@@ -236,7 +236,7 @@ impl ActiveTaskMap {
         event_tx: &tokio::sync::broadcast::Sender<SchedulerEvent>,
     ) -> usize {
         let drained = self.drain_where(|_| true);
-        cancel_pause_emit(&drained, store, event_tx).await;
+        cancel_pause_emit(&drained, store, event_tx, PauseReasons::GLOBAL).await;
         drained.len()
     }

@@ -259,15 +259,17 @@ impl ActiveTaskMap {

 // ── Helpers ────────────────────────────────────────────────────────

-/// Cancel tokens, pause in store, and emit `Preempted` events for drained tasks.
+/// Cancel tokens, pause in store with the given reason, and emit `Preempted`
+/// events for drained tasks.
 async fn cancel_pause_emit(
     drained: &[(i64, ActiveTask)],
     store: &TaskStore,
     event_tx: &tokio::sync::broadcast::Sender<SchedulerEvent>,
+    reason: PauseReasons,
 ) {
     for (id, at) in drained {
         at.token.cancel();
-        let _ = store.pause(*id).await;
+        let _ = store.pause(*id, reason).await;
         emit_event(
             event_tx,
             SchedulerEvent::Preempted(at.record.event_header()),
diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs
index 938cc50..b2cbf59 100644
--- a/src/scheduler/run_loop.rs
+++ b/src/scheduler/run_loop.rs
@@ -442,10 +442,11 @@ impl Scheduler {
         // Run expiry sweep before dispatching.
         self.maybe_expire_tasks().await;

-        // Resume paused tasks only if no active preemptors exist.
-        // Skip the query entirely when no tasks have been paused.
+        // Resume preemption-paused tasks only if no active preemptors exist.
+        // Scoped to the PREEMPTION bit — tasks paused by module/global/group
+        // are not auto-resumed here.
if self.inner.has_paused_tasks.load(AtomicOrdering::Relaxed) { - if let Ok(paused) = self.inner.store.paused_tasks().await { + if let Ok(paused) = self.inner.store.preemption_paused_tasks().await { if paused.is_empty() { self.inner .has_paused_tasks @@ -457,7 +458,7 @@ impl Scheduler { .active .has_preemptors_for(task.priority, self.inner.preempt_priority) { - let _ = self.inner.store.resume(task.id).await; + let _ = self.inner.store.resume_preempted(task.id).await; } } } diff --git a/src/store/lifecycle/cancel_expire.rs b/src/store/lifecycle/cancel_expire.rs index 80b0e3e..291680f 100644 --- a/src/store/lifecycle/cancel_expire.rs +++ b/src/store/lifecycle/cancel_expire.rs @@ -6,31 +6,107 @@ use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; -use crate::task::{IoBudget, TaskRecord}; +use crate::task::{IoBudget, PauseReasons, TaskRecord}; use super::{compute_duration_ms, insert_history, HistoryStatus}; // ── Pause / Resume ────────────────────────────────────────────────── impl TaskStore { - /// Pause a running task (for preemption). Sets status to paused. - pub async fn pause(&self, id: i64) -> Result<(), StoreError> { - sqlx::query("UPDATE tasks SET status = 'paused', started_at = NULL WHERE id = ?") - .bind(id) - .execute(&self.pool) - .await?; + /// Pause a running task. ORs the reason bit into `pause_reasons`. + pub async fn pause(&self, id: i64, reason: PauseReasons) -> Result<(), StoreError> { + sqlx::query( + "UPDATE tasks SET status = 'paused', started_at = NULL, + pause_reasons = pause_reasons | ? + WHERE id = ?", + ) + .bind(reason.bits()) + .bind(id) + .execute(&self.pool) + .await?; Ok(()) } - /// Resume a paused task back to pending. + /// Resume a paused task back to pending. Clears all pause reasons. pub async fn resume(&self, id: i64) -> Result<(), StoreError> { - sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = ? 
AND status = 'paused'")
+        sqlx::query(
+            "UPDATE tasks SET status = 'pending', pause_reasons = 0
+             WHERE id = ? AND status = 'paused'",
+        )
+        .bind(id)
+        .execute(&self.pool)
+        .await?;
+        Ok(())
+    }
+
+    /// Return paused tasks that have the PREEMPTION bit set (for auto-resume).
+    pub async fn preemption_paused_tasks(&self) -> Result<Vec<TaskRecord>, StoreError> {
+        let rows = sqlx::query(
+            "SELECT * FROM tasks
+             WHERE status = 'paused' AND (pause_reasons & 1) != 0
+             ORDER BY priority ASC, id ASC",
+        )
+        .fetch_all(&self.pool)
+        .await?;
+        let mut records: Vec<TaskRecord> = rows.iter().map(row_to_task_record).collect();
+        self.populate_tags(&mut records).await?;
+        Ok(records)
+    }
+
+    /// Clear the PREEMPTION bit from a paused task. If no other pause reasons
+    /// remain, the task transitions back to pending.
+    pub async fn resume_preempted(&self, id: i64) -> Result<(), StoreError> {
+        // Fully resume if PREEMPTION is the only reason.
+        let result = sqlx::query(
+            "UPDATE tasks SET status = 'pending', pause_reasons = 0
+             WHERE id = ? AND status = 'paused' AND pause_reasons = 1",
+        )
+        .bind(id)
+        .execute(&self.pool)
+        .await?;
+
+        if result.rows_affected() == 0 {
+            // Clear PREEMPTION bit but stay paused (other reasons remain).
+            sqlx::query(
+                "UPDATE tasks SET pause_reasons = pause_reasons & ~1
+                 WHERE id = ? AND status = 'paused' AND (pause_reasons & 1) != 0",
+            )
             .bind(id)
             .execute(&self.pool)
             .await?;
+        }
         Ok(())
     }

+    /// Clear a specific pause-reason bit from all paused tasks. Tasks whose
+    /// `pause_reasons` becomes 0 transition back to pending.
+    /// Returns the count of tasks that fully resumed.
+    pub async fn clear_pause_bit(&self, reason: PauseReasons) -> Result<u64, StoreError> {
+        let bit = reason.bits();
+
+        // Fully resume tasks where this is the sole reason.
+        let fully_resumed = sqlx::query(
+            "UPDATE tasks SET status = 'pending', pause_reasons = 0
+             WHERE status = 'paused' AND pause_reasons = ?",
+        )
+        .bind(bit)
+        .execute(&self.pool)
+        .await?
+        .rows_affected();
+
+        // Clear the bit from multi-reason tasks (stays paused).
+        sqlx::query(
+            "UPDATE tasks SET pause_reasons = pause_reasons & ~?
+             WHERE status = 'paused' AND (pause_reasons & ?) != 0",
+        )
+        .bind(bit)
+        .bind(bit)
+        .execute(&self.pool)
+        .await?;
+
+        Ok(fully_resumed)
+    }
+
     // ── Cancellation (user-initiated) ──────────────────────────────

     /// Move a task to history as cancelled and delete it from the active queue.
@@ -127,14 +203,17 @@ impl TaskStore {

     // ── Bulk pause / resume by type prefix ─────────────────────────

-    /// Pause all pending tasks whose `task_type` starts with `prefix`.
+    /// Pause all pending tasks whose `task_type` starts with `prefix` (module pause).
     ///
-    /// Updates their status from `pending` to `paused` in a single SQL statement.
-    /// Returns the number of tasks paused.
+    /// ORs the MODULE bit. Tasks already paused for other reasons get the
+    /// additional MODULE bit.
+    /// Returns the number of matched tasks (newly paused plus already-paused
+    /// tasks that gained the MODULE bit).
     pub async fn pause_pending_by_type_prefix(&self, prefix: &str) -> Result<u64, StoreError> {
         let pattern = format!("{prefix}%");
         let result = sqlx::query(
-            "UPDATE tasks SET status = 'paused' WHERE task_type LIKE ? AND status = 'pending'",
+            "UPDATE tasks SET status = 'paused',
+                 pause_reasons = pause_reasons | 2
+             WHERE task_type LIKE ? AND status IN ('pending', 'paused')",
         )
         .bind(&pattern)
         .execute(&self.pool)
         .await?;
         Ok(result.rows_affected())
     }

-    /// Resume all paused tasks whose `task_type` starts with `prefix`.
-    ///
-    /// Updates their status from `paused` to `pending` in a single SQL statement.
-    /// Returns the number of tasks resumed.
+    /// Resume module-paused tasks by type prefix. Clears the MODULE bit.
+    /// Tasks fully resume (become pending) only if no other pause reasons remain.
+    /// Returns the count of tasks that fully resumed.
     pub async fn resume_paused_by_type_prefix(&self, prefix: &str) -> Result<u64, StoreError> {
         let pattern = format!("{prefix}%");
-        let result = sqlx::query(
-            "UPDATE tasks SET status = 'pending' WHERE task_type LIKE ? AND status = 'paused'",
+
+        // Fully resume tasks where MODULE is the only reason.
+        let fully_resumed = sqlx::query(
+            "UPDATE tasks SET status = 'pending', pause_reasons = 0
+             WHERE task_type LIKE ? AND status = 'paused' AND pause_reasons = 2",
+        )
+        .bind(&pattern)
+        .execute(&self.pool)
+        .await?
+        .rows_affected();
+
+        // Clear MODULE bit from tasks with other active reasons (stays paused).
+        sqlx::query(
+            "UPDATE tasks SET pause_reasons = pause_reasons & ~2
+             WHERE task_type LIKE ? AND status = 'paused' AND (pause_reasons & 2) != 0",
         )
         .bind(&pattern)
         .execute(&self.pool)
         .await?;
-        Ok(result.rows_affected())
+
+        Ok(fully_resumed)
     }

     // ── Expiry (time-driven) ───────────────────────────────────────
diff --git a/src/store/lifecycle/tests.rs b/src/store/lifecycle/tests.rs
index 52458e4..ca6771c 100644
--- a/src/store/lifecycle/tests.rs
+++ b/src/store/lifecycle/tests.rs
@@ -1,5 +1,5 @@
 use crate::priority::Priority;
-use crate::task::{HistoryStatus, IoBudget, TaskStatus, TaskSubmission};
+use crate::task::{HistoryStatus, IoBudget, PauseReasons, TaskStatus, TaskSubmission};

 use super::super::TaskStore;
 use super::FailBackoff;
@@ -138,15 +138,123 @@ async fn pause_and_resume() {
         .unwrap();

     let task = store.pop_next().await.unwrap().unwrap();
-    store.pause(task.id).await.unwrap();
+    store.pause(task.id, PauseReasons::PREEMPTION).await.unwrap();

     let paused = store.paused_tasks().await.unwrap();
     assert_eq!(paused.len(), 1);
     assert_eq!(paused[0].status, TaskStatus::Paused);
+    assert_eq!(paused[0].pause_reasons, PauseReasons::PREEMPTION);

     store.resume(task.id).await.unwrap();
     let pending = store.pending_tasks(10).await.unwrap();
     assert_eq!(pending.len(), 1);
     assert_eq!(pending[0].status, TaskStatus::Pending);
+    assert_eq!(pending[0].pause_reasons, PauseReasons::NONE);
+} + +#[tokio::test] +async fn pause_reasons_accumulate_across_sources() { + let store = test_store().await; + let sub = TaskSubmission::new("mod1.work").key("multi-pause"); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // Preemption pause. + store.pause(task.id, PauseReasons::PREEMPTION).await.unwrap(); + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.pause_reasons, PauseReasons::PREEMPTION); + + // Module pause ORs in the MODULE bit. + store.pause_pending_by_type_prefix("mod1.").await.unwrap(); + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert!(t.pause_reasons.contains(PauseReasons::PREEMPTION)); + assert!(t.pause_reasons.contains(PauseReasons::MODULE)); + + // Clearing preemption leaves MODULE bit. + store.resume_preempted(task.id).await.unwrap(); + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert!(!t.pause_reasons.contains(PauseReasons::PREEMPTION)); + assert!(t.pause_reasons.contains(PauseReasons::MODULE)); + + // Clearing module fully resumes. + store.resume_paused_by_type_prefix("mod1.").await.unwrap(); + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Pending); + assert_eq!(t.pause_reasons, PauseReasons::NONE); +} + +#[tokio::test] +async fn preemption_paused_tasks_filters_by_bit() { + let store = test_store().await; + + // Task A: preemption-paused. + let sub_a = TaskSubmission::new("test").key("a"); + store.submit(&sub_a).await.unwrap(); + let a = store.pop_next().await.unwrap().unwrap(); + store.pause(a.id, PauseReasons::PREEMPTION).await.unwrap(); + + // Task B: module-paused only. 
+ let sub_b = TaskSubmission::new("test").key("b"); + store.submit(&sub_b).await.unwrap(); + store.pause_pending_by_type_prefix("test").await.unwrap(); + + let preempted = store.preemption_paused_tasks().await.unwrap(); + // A has PREEMPTION bit (and MODULE bit added by prefix pause). + // B has only MODULE bit. + assert_eq!(preempted.len(), 1); + assert_eq!(preempted[0].id, a.id); +} + +#[tokio::test] +async fn clear_pause_bit_resumes_sole_reason_tasks() { + let store = test_store().await; + + // Task with only GLOBAL reason. + let sub_a = TaskSubmission::new("test").key("global-only"); + store.submit(&sub_a).await.unwrap(); + let a = store.pop_next().await.unwrap().unwrap(); + store.pause(a.id, PauseReasons::GLOBAL).await.unwrap(); + + // Task with GLOBAL + PREEMPTION reasons. + let sub_b = TaskSubmission::new("test").key("multi"); + store.submit(&sub_b).await.unwrap(); + let b = store.pop_next().await.unwrap().unwrap(); + store.pause(b.id, PauseReasons::PREEMPTION).await.unwrap(); + store.pause(b.id, PauseReasons::GLOBAL).await.unwrap(); + + let fully_resumed = store.clear_pause_bit(PauseReasons::GLOBAL).await.unwrap(); + assert_eq!(fully_resumed, 1); // Only task A fully resumed. + + let ta = store.task_by_id(a.id).await.unwrap().unwrap(); + assert_eq!(ta.status, TaskStatus::Pending); + assert_eq!(ta.pause_reasons, PauseReasons::NONE); + + let tb = store.task_by_id(b.id).await.unwrap().unwrap(); + assert_eq!(tb.status, TaskStatus::Paused); + assert!(tb.pause_reasons.contains(PauseReasons::PREEMPTION)); + assert!(!tb.pause_reasons.contains(PauseReasons::GLOBAL)); +} + +#[tokio::test] +async fn module_pause_resume_with_bitmask() { + let store = test_store().await; + + let sub = TaskSubmission::new("mod1.upload").key("m1"); + store.submit(&sub).await.unwrap(); + + // Module pause sets MODULE bit. 
+ let paused_count = store.pause_pending_by_type_prefix("mod1.").await.unwrap(); + assert_eq!(paused_count, 1); + let t = store.task_by_key(&sub.effective_key()).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert_eq!(t.pause_reasons, PauseReasons::MODULE); + + // Module resume clears MODULE bit → fully resumes. + let resumed_count = store.resume_paused_by_type_prefix("mod1.").await.unwrap(); + assert_eq!(resumed_count, 1); + let t = store.task_by_key(&sub.effective_key()).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Pending); + assert_eq!(t.pause_reasons, PauseReasons::NONE); } #[tokio::test] diff --git a/src/store/mod.rs b/src/store/mod.rs index 5ce55d9..300da62 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -281,6 +281,34 @@ impl TaskStore { sqlx::raw_sql(include_str!("../../migrations/004_task_tags.sql")) .execute(&self.pool) .await?; + + // 010: paused_groups table + pause_reasons column on tasks. + // The CREATE TABLE is idempotent, but ALTER TABLE ADD COLUMN will + // fail if the column already exists (fresh databases include it in + // 001_tasks.sql). Run each statement individually and tolerate the + // "duplicate column" error from the ALTER. + sqlx::raw_sql( + "CREATE TABLE IF NOT EXISTS paused_groups ( + group_key TEXT NOT NULL PRIMARY KEY, + paused_at INTEGER NOT NULL, + resume_at INTEGER + );", + ) + .execute(&self.pool) + .await?; + + // ALTER TABLE ADD COLUMN is not idempotent — tolerate failure. + let _ = sqlx::raw_sql( + "ALTER TABLE tasks ADD COLUMN pause_reasons INTEGER NOT NULL DEFAULT 0;", + ) + .execute(&self.pool) + .await; + + // Backfill: existing paused tasks get PREEMPTION bit. 
+        sqlx::raw_sql("UPDATE tasks SET pause_reasons = 1 WHERE status = 'paused' AND pause_reasons = 0;")
+            .execute(&self.pool)
+            .await?;
+
         Ok(())
     }

From e3f0f91c83f60ee4c326a09e0823a411111fa0af Mon Sep 17 00:00:00 2001
From: DJ Majumdar
Date: Mon, 23 Mar 2026 19:16:00 -0700
Subject: [PATCH 3/8] feat: add store-level group pause state, bulk
 pause/resume, and query helpers

Group pause state management (pause_group_state, resume_group_state,
paused_groups, is_group_paused, groups_due_for_resume), bulk task
pause/resume by group key with bitmask integration, and per-group
pending/paused count queries. Pure store additions exercised by tests.
---
 src/store/lifecycle/cancel_expire.rs | 123 +++++++++++++++
 src/store/lifecycle/tests.rs         | 220 ++++++++++++++++++++++++++-
 src/store/mod.rs                     |  17 ++-
 src/store/query/active.rs            |  20 +++
 4 files changed, 368 insertions(+), 12 deletions(-)

diff --git a/src/store/lifecycle/cancel_expire.rs b/src/store/lifecycle/cancel_expire.rs
index 291680f..b8ffa6f 100644
--- a/src/store/lifecycle/cancel_expire.rs
+++ b/src/store/lifecycle/cancel_expire.rs
@@ -107,6 +107,129 @@ impl TaskStore {
         Ok(fully_resumed)
     }

+    // ── Group Pause State ──────────────────────────────────────────
+
+    /// Record a group as paused. Returns `true` if the group was newly paused
+    /// (not already paused — idempotent).
+    pub async fn pause_group_state(
+        &self,
+        group_key: &str,
+        resume_at: Option<i64>, // epoch milliseconds
+    ) -> Result<bool, StoreError> {
+        let now_ms = chrono::Utc::now().timestamp_millis();
+        let result = sqlx::query(
+            "INSERT OR IGNORE INTO paused_groups (group_key, paused_at, resume_at) VALUES (?, ?, ?)",
+        )
+        .bind(group_key)
+        .bind(now_ms)
+        .bind(resume_at)
+        .execute(&self.pool)
+        .await?;
+        Ok(result.rows_affected() > 0)
+    }
+
+    /// Remove a group from the paused state. Returns `true` if the group was
+    /// actually paused (idempotent).
+    pub async fn resume_group_state(&self, group_key: &str) -> Result<bool, StoreError> {
+        let result = sqlx::query("DELETE FROM paused_groups WHERE group_key = ?")
+            .bind(group_key)
+            .execute(&self.pool)
+            .await?;
+        Ok(result.rows_affected() > 0)
+    }
+
+    /// List all currently paused groups with their pause timestamps.
+    pub async fn paused_groups(&self) -> Result<Vec<(String, i64)>, StoreError> {
+        let rows: Vec<(String, i64)> =
+            sqlx::query_as("SELECT group_key, paused_at FROM paused_groups ORDER BY paused_at")
+                .fetch_all(&self.pool)
+                .await?;
+        Ok(rows)
+    }
+
+    /// Check if a specific group is currently paused.
+    pub async fn is_group_paused(&self, group_key: &str) -> Result<bool, StoreError> {
+        let row: Option<(i64,)> = sqlx::query_as("SELECT 1 FROM paused_groups WHERE group_key = ?")
+            .bind(group_key)
+            .fetch_optional(&self.pool)
+            .await?;
+        Ok(row.is_some())
+    }
+
+    /// Find groups whose `resume_at` has elapsed (for auto-resume).
+    pub async fn groups_due_for_resume(&self) -> Result<Vec<String>, StoreError> {
+        let now_ms = chrono::Utc::now().timestamp_millis();
+        let rows: Vec<(String,)> = sqlx::query_as(
+            "SELECT group_key FROM paused_groups
+             WHERE resume_at IS NOT NULL AND resume_at <= ?",
+        )
+        .bind(now_ms)
+        .fetch_all(&self.pool)
+        .await?;
+        Ok(rows.into_iter().map(|(k,)| k).collect())
+    }
+
+    // ── Bulk pause / resume by group ──────────────────────────────
+
+    /// Pause tasks in a group. ORs the GROUP bit into all pending and
+    /// already-paused tasks in the group:
+    /// - Pending tasks → paused with GROUP bit.
+    /// - Already-paused tasks → GROUP bit added (prevents premature resume
+    ///   by other mechanisms clearing their own bit).
+    ///
+    /// Returns the count of newly paused tasks (status changed from pending).
+    pub async fn pause_tasks_in_group(&self, group_key: &str) -> Result<u64, StoreError> {
+        // Add GROUP bit to already-paused tasks (no status change, just adds the bit).
+        sqlx::query(
+            "UPDATE tasks SET pause_reasons = pause_reasons | 8
+             WHERE group_key = ? AND status = 'paused' AND (pause_reasons & 8) = 0",
+        )
+        .bind(group_key)
+        .execute(&self.pool)
+        .await?;
+
+        // Pause pending tasks with GROUP bit.
+        let result = sqlx::query(
+            "UPDATE tasks SET status = 'paused',
+                 pause_reasons = pause_reasons | 8
+             WHERE group_key = ? AND status = 'pending'",
+        )
+        .bind(group_key)
+        .execute(&self.pool)
+        .await?;
+        Ok(result.rows_affected())
+    }
+
+    /// Resume group-paused tasks. Two-step process:
+    ///
+    /// 1. **Fully resume** tasks where GROUP is the sole reason (pause_reasons = 8).
+    /// 2. **Clear GROUP bit** from multi-reason tasks (they stay paused under
+    ///    their remaining reasons).
+    ///
+    /// Returns the count of tasks that fully resumed (became pending).
+    pub async fn resume_paused_by_group(&self, group_key: &str) -> Result<u64, StoreError> {
+        // 1. Fully resume tasks where GROUP is the sole reason.
+        let fully_resumed = sqlx::query(
+            "UPDATE tasks SET status = 'pending', pause_reasons = 0
+             WHERE group_key = ? AND status = 'paused' AND pause_reasons = 8",
+        )
+        .bind(group_key)
+        .execute(&self.pool)
+        .await?
+        .rows_affected();
+
+        // 2. Clear GROUP bit from multi-reason tasks (stays paused).
+        sqlx::query(
+            "UPDATE tasks SET pause_reasons = pause_reasons & ~8
+             WHERE group_key = ? AND status = 'paused' AND (pause_reasons & 8) != 0",
+        )
+        .bind(group_key)
+        .execute(&self.pool)
+        .await?;
+
+        Ok(fully_resumed)
+    }
+
     // ── Cancellation (user-initiated) ──────────────────────────────

     /// Move a task to history as cancelled and delete it from the active queue.
diff --git a/src/store/lifecycle/tests.rs b/src/store/lifecycle/tests.rs index ca6771c..e85b26d 100644 --- a/src/store/lifecycle/tests.rs +++ b/src/store/lifecycle/tests.rs @@ -1,3 +1,7 @@ +use std::time::Duration; + +use chrono::Utc; + use crate::priority::Priority; use crate::task::{HistoryStatus, IoBudget, PauseReasons, TaskStatus, TaskSubmission}; @@ -138,7 +142,10 @@ async fn pause_and_resume() { .unwrap(); let task = store.pop_next().await.unwrap().unwrap(); - store.pause(task.id, PauseReasons::PREEMPTION).await.unwrap(); + store + .pause(task.id, PauseReasons::PREEMPTION) + .await + .unwrap(); let paused = store.paused_tasks().await.unwrap(); assert_eq!(paused.len(), 1); assert_eq!(paused[0].status, TaskStatus::Paused); @@ -159,7 +166,10 @@ async fn pause_reasons_accumulate_across_sources() { let task = store.pop_next().await.unwrap().unwrap(); // Preemption pause. - store.pause(task.id, PauseReasons::PREEMPTION).await.unwrap(); + store + .pause(task.id, PauseReasons::PREEMPTION) + .await + .unwrap(); let t = store.task_by_id(task.id).await.unwrap().unwrap(); assert_eq!(t.pause_reasons, PauseReasons::PREEMPTION); @@ -245,14 +255,22 @@ async fn module_pause_resume_with_bitmask() { // Module pause sets MODULE bit. let paused_count = store.pause_pending_by_type_prefix("mod1.").await.unwrap(); assert_eq!(paused_count, 1); - let t = store.task_by_key(&sub.effective_key()).await.unwrap().unwrap(); + let t = store + .task_by_key(&sub.effective_key()) + .await + .unwrap() + .unwrap(); assert_eq!(t.status, TaskStatus::Paused); assert_eq!(t.pause_reasons, PauseReasons::MODULE); // Module resume clears MODULE bit → fully resumes. 
let resumed_count = store.resume_paused_by_type_prefix("mod1.").await.unwrap(); assert_eq!(resumed_count, 1); - let t = store.task_by_key(&sub.effective_key()).await.unwrap().unwrap(); + let t = store + .task_by_key(&sub.effective_key()) + .await + .unwrap() + .unwrap(); assert_eq!(t.status, TaskStatus::Pending); assert_eq!(t.pause_reasons, PauseReasons::NONE); } @@ -968,3 +986,197 @@ async fn dead_letter_tasks_query_returns_only_dead_lettered() { assert_eq!(failed.len(), 1); assert_eq!(failed[0].status, HistoryStatus::Failed); } + +// ── Group pause state management (Step 3a) ────────────────────── + +#[tokio::test] +async fn pause_group_state_is_idempotent() { + let store = test_store().await; + assert!(store.pause_group_state("g1", None).await.unwrap()); + assert!(!store.pause_group_state("g1", None).await.unwrap()); // no-op +} + +#[tokio::test] +async fn resume_group_state_is_idempotent() { + let store = test_store().await; + store.pause_group_state("g1", None).await.unwrap(); + assert!(store.resume_group_state("g1").await.unwrap()); + assert!(!store.resume_group_state("g1").await.unwrap()); // no-op +} + +#[tokio::test] +async fn paused_groups_lists_all() { + let store = test_store().await; + store.pause_group_state("g2", None).await.unwrap(); + store.pause_group_state("g1", None).await.unwrap(); + + let groups = store.paused_groups().await.unwrap(); + assert_eq!(groups.len(), 2); + // Ordered by paused_at. 
+ assert_eq!(groups[0].0, "g2"); + assert_eq!(groups[1].0, "g1"); +} + +#[tokio::test] +async fn is_group_paused_reflects_state() { + let store = test_store().await; + assert!(!store.is_group_paused("g1").await.unwrap()); + store.pause_group_state("g1", None).await.unwrap(); + assert!(store.is_group_paused("g1").await.unwrap()); + store.resume_group_state("g1").await.unwrap(); + assert!(!store.is_group_paused("g1").await.unwrap()); +} + +#[tokio::test] +async fn groups_due_for_resume_finds_elapsed() { + let store = test_store().await; + let past = Utc::now() - chrono::Duration::seconds(10); + store + .pause_group_state("g1", Some(past.timestamp_millis())) + .await + .unwrap(); + // Group with future resume_at should not appear. + let future = Utc::now() + chrono::Duration::seconds(3600); + store + .pause_group_state("g2", Some(future.timestamp_millis())) + .await + .unwrap(); + // Group with no resume_at should not appear. + store.pause_group_state("g3", None).await.unwrap(); + + let due = store.groups_due_for_resume().await.unwrap(); + assert_eq!(due, vec!["g1"]); +} + +// ── Bulk pause / resume by group (Step 3b) ────────────────────── + +#[tokio::test] +async fn pause_tasks_in_group_sets_status_and_bits() { + let store = test_store().await; + let sub1 = TaskSubmission::new("test").key("t1").group("g1"); + let sub2 = TaskSubmission::new("test").key("t2").group("g2"); + let key1 = sub1.effective_key(); + let key2 = sub2.effective_key(); + store.submit(&sub1).await.unwrap(); + store.submit(&sub2).await.unwrap(); + + let count = store.pause_tasks_in_group("g1").await.unwrap(); + assert_eq!(count, 1); + + let t1 = store.task_by_key(&key1).await.unwrap().unwrap(); + assert_eq!(t1.status, TaskStatus::Paused); + assert!(t1.pause_reasons.contains(PauseReasons::GROUP)); + + let t2 = store.task_by_key(&key2).await.unwrap().unwrap(); + assert_eq!(t2.status, TaskStatus::Pending); + assert!(t2.pause_reasons.is_empty()); +} + +#[tokio::test] +async fn 
pause_tasks_in_group_adds_bit_to_already_paused() { + let store = test_store().await; + let sub = TaskSubmission::new("test").key("t1").group("g1"); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // Preemption pause first. + store + .pause(task.id, PauseReasons::PREEMPTION) + .await + .unwrap(); + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.pause_reasons, PauseReasons::PREEMPTION); + + // Group pause should OR in the GROUP bit. + store.pause_tasks_in_group("g1").await.unwrap(); + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert!(t.pause_reasons.contains(PauseReasons::PREEMPTION)); + assert!(t.pause_reasons.contains(PauseReasons::GROUP)); +} + +#[tokio::test] +async fn resume_paused_by_group_fully_resumes_sole_reason() { + let store = test_store().await; + let sub = TaskSubmission::new("test").key("t1").group("g1"); + store.submit(&sub).await.unwrap(); + + store.pause_tasks_in_group("g1").await.unwrap(); + let fully_resumed = store.resume_paused_by_group("g1").await.unwrap(); + assert_eq!(fully_resumed, 1); + + let t = store.task_by_id(1).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Pending); + assert_eq!(t.pause_reasons, PauseReasons::NONE); +} + +#[tokio::test] +async fn resume_paused_by_group_clears_bit_but_stays_paused_with_other_reasons() { + let store = test_store().await; + let sub = TaskSubmission::new("test").key("t1").group("g1"); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // Preemption pause + group pause. + store + .pause(task.id, PauseReasons::PREEMPTION) + .await + .unwrap(); + store.pause_tasks_in_group("g1").await.unwrap(); + + let fully_resumed = store.resume_paused_by_group("g1").await.unwrap(); + assert_eq!(fully_resumed, 0); // Still held by PREEMPTION. 
+ + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert!(t.pause_reasons.contains(PauseReasons::PREEMPTION)); + assert!(!t.pause_reasons.contains(PauseReasons::GROUP)); +} + +// ── TTL behavior with group pause (Step 3c) ───────────────────── + +#[tokio::test] +async fn expire_tasks_expires_group_paused() { + let store = test_store().await; + store + .submit( + &TaskSubmission::new("test") + .key("t1") + .group("g1") + .ttl(Duration::from_millis(1)), + ) + .await + .unwrap(); + + store.pause_group_state("g1", None).await.unwrap(); + store.pause_tasks_in_group("g1").await.unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + + // Sweep SHOULD expire the group-paused task — TTL is a hard deadline. + let expired = store.expire_tasks().await.unwrap(); + assert_eq!(expired.len(), 1); +} + +// ── Query helpers (Step 3d) ───────────────────────────────────── + +#[tokio::test] +async fn pending_and_paused_count_for_group() { + let store = test_store().await; + let sub1 = TaskSubmission::new("test").key("t1").group("g1"); + let sub2 = TaskSubmission::new("test").key("t2").group("g1"); + let sub3 = TaskSubmission::new("test").key("t3").group("g2"); + store.submit(&sub1).await.unwrap(); + store.submit(&sub2).await.unwrap(); + store.submit(&sub3).await.unwrap(); + + assert_eq!(store.pending_count_for_group("g1").await.unwrap(), 2); + assert_eq!(store.paused_count_for_group("g1").await.unwrap(), 0); + + store.pause_tasks_in_group("g1").await.unwrap(); + + assert_eq!(store.pending_count_for_group("g1").await.unwrap(), 0); + assert_eq!(store.paused_count_for_group("g1").await.unwrap(), 2); + // g2 unaffected. 
+ assert_eq!(store.pending_count_for_group("g2").await.unwrap(), 1); + assert_eq!(store.paused_count_for_group("g2").await.unwrap(), 0); +} diff --git a/src/store/mod.rs b/src/store/mod.rs index 300da62..d16738b 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -298,16 +298,17 @@ impl TaskStore { .await?; // ALTER TABLE ADD COLUMN is not idempotent — tolerate failure. - let _ = sqlx::raw_sql( - "ALTER TABLE tasks ADD COLUMN pause_reasons INTEGER NOT NULL DEFAULT 0;", - ) - .execute(&self.pool) - .await; + let _ = + sqlx::raw_sql("ALTER TABLE tasks ADD COLUMN pause_reasons INTEGER NOT NULL DEFAULT 0;") + .execute(&self.pool) + .await; // Backfill: existing paused tasks get PREEMPTION bit. - sqlx::raw_sql("UPDATE tasks SET pause_reasons = 1 WHERE status = 'paused' AND pause_reasons = 0;") - .execute(&self.pool) - .await?; + sqlx::raw_sql( + "UPDATE tasks SET pause_reasons = 1 WHERE status = 'paused' AND pause_reasons = 0;", + ) + .execute(&self.pool) + .await?; Ok(()) } diff --git a/src/store/query/active.rs b/src/store/query/active.rs index 1bcff47..d61d262 100644 --- a/src/store/query/active.rs +++ b/src/store/query/active.rs @@ -232,6 +232,26 @@ impl TaskStore { Ok(count.0) } + /// Count of pending tasks in a group (for event emission). + pub async fn pending_count_for_group(&self, group_key: &str) -> Result { + let (count,): (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE group_key = ? AND status = 'pending'") + .bind(group_key) + .fetch_one(&self.pool) + .await?; + Ok(count) + } + + /// Count of paused tasks in a group. + pub async fn paused_count_for_group(&self, group_key: &str) -> Result { + let (count,): (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE group_key = ? AND status = 'paused'") + .bind(group_key) + .fetch_one(&self.pool) + .await?; + Ok(count) + } + /// Return the dependency edges for a given task (what it depends on). 
pub async fn task_dependencies(&self, task_id: i64) -> Result, StoreError> { let rows: Vec<(i64,)> = From 16e2a5c167a836d740b90fda96ab395b69fa6116 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 19:35:00 -0700 Subject: [PATCH 4/8] feat: add scheduler infrastructure for group pause/resume Wire SchedulerInner with in-memory paused_groups set, builder startup loading from DB, GroupPaused/GroupResumed events, PausedGroupInfo in snapshots, and ActiveTaskMap::pause_group for cancelling running tasks. --- src/lib.rs | 5 +++-- src/scheduler/builder.rs | 24 +++++++++++++++++++++--- src/scheduler/dispatch.rs | 29 ++++++++++++++++++++++++----- src/scheduler/event.rs | 27 ++++++++++++++++++++++++++- src/scheduler/mod.rs | 23 +++++++++++++++++++++-- src/scheduler/queries.rs | 17 +++++++++++++++++ src/store/query/active.rs | 18 ++++++++++++++++++ 7 files changed, 130 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 470f9de..300607b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -806,8 +806,9 @@ pub use resource::network_pressure::NetworkPressure; pub use resource::sampler::SamplerConfig; pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot}; pub use scheduler::{ - EstimatedProgress, GroupLimits, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig, - SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, TaskProgress, + EstimatedProgress, GroupLimits, PausedGroupInfo, ProgressReporter, Scheduler, SchedulerBuilder, + SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, + TaskProgress, }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ diff --git a/src/scheduler/builder.rs b/src/scheduler/builder.rs index 28dec4c..481664e 100644 --- a/src/scheduler/builder.rs +++ b/src/scheduler/builder.rs @@ -436,6 +436,23 @@ impl SchedulerBuilder { module_state, ); + // Load persisted group pause state (survives restarts). 
+ let paused_groups_rows = scheduler.inner.store.paused_groups().await?; + let paused_groups_set: std::collections::HashSet = + paused_groups_rows.iter().map(|(k, _)| k.clone()).collect(); + let has_paused_groups = !paused_groups_set.is_empty(); + *scheduler.inner.paused_groups.write().unwrap() = paused_groups_set; + + // Store builder-computed flags for maybe_restore_fast_dispatch(). + scheduler + .inner + .has_pressure_sources + .store(has_pressure, std::sync::atomic::Ordering::Relaxed); + scheduler.inner.has_resource_monitoring.store( + self.enable_resource_monitoring, + std::sync::atomic::Ordering::Relaxed, + ); + // Compute fast-dispatch eligibility before consuming builder fields. let has_groups = self.default_group_concurrency > 0 || !self.group_concurrency_overrides.is_empty(); @@ -482,9 +499,10 @@ impl SchedulerBuilder { } // Enable fast dispatch (single pop_next instead of peek + gate + claim) - // when no groups, no resource monitoring, no pressure sources, and no - // module caps are configured. - if !has_groups && !has_monitoring && !has_pressure && !has_module_caps { + // when no groups, no resource monitoring, no pressure sources, no + // module caps, and no paused groups are present. + if !has_groups && !has_monitoring && !has_pressure && !has_module_caps && !has_paused_groups + { scheduler .inner .fast_dispatch diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index c0f8304..ce5af49 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -240,6 +240,19 @@ impl ActiveTaskMap { drained.len() } + /// Pause active tasks whose `group_key` matches: cancel their tokens and + /// move them to paused state in the store. Returns count paused. 
+ pub async fn pause_group( + &self, + group_key: &str, + store: &TaskStore, + event_tx: &tokio::sync::broadcast::Sender, + ) -> usize { + let drained = self.drain_where(|at| at.record.group_key.as_deref() == Some(group_key)); + cancel_pause_emit(&drained, store, event_tx, PauseReasons::GROUP).await; + drained.len() + } + /// Drain tasks matching `predicate` from the active map. /// /// Collects matching tasks under the sync lock and removes them @@ -261,7 +274,10 @@ impl ActiveTaskMap { /// Cancel tokens, pause in store with the given reason, and emit `Preempted` /// events for drained tasks. -async fn cancel_pause_emit( +/// +/// GROUP reason suppresses per-task events — the caller emits a single +/// aggregate `GroupPaused` event instead. +pub(crate) async fn cancel_pause_emit( drained: &[(i64, ActiveTask)], store: &TaskStore, event_tx: &tokio::sync::broadcast::Sender, @@ -270,9 +286,12 @@ async fn cancel_pause_emit( for (id, at) in drained { at.token.cancel(); let _ = store.pause(*id, reason).await; - emit_event( - event_tx, - SchedulerEvent::Preempted(at.record.event_header()), - ); + // GROUP has its own coordinator-level event (GroupPaused). + if !reason.contains(PauseReasons::GROUP) { + emit_event( + event_tx, + SchedulerEvent::Preempted(at.record.event_header()), + ); + } } } diff --git a/src/scheduler/event.rs b/src/scheduler/event.rs index 3c41402..61d1e81 100644 --- a/src/scheduler/event.rs +++ b/src/scheduler/event.rs @@ -49,6 +49,21 @@ pub struct SchedulerSnapshot { pub recurring_schedules: Vec, /// Tasks currently blocked waiting for dependencies. pub blocked_count: i64, + /// Groups that are currently paused, with the timestamp each was paused. + pub paused_groups: Vec, +} + +/// Information about a paused group for snapshot/dashboard display. +/// +/// `paused_at` and `resume_at` are stored as epoch milliseconds (INTEGER) in SQLite. +/// Converted to `DateTime` for the public API (display layer). 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PausedGroupInfo { + pub group: String, + pub paused_at: chrono::DateTime, + pub paused_task_count: i64, + /// When the group will auto-resume (if time-boxed). + pub resume_at: Option>, } // ── Task Event Header ──────────────────────────────────────────────── @@ -164,6 +179,14 @@ pub enum SchedulerEvent { Paused, /// The scheduler was resumed via [`Scheduler::resume_all`](super::Scheduler::resume_all). Resumed, + /// A task group was paused. + GroupPaused { + group: String, + pending_count: usize, + running_count: usize, + }, + /// A task group was resumed. + GroupResumed { group: String, resumed_count: usize }, } impl SchedulerEvent { @@ -185,7 +208,9 @@ impl SchedulerEvent { | Self::TaskUnblocked { .. } | Self::DependencyFailed { .. } | Self::Paused - | Self::Resumed => None, + | Self::Resumed + | Self::GroupPaused { .. } + | Self::GroupResumed { .. } => None, } } } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 52b7703..5145572 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -33,7 +33,7 @@ mod submit; #[cfg(test)] mod tests; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, RwLock}; @@ -76,7 +76,8 @@ pub(crate) struct FailureMsg { pub metrics: IoBudget, } pub use event::{ - SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, + PausedGroupInfo, SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, + TaskEventHeader, }; pub use gate::GroupLimits; pub use progress::{EstimatedProgress, ProgressReporter, TaskProgress}; @@ -165,6 +166,18 @@ pub(crate) struct SchedulerInner { /// Computed at build time: `true` when no groups, no resource monitoring, /// and no module concurrency caps are configured. pub(crate) fast_dispatch: AtomicBool, + /// Set of currently paused group keys. 
Mirrors the `paused_groups` SQLite + /// table for fast in-memory lookups during gate checks and submit. + /// Uses `std::sync::RwLock` since reads vastly outnumber writes. + pub(crate) paused_groups: std::sync::RwLock>, + /// Builder-computed: were pressure sources configured? Stored here because + /// the `CompositePressure` is boxed inside `DefaultDispatchGate` and cannot + /// be introspected through `Box`. + pub(crate) has_pressure_sources: AtomicBool, + /// Builder-computed: was resource monitoring enabled? + pub(crate) has_resource_monitoring: AtomicBool, + /// Last time the auto-resume check ran for time-boxed group pauses. + pub(crate) last_group_resume_check: std::sync::Mutex, /// Send side of the completion coalescing channel. pub(crate) completion_tx: tokio::sync::mpsc::UnboundedSender, /// Receive side, `Arc`-wrapped so spawned tasks can try to drain the batch @@ -309,6 +322,12 @@ impl Scheduler { has_paused_tasks: AtomicBool::new(false), // Default to false; builder sets true when safe. fast_dispatch: AtomicBool::new(false), + // Empty by default; builder loads from DB. + paused_groups: std::sync::RwLock::new(HashSet::new()), + // Conservative defaults; builder overrides. 
+ has_pressure_sources: AtomicBool::new(false), + has_resource_monitoring: AtomicBool::new(false), + last_group_resume_check: std::sync::Mutex::new(tokio::time::Instant::now()), completion_tx, completion_rx: std::sync::Arc::new(Mutex::new(completion_rx)), failure_tx, diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index b2ca461..bbed6a9 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -2,6 +2,7 @@ use crate::store::StoreError; +use super::event::PausedGroupInfo; use super::progress::TaskProgress; use super::{EstimatedProgress, Scheduler, SchedulerSnapshot}; @@ -152,6 +153,21 @@ impl Scheduler { let recurring_schedules = self.inner.store.recurring_schedules().await?; let blocked_count = self.inner.store.blocked_count().await?; + // Paused groups with per-group task counts. + let paused_groups_rows = self.inner.store.paused_group_info().await?; + let paused_groups = paused_groups_rows + .into_iter() + .map( + |(group, paused_at_ms, resume_at_ms, paused_task_count)| PausedGroupInfo { + group, + paused_at: chrono::DateTime::from_timestamp_millis(paused_at_ms) + .unwrap_or_default(), + paused_task_count, + resume_at: resume_at_ms.and_then(chrono::DateTime::from_timestamp_millis), + }, + ) + .collect(); + Ok(SchedulerSnapshot { running, pending_count, @@ -165,6 +181,7 @@ impl Scheduler { is_paused: self.is_paused(), recurring_schedules, blocked_count, + paused_groups, }) } } diff --git a/src/store/query/active.rs b/src/store/query/active.rs index d61d262..e7d7b39 100644 --- a/src/store/query/active.rs +++ b/src/store/query/active.rs @@ -252,6 +252,24 @@ impl TaskStore { Ok(count) } + /// Paused group info with per-group task counts, for snapshot display. + /// + /// Uses a single aggregate query to avoid N+1 per paused group. 
+ pub async fn paused_group_info( + &self, + ) -> Result, i64)>, StoreError> { + let rows: Vec<(String, i64, Option, i64)> = sqlx::query_as( + "SELECT pg.group_key, pg.paused_at, pg.resume_at, COUNT(t.id) AS paused_task_count + FROM paused_groups pg + LEFT JOIN tasks t ON t.group_key = pg.group_key AND t.status = 'paused' AND (t.pause_reasons & 8) != 0 + GROUP BY pg.group_key + ORDER BY pg.paused_at", + ) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + /// Return the dependency edges for a given task (what it depends on). pub async fn task_dependencies(&self, task_id: i64) -> Result, StoreError> { let rows: Vec<(i64,)> = From 37c7d4d5fc65a0932f2e68aeb3dfe4f08f8ecd1e Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 19:43:00 -0700 Subject: [PATCH 5/8] feat: add scheduler control, gate check, and auto-resume for group pause - pause_group / resume_group / pause_group_until public API in control.rs - Gate admission rejects tasks whose group is paused - Finalizer deferral: re-queue parent finalize when group is paused - Time-boxed auto-resume checked every ~5s in the run loop - maybe_restore_fast_dispatch re-evaluates when last group is resumed --- src/scheduler/control.rs | 188 ++++++++++++++++++++++++++++++++++++++ src/scheduler/gate.rs | 21 ++++- src/scheduler/run_loop.rs | 38 ++++++++ 3 files changed, 246 insertions(+), 1 deletion(-) diff --git a/src/scheduler/control.rs b/src/scheduler/control.rs index 87cce68..0632f39 100644 --- a/src/scheduler/control.rs +++ b/src/scheduler/control.rs @@ -2,6 +2,10 @@ use std::sync::atomic::Ordering as AtomicOrdering; +use chrono::{DateTime, Utc}; + +use crate::store::StoreError; + use super::{emit_event, Scheduler, SchedulerEvent}; impl Scheduler { @@ -80,4 +84,188 @@ impl Scheduler { pub fn set_default_group_concurrency(&self, limit: usize) { self.inner.group_limits.set_default(limit); } + + // ── Group Pause / Resume ─────────────────────────────────────── + + /// Shared implementation for `pause_group` 
and `pause_group_until`. + /// Persists state, updates in-memory set, pauses pending + running tasks. + /// + /// **Ordering is load-bearing**: Step 1 (persist) MUST happen before step 4 + /// (pause pending) and step 5 (cancel running). If a running task completes + /// between steps 1 and 5, it may trigger recurring instance creation or + /// dependency resolution. Both code paths check the `paused_groups` table — + /// which is already populated by step 1 — so the new instance/unblocked + /// task will be correctly inserted as `'paused'`. + async fn pause_group_inner( + &self, + group_key: &str, + resume_at: Option, + ) -> Result, StoreError> { + // 1. Persist pause state (idempotent). + let newly_paused = self + .inner + .store + .pause_group_state(group_key, resume_at) + .await?; + if !newly_paused { + return Ok(None); // Already paused — no-op. + } + + // 2. Add to in-memory set. + self.inner + .paused_groups + .write() + .unwrap() + .insert(group_key.to_string()); + + // 3. Disable fast dispatch (gate checks now needed). + self.inner + .fast_dispatch + .store(false, AtomicOrdering::Relaxed); + + // 4. Pause pending tasks in the group (pending → paused, already-paused get GROUP bit). + let pending_paused = self.inner.store.pause_tasks_in_group(group_key).await?; + + // 5. Cancel and pause running tasks in the group. + // Note: does NOT set has_paused_tasks — the auto-resume path only handles + // PREEMPTION-paused tasks, so setting the flag would cause an unnecessary + // query that returns empty. + let running_paused = self + .inner + .active + .pause_group(group_key, &self.inner.store, &self.inner.event_tx) + .await; + + let _ = self.inner.event_tx.send(SchedulerEvent::GroupPaused { + group: group_key.to_string(), + pending_count: pending_paused as usize, + running_count: running_paused, + }); + + Ok(Some(pending_paused)) + } + + /// Pause all tasks in a group. + /// + /// - Persists the pause state in SQLite (survives restarts). 
+ /// - Pending tasks in the group are set to `paused` status with GROUP bit. + /// - Already-paused tasks get the GROUP bit added (prevents premature resume). + /// - Running tasks are cancelled and moved to `paused` with GROUP bit. + /// They will resume from the beginning on group resume. + /// - New submissions to the group are accepted but paused immediately. + /// - TTL is NOT frozen — `expires_at` remains a hard deadline. + /// - Idempotent: pausing an already-paused group is a no-op. + pub async fn pause_group(&self, group_key: &str) -> Result<(), StoreError> { + let Some(pending_paused) = self.pause_group_inner(group_key, None).await? else { + return Ok(()); + }; + tracing::info!(group = group_key, pending_paused, "group paused"); + Ok(()) + } + + /// Resume a paused group. + /// + /// - Clears the GROUP bit. Tasks with no remaining bits become `pending`. + /// Tasks with other bits (e.g., MODULE) stay paused under those reasons. + /// - Removes the group from the persisted pause state. + /// - Idempotent: resuming a non-paused group is a no-op. + pub async fn resume_group(&self, group_key: &str) -> Result<(), StoreError> { + if !self.inner.paused_groups.read().unwrap().contains(group_key) { + return Ok(()); + } + + // Resume tasks (clear GROUP bit; sole-group tasks become pending). + let resumed_count = self.inner.store.resume_paused_by_group(group_key).await?; + + self.inner.store.resume_group_state(group_key).await?; + + // Drop the write lock BEFORE calling maybe_restore_fast_dispatch + // to avoid deadlock (RwLock is not reentrant). 
+ let is_empty = { + let mut set = self.inner.paused_groups.write().unwrap(); + set.remove(group_key); + set.is_empty() + }; + if is_empty { + self.maybe_restore_fast_dispatch(); + } + + if resumed_count > 0 { + self.inner.work_notify.notify_one(); + } + + let _ = self.inner.event_tx.send(SchedulerEvent::GroupResumed { + group: group_key.to_string(), + resumed_count: resumed_count as usize, + }); + + tracing::info!(group = group_key, resumed_count, "group resumed"); + Ok(()) + } + + /// Time-boxed pause: pause a group until a deadline, then auto-resume. + /// Cancels running tasks immediately (same as `pause_group`). + /// + /// **Latency**: Auto-resume is checked every ~5 seconds (throttled to avoid + /// per-cycle DB queries), so the group may remain paused up to ~5 seconds + /// past the deadline. For sub-second precision, use an external timer that + /// calls `resume_group` directly. + pub async fn pause_group_until( + &self, + group_key: &str, + deadline: DateTime, + ) -> Result<(), StoreError> { + let Some(pending_paused) = self + .pause_group_inner(group_key, Some(deadline.timestamp_millis())) + .await? + else { + return Ok(()); + }; + tracing::info!(group = group_key, pending_paused, resume_at = %deadline, "group paused until deadline"); + Ok(()) + } + + /// List currently paused groups (synchronous — reads in-memory set). + pub fn paused_groups(&self) -> Vec { + self.inner + .paused_groups + .read() + .unwrap() + .iter() + .cloned() + .collect() + } + + /// Check if a specific group is paused. + pub fn is_group_paused(&self, group_key: &str) -> bool { + self.inner.paused_groups.read().unwrap().contains(group_key) + } + + /// Re-evaluate whether fast dispatch can be re-enabled. + /// + /// Must mirror the conditions in `SchedulerBuilder::build()`: + /// no paused groups, no group limits (default or overrides), no resource + /// monitoring, no pressure sources, no module concurrency caps. 
+ fn maybe_restore_fast_dispatch(&self) { + let has_groups = self.inner.group_limits.default_limit() > 0 + || self.inner.group_limits.has_overrides() + || !self.inner.paused_groups.read().unwrap().is_empty(); + let has_module_caps = !self.inner.module_caps.read().unwrap().is_empty(); + + if !has_groups + && !self + .inner + .has_resource_monitoring + .load(AtomicOrdering::Relaxed) + && !has_module_caps + && !self + .inner + .has_pressure_sources + .load(AtomicOrdering::Relaxed) + { + self.inner + .fast_dispatch + .store(true, AtomicOrdering::Relaxed); + } + } } diff --git a/src/scheduler/gate.rs b/src/scheduler/gate.rs index 4cd6539..821fc33 100644 --- a/src/scheduler/gate.rs +++ b/src/scheduler/gate.rs @@ -4,7 +4,7 @@ //! requeued. The built-in [`DefaultDispatchGate`] applies backpressure //! throttling and IO-budget checks. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -35,6 +35,8 @@ pub struct GateContext<'a> { pub module_caps: &'a StdRwLock>, /// Per-module live running counts (module name → AtomicUsize). pub module_running: &'a HashMap, + /// Set of currently paused group keys. + pub paused_groups: &'a HashSet, } // ── Dispatch Gate ────────────────────────────────────────────────── @@ -149,6 +151,18 @@ impl DispatchGate for DefaultDispatchGate { return Ok(false); } + // Group pause check. + if let Some(group_key) = &task.group_key { + if ctx.paused_groups.contains(group_key) { + tracing::trace!( + task_type = task.task_type, + group = group_key, + "task deferred — group paused — requeuing" + ); + return Ok(false); + } + } + // Group concurrency check. if let Some(group_key) = &task.group_key { if let Some(limits) = ctx.group_limits { @@ -352,4 +366,9 @@ impl GroupLimits { pub fn default_limit(&self) -> usize { self.default.load(Ordering::Relaxed) } + + /// Returns true if any per-group overrides are configured. 
+ pub fn has_overrides(&self) -> bool { + !self.overrides.read().unwrap().is_empty() + } } diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index b2cbf59..b3ac820 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -2,6 +2,7 @@ use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::Arc; +use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -94,12 +95,14 @@ impl Scheduler { // Build gate context from current state. let reader_guard = self.inner.resource_reader.lock().await; + let paused_groups = self.inner.paused_groups.read().unwrap().clone(); let gate_ctx = GateContext { store: &self.inner.store, resource_reader: reader_guard.as_ref(), group_limits: Some(&self.inner.group_limits), module_caps: &self.inner.module_caps, module_running: &self.inner.module_running, + paused_groups: &paused_groups, }; // Admission check while the task is still pending — no running @@ -181,6 +184,22 @@ impl Scheduler { id }; + // Check if parent's group is paused — defer finalize if so. + if let Ok(Some(parent_record)) = self.inner.store.task_by_id(parent_id).await { + if let Some(ref gk) = parent_record.group_key { + if self.inner.paused_groups.read().unwrap().contains(gk) { + // Re-insert into pending_finalizers for later. + self.inner + .active + .pending_finalizers + .lock() + .unwrap() + .insert(parent_id); + return Ok(false); + } + } + } + // Transition the parent from waiting to running for finalize. self.inner.store.set_running_for_finalize(parent_id).await?; @@ -464,6 +483,25 @@ impl Scheduler { } } + // Auto-resume time-boxed group pauses (throttled to avoid per-cycle DB queries). 
+ if !self.inner.paused_groups.read().unwrap().is_empty() { + let now = tokio::time::Instant::now(); + let should_check = { + let last = self.inner.last_group_resume_check.lock().unwrap(); + now.duration_since(*last) >= Duration::from_secs(5) + }; + if should_check { + *self.inner.last_group_resume_check.lock().unwrap() = now; + if let Ok(due_groups) = self.inner.store.groups_due_for_resume().await { + for group_key in due_groups { + if let Err(e) = self.resume_group(&group_key).await { + tracing::error!(group = group_key, error = %e, "auto-resume failed"); + } + } + } + } + } + // Dispatch any pending finalizers (parent tasks ready for finalize phase). loop { match self.try_dispatch_finalizer().await { From 9556b3b4c3f40584ed81fde2811aca14c641aea0 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 19:59:00 -0700 Subject: [PATCH 6/8] feat!: add group-pause edge cases, public API delegation, and SubmitOutcome change Wire group-pause awareness into submit, dependency resolution, recurring instance creation, and task-completion unblocking so newly-eligible tasks are downgraded to paused when their group is paused. Breaking: SubmitOutcome::Inserted is now a struct with { id, group_paused } instead of a tuple variant. Expose pause_group / resume_group / is_group_paused / paused_groups on DomainHandle and ModuleHandle, delegating to the scheduler. 
--- docs/persistence-and-recovery.md | 2 +- src/domain.rs | 23 ++++++++++++++++++++ src/lib.rs | 2 +- src/module.rs | 23 ++++++++++++++++++++ src/scheduler/tests.rs | 2 +- src/store/dependencies.rs | 24 +++++++++++++++++++++ src/store/lifecycle/transitions.rs | 34 ++++++++++++++++++++++++++++++ src/store/submit/mod.rs | 28 +++++++++++++++++++++++- src/task/submission.rs | 14 ++++++++---- 9 files changed, 144 insertions(+), 8 deletions(-) diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index bd2bb95..9dedcc7 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -64,7 +64,7 @@ use taskmill::SubmitOutcome; let outcome = scheduler.domain::().submit(task).await?; match outcome { - SubmitOutcome::Inserted(id) => println!("new task: {id}"), + SubmitOutcome::Inserted { id, group_paused } => println!("new task: {id}, group_paused: {group_paused}"), SubmitOutcome::Duplicate => println!("already queued"), SubmitOutcome::Upgraded(id) => println!("priority upgraded: {id}"), SubmitOutcome::Requeued(id) => println!("requeued from history: {id}"), diff --git a/src/domain.rs b/src/domain.rs index 642aea1..2dc9117 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -601,6 +601,29 @@ impl DomainHandle { self.inner.is_paused() } + // ── Group pause / resume ──────────────────────────────────────── + + /// Pause all tasks in a group. Delegates to the scheduler — group pause is + /// global, not scoped to this domain. + pub async fn pause_group(&self, group_key: &str) -> Result<(), StoreError> { + self.inner.pause_group(group_key).await + } + + /// Resume a paused group. + pub async fn resume_group(&self, group_key: &str) -> Result<(), StoreError> { + self.inner.resume_group(group_key).await + } + + /// Check if a group is paused. + pub fn is_group_paused(&self, group_key: &str) -> bool { + self.inner.is_group_paused(group_key) + } + + /// List all currently paused groups. 
+ pub fn paused_groups(&self) -> Vec { + self.inner.paused_groups() + } + // ── Queries ───────────────────────────────────────────────────── /// Capture a status snapshot for this domain. diff --git a/src/lib.rs b/src/lib.rs index 300607b..2e40add 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -749,7 +749,7 @@ //! let sync: DomainHandle = scheduler.domain::(); //! let outcome = sync.submit(SyncFile { path: "path/to/file.txt".into() }).await?; //! // outcome is Superseded { new_task_id, replaced_task_id } if a duplicate existed, -//! // or Inserted(id) if this was the first submission. +//! // or Inserted { id, group_paused } if this was the first submission. //! ``` //! //! # How the dispatch loop works diff --git a/src/module.rs b/src/module.rs index c19a3cf..25edecf 100644 --- a/src/module.rs +++ b/src/module.rs @@ -661,6 +661,29 @@ impl ModuleHandle { .is_some_and(|f| f.load(AtomicOrdering::Acquire)) } + // ── Group pause / resume ─────────────────────────────────────── + + /// Pause all tasks in a group. This is scheduler-global — a group paused + /// via one module's handle affects all modules. + pub async fn pause_group(&self, group_key: &str) -> Result<(), StoreError> { + self.scheduler.pause_group(group_key).await + } + + /// Resume a paused group. + pub async fn resume_group(&self, group_key: &str) -> Result<(), StoreError> { + self.scheduler.resume_group(group_key).await + } + + /// Check if a group is paused. + pub fn is_group_paused(&self, group_key: &str) -> bool { + self.scheduler.is_group_paused(group_key) + } + + /// List all currently paused groups. + pub fn paused_groups(&self) -> Vec { + self.scheduler.paused_groups() + } + // ── Module concurrency ──────────────────────────────────────── /// Set the maximum number of tasks from this module that may run concurrently. 
diff --git a/src/scheduler/tests.rs b/src/scheduler/tests.rs index 0a195f2..a6d69d9 100644 --- a/src/scheduler/tests.rs +++ b/src/scheduler/tests.rs @@ -1498,7 +1498,7 @@ async fn chain_of_supersedes() { .key("chain") .payload_raw(b"A".to_vec()); let out_a = sched.submit(&sub_a).await.unwrap(); - assert!(matches!(out_a, SubmitOutcome::Inserted(_))); + assert!(matches!(out_a, SubmitOutcome::Inserted { .. })); // B supersedes A. let sub_b = TaskSubmission::new("test::test") diff --git a/src/store/dependencies.rs b/src/store/dependencies.rs index 3f3649b..4b756c2 100644 --- a/src/store/dependencies.rs +++ b/src/store/dependencies.rs @@ -58,6 +58,20 @@ impl TaskStore { } let unblocked: Vec<(i64,)> = q.fetch_all(&mut **conn).await?; + // Step 3: If any newly-unblocked task's group is paused, downgrade to + // paused with the GROUP reason bit instead of leaving it as pending. + for (task_id,) in &unblocked { + sqlx::query( + "UPDATE tasks SET status = 'paused', + pause_reasons = pause_reasons | 8 + WHERE id = ? AND status = 'pending' + AND group_key IN (SELECT group_key FROM paused_groups)", + ) + .bind(task_id) + .execute(&mut **conn) + .await?; + } + Ok(unblocked.into_iter().map(|(id,)| id).collect()) } @@ -171,6 +185,16 @@ impl TaskStore { .execute(&mut **conn) .await?; if result.rows_affected() > 0 { + // If the task's group is paused, downgrade to paused. + sqlx::query( + "UPDATE tasks SET status = 'paused', + pause_reasons = pause_reasons | 8 + WHERE id = ? 
AND status = 'pending' + AND group_key IN (SELECT group_key FROM paused_groups)", + ) + .bind(dep_id) + .execute(&mut **conn) + .await?; all_unblocked.push(dep_id); } } diff --git a/src/store/lifecycle/transitions.rs b/src/store/lifecycle/transitions.rs index 2940a98..541bbc9 100644 --- a/src/store/lifecycle/transitions.rs +++ b/src/store/lifecycle/transitions.rs @@ -450,6 +450,19 @@ impl TaskStore { } let unblocked: Vec<(i64,)> = uq.fetch_all(&mut *conn).await?; all_unblocked = unblocked.into_iter().map(|(id,)| id).collect(); + + // Downgrade any newly-unblocked tasks whose group is paused. + for task_id in &all_unblocked { + sqlx::query( + "UPDATE tasks SET status = 'paused', + pause_reasons = pause_reasons | 8 + WHERE id = ? AND status = 'pending' + AND group_key IN (SELECT group_key FROM paused_groups)", + ) + .bind(task_id) + .execute(&mut *conn) + .await?; + } } sqlx::query("COMMIT").execute(&mut *conn).await?; @@ -624,6 +637,27 @@ impl TaskStore { .await?; } + // If the group is paused, set the new instance to paused + // with the GROUP reason bit. + if task.group_key.is_some() { + let is_paused: Option<(i64,)> = + sqlx::query_as("SELECT 1 FROM paused_groups WHERE group_key = ?") + .bind(&task.group_key) + .fetch_optional(&mut **conn) + .await?; + + if is_paused.is_some() { + sqlx::query( + "UPDATE tasks SET status = 'paused', + pause_reasons = pause_reasons | 8 + WHERE id = ?", + ) + .bind(next_id) + .execute(&mut **conn) + .await?; + } + } + recurring_info = Some((next_run, execution_count)); } // If existing.is_some(), skip (pile-up prevention). diff --git a/src/store/submit/mod.rs b/src/store/submit/mod.rs index 15089c6..032899c 100644 --- a/src/store/submit/mod.rs +++ b/src/store/submit/mod.rs @@ -167,7 +167,33 @@ pub(crate) async fn submit_one( } } - return Ok(SubmitOutcome::Inserted(task_id)); + // If the task's group is paused, insert as paused with the GROUP reason. 
+ let is_group_paused = if let Some(ref gk) = sub.group_key { + let row: Option<(i64,)> = + sqlx::query_as("SELECT 1 FROM paused_groups WHERE group_key = ?") + .bind(gk) + .fetch_optional(&mut **conn) + .await?; + row.is_some() + } else { + false + }; + + if is_group_paused { + sqlx::query( + "UPDATE tasks SET status = 'paused', + pause_reasons = pause_reasons | 8 + WHERE id = ? AND status = 'pending'", + ) + .bind(task_id) + .execute(&mut **conn) + .await?; + } + + return Ok(SubmitOutcome::Inserted { + id: task_id, + group_paused: is_group_paused, + }); } // Dedup hit — branch on the duplicate strategy. diff --git a/src/task/submission.rs b/src/task/submission.rs index 18ef837..4615a9b 100644 --- a/src/task/submission.rs +++ b/src/task/submission.rs @@ -102,7 +102,13 @@ pub enum DuplicateStrategy { #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmitOutcome { /// Task was inserted as new. - Inserted(i64), + Inserted { + /// The task ID assigned to the new task. + id: i64, + /// True if the task was submitted to a paused group and will not be + /// dispatched until the group is resumed. + group_paused: bool, + }, /// Duplicate key existed; its priority was upgraded (pending/paused tasks only). Upgraded(i64), /// Duplicate key existed and is running/paused; marked for re-queue after completion. @@ -124,7 +130,7 @@ impl SubmitOutcome { /// Returns the task ID if the task was inserted, upgraded, requeued, or superseded. pub fn id(&self) -> Option { match self { - Self::Inserted(id) | Self::Upgraded(id) | Self::Requeued(id) => Some(*id), + Self::Inserted { id, .. } | Self::Upgraded(id) | Self::Requeued(id) => Some(*id), Self::Superseded { new_task_id, .. } => Some(*new_task_id), Self::Duplicate | Self::Rejected => None, } @@ -132,7 +138,7 @@ impl SubmitOutcome { /// Returns `true` if a new task was inserted (including via supersede). pub fn is_inserted(&self) -> bool { - matches!(self, Self::Inserted(_) | Self::Superseded { .. }) + matches!(self, Self::Inserted { .. 
} | Self::Superseded { .. }) } } @@ -152,7 +158,7 @@ impl BatchOutcome { self.outcomes .iter() .filter_map(|o| match o { - SubmitOutcome::Inserted(id) => Some(*id), + SubmitOutcome::Inserted { id, .. } => Some(*id), _ => None, }) .collect() From c1b46f0d6fb08489dff8d70b3e6c2eb7907d910c Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 20:06:00 -0700 Subject: [PATCH 7/8] test: add integration tests for group-pause edge cases (plan 042, step 6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover submit-to-paused-group, recurring next-instance downgrade, blocked→paused transition, multi-reason pause interaction, and DomainHandle delegation. Also remove duplicated ALTER TABLE from migration 010 (now lives in the earlier pause_reasons migration). --- migrations/010_paused_groups.sql | 9 - tests/integration.rs | 2 + tests/integration/group_pause.rs | 420 +++++++++++++++++++++++++++++++ 3 files changed, 422 insertions(+), 9 deletions(-) create mode 100644 tests/integration/group_pause.rs diff --git a/migrations/010_paused_groups.sql b/migrations/010_paused_groups.sql index b18b207..3006b6f 100644 --- a/migrations/010_paused_groups.sql +++ b/migrations/010_paused_groups.sql @@ -5,12 +5,3 @@ CREATE TABLE IF NOT EXISTS paused_groups ( paused_at INTEGER NOT NULL, -- bound from Rust: Utc::now().timestamp_millis() resume_at INTEGER -- optional auto-resume deadline (epoch milliseconds) ); - --- Per-task pause attribution: bitmask of active pause reasons. --- Bit values: PREEMPTION=1, MODULE=2, GLOBAL=4, GROUP=8. --- A task is paused when pause_reasons != 0. -ALTER TABLE tasks ADD COLUMN pause_reasons INTEGER NOT NULL DEFAULT 0; - --- Backfill: any tasks currently in 'paused' status were paused by preemption --- (module/global pauses don't survive restarts as they use in-memory flags). 
-UPDATE tasks SET pause_reasons = 1 WHERE status = 'paused'; diff --git a/tests/integration.rs b/tests/integration.rs index d86544c..41e3aef 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -18,6 +18,8 @@ mod common; mod cross_module; #[path = "integration/dependencies.rs"] mod dependencies; +#[path = "integration/group_pause.rs"] +mod group_pause; #[path = "integration/memo.rs"] mod memo; #[path = "integration/module_features.rs"] diff --git a/tests/integration/group_pause.rs b/tests/integration/group_pause.rs new file mode 100644 index 0000000..c81f527 --- /dev/null +++ b/tests/integration/group_pause.rs @@ -0,0 +1,420 @@ +//! Integration tests: Group Pause edge cases (plan 042, step 6). +//! +//! - 6a: Submit to paused group → inserted as paused +//! - 6b: Recurring task in paused group → next instance paused +//! - 6c: Blocked→pending in paused group → downgraded to paused +//! - 6d: Multi-reason pause interaction (module + group) + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use taskmill::{ + Domain, IoBudget, PauseReasons, Scheduler, SchedulerEvent, SubmitOutcome, TaskStatus, + TaskStore, TaskSubmission, +}; +use tokio_util::sync::CancellationToken; + +use super::common::*; + +// ═══════════════════════════════════════════════════════════════════ +// 6a. Submit to paused group +// ═══════════════════════════════════════════════════════════════════ + +#[tokio::test] +async fn submit_to_paused_group_inserts_as_paused() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .build() + .await + .unwrap(); + + // Pause the group before submitting. + sched.pause_group("g1").await.unwrap(); + + // Submit a task to the paused group. + let outcome = sched + .submit( + &TaskSubmission::new("test::test") + .key("paused-submit") + .group("g1"), + ) + .await + .unwrap(); + + // Outcome should indicate group_paused. 
+ match outcome { + SubmitOutcome::Inserted { id, group_paused } => { + assert!(group_paused, "group_paused flag should be true"); + let task = sched.store().task_by_id(id).await.unwrap().unwrap(); + assert_eq!(task.status, TaskStatus::Paused); + assert!(task.pause_reasons.contains(PauseReasons::GROUP)); + } + other => panic!("expected Inserted, got {other:?}"), + } + + // Submit to a non-paused group — should NOT be paused. + let outcome2 = sched + .submit( + &TaskSubmission::new("test::test") + .key("normal-submit") + .group("g2"), + ) + .await + .unwrap(); + + match outcome2 { + SubmitOutcome::Inserted { group_paused, .. } => { + assert!(!group_paused, "group_paused should be false for non-paused group"); + } + other => panic!("expected Inserted, got {other:?}"), + } +} + +#[tokio::test] +async fn submit_batch_to_paused_group() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .build() + .await + .unwrap(); + + sched.pause_group("g1").await.unwrap(); + + let subs: Vec<_> = (0..3) + .map(|i| { + TaskSubmission::new("test::test") + .key(format!("batch-{i}")) + .group("g1") + }) + .collect(); + + let outcomes = sched.submit_batch(&subs).await.unwrap(); + for outcome in outcomes { + match outcome { + SubmitOutcome::Inserted { id, group_paused } => { + assert!(group_paused); + let task = sched.store().task_by_id(id).await.unwrap().unwrap(); + assert_eq!(task.status, TaskStatus::Paused); + } + other => panic!("expected Inserted, got {other:?}"), + } + } +} + +#[tokio::test] +async fn submit_to_paused_group_resumes_on_group_resume() { + let count = Arc::new(AtomicUsize::new(0)); + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new().task::(CountingExecutor { + count: count.clone(), + }), + ) + .max_concurrency(4) + .poll_interval(Duration::from_millis(50)) + .build() + .await + .unwrap(); + + let mut rx = sched.subscribe(); 
+ + // Pause group, submit tasks, then resume — they should all complete. + sched.pause_group("g1").await.unwrap(); + + for i in 0..3 { + sched + .submit( + &TaskSubmission::new("test::test") + .key(format!("resume-{i}")) + .group("g1"), + ) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + // Brief wait to verify nothing dispatches while paused. + tokio::time::sleep(Duration::from_millis(200)).await; + assert_eq!(count.load(Ordering::SeqCst), 0, "no tasks should run while group is paused"); + + // Resume the group. + sched.resume_group("g1").await.unwrap(); + + // Wait for all 3 completions. + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut completed = 0; + while tokio::time::Instant::now() < deadline && completed < 3 { + if let Ok(Ok(SchedulerEvent::Completed(..))) = + tokio::time::timeout(Duration::from_millis(100), rx.recv()).await + { + completed += 1; + } + } + + token.cancel(); + let _ = handle.await; + + assert_eq!(completed, 3, "all 3 tasks should complete after group resume"); + assert_eq!(count.load(Ordering::SeqCst), 3); +} + +// ═══════════════════════════════════════════════════════════════════ +// 6b. Recurring task in paused group +// ═══════════════════════════════════════════════════════════════════ + +#[tokio::test] +async fn recurring_next_instance_paused_in_paused_group() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .max_concurrency(4) + .poll_interval(Duration::from_millis(50)) + .build() + .await + .unwrap(); + + let store = sched.store(); + + // Submit a recurring task in group g1. 
+ let outcome = sched + .submit( + &TaskSubmission::new("test::test") + .key("recurring-g1") + .group("g1") + .recurring(Duration::from_secs(600)), + ) + .await + .unwrap(); + let id = outcome.id().unwrap(); + + // Pop and run it. + let task = store.pop_by_id(id).await.unwrap().unwrap(); + + // Pause the group while the task is running. + sched.pause_group("g1").await.unwrap(); + + // Complete the task — this should create a next instance. + store + .complete_with_record(&task, &IoBudget::default()) + .await + .unwrap(); + + // The next instance should exist and be paused with GROUP bit. + let key = task.key.clone(); + let next = store.task_by_key(&key).await.unwrap().unwrap(); + assert_eq!(next.status, TaskStatus::Paused); + assert!(next.pause_reasons.contains(PauseReasons::GROUP)); +} + +// ═══════════════════════════════════════════════════════════════════ +// 6c. Blocked→pending in paused group +// ═══════════════════════════════════════════════════════════════════ + +#[tokio::test] +async fn blocked_task_unblocks_into_paused_group() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .build() + .await + .unwrap(); + + let store = sched.store(); + let handle = sched.domain::(); + + // Submit dependency task in group g1. + let dep_outcome = handle + .submit_with(TestTask) + .key("dep") + .group("g1") + .await + .unwrap(); + let dep_id = dep_outcome.id().unwrap(); + + // Submit blocked task depending on dep, same group. + let blocked_outcome = handle + .submit_with(TestTask) + .key("blocked") + .group("g1") + .depends_on(dep_id) + .await + .unwrap(); + let blocked_id = blocked_outcome.id().unwrap(); + + // Verify blocked. + let t = store.task_by_id(blocked_id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Blocked); + + // Pause the group. + sched.pause_group("g1").await.unwrap(); + + // Complete the dependency. 
+ let dep = store.pop_by_id(dep_id).await.unwrap(); + // dep may be None if pause_group already paused it — use store directly. + if let Some(dep) = dep { + store + .complete_with_record(&dep, &IoBudget::default()) + .await + .unwrap(); + store.resolve_dependents(dep_id).await.unwrap(); + } else { + // The dep was paused by pause_group. Resume it, pop, complete, resolve. + sched.resume_group("g1").await.unwrap(); + let dep = store.pop_by_id(dep_id).await.unwrap().unwrap(); + // Re-pause before completing so the blocked task unblocks into a paused group. + sched.pause_group("g1").await.unwrap(); + store + .complete_with_record(&dep, &IoBudget::default()) + .await + .unwrap(); + store.resolve_dependents(dep_id).await.unwrap(); + } + + // Blocked task should now be paused (not pending) because its group is paused. + let t = store.task_by_id(blocked_id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert!(t.pause_reasons.contains(PauseReasons::GROUP)); + + // Resume group — task should become pending. + sched.resume_group("g1").await.unwrap(); + let t = store.task_by_id(blocked_id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Pending); + assert!(t.pause_reasons.is_empty()); +} + +// ═══════════════════════════════════════════════════════════════════ +// 6d. Multi-reason pause interaction +// ═══════════════════════════════════════════════════════════════════ + +#[tokio::test] +async fn multi_reason_pause_no_stranding() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .build() + .await + .unwrap(); + + let store = sched.store(); + let handle = sched.domain::(); + + // Submit a task in group g1. + let outcome = handle + .submit_with(TestTask) + .key("multi") + .group("g1") + .await + .unwrap(); + let id = outcome.id().unwrap(); + + // Pause by module (adds MODULE bit). 
+ handle.pause().await.unwrap(); + let t = store.task_by_id(id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert!(t.pause_reasons.contains(PauseReasons::MODULE)); + + // Pause group (adds GROUP bit to already-paused task). + sched.pause_group("g1").await.unwrap(); + let t = store.task_by_id(id).await.unwrap().unwrap(); + assert!(t.pause_reasons.contains(PauseReasons::MODULE)); + assert!(t.pause_reasons.contains(PauseReasons::GROUP)); + + // Resume module — task stays paused (GROUP bit remains). + handle.resume().await.unwrap(); + let t = store.task_by_id(id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert!(!t.pause_reasons.contains(PauseReasons::MODULE)); + assert!(t.pause_reasons.contains(PauseReasons::GROUP)); + + // Resume group — task finally becomes pending. + sched.resume_group("g1").await.unwrap(); + let t = store.task_by_id(id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Pending); + assert!(t.pause_reasons.is_empty()); +} + +#[tokio::test] +async fn group_resume_with_module_still_paused() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .build() + .await + .unwrap(); + + let store = sched.store(); + let handle = sched.domain::(); + + let outcome = handle + .submit_with(TestTask) + .key("dual") + .group("g1") + .await + .unwrap(); + let id = outcome.id().unwrap(); + + // Both pauses. + handle.pause().await.unwrap(); + sched.pause_group("g1").await.unwrap(); + + // Resume group first — module still holds it paused. + sched.resume_group("g1").await.unwrap(); + let t = store.task_by_id(id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Paused); + assert!(t.pause_reasons.contains(PauseReasons::MODULE)); + assert!(!t.pause_reasons.contains(PauseReasons::GROUP)); + + // Resume module — now finally pending. 
+ handle.resume().await.unwrap(); + let t = store.task_by_id(id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Pending); + assert!(t.pause_reasons.is_empty()); +} + +// ═══════════════════════════════════════════════════════════════════ +// 6e. ModuleHandle / DomainHandle delegation +// ═══════════════════════════════════════════════════════════════════ + +#[tokio::test] +async fn domain_handle_group_pause_delegation() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .build() + .await + .unwrap(); + + let handle = sched.domain::(); + + assert!(!handle.is_group_paused("g1")); + assert!(handle.paused_groups().is_empty()); + + handle.pause_group("g1").await.unwrap(); + assert!(handle.is_group_paused("g1")); + assert_eq!(handle.paused_groups(), vec!["g1".to_string()]); + + // Submit via domain handle — should be paused. + let outcome = handle + .submit_with(TestTask) + .key("via-handle") + .group("g1") + .await + .unwrap(); + match outcome { + SubmitOutcome::Inserted { group_paused, .. } => assert!(group_paused), + other => panic!("expected Inserted, got {other:?}"), + } + + handle.resume_group("g1").await.unwrap(); + assert!(!handle.is_group_paused("g1")); + assert!(handle.paused_groups().is_empty()); +} From 275639892dc7b9e399ed02551a99ff9b1034e3a0 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 22:02:28 -0700 Subject: [PATCH 8/8] style: format group_pause integration tests --- tests/integration/group_pause.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/integration/group_pause.rs b/tests/integration/group_pause.rs index c81f527..83e51ec 100644 --- a/tests/integration/group_pause.rs +++ b/tests/integration/group_pause.rs @@ -66,7 +66,10 @@ async fn submit_to_paused_group_inserts_as_paused() { match outcome2 { SubmitOutcome::Inserted { group_paused, .. 
} => { - assert!(!group_paused, "group_paused should be false for non-paused group"); + assert!( + !group_paused, + "group_paused should be false for non-paused group" + ); } other => panic!("expected Inserted, got {other:?}"), } @@ -144,7 +147,11 @@ async fn submit_to_paused_group_resumes_on_group_resume() { // Brief wait to verify nothing dispatches while paused. tokio::time::sleep(Duration::from_millis(200)).await; - assert_eq!(count.load(Ordering::SeqCst), 0, "no tasks should run while group is paused"); + assert_eq!( + count.load(Ordering::SeqCst), + 0, + "no tasks should run while group is paused" + ); // Resume the group. sched.resume_group("g1").await.unwrap(); @@ -163,7 +170,10 @@ async fn submit_to_paused_group_resumes_on_group_resume() { token.cancel(); let _ = handle.await; - assert_eq!(completed, 3, "all 3 tasks should complete after group resume"); + assert_eq!( + completed, 3, + "all 3 tasks should complete after group resume" + ); assert_eq!(count.load(Ordering::SeqCst), 3); }