From cbdddd226ffd1f8ca502ea3d63c71c594597e80d Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 21 Mar 2026 19:01:50 -0700 Subject: [PATCH 1/3] feat: add tag key prefix queries for namespace-scoped discovery (#41) Support querying, counting, and cancelling tasks by tag key prefix (e.g. "billing.") across all layers: TaskStore, Scheduler, ModuleHandle, and DomainHandle. LIKE wildcards in user-supplied prefixes are escaped. --- src/domain.rs | 34 +++++ src/module.rs | 58 ++++++++ src/scheduler/queries.rs | 26 ++++ src/scheduler/submit.rs | 18 +++ src/store/query/tags.rs | 167 +++++++++++++++++++++++ src/store/query/tests.rs | 279 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 582 insertions(+) diff --git a/src/domain.rs b/src/domain.rs index 4d33c7e..12b1431 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -649,6 +649,40 @@ impl DomainHandle { self.inner.tasks_by_tags(filters, status).await } + /// Discover tag keys matching a prefix within this domain. + pub async fn tag_keys_by_prefix( + &self, + prefix: &str, + ) -> Result, StoreError> { + self.inner.tag_keys_by_prefix(prefix).await + } + + /// Find domain tasks with any tag key matching the given prefix. + pub async fn tasks_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result, StoreError> { + self.inner.tasks_by_tag_key_prefix(prefix, status).await + } + + /// Count domain tasks with any tag key matching the given prefix. + pub async fn count_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result { + self.inner.count_by_tag_key_prefix(prefix, status).await + } + + /// Cancel all domain tasks with any tag key matching the given prefix. + pub async fn cancel_by_tag_key_prefix( + &self, + prefix: &str, + ) -> Result, StoreError> { + self.inner.cancel_by_tag_key_prefix(prefix).await + } + /// Set the maximum number of concurrent tasks for this domain. pub fn set_max_concurrency(&self, n: usize) { self.inner.set_max_concurrency(n); diff --git a/src/module.rs b/src/module.rs index 3396291..f1c6ba7 100644 --- a/src/module.rs +++ b/src/module.rs @@ -724,6 +724,64 @@ impl ModuleHandle { .await } + /// Discover tag keys matching a prefix, scoped to this module's tasks. + pub async fn tag_keys_by_prefix( + &self, + prefix: &str, + ) -> Result, StoreError> { + self.scheduler + .inner + .store + .tag_keys_by_prefix_with_prefix(&self.prefix, prefix) + .await + } + + /// Find module tasks with any tag key matching the given prefix. + pub async fn tasks_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result, StoreError> { + self.scheduler + .inner + .store + .tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status) + .await + } + + /// Count module tasks with any tag key matching the given prefix. + pub async fn count_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result { + self.scheduler + .inner + .store + .count_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status) + .await + } + + /// Cancel module tasks with any tag key matching the given prefix. + pub async fn cancel_by_tag_key_prefix( + &self, + prefix: &str, + ) -> Result, StoreError> { + let tasks = self + .scheduler + .inner + .store + .tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, None) + .await?; + let mut cancelled = Vec::new(); + for task in &tasks { + if self.scheduler.cancel(task.id).await? { + cancelled.push(task.id); + } + } + Ok(cancelled) + } + /// Estimated progress for all running tasks in this module. pub async fn estimated_progress(&self) -> Vec { let snapshots = self diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index cb5bbca..2833aa3 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -92,6 +92,32 @@ impl Scheduler { self.inner.store.tag_values(key).await } + /// Discover all tag keys matching a prefix across active tasks. + pub async fn tag_keys_by_prefix( + &self, + prefix: &str, + ) -> Result, StoreError> { + self.inner.store.tag_keys_by_prefix(prefix).await + } + + /// Find active tasks with any tag key matching the given prefix. + pub async fn tasks_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result, StoreError> { + self.inner.store.tasks_by_tag_key_prefix(prefix, status).await + } + + /// Count active tasks with any tag key matching the given prefix. + pub async fn count_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result { + self.inner.store.count_by_tag_key_prefix(prefix, status).await + } + /// Dead-lettered tasks (retries exhausted), newest first. /// /// These are tasks that failed with a retryable error but exhausted their diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index 15b1f93..ad4c6f9 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -337,6 +337,24 @@ impl Scheduler { Ok(cancelled) } + /// Cancel all active tasks that have any tag key matching the given prefix. + /// + /// Finds tasks via [`TaskStore::tasks_by_tag_key_prefix`] and cancels each. + /// Returns the ids of tasks that were successfully cancelled. + pub async fn cancel_by_tag_key_prefix( + &self, + prefix: &str, + ) -> Result, StoreError> { + let tasks = self.inner.store.tasks_by_tag_key_prefix(prefix, None).await?; + let mut cancelled = Vec::new(); + for task in &tasks { + if self.cancel(task.id).await? { + cancelled.push(task.id); + } + } + Ok(cancelled) + } + /// Cancel all tasks matching a predicate. pub async fn cancel_where( &self, diff --git a/src/store/query/tags.rs b/src/store/query/tags.rs index fe76a2e..91aaca0 100644 --- a/src/store/query/tags.rs +++ b/src/store/query/tags.rs @@ -4,6 +4,21 @@ use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; use crate::task::TaskRecord; +/// Escape SQL LIKE wildcards in `prefix` and append `%` for a safe prefix pattern. +/// +/// The returned value is intended for use with `LIKE ? ESCAPE '\'`. +fn escape_like_prefix(prefix: &str) -> String { + let mut escaped = String::with_capacity(prefix.len() + 1); + for ch in prefix.chars() { + if ch == '%' || ch == '_' || ch == '\\' { + escaped.push('\\'); + } + escaped.push(ch); + } + escaped.push('%'); + escaped +} + impl TaskStore { /// Find active tasks matching all specified tag filters (AND semantics). /// @@ -205,4 +220,156 @@ impl TaskStore { let rows = q.fetch_all(&self.pool).await?; Ok(rows) } + + // ── Tag key prefix queries ────────────────────────────────────── + + /// Discover distinct tag keys matching a prefix across active tasks. + /// + /// Returns keys sorted alphabetically. The prefix is matched literally — + /// LIKE wildcards (`%`, `_`) in the prefix are escaped. + pub async fn tag_keys_by_prefix( + &self, + prefix: &str, + ) -> Result, StoreError> { + let pattern = escape_like_prefix(prefix); + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT DISTINCT key FROM task_tags WHERE key LIKE ? ESCAPE '\\' ORDER BY key", + ) + .bind(&pattern) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(|(k,)| k).collect()) + } + + /// Discover distinct tag keys matching a key prefix, scoped to a task_type prefix. + pub async fn tag_keys_by_prefix_with_prefix( + &self, + task_type_prefix: &str, + key_prefix: &str, + ) -> Result, StoreError> { + let key_pattern = escape_like_prefix(key_prefix); + let type_pattern = format!("{task_type_prefix}%"); + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT DISTINCT tt.key FROM task_tags tt \ + JOIN tasks t ON t.id = tt.task_id \ + WHERE tt.key LIKE ? ESCAPE '\\' AND t.task_type LIKE ? \ + ORDER BY tt.key", + ) + .bind(&key_pattern) + .bind(&type_pattern) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(|(k,)| k).collect()) + } + + /// Find active tasks that have any tag key matching the prefix. + /// + /// Optionally filter by status. Returns tasks ordered by priority. + /// LIKE wildcards in the prefix are escaped — only true prefix matching. + pub async fn tasks_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result, StoreError> { + let pattern = escape_like_prefix(prefix); + let mut sql = String::from( + "SELECT t.* FROM tasks t \ + WHERE EXISTS (SELECT 1 FROM task_tags tt \ + WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\')", + ); + if let Some(ref s) = status { + sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); + } + sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); + + let rows = sqlx::query(&sql) + .bind(&pattern) + .fetch_all(&self.pool) + .await?; + let mut records: Vec = rows.iter().map(row_to_task_record).collect(); + self.populate_tags(&mut records).await?; + Ok(records) + } + + /// Find active tasks that have any tag key matching the prefix, scoped to a task_type prefix. + pub async fn tasks_by_tag_key_prefix_with_prefix( + &self, + task_type_prefix: &str, + prefix: &str, + status: Option, + ) -> Result, StoreError> { + let pattern = escape_like_prefix(prefix); + let type_pattern = format!("{task_type_prefix}%"); + let mut sql = String::from( + "SELECT t.* FROM tasks t \ + WHERE EXISTS (SELECT 1 FROM task_tags tt \ + WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\') \ + AND t.task_type LIKE ?", + ); + if let Some(ref s) = status { + sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); + } + sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); + + let rows = sqlx::query(&sql) + .bind(&pattern) + .bind(&type_pattern) + .fetch_all(&self.pool) + .await?; + let mut records: Vec = rows.iter().map(row_to_task_record).collect(); + self.populate_tags(&mut records).await?; + Ok(records) + } + + /// Count active tasks that have any tag key matching the prefix. + /// + /// LIKE wildcards in the prefix are escaped — only true prefix matching. + pub async fn count_by_tag_key_prefix( + &self, + prefix: &str, + status: Option, + ) -> Result { + let pattern = escape_like_prefix(prefix); + let mut sql = String::from( + "SELECT COUNT(*) FROM tasks t \ + WHERE EXISTS (SELECT 1 FROM task_tags tt \ + WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\')", + ); + if let Some(ref s) = status { + sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); + } + + let (count,) = sqlx::query_as::<_, (i64,)>(&sql) + .bind(&pattern) + .fetch_one(&self.pool) + .await?; + Ok(count) + } + + /// Count active tasks that have any tag key matching the prefix, scoped to a task_type prefix. + pub async fn count_by_tag_key_prefix_with_prefix( + &self, + task_type_prefix: &str, + prefix: &str, + status: Option, + ) -> Result { + let pattern = escape_like_prefix(prefix); + let type_pattern = format!("{task_type_prefix}%"); + let mut sql = String::from( + "SELECT COUNT(*) FROM tasks t \ + WHERE EXISTS (SELECT 1 FROM task_tags tt \ + WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\') \ + AND t.task_type LIKE ?", + ); + if let Some(ref s) = status { + sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); + } + + let (count,) = sqlx::query_as::<_, (i64,)>(&sql) + .bind(&pattern) + .bind(&type_pattern) + .fetch_one(&self.pool) + .await?; + Ok(count) + } } diff --git a/src/store/query/tests.rs b/src/store/query/tests.rs index 4b4ab5f..ace2b6a 100644 --- a/src/store/query/tests.rs +++ b/src/store/query/tests.rs @@ -289,3 +289,282 @@ async fn tag_values_distinct() { let empty = store.tag_values("nonexistent").await.unwrap(); assert!(empty.is_empty()); } + +// ── Tag key prefix query tests ────────────────────────────────── + +#[tokio::test] +async fn tag_keys_by_prefix_discovers_keys() { + let store = test_store().await; + + store + .submit( + &TaskSubmission::new("test") + .key("tkp-1") + .tag("billing.customer_id", "cust_1") + .tag("billing.plan", "enterprise"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("test") + .key("tkp-2") + .tag("media.pipeline", "transcode"), + ) + .await + .unwrap(); + + let keys = store.tag_keys_by_prefix("billing.").await.unwrap(); + assert_eq!(keys, vec!["billing.customer_id", "billing.plan"]); + + let keys = store.tag_keys_by_prefix("media.").await.unwrap(); + assert_eq!(keys, vec!["media.pipeline"]); + + // Non-matching prefix returns empty. + let keys = store.tag_keys_by_prefix("nonexistent.").await.unwrap(); + assert!(keys.is_empty()); +} + +#[tokio::test] +async fn tasks_by_tag_key_prefix_finds_tasks() { + let store = test_store().await; + + store + .submit( + &TaskSubmission::new("test") + .key("tp-1") + .tag("billing.plan", "free"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("test") + .key("tp-2") + .tag("billing.customer_id", "cust_1") + .tag("media.codec", "h265"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("test") + .key("tp-3") + .tag("media.codec", "h264"), + ) + .await + .unwrap(); + + let tasks = store.tasks_by_tag_key_prefix("billing.", None).await.unwrap(); + assert_eq!(tasks.len(), 2); // tp-1 and tp-2 + + let tasks = store.tasks_by_tag_key_prefix("media.", None).await.unwrap(); + assert_eq!(tasks.len(), 2); // tp-2 and tp-3 +} + +#[tokio::test] +async fn tasks_by_tag_key_prefix_with_status_filter() { + let store = test_store().await; + + store + .submit( + &TaskSubmission::new("test") + .key("ts-1") + .tag("billing.plan", "free"), + ) + .await + .unwrap(); + + let tasks = store + .tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)) + .await + .unwrap(); + assert_eq!(tasks.len(), 1); + + let tasks = store + .tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Running)) + .await + .unwrap(); + assert_eq!(tasks.len(), 0); +} + +#[tokio::test] +async fn count_by_tag_key_prefix_counts_distinct_tasks() { + let store = test_store().await; + + // Task with two billing.* tags should be counted once. + store + .submit( + &TaskSubmission::new("test") + .key("cp-1") + .tag("billing.plan", "free") + .tag("billing.customer_id", "cust_1"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("test") + .key("cp-2") + .tag("billing.plan", "pro"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("test") + .key("cp-3") + .tag("media.codec", "h265"), + ) + .await + .unwrap(); + + let count = store.count_by_tag_key_prefix("billing.", None).await.unwrap(); + assert_eq!(count, 2); // cp-1 counted once despite two matching keys + + let count = store.count_by_tag_key_prefix("media.", None).await.unwrap(); + assert_eq!(count, 1); +} + +#[tokio::test] +async fn tag_keys_by_prefix_empty_prefix_returns_all() { + let store = test_store().await; + + store + .submit( + &TaskSubmission::new("test") + .key("ep-1") + .tag("billing.plan", "free") + .tag("media.codec", "h265"), + ) + .await + .unwrap(); + + let keys = store.tag_keys_by_prefix("").await.unwrap(); + assert_eq!(keys.len(), 2); +} + +#[tokio::test] +async fn tag_keys_by_prefix_escapes_like_wildcards() { + let store = test_store().await; + + store + .submit( + &TaskSubmission::new("test") + .key("esc-1") + .tag("a%b.key", "val") + .tag("a_b.key", "val") + .tag("axb.key", "val"), + ) + .await + .unwrap(); + + // "a%b." should only match the literal "a%b.key", not "axb.key" + let keys = store.tag_keys_by_prefix("a%b.").await.unwrap(); + assert_eq!(keys, vec!["a%b.key"]); + + // "a_b." should only match the literal "a_b.key", not "axb.key" + let keys = store.tag_keys_by_prefix("a_b.").await.unwrap(); + assert_eq!(keys, vec!["a_b.key"]); +} + +// ── Domain-scoped (_with_prefix) tests ────────────────────────── + +#[tokio::test] +async fn tag_keys_by_prefix_with_prefix_scopes_to_task_type() { + let store = test_store().await; + + // Two different task_type prefixes, same tag key prefix. + store + .submit( + &TaskSubmission::new("billing::invoice") + .key("wp-1") + .tag("region.zone", "us-east"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("media::transcode") + .key("wp-2") + .tag("region.cluster", "gpu-1"), + ) + .await + .unwrap(); + + // Scoped to "billing::" — only sees billing task's tags. + let keys = store + .tag_keys_by_prefix_with_prefix("billing::", "region.") + .await + .unwrap(); + assert_eq!(keys, vec!["region.zone"]); + + // Scoped to "media::" — only sees media task's tags. + let keys = store + .tag_keys_by_prefix_with_prefix("media::", "region.") + .await + .unwrap(); + assert_eq!(keys, vec!["region.cluster"]); +} + +#[tokio::test] +async fn tasks_by_tag_key_prefix_with_prefix_scopes_to_task_type() { + let store = test_store().await; + + store + .submit( + &TaskSubmission::new("billing::charge") + .key("wpt-1") + .tag("billing.plan", "pro"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("media::encode") + .key("wpt-2") + .tag("billing.plan", "free"), + ) + .await + .unwrap(); + + // Both tasks have billing.* tags, but scoped query returns only the billing module's task. + let tasks = store + .tasks_by_tag_key_prefix_with_prefix("billing::", "billing.", None) + .await + .unwrap(); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].task_type, "billing::charge"); +} + +#[tokio::test] +async fn count_by_tag_key_prefix_with_prefix_scopes_to_task_type() { + let store = test_store().await; + + store + .submit( + &TaskSubmission::new("billing::charge") + .key("wpc-1") + .tag("billing.plan", "pro"), + ) + .await + .unwrap(); + store + .submit( + &TaskSubmission::new("media::encode") + .key("wpc-2") + .tag("billing.plan", "free"), + ) + .await + .unwrap(); + + let count = store + .count_by_tag_key_prefix_with_prefix("billing::", "billing.", None) + .await + .unwrap(); + assert_eq!(count, 1); + + // Global count sees both. + let count = store.count_by_tag_key_prefix("billing.", None).await.unwrap(); + assert_eq!(count, 2); +} From 71088a432f285571d38399538de6707adbf1d4aa Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 21 Mar 2026 19:03:36 -0700 Subject: [PATCH 2/3] docs: document tag key prefix query APIs --- docs/multi-module-apps.md | 23 +++++++++++++++++++++++ docs/query-apis.md | 4 ++++ src/lib.rs | 10 ++++++++-- src/scheduler/submit.rs | 2 +- src/store/query/tags.rs | 2 +- 5 files changed, 37 insertions(+), 4 deletions(-) diff --git a/docs/multi-module-apps.md b/docs/multi-module-apps.md index 6e7e160..7f0e78a 100644 --- a/docs/multi-module-apps.md +++ b/docs/multi-module-apps.md @@ -271,6 +271,29 @@ process.cancel_where(|task| { }).await?; ``` +### Namespace-scoped queries with tag key prefixes + +When multiple libraries share a scheduler, each naturally namespaces its tags with a prefix (`billing.customer_id`, `media.pipeline`, etc.). Use the tag key prefix APIs to discover and operate on an entire namespace without knowing every possible key upfront: + +```rust +let billing: DomainHandle = scheduler.domain::(); + +// Discover all billing.* tag keys in use +let keys = billing.tag_keys_by_prefix("billing.").await?; +// e.g. ["billing.customer_id", "billing.plan", "billing.region"] + +// Count how many billing tasks are active +let count = billing.count_by_tag_key_prefix("billing.", None).await?; + +// Fetch those tasks (optionally filter by status) +let tasks = billing.tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)).await?; + +// Cancel all tasks in the billing namespace +let cancelled = billing.cancel_by_tag_key_prefix("billing.").await?; +``` + +LIKE wildcards (`%`, `_`) in the prefix are escaped automatically — only true prefix matching is performed. + ## Module-level pause and resume Each domain can be independently paused and resumed without affecting other domains. diff --git a/docs/query-apis.md b/docs/query-apis.md index 65d9821..2950326 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -43,6 +43,9 @@ These methods are available on `DomainHandle` and are automatically scoped to | `handle.tasks_by_tags(filters, status)` | `Vec` | Active tasks in this domain matching the given tag filters and optional status. | | `handle.count_by_tag(key, status)` | `Vec<(String, i64)>` | Tag value counts for a given key within this domain. | | `handle.tag_values(key)` | `Vec<(String, i64)>` | Distinct values for a tag key within this domain. | +| `handle.tag_keys_by_prefix(prefix)` | `Vec` | Discover distinct tag keys matching a prefix (e.g. `"billing."`) within this domain. | +| `handle.tasks_by_tag_key_prefix(prefix, status)` | `Vec` | Find tasks with any tag key matching the prefix, with optional status filter. | +| `handle.count_by_tag_key_prefix(prefix, status)` | `i64` | Count tasks with any tag key matching the prefix, with optional status filter. | ## Cross-domain operations (Scheduler) @@ -69,6 +72,7 @@ See [Multi-Module Applications](multi-module-apps.md#building-a-cross-module-das |--------|---------|-------------| | `handle.cancel_all()` | `Vec` | Cancel all tasks belonging to this domain. | | `handle.cancel_where(predicate)` | `Vec` | Cancel domain tasks matching a predicate. | +| `handle.cancel_by_tag_key_prefix(prefix)` | `Vec` | Cancel all tasks with any tag key matching the prefix. | `cancel_all()` and `cancel_where()` affect tasks in any active status: diff --git a/src/lib.rs b/src/lib.rs index 667ac31..49f1bf2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -170,8 +170,14 @@ //! Tags are copied to history on all terminal transitions and are included in //! [`TaskEventHeader`] for event subscribers. //! -//! Query by tags via the domain handle with [`DomainHandle::tasks_by_tags`] -//! (AND semantics). +//! Query by exact tags via [`DomainHandle::tasks_by_tags`] (AND semantics), +//! or discover and query by tag key prefix via +//! [`DomainHandle::tag_keys_by_prefix`], +//! [`DomainHandle::tasks_by_tag_key_prefix`], +//! [`DomainHandle::count_by_tag_key_prefix`], and +//! [`DomainHandle::cancel_by_tag_key_prefix`]. Prefix queries are useful when +//! multiple libraries share a scheduler and namespace their tags +//! (`billing.customer_id`, `media.pipeline`, etc.). //! //! ## Delayed & scheduled tasks //! diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index ad4c6f9..eb6f461 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -339,7 +339,7 @@ impl Scheduler { /// Cancel all active tasks that have any tag key matching the given prefix. /// - /// Finds tasks via [`TaskStore::tasks_by_tag_key_prefix`] and cancels each. + /// Finds tasks via [`crate::TaskStore::tasks_by_tag_key_prefix`] and cancels each. /// Returns the ids of tasks that were successfully cancelled. pub async fn cancel_by_tag_key_prefix( &self, diff --git a/src/store/query/tags.rs b/src/store/query/tags.rs index 91aaca0..9a8b0ea 100644 --- a/src/store/query/tags.rs +++ b/src/store/query/tags.rs @@ -1,4 +1,4 @@ -//! Tag-based queries: filtering and aggregation by task tags. +//! Tag-based queries: filtering, aggregation, and prefix discovery by task tags. use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; From 2ce428b019d06bb8d96b18eb20de68b41e1b6764 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Sat, 21 Mar 2026 19:04:22 -0700 Subject: [PATCH 3/3] style: apply rustfmt formatting --- src/domain.rs | 10 ++-------- src/module.rs | 10 ++-------- src/scheduler/queries.rs | 15 +++++++++------ src/scheduler/submit.rs | 11 ++++++----- src/store/query/tags.rs | 5 +---- src/store/query/tests.rs | 15 ++++++++++++--- 6 files changed, 32 insertions(+), 34 deletions(-) diff --git a/src/domain.rs b/src/domain.rs index 12b1431..3c5c6ba 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -650,10 +650,7 @@ impl DomainHandle { } /// Discover tag keys matching a prefix within this domain. - pub async fn tag_keys_by_prefix( - &self, - prefix: &str, - ) -> Result, StoreError> { + pub async fn tag_keys_by_prefix(&self, prefix: &str) -> Result, StoreError> { self.inner.tag_keys_by_prefix(prefix).await } @@ -676,10 +673,7 @@ impl DomainHandle { } /// Cancel all domain tasks with any tag key matching the given prefix. - pub async fn cancel_by_tag_key_prefix( - &self, - prefix: &str, - ) -> Result, StoreError> { + pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result, StoreError> { self.inner.cancel_by_tag_key_prefix(prefix).await } diff --git a/src/module.rs b/src/module.rs index f1c6ba7..f8f9f54 100644 --- a/src/module.rs +++ b/src/module.rs @@ -725,10 +725,7 @@ impl ModuleHandle { } /// Discover tag keys matching a prefix, scoped to this module's tasks. - pub async fn tag_keys_by_prefix( - &self, - prefix: &str, - ) -> Result, StoreError> { + pub async fn tag_keys_by_prefix(&self, prefix: &str) -> Result, StoreError> { self.scheduler .inner .store @@ -763,10 +760,7 @@ impl ModuleHandle { } /// Cancel module tasks with any tag key matching the given prefix. - pub async fn cancel_by_tag_key_prefix( - &self, - prefix: &str, - ) -> Result, StoreError> { + pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result, StoreError> { let tasks = self .scheduler .inner diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index 2833aa3..218a8ee 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -93,10 +93,7 @@ impl Scheduler { } /// Discover all tag keys matching a prefix across active tasks. - pub async fn tag_keys_by_prefix( - &self, - prefix: &str, - ) -> Result, StoreError> { + pub async fn tag_keys_by_prefix(&self, prefix: &str) -> Result, StoreError> { self.inner.store.tag_keys_by_prefix(prefix).await } @@ -106,7 +103,10 @@ impl Scheduler { prefix: &str, status: Option, ) -> Result, StoreError> { - self.inner.store.tasks_by_tag_key_prefix(prefix, status).await + self.inner + .store + .tasks_by_tag_key_prefix(prefix, status) + .await } /// Count active tasks with any tag key matching the given prefix. @@ -115,7 +115,10 @@ impl Scheduler { prefix: &str, status: Option, ) -> Result { - self.inner.store.count_by_tag_key_prefix(prefix, status).await + self.inner + .store + .count_by_tag_key_prefix(prefix, status) + .await } /// Dead-lettered tasks (retries exhausted), newest first. diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index eb6f461..08d5608 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -341,11 +341,12 @@ impl Scheduler { /// /// Finds tasks via [`crate::TaskStore::tasks_by_tag_key_prefix`] and cancels each. /// Returns the ids of tasks that were successfully cancelled. - pub async fn cancel_by_tag_key_prefix( - &self, - prefix: &str, - ) -> Result, StoreError> { - let tasks = self.inner.store.tasks_by_tag_key_prefix(prefix, None).await?; + pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result, StoreError> { + let tasks = self + .inner + .store + .tasks_by_tag_key_prefix(prefix, None) + .await?; let mut cancelled = Vec::new(); for task in &tasks { if self.cancel(task.id).await? { diff --git a/src/store/query/tags.rs b/src/store/query/tags.rs index 9a8b0ea..4c54dd6 100644 --- a/src/store/query/tags.rs +++ b/src/store/query/tags.rs @@ -227,10 +227,7 @@ impl TaskStore { /// /// Returns keys sorted alphabetically. The prefix is matched literally — /// LIKE wildcards (`%`, `_`) in the prefix are escaped. - pub async fn tag_keys_by_prefix( - &self, - prefix: &str, - ) -> Result, StoreError> { + pub async fn tag_keys_by_prefix(&self, prefix: &str) -> Result, StoreError> { let pattern = escape_like_prefix(prefix); let rows: Vec<(String,)> = sqlx::query_as( "SELECT DISTINCT key FROM task_tags WHERE key LIKE ? ESCAPE '\\' ORDER BY key", diff --git a/src/store/query/tests.rs b/src/store/query/tests.rs index ace2b6a..d6db544 100644 --- a/src/store/query/tests.rs +++ b/src/store/query/tests.rs @@ -355,7 +355,10 @@ async fn tasks_by_tag_key_prefix_finds_tasks() { .await .unwrap(); - let tasks = store.tasks_by_tag_key_prefix("billing.", None).await.unwrap(); + let tasks = store + .tasks_by_tag_key_prefix("billing.", None) + .await + .unwrap(); assert_eq!(tasks.len(), 2); // tp-1 and tp-2 let tasks = store.tasks_by_tag_key_prefix("media.", None).await.unwrap(); @@ -419,7 +422,10 @@ async fn count_by_tag_key_prefix_counts_distinct_tasks() { .await .unwrap(); - let count = store.count_by_tag_key_prefix("billing.", None).await.unwrap(); + let count = store + .count_by_tag_key_prefix("billing.", None) + .await + .unwrap(); assert_eq!(count, 2); // cp-1 counted once despite two matching keys let count = store.count_by_tag_key_prefix("media.", None).await.unwrap(); @@ -565,6 +571,9 @@ async fn count_by_tag_key_prefix_with_prefix_scopes_to_task_type() { assert_eq!(count, 1); // Global count sees both. - let count = store.count_by_tag_key_prefix("billing.", None).await.unwrap(); + let count = store + .count_by_tag_key_prefix("billing.", None) + .await + .unwrap(); assert_eq!(count, 2); }