Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! - Preempts lower-priority work when high-priority tasks arrive
//! - [Retries](TaskError::retryable) failed tasks at the same priority level
//! - Records completed/failed [task history](TaskHistoryRecord) for queries and IO learning
//! - Supports [batch submission](Scheduler::submit_batch) with intra-batch dedup and chunking
//! - Emits [lifecycle events](SchedulerEvent) including progress for UI integration
//! - Reports [byte-level transfer progress](TaskProgress) with EWMA-smoothed throughput and ETA
//! - Supports [graceful shutdown](ShutdownMode) with configurable drain timeout
Expand All @@ -28,7 +29,9 @@
//! ↘ failed (permanent → history)
//! ```
//!
//! 1. **Submit** — [`Scheduler::submit`] (or [`submit_typed`](Scheduler::submit_typed))
//! 1. **Submit** — [`Scheduler::submit`] (or [`submit_typed`](Scheduler::submit_typed),
//! [`submit_batch`](Scheduler::submit_batch),
//! [`submit_built`](Scheduler::submit_built))
//! enqueues a [`TaskSubmission`] into the SQLite store.
//! 2. **Pending** — the task waits in a priority queue. The scheduler's run loop
//! pops the highest-priority pending task on each tick.
Expand All @@ -48,6 +51,10 @@
//! [`Upgraded`](SubmitOutcome::Upgraded) if the new submission has higher
//! priority). This makes it safe to call `submit` idempotently.
//!
//! Within a single [`submit_batch`](Scheduler::submit_batch) call, intra-batch
//! dedup applies a **last-wins** policy: if two tasks share a dedup key, only
//! the last occurrence is submitted and earlier ones receive `Duplicate`.
//!
//! ## Priority & preemption
//!
//! [`Priority`] is a `u8` newtype where **lower values = higher priority**.
Expand Down Expand Up @@ -293,6 +300,34 @@
//! scheduler.remove_group_limit("s3://my-bucket"); // fall back to default
//! ```
//!
//! ## Batch submission
//!
//! Submit many tasks at once with [`Scheduler::submit_batch`] for better
//! throughput (single SQLite transaction instead of N). To set batch-wide
//! defaults, build a [`BatchSubmission`] and pass it to
//! [`Scheduler::submit_built`]. Both methods return a [`BatchOutcome`] for
//! inspecting results:
//!
//! ```ignore
//! use taskmill::{BatchSubmission, TaskSubmission, Priority};
//!
//! let batch = BatchSubmission::new()
//! .default_group("s3://my-bucket")
//! .default_priority(Priority::HIGH)
//! .task(TaskSubmission::new("upload").key("file-1").payload_json(&p1))
//! .task(TaskSubmission::new("upload").key("file-2").payload_json(&p2));
//!
//! let outcome = scheduler.submit_built(batch).await?;
//! println!("inserted: {:?}, dupes: {}", outcome.inserted(), outcome.duplicated_count());
//! ```
//!
//! Batches with duplicate dedup keys use a **last-wins** policy — only the
//! final occurrence is submitted. Batches larger than 10,000 tasks are
//! automatically chunked into sub-transactions to avoid holding the SQLite
//! write lock for too long, so very large batches are not fully atomic.
//!
//! A [`SchedulerEvent::BatchSubmitted`] event is emitted for observability
//! whenever the batch changed at least one task; its `inserted_ids` field
//! lists only the newly inserted tasks.
//!
//! ## Child tasks
//!
//! Spawn child tasks from an executor to model fan-out work. The parent
Expand Down Expand Up @@ -372,8 +407,9 @@ pub use scheduler::{
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{
generate_dedup_key, HistoryStatus, IoBudget, ParentResolution, SubmitOutcome, TaskError,
TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, TaskSubmission, TypeStats, TypedTask,
generate_dedup_key, BatchOutcome, BatchSubmission, HistoryStatus, IoBudget, ParentResolution,
SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus,
TaskSubmission, TypeStats, TypedTask,
};

#[cfg(feature = "sysinfo-monitor")]
Expand Down
11 changes: 10 additions & 1 deletion src/scheduler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ pub enum SchedulerEvent {
/// A parent task entered the waiting state after its executor returned
/// and it has active children.
Waiting { task_id: i64, children_count: i64 },
/// A batch of tasks was submitted.
BatchSubmitted {
/// Number of tasks in the batch (total input count).
count: usize,
/// Task IDs that were inserted (new tasks only, not upgrades/requeues).
inserted_ids: Vec<i64>,
},
/// The scheduler was globally paused via [`Scheduler::pause_all`].
Paused,
/// The scheduler was resumed via [`Scheduler::resume_all`].
Expand All @@ -100,7 +107,9 @@ impl SchedulerEvent {
Some(h)
}
Self::Failed { header, .. } | Self::Progress { header, .. } => Some(header),
Self::Waiting { .. } | Self::Paused | Self::Resumed => None,
Self::Waiting { .. } | Self::BatchSubmitted { .. } | Self::Paused | Self::Resumed => {
None
}
}
}
}
Expand Down
33 changes: 27 additions & 6 deletions src/scheduler/submit.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
//! Task submission, lookup, and cancellation.
//! Task submission (single, batch, typed), lookup, and cancellation.

use crate::priority::Priority;
use crate::store::StoreError;
use crate::task::{generate_dedup_key, SubmitOutcome, TaskLookup, TaskSubmission, TypedTask};
use crate::task::{
generate_dedup_key, BatchOutcome, BatchSubmission, SubmitOutcome, TaskLookup, TaskSubmission,
TypedTask,
};

use super::{Scheduler, SchedulerEvent};

Expand Down Expand Up @@ -33,12 +36,14 @@ impl Scheduler {

/// Submit multiple tasks in a single SQLite transaction.
///
/// Preemption is triggered once at the end if any inserted or upgraded
/// task has high enough priority.
/// Returns a [`BatchOutcome`] with per-task results and convenience
/// accessors. Preemption is triggered once at the end if any inserted
/// or upgraded task has high enough priority. A [`SchedulerEvent::BatchSubmitted`]
/// event is emitted when at least one task was changed.
pub async fn submit_batch(
&self,
submissions: &[TaskSubmission],
) -> Result<Vec<SubmitOutcome>, StoreError> {
) -> Result<BatchOutcome, StoreError> {
let results = self.inner.store.submit_batch(submissions).await?;

// Find the highest (lowest numeric value) priority among tasks that
Expand All @@ -63,11 +68,27 @@ impl Scheduler {
}
}

let outcome = BatchOutcome { outcomes: results };

if any_changed {
let inserted_ids = outcome.inserted();
let _ = self.inner.event_tx.send(SchedulerEvent::BatchSubmitted {
count: submissions.len(),
inserted_ids,
});

self.inner.work_notify.notify_one();
}

Ok(results)
Ok(outcome)
}

/// Submit the tasks described by a [`BatchSubmission`] builder.
///
/// The builder is consumed: [`BatchSubmission::build`] turns it into the
/// list of submissions (applying the builder's defaults), and that list is
/// handed to [`submit_batch`](Self::submit_batch), whose result is returned
/// unchanged.
pub async fn submit_built(&self, batch: BatchSubmission) -> Result<BatchOutcome, StoreError> {
    self.submit_batch(&batch.build()).await
}

/// Submit a [`TypedTask`], handling serialization automatically.
Expand Down
69 changes: 69 additions & 0 deletions src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use tokio::time::Duration;
use tokio_util::sync::CancellationToken;

use crate::backpressure::{CompositePressure, ThrottlePolicy};
use crate::priority::Priority;
use crate::registry::{TaskContext, TaskExecutor, TaskTypeRegistry};
use crate::store::TaskStore;
use crate::task::{SubmitOutcome, TaskError, TaskSubmission};
Expand Down Expand Up @@ -891,3 +892,71 @@ async fn byte_progress_in_snapshot() {
assert_eq!(snap.byte_progress[0].bytes_total, Some(1_048_576));
assert!(snap.byte_progress[0].bytes_completed > 0);
}

/// A batch of three fresh tasks reports three inserts and produces a
/// `BatchSubmitted` event carrying the same counts.
#[tokio::test]
async fn batch_submitted_event() {
    let sched = setup(arc_erased(InstantExecutor)).await;
    // Subscribe first; the receiver is polled after the submit below.
    let mut events = sched.subscribe();

    // Three submissions with distinct keys: ev-0, ev-1, ev-2.
    let mut batch = Vec::with_capacity(3);
    for i in 0..3 {
        batch.push(TaskSubmission::new("test").key(format!("ev-{i}")));
    }

    let outcome = sched.submit_batch(&batch).await.unwrap();
    assert_eq!(outcome.len(), 3);
    assert_eq!(outcome.inserted().len(), 3);
    assert_eq!(outcome.duplicated_count(), 0);

    // The next event on the channel must be BatchSubmitted.
    match events.try_recv().unwrap() {
        SchedulerEvent::BatchSubmitted {
            count,
            inserted_ids,
        } => {
            assert_eq!(count, 3);
            assert_eq!(inserted_ids.len(), 3);
        }
        other => panic!("expected BatchSubmitted, got {other:?}"),
    }
}

/// Checks the `BatchOutcome` accessors on a batch that mixes two new keys
/// with one key that is already stored.
#[tokio::test]
async fn batch_outcome_convenience_methods() {
    let sched = setup(arc_erased(InstantExecutor)).await;

    // Seed the key "existing" so that the batch below hits it as a Duplicate.
    let seed = TaskSubmission::new("test").key("existing");
    sched.submit(&seed).await.unwrap();

    let batch = [
        TaskSubmission::new("test").key("new-1"),
        TaskSubmission::new("test").key("existing"),
        TaskSubmission::new("test").key("new-2"),
    ];
    let outcome = sched.submit_batch(&batch).await.unwrap();

    // One result per input: two inserts, one duplicate, nothing else.
    assert_eq!(outcome.len(), 3);
    assert_eq!(outcome.inserted().len(), 2);
    assert_eq!(outcome.duplicated_count(), 1);
    assert!(outcome.upgraded().is_empty());
    assert!(outcome.requeued().is_empty());
}

/// Submits a `BatchSubmission` carrying a default group and priority and
/// checks that both of its tasks are inserted.
#[tokio::test]
async fn submit_built_applies_defaults() {
    use crate::task::BatchSubmission;

    let sched = setup(arc_erased(InstantExecutor)).await;

    // Batch-wide defaults first, then two tasks that set neither field themselves.
    let defaults = BatchSubmission::new()
        .default_group("g1")
        .default_priority(Priority::HIGH);
    let batch = defaults
        .task(TaskSubmission::new("test").key("built-1"))
        .task(TaskSubmission::new("test").key("built-2"));

    // NOTE(review): only the insert count is asserted; the stored tasks'
    // group and priority are never read back, so the defaults named in the
    // test title are not actually verified here.
    let outcome = sched.submit_built(batch).await.unwrap();
    assert_eq!(outcome.inserted().len(), 2);
}
91 changes: 84 additions & 7 deletions src/store/submit.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
//! Task submission: deduplication, priority upgrade, and requeue logic.
//! Task submission: deduplication, priority upgrade, requeue logic,
//! intra-batch last-wins dedup, and transaction chunking for large batches.

use std::collections::HashMap;

use sqlx::Row;

use crate::task::{SubmitOutcome, TaskSubmission, MAX_PAYLOAD_BYTES};

use super::{StoreError, TaskStore};

/// Maximum number of tasks per transaction chunk. Batches larger than this
/// are split into multiple transactions to avoid holding the SQLite write
/// lock for too long.
///
/// The `submit_batch` documentation quotes this limit as "10,000" in prose;
/// keep that text in sync when changing the value.
const BATCH_CHUNK_SIZE: usize = 10_000;

/// Core dedup logic for a single task submission within an existing connection.
///
/// Performs the three-step dedup: INSERT OR IGNORE → upgrade priority on
Expand Down Expand Up @@ -112,6 +120,15 @@ impl TaskStore {
/// This is significantly faster than calling [`submit`](Self::submit) in a
/// loop because all inserts share a single SQLite transaction (one
/// `BEGIN`/`COMMIT` pair instead of N implicit transactions).
///
/// **Intra-batch dedup:** When multiple tasks in the same batch share a
/// dedup key, only the last occurrence is submitted (last-wins). Earlier
/// duplicates receive [`SubmitOutcome::Duplicate`].
///
/// **Chunking:** Batches larger than 10,000 tasks are split into
/// sub-transactions to avoid holding the SQLite write lock for too long.
/// This means very large batches are not fully atomic, but task submission
/// is idempotent so re-submitting after a partial failure is safe.
pub async fn submit_batch(
&self,
submissions: &[TaskSubmission],
Expand All @@ -126,14 +143,31 @@ impl TaskStore {
}
}

// Intra-batch dedup: last-wins. Map each effective key to its last
// occurrence index so earlier duplicates are skipped.
let mut last_occurrence: HashMap<String, usize> = HashMap::new();
for (i, sub) in submissions.iter().enumerate() {
last_occurrence.insert(sub.effective_key(), i);
}

let mut results = Vec::with_capacity(submissions.len());
let mut conn = self.begin_write().await?;

for sub in submissions {
results.push(submit_one(&mut conn, sub).await?);
for chunk in submissions.chunks(BATCH_CHUNK_SIZE) {
let chunk_offset = results.len();
let mut conn = self.begin_write().await?;

for (i, sub) in chunk.iter().enumerate() {
let global_i = chunk_offset + i;
if last_occurrence[&sub.effective_key()] != global_i {
results.push(SubmitOutcome::Duplicate);
} else {
results.push(submit_one(&mut conn, sub).await?);
}
}

sqlx::query("COMMIT").execute(&mut *conn).await?;
}

sqlx::query("COMMIT").execute(&mut *conn).await?;
Ok(results)
}
}
Expand Down Expand Up @@ -357,13 +391,16 @@ mod tests {
let store = test_store().await;
let sub = make_submission("dup", Priority::NORMAL);

// Intra-batch dedup: last-wins, so the first is Duplicate and the
// second (last occurrence) is Inserted.
let results = store
.submit_batch(&[sub.clone(), sub.clone()])
.await
.unwrap();
assert!(results[0].is_inserted());
assert_eq!(results[1], SubmitOutcome::Duplicate);
assert_eq!(results[0], SubmitOutcome::Duplicate);
assert!(results[1].is_inserted());

// Re-submitting the same key hits the DB-level dedup.
let results = store.submit_batch(&[sub]).await.unwrap();
assert_eq!(results[0], SubmitOutcome::Duplicate);
}
Expand All @@ -375,6 +412,46 @@ mod tests {
assert!(results.is_empty());
}

/// Two submissions sharing one dedup key in a single batch: the earlier one
/// is reported as `Duplicate` and the later one is what gets stored.
#[tokio::test]
async fn submit_batch_intra_dedup_last_wins() {
    let store = test_store().await;

    // Same key, different priorities, so the stored row shows which one won.
    let first = make_submission("same-key", Priority::NORMAL);
    let second = make_submission("same-key", Priority::HIGH);
    let key = first.effective_key();

    let batch = [first, second];
    let results = store.submit_batch(&batch).await.unwrap();
    assert_eq!(results[0], SubmitOutcome::Duplicate);
    assert!(results[1].is_inserted());

    // The stored task must carry the priority of the last occurrence.
    let task = store.task_by_key(&key).await.unwrap().unwrap();
    assert_eq!(task.priority, Priority::HIGH);
}

/// A batch larger than `BATCH_CHUNK_SIZE` still yields one `Inserted`
/// outcome per task and leaves every task pending in the store.
#[tokio::test]
async fn submit_batch_large_chunking() {
    use super::BATCH_CHUNK_SIZE;

    let store = test_store().await;

    // 100 tasks more than one chunk holds, each with a unique key.
    let count = BATCH_CHUNK_SIZE + 100;
    let mut subs = Vec::with_capacity(count);
    for i in 0..count {
        subs.push(make_submission(&format!("chunk-{i}"), Priority::NORMAL));
    }

    let results = store.submit_batch(&subs).await.unwrap();
    assert_eq!(results.len(), count);
    assert!(results.iter().all(|r| r.is_inserted()));

    // Every submitted task should be counted as pending.
    assert_eq!(store.pending_count().await.unwrap(), count as i64);
}

#[tokio::test]
async fn submit_batch_rejects_oversized_payload() {
use crate::store::StoreError;
Expand Down
Loading
Loading