# End-to-End Multi-Document RAG System (with OCR + Query Routing)

This notebook implements a document Q&A pipeline that can ingest mixed PDFs (digital + scanned),
detect and classify multiple logical documents, index them with embeddings, and answer user questions
with source attribution (document type + page range) through a Gradio chatbot UI.

**Key capabilities**
- Multi-document boundary detection + document-type classification
- OCR fallback for scanned / image-only pages (Tesseract)
- Chunking with overlap (500 words, 100 overlap) + metadata preservation
- Dense vector retrieval with query routing by document type (FAISS)
- LLM answer generation constrained to retrieved context (Gemini 2.5 Flash-Lite)



## 📚 Setup
Install dependencies for:
- UI (Gradio)
- PDF parsing + page rendering (PyMuPDF / PyPDF2)
- OCR (Tesseract + pytesseract)
- Embeddings + vector search (SentenceTransformers + FAISS)
- LLM + optional framework utilities (Gemini + LlamaIndex utilities)


In [None]:
# Install required packages
!pip install -q gradio
!pip install -q gradio_pdf
!pip install -q pypdf PyPDF2 pymupdf
!pip install -q sentence-transformers transformers
!pip install -q faiss-cpu
!pip install -q google-generativeai
!pip install -q numpy pandas
!apt-get -qq install -y tesseract-ocr
!pip -q install pytesseract pillow

# Install LlamaIndex packages for enhanced document processing
!pip install -q llama-index
!pip install -q llama-index-readers-file
!pip install -q llama-index-embeddings-huggingface
!pip install -q llama-index-vector-stores-faiss
!pip install -q llama-index-llms-gemini


## 🔧 Imports & Configuration
Load libraries, initialize the LLM client, and configure embedding models used for retrieval.


In [None]:
import gradio as gr
from gradio_pdf import PDF
import fitz  # PyMuPDF
from PyPDF2 import PdfReader
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import google.generativeai as genai
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import json
from datetime import datetime
import hashlib
import os
import tempfile
import csv
from google.colab import userdata



# LlamaIndex imports for enhanced document processing
from llama_index.core import Document, VectorStoreIndex, StorageContext
from llama_index.core.schema import TextNode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.vector_stores import MetadataFilters, MetadataFilter, FilterOperator


# --- LLM Configuration (Gemini) ---
# Reads the API key from Colab Secrets, which keeps it out of the notebook and version control.
GEMINI_API_KEY = userdata.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    # Fail fast when the cell runs instead of on the first LLM call.
    raise ValueError("Missing GEMINI_API_KEY in Colab Secrets.")
genai.configure(api_key=GEMINI_API_KEY)
# Single shared model handle; llm_generate() sends every prompt through it.
gemini_model = genai.GenerativeModel("models/gemini-2.5-flash-lite")

# --- Embeddings ---
# Sentence-BERT (all-MiniLM-L6-v2) provides lightweight, fast embeddings for dense retrieval.
# The same model is loaded twice: once through SentenceTransformers (`embed_model`)
# and once through the LlamaIndex wrapper (`llama_embed_model`).
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
llama_embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")


def llm_generate(prompt: str) -> str:
    """
    Send `prompt` to Gemini and return the reply as plain text.

    Generation runs with temperature 0.0 and at most 512 output tokens.
    The text of every part of every candidate is concatenated; if that
    yields nothing (or walking the candidates fails), the response's
    `text` attribute is used as a fallback.
    """
    generation_config = {
        "temperature": 0.0,
        "max_output_tokens": 512,
    }
    response = gemini_model.generate_content(prompt, generation_config=generation_config)

    try:
        fragments = []
        candidates = getattr(response, "candidates", []) or []
        for candidate in candidates:
            body = getattr(candidate, "content", None)
            if body:
                fragments.extend(
                    piece.text
                    for piece in (getattr(body, "parts", []) or [])
                    if hasattr(piece, "text")
                )

        joined = "".join(fragments).strip()
        if joined:
            return joined
    except Exception:
        # Best effort only: fall through to the `text` accessor below.
        pass

    fallback = getattr(response, "text", "") or ""
    return fallback.strip()






## 📄 Data Structures
Lightweight dataclasses used to track:
- page-level text extraction
- logical document boundaries within a PDF
- chunk metadata (doc type, page range, retrieval info)


In [None]:
@dataclass
class PageInfo:
    """Extracted text and classification details for one physical PDF page."""
    # 0-based index of the page within the source PDF (taken from enumerate()).
    page_num: int
    # Selectable text of the page, or OCR output; "" when both yield nothing.
    text: str
    # Category of the logical document this page belongs to; None until segmentation runs.
    doc_type: Optional[str] = None
    # 0-based position of the page inside its logical document.
    page_in_doc: int = 0

@dataclass
class LogicalDocument:
    """A contiguous run of PDF pages treated as one document of a single type."""
    # Identifier of the form "doc_<n>", numbered in detection order.
    doc_id: str
    # Category label returned by classify_document_type().
    doc_type: str
    # First and last page of the run (both inclusive), in PageInfo.page_num numbering.
    page_start: int
    page_end: int
    # Text of all member pages joined with blank lines.
    text: str
    # Chunks created for this document; stays None until process_all_documents() fills it in.
    chunks: Optional[List["ChunkMetadata"]] = None

@dataclass
class ChunkMetadata:
    """One retrievable text chunk plus the metadata needed to cite its source."""
    # Identifier of the form "<doc_id>_chunk_<chunk_index>".
    chunk_id: str
    # Identifier and category of the LogicalDocument the chunk was cut from.
    doc_id: str
    doc_type: str
    # 0-based position of the chunk within its document.
    chunk_index: int
    # Page span covered by the chunk; only an estimate for multi-chunk documents.
    page_start: int
    page_end: int
    # The chunk's text content.
    text: str
    # Embedding vector, assigned by IntelligentRetriever.build_indices(); None before indexing.
    embedding: Optional[np.ndarray] = None

## 🧠 Document Intelligence
Detect document boundaries and classify document types so retrieval can be routed to the most
relevant subset of chunks (improves precision in multi-document PDFs).


In [None]:
def classify_document_type(text: str, max_length: int = 1500) -> str:
    """
    Classify a document into one of a fixed set of categories using the LLM.

    Args:
        text: Document text to classify (callers pass the first page).
        max_length: Maximum number of leading characters sent to the LLM,
            to stay well inside token limits.

    Returns:
        The canonical category name (e.g. 'Resume', 'Invoice'). 'Other' is
        returned when the text is blank, the LLM call fails, or the reply
        cannot be matched to a known category.
    """
    valid_types = [
        'Resume', 'Contract', 'Mortgage Contract', 'Invoice', 'Pay Slip',
        'Lender Fee Sheet', 'Land Deed', 'Bank Statement', 'Tax Document',
        'Insurance', 'Report', 'Letter', 'Form', 'ID Document',
        'Medical', 'Other'
    ]

    # Nothing to classify (blank page or failed OCR): skip the LLM round trip.
    if not text or not text.strip():
        return 'Other'

    text_sample = text[:max_length]

    prompt = f"""
    Analyze this document and classify it into ONE of these categories:
    - Resume: CV, professional profile, work history
    - Contract: Legal agreement, terms and conditions, service agreement
    - Mortgage Contract: Home loan agreement, mortgage terms, property financing
    - Invoice: Bill, payment request, financial statement
    - Pay Slip: Salary statement, wage slip, earnings statement
    - Lender Fee Sheet: Loan fees, lender charges, closing costs
    - Land Deed: Property deed, title document, ownership certificate
    - Bank Statement: Account statement, transaction history
    - Tax Document: W2, 1099, tax return, tax form
    - Insurance: Insurance policy, coverage document
    - Report: Analysis, research document, findings
    - Letter: Correspondence, memo, communication
    - Form: Application, questionnaire, data entry form
    - ID Document: Driver's license, passport, identification
    - Medical: Medical report, prescription, health record
    - Other: Doesn't fit other categories

    Document sample:
    {text_sample}

    Respond with ONLY the category name, nothing else.
    """

    try:
        raw_reply = llm_generate(prompt)
    except Exception as e:
        print(f"Classification error: {e}")
        return 'Other'

    # Models often decorate the label ("Resume.", "**Resume**", "Category: Resume",
    # or an echoed "Resume: CV, ..."). Look only at the first line and try the
    # whole line plus each colon-separated piece, ignoring surrounding
    # punctuation and case. Matching stays exact, so "Contract" can never be
    # mistaken for "Mortgage Contract".
    canonical = {name.casefold(): name for name in valid_types}
    reply_lines = raw_reply.strip().splitlines()
    first_line = reply_lines[0] if reply_lines else ""
    for candidate in [first_line, *first_line.split(":")]:
        key = candidate.strip().strip("\"'`*_.,;:- ").casefold()
        if key in canonical:
            return canonical[key]

    return 'Other'

def detect_document_boundary(prev_text: str, curr_text: str,
                            current_doc_type: str = None) -> bool:
    """
    Ask the LLM whether two consecutive pages belong to the same document.

    Returns True when the pages are judged to be from the same document and
    False otherwise. If either page has no text the answer is False without
    calling the LLM; if the LLM call fails the pages are kept together (True).
    """
    # A page without text on either side is always treated as a break.
    if not (prev_text and curr_text):
        return False

    # Only the seam between the pages is shown to the model:
    # the last 500 characters of one and the first 500 of the next.
    tail_of_previous = prev_text[-500:]
    head_of_current = curr_text[:500]
    doc_type_label = current_doc_type or 'Unknown'

    prompt = f"""
    Determine if these two pages are from the SAME document.

    Current document type: {doc_type_label}

    End of Previous Page:
    ...{tail_of_previous}

    Start of Current Page:
    {head_of_current}...

    Consider:
    - Continuity of content
    - Formatting consistency
    - Topic coherence
    - Page numbers or headers

    Answer ONLY 'Yes' if same document or 'No' if different document.
    """

    try:
        verdict = llm_generate(prompt)
        normalized = verdict.strip().lower()
        return normalized.startswith("yes")
    except Exception as e:
        print(f"Boundary detection error: {e}")
        # When in doubt, do not split the document.
        return True

## 📑 PDF Extraction & Document Segmentation
Extract text page-by-page (OCR fallback when needed), then split the PDF into logical documents
using boundary detection. Each logical document is labeled with a document type.


In [None]:
def _open_pdf(pdf_file):
    """Open `pdf_file` with PyMuPDF from a {'content': bytes} dict, a file-like object, or a path."""
    if isinstance(pdf_file, dict) and "content" in pdf_file:
        return fitz.open(stream=pdf_file["content"], filetype="pdf")
    if hasattr(pdf_file, "read"):
        return fitz.open(stream=pdf_file.read(), filetype="pdf")
    return fitz.open(pdf_file)


def _extract_page_text(page, page_index: int) -> str:
    """Return the page's selectable text, falling back to Tesseract OCR when there is none."""
    text = page.get_text()
    if text.strip():
        return text

    # OCR fallback: used only when a page has no selectable text (common for scanned PDFs).
    print(f"  Page {page_index}: No text found, attempting OCR...")
    try:
        # Imported lazily so digital-only PDFs work without the OCR stack.
        import io
        import pytesseract
        from PIL import Image

        # Render at 2x zoom; the higher resolution helps OCR accuracy.
        pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
        img = Image.open(io.BytesIO(pix.tobytes("png"))).convert("L")  # grayscale
        text = pytesseract.image_to_string(img, config="--oem 3 --psm 6")
        print(f"  Page {page_index}: OCR extracted {len(text)} characters")
        return text
    except Exception as e:
        # Best effort: a page that cannot be OCR'd is kept as an empty page.
        print(f"  Page {page_index}: OCR failed - {e}")
        return ""


def _make_logical_document(doc_number: int, doc_type: str, pages: List[PageInfo]) -> LogicalDocument:
    """Bundle a run of consecutive pages into a LogicalDocument with id 'doc_<doc_number>'."""
    return LogicalDocument(
        doc_id=f"doc_{doc_number}",
        doc_type=doc_type,
        page_start=pages[0].page_num,
        page_end=pages[-1].page_num,
        text="\n\n".join(p.text for p in pages)
    )


def extract_and_analyze_pdf(pdf_file) -> Tuple[List[PageInfo], List[LogicalDocument]]:
    """
    Extract text from a PDF and split it into classified logical documents.

    Args:
        pdf_file: A filesystem path, a file-like object with `read()`, or a
            dict holding the raw PDF bytes under the key "content".

    Returns:
        A tuple `(pages_info, logical_docs)`: one PageInfo per PDF page
        (0-based `page_num`) and the LogicalDocument groupings detected by
        comparing each page with its predecessor.

    Raises:
        ValueError: If no page yields any text, even after OCR.
    """
    print("üìñ Starting PDF extraction and analysis...")

    doc = _open_pdf(pdf_file)
    try:
        pages_info = [
            PageInfo(page_num=i, text=_extract_page_text(page, i))
            for i, page in enumerate(doc)
        ]
    finally:
        # Release the PDF handle even when extraction fails part-way through.
        doc.close()

    # Covers both a PDF with zero pages and one whose pages are all blank.
    if not any(p.text.strip() for p in pages_info):
        raise ValueError("No text could be extracted from PDF")

    print(f"‚úÖ Extracted {len(pages_info)} pages")

    # Perform document classification and boundary detection
    print("üß† Analyzing document structure...")
    logical_docs = []
    current_doc_type = None
    current_doc_pages = []

    for i, page_info in enumerate(pages_info):
        # The first page always opens a document; later pages are compared
        # with the page immediately before them.
        continues_previous = i > 0 and detect_document_boundary(
            pages_info[i - 1].text, page_info.text, current_doc_type
        )

        if continues_previous:
            page_info.page_in_doc = len(current_doc_pages)
        else:
            # Close the document collected so far, then start a new one.
            if current_doc_pages:
                logical_docs.append(
                    _make_logical_document(len(logical_docs), current_doc_type, current_doc_pages)
                )
            current_doc_type = classify_document_type(page_info.text)
            current_doc_pages = []
            page_info.page_in_doc = 0
            print(f"  Page {i}: New document detected - {current_doc_type}")

        page_info.doc_type = current_doc_type
        current_doc_pages.append(page_info)

    # Don't forget the last document
    if current_doc_pages:
        logical_docs.append(
            _make_logical_document(len(logical_docs), current_doc_type, current_doc_pages)
        )

    print(f"‚úÖ Identified {len(logical_docs)} logical documents")
    for ld in logical_docs:
        print(f"   - {ld.doc_type}: Pages {ld.page_start}-{ld.page_end}")

    return pages_info, logical_docs

## ✂️ Chunking & Metadata
Chunk each logical document into overlapping windows (500 words, 100 overlap) and attach metadata
(doc type + page range) so results can be cited in the final answer.


In [None]:
def chunk_document_with_metadata(logical_doc: LogicalDocument,
                                chunk_size: int = 500,
                                overlap: int = 100) -> List[ChunkMetadata]:
    """
    Split a logical document into overlapping word windows with metadata.

    Args:
        logical_doc: Document to split.
        chunk_size: Maximum number of whitespace-separated words per chunk.
        overlap: Number of words shared between consecutive chunks.

    Returns:
        The chunks in document order. A document of at most `chunk_size`
        words yields a single chunk holding its original text and full page
        range; longer documents yield windows advancing by
        `chunk_size - overlap` words with an estimated page span.

    Raises:
        ValueError: If `chunk_size` is not positive, or `overlap` is negative
            or not smaller than `chunk_size`. A non-positive stride would
            otherwise produce no chunks at all or skip words between chunks.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = logical_doc.text.split()
    total_words = len(words)

    if total_words <= chunk_size:
        # Document is small enough to be a single chunk
        return [
            ChunkMetadata(
                chunk_id=f"{logical_doc.doc_id}_chunk_0",
                doc_id=logical_doc.doc_id,
                doc_type=logical_doc.doc_type,
                chunk_index=0,
                page_start=logical_doc.page_start,
                page_end=logical_doc.page_end,
                text=logical_doc.text
            )
        ]

    chunks_metadata = []
    stride = chunk_size - overlap
    page_range = logical_doc.page_end - logical_doc.page_start

    for chunk_index, start_idx in enumerate(range(0, total_words, stride)):
        end_idx = min(start_idx + chunk_size, total_words)

        # NOTE: Page span estimation is approximate: the chunk's relative
        # position in the word stream is projected onto the page range.
        # A production version would track token-to-page mapping precisely.
        relative_page = int((start_idx / total_words) * page_range)
        chunk_page_start = logical_doc.page_start + relative_page
        chunk_page_end = min(chunk_page_start + 1, logical_doc.page_end)

        chunks_metadata.append(
            ChunkMetadata(
                chunk_id=f"{logical_doc.doc_id}_chunk_{chunk_index}",
                doc_id=logical_doc.doc_id,
                doc_type=logical_doc.doc_type,
                chunk_index=chunk_index,
                page_start=chunk_page_start,
                page_end=chunk_page_end,
                text=' '.join(words[start_idx:end_idx])
            )
        )

        # The window reached the end of the text; a further window would
        # only repeat the overlap.
        if end_idx >= total_words:
            break

    return chunks_metadata

def chunk_with_llama_index(logical_doc: LogicalDocument,
                           chunk_size: int = 500,
                           chunk_overlap: int = 100) -> List[ChunkMetadata]:
    """
    Alternative chunker built on LlamaIndex's SentenceSplitter.

    Args:
        logical_doc: Document to split.
        chunk_size: Chunk size passed to SentenceSplitter.
        chunk_overlap: Overlap passed to SentenceSplitter.

    Returns:
        ChunkMetadata objects in the same format as
        chunk_document_with_metadata(), so both chunkers are interchangeable.
        Every chunk carries the page range of the whole logical document.

    NOTE(review): SentenceSplitter appears to measure `chunk_size` in tokens,
    not words, so chunks may differ in length from the word-based chunker
    given the same numbers — confirm against the LlamaIndex docs.
    """
    # Create LlamaIndex document with metadata
    source_document = Document(
        text=logical_doc.text,
        metadata={
            "doc_id": logical_doc.doc_id,
            "doc_type": logical_doc.doc_type,
            "page_start": logical_doc.page_start,
            "page_end": logical_doc.page_end,
            "source": f"{logical_doc.doc_type}_document"
        }
    )

    # Use LlamaIndex's sentence splitter for better chunking
    splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        paragraph_separator="\n\n",
        separator=" ",
    )

    # Create nodes (chunks) from document
    nodes = splitter.get_nodes_from_documents([source_document])

    # Convert to our ChunkMetadata format for consistency
    return [
        ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_{i}",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=i,
            page_start=node.metadata.get("page_start", logical_doc.page_start),
            page_end=node.metadata.get("page_end", logical_doc.page_end),
            text=node.text
        )
        for i, node in enumerate(nodes)
    ]

def process_all_documents(logical_docs: List[LogicalDocument],
                         use_llama_index: bool = False) -> List[ChunkMetadata]:
    """
    Chunk every logical document and return all chunks as one flat list.

    Each document's own chunk list is also stored on its `chunks` attribute.
    `use_llama_index` selects the LlamaIndex-based chunker instead of the
    word-window chunker.
    """
    collected = []

    for document in logical_docs:
        chunker = chunk_with_llama_index if use_llama_index else chunk_document_with_metadata
        document.chunks = chunker(document)  # keep a per-document reference
        collected += document.chunks
        print(f"üìÑ {document.doc_type}: Created {len(document.chunks)} chunks")

    return collected

## 🎯 Retrieval (Dense Vector Search + Query Routing)
We use dense embeddings + FAISS for similarity search. For multi-document PDFs, we route queries to the
most likely document type (when confidence is high) to reduce noise and improve precision.




In [None]:
def predict_query_document_type(query: str) -> Tuple[str, float]:
    """
    Predict which document type is most likely to contain the answer.

    Args:
        query: The user's question.

    Returns:
        `(doc_type, confidence)`. `doc_type` is normalised to the canonical
        category spelling when it matches one case-insensitively;
        `confidence` is a float clamped to [0.0, 1.0] (0.5 when the model
        omits it, 0.0 when it is not a number). If the LLM call fails or its
        reply contains no JSON object, `("Other", 0.0)` is returned so that
        callers fall back to searching everything.
    """
    known_types = (
        'Resume', 'Contract', 'Mortgage Contract', 'Invoice', 'Pay Slip',
        'Lender Fee Sheet', 'Land Deed', 'Bank Statement', 'Tax Document',
        'Insurance', 'Report', 'Letter', 'Form', 'ID Document',
        'Medical', 'Other'
    )

    prompt = f"""
    Analyze this query and predict which document type would most likely contain the answer.

    Query: "{query}"

    Choose the MOST LIKELY type from:
    - Resume: Career, experience, education, skills, employment history
    - Contract: Terms, agreements, obligations, parties, legal terms
    - Mortgage Contract: Home loan, property financing, mortgage terms, interest rates
    - Invoice: Payments, amounts due, billing, charges, invoiced items
    - Pay Slip: Salary, wages, deductions, earnings, pay period
    - Lender Fee Sheet: Loan fees, closing costs, origination fees, lender charges
    - Land Deed: Property ownership, deed information, property description, title
    - Bank Statement: Account balance, transactions, deposits, withdrawals
    - Tax Document: Tax information, W2, 1099, tax returns, tax amounts
    - Insurance: Coverage, policy details, premiums, claims
    - Report: Analysis, findings, conclusions, research data
    - Letter: Communications, requests, notifications, correspondence
    - Form: Applications, submitted data, form fields
    - ID Document: Personal identification, ID numbers, identity verification
    - Medical: Health information, medical conditions, prescriptions
    - Other: General or unclear

    Respond in JSON format:
    {{"type": "DocumentType", "confidence": 0.85}}

    Confidence should be between 0.0 and 1.0
    """

    try:
        raw = llm_generate(prompt).strip()

        # Models frequently wrap the JSON in ```json fences or add prose
        # around it; keep only the outermost {...} span before parsing.
        start, end = raw.find("{"), raw.rfind("}")
        if start != -1 and end > start:
            raw = raw[start:end + 1]

        result = json.loads(raw)
        if not isinstance(result, dict):
            raise ValueError(f"expected a JSON object, got {type(result).__name__}")

        predicted = str(result.get("type") or "Other").strip() or "Other"
        # Routing looks the type up by exact key, so restore canonical casing.
        canonical = {name.casefold(): name for name in known_types}
        predicted = canonical.get(predicted.casefold(), predicted)

        raw_confidence = result.get("confidence", 0.5)
        try:
            confidence = float(raw_confidence)
        except (TypeError, ValueError):
            confidence = 0.0  # unusable confidence: do not route on it
        # Clamp into [0, 1]; a NaN also collapses to 0.0 here.
        confidence = min(1.0, max(0.0, confidence))

        return predicted, confidence
    except Exception as e:
        print(f"Query routing error: {e}")
        return "Other", 0.0

class IntelligentRetriever:
    """
    Dense retriever over chunk embeddings with per-document-type routing.

    One FAISS L2 index holds every chunk; an additional index per document
    type allows a query to be restricted to a single type, either explicitly
    (`filter_doc_type`) or through LLM-based routing (`auto_route`).
    """

    def __init__(self):
        self.index = None            # FAISS index over all chunks
        self.chunks_metadata = []    # chunk list the indices refer to
        self.doc_type_indices = {}   # doc_type -> {'index': ..., 'mapping': [...]}

    def build_indices(self, chunks_metadata: List[ChunkMetadata]):
        """
        Embed all chunks and (re)build the global and per-type FAISS indices.

        Any indices from a previous call are discarded first, so the
        retriever can be rebuilt for a new PDF without keeping stale
        document types around.

        Raises:
            ValueError: If `chunks_metadata` is empty.
        """
        if not chunks_metadata:
            raise ValueError("Cannot build indices: no chunks were provided")

        print("üî® Building vector indices...")
        self.index = None
        self.doc_type_indices = {}
        self.chunks_metadata = chunks_metadata

        # Create embeddings for all chunks (FAISS expects float32 rows).
        texts = [chunk.text for chunk in chunks_metadata]
        embeddings = np.ascontiguousarray(
            embed_model.encode(texts, show_progress_bar=True), dtype=np.float32
        )

        # Store embeddings in metadata
        for chunk, vector in zip(chunks_metadata, embeddings):
            chunk.embedding = vector

        # Build main index
        dim = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(dim)
        self.index.add(embeddings)

        # Group chunk positions by document type in a single pass.
        positions_by_type = {}
        for position, chunk in enumerate(chunks_metadata):
            positions_by_type.setdefault(chunk.doc_type, []).append(position)

        # Build separate indices for each document type
        for doc_type, positions in positions_by_type.items():
            type_index = faiss.IndexFlatL2(dim)
            type_index.add(np.ascontiguousarray(embeddings[positions]))
            self.doc_type_indices[doc_type] = {
                'index': type_index,
                'mapping': positions  # Maps back to original chunks
            }

        print(f"‚úÖ Indexed {len(chunks_metadata)} chunks across {len(positions_by_type)} document types")

    def _search(self, query_embedding, k: int, doc_type: Optional[str] = None):
        """Return (chunk_positions, distances) of the nearest chunks in the chosen index."""
        if doc_type is None:
            index, mapping = self.index, None
        else:
            entry = self.doc_type_indices[doc_type]
            index, mapping = entry['index'], entry['mapping']

        # Never ask for more neighbours than the index holds: FAISS pads the
        # result with index -1, which Python would read as "last element".
        k = min(k, index.ntotal)
        if k <= 0:
            return [], []

        D, I = index.search(query_embedding, k)

        positions, distances = [], []
        for distance, local_idx in zip(D[0], I[0]):
            if local_idx < 0:
                continue  # padding entry, not a real hit
            local_idx = int(local_idx)
            positions.append(mapping[local_idx] if mapping is not None else local_idx)
            distances.append(float(distance))
        return positions, distances

    def retrieve(self, query: str, k: int = 4,
                filter_doc_type: Optional[str] = None,
                auto_route: bool = True) -> List[Tuple[ChunkMetadata, float]]:
        """
        Retrieve the chunks most similar to `query`.

        Args:
            query: The user's question.
            k: Maximum number of chunks to return.
            filter_doc_type: Restrict the search to this document type when
                an index for it exists.
            auto_route: When no usable filter is given, ask the LLM for the
                likely document type and search only that type if its
                confidence exceeds 0.7; otherwise search all chunks.

        Returns:
            `(chunk, score)` pairs ordered from most to least similar. The
            score is `1 / (1 + distance)`, so it lies in (0, 1], is 1.0 for
            an exact match, and does not depend on the other results. An
            empty list is returned if no index has been built or `k <= 0`.
        """
        if self.index is None or k <= 0:
            return []

        query_embedding = np.ascontiguousarray(embed_model.encode([query]), dtype=np.float32)

        # Determine which index to search
        target_type = None
        if filter_doc_type and filter_doc_type in self.doc_type_indices:
            target_type = filter_doc_type
        elif auto_route:
            # Predict best document type
            predicted_type, confidence = predict_query_document_type(query)
            try:
                confidence = float(confidence)
            except (TypeError, ValueError):
                confidence = 0.0  # unusable confidence: search everything
            print(f"üéØ Query routed to: {predicted_type} (confidence: {confidence:.2f})")

            # Route only on high confidence; otherwise search all chunks.
            if confidence > 0.7 and predicted_type in self.doc_type_indices:
                target_type = predicted_type

        positions, distances = self._search(query_embedding, k, target_type)

        # Convert distances to similarity scores. max() guards against tiny
        # negative distances caused by floating-point error.
        return [
            (self.chunks_metadata[position], 1.0 / (1.0 + max(distance, 0.0)))
            for position, distance in zip(positions, distances)
        ]

## 💬 Enhanced Answer Generation with Sources
Use the retrieved chunks as the only allowed context for the LLM. The response includes:
- final answer
- document type + page ranges used
- simple confidence signal based on retrieval similarity

In [None]:
def generate_answer_with_sources(query: str,
                                retrieved_chunks: List[Tuple[ChunkMetadata, float]]) -> Dict:
    """
    Generate an answer from the retrieved chunks with source attribution.

    Args:
        query: The user's question.
        retrieved_chunks: `(chunk, score)` pairs as returned by the retriever.

    Returns:
        A dict with the keys:
        - 'answer': LLM answer, or an explanatory message on failure.
        - 'sources': one entry per chunk with 'doc_type', 'pages',
          'relevance' and 'preview'.
        - 'confidence': mean retrieval score as a plain float (0.0 when
          nothing was retrieved or generation failed).
        - 'chunks_used': number of chunks passed to the LLM.

    Page numbers in the context and in 'sources' are 1-based, matching the
    page numbering shown by the document structure view.
    """
    if not retrieved_chunks:
        return {
            'answer': "I couldn't find relevant information to answer your question.",
            'sources': [],
            'confidence': 0.0,
            'chunks_used': 0
        }

    # Prepare context from retrieved chunks
    context_parts = []
    sources = []

    for chunk_meta, score in retrieved_chunks:
        # Chunk pages are stored 0-based; users see 1-based page numbers.
        page_label = f"{chunk_meta.page_start + 1}-{chunk_meta.page_end + 1}"

        context_parts.append(f"[From {chunk_meta.doc_type}, Pages {page_label}]")
        context_parts.append(chunk_meta.text)
        context_parts.append("")

        # Only mark the preview as truncated when text was actually cut off.
        preview = chunk_meta.text[:100]
        if len(chunk_meta.text) > 100:
            preview += "..."

        sources.append({
            'doc_type': chunk_meta.doc_type,
            'pages': page_label,
            'relevance': f"{score:.2%}",
            'preview': preview
        })

    context = "\n".join(context_parts)

    # Generate answer
    prompt = f"""
    You are a helpful AI assistant. Use the provided context to answer the question.
    Be specific and cite which document type and pages support your answer.

    Context:
    {context}

    Question: {query}

    Instructions:
    1. Answer based ONLY on the provided context
    2. Mention which document type(s) contain the information
    3. Be concise (< 500 characters) but complete
    4. If the context doesn't contain enough information, say so

    Answer:
    """

    try:
        answer = llm_generate(prompt).strip()
    except Exception as e:
        print(f"Answer generation error: {e}")
        return {
            'answer': f"Error generating answer: {str(e)}",
            'sources': sources,
            'confidence': 0.0,
            'chunks_used': len(retrieved_chunks)
        }

    # Overall confidence is the mean retrieval score; float() turns a NumPy
    # scalar into a plain float so the result stays JSON-serialisable.
    avg_score = float(sum(s for _, s in retrieved_chunks) / len(retrieved_chunks))

    return {
        'answer': answer,
        'sources': sources,
        'confidence': avg_score,
        'chunks_used': len(retrieved_chunks)
    }

## 🏗️ Enhanced Document Store

In [None]:
import time
class EnhancedDocumentStore:
    """
    Owns the state of one processed PDF and runs the full pipeline:
    extraction, segmentation, chunking, indexing and question answering.
    """

    def __init__(self):
        self.pages_info = []
        self.logical_docs = []
        self.chunks_metadata = []
        self.retriever = IntelligentRetriever()
        self.is_ready = False
        self.processing_stats = {}
        self.filename = None

    def process_pdf(self, pdf_file, filename: str = "document.pdf"):
        """
        Run the complete processing pipeline on a PDF.

        State from a previously processed PDF is discarded up front, so a
        failed run leaves the store empty and not ready instead of mixing
        old and new data.

        Args:
            pdf_file: Anything accepted by extract_and_analyze_pdf().
            filename: Display name recorded in the statistics.

        Returns:
            `(True, stats)` on success, where `stats` has the keys
            'filename', 'total_pages', 'documents_found', 'total_chunks',
            'document_types' and 'processing_time'; `(False, {'error': msg})`
            if any pipeline step raises.
        """
        self.filename = filename
        self.is_ready = False
        self.pages_info = []
        self.logical_docs = []
        self.chunks_metadata = []
        self.processing_stats = {}
        started = time.perf_counter()

        try:
            # Fresh retriever per PDF so per-type indices of an earlier
            # document can never be searched against the new chunk list.
            self.retriever = IntelligentRetriever()

            # Extract and analyze PDF
            self.pages_info, self.logical_docs = extract_and_analyze_pdf(pdf_file)

            # Chunk documents with metadata
            self.chunks_metadata = process_all_documents(self.logical_docs)

            # Build retrieval indices
            self.retriever.build_indices(self.chunks_metadata)
        except Exception as e:
            return False, {'error': str(e)}

        # Calculate processing statistics
        elapsed = time.perf_counter() - started
        self.processing_stats = {
            'filename': filename,
            'total_pages': len(self.pages_info),
            'documents_found': len(self.logical_docs),
            'total_chunks': len(self.chunks_metadata),
            # Sorted so the order is stable between runs.
            'document_types': sorted({doc.doc_type for doc in self.logical_docs}),
            'processing_time': f"{elapsed:.1f}s"
        }

        self.is_ready = True
        return True, self.processing_stats

    def query(self, question: str, filter_type: Optional[str] = None,
          auto_route: bool = True, k: int = 4) -> Dict:
        """
        Answer a question against the processed PDF.

        Args:
            question: The user's question.
            filter_type: Optional document type to restrict retrieval to.
            auto_route: Let the retriever route the query by predicted type.
            k: Number of chunks to retrieve.

        Returns:
            The dict produced by generate_answer_with_sources(), extended
            with 'filter_used' (the filter name, 'auto' or 'none') and
            'latency_sec'. If no PDF has been processed, a dict with only
            'answer', 'sources' and 'confidence' is returned.
        """
        if not self.is_ready:
            return {
                'answer': "Please upload and process a PDF first.",
                'sources': [],
                'confidence': 0.0
            }

        started = time.perf_counter()

        # Retrieve relevant chunks
        retrieved = self.retriever.retrieve(
            question, k=k,
            filter_doc_type=filter_type,
            auto_route=auto_route
        )

        # Generate answer with sources
        result = generate_answer_with_sources(question, retrieved)
        result['filter_used'] = filter_type or ('auto' if auto_route else 'none')
        result['latency_sec'] = round(time.perf_counter() - started, 2)
        return result

    def get_document_structure(self) -> List[Dict]:
        """
        Describe the detected logical documents for UI display.

        Returns:
            One dict per logical document with 'id', 'type', 'pages'
            (1-based inclusive range), 'chunks' (chunk count) and 'preview'
            (first 200 characters, with "..." appended when truncated).
            Empty when nothing has been processed.
        """
        return [
            {
                'id': doc.doc_id,
                'type': doc.doc_type,
                'pages': f"{doc.page_start + 1}-{doc.page_end + 1}",  # 1-indexed for UI
                'chunks': len(doc.chunks) if doc.chunks else 0,
                'preview': doc.text[:200] + "..." if len(doc.text) > 200 else doc.text
            }
            for doc in self.logical_docs
        ]

## 🎨 Demo UI (Gradio)
Interactive interface to:
- Upload and preview PDFs
- Process + segment documents
- Ask questions with optional filters
- Download chat logs for evaluation evidence

In [None]:
# Initialize global document store for the demo session.
# The Gradio callbacks defined after this line read and update this single
# instance, so it always reflects the most recently processed PDF.
doc_store = EnhancedDocumentStore()

def process_pdf_handler(pdf_file):
    """
    Gradio callback: process the uploaded PDF and report the outcome.

    Returns a 3-tuple of (status markdown, document-structure markdown,
    update for the document-type filter choices).
    """
    if pdf_file is None:
        return "‚ö†Ô∏è Please upload a PDF file", "", gr.update(choices=["All"], value="All")

    # The upload normally arrives as a filepath string; fall back to a
    # `.name` attribute for file-like objects.
    if isinstance(pdf_file, str):
        filename = os.path.basename(pdf_file)
    else:
        filename = getattr(pdf_file, "name", "document.pdf")

    ok, stats = doc_store.process_pdf(pdf_file, filename=filename)

    # Failure: surface the error and reset the filter to its default.
    if not ok:
        return f"‚ùå Error: {stats.get('error', 'Unknown error')}", "", gr.update(choices=["All"], value="All")

    # Summary of what was processed.
    status_msg = f"""
‚úÖ **Successfully Processed:**
- üìÑ File: {stats.get('filename', filename)}
- üìë Pages: {stats.get('total_pages', 0)}
- üìö Documents Found: {stats.get('documents_found', 0)}
- üß© Chunks Created: {stats.get('total_chunks', 0)}
- üè∑Ô∏è Types: {', '.join(stats.get('document_types', []))}
- ‚è±Ô∏è Time: {stats.get('processing_time', 'N/A')}
"""

    # One bullet per detected logical document.
    structure = doc_store.get_document_structure()
    if structure:
        bullet_lines = [
            f"‚Ä¢ **{entry['type']}** (Pages {entry['pages']}): {entry['chunks']} chunks"
            for entry in structure
        ]
        structure_display = "\n".join(bullet_lines)
    else:
        structure_display = "_No structure detected._"

    # Offer every detected type as a filter option, with "All" first.
    filter_choices = ["All"] + stats.get("document_types", ["Other"])
    return status_msg, structure_display, gr.update(choices=filter_choices, value="All")

def _pdf_page_count(pdf_path: str) -> int:
    """
    Return the number of pages in the PDF at `pdf_path`, but never less than 1.

    The floor of 1 lets callers clamp page numbers into the range
    [1, count] without special-casing an empty document.
    """
    # The context manager closes the document even if reading the count fails.
    with fitz.open(pdf_path) as doc:
        return max(1, doc.page_count)

def render_page(pdf_path: str, page_num: int):
    """
    Render one PDF page as an image for the viewer.

    Args:
        pdf_path: Path of the PDF; a falsy value means no PDF is loaded.
        page_num: 1-based page number; values outside the document are
            clamped to the nearest valid page.

    Returns:
        `(image_array, page_label_text)`: a uint8 NumPy array of shape
        (height, width, channels) and a label such as "Page 2 / 7", or
        `(None, "Page 0 / 0")` when no PDF is loaded.
    """
    if not pdf_path:
        return None, "Page 0 / 0"

    # Open the PDF once for both the page count and the rendering; the
    # context manager closes it even if rendering raises.
    with fitz.open(pdf_path) as doc:
        total = max(1, doc.page_count)
        page_num = max(1, min(int(page_num), total))
        # 2x zoom gives a sharper preview.
        pix = doc.load_page(page_num - 1).get_pixmap(matrix=fitz.Matrix(2, 2))

    # Convert pixmap to a numpy array Gradio can display
    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)

    return img, f"Page {page_num} / {total}"

def on_pdf_uploaded(pdf_path: str):
    """Switch between the uploader and the page viewer after a file change.

    With a PDF path, the first page is rendered, the uploader is hidden and
    the viewer is shown. Without one, the uploader stays visible and the
    viewer state is reset.

    Returns:
        A 6-tuple in the order expected by the event outputs:
        ``(upload_box, viewer_box, page_image, page_state, page_label, stored_pdf)``.
    """
    if pdf_path:
        first_image, first_label = render_page(pdf_path, 1)
        uploader_visible, viewer_visible = False, True
        remembered_path = pdf_path
    else:
        first_image, first_label = None, "Page 0 / 0"
        uploader_visible, viewer_visible = True, False
        remembered_path = None

    return (
        gr.update(visible=uploader_visible),  # upload_box
        gr.update(visible=viewer_visible),    # viewer_box
        first_image,                          # page_image
        1,                                    # page_state
        first_label,                          # page_label
        remembered_path,                      # stored_pdf
    )

def go_next(pdf_path: str, page_num: int):
    """Advance the viewer by one page, stopping at the last page.

    Returns:
        ``(page_image, page_state, page_label)``. Without a PDF the image is
        ``None``, the page number is passed through untouched and the label
        is ``"Page 0 / 0"``.
    """
    if pdf_path:
        last_page = _pdf_page_count(pdf_path)
        target = min(int(page_num) + 1, last_page)
        image, caption = render_page(pdf_path, target)
        return image, target, caption

    return None, page_num, "Page 0 / 0"

def go_prev(pdf_path: str, page_num: int):
    """Step the viewer back by one page, stopping at page 1.

    Returns:
        ``(page_image, page_state, page_label)``. Without a PDF the image is
        ``None``, the page number is passed through untouched and the label
        is ``"Page 0 / 0"``.
    """
    if pdf_path:
        target = max(int(page_num) - 1, 1)
        image, caption = render_page(pdf_path, target)
        return image, target, caption

    return None, page_num, "Page 0 / 0"

def replace_pdf():
    """Bring the uploader back and reset the page viewer.

    Returns:
        A 6-tuple in the order
        ``(upload_box, viewer_box, page_image, page_state, page_label, stored_pdf)``.
    """
    show_uploader = gr.update(visible=True)
    hide_viewer = gr.update(visible=False)

    # No image, back to page 1, empty label, and no remembered PDF path.
    return show_uploader, hide_viewer, None, 1, "Page 0 / 0", None

def chat_handler(message, history, doc_filter, auto_route, num_chunks):
    """Answer one chat message against the processed document store.

    Args:
        message: The user's question text.
        history: Conversation so far in Gradio "messages" format (a list of
            ``{"role": ..., "content": ...}`` dicts), or ``None``.
        doc_filter: Document type to restrict the search to; ``"All"`` means
            no filter.
        auto_route: Whether query auto-routing is requested. It is only
            passed on as true when no explicit filter is selected.
        num_chunks: Number of chunks to retrieve (coerced to ``int``).

    Returns:
        A new history list with the user message and the assistant reply
        appended. A blank message returns the history unchanged.
    """
    # Work on a copy so the caller's list is never mutated in place.
    history = list(history or [])

    # Ignore blank submissions instead of sending an empty query to the store.
    if message is None or not str(message).strip():
        return history

    if not doc_store.is_ready:
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": "üìö Please upload and process a PDF document first."})
        return history

    filter_type = None if doc_filter == "All" else doc_filter

    result = doc_store.query(
        message,
        filter_type=filter_type,
        # An explicit filter always wins over auto-routing.
        auto_route=auto_route and filter_type is None,
        k=int(num_chunks),
    )

    # Assemble the reply from parts and join once at the end.
    parts = [f"{result.get('answer', '')}\n\n"]

    sources = result.get("sources")
    if sources:
        parts.append("üìç **Sources:**\n")
        parts.extend(
            f"‚Ä¢ {src['doc_type']} (Pages {src['pages']}) - Relevance: {src['relevance']}\n"
            for src in sources
        )

    parts.append(
        f"\n*Confidence: {result.get('confidence', 0.0):.1%} | "
        f"Filter: {result.get('filter_used', 'auto')} | "
        f"Latency: {result.get('latency_sec', 'N/A')}s*"
    )
    response = "".join(parts)

    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": response})

    return history


def _content_to_text(content):
    """Convert Gradio content (str | list | dict) into plain text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Could be list of strings or list of dict parts
        parts = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict):
                # common shapes: {"text": "..."} or {"type":"text","text":"..."}
                parts.append(str(item.get("text") or item.get("content") or item))
            else:
                parts.append(str(item))
        return "\n".join([p for p in parts if p]).strip()
    if isinstance(content, dict):
        return str(content.get("text") or content.get("content") or content).strip()
    return str(content).strip()

def save_chat_txt(history):
    """Write the chat history to a temporary .txt file and return its path.

    Two history layouts are understood: a list of ``(user, assistant)`` pairs,
    and a list of ``{"role": ..., "content": ...}`` message dicts. The layout
    is decided by looking at the first entry only.
    """
    entries = history or []

    # mkstemp hands back an open descriptor; close it and reopen by path.
    descriptor, out_path = tempfile.mkstemp(suffix=".txt", prefix="chat_")
    os.close(descriptor)

    with open(out_path, "w", encoding="utf-8") as out:
        # Header
        out.write("Document Q&A Chat Log\n")
        out.write(f"Saved: {datetime.now().isoformat()}\n")
        out.write(f"PDF: {getattr(doc_store, 'filename', None)}\n")
        out.write("\n" + "=" * 50 + "\n\n")

        pair_layout = (
            len(entries) > 0
            and isinstance(entries[0], (list, tuple))
            and len(entries[0]) == 2
        )

        if pair_layout:
            # Pair layout: every entry is one full turn.
            for turn_no, (user_part, bot_part) in enumerate(entries, start=1):
                out.write(f"--- Turn {turn_no} ---\n")
                out.write(f"USER: {_content_to_text(user_part)}\n\n")
                out.write(f"ASSISTANT: {_content_to_text(bot_part)}\n\n")
        else:
            # Messages layout: a new turn starts at each non-empty user message.
            turn_no = 0
            for entry in entries:
                if isinstance(entry, dict):
                    speaker = str(entry.get("role", "unknown")).upper()
                    text = _content_to_text(entry.get("content"))
                    if text:
                        if speaker == "USER":
                            turn_no += 1
                            out.write(f"--- Turn {turn_no} ---\n")
                        out.write(f"{speaker}: {text}\n\n")
                else:
                    # Unexpected entry type: dump its text form as-is.
                    out.write(f"{_content_to_text(entry)}\n\n")

    return out_path




def create_interface():
    """Build the single-page Gradio app and return the ``gr.Blocks`` instance.

    Layout (one row of three columns, plus a status bar underneath):
      * left   - PDF uploader and page viewer sharing the same spot, with the
                 "Process Document" and "Clear All" buttons;
      * middle - processing status, document structure, and the retrieval
                 settings (type filter, auto-routing, chunk count);
      * right  - chatbot, message box, example-question buttons, and the
                 clear / download buttons.

    All event wiring is done here. The handlers read the module-level
    ``doc_store``; "Clear All" rebinds it to a fresh ``EnhancedDocumentStore``.

    Returns:
        The constructed ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(title="Enhanced Document Q&A", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
# üöÄ Enhanced Document Q&A System
### Intelligent Multi-Document Analysis with Advanced RAG Pipeline
""")

        with gr.Row():
            # Left side - PDF preview and upload
            with gr.Column(scale=2):

                # Two "boxes" in the same spot; exactly one is visible at a time.
                upload_box = gr.Column(visible=True)
                viewer_box = gr.Column(visible=False)

                # Per-session state: current page number and stored PDF path.
                page_state = gr.State(1)
                stored_pdf = gr.State(None)

                with upload_box:
                    pdf_input = gr.File(
                        label="üìÑ Upload PDF",
                        file_types=[".pdf"],
                        type="filepath"  # handlers receive a path string
                    )

                with viewer_box:
                    page_label = gr.Markdown("Page 0 / 0")
                    page_image = gr.Image(label="PDF Page Preview", height=600)

                    with gr.Row():
                        prev_btn = gr.Button("‚¨ÖÔ∏è Prev", scale=1)
                        next_btn = gr.Button("Next ‚û°Ô∏è", scale=1)
                        replace_btn = gr.Button("üîÅ Replace PDF", scale=1)

                with gr.Row():
                    process_btn = gr.Button(
                        "üîÑ Process Document",
                        variant="primary",
                        size="lg",
                        scale=2
                    )
                    clear_all_btn = gr.Button(
                        "üóëÔ∏è Clear All",
                        variant="secondary",
                        size="lg",
                        scale=1
                    )

            # Middle - Document info and settings
            with gr.Column(scale=1):
                gr.Markdown("### üìä Document Info")
                status_output = gr.Markdown(
                    value="‚è≥ Waiting for PDF upload..."
                )

                structure_output = gr.Markdown(
                    value="",
                    label="Document Structure"
                )

                gr.Markdown("### ‚öôÔ∏è Settings")
                doc_filter = gr.Dropdown(
                    choices=["All"],
                    value="All",
                    label="üè∑Ô∏è Document Type Filter",
                    info="Filter search to specific document type"
                )

                auto_route = gr.Checkbox(
                    value=True,
                    label="üéØ Auto-Route Queries",
                    info="Automatically detect relevant document type"
                )

                num_chunks = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=4,
                    step=1,
                    label="üìä Chunks to Retrieve"
                )

            # Right side - Chat interface
            with gr.Column(scale=2):
                gr.Markdown("### üí¨ Ask Questions")
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=500,
                    elem_id="chatbot",
                    show_label=False,
                )

                with gr.Row():
                    msg_input = gr.Textbox(
                        label="Ask a question",
                        placeholder="e.g., What are the payment terms? What is the total amount?",
                        scale=4,
                        show_label=False
                    )
                    send_btn = gr.Button("üì§ Send", scale=1, variant="primary")

                with gr.Row():
                    example_btn1 = gr.Button("üìù What's the summary?", size="sm", scale=1)
                    example_btn2 = gr.Button("üí∞ Find amounts", size="sm", scale=1)

                with gr.Row():
                    clear_chat_btn = gr.Button("üóëÔ∏è Clear Chat", size="sm", scale=1)

                with gr.Row():
                    download_chat_btn = gr.DownloadButton("üíæ Download Chat (.txt)", variant="primary")

        # Status bar at the bottom
        with gr.Row():
            status_bar = gr.Markdown(
                value="**Status:** Ready | **Documents:** 0 | **Chunks:** 0 | **Cache Hits:** 0/0",
                elem_id="status_bar"
            )

        # ------------------------------------------------------------------
        # Event handlers
        # ------------------------------------------------------------------
        def update_status_bar():
            """Return the status-bar markdown for the current document store."""
            if doc_store.is_ready:
                stats = doc_store.processing_stats

                # getattr with defaults so a retriever without these counters
                # does not crash the status bar.
                total_q = getattr(doc_store.retriever, "total_queries", 0)
                hits = getattr(doc_store.retriever, "cache_hits", 0)
                cache_rate = (hits / total_q) * 100 if total_q else 0

                return (
                    f"**Status:** ‚úÖ Ready | **Documents:** {stats.get('documents_found', 0)} | "
                    f"**Chunks:** {stats.get('total_chunks', 0)} | **Cache Rate:** {cache_rate:.0f}%"
                )
            return "**Status:** Ready | **Documents:** 0 | **Chunks:** 0 | **Cache Hits:** 0/0"

        def clear_all():
            """Replace the document store and reset every widget to its initial state."""
            global doc_store
            doc_store = EnhancedDocumentStore()
            return (
                None,  # pdf_input
                "‚è≥ Waiting for PDF upload...",  # status_output
                "",  # structure_output
                gr.update(choices=["All"], value="All"),  # doc_filter
                [],  # chatbot
                "",  # msg_input
                update_status_bar(),  # status_bar
                gr.update(visible=True),   # upload_box
                gr.update(visible=False),  # viewer_box
                None,  # page_image
                "Page 0 / 0",  # page_label
                1,  # page_state
                None  # stored_pdf
            )

        def process_pdf_with_status(pdf_file):
            """Run process_pdf_handler, then refresh the status bar."""
            status, structure, filter_update = process_pdf_handler(pdf_file)
            status_bar_text = update_status_bar()
            return status, structure, filter_update, status_bar_text

        def chat_with_status(message, history, doc_filter, auto_route, num_chunks):
            """Run chat_handler, then refresh the status bar."""
            new_history = chat_handler(message, history, doc_filter, auto_route, num_chunks)
            status_bar_text = update_status_bar()
            return new_history, status_bar_text

        def ask_summary(history, doc_filter, auto_route, num_chunks):
            """Send the canned "summary" question through chat_handler."""
            return chat_handler(
                "Can you provide a summary of the main points in this document?",
                history, doc_filter, auto_route, num_chunks
            )

        def ask_amounts(history, doc_filter, auto_route, num_chunks):
            """Send the canned "monetary amounts" question through chat_handler."""
            return chat_handler(
                "What are all the monetary amounts or financial figures mentioned?",
                history, doc_filter, auto_route, num_chunks
            )

        # ------------------------------------------------------------------
        # Event wiring
        # ------------------------------------------------------------------
        process_btn.click(
            fn=process_pdf_with_status,
            inputs=[pdf_input],
            outputs=[status_output, structure_output, doc_filter, status_bar]
        )

        clear_all_btn.click(
            fn=clear_all,
            outputs=[
                pdf_input, status_output, structure_output, doc_filter,
                chatbot, msg_input, status_bar,
                upload_box, viewer_box, page_image, page_label, page_state, stored_pdf
            ]
        )

        # Chat interactions: answer, then clear the message box.
        msg_input.submit(
            fn=chat_with_status,
            inputs=[msg_input, chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot, status_bar]
        ).then(lambda: "", outputs=[msg_input])

        send_btn.click(
            fn=chat_with_status,
            inputs=[msg_input, chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot, status_bar]
        ).then(lambda: "", outputs=[msg_input])

        clear_chat_btn.click(lambda: [], outputs=[chatbot])

        example_btn1.click(
            fn=ask_summary,
            inputs=[chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot]
        ).then(fn=update_status_bar, outputs=[status_bar])

        example_btn2.click(
            fn=ask_amounts,
            inputs=[chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot]
        ).then(fn=update_status_bar, outputs=[status_bar])

        # Auto-process and show the viewer whenever the file value changes.
        # Only .change() is wired: it fires on a user upload as well as on
        # programmatic updates, so also binding .upload() ran the whole
        # processing pipeline twice for every uploaded PDF.
        pdf_input.change(
            fn=process_pdf_with_status,
            inputs=[pdf_input],
            outputs=[status_output, structure_output, doc_filter, status_bar]
        )

        pdf_input.change(
            fn=on_pdf_uploaded,
            inputs=[pdf_input],
            outputs=[upload_box, viewer_box, page_image, page_state, page_label, stored_pdf]
        )

        # Page navigation; output order matches the handlers' return tuples.
        prev_btn.click(
            fn=go_prev,
            inputs=[stored_pdf, page_state],
            outputs=[page_image, page_state, page_label]
        )

        next_btn.click(
            fn=go_next,
            inputs=[stored_pdf, page_state],
            outputs=[page_image, page_state, page_label]
        )

        # Replace button: bring the uploader back and reset the viewer.
        replace_btn.click(
            fn=replace_pdf,
            outputs=[upload_box, viewer_box, page_image, page_state, page_label, stored_pdf]
        )

        # DownloadButton expects a filepath: save_chat_txt writes the chat
        # history to a .txt file and returns its path.
        download_chat_btn.click(
            fn=save_chat_txt,
            inputs=[chatbot],
            outputs=[download_chat_btn]
        )

    return demo




In [None]:
# Smoke test: send a fixed prompt through llm_generate and print whatever it returns.
print(llm_generate("Respond with ONLY this exact word: Resume"))


In [None]:
# Build the Gradio app and start it.
# NOTE(review): share=True presumably requests a public share link and
# debug=True presumably keeps the cell attached so errors are shown — confirm
# against the installed Gradio version.
demo = create_interface()
demo.launch(share=True, debug=True)