In [2]:
# Smoke-test cell: confirms the kernel executes code.
print("success!!")

success!!


In [49]:
from dotenv import load_dotenv
# Load variables from a local .env file into os.environ (presumably model/API
# settings — confirm). The displayed True is the value load_dotenv() returned.
load_dotenv()

True

In [1]:
from typing import Any, Dict, List, Tuple, TypedDict

In [51]:
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain.retrievers import BM25Retriever
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain.retrievers import EnsembleRetriever
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langgraph.graph import StateGraph, Graph
from concurrent.futures import ThreadPoolExecutor
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI


In [23]:
class DataIngestionPipeline:
    """Load a PDF, split it into overlapping chunks and build two search indexes.

    The same chunks are indexed densely in a Chroma vector store (using Ollama
    embeddings) and lexically with a BM25 retriever.
    """

    def __init__(self, chunk_size=2000, chunk_overlap=100, embedding_model="phi3") -> None:
        """Configure the text splitter and the embedding model.

        Args:
            chunk_size: Maximum chunk size passed to RecursiveCharacterTextSplitter.
            chunk_overlap: Overlap between consecutive chunks (default 100).
            embedding_model: Ollama model name used to embed chunks (default "phi3").
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitters = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap
        )
        self.embeddings = OllamaEmbeddings(model=embedding_model)

    def process_document(self, doc) -> List:
        """Split one loaded Document into a list of chunk Documents."""
        return self.text_splitters.split_documents([doc])

    def ingect(self, path: str) -> Tuple[Chroma, BM25Retriever]:
        """Load the PDF at ``path`` and index its chunks.

        Args:
            path: Path of the PDF file handed to PyPDFLoader.

        Returns:
            ``(vectorstore, bm25retriever)`` built over the same chunks.

        Raises:
            ValueError: If no text chunks were produced (for instance when no
                text could be extracted from any page). Building the indexes
                from an empty list would otherwise fail further down with a
                less specific error.
        """
        docs = PyPDFLoader(path).load()
        # Split each loaded document separately so chunks never span documents.
        chunks = [chunk for doc in docs for chunk in self.process_document(doc)]
        if not chunks:
            raise ValueError(f"No text chunks were extracted from {path!r}; nothing to index.")

        vectorstore = Chroma.from_documents(documents=chunks, embedding=self.embeddings)
        bm25retriever = BM25Retriever.from_documents(documents=chunks)
        return vectorstore, bm25retriever

    # Correctly spelled alias for ``ingect``; the original name is kept for existing callers.
    ingest = ingect

In [6]:
class GraphState(TypedDict, total=False):
    """State schema shared by the nodes of the QA LangGraph workflow.

    Every key is optional (``total=False``): the graph is invoked with only
    ``question`` and each node fills in the key it is responsible for. Using a
    TypedDict (rather than a ``Dict`` subclass with class-level defaults)
    avoids a single mutable ``[]`` default being shared across runs.
    """

    # The user's question, supplied by the caller of ``graph.invoke``.
    question: str
    # Documents retrieved for ``question``.
    docs: List
    # The chat model's response; QAPipeline stores the message object returned
    # by ``llm.invoke`` here (its text is in ``.content``), not a plain str.
    answer: Any

In [39]:
class QAPipeline:
    """Hybrid-retrieval question answering over an ingested document.

    Retrieval unions the results of an EnsembleRetriever (dense Chroma search
    plus BM25) and a ParentDocumentRetriever. The retrieved text is stuffed
    into a prompt for an Ollama chat model. ``create_graph`` wires these two
    steps into a two-node LangGraph workflow.
    """

    def __init__(
        self,
        vectorstore: Chroma,
        bm25retriever: BM25Retriever,
        llm_model: str = "phi3",
        vector_k: int = 2,
        weights: Tuple[float, float] = (0.7, 0.3),
    ) -> None:
        """Build the retrievers and the chat model.

        Args:
            vectorstore: Chroma store holding the ingested chunks.
            bm25retriever: BM25 retriever over the same chunks.
            llm_model: Ollama chat model name (default "phi3").
            vector_k: Number of chunks the dense retriever returns (default 2).
            weights: Ensemble weights for (dense, BM25) results (default (0.7, 0.3)).
        """
        self.vectorstore = vectorstore
        self.bm25retriever = bm25retriever
        self.llm = ChatOllama(model=llm_model)
        self.retriever = EnsembleRetriever(
            retrievers=[
                self.vectorstore.as_retriever(search_kwargs={"k": vector_k}),
                self.bm25retriever,
            ],
            weights=list(weights),
        )

        # NOTE(review): nothing in this class calls ``parent_retriever.add_documents``,
        # so its InMemoryStore stays empty. Parent lookups will therefore find
        # nothing, and this retriever presumably contributes no documents
        # (TODO confirm). Populating it would also add child chunks to the
        # *shared* vectorstore used by ``self.retriever``; consider giving it a
        # dedicated child vectorstore.
        self.parent_retriever = ParentDocumentRetriever(
            vectorstore=self.vectorstore,
            docstore=InMemoryStore(),
            child_splitter=RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=10),
            # Previously misspelled ``parent_splittr``; the unknown keyword was
            # ignored, so parent documents were never split.
            parent_splitter=RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=100),
        )

    def __process_retrieved_documents(self, docs) -> str:
        """Join the page contents of ``docs`` with blank lines into one context string."""
        return "\n\n".join(doc.page_content for doc in docs)

    def create_graph(self) -> Graph:
        """Build and compile the retrieve -> generate workflow.

        Invoke the compiled graph with ``{"question": ...}``. The final state
        holds ``question``, ``docs`` (deduplicated retrieved Documents) and
        ``answer`` (the message returned by the chat model).
        """

        def retriever(state):
            question = state["question"]
            # Query both retrievers concurrently; ``invoke`` replaces the
            # deprecated ``get_relevant_documents``.
            with ThreadPoolExecutor(max_workers=2) as executor:
                results = list(
                    executor.map(
                        lambda r: r.invoke(question),
                        [self.retriever, self.parent_retriever],
                    )
                )

            # Deduplicate by text, keeping first-seen order.
            all_docs = list({doc.page_content: doc for doc in results[0] + results[1]}.values())

            # Return only the updated key; LangGraph merges it into the state.
            # (Spreading ``**state`` after the new value let stale state win.)
            return {"docs": all_docs}

        def generate_answer(state):
            context = self.__process_retrieved_documents(state["docs"])
            prompt = f""" Answer the question based on the following context:
                Context: {context}
                Question: {state["question"]}
                Answer: 

             """

            response = self.llm.invoke([HumanMessage(content=prompt)])
            # Store the full message object; callers read ``result["answer"].content``.
            return {"answer": response}

        workflow = StateGraph(GraphState)
        workflow.add_node("retriever", retriever)
        workflow.add_node("generator", generate_answer)

        workflow.set_entry_point("retriever")
        workflow.add_edge("retriever", "generator")
        workflow.set_finish_point("generator")

        return workflow.compile()



In [35]:
import os

data_ingestion_pipeline = DataIngestionPipeline()
# Machine-specific default path; set HR_POLICY_PDF (e.g. in the .env file read
# by load_dotenv() earlier in this notebook) to point at the PDF elsewhere.
doc_path = os.environ.get(
    "HR_POLICY_PDF",
    "/Users/mohamednajiaboo/Desktop/DocumentChat/Document_Chat_Langchain/datas/HR-Policy-DHLL.pdf",
)
vectorstore, bm25retriever = data_ingestion_pipeline.ingect(path=doc_path)

In [40]:
# Build the hybrid (Chroma + BM25) QA pipeline over the indexes created above.
qa_pipeline = QAPipeline(vectorstore=vectorstore, bm25retriever=bm25retriever)

In [41]:
# Compile the retriever -> generator LangGraph workflow.
graph = qa_pipeline.create_graph()

In [42]:
# Run the workflow end to end; the input state only needs the question.
result = graph.invoke({"question": "What is office timing?"})

In [43]:
# Final graph state: the question, the retrieved documents and the model's answer.
result

{'question': 'What is office timing?',
 'docs': [Document(metadata={'page': 0, 'source': '/Users/mohamednajiaboo/Desktop/DocumentChat/Document_Chat_Langchain/datas/HR-Policy-DHLL.pdf'}, page_content='l. Leave to be applied three days in advance for both in the cases of casual leave and privilege leave. \n \n \nTypes of Leave \n \nNature of Leave Eligible Leave In calendar year Remarks \nCasual Leave 12 Privilege leaves \navailable after one \nyear for Confirmed \nEmployees \nPrivilege Leave 12 \nSick Leave 12 \nMaternity leave As per Govt Rule \nCompensatory off   \n \n\uf0d8 In case of exigencies whereas a staff member worked on holidays / restricted holidays, compensatory off is \npermitted on later date \n\uf0d8 In case of exigencies respective department head can permit the late coming/ early going without regard to \npunching  \n\uf0d8 Additional Leave apart above leaves will be treated as Loss of pay, maximum 15 days LOP allowed in very \nspecial case with higher authority’s appr

In [48]:
# "answer" holds the chat model's message object; .content is the generated text.
result["answer"].content

'The working hours at the office are from 9.30 am to 5.30 pm, with a lunch break taken by staff in turn lasting for about an hour and thirty minutes without affecting customer service. The Biometric punching should be done promptly within these timings or as late as up to ten minutes past time if unforeseen reasons arise - though such instances require compensation preferably on the same day, failure of which leads to treated as half-day leave. \n\nLeave timing and conditions for early departures are also specified in case an employee needs it due to exigencies or official work outside but must get prior approval from their reporting authority with a condition that such absences should be compensated by working extra time within the same month, as needed.'