feat(graph): parallel branches (proposal 0011) #47
Implements pipeline-utilities §11 (parallel branches) + graph-
engine §6 (branch_name on NodeEvent). Final batch piece before
the consolidated release cuts.
- parallel_branches.py: BranchSpec (frozen dataclass — subgraph,
inputs/outputs projection mappings, optional middleware) and
ParallelBranchesNode (the engine-dispatched node type). Mirrors
FanOutNode's shape but with M heterogeneous compiled subgraphs
vs N instances of one. Buffer-then-apply semantics per §11.4:
per-branch contributions collected during dispatch, applied in
branch insertion order at node completion. The
_MultiContribution sentinel handles multi-branch writes to one
parent field (per-value reducer fold in insertion order).
- errors.py: ParallelBranchesNoBranches (compile-time, raised at
builder when branches mapping is empty) and
ParallelBranchesBranchFailed (runtime, subtype of
NodeException, carries branch_name + inherits transient
classification from __cause__ per spec §6.1).
- events.py: NodeEvent.branch_name optional non-empty string.
Populated only on events from nodes inside a branch.
Uniqueness invariant (namespace, branch_name, fan_out_index,
attempt_index, phase) jointly unique per graph-engine §6.
- builder.py: add_parallel_branches_node with full registration-
time validation (non-empty branches, non-empty branch names,
inputs/outputs against declared parent + subgraph fields via
MappingReferencesUndeclaredField, errors_field against parent
fields).
- compiled.py: _step_parallel_branches_node dispatcher mirrors
_step_fan_out_node (parent middleware wraps the dispatch as a
single unit per §11.6, observer-attribution + save-checkpoint
+ deferred-completed-dispatch pattern). _merge_partial
recognizes _MultiContribution and folds each value through the
parent's reducer in insertion order. Both _dispatch_started +
_dispatch_completed read current_branch_name() and stamp it
on every NodeEvent.
- observer.py: descend_into_parallel_branch builds the branch's
_InvocationContext (atomic-restart drops checkpointer + resume
state per §11.9 / §10.7 alignment with fan-out).
- correlation.py: _branch_name_var ContextVar +
current_branch_name() inspector + _set/_reset helpers. Mirrors
_fan_out_index_var shape. Set inside each branch's task
closure via copy_context.
- otel/observer.py: surfaces openarmature.branch_name on every
node span within a branch. Independent of
openarmature.node.fan_out_index — both MAY be present when a
branch's subgraph contains a fan-out.
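The per-branch ContextVar handling described in the correlation.py item can be illustrated with a minimal sketch. The variable and helper names below are stand-ins, not the module's actual definitions, and the sketch relies on asyncio's implicit context copy at task creation rather than an explicit copy_context call. It shows that a value set inside a task body stays local to that task, while outer state set before the spawn flows in.

```python
import asyncio
import contextvars

# Hypothetical stand-ins for the ContextVars described above; the real
# definitions live in correlation.py and are not shown in this PR text.
_branch_name_var = contextvars.ContextVar("branch_name", default=None)
_attempt_index_var = contextvars.ContextVar("attempt_index", default=0)


def current_branch_name():
    """Return the branch name visible in the calling context, or None."""
    return _branch_name_var.get()


async def _run_branch(name, seen):
    # Set inside the task body: the write lands in this task's own context
    # copy, so sibling branches and the dispatcher never observe it.
    token = _branch_name_var.set(name)
    try:
        await asyncio.sleep(0)  # yield so the branches interleave
        seen[name] = (current_branch_name(), _attempt_index_var.get())
    finally:
        _branch_name_var.reset(token)


async def dispatch(names):
    seen = {}
    # create_task snapshots the current context for each task.
    tasks = [asyncio.create_task(_run_branch(n, seen)) for n in names]
    await asyncio.gather(*tasks)
    return seen, current_branch_name()


def demo():
    async def main():
        _attempt_index_var.set(1)  # outer state set BEFORE the tasks are spawned
        return await dispatch(["alpha", "beta"])

    return asyncio.run(main())
```

Each branch sees its own name plus the outer attempt index, and the dispatcher's own context still reports no branch name after the branches finish.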
Fail-fast cancellation per §11.5: asyncio.wait with
FIRST_EXCEPTION; on first failure cancel pending tasks and
gather with return_exceptions=True to drain CancelledError + any
near-simultaneous second exception (silently discarded with
DEBUG log per spec-agent Q5 note — raise is committed to the
first observed failure). recoverable_state = pre-entry parent
state per §11.5; buffered contributions discarded.
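As a rough sketch of the fail-fast flow above (not the engine's actual dispatcher), the wait / cancel / drain sequence might look like the following. BranchFailed, run_fail_fast, and the tie-break by insertion order are assumptions made for illustration; only the asyncio calls are real.

```python
import asyncio
import logging

logger = logging.getLogger("parallel_branches_sketch")


class BranchFailed(Exception):
    """Hypothetical stand-in for ParallelBranchesBranchFailed."""

    def __init__(self, branch_name, recoverable_state):
        super().__init__(f"branch {branch_name!r} failed")
        self.branch_name = branch_name
        self.recoverable_state = recoverable_state


async def run_fail_fast(parent_state, branches):
    """branches maps name -> zero-argument coroutine function."""
    tasks = {asyncio.create_task(fn()): name for name, fn in branches.items()}
    done, pending = await asyncio.wait(list(tasks), return_when=asyncio.FIRST_EXCEPTION)

    failed = [t for t in tasks if t in done and t.exception() is not None]
    if not failed:
        # No branch raised, so wait() returned only once every task finished.
        return {name: t.result() for t, name in tasks.items()}

    for t in pending:
        t.cancel()
    # Drain: collect CancelledError plus any exception that raced past cancel().
    residual = await asyncio.gather(*pending, return_exceptions=True)
    for exc in residual:
        if isinstance(exc, BaseException) and not isinstance(exc, asyncio.CancelledError):
            logger.debug("discarding late branch failure: %r", exc)

    first = failed[0]  # insertion order breaks ties in this sketch
    # Buffered results are dropped; the caller gets the pre-entry state back.
    raise BranchFailed(tasks[first], dict(parent_state)) from first.exception()


def demo():
    finished = []

    async def alpha():
        finished.append("alpha")
        return 1

    async def beta():
        await asyncio.sleep(0.01)
        raise RuntimeError("boom")

    async def gamma():
        await asyncio.sleep(1)
        finished.append("gamma")  # never reached: gamma is cancelled first

    async def main():
        try:
            await run_fail_fast({"n": 0}, {"alpha": alpha, "beta": beta, "gamma": gamma})
        except BranchFailed as err:
            return err.branch_name, type(err.__cause__).__name__, err.recoverable_state, finished

    return asyncio.run(main())
```

In the demo, alpha completes but its result is discarded, beta's RuntimeError becomes the cause of the raised error, and gamma is cancelled before it finishes.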
Collect per §11.5: gather with return_exceptions=True;
successful branches contribute to buffer, failed branches'
errors land in errors_field (when configured) as {branch_name,
category, message, cause_type} records.
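The collect policy can be sketched in a similar way. This is an illustrative approximation: run_collect is a hypothetical helper, and the "unclassified" category is a placeholder because the engine's classification rules are not shown here. Only the record keys come from the description above.

```python
import asyncio


async def run_collect(branches):
    """branches maps name -> zero-argument coroutine function."""
    names = list(branches)
    # return_exceptions=True lets every branch run to completion.
    outcomes = await asyncio.gather(
        *(branches[name]() for name in names), return_exceptions=True
    )
    buffer, errors = {}, []
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            errors.append(
                {
                    "branch_name": name,
                    "category": "unclassified",  # placeholder, not the engine's value
                    "message": str(outcome),
                    "cause_type": type(outcome).__name__,
                }
            )
        else:
            buffer[name] = outcome  # only successful branches contribute
    return buffer, errors


def demo():
    async def alpha():
        return 1

    async def beta():
        raise ValueError("boom")

    return asyncio.run(run_collect({"alpha": alpha, "beta": beta}))
```

Because gather preserves argument order, both the buffer and the error records follow branch insertion order regardless of completion timing.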
Smoke-tested: two heterogeneous branches (int + str result
fields) dispatch and contribute back to parent state in
insertion order. 656/64/0 pass/skip/fail.
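The insertion-order fan-in mentioned above can be sketched as follows. MultiContribution, buffer_contributions, and merge_partial are simplified stand-ins written for this example; the real _MultiContribution and _merge_partial may differ in shape and in how reducers are looked up.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class MultiContribution:
    """Marks a parent field written by two or more branches."""

    values: tuple[Any, ...]


def buffer_contributions(per_branch):
    """per_branch maps branch name -> {field: value}, in insertion order."""
    collected = {}
    for contribution in per_branch.values():
        for field, value in contribution.items():
            collected.setdefault(field, []).append(value)
    # Single-branch writes stay plain values; only shared fields get the sentinel.
    return {
        field: values[0] if len(values) == 1 else MultiContribution(tuple(values))
        for field, values in collected.items()
    }


def merge_partial(state, partial, reducers):
    """Apply buffered contributions, folding sentinel values one at a time."""
    merged = dict(state)
    for field, value in partial.items():
        reducer = reducers.get(field, lambda old, new: new)  # default: last write wins
        values = value.values if isinstance(value, MultiContribution) else (value,)
        for item in values:
            merged[field] = reducer(merged.get(field), item)
    return merged


def demo():
    state = {"log": [], "count": 0}
    reducers = {"log": lambda old, new: old + new}  # append-style merge reducer
    per_branch = {"alpha": {"log": ["a"], "count": 1}, "beta": {"log": ["b"]}}
    partial = buffer_contributions(per_branch)
    return partial, merge_partial(state, partial, reducers)
```

The fold order depends only on the order of the branches mapping, which is why completion timing cannot change the merged result.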
Spec graph-engine §6 (clarified in spec v0.16.1) requires the wrapping retry middleware's attempt counter to propagate to events emitted from any inner node the retry re-invokes, including transitively through parallel-branches branch middleware and fan-out instance_middleware on a wrapping subgraph. Retry middleware now sets _attempt_index_var via the existing ContextVar set/reset token pattern before each next() call. The engine's innermost dispatch reads current_attempt_index() when emitting NodeEvents and when recording the checkpoint-save attempt_index, instead of writing its own local counter. Innermost-wins precedence falls out of Python's ContextVar token-stack semantics: nested retries shadow the outer counter while inside and restore it on exit. Node-level retry behavior is unchanged because retry's set agrees with what the engine's local counter previously emitted. Branch and instance-middleware retry now correctly surface their counter on inner-node events, matching the v0.16.1 clarification. Bumps the spec submodule pin from v0.16.0 to v0.16.1 in pyproject.toml, __spec_version__, and tests/test_smoke.py.
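The token-stack behavior described above can be demonstrated with a small self-contained sketch. The retry wrapper and flaky_node helper are hypothetical simplifications, not the library's RetryMiddleware; they only show how set/reset tokens give innermost-wins shadowing and how the inner node observes the wrapping counter.

```python
import asyncio
import contextvars

_attempt_index_var = contextvars.ContextVar("attempt_index", default=0)


def current_attempt_index():
    return _attempt_index_var.get()


def retry(next_, max_attempts):
    """Wrap next_ so each attempt publishes its index through the ContextVar."""

    async def wrapped(state):
        last_error = None
        for attempt in range(max_attempts):
            token = _attempt_index_var.set(attempt)
            try:
                return await next_(state)
            except RuntimeError as exc:
                last_error = exc
            finally:
                _attempt_index_var.reset(token)  # restore the enclosing value
        raise last_error

    return wrapped


def flaky_node(failures, seen):
    """Build a node that records the index it observes and fails `failures` times."""
    remaining = [failures]

    async def node(state):
        seen.append(current_attempt_index())
        if remaining[0] > 0:
            remaining[0] -= 1
            raise RuntimeError("transient")
        return state

    return node


def demo():
    async def main():
        single, nested = [], []
        await retry(flaky_node(1, single), 3)({})
        # The inner retry gets 2 attempts and the node fails twice,
        # so the outer retry has to re-invoke the inner one once.
        await retry(retry(flaky_node(2, nested), 2), 2)({})
        return single, nested, current_attempt_index()

    return asyncio.run(main())
```

The single-retry case yields the same [0, 1] sequence that fixture 036 asserts for branch alpha, and the nested case shows the inner counter shadowing the outer one before the default is restored on exit.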
Extends the conformance harness so the eight spec fixtures introduced by proposal 0011 parse and run end-to-end:
- pipeline-utilities/032-038 (parallel-branches basic, fail-fast, collect, different-state-schemas, with-branch-middleware-retry, determinism, compose-with-fan-out).
- graph-engine/021-observer-branch-name (NodeEvent.branch_name).
Harness changes:
- ParallelBranchSpec / ParallelBranchesSpec models on NodeSpec; sleep_ms node-companion modifier; recoverable_state expected key on the pipeline-utilities expected discriminator.
- adapter.build_graph dispatches the parallel_branches directive to builder.add_parallel_branches_node, wrapping the result in a tracing variant so the execution-order trace records the dispatcher as one engine step.
- test_pipeline_utilities driver: lifts the fixture-number gate from 23 to 38; loads top-level plural subgraphs blocks; translates per-branch middleware lists; wires graph-attached observers per run; asserts parallel-branches observer_event_invariants (branch_started_event_order, alpha_inner_attempt_indices_seen, fan-out-inside-branch invariants, plain-branch invariants); routes fail_fast assertions to surface branch_name + cause_message + recoverable_state alongside category.
- test_conformance driver: drops the 021 skip and adds an invariants checker for outermost-events-have-no-branch-name and inner-events-carry-correct-branch-name.
The checkpoint-resume fixtures (024-031) move into the deferred set since their cases-shape with first_run_expected_error / resume: blocks is driven by test_checkpoint.py.
Adds unit-test coverage and doc surface for proposal 0011.
tests/unit/test_parallel_branches.py:
- Compile-time validation: empty branches mapping raises ParallelBranchesNoBranches; empty branch_name raises ValueError; inputs / outputs / errors_field projection-field-existence checks raise MappingReferencesUndeclaredField on the correct side.
- Runtime happy path: three heterogeneous branches merge their projected outputs into the parent.
- fail_fast: ParallelBranchesBranchFailed surfaces branch_name and the original cause via __cause__; recoverable_state holds the parent's pre-dispatch snapshot with no buffered contributions applied.
- collect: per-branch failures land in errors_field with branch_name + category; failed branches' outputs do not fire.
- Determinism: when two branches write the same merge-reducer parent field, insertion order determines fan-in order regardless of inner-node completion timing.
- Cancellation drain: a second branch racing past cancellation has its exception absorbed silently by gather(*, return_exceptions=True).
docs/concepts/parallel-branches.md: new concepts page covering when to reach for parallel branches vs. fan-out, the BranchSpec shape, per-branch state / inputs / outputs, the two error policies (fail_fast buffer-and-apply semantics, collect with errors_field), branch middleware (with the v0.16.1 attempt_index propagation note), composition with other constructs, resume semantics, and when parallel branches is not the right shape.
docs/concepts/index.md: link to the new page.
docs/concepts/observability.md: NodeEvent shape gets branch_name; attempt_index description notes the v0.16.1 transitive-retry propagation; new branch_name bullet describes independence from fan_out_index and the openarmature.branch_name OTel attribute.
CHANGELOG: adds the parallel-branches Unreleased entry plus the attempt-index ContextVar propagation entry; bumps the spec pin note from v0.16.0 to v0.16.1; flips the release-gate note since all five proposals in the batch are now landed.
Pull request overview
Implements spec proposal 0011 (parallel branches) — the final PR in a five-PR batch — and bumps the pinned spec to v0.16.1. Adds a new graph primitive that dispatches M heterogeneous compiled subgraphs concurrently, with deterministic insertion-order fan-in, two error policies (fail_fast / collect), branch_name propagation on events and OTel spans, and refactors retry-middleware's attempt_index to flow through a ContextVar so transitive (branch/instance) retry wrapping correctly surfaces the wrapping counter on inner-node events.
Changes:
- New ParallelBranchesNode / BranchSpec types and GraphBuilder.add_parallel_branches_node builder surface, with ParallelBranchesNoBranches (compile) and ParallelBranchesBranchFailed (runtime) error categories; conformance harness + 13 unit tests + concept docs added.
- NodeEvent.branch_name field and openarmature.branch_name OTel attribute, populated on inner-node events/spans inside a branch (independent of fan_out_index).
- Retry middleware now writes _attempt_index_var via ContextVar tokens; engine reads it via current_attempt_index() for both event emission and checkpoint-save attribution (replacing its own local counter at the _step_function_node / _step_fan_out_node / new _step_parallel_branches_node sites).
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/openarmature/graph/parallel_branches.py | New BranchSpec, ParallelBranchesNode, _MultiContribution; fail_fast/collect dispatch with cancellation drain. |
| src/openarmature/graph/builder.py | add_parallel_branches_node with projection-field + branch-name + empty-branches validation. |
| src/openarmature/graph/compiled.py | Adds _step_parallel_branches_node; refactors function/fan-out attempt-index to ContextVar reads; _merge_partial handles _MultiContribution; branch_name stamped on events. |
| src/openarmature/graph/errors.py | New ParallelBranchesNoBranches, ParallelBranchesBranchFailed categories. |
| src/openarmature/graph/events.py | NodeEvent.branch_name field + docstring updates. |
| src/openarmature/graph/observer.py | descend_into_parallel_branch atomic-restart descent. |
| src/openarmature/graph/middleware/retry.py | Sets/resets _attempt_index_var around next_(state). |
| src/openarmature/graph/__init__.py | Re-exports new types/errors. |
| src/openarmature/observability/correlation.py | New _branch_name_var ContextVar + current_branch_name() API. |
| src/openarmature/observability/otel/observer.py | Emits openarmature.branch_name span attribute. |
| src/openarmature/__init__.py, pyproject.toml | Spec pin bumped to 0.16.1. |
| tests/unit/test_parallel_branches.py | 13 unit tests covering compile-time validation, fail_fast, collect, determinism, cancellation drain. |
| tests/conformance/adapter.py | parallel_branches: directive translation, _TracingParallelBranchesNode, sleep_ms wrapper. |
| tests/conformance/harness/directives.py, expectations.py | New ParallelBranchSpec / ParallelBranchesSpec models, sleep_ms + recoverable_state keys. |
| tests/conformance/test_pipeline_utilities.py | Drives 032-038, parallel-branches invariants, branch-middleware translation, errors_field subset matching. |
| tests/conformance/test_conformance.py | Drives graph-engine 021 observer-branch-name invariants. |
| tests/conformance/test_fixture_parsing.py, tests/test_smoke.py | Removes deferred-skip entries; spec-version assertion update. |
| docs/concepts/parallel-branches.md, observability.md, index.md, CHANGELOG.md | Docs + changelog for the new feature and the spec-pin bump. |
CodeQL flagged the trailing ``del pb_branches`` in _check_parallel_branches_invariants as an unnecessary delete on a local that's about to fall out of scope anyway. The discovery loop above it populated a dict that no invariant consumer read; it was speculative scaffolding for future invariant kinds. Removing both the loop and the del cleans up dead code without changing any of the invariant checks (036/037/038 still pass).
Consolidated release for the five-PR batch:
- Structured output (proposal 0016, PR #42)
- Image content blocks (proposal 0015, PR #44)
- Prompt management (proposal 0017, PR #45)
- State migration for checkpoints (proposal 0014, PR #46)
- Parallel branches (proposal 0011, PR #47)
Bumps:
- ``pyproject.toml`` project.version: 0.5.0 → 0.6.0
- ``__version__`` in src/openarmature/__init__.py
- ``uv.lock`` editable package version
- ``tests/test_smoke.py`` version assertion
Flips CHANGELOG ``[Unreleased]`` to ``[0.6.0] — 2026-05-16``, drops the release-gate Notes entry, and tightens the pre-1.0 MINOR note to list the two behavioral changes (retry-MW attempt-index propagation, CheckpointRecord.schema_version semantic shift) instead of the structured-output-specific note carried over from PR-1. Pinned spec stays at v0.16.1 (set in PR #47).
Summary
- Parallel branches (proposal 0011) in openarmature.graph. PR-5 of the five-PR batch following PR-1 (feat(llm): structured output (proposal 0016) #42), PR-2 (feat(llm): image content blocks (proposal 0015) #44), PR-3 (feat(prompts): prompt-management core (proposal 0017) #45), PR-4 (feat(checkpoint): state migration for checkpoints (proposal 0014) #46). With this PR, the batch is complete.
- GraphBuilder.add_parallel_branches_node(name, *, branches, error_policy, errors_field, middleware) surface dispatches M heterogeneous compiled subgraphs concurrently per pipeline-utilities §11.
- BranchSpec (frozen dataclass with subgraph + inputs/outputs projection + branch middleware) and ParallelBranchesNode types exported from openarmature.graph.
- _MultiContribution sentinel that _merge_partial folds through the reducer in branch insertion order.
- "fail_fast" raises ParallelBranchesBranchFailed (a NodeException subtype) with branch_name, original cause as __cause__, and recoverable_state carrying the parent's pre-dispatch snapshot; no buffered branch contributions are applied (§11.5 buffer-and-apply). "collect" runs every branch to completion, records per-branch failures in an optional errors_field ({branch_name, category, message, cause_type}), and continues.
- ParallelBranchesNoBranches (compile time, empty branches map) and ParallelBranchesBranchFailed (runtime fail_fast).
- NodeEvent.branch_name: str | None populated on events from nodes inside a parallel-branches branch (graph-engine §6, proposal 0011). Independent of fan_out_index: both may be present simultaneously when a branch contains a fan-out (or a fan-out instance contains a parallel-branches node).
- openarmature.branch_name OTel span attribute parallel to the existing openarmature.node.fan_out_index. Both coexist on inner nodes of a fan-out-inside-a-branch composition.
- Retry middleware now sets _attempt_index_var via the existing ContextVar token pattern around each await next_(state) call; the engine reads current_attempt_index() when emitting events and when recording the checkpoint-save attempt_index, instead of writing its own local counter. Innermost-wins precedence falls out of Python's ContextVar token-stack semantics. Branch and instance-middleware retry now correctly surface their counter on inner-node events, matching the v0.16.1 clarification (and fixture 036's alpha_inner_attempt_indices_seen: [0, 1] invariant).
What's new
- BranchSpec, ParallelBranchesNode (openarmature.graph): BranchSpec is a frozen dataclass; ParallelBranchesNode holds the branches dict, error policy, errors_field, and dispatcher-level middleware.
- GraphBuilder.add_parallel_branches_node: empty branches → ParallelBranchesNoBranches; empty branch names → ValueError; inputs/outputs reference declared fields via the existing mapping_references_undeclared_field path.
- ParallelBranchesNoBranches, ParallelBranchesBranchFailed: the runtime error carries branch_name as a structured field, preserves the original cause via __cause__, and inherits transient classification from the wrapped exception.
- NodeEvent.branch_name: str | None
- openarmature.branch_name OTel attribute: set when event.branch_name is not None. NOT set on the parallel-branches node's own span.
- _attempt_index_var propagation: RetryMiddleware.__call__ now sets/resets the ContextVar around each next_(); engine reads current_attempt_index() for event emission and checkpoint save attribution. Innermost-wins via Python's ContextVar token-stack.
- descend_into_parallel_branch on _InvocationContext
Release gate
PR-5 is the final piece of the five-PR batch (0016→0015→0017→0014→0011). With this PR merged, the gate is cleared and the consolidated v0.5.x release can tag the whole batch against spec v0.16.1. The CHANGELOG [Unreleased] notes flip to reflect this.
Commits
Atop commit 945b4c8 (the engine surface that landed before the v0.16.1 spec clarification reopened the attempt-index question):
- feat(graph): propagate retry attempt_index
- test(conformance): drive 0011 parallel-branches
- test(unit): parallel_branches + docs + CHANGELOG
Notable implementation details
- _MultiContribution sentinel pattern. A frozen dataclass holding tuple[Any, ...] of per-branch contributions to one parent field. _merge_partial in compiled.py recognizes the sentinel and folds each value through the parent's reducer in branch insertion order. Single-branch contributions stay as plain values; the sentinel only fires when two or more branches write the same field. Lazy import in _merge_partial breaks the textual cycle (parallel_branches.py has a TYPE_CHECKING back-ref to compiled.py).
- asyncio.create_task(coro, context=ctx.copy()) per branch. Per-task ContextVar isolation: each branch's _set_branch_name stays branch-local. Outer state (the wrapping retry's _attempt_index_var) flows in because it was set BEFORE the copy. _set_branch_name is set inside the spawned task body, not in the dispatcher loop, because each task's context is fixed at spawn time via copy_context().
- _fail_fast waits for FIRST_EXCEPTION, cancels pending tasks, then drains with gather(*, return_exceptions=True). A second task may race past the cancellation point with its own exception, which is absorbed into the gather return list. The raise is committed to the FIRST failure observed; residual non-CancelledError exceptions are discarded silently with a DEBUG log line for post-mortem analysis.
- The engine no longer writes _attempt_index_var directly, a subtle but important shift from pre-v0.16.1 behavior. Retry middleware is now the canonical writer; the engine is a pure reader via current_attempt_index(). The line-957 comment in compiled.py explicitly cites the v0.16.1 clarification so this isn't accidentally undone in a future refactor.
- deferred_info[0] captures current_attempt_index() at the moment of the successful merge (not from a stale local counter) for checkpoint-save attribution. All three save sites in compiled.py source from deferred_info[0][0] when available, falling back to 0 for the short-circuit case (middleware short-circuited without invoking next).
- descend_into_parallel_branch atomic-restart contract. Mirrors fan-out's per-instance descent: drops checkpointer + resume state on the spawned branch's invocation context. Per spec §10.7, per-branch progress is not individually persisted in v1; a checkpoint resume landing on a parallel-branches node re-dispatches all branches from scratch.
Conformance fixtures (032–038 + graph-engine/021)
All 8 fixtures pass:
- 032 basic.
- 033 fail-fast: parallel_branches_branch_failed with branch_name=beta and cause_message; recoverable_state carries no branch contributions (including the first branch's successful work, per §11.4 buffer-and-apply); third branch's slow inner node never completes.
- 034 collect: failed branches' outputs do not fire (beta_result stays at default); branch_errors records the beta failure.
- 035 different-state-schemas: outputs to differently-typed parent fields.
- 036 with-branch-middleware-retry: alpha_inner_attempt_indices_seen: [0, 1], the canonical v0.16.1 demonstration of transitive-retry attempt_index propagation. Branches beta and gamma show [0] (no retry).
- 037 determinism: sleep_ms 50/5/25. Two branches write the same merge-reducer parent field. Insertion order alpha→beta→gamma determines fan-in order regardless of completion order (which is beta→gamma→alpha at runtime). branch_started_event_order: [alpha, beta, gamma].
- 038 compose-with-fan-out: inner events of the fan-out inside a branch carry both branch_name AND fan_out_index. The plain branch's inner events carry branch_name but no fan_out_index.
- graph-engine/021 observer-branch-name: outermost events carry no branch_name; inner nodes of branch alpha carry branch_name=alpha; inner nodes of branch beta carry branch_name=beta.
Harness extensions
- ParallelBranchSpec + ParallelBranchesSpec typed-directive models on NodeSpec.
- sleep_ms node-companion modifier (used by 033's slow third branch and 037's randomized completion timing).
- recoverable_state expected key on the PipelineUtilitiesExpected discriminator.
- adapter._add_parallel_branches_node translates the parallel_branches: block to builder.add_parallel_branches_node, resolving branch subgraph references against the case's subgraphs: block. The registered node is swapped for a _TracingParallelBranchesNode so the conformance trace records the dispatcher as one engine step.
- adapter._wrap_with_sleep wraps a node body in an asyncio.sleep(ms/1000) for the sleep_ms companion modifier.
- test_pipeline_utilities driver: lifts the fixture-number gate from 23 to 38; loads top-level plural subgraphs: blocks (alongside the legacy singular subgraph: / subgraph_with_idx:); translates per-branch middleware lists; wires graph-attached observers per run; asserts parallel-branches observer_event_invariants (branch_started_event_order, alpha_inner_attempt_indices_seen, fan-out-inside-branch invariants, plain-branch invariants); routes fail_fast assertions to surface branch_name + cause_message + recoverable_state alongside category; subset-matches errors_field records (the spec mandates branch_name + category; the engine's record carries two extras).
- test_conformance driver: drops the 021 skip and adds an invariants checker for outermost_events_have_no_branch_name and inner_events_carry_correct_branch_name.
- The checkpoint-resume fixtures (024-031) move into test_pipeline_utilities's deferred set since their cases-shape with first_run_expected_error / resume: blocks is driven by test_checkpoint.py.
Unit tests
13 tests under tests/unit/test_parallel_branches.py:
- Compile-time validation: empty branches → ParallelBranchesNoBranches; empty branch_name → ValueError; inputs/outputs/errors_field projection-field-existence checks raise MappingReferencesUndeclaredField on the correct side.
- fail_fast: ParallelBranchesBranchFailed surfaces branch_name; __cause__ chain walks back to the original RuntimeError; recoverable_state holds the pre-dispatch snapshot with no buffered contributions applied (alpha's contribution must NOT land even though its branch may have completed before cancellation).
- collect: per-branch failures land in errors_field with branch_name + category; failed branches' outputs do not fire (beta_result stays at default).
- (_MultiContribution sentinel only fires for multi-branch fields.)
Test plan
- uv run pytest (excluding tests/test_examples_smoke.py pre-existing failures from missing openai in CI venv): 657 pass, 58 skipped, 0 failed.
- uv run pyright: clean.
- uv run ruff check + uv run ruff format: clean.
- All 13 tests in tests/unit/test_parallel_branches.py pass.
- Preview docs/concepts/parallel-branches.md via mkdocs serve to confirm rendering.
Pre-1.0 SemVer
Additive: new types, new errors, new builder method, new NodeEvent field, new OTel attribute. Existing free-form callers (no parallel-branches nodes registered) see no behavior change.
The retry-MW attempt_index propagation is a behavior CHANGE for users with retry middleware wrapping a subgraph (branch middleware, fan-out instance_middleware): events from inner nodes of that subgraph now carry the wrapping retry's attempt counter rather than starting at 0 each re-invocation. Per-node retry behavior is unchanged. Spec v0.16.1 frames this as a clarification of the ambiguous pre-v0.16.0 wording, so the v0.16.1 pin is the framing pre-1.0 MINOR. CHANGELOG flags the change under Added.