# RAG Application with Langfuse Observability

## Homework 5: LLM Application with Comprehensive Tracing

This notebook demonstrates:
1. **RAG System** - Document loading and question answering
2. **Local Ollama LLM** - Using the qwen2.5:1.5b model
3. **Langfuse Integration** - Observability with traces, spans, generations, and scores
4. **Datasets** - Creating test datasets for evaluation
5. **Custom Evaluators** - LLM-as-a-judge and custom metrics

## 1. Setup and Configuration

In [1]:
# Install dependencies (run once)
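# The langfuse.decorators import used below belongs to the v2 Python SDK, so pin langfuse below 3
# !pip install langchain langchain-community langchain-ollama langchain-chroma chromadb "langfuse<3" python-dotenv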

In [None]:
import os
import time
import uuid
import json
from datetime import datetime
from typing import List, Dict, Any, Optional

# Langfuse - using the decorator-based API (Python SDK v2.x; langfuse.decorators is not available in v3)
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

# LangChain
from langchain_ollama import OllamaLLM, OllamaEmbeddings
from langchain_chroma import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate

print("All imports successful!")

In [3]:
# Configuration
# For LOCAL Langfuse (Docker): http://localhost:3000
# For CLOUD Langfuse: https://cloud.langfuse.com

LANGFUSE_HOST = os.getenv("LANGFUSE_HOST", "http://localhost:3000")
LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY", "pk-lf-your-public-key")
LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY", "sk-lf-your-secret-key")

OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:1.5b")

# User ID for tracking
USER_ID = f"user_{uuid.uuid4().hex[:8]}"

print(f"Langfuse Host: {LANGFUSE_HOST}")
print(f"Ollama Model: {OLLAMA_MODEL}")
print(f"User ID: {USER_ID}")

Langfuse Host: http://localhost:3000
Ollama Model: qwen2.5:1.5b
User ID: user_b706d106


## 2. Initialize Langfuse Client

Langfuse provides comprehensive observability for LLM applications:
- **Traces**: Full execution path of requests
- **Spans**: Individual operations within a trace
- **Generations**: LLM calls with token usage
- **Events**: Point-in-time occurrences
- **Scores**: Quality and performance metrics
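
To make the nesting concrete, here is a toy model of how these entities relate, using only the standard library. The class and field names (`ToyTrace`, `ToyObservation`, `ToyScore`) are hypothetical and are not Langfuse's schema or SDK types. The sketch only illustrates that a trace owns a tree of observations and that a score attaches to a trace by ID.

```python
from dataclasses import dataclass, field
from typing import List, Optional
import uuid


@dataclass
class ToyObservation:
    """Hypothetical stand-in for a span, generation, or event."""
    name: str
    kind: str  # "span", "generation", or "event"
    children: List["ToyObservation"] = field(default_factory=list)


@dataclass
class ToyScore:
    """Hypothetical stand-in for a score attached to a trace."""
    trace_id: str
    name: str
    value: float
    comment: Optional[str] = None


@dataclass
class ToyTrace:
    """Hypothetical stand-in for a trace: one request, end to end."""
    name: str
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    observations: List[ToyObservation] = field(default_factory=list)


def count_observations(obs_list: List[ToyObservation]) -> int:
    """Count observations recursively, including nested children."""
    return sum(1 + count_observations(o.children) for o in obs_list)


trace = ToyTrace(name="rag_query")
retrieval = ToyObservation(name="document_retrieval", kind="span")
generation = ToyObservation(name="llm_answer_generation", kind="generation")
trace.observations.extend([retrieval, generation])
score = ToyScore(trace_id=trace.id, name="response_quality", value=0.8)

print(count_observations(trace.observations))
```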

In [None]:
# Initialize Langfuse client
# Note: The decorator-based API uses environment variables or explicit configuration
langfuse = Langfuse(
    host=LANGFUSE_HOST,
    public_key=LANGFUSE_PUBLIC_KEY,
    secret_key=LANGFUSE_SECRET_KEY,
)

# Configure the decorator context to use our Langfuse instance
langfuse_context.configure(
    host=LANGFUSE_HOST,
    public_key=LANGFUSE_PUBLIC_KEY,
    secret_key=LANGFUSE_SECRET_KEY,
)

print("Langfuse client initialized!")
print(f"Dashboard URL: {LANGFUSE_HOST}")

## 3. Initialize Ollama LLM and Embeddings

In [5]:
# Initialize Ollama LLM
llm = OllamaLLM(
    model=OLLAMA_MODEL,
    base_url=OLLAMA_BASE_URL,
    temperature=0.7,
)

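# Initialize Ollama Embeddings
# Note: this reuses the chat model for embeddings; a dedicated embedding model
# served by Ollama may give better retrieval quality.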
embeddings = OllamaEmbeddings(
    model=OLLAMA_MODEL,
    base_url=OLLAMA_BASE_URL,
)

# Test LLM connection
test_response = llm.invoke("Say 'Hello, I am ready!'")
print(f"LLM Response: {test_response}")

LLM Response: Hello, I am ready!


## 4. Sample Documents for RAG

In [6]:
# Sample documents about AI/ML topics
SAMPLE_DOCUMENTS = [
    {
        "content": """
        Machine Learning Fundamentals
        
        Machine learning is a subset of artificial intelligence that enables systems to learn
        and improve from experience without being explicitly programmed. There are three main
        types of machine learning:
        
        1. Supervised Learning: The algorithm learns from labeled training data. Examples include
           classification (predicting categories) and regression (predicting continuous values).
           Common algorithms: Linear Regression, Decision Trees, Random Forest, SVM.
        
        2. Unsupervised Learning: The algorithm finds patterns in unlabeled data. Examples include
           clustering (grouping similar items) and dimensionality reduction.
           Common algorithms: K-Means, PCA, DBSCAN.
        
        3. Reinforcement Learning: The algorithm learns through trial and error, receiving rewards
           or penalties for actions taken. Used in robotics, game playing, and autonomous vehicles.
        """,
        "metadata": {"source": "ml_fundamentals.txt", "topic": "machine_learning"}
    },
    {
        "content": """
        Deep Learning and Neural Networks
        
        Deep learning is a subset of machine learning based on artificial neural networks.
        Key concepts include:
        
        - Neurons and Layers: Neural networks consist of interconnected nodes (neurons)
          organized in layers: input, hidden, and output layers.
        
        - Activation Functions: Functions like ReLU, Sigmoid, and Tanh introduce non-linearity,
          allowing networks to learn complex patterns.
        
        - Backpropagation: The algorithm used to train neural networks by calculating gradients
          and adjusting weights to minimize the loss function.
        
        - Common Architectures:
          * CNNs (Convolutional Neural Networks): Best for image processing
          * RNNs (Recurrent Neural Networks): Best for sequential data
          * Transformers: State-of-the-art for NLP tasks, basis for GPT and BERT
        """,
        "metadata": {"source": "deep_learning.txt", "topic": "deep_learning"}
    },
    {
        "content": """
        Large Language Models (LLMs)
        
        Large Language Models are AI systems trained on vast amounts of text data. Key aspects:
        
        - Architecture: Most modern LLMs use the Transformer architecture, which relies on
          self-attention mechanisms to process text efficiently.
        
        - Training: LLMs are trained on massive datasets using unsupervised learning, often
          followed by fine-tuning with human feedback (RLHF).
        
        - Capabilities: Text generation, summarization, translation, question answering,
          code generation, and reasoning tasks.
        
        - Examples: GPT-4, Claude, LLaMA, Qwen, Mistral
        
        - RAG (Retrieval-Augmented Generation): A technique that combines LLMs with external
          knowledge retrieval to provide more accurate and up-to-date responses.
        """,
        "metadata": {"source": "llm_overview.txt", "topic": "llm"}
    },
    {
        "content": """
        Natural Language Processing (NLP)
        
        NLP is a field of AI focused on enabling computers to understand, interpret,
        and generate human language. Key tasks include:
        
        - Tokenization: Breaking text into words or subwords
        - Part-of-Speech Tagging: Identifying grammatical roles
        - Named Entity Recognition: Identifying names, places, organizations
        - Sentiment Analysis: Determining emotional tone
        - Machine Translation: Converting between languages
        - Text Summarization: Creating concise summaries
        - Question Answering: Providing answers based on context
        
        Modern NLP heavily relies on transformer-based models like BERT for understanding
        and GPT for generation tasks.
        """,
        "metadata": {"source": "nlp_basics.txt", "topic": "nlp"}
    }
]

print(f"Loaded {len(SAMPLE_DOCUMENTS)} sample documents")
for doc in SAMPLE_DOCUMENTS:
    print(f"  - {doc['metadata']['source']}: {doc['metadata']['topic']}")

Loaded 4 sample documents
  - ml_fundamentals.txt: machine_learning
  - deep_learning.txt: deep_learning
  - llm_overview.txt: llm
  - nlp_basics.txt: nlp


## 5. Document Loading with Langfuse Tracing

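This section demonstrates:
- Creating a trace with the `@observe` decorator
- Nesting spans by calling other `@observe`-decorated functions
- Attaching metadata, tags, and outputs to the trace and its observations
- Adding scores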
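
The text-splitting span in the next cell uses `chunk_size=500` and `chunk_overlap=50`. As a rough illustration of what those two parameters mean, here is a simplified fixed-window chunker. This is not the algorithm `RecursiveCharacterTextSplitter` uses (that splitter prefers to break on separators such as blank lines); `sliding_chunks` is a hypothetical helper written for this sketch.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Cut text into fixed-size windows that share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


sample = "x" * 1200
chunks = sliding_chunks(sample, chunk_size=500, chunk_overlap=50)
print([len(c) for c in chunks])
```

Each window starts `chunk_size - chunk_overlap` characters after the previous one, so neighbouring chunks repeat a little text and a sentence cut at a boundary still appears whole in at least one chunk.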

In [None]:
@observe(name="document_indexing")
def load_documents_with_tracing(documents: List[Dict]) -> Chroma:
    """
    Load documents into vector store with comprehensive Langfuse tracing.
    
    Demonstrates:
    - @observe decorator for automatic tracing
    - Nested spans via nested @observe functions
    - Trace metadata and scores via langfuse_context
    """
    # Update trace with metadata
    langfuse_context.update_current_trace(
        user_id=USER_ID,
        metadata={
            "num_documents": len(documents),
            "model": OLLAMA_MODEL,
            "operation": "document_loading"
        },
        tags=["indexing", "rag", "setup"]
    )
    
    total_start = time.time()
    
    # Span 1: Text Splitting (nested observation)
    chunks = split_documents(documents)
    
    print(f"Split {len(documents)} documents into {len(chunks)} chunks")
    
    # Span 2: Embedding Generation (nested observation)
    vectorstore = create_embeddings(chunks)
    
    total_time = time.time() - total_start
    
    # Score: Processing quality
    langfuse_context.score_current_trace(
        name="indexing_success",
        value=1.0,
        comment=f"Successfully indexed {len(chunks)} chunks in {total_time:.2f}s"
    )
    
    trace_id = langfuse_context.get_current_trace_id()
    print(f"Indexing completed in {total_time:.2f}s")
    print(f"Trace ID: {trace_id}")
    
    return vectorstore


@observe(name="text_splitting")
def split_documents(documents: List[Dict]) -> List[Document]:
    """Split documents into chunks."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        separators=["\n\n", "\n", ". ", " ", ""]
    )
    
    # Create Document objects
    docs = [
        Document(page_content=d["content"], metadata=d["metadata"])
        for d in documents
    ]
    
    # Split into chunks
    chunks = text_splitter.split_documents(docs)
    
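    # Guard against an empty chunk list to avoid division by zero
    avg_chunk_size = (
        sum(len(c.page_content) for c in chunks) / len(chunks) if chunks else 0.0
    )

    langfuse_context.update_current_observation(
        output={
            "num_chunks": len(chunks),
            "avg_chunk_size": avg_chunk_size
        }
    )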
    
    return chunks


@observe(name="embedding_generation")
def create_embeddings(chunks: List[Document]) -> Chroma:
    """Create embeddings and vector store."""
    embed_start = time.time()
    
    # Create vector store
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        collection_name="rag_demo"
    )
    
    embed_time = time.time() - embed_start
    
    langfuse_context.update_current_observation(
        output={"status": "success", "vectorstore_type": "chroma"},
        metadata={"duration_seconds": embed_time}
    )
    
    return vectorstore

In [None]:
# Load documents
vectorstore = load_documents_with_tracing(SAMPLE_DOCUMENTS)

# Flush to ensure data is sent to Langfuse
langfuse.flush()

## 6. RAG Query with Full Instrumentation

This section demonstrates comprehensive tracing for RAG queries:
- Main trace for the full query
- Span for document retrieval
- Generation for the LLM call with estimated token usage
- Per-step timings recorded as metadata
- Scores for quality metrics
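
The token counts attached to the generation are estimates, not tokenizer output. Below is a minimal sketch of that word-count heuristic together with a small timing helper, using only the standard library. `estimate_tokens` and `timed` are hypothetical names introduced for this illustration, and the 1.3 tokens-per-word factor is a rough rule of thumb, not a property of the model.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


def estimate_tokens(text: str, tokens_per_word: float = 1.3) -> int:
    """Rough token estimate from the whitespace word count (not a real tokenizer)."""
    return int(len(text.split()) * tokens_per_word)


@contextmanager
def timed(timings: Dict[str, float], key: str) -> Iterator[None]:
    """Record the wall-clock duration of the enclosed block under timings[key]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[key] = time.perf_counter() - start


timings: Dict[str, float] = {}
with timed(timings, "generation"):
    # Stand-in for the LLM call
    answer = "Retrieval-Augmented Generation combines LLMs with retrieval."

usage = {"input": estimate_tokens("What is RAG?"), "output": estimate_tokens(answer)}
usage["total"] = usage["input"] + usage["output"]
print(usage, timings)
```

If exact counts matter, the estimate could be replaced with counts reported by the model server, when those are available.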

In [None]:
# RAG Prompt Template
RAG_PROMPT = PromptTemplate(
    template="""Use the following context to answer the question.
If you cannot find the answer in the context, say "I don't have enough information."

Context:
{context}

Question: {question}

Answer:""",
    input_variables=["context", "question"]
)


@observe(name="rag_query")
def rag_query(
    question: str,
    vectorstore: Chroma,
    session_id: Optional[str] = None,
    k: int = 3
) -> Dict[str, Any]:
    """
    Execute a RAG query with comprehensive Langfuse tracing.
    
    Tracks:
    - Execution time for all operations
    - Input/output data
    - Token usage (estimated)
    - Retrieved documents and relevance
    """
    session_id = session_id or f"session_{uuid.uuid4().hex[:8]}"
    
    # Update trace with metadata
    langfuse_context.update_current_trace(
        user_id=USER_ID,
        session_id=session_id,
        input=question,
        metadata={
            "model": OLLAMA_MODEL,
            "k": k,
            "question_length": len(question)
        },
        tags=["rag", "query", "qa"]
    )
    
    total_start = time.time()
    
    # Span: Document Retrieval (nested observation)
    docs, context, sources, retrieval_time = retrieve_documents(question, vectorstore, k)
    
    # Generation: LLM Call (nested observation)
    answer, generation_time, input_tokens, output_tokens = generate_answer(question, context)
    
    total_time = time.time() - total_start
    
    # Update trace with final output
    langfuse_context.update_current_trace(
        output=answer,
        metadata={
            "total_time_seconds": total_time,
            "retrieval_time_seconds": retrieval_time,
            "generation_time_seconds": generation_time
        }
    )
    
    # Score: Response quality (basic length heuristic, floored at 0.3 so it never dips for short answers)
    quality_score = max(0.3, min(1.0, len(answer) / 100))
    langfuse_context.score_current_trace(
        name="response_quality",
        value=quality_score,
        comment=f"Auto-scored based on response length ({len(answer)} chars)"
    )
    
    trace_id = langfuse_context.get_current_trace_id()
    
    return {
        "question": question,
        "answer": answer,
        "sources": sources,
        "context": context,
        "trace_id": trace_id,
        "metrics": {
            "total_time": total_time,
            "retrieval_time": retrieval_time,
            "generation_time": generation_time,
            "docs_retrieved": len(docs),
            "context_length": len(context),
            "input_tokens": input_tokens,
            "output_tokens": output_tokens
        }
    }


@observe(name="document_retrieval")
def retrieve_documents(question: str, vectorstore: Chroma, k: int):
    """Retrieve relevant documents for the question."""
    retrieval_start = time.time()
    
    # Retrieve documents
    retriever = vectorstore.as_retriever(search_kwargs={"k": k})
    docs = retriever.invoke(question)
    
    retrieval_time = time.time() - retrieval_start
    
    # Build context
    context = "\n\n---\n\n".join([doc.page_content for doc in docs])
    sources = [doc.metadata for doc in docs]
    
    langfuse_context.update_current_observation(
        input={"question": question, "k": k},
        output={
            "num_docs": len(docs),
            "context_length": len(context),
            "sources": sources
        },
        metadata={"duration_seconds": retrieval_time}
    )
    
    return docs, context, sources, retrieval_time


@observe(name="llm_answer_generation", as_type="generation")
def generate_answer(question: str, context: str):
    """Generate answer using LLM."""
    generation_start = time.time()
    
    # Update generation metadata
    langfuse_context.update_current_observation(
        model=OLLAMA_MODEL,
        input={
            "prompt_template": "rag_qa",
            "question": question,
            "context_length": len(context)
        },
        model_parameters={"temperature": 0.7}
    )
    
    # Format prompt and call LLM
    prompt = RAG_PROMPT.format(context=context, question=question)
    answer = llm.invoke(prompt)
    
    generation_time = time.time() - generation_start
    
    # Estimate tokens (rough approximation)
    input_tokens = int(len(prompt.split()) * 1.3)
    output_tokens = int(len(answer.split()) * 1.3)
    
    langfuse_context.update_current_observation(
        output=answer,
        usage={
            "input": input_tokens,
            "output": output_tokens,
            "total": input_tokens + output_tokens
        },
        metadata={"duration_seconds": generation_time}
    )
    
    return answer, generation_time, input_tokens, output_tokens

In [None]:
# Test queries
TEST_QUESTIONS = [
    "What are the three types of machine learning?",
    "What is backpropagation and how does it work?",
    "What is RAG and why is it useful?",
    "What are common NLP tasks?",
    "Explain the transformer architecture."
]

session_id = f"demo_session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
results = []

print("Running test queries...\n")
for i, question in enumerate(TEST_QUESTIONS, 1):
    print(f"Query {i}: {question}")
    print("-" * 50)
    
    result = rag_query(question, vectorstore, session_id=session_id)
    results.append(result)
    
    print(f"Answer: {result['answer'][:300]}..." if len(result['answer']) > 300 else f"Answer: {result['answer']}")
    print(f"\nMetrics:")
    print(f"  Total time: {result['metrics']['total_time']:.2f}s")
    print(f"  Retrieval: {result['metrics']['retrieval_time']:.2f}s")
    print(f"  Generation: {result['metrics']['generation_time']:.2f}s")
    print(f"  Docs: {result['metrics']['docs_retrieved']}, Tokens: {result['metrics']['input_tokens']}+{result['metrics']['output_tokens']}")
    print(f"  Trace ID: {result['trace_id']}")
    print("\n")

# Flush to ensure data is sent
langfuse.flush()

## 7. Langfuse Datasets

Datasets in Langfuse allow you to:
- Store test cases for evaluation
- Track expected outputs
- Run experiments across multiple runs
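
Rerunning the item-upload cell may add the same test cases again. One way to keep reruns idempotent is to derive a deterministic ID from each item's input. The sketch below only computes such an ID. Passing it to `create_dataset_item` relies on the assumption that the method accepts an optional `id` used for upserts, which should be checked against the installed SDK version. `stable_item_id` is a hypothetical helper.

```python
import hashlib
import json
from typing import Any, Dict


def stable_item_id(dataset_name: str, item_input: Dict[str, Any]) -> str:
    """Derive a deterministic ID from the dataset name and the item's input."""
    # sort_keys makes the serialization independent of dict insertion order
    canonical = json.dumps(item_input, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(f"{dataset_name}:{canonical}".encode("utf-8")).hexdigest()
    return f"item_{digest[:16]}"


a = stable_item_id("rag_evaluation_dataset", {"question": "What is RAG?"})
b = stable_item_id("rag_evaluation_dataset", {"question": "What is RAG?"})
c = stable_item_id("rag_evaluation_dataset", {"question": "What is backpropagation?"})
print(a == b, a == c)
```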

In [None]:
# Create a dataset for RAG evaluation
DATASET_NAME = "rag_evaluation_dataset"

# Create or get dataset
dataset = langfuse.create_dataset(
    name=DATASET_NAME,
    description="Test dataset for RAG system evaluation",
    metadata={
        "created_by": USER_ID,
        "version": "1.0",
        "domain": "AI/ML knowledge"
    }
)

print(f"Dataset created: {DATASET_NAME}")

In [None]:
# Define test items with expected outputs
TEST_ITEMS = [
    {
        "input": {"question": "What are the three types of machine learning?"},
        "expected_output": "supervised learning, unsupervised learning, and reinforcement learning",
        "metadata": {"topic": "ml_basics", "difficulty": "easy"}
    },
    {
        "input": {"question": "What is backpropagation?"},
        "expected_output": "algorithm for training neural networks by calculating gradients",
        "metadata": {"topic": "deep_learning", "difficulty": "medium"}
    },
    {
        "input": {"question": "What is RAG?"},
        "expected_output": "Retrieval-Augmented Generation combines LLMs with external knowledge retrieval",
        "metadata": {"topic": "llm", "difficulty": "medium"}
    },
    {
        "input": {"question": "What are common activation functions?"},
        "expected_output": "ReLU, Sigmoid, and Tanh",
        "metadata": {"topic": "deep_learning", "difficulty": "easy"}
    },
    {
        "input": {"question": "Name some large language models."},
        "expected_output": "GPT-4, Claude, LLaMA, Qwen, Mistral",
        "metadata": {"topic": "llm", "difficulty": "easy"}
    }
]

# Add items to dataset
for item in TEST_ITEMS:
    langfuse.create_dataset_item(
        dataset_name=DATASET_NAME,
        input=item["input"],
        expected_output=item["expected_output"],
        metadata=item["metadata"]
    )

langfuse.flush()
print(f"Added {len(TEST_ITEMS)} items to dataset")

## 8. Custom Evaluator: LLM-as-a-Judge

Create custom evaluation logic that uses an LLM to assess response quality.
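
Small local models do not always return clean JSON, so the judge's reply needs defensive parsing. The sketch below shows one possible approach: extract the first flat JSON object, keep only numeric scores, clamp them to the 0-10 range, and recompute the average rather than trusting the model's arithmetic. `parse_judge_scores` and `METRICS` are hypothetical names for this illustration and are not part of the notebook's evaluator.

```python
import json
import re
from typing import Any, Dict

METRICS = ("relevance", "accuracy", "completeness", "clarity")


def parse_judge_scores(raw: str) -> Dict[str, Any]:
    """Extract the first flat JSON object from raw and clamp scores to 0-10."""
    fallback = {"overall": 5.0, "explanation": "Could not parse evaluation"}
    match = re.search(r"\{[^{}]+\}", raw, re.DOTALL)
    if not match:
        return fallback
    try:
        data = json.loads(match.group())
    except json.JSONDecodeError:
        return fallback

    scores: Dict[str, Any] = {}
    for key in METRICS:
        value = data.get(key)
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            scores[key] = max(0.0, min(10.0, float(value)))
    if not scores:
        return fallback
    # Recompute the average instead of trusting the model's arithmetic
    scores["overall"] = sum(scores.values()) / len(scores)
    scores["explanation"] = str(data.get("explanation", ""))
    return scores


reply = 'Sure! {"relevance": 9, "accuracy": 8, "completeness": 7, "clarity": 12, "overall": 3, "explanation": "ok"}'
print(parse_judge_scores(reply))
print(parse_judge_scores("no json here"))
```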

In [None]:
# LLM-as-a-Judge Evaluator
JUDGE_PROMPT = PromptTemplate(
    template="""You are an expert evaluator. Assess the quality of the AI response.

Question: {question}
Expected Answer (key points): {expected}
Actual Response: {response}

Evaluate on these criteria (score 0-10 for each):
1. Relevance: Does it answer the question?
2. Accuracy: Are the facts correct based on expected answer?
3. Completeness: Does it cover key points?
4. Clarity: Is it well-written and clear?

Provide your evaluation as JSON:
{{
    "relevance": <score>,
    "accuracy": <score>,
    "completeness": <score>,
    "clarity": <score>,
    "overall": <average>,
    "explanation": "<brief explanation>"
}}

JSON evaluation:""",
    input_variables=["question", "expected", "response"]
)


@observe(name="llm_judge_evaluation")
def llm_judge_evaluate(
    question: str,
    expected: str,
    response: str,
    trace_id: str,
) -> Dict[str, Any]:
    """
    Use LLM as a judge to evaluate response quality.
    """
    # Update trace metadata
    langfuse_context.update_current_trace(
        user_id=USER_ID,
        metadata={
            "evaluated_trace_id": trace_id,
            "evaluation_type": "llm_as_judge"
        },
        tags=["evaluation", "llm-judge"]
    )
    
    # Call the judge generation
    scores = judge_generation(question, expected, response)
    
    # Add scores to original trace using the langfuse client directly
    overall_score = float(scores.get("overall", 5)) / 10.0
    
    langfuse.score(
        trace_id=trace_id,
        name="llm_judge_overall",
        value=overall_score,
        comment=scores.get("explanation", "LLM judge evaluation")
    )
    
    # Add individual metric scores if available
    for metric in ["relevance", "accuracy", "completeness", "clarity"]:
        if metric in scores:
            langfuse.score(
                trace_id=trace_id,
                name=f"llm_judge_{metric}",
                value=float(scores[metric]) / 10.0
            )
    
    langfuse.flush()
    
    return scores


@observe(name="judge_evaluation", as_type="generation")
def judge_generation(question: str, expected: str, response: str) -> Dict[str, Any]:
    """Generate judgment from LLM."""
    langfuse_context.update_current_observation(
        model=OLLAMA_MODEL,
        input={
            "question": question,
            "expected": expected,
            "response": response[:500]  # Truncate for context
        }
    )
    
    # Call LLM judge
    prompt = JUDGE_PROMPT.format(
        question=question,
        expected=expected,
        response=response
    )
    
    judge_response = llm.invoke(prompt)
    
    langfuse_context.update_current_observation(output=judge_response)
    
    # Parse scores (with fallback)
    import re

    fallback = {"overall": 5.0, "explanation": "Could not parse evaluation"}
    try:
        # Extract the first flat JSON object from the response
        json_match = re.search(r'\{[^{}]+\}', judge_response, re.DOTALL)
        scores = json.loads(json_match.group()) if json_match else fallback
    except json.JSONDecodeError:
        # Catch only JSON errors instead of using a bare except
        scores = {"overall": 5.0, "explanation": "Evaluation parsing failed"}
    
    return scores

print("LLM-as-Judge evaluator defined")

In [None]:
# Run evaluation on our test results
print("Running LLM-as-Judge evaluation on results...\n")

for i, (result, test_item) in enumerate(zip(results[:3], TEST_ITEMS[:3]), 1):  # Evaluate first 3
    print(f"Evaluating Query {i}: {result['question']}")
    
    scores = llm_judge_evaluate(
        question=result['question'],
        expected=test_item['expected_output'],
        response=result['answer'],
        trace_id=result['trace_id'],
    )
    
    print(f"  Evaluation: {scores}")
    print()

langfuse.flush()

## 9. Custom Evaluators: Programmatic Metrics

Create custom evaluators for specific metrics such as:
- Answer length
- Keyword coverage
- Response time
- Context utilization (word overlap between the answer and the retrieved context)
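
Keyword coverage based on plain substring checks can over-count, because a short keyword may match inside a longer word. A hedged alternative is whole-word matching with regular-expression word boundaries. `keyword_coverage` below is a hypothetical helper for illustration and is not the function defined in the next cell.

```python
import re
from typing import List


def keyword_coverage(answer: str, keywords: List[str]) -> float:
    """Fraction of keywords that occur in answer as whole words (case-insensitive)."""
    if not keywords:
        return 0.0
    found = 0
    for kw in keywords:
        # \b anchors stop a keyword from matching inside a longer word
        pattern = r"\b" + re.escape(kw) + r"\b"
        if re.search(pattern, answer, flags=re.IGNORECASE):
            found += 1
    return found / len(keywords)


answer = "Supervised and unsupervised learning differ; reinforcement uses rewards."
print(keyword_coverage(answer, ["supervised", "unsupervised", "reinforcement"]))
print(keyword_coverage("An unsupervised method.", ["supervised"]))
```

In the second call a substring check would count "supervised" as present because it occurs inside "unsupervised"; the word-boundary version does not.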

In [None]:
def custom_evaluators(
    result: Dict[str, Any],
    expected_keywords: List[str],
) -> Dict[str, float]:
    """
    Custom programmatic evaluators for RAG responses.
    """
    trace_id = result['trace_id']
    answer = result['answer'].lower()
    
    scores = {}
    
    # 1. Answer Length Score (normalized)
    # Ideal length: 100-500 chars
    length = len(result['answer'])
    if length < 50:
        length_score = 0.3
    elif length < 100:
        length_score = 0.6
    elif length <= 500:
        length_score = 1.0
    else:
        length_score = max(0.5, 1.0 - (length - 500) / 1000)
    
    scores['length_score'] = length_score
    langfuse.score(
        trace_id=trace_id,
        name="answer_length",
        value=length_score,
        comment=f"Response length: {length} chars"
    )
    
    # 2. Keyword Coverage Score
    keywords_found = sum(1 for kw in expected_keywords if kw.lower() in answer)
    keyword_score = keywords_found / len(expected_keywords) if expected_keywords else 0
    
    scores['keyword_coverage'] = keyword_score
    langfuse.score(
        trace_id=trace_id,
        name="keyword_coverage",
        value=keyword_score,
        comment=f"Found {keywords_found}/{len(expected_keywords)} keywords"
    )
    
    # 3. Response Time Score
    total_time = result['metrics']['total_time']
    if total_time < 2:
        time_score = 1.0
    elif total_time < 5:
        time_score = 0.8
    elif total_time < 10:
        time_score = 0.6
    else:
        # Cap at 0.6 so a slower response never scores above the previous tier
        time_score = max(0.2, min(0.6, 1.0 - total_time / 30))
    
    scores['response_time'] = time_score
    langfuse.score(
        trace_id=trace_id,
        name="response_time",
        value=time_score,
        comment=f"Total time: {total_time:.2f}s"
    )
    
    # 4. Context Utilization Score
    # Check if answer uses information from context
    context_words = set(result['context'].lower().split())
    answer_words = set(answer.split())
    overlap = len(context_words.intersection(answer_words))
    context_score = min(1.0, overlap / 20)  # Normalize
    
    scores['context_utilization'] = context_score
    langfuse.score(
        trace_id=trace_id,
        name="context_utilization",
        value=context_score,
        comment=f"Word overlap with context: {overlap}"
    )
    
    langfuse.flush()
    
    return scores

print("Custom evaluators defined")

In [None]:
# Apply custom evaluators to results
EXPECTED_KEYWORDS = {
    0: ["supervised", "unsupervised", "reinforcement"],
    1: ["backpropagation", "gradient", "neural", "weights"],
    2: ["retrieval", "augmented", "generation", "llm"],
    3: ["tokenization", "sentiment", "translation", "nlp"],
    4: ["transformer", "attention", "gpt", "bert"]
}

print("Running custom evaluations...\n")

for i, result in enumerate(results):
    keywords = EXPECTED_KEYWORDS.get(i, [])
    scores = custom_evaluators(result, keywords)
    
    print(f"Query {i+1}: {result['question'][:50]}...")
    print(f"  Length: {scores['length_score']:.2f}")
    print(f"  Keywords: {scores['keyword_coverage']:.2f}")
    print(f"  Time: {scores['response_time']:.2f}")
    print(f"  Context: {scores['context_utilization']:.2f}")
    print()

## 10. Running Dataset Experiment

Run the RAG system against the dataset and track results.
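
The experiment scores each answer by word overlap with the expected output. Splitting on whitespace alone leaves punctuation attached to words, so `"Sigmoid,"` and `"sigmoid"` do not match. A minimal sketch of a punctuation-tolerant variant follows; `tokenize` and `overlap_recall` are hypothetical helpers, and the metric is still a crude proxy for correctness.

```python
import re
from typing import Set


def tokenize(text: str) -> Set[str]:
    """Lowercase text and return its set of alphanumeric tokens (hyphens kept)."""
    return set(re.findall(r"[a-z0-9]+(?:-[a-z0-9]+)*", text.lower()))


def overlap_recall(expected: str, answer: str) -> float:
    """Share of expected tokens that also appear in answer."""
    expected_tokens = tokenize(expected)
    if not expected_tokens:
        return 0.0
    return len(expected_tokens & tokenize(answer)) / len(expected_tokens)


expected = "ReLU, Sigmoid, and Tanh"
answer = "Common choices are ReLU, sigmoid and tanh."
print(overlap_recall(expected, answer))
```

For this pair, lowercasing and whitespace splitting alone would match only 2 of the 4 expected words, because of the trailing comma and period.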

In [None]:
def run_dataset_experiment(
    dataset_name: str,
    run_name: str,
    vectorstore: Chroma,
):
    """
    Run experiment against a dataset.
    """
    # Get dataset items
    dataset = langfuse.get_dataset(dataset_name)
    
    print(f"Running experiment '{run_name}' on dataset '{dataset_name}'")
    print(f"Total items: {len(dataset.items)}\n")
    
    experiment_results = []
    
    for item in dataset.items:
        question = item.input.get("question", "")
        expected = item.expected_output
        
        print(f"Processing: {question[:50]}...")
        
        # Run RAG query
        result = rag_query(
            question=question,
            vectorstore=vectorstore,
            session_id=f"experiment_{run_name}"
        )
        
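        # Link the trace to this dataset run
        # (SDK v2: the first parameter is trace_or_observation; pass None when linking by trace_id)
        item.link(
            trace_or_observation=None,
            run_name=run_name,
            trace_id=result['trace_id']
        )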
        
        # Simple accuracy check
        expected_lower = expected.lower() if expected else ""
        answer_lower = result['answer'].lower()
        
        # Check keyword overlap
        expected_words = set(expected_lower.split())
        answer_words = set(answer_lower.split())
        overlap = len(expected_words.intersection(answer_words))
        accuracy = overlap / len(expected_words) if expected_words else 0
        
        # Score the result
        langfuse.score(
            trace_id=result['trace_id'],
            name="dataset_accuracy",
            value=min(1.0, accuracy),
            comment=f"Keyword overlap: {overlap}/{len(expected_words)}"
        )
        
        experiment_results.append({
            "question": question,
            "expected": expected,
            "actual": result['answer'][:200],
            "accuracy": accuracy,
            "trace_id": result['trace_id']
        })
        
        print(f"  Accuracy: {accuracy:.2f}")
    
    langfuse.flush()
    
    # Summary (guard against an empty dataset to avoid division by zero)
    n_results = max(1, len(experiment_results))
    avg_accuracy = sum(r['accuracy'] for r in experiment_results) / n_results
    print(f"\n{'='*50}")
    print(f"Experiment Complete: {run_name}")
    print(f"Average Accuracy: {avg_accuracy:.2%}")
    print(f"{'='*50}")
    
    return experiment_results

In [None]:
# Run the experiment
experiment_results = run_dataset_experiment(
    dataset_name=DATASET_NAME,
    run_name=f"rag_v1_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
    vectorstore=vectorstore,
)

## 11. Human Annotation Support

Add scores that can be used for human annotation workflows.

In [None]:
def add_human_annotation_placeholder(trace_id: str):
    """
    Add placeholder scores that can be updated through human annotation in Langfuse UI.
    """
    # These scores can be updated via Langfuse UI annotation queues
    annotation_categories = [
        ("human_accuracy", "Is the answer factually correct?"),
        ("human_helpfulness", "Is the answer helpful and complete?"),
        ("human_relevance", "Does it directly answer the question?"),
        ("human_safety", "Is the response appropriate and safe?")
    ]
    
    for score_name, comment in annotation_categories:
        langfuse.score(
            trace_id=trace_id,
            name=score_name,
            value=0.5,  # Placeholder - to be updated by human annotator
            comment=f"PENDING HUMAN REVIEW: {comment}"
        )
    
    langfuse.flush()
    print(f"Added annotation placeholders for trace {trace_id}")

# Add annotation placeholders to first result
if results:
    add_human_annotation_placeholder(results[0]['trace_id'])

## 12. Summary and Dashboard Information

Open the Langfuse dashboard to view:
- **Traces**: Full query execution paths
- **Observations**: Individual operations (spans, generations)
- **Dashboards**: Aggregate metrics and trends
- **Datasets**: Test cases and experiments
- **Annotation Queues**: Human review workflows

In [None]:
# Final flush
langfuse.flush()

print("="*60)
print("RAG APPLICATION WITH LANGFUSE - COMPLETE")
print("="*60)
print(f"\nLangfuse Dashboard: {LANGFUSE_HOST}")
print(f"\nKey entities to explore:")
print(f"  - Traces: View full execution paths")
print(f"  - Observations: See spans and generations")
print(f"  - Scores: Review quality metrics")
print(f"  - Datasets: {DATASET_NAME}")
print(f"  - Sessions: demo_session_*")
print(f"\nUser ID for filtering: {USER_ID}")
print(f"\nTest queries executed: {len(results)}")
print(f"Dataset items: {len(TEST_ITEMS)}")

In [None]:
# Print trace IDs for easy access
print("\nTrace IDs for review:")
for i, result in enumerate(results, 1):
    print(f"  {i}. {result['trace_id']} - {result['question'][:40]}...")

---

## Langfuse UI Screenshots Guide

After running this notebook, take screenshots of:

1. **Traces (General View)**: Shows all traces with timing and status
2. **Trace Detail (Expanded)**: Shows spans, generations, and events within a trace
3. **Observations**: Individual spans and generations
4. **Dashboards**: Aggregate metrics, latency, token usage
5. **LLM-as-Judge**: Traces with evaluation scores
6. **Annotation Queue**: Human review workflow (if configured)
7. **Custom Evaluator**: Score distributions and trends