In [None]:
#Install dependencies
!pip install -q jedi
!pip install -q transformers accelerate
!pip install -q llama-index-embeddings-huggingface
!pip install -q llama-index llama-index-llms-google-genai
!pip install -q llama-index-readers-file
!pip install -q pymupdf
!pip install -q faiss-cpu
!pip install -q llama-index-vector-stores-faiss
!pip install -q gradio-pdf
!pip install -q numpy pandas

In [None]:
import os
from IPython.display import Markdown,display
import fitz
import faiss
from datetime import datetime
from dataclasses import dataclass
import json
import gradio as gr
from llama_index.core import Document,VectorStoreIndex,SimpleDirectoryReader,ServiceContext,Settings,StorageContext
from llama_index.llms.google_genai import GoogleGenAI
#from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.prompts import PromptTemplate
from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.core.query_engine import RetrieverQueryEngine
from typing import List, Dict, Tuple, Optional
import numpy as np

In [None]:
# Embedding model used for every vector computed in this notebook.
# It is registered on the global LlamaIndex Settings so that index construction
# and retrieval pick it up without it being passed around explicitly.
# The FAISS index built later is created with embedding_dim=384, so the model
# configured here has to produce vectors of that size.
embed_model = HuggingFaceEmbedding(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
)
Settings.embed_model = embed_model

In [None]:
# Gemini LLM used for page classification and for answer synthesis.
# The API key is read from the GOOGLE_API_KEY environment variable so that no
# credential is ever stored in the notebook itself.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise RuntimeError(
        "Set the GOOGLE_API_KEY environment variable before running this cell."
    )
llm_gemini = GoogleGenAI(model="models/gemini-2.5-flash", api_key=GOOGLE_API_KEY)
# Register the LLM globally so query engines and synthesizers use it by default.
Settings.llm = llm_gemini

In [None]:
@dataclass
class PageInfo:
  """Text and classification of a single extracted PDF page.

  ``doc_id`` and ``page_in_doc`` start out as ``None`` and are filled in by
  ``extract_and_analyze_pdf`` when pages are grouped into logical documents.
  """
  # 0-based index of the page within the PDF
  page_num: int
  # raw text extracted from the page
  text: str
  # category returned by classify_document
  doc_type: Optional[str] = None
  # identifier of the logical document this page belongs to, e.g. "doc_0"
  doc_id: Optional[str] = None
  # 0-based position of the page inside its logical document
  page_in_doc: Optional[int] = None
@dataclass
class LogicalDocument:
  """A run of consecutive pages that form one logical document inside a PDF."""
  # identifier such as "doc_0"
  doc_id: str
  # category shared by the pages of this document
  doc_type: str
  # 0-based number of the first page
  page_start: int
  # 0-based number of the last page (inclusive)
  page_end: int
  # text of all pages joined together
  text: str
  # chunks produced for this document; None until the document has been chunked
  chunks: Optional[List["ChunkMetadata"]] = None
@dataclass
class ChunkMetadata:
  """Metadata carried by each text chunk of a logical document."""
  # unique chunk identifier, "<doc_id>_chunk_<index>"
  chunk_id: str
  # identifier of the logical document the chunk was cut from
  doc_id: str
  # category of that logical document
  doc_type: str
  # 0-based position of the chunk within its logical document
  chunk_index: int
  # 0-based first page of the originating logical document
  page_start: int
  # 0-based last page of the originating logical document
  page_end: int
  # chunk text
  text: str
  # optional embedding vector for the chunk
  embedding: Optional[np.ndarray] = None

In [None]:
# Function to classify the document type based on its content.
def classify_document(text:str, max_length: int = 500) -> str:
  """Classify a piece of text into one of the supported document categories.

  Args:
    text: Text of the page/document to classify.
    max_length: Only the first ``max_length`` characters are sent to the LLM.

  Returns:
    One of 'Resume', 'Contract', 'Fees Worksheet', 'PaySlip' or 'Other'.
    'Other' is also returned for blank input, for an unrecognised LLM reply
    and when the LLM call fails, so callers always get the same spelling and
    can compare categories of neighbouring pages safely.
  """
  valid_types = ('Resume','Contract','Fees Worksheet','PaySlip','Other')
  fallback = 'Other'

  # Nothing to classify: avoid a pointless LLM round-trip.
  if not text or not text.strip():
    return fallback

  # Slicing already copes with texts shorter than max_length.
  text_sample = text[:max_length]

  prompt = f"""
  You are a document type classifier.Analyze this document and classify it into ONE of these categories : contract,fees worksheet,resume,payslip,other.
  Do not display the contents of the text.
  Document:
  {text_sample}
  Respond with ONLY the category name, nothing else.
  """
  try:
    response = llm_gemini.complete(prompt)
    # Drop surrounding whitespace, quotes, markdown markers and a trailing period
    # before matching case-insensitively against the known categories.
    label = response.text.strip().strip(" .\"'`*").lower()
    canonical = {valid_type.lower(): valid_type for valid_type in valid_types}
    return canonical.get(label, fallback)
  except Exception as e:
    # Best effort: a failed classification must not abort PDF processing.
    print(f"Error {str(e)}")
    return fallback
# Function to detect document boundary
def is_same_document(prev_text: str, curr_text: str, doc_type: Optional[str] = None) -> bool:
    """Ask the LLM whether two consecutive pages belong to the same document.

    Args:
        prev_text: Text of the previous page; only its last 500 characters are used.
        curr_text: Text of the current page; only its first 500 characters are used.
        doc_type: Category of the document being assembled, if known. It is
            shown to the LLM as context ('Unknown' when missing).

    Returns:
        False when either text is empty. Otherwise True when the LLM reply
        starts with "yes" (case-insensitive). If the LLM call fails, True is
        returned so that an error does not split the document.
    """
    if not prev_text or not curr_text:
        return False

    # Slicing already copes with texts shorter than 500 characters.
    prev_sample = prev_text[-500:]
    curr_sample = curr_text[:500]

    prompt = f"""
    Determine if these two pages are from the SAME document.

    Current document type: {doc_type or 'Unknown'}
    End of Previous Page:
    ... {prev_sample}

    Start of Current Page:
    {curr_sample} ...

    Consider:
    - Continuity of content
    - Formatting consistency
    - Topic coherence
    - Page numbers or headers

    Answer ONLY 'Yes' if same document or 'No' if different document.
    """
    try:
        response = llm_gemini.complete(prompt)
        return response.text.strip().lower().startswith("yes")
    except Exception as e:
        print(f"Boundary detection error: {e}")
        return True



In [None]:
def extract_and_analyze_pdf(pdf_file):
    """
    Extract text from a PDF, classify pages by doc_type, group them into logical documents,
    and return page-level info and logical document groupings.

    Args:
        pdf_file: One of
            - a dict holding the raw PDF bytes under the "content" key,
            - an object with a ``read()`` method returning the PDF bytes,
            - anything else, which is handed to ``fitz.open`` unchanged (e.g. a path).

    Returns:
        ``(pages_info, logical_docs)``: the ``PageInfo`` of every page that
        contains text, and the ``LogicalDocument`` objects in the order in
        which they appear in the PDF. Page numbers are 0-based.

    Raises:
        ValueError: if no page of the PDF contains extractable text.
    """
    print("üìñ Starting PDF extraction and analysis...")

    # Load PDF
    if isinstance(pdf_file, dict) and "content" in pdf_file:
        doc = fitz.open(stream=pdf_file["content"], filetype="pdf")
    elif hasattr(pdf_file, "read"):
        doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
    else:
        doc = fitz.open(pdf_file)

    # Extract and classify the text of every page. Pages without any text are
    # skipped (no OCR is performed). The PDF is closed even if classification fails.
    pages_info = []
    try:
        for i, page in enumerate(doc):
            text = page.get_text()
            if not text.strip():
                print(f"  Page {i}: No text found, skipping...")
                continue
            # Classify the document type
            doc_type = classify_document(text)
            pages_info.append(PageInfo(page_num=i, text=text, doc_type=doc_type))
    finally:
        doc.close()

    if not pages_info:
        raise ValueError("No text could be extracted from PDF")

    print(f"‚úÖ Extracted {len(pages_info)} pages")

    # Group consecutive pages by doc_type: a new logical document starts whenever
    # the category differs from the one of the previous page.
    page_groups = []
    for page in pages_info:
        if not page_groups or page.doc_type != page_groups[-1][0].doc_type:
            page_groups.append([])
        current_group = page_groups[-1]

        # Assign doc_id and page_in_doc
        page.doc_id = f"doc_{len(page_groups) - 1}"
        page.page_in_doc = len(current_group)
        current_group.append(page)

    # Create LogicalDocument objects in order of appearance. Iterating over the
    # groups (instead of sorting the doc_id strings) keeps "doc_2" before "doc_10".
    logical_docs = [
        LogicalDocument(
            doc_id=group[0].doc_id,
            doc_type=group[0].doc_type,
            page_start=group[0].page_num,
            page_end=group[-1].page_num,
            text="\n\n".join(p.text for p in group)
        )
        for group in page_groups
    ]

    print(f"‚úÖ Identified {len(logical_docs)} logical documents")
    for ld in logical_docs:
        print(f"   - {ld.doc_type}: Pages {ld.page_start}-{ld.page_end} (ID: {ld.doc_id})")

    return pages_info, logical_docs


In [None]:
def chunk_with_llama_index(logical_doc: LogicalDocument,
                           chunk_size: int = 500,
                           chunk_overlap: int = 100) -> List[ChunkMetadata]:
    """
    Split one logical document into chunks with LlamaIndex's SentenceSplitter.

    Args:
        logical_doc: Document to split; its text and identifying fields are used.
        chunk_size: Passed to ``SentenceSplitter`` as ``chunk_size``.
        chunk_overlap: Passed to ``SentenceSplitter`` as ``chunk_overlap``.

    Returns:
        One ``ChunkMetadata`` per produced node, in splitter order. Chunk ids
        have the form "<doc_id>_chunk_<index>" with a 0-based index.
    """
    # Create LlamaIndex document with metadata
    source_doc = Document(
        text=logical_doc.text,
        metadata={
            "doc_id": logical_doc.doc_id,
            "doc_type": logical_doc.doc_type,
            "page_start": logical_doc.page_start,
            "page_end": logical_doc.page_end,
            "source": f"{logical_doc.doc_type}_document"
        }
    )

    # Use LlamaIndex's sentence splitter for better chunking
    splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        paragraph_separator="\n\n",
        separator=" ",
    )

    # Create nodes (chunks) from document
    nodes = splitter.get_nodes_from_documents([source_doc])

    # Convert to our ChunkMetadata format for consistency. The page span is read
    # from the node metadata and falls back to the span of the whole logical
    # document, so every chunk reports the document-level page range.
    return [
        ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_{i}",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=i,
            page_start=node.metadata.get("page_start", logical_doc.page_start),
            page_end=node.metadata.get("page_end", logical_doc.page_end),
            text=node.text
        )
        for i, node in enumerate(nodes)
    ]
def process_all_documents(logical_docs: List[LogicalDocument],
                         chunk_size: int=500,chunk_overlap:int=100) -> List[ChunkMetadata]:
    """
    Chunk every logical document and collect the chunks in one flat list.

    Each document's own chunk list is also stored on its ``chunks`` attribute.
    The chunking parameters are forwarded unchanged to ``chunk_with_llama_index``.
    """
    collected = []
    for ld in logical_docs:
        # Keep the per-document chunks on the document itself for later display.
        ld.chunks = chunk_with_llama_index(ld, chunk_size, chunk_overlap)
        collected += ld.chunks
        print(f"üìÑ {ld.doc_type}: Created {len(ld.chunks)} chunks")
    return collected

In [None]:
def build_faiss_index_from_chunks(all_chunks,embedding_dim=384):
  """
  Create a FAISS-backed VectorStoreIndex from the given chunks.

  all_chunks: objects exposing text, chunk_id, doc_id, doc_type, page_start,
  page_end and chunk_index attributes.
  embedding_dim: dimensionality the flat L2 FAISS index is created with.

  Returns a (vector_index, vector_store) tuple.
  """
  print("Building FAISS semantic index...")
  print(f" Input: {len(all_chunks)} chunks")

  # One LlamaIndex Document per chunk, carrying the chunk's metadata along.
  documents = [
      Document(
          text=item.text,
          metadata={
              "chunk_id": item.chunk_id,
              "doc_id": item.doc_id,
              "doc_type": item.doc_type,
              "page_start": item.page_start,
              "page_end": item.page_end,
              "chunk_index": item.chunk_index,
          },
      )
      for item in all_chunks
  ]

  # Flat L2 FAISS index wrapped as the vector store of the storage context.
  vector_store = FaissVectorStore(faiss_index=faiss.IndexFlatL2(embedding_dim))
  storage_context = StorageContext.from_defaults(vector_store=vector_store)

  vector_index = VectorStoreIndex.from_documents(
      documents,
      storage_context=storage_context,
  )
  print(f"‚úÖ Created FAISS index with {len(documents)} chunks")
  return vector_index,vector_store





In [None]:
# Global state shared by the Gradio callbacks below.
# index: vector index built by process_pdf; None until a PDF has been processed.
index = None
# current_logical_docs: logical documents of the most recently processed PDF;
# initialised here so that reading it before any upload does not raise NameError.
current_logical_docs = None


# Function we use to process the uploaded PDF
def process_pdf(pdf_file,chunk_size=500,chunk_overlap=100):
    """Run the whole ingestion pipeline for an uploaded PDF.

    The PDF is split into logical documents, chunked and indexed. On success
    the module-level ``index`` and ``current_logical_docs`` are replaced.

    Returns a ``(status_message, analysis_markdown)`` tuple; the markdown part
    is empty when no file was given or when processing failed.
    """
    global index,current_logical_docs

    # Guard clause: nothing was uploaded.
    if pdf_file is None:
        return "‚ö†Ô∏è Please upload a PDF first!",""

    try:
        pages_info,logical_docs = extract_and_analyze_pdf(pdf_file)
        current_logical_docs = logical_docs
        print(f"\nüîÑ Processing {len(logical_docs)} logical documents...")
        all_chunks = process_all_documents(
            logical_docs,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        index, _vector_store = build_faiss_index_from_chunks(all_chunks, embedding_dim=384)

        # Assemble the markdown report piece by piece and join once at the end.
        report_parts = [
            "# üìÑ Document Analysis\n\n",
            "## Logical Document Structure\n\n",
        ]
        for ld in logical_docs:
            # Stored page numbers are 0-based; show them 1-based.
            first_page = ld.page_start + 1
            last_page = ld.page_end + 1
            if ld.page_start != ld.page_end:
                page_range = f"{first_page}-{last_page}"
            else:
                page_range = str(first_page)
            num_chunks = len(ld.chunks) if ld.chunks else 0
            report_parts.append(f"**{ld.doc_type.upper()}** ({ld.doc_id})\n")
            report_parts.append(f"  - Pages: {page_range}\n")
            report_parts.append(f"  - Chunks: {num_chunks}\n\n")
        report_parts.extend([
            "---\n\n",
            "## üìä Processing Statistics\n\n",
            f"- **Total pages:** {len(pages_info)}\n",
            f"- **Logical documents:** {len(logical_docs)}\n",
            f"- **Total chunks:** {len(all_chunks)}\n",
            f"- **Chunk size:** {chunk_size} tokens\n",
            f"- **Chunk overlap:** {chunk_overlap} tokens\n",
        ])
        page_info_display = "".join(report_parts)

        success_msg = f"‚úÖ Successfully processed {len(pages_info)} page(s) and created {len(all_chunks)} chunks!\n\nYou can now ask questions about the document."
        return success_msg, page_info_display
    except Exception as e:
        # Any failure is reported in the status box instead of being raised.
        error_msg = f"‚ùå Error processing PDF: {str(e)}\n\nPlease make sure the file is a valid PDF."
        return error_msg,""

# Function to handle user questions
def answer(question, chat_history):
    """Answer a question about the processed PDF and append it to the chat.

    Args:
        question: Text typed by the user.
        chat_history: List of ``{"role": ..., "content": ...}`` messages;
            ``None`` is treated as an empty history.

    Returns:
        ``(updated_chat_history, "")``. The empty string clears the input box.
        Errors raised while querying are reported as an assistant message
        instead of being propagated.
    """
    global index

    chat_history = chat_history or []

    if index is None:
        return chat_history + [{"role": "assistant", "content": "‚ö†Ô∏è Please upload and process a PDF document first!"}], ""

    if not question or not question.strip():
        return chat_history, ""

    try:
        # The instruction and the "Context:" header are separated by a blank line
        # so that the two do not run together in the prompt sent to the LLM.
        qa_template = PromptTemplate(
            "You are a helpful and friendly document answering assistant. "
            "Using only the provided context, answer the question accurately.\n\n"
            "If the context does not contain enough information, reply 'Not enough context to provide this answer'.\n\n"
            "Context:\n{context_str}\n\n"
            "Question: {query_str}\n\n"
        )
        response_synthesizer = get_response_synthesizer(
            response_mode="compact",
            text_qa_template=qa_template,
        )
        retriever = index.as_retriever(similarity_top_k=5)

        query_engine = RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=response_synthesizer
        )
        response = query_engine.query(question)
        response_text = str(response)

        # Cite the best-matching chunk unless the model said it lacked context.
        is_no_context = "not enough context to provide this answer" in response_text.lower()
        source_nodes = getattr(response, 'source_nodes', None) or []
        if source_nodes and not is_no_context:
            meta = source_nodes[0].metadata
            # Metadata holds 0-based page numbers. Resolve the fallback for a
            # missing page_end first, then convert both values to 1-based.
            first_page = meta.get('page_start', 0)
            last_page = meta.get('page_end', first_page)
            page_start = first_page + 1
            page_end = last_page + 1
            doc_type = meta.get('doc_type','unknown')
            if page_start == page_end:
                source_str = f"Page {page_start} ({doc_type})"
            else:
                source_str = f"Pages {page_start}-{page_end} ({doc_type})"
            response_text += f"\n\nüìç *Sources: {source_str}*"

        chat_history = chat_history + [
            {"role": "user", "content": question},
            {"role": "assistant", "content": response_text}
        ]
        return chat_history, ""  # we are returning the empty string to clear input
    except Exception as e:
        chat_history = chat_history + [
            {"role": "user", "content": question},
            {"role": "assistant", "content": f"‚ùå Error: {str(e)}"}
        ]
        return chat_history, ""

def save_chat(chat_history):
  """Write the chat history to a timestamped text file in the working directory.

  Args:
    chat_history: List of ``{"role": ..., "content": ...}`` messages.

  Returns:
    A status string: a warning when there is nothing to save, otherwise the
    name of the file that was written (``chat_<YYYYmmdd_HHMMSS>.txt``).
  """
  if not chat_history:
    return "‚ö†Ô∏è No chat to save!"
  now = datetime.now()
  filename = f"chat_{now.strftime('%Y%m%d_%H%M%S')}.txt"
  # Messages may contain emoji and other non-ASCII text, so the encoding is fixed
  # to UTF-8 instead of relying on the platform default.
  with open(filename,'w',encoding='utf-8') as f:
    f.write(f"Chat History -{now.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
    for msg in chat_history:
      f.write(f"{msg['role'].upper()}:\n")
      f.write(f"{msg['content']}\n")
      f.write("-" * 50 + "\n\n")
  return f"Saved to : {filename}"



In [None]:
# Gradio UI
# Layout: a wide chat column (chat window, question box, action buttons) next to
# a narrow column for uploading and processing the PDF and showing its analysis.
with gr.Blocks(title="PDF Q&A", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# üìö PDF Q&A with Advanced Document Analysis")
    gr.Markdown("Upload a PDF with multiple document types. The system automatically detects, groups, and chunks them.")

    with gr.Row():
        # Left column: conversation with the document.
        with gr.Column(scale=3):
            # type="messages" matches the role/content dicts produced by answer().
            chatbot = gr.Chatbot(
                label="Chat with Your Documents",
                height=300,
                type="messages",
            )
            user_input = gr.Textbox(
                placeholder=" Ask a question...",
                show_label=False,
                container=False
            )
            with gr.Row():
                send_btn = gr.Button("üì§ Send", variant="primary", size="sm")
                clear_btn = gr.Button("üóëÔ∏è Clear Chat", size="sm")
                save_btn = gr.Button("üíæ Save Chat",size="sm")

        # Right column: upload, processing trigger, status and analysis report.
        with gr.Column(scale=1):
            # type="filepath": the upload is configured to be handed over as a path.
            pdf_input = gr.File(
                label="üìÑ Upload PDF",
                file_types=[".pdf"],
                type="filepath"
            )
            process_btn = gr.Button("üîÑ Process Document", variant="primary")
            status_box = gr.Textbox(
                label="Status",
                interactive=False,
                lines=2,
                max_lines=2
            )
            page_info_box = gr.Markdown(
                label="üìã Analysis",
                value="Upload and process a document to see the analysis."
            )


    # Event Handlers
    # Only the file is passed as input, so process_pdf runs with its default
    # chunk_size / chunk_overlap. Its two return values fill the status box and
    # the analysis panel.
    process_btn.click(
        fn=process_pdf,
        inputs=[pdf_input],
        outputs=[status_box,page_info_box],
        queue=True
    )

    # The Send button and pressing Enter in the textbox trigger the same handler.
    # answer() returns the updated history plus "" which clears the textbox.
    send_btn.click(
        fn=answer,
        inputs=[user_input, chatbot],
        outputs=[chatbot, user_input],
        queue=True
    )

    user_input.submit(
        fn=answer,
        inputs=[user_input, chatbot],
        outputs=[chatbot, user_input],
        queue=True
    )

    # Reset both the chat window and the question box.
    clear_btn.click(
        fn=lambda: ([], ""),
        outputs=[chatbot, user_input]
    )
    # save_chat's status string is shown in the status box.
    save_btn.click(fn=save_chat,inputs=chatbot,outputs=status_box)
# Enable request queuing, then start the app.
# NOTE(review): share=True presumably asks Gradio for a publicly reachable link,
# which would let anyone with the URL query the uploaded document — confirm this
# is intended, or set share=False when running locally.
demo.queue()
demo.launch(debug=False,share=True,inline=True)