In [None]:

import numpy as np # linear algebra
import pandas as pd # data processing

import os
# Print the full path of every file mounted under the Kaggle input directory.
for root, _, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(root, name))



/kaggle/input/constitution/swaziland_constitution.pdf


In [13]:
# Optional: free some space in Kaggle image
!pip uninstall -qqy jupyterlab kfp -y


[0m

In [None]:
# ================================
# Install Required Libraries
# ================================
# This cell installs the dependencies needed for:
# 1. google-genai (v1.7.0)  →  Accessing Google’s Generative AI models (Gemini, etc.)
# 2. chromadb (v0.6.3)      →  A lightweight vector database for storing and retrieving embeddings
# 3. pdfplumber             →  Extracting text from PDF files (preserves layout better than some alternatives)
# 4. PyPDF2                 →  Another PDF processing library, useful for splitting/merging/reading PDFs
# 5. ftfy                   →  "Fixes Text For You" — cleans messy text encoding issues (important for NLP tasks)

# The "-qU" flags mean:
# -q  → Quiet mode (less output in the notebook)
# -U  → Upgrade to the latest version if already installed
!pip install -qU "google-genai==1.7.0" "chromadb==0.6.3" pdfplumber PyPDF2 ftfy


In [None]:
# ==========================================
#  Import Required Python Libraries
# ==========================================

# --- Built-in Python modules ---
import os          # Work with operating system features (paths, environment variables)
import re          # Regular expressions for text pattern matching/cleaning
import json        # Work with JSON data (reading/writing configurations, API responses)
import unicodedata # Handle and normalize Unicode characters (useful for cleaning text)
import uuid        # Generate unique identifiers (e.g., for document IDs)
import textwrap    # Format and wrap text neatly (useful when printing long text blocks)

# dataclasses: Helps define classes for structured data with less boilerplate code
from dataclasses import dataclass, field

# typing: Used for type hints (makes code more readable and easier to debug)
from typing import List, Dict, Any, Optional


# --- Google Generative AI (Gemini) SDK ---
from google import genai                   # Main SDK for interacting with Google's Generative AI models
from google.genai import types             # Provides structured request/response types for API calls
from google.api_core import retry          # Enables automatic retries for failed API calls (network safe)


# --- PDF Processing Libraries ---
import pdfplumber                          # Extracts text from PDFs while preserving layout/structure
import PyPDF2                              # Additional PDF operations: merging, splitting, metadata access
from ftfy import fix_text                  # "Fixes Text For You" — cleans broken or misencoded text


# --- ChromaDB (Vector Database for RAG) ---
import chromadb                            # Main library for working with vector storage and retrieval
from chromadb import Documents, EmbeddingFunction, Embeddings
# (Documents, EmbeddingFunction, Embeddings help us work with text collections and custom embeddings)


# --- Kaggle Secrets ---
from kaggle_secrets import UserSecretsClient
# Allows secure storage and retrieval of API keys/secrets when running code on Kaggle notebooks


In [16]:
# --- Paths ---
# Source PDF that the extraction step reads.
PDF_PATH = "/kaggle/input/constitution/swaziland_constitution.pdf"  
# Directory that receives every generated artifact (Markdown, JSON, JSONL).
OUT_DIR = "/kaggle/working"

In [17]:

# Output files written by the pipeline cell, all located under OUT_DIR.
MD_OUT = os.path.join(OUT_DIR, "constitution_clean.md")                 # human-readable Markdown rendering
STRUCT_JSON_OUT = os.path.join(OUT_DIR, "constitution_structured.json")  # chapter/section/subsection tree
CHUNKS_JSONL_OUT = os.path.join(OUT_DIR, "constitution_chunks.jsonl")    # one retrieval chunk per line

In [18]:

# Identifiers copied into the structured JSON and into each chunk's metadata.
DOC_ID = "swaziland_constitution_2005"
SOURCE_FILE = os.path.basename(PDF_PATH)  # file name only, without the directory part

In [19]:
# Chunk sizes (approx words-as-tokens)
# split_into_chunks() counts whitespace-separated words, not model tokens.
TARGET_TOKENS = 600   # maximum number of words in one chunk
OVERLAP_TOKENS = 90   # words repeated between consecutive chunks of a long section

In [20]:
# Chroma collection name
# Passed to get_or_create_collection() when the vector store is built.
DB_NAME = "swazi_constitution_chroma"

In [21]:

# --- Gemini API Key ---
# The key is read from Kaggle Secrets so it never appears in the notebook source.
GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
assert GOOGLE_API_KEY, "Please add a Kaggle Secret named GOOGLE_API_KEY and enable it for this notebook."

# Shared google-genai client; later cells use it for embeddings and for answer generation.
client = genai.Client(api_key=GOOGLE_API_KEY)

In [None]:
# ==========================================
# Text Normalization & Cleaning Functions
# ==========================================

def normalize_text(s: str) -> str:
    r"""
    Return a cleaned copy of ``s`` (``None`` or empty input yields ``""``).

    Steps, applied in this order:
    1. Unicode NFKC normalization
    2. ``ftfy.fix_text`` to repair mis-encoded text
    3. Removal of soft hyphens (``\u00AD``)
    4. Replacement of non-breaking spaces (``\xa0``) with regular spaces
    5. Collapse of every run of spaces/tabs into one space
    6. Strip of leading/trailing whitespace

    Newlines inside the text are kept; only spaces and tabs are collapsed.
    """
    cleaned = fix_text(unicodedata.normalize("NFKC", s or ""))
    # (character to replace, replacement) pairs, applied one after the other
    for needle, replacement in (("\u00AD", ""), ("\xa0", " ")):
        cleaned = cleaned.replace(needle, replacement)
    return re.sub(r"[ \t]+", " ", cleaned).strip()


def looks_like_header_footer(line: str) -> bool:
    """
    Tell whether ``line`` is page furniture (header/footer noise) rather than content.

    Returns ``False`` for an empty line and ``True`` when the line is any of:
    - a bare page number of one to three digits,
    - one of the known boilerplate strings of this document,
    - a rule made only of underscores.

    The comparison is exact, so callers should pass an already stripped line.
    """
    if not line:
        return False

    # Boilerplate strings that are dropped wherever they appear as a whole line
    doc_noise = {
        "Swaziland - Constitution 2005",
        "THE CONSTITUTION OF THE KINGDOM OF SWAZILAND ACT, 2005",
        "Arrangement of sections",
        "__________________",
        "___________________",
    }

    is_page_number = re.fullmatch(r"[0-9]{1,3}", line) is not None
    is_underscore_rule = re.fullmatch(r"_+", line) is not None
    return is_page_number or line in doc_noise or is_underscore_rule


def merge_hyphenated_lines(lines: List[str]) -> List[str]:
    """
    Join words that were hyphenated across a line break.

    A line is glued onto the previous output line (with the trailing hyphen
    removed) when the previous line ends in ``<letter>-`` and the current line
    starts with a lowercase ASCII letter.

    Example:
    "inter-" (end of line) + "national" (next line) → "international"

    The check is made against the already merged output line, so a word broken
    over three or more lines ("inter-", "nat-", "ional") is fully rejoined.

    Limitation: a genuine compound broken at its hyphen ("well-" / "being")
    cannot be told apart from a soft break and is joined without the hyphen.

    Parameters
    ----------
    lines : List[str]
        Lines of a page in reading order.

    Returns
    -------
    List[str]
        A new list; the input list is not modified.
    """
    merged: List[str] = []
    for line in lines:
        previous_is_broken = bool(merged) and re.search(r"[A-Za-z]-$", merged[-1]) is not None
        if previous_is_broken and re.match(r"^[a-z]", line):
            # Drop the trailing hyphen of the previous line and append this one.
            merged[-1] = merged[-1][:-1] + line
        else:
            merged.append(line)
    return merged


def rebuild_paragraphs(clean_lines: List[str]) -> List[str]:
    """
    Group cleaned lines into paragraphs.

    Behaviour:
    - A blank line ends the paragraph being collected.
    - A line that starts (at column 0) with a clause marker followed by
      whitespace -- "(1) ", "(a) ", "(iv) ", "12. " -- or with the word
      "CHAPTER" ends the paragraph being collected and is then emitted by
      itself as a single-line paragraph.
    - Any other line is collected; collected lines are joined with single
      spaces when the paragraph ends.

    Note that lines following a marker line are collected into a *separate*
    paragraph, they are not attached to the marker line.
    """
    clause_marker = re.compile(r"^(\(\d+\)|\([a-z]\)|\([ivx]+\)|\d+\.)\s", re.I)
    chapter_marker = re.compile(r"^CHAPTER(\s+|$)", re.I)

    result: List[str] = []
    pending: List[str] = []

    def emit_pending() -> None:
        # Turn the collected lines into one paragraph (if any) and reset the buffer.
        joined = re.sub(r" +", " ", " ".join(pending)).strip()
        if joined:
            result.append(joined)
        pending.clear()

    for raw_line in clean_lines:
        text = raw_line.strip()
        if not text:
            emit_pending()
        elif clause_marker.match(raw_line) or chapter_marker.match(raw_line):
            # Markers are matched on the unstripped line, so indented markers do not count.
            emit_pending()
            result.append(text)
        else:
            pending.append(text)

    emit_pending()
    return result


In [None]:
# ==========================================
# Extract and Clean Text from a PDF
# ==========================================

def _clean_page_text(raw: str) -> List[str]:
    """Turn the raw text of one page into a list of cleaned paragraphs."""
    raw = normalize_text(raw or "")

    content_lines = []
    for ln in raw.splitlines():
        line = ln.strip()

        # Skip headers/footers and page numbers
        if looks_like_header_footer(line):
            continue
        if re.fullmatch(r"(Page\s+)?\d{1,3}", line, re.I):
            continue

        content_lines.append(line)

    # Merge words split by hyphens across lines
    content_lines = merge_hyphenated_lines(content_lines)

    # Blank out decorative rules ("___", "---"); the resulting empty line
    # then acts as a paragraph break in rebuild_paragraphs().
    content_lines = [re.sub(r"^[_\-]{3,}$", "", ln) for ln in content_lines]

    return rebuild_paragraphs(content_lines)


def extract_pages(pdf_path: str) -> List[Dict[str, Any]]:
    """
    Extracts text from each page of a PDF and returns a list of cleaned paragraphs.

    Parameters:
    ----------
    pdf_path : str
        The file path to the PDF document.

    Returns:
    -------
    List[Dict[str, Any]]:
        One dictionary per paragraph, in reading order:
        {
            "page": page_number,          # 1-based
            "text": cleaned_paragraph_text
        }
        Pages that yield no paragraph contribute no entry.

    Key Steps:
    ----------
    1. Try `pdfplumber` first.
    2. If anything raises during that pass, print a warning, discard whatever
       the first pass had collected, and redo the whole document with `PyPDF2`
       (so no page is ever reported twice).
    3. Each page goes through the same cleaning: normalization, header/footer
       and page-number removal, hyphenation merge, decorative-rule removal,
       paragraph rebuilding.

    Raises:
    ------
    Whatever `PyPDF2` raises if the fallback pass fails as well.
    """
    pages_clean: List[Dict[str, Any]] = []  # Holds final output

    try:
        # --- Primary Extraction with pdfplumber ---
        with pdfplumber.open(pdf_path) as pdf:
            for pno, page in enumerate(pdf.pages, start=1):
                # Extract text with a small tolerance to preserve word grouping
                raw = page.extract_text(x_tolerance=2, y_tolerance=2) or ""
                for para in _clean_page_text(raw):
                    pages_clean.append({"page": pno, "text": para})

    except Exception as e:
        # --- Fallback to PyPDF2 if pdfplumber fails ---
        print(f"[warn] pdfplumber failed ({e}); falling back to PyPDF2...")

        # The failure may have happened part-way through the document; start
        # from scratch so the pages already processed are not duplicated.
        pages_clean = []

        reader = PyPDF2.PdfReader(pdf_path)
        for pno, page in enumerate(reader.pages, start=1):
            raw = page.extract_text() or ""
            for para in _clean_page_text(raw):
                pages_clean.append({"page": pno, "text": para})

    return pages_clean


In [None]:
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import re

# ==========================================
# Data Models for the Document Hierarchy
# ==========================================

@dataclass
class Subsection:
    """A labeled clause within a Section, e.g., '(a) ...' or '(1) ...'."""
    # Clause marker including its parentheses, e.g. "(a)" or "(1)".
    label: str
    # Text that followed the marker in the source paragraph.
    text: str
    # Page numbers on which the clause was found (empty list by default).
    page_refs: List[int] = field(default_factory=list)

@dataclass
class Section:
    """A numbered section, e.g., '12. Right to ...', with optional text and subsections."""
    # Section number kept as a string of digits, e.g. "12".
    number: str
    # Text taken as the section heading.
    title: str
    # Body paragraphs of the section that are not labeled subsections, space-joined.
    text: str = ""
    # Labeled clauses in the order they were encountered.
    subsections: List[Subsection] = field(default_factory=list)
    # Page numbers associated with the section.
    page_refs: List[int] = field(default_factory=list)

@dataclass
class Chapter:
    """A CHAPTER with a roman numeral label and a title, containing sections."""
    # Heading label such as "CHAPTER IV".
    label: str
    # Chapter title; may be an empty string when none was found.
    title: str
    # Sections belonging to this chapter, in document order.
    sections: List[Section] = field(default_factory=list)
    # Page numbers associated with the chapter.
    page_refs: List[int] = field(default_factory=list)


# ==========================================
# 🧩 Parse Flat Paragraphs → Structured TOC
# ==========================================

def parse_structure(pages_paragraphs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Parses a list of cleaned (page, paragraph) entries into a hierarchical structure:
      Chapter → Section → Subsection, carrying page references at each level.

    Parameters
    ----------
    pages_paragraphs : List[Dict[str, Any]]
        Entries with the keys "page" (page number) and "text" (one paragraph),
        in reading order.

    Returns
    -------
    Dict[str, Any]
        A JSON-serializable dict with "doc_id", "source_path", "title" and
        "chapters"; each chapter holds its "sections", each section its
        "subsections". `doc_id` and `source_path` are read from the
        module-level names DOC_ID and SOURCE_FILE.

    Recognition rules (checked in this order for every paragraph)
    -------------------------------------------------------------
    1. "CHAPTER <roman> [title]" opens a chapter.
    2. While the open chapter has no title, an all-uppercase paragraph of more
       than two words becomes its title.
    3. "<1-3 digits>. [title]" opens a section; if the title is missing, the
       next paragraph is used as the title.
    4. "(<label>) text" inside a section becomes a subsection.
    5. Anything else is appended to the current section's body text.

    Sections met before any chapter are placed in a placeholder chapter
    labelled "CHAPTER ?". Page lists never contain the same page twice.
    """
    chapters: List[Chapter] = []
    current_chapter: Optional[Chapter] = None
    current_section: Optional[Section] = None
    pending_section_number: Optional[str] = None  # used when "12." appears and title follows on next line

    def open_section(number: str, title: str, page: int) -> None:
        """Start a new section in the current chapter, creating the placeholder chapter if needed."""
        nonlocal current_chapter, current_section
        if current_chapter is None:
            current_chapter = Chapter(label="CHAPTER ?", title="", sections=[], page_refs=[page])
            chapters.append(current_chapter)
        current_section = Section(number=number, title=title, text="", page_refs=[page])
        current_chapter.sections.append(current_section)

    for entry in pages_paragraphs:
        page = entry["page"]
        para = entry["text"]

        # -----------------------
        # 1) Chapter detection
        # -----------------------
        m_ch = re.match(r"^CHAPTER\s+([IVXLCDM]+)(?:\s*(.*))?$", para, re.I)
        if m_ch:
            # New chapter resets section state
            pending_section_number = None
            current_section = None

            roman = m_ch.group(1).upper()
            rest = (m_ch.group(2) or "").strip(" -")

            current_chapter = Chapter(
                label=f"CHAPTER {roman}",
                title=rest,
                page_refs=[page],
            )
            chapters.append(current_chapter)
            continue

        # If the CHAPTER title is on the next line in ALL CAPS, capture it.
        if current_chapter and not current_chapter.title:
            if len(para.split()) > 2 and para.isupper():
                current_chapter.title = para
                # The title normally sits on the same page as the CHAPTER line;
                # only record the page when it is a new one.
                if page not in current_chapter.page_refs:
                    current_chapter.page_refs.append(page)
                continue

        # -----------------------
        # 2) Section detection
        #    Format: "12. Title..."  OR  "12." followed by next paragraph as title
        # -----------------------
        m_sec = re.match(r"^(\d{1,3})\.\s*(.*)$", para)
        if m_sec:
            sec_no = m_sec.group(1)
            tail = m_sec.group(2).strip()

            if tail:
                # Title provided on the same line
                open_section(sec_no, tail, page)
                pending_section_number = None
            else:
                # Only the section number is present; wait for the next paragraph as title
                pending_section_number = sec_no
                current_section = None
            continue

        # If previous paragraph was a bare "12.", THIS paragraph is the section title.
        # A placeholder chapter is created when none exists, exactly as for same-line titles.
        if pending_section_number:
            open_section(pending_section_number, para, page)
            pending_section_number = None
            continue

        # -----------------------
        # 3) Subsection detection
        #    e.g., "(1) ...", "(a) ...", "(iv) ..."
        # -----------------------
        m_sub = re.match(r"^\(([0-9ivx]+|[a-z])\)\s*(.*)$", para, re.I)
        if m_sub and current_section:
            label = m_sub.group(1)
            content = m_sub.group(2).strip()
            current_section.subsections.append(
                Subsection(label=f"({label})", text=content, page_refs=[page])
            )
            # A section that continues onto a new page through its subsections
            # must list that page too.
            if page not in current_section.page_refs:
                current_section.page_refs.append(page)
            continue

        # -----------------------
        # 4) Body text accumulation
        # -----------------------
        if current_section:
            # Append paragraph text to the current section body
            current_section.text = (current_section.text + " " + para).strip()
            # Track page reference if new
            if page not in current_section.page_refs:
                current_section.page_refs.append(page)
        elif current_chapter:
            # Chapter context but no section: the text is not stored, only the page
            if page not in current_chapter.page_refs:
                current_chapter.page_refs.append(page)

    # ----------------------------------
    # Build serializable JSON-like dict
    # ----------------------------------
    # DOC_ID and SOURCE_FILE are module-level constants defined in the config cells.
    result = {
        "doc_id": DOC_ID,
        "source_path": SOURCE_FILE,
        "title": "The Constitution of the Kingdom of Swaziland, 2005",
        "chapters": [
            {
                "label": ch.label,
                "title": ch.title,
                "page_refs": ch.page_refs,
                "sections": [
                    {
                        "number": sec.number,
                        "title": sec.title,
                        "page_refs": sec.page_refs,
                        "text": sec.text,
                        "subsections": [
                            {"label": sub.label, "text": sub.text, "page_refs": sub.page_refs}
                            for sub in sec.subsections
                        ],
                    }
                    for sec in ch.sections
                ],
            }
            for ch in chapters
        ],
    }
    return result


In [29]:
def split_into_chunks(struct: Dict[str, Any], target_tokens=600, overlap_tokens=90) -> List[Dict[str, Any]]:
    """
    Build RAG-ready chunks from the structured doc:
      - One chunk stream per Section (Chapter label + Section title as heading_path)
      - Concatenate section body + subsection bullets (one per line)
      - If the text is longer than target_tokens, emit sliding windows with overlap

    NOTE: Despite the param names, sizes are measured in whitespace-separated
          *words*, not LLM tokens.

    Parameters
    ----------
    struct : Dict[str, Any]
        Structured document with "doc_id", "source_path" and "chapters"
        (each chapter: "label", optional "title", "sections").
    target_tokens : int
        Maximum number of words per chunk. Must be positive.
    overlap_tokens : int
        Words shared by consecutive windows. Must not be negative. When it is
        equal to or larger than `target_tokens` the window advances one word
        at a time.

    Returns
    -------
    List[Dict[str, Any]]
        Chunks with "chunk_id" (random UUID4), "heading_path", "text" and
        "metadata". Every chunk owns its metadata dict and page list, so
        editing one chunk never affects another chunk or `struct`.
        Sections without any text are skipped. Text emitted through the
        sliding window is re-joined with single spaces (line breaks are lost).

    Raises
    ------
    ValueError
        If `target_tokens` is not positive or `overlap_tokens` is negative;
        both would otherwise drop text silently.
    """
    if target_tokens <= 0:
        raise ValueError("target_tokens must be a positive integer")
    if overlap_tokens < 0:
        raise ValueError("overlap_tokens must not be negative")

    chunks: List[Dict[str, Any]] = []

    # step = non-overlapped advance size of the sliding window
    step = max(1, target_tokens - overlap_tokens)

    for ch in struct["chapters"]:
        ch_label = ch["label"]
        ch_title = ch.get("title", "")

        for sec in ch["sections"]:
            # Human-friendly breadcrumb for inspection/UIs
            heading_path = f"{ch_label} - {sec['title']}".strip(" -")

            def new_chunk(text: str) -> Dict[str, Any]:
                # Fresh metadata dict (and page list) per chunk: no shared references.
                return {
                    "chunk_id": str(uuid.uuid4()),
                    "heading_path": heading_path,
                    "text": text,
                    "metadata": {
                        "doc_id": struct["doc_id"],
                        "source_file": struct["source_path"],
                        "chapter": ch_label,
                        "chapter_title": ch_title,
                        "section_number": sec["number"],
                        "section_title": sec["title"],
                        "page_refs": list(sec.get("page_refs", [])),
                    },
                }

            # Build a single text stream: section body + each subsection as a prefixed line
            parts: List[str] = []
            if sec.get("text"):
                parts.append(sec["text"].strip())

            for sub in sec.get("subsections", []):
                label = sub.get("label", "")
                t = (sub.get("text") or "").strip()
                if t:
                    parts.append(f"{label} {t}".strip())

            full_text = "\n".join(parts).strip()
            if not full_text:
                continue  # skip empty sections

            w = full_text.split()

            # Case 1: short enough → one chunk, original line breaks preserved
            if len(w) <= target_tokens:
                chunks.append(new_chunk(full_text))
                continue

            # Case 2: long → sliding window with overlap
            for start in range(0, len(w), step):
                chunks.append(new_chunk(" ".join(w[start:start + target_tokens])))

                # Stop once a window has reached the end of the text,
                # otherwise the next window would only repeat the tail.
                if start + target_tokens >= len(w):
                    break

    return chunks


In [None]:
# ==========================================
# Pipeline Orchestration (Extract → Parse → Chunk → Export)
# ==========================================

# 0) Sanity check: ensure the input PDF exists
assert os.path.exists(PDF_PATH), f"PDF not found at: {PDF_PATH}"

# 1) Extract
print("[pipeline] Extracting pages ...")
pages_clean = extract_pages(PDF_PATH)
# Count unique pages seen and total paragraphs emitted
# (a page that produced no paragraph is not part of the page count)
print(f"[pipeline] Pages extracted: {len({p['page'] for p in pages_clean})}, paragraphs: {len(pages_clean)}")

# 2) Parse structure (Chapter → Section → Subsection)
print("[pipeline] Parsing structure ...")
structured = parse_structure(pages_clean)
chapters_detected = len(structured["chapters"])
total_sections = sum(len(ch["sections"]) for ch in structured["chapters"])
print(f"[pipeline] Chapters detected: {chapters_detected}, sections: {total_sections}")

# 3) Chunk for RAG: sliding windows with overlap, sized in whitespace-separated words
print("[pipeline] Splitting into chunks ...")
# TARGET_TOKENS / OVERLAP_TOKENS come from the config cells and are word counts, not model tokens.
chunks = split_into_chunks(
    structured,
    target_tokens=TARGET_TOKENS,
    overlap_tokens=OVERLAP_TOKENS
)
print(f"[pipeline] Chunks emitted: {len(chunks)}")

# 4) Emit outputs (Markdown for human inspection, JSON for structure, JSONL for RAG)
print("[pipeline] Writing outputs ...")

# 4.a) Build a readable Markdown summary of the parsed structure
md_lines = [
    f"# {structured.get('title','Document')}",
    f"_Source: {structured.get('source_path','')}_",
    ""
]

for ch in structured["chapters"]:
    # Chapter header: "## CHAPTER I: Preliminary"
    ch_header = f"## {ch['label']}"
    if ch.get("title"):
        # Title-case for visual consistency. str.title() would also upper-case the
        # letter after an apostrophe ("KING'S" -> "King'S"), so each word -- together
        # with an optional apostrophe suffix -- is capitalised as one unit instead.
        pretty_title = re.sub(
            r"[A-Za-z]+('[A-Za-z]+)?",
            lambda mo: mo.group(0).capitalize(),
            ch["title"],
        )
        ch_header += f": {pretty_title}"
    md_lines.append(ch_header)
    md_lines.append("")

    # Sections inside the chapter
    for sec in ch["sections"]:
        # Section header: "### 12. Protection of right to life"
        md_lines.append(f"### {sec['number']}. {sec['title']}")
        md_lines.append("")

        # Optional body text of the section (wrapped to 100 chars)
        if sec.get("text"):
            md_lines.append(textwrap.fill(sec["text"], width=100))
            md_lines.append("")

        # One bullet per subsection of this section
        for sub in sec.get("subsections", []):
            line = f"- **{sub['label']}** {sub['text']}"
            # Wrap with indentation for nice bullets
            md_lines.append(textwrap.fill(line, subsequent_indent="  ", width=100))
        md_lines.append("")

# 4.b) Make sure the target directory is there before anything is written
os.makedirs(OUT_DIR, exist_ok=True)

# 4.c) Markdown: the collected lines joined into one document ending with a newline
markdown_document = "\n".join(md_lines).strip() + "\n"
with open(MD_OUT, "w", encoding="utf-8") as md_file:
    md_file.write(markdown_document)

# 4.d) Structured JSON (chapters/sections/subsections with page refs)
with open(STRUCT_JSON_OUT, "w", encoding="utf-8") as json_file:
    json.dump(structured, json_file, ensure_ascii=False, indent=2)

# 4.e) RAG chunks as JSONL: one JSON object per line
with open(CHUNKS_JSONL_OUT, "w", encoding="utf-8") as jsonl_file:
    jsonl_file.writelines(
        json.dumps(chunk, ensure_ascii=False) + "\n" for chunk in chunks
    )

print("[pipeline] Outputs written:")
print("  - Markdown:", MD_OUT)
print("  - Structured JSON:", STRUCT_JSON_OUT)
print("  - RAG JSONL:", CHUNKS_JSONL_OUT)


[pipeline] Extracting pages ...
[pipeline] Pages extracted: 159, paragraphs: 3393
[pipeline] Parsing structure ...
[pipeline] Chapters detected: 38, sections: 563
[pipeline] Splitting into chunks ...
[pipeline] Chunks emitted: 308
[pipeline] Writing outputs ...
[pipeline] Outputs written:
  - Markdown: /kaggle/working/constitution_clean.md
  - Structured JSON: /kaggle/working/constitution_structured.json
  - RAG JSONL: /kaggle/working/constitution_chunks.jsonl


In [None]:
import json
from typing import List, Dict, Any
from google.api_core import retry
import chromadb
from chromadb import Documents, EmbeddingFunction, Embeddings

# Assumes these already exist in your runtime:
# - genai, types, client       (google-genai client & types)
# - DB_NAME: str               (name of the Chroma collection)
# - chunks: List[Dict[str,Any]] (RAG-ready chunks with "text", "chunk_id", "heading_path", "metadata")

# -------------------------------------------------------
# Retry predicate: retry only rate limit / service busy
# -------------------------------------------------------
# google-genai raises APIError with .code (HTTP status). We retry 429 & 503.
def is_retriable(e) -> bool:
    """Retry predicate: True only for a google-genai APIError whose `code` is 429 or 503."""
    if not isinstance(e, genai.errors.APIError):
        return False
    return getattr(e, "code", None) in {429, 503}

# -------------------------------------------------------
# Gemini Embedding Function for Chroma
# -------------------------------------------------------
class GeminiEmbeddingFunction(EmbeddingFunction):
    """
    Minimal Chroma-compatible embedding function backed by Google Gemini embeddings.

    - Inputs are embedded in batches of `batch_size` texts per API call
      (presumably the per-call limit of this model -- TODO confirm).
    - `document_mode=True` → task type "retrieval_document";
      `False` → "retrieval_query".
    - Each batch is retried on its own when the API answers 429/503, so a
      rate-limit hit on a late batch does not re-embed the earlier batches.
    """
    document_mode = True
    batch_size = 100  # texts sent per embed_content call

    @retry.Retry(predicate=is_retriable)  # automatic backoff on 429/503, per batch
    def _embed_batch(self, batch: List[str], task: str) -> List[List[float]]:
        """Embed one batch of texts and return one vector per text, in order."""
        # google-genai v1.7.0 style:
        resp = client.models.embed_content(
            model="models/text-embedding-004",
            contents=batch,
            config=types.EmbedContentConfig(task_type=task),
        )
        # `resp.embeddings` is a list aligned to `batch`
        return [e.values for e in resp.embeddings]

    def __call__(self, input: Documents) -> Embeddings:
        """Embed `input` and return the vectors in the same order as the inputs."""
        # Defensive conversion: None becomes "", anything else goes through str().
        texts = [("" if x is None else str(x)) for x in input]
        task = "retrieval_document" if self.document_mode else "retrieval_query"

        all_vecs: List[List[float]] = []
        for i in range(0, len(texts), self.batch_size):
            all_vecs.extend(self._embed_batch(texts[i:i + self.batch_size], task))

        return all_vecs


# -------------------------------------------------------
# Metadata sanitizer: keep nested types as JSON strings
# -------------------------------------------------------
def _sanitize_metadata(md: dict) -> dict:
    """
    Chroma metadata entries must be JSON-serializable scalars.
    We JSON-encode any lists/dicts so you can round-trip them later.
    """
    out = {}
    for k, v in md.items():
        if isinstance(v, (list, dict)):
            out[k] = json.dumps(v, ensure_ascii=False)  # preserve structure as string
        elif isinstance(v, (str, int, float, bool)) or v is None:
            out[k] = v
        else:
            out[k] = str(v)
    return out


# -------------------------------------------------------
# Build (or open) the Chroma vector DB and ingest chunks
# -------------------------------------------------------
def build_vector_db(chunks: List[Dict[str, Any]]):
    """
    Create (or open) the Chroma collection named DB_NAME and add every chunk to it.

    Each chunk must provide "text", "chunk_id", "heading_path" and "metadata".
    The stored metadata is the chunk's metadata plus "heading_path", passed
    through _sanitize_metadata().

    Returns the tuple ``(collection, embedding_function)``.

    Notes
    -----
    - chromadb.Client() is used, which per the original author's note is an
      in-memory (ephemeral) client; chromadb.PersistentClient(path=...) would
      be the persistent alternative.
    - The collection is obtained with get_or_create_collection() and nothing
      is deleted first, so calling this twice adds the chunks twice unless the
      collection is dropped in between.
    """
    embedder = GeminiEmbeddingFunction()
    embedder.document_mode = True  # ingest with the document task type

    chroma_client = chromadb.Client()
    collection = chroma_client.get_or_create_collection(
        name=DB_NAME,
        embedding_function=embedder,  # Chroma embeds the added texts through this object
    )

    # Build the three parallel payload lists in a single pass
    documents, ids, metadatas = [], [], []
    for chunk in chunks:
        documents.append(chunk["text"])
        ids.append(chunk["chunk_id"])  # must be unique
        metadatas.append(
            _sanitize_metadata({"heading_path": chunk["heading_path"], **chunk["metadata"]})
        )

    print(f"[chroma] Adding {len(documents)} chunks ...")
    collection.add(documents=documents, ids=ids, metadatas=metadatas)
    print(f"[chroma] Count: {collection.count()}")

    return collection, embedder


# Build DB now
# Embeds and ingests all chunks immediately; `db` and `embed_fn` are reused by the query cells.
db, embed_fn = build_vector_db(chunks)


[chroma] Adding 308 chunks ...
[chroma] Count: 308


In [33]:
import json
from typing import List, Dict, Any

def make_prompt(query: str, passages: List[str], metadatas: List[Dict[str, Any]]) -> str:
    """
    Assemble the grounded-answer prompt sent to the generation model.

    The prompt consists of fixed instructions, the question on a single line,
    and one "PASSAGE n (...)" block per (passage, metadata) pair. Newlines
    inside the question and the passages are replaced by spaces. String
    metadata values are JSON-decoded when possible (e.g. a page list stored
    as "[1, 2]"); otherwise they are used as they are. `passages` and
    `metadatas` are paired positionally; extra items of the longer list are
    ignored.
    """
    def decode(value):
        # Strings that hold valid JSON are decoded; everything else is returned untouched.
        if not isinstance(value, str):
            return value
        try:
            return json.loads(value)
        except Exception:
            return value

    flat_query = query.replace("\n", " ")
    pieces = [
        "You are a careful legal assistant. Answer ONLY from the reference passages below.\n"
        "Cite section numbers and chapter labels when available. If the answer cannot be found in the passages, say you don't know.\n"
        "Keep the answer concise and precise.\n\n"
        f"QUESTION: {flat_query}\n"
    ]

    for idx, (passage, meta) in enumerate(zip(passages, metadatas), start=1):
        flat_passage = (passage or "").replace("\n", " ")

        sec_no = decode(meta.get("section_number", "?"))
        sec_title = decode(meta.get("section_title", ""))
        chapter = decode(meta.get("chapter", ""))
        pages = decode(meta.get("page_refs", []))

        pieces.append(
            f"\nPASSAGE {idx} (Section {sec_no}: {sec_title} | {chapter} | "
            f"pages {pages}): {flat_passage}\n"
        )

    return "".join(pieces)


def rag_answer(db, embed_fn: GeminiEmbeddingFunction, query: str, top_k: int = 4, model: str = "gemini-2.0-flash"):
    """
    Answer `query` from the `top_k` most similar chunks stored in `db`.

    Parameters
    ----------
    db : Chroma collection to query.
    embed_fn : the embedding function object attached to `db`. It is switched
        to query mode only for the duration of the lookup and then put back
        to the mode it had before, so later document ingestion with the same
        object is not affected.
    query : the user question.
    top_k : number of passages to retrieve.
    model : name of the generation model.

    Returns
    -------
    dict with "answer" (model text), "passages", "metadatas", "distances",
    "ids" (all for the single query) and the full "prompt" that was sent.
    """
    previous_mode = embed_fn.document_mode
    embed_fn.document_mode = False  # embed the question with the query task type
    try:
        # "ids" is deliberately not listed in `include` (the original author
        # notes that Chroma raises on it); ids are read from the result below.
        result = db.query(
            query_texts=[query],
            n_results=top_k,
            include=["documents", "metadatas", "distances"],
        )
    finally:
        # Restore even when the query fails, so the shared object is never left in query mode.
        embed_fn.document_mode = previous_mode

    # Results are nested per query text; there is exactly one query here.
    passages  = result.get("documents", [[]])[0]
    metadatas = result.get("metadatas", [[]])[0]
    distances = result.get("distances", [[]])[0]
    ids       = result.get("ids", [[]])[0]

    prompt = make_prompt(query, passages, metadatas)
    answer = client.models.generate_content(model=model, contents=prompt)

    return {
        "answer": getattr(answer, "text", str(answer)),
        "passages": passages,
        "metadatas": metadatas,
        "distances": distances,
        "ids": ids,
        "prompt": prompt,
    }


In [None]:
import json

# ------------------------------------------
# Helper: Attempt to parse JSON-encoded metadata fields
# ------------------------------------------
def _maybe_json(v):
    """
    Safely decode metadata fields that may be stored as JSON strings.
    If parsing fails, return the original value.
    """
    if isinstance(v, str):
        try:
            return json.loads(v)
        except Exception:
            return v
    return v


# ------------------------------------------
# DEMO QUERY
# ------------------------------------------
demo_query = "What does the Constitution say the role of the King?"

# Call our RAG pipeline
result = rag_answer(db, embed_fn, demo_query, top_k=4)

# ------------------------------------------
# Display Answer
# ------------------------------------------
print("=== RAG ANSWER ===\n")
# The answer can be None when the model returns no text; print an empty line then.
print((result.get("answer") or "").strip())

# ------------------------------------------
# Show Sources (for transparency & debugging)
# ------------------------------------------
print("\n--- Sources ---")
metas = result.get("metadatas", [])
dists = result.get("distances", [])
ids   = result.get("ids", [])

for i, (md, d, cid) in enumerate(zip(metas, dists, ids), start=1):
    # Attempt to decode any JSON fields back to Python objects
    chapter   = _maybe_json(md.get("chapter"))
    sec_no    = _maybe_json(md.get("section_number"))
    sec_title = _maybe_json(md.get("section_title"))
    page_refs = _maybe_json(md.get("page_refs"))

    # Fallbacks: avoid printing "None"
    chapter   = chapter or ""
    sec_no    = sec_no or "?"
    sec_title = sec_title or ""
    page_refs = [] if page_refs is None else page_refs

    # Print citation-like reference with similarity distance (lower = closer)
    print(f"[{i}] id={cid} | {chapter} | Section {sec_no}: {sec_title} | pages {page_refs} | distance={d:.4f}")


=== RAG ANSWER ===

The executive authority of Swaziland vests in the King as Head of State and shall be exercised in accordance with the provisions of the Constitution (Section 64(1), CHAPTER VI). The King is a hereditary Head of State (Section 4(1), CHAPTER II) and a symbol of unity (Section 4(2), CHAPTER II). The King is also Commander-in-Chief of the Defence Force, Commissioner-in-Chief of the Police Service, and Commissioner-in-Chief of the Correctional Services (Section 4(3), CHAPTER II). The King and Parliament may make laws for the peace, order, and good government of Swaziland (Section 106(b), CHAPTER VII).

--- Sources ---
[1] id=6ad5e7a3-50ab-4703-bbf1-162fbf03bf24 | CHAPTER VI | Section 64: (1) The executive authority of Swaziland vests in the King as Head of State and | pages [50, 51] | distance=0.5717
[2] id=1b98da3e-f0cc-4bac-b628-cf336ebbb5ff | CHAPTER VI | Section 65: (1) In the exercise of the functions under this Constitution or any other law the | pages [51] | dista