graph: spec v0.3 observer hooks (proposal 0003)#5
Merged
Conversation
Implement spec v0.3 / proposal 0003: node-boundary observer hooks.
A `NodeEvent` is dispatched once per node execution onto a per-
invocation delivery queue that runs concurrently with the graph's
execution loop.
Public surface:
- `NodeEvent` (frozen dataclass): node_name, namespace tuple, step,
pre/post state, error category+instance, parent_states tuple.
- `Observer` Protocol: async callable receiving a NodeEvent;
parameter is positional-only so conformance isn't tied to a
parameter name.
- `CompiledGraph.attach_observer(fn) -> RemoveHandle` for graph-
attached observers; `invoke(state, observers=...)` for invocation-
scoped.
- `CompiledGraph.drain()` awaits delivery of every event dispatched
by prior invocations of this graph.
Delivery semantics per spec §6:
- Strictly serial within an invocation: no two observers process the
same event concurrently; no observer sees event N+1 until everyone
has finished N. Order is graph-attached (outermost → innermost),
then invocation-scoped, both in registration order.
- Async-from-graph: invoke() returns when the graph reaches END
regardless of queue state. Use drain() for short-lived processes.
- Observer exceptions are caught and reported via warnings.warn —
they don't break siblings, subsequent events, or the graph run.
- Subgraph-internal events bubble up: the subgraph wrapper itself
does NOT generate an event (per fixture 013); only its inner nodes
do. Step counter spans the subgraph boundary; namespace and
parent_states extend.
Bumps openarmature to 0.4.0 and the spec submodule to v0.3.1.
There was a problem hiding this comment.
Pull request overview
Implements OpenArmature graph-engine spec v0.3 observer hooks (proposal 0003) by introducing node-boundary NodeEvent delivery via a per-invocation async queue/worker, plus public APIs for observer registration and draining.
Changes:
- Add observer public API (
Observer,NodeEvent,RemoveHandle) and engine internals for dispatch + delivery (_InvocationContext, queue worker,CompiledGraph.attach_observer(), invocation-scoped observers,CompiledGraph.drain()). - Thread observer invocation context through subgraph execution so inner-node events bubble up with correct namespace/parent_state/step semantics.
- Update conformance + unit tests and bump package/spec versions to
openarmature==0.4.0/ specv0.3.1.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| uv.lock | Bumps editable openarmature version to 0.4.0. |
| pyproject.toml | Version/spec bump; pyright config tweak; ruff exclude for spec submodule. |
| src/openarmature/init.py | Updates __version__ and __spec_version__. |
| src/openarmature/graph/init.py | Re-exports new observer/event public API. |
| src/openarmature/graph/events.py | Introduces NodeEvent frozen dataclass. |
| src/openarmature/graph/observer.py | Adds Observer protocol, delivery queue/worker, invocation context, remove handle. |
| src/openarmature/graph/compiled.py | Dispatches NodeEvents during execution; adds observer registration + drain(); supports invocation-scoped observers. |
| src/openarmature/graph/subgraph.py | Threads _InvocationContext into subgraph execution for bubbling semantics. |
| tests/unit/test_observer.py | New unit tests for queue delivery ordering, error isolation, sentinel behavior, and helpers. |
| tests/conformance/adapter.py | Uses real SubgraphNode subclass for tracing and adds observer fixture helpers. |
| tests/conformance/test_conformance.py | Wires fixture-declared observers; asserts per-observer events and global delivery order; drains after invokes. |
| tests/test_smoke.py | Updates expected package/spec versions. |
| README.md | Updates stated implemented spec version to v0.3.1. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
3 tasks
CodeQL noise (10 threads): replace bare `...` Protocol/stub bodies with explicit forms. `Observer.__call__` gets a docstring + `raise NotImplementedError` (so pyright accepts the declared return type, same pattern as Protocol bodies fixed in #4); test stub observers get `pass`. Add an explanatory comment to `RemoveHandle.remove`'s `try/except ValueError: pass` documenting the idempotency intent the docstring already promises. Logic (2 threads): - `_active_workers` switched from list to set and each per- invocation worker now registers add_done_callback( self._active_workers.discard). Long-running services that never call drain() no longer accumulate completed Task references indefinitely. drain() simplifies to a one-line asyncio.gather over a snapshot of the set. - SubgraphNode branch in _invoke now wraps non-RuntimeGraphError exceptions from node.run(state, context=context) as NodeException tagged with the wrapper's name. Projection errors (project_in / project_out) and subgraph state-class init errors (e.g. Pydantic ValidationError) were previously propagating raw, bypassing the spec §4 error contract. Already- wrapped errors from inside the subgraph's _invoke pass through unchanged so the inner node's identity stays attached. Adds test_subgraph_projection_error_wrapped_as_node_exception covering the case.
chris-colinsky
added a commit
that referenced
this pull request
May 5, 2026
* ci: align ruff tooling and add CI workflow
The .pre-commit-config.yaml was pinning ruff v0.5.0 (mid-2024) while
the dev-dep ruff resolved to v0.15.11. The two versions disagree on
isort grouping (newer ruff splits pydantic and openarmature into
separate import groups; v0.5.0 merges them), which caused test files
to ping-pong between forms on every commit cycle.
Bump pre-commit ruff to v0.15.11 to match the dev dep, then run
`ruff check --fix` once to settle the disagreement on the newer
form. Eight test-file imports reordered as a result.
Add `.github/workflows/ci.yml` running:
- Checkout with submodules (the openarmature-spec submodule carries
the conformance fixtures)
- uv sync --frozen
- ruff check
- ruff format --check
- pyright src/ tests/
- pytest -q
Same checks the pre-commit hook runs locally, now also enforced
server-side so PRs from contributors who haven't installed pre-commit
don't bypass them. Concurrency group cancels in-flight runs on the
same ref.
* ci: add explicit permissions block for least-privilege
Mirrors LunarCommand/openarmature-examples#3: pin GITHUB_TOKEN to
`contents: read`. The workflow only needs to checkout code + the
spec submodule; no writes.
* ci: apply v0.15.11 ordering to files added since rebase base
Post-rebase: ruff v0.15.11 splits pydantic and openarmature into
separate import groups, where v0.5.0 (the version on main when #5
landed) merged them. The bumped pre-commit hook in this branch
applies the new ordering to test files brought in by #5.
* ci: drop explicit Python install, use .python-version
Per PR #6 review: the workflow had `uv python install 3.12` while
.python-version pins 3.14, creating a CI/dev skew where CI could
end up testing a different interpreter than developers run locally.
Drop the explicit install step. uv sync auto-provisions whatever
.python-version pins, keeping CI and dev aligned. requires-python
">=3.12" in pyproject.toml still establishes 3.12 as the floor;
testing the floor specifically is a separate concern (matrix build)
we can add later if needed.
8 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
NodeEventis dispatched once per node execution onto a per-invocation delivery queue that runs concurrently with the graph's execution loop.ObserverProtocol,NodeEventfrozen dataclass,RemoveHandle,CompiledGraph.attach_observer, invocation-scoped observers viainvoke(state, observers=[...]), andCompiledGraph.drain()for short-lived processes.openarmatureto 0.4.0 and the spec submodule to v0.3.1 (the v0.3.0 spec shipped fixture 013 with a YAML syntax bug; v0.3.1 is the corrigendum).Spec reference
Delivery semantics (per spec §6)
invoke()returns when the graph reaches END regardless of queue state.drain()is required for short-lived processes.warnings.warn— they don't break siblings, subsequent events, or the graph run.namespacechains andparent_statesextends.Test plan
uv run pyright src/ tests/zero errorsuv run ruff check . && uv run ruff formatclean (pre-commit ruff is authoritative)_InvocationContextthreading throughSubgraphNodefor nested-subgraph correctness (the spec invariantlen(parent_states) == len(namespace) - 1is enforced by construction indescend_into_subgraph)