# Fusion Retrieval: Combining Vector and Keyword Search

In this notebook, I implement a fusion retrieval system that combines the strengths of semantic vector search with keyword-based BM25 retrieval. This approach improves retrieval quality by capturing both conceptual similarity and exact keyword matches.

## Why Fusion Retrieval Matters

Traditional RAG systems typically rely on vector search alone, but each retrieval method has blind spots:

- Vector search excels at semantic similarity but may miss exact keyword matches
- Keyword search is great for specific terms but lacks semantic understanding
- Different queries perform better with different retrieval methods

Fusion retrieval addresses these gaps by:

- Performing both vector-based and keyword-based retrieval
- Normalizing the scores from each approach
- Combining them with a weighted formula
- Ranking documents based on the combined score
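
The four steps above can be sketched on toy numbers before any retrieval code exists. This is a minimal illustration, not the notebook's implementation: the helper names `min_max` and `fuse` and the score values are made up for the example, and it uses plain Python lists rather than NumPy.

```python
def min_max(scores, eps=1e-8):
    """Rescale scores to roughly [0, 1]; eps avoids division by zero."""
    lo, hi = min(scores), max(scores)
    return [(s - lo) / (hi - lo + eps) for s in scores]

def fuse(vector_scores, bm25_scores, alpha=0.5):
    """Return (ranking, combined) where ranking lists document indices, best first."""
    norm_vec = min_max(vector_scores)
    norm_bm25 = min_max(bm25_scores)
    combined = [alpha * v + (1 - alpha) * b for v, b in zip(norm_vec, norm_bm25)]
    ranking = sorted(range(len(combined)), key=lambda i: combined[i], reverse=True)
    return ranking, combined

vector_scores = [0.82, 0.75, 0.40]  # made-up cosine similarities for three documents
bm25_scores = [1.2, 6.0, 0.0]       # made-up BM25 scores for the same documents

ranking, combined = fuse(vector_scores, bm25_scores, alpha=0.5)
print(ranking)  # [1, 0, 2]
```

With `alpha=0.5` the second document ranks first because its strong keyword score outweighs its slightly lower similarity. Setting `alpha=1.0` reproduces the pure vector ranking.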

## Setting Up the Environment
We begin by importing necessary libraries.

In [1]:
import os
import numpy as np
from rank_bm25 import BM25Okapi
import fitz
from openai import OpenAI
import re
import json
import time
from sklearn.metrics.pairwise import cosine_similarity

## Setting Up the OpenAI API Client
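We initialize an OpenAI-compatible client that points at Gemini's endpoint. The embedding and generation helpers in the cells below call the `google.generativeai` SDK directly, so `genai.configure(api_key=...)` must also be run before they are used.

In [2]:
# Initialize the OpenAI-compatible client with Gemini's base URL and API key
client = OpenAI(
    api_key=os.getenv("GOOGLE_API_KEY"),  # Retrieve the API key from environment variables
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/"  # Gemini's OpenAI-compatible endpoint
)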

## Document Processing Functions

In [3]:
def extract_text_from_pdf(pdf_path):
    """
    Extract text content from a PDF file.
    
    Args:
        pdf_path (str): Path to the PDF file
        
    Returns:
        str: Extracted text content
    """
    print(f"Extracting text from {pdf_path}...")  # Print the path of the PDF being processed
    pdf_document = fitz.open(pdf_path)  # Open the PDF file using PyMuPDF
    text = ""  # Initialize an empty string to store the extracted text
    
    # Iterate through each page in the PDF
    for page_num in range(pdf_document.page_count):
        page = pdf_document[page_num]  # Get the page object
        text += page.get_text()  # Extract text from the page and append to the text string
    
    return text  # Return the extracted text content

In [4]:
def chunk_text(text, chunk_size=1000, chunk_overlap=200):
    """
    Split text into overlapping chunks.
    
    Args:
        text (str): Input text to chunk
        chunk_size (int): Size of each chunk in characters
        chunk_overlap (int): Overlap between chunks in characters
        
    Returns:
        List[Dict]: List of chunks with text and metadata
    """
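    # A non-positive step would make range() fail or never advance
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    
    chunks = []  # Initialize an empty list to store chunks
    
    # Slide a window of chunk_size characters, advancing by (chunk_size - chunk_overlap) each step
    for i in range(0, len(text), chunk_size - chunk_overlap):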
        chunk = text[i:i + chunk_size]  # Extract a chunk of the specified size
        if chunk:  # Ensure we don't add empty chunks
            chunk_data = {
                "text": chunk,  # The chunk text
                "metadata": {
                    "start_char": i,  # Start character index of the chunk
                    "end_char": i + len(chunk)  # End character index of the chunk
                }
            }
            chunks.append(chunk_data)  # Add the chunk data to the list
    
    print(f"Created {len(chunks)} text chunks")  # Print the number of created chunks
    return chunks  # Return the list of chunks
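
To see which character ranges this sliding window produces, the offsets can be computed without any text at all. The helper `chunk_spans` below is a hypothetical name of mine, written only to mirror the loop in `chunk_text`.

```python
def chunk_spans(text_len, chunk_size, chunk_overlap):
    """Return the (start, end) character offsets a sliding window would produce."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    stride = chunk_size - chunk_overlap  # how far the window advances each step
    return [(start, min(start + chunk_size, text_len))
            for start in range(0, text_len, stride)]

spans = chunk_spans(2500, 1000, 200)
print(spans)  # [(0, 1000), (800, 1800), (1600, 2500), (2400, 2500)]
```

Each span starts 800 characters after the previous one, so neighbours share 200 characters. Note that the final span here is fully contained in the one before it, which is harmless but slightly redundant.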

In [5]:
def clean_text(text):
    """
    Clean text by removing extra whitespace and special characters.
    
    Args:
        text (str): Input text
        
    Returns:
        str: Cleaned text
    """
    # Replace multiple whitespace characters (including newlines and tabs) with a single space
    text = re.sub(r'\s+', ' ', text)
    
    # Replace literal backslash sequences (the two characters "\t" or "\n") left over from extraction;
    # real tabs and newlines were already collapsed by the regex above
    text = text.replace('\\t', ' ')
    text = text.replace('\\n', ' ')
    
    # Remove any leading or trailing whitespace and ensure single spaces between words
    text = ' '.join(text.split())
    
    return text

## Creating Our Vector Store

In [6]:
import google.generativeai as genai



def create_embeddings(texts, model="models/embedding-001"):
    """
    Create embeddings for the given texts using the specified Gemini model.

    Args:
        texts (str or List[str]): Input text(s)
        model (str): Embedding model name. Defaults to "models/embedding-001".

    Returns:
        List[float] or List[List[float]]: Embedding vector(s)
    """
    # `embed_content` accepts either a single string or a list of strings
    # in the `content` parameter.
    response = genai.embed_content(
        model=model,
        content=texts
    )

    # response['embedding'] holds one vector for a string input and
    # a list of vectors for a list input, so it can be returned as-is.
    return response['embedding']

In [1]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import google.generativeai as genai
import os
from dotenv import load_dotenv

# --- The SimpleVectorStore class ---
class SimpleVectorStore:
    """
    A simple vector store implementation using NumPy.
    """
    def __init__(self):
        self.vectors = []  # List to store embedding vectors
        self.texts = []  # List to store text content
        self.metadata = []  # List to store metadata
    
    def add_item(self, text, embedding, metadata=None):
        """
        Add an item to the vector store.
        
        Args:
            text (str): The text content
            embedding (List[float]): The embedding vector
            metadata (Dict, optional): Additional metadata
        """
        self.vectors.append(np.array(embedding))
        self.texts.append(text)
        self.metadata.append(metadata or {})
    
    def add_items(self, items, embeddings):
        """
        Add multiple items to the vector store.
        
        Args:
            items (List[Dict]): List of text items
            embeddings (List[List[float]]): List of embedding vectors
        """
        offset = len(self.texts)  # Keep indices unique if add_items is called more than once
        for i, (item, embedding) in enumerate(zip(items, embeddings), start=offset):
            self.add_item(
                text=item["text"],
                embedding=embedding,
                metadata={**item.get("metadata", {}), "index": i}
            )
    
    def similarity_search_with_scores(self, query_embedding, k=5):
        """
        Find the most similar items to a query embedding with similarity scores.
        
        Args:
            query_embedding (List[float]): Query embedding vector
            k (int): Number of results to return
            
        Returns:
            List[Dict]: Top k most similar items, each with "text", "metadata", and "similarity" keys
        """
        if not self.vectors:
            return []
        
        query_vector = np.array(query_embedding).reshape(1, -1)
        stored_vectors = np.array(self.vectors)
        
        # Calculate similarities using scikit-learn's cosine_similarity
        # This is more efficient than a manual loop for large datasets
        similarities = cosine_similarity(query_vector, stored_vectors)[0]
        
        # Get the indices of the top k scores
        top_k_indices = np.argsort(similarities)[-k:][::-1]
        
        # Return top k results with scores
        results = []
        for idx in top_k_indices:
            score = similarities[idx]
            results.append({
                "text": self.texts[idx],
                "metadata": self.metadata[idx],
                "similarity": float(score)
            })
        
        return results
    
    def get_all_documents(self):
        """
        Get all documents in the store.
        
        Returns:
            List[Dict]: All documents
        """
        return [{"text": text, "metadata": meta} for text, meta in zip(self.texts, self.metadata)]

# --- Example Usage with Gemini ---

def create_gemini_embeddings(texts, model="models/embedding-001"):
    """Creates embeddings for text(s) using Gemini."""
    response = genai.embed_content(model=model, content=texts)
    return response['embedding']

if __name__ == '__main__':
    # 1. Configure the Gemini SDK
    load_dotenv()
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")

    # Fail early with a clear message if no key is available
    if not api_key:
        raise SystemExit("No API key found: set GEMINI_API_KEY or GOOGLE_API_KEY")
    genai.configure(api_key=api_key)
    
    # 2. Initialize the vector store
    store = SimpleVectorStore()

    # 3. Define and embed some texts using Gemini
    texts_to_add = [
        "The Pennsylvania Emergency Preparedness Guide details how to make a family emergency plan.",
        "A Home Emergency Kit Checklist is available on pages 12-15.",
        "The guide lists the top 10 emergencies, including floods, fires, and winter storms.",
        "The phone number for the Central Pennsylvania Food Bank is (717) 564-1700."
    ]
    
    embeddings = create_gemini_embeddings(texts_to_add)
    
    items = [{"text": t} for t in texts_to_add]
    store.add_items(items, embeddings)
    
    print("Vector store populated with Gemini embeddings.")
    
    # 4. Perform a similarity search with a new query
    query_text = "how to create a family emergency plan"
    print(f"\nSearching for items similar to: '{query_text}'")
    query_embedding = create_gemini_embeddings(query_text)
    
    search_results = store.similarity_search_with_scores(query_embedding, k=2)

    # 5. Print the search results
    print("\nTop 2 search results:")
    for result in search_results:
        print(f"  - Text: {result['text']}")
        print(f"    Similarity: {result['similarity']:.4f}")
        print(f"    Metadata: {result['metadata']}")

Vector store populated with Gemini embeddings.

Searching for items similar to: 'how to create a family emergency plan'

Top 2 search results:
  - Text: The Pennsylvania Emergency Preparedness Guide details how to make a family emergency plan.
    Similarity: 0.8418
    Metadata: {'index': 0}
  - Text: A Home Emergency Kit Checklist is available on pages 12-15.
    Similarity: 0.7590
    Metadata: {'index': 1}
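
For intuition about the similarity values printed above, here is a dependency-free sketch of cosine similarity and top-k selection. The function names `cosine` and `top_k` and the two-dimensional toy vectors are mine; the store itself uses scikit-learn's `cosine_similarity` on real embeddings.

```python
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def top_k(query, vectors, k):
    """Indices of the k vectors most similar to the query, best first."""
    sims = [cosine(query, v) for v in vectors]
    return sorted(range(len(vectors)), key=lambda i: sims[i], reverse=True)[:k]

toy_vectors = [[1.0, 0.0], [1.0, 1.0], [0.0, 1.0]]  # stand-ins for real embeddings
print(top_k([1.0, 0.0], toy_vectors, k=2))  # [0, 1]
```

The score depends only on the angle between vectors, not their length, which is why it is a common choice for comparing embeddings.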


## BM25 Implementation

In [2]:
def create_bm25_index(chunks):
    """
    Create a BM25 index from the given chunks.
    
    Args:
        chunks (List[Dict]): List of text chunks
        
    Returns:
        BM25Okapi: A BM25 index
    """
    # Extract text from each chunk
    texts = [chunk["text"] for chunk in chunks]
    
    # Tokenize each document by splitting on whitespace
    tokenized_docs = [text.split() for text in texts]
    
    # Create the BM25 index using the tokenized documents
    bm25 = BM25Okapi(tokenized_docs)
    
    # Print the number of documents in the BM25 index
    print(f"Created BM25 index with {len(texts)} documents")
    
    return bm25
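
One caveat with whitespace tokenization is that BM25 matches tokens exactly, so case and attached punctuation can hide a match. The sketch below shows the effect and one possible normalizing tokenizer. `simple_tokenize` is a hypothetical helper, not part of `rank_bm25`, and if it were adopted it would have to be applied to both the documents and the query.

```python
import re

def simple_tokenize(text):
    """Lowercase the text and keep runs of ASCII letters and digits as tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())

doc = "Make a Family Emergency Plan."
query = "emergency plan"

# Whitespace splitting: no shared tokens, because of case and the trailing period
print(set(doc.split()) & set(query.split()))  # set()

# Normalized tokens: both query terms are found in the document
print(set(simple_tokenize(doc)) & set(simple_tokenize(query)))
```

This pattern drops non-ASCII letters and splits strings such as phone numbers into separate digit groups, so it is a starting point rather than a general-purpose tokenizer.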

In [3]:
def bm25_search(bm25, chunks, query, k=5):
    """
    Search the BM25 index with a query.
    
    Args:
        bm25 (BM25Okapi): BM25 index
        chunks (List[Dict]): List of text chunks
        query (str): Query string
        k (int): Number of results to return
        
    Returns:
        List[Dict]: Top k results with scores
    """
    # Tokenize the query by splitting it into individual words
    query_tokens = query.split()
    
    # Get BM25 scores for the query tokens against the indexed documents
    scores = bm25.get_scores(query_tokens)
    
    # Initialize an empty list to store results with their scores
    results = []
    
    # Iterate over the scores and corresponding chunks
    for i, score in enumerate(scores):
        # Create a copy of the metadata to avoid modifying the original
        metadata = chunks[i].get("metadata", {}).copy()
        # Add index to metadata
        metadata["index"] = i
        
        results.append({
            "text": chunks[i]["text"],
            "metadata": metadata,  # Add metadata with index
            "bm25_score": float(score)
        })
    
    # Sort the results by BM25 score in descending order
    results.sort(key=lambda x: x["bm25_score"], reverse=True)
    
    # Return the top k results
    return results[:k]
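
For intuition about what the scores mean, here is a from-scratch sketch of a common BM25 formulation. It is not the `rank_bm25` source: the function name `bm25_scores` is mine, `k1` and `b` are set to typical values, and the IDF uses a smoothed `log(1 + ...)` form, so the numbers will not necessarily match `BM25Okapi`.

```python
import math
from collections import Counter

def bm25_scores(query_tokens, docs_tokens, k1=1.5, b=0.75):
    """Score every tokenized document against the query with a textbook BM25."""
    n_docs = len(docs_tokens)
    avg_len = sum(len(d) for d in docs_tokens) / n_docs

    # Document frequency: in how many documents each term appears
    df = Counter()
    for doc in docs_tokens:
        df.update(set(doc))

    scores = []
    for doc in docs_tokens:
        tf = Counter(doc)
        score = 0.0
        for term in query_tokens:
            if term not in tf:
                continue
            # Rare terms get a higher weight than common ones
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            # Term frequency saturates with k1; b penalizes longer-than-average documents
            denom = tf[term] + k1 * (1 - b + b * len(doc) / avg_len)
            score += idf * tf[term] * (k1 + 1) / denom
        scores.append(score)
    return scores

docs = [
    ["family", "emergency", "plan"],
    ["emergency", "kit", "checklist"],
    ["food", "bank", "phone"],
]
scores = bm25_scores(["emergency", "plan"], docs)
print([round(s, 3) for s in scores])
```

The first document matches both query terms and scores highest, the second matches only the more common term, and the third matches nothing and scores 0.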

## Fusion Retrieval Function

In [4]:
import numpy as np

def fusion_retrieval(query, chunks, vector_store, bm25_index, k=5, alpha=0.5):
    """
    Perform fusion retrieval combining vector-based and BM25 search.
    
    Args:
        query (str): Query string
        chunks (List[Dict]): Original text chunks
        vector_store (SimpleVectorStore): Vector store
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of results to return
        alpha (float): Weight for vector scores (0-1), where 1-alpha is BM25 weight
        
    Returns:
        List[Dict]: Top k results based on combined scores
    """
    print(f"Performing fusion retrieval for query: {query}")
    
    # Define small epsilon to avoid division by zero
    epsilon = 1e-8
    
    # Get vector search results
    query_embedding = create_embeddings(query)
    vector_results = vector_store.similarity_search_with_scores(query_embedding, k=len(chunks))
    
    # Get BM25 search results
    bm25_results = bm25_search(bm25_index, chunks, query, k=len(chunks))
    
    # Create dictionaries to map document index to score
    vector_scores_dict = {result["metadata"]["index"]: result["similarity"] for result in vector_results}
    bm25_scores_dict = {result["metadata"]["index"]: result["bm25_score"] for result in bm25_results}
    
    # Ensure all documents have scores for both methods
    all_docs = vector_store.get_all_documents()
    combined_results = []
    
    for i, doc in enumerate(all_docs):
        vector_score = vector_scores_dict.get(i, 0.0)
        bm25_score = bm25_scores_dict.get(i, 0.0)
        combined_results.append({
            "text": doc["text"],
            "metadata": doc["metadata"],
            "vector_score": vector_score,
            "bm25_score": bm25_score,
            "index": i
        })
    
    # Extract scores as arrays
    vector_scores = np.array([doc["vector_score"] for doc in combined_results])
    bm25_scores = np.array([doc["bm25_score"] for doc in combined_results])
    
    # Normalize scores
    norm_vector_scores = (vector_scores - np.min(vector_scores)) / (np.max(vector_scores) - np.min(vector_scores) + epsilon)
    norm_bm25_scores = (bm25_scores - np.min(bm25_scores)) / (np.max(bm25_scores) - np.min(bm25_scores) + epsilon)
    
    # Compute combined scores
    combined_scores = alpha * norm_vector_scores + (1 - alpha) * norm_bm25_scores
    
    # Add combined scores to results
    for i, score in enumerate(combined_scores):
        combined_results[i]["combined_score"] = float(score)
    
    # Sort by combined score (descending)
    combined_results.sort(key=lambda x: x["combined_score"], reverse=True)
    
    # Return top k results
    top_results = combined_results[:k]
    
    print(f"Retrieved {len(top_results)} documents with fusion retrieval")
    return top_results
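
Written as a formula, the scoring in `fusion_retrieval` is the following, where $v_i$ and $b_i$ are the raw vector and BM25 scores of chunk $i$, $\epsilon = 10^{-8}$, and the symbol names are mine rather than the notebook's:

$$
\tilde{v}_i = \frac{v_i - \min_j v_j}{\max_j v_j - \min_j v_j + \epsilon}, \qquad
\tilde{b}_i = \frac{b_i - \min_j b_j}{\max_j b_j - \min_j b_j + \epsilon}, \qquad
c_i = \alpha\,\tilde{v}_i + (1 - \alpha)\,\tilde{b}_i
$$

Two consequences follow from min-max normalization. The lowest-scoring chunk of each method is always mapped to 0, regardless of its raw score. And if one method gives every chunk the same score (for example, no query term appears anywhere, so all BM25 scores are 0), its normalized scores are all 0 and the ranking is decided by the other method alone.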

## Document Processing Pipeline
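
The evaluation pipeline later in this notebook calls `process_document(pdf_path)` and expects it to return `chunks, vector_store, bm25_index`. A minimal sketch of such a function, assuming it simply chains the helpers defined above, could look like this. The order (clean before chunking), the default sizes, and the `steps` argument are my assumptions. `steps` is a hypothetical hook that lets the sketch run with stand-in helpers, without a PDF or an API key.

```python
def process_document(pdf_path, chunk_size=1000, chunk_overlap=200, steps=None):
    """Sketch: PDF -> cleaned text -> chunks -> vector store + BM25 index."""
    # Use the helpers defined earlier in this notebook unless stand-ins are supplied.
    # The dict below is only evaluated when no stand-ins are passed.
    steps = steps or {
        "extract": extract_text_from_pdf,
        "clean": clean_text,
        "chunk": chunk_text,
        "embed": create_embeddings,
        "make_store": SimpleVectorStore,
        "make_bm25": create_bm25_index,
    }

    # Extract and clean the raw text, then split it into overlapping chunks
    text = steps["clean"](steps["extract"](pdf_path))
    chunks = steps["chunk"](text, chunk_size, chunk_overlap)

    # Assumption: one embedding call for all chunks; large documents may need batching
    embeddings = steps["embed"]([chunk["text"] for chunk in chunks])

    # Build both indices over the same chunk list so their indices line up
    vector_store = steps["make_store"]()
    vector_store.add_items(chunks, embeddings)
    bm25_index = steps["make_bm25"](chunks)

    return chunks, vector_store, bm25_index
```

Building both indices from the same `chunks` list matters because `fusion_retrieval` joins the two score sets by chunk index.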


## Response Generation

In [6]:
import google.generativeai as genai

# Assume genai.configure(api_key="YOUR_API_KEY") has been called.

def generate_response(query, context, model="gemini-2.0-flash"):
    """
    Generate a response based on the query and context using a Gemini model.

    Args:
        query (str): User query
        context (str): Context from retrieved documents
        model (str): The model to use for response generation. Defaults to "gemini-2.0-flash".

    Returns:
        str: Generated response
    """
    # For Gemini, it's often best to combine the system prompt with the user's prompt
    # into a single, cohesive instruction to guide the model's behavior.

    # Combine the system prompt with the user's query and context into a single prompt string.
    prompt = f"""
    You are a helpful AI assistant. Answer the user's question based on the provided context. 
    If the context doesn't contain relevant information to answer the question fully, acknowledge this limitation.

    Context:
    {context}

    Question: {query}

    Please answer the question based on the provided context.
    """
    
    # Initialize the Gemini GenerativeModel
    model_instance = genai.GenerativeModel(model)

    # Generate the response using the Gemini API
    try:
        response = model_instance.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.3 # Set the temperature for response generation
            )
        )
        
        # Return the generated response, stripping any leading/trailing whitespace
        return response.text.strip()
    
    except Exception as e:
        # Handle potential errors from the API call
        return f"An error occurred while generating the response: {e}"

## Main Retrieval Function

In [7]:


def answer_with_fusion_rag(query, chunks, vector_store, bm25_index, k=5, alpha=0.5):
    """
    Answer a query using fusion RAG with Gemini-compatible functions.
    
    Args:
        query (str): User query
        chunks (List[Dict]): Text chunks
        vector_store (SimpleVectorStore): Vector store
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of documents to retrieve
        alpha (float): Weight for vector scores
        
    Returns:
        Dict: Query results including retrieved documents and response
    """
    # Retrieve documents using fusion retrieval method
    retrieved_docs = fusion_retrieval(query, chunks, vector_store, bm25_index, k=k, alpha=alpha)
    
    # Format the context from the retrieved documents by joining their text with separators
    context = "\n\n---\n\n".join([doc["text"] for doc in retrieved_docs])
    
    # Generate a response based on the query and the formatted context
    response = generate_response(query, context)
    
    # Return the query, retrieved documents, and the generated response
    return {
        "query": query,
        "retrieved_documents": retrieved_docs,
        "response": response
    }

## Comparing Retrieval Methods

In [8]:


def vector_only_rag(query, vector_store, k=5):
    """
    Answer a query using only vector-based RAG with Gemini-compatible functions.
    
    Args:
        query (str): User query
        vector_store (SimpleVectorStore): Vector store
        k (int): Number of documents to retrieve
        
    Returns:
        Dict: Query results
    """
    # Create query embedding
    query_embedding = create_embeddings(query)
    
    # Retrieve documents using vector-based similarity search
    retrieved_docs = vector_store.similarity_search_with_scores(query_embedding, k=k)
    
    # Format the context from the retrieved documents by joining their text with separators
    context = "\n\n---\n\n".join([doc["text"] for doc in retrieved_docs])
    
    # Generate a response based on the query and the formatted context
    response = generate_response(query, context)
    
    # Return the query, retrieved documents, and the generated response
    return {
        "query": query,
        "retrieved_documents": retrieved_docs,
        "response": response
    }

In [15]:
def bm25_only_rag(query, chunks, bm25_index, k=5):
    """
    Answer a query using only BM25-based RAG.
    
    Args:
        query (str): User query
        chunks (List[Dict]): Text chunks
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of documents to retrieve
        
    Returns:
        Dict: Query results
    """
    # Retrieve documents using BM25 search
    retrieved_docs = bm25_search(bm25_index, chunks, query, k=k)
    
    # Format the context from the retrieved documents by joining their text with separators
    context = "\n\n---\n\n".join([doc["text"] for doc in retrieved_docs])
    
    # Generate a response based on the query and the formatted context
    response = generate_response(query, context)
    
    # Return the query, retrieved documents, and the generated response
    return {
        "query": query,
        "retrieved_documents": retrieved_docs,
        "response": response
    }

## Evaluation Functions

In [10]:
def compare_retrieval_methods(query, chunks, vector_store, bm25_index, k=5, alpha=0.5, reference_answer=None):
    """
    Compare different retrieval methods for a query.
    
    Args:
        query (str): User query
        chunks (List[Dict]): Text chunks
        vector_store (SimpleVectorStore): Vector store
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of documents to retrieve
        alpha (float): Weight for vector scores in fusion retrieval
        reference_answer (str, optional): Reference answer for comparison
        
    Returns:
        Dict: Comparison results
    """
    print(f"\n=== Comparing retrieval methods for query: {query} ===\n")
    
    # Run vector-only RAG
    print("\nRunning vector-only RAG...")
    vector_result = vector_only_rag(query, vector_store, k)
    
    # Run BM25-only RAG
    # This assumes a Gemini-compatible `bm25_only_rag` helper function exists.
    print("\nRunning BM25-only RAG...")
    bm25_result = bm25_only_rag(query, chunks, bm25_index, k)
    
    # Run fusion RAG
    # This assumes a Gemini-compatible `answer_with_fusion_rag` helper function exists.
    print("\nRunning fusion RAG...")
    fusion_result = answer_with_fusion_rag(query, chunks, vector_store, bm25_index, k, alpha)
    
    # Compare responses from different retrieval methods
    # This assumes a Gemini-compatible `evaluate_responses` helper function exists.
    print("\nComparing responses...")
    comparison = evaluate_responses(
        query, 
        vector_result["response"], 
        bm25_result["response"], 
        fusion_result["response"],
        reference_answer
    )
    
    # Return the comparison results
    return {
        "query": query,
        "vector_result": vector_result,
        "bm25_result": bm25_result,
        "fusion_result": fusion_result,
        "comparison": comparison
    }
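
Alongside the LLM-written comparison, it can help to check how much the retrieved sets themselves differ. The sketch below is an optional add-on, not part of the original pipeline: `retrieved_indices` and `jaccard` are hypothetical helpers, and the toy dicts only imitate the shape of the results returned by the RAG helpers above.

```python
def retrieved_indices(result):
    """Chunk indices from a RAG result dict, in ranked order."""
    return [doc["metadata"]["index"] for doc in result["retrieved_documents"]]

def jaccard(a, b):
    """Share of indices two retrieval runs have in common (1.0 if both are empty)."""
    a, b = set(a), set(b)
    return len(a & b) / len(a | b) if a | b else 1.0

# Toy results shaped like the dicts returned by the RAG helpers
vector_result = {"retrieved_documents": [{"metadata": {"index": i}} for i in (0, 3, 5)]}
bm25_result = {"retrieved_documents": [{"metadata": {"index": i}} for i in (3, 7, 5)]}

overlap = jaccard(retrieved_indices(vector_result), retrieved_indices(bm25_result))
print(overlap)  # 0.5
```

A low overlap suggests the two methods surface different chunks for that query, which is the situation where fusion has the most room to change the result.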

In [11]:
import google.generativeai as genai

def evaluate_responses(query, vector_response, bm25_response, fusion_response, reference_answer=None):
    """
    Evaluate the responses from different retrieval methods using a Gemini model.

    Args:
        query (str): User query
        vector_response (str): Response from vector-only RAG
        bm25_response (str): Response from BM25-only RAG
        fusion_response (str): Response from fusion RAG
        reference_answer (str, optional): Reference answer
        
    Returns:
        str: Evaluation of responses
    """
    # System prompt for the evaluator to guide the evaluation process
    system_prompt = """You are an expert evaluator of RAG systems. Compare responses from three different retrieval approaches:
    1. Vector-based retrieval: Uses semantic similarity for document retrieval
    2. BM25 keyword retrieval: Uses keyword matching for document retrieval
    3. Fusion retrieval: Combines both vector and keyword approaches

    Evaluate the responses based on:
    - Relevance to the query
    - Factual correctness
    - Comprehensiveness
    - Clarity and coherence"""

    # User prompt containing the query and responses
    user_prompt = f"""Query: {query}

    Vector-based response:
    {vector_response}

    BM25 keyword response:
    {bm25_response}

    Fusion response:
    {fusion_response}
    """

    # Add reference answer to the prompt if provided
    if reference_answer:
        user_prompt += f"""
        Reference answer:
        {reference_answer}
        """

    # Add instructions for detailed comparison to the user prompt
    user_prompt += """
    Please provide a detailed comparison of these three responses. Which approach performed best for this query and why?
    Be specific about the strengths and weaknesses of each approach for this particular query.
    """

    # Combine system and user prompts into a single prompt for Gemini
    full_prompt = f"{system_prompt}\n\n{user_prompt}"

    # Initialize the Gemini GenerativeModel
    model_instance = genai.GenerativeModel("gemini-2.0-flash")
    
    try:
        # Generate the evaluation using the Gemini API
        response = model_instance.generate_content(
            full_prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.0 # Set a low temperature for a more objective evaluation
            )
        )
        
        # Return the generated evaluation content
        return response.text.strip()

    except Exception as e:
        return f"An error occurred during evaluation: {e}"

## Complete Evaluation Pipeline

In [12]:
# Assuming the following Gemini-compatible functions are defined elsewhere:
# - process_document(pdf_path)
# - compare_retrieval_methods(...)
# - generate_overall_analysis(results)

def evaluate_fusion_retrieval(pdf_path, test_queries, reference_answers=None, k=5, alpha=0.5):
    """
    Evaluate fusion retrieval compared to other methods.
    
    Args:
        pdf_path (str): Path to the PDF file
        test_queries (List[str]): List of test queries
        reference_answers (List[str], optional): Reference answers
        k (int): Number of documents to retrieve
        alpha (float): Weight for vector scores in fusion retrieval
        
    Returns:
        Dict: Evaluation results
    """
    print("=== EVALUATING FUSION RETRIEVAL ===\n")
    
    # Process the document to extract text, create chunks, and build vector and BM25 indices
    # Assumes process_document returns chunks, vector_store, and bm25_index
    chunks, vector_store, bm25_index = process_document(pdf_path)
    
    # Initialize a list to store results for each query
    results = []
    
    # Iterate over each test query
    for i, query in enumerate(test_queries):
        print(f"\n\n=== Evaluating Query {i+1}/{len(test_queries)} ===")
        print(f"Query: {query}")
        
        # Get the reference answer if available
        reference = None
        if reference_answers and i < len(reference_answers):
            reference = reference_answers[i]
        
        # Compare retrieval methods for the current query
        comparison = compare_retrieval_methods(
            query, 
            chunks, 
            vector_store, 
            bm25_index, 
            k=k, 
            alpha=alpha,
            reference_answer=reference
        )
        
        # Append the comparison results to the results list
        results.append(comparison)
        
        # Print the responses from different retrieval methods
        print("\n=== Vector-based Response ===")
        print(comparison["vector_result"]["response"])
        
        print("\n=== BM25 Response ===")
        print(comparison["bm25_result"]["response"])
        
        print("\n=== Fusion Response ===")
        print(comparison["fusion_result"]["response"])
        
        print("\n=== Comparison ===")
        print(comparison["comparison"])
    
    # Generate an overall analysis of the fusion retrieval performance
    overall_analysis = generate_overall_analysis(results)
    
    # Return the results and overall analysis
    return {
        "results": results,
        "overall_analysis": overall_analysis
    }

In [13]:
import google.generativeai as genai


def generate_overall_analysis(results):
    """
    Generate an overall analysis of fusion retrieval using a Gemini model.
    
    Args:
        results (List[Dict]): Results from evaluating queries
        
    Returns:
        str: Overall analysis
    """
    # System prompt to guide the evaluation process
    system_prompt = """You are an expert at evaluating information retrieval systems.
    Based on multiple test queries, provide an overall analysis comparing three retrieval approaches:
    1. Vector-based retrieval (semantic similarity)
    2. BM25 keyword retrieval (keyword matching)
    3. Fusion retrieval (combination of both)

    Focus on:
    1. Types of queries where each approach performs best
    2. Overall strengths and weaknesses of each approach
    3. How fusion retrieval balances the trade-offs
    4. Recommendations for when to use each approach"""

    # Create a summary of evaluations for each query
    evaluations_summary = ""
    for i, result in enumerate(results):
        evaluations_summary += f"Query {i+1}: {result['query']}\n"
        # Safely get a preview of the comparison summary
        comparison_summary = result.get('comparison', 'No comparison available.')
        evaluations_summary += f"Comparison Summary: {comparison_summary[:200]}...\n\n"

    # User prompt containing the evaluations summary
    user_prompt = f"""Based on the following evaluations of different retrieval methods across {len(results)} queries,
    provide an overall analysis comparing these three approaches:

    {evaluations_summary}

    Please provide a comprehensive analysis of vector-based, BM25, and fusion retrieval approaches,
    highlighting when and why fusion retrieval provides advantages over the individual methods."""

    # Combine system and user prompts into a single prompt for Gemini
    full_prompt = f"{system_prompt}\n\n{user_prompt}"

    # Initialize the Gemini GenerativeModel
    model_instance = genai.GenerativeModel("gemini-2.0-flash")
    
    try:
        # Generate the overall analysis using the Gemini API
        response = model_instance.generate_content(
            full_prompt,
            generation_config=genai.GenerationConfig(
                temperature=1  # A higher temperature allows for more creative analysis
            )
        )
        
        # Return the generated analysis content
        return response.text.strip()
        
    except Exception as e:
        return f"An error occurred during analysis generation: {e}"
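
The summary loop in `generate_overall_analysis` appends `...` to every preview, even when the comparison text is shorter than 200 characters. It also fails if a result stores `None` under `comparison`. A minimal sketch of one way to factor that step into a helper follows. The name `summarize_evaluations`, the `preview_chars` parameter, and the sample data are assumptions for illustration and are not part of the notebook.

```python
def summarize_evaluations(results, preview_chars=200):
    """Build a text summary with a truncated comparison preview per query."""
    lines = []
    for i, result in enumerate(results, start=1):
        # Fall back to a placeholder when the comparison is missing or None
        comparison = str(result.get("comparison") or "No comparison available.")
        preview = comparison[:preview_chars]
        # Only mark the preview as truncated when text was actually cut off
        if len(comparison) > preview_chars:
            preview += "..."
        lines.append(f"Query {i}: {result['query']}")
        lines.append(f"Comparison Summary: {preview}")
        lines.append("")
    return "\n".join(lines)


# Hypothetical sample results, made up for this sketch
sample_results = [
    {"query": "What is BM25?", "comparison": "Fusion ranked the exact-match chunk first."},
    {"query": "Explain embeddings", "comparison": None},
]
print(summarize_evaluations(sample_results))
```

Keeping this step separate from the API call also makes the prompt text easy to inspect before it is sent to the model.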

## Evaluating Fusion Retrieval

In [22]:
import os
import fitz  # PyMuPDF for PDF reading
from openai import OpenAI
from dotenv import load_dotenv

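# -----------------------------
# Load .env file with GOOGLE_API_KEY
# -----------------------------
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    # Fail early with a clear message instead of an opaque authentication error later
    raise RuntimeError("GOOGLE_API_KEY is not set; add it to your environment or .env file.")
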
# -----------------------------
# Initialize the OpenAI client against Gemini's OpenAI-compatible endpoint
# -----------------------------
client = OpenAI(
    api_key=GOOGLE_API_KEY,
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
)

# -----------------------------
# Function to read PDF
# -----------------------------
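def load_pdf_text(pdf_path):
    """Return the concatenated text of every page in the PDF at pdf_path."""
    # Use a context manager so the document is closed after reading
    with fitz.open(pdf_path) as doc:
        return "".join(page.get_text() for page in doc)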

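# -----------------------------
# Function to answer queries with Gemini and score the answers
# -----------------------------
def run_gemini_retrieval(pdf_path, test_queries, reference_answers=None):
    """
    Answer each query with Gemini using the full PDF text as context,
    then score each answer against its reference answer, if one is given.

    Note: the whole document is placed in the prompt; no vector, BM25,
    or fusion retrieval step runs here.
    """
    document_text = load_pdf_text(pdf_path)
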
    results = []
    for i, query in enumerate(test_queries):
        # Prompt Gemini
        prompt = f"""
You are a disaster resource assistant. 
Answer the following question ONLY using this document:

DOCUMENT:
{document_text}

QUESTION:
{query}
"""

        response = client.chat.completions.create(
            model="gemini-2.0-flash",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers based on provided documents only."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.0
        )

        ai_answer = response.choices[0].message.content.strip()

        # Look up the reference answer for this query, if one exists
        reference = None
        if reference_answers and i < len(reference_answers):
            reference = reference_answers[i]

        # Score 1 only if the whole reference appears verbatim
        # (case-insensitive) inside the answer; otherwise 0
        score = None
        if reference is not None:
            score = 1 if reference.strip().lower() in ai_answer.lower() else 0

        results.append({
            "query": query,
            "ai_answer": ai_answer,
            "reference_answer": reference,
            "score": score
        })

    # Average only over the queries that were actually scored
    scores = [r["score"] for r in results if r["score"] is not None]
    avg_score = sum(scores) / len(scores) if scores else None

    return {
        "results": results,
        "overall_analysis": f"Average Score: {avg_score}" if avg_score is not None else "No reference answers provided."
    }

# -----------------------------
# Path & Queries
# -----------------------------
pdf_path = "/Users/kekunkoya/Desktop/RAG Google/Resources.pdf"
test_queries = ["Is it safe to drink the tap water in 17104 after the flood?"]
reference_answers = [
    "Tap water may be contaminated after a flood. Boil for at least 1 minute or use bottled water.\n\nCall 2-1-1 to find water testing kits or bottled water stations near you."
]

# -----------------------------
# Run Evaluation
# -----------------------------
evaluation_results = run_gemini_retrieval(
    pdf_path=pdf_path,
    test_queries=test_queries,
    reference_answers=reference_answers
)

# -----------------------------
# Print Results
# -----------------------------
print("\n\n=== OVERALL ANALYSIS ===\n")
print(evaluation_results["overall_analysis"])
for res in evaluation_results["results"]:
    print(f"\nQ: {res['query']}\nAI Answer: {res['ai_answer']}\nReference: {res['reference_answer']}\nScore: {res['score']}")




=== OVERALL ANALYSIS ===

Average Score: 0.0

Q: Is it safe to drink the tap water in 17104 after the flood?
AI Answer: This document does not contain information about the safety of tap water in the 17104 zip code after a flood.
Reference: Tap water may be contaminated after a flood. Boil for at least 1 minute or use bottled water.

Call 2-1-1 to find water testing kits or bottled water stations near you.
Score: 0
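
The score above comes from a strict check: an answer gets 1 only when the entire reference string appears verbatim inside it, so even a correct paraphrase would score 0. A softer metric can help separate "wrong" from "worded differently". The sketch below uses token-overlap F1, a common choice for question-answering evaluation. It is not the metric this notebook uses, and the function name `token_f1` and the example strings are assumptions for illustration.

```python
import re
from collections import Counter


def token_f1(prediction, reference):
    """Token-overlap F1 between two strings (case-insensitive, punctuation ignored)."""
    pred_tokens = re.findall(r"[a-z0-9]+", prediction.lower())
    ref_tokens = re.findall(r"[a-z0-9]+", reference.lower())
    if not pred_tokens or not ref_tokens:
        return 0.0
    # Multiset intersection counts each shared token at most as often as it appears in both
    overlap = sum((Counter(pred_tokens) & Counter(ref_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)


# Illustrative strings only; they are not taken from the evaluated PDF
reference = "Boil tap water for at least 1 minute or use bottled water."
paraphrase = "Use bottled water, or boil tap water for at least 1 minute."

print(reference.lower() in paraphrase.lower())  # False: the substring check fails
print(token_f1(paraphrase, reference))          # 1.0: same tokens, different order
```

Token F1 ignores word order, so it can reward answers that reuse the right words in a misleading way. It is best read alongside the answers themselves rather than as a standalone verdict.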
