<a href="https://colab.research.google.com/github/MariyahW/Outamation_Externship/blob/main/DocChatBox.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:


# =============================
# INSTALLS
# =============================
!pip install -U gradio pymupdf llama-index llama-index-embeddings-huggingface langchain-text-splitters

# =============================
# IMPORTS
# =============================
import re
from typing import Optional, Dict, Any, List

import gradio as gr
import fitz  # PyMuPDF

from langchain_text_splitters import RecursiveCharacterTextSplitter

from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter


# =============================
# GLOBAL SETTINGS
# =============================

# Register a HuggingFace embedding model on LlamaIndex's global Settings object.
# NOTE(review): presumably VectorStoreIndex picks this up when embedding both
# documents and queries — confirm against the LlamaIndex Settings docs.
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

# Fallback values used by process_pdf when the matching UI field is empty;
# they are also the initial values shown in the doc_type / doc_id textboxes.
DEFAULT_DOC_TYPE = "Contract"   # change to "loan_form" if you're using fee worksheet PDFs
DEFAULT_DOC_ID = "contract_01"


# =============================
# HELPERS
# =============================
def extract_pdf_pages(pdf_path: str, doc_type: str, doc_id: str) -> List[Dict[str, Any]]:
    """Extract the plain text of every page in a PDF, with minimal metadata.

    Args:
        pdf_path: Filesystem path of the PDF to open with PyMuPDF.
        doc_type: Label copied verbatim into every page record.
        doc_id: Identifier copied verbatim into every page record.

    Returns:
        One dict per page, in document order, with the keys ``page``,
        ``doc_type``, ``text``, ``doc_id``, ``source_file`` and
        ``page_in_doc``. Both page fields hold the 0-based page index.

    Raises:
        Whatever ``fitz.open`` raises when the file cannot be opened.
    """
    import os  # function-scope import keeps this block self-contained

    # basename() handles the platform's own path separators, which a bare
    # split("/") does not.
    source_file = os.path.basename(pdf_path)

    # The context manager closes the document even if reading a page raises,
    # so a failing PDF no longer leaks the open file handle.
    with fitz.open(pdf_path) as doc:
        return [
            {
                "page": page_number,
                "doc_type": doc_type,
                # get_text may yield an empty/None result for image-only pages.
                "text": doc.load_page(page_number).get_text("text") or "",
                "doc_id": doc_id,
                "source_file": source_file,
                "page_in_doc": page_number,
            }
            for page_number in range(doc.page_count)
        ]


def chunk_text(full_text: str, chunk_size: int = 512, chunk_overlap: int = 100) -> List[str]:
    """Split ``full_text`` into overlapping chunks.

    ``chunk_size`` and ``chunk_overlap`` are forwarded unchanged to a
    ``RecursiveCharacterTextSplitter``; the list it produces is returned as-is.
    """
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    ).split_text(full_text)


def build_index_from_pdf(
    pdf_path: str,
    doc_type: str,
    doc_id: str,
    chunk_size: int = 512,
    chunk_overlap: int = 100,
) -> Dict[str, Any]:
    """Build a vector index from one PDF.

    Pipeline: extract pages -> concatenate their text -> chunk ->
    wrap each chunk in a llama_index ``Document`` with metadata ->
    ``VectorStoreIndex``.

    Args:
        pdf_path: Path of the PDF to index.
        doc_type: Label stored in every chunk's metadata and in the result.
        doc_id: Identifier stored in every chunk's metadata and in the result.
        chunk_size: Forwarded to ``chunk_text`` (defaults to the previously
            hard-coded 512).
        chunk_overlap: Forwarded to ``chunk_text`` (defaults to the previously
            hard-coded 100).

    Returns:
        A dict meant to be stored in ``gr.State`` with the keys ``index``,
        ``doc_type``, ``doc_id``, ``source_file``, ``num_pages`` and
        ``num_chunks``.
    """
    pages = extract_pdf_pages(pdf_path, doc_type=doc_type, doc_id=doc_id)

    # Resolve the display name once; fall back to the path's last component
    # when the PDF has no pages to read it from.
    source_file = pages[0]["source_file"] if pages else pdf_path.split("/")[-1]

    full_text = "\n\n".join(p["text"] for p in pages)
    chunks = chunk_text(full_text, chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    documents: List[Document] = [
        Document(
            text=chunk,
            metadata={
                "doc_type": doc_type,
                "chunk_index": chunk_index,
                "doc_id": doc_id,
                "source_file": source_file,
            },
        )
        for chunk_index, chunk in enumerate(chunks)
    ]

    index = VectorStoreIndex.from_documents(documents)

    return {
        "index": index,
        "doc_type": doc_type,
        "doc_id": doc_id,
        "source_file": source_file,
        "num_pages": len(pages),
        "num_chunks": len(chunks),
    }


def retrieve_chunks(state: Dict[str, Any], query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """Fetch the chunks most similar to ``query``, restricted by ``doc_type``.

    ``state`` must provide the ``index`` and ``doc_type`` entries. Each hit is
    returned as a dict holding its ``text`` and a copy of its ``metadata``.
    """
    vector_index: VectorStoreIndex = state["index"]

    # Only nodes whose doc_type metadata equals the stored value are eligible.
    only_this_type = ExactMatchFilter(key="doc_type", value=state["doc_type"])
    retriever = vector_index.as_retriever(
        filters=MetadataFilters(filters=[only_this_type]),
        similarity_top_k=top_k,
    )

    results: List[Dict[str, Any]] = []
    for hit in retriever.retrieve(query):
        results.append({"text": hit.get_text(), "metadata": dict(hit.metadata)})
    return results


def _try_extract_answer(query: str, matched_chunks: List[Dict[str, Any]]) -> str:
    """
    Simple extraction-based answer (no LLM):
    - tries to pull money amounts near common labels
    - otherwise returns a short preview of the best chunk
    """
    q = (query or "").lower()
    blob = "\n".join(c["text"] for c in matched_chunks)

    def find_money(label_patterns: List[str]) -> Optional[str]:
        for pat in label_patterns:
            m = re.search(pat + r".{0,80}?(\$?\s*[\d,]+\.\d{2})", blob, flags=re.I)
            if m:
                return m.group(1).replace(" ", "")
        return None

    # Generic money-related fields you might ask about
    monthly_payment = find_money([r"total\s+estimated\s+monthly\s+payment", r"total\s+monthly\s+payment"])
    funds_to_close = find_money([r"total\s+estimated\s+funds\s+needed\s+to\s+close", r"funds\s+needed\s+to\s+close"])
    penalty = find_money([r"penalt(y|ies)", r"early\s+repayment", r"prepayment"])

    parts = []
    if ("monthly" in q and "payment" in q) and monthly_payment:
        parts.append(f"Total estimated monthly payment looks like {monthly_payment}.")
    if ("close" in q or "closing" in q) and funds_to_close:
        parts.append(f"Funds needed to close looks like {funds_to_close}.")
    if ("penalt" in q or "prepay" in q or "early repayment" in q) and penalty:
        parts.append(f"I see a penalty amount of {penalty} mentioned near the relevant section.")

    if parts:
        return " ".join(parts)

    if matched_chunks:
        preview = matched_chunks[0]["text"].strip().replace("\n", " ")
        preview = (preview[:260] + "‚Ä¶") if len(preview) > 260 else preview
        return f"I pulled the most relevant section(s). Top match starts with: ‚Äú{preview}‚Äù"

    return "I couldn‚Äôt find a strong match. Try using a specific term or section name."


# =============================
# GRADIO BACKEND FUNCTIONS
# =============================
def process_pdf(file_obj, doc_type: str, doc_id: str):
    """Build the index for an uploaded PDF and return it for storage in state.

    Args:
        file_obj: Value delivered by the ``gr.File`` input: ``None`` when
            nothing was uploaded, otherwise a path string or an object whose
            ``name`` attribute is the path.
        doc_type: Text of the doc_type field; blank falls back to
            ``DEFAULT_DOC_TYPE``.
        doc_id: Text of the doc_id field; blank falls back to
            ``DEFAULT_DOC_ID``.

    Returns:
        ``(state, status_message)``. ``state`` is the dict produced by
        ``build_index_from_pdf`` on success and ``None`` otherwise.
    """
    if file_obj is None:
        return None, "Upload a PDF first."

    # Accept both a plain path string and a file wrapper exposing `.name`.
    pdf_path = getattr(file_obj, "name", file_obj)

    # Strip first, then fall back: a whitespace-only field must not become an
    # empty doc_type/doc_id.
    resolved_doc_type = (doc_type or "").strip() or DEFAULT_DOC_TYPE
    resolved_doc_id = (doc_id or "").strip() or DEFAULT_DOC_ID

    try:
        state = build_index_from_pdf(
            pdf_path,
            doc_type=resolved_doc_type,
            doc_id=resolved_doc_id,
        )
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"❌ Failed to process PDF: {repr(e)}"

    msg = (
        f"✅ Processed {state['source_file']}\n"
        f"- doc_type: {state['doc_type']}\n"
        f"- doc_id: {state['doc_id']}\n"
        f"- pages: {state['num_pages']}\n"
        f"- chunks: {state['num_chunks']}"
    )
    return state, msg


def handle_chat(message: str, history: List[Dict[str, str]], state: Optional[Dict[str, Any]]):
    """Answer one chat message against the processed document.

    ``history`` uses the messages format of ``gr.Chatbot``: a list of dicts
    like ``{"role": "user", "content": "..."}`` and
    ``{"role": "assistant", "content": "..."}``.

    Args:
        message: Text from the question box; ``None``/blank is ignored.
        history: Current chat history, or ``None`` for an empty chat.
        state: Dict produced by ``process_pdf`` (must contain ``"index"``),
            or ``None`` when no document has been processed yet.

    Returns:
        ``(new_history, "")`` — the updated history and an empty string that
        clears the question box.
    """
    # Work on a copy so the caller's list is never mutated, and so a missing
    # history is always returned as a list rather than None.
    history = list(history) if history else []

    message = (message or "").strip()
    if not message:
        return history, ""

    # Add user message
    history.append({"role": "user", "content": message})

    # Must process PDF first
    if not state or "index" not in state:
        history.append({"role": "assistant", "content": "Upload a PDF and click 🔄 Process Document first."})
        return history, ""

    matched = retrieve_chunks(state, message, top_k=5)
    answer = _try_extract_answer(message, matched)

    if matched:
        # Cite the best-ranked chunk as the source of the answer.
        top_metadata = matched[0]["metadata"]
        src = top_metadata.get("source_file", "document")
        idx = top_metadata.get("chunk_index", "?")
        answer += f"\n\n(Source: {src}, chunk {idx})"

    history.append({"role": "assistant", "content": answer})
    return history, ""


def clear_chat():
    """Return the reset values for the chat: an empty history and empty input text."""
    empty_history = []
    empty_input = ""
    return empty_history, empty_input


# =============================
# GRADIO UI (Blocks)
# =============================
# Layout: chat on the left (history, question box, send button); document
# controls on the right (upload, doc_type/doc_id, process/clear, status).
with gr.Blocks(title="Document Q&A Chatbot") as demo:
    gr.Markdown("## 📄 Document Q&A Chatbot\nUpload a PDF, click **Process Document**, then ask questions in chat.")

    # Holds the dict returned by process_pdf (None until a PDF is processed).
    app_state = gr.State(value=None)

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Chat History", height=520)  # NEW messages format
            user_input = gr.Textbox(
                placeholder="Ask a question about your document...",
                label="Your Question",
            )
            send_btn = gr.Button("📤 Send")

        with gr.Column(scale=1):
            pdf_input = gr.File(label="📄 Upload PDF", file_types=[".pdf"])
            doc_type_in = gr.Textbox(label="doc_type", value=DEFAULT_DOC_TYPE)
            doc_id_in = gr.Textbox(label="doc_id", value=DEFAULT_DOC_ID)

            process_btn = gr.Button("🔄 Process Document")
            clear_btn = gr.Button("🗑️ Clear Chat")
            status = gr.Textbox(label="Status", value="Upload a PDF, then click Process.", lines=6)

    process_btn.click(
        fn=process_pdf,
        inputs=[pdf_input, doc_type_in, doc_id_in],
        outputs=[app_state, status],
    )

    # Clicking Send and pressing Enter in the question box share one handler
    # with identical inputs and outputs.
    for chat_trigger in (send_btn.click, user_input.submit):
        chat_trigger(
            fn=handle_chat,
            inputs=[user_input, chatbot, app_state],
            outputs=[chatbot, user_input],
        )

    clear_btn.click(
        fn=clear_chat,
        inputs=None,
        outputs=[chatbot, user_input],
    )

demo.launch(debug=True)



Loading weights:   0%|          | 0/199 [00:00<?, ?it/s]

BertModel LOAD REPORT from: BAAI/bge-small-en-v1.5
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://6f3cc78100a0e3e512.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
