Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn store_with_history(n: usize) -> TaskStore {
.submit(&TaskSubmission::new(task_type).key(format!("h-{i}")))
.await
.unwrap();
let task = store.pop_next().await.unwrap().unwrap();
let task = store.pop_next(None).await.unwrap().unwrap();
store.complete(task.id, &budget).await.unwrap();
}
store
Expand Down
2 changes: 1 addition & 1 deletion benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ fn bench_peek_next_varying_depth(c: &mut Criterion) {
async move {
let start = Instant::now();
for _ in 0..iters {
let _ = store.peek_next().await.unwrap();
let _ = store.peek_next(None).await.unwrap();
}
start.elapsed()
}
Expand Down
60 changes: 60 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Controls scheduling behavior. Set via builder methods or pass directly to `Sched
| `shutdown_mode` | `ShutdownMode` | `Hard` | `Hard` cancels immediately. `Graceful(Duration)` waits for running tasks. | Always use `Graceful` for desktop apps to avoid data loss. |
| `default_ttl` | `Option<Duration>` | `None` | Global TTL applied to tasks without a per-task or per-type TTL. | Set to catch stale tasks (e.g., `Duration::from_secs(3600)` for 1 hour). |
| `expiry_sweep_interval` | `Option<Duration>` | `Some(30s)` | How often the scheduler sweeps for expired tasks. `None` disables periodic sweeps (dispatch-time checks still apply). | Lower for latency-sensitive expiry; `None` if you only need dispatch-time checks. |
| `aging_config` | `Option<AgingConfig>` | `None` | Priority aging configuration. When enabled, pending tasks are gradually promoted in effective priority over time. | See [Priorities & Preemption — Priority aging](priorities-and-preemption.md#priority-aging). |

### Builder methods

Expand Down Expand Up @@ -395,6 +396,61 @@ for rl in &snap.rate_limits {
}
```

## Priority aging

Priority aging prevents starvation of low-priority work by gradually promoting tasks that have been waiting too long. See [Priorities & Preemption — Priority aging](priorities-and-preemption.md#priority-aging) for the full explanation.

```rust
use taskmill::{AgingConfig, Priority, Scheduler};
use std::time::Duration;

Scheduler::builder()
.priority_aging(AgingConfig {
grace_period: Duration::from_secs(300), // 5 min before aging starts
aging_interval: Duration::from_secs(60), // promote 1 level per minute
max_effective_priority: Priority::HIGH, // can't age above HIGH
urgent_threshold: None, // disable urgent override
})
// ...
.build()
.await?;
```

The `AgingConfig::default()` uses the values shown above. Effective priority is computed at dispatch time — the stored priority column is never mutated.

## Weighted fair scheduling

When multiple task groups compete for dispatch slots, weighted fair scheduling allocates capacity proportionally. See [Priorities & Preemption — Weighted fair scheduling](priorities-and-preemption.md#weighted-fair-scheduling) for the full explanation.

```rust
Scheduler::builder()
.group_weight("uploads", 3)
.group_weight("indexing", 1)
.group_minimum_slots("alerts", 2)
.default_group_weight(1)
// ...
.build()
.await?;
```

Adjust at runtime:

```rust
scheduler.set_group_weight("uploads", 5);
scheduler.remove_group_weight("uploads");
scheduler.reset_group_weights();
scheduler.set_group_minimum_slots("alerts", 4);
```

Current allocations are visible in the scheduler snapshot:

```rust
let snap = scheduler.snapshot().await?;
for alloc in &snap.group_allocations {
println!("{}: {} slots (weight {})", alloc.group, alloc.allocated_slots, alloc.weight);
}
```

## Tuning for specific workloads

### Desktop app with file processing
Expand Down Expand Up @@ -484,6 +540,10 @@ Scheduler::builder()
| `group_concurrency(group, n)` | Per-group concurrency limit override. |
| `rate_limit(task_type, limit)` | Set a token-bucket rate limit for a task type. |
| `group_rate_limit(group, limit)` | Set a token-bucket rate limit for a task group. |
| `priority_aging(config)` | Enable [priority aging](priorities-and-preemption.md#priority-aging) with the given `AgingConfig`. |
| `group_weight(group, weight)` | Set a relative scheduling weight for a task group. See [Weighted fair scheduling](priorities-and-preemption.md#weighted-fair-scheduling). |
| `default_group_weight(weight)` | Default weight for groups without a specific override. Default: 1. |
| `group_minimum_slots(group, slots)` | Minimum guaranteed dispatch slots for a group, regardless of weight. |
| `app_state(state)` | Register global state visible to all domains. |
| `app_state_arc(arc)` | Register global state from a pre-existing `Arc`. |
| `build()` | Build and return the `Scheduler`. |
Expand Down
17 changes: 13 additions & 4 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ taskmill/src/
mod.rs — Scheduler, SchedulerBuilder, public API
run_loop.rs — main event loop, dispatch cycle
submit.rs — submit, submit_batch, cancellation
control.rs — pause/resume, concurrency limits
control.rs — pause/resume, concurrency limits, group weights
queries.rs — snapshot, active tasks, progress
gate.rs — DispatchGate, IO budget check
dispatch.rs — ActiveTaskMap, spawn_task, preemption
progress.rs — ProgressReporter, throughput extrapolation
event.rs — SchedulerEvent, SchedulerSnapshot
aging.rs — AgingConfig, AgingParams, effective_priority()
fair.rs — GroupWeights, SlotAllocation, compute_allocation()
rate_limit.rs — RateLimit, token-bucket rate limiting
resource/
mod.rs — ResourceSampler + ResourceReader traits
sampler.rs — EWMA-smoothed background loop
Expand Down Expand Up @@ -183,9 +186,15 @@ Each cycle, the loop:

1. Checks if the scheduler is globally paused.
2. Sweeps expired tasks (if the expiry sweep interval has elapsed).
3. Resumes paused tasks if no active preemptors remain.
4. While `active_count < max_concurrency`: peek the next candidate, check for TTL expiry, check the dispatch gate, pop-by-id if admitted, spawn the executor.
5. Sleep until the next signal.
3. Auto-resumes timed group pauses that have reached their deadline.
4. Resumes paused tasks if no active preemptors remain.
5. Dispatches pending tasks using one of three paths:
- **Fair dispatch** (when group weights are configured) — three-pass loop: fair per-group allocation, greedy fill, urgent threshold override.
- **Fast dispatch** (when no groups, no monitoring, no pressure, no rate limits, no weights) — batch `pop_next` in priority order with no gate checks.
- **Slow dispatch** (otherwise) — peek the next candidate, check for TTL expiry, check the dispatch gate, pop-by-id if admitted, spawn the executor.
6. Sleep until the next signal.

When [priority aging](priorities-and-preemption.md#priority-aging) is enabled, `AgingParams` are computed once per dispatch cycle and passed to peek/pop queries. The SQL ORDER BY uses a computed expression for effective priority instead of the stored priority column.

## Retry flow

Expand Down
11 changes: 8 additions & 3 deletions docs/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ Quick reference for terms used throughout the taskmill documentation.
| **Dependency edge** | A record in the `task_deps` junction table representing that one task depends on another. An edge `(A, B)` means "task A cannot start until task B completes." Edges are removed when the dependency completes or fails, and are cleaned up on startup if they reference tasks that no longer exist. |
| **DependencyFailurePolicy** | Controls what happens to a dependent task when one of its dependencies fails permanently. `Cancel` (default) moves the dependent to history as `DependencyFailed` and cascades to other dependents. `Fail` does the same without cascading. `Ignore` unblocks the dependent anyway. See [Configuration](configuration.md#dependency-failure-policy). |
| **Deduplication (dedup)** | Preventing the same task from being queued twice. Taskmill generates a SHA-256 key from the task type and payload; a second submission with the same key is silently ignored. See [Persistence & Recovery](persistence-and-recovery.md#deduplication). |
| **Dispatch** | Moving a task from "waiting in line" (pending) to "actively running." The scheduler dispatches tasks in priority order, subject to concurrency limits and backpressure. |
| **Dispatch** | Moving a task from "waiting in line" (pending) to "actively running." The scheduler dispatches tasks in priority order (or effective priority order when aging is enabled), subject to concurrency limits, group weights, and backpressure. |
| **EWMA** | Exponentially Weighted Moving Average — a smoothing technique that gives recent measurements more weight than old ones. Taskmill uses EWMA to smooth resource readings so the scheduler doesn't overreact to momentary spikes. See [IO & Backpressure](io-and-backpressure.md#ewma-smoothing). |
| **TypedExecutor** | Your code that performs the actual work for a task type. Implements `TypedExecutor<T>` and receives a deserialized payload: `async fn execute(&self, payload: T, ctx: DomainTaskContext<'_, T::Domain>)`. Register with `Domain::task::<T>(executor)`. See [Quick Start](quick-start.md#implement-an-executor). |
| **IO budget** | An estimate of how many bytes a task will read and write (disk and/or network), submitted alongside the task. The scheduler uses IO budgets to avoid overwhelming the disk. See [IO & Backpressure](io-and-backpressure.md#io-budgets-telling-the-scheduler-what-to-expect). |
| **Pile-up prevention** | The mechanism that skips a recurring task instance when the previous instance hasn't been dispatched yet, preventing unbounded queue growth under sustained load. See [Quick Start](quick-start.md#recurring-tasks). |
| **Priority aging** | An anti-starvation mechanism that gradually promotes pending tasks in effective priority over time. Configured via `AgingConfig` on `SchedulerBuilder::priority_aging()`. The stored priority is never mutated — effective priority is computed at dispatch time. See [Priorities & Preemption](priorities-and-preemption.md#priority-aging). |
| **Preemption** | Pausing lower-priority work so higher-priority work can run immediately. Preempted tasks resume automatically once the urgent work finishes. See [Priorities & Preemption](priorities-and-preemption.md#preemption). |
| **Pressure source** | Anything that signals the system is busy — disk IO, network throughput, memory usage, API rate limits, battery level. Returns a value from 0.0 (idle) to 1.0 (saturated). See [IO & Backpressure](io-and-backpressure.md#pressure-sources). |
| **Rate limit** | A token-bucket cap on how many tasks start per unit of time, independent of concurrency. Scoped by task type and/or group. Configured via `RateLimit::per_second(n)` / `per_minute(n)` with optional `.with_burst(b)`. See [Configuration — Rate limiting](configuration.md#rate-limiting). |
| **Task group** | A named set of tasks that share a concurrency limit and/or rate limit. For example, you might limit uploads to a specific S3 bucket to 3 concurrent and 100/sec. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). |
| **Task group** | A named set of tasks that share a concurrency limit, rate limit, and/or scheduling weight. For example, you might limit uploads to a specific S3 bucket to 3 concurrent and 100/sec, or give it 3x the scheduling weight of other groups. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). |
| **Group weight** | A relative scheduling weight assigned to a task group for [weighted fair scheduling](priorities-and-preemption.md#weighted-fair-scheduling). Weights are relative — `(A:3, B:1)` gives A 75% and B 25% of capacity. Groups without an explicit weight use `default_group_weight` (default: 1). Configurable at build time and runtime. |
| **Fair scheduling** | A dispatch strategy that allocates slots proportionally to group weights using a three-pass loop: fair per-group allocation, greedy fill, and urgent threshold override. Enabled when any group weights are configured. See [Priorities & Preemption](priorities-and-preemption.md#weighted-fair-scheduling). |
| **Urgent threshold** | A priority level in `AgingConfig` at which aged tasks may bypass group weight allocation during fair dispatch. Tasks that age past this threshold are dispatched in the urgent pass regardless of their group's weight allocation, but still respect `max_concurrency`. |
| **task_deps** | The SQLite junction table that stores dependency edges between tasks. Each row `(task_id, depends_on_id)` means the task cannot start until the dependency completes. Edges survive restarts and are cleaned up automatically when dependencies resolve or on startup. See [Persistence & Recovery](persistence-and-recovery.md#dependency-recovery). |
| **Throttle policy** | Rules that map system pressure to dispatch decisions. The default policy defers background tasks when pressure exceeds 50% and normal tasks when it exceeds 75%, but never blocks high-priority work. See [Priorities & Preemption](priorities-and-preemption.md#throttle-behavior). |
| **TTL (time-to-live)** | A duration after which a task automatically expires if it hasn't started running. Configurable per-task, per-type, or as a global default. See [Configuration](configuration.md#task-ttl-time-to-live). |
Expand All @@ -36,5 +40,6 @@ Quick reference for terms used throughout the taskmill documentation.
| **TypedEventStream** | A per-task-type event subscription (`TypedEventStream<T>`) created via `handle.task_events::<T>()`. Filters the global scheduler event broadcast to only events matching `T::TASK_TYPE` within the domain. Terminal events include the `TaskHistoryRecord`. |
| **Qualified task type** | The full database-stored task type including the domain prefix, e.g. `"media::thumbnail"`. Required when using store-level query APIs (`history_stats`, `task_lookup`, `avg_throughput`). `DomainHandle` methods apply the prefix automatically, so you typically only need the short form when submitting tasks. |
| **Cross-domain dependency** | A dependency edge where the dependent task and its prerequisite belong to different domains. Functionally identical to same-domain dependencies — the domain boundary does not affect dependency resolution or failure propagation. See [Multi-Module Applications](multi-module-apps.md#cross-module-task-dependencies). |
| **Domain starvation** | A condition where one domain's tasks are never dispatched because higher-priority tasks from other domains continuously consume available concurrency slots. Priority ordering is global across all domains. Mitigated by assigning appropriate priorities and using group concurrency to reserve capacity. See [Multi-Module Applications](multi-module-apps.md#module-starvation-understanding-priority-competition). |
| **Domain starvation** | A condition where one domain's tasks are never dispatched because higher-priority tasks from other domains continuously consume available concurrency slots. Priority ordering is global across all domains. Mitigated by [priority aging](priorities-and-preemption.md#priority-aging), [group weights](priorities-and-preemption.md#weighted-fair-scheduling), assigning appropriate priorities, and using group concurrency to reserve capacity. See [Multi-Module Applications](multi-module-apps.md#module-starvation-understanding-priority-competition). |
| **Effective priority** | The dispatch-time priority of a task when [priority aging](priorities-and-preemption.md#priority-aging) is enabled. Computed as `base_priority - promotions` (clamped to `max_effective_priority`). Equals the stored (base) priority when aging is disabled or the task hasn't waited past the grace period. Visible in `TaskEventHeader` and snapshots. |
| **Late-binding state** | Application state injected into the scheduler after `build()` via `scheduler.register_state()`. Useful for library crates that receive a pre-built `Scheduler` as a dependency. Must be called before `scheduler.run()` — calling it after tasks are dispatching has no ordering guarantees with in-flight executors. |
32 changes: 29 additions & 3 deletions docs/multi-module-apps.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,35 @@ A task is dispatched when **all** of these pass:

A domain with only `BACKGROUND`-priority tasks can be indefinitely deferred when other domains continuously submit `NORMAL` work. This is by design — priority ordering is global across all domains.

If you need guaranteed throughput for a domain:
- **Raise the priority** of its most important tasks to `NORMAL` or `HIGH`.
- **Use task groups** with a dedicated concurrency reservation. A group limit acts as a soft floor: tasks in the group bypass the global priority queue as long as the group has available slots.
Taskmill provides several tools to address this:

- **[Priority aging](priorities-and-preemption.md#priority-aging)** — automatically promotes tasks that have been waiting too long, ensuring even `IDLE` tasks eventually get dispatched.
- **[Weighted fair scheduling](priorities-and-preemption.md#weighted-fair-scheduling)** — allocates dispatch slots proportionally to group weights, guaranteeing each group a fair share of capacity regardless of priority levels.
- **Raise the priority** of the domain's most important tasks to `NORMAL` or `HIGH`.
- **Use task groups** with a dedicated concurrency reservation or minimum slot guarantee.

### Using group weights for guaranteed throughput

The most effective solution is weighted fair scheduling with minimum slot guarantees:

```rust
pub struct Sync;
impl DomainKey for Sync { const NAME: &'static str = "sync"; }

let scheduler = Scheduler::builder()
.domain(
Domain::<Sync>::new()
.task::<SyncTask>(SyncExecutor)
.default_group("sync-reserved")
.default_priority(Priority::BACKGROUND)
)
.group_weight("sync-reserved", 1) // participate in fair allocation
.group_minimum_slots("sync-reserved", 2) // guaranteed at least 2 slots
.priority_aging(AgingConfig::default()) // prevent indefinite starvation
.max_concurrency(16)
.build()
.await?;
```

### Using group concurrency as a soft floor

Expand Down
4 changes: 4 additions & 0 deletions docs/persistence-and-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ When your app starts up, taskmill automatically recovers:
- **Dedup keys stay occupied** — no duplicate submissions sneak in during recovery.
- **Retry counts are preserved** — a task that had retried twice before the crash still has two retries used.

When [priority aging](priorities-and-preemption.md#priority-aging) is enabled, crash recovery also accumulates stale `pause_duration_ms` for tasks that were paused at crash time. This ensures the aging clock is approximately correct after recovery (slightly over-promoting, which is acceptable for anti-starvation).

The guarantee is **at-least-once execution**: a task might run partially, crash, and re-run from the beginning. Design your executors to be idempotent (or to check for partial work) so re-execution is safe.

## Scheduled task recovery
Expand Down Expand Up @@ -195,6 +197,8 @@ You normally don't need to know the schema, but it's documented here for debuggi
| `ttl_seconds` | INTEGER | TTL duration in seconds (NULL = no TTL) |
| `ttl_from` | TEXT DEFAULT 'submission' | When TTL clock starts: `submission` or `first_attempt` |
| `expires_at` | TEXT | ISO 8601 deadline (NULL = no expiry) |
| `pause_duration_ms` | INTEGER DEFAULT 0 | Accumulated milliseconds spent in `paused` state. Excluded from the [priority aging](priorities-and-preemption.md#priority-aging) formula to freeze the aging clock while paused. |
| `paused_at_ms` | INTEGER | Epoch-ms timestamp of the most recent pause transition. `NULL` when the task is not paused. On resume, `pause_duration_ms` is accumulated and this is cleared. |

**Indexes:**
- `idx_tasks_pending(status, priority ASC, id ASC) WHERE status = 'pending'` — fast priority-ordered dispatch.
Expand Down
Loading
Loading