# 🧪 Intelligent RAG Chatbot - Batch Evaluation

This notebook performs comprehensive batch evaluation of the Intelligent Multi-Stage RAG Chatbot using TruLens and Snowflake AI Observability.

**Purpose**: Run systematic evaluations with RAG triad metrics on legal document analysis tasks.

**Prerequisites**:
- Install packages: `trulens-core==1.5.2`, `trulens-providers-cortex==1.5.2`, `trulens-connectors-snowflake==1.5.2`
- Ensure the Cortex Search services used by the RAG exist (`CS_DOCUMENTS_METADATA`, `CS_DOCUMENTS_CHUNKS`)
- Required roles: the `SNOWFLAKE.CORTEX_USER` database role and the `AI_OBSERVABILITY_EVENTS_LOOKUP` application role
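
The pinned versions in the prerequisites can be checked before running anything else. The following is a minimal sketch using only the standard library; the helper name `find_version_mismatches` is hypothetical and not part of TruLens.

```python
from importlib.metadata import PackageNotFoundError, version

# Pins copied from the prerequisites list.
REQUIRED_PACKAGES = {
    "trulens-core": "1.5.2",
    "trulens-providers-cortex": "1.5.2",
    "trulens-connectors-snowflake": "1.5.2",
}


def find_version_mismatches(required: dict) -> dict:
    """Return {package: installed_version_or_None} for every pin that is not met."""
    mismatches = {}
    for name, wanted in required.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            installed = None  # package is not installed at all
        if installed != wanted:
            mismatches[name] = installed
    return mismatches


mismatches = find_version_mismatches(REQUIRED_PACKAGES)
if mismatches:
    print(f"Packages missing or at a different version: {mismatches}")
else:
    print("All pinned TruLens packages are installed")
```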

**Why a notebook?**: TruLens batch evaluation works better in a notebook environment because of its execution-context requirements.


## 🛠️ Setup Environment


In [None]:
import os
import json
from typing import Dict, List, Any
from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
from snowflake.cortex import Complete

# Enable TruLens OpenTelemetry tracing
os.environ["TRULENS_OTEL_TRACING"] = "1"

# TruLens imports
from trulens.core.otel.instrument import instrument
from trulens.otel.semconv.trace import SpanAttributes
from trulens.apps.app import TruApp
from trulens.connectors.snowflake import SnowflakeConnector
from trulens.core.run import Run, RunConfig

# Get active session
session = get_active_session()
print(f"✅ Connected to Snowflake: {session.get_current_database()}.{session.get_current_schema()}")


## 📊 Setup Observability Database & Schema


In [None]:
CREATE DATABASE IF NOT EXISTS AI_OBSERVABILITY_DB;
CREATE SCHEMA IF NOT EXISTS AI_OBSERVABILITY_DB.EVALUATION_SCHEMA;
USE SCHEMA AI_OBSERVABILITY_DB.EVALUATION_SCHEMA;


In [None]:
# Verify observability context
current_db = session.get_current_database()
current_schema = session.get_current_schema()
print(f"📊 Observability Context: {current_db}.{current_schema}")


## 🤖 Create Instrumented RAG Class

Recreate the RAG logic with proper TruLens instrumentation for notebook execution.


In [None]:
class NotebookRAGChatbot:
    """
    Simplified RAG chatbot for notebook-based batch evaluation.
    """
    
    def __init__(self, session):
        self.session = session
        self.root = Root(session)
        
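        # Capture the session's current database/schema as the RAG context.
        # Note: this reads whatever context is active when the object is created,
        # so it should be the schema that holds the search services.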
        self.original_db = session.get_current_database()
        self.original_schema = session.get_current_schema()
        
        print(f"🔍 RAG will use context: {self.original_db}.{self.original_schema}")
    
    @instrument(
        span_type=SpanAttributes.SpanType.RETRIEVAL,
        attributes={
            SpanAttributes.RETRIEVAL.QUERY_TEXT: "query",
            SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: "return"
        }
    )
    def query_metadata_search_service(self, query: str, limit: int = 10) -> List[Dict]:
        """Query metadata search service."""
        try:
            # Temporarily switch to RAG context
            self.session.sql(f"USE SCHEMA {self.original_db}.{self.original_schema}").collect()

            search_service = (
                self.root.databases[self.original_db]
                .schemas[self.original_schema]
                .cortex_search_services["CS_DOCUMENTS_METADATA"]
            )

            response = search_service.search(
                query=query,
                columns=["FILENAME", "FILE_URL", "CONTENT_METADATA"],
                limit=limit
            )

            return response.results if response.results else []
        except Exception as e:
            print(f"Error in metadata search: {e}")
            return []
        finally:
            # Always switch back to the observability context, even if the search fails
            self.session.sql("USE SCHEMA AI_OBSERVABILITY_DB.EVALUATION_SCHEMA").collect()
    
    @instrument(
        span_type=SpanAttributes.SpanType.RETRIEVAL,
        attributes={
            SpanAttributes.RETRIEVAL.QUERY_TEXT: "query",
            SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: "return"
        }
    )
    def query_chunks_search_service(self, query: str, relevant_filenames: List[str], limit: int = 5) -> List[Dict]:
        """Query chunks search service with filtering."""
        try:
            # Temporarily switch to RAG context
            self.session.sql(f"USE SCHEMA {self.original_db}.{self.original_schema}").collect()

            search_service = (
                self.root.databases[self.original_db]
                .schemas[self.original_schema]
                .cortex_search_services["CS_DOCUMENTS_CHUNKS"]
            )

            # Restrict the search to the files selected by the metadata stage
            if len(relevant_filenames) == 1:
                filter_dict = {"@eq": {"FILENAME": relevant_filenames[0]}}
            else:
                filter_dict = {"@or": [{"@eq": {"FILENAME": filename}} for filename in relevant_filenames]}

            response = search_service.search(
                query=query,
                columns=["FILENAME", "CHUNK_TEXT", "FILE_URL"],
                filter=filter_dict,
                limit=limit
            )

            return response.results if response.results else []
        except Exception as e:
            print(f"Error in chunks search: {e}")
            return []
        finally:
            # Always switch back to the observability context, even if the search fails
            self.session.sql("USE SCHEMA AI_OBSERVABILITY_DB.EVALUATION_SCHEMA").collect()
    
    @instrument(span_type=SpanAttributes.SpanType.GENERATION)
    def generate_completion(self, user_question: str, context_str: str) -> str:
        """Generate completion using Cortex LLM."""
        prompt = f"""
        You are an expert legal document analysis assistant. Based on the provided context from legal contracts,
        provide a comprehensive and accurate answer to the user's question.
        
        Context: {context_str}
        
        Question: {user_question}
        
        Answer:
        """
        
        return Complete("mistral-large2", prompt)
    
    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "user_question",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return"
        }
    )
    def query(self, user_question: str) -> str:
        """Main query method for batch evaluation."""
        # Step 1: Get relevant documents from metadata search
        metadata_results = self.query_metadata_search_service(user_question)
        
        if not metadata_results:
            return "I couldn't find relevant documents for your query."
        
        # Step 2: Extract filenames
        relevant_filenames = [result["FILENAME"] for result in metadata_results[:3]]  # Top 3
        
        # Step 3: Get chunks from relevant documents
        chunk_results = self.query_chunks_search_service(user_question, relevant_filenames)
        
        if not chunk_results:
            return "I found relevant documents but couldn't extract specific information."
        
        # Step 4: Build context and generate response
        context_chunks = [result["CHUNK_TEXT"] for result in chunk_results]
        context_str = "\n\n".join(context_chunks)
        
        response = self.generate_completion(user_question, context_str)
        
        return response

# Create RAG instance
rag_chatbot = NotebookRAGChatbot(session)
print("✅ RAG Chatbot created successfully")
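
The filename filter built inside `query_chunks_search_service` can be pulled out into a pure function, which makes it easy to check without a Snowflake session. This is a sketch only: `build_filename_filter` is a hypothetical helper, and rejecting an empty list is an assumption (the class above never passes one because `query` returns early when no metadata results are found).

```python
from typing import Dict, List


def build_filename_filter(filenames: List[str]) -> Dict:
    """Build a Cortex Search filter that matches any of the given filenames."""
    if not filenames:
        raise ValueError("At least one filename is required to build a filter")
    clauses = [{"@eq": {"FILENAME": name}} for name in filenames]
    # A single clause needs no @or wrapper
    return clauses[0] if len(clauses) == 1 else {"@or": clauses}


print(build_filename_filter(["a.pdf"]))
print(build_filename_filter(["a.pdf", "b.pdf"]))
```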


## 🧪 Test RAG Functionality


In [None]:
# Test the RAG with a sample query
test_response = rag_chatbot.query("What are the termination clauses in the distributor agreement?")
print("🎯 Test Response:")
preview = test_response[:500] + "..." if len(test_response) > 500 else test_response
print(preview)


## 📝 Register App with TruLens


In [None]:
# Register the app with TruLens
tru_connector = SnowflakeConnector(snowpark_session=session)

app_name = "intelligent_rag_chatbot"
app_version = "notebook_evaluation"

tru_app = TruApp(
    rag_chatbot,
    app_name=app_name,
    app_version=app_version,
    connector=tru_connector
)

print(f"✅ TruApp registered: {app_name} v{app_version}")


## 📊 Create Evaluation Dataset


In [None]:
CREATE TABLE IF NOT EXISTS LEGAL_RAG_EVALUATION_DATASET (
    query STRING,
    ground_truth_response STRING,
    category STRING,
    difficulty STRING
);


In [None]:
# Sample evaluation queries for legal document analysis
evaluation_data = [
    {
        "query": "What are the termination clauses in the NETGEAR distributor agreement?",
        "ground_truth_response": "The NETGEAR distributor agreement contains specific termination provisions that allow for termination with notice under certain conditions.",
        "category": "termination",
        "difficulty": "medium"
    },
    {
        "query": "Compare the IP ownership provisions between the development and endorsement agreements",
        "ground_truth_response": "The development agreement typically assigns IP rights to the developer, while the endorsement agreement maintains IP with the original owner.",
        "category": "comparison",
        "difficulty": "hard"
    },
    {
        "query": "What are the liability limitations in the hosting agreement?",
        "ground_truth_response": "The hosting agreement includes liability caps and exclusions for certain types of damages.",
        "category": "liability",
        "difficulty": "medium"
    },
    {
        "query": "What governing law provisions are common across all agreements?",
        "ground_truth_response": "Most agreements specify the jurisdiction and governing law for dispute resolution.",
        "category": "general",
        "difficulty": "easy"
    },
    {
        "query": "How do renewal terms vary across different contract types?",
        "ground_truth_response": "Renewal terms differ based on contract type, with some automatic renewals and others requiring explicit agreement.",
        "category": "renewal",
        "difficulty": "medium"
    }
]

# Check if data exists
count_result = session.sql("SELECT COUNT(*) FROM LEGAL_RAG_EVALUATION_DATASET").collect()
current_count = count_result[0][0]

if current_count == 0:
    # Insert evaluation data with bind parameters so quotes in the text cannot break the SQL
    for item in evaluation_data:
        session.sql(
            """
            INSERT INTO LEGAL_RAG_EVALUATION_DATASET
            (query, ground_truth_response, category, difficulty)
            VALUES (?, ?, ?, ?)
            """,
            params=[item["query"], item["ground_truth_response"], item["category"], item["difficulty"]],
        ).collect()
    
    print(f"✅ Inserted {len(evaluation_data)} evaluation queries")
else:
    print(f"📊 Found {current_count} existing evaluation queries")


In [None]:
SELECT * FROM LEGAL_RAG_EVALUATION_DATASET;
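
Before loading records into `LEGAL_RAG_EVALUATION_DATASET`, it can help to check that each one has every column filled in and that no query appears twice. The sketch below is one possible check; `validate_records` and `REQUIRED_FIELDS` are hypothetical names, and treating duplicate queries as a problem is an assumption about how the dataset should look.

```python
from typing import Dict, List

# Column names taken from the table definition in this notebook.
REQUIRED_FIELDS = ("query", "ground_truth_response", "category", "difficulty")


def validate_records(records: List[Dict[str, str]]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the data is clean."""
    problems = []
    seen_queries = set()
    for index, record in enumerate(records):
        for field in REQUIRED_FIELDS:
            value = record.get(field)
            if not isinstance(value, str) or not value.strip():
                problems.append(f"record {index}: missing or empty '{field}'")
        query = record.get("query")
        if isinstance(query, str):
            if query in seen_queries:
                problems.append(f"record {index}: duplicate query")
            seen_queries.add(query)
    return problems
```

Calling `validate_records(evaluation_data)` and inserting only when it returns an empty list keeps malformed rows out of the table.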


## 🚀 Create and Execute Evaluation Run


In [None]:
# Create evaluation run configuration
run_name = "legal_rag_batch_eval_v1"

run_config = RunConfig(
    run_name=run_name,
    dataset_name="LEGAL_RAG_EVALUATION_DATASET",
    description="Batch evaluation of intelligent multi-stage RAG on legal documents",
    label="legal_rag_notebook_eval",
    source_type="TABLE",
    dataset_spec={
        "input": "query",
        "ground_truth_output": "ground_truth_response",
    },
    llm_judge_name="mistral-large2"
)

# Add run to TruLens
run: Run = tru_app.add_run(run_config=run_config)
print(f"✅ Created evaluation run: {run_name}")
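
The `dataset_spec` above maps run inputs to column names, so a misspelled column would only surface once the run starts. A small pre-check could look like the sketch below. `missing_dataset_columns` is a hypothetical helper, and the case-insensitive comparison assumes the table was created with unquoted identifiers, as in this notebook.

```python
from typing import Dict, Iterable, List


def missing_dataset_columns(dataset_spec: Dict[str, str], table_columns: Iterable[str]) -> List[str]:
    """Return the dataset_spec columns that are absent from the table (case-insensitive)."""
    available = {column.upper() for column in table_columns}
    return [column for column in dataset_spec.values() if column.upper() not in available]


# In the notebook, the column list could come from
# session.table("LEGAL_RAG_EVALUATION_DATASET").columns
spec = {"input": "query", "ground_truth_output": "ground_truth_response"}
columns = ["QUERY", "GROUND_TRUTH_RESPONSE", "CATEGORY", "DIFFICULTY"]
print(missing_dataset_columns(spec, columns))
```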


In [None]:
# Scale up warehouse for evaluation
import pandas as pd

current_wh = session.get_current_warehouse().strip('"')
print(f"🏭 Current warehouse: {current_wh}")

# Scale up to Large for evaluation
session.sql(f"ALTER WAREHOUSE {current_wh} SET WAREHOUSE_SIZE='LARGE'").collect()
print(f"📈 Scaled up warehouse: {current_wh} → LARGE")


In [None]:
# Start the evaluation run
print("🚀 Starting batch evaluation...")
print("This will execute the RAG on all test queries and collect traces.")

try:
    run.start()
    print("✅ Batch evaluation completed successfully!")
except Exception as e:
    print(f"❌ Evaluation failed: {e}")


In [None]:
# Compute RAG triad metrics
print("📊 Computing evaluation metrics...")

try:
    run.compute_metrics([
        "answer_relevance",
        "context_relevance",
        "groundedness",
    ])
    # compute_metrics may return before scoring finishes; check the run status before reading results
    print("✅ Metrics computation submitted")
    print("📈 RAG Triad metrics: Answer Relevance, Context Relevance, Groundedness")
except Exception as e:
    print(f"❌ Metrics computation failed: {e}")
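
If metric computation keeps running after `compute_metrics` returns, a generic polling helper can wait until the run reaches a terminal status. The sketch below takes the status getter as a callable so it does not depend on any particular API. Passing `run.get_status` and the status strings shown in the comment are assumptions to verify against your TruLens version.

```python
import time
from typing import Callable, Collection


def wait_for_status(
    get_status: Callable[[], str],
    terminal_statuses: Collection[str],
    poll_seconds: float = 10.0,
    timeout_seconds: float = 1800.0,
) -> str:
    """Poll get_status() until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in terminal_statuses:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still in status {status!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)


# Hypothetical usage in the notebook (status names are placeholders):
# final_status = wait_for_status(run.get_status, {"COMPLETED", "FAILED"})
```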


In [None]:
# Scale warehouse back down (assumes it was X-SMALL before the run; change the size if yours differed)
session.sql(f"ALTER WAREHOUSE {current_wh} SET WAREHOUSE_SIZE='X-SMALL'").collect()
print(f"📉 Scaled down warehouse: {current_wh} → X-SMALL")
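
Because the scale-up and scale-down live in separate cells, a failed evaluation can leave the warehouse at the larger size. One way to guard against that is a context manager that always restores the size. This is a sketch: `scaled_warehouse` is a hypothetical helper, and `run_sql` is any callable that executes a SQL string.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def scaled_warehouse(
    run_sql: Callable[[str], object],
    warehouse: str,
    temporary_size: str,
    restore_size: str,
) -> Iterator[None]:
    """Resize a warehouse for the duration of a block, then restore it."""
    run_sql(f"ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = '{temporary_size}'")
    try:
        yield
    finally:
        # Runs even if the evaluation raises, so the warehouse is not left scaled up
        run_sql(f"ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = '{restore_size}'")


# Hypothetical usage in the notebook:
# with scaled_warehouse(lambda q: session.sql(q).collect(), current_wh, "LARGE", "X-SMALL"):
#     run.start()
```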


## 📊 View Evaluation Results

To view the detailed evaluation results:

1. **Navigate to Snowsight** → **AI & ML** → **Evaluations**
2. **Select Application**: `intelligent_rag_chatbot`
3. **Select Run**: `legal_rag_batch_eval_v1`
4. **View Metrics**: RAG triad scores and detailed traces
5. **Analyze Results**: Individual query performance and overall statistics

### 🎯 What You'll See:

- **Answer Relevance**: How well responses address the user's question
- **Context Relevance**: How relevant the retrieved documents/chunks are to the user's question
- **Groundedness**: How well responses are supported by retrieved context
- **Detailed Traces**: Step-by-step execution with timing and intermediate results
- **Comparative Analysis**: Performance across different query types and difficulties

### 📈 Success Criteria:

- **Answer Relevance** > 0.8: Responses directly address queries
- **Context Relevance** > 0.7: Retrieved documents are pertinent
- **Groundedness** > 0.8: Responses are well-supported by evidence
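
The success criteria above can be applied mechanically once mean scores are available. The sketch below assumes the scores have already been read into a plain dictionary; `check_success` is a hypothetical helper and the example scores are made up for illustration, not results from this evaluation.

```python
from typing import Dict

# Thresholds copied from the success criteria.
THRESHOLDS = {"answer_relevance": 0.8, "context_relevance": 0.7, "groundedness": 0.8}


def check_success(mean_scores: Dict[str, float], thresholds: Dict[str, float] = THRESHOLDS) -> Dict[str, bool]:
    """Map each metric to True when its mean score strictly exceeds its threshold."""
    return {
        metric: mean_scores.get(metric, float("-inf")) > minimum
        for metric, minimum in thresholds.items()
    }


# Made-up scores for illustration only
example_scores = {"answer_relevance": 0.85, "context_relevance": 0.70, "groundedness": 0.9}
print(check_success(example_scores))
```

A score exactly at the threshold does not pass, since the criteria are stated with `>`; a metric missing from the scores is also reported as failing.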

### 🔄 Hybrid Evaluation Approach:

- **Notebook**: Systematic batch evaluation with TruLens RAG triad metrics
- **Streamlit App**: Real-time user feedback and interactive experience
- **Combined Insights**: Systematic metric scores plus user satisfaction data

🎉 **Congratulations!** You've successfully run a comprehensive batch evaluation of your Intelligent Multi-Stage RAG Chatbot!
