feat: expose child memos in finalize() for orchestrator aggregation #86

@deepjoy

Problem

When using the orchestrator pattern (parent task spawns children, finalize() runs after all complete), there's no way to access child task results in finalize().

Concrete use case

We have a BFS directory scanner where an orchestrator spawns ~238K child tasks (one per directory). Each child scans a single directory and produces a report:

struct L1DirReport { discovered: u64, skipped: u64, deleted: u64 }

impl TypedExecutor<ScanL1DirTask, L1DirReport> for ScanL1DirExecutor {
    async fn execute(&self, task: ScanL1DirTask, ctx: ...) -> Result<L1DirReport, TaskError> {
        // scan one directory...
        Ok(L1DirReport { discovered: 42, skipped: 100, deleted: 3 })
    }
}

In the orchestrator's finalize(), we want to aggregate these reports (total files discovered/skipped/deleted across all directories). But finalize() only receives the orchestrator's own memo — it has no access to child results.

Current workaround

Each child writes its results to an external database as a side effect. In finalize(), we query that database for aggregate stats. This works but means the aggregation logic lives outside taskmill, and the child memos are effectively wasted.

Storing child task IDs in the orchestrator memo doesn't work either — the orchestrator's memo is sealed when execute() returns, but most children are spawned later by sibling tasks via .parent(orchestrator_id).

Proposal

Expose a way to query child task memos from within finalize(). Two possible shapes:

Option 1 — Iterator/stream on context:

async fn finalize(&self, task: T, memo: MyMemo, ctx: DomainTaskContext<'_, D>) -> Result<(), TaskError> {
    let mut total = 0u64;
    let mut children = ctx.child_memos::<ScanL1DirTask, L1DirReport>().await?;
    while let Some(report) = children.next().await {
        total += report.discovered;
    }
    // ...
}

Option 2 — Pre-collected vec (simpler, higher memory):

async fn finalize(&self, task: T, memo: MyMemo, children: Vec<ChildResult<L1DirReport>>, ctx: ...) -> ...

Option 1 (streaming) is preferable at scale — 238K child memos shouldn't need to be held in memory at once.
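
To make the memory argument concrete, here is a minimal sketch of the aggregation that Option 1 would allow. It uses a plain std `Iterator` as a stand-in for whatever stream `ctx.child_memos()` might return; the `aggregate` function is a hypothetical helper, not part of taskmill, and the derives on `L1DirReport` are added only for the example.

```rust
/// Per-directory report, mirroring the struct shown earlier in this issue.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
struct L1DirReport {
    discovered: u64,
    skipped: u64,
    deleted: u64,
}

/// Folds reports one at a time, so only the running totals stay in memory.
/// The iterator is an assumed stand-in for a child-memo stream.
fn aggregate(reports: impl Iterator<Item = L1DirReport>) -> L1DirReport {
    reports.fold(L1DirReport::default(), |mut acc, r| {
        acc.discovered += r.discovered;
        acc.skipped += r.skipped;
        acc.deleted += r.deleted;
        acc
    })
}
```

With this shape, peak memory is independent of the number of children, whereas the Option 2 signature would need all child results materialized before finalize() starts.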

Context

  • This is for a BFS filesystem scanner that decomposes a full-catalog scan into per-directory child tasks under a single orchestrator
  • The orchestrator uses fail_fast(false) so partial failures are tolerated
  • Child memos would also be useful for error summarization in finalize (e.g., "3/238K directories failed: permission denied on X, Y, Z")
  • Without this, the orchestrator pattern covers coordination but not result aggregation, even though those are the two most common orchestrator needs
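
The error summarization mentioned above could look roughly like the sketch below. `ChildOutcome` and `summarize` are hypothetical names used only for illustration; how taskmill would actually surface failed children is an open question in this proposal. The summary lists at most `max_listed` failing paths, so it stays bounded even with many failures.

```rust
/// Hypothetical outcome of one child task; not a taskmill type.
enum ChildOutcome {
    Ok,
    Failed { dir: String, reason: String },
}

/// Builds a one-line summary, keeping at most `max_listed` failing paths.
fn summarize(outcomes: impl Iterator<Item = ChildOutcome>, max_listed: usize) -> String {
    let mut total = 0u64;
    let mut failed = 0u64;
    let mut listed: Vec<String> = Vec::new();
    for outcome in outcomes {
        total += 1;
        if let ChildOutcome::Failed { dir, reason } = outcome {
            failed += 1;
            // Count every failure, but only remember the first few paths.
            if listed.len() < max_listed {
                listed.push(format!("{dir} ({reason})"));
            }
        }
    }
    if failed == 0 {
        format!("{total}/{total} directories succeeded")
    } else {
        format!("{failed}/{total} directories failed: {}", listed.join(", "))
    }
}
```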
