# config

In [None]:
from loguru import logger
import sys
import os
from pathlib import Path

class Config:
    ALLOWED_FILE_EXTENSIONS = set([".pdf", ".md", ".txt"])
    SEED = 42

class Model:
    NAME = "deepseek-r1:14b"
    TEMPERATURE = 0.6

class Preprocessing:
    CHUNK_SIZE = 2048
    CHUNK_OVERLAP = 128
    EMBEDDING_MODEL = "BAAI/bge-small-en-v1.5"
    RERANKER = "ms-marco-MiniLM-L-12-v2"
    LLM = "llama3.2"
    CONTEXTUALIZE_CHUNKS = True
    N_SEMANTIC_RESULTS = 5
    N_BM25_RESULTS = 5

class Chatbot:
    N_CONTEXT_RESULTS = 3

class Path:
    APP_HOME = Path(os.getenv("APP_HOME", Path(__file__).parent.parent))
    DATA_DIR = APP_HOME / "data"


def configure_logging():
    config = {
        "handlers": [
            {
                "sink": sys.stdout, 
                "colorize": True,
                "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | {message}",
            },
            {
                "sink": "app.log",  # Log to a file as well
                "rotation": "10 MB",  # Rotate log file when it reaches 10MB
                "compression": "zip", # Compress old log files
                "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
            },
        ],
        "levels": [
            {"name": "TRACE", "color": "<cyan>"},
            {"name": "DEBUG", "color": "<blue>"},
            {"name": "INFO", "color": "<green>"},
            {"name": "WARNING", "color": "<yellow>"},
            {"name": "ERROR", "color": "<red>"},
            {"name": "CRITICAL", "color": "<BOLD><RED>"},
        ],
    }

    logger.configure(**config)

# Example usage:
configure_logging()

logger.trace("This is a trace message.")
logger.debug("This is a debug message.")
logger.info("This is an info message.")
logger.warning("This is a warning message.")
logger.error("This is an error message.")
logger.critical("This is a critical message.")

try:
    result = 10 / 0
except ZeroDivisionError as e:
    logger.exception("An exception occurred: {}", e)

In [3]:
import numpy as np

Xs = np.array([0.0339, 0.0423, 0.213, 0.257, 0.273, 0.273, 0.450, 0.503, 0.503, \
0.637, 0.805, 0.904, 0.904, 0.910, 0.910, 1.02, 1.11, 1.11, 1.41, \
1.72, 2.03, 2.02, 2.02, 2.02])

Ys = np.array([-19.3, 30.4, 38.7, 5.52, -33.1, -77.3, 398.0, 406.0, 436.0, 320.0, 373.0, \
93.9, 210.0, 423.0, 594.0, 829.0, 718.0, 561.0, 608.0, 1.04E3, 1.10E3, \
840.0, 801.0, 519.0])

N = 24

# Calculate the covariance (biased - dividing by N)
covariance_biased = np.cov(Xs, Ys)[0, 1]  # [0, 1] gets the covariance from the matrix

# Calculate the covariance (unbiased - dividing by N-1)
covariance_unbiased = np.cov(Xs, Ys, ddof=1)[0, 1] # ddof=1 for unbiased

print("Biased Covariance:", covariance_biased)
print("Unbiased Covariance:", covariance_unbiased)

Biased Covariance: 191.20706528260865
Unbiased Covariance: 191.20706528260865


In [1]:
from pypdfium2 import PdfDocument

In [2]:
def extract_pdf_content(data: bytes) -> str:
    pdf = PdfDocument(data)

    content = ""
    for page in pdf:
        text_page = page.get_textpage()
        content += f"{text_page.get_text_bounded()}\n"

    return content

In [4]:
data = """This is the first line of the paragraph.
This is the second line of the paragraph.
This is the third line of the paragraph.
This is the fourth line of the paragraph.
This is the fifth line of the paragraph.
This is the sixth line of the paragraph.
This is the seventh line of the paragraph.
This is the eighth line of the paragraph.
This is the ninth line of the paragraph.
This is the tenth line of the paragraph."""

In [7]:
# Load a PDF file and pass its content as bytes to the extract_pdf_content function
pdf_path = "APJ_Abdul_Kalam.pdf"
with open(pdf_path, "rb") as f:
	pdf_data = f.read()

content = extract_pdf_content(pdf_data)
print(content)

Dr. APJ Abdul Kalam: The Missile Man of India
Introduction
Dr. Avul Pakir Jainulabdeen Abdul Kalam, popularly known as the "Missile Man of India," was
a renowned scientist, visionary, and the 11th President of India. He played a crucial role in
India's space and missile development programs and was widely admired for his simplicity,
humility, and dedication to education and innovation. His life and works continue to inspire
millions around the world.
Early Life and Education
Born on October 15, 1931, in Rameswaram, Tamil Nadu, Dr. Kalam hailed from a modest
family. Despite financial constraints, he pursued his education with determination. He
completed his degree in aerospace engineering from the Madras Institute of Technology (MIT).
His passion for science and technology led him to join the Defence Research and
Development Organisation (DRDO) and later the Indian Space Research Organisation (ISRO).
Contributions to Science and Technology
Dr. Kalam made significant contributions to Ind

In [11]:
from streamlit.runtime.uploaded_file_manager import UploadedFile

In [18]:
from dataclasses import dataclass
from pathlib import Path
from src.config import Config
PDF_EXTENSION = ".pdf"
@dataclass
class File:
    name: str
    content: str

In [19]:
def load_uploaded_file(uploaded_file: UploadedFile) -> File:
    file_extension = Path(uploaded_file.name).suffix

    if file_extension not in Config.ALLOWED_FILE_EXTENSIONS:
        raise ValueError(f"Invalid file extension: {file_extension} for file {uploaded_file.name}")

    if file_extension == PDF_EXTENSION:
        return File(name=uploaded_file.name, content=extract_pdf_content(uploaded_file.getvalue()))

    return File(name=uploaded_file.name, content=uploaded_file.getvalue().decode("utf-8"))

In [21]:
from streamlit.runtime.uploaded_file_manager import UploadedFile

class MockUploadedFile:
	def __init__(self, name, data):
		self.name = name
		self.data = data

	def getvalue(self):
		return self.data

# Create a mock UploadedFile object
mock_uploaded_file = MockUploadedFile(pdf_path, pdf_data)

# Load the uploaded file
file = load_uploaded_file(mock_uploaded_file)
print(file)

File(name='APJ_Abdul_Kalam.pdf', content='Dr. APJ Abdul Kalam: The Missile Man of India\r\nIntroduction\r\nDr. Avul Pakir Jainulabdeen Abdul Kalam, popularly known as the "Missile Man of India," was\r\na renowned scientist, visionary, and the 11th President of India. He played a crucial role in\r\nIndia\'s space and missile development programs and was widely admired for his simplicity,\r\nhumility, and dedication to education and innovation. His life and works continue to inspire\r\nmillions around the world.\r\nEarly Life and Education\r\nBorn on October 15, 1931, in Rameswaram, Tamil Nadu, Dr. Kalam hailed from a modest\r\nfamily. Despite financial constraints, he pursued his education with determination. He\r\ncompleted his degree in aerospace engineering from the Madras Institute of Technology (MIT).\r\nHis passion for science and technology led him to join the Defence Research and\r\nDevelopment Organisation (DRDO) and later the Indian Space Research Organisation (ISRO).\r\nContr

## Config

In [None]:
from loguru import logger
import sys
import os
from pathlib import Path

class Config:
    ALLOWED_FILE_EXTENSIONS = set([".pdf", ".md", ".txt"])
    SEED = 42

    class Model:
        NAME = "deepseek-r1:14b"
        TEMPERATURE = 0.6

    class Preprocessing:
        CHUNK_SIZE = 2048
        CHUNK_OVERLAP = 128
        EMBEDDING_MODEL = "BAAI/bge-small-en-v1.5"
        RERANKER = "ms-marco-MiniLM-L-12-v2"
        LLM = "llama3.2"
        CONTEXTUALIZE_CHUNKS = True
        N_SEMANTIC_RESULTS = 5
        N_BM25_RESULTS = 5

    class Chatbot:
        N_CONTEXT_RESULTS = 3

    class Path:
        APP_HOME = Path(os.getenv("APP_HOME", Path(__file__).parent.parent))
        DATA_DIR = APP_HOME / "data"


def configure_logging():
    config = {
        "handlers": [
            {
                "sink": sys.stdout, 
                "colorize": True,
                "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | {message}",
            },
            {
                "sink": "app.log",  # Log to a file as well
                "rotation": "10 MB",  # Rotate log file when it reaches 10MB
                "compression": "zip", # Compress old log files
                "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
            },
        ],
        "levels": [
            {"name": "TRACE", "color": "<cyan>"},
            {"name": "DEBUG", "color": "<blue>"},
            {"name": "INFO", "color": "<green>"},
            {"name": "WARNING", "color": "<yellow>"},
            {"name": "ERROR", "color": "<red>"},
            {"name": "CRITICAL", "color": "<bold><red>"},
        ],
    }

    logger.configure(**config)

# Example usage:
configure_logging()

logger.trace("This is a trace message.")
logger.debug("This is a debug message.")
logger.info("This is an info message.")
logger.warning("This is a warning message.")
logger.error("This is an error message.")
logger.critical("This is a critical message.")

## file_loader

In [None]:
from dataclasses import dataclass
from pathlib import Path
from src.exception import CustomException
import sys
from pypdfium2 import PdfDocument
from streamlit.runtime.uploaded_file_manager import UploadedFile

from src.config import Config

TEXT_FILE_EXTENSION = ".txt"
MD_FILE_EXTENSION = ".md"

PDF_EXTENSION = ".pdf"

@dataclass
class File:
    name: str
    content: str

def extract_pdf_content(data: bytes) -> str:
    try:
        pdf = PdfDocument(data)

        content = ""
        for page in pdf:
            text_page = page.get_textpage()
            content += f"{text_page.get_text_bounded()}\n"

        return content
    except Exception as e:
        raise CustomException(e,sys)

def load_uploaded_file(uploaded_file: UploadedFile) -> File:
    try:
        file_extension = Path(uploaded_file.name).suffix

        if file_extension not in Config.ALLOWED_FILE_EXTENSIONS:
            raise ValueError(f"Invalid file extension: {file_extension} for file {uploaded_file.name}")

        if file_extension == PDF_EXTENSION:
            return File(name=uploaded_file.name, content=extract_pdf_content(uploaded_file.getvalue()))

        return File(name=uploaded_file.name, content=uploaded_file.getvalue().decode("utf-8"))
    except Exception as e:
        raise CustomException(e,sys)

# data ingestion

In [None]:
from typing import List
from src.exception import CustomException
import sys
from langchain.prompts import ChatPromptTemplate
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever

from langchain_community.document_compressors.flashrank_rerank import FlashrankRerank
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings

from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document

from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import InMemoryVectorStore

from langchain_ollama import ChatOllama
from langchain_text_splitters import RecursiveCharacterTextSplitter

from src.config import Config
from src.file_loader import File

CONTEXT_PROMPT = ChatPromptTemplate.from_template(
    """
    You're an expert in document analysis. Your task is to provide brief, relevant context for a chunk of text.

    Here is the document:
    <document>

    {document}
    </document>

    Here is the chunk we want to situate within the whole document:
    <chunk>
    {chunk}
    </chunk>
    Provide a concise context (2-3 sentences) for this chunk , considering the following guidelines:
    1.Identify the main topic or concept discussed in the chunk.
    2. Mention any relevant information or comparision from the broader document.
    3.If applicable, note how this information relates to overall theme or purpose of the documents.
    4.Include any key figure,dates,or percentages that provide importent context.

    context:

    """.strip()
)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=Config.Preprocessing.CHUNK_SIZE,
    chunk_overlap=Config.Preprocessing.CHUNK_OVERLAP,
)

def create_llm() -> ChatOllama:
    return ChatOllama(model=Config.Preprocessing.LLM, temperature=0, keep_alive=-1)

def create_embeddings() -> FastEmbedEmbeddings:
    return FastEmbedEmbeddings(model_name=Config.Preprocessing.EMBEDDING_MODEL)

def create_reranker() -> FlashrankRerank:
    try:
        return FlashrankRerank(
            model=Config.Preprocessing.RERANKER, 
            top_n=Config.Chatbot.N_CONTEXT_RESULTS
        )
    except Exception as e:
        raise CustomException(e,sys)

def _generate_context(llm: ChatOllama, document: str, chunk: str) -> str:
    try:
        messages = CONTEXT_PROMPT.format_messages(document=document, chunk=chunk)
        response = llm.invoke(messages)
        return response.content
    except Exception as e:
        raise CustomException(e,sys)

def _create_chunks(document: Document) -> List[Document]:
    try:
        chunks = text_splitter.split_documents([document])

        if not Config.Preprocessing.CONTEXTUALIZE_CHUNKS:
            return chunks

        llm = create_llm()
        contextual_chunks = []

        for chunk in chunks:
            context = _generate_context(llm, document.page_content, chunk.page_content)

            chunk_with_context = f"{context}\n\n{chunk.page_content}"
            contextual_chunks.append(Document(page_content=chunk_with_context, metadata=chunk.metadata))

        return contextual_chunks
    except Exception as e:
        raise CustomException(e,sys)

def ingest_files(files: List[File]) -> BaseRetriever:
    try:
        documents = [Document(page_content=file.content, metadata={"source": file.name}) for file in files]

        chunks = []
        for document in documents:
            chunks.extend(_create_chunks(document))

        semantic_retriever = InMemoryVectorStore.from_documents(
            chunks, create_embeddings()
        ).as_retriever(search_kwargs={"k": Config.Preprocessing.N_SEMANTIC_RESULTS})

        bm25_retriever = BM25Retriever.from_documents(chunks)
        bm25_retriever.k = Config.Preprocessing.N_BM25_RESULTS  # Assign k here

        ensemble_retriever = EnsembleRetriever(
            retrievers=[semantic_retriever, bm25_retriever],
            weights=[0.6, 0.4],
        )

        return ContextualCompressionRetriever(
            base_compressor=create_reranker(), base_retriever=ensemble_retriever
        )
    except Exception as e:
        raise CustomException(e,sys)

# Chatbot


In [None]:
from langgraph.graph import START, StateGraph
from langgraph.graph.state import CompiledStateGraph
from src.exception import CustomException
from src.config import Config
from src.data_ingestion import ingest_files

from src.file_loader import File
from typing import List, TypedDict, Iterable
from enum import Enum
from dataclasses import dataclass
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import BaseMessage, Document, AIMessage, HumanMessage
# from langchain.llms import ChatOllama
from langchain_ollama import ChatOllama
import sys

SYSTEM_PROMPT = """
You're having a conversation with an user about excerpts of their files. Try to be helpful and answer their questions.

If you don't know the answer, say that you don't know and try to ask clarifying questions.
""".strip()

PROMPT = """
Here's the information you have about the excerpts of the files:

<context>
{context}
</context>

One file can have multiple excerpts.

Please, respond to the query below:

<question>
{question}
</question>

Answer:

"""

FILE_TEMPLATE="""
<file>
<name>{name}</name>
<context>{context}</context>
</file>

""".strip()

PROMPT_TEMPLATE=ChatPromptTemplate.from_messages(

    [
        (
            "system",
            SYSTEM_PROMPT,
        ),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human",PROMPT)
    ]

)

class Role(Enum):
    USER="user"
    ASSISTANT="assistant"

@dataclass
class Message:
    role: Role 
    content: str

    

@dataclass
class ChunkEvent:
    content: str

@dataclass
class SourcesEvent:
    content: List[Document]

@dataclass
class FinalAnswerEvent:
    content: str

class State(TypedDict):
    question: str
    chat_history: List[BaseMessage]
    context: List[Document]
    answer:str
    
def _remove_thinking_from_message(message: str) -> str:
    close_tag = "</think>"
    tag_length = len(close_tag)
    return message[message.find(close_tag) + tag_length :].strip()

def create_history(welcome_message: Message) -> List[Message]:
    return [welcome_message]


class Chatbot:
    def __init__(self, files: List[File]):
        self.files = files
        self.retriever = ingest_files(files)
        self.llm = ChatOllama(
            model=Config.Model.NAME,
            temperature=Config.Model.TEMPERATURE,
            verbose=False,
            keep_alive=-1,
        )
        self.workflow = self._create_workflow()

    def _format_docs(self, docs: List[Document]) -> str:
        try:
            return "\n\n".join(
                FILE_TEMPLATE.format(name=doc.metadata["source"], content=doc.page_content)
                for doc in docs
            )
        except Exception as e:
            raise CustomException(e,sys)

        

    def _retrieve(self, state: State):
        try:
            context = self.retriever.invoke(state["question"])
            return {"context": context}
        except Exception as e:
            raise CustomException(e,sys)
    
    def _generate(self, state: State):
        try:
            messages = PROMPT_TEMPLATE.invoke(
                {
                    "question": state["question"],
                    "context": self._format_docs(state["context"]),
                    "chat_history": state["chat_history"],
                }
            )
            answer = self.llm.invoke(messages)
            return {"answer": answer}
        except Exception as e:
            raise CustomException(e,sys)

    def _create_workflow(self) -> CompiledStateGraph:
        graph_builder = StateGraph(State).add_sequence([self._retrieve, self._generate])
        graph_builder.add_edge(START, "_retrieve")
        return graph_builder.compile()
    
    def _ask_model(
        self, prompt: str, chat_history: List[Message]
    ) -> Iterable[SourcesEvent | ChunkEvent | FinalAnswerEvent]:
        try:
            history = [
                AIMessage(m.content) if m.role == Role.ASSISTANT else HumanMessage(m.content)
                for m in chat_history
            ]
            payload = {"question": prompt, "chat_history": history}

            config = {
                "configurable": {"thread_id": 42},
            }
            for event_type, event_data in self.workflow.stream(
                payload,
                config=config,
                stream_mode=["updates", "messages"],
            ):
                if event_type == "messages":
                    chunk, _ = event_data
                    yield ChunkEvent(chunk.content)

                if event_type == "updates":
                    if "_retrieve" in event_data:
                        documents = event_data["_retrieve"]["context"]
                        yield SourcesEvent(documents)

                    if "_generate" in event_data:
                        answer = event_data["_generate"]["answer"]
                        yield FinalAnswerEvent(answer.content)
        except Exception as e:
            raise CustomException(e,sys)
                

    def ask(
        self, prompt: str, chat_history: List[Message]
    ) -> Iterable[SourcesEvent | ChunkEvent | FinalAnswerEvent]:
        try:
            for event in self._ask_model(prompt, chat_history):
                yield event
                if isinstance(event, FinalAnswerEvent):
                    response = _remove_thinking_from_message("".join(event.content))
                    chat_history.append(Message(role=Role.USER, content=prompt))
                    chat_history.append(Message(role=Role.ASSISTANT, content=response))
        except Exception as e:
            raise CustomException(e,sys)