Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions migrations/007_tags.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Task metadata tags (key-value pairs).
--
-- One row per (task, key): the composite primary key means a task can hold
-- at most one value for any given tag key.
-- NOTE(review): task_id presumably refers to a row in the tasks table; no
-- FOREIGN KEY is declared here, so removing tag rows when a task goes away
-- looks like the application's job -- confirm.
CREATE TABLE IF NOT EXISTS task_tags (
task_id INTEGER NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (task_id, key)
);

-- Secondary index ordered by (key, value) so tag filters can be resolved by
-- key alone or by an exact key/value pair without scanning the whole table.
CREATE INDEX IF NOT EXISTS idx_task_tags_kv ON task_tags(key, value);

-- Tags attached to history rows; same shape as task_tags but keyed by the
-- history row instead of the live task.
-- One row per (history row, key): a history entry holds at most one value
-- for any given tag key.
-- NOTE(review): history_rowid presumably is the rowid of the matching row in
-- the task history table; no FOREIGN KEY is declared here -- confirm.
CREATE TABLE IF NOT EXISTS task_history_tags (
history_rowid INTEGER NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (history_rowid, key)
);

-- Secondary index ordered by (key, value), mirroring idx_task_tags_kv.
CREATE INDEX IF NOT EXISTS idx_history_tags_kv ON task_history_tags(key, value);
24 changes: 23 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,27 @@
//! (with `TtlFrom::Submission`), so a child can never outlive its parent's
//! deadline.
//!
//! ## Task metadata tags
//!
//! Tasks can carry schema-free key-value metadata tags for filtering, grouping,
//! and display — without deserializing the task payload. Tags are immutable
//! after submission and are persisted, indexed, and queryable.
//!
//! Set tags per-task via [`TaskSubmission::tag`], per-type via
//! [`TypedTask::tags`], or as batch defaults via
//! [`BatchSubmission::default_tag`]. Tag keys and values are validated at submit
//! time against [`MAX_TAG_KEY_LEN`], [`MAX_TAG_VALUE_LEN`], and
//! [`MAX_TAGS_PER_TASK`].
//!
//! Child tasks inherit parent tags by default (child tags take precedence).
//! Tags are copied to history on all terminal transitions and are included in
//! [`TaskEventHeader`] for event subscribers.
//!
//! Query by tags with [`TaskStore::tasks_by_tags`] (AND semantics),
//! [`TaskStore::count_by_tag`] (grouped counts), or
//! [`TaskStore::tag_values`] (distinct values). Cancel by tag with
//! [`Scheduler::cancel_by_tag`].
//!
//! ## Delayed & scheduled tasks
//!
//! A task can declare **when** it becomes eligible for dispatch:
Expand Down Expand Up @@ -774,7 +795,8 @@ pub use task::{
generate_dedup_key, BatchOutcome, BatchSubmission, DependencyFailurePolicy, DuplicateStrategy,
HistoryStatus, IoBudget, ParentResolution, RecurringSchedule, RecurringScheduleInfo,
SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus,
TaskSubmission, TtlFrom, TypeStats, TypedTask,
TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK, MAX_TAG_KEY_LEN,
MAX_TAG_VALUE_LEN,
};

#[cfg(feature = "sysinfo-monitor")]
Expand Down
43 changes: 27 additions & 16 deletions src/registry/child_spawner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Child task spawning from within an executor.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -8,6 +9,16 @@ use chrono::{DateTime, Utc};
use crate::store::{StoreError, TaskStore};
use crate::task::{SubmitOutcome, TaskSubmission, TtlFrom};

/// Inherited parent context for child spawning: TTL and tags.
///
/// A snapshot of the parent task's fields that a child spawner needs in
/// order to derive a child's TTL and default tags. The values are owned
/// copies, so the context can be cloned and moved independently of the
/// parent record it was built from.
#[derive(Clone)]
pub(crate) struct ParentContext {
/// When the parent was created; used as the TTL start when `ttl_from`
/// is `TtlFrom::Submission`.
pub created_at: DateTime<Utc>,
/// Parent TTL in seconds; `None` means the parent has no TTL and
/// children inherit none.
pub ttl_seconds: Option<i64>,
/// Which timestamp the parent's TTL is measured from.
pub ttl_from: TtlFrom,
/// When the parent started; used as the TTL start when `ttl_from` is
/// `TtlFrom::FirstAttempt`. `None` means it has not started yet.
pub started_at: Option<DateTime<Utc>>,
/// Parent tags merged into each child; a child's own value wins when
/// both define the same key.
pub tags: HashMap<String, String>,
}

/// Handle for spawning child tasks from within an executor.
///
/// Wraps a [`TaskStore`] reference and the parent task ID so that
Expand All @@ -22,30 +33,21 @@ pub(crate) struct ChildSpawner {
store: TaskStore,
parent_id: i64,
work_notify: Arc<tokio::sync::Notify>,
parent_created_at: DateTime<Utc>,
parent_ttl_seconds: Option<i64>,
parent_ttl_from: TtlFrom,
parent_started_at: Option<DateTime<Utc>>,
parent: ParentContext,
}

impl ChildSpawner {
pub(crate) fn new(
store: TaskStore,
parent_id: i64,
work_notify: Arc<tokio::sync::Notify>,
parent_created_at: DateTime<Utc>,
parent_ttl_seconds: Option<i64>,
parent_ttl_from: TtlFrom,
parent_started_at: Option<DateTime<Utc>>,
parent: ParentContext,
) -> Self {
Self {
store,
parent_id,
work_notify,
parent_created_at,
parent_ttl_seconds,
parent_ttl_from,
parent_started_at,
parent,
}
}

Expand All @@ -55,15 +57,15 @@ impl ChildSpawner {
if sub.ttl.is_some() {
return; // Child has explicit TTL, don't override.
}
let Some(parent_ttl_secs) = self.parent_ttl_seconds else {
let Some(parent_ttl_secs) = self.parent.ttl_seconds else {
return; // Parent has no TTL.
};
let parent_ttl = Duration::from_secs(parent_ttl_secs as u64);

// Determine when the parent's TTL started.
let ttl_start = match self.parent_ttl_from {
TtlFrom::Submission => self.parent_created_at,
TtlFrom::FirstAttempt => match self.parent_started_at {
let ttl_start = match self.parent.ttl_from {
TtlFrom::Submission => self.parent.created_at,
TtlFrom::FirstAttempt => match self.parent.started_at {
Some(started) => started,
None => return, // Parent hasn't started yet, can't compute remaining.
},
Expand All @@ -80,10 +82,18 @@ impl ChildSpawner {
}
}

/// Merge the parent's tags into `sub` without overwriting anything.
///
/// Every parent tag whose key is absent from the child submission is copied
/// over; keys the child already defines keep the child's value.
fn inherit_tags(&self, sub: &mut TaskSubmission) {
    // NOTE(review): the merged set can be larger than either side's own tag
    // count; if tag limits are enforced on submit, a child may be rejected
    // because of inherited tags -- confirm this is intended.
    self.parent.tags.iter().for_each(|(name, value)| {
        sub.tags
            .entry(name.to_owned())
            .or_insert_with(|| value.to_owned());
    });
}

/// Submit a single child task. Sets `parent_id` automatically.
pub async fn spawn(&self, mut sub: TaskSubmission) -> Result<SubmitOutcome, StoreError> {
sub.parent_id = Some(self.parent_id);
self.inherit_ttl(&mut sub);
self.inherit_tags(&mut sub);
let outcome = self.store.submit(&sub).await?;
self.work_notify.notify_one();
Ok(outcome)
Expand All @@ -97,6 +107,7 @@ impl ChildSpawner {
for sub in submissions.iter_mut() {
sub.parent_id = Some(self.parent_id);
self.inherit_ttl(sub);
self.inherit_tags(sub);
}
let outcomes = self.store.submit_batch(submissions).await?;
self.work_notify.notify_one();
Expand Down
2 changes: 1 addition & 1 deletion src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;

use crate::task::TaskError;

pub(crate) use child_spawner::ChildSpawner;
pub(crate) use child_spawner::{ChildSpawner, ParentContext};
pub use context::TaskContext;
pub(crate) use io_tracker::IoTracker;
pub(crate) use state::{StateMap, StateSnapshot};
Expand Down
13 changes: 8 additions & 5 deletions src/scheduler/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
use tokio_util::sync::CancellationToken;

use crate::priority::Priority;
use crate::registry::{ChildSpawner, IoTracker, TaskContext};
use crate::registry::{ChildSpawner, IoTracker, ParentContext, TaskContext};
use crate::store::TaskStore;
use crate::task::{IoBudget, ParentResolution, TaskRecord};

Expand Down Expand Up @@ -300,10 +300,13 @@ pub(crate) async fn spawn_task(
store.clone(),
task.id,
work_notify.clone(),
task.created_at,
task.ttl_seconds,
task.ttl_from,
task.started_at,
ParentContext {
created_at: task.created_at,
ttl_seconds: task.ttl_seconds,
ttl_from: task.ttl_from,
started_at: task.started_at,
tags: task.tags.clone(),
},
);
let io = Arc::new(IoTracker::new());

Expand Down
4 changes: 4 additions & 0 deletions src/scheduler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
//! - [`DependencyFailed { task_id, failed_dependency }`](SchedulerEvent::DependencyFailed)
//! — a blocked task was cancelled because a dependency failed

use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use tokio::time::Duration;

Expand Down Expand Up @@ -62,6 +64,8 @@ pub struct TaskEventHeader {
pub task_type: String,
pub key: String,
pub label: String,
/// Key-value metadata tags from the task record.
pub tags: HashMap<String, String>,
}

// ── Events ──────────────────────────────────────────────────────────
Expand Down
29 changes: 29 additions & 0 deletions src/scheduler/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,35 @@ impl Scheduler {
.collect()
}

/// Look up active tasks that carry every one of the given tag pairs.
///
/// `filters` is a list of `(key, value)` pairs combined with AND semantics;
/// `status` optionally narrows the result to a single task status.
///
/// This is a thin forwarder to [`TaskStore::tasks_by_tags`].
pub async fn tasks_by_tags(
    &self,
    filters: &[(&str, &str)],
    status: Option<crate::task::TaskStatus>,
) -> Result<Vec<crate::task::TaskRecord>, StoreError> {
    let store = &self.inner.store;
    store.tasks_by_tags(filters, status).await
}

/// Tally active tasks per distinct value of the tag `key`.
///
/// Each returned pair is `(tag value, number of tasks)`; `status`
/// optionally restricts the tally to tasks in a single status.
///
/// This is a thin forwarder to [`TaskStore::count_by_tag`].
pub async fn count_by_tag(
    &self,
    key: &str,
    status: Option<crate::task::TaskStatus>,
) -> Result<Vec<(String, i64)>, StoreError> {
    let store = &self.inner.store;
    store.count_by_tag(key, status).await
}

/// Enumerate the distinct values seen for tag `key` on active tasks,
/// each paired with its count.
///
/// This is a thin forwarder to [`TaskStore::tag_values`].
pub async fn tag_values(&self, key: &str) -> Result<Vec<(String, i64)>, StoreError> {
    let store = &self.inner.store;
    store.tag_values(key).await
}

/// Capture a single status snapshot for dashboard UIs.
///
/// Gathers running tasks, queue depths, progress estimates, and
Expand Down
21 changes: 21 additions & 0 deletions src/scheduler/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl Scheduler {
task_type: sub.task_type.clone(),
key: sub.effective_key(),
label: sub.label.clone(),
tags: sub.tags.clone(),
};
let _ = self.inner.event_tx.send(SchedulerEvent::Superseded {
old: old_header,
Expand Down Expand Up @@ -110,6 +111,7 @@ impl Scheduler {
task_type: sub.task_type.clone(),
key: sub.effective_key(),
label: sub.label.clone(),
tags: sub.tags.clone(),
};
let _ = self.inner.event_tx.send(SchedulerEvent::Superseded {
old: old_header,
Expand Down Expand Up @@ -294,6 +296,25 @@ impl Scheduler {
Ok(cancelled)
}

/// Cancel every active task that carries the tag `key` = `value`.
///
/// Matching tasks are looked up through [`TaskStore::tasks_by_tags`] with no
/// status filter, then cancelled one at a time via [`Scheduler::cancel`].
///
/// Returns the ids for which `cancel` reported success. Tasks for which it
/// reported `false` are silently left out of the result.
///
/// # Errors
///
/// Propagates the first [`StoreError`] from the lookup or from any single
/// cancellation. The loop stops at that point, so ids already cancelled
/// earlier in the same call are not reported back to the caller.
pub async fn cancel_by_tag(&self, key: &str, value: &str) -> Result<Vec<i64>, StoreError> {
    let filter = [(key, value)];
    let matches = self.inner.store.tasks_by_tags(&filter, None).await?;

    let mut cancelled_ids = Vec::new();
    for task in matches.iter() {
        let id = task.id;
        let was_cancelled = self.cancel(id).await?;
        if was_cancelled {
            cancelled_ids.push(id);
        }
    }
    Ok(cancelled_ids)
}

/// Cancel all tasks matching a predicate.
pub async fn cancel_where(
&self,
Expand Down
73 changes: 73 additions & 0 deletions src/store/hierarchy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,79 @@ mod tests {
assert!(t.started_at.is_some());
}

/// A child submitted without any tags ends up with all of its parent's tags.
#[tokio::test]
async fn child_inherits_parent_tags() {
    use crate::registry::child_spawner::{ChildSpawner, ParentContext};
    use std::sync::Arc;

    let store = test_store().await;
    let work_notify = Arc::new(tokio::sync::Notify::new());

    // Store a tagged parent, then pop it to obtain its persisted record.
    let tagged_parent = TaskSubmission::new("test")
        .key("tagged-parent")
        .tag("env", "prod")
        .tag("region", "us-east");
    let parent_id = store.submit(&tagged_parent).await.unwrap().id().unwrap();
    let parent_record = store.pop_next().await.unwrap().unwrap();

    let spawner = ChildSpawner::new(
        store.clone(),
        parent_id,
        work_notify,
        ParentContext {
            created_at: parent_record.created_at,
            ttl_seconds: None,
            ttl_from: crate::task::TtlFrom::Submission,
            started_at: parent_record.started_at,
            tags: parent_record.tags.clone(),
        },
    );

    // The child declares no tags of its own, so everything must be inherited.
    let untagged_child = TaskSubmission::new("test").key("child-no-tags");
    let child_id = spawner.spawn(untagged_child).await.unwrap().id().unwrap();

    let stored_child = store.task_by_id(child_id).await.unwrap().unwrap();
    for (tag, expected) in [("env", "prod"), ("region", "us-east")] {
        assert_eq!(stored_child.tags.get(tag).unwrap(), expected);
    }
}

/// A child's own tags win over the parent's, while untouched parent tags
/// are still inherited and child-only tags are kept.
#[tokio::test]
async fn child_overrides_parent_tag() {
    use crate::registry::child_spawner::{ChildSpawner, ParentContext};
    use std::sync::Arc;

    let store = test_store().await;
    let work_notify = Arc::new(tokio::sync::Notify::new());

    // Store a tagged parent, then pop it to obtain its persisted record.
    let tagged_parent = TaskSubmission::new("test")
        .key("tagged-parent-2")
        .tag("env", "prod")
        .tag("region", "us-east");
    let parent_id = store.submit(&tagged_parent).await.unwrap().id().unwrap();
    let parent_record = store.pop_next().await.unwrap().unwrap();

    let spawner = ChildSpawner::new(
        store.clone(),
        parent_id,
        work_notify,
        ParentContext {
            created_at: parent_record.created_at,
            ttl_seconds: None,
            ttl_from: crate::task::TtlFrom::Submission,
            started_at: parent_record.started_at,
            tags: parent_record.tags.clone(),
        },
    );

    // The child redefines "region", adds "extra", and leaves "env" alone.
    let overriding_child = TaskSubmission::new("test")
        .key("child-override")
        .tag("region", "eu-west")
        .tag("extra", "yes");
    let child_id = spawner.spawn(overriding_child).await.unwrap().id().unwrap();

    let stored_child = store.task_by_id(child_id).await.unwrap().unwrap();
    let expectations = [
        ("env", "prod"),       // Inherited.
        ("region", "eu-west"), // Overridden.
        ("extra", "yes"),      // Child's own.
    ];
    for (tag, expected) in expectations {
        assert_eq!(stored_child.tags.get(tag).unwrap(), expected);
    }
}

#[tokio::test]
async fn recover_preserves_waiting_parents() {
let store = test_store().await;
Expand Down
Loading
Loading