# Contextual Compression for Enhanced RAG Systems
In this notebook, I implement a contextual compression technique to improve our RAG system's efficiency. We'll filter and compress retrieved text chunks to keep only the most relevant parts, reducing noise and improving response quality.

When retrieving documents for RAG, we often get chunks containing both relevant and irrelevant information. Contextual compression helps us:

- Remove irrelevant sentences and paragraphs
- Focus only on query-relevant information
- Maximize the useful signal in our context window

Let's implement this approach from scratch!

## Setting Up the Environment
We begin by importing necessary libraries.

In [1]:
import fitz # PyMuPDF
import os
import numpy as np
import json
import google.generativeai as genai

In [2]:

import fitz
import os
import google.generativeai as genai
from dotenv import load_dotenv


## Extracting Text from a PDF File
To implement RAG, we first need a source of textual data. In this case, we extract text from a PDF file using the PyMuPDF library.

In [3]:
import fitz
from typing import List, Dict

def extract_text_from_pdf(pdf_path: str) -> str:
    """
    Extracts text from a PDF file using PyMuPDF (fitz).

    Args:
        pdf_path (str): Path to the PDF file.

    Returns:
        str: Extracted text from the PDF, or an empty string if an error occurs.
    """
    all_text = ""
    try:
        # Use a context manager to automatically close the document
        with fitz.open(pdf_path) as mypdf:
            # Iterate through each page to extract text
            for page in mypdf:
                all_text += page.get_text("text") + " "
    except Exception as e:
        print(f"Error reading PDF file: {e}")
        return ""
    
    return all_text

## Chunking the Extracted Text
Once we have the extracted text, we divide it into smaller, overlapping chunks to improve retrieval accuracy.

In [4]:
def chunk_text(text, n=1000, overlap=200):
    """
    Chunks the given text into segments of n characters with overlap.

    Args:
    text (str): The text to be chunked.
    n (int): The number of characters in each chunk.
    overlap (int): The number of overlapping characters between chunks.

    Returns:
    List[str]: A list of text chunks.
    """
    chunks = []  # Initialize an empty list to store the chunks
    
    # Loop through the text with a step size of (n - overlap)
    for i in range(0, len(text), n - overlap):
        # Append a chunk of text from index i to i + n to the chunks list
        chunks.append(text[i:i + n])

    return chunks  # Return the list of text chunks

## Building a Simple Vector Store
let's implement a simple vector store since we cannot use FAISS.

In [5]:
class SimpleVectorStore:
    """
    A simple vector store implementation using NumPy.
    """
    def __init__(self):
        """
        Initialize the vector store.
        """
        self.vectors = []  # List to store embedding vectors
        self.texts = []  # List to store original texts
        self.metadata = []  # List to store metadata for each text
    
    def add_item(self, text, embedding, metadata=None):
        """
        Add an item to the vector store.

        Args:
        text (str): The original text.
        embedding (List[float]): The embedding vector.
        metadata (dict, optional): Additional metadata.
        """
        self.vectors.append(np.array(embedding))  # Convert embedding to numpy array and add to vectors list
        self.texts.append(text)  # Add the original text to texts list
        self.metadata.append(metadata or {})  # Add metadata to metadata list, use empty dict if None
    
    def similarity_search(self, query_embedding, k=5):
        """
        Find the most similar items to a query embedding.

        Args:
        query_embedding (List[float]): Query embedding vector.
        k (int): Number of results to return.

        Returns:
        List[Dict]: Top k most similar items with their texts and metadata.
        """
        if not self.vectors:
            return []  # Return empty list if no vectors are stored
        
        # Convert query embedding to numpy array
        query_vector = np.array(query_embedding)
        
        # Calculate similarities using cosine similarity
        similarities = []
        for i, vector in enumerate(self.vectors):
            similarity = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector))
            similarities.append((i, similarity))  # Append index and similarity score
        
        # Sort by similarity (descending)
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # Return top k results
        results = []
        for i in range(min(k, len(similarities))):
            idx, score = similarities[i]
            results.append({
                "text": self.texts[idx],  # Add the text corresponding to the index
                "metadata": self.metadata[idx],  # Add the metadata corresponding to the index
                "similarity": score  # Add the similarity score
            })
        
        return results  # Return the list of top k results

## Embedding Generation

In [7]:
import os
import google.generativeai as genai
from typing import List, Any
import numpy as np

# --- 1. Gemini API Configuration ---
# Your GOOGLE_API_KEY should be set in your environment
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is not set.")
genai.configure(api_key=GOOGLE_API_KEY)

# --- 2. Define the create_embeddings function for Gemini ---
def create_embeddings(text: str or List[str], model: str = "models/embedding-001") -> Any:
    """
    Creates embeddings for the given text or list of texts using the Gemini API.

    Args:
    text (str or List[str]): The input text(s) for which embeddings are to be created.
    model (str): The model to be used for creating embeddings. Default is "models/embedding-001".

    Returns:
    List[float] or List[List[float]]: The embedding vector(s).
    """
    try:
        # The Gemini API can handle both single strings and lists of strings
        response = genai.embed_content(
            model=model,
            content=text
        )
        
        # If the input was a single string, the response has a single embedding.
        # If the input was a list, the response is a list of embeddings.
        return response['embedding']

    except Exception as e:
        print(f"An error occurred during embedding: {e}")
        return []

# --- 3. Main Logic (Re-implemented for a runnable example) ---
if __name__ == "__main__":
    # Example 1: Create an embedding for a single string
    single_text = "Homelessness is a complex social issue."
    embedding = create_embeddings(single_text)
    print(f"Embedding for single text (first 5 values): {embedding[:5]}")
    
    # Example 2: Create embeddings for a list of strings
    list_of_texts = [
        "A lack of affordable housing is a key contributing factor.",
        "Social factors also play a role in homelessness."
    ]
    embeddings_list = create_embeddings(list_of_texts)
    print(f"\nNumber of embeddings for list: {len(embeddings_list)}")
    print(f"First embedding in list (first 5 values): {embeddings_list[0][:5]}")

Embedding for single text (first 5 values): [0.052571062, -0.03685706, -0.06520665, -0.04034025, 0.038206574]

Number of embeddings for list: 2
First embedding in list (first 5 values): [0.07521696, -0.034325134, -0.039195377, -0.008227663, 0.10222888]


## Building Our Document Processing Pipeline

In [8]:
def process_document(pdf_path, chunk_size=1000, chunk_overlap=200):
    """
    Process a document for RAG.

    Args:
    pdf_path (str): Path to the PDF file.
    chunk_size (int): Size of each chunk in characters.
    chunk_overlap (int): Overlap between chunks in characters.

    Returns:
    SimpleVectorStore: A vector store containing document chunks and their embeddings.
    """
    # Extract text from the PDF file
    print("Extracting text from PDF...")
    extracted_text = extract_text_from_pdf('/Users/kekunkoya/Desktop/770 Google /Homelessness.pdf')
    
    # Chunk the extracted text into smaller segments
    print("Chunking text...")
    chunks = chunk_text(extracted_text, chunk_size, chunk_overlap)
    print(f"Created {len(chunks)} text chunks")
    
    # Create embeddings for each text chunk
    print("Creating embeddings for chunks...")
    chunk_embeddings = create_embeddings(chunks)
    
    # Initialize a simple vector store to store the chunks and their embeddings
    store = SimpleVectorStore()
    
    # Add each chunk and its corresponding embedding to the vector store
    for i, (chunk, embedding) in enumerate(zip(chunks, chunk_embeddings)):
        store.add_item(
            text=chunk,
            embedding=embedding,
            metadata={"index": i, "source": pdf_path}
        )
    
    print(f"Added {len(chunks)} chunks to the vector store")
    return store

## Implementing Contextual Compression
This is the core of our approach - we'll use an LLM to filter and compress retrieved content.

In [9]:
import os
import google.generativeai as genai
from typing import Tuple

# --- 1. Gemini API Configuration ---
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is not set.")
genai.configure(api_key=GOOGLE_API_KEY)

# --- 2. The main compression function (revised for Gemini) ---
def compress_chunk(chunk: str, query: str, compression_type: str = "selective", model: str = "gemini-1.5-flash") -> Tuple[str, float]:
    """
    Compress a retrieved chunk by keeping only the parts relevant to the query.
    
    Args:
        chunk (str): Text chunk to compress
        query (str): User query
        compression_type (str): Type of compression ("selective", "summary", or "extraction")
        model (str): LLM model to use
        
    Returns:
        Tuple[str, float]: Compressed chunk and compression ratio.
    """
    if compression_type == "selective":
        system_prompt = """You are an expert at information filtering. Your task is to analyze a document chunk and extract ONLY the sentences or paragraphs that are directly relevant to the user's query. Remove all irrelevant content. Your output should: 1. ONLY include text that helps answer the query 2. Preserve the exact wording of relevant sentences (do not paraphrase) 3. Maintain the original order of the text 4. Include ALL relevant content, even if it seems redundant 5. EXCLUDE any text that isn't relevant to the query. Format your response as plain text with no additional comments."""
    elif compression_type == "summary":
        system_prompt = """You are an expert at summarization. Your task is to create a concise summary of the provided chunk that focuses ONLY on information relevant to the user's query. Your output should: 1. Be brief but comprehensive regarding query-relevant information 2. Focus exclusively on information related to the query 3. Omit irrelevant details 4. Be written in a neutral, factual tone. Format your response as plain text with no additional comments."""
    else:  # extraction
        system_prompt = """You are an expert at information extraction. Your task is to extract ONLY the exact sentences from the document chunk that contain information relevant to answering the user's query. Your output should: 1. Include ONLY direct quotes of relevant sentences from the original text 2. Preserve the original wording (do not modify the text) 3. Include ONLY sentences that directly relate to the query 4. Separate extracted sentences with newlines 5. Do not add any commentary or additional text. Format your response as plain text with no additional comments."""

    user_prompt = f"""Query: {query}\n\nDocument Chunk:\n{chunk}\n\nExtract only the content relevant to answering this query."""
    
    try:
        # Create a Gemini model instance with the system prompt
        gemini_model = genai.GenerativeModel(model, system_instruction=system_prompt)
        
        # Generate the response
        response = gemini_model.generate_content(user_prompt)
        compressed_chunk = response.text.strip()
    except Exception as e:
        print(f"An error occurred during compression: {e}")
        return chunk, 0.0 # Return original chunk with 0 ratio on error

    original_length = len(chunk)
    compressed_length = len(compressed_chunk)
    
    # Avoid division by zero
    compression_ratio = (original_length - compressed_length) / original_length * 100 if original_length > 0 else 0.0
    
    return compressed_chunk, compression_ratio

# --- 3. Main Logic (Re-implemented for a runnable example) ---
if __name__ == "__main__":
    # Simulate a document chunk and query
    sample_chunk = """
    Homelessness is a complex social problem. A key factor is the lack of affordable housing, which
    disproportionately affects low-income families and individuals. The sun is the center of our solar system,
    and Pluto is now considered a dwarf planet. Social factors like family breakdown also contribute.
    """
    query_text = "What factors contribute to homelessness?"

    print("Generating compressed chunk using 'selective' compression...")
    compressed, ratio = compress_chunk(sample_chunk, query_text, compression_type="selective")
    
    print(f"\nOriginal Length: {len(sample_chunk)} characters")
    print(f"Compressed Length: {len(compressed)} characters")
    print(f"Compression Ratio: {ratio:.2f}%")
    print("\nCompressed Chunk:")
    print(compressed)

Generating compressed chunk using 'selective' compression...

Original Length: 318 characters
Compressed Length: 213 characters
Compression Ratio: 33.02%

Compressed Chunk:
Homelessness is a complex social problem. A key factor is the lack of affordable housing, which disproportionately affects low-income families and individuals. Social factors like family breakdown also contribute.


## Implementing Batch Compression
For efficiency, we'll compress multiple chunks in one go when possible.

In [10]:
import os
import google.generativeai as genai
from typing import List, Tuple
from tqdm import tqdm

# --- 1. Gemini API Configuration ---
# Your GOOGLE_API_KEY should be set in your environment
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is not set.")
genai.configure(api_key=GOOGLE_API_KEY)

# --- 2. Helper function (revised for Gemini) ---
def compress_chunk(chunk: str, query: str, compression_type: str = "selective", model: str = "gemini-1.5-flash") -> Tuple[str, float]:
    """
    Compress a retrieved chunk using a Gemini LLM.
    """
    if compression_type == "selective":
        system_prompt = "You are an expert at information filtering. Your task is to extract ONLY the sentences or paragraphs that are directly relevant to the user's query. Preserve the exact wording."
    elif compression_type == "summary":
        system_prompt = "You are an expert at summarization. Your task is to create a concise summary of the provided chunk that focuses ONLY on information relevant to the user's query."
    else: # extraction
        system_prompt = "You are an expert at information extraction. Your task is to extract ONLY the exact sentences from the document chunk that contain information relevant to answering the user's query."
    
    user_prompt = f"Query: {query}\n\nDocument Chunk:\n{chunk}\n\nExtract only the content relevant to answering this query."
    
    try:
        gemini_model = genai.GenerativeModel(model, system_instruction=system_prompt)
        response = gemini_model.generate_content(user_prompt)
        compressed_chunk = response.text.strip()
    except Exception as e:
        print(f"An error occurred during compression: {e}")
        return chunk, 0.0

    original_length = len(chunk)
    compressed_length = len(compressed_chunk)
    compression_ratio = (original_length - compressed_length) / original_length * 100 if original_length > 0 else 0.0
    
    return compressed_chunk, compression_ratio

# --- 3. The main batch compression function (revised for Gemini) ---
def batch_compress_chunks(chunks: List[str], query: str, compression_type: str = "selective", model: str = "gemini-1.5-flash") -> List[Tuple[str, float]]:
    """
    Compress multiple chunks individually.
    """
    print(f"Compressing {len(chunks)} chunks...")
    results = []
    total_original_length = 0
    total_compressed_length = 0
    
    for chunk in tqdm(chunks, desc="Batch Compressing Chunks"):
        compressed_chunk, compression_ratio = compress_chunk(chunk, query, compression_type, model)
        results.append((compressed_chunk, compression_ratio))
        
        total_original_length += len(chunk)
        total_compressed_length += len(compressed_chunk)
    
    overall_ratio = (total_original_length - total_compressed_length) / total_original_length * 100 if total_original_length > 0 else 0.0
    print(f"Overall compression ratio: {overall_ratio:.2f}%")
    
    return results

# --- 4. Main Logic for a runnable example ---
if __name__ == "__main__":
    # Simulate a list of chunks and a query
    sample_chunks = [
        "Homelessness is a complex social problem. A key factor is the lack of affordable housing.",
        "The sun is the star at the center of the solar system. Pluto is considered a dwarf planet.",
        "Social factors like family breakdown, and mental health issues can also lead to homelessness."
    ]
    query_text = "What are the causes of homelessness?"

    # Compress the chunks in a batch
    compressed_results = batch_compress_chunks(sample_chunks, query_text, compression_type="selective")
    
    # Print the results
    for i, (compressed_chunk, ratio) in enumerate(compressed_results):
        print(f"\nResult {i+1}:")
        print(f"Compressed Chunk (Ratio {ratio:.2f}%): {compressed_chunk}")

Compressing 3 chunks...


Batch Compressing Chunks: 100%|██████████| 3/3 [00:01<00:00,  2.04it/s]

Overall compression ratio: 19.85%

Result 1:
Compressed Chunk (Ratio 47.19%): A key factor is the lack of affordable housing.

Result 2:
Compressed Chunk (Ratio 13.33%): There is no information about the causes of homelessness in the provided text.

Result 3:
Compressed Chunk (Ratio 0.00%): Social factors like family breakdown, and mental health issues can also lead to homelessness.





## Response Generation Function

In [11]:
import os
import google.generativeai as genai
from typing import List

# --- 1. Gemini API Configuration ---
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is not set.")
genai.configure(api_key=GOOGLE_API_KEY)

# --- 2. Define the response generator for Gemini ---
def generate_response(query: str, context: str, model: str = "gemini-1.5-flash") -> str:
    """
    Generate a response based on the query and context using Gemini.

    Args:
        query (str): User query
        context (str): Context text from compressed chunks
        model (str): LLM model to use

    Returns:
        str: Generated response
    """
    # Define the system prompt to guide the AI's behavior
    system_prompt = """You are a helpful AI assistant. Answer the user's question based only on the provided context.
If you cannot find the answer in the context, state that you don't have enough information."""
    
    # Create the user prompt by combining the context and the query
    user_prompt = f"""
Context:
{context}

Question: {query}

Please provide a comprehensive answer based only on the context above.
"""
    
    try:
        # Pass the system prompt to the GenerativeModel's system_instruction parameter
        gemini_model = genai.GenerativeModel(model, system_instruction=system_prompt)
        
        # Generate the response using the specified model
        response = gemini_model.generate_content(user_prompt)
        
        # Return the generated response content
        return response.text
    except Exception as e:
        print(f"An error occurred during response generation: {e}")
        return "I could not generate a response due to an error."

# --- 3. Main Logic (Re-implemented for a runnable example) ---
if __name__ == "__main__":
    # Simulate a query and context from a previous step
    query = "What are the main causes of homelessness?"
    context = "Homelessness is a complex social problem. A key factor is the lack of affordable housing, which disproportionately affects low-income families and individuals."
    
    print("Generating AI response with Gemini...")
    ai_response = generate_response(query, context)
    
    print("\nAI Response:")
    print(ai_response)

Generating AI response with Gemini...

AI Response:
Based on the provided text, a key factor contributing to homelessness is the lack of affordable housing.  This affects low-income families and individuals disproportionately.



## The Complete RAG Pipeline with Contextual Compression

In [12]:
import os
import fitz
import numpy as np
import google.generativeai as genai
from typing import List, Tuple, Dict, Any
from tqdm import tqdm

# --- 1. Gemini API Configuration and Helper Functions ---
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is not set.")
genai.configure(api_key=GOOGLE_API_KEY)

def extract_text_from_pdf(pdf_path: str) -> str:
    """A placeholder for your PDF extraction function."""
    all_text = []
    try:
        with fitz.open(pdf_path) as doc:
            for page in doc:
                all_text.append(page.get_text("text"))
    except Exception as e:
        print(f"Error reading PDF: {e}")
        return ""
    return "\n".join(all_text)

def chunk_text(text: str, chunk_size: int, overlap: int) -> List[str]:
    """A placeholder for your text chunking function."""
    chunks = []
    step = chunk_size - overlap
    for i in range(0, len(text), step):
        chunks.append(text[i:i + chunk_size])
    return chunks

def create_embeddings(texts: str or List[str], model: str = "models/embedding-001") -> Any:
    """Creates embeddings for a list of texts using the Gemini API."""
    try:
        response = genai.embed_content(model=model, content=texts)
        # Gemini returns a list of embeddings, even for a single text.
        return response['embedding']
    except Exception as e:
        print(f"Embedding error: {e}")
        return []

class SimpleVectorStore:
    """A placeholder for your vector store class."""
    def __init__(self):
        self.vectors = []
        self.documents = []
        self.metadata = []

    def add_item(self, text, embedding, metadata=None):
        self.vectors.append(np.array(embedding))
        self.documents.append(text)
        self.metadata.append(metadata or {})
    
    def similarity_search(self, query_embedding, k=5):
        if not self.vectors: return []
        
        query_vector = np.array(query_embedding)
        similarities = []
        for i, vector in enumerate(self.vectors):
            if np.linalg.norm(query_vector) == 0 or np.linalg.norm(vector) == 0:
                similarity = 0
            else:
                similarity = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector))
            similarities.append({"text": self.documents[i], "score": similarity, "metadata": self.metadata[i]})
        similarities.sort(key=lambda x: x["score"], reverse=True)
        return similarities[:k]

def batch_compress_chunks(chunks: List[str], query: str, compression_type: str, model: str) -> List[Tuple[str, float]]:
    """A placeholder for your batch chunk compression function."""
    compressed_results = []
    for chunk in chunks:
        # Here's where the LLM call happens inside your function
        # This is the part that would be Gemini-compatible
        compressed_text = f"Compressed: '{chunk[:50]}...'"
        ratio = 50.0
        compressed_results.append((compressed_text, ratio))
    return compressed_results

def generate_response(query: str, context: str, model: str = "gemini-1.5-flash") -> str:
    """Generate a response based on the query and context using Gemini."""
    system_prompt = "You are a helpful AI assistant. Answer the user's question based only on the provided context. If the context doesn't contain the answer, say so clearly."
    user_prompt = f"Context:\n{context}\n\nQuestion: {query}"
    
    try:
        gemini_model = genai.GenerativeModel(model, system_instruction=system_prompt)
        response = gemini_model.generate_content(user_prompt)
        return response.text
    except Exception as e:
        print(f"An error occurred during response generation: {e}")
        return "I could not generate a response due to an error."

def process_document(pdf_path: str, chunk_size: int = 800) -> SimpleVectorStore:
    """Process a document to create chunks and a vector store."""
    text = extract_text_from_pdf(pdf_path)
    chunks = chunk_text(text, chunk_size, 0)
    chunk_embeddings = create_embeddings(chunks)
    
    vector_store = SimpleVectorStore()
    metadata = [{"index": i, "source": pdf_path} for i in range(len(chunks))]
    
    for chunk, embedding, meta in zip(chunks, chunk_embeddings, metadata):
        vector_store.add_item(chunk, embedding, meta)
    
    return vector_store

# --- 2. Your original `rag_with_compression` function ---
def rag_with_compression(pdf_path: str, query: str, k: int = 10, compression_type: str = "selective", model: str = "gemini-1.5-flash") -> Dict:
    """
    Complete RAG pipeline with contextual compression.
    
    Args:
        pdf_path (str): Path to PDF document
        query (str): User query
        k (int): Number of chunks to retrieve initially
        compression_type (str): Type of compression
        model (str): LLM model to use
        
    Returns:
        dict: Results including query, compressed chunks, and response
    """
    print("\n=== RAG WITH CONTEXTUAL COMPRESSION ===")
    print(f"Query: {query}")
    print(f"Compression type: {compression_type}")
    
    # Process the document to extract text, chunk it, and create embeddings
    vector_store = process_document(pdf_path)
    
    # Create an embedding for the query
    query_embedding = create_embeddings(query)
    
    # Retrieve the top k most similar chunks based on the query embedding
    print(f"Retrieving top {k} chunks...")
    results = vector_store.similarity_search(query_embedding, k=k)
    retrieved_chunks = [result["text"] for result in results]
    
    # Apply compression to the retrieved chunks
    compressed_results = batch_compress_chunks(retrieved_chunks, query, compression_type, model)
    compressed_chunks = [result[0] for result in compressed_results]
    compression_ratios = [result[1] for result in compressed_results]
    
    # Filter out any empty compressed chunks
    filtered_chunks = [(chunk, ratio) for chunk, ratio in zip(compressed_chunks, compression_ratios) if chunk.strip()]
    
    if not filtered_chunks:
        print("Warning: All chunks were compressed to empty strings. Using original chunks.")
        filtered_chunks = [(chunk, 0.0) for chunk in retrieved_chunks]
    
    compressed_chunks, compression_ratios = zip(*filtered_chunks)
    
    # Generate context from the compressed chunks
    context = "\n\n---\n\n".join(compressed_chunks)
    
    # Generate a response based on the compressed chunks
    print("Generating response based on compressed chunks...")
    response = generate_response(query, context, model)
    
    result = {
        "query": query,
        "original_chunks": retrieved_chunks,
        "compressed_chunks": compressed_chunks,
        "compression_ratios": compression_ratios,
        "context_length_reduction": f"{sum(compression_ratios)/len(compression_ratios):.2f}%",
        "response": response
    }
    
    print("\n=== RESPONSE ===")
    print(response)
    
    return result

# --- 3. Main Logic for a runnable example ---
if __name__ == "__main__":
    pdf_file_path = '/Users/kekunkoya/Desktop/ISEM 770 Class Project/Homelessness.pdf'
    if not os.path.exists(pdf_file_path):
        print(f"Error: PDF file not found at '{pdf_file_path}'")
        exit()
    
    user_query = "What are the main contributing factors to homelessness?"
    rag_with_compression(pdf_file_path, user_query, compression_type="selective")


=== RAG WITH CONTEXTUAL COMPRESSION ===
Query: What are the main contributing factors to homelessness?
Compression type: selective
Retrieving top 10 chunks...
Generating response based on compressed chunks...

=== RESPONSE ===
The provided text does not contain information about the main contributing factors to homelessness.



## Comparing RAG With and Without Compression
Let's create a function to compare standard RAG with our compression-enhanced version:



In [14]:
import os
import fitz
import numpy as np
import google.generativeai as genai
from typing import List, Tuple, Dict, Any
from tqdm import tqdm


def extract_text_from_pdf(pdf_path: str) -> str:
    """Extracts text from a PDF file."""
    all_text = []
    try:
        with fitz.open(pdf_path) as doc:
            for page in doc:
                all_text.append(page.get_text("text"))
    except Exception as e:
        print(f"Error reading PDF: {e}")
        return ""
    return "\n".join(all_text)

def chunk_text(text: str, chunk_size: int, overlap: int) -> List[str]:
    """Chunks the given text into segments."""
    chunks = []
    step = chunk_size - overlap
    for i in range(0, len(text), step):
        chunks.append(text[i:i + chunk_size])
    return chunks

def create_embeddings(texts: str or List[str], model: str = "models/embedding-001") -> Any:
    """Creates embeddings for a list of texts using the Gemini API."""
    try:
        if not texts:
            return []
        # The embed_content endpoint takes a list of strings
        response = genai.embed_content(model=model, content=texts)
        return response['embedding']
    except Exception as e:
        print(f"Embedding error: {e}")
        return []

class SimpleVectorStore:
    """A simple vector store implementation using NumPy."""
    def __init__(self):
        self.vectors = []
        self.documents = []
        self.metadata = []

    def add_documents(self, documents: List[str], vectors: List[Any], metadata: List[Dict]):
        for doc, vec, meta in zip(documents, vectors, metadata):
            self.documents.append(doc)
            self.vectors.append(np.array(vec, dtype=np.float32))
            self.metadata.append(meta)

    def similarity_search(self, query_embedding: Any, top_k: int = 5) -> List[Dict]:
        query_array = np.array(query_embedding, dtype=np.float32)
        similarities = []
        for i, vector in enumerate(self.vectors):
            if np.linalg.norm(query_array) == 0 or np.linalg.norm(vector) == 0:
                similarity = 0
            else:
                similarity = np.dot(query_array, vector) / (np.linalg.norm(query_array) * np.linalg.norm(vector))
            similarities.append({"document": self.documents[i], "score": similarity, "metadata": self.metadata[i]})
        similarities.sort(key=lambda x: x["score"], reverse=True)
        return similarities[:top_k]

def process_document(pdf_path: str, chunk_size: int) -> SimpleVectorStore:
    """Process a document to create chunks and a vector store."""
    text = extract_text_from_pdf(pdf_path)
    chunks = chunk_text(text, chunk_size, 0)
    chunk_embeddings = create_embeddings(chunks)
    if not chunk_embeddings or len(chunks) != len(chunk_embeddings):
        raise RuntimeError("Failed to create embeddings or embedding count mismatch.")
    
    vector_store = SimpleVectorStore()
    metadata = [{"index": i, "source": pdf_path} for i in range(len(chunks))]
    vector_store.add_documents(chunks, chunk_embeddings, metadata)
    
    return vector_store

def generate_response(query: str, context: str, model: str = "gemini-1.5-flash") -> str:
    """Generate a response based on the query and context using Gemini."""
    system_prompt = "You are a helpful assistant that answers questions based on the provided context. If the context doesn't contain the answer, say so clearly."
    user_prompt = f"Context:\n{context}\n\nQuestion: {query}"
    
    try:
        gemini_model = genai.GenerativeModel(model, system_instruction=system_prompt)
        response = gemini_model.generate_content(user_prompt)
        return response.text
    except Exception as e:
        print(f"An error occurred during response generation: {e}")
        return "I could not generate a response due to an error."

# --- 2. Your original `standard_rag` function ---
def standard_rag(pdf_path: str, query: str, k: int = 10, model: str = "gemini-1.5-flash") -> Dict:
    """
    Standard RAG without compression.
    """
    print("\n=== STANDARD RAG ===")
    print(f"Query: {query}")
    
    vector_store = process_document(pdf_path, chunk_size=800)
    
    print("Creating query embedding and retrieving chunks...")
    query_embedding = create_embeddings(query)
    
    results = vector_store.similarity_search(query_embedding, top_k=k)
    retrieved_chunks = [result["document"] for result in results]
    
    context = "\n\n---\n\n".join(retrieved_chunks)
    
    print("Generating response...")
    response = generate_response(query, context, model)
    
    result = {
        "query": query,
        "chunks": retrieved_chunks,
        "response": response
    }
    
    print("\n=== RESPONSE ===")
    print(response)
    
    return result

# --- 3. Main Logic for a runnable example ---
if __name__ == "__main__":
    pdf_file_path = '/Users/kekunkoya/Desktop/ISEM 770 Class Project/Homelessness.pdf'
    if not os.path.exists(pdf_file_path):
        print(f"Error: PDF file not found at '{pdf_file_path}'")
        exit()
    
    user_query = "What are the key contributing factors to homelessness?"
    standard_rag(pdf_file_path, user_query)




=== STANDARD RAG ===
Query: What are the key contributing factors to homelessness?
Creating query embedding and retrieving chunks...
Generating response...

=== RESPONSE ===
This document discusses methods for measuring and understanding homelessness,  including the types of homelessness (long-term, episodic, etc.) and data collection strategies, but it does not identify the key contributing factors to homelessness.



## Evaluating Our Approach
Now, let's implement a function to evaluate and compare the responses:

In [15]:
def evaluate_compression(pdf_path, query, reference_answer=None, compression_types=["selective", "summary", "extraction"]):
    """
    Compare different compression techniques with standard RAG.
    
    Args:
        pdf_path (str): Path to PDF document
        query (str): User query
        reference_answer (str): Optional reference answer
        compression_types (List[str]): Compression types to evaluate
        
    Returns:
        dict: Evaluation results
    """
    print("\n=== EVALUATING CONTEXTUAL COMPRESSION ===")
    print(f"Query: {query}")
    
    # Run standard RAG without compression
    standard_result = standard_rag(pdf_path, query)
    
    # Dictionary to store results of different compression techniques
    compression_results = {}
    
    # Run RAG with each compression technique
    for comp_type in compression_types:
        print(f"\nTesting {comp_type} compression...")
        compression_results[comp_type] = rag_with_compression(pdf_path, query, compression_type=comp_type)
    
    # Gather responses for evaluation
    responses = {
        "standard": standard_result["response"]
    }
    for comp_type in compression_types:
        responses[comp_type] = compression_results[comp_type]["response"]
    
    # Evaluate responses if a reference answer is provided
    if reference_answer:
        evaluation = evaluate_responses(query, responses, reference_answer)
        print("\n=== EVALUATION RESULTS ===")
        print(evaluation)
    else:
        evaluation = "No reference answer provided for evaluation."
    
    # Calculate metrics for each compression type
    metrics = {}
    for comp_type in compression_types:
        metrics[comp_type] = {
            "avg_compression_ratio": f"{sum(compression_results[comp_type]['compression_ratios'])/len(compression_results[comp_type]['compression_ratios']):.2f}%",
            "total_context_length": len("\n\n".join(compression_results[comp_type]['compressed_chunks'])),
            "original_context_length": len("\n\n".join(standard_result['chunks']))
        }
    
    # Return the evaluation results, responses, and metrics
    return {
        "query": query,
        "responses": responses,
        "evaluation": evaluation,
        "metrics": metrics,
        "standard_result": standard_result,
        "compression_results": compression_results
    }

## Running Our Complete System (Custom Query)

In [16]:
import os
import json
import fitz
import google.generativeai as genai
from typing import List, Dict, Any

# Initialize Gemini client
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is not set.")
genai.configure(api_key=GOOGLE_API_KEY)

def extract_text_from_pdf(pdf_path: str) -> str:
    """Extracts text from a PDF file."""
    doc = fitz.open(pdf_path)
    text_chunks = [page.get_text("text") for page in doc]
    doc.close()
    return "\n".join(text_chunks)

def compress_text(full_text: str, method: str) -> str:
    """Stub for text compression."""
    length = len(full_text)
    if method == "selective":
        return full_text[:500]
    elif method == "summary":
        start = max(0, (length - 500) // 2)
        return full_text[start:start+500]
    elif method == "extraction":
        return full_text[-500:]
    else:
        return full_text

def run_rag_pipeline(context: str, query: str) -> str:
    """
    Stub for RAG pipeline.
    Returns a dummy answer. Replace with actual retrieval and generation steps.
    """
    return f"Dummy RAG answer for query: '{query}' based on provided context."

def evaluate_compression(pdf_path: str,
                         query: str,
                         reference_answer: str,
                         compression_types: list[str]) -> Dict[str, str]:
    """
    Applies each compression method, runs a dummy RAG, and evaluates against a reference using Gemini.
    """
    results = {}
    full_text = extract_text_from_pdf(pdf_path)
    
    for ctype in compression_types:
        compressed_ctx = compress_text(full_text, method=ctype)
        rag_answer = run_rag_pipeline(compressed_ctx, query)
        
        # If no reference, store raw answer
        if not reference_answer.strip():
            results[ctype] = rag_answer
            continue
        
        eval_prompt = (
            f"Compression type: {ctype}\n\n"
            f"Question: {query}\n\n"
            f"Context:\n{compressed_ctx}\n\n"
            f"RAG Answer: {rag_answer}\n\n"
            f"Reference Answer: {reference_answer}\n\n"
            "Evaluate for faithfulness and relevance. Provide details."
        )
        
        try:
            # Create a Gemini model instance with the system prompt
            resp = genai.GenerativeModel(
                "gemini-1.5-flash",
                system_instruction="You are an objective evaluator."
            ).generate_content(eval_prompt)
            results[ctype] = resp.text
        except Exception as e:
            print(f"An error occurred during evaluation for {ctype}: {e}")
            results[ctype] = "Evaluation failed due to an error."
    
    return results

# Usage example
pdf_path = "/Users/kekunkoya/Desktop/ISEM 770 Class Project/AI_Information.pdf"
query = "What are typical policy objectives in European homelessness strategies?"
reference_answer = """
Section Setting concrete targets
listing of subsidized housing in the newspapers
prevention reduction of homelessness through predictive AI
supportive programs through workplace, hospitals and churches
"""
compression_types = ["selective", "summary", "extraction"]

results = evaluate_compression(
    pdf_path=pdf_path,
    query=query,
    reference_answer=reference_answer,
    compression_types=compression_types
)

for ctype, eval_text in results.items():
    print(f"\n--- {ctype.upper()} EVALUATION ---\n{eval_text}")


--- SELECTIVE EVALUATION ---
**Faithfulness:** The RAG answer is completely unfaithful to the provided context.  The context is an introduction to artificial intelligence, containing no information whatsoever about European homelessness strategies or policy objectives. The RAG answer hallucinates information entirely unrelated to the input.

**Relevance:** The RAG answer is completely irrelevant to the provided context. The question about European homelessness strategies is unrelated to the text about artificial intelligence.  The generated answer, even if factually correct (which it may not be), is entirely out of scope.


**Overall:** The performance of the RAG system is extremely poor in this instance.  It demonstrates a complete failure to appropriately leverage the provided context and instead generates a fabricated response.  The lack of faithfulness and relevance indicates a critical flaw in the system's ability to perform its intended function.


--- SUMMARY EVALUATION ---
The

## Visualizing Compression Results

In [17]:
def visualize_compression_results(evaluation_results):
    """
    Visualize the results of different compression techniques.
    
    Args:
        evaluation_results (Dict): Results from evaluate_compression function
    """
    # Extract the query and standard chunks from the evaluation results
    query = evaluation_results["query"]
    standard_chunks = evaluation_results["standard_result"]["chunks"]
    
    # Print the query
    print(f"Query: {query}")
    print("\n" + "="*80 + "\n")
    
    # Get a sample chunk to visualize (using the first chunk)
    original_chunk = standard_chunks[0]
    
    # Iterate over each compression type and show a comparison
    for comp_type in evaluation_results["compression_results"].keys():
        compressed_chunks = evaluation_results["compression_results"][comp_type]["compressed_chunks"]
        compression_ratios = evaluation_results["compression_results"][comp_type]["compression_ratios"]
        
        # Get the corresponding compressed chunk and its compression ratio
        compressed_chunk = compressed_chunks[0]
        compression_ratio = compression_ratios[0]
        
        print(f"\n=== {comp_type.upper()} COMPRESSION EXAMPLE ===\n")
        
        # Show the original chunk (truncated if too long)
        print("ORIGINAL CHUNK:")
        print("-" * 40)
        if len(original_chunk) > 800:
            print(original_chunk[:800] + "... [truncated]")
        else:
            print(original_chunk)
        print("-" * 40)
        print(f"Length: {len(original_chunk)} characters\n")
        
        # Show the compressed chunk
        print("COMPRESSED CHUNK:")
        print("-" * 40)
        print(compressed_chunk)
        print("-" * 40)
        print(f"Length: {len(compressed_chunk)} characters")
        print(f"Compression ratio: {compression_ratio:.2f}%\n")
        
        # Show overall statistics for this compression type
        avg_ratio = sum(compression_ratios) / len(compression_ratios)
        print(f"Average compression across all chunks: {avg_ratio:.2f}%")
        print(f"Total context length reduction: {evaluation_results['metrics'][comp_type]['avg_compression_ratio']}")
        print("=" * 80)
    
    # Show a summary table of compression techniques
    print("\n=== COMPRESSION SUMMARY ===\n")
    print(f"{'Technique':<15} {'Avg Ratio':<15} {'Context Length':<15} {'Original Length':<15}")
    print("-" * 60)
    
    # Print the metrics for each compression type
    for comp_type, metrics in evaluation_results["metrics"].items():
        print(f"{comp_type:<15} {metrics['avg_compression_ratio']:<15} {metrics['total_context_length']:<15} {metrics['original_context_length']:<15}")

In [18]:
def visualize_compression_results(evaluation_results: dict[str, str]) -> None:
    """
    Print out each compression type alongside its evaluation.
    
    Args:
        evaluation_results: dict mapping compression_type -> evaluation text
    """
    for method, evaluation in evaluation_results.items():
        print(f"--- {method.upper()} ---")
        print(evaluation)
        print()

# Now call it:
visualize_compression_results(results)


--- SELECTIVE ---
**Faithfulness:** The RAG answer is completely unfaithful to the provided context.  The context is an introduction to artificial intelligence, containing no information whatsoever about European homelessness strategies or policy objectives. The RAG answer hallucinates information entirely unrelated to the input.

**Relevance:** The RAG answer is completely irrelevant to the provided context. The question about European homelessness strategies is unrelated to the text about artificial intelligence.  The generated answer, even if factually correct (which it may not be), is entirely out of scope.


**Overall:** The performance of the RAG system is extremely poor in this instance.  It demonstrates a complete failure to appropriately leverage the provided context and instead generates a fabricated response.  The lack of faithfulness and relevance indicates a critical flaw in the system's ability to perform its intended function.


--- SUMMARY ---
The RAG answer is complete