Merged
6 changes: 4 additions & 2 deletions README.md
@@ -86,16 +86,18 @@ scheduler.run(token).await;

- **SQLite persistence** — tasks survive restarts; crash recovery requeues interrupted work
- **256-level priority queue** — with preemption of lower-priority tasks
- **IO-aware scheduling** — defers work when disk throughput is saturated
- **IO-aware scheduling** — defers work when disk or network throughput is saturated
- **Key-based deduplication** — SHA-256 keys prevent duplicate submissions
- **Composable backpressure** — plug in external pressure signals with custom throttle policies
- **Cross-platform resource monitoring** — CPU and disk IO via `sysinfo` (Linux, macOS, Windows)
- **Cross-platform resource monitoring** — CPU, disk IO, and network throughput via `sysinfo` (Linux, macOS, Windows)
- **Network bandwidth pressure** — built-in `NetworkPressure` source throttles tasks when bandwidth is saturated
- **Retries** — automatic requeue of retryable failures with configurable limits
- **Progress reporting** — executor-reported and throughput-extrapolated progress
- **Lifecycle events** — broadcast events for UI integration (Tauri, etc.)
- **Typed payloads** — serialize/deserialize structured task data
- **Batch submission** — bulk enqueue in a single SQLite transaction
- **Graceful shutdown** — configurable drain timeout before force-cancellation
- **Task group concurrency** — limit concurrent tasks per named group (e.g., per S3 bucket)
- **Global pause/resume** — pause all work when the app is backgrounded
- **Type-keyed application state** — register multiple state types, inject pre- or post-build
- **Clone-friendly** — `Scheduler` is `Clone` via `Arc` for easy sharing
3 changes: 3 additions & 0 deletions docs/configuration.md
@@ -142,6 +142,9 @@ All `SchedulerBuilder` methods:
| `with_resource_monitoring()` | Enable platform resource monitoring. |
| `resource_sampler(sampler)` | Provide a custom `ResourceSampler`. |
| `sampler_config(config)` | Configure sample interval and smoothing. |
| `bandwidth_limit(bytes_per_sec)` | Set a network bandwidth cap; registers a built-in `NetworkPressure` source. |
| `default_group_concurrency(n)` | Default concurrency limit for grouped tasks (0 = unlimited). |
| `group_concurrency(group, n)` | Per-group concurrency limit override. |
| `app_state(state)` | Register a state type (multiple types can coexist). |
| `app_state_arc(arc)` | Register a state type from a pre-existing `Arc`. |
| `build()` | Build and return the `Scheduler`. |
15 changes: 12 additions & 3 deletions docs/features.md
@@ -15,6 +15,13 @@ A complete list of taskmill's capabilities.
- **Atomic dispatch** — pop operations use `UPDATE ... WHERE id = (SELECT ...) RETURNING *` for race-free claiming with no lost tasks.
- **Runtime-adjustable concurrency** — change `max_concurrency` at runtime via `set_max_concurrency()`.

## Task Groups

- **Per-group concurrency** — assign tasks to named groups via `.group(key)` on `TaskSubmission` or `TypedTask::group_key()`. The scheduler limits how many tasks in the same group can run concurrently.
- **Configurable limits** — set per-group limits at build time with `.group_concurrency(group, limit)` or a default for all groups with `.default_group_concurrency(limit)`.
- **Runtime-adjustable** — change limits at runtime via `set_group_limit()`, `remove_group_limit()`, and `set_default_group_concurrency()`.
- **Independent of global concurrency** — group limits are checked *in addition to* `max_concurrency`. A task must pass both the global and group gate to be dispatched.
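
A minimal sketch of that dual gate, under stated assumptions — the function and parameter names are illustrative, not taskmill's actual internals:

```rust
use std::collections::HashMap;

/// Hypothetical dispatch check: a task runs only if a global slot is free
/// *and* its group (if any) is under its concurrency limit.
fn may_dispatch(
    running_total: usize,
    max_concurrency: usize,
    group: Option<&str>,
    running_per_group: &HashMap<String, usize>,
    group_limits: &HashMap<String, usize>,
    default_limit: usize, // 0 = unlimited, mirroring default_group_concurrency
) -> bool {
    // Global gate: a free concurrency slot is required.
    if running_total >= max_concurrency {
        return false;
    }
    // Group gate: applies only to tasks that declare a group.
    let Some(g) = group else { return true };
    let limit = group_limits.get(g).copied().unwrap_or(default_limit);
    limit == 0 || running_per_group.get(g).copied().unwrap_or(0) < limit
}
```

Because both gates must pass, a saturated group never starves ungrouped work, and freeing a global slot does not admit a task whose group is still full.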

## Deduplication

- **Key-based dedup** — each task gets a SHA-256 key derived from `task_type + payload` (or an explicit key). A `UNIQUE(key)` constraint with `INSERT OR IGNORE` prevents duplicate submissions.
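
The key derivation can be sketched as follows — note that taskmill uses SHA-256, while this dependency-free sketch substitutes std's default hasher (SipHash) in its place:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Derive a dedup key from task_type + payload. taskmill hashes with
/// SHA-256; std's DefaultHasher stands in here to avoid a crate dependency.
fn dedup_key(task_type: &str, payload: &str) -> String {
    let mut h = DefaultHasher::new();
    task_type.hash(&mut h);
    payload.hash(&mut h);
    format!("{:016x}", h.finish())
}
```

Identical `(task_type, payload)` pairs always produce the same key, which is what lets the `UNIQUE(key)` constraint turn a duplicate submission into a no-op.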
@@ -25,15 +32,17 @@ A complete list of taskmill's capabilities.
## IO Awareness

- **Expected/actual IO tracking** — submit estimated read/write bytes; executors report actual bytes on completion.
- **Network IO tracking** — tasks can declare expected network RX/TX bytes via `expected_net_io()` and report actuals via `ctx.record_net_rx_bytes()` / `ctx.record_net_tx_bytes()`.
- **IO budget gating** — the scheduler compares running task IO estimates against EWMA-smoothed system throughput. New work is deferred when cumulative IO would exceed 80% of observed disk capacity.
- **Learning from history** — `avg_throughput()` and `history_stats()` compute per-type IO averages from actual completions, enabling callers to refine estimates over time.
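
The 80% rule can be sketched as a single admission check — the threshold is as documented above, but the exact accounting inside taskmill may differ:

```rust
/// Admit a task only if its estimated IO rate, added to the estimates of
/// already-running tasks, stays within 80% of the EWMA-smoothed observed
/// disk throughput.
fn io_gate_admits(running_estimated_bps: u64, task_estimated_bps: u64, observed_bps: u64) -> bool {
    (running_estimated_bps + task_estimated_bps) as f64 <= observed_bps as f64 * 0.8
}
```

Keeping 20% headroom means interactive or unscheduled IO on the same disk is never fully crowded out by queued work.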

## Resource Monitoring

- **Cross-platform** — CPU and disk IO via `sysinfo` on Linux, macOS, and Windows. Feature-gated under `sysinfo-monitor` (enabled by default).
- **Cross-platform** — CPU, disk IO, and network throughput via `sysinfo` on Linux, macOS, and Windows. Feature-gated under `sysinfo-monitor` (enabled by default).
- **EWMA smoothing** — raw samples are smoothed with an exponentially weighted moving average (alpha=0.3, configurable) to avoid spiky readings.
- **Two-trait design** — `ResourceSampler` (raw platform readings) and `ResourceReader` (smoothed snapshots) are separated for testability and custom implementations.
- **Custom samplers** — disable the `sysinfo-monitor` feature and provide your own `ResourceSampler` for containers, cgroups, or mobile platforms.
- **Network bandwidth pressure** — built-in `NetworkPressure` source maps observed RX+TX throughput against a configurable bandwidth cap to produce a backpressure signal. Enable via `.bandwidth_limit(bytes_per_sec)` on the builder.
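
The EWMA step is small enough to show in full; alpha=0.3 matches the documented default, while the choice to seed with the first raw sample is an assumption:

```rust
/// EWMA smoothing: smoothed = alpha * sample + (1 - alpha) * previous.
struct Ewma {
    alpha: f64,
    value: Option<f64>,
}

impl Ewma {
    fn new(alpha: f64) -> Self {
        Self { alpha, value: None }
    }

    fn update(&mut self, sample: f64) -> f64 {
        let v = match self.value {
            None => sample, // first sample seeds the average
            Some(prev) => self.alpha * sample + (1.0 - self.alpha) * prev,
        };
        self.value = Some(v);
        v
    }
}
```

A low alpha weights history over the latest sample, so a single spiky disk or network reading moves the smoothed value only 30% of the way toward it.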

## Backpressure

@@ -61,7 +70,7 @@ A complete list of taskmill's capabilities.

## Lifecycle Events

- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Paused`, `Resumed`.
- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Waiting`, `Paused`, `Resumed`.
- **Tauri-ready** — all events are `Serialize`, designed for direct bridging to frontend via `app_handle.emit()`.

## Task Management
Expand All @@ -72,7 +81,7 @@ A complete list of taskmill's capabilities.

## Typed Payloads

- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization.
- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization. Use `.label("display name")` to set a human-readable label independent of the dedup key.
- **Type-safe deserialization** — `ctx.payload::<T>()?` in executors for zero-boilerplate extraction.
- **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::<T>()`.

48 changes: 42 additions & 6 deletions docs/io-and-backpressure.md
@@ -25,6 +25,26 @@ ctx.record_write_bytes(9_876);

Actual values are stored in `task_history` for learning.

### Network IO

Tasks that perform network transfers can declare expected bandwidth:

```rust
let sub = TaskSubmission::new("upload")
.payload_json(&upload_payload)?
.expected_io(0, 0) // no disk IO
.expected_net_io(0, 50_000_000); // 50 MB upload
```

Executors report actual network bytes during execution:

```rust
ctx.record_net_rx_bytes(response_size);
ctx.record_net_tx_bytes(uploaded_bytes);
```

Network actuals are stored in `task_history` alongside disk IO.

### IO budget gating

When resource monitoring is enabled, the scheduler checks IO headroom before dispatching:
@@ -54,7 +74,7 @@ let stats = store.history_stats("scan").await?;

### Built-in platform sampler

Enabled by default via the `sysinfo-monitor` feature flag. Provides CPU and disk IO on Linux, macOS, and Windows.
Enabled by default via the `sysinfo-monitor` feature flag. Provides CPU, disk IO, and network throughput on Linux, macOS, and Windows.

```rust
let scheduler = Scheduler::builder()
@@ -75,9 +95,11 @@ struct CgroupSampler;
impl ResourceSampler for CgroupSampler {
fn sample(&mut self) -> ResourceSnapshot {
ResourceSnapshot {
cpu_usage: read_cgroup_cpu(), // 0.0–1.0
cpu_usage: read_cgroup_cpu(), // 0.0–1.0
io_read_bytes_per_sec: read_blkio_read(),
io_write_bytes_per_sec: read_blkio_write(),
net_rx_bytes_per_sec: read_net_rx(),
net_tx_bytes_per_sec: read_net_tx(),
}
}
}
@@ -146,17 +168,17 @@ Multiple sources are aggregated via `CompositePressure`. The aggregate pressure
use taskmill::CompositePressure;

let mut pressure = CompositePressure::new();
pressure.add_source(Arc::new(MemoryPressure));
pressure.add_source(Arc::new(QueueDepthPressure));
pressure.add_source(Box::new(MemoryPressure));
pressure.add_source(Box::new(QueueDepthPressure));
// Aggregate = max(memory_pressure, queue_pressure)
```

Or via the builder:

```rust
let scheduler = Scheduler::builder()
.pressure_source(Arc::new(MemoryPressure))
.pressure_source(Arc::new(QueueDepthPressure))
.pressure_source(Box::new(MemoryPressure))
.pressure_source(Box::new(QueueDepthPressure))
.build()
.await?;
```
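
The max-aggregation above can be sketched in a few lines — the trait here mirrors taskmill's `PressureSource` in spirit, but its exact method signature is an assumption:

```rust
/// A pressure source reports a reading in [0.0, 1.0].
trait Pressure {
    fn pressure(&self) -> f64; // 0.0 = idle, 1.0 = fully saturated
}

/// A source with a constant reading, for demonstration.
struct Fixed(f64);
impl Pressure for Fixed {
    fn pressure(&self) -> f64 {
        self.0
    }
}

/// Aggregate = maximum across all registered sources, so the single most
/// constrained resource drives the throttle decision.
fn aggregate(sources: &[Box<dyn Pressure>]) -> f64 {
    sources.iter().map(|s| s.pressure()).fold(0.0, f64::max)
}
```

Taking the max (rather than, say, the mean) is deliberately conservative: one saturated resource is enough to throttle, even if every other source reads idle.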
@@ -188,6 +210,20 @@ The default `DispatchGate` combines both mechanisms. A task is dispatched only w

If either check fails, the task stays in the queue and is retried on the next poll cycle.

### Network bandwidth pressure

Taskmill includes a built-in `NetworkPressure` source that maps observed network throughput against a configurable bandwidth cap. Enable it via the builder:

```rust
let scheduler = Scheduler::builder()
.with_resource_monitoring()
.bandwidth_limit(100_000_000.0) // 100 MB/s combined RX+TX cap
.build()
.await?;
```

When observed RX+TX throughput approaches the cap, the pressure rises toward 1.0 and the `ThrottlePolicy` starts throttling lower-priority tasks. This is especially useful for upload/download workloads where you want to avoid saturating a network link.
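
The throughput-to-pressure mapping can be sketched as a clamped ratio — clamping at 1.0 near the cap is an assumption about `NetworkPressure`'s behavior, not a documented guarantee:

```rust
/// Map observed RX+TX throughput to a pressure reading in [0.0, 1.0]
/// relative to a configured bandwidth cap.
fn net_pressure(rx_bps: f64, tx_bps: f64, cap_bps: f64) -> f64 {
    if cap_bps <= 0.0 {
        return 0.0; // no cap configured: never apply network pressure
    }
    ((rx_bps + tx_bps) / cap_bps).clamp(0.0, 1.0)
}
```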

### Diagnostics

The `SchedulerSnapshot` includes pressure readings for debugging:
1 change: 1 addition & 0 deletions docs/progress-reporting.md
@@ -114,6 +114,7 @@ All scheduler state changes are broadcast as `SchedulerEvent` variants:
| `Preempted { task_id, task_type, key, label }` | Task paused for higher-priority work |
| `Cancelled { task_id, task_type, key, label }` | Task cancelled via `scheduler.cancel()` |
| `Progress { task_id, task_type, key, label, percent, message }` | Progress update from executor |
| `Waiting { task_id, children_count }` | Parent task entered waiting state after spawning children |
| `Paused` | Scheduler globally paused via `pause_all()` |
| `Resumed` | Scheduler resumed via `resume_all()` |
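
A UI bridge typically matches on the variant and formats a display string. The enum below is a local stand-in for a few `SchedulerEvent` variants (the real ones carry more fields, per the table above):

```rust
/// Illustrative subset of scheduler events.
enum Event {
    Progress { task_id: i64, percent: f64 },
    Completed { task_id: i64 },
    Paused,
}

/// Turn an event into a short status line for a frontend.
fn describe(ev: &Event) -> String {
    match ev {
        Event::Progress { task_id, percent } => format!("task {task_id}: {percent:.0}%"),
        Event::Completed { task_id } => format!("task {task_id}: done"),
        Event::Paused => "scheduler paused".to_string(),
    }
}
```

In a Tauri app, the same match arms would instead call `app_handle.emit()` with the serialized event.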

3 changes: 2 additions & 1 deletion docs/quick-start.md
@@ -153,8 +153,9 @@ For full control over components, use `Scheduler::new()` directly:
use std::sync::Arc;
use taskmill::{
CompositePressure, Scheduler, SchedulerConfig,
TaskStore, TaskTypeRegistry, ThrottlePolicy,
TaskStore, ThrottlePolicy,
};
use taskmill::registry::TaskTypeRegistry;

let store = TaskStore::open("tasks.db").await.unwrap();

16 changes: 16 additions & 0 deletions migrations/003_net_io_and_groups.sql
@@ -0,0 +1,16 @@
-- Network IO columns for tasks table.
ALTER TABLE tasks ADD COLUMN expected_net_rx_bytes INTEGER NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN expected_net_tx_bytes INTEGER NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN group_key TEXT;

-- Network IO columns for task_history table.
ALTER TABLE task_history ADD COLUMN expected_net_rx_bytes INTEGER NOT NULL DEFAULT 0;
ALTER TABLE task_history ADD COLUMN expected_net_tx_bytes INTEGER NOT NULL DEFAULT 0;
ALTER TABLE task_history ADD COLUMN actual_net_rx_bytes INTEGER;
ALTER TABLE task_history ADD COLUMN actual_net_tx_bytes INTEGER;
ALTER TABLE task_history ADD COLUMN group_key TEXT;

-- Index for group concurrency checks (running tasks per group).
CREATE INDEX IF NOT EXISTS idx_tasks_group_running
ON tasks (group_key, status)
WHERE group_key IS NOT NULL AND status = 'running';
4 changes: 4 additions & 0 deletions src/backpressure.rs
@@ -7,6 +7,10 @@
//! [`ThrottlePolicy`] maps the aggregate pressure to per-[`Priority`]
//! throttle decisions. Customize the policy with
//! [`SchedulerBuilder::throttle_policy`](crate::SchedulerBuilder::throttle_policy).
//!
//! A built-in [`NetworkPressure`](crate::NetworkPressure) source is available for
//! throttling based on network bandwidth — enable it via
//! [`SchedulerBuilder::bandwidth_limit`](crate::SchedulerBuilder::bandwidth_limit).

use crate::priority::Priority;

57 changes: 52 additions & 5 deletions src/lib.rs
@@ -6,8 +6,8 @@
//! - Persists tasks to SQLite so the queue survives restarts
//! - Schedules by priority (0 = highest, 255 = lowest) with [named tiers](Priority)
//! - Deduplicates tasks by key — submitting an already-queued key is a no-op
//! - Tracks expected and actual IO bytes per task for budget-based scheduling
//! - Monitors system CPU and disk throughput to adjust concurrency
//! - Tracks expected and actual IO bytes (disk and network) per task for budget-based scheduling
//! - Monitors system CPU, disk, and network throughput to adjust concurrency
//! - Supports [composable backpressure](PressureSource) from arbitrary external sources
//! - Preempts lower-priority work when high-priority tasks arrive
//! - [Retries](TaskError::retryable) failed tasks at the same priority level
@@ -69,6 +69,27 @@
//! which feeds back into historical throughput averages for future scheduling
//! decisions.
//!
//! ### Network IO
//!
//! Tasks can also declare expected network IO via
//! [`TaskSubmission::expected_net_io`] (or [`TypedTask::expected_net_rx_bytes`] /
//! [`expected_net_tx_bytes`](TypedTask::expected_net_tx_bytes)). Executors report
//! actual network bytes via [`TaskContext::record_net_rx_bytes`] /
//! [`record_net_tx_bytes`](TaskContext::record_net_tx_bytes). To throttle tasks
//! when network bandwidth is saturated, set a bandwidth cap with
//! [`SchedulerBuilder::bandwidth_limit`] — this registers a built-in
//! [`NetworkPressure`] source that maps observed throughput to backpressure.
//!
//! ## Task groups
//!
//! Tasks can be assigned to a named group via [`TaskSubmission::group`] (or
//! [`TypedTask::group_key`]). The scheduler enforces per-group concurrency
//! limits — for example, limiting uploads to any single S3 bucket to 4
//! concurrent tasks. Configure limits at build time with
//! [`SchedulerBuilder::group_concurrency`] and
//! [`SchedulerBuilder::default_group_concurrency`], or adjust at runtime via
//! [`Scheduler::set_group_limit`] and [`Scheduler::set_default_group_concurrency`].
//!
//! ## Child tasks & two-phase execution
//!
//! An executor can spawn child tasks via [`TaskContext::spawn_child`]. When
@@ -218,6 +239,31 @@
//! returns a serializable [`SchedulerSnapshot`] with queue depths, running
//! tasks, progress estimates, and backpressure.
//!
//! ## Group concurrency
//!
//! Limit concurrent tasks within a named group — for example, cap uploads
//! per S3 bucket:
//!
//! ```ignore
//! let scheduler = Scheduler::builder()
//! .store_path("tasks.db")
//! .executor("upload-part", Arc::new(UploadPartExecutor))
//! .default_group_concurrency(4) // default for all groups
//! .group_concurrency("s3://hot-bucket", 8) // override for one group
//! .build()
//! .await?;
//!
//! // Tasks declare their group via the submission:
//! let sub = TaskSubmission::new("upload-part")
//! .group("s3://my-bucket")
//! .payload_json(&part)?;
//! scheduler.submit(&sub).await?;
//!
//! // Adjust at runtime:
//! scheduler.set_group_limit("s3://my-bucket", 2);
//! scheduler.remove_group_limit("s3://my-bucket"); // fall back to default
//! ```
//!
//! ## Child tasks
//!
//! Spawn child tasks from an executor to model fan-out work. The parent
@@ -262,8 +308,8 @@
//! 3. Pending finalizers (parents whose children all completed) are
//! dispatched first.
//! 4. The highest-priority pending task is peeked (without claiming it).
//! 5. The dispatch gate checks concurrency limits, IO budget, and
//! backpressure. If the gate rejects, no slot is consumed.
//! 5. The dispatch gate checks concurrency limits, group concurrency,
//! IO budget, and backpressure. If the gate rejects, no slot is consumed.
//! 6. If admitted, the task is atomically claimed (`peek` → `pop_by_id`)
//! and spawned as a Tokio task.
//! 7. Steps 4–6 repeat until the queue is empty or the gate rejects.
@@ -288,10 +334,11 @@ pub mod task;
pub use backpressure::{CompositePressure, PressureSource, ThrottlePolicy};
pub use priority::Priority;
pub use registry::{TaskContext, TaskExecutor};
pub use resource::network_pressure::NetworkPressure;
pub use resource::sampler::SamplerConfig;
pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot};
pub use scheduler::{
EstimatedProgress, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig,
EstimatedProgress, GroupLimits, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig,
SchedulerEvent, SchedulerSnapshot, ShutdownMode,
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
22 changes: 22 additions & 0 deletions src/registry.rs
@@ -142,20 +142,26 @@ impl ChildSpawner {
pub(crate) struct IoTracker {
pub read_bytes: AtomicI64,
pub write_bytes: AtomicI64,
pub net_rx_bytes: AtomicI64,
pub net_tx_bytes: AtomicI64,
}

impl IoTracker {
pub fn new() -> Self {
Self {
read_bytes: AtomicI64::new(0),
write_bytes: AtomicI64::new(0),
net_rx_bytes: AtomicI64::new(0),
net_tx_bytes: AtomicI64::new(0),
}
}

pub fn snapshot(&self) -> crate::task::TaskMetrics {
crate::task::TaskMetrics {
read_bytes: self.read_bytes.load(Ordering::Relaxed),
write_bytes: self.write_bytes.load(Ordering::Relaxed),
net_rx_bytes: self.net_rx_bytes.load(Ordering::Relaxed),
net_tx_bytes: self.net_tx_bytes.load(Ordering::Relaxed),
}
}
}
@@ -264,6 +270,22 @@ impl TaskContext {
self.io.write_bytes.fetch_add(bytes, Ordering::Relaxed);
}

/// Record actual bytes received over the network during this task's execution.
///
/// Can be called multiple times — values are accumulated. The scheduler
/// reads the total after the executor returns.
pub fn record_net_rx_bytes(&self, bytes: i64) {
self.io.net_rx_bytes.fetch_add(bytes, Ordering::Relaxed);
}

/// Record actual bytes transmitted over the network during this task's execution.
///
/// Can be called multiple times — values are accumulated. The scheduler
/// reads the total after the executor returns.
pub fn record_net_tx_bytes(&self, bytes: i64) {
self.io.net_tx_bytes.fetch_add(bytes, Ordering::Relaxed);
}

// ── Task submission (scoped scheduler access) ────────────────────

/// Submit a continuation or follow-up task.