# Adaptive Retrieval for Enhanced RAG Systems

In this notebook, I implement an adaptive retrieval system that selects a retrieval strategy based on the type of query. Matching the strategy to the query type is intended to make the RAG system's responses more accurate and relevant across a diverse range of questions.

Different questions demand different retrieval strategies. Our system:

1. Classifies the query type (Factual, Analytical, Opinion, or Contextual)
2. Selects the appropriate retrieval strategy
3. Executes specialized retrieval techniques
4. Generates a tailored response

## Setting Up the Environment
We begin by importing necessary libraries.

In [1]:
import os
import numpy as np
import json
import fitz
from openai import OpenAI
import re

## Extracting Text from a PDF File
To implement RAG, we first need a source of textual data. In this case, we extract text from a PDF file using the PyMuPDF library.

In [2]:
import os
import fitz  # pip install PyMuPDF

def extract_text_from_pdf(pdf_path: str) -> str:
    """
    Extracts text from a PDF file.

    Args:
        pdf_path (str): Path to the PDF file.

    Returns:
        str: Extracted text from the entire PDF.
    """
    doc = fitz.open(pdf_path)
    all_text = []
    for page in doc:
        all_text.append(page.get_text("text"))
    doc.close()
    return "\n".join(all_text)

def extract_texts_from_folder(folder_path: str):
    """
    Extracts text from all PDF files in a folder (recursively).
    Args:
        folder_path (str): Path to the folder containing PDFs.
    Returns:
        dict: {pdf_filename: extracted_text, ...}
    """
    pdf_texts = {}
    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith('.pdf'):
                pdf_path = os.path.join(root, file)
                try:
                    pdf_texts[pdf_path] = extract_text_from_pdf(pdf_path)
                except Exception as e:
                    print(f"Failed to extract {pdf_path}: {e}")
    return pdf_texts

# Example usage:
folder_path = "/Users/kekunkoya/Desktop/RAG Google 2/PDFs"
pdf_texts = extract_texts_from_folder(folder_path)

for pdf_file, text in pdf_texts.items():
    print(f"\n--- {os.path.basename(pdf_file)} ---")
    print(text[:500])  # Print the first 500 characters to verify extraction



--- PA 211 Disaster Community Resources.pdf ---
PA 211 Community Disaster and Human 
Services Resources in Pennsylvania 
Introduction 
 
Community Disaster and Human Services Resources in Pennsylvania 
 
Disasters, whether natural or man-made, have significant and far-reaching impacts on 
individuals, families, and communities. Pennsylvania, with its mix of urban, suburban, and 
rural regions, faces a diverse array of emergencies ranging from floods and severe storms to 
public health crises and housing instability. To ensure an effective res

--- 211 RESPONDS TO URGENT NEEDS.pdf ---
211 RESPONDS TO URGENT NEEDS 
FACT
211 stood up a statewide text
response to support employees
impacted by the partial federal
government shutdown who did
not know when they would
receive their next paycheck.
211 assists in times of
disaster and widespread
need
FACT
FACT
1
PLEASE VOTE TO INCLUDE FUNDING FOR PENNSYLVANIA'S 211 SYSTEM IN THE STATE BUDGET TO
SUPPORT 211'S CAPACITY TO HELP OUR COMMUNITIES IN 

## Chunking the Extracted Text
Once we have the extracted text, we divide it into smaller, overlapping chunks to improve retrieval accuracy.

In [3]:
def chunk_text(text, n, overlap):
    """
    Chunks the given text into segments of n characters with overlap.

    Args:
    text (str): The text to be chunked.
    n (int): The number of characters in each chunk.
    overlap (int): The number of overlapping characters between chunks.

    Returns:
    List[str]: A list of text chunks.
    """
    # A step of zero or less would make range() fail or yield nothing
    if n <= 0 or not 0 <= overlap < n:
        raise ValueError("n must be positive and overlap must satisfy 0 <= overlap < n")

    step = n - overlap  # Distance between the starts of consecutive chunks

    # Slice n characters starting at every step-th index
    return [text[i:i + n] for i in range(0, len(text), step)]
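To make the overlap arithmetic concrete, here is a small self-contained sketch. `sliding_chunks` is a hypothetical stand-in that mirrors the slicing in `chunk_text`: consecutive chunks start `n - overlap` characters apart, so each chunk repeats the last `overlap` characters of the previous one.

```python
def sliding_chunks(text, n, overlap):
    """Hypothetical stand-in mirroring chunk_text: n-character windows, step n - overlap."""
    step = n - overlap
    return [text[i:i + n] for i in range(0, len(text), step)]

demo_chunks = sliding_chunks("abcdefghij", n=4, overlap=2)
print(demo_chunks)  # ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```

The last chunk is shorter than `n` because the text runs out before the window is full.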

## Setting Up the OpenAI API Client
We initialize an OpenAI client here. The later cells call Gemini through `google.generativeai` rather than through this `client` object.

In [4]:
# Initialize the OpenAI client with an API key (no base_url is passed here)
client = OpenAI(
    api_key=os.getenv("GOOGLE_API_KEY")  # Retrieve the API key from environment variables
)

## Simple Vector Store Implementation
We'll create a basic vector store to manage document chunks and their embeddings.

In [6]:
import os
import numpy as np
import fitz  # pip install PyMuPDF
import google.generativeai as genai
from dotenv import load_dotenv

class SimpleVectorStore:
    """
    A simple vector store implementation using NumPy.
    """
    def __init__(self):
        self.vectors = []   # List to store embedding vectors
        self.texts = []     # List to store original texts
        self.metadata = []  # List to store metadata for each text

    def add_item(self, text, embedding, metadata=None):
        """
        Add an item to the vector store.
        """
        self.vectors.append(np.array(embedding))
        self.texts.append(text)
        self.metadata.append(metadata or {})

    def similarity_search(self, query_embedding, k=5, filter_func=None):
        """
        Find the most similar items to a query embedding.
        """
        if not self.vectors:
            return []
        query_vector = np.array(query_embedding)
        similarities = []
        for i, vector in enumerate(self.vectors):
            if filter_func and not filter_func(self.metadata[i]):
                continue
            norm_query = np.linalg.norm(query_vector)
            norm_vector = np.linalg.norm(vector)
            if norm_query == 0 or norm_vector == 0:
                similarity = 0.0
            else:
                similarity = np.dot(query_vector, vector) / (norm_query * norm_vector)
            similarities.append((i, similarity))
        similarities.sort(key=lambda x: x[1], reverse=True)
        results = []
        for i in range(min(k, len(similarities))):
            idx, score = similarities[i]
            results.append({
                "text": self.texts[idx],
                "metadata": self.metadata[idx],
                "similarity": score
            })
        return results

def extract_text_from_pdf(pdf_path: str) -> str:
    doc = fitz.open(pdf_path)
    all_text = []
    for page in doc:
        all_text.append(page.get_text("text"))
    doc.close()
    return "\n".join(all_text)

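def chunk_text(text, chunk_size=1000, overlap=200):
    # Guard against a non-positive step, which would otherwise loop forever
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # Distance between consecutive chunk starts
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]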

def create_gemini_embedding(text, model="models/embedding-001"):
    response = genai.embed_content(model=model, content=text)
    return response['embedding']

if __name__ == '__main__':
    # Load API key
    load_dotenv()
    api_key = os.getenv("GEMINI_API_KEY")

    try:
        genai.configure(api_key=api_key)
    except Exception as e:
        print(f"An error occurred during Gemini API configuration: {e}")
        exit()

    # Create the vector store
    store = SimpleVectorStore()

    # Folder containing PDFs
    folder_path = "/Users/kekunkoya/Desktop/RAG Google 2/PDFs/"
    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith('.pdf'):
                pdf_path = os.path.join(root, file)
                print(f"Processing PDF: {pdf_path}")
                try:
                    text = extract_text_from_pdf(pdf_path)
                except Exception as e:
                    print(f"Failed to extract {pdf_path}: {e}")
                    continue
                chunks = chunk_text(text, chunk_size=1000, overlap=200)
                for i, chunk in enumerate(chunks):
                    if not chunk.strip():
                        continue
                    try:
                        embedding = create_gemini_embedding(chunk)
                        store.add_item(chunk, embedding, metadata={"source": file, "chunk_id": i})
                    except Exception as e:
                        print(f"Embedding failed for {file} chunk {i}: {e}")

    print("Vector store populated with Gemini embeddings from all PDFs.")

    # Sample similarity search
    query_text = "how to make a plan for my family"
    print(f"\nSearching for items similar to: '{query_text}'")
    query_embedding = create_gemini_embedding(query_text)
    search_results = store.similarity_search(query_embedding, k=3)

    print("\nTop 3 search results:")
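    for result in search_results:
        # Flatten newlines outside the f-string: a backslash inside an
        # f-string expression is a SyntaxError before Python 3.12
        preview = result['text'][:150].replace('\n', ' ')
        print(f"  - Text: {preview}...")
        print(f"    Similarity: {result['similarity']:.4f}")
        print(f"    Metadata: {result['metadata']}")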


Processing PDF: /Users/kekunkoya/Desktop/RAG Google 2/PDFs/PA 211 Disaster Community Resources.pdf
Processing PDF: /Users/kekunkoya/Desktop/RAG Google 2/PDFs/211 RESPONDS TO URGENT NEEDS.pdf
Processing PDF: /Users/kekunkoya/Desktop/RAG Google 2/PDFs/PEMA.pdf
Processing PDF: /Users/kekunkoya/Desktop/RAG Google 2/PDFs/ready-gov_disaster-preparedness-guide-for-older-adults.pdf
Processing PDF: /Users/kekunkoya/Desktop/RAG Google 2/PDFs/Substantial Damages Toolkit.pdf
Vector store populated with Gemini embeddings from all PDFs.

Searching for items similar to: 'how to make a plan for my family'

Top 3 search results:
  - Text: fore an emergency happens, sit down together and decide how you will get in  contact with each other, what mobility and/ or medication issues will nee...
    Similarity: 0.6842
    Metadata: {'source': 'PEMA.pdf', 'chunk_id': 65}
  - Text:  your family and friends have a plan in case of an emergency. Fill  out these cards and give one to each of them to make sure they
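The loop in `similarity_search` computes cosine similarity one vector at a time. As a sketch of the same ranking done with a single matrix product, assuming every embedding has the same dimension: `cosine_top_k` is a hypothetical helper, not part of the notebook, and the toy vectors are made up for illustration.

```python
import numpy as np

def cosine_top_k(query_embedding, vectors, k=5):
    """Return (index, similarity) pairs for the k rows of `vectors` most similar to the query."""
    if len(vectors) == 0:
        return []
    matrix = np.asarray(vectors, dtype=float)         # shape (num_items, dim)
    query = np.asarray(query_embedding, dtype=float)  # shape (dim,)
    norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
    # Zero-norm vectors get similarity 0.0, matching the loop version
    sims = np.divide(matrix @ query, norms, out=np.zeros(len(matrix)), where=norms != 0)
    order = np.argsort(-sims)[:k]  # Indices sorted by descending similarity
    return [(int(i), float(sims[i])) for i in order]

toy_vectors = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
top = cosine_top_k([1.0, 0.0], toy_vectors, k=2)
print(top)
```

For a few thousand chunks the loop is fine; the matrix form mainly matters once the store grows, since it avoids recomputing the query norm for every item.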

## Creating Embeddings

In [7]:
import google.generativeai as genai

# Assume genai.configure(api_key="YOUR_API_KEY") has been called and 'client' is not used.

def create_embeddings(text, model="models/embedding-001"):
    """
    Creates embeddings for the given text using the specified Gemini model.

    Args:
    text (str or List[str]): The input text(s) for which embeddings are to be created.
    model (str): The model to be used for creating embeddings. Defaults to "models/embedding-001".

    Returns:
    List[float] or List[List[float]]: The embedding vector(s).
    """
    # Gemini's embed_content accepts either a single string or a list of strings
    # in the 'content' parameter.
    response = genai.embed_content(
        model=model,
        content=text
    )

    # response['embedding'] holds one vector for a string input and a list of
    # vectors for a list input, so the same key is returned in both cases.
    return response['embedding']

## Document Processing Pipeline

In [8]:
import os

def process_folder(folder_path, chunk_size=1000, chunk_overlap=200):
    """
    Process all PDFs in a folder for use with adaptive retrieval.

    Args:
        folder_path (str): Path to the folder containing PDF files.
        chunk_size (int): Size of each chunk in characters.
        chunk_overlap (int): Overlap between chunks in characters.

    Returns:
        Tuple[List[str], SimpleVectorStore]: All document chunks and combined vector store.
    """
    all_chunks = []
    store = SimpleVectorStore()

    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith('.pdf'):
                pdf_path = os.path.join(root, file)
                print(f"\nExtracting text from PDF: {pdf_path}")
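                try:
                    extracted_text = extract_text_from_pdf(pdf_path)
                except Exception as e:
                    print(f"Failed to extract text from {pdf_path}: {e}. Skipping.")
                    continue

                if not extracted_text.strip():
                    print(f"No extractable text in {pdf_path}. Skipping.")
                    continue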

                print("Chunking text...")
                chunks = chunk_text(extracted_text, chunk_size, chunk_overlap)
                print(f"Created {len(chunks)} text chunks for {file}")

                print("Creating embeddings for chunks...")
                chunk_embeddings = create_embeddings(chunks)

                if len(chunks) != len(chunk_embeddings):
                    print(f"Error: Mismatch between number of chunks and embeddings for {file}. Skipping.")
                    continue

                for i, (chunk, embedding) in enumerate(zip(chunks, chunk_embeddings)):
                    store.add_item(
                        text=chunk,
                        embedding=embedding,
                        metadata={"index": i, "source": file}
                    )
                all_chunks.extend(chunks)
                print(f"Added {len(chunks)} chunks from {file} to the vector store.")

    print(f"\nAll done! Processed {len(all_chunks)} chunks from all PDFs.")
    return all_chunks, store

# Example usage:
# folder_path = "/Users/kekunkoya/Desktop/RAG Google 2/PDFs/"
# all_chunks, store = process_folder(folder_path)


## Query Classification

In [9]:
import google.generativeai as genai

def classify_query(query, model="gemini-2.0-flash"):
    """
    Classify a query into one of four categories: Factual, Analytical, Opinion, or Contextual.
    
    Args:
        query (str): User query
        model (str): LLM model to use. Defaults to "gemini-2.0-flash".
        
    Returns:
        str: Query category
    """
    # Define the prompt to guide the AI's classification
    prompt = f"""
    You are an expert at classifying questions.
    Classify the given query into exactly one of these categories:
    - Factual: Queries seeking specific, verifiable information.
    - Analytical: Queries requiring comprehensive analysis or explanation.
    - Opinion: Queries about subjective matters or seeking diverse viewpoints.
    - Contextual: Queries that depend on user-specific context.

    Return ONLY the category name, without any explanation or additional text.

    Query: {query}

    Category:
    """

    # Create a GenerativeModel instance
    model_instance = genai.GenerativeModel(model)

    # Generate the classification response from the AI model
    try:
        response = model_instance.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.0, # Low temperature for deterministic output
                max_output_tokens=20 # Limit output to ensure it's just the category name
            )
        )
        
        # Extract and strip the category from the response
        category = response.text.strip()
    
        # Define the list of valid categories
        valid_categories = ["Factual", "Analytical", "Opinion", "Contextual"]
        
        # Ensure the returned category is a valid, single word
        for valid in valid_categories:
            if valid.lower() in category.lower():
                return valid
    
    except Exception as e:
        print(f"An error occurred during query classification: {e}")
        # Default to "Factual" if classification fails
        return "Factual"
    
    # Default to "Factual" if classification is not one of the valid categories
    return "Factual"
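The post-processing step in `classify_query` can be exercised without calling the API. A minimal sketch, assuming the same four labels and the same "Factual" fallback; `parse_category` is a hypothetical helper name.

```python
VALID_CATEGORIES = ["Factual", "Analytical", "Opinion", "Contextual"]

def parse_category(raw_response, default="Factual"):
    """Map raw model text to one of the four labels, falling back to `default`."""
    cleaned = raw_response.strip().lower()
    for label in VALID_CATEGORIES:
        # Substring match tolerates extra text such as "Category: Opinion."
        if label.lower() in cleaned:
            return label
    return default

print(parse_category("Category: Opinion."))  # Opinion
print(parse_category("I am not sure"))       # Factual (fallback)
```

Because labels are checked in list order, a reply that names two labels resolves to whichever comes first in `VALID_CATEGORIES`, not whichever the model wrote first.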

## Implementing Specialized Retrieval Strategies
### 1. Factual Strategy - Focus on Precision

In [10]:
import google.generativeai as genai

# Assume genai.configure(api_key="YOUR_API_KEY") has been called.

def call_gemini(prompt, model="gemini-2.0-flash", temperature=0):
    """A helper function to make a call to the Gemini API."""
    model_instance = genai.GenerativeModel(model)
    response = model_instance.generate_content(
        prompt,
        generation_config=genai.GenerationConfig(temperature=temperature)
    )
    return response.text.strip()

def factual_retrieval_strategy(query, vector_store, k=4):
    """
    Retrieval strategy for factual queries focusing on precision.
    
    Args:
        query (str): User query
        vector_store (SimpleVectorStore): Vector store
        k (int): Number of documents to return
        
    Returns:
        List[Dict]: Retrieved documents
    """
    print(f"Executing Factual retrieval strategy for: '{query}'")
    
    # Use LLM to enhance the query for better precision
    system_prompt = """You are an expert at enhancing search queries.
    Your task is to reformulate the given factual query to make it more precise and
    specific for information retrieval. Focus on key entities and their relationships.
    Provide ONLY the enhanced query without any explanation.
    """

    user_prompt = f"Enhance this factual query: {query}"
    
    # Generate the enhanced query using the LLM
    enhanced_query_prompt = f"{system_prompt}\n\n{user_prompt}"
    enhanced_query = call_gemini(enhanced_query_prompt, model="gemini-2.0-flash", temperature=0)
    print(f"Enhanced query: {enhanced_query}")
    
    # Create embeddings for the enhanced query
    query_embedding = create_embeddings(enhanced_query)
    
    # Perform initial similarity search to retrieve documents
    initial_results = vector_store.similarity_search(query_embedding, k=k*2)
    
    # Initialize a list to store ranked results
    ranked_results = []
    
    # Score and rank documents by relevance using LLM
    for doc in initial_results:
        relevance_score = score_document_relevance(enhanced_query, doc["text"])
        ranked_results.append({
            "text": doc["text"],
            "metadata": doc["metadata"],
            "similarity": doc["similarity"],
            "relevance_score": relevance_score
        })
    
    # Sort the results by relevance score in descending order
    ranked_results.sort(key=lambda x: x["relevance_score"], reverse=True)
    
    # Return the top k results
    return ranked_results[:k]
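The over-fetch-then-rerank pattern used above (retrieve `k*2` candidates, score each one, keep the top `k`) is easy to test with a stand-in scorer. In this sketch `rerank` and `keyword_overlap` are hypothetical, and the candidate list is toy data; the notebook itself scores with an LLM call.

```python
def rerank(candidates, score_fn, k):
    """Score each candidate dict with score_fn(text) and keep the k best."""
    scored = [{**doc, "relevance_score": score_fn(doc["text"])} for doc in candidates]
    # sorted() is stable, so ties keep their original similarity order
    return sorted(scored, key=lambda d: d["relevance_score"], reverse=True)[:k]

def keyword_overlap(query):
    """Toy scorer (assumption): counts query words that appear in the text."""
    words = set(query.lower().split())
    return lambda text: len(words & set(text.lower().split()))

# Toy candidates, already ordered by embedding similarity
candidates = [
    {"text": "Call 211 for help", "similarity": 0.71},
    {"text": "Make a family emergency plan", "similarity": 0.69},
    {"text": "Flood maps for Pennsylvania", "similarity": 0.65},
]
top = rerank(candidates, keyword_overlap("family emergency plan"), k=2)
print([d["text"] for d in top])
```

Passing the scorer as an argument keeps the ranking logic independent of the model, so the LLM scorer can be swapped in without changing `rerank`.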



### 2. Analytical Strategy - Comprehensive Coverage

In [11]:
import google.generativeai as genai



def analytical_retrieval_strategy(query, vector_store, k=4):
    """
    Retrieval strategy for analytical queries focusing on comprehensive coverage.

    Args:
        query (str): User query
        vector_store (SimpleVectorStore): Vector store
        k (int): Number of documents to return

    Returns:
        List[Dict]: Retrieved documents
    """
    print(f"Executing Analytical retrieval strategy for: '{query}'")

    # Define the prompt to guide the AI in generating sub-questions
    prompt = f"""
    You are an expert at breaking down complex questions.
    Generate sub-questions that explore different aspects of the main analytical query.
    These sub-questions should cover the breadth of the topic and help retrieve
    comprehensive information.

    Return a list of exactly 3 sub-questions, one per line.

    Main query: {query}
    """

    # Create a GenerativeModel instance
    model_instance = genai.GenerativeModel("gemini-2.0-flash")

    # Generate the sub-questions using the LLM
    try:
        response = model_instance.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.3,
                max_output_tokens=150 # A reasonable limit for 3 questions
            )
        )
        
        # Extract and clean the sub-questions
        sub_queries = response.text.strip().split('\n')
        sub_queries = [q.strip() for q in sub_queries if q.strip()]
        print(f"Generated sub-queries: {sub_queries}")

    except Exception as e:
        print(f"An error occurred during sub-query generation: {e}")
        sub_queries = [query] # Fallback to the original query
    
    # Retrieve documents for each sub-query
    all_results = []
    for sub_query in sub_queries:
        # Create embeddings for the sub-query
        sub_query_embedding = create_embeddings(sub_query)
        # Perform similarity search for the sub-query
        results = vector_store.similarity_search(sub_query_embedding, k=2)
        all_results.extend(results)
    
    # Ensure diversity by selecting from different sub-query results
    # Remove duplicates (same text content)
    unique_texts = set()
    diverse_results = []
    
    for result in all_results:
        if result["text"] not in unique_texts:
            unique_texts.add(result["text"])
            diverse_results.append(result)
    
    # If we need more results to reach k, add more from initial results
    if len(diverse_results) < k:
        # Direct retrieval for the main query
        main_query_embedding = create_embeddings(query)
        main_results = vector_store.similarity_search(main_query_embedding, k=k)
        
        for result in main_results:
            if result["text"] not in unique_texts and len(diverse_results) < k:
                unique_texts.add(result["text"])
                diverse_results.append(result)
    
    # Return the top k diverse results
    return diverse_results[:k]
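Models often number or bullet their lines even when asked for one item per line, and those prefixes then end up in the text that gets embedded. A small sketch of a cleanup step, assuming markers such as `1.`, `2)`, `-`, or `*`; `clean_list_lines` is a hypothetical helper that could be applied to `response.text` before retrieval.

```python
import re

def clean_list_lines(raw_text, limit=3):
    """Split model output into lines and strip list markers like '1.', '2)', '-', '*'."""
    cleaned = []
    for line in raw_text.splitlines():
        # Remove one leading marker, then surrounding whitespace
        item = re.sub(r"^\s*(?:\d+[.)]|[-*])\s*", "", line).strip()
        if item:
            cleaned.append(item)
    return cleaned[:limit]  # Keep at most `limit` items

raw = "1. What causes floods?\n2) Who responds first?\n\n- How is aid funded?\n* Extra line"
print(clean_list_lines(raw))
```

The `limit` argument also caps the number of sub-queries when the model returns more than the three that the prompt asks for.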

### 3. Opinion Strategy - Diverse Perspectives

In [12]:
import google.generativeai as genai

# Assume genai.configure(api_key="YOUR_API_KEY") has been called.
# Also assume that `create_embeddings` and `SimpleVectorStore` are defined.

def opinion_retrieval_strategy(query, vector_store, k=4):
    """
    Retrieval strategy for opinion queries focusing on diverse perspectives.
    
    Args:
        query (str): User query
        vector_store (SimpleVectorStore): Vector store
        k (int): Number of documents to return
        
    Returns:
        List[Dict]: Retrieved documents
    """
    print(f"Executing Opinion retrieval strategy for: '{query}'")
    
    # Define the prompt to guide the AI in identifying different perspectives
    prompt = f"""
    You are an expert at identifying different perspectives on a topic.
    For the given query about opinions or viewpoints, identify different perspectives
    that people might have on this topic.

    Return a list of exactly 3 different viewpoint angles, one per line.

    Query: {query}
    """

    # Create a GenerativeModel instance
    model_instance = genai.GenerativeModel("gemini-2.0-flash")
    
    # Generate the different perspectives using the LLM
    try:
        response = model_instance.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.3,
                max_output_tokens=150 # A reasonable limit for 3 short viewpoints
            )
        )
        
        # Extract and clean the viewpoints
        viewpoints = response.text.strip().split('\n')
        viewpoints = [v.strip() for v in viewpoints if v.strip()]
        print(f"Identified viewpoints: {viewpoints}")

    except Exception as e:
        print(f"An error occurred during viewpoint generation: {e}")
        # Fallback to a simple retrieval if viewpoint generation fails
        viewpoint_embedding = create_embeddings(query)
        return vector_store.similarity_search(viewpoint_embedding, k=k)
    
    # Retrieve documents representing each viewpoint
    all_results = []
    for viewpoint in viewpoints:
        # Combine the main query with the viewpoint
        combined_query = f"{query} {viewpoint}"
        # Create embeddings for the combined query
        viewpoint_embedding = create_embeddings(combined_query)
        # Perform similarity search for the combined query
        results = vector_store.similarity_search(viewpoint_embedding, k=2)
        
        # Mark results with the viewpoint they represent
        for result in results:
            result["viewpoint"] = viewpoint
        
        # Add the results to the list of all results
        all_results.extend(results)
    
    # Select a diverse range of opinions
    # Ensure we get at least one document from each viewpoint if possible,
    # skipping chunks that were already selected for another viewpoint
    selected_results = []
    seen_texts = set()
    for viewpoint in viewpoints:
        for doc in all_results:
            if doc.get("viewpoint") == viewpoint and doc["text"] not in seen_texts:
                seen_texts.add(doc["text"])
                selected_results.append(doc)
                break

    # Fill remaining slots with the highest-similarity docs not yet selected
    remaining_docs = sorted(
        (r for r in all_results if r["text"] not in seen_texts),
        key=lambda x: x["similarity"],
        reverse=True,
    )
    for doc in remaining_docs:
        if len(selected_results) >= k:
            break
        if doc["text"] not in seen_texts:
            seen_texts.add(doc["text"])
            selected_results.append(doc)

    # Return the top k results
    return selected_results[:k]

### 4. Contextual Strategy - User Context Integration

In [13]:
import google.generativeai as genai



def call_gemini(prompt, model="gemini-2.0-flash", temperature=0):
    """A helper function to make a call to the Gemini API (same default model as the other cells)."""
    model_instance = genai.GenerativeModel(model)
    response = model_instance.generate_content(
        prompt,
        generation_config=genai.GenerationConfig(temperature=temperature)
    )
    return response.text.strip()

def contextual_retrieval_strategy(query, vector_store, k=4, user_context=None):
    """
    Retrieval strategy for contextual queries integrating user context.
    
    Args:
        query (str): User query
        vector_store (SimpleVectorStore): Vector store
        k (int): Number of documents to return
        user_context (str): Additional user context
        
    Returns:
        List[Dict]: Retrieved documents
    """
    print(f"Executing Contextual retrieval strategy for: '{query}'")
    
    # If no user context provided, try to infer it from the query
    if not user_context:
        system_prompt = """You are an expert at understanding implied context in questions.
        For the given query, infer what contextual information might be relevant or implied
        but not explicitly stated. Focus on what background would help answering this query.

        Return a brief description of the implied context."""

        user_prompt = f"Infer the implied context in this query: {query}"
        
        inferred_context_prompt = f"{system_prompt}\n\n{user_prompt}"
        
        try:
            # Generate the inferred context using the LLM
            user_context = call_gemini(inferred_context_prompt, model="gemini-2.0-flash", temperature=0.1)
            print(f"Inferred context: {user_context}")
        except Exception as e:
            print(f"Error inferring context: {e}")
            user_context = "" # Fallback to empty context
    
    # Reformulate the query to incorporate context
    system_prompt = """You are an expert at reformulating questions with context.
    Given a query and some contextual information, create a more specific query that
    incorporates the context to get more relevant information.

    Return ONLY the reformulated query without explanation."""

    user_prompt = f"""
    Query: {query}
    Context: {user_context}

    Reformulate the query to incorporate this context:"""
    
    contextualized_query_prompt = f"{system_prompt}\n\n{user_prompt}"
    
    try:
        # Generate the contextualized query using the LLM
        contextualized_query = call_gemini(contextualized_query_prompt, model="gemini-2.0-flash", temperature=0)
        print(f"Contextualized query: {contextualized_query}")
    except Exception as e:
        print(f"Error reformulating query: {e}")
        contextualized_query = query # Fallback to original query
    
    # Retrieve documents based on the contextualized query
    query_embedding = create_embeddings(contextualized_query)
    initial_results = vector_store.similarity_search(query_embedding, k=k*2)
    
    # Rank documents considering both relevance and user context
    ranked_results = []
    
    for doc in initial_results:
        # Score document relevance considering the context
        # (score_document_context_relevance must be defined before this function is called)
        context_relevance = score_document_context_relevance(query, user_context, doc["text"])
        ranked_results.append({
            "text": doc["text"],
            "metadata": doc["metadata"],
            "similarity": doc["similarity"],
            "context_relevance": context_relevance
        })
    
    # Sort by context relevance and return top k results
    ranked_results.sort(key=lambda x: x["context_relevance"], reverse=True)
    return ranked_results[:k]

## Helper Functions for Document Scoring

In [14]:
import google.generativeai as genai
import re

# Assume genai.configure(api_key="YOUR_API_KEY") has been called.

def score_document_relevance(query, document, model="gemini-2.0-flash"):
    """
    Score document relevance to a query using a Gemini model.

    Args:
        query (str): User query
        document (str): Document text
        model (str): LLM model. Defaults to "gemini-2.0-flash".

    Returns:
        float: Relevance score from 0-10
    """
    # System prompt to instruct the model on how to rate relevance
    system_prompt = """You are an expert at evaluating document relevance.
    Rate the relevance of a document to a query on a scale from 0 to 10, where:
    0 = Completely irrelevant
    10 = Perfectly addresses the query

    Return ONLY a numerical score between 0 and 10, nothing else.
    """

    # Truncate document if it's too long
    # Gemini models have higher context limits, but truncating is still good practice.
    doc_preview = document[:4000] + "..." if len(document) > 4000 else document

    # User prompt containing the query and document preview
    user_prompt = f"""
    Query: {query}

    Document: {doc_preview}

    Relevance score (0-10):
    """
    
    # Combine the system and user prompts into a single prompt for Gemini
    full_prompt = f"{system_prompt}\n\n{user_prompt}"

    # Create a GenerativeModel instance
    model_instance = genai.GenerativeModel(model)

    # Generate response from the model
    try:
        response = model_instance.generate_content(
            full_prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.0, # Low temperature for a deterministic score
                max_output_tokens=10 # Keep output short
            )
        )
        
        # Extract the score from the model's response
        score_text = response.text.strip()
        
        # Extract numeric score using regex
        match = re.search(r'(\d+(\.\d+)?)', score_text)
        if match:
            score = float(match.group(1))
            return min(10.0, max(0.0, score))  # Ensure score is within 0-10
        else:
            # Default score if extraction fails
            return 5.0

    except Exception as e:
        print(f"An error occurred during relevance scoring: {e}")
        return 5.0 # Return a neutral score on error
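`contextual_retrieval_strategy` calls `score_document_context_relevance`, which this section does not define. The following is a sketch of what such a helper could look like, modeled on `score_document_relevance`. The prompt wording, the optional `llm` parameter, and the neutral fallback of 5.0 are assumptions, not the author's implementation.

```python
import re

def score_document_context_relevance(query, context, document, llm=None,
                                     model="gemini-2.0-flash"):
    """Rate 0-10 how well `document` addresses `query` given `context` (sketch)."""
    prompt = (
        "Rate from 0 to 10 how relevant the document is to the query, "
        "taking the context into account. Return ONLY the number.\n\n"
        f"Query: {query}\nContext: {context}\n\n"
        f"Document: {document[:4000]}\n\nRelevance score (0-10):"
    )
    if llm is None:
        # Default path: same Gemini call pattern as score_document_relevance
        import google.generativeai as genai

        def llm(text):
            config = genai.GenerationConfig(temperature=0.0, max_output_tokens=10)
            return genai.GenerativeModel(model).generate_content(
                text, generation_config=config
            ).text
    try:
        match = re.search(r"\d+(?:\.\d+)?", llm(prompt))
    except Exception as e:
        print(f"An error occurred during context relevance scoring: {e}")
        return 5.0  # Neutral score on error
    # Clamp to 0-10; fall back to the neutral score when no number is found
    return min(10.0, max(0.0, float(match.group()))) if match else 5.0

# Offline check with a stub in place of the model call
print(score_document_context_relevance("q", "ctx", "doc", llm=lambda p: "Score: 8"))
```

The `llm` argument exists only so the parsing and clamping can be checked without network access; omitting it uses the Gemini path.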

## The Core Adaptive Retriever

In [15]:


def adaptive_retrieval(query, vector_store, k=4, user_context=None):
    """
    Perform adaptive retrieval by selecting and executing the appropriate strategy.

    Args:
        query (str): User query
        vector_store (SimpleVectorStore): Vector store
        k (int): Number of documents to retrieve
        user_context (str): Optional user context for contextual queries

    Returns:
        List[Dict]: Retrieved documents
    """
    # Classify the query to determine its type
    try:
        query_type = classify_query(query)
    except Exception as e:
        print(f"Error classifying query. Falling back to Factual retrieval. Details: {e}")
        query_type = "Factual"
        
    print(f"Query classified as: {query_type}")

    # Select and execute the appropriate retrieval strategy based on the query type
    if query_type == "Factual":
        # Use the factual retrieval strategy for precise information
        results = factual_retrieval_strategy(query, vector_store, k)
    elif query_type == "Analytical":
        # Use the analytical retrieval strategy for comprehensive coverage
        results = analytical_retrieval_strategy(query, vector_store, k)
    elif query_type == "Opinion":
        # Use the opinion retrieval strategy for diverse perspectives
        results = opinion_retrieval_strategy(query, vector_store, k)
    elif query_type == "Contextual":
        # Use the contextual retrieval strategy, incorporating user context
        results = contextual_retrieval_strategy(query, vector_store, k, user_context)
    else:
        # Default to factual retrieval strategy if classification fails
        results = factual_retrieval_strategy(query, vector_store, k)
        
    return results  # Return the retrieved documents
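
The `if`/`elif` chain above is one way to route a query type to a strategy. As an alternative, here is a minimal sketch of the same routing as a lookup table. The stub strategies and the names `make_stub`, `STRATEGIES`, and `pick_strategy` are placeholders invented for this example; in the notebook the real strategy functions would take their place.

```python
def make_stub(name):
    """Build a placeholder strategy that returns k fake documents (illustration only)."""
    def strategy(query, k=4, user_context=None):
        return [{"text": f"{name} hit {i + 1} for '{query}'"} for i in range(k)]
    return strategy

# Map each query type to its strategy; the real functions would go here.
STRATEGIES = {
    name: make_stub(name)
    for name in ("Factual", "Analytical", "Opinion", "Contextual")
}

def pick_strategy(query_type):
    """Return the strategy for query_type, defaulting to Factual for unknown labels."""
    return STRATEGIES.get(query_type, STRATEGIES["Factual"])

docs = pick_strategy("Opinion")("Is remote work better?", k=2)
print(docs)
```

A table like this keeps the fallback rule in one place, at the cost of requiring every strategy to accept the same arguments.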

In [16]:
# Response Generation

import google.generativeai as genai

# Assumes genai.configure(api_key="YOUR_API_KEY") has already been called.

def generate_response(query, results, query_type, model="gemini-2.0-flash"):
    """
    Generate a response based on query, retrieved documents, and query type.

    Args:
        query (str): User query
        results (List[Dict]): Retrieved documents
        query_type (str): Type of query
        model (str): LLM model. Defaults to "gemini-2.0-flash".

    Returns:
        str: Generated response
    """
    # Prepare context from retrieved documents by joining their texts with separators
    context = "\n\n---\n\n".join([r["text"] for r in results])

    # Create custom system prompt based on query type
    if query_type == "Factual":
        system_prompt = """You are a helpful assistant providing factual information.
        Answer the question based on the provided context. Focus on accuracy and precision.
        If the context doesn't contain the information needed, acknowledge the limitations."""

    elif query_type == "Analytical":
        system_prompt = """You are a helpful assistant providing analytical insights.
        Based on the provided context, offer a comprehensive analysis of the topic.
        Cover different aspects and perspectives in your explanation.
        If the context has gaps, acknowledge them while providing the best analysis possible."""

    elif query_type == "Opinion":
        system_prompt = """You are a helpful assistant discussing topics with multiple viewpoints.
        Based on the provided context, present different perspectives on the topic.
        Ensure fair representation of diverse opinions without showing bias.
        Acknowledge where the context presents limited viewpoints."""

    elif query_type == "Contextual":
        system_prompt = """You are a helpful assistant providing contextually relevant information.
        Answer the question considering both the query and its context.
        Make connections between the query context and the information in the provided documents.
        If the context doesn't fully address the specific situation, acknowledge the limitations."""

    else:
        system_prompt = """You are a helpful assistant. Answer the question based on the provided context. If you cannot answer from the context, acknowledge the limitations."""

    # Create a single user prompt by combining the system prompt, context, and query
    user_prompt = f"""
    {system_prompt}

    Context:
    {context}

    Question: {query}

    Please provide a helpful response based on the context.
    """

    # Initialize the Gemini GenerativeModel
    model_instance = genai.GenerativeModel(model)

    # Generate response using the Gemini API
    try:
        response = model_instance.generate_content(
            user_prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.2 # Temperature for some creativity
            )
        )

        # Return the generated response content
        return response.text.strip()
        
    except Exception as e:
        return f"An error occurred while generating the response: {e}"
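
The prompt assembly in `generate_response` can be checked without an API key. The following sketch rebuilds only the prompt string. `SYSTEM_PROMPTS`, `DEFAULT_SYSTEM_PROMPT`, and `build_prompt` are names made up for this example, and the prompt texts are shortened stand-ins for the full ones above.

```python
# Shortened stand-ins for the per-type system prompts (illustration only).
SYSTEM_PROMPTS = {
    "Factual": "You are a helpful assistant providing factual information.",
    "Analytical": "You are a helpful assistant providing analytical insights.",
    "Opinion": "You are a helpful assistant discussing topics with multiple viewpoints.",
    "Contextual": "You are a helpful assistant providing contextually relevant information.",
}
DEFAULT_SYSTEM_PROMPT = "You are a helpful assistant. Answer based on the provided context."

def build_prompt(query, results, query_type):
    """Combine the system prompt, the retrieved chunks, and the query into one string."""
    context = "\n\n---\n\n".join(r["text"] for r in results)
    system_prompt = SYSTEM_PROMPTS.get(query_type, DEFAULT_SYSTEM_PROMPT)
    return (
        f"{system_prompt}\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}\n\n"
        "Please provide a helpful response based on the context."
    )

prompt = build_prompt("What is RAG?", [{"text": "chunk A"}, {"text": "chunk B"}], "Factual")
print(prompt)
```

Printing the prompt before sending it is a cheap way to confirm that the separators and the query-type branch behave as intended.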

## Complete RAG Pipeline with Adaptive Retrieval

In [18]:
def rag_with_adaptive_retrieval(pdf_path, query, k=4, user_context=None):
    """
    Complete RAG pipeline with adaptive retrieval.
    
    Args:
        pdf_path (str): Path to PDF document
        query (str): User query
        k (int): Number of documents to retrieve
        user_context (str): Optional user context
        
    Returns:
        Dict: Results including query, retrieved documents, query type, and response
    """
    print("\n=== RAG WITH ADAPTIVE RETRIEVAL ===")
    print(f"Query: {query}")
    
    # Process the document to extract text, chunk it, and create embeddings
    chunks, vector_store = process_document(pdf_path)
    
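    # Classify the query to determine its type. The label is needed below for
    # generate_response. Note that adaptive_retrieval classifies the query again
    # internally, so if the classifier is not deterministic the two labels can differ.
    query_type = classify_query(query)
    print(f"Query classified as: {query_type}")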
    
    # Retrieve documents using the adaptive retrieval strategy based on the query type
    retrieved_docs = adaptive_retrieval(query, vector_store, k, user_context)
    
    # Generate a response based on the query, retrieved documents, and query type
    response = generate_response(query, retrieved_docs, query_type)
    
    # Compile the results into a dictionary
    result = {
        "query": query,
        "query_type": query_type,
        "retrieved_documents": retrieved_docs,
        "response": response
    }
    
    print("\n=== RESPONSE ===")
    print(response)
    
    return result

## Evaluation Framework

In [19]:
# pip install google-generativeai python-dotenv pymupdf
import os, time, json
import numpy as np
import fitz  # PyMuPDF
import google.generativeai as genai
from dotenv import load_dotenv

# -------------------- Gemini setup --------------------
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("GEMINI_API_KEY is not set in your environment.")
genai.configure(api_key=GEMINI_API_KEY)

GEN_MODEL = "gemini-2.0-flash"
EMBED_MODEL = "models/embedding-001"

# -------------------- Helpers --------------------
def _gen_with_retry(prompt, temperature=0, max_retries=5, base_sleep=5):
    model = genai.GenerativeModel(GEN_MODEL)
    for att in range(max_retries):
        try:
            resp = model.generate_content(
                prompt,
                generation_config=genai.GenerationConfig(temperature=temperature)
            )
            return (resp.text or "").strip()
        except Exception as e:
            msg = str(e)
            if "429" in msg or "ResourceExhausted" in msg:
                sleep_s = base_sleep * (att+1)
                print(f"[generate] Quota hit. Sleeping {sleep_s}s...")
                time.sleep(sleep_s)
                continue
            raise
    return ""  # retries exhausted; return empty text so callers can still slice/print it

def _embed_one(text, max_retries=5, base_sleep=5):
    for att in range(max_retries):
        try:
            resp = genai.embed_content(model=EMBED_MODEL, content=text)
            # Normalize the response. The usual shape is {"embedding": [floats]};
            # also tolerate {"embedding": {"values": [...]}} and object-style responses.
            emb = resp.get("embedding") if isinstance(resp, dict) else getattr(resp, "embedding", None)
            if isinstance(emb, dict):
                emb = emb.get("values")
            elif hasattr(emb, "values"):
                emb = emb.values
            if emb is None:
                raise RuntimeError(f"Unexpected embedding response: {type(resp)} {resp}")
            arr = np.array(emb, dtype=np.float32)
            return arr if arr.ndim == 1 else arr.reshape(-1)
        except Exception as e:
            msg = str(e)
            if "429" in msg or "ResourceExhausted" in msg:
                sleep_s = base_sleep * (att+1)
                print(f"[embed] Quota hit. Sleeping {sleep_s}s...")
                time.sleep(sleep_s)
                continue
            raise
    # All retries were rate-limited; fail loudly instead of returning None
    raise RuntimeError(f"Embedding failed: still rate-limited after {max_retries} attempts.")

def cosine(a, b):
    """Cosine similarity of two vectors; 0.0 if either has zero norm."""
    an = np.linalg.norm(a)
    bn = np.linalg.norm(b)
    if an == 0 or bn == 0:
        return 0.0
    return float(np.dot(a, b) / (an * bn))

# -------------------- Minimal vector store (NumPy) --------------------
class GeminiVectorStore:
    def __init__(self):
        self.vectors = []
        self.texts = []
        self.metadata = []

    def add_item(self, text, embedding, metadata=None):
        self.vectors.append(np.array(embedding, dtype=np.float32))
        self.texts.append(text)
        self.metadata.append(metadata or {})

    def add_texts(self, texts, metadatas=None, sleep_between=0.0):
        metadatas = metadatas or [{}] * len(texts)
        for t, m in zip(texts, metadatas):
            self.add_item(t, _embed_one(t), m)
            if sleep_between: time.sleep(sleep_between)

    def similarity_search(self, query_embedding, k=4, filter_func=None):
        if not self.vectors: return []
        q = np.array(query_embedding, dtype=np.float32)
        qn = np.linalg.norm(q)
        sims = []
        for i, v in enumerate(self.vectors):
            if filter_func and not filter_func(self.metadata[i]): 
                continue
            vn = np.linalg.norm(v)
            sim = 0.0 if qn == 0 or vn == 0 else float(np.dot(q, v) / (qn * vn))
            sims.append((i, sim))
        sims.sort(key=lambda x: x[1], reverse=True)
        out = []
        for idx, score in sims[:k]:
            out.append({"text": self.texts[idx], "metadata": self.metadata[idx], "similarity": score})
        return out

    def similarity_search_by_text(self, query_text, k=4, filter_func=None):
        q_emb = _embed_one(query_text)
        return self.similarity_search(q_emb, k=k, filter_func=filter_func)

# -------------------- PDF -> chunks -> store --------------------
def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    text = []
    for page in doc:
        text.append(page.get_text("text"))
    doc.close()
    return "\n".join(text)

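def chunk_text(text, chunk_size=1000, overlap=200):
    # The window advances by chunk_size - overlap, so overlap must be smaller
    # than chunk_size or the loop below would never make progress.
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            break
        start += chunk_size - overlap
    return chunks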

def process_document_gemini(pdf_path, chunk_size=1000, overlap=200):
    raw = extract_text_from_pdf(pdf_path)
    chunks = chunk_text(raw, chunk_size, overlap)
    vs = GeminiVectorStore()
    vs.add_texts(chunks)  # embeds each chunk
    return chunks, vs

# -------------------- Query classification & retrieval --------------------
def classify_query_gemini(query):
    # Cheap keyword heuristic; adjust as you like. Matching is by substring,
    # so "how" also fires on words such as "show" or "however".
    q = query.lower()
    if any(x in q for x in ["why", "how", "analyz", "compare", "tradeoff"]): return "Analytical"
    if any(x in q for x in ["opinion", "should i", "what do you think"]): return "Opinion"
    if any(x in q for x in ["context", "background", "overview", "explain"]): return "Contextual"
    return "Factual"

def adaptive_retrieval_gemini(query, vector_store, k=4):
    qtype = classify_query_gemini(query)
    # Simple policy: each type sets a floor on k (Factual >= 4, Contextual >= 5,
    # Opinion >= 3); Analytical uses k unchanged.
    kk = {"Factual": max(4, k), "Contextual": max(5, k), "Analytical": k, "Opinion": max(3, k)}.get(qtype, k)
    docs = vector_store.similarity_search_by_text(query, k=kk)
    return qtype, docs

# -------------------- Response generator --------------------
def generate_response_gemini(query, docs, query_type="General"):
    context = "\n\n".join([f"[{i+1}] {d['text']}" for i, d in enumerate(docs)])
    prompt = f"""You are a helpful assistant answering with ONLY the provided context if possible.
Query type: {query_type}
User query: {query}

Context:
{context}

Instructions:
- Cite supporting chunk indices like [1], [2] in-line.
- If the answer is not in context, say so briefly and provide best-effort guidance.
- Be concise (120-180 words).
"""
    return _gen_with_retry(prompt, temperature=0)

# -------------------- Optional comparison against references --------------------
def compare_responses(results):
    # Simple embedding-based score vs reference, averaged
    scores = []
    for r in results:
        ref = r.get("reference_answer")
        if not ref: 
            continue
        # score standard
        std_emb = _embed_one(r["standard_retrieval"]["response"])
        adp_emb = _embed_one(r["adaptive_retrieval"]["response"])
        ref_emb = _embed_one(ref)
        scores.append({
            "std": cosine(std_emb, ref_emb),
            "adp": cosine(adp_emb, ref_emb)
        })
    if not scores:
        return {"message": "No references provided; skipping quantitative comparison."}
    std_avg = float(np.mean([s["std"] for s in scores]))
    adp_avg = float(np.mean([s["adp"] for s in scores]))
    return {
        "standard_avg_sim": round(std_avg, 3),
        "adaptive_avg_sim": round(adp_avg, 3),
        "winner": "adaptive" if adp_avg > std_avg else ("standard" if std_avg > adp_avg else "tie")
    }

# -------------------- Evaluation: adaptive vs. standard retrieval --------------------
def evaluate_adaptive_vs_standard_gemini(pdf_path, test_queries, reference_answers=None):
    """
    Gemini-powered evaluation of adaptive vs standard retrieval.
    """
    print("=== EVALUATING ADAPTIVE VS. STANDARD RETRIEVAL (Gemini) ===")

    # Build chunks & vector store
    chunks, vector_store = process_document_gemini(pdf_path)

    results = []
    for i, query in enumerate(test_queries):
        print(f"\n\nQuery {i+1}: {query}")

        # --- Standard retrieval ---
        print("\n--- Standard Retrieval ---")
        query_embedding = _embed_one(query)
        standard_docs = vector_store.similarity_search(query_embedding, k=4)
        standard_response = generate_response_gemini(query, standard_docs, "General")

        # --- Adaptive retrieval ---
        print("\n--- Adaptive Retrieval ---")
        qtype, adaptive_docs = adaptive_retrieval_gemini(query, vector_store, k=4)
        adaptive_response = generate_response_gemini(query, adaptive_docs, qtype)

        entry = {
            "query": query,
            "query_type": qtype,
            "standard_retrieval": {
                "documents": standard_docs,
                "response": standard_response
            },
            "adaptive_retrieval": {
                "documents": adaptive_docs,
                "response": adaptive_response
            }
        }
        if reference_answers and i < len(reference_answers):
            entry["reference_answer"] = reference_answers[i]

        results.append(entry)

        print("\n--- Responses ---")
        print(f"Standard: {standard_response[:200]}...")
        print(f"Adaptive: {adaptive_response[:200]}...")

    comparison = None
    if reference_answers:
        comparison = compare_responses(results)
        print("\n=== EVALUATION RESULTS ===")
        print(comparison)

    return {
        "results": results,
        "comparison": comparison if reference_answers else "No reference answers provided for evaluation"
    }
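
To make the overlap behaviour of `chunk_text` concrete, here is a small worked example on a 26-character string. `chunk_with_overlap` is a renamed copy of the sliding-window logic, used here so the example does not shadow the notebook's own function; the tiny `chunk_size` and `overlap` values are chosen only to keep the output readable.

```python
def chunk_with_overlap(text, chunk_size, overlap):
    """Sliding-window chunking: each window starts chunk_size - overlap after the last."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            break  # the last window reached the end of the text
        start += chunk_size - overlap
    return chunks

text = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_with_overlap(text, chunk_size=10, overlap=4)
for c in chunks:
    print(c)
# abcdefghij
# ghijklmnop
# mnopqrstuv
# stuvwxyz
```

Each chunk repeats the last four characters of the previous one, and the final chunk is shorter because the window is clipped at the end of the text.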


In [20]:
# pip install google-generativeai python-dotenv
import os, time
import google.generativeai as genai
from dotenv import load_dotenv

# --- one-time setup (safe to move elsewhere) ---
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("GEMINI_API_KEY is not set.")
genai.configure(api_key=GEMINI_API_KEY)

_GEMINI_MODEL = "gemini-2.0-flash"

def _gen_with_retry(user_prompt: str, system_prompt: str = None, temperature: float = 0.2,
                    max_retries: int = 5, base_sleep: int = 5) -> str:
    """
    Generate with Gemini, retrying on quota (429/ResourceExhausted).
    Returns response text ('' if unavailable).

    Note: this redefines the _gen_with_retry helper from the previous cell.
    user_prompt comes first and system_prompt is optional, so the earlier
    call style _gen_with_retry(prompt, temperature=0) keeps working.
    """
    # Gemini accepts only "user" and "model" roles in `contents`, so the system
    # prompt is passed through the model's system_instruction instead.
    model = genai.GenerativeModel(_GEMINI_MODEL, system_instruction=system_prompt)
    for attempt in range(max_retries):
        try:
            resp = model.generate_content(
                user_prompt,
                generation_config=genai.GenerationConfig(temperature=temperature)
            )
            return (resp.text or "").strip()
        except Exception as e:
            msg = str(e)
            if "429" in msg or "ResourceExhausted" in msg:
                sleep_s = base_sleep * (attempt + 1)
                print(f"[Gemini] Quota hit (attempt {attempt+1}/{max_retries}). Sleeping {sleep_s}s...")
                time.sleep(sleep_s)
                continue
            raise
    return ""

def compare_responses_gemini(results):
    """
    Compare standard and adaptive responses against reference answers using Gemini.
    
    Args:
        results (List[Dict]): items with keys:
          - 'query', 'query_type'
          - 'reference_answer' (optional)
          - 'standard_retrieval': {'response': str}
          - 'adaptive_retrieval': {'response': str}
    Returns:
        str: Markdown comparison analysis.
    """
    comparison_prompt = (
        "You are an expert evaluator of information retrieval systems. "
        "Compare the standard retrieval and adaptive retrieval responses for each query. "
        "Assess accuracy, relevance, comprehensiveness, grounding in the provided reference, "
        "and hallucination risk. Be concise but specific. End with a 1–2 sentence verdict."
    )

    comparison_text = "# Evaluation of Standard vs. Adaptive Retrieval\n\n"

    for i, result in enumerate(results):
        if "reference_answer" not in result:
            # Skip queries with no reference
            continue

        query = result.get("query", "")
        qtype = result.get("query_type", "Unknown")
        ref = result.get("reference_answer", "")
        std_resp = result.get("standard_retrieval", {}).get("response", "")
        adp_resp = result.get("adaptive_retrieval", {}).get("response", "")

        comparison_text += f"## Query {i+1}: {query}\n"
        comparison_text += f"*Query Type: {qtype}*\n\n"
        comparison_text += f"**Reference Answer:**\n{ref}\n\n"
        comparison_text += f"**Standard Retrieval Response:**\n{std_resp}\n\n"
        comparison_text += f"**Adaptive Retrieval Response:**\n{adp_resp}\n\n"

        user_prompt = (
            f"Reference Answer:\n{ref}\n\n"
            f"Standard Retrieval Response:\n{std_resp}\n\n"
            f"Adaptive Retrieval Response:\n{adp_resp}\n\n"
            "Provide a structured comparison:\n"
            "1) Accuracy vs reference\n"
            "2) Relevance / coverage of key points\n"
            "3) Hallucinations / unsupported claims\n"
            "4) Clarity and organization\n"
            "Finish with: Verdict - <which is better and why>."
        )

        analysis = _gen_with_retry(
            system_prompt=comparison_prompt,
            user_prompt=user_prompt,
            temperature=0.2
        ) or "_(No analysis returned)_"

        comparison_text += f"**Comparison Analysis:**\n{analysis}\n\n"

    return comparison_text
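
The retry helpers in this notebook wait `base_sleep * (attempt + 1)` seconds after each quota error. The sketch below isolates that pattern so it can be tested without the Gemini API. `call_with_retry`, the `flaky` function, and the injectable `sleep` argument are all assumptions made for this illustration.

```python
import time

def call_with_retry(fn, max_retries=5, base_sleep=5.0, sleep=time.sleep):
    """Call fn(), retrying with linearly growing waits on quota-style errors."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as e:
            msg = str(e)
            if "429" in msg or "ResourceExhausted" in msg:
                sleep(base_sleep * (attempt + 1))  # 5s, 10s, 15s, ...
                continue
            raise  # any other error is not retried
    raise RuntimeError(f"Still rate-limited after {max_retries} attempts")

# Fake API call that fails twice with a quota error, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("429 quota exceeded")
    return "ok"

waits = []  # record the waits instead of actually sleeping
result = call_with_retry(flaky, base_sleep=5.0, sleep=waits.append)
print(result, waits)  # ok [5.0, 10.0]
```

Passing the sleep function as an argument is a testing convenience; in the notebook the helpers call `time.sleep` directly.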


## Evaluating the Adaptive Retrieval System (Customized Queries)

The final step is to call evaluate_adaptive_vs_standard() with your PDF documents and test queries. The cells below first harden the embedding helpers against different SDK response shapes, then locate the PDFs and run the evaluation:

In [22]:
def _extract_first_embedding(resp):
    """
    Normalize Gemini embed_content responses to a single 1-D list of floats.
    Handles dict/object/list variants across SDK versions.
    """
    # 1) Dict responses
    if isinstance(resp, dict):
        # Single embedding
        if "embedding" in resp:
            emb = resp["embedding"]
            if isinstance(emb, dict) and "values" in emb:
                return emb["values"]
            return emb  # already a list of floats
        # Batch embeddings
        if "embeddings" in resp and resp["embeddings"]:
            emb0 = resp["embeddings"][0]
            if isinstance(emb0, dict) and "values" in emb0:
                return emb0["values"]
            return emb0  # already list
        raise RuntimeError(f"Unexpected dict embed response: keys={list(resp.keys())}")

    # 2) Object-like responses
    if hasattr(resp, "embedding") and resp.embedding is not None:
        emb = resp.embedding
        return getattr(emb, "values", emb)
    if hasattr(resp, "embeddings") and resp.embeddings:
        emb0 = resp.embeddings[0]
        return getattr(emb0, "values", emb0)

    # 3) List responses (SDK sometimes returns a list)
    if isinstance(resp, list):
        if not resp:
            raise RuntimeError("Empty list embed response.")
        # If it's already a flat list of numbers
        if all(isinstance(x, (int, float)) for x in resp):
            return resp
        # If it's a list of dicts/objects, take the first
        first = resp[0]
        if isinstance(first, dict) and "values" in first:
            return first["values"]
        if hasattr(first, "values"):
            return first.values
        # If the first is a nested list of floats
        if isinstance(first, list) and all(isinstance(x, (int, float)) for x in first):
            return first
        raise RuntimeError(f"Unexpected list embed response element type: {type(first)}")

    raise RuntimeError(f"Unknown embed response type: {type(resp)}")


def _to_1d(vec) -> np.ndarray:
    arr = np.array(vec, dtype=np.float32)
    if arr.ndim == 1:
        return arr
    if arr.ndim == 2:
        # Take first row if a batch slips through
        return arr[0]
    return arr.reshape(-1)


def _embed_one(text: str, max_retries: int = 5, base_sleep: float = 5.0) -> np.ndarray:
    """
    Embed a single string using Gemini. Returns a 1-D np.ndarray.
    Retries on 429/ResourceExhausted.
    """
    for attempt in range(max_retries):
        try:
            resp = genai.embed_content(model=EMBED_MODEL, content=text)
            emb = _extract_first_embedding(resp)
            return _to_1d(emb)
        except Exception as e:
            msg = str(e)
            if "429" in msg or "ResourceExhausted" in msg:
                sleep_s = base_sleep * (attempt + 1)
                print(f"[embed] Quota hit (attempt {attempt+1}/{max_retries}). Sleeping {sleep_s}s...")
                time.sleep(sleep_s)
                continue
            # Helpful debug for format surprises
            print(f"[embed] Unexpected response type: {type(resp) if 'resp' in locals() else None}")
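            raise
    # All retries were rate-limited; fail loudly instead of returning None
    raise RuntimeError(f"Embedding failed: still rate-limited after {max_retries} attempts.")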


In [23]:
# pip install google-generativeai python-dotenv
import os
from dotenv import load_dotenv

# ---- Load environment and configure Gemini ----
load_dotenv()
import google.generativeai as genai

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("GEMINI_API_KEY is not set in .env or environment variables.")
genai.configure(api_key=GEMINI_API_KEY)

# ---- Path to your knowledge source folder (containing all PDFs) ----
pdf_folder = "/Users/kekunkoya/Desktop/RAG Google 2/PDFs"

# ---- Build a list of all PDF paths ----
pdf_paths = [
    os.path.join(pdf_folder, filename)
    for filename in os.listdir(pdf_folder)
    if filename.lower().endswith(".pdf")
]

# ---- Debug: print what we found ----
if not pdf_paths:
    print(f"No PDFs found in folder: {pdf_folder}")
else:
    print(f"Found {len(pdf_paths)} PDFs in {pdf_folder}:")
    for path in pdf_paths:
        print(f" - {os.path.basename(path)}")


Found 5 PDFs in /Users/kekunkoya/Desktop/RAG Google 2/PDFs:
 - PA 211 Disaster Community Resources.pdf
 - 211 RESPONDS TO URGENT NEEDS.pdf
 - PEMA.pdf
 - ready-gov_disaster-preparedness-guide-for-older-adults.pdf
 - Substantial Damages Toolkit.pdf


In [24]:
def evaluate_adaptive_vs_standard_gemini(pdf_paths, test_queries, reference_answers=None,
                                         chunk_size=1000, overlap=200, k=4):
    """
    Gemini-powered evaluation that accepts a list of PDF paths.
    - Concats all PDF text,
    - Chunks + embeds with Gemini,
    - Compares standard vs. adaptive retrieval for each query.

    Requires:
      - extract_text_from_pdf(path) -> str
      - chunk_text(text, chunk_size, overlap) -> List[str]
      - class GeminiVectorStore with .add_texts([...]) and .similarity_search_by_text(query, k)
      - adaptive_retrieval_gemini(query, vector_store, k) -> (query_type, docs)
      - generate_response_gemini(query, docs, query_type) -> str
      - (optional) compare_responses_gemini(results) -> str
    """
    print("=== EVALUATING ADAPTIVE VS. STANDARD RETRIEVAL (Gemini, Multi-PDF) ===")

    # 1) Combine all PDF text
    all_text = []
    for p in pdf_paths:
        try:
            all_text.append(extract_text_from_pdf(p))
        except Exception as e:
            print(f"Skipping {p} due to extraction error: {e}")
    all_text = "\n".join(t for t in all_text if t and t.strip())

    if not all_text.strip():
        raise ValueError("No extractable text found across provided PDFs.")

    # 2) Chunk + build vector store (Gemini embeddings under the hood)
    chunks = chunk_text(all_text, chunk_size, overlap)
    vector_store = GeminiVectorStore()
    vector_store.add_texts(chunks)  # embeds each chunk via Gemini

    # 3) Evaluate per query
    results = []
    for i, query in enumerate(test_queries):
        print(f"\n\nQuery {i+1}: {query}")

        # --- Standard ---
        print("\n--- Standard Retrieval ---")
        std_docs = vector_store.similarity_search_by_text(query, k=k)
        std_resp = generate_response_gemini(query, std_docs, "General")

        # --- Adaptive ---
        print("\n--- Adaptive Retrieval ---")
        qtype, adp_docs = adaptive_retrieval_gemini(query, vector_store, k=k)
        adp_resp = generate_response_gemini(query, adp_docs, qtype)

        entry = {
            "query": query,
            "query_type": qtype,
            "standard_retrieval": {"documents": std_docs, "response": std_resp},
            "adaptive_retrieval": {"documents": adp_docs, "response": adp_resp},
        }
        if reference_answers and i < len(reference_answers):
            entry["reference_answer"] = reference_answers[i]

        results.append(entry)

        print("\n--- Responses ---")
        print(f"Standard: {std_resp[:200]}...")
        print(f"Adaptive:  {adp_resp[:200]}...")

    # 4) Optional comparison against references
    comparison = None
    if reference_answers:
        try:
            comparison = compare_responses_gemini(results)
            print("\n=== EVALUATION RESULTS ===")
            print(comparison[:500] + ("..." if len(comparison) > 500 else ""))
        except NameError:
            # compare_responses_gemini not provided; skip gracefully
            pass

    return {
        "results": results,
        "comparison": comparison or "No reference-based comparison (none provided or function missing).",
    }
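
Both the standard and the adaptive path above rank chunks by cosine similarity, which `GeminiVectorStore.similarity_search` computes in a Python loop. As a rough sketch of the same ranking done with NumPy array operations, consider the function below. `top_k_cosine` is a hypothetical name, and the toy 2-D vectors stand in for real embeddings.

```python
import numpy as np

def top_k_cosine(query_vec, doc_matrix, k=4):
    """Return [(row_index, cosine_similarity), ...] for the k most similar rows."""
    q = np.asarray(query_vec, dtype=np.float32)
    docs = np.asarray(doc_matrix, dtype=np.float32)
    if docs.size == 0:
        return []
    # Product of norms per row; zero wherever the query or a row is all zeros.
    denom = np.linalg.norm(docs, axis=1) * np.linalg.norm(q)
    sims = np.divide(
        docs @ q, denom,
        out=np.zeros(len(docs), dtype=np.float32),  # similarity 0.0 for zero vectors
        where=denom > 0,
    )
    order = np.argsort(-sims, kind="stable")[:k]  # highest similarity first
    return [(int(i), float(sims[i])) for i in order]

docs = [[1, 0], [0, 1], [1, 1], [0, 0]]
hits = top_k_cosine([1, 0], docs, k=2)
print(hits)
```

Row 0 points in the same direction as the query and ranks first, row 2 is at 45 degrees and ranks second, and the all-zero row gets a similarity of 0.0 rather than a division error.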


In [30]:
import os

def evaluate_adaptive_vs_standard(pdf_paths, test_queries, reference_answers):
    """
    Accepts a list of PDF paths instead of a single path.
    Returns a dictionary with a 'comparison' key for printout.
    """
    # Combine all text from all PDFs
    all_text = ""
    for pdf_path in pdf_paths:
        all_text += extract_text_from_pdf(pdf_path) + "\n"
    
    # (Example placeholder logic, replace with your real chunking/retrieval/eval code)
    # For now, let's just compare the length of the text for demonstration:
    comparison = {
        "total_text_length": len(all_text),
        "number_of_pdfs": len(pdf_paths),
        "test_queries": test_queries,
        "reference_answers": reference_answers
    }
    return {"comparison": comparison}

# --- Find all PDFs in the folder ---
pdf_folder = "/Users/kekunkoya/Desktop/Revised RAG Project/PDFs"
pdf_paths = [
    os.path.join(pdf_folder, fname)
    for fname in os.listdir(pdf_folder)
    if fname.lower().endswith(".pdf")
]

# --- Example test queries and reference answers ---
test_queries = [
    "What should I do if I am unable to evacuate during an emergency?" 
]
reference_answers = [
    "Shelter in place in the safest room in your home, and inform your support network of your situation. Keep a radio or phone handy for updates from officials."
]

# --- Run evaluation and print result ---
evaluation_results = evaluate_adaptive_vs_standard(
    pdf_paths=pdf_paths,
    test_queries=test_queries,
    reference_answers=reference_answers
)

print("\n--- Comparison Results ---")
print(evaluation_results["comparison"])



--- Comparison Results ---
{'total_text_length': 326826, 'number_of_pdfs': 5, 'test_queries': ['What should I do if I am unable to evacuate during an emergency?'], 'reference_answers': ['Shelter in place in the safest room in your home, and inform your support network of your situation. Keep a radio or phone handy for updates from officials.']}


In [30]:
# --- DROP-IN REPLACEMENTS (put these above GeminiVectorStore) ---

def _extract_first_embedding(resp):
    """
    Normalize google.generativeai embed_content responses into a single 1-D list of floats.
    Handles dict/object/list variants across SDK versions.
    """
    # dict (single)
    if isinstance(resp, dict) and "embedding" in resp:
        emb = resp["embedding"]
        return emb["values"] if isinstance(emb, dict) and "values" in emb else emb

    # dict (batch)
    if isinstance(resp, dict) and "embeddings" in resp and resp["embeddings"]:
        emb0 = resp["embeddings"][0]
        return emb0["values"] if isinstance(emb0, dict) and "values" in emb0 else emb0

    # object (single)
    if hasattr(resp, "embedding") and resp.embedding is not None:
        return getattr(resp.embedding, "values", resp.embedding)

    # object (batch)
    if hasattr(resp, "embeddings") and resp.embeddings:
        emb0 = resp.embeddings[0]
        return getattr(emb0, "values", emb0)

    # list variants
    if isinstance(resp, list):
        if not resp:
            raise RuntimeError("Empty list embed response.")
        first = resp[0]
        if isinstance(first, dict) and "values" in first:
            return first["values"]
        if hasattr(first, "values"):
            return first.values
        if isinstance(first, list) and all(isinstance(x, (int, float)) for x in first):
            return first
        # flat list of floats already?
        if all(isinstance(x, (int, float)) for x in resp):
            return resp

    raise RuntimeError(f"Unexpected embed response type/shape: {type(resp)} -> {resp!r}")


def _to_1d(vec):
    arr = np.array(vec, dtype=np.float32)
    if arr.ndim == 1:
        return arr
    if arr.ndim == 2:
        return arr[0]  # pick first row if a batch sneaks in
    return arr.reshape(-1)


def _embed_one(text: str, max_retries: int = 5, base_sleep: float = 5.0):
    """
    Embed a single string using Gemini. Returns a 1-D np.ndarray.
    Retries on 429/ResourceExhausted. Robust to SDK response variants.
    """
    for attempt in range(max_retries):
        try:
            resp = genai.embed_content(model=EMBED_MODEL, content=text)
            emb = _extract_first_embedding(resp)
            return _to_1d(emb)
        except Exception as e:
            msg = str(e)
            if "429" in msg or "ResourceExhausted" in msg:
                sleep_s = base_sleep * (attempt + 1)
                print(f"[embed] Quota hit (attempt {attempt+1}/{max_retries}). Sleeping {sleep_s}s...")
                time.sleep(sleep_s)
                continue
            print(f"[embed] Unexpected response type: {type(resp) if 'resp' in locals() else None}")
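            raise
    # Every attempt hit the quota: fail loudly instead of silently returning None
    raise RuntimeError(f"[embed] Still rate-limited after {max_retries} attempts.")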
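
The retry logic in `_embed_one` can be looked at in isolation. The sketch below is a minimal illustration, not part of the pipeline: `call_with_backoff` is a hypothetical helper name, and the `sleep` parameter is an assumption added so the wait schedule can be recorded without actually sleeping. It retries only when the error message mentions `429` or `ResourceExhausted`, waits `base_sleep * (attempt + 1)` seconds between attempts (linear backoff), and raises once the attempts are used up.

```python
import time


def call_with_backoff(fn, max_retries=5, base_sleep=5.0, sleep=time.sleep):
    """Call fn(); on quota errors wait base_sleep * (attempt + 1) seconds and retry."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as e:
            msg = str(e)
            if "429" not in msg and "ResourceExhausted" not in msg:
                raise  # not a quota error: surface it immediately
            last_error = e
            sleep(base_sleep * (attempt + 1))  # linear backoff: 5s, 10s, 15s, ...
    # All attempts were rate-limited: raise instead of returning None
    raise RuntimeError(f"Gave up after {max_retries} attempts") from last_error


# Demo: a fake call that fails twice with a quota error, then succeeds.
attempts = {"n": 0}
waits = []  # records the requested sleep durations instead of sleeping


def flaky():
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise RuntimeError("429 quota exceeded")
    return "ok"


result = call_with_backoff(flaky, sleep=waits.append)
print(result, waits)  # ok [5.0, 10.0]
```

Passing the sleep function in as a parameter keeps the schedule easy to test; in the notebook itself the default `time.sleep` would be used.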


In [1]:
import time, json, hashlib, pathlib
from collections import deque

# --- Config knobs (tune to taste) ---
MAX_GEN_PER_MIN = 12   # keep a safety margin under free-tier 15/min
MAX_EMB_PER_MIN = 50   # under the 60/min embed guideline
SLEEP_ON_SUCCESS = 0.5 # small gap between requests
EMB_CACHE_DIR = pathlib.Path(".cache/embeds")
GEN_CACHE_PATH = pathlib.Path(".cache/generations.jsonl")
EMB_CACHE_DIR.mkdir(parents=True, exist_ok=True)
GEN_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)

class RateLimiter:
    """Sliding-window per-minute limiter: keeps timestamps of recent calls and sleeps when the window is full."""
    def __init__(self, per_minute: int):
        self.per_minute = per_minute
        self.events = deque()  # timestamps of last calls

    def wait(self):
        now = time.time()
        # drop events older than 60s
        while self.events and now - self.events[0] > 60:
            self.events.popleft()
        if len(self.events) >= self.per_minute:
            sleep_s = 60 - (now - self.events[0]) + 0.05
            time.sleep(max(sleep_s, 0.05))
        # record this event
        self.events.append(time.time())

gen_limiter = RateLimiter(MAX_GEN_PER_MIN)
emb_limiter = RateLimiter(MAX_EMB_PER_MIN)

def _hash_key(*parts) -> str:
    m = hashlib.sha256()
    for p in parts:
        m.update(str(p).encode("utf-8"))
    return m.hexdigest()

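def _save_generation_cache(record: dict):
    # Explicit UTF-8: records are written with ensure_ascii=False
    with GEN_CACHE_PATH.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

def _load_generation_cache():
    cache = {}
    if GEN_CACHE_PATH.exists():
        with GEN_CACHE_PATH.open(encoding="utf-8") as f:
            for line in f:
                try:
                    rec = json.loads(line)
                    cache[rec["key"]] = rec["text"]
                except (ValueError, KeyError, TypeError):
                    # skip corrupt or incomplete cache lines
                    pass
    return cache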

_GEN_CACHE = _load_generation_cache()

def _gen_with_retry(prompt, temperature=0.0, max_retries=6, base_sleep=5):
    """Generation with rate-limit pacing, backoff, and disk cache."""
    key = _hash_key("gen", prompt, temperature)
    if key in _GEN_CACHE:
        return _GEN_CACHE[key]

    model = genai.GenerativeModel(GEN_MODEL)

    for attempt in range(max_retries):
        try:
            gen_limiter.wait()
            resp = model.generate_content(
                prompt,
                generation_config=genai.GenerationConfig(temperature=temperature)
            )
            text = (resp.text or "").strip()
            _GEN_CACHE[key] = text
            _save_generation_cache({"key": key, "text": text})
            time.sleep(SLEEP_ON_SUCCESS)
            return text
        except Exception as e:
            msg = str(e)
            # Default to linear backoff; honor the server's suggested delay if it is longer
            sleep_s = base_sleep * (attempt + 1)
            # The SDK often includes 'retry_delay { seconds: N }' in the error message
            if "retry_delay" in msg:
                import re  # local import: this cell does not import re at the top
                m = re.search(r"retry_delay\s*\{\s*seconds:\s*(\d+)", msg)
                if m:
                    sleep_s = max(sleep_s, int(m.group(1)) + 1)

            if "ResourceExhausted" in msg or "429" in msg:
                # DAILY QUOTA?
                if "PerDay" in msg or "PerDayPerProjectPerModel" in msg:
                    raise RuntimeError(
                        "Daily generation quota exhausted. Try again after reset or upgrade."
                    )
                # Otherwise, wait and retry
                time.sleep(sleep_s)
                continue
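            raise
    # Every attempt was rate-limited: fail loudly instead of silently returning None
    raise RuntimeError(f"Generation still rate-limited after {max_retries} attempts.")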

def _embed_one_cached(text: str, max_retries=6, base_sleep=3):
    """Embedding with per-minute limiter, backoff, and disk cache."""
    key = _hash_key("emb", text)
    cache_path = EMB_CACHE_DIR / f"{key}.npy"
    if cache_path.exists():
        return np.load(cache_path)

    for attempt in range(max_retries):
        try:
            emb_limiter.wait()
            resp = genai.embed_content(model=EMBED_MODEL, content=text)
            # robust normalization
            emb = _extract_first_embedding(resp)  # <- use your robust parser
            vec = _to_1d(emb)                     # <- ensure 1-D
            np.save(cache_path, vec)
            time.sleep(SLEEP_ON_SUCCESS)
            return vec
        except Exception as e:
            msg = str(e)
            if "ResourceExhausted" in msg or "429" in msg:
                # DAILY embed quota? (rare; usually per-minute)
                if "PerDay" in msg:
                    raise RuntimeError("Daily embedding quota exhausted.")
                time.sleep(base_sleep * (attempt + 1))
                continue
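            raise
    # Every attempt was rate-limited: fail loudly instead of silently returning None
    raise RuntimeError(f"Embedding still rate-limited after {max_retries} attempts.")
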
# --- Run Gemini evaluation ---
evaluation_results = evaluate_adaptive_vs_standard_gemini(
    pdf_paths=pdf_paths,
    test_queries=test_queries,
    reference_answers=reference_answers
)

# --- Print results nicely ---
print("\n=== Gemini Evaluation Results ===")
for i, res in enumerate(evaluation_results["results"], start=1):
    print(f"\n--- Query {i}: {res['query']} ---")
    print(f"Query Type: {res['query_type']}")
    print("\n[Standard Retrieval Response]:")
    print(res["standard_retrieval"]["response"][:300], "...")
    print("\n[Adaptive Retrieval Response]:")
    print(res["adaptive_retrieval"]["response"][:300], "...")
    if "reference_answer" in res:
        print("\n[Reference Answer]:")
        print(res["reference_answer"])

if "comparison" in evaluation_results:
    print("\n=== Comparison Analysis ===")
    print(evaluation_results["comparison"])

NameError: name 'evaluate_adaptive_vs_standard_gemini' is not defined
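
The `RateLimiter` defined in the cell above keeps the timestamps of recent calls in a deque and sleeps once `per_minute` calls fall inside the last 60 seconds. The sketch below restates the same logic with an injectable clock and sleep function so its behavior can be checked without waiting a real minute. `SlidingWindowLimiter` and `FakeClock` are hypothetical names used only for this illustration.

```python
from collections import deque


class SlidingWindowLimiter:
    """Same logic as RateLimiter, with the clock and sleep passed in."""

    def __init__(self, per_minute, clock, sleep):
        self.per_minute = per_minute
        self.clock = clock
        self.sleep = sleep
        self.events = deque()  # timestamps of recent calls

    def wait(self):
        now = self.clock()
        # drop timestamps that have left the 60-second window
        while self.events and now - self.events[0] > 60:
            self.events.popleft()
        if len(self.events) >= self.per_minute:
            # wait until the oldest call in the window is 60 seconds old
            self.sleep(max(60 - (now - self.events[0]) + 0.05, 0.05))
        self.events.append(self.clock())


class FakeClock:
    """Simulated time: sleeping just advances the counter."""

    def __init__(self):
        self.t = 0.0
        self.slept = []

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.slept.append(seconds)
        self.t += seconds


clock = FakeClock()
limiter = SlidingWindowLimiter(per_minute=2, clock=clock.now, sleep=clock.sleep)
for _ in range(3):
    limiter.wait()
    clock.t += 1.0  # pretend each request takes one second

# Only the third call has to wait, for roughly 58 simulated seconds.
print(len(clock.slept), [round(s, 2) for s in clock.slept])
```

With a limit of 2 per minute, the first two calls pass immediately and the third is delayed until the first call is just over 60 seconds old.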

In [2]:
# pip install google-generativeai python-dotenv pymupdf
import os, time, json, hashlib, pathlib, re
from collections import deque

import numpy as np
import fitz  # PyMuPDF
import google.generativeai as genai
from dotenv import load_dotenv

# =========================
# 1) Gemini setup
# =========================
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("GEMINI_API_KEY is not set in your environment/.env")
genai.configure(api_key=GEMINI_API_KEY)

GEN_MODEL = "gemini-2.0-flash"
EMBED_MODEL = "models/embedding-001"

# =========================
# 2) Rate limiting, backoff, and caches
# =========================
MAX_GEN_PER_MIN = 12    # below free-tier 15/min
MAX_EMB_PER_MIN = 50    # below 60/min guideline
SLEEP_ON_SUCCESS = 0.5  # small spacing between calls
EMB_CACHE_DIR = pathlib.Path(".cache/embeds")
GEN_CACHE_PATH = pathlib.Path(".cache/generations.jsonl")
EMB_CACHE_DIR.mkdir(parents=True, exist_ok=True)
GEN_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)

class RateLimiter:
    def __init__(self, per_minute: int):
        self.per_minute = per_minute
        self.events = deque()
    def wait(self):
        now = time.time()
        while self.events and now - self.events[0] > 60:
            self.events.popleft()
        if len(self.events) >= self.per_minute:
            sleep_s = 60 - (now - self.events[0]) + 0.05
            time.sleep(max(sleep_s, 0.05))
        self.events.append(time.time())

gen_limiter = RateLimiter(MAX_GEN_PER_MIN)
emb_limiter = RateLimiter(MAX_EMB_PER_MIN)

def _hash_key(*parts) -> str:
    m = hashlib.sha256()
    for p in parts:
        m.update(str(p).encode("utf-8"))
    return m.hexdigest()

def _save_generation_cache(record: dict):
    with GEN_CACHE_PATH.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

def _load_generation_cache():
    cache = {}
    if GEN_CACHE_PATH.exists():
        with GEN_CACHE_PATH.open(encoding="utf-8") as f:
            for line in f:
                try:
                    rec = json.loads(line)
                    cache[rec["key"]] = rec["text"]
                except (ValueError, KeyError, TypeError):
                    # skip corrupt or incomplete cache lines
                    pass
    return cache

_GEN_CACHE = _load_generation_cache()

def _gen_with_retry(prompt, temperature=0.0, max_retries=6, base_sleep=5):
    key = _hash_key("gen", prompt, temperature)
    if key in _GEN_CACHE:
        return _GEN_CACHE[key]

    model = genai.GenerativeModel(GEN_MODEL)

    for attempt in range(max_retries):
        try:
            gen_limiter.wait()
            resp = model.generate_content(
                prompt,
                generation_config=genai.GenerationConfig(temperature=temperature)
            )
            text = (resp.text or "").strip()
            _GEN_CACHE[key] = text
            _save_generation_cache({"key": key, "text": text})
            time.sleep(SLEEP_ON_SUCCESS)
            return text
        except Exception as e:
            msg = str(e)
            sleep_s = base_sleep * (attempt + 1)
            # honor retry_delay if present
            m = re.search(r"retry_delay\s*\{\s*seconds:\s*(\d+)", msg)
            if m:
                sleep_s = max(sleep_s, int(m.group(1)) + 1)

            if "ResourceExhausted" in msg or "429" in msg:
                if "PerDay" in msg or "PerDayPerProjectPerModel" in msg:
                    raise RuntimeError("Daily generation quota exhausted. Try after reset or upgrade.")
                time.sleep(sleep_s)
                continue
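            raise
    # Every attempt was rate-limited: fail loudly instead of silently returning None
    raise RuntimeError(f"Generation still rate-limited after {max_retries} attempts.")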

def _extract_first_embedding(resp):
    # dict single
    if isinstance(resp, dict) and "embedding" in resp:
        emb = resp["embedding"]
        return emb["values"] if isinstance(emb, dict) and "values" in emb else emb
    # dict batch
    if isinstance(resp, dict) and "embeddings" in resp and resp["embeddings"]:
        emb0 = resp["embeddings"][0]
        return emb0["values"] if isinstance(emb0, dict) and "values" in emb0 else emb0
    # object single
    if hasattr(resp, "embedding") and resp.embedding is not None:
        return getattr(resp.embedding, "values", resp.embedding)
    # object batch
    if hasattr(resp, "embeddings") and resp.embeddings:
        emb0 = resp.embeddings[0]
        return getattr(emb0, "values", emb0)
    # list variants
    if isinstance(resp, list):
        if not resp:
            raise RuntimeError("Empty list embed response.")
        first = resp[0]
        if isinstance(first, dict) and "values" in first:
            return first["values"]
        if hasattr(first, "values"):
            return first.values
        if isinstance(first, list) and all(isinstance(x, (int, float)) for x in first):
            return first
        if all(isinstance(x, (int, float)) for x in resp):
            return resp
    raise RuntimeError(f"Unexpected embed response type: {type(resp)} -> {resp!r}")

def _to_1d(vec):
    arr = np.array(vec, dtype=np.float32)
    if arr.ndim == 1:
        return arr
    if arr.ndim == 2:
        return arr[0]
    return arr.reshape(-1)

def _embed_one_cached(text: str, max_retries=6, base_sleep=3):
    key = _hash_key("emb", text)
    cache_path = EMB_CACHE_DIR / f"{key}.npy"
    if cache_path.exists():
        return np.load(cache_path)

    for attempt in range(max_retries):
        try:
            emb_limiter.wait()
            resp = genai.embed_content(model=EMBED_MODEL, content=text)
            emb = _extract_first_embedding(resp)
            vec = _to_1d(emb)
            np.save(cache_path, vec)
            time.sleep(SLEEP_ON_SUCCESS)
            return vec
        except Exception as e:
            msg = str(e)
            if "ResourceExhausted" in msg or "429" in msg:
                if "PerDay" in msg:
                    raise RuntimeError("Daily embedding quota exhausted.")
                time.sleep(base_sleep * (attempt + 1))
                continue
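            raise
    # Every attempt was rate-limited: fail loudly instead of silently returning None
    raise RuntimeError(f"Embedding still rate-limited after {max_retries} attempts.")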

# =========================
# 3) PDF -> text -> chunks
# =========================
def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    out = []
    for page in doc:
        out.append(page.get_text("text"))
    doc.close()
    return "\n".join(out)

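def chunk_text(text, chunk_size=1000, overlap=200):
    """Split text into chunk_size-character chunks; consecutive chunks share `overlap` characters."""
    if overlap >= chunk_size:
        # The start index advances by chunk_size - overlap, so that step must be positive
        raise ValueError("overlap must be smaller than chunk_size")
    chunks, start = [], 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            break
        start += chunk_size - overlap
    return chunks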

# =========================
# 4) Tiny vector store (NumPy)
# =========================
class GeminiVectorStore:
    def __init__(self):
        self.vectors, self.texts, self.metadata = [], [], []
    def add_item(self, text, embedding, metadata=None):
        self.vectors.append(np.array(embedding, dtype=np.float32))
        self.texts.append(text)
        self.metadata.append(metadata or {})
    def add_texts(self, texts, metadatas=None, sleep_between=0.25):
        metadatas = metadatas or [{}]*len(texts)
        for t, m in zip(texts, metadatas):
            vec = _embed_one_cached(t)
            self.add_item(t, vec, m)
            if sleep_between:
                time.sleep(sleep_between)
    def similarity_search_by_text(self, query_text, k=4, filter_func=None):
        q = _embed_one_cached(query_text)
        qn = np.linalg.norm(q)
        sims = []
        for i, v in enumerate(self.vectors):
            if filter_func and not filter_func(self.metadata[i]): 
                continue
            vn = np.linalg.norm(v)
            sim = 0.0 if qn == 0 or vn == 0 else float(np.dot(q, v) / (qn * vn))
            sims.append((i, sim))
        sims.sort(key=lambda x: x[1], reverse=True)
        return [{"text": self.texts[i], "metadata": self.metadata[i], "similarity": s}
                for i, s in sims[:k]]

# =========================
# 5) Retrieval policies + generation
# =========================
def classify_query_gemini(query):
    """
    Keyword heuristic; despite the name, no Gemini call is made.
    Matching is by substring (so "how" also matches "show"), and the first matching category wins.
    """
    q = query.lower()
    if any(x in q for x in ["why", "how", "analyz", "compare", "tradeoff"]): return "Analytical"
    if any(x in q for x in ["opinion", "should i", "what do you think"]):   return "Opinion"
    if any(x in q for x in ["context", "background", "overview", "explain"]): return "Contextual"
    return "Factual"

def adaptive_retrieval_gemini(query, vector_store, k=4):
    """Classify the query, pick k for that type, then run the same similarity search."""
    qtype = classify_query_gemini(query)
    kk = {"Factual": max(4, k), "Contextual": max(5, k), "Analytical": k, "Opinion": max(3, k)}.get(qtype, k)
    return qtype, vector_store.similarity_search_by_text(query, k=kk)

def generate_response_gemini(query, docs, query_type="General"):
    context = "\n\n".join([f"[{i+1}] {d['text']}" for i, d in enumerate(docs)])
    prompt = f"""You are a helpful assistant answering with ONLY the provided context if possible.
Query type: {query_type}
User query: {query}

Context:
{context}

Instructions:
- Cite supporting chunk indices like [1], [2] in-line.
- If the answer is not in context, say so briefly and provide best-effort guidance.
- Be concise (120-180 words).
"""
    return _gen_with_retry(prompt, temperature=0.0)

# =========================
# 6) The missing function
# =========================
def evaluate_adaptive_vs_standard_gemini(pdf_paths, test_queries, reference_answers=None,
                                         chunk_size=1200, overlap=250, k=4):
    # Combine PDF text
    all_texts = []
    for p in pdf_paths:
        try:
            t = extract_text_from_pdf(p)
            if t.strip():
                all_texts.append(t)
            else:
                print(f"Skipping empty/non-text PDF: {os.path.basename(p)}")
        except Exception as e:
            print(f"Skipping {p}: {e}")
    combined = "\n".join(all_texts)
    if not combined.strip():
        raise ValueError("No extractable text found in provided PDFs.")

    # Chunk + embed
    chunks = chunk_text(combined, chunk_size=chunk_size, overlap=overlap)
    vs = GeminiVectorStore()
    vs.add_texts(chunks, sleep_between=0.25)  # throttle a bit

    # Evaluate queries
    results = []
    for i, query in enumerate(test_queries):
        # Standard
        std_docs = vs.similarity_search_by_text(query, k=k)
        std_resp = generate_response_gemini(query, std_docs, "General")

        # Adaptive
        qtype, adp_docs = adaptive_retrieval_gemini(query, vs, k=k)
        adp_resp = generate_response_gemini(query, adp_docs, qtype)

        entry = {
            "query": query,
            "query_type": qtype,
            "standard_retrieval": {"documents": std_docs, "response": std_resp},
            "adaptive_retrieval": {"documents": adp_docs, "response": adp_resp},
        }
        if reference_answers and i < len(reference_answers):
            entry["reference_answer"] = reference_answers[i]
        results.append(entry)

    return {"results": results}
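
To make the chunking step concrete: `chunk_text` moves its start index forward by `chunk_size - overlap` characters each time, so neighboring chunks share `overlap` characters. The sketch below restates that arithmetic on a tiny string. `chunk_with_overlap` is a hypothetical name for this illustration, and it includes an explicit check that the overlap is smaller than the chunk size, since otherwise the start index would never advance.

```python
def chunk_with_overlap(text, chunk_size=1000, overlap=200):
    """Fixed-size character chunks where neighbors share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - overlap  # distance between consecutive chunk starts
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this chunk already reaches the end of the text
    return chunks


# chunk_size=4, overlap=1 -> starts at 0, 3, 6
print(chunk_with_overlap("abcdefghij", chunk_size=4, overlap=1))
# ['abcd', 'defg', 'ghij']
```

Each chunk begins with the last character of the previous one ("d", then "g"), which is the one-character overlap. With the evaluation defaults of `chunk_size=1200` and `overlap=250`, the same rule gives a step of 950 characters.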



In [5]:
import os

# 1️⃣ Build list of all PDF files in the folder
pdf_folder = "/Users/kekunkoya/Desktop/Revised RAG Project/PDFs"
pdf_paths = [
    os.path.join(pdf_folder, fname)
    for fname in os.listdir(pdf_folder)
    if fname.lower().endswith(".pdf")
]

test_queries = [
    "What should I do if I am unable to evacuate during an emergency?"
]
reference_answers = [
    "Shelter in place in the safest room in your home, and inform your support network of your situation. Keep a radio or phone handy for updates from officials."
]


In [7]:
evaluation_results = evaluate_adaptive_vs_standard_gemini(
    pdf_paths=pdf_paths,
    test_queries=test_queries,
    reference_answers=reference_answers
)


In [8]:
# Run Gemini evaluation
evaluation_results = evaluate_adaptive_vs_standard_gemini(
    pdf_paths=pdf_paths,
    test_queries=test_queries,
    reference_answers=reference_answers
)

# Print results nicely
print("\n=== Gemini Evaluation Results ===")
for i, res in enumerate(evaluation_results["results"], start=1):
    print(f"\n--- Query {i}: {res['query']} ---")
    print(f"Query Type: {res['query_type']}")
    
    print("\n[Standard Retrieval Response]:")
    print(res["standard_retrieval"]["response"][:300], "...")
    
    print("\n[Adaptive Retrieval Response]:")
    print(res["adaptive_retrieval"]["response"][:300], "...")
    
    if "reference_answer" in res:
        print("\n[Reference Answer]:")
        print(res["reference_answer"])

if "comparison" in evaluation_results:
    print("\n=== Comparison Analysis ===")
    print(evaluation_results["comparison"])



=== Gemini Evaluation Results ===

--- Query 1: What should I do if I am unable to evacuate during an emergency? ---
Query Type: Opinion

[Standard Retrieval Response]:
If you are unable to evacuate, you should plan to get inside, find a safe spot, and stay put until local officials say the threat has passed [1]. Identify a safe spot in your home to shelter in place with members of your household and your pets; the safest spot will depend on the type of disaster [1 ...

[Adaptive Retrieval Response]:
If you are unable to evacuate during an emergency, you should plan to get inside, find a safe spot, and stay put until local officials say the threat has passed [1]. Identify a safe spot in your home to shelter in place with members of your household and your pets; the safest spot will depend on the ...

[Reference Answer]:
Shelter in place in the safest room in your home, and inform your support network of your situation. Keep a radio or phone handy for updates from officials.
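
The two responses above are almost the same, and the code suggests a likely reason. The query is classified as "Opinion" (it contains "should i"), and with the default `k=4` the Opinion policy resolves to `max(3, 4) = 4`, the same `k` the standard path uses. Both paths therefore run the same similarity search, and the prompts differ only in the `Query type:` line. The snippet below copies the keyword rules and the per-type `k` mapping from the cell that defines them to check this. `classify_query` and `adaptive_k` are hypothetical names for this illustration, and this is my reading of the code rather than a measured comparison.

```python
def classify_query(query):
    """Keyword rules copied from classify_query_gemini."""
    q = query.lower()
    if any(x in q for x in ["why", "how", "analyz", "compare", "tradeoff"]):
        return "Analytical"
    if any(x in q for x in ["opinion", "should i", "what do you think"]):
        return "Opinion"
    if any(x in q for x in ["context", "background", "overview", "explain"]):
        return "Contextual"
    return "Factual"


def adaptive_k(query_type, k=4):
    """Per-type k mapping copied from adaptive_retrieval_gemini."""
    return {
        "Factual": max(4, k),
        "Contextual": max(5, k),
        "Analytical": k,
        "Opinion": max(3, k),
    }.get(query_type, k)


query = "What should I do if I am unable to evacuate during an emergency?"
qtype = classify_query(query)
print(qtype, adaptive_k(qtype, k=4))  # Opinion 4

# With k=4, only Contextual queries retrieve a different number of chunks.
for t in ["Factual", "Contextual", "Analytical", "Opinion"]:
    print(t, adaptive_k(t, k=4))
```

With the default `k`, only Contextual queries retrieve a different number of chunks, so a test set that includes a Contextual query, or a smaller `k`, would be needed before the two paths can diverge at the retrieval step.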
