From 0ac514cff684c17829bfe6c90fa1d187d0c3fd1b Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 14 Mar 2026 10:02:21 -0700 Subject: [PATCH 1/3] refactor!: consolidate IO fields, event headers, and typed task API for 0.4.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKING CHANGES: - Replace `TaskMetrics` and 4 scattered IO fields with `IoBudget` struct across `TaskRecord`, `TaskHistoryRecord`, `TaskSubmission`, `TypedTask`, and `IoTracker::snapshot()` - Add `key()` and `label()` default methods to `TypedTask` trait, allowing typed tasks to control their dedup key and display label directly - Extract `TaskEventHeader` from `SchedulerEvent` variants — tuple variants (`Dispatched`, `Completed`, `Preempted`, `Cancelled`) now wrap it; struct variants (`Failed`, `Progress`) include it as `header`. Adds `header()` accessor on `SchedulerEvent` and `event_header()` on `TaskRecord`. `ProgressReporter` and `EstimatedProgress` also use `TaskEventHeader`. - `payload_json()` and `from_typed()` now return `Self` instead of `Result`, deferring serialization errors to submit time via `payload_error` field Updates all documentation to match the new API. --- README.md | 5 +- docs/ARCHITECTURE.md | 36 ++++++---- docs/features.md | 10 +-- docs/io-and-backpressure.md | 11 ++-- docs/persistence-and-recovery.md | 12 ++-- docs/progress-reporting.md | 32 +++++---- docs/query-apis.md | 2 +- docs/quick-start.md | 13 ++-- src/lib.rs | 52 +++++++-------- src/registry/io_tracker.rs | 12 ++-- src/scheduler/dispatch.rs | 63 ++++-------------- src/scheduler/event.rs | 67 ++++++++++--------- src/scheduler/gate.rs | 18 ++--- src/scheduler/mod.rs | 2 +- src/scheduler/progress.rs | 34 +++------- src/scheduler/run_loop.rs | 6 +- src/scheduler/submit.rs | 18 ++--- src/scheduler/tests.rs | 20 +++--- src/store/hierarchy.rs | 14 ++-- src/store/lifecycle.rs | 72 ++++++++++---------- src/store/mod.rs | 6 +- src/store/query.rs | 22 ++----- src/store/row_mapping.rs | 39 +++++++---- src/store/submit.rs | 20 +++--- src/task/mod.rs | 84 ++++++++++++++---------- src/task/submission.rs | 109 ++++++++++++------------------- src/task/tests.rs | 74 ++++++++++++++------- src/task/typed.rs | 38 +++++------ tests/integration.rs | 25 +++---- 29 files changed, 438 insertions(+), 478 deletions(-) diff --git a/README.md b/README.md index ea82cc7..eea65d5 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ throughput. use std::sync::Arc; use tokio_util::sync::CancellationToken; use taskmill::{ - Scheduler, Priority, TaskSubmission, TaskExecutor, + Scheduler, Priority, IoBudget, TaskSubmission, TaskExecutor, TaskContext, TaskError, }; @@ -44,8 +44,7 @@ async fn main() { let sub = TaskSubmission::new("thumbnail") .payload_json(&serde_json::json!({"path": "/photos/img.jpg"})) - .unwrap() - .expected_io(4096, 1024); + .expected_io(IoBudget::disk(4096, 1024)); scheduler.submit(&sub).await.unwrap(); let token = CancellationToken::new(); diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 035882d..0df657a 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -72,8 +72,10 @@ flowchart TD | `priority` | `INTEGER NOT NULL` — 0 (highest) to 255 (lowest) | | `status` | `TEXT` — `pending`, `running`, or `paused` | | `payload` | `BLOB` — opaque, max 1 MiB, executor-defined | -| `expected_read_bytes` | Caller's IO estimate for scheduling decisions | -| `expected_write_bytes`| Caller's IO estimate for scheduling decisions | +| `expected_read_bytes` | Caller's disk read IO estimate (part of `IoBudget`)| +| `expected_write_bytes`| Caller's disk write IO estimate (part of `IoBudget`)| +| `expected_net_rx_bytes`| Caller's network RX estimate (part of `IoBudget`) | +| `expected_net_tx_bytes`| Caller's network TX estimate (part of `IoBudget`) | | `retry_count` | Incremented on each retryable failure | | `last_error` | Most recent error message (for diagnostics) | | `started_at` | Set when popped; cleared on pause | @@ -90,8 +92,10 @@ insert into `task_history` in one transaction). Additional columns: | Column | Purpose | |-----------------------|----------------------------------------------------| -| `actual_read_bytes` | Reported by executor on completion | -| `actual_write_bytes` | Reported by executor on completion | +| `actual_read_bytes` | Reported by executor on completion (part of `IoBudget`) | +| `actual_write_bytes` | Reported by executor on completion (part of `IoBudget`) | +| `actual_net_rx_bytes` | Reported by executor on completion (part of `IoBudget`) | +| `actual_net_tx_bytes` | Reported by executor on completion (part of `IoBudget`) | | `completed_at` | Timestamp of completion or failure | | `duration_ms` | Computed from `started_at` to `completed_at` | @@ -253,10 +257,10 @@ knowing the concrete gate type. ### Expected vs actual IO -Callers provide `expected_read_bytes` / `expected_write_bytes` on submission. -Executors report `actual_read_bytes` / `actual_write_bytes` on completion. The -history table stores both, enabling learning via `avg_throughput()` and -`history_stats()`. +Callers provide an `IoBudget` (disk read/write, network rx/tx) on submission. +Executors report actual IO via context methods on completion. The history table +stores both expected and actual values, enabling learning via `avg_throughput()` +and `history_stats()`. ### IO budget heuristic @@ -350,15 +354,19 @@ Executor returns Err(TaskError) | Event | When | |-------------|----------------------------------------------| -| `Dispatched`| Task popped and executor spawned | -| `Completed` | Task finished successfully | -| `Failed` | Task failed (includes `will_retry` flag) | -| `Preempted` | Task paused for higher-priority work | -| `Cancelled` | Task cancelled via `cancel()` | -| `Progress` | Executor reported progress (0.0–1.0) | +| `Dispatched(TaskEventHeader)` | Task popped and executor spawned | +| `Completed(TaskEventHeader)` | Task finished successfully | +| `Failed { header, error, will_retry }` | Task failed (includes retry flag) | +| `Preempted(TaskEventHeader)` | Task paused for higher-priority work | +| `Cancelled(TaskEventHeader)` | Task cancelled via `cancel()` | +| `Progress { header, percent, message }` | Progress update from executor | +| `Waiting { task_id, children_count }` | Parent entered waiting state | | `Paused` | Scheduler globally paused | | `Resumed` | Scheduler globally resumed | +Task-specific variants share a `TaskEventHeader` containing `task_id`, +`task_type`, `key`, and `label`. Use `event.header()` to access it generically. + All variants derive `Serialize`/`Deserialize`. ## Progress reporting diff --git a/docs/features.md b/docs/features.md index 9458b02..587921d 100644 --- a/docs/features.md +++ b/docs/features.md @@ -31,8 +31,8 @@ A complete list of taskmill's capabilities. ## IO Awareness -- **Expected/actual IO tracking** — submit estimated read/write bytes; executors report actual bytes on completion. -- **Network IO tracking** — tasks can declare expected network RX/TX bytes via `expected_net_io()` and report actuals via `ctx.record_net_rx_bytes()` / `ctx.record_net_tx_bytes()`. +- **Expected/actual IO tracking** — submit an `IoBudget` (disk read/write, network rx/tx); executors report actual bytes on completion. +- **Network IO tracking** — tasks can declare expected network RX/TX bytes via `IoBudget::net()` and report actuals via `ctx.record_net_rx_bytes()` / `ctx.record_net_tx_bytes()`. - **IO budget gating** — the scheduler compares running task IO estimates against EWMA-smoothed system throughput. New work is deferred when cumulative IO would exceed 80% of observed disk capacity. - **Learning from history** — `avg_throughput()` and `history_stats()` compute per-type IO averages from actual completions, enabling callers to refine estimates over time. @@ -70,7 +70,7 @@ A complete list of taskmill's capabilities. ## Lifecycle Events -- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Waiting`, `Paused`, `Resumed`. +- **Broadcast channel** — subscribe via `scheduler.subscribe()` to receive `SchedulerEvent` variants: `Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Progress`, `Waiting`, `Paused`, `Resumed`. Task-specific variants share a `TaskEventHeader` with `task_id`, `task_type`, `key`, and `label`. - **Tauri-ready** — all events are `Serialize`, designed for direct bridging to frontend via `app_handle.emit()`. ## Task Management @@ -81,9 +81,9 @@ A complete list of taskmill's capabilities. ## Typed Payloads -- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization. Use `.label("display name")` to set a human-readable display label independent of the dedup key. +- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data).expected_io(budget)` for ergonomic construction with serialization. Serialization errors are deferred to submit time. Use `.label("display name")` to set a human-readable display label independent of the dedup key. - **Type-safe deserialization** — `ctx.payload::()?` in executors for zero-boilerplate extraction. -- **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::()`. +- **TypedTask trait** — define `TASK_TYPE`, default priority, expected IO, key, and label on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::()`. ## Child Tasks diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index 5dc3d7b..fdf07b9 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -9,9 +9,11 @@ Taskmill combines two independent gating mechanisms — IO budget tracking and c Every `TaskSubmission` includes expected IO: ```rust +use taskmill::IoBudget; + let sub = TaskSubmission::new("scan") .payload_raw(data) - .expected_io(50_000, 10_000); // caller's estimate + .expected_io(IoBudget::disk(50_000, 10_000)); // caller's estimate ``` ### Completion actuals @@ -30,10 +32,11 @@ Actual values are stored in `task_history` for learning. Tasks that perform network transfers can declare expected bandwidth: ```rust +use taskmill::IoBudget; + let sub = TaskSubmission::new("upload") - .payload_json(&upload_payload)? - .expected_io(0, 0) // no disk IO - .expected_net_io(0, 50_000_000); // 50 MB upload + .payload_json(&upload_payload) + .expected_io(IoBudget::net(0, 50_000_000)); // 50 MB upload ``` Executors report actual network bytes during execution: diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index c493c56..9b2b398 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -19,8 +19,10 @@ Holds pending, running, and paused tasks. | `priority` | INTEGER NOT NULL | 0–255 (lower = higher priority) | | `status` | TEXT DEFAULT 'pending' | `pending`, `running`, `paused`, or `waiting` | | `payload` | BLOB | Opaque task data (max 1 MiB) | -| `expected_read_bytes` | INTEGER | Estimated read IO | -| `expected_write_bytes` | INTEGER | Estimated write IO | +| `expected_read_bytes` | INTEGER | Estimated disk read IO (part of `IoBudget`) | +| `expected_write_bytes` | INTEGER | Estimated disk write IO (part of `IoBudget`) | +| `expected_net_rx_bytes` | INTEGER | Estimated network RX (part of `IoBudget`) | +| `expected_net_tx_bytes` | INTEGER | Estimated network TX (part of `IoBudget`) | | `parent_id` | INTEGER | Parent task ID for child tasks (NULL for top-level) | | `fail_fast` | INTEGER DEFAULT 1 | Whether child failure cancels siblings and fails parent | | `retry_count` | INTEGER DEFAULT 0 | Number of retries so far | @@ -35,8 +37,10 @@ Holds pending, running, and paused tasks. | Column | Type | Description | |--------|------|-------------| | *(all columns from `tasks`, including `label`, `parent_id`, `fail_fast`)* | | | -| `actual_read_bytes` | INTEGER | Reported by executor | -| `actual_write_bytes` | INTEGER | Reported by executor | +| `actual_read_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | +| `actual_write_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | +| `actual_net_rx_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | +| `actual_net_tx_bytes` | INTEGER | Reported by executor (part of `IoBudget`) | | `completed_at` | TEXT | ISO 8601 timestamp | | `duration_ms` | INTEGER | Wall-clock duration | | `status` | TEXT | `completed` or `failed` | diff --git a/docs/progress-reporting.md b/docs/progress-reporting.md index f06fa7b..54fdb61 100644 --- a/docs/progress-reporting.md +++ b/docs/progress-reporting.md @@ -43,10 +43,12 @@ Every `report()` call emits a `SchedulerEvent::Progress`: ```rust SchedulerEvent::Progress { - task_id: 42, - task_type: "resize".into(), - key: "abc123".into(), - label: "my-image.jpg".into(), + header: TaskEventHeader { + task_id: 42, + task_type: "resize".into(), + key: "abc123".into(), + label: "my-image.jpg".into(), + }, percent: 0.5, message: Some("resizing".into()), } @@ -58,8 +60,8 @@ Subscribe to events for real-time UI updates: let mut events = scheduler.subscribe(); tokio::spawn(async move { while let Ok(event) = events.recv().await { - if let SchedulerEvent::Progress { task_id, percent, message, .. } = event { - update_ui(task_id, percent, message); + if let SchedulerEvent::Progress { header, percent, message } = event { + update_ui(header.task_id, percent, message); } } }); @@ -84,7 +86,7 @@ This means even tasks with no explicit progress reporting show movement in UI da ```rust let progress = scheduler.estimated_progress().await; for p in &progress { - println!("{} ({}): {:.0}%", p.task_type, p.key, p.percent * 100.0); + println!("{} ({}): {:.0}%", p.header.task_type, p.header.key, p.percent * 100.0); // p.reported_percent — last executor-reported value (if any) // p.extrapolated_percent — throughput-based estimate (if any) // p.percent — best available: reported if present, else extrapolated @@ -98,7 +100,7 @@ The `SchedulerSnapshot` includes progress for all running tasks: ```rust let snap = scheduler.snapshot().await?; for p in &snap.progress { - println!("{}: {:.0}%", p.key, p.percent * 100.0); + println!("{}: {:.0}%", p.header.key, p.percent * 100.0); } ``` @@ -108,16 +110,18 @@ All scheduler state changes are broadcast as `SchedulerEvent` variants: | Event | When | |-------|------| -| `Dispatched { task_id, task_type, key, label }` | Task popped from queue and executor spawned | -| `Completed { task_id, task_type, key, label }` | Task finished successfully | -| `Failed { task_id, task_type, key, label, error, will_retry }` | Task failed (includes whether it will be retried) | -| `Preempted { task_id, task_type, key, label }` | Task paused for higher-priority work | -| `Cancelled { task_id, task_type, key, label }` | Task cancelled via `scheduler.cancel()` | -| `Progress { task_id, task_type, key, label, percent, message }` | Progress update from executor | +| `Dispatched(TaskEventHeader)` | Task popped from queue and executor spawned | +| `Completed(TaskEventHeader)` | Task finished successfully | +| `Failed { header, error, will_retry }` | Task failed (includes whether it will be retried) | +| `Preempted(TaskEventHeader)` | Task paused for higher-priority work | +| `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` | +| `Progress { header, percent, message }` | Progress update from executor | | `Waiting { task_id, children_count }` | Parent task entered waiting state after spawning children | | `Paused` | Scheduler globally paused via `pause_all()` | | `Resumed` | Scheduler resumed via `resume_all()` | +Task-specific variants share a `TaskEventHeader` struct with `task_id`, `task_type`, `key`, and `label`. Use `event.header()` to access it generically. + ### Tauri bridge Bridge events to the frontend in a Tauri app: diff --git a/docs/query-apis.md b/docs/query-apis.md index f0acaee..878dc9c 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -15,7 +15,7 @@ All queries are available on `TaskStore`, accessed via `scheduler.store()`. | `paused_count()` | `i64` | Count of paused tasks. | | `task_by_id(id)` | `Option` | Look up an active task by row ID. | | `task_by_key(key)` | `Option` | Look up an active task by dedup key. | -| `running_io_totals()` | `(i64, i64)` | Sum of `(expected_read_bytes, expected_write_bytes)` across running tasks. | +| `running_io_totals()` | `(i64, i64)` | Sum of `(expected_io.disk_read, expected_io.disk_write)` across running tasks. | ## History queries diff --git a/docs/quick-start.md b/docs/quick-start.md index a594d24..1fc7b59 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -63,7 +63,7 @@ impl TaskExecutor for ImageResizer { use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; -use taskmill::{Scheduler, Priority, TaskSubmission, ShutdownMode}; +use taskmill::{Scheduler, Priority, IoBudget, TaskSubmission, ShutdownMode}; #[tokio::main] async fn main() { @@ -92,8 +92,7 @@ async fn main() { // Submit a single task with a typed payload. let sub = TaskSubmission::new("resize") .payload_json(&serde_json::json!({"path": "/photos/image.jpg", "width": 300})) - .unwrap() - .expected_io(4096, 1024); + .expected_io(IoBudget::disk(4096, 1024)); scheduler.submit(&sub).await.unwrap(); // Submit tasks in bulk (single SQLite transaction). @@ -101,8 +100,7 @@ async fn main() { let batch: Vec<_> = paths.iter().map(|p| { TaskSubmission::new("resize") .payload_json(&serde_json::json!({"path": p})) - .unwrap() - .expected_io(4096, 1024) + .expected_io(IoBudget::disk(4096, 1024)) }).collect(); let outcomes = scheduler.submit_batch(&batch).await.unwrap(); // Each outcome is Inserted, Upgraded, Requeued, or Duplicate. @@ -119,7 +117,7 @@ For stronger type safety, implement the `TypedTask` trait: ```rust use serde::{Serialize, Deserialize}; -use taskmill::{TypedTask, Priority}; +use taskmill::{TypedTask, IoBudget, Priority}; #[derive(Serialize, Deserialize)] struct ResizeTask { @@ -130,8 +128,7 @@ struct ResizeTask { impl TypedTask for ResizeTask { const TASK_TYPE: &'static str = "resize"; - fn expected_read_bytes(&self) -> i64 { 4096 } - fn expected_write_bytes(&self) -> i64 { 1024 } + fn expected_io(&self) -> IoBudget { IoBudget::disk(4096, 1024) } fn priority(&self) -> Priority { Priority::NORMAL } } diff --git a/src/lib.rs b/src/lib.rs index 7e20df3..39c6fd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,24 +60,20 @@ //! //! ## IO budgeting //! -//! Each task declares expected read/write bytes (via [`TypedTask`] or -//! [`TaskSubmission`] fields). The scheduler tracks running IO totals and, -//! when [resource monitoring](SchedulerBuilder::with_resource_monitoring) is -//! enabled, compares them against observed system disk throughput to avoid -//! over-saturating the disk. Executors report actual IO via -//! [`TaskContext::record_read_bytes`] / [`record_write_bytes`](TaskContext::record_write_bytes), +//! Each task declares an [`IoBudget`] covering expected disk and network +//! bytes (via [`TypedTask::expected_io`] or [`TaskSubmission::expected_io`]). +//! The scheduler tracks running IO totals and, when +//! [resource monitoring](SchedulerBuilder::with_resource_monitoring) is enabled, +//! compares them against observed system throughput to avoid over-saturating +//! the disk or network. Executors report actual IO via +//! [`TaskContext::record_read_bytes`] / [`record_write_bytes`](TaskContext::record_write_bytes) / +//! [`record_net_rx_bytes`](TaskContext::record_net_rx_bytes) / +//! [`record_net_tx_bytes`](TaskContext::record_net_tx_bytes), //! which feeds back into historical throughput averages for future scheduling //! decisions. //! -//! ### Network IO -//! -//! Tasks can also declare expected network IO via -//! [`TaskSubmission::expected_net_io`] (or [`TypedTask::expected_net_rx_bytes`] / -//! [`expected_net_tx_bytes`](TypedTask::expected_net_tx_bytes)). Executors report -//! actual network bytes via [`TaskContext::record_net_rx_bytes`] / -//! [`record_net_tx_bytes`](TaskContext::record_net_tx_bytes). To throttle tasks -//! when network bandwidth is saturated, set a bandwidth cap with -//! [`SchedulerBuilder::bandwidth_limit`] — this registers a built-in +//! To throttle tasks when network bandwidth is saturated, set a bandwidth +//! cap with [`SchedulerBuilder::bandwidth_limit`] — this registers a built-in //! [`NetworkPressure`] source that maps observed throughput to backpressure. //! //! ## Task groups @@ -106,7 +102,7 @@ //! use std::sync::Arc; //! use taskmill::{ //! Scheduler, TaskExecutor, TaskContext, TaskError, -//! TypedTask, Priority, +//! TypedTask, IoBudget, Priority, //! }; //! use serde::{Serialize, Deserialize}; //! use tokio_util::sync::CancellationToken; @@ -117,8 +113,7 @@ //! //! impl TypedTask for Thumbnail { //! const TASK_TYPE: &'static str = "thumbnail"; -//! fn expected_read_bytes(&self) -> i64 { 4_096 } -//! fn expected_write_bytes(&self) -> i64 { 1_024 } +//! fn expected_io(&self) -> IoBudget { IoBudget::disk(4_096, 1_024) } //! } //! //! // 2. Implement the executor. @@ -223,11 +218,11 @@ //! tokio::spawn(async move { //! while let Ok(event) = rx.recv().await { //! match event { -//! SchedulerEvent::Progress { task_id, percent, message, .. } => { -//! update_progress_bar(task_id, percent, message); +//! SchedulerEvent::Progress { header, percent, message } => { +//! update_progress_bar(header.task_id, percent, message); //! } -//! SchedulerEvent::Completed { task_id, .. } => { -//! mark_done(task_id); +//! SchedulerEvent::Completed(header) => { +//! mark_done(header.task_id); //! } //! _ => {} //! } @@ -256,7 +251,7 @@ //! // Tasks declare their group via the submission: //! let sub = TaskSubmission::new("upload-part") //! .group("s3://my-bucket") -//! .payload_json(&part)?; +//! .payload_json(&part); //! scheduler.submit(&sub).await?; //! //! // Adjust at runtime: @@ -279,8 +274,8 @@ //! TaskSubmission::new("upload-part") //! .key(&part.etag) //! .priority(ctx.record().priority) -//! .payload_json(part)? -//! .expected_io(part.size as i64, 0), +//! .payload_json(part) +//! .expected_io(IoBudget::disk(part.size as i64, 0)), //! ).await?; //! } //! Ok(()) @@ -339,13 +334,12 @@ pub use resource::sampler::SamplerConfig; pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot}; pub use scheduler::{ EstimatedProgress, GroupLimits, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig, - SchedulerEvent, SchedulerSnapshot, ShutdownMode, + SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ - generate_dedup_key, HistoryStatus, ParentResolution, SubmitOutcome, TaskError, - TaskHistoryRecord, TaskLookup, TaskMetrics, TaskRecord, TaskStatus, TaskSubmission, TypeStats, - TypedTask, + generate_dedup_key, HistoryStatus, IoBudget, ParentResolution, SubmitOutcome, TaskError, + TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, TaskSubmission, TypeStats, TypedTask, }; #[cfg(feature = "sysinfo-monitor")] diff --git a/src/registry/io_tracker.rs b/src/registry/io_tracker.rs index 52ca5bb..c7c8678 100644 --- a/src/registry/io_tracker.rs +++ b/src/registry/io_tracker.rs @@ -24,12 +24,12 @@ impl IoTracker { } } - pub fn snapshot(&self) -> crate::task::TaskMetrics { - crate::task::TaskMetrics { - read_bytes: self.read_bytes.load(Ordering::Relaxed), - write_bytes: self.write_bytes.load(Ordering::Relaxed), - net_rx_bytes: self.net_rx_bytes.load(Ordering::Relaxed), - net_tx_bytes: self.net_tx_bytes.load(Ordering::Relaxed), + pub fn snapshot(&self) -> crate::task::IoBudget { + crate::task::IoBudget { + disk_read: self.read_bytes.load(Ordering::Relaxed), + disk_write: self.write_bytes.load(Ordering::Relaxed), + net_rx: self.net_rx_bytes.load(Ordering::Relaxed), + net_tx: self.net_tx_bytes.load(Ordering::Relaxed), } } } diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index d78ba13..1125e6b 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken; use crate::priority::Priority; use crate::registry::{ChildSpawner, IoTracker, TaskContext}; use crate::store::TaskStore; -use crate::task::{ParentResolution, TaskMetrics, TaskRecord}; +use crate::task::{ParentResolution, IoBudget, TaskRecord}; use super::progress::ProgressReporter; use super::SchedulerEvent; @@ -140,12 +140,7 @@ impl ActiveTaskMap { ); at.token.cancel(); let _ = store.pause(id).await; - let _ = event_tx.send(SchedulerEvent::Preempted { - task_id: id, - task_type: at.record.task_type.clone(), - key: at.record.key.clone(), - label: at.record.label.clone(), - }); + let _ = event_tx.send(SchedulerEvent::Preempted(at.record.event_header())); preempted.push(id); } @@ -211,12 +206,7 @@ impl ActiveTaskMap { for (id, at) in drained { at.token.cancel(); let _ = store.pause(id).await; - let _ = event_tx.send(SchedulerEvent::Preempted { - task_id: id, - task_type: at.record.task_type.clone(), - key: at.record.key.clone(), - label: at.record.label.clone(), - }); + let _ = event_tx.send(SchedulerEvent::Preempted(at.record.event_header())); } count } @@ -282,10 +272,7 @@ pub(crate) async fn spawn_task( record: task.clone(), token: child_token.clone(), progress: ProgressReporter::new( - task.id, - task.task_type.clone(), - task.key.clone(), - task.label.clone(), + task.event_header(), event_tx.clone(), active.clone(), ), @@ -296,12 +283,7 @@ pub(crate) async fn spawn_task( }; // Emit dispatched event. - let _ = event_tx.send(SchedulerEvent::Dispatched { - task_id: task.id, - task_type: task.task_type.clone(), - key: task.key.clone(), - label: task.label.clone(), - }); + let _ = event_tx.send(SchedulerEvent::Dispatched(task.event_header())); // Spawn executor. let task_id_for_handle = task.id; @@ -365,12 +347,7 @@ pub(crate) async fn spawn_task( } // Remove from active tracking AFTER the store write completes. active.remove(task_id); - let _ = event_tx.send(SchedulerEvent::Completed { - task_id, - task_type: task.task_type.clone(), - key: task.key.clone(), - label: task.label.clone(), - }); + let _ = event_tx.send(SchedulerEvent::Completed(task.event_header())); work_notify.notify_one(); // If this was a child task, check if parent is ready. @@ -410,10 +387,7 @@ pub(crate) async fn spawn_task( // Remove from active tracking AFTER the store write completes. active.remove(task_id); let _ = event_tx.send(SchedulerEvent::Failed { - task_id, - task_type: task.task_type.clone(), - key: task.key.clone(), - label: task.label.clone(), + header: task.event_header(), error: te.message.clone(), will_retry, }); @@ -432,12 +406,9 @@ pub(crate) async fn spawn_task( if let Some(at) = active.remove(*rid) { at.token.cancel(); let _ = store.delete(*rid).await; - let _ = event_tx.send(SchedulerEvent::Cancelled { - task_id: *rid, - task_type: at.record.task_type.clone(), - key: at.record.key.clone(), - label: at.record.label.clone(), - }); + let _ = event_tx.send(SchedulerEvent::Cancelled( + at.record.event_header(), + )); } } } @@ -449,7 +420,7 @@ pub(crate) async fn spawn_task( &msg, false, 0, - &TaskMetrics::default(), + &IoBudget::default(), ) .await { @@ -460,10 +431,7 @@ pub(crate) async fn spawn_task( ); } let _ = event_tx.send(SchedulerEvent::Failed { - task_id: parent_id, - task_type: parent.task_type.clone(), - key: parent.key.clone(), - label: parent.label.clone(), + header: parent.event_header(), error: msg, will_retry: false, }); @@ -511,16 +479,13 @@ async fn handle_parent_resolution( // All children done but some failed — fail the parent. if let Ok(Some(parent)) = store.task_by_id(parent_id).await { if let Err(e) = store - .fail_with_record(&parent, &reason, false, 0, &TaskMetrics::default()) + .fail_with_record(&parent, &reason, false, 0, &IoBudget::default()) .await { tracing::error!(parent_id, error = %e, "failed to record parent failure"); } let _ = event_tx.send(SchedulerEvent::Failed { - task_id: parent_id, - task_type: parent.task_type.clone(), - key: parent.key.clone(), - label: parent.label.clone(), + header: parent.event_header(), error: reason, will_retry: false, }); diff --git a/src/scheduler/event.rs b/src/scheduler/event.rs index 0d20e35..635b40d 100644 --- a/src/scheduler/event.rs +++ b/src/scheduler/event.rs @@ -35,6 +35,21 @@ pub struct SchedulerSnapshot { pub is_paused: bool, } +// ── Task Event Header ──────────────────────────────────────────────── + +/// Common fields shared by task-specific [`SchedulerEvent`] variants. +/// +/// Extracted to avoid repeating `task_id`, `task_type`, `key`, `label` +/// in every variant. Use [`SchedulerEvent::header()`] to access it +/// generically. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskEventHeader { + pub task_id: i64, + pub task_type: String, + pub key: String, + pub label: String, +} + // ── Events ────────────────────────────────────────────────────────── /// Events emitted by the scheduler for UI integration and observability. @@ -45,48 +60,22 @@ pub struct SchedulerSnapshot { #[serde(tag = "type", content = "data")] pub enum SchedulerEvent { /// A task was dispatched and is now running. - Dispatched { - task_id: i64, - task_type: String, - key: String, - label: String, - }, + Dispatched(TaskEventHeader), /// A task completed successfully. - Completed { - task_id: i64, - task_type: String, - key: String, - label: String, - }, + Completed(TaskEventHeader), /// A task failed (may be retried or permanently failed). Failed { - task_id: i64, - task_type: String, - key: String, - label: String, + header: TaskEventHeader, error: String, will_retry: bool, }, /// A task was preempted by higher-priority work. - Preempted { - task_id: i64, - task_type: String, - key: String, - label: String, - }, + Preempted(TaskEventHeader), /// A task was cancelled by the application. - Cancelled { - task_id: i64, - task_type: String, - key: String, - label: String, - }, + Cancelled(TaskEventHeader), /// Progress update from a running task. Progress { - task_id: i64, - task_type: String, - key: String, - label: String, + header: TaskEventHeader, /// Progress percentage (0.0 to 1.0). percent: f32, /// Optional human-readable message from the executor. @@ -101,6 +90,20 @@ pub enum SchedulerEvent { Resumed, } +impl SchedulerEvent { + /// Returns the [`TaskEventHeader`] if this event is task-specific. + pub fn header(&self) -> Option<&TaskEventHeader> { + match self { + Self::Dispatched(h) + | Self::Completed(h) + | Self::Preempted(h) + | Self::Cancelled(h) => Some(h), + Self::Failed { header, .. } | Self::Progress { header, .. } => Some(header), + Self::Waiting { .. } | Self::Paused | Self::Resumed => None, + } + } +} + // ── Config ────────────────────────────────────────────────────────── /// How the scheduler behaves during shutdown. diff --git a/src/scheduler/gate.rs b/src/scheduler/gate.rs index 4855545..570f88b 100644 --- a/src/scheduler/gate.rs +++ b/src/scheduler/gate.rs @@ -127,8 +127,8 @@ impl DispatchGate for DefaultDispatchGate { if !has_io_headroom(task, ctx).await? { tracing::trace!( task_type = task.task_type, - expected_read = task.expected_read_bytes, - expected_write = task.expected_write_bytes, + expected_read = task.expected_io.disk_read, + expected_write = task.expected_io.disk_write, "task deferred — disk IO budget exhausted — requeuing" ); return Ok(false); @@ -138,8 +138,8 @@ impl DispatchGate for DefaultDispatchGate { if !has_net_io_headroom(task, ctx).await? { tracing::trace!( task_type = task.task_type, - expected_rx = task.expected_net_rx_bytes, - expected_tx = task.expected_net_tx_bytes, + expected_rx = task.expected_io.net_rx, + expected_tx = task.expected_io.net_tx, "task deferred — network IO budget exhausted — requeuing" ); return Ok(false); @@ -213,9 +213,9 @@ pub async fn has_io_headroom(task: &TaskRecord, ctx: &GateContext<'_>) -> Result let write_capacity = snapshot.io_write_bytes_per_sec * 2.0; let read_ok = read_capacity == 0.0 - || (running_read + task.expected_read_bytes) as f64 <= read_capacity * 0.8; + || (running_read + task.expected_io.disk_read) as f64 <= read_capacity * 0.8; let write_ok = write_capacity == 0.0 - || (running_write + task.expected_write_bytes) as f64 <= write_capacity * 0.8; + || (running_write + task.expected_io.disk_write) as f64 <= write_capacity * 0.8; Ok(read_ok && write_ok) } @@ -231,7 +231,7 @@ pub async fn has_net_io_headroom( ctx: &GateContext<'_>, ) -> Result { // If the task doesn't declare any network IO, always allow. - if task.expected_net_rx_bytes == 0 && task.expected_net_tx_bytes == 0 { + if task.expected_io.net_rx == 0 && task.expected_io.net_tx == 0 { return Ok(true); } @@ -250,9 +250,9 @@ pub async fn has_net_io_headroom( let tx_capacity = snapshot.net_tx_bytes_per_sec * 2.0; let rx_ok = - rx_capacity == 0.0 || (running_rx + task.expected_net_rx_bytes) as f64 <= rx_capacity * 0.8; + rx_capacity == 0.0 || (running_rx + task.expected_io.net_rx) as f64 <= rx_capacity * 0.8; let tx_ok = - tx_capacity == 0.0 || (running_tx + task.expected_net_tx_bytes) as f64 <= tx_capacity * 0.8; + tx_capacity == 0.0 || (running_tx + task.expected_io.net_tx) as f64 <= tx_capacity * 0.8; Ok(rx_ok && tx_ok) } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index f1b72ac..84686be 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -48,7 +48,7 @@ use crate::store::TaskStore; use dispatch::ActiveTaskMap; pub use builder::SchedulerBuilder; -pub use event::{SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode}; +pub use event::{SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader}; pub use gate::GroupLimits; pub use progress::{EstimatedProgress, ProgressReporter}; diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index b5a36bc..f0dbc81 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -13,6 +13,7 @@ use crate::store::TaskStore; use crate::task::TaskRecord; use super::dispatch::ActiveTaskMap; +use super::event::TaskEventHeader; use super::SchedulerEvent; // ── Progress Reporter ────────────────────────────────────────────── @@ -39,10 +40,7 @@ use super::SchedulerEvent; /// ``` #[derive(Clone)] pub struct ProgressReporter { - task_id: i64, - task_type: String, - key: String, - label: String, + header: TaskEventHeader, event_tx: tokio::sync::broadcast::Sender, /// Direct handle for updating internal progress tracking without a /// broadcast roundtrip. @@ -51,18 +49,12 @@ pub struct ProgressReporter { impl ProgressReporter { pub(crate) fn new( - task_id: i64, - task_type: String, - key: String, - label: String, + header: TaskEventHeader, event_tx: tokio::sync::broadcast::Sender, active: ActiveTaskMap, ) -> Self { Self { - task_id, - task_type, - key, - label, + header, event_tx, active, } @@ -72,13 +64,10 @@ impl ProgressReporter { pub fn report(&self, percent: f32, message: Option) { let clamped = percent.clamp(0.0, 1.0); // Update internal progress tracking directly (sync, no broadcast roundtrip). - self.active.update_progress(self.task_id, clamped); + self.active.update_progress(self.header.task_id, clamped); // Broadcast for external subscribers (UI / Tauri). let _ = self.event_tx.send(SchedulerEvent::Progress { - task_id: self.task_id, - task_type: self.task_type.clone(), - key: self.key.clone(), - label: self.label.clone(), + header: self.header.clone(), percent: clamped, message, }); @@ -101,10 +90,8 @@ impl ProgressReporter { /// with throughput-based extrapolation. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EstimatedProgress { - pub task_id: i64, - pub task_type: String, - pub key: String, - pub label: String, + /// Common task identification fields. + pub header: TaskEventHeader, /// Executor-reported progress (0.0 to 1.0), if available. pub reported_percent: Option, /// Throughput-extrapolated progress (0.0 to 1.0), if history data exists. @@ -162,10 +149,7 @@ pub(crate) async fn extrapolate( let percent = reported.or(extrapolated).unwrap_or(0.0); EstimatedProgress { - task_id: record.id, - task_type: record.task_type.clone(), - key: record.key.clone(), - label: record.label.clone(), + header: record.event_header(), reported_percent: reported, extrapolated_percent: extrapolated, percent, diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index 66149c6..90d3faa 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use tokio_util::sync::CancellationToken; use crate::store::StoreError; -use crate::task::TaskMetrics; +use crate::task::IoBudget; use super::dispatch::{self, SpawnContext}; use super::gate::GateContext; @@ -78,7 +78,7 @@ impl Scheduler { &format!("no executor registered for type '{}'", task.task_type), false, 0, - &TaskMetrics::default(), + &IoBudget::default(), ) .await?; return Ok(true); @@ -133,7 +133,7 @@ impl Scheduler { "no executor for finalize", false, 0, - &TaskMetrics::default(), + &IoBudget::default(), ) .await?; return Ok(true); diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index 7b7169a..36e1ade 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -74,7 +74,7 @@ impl Scheduler { /// /// Uses the priority from [`TypedTask::priority()`]. pub async fn submit_typed(&self, task: &T) -> Result { - let sub = TaskSubmission::from_typed(task)?; + let sub = TaskSubmission::from_typed(task); self.submit(&sub).await } @@ -87,7 +87,7 @@ impl Scheduler { task: &T, priority: Priority, ) -> Result { - let mut sub = TaskSubmission::from_typed(task)?; + let mut sub = TaskSubmission::from_typed(task); sub.priority = priority; self.submit(&sub).await } @@ -140,12 +140,7 @@ impl Scheduler { if let Some(at) = self.inner.active.remove(*child_id) { at.token.cancel(); let _ = self.inner.store.delete(*child_id).await; - let _ = self.inner.event_tx.send(SchedulerEvent::Cancelled { - task_id: *child_id, - task_type: at.record.task_type.clone(), - key: at.record.key.clone(), - label: at.record.label.clone(), - }); + let _ = self.inner.event_tx.send(SchedulerEvent::Cancelled(at.record.event_header())); } } @@ -153,12 +148,7 @@ impl Scheduler { if let Some(at) = self.inner.active.remove(task_id) { at.token.cancel(); self.inner.store.delete(task_id).await?; - let _ = self.inner.event_tx.send(SchedulerEvent::Cancelled { - task_id, - task_type: at.record.task_type.clone(), - key: at.record.key.clone(), - label: at.record.label.clone(), - }); + let _ = self.inner.event_tx.send(SchedulerEvent::Cancelled(at.record.event_header())); return Ok(true); } diff --git a/src/scheduler/tests.rs b/src/scheduler/tests.rs index 431e27b..feb92b6 100644 --- a/src/scheduler/tests.rs +++ b/src/scheduler/tests.rs @@ -195,13 +195,13 @@ async fn event_emitted_on_complete() { // Should get Dispatched event. let evt = rx.recv().await.unwrap(); - assert!(matches!(evt, SchedulerEvent::Dispatched { .. })); + assert!(matches!(evt, SchedulerEvent::Dispatched(..))); // Wait for completion. tokio::time::sleep(Duration::from_millis(50)).await; let evt = rx.recv().await.unwrap(); - assert!(matches!(evt, SchedulerEvent::Completed { .. })); + assert!(matches!(evt, SchedulerEvent::Completed(..))); } #[tokio::test] @@ -233,12 +233,8 @@ async fn submit_typed_enqueues_task() { impl crate::task::TypedTask for Thumb { const TASK_TYPE: &'static str = "test"; - fn expected_read_bytes(&self) -> i64 { - 4096 - } - - fn expected_write_bytes(&self) -> i64 { - 512 + fn expected_io(&self) -> crate::task::IoBudget { + crate::task::IoBudget::disk(4096, 512) } } @@ -258,8 +254,8 @@ async fn submit_typed_enqueues_task() { .unwrap() .expect("task should exist"); assert_eq!(record.task_type, "test"); - assert_eq!(record.expected_read_bytes, 4096); - assert_eq!(record.expected_write_bytes, 512); + assert_eq!(record.expected_io.disk_read, 4096); + assert_eq!(record.expected_io.disk_write, 512); // Payload round-trips. let recovered: Thumb = record.deserialize_payload().unwrap().unwrap(); @@ -580,7 +576,7 @@ async fn parent_auto_completes_after_children_finish() { let mut parent_completed = false; while tokio::time::Instant::now() < deadline { match tokio::time::timeout(Duration::from_millis(200), rx.recv()).await { - Ok(Ok(SchedulerEvent::Completed { task_type, .. })) if task_type == "parent" => { + Ok(Ok(SchedulerEvent::Completed(ref h))) if h.task_type == "parent" => { parent_completed = true; break; } @@ -641,7 +637,7 @@ async fn finalize_called_after_children_complete() { let deadline = tokio::time::Instant::now() + Duration::from_secs(5); while tokio::time::Instant::now() < deadline { match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { - Ok(Ok(SchedulerEvent::Completed { task_type, .. })) if task_type == "parent" => { + Ok(Ok(SchedulerEvent::Completed(ref h))) if h.task_type == "parent" => { break; } _ => {} diff --git a/src/store/hierarchy.rs b/src/store/hierarchy.rs index f6f38a1..e2f62c8 100644 --- a/src/store/hierarchy.rs +++ b/src/store/hierarchy.rs @@ -154,7 +154,7 @@ impl TaskStore { #[cfg(test)] mod tests { use crate::priority::Priority; - use crate::task::{ParentResolution, TaskMetrics, TaskStatus, TaskSubmission}; + use crate::task::{ParentResolution, IoBudget, TaskStatus, TaskSubmission}; use super::super::TaskStore; @@ -167,7 +167,7 @@ mod tests { .key(key) .priority(priority) .payload_raw(b"hello".to_vec()) - .expected_io(1000, 500) + .expected_io(IoBudget::disk(1000, 500)) } #[tokio::test] @@ -237,7 +237,7 @@ mod tests { store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); store - .complete(child.id, &TaskMetrics::default()) + .complete(child.id, &IoBudget::default()) .await .unwrap(); @@ -261,7 +261,7 @@ mod tests { } let child = store.pop_next().await.unwrap().unwrap(); store - .complete(child.id, &TaskMetrics::default()) + .complete(child.id, &IoBudget::default()) .await .unwrap(); @@ -283,7 +283,7 @@ mod tests { store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); store - .fail(child.id, "boom", false, 0, &TaskMetrics::default()) + .fail(child.id, "boom", false, 0, &IoBudget::default()) .await .unwrap(); @@ -333,7 +333,7 @@ mod tests { let child = store.pop_next().await.unwrap().unwrap(); store - .complete(child.id, &TaskMetrics::default()) + .complete(child.id, &IoBudget::default()) .await .unwrap(); @@ -354,7 +354,7 @@ mod tests { assert!(!task.fail_fast); store - .complete(task.id, &TaskMetrics::default()) + .complete(task.id, &IoBudget::default()) .await .unwrap(); diff --git a/src/store/lifecycle.rs b/src/store/lifecycle.rs index ad9e4be..9c5247a 100644 --- a/src/store/lifecycle.rs +++ b/src/store/lifecycle.rs @@ -1,6 +1,6 @@ //! Task lifecycle transitions: pop, complete, fail, pause, resume. -use crate::task::{TaskMetrics, TaskRecord}; +use crate::task::{IoBudget, TaskRecord}; use super::row_mapping::row_to_task_record; use super::{StoreError, TaskStore}; @@ -13,7 +13,7 @@ async fn insert_history( conn: &mut sqlx::pool::PoolConnection, task: &TaskRecord, status: &str, - metrics: &TaskMetrics, + metrics: &IoBudget, duration_ms: Option, last_error: Option<&str>, ) -> Result<(), StoreError> { @@ -36,14 +36,14 @@ async fn insert_history( .bind(task.priority.value() as i32) .bind(status) .bind(&task.payload) - .bind(task.expected_read_bytes) - .bind(task.expected_write_bytes) - .bind(task.expected_net_rx_bytes) - .bind(task.expected_net_tx_bytes) - .bind(metrics.read_bytes) - .bind(metrics.write_bytes) - .bind(metrics.net_rx_bytes) - .bind(metrics.net_tx_bytes) + .bind(task.expected_io.disk_read) + .bind(task.expected_io.disk_write) + .bind(task.expected_io.net_rx) + .bind(task.expected_io.net_tx) + .bind(metrics.disk_read) + .bind(metrics.disk_write) + .bind(metrics.net_rx) + .bind(metrics.net_tx) .bind(retry_count) .bind(last_error) .bind(task.created_at.format("%Y-%m-%d %H:%M:%S").to_string()) @@ -140,7 +140,7 @@ impl TaskStore { } /// Mark a task as completed and move it to history. - pub async fn complete(&self, id: i64, metrics: &TaskMetrics) -> Result<(), StoreError> { + pub async fn complete(&self, id: i64, metrics: &IoBudget) -> Result<(), StoreError> { tracing::debug!(task_id = id, "store.complete: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -171,7 +171,7 @@ impl TaskStore { pub async fn complete_with_record( &self, task: &TaskRecord, - metrics: &TaskMetrics, + metrics: &IoBudget, ) -> Result<(), StoreError> { tracing::debug!(task_id = task.id, "store.complete_with_record: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -191,7 +191,7 @@ impl TaskStore { async fn complete_inner( conn: &mut sqlx::pool::PoolConnection, task: &TaskRecord, - metrics: &TaskMetrics, + metrics: &IoBudget, ) -> Result<(), StoreError> { let duration_ms = compute_duration_ms(task); @@ -238,7 +238,7 @@ impl TaskStore { error: &str, retryable: bool, max_retries: i32, - metrics: &TaskMetrics, + metrics: &IoBudget, ) -> Result<(), StoreError> { tracing::debug!(task_id = id, "store.fail: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -271,7 +271,7 @@ impl TaskStore { error: &str, retryable: bool, max_retries: i32, - metrics: &TaskMetrics, + metrics: &IoBudget, ) -> Result<(), StoreError> { tracing::debug!(task_id = task.id, "store.fail_with_record: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -295,7 +295,7 @@ impl TaskStore { error: &str, retryable: bool, max_retries: i32, - metrics: &TaskMetrics, + metrics: &IoBudget, ) -> Result<(), StoreError> { if retryable && task.retry_count < max_retries { // Requeue with incremented retry count, same priority. @@ -345,7 +345,7 @@ impl TaskStore { #[cfg(test)] mod tests { use crate::priority::Priority; - use crate::task::{HistoryStatus, TaskMetrics, TaskStatus, TaskSubmission}; + use crate::task::{HistoryStatus, IoBudget, TaskStatus, TaskSubmission}; use super::super::TaskStore; @@ -358,7 +358,7 @@ mod tests { .key(key) .priority(priority) .payload_raw(b"hello".to_vec()) - .expected_io(1000, 500) + .expected_io(IoBudget::disk(1000, 500)) } #[tokio::test] @@ -398,11 +398,7 @@ mod tests { store .complete( task.id, - &TaskMetrics { - read_bytes: 2000, - write_bytes: 1000, - ..Default::default() - }, + &IoBudget::disk(2000, 1000), ) .await .unwrap(); @@ -412,7 +408,7 @@ mod tests { let hist = store.history_by_key(&key).await.unwrap(); assert_eq!(hist.len(), 1); assert_eq!(hist[0].status, HistoryStatus::Completed); - assert_eq!(hist[0].actual_read_bytes, Some(2000)); + assert_eq!(hist[0].actual_io.unwrap().disk_read, 2000); } #[tokio::test] @@ -424,7 +420,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); store - .fail(task.id, "transient error", true, 3, &TaskMetrics::default()) + .fail(task.id, "transient error", true, 3, &IoBudget::default()) .await .unwrap(); @@ -443,7 +439,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); store - .fail(task.id, "err1", true, 1, &TaskMetrics::default()) + .fail(task.id, "err1", true, 1, &IoBudget::default()) .await .unwrap(); let task = store.pop_next().await.unwrap().unwrap(); @@ -454,11 +450,7 @@ mod tests { "err2", true, 1, - &TaskMetrics { - read_bytes: 100, - write_bytes: 50, - ..Default::default() - }, + &IoBudget::disk(100, 50), ) .await .unwrap(); @@ -493,14 +485,18 @@ mod tests { async fn running_io_totals() { let store = test_store().await; - let mut sub = make_submission("io-1", Priority::NORMAL); - sub.expected_read_bytes = 5000; - sub.expected_write_bytes = 2000; + let sub = TaskSubmission::new("test") + .key("io-1") + .priority(Priority::NORMAL) + .payload_raw(b"hello".to_vec()) + .expected_io(IoBudget::disk(5000, 2000)); store.submit(&sub).await.unwrap(); - let mut sub2 = make_submission("io-2", Priority::NORMAL); - sub2.expected_read_bytes = 3000; - sub2.expected_write_bytes = 1000; + let sub2 = TaskSubmission::new("test") + .key("io-2") + .priority(Priority::NORMAL) + .payload_raw(b"hello".to_vec()) + .expected_io(IoBudget::disk(3000, 1000)); store.submit(&sub2).await.unwrap(); store.pop_next().await.unwrap(); @@ -518,7 +514,7 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .complete(task.id, &TaskMetrics::default()) + .complete(task.id, &IoBudget::default()) .await .unwrap(); diff --git a/src/store/mod.rs b/src/store/mod.rs index abf4074..4738634 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -109,7 +109,7 @@ impl Default for StoreConfig { /// ```no_run /// # async fn example() -> Result<(), taskmill::store::StoreError> { /// use taskmill::store::TaskStore; -/// use taskmill::task::{TaskSubmission, TaskMetrics, TaskStatus}; +/// use taskmill::task::{TaskSubmission, IoBudget, TaskStatus}; /// use taskmill::priority::Priority; /// /// let store = TaskStore::open_memory().await?; @@ -118,7 +118,7 @@ impl Default for StoreConfig { /// let sub = TaskSubmission::new("thumbnail") /// .key("photo-1") /// .payload_raw(br#"{"path":"/a.jpg"}"#.to_vec()) -/// .expected_io(4096, 1024); +/// .expected_io(IoBudget::disk(4096, 1024)); /// let outcome = store.submit(&sub).await?; /// assert!(outcome.is_inserted()); /// @@ -127,7 +127,7 @@ impl Default for StoreConfig { /// assert_eq!(task.status, TaskStatus::Running); /// /// // Complete it — moves to history. -/// store.complete(task.id, &TaskMetrics { read_bytes: 4096, write_bytes: 1024, ..Default::default() }).await?; +/// store.complete(task.id, &IoBudget::disk(4096, 1024)).await?; /// assert!(store.task_by_id(task.id).await?.is_none()); // gone from active queue /// # Ok(()) /// # } diff --git a/src/store/query.rs b/src/store/query.rs index 129ea4e..bc2d86a 100644 --- a/src/store/query.rs +++ b/src/store/query.rs @@ -294,7 +294,7 @@ impl TaskStore { #[cfg(test)] mod tests { use crate::priority::Priority; - use crate::task::{HistoryStatus, TaskLookup, TaskMetrics, TaskStatus, TaskSubmission}; + use crate::task::{HistoryStatus, IoBudget, TaskLookup, TaskStatus, TaskSubmission}; use super::super::TaskStore; @@ -307,7 +307,7 @@ mod tests { .key(key) .priority(priority) .payload_raw(b"hello".to_vec()) - .expected_io(1000, 500) + .expected_io(IoBudget::disk(1000, 500)) } #[tokio::test] @@ -333,11 +333,7 @@ mod tests { store .complete( task.id, - &TaskMetrics { - read_bytes: 100, - write_bytes: 50, - ..Default::default() - }, + &IoBudget::disk(100, 50), ) .await .unwrap(); @@ -348,7 +344,7 @@ mod tests { let record = store.history_by_id(hist_id).await.unwrap().unwrap(); assert_eq!(record.key, sub.effective_key()); - assert_eq!(record.actual_read_bytes, Some(100)); + assert_eq!(record.actual_io.unwrap().disk_read, 100); assert!(store.history_by_id(9999).await.unwrap().is_none()); } @@ -364,11 +360,7 @@ mod tests { store .complete( task.id, - &TaskMetrics { - read_bytes: 1000, - write_bytes: 500, - ..Default::default() - }, + &IoBudget::disk(1000, 500), ) .await .unwrap(); @@ -423,7 +415,7 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .complete(task.id, &TaskMetrics::default()) + .complete(task.id, &IoBudget::default()) .await .unwrap(); @@ -450,7 +442,7 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .complete(task.id, &TaskMetrics::default()) + .complete(task.id, &IoBudget::default()) .await .unwrap(); } diff --git a/src/store/row_mapping.rs b/src/store/row_mapping.rs index 1ca4661..fec7748 100644 --- a/src/store/row_mapping.rs +++ b/src/store/row_mapping.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc}; use sqlx::Row; use crate::priority::Priority; -use crate::task::{HistoryStatus, TaskHistoryRecord, TaskRecord, TaskStatus}; +use crate::task::{HistoryStatus, IoBudget, TaskHistoryRecord, TaskRecord, TaskStatus}; pub(crate) fn parse_datetime(s: &str) -> DateTime { // SQLite stores as "YYYY-MM-DD HH:MM:SS". Parse with chrono. @@ -32,10 +32,12 @@ pub(crate) fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { priority: Priority::new(priority_val as u8), status: status_str.parse().unwrap_or(TaskStatus::Pending), payload: row.get("payload"), - expected_read_bytes: row.get("expected_read_bytes"), - expected_write_bytes: row.get("expected_write_bytes"), - expected_net_rx_bytes: row.get("expected_net_rx_bytes"), - expected_net_tx_bytes: row.get("expected_net_tx_bytes"), + expected_io: IoBudget { + disk_read: row.get("expected_read_bytes"), + disk_write: row.get("expected_write_bytes"), + net_rx: row.get("expected_net_rx_bytes"), + net_tx: row.get("expected_net_tx_bytes"), + }, retry_count: row.get("retry_count"), last_error: row.get("last_error"), created_at: parse_datetime(&created_at_str), @@ -57,6 +59,18 @@ pub(crate) fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistor let parent_id: Option = row.get("parent_id"); let fail_fast_val: i32 = row.get("fail_fast"); + // Actual IO: if all four columns are NULL, return None; otherwise construct IoBudget. + let actual_read: Option = row.get("actual_read_bytes"); + let actual_write: Option = row.get("actual_write_bytes"); + let actual_rx: Option = row.get("actual_net_rx_bytes"); + let actual_tx: Option = row.get("actual_net_tx_bytes"); + let actual_io = actual_read.map(|dr| IoBudget { + disk_read: dr, + disk_write: actual_write.unwrap_or(0), + net_rx: actual_rx.unwrap_or(0), + net_tx: actual_tx.unwrap_or(0), + }); + TaskHistoryRecord { id: row.get("id"), task_type: row.get("task_type"), @@ -65,14 +79,13 @@ pub(crate) fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistor priority: Priority::new(priority_val as u8), status: status_str.parse().unwrap_or(HistoryStatus::Failed), payload: row.get("payload"), - expected_read_bytes: row.get("expected_read_bytes"), - expected_write_bytes: row.get("expected_write_bytes"), - expected_net_rx_bytes: row.get("expected_net_rx_bytes"), - expected_net_tx_bytes: row.get("expected_net_tx_bytes"), - actual_read_bytes: row.get("actual_read_bytes"), - actual_write_bytes: row.get("actual_write_bytes"), - actual_net_rx_bytes: row.get("actual_net_rx_bytes"), - actual_net_tx_bytes: row.get("actual_net_tx_bytes"), + expected_io: IoBudget { + disk_read: row.get("expected_read_bytes"), + disk_write: row.get("expected_write_bytes"), + net_rx: row.get("expected_net_rx_bytes"), + net_tx: row.get("expected_net_tx_bytes"), + }, + actual_io, retry_count: row.get("retry_count"), last_error: row.get("last_error"), created_at: parse_datetime(&created_at_str), diff --git a/src/store/submit.rs b/src/store/submit.rs index 32bc763..11a6d98 100644 --- a/src/store/submit.rs +++ b/src/store/submit.rs @@ -15,6 +15,10 @@ pub(crate) async fn submit_one( conn: &mut sqlx::pool::PoolConnection, sub: &TaskSubmission, ) -> Result { + if let Some(ref err) = sub.payload_error { + return Err(StoreError::Serialization(err.clone())); + } + let key = sub.effective_key(); let priority = sub.priority.value() as i32; let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; @@ -28,10 +32,10 @@ pub(crate) async fn submit_one( .bind(&sub.label) .bind(priority) .bind(&sub.payload) - .bind(sub.expected_read_bytes) - .bind(sub.expected_write_bytes) - .bind(sub.expected_net_rx_bytes) - .bind(sub.expected_net_tx_bytes) + .bind(sub.expected_io.disk_read) + .bind(sub.expected_io.disk_write) + .bind(sub.expected_io.net_rx) + .bind(sub.expected_io.net_tx) .bind(sub.parent_id) .bind(fail_fast_val) .bind(&sub.group_key) @@ -137,7 +141,7 @@ impl TaskStore { #[cfg(test)] mod tests { use crate::priority::Priority; - use crate::task::{SubmitOutcome, TaskMetrics, TaskSubmission, MAX_PAYLOAD_BYTES}; + use crate::task::{IoBudget, SubmitOutcome, TaskSubmission, MAX_PAYLOAD_BYTES}; use super::super::TaskStore; @@ -150,7 +154,7 @@ mod tests { .key(key) .priority(priority) .payload_raw(b"hello".to_vec()) - .expected_io(1000, 500) + .expected_io(IoBudget::disk(1000, 500)) } #[tokio::test] @@ -223,7 +227,7 @@ mod tests { assert_eq!(running.requeue_priority, Some(Priority::HIGH)); store - .complete(task.id, &TaskMetrics::default()) + .complete(task.id, &IoBudget::default()) .await .unwrap(); @@ -286,7 +290,7 @@ mod tests { store.submit(&sub_high).await.unwrap(); store - .fail(task.id, "boom", false, 0, &TaskMetrics::default()) + .fail(task.id, "boom", false, 0, &IoBudget::default()) .await .unwrap(); diff --git a/src/task/mod.rs b/src/task/mod.rs index 1341f8f..5d72263 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -107,12 +107,8 @@ pub struct TaskRecord { pub priority: Priority, pub status: TaskStatus, pub payload: Option>, - pub expected_read_bytes: i64, - pub expected_write_bytes: i64, - /// Estimated network receive bytes for IO budget scheduling. - pub expected_net_rx_bytes: i64, - /// Estimated network transmit bytes for IO budget scheduling. - pub expected_net_tx_bytes: i64, + /// Expected IO budget declared at submission. + pub expected_io: IoBudget, pub retry_count: i32, pub last_error: Option, pub created_at: DateTime, @@ -142,6 +138,16 @@ impl TaskRecord { None => Ok(None), } } + + /// Build a [`TaskEventHeader`](crate::scheduler::event::TaskEventHeader) from this record. + pub fn event_header(&self) -> crate::scheduler::event::TaskEventHeader { + crate::scheduler::event::TaskEventHeader { + task_id: self.id, + task_type: self.task_type.clone(), + key: self.key.clone(), + label: self.label.clone(), + } + } } /// A task that has completed or permanently failed. @@ -155,18 +161,10 @@ pub struct TaskHistoryRecord { pub priority: Priority, pub status: HistoryStatus, pub payload: Option>, - pub expected_read_bytes: i64, - pub expected_write_bytes: i64, - /// Estimated network receive bytes declared at submission. - pub expected_net_rx_bytes: i64, - /// Estimated network transmit bytes declared at submission. - pub expected_net_tx_bytes: i64, - pub actual_read_bytes: Option, - pub actual_write_bytes: Option, - /// Actual network receive bytes reported by the executor. - pub actual_net_rx_bytes: Option, - /// Actual network transmit bytes reported by the executor. - pub actual_net_tx_bytes: Option, + /// Expected IO budget declared at submission. + pub expected_io: IoBudget, + /// Actual IO recorded by the executor, if available. + pub actual_io: Option, pub retry_count: i32, pub last_error: Option, pub created_at: DateTime, @@ -181,23 +179,41 @@ pub struct TaskHistoryRecord { pub group_key: Option, } -/// Accumulated IO metrics captured by the scheduler after an executor finishes. +/// IO budget for a task: expected or actual disk and network IO bytes. /// -/// Executors report metrics incrementally via [`TaskContext::record_read_bytes`](crate::TaskContext::record_read_bytes), -/// [`record_write_bytes`](crate::TaskContext::record_write_bytes), -/// [`record_net_rx_bytes`](crate::TaskContext::record_net_rx_bytes), and -/// [`record_net_tx_bytes`](crate::TaskContext::record_net_tx_bytes). -/// This struct is the snapshot read by the scheduler — executors never construct it directly. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct TaskMetrics { - /// Actual disk bytes read during execution. - pub read_bytes: i64, - /// Actual disk bytes written during execution. - pub write_bytes: i64, - /// Actual network bytes received during execution. - pub net_rx_bytes: i64, - /// Actual network bytes transmitted during execution. - pub net_tx_bytes: i64, +/// Used in [`TaskSubmission`] for expected IO (scheduling), [`TaskRecord`] for +/// persisted expectations, and as the snapshot returned by the IO tracker after +/// execution completes. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct IoBudget { + /// Disk bytes read. + pub disk_read: i64, + /// Disk bytes written. + pub disk_write: i64, + /// Network bytes received. + pub net_rx: i64, + /// Network bytes transmitted. + pub net_tx: i64, +} + +impl IoBudget { + /// Create an `IoBudget` with only disk IO set. + pub fn disk(read: i64, write: i64) -> Self { + Self { + disk_read: read, + disk_write: write, + ..Default::default() + } + } + + /// Create an `IoBudget` with only network IO set. + pub fn net(rx: i64, tx: i64) -> Self { + Self { + net_rx: rx, + net_tx: tx, + ..Default::default() + } + } } /// Unified lookup result for querying a task by its dedup inputs. diff --git a/src/task/submission.rs b/src/task/submission.rs index b2c06b4..6505017 100644 --- a/src/task/submission.rs +++ b/src/task/submission.rs @@ -6,6 +6,7 @@ use crate::priority::Priority; use super::dedup::generate_dedup_key; use super::typed::TypedTask; +use super::IoBudget; /// Result of a task submission attempt. #[derive(Debug, Clone, PartialEq, Eq)] @@ -41,13 +42,13 @@ impl SubmitOutcome { /// construction with sensible defaults: /// /// ```ignore -/// use taskmill::{TaskSubmission, Priority}; +/// use taskmill::{TaskSubmission, Priority, IoBudget}; /// /// let sub = TaskSubmission::new("thumbnail") /// .key("img-001") /// .priority(Priority::HIGH) /// .payload_json(&my_payload)? -/// .expected_io(4096, 1024); +/// .expected_io(IoBudget::disk(4096, 1024)); /// ``` /// /// For strongly-typed tasks, prefer [`TaskSubmission::from_typed`] or @@ -64,12 +65,8 @@ pub struct TaskSubmission { pub label: String, pub priority: Priority, pub payload: Option>, - pub expected_read_bytes: i64, - pub expected_write_bytes: i64, - /// Estimated network receive bytes for IO budget scheduling. - pub expected_net_rx_bytes: i64, - /// Estimated network transmit bytes for IO budget scheduling. - pub expected_net_tx_bytes: i64, + /// Expected IO budget for scheduling. + pub expected_io: IoBudget, /// Parent task ID for hierarchical tasks. Set automatically by /// [`TaskContext::spawn_child`](crate::TaskContext::spawn_child). pub parent_id: Option, @@ -84,6 +81,10 @@ pub struct TaskSubmission { /// by [`SchedulerBuilder::group_concurrency`](crate::SchedulerBuilder::group_concurrency). /// Use this to prevent hammering a single endpoint (e.g. `"s3://my-bucket"`). pub group_key: Option, + /// Deferred serialization error from [`payload_json`](Self::payload_json). + /// Surfaced at submit time as [`StoreError::Serialization`](crate::StoreError::Serialization). + #[serde(skip)] + pub(crate) payload_error: Option, } impl TaskSubmission { @@ -98,8 +99,8 @@ impl TaskSubmission { /// TaskSubmission::new("resize") /// .key("my-file.jpg") /// .priority(Priority::HIGH) - /// .payload_json(&data)? - /// .expected_io(4096, 1024) + /// .payload_json(&data) + /// .expected_io(IoBudget::disk(4096, 1024)) /// ``` pub fn new(task_type: impl Into) -> Self { let task_type = task_type.into(); @@ -110,13 +111,11 @@ impl TaskSubmission { label, priority: Priority::NORMAL, payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - expected_net_rx_bytes: 0, - expected_net_tx_bytes: 0, + expected_io: IoBudget::default(), parent_id: None, fail_fast: true, group_key: None, + payload_error: None, } } @@ -151,14 +150,15 @@ impl TaskSubmission { /// Set the payload from a serializable value (JSON-encoded). /// - /// Returns an error if serialization fails. The payload can be - /// deserialized in the executor via [`TaskContext::payload`](crate::TaskContext::payload). - pub fn payload_json( - mut self, - data: &T, - ) -> Result { - self.payload = Some(serde_json::to_vec(data)?); - Ok(self) + /// Serialization errors are deferred to submit time so the builder + /// chain is never interrupted. The payload can be deserialized in + /// the executor via [`TaskContext::payload`](crate::TaskContext::payload). + pub fn payload_json(mut self, data: &T) -> Self { + match serde_json::to_vec(data) { + Ok(bytes) => self.payload = Some(bytes), + Err(e) => self.payload_error = Some(e.to_string()), + } + self } /// Set the payload from raw bytes. @@ -167,25 +167,9 @@ impl TaskSubmission { self } - /// Set expected disk IO bytes for budget-based scheduling. - /// - /// The scheduler uses these estimates to avoid saturating disk throughput - /// when [resource monitoring](crate::SchedulerBuilder::with_resource_monitoring) - /// is enabled. Default: 0 for both. - pub fn expected_io(mut self, read_bytes: i64, write_bytes: i64) -> Self { - self.expected_read_bytes = read_bytes; - self.expected_write_bytes = write_bytes; - self - } - - /// Set expected network IO bytes for budget-based scheduling. - /// - /// The scheduler uses these estimates to avoid saturating network bandwidth - /// when [resource monitoring](crate::SchedulerBuilder::with_resource_monitoring) - /// is enabled. Default: 0 for both. - pub fn expected_net_io(mut self, rx_bytes: i64, tx_bytes: i64) -> Self { - self.expected_net_rx_bytes = rx_bytes; - self.expected_net_tx_bytes = tx_bytes; + /// Set the expected IO budget for scheduling. + pub fn expected_io(mut self, budget: IoBudget) -> Self { + self.expected_io = budget; self } @@ -220,36 +204,23 @@ impl TaskSubmission { } } - /// Create a submission with a typed payload serialized to JSON bytes. - /// - /// The dedup key is auto-generated from the task type and serialized payload. - /// Use `TaskRecord::deserialize_payload()` on the executor side to recover the type. - #[deprecated( - since = "2.0.0", - note = "use `TaskSubmission::new(task_type).payload_json(&data)?.priority(p).expected_io(r, w)` instead" - )] - pub fn with_payload( - task_type: &str, - priority: Priority, - data: &T, - expected_read_bytes: i64, - expected_write_bytes: i64, - ) -> Result { - Ok(Self::new(task_type) - .priority(priority) - .payload_json(data)? - .expected_io(expected_read_bytes, expected_write_bytes)) - } - /// Create a submission from a [`TypedTask`], serializing the payload and - /// pulling task type, priority, IO estimates, and group key from the trait. - pub fn from_typed(task: &T) -> Result { + /// pulling task type, priority, IO estimates, key, label, and group key + /// from the trait. + pub fn from_typed(task: &T) -> Self { let mut sub = Self::new(T::TASK_TYPE) .priority(task.priority()) - .payload_json(task)? - .expected_io(task.expected_read_bytes(), task.expected_write_bytes()) - .expected_net_io(task.expected_net_rx_bytes(), task.expected_net_tx_bytes()); - sub.group_key = task.group_key(); - Ok(sub) + .payload_json(task) + .expected_io(task.expected_io()); + if let Some(k) = task.key() { + sub = sub.key(k); + } + if let Some(l) = task.label() { + sub = sub.label(l); + } + if let Some(g) = task.group_key() { + sub = sub.group(g); + } + sub } } diff --git a/src/task/tests.rs b/src/task/tests.rs index d1585d8..4a03fde 100644 --- a/src/task/tests.rs +++ b/src/task/tests.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::priority::Priority; -use super::{TaskSubmission, TypedTask}; +use super::{IoBudget, TaskSubmission, TypedTask}; #[derive(Serialize, Deserialize, Debug, PartialEq)] struct Thumbnail { @@ -13,12 +13,8 @@ struct Thumbnail { impl TypedTask for Thumbnail { const TASK_TYPE: &'static str = "thumbnail"; - fn expected_read_bytes(&self) -> i64 { - 4096 - } - - fn expected_write_bytes(&self) -> i64 { - 1024 + fn expected_io(&self) -> IoBudget { + IoBudget::disk(4096, 1024) } } @@ -28,12 +24,12 @@ fn typed_task_to_submission() { path: "/photos/a.jpg".into(), size: 256, }; - let sub = TaskSubmission::from_typed(&task).unwrap(); + let sub = TaskSubmission::from_typed(&task); assert_eq!(sub.task_type, "thumbnail"); assert_eq!(sub.priority, Priority::NORMAL); - assert_eq!(sub.expected_read_bytes, 4096); - assert_eq!(sub.expected_write_bytes, 1024); + assert_eq!(sub.expected_io.disk_read, 4096); + assert_eq!(sub.expected_io.disk_write, 1024); assert!(sub.dedup_key.is_none()); // Payload round-trips correctly. @@ -56,7 +52,7 @@ fn typed_task_custom_priority() { } } - let sub = TaskSubmission::from_typed(&Urgent { id: 42 }).unwrap(); + let sub = TaskSubmission::from_typed(&Urgent { id: 42 }); assert_eq!(sub.priority, Priority::HIGH); assert_eq!(sub.task_type, "urgent"); } @@ -70,11 +66,8 @@ fn typed_task_defaults() { const TASK_TYPE: &'static str = "minimal"; } - let sub = TaskSubmission::from_typed(&Minimal).unwrap(); - assert_eq!(sub.expected_read_bytes, 0); - assert_eq!(sub.expected_write_bytes, 0); - assert_eq!(sub.expected_net_rx_bytes, 0); - assert_eq!(sub.expected_net_tx_bytes, 0); + let sub = TaskSubmission::from_typed(&Minimal); + assert_eq!(sub.expected_io, IoBudget::default()); assert_eq!(sub.priority, Priority::NORMAL); assert!(sub.group_key.is_none()); } @@ -90,8 +83,8 @@ fn typed_task_with_network_and_group() { impl TypedTask for S3Upload { const TASK_TYPE: &'static str = "s3-upload"; - fn expected_net_tx_bytes(&self) -> i64 { - self.size + fn expected_io(&self) -> IoBudget { + IoBudget::net(0, self.size) } fn group_key(&self) -> Option { @@ -103,18 +96,49 @@ fn typed_task_with_network_and_group() { bucket: "my-bucket".into(), size: 10_000_000, }; - let sub = TaskSubmission::from_typed(&task).unwrap(); - assert_eq!(sub.expected_net_tx_bytes, 10_000_000); - assert_eq!(sub.expected_net_rx_bytes, 0); + let sub = TaskSubmission::from_typed(&task); + assert_eq!(sub.expected_io.net_tx, 10_000_000); + assert_eq!(sub.expected_io.net_rx, 0); assert_eq!(sub.group_key.as_deref(), Some("s3://my-bucket")); } #[test] -fn submission_builder_net_io_and_group() { +fn submission_builder_io_and_group() { let sub = TaskSubmission::new("upload") - .expected_net_io(5000, 10000) + .expected_io(IoBudget { + net_rx: 5000, + net_tx: 10000, + ..Default::default() + }) .group("s3://bucket-a"); - assert_eq!(sub.expected_net_rx_bytes, 5000); - assert_eq!(sub.expected_net_tx_bytes, 10000); + assert_eq!(sub.expected_io.net_rx, 5000); + assert_eq!(sub.expected_io.net_tx, 10000); assert_eq!(sub.group_key.as_deref(), Some("s3://bucket-a")); } + +#[test] +fn typed_task_key_and_label() { + #[derive(Serialize, Deserialize)] + struct FileTask { + path: String, + } + + impl TypedTask for FileTask { + const TASK_TYPE: &'static str = "file-task"; + + fn key(&self) -> Option { + Some(self.path.clone()) + } + + fn label(&self) -> Option { + Some(format!("Process {}", self.path)) + } + } + + let task = FileTask { + path: "/a.txt".into(), + }; + let sub = TaskSubmission::from_typed(&task); + assert_eq!(sub.dedup_key.as_deref(), Some("/a.txt")); + assert_eq!(sub.label, "Process /a.txt"); +} diff --git a/src/task/typed.rs b/src/task/typed.rs index 74a1fa7..53b36ee 100644 --- a/src/task/typed.rs +++ b/src/task/typed.rs @@ -5,10 +5,12 @@ use serde::Serialize; use crate::priority::Priority; +use super::IoBudget; + /// A strongly-typed task that bundles serialization, task type name, and default /// IO estimates. /// -/// Implementing this trait collapses the 6 fields of [`TaskSubmission`](super::TaskSubmission) into a +/// Implementing this trait collapses the fields of [`TaskSubmission`](super::TaskSubmission) into a /// derive-friendly pattern. Use [`Scheduler::submit_typed`](crate::Scheduler::submit_typed) /// to submit and [`TaskContext::payload`](crate::TaskContext::payload) on the /// executor side to deserialize. Each `TypedTask` must have a corresponding @@ -19,44 +21,38 @@ use crate::priority::Priority; /// /// ```ignore /// use serde::{Serialize, Deserialize}; -/// use taskmill::{TypedTask, Priority}; +/// use taskmill::{TypedTask, IoBudget, Priority}; /// /// #[derive(Serialize, Deserialize)] /// struct Thumbnail { path: String, size: u32 } /// /// impl TypedTask for Thumbnail { /// const TASK_TYPE: &'static str = "thumbnail"; -/// fn expected_read_bytes(&self) -> i64 { 4096 } -/// fn expected_write_bytes(&self) -> i64 { 1024 } +/// fn expected_io(&self) -> IoBudget { IoBudget::disk(4096, 1024) } /// } /// ``` pub trait TypedTask: Serialize + DeserializeOwned + Send + 'static { /// Unique name used to register and look up the executor. const TASK_TYPE: &'static str; - /// Estimated bytes this task will read from disk. Default: 0. - fn expected_read_bytes(&self) -> i64 { - 0 - } - - /// Estimated bytes this task will write to disk. Default: 0. - fn expected_write_bytes(&self) -> i64 { - 0 + /// Expected IO budget for this task. Default: zero. + fn expected_io(&self) -> IoBudget { + IoBudget::default() } - /// Estimated bytes this task will receive over the network. Default: 0. - fn expected_net_rx_bytes(&self) -> i64 { - 0 + /// Scheduling priority. Default: [`Priority::NORMAL`]. + fn priority(&self) -> Priority { + Priority::NORMAL } - /// Estimated bytes this task will transmit over the network. Default: 0. - fn expected_net_tx_bytes(&self) -> i64 { - 0 + /// Optional dedup key. Default: `None` (payload hash used). + fn key(&self) -> Option { + None } - /// Scheduling priority. Default: [`Priority::NORMAL`]. - fn priority(&self) -> Priority { - Priority::NORMAL + /// Optional human-readable label. Default: `None` (derived from key or task type). + fn label(&self) -> Option { + None } /// Optional group key for per-group concurrency limiting. Default: `None`. diff --git a/tests/integration.rs b/tests/integration.rs index e858e7b..79a7750 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -236,8 +236,8 @@ async fn priority_ordering_dispatches_highest_first() { // Drain dispatched events. while let Ok(evt) = rx.try_recv() { - if let SchedulerEvent::Dispatched { label, .. } = evt { - dispatch_order.push(label); + if let SchedulerEvent::Dispatched(ref h) = evt { + dispatch_order.push(h.label.clone()); } } } @@ -284,7 +284,7 @@ async fn retryable_error_retries_then_succeeds() { // Wait for completion. let deadline = tokio::time::Instant::now() + Duration::from_secs(5); let completed = wait_for_event(&mut rx, deadline, |evt| { - matches!(evt, SchedulerEvent::Completed { .. }) + matches!(evt, SchedulerEvent::Completed(..)) }) .await; @@ -405,10 +405,10 @@ async fn preemption_resumes_after_preemptor_completes() { while tokio::time::Instant::now() < deadline && !(saw_preempted && saw_urgent_complete) { match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { - Ok(Ok(SchedulerEvent::Preempted { label, .. })) if label == "bg-work" => { + Ok(Ok(SchedulerEvent::Preempted(ref h))) if h.label == "bg-work" => { saw_preempted = true; } - Ok(Ok(SchedulerEvent::Completed { label, .. })) if label == "urgent" => { + Ok(Ok(SchedulerEvent::Completed(ref h))) if h.label == "urgent" => { saw_urgent_complete = true; } _ => {} @@ -566,7 +566,7 @@ async fn group_concurrency_limits_dispatch() { let deadline = tokio::time::Instant::now() + Duration::from_secs(5); let mut completed = 0; while tokio::time::Instant::now() < deadline && completed < 5 { - if let Ok(Ok(SchedulerEvent::Completed { .. })) = + if let Ok(Ok(SchedulerEvent::Completed(..))) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { completed += 1; @@ -626,7 +626,7 @@ async fn run_loop_processes_queue_to_completion() { let deadline = tokio::time::Instant::now() + Duration::from_secs(5); let mut completed = 0; while tokio::time::Instant::now() < deadline && completed < 20 { - if let Ok(Ok(SchedulerEvent::Completed { .. })) = + if let Ok(Ok(SchedulerEvent::Completed(..))) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { completed += 1; @@ -684,7 +684,7 @@ async fn concurrent_tasks_respect_max_concurrency() { let deadline = tokio::time::Instant::now() + Duration::from_secs(5); let mut completed = 0; while tokio::time::Instant::now() < deadline && completed < 10 { - if let Ok(Ok(SchedulerEvent::Completed { .. })) = + if let Ok(Ok(SchedulerEvent::Completed(..))) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { completed += 1; @@ -749,7 +749,7 @@ async fn fail_fast_cancels_siblings_on_child_failure() { let parent_failed = wait_for_event( &mut rx, deadline, - |evt| matches!(evt, SchedulerEvent::Failed { task_type, .. } if task_type == "parent"), + |evt| matches!(evt, SchedulerEvent::Failed { ref header, .. } if header.task_type == "parent"), ) .await; @@ -805,7 +805,7 @@ async fn non_fail_fast_waits_for_all_children() { let parent_completed = wait_for_event( &mut rx, deadline, - |evt| matches!(evt, SchedulerEvent::Completed { task_type, .. } if task_type == "parent"), + |evt| matches!(evt, SchedulerEvent::Completed(ref h) if h.task_type == "parent"), ) .await; @@ -918,8 +918,9 @@ async fn io_metrics_recorded_in_history() { let key = taskmill::generate_dedup_key("test", Some(b"io-track")); let history = sched.store().history_by_key(&key).await.unwrap(); assert_eq!(history.len(), 1); - assert_eq!(history[0].actual_read_bytes, Some(4096)); - assert_eq!(history[0].actual_write_bytes, Some(1024)); + let actual = history[0].actual_io.unwrap(); + assert_eq!(actual.disk_read, 4096); + assert_eq!(actual.disk_write, 1024); } // ═══════════════════════════════════════════════════════════════════ From 3a007ecfe9fa58bb96f5a0971717a9e8d73a2015 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 14 Mar 2026 10:16:30 -0700 Subject: [PATCH 2/3] docs: add standalone 0.3.x to 0.4.0 migration guide --- docs/migrating-to-0.4.md | 118 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 docs/migrating-to-0.4.md diff --git a/docs/migrating-to-0.4.md b/docs/migrating-to-0.4.md new file mode 100644 index 0000000..ecf92ce --- /dev/null +++ b/docs/migrating-to-0.4.md @@ -0,0 +1,118 @@ +# Migrating from 0.3.x to 0.4.0 + +This guide covers the breaking API changes in taskmill 0.4.0. All changes are API-level — database columns are unchanged, so existing data is fully compatible. + +## `IoBudget` replaces scattered IO fields + +The four separate IO byte fields on `TypedTask`, `TaskSubmission`, `TaskRecord`, and `TaskHistoryRecord` have been consolidated into a single `IoBudget` struct. `TaskMetrics` has been removed — use `IoBudget` everywhere instead. + +**Before:** +```rust +// TypedTask: 4 separate methods +impl TypedTask for MyTask { + const TASK_TYPE: &'static str = "my-task"; + fn expected_read_bytes(&self) -> i64 { 4096 } + fn expected_write_bytes(&self) -> i64 { 1024 } + fn expected_net_rx_bytes(&self) -> i64 { 0 } + fn expected_net_tx_bytes(&self) -> i64 { 0 } +} + +// TaskSubmission: two builder methods +TaskSubmission::new("upload") + .expected_io(4096, 1024) + .expected_net_io(0, 8192) + +// Accessing fields on TaskRecord / TaskHistoryRecord +record.expected_read_bytes +history.actual_read_bytes +``` + +**After:** +```rust +// TypedTask: single method returning IoBudget +impl TypedTask for MyTask { + const TASK_TYPE: &'static str = "my-task"; + fn expected_io(&self) -> IoBudget { IoBudget::disk(4096, 1024) } +} + +// TaskSubmission: single builder method +TaskSubmission::new("upload") + .expected_io(IoBudget { disk_write: 1024, net_tx: 8192, ..Default::default() }) + +// Accessing fields on TaskRecord / TaskHistoryRecord +record.expected_io.disk_read +history.actual_io.map(|io| io.disk_read) +``` + +`IoBudget` provides two convenience constructors: +- `IoBudget::disk(read, write)` — sets disk fields, zeroes network +- `IoBudget::net(rx, tx)` — sets network fields, zeroes disk + +The `TaskContext` recording methods (`record_read_bytes`, `record_write_bytes`, etc.) are unchanged. + +## `TypedTask` now supports `key()` and `label()` + +Two new optional default methods allow typed tasks to declare their own dedup key and UI label: + +```rust +impl TypedTask for MyTask { + const TASK_TYPE: &'static str = "my-task"; + fn key(&self) -> Option { Some(self.file_path.clone()) } + fn label(&self) -> Option { Some(format!("Process {}", self.file_path)) } +} +``` + +When `None` (the default), behavior is unchanged — key is derived from payload hash, label from task type. Existing `TypedTask` impls require no changes. + +## `SchedulerEvent` uses `TaskEventHeader` + +Per-task event variants now carry a `TaskEventHeader` struct instead of repeating `task_id`, `task_type`, `key`, and `label` as individual fields. + +**Before:** +```rust +match event { + SchedulerEvent::Completed { task_id, task_type, key, label } => { ... } + SchedulerEvent::Failed { task_id, label, error, will_retry, .. } => { ... } + SchedulerEvent::Progress { task_id, percent, message, .. } => { ... } +} +``` + +**After:** +```rust +match event { + SchedulerEvent::Completed(header) => { /* header.task_id, header.label, ... */ } + SchedulerEvent::Failed { header, error, will_retry } => { ... } + SchedulerEvent::Progress { header, percent, message } => { ... } +} + +// Or use the convenience accessor: +if let Some(header) = event.header() { ... } +``` + +`EstimatedProgress` fields `task_id`, `task_type`, `key`, `label` are also nested under `header: TaskEventHeader`. + +## `payload_json()` and `from_typed()` no longer return `Result` + +Both methods now always return `Self`, keeping the builder chain unbroken. Serialization errors are deferred and surfaced when calling `scheduler.submit()` / `store.submit()` as `StoreError::Serialization`. + +**Before:** +```rust +let sub = TaskSubmission::new("task") + .key("k") + .payload_json(&data)? // breaks the chain + .priority(Priority::HIGH); + +let sub = TaskSubmission::from_typed(&task)?; +``` + +**After:** +```rust +let sub = TaskSubmission::new("task") + .key("k") + .payload_json(&data) // always returns Self + .priority(Priority::HIGH); + +let sub = TaskSubmission::from_typed(&task); +``` + +Remove any `?` operators on `payload_json()` or `from_typed()` calls. Errors are still caught before the task is persisted — they just surface at submit time instead. From 2555e3c3a3deaf7ec10bee93dff873f0f96487ce Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 14 Mar 2026 10:32:09 -0700 Subject: [PATCH 3/3] style: apply oxfmt formatting to scheduler and store modules --- src/scheduler/dispatch.rs | 16 +++------------- src/scheduler/event.rs | 7 +++---- src/scheduler/mod.rs | 4 +++- src/scheduler/submit.rs | 10 ++++++++-- src/store/hierarchy.rs | 7 ++----- src/store/lifecycle.rs | 18 +++--------------- src/store/query.rs | 20 ++++---------------- src/store/submit.rs | 5 +---- 8 files changed, 27 insertions(+), 60 deletions(-) diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 1125e6b..5baa5c3 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken; use crate::priority::Priority; use crate::registry::{ChildSpawner, IoTracker, TaskContext}; use crate::store::TaskStore; -use crate::task::{ParentResolution, IoBudget, TaskRecord}; +use crate::task::{IoBudget, ParentResolution, TaskRecord}; use super::progress::ProgressReporter; use super::SchedulerEvent; @@ -271,11 +271,7 @@ pub(crate) async fn spawn_task( let ctx = TaskContext { record: task.clone(), token: child_token.clone(), - progress: ProgressReporter::new( - task.event_header(), - event_tx.clone(), - active.clone(), - ), + progress: ProgressReporter::new(task.event_header(), event_tx.clone(), active.clone()), scheduler, app_state, child_spawner: Some(child_spawner), @@ -415,13 +411,7 @@ pub(crate) async fn spawn_task( // Fail the parent. let msg = format!("child task {task_id} failed: {}", te.message); if let Err(e) = store - .fail_with_record( - &parent, - &msg, - false, - 0, - &IoBudget::default(), - ) + .fail_with_record(&parent, &msg, false, 0, &IoBudget::default()) .await { tracing::error!( diff --git a/src/scheduler/event.rs b/src/scheduler/event.rs index 635b40d..aaf2c03 100644 --- a/src/scheduler/event.rs +++ b/src/scheduler/event.rs @@ -94,10 +94,9 @@ impl SchedulerEvent { /// Returns the [`TaskEventHeader`] if this event is task-specific. pub fn header(&self) -> Option<&TaskEventHeader> { match self { - Self::Dispatched(h) - | Self::Completed(h) - | Self::Preempted(h) - | Self::Cancelled(h) => Some(h), + Self::Dispatched(h) | Self::Completed(h) | Self::Preempted(h) | Self::Cancelled(h) => { + Some(h) + } Self::Failed { header, .. } | Self::Progress { header, .. } => Some(header), Self::Waiting { .. } | Self::Paused | Self::Resumed => None, } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 84686be..8bb1711 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -48,7 +48,9 @@ use crate::store::TaskStore; use dispatch::ActiveTaskMap; pub use builder::SchedulerBuilder; -pub use event::{SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader}; +pub use event::{ + SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, +}; pub use gate::GroupLimits; pub use progress::{EstimatedProgress, ProgressReporter}; diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index 36e1ade..4d7b393 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -140,7 +140,10 @@ impl Scheduler { if let Some(at) = self.inner.active.remove(*child_id) { at.token.cancel(); let _ = self.inner.store.delete(*child_id).await; - let _ = self.inner.event_tx.send(SchedulerEvent::Cancelled(at.record.event_header())); + let _ = self + .inner + .event_tx + .send(SchedulerEvent::Cancelled(at.record.event_header())); } } @@ -148,7 +151,10 @@ impl Scheduler { if let Some(at) = self.inner.active.remove(task_id) { at.token.cancel(); self.inner.store.delete(task_id).await?; - let _ = self.inner.event_tx.send(SchedulerEvent::Cancelled(at.record.event_header())); + let _ = self + .inner + .event_tx + .send(SchedulerEvent::Cancelled(at.record.event_header())); return Ok(true); } diff --git a/src/store/hierarchy.rs b/src/store/hierarchy.rs index e2f62c8..0b65541 100644 --- a/src/store/hierarchy.rs +++ b/src/store/hierarchy.rs @@ -154,7 +154,7 @@ impl TaskStore { #[cfg(test)] mod tests { use crate::priority::Priority; - use crate::task::{ParentResolution, IoBudget, TaskStatus, TaskSubmission}; + use crate::task::{IoBudget, ParentResolution, TaskStatus, TaskSubmission}; use super::super::TaskStore; @@ -353,10 +353,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); assert!(!task.fail_fast); - store - .complete(task.id, &IoBudget::default()) - .await - .unwrap(); + store.complete(task.id, &IoBudget::default()).await.unwrap(); let hist = store.history(10, 0).await.unwrap(); assert!(!hist[0].fail_fast); diff --git a/src/store/lifecycle.rs b/src/store/lifecycle.rs index 9c5247a..6d80035 100644 --- a/src/store/lifecycle.rs +++ b/src/store/lifecycle.rs @@ -396,10 +396,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); store - .complete( - task.id, - &IoBudget::disk(2000, 1000), - ) + .complete(task.id, &IoBudget::disk(2000, 1000)) .await .unwrap(); @@ -445,13 +442,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); assert_eq!(task.retry_count, 1); store - .fail( - task.id, - "err2", - true, - 1, - &IoBudget::disk(100, 50), - ) + .fail(task.id, "err2", true, 1, &IoBudget::disk(100, 50)) .await .unwrap(); @@ -513,10 +504,7 @@ mod tests { let sub = make_submission("reuse", Priority::NORMAL); store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); - store - .complete(task.id, &IoBudget::default()) - .await - .unwrap(); + store.complete(task.id, &IoBudget::default()).await.unwrap(); let outcome = store.submit(&sub).await.unwrap(); assert!(outcome.is_inserted()); diff --git a/src/store/query.rs b/src/store/query.rs index bc2d86a..2637f11 100644 --- a/src/store/query.rs +++ b/src/store/query.rs @@ -331,10 +331,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); store - .complete( - task.id, - &IoBudget::disk(100, 50), - ) + .complete(task.id, &IoBudget::disk(100, 50)) .await .unwrap(); @@ -358,10 +355,7 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .complete( - task.id, - &IoBudget::disk(1000, 500), - ) + .complete(task.id, &IoBudget::disk(1000, 500)) .await .unwrap(); } @@ -414,10 +408,7 @@ mod tests { let key = sub.effective_key(); store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); - store - .complete(task.id, &IoBudget::default()) - .await - .unwrap(); + store.complete(task.id, &IoBudget::default()).await.unwrap(); let result = store.task_lookup(&key).await.unwrap(); assert!( @@ -441,10 +432,7 @@ mod tests { let sub = make_submission(&format!("prune-{i}"), Priority::NORMAL); store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); - store - .complete(task.id, &IoBudget::default()) - .await - .unwrap(); + store.complete(task.id, &IoBudget::default()).await.unwrap(); } let hist = store.history(100, 0).await.unwrap(); diff --git a/src/store/submit.rs b/src/store/submit.rs index 11a6d98..842c5de 100644 --- a/src/store/submit.rs +++ b/src/store/submit.rs @@ -226,10 +226,7 @@ mod tests { assert!(running.requeue); assert_eq!(running.requeue_priority, Some(Priority::HIGH)); - store - .complete(task.id, &IoBudget::default()) - .await - .unwrap(); + store.complete(task.id, &IoBudget::default()).await.unwrap(); let requeued = store.task_by_key(&key).await.unwrap().unwrap(); assert_eq!(requeued.status, crate::task::TaskStatus::Pending);