
Feature: Task superseding (cancel-and-replace by dedup key) #15

@deepjoy

Description


Summary

When a new task is submitted with a dedup key that matches an already-running task, provide an option to cancel the in-flight task and replace it with the new submission, rather than silently deduplicating (skipping) the new one.

Motivation

TaskMill's current dedup-by-key behavior returns SubmitOutcome::Deduplicated when a task with the same key is already queued or running. This is correct for idempotent work, but wrong for latest-value-wins scenarios.

In a continuous file sync engine (mantle watch), a file may change again while it's still being transferred. The in-flight transfer is now uploading a stale version. The correct behavior is:

  1. Cancel the running transfer (including its child parts) and invoke abort hooks (e.g. AbortMultipartUpload)
  2. Submit the new transfer for the latest version
  3. The new task takes the old task's place in the queue

Without superseding, the consumer must manually query for existing tasks by key, cancel them, wait for cancellation to complete, and then submit — a race-prone sequence that TaskMill could handle atomically.

Proposed Behavior

  • TaskSubmission gains a submit mode:
    TaskSubmission::new("file-transfer")
        .dedup_key("s3://bucket/path/to/file.tar.gz")
        .on_duplicate(DuplicateStrategy::Supersede)  // new
        .payload_json(&new_transfer_plan)?
  • DuplicateStrategy enum:
    #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
    pub enum DuplicateStrategy {
        #[default]
        Skip,        // Current behavior: return Deduplicated
        Supersede,   // Cancel existing + submit new atomically
        Reject,      // Return an error if a duplicate exists
    }
  • When Supersede is used:
    • If the existing task is pending: replace it in-place (same queue position, updated payload)
    • If the existing task is running: cancel it (trigger on_cancel hook), then enqueue the new task
    • If the existing task has children: cascade cancellation to all children before enqueueing the replacement
    • The replacement task gets a fresh retry counter
  • SubmitOutcome gains a new variant:
    SubmitOutcome::Superseded {
        new_task_id: TaskId,
        replaced_task_id: TaskId,
    }
  • The superseded task is recorded in task history with HistoryStatus::Superseded for an audit trail
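The pending, running, and children rules above can be sketched as a small single-threaded model. This is a hypothetical illustration, not TaskMill's implementation: Scheduler, Task, TaskState, DuplicateExists, cancel_log, and the Inserted outcome are invented names, TaskId is assumed to be a plain u64, on_cancel hooks are represented only by appending to cancel_log, and tasks that finish normally are never removed from the dedup index. For a running task the replacement is appended to the back of the queue, which is one reading of "enqueue the new task".

```rust
use std::collections::HashMap;

// Hypothetical in-memory model; these are not TaskMill's real internals.
pub type TaskId = u64;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum DuplicateStrategy {
    #[default]
    Skip,
    Supersede,
    Reject,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState { Pending, Running, Cancelled, Superseded }

#[derive(Debug, PartialEq, Eq)]
pub enum SubmitOutcome {
    Inserted(TaskId), // hypothetical: the normal "no duplicate" case
    Deduplicated(TaskId),
    Superseded { new_task_id: TaskId, replaced_task_id: TaskId },
}

/// Error for DuplicateStrategy::Reject; carries the existing task's id.
#[derive(Debug, PartialEq, Eq)]
pub struct DuplicateExists(pub TaskId);

#[derive(Debug)]
pub struct Task {
    pub payload: String,
    pub state: TaskState,
    pub retries: u32,
    pub children: Vec<TaskId>,
}

#[derive(Debug, Default)]
pub struct Scheduler {
    next_id: TaskId,
    pub tasks: HashMap<TaskId, Task>,
    by_key: HashMap<String, TaskId>, // dedup key -> live (pending or running) task
    pub queue: Vec<TaskId>,          // pending task ids in dispatch order
    pub cancel_log: Vec<TaskId>,     // order in which on_cancel hooks would fire
}

impl Scheduler {
    /// Creates a task with a fresh retry counter and returns its id.
    pub fn alloc(&mut self, payload: &str, state: TaskState) -> TaskId {
        self.next_id += 1;
        let task = Task { payload: payload.to_string(), state, retries: 0, children: Vec::new() };
        self.tasks.insert(self.next_id, task);
        self.next_id
    }

    /// Depth-first cascade: every child's hook fires before its parent's.
    fn cancel_tree(&mut self, id: TaskId, final_state: TaskState) {
        let children = self.tasks.get(&id).map(|t| t.children.clone()).unwrap_or_default();
        for child in children {
            self.cancel_tree(child, TaskState::Cancelled);
        }
        if let Some(t) = self.tasks.get_mut(&id) {
            t.state = final_state;
        }
        self.cancel_log.push(id);
    }

    /// Applies the chosen DuplicateStrategy when `key` already has a live task.
    pub fn submit(&mut self, key: &str, payload: &str, strategy: DuplicateStrategy)
        -> Result<SubmitOutcome, DuplicateExists> {
        let Some(old) = self.by_key.get(key).copied() else {
            let id = self.alloc(payload, TaskState::Pending);
            self.queue.push(id);
            self.by_key.insert(key.to_string(), id);
            return Ok(SubmitOutcome::Inserted(id));
        };
        match strategy {
            DuplicateStrategy::Skip => return Ok(SubmitOutcome::Deduplicated(old)),
            DuplicateStrategy::Reject => return Err(DuplicateExists(old)),
            DuplicateStrategy::Supersede => {}
        }
        let new = self.alloc(payload, TaskState::Pending);
        if let Some(pos) = self.queue.iter().position(|&id| id == old) {
            // Pending: the replacement takes over the old task's queue slot.
            self.queue[pos] = new;
            if let Some(t) = self.tasks.get_mut(&old) {
                t.state = TaskState::Superseded;
            }
        } else {
            // Running: finish the cancellation cascade before queueing the replacement.
            self.cancel_tree(old, TaskState::Superseded);
            self.queue.push(new);
        }
        self.by_key.insert(key.to_string(), new);
        Ok(SubmitOutcome::Superseded { new_task_id: new, replaced_task_id: old })
    }
}
```

Reusing the slot of a pending task keeps a frequently changing file from losing its place in line, and the depth-first cascade makes every child's hook fire before the parent's, which is one way to get the ordering asked for under Design Considerations.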

Example: Continuous Sync

// Watcher detects file changed — submit transfer
// If a transfer for this key is already running, cancel it and start fresh
scheduler.submit(
    TaskSubmission::new("file-transfer")
        .dedup_key(&format!("transfer:{profile}:{key}"))
        .on_duplicate(DuplicateStrategy::Supersede)
        .group(&dst_endpoint)
        .expected_net_io(new_size as i64, new_size as i64)
        .payload_json(&TransferPlan { /* latest version */ })?
).await?;

Design Considerations

  • Superseding must be atomic from the scheduler's perspective — there should be no window where both the old and new task are running simultaneously
  • For parent tasks with children, the cancellation cascade must complete (and on_cancel hooks must fire) before the replacement is scheduled
  • If the old task is in the finalize phase (all children done, finalize running), superseding should still work: cancel finalize and start fresh
  • Superseding a task that itself superseded another should work correctly (chain of replacements)
  • The replaced task's StateMap should be accessible to the new task if needed (e.g. to reuse an upload_id rather than creating a new multipart upload)
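Three of these points (no window with two live tasks, chains of replacements, and handing the StateMap forward) can be sketched together, assuming one in-process Mutex guards the dedup index. Registry, Record, submit_superseding, lineage, live_count, and the HashMap<String, String> StateMap alias are hypothetical, not TaskMill's API. Cancellation is modeled as an instantaneous flag flip; since the real on_cancel cascade is asynchronous, an actual scheduler would also have to hold the replacement back from dispatch until that cascade completes.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical sketch: these names are not TaskMill's real API.
pub type TaskId = u64;
pub type StateMap = HashMap<String, String>;

#[derive(Debug, Clone)]
pub struct Record {
    pub live: bool,
    pub replaces: Option<TaskId>, // previous link in a chain of replacements
    pub state: StateMap,
}

#[derive(Debug, Default)]
pub struct Registry {
    next_id: TaskId,
    pub records: HashMap<TaskId, Record>,
    by_key: HashMap<String, TaskId>, // dedup key -> current live task
}

impl Registry {
    /// Retires the live task for `key` (if any) and registers its replacement.
    /// Cancellation is a flag flip here; a real scheduler would await on_cancel.
    pub fn supersede(&mut self, key: &str) -> TaskId {
        self.next_id += 1;
        let new = self.next_id;
        let old = self.by_key.insert(key.to_string(), new);
        // Carry the replaced task's StateMap (e.g. an upload_id) forward.
        let state = match old.and_then(|id| self.records.get_mut(&id)) {
            Some(prev) => {
                prev.live = false;
                prev.state.clone()
            }
            None => StateMap::new(),
        };
        self.records.insert(new, Record { live: true, replaces: old, state });
        new
    }

    /// Walks a chain of replacements from `id` back to the first submission.
    pub fn lineage(&self, mut id: TaskId) -> Vec<TaskId> {
        let mut chain = vec![id];
        while let Some(prev) = self.records.get(&id).and_then(|r| r.replaces) {
            chain.push(prev);
            id = prev;
        }
        chain
    }

    pub fn live_count(&self) -> usize {
        self.records.values().filter(|r| r.live).count()
    }
}

/// All submitters share one lock, so lookup, retire, and insert happen as a unit
/// and no other thread can see the old and new task live at the same time.
/// Panics if the lock is poisoned.
pub fn submit_superseding(registry: &Mutex<Registry>, key: &str) -> TaskId {
    registry.lock().unwrap().supersede(key)
}
```

Cloning the StateMap forward is what would let a replacement find an existing upload_id instead of starting a new multipart upload; whether reusing it is actually safe (for example, whether parts uploaded for the stale version can be kept) is left to the consumer.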
