graph: spec v0.6 observer pair model (proposal 0005)#9
Merged
Conversation
Replace single-event-per-attempt observer hooks with started/completed pairs. Each node attempt now produces two NodeEvents sharing a step: the started event fires before the wrapped node runs, the completed event fires after the merge succeeds (with post_state) or after the node, reducer, or state validation fails (with error). Adds per-observer phase subscription via a new public SubscribedObserver dataclass. attach_observer accepts an optional phases= kwarg; invoke accepts Observer | SubscribedObserver in its observers list. Empty phase sets raise ValueError at registration time. The delivery worker filters by event.phase against each observer's subscribed phases. NodeEvent gains attempt_index (default 0 until retry middleware lands) and fan_out_index (default None until fan-out runtime lands). Conformance fixtures 012-016 and 018 now pass; 017 still skips pending fan-out runtime (proposal 0005 pipeline-utilities side).
There was a problem hiding this comment.
Pull request overview
Retrofits the graph engine’s observer subsystem to spec v0.6.0’s started/completed pair model, adding phase-based subscriptions and extending NodeEvent to support upcoming retry/fan-out features.
Changes:
- Emit two
NodeEvents per node attempt (phase="started"andphase="completed") sharing the samestep. - Add
SubscribedObserver+ phase subscription plumbing (including queue-side filtering) and wire it throughattach_observer(...)/invoke(observers=...). - Update unit + conformance harnesses to assert phase-aware observer delivery and new event fields (
phase,attempt_index,fan_out_index).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
src/openarmature/graph/observer.py |
Introduces SubscribedObserver, phase coercion, and delivery-time phase filtering. |
src/openarmature/graph/compiled.py |
Emits started/completed event pairs and updates observer registration/invocation types. |
src/openarmature/graph/events.py |
Extends NodeEvent with phase, attempt_index, and fan_out_index. |
src/openarmature/graph/__init__.py |
Re-exports SubscribedObserver in the public API. |
tests/unit/test_observer.py |
Updates queue/worker tests for subscribed observers and adds phase-filtering coverage. |
tests/conformance/test_conformance.py |
Updates conformance harness to support phase subscriptions and pair-model assertions. |
tests/conformance/adapter.py |
Records phase/attempt_index/fan_out_index and updates delivery-order shape. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Drain blocks until every queued event has been delivered, so a slow or hung observer can hold the calling process indefinitely. Note this in the drain() docstring with the asyncio.wait_for stop-gap pattern. A spec proposal for a first-class deadline parameter is in flight; this docstring nudge holds until it lands. Also bump the docstring's spec version reference from v0.3.0 to v0.6.0 to match the rest of the file.
Lead with the structural-Protocol idea so users see they don't need to subclass — any async callable with a matching signature works. Includes a short usage example. Simplify __call__ body to `...` (the PEP 544 idiomatic form for Protocol methods) since the body is never executed.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Switching the truthiness check to `is not None` so an explicit `phases: []` in a fixture produces a `frozenset()` rather than collapsing to `None`. The engine's empty-set check then fires the intended ValueError instead of silently defaulting to ALL_PHASES. No current fixture uses `phases: []` for a regular observer, but the harness should faithfully translate fixture intent so a future fixture with that input doesn't silently mask a registration bug.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
NodeEvents sharing astep.startedfires before the wrapped node runs;completedfires after the merge (or after a node/reducer/state-validation failure, before the error propagates).SubscribedObserverdataclass pairs anObserverwith its phase subscription.attach_observer(observer, *, phases=...)andinvoke(observers=[Observer | SubscribedObserver])both accept the explicit form. Empty phase sets raiseValueErrorat registration.NodeEventgainsattempt_index(default 0 until retry middleware lands) andfan_out_index(default None until fan-out runtime lands).Out of scope
attempt_index > 0requires retry middleware (proposal 0004, future phase)._unsupported_directive.Conformance impact
fan_outnode directive).Test plan
uv run pytest -q— 213 passed (+9 from main), 1 skipped (017).uv run pyright src/ tests/— 0 errors.uv run ruff check src/ tests/— clean.uv run pytest tests/conformance/test_fixture_parsing.py -W error::UserWarning— 137 passed, no warnings (Phase 0 fix still holds).