### Import


In [1]:
import os
from dotenv import load_dotenv
from langchain_community.llms import Ollama
from langchain_community.embeddings import OllamaEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import PromptTemplate
from langchain_community.vectorstores import DocArrayInMemorySearch
from operator import itemgetter
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
import fitz  # PyMuPDF library for PDF manipulation
import re
import unicodedata
from langchain_community.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredExcelLoader,
    UnstructuredPowerPointLoader,
    UnstructuredImageLoader,
    UnstructuredHTMLLoader,
)
from typing import List, Dict

### Setup environment


In [2]:
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL = "llama3"

model = Ollama(model=MODEL)
embeddings = OllamaEmbeddings(model=MODEL)

parser = StrOutputParser()
chain = model | parser


### Utility functions


In [3]:
def normalize_text(text):
    return unicodedata.normalize("NFKD", text).encode("ASCII", "ignore").decode("ASCII")


def load_documents(file_paths):
    documents = []
    for file_path in file_paths:
        _, file_extension = os.path.splitext(file_path.lower())
        if file_extension == ".pdf":
            loader = PyPDFLoader(file_path)
        elif file_extension in [".doc", ".docx", ".odt"]:
            loader = Docx2txtLoader(file_path)
        elif file_extension in [".rtf", ".txt"]:
            loader = UnstructuredWordDocumentLoader(file_path)
        elif file_extension in [".xls", ".xlsx", ".ods", ".csv"]:
            loader = UnstructuredExcelLoader(file_path)
        elif file_extension in [".ppt", ".pptx", ".odp"]:
            loader = UnstructuredPowerPointLoader(file_path)
        elif file_extension in [
            ".bmp",
            ".gif",
            ".jpg",
            ".jpeg",
            ".png",
            ".svg",
            ".tiff",
        ]:
            loader = UnstructuredImageLoader(file_path)
        elif file_extension == ".html":
            loader = UnstructuredHTMLLoader(file_path)
        else:
            print(f"Unsupported file format: {file_extension}")
            continue

        documents.extend(loader.load())

    return documents


def extract_highlighted_text(pdf_path, page_num, start_char, end_char):
    doc = fitz.open(pdf_path)
    page = doc[page_num]

    # Get the rectangle coordinates for the text range
    start_rect = page.get_text("words")[start_char][:4]
    end_rect = page.get_text("words")[end_char - 1][:4]

    # Create a rectangle that encompasses the text range
    highlight_rect = fitz.Rect(start_rect[0], start_rect[1], end_rect[2], end_rect[3])

    # Extract the text within the rectangle
    highlighted_text = page.get_text("text", clip=highlight_rect)

    # Optionally, you can still add a highlight annotation if needed
    # page.add_highlight_annot(highlight_rect)

    doc.close()
    return highlighted_text


def create_citation(document, relevant_text):
    return {
        "document_name": document.metadata.get("source", "Unknown"),
        "page_number": document.metadata.get("page", 0) + 1,
        "text": relevant_text,
        "start_char": document.page_content.index(relevant_text),
        "end_char": document.page_content.index(relevant_text) + len(relevant_text),
    }

### Load and split documents


In [4]:
file_paths = ["dotnet.pdf"]
pages = load_documents(file_paths)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=250,
    chunk_overlap=20,
    length_function=len,
)

### Setup prompt and retriever


In [5]:
template = """
Answer the question based on the context below and the conversation history. If you can't answer the question, reply "I don't know".
When using information from the context, sources with the format [Citation X] must be included where X is the number of citation of each answer. 
If answer come from the same source, reuse the same citation number.  

Context: {context}

Conversation History:
{history}

Question: {question}

Answer:
"""

prompt = PromptTemplate.from_template(template)
# splits = text_splitter.split_documents(pages)
vectorstore = DocArrayInMemorySearch.from_documents(pages, embedding=embeddings)
base_retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 5})

# compressor = LLMChainExtractor.from_llm(model)
# retriever = ContextualCompressionRetriever(
#     base_compressor=compressor,
#     base_retriever=base_retriever,
# )
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 5})
chain = prompt | model | parser

chain.input_schema.schema()
chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
        "history": itemgetter("history"),
    }
    | prompt
    | model
    | parser
)



### Conversation Memory


In [6]:
class ConversationMemory:
  def __init__(self, max_history: int = 5):
    self.history: List[Dict[str, str]] = []
    self.max_history = max_history

  def add_interaction(self, question: str, answer: str):
    self.history.append({"question": question, "answer": answer})
    if len(self.history) > self.max_history:
      self.history.pop(0)

  def get_formatted_history(self) -> str:
    return "\n".join(
      [
        f"Human: {interaction['question']}\nAI: {interaction['answer']}"
        for interaction in self.history
      ]
    )

### Generate Response with Citations


In [7]:
def generate_response_with_citations(question: str, conversation_memory: ConversationMemory):
    retrieved_docs = retriever.invoke(question)
    context = ""
    citations = []
    for i, doc in enumerate(retrieved_docs):
        relevant_text = doc.page_content
        citation = create_citation(doc, relevant_text)
        citations.append(citation)
        context += f"[Citation {i + 1}] {relevant_text}\n\n"

    history = conversation_memory.get_formatted_history()
    response = chain.invoke({"context": context, "question": question, "history": history})

    used_citations = []
    for match in re.finditer(r"\[Citation (\d+)\]", response):
        citation_num = int(match.group(1))
        if 1 <= citation_num <= len(citations):
            used_citations.append(citations[citation_num - 1])

    conversation_memory.add_interaction(question, response)
    return response, used_citations

In [8]:
def format_response_with_citations(response, citations):
    formatted_response = f"{response}\n\nCitations:\n"
    if not citations:
        formatted_response += "No citations available.\n"
    for i, citation in enumerate(citations):
        try:
            formatted_response += f"{i+1}. Document: {citation['document_name']}, Page: {citation['page_number']}\n"
            formatted_response += f"   Text: {citation['text'][:100]}...\n\n"
        except Exception as e:
            formatted_response += f"{i+1}. Error formatting citation: {str(e)}\n\n"
    return formatted_response

### Example Usage


In [9]:
# Example usage
questions = [
    "What is the purpose of .NET?",
    "What is the environment of .NET?",
    "How does .NET handle errors?",
    "Do you have information about DLL?",
    "What is the purpose of .NET?",
]
conversation_memory = ConversationMemory()
for question in questions:
    print(f"Question: {question}")
    response, citations = generate_response_with_citations(
        question, conversation_memory
    )
    formatted_response = format_response_with_citations(response, citations)
    print(formatted_response)
    print()

Question: What is the purpose of .NET?
With reference to the context [Citation 1], .NET is a framework developed by Microsoft as a response to the Java platform, with the goal of allowing developers to use multiple languages, editors, and libraries to build applications for various platforms. It includes programming languages (C#, F#, VB), common libraries (.NET standard), compiler for each programming language (that compile to CIL), CIL (Common Infrastructure Language), and CLR (Common Language Runtime).

Answer: To allow developers to use multiple languages, editors, and libraries to build applications for various platforms.

[Citation 1]: https://medium.com/c-sharp-progarmming/net-behind-the-scene-a229f83083d0

Citations:
1. Document: dotnet.pdf, Page: 7
   Text: 11/28/23, 9:58 AM .NET behind the scene. What is it .NET, How it works, and Why… | by Ofir Elarat | ...

2. Document: dotnet.pdf, Page: 7
   Text: 11/28/23, 9:58 AM .NET behind the scene. What is it .NET, How it works, and 

: 