observability: phase 6.1 PR-C.3 — prepare_sync + fixture 010#27
Merged
Conversation
Step 1 of PR-C.3. Adds the engine-readable ``current_active_observer_span`` ContextVar in ``observability/correlation.py``, with the inverted-directionality docstring spec recommended (``observer→engine`` flow vs. PR-A's ``engine→observer`` set). Typed as ``object | None`` so the base package stays free of an OpenTelemetry import — the OTel observer writes ``Span`` instances; the engine treats the value opaquely and delegates the actual attach to a try-imported OTel helper. Also extends the ``Observer`` Protocol docstring to document the optional ``prepare_sync(event) -> None`` extension method: opt-in via ``hasattr``, no subclass or runtime_checkable Protocol required, engine calls only for ``"started"``-phase events with the same isolation contract as the async path. Engine wiring + OTel observer refactor land in subsequent steps.
Step 2 of PR-C.3. ``_dispatch`` now calls each subscribed
observer's optional ``prepare_sync(event)`` synchronously BEFORE
queueing for ``"started"``-phase events, with the same isolation
contract as the async path: ``warnings.warn`` on exception,
doesn't block queueing or subsequent events.
Phase-gated: forwarding to ``prepare_sync`` only fires when
``"started"`` is in the subscribed observer's ``phases`` set —
mirrors how ``deliver_loop`` filters async dispatch. A user who
explicitly subscribes only to ``{"completed"}`` gets neither the
sync prep nor the async started events, so the wrapper acts as a
uniform phase shield across both axes.
Hook is opt-in via ``hasattr`` — observers without
``prepare_sync`` are unaffected. OTel observer's ``prepare_sync``
method lands in step 3.
Step 3 of PR-C.3. Renames ``_handle_started`` → ``_open_started_span`` and bakes idempotency into it: a short-circuit at the top returns early if a span already exists for the event's ``_StackKey``. That covers the common case where ``prepare_sync`` opened the span synchronously in the engine task and the async ``__call__`` later re-fires for the same event — the second call becomes a true no-op rather than opening a duplicate span. Observer-attached-late and test paths that bypass ``prepare_sync`` still get the span opened via ``__call__``'s fall-through. Adds the public ``prepare_sync(event)`` method. Routing-gated (only ``"started"``-phase non-LLM events qualify), it calls ``_open_started_span`` and then publishes the just-opened span via ``_set_active_observer_span``. The engine's ``innermost`` reads the ContextVar in step 4 to attach the span into the OTel context so logs emitted from inside the node body — even on the first line, before any ``await`` — pick up the right trace_id/span_id via the OTel ``LoggingHandler``. The Token returned by ``_set_active_observer_span`` is discarded on purpose: last-writer-wins is the documented contract — the next ``prepare_sync`` call overwrites, and the task-local context dies with the invocation task.
Step 4 of PR-C.3. Adds a try-imported OTel attach helper pair in compiled.py: ``_attach_active_observer_span`` reads ``current_active_observer_span`` (set synchronously by an observer's ``prepare_sync`` before queueing) and splices the span into the OTel context via ``opentelemetry.context.attach(set_span_in_context(span))``; ``_detach_active_observer_span`` pairs the detach in ``finally``. Both ``_step_function_node``'s and ``_step_fan_out_node``'s ``innermost`` closures now attach right after ``_dispatch_started`` returns and detach in a ``finally`` around the ``await node.run(...)`` / ``await node.run_with_context(...)`` call. That puts the attach scope around exactly the user-code window — so logs emitted on the FIRST line of a node body, before any ``await``, pick up the right ``trace_id``/``span_id`` via OTel's ``LoggingHandler`` — and the detach fires before ``_dispatch_completed`` queues the completed event or the merge runs. The except branch binds the OTel names to ``None`` so pyright narrows on ``if _otel_attach is None: ...`` rather than flagging "possibly unbound." Engine stays no-OTel-dep at runtime: installs without ``[otel]`` get a no-op attach/detach, the ContextVar stays ``None``, and nothing changes. Drives the load-bearing log-correlation cases landing in steps 5 and 6.
Step 5 of PR-C.3. Promotes ``010-otel-log-correlation`` from ``_DEFERRED_FIXTURES`` to ``_SUPPORTED_FIXTURES`` and adds a hand-built driver covering both YAML sub-cases. Driver is hand-built rather than going through the conformance adapter — fixture 010's ``emits_log:`` directive isn't an adapter primitive (the adapter recognizes ``update_pure``, ``subgraph``, etc., and silently ignores anything else), and the sub-cases are small enough that hand-built python is clearer than threading a new directive through the adapter. Sub-case 1 (``log_records_carry_trace_span_correlation_ids``): two nodes ``a`` → ``b``, both emit a log on the FIRST line of their body (before any ``await`` — the load-bearing case ``prepare_sync`` exists to cover). Asserts all logs share a trace_id, each log's span_id matches the active node span at emission, and all carry the invocation's correlation_id. Sub-case 2 (``detached_subgraph_log_uses_detached_trace_id...``): outer invocation has a detached subgraph; logs across the boundary land in different traces but share the correlation_id. Outer log fires from per-node middleware on the SubgraphNode wrapper (SubgraphNode wrappers don't get ``prepare_sync`` per spec — the inner detached node handles attach for itself). Asserts trace_ids differ + correlation_id flows unchanged. Helpers ``_setup_isolated_log_bridge`` and ``_restore_log_state`` snapshot/restore root-logger handler+filter+factory state so the process-global ``install_log_bridge`` mutations don't bleed into neighboring tests. ``_enable_test_logger_at_info`` walks the fixture-010 logger up to ``INFO`` so YAML's ``level: INFO`` records actually flow through Python's logger-level filter to the bridge handler — undone on exit.
Step 6 of PR-C.3. Adds ``test_log_on_first_line_of_node_body_carries_node_span`` under ``tests/unit/test_observability_otel.py``: a focused single-node test that emits a log on the FIRST line of a node body (before any ``await``) and asserts the resulting log record carries the node span's ``trace_id`` AND ``span_id``. This is the regression target ``prepare_sync`` exists to cover. Without the synchronous engine-task observer prep: - The engine queues the started event for async dispatch. - The node body runs immediately in the engine task. - A log emitted on the first line, before any ``await``, runs before the OTel observer's ``__call__`` has fired on the worker task — so the span isn't open yet, OTel's ``get_current()`` returns an invalid span, and the log lands with ``trace_id=0`` / ``span_id=0``. With ``prepare_sync``, the observer creates the span synchronously in the engine task BEFORE queueing, publishes it via the ``current_active_observer_span`` ContextVar, and the engine attaches it to OTel context around the body. The first-line log sees the right span. Lives in unit/ (not just buried in fixture 010's driver) so a regression jumps straight to ``prepare_sync``- related code. Snapshot/restore the root logger's handlers, filters, factory, and the test logger's level so process-global ``install_log_bridge`` state doesn't bleed into other tests.
There was a problem hiding this comment.
Pull request overview
Adds an opt-in synchronous prepare_sync(event) observer hook so the engine can synchronously attach an observer-published OpenTelemetry span around node-body execution, closing the timing gap where “first-line” logs previously lacked the correct trace_id/span_id. This also promotes/implements conformance fixture 010-otel-log-correlation and adds a load-bearing unit regression test.
Changes:
- Introduces
current_active_observer_spanContextVar for observer → engine span handoff and wires engine-side OTel attach/detach around node bodies. - Adds
prepare_syncsupport in the engine’s dispatch pipeline and implements it inOTelObserverwith idempotent started-span opening. - Drives conformance fixture 010 and adds a focused unit test ensuring first-line logs carry the active node span correlation.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
src/openarmature/observability/correlation.py |
Adds current_active_observer_span ContextVar API for observer→engine span publication. |
src/openarmature/graph/observer.py |
Calls optional prepare_sync(event) synchronously for "started" events before queueing. |
src/openarmature/observability/otel/observer.py |
Implements prepare_sync and makes started-span opening idempotent via _open_started_span. |
src/openarmature/graph/compiled.py |
Tries to import OTel attach/detach and wraps node bodies (function + fan-out) with attach/detach around await node.run(...). |
tests/conformance/test_observability.py |
Promotes and drives fixture 010-otel-log-correlation (nested + detached cases) with isolated logging bridge setup/restore. |
tests/unit/test_observability_otel.py |
Adds a load-bearing unit test asserting first-line logs carry the node span’s trace_id/span_id. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
PR-C.3 review fixup. The cleared spec lifecycle reasoning at the coord thread covered only the happy path: "ContextVar gets overwritten on the next prepare_sync." If a subsequent prepare_sync raises or early-returns without publishing — for any reason — the engine reads the previous node's span and attaches it around the new node's body, producing wrong log correlation. Bound the "ContextVar is set" window to the node-body scope by clearing it to None in innermost's finally right after the OTel detach (both _step_function_node's and _step_fan_out_node's paths). Between dispatches and during merge / completed-event dispatch the ContextVar is now None, so a failing or early-returning prepare_sync can't reveal a stale span when the engine reads. Lifecycle ownership stays with the attach/detach scope rather than fanning out across observers in _dispatch. Updated current_active_observer_span's docstring to reflect the narrower lifecycle.
PR-C.3 review fixup. The opt-in-via-hasattr contract means pyright doesn't catch a user signature mismatch when a developer assumes "all observer methods are async" and defines ``async def prepare_sync(...)``. Today the call silently returns an unawaited coroutine — the prep work never runs and Python emits a delayed "coroutine was never awaited" RuntimeWarning at GC time, breaking log correlation in a way that's hard to trace back to the observer. In ``_dispatch``, after each ``prepare_sync(event)`` returns, check ``inspect.isawaitable(result)``. On hit: close the awaitable (suppresses the secondary RuntimeWarning) and emit an explicit ``warnings.warn`` naming the misconfiguration so it fails loudly at the call site. Post-call detection catches the common ``async def`` case AND the rarer lambda-returning-coroutine / ``functools.partial``-of-async cases — one check, all forms covered.
PR-C.3 review fixup. The ``except Exception: pass`` after the best-effort ``close_method()`` call tripped CodeQL's ``py/empty-except`` rule on two surfaces (code-quality + advanced security). Cleanup is intentionally best-effort — a raise here MUST NOT propagate or break sibling observers' dispatch — but swallowing silently makes the rare cleanup-failure case invisible. Replace the empty pass with ``except Exception as close_error:`` followed by a ``warnings.warn`` mentioning the cleanup-failure. Same isolation contract preserved (no propagation, no sibling-blocking) but the swallow is now observable. CodeQL ``py/empty-except`` cleared on both surfaces.
PR-C.3 review fixup.
- observer.py: rewrite the awaitable-from-prepare_sync warning.
The old text claimed "did NOT run", but a user returning an
``asyncio.Task`` / ``Future`` may have work in flight on the
loop — just not awaited at the prepare_sync call site. The
contract violation is "no guarantee the prep completes before
the node body," not "definitely doesn't run." Reworded to that
shape and included ``type(result).__name__`` so the user
can see which awaitable they returned at a glance.
- tests/conformance/test_observability.py: sub-case 1's driver
hardcoded the YAML message bodies ("node a executing" /
"node b executing") for record filtering and lookup, even
though it had already read ``emits_log.message`` from the
spec to drive the node body. That duplicated spec data and
made the test brittle to fixture wording changes. Derive a
``node_emit_messages`` map from ``nodes_spec`` up front; use
the values for both record filtering and ``by_body`` indexing.
Sub-case 2 already worked this way (uses ``outer_emit`` /
``inner_emit`` derived from spec); sub-case 1 now matches.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Phase 6.1 PR-C.3. Lands the optional
prepare_sync(event)observer hook so the engine can attach an observer-published OTel span synchronously around the node body — closing the timing gap that prevented logs emitted on the FIRST line of a node body (before anyawait) from picking up the righttrace_id/span_id. Drives fixture010-otel-log-correlationto green.The hook is opt-in via
hasattron the observer (noruntime_checkableProtocol required) and phase-gated throughSubscribedObserver("started"-only).OTelObserverextracts_open_started_spanas the synchronous core;prepare_synccalls it and publishes the just-opened span via the newcurrent_active_observer_spanContextVar inobservability/correlation.py. The engine'sinnermost(both function-node and fan-out paths) reads the ContextVar after_dispatch_startedreturns and attaches the span into OTel context via a try-imported attach pair, paired with afinally-block detach around theawait node.run(...)call.The async
__call__for"started"events becomes idempotent — short-circuits if the span is already ininv_state.open_spans— so observers attached late or test paths bypassingprepare_syncstill work.Spec coordination thread:
threads/phase-6-1-pr-c3-prepare-sync/(cleared at02-spec-pr-c3-cleared.md).Commit-by-commit
prepare-sync: add active-span ContextVar + protocol docstring—current_active_observer_span: ContextVar[object | None]incorrelation.py(typedobject | Noneso the base package stays no-OTel-dep);ObserverProtocol docstring documents the optionalprepare_sync(event)extension.prepare-sync: engine wiring + phase-gated forward in _dispatch—_dispatchcalls each subscribed observer'sprepare_syncsynchronously before queueing for"started"events; phase-gated by the wrapper'sphasesset;warnings.warnisolation matching the async path.prepare-sync: OTelObserver sync core + ContextVar publish— extract_open_started_spanfrom_handle_started; addprepare_syncthat calls it + publishes via the ContextVar; idempotency check at the top of_open_started_spanso duplicate calls become true no-ops.prepare-sync: engine OTel attach around node bodies— try-imported_attach_active_observer_span/_detach_active_observer_spanhelpers incompiled.py; both_step_function_nodeand_step_fan_out_nodeinnermostclosures attach right after_dispatch_startedreturns and detach infinallyaround theawait node.run(...)call. Engine stays no-OTel-dep at runtime: installs without[otel]get a no-op pair.prepare-sync: drive fixture 010 (log correlation)— promotes010-otel-log-correlationto_SUPPORTED_FIXTURES; hand-built driver covering both sub-cases (nested-trace + detached subgraph). Helpers snapshot/restore root-logger state so process-globalinstall_log_bridgemutations don't bleed into other tests.prepare-sync: load-bearing first-line-log unit test— focused single-node test intests/unit/test_observability_otel.pythat emits a log on the FIRST line of a node body and asserts the resulting log record carries the node span'strace_idANDspan_id. Lives in unit/ so a regression jumps straight toprepare_sync-related code.Test plan
uv run pytest -q— 401 passed, 2 skipped.uv run pyright src/ tests/— clean.uv run ruff check src/ tests/— clean._DEFERRED_FIXTURESis now empty).test_log_on_first_line_of_node_body_carries_node_spanis green.Phase 6.1 status
PR-A (#22), PR-B (#23), PR-C (#24), PR-C.1 (#25), and PR-C.2 (#26) merged. PR-C.3 closes the phase.