Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/layerlens/instrument/_capture_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ class CaptureConfig:
# Gates LLM message content (prompts/completions) independently of L-layers
capture_content: bool = True

def redact_payload(
self, event_type: str, payload: Dict[str, Any]
) -> Dict[str, Any]:
"""Return a copy of payload with fields removed per config."""
if not self.capture_content and event_type == "model.invoke":
payload = {
k: v
for k, v in payload.items()
if k not in ("messages", "output_message")
}
return payload

def is_layer_enabled(self, event_type: str) -> bool:
"""Check if an event type is enabled by this config.

Expand Down
44 changes: 11 additions & 33 deletions src/layerlens/instrument/_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,7 @@ def emit(
if not self._config.is_layer_enabled(event_type):
return

# Strip LLM message content when capture_content is off
if not self._config.capture_content and event_type == "model.invoke":
payload = {
k: v
for k, v in payload.items()
if k not in ("messages", "output_message")
}
payload = self._config.redact_payload(event_type, payload)

self._sequence += 1
event: Dict[str, Any] = {
Expand All @@ -66,11 +60,8 @@ def emit(
self._chain.add_event(event)
self._events.append(event)

def flush(self) -> None:
"""Build attestation and upload the trace."""
if not self._events:
return

def _build_trace_payload(self) -> Dict[str, Any]:
"""Build the attestation envelope and trace payload."""
try:
trial = self._chain.finalize()
attestation: Dict[str, Any] = {
Expand All @@ -82,34 +73,21 @@ def flush(self) -> None:
log.warning("Failed to build attestation chain", exc_info=True)
attestation = {"attestation_error": str(exc)}

payload = {
return {
"trace_id": self._trace_id,
"events": self._events,
"capture_config": self._config.to_dict(),
"attestation": attestation,
}
upload_trace(self._client, payload)

def flush(self) -> None:
"""Build attestation and upload the trace."""
if not self._events:
return
upload_trace(self._client, self._build_trace_payload())

async def async_flush(self) -> None:
"""Async version of flush."""
if not self._events:
return

try:
trial = self._chain.finalize()
attestation: Dict[str, Any] = {
"chain": self._chain.to_dict(),
"root_hash": trial.hash,
"schema_version": "1.0",
}
except Exception as exc:
log.warning("Failed to build attestation chain", exc_info=True)
attestation = {"attestation_error": str(exc)}

payload = {
"trace_id": self._trace_id,
"events": self._events,
"capture_config": self._config.to_dict(),
"attestation": attestation,
}
await async_upload_trace(self._client, payload)
await async_upload_trace(self._client, self._build_trace_payload())
14 changes: 7 additions & 7 deletions src/layerlens/instrument/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@
_current_span_name: ContextVar[Optional[str]] = ContextVar("_current_span_name", default=None)


class _SpanTokens(NamedTuple):
class _SpanSnapshot(NamedTuple):
span_id: Any
parent_span_id: Any
span_name: Any


def _push_span(span_id: str, name: Optional[str] = None) -> _SpanTokens:
def _push_span(span_id: str, name: Optional[str] = None) -> _SpanSnapshot:
"""Push a new span onto the context stack. The current span becomes the parent."""
old_span_id = _current_span_id.get()
return _SpanTokens(
return _SpanSnapshot(
span_id=_current_span_id.set(span_id),
parent_span_id=_parent_span_id.set(old_span_id),
span_name=_current_span_name.set(name),
)


def _pop_span(tokens: _SpanTokens) -> None:
def _pop_span(snapshot: _SpanSnapshot) -> None:
"""Restore the previous span context."""
_current_span_name.reset(tokens.span_name)
_parent_span_id.reset(tokens.parent_span_id)
_current_span_id.reset(tokens.span_id)
_current_span_name.reset(snapshot.span_name)
_parent_span_id.reset(snapshot.parent_span_id)
_current_span_id.reset(snapshot.span_id)
8 changes: 4 additions & 4 deletions src/layerlens/instrument/_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
root_span_id = uuid.uuid4().hex[:16]

col_token = _current_collector.set(collector)
span_tokens = _push_span(root_span_id, span_name)
span_snapshot = _push_span(root_span_id, span_name)
try:
collector.emit(
"agent.input",
Expand Down Expand Up @@ -58,7 +58,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
await collector.async_flush()
raise
finally:
_pop_span(span_tokens)
_pop_span(span_snapshot)
_current_collector.reset(col_token)

return async_wrapper
Expand All @@ -71,7 +71,7 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
root_span_id = uuid.uuid4().hex[:16]

col_token = _current_collector.set(collector)
span_tokens = _push_span(root_span_id, span_name)
span_snapshot = _push_span(root_span_id, span_name)
try:
collector.emit(
"agent.input",
Expand Down Expand Up @@ -100,7 +100,7 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
collector.flush()
raise
finally:
_pop_span(span_tokens)
_pop_span(span_snapshot)
_current_collector.reset(col_token)

return sync_wrapper
Expand Down
4 changes: 2 additions & 2 deletions src/layerlens/instrument/_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def span(name: str) -> Generator[str, None, None]:
Yields the span_id string.
"""
new_span_id = uuid.uuid4().hex[:16]
tokens = _push_span(new_span_id, name)
snapshot = _push_span(new_span_id, name)
try:
yield new_span_id
finally:
_pop_span(tokens)
_pop_span(snapshot)
158 changes: 108 additions & 50 deletions src/layerlens/instrument/adapters/frameworks/_base_framework.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,137 @@
"""Unified base class for all framework adapters.

Framework adapters hook into a framework's callback / event / tracing
system and emit LayerLens events. They share a common lifecycle:

1. Lazy-init a :class:`TraceCollector` on first event.
2. Emit events through a thread-safe helper.
3. Flush the collector when a logical trace ends (root span completes,
agent run finishes, disconnect, etc.).

Subclasses MUST set ``name`` and implement ``connect()``.
Subclasses SHOULD call ``super().disconnect()`` after unhooking.
"""
from __future__ import annotations

import uuid
from uuid import UUID
from typing import Any, Dict, Optional, Tuple
import threading
from typing import Any, Dict, Optional

from .._base import AdapterInfo, BaseAdapter
from ..._capture_config import CaptureConfig
from ..._collector import TraceCollector
from ..._capture_config import CaptureConfig


class FrameworkTracer(BaseAdapter):
"""Base class for framework adapters that manage their own collector.

Framework adapters (LangChain, LangGraph, etc.) receive callbacks
from the framework rather than wrapping SDK methods. They maintain
their own TraceCollector and map framework run_ids to span_ids.
"""
class FrameworkAdapter(BaseAdapter):
"""Base for framework adapters with collector lifecycle management."""

_adapter_name: str = "framework"
name: str # Subclass must set: "crewai", "llamaindex", etc.

def __init__(self, client: Any, capture_config: Optional[CaptureConfig] = None) -> None:
self._client: Any = None
self._client = client
self._config = capture_config or CaptureConfig.standard()
self._lock = threading.Lock()
self._connected = False
self._collector: Optional[TraceCollector] = None
self._root_span_id: Optional[str] = None
# Optional run_id → span_id mapping for callback-style frameworks
self._span_ids: Dict[str, str] = {}
self._root_run_id: Optional[str] = None
self.connect(client)

def connect(self, target: Any = None, **kwargs: Any) -> Any: # noqa: ARG002
self._client = target
return target

def disconnect(self) -> None:
self._span_ids.clear()
self._root_run_id = None
self._collector = None

def adapter_info(self) -> AdapterInfo:
return AdapterInfo(
name=self._adapter_name,
adapter_type="framework",
connected=self._client is not None,
)
# ------------------------------------------------------------------
# Collector lifecycle
# ------------------------------------------------------------------

def _ensure_collector(self) -> TraceCollector:
"""Lazily create a collector and root span ID."""
if self._collector is None:
self._collector = TraceCollector(self._client, self._config)
self._root_span_id = uuid.uuid4().hex[:16]
return self._collector

def _get_or_create_span_id(
self, run_id: UUID, parent_run_id: Optional[UUID] = None
) -> Tuple[str, Optional[str]]:
rid = str(run_id)
if rid not in self._span_ids:
self._span_ids[rid] = uuid.uuid4().hex[:16]
span_id = self._span_ids[rid]
parent_span_id = self._span_ids.get(str(parent_run_id)) if parent_run_id else None
if self._root_run_id is None:
self._root_run_id = rid
return span_id, parent_span_id
@staticmethod
def _new_span_id() -> str:
return uuid.uuid4().hex[:16]

# ------------------------------------------------------------------
# Event emission (thread-safe)
# ------------------------------------------------------------------

def _emit(
self,
event_type: str,
payload: Dict[str, Any],
run_id: UUID,
parent_run_id: Optional[UUID] = None,
span_id: Optional[str] = None,
parent_span_id: Optional[str] = None,
span_name: Optional[str] = None,
) -> None:
collector = self._ensure_collector()
span_id, parent_span_id = self._get_or_create_span_id(run_id, parent_run_id)
collector.emit(event_type, payload, span_id=span_id, parent_span_id=parent_span_id)
"""Thread-safe event emission through the collector."""
with self._lock:
collector = self._ensure_collector()
sid = span_id or self._new_span_id()
parent = parent_span_id or self._root_span_id
collector.emit(
event_type, payload,
span_id=sid, parent_span_id=parent, span_name=span_name,
)

def _maybe_flush(self, run_id: UUID) -> None:
if str(run_id) == self._root_run_id and self._collector is not None:
self._collector.flush()
self._span_ids.clear()
self._root_run_id = None
# ------------------------------------------------------------------
# Run ID → span ID mapping (opt-in for callback-style frameworks)
# ------------------------------------------------------------------

def _span_id_for(self, run_id: Any, parent_run_id: Any = None) -> tuple[str, Optional[str]]:
"""Map a framework run_id to a span_id, creating one if needed.

Returns ``(span_id, parent_span_id)``. Useful for frameworks
(LangChain, CrewAI, OpenAI Agents) that assign their own run
identifiers to each step.
"""
rid = str(run_id)
if rid not in self._span_ids:
self._span_ids[rid] = self._new_span_id()
span_id = self._span_ids[rid]
parent_span_id = self._span_ids.get(str(parent_run_id)) if parent_run_id else None
return span_id, parent_span_id

# ------------------------------------------------------------------
# Flush
# ------------------------------------------------------------------

def _flush_collector(self) -> None:
"""Flush the current collector and reset state."""
with self._lock:
collector = self._collector
self._collector = None
self._root_span_id = None
self._span_ids.clear()
if collector is not None:
collector.flush()

# ------------------------------------------------------------------
# BaseAdapter interface
# ------------------------------------------------------------------

def connect(self, target: Any = None, **kwargs: Any) -> Any:
"""Mark the adapter as connected.

Callback-style adapters (LangChain, LangGraph) are passed directly
to the framework, so ``connect()`` just flips the flag. Adapters
that need registration (CrewAI, LlamaIndex, etc.) should override.
"""
self._connected = True
return target

def disconnect(self) -> None:
"""Flush remaining events and mark as disconnected.

Subclasses should unhook from the framework first, then call
``super().disconnect()``.
"""
self._flush_collector()
self._connected = False

def adapter_info(self) -> AdapterInfo:
return AdapterInfo(
name=self.name,
adapter_type="framework",
connected=self._connected,
)
Loading