In [1]:
import fitz
import os
import numpy as np
import json
import re
from openai import OpenAI
import torch
from sentence_transformers import SentenceTransformer, SimilarityFunction
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the pre-trained model
embedder = SentenceTransformer("all-MiniLM-L6-v2") # Fazer o embedding usando esse

client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY")
)

In [3]:
def extract_text_from_pdf(pdf_path):
    """
    Extracts text from a PDF file and prints the first `num_chars` characters.

    Args:
    pdf_path (str): Path to the PDF file.

    Returns:
    str: Extracted text from the PDF.
    """
    # Open the PDF file
    mypdf = fitz.open(pdf_path)
    all_text = ""  # Initialize an empty string to store the extracted text

    # Iterate through each page in the PDF
    for page_num in range(mypdf.page_count):
        page = mypdf[page_num]  # Get the page
        text = page.get_text("text")  # Extract text from the page
        all_text += text  # Append the extracted text to the all_text string

    return all_text  # Return the extracted text

In [4]:
def chunk_text(text, chunk_size, overlap):
    """
    Chunks the given text into segments of n characters with overlap.

    Args:
    text (str): The text to be chunked.
    n (int): The number of characters in each chunk.
    overlap (int): The number of overlapping characters between chunks.

    Returns:
    List[str]: A list of text chunks.
    """
    chunks = []

    for i in range(0, len(text), chunk_size-overlap):
        chunks.append(text[i:i+chunk_size])
    return chunks

In [5]:
def create_embeddings(text):
    response = embedder.encode(text)
    return response

In [6]:
class SimpleVectorStore:
    """
    A lightweight vector store implementation using NumPy.
    """
    def __init__(self, dimension=1536):
        """
        Initialize the vector store.
        
        Args:
            dimension (int): Dimension of embeddings
        """
        self.dimension = dimension
        self.vectors = []
        self.documents = []
        self.metadata = []
    
    def add_documents(self, documents, vectors=None, metadata=None):
        """
        Add documents to the vector store.
        
        Args:
            documents (List[str]): List of document chunks
            vectors (List[List[float]], optional): List of embedding vectors
            metadata (List[Dict], optional): List of metadata dictionaries
        """
        if vectors is None:
            vectors = [None] * len(documents)
        
        if metadata is None:
            metadata = [{} for _ in range(len(documents))]
        
        for doc, vec, meta in zip(documents, vectors, metadata):
            self.documents.append(doc)
            self.vectors.append(vec)
            self.metadata.append(meta)
    
    def similarity_search(self, query_vector, k=5):
        """
        Search for most similar documents.
        
        Args:
            query_vector (List[float]): Query embedding vector
            k (int): Number of results to return
            
        Returns:
            List[Dict]: List of results with documents, scores, and metadata
        """
        if not self.vectors or not self.documents:
            return []
        
        # Convert query vector to numpy array
        query_array = np.array(query_vector)
        
        # Calculate similarities
        similarities = []
        for i, vector in enumerate(self.vectors):
            if vector is not None:
                # Compute cosine similarity
                similarity = np.dot(query_array, vector) / (
                    np.linalg.norm(query_array) * np.linalg.norm(vector)
                )
                similarities.append((i, similarity))
        
        # Sort by similarity (descending)
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # Get top-k results
        results = []
        for i, score in similarities[:k]:
            results.append({
                "document": self.documents[i],
                "score": float(score),
                "metadata": self.metadata[i]
            })
        
        return results

In [7]:
def process_document(pdf_path, chunk_size=800):
    """
    Process a document for use with RSE.
    
    Args:
        pdf_path (str): Path to the PDF document
        chunk_size (int): Size of each chunk in characters
        
    Returns:
        Tuple[List[str], SimpleVectorStore, Dict]: Chunks, vector store, and document info
    """
    # Extract text from the PDF file
    text = extract_text_from_pdf(pdf_path)
    
    # Chunk the extracted text into non-overlapping segments
    chunks = chunk_text(text, chunk_size=chunk_size, overlap=0)
    
    # Generate embeddings for the text chunks
    chunk_embeddings = create_embeddings(chunks)
    
    # Create an instance of the SimpleVectorStore
    vector_store = SimpleVectorStore()
    
    # Add documents with metadata (including chunk index for later reconstruction)
    metadata = [{"chunk_index": i, "source": pdf_path} for i in range(len(chunks))]
    vector_store.add_documents(chunks, chunk_embeddings, metadata)
    
    # Track original document structure for segment reconstruction
    doc_info = {
        "chunks": chunks,
        "source": pdf_path,
    }
    
    return vector_store

In [8]:
def reconstruct_segments(chunks, best_segments):
    """
    Reconstruct text segments based on chunk indices.
    
    Args:
        chunks (List[str]): List of all document chunks
        best_segments (List[Tuple[int, int]]): List of (start, end) indices for segments
        
    Returns:
        List[str]: List of reconstructed text segments
    """
    reconstructed_segments = []  # Initialize an empty list to store the reconstructed segments
    
    for start, end in best_segments:
        # Join the chunks in this segment to form the complete segment text
        segment_text = " ".join(chunks[start:end])
        # Append the segment text and its range to the reconstructed_segments list
        reconstructed_segments.append({
            "text": segment_text,
            "segment_range": (start, end),
        })
    
    return reconstructed_segments  # Return the list of reconstructed text segments

In [9]:
def compress_chunk(chunk, query, compression_type="selective", model="gpt-3.5-turbo-1106"):
    """
    Compress a retrieved chunk by keeping only the parts relevant to the query.
    
    Args:
        chunk (str): Text chunk to compress
        query (str): User query
        compression_type (str): Type of compression ("selective", "summary", or "extraction")
        model (str): LLM model to use
        
    Returns:
        str: Compressed chunk
    """
    # Define system prompts for different compression approaches
    if compression_type == "selective":
        system_prompt = """You are an expert at information filtering. 
        Your task is to analyze a document chunk and extract ONLY the sentences or paragraphs that are directly 
        relevant to the user's query. Remove all irrelevant content.

        Your output should:
        1. ONLY include text that helps answer the query
        2. Preserve the exact wording of relevant sentences (do not paraphrase)
        3. Maintain the original order of the text
        4. Include ALL relevant content, even if it seems redundant
        5. EXCLUDE any text that isn't relevant to the query

        Format your response as plain text with no additional comments."""
    elif compression_type == "summary":
        system_prompt = """You are an expert at summarization. 
        Your task is to create a concise summary of the provided chunk that focuses ONLY on 
        information relevant to the user's query.

        Your output should:
        1. Be brief but comprehensive regarding query-relevant information
        2. Focus exclusively on information related to the query
        3. Omit irrelevant details
        4. Be written in a neutral, factual tone

        Format your response as plain text with no additional comments."""
    else:  # extraction
        system_prompt = """You are an expert at information extraction.
        Your task is to extract ONLY the exact sentences from the document chunk that contain information relevant 
        to answering the user's query.

        Your output should:
        1. Include ONLY direct quotes of relevant sentences from the original text
        2. Preserve the original wording (do not modify the text)
        3. Include ONLY sentences that directly relate to the query
        4. Separate extracted sentences with newlines
        5. Do not add any commentary or additional text

        Format your response as plain text with no additional comments."""

    # Define the user prompt with the query and document chunk
    user_prompt = f"""
        Query: {query}

        Document Chunk:
        {chunk}

        Extract only the content relevant to answering this query.
    """
    
    # Generate a response using the OpenAI API
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0
    )
    
    # Extract the compressed chunk from the response
    compressed_chunk = response.choices[0].message.content.strip()
    
    # Calculate compression ratio
    original_length = len(chunk)
    compressed_length = len(compressed_chunk)
    compression_ratio = (original_length - compressed_length) / original_length * 100
    
    return compressed_chunk, compression_ratio

In [10]:
def batch_compress_chunks(chunks, query, compression_type="selective", model="gpt-3.5-turbo-1106"):
    """
    Compress multiple chunks individually.
    
    Args:
        chunks (List[str]): List of text chunks to compress
        query (str): User query
        compression_type (str): Type of compression ("selective", "summary", or "extraction")
        model (str): LLM model to use
        
    Returns:
        List[Tuple[str, float]]: List of compressed chunks with compression ratios
    """
    results = []  # Initialize an empty list to store the results
    total_original_length = 0  # Initialize a variable to store the total original length of chunks
    total_compressed_length = 0  # Initialize a variable to store the total compressed length of chunks
    
    # Iterate over each chunk
    for i, chunk in enumerate(chunks):
        # Compress the chunk and get the compressed chunk and compression ratio
        compressed_chunk, compression_ratio = compress_chunk(chunk, query, compression_type, model)
        results.append((compressed_chunk, compression_ratio))  # Append the result to the results list
        
        total_original_length += len(chunk)  # Add the length of the original chunk to the total original length
        total_compressed_length += len(compressed_chunk)  # Add the length of the compressed chunk to the total compressed length
    
    # Calculate the overall compression ratio
    overall_ratio = (total_original_length - total_compressed_length) / total_original_length * 100
    
    return results  # Return the list of compressed chunks with compression ratios

In [11]:
# Define the path to the PDF file
pdf_path = "AI_Information.pdf"

# Process the document (extract text, create chunks, generate questions, build vector store)
vector_store = process_document(
    pdf_path, 
    chunk_size=1000)

In [12]:
def semantic_search(query, vector_store, k=5):
    """
    Performs semantic search using the query and vector store.

    Args:
    query (str): The search query.
    vector_store (SimpleVectorStore): The vector store to search in.
    k (int): Number of results to return.

    Returns:
    List[Dict]: Top k most relevant items.
    """
    # Create embedding for the query
    query_embedding_response = create_embeddings(query)
    query_embedding = query_embedding_response
    
    # Search the vector store
    results = vector_store.similarity_search(query_embedding, k=k)
    
    return results

In [13]:
def generate_response(query, context, model="gpt-3.5-turbo-1106"):
    """
    Generates a response based on the query and context.

    Args:
    query (str): User's question.
    context (str): Context information retrieved from the vector store.
    model (str): Model to use for response generation.

    Returns:
    str: Generated response.
    """
    system_prompt = "You are an AI assistant that strictly answers based on the given context. If the answer cannot be derived directly from the provided context, respond with: 'I do not have enough information to answer that.'"
    
    user_prompt = f"""
        Context:
        {context}

        Question: {query}

        Please answer the question based only on the context provided above. Be concise and accurate.
    """
    
    response = client.chat.completions.create(
        model=model,
        temperature=0,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]
    )
    
    return response.choices[0].message.content

In [14]:
def rag_with_compression(pdf_path, query, k=10, compression_type="selective", model="gpt-3.5-turbo-1106"):
    """
    Complete RAG pipeline with contextual compression.
    
    Args:
        pdf_path (str): Path to PDF document
        query (str): User query
        k (int): Number of chunks to retrieve initially
        compression_type (str): Type of compression
        model (str): LLM model to use
        
    Returns:
        dict: Results including query, compressed chunks, and response
    """
    
    # Process the document to extract text, chunk it, and create embeddings
    vector_store = process_document(pdf_path)
    
    # Create an embedding for the query
    query_embedding = create_embeddings(query)
    
    # Retrieve the top k most similar chunks based on the query embedding
    results = vector_store.similarity_search(query_embedding, k=k)
    retrieved_chunks = [result["document"] for result in results]
    
    # Apply compression to the retrieved chunks
    compressed_results = batch_compress_chunks(retrieved_chunks, query, compression_type, model)
    compressed_chunks = [result[0] for result in compressed_results]
    compression_ratios = [result[1] for result in compressed_results]
    
    # Filter out any empty compressed chunks
    filtered_chunks = [(chunk, ratio) for chunk, ratio in zip(compressed_chunks, compression_ratios) if chunk.strip()]
    
    if not filtered_chunks:
        # If all chunks are compressed to empty strings, use the original chunks
        filtered_chunks = [(chunk, 0.0) for chunk in retrieved_chunks]
    else:
        compressed_chunks, compression_ratios = zip(*filtered_chunks)
    
    # Generate context from the compressed chunks
    context = "\n\n---\n\n".join(compressed_chunks)
    
    # Generate a response based on the compressed chunks
    response = generate_response(query, context, model)
    
    # Prepare the result dictionary
    result = {
        "query": query,
        "original_chunks": retrieved_chunks,
        "compressed_chunks": compressed_chunks,
        "compression_ratios": compression_ratios,
        "context_length_reduction": f"{sum(compression_ratios)/len(compression_ratios):.2f}%",
        "response": response
    }
    
    
    return result

In [15]:
extracted_text = extract_text_from_pdf(pdf_path)
# Chunk the extracted text into non-overlapping segments
chunks = chunk_text(extracted_text, chunk_size=1000, overlap=100)

In [16]:
import json
import asyncio
import numpy as np
import pandas as pd

# Initialize the SentenceTransformer model and set the similarity function
model = SentenceTransformer("all-MiniLM-L6-v2")
model.similarity_fn_name = SimilarityFunction.DOT

In [17]:
async def process_validation_data(k, compression_type):
    system_prompt = """You are an AI assistant that strictly answers based on the given context. If the answer cannot be derived directly and exactly from the provided context, respond with: 'I do not have enough information to answer that.'
    First think about the keywords from the question and then use them to elaborate the answer.
    The response needs to be just the answer sentence
    
    """
    # Load the validation data from the JSON file
    with open('val.json') as f:
        data = json.load(f)

    # List to store the results for each sample
    results = []

    # Iterate over each example in the validation data
    for idx, item in enumerate(data):
        query = item['question']
        ideal_answer = item['ideal_answer']
        
        # Retrieve the top k most relevant context chunks
                
        # Generate the AI response using the system prompt and the user prompt
        result = rag_with_compression(pdf_path, query, k, compression_type)
        ai_response = result['response']
        # Evaluate similarity using SentenceTransformer
        # Encode the AI response and ideal answer
        embedding_response = model.encode([ai_response])
        embedding_ideal = model.encode([ideal_answer])
        # Compute similarity score (result is a 1x1 matrix; extract the single value)
        similarity_matrix = model.similarity(embedding_response, embedding_ideal)
        score = similarity_matrix[0][0].numpy()
        
        # Prepare the result dictionary with dynamic context columns
        result = {
            "Query": query,
            "Ideal Answer": ideal_answer,
            "AI Response": ai_response,
            "Score": score
        }
        
        # Append the result to the list
        results.append(result)

    # Create a DataFrame from the results
    df = pd.DataFrame(results)
    return df

In [18]:
import nest_asyncio
nest_asyncio.apply()

In [19]:
for k in range(1, 9):
    result_df = await process_validation_data(k= k, compression_type= "compression")
    print(result_df['Score'].mean())

0.5974777221679688
0.4966762542724609
0.4966740608215332
0.5844135284423828
0.5023880004882812
0.5558583259582519
0.42058844566345216
0.48969259262084963


In [None]:
for k in range(1, 9):
    result_df = await process_validation_data(k= k, compression_type= "extraction")
    print(result_df['Score'].mean())

In [None]:
for k in range(1, 9):
    result_df = await process_validation_data(k= k, compression_type= "summary")
    print(result_df['Score'].mean())