# End-to-End Vector Search Evaluation with Databricks & MLflow

This notebook demonstrates:
1. **Vector Database Setup**: Create Delta Sync indexes that automatically generate embeddings from source columns
2. **Retrieval Evaluation**: Compare full-text, hybrid, and dense (ANN) retrieval using MLflow
3. **Performance Analysis**: Analyze and visualize comparative results

**Models Evaluated:**
- `databricks-gte-large-en`
- `databricks-bge-large-en`

**Retrieval Types:**
- Hybrid Search (combines keyword + semantic)
- Dense Retrieval (semantic only)
- Full-Text Search (keyword only)

**Key Architecture:**
- Delta table with CDC enabled
- Delta Sync indexes with `embedding_source_column` + `embedding_model_endpoint_name`
- Automatic embedding generation (no manual embedding tables!)
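- Hybrid search: pass `query_text` together with `query_type="hybrid"`
- Dense search: pass `query_text` together with `query_type="ANN"`; the index embeds the query with its configured model endpoint, so no manual `query_vector` is needed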

## Setup & Installation

In [0]:
%pip install "datasets<4.0.0" databricks-vectorsearch mlflow pandas numpy matplotlib seaborn --quiet
# Restart the Python process so the packages installed by the %pip line above are importable in this session.
dbutils.library.restartPython()

In [0]:
import json
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List, Dict, Any

# Databricks imports
from databricks.vector_search.client import VectorSearchClient
from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, monotonically_increasing_id

# MLflow imports
import mlflow
import mlflow.data
from mlflow.metrics.genai import relevance, answer_similarity

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize clients
# Both constructors are called without arguments, so host/credentials are left to each library's defaults.
w = WorkspaceClient()  # NOTE(review): `w` is not referenced in the visible cells — confirm it is still needed
vsc = VectorSearchClient()  # shared handle used by later cells to manage endpoints and indexes

print("✓ All libraries imported successfully")

## Configuration

In [0]:
# --- Unity Catalog location of the source table (edit for your workspace) ---
CATALOG = "main"  # Update with your catalog
SCHEMA = "default"  # Update with your schema
TABLE_NAME = "wikipedia_docs"
VECTOR_SEARCH_ENDPOINT = "vector_search_endpoint"  # Update with your endpoint name

# --- Embedding model endpoints; one vector index is built per entry ---
EMBEDDING_MODELS = ["databricks-gte-large-en", "databricks-bge-large-en"]

# --- MLflow experiment, placed under the current user's /Users folder ---
current_user = spark.sql("SELECT current_user()").collect()[0][0]
EXPERIMENT_NAME = f"/Users/{current_user}/vector_search_evaluation_new"

# --- Sizes: documents loaded into the corpus, and queries used for evaluation ---
SAMPLE_SIZE = 1000
EVAL_SAMPLE_SIZE = 50

# Echo the settings that determine where data and results end up
for setting_label, setting_value in (
    ("Catalog", CATALOG),
    ("Schema", SCHEMA),
    ("Experiment", EXPERIMENT_NAME),
):
    print(f"{setting_label}: {setting_value}")

# Step 1: Vector Database Instantiation

### 1.1 Load Dataset
We'll use a subset of Wikipedia articles as our document corpus

In [0]:
from datasets import load_dataset

# Open the Wikipedia dump in streaming mode and read articles one at a time
# NOTE(review): trust_remote_code=True lets the dataset's loading script run locally — confirm this is acceptable
print("Loading Wikipedia dataset...")
dataset = load_dataset(
    "wikipedia",
    "20220301.en",
    split="train",
    streaming=True,
    cache_dir="/tmp/hf_cache2",
    trust_remote_code=True,
)

# Keep the first SAMPLE_SIZE articles; pairing with range() ends the loop at the limit
documents = []
for i, doc in zip(range(SAMPLE_SIZE), dataset):
    documents.append(
        dict(
            id=str(i),
            title=doc["title"],
            text=doc["text"][:1000],  # only the first 1000 characters are kept
            url=doc["url"],
        )
    )
    loaded_so_far = i + 1
    if loaded_so_far % 100 == 0:
        print(f"Loaded {loaded_so_far} documents...")

print(f"✓ Loaded {len(documents)} documents")

# Preview the corpus as a pandas DataFrame
df_docs = pd.DataFrame(documents)
display(df_docs.head())

In [0]:

from pyspark.sql.functions import monotonically_increasing_id

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df_docs)

# Keep the ids assigned at load time (0..N-1) and store them as integers.
# Do NOT regenerate them with monotonically_increasing_id(): its values depend on
# partitioning and are not consecutive, so they would no longer match the ids in
# `documents` that the evaluation later uses as ground truth.
spark_df = spark_df.withColumn("id", col("id").cast("long"))

# Create Delta table with Change Data Feed enabled (required for Delta Sync)
full_table_name = f"{CATALOG}.{SCHEMA}.{TABLE_NAME}"

# Drop table if exists (for demo purposes)
spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")

# Write to Delta with CDC enabled
spark_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("delta.enableChangeDataFeed", "true") \
    .saveAsTable(full_table_name)

# Also set the table property explicitly, so Change Data Feed is on regardless of
# how the writer option above is interpreted by the runtime.
spark.sql(
    f"ALTER TABLE {full_table_name} "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

# The primary key used by the vector index must be unique and must cover every document
id_count = spark.table(full_table_name).select("id").distinct().count()
assert id_count == len(df_docs), (
    f"Expected {len(df_docs)} distinct ids in {full_table_name}, found {id_count}"
)

print(f"✓ Created Delta table: {full_table_name}")
print(f"  Total documents: {spark.table(full_table_name).count()}")
print(f"  Change Data Feed: ENABLED")

# Display sample
display(spark.table(full_table_name).limit(5))

### 1.2 Create Vector Search Endpoint
A Vector Search endpoint is required to host the indexes

In [0]:
# Look the endpoint up first; any failure of the lookup is treated as "endpoint missing"
# and triggers creation of a STANDARD endpoint with the configured name.
try:
    endpoint = vsc.get_endpoint(VECTOR_SEARCH_ENDPOINT)
except Exception:
    print(f"Creating new endpoint: {VECTOR_SEARCH_ENDPOINT}")
    vsc.create_endpoint(
        name=VECTOR_SEARCH_ENDPOINT,
        endpoint_type="STANDARD",
    )
    print(f"✓ Created endpoint: {VECTOR_SEARCH_ENDPOINT}")
else:
    print(f"✓ Using existing endpoint: {VECTOR_SEARCH_ENDPOINT}")

### 1.3 Create Vector Search Indexes with Automatic Embeddings

Delta Sync indexes automatically generate embeddings using the specified model endpoint.
No need to manually create embedding tables!

In [0]:
import time

# Maps embedding model name -> {"index_name", "model", "source_table"} for every index
# that either already existed or was created by this cell.
indexes_info = {}

for model_name in EMBEDDING_MODELS:
    index_name = f"{CATALOG}.{SCHEMA}.{TABLE_NAME}_{model_name.replace('-', '_')}_index2"

    print(f"\nProcessing index for {model_name}...")
    print(f"  Index name: {index_name}")

    try:
        # Probe for an existing index; only "not found"-style errors mean "create it",
        # anything else is re-raised.
        index_exists = True
        try:
            existing_index = vsc.get_index(index_name=index_name)
            index_status = existing_index.describe()
        except Exception as e:
            if "NOT_FOUND" in str(e) or "RESOURCE_DOES_NOT_EXIST" in str(e):
                index_exists = False
            else:
                raise

        if index_exists:
            # Report the state the service returned; never assume "Ready" when it is absent.
            status = index_status.get("status", {})
            state = status.get("detailed_state") or status.get("state") or "UNKNOWN"
            print(f"  ✓ Index already exists (state: {state})")
            # NOTE(review): an existing index is reused as-is. The source table is dropped and
            # recreated earlier in this notebook, so the index may need a manual sync — confirm.
        else:
            print("  Index not found, creating new index...")

            # Create Delta Sync index with automatic embedding generation
            print(f"  Source table: {full_table_name}")
            print("  Embedding source: text column")

            index = vsc.create_delta_sync_index(
                endpoint_name=VECTOR_SEARCH_ENDPOINT,
                source_table_name=full_table_name,
                index_name=index_name,
                pipeline_type="TRIGGERED",
                primary_key="id",
                embedding_source_column="text",
                embedding_model_endpoint_name=model_name
            )

            print(f"  ✓ Created index: {index_name}")
            print("  ✓ Embeddings will be auto-generated from 'text' column")
            print(f"  ✓ Using model: {model_name}")

        # Single registration point for both the "existing" and the "created" paths
        indexes_info[model_name] = {
            "index_name": index_name,
            "model": model_name,
            "source_table": full_table_name
        }

    except Exception as e:
        print(f"  ✗ Error: {str(e)}")
        raise

print(f"\n✓ Processed {len(indexes_info)} vector search indexes")
print("\nNote: New indexes are syncing and generating embeddings in the background...")

In [0]:
print("Waiting for indexes to sync and generate embeddings...")
print("(This may take several minutes for 1K documents)\n")

max_wait = 600      # seconds to wait per index (10 minutes; embedding generation takes time)
poll_interval = 15  # seconds between status checks
not_ready = []      # models whose index never reported ready within max_wait

for model_name, info in indexes_info.items():
    # NOTE: `index` is deliberately left as a notebook-level name; a later cell reuses it.
    index = vsc.get_index(index_name=info["index_name"])

    wait_time = 0
    last_status = None
    is_ready = False

    while wait_time < max_wait:
        try:
            index_status = index.describe().get("status", {})

            if index_status.get("ready", False):
                row_count = index_status.get("indexed_row_count", 0)
                print(f"✓ {model_name}: READY ({row_count} documents indexed)")
                is_ready = True
                break

            # Only print the status message when it changes, to keep the log short
            current_status = index_status.get("message", "Syncing...")
            if current_status != last_status:
                print(f"  {model_name}: {current_status}")
                last_status = current_status

        except Exception as e:
            # Best effort: keep polling, but show the error instead of hiding it
            print(f"  {model_name}: Checking status... ({wait_time}s) [{type(e).__name__}: {e}]")

        time.sleep(poll_interval)
        wait_time += poll_interval

    if not is_ready:
        print(f"⚠ {model_name}: Sync timeout - but continuing (check index status manually)")
        not_ready.append(model_name)

# Report readiness truthfully rather than unconditionally claiming success
if not_ready:
    print(f"\n⚠ Indexes not confirmed ready: {', '.join(not_ready)} - queries against them may fail or be incomplete")
else:
    print("\n✓ All indexes ready for querying")

# Step 2: Define Retrieval Functions & Evaluate with MLflow

### 2.1 Create Evaluation Dataset
Generate one query per sampled document from its title; the document's id, title, and text are kept as the ground truth for that query.

In [0]:
# Fixed seed so the same documents are sampled on every run
np.random.seed(42)

# Sample documents for evaluation (without replacement, so each document appears at most once)
eval_indices = np.random.choice(len(documents), EVAL_SAMPLE_SIZE, replace=False)
eval_docs = [documents[i] for i in eval_indices]

# One query per sampled document, built from the title only.
# The document's own id/title/text are recorded as the expected retrieval result.
eval_data = [
    {
        "query": f"What is information about {doc['title']}?",
        "ground_truth_doc_id": doc["id"],
        "ground_truth_title": doc["title"],
        "ground_truth_text": doc["text"],
    }
    for doc in eval_docs
]

eval_df = pd.DataFrame(eval_data)
print(f"✓ Generated {len(eval_df)} evaluation queries")
display(eval_df.head())

### 2.2 Define Retrieval Functions

In [0]:
# Smoke-test retrieval against one index before running the full evaluation.
# Fetch the index handle explicitly (the last registered index) instead of relying on
# an `index` variable left over from an earlier cell.
smoke_test_model = list(indexes_info)[-1]
index = vsc.get_index(index_name=indexes_info[smoke_test_model]["index_name"])

results = index.similarity_search(
    columns=["id", "title", "text", "url"],
    query_text="What is information about Astrology?",
    num_results=5,
    query_type="hybrid",  # request keyword + semantic search explicitly
    disable_notice=True,
)
results

### 2.3 Run Evaluation with MLflow

In [0]:
import mlflow
import pandas as pd
from typing import List
from mlflow.genai import scorer
from mlflow.entities import Feedback
import numpy as np
from functools import partial

# Route all runs created below to the configured experiment
mlflow.set_experiment(EXPERIMENT_NAME)

# Rebuild the evaluation frame and attach, per query, a one-element list holding the
# ground-truth id rendered as a one-decimal float string (e.g. "12.0")
eval_df = pd.DataFrame(eval_data)
eval_df["ground_truth"] = eval_df["ground_truth_doc_id"].map(
    lambda doc_id: [f"{float(doc_id):.1f}"]
)

# Define custom retrieval scorers (without @scorer decorator for the base functions)
def recall_at_k(outputs: List[str], expectations: dict, k: int = 5) -> Feedback:
    """Score the fraction of expected document ids found in the first ``k`` outputs.

    Args:
        outputs: Retrieved document ids, best match first.
        expectations: Mapping whose ``"expected_response"`` entry lists the relevant ids.
        k: Number of leading results to consider.

    Returns:
        Feedback whose value is hits / number of expected ids, or 0.0 with an
        explanatory rationale when no expected ids were supplied.
    """
    relevant_ids = set(expectations.get("expected_response", []))
    top_k_ids = set(outputs[:k])

    if not relevant_ids:
        return Feedback(value=0.0, rationale="No ground truth provided")

    num_hits = len(relevant_ids.intersection(top_k_ids))
    return Feedback(
        value=num_hits / len(relevant_ids),
        rationale=f"Retrieved {num_hits}/{len(relevant_ids)} relevant documents in top-{k}",
    )

def precision_at_k(outputs: List[str], expectations: dict, k: int = 5) -> Feedback:
    """Score the fraction of the first ``k`` outputs that are expected document ids.

    Args:
        outputs: Retrieved document ids, best match first.
        expectations: Mapping whose ``"expected_response"`` entry lists the relevant ids.
        k: Number of leading results to consider.

    Returns:
        Feedback whose value is hits / number of results actually considered
        (which can be fewer than ``k``), or 0.0 when nothing was retrieved.
    """
    relevant_ids = set(expectations.get("expected_response", []))
    top_k = outputs[:k]
    num_considered = len(top_k)

    if num_considered == 0:
        return Feedback(value=0.0, rationale="No documents retrieved")

    # Distinct retrieved ids that are also expected
    num_hits = len(relevant_ids.intersection(top_k))
    return Feedback(
        value=num_hits / num_considered,
        rationale=f"Found {num_hits} relevant documents in {num_considered} retrieved",
    )

def ndcg_at_k(outputs: List[str], expectations: dict, k: int = 10) -> Feedback:
    """Calculate NDCG@k for retrieval results using binary relevance.

    Args:
        outputs: Retrieved document ids, best match first.
        expectations: Mapping whose ``"expected_response"`` entry lists the relevant ids.
        k: Number of leading results to consider.

    Returns:
        Feedback whose value is DCG / IDCG, or 0.0 with an explanatory
        rationale when no relevant ids were supplied.
    """
    ground_truth = set(expectations.get("expected_response", []))
    retrieved = outputs[:k]

    # Calculate DCG. Each relevant id is credited only at its first occurrence, so a
    # repeated id in the results cannot push DCG above IDCG (i.e. NDCG above 1.0).
    dcg = 0.0
    credited = set()
    for position, doc_id in enumerate(retrieved):
        if doc_id in ground_truth and doc_id not in credited:
            # Relevance is 1; the discount is log2(rank + 1) with rank = position + 1
            dcg += 1.0 / np.log2(position + 2)
            credited.add(doc_id)

    # Calculate IDCG (perfect ranking: every relevant id placed at the top)
    idcg = sum(1.0 / np.log2(position + 2) for position in range(min(len(ground_truth), k)))

    if idcg == 0:
        return Feedback(value=0.0, rationale="No relevant documents in ground truth")

    ndcg = dcg / idcg
    return Feedback(
        value=ndcg,
        rationale=f"NDCG@{k} = {ndcg:.4f} (DCG={dcg:.4f}, IDCG={idcg:.4f})"
    )

# Fixed-k scorers. Each wrapper's function name is what the aggregate metric is
# looked up under later (e.g. "recall_1/mean"), so the names must stay as they are.
@scorer
def recall_1(outputs, expectations):
    """Recall over the single top result."""
    return recall_at_k(outputs=outputs, expectations=expectations, k=1)


@scorer
def recall_3(outputs, expectations):
    """Recall over the top 3 results."""
    return recall_at_k(outputs=outputs, expectations=expectations, k=3)


@scorer
def recall_5(outputs, expectations):
    """Recall over the top 5 results."""
    return recall_at_k(outputs=outputs, expectations=expectations, k=5)


@scorer
def recall_10(outputs, expectations):
    """Recall over the top 10 results."""
    return recall_at_k(outputs=outputs, expectations=expectations, k=10)


@scorer
def precision_5(outputs, expectations):
    """Precision over the top 5 results."""
    return precision_at_k(outputs=outputs, expectations=expectations, k=5)


@scorer
def ndcg_10(outputs, expectations):
    """NDCG over the top 10 results."""
    return ndcg_at_k(outputs=outputs, expectations=expectations, k=10)

# Index handles keyed by index name, so the evaluation loop does not look the same
# index up again for every single query.
_INDEX_HANDLES = {}


def retrieve_documents(query: str, index_name: str, query_type: str = "hybrid", k: int = 10) -> List[str]:
    """Retrieve documents from vector index.

    Args:
        query: Query text sent to the index.
        index_name: Fully qualified name of the vector search index.
        query_type: Search mode forwarded to ``similarity_search``.
        k: Maximum number of results to request.

    Returns:
        Ids of the returned rows, in result order, each rendered as a one-decimal
        float string (e.g. ``"12.0"``) — the same format the evaluation uses for
        ground-truth ids. Empty when the response carries no rows.
    """
    index = _INDEX_HANDLES.get(index_name)
    if index is None:
        index = vsc.get_index(index_name=index_name)
        _INDEX_HANDLES[index_name] = index

    results = index.similarity_search(
        columns=["id"],
        query_text=query,
        num_results=k,
        query_type=query_type,
        disable_notice=True
    )
    # Tolerate a missing or null "result"/"data_array" instead of failing on it
    data_array = (results.get("result") or {}).get("data_array") or []
    return [f"{float(row[0]):.1f}" for row in data_array]

# Compare both models
# Both dicts are filled by the evaluation loop in the next cell, keyed by "<model>_<query_type>".
model_comparison = {}  # aggregate metric values per run, used to build the comparison table
all_eval_results = {}  # Store detailed results for downstream analysis

In [0]:
# Column label in the comparison table -> aggregate metric key reported by MLflow
metric_keys = {
    "recall@1": "recall_1/mean",
    "recall@3": "recall_3/mean",
    "recall@5": "recall_5/mean",
    "recall@10": "recall_10/mean",
    "precision@5": "precision_5/mean",
    "ndcg@10": "ndcg_10/mean",
}
banner = "=" * 70

# One MLflow run per (query type, embedding model) combination
for query_type in ["FULL_TEXT", "hybrid", "ANN"]:
    for model_name, info in indexes_info.items():
        print(f"\n{banner}")
        print(f"EVALUATING: {model_name} - {query_type}")
        print(f"{banner}")

        # Retrieve the top 10 ids for every query up front; the scorers then only
        # compare these outputs with the expected id.
        eval_data_with_outputs = [
            {
                "inputs": {"query": row["query"]},
                "outputs": retrieve_documents(row["query"], info["index_name"], query_type, 10),
                "expectations": {
                    "expected_response": [f"{float(row['ground_truth_doc_id']):.1f}"]
                },
            }
            for row in eval_data
        ]

        run_key = f"{model_name}_{query_type}"
        with mlflow.start_run(run_name=f"retrieval_{run_key}"):
            for param_name, param_value in (
                ("model", model_name),
                ("index", info["index_name"]),
                ("query_type", query_type),
            ):
                mlflow.log_param(param_name, param_value)

            # Score the pre-computed outputs with the custom scorers
            results = mlflow.genai.evaluate(
                data=eval_data_with_outputs,
                scorers=[recall_1, recall_3, recall_5, recall_10, precision_5, ndcg_10],
            )

            # Keep the full result object and the aggregate numbers for this run
            all_eval_results[run_key] = results
            model_comparison[run_key] = {
                label: results.metrics[key] for label, key in metric_keys.items()
            }

            print("\nResults:")
            for metric_label, metric_value in model_comparison[run_key].items():
                print(f"  {metric_label}: {metric_value:.4f}")

# Final comparison: one row per run, one column per metric
print(f"\n{banner}")
print("MODEL COMPARISON")
print(banner)
comparison_df = pd.DataFrame(model_comparison).transpose()
print(comparison_df)

print("\nWinner by metric:")
for metric, winner in comparison_df.idxmax().items():
    print(f"  {metric}: {winner}")

print("\n✓ Evaluation complete! Results stored in 'all_eval_results' for downstream analysis")

### 2.4 Analyze Individual Examples

Examine which specific queries caused performance issues.

In [0]:
# Fetch every run of the experiment, newest first
experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
runs_df = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["start_time DESC"],
)

print(f"✓ Retrieved {len(runs_df)} runs from MLflow experiment")

# Show only the identifying columns and the headline metrics
summary_columns = [
    "run_id",
    "tags.mlflow.runName",
    "params.model",
    "params.query_type",
    "metrics.recall_1/mean",
    "metrics.recall_5/mean",
    "metrics.ndcg_10/mean",
]
display(runs_df[summary_columns])

In [0]:
experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)

# Set this to a specific run id to inspect that run; None selects the most recent
# run of the experiment. (A hard-coded id only exists in the workspace that produced it.)
TRACE_RUN_ID = None

trace_run_id = TRACE_RUN_ID
if trace_run_id is None:
    latest_runs = mlflow.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=["start_time DESC"],
        max_results=1,
    )
    if latest_runs.empty:
        raise ValueError(f"No runs found in experiment {EXPERIMENT_NAME}")
    trace_run_id = latest_runs.iloc[0]["run_id"]

print(f"Loading traces for run: {trace_run_id}")
traces = mlflow.search_traces(
    experiment_ids=[experiment.experiment_id],
    run_id=trace_run_id
)
display(traces)

In [0]:
def _recall_1_below_one(assessments) -> bool:
    """Return True when any 'recall_1' assessment in the list has a feedback value under 1.0.

    An assessment without a feedback value is treated as 1 (i.e. not a miss).
    """
    for assessment in assessments:
        if assessment['assessment_name'] != 'recall_1':
            continue
        if assessment.get('feedback', {}).get('value', 1) < 1.0:
            return True
    return False


# Keep only the traces whose top-1 result missed the expected document
bad_recall_1 = traces[traces['assessments'].apply(_recall_1_below_one)]

display(bad_recall_1)

In [0]:
# Manually inspect hybrid retrieval for one query against the BGE index.
# The index name is taken from `indexes_info` so it follows CATALOG / SCHEMA / TABLE_NAME
# instead of being hard-coded for the default configuration.
index = vsc.get_index(index_name=indexes_info["databricks-bge-large-en"]["index_name"])
results = index.similarity_search(
    columns=["id", "title", "text", "url"],
    query_text="What is information about Artificial intelligence?",
    num_results=5,
    disable_notice=True,
    query_type="hybrid"
)
results