From 24687480a8988089429101faf588e165eb0bf35b Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 14 Mar 2026 13:08:22 -0700 Subject: [PATCH 1/3] feat: add bulk task submission enhancements (#16) Add BatchOutcome summary, BatchSubmission builder with batch-wide defaults, SchedulerEvent::BatchSubmitted for observability, intra-batch last-wins dedup, and transaction chunking for large batches (>10k). --- src/lib.rs | 5 +- src/scheduler/event.rs | 11 ++- src/scheduler/submit.rs | 37 +++++++-- src/scheduler/tests.rs | 69 +++++++++++++++++ src/store/submit.rs | 88 +++++++++++++++++++-- src/task/mod.rs | 2 +- src/task/submission.rs | 168 ++++++++++++++++++++++++++++++++++++++++ src/task/tests.rs | 38 ++++++++- 8 files changed, 402 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b998884..25af5e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -372,8 +372,9 @@ pub use scheduler::{ }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ - generate_dedup_key, HistoryStatus, IoBudget, ParentResolution, SubmitOutcome, TaskError, - TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, TaskSubmission, TypeStats, TypedTask, + generate_dedup_key, BatchOutcome, BatchSubmission, HistoryStatus, IoBudget, ParentResolution, + SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, TaskSubmission, + TypeStats, TypedTask, }; #[cfg(feature = "sysinfo-monitor")] diff --git a/src/scheduler/event.rs b/src/scheduler/event.rs index 1693cb3..a3bf94e 100644 --- a/src/scheduler/event.rs +++ b/src/scheduler/event.rs @@ -86,6 +86,13 @@ pub enum SchedulerEvent { /// A parent task entered the waiting state after its executor returned /// and it has active children. Waiting { task_id: i64, children_count: i64 }, + /// A batch of tasks was submitted. + BatchSubmitted { + /// Number of tasks in the batch (total input count). + count: usize, + /// Task IDs that were inserted (new tasks only, not upgrades/requeues). + inserted_ids: Vec, + }, /// The scheduler was globally paused via [`Scheduler::pause_all`]. Paused, /// The scheduler was resumed via [`Scheduler::resume_all`]. @@ -100,7 +107,9 @@ impl SchedulerEvent { Some(h) } Self::Failed { header, .. } | Self::Progress { header, .. } => Some(header), - Self::Waiting { .. } | Self::Paused | Self::Resumed => None, + Self::Waiting { .. } | Self::BatchSubmitted { .. } | Self::Paused | Self::Resumed => { + None + } } } } diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index 4d7b393..c6d7494 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -2,7 +2,10 @@ use crate::priority::Priority; use crate::store::StoreError; -use crate::task::{generate_dedup_key, SubmitOutcome, TaskLookup, TaskSubmission, TypedTask}; +use crate::task::{ + generate_dedup_key, BatchOutcome, BatchSubmission, SubmitOutcome, TaskLookup, TaskSubmission, + TypedTask, +}; use super::{Scheduler, SchedulerEvent}; @@ -33,12 +36,14 @@ impl Scheduler { /// Submit multiple tasks in a single SQLite transaction. /// - /// Preemption is triggered once at the end if any inserted or upgraded - /// task has high enough priority. + /// Returns a [`BatchOutcome`] with per-task results and convenience + /// accessors. Preemption is triggered once at the end if any inserted + /// or upgraded task has high enough priority. A [`SchedulerEvent::BatchSubmitted`] + /// event is emitted when at least one task was changed. pub async fn submit_batch( &self, submissions: &[TaskSubmission], - ) -> Result, StoreError> { + ) -> Result { let results = self.inner.store.submit_batch(submissions).await?; // Find the highest (lowest numeric value) priority among tasks that @@ -63,11 +68,33 @@ impl Scheduler { } } + let outcome = BatchOutcome { outcomes: results }; + if any_changed { + let inserted_ids = outcome.inserted(); + let _ = self + .inner + .event_tx + .send(SchedulerEvent::BatchSubmitted { + count: submissions.len(), + inserted_ids, + }); + self.inner.work_notify.notify_one(); } - Ok(results) + Ok(outcome) + } + + /// Submit a batch built with [`BatchSubmission`]. + /// + /// Applies the builder's defaults and delegates to [`submit_batch`](Self::submit_batch). + pub async fn submit_built( + &self, + batch: BatchSubmission, + ) -> Result { + let submissions = batch.build(); + self.submit_batch(&submissions).await } /// Submit a [`TypedTask`], handling serialization automatically. diff --git a/src/scheduler/tests.rs b/src/scheduler/tests.rs index d3df98f..6baa7a6 100644 --- a/src/scheduler/tests.rs +++ b/src/scheduler/tests.rs @@ -6,6 +6,7 @@ use tokio_util::sync::CancellationToken; use crate::backpressure::{CompositePressure, ThrottlePolicy}; use crate::registry::{TaskContext, TaskExecutor, TaskTypeRegistry}; use crate::store::TaskStore; +use crate::priority::Priority; use crate::task::{SubmitOutcome, TaskError, TaskSubmission}; use super::{Scheduler, SchedulerConfig, SchedulerEvent, TaskProgress}; @@ -891,3 +892,71 @@ async fn byte_progress_in_snapshot() { assert_eq!(snap.byte_progress[0].bytes_total, Some(1_048_576)); assert!(snap.byte_progress[0].bytes_completed > 0); } + +#[tokio::test] +async fn batch_submitted_event() { + let sched = setup(arc_erased(InstantExecutor)).await; + let mut rx = sched.subscribe(); + + let subs: Vec<_> = (0..3) + .map(|i| TaskSubmission::new("test").key(format!("ev-{i}"))) + .collect(); + + let outcome = sched.submit_batch(&subs).await.unwrap(); + assert_eq!(outcome.len(), 3); + assert_eq!(outcome.inserted().len(), 3); + assert_eq!(outcome.duplicated_count(), 0); + + // The BatchSubmitted event should be receivable. + let event = rx.try_recv().unwrap(); + match event { + SchedulerEvent::BatchSubmitted { + count, + inserted_ids, + } => { + assert_eq!(count, 3); + assert_eq!(inserted_ids.len(), 3); + } + other => panic!("expected BatchSubmitted, got {other:?}"), + } +} + +#[tokio::test] +async fn batch_outcome_convenience_methods() { + let sched = setup(arc_erased(InstantExecutor)).await; + + // Submit one task first so re-submitting it produces a Duplicate. + sched + .submit(&TaskSubmission::new("test").key("existing")) + .await + .unwrap(); + + let subs = vec![ + TaskSubmission::new("test").key("new-1"), + TaskSubmission::new("test").key("existing"), + TaskSubmission::new("test").key("new-2"), + ]; + + let outcome = sched.submit_batch(&subs).await.unwrap(); + assert_eq!(outcome.len(), 3); + assert_eq!(outcome.inserted().len(), 2); + assert_eq!(outcome.duplicated_count(), 1); + assert!(outcome.upgraded().is_empty()); + assert!(outcome.requeued().is_empty()); +} + +#[tokio::test] +async fn submit_built_applies_defaults() { + use crate::task::BatchSubmission; + + let sched = setup(arc_erased(InstantExecutor)).await; + + let batch = BatchSubmission::new() + .default_group("g1") + .default_priority(Priority::HIGH) + .task(TaskSubmission::new("test").key("built-1")) + .task(TaskSubmission::new("test").key("built-2")); + + let outcome = sched.submit_built(batch).await.unwrap(); + assert_eq!(outcome.inserted().len(), 2); +} diff --git a/src/store/submit.rs b/src/store/submit.rs index 842c5de..7f7d627 100644 --- a/src/store/submit.rs +++ b/src/store/submit.rs @@ -1,11 +1,18 @@ //! Task submission: deduplication, priority upgrade, and requeue logic. +use std::collections::HashMap; + use sqlx::Row; use crate::task::{SubmitOutcome, TaskSubmission, MAX_PAYLOAD_BYTES}; use super::{StoreError, TaskStore}; +/// Maximum number of tasks per transaction chunk. Batches larger than this +/// are split into multiple transactions to avoid holding the SQLite write +/// lock for too long. +const BATCH_CHUNK_SIZE: usize = 10_000; + /// Core dedup logic for a single task submission within an existing connection. /// /// Performs the three-step dedup: INSERT OR IGNORE → upgrade priority on @@ -112,6 +119,15 @@ impl TaskStore { /// This is significantly faster than calling [`submit`](Self::submit) in a /// loop because all inserts share a single SQLite transaction (one /// `BEGIN`/`COMMIT` pair instead of N implicit transactions). + /// + /// **Intra-batch dedup:** When multiple tasks in the same batch share a + /// dedup key, only the last occurrence is submitted (last-wins). Earlier + /// duplicates receive [`SubmitOutcome::Duplicate`]. + /// + /// **Chunking:** Batches larger than 10,000 tasks are split into + /// sub-transactions to avoid holding the SQLite write lock for too long. + /// This means very large batches are not fully atomic, but task submission + /// is idempotent so re-submitting after a partial failure is safe. pub async fn submit_batch( &self, submissions: &[TaskSubmission], @@ -126,14 +142,31 @@ impl TaskStore { } } + // Intra-batch dedup: last-wins. Map each effective key to its last + // occurrence index so earlier duplicates are skipped. + let mut last_occurrence: HashMap = HashMap::new(); + for (i, sub) in submissions.iter().enumerate() { + last_occurrence.insert(sub.effective_key(), i); + } + let mut results = Vec::with_capacity(submissions.len()); - let mut conn = self.begin_write().await?; - for sub in submissions { - results.push(submit_one(&mut conn, sub).await?); + for chunk in submissions.chunks(BATCH_CHUNK_SIZE) { + let chunk_offset = results.len(); + let mut conn = self.begin_write().await?; + + for (i, sub) in chunk.iter().enumerate() { + let global_i = chunk_offset + i; + if last_occurrence[&sub.effective_key()] != global_i { + results.push(SubmitOutcome::Duplicate); + } else { + results.push(submit_one(&mut conn, sub).await?); + } + } + + sqlx::query("COMMIT").execute(&mut *conn).await?; } - sqlx::query("COMMIT").execute(&mut *conn).await?; Ok(results) } } @@ -357,13 +390,16 @@ mod tests { let store = test_store().await; let sub = make_submission("dup", Priority::NORMAL); + // Intra-batch dedup: last-wins, so the first is Duplicate and the + // second (last occurrence) is Inserted. let results = store .submit_batch(&[sub.clone(), sub.clone()]) .await .unwrap(); - assert!(results[0].is_inserted()); - assert_eq!(results[1], SubmitOutcome::Duplicate); + assert_eq!(results[0], SubmitOutcome::Duplicate); + assert!(results[1].is_inserted()); + // Re-submitting the same key hits the DB-level dedup. let results = store.submit_batch(&[sub]).await.unwrap(); assert_eq!(results[0], SubmitOutcome::Duplicate); } @@ -375,6 +411,46 @@ mod tests { assert!(results.is_empty()); } + #[tokio::test] + async fn submit_batch_intra_dedup_last_wins() { + let store = test_store().await; + + // Two tasks with the same dedup key but different priorities. + // Last-wins: the second task (HIGH) should be inserted, first skipped. + let sub_normal = make_submission("same-key", Priority::NORMAL); + let sub_high = make_submission("same-key", Priority::HIGH); + + let results = store + .submit_batch(&[sub_normal.clone(), sub_high.clone()]) + .await + .unwrap(); + assert_eq!(results[0], SubmitOutcome::Duplicate); + assert!(results[1].is_inserted()); + + // Verify the stored task has the second task's priority. + let key = sub_normal.effective_key(); + let task = store.task_by_key(&key).await.unwrap().unwrap(); + assert_eq!(task.priority, Priority::HIGH); + } + + #[tokio::test] + async fn submit_batch_large_chunking() { + use super::BATCH_CHUNK_SIZE; + + let store = test_store().await; + let count = BATCH_CHUNK_SIZE + 100; + let subs: Vec<_> = (0..count) + .map(|i| make_submission(&format!("chunk-{i}"), Priority::NORMAL)) + .collect(); + + let results = store.submit_batch(&subs).await.unwrap(); + assert_eq!(results.len(), count); + assert!(results.iter().all(|r| r.is_inserted())); + + let pending = store.pending_count().await.unwrap(); + assert_eq!(pending, count as i64); + } + #[tokio::test] async fn submit_batch_rejects_oversized_payload() { use crate::store::StoreError; diff --git a/src/task/mod.rs b/src/task/mod.rs index 5d72263..45bd52e 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -24,7 +24,7 @@ use crate::priority::Priority; pub use dedup::{generate_dedup_key, MAX_PAYLOAD_BYTES}; pub use error::TaskError; -pub use submission::{SubmitOutcome, TaskSubmission}; +pub use submission::{BatchOutcome, BatchSubmission, SubmitOutcome, TaskSubmission}; pub use typed::TypedTask; /// Lifecycle state of a task in the active queue. diff --git a/src/task/submission.rs b/src/task/submission.rs index 6505017..ffd4212 100644 --- a/src/task/submission.rs +++ b/src/task/submission.rs @@ -36,6 +36,174 @@ impl SubmitOutcome { } } +/// Summary of a batch submission. +/// +/// Wraps the per-task [`SubmitOutcome`] results in input order (1:1 with +/// submissions) and provides convenience accessors for categorized views. +#[derive(Debug, Clone)] +pub struct BatchOutcome { + /// Per-task results, in input order (1:1 with submissions). + pub outcomes: Vec, +} + +impl BatchOutcome { + /// Collect task IDs from [`SubmitOutcome::Inserted`] outcomes. + pub fn inserted(&self) -> Vec { + self.outcomes + .iter() + .filter_map(|o| match o { + SubmitOutcome::Inserted(id) => Some(*id), + _ => None, + }) + .collect() + } + + /// Collect task IDs from [`SubmitOutcome::Upgraded`] outcomes. + pub fn upgraded(&self) -> Vec { + self.outcomes + .iter() + .filter_map(|o| match o { + SubmitOutcome::Upgraded(id) => Some(*id), + _ => None, + }) + .collect() + } + + /// Collect task IDs from [`SubmitOutcome::Requeued`] outcomes. + pub fn requeued(&self) -> Vec { + self.outcomes + .iter() + .filter_map(|o| match o { + SubmitOutcome::Requeued(id) => Some(*id), + _ => None, + }) + .collect() + } + + /// Count [`SubmitOutcome::Duplicate`] outcomes. + pub fn duplicated_count(&self) -> usize { + self.outcomes + .iter() + .filter(|o| matches!(o, SubmitOutcome::Duplicate)) + .count() + } + + /// Number of outcomes. + pub fn len(&self) -> usize { + self.outcomes.len() + } + + /// Returns `true` if there are no outcomes. + pub fn is_empty(&self) -> bool { + self.outcomes.is_empty() + } + + /// Iterate over outcomes. + pub fn iter(&self) -> std::slice::Iter<'_, SubmitOutcome> { + self.outcomes.iter() + } +} + +impl<'a> IntoIterator for &'a BatchOutcome { + type Item = &'a SubmitOutcome; + type IntoIter = std::slice::Iter<'a, SubmitOutcome>; + + fn into_iter(self) -> Self::IntoIter { + self.outcomes.iter() + } +} + +impl IntoIterator for BatchOutcome { + type Item = SubmitOutcome; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.outcomes.into_iter() + } +} + +/// Builder for batch submissions with shared defaults. +/// +/// Allows setting batch-wide defaults for group and priority that are applied +/// to tasks that don't have explicit values: +/// +/// ```ignore +/// use taskmill::{BatchSubmission, TaskSubmission, Priority}; +/// +/// let submissions = BatchSubmission::new() +/// .default_group("s3://my-bucket") +/// .default_priority(Priority::HIGH) +/// .task(TaskSubmission::new("upload").key("file-1")) +/// .task(TaskSubmission::new("upload").key("file-2").priority(Priority::REALTIME)) +/// .build(); +/// ``` +pub struct BatchSubmission { + default_group: Option, + default_priority: Option, + tasks: Vec, +} + +impl BatchSubmission { + /// Create a new empty batch submission builder. + pub fn new() -> Self { + Self { + default_group: None, + default_priority: None, + tasks: Vec::new(), + } + } + + /// Set a default group key applied to tasks without an explicit group. + pub fn default_group(mut self, group: impl Into) -> Self { + self.default_group = Some(group.into()); + self + } + + /// Set a default priority applied to tasks still at [`Priority::NORMAL`]. + pub fn default_priority(mut self, priority: Priority) -> Self { + self.default_priority = Some(priority); + self + } + + /// Add a single task to the batch. + pub fn task(mut self, sub: TaskSubmission) -> Self { + self.tasks.push(sub); + self + } + + /// Add multiple tasks to the batch. + pub fn tasks(mut self, iter: impl IntoIterator) -> Self { + self.tasks.extend(iter); + self + } + + /// Apply defaults and return the final submissions. + /// + /// - If a task has no `group_key` and `default_group` is set, the default is applied. + /// - If a task has `Priority::NORMAL` (the default) and `default_priority` is set, it is overridden. + pub fn build(mut self) -> Vec { + for task in &mut self.tasks { + if task.group_key.is_none() { + if let Some(ref group) = self.default_group { + task.group_key = Some(group.clone()); + } + } + if task.priority == Priority::NORMAL { + if let Some(priority) = self.default_priority { + task.priority = priority; + } + } + } + self.tasks + } +} + +impl Default for BatchSubmission { + fn default() -> Self { + Self::new() + } +} + /// Parameters for submitting a new task. /// /// Use the builder-style constructor [`TaskSubmission::new`] for ergonomic diff --git a/src/task/tests.rs b/src/task/tests.rs index 4a03fde..03abec3 100644 --- a/src/task/tests.rs +++ b/src/task/tests.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::priority::Priority; -use super::{IoBudget, TaskSubmission, TypedTask}; +use super::{BatchSubmission, IoBudget, TaskSubmission, TypedTask}; #[derive(Serialize, Deserialize, Debug, PartialEq)] struct Thumbnail { @@ -142,3 +142,39 @@ fn typed_task_key_and_label() { assert_eq!(sub.dedup_key.as_deref(), Some("/a.txt")); assert_eq!(sub.label, "Process /a.txt"); } + +#[test] +fn batch_submission_builder_defaults() { + let subs = BatchSubmission::new() + .default_group("g1") + .default_priority(Priority::HIGH) + .task(TaskSubmission::new("test").key("a")) + .task(TaskSubmission::new("test").key("b").priority(Priority::REALTIME)) + .task(TaskSubmission::new("test").key("c").group("custom-group")) + .build(); + + assert_eq!(subs.len(), 3); + + // Task without explicit group/priority gets defaults. + assert_eq!(subs[0].group_key.as_deref(), Some("g1")); + assert_eq!(subs[0].priority, Priority::HIGH); + + // Task with explicit priority (non-NORMAL) keeps its own. + assert_eq!(subs[1].group_key.as_deref(), Some("g1")); + assert_eq!(subs[1].priority, Priority::REALTIME); + + // Task with explicit group keeps its own. + assert_eq!(subs[2].group_key.as_deref(), Some("custom-group")); + assert_eq!(subs[2].priority, Priority::HIGH); +} + +#[test] +fn batch_submission_builder_no_defaults() { + let subs = BatchSubmission::new() + .task(TaskSubmission::new("test").key("a")) + .build(); + + assert_eq!(subs.len(), 1); + assert!(subs[0].group_key.is_none()); + assert_eq!(subs[0].priority, Priority::NORMAL); +} From 4447864569aaa05aa80d4ec0ac6ce671f7680c40 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 14 Mar 2026 13:22:22 -0700 Subject: [PATCH 2/3] docs: document batch submission APIs and update module docs --- src/lib.rs | 37 ++++++++++++++++++++++++++++++++++++- src/scheduler/submit.rs | 2 +- src/store/submit.rs | 3 ++- src/task/mod.rs | 13 ++++++++----- src/task/submission.rs | 4 +++- 5 files changed, 50 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 25af5e2..3ac0209 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ //! - Preempts lower-priority work when high-priority tasks arrive //! - [Retries](TaskError::retryable) failed tasks at the same priority level //! - Records completed/failed [task history](TaskHistoryRecord) for queries and IO learning +//! - Supports [batch submission](Scheduler::submit_batch) with intra-batch dedup and chunking //! - Emits [lifecycle events](SchedulerEvent) including progress for UI integration //! - Reports [byte-level transfer progress](TaskProgress) with EWMA-smoothed throughput and ETA //! - Supports [graceful shutdown](ShutdownMode) with configurable drain timeout @@ -28,7 +29,9 @@ //! ↘ failed (permanent → history) //! ``` //! -//! 1. **Submit** — [`Scheduler::submit`] (or [`submit_typed`](Scheduler::submit_typed)) +//! 1. **Submit** — [`Scheduler::submit`] (or [`submit_typed`](Scheduler::submit_typed), +//! [`submit_batch`](Scheduler::submit_batch), +//! [`submit_built`](Scheduler::submit_built)) //! enqueues a [`TaskSubmission`] into the SQLite store. //! 2. **Pending** — the task waits in a priority queue. The scheduler's run loop //! pops the highest-priority pending task on each tick. @@ -48,6 +51,10 @@ //! [`Upgraded`](SubmitOutcome::Upgraded) if the new submission has higher //! priority). This makes it safe to call `submit` idempotently. //! +//! Within a single [`submit_batch`](Scheduler::submit_batch) call, intra-batch +//! dedup applies a **last-wins** policy: if two tasks share a dedup key, only +//! the last occurrence is submitted and earlier ones receive `Duplicate`. +//! //! ## Priority & preemption //! //! [`Priority`] is a `u8` newtype where **lower values = higher priority**. @@ -293,6 +300,34 @@ //! scheduler.remove_group_limit("s3://my-bucket"); // fall back to default //! ``` //! +//! ## Batch submission +//! +//! Submit many tasks at once with [`Scheduler::submit_batch`] for better +//! throughput (single SQLite transaction instead of N). Use +//! [`BatchSubmission`] to set batch-wide defaults and [`BatchOutcome`] to +//! inspect results: +//! +//! ```ignore +//! use taskmill::{BatchSubmission, TaskSubmission, Priority}; +//! +//! let batch = BatchSubmission::new() +//! .default_group("s3://my-bucket") +//! .default_priority(Priority::HIGH) +//! .task(TaskSubmission::new("upload").key("file-1").payload_json(&p1)) +//! .task(TaskSubmission::new("upload").key("file-2").payload_json(&p2)); +//! +//! let outcome = scheduler.submit_built(batch).await?; +//! println!("inserted: {:?}, dupes: {}", outcome.inserted(), outcome.duplicated_count()); +//! ``` +//! +//! Batches with duplicate dedup keys use a **last-wins** policy — only the +//! final occurrence is submitted. Batches larger than 10,000 tasks are +//! automatically chunked into sub-transactions to avoid holding the SQLite +//! write lock for too long. +//! +//! A [`SchedulerEvent::BatchSubmitted`] event is emitted for observability +//! whenever at least one task in the batch was inserted. +//! //! ## Child tasks //! //! Spawn child tasks from an executor to model fan-out work. The parent diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index c6d7494..ba16d81 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -1,4 +1,4 @@ -//! Task submission, lookup, and cancellation. +//! Task submission (single, batch, typed), lookup, and cancellation. use crate::priority::Priority; use crate::store::StoreError; diff --git a/src/store/submit.rs b/src/store/submit.rs index 7f7d627..e787257 100644 --- a/src/store/submit.rs +++ b/src/store/submit.rs @@ -1,4 +1,5 @@ -//! Task submission: deduplication, priority upgrade, and requeue logic. +//! Task submission: deduplication, priority upgrade, requeue logic, +//! intra-batch last-wins dedup, and transaction chunking for large batches. use std::collections::HashMap; diff --git a/src/task/mod.rs b/src/task/mod.rs index 45bd52e..245838e 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -1,12 +1,15 @@ //! Task types, submission parameters, and the [`TypedTask`] trait. //! //! This module defines the data structures that flow through the scheduler: -//! [`TaskSubmission`] for enqueuing work, [`TaskRecord`] for in-flight tasks, -//! [`TaskHistoryRecord`] for completed/failed results, and [`TypedTask`] for -//! strongly-typed task payloads with built-in serialization. +//! [`TaskSubmission`] for enqueuing work, [`BatchSubmission`] for building +//! batches with shared defaults, [`BatchOutcome`] for categorized batch +//! results, [`TaskRecord`] for in-flight tasks, [`TaskHistoryRecord`] for +//! completed/failed results, and [`TypedTask`] for strongly-typed task +//! payloads with built-in serialization. //! -//! Submit tasks via [`Scheduler::submit`](crate::Scheduler::submit) or -//! [`Scheduler::submit_typed`](crate::Scheduler::submit_typed). Executors +//! Submit tasks via [`Scheduler::submit`](crate::Scheduler::submit), +//! [`Scheduler::submit_typed`](crate::Scheduler::submit_typed), or +//! [`Scheduler::submit_batch`](crate::Scheduler::submit_batch). Executors //! receive a [`TaskContext`](crate::TaskContext) with the deserialized record //! and report results via [`TaskError`]. diff --git a/src/task/submission.rs b/src/task/submission.rs index ffd4212..0369e7a 100644 --- a/src/task/submission.rs +++ b/src/task/submission.rs @@ -1,4 +1,6 @@ -//! [`TaskSubmission`] — parameters for submitting a new task, and [`SubmitOutcome`]. +//! Task submission types: [`TaskSubmission`] for single tasks, [`BatchSubmission`] +//! for building batches with shared defaults, [`SubmitOutcome`] for per-task +//! results, and [`BatchOutcome`] for categorized batch summaries. use serde::{Deserialize, Serialize}; From 5c113212abe07d51942b30e4040e4849fd7385cd Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 14 Mar 2026 13:26:57 -0700 Subject: [PATCH 3/3] style: apply formatting fixes --- src/lib.rs | 4 ++-- src/scheduler/submit.rs | 16 +++++----------- src/scheduler/tests.rs | 2 +- src/task/tests.rs | 6 +++++- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3ac0209..d3efe1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -408,8 +408,8 @@ pub use scheduler::{ pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ generate_dedup_key, BatchOutcome, BatchSubmission, HistoryStatus, IoBudget, ParentResolution, - SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, TaskSubmission, - TypeStats, TypedTask, + SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, + TaskSubmission, TypeStats, TypedTask, }; #[cfg(feature = "sysinfo-monitor")] diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index ba16d81..b5e2262 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -72,13 +72,10 @@ impl Scheduler { if any_changed { let inserted_ids = outcome.inserted(); - let _ = self - .inner - .event_tx - .send(SchedulerEvent::BatchSubmitted { - count: submissions.len(), - inserted_ids, - }); + let _ = self.inner.event_tx.send(SchedulerEvent::BatchSubmitted { + count: submissions.len(), + inserted_ids, + }); self.inner.work_notify.notify_one(); } @@ -89,10 +86,7 @@ impl Scheduler { /// Submit a batch built with [`BatchSubmission`]. /// /// Applies the builder's defaults and delegates to [`submit_batch`](Self::submit_batch). - pub async fn submit_built( - &self, - batch: BatchSubmission, - ) -> Result { + pub async fn submit_built(&self, batch: BatchSubmission) -> Result { let submissions = batch.build(); self.submit_batch(&submissions).await } diff --git a/src/scheduler/tests.rs b/src/scheduler/tests.rs index 6baa7a6..fff8441 100644 --- a/src/scheduler/tests.rs +++ b/src/scheduler/tests.rs @@ -4,9 +4,9 @@ use tokio::time::Duration; use tokio_util::sync::CancellationToken; use crate::backpressure::{CompositePressure, ThrottlePolicy}; +use crate::priority::Priority; use crate::registry::{TaskContext, TaskExecutor, TaskTypeRegistry}; use crate::store::TaskStore; -use crate::priority::Priority; use crate::task::{SubmitOutcome, TaskError, TaskSubmission}; use super::{Scheduler, SchedulerConfig, SchedulerEvent, TaskProgress}; diff --git a/src/task/tests.rs b/src/task/tests.rs index 03abec3..166c158 100644 --- a/src/task/tests.rs +++ b/src/task/tests.rs @@ -149,7 +149,11 @@ fn batch_submission_builder_defaults() { .default_group("g1") .default_priority(Priority::HIGH) .task(TaskSubmission::new("test").key("a")) - .task(TaskSubmission::new("test").key("b").priority(Priority::REALTIME)) + .task( + TaskSubmission::new("test") + .key("b") + .priority(Priority::REALTIME), + ) .task(TaskSubmission::new("test").key("c").group("custom-group")) .build();