From 3289e194a78e51651309f6bb9ac6c830bdbd376c Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sun, 15 Mar 2026 09:27:26 -0700 Subject: [PATCH 1/2] feat: task TTL with automatic expiry, sweep, and child inheritance Add configurable per-task, per-type, and global default TTL with two clock modes (submission time or first attempt). Expired tasks are caught at dispatch time and by a periodic background sweep. Children without explicit TTL inherit the remaining parent TTL. Cascade-expires pending/paused children when a parent expires. --- migrations/004_ttl.sql | 13 +++ src/lib.rs | 2 +- src/registry/child_spawner.rs | 53 +++++++++++- src/registry/mod.rs | 29 +++++++ src/scheduler/builder.rs | 51 ++++++++++- src/scheduler/dispatch.rs | 10 ++- src/scheduler/event.rs | 17 +++- src/scheduler/mod.rs | 9 ++ src/scheduler/run_loop.rs | 57 ++++++++++++ src/scheduler/submit.rs | 33 +++++-- src/store/lifecycle.rs | 157 +++++++++++++++++++++++++++++++++- src/store/mod.rs | 1 + src/store/row_mapping.rs | 14 ++- src/store/submit.rs | 45 ++++++++-- src/task/mod.rs | 48 +++++++++++ src/task/submission.rs | 48 ++++++++++- src/task/typed.rs | 14 ++- 17 files changed, 575 insertions(+), 26 deletions(-) create mode 100644 migrations/004_ttl.sql diff --git a/migrations/004_ttl.sql b/migrations/004_ttl.sql new file mode 100644 index 0000000..e484328 --- /dev/null +++ b/migrations/004_ttl.sql @@ -0,0 +1,13 @@ +-- Migration 004: Task TTL / automatic expiry +-- Adds TTL columns to both tasks and task_history tables. 
+
+ALTER TABLE tasks ADD COLUMN ttl_seconds INTEGER;
+ALTER TABLE tasks ADD COLUMN ttl_from TEXT NOT NULL DEFAULT 'submission';
+ALTER TABLE tasks ADD COLUMN expires_at TEXT;
+
+ALTER TABLE task_history ADD COLUMN ttl_seconds INTEGER;
+ALTER TABLE task_history ADD COLUMN ttl_from TEXT NOT NULL DEFAULT 'submission';
+ALTER TABLE task_history ADD COLUMN expires_at TEXT;
+
+CREATE INDEX IF NOT EXISTS idx_tasks_expires ON tasks (expires_at ASC)
+    WHERE expires_at IS NOT NULL AND status IN ('pending', 'paused');
diff --git a/src/lib.rs b/src/lib.rs
index 6fef185..fffe7f3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -487,7 +487,7 @@ pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
 pub use task::{
     generate_dedup_key, BatchOutcome, BatchSubmission, DuplicateStrategy, HistoryStatus, IoBudget,
     ParentResolution, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord,
-    TaskStatus, TaskSubmission, TypeStats, TypedTask,
+    TaskStatus, TaskSubmission, TtlFrom, TypeStats, TypedTask,
 };
 
 #[cfg(feature = "sysinfo-monitor")]
diff --git a/src/registry/child_spawner.rs b/src/registry/child_spawner.rs
index 9544104..0b8e088 100644
--- a/src/registry/child_spawner.rs
+++ b/src/registry/child_spawner.rs
@@ -1,9 +1,12 @@
 //! Child task spawning from within an executor.
 
 use std::sync::Arc;
+use std::time::Duration;
+
+use chrono::{DateTime, Utc};
 
 use crate::store::{StoreError, TaskStore};
-use crate::task::{SubmitOutcome, TaskSubmission};
+use crate::task::{SubmitOutcome, TaskSubmission, TtlFrom};
 
 /// Handle for spawning child tasks from within an executor.
 ///
@@ -11,11 +14,18 @@ use crate::task::{SubmitOutcome, TaskSubmission};
 /// child submissions automatically inherit the parent relationship.
 /// Holds a `Notify` reference to wake the scheduler run loop after
 /// spawning, so children are dispatched promptly.
+///
+/// If the parent has a TTL, children without an explicit TTL inherit
+/// the remaining parent TTL (with `TtlFrom::Submission`).
 #[derive(Clone)]
 pub(crate) struct ChildSpawner {
     store: TaskStore,
     parent_id: i64,
     work_notify: Arc<tokio::sync::Notify>,
+    parent_created_at: DateTime<Utc>,
+    parent_ttl_seconds: Option<i64>,
+    parent_ttl_from: TtlFrom,
+    parent_started_at: Option<DateTime<Utc>>,
 }
 
 impl ChildSpawner {
@@ -23,17 +33,72 @@ impl ChildSpawner {
         store: TaskStore,
         parent_id: i64,
         work_notify: Arc<tokio::sync::Notify>,
+        parent_created_at: DateTime<Utc>,
+        parent_ttl_seconds: Option<i64>,
+        parent_ttl_from: TtlFrom,
+        parent_started_at: Option<DateTime<Utc>>,
     ) -> Self {
         Self {
             store,
             parent_id,
             work_notify,
+            parent_created_at,
+            parent_ttl_seconds,
+            parent_ttl_from,
+            parent_started_at,
+        }
+    }
+
+    /// Compute the remaining parent TTL and apply it to a child submission
+    /// if the child doesn't have an explicit TTL.
+    ///
+    /// Leaves `sub` untouched when the parent has no TTL, when the parent's
+    /// clock is `TtlFrom::FirstAttempt` but no start time is known, or when
+    /// no TTL remains. An inherited TTL always uses `TtlFrom::Submission`.
+    ///
+    /// NOTE(review): for `TtlFrom::FirstAttempt` the remaining time is measured
+    /// from `parent_started_at`; if that is refreshed on every attempt it
+    /// overstates what is left after a retry. Confirm against the parent's
+    /// stored `expires_at`.
+    fn inherit_ttl(&self, sub: &mut TaskSubmission) {
+        if sub.ttl.is_some() {
+            return; // Child has explicit TTL, don't override.
+        }
+        let Some(parent_ttl_secs) = self.parent_ttl_seconds else {
+            return; // Parent has no TTL.
+        };
+        // Checked conversion: `as u64` would wrap a negative value into an
+        // enormous TTL instead of rejecting it.
+        let Ok(parent_ttl_secs) = u64::try_from(parent_ttl_secs) else {
+            return;
+        };
+        let parent_ttl = Duration::from_secs(parent_ttl_secs);
+
+        // Determine when the parent's TTL started.
+        let ttl_start = match self.parent_ttl_from {
+            TtlFrom::Submission => self.parent_created_at,
+            TtlFrom::FirstAttempt => match self.parent_started_at {
+                Some(started) => started,
+                None => return, // Parent hasn't started yet, can't compute remaining.
+            },
+        };
+
+        // A negative elapsed time (start in the future) counts as zero.
+        let elapsed = Utc::now() - ttl_start;
+        let elapsed_std = elapsed.to_std().unwrap_or_default();
+
+        if let Some(remaining) = parent_ttl.checked_sub(elapsed_std) {
+            if remaining > Duration::ZERO {
+                sub.ttl = Some(remaining);
+                sub.ttl_from = TtlFrom::Submission;
+            }
         }
     }
 
     /// Submit a single child task. Sets `parent_id` automatically.
pub async fn spawn(&self, mut sub: TaskSubmission) -> Result { sub.parent_id = Some(self.parent_id); + self.inherit_ttl(&mut sub); let outcome = self.store.submit(&sub).await?; self.work_notify.notify_one(); Ok(outcome) @@ -46,6 +96,7 @@ impl ChildSpawner { ) -> Result, StoreError> { for sub in submissions.iter_mut() { sub.parent_id = Some(self.parent_id); + self.inherit_ttl(sub); } let outcomes = self.store.submit_batch(submissions).await?; self.work_notify.notify_one(); diff --git a/src/registry/mod.rs b/src/registry/mod.rs index 39f317a..a050e45 100644 --- a/src/registry/mod.rs +++ b/src/registry/mod.rs @@ -103,6 +103,7 @@ pub trait TaskExecutor: Send + Sync + 'static { /// After construction, the registry is immutable (shared via `Arc`). pub struct TaskTypeRegistry { types: HashMap>, + type_ttls: HashMap, } /// Object-safe wrapper around [`TaskExecutor`] for dynamic dispatch in the registry. @@ -155,6 +156,7 @@ impl TaskTypeRegistry { pub fn new() -> Self { Self { types: HashMap::new(), + type_ttls: HashMap::new(), } } @@ -170,6 +172,22 @@ impl TaskTypeRegistry { .insert(name.to_string(), executor as Arc); } + /// Register an executor with a per-type default TTL. + pub fn register_with_ttl( + &mut self, + name: &str, + executor: Arc, + ttl: std::time::Duration, + ) { + self.register(name, executor); + self.type_ttls.insert(name.to_string(), ttl); + } + + /// Look up the per-type default TTL for a task type. + pub fn type_ttl(&self, name: &str) -> Option<&std::time::Duration> { + self.type_ttls.get(name) + } + /// Look up the executor for a task type. pub(crate) fn get(&self, name: &str) -> Option<&Arc> { self.types.get(name) @@ -198,6 +216,17 @@ impl TaskTypeRegistry { } self.types.insert(name.to_string(), executor); } + + /// Register a pre-erased executor with a per-type TTL. 
+ pub(crate) fn register_erased_with_ttl( + &mut self, + name: &str, + executor: Arc, + ttl: std::time::Duration, + ) { + self.register_erased(name, executor); + self.type_ttls.insert(name.to_string(), ttl); + } } impl Default for TaskTypeRegistry { diff --git a/src/scheduler/builder.rs b/src/scheduler/builder.rs index 32d85a3..27dc334 100644 --- a/src/scheduler/builder.rs +++ b/src/scheduler/builder.rs @@ -41,7 +41,11 @@ pub struct SchedulerBuilder { store_path: Option, store_config: StoreConfig, store: Option, - executors: Vec<(String, Arc)>, + executors: Vec<( + String, + Arc, + Option, + )>, config: SchedulerConfig, pressure_sources: Vec>, policy: Option, @@ -98,6 +102,25 @@ impl SchedulerBuilder { self.executors.push(( name.to_string(), executor as Arc, + None, + )); + self + } + + /// Register a task executor with a per-type default TTL. + /// + /// Tasks of this type that don't specify their own TTL will use this + /// duration as their TTL. + pub fn executor_with_ttl( + mut self, + name: &str, + executor: Arc, + ttl: Duration, + ) -> Self { + self.executors.push(( + name.to_string(), + executor as Arc, + Some(ttl), )); self } @@ -158,6 +181,24 @@ impl SchedulerBuilder { self } + /// Set the default TTL applied to tasks without an explicit TTL. + /// + /// `None` (default) means no global TTL. + pub fn default_ttl(mut self, ttl: Duration) -> Self { + self.config.default_ttl = Some(ttl); + self + } + + /// Set the expiry sweep interval. + /// + /// Controls how often the scheduler sweeps for expired tasks. `None` + /// disables periodic sweeps (dispatch-time checks still apply). + /// Default: `Some(30s)`. + pub fn expiry_sweep_interval(mut self, interval: Option) -> Self { + self.config.expiry_sweep_interval = interval; + self + } + /// Add a backpressure source (used by the default gate). pub fn pressure_source( mut self, @@ -289,11 +330,15 @@ impl SchedulerBuilder { // Build registry. 
let mut registry = crate::registry::TaskTypeRegistry::new(); - for (name, executor) in self.executors { + for (name, executor, ttl) in self.executors { if registry.get(&name).is_some() { panic!("task type '{name}' already registered"); } - registry.register_erased(&name, executor); + if let Some(ttl) = ttl { + registry.register_erased_with_ttl(&name, executor, ttl); + } else { + registry.register_erased(&name, executor); + } } // Prepare resource monitoring reader early so NetworkPressure can diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 7a38be8..5ce9969 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -296,7 +296,15 @@ pub(crate) async fn spawn_task( let child_token = CancellationToken::new(); // Build execution context. - let child_spawner = ChildSpawner::new(store.clone(), task.id, work_notify.clone()); + let child_spawner = ChildSpawner::new( + store.clone(), + task.id, + work_notify.clone(), + task.created_at, + task.ttl_seconds, + task.ttl_from, + task.started_at, + ); let io = Arc::new(IoTracker::new()); // Insert into active map before spawning to avoid races. diff --git a/src/scheduler/event.rs b/src/scheduler/event.rs index 7108bef..8c2c909 100644 --- a/src/scheduler/event.rs +++ b/src/scheduler/event.rs @@ -100,6 +100,12 @@ pub enum SchedulerEvent { /// Task IDs that were inserted (new tasks only, not upgrades/requeues). inserted_ids: Vec, }, + /// A task expired because its TTL elapsed before execution started. + TaskExpired { + header: TaskEventHeader, + /// How long the task lived before expiring. + age: Duration, + }, /// The scheduler was globally paused via [`Scheduler::pause_all`]. Paused, /// The scheduler was resumed via [`Scheduler::resume_all`]. @@ -115,7 +121,8 @@ impl SchedulerEvent { } Self::Failed { header, .. } | Self::Progress { header, .. } - | Self::Superseded { old: header, .. } => Some(header), + | Self::Superseded { old: header, .. } + | Self::TaskExpired { header, .. 
} => Some(header), Self::Waiting { .. } | Self::BatchSubmitted { .. } | Self::Paused | Self::Resumed => { None } @@ -187,6 +194,12 @@ pub struct SchedulerConfig { /// hooks. If a cancel hook does not complete within this duration it is /// aborted. Default: 30 seconds. pub cancel_hook_timeout: Duration, + /// Default TTL applied to tasks that don't specify one. `None` (default) + /// means no global TTL. + pub default_ttl: Option, + /// How often to sweep for expired tasks. `None` disables periodic sweeps + /// (dispatch-time checks still apply). Default: `Some(30s)`. + pub expiry_sweep_interval: Option, } impl Default for SchedulerConfig { @@ -200,6 +213,8 @@ impl Default for SchedulerConfig { shutdown_mode: ShutdownMode::Hard, progress_interval: None, cancel_hook_timeout: Duration::from_secs(30), + default_ttl: None, + expiry_sweep_interval: Some(Duration::from_secs(30)), } } } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index a933416..4cf4f75 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -91,6 +91,12 @@ pub(crate) struct SchedulerInner { pub(crate) group_limits: GroupLimits, /// Timeout for on_cancel hooks. pub(crate) cancel_hook_timeout: Duration, + /// Default TTL for tasks without an explicit TTL. + pub(crate) default_ttl: Option, + /// How often to sweep for expired tasks. + pub(crate) expiry_sweep_interval: Option, + /// Last time the expiry sweep ran. + pub(crate) last_expiry_sweep: std::sync::Mutex, } /// IO-aware priority scheduler. 
@@ -188,6 +194,9 @@ impl Scheduler {
                 work_notify: Arc::new(Notify::new()),
                 group_limits: GroupLimits::new(),
                 cancel_hook_timeout: config.cancel_hook_timeout,
+                default_ttl: config.default_ttl,
+                expiry_sweep_interval: config.expiry_sweep_interval,
+                last_expiry_sweep: std::sync::Mutex::new(tokio::time::Instant::now()),
             }),
         }
     }
diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs
index 3b5839b..a67dba6 100644
--- a/src/scheduler/run_loop.rs
+++ b/src/scheduler/run_loop.rs
@@ -8,6 +8,8 @@ use tokio_util::sync::CancellationToken;
 
 use crate::store::StoreError;
 use crate::task::IoBudget;
+use super::SchedulerEvent;
+
 use super::dispatch::{self, SpawnContext};
 use super::gate::GateContext;
 use super::{Scheduler, ShutdownMode};
@@ -44,6 +46,38 @@ impl Scheduler {
             return Ok(false);
         };
 
+        // Dispatch-time expiry check: if the candidate has expired, expire
+        // it and retry (return Ok(true) to loop again).
+        if let Some(expires_at) = candidate.expires_at {
+            if expires_at <= chrono::Utc::now() {
+                match self.inner.store.expire_single(candidate.id).await {
+                    Ok(Some(task)) => {
+                        let age = (chrono::Utc::now() - task.created_at)
+                            .to_std()
+                            .unwrap_or_default();
+                        let _ = self.inner.event_tx.send(SchedulerEvent::TaskExpired {
+                            header: task.event_header(),
+                            age,
+                        });
+                    }
+                    // Not found or no longer expirable: nothing to report.
+                    Ok(None) => {}
+                    Err(e) => {
+                        // The expired candidate is most likely still queued, so
+                        // asking for an immediate retry with Ok(true) would select
+                        // it again straight away. Log and stop this pass instead.
+                        tracing::error!(
+                            task_id = candidate.id,
+                            error = %e,
+                            "dispatch-time expiry failed"
+                        );
+                        return Ok(false);
+                    }
+                }
+                return Ok(true);
+            }
+        }
+
         // Build gate context from current state.
         let reader_guard = self.inner.resource_reader.lock().await;
         let gate_ctx = GateContext {
@@ -195,12 +229,56 @@ impl Scheduler {
         }
     }
 
+    /// Run the periodic expiry sweep if the interval has elapsed.
+    ///
+    /// Does nothing when `expiry_sweep_interval` is `None`. Sends one
+    /// `TaskExpired` event per record returned by the store; a failed sweep
+    /// is logged and not retried until the next interval.
+    async fn maybe_expire_tasks(&self) {
+        let Some(interval) = self.inner.expiry_sweep_interval else {
+            return;
+        };
+        {
+            // Check and reset the timestamp under a single lock acquisition so
+            // the two steps cannot interleave with another caller. The guard
+            // is dropped at the end of this block, before any `.await`.
+            let mut last = self.inner.last_expiry_sweep.lock().unwrap();
+            if last.elapsed() < interval {
+                return;
+            }
+            *last = tokio::time::Instant::now();
+        }
+
+        match self.inner.store.expire_tasks().await {
+            Ok(expired) => {
+                for task in &expired {
+                    let age = (chrono::Utc::now() - task.created_at)
+                        .to_std()
+                        .unwrap_or_default();
+                    let _ = self.inner.event_tx.send(SchedulerEvent::TaskExpired {
+                        header: task.event_header(),
+                        age,
+                    });
+                }
+                if !expired.is_empty() {
+                    tracing::info!(count = expired.len(), "expired stale tasks");
+                }
+            }
+            Err(e) => {
+                tracing::error!(error = %e, "expiry sweep failed");
+            }
+        }
+    }
+
     /// Resume paused tasks, dispatch finalizers, and dispatch pending work.
     async fn poll_and_dispatch(&self) {
         if self.is_paused() {
             return;
         }
 
+        // Run expiry sweep before dispatching.
+        self.maybe_expire_tasks().await;
+
         // Resume paused tasks only if no active preemptors exist.
         if let Ok(paused) = self.inner.store.paused_tasks().await {
             for task in paused {
diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs
index 5c37760..5d1ed35 100644
--- a/src/scheduler/submit.rs
+++ b/src/scheduler/submit.rs
@@ -16,13 +16,31 @@ use super::progress::ProgressReporter;
 use super::{Scheduler, SchedulerEvent};
 
 impl Scheduler {
+    /// Resolve the effective TTL for a submission.
+    ///
+    /// Priority: per-task TTL > per-type TTL > global default TTL > None
+    fn resolve_ttl(&self, sub: &mut TaskSubmission) {
+        if sub.ttl.is_some() {
+            return; // per-task TTL takes precedence
+        }
+        if let Some(type_ttl) = self.inner.registry.type_ttl(&sub.task_type) {
+            sub.ttl = Some(*type_ttl);
+            return;
+        }
+        if let Some(default_ttl) = self.inner.default_ttl {
+            sub.ttl = Some(default_ttl);
+        }
+    }
+
     /// Submit a task.
/// /// If the task's priority meets the preemption threshold, running tasks /// with lower priority are preempted (their cancellation tokens are cancelled /// and they are paused in the store). pub async fn submit(&self, sub: &TaskSubmission) -> Result { - let outcome = self.inner.store.submit(sub).await?; + let mut sub = sub.clone(); + self.resolve_ttl(&mut sub); + let outcome = self.inner.store.submit(&sub).await?; // Handle superseded tasks. if let SubmitOutcome::Superseded { @@ -72,10 +90,15 @@ impl Scheduler { &self, submissions: &[TaskSubmission], ) -> Result { - let results = self.inner.store.submit_batch(submissions).await?; + // Resolve TTLs for all submissions. + let mut resolved: Vec = submissions.to_vec(); + for sub in &mut resolved { + self.resolve_ttl(sub); + } + let results = self.inner.store.submit_batch(&resolved).await?; // Handle superseded tasks. - for (sub, outcome) in submissions.iter().zip(results.iter()) { + for (sub, outcome) in resolved.iter().zip(results.iter()) { if let SubmitOutcome::Superseded { new_task_id, replaced_task_id, @@ -97,7 +120,7 @@ impl Scheduler { // Find the highest (lowest numeric value) priority among tasks that // were inserted or had their priority upgraded. 
- let best_priority = submissions + let best_priority = resolved .iter() .zip(results.iter()) .filter(|(_, outcome)| { @@ -124,7 +147,7 @@ impl Scheduler { if any_changed { let inserted_ids = outcome.inserted(); let _ = self.inner.event_tx.send(SchedulerEvent::BatchSubmitted { - count: submissions.len(), + count: resolved.len(), inserted_ids, }); diff --git a/src/store/lifecycle.rs b/src/store/lifecycle.rs index 976efa6..e270819 100644 --- a/src/store/lifecycle.rs +++ b/src/store/lifecycle.rs @@ -27,8 +27,9 @@ pub(crate) async fn insert_history( "INSERT INTO task_history (task_type, key, label, priority, status, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, actual_read_bytes, actual_write_bytes, actual_net_rx_bytes, actual_net_tx_bytes, - retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast, group_key) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast, group_key, + ttl_seconds, ttl_from, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) @@ -55,6 +56,12 @@ pub(crate) async fn insert_history( .bind(task.parent_id) .bind(fail_fast_val) .bind(&task.group_key) + .bind(task.ttl_seconds) + .bind(task.ttl_from.as_str()) + .bind( + task.expires_at + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()), + ) .execute(&mut **conn) .await?; Ok(()) @@ -90,10 +97,20 @@ impl TaskStore { /// Atomically claim a specific pending task by id, setting it to running. /// Returns `None` if the task is no longer pending (e.g. claimed by another /// dispatcher or cancelled). + /// + /// For tasks with `ttl_from = 'first_attempt'`, sets `expires_at` on the + /// first pop (when `expires_at IS NULL` and `ttl_seconds IS NOT NULL`). 
pub async fn pop_by_id(&self, id: i64) -> Result<Option<TaskRecord>, StoreError> {
         tracing::debug!(task_id = id, "store.pop_by_id: UPDATE start");
         let row = sqlx::query(
-            "UPDATE tasks SET status = 'running', started_at = datetime('now')
+            "UPDATE tasks SET
+                status = 'running',
+                started_at = datetime('now'),
+                expires_at = CASE
+                    WHEN ttl_from = 'first_attempt' AND ttl_seconds IS NOT NULL AND expires_at IS NULL
+                    THEN datetime('now', '+' || ttl_seconds || ' seconds')
+                    ELSE expires_at
+                END
              WHERE id = ? AND status = 'pending'
              RETURNING *",
         )
@@ -107,9 +124,19 @@ impl TaskStore {
 
     /// Pop the highest-priority pending task and mark it as running.
     /// Returns `None` if the queue is empty.
+    ///
+    /// For tasks with `ttl_from = 'first_attempt'`, sets `expires_at` on
+    /// the first pop.
     pub async fn pop_next(&self) -> Result<Option<TaskRecord>, StoreError> {
         let row = sqlx::query(
-            "UPDATE tasks SET status = 'running', started_at = datetime('now')
+            "UPDATE tasks SET
+                status = 'running',
+                started_at = datetime('now'),
+                expires_at = CASE
+                    WHEN ttl_from = 'first_attempt' AND ttl_seconds IS NOT NULL AND expires_at IS NULL
+                    THEN datetime('now', '+' || ttl_seconds || ' seconds')
+                    ELSE expires_at
+                END
              WHERE id = (
                  SELECT id FROM tasks
                  WHERE status = 'pending'
@@ -403,6 +430,151 @@ impl TaskStore {
         sqlx::query("COMMIT").execute(&mut *conn).await?;
         Ok(())
     }
+
+    /// Sweep for expired tasks and move them to history.
+    ///
+    /// Finds tasks whose `expires_at` has passed and that are still pending
+    /// or paused, records them in history as "expired", cascade-expires their
+    /// pending/paused children, and deletes them from the active queue. At
+    /// most 500 expired tasks are selected per call, and the cascade looks
+    /// only at the direct children of each selected task.
+    ///
+    /// Every task is written to history and returned at most once, even when
+    /// a child is selected in its own right and is also cascade-expired by
+    /// its parent in the same sweep.
+    ///
+    /// Returns the expired task records (for event emission).
+    pub async fn expire_tasks(&self) -> Result<Vec<TaskRecord>, StoreError> {
+        let mut conn = self.begin_write().await?;
+
+        // Find expired tasks.
+        let rows = sqlx::query(
+            "SELECT * FROM tasks
+             WHERE expires_at IS NOT NULL
+               AND expires_at <= datetime('now')
+               AND status IN ('pending', 'paused')
+             ORDER BY expires_at ASC
+             LIMIT 500",
+        )
+        .fetch_all(&mut *conn)
+        .await?;
+
+        let mut expired = Vec::with_capacity(rows.len());
+        // IDs already moved to history in this sweep. `rows` was fetched before
+        // any cascade ran, so it can still list a child that an earlier parent
+        // in this loop has already recorded and deleted.
+        let mut handled = std::collections::HashSet::with_capacity(rows.len());
+
+        for row in &rows {
+            let task = row_to_task_record(row);
+            // `false`: a parent's cascade already recorded and deleted this task,
+            // so only its own children are still processed below.
+            let first_visit = handled.insert(task.id);
+
+            if first_visit {
+                // Record in history as expired.
+                insert_history(
+                    &mut conn,
+                    &task,
+                    "expired",
+                    &IoBudget::default(),
+                    None,
+                    None,
+                )
+                .await?;
+            }
+
+            // Cascade: expire pending/paused children.
+            let child_rows = sqlx::query(
+                "SELECT * FROM tasks
+                 WHERE parent_id = ? AND status IN ('pending', 'paused')",
+            )
+            .bind(task.id)
+            .fetch_all(&mut *conn)
+            .await?;
+
+            for child_row in &child_rows {
+                let child = row_to_task_record(child_row);
+                handled.insert(child.id);
+                insert_history(
+                    &mut conn,
+                    &child,
+                    "expired",
+                    &IoBudget::default(),
+                    None,
+                    None,
+                )
+                .await?;
+                sqlx::query("DELETE FROM tasks WHERE id = ?")
+                    .bind(child.id)
+                    .execute(&mut *conn)
+                    .await?;
+                expired.push(child);
+            }
+
+            if first_visit {
+                // Delete the expired task itself.
+                sqlx::query("DELETE FROM tasks WHERE id = ?")
+                    .bind(task.id)
+                    .execute(&mut *conn)
+                    .await?;
+
+                expired.push(task);
+            }
+        }
+
+        sqlx::query("COMMIT").execute(&mut *conn).await?;
+        Ok(expired)
+    }
+
+    /// Expire a single task by ID if it has passed its `expires_at`.
+    ///
+    /// Returns `Some(task)` if the task was expired, `None` if it wasn't
+    /// found, not expired, or not in an expirable state.
+    ///
+    /// NOTE(review): unlike `expire_tasks`, this does not cascade to the
+    /// task's pending/paused children; they stay queued after the parent is
+    /// deleted. Confirm whether that is intended.
+    pub async fn expire_single(&self, id: i64) -> Result<Option<TaskRecord>, StoreError> {
+        let mut conn = self.begin_write().await?;
+
+        let row = sqlx::query(
+            "SELECT * FROM tasks
+             WHERE id = ?
+               AND expires_at IS NOT NULL
+               AND expires_at <= datetime('now')
+               AND status IN ('pending', 'paused')",
+        )
+        .bind(id)
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        let Some(row) = row else {
+            // Nothing to expire; end the write transaction opened above.
+            sqlx::query("COMMIT").execute(&mut *conn).await?;
+            return Ok(None);
+        };
+
+        let task = row_to_task_record(&row);
+
+        insert_history(
+            &mut conn,
+            &task,
+            "expired",
+            &IoBudget::default(),
+            None,
+            None,
+        )
+        .await?;
+
+        sqlx::query("DELETE FROM tasks WHERE id = ?")
+            .bind(task.id)
+            .execute(&mut *conn)
+            .await?;
+
+        sqlx::query("COMMIT").execute(&mut *conn).await?;
+        Ok(Some(task))
+    }
 }
 
 #[cfg(test)]
diff --git a/src/store/mod.rs b/src/store/mod.rs
index b062641..6669851 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -217,6 +217,7 @@ impl TaskStore {
             include_str!("../../migrations/003_net_io_and_groups.sql"),
         )
         .await?;
+        Self::run_alter_migration(&self.pool, include_str!("../../migrations/004_ttl.sql")).await?;
 
         Ok(())
     }
diff --git a/src/store/row_mapping.rs b/src/store/row_mapping.rs
index fec7748..1e92b74 100644
--- a/src/store/row_mapping.rs
+++ b/src/store/row_mapping.rs
@@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
 use sqlx::Row;
 
 use crate::priority::Priority;
-use crate::task::{HistoryStatus, IoBudget, TaskHistoryRecord, TaskRecord, TaskStatus};
+use crate::task::{HistoryStatus, IoBudget, TaskHistoryRecord, TaskRecord, TaskStatus, TtlFrom};
 
 pub(crate) fn parse_datetime(s: &str) -> DateTime<Utc> {
     // SQLite stores as "YYYY-MM-DD HH:MM:SS". Parse with chrono.
@@ -24,6 +24,9 @@ pub(crate) fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { let parent_id: Option = row.get("parent_id"); let fail_fast_val: i32 = row.get("fail_fast"); + let ttl_from_str: String = row.get("ttl_from"); + let expires_at_str: Option = row.get("expires_at"); + TaskRecord { id: row.get("id"), task_type: row.get("task_type"), @@ -47,6 +50,9 @@ pub(crate) fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { parent_id, fail_fast: fail_fast_val != 0, group_key: row.get("group_key"), + ttl_seconds: row.get("ttl_seconds"), + ttl_from: ttl_from_str.parse().unwrap_or(TtlFrom::Submission), + expires_at: expires_at_str.map(|s| parse_datetime(&s)), } } @@ -71,6 +77,9 @@ pub(crate) fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistor net_tx: actual_tx.unwrap_or(0), }); + let ttl_from_str: String = row.get("ttl_from"); + let expires_at_str: Option = row.get("expires_at"); + TaskHistoryRecord { id: row.get("id"), task_type: row.get("task_type"), @@ -95,5 +104,8 @@ pub(crate) fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistor parent_id, fail_fast: fail_fast_val != 0, group_key: row.get("group_key"), + ttl_seconds: row.get("ttl_seconds"), + ttl_from: ttl_from_str.parse().unwrap_or(TtlFrom::Submission), + expires_at: expires_at_str.map(|s| parse_datetime(&s)), } } diff --git a/src/store/submit.rs b/src/store/submit.rs index bf59cb8..a228460 100644 --- a/src/store/submit.rs +++ b/src/store/submit.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use sqlx::Row; -use crate::task::{DuplicateStrategy, SubmitOutcome, TaskSubmission, MAX_PAYLOAD_BYTES}; +use crate::task::{DuplicateStrategy, SubmitOutcome, TaskSubmission, TtlFrom, MAX_PAYLOAD_BYTES}; use super::row_mapping::row_to_task_record; use super::{StoreError, TaskStore}; @@ -32,9 +32,20 @@ pub(crate) async fn submit_one( let priority = sub.priority.value() as i32; let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; + // Compute TTL 
columns. + let ttl_seconds = sub.ttl.map(|d| d.as_secs() as i64); + let ttl_from_str = sub.ttl_from.as_str(); + let expires_at: Option = match (sub.ttl, sub.ttl_from) { + (Some(ttl), TtlFrom::Submission) => { + let exp = chrono::Utc::now() + ttl; + Some(exp.format("%Y-%m-%d %H:%M:%S").to_string()) + } + _ => None, // FirstAttempt: set on pop; no TTL: NULL + }; + let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, parent_id, fail_fast, group_key) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, parent_id, fail_fast, group_key, ttl_seconds, ttl_from, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) @@ -48,6 +59,9 @@ pub(crate) async fn submit_one( .bind(sub.parent_id) .bind(fail_fast_val) .bind(&sub.group_key) + .bind(ttl_seconds) + .bind(ttl_from_str) + .bind(&expires_at) .execute(&mut **conn) .await?; @@ -145,6 +159,17 @@ pub(crate) async fn supersede_existing( let priority = sub.priority.value() as i32; let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; + // Compute TTL columns for the new submission. + let ttl_seconds = sub.ttl.map(|d| d.as_secs() as i64); + let ttl_from_str = sub.ttl_from.as_str(); + let expires_at: Option = match (sub.ttl, sub.ttl_from) { + (Some(ttl), TtlFrom::Submission) => { + let exp = chrono::Utc::now() + ttl; + Some(exp.format("%Y-%m-%d %H:%M:%S").to_string()) + } + _ => None, + }; + match existing.status { crate::task::TaskStatus::Pending | crate::task::TaskStatus::Paused => { // In-place update — keeps the row ID and queue position. 
@@ -154,7 +179,8 @@ pub(crate) async fn supersede_existing( expected_read_bytes = ?, expected_write_bytes = ?, expected_net_rx_bytes = ?, expected_net_tx_bytes = ?, retry_count = 0, last_error = NULL, status = 'pending', - requeue = 0, requeue_priority = NULL, fail_fast = ?, group_key = ? + requeue = 0, requeue_priority = NULL, fail_fast = ?, group_key = ?, + ttl_seconds = ?, ttl_from = ?, expires_at = ? WHERE id = ?", ) .bind(&sub.label) @@ -166,6 +192,9 @@ pub(crate) async fn supersede_existing( .bind(sub.expected_io.net_tx) .bind(fail_fast_val) .bind(&sub.group_key) + .bind(ttl_seconds) + .bind(ttl_from_str) + .bind(&expires_at) .bind(replaced_id) .execute(&mut **conn) .await?; @@ -185,8 +214,9 @@ pub(crate) async fn supersede_existing( let result = sqlx::query( "INSERT INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, - expected_net_tx_bytes, parent_id, fail_fast, group_key) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + expected_net_tx_bytes, parent_id, fail_fast, group_key, + ttl_seconds, ttl_from, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(key) @@ -200,6 +230,9 @@ pub(crate) async fn supersede_existing( .bind(sub.parent_id) .bind(fail_fast_val) .bind(&sub.group_key) + .bind(ttl_seconds) + .bind(ttl_from_str) + .bind(&expires_at) .execute(&mut **conn) .await?; diff --git a/src/task/mod.rs b/src/task/mod.rs index aec88ca..ebd7699 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -33,6 +33,39 @@ pub use submission::{ }; pub use typed::TypedTask; +/// When the TTL clock starts ticking. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TtlFrom { + /// TTL starts at submission time (default). `expires_at` is set immediately. + #[default] + Submission, + /// TTL starts at first execution attempt. 
`expires_at` is set when the + /// task transitions from pending to running for the first time. + FirstAttempt, +} + +impl TtlFrom { + pub fn as_str(self) -> &'static str { + match self { + Self::Submission => "submission", + Self::FirstAttempt => "first_attempt", + } + } +} + +impl std::str::FromStr for TtlFrom { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "submission" => Ok(Self::Submission), + "first_attempt" => Ok(Self::FirstAttempt), + other => Err(format!("unknown TtlFrom: {other}")), + } + } +} + /// Lifecycle state of a task in the active queue. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -79,6 +112,7 @@ pub enum HistoryStatus { Failed, Cancelled, Superseded, + Expired, } impl HistoryStatus { @@ -88,6 +122,7 @@ impl HistoryStatus { Self::Failed => "failed", Self::Cancelled => "cancelled", Self::Superseded => "superseded", + Self::Expired => "expired", } } } @@ -101,6 +136,7 @@ impl std::str::FromStr for HistoryStatus { "failed" => Ok(Self::Failed), "cancelled" => Ok(Self::Cancelled), "superseded" => Ok(Self::Superseded), + "expired" => Ok(Self::Expired), other => Err(format!("unknown HistoryStatus: {other}")), } } @@ -136,6 +172,12 @@ pub struct TaskRecord { /// Optional group key for per-group concurrency limiting (e.g. an /// endpoint URL). Tasks in the same group share a concurrency budget. pub group_key: Option, + /// Original TTL duration in seconds. `None` means no TTL. + pub ttl_seconds: Option, + /// When the TTL clock starts. + pub ttl_from: TtlFrom, + /// Pre-computed expiry datetime. `None` means never expires. + pub expires_at: Option>, } impl TaskRecord { @@ -189,6 +231,12 @@ pub struct TaskHistoryRecord { pub fail_fast: bool, /// Optional group key for per-group concurrency limiting. pub group_key: Option, + /// Original TTL duration in seconds. `None` means no TTL. + pub ttl_seconds: Option, + /// When the TTL clock starts. 
+ pub ttl_from: TtlFrom, + /// Pre-computed expiry datetime. `None` means never expires. + pub expires_at: Option>, } /// IO budget for a task: expected or actual disk and network IO bytes. diff --git a/src/task/submission.rs b/src/task/submission.rs index cb795ee..ca72872 100644 --- a/src/task/submission.rs +++ b/src/task/submission.rs @@ -2,13 +2,15 @@ //! for building batches with shared defaults, [`SubmitOutcome`] for per-task //! results, and [`BatchOutcome`] for categorized batch summaries. +use std::time::Duration; + use serde::{Deserialize, Serialize}; use crate::priority::Priority; use super::dedup::generate_dedup_key; use super::typed::TypedTask; -use super::IoBudget; +use super::{IoBudget, TtlFrom}; /// Strategy for handling duplicate dedup keys on submission. /// @@ -187,6 +189,7 @@ impl IntoIterator for BatchOutcome { pub struct BatchSubmission { default_group: Option, default_priority: Option, + default_ttl: Option, tasks: Vec, } @@ -196,6 +199,7 @@ impl BatchSubmission { Self { default_group: None, default_priority: None, + default_ttl: None, tasks: Vec::new(), } } @@ -206,6 +210,12 @@ impl BatchSubmission { self } + /// Set a default TTL applied to tasks that don't specify one. + pub fn default_ttl(mut self, ttl: Duration) -> Self { + self.default_ttl = Some(ttl); + self + } + /// Set a default priority applied to tasks still at [`Priority::NORMAL`]. pub fn default_priority(mut self, priority: Priority) -> Self { self.default_priority = Some(priority); @@ -240,6 +250,11 @@ impl BatchSubmission { task.priority = priority; } } + if task.ttl.is_none() { + if let Some(ttl) = self.default_ttl { + task.ttl = Some(ttl); + } + } } self.tasks } @@ -299,6 +314,13 @@ pub struct TaskSubmission { /// Strategy for handling duplicate dedup keys. Default: [`DuplicateStrategy::Skip`]. #[serde(default)] pub on_duplicate: DuplicateStrategy, + /// Time-to-live for the task. 
If set, the task will be automatically expired + /// if it hasn't started executing within this duration (clock starts + /// based on [`ttl_from`](Self::ttl_from)). + pub ttl: Option<Duration>, + /// When the TTL clock starts. Default: [`TtlFrom::Submission`]. + #[serde(default)] + pub ttl_from: TtlFrom, /// Deferred serialization error from [`payload_json`](Self::payload_json). /// Surfaced at submit time as [`StoreError::Serialization`](crate::StoreError::Serialization). #[serde(skip)] @@ -334,6 +356,8 @@ impl TaskSubmission { fail_fast: true, group_key: None, on_duplicate: DuplicateStrategy::default(), + ttl: None, + ttl_from: TtlFrom::default(), payload_error: None, } } @@ -420,6 +444,22 @@ impl TaskSubmission { self } + /// Set the time-to-live for the task. + /// + /// If the task hasn't started executing within this duration (from + /// submission or first attempt, depending on [`ttl_from`](Self::ttl_from)), + /// it is automatically expired. + pub fn ttl(mut self, ttl: Duration) -> Self { + self.ttl = Some(ttl); + self + } + + /// Set when the TTL clock starts. Default: [`TtlFrom::Submission`]. + pub fn ttl_from(mut self, ttl_from: TtlFrom) -> Self { + self.ttl_from = ttl_from; + self + } + /// Resolve the effective dedup key. Always incorporates the task type /// so different task types never collide, even with the same logical key. /// @@ -440,7 +480,11 @@ impl TaskSubmission { .priority(task.priority()) .payload_json(task) .expected_io(task.expected_io()) - .on_duplicate(task.on_duplicate()); + .on_duplicate(task.on_duplicate()) + .ttl_from(task.ttl_from()); + if let Some(t) = task.ttl() { + sub = sub.ttl(t); + } if let Some(k) = task.key() { sub = sub.key(k); } diff --git a/src/task/typed.rs b/src/task/typed.rs index 54485b9..82db429 100644 --- a/src/task/typed.rs +++ b/src/task/typed.rs @@ -1,12 +1,14 @@ //! The [`TypedTask`] trait for strongly-typed task payloads.
+use std::time::Duration; + use serde::de::DeserializeOwned; use serde::Serialize; use crate::priority::Priority; use super::submission::DuplicateStrategy; -use super::IoBudget; +use super::{IoBudget, TtlFrom}; /// A strongly-typed task that bundles serialization, task type name, and default /// IO estimates. @@ -65,4 +67,14 @@ pub trait TypedTask: Serialize + DeserializeOwned + Send + 'static { fn on_duplicate(&self) -> DuplicateStrategy { DuplicateStrategy::default() } + + /// Optional time-to-live. Default: `None` (no TTL). + fn ttl(&self) -> Option<Duration> { + None + } + + /// When the TTL clock starts. Default: [`TtlFrom::Submission`]. + fn ttl_from(&self) -> TtlFrom { + TtlFrom::default() + } } From a6e7242fc8a00cfd63790ba55f939b55c1dcfba6 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sun, 15 Mar 2026 09:35:22 -0700 Subject: [PATCH 2/2] docs: document task TTL feature across all user-facing docs Add TTL concepts, common patterns, and dispatch loop changes to the crate-level module docs. Update configuration, persistence, events, design, glossary, query APIs, quick-start, and Tauri guide with TTL coverage including per-task/per-type/global config, expiry sweep, child inheritance, and the TaskExpired event.
--- README.md | 1 + docs/configuration.md | 80 +++++++++++++++++++++++++++++ docs/design.md | 18 ++++--- docs/glossary.md | 2 + docs/guides/tauri-upload-queue.md | 18 +++++++ docs/persistence-and-recovery.md | 31 +++++++++++- docs/progress-and-events.md | 2 + docs/query-apis.md | 2 + docs/quick-start.md | 4 ++ src/lib.rs | 83 ++++++++++++++++++++++++++++--- 10 files changed, 227 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index c345577..b0df059 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ async fn main() { - **Stay responsive** — IO-aware scheduling defers work when disk or network throughput is saturated - **Prioritize what matters** — 256-level priority queue with preemption lets urgent work interrupt background tasks - **Show progress** — executor-reported and throughput-extrapolated progress for real-time UI updates +- **Expire stale work** — configurable TTL (per-task, per-type, or global) automatically expires tasks that haven't started in time - **Avoid duplicate work** — key-based deduplication prevents the same task from being queued twice - **React to system load** — composable backpressure from any signal (disk, network, memory, battery, API limits) - **Control concurrency** — per-group limits (e.g., per S3 bucket), global limits, runtime-adjustable diff --git a/docs/configuration.md b/docs/configuration.md index 9801d6f..61694d6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -33,6 +33,8 @@ Controls scheduling behavior. Set via builder methods or pass directly to `Sched | `poll_interval` | `Duration` | 500ms | Sleep between dispatch cycles. The scheduler also wakes immediately on submit. | Lower = more responsive but slightly more CPU. 250ms is fine for interactive apps. | | `throughput_sample_size` | `i32` | 20 | Recent completions used for progress extrapolation. | More = smoother estimates but slower to adapt to changes in task behavior. 
| | `shutdown_mode` | `ShutdownMode` | `Hard` | `Hard` cancels immediately. `Graceful(Duration)` waits for running tasks. | Always use `Graceful` for desktop apps to avoid data loss. | +| `default_ttl` | `Option<Duration>` | `None` | Global TTL applied to tasks without a per-task or per-type TTL. | Set to catch stale tasks (e.g., `Duration::from_secs(3600)` for 1 hour). | +| `expiry_sweep_interval` | `Option<Duration>` | `Some(30s)` | How often the scheduler sweeps for expired tasks. `None` disables periodic sweeps (dispatch-time checks still apply). | Lower for latency-sensitive expiry; `None` if you only need dispatch-time checks. | ### Builder methods @@ -132,6 +134,80 @@ When the scheduler stops (the `CancellationToken` passed to `run()` is cancelled Both modes stop the resource sampler. **For desktop apps, always use `Graceful` to avoid interrupting in-progress uploads or file operations.** +## Task TTL (time-to-live) + +Tasks can expire automatically if they haven't started running within a configurable duration. TTL is resolved with a priority chain: **per-task > per-type > global default > none**. + +### Per-task TTL + +Set directly on a submission: + +```rust +use std::time::Duration; +use taskmill::{TaskSubmission, TtlFrom}; + +let sub = TaskSubmission::new("sync") + .payload_json(&data) + .ttl(Duration::from_secs(300)) // expire after 5 minutes + .ttl_from(TtlFrom::Submission); // clock starts at submit time (default) +``` + +`TtlFrom::FirstAttempt` starts the clock when the task is first dispatched — useful when queue wait time shouldn't count against the deadline. + +### Per-type TTL + +Register a default TTL for all tasks of a given type: + +```rust +use std::time::Duration; + +let scheduler = Scheduler::builder() + .executor_with_ttl("thumbnail", Arc::new(ThumbExec), Duration::from_secs(600)) + .build() + .await?; +``` + +Tasks submitted with an explicit `.ttl()` override the per-type default.
+ +### Global default TTL + +Catch-all for tasks without a per-task or per-type TTL: + +```rust +use std::time::Duration; + +let scheduler = Scheduler::builder() + .default_ttl(Duration::from_secs(3600)) // 1 hour + .build() + .await?; +``` + +### Expiry sweep + +The scheduler catches expired tasks in two ways: + +1. **Dispatch-time** — before dispatching a task, the scheduler checks `expires_at`. This has zero extra cost. +2. **Periodic sweep** — every `expiry_sweep_interval` (default 30s), the scheduler batch-expires pending and paused tasks whose `expires_at` has passed. Disable with `.expiry_sweep_interval(None)`. + +### Child TTL inheritance + +Children spawned via `ctx.spawn_child()` without an explicit TTL inherit the **remaining** parent TTL. A child can never outlive its parent's deadline. When a parent expires, its pending and paused children are cascade-expired. + +### Typed tasks + +Implement `ttl()` and `ttl_from()` on your `TypedTask`: + +```rust +use std::time::Duration; +use taskmill::{TypedTask, TtlFrom}; + +impl TypedTask for SyncTask { + const TASK_TYPE: &'static str = "sync"; + fn ttl(&self) -> Option<Duration> { Some(Duration::from_secs(300)) } + fn ttl_from(&self) -> TtlFrom { TtlFrom::FirstAttempt } +} +``` + ## Application state Executors often need shared services (HTTP clients, database connections, caches). Rather than capturing `Arc` per executor, register state on the builder: @@ -215,12 +291,16 @@ All `SchedulerBuilder` methods: | `store(store)` | Use a pre-opened `TaskStore`. | | `store_config(config)` | Pool size and retention settings. | | `executor(name, executor)` | Register a `TaskExecutor` by name. | +| `executor_with_ttl(name, executor, ttl)` | Register with a per-type default TTL. | | `typed_executor::<T>(executor)` | Register using `T::TASK_TYPE` as the name. | | `max_concurrency(n)` | Set initial max concurrent tasks. | | `max_retries(n)` | Set retry limit. | | `preempt_priority(p)` | Set preemption threshold. | | `poll_interval(d)` | Set dispatch cycle interval.
| | `shutdown_mode(mode)` | Set shutdown behavior. | +| `default_ttl(d)` | Global TTL for tasks without per-task or per-type TTL. | +| `expiry_sweep_interval(opt_d)` | How often to sweep for expired tasks (`None` to disable). | +| `cancel_hook_timeout(d)` | Timeout for `on_cancel` hooks. | | `pressure_source(source)` | Add a `PressureSource` to the composite. | | `throttle_policy(policy)` | Set a custom `ThrottlePolicy`. | | `with_resource_monitoring()` | Enable platform resource monitoring. | diff --git a/docs/design.md b/docs/design.md index 8abd98d..99fbb53 100644 --- a/docs/design.md +++ b/docs/design.md @@ -47,10 +47,12 @@ Submit ──► Pending ──► Running ──► Completed (moved to task_h │ │ │ └──► Paused (preempted by higher-priority work) │ │ - └─────────────────┘ (resumed when preemptors finish) + ├─────────────────┘ (resumed when preemptors finish) + │ + └──► Expired (TTL exceeded → task_history) ``` -Active states live in the `tasks` table. Terminal states (`completed`, `failed`) are atomically moved to `task_history`. +Active states live in the `tasks` table. Terminal states (`completed`, `failed`, `cancelled`, `superseded`, `expired`) are atomically moved to `task_history`. ### Data flow @@ -83,7 +85,10 @@ The scheduler's dispatch loop uses a two-step approach: first *peek* at the next ```mermaid flowchart TD - PEEK["peek_next()\n(non-mutating)"] --> GATE{"gate.admit()\nbackpressure + IO budget"} + PEEK["peek_next()\n(non-mutating)"] --> TTL{"expires_at\npassed?"} + TTL -- yes --> EXPIRE["expire_single()\n→ task_history"] + EXPIRE --> PEEK + TTL -- no --> GATE{"gate.admit()\nbackpressure + IO budget"} GATE -- rejected --> WAIT["Wait for next cycle"] GATE -- admitted --> POP["pop_by_id()\n(atomic claim)"] POP -- claimed --> SPAWN["spawn_task()"] @@ -159,9 +164,10 @@ The run loop wakes on two signals: Each cycle, the loop: 1. Checks if the scheduler is globally paused. -2. Resumes paused tasks if no active preemptors remain. -3. 
While `active_count < max_concurrency`: peek the next candidate, check the dispatch gate, pop-by-id if admitted, spawn the executor. -4. Sleep until the next signal. +2. Sweeps expired tasks (if the expiry sweep interval has elapsed). +3. Resumes paused tasks if no active preemptors remain. +4. While `active_count < max_concurrency`: peek the next candidate, check for TTL expiry, check the dispatch gate, pop-by-id if admitted, spawn the executor. +5. Sleep until the next signal. ## Retry flow diff --git a/docs/glossary.md b/docs/glossary.md index e53d823..6f75d01 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -14,4 +14,6 @@ Quick reference for terms used throughout the taskmill documentation. | **Pressure source** | Anything that signals the system is busy — disk IO, network throughput, memory usage, API rate limits, battery level. Returns a value from 0.0 (idle) to 1.0 (saturated). See [IO & Backpressure](io-and-backpressure.md#pressure-sources). | | **Task group** | A named set of tasks that share a concurrency limit. For example, you might limit uploads to a specific S3 bucket to 3 at a time. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). | | **Throttle policy** | Rules that map system pressure to dispatch decisions. The default policy defers background tasks when pressure exceeds 50% and normal tasks when it exceeds 75%, but never blocks high-priority work. See [Priorities & Preemption](priorities-and-preemption.md#throttle-behavior). | +| **TTL (time-to-live)** | A duration after which a task automatically expires if it hasn't started running. Configurable per-task, per-type, or as a global default. See [Configuration](configuration.md#task-ttl-time-to-live). | +| **TtlFrom** | Controls when the TTL clock starts: `Submission` (at submit time, the default) or `FirstAttempt` (when the task is first dispatched). See [Configuration](configuration.md#task-ttl-time-to-live). 
| | **Typed task** | A struct that implements the `TypedTask` trait, giving you compile-time type safety for task payloads, priorities, and IO budgets instead of stringly-typed submissions. See [Quick Start](quick-start.md#typed-tasks). | diff --git a/docs/guides/tauri-upload-queue.md b/docs/guides/tauri-upload-queue.md index 5af5395..8853e02 100644 --- a/docs/guides/tauri-upload-queue.md +++ b/docs/guides/tauri-upload-queue.md @@ -60,6 +60,11 @@ impl TypedTask for UploadTask { Some(self.file_path.clone()) } + fn ttl(&self) -> Option { + // Expire uploads that haven't started within 30 minutes + Some(std::time::Duration::from_secs(30 * 60)) + } + fn label(&self) -> Option { // Human-readable name for the UI let filename = std::path::Path::new(&self.file_path) @@ -288,3 +293,16 @@ If your upload target supports resumable uploads, you can store the upload sessi ### Duplicate uploads Handled automatically. The `key()` method on `UploadTask` returns the file path, so submitting the same file twice returns `SubmitOutcome::Duplicate`. The UI can show a "already queued" message. + +### Stale uploads + +The `ttl()` method on `UploadTask` expires queued uploads that haven't started within 30 minutes. Listen for `TaskExpired` events to notify the user: + +```rust +SchedulerEvent::TaskExpired { header, age } => { + notify_user(&format!( + "{} expired after {:.0}s in the queue", + header.label, age.as_secs_f64(), + )); +} +``` diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index 54ca8b3..c268bfc 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -117,6 +117,25 @@ let deleted = store.prune_history_by_count(5_000).await?; let deleted = store.prune_history_by_age(30).await?; ``` +## Task TTL and expiry + +Tasks with a TTL that haven't started running before their deadline are automatically expired. Expired tasks are moved to the history table with `HistoryStatus::Expired` — they are never silently deleted. 
+ +### What happens on expiry + +1. The task is removed from the active `tasks` table. +2. A history record is created with `status = 'expired'`. +3. A `TaskExpired` event is emitted with the task header and age. +4. If the expired task has pending or paused children, they are cascade-expired too. + +### TTL and crash recovery + +TTL deadlines (`expires_at`) are persisted in SQLite, so they survive crashes. After a restart, the scheduler's first expiry sweep picks up any tasks that expired while the process was down. + +### TTL and deduplication + +When a task expires, its dedup key is freed (since it moves to history). The same work can be submitted again after expiry. + ## Testing with in-memory stores For tests, use an in-memory database that doesn't touch the filesystem: @@ -150,8 +169,13 @@ You normally don't need to know the schema, but it's documented here for debuggi | `last_error` | TEXT | Most recent error message | | `created_at` | TEXT | ISO 8601 timestamp | | `started_at` | TEXT | Set when dispatched, cleared on pause | +| `ttl_seconds` | INTEGER | TTL duration in seconds (NULL = no TTL) | +| `ttl_from` | TEXT DEFAULT 'submission' | When TTL clock starts: `submission` or `first_attempt` | +| `expires_at` | TEXT | ISO 8601 deadline (NULL = no expiry) | -**Index:** `idx_tasks_pending(status, priority ASC, id ASC) WHERE status = 'pending'` — partial index for fast priority-ordered dispatch. +**Indexes:** +- `idx_tasks_pending(status, priority ASC, id ASC) WHERE status = 'pending'` — fast priority-ordered dispatch. +- `idx_tasks_expires(expires_at ASC) WHERE expires_at IS NOT NULL AND status IN ('pending', 'paused')` — efficient expiry sweep. 
### `task_history` — completed and failed tasks @@ -165,7 +189,10 @@ All columns from `tasks`, plus: | `actual_net_tx_bytes` | INTEGER | Reported by executor | | `completed_at` | TEXT | ISO 8601 timestamp | | `duration_ms` | INTEGER | Wall-clock duration | -| `status` | TEXT | `completed` or `failed` | +| `status` | TEXT | `completed`, `failed`, `cancelled`, `superseded`, or `expired` | +| `ttl_seconds` | INTEGER | TTL duration in seconds (NULL = no TTL) | +| `ttl_from` | TEXT DEFAULT 'submission' | When TTL clock started | +| `expires_at` | TEXT | ISO 8601 deadline (NULL = no expiry) | **Index:** `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type history queries and throughput calculations. diff --git a/docs/progress-and-events.md b/docs/progress-and-events.md index 158cdf8..10b402c 100644 --- a/docs/progress-and-events.md +++ b/docs/progress-and-events.md @@ -84,6 +84,7 @@ tokio::spawn(async move { | `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` | | `Progress { header, percent, message }` | Progress update from executor | | `Waiting { task_id, children_count }` | Parent task waiting for children to complete | +| `TaskExpired { header, age }` | Task expired (TTL exceeded) — `age` is the time since the TTL clock started | | `Paused` | Scheduler globally paused via `pause_all()` | | `Resumed` | Scheduler resumed via `resume_all()` | @@ -98,6 +99,7 @@ Task-specific events share a `TaskEventHeader` with `task_id`, `task_type`, `key | Error alerting | `Failed` where `will_retry` is false | | A "pause/resume" button | `Paused`, `Resumed` | | Upload status indicators | `Dispatched`, `Progress`, `Completed`, `Failed`, `Preempted` | +| Stale task cleanup UI | `TaskExpired` | ## Querying progress diff --git a/docs/query-apis.md b/docs/query-apis.md index cc54fe2..4d05f43 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -34,6 +34,8 @@ Use these queries to build dashboards, debug stuck 
tasks, and gather analytics a | `history_by_key(key)` | `Vec` | All past runs matching a dedup key. | | `failed_tasks(limit)` | `Vec` | Recent failures with error messages. | +History records include a `status` field that can be `completed`, `failed`, `cancelled`, `superseded`, or `expired`. Filter by status to find expired tasks (e.g., for analytics on TTL effectiveness). + ## Aggregate queries | Method | Returns | Description | diff --git a/docs/quick-start.md b/docs/quick-start.md index 71c3ad3..0a9c0e2 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -141,6 +141,10 @@ impl TypedTask for ResizeTask { fn expected_io(&self) -> IoBudget { IoBudget::disk(4096, 1024) } fn priority(&self) -> Priority { Priority::NORMAL } + + // Optional: expire if not started within 10 minutes. + // fn ttl(&self) -> Option { Some(Duration::from_secs(600)) } + // fn ttl_from(&self) -> TtlFrom { TtlFrom::Submission } } // Submit: diff --git a/src/lib.rs b/src/lib.rs index fffe7f3..4f2787f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ //! - Reports [byte-level transfer progress](TaskProgress) with EWMA-smoothed throughput and ETA //! - Supports [task cancellation](Scheduler::cancel) with history recording and cleanup hooks //! - Supports [task superseding](DuplicateStrategy::Supersede) for atomic cancel-and-replace +//! - Supports [task TTL](TtlFrom) with automatic expiry, per-type defaults, and child inheritance //! - Supports [graceful shutdown](ShutdownMode) with configurable drain timeout //! //! # Concepts @@ -30,6 +31,7 @@ //! ↓ ↘ paused ↗ ↘ failed (retryable → pending) //! cancelled ↘ failed (permanent → history) //! superseded ↘ cancelled (via cancel() or supersede) +//! expired (TTL) ↘ expired (TTL, cascade to children) //! ``` //! //! 1. **Submit** — [`Scheduler::submit`] (or [`submit_typed`](Scheduler::submit_typed), @@ -116,6 +118,31 @@ //! [`fail_fast`](TaskSubmission::fail_fast) is `true` (the default), siblings //! 
are cancelled and the parent fails immediately. //! +//! ## Task TTL & automatic expiry +//! +//! Tasks can be given a time-to-live (TTL) so they expire automatically if they +//! haven't started running within the allowed window. TTL is resolved with a +//! priority chain: **per-task** > **per-type** > **global default** > none. +//! +//! The TTL clock can start at submission time ([`TtlFrom::Submission`], the +//! default) or when the task is first dispatched ([`TtlFrom::FirstAttempt`]). +//! Expired tasks are moved to history with [`HistoryStatus::Expired`] and a +//! [`SchedulerEvent::TaskExpired`] event is emitted. +//! +//! The scheduler catches expired tasks in two places: +//! - **Dispatch time** — when a task is about to be dispatched, its `expires_at` +//! is checked first. +//! - **Periodic sweep** — a background sweep (default every 30s, configurable +//! via [`SchedulerBuilder::expiry_sweep_interval`]) batch-expires pending and +//! paused tasks whose deadline has passed. +//! +//! When a parent task expires, its pending and paused children are +//! cascade-expired. +//! +//! Child tasks without an explicit TTL inherit the **remaining** parent TTL +//! (with `TtlFrom::Submission`), so a child can never outlive its parent's +//! deadline. +//! //! ## Cancellation //! //! Tasks can be cancelled individually via [`Scheduler::cancel`], or in bulk @@ -418,6 +445,47 @@ //! scheduler.cancel_where(|t| t.priority == Priority::BACKGROUND).await?; //! ``` //! +//! ## Task TTL +//! +//! Set a TTL on individual submissions, on a task type, or as a global default: +//! +//! ```ignore +//! use std::time::Duration; +//! use taskmill::{Scheduler, TaskSubmission, TtlFrom}; +//! +//! // Per-task TTL — expire if not started within 5 minutes. +//! let sub = TaskSubmission::new("sync") +//! .payload_json(&data) +//! .ttl(Duration::from_secs(300)) +//! .ttl_from(TtlFrom::Submission); +//! scheduler.submit(&sub).await?; +//! +//! 
// Per-type default — every "thumbnail" task gets a 10-minute TTL +//! // unless overridden per-task. +//! let scheduler = Scheduler::builder() +//! .store_path("tasks.db") +//! .executor_with_ttl("thumbnail", Arc::new(ThumbExec), Duration::from_secs(600)) +//! .build() +//! .await?; +//! +//! // Global default — catch-all for any task without a per-task or per-type TTL. +//! let scheduler = Scheduler::builder() +//! .store_path("tasks.db") +//! .default_ttl(Duration::from_secs(3600)) // 1 hour +//! .build() +//! .await?; +//! ``` +//! +//! For typed tasks, implement [`TypedTask::ttl`] and [`TypedTask::ttl_from`]: +//! +//! ```ignore +//! impl TypedTask for SyncTask { +//! const TASK_TYPE: &'static str = "sync"; +//! fn ttl(&self) -> Option<Duration> { Some(Duration::from_secs(300)) } +//! fn ttl_from(&self) -> TtlFrom { TtlFrom::FirstAttempt } +//! } +//! ``` +//! //! ## Task superseding //! //! Use [`DuplicateStrategy::Supersede`] for "latest-value-wins" scenarios @@ -445,16 +513,19 @@ //! [`Notify`](tokio::sync::Notify)), the //! [`poll_interval`](SchedulerBuilder::poll_interval) elapsed (default //! 500ms), or the cancellation token fired. -//! 2. Paused tasks are resumed if no active preemptors exist at their +//! 2. Expired tasks are swept (if the sweep interval has elapsed). +//! 3. Paused tasks are resumed if no active preemptors exist at their //! priority level. -//! 3. Pending finalizers (parents whose children all completed) are +//! 4. Pending finalizers (parents whose children all completed) are //! dispatched first. -//! 4. The highest-priority pending task is peeked (without claiming it). -//! 5. The dispatch gate checks concurrency limits, group concurrency, +//! 5. The highest-priority pending task is peeked (without claiming it). +//! If it has expired, it is moved to history and the next candidate is +//! tried. +//! 6. The dispatch gate checks concurrency limits, group concurrency, //! IO budget, and backpressure.
If the gate rejects, no slot is consumed. -//! 6. If admitted, the task is atomically claimed (`peek` → `pop_by_id`) +//! 7. If admitted, the task is atomically claimed (`peek` → `pop_by_id`) //! and spawned as a Tokio task. -//! 7. Steps 4–6 repeat until the queue is empty or the gate rejects. +//! 8. Steps 5–7 repeat until the queue is empty or the gate rejects. //! //! # Feature flags //!