Implement proposal 0009: per-instance fan-out resume#68
Merged
Conversation
Bump openarmature-spec pin from v0.17.1 to v0.18.0 (proposal 0009 accepted). Bump matching version markers in pyproject.toml, __init__.py, and the smoke test. Promote CheckpointRecord.fan_out_progress from a None placeholder to a tuple[FanOutProgress, ...] populated structure per §10.11. Add FanOutProgress (one entry per in-flight fan-out node, carrying fan_out_node_name, namespace, instance_count, and an indexed tuple of FanOutInstanceProgress) and FanOutInstanceProgress (per- instance state plus result for completed instances or completed_inner_positions for in-flight instances). Default is empty tuple. This is the field-shape change only. Engine save granularity, resume logic, composition rules, and the configurable batching knob land in follow-up commits. Backends round-trip the new field generically (in-memory: by reference; sqlite: needs an explicit serialization update which lands with the engine save changes).
Adds _FanOutInstanceState and _FanOutExecutionState mutable dataclasses in observer.py for tracking per-instance progress inside fan-outs. Adds fan_out_progress_state field to _InvocationContext, keyed by (namespace, fan_out_node_name) to disambiguate same-named fan-outs in different subgraph descents. Threads the dict through descend_into_subgraph, descend_into_fan_out_instance, and descend_into_parallel_branch so descendant contexts share the same dict by reference. An inner-instance node can update its own entry; saves anywhere in the invocation see the consistent sibling state. This is the data plumbing only. The engine save granularity change, FanOutNode tracking updates, and resume logic land in follow-up commits.
Sqlite backend gains a fan_out_progress_blob column (added via ALTER TABLE for backward compat) plus to/from-dict helpers for the proposal 0009 per-fan-out-node progress entries. The dedicated column approach was preferred over JSON-blob expansion: the existing record blobs each hold one logical field and adding a new column keeps the field-to-blob mapping obvious. InMemoryCheckpointer gains a FanOutInternalSaveBatching config (default: disabled) implementing the section 10.11.4 contract. Fan-out instance internal saves route through a new save_fan_out_internal method that buffers under batching; outer-graph, subgraph-internal, and fan-out node completion saves continue through save() and remain synchronous. A separate save_fan_out_in_flight_failure method buffers without triggering the flush count - by design, the "crash" the failure represents would lose the buffer. The backends package re-exports FanOutInternalSaveBatching so callers can construct a batching-enabled checkpointer via openarmature.checkpoint.
Implement the spec proposal 0009 contract end to end in the engine. Save granularity (section 10.3 as revised by 0009): the fan-out gate in _maybe_save_checkpoint is removed; fan-out instance internal nodes now fire saves like every other completed event. descend_into_fan_out_instance propagates the checkpointer reference so inner-node saves reach the backend. Inner-instance NodePositions land on the per-instance state's completed_inner_positions list rather than the outer completed_positions list - matches the section 10.11 scoping text where the inner positions are "scoped to this fan-out instance's inner subgraph execution rather than the outer graph." Per-instance accumulator (section 10.11): FanOutNode.run_with_context gains a _FanOutExecutionState entry on the shared context.fan_out_progress_state dict before dispatching instances. Each instance transitions not_started -> in_flight -> completed as it progresses; the in_flight transition fires before any inner work so a sibling- triggered save observes the correct state. On successful instance completion the contribution is recorded into the accumulator (tracked.result + tracked.extra_outputs) BEFORE the explicit "instance completed" save fires - the section 10.11 atomicity contract that the append reducer's no-double-merge guarantee depends on. A crash between the accumulator mutation and the save leaves the saved record showing in_flight; resume re-runs the instance. Fail_fast cancellation (section 9.5 + 10.11.2): asyncio.gather does NOT cancel siblings on exception by default; the dispatcher now uses asyncio.wait(FIRST_EXCEPTION) plus an explicit cancel_signal Event so sibling tasks waiting on the bounded-concurrency semaphore exit early as CancelledError. Without this, siblings could race past the cancellation window and mutate their tracked state to completed - violating the section 10.11.2 invariant that failed-instance state is in_flight and cancelled siblings are in_flight or not_started. Collect mode (section 10.11.2): per-instance failures promote inline inside run_instance (mutate tracked state + save + raise) so the completion save fires before any sibling dispatches. The post-gather loop in the prior atomic-restart impl is removed - its job is now done inline. result_is_error distinguishes success contributions from collect-mode error contributions; on resume the heuristic _looks_like_error_record reconstructs the flag from the public FanOutInstanceProgress.result shape. Resume (section 10.4 + 10.7): invoke() now restores the per-fan- out tracking dict from the loaded record's fan_out_progress field via _restore_fan_out_progress_state. FanOutNode.run_with_context consults the restored entry: completed instances skip with a rolled-forward partial; in_flight and not_started instances dispatch normally. The inner subgraph re-enters at its declared entry node, not at any completed_inner_positions (per section 10.7 - those positions are observational, not state- restore points). In_flight observability (section 10.11): when an instance fails before any sibling-triggered save fires (the serial + 1-node- subgraph case from fixture 048), an explicit _save_instance_in_flight closes the gap. Routed through save_fan_out_in_flight_failure so under batching the save buffers without triggering the flush count. Batching (section 10.11.4): _save_fan_out_internal and _save_fan_out_in_flight_failure are the engine-side routers to the checkpointer's batching-aware save methods. Backends without the hook fall back to plain save() so non-batching backends are unaffected.
Extend the typed-harness models so fixtures 048-054 parse:
- FanOutSpec gains concurrent_mode ("serial" | "concurrent") +
abort_after_instance. concurrent_mode: serial is sugar for
concurrency=1; abort_after_instance is a harness directive
(engine doesn't see it) that the test driver interprets.
- FlakyPerIndexSpec gains always_fail_indices alongside
fail_first_run_indices - 052 needs the always-fail flavor for
collect-mode error contributions.
- A model_validator on FanOutSpec normalizes the key-as-tag YAML
shape ({retry: {...}}) that fixture 053 uses into the explicit
{type: retry, ...} shape the discriminated-union models expect.
- CaseSpec's checkpointer field widens from str to str | dict so
fixture 054's {kind: in_memory_batched, ...} config parses.
Adapter extensions:
- _make_flaky_per_index_fn implements the two flaky-per-index
shapes against current_fan_out_index(); takes an optional
instance_attempt_recorder so the test driver can assert
per-instance retry-budget reset on resume.
- list<error_entry> resolves to list[dict[str, str]] - the engine's
ship shape for per-instance error records under collect.
- _add_fan_out_node translates concurrent_mode: serial to
concurrency=1.
- _make_pure_update_fn now interprets string values as field
references when the string matches a declared state field. This
handles fixture 050's update_pure: {stage1: input} which means
"copy state.input into stage1" rather than literal "input".
Conformance runner (tests/conformance/test_checkpoint.py): Fixture range extended to include 048-054 alongside the existing 024-031 (minus 028 which was REMOVED in spec v0.18.0). 027 stays deferred until the resume-aware flaky_resume_aware test seam lands; all others run end to end. The capturing checkpointer wrapper grows save_fan_out_internal + save_fan_out_in_flight_failure mirrors for the proposal 0009 engine seams. A FanOutInternalSaveBatching config from the fixture's checkpointer dict gets wired through to the InMemoryCheckpointer constructor so fixture 054's "buffered saves lost on crash" path tests against the real batching shape. The abort_after_instance harness directive (fixture 052) hooks the wrapper's save flow: after the named instance transitions to completed on the saved record, the wrapper raises _AbortAfterInstance to simulate a crash at that exact point. Subsequent post-abort save calls also raise (modelled as the crash propagating through the engine's save path) so siblings dispatched after the abort don't pollute the loaded record. New matchers for the proposal 0009 fixture vocabulary: - saved_record_assertions.fan_out_progress with state, result, state_one_of, completed_inner_positions, result_kind: error. - saved_record_assertions reads from load() rather than the in-memory saves list so the batching backend's buffered-not-flushed contract holds. - instances_executed_during_resume / instances_skipped_during_resume sourced from the flaky_per_index attempt recorder. - Resume-side invariants: no_duplicate_results, results_list_length, errors_list_length, no_duplicate_error_entries, instance_N_attempt_index_on_resume, instance_N_resume_attempt_count. Subgraph build now threads the flaky_per_index attempt recorder through so the per-instance flaky bodies inside inner subgraphs populate the same shared map the resume assertions consult. Unit tests (tests/unit/test_checkpoint.py): The atomic-restart test test_fan_out_internal_saves_are_gated_off is replaced by test_fan_out_internal_saves_fire_per_instance - the spec changed under proposal 0009 and the old assertion (one save total for an N-instance fan-out, every position with fan_out_index=None) no longer holds. The new test verifies that inner-node saves fire and surface inner positions on the per- instance completed_inner_positions field. Closes Q4 from the spec impl-plan review: a focused unit test on the fail_fast fast-cancel path that asserts fan_out_progress for the failed instance is in_flight (no result) and cancelled siblings are in_flight or not_started after cancellation completes.
There was a problem hiding this comment.
Pull request overview
Implements spec proposal 0009 (per-instance fan-out resume) by persisting per-fan-out/per-instance progress in checkpoint records, updating engine save/resume behavior (including fail_fast cancellation semantics and optional fan-out-internal save batching), and extending the conformance harness + backends for spec v0.18.0.
Changes:
- Promote
CheckpointRecord.fan_out_progressto a populated, frozen per-fan-out/per-instance progress structure and restore it on resume. - Update fan-out execution to emit per-instance internal saves, explicit “instance completed” saves, and improved fail_fast cancellation behavior.
- Add backend support (InMemory batching config; SQLite schema + serialization) and expand conformance coverage for fixtures 048–054; bump spec pin to v0.18.0.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/test_checkpoint.py | Updates unit expectations and adds tests for per-instance internal saves + fail_fast in-flight state. |
| tests/test_smoke.py | Updates expected __spec_version__ to 0.18.0. |
| tests/conformance/test_checkpoint.py | Extends conformance driver for proposal-0009 fixtures and new directives (fan_out_progress, batching, abort_after_instance). |
| tests/conformance/harness/fixtures.py | Allows structured checkpointer config for batching backends. |
| tests/conformance/harness/directives.py | Adds/normalizes proposal-0009 fixture directives (flaky_per_index shapes, concurrent_mode, abort_after_instance). |
| tests/conformance/adapter.py | Adds flaky_per_index directive support, update_pure field-reference handling, and fan-out concurrency-mode translation. |
| src/openarmature/graph/observer.py | Introduces mutable per-fan-out/per-instance execution state threaded through invocation context. |
| src/openarmature/graph/fan_out.py | Implements per-instance resume behavior, explicit per-instance saves, and updated fail_fast cancellation. |
| src/openarmature/graph/compiled.py | Projects/restores fan_out_progress in checkpoints; routes fan-out-internal saves via optional checkpointer hooks. |
| src/openarmature/checkpoint/protocol.py | Defines FanOutProgress/FanOutInstanceProgress and updates CheckpointRecord fan_out_progress default/shape. |
| src/openarmature/checkpoint/backends/sqlite.py | Adds fan_out_progress_blob column (ALTER TABLE) and serialization/deserialization for fan_out_progress. |
| src/openarmature/checkpoint/backends/memory.py | Adds optional fan-out internal save batching + related save hooks. |
| src/openarmature/checkpoint/backends/init.py | Exports FanOutInternalSaveBatching. |
| src/openarmature/checkpoint/init.py | Re-exports new progress types + batching config. |
| src/openarmature/init.py | Bumps __spec_version__ to 0.18.0. |
| pyproject.toml | Bumps [tool.openarmature].spec_version to 0.18.0. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Six CoPilot / CodeQL / code-quality findings, all addressed:
- Drop unused params and del statements in _save_instance_completed
and _save_instance_in_flight (fan_out.py). The "reserved for future
event correlation" framing was YAGNI; reintroduce if/when needed.
- Remove unused _failing_instance_counter global from
test_fail_fast_cancellation. The failure trigger is the s.item == 999
sentinel; the counter was a stale draft remnant.
- Convert cast("Iterable[Any]", ...) to cast(Iterable[Any], ...) at
the two call sites in test_checkpoint.py so the Iterable import is
an unambiguous usage rather than a string-form forward reference.
- Pop fan_out_progress_state entries in _step_fan_out_node's
try/finally after the fan-out's own completion save fires. Per spec
10.11 the entry is "in-flight only"; proposal 0009's save text says
the fan-out node's own completion save "also finalizes
fan_out_progress to mark all instances complete," so the pop has to
happen AFTER that save projects the final state. The finally also
covers raise paths so a retry middleware on the fan-out node gets a
fresh tracked state on the second attempt.
- Switch InMemoryCheckpointer's _fan_out_buffer_count from a global
int to a per-invocation dict (_fan_out_buffer_counts). Concurrent
invocations sharing the same checkpointer no longer interfere with
each other's flush thresholds; a buffered invocation's saves can
reach their own count and flush without another invocation's
flush zeroing the counter prematurely.
Bring the user-facing docs in sync with the engine changes: - concepts/checkpointing.md: reverse the atomic-restart language to describe saves firing at fan-out instance internal events and the per-instance resume contract; update the CheckpointRecord shape to show fan_out_progress as tuple[FanOutProgress, ...] (default empty tuple) instead of the None placeholder; update the field framing to describe the populated shape and cross-link to fan-out's resume section. - concepts/fan-out.md: rewrite the Resume semantics section. New text covers the three-state per-instance machine (completed, in_flight, not_started), the append reducer no-double-merge guarantee, error_policy composition (fail_fast vs collect), and the optional fan-out-internal save batching knob with the InMemoryCheckpointer example. - CHANGELOG.md: add entries under Unreleased for the per-instance resume contract, the new public types (FanOutProgress, FanOutInstanceProgress, FanOutInternalSaveBatching), the CheckpointRecord field promotion, and the v0.18.0 pin bump. - examples/05-fan-out-with-retry.md: add a "Composing with checkpointing" section noting per-instance resume + attempt_index reset on resume, with cross-refs. The example itself doesn't use a Checkpointer; just documenting the composition shape. A new example demonstrating per-instance fan-out resume end-to-end and a verify-reference-docs-render pass are deferred to a follow-up PR (out of scope for this one).
Two CoPilot findings, both addressed:
- tests/conformance/test_checkpoint.py: fix the unreachable invariant
branch for fixture 050's instance_N_executes_step_X_on_resume
matcher. The middle clause was endswith("_executes_step_") AND
endswith("_on_resume") which can never both be true. Switch the
middle to substring containment so the branch matches the actual
pattern.
- src/openarmature/graph/fan_out.py: switch asyncio.ensure_future
to asyncio.create_task for consistency with the rest of the
codebase (compiled.py, parallel_branches.py). Semantically
equivalent for spawning a coroutine as a task; create_task is the
more explicit form when wrapping arbitrary futures isn't needed.
Bump openarmature-spec pin from v0.18.0 to v0.18.1 to pick up the fixture 052 results-literal patch shipped on the release/v0.18.1 maintenance branch. Bump matching version markers in pyproject.toml, __init__.py, and the smoke test. Fix a harness gap surfaced by the patched fixture 052: the fixture's expected.final_state block carries magic-key length assertions (errors_list_length, results_list_length) as siblings of literal field assertions. The harness's _assert_state_matches treated every key as a literal field equality check, so the magic keys raised "None != 1" against the missing state field. Route the magic keys to list-length checks against the corresponding state field (errors / results); literal keys take the unchanged equality path. Result: all 7 of the proposal-0009 new conformance fixtures (048-054) pass against the engine. Pyright + ruff clean.
Per spec §10.2 the CheckpointRecord.schema_version is the outermost graph state's version: the record represents the whole invocation tree. The fan_out.py helpers _save_instance_completed and _save_instance_in_flight were reading from type(parent_state).schema_version. For an outermost fan-out parent_state IS the outermost state, so the read aligned with what _maybe_save_checkpoint writes via self.state_cls.schema_version. For a nested fan-out (a fan-out inside a subgraph), parent_state is the subgraph's state and the read would record the subgraph's declared schema_version on the save record instead of the outermost's. Switch both helpers to read from context.parent_states_prefix[0] when the prefix is non-empty (the outermost state lives at index 0 of the parent chain), else from parent_state directly. The latter branch covers outermost fan-outs where parent_state IS the outermost state. Add a focused unit test using a three-level setup (outer state with schema_version "outer-v1" wraps a subgraph with state schema_version "middle-v1" wrapping a fan-out using a plain inner state) and asserts that the saves fired by _save_instance_completed carry "outer-v1" rather than "middle-v1". Filter is precise to avoid catching inner-instance node saves (length-2 parent_states + empty completed_positions go through _maybe_save_checkpoint with the inner graph's state_cls) and the fan-out node's own completion save (which appends the fan-out position to completed_positions). No fixture exercises this combination; closes the spec impl-review observation (a) raised in 05-spec-fixture-052-and-pr-review.md.
Four CoPilot findings fixed; one deferred for spec direction (surfaced in coord thread 07-python-schema-version-follow-on). Fixed in this commit: - fan_out.py cancel_signal race after semaphore acquisition. Lifted cancel_signal out of the fail_fast branch (defined before run_instance so the closure sees it) and added a check inside run_instance after the resume rollforward branch but before the first tracked-state mutation. A task that acquired the semaphore after a sibling failed now exits as CancelledError without flipping tracked.state to in_flight, preserving the not_started contract from spec section 10.11.2. Collect mode leaves the signal unset so the new check is a no-op there. - fan_out.py un-retrieved task exceptions. The fail_fast loop that selects failed_cause used to break after the first match, leaving other failed tasks' exceptions unretrieved and triggering "Task exception was never retrieved" warnings on GC. Loop now iterates all completed-not-cancelled tasks and calls t.exception() on each; failed_cause still captures only the first failure for the NodeException's cause. - tests/conformance/adapter.py fail_first_run_indices global flag. The flaky_per_index harness used a single has_failed_once flag that fired only on the first index encountered. Switched to a per-index set so each index named in fail_first_run_indices fails on its first call independently of the others. Current fixtures use single-index configurations so the bug was latent. - CHANGELOG.md two pin-bump bullets contradicted each other. Consolidated into a single bullet covering the cumulative cycle (v0.17.0 to v0.18.1) and naming all three absorbed spec versions inline. Deferred (surfaced in coord 07 for spec direction): - fan_out.py schema_version source asymmetry with _maybe_save_checkpoint's self.state_cls pattern. The current shape matches spec impl-review (a)'s explicit code suggestion; the broader engine-wide consistency question (type(state) vs state_cls everywhere) needs spec to pick the canonical pattern.
Three CoPilot findings. One already addressed by an earlier consolidation; two fixed here. - CHANGELOG.md Added bullet: clarified the spec-version reference. Was "(proposal 0009 / spec v0.18.0)" which parsed as if v0.18.0 is the current pin. Now "(proposal 0009, accepted in spec v0.18.0)" to make the wording unambiguous: v0.18.0 is the spec version where proposal 0009 was accepted; v0.18.1 is the current pin (a fixture-only patch on release/v0.18.1). The Notes bullet documents the cumulative pin state. - tests/unit/test_checkpoint.py fan-out save count assertion tightened from >= 3 to >= 7. The test sets up 3 instances with a 1-node inner subgraph, so the deterministic save breakdown is 3 inner-node saves + 3 explicit _save_instance_completed saves + 1 fan-out node completion save = 7. The earlier >= 3 assertion would pass even if 4 of 7 saves silently dropped. Kept >= rather than == so future engine internals can add saves without breaking the test, but the new lower bound is tight enough to catch regressions. Already addressed (one comment): the CHANGELOG Notes bullet was consolidated in commit b71a6ce to read "v0.17.0 to v0.18.1" cumulatively with the three absorbed spec versions named inline. CoPilot's earlier-diff comment about the pin showing v0.18.0 is no longer applicable to the current text.
Three CoPilot findings. One code fix; two were stale-PR-description
concerns addressed by editing the PR body directly via gh pr edit.
Code fix:
- tests/conformance/test_checkpoint.py: explicitly exclude 028 from
_CHECKPOINT_FIXTURE_NUMBERS rather than relying on the test
runner's file-glob to filter the missing-fixture out at discovery
time. The prior shape included 28 in the set via range(24, 32)
but the comment claimed it was "naturally excluded"; the set and
the comment disagreed. Build the proposal-0008 range as
set(range(24, 32)) - {28} so the set matches the on-disk fixture
set.
PR description edits (not commits; via gh pr edit):
- Pin reference updated from v0.18.0 to v0.18.1; v0.18.0 framed as
the introducing spec version for proposal 0009 and v0.18.1 framed
as the maintenance tag carrying the fixture-052 patch.
- Test-status section updated from "6 of 7 fixtures pass; 052
fails on a spec-side typo" to "all 7 fixtures pass" with the
pin-v0.18.1 attribution.
- Total test counts updated (750 passed, 73 skipped; pyright 0
errors).
- Scope section expanded to reflect the nested-fan-out
schema_version fix, the InMemoryCheckpointer per-invocation
flush accounting fix, the documentation updates, and the
explicit fixture-028 exclusion.
- Known limitations section gains the schema_version asymmetry
follow-on (deferred to spec per coord thread 07).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements spec proposal 0009 (per-instance fan-out resume) in
openarmature-python. Bumps the spec submodule pin from v0.17.1 to v0.18.1 (v0.18.0 introduces the proposal; v0.18.1 is a maintenance tag onrelease/v0.18.1carrying a fixture-only fix to 052) and lands the engine, harness, and test changes required by the new contract.The shipping atomic-restart fan-out resume from proposal 0008 (which replayed the whole fan-out on resume after a mid-fan-out crash) is superseded by per-instance resume:
completedinstances skip and roll forward their accumulator contribution;in_flightinstances re-run from the inner subgraph entry;not_startedinstances dispatch normally.Scope
CheckpointRecord:fan_out_progressfield promoted fromNoneplaceholder to a populatedtuple[FanOutProgress, ...]of frozen entries with per-instance state, result, andcompleted_inner_positions._maybe_save_checkpointnow fires saves inside fan-out instances and projects the sharedfan_out_progress_statedict into the record's frozen progress field. Inner-instance positions accumulate on per-instance state rather than the outercompleted_positionslist. Resume restores the saved progress into the engine context soFanOutNodeconsults it before re-dispatching.FanOutNode.run_with_context: tracks per-instance state through thenot_started -> in_flight -> completedtransition, fires explicit "instance completed" saves after accumulator mutation (atomicity ordering per §10.11), supports rollforward of both success and collect-mode error contributions on resume, and uses an explicitcancel_signal: asyncio.Event+asyncio.wait(FIRST_EXCEPTION)for fail_fast cancellation (asyncio.gather does not strongly cancel siblings on bounded-concurrency semaphore wait).save_fan_out_internal/save_fan_out_in_flight_failurehooks for backends opting into §10.11.4 batching. Backends without the hooks fall back to the standardsavefor the same correctness.InMemoryCheckpointer: optionalfan_out_internal_save_batching: FanOutInternalSaveBatching(flush_every=N)config. Default off (every save flushes synchronously). Per-invocation flush accounting so concurrent invocations sharing one checkpointer don't interfere with each other's thresholds.SQLiteCheckpointer: newfan_out_progress_blobcolumn added viaALTER TABLEfor backward compatibility with pre-0009 databases. Round-trips through both pickle and JSON serialization modes.fan_out_progressmatchers,state_one_of(execution-mode-dependent state assertions),result_kind: error,completed_inner_positions,instances_executed_during_resume/instances_skipped_during_resume,instance_N_attempt_index_on_resume,abort_after_instancefan-out abort,in_memory_batchedCheckpointer kind, andflaky_per_indexwithalways_fail_indices/fail_first_run_indicesvariants (per-index tracking).tests/conformance/test_checkpoint.pyfixture range expanded to cover 048-054. Fixture 028 (atomic-restart) was REMOVED in spec v0.18.0 and is explicitly excluded from_CHECKPOINT_FIXTURE_NUMBERS._save_instance_completed/_save_instance_in_flightnow read schema_version fromcontext.parent_states_prefix[0]when non-empty (the outermost state) rather thantype(parent_state). New unit testtest_nested_fan_out_records_outermost_schema_versionexercises a 3-level wrapping setup.docs/concepts/checkpointing.mdreverses the atomic-restart language to describe per-instance saves + resume; updatesCheckpointRecordshape andfan_out_progressfield framing.docs/concepts/fan-out.mdrewrites the Resume semantics section with the three-state machine, reducer composition note, error_policy composition, and the in-memory batching example.docs/examples/05-fan-out-with-retry.mdadds a "Composing with checkpointing" section.CHANGELOG.mddocuments the per-instance contract, public types, and cumulative pin bump (v0.17.0 to v0.18.1).Test status
All 7 new conformance fixtures (048-054) pass:
resultsliteral)Existing fixture suite (024-031 minus removed-028, plus the prior graph-engine + observability suites) continues to pass. pyright clean (0 errors). ruff clean.
Design decisions worth flagging
CheckpointRecordfield; the new column preserves that mapping. ALTER TABLE for backward compat.in_flight(accumulator mutation happens AFTER_invokereturns and the per-instance partial is extracted). An explicit engine-internal save closes the atomicity gap. Spec §10.3 says "one save per inner-node completion" but doesn't forbid engine-internal extra saves beyond that minimum.asyncio.gatherdoes NOT cancel siblings strongly under bounded concurrency. The prior fan_out.py had a stale comment claiming it does. Replaced withasyncio.wait(FIRST_EXCEPTION)+ explicitcancel_signal: asyncio.Eventchecked at both the outer wrapper AND insiderun_instanceafter semaphore acquisition (the in-instance check covers the race window where a sem-blocked task would otherwise fliptracked.statetoin_flightafter a sibling failed). Mirrors the parallel-branches dispatcher.Known limitations (acknowledged in code; deferred to spec follow-ons)
extra_outputsper-instance values are not preserved across resume. Spec modelsresultas a single accumulator entry; preserving extras would need a spec change. No fixture exercises this._save_instance_completed,_save_instance_in_flight) do not dispatch acheckpoint_savedobserver event. The inner node's intrinsic save already emits one for the same per-instance state; duplicating would double-count for OTel span emission. Per §10.8 SHOULD.schema_versionsource asymmetry between_save_instance_completed'stype(state)-based read (per spec impl-review (a)'s explicit code) and_maybe_save_checkpoint'sself.state_cls-based read. The two disagree only when a user passes a State subclass instance with a differentschema_versionthan the declared graph class. Surfaced in coord threadproposal-0009-per-instance-fan-out-resume/07-python-schema-version-follow-on.mdfor spec to land a canonical pattern.Test plan
uv run pytest tests/ -q(750 passed, 73 skipped, 0 failed)uv run pyright src/ tests/(0 errors, 0 warnings, 0 informations)uv run ruff check .(clean)uv run mkdocs build --strict(clean)