**A RAG Ensemble Pipeline Implementation**  

**Overview**  
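This notebook combines the following RAG (Retrieval-Augmented Generation) techniques into one pipeline:
- Query Rewriting
- LLM-based Reranking (an earlier cross-encoder reranker is kept below, commented out)
- Context Retrieval (each retrieved chunk is stitched together with its neighboring chunks)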

This pipeline uses:
- LangChain
- FAISS (Facebook AI Similarity Search)
- OpenAI embeddings
- GPT-4o-mini API

**Preprocessing**  
I preprocessed my grandfather's memoir, "My Life Story", into 10 PDFs, one per chapter. Each PDF was read with Fitz (PyMuPDF) into a continuous string and split into overlapping chunks stored as LangChain Document objects. Each chunk's metadata records its source file and a global index; the index is later used to retrieve a chunk's neighbors as context.
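The chunking described above is a fixed-size sliding window: each chunk starts `chunk_size - chunk_overlap` characters after the previous one, and a running index continues across files. The sketch below reproduces that windowing in plain Python on toy strings so the indices are easy to check. The `Chunk` class, the file names, and the extra `start` metadata field are assumptions for illustration only; the notebook itself uses LangChain `Document` objects and stores only `index` and `source`.

```python
from dataclasses import dataclass, field


@dataclass
class Chunk:
    """Hypothetical stand-in for a LangChain Document: text plus a metadata dict."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def chunk_texts(texts_by_source, chunk_size, chunk_overlap):
    """Split each (source, text) pair into overlapping windows with a global running index."""
    if not 0 <= chunk_overlap < chunk_size:
        # A non-positive step would make the window loop run forever.
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    index = 0  # keeps counting across all sources, like the notebook's global index
    for source, content in texts_by_source:
        for start in range(0, len(content), step):
            chunks.append(Chunk(content[start:start + chunk_size],
                                {"index": index, "source": source, "start": start}))
            index += 1
    return chunks


chunks = chunk_texts([("ch01.pdf", "abcdefghij"), ("ch02.pdf", "klmnop")],
                     chunk_size=4, chunk_overlap=2)
for c in chunks:
    print(c.metadata["index"], c.metadata["source"], repr(c.page_content))
```

With `chunk_size=4` and `chunk_overlap=2`, the ten-character first file yields windows starting at 0, 2, 4, 6 and 8 (indices 0 to 4), and the second file continues at index 5. The last window of each toy file is only two characters long; short tail windows like these matter when neighbors are stitched back together.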

In [7]:
import os
import re
import fitz
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain.docstore.document import Document
from sentence_transformers import CrossEncoder
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings
from langchain.prompts import PromptTemplate
from langchain_community.vectorstores import FAISS

In [2]:
# Load from .env file that contains the OpenAI API key
load_dotenv() 

# Get OpenAI API key from .env file
openai_api_key = os.getenv("OPENAI_API_KEY")

In [212]:
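# Make a sorted list of the PDF paths. os.listdir() returns files in arbitrary order,
# and sorting keeps the global chunk index (and therefore neighbor lookups) deterministic
pdf_dir = os.path.join(os.getcwd(), "RAG Eval", "pdfs")
paths = sorted(os.path.join(pdf_dir, file) for file in os.listdir(pdf_dir) if file.lower().endswith(".pdf"))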

In [213]:
def encode_pdfs(paths, chunk_size, chunk_overlap):
    """
    Preprocesses PDFs using Fitz, splits the text into overlapping chunks, and encodes the
    chunks into a FAISS vector store using OpenAI embeddings, saving each chunk's source
    file and global index as metadata.

    Args:
        paths: A list of paths to the PDF files.
        chunk_size: The desired size of each text chunk, in characters.
        chunk_overlap: The number of characters shared by consecutive chunks.

    Returns:
        A FAISS vector store containing the encoded chunks with source and index metadata.
    """

    # A step of chunk_size - chunk_overlap <= 0 would make the chunking loop below never end
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")

    all_texts = []

    index = 0  # Global index that keeps counting across all files

    for path in paths:
        # Open the PDF (the context manager closes it afterwards) and join
        # the text of every page into one continuous string
        with fitz.open(path) as doc:
            content = "".join(page.get_text() for page in doc)
        # Divide the content into chunks of specified size with overlap.
        chunks = []
        start = 0
        while start < len(content):
            end = start + chunk_size
            chunk = content[start:end]
            # Wrap the chunk in a LangChain Document
            chunks.append(Document(page_content=chunk))
            # The start position is incremented by the chunk size minus the overlap to ensure consecutive chunks overlap.
            start += chunk_size - chunk_overlap
        # Extract file name from path
        file_name = os.path.basename(path)
        
        # Record each chunk's global index and source file in its metadata
        for chunk in chunks:
            chunk.metadata.update({
                "index": index,
                "source": file_name
            })
            index += 1  # Increment the global index for each chunk

        all_texts.extend(chunks)

    # Create embeddings
    embeddings = OpenAIEmbeddings()

    # Create vector store
    vectorstore = FAISS.from_documents(all_texts, embeddings)

    return vectorstore

In [214]:
chunk_size = 300
chunk_overlap = 200
# Encode the PDFs
chunks_vector_store = encode_pdfs(paths, chunk_size, chunk_overlap)

In [3]:
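# Save the vector store (run once after encoding). save_local() writes a folder
# containing index.faiss and index.pkl, despite the ".json" in the name
#chunks_vector_store.save_local("my_life_story_ensemble.json")

# Load the vector store. allow_dangerous_deserialization is required because the
# docstore is stored with pickle, so only load files you created yourself
chunks_vector_store = FAISS.load_local("my_life_story_ensemble.json", OpenAIEmbeddings(), allow_dangerous_deserialization=True)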

In [4]:
def get_chunk_by_index(vectorstore, target_index):
    """
    Retrieve a chunk from the vectorstore based on its index in the metadata. Called by
    get_context().
    
    Args:
    vectorstore (FAISS): The vectorstore containing the chunks.
    target_index (int): The index of the chunk to retrieve.
    
    Returns:
    Optional[Document]: The retrieved chunk as a Document object, or None if not found.
    """
    # Scan the stored documents directly instead of calling similarity_search(""),
    # which embeds an empty query through the OpenAI API on every call
    for doc_id in vectorstore.index_to_docstore_id.values():
        doc = vectorstore.docstore.search(doc_id)
        # The in-memory docstore returns an error string when an ID is missing
        if isinstance(doc, str):
            continue
        if doc.metadata.get('index') == target_index:
            return doc
            
    # If not found, return None
    return None
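`get_chunk_by_index()` scans every stored chunk on each call, and `get_context()` calls it once per neighbor (nine times per reranked chunk with the default `num_neighbors=4`). A minimal sketch of an alternative, assuming the chunks can be listed once after the vector store is loaded: build a dictionary keyed by the metadata `index` and look neighbors up in constant time. The `Doc` class and both helper names are hypothetical stand-ins, not part of LangChain; in the notebook the lookup would be built once and reused across queries.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Optional


@dataclass
class Doc:
    """Hypothetical stand-in for a LangChain Document (text plus metadata)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def build_index_lookup(docs: Iterable[Doc]) -> Dict[int, Doc]:
    """Map each chunk's metadata 'index' to its document in one pass."""
    return {d.metadata["index"]: d for d in docs if "index" in d.metadata}


def get_chunk_by_index_cached(lookup: Dict[int, Doc], target_index: int) -> Optional[Doc]:
    """Constant-time lookup; returns None when no chunk has that index."""
    return lookup.get(target_index)


docs = [Doc("first chunk", {"index": 0}), Doc("second chunk", {"index": 1}), Doc("no index")]
lookup = build_index_lookup(docs)
print(get_chunk_by_index_cached(lookup, 1).page_content)  # second chunk
print(get_chunk_by_index_cached(lookup, 7))               # None
```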

In [25]:
def rewrite_query(original_query):
    """
    Rewrites the original query to improve retrieval.
    
    Args:
    original_query (str): The original user query
    
    Returns:
    str: The rewritten query
    """
    re_write_llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini", max_tokens=4000)

    # Create a prompt template for query rewriting
    query_rewrite_template = """You are an AI assistant tasked with reformulating user queries to improve retrieval in a RAG system. 
    The following query is a question pertaining to George Shambaugh's life. Reword the same question in 3 very concise ways, using 
    examples of first-person as if George is asking himself and third-person as if someone else is asking about him.

    Original query: {original_query}

    Rewritten query:"""

    query_rewrite_prompt = PromptTemplate(
        input_variables=["original_query"],
        template=query_rewrite_template
    )

    # Chain the prompt and the model with the LCEL pipe operator
    query_rewriter = query_rewrite_prompt | re_write_llm
    
    response = query_rewriter.invoke({"original_query": original_query})
    return response.content
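`rewrite_query()` returns all three rewrites as one numbered string, and that whole string is passed to `similarity_search()` as a single query. If you wanted to retrieve for each rewrite separately (a multi-query variant that this notebook does not use), the numbered lines could be split first. `split_rewrites()` is a hypothetical helper written for illustration; it assumes the model keeps the `1.`/`2.`/`3.` numbering seen in the sample output below.

```python
import re
from typing import List

# Matches lines such as "1. text" or "2) text" and captures the text after the number.
_NUMBERED_LINE = re.compile(r"^\s*\d+[.)]\s*(.+?)\s*$")


def split_rewrites(response_text: str) -> List[str]:
    """Split a numbered list of rewritten queries into separate query strings."""
    queries = []
    for line in response_text.splitlines():
        match = _NUMBERED_LINE.match(line)
        if match:
            queries.append(match.group(1))
    # Fall back to the whole response when no numbered lines are found.
    return queries or [response_text.strip()]


sample = (
    "1. What's an example of my mom's sayings, and what do I call them?  \n"
    "2. Can you provide an example of George's mom's sayings and what he refers to them as?  \n"
    "3. What did he call his mom's sayings, and can you give an example?  "
)
for query in split_rewrites(sample):
    print(query)
```

Each returned query could then be sent to `similarity_search()` on its own, with the results de-duplicated by their `index` metadata before reranking.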

In [15]:
#LLM-based Reranker
class RatingScore(BaseModel):
    """
    Represents a rating score for a document's relevance to a query.
    
    Attributes:
    relevance_score (float): The relevance score of a document to a query.
    """
    relevance_score: float = Field(..., description="The relevance score of a document to a query.")

def reranker(new_query, chunks_vector_store, top_n: int = 3):
    """
    Reranks documents based on their relevance to a new query using an LLM model.
    
    Args:
    new_query (str): The new query to search for relevant documents.
    chunks_vector_store: The vector store to query.
    top_n (int, optional): The number of top-ranked documents to return. Defaults to 3.
    
    Returns:
    List[Document]: A list of documents reranked by their relevance to the new query.
    """
    # Retrieve initial documents based on the new query
    docs = chunks_vector_store.similarity_search(new_query, k=10)

    # Define a prompt template for the LLM to rate document relevance
    prompt_template = PromptTemplate(
        input_variables=["query", "doc"],
        template= """On a scale of 1-10, rate the relevance of the following chunk from 
        George Shambaugh's memoir to the query. Consider the specific context and intent 
        of the query, not just keyword matches.
        Query: {query}
        Document: {doc}
        Relevance Score:"""
    )
    
    # Initialize the LLM model for rating document relevance
    llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini", max_tokens=4000)
    llm_chain = prompt_template | llm.with_structured_output(RatingScore)
    
    # Score each document based on its relevance to the new query
    scored_docs = []
    for doc in docs:
        input_data = {"query": new_query, "doc": doc.page_content}
        try:
            # The structured-output call is what can fail (unparseable output, or None
            # when the model returns no structured result), so it sits inside the try
            score = float(llm_chain.invoke(input_data).relevance_score)
        except (ValueError, AttributeError):
            score = 0  # Default score if parsing fails
        scored_docs.append((doc, score))
    
    # Sort documents by their relevance scores in descending order
    reranked_docs = sorted(scored_docs, key=lambda x: x[1], reverse=True)
    # Return the top N reranked documents
    return [doc for doc, _ in reranked_docs[:top_n]]
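The reranker's control flow (score every candidate, fall back to 0 when scoring fails, sort, keep `top_n`) can be checked without calling the API. In this sketch a stub scoring function stands in for the LLM; `rerank()`, `fake_scores`, and `fake_llm_score()` are hypothetical names used only here.

```python
from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def rerank(items: Sequence[T], score_fn: Callable[[T], object], top_n: int = 3) -> List[T]:
    """Score each item, use 0 when scoring fails, and return the top_n highest-scoring items."""
    scored: List[Tuple[T, float]] = []
    for item in items:
        try:
            score = float(score_fn(item))
        except (ValueError, TypeError, KeyError):
            score = 0.0  # stands in for an unparseable model response
        scored.append((item, score))
    # list.sort is stable, so equal scores keep their original retrieval order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [item for item, _ in scored[:top_n]]


# Stub scores standing in for LLM ratings; "c" simulates an unparseable response.
fake_scores = {"a": 3, "b": 9, "c": "not a number", "d": 7}


def fake_llm_score(doc):
    return fake_scores[doc]


print(rerank(["a", "b", "c", "d"], fake_llm_score, top_n=2))  # ['b', 'd']
```

Because Python's sort is stable, candidates with equal LLM scores keep the order `similarity_search()` returned them in, so ties fall back to embedding similarity.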

In [None]:
# Cross-encoder reranker (not used in the latest ensemble; kept inside a string so it does not override the LLM reranker above)
"""def reranker(new_query, chunks_vector_store, rerank_top_k = 3):
    Retrieve and rerank documents based on the query using a cross-encoder model.

    #Args:
    #query (str): The query to search for relevant documents.
    #chunks_vector_store: The vector store to query.

    #Returns:
    #List[str]: A list of documents reranked by their relevance to the query

    cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    # Initial retrieval
    initial_docs = chunks_vector_store.similarity_search(new_query, k=10)

    # Prepare pairs for cross-encoder
    pairs = [[new_query, doc.page_content] for doc in initial_docs]

    # Get cross-encoder scores
    scores = cross_encoder.predict(pairs)

    # Sort documents by score and include index metadata
    scored_docs = sorted(zip(initial_docs, scores), key=lambda x: x[1], reverse=True)

    # Return top reranked documents with their index metadata
    return [doc for doc, _ in scored_docs[:rerank_top_k]]"""

In [9]:
def get_context(chunks_vector_store, reranked_chunks, num_neighbors: int = 4, chunk_overlap: int = 200):
    """
    Retrieves the context (surrounding chunks) of each reranked chunk
    and concatenates them together, accounting for overlap.

    Args:
        chunks_vector_store: The vector store to query.
        reranked_chunks: The reranked chunks to retrieve the context for.
        num_neighbors: The number of neighboring chunks to retrieve.
        chunk_overlap: The amount of overlap between neighboring chunks.

    Returns:
        A list of context sequences for the reranked chunks.
    """
    
    result_sequences = []

    for chunk in reranked_chunks:
        current_index = chunk.metadata.get('index', None)
        if current_index is None:
            continue

        # Determine the range of chunks to retrieve
        start_index = max(0, current_index - num_neighbors)
        end_index = current_index + num_neighbors + 1  # +1 because range is exclusive at the end

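        # Retrieve all chunks in the range, skipping neighbors from a different PDF
        # so that text from adjacent chapters is not stitched together
        neighbor_chunks = []
        for i in range(start_index, end_index):
            neighbor_chunk = get_chunk_by_index(chunks_vector_store, i)
            if neighbor_chunk and neighbor_chunk.metadata.get('source') == chunk.metadata.get('source'):
                neighbor_chunks.append(neighbor_chunk)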

        # Check if neighbor_chunks is empty
        if not neighbor_chunks:
            continue  # Skip to the next chunk if no neighbors found

        # Concatenate chunks, accounting for overlap
        concatenated_text = neighbor_chunks[0].page_content
        for i in range(1, len(neighbor_chunks)):
            current_chunk = neighbor_chunks[i].page_content
            overlap_start = max(0, len(concatenated_text) - chunk_overlap)
            concatenated_text = concatenated_text[:overlap_start] + current_chunk

        result_sequences.append(concatenated_text)

    return result_sequences
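`get_context()` stitches neighbors by always trimming the last `chunk_overlap` characters before appending the next chunk. That is exact for full-size consecutive chunks, but the final chunks of each PDF are shorter than `chunk_size`, and there the fixed trim can cut text that the next chunk does not repeat. A minimal sketch of an offset-based alternative, assuming each chunk's character `start` offset were stored in its metadata (the notebook does not currently store it): keep the stitched text up to where the next chunk begins, then append that chunk. `stitch_by_offset()` and `stitch_fixed_overlap()` are hypothetical helpers; the second mirrors the notebook's rule for comparison.

```python
from typing import List, Tuple


def stitch_fixed_overlap(texts: List[str], chunk_overlap: int) -> str:
    """Mirror of get_context(): drop the last chunk_overlap characters, then append."""
    text = texts[0]
    for chunk_text in texts[1:]:
        text = text[: max(0, len(text) - chunk_overlap)] + chunk_text
    return text


def stitch_by_offset(chunks: List[Tuple[int, str]]) -> str:
    """Join (start_offset, text) pairs from one source, ordered by start_offset."""
    if not chunks:
        return ""
    base = chunks[0][0]
    text = ""
    for start, chunk_text in chunks:
        # Keep the stitched text up to where this chunk begins, then append the chunk.
        text = text[: start - base] + chunk_text
    return text


# Toy document of 1,050 characters chunked like the notebook (size 300, overlap 200).
content = "".join(chr(ord("a") + i % 26) for i in range(1050))
chunk_size, chunk_overlap = 300, 200
windows = [(s, content[s:s + chunk_size])
           for s in range(0, len(content), chunk_size - chunk_overlap)]

head = windows[:5]   # full-size chunks starting at 0..400
tail = windows[6:]   # chunks starting at 600..1000; the last three are shorter than 300

print(stitch_fixed_overlap([t for _, t in head], chunk_overlap) == content[:700])  # True
print(stitch_fixed_overlap([t for _, t in tail], chunk_overlap) == content[600:])  # False
print(stitch_by_offset(tail) == content[600:])                                      # True
```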

In [10]:
class QuestionAnswerFromContext(BaseModel):
    """
    Model to generate an answer to a query based on a given context.
    
    Attributes:
        answer_based_on_content (str): The generated answer and citation based on the context.
    """
    answer_based_on_content: str = Field(description="Generates an answer and [citation] to a query based on a given context.")
    
def create_question_answer_from_context_chain(llm):
    """Build a chain that answers a question from retrieved context with structured output."""
    # Use the language model passed in by the caller
    question_answer_from_context_llm = llm

    # Define the prompt template for answering from the retrieved context
    question_answer_prompt_template = """ 
    You are querying a memoir called "My Life Story" written by George Shambaugh.
    For the question below, provide a concise but sufficient answer. If you don't know, only write "The RAG retrieval was unable to provide sufficient context":
    {context}
    Question:
    {question}
    """

    # Create a PromptTemplate object with the specified template and input variables
    question_answer_from_context_prompt = PromptTemplate(
        template=question_answer_prompt_template,
        input_variables=["context", "question"],
    )

    # Create a chain by combining the prompt template and the language model
    question_answer_from_context_cot_chain = question_answer_from_context_prompt | question_answer_from_context_llm.with_structured_output(
        QuestionAnswerFromContext)
    return question_answer_from_context_cot_chain

In [11]:
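def answer_question_from_context(question, context, question_answer_from_context_chain):
    """
    Answer a question using the given context by invoking the question-answering chain.

    Args:
        question: The question to be answered.
        context: The context to be used for answering the question (a list of strings or a single string).
        question_answer_from_context_chain: The chain built by create_question_answer_from_context_chain().

    Returns:
        A dictionary containing the answer, context, and question.
    """
    input_data = {
        "question": question,
        # Join a list of passages into one block so the prompt does not show a Python list repr
        "context": "\n\n".join(context) if isinstance(context, list) else context
    }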
    output = question_answer_from_context_chain.invoke(input_data)
    answer = output.answer_based_on_content
    return {"answer": answer, "context": context, "question": question}
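`get_context()` returns a list of strings, and the `QuestionAnswerFromContext` field asks the model for an answer with a "[citation]". A small sketch of one way to render that list as a single prompt block with numbered labels, so the model has something concrete to cite. `format_context()` is a hypothetical helper, not part of the notebook or LangChain.

```python
from typing import Sequence


def format_context(passages: Sequence[str]) -> str:
    """Render retrieved passages as one block, each prefixed with a [n] label."""
    return "\n\n".join(f"[{i}] {p.strip()}" for i, p in enumerate(passages, start=1))


print(format_context(["First passage. ", "Second passage."]))
```

The labeled string could be passed as the `context` input in place of the raw list; the prompt would then need to tell the model to cite passages by their `[n]` label.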

In [12]:
def show_context(context):
    """
    Display the contents of the provided context list.

    Args:
        context (list): A list of context items to be displayed.

    Prints each context item in the list with a heading indicating its position.
    """
    for i, c in enumerate(context):
        print(f"Context {i + 1}:")
        print(c)
        print("\n")

In [23]:
def test_RAG(original_query):
    """
    Test the Retrieval-Augmented Generation (RAG) process with a given query. Prints the original
    and enhanced queries, the generated answer, and the context chunks retrieved from the vector store.

    Args:
        original_query (str): The query to be tested against the vector store created from my grandfather's memoir.

    Returns:
        None. The answer is printed rather than returned.
    """
    # Rewrite the original query to enhance its retrieval capabilities
    new_query = rewrite_query(original_query)
    # Rerank chunks from the vector store based on the enhanced query
    reranked_chunks = reranker(new_query, chunks_vector_store)
    # Extract context from the reranked chunks
    context = get_context(chunks_vector_store, reranked_chunks)
    # Initialize the language model with specific parameters
    llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini", max_tokens=2000)
    # Create a chain for question answering from context
    question_answer_from_context_chain = create_question_answer_from_context_chain(llm)
    # Answer the question using the context and the chain
    answer = answer_question_from_context(original_query, context, question_answer_from_context_chain)
    # Print the original query, enhanced query, and the response
    print(f"Original Query: {original_query}\nEnhanced Query: {new_query}\n")
    print("Response:", answer["answer"], "\n")
    # Display the context chunks
    show_context(context)
    

In [26]:
test_RAG("What is an example of one of his mom's sayings, and what did he refer to these sayings as?")

Original Query: What is an example of one of his mom's sayings, and what did he refer to these sayings as?
Enhanced Query: 1. What’s an example of my mom's sayings, and what do I call them?  
2. Can you provide an example of George's mom's sayings and what he refers to them as?  
3. What did he call his mom's sayings, and can you give an example?  

Response: One example of one of his mom's sayings is, "I told you not to climb that tree! When you fall out and break both your legs, don’t come running into me!" He referred to these sayings as "Momisms." 

Context 1:
ter if I were late. I tried to be careful carrying out 
the full pan of water, but accidents happen even to the best of us, not to mention us Klutzes. 
 
Early in life at Republic Avenue we boys were playing cowboys and cattle rustlers. Since I was oldest, I 
was the Head Cowboy. Since Chub was next oldest, he was the Cattle Rustler. Paul and Bob were part 
of my posse. After yelling and chasing Chub until we caught him, we d