In [1]:
from decouple import config
import os

from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores.chroma import Chroma

from langchain_ollama import ChatOllama
from langchain_community.llms import LlamaCpp
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

In [2]:
# Resolve runtime settings: a value already present in the process environment
# is used as-is; otherwise the value is looked up through decouple's `config`.
if 'OLLAMA_API_BASE_URL' in os.environ:
    OLLAMA_API_BASE_URL = os.environ['OLLAMA_API_BASE_URL']
else:
    # NOTE(review): OPENAI_API_KEY is only loaded when OLLAMA_API_BASE_URL is
    # absent from the environment - presumably that variable doubles as an
    # "configured externally" marker; confirm this coupling is intended.
    os.environ["OPENAI_API_KEY"] = config('OPENAI_API_KEY')
    OLLAMA_API_BASE_URL = config('OLLAMA_API_BASE_URL')

if 'LLM' in os.environ:
    LLM = os.environ['LLM']
else:
    LLM = config('LLM')

if 'EMBEDDING_MODEL' in os.environ:
    EMBEDDING_MODEL = os.environ['EMBEDDING_MODEL']
else:
    EMBEDDING_MODEL = config('EMBEDDING_MODEL')

# Echo the effective choices so the run is self-describing.
print(f'Using LLM: {LLM}')
print(f'Using embedding model: {EMBEDDING_MODEL}')

Using LLM: deepseek-coder-v2
Using embedding model: sentence-transformers/all-MiniLM-L6-v2


In [3]:
def load_pdf_data(file_path, use_splitter=True, chunk_size=1000, chunk_overlap=200):
    """Load a PDF through ``PyPDFLoader``, optionally splitting it into chunks.

    Args:
        file_path: Path of the PDF file handed to ``PyPDFLoader``.
        use_splitter: When true (the default), the loaded text is split with a
            ``RecursiveCharacterTextSplitter``; when false the loader's
            unsplit ``load()`` result is returned.
        chunk_size: ``chunk_size`` passed to the splitter (default 1000, the
            value that used to be hard-coded). Ignored if ``use_splitter`` is
            false.
        chunk_overlap: ``chunk_overlap`` passed to the splitter (default 200,
            the value that used to be hard-coded). Ignored if ``use_splitter``
            is false.

    Returns:
        Whatever ``loader.load_and_split(splitter)`` or ``loader.load()``
        returns for the given file.
    """
    loader = PyPDFLoader(file_path)
    if not use_splitter:
        return loader.load()

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return loader.load_and_split(text_splitter)


In [4]:
# The PDF location can be overridden through a PDF_PATH setting (environment
# variable or decouple config source); the default is the original local path,
# so existing runs behave as before.
PDF_PATH = config('PDF_PATH', default="/Users/stolli/IT/Designing Data-Intensive Applications.pdf")
pdf_data = load_pdf_data(PDF_PATH)

incorrect startxref pointer(1)
parsing for Object Streams


In [5]:
def create_vectorstore(pdf_data, embedding_model_name, persist_directory="chroma_db"):
    """Embed documents with a HuggingFace model and index them in Chroma.

    Args:
        pdf_data: Documents passed to ``Chroma.from_documents``.
        embedding_model_name: Model name given to ``HuggingFaceEmbeddings``.
        persist_directory: Directory handed to Chroma as its
            ``persist_directory`` (default ``"chroma_db"``).

    Returns:
        The object returned by ``Chroma.from_documents``.
    """
    # NOTE(review): presumably calling this again with an already populated
    # persist_directory inserts the same documents a second time - confirm
    # before relying on Restart & Run All.
    embedder = HuggingFaceEmbeddings(model_name=embedding_model_name)
    store = Chroma.from_documents(
        pdf_data,
        embedding=embedder,
        persist_directory=persist_directory,
    )
    return store

In [6]:
# Embed the loaded PDF documents with the configured embedding model and index
# them in Chroma (create_vectorstore uses its default persist_directory, "chroma_db").
vectorstore = create_vectorstore(pdf_data, embedding_model_name=EMBEDDING_MODEL)

  from tqdm.autonotebook import tqdm, trange


In [7]:
# vectorstore.similarity_search_with_score("What is partitioning?", 5)

In [8]:
# Chat model client built from the Ollama base URL and model name resolved in the config cell.
llm = ChatOllama(model=LLM, base_url=OLLAMA_API_BASE_URL)

In [10]:
def create_rag_chain(vectorstore, llm):
    """Assemble a history-aware RAG chain with per-session chat memory.

    The pipeline has two LLM-backed stages: a history-aware retriever that is
    prompted to turn the latest user message into a standalone question, and a
    "stuff documents" chain that answers from the retrieved context. The result
    is wrapped in ``RunnableWithMessageHistory`` so each ``session_id`` gets
    its own ``ChatMessageHistory``.

    Args:
        vectorstore: Object exposing ``as_retriever()``.
        llm: Model passed to both the retriever stage and the answering stage.

    Returns:
        A ``RunnableWithMessageHistory`` that reads the user text from the
        ``"input"`` key, injects history under ``"chat_history"`` and records
        the ``"answer"`` key of the output.
    """

    def chat_prompt(system_text):
        # Both stages share one layout: system text, prior turns, new user input.
        return ChatPromptTemplate.from_messages(
            [
                ("system", system_text),
                MessagesPlaceholder("chat_history"),
                ("human", "{input}"),
            ]
        )

    base_retriever = vectorstore.as_retriever()

    # Stage 1: question rewriting + retrieval.
    rewrite_instructions = (
        "Given a chat history and the latest user question which might "
        "reference context in the chat history, formulate a standalone "
        "question which can be understood without the chat history. "
        "Do NOT answer the question, just reformulate it if needed "
        "and otherwise return it as is."
    )
    history_aware_retriever = create_history_aware_retriever(
        llm, base_retriever, chat_prompt(rewrite_instructions)
    )

    # Stage 2: answer generation. The embedded indentation and trailing spaces
    # are part of the prompt text sent to the model and are kept verbatim.
    answer_instructions = (
        "You are an assistant for question-answering tasks. \n"
        "        Use the chat history and the following pieces of retrieved context to answer the question. \n"
        "        If you don't know the answer, just say that you don't know, don't try to make up an answer.\n"
        "        If you find the answer, write the answer in a concise way. \n"
        "        Context: {context}"
    )
    answer_chain = create_stuff_documents_chain(llm, chat_prompt(answer_instructions))

    retrieval_chain = create_retrieval_chain(history_aware_retriever, answer_chain)

    # In-memory chat histories keyed by session id; they live as long as the
    # returned runnable keeps this closure alive.
    histories = {}

    def get_session_history(session_id: str) -> BaseChatMessageHistory:
        try:
            return histories[session_id]
        except KeyError:
            # First message of this session: start an empty history.
            fresh = histories[session_id] = ChatMessageHistory()
            return fresh

    return RunnableWithMessageHistory(
        retrieval_chain,
        get_session_history,
        input_messages_key="input",
        history_messages_key="chat_history",
        output_messages_key="answer",
    )

In [11]:
# Build the function-based conversational RAG chain from the vector store and the chat model.
rag_chain = create_rag_chain(vectorstore, llm)

In [12]:
# First turn of session "abc123"; the session id selects which chat history is used.
rag_chain.invoke(
    {"input": "What is partitioning?"},
    config={"configurable": {"session_id": "abc123"}},
)

{'input': 'What is partitioning?',
 'chat_history': [],
 'context': [Document(metadata={'page': 220, 'source': '/Users/stolli/IT/Designing Data-Intensive Applications.pdf'}, page_content='Terminological confusion\nWhat we call a partition  here is called a shard  in MongoDB, Elas‐\nticsearch, and SolrCloud; it’s known as a region  in HBase, a tablet\nin Bigtable, a vnode  in Cassandra and Riak, and a vBucket  in\nCouchbase. However, partitioning  is the most established term, so\nwe’ll stick with that.\nNormally, partitions are defined in such a way that each piece of data (each record,\nrow, or document) belongs to exactly one partition. There are various ways of achiev‐\ning this, which we discuss in depth in this chapter. In effect, each partition is a small\ndatabase of its own, although the database may support operations that touch multi‐\nple partitions at the same time.\nThe main reason for wanting to partition data is scalability . Different partitions can\nbe placed on differ

In [13]:
# Follow-up in the same session "abc123", so the previous exchange is supplied as chat history.
rag_chain.invoke(
    {"input": "Can you repeat your answer as structured list please?"},
    config={"configurable": {"session_id": "abc123"}},
)

{'input': 'Can you repeat your answer as structured list please?',
 'chat_history': [HumanMessage(content='What is partitioning?', additional_kwargs={}, response_metadata={}),
  AIMessage(content=' Partitioning refers to dividing a large dataset into smaller parts, called partitions, which are stored and managed separately. Each piece of data (such as records, rows, or documents) belongs to exactly one partition. This allows for scalability by distributing the load across multiple nodes in a shared-nothing cluster.', additional_kwargs={}, response_metadata={})],
 'context': [Document(metadata={'page': 238, 'source': '/Users/stolli/IT/Designing Data-Intensive Applications.pdf'}, page_content='The goal of partitioning is to spread the data and query load evenly across multiple\nmachines, avoiding hot spots (nodes with disproportionately high load). This\nrequires choosing a partitioning scheme that is appropriate to your data, and reba‐\nlancing the partitions when nodes are added to or 

In [14]:
class HistoryAwareRagChain:
    """Conversational RAG pipeline that keeps one chat history per session id.

    Construction wires a history-aware retriever (prompted to rewrite the
    latest message into a standalone question) to a "stuff documents"
    answering chain and wraps the result in ``RunnableWithMessageHistory``.
    Histories are held in an in-memory dict on the instance.
    """

    def __init__(self, vectorstore, llm):
        """Build the retrieval, answering and history-wrapping stages.

        Args:
            vectorstore: Object exposing ``as_retriever()``.
            llm: Model passed to both the retriever stage and the answering stage.
        """
        # Session id -> ChatMessageHistory, filled lazily by _get_session_history.
        # Set up first so the callback handed out below never sees a missing attribute.
        self._store = {}

        # Stage 1: question rewriting + retrieval.
        self._contextualize_q_prompt = self._chat_prompt(
            "Given a chat history and the latest user question which might "
            "reference context in the chat history, formulate a standalone "
            "question which can be understood without the chat history. "
            "Do NOT answer the question, just reformulate it if needed "
            "and otherwise return it as is."
        )
        self._history_aware_retriever = create_history_aware_retriever(
            llm, vectorstore.as_retriever(), self._contextualize_q_prompt
        )

        # Stage 2: answer generation. The embedded indentation and trailing
        # spaces are part of the prompt text and are kept verbatim.
        self._prompt = self._chat_prompt(
            "You are an assistant for question-answering tasks. \n"
            "            Use the chat history and the following pieces of retrieved context to answer the question. \n"
            "            If you don't know the answer, just say that you don't know, don't try to make up an answer.\n"
            "            If you find the answer, write the answer in a concise way. \n"
            "            Context: {context}"
        )
        answer_chain = create_stuff_documents_chain(llm, self._prompt)
        self._rag_chain = create_retrieval_chain(self._history_aware_retriever, answer_chain)

        # Stage 3: attach per-session history handling.
        self._rag_chain_with_history = RunnableWithMessageHistory(
            self._rag_chain,
            self._get_session_history,
            input_messages_key="input",
            history_messages_key="chat_history",
            output_messages_key="answer",
        )

    @staticmethod
    def _chat_prompt(system_text):
        """Return a chat prompt laid out as: system text, prior turns, new user input."""
        return ChatPromptTemplate.from_messages(
            [
                ("system", system_text),
                MessagesPlaceholder("chat_history"),
                ("human", "{input}"),
            ]
        )

    def _get_session_history(self, session_id: str) -> BaseChatMessageHistory:
        """Return the history stored for ``session_id``, creating an empty one on first use."""
        try:
            return self._store[session_id]
        except KeyError:
            fresh = self._store[session_id] = ChatMessageHistory()
            return fresh

    def invoke(self, query, session_id):
        """Run one conversational turn.

        Args:
            query: User text, sent to the chain under the ``"input"`` key.
            session_id: Identifier selecting which stored chat history to use.

        Returns:
            Whatever the wrapped chain's ``invoke`` returns.
        """
        run_config = {"configurable": {"session_id": session_id}}
        return self._rag_chain_with_history.invoke({"input": query}, config=run_config)

In [15]:
# Rebinds `rag_chain` to the class-based variant. Its histories live in the
# instance's own dict, separate from the store inside the earlier
# create_rag_chain() result.
rag_chain = HistoryAwareRagChain(vectorstore, llm)

In [16]:
# First turn of session 'abc123' on the class-based chain.
rag_chain.invoke('What is partitioning?', session_id='abc123')

{'input': 'What is partitioning?',
 'chat_history': [],
 'context': [Document(metadata={'page': 220, 'source': '/Users/stolli/IT/Designing Data-Intensive Applications.pdf'}, page_content='Terminological confusion\nWhat we call a partition  here is called a shard  in MongoDB, Elas‐\nticsearch, and SolrCloud; it’s known as a region  in HBase, a tablet\nin Bigtable, a vnode  in Cassandra and Riak, and a vBucket  in\nCouchbase. However, partitioning  is the most established term, so\nwe’ll stick with that.\nNormally, partitions are defined in such a way that each piece of data (each record,\nrow, or document) belongs to exactly one partition. There are various ways of achiev‐\ning this, which we discuss in depth in this chapter. In effect, each partition is a small\ndatabase of its own, although the database may support operations that touch multi‐\nple partitions at the same time.\nThe main reason for wanting to partition data is scalability . Different partitions can\nbe placed on differ

In [17]:
# Follow-up in the same session 'abc123', so the history stored by the previous turn is reused.
rag_chain.invoke('Can you repeat the answer as structured list?', session_id='abc123')