diff --git a/README.md b/README.md index ea850cc..4211fe9 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,10 @@ throughput. ```rust use std::sync::Arc; -use std::time::Duration; use tokio_util::sync::CancellationToken; use taskmill::{ Scheduler, Priority, TaskSubmission, TaskExecutor, - TaskContext, TaskResult, TaskError, ShutdownMode, + TaskContext, TaskError, }; struct ThumbnailGenerator; @@ -24,9 +23,11 @@ struct ThumbnailGenerator; impl TaskExecutor for ThumbnailGenerator { async fn execute<'a>( &'a self, ctx: &'a TaskContext, - ) -> Result { - ctx.progress.report(0.5, Some("resizing".into())); - Ok(TaskResult { actual_read_bytes: 4096, actual_write_bytes: 1024 }) + ) -> Result<(), TaskError> { + ctx.progress().report(0.5, Some("resizing".into())); + ctx.record_read_bytes(4096); + ctx.record_write_bytes(1024); + Ok(()) } } @@ -41,12 +42,11 @@ async fn main() { .await .unwrap(); - scheduler.submit(&TaskSubmission::with_payload( - "thumbnail", - Priority::NORMAL, - &serde_json::json!({"path": "/photos/img.jpg"}), - 4096, 1024, - ).unwrap()).await.unwrap(); + let sub = TaskSubmission::new("thumbnail") + .payload_json(&serde_json::json!({"path": "/photos/img.jpg"})) + .unwrap() + .expected_io(4096, 1024); + scheduler.submit(&sub).await.unwrap(); let token = CancellationToken::new(); scheduler.run(token).await; diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 910808a..035882d 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -9,7 +9,7 @@ SQLite persistence, designed for desktop apps (Tauri) and background services. taskmill/ src/ lib.rs — public API re-exports - task.rs — TaskRecord, TaskSubmission, TaskResult, TaskError, TypedTask, etc. + task.rs — TaskRecord, TaskSubmission, TaskError, TypedTask, etc. priority.rs — Priority newtype (u8, lower = higher priority) store.rs — TaskStore: SQLite persistence, atomic pop, queries, retention registry.rs — TaskExecutor trait (RPITIT), TaskContext, TaskTypeRegistry @@ -25,6 +25,7 @@ taskmill/ sysinfo_monitor.rs — SysinfoSampler via `sysinfo` crate (feature-gated) migrations/ 001_tasks.sql — tasks table, task_history table, indexes + 002_add_label.sql — adds human-readable label column ``` ## Task lifecycle @@ -39,7 +40,7 @@ Submit ──► Pending ──► Running ──► Completed (moved to task_h └─────────────────┘ (resumed when preemptors finish) ``` -Active-queue states (`tasks` table): `pending`, `running`, `paused`. +Active-queue states (`tasks` table): `pending`, `running`, `paused`, `waiting`. Terminal states (`task_history` table): `completed`, `failed`. ## Data flow @@ -67,6 +68,7 @@ flowchart TD | `id` | `INTEGER PRIMARY KEY` — insertion order within tier| | `task_type` | Executor lookup name (e.g. `"scan-l3"`) | | `key` | `UNIQUE` — SHA-256 deduplication key | +| `label` | Human-readable display label (original key or type)| | `priority` | `INTEGER NOT NULL` — 0 (highest) to 255 (lowest) | | `status` | `TEXT` — `pending`, `running`, or `paused` | | `payload` | `BLOB` — opaque, max 1 MiB, executor-defined | @@ -122,8 +124,9 @@ type is always incorporated so different types with identical payloads never collide. Enforcement uses the `UNIQUE(key)` constraint with `INSERT OR IGNORE` — a -duplicate submission silently returns `None`. The key stays occupied while the -task is active (including retries) and is freed when the task moves to history. +duplicate submission returns `SubmitOutcome::Duplicate`. The key stays occupied +while the task is active (including retries) and is freed when the task moves +to history. ## Priority queue @@ -323,7 +326,7 @@ When a task is submitted at or above `preempt_priority` (default `REALTIME`): 3. On subsequent poll cycles, paused tasks are only resumed when no active preemptors remain — this prevents a thrashing loop of pause/resume/re-preempt. -Executors cooperate by checking `ctx.token.is_cancelled()` at yield points. An +Executors cooperate by checking `ctx.token().is_cancelled()` at yield points. An executor that ignores cancellation continues running but is no longer tracked; its completion or failure is still recorded normally. @@ -362,8 +365,8 @@ All variants derive `Serialize`/`Deserialize`. ### Executor-reported -Executors call `ctx.progress.report(percent, message)` or -`ctx.progress.report_fraction(completed, total, message)`. These emit +Executors call `ctx.progress().report(percent, message)` or +`ctx.progress().report_fraction(completed, total, message)`. These emit `SchedulerEvent::Progress` and update the active task map. ### Throughput-extrapolated @@ -510,7 +513,7 @@ app.manage(scheduler); // Scheduler is Clone — no Arc needed #[tauri::command] async fn submit_task( scheduler: tauri::State<'_, Scheduler>, -) -> Result, StoreError> { +) -> Result { scheduler.submit(&submission).await } diff --git a/docs/features.md b/docs/features.md index 83da43c..39af17b 100644 --- a/docs/features.md +++ b/docs/features.md @@ -49,13 +49,13 @@ A complete list of taskmill's capabilities. ## Retries -- **Automatic requeue** — retryable failures (`TaskError { retryable: true }`) are requeued at the same priority with `retry_count += 1`. +- **Automatic requeue** — retryable failures (`TaskError::retryable(msg)`) are requeued at the same priority with `retry_count += 1`. - **Configurable limit** — `max_retries` (default 3) controls how many times a task can be retried before permanent failure. - **Dedup preserved** — the key stays occupied during retries, preventing duplicate submission of in-progress work. ## Progress Reporting -- **Executor-reported progress** — report percentage or fraction-based progress via `ctx.progress.report()` or `ctx.progress.report_fraction()`. +- **Executor-reported progress** — report percentage or fraction-based progress via `ctx.progress().report()` or `ctx.progress().report_fraction()`. - **Throughput-based extrapolation** — for tasks without explicit reports, the scheduler extrapolates progress from historical average duration, capped at 99% to avoid false completion signals. - **Event-driven** — progress updates are emitted as `SchedulerEvent::Progress` for real-time UI updates. @@ -72,13 +72,19 @@ A complete list of taskmill's capabilities. ## Typed Payloads -- **Structured submission** — `TaskSubmission::with_payload()` serializes any `Serialize` type to JSON bytes. -- **Type-safe deserialization** — `TaskRecord::deserialize_payload::()` in executors. -- **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.deserialize_typed()`. +- **Builder-style submission** — `TaskSubmission::new(type).payload_json(&data)?.expected_io(r, w)` for ergonomic construction with serialization. +- **Type-safe deserialization** — `ctx.payload::()?` in executors for zero-boilerplate extraction. +- **TypedTask trait** — define `TASK_TYPE`, default priority, and expected IO on your struct. Submit with `scheduler.submit_typed()` and deserialize with `ctx.payload::()`. + +## Child Tasks + +- **Hierarchical execution** — spawn child tasks from an executor via `ctx.spawn_child()`. The parent enters a `waiting` state and resumes for finalization after all children complete. +- **Two-phase execution** — implement `TaskExecutor::finalize()` for assembly work after children finish (e.g. `CompleteMultipartUpload`). +- **Fail-fast** — when enabled (default), the first child failure cancels siblings and fails the parent immediately. ## Batch Operations -- **Bulk enqueue** — `submit_batch()` wraps many inserts in a single SQLite transaction. Returns `Vec>` where `None` indicates deduplication. +- **Bulk enqueue** — `submit_batch()` wraps many inserts in a single SQLite transaction. Returns `Vec` indicating whether each was inserted, upgraded, requeued, or deduplicated. ## Graceful Shutdown diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index 6ec709d..6c12bd6 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -9,25 +9,18 @@ Taskmill combines two independent gating mechanisms — IO budget tracking and c Every `TaskSubmission` includes expected IO: ```rust -let sub = TaskSubmission { - task_type: "scan".into(), - key: None, - priority: Priority::NORMAL, - payload: Some(data), - expected_read_bytes: 50_000, // caller's estimate - expected_write_bytes: 10_000, -}; +let sub = TaskSubmission::new("scan") + .payload_raw(data) + .expected_io(50_000, 10_000); // caller's estimate ``` ### Completion actuals -Executors report actual IO in `TaskResult`: +Executors report actual IO via context methods during execution: ```rust -Ok(TaskResult { - actual_read_bytes: 48_312, - actual_write_bytes: 9_876, -}) +ctx.record_read_bytes(48_312); +ctx.record_write_bytes(9_876); ``` Actual values are stored in `task_history` for learning. diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index 17cb840..c493c56 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -15,11 +15,14 @@ Holds pending, running, and paused tasks. | `id` | INTEGER PRIMARY KEY | Insertion-order ID | | `task_type` | TEXT NOT NULL | Executor lookup name | | `key` | TEXT NOT NULL UNIQUE | SHA-256 dedup key | +| `label` | TEXT NOT NULL | Human-readable display name (original dedup key or task type) | | `priority` | INTEGER NOT NULL | 0–255 (lower = higher priority) | -| `status` | TEXT DEFAULT 'pending' | `pending`, `running`, or `paused` | +| `status` | TEXT DEFAULT 'pending' | `pending`, `running`, `paused`, or `waiting` | | `payload` | BLOB | Opaque task data (max 1 MiB) | | `expected_read_bytes` | INTEGER | Estimated read IO | | `expected_write_bytes` | INTEGER | Estimated write IO | +| `parent_id` | INTEGER | Parent task ID for child tasks (NULL for top-level) | +| `fail_fast` | INTEGER DEFAULT 1 | Whether child failure cancels siblings and fails parent | | `retry_count` | INTEGER DEFAULT 0 | Number of retries so far | | `last_error` | TEXT | Most recent error message | | `created_at` | TEXT | ISO 8601 timestamp | @@ -31,7 +34,7 @@ Holds pending, running, and paused tasks. | Column | Type | Description | |--------|------|-------------| -| *(all columns from `tasks`)* | | | +| *(all columns from `tasks`, including `label`, `parent_id`, `fail_fast`)* | | | | `actual_read_bytes` | INTEGER | Reported by executor | | `actual_write_bytes` | INTEGER | Reported by executor | | `completed_at` | TEXT | ISO 8601 timestamp | @@ -61,29 +64,33 @@ This resets any tasks that were mid-execution when the process died. The behavio Every task gets a SHA-256 key: `SHA-256(task_type + ":" + (explicit_key OR payload))`. -- **Implicit key** — if no `key` is provided, the payload bytes are used. Tasks with the same type and payload get the same key. -- **Explicit key** — set `TaskSubmission.key` to control deduplication yourself. Useful when two payloads represent the same logical work (e.g., different timestamps but same file path). +- **Implicit key** — if no key is provided, the payload bytes are used. Tasks with the same type and payload get the same key. +- **Explicit key** — use the `.key()` builder method to control deduplication yourself. Useful when two payloads represent the same logical work (e.g., different timestamps but same file path). The explicit key is also stored as the display `label`. - **Type scoping** — the task type is always part of the hash, so `("resize", payload)` and `("compress", payload)` never collide. ### Lifecycle -A key is "occupied" while the task is in the `tasks` table (pending, running, paused, or retrying). When the task moves to `task_history` (completed or failed), the key is freed and can be resubmitted. +A key is "occupied" while the task is in the `tasks` table (pending, running, paused, waiting, or retrying). When the task moves to `task_history` (completed or failed), the key is freed and can be resubmitted. ### Submission behavior ```rust -// Returns Some(id) if inserted -let id = scheduler.submit(&submission).await?; // Ok(Some(42)) - -// Returns None if a task with the same key already exists -let id = scheduler.submit(&submission).await?; // Ok(None) +use taskmill::SubmitOutcome; + +let outcome = scheduler.submit(&submission).await?; +match outcome { + SubmitOutcome::Inserted(id) => println!("new task: {id}"), + SubmitOutcome::Duplicate => println!("already queued"), + SubmitOutcome::Upgraded(id) => println!("priority upgraded: {id}"), + SubmitOutcome::Requeued(id) => println!("requeued from history: {id}"), +} ``` `submit_batch()` applies the same dedup within a single transaction: ```rust -let ids = scheduler.submit_batch(&[sub1, sub2, sub3]).await?; -// ids = [Some(1), None, Some(2)] — sub2 was a duplicate +let outcomes = scheduler.submit_batch(&[sub1, sub2, sub3]).await?; +// outcomes: Vec — sub2 might be Duplicate ``` ### Looking up tasks by dedup key diff --git a/docs/priorities-and-preemption.md b/docs/priorities-and-preemption.md index 36b069a..557c66e 100644 --- a/docs/priorities-and-preemption.md +++ b/docs/priorities-and-preemption.md @@ -43,28 +43,24 @@ Executors should check for cancellation at natural yield points: impl TaskExecutor for MyExecutor { async fn execute<'a>( &'a self, ctx: &'a TaskContext, - ) -> Result { + ) -> Result<(), TaskError> { for chunk in chunks { // Check before each unit of work - if ctx.token.is_cancelled() { - return Err(TaskError { - message: "preempted".into(), - retryable: true, - actual_read_bytes: bytes_read_so_far, - actual_write_bytes: bytes_written_so_far, - }); + if ctx.token().is_cancelled() { + return Err(TaskError::retryable("preempted")); } process(chunk).await; - ctx.progress.report_fraction(i, total, None); + ctx.record_read_bytes(chunk.len() as i64); + ctx.progress().report_fraction(i, total, None); } - Ok(TaskResult { actual_read_bytes: total_read, actual_write_bytes: total_written }) + Ok(()) } } ``` -Returning a retryable error on preemption is optional — the scheduler handles pausing regardless. But it gives the executor a chance to report partial IO and clean up. +Returning a retryable error on preemption is optional — the scheduler handles pausing regardless. But it gives the executor a chance to clean up. ### Configuring preemption threshold diff --git a/docs/progress-reporting.md b/docs/progress-reporting.md index 2c65067..fc3ade3 100644 --- a/docs/progress-reporting.md +++ b/docs/progress-reporting.md @@ -4,26 +4,26 @@ Taskmill provides real-time progress tracking for running tasks, combining execu ## Reporting from executors -Executors receive a `ProgressReporter` via `ctx.progress`: +Executors receive a `ProgressReporter` via `ctx.progress()`: ```rust impl TaskExecutor for MyExecutor { async fn execute<'a>( &'a self, ctx: &'a TaskContext, - ) -> Result { + ) -> Result<(), TaskError> { let items = get_work_items(); for (i, item) in items.iter().enumerate() { process(item).await; // Percentage-based (0.0 to 1.0) - ctx.progress.report( + ctx.progress().report( (i + 1) as f32 / items.len() as f32, Some(format!("processed {}/{}", i + 1, items.len())), ); } - Ok(TaskResult { actual_read_bytes: 0, actual_write_bytes: 0 }) + Ok(()) } } ``` @@ -33,7 +33,7 @@ impl TaskExecutor for MyExecutor { For count-based progress: ```rust -ctx.progress.report_fraction(processed, total, Some("importing".into())); +ctx.progress().report_fraction(processed, total, Some("importing".into())); // Automatically computes: processed as f32 / total as f32 ``` @@ -46,6 +46,7 @@ SchedulerEvent::Progress { task_id: 42, task_type: "resize".into(), key: "abc123".into(), + label: "my-image.jpg".into(), percent: 0.5, message: Some("resizing".into()), } @@ -107,12 +108,12 @@ All scheduler state changes are broadcast as `SchedulerEvent` variants: | Event | When | |-------|------| -| `Dispatched { task_id, task_type, key }` | Task popped from queue and executor spawned | -| `Completed { task_id, task_type, key }` | Task finished successfully | -| `Failed { task_id, task_type, key, error, will_retry }` | Task failed (includes whether it will be retried) | -| `Preempted { task_id, task_type, key }` | Task paused for higher-priority work | -| `Cancelled { task_id, task_type, key }` | Task cancelled via `scheduler.cancel()` | -| `Progress { task_id, task_type, key, percent, message }` | Progress update from executor | +| `Dispatched { task_id, task_type, key, label }` | Task popped from queue and executor spawned | +| `Completed { task_id, task_type, key, label }` | Task finished successfully | +| `Failed { task_id, task_type, key, label, error, will_retry }` | Task failed (includes whether it will be retried) | +| `Preempted { task_id, task_type, key, label }` | Task paused for higher-priority work | +| `Cancelled { task_id, task_type, key, label }` | Task cancelled via `scheduler.cancel()` | +| `Progress { task_id, task_type, key, label, percent, message }` | Progress update from executor | | `Paused` | Scheduler globally paused via `pause_all()` | | `Resumed` | Scheduler resumed via `resume_all()` | diff --git a/docs/quick-start.md b/docs/quick-start.md index bc3b871..d9c7492 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -18,16 +18,16 @@ taskmill = { path = "crates/taskmill", default-features = false } ## Implement an executor -Each task type needs a `TaskExecutor` implementation. The executor receives a `TaskContext` containing: +Each task type needs a `TaskExecutor` implementation. The executor receives a `TaskContext` with accessor methods: -- `record` — the full `TaskRecord` with payload (up to 1 MiB), priority, retry count, etc. -- `token` — a `CancellationToken` for preemption support -- `progress` — a `ProgressReporter` for reporting progress back to the scheduler -- Shared application state (if registered via `.app_state()` or `register_state()`) +- `record()` — the full `TaskRecord` with payload (up to 1 MiB), priority, retry count, etc. +- `token()` — a `CancellationToken` for preemption support +- `progress()` — a `ProgressReporter` for reporting progress back to the scheduler +- `state::()` — shared application state (if registered via `.app_state()` or `register_state()`) ```rust use std::sync::Arc; -use taskmill::{TaskExecutor, TaskContext, TaskResult, TaskError}; +use taskmill::{TaskExecutor, TaskContext, TaskError}; struct ImageResizer; @@ -35,29 +35,24 @@ impl TaskExecutor for ImageResizer { async fn execute<'a>( &'a self, ctx: &'a TaskContext, - ) -> Result { + ) -> Result<(), TaskError> { // Deserialize your payload - let data: Option = ctx.record.deserialize_payload()?; + let data: Option = ctx.record().deserialize_payload()?; // Check for preemption at yield points - if ctx.token.is_cancelled() { - return Err(TaskError { - message: "preempted".into(), - retryable: true, - actual_read_bytes: 0, - actual_write_bytes: 0, - }); + if ctx.token().is_cancelled() { + return Err(TaskError::retryable("preempted")); } // Report progress - ctx.progress.report(0.5, Some("resizing".into())); + ctx.progress().report(0.5, Some("resizing".into())); // Do work... - Ok(TaskResult { - actual_read_bytes: 4096, - actual_write_bytes: 1024, - }) + // Report actual IO + ctx.record_read_bytes(4096); + ctx.record_write_bytes(1024); + Ok(()) } } ``` @@ -95,26 +90,22 @@ async fn main() { }); // Submit a single task with a typed payload. - scheduler.submit(&TaskSubmission::with_payload( - "resize", - Priority::NORMAL, - &serde_json::json!({"path": "/photos/image.jpg", "width": 300}), - 4096, // expected read bytes - 1024, // expected write bytes - ).unwrap()).await.unwrap(); + let sub = TaskSubmission::new("resize") + .payload_json(&serde_json::json!({"path": "/photos/image.jpg", "width": 300})) + .unwrap() + .expected_io(4096, 1024); + scheduler.submit(&sub).await.unwrap(); // Submit tasks in bulk (single SQLite transaction). let paths = vec!["/a.jpg", "/b.jpg", "/c.jpg"]; let batch: Vec<_> = paths.iter().map(|p| { - TaskSubmission::with_payload( - "resize", - Priority::NORMAL, - &serde_json::json!({"path": p}), - 4096, 1024, - ).unwrap() + TaskSubmission::new("resize") + .payload_json(&serde_json::json!({"path": p})) + .unwrap() + .expected_io(4096, 1024) }).collect(); - let ids = scheduler.submit_batch(&batch).await.unwrap(); - // ids[i] is Some(row_id) if inserted, None if deduplicated. + let outcomes = scheduler.submit_batch(&batch).await.unwrap(); + // Each outcome is Inserted, Upgraded, Requeued, or Duplicate. // Run the scheduler loop (blocks until the token is cancelled). let token = CancellationToken::new(); @@ -151,7 +142,7 @@ scheduler.submit_typed(&ResizeTask { }).await?; // In the executor: -let task: Option = ctx.deserialize_typed()?; +let task: ResizeTask = ctx.payload()?; ``` ## Manual wiring diff --git a/migrations/002_add_label.sql b/migrations/002_add_label.sql new file mode 100644 index 0000000..060e7cb --- /dev/null +++ b/migrations/002_add_label.sql @@ -0,0 +1,5 @@ +-- Add a human-readable label column to both tables. +-- The label stores the original dedup_key (or task_type if no explicit key), +-- while the `key` column retains the SHA-256 hash for dedup. +ALTER TABLE tasks ADD COLUMN label TEXT NOT NULL DEFAULT ''; +ALTER TABLE task_history ADD COLUMN label TEXT NOT NULL DEFAULT ''; diff --git a/src/lib.rs b/src/lib.rs index f358875..98e63fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -108,8 +108,8 @@ //! &'a self, ctx: &'a TaskContext, //! ) -> Result<(), TaskError> { //! let thumb: Thumbnail = ctx.payload()?; -//! ctx.progress.report(0.5, Some("resizing".into())); -//! // ... do work, check ctx.token.is_cancelled() ... +//! ctx.progress().report(0.5, Some("resizing".into())); +//! // ... do work, check ctx.token().is_cancelled() ... //! ctx.record_read_bytes(4_096); //! ctx.record_write_bytes(1_024); //! Ok(()) @@ -229,16 +229,13 @@ //! async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { //! let upload: MultipartUpload = ctx.payload()?; //! for part in &upload.parts { -//! ctx.spawn_child(TaskSubmission { -//! task_type: "upload-part".into(), -//! dedup_key: Some(part.etag.clone()), -//! priority: ctx.record.priority, -//! payload: Some(serde_json::to_vec(part)?), -//! expected_read_bytes: part.size as i64, -//! expected_write_bytes: 0, -//! parent_id: None, // set automatically by spawn_child -//! fail_fast: true, -//! }).await?; +//! ctx.spawn_child( +//! TaskSubmission::new("upload-part") +//! .key(&part.etag) +//! .priority(ctx.record().priority) +//! .payload_json(part)? +//! .expected_io(part.size as i64, 0), +//! ).await?; //! } //! Ok(()) //! } diff --git a/src/registry.rs b/src/registry.rs index 8642c46..c8ec0fb 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -164,36 +164,45 @@ impl IoTracker { /// Execution context passed to a [`TaskExecutor`]. /// -/// Bundles the task record, cancellation token, progress reporter, and -/// optional application state into a single value. This keeps the executor -/// signature stable when new contextual data is added in the future. +/// Provides access to the task record, cancellation token, progress reporter, +/// shared application state, and scoped task submission. Use the accessor +/// methods rather than accessing fields directly: +/// +/// - [`record()`](Self::record) — the full [`TaskRecord`] with payload, priority, etc. +/// - [`token()`](Self::token) — [`CancellationToken`] for preemption support +/// - [`progress()`](Self::progress) — [`ProgressReporter`] for reporting progress +/// - [`submit()`](Self::submit) / [`submit_typed()`](Self::submit_typed) — submit continuation tasks +/// - [`spawn_child()`](Self::spawn_child) — spawn hierarchical child tasks pub struct TaskContext { - /// The full task record including payload, priority, and IO estimates. - pub record: TaskRecord, - /// Cancelled when the task is preempted. Check `token.is_cancelled()` - /// at natural yield points and return early if set. - pub token: CancellationToken, - /// Report progress back to the scheduler (0.0–1.0). - pub progress: ProgressReporter, - /// Handle to the scheduler that dispatched this task. Allows executors to - /// submit continuation tasks, look up other tasks, etc. without needing - /// a separate `OnceLock` or `Arc` in application state. - pub scheduler: Scheduler, - /// Shared application state set via [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state). + pub(crate) record: TaskRecord, + pub(crate) token: CancellationToken, + pub(crate) progress: ProgressReporter, + pub(crate) scheduler: Scheduler, pub(crate) app_state: StateSnapshot, - /// Spawner for creating child tasks via [`spawn_child`](Self::spawn_child) - /// and [`spawn_children`](Self::spawn_children). Present for all tasks - /// dispatched by the scheduler — the parent relationship is set automatically - /// when children are spawned. pub(crate) child_spawner: Option, - /// IO bytes accumulator fed by [`record_read_bytes`](Self::record_read_bytes) - /// and [`record_write_bytes`](Self::record_write_bytes). The scheduler reads - /// the final totals after the executor returns and stores them in history - /// for future IO budget estimation. pub(crate) io: Arc, } impl TaskContext { + // ── Accessors ──────────────────────────────────────────────────── + + /// The persisted task record (id, key, priority, payload, etc.). + pub fn record(&self) -> &TaskRecord { + &self.record + } + + /// Cancellation token — check `token().is_cancelled()` for preemption. + pub fn token(&self) -> &CancellationToken { + &self.token + } + + /// Progress reporter for this task. + pub fn progress(&self) -> &ProgressReporter { + &self.progress + } + + // ── Payload ────────────────────────────────────────────────────── + /// Deserialize the payload as a [`TypedTask`]. /// /// Returns an error if the payload is missing or deserialization fails. @@ -215,14 +224,7 @@ impl TaskContext { .ok_or_else(|| TaskError::new("missing payload")) } - /// Deserialize the payload as a [`TypedTask`]. - /// - /// Convenience wrapper around [`TaskRecord::deserialize_payload`] that - /// mirrors the typed submission API. - #[deprecated(since = "2.0.0", note = "use `ctx.payload::()` instead")] - pub fn deserialize_typed(&self) -> Result, serde_json::Error> { - self.record.deserialize_payload() - } + // ── Shared state ───────────────────────────────────────────────── /// Retrieve shared application state registered via /// [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state) or @@ -244,6 +246,8 @@ impl TaskContext { self.app_state.get::() } + // ── IO tracking ────────────────────────────────────────────────── + /// Record actual bytes read during this task's execution. /// /// Can be called multiple times — values are accumulated. The scheduler @@ -260,6 +264,34 @@ impl TaskContext { self.io.write_bytes.fetch_add(bytes, Ordering::Relaxed); } + // ── Task submission (scoped scheduler access) ──────────────────── + + /// Submit a continuation or follow-up task. + /// + /// This is the primary way to enqueue new work from inside an executor + /// without exposing the full [`Scheduler`](crate::Scheduler) handle. + pub async fn submit(&self, sub: &TaskSubmission) -> Result { + self.scheduler.submit(sub).await + } + + /// Submit a [`TypedTask`], handling serialization automatically. + /// + /// Uses the priority from [`TypedTask::priority()`]. + pub async fn submit_typed(&self, task: &T) -> Result { + self.scheduler.submit_typed(task).await + } + + /// Submit a [`TypedTask`] with an explicit priority override. + pub async fn submit_typed_at( + &self, + task: &T, + priority: crate::Priority, + ) -> Result { + self.scheduler.submit_typed_at(task, priority).await + } + + // ── Child tasks ────────────────────────────────────────────────── + /// Spawn a child task that will be tracked under this task as parent. /// /// The child's `parent_id` is set automatically. Returns the submit @@ -308,7 +340,7 @@ impl TaskContext { /// &'a self, /// ctx: &'a TaskContext, /// ) -> Result<(), TaskError> { -/// ctx.progress.report(0.5, Some("halfway".into())); +/// ctx.progress().report(0.5, Some("halfway".into())); /// Ok(()) /// } /// } diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 2ec890c..df26d01 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -127,6 +127,7 @@ impl ActiveTaskMap { task_id: id, task_type: at.record.task_type.clone(), key: at.record.key.clone(), + label: at.record.label.clone(), }); preempted.push(id); } @@ -172,6 +173,7 @@ impl ActiveTaskMap { task_id: id, task_type: at.record.task_type.clone(), key: at.record.key.clone(), + label: at.record.label.clone(), }); } count @@ -242,6 +244,7 @@ pub(crate) async fn spawn_task( task.id, task.task_type.clone(), task.key.clone(), + task.label.clone(), event_tx.clone(), ), scheduler, @@ -255,6 +258,7 @@ pub(crate) async fn spawn_task( task_id: task.id, task_type: task.task_type.clone(), key: task.key.clone(), + label: task.label.clone(), }); // Spawn progress listener — bridges broadcast events into the active map. @@ -341,6 +345,7 @@ pub(crate) async fn spawn_task( task_id, task_type: task.task_type.clone(), key: task.key.clone(), + label: task.label.clone(), }); // If this was a child task, check if parent is ready. @@ -383,6 +388,7 @@ pub(crate) async fn spawn_task( task_id, task_type: task.task_type.clone(), key: task.key.clone(), + label: task.label.clone(), error: te.message.clone(), will_retry, }); @@ -404,6 +410,7 @@ pub(crate) async fn spawn_task( task_id: *rid, task_type: at.record.task_type.clone(), key: at.record.key.clone(), + label: at.record.label.clone(), }); } } @@ -424,6 +431,7 @@ pub(crate) async fn spawn_task( task_id: parent_id, task_type: parent.task_type.clone(), key: parent.key.clone(), + label: parent.label.clone(), error: msg, will_retry: false, }); @@ -477,6 +485,7 @@ async fn handle_parent_resolution( task_id: parent_id, task_type: parent.task_type.clone(), key: parent.key.clone(), + label: parent.label.clone(), error: reason, will_retry: false, }); diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 604b4f7..b14b262 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -78,18 +78,21 @@ pub enum SchedulerEvent { task_id: i64, task_type: String, key: String, + label: String, }, /// A task completed successfully. Completed { task_id: i64, task_type: String, key: String, + label: String, }, /// A task failed (may be retried or permanently failed). Failed { task_id: i64, task_type: String, key: String, + label: String, error: String, will_retry: bool, }, @@ -98,18 +101,21 @@ pub enum SchedulerEvent { task_id: i64, task_type: String, key: String, + label: String, }, /// A task was cancelled by the application. Cancelled { task_id: i64, task_type: String, key: String, + label: String, }, /// Progress update from a running task. Progress { task_id: i64, task_type: String, key: String, + label: String, /// Progress percentage (0.0 to 1.0). percent: f32, /// Optional human-readable message from the executor. @@ -460,6 +466,7 @@ impl Scheduler { task_id: *child_id, task_type: at.record.task_type.clone(), key: at.record.key.clone(), + label: at.record.label.clone(), }); } } @@ -472,6 +479,7 @@ impl Scheduler { task_id, task_type: at.record.task_type.clone(), key: at.record.key.clone(), + label: at.record.label.clone(), }); return Ok(true); } @@ -1138,7 +1146,7 @@ mod tests { impl TaskExecutor for SlowExecutor { async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { tokio::select! { - _ = ctx.token.cancelled() => { + _ = ctx.token().cancelled() => { Err(TaskError::new("cancelled")) } _ = tokio::time::sleep(Duration::from_secs(60)) => { @@ -1182,16 +1190,7 @@ mod tests { let sched = setup(arc_erased(InstantExecutor)).await; sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("k1".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("k1")) .await .unwrap(); @@ -1229,16 +1228,7 @@ mod tests { ); sched - .submit(&TaskSubmission { - task_type: "unknown".into(), - dedup_key: Some("k".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("unknown").key("k")) .await .unwrap(); @@ -1253,16 +1243,7 @@ mod tests { async fn dedup_via_scheduler() { let sched = setup(arc_erased(InstantExecutor)).await; - let sub = TaskSubmission { - task_type: "test".into(), - dedup_key: Some("dup".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }; + let sub = TaskSubmission::new("test").key("dup"); let first = sched.submit(&sub).await.unwrap(); let second = sched.submit(&sub).await.unwrap(); @@ -1283,16 +1264,7 @@ mod tests { let sched = setup(arc_erased(InstantExecutor)).await; let id = sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("cancel-me".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("cancel-me")) .await .unwrap() .id() @@ -1316,16 +1288,7 @@ mod tests { let sched = setup(arc_erased(SlowExecutor)).await; let id = sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("cancel-running".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("cancel-running")) .await .unwrap() .id() @@ -1345,16 +1308,7 @@ mod tests { let mut rx = sched.subscribe(); sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("evt".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("evt")) .await .unwrap(); @@ -1378,16 +1332,7 @@ mod tests { // Both should share the same store. sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("shared".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("shared")) .await .unwrap(); @@ -1449,16 +1394,7 @@ mod tests { // Submit two tasks. for key in &["snap-a", "snap-b"] { sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some(key.to_string()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key(*key)) .await .unwrap(); } @@ -1485,16 +1421,7 @@ mod tests { // Submit two tasks. for key in &["pa-1", "pa-2"] { sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some(key.to_string()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key(*key)) .await .unwrap(); } @@ -1565,16 +1492,7 @@ mod tests { .unwrap(); sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("state-test".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("state-test")) .await .unwrap(); @@ -1589,16 +1507,7 @@ mod tests { let sched = setup(arc_erased(InstantExecutor)).await; sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("lookup-1".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("lookup-1")) .await .unwrap(); @@ -1614,16 +1523,7 @@ mod tests { let sched = setup(arc_erased(InstantExecutor)).await; sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("lookup-done".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("lookup-done")) .await .unwrap(); @@ -1681,16 +1581,9 @@ mod tests { impl TaskExecutor for SpawningExecutor { async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { for i in 0..self.num_children { - let sub = TaskSubmission { - task_type: "child".into(), - dedup_key: Some(format!("child-{i}")), - priority: ctx.record.priority, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, // spawn_child sets this - fail_fast: true, - }; + let sub = TaskSubmission::new("child") + .key(format!("child-{i}")) + .priority(ctx.record().priority); ctx.spawn_child(sub).await?; } Ok(()) @@ -1706,16 +1599,9 @@ mod tests { impl TaskExecutor for FinalizeTrackingExecutor { async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { for i in 0..self.children { - let sub = TaskSubmission { - task_type: "child".into(), - dedup_key: Some(format!("ft-child-{i}")), - priority: ctx.record.priority, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }; + let sub = TaskSubmission::new("child") + .key(format!("ft-child-{i}")) + .priority(ctx.record().priority); ctx.spawn_child(sub).await?; } Ok(()) @@ -1746,16 +1632,7 @@ mod tests { // Submit parent task. sched - .submit(&TaskSubmission { - task_type: "parent".into(), - dedup_key: Some("p1".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("parent").key("p1")) .await .unwrap(); @@ -1806,16 +1683,7 @@ mod tests { let mut rx = sched.subscribe(); sched - .submit(&TaskSubmission { - task_type: "parent".into(), - dedup_key: Some("p-complete".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("parent").key("p-complete")) .await .unwrap(); @@ -1879,16 +1747,7 @@ mod tests { let mut rx = sched.subscribe(); sched - .submit(&TaskSubmission { - task_type: "parent".into(), - dedup_key: Some("p-finalize".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("parent").key("p-finalize")) .await .unwrap(); @@ -1935,16 +1794,7 @@ mod tests { ); let parent_id = sched - .submit(&TaskSubmission { - task_type: "parent".into(), - dedup_key: Some("p-cancel".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("parent").key("p-cancel")) .await .unwrap() .id() @@ -1969,16 +1819,7 @@ mod tests { let sched = setup(arc_erased(InstantExecutor)).await; sched - .submit(&TaskSubmission { - task_type: "test".into(), - dedup_key: Some("no-kids".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }) + .submit(&TaskSubmission::new("test").key("no-kids")) .await .unwrap(); diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 199638d..99324c8 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -29,7 +29,7 @@ use super::SchedulerEvent; /// let items = vec![/* ... */]; /// for (i, item) in items.iter().enumerate() { /// // process item... -/// ctx.progress.report_fraction(i as u64 + 1, items.len() as u64, None); +/// ctx.progress().report_fraction(i as u64 + 1, items.len() as u64, None); /// } /// Ok(()) /// } @@ -39,6 +39,7 @@ pub struct ProgressReporter { task_id: i64, task_type: String, key: String, + label: String, event_tx: tokio::sync::broadcast::Sender, } @@ -47,12 +48,14 @@ impl ProgressReporter { task_id: i64, task_type: String, key: String, + label: String, event_tx: tokio::sync::broadcast::Sender, ) -> Self { Self { task_id, task_type, key, + label, event_tx, } } @@ -63,6 +66,7 @@ impl ProgressReporter { task_id: self.task_id, task_type: self.task_type.clone(), key: self.key.clone(), + label: self.label.clone(), percent: percent.clamp(0.0, 1.0), message, }); @@ -88,6 +92,7 @@ pub struct EstimatedProgress { pub task_id: i64, pub task_type: String, pub key: String, + pub label: String, /// Executor-reported progress (0.0 to 1.0), if available. pub reported_percent: Option, /// Throughput-extrapolated progress (0.0 to 1.0), if history data exists. @@ -148,6 +153,7 @@ pub(crate) async fn extrapolate( task_id: record.id, task_type: record.task_type.clone(), key: record.key.clone(), + label: record.label.clone(), reported_percent: reported, extrapolated_percent: extrapolated, percent, diff --git a/src/store.rs b/src/store.rs index 0dc3a6c..f82006f 100644 --- a/src/store.rs +++ b/src/store.rs @@ -114,16 +114,10 @@ impl Default for StoreConfig { /// let store = TaskStore::open_memory().await?; /// /// // Submit a task. -/// let sub = TaskSubmission { -/// task_type: "thumbnail".into(), -/// dedup_key: Some("photo-1".into()), -/// priority: Priority::NORMAL, -/// payload: Some(br#"{"path":"/a.jpg"}"#.to_vec()), -/// expected_read_bytes: 4096, -/// expected_write_bytes: 1024, -/// parent_id: None, -/// fail_fast: true, -/// }; +/// let sub = TaskSubmission::new("thumbnail") +/// .key("photo-1") +/// .payload_raw(br#"{"path":"/a.jpg"}"#.to_vec()) +/// .expected_io(4096, 1024); /// let outcome = store.submit(&sub).await?; /// assert!(outcome.is_inserted()); /// @@ -204,6 +198,9 @@ impl TaskStore { sqlx::raw_sql(include_str!("../migrations/001_tasks.sql")) .execute(&self.pool) .await?; + sqlx::raw_sql(include_str!("../migrations/002_add_label.sql")) + .execute(&self.pool) + .await?; Ok(()) } @@ -266,11 +263,12 @@ impl TaskStore { let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT start"); let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) + .bind(&sub.label) .bind(priority) .bind(&sub.payload) .bind(sub.expected_read_bytes) @@ -352,11 +350,12 @@ impl TaskStore { let priority = sub.priority.value() as i32; let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) + .bind(&sub.label) .bind(priority) .bind(&sub.payload) .bind(sub.expected_read_bytes) @@ -513,13 +512,14 @@ impl TaskStore { // Insert into history. let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; sqlx::query( - "INSERT INTO task_history (task_type, key, priority, status, payload, + "INSERT INTO task_history (task_type, key, label, priority, status, payload, expected_read_bytes, expected_write_bytes, actual_read_bytes, actual_write_bytes, retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast) - VALUES (?, ?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + VALUES (?, ?, ?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) + .bind(&task.label) .bind(task.priority.value() as i32) .bind(&task.payload) .bind(task.expected_read_bytes) @@ -624,13 +624,14 @@ impl TaskStore { let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; sqlx::query( - "INSERT INTO task_history (task_type, key, priority, status, payload, + "INSERT INTO task_history (task_type, key, label, priority, status, payload, expected_read_bytes, expected_write_bytes, actual_read_bytes, actual_write_bytes, retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast) - VALUES (?, ?, ?, 'failed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + VALUES (?, ?, ?, ?, 'failed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) + .bind(&task.label) .bind(task.priority.value() as i32) .bind(&task.payload) .bind(task.expected_read_bytes) @@ -1163,6 +1164,7 @@ fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { id: row.get("id"), task_type: row.get("task_type"), key: row.get("key"), + label: row.get("label"), priority: Priority::new(priority_val as u8), status: status_str.parse().unwrap_or(TaskStatus::Pending), payload: row.get("payload"), @@ -1192,6 +1194,7 @@ fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistoryRecord { id: row.get("id"), task_type: row.get("task_type"), key: row.get("key"), + label: row.get("label"), priority: Priority::new(priority_val as u8), status: status_str.parse().unwrap_or(HistoryStatus::Failed), payload: row.get("payload"), @@ -1219,16 +1222,11 @@ mod tests { } fn make_submission(key: &str, priority: Priority) -> TaskSubmission { - TaskSubmission { - task_type: "test".into(), - dedup_key: Some(key.into()), - priority, - payload: Some(b"hello".to_vec()), - expected_read_bytes: 1000, - expected_write_bytes: 500, - parent_id: None, - fail_fast: true, - } + TaskSubmission::new("test") + .key(key) + .priority(priority) + .payload_raw(b"hello".to_vec()) + .expected_io(1000, 500) } #[tokio::test] @@ -1393,26 +1391,8 @@ mod tests { async fn dedup_allows_same_key_different_types() { let store = test_store().await; - let sub_a = TaskSubmission { - task_type: "type_a".into(), - dedup_key: Some("shared-key".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }; - let sub_b = TaskSubmission { - task_type: "type_b".into(), - dedup_key: Some("shared-key".into()), - priority: Priority::NORMAL, - payload: None, - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }; + let sub_a = TaskSubmission::new("type_a").key("shared-key"); + let sub_b = TaskSubmission::new("type_b").key("shared-key"); let first = store.submit(&sub_a).await.unwrap(); assert!(first.is_inserted()); @@ -1426,16 +1406,7 @@ mod tests { async fn dedup_by_payload_when_no_key() { let store = test_store().await; - let sub = TaskSubmission { - task_type: "ingest".into(), - dedup_key: None, - priority: Priority::NORMAL, - payload: Some(b"same-data".to_vec()), - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }; + let sub = TaskSubmission::new("ingest").payload_raw(b"same-data".to_vec()); let first = store.submit(&sub).await.unwrap(); assert!(first.is_inserted()); @@ -1445,10 +1416,7 @@ mod tests { assert_eq!(second, SubmitOutcome::Duplicate); // Different payload → no dedup. - let sub2 = TaskSubmission { - payload: Some(b"different-data".to_vec()), - ..sub.clone() - }; + let sub2 = TaskSubmission::new("ingest").payload_raw(b"different-data".to_vec()); let third = store.submit(&sub2).await.unwrap(); assert!(third.is_inserted()); } @@ -1930,16 +1898,9 @@ mod tests { async fn submit_batch_rejects_oversized_payload() { let store = test_store().await; let sub = make_submission("ok", Priority::NORMAL); - let big = TaskSubmission { - task_type: "test".into(), - dedup_key: Some("big".into()), - priority: Priority::NORMAL, - payload: Some(vec![0u8; MAX_PAYLOAD_BYTES + 1]), - expected_read_bytes: 0, - expected_write_bytes: 0, - parent_id: None, - fail_fast: true, - }; + let big = TaskSubmission::new("test") + .key("big") + .payload_raw(vec![0u8; MAX_PAYLOAD_BYTES + 1]); // The oversized payload should fail the entire batch — no partial inserts. let err = store.submit_batch(&[sub.clone(), big]).await.unwrap_err(); diff --git a/src/task.rs b/src/task.rs index 4e5ddcf..46a0f76 100644 --- a/src/task.rs +++ b/src/task.rs @@ -93,6 +93,10 @@ pub struct TaskRecord { pub id: i64, pub task_type: String, pub key: String, + /// Human-readable label for UI display. Carries the original dedup key + /// (or `task_type` if no explicit key was given). The `key` field holds + /// the SHA-256 hash used for deduplication. + pub label: String, pub priority: Priority, pub status: TaskStatus, pub payload: Option>, @@ -132,6 +136,8 @@ pub struct TaskHistoryRecord { pub id: i64, pub task_type: String, pub key: String, + /// Human-readable label for UI display (see [`TaskRecord::label`]). + pub label: String, pub priority: Priority, pub status: HistoryStatus, pub payload: Option>, @@ -276,6 +282,22 @@ pub fn generate_dedup_key(task_type: &str, payload: Option<&[u8]>) -> String { } /// Parameters for submitting a new task. +/// +/// Use the builder-style constructor [`TaskSubmission::new`] for ergonomic +/// construction with sensible defaults: +/// +/// ```ignore +/// use taskmill::{TaskSubmission, Priority}; +/// +/// let sub = TaskSubmission::new("thumbnail") +/// .key("img-001") +/// .priority(Priority::HIGH) +/// .payload_json(&my_payload)? +/// .expected_io(4096, 1024); +/// ``` +/// +/// For strongly-typed tasks, prefer [`TaskSubmission::from_typed`] or +/// [`Scheduler::submit_typed`](crate::Scheduler::submit_typed). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskSubmission { pub task_type: String, @@ -283,6 +305,9 @@ pub struct TaskSubmission { /// `task_type` and `payload`, so two submissions with the same type and /// payload are deduplicated automatically. pub dedup_key: Option, + /// Human-readable label for UI display. Defaults to `dedup_key` (if set) + /// or `task_type`. Override with [`label()`](Self::label) if needed. + pub label: String, pub priority: Priority, pub payload: Option>, pub expected_read_bytes: i64, @@ -298,6 +323,104 @@ pub struct TaskSubmission { } impl TaskSubmission { + /// Create a new submission with sensible defaults. + /// + /// Defaults: `Priority::NORMAL`, no payload, no dedup key, zero IO + /// estimates, no parent, fail-fast enabled. + /// + /// Chain builder methods to customise: + /// + /// ```ignore + /// TaskSubmission::new("resize") + /// .key("my-file.jpg") + /// .priority(Priority::HIGH) + /// .payload_json(&data)? + /// .expected_io(4096, 1024) + /// ``` + pub fn new(task_type: impl Into) -> Self { + let task_type = task_type.into(); + let label = task_type.clone(); + Self { + task_type, + dedup_key: None, + label, + priority: Priority::NORMAL, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, + fail_fast: true, + } + } + + /// Set an explicit dedup key. + /// + /// When set, deduplication is based on `hash(task_type + ":" + key)` + /// instead of the payload contents. Useful when different payloads + /// represent the same logical work (e.g. same file path with different + /// timestamps). + pub fn key(mut self, key: impl Into) -> Self { + let key = key.into(); + self.label = key.clone(); + self.dedup_key = Some(key); + self + } + + /// Override the display label. + /// + /// By default the label is derived from the dedup key (if set via + /// [`key()`](Self::key)) or the task type. Use this to provide a + /// custom human-readable description for UI display. + pub fn label(mut self, label: impl Into) -> Self { + self.label = label.into(); + self + } + + /// Set the scheduling priority. Default: [`Priority::NORMAL`]. + pub fn priority(mut self, priority: Priority) -> Self { + self.priority = priority; + self + } + + /// Set the payload from a serializable value (JSON-encoded). + /// + /// Returns an error if serialization fails. The payload can be + /// deserialized in the executor via [`TaskContext::payload`](crate::TaskContext::payload). + pub fn payload_json( + mut self, + data: &T, + ) -> Result { + self.payload = Some(serde_json::to_vec(data)?); + Ok(self) + } + + /// Set the payload from raw bytes. + pub fn payload_raw(mut self, data: Vec) -> Self { + self.payload = Some(data); + self + } + + /// Set expected IO bytes for budget-based scheduling. + /// + /// The scheduler uses these estimates to avoid saturating disk throughput + /// when [resource monitoring](crate::SchedulerBuilder::with_resource_monitoring) + /// is enabled. Default: 0 for both. + pub fn expected_io(mut self, read_bytes: i64, write_bytes: i64) -> Self { + self.expected_read_bytes = read_bytes; + self.expected_write_bytes = write_bytes; + self + } + + /// Set the fail-fast flag for parent tasks that spawn children. + /// + /// When `true` (the default), the first child failure cancels siblings + /// and fails the parent immediately. When `false`, the parent waits for + /// all children to finish before resolving. + pub fn fail_fast(mut self, fail_fast: bool) -> Self { + self.fail_fast = fail_fast; + self + } + /// Resolve the effective dedup key. Always incorporates the task type /// so different task types never collide, even with the same logical key. /// @@ -314,6 +437,10 @@ impl TaskSubmission { /// /// The dedup key is auto-generated from the task type and serialized payload. /// Use `TaskRecord::deserialize_payload()` on the executor side to recover the type. + #[deprecated( + since = "2.0.0", + note = "use `TaskSubmission::new(task_type).payload_json(&data)?.priority(p).expected_io(r, w)` instead" + )] pub fn with_payload( task_type: &str, priority: Priority, @@ -321,17 +448,10 @@ impl TaskSubmission { expected_read_bytes: i64, expected_write_bytes: i64, ) -> Result { - let payload = serde_json::to_vec(data)?; - Ok(Self { - task_type: task_type.to_string(), - dedup_key: None, - priority, - payload: Some(payload), - expected_read_bytes, - expected_write_bytes, - parent_id: None, - fail_fast: true, - }) + Ok(Self::new(task_type) + .priority(priority) + .payload_json(data)? + .expected_io(expected_read_bytes, expected_write_bytes)) } } @@ -384,17 +504,10 @@ impl TaskSubmission { /// Create a submission from a [`TypedTask`], serializing the payload and /// pulling task type, priority, and IO estimates from the trait. pub fn from_typed(task: &T) -> Result { - let payload = serde_json::to_vec(task)?; - Ok(Self { - task_type: T::TASK_TYPE.to_string(), - dedup_key: None, - priority: task.priority(), - payload: Some(payload), - expected_read_bytes: task.expected_read_bytes(), - expected_write_bytes: task.expected_write_bytes(), - parent_id: None, - fail_fast: true, - }) + Ok(Self::new(T::TASK_TYPE) + .priority(task.priority()) + .payload_json(task)? + .expected_io(task.expected_read_bytes(), task.expected_write_bytes())) } }