<a href="https://colab.research.google.com/github/chpranav7/outamation-document-intelligence/blob/main/notebooks/outamation_rag_ui.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =========================
# 0) System prep (Colab)
# =========================
# Refresh the apt package lists; stdout is discarded to keep the cell output short.
!apt-get -y update >/dev/null
# System packages installed here: poppler-utils and tesseract-ocr
# (presumably the native backends for pdf2image / pytesseract below — confirm).
!apt-get -y install -qq poppler-utils tesseract-ocr >/dev/null

# Python dependencies, pinned to minimum versions only (no upper bounds).
!pip -q install "gradio>=4.44.0" "pypdf>=4.2.0" "pdf2image>=1.17.0" "pytesseract>=0.3.10" \
                 "sentence-transformers>=3.0.1" "faiss-cpu>=1.8.0" "transformers>=4.43.0" \
                 "accelerate>=0.33.0" "torch>=2.3.0" scikit-learn pandas



W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m329.0/329.0 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.8/23.8 MB[0m [31m39.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# =========================
# 1) Imports & Globals
# =========================
import os, re, io, json, time, shutil
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import pandas as pd
import gradio as gr
from pypdf import PdfReader
from pdf2image import convert_from_path
import pytesseract

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from sentence_transformers import SentenceTransformer
import faiss

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Run on the GPU when one is visible to torch, otherwise on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Half precision on GPU, full precision on CPU.
DTYPE  = torch.float16 if torch.cuda.is_available() else torch.float32
# Directory that uploaded PDFs are copied into; created eagerly at import time.
UPLOAD_DIR = "/content/uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)


In [None]:
# =========================
# 2) Utilities
# =========================
def clean_text(t: str) -> str:
    """Trim *t* and collapse every run of whitespace into a single space.

    A falsy value (``None`` or ``""``) is treated as the empty string.
    """
    trimmed = (t or "").strip()
    return re.sub(r"\s+", " ", trimmed)

def safe_path(file_obj) -> Optional[str]:
    """Return a filesystem path for an uploaded-file value, or ``None``.

    Accepted shapes:
      * ``None``                -> ``None``
      * ``str`` / ``Path``      -> the path as a string
      * ``dict``                -> its ``"name"`` entry, else its ``"path"`` entry
      * anything else           -> its ``name`` attribute if present, else ``None``
    """
    if isinstance(file_obj, dict):
        return file_obj.get("name") or file_obj.get("path")
    if isinstance(file_obj, (str, Path)):
        return str(file_obj)
    if file_obj is None:
        return None
    return getattr(file_obj, "name", None)

def ocr_pdf_to_text(path: str, dpi: int = 250) -> List[str]:
    """OCR every page of the PDF at *path* and return one cleaned string per page.

    Pages are rasterised with ``convert_from_path`` at *dpi* and passed to
    ``pytesseract.image_to_string``; each result is normalised by ``clean_text``.
    """
    page_images = convert_from_path(path, dpi=dpi)
    return [clean_text(pytesseract.image_to_string(image)) for image in page_images]

def read_pdf_pages(path: str) -> List[str]:
    """Return the cleaned text of every page of the PDF at *path*.

    Text is first taken from the PDF's embedded text layer. If any page
    yields fewer than 30 characters, the whole document is OCR'd once and
    the OCR text replaces only those short pages.

    Returns:
        One string per page reported by ``PdfReader``, in page order.
    """
    min_chars = 30  # below this a page is treated as having no usable text layer

    reader = PdfReader(path)
    page_texts = [clean_text(page.extract_text() or "") for page in reader.pages]

    if any(len(text) < min_chars for text in page_texts):
        ocr_all = ocr_pdf_to_text(path)
        # The rasteriser is a separate tool and may report a different page
        # count than pypdf; only substitute pages that OCR actually produced
        # instead of raising IndexError on the mismatch.
        page_texts = [
            ocr_all[i] if len(text) < min_chars and i < len(ocr_all) else text
            for i, text in enumerate(page_texts)
        ]
    return page_texts

def chunk_text(s: str, size=800, overlap=150):
    """Split *s* into character windows of at most *size* with *overlap* shared.

    Args:
        s: Text to split. ``None`` or ``""`` yields an empty list.
        size: Maximum characters per chunk; must be positive.
        overlap: Characters repeated between consecutive chunks. Negative
            values are treated as 0. Values >= *size* degrade to a
            one-character stride, since the window always advances.

    Returns:
        The chunks in order; the final chunk always ends at the end of *s*.

    Raises:
        ValueError: If *s* is non-empty and *size* is not positive.
    """
    s = s or ""
    if not s:
        return []
    if size <= 0:
        # A non-positive window would otherwise emit len(s) empty chunks.
        raise ValueError(f"size must be a positive integer, got {size!r}")
    # A negative overlap would jump past unread text and silently drop it.
    overlap = max(0, overlap)

    out, start, total = [], 0, len(s)
    while start < total:
        end = min(total, start + size)
        out.append(s[start:end])
        if end == total:
            break
        # Always advance by at least one character so the loop terminates.
        start = max(end - overlap, start + 1)
    return out


In [None]:
# =========================
# 3) Simple routers (rules)
# =========================
def classify_doc_type_rule(text: str) -> str:
    """Label page text with a document type using keyword rules.

    Rules are checked in priority order (pay stub, contract, W-2, lender
    fees); the first match wins. Matching is case-insensitive. Returns
    ``"other"`` when nothing matches or *text* is empty/``None``.
    """
    lowered = (text or "").lower()

    def mentions(*needles: str) -> bool:
        # True when at least one needle occurs as a substring of the page text.
        return any(needle in lowered for needle in needles)

    if mentions("net pay", "gross pay", "pay date", "payslip", "pay statement"):
        return "pay_stub"
    if mentions("agreement", "contract", "termination", "probation"):
        return "contract"
    if "w2" in lowered or re.search(r"\bw-2\b", lowered):
        return "w2"
    if mentions("lender fee", "loan estimate", "fees worksheet"):
        return "lender_fees"
    return "other"

def classify_query_rule(query: str) -> str:
    """Guess which document type a user question is about, by keyword.

    Rules are checked in priority order (pay stub, contract, lender fees,
    W-2); the first match wins. Matching is case-insensitive. Returns
    ``"other"`` when nothing matches or *query* is empty/``None``.
    """
    q = (query or "").lower()

    def mentions(*needles: str) -> bool:
        # True when at least one needle occurs as a substring of the query.
        return any(needle in q for needle in needles)

    if mentions("salary", "net pay", "gross", "payslip", "pay stub", "pay date"):
        return "pay_stub"
    if mentions("contract", "termination", "clause", "penalties", "term length", "probation"):
        return "contract"
    if mentions("lender", "fees", "loan estimate", "escrow", "underwriting"):
        return "lender_fees"
    # Accept the hyphenated spelling too, matching classify_doc_type_rule;
    # otherwise a "W-2" question is routed to "other".
    if "w2" in q or re.search(r"\bw-2\b", q):
        return "w2"
    return "other"


In [None]:
# =========================
# 4) Vector store (MiniLM+FAISS with TF-IDF fallback)
# =========================
class VectorStore:
    """Chunk index with a dense backend and a sparse fallback.

    ``fit`` first tries sentence-transformer embeddings in a FAISS
    inner-product index; if anything in that path raises, it falls back to a
    TF-IDF matrix searched with cosine similarity. ``backend`` records which
    one is active.
    """

    def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.embedder = None            # loaded lazily on first embed
        self.index = None               # FAISS index when the dense backend is active
        self.meta: List[Dict[str, Any]] = []   # chunk dicts, aligned with index rows
        self.dim = None
        self.backend = None
        # TF-IDF fallback
        self.tfidf_vec = None
        self.tfidf_mat = None

    def _try_load_embedder(self):
        """Instantiate the sentence-transformer once, on first use."""
        if self.embedder is None:
            self.embedder = SentenceTransformer(self.model_name, device=DEVICE)

    def _embed(self, texts: List[str]) -> np.ndarray:
        """Return L2-normalised float32 embeddings for *texts*."""
        self._try_load_embedder()
        emb = self.embedder.encode(texts, convert_to_numpy=True, normalize_embeddings=True, batch_size=64)
        return emb.astype("float32")

    @staticmethod
    def _passes_filter(meta: Dict[str, Any], doc_type_filter: Optional[str]) -> bool:
        """True when *meta* is allowed by the filter (``None``/``""``/``"all"`` allow everything)."""
        if not doc_type_filter or doc_type_filter == "all":
            return True
        return meta.get("doc_type") == doc_type_filter

    def fit(self, chunks: List[Dict[str, Any]]):
        """Index *chunks* (dicts with at least a ``"text"`` key).

        Any exception from the dense path switches to TF-IDF; the error text
        is kept in ``backend`` so the caller can surface it.
        """
        self.meta = chunks
        # Drop state from a previous fit so search() can never consult an
        # index that no longer matches self.meta.
        self.index = None
        self.dim = None
        self.tfidf_vec = None
        self.tfidf_mat = None

        texts = [c["text"] for c in chunks]
        try:
            X = self._embed(texts)
            self.dim = X.shape[1]
            self.index = faiss.IndexFlatIP(self.dim)   # cosine if normalized
            self.index.add(X)
            self.backend = "faiss+MiniLM"
        except Exception as e:
            self.tfidf_vec = TfidfVectorizer(lowercase=True, ngram_range=(1,2), stop_words="english")
            self.tfidf_mat = self.tfidf_vec.fit_transform(texts)
            self.backend = f"tfidf (fallback: {e})"

    def search(self, query: str, top_k=8, cand=64, doc_type_filter: Optional[str]=None):
        """Return up to *top_k* ``(row_index, score, meta)`` tuples, best first.

        Args:
            query: Free-text query.
            top_k: Maximum number of results.
            cand: Number of nearest neighbours fetched from FAISS when no
                doc-type filter is active.
            doc_type_filter: Keep only chunks whose ``doc_type`` equals this
                value; ``None``, ``""`` or ``"all"`` disables filtering.

        Returns:
            A possibly empty list; empty when nothing has been fitted.
        """
        filtering = bool(doc_type_filter) and doc_type_filter != "all"
        rows = []
        if self.index is not None:
            total = self.index.ntotal
            if total == 0:
                return []
            # The filter is applied after the neighbour search, so with a
            # filter active every row must be a candidate; otherwise a rare
            # doc type outside the global top-`cand` would yield no hits.
            n_cand = total if filtering else min(cand, total)
            q = self._embed([query])
            sims, idxs = self.index.search(q, n_cand)
            for score, ix in zip(sims[0], idxs[0]):
                if ix < 0:
                    continue  # FAISS pads missing neighbours with -1
                m = self.meta[ix]
                if not self._passes_filter(m, doc_type_filter):
                    continue
                rows.append((int(ix), float(score), m))
            rows.sort(key=lambda r: r[1], reverse=True)
            return rows[:top_k]
        elif self.tfidf_mat is not None:
            qv = self.tfidf_vec.transform([query])
            sims = cosine_similarity(qv, self.tfidf_mat).ravel()
            for ix in sims.argsort()[::-1]:
                m = self.meta[int(ix)]
                if not self._passes_filter(m, doc_type_filter):
                    continue
                rows.append((int(ix), float(sims[int(ix)]), m))
                if len(rows) >= top_k:
                    break
            return rows
        return []

    def save(self, base="/content"):
        """Write ``chunks.json`` (the chunk metadata) and ``index_backend.txt`` under *base*."""
        with open(os.path.join(base, "chunks.json"), "w", encoding="utf-8") as f:
            json.dump(self.meta, f, ensure_ascii=False, indent=2)
        with open(os.path.join(base, "index_backend.txt"), "w") as f:
            f.write(self.backend or "unknown")


In [None]:
# =========================
# 5) Open-source LLM (TinyLlama) with extractive fallback
# =========================
class TinyLlamaAnswerer:
    """Answer questions from retrieved context with TinyLlama, or extractively.

    If the model cannot be loaded, ``ok`` stays ``False`` and ``answer``
    returns keyword-matched sentences from the context instead of generated
    text.
    """

    def __init__(self):
        self.ok = False
        try:
            self.tok = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
            self.model = AutoModelForCausalLM.from_pretrained(
                "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                torch_dtype=DTYPE,
                device_map="auto"
            )
            # The model was placed by accelerate (device_map="auto"), so the
            # pipeline must NOT receive a `device` argument: passing one
            # raises and silently forced the extractive fallback every time.
            self.pipe = pipeline("text-generation", model=self.model, tokenizer=self.tok)
            self.ok = True
        except Exception as e:
            print("TinyLlama load failed, using extractive fallback. Err:", e)

    def _prompt(self, query: str, ctx_blocks: List[Dict[str,str]]) -> str:
        """Build the instruction prompt: system rules, cited context blocks, then the question."""
        # NOTE(review): this is a Llama-2 style [INST] template; confirm it
        # matches the chat template this checkpoint was trained with.
        ctx = "".join(
            f"[Source: {b['file']} | p.{b['page_start']}-{b['page_end']}]\n{b['text']}\n\n"
            for b in ctx_blocks
        )
        sys = ("You are a precise assistant. Answer ONLY from the provided context. "
               "Cite sources inline like (source: file, pages). If not in context, say you don't know.")
        return f"<s>[INST] <<SYS>>\n{sys}\n<</SYS>>\n\nContext:\n{ctx}\nQuestion: {query}\n\nAnswer: [/INST]"

    @staticmethod
    def _snippet(text: str, keywords: List[str]) -> str:
        """Return up to three sentences of *text* containing a keyword, else its first two."""
        sents = re.split(r'(?<=[\.!\?])\s+', text)
        keep = [s for s in sents if any(k in s.lower() for k in keywords)]
        return " ".join(keep[:3]) if keep else " ".join(sents[:2])

    def _extractive_answer(self, query: str, ctx_blocks: List[Dict[str,str]]) -> str:
        """Model-free fallback: cited snippets from the top three context blocks."""
        # Keywords are the query's alphabetic words of 4+ letters; computed
        # once because they do not depend on the block.
        kws = re.findall(r"[a-zA-Z]{4,}", query.lower())
        parts = [f"{self._snippet(b['text'], kws)} (source: {b['file']} p.{b['page_start']}-{b['page_end']})"
                 for b in ctx_blocks[:3]]
        return " ".join(parts) if parts else "I couldn't find relevant information in the context."

    def answer(self, query: str, ctx_blocks: List[Dict[str,str]]) -> str:
        """Answer *query* from *ctx_blocks*.

        Args:
            query: The user's question.
            ctx_blocks: Dicts with ``text``, ``file``, ``page_start`` and
                ``page_end`` keys, best match first.

        Returns:
            Generated text when the model loaded; otherwise an extractive
            answer built from the context.
        """
        if not self.ok:
            return self._extractive_answer(query, ctx_blocks)

        # Greedy decoding: `temperature` is dropped because it has no effect
        # when do_sample=False.
        out = self.pipe(self._prompt(query, ctx_blocks),
                        max_new_tokens=256, do_sample=False,
                        repetition_penalty=1.05)[0]["generated_text"]
        # generated_text echoes the prompt; keep only what follows the last tag.
        return out.split("[/INST]")[-1].strip()

answerer = TinyLlamaAnswerer()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/551 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/608 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/2.20G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

TinyLlama load failed, using extractive fallback. Err: The model has been loaded with `accelerate` and therefore cannot be moved to a specific device. Please discard the `device` argument when creating your pipeline object.


In [None]:
# =========================
# 6) Pipeline build + retrieval
# =========================
def process_documents(files, chunk_size=800, overlap=150):
    """Ingest uploaded PDFs: extract text, chunk, classify, index and persist.

    Args:
        files: Uploaded file values (anything ``safe_path`` understands).
        chunk_size: Maximum characters per chunk.
        overlap: Characters shared between consecutive chunks.

    Returns:
        ``(status_markdown, store_dict, vector_store)``.

    Raises:
        gr.Error: If no files were given or no text could be extracted.
    """
    if not files:
        raise gr.Error("Please upload at least one PDF.")

    t0 = time.time()
    chunks: List[Dict[str,Any]] = []
    n_files = 0
    n_pages = 0

    for upload in files:
        source = safe_path(upload)
        if not source:
            continue

        # Work from a copy in UPLOAD_DIR; if copying fails, read the original.
        target = os.path.join(UPLOAD_DIR, os.path.basename(source))
        try:
            shutil.copy(source, target)
        except Exception:
            target = source

        page_texts = read_pdf_pages(target)
        n_files += 1
        n_pages += len(page_texts)

        file_name = os.path.basename(target)
        for page_number, page_text in enumerate(page_texts, start=1):
            # The document type is decided per page and shared by its chunks.
            page_type = classify_doc_type_rule(page_text)
            pieces = chunk_text(page_text, chunk_size, overlap)
            for piece_index, piece in enumerate(pieces):
                chunks.append({
                    "id": len(chunks),
                    "text": piece,
                    "doc_type": page_type,
                    "file": file_name,
                    "page_start": page_number,
                    "page_end":   page_number,
                    "chunk_index": piece_index,
                })

    if not chunks:
        raise gr.Error("No text extracted (even after OCR).")

    vs = VectorStore()
    vs.fit(chunks)
    vs.save("/content")

    elapsed = time.time() - t0

    # Chunk count per document type, in first-seen order.
    by_type = {}
    for entry in chunks:
        label = entry["doc_type"]
        by_type[label] = by_type.get(label, 0) + 1

    status = f"""
**Processed**: {n_files} file(s), {n_pages} page(s)
**Chunks**: {len(chunks)}
**By type**: {', '.join([f'{k}:{v}' for k,v in by_type.items()])}
**Index**: {vs.backend}
**Time**: {elapsed:.1f}s

**Saved**: /content/chunks.json, /content/index_backend.txt
"""
    store = {"chunks": chunks, "stats": {"files": n_files, "pages": n_pages}}
    return status, store, vs

def retrieve_with_routing(vs: VectorStore, query: str, k=5, filter_doc_type="all", auto_route=True):
    """Retrieve context blocks for *query*, optionally routed by document type.

    Args:
        vs: A fitted vector store.
        query: The user's question.
        k: Number of chunks to return.
        filter_doc_type: Explicit doc-type filter; ``"all"`` means no filter.
        auto_route: When true and no explicit filter is set, the doc type is
            inferred from the query by ``classify_query_rule``.

    Returns:
        ``(ctx, conf)`` where ``ctx`` is a list of context dicts (``text``,
        ``file``, ``page_start``, ``page_end``, ``doc_type``, ``score``,
        ``idx``) and ``conf`` is the mean retrieval score; ``([], 0.0)`` when
        nothing was found.
    """
    routed = bool(auto_route) and filter_doc_type == "all"
    route = classify_query_rule(query) if routed else filter_doc_type
    if routed and route == "other":
        # "other" from the query classifier means "no keyword recognised",
        # not "restrict to pages labelled other": search everything.
        route = "all"

    cand = max(64, k * 8)
    rows = vs.search(query, top_k=k, cand=cand, doc_type_filter=route)
    if not rows and routed and route != "all":
        # The inferred type matched no indexed chunk; the user did not ask
        # for a filter, so fall back to an unfiltered search.
        rows = vs.search(query, top_k=k, cand=cand, doc_type_filter="all")
    if not rows:
        return [], 0.0

    conf = float(np.mean([score for _, score, _ in rows]))
    ctx = [
        {
            "text": meta["text"], "file": meta["file"],
            "page_start": meta["page_start"], "page_end": meta["page_end"],
            "doc_type": meta["doc_type"], "score": score, "idx": meta["id"]
        }
        for _, score, meta in rows
    ]
    return ctx, conf


In [None]:
# =========================
# 7) Gradio callbacks & UI
# =========================
def cb_process(files):
    """Gradio callback for the process button.

    Forwards to ``process_documents`` and returns its
    ``(status, store, vs)`` triple unchanged.
    """
    return process_documents(files)

def cb_chat(message, history, filt, auto_route, k, store, vs):
    """Gradio callback for a chat turn.

    Args:
        message: Text from the question box.
        history: Chat history as a list of ``{"role", "content"}`` dicts
            (``None`` is treated as empty).
        filt: Selected doc-type filter.
        auto_route: Whether to infer the doc type from the question.
        k: Number of chunks to retrieve.
        store: Session store (unused here).
        vs: The fitted vector store, or ``None`` before processing.

    Returns:
        ``(history, "", sources_update)``: the new history, an empty string
        that clears the input box, and an update for the sources panel.
    """
    # cb_save already tolerates a None history; do the same here so the
    # list concatenations below cannot raise TypeError.
    history = history or []
    hidden_sources = gr.update(visible=False)

    def with_reply(reply):
        # History extended with this user turn and the assistant's reply.
        return history + [{"role":"user","content":message},{"role":"assistant","content":reply}]

    # Nothing to answer for an empty or whitespace-only message.
    if not message or not str(message).strip():
        return history, "", hidden_sources
    if vs is None:
        bot = "Please upload PDFs and click **Process Documents** first."
        return with_reply(bot), "", hidden_sources

    t0 = time.time()
    ctx, conf = retrieve_with_routing(vs, message, k=k, filter_doc_type=filt, auto_route=auto_route)
    if not ctx:
        bot = "I couldn't find relevant information to answer your question."
        return with_reply(bot), "", hidden_sources
    answer = answerer.answer(message, ctx)
    latency = time.time() - t0  # covers retrieval plus answer generation

    src_lines = [f"{i+1}. **{b['file']}** (type: {b['doc_type']}, p.{b['page_start']}-{b['page_end']}, score {b['score']:.3f}, idx {b['idx']})"
                 for i,b in enumerate(ctx)]
    src_md = f"### Sources (k={len(ctx)}) • Confidence: **{conf:.3f}** • Latency: **{latency:.2f}s**\n" + "\n".join([f"- {ln}" for ln in src_lines])

    return with_reply(answer), "", gr.update(value=src_md, visible=True)

def cb_clear():
    """Gradio callback that resets the chat.

    Returns an empty history, an empty input box value, and an update that
    hides the sources panel.
    """
    empty_history = []
    hide_sources = gr.update(visible=False)
    return empty_history, "", hide_sources

def cb_save(history):
    """
    Write the chat transcript to a timestamped .txt file under /content and
    return its path as a plain string (what a gr.File output expects here).

    Each message becomes "ROLE: content" followed by a blank line; an empty
    or missing history produces the single line "EMPTY CHAT".
    """
    entries = [
        f"{(msg.get('role') or '').upper()}: {msg.get('content') or ''}\n\n"
        for msg in (history or [])
    ]
    transcript = "".join(entries) if entries else "EMPTY CHAT\n"

    stamp = time.strftime("%Y%m%d-%H%M%S")
    out_path = f"/content/chat_history_{stamp}.txt"
    with open(out_path, "w", encoding="utf-8") as handle:
        handle.write(transcript)
    return out_path


def cb_eval(vs_state):
    """
    Run three fixed retrieval checks against the vector store.

    For each test query the top 5 chunks are retrieved with an explicit
    doc-type filter; a chunk counts as a hit when its text contains any of
    the test's gold keywords. Returns (markdown_summary, path_to_csv), or
    ("Process documents first.", None) when no store is available, so the
    UI needs two outputs: a Markdown and a File.
    """
    if vs_state is None:
        return "Process documents first.", None
    vs = vs_state

    TESTS = [
        {"q":"What is the Pay Date and Net Pay on this payslip?", "doc_type":"pay_stub",
         "gold_keywords":["pay date","net pay","date","net"]},
        {"q":"What are the amounts for Underwriting Fee and Closing/Escrow Fee", "doc_type":"lender_fees",
         "gold_keywords":["underwriting","closing","escrow","fee"]},
        {"q":"What is the termination notice period and the contract term length?", "doc_type":"contract",
         "gold_keywords":["thirty","30","one year","1 year","termination","notice","period"]},
    ]

    def first_hit_rank(hits, keywords):
        # 1-based rank of the first chunk mentioning any keyword, else None.
        return next(
            (rank for rank, hit in enumerate(hits, start=1)
             if any(kw in (hit["text"] or "").lower() for kw in keywords)),
            None,
        )

    rows = []
    for case in TESTS:
        question = case["q"]
        expected_type = case["doc_type"]

        started = time.time()
        hits, _ = retrieve_with_routing(vs, question, k=5, filter_doc_type=expected_type, auto_route=False)
        latency = round(time.time() - started, 3)

        rank = first_hit_rank(hits, case["gold_keywords"]) if hits else None
        if rank:
            recall5, mrr = 1.0, 1.0 / rank
        else:
            recall5, mrr = 0.0, 0.0

        rows.append({"query": question, "doc_type": expected_type,
                     "Recall@5": recall5, "MRR": mrr, "LatencySec": latency})

    df = pd.DataFrame(rows)
    out_csv = "/content/metrics.csv"
    df.to_csv(out_csv, index=False)
    msg = "Saved metrics to **/content/metrics.csv**\n\n" + df.to_string(index=False)
    return msg, out_csv


def cb_scripted_demo(vs_state):
    """
    Run three canned questions through retrieval and answering, and dump the
    results (query, filter, confidence, answer, sources) to a JSON file.

    Returns (markdown_status, path_to_json), or
    ("Process documents first.", None) when no store is available, so the
    UI needs two outputs.
    """
    if vs_state is None:
        return "Process documents first.", None
    vs = vs_state

    QUERIES = [
        ("pay_stub", "What is the Pay Date and Net Pay on this payslip?"),
        ("lender_fees","What are the amounts for Underwriting Fee and Closing/Escrow Fee?"),
        ("contract","What is the termination notice period and the contract term length?"),
    ]

    def run_one(doc_type, question):
        # Retrieve with an explicit filter, then answer only if context exists.
        ctx, conf = retrieve_with_routing(vs, question, k=7, filter_doc_type=doc_type, auto_route=False)
        if ctx:
            reply = answerer.answer(question, ctx)
        else:
            reply = "No context found."
        return {"query": question, "filter": doc_type, "confidence": conf,
                "answer": reply, "sources": ctx}

    log = [run_one(doc_type, question) for doc_type, question in QUERIES]

    out_json = "/content/scripted_demo.json"
    with open(out_json, "w", encoding="utf-8") as handle:
        json.dump(log, handle, ensure_ascii=False, indent=2)
    return "Saved demo run to **/content/scripted_demo.json**", out_json




In [None]:
# =========================
# 8) Build the UI
# =========================
# Custom CSS handed to gr.Blocks: sets the app font stack and styles the
# elements with ids "title" and "subtitle".
BRAND_CSS = """
.gradio-container { font-family: Inter, system-ui, -apple-system, Segoe UI, Roboto, Arial; }
#title h1 { margin: 0 0 8px 0; }
#subtitle { color:#64748b; margin-bottom: 14px; }
"""

# Layout: chat column (left) and document/evaluation column (right), followed
# by the event wiring. Each callback's outputs list must have exactly as many
# components as the callback returns values.
with gr.Blocks(title="Full RAG UI (Open-Source)", theme=gr.themes.Soft(), css=BRAND_CSS) as demo:
    gr.Markdown("<div id='title'><h1>Full RAG Chatbot (Open-Source)</h1></div>"
                "<div id='subtitle'>Upload PDFs → Process → Ask questions → Get grounded answers with sources & confidence</div>")

    # Per-session state: the chunk store and the fitted vector store.
    store_state = gr.State(None)
    vs_state    = gr.State(None)

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Chat", type="messages", height=450)
            with gr.Row():
                user_box = gr.Textbox(placeholder="Ask a question about your documents…", label="Your Question", scale=6)
                send_btn = gr.Button(" Send", scale=1)
            with gr.Row():
                filt = gr.Dropdown(
                    choices=[("All","all"), ("Pay Stub","pay_stub"), ("Contract","contract"),
                             ("W2","w2"), ("Lender Fees","lender_fees"), ("Other","other")],
                    value="all", label="Filter by doc type", scale=3
                )
                auto_route = gr.Checkbox(value=True, label="Auto-route by query", scale=2)
                k_slider = gr.Slider(3, 12, value=5, step=1, label="Chunks (k)", scale=2)

            gr.Examples(
                examples=[
                    ["What is the Pay Date and Net Pay on this payslip?"],
                    ["What are the amounts for Underwriting Fee and Closing/Escrow Fee?"],
                    ["What is the termination notice period and the contract term length?"]
                ],
                inputs=[user_box],
                label="Try one of these"
            )

            with gr.Row():
                clear_btn = gr.Button(" Clear Chat")
                save_btn  = gr.Button(" Save Chat")
                download  = gr.File(label="Download chat history", visible=True)

            sources_md = gr.Markdown(visible=False)

        with gr.Column(scale=1):
            files = gr.Files(label="📎 Upload PDF(s)", file_types=[".pdf"])
            process_btn = gr.Button(" Process Documents")
            status = gr.Markdown("")
            with gr.Row():
                eval_btn = gr.Button(" Evaluate (Recall@5, MRR, Latency)")
                scripted_btn = gr.Button(" Scripted Demo (save JSON)")
            eval_out = gr.Markdown("")
            metrics_file = gr.File(label="metrics.csv", visible=True)
            demo_file    = gr.File(label="scripted_demo.json", visible=True)


    process_btn.click(cb_process, inputs=[files], outputs=[status, store_state, vs_state])
    # The Send button and pressing Enter in the textbox run the same chat turn.
    send_btn.click(cb_chat, inputs=[user_box, chatbot, filt, auto_route, k_slider, store_state, vs_state],
                   outputs=[chatbot, user_box, sources_md])
    user_box.submit(cb_chat, inputs=[user_box, chatbot, filt, auto_route, k_slider, store_state, vs_state],
                    outputs=[chatbot, user_box, sources_md])
    clear_btn.click(cb_clear, outputs=[chatbot, user_box, sources_md])
    save_btn.click(cb_save, inputs=[chatbot], outputs=[download])
    # cb_eval and cb_scripted_demo each return (markdown, file_path); both
    # values must be wired or the file components are never populated and
    # Gradio reports an output-count mismatch.
    eval_btn.click(cb_eval, inputs=[vs_state], outputs=[eval_out, metrics_file])
    scripted_btn.click(cb_scripted_demo, inputs=[vs_state], outputs=[eval_out, demo_file])

demo.launch(share=True, inline=False, debug=False, show_error=True)


  with gr.Blocks(title="Full RAG UI (Open-Source)", theme=gr.themes.Soft(), css=BRAND_CSS) as demo:
  with gr.Blocks(title="Full RAG UI (Open-Source)", theme=gr.themes.Soft(), css=BRAND_CSS) as demo:
  chatbot = gr.Chatbot(label="Chat", type="messages", height=450)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://619cd1eb28410612d9.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


