Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/runners/ambient-runner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ The workspace context prompt is built by `build_sdk_system_prompt()` in `prompts
**Backend selection**

- `OBSERVABILITY_BACKENDS` - Comma-separated: `langfuse`, `mlflow`, or both (e.g. `langfuse,mlflow`). If unset, defaults to **`langfuse`** only so existing Langfuse behaviour is preserved.
- Turn traces are named **`llm_interaction`** (vendor-neutral). **`RUNNER_TYPE`** (same values as bridge selection: `claude-agent-sdk`, `gemini-cli`, …) is added to Langfuse tags (`runner:<type>`) and to span metadata for MLflow / Langfuse.

**MLflow GenAI tracing** (optional extra: `pip install 'ambient-runner[mlflow-observability]'` — pins **`mlflow[kubernetes]>=3.11`** for cluster auth)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ async def tracing_middleware(
prompt: User prompt (used as input for the first turn trace).

Yields:
The original events plus an ``ambient:langfuse_trace`` ``CustomEvent``
once the Langfuse trace ID becomes available.
The original events plus an ``ambient:trace_id`` ``CustomEvent``
once the trace ID (from Langfuse or MLflow, depending on active
backend) becomes available.
"""
# Fast path: no observability — just pass through
if obs is None:
Expand Down Expand Up @@ -71,7 +72,7 @@ async def tracing_middleware(
if trace_id:
yield CustomEvent(
type=EventType.CUSTOM,
name="ambient:langfuse_trace",
name="ambient:trace_id",
value={"traceId": trace_id},
)
trace_id_emitted = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

logger = logging.getLogger(__name__)

# Align with observability.TURN_TRACE_NAME / SESSION_METRICS_* (avoid import cycle).
_TURN_SPAN_NAME = "llm_interaction"
_SESSION_METRICS_SPAN_NAME = "Session Metrics"
_SESSION_METRICS_SOURCE = "ambient-runner-metrics"


class MLflowSessionTracer:
"""Mirrors turn/tool boundaries from ObservabilityManager into MLflow spans."""
Expand All @@ -22,6 +27,7 @@ def __init__(self, session_id: str, user_id: str, user_name: str) -> None:
self._turn_gen: Any = None
self._turn_span: Any = None
self._tool_ctx: dict[str, tuple[Any, Any]] = {}
self._runner_type = ""

@property
def enabled(self) -> bool:
Expand All @@ -41,6 +47,7 @@ def initialize(
workflow_branch: str,
workflow_path: str,
mask_fn: Callable[[Any], Any] | None,
runner_type: str | None = None,
) -> bool:
"""Configure tracking URI and experiment. Returns True on success."""
try:
Expand Down Expand Up @@ -88,6 +95,9 @@ def initialize(

self._namespace = namespace
self._mask_fn = mask_fn
self._runner_type = (
runner_type or os.getenv("RUNNER_TYPE", "claude-agent-sdk") or ""
).strip().lower() or "unknown"
self._enabled = True
logger.info(
"MLflow: session tracing enabled (session_id=%s, experiment=%s)",
Expand Down Expand Up @@ -125,12 +135,13 @@ def start_turn(self, model: str, user_input: str | None) -> None:
text_in = self._apply_mask(text_in)

gen = mlflow.start_span(
name="claude_interaction",
name=_TURN_SPAN_NAME,
span_type=SpanType.CHAIN,
attributes={
"ambient.session_id": self.session_id,
"ambient.user_id": self.user_id,
"ambient.namespace": self._namespace,
"ambient.runner_type": self._runner_type,
"llm.model_name": model,
},
)
Expand Down Expand Up @@ -249,9 +260,12 @@ def emit_session_summary_span(self, metadata: dict[str, Any]) -> None:
from mlflow.entities import SpanType

with mlflow.start_span(
name="Claude Code - Session Metrics",
name=_SESSION_METRICS_SPAN_NAME,
span_type=SpanType.CHAIN,
attributes={"ambient.source": "claude-code-metrics"},
attributes={
"ambient.source": _SESSION_METRICS_SOURCE,
"ambient.runner_type": self._runner_type,
},
) as span:
span.set_inputs(
{
Expand Down
152 changes: 101 additions & 51 deletions components/runners/ambient-runner/ambient_runner/observability.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,44 @@
"""
Observability manager for Claude Code runner - hybrid Langfuse integration.

Provides Langfuse LLM observability for Claude sessions with trace structure:

1. Turn Traces (top-level generations):
- ONE trace per turn (SDK sends multiple AssistantMessages during streaming, but guard prevents duplicates)
- Named: "claude_interaction" (turn number stored in metadata)
- First AssistantMessage creates trace, subsequent ones ignored until end_turn() clears it
- Final trace contains authoritative turn number and usage data from ResultMessage
- Canonical format with separate cache token tracking for accurate cost
- All traces grouped by session_id via propagate_attributes()

2. Tool Spans (observations within turn traces):
- Named: tool_Read, tool_Write, tool_Bash, etc.
- Shows tool execution in real-time
- NO usage/cost data (prevents inflation from SDK's cumulative metrics)
- Child observations of their parent turn trace
Observability manager for ambient-runner — Langfuse and/or MLflow.

Works across runner backends (Claude Agent SDK, Gemini CLI, etc.). Span names are
vendor-neutral; ``RUNNER_TYPE`` tags traces for the active bridge. When both
backends are enabled, the same turn/tool boundaries are mirrored into MLflow.

1. Turn traces (top-level generations):
- ONE trace per turn (SDK may send multiple assistant messages during streaming;
a guard prevents duplicate traces for the same turn)
- Named ``llm_interaction`` (turn number stored in metadata)
- First assistant message for a turn creates the trace; later ones are ignored
until ``end_turn()`` clears the active turn
- Final trace update contains the authoritative turn number and usage from the
SDK result message (e.g. ``ResultMessage``)
- Canonical usage format with separate cache token fields for accurate cost
- Traces grouped by ``session_id`` via ``propagate_attributes()`` (Langfuse);
MLflow uses the same session/user tags on spans

2. Tool spans (within the current turn trace):
- Named ``tool_<name>`` (e.g. ``tool_Read``, ``tool_Write``, ``tool_Bash``)
- Reflect tool execution in real time
- NO usage/cost on tool spans (avoids double-counting vs turn-level usage)

Architecture:
- Session-based grouping via propagate_attributes() with session_id and user_id
- Each turn creates ONE independent trace (not nested under session)
- Langfuse automatically aggregates tokens and costs across all traces with same session_id
- Filter by session_id, user_id, model, or metadata.turn in Langfuse UI
- Sessions can be paused/resumed: each turn creates a trace regardless of session lifecycle

Trace Hierarchy:
claude_interaction (trace - generation, metadata: {turn: 1})
├── tool_Read (observation - span)
└── tool_Write (observation - span)

claude_interaction (trace - generation, metadata: {turn: 2})
└── tool_Bash (observation - span)

Usage Format:
- Session-based grouping via ``propagate_attributes()`` with ``session_id`` and
``user_id`` (Langfuse); MLflow sets equivalent attributes/tags on spans
- Each turn is ONE independent trace (not nested under a single parent session trace)
- Langfuse aggregates tokens/costs across traces sharing ``session_id``; filter by
``session_id``, ``user_id``, model, or ``metadata.turn`` in the Langfuse UI
- Sessions can be paused/resumed: each turn still gets a trace when it runs

Trace hierarchy (conceptual):
llm_interaction (generation, metadata: {turn: 1})
├── tool_Read (observation / span)
└── tool_Write (observation / span)

llm_interaction (generation, metadata: {turn: 2})
└── tool_Bash (observation / span)

Usage format (turn-level):
{
"input": int, # Regular input tokens
"output": int, # Output tokens
Expand Down Expand Up @@ -65,6 +71,18 @@
# Alias for tests and legacy imports
_privacy_masking_function = privacy_mask_message_data


def _runner_type_slug() -> str:
"""Stable label from ``RUNNER_TYPE`` (see ``main.BRIDGE_REGISTRY``)."""
return os.getenv("RUNNER_TYPE", "claude-agent-sdk").strip().lower() or "unknown"


# Langfuse / MLflow turn trace name — not tied to a single vendor SDK.
TURN_TRACE_NAME = "llm_interaction"
SESSION_METRICS_SPAN_NAME = "Session Metrics"
# Metadata ``source`` for session-level metric spans (Langfuse + MLflow).
SESSION_METRICS_SOURCE = "ambient-runner-metrics"

# Canonical token key names used across usage dicts from the Claude Agent SDK.
_TOKEN_KEYS = (
"input_tokens",
Expand All @@ -80,7 +98,7 @@ def is_langfuse_enabled() -> bool:


class ObservabilityManager:
"""Manages Langfuse observability for Claude sessions."""
"""Manages Langfuse and/or MLflow observability for agent sessions."""

def __init__(self, session_id: str, user_id: str, user_name: str):
"""Initialize observability manager.
Expand Down Expand Up @@ -185,6 +203,7 @@ async def initialize(
workflow_branch=workflow_branch,
workflow_path=workflow_path,
mask_fn=mask_fn,
runner_type=_runner_type_slug(),
)
except Exception as e:
logging.warning(
Expand Down Expand Up @@ -261,7 +280,7 @@ def _initialize_langfuse(
"initial_prompt": prompt[:200] if len(prompt) > 200 else prompt,
}

tags = ["claude-code", f"namespace:{namespace}"]
tags = [f"runner:{_runner_type_slug()}", f"namespace:{namespace}"]

if model:
sanitized_model = sanitize_model_name(model)
Expand Down Expand Up @@ -351,7 +370,7 @@ def _has_active_turn(self) -> bool:

@staticmethod
def _extract_assistant_text(message: Any) -> str:
"""Extract assistant text from a Claude SDK message (or best-effort without SDK)."""
"""Extract assistant text from an agent SDK message (or best-effort without SDK)."""
try:
from claude_agent_sdk import TextBlock
except ImportError:
Expand Down Expand Up @@ -421,10 +440,10 @@ def start_turn(self, model: str, user_input: str | None = None) -> None:
self._current_turn_ctx = (
self.langfuse_client.start_as_current_observation(
as_type="generation",
name="claude_interaction",
name=TURN_TRACE_NAME,
input=input_content,
model=model,
metadata={},
metadata={"runner_type": _runner_type_slug()},
)
)
self._current_turn_generation = self._current_turn_ctx.__enter__()
Expand All @@ -445,28 +464,58 @@ def start_turn(self, model: str, user_input: str | None = None) -> None:
self._mlflow.start_turn(model, resolved_input)
except Exception as e:
logging.warning("MLflow: start_turn failed: %s", e, exc_info=True)
else:
# Langfuse sets _last_trace_id when _current_turn_generation exists; for
# MLflow-only runs, persist the active MLflow trace id for middleware/feedback.
if not self._current_turn_generation:
self._sync_last_trace_id_from_mlflow()

def _sync_last_trace_id_from_mlflow(self) -> None:
"""Set _last_trace_id from MLflow when a turn span is active (MLflow-only path)."""
if not self.mlflow_tracing_active or self._mlflow is None:
return
if not self._mlflow.has_active_turn:
return
try:
import mlflow

tid = mlflow.get_active_trace_id()
if tid:
self._last_trace_id = tid
except Exception as e:
logging.debug("MLflow: could not read active trace id: %s", e)

def get_current_trace_id(self) -> str | None:
"""Get the current turn's trace ID for feedback association.

Returns:
The Langfuse trace ID if a turn is active, None otherwise.
Langfuse trace ID when a Langfuse turn is active; otherwise the MLflow
active trace ID when MLflow tracing is on and a turn span is open.
"""
if not self._current_turn_generation:
return None
if self._current_turn_generation:
try:
return getattr(self._current_turn_generation, "trace_id", None)
except Exception:
return None
if (
self.mlflow_tracing_active
and self._mlflow is not None
and self._mlflow.has_active_turn
):
try:
import mlflow

# The generation object has a trace_id attribute
try:
return getattr(self._current_turn_generation, "trace_id", None)
except Exception:
return None
return mlflow.get_active_trace_id()
except Exception:
return None
return None

@property
def last_trace_id(self) -> str | None:
"""Most recent Langfuse trace ID (persists after turn ends).
"""Most recent trace ID for the active backends (persists after turn ends).

Used by the feedback endpoint to attach scores to the correct trace
without requiring the backend to scan the event log.
Langfuse or MLflow depending on configuration; used by the feedback endpoint
and AG-UI trace events when the runner owns the correlation id.
"""
return self._last_trace_id

Expand Down Expand Up @@ -495,7 +544,7 @@ def end_turn(

Args:
turn_count: Current turn number (from SDK's authoritative num_turns in ResultMessage)
message: AssistantMessage from Claude SDK
message: Assistant message from the active agent SDK
usage: Usage dict from ResultMessage with input_tokens, output_tokens, cache tokens, etc.
"""
if not self.langfuse_client and not self.mlflow_tracing_active:
Expand Down Expand Up @@ -871,7 +920,8 @@ def _emit_session_summary(self) -> None:
scores = metric.to_flat_scores()

span_metadata = {
"source": "claude-code-metrics",
"source": SESSION_METRICS_SOURCE,
"runner_type": _runner_type_slug(),
"session_id": self.session_id,
"user_id": self.user_id,
"namespace": self.namespace,
Expand All @@ -883,7 +933,7 @@ def _emit_session_summary(self) -> None:

if self.langfuse_client:
with self.langfuse_client.start_as_current_span(
name="Claude Code - Session Metrics",
name=SESSION_METRICS_SPAN_NAME,
input={
"session_id": self.session_id,
"user_id": self.user_id,
Expand Down
4 changes: 2 additions & 2 deletions components/runners/ambient-runner/tests/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async def test_init_successful(
call_kwargs = mock_propagate.call_args[1]
assert call_kwargs["user_id"] == manager.user_id
assert call_kwargs["session_id"] == manager.session_id
assert "claude-code" in call_kwargs["tags"]
assert "runner:claude-agent-sdk" in call_kwargs["tags"]

assert "Session tracking enabled" in caplog.text

Expand Down Expand Up @@ -291,7 +291,7 @@ def test_start_turn_creates_generation(self, manager):
mock_client.start_as_current_observation.assert_called_once()
call_kwargs = mock_client.start_as_current_observation.call_args[1]
assert call_kwargs["as_type"] == "generation"
assert call_kwargs["name"] == "claude_interaction"
assert call_kwargs["name"] == "llm_interaction"
assert call_kwargs["model"] == "claude-3-5-sonnet"

assert manager._current_turn_generation is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,11 @@ def test_emit_summary_with_metrics(self, manager_with_langfuse):
# Verify Langfuse span was created
assert mgr.langfuse_client.start_as_current_span.called
call_kwargs = mgr.langfuse_client.start_as_current_span.call_args[1]
assert call_kwargs["name"] == "Claude Code - Session Metrics"
assert call_kwargs["name"] == "Session Metrics"

meta = call_kwargs["metadata"]
assert meta["source"] == "claude-code-metrics"
assert meta["source"] == "ambient-runner-metrics"
assert meta["runner_type"] == "claude-agent-sdk"
# Verify consolidated metadata matches trace-level fields
assert meta["namespace"] == ""
assert meta["user_name"] == "Test User"
Expand Down
Loading