Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 89 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! Taskmill provides a generic task scheduling system that:
//! - Persists tasks to SQLite so the queue survives restarts
//! - Schedules by priority (0 = highest, 255 = lowest) with [named tiers](Priority)
//! - Deduplicates tasks by key — submitting an already-queued key is a no-op
//! - Deduplicates tasks by key with configurable [`DuplicateStrategy`] (skip, supersede, or reject)
//! - Tracks expected and actual IO bytes (disk and network) per task for budget-based scheduling
//! - Monitors system CPU, disk, and network throughput to adjust concurrency
//! - Supports [composable backpressure](PressureSource) from arbitrary external sources
Expand All @@ -15,6 +15,8 @@
//! - Supports [batch submission](Scheduler::submit_batch) with intra-batch dedup and chunking
//! - Emits [lifecycle events](SchedulerEvent) including progress for UI integration
//! - Reports [byte-level transfer progress](TaskProgress) with EWMA-smoothed throughput and ETA
//! - Supports [task cancellation](Scheduler::cancel) with history recording and cleanup hooks
//! - Supports [task superseding](DuplicateStrategy::Supersede) for atomic cancel-and-replace
//! - Supports [graceful shutdown](ShutdownMode) with configurable drain timeout
//!
//! # Concepts
Expand All @@ -25,8 +27,9 @@
//!
//! ```text
//! submit → pending → running → completed
//! ↘ paused ↗ ↘ failed (retryable → pending)
//! ↘ failed (permanent → history)
//! ↓ ↘ paused ↗ ↘ failed (retryable → pending)
//! cancelled ↘ failed (permanent → history)
//! superseded ↘ cancelled (via cancel() or supersede)
//! ```
//!
//! 1. **Submit** — [`Scheduler::submit`] (or [`submit_typed`](Scheduler::submit_typed),
Expand All @@ -43,13 +46,22 @@
//! [`SchedulerBuilder::max_retries`]); a non-retryable error moves it to
//! history as failed.
//!
//! ## Deduplication
//! ## Deduplication & duplicate strategies
//!
//! Every task has a dedup key derived from its type name and either an explicit
//! key string or the serialized payload (via SHA-256). Submitting a task whose
//! key already exists returns [`SubmitOutcome::Duplicate`] (or
//! [`Upgraded`](SubmitOutcome::Upgraded) if the new submission has higher
//! priority). This makes it safe to call `submit` idempotently.
//! key string or the serialized payload (via SHA-256). What happens when a
//! submission's key matches an existing task depends on the
//! [`DuplicateStrategy`]:
//!
//! - **`Skip`** (default) — attempt a priority upgrade or requeue, otherwise
//! return [`SubmitOutcome::Duplicate`]. Safe for idempotent `submit` calls.
//! - **`Supersede`** — cancel the existing task (recording it in history as
//! [`HistoryStatus::Superseded`]) and replace it with the new submission.
//! For running tasks the cancellation token is fired, the
//! [`on_cancel`](TaskExecutor::on_cancel) hook runs, and children are
//! cascade-cancelled. Returns [`SubmitOutcome::Superseded`].
//! - **`Reject`** — return [`SubmitOutcome::Rejected`] without modifying the
//! existing task.
//!
//! Within a single [`submit_batch`](Scheduler::submit_batch) call, intra-batch
//! dedup applies a **last-wins** policy: if two tasks share a dedup key, only
Expand Down Expand Up @@ -104,6 +116,22 @@
//! [`fail_fast`](TaskSubmission::fail_fast) is `true` (the default), siblings
//! are cancelled and the parent fails immediately.
//!
//! ## Cancellation
//!
//! Tasks can be cancelled individually via [`Scheduler::cancel`], or in bulk
//! via [`Scheduler::cancel_group`], [`Scheduler::cancel_type`], or
//! [`Scheduler::cancel_where`]. Cancelled tasks are recorded in the history
//! table as [`HistoryStatus::Cancelled`] rather than silently deleted.
//!
//! For running tasks, cancellation fires the
//! [`on_cancel`](TaskExecutor::on_cancel) hook (with a configurable
//! [`cancel_hook_timeout`](SchedulerBuilder::cancel_hook_timeout)) so
//! executors can clean up external resources — for example, aborting an S3
//! multipart upload. Executors can check for cancellation cooperatively via
//! [`TaskContext::check_cancelled`].
//!
//! Cancelling a parent task cascade-cancels all its children.
//!
//! ## Byte-level progress
//!
//! For long-running transfers (file copies, uploads, downloads), executors can
Expand Down Expand Up @@ -359,6 +387,56 @@
//! }
//! ```
//!
//! ## Cancellation & cleanup hooks
//!
//! Cancel tasks individually or in bulk. Implement
//! [`on_cancel`](TaskExecutor::on_cancel) to clean up external resources:
//!
//! ```ignore
//! impl TaskExecutor for UploadExecutor {
//! async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
//! // Cooperatively check for cancellation in long loops.
//! for chunk in chunks {
//! ctx.check_cancelled()?;
//! upload_chunk(chunk).await?;
//! }
//! Ok(())
//! }
//!
//! async fn on_cancel<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
//! // Abort the in-progress multipart upload.
//! let upload: Upload = ctx.payload()?;
//! abort_multipart(&upload.upload_id).await?;
//! Ok(())
//! }
//! }
//!
//! // Cancel by ID, group, type, or predicate:
//! scheduler.cancel(task_id).await?;
//! scheduler.cancel_group("s3://my-bucket").await?;
//! scheduler.cancel_type("upload").await?;
//! scheduler.cancel_where(|t| t.priority == Priority::BACKGROUND).await?;
//! ```
//!
//! ## Task superseding
//!
//! Use [`DuplicateStrategy::Supersede`] for "latest-value-wins" scenarios
//! like continuous file sync, where re-submitting an already-queued task
//! should atomically cancel the old one and replace it:
//!
//! ```ignore
//! use taskmill::{TaskSubmission, DuplicateStrategy};
//!
//! let sub = TaskSubmission::new("sync-file")
//! .key("path/to/file.txt")
//! .payload_json(&new_content)
//! .on_duplicate(DuplicateStrategy::Supersede);
//!
//! let outcome = scheduler.submit(&sub).await?;
//! // outcome is Superseded { new_task_id, replaced_task_id } if a duplicate existed,
//! // or Inserted(id) if this was the first submission.
//! ```
//!
//! # How the dispatch loop works
//!
//! Understanding the run loop helps when tuning [`SchedulerConfig`]:
Expand Down Expand Up @@ -407,9 +485,9 @@ pub use scheduler::{
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{
generate_dedup_key, BatchOutcome, BatchSubmission, HistoryStatus, IoBudget, ParentResolution,
SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus,
TaskSubmission, TypeStats, TypedTask,
generate_dedup_key, BatchOutcome, BatchSubmission, DuplicateStrategy, HistoryStatus, IoBudget,
ParentResolution, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord,
TaskStatus, TaskSubmission, TypeStats, TypedTask,
};

#[cfg(feature = "sysinfo-monitor")]
Expand Down
11 changes: 10 additions & 1 deletion src/scheduler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ pub enum SchedulerEvent {
Preempted(TaskEventHeader),
/// A task was cancelled by the application.
Cancelled(TaskEventHeader),
/// A task was superseded by a new submission with the same dedup key.
Superseded {
/// Header of the old (replaced) task.
old: TaskEventHeader,
/// ID of the newly inserted replacement task.
new_task_id: i64,
},
/// Progress update from a running task.
Progress {
header: TaskEventHeader,
Expand Down Expand Up @@ -106,7 +113,9 @@ impl SchedulerEvent {
Self::Dispatched(h) | Self::Completed(h) | Self::Preempted(h) | Self::Cancelled(h) => {
Some(h)
}
Self::Failed { header, .. } | Self::Progress { header, .. } => Some(header),
Self::Failed { header, .. }
| Self::Progress { header, .. }
| Self::Superseded { old: header, .. } => Some(header),
Self::Waiting { .. } | Self::BatchSubmitted { .. } | Self::Paused | Self::Resumed => {
None
}
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! integration. Use [`SchedulerBuilder`] for ergonomic construction.
//!
//! The `Scheduler` implementation is split across focused submodules:
//! - [`submit`] — task submission, lookup, and cancellation
//! - [`submit`] — task submission, lookup, cancellation, and superseding
//! - [`run_loop`] — the main event loop, dispatch, and shutdown
//! - [`control`] — pause/resume, concurrency limits, and group limits
//! - [`queries`] — read-only queries (active tasks, progress, snapshots)
Expand Down
86 changes: 83 additions & 3 deletions src/scheduler/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,29 @@ impl Scheduler {
pub async fn submit(&self, sub: &TaskSubmission) -> Result<SubmitOutcome, StoreError> {
let outcome = self.inner.store.submit(sub).await?;

if !matches!(outcome, SubmitOutcome::Duplicate) {
// Handle superseded tasks.
if let SubmitOutcome::Superseded {
new_task_id,
replaced_task_id,
} = &outcome
{
self.handle_superseded_active(*replaced_task_id).await;
// Emit superseded event — we need the old record's header.
// The old task is already in history, so build header from
// submission info.
let old_header = super::event::TaskEventHeader {
task_id: *replaced_task_id,
task_type: sub.task_type.clone(),
key: sub.effective_key(),
label: sub.label.clone(),
};
let _ = self.inner.event_tx.send(SchedulerEvent::Superseded {
old: old_header,
new_task_id: *new_task_id,
});
}

if !matches!(outcome, SubmitOutcome::Duplicate | SubmitOutcome::Rejected) {
// Preempt if this is a high-priority task.
if sub.priority.value() <= self.inner.preempt_priority.value() {
self.inner
Expand Down Expand Up @@ -52,18 +74,41 @@ impl Scheduler {
) -> Result<BatchOutcome, StoreError> {
let results = self.inner.store.submit_batch(submissions).await?;

// Handle superseded tasks.
for (sub, outcome) in submissions.iter().zip(results.iter()) {
if let SubmitOutcome::Superseded {
new_task_id,
replaced_task_id,
} = outcome
{
self.handle_superseded_active(*replaced_task_id).await;
let old_header = super::event::TaskEventHeader {
task_id: *replaced_task_id,
task_type: sub.task_type.clone(),
key: sub.effective_key(),
label: sub.label.clone(),
};
let _ = self.inner.event_tx.send(SchedulerEvent::Superseded {
old: old_header,
new_task_id: *new_task_id,
});
}
}

// Find the highest (lowest numeric value) priority among tasks that
// were inserted or had their priority upgraded.
let best_priority = submissions
.iter()
.zip(results.iter())
.filter(|(_, outcome)| !matches!(outcome, SubmitOutcome::Duplicate))
.filter(|(_, outcome)| {
!matches!(outcome, SubmitOutcome::Duplicate | SubmitOutcome::Rejected)
})
.map(|(sub, _)| sub.priority)
.min_by_key(|p| p.value());

let any_changed = results
.iter()
.any(|o| !matches!(o, SubmitOutcome::Duplicate));
.any(|o| !matches!(o, SubmitOutcome::Duplicate | SubmitOutcome::Rejected));

if let Some(priority) = best_priority {
if priority.value() <= self.inner.preempt_priority.value() {
Expand Down Expand Up @@ -241,6 +286,41 @@ impl Scheduler {
Ok(cancelled)
}

/// Handle the scheduler-side effects of a superseded task.
///
/// If the replaced task was running (in the active map), cancel its token,
/// fire the on_cancel hook, cascade-cancel its children, and emit events.
async fn handle_superseded_active(&self, replaced_task_id: i64) {
// Cancel children of the replaced task.
if let Ok(running_child_ids) = self.inner.store.cancel_children(replaced_task_id).await {
for child_id in &running_child_ids {
if let Some(at) = self.inner.active.remove(*child_id) {
at.token.cancel();
let _ = self
.inner
.store
.cancel_to_history_with_record(&at.record)
.await;
self.fire_on_cancel(&at.record).await;
let _ = self
.inner
.event_tx
.send(SchedulerEvent::Cancelled(at.record.event_header()));
}
}
}

// Cancel the replaced task itself if it's running.
if let Some(at) = self.inner.active.remove(replaced_task_id) {
at.token.cancel();
self.fire_on_cancel(&at.record).await;
let _ = self
.inner
.event_tx
.send(SchedulerEvent::Cancelled(at.record.event_header()));
}
}

/// Fire the `on_cancel` hook for a task (fire-and-forget).
///
/// Takes a snapshot of the app state and spawns a tokio task that runs
Expand Down
Loading
Loading