Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ openai = ["openai>=1.0.0"]
anthropic = ["anthropic>=0.18.0"]
langchain = ["langchain-core>=0.1.0"]
litellm = ["litellm>=1.0.0"]
pydantic-ai = ["pydantic-ai>=0.2.0"]
openai-agents = ["openai-agents>=0.1.0"]
semantic-kernel = ["semantic-kernel>=1.0.0"]

[project.urls]
Homepage = "https://github.com/LayerLens/stratix-python"
Expand All @@ -50,14 +53,15 @@ stratix = "layerlens.cli:main"
managed = true
# version pins are in requirements-dev.lock
dev-dependencies = [
"mypy",
"pytest",
"pyright==1.1.399",
"pytest-cov>=6.2.1",
"ruff",
"build",
"twine==6.1.0",
"click>=8.0.0",
"mypy",
"pytest",
"pyright==1.1.399",
"pytest-cov>=6.2.1",
"ruff",
"build",
"twine==6.1.0",
"click>=8.0.0",
"crewai>=0.5.0",
]

[tool.rye.scripts]
Expand Down Expand Up @@ -146,6 +150,21 @@ known-first-party = ["openai", "tests"]
"src/layerlens/cli/**" = ["T201", "T203"]
"src/layerlens/instrument/adapters/frameworks/langchain.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/langgraph.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/crewai.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/pydantic_ai.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/openai_agents.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/autogen.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/llamaindex.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/semantic_kernel.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/smolagents.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/google_adk.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/agno.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/strands.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/bedrock_agents.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/ms_agent_framework.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/haystack.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/langfuse.py" = ["ARG002"]
"src/layerlens/instrument/adapters/frameworks/agentforce.py" = ["ARG002"]

[tool.pyright]
include = ["src", "tests"]
Expand Down
3 changes: 3 additions & 0 deletions src/layerlens/instrument/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ._capture_config import CaptureConfig
from ._collector import TraceCollector
from ._decorator import trace
from ._context_propagation import trace_context, get_trace_context
from .adapters._base import AdapterInfo, BaseAdapter

__all__ = [
Expand All @@ -13,6 +14,8 @@
"CaptureConfig",
"TraceCollector",
"emit",
"get_trace_context",
"span",
"trace",
"trace_context",
]
75 changes: 50 additions & 25 deletions src/layerlens/instrument/_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@
import time
import uuid
import logging
import threading
from typing import Any, Dict, List, Optional

from layerlens.attestation import HashChain

from ._capture_config import CaptureConfig
from ._upload import upload_trace, async_upload_trace
from ._upload import enqueue_upload

log: logging.Logger = logging.getLogger(__name__)


class TraceCollector:
"""Collects flat events for a single trace, with CaptureConfig gating and attestation."""
"""Collects flat events for a single trace, with CaptureConfig gating and attestation.

Thread-safe: all mutations go through ``self._lock``.
Once ``flush()`` is called the collector is sealed — further ``emit()`` calls are no-ops.
"""

MAX_EVENTS = 10_000

def __init__(self, client: Any, config: CaptureConfig) -> None:
self._client = client
Expand All @@ -23,6 +30,9 @@ def __init__(self, client: Any, config: CaptureConfig) -> None:
self._events: List[Dict[str, Any]] = []
self._sequence: int = 0
self._chain = HashChain()
self._capped = False
self._sealed = False
self._lock = threading.Lock()

@property
def trace_id(self) -> str:
Expand All @@ -46,19 +56,32 @@ def emit(

payload = self._config.redact_payload(event_type, payload)

self._sequence += 1
event: Dict[str, Any] = {
"event_type": event_type,
"trace_id": self._trace_id,
"span_id": span_id,
"parent_span_id": parent_span_id,
"span_name": span_name,
"sequence_id": self._sequence,
"timestamp_ns": time.time_ns(),
"payload": payload,
}
self._chain.add_event(event)
self._events.append(event)
with self._lock:
if self._sealed:
return

if len(self._events) >= self.MAX_EVENTS:
if not self._capped:
self._capped = True
log.warning(
"layerlens: trace %s hit %d event limit, further events dropped",
self._trace_id, self.MAX_EVENTS,
)
return

self._sequence += 1
event: Dict[str, Any] = {
"event_type": event_type,
"trace_id": self._trace_id,
"span_id": span_id,
"parent_span_id": parent_span_id,
"span_name": span_name,
"sequence_id": self._sequence,
"timestamp_ns": time.time_ns(),
"payload": payload,
}
self._chain.add_event(event)
self._events.append(event)

def _build_trace_payload(self) -> Dict[str, Any]:
"""Build the attestation envelope and trace payload."""
Expand All @@ -73,21 +96,23 @@ def _build_trace_payload(self) -> Dict[str, Any]:
log.warning("Failed to build attestation chain", exc_info=True)
attestation = {"attestation_error": str(exc)}

return {
trace_payload: Dict[str, Any] = {
"trace_id": self._trace_id,
"events": self._events,
"capture_config": self._config.to_dict(),
"attestation": attestation,
}
if self._capped:
trace_payload["truncated"] = True
trace_payload["max_events"] = self.MAX_EVENTS
return trace_payload

def flush(self) -> None:
"""Build attestation and upload the trace."""
if not self._events:
return
upload_trace(self._client, self._build_trace_payload())
"""Seal the collector, build attestation, and enqueue the trace for background upload."""
with self._lock:
if self._sealed or not self._events:
return
self._sealed = True
payload = self._build_trace_payload()
enqueue_upload(self._client, payload)

async def async_flush(self) -> None:
"""Async version of flush."""
if not self._events:
return
await async_upload_trace(self._client, self._build_trace_payload())
25 changes: 24 additions & 1 deletion src/layerlens/instrument/_context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import Any, Optional, NamedTuple
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, NamedTuple
from contextvars import ContextVar

from ._collector import TraceCollector
Expand All @@ -11,6 +12,28 @@
_current_span_name: ContextVar[Optional[str]] = ContextVar("_current_span_name", default=None)


@dataclass
class RunState:
"""Per-run state isolated via ContextVar.

Each concurrent run (agent invocation, crew kickoff, etc.) gets its own
RunState stored in ``_current_run``. This isolates the collector, root span,
timers, and any adapter-specific data so concurrent runs on the same adapter
instance don't clobber each other.
"""

collector: TraceCollector
root_span_id: str
timers: Dict[str, int] = field(default_factory=dict)
data: Dict[str, Any] = field(default_factory=dict)
_token: Any = field(default=None, repr=False)
_col_token: Any = field(default=None, repr=False)
_span_snapshot: Any = field(default=None, repr=False)


_current_run: ContextVar[Optional[RunState]] = ContextVar("_current_run", default=None)


class _SpanSnapshot(NamedTuple):
span_id: Any
parent_span_id: Any
Expand Down
93 changes: 93 additions & 0 deletions src/layerlens/instrument/_context_propagation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from __future__ import annotations

import uuid
from typing import Any, Dict, Generator, Optional
from contextlib import contextmanager

from ._collector import TraceCollector
from ._capture_config import CaptureConfig
from ._context import (
_current_collector,
_current_span_id,
_parent_span_id,
_push_span,
_pop_span,
)


@contextmanager
def trace_context(
client: Any,
*,
capture_config: Optional[CaptureConfig] = None,
from_context: Optional[Dict[str, Any]] = None,
) -> Generator[TraceCollector, None, None]:
"""Establish a shared trace context for multiple adapters.

Creates a :class:`TraceCollector` and sets it as the active collector
in ``contextvars`` so that any adapter emitting events inside the
block will use the same ``trace_id`` and span hierarchy.

When *from_context* is provided (a dict from :func:`get_trace_context`),
the new collector reuses the original ``trace_id`` so events on both
sides of a boundary belong to the same trace.

The collector is flushed automatically when the context exits.

Args:
client: A :class:`~layerlens.Stratix` (or compatible) client used
for uploading the trace on flush.
capture_config: Optional capture configuration. Falls back to
:meth:`CaptureConfig.standard` if not provided.
from_context: Optional dict produced by :func:`get_trace_context`.
When supplied the collector inherits the original trace_id.

Yields:
The shared :class:`TraceCollector`.
"""
config = capture_config or CaptureConfig.standard()
collector = TraceCollector(client, config)

if from_context is not None:
collector._trace_id = from_context["trace_id"] # noqa: SLF001

root_span_id = uuid.uuid4().hex[:16]

col_token = _current_collector.set(collector)
span_snapshot = _push_span(root_span_id, "trace_context")
try:
yield collector
finally:
_pop_span(span_snapshot)
_current_collector.reset(col_token)
collector.flush()


def get_trace_context() -> Optional[Dict[str, Any]]:
"""Snapshot the current trace context as a plain dict.

Returns ``None`` when called outside a ``@trace`` / ``trace_context``
block. The returned dict is safe to serialise (JSON, headers, message
queues, etc.) and restore via ``trace_context(client, from_context=ctx)``.

Keys:

* ``trace_id`` — 16-char hex trace identifier
* ``span_id`` — current span (becomes the parent in the remote scope)
* ``parent_span_id`` — optional grandparent for reference
* ``version`` — format version for forward compatibility
"""
collector = _current_collector.get()
if collector is None:
return None

span_id = _current_span_id.get()
if span_id is None:
return None

return {
"trace_id": collector.trace_id,
"span_id": span_id,
"parent_span_id": _parent_span_id.get(),
"version": 1,
}
4 changes: 2 additions & 2 deletions src/layerlens/instrument/_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
span_id=root_span_id,
span_name=span_name,
)
await collector.async_flush()
collector.flush()
return result
except Exception as exc:
collector.emit(
Expand All @@ -55,7 +55,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
span_id=root_span_id,
span_name=span_name,
)
await collector.async_flush()
collector.flush()
raise
finally:
_pop_span(span_snapshot)
Expand Down
Loading