
# RAG Lab: PDF → Chunking → Embeddings → ChromaDB → Retrieval

**Updated:** 2025-11-08

Welcome! In this lab you'll build a small Retrieval-Augmented Generation (RAG) data pipeline:
1. Load PDFs from a directory
2. Chunk the text using either **fixed-size** chunks with **overlap** or **LLM-assisted** segmentation
3. Embed the chunks with a compact Sentence-Transformers model
4. Store vectors and metadata in a local **ChromaDB** collection
5. Run a simple **retrieval** (top-k nearest neighbors) and optionally synthesize a short answer

### What you'll learn
- Why chunking and overlap improve recall
- How to choose and use an embedding model consistently
- How a local vector DB (ChromaDB) stores and retrieves embeddings
- How to ground an answer with top-k passages

### Glossary (quick)
- **Chunk**: A small slice of text extracted from documents.
- **Overlap**: Repeating some tokens between adjacent chunks to avoid cutting important context.
- **Embedding**: A fixed-length vector representation of text.
- **Vector DB**: A database that stores vectors and supports similarity search (e.g., top-k retrieval).
- **Top-k retrieval**: Return the k most similar chunks to a query (nearest neighbors).
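
To make the glossary terms concrete, here is a minimal sketch of top-k retrieval over hand-made vectors. The three-dimensional "embeddings", the chunk names, and the helper names `cosine` and `top_k` are all hypothetical and exist only for illustration; the lab itself uses a real embedding model and ChromaDB for this step.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, chunk_vecs, k=2):
    """Return the names of the k chunks most similar to the query, best first."""
    ranked = sorted(
        chunk_vecs,
        key=lambda name: cosine(query_vec, chunk_vecs[name]),
        reverse=True,
    )
    return ranked[:k]

# Hand-made 3-dimensional "embeddings" (hypothetical values, illustration only).
toy_chunks = {
    "refund policy": [0.9, 0.1, 0.0],
    "shipping times": [0.1, 0.9, 0.1],
    "return window": [0.8, 0.2, 0.1],
}
toy_query = [1.0, 0.0, 0.0]

print(top_k(toy_query, toy_chunks, k=2))
```

A vector DB does the same job, but with an index so it does not have to compare the query against every stored vector.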


In [None]:

# ✅ Install minimal dependencies (CPU-friendly). If these are already installed, this cell is a no-op.
# We pin versions for classroom reproducibility. Feel free to loosen pins later.
# numpy is held below 2.0 because this chromadb release predates NumPy 2.0; pandas is used for previews.
%pip -q install "pymupdf<1.25" "chromadb==0.4.24" "sentence-transformers==2.5.1" "tqdm>=4.66.0" "numpy<2" "pandas"

import sys, platform, importlib
print("Python:", sys.version.split()[0], "| Platform:", platform.platform())

def _ver(pkg):
    try:
        return importlib.import_module(pkg).__version__
    except Exception as e:
        return f"not found ({e})"

print("PyMuPDF:", _ver("fitz"))
print("chromadb:", _ver("chromadb"))
print("sentence_transformers:", _ver("sentence_transformers"))
print("tqdm:", _ver("tqdm"))


In [None]:

# ---- Imports & configuration ----
import os, re, uuid, glob, math, json
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

import numpy as np
import pandas as pd
from tqdm import tqdm
import fitz  # PyMuPDF

# Embeddings
from sentence_transformers import SentenceTransformer

# --- Student-editable configuration ---
PDF_DIR = "./pdfs"                  # directory with source PDFs
PERSIST_DIR = "./rag_chroma"        # ChromaDB persistence path (folder will be created)
COLLECTION_NAME = "cnu_rag_lab"     # collection name
CHUNK_WORDS = 220                   # fixed-size chunk length (~words)
CHUNK_OVERLAP_WORDS = 40            # overlap between chunks (~words)
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
USE_LLM_CHUNKING = False            # set True to try optional LLM-assisted chunking
MAX_PAGES_PER_PDF: Optional[int] = None  # set an int (e.g., 5) to limit pages for demos

# Sanity: ensure dirs exist
Path(PDF_DIR).mkdir(parents=True, exist_ok=True)
Path(PERSIST_DIR).mkdir(parents=True, exist_ok=True)

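# Helper for neat printing
def head(df, n=5):
    try:
        # Only available in some hosted sandboxes; everywhere else we fall back to IPython.
        from caas_jupyter_tools import display_dataframe_to_user
        display_dataframe_to_user("Preview", df.head(n))
    except Exception:
        from IPython.display import display
        display(df.head(n))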



## 1) Load PDFs

We'll extract text with **PyMuPDF** page-by-page. Empty or whitespace-only pages are dropped.  
**Why this matters:** We want a clean, normalized text corpus before chunking to ensure consistent chunk lengths and quality.


In [None]:

def normalize_ws(text: str) -> str:
    '''Collapse runs of spaces/tabs, drop blank lines, and trim whitespace around line breaks.'''
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\s+\n", "\n", text)
    text = re.sub(r"\n\s+", "\n", text)
    return text.strip()

def load_pdfs(pdf_dir: str, max_pages: Optional[int] = None) -> List[Dict[str, Any]]:
    '''
    Extract text from all PDFs in a directory.
    Returns a list of page records: {"doc_id","source","page","text"}.
    '''
    records = []
    paths = sorted(glob.glob(os.path.join(pdf_dir, "*.pdf")))
    if not paths:
        print(f"[WARN] No PDFs found in {pdf_dir}. Place some PDFs there and re-run.")
        return records

    for path in paths:
        try:
            doc = fitz.open(path)
        except Exception as e:
            print(f"[WARN] Could not open {path}: {e}")
            continue
        doc_id = str(uuid.uuid4())
        page_count = len(doc)
        limit = min(page_count, max_pages) if isinstance(max_pages, int) else page_count
        for i in range(limit):
            try:
                text = doc[i].get_text()
            except Exception as e:
                print(f"[WARN] Could not read page {i} of {path}: {e}")
                continue
            text = normalize_ws(text)
            if text:
                records.append({
                    "doc_id": doc_id,
                    "source": os.path.basename(path),
                    "page": i + 1,  # 1-based for humans
                    "text": text
                })
        doc.close()
    return records

pages = load_pdfs(PDF_DIR, MAX_PAGES_PER_PDF)
print(f"Loaded {len(pages)} non-empty pages from {PDF_DIR}.")
if pages:
    import pandas as _pd
    _df_pages = _pd.DataFrame(pages)
    head(_df_pages, 5)
else:
    print("Add PDFs to the directory and re-run this cell.")



## 2) Chunking (Fixed-Size with Overlap)

We'll implement a tokenizer-free word splitter and then slide a window with overlap.  
**Why overlap?** It preserves context that may straddle chunk boundaries, improving recall during retrieval.
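
As a worked example of the sliding window, the sketch below computes only the `(start, end)` word indices that a window of 220 words with a 40-word overlap would cover on a 1000-word page. The helper name `window_spans` is hypothetical. Each step advances by `size - overlap` words, so for a page longer than the overlap the number of chunks is `ceil((n - overlap) / (size - overlap))`.

```python
import math

def window_spans(n_words, size, overlap):
    """Return (start, end) index pairs for overlapping windows over n_words items."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be non-negative and smaller than size")
    spans = []
    start = 0
    while start < n_words:
        end = min(start + size, n_words)
        spans.append((start, end))
        if end == n_words:
            break
        start = end - overlap  # each step advances by (size - overlap) words
    return spans

spans = window_spans(1000, size=220, overlap=40)
print(spans)

# The closed-form count agrees with the loop for this page length.
expected_count = math.ceil((1000 - 40) / (220 - 40))
print(len(spans), expected_count)
```

Adjacent spans share exactly 40 indices, which is the context carried across each boundary.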


In [None]:

WORD_RE = re.compile(r"\w+(?:'\w+)?|[^\w\s]", flags=re.UNICODE)

def words(text: str):
    'A lightweight word+punctuation splitter.'
    return WORD_RE.findall(text)

def chunk_fixed(text: str, size_words: int = CHUNK_WORDS, overlap_words: int = CHUNK_OVERLAP_WORDS):
    '''
    Split text into overlapping chunks measured in approx words.
    Returns a list of chunk strings.
    '''
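    # Guard: if the overlap were >= the chunk size, the window would never advance.
    if not 0 <= overlap_words < size_words:
        raise ValueError("Need 0 <= overlap_words < size_words.")
    w = words(text)
    chunks = []
    start = 0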
    while start < len(w):
        end = min(start + size_words, len(w))
        chunk_text = " ".join(w[start:end])
        if chunk_text.strip():
            chunks.append(chunk_text)
        if end == len(w):
            break
        start = max(end - overlap_words, 0)
    return chunks

def chunk_pages_fixed(page_records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    '''
    Map pages -> fixed-size chunks with metadata for each chunk.
    '''
    out = []
    for rec in page_records:
        for ch in chunk_fixed(rec["text"], CHUNK_WORDS, CHUNK_OVERLAP_WORDS):
            out.append({
                "chunk_id": str(uuid.uuid4()),
                "doc_id": rec["doc_id"],
                "source": rec["source"],
                "page": rec["page"],
                "method": "fixed",
                "text": ch
            })
    return out

chunk_records_fixed = chunk_pages_fixed(pages) if pages else []
print(f"Fixed-size chunking produced {len(chunk_records_fixed)} chunks.")
if chunk_records_fixed:
    import pandas as _pd
    _df_chunks_fixed = _pd.DataFrame(chunk_records_fixed)
    head(_df_chunks_fixed[ ['chunk_id','source','page','method','text'] ], 5)



## 3) Optional: LLM-Assisted Chunking (Semantic Segmentation)

**Idea:** Ask an LLM to segment a page into coherent sections (headings, paragraphs, lists). Then, for any very long segments, re-chunk using the fixed method above so that chunks stay small.

> This path is **optional** and requires an API key. If you set `USE_LLM_CHUNKING = True`, make sure `OPENAI_API_KEY` is in your environment. Keep your key private.
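
Whatever model you call, its reply has to be checked before it is trusted as a list of segments. The sketch below shows one way to do that without any network call. It assumes the model was asked to reply with a JSON object of the form `{"segments": ["...", "..."]}`; that shape and the helper name `parse_segments` are assumptions made for this example.

```python
import json
from typing import List

def parse_segments(raw: str, fallback: str) -> List[str]:
    """Parse a reply expected to look like {"segments": ["...", "..."]}.

    Returns [fallback] whenever the reply is not valid JSON or has no usable segments.
    """
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return [fallback]
    # Accept either the expected object or a bare JSON list of strings.
    segments = parsed.get("segments") if isinstance(parsed, dict) else parsed
    if not isinstance(segments, list):
        return [fallback]
    # Keep only non-empty strings, trimmed of surrounding whitespace.
    cleaned = [s.strip() for s in segments if isinstance(s, str) and s.strip()]
    return cleaned or [fallback]

print(parse_segments('{"segments": [" Intro ", "", "Methods"]}', "whole page"))
print(parse_segments("not json at all", "whole page"))
```

Falling back to the whole page keeps the pipeline running; oversized fallbacks can then be re-chunked with the fixed method.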


In [None]:

def llm_segment_page(text: str, model: str = "gpt-4o-mini") -> List[str]:
    '''
    Use an LLM to segment the page text into coherent sections.
    Returns a list of segments. On any error or short text, returns [text].
    '''
    text = text.strip()
    if len(text.split()) < 80:
        return [text]
    try:
        import os
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            # No key -> no network call; just fall back gracefully
            return [text]

        # Install only if needed
        try:
            from openai import OpenAI  # modern SDK
        except Exception:
            import sys
            print("[INFO] Installing openai client...")
            !{sys.executable} -m pip -q install --upgrade openai
            from openai import OpenAI

        client = OpenAI(api_key=api_key)

        prompt = (
            "You segment text into coherent sections such as headings and paragraphs. "
            'Return a JSON object of the form {"segments": ["...", "..."]}, '
            "where each string is one segment. "
            "Keep segments between 150-400 words when possible.\n\n"
            f"TEXT:\n{text[:8000]}"  # only the first 8000 characters are sent
        )
        # In the Responses API, JSON mode is requested via `text.format`
        # (`response_format` is a Chat Completions parameter).
        resp = client.responses.create(
            model=model,
            input=[{"role": "user", "content": prompt}],
            text={"format": {"type": "json_object"}},
        )
        import json as _json
        parsed = _json.loads(resp.output_text)
        segments = parsed.get("segments") if isinstance(parsed, dict) else parsed
        if isinstance(segments, list) and all(isinstance(s, str) for s in segments):
            # Light cleanup: trim whitespace and drop empty segments
            segments = [s.strip() for s in segments if s.strip()]
            return segments or [text]
        return [text]
    except Exception as e:
        # Fall back to the unsegmented page, but report why so failures are not hidden
        print(f"[WARN] LLM segmentation failed ({e}); using the whole page as one segment.")
        return [text]

def rechunk_if_long(segments: List[str], max_words: int = 600) -> List[str]:
    '''If a segment is too long, re-chunk with the fixed method to keep sizes bounded.'''
    out = []
    for s in segments:
        if len(words(s)) > max_words:
            out.extend(chunk_fixed(s, CHUNK_WORDS, CHUNK_OVERLAP_WORDS))
        else:
            out.append(s)
    return out

def chunk_pages_llm(page_records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    '''
    Segment each page via LLM, then bound segment length. Tag method='llm'.
    Requires OPENAI_API_KEY for actual LLM calls; otherwise falls back implicitly.
    '''
    out = []
    for rec in tqdm(page_records, desc="LLM segmenting pages"):
        segs = llm_segment_page(rec["text"])
        segs = rechunk_if_long(segs, max_words=600)
        for s in segs:
            out.append({
                "chunk_id": str(uuid.uuid4()),
                "doc_id": rec["doc_id"],
                "source": rec["source"],
                "page": rec["page"],
                "method": "llm",
                "text": s
            })
    return out

chunk_records_llm = chunk_pages_llm(pages) if (pages and USE_LLM_CHUNKING) else []
print(f"LLM-assisted path produced {len(chunk_records_llm)} chunks (0 if not enabled).")



## 4) Embeddings

We'll use a compact, high-quality model: `sentence-transformers/all-MiniLM-L6-v2` (384-dim).  
**Why this matters:** Consistent embeddings for documents **and** queries are necessary for good retrieval.


In [None]:

# Choose which chunks to use based on the flag
chunks = chunk_records_llm if (USE_LLM_CHUNKING and chunk_records_llm) else chunk_records_fixed

if not chunks:
    raise RuntimeError("No chunks available. Add PDFs to PDF_DIR and re-run the earlier cells.")

# Build a DataFrame for convenience
df = pd.DataFrame(chunks)
print("Chunks:", len(df))
head(df[['chunk_id','source','page','method','text']], 5)

# Load the embedder and encode in batches
embedder = SentenceTransformer(EMBED_MODEL_NAME)
print("Embedding model loaded:", EMBED_MODEL_NAME)

batch_size = 64
embeddings = []
for i in tqdm(range(0, len(df), batch_size), desc="Embedding chunks"):
    batch_texts = df['text'].iloc[i:i+batch_size].tolist()
    batch_vecs = embedder.encode(batch_texts, batch_size=min(32, len(batch_texts)), show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True)
    embeddings.append(batch_vecs)

embeddings = np.vstack(embeddings).astype("float32")
print("Embeddings shape:", embeddings.shape)
df["embedding"] = list(embeddings)



## 5) ChromaDB: Create / Persist Collection

We'll use a **PersistentClient** so your vectors survive across sessions in `PERSIST_DIR`.  
**Why this matters:** Persistence lets you build once and query many times.
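
One caveat for repeatable runs: the chunking cells in this lab assign random `uuid4` IDs, so re-chunking and upserting again adds new rows instead of overwriting the old ones. A minimal sketch of one alternative is to derive each ID from the chunk's origin and content with `uuid.uuid5`. The helper name `stable_chunk_id` and the choice of key fields are assumptions for illustration, not part of the lab code.

```python
import uuid

def stable_chunk_id(source: str, page: int, index: int, text: str) -> str:
    """Derive a repeatable ID from where a chunk came from and what it says."""
    key = f"{source}|{page}|{index}|{text}"
    # uuid5 hashes the key, so identical inputs always give the same UUID.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))

first = stable_chunk_id("handbook.pdf", 3, 0, "Refunds are issued within 30 days.")
again = stable_chunk_id("handbook.pdf", 3, 0, "Refunds are issued within 30 days.")
other = stable_chunk_id("handbook.pdf", 3, 1, "Shipping takes five days.")

print(first == again)  # same input, same ID
print(first == other)  # different chunk, different ID
```

With IDs like these, an upsert of unchanged chunks overwrites the existing rows, while edited chunks get new IDs.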


In [None]:

import chromadb
from chromadb.utils import embedding_functions

client = chromadb.PersistentClient(path=PERSIST_DIR)

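# Create the collection on the first run, or reopen it on later runs.
collection = client.get_or_create_collection(COLLECTION_NAME)

# Upsert overwrites rows whose IDs already exist, so re-running this cell alone is safe.
# Chunk IDs are random UUIDs, though: re-running the chunking cells produces new IDs
# and the next upsert will add duplicate rows rather than replace the old ones.
collection.upsert(
    ids=df["chunk_id"].tolist(),
    embeddings=[vec.tolist() for vec in df["embedding"]],  # NumPy vectors -> plain lists of floats
    metadatas=df[["doc_id", "source", "page", "method"]].to_dict(orient="records"),
    documents=df["text"].tolist()
)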
print(f"Collection '{COLLECTION_NAME}' now has {collection.count()} vectors. Persisted at: {PERSIST_DIR}")



## 6) Retrieval: Top-k Nearest Chunks

We'll embed the user query with the **same model** and query ChromaDB for the nearest chunks.
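
The distances returned by a query are easier to read with one identity in mind. For unit-length vectors, the squared L2 distance equals `2 - 2 * cosine similarity`, so sorting by smallest distance and sorting by largest cosine similarity give the same order. The sketch below checks this on made-up 2-D vectors. It assumes the collection uses squared L2 distance (believed to be Chroma's default when no distance setting is given) and that embeddings are normalized, as they are in this lab.

```python
import math

def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def squared_l2(a, b):
    """Squared Euclidean distance."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def dot(a, b):
    """Dot product; equals cosine similarity for unit vectors."""
    return sum(x * y for x, y in zip(a, b))

# Made-up 2-D vectors, normalized like the lab's embeddings.
q = l2_normalize([3.0, 4.0])
docs = {
    "a": l2_normalize([4.0, 3.0]),
    "b": l2_normalize([-1.0, 2.0]),
    "c": l2_normalize([3.0, 4.1]),
}

for name, d in docs.items():
    # The two printed numbers agree: ||q - d||^2 == 2 - 2 * (q . d)
    print(name, round(squared_l2(q, d), 6), round(2 - 2 * dot(q, d), 6))

by_distance = sorted(docs, key=lambda n: squared_l2(q, docs[n]))
by_cosine = sorted(docs, key=lambda n: dot(q, docs[n]), reverse=True)
print(by_distance, by_cosine)
```

Under these assumptions a distance near 0 means near-identical direction, and a distance near 2 means the vectors are roughly orthogonal.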


In [None]:

def embed_query(q: str) -> np.ndarray:
    '''Embed a query string using the same SentenceTransformer model (normalized).'''
    v = embedder.encode([q], convert_to_numpy=True, normalize_embeddings=True)
    return v[0].astype("float32")

def query_topk(question: str, k: int = 5) -> pd.DataFrame:
    '''Return a pretty DataFrame with rank, distance (smaller is closer), and metadata.'''
    q_emb = embed_query(question)
    res = collection.query(
        query_embeddings=[q_emb.tolist()],
        n_results=k,
        include=["documents", "metadatas", "distances"]  # ids are always returned; "ids" is not a valid include value
    )
    # Flatten results
    rows = []
    for rank, (cid, dist, meta, doc) in enumerate(zip(res["ids"][0], res["distances"][0], res["metadatas"][0], res["documents"][0]), start=1):
        snippet = (doc[:240] + "…") if len(doc) > 240 else doc
        rows.append({
            "rank": rank,
            "distance": round(float(dist), 4),
            "source": meta.get("source"),
            "page": meta.get("page"),
            "method": meta.get("method"),
            "chunk_id": cid,
            "snippet": snippet
        })
    out = pd.DataFrame(rows)
    return out

# Try a query (edit this to your corpus):
example_queries = [
    "Summarize the main purpose of this document.",
    "What are the key definitions introduced?",
    "List the steps or procedures mentioned in the text."
]
print("Try:", example_queries[0])
res_df = query_topk(example_queries[0], k=5)
head(res_df, 5)



## 7) Mini-RAG: Concise Answer Synthesis (Optional)

If you have an API key in your environment (`OPENAI_API_KEY`), we can ask a model for a **short, grounded** answer. Otherwise, we'll just show the retrieved context.


In [None]:

def build_context(df_hits: pd.DataFrame) -> str:
    ctx_lines = []
    for _, row in df_hits.iterrows():
        ctx_lines.append(f"[{row['source']} p.{row['page']} | {row['method']}] {row['snippet']}")
    return "\n".join(ctx_lines)

def synthesize_answer(question: str, topk: int = 5, model: str = "gpt-4o-mini") -> None:
    hits = query_topk(question, k=topk)
    ctx = build_context(hits)
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("No OPENAI_API_KEY found. Showing retrieved context instead:\n")
        print(ctx)
        return
    try:
        from openai import OpenAI
    except Exception:
        import sys
        print("[INFO] Installing openai client...")
        !{sys.executable} -m pip -q install --upgrade openai
        from openai import OpenAI

    client = OpenAI(api_key=api_key)

    sys_prompt = "You are a concise teaching assistant. Answer only using the provided evidence. Cite filename and page."
    user_prompt = f"Question: {question}\n\nEvidence:\n{ctx}\n\nIf insufficient, say you don't know."
    try:
        resp = client.responses.create(model=model, input=[
            {"role":"system","content":sys_prompt},
            {"role":"user","content":user_prompt}
        ])
        answer = resp.output_text if hasattr(resp, "output_text") else str(resp)
        print(answer.strip())
    except Exception as e:
        print("LLM call failed; printing context instead.\nReason:", e)
        print(ctx)

# Example (uncomment to try):
# synthesize_answer("What problem does this document address?", topk=5)



## 8) Sanity Checks & Simple Stats

Quick checks to ensure counts and shapes look reasonable.


In [None]:

def chunk_word_count(s: str) -> int:
    return len(words(s))

stats = df["text"].apply(chunk_word_count).describe()
print("Chunk length (words) stats:\n", stats.to_string())

print("\nVectors in collection:", collection.count())
print("Unique docs:", df["doc_id"].nunique())
print("Chunking method(s):", df["method"].value_counts().to_dict())



## 9) Wrap-Up & Next Steps

**Key takeaways**
- Fixed-size chunks with overlap are simple and reliable; LLM-assisted segmentation can yield cleaner semantic boundaries but requires an API key and careful prompting.
- Use the **same** embedding model for both documents and queries.
- Persisting to ChromaDB allows fast iteration without re-embedding every run.

**Try next**
- Tune `CHUNK_WORDS`/`CHUNK_OVERLAP_WORDS`.
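- Swap the embedder: e.g., `"BAAI/bge-small-en-v1.5"`. Re-embed every chunk into a fresh collection and embed queries with the same model; vectors from different models are not comparable even when their dimensions match.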
- Add a reranker (e.g., cross-encoder) to re-order top-k hits before synthesis.
- Track sources when synthesizing answers (we included filename + page).
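
As a starting point for the reranker idea, here is a minimal sketch of the re-ordering step with a pluggable scoring function. The names `rerank` and `word_overlap` are hypothetical, and `word_overlap` is only a lexical stand-in so the example runs without downloading a model; a real reranker would score each (question, passage) pair with a cross-encoder instead.

```python
from typing import Callable, Dict, List

def rerank(question: str, hits: List[Dict], score_fn: Callable[[str, str], float]) -> List[Dict]:
    """Re-order retrieved hits by a (question, passage) relevance score, best first."""
    scored = [dict(hit, rerank_score=score_fn(question, hit["snippet"])) for hit in hits]
    return sorted(scored, key=lambda h: h["rerank_score"], reverse=True)

def word_overlap(question: str, passage: str) -> float:
    """Stand-in scorer: fraction of question words that also appear in the passage."""
    q_words = set(question.lower().split())
    p_words = set(passage.lower().split())
    return len(q_words & p_words) / len(q_words) if q_words else 0.0

# Made-up hits in the order a vector search might return them.
hits = [
    {"chunk_id": "c1", "snippet": "shipping takes five days"},
    {"chunk_id": "c2", "snippet": "the refund policy lasts thirty days"},
]

reranked = rerank("what is the refund policy", hits, word_overlap)
print([h["chunk_id"] for h in reranked])
```

Because the scorer is passed in as an argument, swapping the stand-in for a model-based scorer does not change the surrounding code.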

Happy building! 🎓
