# Notebook Purpose

This notebook demonstrates the evaluation of an AI agent using the LangFuse platform. It is designed to benchmark agent performance on a golden dataset, track execution traces, and apply custom evaluation metrics.

## Key Tasks
- Connect to LangFuse and verify authentication
- Create and manage evaluation datasets and items
- Define and run the target agent function
- Extract and analyze agent trajectories and context
- Implement custom evaluation metrics for agent outputs
- Log and score results using LangFuse's experiment tracking

## Usage Type
This notebook is intended for **research and prototyping**. It is not optimized for production use.

## 1. LangFuse Setup and Authentication

In [None]:
# Install required package for Gemini API (NOT langchain-google-vertexai)
!pip install langchain-google-genai

# Install Google Generative AI package
!pip install -q -U google-generativeai openevals


In [15]:
from dotenv import load_dotenv

load_dotenv()

True

In [16]:
import os
# Confirm the secret key is loaded without echoing its value into the notebook output
os.getenv("LANGFUSE_SECRET_KEY") is not None

True

In [17]:
# Import LangFuse client library for experiment tracking
from langfuse import get_client


In [18]:
# Initialize LangFuse client and verify authentication
langfuse = get_client()

# Verify connection (not recommended for production, as this is synchronous)
if langfuse.auth_check():
    print("Langfuse client is authenticated and ready!")
else:
    print("Authentication failed. Please check your credentials and host.")

Langfuse client is authenticated and ready!


## 2. Dataset Creation and Management


In [19]:
# Define the name for the evaluation dataset
dataset_name = "golden_dataset_agent_evals_with_tools"

#### Create the dataset if it does not exist

In [20]:
from datetime import datetime
# Create a new dataset in LangFuse for agent evaluation
try:
    langfuse.get_dataset(dataset_name)
    print("Dataset already exists")
except Exception as e:
    print("Dataset does not exist, Creating Dataset")
    langfuse.create_dataset(
        name=dataset_name,
        description="Evaluation of AI enabler data",
        metadata={
            "author": "UserID:Saumya",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "type": "benchmark"
        }
    )

Dataset already exists


In [21]:
!pip install openpyxl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




In [22]:
# Load the golden dataset for evaluation from an Excel file (read via openpyxl)
import pandas as pd

with open("langfuse_data/golden_dataset/Ground_truth_golden_dataset.xlsx", "rb") as f:
    golden_dataset = pd.read_excel(f)
    golden_data_list = golden_dataset.to_dict(orient="records")

In [23]:
# Display the first item in the loaded golden dataset for inspection
golden_data_list[0]["Question"]

'What can i get for breakfast in Paris?'

In [24]:
# Check the structure and types of the data
print("First item:")
print(golden_data_list[0])
print("\nData types:")
for key, value in golden_data_list[0].items():
    print(f"{key}: {type(value)} = {repr(value)}")

First item:
{'Question': 'What can i get for breakfast in Paris?', 'Answer': '\ufeffCroissant, pain au chocolat, and café au lait', 'Document': './Paris.pdf', 'Paragraph': '§Food & Drink', 'Tool': 'retriever_tool'}

Data types:
Question: <class 'str'> = 'What can i get for breakfast in Paris?'
Answer: <class 'str'> = '\ufeffCroissant, pain au chocolat, and café au lait'
Document: <class 'str'> = './Paris.pdf'
Paragraph: <class 'str'> = '§Food & Drink'
Tool: <class 'str'> = 'retriever_tool'


In [25]:
# Add each item from the golden dataset to LangFuse as a dataset item
# Fetch the dataset once and only add items if it is currently empty
existing_items = langfuse.get_dataset(dataset_name).items
if not existing_items:
    print("Dataset is empty, adding items from golden dataset...")
    for item in golden_data_list:
        # Clean the answer (remove BOM character if present)
        answer = str(item["Answer"]).lstrip('\ufeff')

        # Convert tool string to list format for trajectory matching
        tool_value = item["Tool"]
        trajectory = [tool_value.lower()] if isinstance(tool_value, str) else tool_value

        langfuse.create_dataset_item(
            dataset_name=dataset_name,
            input=str(item["Question"]),
            expected_output={
                "expected_answer": answer,
                "trajectory": trajectory,
                "golden_context": str(item["Paragraph"])
            },
            metadata={
                "agent_name": "Datamics Agent",
                "input_type": "text",
                "output_type": "text, list_trajectory"
            }
        )
else:
    print("Dataset already has items, skipping addition.")

Dataset already has items, skipping addition.
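
Empty spreadsheet cells come back from `pd.read_excel` as float `NaN`, and `str(NaN)` is the literal string `'nan'`, which is how a value like `'golden_context': 'nan'` can end up in a dataset item. Below is a minimal sketch of a record-cleaning helper that maps missing cells to `None` (or an empty trajectory) instead. The names `build_expected_output` and `_is_missing` are hypothetical and not part of this notebook or of LangFuse.

```python
import math
from typing import Any, Dict


def _is_missing(value: Any) -> bool:
    """True for None and for float NaN (how pandas represents empty cells)."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def build_expected_output(record: Dict[str, Any]) -> Dict[str, Any]:
    """Turn one golden-dataset row into the expected_output payload (hypothetical helper)."""
    raw_answer = record.get("Answer")
    # Strip a leading BOM, as the notebook does, and avoid the string 'nan'
    answer = "" if _is_missing(raw_answer) else str(raw_answer).lstrip("\ufeff")

    tool = record.get("Tool")
    if _is_missing(tool):
        trajectory = []
    elif isinstance(tool, str):
        trajectory = [tool.lower()]
    else:
        trajectory = list(tool)

    paragraph = record.get("Paragraph")
    golden_context = None if _is_missing(paragraph) else str(paragraph)

    return {
        "expected_answer": answer,
        "trajectory": trajectory,
        "golden_context": golden_context,
    }


row = {"Answer": "\ufeffCroissant", "Paragraph": float("nan"), "Tool": "retriever_tool"}
print(build_expected_output(row))
```

Whether a missing paragraph should be stored as `None`, an empty string, or left out entirely depends on how the downstream evaluators read `golden_context`.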


In [26]:
# Note: You can create new evaluation dataset items for queries with poor performance (e.g., high hallucination or low helpfulness).
# Such traces can be filtered and added to the existing dataset or a new one for further analysis.

## 3. Target Function Definition and Agent Invocation


In [27]:
import os
from graph import app
from langfuse.langchain import CallbackHandler

# Initialize Langfuse CallbackHandler for Langchain (tracing)
langfuse_handler = CallbackHandler()


In [28]:
# Helper function to extract tool call names from agent messages
from typing import Any, List

def extract_tool_calls(messages: List[Any]) -> List[str]:
    """
    Extract tool call names from agent messages.
    Args:
        messages (List[Any]): List of agent messages, each may contain tool_calls.
    Returns:
        List[str]: List of tool call names (lowercase).
    """
    tool_call_names = []
    for message in messages:
        # Check if message is a dict and has tool_calls
        if isinstance(message, dict) and message.get("tool_calls"):
            tool_call_names.extend([call["name"].lower() for call in message["tool_calls"]])
        # Check if message is an object with tool_calls attribute
        elif hasattr(message, "tool_calls") and message.tool_calls:
            tool_call_names.extend([call["name"].lower() for call in message.tool_calls])
    
    return tool_call_names

In [4]:
# Helper function to extract context documents from agent response messages
import re

def extract_qdrant_context(response):
    """
    Extracts context documents from agent response messages.
    Only messages whose name is "qdrant_retriever" are scanned; if the retriever
    tool is registered under a different name, no documents will be returned.
    Args:
        response (dict): Agent response containing messages.
    Returns:
        dict: Dictionary with a list of document page contents.
    """
    documents = []
    for msg in response.get("messages", []):
        if (isinstance(msg, dict) and msg.get("name") == "qdrant_retriever") or \
            (hasattr(msg, "name") and msg.name == "qdrant_retriever"):
            content = msg.get("content") if isinstance(msg, dict) else getattr(msg, "content", "")
            # Skip non-string content (e.g. None or a list of content blocks)
            if isinstance(content, str):
                matches = re.findall(r"page_content='(.*?)'", content, re.DOTALL)
                documents.extend(matches)

    return {"documents": documents}
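
The pattern `page_content='(.*?)'` stops at the first single quote, and Python switches a string's repr to double quotes when the text contains an apostrophe, so some documents can be truncated or missed. Below is a minimal sketch of a more tolerant pattern, assuming the tool message content is a stringified list of `Document` reprs (as the regex above implies). `extract_page_contents` is a hypothetical helper name.

```python
import re
from typing import List

# Match page_content='...' or page_content="...", allowing backslash-escaped
# characters inside the quoted text. Escapes are kept as-is, not decoded.
_PAGE_CONTENT = re.compile(r"page_content=(['\"])((?:\\.|(?!\1).)*)\1", re.DOTALL)


def extract_page_contents(tool_message_text: str) -> List[str]:
    """Return the quoted page_content values found in a stringified tool result."""
    return [match.group(2) for match in _PAGE_CONTENT.finditer(tool_message_text)]


sample = (
    "[Document(metadata={'page': 1}, page_content='Croissant and coffee'), "
    "Document(metadata={}, page_content=\"Paris's bakeries open early\")]"
)
print(extract_page_contents(sample))
```

Parsing reprs with a regex remains fragile. If the retriever tool can return structured data instead of a string, reading `page_content` directly from the documents is the more robust option.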

In [5]:
# Define the application logic to be evaluated. This function is called for each dataset item.
def target(inputs: dict, langfuse_handler) -> dict:
    """
    Invokes the agent graph with the provided inputs and LangFuse callback handler.
    Args:
        inputs (dict): Input data for the agent (expects a 'messages' key).
        langfuse_handler: LangFuse callback handler for tracing.
    Returns:
        dict: Dictionary containing the agent's answer, retrieved context, and tool call trajectory.
    """
    response = app.invoke(
        inputs,
        config={"callbacks": [langfuse_handler]}
    )
    context = extract_qdrant_context(response)
    return {
        "answer": response["messages"][-1].content,
        "context": context,
        "trajectory": extract_tool_calls(response["messages"]),
    }

In [6]:
# Test the target function with a sample question to verify agent response and context extraction
target({"messages": "What is famous in Paris?"}, langfuse_handler)

{'answer': [{'type': 'text',
   'text': "I couldn't find information about what is famous in Paris in my internal documents. However, I found some information about Barcelona. Would you like to know what is famous in Barcelona?",
   'extras': {'signature': 'Ci4BcsjafKukgGYFp+w6n5sB/bQC8HGcU+URn8RUXhaA/r4r/yNam1tpT8v10cGECocBAXLI2nwjDaSBezkFYukq+/rZVTd71Yu7jH+DEXKUC5Z3yfuHA5k53DB3mcaf7unMVKYpQab9o2r/cFIgV/DlfuTqP7vl3UQTwWvyFmnEebR4oUTz7dm0+J6RpsEYAHdJa/PdTL1gA6yyX1qv2Y6dqme5Ac3O2iTYio8JMLH55rtlZHzfjun+CtwBAXLI2nwC2xbPqEvaKM7GD4XXiuyRH65rLEazCta3Or614Lrw+DqTggz/xdQ5wf//lfHe3TXKHqt6hPGIjoHyfWVJdP45mtfBeA9a87QeZ6g4ef66x5kK4z/ZI5kRTqcgItbJ/hICe3pAiEUwhNHkZL3sHoQvQAVzNqebcTnJW/vItArgzfod7GK26BE8Y55rOWrDXDPfROl4iNlCh8SY7jYq7xLUN8hT0L81N34EYA/XK2w6tLhXmXe/s/A8nViElI7bbScTZS2R99jbXDFVO1Cf939ZgMzj9hXvmQ=='},
   'index': 0}],
 'context': {'documents': []},
 'trajectory': ['retriever_tool']}
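
In the output above, `answer` is a list of content blocks rather than a plain string, and that list is what any downstream evaluator receives. Below is a minimal sketch of a helper that flattens such content to text, assuming blocks are either strings or dicts with `type` and `text` keys as shown in the sample. `content_to_text` is a hypothetical name.

```python
from typing import Any


def content_to_text(content: Any) -> str:
    """Flatten message content (a str or a list of content blocks) into plain text."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type") == "text":
                # Keep only the text; extras such as signatures are dropped
                parts.append(str(block.get("text", "")))
        return "".join(parts)
    return str(content)


blocks = [{"type": "text", "text": "I couldn't find information.", "extras": {}, "index": 0}]
print(content_to_text(blocks))
```

Applying this to `response["messages"][-1].content` before returning from `target` would give the evaluators a string instead of a nested structure.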

## 4. Custom Evaluation Metrics and Scoring


### 1. Evaluation: Final Result (LLM as Judge)
Defined in the LangFuse interface.

### 2. Evaluation: Trajectory

- Compares the actual sequence of steps the agent took against an expected sequence
- Calculates a score based on how many of the expected steps were completed correctly

In [7]:
# Custom evaluation metric: Check if agent trajectory exactly matches expected output
def evaluate_exact_match(outputs: dict, reference_outputs: dict) -> dict:
    """
    Evaluate whether the agent's trajectory exactly matches the expected output.
    Args:
        outputs (dict): Agent output containing trajectory.
        reference_outputs (dict): Reference output containing expected trajectory.
    Returns:
        dict: Result with key, score, and optional error.
    """
    try:
        score = outputs["trajectory"] == reference_outputs["trajectory"]
        return {
            "key": "exact_match", 
            "score": score
        }
    except Exception as e:
        return {
            "key": "exact_match",
            "score": False,
            "error": str(e)
        }

# Custom evaluation metric: Count unmatched steps in agent's output
def evaluate_unmatched_steps(outputs: dict, reference_outputs: dict) -> dict:
    """
    Evaluate the number of unmatched steps in the agent's output compared to reference.
    Args:
        outputs (dict): Agent output containing trajectory.
        reference_outputs (dict): Reference output containing expected trajectory.
    Returns:
        dict: Result with key, score, and optional error.
    """
    try:
        i = j = 0
        unmatched_steps = 0

        while i < len(reference_outputs['trajectory']) and j < len(outputs['trajectory']):
            if reference_outputs['trajectory'][i] == outputs['trajectory'][j]:
                i += 1  # Match found, move to the next step in reference trajectory
            else:
                unmatched_steps += 1  # Step is not part of the reference trajectory
            j += 1  # Always move to the next step in outputs trajectory

        # Count remaining unmatched steps in outputs beyond the comparison loop
        unmatched_steps += len(outputs['trajectory']) - j

        return {
            "key": "unmatched_steps",
            "score": unmatched_steps,
        }
    except Exception as e:
        return {
            "key": "unmatched_steps",
            "score": None,
            "error": str(e)
        }

# Custom evaluation metric: Check if tool execution order matches expected trajectory
def evaluate_tool_order(outputs: dict, reference_outputs: dict) -> dict:
    """
    Evaluate if the order of tool execution matches the expected trajectory order.
    Args:
        outputs (dict): Agent output containing trajectory.
        reference_outputs (dict): Reference output containing expected trajectory.
    Returns:
        dict: Result with key, score, and optional error.
    """
    try:
        is_order_correct = outputs.get("trajectory", []) == reference_outputs.get("trajectory", [])
        return {
            "key": "tool_order_correctness",
            "score": is_order_correct
        }
    except Exception as e:
        return {
            "key": "tool_order_correctness",
            "score": False,
            "error": str(e)
        }
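
None of the three metrics above reports what fraction of the expected steps the agent actually completed. Below is a minimal sketch of such a score, using the same two-pointer scan as `evaluate_unmatched_steps`. `step_coverage` and `evaluate_step_coverage` are hypothetical names, and returning 1.0 for an empty reference trajectory is an assumption.

```python
from typing import List


def step_coverage(actual: List[str], expected: List[str]) -> float:
    """Fraction of expected steps that appear in `actual`, in the expected order."""
    if not expected:
        return 1.0  # Assumption: nothing to cover counts as full coverage
    matched = 0
    for step in actual:
        if matched < len(expected) and step == expected[matched]:
            matched += 1
    return matched / len(expected)


def evaluate_step_coverage(outputs: dict, reference_outputs: dict) -> dict:
    """Wrap step_coverage in the same result shape as the other evaluators."""
    try:
        score = step_coverage(outputs["trajectory"], reference_outputs["trajectory"])
        return {"key": "step_coverage", "score": score}
    except Exception as e:
        return {"key": "step_coverage", "score": None, "error": str(e)}


print(evaluate_step_coverage(
    {"trajectory": ["retriever_tool", "web_search"]},
    {"trajectory": ["retriever_tool"]},
))
```

Extra steps in the agent's trajectory do not lower this score, so it complements `unmatched_steps` rather than replacing it.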


### 3. Evaluation: Answer Correctness using Gemini API

- Uses Gemini API to evaluate the correctness of generated answers
- Compares the generated answer against the expected answer given the input question
- Returns a boolean score (`True`/`False`) together with an explanatory comment


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
langchain-google-genai 3.1.0 requires google-ai-generativelanguage<1.0.0,>=0.9.0, but you have google-ai-generativelanguage 0.6.15 which is incompatible.

In [10]:
import os
from openevals.llm import create_llm_as_judge
from openevals.prompts import CORRECTNESS_PROMPT

In [11]:

def correctness_evaluator(input: str, outputs: dict, reference_outputs: dict):
    """Judge the agent's answer against the expected answer using an LLM (Gemini)."""
    # Named `judge` so it does not shadow the enclosing function
    judge = create_llm_as_judge(
        prompt=CORRECTNESS_PROMPT,
        model="google_genai:gemini-2.5-flash",  # Correct model name for Google AI Studio
        feedback_key="correctness",
    )

    eval_result = judge(
        inputs=input,
        outputs=outputs["answer"],
        reference_outputs=reference_outputs["expected_answer"]
    )
    return eval_result

In [12]:
output = target({"messages": "what is there in budapest"}, langfuse_handler)

In [13]:
correctness_evaluator(
        input= "what is there in budapest",
        outputs = output, 
        reference_outputs = {"expected_answer": "Budapest is known for its stunning architecture, thermal baths, the Danube River, Buda Castle, and vibrant cultural scene."})

Key 'additionalProperties' is not supported in schema, ignoring
Key 'parameters' is not supported in schema, ignoring


{'key': 'correctness',
 'score': False,
 'comment': "The model failed to provide any information about Budapest, stating it couldn't find specific information. Instead, it offered information about Barcelona, which is not what the user asked for. The answer is incomplete and does not address the question asked by the user. Thus, the score should be: False.",
 'metadata': None}
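
An evaluator result like the one above has to be turned into `name`, `value`, and `comment` arguments before it can be logged as a score. Below is a minimal sketch of that mapping, which also skips results whose score is `None` and converts booleans to `1.0`/`0.0`. `to_score_kwargs` is a hypothetical helper, and the numeric encoding of booleans is an assumption rather than a LangFuse requirement.

```python
from typing import Any, Dict, Optional


def to_score_kwargs(result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Map an evaluator result dict to scoring keyword arguments, or None to skip it."""
    score = result.get("score")
    if score is None:
        return None  # Evaluator failed; there is nothing meaningful to log

    if isinstance(score, bool):
        value = 1.0 if score else 0.0
    else:
        value = score  # Already numeric (or a categorical string)

    kwargs: Dict[str, Any] = {"name": result["key"], "value": value}
    comment = result.get("comment") or result.get("error")
    if comment:
        kwargs["comment"] = str(comment)
    return kwargs


example = {"key": "correctness", "score": False, "comment": "Off-topic answer.", "metadata": None}
print(to_score_kwargs(example))
```

In a logging loop this could be used as `kwargs = to_score_kwargs(result)` followed by a scoring call only when `kwargs` is not `None`.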

In [29]:
# Prepare metadata for experiment tracking and reproducibility
metadata = {
    "llm_model_name": os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
    "embedding_model_name": os.getenv("EMBEDDING_MODEL_NAME"),
    "dataset_name": dataset_name,
    "qdrant_url": os.getenv("QDRANT_URL"),
    "collection_name": os.getenv("COLLECTION_NAME"),
    "chunking_strategy": {"chunk_size": 150, "chunk_overlap": 64},
}

# Define experiment prefix for logging and organization
# experiment_prefix = f"agent_eval_{prompt_detail['agent_system_prompt']}_{prompt_detail['router_prompt']}_{prompt_detail['user_prompt']}"
run_prefix = f"agent_eval_{os.getenv('COLLECTION_NAME')}_{os.getenv('AZURE_OPENAI_DEPLOYMENT_NAME')}"

In [30]:
metadata, run_prefix

({'llm_model_name': None,
  'embedding_model_name': '“huggingface-sentence-transformers/all-MiniLM-L6-v2"',
  'dataset_name': 'golden_dataset_agent_evals_with_tools',
  'qdrant_url': 'http://localhost:6333',
  'collection_name': 'locations_pdfs',
  'chunking_strategy': {'chunk_size': 150, 'chunk_overlap': 64}},
 'agent_eval_locations_pdfs_None')
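
The run prefix above ends in `None` because `AZURE_OPENAI_DEPLOYMENT_NAME` is not set in this environment. Below is a minimal sketch of a run-name builder that marks unset variables explicitly and appends a timestamp so repeated runs stay distinguishable. `build_run_name`, the `unset-` marker, and the timestamp format are all hypothetical choices.

```python
import os
from datetime import datetime, timezone
from typing import Optional


def build_run_name(*env_vars: str, prefix: str = "agent_eval",
                   now: Optional[datetime] = None) -> str:
    """Join a prefix, the values of the given env vars, and a UTC timestamp."""
    parts = [prefix]
    for var in env_vars:
        # Make missing configuration visible instead of embedding the string 'None'
        parts.append(os.getenv(var) or f"unset-{var.lower()}")
    stamp = (now or datetime.now(timezone.utc)).strftime("%Y%m%dT%H%M%S")
    parts.append(stamp)
    return "_".join(parts)


print(build_run_name("COLLECTION_NAME", "AZURE_OPENAI_DEPLOYMENT_NAME"))
```

Raising an error for unset variables would be a stricter alternative if a run without a known model name should not be logged at all.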

## 5. Experiment Execution and Results Logging


In [32]:
# Load the evaluation dataset from LangFuse for experiment execution
from langfuse import get_client

dataset = get_client().get_dataset(dataset_name)

In [33]:
# Define run names for experiment runs (can be extended for multiple runs)
run_names = ["demo_run"]

In [34]:
# Print input and expected output for the first dataset item to verify data integrity
for item in dataset.items:
    print(item.input)
    print(item.expected_output)
    break

National Holidays in Austria
{'trajectory': ['generic'], 'golden_context': 'nan', 'expected_answer': 'List of National holidays in Austria'}


In [35]:
import time
# Run the experiment for each dataset item and log results to LangFuse
for item in dataset.items:
    # Use the item.run() context manager for automatic trace linking
    with item.run(
        run_name=run_prefix,
        run_description="Demo AI Agent Evaluation",
        run_metadata=metadata
    ) as root_span:

        # Invoke the target agent function and get output
        output = target({"messages": item.input}, langfuse_handler)
        root_span.update(output=output, input=item.input, metadata=metadata)

        time.sleep(30)  # Fixed 30-second delay between dataset items
        # Track custom trajectory evaluations
        evaluate_unmatched_steps_dict = evaluate_unmatched_steps(output, item.expected_output)
        evaluate_exact_match_dict = evaluate_exact_match(output, item.expected_output)
        evaluate_tool_order_dict = evaluate_tool_order(output, item.expected_output) 
        evaluate_correctness = correctness_evaluator(item.input, output, item.expected_output )
        
        # Log evaluation scores to LangFuse
        root_span.score_trace(
            name=evaluate_unmatched_steps_dict["key"],
            value=evaluate_unmatched_steps_dict["score"],
        )       

        root_span.score_trace(
            name=evaluate_exact_match_dict["key"],
            value=evaluate_exact_match_dict["score"],
        )

        root_span.score_trace(
            name=evaluate_tool_order_dict["key"],
            value=evaluate_tool_order_dict["score"],
            comment=evaluate_tool_order_dict.get("error", "check tool order correctness")
        )

        root_span.score_trace(
            name=evaluate_correctness["key"],
            value=evaluate_correctness["score"],
            comment=evaluate_correctness["comment"]
        )

        # Optionally, score the result against the expected output
        # root_span.score_trace(
        #     name="user-feedback_test",
        #     value=1,
        #     comment="This is a test comment", 
        # )
        
        
# Ensure all data is sent to the LangFuse server at the end of the experiment
langfuse.flush()

Key 'additionalProperties' is not supported in schema, ignoring
Key 'parameters' is not supported in schema, ignoring