1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async fn main() {
- **Stay responsive** — IO-aware scheduling defers work when disk or network throughput is saturated
- **Prioritize what matters** — 256-level priority queue with preemption lets urgent work interrupt background tasks
- **Show progress** — executor-reported and throughput-extrapolated progress for real-time UI updates
- **Expire stale work** — configurable TTL (per-task, per-type, or global) automatically expires tasks that haven't started in time
- **Avoid duplicate work** — key-based deduplication prevents the same task from being queued twice
- **React to system load** — composable backpressure from any signal (disk, network, memory, battery, API limits)
- **Control concurrency** — per-group limits (e.g., per S3 bucket), global limits, runtime-adjustable
80 changes: 80 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Controls scheduling behavior. Set via builder methods or pass directly to `Sched
| `poll_interval` | `Duration` | 500ms | Sleep between dispatch cycles. The scheduler also wakes immediately on submit. | Lower = more responsive but slightly more CPU. 250ms is fine for interactive apps. |
| `throughput_sample_size` | `i32` | 20 | Recent completions used for progress extrapolation. | More = smoother estimates but slower to adapt to changes in task behavior. |
| `shutdown_mode` | `ShutdownMode` | `Hard` | `Hard` cancels immediately. `Graceful(Duration)` waits for running tasks. | Always use `Graceful` for desktop apps to avoid data loss. |
| `default_ttl` | `Option<Duration>` | `None` | Global TTL applied to tasks without a per-task or per-type TTL. | Set to catch stale tasks (e.g., `Duration::from_secs(3600)` for 1 hour). |
| `expiry_sweep_interval` | `Option<Duration>` | `Some(30s)` | How often the scheduler sweeps for expired tasks. `None` disables periodic sweeps (dispatch-time checks still apply). | Lower for latency-sensitive expiry; `None` if you only need dispatch-time checks. |

### Builder methods

Expand Down Expand Up @@ -132,6 +134,80 @@ When the scheduler stops (the `CancellationToken` passed to `run()` is cancelled

Both modes stop the resource sampler. **For desktop apps, always use `Graceful` to avoid interrupting in-progress uploads or file operations.**

## Task TTL (time-to-live)

Tasks can expire automatically if they haven't started running within a configurable duration. TTL is resolved with a priority chain: **per-task > per-type > global default > none**.
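
The priority chain above can be read as a simple fallback over three optional values. A minimal sketch, assuming each level is represented as an `Option<Duration>`; the `resolve_ttl` helper is hypothetical and is not part of taskmill's API:

```rust
use std::time::Duration;

/// Hypothetical helper (not a taskmill function): pick the effective TTL
/// using the documented chain: per-task > per-type > global default > none.
fn resolve_ttl(
    per_task: Option<Duration>,
    per_type: Option<Duration>,
    global_default: Option<Duration>,
) -> Option<Duration> {
    // `Option::or` keeps the first `Some` it sees, so earlier levels win.
    per_task.or(per_type).or(global_default)
}
```

With a 5-minute per-task TTL, a 10-minute per-type TTL, and a 1-hour global default, this sketch returns 5 minutes; with none of the three set, the task never expires.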

### Per-task TTL

Set directly on a submission:

```rust
use std::time::Duration;
use taskmill::{TaskSubmission, TtlFrom};

let sub = TaskSubmission::new("sync")
    .payload_json(&data)
    .ttl(Duration::from_secs(300)) // expire after 5 minutes
    .ttl_from(TtlFrom::Submission); // clock starts at submit time (default)
```

`TtlFrom::FirstAttempt` starts the clock when the task is first dispatched — useful when queue wait time shouldn't count against the deadline.
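
To illustrate how the two clock modes could translate into a deadline, here is a minimal sketch. It uses a local stand-in enum rather than taskmill's `TtlFrom`, and the `deadline` function and its parameters are hypothetical; the assumption that a `FirstAttempt` task has no deadline until it has been dispatched once is an inference from the description above, not confirmed library behavior.

```rust
use std::time::{Duration, SystemTime};

/// Local stand-in for taskmill's `TtlFrom`, so the sketch compiles on its own.
#[derive(Clone, Copy, Debug, PartialEq)]
enum TtlFrom {
    Submission,
    FirstAttempt,
}

/// Hypothetical deadline computation (not taskmill's implementation).
fn deadline(
    ttl: Duration,
    ttl_from: TtlFrom,
    submitted_at: SystemTime,
    first_attempt_at: Option<SystemTime>,
) -> Option<SystemTime> {
    match ttl_from {
        // Queue wait time counts against the deadline.
        TtlFrom::Submission => Some(submitted_at + ttl),
        // Assumed: no deadline exists until the task is first dispatched.
        TtlFrom::FirstAttempt => first_attempt_at.map(|t| t + ttl),
    }
}
```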

### Per-type TTL

Register a default TTL for all tasks of a given type:

```rust
use std::sync::Arc;
use std::time::Duration;

let scheduler = Scheduler::builder()
    .executor_with_ttl("thumbnail", Arc::new(ThumbExec), Duration::from_secs(600))
    .build()
    .await?;
```

Tasks submitted with an explicit `.ttl()` override the per-type default.

### Global default TTL

Catch-all for tasks without a per-task or per-type TTL:

```rust
use std::time::Duration;

let scheduler = Scheduler::builder()
    .default_ttl(Duration::from_secs(3600)) // 1 hour
    .build()
    .await?;
```

### Expiry sweep

The scheduler catches expired tasks in two ways:

1. **Dispatch-time** — before dispatching a task, the scheduler checks `expires_at`. This has zero extra cost.
2. **Periodic sweep** — every `expiry_sweep_interval` (default 30s), the scheduler batch-expires pending and paused tasks. Disable with `.expiry_sweep_interval(None)`.
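
The periodic sweep can be pictured as a filter over pending and paused tasks. The following is an in-memory sketch only: the real scheduler works against SQLite, and the `Task`, `Status`, and `sweep_expired` names are hypothetical. Treating a deadline equal to `now` as expired is also an assumption.

```rust
use std::time::SystemTime;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Status {
    Pending,
    Paused,
    Running,
}

#[derive(Debug)]
struct Task {
    id: u64,
    status: Status,
    expires_at: Option<SystemTime>,
}

/// Hypothetical sweep: removes and returns tasks whose deadline has passed.
/// Only pending and paused tasks are eligible; running tasks are left alone.
fn sweep_expired(tasks: &mut Vec<Task>, now: SystemTime) -> Vec<Task> {
    let (expired, live): (Vec<Task>, Vec<Task>) = tasks.drain(..).partition(|t| {
        matches!(t.status, Status::Pending | Status::Paused)
            && t.expires_at.map_or(false, |deadline| deadline <= now)
    });
    *tasks = live;
    expired
}
```

Tasks without an `expires_at` are never touched by this sketch, which matches the schema note that `NULL` means no expiry.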

### Child TTL inheritance

Children spawned via `ctx.spawn_child()` without an explicit TTL inherit the **remaining** parent TTL. A child can never outlive its parent's deadline. When a parent expires, its pending and paused children are cascade-expired.
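
One way to read the inheritance rule is as a clamp against the parent's remaining time. A minimal sketch, assuming the parent's deadline is available as an `Option<SystemTime>`; `child_ttl` is a hypothetical helper, and clamping an explicit child TTL to the parent's remaining time is inferred from "a child can never outlive its parent's deadline":

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper (not taskmill's implementation): effective TTL for a
/// child spawned at `now`, given an optional explicit TTL and parent deadline.
fn child_ttl(
    explicit: Option<Duration>,
    parent_expires_at: Option<SystemTime>,
    now: SystemTime,
) -> Option<Duration> {
    // Remaining parent budget; zero if the parent's deadline already passed.
    let remaining = parent_expires_at
        .map(|deadline| deadline.duration_since(now).unwrap_or(Duration::ZERO));
    match (explicit, remaining) {
        // Explicit TTL is clamped so the child cannot outlive the parent.
        (Some(e), Some(r)) => Some(e.min(r)),
        // Parent has no deadline: the explicit TTL applies unchanged.
        (Some(e), None) => Some(e),
        // No explicit TTL: inherit whatever the parent has left (possibly none).
        (None, r) => r,
    }
}
```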

### Typed tasks

Implement `ttl()` and `ttl_from()` on your `TypedTask`:

```rust
use std::time::Duration;
use taskmill::{TypedTask, TtlFrom};

impl TypedTask for SyncTask {
    const TASK_TYPE: &'static str = "sync";
    fn ttl(&self) -> Option<Duration> { Some(Duration::from_secs(300)) }
    fn ttl_from(&self) -> TtlFrom { TtlFrom::FirstAttempt }
}
```

## Application state

Executors often need shared services (HTTP clients, database connections, caches). Rather than capturing `Arc<T>` per executor, register state on the builder:
Expand Down Expand Up @@ -215,12 +291,16 @@ All `SchedulerBuilder` methods:
| `store(store)` | Use a pre-opened `TaskStore`. |
| `store_config(config)` | Pool size and retention settings. |
| `executor(name, executor)` | Register a `TaskExecutor` by name. |
| `executor_with_ttl(name, executor, ttl)` | Register with a per-type default TTL. |
| `typed_executor::<T>(executor)` | Register using `T::TASK_TYPE` as the name. |
| `max_concurrency(n)` | Set initial max concurrent tasks. |
| `max_retries(n)` | Set retry limit. |
| `preempt_priority(p)` | Set preemption threshold. |
| `poll_interval(d)` | Set dispatch cycle interval. |
| `shutdown_mode(mode)` | Set shutdown behavior. |
| `default_ttl(d)` | Global TTL for tasks without per-task or per-type TTL. |
| `expiry_sweep_interval(opt_d)` | How often to sweep for expired tasks (`None` to disable). |
| `cancel_hook_timeout(d)` | Timeout for `on_cancel` hooks. |
| `pressure_source(source)` | Add a `PressureSource` to the composite. |
| `throttle_policy(policy)` | Set a custom `ThrottlePolicy`. |
| `with_resource_monitoring()` | Enable platform resource monitoring. |
18 changes: 12 additions & 6 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ Submit ──► Pending ──► Running ──► Completed (moved to task_h
│ │
│ └──► Paused (preempted by higher-priority work)
│ │
└─────────────────┘ (resumed when preemptors finish)
├─────────────────┘ (resumed when preemptors finish)
└──► Expired (TTL exceeded → task_history)
```

Active states live in the `tasks` table. Terminal states (`completed`, `failed`) are atomically moved to `task_history`.
Active states live in the `tasks` table. Terminal states (`completed`, `failed`, `cancelled`, `superseded`, `expired`) are atomically moved to `task_history`.

### Data flow

Expand Down Expand Up @@ -83,7 +85,10 @@ The scheduler's dispatch loop uses a two-step approach: first *peek* at the next

```mermaid
flowchart TD
PEEK["peek_next()\n(non-mutating)"] --> GATE{"gate.admit()\nbackpressure + IO budget"}
PEEK["peek_next()\n(non-mutating)"] --> TTL{"expires_at\npassed?"}
TTL -- yes --> EXPIRE["expire_single()\n→ task_history"]
EXPIRE --> PEEK
TTL -- no --> GATE{"gate.admit()\nbackpressure + IO budget"}
GATE -- rejected --> WAIT["Wait for next cycle"]
GATE -- admitted --> POP["pop_by_id()\n(atomic claim)"]
POP -- claimed --> SPAWN["spawn_task()"]
Expand Down Expand Up @@ -159,9 +164,10 @@ The run loop wakes on two signals:
Each cycle, the loop:

1. Checks if the scheduler is globally paused.
2. Resumes paused tasks if no active preemptors remain.
3. While `active_count < max_concurrency`: peek the next candidate, check the dispatch gate, pop-by-id if admitted, spawn the executor.
4. Sleep until the next signal.
2. Sweeps expired tasks (if the expiry sweep interval has elapsed).
3. Resumes paused tasks if no active preemptors remain.
4. While `active_count < max_concurrency`: peek the next candidate, check for TTL expiry, check the dispatch gate, pop-by-id if admitted, spawn the executor.
5. Sleep until the next signal.

## Retry flow

2 changes: 2 additions & 0 deletions docs/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ Quick reference for terms used throughout the taskmill documentation.
| **Pressure source** | Anything that signals the system is busy — disk IO, network throughput, memory usage, API rate limits, battery level. Returns a value from 0.0 (idle) to 1.0 (saturated). See [IO & Backpressure](io-and-backpressure.md#pressure-sources). |
| **Task group** | A named set of tasks that share a concurrency limit. For example, you might limit uploads to a specific S3 bucket to 3 at a time. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). |
| **Throttle policy** | Rules that map system pressure to dispatch decisions. The default policy defers background tasks when pressure exceeds 50% and normal tasks when it exceeds 75%, but never blocks high-priority work. See [Priorities & Preemption](priorities-and-preemption.md#throttle-behavior). |
| **TTL (time-to-live)** | A duration after which a task automatically expires if it hasn't started running. Configurable per-task, per-type, or as a global default. See [Configuration](configuration.md#task-ttl-time-to-live). |
| **TtlFrom** | Controls when the TTL clock starts: `Submission` (at submit time, the default) or `FirstAttempt` (when the task is first dispatched). See [Configuration](configuration.md#task-ttl-time-to-live). |
| **Typed task** | A struct that implements the `TypedTask` trait, giving you compile-time type safety for task payloads, priorities, and IO budgets instead of stringly-typed submissions. See [Quick Start](quick-start.md#typed-tasks). |
18 changes: 18 additions & 0 deletions docs/guides/tauri-upload-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ impl TypedTask for UploadTask {
        Some(self.file_path.clone())
    }

    fn ttl(&self) -> Option<std::time::Duration> {
        // Expire uploads that haven't started within 30 minutes
        Some(std::time::Duration::from_secs(30 * 60))
    }

    fn label(&self) -> Option<String> {
        // Human-readable name for the UI
        let filename = std::path::Path::new(&self.file_path)
Expand Down Expand Up @@ -288,3 +293,16 @@ If your upload target supports resumable uploads, you can store the upload sessi
### Duplicate uploads

Handled automatically. The `key()` method on `UploadTask` returns the file path, so submitting the same file twice returns `SubmitOutcome::Duplicate`. The UI can show an "already queued" message.

### Stale uploads

The `ttl()` method on `UploadTask` expires queued uploads that haven't started within 30 minutes. Listen for `TaskExpired` events to notify the user:

```rust
SchedulerEvent::TaskExpired { header, age } => {
    notify_user(&format!(
        "{} expired after {:.0}s in the queue",
        header.label, age.as_secs_f64(),
    ));
}
```
31 changes: 29 additions & 2 deletions docs/persistence-and-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ let deleted = store.prune_history_by_count(5_000).await?;
let deleted = store.prune_history_by_age(30).await?;
```

## Task TTL and expiry

Tasks with a TTL that haven't started running before their deadline are automatically expired. Expired tasks are moved to the history table with `HistoryStatus::Expired` — they are never silently deleted.

### What happens on expiry

1. The task is removed from the active `tasks` table.
2. A history record is created with `status = 'expired'`.
3. A `TaskExpired` event is emitted with the task header and age.
4. If the expired task has pending or paused children, they are cascade-expired too.

### TTL and crash recovery

TTL deadlines (`expires_at`) are persisted in SQLite, so they survive crashes. After a restart, the scheduler's first expiry sweep picks up any tasks that expired while the process was down.

### TTL and deduplication

When a task expires, its dedup key is freed (since it moves to history). The same work can be submitted again after expiry.
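
The effect on deduplication can be sketched with a set of active keys. This is an in-memory illustration only (the real store is SQLite-backed), and `ActiveKeys` with its methods is hypothetical:

```rust
use std::collections::HashSet;

/// Hypothetical in-memory model of dedup keys held by active tasks.
#[derive(Default)]
struct ActiveKeys {
    keys: HashSet<String>,
}

impl ActiveKeys {
    /// Returns `true` if the task was accepted, `false` if it is a duplicate.
    fn submit(&mut self, key: &str) -> bool {
        self.keys.insert(key.to_string())
    }

    /// Expiry moves the task to history, which frees its key.
    fn expire(&mut self, key: &str) {
        self.keys.remove(key);
    }
}
```

In this model a second submission with the same key is rejected while the first is still active, and accepted again once the first has expired.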

## Testing with in-memory stores

For tests, use an in-memory database that doesn't touch the filesystem:
Expand Down Expand Up @@ -150,8 +169,13 @@ You normally don't need to know the schema, but it's documented here for debuggi
| `last_error` | TEXT | Most recent error message |
| `created_at` | TEXT | ISO 8601 timestamp |
| `started_at` | TEXT | Set when dispatched, cleared on pause |
| `ttl_seconds` | INTEGER | TTL duration in seconds (NULL = no TTL) |
| `ttl_from` | TEXT DEFAULT 'submission' | When TTL clock starts: `submission` or `first_attempt` |
| `expires_at` | TEXT | ISO 8601 deadline (NULL = no expiry) |

**Index:** `idx_tasks_pending(status, priority ASC, id ASC) WHERE status = 'pending'` — partial index for fast priority-ordered dispatch.
**Indexes:**
- `idx_tasks_pending(status, priority ASC, id ASC) WHERE status = 'pending'` — fast priority-ordered dispatch.
- `idx_tasks_expires(expires_at ASC) WHERE expires_at IS NOT NULL AND status IN ('pending', 'paused')` — efficient expiry sweep.

### `task_history` — completed and failed tasks

Expand All @@ -165,7 +189,10 @@ All columns from `tasks`, plus:
| `actual_net_tx_bytes` | INTEGER | Reported by executor |
| `completed_at` | TEXT | ISO 8601 timestamp |
| `duration_ms` | INTEGER | Wall-clock duration |
| `status` | TEXT | `completed` or `failed` |
| `status` | TEXT | `completed`, `failed`, `cancelled`, `superseded`, or `expired` |
| `ttl_seconds` | INTEGER | TTL duration in seconds (NULL = no TTL) |
| `ttl_from` | TEXT DEFAULT 'submission' | When TTL clock started |
| `expires_at` | TEXT | ISO 8601 deadline (NULL = no expiry) |

**Index:** `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type history queries and throughput calculations.

2 changes: 2 additions & 0 deletions docs/progress-and-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ tokio::spawn(async move {
| `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` |
| `Progress { header, percent, message }` | Progress update from executor |
| `Waiting { task_id, children_count }` | Parent task waiting for children to complete |
| `TaskExpired { header, age }` | Task expired (TTL exceeded) — `age` is the time since the TTL clock started |
| `Paused` | Scheduler globally paused via `pause_all()` |
| `Resumed` | Scheduler resumed via `resume_all()` |

Expand All @@ -98,6 +99,7 @@ Task-specific events share a `TaskEventHeader` with `task_id`, `task_type`, `key
| Error alerting | `Failed` where `will_retry` is false |
| A "pause/resume" button | `Paused`, `Resumed` |
| Upload status indicators | `Dispatched`, `Progress`, `Completed`, `Failed`, `Preempted` |
| Stale task cleanup UI | `TaskExpired` |

## Querying progress

2 changes: 2 additions & 0 deletions docs/query-apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Use these queries to build dashboards, debug stuck tasks, and gather analytics a
| `history_by_key(key)` | `Vec<TaskHistoryRecord>` | All past runs matching a dedup key. |
| `failed_tasks(limit)` | `Vec<TaskHistoryRecord>` | Recent failures with error messages. |

History records include a `status` field that can be `completed`, `failed`, `cancelled`, `superseded`, or `expired`. Filter by status to find expired tasks (e.g., for analytics on TTL effectiveness).
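
As a rough illustration of filtering by status for TTL analytics, the sketch below counts records with a given status. The `HistoryStatus` enum and `HistoryRecord` struct here are local stand-ins with assumed fields, not taskmill's `TaskHistoryRecord`:

```rust
/// Local stand-in mirroring the documented history statuses.
#[derive(Clone, Copy, Debug, PartialEq)]
enum HistoryStatus {
    Completed,
    Failed,
    Cancelled,
    Superseded,
    Expired,
}

/// Hypothetical record shape; the real type carries many more fields.
struct HistoryRecord {
    task_type: String,
    status: HistoryStatus,
}

/// Count records of one task type that ended with the given status.
fn count_with_status(records: &[HistoryRecord], task_type: &str, status: HistoryStatus) -> usize {
    records
        .iter()
        .filter(|r| r.task_type == task_type && r.status == status)
        .count()
}
```

Comparing the expired count to the total for a task type gives a quick signal of whether that type's TTL is too tight.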

## Aggregate queries

| Method | Returns | Description |
4 changes: 4 additions & 0 deletions docs/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ impl TypedTask for ResizeTask {

    fn expected_io(&self) -> IoBudget { IoBudget::disk(4096, 1024) }
    fn priority(&self) -> Priority { Priority::NORMAL }

    // Optional: expire if not started within 10 minutes.
    // fn ttl(&self) -> Option<Duration> { Some(Duration::from_secs(600)) }
    // fn ttl_from(&self) -> TtlFrom { TtlFrom::Submission }
}

// Submit:
13 changes: 13 additions & 0 deletions migrations/004_ttl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Migration 004: Task TTL / automatic expiry
-- Adds TTL columns to both tasks and task_history tables.

ALTER TABLE tasks ADD COLUMN ttl_seconds INTEGER;
ALTER TABLE tasks ADD COLUMN ttl_from TEXT NOT NULL DEFAULT 'submission';
ALTER TABLE tasks ADD COLUMN expires_at TEXT;

ALTER TABLE task_history ADD COLUMN ttl_seconds INTEGER;
ALTER TABLE task_history ADD COLUMN ttl_from TEXT NOT NULL DEFAULT 'submission';
ALTER TABLE task_history ADD COLUMN expires_at TEXT;

CREATE INDEX IF NOT EXISTS idx_tasks_expires ON tasks (expires_at ASC)
WHERE expires_at IS NOT NULL AND status IN ('pending', 'paused');