Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ throughput.
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use taskmill::{
Scheduler, Priority, TaskSubmission, TaskExecutor,
Scheduler, Priority, IoBudget, TaskSubmission, TaskExecutor,
TaskContext, TaskError,
};

Expand Down Expand Up @@ -44,8 +44,7 @@ async fn main() {

let sub = TaskSubmission::new("thumbnail")
.payload_json(&serde_json::json!({"path": "/photos/img.jpg"}))
.unwrap()
.expected_io(4096, 1024);
.expected_io(IoBudget::disk(4096, 1024));
scheduler.submit(&sub).await.unwrap();

let token = CancellationToken::new();
Expand Down
36 changes: 22 additions & 14 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ flowchart TD
| `priority` | `INTEGER NOT NULL` — 0 (highest) to 255 (lowest) |
| `status` | `TEXT` — `pending`, `running`, or `paused` |
| `payload` | `BLOB` — opaque, max 1 MiB, executor-defined |
| `expected_read_bytes` | Caller's IO estimate for scheduling decisions |
| `expected_write_bytes`| Caller's IO estimate for scheduling decisions |
| `expected_read_bytes` | Caller's disk read IO estimate (part of `IoBudget`)|
| `expected_write_bytes`| Caller's disk write IO estimate (part of `IoBudget`)|
| `expected_net_rx_bytes`| Caller's network RX estimate (part of `IoBudget`) |
| `expected_net_tx_bytes`| Caller's network TX estimate (part of `IoBudget`) |
| `retry_count` | Incremented on each retryable failure |
| `last_error` | Most recent error message (for diagnostics) |
| `started_at` | Set when popped; cleared on pause |
Expand All @@ -90,8 +92,10 @@ insert into `task_history` in one transaction). Additional columns:

| Column | Purpose |
|-----------------------|----------------------------------------------------|
| `actual_read_bytes` | Reported by executor on completion |
| `actual_write_bytes` | Reported by executor on completion |
| `actual_read_bytes` | Reported by executor on completion (part of `IoBudget`) |
| `actual_write_bytes` | Reported by executor on completion (part of `IoBudget`) |
| `actual_net_rx_bytes` | Reported by executor on completion (part of `IoBudget`) |
| `actual_net_tx_bytes` | Reported by executor on completion (part of `IoBudget`) |
| `completed_at` | Timestamp of completion or failure |
| `duration_ms` | Computed from `started_at` to `completed_at` |

Expand Down Expand Up @@ -253,10 +257,10 @@ knowing the concrete gate type.

### Expected vs actual IO

Callers provide `expected_read_bytes` / `expected_write_bytes` on submission.
Executors report `actual_read_bytes` / `actual_write_bytes` on completion. The
history table stores both, enabling learning via `avg_throughput()` and
`history_stats()`.
Callers provide an `IoBudget` (disk read/write, network rx/tx) on submission.
Executors report actual IO via context methods on completion. The history table
stores both expected and actual values, enabling learning via `avg_throughput()`
and `history_stats()`.

### IO budget heuristic

Expand Down Expand Up @@ -350,15 +354,19 @@ Executor returns Err(TaskError)

| Event | When |
|-------------|----------------------------------------------|
| `Dispatched`| Task popped and executor spawned |
| `Completed` | Task finished successfully |
| `Failed` | Task failed (includes `will_retry` flag) |
| `Preempted` | Task paused for higher-priority work |
| `Cancelled` | Task cancelled via `cancel()` |
| `Progress` | Executor reported progress (0.0–1.0) |
| `Dispatched(TaskEventHeader)` | Task popped and executor spawned |
| `Completed(TaskEventHeader)` | Task finished successfully |
| `Failed { header, error, will_retry }` | Task failed (includes retry flag) |
| `Preempted(TaskEventHeader)` | Task paused for higher-priority work |
| `Cancelled(TaskEventHeader)` | Task cancelled via `cancel()` |
| `Progress { header, percent, message }` | Progress update from executor |
| `Waiting { task_id, children_count }` | Parent entered waiting state |
| `Paused` | Scheduler globally paused |
| `Resumed` | Scheduler globally resumed |

Task-specific variants share a `TaskEventHeader` containing `task_id`,
`task_type`, `key`, and `label`. Use `event.header()` to access it generically.

All variants derive `Serialize`/`Deserialize`.

## Progress reporting
Expand Down
10 changes: 5 additions & 5 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ A complete list of taskmill's capabilities.

## IO Awareness

- **Expected/actual IO tracking** — submit estimated read/write bytes; executors report actual bytes on completion.
- **Network IO tracking** — tasks can declare expected network RX/TX bytes via `expected_net_io()` and report actuals via `ctx.record_net_rx_bytes()` / `ctx.record_net_tx_bytes()`.
- **Expected/actual IO tracking** — submit an `IoBudget` (disk read/write, network rx/tx); executors report actual bytes on completion.
- **Network IO tracking** — tasks can declare expected network RX/TX bytes via `IoBudget::net()` and report actuals via `ctx.record_net_rx_bytes()` / `ctx.record_net_tx_bytes()`.
- **IO budget gating** — the scheduler compares running task IO estimates against EWMA-smoothed system throughput. New work is deferred when cumulative IO would exceed 80% of observed disk capacity.
- **Learning from history** — `avg_throughput()` and `history_stats()` compute per-type IO averages from actual completions, enabling callers to refine estimates over time.

Expand Down Expand Up @@ -70,7 +70,7 @@ A complete list of taskmill's capabilities.

## Lifecycle Events

- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Waiting`, `Paused`, `Resumed`.
- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Waiting`, `Paused`, `Resumed`. Task-specific variants share a `TaskEventHeader` with `task_id`, `task_type`, `key`, and `label`.
- **Tauri-ready** — all events are `Serialize`, designed for direct bridging to frontend via `app_handle.emit()`.

## Task Management
Expand All @@ -81,9 +81,9 @@ A complete list of taskmill's capabilities.

## Typed Payloads

- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization. Use `.label("display name")` to set a human-readable display label independent of the dedup key.
- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data).expected_io(budget)` for ergonomic construction with serialization. Serialization errors are deferred to submit time. Use `.label("display name")` to set a human-readable display label independent of the dedup key.
- **Type-safe deserialization** — `ctx.payload::<T>()?` in executors for zero-boilerplate extraction.
- **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::<T>()`.
- **TypedTask trait** — define `TASK_TYPE`, default priority, expected IO, key, and label on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::<T>()`.

## Child Tasks

Expand Down
11 changes: 7 additions & 4 deletions docs/io-and-backpressure.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ Taskmill combines two independent gating mechanisms — IO budget tracking and c
Every `TaskSubmission` includes expected IO:

```rust
use taskmill::IoBudget;

let sub = TaskSubmission::new("scan")
.payload_raw(data)
.expected_io(50_000, 10_000); // caller's estimate
.expected_io(IoBudget::disk(50_000, 10_000)); // caller's estimate
```

### Completion actuals
Expand All @@ -30,10 +32,11 @@ Actual values are stored in `task_history` for learning.
Tasks that perform network transfers can declare expected bandwidth:

```rust
use taskmill::IoBudget;

let sub = TaskSubmission::new("upload")
.payload_json(&upload_payload)?
.expected_io(0, 0) // no disk IO
.expected_net_io(0, 50_000_000); // 50 MB upload
.payload_json(&upload_payload)
.expected_io(IoBudget::net(0, 50_000_000)); // 50 MB upload
```

Executors report actual network bytes during execution:
Expand Down
118 changes: 118 additions & 0 deletions docs/migrating-to-0.4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Migrating from 0.3.x to 0.4.0

This guide covers the breaking API changes in taskmill 0.4.0. All changes are API-level — database columns are unchanged, so existing data is fully compatible.

## `IoBudget` replaces scattered IO fields

The four separate IO byte fields on `TypedTask`, `TaskSubmission`, `TaskRecord`, and `TaskHistoryRecord` have been consolidated into a single `IoBudget` struct. `TaskMetrics` has been removed — use `IoBudget` everywhere instead.

**Before:**
```rust
// TypedTask: 4 separate methods
impl TypedTask for MyTask {
const TASK_TYPE: &'static str = "my-task";
fn expected_read_bytes(&self) -> i64 { 4096 }
fn expected_write_bytes(&self) -> i64 { 1024 }
fn expected_net_rx_bytes(&self) -> i64 { 0 }
fn expected_net_tx_bytes(&self) -> i64 { 0 }
}

// TaskSubmission: two builder methods
TaskSubmission::new("upload")
.expected_io(4096, 1024)
.expected_net_io(0, 8192)

// Accessing fields on TaskRecord / TaskHistoryRecord
record.expected_read_bytes
history.actual_read_bytes
```

**After:**
```rust
// TypedTask: single method returning IoBudget
impl TypedTask for MyTask {
const TASK_TYPE: &'static str = "my-task";
fn expected_io(&self) -> IoBudget { IoBudget::disk(4096, 1024) }
}

// TaskSubmission: single builder method
TaskSubmission::new("upload")
.expected_io(IoBudget { disk_write: 1024, net_tx: 8192, ..Default::default() })

// Accessing fields on TaskRecord / TaskHistoryRecord
record.expected_io.disk_read
history.actual_io.map(|io| io.disk_read)
```

`IoBudget` provides two convenience constructors:
- `IoBudget::disk(read, write)` — sets disk fields, zeroes network
- `IoBudget::net(rx, tx)` — sets network fields, zeroes disk

The `TaskContext` recording methods (`record_read_bytes`, `record_write_bytes`, etc.) are unchanged.

## `TypedTask` now supports `key()` and `label()`

Two new optional default methods allow typed tasks to declare their own dedup key and UI label:

```rust
impl TypedTask for MyTask {
const TASK_TYPE: &'static str = "my-task";
fn key(&self) -> Option<String> { Some(self.file_path.clone()) }
fn label(&self) -> Option<String> { Some(format!("Process {}", self.file_path)) }
}
```

When `None` (the default), behavior is unchanged — key is derived from payload hash, label from task type. Existing `TypedTask` impls require no changes.

## `SchedulerEvent` uses `TaskEventHeader`

Per-task event variants now carry a `TaskEventHeader` struct instead of repeating `task_id`, `task_type`, `key`, and `label` as individual fields.

**Before:**
```rust
match event {
SchedulerEvent::Completed { task_id, task_type, key, label } => { ... }
SchedulerEvent::Failed { task_id, label, error, will_retry, .. } => { ... }
SchedulerEvent::Progress { task_id, percent, message, .. } => { ... }
}
```

**After:**
```rust
match event {
SchedulerEvent::Completed(header) => { /* header.task_id, header.label, ... */ }
SchedulerEvent::Failed { header, error, will_retry } => { ... }
SchedulerEvent::Progress { header, percent, message } => { ... }
}

// Or use the convenience accessor:
if let Some(header) = event.header() { ... }
```

`EstimatedProgress` fields `task_id`, `task_type`, `key`, `label` are also nested under `header: TaskEventHeader`.

## `payload_json()` and `from_typed()` no longer return `Result`

Both methods now always return `Self`, keeping the builder chain unbroken. Serialization errors are deferred and surfaced when calling `scheduler.submit()` / `store.submit()` as `StoreError::Serialization`.

**Before:**
```rust
let sub = TaskSubmission::new("task")
.key("k")
.payload_json(&data)? // breaks the chain
.priority(Priority::HIGH);

let sub = TaskSubmission::from_typed(&task)?;
```

**After:**
```rust
let sub = TaskSubmission::new("task")
.key("k")
.payload_json(&data) // always returns Self
.priority(Priority::HIGH);

let sub = TaskSubmission::from_typed(&task);
```

Remove any `?` operators on `payload_json()` or `from_typed()` calls. Errors are still caught before the task is persisted — they just surface at submit time instead.
12 changes: 8 additions & 4 deletions docs/persistence-and-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ Holds pending, running, and paused tasks.
| `priority` | INTEGER NOT NULL | 0–255 (lower = higher priority) |
| `status` | TEXT DEFAULT 'pending' | `pending`, `running`, `paused`, or `waiting` |
| `payload` | BLOB | Opaque task data (max 1 MiB) |
| `expected_read_bytes` | INTEGER | Estimated read IO |
| `expected_write_bytes` | INTEGER | Estimated write IO |
| `expected_read_bytes` | INTEGER | Estimated disk read IO (part of `IoBudget`) |
| `expected_write_bytes` | INTEGER | Estimated disk write IO (part of `IoBudget`) |
| `expected_net_rx_bytes` | INTEGER | Estimated network RX (part of `IoBudget`) |
| `expected_net_tx_bytes` | INTEGER | Estimated network TX (part of `IoBudget`) |
| `parent_id` | INTEGER | Parent task ID for child tasks (NULL for top-level) |
| `fail_fast` | INTEGER DEFAULT 1 | Whether child failure cancels siblings and fails parent |
| `retry_count` | INTEGER DEFAULT 0 | Number of retries so far |
Expand All @@ -35,8 +37,10 @@ Holds pending, running, and paused tasks.
| Column | Type | Description |
|--------|------|-------------|
| *(all columns from `tasks`, including `label`, `parent_id`, `fail_fast`)* | | |
| `actual_read_bytes` | INTEGER | Reported by executor |
| `actual_write_bytes` | INTEGER | Reported by executor |
| `actual_read_bytes` | INTEGER | Reported by executor (part of `IoBudget`) |
| `actual_write_bytes` | INTEGER | Reported by executor (part of `IoBudget`) |
| `actual_net_rx_bytes` | INTEGER | Reported by executor (part of `IoBudget`) |
| `actual_net_tx_bytes` | INTEGER | Reported by executor (part of `IoBudget`) |
| `completed_at` | TEXT | ISO 8601 timestamp |
| `duration_ms` | INTEGER | Wall-clock duration |
| `status` | TEXT | `completed` or `failed` |
Expand Down
32 changes: 18 additions & 14 deletions docs/progress-reporting.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ Every `report()` call emits a `SchedulerEvent::Progress`:

```rust
SchedulerEvent::Progress {
task_id: 42,
task_type: "resize".into(),
key: "abc123".into(),
label: "my-image.jpg".into(),
header: TaskEventHeader {
task_id: 42,
task_type: "resize".into(),
key: "abc123".into(),
label: "my-image.jpg".into(),
},
percent: 0.5,
message: Some("resizing".into()),
}
Expand All @@ -58,8 +60,8 @@ Subscribe to events for real-time UI updates:
let mut events = scheduler.subscribe();
tokio::spawn(async move {
while let Ok(event) = events.recv().await {
if let SchedulerEvent::Progress { task_id, percent, message, .. } = event {
update_ui(task_id, percent, message);
if let SchedulerEvent::Progress { header, percent, message } = event {
update_ui(header.task_id, percent, message);
}
}
});
Expand All @@ -84,7 +86,7 @@ This means even tasks with no explicit progress reporting show movement in UI da
```rust
let progress = scheduler.estimated_progress().await;
for p in &progress {
println!("{} ({}): {:.0}%", p.task_type, p.key, p.percent * 100.0);
println!("{} ({}): {:.0}%", p.header.task_type, p.header.key, p.percent * 100.0);
// p.reported_percent — last executor-reported value (if any)
// p.extrapolated_percent — throughput-based estimate (if any)
// p.percent — best available: reported if present, else extrapolated
Expand All @@ -98,7 +100,7 @@ The `SchedulerSnapshot` includes progress for all running tasks:
```rust
let snap = scheduler.snapshot().await?;
for p in &snap.progress {
println!("{}: {:.0}%", p.key, p.percent * 100.0);
println!("{}: {:.0}%", p.header.key, p.percent * 100.0);
}
```

Expand All @@ -108,16 +110,18 @@ All scheduler state changes are broadcast as `SchedulerEvent` variants:

| Event | When |
|-------|------|
| `Dispatched { task_id, task_type, key, label }` | Task popped from queue and executor spawned |
| `Completed { task_id, task_type, key, label }` | Task finished successfully |
| `Failed { task_id, task_type, key, label, error, will_retry }` | Task failed (includes whether it will be retried) |
| `Preempted { task_id, task_type, key, label }` | Task paused for higher-priority work |
| `Cancelled { task_id, task_type, key, label }` | Task cancelled via `scheduler.cancel()` |
| `Progress { task_id, task_type, key, label, percent, message }` | Progress update from executor |
| `Dispatched(TaskEventHeader)` | Task popped from queue and executor spawned |
| `Completed(TaskEventHeader)` | Task finished successfully |
| `Failed { header, error, will_retry }` | Task failed (includes whether it will be retried) |
| `Preempted(TaskEventHeader)` | Task paused for higher-priority work |
| `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` |
| `Progress { header, percent, message }` | Progress update from executor |
| `Waiting { task_id, children_count }` | Parent task entered waiting state after spawning children |
| `Paused` | Scheduler globally paused via `pause_all()` |
| `Resumed` | Scheduler resumed via `resume_all()` |

Task-specific variants share a `TaskEventHeader` struct with `task_id`, `task_type`, `key`, and `label`. Use `event.header()` to access it generically.

### Tauri bridge

Bridge events to the frontend in a Tauri app:
Expand Down
2 changes: 1 addition & 1 deletion docs/query-apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ All queries are available on `TaskStore`, accessed via `scheduler.store()`.
| `paused_count()` | `i64` | Count of paused tasks. |
| `task_by_id(id)` | `Option<TaskRecord>` | Look up an active task by row ID. |
| `task_by_key(key)` | `Option<TaskRecord>` | Look up an active task by dedup key. |
| `running_io_totals()` | `(i64, i64)` | Sum of `(expected_read_bytes, expected_write_bytes)` across running tasks. |
| `running_io_totals()` | `(i64, i64)` | Sum of `(expected_io.disk_read, expected_io.disk_write)` across running tasks. |

## History queries

Expand Down
Loading
Loading