Merged
1 change: 1 addition & 0 deletions README.md
@@ -79,6 +79,7 @@ async fn main() {
- **Avoid duplicate work** — key-based deduplication prevents the same task from being queued twice
- **React to system load** — composable backpressure from any signal (disk, network, memory, battery, API limits)
- **Control concurrency** — per-group limits (e.g., per S3 bucket), global limits, runtime-adjustable
- **Rate-limit dispatch** — token-bucket limits per task type and/or group cap the task start rate, independent of concurrency
- **Build for Tauri** — `Clone`, `Serialize` on all types; events bridge directly to frontends

## Where to start
49 changes: 49 additions & 0 deletions docs/configuration.md
@@ -349,6 +349,52 @@ scheduler.domain::<Media>().set_max_concurrency(8);
let current = scheduler.domain::<Media>().max_concurrency();
```

## Rate limiting

Rate limits control how many tasks *start per unit of time*, complementing concurrency limits, which control how many run *simultaneously*. This is useful when fast tasks (e.g., small API calls completing in milliseconds) would otherwise start in bursts that exceed external rate limits.

Configure at build time:

```rust
use taskmill::{RateLimit, Scheduler};

Scheduler::builder()
// Task-type rate limit: at most 100 uploads per second.
.rate_limit("media::upload", RateLimit::per_second(100))
// Group rate limit: at most 50 requests/sec to this S3 bucket.
.group_rate_limit("s3://prod-bucket", RateLimit::per_second(50))
// Allow short bursts above the steady-state rate.
.rate_limit("media::thumbnail", RateLimit::per_second(10).with_burst(20))
// ...
.build()
.await?;
```

Adjust at runtime via the scheduler or a domain handle:

```rust
scheduler.set_rate_limit("media::upload", RateLimit::per_second(200));
scheduler.remove_rate_limit("media::upload");

// Domain handles auto-prefix the task type:
let media = scheduler.domain::<Media>();
media.set_rate_limit("upload", RateLimit::per_second(150)); // → "media::upload"
media.set_group_rate_limit("s3://prod-bucket", RateLimit::per_minute(3000));
```

A task can be subject to both a task-type rate limit and a group rate limit — it must pass both to be dispatched. Rate limit tokens are acquired *after* all free checks (backpressure, IO budget, concurrency, group pause) so tokens are never wasted on tasks that would be rejected anyway.

When a task is rate-limited, its `run_after` is set to the next token availability, removing it from the dispatch window so other task types can proceed without head-of-line blocking.
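The mechanics can be sketched with a minimal token bucket. Note this is an illustrative assumption, not taskmill's internal implementation — the `TokenBucket` type and `try_acquire` method are made up for the example; only the `run_after` deferral behavior is from the docs above:

```rust
use std::time::{Duration, Instant};

// Illustrative token bucket: refills continuously at `refill_per_sec`,
// holds at most `capacity` (the burst size).
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(rate_per_sec: f64, burst: f64) -> Self {
        Self {
            capacity: burst,
            tokens: burst,
            refill_per_sec: rate_per_sec,
            last_refill: Instant::now(),
        }
    }

    /// Take one token, or report how long until the next token arrives.
    /// A scheduler would use that duration to push the task's `run_after`
    /// forward instead of blocking the dispatch loop.
    fn try_acquire(&mut self, now: Instant) -> Result<(), Duration> {
        // Refill based on elapsed time, capped at the burst capacity.
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;

        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            let deficit = 1.0 - self.tokens;
            Err(Duration::from_secs_f64(deficit / self.refill_per_sec))
        }
    }
}

fn main() {
    // 10 starts/sec steady state, bursts of up to 2.
    let mut bucket = TokenBucket::new(10.0, 2.0);
    let t = Instant::now();
    assert!(bucket.try_acquire(t).is_ok());
    assert!(bucket.try_acquire(t).is_ok());
    // Bucket drained: defer the task rather than stall other task types.
    let wait = bucket.try_acquire(t).unwrap_err();
    println!("rate-limited; defer for {:?}", wait);
}
```

Because the denied acquire returns a concrete wait time, deferral is cheap: the task simply re-enters the queue with a later `run_after`.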

Current rate limit state is visible in the scheduler snapshot:

```rust
let snap = scheduler.snapshot().await?;
for rl in &snap.rate_limits {
println!("{}: {}/{} tokens available", rl.scope, rl.available_tokens, rl.burst);
}
```

## Tuning for specific workloads

### Desktop app with file processing
@@ -386,6 +432,7 @@ Scheduler::builder()
.with_resource_monitoring()
.bandwidth_limit(50_000_000.0) // 50 MB/s cap
.group_concurrency("s3-bucket", 4) // per-endpoint limits
.group_rate_limit("s3-bucket", RateLimit::per_second(100)) // stay under API rate limit
.shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(30)))
```

@@ -435,6 +482,8 @@ Scheduler::builder()
| `bandwidth_limit(bytes_per_sec)` | Set a network bandwidth cap; registers a built-in `NetworkPressure` source. |
| `default_group_concurrency(n)` | Default concurrency limit for grouped tasks (0 = unlimited). |
| `group_concurrency(group, n)` | Per-group concurrency limit override. |
| `rate_limit(task_type, limit)` | Set a token-bucket rate limit for a task type. |
| `group_rate_limit(group, limit)` | Set a token-bucket rate limit for a task group. |
| `app_state(state)` | Register global state visible to all domains. |
| `app_state_arc(arc)` | Register global state from a pre-existing `Arc`. |
| `build()` | Build and return the `Scheduler`. |
3 changes: 2 additions & 1 deletion docs/glossary.md
@@ -21,7 +21,8 @@ Quick reference for terms used throughout the taskmill documentation.
| **Pile-up prevention** | The mechanism that skips a recurring task instance when the previous instance hasn't been dispatched yet, preventing unbounded queue growth under sustained load. See [Quick Start](quick-start.md#recurring-tasks). |
| **Preemption** | Pausing lower-priority work so higher-priority work can run immediately. Preempted tasks resume automatically once the urgent work finishes. See [Priorities & Preemption](priorities-and-preemption.md#preemption). |
| **Pressure source** | Anything that signals the system is busy — disk IO, network throughput, memory usage, API rate limits, battery level. Returns a value from 0.0 (idle) to 1.0 (saturated). See [IO & Backpressure](io-and-backpressure.md#pressure-sources). |
| **Task group** | A named set of tasks that share a concurrency limit. For example, you might limit uploads to a specific S3 bucket to 3 at a time. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). |
| **Rate limit** | A token-bucket cap on how many tasks start per unit of time, independent of concurrency. Scoped by task type and/or group. Configured via `RateLimit::per_second(n)` / `per_minute(n)` with optional `.with_burst(b)`. See [Configuration — Rate limiting](configuration.md#rate-limiting). |
| **Task group** | A named set of tasks that share a concurrency limit and/or rate limit. For example, you might limit uploads to a specific S3 bucket to 3 concurrent and 100/sec. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). |
| **task_deps** | The SQLite junction table that stores dependency edges between tasks. Each row `(task_id, depends_on_id)` means the task cannot start until the dependency completes. Edges survive restarts and are cleaned up automatically when dependencies resolve or on startup. See [Persistence & Recovery](persistence-and-recovery.md#dependency-recovery). |
| **Throttle policy** | Rules that map system pressure to dispatch decisions. The default policy defers background tasks when pressure exceeds 50% and normal tasks when it exceeds 75%, but never blocks high-priority work. See [Priorities & Preemption](priorities-and-preemption.md#throttle-behavior). |
| **TTL (time-to-live)** | A duration after which a task automatically expires if it hasn't started running. Configurable per-task, per-type, or as a global default. See [Configuration](configuration.md#task-ttl-time-to-live). |
20 changes: 20 additions & 0 deletions docs/priorities-and-preemption.md
@@ -119,6 +119,26 @@ scheduler.remove_group_limit("bucket-prod").await;

Group limits are checked *in addition to* `max_concurrency` — a task must pass both the global and group gate to be dispatched.
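The two gates compose as a simple AND, which can be sketched as follows (function and parameter names here are illustrative, not taskmill internals):

```rust
/// Illustrative dispatch gate: a task may start only if both the global
/// concurrency limit and its group's limit (if any) have headroom.
fn may_dispatch(
    running_total: usize,
    max_concurrency: usize,
    running_in_group: usize,
    group_limit: Option<usize>, // None = group is unlimited
) -> bool {
    let global_ok = running_total < max_concurrency;
    let group_ok = group_limit.map_or(true, |limit| running_in_group < limit);
    global_ok && group_ok
}

fn main() {
    assert!(may_dispatch(2, 8, 1, Some(3)));  // both gates have headroom
    assert!(!may_dispatch(8, 8, 0, None));    // global limit reached
    assert!(!may_dispatch(2, 8, 3, Some(3))); // group limit reached
    println!("gate checks ok");
}
```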

### Rate limiting

While concurrency limits control how many tasks run *simultaneously*, rate limits control how many tasks *start per unit of time*. This is essential when fast tasks (completing in milliseconds) would otherwise start in bursts that overwhelm external APIs.

Rate limits use a token-bucket algorithm and can be scoped by task type, group, or both:

```rust
Scheduler::builder()
.rate_limit("media::upload", RateLimit::per_second(100))
.group_rate_limit("s3://prod", RateLimit::per_second(50).with_burst(75))
.build()
.await?;
```

When a task is rate-limited, the scheduler sets its `run_after` to the next token availability. This prevents head-of-line blocking — other task types dispatch normally while the rate-limited type waits.

Rate limits are checked *after* all other gate checks (backpressure, IO budget, concurrency), so tokens are never wasted on tasks that would be rejected for other reasons.

See [Configuration — Rate limiting](configuration.md#rate-limiting) for full details and runtime adjustment APIs.

## Domain-level pause and resume

Individual domains can be paused and resumed independently, without affecting other domains. This is useful for features like a user-togglable sync, or temporarily disabling a domain during maintenance.
26 changes: 26 additions & 0 deletions src/domain.rs
@@ -710,6 +710,32 @@ impl<D: DomainKey> DomainHandle<D> {
self.inner.max_concurrency()
}

// ── Rate Limiting ──────────────────────────────────────────────

/// Set a rate limit for a task type in this domain.
pub fn set_rate_limit(&self, task_type: &str, limit: crate::scheduler::RateLimit) {
self.inner.set_rate_limit(task_type, limit);
}

/// Remove the rate limit for a task type in this domain.
pub fn remove_rate_limit(&self, task_type: &str) {
self.inner.remove_rate_limit(task_type);
}

/// Set a rate limit for a task group.
pub fn set_group_rate_limit(
&self,
group: impl Into<String>,
limit: crate::scheduler::RateLimit,
) {
self.inner.set_group_rate_limit(group, limit);
}

/// Remove a group rate limit.
pub fn remove_group_rate_limit(&self, group: &str) {
self.inner.remove_group_rate_limit(group);
}

// ── Events ──────────────────────────────────────────────────────

/// Subscribe to all events for this domain.
8 changes: 5 additions & 3 deletions src/lib.rs
@@ -22,6 +22,8 @@
//! - Supports [task superseding](DuplicateStrategy::Supersede) for atomic cancel-and-replace
//! - Supports [task TTL](TtlFrom) with automatic expiry, per-type defaults, and child inheritance
//! - Supports [graceful shutdown](ShutdownMode) with configurable drain timeout
//! - Supports [token-bucket rate limiting](RateLimit) per task type and/or group to cap start rate
//! independently of concurrency
//!
//! # Concepts
//!
@@ -806,9 +808,9 @@ pub use resource::network_pressure::NetworkPressure;
pub use resource::sampler::SamplerConfig;
pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot};
pub use scheduler::{
EstimatedProgress, GroupLimits, PausedGroupInfo, ProgressReporter, Scheduler, SchedulerBuilder,
SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader,
TaskProgress,
EstimatedProgress, GroupLimits, PausedGroupInfo, ProgressReporter, RateLimit, RateLimitInfo,
Scheduler, SchedulerBuilder, SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode,
TaskEventHeader, TaskProgress,
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{
31 changes: 31 additions & 0 deletions src/module.rs
@@ -714,6 +714,37 @@ impl ModuleHandle {
.unwrap_or(0)
}

// ── Rate Limiting ─────────────────────────────────────────────

/// Set a rate limit for a task type in this module.
///
/// The `task_type` is the unprefixed name (e.g. `"upload"` in a module
/// named `"media"` becomes `"media::upload"` internally).
pub fn set_rate_limit(&self, task_type: &str, limit: crate::scheduler::RateLimit) {
let prefixed = format!("{}{}", self.prefix, task_type);
self.scheduler.set_rate_limit(prefixed, limit);
}

/// Remove the rate limit for a task type in this module.
pub fn remove_rate_limit(&self, task_type: &str) {
let prefixed = format!("{}{}", self.prefix, task_type);
self.scheduler.remove_rate_limit(&prefixed);
}

/// Set a rate limit for a task group.
pub fn set_group_rate_limit(
&self,
group: impl Into<String>,
limit: crate::scheduler::RateLimit,
) {
self.scheduler.set_group_rate_limit(group, limit);
}

/// Remove a group rate limit.
pub fn remove_group_rate_limit(&self, group: &str) {
self.scheduler.remove_group_rate_limit(group);
}

// ── Scoped queries ────────────────────────────────────────────

/// All active tasks in this module (any status).
41 changes: 40 additions & 1 deletion src/scheduler/builder.rs
@@ -14,6 +14,8 @@ use crate::resource::sampler::{SamplerConfig, SmoothedReader};
use crate::resource::{ResourceReader, ResourceSampler};
use crate::store::{StoreConfig, StoreError, TaskStore};

use super::rate_limit::RateLimit;

use super::event::{SchedulerConfig, ShutdownMode};
use super::Scheduler;

@@ -52,6 +54,8 @@ pub struct SchedulerBuilder {
bandwidth_limit_bps: Option<f64>,
default_group_concurrency: usize,
group_concurrency_overrides: Vec<(String, usize)>,
type_rate_limits: Vec<(String, RateLimit)>,
group_rate_limits: Vec<(String, RateLimit)>,
}

impl SchedulerBuilder {
@@ -72,6 +76,8 @@
bandwidth_limit_bps: None,
default_group_concurrency: 0,
group_concurrency_overrides: Vec::new(),
type_rate_limits: Vec::new(),
group_rate_limits: Vec::new(),
}
}

@@ -245,6 +251,24 @@
self
}

/// Set a rate limit for a task type.
///
/// The `task_type` should be the full prefixed name (e.g. `"media::upload"`).
/// Multiple calls with the same key overwrite the earlier limit.
pub fn rate_limit(mut self, task_type: impl Into<String>, limit: RateLimit) -> Self {
self.type_rate_limits.push((task_type.into(), limit));
self
}

/// Set a rate limit for a task group.
///
/// Tasks with a matching `group_key` are rate-limited to the configured
/// rate, independent of task-type rate limits (both must pass).
pub fn group_rate_limit(mut self, group: impl Into<String>, limit: RateLimit) -> Self {
self.group_rate_limits.push((group.into(), limit));
self
}

/// Register shared application state accessible from every executor via
/// [`TaskContext::state`](crate::TaskContext::state).
///
@@ -453,6 +477,16 @@ impl SchedulerBuilder {
std::sync::atomic::Ordering::Relaxed,
);

// Apply rate limits.
let has_rate_limits =
!self.type_rate_limits.is_empty() || !self.group_rate_limits.is_empty();
for (scope, limit) in self.type_rate_limits {
scheduler.inner.type_rate_limits.set(scope, limit);
}
for (scope, limit) in self.group_rate_limits {
scheduler.inner.group_rate_limits.set(scope, limit);
}

// Compute fast-dispatch eligibility before consuming builder fields.
let has_groups =
self.default_group_concurrency > 0 || !self.group_concurrency_overrides.is_empty();
@@ -501,7 +535,12 @@
// Enable fast dispatch (single pop_next instead of peek + gate + claim)
// when no groups, no resource monitoring, no pressure sources, no
// module caps, and no paused groups are present.
if !has_groups && !has_monitoring && !has_pressure && !has_module_caps && !has_paused_groups
if !has_groups
&& !has_monitoring
&& !has_pressure
&& !has_module_caps
&& !has_paused_groups
&& !has_rate_limits
{
scheduler
.inner
39 changes: 38 additions & 1 deletion src/scheduler/control.rs
@@ -6,6 +6,7 @@ use chrono::{DateTime, Utc};

use crate::store::StoreError;

use super::rate_limit::RateLimit;
use super::{emit_event, Scheduler, SchedulerEvent};

impl Scheduler {
@@ -241,16 +242,51 @@ impl Scheduler {
self.inner.paused_groups.read().unwrap().contains(group_key)
}

// ── Rate Limiting ──────────────────────────────────────────────

/// Set or update the rate limit for a task type at runtime.
///
/// If a bucket already exists, it is reconfigured in place (preserving the
/// current token count, clamped to the new burst); otherwise a new bucket
/// is created.
pub fn set_rate_limit(&self, task_type: impl Into<String>, limit: RateLimit) {
self.inner.type_rate_limits.set(task_type.into(), limit);
self.inner
.fast_dispatch
.store(false, AtomicOrdering::Relaxed);
}

/// Remove the task-type rate limit, falling back to unlimited.
pub fn remove_rate_limit(&self, task_type: &str) {
self.inner.type_rate_limits.remove(task_type);
self.maybe_restore_fast_dispatch();
}

/// Set or update the rate limit for a task group at runtime.
pub fn set_group_rate_limit(&self, group: impl Into<String>, limit: RateLimit) {
self.inner.group_rate_limits.set(group.into(), limit);
self.inner
.fast_dispatch
.store(false, AtomicOrdering::Relaxed);
}

/// Remove the group rate limit, falling back to unlimited.
pub fn remove_group_rate_limit(&self, group: &str) {
self.inner.group_rate_limits.remove(group);
self.maybe_restore_fast_dispatch();
}

/// Re-evaluate whether fast dispatch can be re-enabled.
///
/// Must mirror the conditions in `SchedulerBuilder::build()`:
/// no paused groups, no group limits (default or overrides), no resource
/// monitoring, no pressure sources, no module concurrency caps.
/// monitoring, no pressure sources, no module concurrency caps, no rate limits.
fn maybe_restore_fast_dispatch(&self) {
let has_groups = self.inner.group_limits.default_limit() > 0
|| self.inner.group_limits.has_overrides()
|| !self.inner.paused_groups.read().unwrap().is_empty();
let has_module_caps = !self.inner.module_caps.read().unwrap().is_empty();
let has_rate_limits =
!self.inner.type_rate_limits.is_empty() || !self.inner.group_rate_limits.is_empty();

if !has_groups
&& !self
@@ -262,6 +298,7 @@
.inner
.has_pressure_sources
.load(AtomicOrdering::Relaxed)
&& !has_rate_limits
{
self.inner
.fast_dispatch
3 changes: 3 additions & 0 deletions src/scheduler/event.rs
@@ -16,6 +16,7 @@ use tokio::time::Duration;
use crate::priority::Priority;

use super::progress::{EstimatedProgress, TaskProgress};
use super::rate_limit::RateLimitInfo;

// ── Snapshot ────────────────────────────────────────────────────────

@@ -51,6 +52,8 @@ pub struct SchedulerSnapshot {
pub blocked_count: i64,
/// Groups that are currently paused, with the timestamp each was paused.
pub paused_groups: Vec<PausedGroupInfo>,
/// Configured rate limits with current utilization.
pub rate_limits: Vec<RateLimitInfo>,
}

/// Information about a paused group for snapshot/dashboard display.