# Custom RAG Pipeline w/ History Implementation

This notebook adds chat history to our RAG pipeline, moving from the built-in history-aware RAG retrievers to custom chains composed with the "|" operator. Building the retrieval step ourselves gives us better control over context injection: we can inject smaller chunks together with their metadata.

In [None]:
import os
import faiss
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_ollama import OllamaLLM

In [None]:
MODEL_NAME = "llama3.2"
llm = OllamaLLM(model= MODEL_NAME)

In [None]:
def load_docs(pdf_folder = "./pdf_folder"):
    """Recursively collect the paths of all PDF files under *pdf_folder*.

    Args:
        pdf_folder: Directory to search; sub-directories are searched too.

    Returns:
        A list of paths (each joined onto the directory it was found in) for
        every file whose name ends in ``.pdf``, case-insensitively. Paths are
        returned in a deterministic order: directories are visited top-down
        in sorted order and file names are sorted within each directory.
        The list is empty when the folder holds no PDFs or does not exist.
    """
    pdf_paths = []

    for root, dirs, files in os.walk(pdf_folder):
        # os.walk yields entries in arbitrary, filesystem-dependent order.
        # Sorting `dirs` in place fixes the order of the remaining traversal,
        # so anything derived positionally from this list is reproducible.
        dirs.sort()
        for file in sorted(files):
            if file.lower().endswith(".pdf"):
                pdf_paths.append(os.path.join(root, file))

    return pdf_paths

In [None]:
document_loader = load_docs()
document_loader

In [None]:
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
model.save("./local_models/all-MiniLM-L6-v2")

In [None]:
embedding_model ="./local_models/all-MiniLM-L6-v2" #embedding matrix model

def embed_splitting(document_loader, embedding_model):
    """Load every PDF, split the pages into chunks, and build the embedding model.

    Args:
        document_loader: Iterable of PDF file paths.
        embedding_model: Model identifier/path passed to HuggingFaceEmbeddings.

    Returns:
        A ``(embeddings, splits)`` tuple: the HuggingFaceEmbeddings instance
        (configured to normalize its embeddings) and the list of chunked
        documents produced by the text splitter.
    """
    embeddings = HuggingFaceEmbeddings(
        model=embedding_model,
        encode_kwargs={'normalize_embeddings': True},
    )

    pages = []
    for pdf_path in document_loader:
        loaded_pages = PyPDFLoader(pdf_path).load()

        # Keep only the file name as the source, not the full path.
        for page_doc in loaded_pages:
            page_doc.metadata["source"] = os.path.basename(pdf_path)

        pages += loaded_pages

    # Chunk size and overlap are the values handed to the tiktoken-based splitter.
    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=400,
        chunk_overlap=64,
    )

    return embeddings, splitter.split_documents(pages)

In [None]:
import os
import pickle

SPLITS_CACHE_PATH = "splits_cache.pkl"

def get_splits(document_loader, embedding_model):
    """Return ``(embeddings, splits)``, reusing pickled splits when a cache file exists.

    On a cache miss the splits are computed with ``embed_splitting`` and then
    pickled to ``SPLITS_CACHE_PATH``. On a cache hit the splits are unpickled
    and only the embedding model is constructed.

    NOTE(review): the cache is used whenever the file exists, regardless of
    ``document_loader``; delete the file after adding or changing PDFs.
    NOTE(review): ``pickle.load`` executes arbitrary code from the file it
    reads; only use a cache file this notebook wrote itself.
    """
    if not os.path.exists(SPLITS_CACHE_PATH):
        print("Creating new splits...")
        embeddings, splits = embed_splitting(document_loader, embedding_model)
        with open(SPLITS_CACHE_PATH, "wb") as cache_file:
            pickle.dump(splits, cache_file)
        return embeddings, splits

    print("Loading cached splits from disk...")
    with open(SPLITS_CACHE_PATH, "rb") as cache_file:
        splits = pickle.load(cache_file)
    embeddings = HuggingFaceEmbeddings(
        model=embedding_model,
        encode_kwargs={'normalize_embeddings': True},
    )
    return embeddings, splits

In [None]:
embeddings, splits = get_splits(document_loader, embedding_model)

In [None]:
example_split = splits[106]
example_split

In [None]:
metadata = example_split.metadata
for key in metadata:
    print(f"{key}: {metadata[key]}")

In [None]:
embeddings

In [None]:
len(splits)

In [None]:
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS

In [None]:
# Embed a throwaway sentence only to learn the embedding dimensionality,
# then create an empty flat L2 FAISS index of that size.
# NOTE(review): both are computed even when the index is loaded from disk
# below, in which case `index` is not used by this cell.
dim = len(embeddings.embed_query("test sentence"))
index = faiss.IndexFlatL2(dim)

# Reuse a saved index when the "faiss_index" path exists; otherwise build one
# from `splits` and save it for the next run.
# NOTE(review): the saved index is reused whenever the path exists, whatever
# `splits` currently contains; remove it after the documents change.
if os.path.exists("faiss_index"):
    print("Loading FAISS index from disk...")
    # allow_dangerous_deserialization=True opts in to loading pickled data;
    # only point this at an index directory this notebook created.
    vector_store = FAISS.load_local("faiss_index", embeddings=embeddings, allow_dangerous_deserialization=True)
else:
    print("Building FAISS index from scratch...")
    vector_store = FAISS(
        embedding_function=embeddings,
        index=index,
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
    )
    vector_store.add_documents(splits)
    vector_store.save_local("faiss_index")

In [None]:
# create the retriever object once
semantic_retriever = vector_store.as_retriever(search_kwargs={'k': 4})

# define your function to query it
def semantic_search(retriever_obj, input_context: str):
    """Query *retriever_obj* with *input_context* and return what its ``invoke`` returns."""
    hits = retriever_obj.invoke(input_context)
    return hits

# call the function with retriever and query string
results = semantic_search(semantic_retriever, "Explain transformers")

In [None]:
results[0].metadata

In [None]:
# Print the source file, page and text of each semantic hit, numbered from 1.
for i, hit in enumerate(results):
    source_data = hit.metadata["source"]
    page = hit.metadata["page"]
    page_content = hit.page_content

    print(f"This is chunk number {i+1}.\n\n The source is {source_data}, found on page number {page}. \n\n The page content is {page_content} \n")

In [None]:
from langchain_community.retrievers import BM25Retriever

bm25_retriever = BM25Retriever.from_documents(splits)
bm25_retriever.k = 4

def keyword_search(retriever_obj, input_context: str):
    """Query *retriever_obj* with *input_context* and return what its ``invoke`` returns."""
    matches = retriever_obj.invoke(input_context)
    return matches

In [None]:
keyword_results = keyword_search(bm25_retriever, "Explain transformers")

In [None]:
keyword_results 

In [None]:
# Print the source file, page and text of each keyword (BM25) hit, numbered from 1.
for i, hit in enumerate(keyword_results):
    source_data = hit.metadata["source"]
    page = hit.metadata["page"]
    page_content = hit.page_content

    print(f"This is chunk number {i+1}.\n\n The source is {source_data}, found on page number {page}. \n\n The page content is {page_content} \n")

In [None]:
from langchain.retrievers import EnsembleRetriever

ensemble_retriever = EnsembleRetriever(retrievers= [semantic_retriever, bm25_retriever], weights = [0.5, 0.5])

In [None]:
combined_results = ensemble_retriever.invoke("Explain transformers")

In [None]:
combined_results

In [None]:
for i in combined_results:

    print(i.metadata)

In [None]:
# Keep only the first chunk retrieved for each distinct page, preserving the
# retriever's ranking order.
#
# A page is identified by (source file, page label). The label alone is not
# unique once several PDFs are indexed, so keying on it would drop pages that
# merely share a label with an earlier hit from a different document.
result_list = []
seen_pages = set()

# Iterating instead of seeding from combined_results[0] means an empty
# retrieval yields an empty result_list rather than an IndexError.
for doc in combined_results:
    page_key = (doc.metadata["source"], doc.metadata["page_label"])
    if page_key in seen_pages:
        continue
    seen_pages.add(page_key)
    result_list.append(doc)

In [None]:
result_list

In [None]:
input_template = """You are an expert assistant answering based only on the provided context.

The retrieved documents have been joined together and are separated by "Chunk_n", where n is the chunk number. Here is the context:

{context}

Use **ALL** relevant information above to answer the question below. If the answer isn't found in the chunks, say:
"I cannot answer this question because the necessary information was not found in the provided documents."

❗Do not cite or mention any source files or page numbers in the body of your answer.

At the end of your answer, add a single line in this format:

Information was pulled from: <source_file_1>: pages <comma-separated page numbers>; <source_file_2>: pages <...>; ...

Use only one entry per document, listing all unique page numbers where information was pulled from.
Do not mention metadata_n, chunk_n, or include references in the main answer.

Metadata:
{metadata}

Question: {question}
"""

In [None]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableLambda, RunnableParallel

def content_parser(results):
    """Join the text of the retrieved documents into one labelled context string.

    Each document's ``page_content`` is placed under a ``Chunk_n:`` heading,
    with n counting from 1 in the order given. Returns an empty string when
    *results* is empty.
    """
    labelled_chunks = [
        f"\nChunk_{number}:\n\n{document.page_content}\n\n\n"
        for number, document in enumerate(results, start=1)
    ]
    return "".join(labelled_chunks)

chunks = content_parser(result_list)

In [None]:
chunk_runnable = RunnableLambda(content_parser)

In [None]:
def metadata_parser(results):
    """Summarise which pages of which source files the retrieved documents came from.

    Reads ``metadata["source"]`` and ``metadata["page_label"]`` from every
    document, groups the unique page labels per source in first-seen order,
    and returns a header line followed by one ``<source>, pages <a, b, ...>``
    entry per source, separated by blank lines.
    """
    pages_by_source = {}

    for document in results:
        info = document.metadata
        listed_pages = pages_by_source.setdefault(info["source"], [])
        label = info["page_label"]
        if label not in listed_pages:
            listed_pages.append(label)

    sections = ["This file uses the following sources:"]
    for source_name, labels in pages_by_source.items():
        joined_labels = ", ".join(map(str, labels))
        sections.append(f"{source_name}, pages {joined_labels}")

    return "\n\n".join(sections)

In [None]:
metadata_runnable = RunnableLambda(metadata_parser)
print(metadata_runnable.invoke(result_list))

In [None]:
metadata = metadata_parser(result_list)

In [None]:
print(metadata)

In [None]:
from embed_splitting import load_docs, get_splits

In [None]:
# Answering prompt. The system message is `input_template`, which expects
# {context}, {metadata} and {question}; it is followed by the prior chat
# messages and finally the user's message from {input}. Callers therefore
# supply the query under both 'question' and 'input'.
prompt_template = ChatPromptTemplate(
    [
        ("system", input_template),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)

# Feed the rendered prompt straight into the LLM.
chain = prompt_template | llm

In [None]:
contextualize_q_system_prompt = """
You are a question reformulator in a retrieval-based QA system.

Given the latest user question and the preceding chat history, your task is:

1. If the question is fully self-contained — i.e., it is grammatically and semantically complete and understandable on its own — return it **exactly as-is**.

2. If the question is ambiguous without the chat history or depends on previous turns, rewrite it into a fully standalone, self-contained question.

⚠️ Do NOT answer the question.  
⚠️ Do NOT add any preamble, commentary, or extra explanation.  
⚠️ Output **only** the final question text (either original or reformulated).

Your job is to produce a single, context-independent question if needed — nothing else.
"""

# Question-rewriting prompt: the reformulation instructions as the system
# message, then the prior chat messages, then the user's latest message.
# It expects 'chat_history' and 'input'.
contextual_prompt = ChatPromptTemplate(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)


In [None]:
answer1 = chain.invoke(
    {'context': chunks,
     'metadata': metadata,
     'chat_history': [],
     'input': "Explain transformers",
     'question': "Explain transformers"}
)

In [None]:
answer1

In [None]:
a = RunnableParallel({'context': chunk_runnable, 'metadata': metadata_runnable})
b = a.invoke(result_list)

In [None]:
b

In [None]:
inputs = ({
    **b,
    'chat_history': [],
    'input': 'Explain transformers',
    'question': 'Explain transformers'
})

In [None]:
inputs

In [None]:
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

# In-memory registry of chat histories, keyed by session id.
store = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history for *session_id*, creating an empty one on first use."""
    try:
        return store[session_id]
    except KeyError:
        fresh_history = ChatMessageHistory()
        store[session_id] = fresh_history
        return fresh_history

In [None]:
# Collect the metadata of every hybrid-retrieval hit, keyed by its 0-based rank.
answers = ensemble_retriever.invoke("Explain transformers")

metadata_dict = {}
for i, answer in enumerate(answers):
    metadata_dict[f"metadata {i}"] = answer.metadata

metadata_dict

In [None]:
# Wrap the answering chain with per-session message history. Histories are
# looked up through get_session_history; the value under "input" is treated
# as the new user message and past messages are supplied to the prompt under
# "chat_history". The session id is passed at invoke time via
# config={"configurable": {"session_id": ...}}.
history_aware_chain = RunnableWithMessageHistory(
    chain,
    get_session_history=get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history"
)

In [None]:
get_session_history('abb73283').messages

In [None]:
# NOTE(review): `rephrase_pipe` is not defined in any cell visible in this
# notebook; unless it is defined elsewhere, evaluating this cell raises
# NameError. Confirm, or remove the cell.
rephrase_pipe

In [None]:
import uuid

def temp_pipeline():
    """Run an interactive, history-aware RAG chat loop on stdin/stdout.

    A fresh 8-character session id is generated per call. On each turn:

    1. the user's message is rewritten into a standalone question using the
       session's chat history;
    2. the ensemble retriever is queried with the rewritten question and the
       hits are formatted into ``context`` and ``metadata``;
    3. the history-aware answering chain is invoked for this session id.

    Typing ``exit`` or ``quit`` (any case, surrounding whitespace ignored)
    ends the loop. Blank input is ignored. Returns ``None``.
    """
    session_id = str(uuid.uuid4())[:8]
    print(f"Session ID: {session_id}")

    history = get_session_history(session_id)

    # These chains are identical on every turn, so compose them once.
    recontextual_chain = contextual_prompt | llm
    retrieval_chain = ensemble_retriever | RunnableParallel(
        {'context': chunk_runnable, 'metadata': metadata_runnable}
    )

    print(f"\nModel {MODEL_NAME} has been initiated with memory. Please feel free to ask questions or type 'exit' to quit.")
    while True:

        user_input = input("You: ")
        command = user_input.strip()
        if command.lower() in ["exit", "quit"]:
            print("Session ended. Have a good day.")
            break
        if not command:
            # Nothing to ask: don't spend LLM calls or a history entry on it.
            continue

        print(f"{user_input}\n\n\n")

        rephrased_question = recontextual_chain.invoke(
            {'chat_history': history.messages,
             'input': user_input})

        print(f"{rephrased_question} \n\n\n")

        # Retrieve with the standalone question so follow-ups still find context.
        context_injection = retrieval_chain.invoke(rephrased_question)

        print("Metadata:\n", context_injection['metadata'])

        response = history_aware_chain.invoke(
            {**context_injection,
            'input': user_input,
            'question': rephrased_question},
            config={"configurable": {"session_id": session_id}}
        )

        print(f"LLM: {response}\n")

In [None]:
temp_pipeline()

In [None]:
import uuid

history1 = []

In [None]:

def pipeline():
    """Run the interactive RAG chat loop with retrieval diagnostics.

    A fresh 8-character session id is generated per call. On each turn:

    1. the user's message is rewritten into a standalone question, using
       only the most recent ``MAX_HISTORY_TURNS`` turns of chat history;
    2. the ensemble retriever is queried with the rewritten question and
       the hits are formatted into ``context`` and ``metadata``;
    3. for comparison, the retriever is also queried with the raw user
       input, and the source/page of both result sets are printed;
    4. the history-aware answering chain is invoked for this session id.

    Typing ``exit`` or ``quit`` (any case, surrounding whitespace ignored)
    ends the loop. Blank input is ignored. Returns ``None``.
    """
    session_id = str(uuid.uuid4())[:8]
    print(f"Session ID: {session_id}")

    history = get_session_history(session_id)

    # One turn is a user message plus the model's reply, i.e. two stored
    # messages. Slicing by messages instead of turns would hand the rewriter
    # only the last AI reply without the question that prompted it.
    MAX_HISTORY_TURNS = 1
    history_window = 2 * MAX_HISTORY_TURNS

    # These runnables are identical on every turn, so compose them once.
    recontextual_chain = contextual_prompt | llm
    format_hits = RunnableParallel({'context': chunk_runnable, 'metadata': metadata_runnable})

    print(f"\nModel {MODEL_NAME} has been initiated with memory. Please feel free to ask questions or type 'exit' to quit.")
    while True:

        user_input = input("You: ")
        command = user_input.strip()
        if command.lower() in ["exit", "quit"]:
            print("Session ended. Have a good day.")
            break
        if not command:
            # Nothing to ask: don't spend LLM calls or a history entry on it.
            continue

        print(f"{user_input}\n\n\n")

        rephrased_question = recontextual_chain.invoke(
            {'chat_history': history.messages[-history_window:],
             'input': user_input})

        print(f"{rephrased_question} \n\n\n")

        # Retrieve once for the rephrased question and reuse the hits for
        # both the prompt inputs and the diagnostics below.
        rephrased_context = ensemble_retriever.invoke(rephrased_question)
        context_injection = format_hits.invoke(rephrased_context)

        # Diagnostic only: what the raw, un-rephrased input would retrieve.
        expected_context = ensemble_retriever.invoke(user_input)

        for hit in expected_context:
            source = hit.metadata["source"]
            page_label = hit.metadata["page_label"]
            print(f"Expected metdata is:\n\n {source}, page number {page_label}")

        for hit in rephrased_context:
            source = hit.metadata["source"]
            page_label = hit.metadata["page_label"]
            print(f"Rephrased question metdata is:\n\n {source}, page number {page_label}")

        print(f"Metadata:\n, {context_injection['metadata']}\n\n")

        response = history_aware_chain.invoke(
            {**context_injection,
            'input': user_input,
            'question': rephrased_question},
            config={"configurable": {"session_id": session_id}}
        )

        print(f"LLM: {response}\n")


In [None]:
pipeline()

In [None]:
get_session_history('37b5e65a').messages

In [None]:
from docx import Document
from docx.shared import Inches

# python-docx smoke test: build a small document with headings, styled runs,
# list items and a table, then write it to demo.docx.
document = Document()

document.add_heading('Document Title', 0)

# One paragraph made of several runs so that only parts of it are bold/italic.
p = document.add_paragraph('A plain paragraph having some ')
p.add_run('bold').bold = True
p.add_run(' and some ')
p.add_run('italic.').italic = True

document.add_heading('Heading, level 1', level=1)
document.add_paragraph('Intense quote', style='Intense Quote')

document.add_paragraph(
    'first item in unordered list', style='List Bullet'
)
document.add_paragraph(
    'first item in ordered list', style='List Number'
)


# Rows for the table below: (quantity, id, description).
records = (
    (3, '101', 'Spam'),
    (7, '422', 'Eggs'),
    (4, '631', 'Spam, spam, eggs, and spam')
)

# Start with a single header row; data rows are appended one at a time.
table = document.add_table(rows=1, cols=3)
hdr_cells = table.rows[0].cells
hdr_cells[0].text = 'Qty'
hdr_cells[1].text = 'Id'
hdr_cells[2].text = 'Desc'
# The loop variable is `item_id`, not `id`: at notebook top level a variable
# named `id` would replace the builtin id() for every cell run afterwards.
for qty, item_id, desc in records:
    row_cells = table.add_row().cells
    row_cells[0].text = str(qty)
    row_cells[1].text = item_id
    row_cells[2].text = desc

document.add_page_break()

document.save('demo.docx')

In [None]:
from prompts import chunk_runnable

In [None]:
import pickle

# Load and display the pickled object stored in session_store.pkl.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load a session_store.pkl that this project wrote itself.
with open('session_store.pkl', 'rb') as f:
    data = pickle.load(f)

print(data)

In [None]:
from session_history import get_session_history

In [1]:
from session_history import get_session_history, list_sessions, load_session_messages, store

list_sessions()

['d1723d2a', 'a0bfaac3']

In [3]:
a = load_session_messages('a0bfaac3')
print(a)

Human: Explain transformers
AI: Transformers are electrical devices that transfer energy between two or more circuits through electromagnetic induction. They consist of two or more coils of wire, known as windings, which are wrapped around a common magnetic core.

The basic principle of a transformer is based on the concept of electromagnetic induction, where a changing magnetic field induces an electric field in the surrounding conductors. In a transformer, the primary coil (connected to the power source) and secondary coil (connected to the load) are wound around a common magnetic core.

When an alternating current flows through the primary coil, it generates a changing magnetic field, which induces an electromotive force (EMF) in the secondary coil. The direction of the induced EMF is such that it opposes the change in the magnetic field, creating a net flux that drives an electric current in the secondary coil.

The ratio of the voltage and current between the primary and secondary