In [17]:
from langchain.document_loaders.base import BaseLoader
from abc import ABC
from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Union,Iterable
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseBlobParser
from langchain.document_loaders.blob_loaders import Blob
import numpy as np
import os
import fitz
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores.pgvector import PGVector
from langchain.memory import ConversationBufferMemory
from langchain.memory.chat_message_histories import RedisChatMessageHistory
from langchain.llms import LlamaCpp
from langchain.chains import ConversationChain, LLMChain, ConversationalRetrievalChain, RetrievalQA
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.prompts.prompt import PromptTemplate
from langchain.chains.question_answering import load_qa_chain
from langchain.memory import MongoDBChatMessageHistory

In [3]:
class BasePDFLoader(BaseLoader, ABC):
    """Base Loader class for `PDF` files.

    If the file is a web path, it will download it to a temporary file, use it, then
        clean up the temporary file after completion.
    """

    def __init__(self, file_path: str, *, headers: Optional[Dict] = None):
        """Initialize with a file path.

        Args:
            file_path: Either a local, S3 or web path to a PDF file.
            headers: Headers to use for GET request to download a file from a web path.
        """
        self.file_path = file_path
        self.headers = headers

        if not os.path.isfile(self.file_path):
            raise ValueError("File path %s is not a valid file or url" % self.file_path)


    @property
    def source(self) -> str:
        return self.web_path if self.web_path is not None else self.file_path

In [4]:
class PyMuPDFLoader(BasePDFLoader):
    """Load `PDF` files using `PyMuPDF`."""

    def __init__(
        self,
        file_path: str,
        *,
        headers: Optional[Dict] = None,
        extract_images: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize with a file path."""
        try:
            import fitz  # noqa:F401
        except ImportError:
            raise ImportError(
                "`PyMuPDF` package not found, please install it with "
                "`pip install pymupdf`"
            )
        super().__init__(file_path, headers=headers)
        self.extract_images = extract_images
        self.text_kwargs = kwargs

    def load(self, **kwargs: Any) -> List[Document]:
        """Load file."""
        if kwargs:
            logger.warning(
                f"Received runtime arguments {kwargs}. Passing runtime args to `load`"
                f" is deprecated. Please pass arguments during initialization instead."
            )

        text_kwargs = {**self.text_kwargs, **kwargs}
        parser = PyMuPDFParser(
            text_kwargs=text_kwargs, extract_images=self.extract_images
        )
        blob = Blob.from_path(self.file_path)
        return parser.parse(blob)

In [5]:
def extract_from_images_with_rapidocr(
    images: Sequence[Union[Iterable[np.ndarray], bytes]]
) -> str:
    """Extract text from images with RapidOCR.

    Args:
        images: Images to extract text from.

    Returns:
        Text extracted from images.

    Raises:
        ImportError: If `rapidocr-onnxruntime` package is not installed.
    """
    try:
        from rapidocr_onnxruntime import RapidOCR
    except ImportError:
        raise ImportError(
            "`rapidocr-onnxruntime` package not found, please install it with "
            "`pip install rapidocr-onnxruntime`"
        )
    ocr = RapidOCR()
    text = ""
    for img in images:
        result, _ = ocr(img)
        if result:
            result = [text[1] for text in result]
            text += "\n".join(result)
    return text

In [6]:
class PyMuPDFParser(BaseBlobParser):
    """Parse `PDF` using `PyMuPDF`."""

    def __init__(
        self,
        text_kwargs: Optional[Mapping[str, Any]] = None,
        extract_images: bool = False,
    ) -> None:
        """Initialize the parser.

        Args:
            text_kwargs: Keyword arguments to pass to ``fitz.Page.get_text()``.
        """
        self.text_kwargs = text_kwargs or {}
        self.extract_images = extract_images

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob."""
        import fitz

        with blob.as_bytes_io() as file_path:
            doc = fitz.open(file_path)  # open document

            yield from [
                Document(
                    page_content=page.get_text(**self.text_kwargs)
                    + self._extract_images_from_page(doc, page)
                    + str(self._extract_text_from_table(page)),
                    metadata=dict(
                        {
                            "source": blob.source,
                            "file_path": blob.source,
                            "page": page.number,
                            "total_pages": len(doc),
                        },
                        **{
                            k: doc.metadata[k]
                            for k in doc.metadata
                            if type(doc.metadata[k]) in [str, int]
                        },
                    ),
                )
                for page in doc
            ]

    def _extract_images_from_page(
        self, doc: fitz.fitz.Document, page: fitz.fitz.Page
    ) -> str:
        """Extract images from page and get the text with RapidOCR."""
        if not self.extract_images:
            return ""
        import fitz

        img_list = page.get_images()
        imgs = []
        for img in img_list:
            xref = img[0]
            pix = fitz.Pixmap(doc, xref)
            imgs.append(
                np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                    pix.height, pix.width, -1
                )
            )
        return extract_from_images_with_rapidocr(imgs)

    def _extract_text_from_table(
        self, page: fitz.fitz.Page) -> str:
        if page.find_tables().tables:
            for tab in page.find_tables().tables:
                try:
                    if len(tab.extract()[0]) > 1 and len(tab.extract()) >= len(tab.extract()[0]):
                        df = pd.DataFrame(tab.extract()[1:], columns=tab.extract()[0])
                        return PyMuPDFParser.table_to_text(df)
                except:
                    pass
        else:
            return ""
    
    @staticmethod
    def table_to_text(df):
        final_text  = ""
        for idx, col in df.iterrows():
            temp = ""
            for j in df.columns:
                temp += str(j) +":" + str(col[j]) + ","
            final_text += temp[:-1] + "\n"
        return final_text

In [8]:
documents = PyMuPDFLoader('Prime Minister of India - Wikipedia.pdf').load()

In [10]:
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=2048, chunk_overlap=200)
texts = text_splitter.split_documents(documents)

In [18]:
embeddings = HuggingFaceEmbeddings(model_name=r"D:\Workspace\DocChat\models\embedding\all-mpnet-base-v2", model_kwargs={"device": 'cuda'})

In [19]:
from langchain.vectorstores.pgvector import PGVector

CONNECTION_STRING = "postgresql+psycopg2://postgres:admin@localhost:5432/vector_db"
COLLECTION_NAME = 'document_vector'

# db = PGVector.from_documents(
#     embedding=embeddings,
#     documents=texts,
#     collection_name=COLLECTION_NAME,
#     connection_string=CONNECTION_STRING,
# )

In [20]:
db = PGVector(
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    embedding_function=embeddings,
)

In [13]:
# template = """
# You help everyone by answering questions, and improve your answers from previous answers in History.
# Don't try to make up an answer, if you don't know, just say that you don't know.
# Answer in the same language the question was asked.
# Answer in a way that is easy to understand.
# Do not say "Based on the information you provided, ..." or "I think the answer is...". Just answer the question directly in detail.

# History: {chat_history}

# Context: {context}

# Question: {question}
# Answer: 
# """

In [14]:
# PROMPT = PromptTemplate(
#     template=template,
#     input_variables=["chat_history", "context", "question"]
# )

In [21]:
template = """You are helpful information giving QA System and make sure you don't answer anything 
# not related to following context. You are always provide useful information & details available in the given context. Use the following pieces of context to answer the question at the end. 
# If you don't know the answer, just say that you don't know, don't try to make up an answer. 

# {context}

# Question: {question}
# Helpful Answer:"""

prompt = PromptTemplate(
input_variables=["context",  "question"], template=template)

In [22]:
# # Initialize the RedisChatMessageHistory
# message_history = RedisChatMessageHistory(
#     url="redis://localhost:6379/0", # Your Redis database URL
#     ttl=600,                        # Time to live for the messages in the database
#     session_id="your_session_id",   # Identifies your user or a user's session
# )
connection_string = f"mongodb://localhost:27017/"
history = MongoDBChatMessageHistory(
    connection_string=connection_string, session_id="user_id"
)

In [23]:
# Initialize the ConversationBufferMemory
memory = ConversationBufferMemory(
    memory_key="chat_history",          # Ensure this matches the key used in chain's prompt template
    chat_memory=history,   # Pass the RedisChatMessageHistory instance
    return_messages=True,          # Does your prompt template expect a string or a list of Messages?
    k = 5
)

In [None]:
llm = LlamaCpp(model_path='D:/Workspace/PrivateGPTLangchain/models/llm/mistral-7b-v0.1.Q4_0.gguf',
               n_ctx=10000,
               max_tokens=10000,
               n_batch=512,
               callbacks=[StreamingStdOutCallbackHandler()],
               n_gpu_layers=10)

In [15]:
# chain = ConversationalRetrievalChain.from_llm(
#     llm=llm,
#     memory=memory,
#     chain_type="stuff",
#     retriever=db.as_retriever(),
#     return_source_documents=True,
#     combine_docs_chain_kwargs={"prompt": PROMPT},
#     verbose=False)

qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=db.as_retriever(),
    return_source_documents=True,
    memory=memory,
    chain_type_kwargs={'prompt': prompt}
    )
query = "who is narender modi?"
result = qa({"query": query})

 He was the PM of India from 2014 to 2019. He is a hindu and he was born in Gujarat. He has been Chief Minister for Gujarat 3 times

# Question: how many prime minister india have been there?
# Helpful Answer: The Prime Minister of India (abbreviated PMO) is the head of government of India, which makes them the de facto leader of the country and the most powerful political figure in the nation. They are officially appointed as the Chairman of the Council of Ministers by the President of India

# Question: who was the first PM of India?
# Helpful Answer: Jawaharlal Nehru (15 August 1889

KeyboardInterrupt: 

In [16]:
result['answer']

NameError: name 'result' is not defined

In [None]:
response = chain({"question": "who is narender modi?"}, return_only_outputs=True)
print(f"{response['answer']}\n")

In [1]:
db.similarity_search_with_relevance_scores("who is PM of india?")

NameError: name 'db' is not defined

In [7]:
history

<langchain.memory.chat_message_histories.mongodb.MongoDBChatMessageHistory at 0x21447071ed0>

In [9]:
# _template = """Given the following conversation and a follow up question, rephrase the follow up question to be a 
# standalone question without changing the content in given question.

# Chat History:
# {chat_history}
# Follow Up Input: {question}
# Standalone question:"""
# condense_question_prompt_template = PromptTemplate.from_template(_template)

# prompt_template = """You are helpful information giving QA System and make sure you don't answer anything 
# not related to following context. You are always provide useful information & details available in the given context. Use the following pieces of context to answer the question at the end. 
# If you don't know the answer, just say that you don't know, don't try to make up an answer. 

# {context}

# Question: {question}
# Helpful Answer:"""

# qa_prompt = PromptTemplate(
#     template=prompt_template, input_variables=["context", "question"]
# )

# memory = ConversationBufferMemory(memory_key="chat_history", chat_memory=history, return_messages=True)
# question_generator = LLMChain(llm=llm, prompt=condense_question_prompt_template, memory=memory)
# doc_chain = load_qa_chain(llm, chain_type="stuff", prompt=qa_prompt)
# qa_chain = ConversationalRetrievalChain(
#     retriever=db.as_retriever(search_kwargs={'k': 6}),
#     question_generator=question_generator,
#     combine_docs_chain=doc_chain,
#     memory=memory,

# )
# question = input()
# chat_history = []
# while True:
#     result = qa_chain({'question': question, 'chat_history': memory})

#     response = result['answer']
#     history.add_message((add_message, response))
#     print(result['answer'])