In [None]:
!pip install python-dotenv langchain_core langchain langchain_community langchain_openai

In [8]:
from dotenv import load_dotenv
import os

load_dotenv()

True

In [15]:
from langchain_core.documents.base import Document
from typing import List, Dict, Any
from langchain_text_splitters import CharacterTextSplitter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain import hub
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.document_loaders import PyPDFLoader
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder, ChatPromptTemplate
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

In [10]:
def pdf_preprocess(docs: List[Document]):
    splitter = CharacterTextSplitter.from_tiktoken_encoder(
        encoding_name="cl100k_base",
        chunk_size=4000,
        chunk_overlap=200,
    )

    text = splitter.split_documents(docs)

    return text

In [85]:
#os.environ["OPENAI_API_KEY"] = "Insert your OpenAI API key here"

class RAGPipeline:
    def __init__(self, 
                 file_path: str = None, 
                 model: str = "gpt-4o-mini", 
                 temperature: float = 0,
                 splitter_encoding_name: str = "cl100k_base", 
                 embedding_model: str = "text-embedding-3-large",
                 chunk_size: int = 4000,
                 chunk_overlap: int = 200,
                 vectorstore: FAISS = None) -> None:
        
        """
        Initialize a RAGPipeline object with a file path to a PDF document.\n
        Init params:
        - ``file_path``: **str** -> Path to the PDF document.
        - ``model``: **Optional[str]** -> OpenAI model name. *(default: "gpt-4o-mini")*
        - ``temperature``: **Optional[float]** -> OpenAI temperature. *(default: 0)*
        - ``splitter_encoding_name``: **Optional[str]** -> CharacterTextSplitter encoding name. *(default: "cl100k_base")*
        - ``embedding_model``: **Optional[str]** -> OpenAI embedding model name. *(default: "text-embedding-3-large")*
        - ``chunk_size``: **Optional[int]** -> Chunk size for the PDF document. *(default: 4000)*
        - ``chunk_overlap``: **Optional[int]** -> Chunk overlap for the PDF document. *(default: 200)*
        - ``vectorstore``: **Optional[FAISS]** -> FAISS vectorstore object. *(default: None)*
        """

        # Setup splitter hyperparameters
        self.SPLITTER_ENCODING_NAME = splitter_encoding_name
        self.CHUNK_SIZE = chunk_size
        self.CHUNK_OVERLAP = chunk_overlap
        
        # Set the OpenAI embeddings model
        self.embeddings = OpenAIEmbeddings(
                api_key=os.getenv("OPENAI_API_KEY"),
                model=embedding_model,
        )
        
        # Load the PDF document
        if file_path is not None:
            self.document = PyPDFLoader(file_path).load()

            self.text = self.pdf_preprocess()

        self.model = ChatOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            model=model,
            temperature=temperature,
        )

        # Create a FAISS vectorstore object
        if vectorstore is None:
            self.vectorstore = FAISS.from_documents(
                documents=self.text,
                embedding=self.embeddings,
            )
        else:
            self.vectorstore = vectorstore

        # Load the retrieval and chat models
        # Define the system basic RAG prompt for model
        self.system_prompt = (
            "You are an assistant for question-answering tasks. "
            "Use the following pieces of retrieved context to answer "
            "the question. If you don't know the answer, say that you "
            "don't know. Use three sentences maximum and keep the "
            "answer concise."
            "\n\n"
            "{context}"
        )

        # Define the ChatPromptTemplate for the model which includes SystemMessage, ChatHistory and HumanMessage
        self.prompt = ChatPromptTemplate.from_messages(
            [
                ("system", self.system_prompt),
                MessagesPlaceholder("chat_history"),
                ("human", "{input}"),
            ]
        )

        # Save conversation history and contextualize future questions
        # Define the system prompt for contextualizing previous interactions
        self.contextualize_system_prompt = (
            "Given a chat history and the latest user question "
            "which might reference context in the chat history, "
            "formulate a standalone question which can be understood "
            "without the chat history. Do NOT answer the question, "
            "just reformulate it if needed and otherwise return it as is."
        )

        self.contextualize_prompt = ChatPromptTemplate.from_messages(
            [
                ("system", self.contextualize_system_prompt), 
                MessagesPlaceholder("chat_history"),
                ("human", "{input}"),
            ]
        )

        self.contextualized_model = self.model.with_config(tags=["contextualized_model"])

        self.history_aware_retriever = create_history_aware_retriever(
            self.contextualized_model,
            self.vectorstore.as_retriever(),
            self.contextualize_prompt,
        )

        self.combine_docs_chain = create_stuff_documents_chain(self.model, self.prompt)
        self.chain = create_retrieval_chain(self.history_aware_retriever, self.combine_docs_chain)

        self.chat_history = []

    
    def invoke_(self, query: str):
        """
        Execute the RAGPipeline with a query.\n
        **Method:**
        - Execute the RAGPipeline chain with the query.
        - Return the response.
        """
        response = self.chain.invoke({"input": query, "chat_history": self.chat_history})
        self.chat_history.extend(
            [
                HumanMessage(content=query),
                AIMessage(content=response["answer"]),
            ]
        )
        return response
    
    def stream_(self, query: str):
        """
        Execute the RAGPipeline with a query (stream enabled).\n
        **Method:**
        - Execute the RAGPipeline chain with the query.
        - Stream the response.
        """
        
        response = []
        
        for chunk in self.chain.stream({"input": query, "chat_history": self.chat_history}):
            if 'answer' in chunk:
                response.append(chunk['answer'])

                print(f"{chunk['answer']}", end="")
                                
        return "".join(response)
    
    async def astream_(self, query: str):
        """
        Execute the RAGPipeline with a query (async stream enabled).\n
        **Method:**
        - Execute the RAGPipeline chain with the query.
        - Stream the response asynchronously.
        """
        response = []
        async for event in self.chain.astream_events(
            {"input": query, "chat_history": self.chat_history}, 
            version="v2",
        ):
            if (event["event"] == "on_chat_model_stream" and "contextualized_model" in event["tags"]):
                response_chunk = event["data"]["chunk"]
                response.extend(response_chunk.content)
                print(f"{response_chunk.content}", end="")
                                
        return "".join(response)
    

    @classmethod
    def from_local(cls, 
                   folder_path: str,
                   model: str = "gpt-4o-mini",
                   temperature: float = 0,
                   embedding_model: str = "text-embedding-3-large"):
        """
        Load a RAGPipeline from a local **FAISS vectorstore** folder path.\n
        Example FAISS vectorstore folder structure:
        ```
        ./src
        |___vectorstore:
        |   |   index.faiss
        |   |   index.pkl
        ```
        """
        
        embeddings = OpenAIEmbeddings(
            api_key=os.getenv("OPENAI_API_KEY"),
            model=embedding_model,
        )

        vectorstore = FAISS.load_local(
            folder_path=folder_path,
            embeddings=embeddings,
            allow_dangerous_deserialization=True,
        )

        return cls(vectorstore=vectorstore, 
                   model=model, 
                   temperature=temperature, 
                   embedding_model=embedding_model)

    # Preprocess the PDF document
    def pdf_preprocess(self) -> List[Document]:
        """
        Preprocess the PDF document using CharacterTextSplitter (WIP).\n
        **Method:**
        - Initialize a [CharacterTextSplitter](https://python.langchain.com/api_reference/text_splitters/character/langchain_text_splitters.character.CharacterTextSplitter.html) object.
        - Parse the **Document** Object to the splitter with default params: ``encoding_name="cl100k_base"``, ``chunk_size=4000`` and ``overlap=200``.

        **Output**
        - **List[Document]**: List of Document objects.

        **Todo:**
        - Implement a way to parse tables and equations from the PDF document.
        """
        splitter = CharacterTextSplitter.from_tiktoken_encoder(
            encoding_name=self.SPLITTER_ENCODING_NAME,
            chunk_size=self.CHUNK_SIZE,
            chunk_overlap=self.CHUNK_OVERLAP,
        )

        splits = splitter.split_documents(self.document)

        return splits
    
    # Save the FAISS vectorstore to a local folder
    def save_vectorstore(self, path: str) -> str:
        """
        Save the FAISS vectorstore to a local folder.
        """
        self.vectorstore.save_local(path)
        return f"Saved vectorstore to {path}"

In [68]:
retriever = RAGPipeline.from_local("./vectorstore")

In [10]:
retriever.save_vectorstore("./vectorstore")

'Saved vectorstore to ./vectorstore'

In [88]:
# QA with output as dict (metadata)
invoke_response = retriever.invoke_("Sách này là gì?")
invoke_response

{'input': 'Sách này là gì?',
 'chat_history': [HumanMessage(content='Sách này là gì?', additional_kwargs={}, response_metadata={}),
  AIMessage(content='Sách này là giáo trình "Phân tích báo cáo tài chính" do Nhà xuất bản Đại học Kinh tế Quốc dân phát hành năm 2013. Chủ biên của giáo trình là PGS. TS. Nguyễn Năng Phúc. Nó được sử dụng trong lĩnh vực kế toán và tài chính.', additional_kwargs={}, response_metadata={}),
  HumanMessage(content='Sách này là gì?', additional_kwargs={}, response_metadata={}),
  AIMessage(content='Sách này là giáo trình "Phân tích báo cáo tài chính" do Nhà xuất bản Đại học Kinh tế Quốc dân phát hành năm 2013. Chủ biên của giáo trình là PGS. TS. Nguyễn Năng Phúc. Nó phục vụ cho việc học tập và nghiên cứu trong lĩnh vực kế toán và tài chính.', additional_kwargs={}, response_metadata={})],
 'context': [Document(metadata={'source': './docs.pdf', 'page': 360}, page_content='GIÁO TRÌNH\nP H Â N  TÍCH  B Á O  C Á O  T À I C H ÍN H\nNHÀ XUẤT BẢN ĐẠI HỌC KINH TẾ QUỐC D

In [86]:
# QA with output as dictionary
stream_response = retriever.stream_("Sách này nói về gì?")

Sách này là giáo trình "Phân tích báo cáo tài chính", nhằm cung cấp thông tin cần thiết để đánh giá sức mạnh tài chính, khả năng sinh lời và triển vọng phát triển của doanh nghiệp. Nó phục vụ cho nhiều đối tượng như nhà quản lý, nhà đầu tư, và sinh viên kinh tế, với nội dung được biên soạn dựa trên tài liệu trong nước và quốc tế.