Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/multi-module-apps.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,29 @@ process.cancel_where(|task| {
}).await?;
```

### Namespace-scoped queries with tag key prefixes

When multiple libraries share a scheduler, each naturally namespaces its tags with a prefix (`billing.customer_id`, `media.pipeline`, etc.). Use the tag key prefix APIs to discover and operate on an entire namespace without knowing every possible key upfront:

```rust
let billing: DomainHandle<Billing> = scheduler.domain::<Billing>();

// Discover all billing.* tag keys in use
let keys = billing.tag_keys_by_prefix("billing.").await?;
// e.g. ["billing.customer_id", "billing.plan", "billing.region"]

// Count how many billing tasks are active
let count = billing.count_by_tag_key_prefix("billing.", None).await?;

// Fetch those tasks (optionally filter by status)
let tasks = billing.tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)).await?;

// Cancel all tasks in the billing namespace
let cancelled = billing.cancel_by_tag_key_prefix("billing.").await?;
```

LIKE wildcards (`%`, `_`) in the prefix are escaped automatically — only true prefix matching is performed.

## Module-level pause and resume

Each domain can be independently paused and resumed without affecting other domains.
Expand Down
4 changes: 4 additions & 0 deletions docs/query-apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ These methods are available on `DomainHandle<D>` and are automatically scoped to
| `handle.tasks_by_tags(filters, status)` | `Vec<TaskRecord>` | Active tasks in this domain matching the given tag filters and optional status. |
| `handle.count_by_tag(key, status)` | `Vec<(String, i64)>` | Tag value counts for a given key within this domain. |
| `handle.tag_values(key)` | `Vec<(String, i64)>` | Distinct values for a tag key within this domain. |
| `handle.tag_keys_by_prefix(prefix)` | `Vec<String>` | Discover distinct tag keys matching a prefix (e.g. `"billing."`) within this domain. |
| `handle.tasks_by_tag_key_prefix(prefix, status)` | `Vec<TaskRecord>` | Find tasks with any tag key matching the prefix, with optional status filter. |
| `handle.count_by_tag_key_prefix(prefix, status)` | `i64` | Count tasks with any tag key matching the prefix, with optional status filter. |

## Cross-domain operations (Scheduler)

Expand All @@ -69,6 +72,7 @@ See [Multi-Module Applications](multi-module-apps.md#building-a-cross-module-das
|--------|---------|-------------|
| `handle.cancel_all()` | `Vec<i64>` | Cancel all tasks belonging to this domain. |
| `handle.cancel_where(predicate)` | `Vec<i64>` | Cancel domain tasks matching a predicate. |
| `handle.cancel_by_tag_key_prefix(prefix)` | `Vec<i64>` | Cancel all tasks with any tag key matching the prefix. |

`cancel_all()` and `cancel_where()` affect tasks in any active status:

Expand Down
28 changes: 28 additions & 0 deletions src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,34 @@ impl<D: DomainKey> DomainHandle<D> {
self.inner.tasks_by_tags(filters, status).await
}

/// Discover tag keys matching a prefix within this domain.
pub async fn tag_keys_by_prefix(&self, prefix: &str) -> Result<Vec<String>, StoreError> {
self.inner.tag_keys_by_prefix(prefix).await
}

/// Find domain tasks with any tag key matching the given prefix.
pub async fn tasks_by_tag_key_prefix(
&self,
prefix: &str,
status: Option<TaskStatus>,
) -> Result<Vec<TaskRecord>, StoreError> {
self.inner.tasks_by_tag_key_prefix(prefix, status).await
}

/// Count domain tasks with any tag key matching the given prefix.
pub async fn count_by_tag_key_prefix(
&self,
prefix: &str,
status: Option<TaskStatus>,
) -> Result<i64, StoreError> {
self.inner.count_by_tag_key_prefix(prefix, status).await
}

/// Cancel all domain tasks with any tag key matching the given prefix.
pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result<Vec<i64>, StoreError> {
self.inner.cancel_by_tag_key_prefix(prefix).await
}

/// Set the maximum number of concurrent tasks for this domain.
pub fn set_max_concurrency(&self, n: usize) {
self.inner.set_max_concurrency(n);
Expand Down
10 changes: 8 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,14 @@
//! Tags are copied to history on all terminal transitions and are included in
//! [`TaskEventHeader`] for event subscribers.
//!
//! Query by tags via the domain handle with [`DomainHandle::tasks_by_tags`]
//! (AND semantics).
//! Query by exact tags via [`DomainHandle::tasks_by_tags`] (AND semantics),
//! or discover and query by tag key prefix via
//! [`DomainHandle::tag_keys_by_prefix`],
//! [`DomainHandle::tasks_by_tag_key_prefix`],
//! [`DomainHandle::count_by_tag_key_prefix`], and
//! [`DomainHandle::cancel_by_tag_key_prefix`]. Prefix queries are useful when
//! multiple libraries share a scheduler and namespace their tags
//! (`billing.customer_id`, `media.pipeline`, etc.).
//!
//! ## Delayed & scheduled tasks
//!
Expand Down
52 changes: 52 additions & 0 deletions src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,58 @@ impl ModuleHandle {
.await
}

/// Discover tag keys matching a prefix, scoped to this module's tasks.
pub async fn tag_keys_by_prefix(&self, prefix: &str) -> Result<Vec<String>, StoreError> {
self.scheduler
.inner
.store
.tag_keys_by_prefix_with_prefix(&self.prefix, prefix)
.await
}

/// Find module tasks with any tag key matching the given prefix.
pub async fn tasks_by_tag_key_prefix(
&self,
prefix: &str,
status: Option<TaskStatus>,
) -> Result<Vec<TaskRecord>, StoreError> {
self.scheduler
.inner
.store
.tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status)
.await
}

/// Count module tasks with any tag key matching the given prefix.
pub async fn count_by_tag_key_prefix(
&self,
prefix: &str,
status: Option<TaskStatus>,
) -> Result<i64, StoreError> {
self.scheduler
.inner
.store
.count_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status)
.await
}

/// Cancel module tasks with any tag key matching the given prefix.
pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result<Vec<i64>, StoreError> {
let tasks = self
.scheduler
.inner
.store
.tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, None)
.await?;
let mut cancelled = Vec::new();
for task in &tasks {
if self.scheduler.cancel(task.id).await? {
cancelled.push(task.id);
}
}
Ok(cancelled)
}

/// Estimated progress for all running tasks in this module.
pub async fn estimated_progress(&self) -> Vec<EstimatedProgress> {
let snapshots = self
Expand Down
29 changes: 29 additions & 0 deletions src/scheduler/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,35 @@ impl Scheduler {
self.inner.store.tag_values(key).await
}

/// Discover all tag keys matching a prefix across active tasks.
pub async fn tag_keys_by_prefix(&self, prefix: &str) -> Result<Vec<String>, StoreError> {
self.inner.store.tag_keys_by_prefix(prefix).await
}

/// Find active tasks with any tag key matching the given prefix.
pub async fn tasks_by_tag_key_prefix(
&self,
prefix: &str,
status: Option<crate::task::TaskStatus>,
) -> Result<Vec<crate::task::TaskRecord>, StoreError> {
self.inner
.store
.tasks_by_tag_key_prefix(prefix, status)
.await
}

/// Count active tasks with any tag key matching the given prefix.
pub async fn count_by_tag_key_prefix(
&self,
prefix: &str,
status: Option<crate::task::TaskStatus>,
) -> Result<i64, StoreError> {
self.inner
.store
.count_by_tag_key_prefix(prefix, status)
.await
}

/// Dead-lettered tasks (retries exhausted), newest first.
///
/// These are tasks that failed with a retryable error but exhausted their
Expand Down
19 changes: 19 additions & 0 deletions src/scheduler/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,25 @@ impl Scheduler {
Ok(cancelled)
}

/// Cancel all active tasks that have any tag key matching the given prefix.
///
/// Finds tasks via [`crate::TaskStore::tasks_by_tag_key_prefix`] and cancels each.
/// Returns the ids of tasks that were successfully cancelled.
pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result<Vec<i64>, StoreError> {
let tasks = self
.inner
.store
.tasks_by_tag_key_prefix(prefix, None)
.await?;
let mut cancelled = Vec::new();
for task in &tasks {
if self.cancel(task.id).await? {
cancelled.push(task.id);
}
}
Ok(cancelled)
}

/// Cancel all tasks matching a predicate.
pub async fn cancel_where(
&self,
Expand Down
Loading
Loading