Feature: Bulk task submission #16

@deepjoy

Summary

Add a batch submission API that inserts multiple tasks in a single SQLite transaction, with bulk versions of deduplication and group assignment.

Motivation

When a sync engine diffs two large buckets and produces tens of thousands of sync actions, each action becomes a TaskMill task. Submitting them one-by-one means:

  • 50,000 individual SQLite INSERT statements (even with WAL mode, this is slow)
  • 50,000 dedup key lookups
  • 50,000 SchedulerEvent::TaskSubmitted events flooding the broadcast channel
  • The scheduler's dispatch loop potentially waking up after each submission

For a bucket with 100,000 objects, the submission phase alone could take 10-30 seconds. A bulk API that wraps everything in a single transaction should bring this down to under a second.

Proposed Behavior

  • New Scheduler method:
    pub async fn submit_batch(
        &self,
        tasks: Vec<TaskSubmission>,
    ) -> Result<BatchOutcome, SchedulerError>
  • BatchOutcome provides per-task results:
    pub struct BatchOutcome {
        pub submitted: Vec<TaskId>,               // successfully queued
        pub deduplicated: Vec<String>,            // dedup keys that were skipped
        pub superseded: Vec<(TaskId, TaskId)>,    // (new, replaced) pairs
        pub failed: Vec<(usize, SchedulerError)>, // (index in `tasks`, error) for failures
    }
  • Implementation details:
    • All inserts happen in a single SQLite transaction (BEGIN → INSERT × N → COMMIT)
    • Dedup key checks are batched (single SELECT ... WHERE dedup_key IN (...))
    • A single SchedulerEvent::BatchSubmitted { count, task_ids } event replaces N individual events
    • The scheduler's dispatch loop is notified once after the batch, not per-task
  • Optional builder pattern for common batch settings:
    scheduler.submit_batch(
        BatchSubmission::new()
            .default_group("s3://us-east-1.amazonaws.com")
            .default_priority(Priority::NORMAL)
            .on_duplicate(DuplicateStrategy::Skip)
            .tasks(sync_actions.into_iter().map(|a| a.into_task_submission()))
    ).await?;
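
To make the per-task bookkeeping concrete, here is a minimal in-memory sketch of how a batch might be split into `submitted` and `deduplicated` when duplicates are skipped. It is not TaskMill's implementation: `classify_batch`, the `u64` task ID, the pared-down `TaskSubmission` and `BatchOutcome`, and the `existing_keys` set (standing in for the result of the batched `SELECT ... WHERE dedup_key IN (...)`) are all assumptions made for illustration. Duplicates inside the batch itself are deliberately not handled here.

```rust
use std::collections::HashSet;

/// Hypothetical stand-ins for the real types; only the fields needed here.
pub type TaskId = u64;

#[derive(Debug, Clone)]
pub struct TaskSubmission {
    pub dedup_key: Option<String>,
}

#[derive(Debug, Default)]
pub struct BatchOutcome {
    pub submitted: Vec<TaskId>,
    pub deduplicated: Vec<String>,
}

/// Classifies a batch under a "skip duplicates" strategy.
///
/// `existing_keys` stands in for the keys returned by one batched
/// dedup lookup; `next_id` stands in for the ID the store would
/// assign to the next inserted row (an assumption of this sketch).
pub fn classify_batch(
    tasks: &[TaskSubmission],
    existing_keys: &HashSet<String>,
    mut next_id: TaskId,
) -> BatchOutcome {
    let mut outcome = BatchOutcome::default();
    for task in tasks {
        match &task.dedup_key {
            // Key is already queued: record it as skipped, insert nothing.
            Some(key) if existing_keys.contains(key) => {
                outcome.deduplicated.push(key.clone());
            }
            // New key, or no key at all: this task would be inserted.
            _ => {
                outcome.submitted.push(next_id);
                next_id += 1;
            }
        }
    }
    outcome
}
```

With an outcome shaped like this, the scheduler could emit one `BatchSubmitted` event carrying `outcome.submitted` after the transaction commits, instead of one event per task.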

Example: Sync Engine Integration

// Differ produces ~50,000 sync actions
let sync_plan = planner.plan(&diff_entries).await?;

// Submit all at once
let outcome = scheduler.submit_batch(
    sync_plan.actions.into_iter().map(|action| {
        TaskSubmission::new(action.task_type())
            .dedup_key(&action.dedup_key())
            .group(&action.endpoint())
            .priority(action.priority())
            .expected_net_io(0, action.size() as i64)
            .payload_json(&action)
            .unwrap()
    }).collect()
).await?;

tracing::info!(
    submitted = outcome.submitted.len(),
    skipped = outcome.deduplicated.len(),
    "Batch submission complete"
);

Design Considerations

  • The batch should be atomic from a persistence perspective (single transaction), but the scheduler should start dispatching tasks from the batch as soon as the transaction commits; it doesn't need to wait for all tasks to be "processed"
  • For very large batches (>100k tasks), consider splitting the work into several transactions (e.g. 10k tasks each) to avoid holding the SQLite write lock for too long. This gives up whole-batch atomicity: a failure partway through leaves earlier chunks committed
  • Duplicates within the batch itself should be handled: if two tasks in the same batch share a dedup key, the last one wins
  • Priority ordering within the batch should be respected: submitting a batch shouldn't cause all batch tasks to run before previously queued tasks of the same priority
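
The in-batch dedup and chunking points above can be sketched with plain collections. This is an illustration under assumptions, not the proposed implementation: `dedup_last_wins`, `plan_chunks`, and the pared-down `TaskSubmission` (with a `payload` string) are hypothetical names, and the chunk size is whatever the caller picks (10k is just the figure floated above).

```rust
use std::collections::HashSet;

/// Hypothetical, pared-down submission type for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct TaskSubmission {
    pub dedup_key: Option<String>,
    pub payload: String,
}

/// Drops earlier tasks whose dedup key reappears later in the batch
/// ("last one wins"). Tasks without a key are always kept.
/// Survivors keep their original relative order.
pub fn dedup_last_wins(tasks: Vec<TaskSubmission>) -> Vec<TaskSubmission> {
    let mut seen: HashSet<String> = HashSet::new();
    let mut kept: Vec<TaskSubmission> = Vec::with_capacity(tasks.len());
    // Walk backwards so the first time a key is met is its last occurrence.
    for task in tasks.into_iter().rev() {
        let keep = match &task.dedup_key {
            // `insert` returns false when the key was already seen.
            Some(key) => seen.insert(key.clone()),
            None => true,
        };
        if keep {
            kept.push(task);
        }
    }
    // Undo the reversal so survivors are back in submission order.
    kept.reverse();
    kept
}

/// Splits the deduplicated batch into slices that would each be
/// written in their own transaction.
pub fn plan_chunks(tasks: &[TaskSubmission], chunk_size: usize) -> Vec<&[TaskSubmission]> {
    // `chunks` panics on a size of zero, so clamp to at least one.
    tasks.chunks(chunk_size.max(1)).collect()
}
```

Running the dedup pass over the whole batch before chunking keeps the last-wins result independent of where chunk boundaries fall.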
