# Lab 4: Hybrid Search & Re-ranking
## Learning Objectives
By the end of this lab, you will:
- Implement BM25 keyword search alongside semantic search
- Combine results using Reciprocal Rank Fusion (RRF)
- Add Cross-Encoder re-ranking as a quality gate
- Build a query preprocessing and expansion pipeline
## Setup

In [None]:
!uv pip install rank-bm25 sentence-transformers numpy -q


## Part 1: Why Simple Search Fails

Pure semantic search has **three key failure modes**:

1. **Exact match blindness** — Semantic models may not prioritize documents containing the exact query terms (e.g., error codes like `503`, product IDs, or acronyms).
2. **Out-of-domain vocabulary** — Embedding models trained on general text struggle with domain-specific jargon, rare terms, or newly coined words.
3. **Over-generalization** — Semantic similarity can surface documents that are topically related but not actually relevant to the specific question.

BM25 keyword search complements semantic search by excelling at exact term matching. Let's see this in action.

In [None]:
import numpy as np
from rank_bm25 import BM25Okapi

# Demonstrate keyword search advantage
corpus = [
    "Error 503: Service temporarily unavailable. Retry after 30 seconds.",
    "The server experienced an internal problem and could not fulfill the request.",
    "Network connectivity issues can cause service disruptions and timeouts.",
    "HTTP status code 503 indicates the server is currently unable to handle the request.",
    "Troubleshooting guide: when your application returns errors, check the logs first.",
]

query = "Error 503"

# BM25 (keyword) search
tokenized_corpus = [doc.lower().split() for doc in corpus]
bm25 = BM25Okapi(tokenized_corpus)
keyword_scores = bm25.get_scores(query.lower().split())

# Simulated semantic search (random for demo)
np.random.seed(42)
semantic_scores = np.random.uniform(0.3, 0.8, len(corpus))
# Boost docs that are semantically related but don't have exact keywords
semantic_scores[1] = 0.85  # "server problem" is semantically similar
semantic_scores[2] = 0.75  # "service disruptions" is related

print(f"Query: '{query}'\n")
print(f"{'Doc':<5} {'BM25':>8} {'Semantic':>10} {'Text'}")
print("-" * 80)
for i, doc in enumerate(corpus):
    print(f"  {i+1:<3} {keyword_scores[i]:>8.3f} {semantic_scores[i]:>10.3f}   {doc[:60]}...")

print(f"\nBM25 top result: Doc {np.argmax(keyword_scores)+1} (exact match)")
print(f"Semantic top result: Doc {np.argmax(semantic_scores)+1} (meaning match)")
print("\nNeither alone finds all relevant documents!")


## Part 2: Reciprocal Rank Fusion (RRF)

Reciprocal Rank Fusion is a simple but effective technique for combining ranked lists from different retrieval methods. The key insight is that it only uses **rank positions**, not raw scores, making it robust to different score scales.

The formula is:

$$\text{RRF}(d) = \sum_{r \in R} \frac{1}{k + \text{rank}_r(d)}$$

Where `k` is a smoothing constant (typically 60) and `rank_r(d)` is the rank of document `d` in ranked list `r`.

In [None]:
def reciprocal_rank_fusion(ranked_lists, k=60):
    """
    Combine multiple ranked result lists using RRF.
    
    RRF Score = sum(1 / (rank + k)) across all lists
    
    Args:
        ranked_lists: List of lists, each containing (doc_id, score) tuples sorted by score desc
        k: Smoothing constant (default 60)
    
    Returns:
        List of (doc_id, fused_score) sorted by fused score descending
    """
    fused_scores = {}
    
    for result_list in ranked_lists:
        for rank, (doc_id, _score) in enumerate(result_list):
            if doc_id not in fused_scores:
                fused_scores[doc_id] = 0.0
            fused_scores[doc_id] += 1.0 / (rank + k)
    
    # Sort by fused score descending
    return sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)

# Build ranked lists from our scores
keyword_ranked = sorted(enumerate(keyword_scores), key=lambda x: x[1], reverse=True)
keyword_ranked = [(i, s) for i, s in keyword_ranked]

semantic_ranked = sorted(enumerate(semantic_scores), key=lambda x: x[1], reverse=True)
semantic_ranked = [(i, s) for i, s in semantic_ranked]

# Fuse!
fused = reciprocal_rank_fusion([keyword_ranked, semantic_ranked], k=60)

print("RRF Results:")
print(f"{'Rank':<6} {'Doc':<5} {'RRF Score':>10} {'Text'}")
print("-" * 70)
for rank, (doc_id, score) in enumerate(fused, 1):
    print(f"  {rank:<4} {doc_id+1:<5} {score:>10.6f}   {corpus[doc_id][:55]}...")


### Exercise 2.1: Implement Weighted RRF

Standard RRF treats all retrieval methods equally. But what if you know that semantic search is better for your domain? Implement a **weighted** version of RRF where each ranked list gets a configurable weight.

In [None]:
def weighted_rrf(ranked_lists, weights=None, k=60):
    """RRF with configurable weights per result list."""
    if weights is None:
        weights = [1.0] * len(ranked_lists)
    
    # TODO: Implement weighted RRF
    # Same as RRF, but multiply each list's contribution by its weight
    # fused_scores[doc_id] += weight * (1 / (rank + k))
    fused_scores = {}
    
    # Your implementation here
    pass
    
    return sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)

# Test: weight semantic search 2x more than keyword
result = weighted_rrf([keyword_ranked, semantic_ranked], weights=[1.0, 2.0])
from tests import checks
checks.check_lab_4_2(result)

# The semantic-heavy weighting should change the ranking
print("Weighted RRF (semantic 2x):")
for rank, (doc_id, score) in enumerate(result[:3], 1):
    print(f"  {rank}. Doc {doc_id+1}: {score:.6f} - {corpus[doc_id][:60]}...")
print("Weighted RRF working!")


## Part 3: Cross-Encoder Re-ranking

### Bi-Encoder vs. Cross-Encoder

| | Bi-Encoder | Cross-Encoder |
|---|---|---|
| **How it works** | Encodes query and document independently, compares with cosine similarity | Encodes query and document together as a single input |
| **Speed** | Fast (pre-compute embeddings) | Slow (must run for each query-document pair) |
| **Accuracy** | Good | Better (sees full interaction between query and document) |
| **Use case** | First-stage retrieval (search millions of docs) | Second-stage re-ranking (re-score top 10-50 candidates) |

The typical pattern: use a bi-encoder (or hybrid search) to retrieve ~50 candidates, then use a cross-encoder to re-rank those candidates for the final top-K.

In [None]:
class SimpleReranker:
    """
    Simulated cross-encoder re-ranker.
    In production: use sentence_transformers.CrossEncoder('BAAI/bge-reranker-base')
    """
    
    def rerank(self, query, candidates, top_k=3):
        """
        Re-rank candidates based on query-document relevance.
        
        In production, this would use a CrossEncoder model:
            pairs = [[query, doc['text']] for doc in candidates]
            scores = cross_encoder.predict(pairs)
        """
        scored = []
        query_terms = set(query.lower().split())
        
        for doc in candidates:
            text_terms = set(doc['text'].lower().split())
            # Simulate relevance: overlap + length penalty
            overlap = len(query_terms & text_terms)
            length_penalty = min(1.0, 50 / max(len(doc['text']), 1))
            score = overlap * 0.3 + (1 - length_penalty) * 0.7
            scored.append({**doc, 'relevance_score': score})
        
        scored.sort(key=lambda x: x['relevance_score'], reverse=True)
        return scored[:top_k]

# Demonstrate the full pipeline
candidates = [
    {"id": i, "text": doc, "initial_score": float(fused[j][1])}
    for j, (i, _) in enumerate(fused)
]

reranker = SimpleReranker()
final = reranker.rerank("Error 503", candidates, top_k=3)

print("After Re-ranking:")
for i, doc in enumerate(final, 1):
    print(f"  {i}. [relevance={doc['relevance_score']:.3f}] {doc['text'][:70]}...")


## Part 4: Complete AdvancedRetriever

Now let's put it all together into a single class that implements the full hybrid search + re-ranking pipeline.

In [None]:
class AdvancedRetriever:
    """Full hybrid search + re-ranking pipeline."""
    
    def __init__(self, documents):
        self.documents = documents
        # Build BM25 index
        tokenized = [doc['text'].lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized)
        self.reranker = SimpleReranker()
    
    def _keyword_search(self, query, top_k=10):
        scores = self.bm25.get_scores(query.lower().split())
        ranked_idx = np.argsort(scores)[::-1][:top_k]
        return [(int(i), float(scores[i])) for i in ranked_idx if scores[i] > 0]
    
    def _semantic_search(self, query, top_k=10):
        # Simulated - in production: embed query + vector store search
        np.random.seed(hash(query) % 2**32)
        scores = np.random.uniform(0.3, 0.9, len(self.documents))
        ranked_idx = np.argsort(scores)[::-1][:top_k]
        return [(int(i), float(scores[i])) for i in ranked_idx]
    
    def retrieve(self, query, top_k=3, candidates=10):
        """Full pipeline: hybrid search -> RRF -> re-rank -> top K."""
        # 1. Parallel retrieval
        keyword_results = self._keyword_search(query, candidates)
        semantic_results = self._semantic_search(query, candidates)
        
        # 2. Fuse with RRF
        fused = reciprocal_rank_fusion([keyword_results, semantic_results])
        
        # 3. Get full documents for top candidates
        candidate_docs = []
        for doc_id, rrf_score in fused[:candidates]:
            doc = {**self.documents[doc_id], 'id': doc_id, 'rrf_score': rrf_score}
            candidate_docs.append(doc)
        
        # 4. Re-rank
        final = self.reranker.rerank(query, candidate_docs, top_k=top_k)
        
        return final

# Test
docs = [{"text": doc} for doc in corpus]
retriever = AdvancedRetriever(docs)
results = retriever.retrieve("Error 503 service unavailable", top_k=3)

print("Advanced Retrieval Results:")
for i, r in enumerate(results, 1):
    print(f"  {i}. [score={r['relevance_score']:.3f}] {r['text'][:70]}...")


## Part 5: Query Preprocessing

Good retrieval starts before you even search. Query preprocessing can significantly improve results by:
- Normalizing formatting and whitespace
- Expanding acronyms so both forms are searched
- Classifying query intent to route to the best retrieval strategy

In [None]:
import re

class QueryPreprocessor:
    """Cleans and expands queries for better retrieval."""
    
    def __init__(self, acronym_map=None):
        self.acronym_map = acronym_map or {
            "RAG": "Retrieval-Augmented Generation",
            "LLM": "Large Language Model",
            "NLP": "Natural Language Processing",
            "HNSW": "Hierarchical Navigable Small World",
        }
    
    def preprocess(self, query):
        # Normalize whitespace
        query = re.sub(r'\s+', ' ', query.strip())
        return query
    
    def expand_acronyms(self, query):
        expanded = query
        for acronym, full in self.acronym_map.items():
            if acronym in query.upper():
                expanded = f"{expanded} ({full})"
        return expanded
    
    def classify_intent(self, query):
        q = query.lower()
        if any(w in q for w in ["compare", "vs", "difference", "versus"]):
            return "comparison"
        elif any(w in q for w in ["how to", "how do", "implement", "build"]):
            return "how_to"
        elif any(w in q for w in ["what is", "what are", "define", "explain"]):
            return "factual"
        elif any(w in q for w in ["why", "error", "fail", "wrong"]):
            return "troubleshooting"
        return "general"

# Test
preprocessor = QueryPreprocessor()
queries = [
    "What is RAG?",
    "Compare HNSW vs flat index",
    "How to implement re-ranking?",
    "Why is my retrieval accuracy low?",
]

for q in queries:
    expanded = preprocessor.expand_acronyms(q)
    intent = preprocessor.classify_intent(q)
    print(f"  [{intent:<16}] {q}")
    if expanded != q:
        print(f"  {'':>18} -> {expanded}")


### Exercise 5.1: Add Query Expansion

Query expansion generates multiple variations of a query to improve recall. In production, you would use an LLM to generate semantically diverse variations. For this exercise, implement a simple rule-based approach.

In [None]:
def generate_query_variations(query, n=3):
    """
    Generate search query variations to improve recall.
    In production: use an LLM to generate variations.
    """
    # TODO: Generate n variations of the query
    # Strategy: append different suffixes that capture different aspects
    # Example: "What is RAG?" -> ["What is RAG?", "RAG methodology", "RAG state of the art"]
    variations = [query]  # Always include original
    
    # TODO: Add n-1 more variations
    # Hint: Try appending words like "methodology", "comparison", "implementation", "overview"
    
    return variations[:n+1]  # Original + n variations

# Test
variations = generate_query_variations("What is chunking in RAG?", n=3)
from tests import checks
checks.check_lab_4_5(variations)
print("Query variations:")
for i, v in enumerate(variations):
    print(f"  {i+1}. {v}")
print("Query expansion working!")


## Reflection Questions
1. **Hybrid Search**: When would pure BM25 outperform pure semantic search? Give a specific example query.
2. **Re-ranking Cost**: If you have 1000 queries/day and retrieve 50 candidates each, how many cross-encoder inferences per day? Is this feasible?
3. **RRF Parameter k**: What happens if k=1 (very small)? What about k=10000 (very large)? How does it affect the fusion?

*Your answers here:*
1. ...
2. ...
3. ...