# Observability with LangFuse and Evaluation with RAGAS 🔍📊

In the Strands Agents SDK, observability refers to your ability to measure system behavior and performance. Observability combines instrumentation, data collection, and analysis techniques. These techniques provide insights into an agent's behavior and performance, helping you effectively build, debug, and maintain agents that better serve your unique needs and reliably complete tasks.

This notebook demonstrates how to build an agent with observability and evaluation capabilities. 

We use [Langfuse](https://langfuse.com/) to process the Strands Agent traces and [Ragas](https://www.ragas.io/) metrics to evaluate agent performance. The primary focus is on agent evaluation and the quality of responses generated by the agent using traces produced by the SDK.

### What is Observability and Evaluation?

**Observability** means being able to see what your AI agent is doing "behind the scenes" - like watching its thought process. It helps you understand why your agent makes certain decisions or gives particular responses.

**Evaluation** is how we measure if our agent is doing a good job. Instead of just guessing if responses are good, we use specific metrics to score the agent's performance.

### Observability Components

All observability APIs are embedded directly within the Strands Agents SDK. The following are key observability data points:

[**Metrics**](https://strandsagents.com/latest/user-guide/observability-evaluation/metrics/) - Essential for understanding agent performance, optimizing behavior, and monitoring resource usage.

[**Traces**](https://strandsagents.com/latest/user-guide/observability-evaluation/traces/) - A fundamental component of the Strands SDK's observability framework, providing detailed insights into your agent's execution.

[**Logs**](https://strandsagents.com/latest/user-guide/observability-evaluation/logs/) - Strands SDK uses Python's standard logging module to provide visibility into operations.

[**Evaluation**](https://strandsagents.com/latest/user-guide/observability-evaluation/evaluation/) - Essential for measuring agent performance, tracking improvements, and ensuring your agents meet quality standards. With Strands SDK, you can perform Manual Evaluation, Structured Testing, LLM Judge Evaluation, and Tool-Specific Evaluation.
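
Because the SDK logs through Python's standard `logging` module, you can raise its verbosity without touching other libraries. The following is a minimal sketch; the logger name `"strands"` is an assumption based on the package name, so confirm it against the SDK documentation.

```python
import logging

# Send log records to the console with a timestamp, level, and logger name
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    level=logging.WARNING,  # Keep other libraries quiet by default
)

# "strands" is assumed to be the SDK's top-level logger name
strands_logger = logging.getLogger("strands")
strands_logger.setLevel(logging.DEBUG)
```

Child loggers under the same prefix inherit this level unless they set their own, so one call is usually enough while debugging.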

### OpenTelemetry Integration

Strands natively integrates with OpenTelemetry, an industry standard for distributed tracing. You can visualize and analyze traces using any OpenTelemetry-compatible tool. This integration provides:

- **Compatibility with existing observability tools:** Send traces to platforms such as Jaeger, Grafana Tempo, AWS X-Ray, Datadog, and more
- **Standardized attribute naming:** Uses OpenTelemetry semantic conventions
- **Flexible export options:** Console output for development, OTLP endpoint for production
- **Auto-instrumentation:** Trace creation is handled automatically when you turn on tracing

## 🍽️🔍 Observability and Evaluation with Restaurant Agent

In this notebook, we'll demonstrate how to build a restaurant recommendation agent with observability and evaluation capabilities. This is designed for beginners who want to learn about AI agents, observability, and evaluation without complex infrastructure setup.

> ⭐ Based on the code from [08-observability-and-evaluation/Observability-and-Evaluation-sample.ipynb](https://github.com/strands-agents/samples/blob/main/01-tutorials/01-fundamentals/08-observability-and-evaluation/Observability-and-Evaluation-sample.ipynb) of the [Strands Agents Samples repository](https://github.com/strands-agents/)

### What We'll Build

We'll use these key components:

1. **Local Vector Database**: A searchable collection of restaurant information that our agent can query
2. **Strands Agent**: An AI assistant that can recommend restaurants based on user preferences
3. **LangFuse**: A tool that lets us "see" how our agent works and makes decisions
4. **RAGAS**: A framework that helps us evaluate how well our agent is performing

![image](image/restaurant_agent_architecture.png)

### ✅ Install Required Packages

First, we need to install all the necessary packages for our notebook. Each package has a specific purpose:

- **langchain**: Helps us build applications with language models
- **langfuse**: Provides observability for our agent
- **ragas**: Helps us evaluate our agent's performance
- **chromadb**: A database for storing and searching vector embeddings
- **docx2txt**: Converts Word documents to text
- **boto3**: AWS SDK for Python, used to access AWS services
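- **strands-agents**: The Strands Agents SDK for building AI agents (imported as `strands`)
- **langchain-aws**, **langchain-chroma**, **langchain-text-splitters**: LangChain integration packages that provide the `langchain_aws`, `langchain_chroma`, and `langchain_text_splitters` modules

In [None]:
# Install required packages
# The -q flag makes the installation quieter (less output)
# The [anthropic,otel] extras are assumed to provide the Anthropic model provider and OpenTelemetry tracing
!pip install -q langchain langchain-aws langchain-chroma langchain-text-splitters langfuse ragas chromadb docx2txt boto3 "strands-agents[anthropic,otel]"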

### ✅ Create Vector Database from Restaurant Data

A vector database stores text as numbers (vectors) that represent the meaning of the text. This allows us to search for similar meanings, not just exact word matches. For example, if we search for "vegetarian food", we might also find results about "plant-based dishes" even if those exact words weren't used.
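
To make the idea concrete, here is a toy sketch of similarity search using cosine similarity. The three-dimensional vectors are hand-made, hypothetical values chosen for illustration; real embedding models produce vectors with hundreds or thousands of dimensions.

```python
import math

def cosine_similarity(a, b):
    """Return the cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hypothetical "embeddings": similar meanings get similar vectors
toy_vectors = {
    "vegetarian food": [0.9, 0.1, 0.0],
    "plant-based dishes": [0.8, 0.2, 0.1],
    "steakhouse specials": [0.1, 0.9, 0.2],
}

query_text = "vegetarian food"
query_vector = toy_vectors[query_text]

# Rank every other text by its similarity to the query, best match first
ranked = sorted(
    (
        (text, cosine_similarity(query_vector, vector))
        for text, vector in toy_vectors.items()
        if text != query_text
    ),
    key=lambda pair: pair[1],
    reverse=True,
)

for text, score in ranked:
    print(f"{text}: {score:.2f}")
```

"plant-based dishes" ranks first even though it shares no words with the query, because its vector points in nearly the same direction.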

We'll create a vector database using restaurant data files in the `restaurant-data` folder. These files contain information about different restaurants, their menus, and specialties.

In [None]:
# Import necessary libraries for document loading
import os  # For working with files and directories
import docx2txt  # For converting Word documents to text
from langchain_text_splitters import RecursiveCharacterTextSplitter  # For splitting text into chunks
from langchain_core.documents import Document  # For creating document objects

In [None]:
# Function to load DOCX files
def load_docx(file_path):
    """
    This function takes a Word document (.docx) file path and converts it to text.
    It then creates a Document object that can be used by our vector database.
    
    Args:
        file_path: Path to the DOCX file
        
    Returns:
        A Document object containing the text and metadata
    """
    # Extract text from the DOCX file
    text = docx2txt.process(file_path)
    
    # Create a Document object with the text and source information
    return Document(page_content=text, metadata={"source": file_path})

In [None]:
# Load all restaurant data files
restaurant_data_dir = './restaurant-data/'  # Directory containing restaurant data files
documents = []  # Empty list to store our documents

# Loop through all files in the restaurant data directory
for filename in os.listdir(restaurant_data_dir):
    # Only process .docx files and skip temporary files (those starting with ~)
    if filename.endswith('.docx') and not filename.startswith('~'):
        file_path = os.path.join(restaurant_data_dir, filename)
        try:
            # Load the document and add it to our list
            doc = load_docx(file_path)
            documents.append(doc)
            print(f"Loaded: {filename}")
        except Exception as e:
            # If there's an error, print it but continue with other files
            print(f"Error loading {filename}: {e}")

print(f"Total documents loaded: {len(documents)}")

In [None]:
# Split documents into chunks
# We split large documents into smaller chunks to make search more effective
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,  # Target maximum of 500 characters per chunk
    chunk_overlap=100  # Chunks will overlap by 100 characters to maintain context
)

# Apply the splitter to our documents
splits = text_splitter.split_documents(documents)
print(f"Created {len(splits)} document chunks")
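
To see what `chunk_size` and `chunk_overlap` mean, here is a simplified sketch that slices text at fixed character positions. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter prefers to break on separators such as paragraphs and sentences); the helper `chunk_with_overlap` is a hypothetical name used only for illustration.

```python
def chunk_with_overlap(text, chunk_size, chunk_overlap):
    """Split text into fixed-size chunks that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # How far the window moves each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # The last window already reached the end of the text
    return chunks

sample = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_with_overlap(sample, chunk_size=10, chunk_overlap=4)
for chunk in chunks:
    print(chunk)
```

Each chunk starts with the last four characters of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk.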

### ✅ Set up embeddings

Embeddings are the mathematical representations (vectors) of text. We need to convert our text chunks into these vectors so they can be searched efficiently. For simplicity, we'll use a model from Amazon Bedrock, but in a production environment, you might want to use other embedding models.

In [None]:
# Import necessary libraries for embeddings
import boto3  # AWS SDK for Python
from langchain_aws import BedrockEmbeddings  # For using Amazon Bedrock embeddings

# Create a client for Amazon Bedrock
# This allows us to communicate with the Bedrock service
bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")

# Specify which embedding model to use
# Titan is Amazon's embedding model that converts text to vectors
bedrock_embedding_model_id = 'amazon.titan-embed-text-v1'

In [None]:
# Create the embedding model object
# This will be used to convert our text to vectors
embedding_model = BedrockEmbeddings(
    client=bedrock_client,
    model_id=bedrock_embedding_model_id)

In [None]:
# Create and persist the vector database
from langchain_chroma import Chroma  # Chroma is a vector database

# Define the directory where we'll save our vector database
# This allows us to reuse it later without recreating it
persist_directory = './restaurant-vectordb/'

# Create the vector database from our document chunks
vectordb = Chroma.from_documents(
    documents=splits,  # Our document chunks
    embedding=embedding_model,  # The embedding model to use
    persist_directory=persist_directory  # Where to save the database
)

print("Vector database created successfully!")

### ✅ Test the Vector Database

Now that we've created our vector database, let's test it with a simple query to make sure it works correctly. We'll search for vegetarian options and see what results we get.

In [None]:
# Test a simple query
query = "What vegetarian options are available?"  # Our test question
results = vectordb.similarity_search(query, k=3)  # Get the top 3 most relevant results

print("Query:", query)
print("\nTop 3 results:")
for i, doc in enumerate(results):
    print(f"\nResult {i+1}:")
    print(f"Source: {doc.metadata['source']}")  # Which restaurant this is from
    print(f"Content: {doc.page_content[:200]}...")  # Show the first 200 characters

### ✅ Set Up LangFuse for Observability

**What is LangFuse?**

LangFuse is like a dashboard for your AI agent. It helps you see what's happening inside your agent - what questions it's getting, how it's thinking about them, and what answers it's giving. This is incredibly useful for debugging and improving your agent.

Now, let's configure LangFuse for observability. You'll need to create a LangFuse account and get your API keys.

### [Create a new project in Langfuse](https://catalog.us-east-1.prod.workshops.aws/workshops/33f099a6-45a2-47d7-9e3c-a23a6568821e/en-US/01-fundamentals/18-agent-observability-and-evaluation#create-a-new-project-in-langfuse)

1- Click Sign-up to create a [Langfuse account](https://us.cloud.langfuse.com/), or [sign in to an existing account](https://us.cloud.langfuse.com/).

![image](image/1-langfuse.png)

2- Create a new organization and enter an organization name. Skip the Invite Members step, then create the project.

![image](image/project.png)

3- Copy the Secret Key, Public Key, and Host so you can paste them into the configuration cell. You can also find these credentials on the Settings -> API Keys page.

![api](image/api.png)

In [None]:
# LangFuse configuration
from langfuse import Langfuse

# Replace these with your LangFuse credentials
# These keys are like passwords that let your code connect to LangFuse
public_key = "your-public-key"  # Replace with your public key
secret_key = "your-secret-key"  # Replace with your secret key

# Create a LangFuse client
# This is the object we'll use to communicate with LangFuse
langfuse = Langfuse(
  public_key=public_key,
  secret_key=secret_key,
  host="https://us.cloud.langfuse.com"  # For US region
)

In [None]:
# Make sure we can access the client
from langfuse import get_client
 
# Access the client directly
langfuse = get_client(public_key=public_key)
 
# Flush all pending observations
# This ensures all data is sent to LangFuse
langfuse.flush()
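
Because Strands emits traces through OpenTelemetry, the agent also needs to know where to export them. The sketch below builds the two standard OpenTelemetry environment variables for a Langfuse project. It assumes Langfuse's OTLP endpoint path `/api/public/otel` and Basic authentication built from the public and secret keys; `build_langfuse_otlp_env` is a hypothetical helper name, and the keys shown are placeholders.

```python
import base64

def build_langfuse_otlp_env(public_key, secret_key, host="https://us.cloud.langfuse.com"):
    """Return OpenTelemetry exporter settings that point at a Langfuse project."""
    # Basic auth token: base64("public_key:secret_key")
    token = base64.b64encode(f"{public_key}:{secret_key}".encode("utf-8")).decode("ascii")
    return {
        # Assumed Langfuse OTLP endpoint path
        "OTEL_EXPORTER_OTLP_ENDPOINT": f"{host.rstrip('/')}/api/public/otel",
        "OTEL_EXPORTER_OTLP_HEADERS": f"Authorization=Basic {token}",
    }

# Placeholder keys for illustration only
otlp_env = build_langfuse_otlp_env("pk-lf-example", "sk-lf-example")
print(otlp_env["OTEL_EXPORTER_OTLP_ENDPOINT"])
```

You would typically apply these settings with `os.environ.update(...)` before creating the agent. Depending on your Strands version, an explicit exporter setup call may also be required, so check the SDK's tracing documentation.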

### ✅  Create a Restaurant Recommendation Agent

Now, let's create a Strands Agent that uses our vector database to provide restaurant recommendations. This agent will:
1. Receive questions from users about restaurants
2. Search our vector database for relevant information
3. Generate helpful responses based on the search results

In [None]:
# Import necessary libraries for the agent
from strands import Agent  # The main Agent class
from strands.models.anthropic import AnthropicModel  # For using Claude model
from strands.tools import tool  # For creating tools the agent can use

In [None]:
# Set up the language model for our agent
# We're using Claude 3 Sonnet from Anthropic
model = AnthropicModel(
    client_args={
        "api_key": "your-anthropic-api-key",  # Replace with your API key
    },
    max_tokens=1028,  # Maximum response length
    model_id="claude-3-sonnet-20240229",  # Which model to use
    params={
        "temperature": 0.3,  # Lower temperature means more consistent, focused responses
    }
)

In [None]:
# Create a retrieval tool using our vector database
# This tool will allow our agent to search for restaurant information

@tool
def search_restaurants(query: str) -> str:
    """
    Search for restaurant information based on cuisine, dietary preferences, location, or other criteria.
    
    Args:
        query (str): The search query about restaurants
        
    Returns:
        str: Relevant information about restaurants matching the query
    """
    # Load the persisted vector database
    loaded_vectordb = Chroma(persist_directory=persist_directory, embedding_function=embedding_model)
    
    # Perform a similarity search
    results = loaded_vectordb.similarity_search(query, k=3)
    
    # Format the results
    formatted_results = ""
    for doc in results:
        restaurant_name = os.path.basename(doc.metadata['source']).replace('.docx', '')
        formatted_results += f"Restaurant: {restaurant_name}\n"
        formatted_results += f"Information: {doc.page_content}\n\n"
    
    return formatted_results if formatted_results else "No relevant information found."

In [None]:
# Create the restaurant recommendation agent
import uuid  # For generating unique IDs

restaurant_agent = Agent(
    name = "Restaurant Recommendation Agent",
    model=model,
    tools=[search_restaurants],  # Give the agent access to our search tool
    system_prompt="""You are a helpful restaurant recommendation assistant. 
    Use the search_restaurants tool to find information about restaurants based on user queries.
    Provide detailed recommendations based on the search results.
    If asked about restaurants that aren't in the database, politely explain that you can only provide information about restaurants in your database.
    Always be friendly, helpful, and concise in your responses.
    """,
    record_direct_tool_call = True,  # Record when tools are used
    trace_attributes={
        "session.id": str(uuid.uuid4()),  # Generate a unique session ID
        "user.id": "user-email-example@domain.com",  # Example user ID
        "langfuse.tags": [
            "Agent-SDK-Example",
            "Strands-Project-Demo",
            "Observability-Tutorial"
        ]
    }
)

### ✅  Test the Agent with Tracing

Now let's test our agent with a simple query and see how it performs. The agent will use the search tool to find relevant information and then generate a response.

In [None]:
# Test the agent with a simple query
response = restaurant_agent("I'm looking for a restaurant with good vegetarian options. Any recommendations?")
print(response)

### ✅ Review the traces

After running the agent, you can review the traces in LangFuse:

1. Go to the tracing menu in your LangFuse project
2. Select the trace you want to view
3. Examine how the agent processed the request, what tools it used, and what response it generated

This gives you visibility into how your agent is working and helps you identify any issues or areas for improvement.

### ✅ Set Up RAGAS for Evaluation

Now, let's use RAGAS to evaluate the quality of our agent's responses. RAGAS (Retrieval Augmented Generation Assessment) is a framework for evaluating RAG systems.

### What is RAGAS?

RAGAS helps us measure how well our agent is performing by looking at different aspects of its responses:
- Is the information accurate?
- Is it relevant to the user's question?
- Is it using the right tools?
- Is it communicating in a friendly way?

In [None]:
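# Import RAGAS libraries
from langchain_aws import ChatBedrock
from ragas.llms import LangchainLLMWrapper

# Set up the evaluator LLM. LangchainLLMWrapper expects a LangChain model rather than
# the Strands model object, so we assume Claude 3 Sonnet is enabled in Amazon Bedrock.
evaluator_llm = LangchainLLMWrapper(
    ChatBedrock(model_id="anthropic.claude-3-sonnet-20240229-v1:0", client=bedrock_client)
)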

### Define RAGAS Metrics

We'll define several metrics to evaluate different aspects of our agent's performance:

In [None]:
from ragas.metrics import AspectCritic

# Metric to check if the agent fulfills all user requests
request_completeness = AspectCritic(
    name="Request Completeness",
    llm=evaluator_llm,
    definition=(
        "Return 1 if the agent completely fulfills all the user requests with no omissions; "
        "otherwise, return 0."
    ),
)

# Metric to assess if the AI's communication aligns with the desired brand voice
brand_tone = AspectCritic(
    name="Brand Voice Metric",
    llm=evaluator_llm,
    definition=(
        "Return 1 if the AI's communication is friendly, approachable, helpful, clear, and concise; "
        "otherwise, return 0."
    ),
)

# Tool usage effectiveness metric
tool_usage_effectiveness = AspectCritic(
    name="Tool Usage Effectiveness",
    llm=evaluator_llm,
    definition=(
        "Return 1 if the agent appropriately used available tools to fulfill the user's request "
        "(such as using search_restaurants for questions about restaurants, cuisines, or menus). "
        "Return 0 if the agent failed to use appropriate tools or used unnecessary tools."
    ),
)

# Tool selection appropriateness metric
tool_selection_appropriateness = AspectCritic(
    name="Tool Selection Appropriateness",
    llm=evaluator_llm,
    definition=(
        "Return 1 if the agent selected the most appropriate tools for the task. "
        "Return 0 if better tool choices were available or if unnecessary tools were selected."
    ),
)

We'll also define a rubric score metric to evaluate how the agent handles situations where requested items aren't available:

In [None]:
from ragas.metrics import RubricsScore

# Define a rubric for evaluating recommendations
rubrics = {
    "score-1_description": (
        "The item requested by the customer is not present in the menu and no "
        "recommendations were made."
    ),
    "score0_description": (
        "Either the item requested by the customer is present in the menu, "
        "or the conversation does not include any "
        "food or menu inquiry (e.g., booking, cancellation). "
        "This score applies regardless of whether any recommendation was "
        "provided."
    ),
    "score1_description": (
        "The item requested by the customer is not present in the menu "
        "and a recommendation was provided."
    ),
}

# Create the recommendations metric
recommendations = RubricsScore(rubrics=rubrics, llm=evaluator_llm, name="Recommendations")

Finally, let's define metrics to evaluate the RAG (Retrieval-Augmented Generation) aspects of our agent:

In [None]:
from ragas.metrics import ContextRelevance, ResponseGroundedness 

# Context relevance measures how well the retrieved contexts address the user's query
context_relevance = ContextRelevance(llm=evaluator_llm)

# Response groundedness determines if the response is supported by the provided contexts
response_groundedness = ResponseGroundedness(llm=evaluator_llm)

metrics = [context_relevance, response_groundedness]

### ✅ Evaluate the Agent and Send Results to LangFuse

Now we'll create functions to evaluate our agent and send the results back to LangFuse:

In [None]:
import pandas as pd
from datetime import datetime, timedelta
from ragas.dataset_schema import (
    SingleTurnSample,
    MultiTurnSample,
    EvaluationDataset
)
from ragas import evaluate

# Function to evaluate agent responses
def evaluate_agent_response(query, response, context):
    """
    Evaluate the agent's response using RAGAS metrics.
    
    Args:
        query (str): The user's query
        response (str): The agent's response
        context (str): The context used to generate the response
        
    Returns:
        EvaluationResult: RAGAS result object; call .to_pandas() to view the scores as a table
    """
    # Create a dataset for evaluation
    sample = SingleTurnSample(
        user_input=query,
        response=response,
        retrieved_contexts=[context]
    )
    dataset = EvaluationDataset(samples=[sample])
    
    # Run evaluation with RAGAS metrics
    result = evaluate(
        dataset=dataset,
        metrics=[
            context_relevance,
            response_groundedness
        ]
    )
    
    return result

In [None]:
# Function to send evaluation scores to LangFuse
def send_evaluation_to_langfuse(trace_id, evaluation_result):
    """
    Send evaluation scores to LangFuse.
    
    Args:
        trace_id (str): The trace ID from LangFuse
        evaluation_result (EvaluationResult): The evaluation results from RAGAS
    """
    # Convert evaluation result to pandas DataFrame
    eval_df = evaluation_result.to_pandas()
    
    # Extract scores from evaluation result
    scores = {}
    for column in eval_df.columns:
        if column not in ['user_input', 'response', 'retrieved_contexts']:
            try:
                scores[column] = float(eval_df[column].iloc[0])
            except (TypeError, ValueError):
                scores[column] = 0.0
    
    # Calculate average score
    avg_score = sum(scores.values()) / len(scores) if scores else 0
    
    # Send scores to LangFuse
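    # create_score is the scoring method in Langfuse SDK v3 (the version that provides get_client)
    langfuse.create_score(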
        trace_id=trace_id,
        name="ragas_evaluation",
        value=avg_score,
        comment="RAGAS evaluation scores",
        metadata=scores
    )
    
    print(f"Evaluation scores sent to LangFuse for trace {trace_id}")
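
The score-extraction step can be tried without RAGAS or LangFuse by using a plain dictionary in place of a DataFrame row. This is a sketch under stated assumptions: `summarize_scores` is a hypothetical helper, the metric column names are made up for illustration, and unlike the function above it skips unusable values instead of counting them as 0.0, so one failed metric does not drag the average down.

```python
import math

# Columns that hold inputs rather than metric scores
NON_METRIC_COLUMNS = {"user_input", "response", "retrieved_contexts"}

def summarize_scores(row):
    """Return (scores, average) for the numeric metric values in one result row."""
    scores = {}
    for name, value in row.items():
        if name in NON_METRIC_COLUMNS:
            continue
        try:
            number = float(value)
        except (TypeError, ValueError):
            continue  # Not a number: skip it
        if math.isnan(number):
            continue  # A metric that failed to compute: skip it
        scores[name] = number
    average = sum(scores.values()) / len(scores) if scores else None
    return scores, average

# Hypothetical row; real column names come from the RAGAS metrics you use
example_row = {
    "user_input": "Any seafood places?",
    "response": "Try the harbor grill.",
    "retrieved_contexts": ["..."],
    "context_relevance_example": 1.0,
    "response_groundedness_example": 0.5,
    "failed_metric_example": float("nan"),
}
scores, average = summarize_scores(example_row)
print(scores, average)
```

Whether to skip failed metrics or score them as zero is a design choice; skipping keeps the average meaningful, while zeroing makes failures visible in the headline number.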

Let's run a complete example with tracing and evaluation:

In [None]:
# Run a complete example with tracing and evaluation
# This cell assumes Langfuse Python SDK v3 (the version that provides get_client)
query = "I need a restaurant for a business dinner with clients who prefer seafood. What do you recommend?"

# Start a root span in LangFuse; in SDK v3 the root span also creates the trace
trace = langfuse.start_span(name="restaurant_recommendation", input={"query": query})

# Get context
context = search_restaurants(query)

# Log the context retrieval as a child span
context_span = trace.start_span(
    name="context_retrieval",
    input={"query": query},
    output={"context": context}
)
context_span.end()

# Get agent response and convert the result object to plain text
response = restaurant_agent(query)
response_text = str(response)

# Log the agent response as a child span
response_span = trace.start_span(
    name="agent_response",
    input={"query": query},
    output={"response": response_text}
)
response_span.end()

# Evaluate the response (RAGAS expects plain strings)
evaluation_result = evaluate_agent_response(query, response_text, context)

# Send evaluation to LangFuse
send_evaluation_to_langfuse(trace.trace_id, evaluation_result)

# End the root span and make sure everything is sent
trace.end()
langfuse.flush()

print("Query:", query)
print("\nAgent Response:")
print(response_text)
print("\nEvaluation Results:")
print(evaluation_result.to_pandas())
print(f"\nLangFuse Trace ID: {trace.trace_id}")

### ✅ Conclusion

In this notebook, we've demonstrated how to:

1. Create a local vector database from restaurant data files
2. Build a restaurant recommendation agent using Strands Agent
3. Set up LangFuse for observability and tracing
4. Use RAGAS to evaluate the quality of agent responses
5. Send evaluation results back to LangFuse

This approach provides a comprehensive framework for building, monitoring, and evaluating AI agents without requiring complex AWS infrastructure deployment.

You can view your traces and evaluation results in the LangFuse dashboard to gain insights into your agent's performance and identify areas for improvement.