Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/layerlens/instrument/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ._capture_config import CaptureConfig
from ._collector import TraceCollector
from ._decorator import trace
from ._context_propagation import trace_context, get_trace_context
from .adapters._base import AdapterInfo, BaseAdapter

__all__ = [
Expand All @@ -13,6 +14,8 @@
"CaptureConfig",
"TraceCollector",
"emit",
"get_trace_context",
"span",
"trace",
"trace_context",
]
12 changes: 12 additions & 0 deletions src/layerlens/instrument/_capture_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ class CaptureConfig:
# Gates LLM message content (prompts/completions) independently of L-layers
capture_content: bool = True

def redact_payload(
self, event_type: str, payload: Dict[str, Any]
) -> Dict[str, Any]:
"""Return a copy of payload with fields removed per config."""
if not self.capture_content and event_type == "model.invoke":
payload = {
k: v
for k, v in payload.items()
if k not in ("messages", "output_message")
}
return payload

def is_layer_enabled(self, event_type: str) -> bool:
"""Check if an event type is enabled by this config.

Expand Down
44 changes: 11 additions & 33 deletions src/layerlens/instrument/_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,7 @@ def emit(
if not self._config.is_layer_enabled(event_type):
return

# Strip LLM message content when capture_content is off
if not self._config.capture_content and event_type == "model.invoke":
payload = {
k: v
for k, v in payload.items()
if k not in ("messages", "output_message")
}
payload = self._config.redact_payload(event_type, payload)

self._sequence += 1
event: Dict[str, Any] = {
Expand All @@ -66,11 +60,8 @@ def emit(
self._chain.add_event(event)
self._events.append(event)

def flush(self) -> None:
"""Build attestation and upload the trace."""
if not self._events:
return

def _build_trace_payload(self) -> Dict[str, Any]:
"""Build the attestation envelope and trace payload."""
try:
trial = self._chain.finalize()
attestation: Dict[str, Any] = {
Expand All @@ -82,34 +73,21 @@ def flush(self) -> None:
log.warning("Failed to build attestation chain", exc_info=True)
attestation = {"attestation_error": str(exc)}

payload = {
return {
"trace_id": self._trace_id,
"events": self._events,
"capture_config": self._config.to_dict(),
"attestation": attestation,
}
upload_trace(self._client, payload)

def flush(self) -> None:
"""Build attestation and upload the trace."""
if not self._events:
return
upload_trace(self._client, self._build_trace_payload())

async def async_flush(self) -> None:
"""Async version of flush."""
if not self._events:
return

try:
trial = self._chain.finalize()
attestation: Dict[str, Any] = {
"chain": self._chain.to_dict(),
"root_hash": trial.hash,
"schema_version": "1.0",
}
except Exception as exc:
log.warning("Failed to build attestation chain", exc_info=True)
attestation = {"attestation_error": str(exc)}

payload = {
"trace_id": self._trace_id,
"events": self._events,
"capture_config": self._config.to_dict(),
"attestation": attestation,
}
await async_upload_trace(self._client, payload)
await async_upload_trace(self._client, self._build_trace_payload())
37 changes: 29 additions & 8 deletions src/layerlens/instrument/_context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import Any, Optional, NamedTuple
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, NamedTuple
from contextvars import ContextVar

from ._collector import TraceCollector
Expand All @@ -11,24 +12,44 @@
_current_span_name: ContextVar[Optional[str]] = ContextVar("_current_span_name", default=None)


class _SpanTokens(NamedTuple):
@dataclass
class RunState:
"""Per-run state isolated via ContextVar.

Each concurrent run (agent invocation, crew kickoff, etc.) gets its own
RunState stored in ``_current_run``. This isolates the collector, root span,
timers, and any adapter-specific data so concurrent runs on the same adapter
instance don't clobber each other.
"""

collector: TraceCollector
root_span_id: str
timers: Dict[str, int] = field(default_factory=dict)
data: Dict[str, Any] = field(default_factory=dict)
_token: Any = field(default=None, repr=False)


_current_run: ContextVar[Optional[RunState]] = ContextVar("_current_run", default=None)


class _SpanSnapshot(NamedTuple):
span_id: Any
parent_span_id: Any
span_name: Any


def _push_span(span_id: str, name: Optional[str] = None) -> _SpanTokens:
def _push_span(span_id: str, name: Optional[str] = None) -> _SpanSnapshot:
"""Push a new span onto the context stack. The current span becomes the parent."""
old_span_id = _current_span_id.get()
return _SpanTokens(
return _SpanSnapshot(
span_id=_current_span_id.set(span_id),
parent_span_id=_parent_span_id.set(old_span_id),
span_name=_current_span_name.set(name),
)


def _pop_span(tokens: _SpanTokens) -> None:
def _pop_span(snapshot: _SpanSnapshot) -> None:
"""Restore the previous span context."""
_current_span_name.reset(tokens.span_name)
_parent_span_id.reset(tokens.parent_span_id)
_current_span_id.reset(tokens.span_id)
_current_span_name.reset(snapshot.span_name)
_parent_span_id.reset(snapshot.parent_span_id)
_current_span_id.reset(snapshot.span_id)
93 changes: 93 additions & 0 deletions src/layerlens/instrument/_context_propagation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from __future__ import annotations

import uuid
from typing import Any, Dict, Generator, Optional
from contextlib import contextmanager

from ._collector import TraceCollector
from ._capture_config import CaptureConfig
from ._context import (
_current_collector,
_current_span_id,
_parent_span_id,
_push_span,
_pop_span,
)


@contextmanager
def trace_context(
client: Any,
*,
capture_config: Optional[CaptureConfig] = None,
from_context: Optional[Dict[str, Any]] = None,
) -> Generator[TraceCollector, None, None]:
"""Establish a shared trace context for multiple adapters.

Creates a :class:`TraceCollector` and sets it as the active collector
in ``contextvars`` so that any adapter emitting events inside the
block will use the same ``trace_id`` and span hierarchy.

When *from_context* is provided (a dict from :func:`get_trace_context`),
the new collector reuses the original ``trace_id`` so events on both
sides of a boundary belong to the same trace.

The collector is flushed automatically when the context exits.

Args:
client: A :class:`~layerlens.Stratix` (or compatible) client used
for uploading the trace on flush.
capture_config: Optional capture configuration. Falls back to
:meth:`CaptureConfig.standard` if not provided.
from_context: Optional dict produced by :func:`get_trace_context`.
When supplied the collector inherits the original trace_id.

Yields:
The shared :class:`TraceCollector`.
"""
config = capture_config or CaptureConfig.standard()
collector = TraceCollector(client, config)

if from_context is not None:
collector._trace_id = from_context["trace_id"] # noqa: SLF001

root_span_id = uuid.uuid4().hex[:16]

col_token = _current_collector.set(collector)
span_snapshot = _push_span(root_span_id, "trace_context")
try:
yield collector
finally:
_pop_span(span_snapshot)
_current_collector.reset(col_token)
collector.flush()


def get_trace_context() -> Optional[Dict[str, Any]]:
"""Snapshot the current trace context as a plain dict.

Returns ``None`` when called outside a ``@trace`` / ``trace_context``
block. The returned dict is safe to serialise (JSON, headers, message
queues, etc.) and restore via ``trace_context(client, from_context=ctx)``.

Keys:

* ``trace_id`` — 16-char hex trace identifier
* ``span_id`` — current span (becomes the parent in the remote scope)
* ``parent_span_id`` — optional grandparent for reference
* ``version`` — format version for forward compatibility
"""
collector = _current_collector.get()
if collector is None:
return None

span_id = _current_span_id.get()
if span_id is None:
return None

return {
"trace_id": collector.trace_id,
"span_id": span_id,
"parent_span_id": _parent_span_id.get(),
"version": 1,
}
8 changes: 4 additions & 4 deletions src/layerlens/instrument/_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
root_span_id = uuid.uuid4().hex[:16]

col_token = _current_collector.set(collector)
span_tokens = _push_span(root_span_id, span_name)
span_snapshot = _push_span(root_span_id, span_name)
try:
collector.emit(
"agent.input",
Expand Down Expand Up @@ -58,7 +58,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
await collector.async_flush()
raise
finally:
_pop_span(span_tokens)
_pop_span(span_snapshot)
_current_collector.reset(col_token)

return async_wrapper
Expand All @@ -71,7 +71,7 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
root_span_id = uuid.uuid4().hex[:16]

col_token = _current_collector.set(collector)
span_tokens = _push_span(root_span_id, span_name)
span_snapshot = _push_span(root_span_id, span_name)
try:
collector.emit(
"agent.input",
Expand Down Expand Up @@ -100,7 +100,7 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
collector.flush()
raise
finally:
_pop_span(span_tokens)
_pop_span(span_snapshot)
_current_collector.reset(col_token)

return sync_wrapper
Expand Down
4 changes: 2 additions & 2 deletions src/layerlens/instrument/_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def span(name: str) -> Generator[str, None, None]:
Yields the span_id string.
"""
new_span_id = uuid.uuid4().hex[:16]
tokens = _push_span(new_span_id, name)
snapshot = _push_span(new_span_id, name)
try:
yield new_span_id
finally:
_pop_span(tokens)
_pop_span(snapshot)
Loading