# Agent Evaluation with Langfuse Tracing

This notebook demonstrates how to evaluate AI agents using **Langfuse** for observability and tracing. It covers:

1. **Langfuse Setup** - Installing and configuring Langfuse for tracing
2. **Agent Implementation** - Building a simple agent
3. **Manual Trace Capture** - Capturing a trace manually and reading it
4. **Automatic Trace Capture** - Capturing a trace with Langfuse's LangChain callback handler
5. **Test Dataset Creation** - Defining test cases with expected behaviors
6. **Trajectory Evaluation** - Validating tool calls and execution paths
7. **Response Evaluation** - Assessing agent responses using LLM-as-a-Judge

## Why Agent Evaluation?

Before adding complexity to your agent architecture, validate that each addition is actually needed.

You can do this by:

- Capturing agent traces
- Defining a test set that covers the desired use cases
- Running evaluations against the agent
- Measuring trajectory accuracy and response quality
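As a minimal illustration of trajectory measurement, the sketch below assumes a trajectory is simply the ordered list of tool names the agent called. `exact_trajectory_match` and `in_order_match` are hypothetical helpers written for this example, not Langfuse or LangGraph APIs.

```python
from typing import Sequence


def exact_trajectory_match(expected: Sequence[str], actual: Sequence[str]) -> bool:
    """True only if the agent called exactly the expected tools, in the same order."""
    return list(expected) == list(actual)


def in_order_match(expected: Sequence[str], actual: Sequence[str]) -> bool:
    """True if every expected tool appears in `actual` in order (extra calls allowed)."""
    remaining = iter(actual)
    # `name in remaining` consumes the iterator up to the match, so order is enforced
    return all(name in remaining for name in expected)


# Hypothetical test case: a weather question should trigger one weather lookup
expected = ["get_current_weather"]
actual = ["get_current_weather", "get_current_weather"]

print(exact_trajectory_match(expected, actual))  # False: one extra call
print(in_order_match(expected, actual))          # True: the expected call happened
```

Exact matching treats any extra tool call as a failure, while in-order matching tolerates retries or redundant lookups; which one fits depends on whether extra calls matter for your use case.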

## 1. Setup & Installation

### Install Required Packages

In [None]:
# Install required packages
!pip install -q langfuse langchain langgraph langchain_ibm pandas python-dotenv matplotlib seaborn
!pip install -q "git+https://github.com/ibm-granite-community/utils.git"

### Import Dependencies

In [None]:
import os
import uuid
import asyncio
from typing import Annotated, Any, Dict, Optional, TypedDict

import nest_asyncio
import requests
import seaborn as sns
from dotenv import load_dotenv
from IPython.display import Image, display
from langfuse import Langfuse
from ibm_granite_community.notebook_utils import get_env_var
from langchain_core.utils.utils import convert_to_secret_str
from langchain_core.messages import AnyMessage, AIMessage, HumanMessage
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode

# Load credentials from .env and allow asyncio.run() inside Jupyter's running event loop
load_dotenv()
nest_asyncio.apply()

model = "ibm/granite-4-h-small"

model_parameters = {
    "temperature": 0,
    "max_completion_tokens": 200,
    "repetition_penalty": 1.05,
}

### Langfuse Setup

#### Launch Langfuse with Docker

Before running this notebook, start Langfuse locally:

```bash
# Clone Langfuse repository
git clone https://github.com/langfuse/langfuse.git
cd langfuse

# Start Langfuse with Docker Compose
docker compose up
```

### Configure Langfuse

1. Open http://localhost:3000 in your browser
2. Sign up or log in
3. Create a new project (e.g., "Agent Evaluation")
4. Navigate to **Settings → API Keys**
5. Create new API keys and copy:
   - Public Key
   - Secret Key
   - Host URL (http://localhost:3000)

### Configure watsonx.ai

You'll need the following environment variables for watsonx.ai:
- `WATSONX_URL`: Your watsonx.ai endpoint URL
- `WATSONX_APIKEY`: Your watsonx.ai API key
- `WATSONX_PROJECT_ID`: Your watsonx.ai project ID

Set these in a `.env` file or export them as environment variables.

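### Set Environment Variables

Add the Langfuse credentials from the previous step to the same `.env` file (or export them) alongside the watsonx.ai variables:
- `LANGFUSE_PUBLIC_KEY`: Your Langfuse public key
- `LANGFUSE_SECRET_KEY`: Your Langfuse secret key
- `LANGFUSE_HOST`: Your Langfuse host URL (`http://localhost:3000` for the local Docker setup)

Optionally, set `AV_STOCK_API_KEY` (Alpha Vantage) and `WEATHER_API_KEY` (OpenWeatherMap) to call the live APIs. If they are unset, the agent's tools return fixed demonstration values.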
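A quick pre-flight check can catch a missing key before `Langfuse(...)` or `init_chat_model(...)` fails with a less obvious error. This is a minimal sketch: the helper `missing_env_vars` is hypothetical, and the list assumes the variable names this notebook reads (the optional tool API keys are left out because the tools fall back to fixed values). Run it after `load_dotenv()`.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variables this notebook reads via os.environ / get_env_var
REQUIRED_VARS = [
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_HOST",
    "WATSONX_URL",
    "WATSONX_APIKEY",
    "WATSONX_PROJECT_ID",
]


def missing_env_vars(required: Iterable[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("All required environment variables are set.")
```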

### Initialize Langfuse Client

In [None]:
# Initialize Langfuse client
langfuse_client = Langfuse(
    public_key=os.environ['LANGFUSE_PUBLIC_KEY'],
    secret_key=os.environ['LANGFUSE_SECRET_KEY'],
    host=os.environ['LANGFUSE_HOST']
)

## 2. Agent Implementation

### Build a Minimal Function Calling Agent

Build a function-calling (FC) agent as a LangGraph `StateGraph`, adapted from the [Function_Calling](https://github.com/ibm-granite-community/granite-agent-cookbook/blob/main/recipes/Function_Calling/Function_Calling_Agent.ipynb) recipe. The agent has two tools: `get_stock_price` and `get_current_weather`.

In [None]:
AV_STOCK_API_KEY = convert_to_secret_str(get_env_var("AV_STOCK_API_KEY", "unset"))

WEATHER_API_KEY = convert_to_secret_str(get_env_var("WEATHER_API_KEY", "unset"))

llm = init_chat_model(
    model=model,
    model_provider="ibm",
    url=convert_to_secret_str(get_env_var("WATSONX_URL")),
    apikey=convert_to_secret_str(get_env_var("WATSONX_APIKEY")),
    project_id=get_env_var("WATSONX_PROJECT_ID"),
    params=model_parameters,
)

class State(TypedDict, total=False):
    messages: Annotated[list[AnyMessage], add_messages]

def get_stock_price(ticker: str, date: str) -> dict:
    """
    Retrieves the lowest and highest stock prices for a given ticker and date.

    Args:
        ticker: The stock ticker symbol, for example, "IBM".
        date: The date in "YYYY-MM-DD" format for which you want to get stock prices.

    Returns:
        A dictionary containing the low and high stock prices on the given date.
    """
    print(f"Getting stock price for {ticker} on {date}")

    apikey = AV_STOCK_API_KEY.get_secret_value()
    if apikey == "unset":
        print("No API key present; using a fixed, predetermined value for demonstration purposes")
        return {
            "low": "245.4500",
            "high": "249.0300"
        }

    try:
        stock_url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={ticker}&apikey={apikey}"
        stock_data = requests.get(stock_url, timeout=10)  # avoid hanging indefinitely
        data = stock_data.json()
        stock_low = data["Time Series (Daily)"][date]["3. low"]
        stock_high = data["Time Series (Daily)"][date]["2. high"]
        return {
            "low": stock_low,
            "high": stock_high
        }
    except Exception as e:
        print(f"Error fetching stock data: {e}")
        return {
            "low": "$245.45",
            "high": "$249.03"
        }


def get_current_weather(location: str) -> dict:
    """
    Fetches the current weather for a given location.

    Args:
        location: The name of the city for which to retrieve the weather information.

    Returns:
        A dictionary containing weather information such as temperature in celsius, weather description, and humidity.
    """
    print(f"Getting current weather for {location}")
    apikey = WEATHER_API_KEY.get_secret_value()
    if apikey == "unset":
        print("No API key present; using a fixed, predetermined value for demonstration purposes")
        return {
            "description": "thunderstorms",
            "temperature": 25.3,
            "humidity": 94
        }

    try:
        # API request to fetch weather data
        weather_url = f"https://api.openweathermap.org/data/2.5/weather?q={location}&appid={apikey}&units=metric"
        weather_data = requests.get(weather_url, timeout=10)  # avoid hanging indefinitely
        data = weather_data.json()
        # Extracting relevant weather details
        weather_description = data["weather"][0]["description"]
        temperature = data["main"]["temp"]
        humidity = data["main"]["humidity"]

        # Returning weather details
        return {
            "description": weather_description,
            "temperature": temperature,
            "humidity": humidity
        }
    except Exception as e:
        print(f"Error fetching weather data: {e}")
        return {
            "description": "none",
            "temperature": "none",
            "humidity": "none"
        }


def route_tools(state: State) -> str:
    """
    Conditional-edge function: route to the ToolNode if the last message in the
    state contains tool calls; otherwise route to END to complete the workflow.
    """
    messages = state.get("messages")
    if not messages:
        raise ValueError(f"No messages found in input state to route_tools: {state}")

    last_message = messages[-1]
    # If the last message is from the model and it contains a tool call request
    if isinstance(last_message, AIMessage) and len(last_message.tool_calls) > 0:
        return "tools"
    return END


def create_llm():
    """Create and return a new LLM instance with the same settings as `llm`."""
    return init_chat_model(
        model=model,
        model_provider="ibm",
        url=convert_to_secret_str(get_env_var("WATSONX_URL")),
        apikey=convert_to_secret_str(get_env_var("WATSONX_APIKEY")),
        project_id=get_env_var("WATSONX_PROJECT_ID"),
        params=model_parameters,
    )

def llm_node(state: State) -> State:
    messages = state["messages"]
    response_message = llm_with_tools.invoke(messages)
    state_update = State(messages=[response_message])
    return state_update
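The `get_stock_price` tool reads `data["Time Series (Daily)"][date]` and falls back to fixed values on any exception, so a missing date (for example, a weekend) and an API error message look identical to the caller. The sketch below separates parsing from fetching so those cases can be told apart. It assumes the response shape the tool already relies on (`"Time Series (Daily)"` keyed by date, with `"2. high"` and `"3. low"` fields); `parse_daily_range` is a hypothetical helper, and the sample payload is hand-written.

```python
from typing import Any, Dict, Optional


def parse_daily_range(data: Dict[str, Any], date: str) -> Optional[Dict[str, str]]:
    """Extract the low/high prices for `date`, or return None if the date is absent.

    Raises ValueError when the payload has no daily time series at all,
    for example when the API returned an error or rate-limit message instead.
    """
    series = data.get("Time Series (Daily)")
    if series is None:
        raise ValueError(f"Unexpected response, keys: {sorted(data)}")
    day = series.get(date)
    if day is None:
        return None  # e.g. a weekend, a holiday, or a date outside the returned window
    return {"low": day["3. low"], "high": day["2. high"]}


# Hand-written sample payload that mimics the structure the tool expects
sample = {
    "Time Series (Daily)": {
        "2025-09-05": {"2. high": "249.0300", "3. low": "245.4500"},
    }
}
print(parse_daily_range(sample, "2025-09-05"))  # {'low': '245.4500', 'high': '249.0300'}
print(parse_daily_range(sample, "2025-09-06"))  # None
```

With parsing isolated, the tool can report "no data for that date" to the model instead of silently returning placeholder prices.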

### Build graph

In [None]:
tools = [get_stock_price, get_current_weather]
llm_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools=tools)

graph_builder = StateGraph(State)
graph_builder.add_node("llm", llm_node)
graph_builder.add_node("tools", tool_node)
graph_builder.add_edge(START, "llm")
graph_builder.add_conditional_edges(
    "llm",
    route_tools,
    # Map each value returned by route_tools to the node that should run next.
    # This identity mapping sends "tools" to the node named "tools"; if your
    # tool node had a different name, you would map it here,
    # e.g., "tools": "my_tools"
    {
        "tools": "tools",
        END: END,
    },
)
graph_builder.add_edge("tools", "llm")


graph: CompiledStateGraph[State] = graph_builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

def function_calling_agent(graph: CompiledStateGraph, user_input: str):
    user_message = HumanMessage(user_input)
    print(user_message.pretty_repr())
    input_state = State(messages=[user_message])
    for event in graph.stream(input_state):
        for value in event.values():
            print(value["messages"][-1].pretty_repr())
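Stripped of LangGraph, the graph above is a loop: call the model, run any requested tools, feed the results back, and stop once the model replies without tool calls (the `route_tools` condition). The sketch below mimics that control flow with plain dictionaries and a scripted stand-in for the model. `run_agent_loop`, the message dicts, `fake_llm`, `fake_tools`, and `max_steps` are illustrative assumptions, not LangGraph APIs.

```python
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


def run_agent_loop(
    llm_step: Callable[[List[Message]], Message],
    tools: Dict[str, Callable[..., Any]],
    user_input: str,
    max_steps: int = 10,
) -> List[Message]:
    """Alternate between the model and tools until the model stops requesting tools."""
    messages: List[Message] = [{"role": "user", "content": user_input}]
    for _ in range(max_steps):
        reply = llm_step(messages)            # the "llm" node
        messages.append(reply)
        tool_calls = reply.get("tool_calls", [])
        if not tool_calls:                    # route_tools -> END
            return messages
        for call in tool_calls:               # the "tools" node
            result = tools[call["name"]](**call["args"])
            messages.append({"role": "tool", "name": call["name"], "content": result})
    raise RuntimeError("Agent did not finish within max_steps")


# Scripted stand-in for the model: request the weather tool once, then answer
def fake_llm(messages: List[Message]) -> Message:
    if messages[-1]["role"] == "user":
        return {"role": "assistant", "content": "",
                "tool_calls": [{"name": "get_current_weather", "args": {"location": "Miami"}}]}
    weather = messages[-1]["content"]
    return {"role": "assistant", "content": f"It is {weather['description']} in Miami."}


fake_tools = {"get_current_weather": lambda location: {"description": "thunderstorms"}}
history = run_agent_loop(fake_llm, fake_tools, "What is the weather in Miami?")
print(history[-1]["content"])  # It is thunderstorms in Miami.
```

The `max_steps` guard plays a role similar to a recursion limit: it stops a model that keeps requesting tools from looping forever.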

### Run the Agent

In [None]:
function_calling_agent(graph, "What is the weather in Miami?")

### Run a Langfuse Experiment on a Small Dataset

In [None]:
from langfuse import get_client
from langfuse import Evaluation

def my_task(*, item, **kwargs):
    """Run the agent on one dataset item and return the final message text."""
    user_message = HumanMessage(item["input"])
    response = graph.invoke({"messages": [user_message]})
    return response.get("messages")[-1].content

# Initialize client
langfuse = get_client()

# Run experiment on local data
local_data = [
    {"input": "What were the IBM stock prices on September 5, 2025?", "expected_output": "On September 5, 2025, the stock price of IBM ranged from a low of $245.45 to a high of $249.03."},
]

# Define evaluation functions
def accuracy_evaluator(*, input, output, expected_output, metadata, **kwargs):
    if expected_output and expected_output.lower() in output.lower():
        return Evaluation(name="accuracy", value=1.0, comment="Correct answer found")

    return Evaluation(name="accuracy", value=0.0, comment="Incorrect answer")


def length_evaluator(*, input, output, **kwargs):
    return Evaluation(name="response_length", value=len(output), comment=f"Response has {len(output)} characters")

result = langfuse.run_experiment(
    name="Stock Market Quiz",
    description="Testing basic functionality",
    data=local_data,
    task=my_task,
    evaluators=[accuracy_evaluator, length_evaluator]
)

### Review experiment in Langfuse UI

After running the experiment, you can view the results in the Langfuse dashboard.
The experiment tracked two metrics:

- **Accuracy**: 1.0 if the response contains the full expected answer (case-insensitive substring match), otherwise 0.0
- **Response Length**: The character count of each response, a rough proxy for verbosity

![experiment](experiment.png)
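The `accuracy_evaluator` above scores 1.0 only when the entire expected sentence appears verbatim in the response, so a correct answer phrased differently scores 0.0. One looser alternative, sketched below, checks only that every number in the expected answer also appears in the response. `numeric_match_score` is a hypothetical helper; returning its value from an evaluator as `Evaluation(name=..., value=...)`, like the evaluators above, is an assumption about how you would plug it in.

```python
import re
from typing import List

# Matches numbers such as 245.45, 2025, or 1,234.5 (thousands separators allowed)
_NUMBER = re.compile(r"\d[\d,]*(?:\.\d+)?")


def _numbers(text: str) -> List[float]:
    return [float(m.replace(",", "")) for m in _NUMBER.findall(text)]


def numeric_match_score(output: str, expected_output: str) -> float:
    """Fraction of numbers in `expected_output` that also appear in `output`."""
    expected = _numbers(expected_output)
    if not expected:
        return 0.0
    found = _numbers(output)
    hits = sum(1 for value in expected if any(abs(value - f) < 1e-6 for f in found))
    return hits / len(expected)


expected = ("On September 5, 2025, the stock price of IBM ranged from "
            "a low of $245.45 to a high of $249.03.")
print(numeric_match_score("IBM traded between 245.45 and 249.03 on 2025-09-05.", expected))  # 1.0
print(numeric_match_score("IBM traded between 245.45 and 250.00 on 2025-09-05.", expected))  # 0.75
```

This still cannot tell whether the numbers play the right roles (low vs. high); that kind of semantic check is better suited to an approach such as LLM-as-a-Judge.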

## 3. Send a Trace to Langfuse Manually

### Managing Langfuse observations for LangGraph events

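Langfuse records three core types of observations, each serving a specific purpose. (Recent SDK versions also offer more specific span-like types such as `agent`, `tool`, `chain`, and `retriever`, which the tracker below uses via `as_type`.)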

1. **Events**

Events are the basic building blocks used to track discrete occurrences in a trace. They represent point-in-time actions without duration.

Use cases:
- Logging custom application events
- Recording state changes
- Tracking decision points or checkpoints

2. **Spans**

Spans represent units of work that take time.
Each span has a start time and an end time, so spans show where time is spent within a trace.

Common span types:
- Chain executions (sequences of operations)
- Tool invocations (external API calls, database queries)
- Retriever operations (vector database searches)
- Agent decision loops

3. **Generations**

Generations are spans specialized for AI model calls. In addition to timing, they can capture:

- Model name and version
- Model parameters (temperature, max tokens, top-p)
- Complete prompt or message history
- Generated output text
- Token usage (input tokens, output tokens, total tokens)
- Cost calculation based on token usage
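To make the distinction concrete, the toy model below represents the three kinds as plain dataclasses: an event has a single timestamp, a span has a start and an end, and a generation is a span that also records model details and token usage. This is a conceptual illustration only; the class and field names are hypothetical and are not the Langfuse SDK's types.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Event:
    name: str
    timestamp: float  # a single point in time, no duration


@dataclass
class Span:
    name: str
    start_time: float
    end_time: Optional[float] = None
    children: List[object] = field(default_factory=list)

    @property
    def duration(self) -> Optional[float]:
        # A span that has not ended yet has no duration
        return None if self.end_time is None else self.end_time - self.start_time


@dataclass
class Generation(Span):
    model: str = ""
    usage: Dict[str, int] = field(default_factory=dict)


# One agent step: a span containing an LLM generation and a checkpoint event
step = Span(name="llm", start_time=0.0, end_time=2.0)
step.children.append(
    Generation(name="chat_model", start_time=0.5, end_time=1.5,
               model="ibm/granite-4-h-small",
               usage={"input": 120, "output": 30, "total": 150})  # made-up token counts
)
step.children.append(Event(name="tool_call_requested", timestamp=1.6))
print(step.duration, step.children[0].duration)  # 2.0 1.0
```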

In [None]:
class LangfuseObservationTracker:
    """
    Observation tracker

    This class manages Langfuse observations for LangGraph events,
    tracking run IDs, parent relationships, and observation lifecycles
    """

    def __init__(self, client, root_span=None):
        """
        Initialize the observation tracker.

        Args:
            client: The Langfuse client instance
            root_span: Optional root span for the trace
        """
        self.client = client
        self.root_span = root_span

        # Track active observations by run_id (mirrors LangChain's self.runs)
        self.runs: Dict[str, Any] = {}

        # Track parent-child relationships (mirrors LangChain's _child_to_parent_run_id_map)
        self._child_to_parent_run_id_map: Dict[str, Optional[str]] = {}

        # Track context managers for proper cleanup
        self.context_managers: Dict[str, Any] = {}

        # Track completion start times for streaming (mirrors LangChain's updated_completion_start_time_memo)
        self.updated_completion_start_time_memo: set = set()

        # Store the last trace ID
        self.last_trace_id: Optional[str] = None

    def _get_observation_type(self, event_type: str, name: str, serialized: Optional[Dict] = None) -> str:
        """
        Determine Langfuse observation type from event type and name.

        Args:
            event_type: The LangGraph event type
            name: The component name
            serialized: Optional serialized component data

        Returns:
            The appropriate Langfuse observation type
        """
        if "tool" in event_type:
            return "tool"
        elif "chat_model" in event_type or "llm" in event_type:
            return "generation"
        elif "retriever" in event_type:
            return "retriever"
        elif "chain" in event_type:
            # Check if it's an agent
            if "agent" in name.lower():
                return "agent"
            return "chain"
        return "span"

    def _get_parent_observation(self, parent_run_id: Optional[str]):
        """
        Get the parent observation for nesting.

        Args:
            parent_run_id: The parent run ID

        Returns:
            The parent observation or client for starting new observations
        """
        if parent_run_id and parent_run_id in self.runs:
            return self.runs[parent_run_id]
        if self.root_span:
            return self.root_span
        return self.client

    def _attach_observation(self, run_id: str, observation, context_manager=None):
        """
        Store an observation for tracking.

        Args:
            run_id: The run ID
            observation: The Langfuse observation
            context_manager: Optional context manager for cleanup
        """
        self.runs[run_id] = observation
        if context_manager:
            self.context_managers[run_id] = context_manager

        if hasattr(observation, 'trace_id'):
            self.last_trace_id = observation.trace_id

    def _detach_observation(self, run_id: str):
        """
        Remove and return an observation.

        Args:
            run_id: The run ID

        Returns:
            Tuple of (observation, context_manager) or (None, None)
        """
        observation = self.runs.pop(run_id, None)
        context_manager = self.context_managers.pop(run_id, None)
        return observation, context_manager

    def on_chain_start(
        self,
        run_id: str,
        name: str,
        inputs: Any,
        parent_run_id: Optional[str] = None,
        metadata: Optional[Dict] = None,
        tags: Optional[list] = None,
    ):
        """
        Handle chain/node start event.
        """
        self._child_to_parent_run_id_map[run_id] = parent_run_id

        obs_type = self._get_observation_type("chain", name)

        # Build metadata
        span_metadata = {}
        if tags:
            span_metadata["tags"] = tags
        if metadata:
            span_metadata.update(metadata)

        # Create observation
        obs_context = self.client.start_as_current_observation(
            as_type=obs_type,
            name=name,
            input=inputs,
            metadata=span_metadata if span_metadata else None,
        )
        obs = obs_context.__enter__()
        self._attach_observation(run_id, obs, obs_context)

        return obs

    def on_chain_end(
        self,
        run_id: str,
        outputs: Any,
    ):
        """
        Handle chain/node end event.
        """
        obs, obs_context = self._detach_observation(run_id)

        if obs is not None:
            obs.update(output=outputs)
            if obs_context:
                obs_context.__exit__(None, None, None)

        return obs

    def on_chain_error(
        self,
        run_id: str,
        error: BaseException,
    ):
        """
        Handle chain/node error event.
        """
        obs, obs_context = self._detach_observation(run_id)

        if obs is not None:
            obs.update(
                level="ERROR",
                status_message=str(error),
            )
            if obs_context:
                obs_context.__exit__(None, None, None)

        return obs

    def on_chat_model_start(
        self,
        run_id: str,
        name: str,
        messages: Any,
        parent_run_id: Optional[str] = None,
        model_name: Optional[str] = None,
        model_parameters: Optional[Dict] = None,
        metadata: Optional[Dict] = None,
    ):
        """
        Handle LLM/chat model start event.
        """
        self._child_to_parent_run_id_map[run_id] = parent_run_id

        obs_context = self.client.start_as_current_observation(
            as_type="generation",
            name=name,
            input=messages,
            model=model_name,
            model_parameters=model_parameters,
            metadata=metadata,
        )
        obs = obs_context.__enter__()
        self._attach_observation(run_id, obs, obs_context)

        return obs

    def on_chat_model_end(
        self,
        run_id: str,
        output: Any,
        usage: Optional[Dict] = None,
        model: Optional[str] = None,
    ):
        """
        Handle LLM/chat model end event.
        """
        obs, obs_context = self._detach_observation(run_id)

        if obs is not None:
            update_kwargs = {"output": output}
            if usage:
                update_kwargs["usage_details"] = usage
            if model:
                update_kwargs["model"] = model

            obs.update(**update_kwargs)
            if obs_context:
                obs_context.__exit__(None, None, None)

        # Clean up streaming memo
        self.updated_completion_start_time_memo.discard(run_id)

        return obs

    def on_llm_new_token(self, run_id: str):
        """
        Handle streaming token event - update completion start time.
        """
        if run_id in self.runs and run_id not in self.updated_completion_start_time_memo:
            obs = self.runs[run_id]
            # Use a timezone-aware timestamp instead of a private langfuse helper
            from datetime import datetime, timezone
            obs.update(completion_start_time=datetime.now(timezone.utc))
            self.updated_completion_start_time_memo.add(run_id)

    def on_tool_start(
        self,
        run_id: str,
        name: str,
        input_str: Any,
        parent_run_id: Optional[str] = None,
        metadata: Optional[Dict] = None,
    ):
        """
        Handle tool start event.
        """
        self._child_to_parent_run_id_map[run_id] = parent_run_id

        obs_context = self.client.start_as_current_observation(
            as_type="tool",
            name=name,
            input=input_str,
            metadata=metadata,
        )
        obs = obs_context.__enter__()
        self._attach_observation(run_id, obs, obs_context)

        return obs

    def on_tool_end(
        self,
        run_id: str,
        output: Any,
    ):
        """
        Handle tool end event.
        """
        obs, obs_context = self._detach_observation(run_id)

        if obs is not None:
            obs.update(output=output)
            if obs_context:
                obs_context.__exit__(None, None, None)

        return obs

    def on_tool_error(
        self,
        run_id: str,
        error: BaseException,
    ):
        """
        Handle tool error event.
        """
        obs, obs_context = self._detach_observation(run_id)

        if obs is not None:
            obs.update(
                level="ERROR",
                status_message=str(error),
            )
            if obs_context:
                obs_context.__exit__(None, None, None)

        return obs

    def cleanup(self):
        """Clean up any remaining observations."""
        # Exit still-open context managers newest-first (LIFO), as nested `with` blocks would
        for run_id in reversed(list(self.runs.keys())):
            _, obs_context = self._detach_observation(run_id)
            if obs_context:
                try:
                    obs_context.__exit__(None, None, None)
                except Exception:
                    pass
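The tracker records `run_id → parent_run_id` in `_child_to_parent_run_id_map`, but as far as this code shows the map is not used when creating observations (`_get_parent_observation` is never called); nesting instead follows whichever observation is current when `start_as_current_observation` is entered. The map is still handy for debugging, and the sketch below rebuilds and prints the hierarchy from such a mapping. `print_run_tree` and the sample run IDs are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def print_run_tree(child_to_parent: Dict[str, Optional[str]], names: Dict[str, str]) -> List[str]:
    """Render runs as an indented tree, given a child -> parent run_id mapping."""
    children: Dict[Optional[str], List[str]] = defaultdict(list)
    for child, parent in child_to_parent.items():
        # Parents that are not keys themselves (e.g. the root run) become top-level
        children[parent if parent in child_to_parent else None].append(child)

    lines: List[str] = []

    def walk(run_id: str, depth: int) -> None:
        lines.append("  " * depth + names.get(run_id, run_id))
        for kid in children.get(run_id, []):
            walk(kid, depth + 1)

    for root in children.get(None, []):
        walk(root, 0)
    print("\n".join(lines))
    return lines


# Hypothetical run IDs; "run-graph" is not a key, so its children are treated as top-level
child_to_parent = {
    "run-llm": "run-graph",
    "run-chat": "run-llm",
    "run-tools": "run-graph",
    "run-weather": "run-tools",
}
names = {"run-llm": "llm", "run-chat": "chat_model",
         "run-tools": "tools", "run-weather": "get_current_weather"}
print_run_tree(child_to_parent, names)
```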



In [None]:
def _parse_usage_from_response(output) -> Optional[Dict]:
    """
    Parse usage information from LLM response.
    """
    if not output:
        return None

    usage = None

    # Try to get from response_metadata
    if hasattr(output, "response_metadata"):
        response_metadata = output.response_metadata

        # Standard LangChain format
        token_usage = response_metadata.get("token_usage", {})
        if token_usage:
            usage = {
                "input": token_usage.get("prompt_tokens", 0),
                "output": token_usage.get("completion_tokens", 0),
                "total": token_usage.get("total_tokens", 0),
            }

        # Alternative formats
        if not usage and "usage" in response_metadata:
            raw_usage = response_metadata["usage"]
            if isinstance(raw_usage, dict):
                usage = {
                    "input": raw_usage.get("input_tokens", raw_usage.get("prompt_tokens", 0)),
                    "output": raw_usage.get("output_tokens", raw_usage.get("completion_tokens", 0)),
                    "total": raw_usage.get("total_tokens", 0),
                }

    # Fall back to LangChain's standard usage_metadata field (a dict, not an object)
    if not usage and hasattr(output, "usage_metadata"):
        usage_metadata = output.usage_metadata
        if usage_metadata:
            usage = {
                "input": usage_metadata.get("input_tokens", 0),
                "output": usage_metadata.get("output_tokens", 0),
                "total": usage_metadata.get("total_tokens", 0),
            }

    return usage


def _format_llm_output(output) -> Dict:
    """
    Format LLM output for Langfuse.
    """
    if not output:
        return output

    if hasattr(output, "content"):
        formatted = {"content": output.content}

        if hasattr(output, "tool_calls") and output.tool_calls:
            formatted["tool_calls"] = [
                {
                    "name": tc.get("name", ""),
                    "args": tc.get("args", {}),
                    "id": tc.get("id", ""),
                }
                for tc in output.tool_calls
            ]
            formatted["finish_reason"] = "tool_calls"

        return formatted

    return output



In [None]:
async def test_event_generator(graph: CompiledStateGraph, user_input: str):
    """
    Async test function with proper Langfuse tracing using callback handler pattern.

    This function implements Langfuse tracing:
    1. Uses LangfuseObservationTracker to manage observation lifecycles
    2. Tracks run IDs and parent relationships
    3. Creates proper nested observations (chain → generation → tool)
    4. Handles start/end events for each observation type

    The tracing pattern mirrors LangChain's approach:
    - on_chain_start/end: Creates chain observations for nodes
    - on_chat_model_start/end: Creates generation observations for LLM calls
    - on_tool_start/end: Creates tool observations for tool executions
    """
    print("\n" + "=" * 80)
    print("TESTING MANUAL TRACING OF AGENT EXECUTION EVENTS")
    print("=" * 80 + "\n")

    # Create test input
    user_message = HumanMessage(user_input)
    input_state = State(messages=[user_message])
    session_id = str(uuid.uuid4())

    print(f"User input: {user_input}")
    print(f"Session ID: {session_id}")
    print("\nStreaming events with Langfuse tracing...\n")

    # =========================================================================
    # ROOT TRACE: Create the main trace for the entire execution
    # =========================================================================
    with langfuse_client.start_as_current_observation(
        as_type="chain",
        name="Manual_Trace",
        input={"messages": [{"content": user_input, "type": "human"}]},
        metadata={"session_id": session_id}
    ) as root_span:

        # Initialize observation tracker
        tracker = LangfuseObservationTracker(langfuse_client, root_span)

        final_output = None
        step_count = 0

        # =====================================================================
        # STREAM EVENTS: Process events
        # =====================================================================
        async for event in graph.astream_events(input_state, version="v2"):
            event_type = event["event"]
            name = event.get("name", "unknown")
            run_id = event.get("run_id", "")
            parent_ids = event.get("parent_ids", [])
            data = event.get("data", {})
            metadata = event.get("metadata", {})
            tags = event.get("tags", [])

            # Determine parent run_id from parent_ids list
            parent_run_id = parent_ids[-1] if parent_ids else None

            # =================================================================
            # ON_CHAIN_START
            # =================================================================
            if event_type == "on_chain_start":
                # The top-level graph run has no parents; reuse root_span for it
                # (its name is the graph's name, e.g. "LangGraph", not "Manual_Trace")
                if not parent_ids:
                    tracker._attach_observation(run_id, root_span)
                    continue

                step_count += 1
                print(f"[CHAIN START] {name} (run_id: {run_id[:8]}...)")

                tracker.on_chain_start(
                    run_id=run_id,
                    name=name,
                    inputs=data.get("input"),
                    parent_run_id=parent_run_id,
                    metadata=metadata,
                    tags=tags,
                )

            # =================================================================
            # ON_CHAIN_END
            # =================================================================
            elif event_type == "on_chain_end":
                # The top-level graph run's output is the final agent state
                if not parent_ids:
                    final_output = data.get("output")
                    continue

                print(f"[CHAIN END] {name}")

                tracker.on_chain_end(
                    run_id=run_id,
                    outputs=data.get("output"),
                )

            # =================================================================
            # ON_CHAIN_ERROR
            # =================================================================
            elif event_type == "on_chain_error":
                print(f"[CHAIN ERROR] {name}")

                error = data.get("error", Exception("Unknown error"))
                tracker.on_chain_error(run_id=run_id, error=error)

            # =================================================================
            # ON_CHAT_MODEL_START
            # =================================================================
            elif event_type == "on_chat_model_start":
                print(f"[LLM START] {name}")

                # Extract model info from metadata
                model_name = metadata.get("ls_model_name", "unknown")

                # Extract messages
                messages = data.get("messages", data.get("input", []))

                tracker.on_chat_model_start(
                    run_id=run_id,
                    name=name,
                    messages=messages,
                    parent_run_id=parent_run_id,
                    model_name=model_name,
                    metadata=metadata,
                )

            # =================================================================
            # ON_CHAT_MODEL_STREAM
            # =================================================================
            elif event_type == "on_chat_model_stream":
                # Update completion start time on first token
                tracker.on_llm_new_token(run_id)

                chunk = data.get("chunk")
                if chunk and hasattr(chunk, "content") and chunk.content:
                    print(f"[LLM STREAM] {chunk.content[:50]}...")

            # =================================================================
            # ON_CHAT_MODEL_END
            # =================================================================
            elif event_type == "on_chat_model_end":
                print(f"[LLM END] {name}")

                output = data.get("output")

                # Parse usage
                usage = _parse_usage_from_response(output)

                # Format output
                formatted_output = _format_llm_output(output)

                # Extract model from response if available
                model = None
                if output and hasattr(output, "response_metadata"):
                    model = output.response_metadata.get("model_name")

                tracker.on_chat_model_end(
                    run_id=run_id,
                    output=formatted_output,
                    usage=usage,
                    model=model,
                )

            # =================================================================
            # ON_LLM_ERROR
            # =================================================================
            elif event_type == "on_llm_error":
                print(f"[LLM ERROR] {name}")

                error = data.get("error", Exception("Unknown error"))
                obs, obs_context = tracker._detach_observation(run_id)
                if obs is not None:
                    obs.update(level="ERROR", status_message=str(error))
                    if obs_context:
                        obs_context.__exit__(None, None, None)

            # =================================================================
            # ON_TOOL_START
            # =================================================================
            elif event_type == "on_tool_start":
                print(f"[TOOL START] {name}")

                tracker.on_tool_start(
                    run_id=run_id,
                    name=name,
                    input_str=data.get("input"),
                    parent_run_id=parent_run_id,
                    metadata=metadata,
                )

            # =================================================================
            # ON_TOOL_END
            # =================================================================
            elif event_type == "on_tool_end":
                print(f"[TOOL END] {name}")

                tracker.on_tool_end(
                    run_id=run_id,
                    output=data.get("output"),
                )

            # =================================================================
            # ON_TOOL_ERROR
            # =================================================================
            elif event_type == "on_tool_error":
                print(f"[TOOL ERROR] {name}")

                error = data.get("error", Exception("Unknown error"))
                tracker.on_tool_error(run_id=run_id, error=error)

            # =================================================================
            # ON_RETRIEVER_START
            # =================================================================
            elif event_type == "on_retriever_start":
                print(f"[RETRIEVER START] {name}")

                obs_context = langfuse_client.start_as_current_observation(
                    as_type="retriever",
                    name=name,
                    input=data.get("input"),
                    metadata=metadata,
                )
                obs = obs_context.__enter__()
                tracker._attach_observation(run_id, obs, obs_context)

            # =================================================================
            # ON_RETRIEVER_END
            # =================================================================
            elif event_type == "on_retriever_end":
                print(f"[RETRIEVER END] {name}")

                obs, obs_context = tracker._detach_observation(run_id)
                if obs is not None:
                    obs.update(output=data.get("output"))
                    if obs_context:
                        obs_context.__exit__(None, None, None)

        # =====================================================================
        # UPDATE ROOT SPAN WITH FINAL OUTPUT
        # =====================================================================
        root_span.update(output=final_output)

        # =====================================================================
        # CLEANUP
        # =====================================================================
        tracker.cleanup()

        # Extract final response
        if final_output and "messages" in final_output:
            final_message = final_output["messages"][-1]
            final_response = final_message.content if hasattr(final_message, "content") else str(final_message)
        else:
            final_response = str(final_output)

    # =========================================================================
    # FLUSH: Ensure all observations are sent to Langfuse
    # =========================================================================
    langfuse_client.flush()

    print("\n" + "-" * 80)
    print(f"Final response: {final_response}")
    print(f"Total chain steps: {step_count}")
    print(f"Trace ID: {tracker.last_trace_id}")
    print("=" * 80 + "\n")

    return {"session_id": session_id, "output": final_response, "trace_id": tracker.last_trace_id}
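The long `if`/`elif` chain in `test_event_generator` can also be written as a dispatch table keyed by event type, which keeps each handler small and makes unhandled event types visible. The sketch below shows only the routing pattern, with print-only handlers and hand-written events; `dispatch_events` and the handlers are hypothetical, and in the notebook each handler would call the matching `LangfuseObservationTracker` method. It skips the outermost run by its empty `parent_ids` list, which is how `astream_events(version="v2")` marks the root runnable.

```python
from typing import Any, Callable, Dict, Iterable, List

StreamEvent = Dict[str, Any]


def dispatch_events(
    events: Iterable[StreamEvent], handlers: Dict[str, Callable[[StreamEvent], None]]
) -> List[str]:
    """Route each event to a handler by its "event" type; return the types nobody handled."""
    unhandled: List[str] = []
    for event in events:
        if not event.get("parent_ids"):
            continue  # the outermost graph run is represented by the root span instead
        handler = handlers.get(event["event"])
        if handler is None:
            unhandled.append(event["event"])
        else:
            handler(event)
    return unhandled


handlers = {
    "on_chain_start": lambda e: print(f"[CHAIN START] {e['name']}"),
    "on_tool_start": lambda e: print(f"[TOOL START] {e['name']}"),
}
events = [
    {"event": "on_chain_start", "name": "LangGraph", "parent_ids": []},
    {"event": "on_chain_start", "name": "llm", "parent_ids": ["root"]},
    {"event": "on_tool_start", "name": "get_current_weather", "parent_ids": ["root", "tools"]},
    {"event": "on_chat_model_stream", "name": "chat_model", "parent_ids": ["root", "llm"]},
]
print(dispatch_events(events, handlers))  # ['on_chat_model_stream']
```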

### Review Langfuse trace in Langfuse UI

In [None]:
user_input = "What is the weather in Miami?"
langfuse_client = get_client()
asyncio.run(test_event_generator(graph=graph, user_input=user_input))

![manual_trace](langgraph_manual_trace.png)

## 4. Send a Trace to Langfuse Automatically

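### Using CallbackHandler for Simplicity

Langfuse's LangChain `CallbackHandler` hooks into LangChain's callback system (the same run lifecycle the manual tracker followed above) and creates the nested observations automatically, so passing it in the run `config` replaces all of the manual tracking code.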

In [None]:
from langfuse.langchain import CallbackHandler

In [None]:
langfuse_handler = CallbackHandler()

In [None]:
user_input = "What is the weather in Miami?"
config = {"callbacks": [langfuse_handler]}
input_state = State(messages=[HumanMessage(user_input)])

In [None]:
result = graph.invoke(input_state, config=config)
print(result.get('messages')[-1].content)

# Send any buffered observations to Langfuse before checking the UI
langfuse_client.flush()
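With the callback handler, trace attributes such as a session ID are passed through the LangChain `config` rather than set on a root span. The sketch below builds such a config. It assumes the Langfuse v3 LangChain integration, which reads the `langfuse_session_id`, `langfuse_user_id`, and `langfuse_tags` metadata keys (check the Langfuse docs for your SDK version). `build_trace_config` is a hypothetical helper, and a placeholder object stands in for the handler so the snippet runs on its own.

```python
import uuid
from typing import Any, Dict, List, Optional


def build_trace_config(
    handler: Any,
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build a LangChain config dict that attaches Langfuse trace attributes."""
    metadata: Dict[str, Any] = {"langfuse_session_id": session_id or str(uuid.uuid4())}
    if user_id:
        metadata["langfuse_user_id"] = user_id
    if tags:
        metadata["langfuse_tags"] = list(tags)
    return {"callbacks": [handler], "metadata": metadata}


# In the notebook, pass the real handler instead of the placeholder:
# config = build_trace_config(langfuse_handler, tags=["agent-eval"])
# result = graph.invoke(input_state, config=config)
config_preview = build_trace_config(handler=object(), user_id="demo-user", tags=["agent-eval"])
print(sorted(config_preview["metadata"]))  # ['langfuse_session_id', 'langfuse_tags', 'langfuse_user_id']
```

Reusing one session ID across several `invoke` calls groups their traces into a single session in the Langfuse UI, which is useful when replaying multi-turn test cases.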

### Review Langfuse automatic trace in Langfuse UI

![automatic_trace](langgraph_automatic_trace.png)