Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@ throughput.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use taskmill::{
Scheduler, Priority, TaskSubmission, TaskExecutor,
TaskContext, TaskResult, TaskError, ShutdownMode,
TaskContext, TaskError,
};

struct ThumbnailGenerator;

impl TaskExecutor for ThumbnailGenerator {
async fn execute<'a>(
&'a self, ctx: &'a TaskContext,
) -> Result<TaskResult, TaskError> {
ctx.progress.report(0.5, Some("resizing".into()));
Ok(TaskResult { actual_read_bytes: 4096, actual_write_bytes: 1024 })
) -> Result<(), TaskError> {
ctx.progress().report(0.5, Some("resizing".into()));
ctx.record_read_bytes(4096);
ctx.record_write_bytes(1024);
Ok(())
}
}

Expand All @@ -41,12 +42,11 @@ async fn main() {
.await
.unwrap();

scheduler.submit(&TaskSubmission::with_payload(
"thumbnail",
Priority::NORMAL,
&serde_json::json!({"path": "/photos/img.jpg"}),
4096, 1024,
).unwrap()).await.unwrap();
let sub = TaskSubmission::new("thumbnail")
.payload_json(&serde_json::json!({"path": "/photos/img.jpg"}))
.unwrap()
.expected_io(4096, 1024);
scheduler.submit(&sub).await.unwrap();

let token = CancellationToken::new();
scheduler.run(token).await;
Expand Down
19 changes: 11 additions & 8 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ SQLite persistence, designed for desktop apps (Tauri) and background services.
taskmill/
src/
lib.rs — public API re-exports
task.rs — TaskRecord, TaskSubmission, TaskResult, TaskError, TypedTask, etc.
task.rs — TaskRecord, TaskSubmission, TaskError, TypedTask, etc.
priority.rs — Priority newtype (u8, lower = higher priority)
store.rs — TaskStore: SQLite persistence, atomic pop, queries, retention
registry.rs — TaskExecutor trait (RPITIT), TaskContext, TaskTypeRegistry
Expand All @@ -25,6 +25,7 @@ taskmill/
sysinfo_monitor.rs — SysinfoSampler via `sysinfo` crate (feature-gated)
migrations/
001_tasks.sql — tasks table, task_history table, indexes
002_add_label.sql — adds human-readable label column
```

## Task lifecycle
Expand All @@ -39,7 +40,7 @@ Submit ──► Pending ──► Running ──► Completed (moved to task_h
└─────────────────┘ (resumed when preemptors finish)
```

Active-queue states (`tasks` table): `pending`, `running`, `paused`.
Active-queue states (`tasks` table): `pending`, `running`, `paused`, `waiting`.
Terminal states (`task_history` table): `completed`, `failed`.

## Data flow
Expand Down Expand Up @@ -67,6 +68,7 @@ flowchart TD
| `id` | `INTEGER PRIMARY KEY` — insertion order within tier|
| `task_type` | Executor lookup name (e.g. `"scan-l3"`) |
| `key` | `UNIQUE` — SHA-256 deduplication key |
| `label` | Human-readable display label (original key or type)|
| `priority` | `INTEGER NOT NULL` — 0 (highest) to 255 (lowest) |
| `status` | `TEXT` — `pending`, `running`, or `paused` |
| `payload` | `BLOB` — opaque, max 1 MiB, executor-defined |
Expand Down Expand Up @@ -122,8 +124,9 @@ type is always incorporated so different types with identical payloads never
collide.

Enforcement uses the `UNIQUE(key)` constraint with `INSERT OR IGNORE` — a
duplicate submission silently returns `None`. The key stays occupied while the
task is active (including retries) and is freed when the task moves to history.
duplicate submission returns `SubmitOutcome::Duplicate`. The key stays occupied
while the task is active (including retries) and is freed when the task moves
to history.

## Priority queue

Expand Down Expand Up @@ -323,7 +326,7 @@ When a task is submitted at or above `preempt_priority` (default `REALTIME`):
3. On subsequent poll cycles, paused tasks are only resumed when no active
preemptors remain — this prevents a thrashing loop of pause/resume/re-preempt.

Executors cooperate by checking `ctx.token.is_cancelled()` at yield points. An
Executors cooperate by checking `ctx.token().is_cancelled()` at yield points. An
executor that ignores cancellation continues running but is no longer tracked;
its completion or failure is still recorded normally.

Expand Down Expand Up @@ -362,8 +365,8 @@ All variants derive `Serialize`/`Deserialize`.

### Executor-reported

Executors call `ctx.progress.report(percent, message)` or
`ctx.progress.report_fraction(completed, total, message)`. These emit
Executors call `ctx.progress().report(percent, message)` or
`ctx.progress().report_fraction(completed, total, message)`. These emit
`SchedulerEvent::Progress` and update the active task map.

### Throughput-extrapolated
Expand Down Expand Up @@ -510,7 +513,7 @@ app.manage(scheduler); // Scheduler is Clone — no Arc needed
#[tauri::command]
async fn submit_task(
scheduler: tauri::State<'_, Scheduler>,
) -> Result<Option<i64>, StoreError> {
) -> Result<SubmitOutcome, StoreError> {
scheduler.submit(&submission).await
}

Expand Down
18 changes: 12 additions & 6 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ A complete list of taskmill's capabilities.

## Retries

- **Automatic requeue** — retryable failures (`TaskError { retryable: true }`) are requeued at the same priority with `retry_count += 1`.
- **Automatic requeue** — retryable failures (`TaskError::retryable(msg)`) are requeued at the same priority with `retry_count += 1`.
- **Configurable limit** — `max_retries` (default 3) controls how many times a task can be retried before permanent failure.
- **Dedup preserved** — the key stays occupied during retries, preventing duplicate submission of in-progress work.

## Progress Reporting

- **Executor-reported progress** — report percentage or fraction-based progress via `ctx.progress.report()` or `ctx.progress.report_fraction()`.
- **Executor-reported progress** — report percentage or fraction-based progress via `ctx.progress().report()` or `ctx.progress().report_fraction()`.
- **Throughput-based extrapolation** — for tasks without explicit reports, the scheduler extrapolates progress from historical average duration, capped at 99% to avoid false completion signals.
- **Event-driven** — progress updates are emitted as `SchedulerEvent::Progress` for real-time UI updates.

Expand All @@ -72,13 +72,19 @@ A complete list of taskmill's capabilities.

## Typed Payloads

- **Structured submission** — `TaskSubmission::with_payload()` serializes any `Serialize` type to JSON bytes.
- **Type-safe deserialization** — `TaskRecord::deserialize_payload::<T>()` in executors.
- **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.deserialize_typed()`.
- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization.
- **Type-safe deserialization** — `ctx.payload::<T>()?` in executors for zero-boilerplate extraction.
- **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::<T>()`.

## Child Tasks

- **Hierarchical execution** — spawn child tasks from an executor via `ctx.spawn_child()`. The parent enters a `waiting` state and resumes for finalization after all children complete.
- **Two-phase execution** — implement `TaskExecutor::finalize()` for assembly work after children finish (e.g. `CompleteMultipartUpload`).
- **Fail-fast** — when enabled (default), the first child failure cancels siblings and fails the parent immediately.

## Batch Operations

- **Bulk enqueue** — `submit_batch()` wraps many inserts in a single SQLite transaction. Returns `Vec<Option<i64>>` where `None` indicates deduplication.
- **Bulk enqueue** — `submit_batch()` wraps many inserts in a single SQLite transaction. Returns `Vec<SubmitOutcome>` indicating whether each was inserted, upgraded, requeued, or deduplicated.

## Graceful Shutdown

Expand Down
19 changes: 6 additions & 13 deletions docs/io-and-backpressure.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,18 @@ Taskmill combines two independent gating mechanisms — IO budget tracking and c
Every `TaskSubmission` includes expected IO:

```rust
let sub = TaskSubmission {
task_type: "scan".into(),
key: None,
priority: Priority::NORMAL,
payload: Some(data),
expected_read_bytes: 50_000, // caller's estimate
expected_write_bytes: 10_000,
};
let sub = TaskSubmission::new("scan")
.payload_raw(data)
.expected_io(50_000, 10_000); // caller's estimate
```

### Completion actuals

Executors report actual IO in `TaskResult`:
Executors report actual IO via context methods during execution:

```rust
Ok(TaskResult {
actual_read_bytes: 48_312,
actual_write_bytes: 9_876,
})
ctx.record_read_bytes(48_312);
ctx.record_write_bytes(9_876);
```

Actual values are stored in `task_history` for learning.
Expand Down
31 changes: 19 additions & 12 deletions docs/persistence-and-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ Holds pending, running, and paused tasks.
| `id` | INTEGER PRIMARY KEY | Insertion-order ID |
| `task_type` | TEXT NOT NULL | Executor lookup name |
| `key` | TEXT NOT NULL UNIQUE | SHA-256 dedup key |
| `label` | TEXT NOT NULL | Human-readable display name (original dedup key or task type) |
| `priority` | INTEGER NOT NULL | 0–255 (lower = higher priority) |
| `status` | TEXT DEFAULT 'pending' | `pending`, `running`, or `paused` |
| `status` | TEXT DEFAULT 'pending' | `pending`, `running`, `paused`, or `waiting` |
| `payload` | BLOB | Opaque task data (max 1 MiB) |
| `expected_read_bytes` | INTEGER | Estimated read IO |
| `expected_write_bytes` | INTEGER | Estimated write IO |
| `parent_id` | INTEGER | Parent task ID for child tasks (NULL for top-level) |
| `fail_fast` | INTEGER DEFAULT 1 | Whether child failure cancels siblings and fails parent |
| `retry_count` | INTEGER DEFAULT 0 | Number of retries so far |
| `last_error` | TEXT | Most recent error message |
| `created_at` | TEXT | ISO 8601 timestamp |
Expand All @@ -31,7 +34,7 @@ Holds pending, running, and paused tasks.

| Column | Type | Description |
|--------|------|-------------|
| *(all columns from `tasks`)* | | |
| *(all columns from `tasks`, including `label`, `parent_id`, `fail_fast`)* | | |
| `actual_read_bytes` | INTEGER | Reported by executor |
| `actual_write_bytes` | INTEGER | Reported by executor |
| `completed_at` | TEXT | ISO 8601 timestamp |
Expand Down Expand Up @@ -61,29 +64,33 @@ This resets any tasks that were mid-execution when the process died. The behavio

Every task gets a SHA-256 key: `SHA-256(task_type + ":" + (explicit_key OR payload))`.

- **Implicit key** — if no `key` is provided, the payload bytes are used. Tasks with the same type and payload get the same key.
- **Explicit key** — set `TaskSubmission.key` to control deduplication yourself. Useful when two payloads represent the same logical work (e.g., different timestamps but same file path).
- **Implicit key** — if no key is provided, the payload bytes are used. Tasks with the same type and payload get the same key.
- **Explicit key** — use the `.key()` builder method to control deduplication yourself. Useful when two payloads represent the same logical work (e.g., different timestamps but same file path). The explicit key is also stored as the display `label`.
- **Type scoping** — the task type is always part of the hash, so `("resize", payload)` and `("compress", payload)` never collide.

### Lifecycle

A key is "occupied" while the task is in the `tasks` table (pending, running, paused, or retrying). When the task moves to `task_history` (completed or failed), the key is freed and can be resubmitted.
A key is "occupied" while the task is in the `tasks` table (pending, running, paused, waiting, or retrying). When the task moves to `task_history` (completed or failed), the key is freed and can be resubmitted.

### Submission behavior

```rust
// Returns Some(id) if inserted
let id = scheduler.submit(&submission).await?; // Ok(Some(42))

// Returns None if a task with the same key already exists
let id = scheduler.submit(&submission).await?; // Ok(None)
use taskmill::SubmitOutcome;

let outcome = scheduler.submit(&submission).await?;
match outcome {
SubmitOutcome::Inserted(id) => println!("new task: {id}"),
SubmitOutcome::Duplicate => println!("already queued"),
SubmitOutcome::Upgraded(id) => println!("priority upgraded: {id}"),
SubmitOutcome::Requeued(id) => println!("requeued from history: {id}"),
}
```

`submit_batch()` applies the same dedup within a single transaction:

```rust
let ids = scheduler.submit_batch(&[sub1, sub2, sub3]).await?;
// ids = [Some(1), None, Some(2)] — sub2 was a duplicate
let outcomes = scheduler.submit_batch(&[sub1, sub2, sub3]).await?;
// outcomes: Vec<SubmitOutcome> — sub2 might be Duplicate
```

### Looking up tasks by dedup key
Expand Down
18 changes: 7 additions & 11 deletions docs/priorities-and-preemption.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,24 @@ Executors should check for cancellation at natural yield points:
impl TaskExecutor for MyExecutor {
async fn execute<'a>(
&'a self, ctx: &'a TaskContext,
) -> Result<TaskResult, TaskError> {
) -> Result<(), TaskError> {
for chunk in chunks {
// Check before each unit of work
if ctx.token.is_cancelled() {
return Err(TaskError {
message: "preempted".into(),
retryable: true,
actual_read_bytes: bytes_read_so_far,
actual_write_bytes: bytes_written_so_far,
});
if ctx.token().is_cancelled() {
return Err(TaskError::retryable("preempted"));
}

process(chunk).await;
ctx.progress.report_fraction(i, total, None);
ctx.record_read_bytes(chunk.len() as i64);
ctx.progress().report_fraction(i, total, None);
}

Ok(TaskResult { actual_read_bytes: total_read, actual_write_bytes: total_written })
Ok(())
}
}
```

Returning a retryable error on preemption is optional — the scheduler handles pausing regardless. But it gives the executor a chance to report partial IO and clean up.
Returning a retryable error on preemption is optional — the scheduler handles pausing regardless. But it gives the executor a chance to clean up.

### Configuring preemption threshold

Expand Down
23 changes: 12 additions & 11 deletions docs/progress-reporting.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,26 @@ Taskmill provides real-time progress tracking for running tasks, combining execu

## Reporting from executors

Executors receive a `ProgressReporter` via `ctx.progress`:
Executors receive a `ProgressReporter` via `ctx.progress()`:

```rust
impl TaskExecutor for MyExecutor {
async fn execute<'a>(
&'a self, ctx: &'a TaskContext,
) -> Result<TaskResult, TaskError> {
) -> Result<(), TaskError> {
let items = get_work_items();

for (i, item) in items.iter().enumerate() {
process(item).await;

// Percentage-based (0.0 to 1.0)
ctx.progress.report(
ctx.progress().report(
(i + 1) as f32 / items.len() as f32,
Some(format!("processed {}/{}", i + 1, items.len())),
);
}

Ok(TaskResult { actual_read_bytes: 0, actual_write_bytes: 0 })
Ok(())
}
}
```
Expand All @@ -33,7 +33,7 @@ impl TaskExecutor for MyExecutor {
For count-based progress:

```rust
ctx.progress.report_fraction(processed, total, Some("importing".into()));
ctx.progress().report_fraction(processed, total, Some("importing".into()));
// Automatically computes: processed as f32 / total as f32
```

Expand All @@ -46,6 +46,7 @@ SchedulerEvent::Progress {
task_id: 42,
task_type: "resize".into(),
key: "abc123".into(),
label: "my-image.jpg".into(),
percent: 0.5,
message: Some("resizing".into()),
}
Expand Down Expand Up @@ -107,12 +108,12 @@ All scheduler state changes are broadcast as `SchedulerEvent` variants:

| Event | When |
|-------|------|
| `Dispatched { task_id, task_type, key }` | Task popped from queue and executor spawned |
| `Completed { task_id, task_type, key }` | Task finished successfully |
| `Failed { task_id, task_type, key, error, will_retry }` | Task failed (includes whether it will be retried) |
| `Preempted { task_id, task_type, key }` | Task paused for higher-priority work |
| `Cancelled { task_id, task_type, key }` | Task cancelled via `scheduler.cancel()` |
| `Progress { task_id, task_type, key, percent, message }` | Progress update from executor |
| `Dispatched { task_id, task_type, key, label }` | Task popped from queue and executor spawned |
| `Completed { task_id, task_type, key, label }` | Task finished successfully |
| `Failed { task_id, task_type, key, label, error, will_retry }` | Task failed (includes whether it will be retried) |
| `Preempted { task_id, task_type, key, label }` | Task paused for higher-priority work |
| `Cancelled { task_id, task_type, key, label }` | Task cancelled via `scheduler.cancel()` |
| `Progress { task_id, task_type, key, label, percent, message }` | Progress update from executor |
| `Paused` | Scheduler globally paused via `pause_all()` |
| `Resumed` | Scheduler resumed via `resume_all()` |

Expand Down
Loading
Loading