In [1]:
# Run the companion utils notebook before anything else; the cell output recorded for this
# run is a MongoDB connection confirmation.
# NOTE(review): the names utils.ipynb defines are not visible from this notebook — confirm
# which of them the cells below actually depend on.
%run ./utils.ipynb

✅ Successfully connected to MongoDB.


In [1]:
import gradio as gr
import os
import shutil
import fitz  # PyMuPDF
from pymongo import MongoClient
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from numpy import dot
from numpy.linalg import norm
import numpy as np


# Ensure ./PDFs directory exists
os.makedirs("./PDFs", exist_ok=True)

# MongoDB setup
# The connection string is taken from the MONGO_URI environment variable when it is set, so
# credentials do not have to live in the notebook.
# NOTE(review): the fallback below still embeds a username and password in source; it is kept
# only so existing runs keep working. Rotate that password and drop the fallback.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb+srv://Aaron:1234@cluster0.erwea75.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0",
)

# One MongoClient per connection URI, created lazily and reused by every caller.
_MONGO_CLIENTS = {}

def get_mongo_client(uri):
    """Return a MongoClient for ``uri``, creating it only on first use.

    Callers in this notebook ask for a client on every UI action and never
    close it, so handing back a cached instance avoids opening a fresh client
    (and its connections) per click.

    Args:
        uri: MongoDB connection string.

    Returns:
        The MongoClient associated with ``uri``; repeated calls with the same
        URI return the same object.
    """
    client = _MONGO_CLIENTS.get(uri)
    if client is None:
        client = MongoClient(uri)
        _MONGO_CLIENTS[uri] = client
    return client

def get_collection(client, db_name, collection_name):
    """Look up a collection by indexing ``client`` with the database name, then the collection name."""
    database = client[db_name]
    return database[collection_name]

def insert_pdf_pages_to_mongo(collection, pdf_name, pages):
    """Insert each page of a PDF into ``collection`` unless that page is already stored.

    Args:
        collection: Object exposing ``find_one`` and ``insert_one``.
        pdf_name: Name stored in the ``pdf_name`` field of every inserted document.
        pages: Iterable of dicts with ``page_number`` and ``text`` keys.
    """
    for entry in pages:
        number = entry["page_number"]
        already_stored = collection.find_one({"pdf_name": pdf_name, "page_number": number})
        if already_stored:
            # A document for this (pdf, page) pair exists; leave it untouched.
            continue
        record = {
            "pdf_name": pdf_name,
            "page_number": number,
            "text": entry["text"]
        }
        collection.insert_one(record)

def extract_text_from_pdf(file_path):
    """Extract the stripped text of every page in a PDF.

    Args:
        file_path: Path of the PDF to open with PyMuPDF.

    Returns:
        A list with one dict per page, in document order, each holding
        ``page_number`` (1-based) and ``text`` (whitespace-stripped page text).

    Raises:
        Whatever ``fitz.open`` or the page calls raise; the document is closed
        before the exception propagates.
    """
    doc = fitz.open(file_path)
    try:
        return [
            {
                "page_number": index + 1,
                "text": doc.load_page(index).get_text().strip()
            }
            for index in range(len(doc))
        ]
    finally:
        # Release the file handle even when a page fails to load or extract.
        doc.close()

# Upload Logic
# Bare filenames (relative to ./PDFs) from the most recent upload; read by upload_to_mongo.
uploaded_pdfs = []

def upload_docs(files):
    """Copy the uploaded files into ./PDFs and remember their filenames.

    Args:
        files: Iterable of uploaded files. Each item is either an object whose
            ``name`` attribute is the source path, or a plain path string.
            ``None`` or an empty list is treated as "nothing selected".

    Returns:
        A status string listing the processed filenames; a file whose source
        already is its ./PDFs destination is marked " (already exists)".

    Side effects:
        Replaces the module-level ``uploaded_pdfs`` with the plain filenames of
        this batch. The " (already exists)" marker is display-only and is never
        stored there, because ``uploaded_pdfs`` entries are later joined onto
        ./PDFs to build real file paths.
    """
    global uploaded_pdfs

    if not files:
        return "❌ No files selected."

    os.makedirs("./PDFs", exist_ok=True)

    stored_names = []
    report = []
    for file in files:
        source_path = getattr(file, "name", file)
        filename = os.path.basename(source_path)
        dest_path = os.path.join("./PDFs", filename)

        if os.path.abspath(source_path) != os.path.abspath(dest_path):
            shutil.copy(source_path, dest_path)
            report.append(filename)
        else:
            # Source already is the destination file; copying it onto itself would fail.
            report.append(filename + " (already exists)")
        stored_names.append(filename)

    uploaded_pdfs = stored_names
    return f"Uploaded: {', '.join(report)}"

# Mongo Upload Logic
def upload_to_mongo():
    """Store the extracted page text of each recently uploaded PDF in MongoDB.

    Reads the module-level ``uploaded_pdfs`` list. A PDF that already has at
    least one document in the ``pages`` collection of ``pdf_rag_db`` is skipped
    as a whole; otherwise its pages are extracted from ./PDFs and inserted.

    Returns:
        A status string naming the PDFs that were inserted and those skipped,
        or an error message when nothing has been uploaded yet.
    """
    if not uploaded_pdfs:
        return "❌ No PDFs uploaded yet."

    pages_store = get_collection(get_mongo_client(MONGO_URI), "pdf_rag_db", "pages")

    newly_stored, already_present = [], []
    for name in uploaded_pdfs:
        if pages_store.find_one({"pdf_name": name}):
            already_present.append(name)
        else:
            page_records = extract_text_from_pdf(os.path.join("./PDFs", name))
            insert_pdf_pages_to_mongo(pages_store, name, page_records)
            newly_stored.append(name)

    parts = []
    if newly_stored:
        parts.append(f"✅ Uploaded to MongoDB: {', '.join(newly_stored)}\n")
    if already_present:
        parts.append(f"⚠️ Skipped (already in DB): {', '.join(already_present)}")
    return "".join(parts).strip()

# Helper that lists the PDFs currently stored in ./PDFs (used to fill the UI dropdowns).
def get_uploaded_pdfs():
    """Return the names of the PDF files in ./PDFs, sorted.

    The extension check is case-insensitive so files such as ``REPORT.PDF``
    are listed too. A missing ./PDFs directory yields an empty list instead of
    raising.

    Returns:
        Sorted list of filenames (not paths).
    """
    if not os.path.isdir("./PDFs"):
        return []
    return sorted(f for f in os.listdir("./PDFs") if f.lower().endswith(".pdf"))

# ------------------ CHUNKING LOGIC ------------------
def chunking(
    selected_pdf,
    db_name: str = "pdf_rag_db",
    collection_name: str = "pages",
    chunk_size: int = 500,
    chunk_overlap: int = 100
) -> str:
    """Split the stored pages of one PDF into text chunks and save them in MongoDB.

    Pages are read from ``collection_name`` in ``db_name``; chunks are written
    to the ``chunks`` collection of the same database. Chunks left by an earlier
    run for the same PDF are removed first, so running this repeatedly does not
    accumulate duplicate chunks. Removing them also removes any ``embedding``
    field they carried, so the PDF has to be embedded again afterwards.

    Args:
        selected_pdf: Value matched against the ``pdf_name`` field of the pages.
        db_name: Database holding both the pages and the chunks collections.
        collection_name: Collection the page documents are read from.
        chunk_size: Passed to RecursiveCharacterTextSplitter as ``chunk_size``.
        chunk_overlap: Passed to RecursiveCharacterTextSplitter as ``chunk_overlap``.

    Returns:
        A status string: the number of chunks stored, or an error message when
        no pages exist or no chunks could be produced.
    """
    client = get_mongo_client(MONGO_URI)
    pages_collection = get_collection(client, db_name, collection_name)
    documents = list(pages_collection.find({"pdf_name": selected_pdf}))
    if not documents:
        return f"❌ No pages found in MongoDB for {selected_pdf}."

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ".", " ", ""]
    )

    chunked_docs = [
        {
            "pdf_name": doc["pdf_name"],
            "page_number": doc["page_number"],
            "chunk_index": idx,  # position of the chunk within its page
            "chunk_text": chunk
        }
        for doc in documents
        for idx, chunk in enumerate(text_splitter.split_text(doc["text"]))
    ]

    if not chunked_docs:
        return "❌ No chunks created."

    chunk_collection = get_collection(client, db_name, "chunks")
    # Replace rather than append: a second run would otherwise duplicate every chunk.
    removed = chunk_collection.delete_many({"pdf_name": selected_pdf}).deleted_count
    chunk_collection.insert_many(chunked_docs)

    message = f"✅ Stored {len(chunked_docs)} chunks for {selected_pdf} in MongoDB."
    if removed:
        message += f" Replaced {removed} previous chunks; re-run embedding for this PDF."
    return message

def display_chunks(selected_pdf):
    """Fetch every stored chunk document for ``selected_pdf`` (without ``_id``).

    Returns:
        The list of chunk documents, or a warning string when none exist.
    """
    chunk_store = get_collection(get_mongo_client(MONGO_URI), "pdf_rag_db", "chunks")
    found = list(chunk_store.find({"pdf_name": selected_pdf}, {"_id": 0}))
    if found:
        return found
    return f"⚠️ No chunks found in MongoDB for {selected_pdf}."

# ------------------ EMBEDDING & CHROMADB ------------------
def get_all_chroma_ids(collection):
    """Collect every id from ``collection`` by paging through ``collection.get``.

    Pages are requested with a fixed ``limit`` and an ``offset`` advanced by
    that limit; paging stops at the first page whose ``ids`` entry is missing
    or empty.

    Returns:
        A set of all ids seen.
    """
    page_size = 100  # or 500, adjust as needed
    seen = set()
    start = 0

    page = collection.get(ids=None, limit=page_size, offset=start).get("ids", [])
    while page:
        seen.update(page)
        start += page_size
        page = collection.get(ids=None, limit=page_size, offset=start).get("ids", [])

    return seen

def embed_chunks_to_mongo(selected_pdf):
    """Compute and store an embedding for every not-yet-embedded chunk of a PDF.

    Chunks are read from the ``chunks`` collection of ``pdf_rag_db``; each chunk
    without an ``embedding`` field gets one written back as a list of floats.
    The module-level ``embedding_model`` is reused instead of loading the model
    again on every call, which also guarantees chunks and queries are embedded
    by the same model instance.

    Each embedding is written as soon as it is computed, so an interrupted run
    keeps its progress and a later run only handles the remaining chunks.

    Args:
        selected_pdf: Value matched against the ``pdf_name`` field of the chunks.

    Returns:
        A status string with the number of chunks embedded in this call, or an
        error message when the PDF has no chunks.
    """
    client = get_mongo_client(MONGO_URI)
    chunk_collection = get_collection(client, "pdf_rag_db", "chunks")

    chunks = list(chunk_collection.find({"pdf_name": selected_pdf}))
    if not chunks:
        return f"❌ No chunks found for {selected_pdf}."

    # Chunks that already carry an embedding are left untouched.
    pending = [chunk for chunk in chunks if "embedding" not in chunk]
    for chunk in pending:
        embedding = embedding_model.encode(chunk["chunk_text"]).tolist()
        chunk_collection.update_one(
            {"_id": chunk["_id"]},
            {"$set": {"embedding": embedding}}
        )

    return f"✅ Embedded and stored {len(pending)} new chunks for {selected_pdf} in MongoDB."


# Shared SentenceTransformer instance, loaded once when this cell runs; search_query uses it
# to embed the user's question. It must stay the same model ("all-MiniLM-L6-v2") that
# produced the chunk embeddings stored in MongoDB, otherwise similarity scores between
# query and chunk vectors are not meaningful.
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

def cosine_similarity(a, b):
    """Return the cosine similarity of vectors ``a`` and ``b``.

    Args:
        a: First vector (any sequence numpy can take a dot product of).
        b: Second vector, same length as ``a``.

    Returns:
        ``dot(a, b) / (norm(a) * norm(b))``, or ``0.0`` when either vector has
        zero length. Without that guard the division yields NaN, which cannot
        be ordered against real scores when results are ranked.
    """
    denominator = norm(a) * norm(b)
    if denominator == 0:
        return 0.0
    return dot(a, b) / denominator

def search_query(question, top_k=5):
    """Return the stored chunks most similar to ``question`` as a formatted string.

    The question is embedded with the module-level ``embedding_model`` and
    compared by cosine similarity against every chunk in the ``chunks``
    collection of ``pdf_rag_db`` that has an ``embedding`` field. The search
    covers all PDFs; it is not restricted to one document.

    Args:
        question: The user's query text.
        top_k: Maximum number of chunks to return.

    Returns:
        The top matches, best first, each with its score, page number and
        text, separated by ``---`` lines; or a warning string when the question
        is blank or no embedded chunks exist.
    """
    # Nothing sensible can be searched for a missing or blank question.
    if not question or not question.strip():
        return "⚠️ Please enter a question."

    client = get_mongo_client(MONGO_URI)
    chunk_collection = get_collection(client, "pdf_rag_db", "chunks")

    # Embed the query
    query_embedding = embedding_model.encode([question])[0]

    # Fetch only the fields used below instead of whole chunk documents.
    chunks = chunk_collection.find(
        {"embedding": {"$exists": True}},
        {"_id": 0, "embedding": 1, "page_number": 1, "chunk_text": 1}
    )

    # Compute similarity
    scored = [
        (cosine_similarity(query_embedding, chunk["embedding"]), chunk)
        for chunk in chunks
    ]

    # Sort by similarity (key on the score only; chunk dicts are not comparable)
    top_matches = sorted(scored, key=lambda pair: pair[0], reverse=True)[:top_k]

    if not top_matches:
        return "⚠️ No matching chunks found."

    formatted = [
        f"🧩 Chunk {i+1} (Score: {score:.4f}) — Page {chunk.get('page_number')}:\n{chunk.get('chunk_text')}"
        for i, (score, chunk) in enumerate(top_matches)
    ]
    return "\n\n---\n\n".join(formatted)


# ------------------ UI ------------------
# Gradio app: one Blocks container (`demo`) holding a tab per pipeline stage.
with gr.Blocks() as demo:
    gr.Markdown("## 📄 PDF Upload + MongoDB + Knowledge Nuggets")

    # Tab 1: copy PDFs into ./PDFs, then push their extracted page text to MongoDB.
    with gr.Tab("Upload PDFs"):
        upload_input = gr.File(label="Upload PDFs", file_types=[".pdf"], file_count="multiple")
        upload_result = gr.Textbox(label="Upload Result")
        mongo_result = gr.Textbox(label="MongoDB Result")

        upload_btn = gr.Button("Upload to ./PDFs")
        mongo_btn = gr.Button("Upload Extracted Text to MongoDB")

        # upload_docs receives the selected files; upload_to_mongo takes no inputs and works
        # from the module-level uploaded_pdfs list that upload_docs fills.
        upload_btn.click(upload_docs, inputs=upload_input, outputs=upload_result)
        mongo_btn.click(upload_to_mongo, outputs=mongo_result)

    # Tab 2: split a PDF's stored pages into chunks, and inspect the chunks stored for a PDF.
    # NOTE(review): both dropdowns get their choices from get_uploaded_pdfs() once, when this
    # line runs; nothing visible here refreshes them after a later upload — confirm whether
    # the UI needs to be rebuilt to see new PDFs.
    with gr.Tab("Knowledge Nuggets"):
        pdf_select_chunk = gr.Dropdown(label="Select PDF to Chunk", choices=get_uploaded_pdfs(), interactive=True)
        chunk_btn = gr.Button("🔪 Chunk PDF Pages")
        chunk_output = gr.Textbox(label="Chunking Result")

        pdf_select_display = gr.Dropdown(label="Select PDF to Display Chunks", choices=get_uploaded_pdfs(), interactive=True)
        display_btn = gr.Button("📚 Display Chunks")
        display_output = gr.JSON(label="Chunks from MongoDB")

        # Only the selected PDF name is passed; chunking runs with its default
        # database, collection, chunk_size and chunk_overlap.
        chunk_btn.click(chunking, inputs=pdf_select_chunk, outputs=chunk_output)
        display_btn.click(display_chunks, inputs=pdf_select_display, outputs=display_output)

    def view_embedded_chunks(pdf_name):
        """
        View all chunks for a given PDF that have been embedded (i.e., have an 'embedding' field).

        The collection is obtained through get_mongo_client / get_collection, like
        the other functions in this notebook, rather than through a global `db`
        that this notebook never defines.

        Args:
            pdf_name: Value matched against the 'pdf_name' field of the chunks.

        Returns:
            A list of dicts holding 'page_number', 'chunk_index' and 'chunk_text',
            ordered by page number and then by chunk index.
        """
        client = get_mongo_client(MONGO_URI)
        chunks_collection = get_collection(client, "pdf_rag_db", "chunks")
        cursor = chunks_collection.find(
            {"pdf_name": pdf_name, "embedding": {"$exists": True}},
            {"_id": 0, "page_number": 1, "chunk_index": 1, "chunk_text": 1}
        ).sort([("page_number", 1), ("chunk_index", 1)])

        return list(cursor)
        
    # Tab 3: compute embeddings for a PDF's chunks and write them back to MongoDB.
    with gr.Tab("Embed to Mongo"):
        # PDF selection for embedding
        pdf_select_embed = gr.Dropdown(label="Select PDF to Embed", choices=get_uploaded_pdfs(), interactive=True)
        # The handler stores embeddings in MongoDB, so the button says so (it used to say ChromaDB).
        embed_btn = gr.Button("💾 Embed and Store in MongoDB")
        embed_output = gr.Textbox(label="Embedding Result")

        # Embed the selected PDF's chunks and store the vectors in MongoDB
        embed_btn.click(embed_chunks_to_mongo, inputs=pdf_select_embed, outputs=embed_output)




    
    # Tab 4: free-text question answered with the most similar stored chunks.
    with gr.Tab("Query Knowledge Nuggets"):
        gr.Markdown("### Ask a question about the document")
    
        query_input = gr.Textbox(label="Enter your question", placeholder="e.g. What is the document about?")
        query_button = gr.Button("Search")
        # "Top 5" matches search_query's default top_k=5; only the question is passed in.
        query_output = gr.Textbox(label="Top 5 Matching Chunks", lines=15)
    
        query_button.click(fn=search_query, inputs=query_input, outputs=query_output)


# Start the app. With share=True the recorded run printed a public gradio.live URL in
# addition to the local one.
# NOTE(review): this UI can write to the MongoDB database and is reachable through that
# public link — confirm sharing is intended, or launch without share=True.
demo.launch(share=True)

  import pkg_resources


Running on local URL:  http://127.0.0.1:7860
IMPORTANT: You are using gradio version 3.39.0, however version 4.44.1 is available, please upgrade.
--------
Running on public URL: https://6e3ec10643b681eb21.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


