In [None]:
# Log in to the Hugging Face Hub.
# NOTE(review): presumably required so that gated checkpoints (e.g. the Gemma
# model loaded further down) can be downloaded — confirm for your account.
from huggingface_hub import login
login()

In [None]:
import os
from pathlib import Path
from typing import List, Dict, Any, Tuple
import uuid
import nltk
nltk.download('punkt_tab')

import fitz  # PyMuPDF for robust PDF parsing
from sentence_transformers import SentenceTransformer
import numpy as np

# Optional OCR support.
# pdf2image and pytesseract are only used by parse_pdf() as a fallback for
# pages that yield no extractable text. If either import fails for any reason,
# OCR_AVAILABLE is set to False and that fallback is skipped.
# NOTE(review): a successful import presumably does not guarantee that the
# external tools these wrappers rely on are installed — confirm in the target
# environment.
try:
    from pdf2image import convert_from_path
    import pytesseract
    OCR_AVAILABLE = True
except Exception:
    OCR_AVAILABLE = False

from nltk.tokenize import sent_tokenize
import nltk
nltk.download('punkt', quiet=True)


# ----------------------------
# 1) PDF parsing with OCR fallback
# ----------------------------
def parse_pdf(filepath: str, ocr_if_empty: bool = True) -> List[Dict[str, Any]]:
    """
    Parse a PDF into page-level dicts, with an OCR fallback for empty pages.

    Args:
        filepath: Path of the PDF file to read.
        ocr_if_empty: When True (and the optional OCR imports succeeded), a page
            with no extractable text is rasterised at 200 dpi and run through
            OCR instead.

    Returns:
        One dict per page, in page order:
        {'page': int (1-based), 'text': str, 'source_file': str, 'file_type': 'pdf'}
    """
    source_file = os.path.basename(filepath)
    pages = []

    # The context manager closes the document even when extraction or OCR
    # raises; the previous version never closed it.
    with fitz.open(filepath) as doc:
        for page_number, page in enumerate(doc, start=1):
            text = (page.get_text("text") or "").strip()

            # OCR fallback for scanned pages (no embedded text layer)
            if not text and ocr_if_empty and OCR_AVAILABLE:
                pil_pages = convert_from_path(
                    filepath,
                    first_page=page_number,
                    last_page=page_number,
                    dpi=200,
                )
                if pil_pages:
                    text = pytesseract.image_to_string(pil_pages[0]).strip()

            pages.append({
                "page": page_number,
                "text": text,
                "source_file": source_file,
                "file_type": "pdf",
            })
    return pages


# ----------------------------
# 2) Header/footer removal
# ----------------------------
def clean_headers_footers(pages: List[Dict], n_top_lines: int = 2) -> List[Dict]:
    """
    Remove a header/footer that repeats across pages (heuristic).

    The first and last ``n_top_lines`` non-empty lines of every page are
    collected; the most frequent candidate at each end is treated as the
    header/footer, but only if it occurs on at least two pages. A candidate
    seen once is real content, not a repeated header, and is left alone.

    Args:
        pages: Page dicts containing at least a "text" key.
        n_top_lines: Number of non-empty lines inspected at each end of a page.
            Values <= 0 disable the cleaning.

    Returns:
        A new list of new page dicts (the input dicts are not modified) whose
        "text" has the detected header/footer stripped.
    """
    from collections import Counter  # local import keeps this block self-contained

    if n_top_lines <= 0:
        # lines[-0:] would be the whole page, so there is nothing sensible to detect.
        return [dict(p) for p in pages]

    head_counts = Counter()
    tail_counts = Counter()
    for p in pages:
        lines = [ln.strip() for ln in p["text"].splitlines() if ln.strip()]
        if lines:
            head_counts["\n".join(lines[:n_top_lines])] += 1
            tail_counts["\n".join(lines[-n_top_lines:])] += 1

    def repeated(counts):
        """Most frequent candidate, or None when nothing occurs on 2+ pages."""
        if not counts:
            return None
        candidate, hits = counts.most_common(1)[0]
        return candidate if hits >= 2 else None

    header = repeated(head_counts)
    footer = repeated(tail_counts)

    cleaned = []
    for p in pages:
        text = p["text"]
        if header and text.startswith(header):
            text = text[len(header):].lstrip()
        if footer and text.endswith(footer):
            text = text[:-len(footer)].rstrip()
        # Copy instead of mutating the caller's dict.
        cleaned.append({**p, "text": text})
    return cleaned


# ----------------------------
# 3) Semantic chunking
# ----------------------------
def chunk_texts(pages: List[Dict], chunk_chars: int = 1000, overlap: int = 200) -> List[Dict]:
    """
    Split page text into overlapping, sentence-aware chunks.

    Sentences are appended to the current chunk while it stays within
    ``chunk_chars``. When a sentence would overflow, the chunk is emitted and
    the next one starts with the last ``overlap`` characters of the previous
    chunk followed by that sentence. Chunks never span pages. A single sentence
    longer than ``chunk_chars`` is kept whole, so a chunk may exceed the limit.

    Args:
        pages: Page dicts with "text", "page" and "source_file" keys.
        chunk_chars: Soft upper bound on chunk length, in characters.
        overlap: Number of trailing characters carried into the next chunk;
            values <= 0 disable the overlap.

    Returns:
        List of dicts with keys "id" (random UUID4 string), "page",
        "source_file" and "text".
    """
    chunks = []

    def emit(page: Dict, text: str) -> None:
        """Append a chunk for `page` unless the text is blank."""
        text = text.strip()
        if text:
            chunks.append({
                "id": str(uuid.uuid4()),
                "page": page["page"],
                "source_file": page["source_file"],
                "text": text,
            })

    for p in pages:
        current = ""
        for sent in sent_tokenize(p["text"]):
            if len(current) + len(sent) + 1 <= chunk_chars:
                current += " " + sent
                continue

            emit(p, current)
            # current[-0:] is the WHOLE string, so overlap=0 used to carry the
            # entire previous chunk forward; handle overlap <= 0 explicitly.
            tail = current[-overlap:] if overlap > 0 else ""
            current = tail + " " + sent
        emit(p, current)
    return chunks


# ----------------------------
# 4) Main pipeline: process all PDFs in a directory
# ----------------------------
def process_all_pdfs(pdf_directory: str, chunk_chars: int = 1200, overlap: int = 200) -> List[Dict]:
    """
    Process every PDF under a directory (recursively) into clean text chunks.

    Each file is parsed, has repeated headers/footers removed and is then
    chunked. A file that fails at any stage is reported and skipped so that
    one bad PDF does not abort the whole batch.

    Args:
        pdf_directory: Directory searched recursively for ``*.pdf`` files.
        chunk_chars: Soft chunk size passed to chunk_texts().
        overlap: Character overlap passed to chunk_texts().

    Returns:
        The chunks of all successfully processed files, concatenated in
        sorted-path order.
    """
    # Sorting makes the processing (and therefore chunk) order reproducible;
    # plain glob order depends on the filesystem.
    pdf_files = sorted(Path(pdf_directory).glob("**/*.pdf"))

    print(f"Found {len(pdf_files)} PDF files to process")
    all_chunks = []

    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            pages = parse_pdf(str(pdf_file))
            pages = clean_headers_footers(pages)
            chunks = chunk_texts(pages, chunk_chars=chunk_chars, overlap=overlap)
        except Exception as e:
            # Best-effort batch: report the failure and move on to the next file.
            print(f"  ✗ Error processing {pdf_file.name}: {e}")
        else:
            all_chunks.extend(chunks)
            print(f"  ✓ {len(chunks)} chunks created from {pdf_file.name}")

    print(f"\nTotal chunks generated: {len(all_chunks)}")
    return all_chunks




In [None]:
if __name__ == "__main__":
    chunks = process_all_pdfs("/content/pdf")
    # Show the 6th chunk when there are that many, otherwise the last one;
    # indexing chunks[5] directly raised IndexError for 1-5 chunks.
    sample = chunks[min(5, len(chunks) - 1)] if chunks else "No chunks"
    print("\nSample chunk:\n", sample)

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Union


class EmbeddingManager:
    """Handles document embedding generation using SentenceTransformer"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: Union[str, None] = None, normalize: bool = True):
        """
        Initialize the embedding manager and load the model immediately.

        Args:
            model_name: HuggingFace model name for sentence embeddings
            device: 'cuda', 'cpu', or None (auto-detect)
            normalize: whether to L2-normalize embeddings (recommended for cosine sim)

        Raises:
            Exception: whatever the model loader raises is re-raised after logging.
        """
        self.model_name = model_name
        self.device = device
        self.normalize = normalize
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the SentenceTransformer model into ``self.model``."""
        try:
            print(f"Loading embedding model: {self.model_name} (device={self.device or 'auto'})")
            self.model = SentenceTransformer(self.model_name, device=self.device)
            print(f"✓ Model loaded. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"✗ Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """
        Generate embeddings for a list of texts

        Args:
            texts: List of text strings to embed
            batch_size: Batch size for encoding (avoid GPU OOM)

        Returns:
            numpy array of embeddings with shape (len(texts), embedding_dim);
            for empty input this is an empty array of shape (0, embedding_dim).

        Raises:
            ValueError: if the model has not been loaded.
        """
        # Explicit None check: truthiness of a model object is not a reliable
        # "is it loaded" signal.
        if self.model is None:
            raise ValueError("Model not loaded")

        if not texts:
            # Keep the documented 2-D shape so callers can rely on len()/shape.
            dim = self.model.get_sentence_embedding_dimension() or 0
            return np.empty((0, dim), dtype=np.float32)

        print(f"Generating embeddings for {len(texts)} texts (batch={batch_size})...")
        try:
            embeddings = self.model.encode(
                texts,
                batch_size=batch_size,
                show_progress_bar=True,
                convert_to_numpy=True,
                normalize_embeddings=self.normalize,
            )
        except Exception as e:
            print(f"✗ Error generating embeddings: {e}")
            raise

        print(f"✓ Embeddings shape: {embeddings.shape}")
        return embeddings

    def embed_query(self, query: str) -> np.ndarray:
        """
        Generate the embedding for a single query string.

        Returns:
            1-D numpy array (the first row of generate_embeddings([query])).
        """
        return self.generate_embeddings([query])[0]





In [None]:
if __name__ == "__main__":
    # device=None lets the model pick the device itself, so this cell also runs
    # on CPU-only hosts instead of failing on a hard-coded "cuda".
    # NOTE: later cells reuse this module-level `embedding_manager`.
    embedding_manager = EmbeddingManager(model_name="all-MiniLM-L6-v2", device=None)
    sample_embeddings = embedding_manager.generate_embeddings(
        ["This is a test sentence.", "This is another one."]
    )
    print("Sample embedding[0] shape:", sample_embeddings[0].shape)

    query_emb = embedding_manager.embed_query("test sentence")
    print("Query embedding shape:", query_emb.shape)

In [None]:
import os
import uuid
import chromadb
import numpy as np
from typing import List, Dict, Any


class VectorStore:
    """Manages document embeddings in a ChromaDB vector store"""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "/content/vector_store"):
        """
        Initialize the vector store

        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the vector store
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Create the persist directory, then open (or create) the collection."""
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print(f"✓ Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"✗ Error initializing vector store: {e}")
            raise

    def add_chunks(self, chunks: List[Dict[str, Any]], embeddings: np.ndarray, batch_size: int = 5000):
        """
        Add text chunks + embeddings to the vector store

        Args:
            chunks: List of dicts with keys: id, text, page, source_file
            embeddings: np.ndarray with shape (len(chunks), dim)
            batch_size: Maximum number of records sent per ``collection.add``
                call, so very large corpora are inserted in several requests.

        Raises:
            ValueError: if the number of chunks and embeddings differ, or
                batch_size is not positive.
            KeyError: if a chunk has no "text" key.
        """
        if len(chunks) != len(embeddings):
            raise ValueError("Number of chunks must match number of embeddings")
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        if not chunks:
            # Nothing to insert; avoid sending empty lists to the collection.
            print("⚠️ No chunks to add; vector store unchanged")
            return

        print(f"Adding {len(chunks)} chunks to vector store...")

        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            text = chunk["text"]
            ids.append(chunk.get("id") or f"chunk_{uuid.uuid4().hex[:8]}")

            metadata = {
                "source_file": chunk.get("source_file", "unknown"),
                "page": chunk.get("page"),
                "content_length": len(text),
                "chunk_index": i,
            }
            # Leave out absent values rather than storing None as metadata.
            metadatas.append({k: v for k, v in metadata.items() if v is not None})

            documents_text.append(text)
            # np.asarray also accepts plain lists, not just ndarray rows.
            embeddings_list.append(np.asarray(embedding).tolist())

        try:
            for start in range(0, len(ids), batch_size):
                end = start + batch_size
                self.collection.add(
                    ids=ids[start:end],
                    embeddings=embeddings_list[start:end],
                    metadatas=metadatas[start:end],
                    documents=documents_text[start:end],
                )
            print(f"✓ Successfully added {len(chunks)} chunks to vector store")
            print(f"Total documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"✗ Error adding chunks: {e}")
            raise


In [None]:
# 1) Process PDFs → get clean chunks
chunks = process_all_pdfs("/content/pdf")

# 2) Open the vector database (always, because later cells use `vectorstore`)
vectorstore = VectorStore()

if chunks:
    # 3) Convert chunks into embeddings
    texts = [c["text"] for c in chunks]
    embeddings = embedding_manager.generate_embeddings(texts)

    # 4) Store in vector database
    # NOTE: chunk ids are random UUIDs, so re-running this cell against the
    # same persisted collection adds the same text again under new ids.
    vectorstore.add_chunks(chunks, embeddings)
else:
    # Without chunks there is nothing to embed or insert.
    print("⚠️ No chunks produced; nothing added to the vector store")


In [None]:
class RAGRetriever:
    """Handles query-based retrieval from the vector store"""

    # Annotations are quoted so this class can be defined regardless of the
    # order in which VectorStore / EmbeddingManager were defined.
    def __init__(self, vector_store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """
        Args:
            vector_store: Store whose ``collection`` is queried.
            embedding_manager: Used to embed the query text.
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 5, distance_threshold: float = None) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents for a query

        Args:
            query: The search query
            top_k: Number of top results to return
            distance_threshold: Optional maximum distance (lower = more similar)

        Returns:
            List of dicts with keys "id", "content", "metadata", "distance" and
            "rank" (1-based position in the unfiltered result list). An empty
            list is returned when the store is empty or the query fails.
        """
        print(f"\n🔎 Query: {query}")
        print(f"Top K: {top_k}, Distance threshold: {distance_threshold}")

        # Safety: check collection (explicit None test, then a single count call)
        collection = self.vector_store.collection
        total = collection.count() if collection is not None else 0
        if total == 0:
            print("⚠️ Vector store is empty")
            return []

        # Generate query embedding via the manager's dedicated single-query helper
        query_embedding = self.embedding_manager.embed_query(query)

        try:
            results = collection.query(
                query_embeddings=[query_embedding.tolist()],
                # Never ask for more neighbours than the collection holds.
                n_results=min(top_k, total),
            )
        except Exception as e:
            print(f"✗ Error during retrieval: {e}")
            return []

        # Each field is a list with one entry per query; we sent one query.
        ids = (results.get("ids") or [[]])[0]
        documents = (results.get("documents") or [[]])[0]
        metadatas = (results.get("metadatas") or [[]])[0]
        distances = (results.get("distances") or [[]])[0]

        retrieved_docs = [
            {
                "id": doc_id,
                "content": doc,
                "metadata": metadata,
                "distance": dist,
                "rank": rank,
            }
            for rank, (doc_id, doc, metadata, dist) in enumerate(
                zip(ids, documents, metadatas, distances), start=1
            )
            if distance_threshold is None or dist <= distance_threshold
        ]

        print(f"✅ Retrieved {len(retrieved_docs)} documents")
        return retrieved_docs


In [None]:
# Build the retriever from the store and embedder created in earlier cells.
rag_retriever = RAGRetriever(vectorstore, embedding_manager)

# Smoke test: print rank, distance, origin and a 200-character preview per hit.
results = rag_retriever.retrieve("what is self attention", top_k=3)
for hit in results:
    meta = hit["metadata"]
    print(f"\nRank {hit['rank']} | Distance={hit['distance']:.4f}")
    print(f"From {meta.get('source_file')} (Page {meta.get('page')})")
    print(hit["content"][:200], "...")


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# ---------------------------
# Load Hugging Face model
# ---------------------------
# The tokenizer and the causal LM are loaded from the same checkpoint name.
# `tokenizer` and `model` are module-level names that the LocalLLM instance
# created further down is built from.
# NOTE(review): device_map="auto" presumably lets the library choose device
# placement — confirm against the transformers/accelerate documentation.
model_name = "google/gemma-3-270m-it"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")

# ---------------------------
# Local LLM wrapper (Gemma-safe)
# ---------------------------
class LocalLLM:
    """Minimal chat wrapper around a tokenizer/causal-LM pair.

    ``invoke`` sends one user message through the tokenizer's chat template,
    generates a completion and returns an object exposing the text as
    ``.content``.
    """

    def __init__(self, tokenizer, model, max_new_tokens=256, temperature=0.1):
        """
        Args:
            tokenizer: Tokenizer providing ``apply_chat_template`` and ``decode``.
            model: Model providing ``generate`` and a ``device`` attribute.
            max_new_tokens: Upper bound on the number of generated tokens.
            temperature: Sampling temperature; 0 (or None) selects greedy decoding.
        """
        self.tokenizer = tokenizer
        self.model = model
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature

    def invoke(self, messages):
        """
        Generate a reply for a prompt.

        Args:
            messages: Either a prompt string or a list/tuple whose first
                element is the prompt string.

        Returns:
            An object whose ``content`` attribute holds the generated text:
            the decoded completion without special tokens, or the raw decode
            (special tokens kept) when the cleaned text is empty.
        """
        from types import SimpleNamespace  # local import keeps this block self-contained

        # Accept string or list
        prompt = messages[0] if isinstance(messages, (list, tuple)) else messages
        chat_messages = [{"role": "user", "content": prompt}]

        # Tokenize in the model's chat template
        inputs = self.tokenizer.apply_chat_template(
            chat_messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt"
        ).to(self.model.device)

        sampling = bool(self.temperature) and self.temperature > 0
        gen_kwargs = {
            "input_ids": inputs["input_ids"],
            "attention_mask": inputs["attention_mask"],
            "max_new_tokens": self.max_new_tokens,
            "do_sample": sampling,
        }
        # Temperature only applies when sampling; leave it out for greedy
        # decoding instead of passing a non-positive value alongside
        # do_sample=False.
        if sampling:
            gen_kwargs["temperature"] = self.temperature

        with torch.no_grad():
            outputs = self.model.generate(**gen_kwargs)

        # Keep only the newly generated tokens (drop the echoed prompt).
        prompt_len = inputs["input_ids"].shape[-1]
        new_tokens = outputs[0][prompt_len:]

        content = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
        if not content:
            # Fall back to the raw decode so callers never get an empty answer
            # when the model emitted only special tokens.
            content = self.tokenizer.decode(new_tokens, skip_special_tokens=False)

        return SimpleNamespace(content=content)

# Initialize LLM: wrap the tokenizer/model loaded above, keeping LocalLLM's
# default max_new_tokens and temperature.
llm = LocalLLM(tokenizer, model)

# ---------------------------
# RAG with MMR + Citations
# ---------------------------
def _mmr_order(query_emb, doc_embs, k, diversity):
    """Return up to k document indices ordered by Maximal Marginal Relevance."""
    selected = []
    candidates = list(range(len(doc_embs)))
    while candidates and len(selected) < k:
        relevance = cosine_similarity(query_emb, doc_embs[candidates])[0]
        if selected:
            # Penalise candidates that resemble something already picked.
            redundancy = cosine_similarity(doc_embs[selected], doc_embs[candidates]).max(axis=0)
            scores = diversity * relevance - (1 - diversity) * redundancy
        else:
            scores = relevance
        best = candidates[int(scores.argmax())]
        selected.append(best)
        candidates.remove(best)
    return selected


def rag_simple(
    query: str,
    retriever,
    llm,
    top_k: int = 5,
    max_context_chars: int = 4000,
    use_mmr: bool = True,
    diversity: float = 0.7
):
    """
    RAG pipeline for research PDFs with citations and MMR reranking.

    Args:
        query: The user question.
        retriever: Object with ``retrieve(query, top_k=...)`` returning dicts
            with "content" and "metadata", and an ``embedding_manager``
            attribute (only used when MMR runs).
        llm: Object with ``invoke([prompt])`` returning something with ``.content``.
        top_k: Number of chunks to retrieve.
        max_context_chars: Budget for the total chunk text placed in the
            prompt (citation labels are not counted).
        use_mmr: Re-order results with MMR when more than two were retrieved.
        diversity: MMR weight on query relevance; ``1 - diversity`` weights the
            redundancy penalty.

    Returns:
        The stripped LLM answer, or a fixed "no relevant context" message when
        nothing was retrieved or no text fits the context budget.
    """
    no_context_msg = "No relevant context found to answer the question."

    # Step 1: Retrieve
    results = retriever.retrieve(query, top_k=top_k)
    if not results:
        return no_context_msg

    # Step 2: MMR reranking (optional)
    if use_mmr and len(results) > 2:
        embedder = retriever.embedding_manager
        query_emb = embedder.generate_embeddings([query])[0].reshape(1, -1)
        doc_embs = embedder.generate_embeddings([r["content"] for r in results])
        results = [results[i] for i in _mmr_order(query_emb, doc_embs, top_k, diversity)]

    # Step 3: Build context with citations
    context_parts, total_len = [], 0
    for doc in results:
        chunk_text = doc["content"].strip()
        if not chunk_text:
            continue  # a citation with no text adds nothing to the context

        remaining = max_context_chars - total_len
        if len(chunk_text) > remaining:
            if context_parts or remaining <= 0:
                break
            # The best chunk alone exceeds the budget: keep a truncated copy
            # rather than prompting the LLM with an empty context.
            chunk_text = chunk_text[:remaining]

        metadata = doc.get("metadata") or {}  # stored metadata may be None
        source = metadata.get("source_file", "unknown")
        page = metadata.get("page", "?")
        ref = f"[{source} - page {page}]"

        context_parts.append(f"{ref}\n{chunk_text}")
        total_len += len(chunk_text)

    if not context_parts:
        return no_context_msg

    context = "\n\n".join(context_parts)

    # Step 4: Structured academic prompt
    prompt = f"""
You are an assistant specialized in academic research papers.
Use ONLY the provided context to answer the question.
If the context does not contain the answer, say clearly:
"The context does not provide enough information."

Context:
{context}

Question: {query}

Answer (include citations like [filename - page]):
"""

    # Step 5: Generate with LLM
    response = llm.invoke([prompt])
    return response.content.strip()




In [None]:
# End-to-end run: retrieve, rerank, prompt the local LLM and show the answer.
answer = rag_simple("what is an Multi-Head Attention", rag_retriever, llm, top_k=5)
print("---- ANSWER ----")
# Falsy answers (e.g. an empty string) are replaced by a visible placeholder.
print(answer or "[EMPTY ANSWER]")
