# Chunking Testing Notebook
An isolated environment to experiment with different document loaders, chunking strategies, and parameters — and visualize the results before committing to an approach.

## Part 1: Setup

In [None]:
import os
import glob
import tiktoken
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from IPython.display import display, HTML
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    TokenTextSplitter,
)

# ── Tokenizer (swap model name to match your LLM) ──────────────────────────
TOKENIZER_MODEL = "gpt-4o"   # used only for token counting, not for embeddings
try:
    encoding = tiktoken.encoding_for_model(TOKENIZER_MODEL)
except KeyError:
    # tiktoken only maps OpenAI model names and raises KeyError for anything
    # else. Fall back to the gpt-4o encoding so the notebook still runs;
    # token counts are then an approximation for the chosen model.
    print(
        f"WARNING: tiktoken has no tokenizer for '{TOKENIZER_MODEL}'; "
        "falling back to 'o200k_base' — token counts are approximate."
    )
    encoding = tiktoken.get_encoding("o200k_base")

def count_tokens(text: str) -> int:
    """Return the number of tokens `text` encodes to with the notebook tokenizer.

    Special-token literals that happen to appear in a document (for example
    "<|endoftext|>") are counted as ordinary text. With tiktoken's default
    settings they would raise ValueError and abort the whole analysis.
    """
    return len(encoding.encode(text, disallowed_special=()))

print("Setup complete ✓")

## Part 2: Load Documents

> **TIP — Swapping loaders:** This cell uses `DirectoryLoader + TextLoader` by default.
> To try a different loader, comment out the current block and uncomment one of the alternatives below.
> All loaders produce a list of `Document` objects with `.page_content` and `.metadata`, so the rest of the notebook works unchanged.

In [None]:
# ── CONFIGURE: point this at your data ────────────────────────────────────
DATA_PATH = "../../data/knowledge-base"
# --------------------------------------------------------------------------


# ══════════════════════════════════════════════════════════════════════════
# LOADER OPTION 1 (active): DirectoryLoader + TextLoader
# Good for: plain text, markdown files organised in folders.
# ══════════════════════════════════════════════════════════════════════════
# Each immediate sub-folder of DATA_PATH is treated as one category; its name
# is stored on every document it contains as metadata["doc_type"].
# Only directories are kept: glob("*") also returns stray files (README,
# .DS_Store, ...) and DirectoryLoader fails when pointed at a file.
# Sorting keeps the category order stable from run to run.
folders = sorted(
    path for path in glob.glob(os.path.join(DATA_PATH, "*")) if os.path.isdir(path)
)
if not folders:
    print(f"WARNING: no sub-folders found under '{os.path.abspath(DATA_PATH)}' — check DATA_PATH")

documents = []
for folder in folders:
    doc_type = os.path.basename(folder)
    loader = DirectoryLoader(
        folder,
        glob="**/*.md",
        loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf-8"},
    )
    for doc in loader.load():
        doc.metadata["doc_type"] = doc_type
        documents.append(doc)


# ══════════════════════════════════════════════════════════════════════════
# LOADER OPTION 2 (swap in): UnstructuredMarkdownLoader
# Good for: richer markdown parsing (tables, headers preserved as elements).
# ══════════════════════════════════════════════════════════════════════════
# from langchain_community.document_loaders import UnstructuredMarkdownLoader
# files = glob.glob(os.path.join(DATA_PATH, "**/*.md"), recursive=True)
# documents = []
# for fp in files:
#     docs = UnstructuredMarkdownLoader(fp).load()
#     for doc in docs:
#         doc.metadata["doc_type"] = os.path.basename(os.path.dirname(fp))
#         documents.append(doc)


# ══════════════════════════════════════════════════════════════════════════
# LOADER OPTION 3 (swap in): PyPDFLoader
# Good for: PDFs — each page becomes a Document.
# ══════════════════════════════════════════════════════════════════════════
# from langchain_community.document_loaders import PyPDFLoader
# files = glob.glob(os.path.join(DATA_PATH, "**/*.pdf"), recursive=True)
# documents = []
# for fp in files:
#     docs = PyPDFLoader(fp).load()
#     for doc in docs:
#         doc.metadata["doc_type"] = os.path.basename(os.path.dirname(fp))
#         documents.append(doc)


# ══════════════════════════════════════════════════════════════════════════
# LOADER OPTION 4 (swap in): CSVLoader
# Good for: structured/tabular data — each row becomes a Document.
# ══════════════════════════════════════════════════════════════════════════
# from langchain_community.document_loaders import CSVLoader
# files = glob.glob(os.path.join(DATA_PATH, "**/*.csv"), recursive=True)
# documents = []
# for fp in files:
#     docs = CSVLoader(fp).load()
#     for doc in docs:
#         doc.metadata["doc_type"] = os.path.basename(os.path.dirname(fp))
#         documents.append(doc)


# ══════════════════════════════════════════════════════════════════════════
# Corpus-level totals across every loaded document, before any chunking.
print(f"Loaded {len(documents)} documents")
total_chars = sum(map(lambda loaded: len(loaded.page_content), documents))
total_tokens = sum(map(lambda loaded: count_tokens(loaded.page_content), documents))
print(f"Total characters : {total_chars:,}")
print(f"Total tokens     : {total_tokens:,}  (model: {TOKENIZER_MODEL})")

### Quick peek at a raw document

In [None]:
DOC_INDEX = 0   # ← change to inspect a different document

# Show one loaded document: its metadata, size stats, and the first
# 2,000 characters of its raw text.
doc = documents[DOC_INDEX]
raw_text = doc.page_content
print(f"Metadata : {doc.metadata}")
print(f"Characters: {len(raw_text):,}")
print(f"Tokens    : {count_tokens(raw_text):,}")
print("─" * 60)
print(raw_text[:2000])
if len(raw_text) > 2000:
    print("\n... [truncated — set DOC_INDEX or slice further to see more]")

## Part 3: Define Chunking Strategies

> **TIP:** Edit the `STRATEGIES` dict to add, remove, or tweak any strategy.
> Each entry is just a LangChain splitter instance — the analysis cells below will pick them all up automatically.

In [None]:
# ── CONFIGURE: add / edit / comment out strategies here ────────────────────
# Maps a display label → a configured LangChain splitter instance.
# The label becomes the key in `results` and the name shown in the comparison
# table and plot legends. Labels here follow "<Splitter> · <chunk_size>/<chunk_overlap>".
STRATEGIES = {

    # ── Active strategies ──────────────────────────────────────────────────

    "Recursive · 1000/200": RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    ),

    # ── Swap in / experiment with any of the below ─────────────────────────

    # "Recursive · 500/50": RecursiveCharacterTextSplitter(
    #     chunk_size=500, chunk_overlap=50
    # ),

    # "Recursive · 2000/400": RecursiveCharacterTextSplitter(
    #     chunk_size=2000, chunk_overlap=400
    # ),

    # "Character · newline": CharacterTextSplitter(
    #     separator="\n", chunk_size=1000, chunk_overlap=200
    # ),

    # NOTE(review): TokenTextSplitter tokenizes with its own encoding ("gpt2"
    # unless configured), which may differ from TOKENIZER_MODEL used by
    # count_tokens — consider passing model_name=TOKENIZER_MODEL so the token
    # stats below line up with chunk_size. Confirm against the LangChain docs.
    # "Token · 256/32": TokenTextSplitter(
    #     chunk_size=256, chunk_overlap=32
    # ),

    # MarkdownHeaderTextSplitter is special-cased by the run loop below because
    # it splits raw text rather than Document objects.
    # "Markdown Headers": MarkdownHeaderTextSplitter(
    #     headers_to_split_on=[("#", "h1"), ("##", "h2"), ("###", "h3")]
    # ),
}
# ──────────────────────────────────────────────────────────────────────────


# Apply every configured splitter to the loaded documents and keep, per
# strategy label, the chunks plus their character and token lengths.
results = {}
for name, splitter in STRATEGIES.items():
    if not isinstance(splitter, MarkdownHeaderTextSplitter):
        chunks = splitter.split_documents(documents)
    else:
        # MarkdownHeaderTextSplitter works on raw strings, so feed it each
        # document's text and copy the source metadata onto every piece.
        chunks = []
        for source_doc in documents:
            for piece in splitter.split_text(source_doc.page_content):
                piece.metadata.update(source_doc.metadata)
                chunks.append(piece)

    chunk_texts = [c.page_content for c in chunks]
    results[name] = {
        "chunks": chunks,
        "sizes": [len(text) for text in chunk_texts],
        "tokens": [count_tokens(text) for text in chunk_texts],
    }
    print(f"{name:35s} → {len(chunks):>5} chunks")

## Part 4: Strategy Comparison Table
High-level stats across all active strategies at a glance.

In [None]:
# Build one summary row per strategy: chunk count plus avg/min/max sizes in
# characters and tokens. A strategy that produced no chunks (for example when
# no documents were loaded) gets zeros instead of crashing on min([]) or
# int(nan).
rows = []
for name, r in results.items():
    s, t = r["sizes"], r["tokens"]
    rows.append({
        "Strategy"       : name,
        "# Chunks"       : len(s),
        "Avg chars"      : int(np.mean(s)) if s else 0,
        "Min chars"      : min(s, default=0),
        "Max chars"      : max(s, default=0),
        "Avg tokens"     : int(np.mean(t)) if t else 0,
        "Min tokens"     : min(t, default=0),
        "Max tokens"     : max(t, default=0),
        "Total tokens"   : sum(t),
    })

# Naming the columns explicitly keeps set_index working when `rows` is empty.
compare_columns = [
    "Strategy", "# Chunks",
    "Avg chars", "Min chars", "Max chars",
    "Avg tokens", "Min tokens", "Max tokens", "Total tokens",
]
df_compare = pd.DataFrame(rows, columns=compare_columns).set_index("Strategy")
if df_compare.empty:
    print("No strategies to compare — add at least one entry to STRATEGIES and re-run.")
else:
    display(df_compare.style.background_gradient(cmap="Blues", subset=["# Chunks", "Avg chars", "Avg tokens"]))

## Part 5: Chunk Size Distribution
Spot outliers — chunks that are suspiciously tiny or unexpectedly large.

In [None]:
# One semi-transparent histogram per strategy, overlaid on shared axes so the
# character-length distributions can be compared directly.
fig = go.Figure(
    data=[
        go.Histogram(x=stats["sizes"], name=label, opacity=0.7, nbinsx=40)
        for label, stats in results.items()
    ]
)
fig.update_layout(
    width=900,
    height=450,
    barmode="overlay",
    title="Chunk Size Distribution (characters)",
    xaxis_title="Characters per chunk",
    yaxis_title="Number of chunks",
    legend={"x": 0.75, "y": 0.95},
)
fig.show()

In [None]:
# Same plot but for TOKENS: one overlaid histogram of tokens-per-chunk
# for each strategy.
fig = go.Figure()
for label, stats in results.items():
    token_hist = go.Histogram(x=stats["tokens"], name=label, opacity=0.7, nbinsx=40)
    fig.add_trace(token_hist)

token_layout = {
    "title": "Chunk Token Distribution",
    "barmode": "overlay",
    "xaxis_title": "Tokens per chunk",
    "yaxis_title": "Number of chunks",
    "width": 900,
    "height": 450,
    "legend": {"x": 0.75, "y": 0.95},
}
fig.update_layout(**token_layout)
fig.show()

## Part 6: Overlap Visualizer
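Highlights the shared text between consecutive chunks so you can *see* your overlap strategy working (or not). Note that the last chunk of one document and the first chunk of the next share no text, so zero overlap at a document boundary is expected.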

In [None]:
def find_overlap(text_a: str, text_b: str) -> str:
    """Find the longest text that both ends `text_a` and starts `text_b`.

    Candidate prefixes of `text_b` are tried from longest to shortest; the
    first one that `text_a` ends with is returned. An empty string means the
    two texts share no boundary overlap.
    """
    longest_possible = min(len(text_a), len(text_b))
    prefixes = (text_b[:length] for length in range(longest_possible, 0, -1))
    return next((prefix for prefix in prefixes if text_a.endswith(prefix)), "")


def render_overlap_html(chunk_a: str, chunk_b: str, overlap: str) -> str:
    """Return an HTML string showing two chunks side by side with their overlap highlighted.

    Args:
        chunk_a: Text of the first chunk; the overlap is marked at its last occurrence.
        chunk_b: Text of the second chunk; the overlap is marked at its first occurrence.
        overlap: The shared text to highlight. An empty string disables highlighting.

    Returns:
        An HTML fragment: two bordered panels plus a caption giving the overlap
        length. All chunk text is HTML-escaped, so markup characters inside a
        document (`<`, `>`, `&`) are displayed literally instead of being
        interpreted by the browser.
    """
    # Imported by name: other cells in this notebook bind `html` to a string.
    from html import escape

    pre_open = "<pre style='white-space:pre-wrap;font-size:13px'>"

    def _highlight(text, overlap, position):
        """Wrap `text` in a <pre>, marking `overlap` at the tail or head."""
        if not overlap:
            return f"{pre_open}{escape(text, quote=False)}</pre>"
        idx = text.rfind(overlap) if position == "tail" else text.find(overlap)
        if idx == -1:
            return f"{pre_open}{escape(text, quote=False)}</pre>"
        # Slice the raw text first and escape each piece afterwards, so the
        # indices are never computed on already-escaped text.
        before = text[:idx]
        after = text[idx + len(overlap):]
        return (
            f"{pre_open}"
            f"{escape(before, quote=False)}"
            f"<mark style='background:#ffe066;padding:0'>{escape(overlap, quote=False)}</mark>"
            f"{escape(after, quote=False)}</pre>"
        )

    markup = (
        "<div style='display:flex;gap:16px'>"
        "<div style='flex:1;border:1px solid #ccc;border-radius:6px;padding:12px'>"
        f"<b>Chunk A</b> ({len(chunk_a)} chars)"
        + _highlight(chunk_a, overlap, "tail") +
        "</div>"
        "<div style='flex:1;border:1px solid #ccc;border-radius:6px;padding:12px'>"
        f"<b>Chunk B</b> ({len(chunk_b)} chars)"
        + _highlight(chunk_b, overlap, "head") +
        "</div>"
        "</div>"
        f"<p style='color:#666;font-size:12px'>Overlap: <b>{len(overlap)}</b> characters highlighted in yellow</p>"
    )
    return markup


print("Overlap visualizer ready — run the cell below to inspect a pair of chunks.")

In [None]:
# ── CONFIGURE ──────────────────────────────────────────────────────────────
STRATEGY_TO_INSPECT = list(STRATEGIES.keys())[0]   # ← swap strategy name here
CHUNK_INDEX         = 0                             # ← inspect chunks N and N+1
# ──────────────────────────────────────────────────────────────────────────

chunks = results[STRATEGY_TO_INSPECT]["chunks"]

# Resolve Python-style negative indices up front, then make sure chunk N+1
# exists. Without this check the last index raises a bare IndexError and -1
# silently pairs the last chunk with the first one.
pair_start = CHUNK_INDEX + len(chunks) if CHUNK_INDEX < 0 else CHUNK_INDEX
if not 0 <= pair_start < len(chunks) - 1:
    raise IndexError(
        f"CHUNK_INDEX={CHUNK_INDEX} has no following chunk to compare with: "
        f"'{STRATEGY_TO_INSPECT}' produced {len(chunks)} chunk(s)."
    )

ca, cb = chunks[pair_start].page_content, chunks[pair_start + 1].page_content
overlap = find_overlap(ca, cb)
display(HTML(render_overlap_html(ca, cb, overlap)))

## Part 7: Raw Chunk Inspector
Page through individual chunks to read the actual text and evaluate quality directly.

In [None]:
# ── CONFIGURE ──────────────────────────────────────────────────────────────
STRATEGY_TO_INSPECT = list(STRATEGIES)[0]           # ← swap strategy name here
CHUNK_INDEX         = 0                             # ← which chunk to display
# ──────────────────────────────────────────────────────────────────────────

# Print a single chunk in full, preceded by a short header with its position,
# size in characters and tokens, and metadata.
chunks = results[STRATEGY_TO_INSPECT]["chunks"]
chunk  = chunks[CHUNK_INDEX]
chunk_text = chunk.page_content

print(f"Strategy : {STRATEGY_TO_INSPECT}")
print(f"Chunk    : {CHUNK_INDEX + 1} / {len(chunks)}")
print(f"Chars    : {len(chunk_text):,}")
print(f"Tokens   : {count_tokens(chunk_text):,}")
print(f"Metadata : {chunk.metadata}")
print("─" * 60)
print(chunk_text)

In [None]:
# ── Batch inspector: display multiple chunks at once ───────────────────────
# ── CONFIGURE ──────────────────────────────────────────────────────────────
STRATEGY_TO_INSPECT = list(STRATEGIES.keys())[0]
START_INDEX = 0
END_INDEX   = 5    # exclusive — shows chunks START_INDEX to END_INDEX - 1
# ──────────────────────────────────────────────────────────────────────────

# Imported by name because this cell binds `html` to the rendered string.
from html import escape

chunks = results[STRATEGY_TO_INSPECT]["chunks"]
shown = chunks[START_INDEX:END_INDEX]

# Build one table row per chunk. Every piece of document-derived text is
# HTML-escaped so markup inside a chunk is shown literally rather than
# interpreted, and rows are joined once instead of concatenated repeatedly.
cell_style = "padding:6px;vertical-align:top"
row_parts = []
for i, chunk in enumerate(shown, start=START_INDEX):
    content = chunk.page_content
    preview = escape(content[:500], quote=False) + ("..." if len(content) > 500 else "")
    doc_type = escape(str(chunk.metadata.get("doc_type", "—")), quote=False)
    row_parts.append(
        f"<tr>"
        f"<td style='{cell_style};font-weight:bold'>#{i}</td>"
        f"<td style='{cell_style}'>{doc_type}</td>"
        f"<td style='{cell_style}'>{len(content):,}</td>"
        f"<td style='{cell_style}'>{count_tokens(content):,}</td>"
        f"<td style='{cell_style};white-space:pre-wrap;font-size:12px;max-width:600px'>"
        f"{preview}"
        f"</td>"
        f"</tr>"
    )
rows_html = "".join(row_parts)

# Report the range actually displayed, which can be shorter than requested.
if shown:
    range_label = f"chunks {START_INDEX}–{START_INDEX + len(shown) - 1}"
else:
    range_label = "no chunks in the requested range"

html = (
    f"<h4>{escape(str(STRATEGY_TO_INSPECT), quote=False)} — {range_label}</h4>"
    "<table border='1' style='border-collapse:collapse;font-size:13px;width:100%'>"
    "<tr style='background:#f0f0f0'>"
    "<th>Index</th><th>Doc type</th><th>Chars</th><th>Tokens</th><th>Content preview</th>"
    "</tr>"
    + rows_html +
    "</table>"
)
display(HTML(html))

## Part 8: Find Outlier Chunks
Surface the smallest and largest chunks — useful for spotting broken splits or runaway chunks.

In [None]:
# ── CONFIGURE ──────────────────────────────────────────────────────────────
STRATEGY_TO_INSPECT = list(STRATEGIES)[0]
TOP_N = 5   # how many smallest and largest to show
# ──────────────────────────────────────────────────────────────────────────

# Order (original index, chunk) pairs by character length, shortest first,
# then print the TOP_N shortest in full and the TOP_N longest as previews.
chunks = results[STRATEGY_TO_INSPECT]["chunks"]
indexed = sorted(enumerate(chunks), key=lambda pair: len(pair[1].page_content))
rule = "─" * 60

print(f"{rule}\n  {TOP_N} SMALLEST chunks — {STRATEGY_TO_INSPECT}\n{rule}")
for idx, chunk in indexed[:TOP_N]:
    body = chunk.page_content
    print(f"\n[Chunk #{idx} | {len(body)} chars | {count_tokens(body)} tokens]")
    print(body)

print(f"\n{rule}\n  {TOP_N} LARGEST chunks — {STRATEGY_TO_INSPECT}\n{rule}")
for idx, chunk in reversed(indexed[-TOP_N:]):
    body = chunk.page_content
    print(f"\n[Chunk #{idx} | {len(body)} chars | {count_tokens(body)} tokens]")
    # Long chunks are cut to a 500-character preview.
    print(body[:500], "..." if len(body) > 500 else "")

## Part 9: Search Within Chunks
Check whether a specific concept, term, or sentence ended up split across chunks or kept intact.

In [None]:
# ── CONFIGURE ──────────────────────────────────────────────────────────────
STRATEGY_TO_INSPECT = list(STRATEGIES)[0]
SEARCH_TERM = "your search term here"   # ← case-insensitive substring match
# ──────────────────────────────────────────────────────────────────────────

# Collect every chunk whose lower-cased text contains the lower-cased term,
# then show up to 100 characters of context either side of the first hit.
needle   = SEARCH_TERM.lower()
chunks   = results[STRATEGY_TO_INSPECT]["chunks"]
matches  = [
    (i, c)
    for i, c in enumerate(chunks)
    if c.page_content.lower().find(needle) != -1
]

print(f"Found '{SEARCH_TERM}' in {len(matches)} / {len(chunks)} chunks\n")
for idx, chunk in matches:
    text = chunk.page_content
    pos  = text.lower().find(needle)
    snippet_start = max(0, pos - 100)
    snippet_end   = min(len(text), pos + len(SEARCH_TERM) + 100)
    snippet = text[snippet_start:snippet_end]
    # Ellipses mark where the snippet was cut from the surrounding chunk text.
    lead  = "..." if snippet_start > 0 else ""
    trail = "..." if snippet_end < len(text) else ""
    print(f"─── Chunk #{idx} ({len(text)} chars) ───")
    print(" ".join([lead, snippet, trail]))
    print()
    print()