Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/openarmature/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
current_active_observer_span,
current_attempt_index,
)
from openarmature.observability.metadata import (
_reset_invocation_metadata,
_set_invocation_metadata,
current_invocation_metadata,
validate_invocation_metadata,
)

from .edges import END, ConditionalEdge, EndSentinel, StaticEdge
from .errors import (
Expand Down Expand Up @@ -767,6 +773,7 @@ async def invoke(
*,
correlation_id: str | None = None,
resume_invocation: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> StateT:
"""Run the graph from ``initial_state`` to END and return the
final state.
Expand Down Expand Up @@ -805,8 +812,33 @@ async def invoke(
own retry logic if transient backend failures should be
reattempted.

**Caller-supplied invocation metadata (proposal 0034).**

- ``metadata`` is an optional mapping of arbitrary
``key → value`` entries the framework propagates to every
observability backend. Values MUST be OTel-attribute-
compatible scalars (``str`` / ``int`` / ``float`` / ``bool``)
or homogeneous arrays of those types. Keys MUST NOT use
the ``openarmature.*`` or ``gen_ai.*`` reserved namespaces.
Validation runs synchronously at the API boundary; rule
violations raise ``ValueError`` BEFORE any work begins.
- Per spec §5.6 the OTel observer emits each entry as an
``openarmature.user.<key>`` cross-cutting span attribute on
every span and OTel log record. The Langfuse observer
merges each entry into ``trace.metadata`` AND every
``observation.metadata`` (top level, sibling to
``correlation_id``).
- Mid-invocation augmentation via
:func:`openarmature.observability.set_invocation_metadata`
merges into the same ContextVar with the same validation
rules; affects spans emitted AFTER the call returns.

Raises one of the runtime error categories on failure.
"""
# Validate caller-supplied metadata at the API boundary so any
# rule violation surfaces synchronously before the worker task
# is created or any node body runs.
validated_metadata = validate_invocation_metadata(metadata)

invocation_scoped = tuple(_coerce_subscribed(o) for o in (observers or ()))
queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue()
Expand Down Expand Up @@ -943,6 +975,7 @@ async def invoke(
# "per-invocation is OUTERMOST invoke" wording).
correlation_token = _set_correlation_id(resolved_correlation_id)
invocation_token = _set_invocation_id(invocation_id)
metadata_token = _set_invocation_metadata(validated_metadata)
Comment thread
chris-colinsky marked this conversation as resolved.
worker = asyncio.create_task(deliver_loop(queue, context.drain_counters))
self._active_workers[worker] = context
# Auto-prune: when the worker completes (after the sentinel is
Expand Down Expand Up @@ -973,11 +1006,13 @@ async def invoke(
post_state=None,
error=None,
parent_states=(),
caller_invocation_metadata=current_invocation_metadata(),
),
)
try:
return await self._invoke(starting_state, context)
finally:
_reset_invocation_metadata(metadata_token)
_reset_invocation_id(invocation_token)
_reset_correlation_id(correlation_token)
# Sentinel terminates the worker after it processes events
Expand Down Expand Up @@ -1988,6 +2023,7 @@ def _dispatch_started(
fan_out_config=fan_out_config,
branch_name=current_branch_name(),
subgraph_identities=context.subgraph_identities,
caller_invocation_metadata=current_invocation_metadata(),
),
)

Expand Down Expand Up @@ -2022,6 +2058,7 @@ def _dispatch_completed(
fan_out_config=fan_out_config,
branch_name=current_branch_name(),
subgraph_identities=context.subgraph_identities,
caller_invocation_metadata=current_invocation_metadata(),
),
)

Expand Down Expand Up @@ -2205,5 +2242,6 @@ async def _maybe_save_checkpoint(
attempt_index=attempt_index,
fan_out_index=None,
subgraph_identities=context.subgraph_identities,
caller_invocation_metadata=current_invocation_metadata(),
),
)
26 changes: 25 additions & 1 deletion src/openarmature/graph/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@
Frozen dataclass; observers receive a snapshot, not a live handle.
"""

from dataclasses import dataclass
from collections.abc import Mapping
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Literal

from openarmature.observability.metadata import AttributeValue

from .errors import RuntimeGraphError
from .state import State

# Shared empty metadata mapping used as the default for events
# constructed without a live caller-metadata snapshot (test helpers,
# synthetic events). It is wrapped in a read-only ``MappingProxyType``
# so the single module-level instance can be handed to every such
# event, which keeps the default allocation-free.
_EMPTY_METADATA: MappingProxyType[str, AttributeValue] = MappingProxyType({})


# Spec: realizes observability §5.4 fan-out attributes via the
# event-payload mechanism added by proposal 0013 (v0.10.0). Backend
Expand Down Expand Up @@ -205,6 +214,21 @@ class NodeEvent:
# empty string when ``None`` per §5.3's "if the implementation
# tracks one" clause.
subgraph_identities: tuple[str | None, ...] = ()
# Per observability §3.4 + §5.6 (proposal 0034): snapshot of the
# caller-supplied invocation metadata at event-construction
# time. The engine reads ``current_invocation_metadata()`` when
# it constructs the event (in the engine task / node body's
# Context); the observer reads from the snapshot on the event
# rather than re-reading the ContextVar at observer time —
# critical because the observer runs on the engine's
# ``deliver_loop`` task whose Context is frozen at invoke time
# (asyncio.create_task copies the parent Context at task
# creation), so the live ContextVar value in the deliver_loop
# would NOT reflect mid-invocation augmentations made by node
# bodies running in the main engine task. Observers emit each
# entry as ``openarmature.user.<key>`` (OTel, §5.6) /
# ``metadata.<key>`` (Langfuse, §8.4.1+§8.4.2).
caller_invocation_metadata: Mapping[str, AttributeValue] = field(default_factory=lambda: _EMPTY_METADATA)


__all__ = ["FanOutEventConfig", "NodeEvent"]
2 changes: 2 additions & 0 deletions src/openarmature/llm/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
current_namespace_prefix,
)
from openarmature.observability.llm_event import LlmEventPayload
from openarmature.observability.metadata import current_invocation_metadata

# ``current_prompt_group`` / ``current_prompt_result`` are imported
# lazily inside :meth:`OpenAIProvider.complete` to avoid a module-load
Expand Down Expand Up @@ -1264,6 +1265,7 @@ def _make_llm_event(
response_id=response_id,
response_model=response_model,
genai_system=genai_system,
caller_invocation_metadata=dict(current_invocation_metadata()),
)
return NodeEvent(
node_name="openarmature.llm.complete",
Expand Down
12 changes: 12 additions & 0 deletions src/openarmature/observability/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@
# ``opentelemetry-sdk`` dependency) along.
from .llm_event import LLM_NAMESPACE, LlmEventPayload

# v0.10.0 (proposal 0034): caller-supplied invocation metadata surface.
# `set_invocation_metadata` is the public augmentation helper users
# call from inside node bodies / middleware / observers;
# `current_invocation_metadata` is the public reader observers and
# capability code consume.
from .metadata import (
current_invocation_metadata,
set_invocation_metadata,
)

__all__ = [
"LLM_NAMESPACE",
"LlmEventPayload",
Expand All @@ -50,5 +60,7 @@
"current_dispatch",
"current_fan_out_index",
"current_invocation_id",
"current_invocation_metadata",
"current_namespace_prefix",
"set_invocation_metadata",
]
34 changes: 34 additions & 0 deletions src/openarmature/observability/langfuse/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import json
import uuid
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, cast

Expand Down Expand Up @@ -80,6 +81,28 @@ def _empty_str_frozenset() -> frozenset[str]:
return frozenset()


def _apply_caller_metadata(metadata: dict[str, Any], caller_metadata: Mapping[str, Any]) -> None:
"""Merge caller-supplied invocation metadata into a Trace's or
Observation's metadata bag at top level per observability §8.4.1
+ §8.4.2 (proposal 0034).

Top-level placement is by spec: Langfuse UI filters on
``metadata.<key>`` directly, so caller-supplied entries become
siblings to ``correlation_id`` / ``entry_node`` rather than
nested under a ``user`` sub-object.

Reserved-key collision with §8.4.1 / §8.4.2 keys
(``correlation_id``, ``entry_node``, ``spec_version``,
``namespace``, etc.) is not currently checked here: the spec
permits the rejection to happen at either boundary, and the
``invoke()`` API-boundary validation already rejects
``openarmature.*`` / ``gen_ai.*`` prefixed keys. Per-Langfuse-
backend collision rejection is queued as a follow-up.
"""
for key, value in caller_metadata.items():
metadata[key] = value


def _subgraph_identity_at(event: NodeEvent, depth: int) -> str:
"""Return the compiled-subgraph identity for the wrapper at the
given 1-based namespace depth, or the empty string when no
Expand Down Expand Up @@ -366,6 +389,7 @@ def _open_trace(self, invocation_id: str, correlation_id: str | None, event: Nod
}
if correlation_id is not None:
metadata["correlation_id"] = correlation_id
_apply_caller_metadata(metadata, event.caller_invocation_metadata)
# §8.6 trace name: caller-supplied invocation label takes
# precedence; entry-node name is the spec-recommended fallback.
# The caller-supplied path lands in proposal 0034 (PR 4) — for
Expand Down Expand Up @@ -533,6 +557,7 @@ def _open_subgraph_observation(
}
if correlation_id is not None:
metadata["correlation_id"] = correlation_id
_apply_caller_metadata(metadata, event.caller_invocation_metadata)
handle = self.client.span(
trace_id=inv_state.trace_id,
name=prefix[-1],
Expand Down Expand Up @@ -566,6 +591,7 @@ def _open_fan_out_instance_dispatch_observation(
}
if correlation_id is not None:
metadata["correlation_id"] = correlation_id
_apply_caller_metadata(metadata, event.caller_invocation_metadata)
handle = self.client.span(
trace_id=inv_state.trace_id,
name=prefix[-1],
Expand Down Expand Up @@ -622,6 +648,7 @@ def _open_detached_subgraph_trace(
}
if correlation_id is not None:
link_metadata["correlation_id"] = correlation_id
_apply_caller_metadata(link_metadata, event.caller_invocation_metadata)
parent_observation_id: str | None = None
for plen in range(len(prefix) - 1, 0, -1):
outer = prefix[:plen]
Expand All @@ -646,6 +673,7 @@ def _open_detached_subgraph_trace(
detached_metadata: dict[str, Any] = {"detached_from_invocation_id": inv_state.trace_id}
if correlation_id is not None:
detached_metadata["correlation_id"] = correlation_id
_apply_caller_metadata(detached_metadata, event.caller_invocation_metadata)
identity = _subgraph_identity_at(event, len(prefix))
# The detached trace's wrapper observation IS the migrated
# SubgraphNode wrapper. Per the resolution in coord thread
Expand Down Expand Up @@ -673,6 +701,7 @@ def _open_detached_subgraph_trace(
}
if correlation_id is not None:
dispatch_metadata["correlation_id"] = correlation_id
_apply_caller_metadata(dispatch_metadata, event.caller_invocation_metadata)
handle = self.client.span(
trace_id=detached_trace_id,
name=wrapper_obs_name,
Expand Down Expand Up @@ -715,6 +744,7 @@ def _open_detached_fan_out_instance_trace(
}
if correlation_id is not None:
link_metadata["correlation_id"] = correlation_id
_apply_caller_metadata(link_metadata, event.caller_invocation_metadata)
fan_out_open.handle.update(metadata=link_metadata)
# Open the detached Trace + per-instance dispatch observation.
detached_metadata: dict[str, Any] = {
Expand All @@ -723,6 +753,7 @@ def _open_detached_fan_out_instance_trace(
}
if correlation_id is not None:
detached_metadata["correlation_id"] = correlation_id
_apply_caller_metadata(detached_metadata, event.caller_invocation_metadata)
self.client.trace(
id=detached_trace_id,
name=prefix[-1],
Expand All @@ -736,6 +767,7 @@ def _open_detached_fan_out_instance_trace(
}
if correlation_id is not None:
dispatch_metadata["correlation_id"] = correlation_id
_apply_caller_metadata(dispatch_metadata, event.caller_invocation_metadata)
handle = self.client.span(
trace_id=detached_trace_id,
name=prefix[-1],
Expand Down Expand Up @@ -866,6 +898,7 @@ def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) ->
metadata["fan_out_item_count"] = cfg.item_count
metadata["fan_out_concurrency"] = 0 if cfg.concurrency is None else cfg.concurrency
metadata["fan_out_error_policy"] = cfg.error_policy
_apply_caller_metadata(metadata, event.caller_invocation_metadata)
return metadata

# ------------------------------------------------------------------
Expand Down Expand Up @@ -1005,6 +1038,7 @@ def _llm_metadata_and_payload(
active_group = payload.active_prompt_group
if active_group is not None:
metadata["prompt_group_name"] = active_group.group_name
_apply_caller_metadata(metadata, payload.caller_invocation_metadata)

model_parameters: dict[str, Any] = {}
request_params = payload.request_params or {}
Expand Down
14 changes: 13 additions & 1 deletion src/openarmature/observability/llm_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

from typing import Any

from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, Field

# Sentinel namespace the LLM provider emits to signal "this is an LLM
# event, not a regular node event." Backend mappings (the OTel observer
Expand Down Expand Up @@ -112,6 +112,18 @@ class LlmEventPayload(BaseModel):
response_id: str | None = None
response_model: str | None = None
genai_system: str = "openai"
# Per proposal 0034 / observability §3.4 + §5.6: snapshot of
# caller-supplied invocation metadata captured at LLM-event
# dispatch time (in the calling node's Context). Backend
# observers read from the snapshot rather than re-reading the
# ContextVar at observer time — the OTel + Langfuse observers
# run on the engine's ``deliver_loop`` task whose Context is
# frozen at invoke time, so mid-invocation augmentations made
# by node bodies running in the main engine task are NOT visible
# there. The snapshot pattern mirrors the existing
# ``calling_namespace_prefix`` / ``calling_attempt_index`` /
# ``calling_fan_out_index`` fields.
caller_invocation_metadata: dict[str, Any] = Field(default_factory=dict)


__all__ = ["LLM_NAMESPACE", "LlmEventPayload"]
Loading