## 📦 Setup and Dependencies
Install all necessary packages

In [None]:
!pip install -q llama-index-llms-google-genai
!pip install -q llama-index-embeddings-huggingface
!pip install -q llama-index
!pip install -q pymupdf
!apt install tesseract-ocr
!pip install pytesseract
!pip install opencv-python
!pip install pillow
!pip install sentence-transformers
!pip install gradio
!pip install llama-index-retrievers-bm25
!pip install nest_asyncio

## 🔧 Core Imports and Configuration

In [None]:
from google.colab import files
from google import genai
from llama_index.llms.google_genai import GoogleGenAI
import gradio as gr
import fitz
import os
import cv2
import numpy as np
from PIL import Image
import pytesseract
import json
from llama_index.core import Document
from typing import List
from llama_index.core import Settings
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import VectorStoreIndex
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore

## 🧠 LLM and Embedding Model Initialization

In [None]:
# Google API key.
# Secrets must not be hard-coded in the notebook: read the key from the
# environment and fall back to an interactive (non-echoing) prompt.
# NOTE(review): any key that was ever committed in this cell should be treated
# as leaked -- revoke it in the Google console and issue a new one.
import getpass

GOOGLE_API_KEY = (
    os.environ.get("GOOGLE_API_KEY")
    or getpass.getpass("Enter your Google API key: ")
).strip()
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY is not set")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

# Initialize Gemini LLM
llm = GoogleGenAI(model="gemini-2.0-flash")

# Set as default in LlamaIndex
Settings.llm = llm

# Embedding model used for indexing and semantic chunking
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en")
Settings.embed_model = embed_model

## 💡 Document Intelligence Functions
These functions handle document classification, document boundary detection, and query-type prediction:

In [None]:
def classify_document_type(text: str, max_length: int = 1500) -> str:
    """
    Classify a document into one of a fixed set of categories using the LLM.

    Args:
        text: Raw text of the document (or page) to classify.
        max_length: Maximum number of leading characters sent to the LLM.

    Returns:
        One of the category names listed in the prompt. 'Other' is returned
        for blank input, for an answer that matches no category, and when the
        LLM call fails.
    """
    valid_types = (
        'Resume', 'Contract', 'Mortgage Contract', 'Invoice', 'Pay Slip',
        'Lender Fee Sheet', 'Land Deed', 'Bank Statement', 'Tax Document',
        'Insurance', 'Report', 'Letter', 'Form', 'ID Document',
        'Medical', 'Other'
    )

    # A blank page carries nothing to classify; skip the LLM round trip.
    if not text or not text.strip():
        return 'Other'

    text_sample = text[:max_length]

    prompt = f"""
    Analyze this document and classify it into ONE of these categories:
    - Resume: CV, professional profile, work history
    - Contract: Legal agreement, terms and conditions, service agreement
    - Mortgage Contract: Home loan agreement, mortgage terms, property financing
    - Invoice: Bill, payment request, financial statement
    - Pay Slip: Salary statement, wage slip, earnings statement
    - Lender Fee Sheet: Loan fees, lender charges, closing costs
    - Land Deed: Property deed, title document, ownership certificate
    - Bank Statement: Account statement, transaction history
    - Tax Document: W2, 1099, tax return, tax form
    - Insurance: Insurance policy, coverage document
    - Report: Analysis, research document, findings
    - Letter: Correspondence, memo, communication
    - Form: Application, questionnaire, data entry form
    - ID Document: Driver's license, passport, identification
    - Medical: Medical report, prescription, health record
    - Other: Doesn't fit other categories

    Document sample:
    {text_sample}

    Respond with ONLY the category name, nothing else.
    """

    try:
        response = llm.complete(prompt)
        # Tolerate decorated answers such as "**Resume**", "'Resume'" or
        # "Resume." by stripping surrounding punctuation before matching.
        answer = response.text.strip().strip(" \t\r\n\"'`*_.:-").lower()
    except Exception as e:
        print(f"Classification error: {e}")
        return 'Other'

    # Case-insensitive lookup that returns the canonical spelling.
    canonical = {name.lower(): name for name in valid_types}
    return canonical.get(answer, 'Other')

In [None]:
def detect_document_boundary(prev_text: str, curr_text: str,
                            current_doc_type: str = None) -> bool:
    """
    Ask the LLM whether two consecutive pages belong to the same document.

    Args:
        prev_text: Text of the preceding page.
        curr_text: Text of the page being examined.
        current_doc_type: Type assigned to the document so far; shown to the
            LLM as 'Unknown' when not provided.

    Returns:
        True when the pages are judged to be part of the same document,
        False when they are not or when either page has no text. If the LLM
        call fails the pages are kept together (True).
    """
    # Without text on both sides there is nothing to compare.
    if not (prev_text and curr_text):
        return False

    # Only the seam between the pages is sent: the last 500 characters of the
    # previous page and the first 500 of the current one.
    tail_of_prev = prev_text[-500:]
    head_of_curr = curr_text[:500]
    doc_type_label = current_doc_type or 'Unknown'

    prompt = f"""
    Determine if these two pages are from the SAME document.

    Current document type: {doc_type_label}

    End of Previous Page:
    ...{tail_of_prev}

    Start of Current Page:
    {head_of_curr}...

    Consider:
    - Continuity of content
    - Formatting consistency
    - Topic coherence
    - Page numbers or headers

    Answer ONLY 'Yes' if same document or 'No' if different document.
    """

    try:
        verdict = llm.complete(prompt).text.strip().lower()
        same_document = verdict.startswith('yes')
    except Exception as e:
        print(f"Boundary detection error: {e}")
        # When uncertain, prefer keeping the pages together.
        same_document = True

    return same_document

In [None]:
def predict_query_document_type(query: str):
    """
    Predict which document type is most likely to contain the answer.

    Args:
        query: The user's question.

    Returns:
        A ``(doc_type, confidence)`` tuple: ``doc_type`` is a string and
        ``confidence`` a float clamped to [0.0, 1.0]. ``("Other", 0.0)`` is
        returned when the LLM call fails or its reply cannot be parsed.
    """
    prompt = f"""
    Analyze this query and predict which document type would most likely contain the answer.

    Query: "{query}"

    Choose the MOST LIKELY type from:
    - Resume: Career, experience, education, skills, employment history
    - Contract: Terms, agreements, obligations, parties, legal terms
    - Mortgage Contract: Home loan, property financing, mortgage terms, interest rates
    - Invoice: Payments, amounts due, billing, charges, invoiced items
    - Pay Slip: Salary, wages, deductions, earnings, pay period
    - Lender Fee Sheet: Loan fees, closing costs, origination fees, lender charges
    - Land Deed: Property ownership, deed information, property description, title
    - Bank Statement: Account balance, transactions, deposits, withdrawals
    - Tax Document: Tax information, W2, 1099, tax returns, tax amounts
    - Insurance: Coverage, policy details, premiums, claims
    - Report: Analysis, findings, conclusions, research data
    - Letter: Communications, requests, notifications, correspondence
    - Form: Applications, submitted data, form fields
    - ID Document: Personal identification, ID numbers, identity verification
    - Medical: Health information, medical conditions, prescriptions
    - Other: General or unclear

    Respond in JSON format:
    {{"type": "DocumentType", "confidence": 0.85}}

    Confidence should be between 0.0 and 1.0
    """

    try:
        raw = llm.complete(prompt).text.strip()

        # The reply may be wrapped in a Markdown code fence or surrounded by
        # prose, which json.loads rejects; parse only the outermost {...} span.
        start, end = raw.find("{"), raw.rfind("}")
        if start != -1 and end > start:
            raw = raw[start:end + 1]

        result = json.loads(raw)
        if not isinstance(result, dict):
            raise ValueError(f"expected a JSON object, got {type(result).__name__}")

        doc_type = str(result.get("type") or "Other").strip() or "Other"

        # Callers format the confidence as a float, so coerce and clamp it.
        try:
            confidence = float(result.get("confidence", 0.5))
        except (TypeError, ValueError):
            confidence = 0.5
        confidence = min(1.0, max(0.0, confidence))

        return doc_type, confidence
    except Exception as e:
        print(f"Query routing error: {e}")
        return "Other", 0.0

## ✨ Image Preprocessing for OCR
Functions to enhance images for better text extraction.

In [None]:
# =======================
# 📌 Preprocess Image for OCR
# =======================
def preprocess_image(img, show_preview=False):
    """
    Prepare an RGB image for OCR.

    Steps, in order: RGB -> grayscale, CLAHE local contrast normalization,
    a 3x3 edge-enhancing filter, then a bicubic upscale to 200% of the
    original width and height.

    Args:
        img: Image array converted with ``cv2.COLOR_RGB2GRAY``.
        show_preview: Accepted for interface compatibility; not referenced in
            the body.

    Returns:
        The processed single-channel image.
    """
    scale_percent = 200  # output size as a percentage of the input size

    grayscale = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

    # Local contrast normalization on 8x8 tiles.
    equalizer = cv2.createCLAHE(clipLimit=1.0, tileGridSize=(8, 8))
    equalized = equalizer.apply(grayscale)

    # Edge-enhancing kernel.
    # NOTE(review): the weights sum to 0.5 rather than 1, so flat regions come
    # out at about half their intensity -- confirm this is intended.
    edge_kernel = np.array([[0, -1, 0], [-1, 4.5, -1], [0, -1, 0]])
    sharpened = cv2.filter2D(equalized, -1, edge_kernel)

    # cv2.resize takes the target size as (width, height).
    src_height, src_width = sharpened.shape[:2]
    target_size = (
        int(src_width * scale_percent / 100),
        int(src_height * scale_percent / 100),
    )
    return cv2.resize(sharpened, target_size, interpolation=cv2.INTER_CUBIC)

## 📄 Document Ingestion and Extraction

In [None]:
def extract_and_analyze(pdf_path):
    """
    Extract the text of every PDF page and tag each page with document metadata.

    Pages whose text layer is empty are rendered at 300 DPI, preprocessed and
    passed through Tesseract OCR. Consecutive pages are then grouped into
    documents with detect_document_boundary, and the first page of each group
    is classified with classify_document_type.

    Args:
        pdf_path: Path of the PDF to open.

    Returns:
        A list with one ``Document`` per page. Its metadata holds
        ``page_number`` (0-based), ``doc_type``, ``page_in_doc`` (0-based
        position inside its detected document) and ``source``.

    Raises:
        ValueError: If no page yields any text.
    """
    pages_info = []

    # The context manager closes the PDF even if extraction raises part-way.
    with fitz.open(pdf_path) as doc:
        for i, page in enumerate(doc):
            text = page.get_text()

            if not text.strip():
                print(f"  Page {i}: No text found, attempting OCR...")

                try:
                    pix = page.get_pixmap(dpi=300)  # Higher DPI for better OCR
                    img = np.array(Image.frombytes("RGB", [pix.width, pix.height], pix.samples))

                    preprocessed_img = preprocess_image(img)

                    custom_config = r'--oem 3 -l eng'
                    text = pytesseract.image_to_string(preprocessed_img, config=custom_config)

                    print(f"  Page {i}: OCR extracted {len(text)} characters")
                except Exception as e:
                    # OCR is best-effort: keep the page, with empty text.
                    print(f"OCR Failed - {e} for Page {i}")
                    text = ""

            # doc_type and page_in_doc are filled in by the second pass below.
            pages_info.append(
                Document(
                    text=text,
                    metadata={
                        "page_number": i,
                        "doc_type": None,
                        "page_in_doc": None,
                        "source": pdf_path
                    }
                )
            )

    # Every page is appended even when empty, so look at the text itself
    # rather than at the length of the list.
    if not any(page_info.text.strip() for page_info in pages_info):
        raise ValueError("No text could be extracted from PDF")

    print(f"✅ Extracted {len(pages_info)} pages")

    current_doc_type = None
    page_in_doc = 0

    for i, page_info in enumerate(pages_info):
        # The first page always opens a document; later pages open one when
        # they are judged not to continue the previous page.
        starts_new_document = i == 0 or not detect_document_boundary(
            pages_info[i - 1].text, page_info.text, current_doc_type
        )

        if starts_new_document:
            page_in_doc = 0
            current_doc_type = classify_document_type(page_info.text)
        else:
            page_in_doc += 1

        page_info.metadata["page_in_doc"] = page_in_doc
        page_info.metadata["doc_type"] = current_doc_type

    return pages_info

In [None]:
def process_and_index_pdf(pdf_path):
    """
    Extract a PDF page by page, chunk every page, and index all chunks.

    Args:
        pdf_path: Path of the PDF to process.

    Returns:
        A ``VectorStoreIndex`` built from the chunks of every page.
    """
    # Each extracted Document is one page; flatten the per-page chunk lists.
    chunks = [
        chunk
        for page_doc in extract_and_analyze(pdf_path)
        for chunk in chunk_page_with_metadata(page_doc)
    ]

    index = VectorStoreIndex(chunks)
    print(f"Indexed {len(chunks)} document chunks")
    return index

## ✂️ Chunking and Metadata Assignment

In [None]:
def chunk_page_with_metadata(page_info: Document, chunk_size: int = 500, chunk_overlap: int = 100):
    """
    Split one page into chunks with the semantic splitter and tag each chunk.

    Args:
        page_info: Document holding a single page and its metadata.
        chunk_size: Not referenced in the body.
        chunk_overlap: Not referenced in the body.

    Returns:
        The list of nodes produced for the page. Each node carries the page's
        ``page_number``, ``doc_type``, ``page_in_doc`` and ``source`` plus its
        own ``chunk_index`` and the page's id as ``doc_id``.
    """
    semantic_splitter = SemanticSplitterNodeParser(
        buffer_size=1,
        breakpoint_percentile_threshold=95,
        embed_model=embed_model,
    )

    # The splitter works on a list of documents, so wrap the single page.
    page_chunks = semantic_splitter.get_nodes_from_documents([page_info])

    inherited_keys = ("page_number", "doc_type", "page_in_doc", "source")
    parent_id = page_info.id_

    for position, chunk in enumerate(page_chunks):
        for key in inherited_keys:
            chunk.metadata[key] = page_info.metadata.get(key)
        chunk.metadata["chunk_index"] = position
        chunk.metadata["doc_id"] = parent_id
        # Node ids are derived from the page id and the chunk position.
        chunk.id_ = f"{parent_id}_chunk_{position}"

    return page_chunks

## 🏗️ RAG Pipeline Construction

In [None]:
def build_rag_pipeline(index, predicted_doc_type: str = None, num_chunks: int = 4):
    """
    Build the query engine: hybrid (vector + BM25) retrieval, query fusion,
    and cross-encoder reranking.

    Args:
        index: Index whose docstore holds the chunk nodes.
        predicted_doc_type: When set to anything other than "Other", retrieval
            is restricted to chunks whose ``doc_type`` metadata equals it. If
            no chunk has that type the restriction is dropped.
        num_chunks: Upper bound on the number of chunks each retriever returns.

    Returns:
        A ``RetrieverQueryEngine`` ready to answer queries.
    """
    from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters

    nodes = list(index.docstore.docs.values())
    num_nodes = len(nodes)
    safe_top_k = min(num_chunks, max(1, num_nodes))  # never ask for more than exists

    # --- Metadata filtering ---
    # The same restriction must reach BOTH retrievers; filtering only the
    # vector side lets BM25 return chunks of other document types.
    filters = None
    keyword_nodes = nodes
    if predicted_doc_type and predicted_doc_type != "Other":
        matching_nodes = [
            node for node in nodes
            if node.metadata.get("doc_type") == predicted_doc_type
        ]
        if matching_nodes:
            print(f"Applying filter for document type: {predicted_doc_type}")
            filters = MetadataFilters(
                filters=[ExactMatchFilter(key="doc_type", value=predicted_doc_type)]
            )
            keyword_nodes = matching_nodes
        else:
            # A filter that matches nothing would leave vector search empty.
            print(f"No chunks of document type '{predicted_doc_type}' found; searching all chunks")

    vector_retriever = index.as_retriever(
        similarity_top_k=safe_top_k,
        filters=filters
    )

    bm25_retriever = BM25Retriever.from_defaults(
        nodes=keyword_nodes,
        # The keyword corpus may be smaller than the full node set.
        similarity_top_k=max(1, min(safe_top_k, len(keyword_nodes)))
    )

    class HybridRetriever(BaseRetriever):
        """Merge vector and keyword results, de-duplicated by node id."""

        def __init__(self, vector_retriever, keyword_retriever, top_k=2):
            self.vector_retriever = vector_retriever
            self.keyword_retriever = keyword_retriever
            self.top_k = top_k
            super().__init__()

        def _retrieve(self, query_bundle, **kwargs):
            vector_nodes = self.vector_retriever.retrieve(query_bundle)
            keyword_nodes = self.keyword_retriever.retrieve(query_bundle)
            all_nodes = list(vector_nodes) + list(keyword_nodes)
            # For a node found by both retrievers the keyword hit wins.
            unique_nodes = {node.node_id: node for node in all_nodes}
            sorted_nodes = sorted(
                unique_nodes.values(),
                # A missing or None score sorts as 0.0 instead of raising.
                key=lambda x: getattr(x, 'score', None) or 0.0,
                reverse=True
            )
            return sorted_nodes[:self.top_k]

    hybrid_retriever = HybridRetriever(
        vector_retriever=vector_retriever,
        keyword_retriever=bm25_retriever,
        top_k=safe_top_k
    )

    if num_nodes > 1:
        # Cross-encoder reranking keeps at most the two most relevant chunks.
        reranker = SentenceTransformerRerank(
            model="cross-encoder/ms-marco-MiniLM-L-12-v2",
            top_n=min(2, num_nodes)
        )
        node_postprocessors = [reranker]
    else:
        node_postprocessors = []

    # Query fusion: the retriever is run over several LLM-generated
    # variations of the query and the results are combined.
    fusion_retriever = QueryFusionRetriever(
        retrievers=[hybrid_retriever],
        llm=llm,
        similarity_top_k=safe_top_k,
        num_queries=3,
        mode="reciprocal_rerank"
    )

    query_engine = RetrieverQueryEngine.from_args(
        retriever=fusion_retriever,
        llm=llm,
        node_postprocessors=node_postprocessors
    )
    return query_engine

## 💬 Gradio Chat Interface

In [None]:
# Module-level state shared by the Gradio callbacks.
# `global_index` holds the index of the most recently processed PDF so the
# PDF is processed only once; `document_info` holds what the UI displays.
global_index = None
document_info = dict(
    doc_types=["All"],
    structure="Document structure will appear here.",
)

def process_pdf_for_gradio(pdf_file):
    """
    Process the uploaded PDF, build the index, and summarise it for the UI.

    Args:
        pdf_file: The uploaded file (an object whose ``name`` attribute is the
            file path), a plain path string, or None when nothing was uploaded.

    Returns:
        A ``(status, structure, dropdown_update)`` tuple: a status message, a
        Markdown summary of the pages per document type, and an update for
        the document-type dropdown.
    """
    global global_index, document_info

    if pdf_file is None:
        return "⚠️ Please upload a PDF file first.", "Document structure will appear here.", gr.update(choices=["All"], value="All")

    try:
        # Accept both an upload object exposing `.name` and a bare path.
        pdf_path = getattr(pdf_file, "name", pdf_file)

        # Keep the index in the global so chat requests do not re-process.
        index = process_and_index_pdf(pdf_path)
        global_index = index

        # --- Extract document structure and types for the UI ---
        # One pass over the chunks; a missing doc_type is grouped under
        # "N/A" in both the type list and the page listing.
        pages_by_type = {}
        for node in index.docstore.docs.values():
            doc_type = node.metadata.get("doc_type") or "N/A"
            page_number = node.metadata.get("page_number", -1) + 1  # 1-based for display
            pages_by_type.setdefault(doc_type, set()).add(page_number)

        doc_types = sorted(pages_by_type)

        summary_lines = []
        for doc_type in doc_types:
            page_str = ", ".join(map(str, sorted(pages_by_type[doc_type])))
            summary_lines.append(f"- **{doc_type}**: Pages: {page_str}")
        structure_display = "### Document Structure\n" + "\n".join(summary_lines)

        # Store info and prepare UI updates
        document_info['doc_types'] = ["All"] + doc_types
        document_info['structure'] = structure_display

        status_msg = f"✅ Successfully processed **{os.path.basename(pdf_path)}**. Ready to chat."

        return status_msg, structure_display, gr.update(choices=document_info['doc_types'], value="All")

    except Exception as e:
        # Drop any previous index so the chat cannot answer from a stale PDF.
        global_index = None
        error_msg = f"❌ **Error processing PDF:** {str(e)}"
        return error_msg, "Failed to process.", gr.update(choices=["All"], value="All")

In [None]:
def chat_handler(user_input, history, num_chunks, auto_route, doc_filter):
    """
    Answer one chat message against the processed PDF.

    Args:
        user_input: The user's question.
        history: Chat history as a list of ``{"role", "content"}`` dicts
            (the ``type='messages'`` format); None is treated as empty.
        num_chunks: Number of chunks to retrieve.
        auto_route: When true and ``doc_filter`` is "All", the document type
            is predicted from the question and used as a filter.
        doc_filter: Manually selected document type, or "All".

    Returns:
        The history with the user message and the assistant reply appended.
        Blank input returns the history unchanged.
    """
    # Treat a missing history as an empty conversation.
    if history is None:
        history = []

    # Don't do anything if the user input is empty
    if not user_input or not user_input.strip():
        return history

    history.append({"role": "user", "content": user_input})

    # Handle the case where no document is processed
    if global_index is None:
        history.append({"role": "assistant", "content": "📚 Please upload and process a PDF document first."})
        return history

    try:
        predicted_doc_type = None

        if auto_route and doc_filter == "All":
            predicted_doc_type, confidence_pred = predict_query_document_type(user_input)
            # The confidence comes from parsed LLM output; make sure a
            # non-numeric value cannot break the log line below.
            try:
                confidence_pred = float(confidence_pred)
            except (TypeError, ValueError):
                confidence_pred = 0.0
            print(f"Predicted query document type: {predicted_doc_type} with confidence {confidence_pred:.2f}")

            # Filtering on a type the processed PDF does not contain would
            # leave the vector retriever with nothing to return.
            if predicted_doc_type not in document_info["doc_types"]:
                print(f"Document type '{predicted_doc_type}' not present in the processed PDF; routing disabled for this query")
                predicted_doc_type = None
        elif doc_filter != "All":
            predicted_doc_type = doc_filter
            print(f"Using manual filter for document type: {predicted_doc_type}")

        rag_engine = build_rag_pipeline(
            global_index,
            predicted_doc_type=predicted_doc_type,
            num_chunks=int(num_chunks)
        )

        response = rag_engine.query(user_input)

        sources = []
        for node in getattr(response, 'source_nodes', None) or []:
            chunk_text = node.get_content()
            metadata = node.metadata
            page_num = metadata.get('page_number', -1)
            doc_type_info = metadata.get('doc_type', 'N/A')
            # A score of None would raise inside the format spec.
            score_text = f"{node.score:.2f}" if node.score is not None else "n/a"

            source_info = (
                f"- **{doc_type_info}** (Page: {page_num + 1}) [Score: {score_text}]\n"
                f"  > \"{chunk_text[:250]}...\""  # Show a 250-character preview
            )
            sources.append(source_info)

        source_text = "\n\n📍 **Sources:**\n" + "\n".join(sources) if sources else ""
        formatted_response = f"{response.response}{source_text}"

        history.append({"role": "assistant", "content": formatted_response})

    except Exception as e:
        # Report the failure in the chat instead of crashing the UI callback.
        error_message = f"⚠️ An error occurred: {str(e)}"
        history.append({"role": "assistant", "content": error_message})

    return history

In [None]:
# --- Gradio UI: layout, event wiring, and launch ---
with gr.Blocks(theme=gr.themes.Soft(), title="Enhanced Document RAG") as demo:
    gr.Markdown("## 🚀 Document RAG with Enhanced Processing")

    with gr.Row():
        # Left column: upload, settings, processing status.
        with gr.Column(scale=1):
            pdf_input = gr.File(label="1. Upload PDF", file_types=[".pdf"])
            process_btn = gr.Button("🔄 Process Document", variant="primary")

            gr.Markdown("### ⚙️ Settings")
            doc_type_filter = gr.Dropdown(
                choices=["All"],
                value="All",
                label="Filter by Document Type",
            )
            auto_route_toggle = gr.Checkbox(
                value=True,
                label="Enable Auto Document Routing",
            )
            num_chunks_input = gr.Slider(
                minimum=1,
                maximum=10,
                value=4,
                step=1,
                label="Chunks to Retrieve (k)",
            )

            gr.Markdown("### 📊 Processing Status")
            process_status = gr.Markdown("⏳ Waiting for document to be processed...")
            document_structure = gr.Markdown()

        # Right column: the conversation.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Chat History", height=600, type="messages")

            with gr.Row():
                user_input = gr.Textbox(
                    placeholder="Ask a question about your document...",
                    label="2. Ask a Question",
                    scale=4,
                )
                send_btn = gr.Button("📤 Send", variant="primary", scale=1)

            clear_btn = gr.Button("🗑️ Clear Chat")

    # --- Event wiring ---

    # Processing fills the status, the structure summary and the type dropdown.
    process_btn.click(
        fn=process_pdf_for_gradio,
        inputs=[pdf_input],
        outputs=[process_status, document_structure, doc_type_filter],
    )

    # The Send button and pressing Enter in the textbox do the same thing:
    # run the chat handler, then empty the textbox.
    for trigger in (send_btn.click, user_input.submit):
        trigger(
            fn=chat_handler,
            inputs=[user_input, chatbot, num_chunks_input, auto_route_toggle, doc_type_filter],
            outputs=[chatbot],
        ).then(lambda: gr.update(value=""), outputs=[user_input])

    # Clearing resets the chat to an empty history.
    clear_btn.click(lambda: [], outputs=[chatbot])

# Start the user interface with a public share link.
demo.launch(share=True)