From ed38e3ee0530b07aacd299b80fa6cef30750c522 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Tue, 24 Mar 2026 18:42:46 -0700 Subject: [PATCH 1/2] feat: implement spawn_sibling_with() for automatic parent-ID inheritance (#87) Add SiblingSpawnBuilder and DomainTaskContext::spawn_sibling_with() / spawn_siblings_with() so child tasks can spawn peer tasks under the same parent without manually threading the orchestrator's task ID. - SiblingSpawnBuilder routes through ModuleHandle for correct TTL/tag inheritance from the orchestrator (not the current task) - ModuleHandle::submit_batch for single-transaction batch spawns - DomainSubmitBuilder::sibling_of() for cross-domain sibling spawns - Wire spawn_children_with through submit_batch (perf fix) - Re-export SiblingSpawnBuilder from crate root - Integration tests covering inheritance, error cases, batch, and cross-domain paths --- examples/test_sibling.rs | 154 +++++++ src/domain.rs | 23 + src/lib.rs | 2 +- src/module.rs | 54 ++- src/registry/context.rs | 13 +- src/registry/domain_context.rs | 217 +++++++++ src/registry/mod.rs | 2 +- src/task/mod.rs | 22 + src/task/submit_builder.rs | 10 + tests/integration.rs | 2 + tests/integration/sibling_spawn.rs | 685 +++++++++++++++++++++++++++++ 11 files changed, 1174 insertions(+), 10 deletions(-) create mode 100644 examples/test_sibling.rs create mode 100644 tests/integration/sibling_spawn.rs diff --git a/examples/test_sibling.rs b/examples/test_sibling.rs new file mode 100644 index 0000000..e7651f6 --- /dev/null +++ b/examples/test_sibling.rs @@ -0,0 +1,154 @@ +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use taskmill::store::TaskStore; +use taskmill::{ + Domain, DomainKey, DomainTaskContext, Scheduler, SchedulerEvent, TaskError, TaskSubmission, + TypedExecutor, TypedTask, +}; +use tokio_util::sync::CancellationToken; + +struct TestDomain; +impl DomainKey for TestDomain { + const NAME: &'static str = "test"; +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +struct OrcTask; +impl TypedTask for OrcTask { + type Domain = TestDomain; + const TASK_TYPE: &'static str = "orc"; +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +struct ChildT; +impl TypedTask for ChildT { + type Domain = TestDomain; + const TASK_TYPE: &'static str = "child"; +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +struct SibT; +impl TypedTask for SibT { + type Domain = TestDomain; + const TASK_TYPE: &'static str = "sib"; +} + +struct OrcExec; +impl TypedExecutor for OrcExec { + async fn execute<'a>( + &'a self, + _: OrcTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + ctx.spawn_child_with(ChildT) + .key("c0") + .await + .map_err(|e| TaskError::new(e.to_string()))?; + Ok(()) + } +} + +struct ChildExec; +impl TypedExecutor for ChildExec { + async fn execute<'a>( + &'a self, + _: ChildT, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + eprintln!( + " ChildExec: spawning sibling (my parent_id={:?})", + ctx.record().parent_id + ); + let outcome = ctx + .spawn_sibling_with(SibT) + .key("s0") + .await + .map_err(|e| TaskError::new(e.to_string()))?; + eprintln!(" ChildExec: sibling spawned, outcome={:?}", outcome); + Ok(()) + } +} + +struct SibExec; +impl TypedExecutor for SibExec { + async fn execute<'a>( + &'a self, + _: SibT, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + eprintln!( + " SibExec: running (parent_id={:?})", + ctx.record().parent_id + ); + Ok(()) + } +} + +#[tokio::main] +async fn main() { + let store = TaskStore::open_memory().await.unwrap(); + let qs = store.clone(); + + let sched = Scheduler::builder() + .store(store) + .domain( + Domain::::new() + .task::(OrcExec) + .task::(ChildExec) + .task::(SibExec), + ) + .max_concurrency(4) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + let orc = sched + .submit(&TaskSubmission::new("test::orc").key("o0")) + .await + .unwrap(); + eprintln!("Orchestrator submitted: {:?}", orc); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sc = sched.clone(); + let tc = token.clone(); + let h = tokio::spawn(async move { + sc.run(tc).await; + }); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(200), rx.recv()).await { + Ok(Ok(evt)) => { + if let Some(hdr) = evt.header() { + eprintln!( + "Event: {:?} type={} id={}", + std::mem::discriminant(&evt), + hdr.task_type, + hdr.task_id + ); + } else { + eprintln!("Event: {:?}", evt); + } + if matches!(&evt, SchedulerEvent::Completed(h) if h.task_type == "test::orc") { + break; + } + } + _ => {} + } + } + + // Check history + for id in 1..=10 { + if let Ok(Some(h)) = qs.history_by_id(id).await { + eprintln!( + "History id={}: type={} parent_id={:?}", + h.id, h.task_type, h.parent_id + ); + } + } + + token.cancel(); + let _ = h.await; +} diff --git a/src/domain.rs b/src/domain.rs index 53b8d78..4ace80a 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -962,6 +962,29 @@ impl DomainSubmitBuilder { self.parent(ctx.record().id) } + /// Mark this task as a sibling of the task currently executing in the + /// given [`DomainTaskContext`] (shares the same parent). + /// + /// Returns `Err(StoreError::InvalidState)` if the context task has no + /// `parent_id`. + /// + /// ```ignore + /// ctx.domain::() + /// .submit_with(ScanStartedEvent { .. }) + /// .sibling_of(&ctx)? + /// .priority(Priority::HIGH) + /// .await?; + /// ``` + pub fn sibling_of( + self, + ctx: &DomainTaskContext<'_, D2>, + ) -> Result { + let pid = ctx.record().parent_id.ok_or_else(|| { + StoreError::InvalidState("sibling_of called on a task with no parent_id".into()) + })?; + Ok(self.parent(pid)) + } + /// Submit the task, returning the outcome. pub async fn submit(self) -> Result { self.inner.submit().await diff --git a/src/lib.rs b/src/lib.rs index 5a00b26..a6db414 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -839,7 +839,7 @@ pub use domain::{ Domain, DomainHandle, DomainKey, DomainSubmitBuilder, TaskEvent, TaskTypeConfig, TaskTypeOptions, TypedEventStream, TypedExecutor, }; -pub use registry::{ChildSpawnBuilder, DomainTaskContext}; +pub use registry::{ChildSpawnBuilder, DomainTaskContext, SiblingSpawnBuilder}; // ── Core re-exports ────────────────────────────────────────────────── pub use backpressure::{CompositePressure, PressureSource, ThrottlePolicy}; diff --git a/src/module.rs b/src/module.rs index efb792d..8e3311d 100644 --- a/src/module.rs +++ b/src/module.rs @@ -20,7 +20,7 @@ use crate::task::retry::RetryPolicy; use crate::task::submit_builder::TypedTaskDefaults; use crate::task::{ModuleSubmitDefaults, SubmitBuilder}; use crate::task::{ - SubmitOutcome, TaskHistoryRecord, TaskRecord, TaskStatus, TaskSubmission, TypedTask, + SubmitOutcome, TaskHistoryRecord, TaskRecord, TaskStatus, TaskSubmission, TtlFrom, TypedTask, }; /// Per-executor options for task type registration within a module. @@ -487,6 +487,58 @@ impl ModuleHandle { self.submit(sub).with_typed_defaults(typed_defaults) } + /// Submit multiple tasks in a single transaction. + /// + /// Each submission gets the module prefix and defaults applied (the + /// `submit()` path — non-typed defaults fill gaps only). When all + /// submissions share the same `parent_id`, the parent record is fetched + /// once and used to inherit remaining TTL and tags for the batch. + /// + /// Used by `spawn_children_with` and `spawn_siblings_with` for + /// single-transaction efficiency. + pub(crate) async fn submit_batch( + &self, + submissions: Vec, + ) -> Result, StoreError> { + let mut resolved = Vec::with_capacity(submissions.len()); + for sub in submissions { + // Reuse SubmitBuilder::resolve() for prefix + defaults (submit() path). + let builder = SubmitBuilder::new( + sub, + self.scheduler.clone(), + self.name.as_ref(), + self.defaults.clone(), + ); + let (_sched, r) = builder.resolve_only(); + resolved.push(r); + } + + // Inherit parent TTL and tags for all submissions sharing a parent_id. + // Fetch the parent record once and apply to all. + if let Some(pid) = resolved.first().and_then(|s| s.parent_id) { + if let Ok(Some(parent)) = self.scheduler.store().task_by_id(pid).await { + for s in &mut resolved { + // TTL inheritance (only if no layer set a TTL) + if s.ttl.is_none() { + if let Some(remaining) = parent.remaining_ttl() { + s.ttl = Some(remaining); + s.ttl_from = TtlFrom::Submission; + } + } + // Tag inheritance (parent fills gaps) + for (k, v) in &parent.tags { + s.tags.entry(k.clone()).or_insert_with(|| v.clone()); + } + } + } + } + + self.scheduler + .submit_batch(&resolved) + .await + .map(|batch_outcome| batch_outcome.outcomes) + } + // ── Single-task operations ──────────────────────────────────── /// Cancel a task by ID. diff --git a/src/registry/context.rs b/src/registry/context.rs index 042de31..2bf0bb1 100644 --- a/src/registry/context.rs +++ b/src/registry/context.rs @@ -314,12 +314,11 @@ impl TaskContext { return spawner.spawn_batch(&mut submissions).await; } - // Module-aware path: prepare each submission then submit via module handle. - let mut outcomes = Vec::with_capacity(submissions.len()); - for sub in submissions { - let sub = spawner.prepare(sub); - outcomes.push(self.current_module().submit(sub).await?); - } - Ok(outcomes) + // Module-aware path: prepare each submission then batch-submit via module handle. + let prepared: Vec<_> = submissions + .into_iter() + .map(|sub| spawner.prepare(sub)) + .collect(); + self.current_module().submit_batch(prepared).await } } diff --git a/src/registry/domain_context.rs b/src/registry/domain_context.rs index d5ee8b1..c6ffe4c 100644 --- a/src/registry/domain_context.rs +++ b/src/registry/domain_context.rs @@ -217,6 +217,105 @@ impl<'a, D: DomainKey> DomainTaskContext<'a, D> { .collect(); self.inner.spawn_children(submissions).await } + + // ── Typed sibling spawning ───────────────────────────────────── + + /// Spawn a same-domain sibling task (shares this task's parent). + /// + /// The new task's `parent_id` is set to this task's `parent_id`, making + /// it a peer under the same orchestrator. Returns a + /// [`SiblingSpawnBuilder`] for optional per-call overrides (`.key()`, + /// `.priority()`, etc.), then `.await` to submit. + /// + /// Returns `StoreError::InvalidState` if this task has no `parent_id`. + /// Only available in `execute()`, not `finalize()`. + /// + /// # Parent-relationship table + /// + /// | Method | `parent_id` on new task | + /// |---|---| + /// | `submit_with(task)` | `None` (root) | + /// | `submit_with(task).parent(id)` | Explicit ID | + /// | `ctx.spawn_child_with(task)` | Current task's ID | + /// | `ctx.spawn_sibling_with(task)` | Current task's `parent_id` | + /// + /// # Example + /// + /// ```ignore + /// // Inside a child executor — spawn a peer task under the same orchestrator: + /// ctx.spawn_sibling_with(ScanL1DirTask { bucket, prefix }) + /// .key(&format!("{bucket}:{prefix}")) + /// .await?; + /// ``` + pub fn spawn_sibling_with>( + &self, + task: T, + ) -> SiblingSpawnBuilder<'a, D, T> { + SiblingSpawnBuilder { + ctx: self.inner, + task, + override_key: None, + override_priority: None, + override_ttl: None, + override_group: None, + _domain: PhantomData, + } + } + + /// Spawn multiple same-domain siblings in one call. + /// + /// Routes through `ModuleHandle::submit_batch` for single-transaction + /// efficiency. Each sibling gets its own `TaskSubmission` default for + /// `fail_fast` (true). + /// + /// Returns `StoreError::InvalidState` if this task has no `parent_id`. + /// + /// # Example + /// + /// ```ignore + /// let siblings: Vec = dirs.into_iter() + /// .map(|d| ScanL1DirTask { bucket: bucket.clone(), dir: d }) + /// .collect(); + /// ctx.spawn_siblings_with(siblings).await?; + /// ``` + pub async fn spawn_siblings_with>( + &self, + tasks: impl IntoIterator, + ) -> Result, StoreError> { + let parent_id = self.inner.record().parent_id.ok_or_else(|| { + StoreError::InvalidState(format!( + "spawn_siblings_with called on task {} which has no parent_id", + self.inner.record().id, + )) + })?; + + let submissions: Vec = tasks + .into_iter() + .map(|t| { + let mut sub = TaskSubmission::from_typed(&t); + sub.parent_id = Some(parent_id); + // Apply priority aging if enabled + if let Some(ref config) = self.inner.aging_config { + let parent = self.inner.record(); + let parent_effective = parent.effective_priority(Some(config)); + let child_config = ::config() + .priority + .unwrap_or(crate::priority::Priority::NORMAL); + let inherited = crate::priority::Priority::new( + parent_effective.value().min(child_config.value()), + ); + sub = sub.priority(inherited); + } + sub + }) + .collect(); + + if submissions.is_empty() { + return Ok(Vec::new()); + } + + self.inner.current_module().submit_batch(submissions).await + } } // ── ChildSpawnBuilder ─────────────────────────────────────────────── @@ -312,3 +411,121 @@ impl<'a, D: DomainKey, T: TypedTask> std::future::IntoFuture Box::pin(self.submit()) } } + +// ── SiblingSpawnBuilder ───────────────────────────────────────────── + +/// Builder for spawning a single typed sibling task with optional per-call +/// overrides. +/// +/// Created by [`DomainTaskContext::spawn_sibling_with`]. Chain override methods +/// then `.await` to submit. The new task's `parent_id` is set to the spawning +/// task's `parent_id` (i.e. the orchestrator), making it a peer under the +/// same parent. +/// +/// Implements [`IntoFuture`] so bare `.await` works: +/// +/// ```ignore +/// ctx.spawn_sibling_with(task).key("my-key").await?; +/// ``` +pub struct SiblingSpawnBuilder<'a, D: DomainKey, T: TypedTask> { + ctx: &'a TaskContext, + task: T, + override_key: Option, + override_priority: Option, + override_ttl: Option, + override_group: Option, + _domain: PhantomData, +} + +impl<'a, D: DomainKey, T: TypedTask> SiblingSpawnBuilder<'a, D, T> { + /// Override the dedup key for this sibling task. + pub fn key(mut self, k: impl Into) -> Self { + self.override_key = Some(k.into()); + self + } + + /// Override the priority for this sibling task. + pub fn priority(mut self, p: crate::priority::Priority) -> Self { + self.override_priority = Some(p); + self + } + + /// Override the time-to-live for this sibling task. + pub fn ttl(mut self, d: std::time::Duration) -> Self { + self.override_ttl = Some(d); + self + } + + /// Override the group key for this sibling task. + pub fn group(mut self, key: impl Into) -> Self { + self.override_group = Some(key.into()); + self + } + + /// Submit the sibling task. + /// + /// Routes through `ModuleHandle` → `SubmitBuilder` so that module + /// prefix and defaults are applied, and the orchestrator's TTL and + /// tags are correctly inherited (not the current task's). + /// + /// Returns `StoreError::InvalidState` if the spawning task has no + /// `parent_id`. + pub async fn submit(self) -> Result { + let parent_id = self.ctx.record().parent_id.ok_or_else(|| { + StoreError::InvalidState(format!( + "spawn_sibling_with called on task {} which has no parent_id", + self.ctx.record().id, + )) + })?; + + let mut sub = TaskSubmission::from_typed(&self.task); + + // Apply builder overrides + if let Some(k) = self.override_key { + sub = sub.key(k); + } + if let Some(p) = self.override_priority { + sub = sub.priority(p); + } else if let Some(ref config) = self.ctx.aging_config { + // Priority aging: use current task's effective priority as baseline + // (same as ChildSpawnBuilder — current task already inherited from + // the orchestrator at its own dispatch time). + let parent = self.ctx.record(); + let parent_effective = parent.effective_priority(Some(config)); + let child_config = ::config() + .priority + .unwrap_or(crate::priority::Priority::NORMAL); + let inherited = + crate::priority::Priority::new(parent_effective.value().min(child_config.value())); + sub = sub.priority(inherited); + } + if let Some(d) = self.override_ttl { + sub = sub.ttl(d); + } + if let Some(g) = self.override_group { + sub = sub.group(g); + } + + // Route through module handle — SubmitBuilder.submit() fetches the + // parent record from the store to inherit TTL and tags correctly. + // This gives us orchestrator's remaining TTL, not current task's TTL. + self.ctx + .current_module() + .submit(sub) + .parent(parent_id) + .submit() + .await + } +} + +impl<'a, D: DomainKey, T: TypedTask> std::future::IntoFuture + for SiblingSpawnBuilder<'a, D, T> +{ + type Output = Result; + type IntoFuture = + std::pin::Pin + Send + 'a>>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(self.submit()) + } +} diff --git a/src/registry/mod.rs b/src/registry/mod.rs index 02c3fd6..bff88a2 100644 --- a/src/registry/mod.rs +++ b/src/registry/mod.rs @@ -24,7 +24,7 @@ use crate::task::TaskError; pub(crate) use child_spawner::{ChildSpawner, ParentContext}; pub(crate) use context::TaskContext; -pub use domain_context::{ChildSpawnBuilder, DomainTaskContext}; +pub use domain_context::{ChildSpawnBuilder, DomainTaskContext, SiblingSpawnBuilder}; pub(crate) use io_tracker::IoTracker; pub(crate) use state::{StateMap, StateSnapshot}; diff --git a/src/task/mod.rs b/src/task/mod.rs index 20cb866..ec5a572 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -327,6 +327,28 @@ impl TaskRecord { ) } + /// Compute the remaining TTL for this task record. + /// + /// Returns `None` if the task has no TTL, or if the TTL start hasn't been + /// reached yet (e.g. `TtlFrom::FirstAttempt` on a task that hasn't started). + /// Used by sibling/child spawning to inherit the parent's remaining TTL. + pub fn remaining_ttl(&self) -> Option { + let parent_ttl_secs = self.ttl_seconds?; + let parent_ttl = std::time::Duration::from_secs(parent_ttl_secs as u64); + + let ttl_start = match self.ttl_from { + TtlFrom::Submission => Some(self.created_at), + TtlFrom::FirstAttempt => self.started_at, + }; + let start = ttl_start?; + let elapsed = chrono::Utc::now() - start; + let elapsed_std = elapsed.to_std().unwrap_or_default(); + + parent_ttl + .checked_sub(elapsed_std) + .filter(|r| *r > std::time::Duration::ZERO) + } + /// Build a [`TaskEventHeader`](crate::scheduler::event::TaskEventHeader) from this record. /// /// When `aging_config` is provided, the header's `effective_priority` reflects diff --git a/src/task/submit_builder.rs b/src/task/submit_builder.rs index 1617f92..dec0ae8 100644 --- a/src/task/submit_builder.rs +++ b/src/task/submit_builder.rs @@ -259,6 +259,16 @@ impl SubmitBuilder { (self.scheduler, self.submission) } + /// Apply all default layers and per-call overrides, returning the + /// fully resolved [`TaskSubmission`] without submitting. + /// + /// Used by `ModuleHandle::submit_batch` to prepare each submission + /// before a single-transaction batch submit with shared parent + /// TTL/tag inheritance. + pub(crate) fn resolve_only(self) -> (Scheduler, TaskSubmission) { + self.resolve() + } + /// Prefix `task_type` with the module name (e.g. `"thumbnail"` → /// `"media::thumbnail"`). Updates `label` when it matches the old /// unprefixed type. diff --git a/tests/integration.rs b/tests/integration.rs index 9fdeb0d..76dffe8 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -38,5 +38,7 @@ mod rate_limit; mod retry_policy; #[path = "integration/scheduler_core.rs"] mod scheduler_core; +#[path = "integration/sibling_spawn.rs"] +mod sibling_spawn; #[path = "integration/typed_events.rs"] mod typed_events; diff --git a/tests/integration/sibling_spawn.rs b/tests/integration/sibling_spawn.rs new file mode 100644 index 0000000..1fe3758 --- /dev/null +++ b/tests/integration/sibling_spawn.rs @@ -0,0 +1,685 @@ +//! Integration tests for `spawn_sibling_with` / `spawn_siblings_with` and +//! `DomainSubmitBuilder::sibling_of`. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use taskmill::store::TaskStore; +use taskmill::{ + Domain, DomainTaskContext, Priority, Scheduler, SchedulerEvent, TaskError, TaskSubmission, + TypedExecutor, TypedTask, +}; +use tokio_util::sync::CancellationToken; + +use super::common::*; + +// ── Task types for sibling tests ────────────────────────────────── + +define_task!(OrchestratorTask, TestDomain, "orchestrator"); +define_task!(SiblingTask, TestDomain, "sibling"); +define_task!(SiblingChildTask, TestDomain, "sibling-child"); + +// Cross-domain sibling task +define_task!(CrossDomainSiblingTask, MediaDomain, "cross-sibling"); + +// Indexed sibling task for batch tests (unique payload per instance avoids dedup) +#[derive(Debug, Clone, Serialize, Deserialize)] +struct IndexedSiblingTask { + index: u32, +} + +impl TypedTask for IndexedSiblingTask { + type Domain = TestDomain; + const TASK_TYPE: &'static str = "indexed-sibling"; +} + +// ── Executors ───────────────────────────────────────────────────── + +/// Executor that spawns N children, each of which will spawn siblings. +struct OrchestratorExecutor { + child_count: usize, +} + +impl TypedExecutor for OrchestratorExecutor { + async fn execute<'a>( + &'a self, + _payload: OrchestratorTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + for i in 0..self.child_count { + ctx.spawn_child_with(SiblingChildTask) + .key(format!("child-{i}")) + .await + .map_err(|e| TaskError::new(e.to_string()))?; + } + Ok(()) + } +} + +/// Executor for a child that spawns a single sibling. +struct SiblingSpawnerExecutor; + +impl TypedExecutor for SiblingSpawnerExecutor { + async fn execute<'a>( + &'a self, + _payload: SiblingChildTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + ctx.spawn_sibling_with(SiblingTask) + .key(format!("sibling-of-{}", ctx.record().id)) + .await + .map_err(|e| TaskError::new(e.to_string()))?; + Ok(()) + } +} + +/// Executor for a child that spawns multiple siblings in batch. +struct BatchSiblingSpawnerExecutor { + count: usize, +} + +impl TypedExecutor for BatchSiblingSpawnerExecutor { + async fn execute<'a>( + &'a self, + _payload: SiblingChildTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + let tasks: Vec = (0..self.count) + .map(|i| IndexedSiblingTask { index: i as u32 }) + .collect(); + ctx.spawn_siblings_with(tasks) + .await + .map_err(|e| TaskError::new(e.to_string()))?; + Ok(()) + } +} + +/// Executor that tries to spawn a sibling from a root task — should fail. +struct RootSiblingSpawnerExecutor; + +impl TypedExecutor for RootSiblingSpawnerExecutor { + async fn execute<'a>( + &'a self, + _payload: OrchestratorTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + match ctx.spawn_sibling_with(SiblingTask).await { + Err(e) => { + // Expected — root task has no parent_id. + assert!( + e.to_string().contains("no parent_id"), + "expected InvalidState error, got: {e}" + ); + Ok(()) + } + Ok(_) => Err(TaskError::new("should have failed")), + } + } +} + +/// Finalize tracker for orchestrator — verifies all children+siblings complete. +struct OrchestratorFinalizeTracker { + child_count: usize, + finalized: Arc, +} + +impl TypedExecutor for OrchestratorFinalizeTracker { + async fn execute<'a>( + &'a self, + _payload: OrchestratorTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + for i in 0..self.child_count { + ctx.spawn_child_with(SiblingChildTask) + .key(format!("child-{i}")) + .await + .map_err(|e| TaskError::new(e.to_string()))?; + } + Ok(()) + } + + async fn finalize<'a>( + &'a self, + _payload: OrchestratorTask, + _memo: (), + _ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + self.finalized.store(true, Ordering::SeqCst); + Ok(()) + } +} + +/// Cross-domain sibling spawner — uses `sibling_of()`. +struct CrossDomainSiblingSpawner; + +impl TypedExecutor for CrossDomainSiblingSpawner { + async fn execute<'a>( + &'a self, + _payload: SiblingChildTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + ctx.domain::() + .submit_with(CrossDomainSiblingTask) + .sibling_of(&ctx) + .map_err(|e| TaskError::new(e.to_string()))? + .key(format!("cross-sibling-{}", ctx.record().id)) + .await + .map_err(|e| TaskError::new(e.to_string()))?; + Ok(()) + } +} + +/// Cross-domain root sibling spawner — should fail. +struct CrossDomainRootSiblingSpawner; + +impl TypedExecutor for CrossDomainRootSiblingSpawner { + async fn execute<'a>( + &'a self, + _payload: OrchestratorTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + let result = ctx + .domain::() + .submit_with(CrossDomainSiblingTask) + .sibling_of(&ctx); + match result { + Err(e) => { + assert!( + e.to_string().contains("no parent_id"), + "expected InvalidState error, got: {e}" + ); + Ok(()) + } + Ok(_) => Err(TaskError::new("should have failed")), + } + } +} + +/// Builder-override sibling spawner — tests key/priority/group overrides. +struct OverrideSiblingSpawner; + +impl TypedExecutor for OverrideSiblingSpawner { + async fn execute<'a>( + &'a self, + _payload: SiblingChildTask, + ctx: DomainTaskContext<'a, TestDomain>, + ) -> Result<(), TaskError> { + ctx.spawn_sibling_with(SiblingTask) + .key("custom-key") + .priority(Priority::HIGH) + .group("custom-group") + .ttl(Duration::from_secs(300)) + .await + .map_err(|e| TaskError::new(e.to_string()))?; + Ok(()) + } +} + +// ── Tests ───────────────────────────────────────────────────────── + +/// 1. Orchestrator spawns child A. Child A calls `spawn_sibling_with(B)`. +/// Assert B's `parent_id == orchestrator.id`. +#[tokio::test] +async fn sibling_spawn_inherits_parent_id() { + let store = TaskStore::open_memory().await.unwrap(); + let query_store = store.clone(); + + let sched = Scheduler::builder() + .store(store) + .domain( + Domain::::new() + .task::(OrchestratorExecutor { child_count: 1 }) + .task::(SiblingSpawnerExecutor) + .task::(NoopExecutor), + ) + .max_concurrency(4) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + let orchestrator_outcome = sched + .submit(&TaskSubmission::new("test::orchestrator").key("orch-1")) + .await + .unwrap(); + let orchestrator_id = orchestrator_outcome.id().unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for sibling to complete. + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut sibling_key = None; + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { + Ok(Ok(SchedulerEvent::Completed(ref h))) if h.task_type == "test::sibling" => { + sibling_key = Some(h.key.clone()); + break; + } + _ => continue, + } + } + + assert!(sibling_key.is_some(), "sibling should have completed"); + + // Query store while scheduler is still running. + let history = query_store + .latest_history_by_key(sibling_key.as_ref().unwrap()) + .await + .unwrap() + .unwrap(); + assert_eq!( + history.parent_id, + Some(orchestrator_id), + "sibling's parent_id should be the orchestrator's id" + ); + + token.cancel(); + let _ = handle.await; +} + +/// 2. Root task calls `spawn_sibling_with(X)`. Assert `StoreError::InvalidState`. +#[tokio::test] +async fn sibling_spawn_no_parent_returns_error() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new() + .task::(RootSiblingSpawnerExecutor) + .task::(NoopExecutor), + ) + .max_concurrency(4) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + sched + .submit(&TaskSubmission::new("test::orchestrator").key("root-sib")) + .await + .unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // The executor asserts that spawning a sibling from a root task returns an error + // and then returns Ok(()). Wait for it to complete. + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let completed = wait_for_event(&mut rx, deadline, |evt| { + matches!(evt, SchedulerEvent::Completed(ref h) if h.task_type == "test::orchestrator") + }) + .await; + + token.cancel(); + let _ = handle.await; + + assert!( + completed.is_some(), + "task should complete after handling error" + ); +} + +/// 3. Child spawns 10 siblings via `spawn_siblings_with()`. +/// Assert all have `parent_id == orchestrator.id`. +#[tokio::test] +async fn sibling_spawn_batch() { + let store = TaskStore::open_memory().await.unwrap(); + let query_store = store.clone(); + + let sched = Scheduler::builder() + .store(store) + .domain( + Domain::::new() + .task::(OrchestratorExecutor { child_count: 1 }) + .task::(BatchSiblingSpawnerExecutor { count: 10 }) + .task::(NoopExecutor), + ) + .max_concurrency(12) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + let orchestrator_outcome = sched + .submit(&TaskSubmission::new("test::orchestrator").key("orch-batch")) + .await + .unwrap(); + let orchestrator_id = orchestrator_outcome.id().unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for all 10 siblings to complete. + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + let mut sibling_keys = Vec::new(); + while tokio::time::Instant::now() < deadline && sibling_keys.len() < 10 { + if let Ok(Ok(SchedulerEvent::Completed(ref h))) = + tokio::time::timeout(Duration::from_millis(100), rx.recv()).await + { + if h.task_type == "test::indexed-sibling" { + sibling_keys.push(h.key.clone()); + } + } + } + + // Verify parent_id for all siblings while scheduler is still running. + for key in &sibling_keys { + let history = query_store + .latest_history_by_key(key) + .await + .unwrap() + .unwrap(); + assert_eq!( + history.parent_id, + Some(orchestrator_id), + "sibling's parent_id should be orchestrator" + ); + } + + token.cancel(); + let _ = handle.await; + + assert_eq!( + sibling_keys.len(), + 10, + "all 10 siblings should have completed" + ); +} + +/// 5. Test `.key()`, `.priority()`, `.ttl()`, `.group()` on `SiblingSpawnBuilder`. +#[tokio::test] +async fn sibling_spawn_builder_overrides() { + let store = TaskStore::open_memory().await.unwrap(); + let query_store = store.clone(); + + let sched = Scheduler::builder() + .store(store) + .domain( + Domain::::new() + .task::(OrchestratorExecutor { child_count: 1 }) + .task::(OverrideSiblingSpawner) + .task::(NoopExecutor), + ) + .max_concurrency(4) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + sched + .submit(&TaskSubmission::new("test::orchestrator").key("orch-override")) + .await + .unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for sibling to complete. + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut sibling_key = None; + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { + Ok(Ok(SchedulerEvent::Completed(ref h))) if h.task_type == "test::sibling" => { + sibling_key = Some(h.key.clone()); + break; + } + _ => continue, + } + } + + assert!( + sibling_key.is_some(), + "sibling with overrides should complete" + ); + + let history = query_store + .latest_history_by_key(sibling_key.as_ref().unwrap()) + .await + .unwrap() + .unwrap(); + assert_eq!( + history.priority, + Priority::HIGH, + "priority override applied" + ); + + token.cancel(); + let _ = handle.await; +} + +/// 7. Cross-domain sibling: `ctx.domain::().submit_with(task).sibling_of(&ctx)`. +#[tokio::test] +async fn sibling_spawn_cross_domain() { + let store = TaskStore::open_memory().await.unwrap(); + let query_store = store.clone(); + + let sched = Scheduler::builder() + .store(store) + .domain( + Domain::::new() + .task::(OrchestratorExecutor { child_count: 1 }) + .task::(CrossDomainSiblingSpawner), + ) + .domain(Domain::::new().task::(NoopExecutor)) + .max_concurrency(4) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + let orchestrator_outcome = sched + .submit(&TaskSubmission::new("test::orchestrator").key("orch-cross")) + .await + .unwrap(); + let orchestrator_id = orchestrator_outcome.id().unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for cross-domain sibling to complete. + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut sibling_key = None; + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { + Ok(Ok(SchedulerEvent::Completed(ref h))) if h.task_type == "media::cross-sibling" => { + sibling_key = Some(h.key.clone()); + break; + } + _ => continue, + } + } + + assert!( + sibling_key.is_some(), + "cross-domain sibling should complete" + ); + + let history = query_store + .latest_history_by_key(sibling_key.as_ref().unwrap()) + .await + .unwrap() + .unwrap(); + assert_eq!( + history.parent_id, + Some(orchestrator_id), + "cross-domain sibling's parent_id should be orchestrator" + ); + + token.cancel(); + let _ = handle.await; +} + +/// 8. Orchestrator's `finalize()` runs only after all siblings complete. +#[tokio::test] +async fn sibling_spawn_finalize_waits() { + let finalized = Arc::new(AtomicBool::new(false)); + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new() + .task::(OrchestratorFinalizeTracker { + child_count: 1, + finalized: finalized.clone(), + }) + .task::(SiblingSpawnerExecutor) + .task::(NoopExecutor), + ) + .max_concurrency(4) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + sched + .submit( + &TaskSubmission::new("test::orchestrator") + .key("orch-fin") + .fail_fast(false), + ) + .await + .unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for orchestrator to complete (after finalize). + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + let orch_completed = wait_for_event(&mut rx, deadline, |evt| { + matches!(evt, SchedulerEvent::Completed(ref h) if h.task_type == "test::orchestrator") + }) + .await; + + token.cancel(); + let _ = handle.await; + + assert!( + orch_completed.is_some(), + "orchestrator should complete after children and siblings" + ); + assert!( + finalized.load(Ordering::SeqCst), + "finalize should have been called" + ); +} + +/// 9. Verify that the refactored `spawn_children_with` batch path produces +/// correct outcomes for 100+ children (regression test for Step 2). +#[tokio::test] +async fn spawn_children_batch_uses_single_transaction() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new() + .task::(ChildSpawnerExecutor::::new(100)) + .task::(NoopExecutor), + ) + .max_concurrency(50) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + sched + .submit(&TaskSubmission::new("test::parent").key("batch-parent")) + .await + .unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for parent to complete (meaning all 100 children completed). + let deadline = tokio::time::Instant::now() + Duration::from_secs(15); + let parent_completed = wait_for_event( + &mut rx, + deadline, + |evt| matches!(evt, SchedulerEvent::Completed(ref h) if h.task_type == "test::parent"), + ) + .await; + + token.cancel(); + let _ = handle.await; + + assert!( + parent_completed.is_some(), + "parent should complete after all 100 children" + ); +} + +/// 10. Root task calls `.sibling_of(&ctx)`. Assert `Err(StoreError::InvalidState)`. +#[tokio::test] +async fn sibling_spawn_cross_domain_no_parent_returns_error() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(CrossDomainRootSiblingSpawner)) + .domain(Domain::::new().task::(NoopExecutor)) + .max_concurrency(4) + .poll_interval(Duration::from_millis(20)) + .build() + .await + .unwrap(); + + sched + .submit(&TaskSubmission::new("test::orchestrator").key("root-cross")) + .await + .unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let completed = wait_for_event(&mut rx, deadline, |evt| { + matches!(evt, SchedulerEvent::Completed(ref h) if h.task_type == "test::orchestrator") + }) + .await; + + token.cancel(); + let _ = handle.await; + + assert!( + completed.is_some(), + "task should complete after handling error" + ); +} From 3608c0d07eceb72c8594251d2603a226528d5cce Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Tue, 24 Mar 2026 18:43:04 -0700 Subject: [PATCH 2/2] docs: add sibling task spawning section to quick-start guide --- docs/quick-start.md | 50 ++++++++++++++++++++++++++++++++++ src/lib.rs | 11 +++++++- src/registry/domain_context.rs | 27 ++++++++++++++++-- 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/docs/quick-start.md b/docs/quick-start.md index c1bb49c..e664426 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -308,6 +308,56 @@ scheduler.submit( ).await?; ``` +## Sibling tasks + +When a child task needs to spawn peer tasks under the same parent (flat hierarchy), use `ctx.spawn_sibling_with()` instead of manually extracting and threading the parent ID. The new task's `parent_id` is set to the current task's `parent_id`, making it a peer under the same orchestrator. + +```rust +impl TypedExecutor for DirScanner { + async fn execute<'a>( + &'a self, task: ScanL1DirTask, ctx: DomainTaskContext<'a, Scanner>, + ) -> Result<(), TaskError> { + for subdir in list_subdirs(&task.prefix).await? { + ctx.spawn_sibling_with(ScanL1DirTask { + bucket: task.bucket.clone(), + prefix: subdir, + }) + .key(&format!("{}:{}", task.bucket, subdir)) + .await?; + } + Ok(()) + } +} +``` + +For high-fan-out patterns, use `spawn_siblings_with()` which routes through a single-transaction batch path: + +```rust +let siblings: Vec = subdirs.into_iter() + .map(|d| ScanL1DirTask { bucket: bucket.clone(), prefix: d }) + .collect(); +ctx.spawn_siblings_with(siblings).await?; +``` + +If the current task has no parent (i.e. it's a root task), both methods return `StoreError::InvalidState` instead of silently creating a root task. + +For cross-domain siblings, use `.sibling_of(&ctx)` on the submit builder: + +```rust +ctx.domain::() + .submit_with(ScanStartedEvent { .. }) + .sibling_of(&ctx)? + .priority(Priority::HIGH) + .await?; +``` + +| Method | `parent_id` on new task | +|---|---| +| `submit_with(task)` | `None` (root) | +| `submit_with(task).parent(id)` | Explicit ID | +| `ctx.spawn_child_with(task)` | Current task's ID | +| `ctx.spawn_sibling_with(task)` | Current task's `parent_id` | + ## Sharing the scheduler A single `Scheduler` is `Clone` (via `Arc`) and can be shared across your entire application. Domains can carry their own scoped state, so library domains don't need to share a namespace with the host app's state. diff --git a/src/lib.rs b/src/lib.rs index a6db414..a002118 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -139,7 +139,7 @@ //! [`SchedulerBuilder::group_minimum_slots`], or adjust at runtime via //! [`Scheduler::set_group_weight`] and [`Scheduler::set_group_minimum_slots`]. //! -//! ## Child tasks & two-phase execution +//! ## Child tasks, sibling tasks & two-phase execution //! //! An executor can spawn child tasks via [`DomainTaskContext::spawn_child_with`]. When //! children exist, the parent enters a **waiting** state after its executor @@ -151,6 +151,15 @@ //! [`DomainSubmitBuilder::fail_fast(false)`](DomainSubmitBuilder::fail_fast) //! (or [`TaskSubmission::fail_fast`] for untyped submissions). //! +//! A child executor can also spawn **sibling** tasks — peers under the same +//! parent — via [`DomainTaskContext::spawn_sibling_with`]. This avoids manually +//! extracting and threading the parent ID, and returns +//! [`StoreError::InvalidState`] if the current task has no parent (instead of +//! silently creating a root task). A batch variant +//! [`DomainTaskContext::spawn_siblings_with`] uses a single-transaction path +//! for high-fan-out patterns (e.g. BFS directory scans). For cross-domain +//! siblings, use [`DomainSubmitBuilder::sibling_of`]. +//! //! ## Task TTL & automatic expiry //! //! Tasks can be given a time-to-live (TTL) so they expire automatically if they diff --git a/src/registry/domain_context.rs b/src/registry/domain_context.rs index c6ffe4c..d67d464 100644 --- a/src/registry/domain_context.rs +++ b/src/registry/domain_context.rs @@ -2,8 +2,31 @@ //! //! This module provides a zero-cost wrapper around [`TaskContext`] that carries //! domain identity as a type parameter. This enables compile-time–safe child -//! spawning: [`spawn_child_with`](DomainTaskContext::spawn_child_with) only -//! accepts tasks belonging to the same domain `D`. +//! and sibling spawning: +//! +//! - [`spawn_child_with`](DomainTaskContext::spawn_child_with) — spawn a child +//! task whose `parent_id` is the current task's ID. +//! - [`spawn_sibling_with`](DomainTaskContext::spawn_sibling_with) — spawn a +//! peer task whose `parent_id` is the current task's `parent_id` (i.e. the +//! same orchestrator). Returns `StoreError::InvalidState` if the current task +//! has no parent. +//! +//! Both accept only tasks belonging to the same domain `D`. Batch variants +//! ([`spawn_children_with`](DomainTaskContext::spawn_children_with), +//! [`spawn_siblings_with`](DomainTaskContext::spawn_siblings_with)) route +//! through a single-transaction batch path for efficiency. +//! +//! For cross-domain spawning, use [`DomainSubmitBuilder::child_of`] or +//! [`DomainSubmitBuilder::sibling_of`]. +//! +//! # Parent-relationship table +//! +//! | Method | `parent_id` on new task | +//! |---|---| +//! | `submit_with(task)` | `None` (root) | +//! | `submit_with(task).parent(id)` | Explicit ID | +//! | `ctx.spawn_child_with(task)` | Current task's ID | +//! | `ctx.spawn_sibling_with(task)` | Current task's `parent_id` | //! //! # Why a wrapper instead of `TaskContext`? //!