From 743861d60cd2b5bb66593acfedca79155e91207d Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Fri, 13 Mar 2026 21:45:57 -0700 Subject: [PATCH 1/2] feat: add network IO pressure and per-group concurrency limiting Add network bandwidth monitoring alongside existing disk IO tracking, with a new NetworkPressure backpressure source and budget-based admission gating. Introduce group-based concurrency limits so tasks targeting the same resource (e.g. an S3 endpoint) can be throttled independently from the global concurrency cap. --- migrations/003_net_io_and_groups.sql | 16 +++ src/lib.rs | 3 +- src/registry.rs | 22 ++++ src/resource/mod.rs | 8 ++ src/resource/network_pressure.rs | 103 ++++++++++++++++++ src/resource/sampler.rs | 12 +++ src/resource/sysinfo_monitor.rs | 60 +++++++++-- src/scheduler/gate.rs | 149 ++++++++++++++++++++++++++- src/scheduler/mod.rs | 103 ++++++++++++++++-- src/store.rs | 82 ++++++++++++--- src/task.rs | 117 +++++++++++++++++++-- 11 files changed, 634 insertions(+), 41 deletions(-) create mode 100644 migrations/003_net_io_and_groups.sql create mode 100644 src/resource/network_pressure.rs diff --git a/migrations/003_net_io_and_groups.sql b/migrations/003_net_io_and_groups.sql new file mode 100644 index 0000000..f3eb2a0 --- /dev/null +++ b/migrations/003_net_io_and_groups.sql @@ -0,0 +1,16 @@ +-- Network IO columns for tasks table. +ALTER TABLE tasks ADD COLUMN expected_net_rx_bytes INTEGER NOT NULL DEFAULT 0; +ALTER TABLE tasks ADD COLUMN expected_net_tx_bytes INTEGER NOT NULL DEFAULT 0; +ALTER TABLE tasks ADD COLUMN group_key TEXT; + +-- Network IO columns for task_history table. +ALTER TABLE task_history ADD COLUMN expected_net_rx_bytes INTEGER NOT NULL DEFAULT 0; +ALTER TABLE task_history ADD COLUMN expected_net_tx_bytes INTEGER NOT NULL DEFAULT 0; +ALTER TABLE task_history ADD COLUMN actual_net_rx_bytes INTEGER; +ALTER TABLE task_history ADD COLUMN actual_net_tx_bytes INTEGER; +ALTER TABLE task_history ADD COLUMN group_key TEXT; + +-- Index for group concurrency checks (running tasks per group). +CREATE INDEX IF NOT EXISTS idx_tasks_group_running + ON tasks (group_key, status) + WHERE group_key IS NOT NULL AND status = 'running'; diff --git a/src/lib.rs b/src/lib.rs index 98e63fb..d7ed04a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -288,10 +288,11 @@ pub mod task; pub use backpressure::{CompositePressure, PressureSource, ThrottlePolicy}; pub use priority::Priority; pub use registry::{TaskContext, TaskExecutor}; +pub use resource::network_pressure::NetworkPressure; pub use resource::sampler::SamplerConfig; pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot}; pub use scheduler::{ - EstimatedProgress, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig, + EstimatedProgress, GroupLimits, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; diff --git a/src/registry.rs b/src/registry.rs index c8ec0fb..3640361 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -142,6 +142,8 @@ impl ChildSpawner { pub(crate) struct IoTracker { pub read_bytes: AtomicI64, pub write_bytes: AtomicI64, + pub net_rx_bytes: AtomicI64, + pub net_tx_bytes: AtomicI64, } impl IoTracker { @@ -149,6 +151,8 @@ impl IoTracker { Self { read_bytes: AtomicI64::new(0), write_bytes: AtomicI64::new(0), + net_rx_bytes: AtomicI64::new(0), + net_tx_bytes: AtomicI64::new(0), } } @@ -156,6 +160,8 @@ impl IoTracker { crate::task::TaskMetrics { read_bytes: self.read_bytes.load(Ordering::Relaxed), write_bytes: self.write_bytes.load(Ordering::Relaxed), + net_rx_bytes: self.net_rx_bytes.load(Ordering::Relaxed), + net_tx_bytes: self.net_tx_bytes.load(Ordering::Relaxed), } } } @@ -264,6 +270,22 @@ impl TaskContext { self.io.write_bytes.fetch_add(bytes, Ordering::Relaxed); } + /// Record actual bytes received over the network during this task's execution. + /// + /// Can be called multiple times — values are accumulated. The scheduler + /// reads the total after the executor returns. + pub fn record_net_rx_bytes(&self, bytes: i64) { + self.io.net_rx_bytes.fetch_add(bytes, Ordering::Relaxed); + } + + /// Record actual bytes transmitted over the network during this task's execution. + /// + /// Can be called multiple times — values are accumulated. The scheduler + /// reads the total after the executor returns. + pub fn record_net_tx_bytes(&self, bytes: i64) { + self.io.net_tx_bytes.fetch_add(bytes, Ordering::Relaxed); + } + // ── Task submission (scoped scheduler access) ──────────────────── /// Submit a continuation or follow-up task. diff --git a/src/resource/mod.rs b/src/resource/mod.rs index 7a68657..c459c43 100644 --- a/src/resource/mod.rs +++ b/src/resource/mod.rs @@ -12,6 +12,8 @@ pub mod sampler; +pub mod network_pressure; + #[cfg(feature = "sysinfo-monitor")] pub mod sysinfo_monitor; @@ -26,6 +28,10 @@ pub struct ResourceSnapshot { pub io_read_bytes_per_sec: f64, /// Disk write throughput in bytes/sec (EWMA-smoothed). pub io_write_bytes_per_sec: f64, + /// Network receive throughput in bytes/sec (EWMA-smoothed). + pub net_rx_bytes_per_sec: f64, + /// Network transmit throughput in bytes/sec (EWMA-smoothed). + pub net_tx_bytes_per_sec: f64, } impl Default for ResourceSnapshot { @@ -34,6 +40,8 @@ impl Default for ResourceSnapshot { cpu_usage: 0.0, io_read_bytes_per_sec: 0.0, io_write_bytes_per_sec: 0.0, + net_rx_bytes_per_sec: 0.0, + net_tx_bytes_per_sec: 0.0, } } } diff --git a/src/resource/network_pressure.rs b/src/resource/network_pressure.rs new file mode 100644 index 0000000..213b76c --- /dev/null +++ b/src/resource/network_pressure.rs @@ -0,0 +1,103 @@ +//! Built-in [`PressureSource`](crate::PressureSource) derived from network bandwidth utilization. +//! +//! [`NetworkPressure`] reads the latest [`ResourceSnapshot`](crate::ResourceSnapshot) +//! and computes pressure as the ratio of observed network throughput to a +//! configured bandwidth cap. Register via +//! [`SchedulerBuilder::bandwidth_limit`](crate::SchedulerBuilder::bandwidth_limit). + +use std::sync::Arc; + +use crate::backpressure::PressureSource; +use crate::resource::ResourceReader; + +/// Pressure source based on network bandwidth utilization. +/// +/// Reads EWMA-smoothed RX+TX throughput from a [`ResourceReader`] and +/// returns `(rx + tx) / max_bandwidth_bps` clamped to `[0.0, 1.0]`. +/// +/// # Example +/// +/// ```ignore +/// use taskmill::resource::network_pressure::NetworkPressure; +/// +/// let pressure = NetworkPressure::new(reader, 100_000_000.0); // 100 MB/s cap +/// assert!(pressure.pressure() >= 0.0); +/// ``` +pub struct NetworkPressure { + reader: Arc, + /// Combined RX+TX bandwidth limit in bytes/sec. + max_bandwidth_bps: f64, +} + +impl NetworkPressure { + /// Create a new `NetworkPressure` source. + /// + /// `max_bandwidth_bps` is the combined (RX + TX) bandwidth cap in + /// bytes per second. When observed throughput reaches this value, + /// pressure reports 1.0 (fully saturated). + pub fn new(reader: Arc, max_bandwidth_bps: f64) -> Self { + assert!( + max_bandwidth_bps > 0.0, + "max_bandwidth_bps must be positive" + ); + Self { + reader, + max_bandwidth_bps, + } + } +} + +impl PressureSource for NetworkPressure { + fn pressure(&self) -> f32 { + let snapshot = self.reader.latest(); + let total_bps = snapshot.net_rx_bytes_per_sec + snapshot.net_tx_bytes_per_sec; + (total_bps / self.max_bandwidth_bps).min(1.0) as f32 + } + + fn name(&self) -> &str { + "network" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::resource::ResourceSnapshot; + + struct FixedReader(ResourceSnapshot); + + impl ResourceReader for FixedReader { + fn latest(&self) -> ResourceSnapshot { + self.0.clone() + } + } + + #[test] + fn pressure_scales_with_throughput() { + let reader = Arc::new(FixedReader(ResourceSnapshot { + net_rx_bytes_per_sec: 30_000_000.0, + net_tx_bytes_per_sec: 20_000_000.0, + ..Default::default() + })); + let pressure = NetworkPressure::new(reader, 100_000_000.0); + assert!((pressure.pressure() - 0.5).abs() < 0.01); + } + + #[test] + fn pressure_clamps_to_one() { + let reader = Arc::new(FixedReader(ResourceSnapshot { + net_rx_bytes_per_sec: 80_000_000.0, + net_tx_bytes_per_sec: 80_000_000.0, + ..Default::default() + })); + let pressure = NetworkPressure::new(reader, 100_000_000.0); + assert!((pressure.pressure() - 1.0).abs() < f32::EPSILON); + } + + #[test] + fn zero_throughput_is_zero_pressure() { + let reader = Arc::new(FixedReader(ResourceSnapshot::default())); + let pressure = NetworkPressure::new(reader, 100_000_000.0); + assert_eq!(pressure.pressure(), 0.0); + } +} diff --git a/src/resource/sampler.rs b/src/resource/sampler.rs index 62a1dc4..6221cb1 100644 --- a/src/resource/sampler.rs +++ b/src/resource/sampler.rs @@ -117,6 +117,16 @@ pub(crate) async fn run_sampler( raw.io_write_bytes_per_sec, config.ewma_alpha, ); + smoothed.net_rx_bytes_per_sec = ewma( + smoothed.net_rx_bytes_per_sec, + raw.net_rx_bytes_per_sec, + config.ewma_alpha, + ); + smoothed.net_tx_bytes_per_sec = ewma( + smoothed.net_tx_bytes_per_sec, + raw.net_tx_bytes_per_sec, + config.ewma_alpha, + ); reader.update(smoothed.clone()).await; @@ -124,6 +134,8 @@ pub(crate) async fn run_sampler( cpu = format!("{:.1}%", smoothed.cpu_usage * 100.0), read_mbps = format!("{:.1}", smoothed.io_read_bytes_per_sec / 1_048_576.0), write_mbps = format!("{:.1}", smoothed.io_write_bytes_per_sec / 1_048_576.0), + net_rx_mbps = format!("{:.1}", smoothed.net_rx_bytes_per_sec / 1_048_576.0), + net_tx_mbps = format!("{:.1}", smoothed.net_tx_bytes_per_sec / 1_048_576.0), "resource sample" ); } diff --git a/src/resource/sysinfo_monitor.rs b/src/resource/sysinfo_monitor.rs index 25f53e3..505e9d5 100644 --- a/src/resource/sysinfo_monitor.rs +++ b/src/resource/sysinfo_monitor.rs @@ -1,42 +1,52 @@ //! Cross-platform resource sampler using the [`sysinfo`](https://docs.rs/sysinfo) crate. //! -//! Tracks CPU utilization and aggregate disk IO throughput across all mounted -//! disks. Gated behind the `sysinfo-monitor` feature (enabled by default). +//! Tracks CPU utilization, aggregate disk IO throughput across all mounted +//! disks, and aggregate network IO throughput across all network interfaces. +//! Gated behind the `sysinfo-monitor` feature (enabled by default). use std::time::Instant; -use sysinfo::{Disks, System}; +use sysinfo::{Disks, Networks, System}; use crate::resource::{ResourceSampler, ResourceSnapshot}; /// Cross-platform resource sampler using the `sysinfo` crate. /// -/// Works on Linux, macOS, and Windows. Tracks CPU utilization and -/// aggregate disk IO throughput across all mounted disks. +/// Works on Linux, macOS, and Windows. Tracks CPU utilization, +/// aggregate disk IO throughput across all mounted disks, and +/// aggregate network IO throughput across all network interfaces. pub struct SysinfoSampler { sys: System, disks: Disks, + networks: Networks, prev_read_bytes: u64, prev_write_bytes: u64, + prev_net_rx_bytes: u64, + prev_net_tx_bytes: u64, prev_sample: Option, } impl SysinfoSampler { - /// Create a new sampler, taking an initial CPU and disk reading. + /// Create a new sampler, taking an initial CPU, disk, and network reading. pub fn new() -> Self { let mut sys = System::new(); sys.refresh_cpu_usage(); let disks = Disks::new_with_refreshed_list(); + let networks = Networks::new_with_refreshed_list(); - // Take initial disk totals so first delta is meaningful. + // Take initial totals so first delta is meaningful. let (read, write) = disk_totals(&disks); + let (net_rx, net_tx) = network_totals(&networks); Self { sys, disks, + networks, prev_read_bytes: read, prev_write_bytes: write, + prev_net_rx_bytes: net_rx, + prev_net_tx_bytes: net_tx, prev_sample: Some(Instant::now()), } } @@ -57,29 +67,46 @@ impl ResourceSampler for SysinfoSampler { // Disk IO: compute bytes/sec since last sample. self.disks.refresh(true); let (read_bytes, write_bytes) = disk_totals(&self.disks); + + // Network IO: compute bytes/sec since last sample. + self.networks.refresh(true); + let (net_rx_bytes, net_tx_bytes) = network_totals(&self.networks); + let now = Instant::now(); - let (read_bps, write_bps) = if let Some(prev_ts) = self.prev_sample { + let (read_bps, write_bps, net_rx_bps, net_tx_bps) = if let Some(prev_ts) = self.prev_sample + { let elapsed = now.duration_since(prev_ts).as_secs_f64(); if elapsed > 0.0 { let read_delta = read_bytes.saturating_sub(self.prev_read_bytes); let write_delta = write_bytes.saturating_sub(self.prev_write_bytes); - (read_delta as f64 / elapsed, write_delta as f64 / elapsed) + let rx_delta = net_rx_bytes.saturating_sub(self.prev_net_rx_bytes); + let tx_delta = net_tx_bytes.saturating_sub(self.prev_net_tx_bytes); + ( + read_delta as f64 / elapsed, + write_delta as f64 / elapsed, + rx_delta as f64 / elapsed, + tx_delta as f64 / elapsed, + ) } else { - (0.0, 0.0) + (0.0, 0.0, 0.0, 0.0) } } else { - (0.0, 0.0) + (0.0, 0.0, 0.0, 0.0) }; self.prev_read_bytes = read_bytes; self.prev_write_bytes = write_bytes; + self.prev_net_rx_bytes = net_rx_bytes; + self.prev_net_tx_bytes = net_tx_bytes; self.prev_sample = Some(now); ResourceSnapshot { cpu_usage, io_read_bytes_per_sec: read_bps, io_write_bytes_per_sec: write_bps, + net_rx_bytes_per_sec: net_rx_bps, + net_tx_bytes_per_sec: net_tx_bps, } } } @@ -98,3 +125,14 @@ fn disk_totals(disks: &Disks) -> (u64, u64) { } (total_read, total_write) } + +/// Sum received/transmitted bytes across all network interfaces. +fn network_totals(networks: &Networks) -> (u64, u64) { + let mut total_rx = 0u64; + let mut total_tx = 0u64; + for (_name, data) in networks { + total_rx += data.received(); + total_tx += data.transmitted(); + } + (total_rx, total_tx) +} diff --git a/src/scheduler/gate.rs b/src/scheduler/gate.rs index 3485c8f..97edd49 100644 --- a/src/scheduler/gate.rs +++ b/src/scheduler/gate.rs @@ -4,10 +4,14 @@ //! requeued. The built-in [`DefaultDispatchGate`] applies backpressure //! throttling and IO-budget checks. +use std::collections::HashMap; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use tokio::sync::RwLock; + use crate::backpressure::{CompositePressure, ThrottlePolicy}; use crate::resource::ResourceReader; use crate::store::{StoreError, TaskStore}; @@ -27,6 +31,8 @@ pub struct GateContext<'a> { pub store: &'a TaskStore, /// The current resource reader, if monitoring is enabled. pub resource_reader: Option<&'a Arc>, + /// Group concurrency limits (if configured). + pub group_limits: Option<&'a GroupLimits>, } // ── Dispatch Gate ────────────────────────────────────────────────── @@ -119,17 +125,47 @@ impl DispatchGate for DefaultDispatchGate { return Ok(false); } - // IO budget check. + // IO budget check (disk). if !has_io_headroom(task, ctx).await? { tracing::trace!( task_type = task.task_type, expected_read = task.expected_read_bytes, expected_write = task.expected_write_bytes, - "task deferred — IO budget exhausted — requeuing" + "task deferred — disk IO budget exhausted — requeuing" + ); + return Ok(false); + } + + // Network IO budget check. + if !has_net_io_headroom(task, ctx).await? { + tracing::trace!( + task_type = task.task_type, + expected_rx = task.expected_net_rx_bytes, + expected_tx = task.expected_net_tx_bytes, + "task deferred — network IO budget exhausted — requeuing" ); return Ok(false); } + // Group concurrency check. + if let Some(group_key) = &task.group_key { + if let Some(limits) = ctx.group_limits { + if let Some(limit) = limits.limit_for(group_key) { + let running = ctx.store.running_count_for_group(group_key).await?; + if running >= limit as i64 { + tracing::trace!( + task_type = task.task_type, + group = group_key, + running, + limit, + "task deferred — group concurrency saturated — requeuing" + ); + return Ok(false); + } + } + } + } + Ok(true) }) } @@ -185,3 +221,112 @@ pub async fn has_io_headroom(task: &TaskRecord, ctx: &GateContext<'_>) -> Result Ok(read_ok && write_ok) } + +// ── Network IO Budget ───────────────────────────────────────────── + +/// Check if there is network IO headroom for a task given current running +/// network IO and system capacity. +/// +/// Parallel to [`has_io_headroom`] but for network throughput. +pub async fn has_net_io_headroom( + task: &TaskRecord, + ctx: &GateContext<'_>, +) -> Result { + // If the task doesn't declare any network IO, always allow. + if task.expected_net_rx_bytes == 0 && task.expected_net_tx_bytes == 0 { + return Ok(true); + } + + let Some(reader) = ctx.resource_reader else { + return Ok(true); + }; + + let snapshot = reader.latest(); + if snapshot.net_rx_bytes_per_sec == 0.0 && snapshot.net_tx_bytes_per_sec == 0.0 { + return Ok(true); + } + + let (running_rx, running_tx) = ctx.store.running_net_io_totals().await?; + + let rx_capacity = snapshot.net_rx_bytes_per_sec * 2.0; + let tx_capacity = snapshot.net_tx_bytes_per_sec * 2.0; + + let rx_ok = + rx_capacity == 0.0 || (running_rx + task.expected_net_rx_bytes) as f64 <= rx_capacity * 0.8; + let tx_ok = + tx_capacity == 0.0 || (running_tx + task.expected_net_tx_bytes) as f64 <= tx_capacity * 0.8; + + Ok(rx_ok && tx_ok) +} + +// ── Group Limits ────────────────────────────────────────────────── + +/// Per-group concurrency limits for task dispatch. +/// +/// Groups allow throttling tasks that target the same resource (e.g. an S3 +/// endpoint) independently from global concurrency. Each task can optionally +/// carry a `group_key`; the scheduler checks the running count for that group +/// against the configured limit before dispatching. +/// +/// A default limit applies to any group without a specific override. Set +/// the default to `0` (the initial value) to disable group limiting for +/// groups without explicit overrides. +pub struct GroupLimits { + default: AtomicUsize, + overrides: RwLock>, +} + +impl Default for GroupLimits { + fn default() -> Self { + Self::new() + } +} + +impl GroupLimits { + /// Create a new `GroupLimits` with no default limit and no overrides. + pub fn new() -> Self { + Self { + default: AtomicUsize::new(0), + overrides: RwLock::new(HashMap::new()), + } + } + + /// Look up the effective limit for a group. + /// + /// Returns the per-group override if set, otherwise the default limit. + /// Returns `None` if neither is configured (default is 0 = unlimited). + pub fn limit_for(&self, group: &str) -> Option { + // Fast path: check overrides with a read lock. + if let Some(&limit) = self.overrides.blocking_read().get(group) { + return Some(limit); + } + let default = self.default.load(Ordering::Relaxed); + if default > 0 { + Some(default) + } else { + None + } + } + + /// Set a concurrency limit for a specific group. + pub fn set_limit(&self, group: String, limit: usize) { + self.overrides.blocking_write().insert(group, limit); + } + + /// Remove the per-group override, falling back to the default. + pub fn remove_limit(&self, group: &str) { + self.overrides.blocking_write().remove(group); + } + + /// Set the default limit applied to groups without a specific override. + /// + /// `0` means unlimited (no group limiting for unconfigured groups). + pub fn set_default(&self, limit: usize) { + self.default.store(limit, Ordering::Relaxed); + } + + /// Read the current default limit. + pub fn default_limit(&self) -> usize { + self.default.load(Ordering::Relaxed) + } +} diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index b14b262..3628ad0 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -34,6 +34,7 @@ use crate::task::{ use dispatch::ActiveTaskMap; use gate::{DefaultDispatchGate, GateContext}; +pub use gate::GroupLimits; pub use progress::{EstimatedProgress, ProgressReporter}; // ── Snapshot ──────────────────────────────────────────────────────── @@ -227,6 +228,8 @@ struct SchedulerInner { paused: AtomicBool, /// Wakes the run loop when new work is submitted or the scheduler is resumed. work_notify: Arc, + /// Per-group concurrency limits. + group_limits: GroupLimits, } /// IO-aware priority scheduler. @@ -292,6 +295,7 @@ impl Scheduler { app_state, paused: AtomicBool::new(false), work_notify: Arc::new(Notify::new()), + group_limits: GroupLimits::new(), }), } } @@ -511,6 +515,7 @@ impl Scheduler { let gate_ctx = GateContext { store: &self.inner.store, resource_reader: reader_guard.as_ref(), + group_limits: Some(&self.inner.group_limits), }; // Admission check while the task is still pending — no running @@ -842,6 +847,25 @@ impl Scheduler { pub fn is_paused(&self) -> bool { self.inner.paused.load(AtomicOrdering::Acquire) } + + /// Set the concurrency limit for a specific task group. + /// + /// Tasks with a matching `group_key` will be throttled so that at most + /// `limit` run concurrently, independent of the global concurrency cap. + pub fn set_group_limit(&self, group: impl Into, limit: usize) { + self.inner.group_limits.set_limit(group.into(), limit); + } + + /// Remove a per-group concurrency override, falling back to the default. + pub fn remove_group_limit(&self, group: &str) { + self.inner.group_limits.remove_limit(group); + } + + /// Set the default concurrency limit for any grouped task without a + /// specific override. `0` means unlimited. + pub fn set_default_group_concurrency(&self, limit: usize) { + self.inner.group_limits.set_default(limit); + } } // ── Builder ───────────────────────────────────────────────────────── @@ -879,6 +903,9 @@ pub struct SchedulerBuilder { custom_sampler: Option>, sampler_config: SamplerConfig, app_state_entries: Vec<(std::any::TypeId, Arc)>, + bandwidth_limit_bps: Option, + default_group_concurrency: usize, + group_concurrency_overrides: Vec<(String, usize)>, } impl SchedulerBuilder { @@ -896,6 +923,9 @@ impl SchedulerBuilder { custom_sampler: None, sampler_config: SamplerConfig::default(), app_state_entries: Vec::new(), + bandwidth_limit_bps: None, + default_group_concurrency: 0, + group_concurrency_overrides: Vec::new(), } } @@ -1001,6 +1031,39 @@ impl SchedulerBuilder { self } + /// Set a network bandwidth cap (combined RX+TX, in bytes per second). + /// + /// Registers a built-in [`NetworkPressure`](crate::resource::network_pressure::NetworkPressure) + /// source that maps observed throughput to backpressure. When throughput + /// approaches this limit, low-priority tasks are throttled. + /// + /// Requires resource monitoring to be enabled (either + /// [`with_resource_monitoring`](Self::with_resource_monitoring) or a custom + /// [`resource_sampler`](Self::resource_sampler)). + pub fn bandwidth_limit(mut self, max_bytes_per_sec: f64) -> Self { + self.bandwidth_limit_bps = Some(max_bytes_per_sec); + self.enable_resource_monitoring = true; + self + } + + /// Set the default concurrency limit for any grouped task. + /// + /// `0` means unlimited (no limit for groups without an explicit override). + /// Default: 0. + pub fn default_group_concurrency(mut self, limit: usize) -> Self { + self.default_group_concurrency = limit; + self + } + + /// Set a concurrency limit for a specific task group. + /// + /// Tasks with a matching `group_key` will be limited to at most `limit` + /// concurrent executions. + pub fn group_concurrency(mut self, group: impl Into, limit: usize) -> Self { + self.group_concurrency_overrides.push((group.into(), limit)); + self + } + /// Register shared application state accessible from every executor via /// [`TaskContext::state`](crate::TaskContext::state). /// @@ -1069,11 +1132,29 @@ impl SchedulerBuilder { registry.register_erased(&name, executor); } + // Prepare resource monitoring reader early so NetworkPressure can + // reference it before the gate is boxed. + let reader = if self.enable_resource_monitoring { + Some(SmoothedReader::new()) + } else { + None + }; + // Build gate from pressure sources + policy. let mut pressure = CompositePressure::new(); for source in self.pressure_sources { pressure.add_source(source); } + + // Register NetworkPressure if a bandwidth limit was set. + if let (Some(bw_limit), Some(ref reader)) = (self.bandwidth_limit_bps, &reader) { + let net_pressure = crate::resource::network_pressure::NetworkPressure::new( + Arc::new(reader.clone()) as Arc, + bw_limit, + ); + pressure.add_source(Box::new(net_pressure)); + } + let policy = self .policy .unwrap_or_else(ThrottlePolicy::default_three_tier); @@ -1086,8 +1167,23 @@ impl SchedulerBuilder { let scheduler = Scheduler::with_gate(store, self.config, Arc::new(registry), gate, app_state); + // Apply group concurrency limits. + if self.default_group_concurrency > 0 { + scheduler + .inner + .group_limits + .set_default(self.default_group_concurrency); + } + for (group, limit) in self.group_concurrency_overrides { + scheduler.inner.group_limits.set_limit(group, limit); + } + // Set up resource monitoring. - if self.enable_resource_monitoring { + if let Some(reader) = reader { + scheduler + .set_resource_reader(Arc::new(reader.clone())) + .await; + #[cfg(feature = "sysinfo-monitor")] let sampler: Box = self .custom_sampler @@ -1098,11 +1194,6 @@ impl SchedulerBuilder { .custom_sampler .expect("resource monitoring enabled but no custom sampler provided and sysinfo-monitor feature is disabled"); - let reader = SmoothedReader::new(); - scheduler - .set_resource_reader(Arc::new(reader.clone())) - .await; - // Spawn sampler loop — it will stop when the scheduler's sampler_token is cancelled. let sampler_arc = Arc::new(tokio::sync::Mutex::new(sampler)); let sampler_config = self.sampler_config; diff --git a/src/store.rs b/src/store.rs index f82006f..2ad2b6f 100644 --- a/src/store.rs +++ b/src/store.rs @@ -126,7 +126,7 @@ impl Default for StoreConfig { /// assert_eq!(task.status, TaskStatus::Running); /// /// // Complete it — moves to history. -/// store.complete(task.id, &TaskMetrics { read_bytes: 4096, write_bytes: 1024 }).await?; +/// store.complete(task.id, &TaskMetrics { read_bytes: 4096, write_bytes: 1024, ..Default::default() }).await?; /// assert!(store.task_by_id(task.id).await?.is_none()); // gone from active queue /// # Ok(()) /// # } @@ -201,6 +201,9 @@ impl TaskStore { sqlx::raw_sql(include_str!("../migrations/002_add_label.sql")) .execute(&self.pool) .await?; + sqlx::raw_sql(include_str!("../migrations/003_net_io_and_groups.sql")) + .execute(&self.pool) + .await?; Ok(()) } @@ -263,8 +266,8 @@ impl TaskStore { let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT start"); let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, parent_id, fail_fast, group_key) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) @@ -273,8 +276,11 @@ impl TaskStore { .bind(&sub.payload) .bind(sub.expected_read_bytes) .bind(sub.expected_write_bytes) + .bind(sub.expected_net_rx_bytes) + .bind(sub.expected_net_tx_bytes) .bind(sub.parent_id) .bind(fail_fast_val) + .bind(&sub.group_key) .execute(&self.pool) .await?; tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT end"); @@ -350,8 +356,8 @@ impl TaskStore { let priority = sub.priority.value() as i32; let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, parent_id, fail_fast, group_key) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) @@ -360,8 +366,11 @@ impl TaskStore { .bind(&sub.payload) .bind(sub.expected_read_bytes) .bind(sub.expected_write_bytes) + .bind(sub.expected_net_rx_bytes) + .bind(sub.expected_net_tx_bytes) .bind(sub.parent_id) .bind(fail_fast_val) + .bind(&sub.group_key) .execute(&mut *conn) .await?; @@ -480,8 +489,6 @@ impl TaskStore { /// Mark a task as completed and move it to history. pub async fn complete(&self, id: i64, metrics: &TaskMetrics) -> Result<(), StoreError> { - let actual_read_bytes = metrics.read_bytes; - let actual_write_bytes = metrics.write_bytes; tracing::debug!(task_id = id, "store.complete: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -513,9 +520,10 @@ impl TaskStore { let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; sqlx::query( "INSERT INTO task_history (task_type, key, label, priority, status, payload, - expected_read_bytes, expected_write_bytes, actual_read_bytes, actual_write_bytes, - retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast) - VALUES (?, ?, ?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, + actual_read_bytes, actual_write_bytes, actual_net_rx_bytes, actual_net_tx_bytes, + retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast, group_key) + VALUES (?, ?, ?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) @@ -524,8 +532,12 @@ impl TaskStore { .bind(&task.payload) .bind(task.expected_read_bytes) .bind(task.expected_write_bytes) - .bind(actual_read_bytes) - .bind(actual_write_bytes) + .bind(task.expected_net_rx_bytes) + .bind(task.expected_net_tx_bytes) + .bind(metrics.read_bytes) + .bind(metrics.write_bytes) + .bind(metrics.net_rx_bytes) + .bind(metrics.net_tx_bytes) .bind(task.retry_count) .bind(&task.last_error) .bind(task.created_at.format("%Y-%m-%d %H:%M:%S").to_string()) @@ -536,6 +548,7 @@ impl TaskStore { .bind(duration_ms) .bind(task.parent_id) .bind(fail_fast_val) + .bind(&task.group_key) .execute(&mut *conn) .await?; @@ -625,9 +638,10 @@ impl TaskStore { let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; sqlx::query( "INSERT INTO task_history (task_type, key, label, priority, status, payload, - expected_read_bytes, expected_write_bytes, actual_read_bytes, actual_write_bytes, - retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast) - VALUES (?, ?, ?, ?, 'failed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, + actual_read_bytes, actual_write_bytes, actual_net_rx_bytes, actual_net_tx_bytes, + retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast, group_key) + VALUES (?, ?, ?, ?, 'failed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) @@ -636,8 +650,12 @@ impl TaskStore { .bind(&task.payload) .bind(task.expected_read_bytes) .bind(task.expected_write_bytes) + .bind(task.expected_net_rx_bytes) + .bind(task.expected_net_tx_bytes) .bind(metrics.read_bytes) .bind(metrics.write_bytes) + .bind(metrics.net_rx_bytes) + .bind(metrics.net_tx_bytes) .bind(task.retry_count + 1) .bind(error) .bind(task.created_at.format("%Y-%m-%d %H:%M:%S").to_string()) @@ -645,6 +663,7 @@ impl TaskStore { .bind(duration_ms) .bind(task.parent_id) .bind(fail_fast_val) + .bind(&task.group_key) .execute(&mut *conn) .await?; @@ -780,6 +799,27 @@ impl TaskStore { Ok(row) } + /// Sum of expected network rx/tx bytes for all running tasks. + pub async fn running_net_io_totals(&self) -> Result<(i64, i64), StoreError> { + let row: (i64, i64) = sqlx::query_as( + "SELECT COALESCE(SUM(expected_net_rx_bytes), 0), COALESCE(SUM(expected_net_tx_bytes), 0) + FROM tasks WHERE status = 'running'", + ) + .fetch_one(&self.pool) + .await?; + Ok(row) + } + + /// Count of running tasks in a specific group. + pub async fn running_count_for_group(&self, group_key: &str) -> Result { + let count: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE group_key = ? AND status = 'running'") + .bind(group_key) + .fetch_one(&self.pool) + .await?; + Ok(count.0) + } + // ── Query: history ────────────────────────────────────────────── /// Look up a history record by its row id. @@ -1170,6 +1210,8 @@ fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { payload: row.get("payload"), expected_read_bytes: row.get("expected_read_bytes"), expected_write_bytes: row.get("expected_write_bytes"), + expected_net_rx_bytes: row.get("expected_net_rx_bytes"), + expected_net_tx_bytes: row.get("expected_net_tx_bytes"), retry_count: row.get("retry_count"), last_error: row.get("last_error"), created_at: parse_datetime(&created_at_str), @@ -1178,6 +1220,7 @@ fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { requeue_priority: requeue_priority_val.map(|p| Priority::new(p as u8)), parent_id, fail_fast: fail_fast_val != 0, + group_key: row.get("group_key"), } } @@ -1200,8 +1243,12 @@ fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistoryRecord { payload: row.get("payload"), expected_read_bytes: row.get("expected_read_bytes"), expected_write_bytes: row.get("expected_write_bytes"), + expected_net_rx_bytes: row.get("expected_net_rx_bytes"), + expected_net_tx_bytes: row.get("expected_net_tx_bytes"), actual_read_bytes: row.get("actual_read_bytes"), actual_write_bytes: row.get("actual_write_bytes"), + actual_net_rx_bytes: row.get("actual_net_rx_bytes"), + actual_net_tx_bytes: row.get("actual_net_tx_bytes"), retry_count: row.get("retry_count"), last_error: row.get("last_error"), created_at: parse_datetime(&created_at_str), @@ -1210,6 +1257,7 @@ fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistoryRecord { duration_ms: row.get("duration_ms"), parent_id, fail_fast: fail_fast_val != 0, + group_key: row.get("group_key"), } } @@ -1461,6 +1509,7 @@ mod tests { &TaskMetrics { read_bytes: 2000, write_bytes: 1000, + ..Default::default() }, ) .await @@ -1521,6 +1570,7 @@ mod tests { &TaskMetrics { read_bytes: 100, write_bytes: 50, + ..Default::default() }, ) .await @@ -1597,6 +1647,7 @@ mod tests { &TaskMetrics { read_bytes: 1000, write_bytes: 500, + ..Default::default() }, ) .await @@ -1678,6 +1729,7 @@ mod tests { &TaskMetrics { read_bytes: 100, write_bytes: 50, + ..Default::default() }, ) .await diff --git a/src/task.rs b/src/task.rs index 46a0f76..db36836 100644 --- a/src/task.rs +++ b/src/task.rs @@ -102,6 +102,8 @@ pub struct TaskRecord { pub payload: Option>, pub expected_read_bytes: i64, pub expected_write_bytes: i64, + pub expected_net_rx_bytes: i64, + pub expected_net_tx_bytes: i64, pub retry_count: i32, pub last_error: Option, pub created_at: DateTime, @@ -114,6 +116,9 @@ pub struct TaskRecord { /// fails the parent immediately. When `false`, the parent waits for all /// children to finish before resolving. pub fail_fast: bool, + /// Optional group key for per-group concurrency limiting (e.g. an + /// endpoint URL). Tasks in the same group share a concurrency budget. + pub group_key: Option, } impl TaskRecord { @@ -143,8 +148,12 @@ pub struct TaskHistoryRecord { pub payload: Option>, pub expected_read_bytes: i64, pub expected_write_bytes: i64, + pub expected_net_rx_bytes: i64, + pub expected_net_tx_bytes: i64, pub actual_read_bytes: Option, pub actual_write_bytes: Option, + pub actual_net_rx_bytes: Option, + pub actual_net_tx_bytes: Option, pub retry_count: i32, pub last_error: Option, pub created_at: DateTime, @@ -155,6 +164,8 @@ pub struct TaskHistoryRecord { pub parent_id: Option, /// Whether the parent used fail-fast semantics. pub fail_fast: bool, + /// Optional group key for per-group concurrency limiting. + pub group_key: Option, } /// Accumulated IO metrics captured by the scheduler after an executor finishes. @@ -166,6 +177,8 @@ pub struct TaskHistoryRecord { pub struct TaskMetrics { pub read_bytes: i64, pub write_bytes: i64, + pub net_rx_bytes: i64, + pub net_tx_bytes: i64, } /// Reported by the executor on failure. @@ -312,6 +325,8 @@ pub struct TaskSubmission { pub payload: Option>, pub expected_read_bytes: i64, pub expected_write_bytes: i64, + pub expected_net_rx_bytes: i64, + pub expected_net_tx_bytes: i64, /// Parent task ID for hierarchical tasks. Set automatically by /// [`TaskContext::spawn_child`](crate::TaskContext::spawn_child). pub parent_id: Option, @@ -320,6 +335,12 @@ pub struct TaskSubmission { /// children to finish before resolving. Only meaningful for parent tasks /// that spawn children. pub fail_fast: bool, + /// Optional group key for per-group concurrency limiting. + /// + /// Tasks with the same group key share a concurrency budget controlled + /// by [`SchedulerBuilder::group_concurrency`](crate::SchedulerBuilder::group_concurrency). + /// Use this to prevent hammering a single endpoint (e.g. `"s3://my-bucket"`). + pub group_key: Option, } impl TaskSubmission { @@ -348,8 +369,11 @@ impl TaskSubmission { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + expected_net_rx_bytes: 0, + expected_net_tx_bytes: 0, parent_id: None, fail_fast: true, + group_key: None, } } @@ -400,7 +424,7 @@ impl TaskSubmission { self } - /// Set expected IO bytes for budget-based scheduling. + /// Set expected disk IO bytes for budget-based scheduling. /// /// The scheduler uses these estimates to avoid saturating disk throughput /// when [resource monitoring](crate::SchedulerBuilder::with_resource_monitoring) @@ -411,6 +435,26 @@ impl TaskSubmission { self } + /// Set expected network IO bytes for budget-based scheduling. + /// + /// The scheduler uses these estimates to avoid saturating network bandwidth + /// when [resource monitoring](crate::SchedulerBuilder::with_resource_monitoring) + /// is enabled. Default: 0 for both. + pub fn expected_net_io(mut self, rx_bytes: i64, tx_bytes: i64) -> Self { + self.expected_net_rx_bytes = rx_bytes; + self.expected_net_tx_bytes = tx_bytes; + self + } + + /// Set the group key for per-group concurrency limiting. + /// + /// Tasks with the same group key share a concurrency budget. Use this + /// to prevent hammering a single endpoint (e.g. `"s3://my-bucket"`). + pub fn group(mut self, group_key: impl Into) -> Self { + self.group_key = Some(group_key.into()); + self + } + /// Set the fail-fast flag for parent tasks that spawn children. /// /// When `true` (the default), the first child failure cancels siblings @@ -484,30 +528,48 @@ pub trait TypedTask: Serialize + DeserializeOwned + Send + 'static { /// Unique name used to register and look up the executor. const TASK_TYPE: &'static str; - /// Estimated bytes this task will read. Default: 0. + /// Estimated bytes this task will read from disk. Default: 0. fn expected_read_bytes(&self) -> i64 { 0 } - /// Estimated bytes this task will write. Default: 0. + /// Estimated bytes this task will write to disk. Default: 0. fn expected_write_bytes(&self) -> i64 { 0 } + /// Estimated bytes this task will receive over the network. Default: 0. + fn expected_net_rx_bytes(&self) -> i64 { + 0 + } + + /// Estimated bytes this task will transmit over the network. Default: 0. + fn expected_net_tx_bytes(&self) -> i64 { + 0 + } + /// Scheduling priority. Default: [`Priority::NORMAL`]. fn priority(&self) -> Priority { Priority::NORMAL } + + /// Optional group key for per-group concurrency limiting. Default: `None`. + fn group_key(&self) -> Option { + None + } } impl TaskSubmission { /// Create a submission from a [`TypedTask`], serializing the payload and - /// pulling task type, priority, and IO estimates from the trait. + /// pulling task type, priority, IO estimates, and group key from the trait. pub fn from_typed(task: &T) -> Result { - Ok(Self::new(T::TASK_TYPE) + let mut sub = Self::new(T::TASK_TYPE) .priority(task.priority()) .payload_json(task)? - .expected_io(task.expected_read_bytes(), task.expected_write_bytes())) + .expected_io(task.expected_read_bytes(), task.expected_write_bytes()) + .expected_net_io(task.expected_net_rx_bytes(), task.expected_net_tx_bytes()); + sub.group_key = task.group_key(); + Ok(sub) } } @@ -627,6 +689,49 @@ mod tests { let sub = TaskSubmission::from_typed(&Minimal).unwrap(); assert_eq!(sub.expected_read_bytes, 0); assert_eq!(sub.expected_write_bytes, 0); + assert_eq!(sub.expected_net_rx_bytes, 0); + assert_eq!(sub.expected_net_tx_bytes, 0); assert_eq!(sub.priority, Priority::NORMAL); + assert!(sub.group_key.is_none()); + } + + #[test] + fn typed_task_with_network_and_group() { + #[derive(Serialize, Deserialize)] + struct S3Upload { + bucket: String, + size: i64, + } + + impl TypedTask for S3Upload { + const TASK_TYPE: &'static str = "s3-upload"; + + fn expected_net_tx_bytes(&self) -> i64 { + self.size + } + + fn group_key(&self) -> Option { + Some(format!("s3://{}", self.bucket)) + } + } + + let task = S3Upload { + bucket: "my-bucket".into(), + size: 10_000_000, + }; + let sub = TaskSubmission::from_typed(&task).unwrap(); + assert_eq!(sub.expected_net_tx_bytes, 10_000_000); + assert_eq!(sub.expected_net_rx_bytes, 0); + assert_eq!(sub.group_key.as_deref(), Some("s3://my-bucket")); + } + + #[test] + fn submission_builder_net_io_and_group() { + let sub = TaskSubmission::new("upload") + .expected_net_io(5000, 10000) + .group("s3://bucket-a"); + assert_eq!(sub.expected_net_rx_bytes, 5000); + assert_eq!(sub.expected_net_tx_bytes, 10000); + assert_eq!(sub.group_key.as_deref(), Some("s3://bucket-a")); } } From f31b09ebba407f4c5978cceb18e4ca93e463db2a Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Fri, 13 Mar 2026 21:48:10 -0700 Subject: [PATCH 2/2] docs: document network IO tracking, bandwidth pressure, and group concurrency Update README, feature docs, quick-start, IO/backpressure guide, progress reporting, and inline doc comments to cover network-aware scheduling, NetworkPressure, and per-group concurrency limits. --- README.md | 6 +++-- docs/configuration.md | 3 +++ docs/features.md | 15 ++++++++--- docs/io-and-backpressure.md | 48 ++++++++++++++++++++++++++++----- docs/progress-reporting.md | 1 + docs/quick-start.md | 3 ++- src/backpressure.rs | 4 +++ src/lib.rs | 54 ++++++++++++++++++++++++++++++++++--- src/resource/mod.rs | 12 +++++---- src/scheduler/mod.rs | 8 +++--- src/task.rs | 18 +++++++++++-- 11 files changed, 145 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 4211fe9..ea82cc7 100644 --- a/README.md +++ b/README.md @@ -86,16 +86,18 @@ scheduler.run(token).await; - **SQLite persistence** — tasks survive restarts; crash recovery requeues interrupted work - **256-level priority queue** — with preemption of lower-priority tasks -- **IO-aware scheduling** — defers work when disk throughput is saturated +- **IO-aware scheduling** — defers work when disk or network throughput is saturated - **Key-based deduplication** — SHA-256 keys prevent duplicate submissions - **Composable backpressure** — plug in external pressure signals with custom throttle policies -- **Cross-platform resource monitoring** — CPU and disk IO via `sysinfo` (Linux, macOS, Windows) +- **Cross-platform resource monitoring** — CPU, disk IO, and network throughput via `sysinfo` (Linux, macOS, Windows) +- **Network bandwidth pressure** — built-in `NetworkPressure` source throttles tasks when bandwidth is saturated - **Retries** — automatic requeue of retryable failures with configurable limits - **Progress reporting** — executor-reported and throughput-extrapolated progress - **Lifecycle events** — broadcast events for UI integration (Tauri, etc.) - **Typed payloads** — serialize/deserialize structured task data - **Batch submission** — bulk enqueue in a single SQLite transaction - **Graceful shutdown** — configurable drain timeout before force-cancellation +- **Task group concurrency** — limit concurrent tasks per named group (e.g., per S3 bucket) - **Global pause/resume** — pause all work when the app is backgrounded - **Type-keyed application state** — register multiple state types, inject pre- or post-build - **Clone-friendly** — `Scheduler` is `Clone` via `Arc` for easy sharing diff --git a/docs/configuration.md b/docs/configuration.md index f5eb708..adf0d4a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -142,6 +142,9 @@ All `SchedulerBuilder` methods: | `with_resource_monitoring()` | Enable platform resource monitoring. | | `resource_sampler(sampler)` | Provide a custom `ResourceSampler`. | | `sampler_config(config)` | Configure sample interval and smoothing. | +| `bandwidth_limit(bytes_per_sec)` | Set a network bandwidth cap; registers a built-in `NetworkPressure` source. | +| `default_group_concurrency(n)` | Default concurrency limit for grouped tasks (0 = unlimited). | +| `group_concurrency(group, n)` | Per-group concurrency limit override. | | `app_state(state)` | Register a state type (multiple types can coexist). | | `app_state_arc(arc)` | Register a state type from a pre-existing `Arc`. | | `build()` | Build and return the `Scheduler`. | diff --git a/docs/features.md b/docs/features.md index 39af17b..9458b02 100644 --- a/docs/features.md +++ b/docs/features.md @@ -15,6 +15,13 @@ A complete list of taskmill's capabilities. - **Atomic dispatch** — pop operations use `UPDATE ... WHERE id = (SELECT ...) RETURNING *` for race-free claiming with no lost tasks. - **Runtime-adjustable concurrency** — change `max_concurrency` at runtime via `set_max_concurrency()`. +## Task Groups + +- **Per-group concurrency** — assign tasks to named groups via `.group(key)` on `TaskSubmission` or `TypedTask::group_key()`. The scheduler limits how many tasks in the same group can run concurrently. +- **Configurable limits** — set per-group limits at build time with `.group_concurrency(group, limit)` or a default for all groups with `.default_group_concurrency(limit)`. +- **Runtime-adjustable** — change limits at runtime via `set_group_limit()`, `remove_group_limit()`, and `set_default_group_concurrency()`. +- **Independent of global concurrency** — group limits are checked *in addition to* `max_concurrency`. A task must pass both the global and group gate to be dispatched. + ## Deduplication - **Key-based dedup** — each task gets a SHA-256 key derived from `task_type + payload` (or an explicit key). A `UNIQUE(key)` constraint with `INSERT OR IGNORE` prevents duplicate submissions. @@ -25,15 +32,17 @@ A complete list of taskmill's capabilities. ## IO Awareness - **Expected/actual IO tracking** — submit estimated read/write bytes; executors report actual bytes on completion. +- **Network IO tracking** — tasks can declare expected network RX/TX bytes via `expected_net_io()` and report actuals via `ctx.record_net_rx_bytes()` / `ctx.record_net_tx_bytes()`. - **IO budget gating** — the scheduler compares running task IO estimates against EWMA-smoothed system throughput. New work is deferred when cumulative IO would exceed 80% of observed disk capacity. - **Learning from history** — `avg_throughput()` and `history_stats()` compute per-type IO averages from actual completions, enabling callers to refine estimates over time. ## Resource Monitoring -- **Cross-platform** — CPU and disk IO via `sysinfo` on Linux, macOS, and Windows. Feature-gated under `sysinfo-monitor` (enabled by default). +- **Cross-platform** — CPU, disk IO, and network throughput via `sysinfo` on Linux, macOS, and Windows. Feature-gated under `sysinfo-monitor` (enabled by default). - **EWMA smoothing** — raw samples are smoothed with an exponentially weighted moving average (alpha=0.3, configurable) to avoid spiky readings. - **Two-trait design** — `ResourceSampler` (raw platform readings) and `ResourceReader` (smoothed snapshots) are separated for testability and custom implementations. - **Custom samplers** — disable the `sysinfo-monitor` feature and provide your own `ResourceSampler` for containers, cgroups, or mobile platforms. +- **Network bandwidth pressure** — built-in `NetworkPressure` source maps observed RX+TX throughput against a configurable bandwidth cap to backpressure. Enable via `.bandwidth_limit(bytes_per_sec)` on the builder. ## Backpressure @@ -61,7 +70,7 @@ A complete list of taskmill's capabilities. ## Lifecycle Events -- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Paused`, `Resumed`. +- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Waiting`, `Paused`, `Resumed`. - **Tauri-ready** — all events are `Serialize`, designed for direct bridging to frontend via `app_handle.emit()`. ## Task Management @@ -72,7 +81,7 @@ A complete list of taskmill's capabilities. ## Typed Payloads -- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization. +- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization. Use `.label("display name")` to set a human-readable display label independent of the dedup key. - **Type-safe deserialization** — `ctx.payload::()?` in executors for zero-boilerplate extraction. - **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::()`. diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index 6c12bd6..5dc3d7b 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -25,6 +25,26 @@ ctx.record_write_bytes(9_876); Actual values are stored in `task_history` for learning. +### Network IO + +Tasks that perform network transfers can declare expected bandwidth: + +```rust +let sub = TaskSubmission::new("upload") + .payload_json(&upload_payload)? + .expected_io(0, 0) // no disk IO + .expected_net_io(0, 50_000_000); // 50 MB upload +``` + +Executors report actual network bytes during execution: + +```rust +ctx.record_net_rx_bytes(response_size); +ctx.record_net_tx_bytes(uploaded_bytes); +``` + +Network actuals are stored in `task_history` alongside disk IO. + ### IO budget gating When resource monitoring is enabled, the scheduler checks IO headroom before dispatching: @@ -54,7 +74,7 @@ let stats = store.history_stats("scan").await?; ### Built-in platform sampler -Enabled by default via the `sysinfo-monitor` feature flag. Provides CPU and disk IO on Linux, macOS, and Windows. +Enabled by default via the `sysinfo-monitor` feature flag. Provides CPU, disk IO, and network throughput on Linux, macOS, and Windows. ```rust let scheduler = Scheduler::builder() @@ -75,9 +95,11 @@ struct CgroupSampler; impl ResourceSampler for CgroupSampler { fn sample(&mut self) -> ResourceSnapshot { ResourceSnapshot { - cpu_usage: read_cgroup_cpu(), // 0.0–1.0 + cpu_usage: read_cgroup_cpu(), // 0.0–1.0 io_read_bytes_per_sec: read_blkio_read(), io_write_bytes_per_sec: read_blkio_write(), + net_rx_bytes_per_sec: read_net_rx(), + net_tx_bytes_per_sec: read_net_tx(), } } } @@ -146,8 +168,8 @@ Multiple sources are aggregated via `CompositePressure`. The aggregate pressure use taskmill::CompositePressure; let mut pressure = CompositePressure::new(); -pressure.add_source(Arc::new(MemoryPressure)); -pressure.add_source(Arc::new(QueueDepthPressure)); +pressure.add_source(Box::new(MemoryPressure)); +pressure.add_source(Box::new(QueueDepthPressure)); // Aggregate = max(memory_pressure, queue_pressure) ``` @@ -155,8 +177,8 @@ Or via the builder: ```rust let scheduler = Scheduler::builder() - .pressure_source(Arc::new(MemoryPressure)) - .pressure_source(Arc::new(QueueDepthPressure)) + .pressure_source(Box::new(MemoryPressure)) + .pressure_source(Box::new(QueueDepthPressure)) .build() .await?; ``` @@ -188,6 +210,20 @@ The default `DispatchGate` combines both mechanisms. A task is dispatched only w If either check fails, the task stays in the queue and is retried on the next poll cycle. +### Network bandwidth pressure + +Taskmill includes a built-in `NetworkPressure` source that maps observed network throughput against a configurable bandwidth cap. Enable it via the builder: + +```rust +let scheduler = Scheduler::builder() + .with_resource_monitoring() + .bandwidth_limit(100_000_000.0) // 100 MB/s combined RX+TX cap + .build() + .await?; +``` + +When observed RX+TX throughput approaches the cap, the pressure rises toward 1.0 and the `ThrottlePolicy` starts throttling lower-priority tasks. This is especially useful for upload/download workloads where you want to avoid saturating a network link. + ### Diagnostics The `SchedulerSnapshot` includes pressure readings for debugging: diff --git a/docs/progress-reporting.md b/docs/progress-reporting.md index fc3ade3..f06fa7b 100644 --- a/docs/progress-reporting.md +++ b/docs/progress-reporting.md @@ -114,6 +114,7 @@ All scheduler state changes are broadcast as `SchedulerEvent` variants: | `Preempted { task_id, task_type, key, label }` | Task paused for higher-priority work | | `Cancelled { task_id, task_type, key, label }` | Task cancelled via `scheduler.cancel()` | | `Progress { task_id, task_type, key, label, percent, message }` | Progress update from executor | +| `Waiting { task_id, children_count }` | Parent task entered waiting state after spawning children | | `Paused` | Scheduler globally paused via `pause_all()` | | `Resumed` | Scheduler resumed via `resume_all()` | diff --git a/docs/quick-start.md b/docs/quick-start.md index d9c7492..a594d24 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -153,8 +153,9 @@ For full control over components, use `Scheduler::new()` directly: use std::sync::Arc; use taskmill::{ CompositePressure, Scheduler, SchedulerConfig, - TaskStore, TaskTypeRegistry, ThrottlePolicy, + TaskStore, ThrottlePolicy, }; +use taskmill::registry::TaskTypeRegistry; let store = TaskStore::open("tasks.db").await.unwrap(); diff --git a/src/backpressure.rs b/src/backpressure.rs index 9f6e7c2..c856385 100644 --- a/src/backpressure.rs +++ b/src/backpressure.rs @@ -7,6 +7,10 @@ //! [`ThrottlePolicy`] maps the aggregate pressure to per-[`Priority`] //! throttle decisions. Customize the policy with //! [`SchedulerBuilder::throttle_policy`](crate::SchedulerBuilder::throttle_policy). +//! +//! A built-in [`NetworkPressure`](crate::NetworkPressure) source is available for +//! throttling based on network bandwidth — enable it via +//! [`SchedulerBuilder::bandwidth_limit`](crate::SchedulerBuilder::bandwidth_limit). use crate::priority::Priority; diff --git a/src/lib.rs b/src/lib.rs index d7ed04a..7e20df3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,8 +6,8 @@ //! - Persists tasks to SQLite so the queue survives restarts //! - Schedules by priority (0 = highest, 255 = lowest) with [named tiers](Priority) //! - Deduplicates tasks by key — submitting an already-queued key is a no-op -//! - Tracks expected and actual IO bytes per task for budget-based scheduling -//! - Monitors system CPU and disk throughput to adjust concurrency +//! - Tracks expected and actual IO bytes (disk and network) per task for budget-based scheduling +//! - Monitors system CPU, disk, and network throughput to adjust concurrency //! - Supports [composable backpressure](PressureSource) from arbitrary external sources //! - Preempts lower-priority work when high-priority tasks arrive //! - [Retries](TaskError::retryable) failed tasks at the same priority level @@ -69,6 +69,27 @@ //! which feeds back into historical throughput averages for future scheduling //! decisions. //! +//! ### Network IO +//! +//! Tasks can also declare expected network IO via +//! [`TaskSubmission::expected_net_io`] (or [`TypedTask::expected_net_rx_bytes`] / +//! [`expected_net_tx_bytes`](TypedTask::expected_net_tx_bytes)). Executors report +//! actual network bytes via [`TaskContext::record_net_rx_bytes`] / +//! [`record_net_tx_bytes`](TaskContext::record_net_tx_bytes). To throttle tasks +//! when network bandwidth is saturated, set a bandwidth cap with +//! [`SchedulerBuilder::bandwidth_limit`] — this registers a built-in +//! [`NetworkPressure`] source that maps observed throughput to backpressure. +//! +//! ## Task groups +//! +//! Tasks can be assigned to a named group via [`TaskSubmission::group`] (or +//! [`TypedTask::group_key`]). The scheduler enforces per-group concurrency +//! limits — for example, limiting uploads to any single S3 bucket to 4 +//! concurrent tasks. Configure limits at build time with +//! [`SchedulerBuilder::group_concurrency`] and +//! [`SchedulerBuilder::default_group_concurrency`], or adjust at runtime via +//! [`Scheduler::set_group_limit`] and [`Scheduler::set_default_group_concurrency`]. +//! //! ## Child tasks & two-phase execution //! //! An executor can spawn child tasks via [`TaskContext::spawn_child`]. When @@ -218,6 +239,31 @@ //! returns a serializable [`SchedulerSnapshot`] with queue depths, running //! tasks, progress estimates, and backpressure. //! +//! ## Group concurrency +//! +//! Limit concurrent tasks within a named group — for example, cap uploads +//! per S3 bucket: +//! +//! ```ignore +//! let scheduler = Scheduler::builder() +//! .store_path("tasks.db") +//! .executor("upload-part", Arc::new(UploadPartExecutor)) +//! .default_group_concurrency(4) // default for all groups +//! .group_concurrency("s3://hot-bucket", 8) // override for one group +//! .build() +//! .await?; +//! +//! // Tasks declare their group via the submission: +//! let sub = TaskSubmission::new("upload-part") +//! .group("s3://my-bucket") +//! .payload_json(&part)?; +//! scheduler.submit(&sub).await?; +//! +//! // Adjust at runtime: +//! scheduler.set_group_limit("s3://my-bucket", 2); +//! scheduler.remove_group_limit("s3://my-bucket"); // fall back to default +//! ``` +//! //! ## Child tasks //! //! Spawn child tasks from an executor to model fan-out work. The parent @@ -262,8 +308,8 @@ //! 3. Pending finalizers (parents whose children all completed) are //! dispatched first. //! 4. The highest-priority pending task is peeked (without claiming it). -//! 5. The dispatch gate checks concurrency limits, IO budget, and -//! backpressure. If the gate rejects, no slot is consumed. +//! 5. The dispatch gate checks concurrency limits, group concurrency, +//! IO budget, and backpressure. If the gate rejects, no slot is consumed. //! 6. If admitted, the task is atomically claimed (`peek` → `pop_by_id`) //! and spawned as a Tokio task. //! 7. Steps 4–6 repeat until the queue is empty or the gate rejects. diff --git a/src/resource/mod.rs b/src/resource/mod.rs index c459c43..9dddec4 100644 --- a/src/resource/mod.rs +++ b/src/resource/mod.rs @@ -1,14 +1,16 @@ //! System resource monitoring for IO-aware scheduling. //! -//! Implement [`ResourceSampler`] to feed CPU and disk IO metrics into the -//! scheduler, or use the built-in [`sysinfo_monitor`] module (enabled by the -//! `sysinfo-monitor` feature) for cross-platform monitoring. Enable via -//! [`SchedulerBuilder::with_resource_monitoring`](crate::SchedulerBuilder::with_resource_monitoring) +//! Implement [`ResourceSampler`] to feed CPU, disk IO, and network throughput +//! metrics into the scheduler, or use the built-in [`sysinfo_monitor`] module +//! (enabled by the `sysinfo-monitor` feature) for cross-platform monitoring. +//! Enable via [`SchedulerBuilder::with_resource_monitoring`](crate::SchedulerBuilder::with_resource_monitoring) //! or provide a custom sampler with //! [`SchedulerBuilder::resource_sampler`](crate::SchedulerBuilder::resource_sampler). //! //! The scheduler reads the latest EWMA-smoothed snapshot via [`ResourceReader`] -//! when making IO-budget dispatch decisions. +//! when making IO-budget dispatch decisions. For network-aware throttling, use +//! [`SchedulerBuilder::bandwidth_limit`](crate::SchedulerBuilder::bandwidth_limit) +//! to register a built-in [`NetworkPressure`](network_pressure::NetworkPressure) source. pub mod sampler; diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 3628ad0..b36e23f 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -1,10 +1,10 @@ //! The scheduler: configuration, event stream, and the main run loop. //! //! [`Scheduler`] coordinates task execution — popping from the -//! [`TaskStore`], applying [backpressure](crate::backpressure) -//! and IO-budget checks, preempting lower-priority work, and emitting -//! [`SchedulerEvent`]s for UI integration. Use [`SchedulerBuilder`] for -//! ergonomic construction. +//! [`TaskStore`], applying [backpressure](crate::backpressure), +//! IO-budget checks, and [group concurrency](crate::GroupLimits) limits, +//! preempting lower-priority work, and emitting [`SchedulerEvent`]s for UI +//! integration. Use [`SchedulerBuilder`] for ergonomic construction. //! //! See the [crate-level docs](crate) for a full walkthrough of the task //! lifecycle, common patterns, and how the dispatch loop works. diff --git a/src/task.rs b/src/task.rs index db36836..973ed86 100644 --- a/src/task.rs +++ b/src/task.rs @@ -102,7 +102,9 @@ pub struct TaskRecord { pub payload: Option>, pub expected_read_bytes: i64, pub expected_write_bytes: i64, + /// Estimated network receive bytes for IO budget scheduling. pub expected_net_rx_bytes: i64, + /// Estimated network transmit bytes for IO budget scheduling. pub expected_net_tx_bytes: i64, pub retry_count: i32, pub last_error: Option, @@ -148,11 +150,15 @@ pub struct TaskHistoryRecord { pub payload: Option>, pub expected_read_bytes: i64, pub expected_write_bytes: i64, + /// Estimated network receive bytes declared at submission. pub expected_net_rx_bytes: i64, + /// Estimated network transmit bytes declared at submission. pub expected_net_tx_bytes: i64, pub actual_read_bytes: Option, pub actual_write_bytes: Option, + /// Actual network receive bytes reported by the executor. pub actual_net_rx_bytes: Option, + /// Actual network transmit bytes reported by the executor. pub actual_net_tx_bytes: Option, pub retry_count: i32, pub last_error: Option, @@ -170,14 +176,20 @@ pub struct TaskHistoryRecord { /// Accumulated IO metrics captured by the scheduler after an executor finishes. /// -/// Executors report metrics incrementally via [`TaskContext::record_read_bytes`](crate::TaskContext::record_read_bytes) -/// and [`TaskContext::record_write_bytes`](crate::TaskContext::record_write_bytes). +/// Executors report metrics incrementally via [`TaskContext::record_read_bytes`](crate::TaskContext::record_read_bytes), +/// [`record_write_bytes`](crate::TaskContext::record_write_bytes), +/// [`record_net_rx_bytes`](crate::TaskContext::record_net_rx_bytes), and +/// [`record_net_tx_bytes`](crate::TaskContext::record_net_tx_bytes). /// This struct is the snapshot read by the scheduler — executors never construct it directly. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct TaskMetrics { + /// Actual disk bytes read during execution. pub read_bytes: i64, + /// Actual disk bytes written during execution. pub write_bytes: i64, + /// Actual network bytes received during execution. pub net_rx_bytes: i64, + /// Actual network bytes transmitted during execution. pub net_tx_bytes: i64, } @@ -325,7 +337,9 @@ pub struct TaskSubmission { pub payload: Option>, pub expected_read_bytes: i64, pub expected_write_bytes: i64, + /// Estimated network receive bytes for IO budget scheduling. pub expected_net_rx_bytes: i64, + /// Estimated network transmit bytes for IO budget scheduling. pub expected_net_tx_bytes: i64, /// Parent task ID for hierarchical tasks. Set automatically by /// [`TaskContext::spawn_child`](crate::TaskContext::spawn_child).