Skip to content
2 changes: 1 addition & 1 deletion docs/persistence-and-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use taskmill::SubmitOutcome;

let outcome = scheduler.domain::<App>().submit(task).await?;
match outcome {
SubmitOutcome::Inserted(id) => println!("new task: {id}"),
SubmitOutcome::Inserted { id, group_paused } => println!("new task: {id}, group_paused: {group_paused}"),
SubmitOutcome::Duplicate => println!("already queued"),
SubmitOutcome::Upgraded(id) => println!("priority upgraded: {id}"),
SubmitOutcome::Requeued(id) => println!("requeued from history: {id}"),
Expand Down
1 change: 1 addition & 0 deletions migrations/001_tasks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ CREATE TABLE IF NOT EXISTS tasks (
recurring_max_executions INTEGER,
recurring_execution_count INTEGER NOT NULL DEFAULT 0,
recurring_paused INTEGER NOT NULL DEFAULT 0,
pause_reasons INTEGER NOT NULL DEFAULT 0,
UNIQUE(key)
);

Expand Down
7 changes: 7 additions & 0 deletions migrations/010_paused_groups.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Group-level pause state.
-- One row per currently-paused group; the scheduler removes the row when the
-- group is resumed, so presence of a row means "paused" after a restart.
-- New columns use epoch milliseconds (INTEGER) per the epoch-for-storage convention.
CREATE TABLE IF NOT EXISTS paused_groups (
group_key TEXT NOT NULL PRIMARY KEY,
paused_at INTEGER NOT NULL, -- bound from Rust: Utc::now().timestamp_millis()
resume_at INTEGER -- optional auto-resume deadline (epoch milliseconds); NULL = manual resume only
);
23 changes: 23 additions & 0 deletions src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,29 @@ impl<D: DomainKey> DomainHandle<D> {
self.inner.is_paused()
}

// ── Group pause / resume ────────────────────────────────────────

/// Pause all tasks in a group. Delegates to the scheduler — group pause is
/// global, not scoped to this domain.
///
/// # Errors
/// Propagates any [`StoreError`] from persisting the pause state.
pub async fn pause_group(&self, group_key: &str) -> Result<(), StoreError> {
self.inner.pause_group(group_key).await
}

/// Resume a paused group. Like [`pause_group`](Self::pause_group), this is
/// scheduler-global, not scoped to this domain.
///
/// # Errors
/// Propagates any [`StoreError`] from updating the persisted pause state.
pub async fn resume_group(&self, group_key: &str) -> Result<(), StoreError> {
self.inner.resume_group(group_key).await
}

/// Check if a group is paused. Synchronous — delegates to the scheduler's
/// in-memory pause set, so no store round-trip is made.
pub fn is_group_paused(&self, group_key: &str) -> bool {
self.inner.is_group_paused(group_key)
}

/// List all currently paused groups (scheduler-global, not just this
/// domain's). Synchronous — reads the scheduler's in-memory set.
pub fn paused_groups(&self) -> Vec<String> {
self.inner.paused_groups()
}

// ── Queries ─────────────────────────────────────────────────────

/// Capture a status snapshot for this domain.
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@
//! let sync: DomainHandle<Sync> = scheduler.domain::<Sync>();
//! let outcome = sync.submit(SyncFile { path: "path/to/file.txt".into() }).await?;
//! // outcome is Superseded { new_task_id, replaced_task_id } if a duplicate existed,
//! // or Inserted(id) if this was the first submission.
//! // or Inserted { id, group_paused } if this was the first submission.
//! ```
//!
//! # How the dispatch loop works
Expand Down Expand Up @@ -806,13 +806,14 @@ pub use resource::network_pressure::NetworkPressure;
pub use resource::sampler::SamplerConfig;
pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot};
pub use scheduler::{
EstimatedProgress, GroupLimits, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig,
SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, TaskProgress,
EstimatedProgress, GroupLimits, PausedGroupInfo, ProgressReporter, Scheduler, SchedulerBuilder,
SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader,
TaskProgress,
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{
generate_dedup_key, BackoffStrategy, BatchOutcome, BatchSubmission, DependencyFailurePolicy,
DuplicateStrategy, HistoryStatus, IoBudget, ParentResolution, RecurringSchedule,
DuplicateStrategy, HistoryStatus, IoBudget, ParentResolution, PauseReasons, RecurringSchedule,
RecurringScheduleInfo, RetryPolicy, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup,
TaskRecord, TaskStatus, TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK,
MAX_TAG_KEY_LEN, MAX_TAG_VALUE_LEN,
Expand Down
23 changes: 23 additions & 0 deletions src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,29 @@ impl ModuleHandle {
.is_some_and(|f| f.load(AtomicOrdering::Acquire))
}

// ── Group pause / resume ───────────────────────────────────────

/// Pause all tasks in a group. This is scheduler-global — a group paused
/// via one module's handle affects all modules.
///
/// # Errors
/// Propagates any [`StoreError`] from persisting the pause state.
pub async fn pause_group(&self, group_key: &str) -> Result<(), StoreError> {
self.scheduler.pause_group(group_key).await
}

/// Resume a paused group. Scheduler-global, same as
/// [`pause_group`](Self::pause_group).
///
/// # Errors
/// Propagates any [`StoreError`] from updating the persisted pause state.
pub async fn resume_group(&self, group_key: &str) -> Result<(), StoreError> {
self.scheduler.resume_group(group_key).await
}

/// Check if a group is paused. Synchronous — delegates to the scheduler's
/// in-memory pause set; no store query is performed.
pub fn is_group_paused(&self, group_key: &str) -> bool {
self.scheduler.is_group_paused(group_key)
}

/// List all currently paused groups (scheduler-global). Synchronous —
/// reads the scheduler's in-memory set.
pub fn paused_groups(&self) -> Vec<String> {
self.scheduler.paused_groups()
}

// ── Module concurrency ────────────────────────────────────────

/// Set the maximum number of tasks from this module that may run concurrently.
Expand Down
24 changes: 21 additions & 3 deletions src/scheduler/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,23 @@ impl SchedulerBuilder {
module_state,
);

// Load persisted group pause state (survives restarts).
let paused_groups_rows = scheduler.inner.store.paused_groups().await?;
let paused_groups_set: std::collections::HashSet<String> =
paused_groups_rows.iter().map(|(k, _)| k.clone()).collect();
let has_paused_groups = !paused_groups_set.is_empty();
*scheduler.inner.paused_groups.write().unwrap() = paused_groups_set;

// Store builder-computed flags for maybe_restore_fast_dispatch().
scheduler
.inner
.has_pressure_sources
.store(has_pressure, std::sync::atomic::Ordering::Relaxed);
scheduler.inner.has_resource_monitoring.store(
self.enable_resource_monitoring,
std::sync::atomic::Ordering::Relaxed,
);

// Compute fast-dispatch eligibility before consuming builder fields.
let has_groups =
self.default_group_concurrency > 0 || !self.group_concurrency_overrides.is_empty();
Expand Down Expand Up @@ -482,9 +499,10 @@ impl SchedulerBuilder {
}

// Enable fast dispatch (single pop_next instead of peek + gate + claim)
// when no groups, no resource monitoring, no pressure sources, and no
// module caps are configured.
if !has_groups && !has_monitoring && !has_pressure && !has_module_caps {
// when no groups, no resource monitoring, no pressure sources, no
// module caps, and no paused groups are present.
if !has_groups && !has_monitoring && !has_pressure && !has_module_caps && !has_paused_groups
{
scheduler
.inner
.fast_dispatch
Expand Down
198 changes: 196 additions & 2 deletions src/scheduler/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

use std::sync::atomic::Ordering as AtomicOrdering;

use chrono::{DateTime, Utc};

use crate::store::StoreError;

use super::{emit_event, Scheduler, SchedulerEvent};

impl Scheduler {
Expand Down Expand Up @@ -42,10 +46,16 @@ impl Scheduler {
/// Resume the scheduler after a [`pause_all`](Self::pause_all).
///
/// Clears the pause flag so the run loop will resume dispatching on
/// its next poll tick. Tasks that were paused in the store will be
/// picked up automatically.
/// its next poll tick. Also clears the GLOBAL bit from all paused tasks
/// in the store — tasks with no remaining pause reasons transition back
/// to pending.
pub async fn resume_all(&self) {
self.inner.paused.store(false, AtomicOrdering::Release);
let _ = self
.inner
.store
.clear_pause_bit(crate::task::PauseReasons::GLOBAL)
.await;
self.inner.work_notify.notify_one();
emit_event(&self.inner.event_tx, SchedulerEvent::Resumed);
tracing::info!("scheduler resumed");
Expand Down Expand Up @@ -74,4 +84,188 @@ impl Scheduler {
pub fn set_default_group_concurrency(&self, limit: usize) {
self.inner.group_limits.set_default(limit);
}

// ── Group Pause / Resume ───────────────────────────────────────

/// Shared implementation for `pause_group` and `pause_group_until`.
/// Persists state, updates in-memory set, pauses pending + running tasks.
///
/// **Ordering is load-bearing**: Step 1 (persist) MUST happen before step 4
/// (pause pending) and step 5 (cancel running). If a running task completes
/// between steps 1 and 5, it may trigger recurring instance creation or
/// dependency resolution. Both code paths check the `paused_groups` table —
/// which is already populated by step 1 — so the new instance/unblocked
/// task will be correctly inserted as `'paused'`.
async fn pause_group_inner(
&self,
group_key: &str,
resume_at: Option<i64>,
) -> Result<Option<u64>, StoreError> {
// 1. Persist pause state (idempotent).
let newly_paused = self
.inner
.store
.pause_group_state(group_key, resume_at)
.await?;
if !newly_paused {
return Ok(None); // Already paused — no-op.
}

// 2. Add to in-memory set.
self.inner
.paused_groups
.write()
.unwrap()
.insert(group_key.to_string());

// 3. Disable fast dispatch (gate checks now needed).
self.inner
.fast_dispatch
.store(false, AtomicOrdering::Relaxed);

// 4. Pause pending tasks in the group (pending → paused, already-paused get GROUP bit).
let pending_paused = self.inner.store.pause_tasks_in_group(group_key).await?;

// 5. Cancel and pause running tasks in the group.
// Note: does NOT set has_paused_tasks — the auto-resume path only handles
// PREEMPTION-paused tasks, so setting the flag would cause an unnecessary
// query that returns empty.
let running_paused = self
.inner
.active
.pause_group(group_key, &self.inner.store, &self.inner.event_tx)
.await;

let _ = self.inner.event_tx.send(SchedulerEvent::GroupPaused {
group: group_key.to_string(),
pending_count: pending_paused as usize,
running_count: running_paused,
});

Ok(Some(pending_paused))
}

/// Pause all tasks in a group.
///
/// - Persists the pause state in SQLite (survives restarts).
/// - Pending tasks in the group are set to `paused` status with GROUP bit.
/// - Already-paused tasks get the GROUP bit added (prevents premature resume).
/// - Running tasks are cancelled and moved to `paused` with GROUP bit.
///   They will resume from the beginning on group resume.
/// - New submissions to the group are accepted but paused immediately.
/// - TTL is NOT frozen — `expires_at` remains a hard deadline.
/// - Idempotent: pausing an already-paused group is a no-op.
pub async fn pause_group(&self, group_key: &str) -> Result<(), StoreError> {
    // `None` from the inner call means the group was already paused.
    if let Some(pending_paused) = self.pause_group_inner(group_key, None).await? {
        tracing::info!(group = group_key, pending_paused, "group paused");
    }
    Ok(())
}

/// Resume a paused group.
///
/// - Clears the GROUP bit. Tasks with no remaining bits become `pending`.
/// Tasks with other bits (e.g., MODULE) stay paused under those reasons.
/// - Removes the group from the persisted pause state.
/// - Idempotent: resuming a non-paused group is a no-op.
pub async fn resume_group(&self, group_key: &str) -> Result<(), StoreError> {
if !self.inner.paused_groups.read().unwrap().contains(group_key) {
return Ok(());
}

// Resume tasks (clear GROUP bit; sole-group tasks become pending).
let resumed_count = self.inner.store.resume_paused_by_group(group_key).await?;

self.inner.store.resume_group_state(group_key).await?;

// Drop the write lock BEFORE calling maybe_restore_fast_dispatch
// to avoid deadlock (RwLock is not reentrant).
let is_empty = {
let mut set = self.inner.paused_groups.write().unwrap();
set.remove(group_key);
set.is_empty()
};
if is_empty {
self.maybe_restore_fast_dispatch();
}

if resumed_count > 0 {
self.inner.work_notify.notify_one();
}

let _ = self.inner.event_tx.send(SchedulerEvent::GroupResumed {
group: group_key.to_string(),
resumed_count: resumed_count as usize,
});

tracing::info!(group = group_key, resumed_count, "group resumed");
Ok(())
}

/// Time-boxed pause: pause a group until a deadline, then auto-resume.
/// Cancels running tasks immediately (same as `pause_group`).
///
/// **Latency**: Auto-resume is checked every ~5 seconds (throttled to avoid
/// per-cycle DB queries), so the group may remain paused up to ~5 seconds
/// past the deadline. For sub-second precision, use an external timer that
/// calls `resume_group` directly.
pub async fn pause_group_until(
    &self,
    group_key: &str,
    deadline: DateTime<Utc>,
) -> Result<(), StoreError> {
    let outcome = self
        .pause_group_inner(group_key, Some(deadline.timestamp_millis()))
        .await?;
    // `None` means the group was already paused — nothing to log.
    if let Some(pending_paused) = outcome {
        tracing::info!(group = group_key, pending_paused, resume_at = %deadline, "group paused until deadline");
    }
    Ok(())
}

/// List currently paused groups (synchronous — reads the in-memory set,
/// so no ordering is guaranteed on the returned keys).
pub fn paused_groups(&self) -> Vec<String> {
    let set = self.inner.paused_groups.read().unwrap();
    set.iter().map(String::clone).collect()
}

/// Check if a specific group is paused (synchronous in-memory lookup).
pub fn is_group_paused(&self, group_key: &str) -> bool {
    let set = self.inner.paused_groups.read().unwrap();
    set.contains(group_key)
}

/// Re-evaluate whether fast dispatch can be re-enabled.
///
/// Must mirror the conditions in `SchedulerBuilder::build()`:
/// no paused groups, no group limits (default or overrides), no resource
/// monitoring, no pressure sources, no module concurrency caps.
fn maybe_restore_fast_dispatch(&self) {
    // Evaluate each gating condition up front; the Relaxed atomic loads are
    // side-effect-free, so eager evaluation is equivalent to short-circuiting.
    let group_gating = self.inner.group_limits.default_limit() > 0
        || self.inner.group_limits.has_overrides()
        || !self.inner.paused_groups.read().unwrap().is_empty();
    let module_caps = !self.inner.module_caps.read().unwrap().is_empty();
    let monitoring = self
        .inner
        .has_resource_monitoring
        .load(AtomicOrdering::Relaxed);
    let pressure = self
        .inner
        .has_pressure_sources
        .load(AtomicOrdering::Relaxed);

    if group_gating || module_caps || monitoring || pressure {
        return;
    }
    self.inner
        .fast_dispatch
        .store(true, AtomicOrdering::Relaxed);
}
}
Loading
Loading