Feature: Task metadata tags for querying and filtering #19

@deepjoy

Summary

Allow tasks to carry arbitrary key-value metadata tags that are persisted, indexed, and queryable — enabling filtering, grouping, and display without overloading existing fields.

Motivation

TaskMill's current task model has dedup_key (for deduplication) and group (for concurrency limits), but consumers often need to associate additional metadata with tasks for operational purposes:

  • Filtering: "Show me all in-progress transfers for profile disaster-recovery"
  • Aggregation: "How many bytes are queued for endpoint s3://play.min.io?"
  • Display: "What is the source key, destination key, and profile name for this task?" (for CLI progress bars)
  • Debugging: "Find all tasks related to object data/2024/report.csv across all profiles"

Today, this metadata must be embedded in the task payload, so consumers have to deserialize every task's payload just to filter or display it. That is expensive and couples consumers tightly to the payload schema. Tags provide a lightweight, indexed, schema-free metadata layer.

Proposed Behavior

Submission

scheduler.submit(
    TaskSubmission::new("file-transfer")
        .tag("profile", "disaster-recovery")
        .tag("src_endpoint", "s3://us-east-1.amazonaws.com")
        .tag("dst_endpoint", "s3://play.min.io")
        .tag("object_key", "data/2024/report.csv")
        .tag("direction", "left_to_right")
        .payload_json(&plan)?
).await?;

Querying

// Find all running tasks for a profile
let tasks = scheduler.query_tasks(
    TaskQuery::new()
        .with_tag("profile", "disaster-recovery")
        .with_status(TaskStatus::Running)
).await?;

// Aggregate bytes queued per endpoint
let stats = scheduler.aggregate_tasks(
    TaskAggregate::new()
        .group_by_tag("dst_endpoint")
        .sum("expected_net_tx_bytes")
).await?;

// Count tasks by profile
let counts = scheduler.count_by_tag("profile").await?;
// → [("disaster-recovery", 1234), ("cross-cloud-mirror", 567)]
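To pin down what the aggregation calls above are expected to return, here is a minimal in-memory sketch of the same semantics. `TaskRecord`, `count_by_tag`, and `sum_bytes_by_tag` are hypothetical stand-ins written for illustration, not TaskMill APIs. The sketch returns a `BTreeMap` rather than the list of pairs shown above, and it assumes that tasks without the requested tag are left out of the result.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified view of a stored task (not the real TaskMill type).
struct TaskRecord {
    tags: BTreeMap<String, String>,
    expected_net_tx_bytes: u64,
}

/// Counts tasks per distinct value of the tag `key`.
/// Assumption: tasks that do not carry `key` are skipped.
fn count_by_tag(tasks: &[TaskRecord], key: &str) -> BTreeMap<String, usize> {
    let mut counts = BTreeMap::new();
    for task in tasks {
        if let Some(value) = task.tags.get(key) {
            *counts.entry(value.clone()).or_insert(0) += 1;
        }
    }
    counts
}

/// Sums `expected_net_tx_bytes` per distinct value of the tag `key`,
/// mirroring `group_by_tag(..).sum(..)` in the example above.
fn sum_bytes_by_tag(tasks: &[TaskRecord], key: &str) -> BTreeMap<String, u64> {
    let mut sums = BTreeMap::new();
    for task in tasks {
        if let Some(value) = task.tags.get(key) {
            *sums.entry(value.clone()).or_insert(0) += task.expected_net_tx_bytes;
        }
    }
    sums
}
```

Whether untagged tasks should be skipped or grouped under an explicit "no value" bucket is an open question this sketch does not settle.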

In Events

SchedulerEvent variants that reference tasks include the task's tags, so event consumers can filter and display without additional queries:

SchedulerEvent::TaskStarted {
    task_id,
    task_type: "file-transfer",
    tags: {
        "profile": "disaster-recovery",
        "object_key": "data/2024/report.csv",
    },
    ..
}
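As an illustration of how an event consumer might use the embedded tags, the sketch below filters events by an exact tag match without querying the scheduler. The enum is a simplified stand-in for `SchedulerEvent`: only the fields shown above are modeled, `task_id` is assumed to be a string, the `SchedulerIdle` variant and the `tags` and `has_tag` helpers are hypothetical, and the tag container is assumed to behave like a string map.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for the proposed event type (illustration only).
#[derive(Debug)]
enum SchedulerEvent {
    TaskStarted {
        task_id: String,
        task_type: String,
        tags: BTreeMap<String, String>,
    },
    /// Hypothetical variant that does not reference a task.
    SchedulerIdle,
}

impl SchedulerEvent {
    /// Returns the task's tags if this event references a task.
    fn tags(&self) -> Option<&BTreeMap<String, String>> {
        match self {
            SchedulerEvent::TaskStarted { tags, .. } => Some(tags),
            SchedulerEvent::SchedulerIdle => None,
        }
    }

    /// True if the event carries the tag `key` with exactly `value`.
    fn has_tag(&self, key: &str, value: &str) -> bool {
        self.tags()
            .and_then(|tags| tags.get(key))
            .map_or(false, |v| v.as_str() == value)
    }
}
```

A CLI progress view could, for example, keep only events where `has_tag("profile", "disaster-recovery")` is true.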

In Snapshots

SchedulerSnapshot includes tag-based aggregations:

let snapshot = scheduler.snapshot().await;
for (profile, stats) in snapshot.stats_by_tag("profile") {
    println!("{}: {} running, {} pending", profile, stats.running, stats.pending);
}

Storage

Tags are stored in a separate task_tags table with a composite index on (key, value):

CREATE TABLE task_tags (
    task_id TEXT NOT NULL REFERENCES tasks(id),
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    PRIMARY KEY (task_id, key)
);
CREATE INDEX idx_task_tags_kv ON task_tags(key, value);
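One way an exact-match tag filter could map onto this schema is a self-join on `task_tags`, one join per required tag. The sketch below only builds the SQL text and its bind parameters. The function name, the constant, and the `tt0`, `tt1`, ... aliases are illustrative assumptions, not part of TaskMill, and how the database actually plans these queries has not been measured here.

```rust
/// Candidate query for `count_by_tag(key)`: it filters on `key` and groups by
/// `value`, so it should be able to use the composite (key, value) index.
const COUNT_BY_TAG_SQL: &str =
    "SELECT value, COUNT(*) FROM task_tags WHERE key = ? GROUP BY value";

/// Builds a SELECT returning the ids of tasks that carry every (key, value)
/// pair in `filters`. Returns the SQL text and the bind parameters in
/// placeholder order; values are bound, never interpolated into the SQL.
fn tasks_with_tags_sql(filters: &[(&str, &str)]) -> (String, Vec<String>) {
    let mut sql = String::from("SELECT t.id FROM tasks t");
    let mut params = Vec::new();
    for (i, (key, value)) in filters.iter().enumerate() {
        // One join per tag: each alias must match a different (key, value) row.
        sql.push_str(&format!(
            " JOIN task_tags tt{i} ON tt{i}.task_id = t.id AND tt{i}.key = ? AND tt{i}.value = ?",
            i = i
        ));
        params.push(key.to_string());
        params.push(value.to_string());
    }
    (sql, params)
}
```

Because the primary key is `(task_id, key)`, each join can match at most one row per task, so the result contains no duplicate task ids.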

Design Considerations

  • Tags are immutable after submission: they describe the task's identity, not its progress. Mutable state belongs in StateMap.
  • Tag keys should be short strings (recommended maximum 64 characters), and values at most 256 characters. Tags are not designed for storing large data.
  • Children inherit parent tags by default but can add or override their own (e.g. a child adds part_number: "7").
  • Bulk submission (submit_batch) should insert the tags for all tasks in the same transaction as the tasks themselves.
  • Tag queries should be efficient for common patterns (exact match on key+value, list all values for a key). Full-text search on tag values is out of scope.
  • Consider reserving a taskmill.* tag namespace for internal use (e.g. taskmill.type, taskmill.group).
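The size limits, the reserved namespace, and child inheritance could be sketched as follows. All names here (`TagError`, `validate_tag`, `inherit_tags`) are hypothetical. The sketch also makes three assumptions the list above leaves open: the recommended lengths are enforced as hard limits, lengths are counted in Unicode scalar values rather than bytes, and user-supplied keys under `taskmill.` are rejected.

```rust
use std::collections::BTreeMap;

// Limits from the recommendations above; enforcing them strictly is an assumption.
const MAX_KEY_LEN: usize = 64;
const MAX_VALUE_LEN: usize = 256;
// Prefix of the namespace proposed for internal use.
const RESERVED_PREFIX: &str = "taskmill.";

#[derive(Debug, PartialEq)]
enum TagError {
    EmptyKey,
    KeyTooLong(usize),
    ValueTooLong(usize),
    ReservedNamespace,
}

/// Validates one user-supplied tag before it is stored.
fn validate_tag(key: &str, value: &str) -> Result<(), TagError> {
    if key.is_empty() {
        return Err(TagError::EmptyKey);
    }
    let key_len = key.chars().count();
    if key_len > MAX_KEY_LEN {
        return Err(TagError::KeyTooLong(key_len));
    }
    let value_len = value.chars().count();
    if value_len > MAX_VALUE_LEN {
        return Err(TagError::ValueTooLong(value_len));
    }
    if key.starts_with(RESERVED_PREFIX) {
        return Err(TagError::ReservedNamespace);
    }
    Ok(())
}

/// Computes a child's effective tags: the parent's tags, with the child's
/// own tags added on top and winning on key conflicts.
fn inherit_tags(
    parent: &BTreeMap<String, String>,
    child: &BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    let mut merged = parent.clone();
    merged.extend(child.iter().map(|(k, v)| (k.clone(), v.clone())));
    merged
}
```

Since tags are immutable after submission, the merge would run once when the child is submitted, so later changes in the parent's state never need to propagate.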
