From 0aa9890b83bd37e4e15d4e6b3da906a709d9f09f Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 14 Mar 2026 11:02:27 -0700 Subject: [PATCH] docs: rewrite documentation to be user-facing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restructure all docs from specification-heavy "how it works" to user-facing "why and when to use it" with practical guidance. New: glossary, why-taskmill, Tauri upload queue guide, background service guide. Replaced: ARCHITECTURE.md → design.md, progress-reporting.md → progress-and-events.md. Deleted: features.md (content redistributed to topic pages). All existing code examples preserved. --- README.md | 94 ++--- docs/ARCHITECTURE.md | 557 ------------------------------ docs/configuration.md | 154 +++++++-- docs/design.md | 176 ++++++++++ docs/features.md | 125 ------- docs/glossary.md | 17 + docs/guides/background-service.md | 189 ++++++++++ docs/guides/tauri-upload-queue.md | 290 ++++++++++++++++ docs/io-and-backpressure.md | 194 +++++------ docs/persistence-and-recovery.md | 163 ++++----- docs/priorities-and-preemption.md | 115 ++++-- docs/progress-and-events.md | 155 +++++++++ docs/progress-reporting.md | 166 --------- docs/query-apis.md | 26 +- docs/quick-start.md | 147 ++++++-- docs/why-taskmill.md | 46 +++ 16 files changed, 1428 insertions(+), 1186 deletions(-) delete mode 100644 docs/ARCHITECTURE.md create mode 100644 docs/design.md delete mode 100644 docs/features.md create mode 100644 docs/glossary.md create mode 100644 docs/guides/background-service.md create mode 100644 docs/guides/tauri-upload-queue.md create mode 100644 docs/progress-and-events.md delete mode 100644 docs/progress-reporting.md create mode 100644 docs/why-taskmill.md diff --git a/README.md b/README.md index eea65d5..c345577 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,10 @@ # Taskmill -Adaptive priority work scheduler with IO-aware concurrency and SQLite persistence. +A persistent, priority-aware task scheduler for Rust with IO-aware concurrency. -Taskmill is an async task queue for Rust applications that persists work to SQLite, -schedules by priority with IO-budget awareness, and supports preemption, retries, and -composable backpressure. Designed for desktop apps (Tauri, etc.) and background services -where tasks have measurable IO costs and the system needs to avoid saturating disk -throughput. +Taskmill is a task queue for desktop apps and background services where work needs to survive crashes, respect system resources, and stay visible to users. It persists tasks to SQLite, schedules by priority with preemption, and automatically defers work when disk or network throughput is saturated. + +Read more about the [motivation and use cases](docs/why-taskmill.md). ## Quick example @@ -52,70 +50,38 @@ async fn main() { } ``` -## Shared scheduler (library embedding) +## Key capabilities -A single `Scheduler` can be shared across an application and any libraries it embeds. -Multiple state types can coexist — each is keyed by its concrete `TypeId`, and new state -can be injected after the scheduler is built via `register_state`. +- **Survive crashes** — tasks are persisted to SQLite and automatically recovered on restart +- **Stay responsive** — IO-aware scheduling defers work when disk or network throughput is saturated +- **Prioritize what matters** — 256-level priority queue with preemption lets urgent work interrupt background tasks +- **Show progress** — executor-reported and throughput-extrapolated progress for real-time UI updates +- **Avoid duplicate work** — key-based deduplication prevents the same task from being queued twice +- **React to system load** — composable backpressure from any signal (disk, network, memory, battery, API limits) +- **Control concurrency** — per-group limits (e.g., per S3 bucket), global limits, runtime-adjustable +- **Build for Tauri** — `Clone`, `Serialize` on all types; events bridge directly to frontends -```rust -use std::sync::Arc; -use taskmill::Scheduler; - -// The host app builds the scheduler and registers its own executors. -let scheduler = Scheduler::builder() - .store_path("app.db") - .executor("thumbnail", Arc::new(ThumbnailGenerator)) - .app_state(MyAppServices { /* ... */ }) - .max_concurrency(4) - .build() - .await - .unwrap(); - -// A library can inject its own state after build. -scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await; - -// Both the host and the library submit tasks to the same queue. -// The host manages the run loop. -let token = CancellationToken::new(); -scheduler.run(token).await; -``` +## Where to start -## Features - -- **SQLite persistence** — tasks survive restarts; crash recovery requeues interrupted work -- **256-level priority queue** — with preemption of lower-priority tasks -- **IO-aware scheduling** — defers work when disk or network throughput is saturated -- **Key-based deduplication** — SHA-256 keys prevent duplicate submissions -- **Composable backpressure** — plug in external pressure signals with custom throttle policies -- **Cross-platform resource monitoring** — CPU, disk IO, and network throughput via `sysinfo` (Linux, macOS, Windows) -- **Network bandwidth pressure** — built-in `NetworkPressure` source throttles tasks when bandwidth is saturated -- **Retries** — automatic requeue of retryable failures with configurable limits -- **Progress reporting** — executor-reported and throughput-extrapolated progress -- **Lifecycle events** — broadcast events for UI integration (Tauri, etc.) -- **Typed payloads** — serialize/deserialize structured task data -- **Batch submission** — bulk enqueue in a single SQLite transaction -- **Graceful shutdown** — configurable drain timeout before force-cancellation -- **Task group concurrency** — limit concurrent tasks per named group (e.g., per S3 bucket) -- **Global pause/resume** — pause all work when the app is backgrounded -- **Type-keyed application state** — register multiple state types, inject pre- or post-build -- **Clone-friendly** — `Scheduler` is `Clone` via `Arc` for easy sharing -- **Serde on all public types** — ready for Tauri IPC - -For a detailed breakdown of every feature, see [docs/features.md](docs/features.md). +| If you want to... | Read | +|---|---| +| Understand what taskmill solves | [Why Taskmill](docs/why-taskmill.md) | +| Get running in 5 minutes | [Quick Start](docs/quick-start.md) | +| See it in a real Tauri app | [Guide: Tauri Upload Queue](docs/guides/tauri-upload-queue.md) | +| Look up a term | [Glossary](docs/glossary.md) | ## Documentation -| Guide | Description | -|-------|-------------| -| [Quick Start](docs/quick-start.md) | Installation, first executor, builder setup, and running the scheduler | -| [Features](docs/features.md) | Complete feature list with descriptions | -| [Priorities & Preemption](docs/priorities-and-preemption.md) | Priority levels, preemption mechanics, and throttle behavior | -| [IO Tracking & Backpressure](docs/io-and-backpressure.md) | IO budgets, resource monitoring, pressure sources, and throttle policies | -| [Persistence & Recovery](docs/persistence-and-recovery.md) | SQLite schema, crash recovery, deduplication, and history retention | -| [Progress Reporting](docs/progress-reporting.md) | Executor progress, extrapolation, dashboard snapshots, and lifecycle events | -| [Configuration](docs/configuration.md) | All configuration options for scheduler, store, sampler, and feature flags | -| [Query APIs](docs/query-apis.md) | Full `TaskStore` query reference for dashboards and debugging | +| Guide | What it covers | +|-------|----------------| +| [Quick Start](docs/quick-start.md) | Installation, first executor, builder setup, Tauri integration | +| [Priorities & Preemption](docs/priorities-and-preemption.md) | Priority levels, task groups, preemption, and throttle behavior | +| [IO & Backpressure](docs/io-and-backpressure.md) | IO budgets, resource monitoring, pressure sources, and tuning | +| [Progress & Events](docs/progress-and-events.md) | Progress reporting, lifecycle events, dashboard snapshots | +| [Persistence & Recovery](docs/persistence-and-recovery.md) | Crash recovery, deduplication, history retention | +| [Configuration](docs/configuration.md) | All options, recommended defaults, workload-specific tuning | +| [Query APIs](docs/query-apis.md) | TaskStore queries for dashboards, debugging, and analytics | +| [Design](docs/design.md) | Architecture decisions, extension points, thread safety | ## License diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md deleted file mode 100644 index 0df657a..0000000 --- a/docs/ARCHITECTURE.md +++ /dev/null @@ -1,557 +0,0 @@ -# Taskmill Architecture - -Taskmill is an adaptive priority work scheduler with IO-aware concurrency and -SQLite persistence, designed for desktop apps (Tauri) and background services. - -## Module map - -``` -taskmill/ - src/ - lib.rs — public API re-exports - task.rs — TaskRecord, TaskSubmission, TaskError, TypedTask, etc. - priority.rs — Priority newtype (u8, lower = higher priority) - store.rs — TaskStore: SQLite persistence, atomic pop, queries, retention - registry.rs — TaskExecutor trait (RPITIT), TaskContext, TaskTypeRegistry - backpressure.rs — PressureSource trait, ThrottlePolicy, CompositePressure - scheduler/ - mod.rs — Scheduler, SchedulerBuilder, run loop, events, snapshot - gate.rs — DispatchGate trait, DefaultDispatchGate, IO budget check - dispatch.rs — ActiveTaskMap, spawn_task(), preemption - progress.rs — ProgressReporter, EstimatedProgress, throughput extrapolation - resource/ - mod.rs — ResourceSampler + ResourceReader traits, ResourceSnapshot - sampler.rs — EWMA-smoothed background loop, SmoothedReader - sysinfo_monitor.rs — SysinfoSampler via `sysinfo` crate (feature-gated) - migrations/ - 001_tasks.sql — tasks table, task_history table, indexes - 002_add_label.sql — adds human-readable label column -``` - -## Task lifecycle - -``` -Submit ──► Pending ──► Running ──► Completed (moved to task_history) - │ │ - │ ├──► Failed (moved to task_history, or retried) - │ │ - │ └──► Paused (preempted by higher-priority work) - │ │ - └─────────────────┘ (resumed when preemptors finish) -``` - -Active-queue states (`tasks` table): `pending`, `running`, `paused`, `waiting`. -Terminal states (`task_history` table): `completed`, `failed`. - -## Data flow - -```mermaid -flowchart TD - S["submit() / submit_batch()"] --> TS["TaskStore\n(INSERT OR IGNORE)"] - TS --> |SQLite| DB[(tasks table)] - DB --> SCH["Scheduler run loop"] - SCH --> |"tokio::spawn"| E1["Executor + TaskContext"] - SCH --> |"tokio::spawn"| E2["Executor + TaskContext"] - E1 --> CF["complete() / fail()"] - E2 --> CF - CF --> HIST[(task_history)] - CF --> PRUNE["maybe_prune()\n(amortised retention)"] - CF --> EVT["broadcast::Sender\n(SchedulerEvent)"] -``` - -## SQLite schema - -### `tasks` — active queue - -| Column | Purpose | -|-----------------------|----------------------------------------------------| -| `id` | `INTEGER PRIMARY KEY` — insertion order within tier| -| `task_type` | Executor lookup name (e.g. `"scan-l3"`) | -| `key` | `UNIQUE` — SHA-256 deduplication key | -| `label` | Human-readable display label (original key or type)| -| `priority` | `INTEGER NOT NULL` — 0 (highest) to 255 (lowest) | -| `status` | `TEXT` — `pending`, `running`, or `paused` | -| `payload` | `BLOB` — opaque, max 1 MiB, executor-defined | -| `expected_read_bytes` | Caller's disk read IO estimate (part of `IoBudget`)| -| `expected_write_bytes`| Caller's disk write IO estimate (part of `IoBudget`)| -| `expected_net_rx_bytes`| Caller's network RX estimate (part of `IoBudget`) | -| `expected_net_tx_bytes`| Caller's network TX estimate (part of `IoBudget`) | -| `retry_count` | Incremented on each retryable failure | -| `last_error` | Most recent error message (for diagnostics) | -| `started_at` | Set when popped; cleared on pause | - -A partial index `idx_tasks_pending` on `(status, priority ASC, id ASC) WHERE -status = 'pending'` covers the scheduler's hot path (`pop_next`), making -priority-ordered pops efficient regardless of how many running or paused tasks -sit in the table. - -### `task_history` — terminal records - -Completed and failed tasks are moved here atomically (delete from `tasks`, -insert into `task_history` in one transaction). Additional columns: - -| Column | Purpose | -|-----------------------|----------------------------------------------------| -| `actual_read_bytes` | Reported by executor on completion (part of `IoBudget`) | -| `actual_write_bytes` | Reported by executor on completion (part of `IoBudget`) | -| `actual_net_rx_bytes` | Reported by executor on completion (part of `IoBudget`) | -| `actual_net_tx_bytes` | Reported by executor on completion (part of `IoBudget`) | -| `completed_at` | Timestamp of completion or failure | -| `duration_ms` | Computed from `started_at` to `completed_at` | - -An index `idx_history_type_completed` on `(task_type, completed_at DESC)` -supports IO learning queries (`avg_throughput`, `history_stats`). - -### Connection pool - -Defaults to 16 connections (`StoreConfig::max_connections`). SQLite serialises -writes regardless, so this primarily benefits concurrent reads from multiple -Tauri commands and background tasks. - -### Retention policy - -`StoreConfig::retention_policy` controls automatic pruning of `task_history`: - -- `RetentionPolicy::MaxCount(n)` — keep at most N history records -- `RetentionPolicy::MaxAgeDays(n)` — keep records from the last N days - -Pruning is amortised: an `AtomicU64` completion counter triggers `maybe_prune()` -every `prune_interval` completions (default 100) rather than after every single -completion. Pruning errors are logged but never propagated — the task itself is -already committed. Manual pruning is available via `prune_history_by_count()` and -`prune_history_by_age()`. - -## Deduplication - -Key generation: `SHA-256(task_type + ":" + (explicit_key OR payload))`. The task -type is always incorporated so different types with identical payloads never -collide. - -Enforcement uses the `UNIQUE(key)` constraint with `INSERT OR IGNORE` — a -duplicate submission returns `SubmitOutcome::Duplicate`. The key stays occupied -while the task is active (including retries) and is freed when the task moves -to history. - -## Priority queue - -The priority queue lives entirely in SQLite. `pop_next()` is an atomic -`UPDATE ... RETURNING` that claims the highest-priority pending row: - -```sql -UPDATE tasks SET status = 'running', started_at = datetime('now') -WHERE id = ( - SELECT id FROM tasks WHERE status = 'pending' - ORDER BY priority ASC, id ASC LIMIT 1 -) -RETURNING * -``` - -`priority ASC` means lower numeric values are popped first (higher priority). -`id ASC` breaks ties by insertion order (FIFO within a tier). The partial index -makes this a single index scan. - -The `Priority` type is a `u8` newtype with named constants: - -| Constant | Value | Behaviour | -|--------------|-------|---------------------------------------| -| `REALTIME` | 0 | Never throttled, triggers preemption | -| `HIGH` | 64 | Throttled only under extreme pressure | -| `NORMAL` | 128 | Standard background work | -| `BACKGROUND` | 192 | Paused under moderate pressure | -| `IDLE` | 255 | Runs only when system is idle | - -`Ord` is reversed so `REALTIME > IDLE` semantically. Custom tiers are available -via `Priority::new(n)`. - -## Scheduler architecture - -The scheduler is split across four files: - -| File | Concern | -|----------------|---------------------------------------------------------------| -| `mod.rs` | Orchestration: run loop, submit, cancel, snapshot, builder | -| `gate.rs` | Admission control: backpressure + IO budget | -| `dispatch.rs` | Task lifecycle: active map, spawn, preemption | -| `progress.rs` | Progress reporting + throughput-based extrapolation | - -### Dispatch cycle - -```mermaid -flowchart TD - START["tick / notify"] --> PAUSED{"is_paused?"} - PAUSED -- yes --> WAIT - PAUSED -- no --> RESUME["Resume paused tasks\n(if no active preemptors)"] - RESUME --> CONC{"active < max_concurrency?"} - CONC -- no --> WAIT["Wait for next tick / notify"] - CONC -- yes --> PEEK["peek_next()\n(non-mutating)"] - PEEK -- empty --> WAIT - PEEK -- candidate --> GATE{"gate.admit()\nbackpressure + IO budget"} - GATE -- rejected --> WAIT - GATE -- admitted --> POP["pop_by_id()\n(atomic claim)"] - POP -- claimed --> REG{"Executor registered?"} - POP -- gone --> CONC - REG -- no --> FAIL["Fail immediately"] - REG -- yes --> SPAWN["spawn_task()"] - SPAWN --> CONC -``` - -The run loop wakes on two signals: - -1. **`Notify`** — triggered by `submit()`, `submit_batch()`, and `resume_all()`, - so newly enqueued work is picked up without waiting for the next tick. -2. **`poll_interval` timer** (default 500 ms) — fallback for paused-task - resumption and periodic housekeeping. - -Key design: the loop uses **peek-then-pop-by-id** rather than a bare `pop_next()`. -The gate inspects the candidate without mutating the queue; only after admission -does `pop_by_id()` atomically claim it. If another consumer claimed it in the -meantime, the loop simply retries. This eliminates the earlier race where a -popped-then-rejected task needed an explicit requeue step. - -Each stage independently halts dispatch: - -- **Concurrency** — hard cap via `max_concurrency` (`AtomicUsize`, adjustable at runtime) -- **DispatchGate** — pluggable admission (default: backpressure + IO budget) -- **Empty queue** — no pending tasks - -### Clone-friendly design - -`Scheduler` wraps all shared state in `Arc` and derives `Clone`: - -- Holds directly in `tauri::State` without extra `Arc` wrapping -- Cheap clones that share the underlying store, registry, and active map - -### Builder - -```rust -Scheduler::builder() - .store_path("tasks.db") - .executor("scan", Arc::new(ScanExecutor)) - .executor("exif", Arc::new(ExifExecutor)) - .pressure_source(Box::new(battery_pressure)) - .max_concurrency(8) - .shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(30))) - .with_resource_monitoring() - .app_state(MyServices { http, db, cache }) - .build() - .await?; -``` - -The builder handles: opening the store, assembling the registry, composing -pressure sources, spawning the resource sampler, and wiring the `SmoothedReader`. -The lower-level `Scheduler::new()` remains for advanced use. - -## Dispatch gate (internal) - -The `DispatchGate` trait (`pub(crate)`) controls admission. The default -`DefaultDispatchGate` applies two checks: - -1. **Backpressure** — `ThrottlePolicy::should_throttle(priority, pressure)`. -2. **IO budget** — `has_io_headroom()`, described below. - -The trait also exposes `pressure()` and `pressure_breakdown()` (with default -no-op impls) so `Scheduler::snapshot()` can read backpressure state without -knowing the concrete gate type. - -## IO-aware scheduling - -### Expected vs actual IO - -Callers provide an `IoBudget` (disk read/write, network rx/tx) on submission. -Executors report actual IO via context methods on completion. The history table -stores both expected and actual values, enabling learning via `avg_throughput()` -and `history_stats()`. - -### IO budget heuristic - -When a `ResourceReader` is present, `has_io_headroom()` runs before each -dispatch: - -1. Read the latest EWMA-smoothed `ResourceSnapshot` (disk bytes/sec). -2. Sum expected IO across all currently running tasks. -3. Compute a 2-second budget window: `capacity = bytes_per_sec * 2.0`. -4. Defer if running IO exceeds 80% of capacity on either read or write axis. - -If no reader is configured the check is skipped (always allows dispatch). - -### Resource monitoring - -Two traits split sampling from consumption: - -- **`ResourceSampler`** — `sample() -> ResourceSnapshot`. Raw platform readings. -- **`ResourceReader`** — `latest() -> ResourceSnapshot`. Read-only, sync. - -`SmoothedReader` bridges them: the `run_sampler()` background loop calls -`sampler.sample()` at a configurable interval (default 1 s), applies EWMA -smoothing (alpha 0.3), and writes to the `SmoothedReader`. The scheduler reads -via `reader.latest()`, which uses `RwLock` so readers never block each other. - -The built-in `SysinfoSampler` (behind the `sysinfo-monitor` feature) provides -cross-platform CPU and disk IO via the `sysinfo` crate. - -## Backpressure - -### PressureSource trait - -```rust -pub trait PressureSource: Send + Sync + 'static { - fn pressure(&self) -> f32; // 0.0 (idle) to 1.0 (saturated) - fn name(&self) -> &str; -} -``` - -Implement for external signals: API rate, memory, queue depth, battery, etc. - -### CompositePressure - -Aggregates multiple sources. The composite value is the **max** across all — the -system is as pressured as its most constrained resource. `breakdown()` provides -per-source diagnostics. - -### ThrottlePolicy - -Default three-tier policy: - -| Priority range | Throttle threshold | -|-------------------|--------------------| -| BACKGROUND (192+) | > 50% pressure | -| NORMAL (128+) | > 75% pressure | -| HIGH / REALTIME | Never throttled | - -Custom policies via `ThrottlePolicy::new(thresholds)`. - -## Preemption - -When a task is submitted at or above `preempt_priority` (default `REALTIME`): - -1. All active tasks with strictly lower priority are cancelled - (`CancellationToken`) and moved to `paused` status in the store. -2. `Preempted` events are emitted. -3. On subsequent poll cycles, paused tasks are only resumed when no active - preemptors remain — this prevents a thrashing loop of pause/resume/re-preempt. - -Executors cooperate by checking `ctx.token().is_cancelled()` at yield points. An -executor that ignores cancellation continues running but is no longer tracked; -its completion or failure is still recorded normally. - -## Retry flow - -``` -Executor returns Err(TaskError) - └─ retryable: false? ──► move to task_history (failed) - └─ retryable: true? - └─ retry_count < max_retries? ──► status → pending, retry_count += 1 - └─ otherwise ──► move to task_history (failed) -``` - -- Retried tasks keep their original priority (no demotion). -- The dedup key remains occupied during retries. -- `max_retries` defaults to 3 (`SchedulerConfig`). - -## Event system - -`Scheduler::subscribe()` returns a `tokio::sync::broadcast::Receiver`: - -| Event | When | -|-------------|----------------------------------------------| -| `Dispatched(TaskEventHeader)` | Task popped and executor spawned | -| `Completed(TaskEventHeader)` | Task finished successfully | -| `Failed { header, error, will_retry }` | Task failed (includes retry flag) | -| `Preempted(TaskEventHeader)` | Task paused for higher-priority work | -| `Cancelled(TaskEventHeader)` | Task cancelled via `cancel()` | -| `Progress { header, percent, message }` | Progress update from executor | -| `Waiting { task_id, children_count }` | Parent entered waiting state | -| `Paused` | Scheduler globally paused | -| `Resumed` | Scheduler globally resumed | - -Task-specific variants share a `TaskEventHeader` containing `task_id`, -`task_type`, `key`, and `label`. Use `event.header()` to access it generically. - -All variants derive `Serialize`/`Deserialize`. - -## Progress reporting - -### Executor-reported - -Executors call `ctx.progress().report(percent, message)` or -`ctx.progress().report_fraction(completed, total, message)`. These emit -`SchedulerEvent::Progress` and update the active task map. - -### Throughput-extrapolated - -For tasks that don't report progress, `estimated_progress()` extrapolates from -elapsed time vs. the historical average duration for that task type. When a -partial report exists, the extrapolation blends historical and current throughput -for a more accurate estimate. - -`EstimatedProgress` provides `reported_percent`, `extrapolated_percent`, and a -unified `percent` (reported preferred over extrapolated). - -## Task type registry - -`TaskTypeRegistry` maps string names to executor implementations. The public -`TaskExecutor` trait uses RPITIT (`impl Future`) for ergonomic async; an internal -`ErasedExecutor` trait provides object-safe dynamic dispatch for storage. - -Duplicate registration panics — catches configuration errors at startup. When the -scheduler pops a task with no registered executor, it fails immediately with a -descriptive error. - -The registry is essential for crash recovery: after `recover_running()` resets -in-flight tasks to pending, the scheduler needs the registry to re-dispatch them. - -## Application state - -Executors often need shared services. Rather than capturing `Arc` per executor, -the scheduler provides a type-keyed `StateMap` that supports multiple state types: - -```rust -Scheduler::builder() - .app_state(MyServices { http, db, cache }) - .app_state(FeatureFlags { dark_mode: true }) - .build().await?; - -// In the executor: -let svc = ctx.state::().expect("state not set"); -let flags = ctx.state::().expect("flags not set"); -``` - -State flows: `SchedulerBuilder` collects `(TypeId, Arc)` entries → -assembled into `Arc` at build time → a `StateSnapshot` (lock-free -`HashMap` clone) is taken once per dispatch and placed in `TaskContext` → -executors call `ctx.state::()` which does a `TypeId` lookup + downcast. - -Libraries that embed a shared scheduler can inject their own state **after** -build via `scheduler.register_state(Arc::new(LibState { .. })).await`. This -is how shoebox injects `ScanAppState` into an externally-provided scheduler. - -This mirrors Axum's `State` / Tauri's `State` pattern. - -## Global pause / resume - -`pause_all()` sets an `AtomicBool` flag, cancels every running task's token, -moves them to paused status, and emits `Paused`. While paused the run loop skips -dispatch entirely. - -`resume_all()` clears the flag, wakes the run loop via `Notify`, and emits -`Resumed`. Paused tasks are picked up by the existing resumption logic on the -next cycle. - -`try_dispatch()` does **not** check the flag, so manual single-task dispatch -still works while globally paused. `SchedulerSnapshot::is_paused` reflects the -flag for UI integration. - -## Graceful shutdown - -`ShutdownMode` controls behaviour when the run loop's `CancellationToken` fires: - -- **`Hard`** (default) — cancel all running tasks immediately. -- **`Graceful(Duration)`** — stop dispatching, wait for running tasks to finish - (up to the timeout), then cancel stragglers. - -Both modes cancel the resource sampler's `CancellationToken`. - -## Crash recovery - -On `TaskStore::open()`, the store runs: - -```sql -UPDATE tasks SET status = 'pending', started_at = NULL WHERE status = 'running' -``` - -Any task mid-execution when the process died is reset to pending. This is safe -because executors should be idempotent (or check for partial work), the dedup key -stays occupied (no duplicates), and `retry_count` is preserved. - -## Thread safety - -- `Scheduler` — `Clone` via `Arc` -- `TaskStore` — `Clone` via `SqlitePool`; WAL journal mode for concurrent access -- `max_concurrency` — `AtomicUsize`, lock-free runtime adjustment -- `paused` — `AtomicBool` with `Release`/`Acquire` ordering -- `ActiveTaskMap` — `Arc>`, `Clone` -- `SmoothedReader` — `RwLock` so readers never block each other -- `TaskTypeRegistry` — immutable after startup, shared via `Arc` -- Application state — `Arc`, shared across all tasks -- Each spawned task gets its own `CancellationToken` -- All trait objects require `Send + Sync + 'static` - -## Feature flags - -- **`sysinfo-monitor`** (default) — enables `SysinfoSampler` for cross-platform - CPU and disk IO. Disable for mobile targets or when providing a custom sampler. - -Serde (`Serialize`/`Deserialize`) is always enabled on all public types. - -## Configuration reference - -### SchedulerConfig - -| Field | Default | Notes | -|--------------------------|---------------|------------------------------------| -| `max_concurrency` | 4 | Adjustable at runtime | -| `max_retries` | 3 | | -| `preempt_priority` | `REALTIME` | | -| `poll_interval` | 500 ms | Fallback; notify wakes sooner | -| `throughput_sample_size` | 20 | History rows for IO learning | -| `shutdown_mode` | `Hard` | | - -### StoreConfig - -| Field | Default | Notes | -|--------------------|---------|-------------------------------------------| -| `max_connections` | 16 | SQLite pool size | -| `retention_policy` | `None` | `MaxCount(n)` or `MaxAgeDays(n)` | -| `prune_interval` | 100 | Prune every N completions | - -### SamplerConfig - -| Field | Default | Notes | -|--------------|---------|--------------------------| -| `interval` | 1 s | Sample period | -| `ewma_alpha` | 0.3 | Smoothing factor (0–1) | - -## Tauri integration - -### State management - -```rust -app.manage(scheduler); // Scheduler is Clone — no Arc needed - -#[tauri::command] -async fn submit_task( - scheduler: tauri::State<'_, Scheduler>, -) -> Result { - scheduler.submit(&submission).await -} - -#[tauri::command] -async fn scheduler_status( - scheduler: tauri::State<'_, Scheduler>, -) -> Result { - scheduler.snapshot().await -} -``` - -### Event bridging - -```rust -let mut events = scheduler.subscribe(); -let handle = app_handle.clone(); -tokio::spawn(async move { - while let Ok(event) = events.recv().await { - handle.emit("taskmill-event", &event).unwrap(); - } -}); -``` - -### Error handling - -`StoreError` derives `Serialize`/`Deserialize`, so it can be returned directly -from Tauri commands without conversion. - -### Cross-platform - -Gate `sysinfo-monitor` for mobile: `default-features = false`. Provide a custom -`ResourceSampler` for iOS/Android if needed. Everything else (SQLite, scheduling, -events) works on all platforms. diff --git a/docs/configuration.md b/docs/configuration.md index adf0d4a..9801d6f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1,17 +1,38 @@ # Configuration +## Recommended defaults + +For most Tauri desktop apps, the defaults work well. Here's what you might want to change: + +```rust +use std::time::Duration; +use taskmill::{Scheduler, ShutdownMode, StoreConfig, RetentionPolicy}; + +let scheduler = Scheduler::builder() + .store_path("tasks.db") + .max_concurrency(8) // match your IO parallelism + .shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(10))) + .with_resource_monitoring() + .store_config(StoreConfig { + retention_policy: Some(RetentionPolicy::MaxCount(10_000)), + ..Default::default() + }) + .build() + .await?; +``` + ## SchedulerConfig Controls scheduling behavior. Set via builder methods or pass directly to `Scheduler::new()`. -| Field | Type | Default | Description | -|-------|------|---------|-------------| -| `max_concurrency` | `usize` | 4 | Maximum concurrent running tasks. Adjustable at runtime via `set_max_concurrency()`. | -| `max_retries` | `i32` | 3 | Retry limit before a task is permanently failed. | -| `preempt_priority` | `Priority` | `REALTIME` (0) | Tasks at or above this priority trigger preemption of lower-priority work. | -| `poll_interval` | `Duration` | 500ms | Sleep between scheduler dispatch cycles. The scheduler also wakes on `Notify` signals. | -| `throughput_sample_size` | `i32` | 20 | Number of recent completions used for throughput-based progress extrapolation. | -| `shutdown_mode` | `ShutdownMode` | `Hard` | `Hard` cancels all tasks immediately. `Graceful(Duration)` waits up to the timeout. | +| Field | Type | Default | Description | Guidance | +|-------|------|---------|-------------|----------| +| `max_concurrency` | `usize` | 4 | Maximum concurrent running tasks. Adjustable at runtime via `set_max_concurrency()`. | Match your IO parallelism — 4–8 for disk-heavy, higher for network-heavy. | +| `max_retries` | `i32` | 3 | Retry limit before permanent failure. | Increase for flaky networks; decrease for tasks where retrying is wasteful. | +| `preempt_priority` | `Priority` | `REALTIME` (0) | Tasks at or above this priority trigger preemption. | Leave at `REALTIME` unless you need user-initiated tasks to preempt. | +| `poll_interval` | `Duration` | 500ms | Sleep between dispatch cycles. The scheduler also wakes immediately on submit. | Lower = more responsive but slightly more CPU. 250ms is fine for interactive apps. | +| `throughput_sample_size` | `i32` | 20 | Recent completions used for progress extrapolation. | More = smoother estimates but slower to adapt to changes in task behavior. | +| `shutdown_mode` | `ShutdownMode` | `Hard` | `Hard` cancels immediately. `Graceful(Duration)` waits for running tasks. | Always use `Graceful` for desktop apps to avoid data loss. | ### Builder methods @@ -33,13 +54,11 @@ let scheduler = Scheduler::builder() Controls the SQLite connection pool and history retention. -| Field | Type | Default | Description | -|-------|------|---------|-------------| -| `max_connections` | `u32` | 16 | SQLite connection pool size. | -| `retention_policy` | `Option` | `None` | Automatic history pruning. `MaxCount(n)` or `MaxAgeDays(n)`. | -| `prune_interval` | `u64` | 100 | Number of task completions between automatic prune runs. | - -### Builder method +| Field | Type | Default | Description | Guidance | +|-------|------|---------|-------------|----------| +| `max_connections` | `u32` | 16 | SQLite connection pool size. | Increase if you have many concurrent Tauri commands querying task state. | +| `retention_policy` | `Option` | `None` | Automatic history pruning. | Set this — without it, history grows without bound. `MaxCount(10_000)` is a good start. | +| `prune_interval` | `u64` | 100 | Completions between automatic prune runs. | Lower for apps that complete many tasks quickly; higher for slow-completing tasks. | ```rust use taskmill::{StoreConfig, RetentionPolicy}; @@ -57,14 +76,12 @@ let scheduler = Scheduler::builder() ## SamplerConfig -Controls the resource monitoring background loop. - -| Field | Type | Default | Description | -|-------|------|---------|-------------| -| `interval` | `Duration` | 1s | How often to sample system resources. | -| `ewma_alpha` | `f64` | 0.3 | EWMA smoothing factor. Higher = more responsive to changes, lower = smoother. | +Controls the resource monitoring background loop. Only relevant if you call `.with_resource_monitoring()` or provide a custom `ResourceSampler`. -### Builder method +| Field | Type | Default | Description | Guidance | +|-------|------|---------|-------------|----------| +| `interval` | `Duration` | 1s | How often to sample system resources. | 500ms for interactive apps; 2s for background services. | +| `ewma_alpha` | `f64` | 0.3 | Smoothing factor. Higher = more responsive, lower = smoother. | 0.2 for steady workloads, 0.5 for bursty workloads. See [IO & Backpressure](io-and-backpressure.md#ewma-smoothing). | ```rust use std::time::Duration; @@ -85,7 +102,7 @@ let scheduler = Scheduler::builder() | Variant | Behavior | |---------|----------| | `Hard` | Cancel all running tasks immediately when the scheduler stops. | -| `Graceful(Duration)` | Stop dispatching new tasks, wait for running tasks to complete (up to the timeout), then force-cancel any remaining. Stops the resource sampler afterward. | +| `Graceful(Duration)` | Stop dispatching new tasks, wait for running tasks to complete (up to the timeout), then force-cancel any remaining. | ## RetentionPolicy @@ -96,31 +113,98 @@ let scheduler = Scheduler::builder() ## Priority constants -| Constant | Value | Notes | -|----------|-------|-------| -| `Priority::REALTIME` | 0 | Highest. Never throttled. Triggers preemption. | -| `Priority::HIGH` | 64 | | -| `Priority::NORMAL` | 128 | Default for most tasks. | -| `Priority::BACKGROUND` | 192 | | -| `Priority::IDLE` | 255 | Lowest. | +| Constant | Value | Typical use | +|----------|-------|-------------| +| `Priority::REALTIME` | 0 | User-blocking, triggers preemption. | +| `Priority::HIGH` | 64 | User-initiated actions. | +| `Priority::NORMAL` | 128 | App-initiated work (default). | +| `Priority::BACKGROUND` | 192 | Maintenance tasks. | +| `Priority::IDLE` | 255 | Truly optional work. | Custom: `Priority::new(n)` for any `u8` value. +## Graceful shutdown + +When the scheduler stops (the `CancellationToken` passed to `run()` is cancelled): + +- **`Hard`** (default) — all running tasks are immediately cancelled. +- **`Graceful(Duration)`** — the scheduler stops dispatching new tasks, waits for running tasks to finish up to the timeout, then cancels any stragglers. + +Both modes stop the resource sampler. **For desktop apps, always use `Graceful` to avoid interrupting in-progress uploads or file operations.** + +## Application state + +Executors often need shared services (HTTP clients, database connections, caches). Rather than capturing `Arc` per executor, register state on the builder: + +```rust +let scheduler = Scheduler::builder() + .app_state(MyServices { http, db, cache }) + .app_state(FeatureFlags { dark_mode: true }) // multiple types can coexist + .build() + .await?; + +// In any executor: +let svc = ctx.state::().expect("state not registered"); +``` + +State is keyed by `TypeId` — each type has one instance, shared across all tasks. Libraries that embed a shared scheduler can inject their own state after build: + +```rust +scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await; +``` + ## Feature flags | Feature | Default | Description | |---------|---------|-------------| -| `sysinfo-monitor` | Enabled | Cross-platform CPU and disk IO monitoring via `sysinfo`. Disable for mobile targets or custom samplers. | - -### Disabling platform monitoring +| `sysinfo-monitor` | Enabled | Cross-platform CPU, disk IO, and network monitoring via `sysinfo`. Disable for mobile targets or when using a custom sampler. | ```toml -[dependencies] -taskmill = { path = "crates/taskmill", default-features = false } +# Disable platform monitoring +taskmill = { version = "0.3", default-features = false } ``` When disabled, you can still provide a custom `ResourceSampler` via `.resource_sampler()`. +## Tuning for specific workloads + +### Desktop app with file processing + +```rust +Scheduler::builder() + .max_concurrency(4) // don't overwhelm the disk + .with_resource_monitoring() // auto-defer when disk is busy + .shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(10))) + .store_config(StoreConfig { + retention_policy: Some(RetentionPolicy::MaxCount(10_000)), + ..Default::default() + }) +``` + +### Upload/download service + +```rust +Scheduler::builder() + .max_concurrency(16) // network tasks can run in parallel + .with_resource_monitoring() + .bandwidth_limit(50_000_000.0) // 50 MB/s cap + .group_concurrency("uploads", 4) // per-endpoint limits + .shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(30))) +``` + +### Background indexer + +```rust +Scheduler::builder() + .max_concurrency(2) // stay out of the way + .with_resource_monitoring() + .sampler_config(SamplerConfig { + ewma_alpha: 0.2, // smooth — don't react to spikes + ..Default::default() + }) + .shutdown_mode(ShutdownMode::Hard) // indexing can restart +``` + ## Builder reference All `SchedulerBuilder` methods: diff --git a/docs/design.md b/docs/design.md new file mode 100644 index 0000000..8abd98d --- /dev/null +++ b/docs/design.md @@ -0,0 +1,176 @@ +# Design + +This document explains *why* taskmill is designed the way it is. If you're looking for *how to use* taskmill, start with the [Quick Start](quick-start.md). This page is for developers who want to understand the architecture to make better integration decisions or extend taskmill. + +## Design goals + +1. **Crash-safe by default.** Tasks are persisted to SQLite, not held in channels or memory. A killed process loses zero work. +2. **Desktop-friendly.** All public types are `Clone` and `Serialize`. The `Scheduler` fits directly into `tauri::State` with no extra wrapping. +3. **IO-aware.** Most task schedulers ignore throughput. Taskmill's dispatch gate checks disk and network capacity before spawning work, so your app doesn't freeze the system. +4. **Composable.** Pressure sources, resource samplers, throttle policies, and typed state are all pluggable. You can extend without forking. + +## Architecture overview + +### Module map + +``` +taskmill/src/ + lib.rs — public API re-exports + task/ — TaskRecord, TaskSubmission, TaskError, TypedTask + priority.rs — Priority newtype (u8, lower = higher priority) + store/ — TaskStore: SQLite persistence, atomic pop, queries + registry/ — TaskExecutor trait, TaskContext, TaskTypeRegistry + backpressure.rs — PressureSource trait, ThrottlePolicy, CompositePressure + scheduler/ + mod.rs — Scheduler, SchedulerBuilder, public API + run_loop.rs — main event loop, dispatch cycle + submit.rs — submit, submit_batch, cancellation + control.rs — pause/resume, concurrency limits + queries.rs — snapshot, active tasks, progress + gate.rs — DispatchGate, IO budget check + dispatch.rs — ActiveTaskMap, spawn_task, preemption + progress.rs — ProgressReporter, throughput extrapolation + event.rs — SchedulerEvent, SchedulerSnapshot + resource/ + mod.rs — ResourceSampler + ResourceReader traits + sampler.rs — EWMA-smoothed background loop + network_pressure.rs — NetworkPressure source + sysinfo_monitor.rs — SysinfoSampler (feature-gated) +``` + +### Task lifecycle + +``` +Submit ──► Pending ──► Running ──► Completed (moved to task_history) + │ │ + │ ├──► Failed (moved to task_history, or retried) + │ │ + │ └──► Paused (preempted by higher-priority work) + │ │ + └─────────────────┘ (resumed when preemptors finish) +``` + +Active states live in the `tasks` table. Terminal states (`completed`, `failed`) are atomically moved to `task_history`. + +### Data flow + +```mermaid +flowchart TD + S["submit() / submit_batch()"] --> TS["TaskStore\n(INSERT OR IGNORE)"] + TS --> |SQLite| DB[(tasks table)] + DB --> SCH["Scheduler run loop"] + SCH --> |"tokio::spawn"| E1["Executor + TaskContext"] + SCH --> |"tokio::spawn"| E2["Executor + TaskContext"] + E1 --> CF["complete() / fail()"] + E2 --> CF + CF --> HIST[(task_history)] + CF --> PRUNE["maybe_prune()\n(amortised retention)"] + CF --> EVT["broadcast::Sender\n(SchedulerEvent)"] +``` + +## Why SQLite? + +- **No external dependencies.** No Redis, no Postgres, no Docker. Just a file on disk. +- **Atomic operations.** `UPDATE ... RETURNING` lets us pop tasks from the queue without race conditions. `INSERT OR IGNORE` handles dedup atomically. +- **WAL for concurrent reads.** Multiple Tauri commands can query task status while the scheduler is dispatching. +- **Portable.** Works on Linux, macOS, Windows, and mobile platforms. + +The tradeoff is that SQLite serializes writes, so taskmill isn't suitable for extremely high-throughput scenarios (thousands of submissions per second). For desktop apps and background services, this is never a bottleneck. + +## Why peek-then-pop? + +The scheduler's dispatch loop uses a two-step approach: first *peek* at the next candidate (non-mutating), then *pop by ID* (atomic claim) only if the dispatch gate admits it. + +```mermaid +flowchart TD + PEEK["peek_next()\n(non-mutating)"] --> GATE{"gate.admit()\nbackpressure + IO budget"} + GATE -- rejected --> WAIT["Wait for next cycle"] + GATE -- admitted --> POP["pop_by_id()\n(atomic claim)"] + POP -- claimed --> SPAWN["spawn_task()"] + POP -- gone --> PEEK +``` + +This eliminates a race from the earlier design: if we popped first and then the gate rejected the task, we'd need to explicitly requeue it — which introduced edge cases around priority ordering and dedup key states. Peek-then-pop keeps the queue state clean. + +## How IO gating works + +When a `ResourceReader` is present, the scheduler checks IO headroom before dispatching: + +1. Read the latest EWMA-smoothed disk throughput (bytes/sec). +2. Sum expected IO across all currently running tasks. +3. Compute a 2-second capacity window: `capacity = throughput * 2.0`. +4. Defer if running IO would exceed 80% of capacity on either read or write axis. + +The 2-second window and 80% threshold are conservative defaults. They leave headroom for other applications and OS operations while still allowing taskmill to utilize most of the available throughput. + +If no resource reader is configured, the check is skipped — dispatch is purely by priority and concurrency. + +## Thread safety model + +Taskmill is designed for concurrent access from multiple async tasks and Tauri commands: + +| Component | Mechanism | Notes | +|-----------|-----------|-------| +| `Scheduler` | `Clone` via `Arc` | Cheap clones, no extra wrapping needed for Tauri | +| `TaskStore` | `Clone` via `SqlitePool` | WAL mode for concurrent reads | +| `max_concurrency` | `AtomicUsize` | Lock-free runtime adjustment | +| `paused` | `AtomicBool` | Release/Acquire ordering | +| `ActiveTaskMap` | `Arc>` | Holds running task metadata | +| `SmoothedReader` | `RwLock` | Readers never block each other | +| `TaskTypeRegistry` | `Arc`, immutable after build | No synchronization needed | +| Application state | `Arc` | Shared across all tasks | + +Each spawned task gets its own `CancellationToken`. All trait objects require `Send + Sync + 'static`. + +## Extension points + +Taskmill is designed to be extended without forking: + +### Custom pressure sources + +Implement `PressureSource` to feed any signal into the backpressure system. See [IO & Backpressure](io-and-backpressure.md#pressure-sources). + +```rust +pub trait PressureSource: Send + Sync + 'static { + fn pressure(&self) -> f32; // 0.0 (idle) to 1.0 (saturated) + fn name(&self) -> &str; +} +``` + +### Custom resource samplers + +Implement `ResourceSampler` for platforms where `sysinfo` doesn't work (containers, cgroups, mobile). See [IO & Backpressure](io-and-backpressure.md#advanced-custom-samplers). + +### Typed tasks + +Implement `TypedTask` on your structs for compile-time type safety on payloads, priorities, and IO budgets. See [Quick Start](quick-start.md#typed-tasks). + +### Application state injection + +Register shared services at build time or inject library-specific state after build. See [Configuration](configuration.md#application-state). + +## Dispatch cycle + +The run loop wakes on two signals: + +1. **`Notify`** — triggered by `submit()`, `submit_batch()`, and `resume_all()`. Newly enqueued work is picked up immediately. +2. **`poll_interval` timer** (default 500ms) — fallback for paused-task resumption and periodic housekeeping. + +Each cycle, the loop: + +1. Checks if the scheduler is globally paused. +2. Resumes paused tasks if no active preemptors remain. +3. While `active_count < max_concurrency`: peek the next candidate, check the dispatch gate, pop-by-id if admitted, spawn the executor. +4. Sleep until the next signal. + +## Retry flow + +``` +Executor returns Err(TaskError) + └─ retryable: false? ──► move to task_history (failed) + └─ retryable: true? + └─ retry_count < max_retries? ──► status → pending, retry_count += 1 + └─ otherwise ──► move to task_history (failed) +``` + +Retried tasks keep their original priority and dedup key. `max_retries` defaults to 3. diff --git a/docs/features.md b/docs/features.md deleted file mode 100644 index 587921d..0000000 --- a/docs/features.md +++ /dev/null @@ -1,125 +0,0 @@ -# Features - -A complete list of taskmill's capabilities. - -## Persistence - -- **SQLite-backed queue** — all tasks are stored in SQLite with WAL journal mode. Tasks survive process restarts, crashes, and power loss. -- **Crash recovery** — tasks left in `running` state during a crash are automatically reset to `pending` on startup. Dedup keys remain occupied so no duplicates sneak in during recovery. -- **Connection pooling** — configurable pool size (default 16) for concurrent reads. - -## Scheduling - -- **256-level priority queue** — priorities range from 0 (highest, `REALTIME`) to 255 (lowest, `IDLE`). Five named tiers are provided: `REALTIME`, `HIGH`, `NORMAL`, `BACKGROUND`, `IDLE`. Custom values like `Priority::new(100)` work too. -- **FIFO within tier** — tasks at the same priority are dispatched in insertion order. -- **Atomic dispatch** — pop operations use `UPDATE ... WHERE id = (SELECT ...) RETURNING *` for race-free claiming with no lost tasks. -- **Runtime-adjustable concurrency** — change `max_concurrency` at runtime via `set_max_concurrency()`. - -## Task Groups - -- **Per-group concurrency** — assign tasks to named groups via `.group(key)` on `TaskSubmission` or `TypedTask::group_key()`. The scheduler limits how many tasks in the same group can run concurrently. -- **Configurable limits** — set per-group limits at build time with `.group_concurrency(group, limit)` or a default for all groups with `.default_group_concurrency(limit)`. -- **Runtime-adjustable** — change limits at runtime via `set_group_limit()`, `remove_group_limit()`, and `set_default_group_concurrency()`. -- **Independent of global concurrency** — group limits are checked *in addition to* `max_concurrency`. A task must pass both the global and group gate to be dispatched. - -## Deduplication - -- **Key-based dedup** — each task gets a SHA-256 key derived from `task_type + payload` (or an explicit key). A `UNIQUE(key)` constraint with `INSERT OR IGNORE` prevents duplicate submissions. -- **Type-scoped keys** — the task type is always part of the hash, so different task types never collide even with identical payloads. -- **Lifecycle-aware** — keys are occupied while a task is pending, running, paused, or retrying. The key is freed when the task moves to history (completed or failed). -- **Batch-safe** — deduplication applies within `submit_batch()` transactions too. - -## IO Awareness - -- **Expected/actual IO tracking** — submit an `IoBudget` (disk read/write, network rx/tx); executors report actual bytes on completion. -- **Network IO tracking** — tasks can declare expected network RX/TX bytes via `IoBudget::net()` and report actuals via `ctx.record_net_rx_bytes()` / `ctx.record_net_tx_bytes()`. -- **IO budget gating** — the scheduler compares running task IO estimates against EWMA-smoothed system throughput. New work is deferred when cumulative IO would exceed 80% of observed disk capacity. -- **Learning from history** — `avg_throughput()` and `history_stats()` compute per-type IO averages from actual completions, enabling callers to refine estimates over time. - -## Resource Monitoring - -- **Cross-platform** — CPU, disk IO, and network throughput via `sysinfo` on Linux, macOS, and Windows. Feature-gated under `sysinfo-monitor` (enabled by default). -- **EWMA smoothing** — raw samples are smoothed with an exponentially weighted moving average (alpha=0.3, configurable) to avoid spiky readings. -- **Two-trait design** — `ResourceSampler` (raw platform readings) and `ResourceReader` (smoothed snapshots) are separated for testability and custom implementations. -- **Custom samplers** — disable the `sysinfo-monitor` feature and provide your own `ResourceSampler` for containers, cgroups, or mobile platforms. -- **Network bandwidth pressure** — built-in `NetworkPressure` source maps observed RX+TX throughput against a configurable bandwidth cap to backpressure. Enable via `.bandwidth_limit(bytes_per_sec)` on the builder. - -## Backpressure - -- **Composable pressure sources** — implement the `PressureSource` trait to expose a `0.0..=1.0` signal from any source (API load, memory, battery, queue depth). `CompositePressure` aggregates sources; the aggregate is the maximum across all. -- **Throttle policies** — `ThrottlePolicy` maps `(priority, pressure)` to dispatch decisions. The default three-tier policy throttles `BACKGROUND` tasks at >50% pressure, `NORMAL` at >75%, and never throttles `HIGH` or `REALTIME`. -- **Custom policies** — define your own thresholds for fine-grained control. - -## Preemption - -- **Priority-based preemption** — when a task at or above `preempt_priority` (default: `REALTIME`) is submitted, all lower-priority running tasks are cancelled and paused. -- **Token-based cancellation** — preempted tasks have their `CancellationToken` triggered. Executors should check `token.is_cancelled()` at yield points. -- **Anti-thrash protection** — paused tasks only resume when no active preemptors remain. - -## Retries - -- **Automatic requeue** — retryable failures (`TaskError::retryable(msg)`) are requeued at the same priority with `retry_count += 1`. -- **Configurable limit** — `max_retries` (default 3) controls how many times a task can be retried before permanent failure. -- **Dedup preserved** — the key stays occupied during retries, preventing duplicate submission of in-progress work. - -## Progress Reporting - -- **Executor-reported progress** — report percentage or fraction-based progress via `ctx.progress().report()` or `ctx.progress().report_fraction()`. -- **Throughput-based extrapolation** — for tasks without explicit reports, the scheduler extrapolates progress from historical average duration, capped at 99% to avoid false completion signals. -- **Event-driven** — progress updates are emitted as `SchedulerEvent::Progress` for real-time UI updates. - -## Lifecycle Events - -- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Waiting`, `Paused`, `Resumed`. Task-specific variants share a `TaskEventHeader` with `task_id`, `task_type`, `key`, and `label`. -- **Tauri-ready** — all events are `Serialize`, designed for direct bridging to frontend via `app_handle.emit()`. - -## Task Management - -- **Task cancellation** — cancel running, pending, or paused tasks via `scheduler.cancel(task_id)`. -- **Global pause/resume** — `pause_all()` stops dispatch and pauses running tasks; `resume_all()` resumes on the next cycle. Emits events for UI integration. -- **Task lookup by dedup key** — `task_lookup()` searches both active and history tables for a task matching a given type and dedup input. - -## Typed Payloads - -- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data).expected_io(budget)` for ergonomic construction with serialization. Serialization errors are deferred to submit time. Use `.label("display name")` to set a human-readable display label independent of the dedup key. -- **Type-safe deserialization** — `ctx.payload::()?` in executors for zero-boilerplate extraction. -- **TypedTask trait** — define `TASK_TYPE`, default priority, expected IO, key, and label on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::()`. - -## Child Tasks - -- **Hierarchical execution** — spawn child tasks from an executor via `ctx.spawn_child()`. The parent enters a `waiting` state and resumes for finalization after all children complete. -- **Two-phase execution** — implement `TaskExecutor::finalize()` for assembly work after children finish (e.g. `CompleteMultipartUpload`). -- **Fail-fast** — when enabled (default), the first child failure cancels siblings and fails the parent immediately. - -## Batch Operations - -- **Bulk enqueue** — `submit_batch()` wraps many inserts in a single SQLite transaction. Returns `Vec` indicating whether each was inserted, upgraded, requeued, or deduplicated. - -## Graceful Shutdown - -- **Hard mode** (default) — immediately cancels all running tasks. -- **Graceful mode** — stops dispatching, waits for running tasks up to a configurable timeout, then force-cancels stragglers. - -## Application State - -- **Type-keyed state map** — register multiple state types on the builder via `.app_state()` / `.app_state_arc()`. Each type is keyed by `TypeId`; access from any executor via `ctx.state::()`. -- **Post-build injection** — call `scheduler.register_state(arc)` after build to let libraries inject their own state into a shared scheduler. -- **Arc-based sharing** — state is wrapped in `Arc` internally; all tasks share the same instance. - -## History & Pruning - -- **Automatic retention** — configure `RetentionPolicy::MaxCount(n)` or `RetentionPolicy::MaxAgeDays(n)` for automatic history pruning. -- **Amortized pruning** — pruning runs every N completions (default 100, configurable) to avoid per-task overhead. -- **Manual pruning** — `prune_history_by_count()` and `prune_history_by_age()` for on-demand cleanup. - -## Dashboard - -- **Single-call snapshot** — `scheduler.snapshot()` returns a serializable `SchedulerSnapshot` with running tasks, queue depths, progress estimates, pressure readings, and concurrency limits. -- **Designed for Tauri commands** — return the snapshot directly from a `#[tauri::command]` handler. - -## Ergonomics - -- **Builder pattern** — `Scheduler::builder()` provides fluent construction with sensible defaults. -- **Clone-friendly** — `Scheduler` is `Clone` via `Arc` for easy sharing in Tauri state and across async tasks. -- **Serde on all public types** — every public struct and enum derives `Serialize`/`Deserialize` for Tauri IPC. -- **Serializable errors** — `StoreError` is serializable for direct use in Tauri command returns. diff --git a/docs/glossary.md b/docs/glossary.md new file mode 100644 index 0000000..e53d823 --- /dev/null +++ b/docs/glossary.md @@ -0,0 +1,17 @@ +# Glossary + +Quick reference for terms used throughout the taskmill documentation. + +| Term | Definition | +|------|------------| +| **Backpressure** | Slowing down new work when the system is already busy. Taskmill uses [pressure sources](io-and-backpressure.md#backpressure-external-pressure-signals) to detect load and [throttle policies](priorities-and-preemption.md#throttle-behavior) to decide which tasks to defer. | +| **Deduplication (dedup)** | Preventing the same task from being queued twice. Taskmill generates a SHA-256 key from the task type and payload; a second submission with the same key is silently ignored. See [Persistence & Recovery](persistence-and-recovery.md#deduplication). | +| **Dispatch** | Moving a task from "waiting in line" (pending) to "actively running." The scheduler dispatches tasks in priority order, subject to concurrency limits and backpressure. | +| **EWMA** | Exponentially Weighted Moving Average — a smoothing technique that gives recent measurements more weight than old ones. Taskmill uses EWMA to smooth resource readings so the scheduler doesn't overreact to momentary spikes. See [IO & Backpressure](io-and-backpressure.md#ewma-smoothing). | +| **Executor** | Your code that performs the actual work for a task type. Implements the `TaskExecutor` trait. See [Quick Start](quick-start.md#implement-an-executor). | +| **IO budget** | An estimate of how many bytes a task will read and write (disk and/or network), submitted alongside the task. The scheduler uses IO budgets to avoid overwhelming the disk. See [IO & Backpressure](io-and-backpressure.md#io-budgets-telling-the-scheduler-what-to-expect). | +| **Preemption** | Pausing lower-priority work so higher-priority work can run immediately. Preempted tasks resume automatically once the urgent work finishes. See [Priorities & Preemption](priorities-and-preemption.md#preemption). | +| **Pressure source** | Anything that signals the system is busy — disk IO, network throughput, memory usage, API rate limits, battery level. Returns a value from 0.0 (idle) to 1.0 (saturated). See [IO & Backpressure](io-and-backpressure.md#pressure-sources). | +| **Task group** | A named set of tasks that share a concurrency limit. For example, you might limit uploads to a specific S3 bucket to 3 at a time. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). | +| **Throttle policy** | Rules that map system pressure to dispatch decisions. The default policy defers background tasks when pressure exceeds 50% and normal tasks when it exceeds 75%, but never blocks high-priority work. See [Priorities & Preemption](priorities-and-preemption.md#throttle-behavior). | +| **Typed task** | A struct that implements the `TypedTask` trait, giving you compile-time type safety for task payloads, priorities, and IO budgets instead of stringly-typed submissions. See [Quick Start](quick-start.md#typed-tasks). | diff --git a/docs/guides/background-service.md b/docs/guides/background-service.md new file mode 100644 index 0000000..7af9bc0 --- /dev/null +++ b/docs/guides/background-service.md @@ -0,0 +1,189 @@ +# Guide: Background Processing Service + +This guide walks through using taskmill in a background service (daemon, CLI tool, or server process) that processes files. Unlike the [Tauri guide](tauri-upload-queue.md), there's no UI — the focus is on signal handling, container-friendly resource monitoring, and server-oriented tuning. + +## What we're building + +A service that: +- Watches a directory for new images +- Queues processing tasks (thumbnail generation, EXIF extraction) +- Prioritizes by file type (RAW files get `BACKGROUND`, JPEGs get `NORMAL`) +- Monitors disk IO to avoid saturating the system +- Shuts down gracefully on SIGTERM + +## Define the task + +```rust +use serde::{Serialize, Deserialize}; +use taskmill::{TypedTask, IoBudget, Priority}; + +#[derive(Serialize, Deserialize)] +struct ProcessImageTask { + path: String, + file_size: u64, + is_raw: bool, +} + +impl TypedTask for ProcessImageTask { + const TASK_TYPE: &'static str = "process-image"; + + fn expected_io(&self) -> IoBudget { + // RAW files are much larger + let write_estimate = if self.is_raw { self.file_size / 10 } else { self.file_size / 4 }; + IoBudget::disk(self.file_size as i64, write_estimate as i64) + } + + fn priority(&self) -> Priority { + if self.is_raw { + Priority::BACKGROUND // RAW processing is slow; don't block JPEGs + } else { + Priority::NORMAL + } + } + + fn key(&self) -> Option { + Some(self.path.clone()) + } +} +``` + +## Implement the executor + +```rust +use std::sync::Arc; +use taskmill::{TaskExecutor, TaskContext, TaskError}; + +struct ImageProcessor; + +impl TaskExecutor for ImageProcessor { + async fn execute<'a>( + &'a self, ctx: &'a TaskContext, + ) -> Result<(), TaskError> { + let task: ProcessImageTask = ctx.payload()?; + + // Read the source image + let data = tokio::fs::read(&task.path).await + .map_err(|e| TaskError::permanent(format!("can't read: {e}")))?; + ctx.record_read_bytes(data.len() as i64); + + if ctx.token().is_cancelled() { + return Err(TaskError::retryable("preempted")); + } + + // Generate thumbnail + let thumb = generate_thumbnail(&data) + .map_err(|e| TaskError::retryable(format!("processing failed: {e}")))?; + + let thumb_path = thumbnail_path(&task.path); + tokio::fs::write(&thumb_path, &thumb).await + .map_err(|e| TaskError::retryable(format!("can't write: {e}")))?; + ctx.record_write_bytes(thumb.len() as i64); + + ctx.progress().report(1.0, Some("done".into())); + Ok(()) + } +} +``` + +## Set up the service + +```rust +use std::sync::Arc; +use std::time::Duration; +use tokio_util::sync::CancellationToken; +use taskmill::{Scheduler, ShutdownMode, StoreConfig, RetentionPolicy, SamplerConfig}; + +#[tokio::main] +async fn main() { + tracing_subscriber::init(); + + let scheduler = Scheduler::builder() + .store_path("/var/lib/myservice/tasks.db") + .executor("process-image", Arc::new(ImageProcessor)) + .max_concurrency(4) + .max_retries(5) + .with_resource_monitoring() + .sampler_config(SamplerConfig { + ewma_alpha: 0.2, // smooth — don't overreact to spikes + interval: Duration::from_secs(2), + }) + .shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(30))) + .store_config(StoreConfig { + retention_policy: Some(RetentionPolicy::MaxAgeDays(30)), + ..Default::default() + }) + .build() + .await + .unwrap(); + + // Log events + let mut events = scheduler.subscribe(); + tokio::spawn(async move { + while let Ok(event) = events.recv().await { + tracing::info!(?event, "scheduler event"); + } + }); + + // Watch for new files and submit tasks + let sched = scheduler.clone(); + tokio::spawn(async move { + watch_directory("/data/incoming", sched).await; + }); + + // Shut down gracefully on SIGTERM + let token = CancellationToken::new(); + let shutdown_token = token.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + tracing::info!("shutting down..."); + shutdown_token.cancel(); + }); + + scheduler.run(token).await; + tracing::info!("shutdown complete"); +} +``` + +## Custom resource sampler for containers + +If your service runs in a container, the built-in `sysinfo` sampler may not reflect cgroup limits. Provide a custom sampler: + +```rust +use taskmill::{ResourceSampler, ResourceSnapshot}; + +struct CgroupSampler; + +impl ResourceSampler for CgroupSampler { + fn sample(&mut self) -> ResourceSnapshot { + ResourceSnapshot { + cpu_usage: read_cgroup_cpu(), + io_read_bytes_per_sec: read_blkio_read(), + io_write_bytes_per_sec: read_blkio_write(), + net_rx_bytes_per_sec: 0.0, + net_tx_bytes_per_sec: 0.0, + } + } +} + +// In the builder: +Scheduler::builder() + .resource_sampler(Box::new(CgroupSampler)) + // ... +``` + +Disable the default sampler in `Cargo.toml`: + +```toml +taskmill = { version = "0.3", default-features = false } +``` + +## Key differences from desktop + +| Concern | Desktop (Tauri) | Background service | +|---------|----------------|-------------------| +| Event bridging | `app_handle.emit()` to frontend | `tracing::info!()` to logs | +| Shutdown | App close event | SIGTERM / SIGINT | +| Resource monitoring | Built-in `sysinfo` | Custom sampler for cgroups | +| Concurrency | Low (4–8) to keep UI responsive | Higher (8–16) for throughput | +| Smoothing | Default (0.3) — responsive | Lower (0.2) — stable | +| Retention | `MaxCount` — keep N recent | `MaxAgeDays` — time-based cleanup | diff --git a/docs/guides/tauri-upload-queue.md b/docs/guides/tauri-upload-queue.md new file mode 100644 index 0000000..5af5395 --- /dev/null +++ b/docs/guides/tauri-upload-queue.md @@ -0,0 +1,290 @@ +# Guide: Tauri Upload Queue + +This guide walks through building a file upload queue in a Tauri desktop app using taskmill. By the end, you'll have an app that queues file uploads, shows progress in the frontend, lets users prioritize urgent uploads, and doesn't saturate the network. + +## What we're building + +A Tauri app where: +- Users drag files to upload. Each file becomes a task in the queue. +- A progress bar shows upload status for each file. +- Users can right-click to prioritize an upload ("upload this next"). +- The scheduler limits concurrent uploads and backs off when bandwidth is saturated. +- If the app crashes, pending uploads resume on restart. + +## Setup + +Add taskmill to your Tauri app's `Cargo.toml`: + +```toml +[dependencies] +taskmill = "0.3" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio-util = "0.7" +``` + +## Define the upload task + +Use `TypedTask` to define the upload payload with its IO budget and priority: + +```rust +use serde::{Serialize, Deserialize}; +use taskmill::{TypedTask, IoBudget, Priority}; + +#[derive(Serialize, Deserialize)] +struct UploadTask { + file_path: String, + file_size: u64, + bucket: String, +} + +impl TypedTask for UploadTask { + const TASK_TYPE: &'static str = "upload"; + + fn expected_io(&self) -> IoBudget { + // Read from disk, write to network + IoBudget::new(self.file_size as i64, 0, 0, self.file_size as i64) + } + + fn priority(&self) -> Priority { + Priority::NORMAL + } + + fn group_key(&self) -> Option { + // Limit concurrent uploads per bucket + Some(self.bucket.clone()) + } + + fn key(&self) -> Option { + // Dedup by file path — uploading the same file twice is a no-op + Some(self.file_path.clone()) + } + + fn label(&self) -> Option { + // Human-readable name for the UI + let filename = std::path::Path::new(&self.file_path) + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| self.file_path.clone()); + Some(filename) + } +} +``` + +## Implement the executor + +The executor does the actual upload work. It reports progress and checks for preemption between chunks. + +```rust +use std::sync::Arc; +use taskmill::{TaskExecutor, TaskContext, TaskError}; + +struct UploadExecutor; + +impl TaskExecutor for UploadExecutor { + async fn execute<'a>( + &'a self, ctx: &'a TaskContext, + ) -> Result<(), TaskError> { + let task: UploadTask = ctx.payload()?; + + let file = tokio::fs::read(&task.file_path).await + .map_err(|e| TaskError::permanent(format!("can't read file: {e}")))?; + + ctx.record_read_bytes(file.len() as i64); + + let chunk_size = 5 * 1024 * 1024; // 5 MB chunks + let chunks: Vec<_> = file.chunks(chunk_size).collect(); + + for (i, chunk) in chunks.iter().enumerate() { + // Check for preemption before each chunk + if ctx.token().is_cancelled() { + return Err(TaskError::retryable("preempted")); + } + + // Upload the chunk (your upload logic here) + upload_chunk(&task.bucket, chunk).await + .map_err(|e| TaskError::retryable(format!("upload failed: {e}")))?; + + ctx.record_net_tx_bytes(chunk.len() as i64); + ctx.progress().report_fraction(i + 1, chunks.len(), Some("uploading".into())); + } + + Ok(()) + } +} +``` + +## Wire up the scheduler + +Build the scheduler in your Tauri setup with bandwidth limiting and per-bucket concurrency: + +```rust +use std::sync::Arc; +use std::time::Duration; +use tauri::Manager; +use tokio_util::sync::CancellationToken; +use taskmill::{Scheduler, ShutdownMode, StoreConfig, RetentionPolicy}; + +fn main() { + tauri::Builder::default() + .setup(|app| { + let app_dir = app.path().app_data_dir().unwrap(); + let db_path = app_dir.join("tasks.db"); + + let scheduler = tauri::async_runtime::block_on(async { + Scheduler::builder() + .store_path(db_path.to_str().unwrap()) + .executor("upload", Arc::new(UploadExecutor)) + .max_concurrency(8) + .group_concurrency("my-bucket", 3) // max 3 uploads to this bucket + .bandwidth_limit(50_000_000.0) // 50 MB/s cap + .with_resource_monitoring() + .shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(30))) + .store_config(StoreConfig { + retention_policy: Some(RetentionPolicy::MaxCount(10_000)), + ..Default::default() + }) + .build() + .await + .unwrap() + }); + + // Bridge events to the frontend + setup_events(app, &scheduler); + + // Start the scheduler loop + let token = CancellationToken::new(); + let sched = scheduler.clone(); + tauri::async_runtime::spawn(async move { + sched.run(token).await; + }); + + // Store in Tauri state — Scheduler is Clone, no Arc needed + app.manage(scheduler); + + Ok(()) + }) + .invoke_handler(tauri::generate_handler![ + submit_upload, + prioritize_upload, + cancel_upload, + scheduler_status, + pause_all, + resume_all, + ]) + .run(tauri::generate_context!()) + .expect("error while running tauri application"); +} +``` + +## Expose commands to the frontend + +```rust +use taskmill::{Scheduler, SchedulerSnapshot, StoreError, Priority, TaskSubmission}; + +#[tauri::command] +async fn submit_upload( + scheduler: tauri::State<'_, Scheduler>, + file_path: String, + file_size: u64, + bucket: String, +) -> Result { + let task = UploadTask { file_path, file_size, bucket }; + let outcome = scheduler.submit_typed(&task).await?; + Ok(format!("{:?}", outcome)) +} + +#[tauri::command] +async fn prioritize_upload( + scheduler: tauri::State<'_, Scheduler>, + file_path: String, + file_size: u64, + bucket: String, +) -> Result { + // Re-submit at HIGH priority — dedup will upgrade the existing task + let sub = TaskSubmission::new("upload") + .payload_json(&UploadTask { file_path, file_size, bucket }) + .priority(Priority::HIGH); + let outcome = scheduler.submit(&sub).await?; + Ok(format!("{:?}", outcome)) +} + +#[tauri::command] +async fn cancel_upload( + scheduler: tauri::State<'_, Scheduler>, + task_id: i64, +) -> Result { + scheduler.cancel(task_id).await +} + +#[tauri::command] +async fn scheduler_status( + scheduler: tauri::State<'_, Scheduler>, +) -> Result { + scheduler.snapshot().await +} + +#[tauri::command] +async fn pause_all(scheduler: tauri::State<'_, Scheduler>) -> Result<(), StoreError> { + scheduler.pause_all().await; + Ok(()) +} + +#[tauri::command] +async fn resume_all(scheduler: tauri::State<'_, Scheduler>) -> Result<(), StoreError> { + scheduler.resume_all().await; + Ok(()) +} +``` + +## Bridge events to the frontend + +```rust +use taskmill::SchedulerEvent; + +fn setup_events(app: &tauri::App, scheduler: &Scheduler) { + let mut events = scheduler.subscribe(); + let handle = app.handle().clone(); + tokio::spawn(async move { + while let Ok(event) = events.recv().await { + handle.emit("taskmill-event", &event).unwrap(); + } + }); +} +``` + +In your frontend (TypeScript): + +```typescript +import { listen } from "@tauri-apps/api/event"; + +listen("taskmill-event", (event) => { + const data = event.payload; + // data has the SchedulerEvent structure — Progress, Completed, Failed, etc. + // Update your UI based on the event type +}); +``` + +## Handling edge cases + +### App backgrounded + +Pause all work when the app loses focus to conserve battery and bandwidth: + +```rust +// In a Tauri window event handler: +scheduler.pause_all().await; + +// When the app regains focus: +scheduler.resume_all().await; +``` + +### Crash recovery + +Handled automatically. When the app restarts, `TaskStore::open()` resets any tasks that were mid-upload back to pending. They'll be re-dispatched and the executor will re-upload from the beginning. + +If your upload target supports resumable uploads, you can store the upload session ID in the payload and check for it in the executor before starting a new upload. + +### Duplicate uploads + +Handled automatically. The `key()` method on `UploadTask` returns the file path, so submitting the same file twice returns `SubmitOutcome::Duplicate`. The UI can show a "already queued" message. diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index fdf07b9..1e12f79 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -1,94 +1,119 @@ # IO Tracking & Backpressure -Taskmill combines two independent gating mechanisms — IO budget tracking and composable backpressure — to avoid saturating system resources. +## The problem -## IO tracking +Spawning 50 image resizes at once will saturate your disk. The OS I/O queue backs up, file writes stall, the UI freezes, and other applications slow to a crawl. The same happens with network-heavy tasks — bulk uploads can exhaust your bandwidth. -### Submission estimates +Taskmill prevents this with two complementary mechanisms: -Every `TaskSubmission` includes expected IO: +- **IO budget tracking** — each task declares how much IO it expects, and the scheduler checks whether there's headroom before dispatching. +- **Backpressure** — external signals (disk load, network usage, memory, anything you want) tell the scheduler to slow down. -```rust -use taskmill::IoBudget; - -let sub = TaskSubmission::new("scan") - .payload_raw(data) - .expected_io(IoBudget::disk(50_000, 10_000)); // caller's estimate -``` - -### Completion actuals - -Executors report actual IO via context methods during execution: - -```rust -ctx.record_read_bytes(48_312); -ctx.record_write_bytes(9_876); -``` - -Actual values are stored in `task_history` for learning. +Both are optional. If you don't configure resource monitoring, the scheduler dispatches purely by priority and concurrency limits. -### Network IO +## IO budgets: telling the scheduler what to expect -Tasks that perform network transfers can declare expected bandwidth: +Every task can declare its expected IO when submitted. This is your best estimate of how many bytes the task will read and write: ```rust use taskmill::IoBudget; +// Disk-heavy task: reads a 50 KB file, writes a 10 KB thumbnail +let sub = TaskSubmission::new("thumbnail") + .payload_json(&data) + .expected_io(IoBudget::disk(50_000, 10_000)); + +// Network-heavy task: uploads 50 MB let sub = TaskSubmission::new("upload") .payload_json(&upload_payload) - .expected_io(IoBudget::net(0, 50_000_000)); // 50 MB upload + .expected_io(IoBudget::net(0, 50_000_000)); ``` -Executors report actual network bytes during execution: +**What if you don't know the IO cost?** Start with a rough estimate. Taskmill stores actual IO from completed tasks, so you can query `avg_throughput()` to refine your estimates over time. If you don't provide an IO budget at all, the task is treated as having zero expected IO and won't be gated. + +### Reporting actual IO + +Executors report actual IO during execution. This data is stored in history for learning: ```rust +// Disk IO +ctx.record_read_bytes(48_312); +ctx.record_write_bytes(9_876); + +// Network IO ctx.record_net_rx_bytes(response_size); ctx.record_net_tx_bytes(uploaded_bytes); ``` -Network actuals are stored in `task_history` alongside disk IO. +### How the scheduler uses IO budgets -### IO budget gating +When resource monitoring is enabled, the scheduler checks IO headroom before dispatching each task: -When resource monitoring is enabled, the scheduler checks IO headroom before dispatching: +1. Read the current smoothed disk throughput from the resource monitor. +2. Sum the expected IO across all currently running tasks. +3. Compute a 2-second capacity window: `capacity = throughput * 2.0`. +4. If running IO + the candidate task's IO would exceed 80% of capacity (on either read or write), the task is deferred to the next cycle. -1. Query EWMA-smoothed disk throughput from the `ResourceReader`. -2. Sum expected IO across all running tasks. -3. Compute a 2-second capacity window: `capacity = bytes_per_sec * 2.0`. -4. If running IO + candidate IO would exceed 80% of capacity on either axis (read or write), the task is deferred. - -This prevents the scheduler from piling up IO-heavy tasks that would saturate the disk. +This prevents the scheduler from piling up IO-heavy tasks that would saturate the disk. The threshold is conservative — 80% leaves headroom for other applications and OS operations. ### Learning from history -Use store queries to refine future estimates: +Use store queries to see how your tasks actually perform and refine future estimates: ```rust let store = scheduler.store(); -// Average read/write bytes per second for a task type (from recent completions) -let (avg_read_bps, avg_write_bps) = store.avg_throughput("scan", 20).await?; +// Average throughput for a task type (from recent completions) +let (avg_read_bps, avg_write_bps) = store.avg_throughput("thumbnail", 20).await?; // Aggregate stats: count, avg duration, avg IO, failure rate -let stats = store.history_stats("scan").await?; +let stats = store.history_stats("thumbnail").await?; ``` ## Resource monitoring -### Built-in platform sampler +Resource monitoring gives the scheduler real-time visibility into system load. **For most applications, just add `.with_resource_monitoring()` to the builder:** + +```rust +let scheduler = Scheduler::builder() + .with_resource_monitoring() // uses the built-in platform sampler + .build() + .await?; +``` + +This starts a background loop that samples CPU, disk IO, and network throughput every second using the `sysinfo` crate (Linux, macOS, Windows). Readings are smoothed with an [EWMA](glossary.md) (exponentially weighted moving average) to avoid overreacting to momentary spikes — a brief disk flush won't suddenly block all tasks. + +### EWMA smoothing + +Raw resource samples are smoothed before the scheduler uses them: -Enabled by default via the `sysinfo-monitor` feature flag. Provides CPU, disk IO, and network throughput on Linux, macOS, and Windows. +``` +smoothed = alpha * raw + (1 - alpha) * previous +``` + +- **Higher alpha** (e.g., 0.5) = more responsive to recent changes. Good for bursty workloads where conditions change quickly. +- **Lower alpha** (e.g., 0.2) = smoother, more stable. Good for steady workloads where you don't want momentary spikes to affect scheduling. +- **Default alpha: 0.3** — a balanced middle ground. + +Configure smoothing: ```rust +use std::time::Duration; +use taskmill::SamplerConfig; + let scheduler = Scheduler::builder() - .with_resource_monitoring() // uses SysinfoSampler automatically + .with_resource_monitoring() + .sampler_config(SamplerConfig { + interval: Duration::from_millis(500), // sample faster + ewma_alpha: 0.5, // more responsive + }) .build() .await?; ``` -### Custom samplers +### Advanced: custom samplers -For containers, cgroups, or mobile platforms, provide your own `ResourceSampler`: +For containers, cgroups, or mobile platforms where `sysinfo` doesn't work, provide your own `ResourceSampler`: ```rust use taskmill::{ResourceSampler, ResourceSnapshot}; @@ -113,39 +138,19 @@ let scheduler = Scheduler::builder() .await?; ``` -### EWMA smoothing - -Raw samples are smoothed via a `SmoothedReader` background loop: +Disable the built-in sampler in `Cargo.toml`: -``` -smoothed = alpha * raw + (1 - alpha) * previous +```toml +taskmill = { version = "0.3", default-features = false } ``` -- Default alpha: 0.3 (configurable via `SamplerConfig`) -- Default sample interval: 1 second -- Readers access snapshots via `RwLock` (readers never block each other) +## Backpressure: external pressure signals -Configure smoothing: - -```rust -use std::time::Duration; -use taskmill::SamplerConfig; - -let scheduler = Scheduler::builder() - .with_resource_monitoring() - .sampler_config(SamplerConfig { - interval: Duration::from_millis(500), // sample faster - ewma_alpha: 0.5, // more responsive - }) - .build() - .await?; -``` - -## Backpressure +Sometimes you need to slow down for reasons beyond disk IO — an API is rate-limited, memory is tight, the laptop is on battery. Taskmill lets you plug in any number of custom pressure signals. ### Pressure sources -Implement the `PressureSource` trait to expose a `0.0..=1.0` signal from any external source: +Implement the `PressureSource` trait to expose a `0.0..=1.0` signal: ```rust use taskmill::PressureSource; @@ -163,9 +168,18 @@ impl PressureSource for MemoryPressure { } ``` -### Composite pressure +**Common pressure sources you might build:** -Multiple sources are aggregated via `CompositePressure`. The aggregate pressure is the **maximum** across all sources: +| Signal | What it measures | When to use | +|--------|-----------------|-------------| +| Memory usage | RSS or system memory | Apps that process large files in memory | +| API rate limits | Remaining quota / total quota | Upload services with provider limits | +| Battery level | Charge percentage (inverted) | Mobile or laptop apps | +| Queue depth | Pending tasks / capacity | Prevent unbounded queue growth | + +### Composing multiple sources + +Multiple sources are aggregated via `CompositePressure`. The aggregate pressure is the **maximum** across all sources — the system is as pressured as its most constrained resource: ```rust use taskmill::CompositePressure; @@ -186,36 +200,20 @@ let scheduler = Scheduler::builder() .await?; ``` -### Throttle policies - -`ThrottlePolicy` maps `(priority, pressure)` to dispatch decisions: - -```rust -use taskmill::{ThrottlePolicy, Priority}; - -// Default: BACKGROUND >50%, NORMAL >75%, HIGH/REALTIME never -let policy = ThrottlePolicy::default_three_tier(); - -// Custom thresholds -let policy = ThrottlePolicy::new(vec![ - (Priority::IDLE, 0.3), // throttle IDLE at 30% - (Priority::BACKGROUND, 0.6), // throttle BACKGROUND at 60% - (Priority::NORMAL, 0.8), // throttle NORMAL at 80% -]); -``` +The aggregate pressure feeds into the [throttle policy](priorities-and-preemption.md#throttle-behavior), which decides which priority tiers to defer. -### How gating works +### How dispatch gating works -The default `DispatchGate` combines both mechanisms. A task is dispatched only when **both** pass: +The scheduler combines both mechanisms. A task is dispatched only when **both** pass: -1. **Backpressure check** — `ThrottlePolicy::should_throttle(priority, pressure)` returns false. -2. **IO budget check** — `has_io_headroom()` confirms the task won't saturate disk throughput. +1. **Backpressure check** — the throttle policy says this priority tier is allowed at the current pressure level. +2. **IO budget check** — the system has enough IO headroom for this task. If either check fails, the task stays in the queue and is retried on the next poll cycle. -### Network bandwidth pressure +## Network bandwidth pressure -Taskmill includes a built-in `NetworkPressure` source that maps observed network throughput against a configurable bandwidth cap. Enable it via the builder: +For upload/download-heavy applications, taskmill includes a built-in `NetworkPressure` source. It maps observed network throughput against a configurable bandwidth cap: ```rust let scheduler = Scheduler::builder() @@ -225,11 +223,11 @@ let scheduler = Scheduler::builder() .await?; ``` -When observed RX+TX throughput approaches the cap, the pressure rises toward 1.0 and the `ThrottlePolicy` starts throttling lower-priority tasks. This is especially useful for upload/download workloads where you want to avoid saturating a network link. +When observed RX+TX throughput approaches the cap, pressure rises toward 1.0 and the throttle policy starts deferring lower-priority tasks. This is especially useful when you want to leave bandwidth for other applications or avoid saturating a network link. -### Diagnostics +## Diagnostics -The `SchedulerSnapshot` includes pressure readings for debugging: +The `SchedulerSnapshot` includes pressure readings for debugging — useful for understanding why tasks are being deferred: ```rust let snap = scheduler.snapshot().await?; diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index 9b2b398..54ca8b3 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -1,82 +1,41 @@ # Persistence & Recovery -Taskmill persists all task state to SQLite, ensuring work survives process restarts, crashes, and power loss. +Taskmill persists all task state to SQLite. Work survives process restarts, crashes, and power loss — no manual recovery needed. -## SQLite schema +## What survives a crash -Two tables manage the task lifecycle: +When your app starts up, taskmill automatically recovers: -### `tasks` — active queue - -Holds pending, running, and paused tasks. +- **All queued tasks** are still in the queue at their original priority. +- **Running tasks** are reset to pending and re-dispatched. The crash doesn't count as a retry. +- **Dedup keys stay occupied** — no duplicate submissions sneak in during recovery. +- **Retry counts are preserved** — a task that had retried twice before the crash still has two retries used. -| Column | Type | Description | -|--------|------|-------------| -| `id` | INTEGER PRIMARY KEY | Insertion-order ID | -| `task_type` | TEXT NOT NULL | Executor lookup name | -| `key` | TEXT NOT NULL UNIQUE | SHA-256 dedup key | -| `label` | TEXT NOT NULL | Human-readable display name (original dedup key or task type) | -| `priority` | INTEGER NOT NULL | 0–255 (lower = higher priority) | -| `status` | TEXT DEFAULT 'pending' | `pending`, `running`, `paused`, or `waiting` | -| `payload` | BLOB | Opaque task data (max 1 MiB) | -| `expected_read_bytes` | INTEGER | Estimated disk read IO (part of `IoBudget`) | -| `expected_write_bytes` | INTEGER | Estimated disk write IO (part of `IoBudget`) | -| `expected_net_rx_bytes` | INTEGER | Estimated network RX (part of `IoBudget`) | -| `expected_net_tx_bytes` | INTEGER | Estimated network TX (part of `IoBudget`) | -| `parent_id` | INTEGER | Parent task ID for child tasks (NULL for top-level) | -| `fail_fast` | INTEGER DEFAULT 1 | Whether child failure cancels siblings and fails parent | -| `retry_count` | INTEGER DEFAULT 0 | Number of retries so far | -| `last_error` | TEXT | Most recent error message | -| `created_at` | TEXT | ISO 8601 timestamp | -| `started_at` | TEXT | Set when dispatched, cleared on pause | +The guarantee is **at-least-once execution**: a task might run partially, crash, and re-run from the beginning. Design your executors to be idempotent (or to check for partial work) so re-execution is safe. -**Index:** `idx_tasks_pending(status, priority ASC, id ASC) WHERE status = 'pending'` — partial index for efficient priority-ordered pop. +## Deduplication -### `task_history` — completed and failed tasks +A common problem: your app submits "upload photo.jpg" twice because the user clicked a button while a sync was already running. Without dedup, you'd upload the same file twice. -| Column | Type | Description | -|--------|------|-------------| -| *(all columns from `tasks`, including `label`, `parent_id`, `fail_fast`)* | | | -| `actual_read_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | -| `actual_write_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | -| `actual_net_rx_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | -| `actual_net_tx_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | -| `completed_at` | TEXT | ISO 8601 timestamp | -| `duration_ms` | INTEGER | Wall-clock duration | -| `status` | TEXT | `completed` or `failed` | +Taskmill prevents this automatically. Every task gets a unique key derived from its type and payload. If you submit a task with a key that's already in the queue, the duplicate is silently ignored. -**Index:** `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type history queries and throughput calculations. +### How keys work -## Crash recovery +The key is `SHA-256(task_type + ":" + payload)`. Two tasks with the same type and payload always get the same key. -On startup, `TaskStore::open()` runs a recovery query: +You can also provide explicit keys when the default isn't right — for example, when two payloads represent the same logical work (different timestamps but same file path): -```sql -UPDATE tasks SET status = 'pending', started_at = NULL WHERE status = 'running' +```rust +let sub = TaskSubmission::new("upload") + .payload_json(&data) + .key("/photos/img.jpg"); // dedup on file path, not full payload ``` -This resets any tasks that were mid-execution when the process died. The behavior: - -- Tasks return to the priority queue at their original priority -- `retry_count` is preserved (crash doesn't count as a retry) -- Dedup keys remain occupied (no duplicate submissions during recovery) -- Tasks are re-dispatched in priority order on the next scheduler cycle - -## Deduplication - -### How keys are generated - -Every task gets a SHA-256 key: `SHA-256(task_type + ":" + (explicit_key OR payload))`. +### Key lifecycle -- **Implicit key** — if no key is provided, the payload bytes are used. Tasks with the same type and payload get the same key. -- **Explicit key** — use the `.key()` builder method to control deduplication yourself. Useful when two payloads represent the same logical work (e.g., different timestamps but same file path). The explicit key is also stored as the display `label`. -- **Type scoping** — the task type is always part of the hash, so `("resize", payload)` and `("compress", payload)` never collide. +A key is "occupied" while the task is active — pending, running, paused, waiting, or retrying. When the task completes or permanently fails (moves to history), the key is freed and the same work can be submitted again. -### Lifecycle - -A key is "occupied" while the task is in the `tasks` table (pending, running, paused, waiting, or retrying). When the task moves to `task_history` (completed or failed), the key is freed and can be resubmitted. - -### Submission behavior +### Submission outcomes ```rust use taskmill::SubmitOutcome; @@ -99,6 +58,8 @@ let outcomes = scheduler.submit_batch(&[sub1, sub2, sub3]).await?; ### Looking up tasks by dedup key +Check whether a task has been submitted (or has already completed): + ```rust use taskmill::TaskLookup; @@ -110,13 +71,13 @@ match lookup { } ``` -## History retention +## History and retention -Without pruning, `task_history` grows without bound. Configure automatic retention: +Completed and failed tasks are moved to a history table. Without pruning, this table grows without bound. Configure automatic retention to keep it manageable. -### By count +### Recommended settings -Keep the N most recent records: +For most desktop apps, keeping the last 10,000 records is a good default: ```rust use taskmill::{StoreConfig, RetentionPolicy}; @@ -146,7 +107,7 @@ let scheduler = Scheduler::builder() ### Pruning frequency -Pruning is amortized — it runs every N task completions (default 100, configurable via `StoreConfig::prune_interval`). Pruning errors are logged but don't affect the completed task. +Pruning is amortized — it runs every N task completions (default 100, configurable via `StoreConfig::prune_interval`). This keeps per-task overhead low. Pruning errors are logged but don't affect the completed task. ### Manual pruning @@ -156,13 +117,65 @@ let deleted = store.prune_history_by_count(5_000).await?; let deleted = store.prune_history_by_age(30).await?; ``` -## WAL mode +## Testing with in-memory stores + +For tests, use an in-memory database that doesn't touch the filesystem: + +```rust +let store = TaskStore::open_memory().await?; +``` + +## SQLite details + +You normally don't need to know the schema, but it's documented here for debugging and advanced use. + +### `tasks` — active queue + +| Column | Type | Description | +|--------|------|-------------| +| `id` | INTEGER PRIMARY KEY | Insertion-order ID | +| `task_type` | TEXT NOT NULL | Executor lookup name | +| `key` | TEXT NOT NULL UNIQUE | SHA-256 dedup key | +| `label` | TEXT NOT NULL | Human-readable display name | +| `priority` | INTEGER NOT NULL | 0–255 (lower = higher priority) | +| `status` | TEXT DEFAULT 'pending' | `pending`, `running`, `paused`, or `waiting` | +| `payload` | BLOB | Opaque task data (max 1 MiB) | +| `expected_read_bytes` | INTEGER | Estimated disk read IO | +| `expected_write_bytes` | INTEGER | Estimated disk write IO | +| `expected_net_rx_bytes` | INTEGER | Estimated network RX | +| `expected_net_tx_bytes` | INTEGER | Estimated network TX | +| `parent_id` | INTEGER | Parent task ID (NULL for top-level) | +| `fail_fast` | INTEGER DEFAULT 1 | Whether child failure cancels siblings | +| `retry_count` | INTEGER DEFAULT 0 | Retries so far | +| `last_error` | TEXT | Most recent error message | +| `created_at` | TEXT | ISO 8601 timestamp | +| `started_at` | TEXT | Set when dispatched, cleared on pause | + +**Index:** `idx_tasks_pending(status, priority ASC, id ASC) WHERE status = 'pending'` — partial index for fast priority-ordered dispatch. + +### `task_history` — completed and failed tasks + +All columns from `tasks`, plus: + +| Column | Type | Description | +|--------|------|-------------| +| `actual_read_bytes` | INTEGER | Reported by executor | +| `actual_write_bytes` | INTEGER | Reported by executor | +| `actual_net_rx_bytes` | INTEGER | Reported by executor | +| `actual_net_tx_bytes` | INTEGER | Reported by executor | +| `completed_at` | TEXT | ISO 8601 timestamp | +| `duration_ms` | INTEGER | Wall-clock duration | +| `status` | TEXT | `completed` or `failed` | + +**Index:** `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type history queries and throughput calculations. + +### WAL mode -The database uses SQLite WAL (Write-Ahead Logging) for concurrent reads with serialized writes. This means multiple readers can query task status while the scheduler is dispatching work. +The database uses SQLite WAL (Write-Ahead Logging) for concurrent reads with serialized writes. Multiple Tauri commands can query task status simultaneously while the scheduler is dispatching work. -## Connection pooling +### Connection pooling -The default pool size is 16 connections. Configure via `StoreConfig::max_connections`: +The default pool size is 16 connections. Increase via `StoreConfig::max_connections` if you have many concurrent readers: ```rust let scheduler = Scheduler::builder() @@ -173,11 +186,3 @@ let scheduler = Scheduler::builder() .build() .await?; ``` - -## In-memory store for testing - -For tests, use an in-memory database that doesn't touch the filesystem: - -```rust -let store = TaskStore::open_memory().await?; -``` diff --git a/docs/priorities-and-preemption.md b/docs/priorities-and-preemption.md index 557c66e..483854d 100644 --- a/docs/priorities-and-preemption.md +++ b/docs/priorities-and-preemption.md @@ -1,18 +1,24 @@ # Priorities & Preemption +## When you need this + +Not all work is equally important. A user clicking "upload this file" should not wait behind 500 background thumbnail jobs. A re-index of the entire library should yield to normal operations if the system gets busy. + +Taskmill's priority system lets you express these relationships, and preemption enforces them at runtime — pausing lower-priority work so urgent tasks can run immediately. + ## Priority levels -Taskmill uses a 256-level priority scale where lower values mean higher priority. Five named constants are provided: +Taskmill uses a 256-level priority scale where lower values mean higher priority. Five named constants cover the most common scenarios: -| Constant | Value | Behavior | -|--------------|-------|----------| -| `REALTIME` | 0 | Never throttled. Triggers preemption of lower-priority work. | -| `HIGH` | 64 | Throttled only under extreme pressure (>75%). | -| `NORMAL` | 128 | Standard operations. Throttled at >75% pressure. | -| `BACKGROUND` | 192 | Deferred under moderate load. Throttled at >50% pressure. | -| `IDLE` | 255 | Runs only when the system is otherwise idle. Throttled at >50% pressure. | +| Constant | Value | When to use | +|----------|-------|-------------| +| `REALTIME` | 0 | User-blocking operations that can't wait. Rare — most apps never need this. | +| `HIGH` | 64 | User-initiated actions: uploading a file they clicked, exporting a document. | +| `NORMAL` | 128 | App-initiated work: generating thumbnails, syncing metadata. | +| `BACKGROUND` | 192 | Maintenance tasks: re-indexing, cleanup, cache warming. | +| `IDLE` | 255 | Truly optional work: analytics, prefetching, speculative processing. | -Custom values between tiers are supported: +Custom values between tiers are supported for fine-grained control: ```rust use taskmill::Priority; @@ -20,24 +26,40 @@ use taskmill::Priority; let custom = Priority::new(100); // between HIGH and NORMAL ``` -## Queue ordering +### Choosing priorities for your tasks -Tasks are popped from the queue in strict priority order (`ORDER BY priority ASC, id ASC`). Within the same priority tier, tasks are dispatched in insertion order (FIFO). +A good rule of thumb: **if the user is waiting for it, use `HIGH`. If the app initiated it, use `NORMAL`. If it can wait until later, use `BACKGROUND`.** -A partial index on `(status, priority, id) WHERE status = 'pending'` keeps pop operations fast regardless of history size. +Most applications only need `HIGH`, `NORMAL`, and `BACKGROUND`. Reserve `REALTIME` for operations where any delay is unacceptable — it triggers [preemption](#preemption) by default, which pauses all other work. + +## Queue ordering + +Tasks are dispatched in strict priority order. Within the same priority tier, tasks are dispatched in insertion order (FIFO) — the task submitted first runs first. ## Preemption -When a task with priority at or above `preempt_priority` (default: `REALTIME`) is submitted, the scheduler preempts lower-priority running work: +When a task at or above `preempt_priority` (default: `REALTIME`) is submitted, the scheduler pauses all lower-priority running work to make room: -1. **Cancel tokens** — the `CancellationToken` of every active task with lower priority (higher numeric value) is triggered. -2. **Pause in store** — preempted tasks are moved to `paused` status with `started_at` cleared. +1. **Cancel tokens** — the `CancellationToken` of every active task with lower priority is triggered. +2. **Pause in store** — preempted tasks are moved to `paused` status. 3. **Emit events** — a `SchedulerEvent::Preempted` is emitted for each affected task. 4. **Resume later** — paused tasks are only re-dispatched when no active preemptors remain, preventing thrashing between competing priority tiers. +### When to use preemption + +Preemption is powerful but rarely needed. The default threshold (`REALTIME`) means only the highest-priority tasks trigger it. **Most apps should leave the default.** If you find yourself frequently preempting, consider whether your task priorities are spread too wide. + +Use preemption when: +- A "cancel everything and do this NOW" escape hatch is needed (e.g., emergency sync) +- User-initiated work must always run immediately, even if the system is fully loaded + +Don't use preemption when: +- You just want important tasks to run first — priority ordering handles that without preemption +- Tasks are short enough that waiting for a slot is acceptable + ### Handling preemption in executors -Executors should check for cancellation at natural yield points: +Executors should check for cancellation at natural yield points — between chunks of work, between loop iterations, etc. This lets the executor clean up gracefully. ```rust impl TaskExecutor for MyExecutor { @@ -60,9 +82,9 @@ impl TaskExecutor for MyExecutor { } ``` -Returning a retryable error on preemption is optional — the scheduler handles pausing regardless. But it gives the executor a chance to clean up. +Returning a retryable error on preemption is optional — the scheduler handles pausing regardless. But it gives the executor a chance to clean up partial work. -### Configuring preemption threshold +### Configuring the preemption threshold ```rust let scheduler = Scheduler::builder() @@ -71,22 +93,54 @@ let scheduler = Scheduler::builder() .await?; ``` +## Task groups + +When multiple tasks share a limited resource (e.g., an API with rate limits, or a specific S3 bucket), you can limit how many run concurrently using task groups. + +Assign tasks to a group via `.group()` on `TaskSubmission` or `TypedTask::group_key()`: + +```rust +let sub = TaskSubmission::new("upload") + .payload_json(&data) + .group("bucket-prod"); // at most N uploads to this bucket at once +``` + +Configure limits at build time or adjust at runtime: + +```rust +let scheduler = Scheduler::builder() + .group_concurrency("bucket-prod", 3) // max 3 concurrent for this group + .default_group_concurrency(5) // default for any group not explicitly configured + .build() + .await?; + +// Adjust at runtime +scheduler.set_group_limit("bucket-prod", 5).await; +scheduler.remove_group_limit("bucket-prod").await; +``` + +Group limits are checked *in addition to* `max_concurrency` — a task must pass both the global and group gate to be dispatched. + ## Throttle behavior -Throttling is independent of preemption. It controls whether a pending task is *dispatched*, not whether a running task is *interrupted*. +Throttling is independent of preemption. While preemption *interrupts* running tasks, throttling controls whether pending tasks are *dispatched in the first place* based on current system [pressure](glossary.md#backpressure). The default three-tier `ThrottlePolicy`: | Priority tier | Throttled when pressure exceeds | |---------------|-------------------------------| -| `BACKGROUND` (192+) | 50% | +| `BACKGROUND` / `IDLE` (192+) | 50% | | `NORMAL` (128+) | 75% | | `HIGH` / `REALTIME` | Never | -Pressure is an aggregate `0.0..=1.0` value from all registered `PressureSource` implementations (see [IO & Backpressure](io-and-backpressure.md)). +This means: when the system is moderately busy (>50% pressure), background tasks wait. When it's heavily loaded (>75%), even normal tasks wait. High-priority and realtime tasks always run. + +Pressure is an aggregate `0.0..=1.0` value from all registered [pressure sources](io-and-backpressure.md#pressure-sources). ### Custom throttle policies +If the defaults don't fit your workload, define your own thresholds: + ```rust use taskmill::{ThrottlePolicy, Priority}; @@ -98,4 +152,21 @@ let policy = ThrottlePolicy::new(vec![ ]); ``` -Thresholds are evaluated from lowest priority (highest numeric value) first. A task is throttled if its priority is at or below the threshold tier and pressure exceeds the limit. +Thresholds are evaluated from lowest priority first. A task is throttled if its priority is at or below the threshold tier and pressure exceeds the limit. + +## Retries + +When an executor returns a retryable error, the task is automatically requeued at the same priority: + +```rust +// In your executor: +return Err(TaskError::retryable("network timeout, will retry")); + +// Non-retryable (permanent) failure: +return Err(TaskError::permanent("invalid payload, giving up")); +``` + +- **Retry limit** — `max_retries` (default 3) controls how many times a task can be retried before it permanently fails. +- **Priority preserved** — retried tasks keep their original priority; they aren't demoted. +- **Dedup key preserved** — the key stays occupied during retries, preventing duplicate submissions while the task is still being worked on. +- **Crash doesn't count** — if the process crashes while a task is running, the crash recovery doesn't increment `retry_count`. diff --git a/docs/progress-and-events.md b/docs/progress-and-events.md new file mode 100644 index 0000000..158cdf8 --- /dev/null +++ b/docs/progress-and-events.md @@ -0,0 +1,155 @@ +# Progress & Events + +Taskmill provides real-time progress tracking and lifecycle events so your UI always reflects what's happening — tasks starting, progressing, completing, failing, or being preempted. + +## Reporting progress from executors + +Executors report progress via `ctx.progress()`. This emits events that your UI can subscribe to for real-time updates. + +```rust +impl TaskExecutor for MyExecutor { + async fn execute<'a>( + &'a self, ctx: &'a TaskContext, + ) -> Result<(), TaskError> { + let items = get_work_items(); + + for (i, item) in items.iter().enumerate() { + process(item).await; + + // Percentage-based (0.0 to 1.0) + ctx.progress().report( + (i + 1) as f32 / items.len() as f32, + Some(format!("processed {}/{}", i + 1, items.len())), + ); + } + + Ok(()) + } +} +``` + +For count-based progress, use the convenience method: + +```rust +ctx.progress().report_fraction(processed, total, Some("importing".into())); +// Automatically computes: processed as f32 / total as f32 +``` + +## Automatic progress extrapolation + +Even tasks that don't explicitly report progress show movement in your UI. The scheduler extrapolates progress from historical data: + +1. Look up the average duration for this task type from completed tasks in history. +2. Calculate how long the current task has been running. +3. Estimate progress as elapsed time / average duration. +4. If the executor has reported partial progress, blend historical and current rates for a more accurate estimate. +5. Cap at 99% — extrapolation never reaches 100%, so a task only shows "complete" when it actually finishes. + +This means your progress bar always moves, even for tasks that don't call `report()`. + +## Lifecycle events + +All scheduler state changes are broadcast as `SchedulerEvent` variants. Subscribe via `scheduler.subscribe()`: + +```rust +let mut events = scheduler.subscribe(); +tokio::spawn(async move { + while let Ok(event) = events.recv().await { + match &event { + SchedulerEvent::Progress { header, percent, message } => { + update_progress_bar(header.task_id, *percent, message.as_deref()); + } + SchedulerEvent::Completed(header) => { + mark_done(header.task_id); + } + SchedulerEvent::Failed { header, error, will_retry } => { + if !will_retry { + show_error(header.task_id, error); + } + } + _ => {} + } + } +}); +``` + +### Event reference + +| Event | When it fires | +|-------|---------------| +| `Dispatched(TaskEventHeader)` | Task picked from queue and executor spawned | +| `Completed(TaskEventHeader)` | Task finished successfully | +| `Failed { header, error, will_retry }` | Task failed — `will_retry` tells you if it's being requeued | +| `Preempted(TaskEventHeader)` | Task paused for higher-priority work | +| `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` | +| `Progress { header, percent, message }` | Progress update from executor | +| `Waiting { task_id, children_count }` | Parent task waiting for children to complete | +| `Paused` | Scheduler globally paused via `pause_all()` | +| `Resumed` | Scheduler resumed via `resume_all()` | + +Task-specific events share a `TaskEventHeader` with `task_id`, `task_type`, `key`, and `label`. Use `event.header()` to access it generically. + +### Which events to listen for + +| If you're building... | Listen to | +|-----------------------|-----------| +| A progress bar | `Progress`, `Completed`, `Failed` | +| An activity log | All events | +| Error alerting | `Failed` where `will_retry` is false | +| A "pause/resume" button | `Paused`, `Resumed` | +| Upload status indicators | `Dispatched`, `Progress`, `Completed`, `Failed`, `Preempted` | + +## Querying progress + +### All running tasks + +```rust +let progress = scheduler.estimated_progress().await; +for p in &progress { + println!("{} ({}): {:.0}%", p.header.task_type, p.header.key, p.percent * 100.0); + // p.reported_percent — last executor-reported value (if any) + // p.extrapolated_percent — throughput-based estimate (if any) + // p.percent — best available: reported if present, else extrapolated +} +``` + +## Dashboard snapshots + +For UI dashboards, `scheduler.snapshot()` gathers all scheduler state in a single call — running tasks, queue depth, progress, pressure, and configuration: + +```rust +let snap = scheduler.snapshot().await?; +// snap.running — Vec of currently executing tasks +// snap.pending_count — number of tasks waiting to dispatch +// snap.paused_count — number of preempted tasks +// snap.progress — Vec for every running task +// snap.pressure — aggregate backpressure (0.0–1.0) +// snap.pressure_breakdown — per-source diagnostics: Vec<(String, f32)> +// snap.max_concurrency — current concurrency limit +// snap.is_paused — whether the scheduler is globally paused +``` + +## Tauri event bridging + +All events derive `Serialize`, so they bridge directly to your frontend via Tauri IPC: + +```rust +let mut events = scheduler.subscribe(); +let handle = app_handle.clone(); +tokio::spawn(async move { + while let Ok(event) = events.recv().await { + handle.emit("taskmill-event", &event).unwrap(); + } +}); +``` + +Return snapshots from Tauri commands for polling-based UIs: + +```rust +#[tauri::command] +async fn scheduler_status( + scheduler: tauri::State<'_, Scheduler>, +) -> Result { + scheduler.snapshot().await +} +``` diff --git a/docs/progress-reporting.md b/docs/progress-reporting.md deleted file mode 100644 index 54fdb61..0000000 --- a/docs/progress-reporting.md +++ /dev/null @@ -1,166 +0,0 @@ -# Progress Reporting - -Taskmill provides real-time progress tracking for running tasks, combining executor-reported values with throughput-based extrapolation. - -## Reporting from executors - -Executors receive a `ProgressReporter` via `ctx.progress()`: - -```rust -impl TaskExecutor for MyExecutor { - async fn execute<'a>( - &'a self, ctx: &'a TaskContext, - ) -> Result<(), TaskError> { - let items = get_work_items(); - - for (i, item) in items.iter().enumerate() { - process(item).await; - - // Percentage-based (0.0 to 1.0) - ctx.progress().report( - (i + 1) as f32 / items.len() as f32, - Some(format!("processed {}/{}", i + 1, items.len())), - ); - } - - Ok(()) - } -} -``` - -### Fraction-based reporting - -For count-based progress: - -```rust -ctx.progress().report_fraction(processed, total, Some("importing".into())); -// Automatically computes: processed as f32 / total as f32 -``` - -## Progress events - -Every `report()` call emits a `SchedulerEvent::Progress`: - -```rust -SchedulerEvent::Progress { - header: TaskEventHeader { - task_id: 42, - task_type: "resize".into(), - key: "abc123".into(), - label: "my-image.jpg".into(), - }, - percent: 0.5, - message: Some("resizing".into()), -} -``` - -Subscribe to events for real-time UI updates: - -```rust -let mut events = scheduler.subscribe(); -tokio::spawn(async move { - while let Ok(event) = events.recv().await { - if let SchedulerEvent::Progress { header, percent, message } = event { - update_ui(header.task_id, percent, message); - } - } -}); -``` - -## Throughput-based extrapolation - -For tasks that don't report progress (or between reports), the scheduler extrapolates based on historical data: - -1. Fetch `history_stats(task_type)` to get the average duration for this task type. -2. Compute throughput: `1.0 / avg_duration_ms` (completion fraction per millisecond). -3. Multiply by elapsed time since `started_at` to get an extrapolated percentage. -4. If the executor has reported partial progress, blend the historical throughput with the current rate. -5. Cap at 99% — extrapolation never reaches 100% to avoid false "complete" signals. - -This means even tasks with no explicit progress reporting show movement in UI dashboards. - -## Querying progress - -### All running tasks - -```rust -let progress = scheduler.estimated_progress().await; -for p in &progress { - println!("{} ({}): {:.0}%", p.header.task_type, p.header.key, p.percent * 100.0); - // p.reported_percent — last executor-reported value (if any) - // p.extrapolated_percent — throughput-based estimate (if any) - // p.percent — best available: reported if present, else extrapolated -} -``` - -### Via snapshot - -The `SchedulerSnapshot` includes progress for all running tasks: - -```rust -let snap = scheduler.snapshot().await?; -for p in &snap.progress { - println!("{}: {:.0}%", p.header.key, p.percent * 100.0); -} -``` - -## Lifecycle events - -All scheduler state changes are broadcast as `SchedulerEvent` variants: - -| Event | When | -|-------|------| -| `Dispatched(TaskEventHeader)` | Task popped from queue and executor spawned | -| `Completed(TaskEventHeader)` | Task finished successfully | -| `Failed { header, error, will_retry }` | Task failed (includes whether it will be retried) | -| `Preempted(TaskEventHeader)` | Task paused for higher-priority work | -| `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` | -| `Progress { header, percent, message }` | Progress update from executor | -| `Waiting { task_id, children_count }` | Parent task entered waiting state after spawning children | -| `Paused` | Scheduler globally paused via `pause_all()` | -| `Resumed` | Scheduler resumed via `resume_all()` | - -Task-specific variants share a `TaskEventHeader` struct with `task_id`, `task_type`, `key`, and `label`. Use `event.header()` to access it generically. - -### Tauri bridge - -Bridge events to the frontend in a Tauri app: - -```rust -let mut events = scheduler.subscribe(); -let handle = app_handle.clone(); -tokio::spawn(async move { - while let Ok(event) = events.recv().await { - handle.emit("taskmill-event", &event).unwrap(); - } -}); -``` - -All events derive `Serialize`, so they can be sent directly over Tauri IPC. - -## Dashboard snapshot - -For UI dashboards, `Scheduler::snapshot()` gathers all scheduler state in a single call: - -```rust -let snap = scheduler.snapshot().await?; -// snap.running — Vec of currently executing tasks -// snap.pending_count — number of tasks waiting to dispatch -// snap.paused_count — number of preempted tasks -// snap.progress — Vec for every running task -// snap.pressure — aggregate backpressure (0.0–1.0) -// snap.pressure_breakdown — per-source diagnostics: Vec<(String, f32)> -// snap.max_concurrency — current concurrency limit -// snap.is_paused — whether the scheduler is globally paused -``` - -Return directly from a Tauri command: - -```rust -#[tauri::command] -async fn scheduler_status( - scheduler: tauri::State<'_, Scheduler>, -) -> Result { - scheduler.snapshot().await -} -``` diff --git a/docs/query-apis.md b/docs/query-apis.md index 878dc9c..cc54fe2 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -1,6 +1,14 @@ # Query APIs -All queries are available on `TaskStore`, accessed via `scheduler.store()`. +Use these queries to build dashboards, debug stuck tasks, and gather analytics about task performance. All queries are available on `TaskStore`, accessed via `scheduler.store()`. + +## Common patterns + +**Dashboard** — for most UIs, `scheduler.snapshot()` is all you need. It returns running tasks, queue depth, progress, and pressure in a single call. Use store queries below for custom views. + +**Debugging** — use `task_by_id()` to inspect a specific task, or `failed_tasks()` to see recent failures and their error messages. + +**Analytics** — use `history_stats()` for per-type aggregates (average duration, failure rate) and `avg_throughput()` to calibrate IO budgets. ## Active task queries @@ -15,7 +23,7 @@ All queries are available on `TaskStore`, accessed via `scheduler.store()`. | `paused_count()` | `i64` | Count of paused tasks. | | `task_by_id(id)` | `Option` | Look up an active task by row ID. | | `task_by_key(key)` | `Option` | Look up an active task by dedup key. | -| `running_io_totals()` | `(i64, i64)` | Sum of `(expected_io.disk_read, expected_io.disk_write)` across running tasks. | +| `running_io_totals()` | `(i64, i64)` | Sum of expected disk read and write bytes across running tasks. Useful for comparing against system capacity. | ## History queries @@ -24,14 +32,14 @@ All queries are available on `TaskStore`, accessed via `scheduler.store()`. | `history(limit, offset)` | `Vec` | Paginated history, newest first. | | `history_by_type(task_type, limit)` | `Vec` | History filtered by task type. | | `history_by_key(key)` | `Vec` | All past runs matching a dedup key. | -| `failed_tasks(limit)` | `Vec` | Recent failures. | +| `failed_tasks(limit)` | `Vec` | Recent failures with error messages. | ## Aggregate queries | Method | Returns | Description | |--------|---------|-------------| -| `history_stats(task_type)` | `TypeStats` | Aggregate stats: count, avg duration, avg IO, failure rate. | -| `avg_throughput(task_type, limit)` | `(f64, f64)` | Average `(read_bytes/sec, write_bytes/sec)` from recent completions. | +| `history_stats(task_type)` | `TypeStats` | Aggregate stats for a task type. | +| `avg_throughput(task_type, limit)` | `(f64, f64)` | Average read/write bytes per second from recent completions. Use this to calibrate IO budgets. | ### TypeStats fields @@ -41,11 +49,11 @@ All queries are available on `TaskStore`, accessed via `scheduler.store()`. | `avg_duration_ms` | `f64` | Average wall-clock duration. | | `avg_read_bytes` | `f64` | Average actual read bytes. | | `avg_write_bytes` | `f64` | Average actual write bytes. | -| `failure_rate` | `f64` | Fraction of tasks that failed (0.0–1.0). | +| `failure_rate` | `f64` | Fraction of tasks that failed (0.0–1.0). A rate above 0.1 may indicate a systemic issue. | ## Unified lookup -Search both active and history tables by dedup key: +Search both active and history tables by dedup key — useful for checking whether a task has been submitted or has already completed: ```rust use taskmill::TaskLookup; @@ -77,8 +85,8 @@ let lookup = scheduler.lookup_typed(&ResizeTask { | Method | Returns | Description | |--------|---------|-------------| -| `prune_history_by_count(keep)` | `u64` | Delete all but the N most recent history records. Returns count deleted. | -| `prune_history_by_age(days)` | `u64` | Delete history records older than N days. Returns count deleted. | +| `prune_history_by_count(keep)` | `u64` | Delete all but the N most recent history records. | +| `prune_history_by_age(days)` | `u64` | Delete history records older than N days. | ## Usage example diff --git a/docs/quick-start.md b/docs/quick-start.md index 1fc7b59..71c3ad3 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -6,24 +6,26 @@ Add taskmill to your `Cargo.toml`: ```toml [dependencies] -taskmill = { path = "crates/taskmill" } +taskmill = "0.3" ``` -To disable platform resource monitoring (e.g., for mobile targets): +To disable platform resource monitoring (e.g., for mobile targets or custom samplers): ```toml [dependencies] -taskmill = { path = "crates/taskmill", default-features = false } +taskmill = { version = "0.3", default-features = false } ``` ## Implement an executor -Each task type needs a `TaskExecutor` implementation. The executor receives a `TaskContext` with accessor methods: +Every task type needs code that knows how to do the work. You provide this by implementing the `TaskExecutor` trait. The scheduler calls your executor whenever a task of that type is dispatched. -- `record()` — the full `TaskRecord` with payload (up to 1 MiB), priority, retry count, etc. -- `token()` — a `CancellationToken` for preemption support -- `progress()` — a `ProgressReporter` for reporting progress back to the scheduler -- `state::()` — shared application state (if registered via `.app_state()` or `register_state()`) +Your executor receives a `TaskContext` with everything it needs: + +- `record()` — the full task record including payload, priority, and retry count +- `token()` — a cancellation token for responding to preemption (see [Priorities & Preemption](priorities-and-preemption.md#handling-preemption-in-executors)) +- `progress()` — a reporter for sending progress updates to the UI (see [Progress & Events](progress-and-events.md)) +- `state::()` — shared application state you registered at build time ```rust use std::sync::Arc; @@ -59,6 +61,8 @@ impl TaskExecutor for ImageResizer { ## Build and run the scheduler +The builder wires everything together: it opens the SQLite database, registers your executors, and optionally starts resource monitoring. + ```rust use std::sync::Arc; use std::time::Duration; @@ -111,9 +115,16 @@ async fn main() { } ``` -## Using typed tasks +### What just happened? + +1. The builder opened `tasks.db` (creating it if needed), ran migrations, and recovered any tasks left running from a previous crash. +2. `submit()` inserted a task into SQLite with a dedup key derived from the payload. If you call `submit()` again with the same payload, it returns `Duplicate` instead of creating a second task. +3. `run()` started the dispatch loop. On each cycle, the scheduler picks the highest-priority pending task, checks whether the system has IO headroom, and if so, spawns your executor in a new tokio task. +4. When the executor finishes, the task moves to history and a `Completed` event is broadcast. -For stronger type safety, implement the `TypedTask` trait: +## Typed tasks + +For stronger compile-time guarantees, implement the `TypedTask` trait instead of using stringly-typed `TaskSubmission`. This keeps the task type name, priority, and IO budget co-located with the payload struct. ```rust use serde::{Serialize, Deserialize}; @@ -142,38 +153,69 @@ scheduler.submit_typed(&ResizeTask { let task: ResizeTask = ctx.payload()?; ``` -## Manual wiring +## Child tasks -For full control over components, use `Scheduler::new()` directly: +Some work is naturally hierarchical — a multipart upload needs to upload individual parts, then call `CompleteMultipartUpload`. Taskmill supports this with child tasks and two-phase execution. + +Spawn children from within an executor using `ctx.spawn_child()`. The parent automatically enters a `waiting` state until all children complete, then `finalize()` is called on the parent executor. ```rust -use std::sync::Arc; -use taskmill::{ - CompositePressure, Scheduler, SchedulerConfig, - TaskStore, ThrottlePolicy, -}; -use taskmill::registry::TaskTypeRegistry; +impl TaskExecutor for MultipartUploader { + async fn execute<'a>( + &'a self, ctx: &'a TaskContext, + ) -> Result<(), TaskError> { + let parts = split_into_parts(&ctx.record().payload); + for part in parts { + ctx.spawn_child( + TaskSubmission::new("upload-part") + .payload_json(&part) + .expected_io(IoBudget::net(0, part.size)) + ).await?; + } + Ok(()) // parent enters 'waiting' state + } -let store = TaskStore::open("tasks.db").await.unwrap(); + async fn finalize<'a>( + &'a self, ctx: &'a TaskContext, + ) -> Result<(), TaskError> { + // Called after all children complete + complete_multipart_upload(ctx).await + } +} +``` -let mut registry = TaskTypeRegistry::new(); -registry.register("resize", Arc::new(ImageResizer)); +By default, if any child fails, its siblings are cancelled and the parent fails immediately (fail-fast). Disable this per-submission with `.fail_fast(false)`. -let pressure = CompositePressure::new(); -let policy = ThrottlePolicy::default_three_tier(); +## Sharing the scheduler -let scheduler = Scheduler::new( - store, - SchedulerConfig::default(), - Arc::new(registry), - pressure, - policy, -); +A single `Scheduler` is `Clone` (via `Arc`) and can be shared across your entire application. Multiple state types can coexist — each is keyed by its concrete `TypeId`. + +```rust +use std::sync::Arc; +use taskmill::Scheduler; + +// The host app builds the scheduler and registers its own executors. +let scheduler = Scheduler::builder() + .store_path("app.db") + .executor("thumbnail", Arc::new(ThumbnailGenerator)) + .app_state(MyAppServices { /* ... */ }) + .max_concurrency(4) + .build() + .await + .unwrap(); + +// A library can inject its own state after build. +scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await; + +// Both the host and the library submit tasks to the same queue. +// The host manages the run loop. +let token = CancellationToken::new(); +scheduler.run(token).await; ``` ## Tauri integration -Taskmill is designed for Tauri. A typical setup: +Taskmill is designed for Tauri. The `Scheduler` drops directly into Tauri state, and all events are serializable for IPC. ```rust use tauri::Manager; @@ -198,3 +240,46 @@ fn setup_events(app: &tauri::App, scheduler: &Scheduler) { }); } ``` + +For a complete walkthrough, see the [Tauri Upload Queue guide](guides/tauri-upload-queue.md). + +## Manual wiring + +If you need full control over individual components (custom pressure sources, custom throttle policies, pre-opened stores), you can bypass the builder: + +```rust +use std::sync::Arc; +use taskmill::{ + CompositePressure, Scheduler, SchedulerConfig, + TaskStore, ThrottlePolicy, +}; +use taskmill::registry::TaskTypeRegistry; + +let store = TaskStore::open("tasks.db").await.unwrap(); + +let mut registry = TaskTypeRegistry::new(); +registry.register("resize", Arc::new(ImageResizer)); + +let pressure = CompositePressure::new(); +let policy = ThrottlePolicy::default_three_tier(); + +let scheduler = Scheduler::new( + store, + SchedulerConfig::default(), + Arc::new(registry), + pressure, + policy, +); +``` + +## Next steps + +Work through the topic guides in order: + +1. [Priorities & Preemption](priorities-and-preemption.md) — control which tasks run first +2. [IO & Backpressure](io-and-backpressure.md) — prevent resource saturation +3. [Progress & Events](progress-and-events.md) — show progress and react to state changes +4. [Persistence & Recovery](persistence-and-recovery.md) — understand crash safety and deduplication +5. [Configuration](configuration.md) — tune for your workload +6. [Query APIs](query-apis.md) — build dashboards and debug stuck tasks +7. [Design](design.md) — understand the architecture for advanced use diff --git a/docs/why-taskmill.md b/docs/why-taskmill.md new file mode 100644 index 0000000..8f596bc --- /dev/null +++ b/docs/why-taskmill.md @@ -0,0 +1,46 @@ +# Why Taskmill + +## The problem + +Desktop apps and background services often need to do work in the background — generating thumbnails, uploading files, syncing data, scanning directories. The naive approach is to spawn async tasks and push results through channels. This works until it doesn't: + +- **Crashes lose work.** If the process dies, everything in-flight disappears. Users have to re-trigger uploads, re-scan libraries, re-generate thumbnails. +- **The system gets overwhelmed.** Spawning 200 image resizes at once saturates the disk. The UI freezes, other apps slow down, and the laptop fan spins up. +- **Users can't see what's happening.** There's no built-in way to show progress, pause work, or let urgent tasks jump the queue. +- **You end up building a scheduler.** Priority queues, retry logic, deduplication, persistence, backpressure — each one is a small problem, but together they're a significant engineering effort. + +Taskmill is that engineering effort, packaged as a library. + +## What taskmill gives you + +- **Work survives crashes.** Tasks are persisted to SQLite. If the process dies, pending and in-progress work is automatically recovered on restart — no user action needed. +- **The system stays responsive.** The scheduler monitors disk and network throughput and automatically defers work when resources are saturated. Your app doesn't freeze the system. +- **Urgent work runs first.** A 256-level priority queue with preemption lets important tasks interrupt background work. User-initiated uploads run before bulk indexing. +- **Progress comes for free.** Executors can report progress explicitly, but even without that, the scheduler extrapolates from historical data so your UI always shows movement. +- **No duplicate work.** Built-in deduplication means you can safely call "upload this file" multiple times — only one task is created. +- **Built for Tauri.** Every public type is `Clone` and `Serialize`. Events bridge directly to your frontend. The `Scheduler` drops into `tauri::State` without wrapping. + +## When to use taskmill + +**Good fit:** +- Desktop apps with file processing (photo editors, media managers, backup tools) +- Upload and download managers +- Background indexing and search (scanning directories, building metadata) +- Any system where tasks have measurable IO costs and you don't want to saturate the disk or network + +**Not a good fit:** +- Distributed job queues across multiple machines — taskmill is single-process, single-node +- Sub-millisecond scheduling — SQLite adds a few milliseconds of overhead per dispatch +- CPU-only compute farms — IO-aware scheduling doesn't help if your bottleneck is CPU +- Web request handlers — use your web framework's middleware and connection pooling instead + +## Comparison with alternatives + +| Approach | Persistence | IO awareness | Priority | Desktop focus | +|----------|------------|-------------|----------|--------------| +| `tokio::spawn` + channels | No | No | No | No | +| `apalis` / `background_jobs` | Yes (Redis/Postgres) | No | Basic | No (server-oriented) | +| Roll your own | Whatever you build | Whatever you build | Whatever you build | Whatever you build | +| **Taskmill** | Yes (SQLite) | Yes | 256-level with preemption | Yes (Tauri-native) | + +Taskmill is ~3,500 lines of tested Rust. It handles the edge cases (crash recovery, dedup races, preemption anti-thrash, amortized pruning) so you don't have to.