## Hybrid Search Evaluation with Qdrant

This notebook demonstrates hybrid search combining dense embeddings and sparse BM25 vectors using Qdrant. We'll evaluate several search approaches (sparse-only, dense-only, multi-stage, and hybrid fusion with RRF, Reciprocal Rank Fusion) and compare their retrieval performance.

### Overview

- **Dense vectors**: Capture semantic meaning; good for natural-language queries
- **Sparse vectors (BM25)**: Excel at exact keyword matches; fast and lightweight
- **Hybrid search**: Combines both approaches for better overall performance

In [None]:
# Install required packages
!python -m pip install -q "qdrant-client[fastembed]>=1.14.2" pandas numpy

### Start Qdrant server in Docker (run this once)
!docker run -d -p 6333:6333 -p 6334:6334 \
   -v "./qdrant_storage:/qdrant/storage:z" \
   qdrant/qdrant

In [None]:
# Import required libraries
import json
import pandas as pd
import uuid
import os
import pickle
from typing import List, Dict, Any
import numpy as np
from qdrant_client import QdrantClient, models
import time
from datetime import datetime

In [None]:
# Load documents and ground truth data
# The JSON file is opened explicitly as UTF-8 so that loading does not depend on
# the platform's default text encoding.
with open('../data/processed/documents-with-ids.json', 'r', encoding='utf-8') as f:
    documents = json.load(f)

# Ground truth is kept both as a DataFrame (for column inspection) and as a
# list of row dicts (for iteration during evaluation).
df_ground_truth = pd.read_csv('../data/processed/ground-truth-retrieval.csv')
ground_truth = df_ground_truth.to_dict(orient='records')

print(f"Loaded {len(documents)} documents and {len(ground_truth)} ground truth questions")
print(f"First document keys: {list(documents[0].keys()) if documents else 'No documents'}")
print(f"Ground truth columns: {df_ground_truth.columns.tolist()}")

In [None]:
# Open a client against the local Qdrant HTTP endpoint
client = QdrantClient("http://localhost:6333")

# Smoke-test the connection: listing collections requires a round trip to the server
try:
    collections = client.get_collections()
    print(
        f"Connected to Qdrant. Existing collections: {[c.name for c in collections.collections]}"
    )
except Exception as connection_error:
    # Report the problem instead of raising so the reader sees a hint about the cause
    for message in (
        f"Failed to connect to Qdrant: {connection_error}",
        "Make sure Qdrant is running on localhost:6333",
    ):
        print(message)

In [None]:
# Peek at the first record of each dataset to see its structure
print("Sample document:")
sample_document_json = json.dumps(documents[0], indent=2)
print(sample_document_json)

print("\nSample ground truth entry:")
sample_ground_truth = ground_truth[0]
print(sample_ground_truth)

### Create Sparse Vector Collection (BM25)

In [None]:
# Create collection for sparse vectors only
collection_name_sparse = "documents-sparse-bm25"

# Drop a previous copy of the collection so this cell can be re-run from scratch.
# The existence check replaces a bare `except: pass`: a deletion is only reported
# when a collection was actually found, and real errors are no longer silenced.
if client.collection_exists(collection_name_sparse):
    client.delete_collection(collection_name_sparse)
    print(f"Deleted existing collection: {collection_name_sparse}")

# Create new collection with sparse vector configuration
client.create_collection(
    collection_name=collection_name_sparse,
    sparse_vectors_config={
        "bm25": models.SparseVectorParams(
            modifier=models.Modifier.IDF,  # Enable IDF calculation for BM25
        )
    }
)
print(f"Created sparse collection: {collection_name_sparse}")

In [None]:
# Upload documents to sparse collection
def upload_sparse_vectors(documents: List[Dict], collection_name: str, batch_size: int = 100) -> None:
    """Upload documents as sparse vectors using BM25 model.

    Each document becomes one point whose "bm25" vector is a `models.Document`
    wrapping the document text, and whose payload carries the text, section,
    course, question and id fields.

    Args:
        documents: Dicts with a required "text" key and optional "id",
            "section", "course" and "question" keys.
        collection_name: Target collection; it must already define a "bm25"
            sparse vector.
        batch_size: Number of points sent per `client.upsert` call.

    Raises:
        KeyError: If a document has no "text" key.
    """

    points = []
    for doc in documents:
        # Resolve the id exactly once. Evaluating the uuid4() fallback separately
        # for the point id and for the payload would give a document without an
        # "id" two different identifiers.
        # NOTE(review): the id is used as the Qdrant point id, so it presumably
        # has to be in a format the server accepts - confirm for this dataset.
        doc_id = doc.get('id')
        if doc_id is None:
            doc_id = uuid.uuid4().hex

        # Create point with sparse BM25 vector
        point = models.PointStruct(
            id=doc_id,
            vector={
                "bm25": models.Document(
                    text=doc["text"],
                    model="Qdrant/bm25",
                ),
            },
            payload={
                "text": doc["text"],
                "section": doc.get("section", ""),
                "course": doc.get("course", ""),
                "question": doc.get("question", ""),
                "id": doc_id,
            }
        )
        points.append(point)

    # Upload in batches for better performance
    total_batches = (len(points) - 1) // batch_size + 1
    for i in range(0, len(points), batch_size):
        batch = points[i:i + batch_size]
        client.upsert(collection_name=collection_name, points=batch)
        print(f"Uploaded batch {i//batch_size + 1}/{total_batches}")

    print(f"Successfully uploaded {len(points)} documents to {collection_name}")

# Run the sparse upload and report how long it took (wall-clock seconds)
start_time = time.time()
upload_sparse_vectors(documents, collection_name_sparse)
finished_at = time.time()
upload_time_sparse = finished_at - start_time
print(
    f"Sparse vector upload took {upload_time_sparse:.2f} seconds"
)

### Sparse Vector Search Functions


In [None]:
def sparse_search(query: str, collection_name: str, limit: int = 5) -> List[models.ScoredPoint]:
    """Query the "bm25" sparse vector of a collection with a text query.

    Args:
        query: Query text; wrapped in a `models.Document` tagged "Qdrant/bm25".
        collection_name: Collection to search.
        limit: Maximum number of points to return.

    Returns:
        The `points` list of the query response, with payloads included.
    """
    bm25_query = models.Document(text=query, model="Qdrant/bm25")
    response = client.query_points(
        collection_name=collection_name,
        query=bm25_query,
        using="bm25",
        limit=limit,
        with_payload=True,
    )
    return response.points

In [None]:
# Try the sparse search on one example query and show the top hits
test_query = "How do I install dependencies?"
sparse_results = sparse_search(test_query, collection_name_sparse, limit=3)

print(f"Sparse search results for: '{test_query}'")
for position, hit in enumerate(sparse_results, start=1):
    hit_payload = hit.payload
    print(f"\n{position}. Score: {hit.score:.4f}")
    # Only the first 150 characters of the text are shown to keep output short
    print(f"   Text: {hit_payload['text'][:150]}...")
    print(f"   Section: {hit_payload.get('section', 'N/A')}")

### Create Hybrid Collection (Dense + Sparse Vectors)

In [None]:
# Create collection for hybrid search (both dense and sparse vectors)
collection_name_hybrid = "documents-hybrid-search"

# Drop a previous copy of the collection so this cell can be re-run from scratch.
# The existence check replaces a bare `except: pass`: a deletion is only reported
# when a collection was actually found, and real errors are no longer silenced.
if client.collection_exists(collection_name_hybrid):
    client.delete_collection(collection_name_hybrid)
    print(f"Deleted existing collection: {collection_name_hybrid}")

# Create hybrid collection with both vector types
client.create_collection(
    collection_name=collection_name_hybrid,
    vectors_config={
        # Dense vector configuration
        "jina-small": models.VectorParams(
            size=512,  # Dimension size for jina-embeddings-v2-small-en
            distance=models.Distance.COSINE,
        ),
    },
    sparse_vectors_config={
        # Sparse vector configuration
        "bm25": models.SparseVectorParams(
            modifier=models.Modifier.IDF,
        )
    }
)
print(f"Created hybrid collection: {collection_name_hybrid}")

In [None]:
# Upload documents to hybrid collection with both vector types
def upload_hybrid_vectors(documents: List[Dict], collection_name: str, batch_size: int = 50) -> None:
    """Upload documents with both dense and sparse vectors.

    Each document becomes one point with two named vectors built from the same
    text: "jina-small" (a `models.Document` tagged with the Jina embedding
    model) and "bm25" (a `models.Document` tagged "Qdrant/bm25"). The payload
    carries the text, section, course, question and id fields.

    Args:
        documents: Dicts with a required "text" key and optional "id",
            "section", "course" and "question" keys.
        collection_name: Target collection; it must already define the
            "jina-small" and "bm25" vectors.
        batch_size: Number of points sent per `client.upsert` call
            (default is smaller than for the sparse-only upload).

    Raises:
        KeyError: If a document has no "text" key.
    """

    points = []
    for doc in documents:
        # Resolve the id exactly once. Evaluating the uuid4() fallback separately
        # for the point id and for the payload would give a document without an
        # "id" two different identifiers.
        # NOTE(review): the id is used as the Qdrant point id, so it presumably
        # has to be in a format the server accepts - confirm for this dataset.
        doc_id = doc.get('id')
        if doc_id is None:
            doc_id = uuid.uuid4().hex

        # Create point with both dense and sparse vectors
        point = models.PointStruct(
            id=doc_id,
            vector={
                # Dense vector using Jina model
                "jina-small": models.Document(
                    text=doc["text"],
                    model="jinaai/jina-embeddings-v2-small-en",
                ),
                # Sparse vector using BM25
                "bm25": models.Document(
                    text=doc["text"],
                    model="Qdrant/bm25",
                ),
            },
            payload={
                "text": doc["text"],
                "section": doc.get("section", ""),
                "course": doc.get("course", ""),
                "question": doc.get("question", ""),
                "id": doc_id,
            }
        )
        points.append(point)

    # Upload in batches
    total_batches = (len(points) - 1) // batch_size + 1
    for i in range(0, len(points), batch_size):
        batch = points[i:i + batch_size]
        client.upsert(collection_name=collection_name, points=batch)
        print(f"Uploaded hybrid batch {i//batch_size + 1}/{total_batches}")

    print(f"Successfully uploaded {len(points)} documents with hybrid vectors")

# Run the hybrid upload and report how long it took (wall-clock seconds);
# this will take longer than the sparse upload due to dense embeddings
start_time = time.time()
upload_hybrid_vectors(documents, collection_name_hybrid)
finished_at = time.time()
upload_time_hybrid = finished_at - start_time
print(
    f"Hybrid vector upload took {upload_time_hybrid:.2f} seconds"
)

### Hybrid Search Functions


In [None]:
def dense_search(query: str, collection_name: str, limit: int = 5) -> List[models.ScoredPoint]:
    """Query the "jina-small" dense vector of a collection with a text query.

    Args:
        query: Query text; wrapped in a `models.Document` tagged with the
            "jinaai/jina-embeddings-v2-small-en" model.
        collection_name: Collection to search.
        limit: Maximum number of points to return.

    Returns:
        The `points` list of the query response, with payloads included.
    """
    dense_query = models.Document(
        text=query,
        model="jinaai/jina-embeddings-v2-small-en",
    )
    response = client.query_points(
        collection_name=collection_name,
        query=dense_query,
        using="jina-small",
        limit=limit,
        with_payload=True,
    )
    return response.points

def multi_stage_search(query: str, collection_name: str, limit: int = 5) -> List[models.ScoredPoint]:
    """Two-stage query: dense prefetch, then a BM25 query over the prefetched candidates.

    Args:
        query: Query text, used for both the dense prefetch and the sparse query.
        collection_name: Collection to search.
        limit: Maximum number of points to return; the dense prefetch
            requests ten times as many candidates.

    Returns:
        The `points` list of the query response, with payloads included.
    """
    # Stage 1: over-fetch candidates on the dense vector
    dense_candidates = models.Prefetch(
        query=models.Document(
            text=query,
            model="jinaai/jina-embeddings-v2-small-en",
        ),
        using="jina-small",
        limit=(10 * limit),
    )

    # Stage 2: final query on the sparse vector
    sparse_query = models.Document(text=query, model="Qdrant/bm25")

    response = client.query_points(
        collection_name=collection_name,
        prefetch=[dense_candidates],
        query=sparse_query,
        using="bm25",
        limit=limit,
        with_payload=True,
    )
    return response.points

def rrf_search(query: str, collection_name: str, limit: int = 5) -> List[models.ScoredPoint]:
    """Hybrid query: dense and sparse prefetches combined with an RRF fusion query.

    Args:
        query: Query text, used for both prefetch branches.
        collection_name: Collection to search.
        limit: Maximum number of fused points to return; each prefetch
            branch requests five times as many candidates.

    Returns:
        The `points` list of the query response, with payloads included.
    """
    candidates_per_branch = 5 * limit

    # (embedding model, named vector) for the dense branch, then the sparse branch
    branch_specs = (
        ("jinaai/jina-embeddings-v2-small-en", "jina-small"),
        ("Qdrant/bm25", "bm25"),
    )
    prefetch_branches = [
        models.Prefetch(
            query=models.Document(text=query, model=model_name),
            using=vector_name,
            limit=candidates_per_branch,
        )
        for model_name, vector_name in branch_specs
    ]

    response = client.query_points(
        collection_name=collection_name,
        prefetch=prefetch_branches,
        # Apply RRF fusion to combine the two candidate lists
        query=models.FusionQuery(fusion=models.Fusion.RRF),
        limit=limit,
        with_payload=True,
    )
    return response.points

In [None]:
# Run every search method on one shared query and print the top hits side by side
test_query = "How to set up environment variables?"


def _print_hits(heading, hits):
    """Print a heading, then one numbered line per hit with its score and the first 100 characters of its text."""
    print(heading)
    for position, hit in enumerate(hits, start=1):
        print(f"{position}. Score: {hit.score:.4f} | {hit.payload['text'][:100]}...")


print(f"Testing search methods with query: '{test_query}'\n")

# Dense search
dense_results = dense_search(test_query, collection_name_hybrid, limit=3)
_print_hits("=== DENSE SEARCH RESULTS ===", dense_results)

# Sparse search on hybrid collection
sparse_results_hybrid = sparse_search(test_query, collection_name_hybrid, limit=3)
_print_hits("\n=== SPARSE SEARCH RESULTS ===", sparse_results_hybrid)

# Multi-stage search
multi_stage_results = multi_stage_search(test_query, collection_name_hybrid, limit=3)
_print_hits("\n=== MULTI-STAGE SEARCH RESULTS ===", multi_stage_results)

# RRF search
rrf_results = rrf_search(test_query, collection_name_hybrid, limit=3)
_print_hits("\n=== RRF HYBRID SEARCH RESULTS ===", rrf_results)

### Evaluation Framework

In [None]:
def evaluate_search_method(search_function, collection_name: str, ground_truth_data: List[Dict], 
                         method_name: str, top_k: int = 5) -> Dict[str, Any]:
    """Evaluate a search method against ground truth data.

    For every ground-truth item the function runs
    `search_function(query, collection_name, limit=top_k)`, reads the `id`
    field from each returned point's payload, and scores the position of the
    expected document.

    Args:
        search_function: Callable accepting `(query, collection_name, limit=...)`
            and returning an iterable of objects with a `.payload` mapping.
        collection_name: Passed through to `search_function`.
        ground_truth_data: Dicts with "question" and "document_id" keys.
        method_name: Label stored in the result and used in progress output.
        top_k: Number of results requested per query.

    Returns:
        Dict with raw counters (`hits_at_1/3/5`, `failed_queries`,
        `total_queries`), per-query lists (`mrr_scores`, `search_times`) and
        aggregates (`hit_rate_at_1/3/5`, `mean_reciprocal_rank`,
        `avg_search_time`). `search_times` and `avg_search_time` cover
        successful queries only; failed queries contribute 0.0 to the MRR.
        All aggregates are 0.0 when there is nothing to average.

    Raises:
        KeyError: If a ground-truth item lacks "question" or "document_id".
    """

    results = {
        'method': method_name,
        'total_queries': len(ground_truth_data),
        'hits_at_1': 0,
        'hits_at_3': 0, 
        'hits_at_5': 0,
        'mrr_scores': [],  # Mean Reciprocal Rank
        'search_times': [],
        'failed_queries': 0
    }

    print(f"\nEvaluating {method_name}...")

    for i, gt_item in enumerate(ground_truth_data):
        if i % 20 == 0:
            print(f"Processed {i}/{len(ground_truth_data)} queries")

        query = gt_item['question']
        expected_doc_id = gt_item['document_id']

        try:
            # perf_counter is monotonic, so the interval cannot be distorted by
            # system clock adjustments.
            started = time.perf_counter()
            search_results = search_function(query, collection_name, limit=top_k)
            elapsed = time.perf_counter() - started

            # Extract document IDs from results
            retrieved_doc_ids = [result.payload.get('id') for result in search_results]
        except Exception as e:
            # A failed query scores 0 for MRR but records no latency: a made-up
            # 0.0 would pull the average search time down.
            print(f"Failed query {i}: {e}")
            results['failed_queries'] += 1
            results['mrr_scores'].append(0.0)
            continue

        # Recorded only after the whole query succeeded, so each query adds
        # exactly one entry to each per-query list.
        results['search_times'].append(elapsed)

        # Calculate hits@k and MRR
        if expected_doc_id in retrieved_doc_ids:
            rank = retrieved_doc_ids.index(expected_doc_id) + 1

            # Hits@k calculation
            if rank <= 1:
                results['hits_at_1'] += 1
            if rank <= 3:
                results['hits_at_3'] += 1
            if rank <= 5:
                results['hits_at_5'] += 1

            # MRR calculation
            results['mrr_scores'].append(1.0 / rank)
        else:
            results['mrr_scores'].append(0.0)

    # Calculate final metrics; guard against empty inputs so an empty ground
    # truth (or a run where every query failed) yields 0.0 instead of a
    # ZeroDivisionError / NaN.
    total_queries = results['total_queries']
    for k in (1, 3, 5):
        hits = results[f'hits_at_{k}']
        results[f'hit_rate_at_{k}'] = hits / total_queries if total_queries else 0.0
    results['mean_reciprocal_rank'] = (
        float(np.mean(results['mrr_scores'])) if results['mrr_scores'] else 0.0
    )
    results['avg_search_time'] = (
        float(np.mean(results['search_times'])) if results['search_times'] else 0.0
    )

    print(f"Completed evaluation of {method_name}")
    return results

In [None]:
# Run comprehensive evaluation on all methods
print("Starting comprehensive evaluation...")

# Use subset for faster evaluation; the same subset is used for every method
# so the numbers are comparable, and the count printed below is the number of
# questions actually evaluated.
eval_queries = ground_truth[:100]
print(f"Evaluating on {len(eval_queries)} ground truth questions")

evaluation_results = []

# Evaluate sparse search (BM25 only)
# sparse_search is passed directly: the evaluator calls the search function
# with `limit=` as a keyword, which a positional-only wrapper such as
# `lambda q, c, l: ...` rejects with a TypeError on every query.
sparse_eval = evaluate_search_method(
    sparse_search,
    collection_name_sparse,
    eval_queries,
    "BM25 Sparse"
)
evaluation_results.append(sparse_eval)

# Evaluate dense search
dense_eval = evaluate_search_method(
    dense_search,
    collection_name_hybrid,
    eval_queries,
    "Dense Semantic"
)
evaluation_results.append(dense_eval)

# Evaluate multi-stage search
multi_stage_eval = evaluate_search_method(
    multi_stage_search,
    collection_name_hybrid,
    eval_queries,
    "Multi-stage (Dense→Sparse)"
)
evaluation_results.append(multi_stage_eval)

# Evaluate RRF hybrid search
rrf_eval = evaluate_search_method(
    rrf_search,
    collection_name_hybrid,
    eval_queries,
    "RRF Hybrid"
)
evaluation_results.append(rrf_eval)

print("\nEvaluation completed!")

### Results Analysis and Comparison

In [None]:
# Create comprehensive results summary
def create_results_summary(evaluation_results: List[Dict]) -> pd.DataFrame:
    """Create a summary DataFrame of all evaluation results"""
    
    summary_data = []
    
    for result in evaluation_results:
        summary_data.append({
            'Method': result['method'],
            'Hit Rate @1': f"{result['hit_rate_at_1']:.3f}",
            'Hit Rate @3': f"{result['hit_rate_at_3']:.3f}", 
            'Hit Rate @5': f"{result['hit_rate_at_5']:.3f}",
            'Mean Reciprocal Rank': f"{result['mean_reciprocal_rank']:.3f}",
            'Avg Search Time (ms)': f"{result['avg_search_time']*1000:.1f}",
            'Failed Queries': result['failed_queries'],
            'Total Queries': result['total_queries']
        })