graph: spec v0.4 fan-out runtime (proposal 0005 PU side) — phase 3#16
Merged
Conversation
Implements pipeline-utilities §9: parallel fan-out of compiled
subgraphs with bounded concurrency, two error policies, configurable
empty-input behavior, and per-instance middleware composition. Lands
all 8 target conformance fixtures (pipeline-utilities 017-023 +
graph-engine 017-observer-fan-out-index).
Engine:
- fan_out.py: FanOutNode (a third Node sibling alongside FunctionNode
and SubgraphNode), FanOutConfig dataclass, per-instance projection
helpers, concurrency-bounded execution via asyncio.gather +
Semaphore, fan-in merge for fail_fast and collect policies,
on_empty raise/noop handling.
- errors.py: FanOutCountModeAmbiguous, FanOutFieldNotList compile
errors; FanOutEmpty, FanOutInvalidCount, FanOutInvalidConcurrency
runtime errors. The runtime trio subclasses NodeException so they
surface as graph-engine §4 node_exception with an additional
fan_out_category attribute (matching fixture 023's expected
shape).
- observer.py: _InvocationContext gains fan_out_index; new
descend_into_fan_out_instance helper stamps the index onto the
child context so inner-node events fire with it populated.
- compiled.py: _step_fan_out_node — wraps the whole fan-out as one
parent dispatch (per §9.6) with started/completed events around
the chain; _dispatch_started/_completed propagate fan_out_index
from context onto every NodeEvent.
- builder.py: GraphBuilder.add_fan_out_node with full §9 compile-
time validation (mode mutual-exclusion, items_field list-typed
check, declared-field references for collect_field, target_field,
count_field, errors_field, inputs, extra_outputs).
Conformance harness:
- adapter.py: fan_out node directive translation; new test seam
directives update_pure, update_from_field, flaky_by_index (both
fail_count_per_idx and fail_when_idx shapes), flaky_instance_only;
_TracingFanOutNode for execution-order trace recording.
- test_pipeline_utilities.py: instance_middleware threading via a
new fan_out_instance_middleware kwarg on build_graph; cases-
fixture handling now merges shared subgraph blocks into each
case so 018-019, 021-023 see them; supports state_field_read and
queue_chunk callable resolvers for count + concurrency.
- test_conformance.py: removed fan_out from
_UNSUPPORTED_NODE_DIRECTIVES so graph-engine fixture 017 runs.
Unit tests (test_fan_out.py): 19 tests covering items_field
projection, count modes (literal int + state-reading callable),
count + concurrency callables resolved exactly once at entry, inputs
mapping projection, concurrency limit enforcement, fail_fast
recoverable_state contract, collect errors_field shape, on_empty
raise/noop, count_field write, extra_outputs merge,
instance_middleware retry composition, fan-in determinism under
nondeterministic completion timing, four compile-error checks
(count_mode_ambiguous both/neither, field_not_list,
inputs/extra_outputs undeclared field references).
Total tests: 276 passing, 0 skipped.
There was a problem hiding this comment.
Pull request overview
Adds spec v0.4 pipeline-utilities §9 fan-out support to the graph engine, including bounded-concurrency per-instance subgraph dispatch, new fan-out error types, observer tagging via fan_out_index, and conformance/unit-test coverage.
Changes:
- Introduces
FanOutNode/FanOutConfigruntime with fail-fast vs collect policies, empty-input handling, and per-instance middleware. - Threads
fan_out_indexthrough invocation contexts and node observer events; adds compiled-engine dispatch path for fan-out nodes. - Extends conformance adapter/harness to translate
fan_outand related directives; adds a newtest_fan_out.pyunit test suite.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
src/openarmature/graph/fan_out.py |
Implements fan-out runtime node, projection, bounded concurrency, and fan-in behavior. |
src/openarmature/graph/builder.py |
Adds GraphBuilder.add_fan_out_node with compile-time validation and config wiring. |
src/openarmature/graph/compiled.py |
Dispatches FanOutNode as a single engine step and emits events with fan_out_index. |
src/openarmature/graph/observer.py |
Adds fan_out_index to invocation context; supports descending into fan-out instances. |
src/openarmature/graph/errors.py |
Adds fan-out compile/runtime error categories. |
src/openarmature/graph/__init__.py |
Exports fan-out node/config and new fan-out errors. |
tests/conformance/adapter.py |
Adds fan-out fixture translation and new directive seams (update_pure, update_from_field, flaky_by_index, flaky_instance_only). |
tests/conformance/test_pipeline_utilities.py |
Enables fan-out fixtures up to phase 3; supports per-fan-out instance middleware translation and shared subgraph blocks. |
tests/conformance/test_conformance.py |
Removes fan-out-related directives from unsupported set. |
tests/unit/test_fan_out.py |
Adds comprehensive unit tests for fan-out spec corner cases. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The PR review thread suggested keying flaky_by_index and flaky_instance_only by id(state) for per-instance counters. Tried that; broke fixture 021. Root cause: instance-level retry (021) constructs a fresh subgraph state on each retry, so id(state) resets per retry attempt — the per-instance counter starts over and retry exhausts. True per-instance semantics need an identifier stable across both node-level retry (state stable, id works) AND instance-level retry (state changes, id doesn't work). A state-field key would work but the field name is fixture-specific (item for 020, input for 021). Reverting to a fixture-global counter and documenting the limitation in the docstring + an inline comment, so a future fixture exercising the gap surfaces a real failure rather than silently miscounting. The existing fixtures (019 collect, 020 node-level retry, 021 instance-level retry) align with the global counter at runtime — not because the semantics are correct, but because the timing happens to land the failure on the right call.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Phase 3 of the implementation plan — pipeline-utilities §9 fan-out runtime: parallel per-item / per-count subgraph dispatch with bounded concurrency, two error policies, configurable empty-input behavior, and per-instance middleware composition. Lands all 8 target fixtures (pipeline-utilities 017–023 + graph-engine 017-observer-fan-out-index) and adds 19 unit tests for the spec-corner cases the fixtures only exercise implicitly.
What's in the PR
Engine
src/openarmature/graph/fan_out.py— new module:FanOutNode: third sibling toFunctionNodeandSubgraphNode. Recognized in_invokeas a distinct dispatch type; itsrun_with_contextwraps the parent's middleware chain around a single fan-out dispatch.FanOutConfig: frozen dataclass with all §9 config fields (subgraph, items_field, item_field, count, concurrency, error_policy, on_empty, count_field, inputs, extra_outputs, instance_middleware, errors_field).asyncio.gather+asyncio.Semaphore, fan-in merge for fail_fast and collect policies, on_empty raise/noop handling.errors.py— five new categories:FanOutCountModeAmbiguous,FanOutFieldNotList.FanOutEmpty,FanOutInvalidCount,FanOutInvalidConcurrency. Each subclassesNodeExceptionso they surface ascategory: "node_exception"with an additionalfan_out_categoryattribute (matching fixture 023's expected shape).observer.py—_InvocationContextgainsfan_out_index;descend_into_fan_out_instancestamps the index onto the child context. Inherited unchanged through any subgraph descents inside an instance.compiled.py—_step_fan_out_nodewraps the whole fan-out as one parent dispatch per §9.6. Per-instance events fire withfan_out_indexpopulated.builder.py—GraphBuilder.add_fan_out_nodewith full §9 compile-time validation.Harness
adapter.py—fan_out:translation; new test directivesupdate_pure,update_from_field,flaky_by_index(bothfail_count_per_idxandfail_when_idxshapes),flaky_instance_only;_TracingFanOutNodefor execution-order tracing.test_pipeline_utilities.py—fan_out_instance_middlewarethreading; cases-fixture merge of shared subgraph blocks;state_field_readandqueue_chunkcallable resolvers for count/concurrency.test_conformance.py—fan_outremoved from_UNSUPPORTED_NODE_DIRECTIVES; graph-engine fixture 017 now passes.Unit tests (
test_fan_out.py, 19 tests)items_field projection · count modes (literal + state-reading callable) · count and concurrency callables resolved exactly once at entry · inputs mapping · concurrency limit · fail_fast recoverable_state contract · collect errors_field shape · on_empty raise/noop · count_field write · extra_outputs merge · instance_middleware retry · fan-in determinism under nondeterministic timing · four compile errors (count_mode_ambiguous both/neither, field_not_list, inputs/extra_outputs undeclared field references).
Conformance impact
Test plan
uv run pytest -q— 276 passed, 0 skipped, 3 expected warnings.uv run pyright src/ tests/— 0 errors.uv run ruff check src/ tests/— clean.Notes
errors_fieldrecords use stringifiedfan_out_indexper fixture 019's choice — see comment in_fan_in_collect.compose_chain(Phase 2 finding) andfan_out.pymodule docstring; worth measuring at large N once that path is exercised in real workloads.