In [None]:
# Cell 1: Install (run once)
!pip -q install pymupdf sentence-transformers faiss-cpu transformers accelerate

In [None]:
# Cell 2: Imports and Upload PDF
from google.colab import files
import fitz  # PyMuPDF
import numpy as np
import faiss
import torch
import time
import re
from typing import List

from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM

# Open the Colab upload dialog; the result is keyed by uploaded filename.
uploaded = files.upload()
if not uploaded:
    # A cancelled dialog gives an empty result; fail with a clear message
    # instead of an IndexError on the filename lookup.
    raise RuntimeError("No file was uploaded. Re-run this cell and choose a PDF.")

# Only the first uploaded file is used by the rest of the notebook.
pdf_file = next(iter(uploaded))
print("Uploaded:", pdf_file)

In [None]:
# Cell 3: Extract Text from PDF (PyMuPDF)
def extract_text_from_pdf(pdf_path: str) -> str:
    """Return the text of every page of a PDF, joined by newline characters.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        The text of all pages in page order, one newline between pages.
    """
    # The context manager closes the document (and its file handle) even if
    # text extraction raises part-way through; the original never closed it.
    with fitz.open(pdf_path) as doc:
        return "\n".join(page.get_text() for page in doc)

# Extract once and trim surrounding whitespace; later cells chunk `raw_text`.
raw_text = extract_text_from_pdf(pdf_file).strip()
print("Extracted characters:", len(raw_text))
if not raw_text:
    # Without any text the chunking, embedding and retrieval cells below
    # cannot work; stop here with an explicit message instead of an opaque
    # IndexError later on.
    raise ValueError(
        f"No extractable text found in {pdf_file!r}. "
        "The PDF may contain only images and need OCR."
    )
print(raw_text[:600])

In [None]:
# Cell 4: Custom Text Splitter (No LangChain)
def _split_by_separator(text: str, sep: str) -> List[str]:
    if sep == "":
        return list(text)
    parts = text.split(sep)
    out = []
    for i, p in enumerate(parts):
        if i < len(parts) - 1:
            out.append(p + sep)
        else:
            out.append(p)
    return out

def recursive_text_splitter(
    text: str,
    chunk_size: int = 900,
    chunk_overlap: int = 150,
    separators: List[str] = None,
    min_chunk_chars: int = 200
) -> List[str]:
    """Split text into overlapping chunks, trying coarse separators first.

    The text is split on the first separator; parts are packed greedily into
    buffers of at most `chunk_size` characters, and any single part that is
    still too long is split again with the remaining separators. Short pieces
    are then folded into their predecessor, and finally each chunk after the
    first is prefixed with the tail of the previous chunk.

    Args:
        text: Text to split. None or whitespace-only input yields [].
        chunk_size: Target maximum chunk length in characters.
        chunk_overlap: Maximum number of characters of the previous chunk to
            repeat at the start of the next one; <= 0 disables overlap.
        separators: Separators to try, coarsest first. Defaults to blank
            line, newline, sentence end (". "), space, then single characters.
        min_chunk_chars: Pieces shorter than this (other than the first) are
            appended to the preceding piece.

    Returns:
        The list of chunks, in document order. A chunk's own text is never
        cut to make room for overlap, so a chunk can be longer than
        `chunk_size` when a short piece was folded into it or when the
        separators ran out before the text was small enough.
    """
    if separators is None:
        separators = ["\n\n", "\n", ". ", " ", ""]

    text = (text or "").strip()
    if not text:
        return []

    def _recurse(t: str, seps: List[str]) -> List[str]:
        # Small enough, or nothing left to split on: keep as one piece.
        if len(t) <= chunk_size or not seps:
            return [t]

        sep = seps[0]
        parts = _split_by_separator(t, sep)

        # Separator does not occur in this text; try the next, finer one.
        if len(parts) == 1:
            return _recurse(t, seps[1:])

        out, buf = [], ""
        for part in parts:
            if len(part) > chunk_size and seps[1:]:
                # Flush what has been packed so far to preserve order, then
                # split the oversized part with the finer separators.
                if buf.strip():
                    out.append(buf)
                    buf = ""
                out.extend(_recurse(part, seps[1:]))
                continue

            if len(buf) + len(part) <= chunk_size:
                buf += part
            else:
                if buf.strip():
                    out.append(buf)
                buf = part

        if buf.strip():
            out.append(buf)

        return out

    pieces = _recurse(text, separators)

    # Drop empty pieces and fold short ones into the piece before them.
    cleaned = []
    for p in pieces:
        p = p.strip()
        if not p:
            continue
        if cleaned and len(p) < min_chunk_chars:
            cleaned[-1] = (cleaned[-1].rstrip() + " " + p).strip()
        else:
            cleaned.append(p)

    chunks = []
    for piece in cleaned:
        # Overlap may only use the room left after the piece itself and the
        # joining space. The original sliced the merged string from the
        # right, which removed the start of the piece's own text whenever the
        # piece was longer than chunk_size, silently losing content.
        budget = min(chunk_overlap, chunk_size - len(piece) - 1)
        if not chunks or budget <= 0:
            chunks.append(piece)
            continue

        overlap_text = chunks[-1][-budget:].strip()
        # Pieces are stripped, so join with a space; plain concatenation glued
        # the last word of the overlap to the first word of the piece.
        chunks.append(f"{overlap_text} {piece}" if overlap_text else piece)

    return chunks

# Chunk the extracted text; `chunks` is what gets embedded and retrieved below.
chunks = recursive_text_splitter(raw_text, chunk_size=900, chunk_overlap=150)
print("Chunks:", len(chunks))
# Guard the preview: empty text produces no chunks and chunks[0] would raise.
print("Sample chunk:\n", chunks[0][:600] if chunks else "<no chunks produced>")

In [None]:
# Cell 5: Build Embeddings and FAISS Index
if not chunks:
    # With nothing to encode, the `emb.shape[1]` lookup below would fail with
    # an unhelpful IndexError; stop with an explicit message instead.
    raise ValueError("No text chunks to embed; check the PDF extraction and chunking cells.")

embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

# perf_counter is a monotonic clock meant for measuring elapsed time.
t0 = time.perf_counter()
emb = embedding_model.encode(chunks, convert_to_numpy=True, show_progress_bar=True)
t1 = time.perf_counter()

# One row per chunk; the second axis is the embedding dimension the index needs.
dim = emb.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(emb)

print("Embeddings shape:", emb.shape)
print("FAISS vectors:", index.ntotal)
print("Embedding time (s):", round(t1 - t0, 2))

In [None]:
# Cell 6: Load the causal language model used to generate answers
model_name = "distilgpt2"

# Prefer the GPU when the runtime has one.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Device:", device)

tokenizer = AutoTokenizer.from_pretrained(model_name)

# Load the weights, move them to the chosen device, and switch to eval mode.
model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
model.eval()

# If the tokenizer defines no padding token, reuse the end-of-sequence token
# so that generate() can be given a pad_token_id.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token = tokenizer.eos_token

print("Model loaded on:", next(model.parameters()).device)

In [None]:
def retrieve_context(question: str, top_k: int = 3) -> str:
    """Return the chunks nearest to `question`, joined by blank lines.

    Args:
        question: Query text; embedded with the notebook's `embedding_model`.
        top_k: Maximum number of chunks to return.

    Returns:
        Up to `top_k` chunks from `chunks`, in the order returned by the
        index search, separated by a blank line. Returns "" when `top_k` is
        not positive or the index holds no vectors.
    """
    # Never ask for more neighbours than the index holds: FAISS fills missing
    # results with id -1, and chunks[-1] would silently repeat the last chunk.
    k = min(top_k, index.ntotal)
    if k <= 0:
        return ""

    q_emb = embedding_model.encode([question], convert_to_numpy=True)
    _, indices = index.search(q_emb, k)

    # Keep the `>= 0` filter as a second line of defence against -1 ids.
    selected = [chunks[i] for i in indices[0] if i >= 0]
    return "\n\n".join(selected)

def ask_question(
    question: str,
    top_k: int = 3,
    max_input_tokens: int = 512,
    max_new_tokens: int = 120
) -> str:
    """Answer `question` with the language model, using retrieved context.

    Builds a prompt of instructions, retrieved context, the question and an
    "Answer:" cue, generates greedily, and returns only the newly generated
    text. Also prints the timing and the number of input tokens.

    Args:
        question: The user's question.
        top_k: Number of chunks to retrieve as context.
        max_input_tokens: Upper bound on the number of prompt tokens.
        max_new_tokens: Maximum number of tokens to generate.

    Returns:
        The generated continuation, decoded without special tokens and
        stripped of surrounding whitespace.
    """
    t0 = time.time()

    context = retrieve_context(question, top_k=top_k)

    prompt_head = (
        "Use the context to answer the question.\n"
        "If the answer is not in the context, say: I do not know.\n\n"
        "Context:\n"
    )
    prompt_tail = (
        f"\n\nQuestion:\n{question}\n\n"
        "Answer:"
    )

    # Tokenize the three parts separately so that only the context is ever
    # shortened. Truncating the whole prompt on the right (the original
    # behaviour) removed the question and the "Answer:" cue whenever the
    # retrieved context was long.
    # NOTE(review): the head keeps whatever special tokens the tokenizer adds
    # by default; the other parts add none. Confirm this if the model changes.
    head_ids = tokenizer(prompt_head)["input_ids"]
    tail_ids = tokenizer(prompt_tail, add_special_tokens=False)["input_ids"]

    context_budget = max_input_tokens - len(head_ids) - len(tail_ids)
    if context_budget > 0:
        # The context lists retrieved chunks in search order, so cutting on
        # the right drops the later chunks first.
        context_ids = tokenizer(
            context,
            add_special_tokens=False,
            truncation=True,
            max_length=context_budget
        )["input_ids"]
    else:
        context_ids = []

    token_ids = head_ids + context_ids + tail_ids
    if len(token_ids) > max_input_tokens:
        # Instructions plus question alone exceed the limit: keep the end of
        # the prompt so the question and the "Answer:" cue survive.
        token_ids = token_ids[len(token_ids) - max_input_tokens:]

    input_ids = torch.tensor([token_ids], dtype=torch.long, device=device)
    inputs = {
        "input_ids": input_ids,
        "attention_mask": torch.ones_like(input_ids)
    }

    t1 = time.time()

    with torch.inference_mode():
        outputs = model.generate(
            **inputs,
            do_sample=False,
            max_new_tokens=max_new_tokens,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id
        )

    t2 = time.time()

    # generate() returns prompt + continuation; keep only the continuation.
    gen_ids = outputs[0][inputs["input_ids"].shape[-1]:]
    answer = tokenizer.decode(gen_ids, skip_special_tokens=True).strip()

    # The first figure covers retrieval as well as tokenization.
    print("Timing, tokenize:", round(t1 - t0, 2), "s, generate:", round(t2 - t1, 2), "s")
    print("Input tokens:", inputs["input_ids"].shape[-1])

    return answer

In [None]:
# Smoke test: ask one fixed question and print the model's answer.
print(
    ask_question(
        question="What is this document about?",
        top_k=3,
    )
)

In [None]:
# Interactive Q&A: keep answering questions until the user types 'exit'.
while True:
    try:
        query = input("\nAsk a question (type 'exit' to stop): ").strip()
    except (EOFError, KeyboardInterrupt):
        # Input stream closed or the cell was interrupted at the prompt:
        # leave the loop without a traceback.
        break
    if query.lower() == "exit":
        break
    if not query:
        # Nothing typed; prompt again instead of running retrieval and
        # generation on an empty question.
        continue
    print("\nAnswer:\n")
    print(ask_question(query, top_k=3))