In [None]:
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import ByteLevel
from tokenizers.decoders import ByteLevel as ByteLevelDecoder
from collections import Counter
import numpy as np
from scipy.optimize import curve_fit

In [None]:
# ---------------------------
# 1. Load TinyStories corpus
# ---------------------------

def load_tinystories(split="train", max_examples=None):
    """
    Load story texts from the TinyStories dataset on the Hugging Face Hub.

    split: name of the dataset split handed to `load_dataset`.
    max_examples: optional cap, applied as the slice `[:max_examples]`.
        None returns every story in the split.

    Returns a list of story strings, read from the "text" column when the
    split has one and from the "story" column otherwise.
    """
    ds = load_dataset("roneneldan/TinyStories", split=split)
    column = "text" if "text" in ds.column_names else "story"

    if max_examples is None:
        # list(...) so callers always get a plain list they can slice and
        # pass to random.sample, whatever container the column access yields.
        return list(ds[column])

    # Slice the rows first: only the requested stories are decoded into
    # Python strings, instead of materializing the whole column and then
    # throwing most of it away.
    return list(ds[:max_examples][column])

In [None]:
# ---------------------------
# 2. Train byte-level BPE tokenizer
# ---------------------------

def train_byte_bpe(texts, vocab_size, min_frequency=2, special_tokens=None):
    """
    Train a byte-level BPE tokenizer from scratch.

    texts: iterable of strings used as the training corpus.
    vocab_size: target vocabulary size given to the trainer.
    min_frequency: minimum pair frequency given to the trainer.
    special_tokens: special tokens to reserve; defaults to
        ["<pad>", "<bos>", "<eos>", "<unk>"] when None.

    Returns the trained `Tokenizer`, configured with a byte-level
    pre-tokenizer and the matching byte-level decoder.
    """
    if special_tokens is None:
        special_tokens = ["<pad>", "<bos>", "<eos>", "<unk>"]

    tokenizer = Tokenizer(BPE(unk_token="<unk>"))
    tokenizer.pre_tokenizer = ByteLevel()
    tokenizer.decoder = ByteLevelDecoder()

    trainer = BpeTrainer(
        vocab_size=vocab_size,
        min_frequency=min_frequency,
        special_tokens=special_tokens,
        # Seed the vocabulary with the complete byte-level alphabet. Without
        # it, only symbols that occur in `texts` get a token, and any other
        # byte met later would fall back to <unk>.
        initial_alphabet=ByteLevel.alphabet(),
        show_progress=True,
    )

    # train_from_iterator consumes an iterator over strings.
    tokenizer.train_from_iterator(texts, trainer=trainer)

    return tokenizer

In [None]:
# ---------------------------
# 3. Compute Zipf-style metric
# ---------------------------

def power_law(rank, a, b):
    """
    Zipf model expressed in log space.

    Takes raw (not log-transformed) ranks and returns the predicted
    log-frequency a + b * log(rank), i.e. freq ≈ exp(a) * rank^b.
    """
    log_rank = np.log(rank)
    return a + b * log_rank

def zipf_r2_from_counts(counts, top_k=None):
    """
    Fit log(count) = a + b * log(rank) and report how well the line fits.

    counts: iterable of token counts in any order. They are sorted in
        descending order here, so rank 1 is the most frequent token.
        Non-positive counts are dropped because their logarithm is
        undefined.
    top_k: if given, only the top_k largest counts enter the fit.

    Returns (r2, a, b): the R^2 of the fit in log-log space, the intercept
    a and the slope b. r2 is 0.0 when the fitted log-counts have no
    variance.

    Raises ValueError when fewer than two positive counts remain, because
    a line cannot be fitted through them.
    """
    freqs = np.array(sorted(counts, reverse=True), dtype=np.float64)
    # log(0) is -inf and would poison the least-squares fit.
    freqs = freqs[freqs > 0]

    if top_k is not None:
        freqs = freqs[:top_k]

    if freqs.size < 2:
        raise ValueError(
            "need at least two positive counts to fit a Zipf line, "
            f"got {freqs.size}"
        )

    # ranks 1..N, moved to log space together with the frequencies
    log_ranks = np.log(np.arange(1, freqs.size + 1, dtype=np.float64))
    log_freqs = np.log(freqs)

    # The model is linear in log-log space, so ordinary least squares has a
    # closed-form solution; no iterative non-linear optimizer is needed.
    # polyfit returns the highest-degree coefficient first: slope, intercept.
    b, a = np.polyfit(log_ranks, log_freqs, deg=1)

    log_freqs_pred = a + b * log_ranks

    # R^2 = 1 - residual sum of squares / total sum of squares
    ss_res = np.sum((log_freqs - log_freqs_pred) ** 2)
    ss_tot = np.sum((log_freqs - np.mean(log_freqs)) ** 2)
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0

    return r2, a, b

In [None]:
# ---------------------------
# 4. Evaluate a tokenizer on Zipf-style metric
# ---------------------------

def tokenizer_zipf_score(tokenizer, texts, max_docs=None, top_k=None):
    """
    Measure how Zipf-like a tokenizer's token-frequency distribution is.

    Encodes `texts` (only the first `max_docs` when that is given), counts
    how often each token id occurs, and fits the counts with
    `zipf_r2_from_counts`, forwarding `top_k`.

    Returns a dict with the fit results "r2", "a", "b", plus
    "total_tokens" (number of ids produced) and "vocab_in_use" (number of
    distinct ids seen).
    """
    docs = texts if max_docs is None else texts[:max_docs]

    # token id -> number of occurrences across all encoded documents
    id_counts = Counter()
    for doc in docs:
        id_counts.update(tokenizer.encode(doc).ids)

    counts = list(id_counts.values())
    n_tokens = sum(counts)
    n_distinct = len(counts)

    r2, a, b = zipf_r2_from_counts(counts, top_k=top_k)

    return {
        "r2": r2,
        "a": a,
        "b": b,
        "total_tokens": n_tokens,
        "vocab_in_use": n_distinct,
    }

In [None]:
# Hyperparameters
vocab_sizes = [4096, 8192, 16384]
max_train_examples = 20000       # stories used to train each tokenizer
max_eval_docs = 5000             # stories tokenized for the Zipf metric
top_k_zipf = 5000                # only the top-K most frequent tokens enter the fit

print("Loading TinyStories texts...")
texts = load_tinystories(split="train", max_examples=max_train_examples)
print(f"Loaded {len(texts)} examples.")

# One (vocab_size, stats) pair per trained tokenizer, in sweep order.
results = []

for vs in vocab_sizes:
    print(f"\n=== Training tokenizer with vocab_size={vs} ===")
    tok = train_byte_bpe(texts, vocab_size=vs)

    # The metric is computed on the first max_eval_docs of the same stories
    # the tokenizer was trained on.
    print("Computing Zipf-style statistics...")
    stats = tokenizer_zipf_score(tok, texts, max_docs=max_eval_docs, top_k=top_k_zipf)

    report = [
        f"Vocab size (target): {vs}",
        f"Vocab size (in use): {stats['vocab_in_use']}",
        f"Total tokens (sample): {stats['total_tokens']}",
        f"Zipf fit R^2: {stats['r2']:.4f}  (a={stats['a']:.3f}, b={stats['b']:.3f})",
    ]
    for line in report:
        print(line)

    results.append((vs, stats))

print("\n=== Summary ===")
for vs, stats in results:
    summary_row = (
        f"vocab={vs:5d} | in_use={stats['vocab_in_use']:5d} | "
        f"tokens={stats['total_tokens']:8d} | R^2={stats['r2']:.4f}"
    )
    print(summary_row)

# The winner is the vocabulary size whose fit has the highest R^2.
best_vs, best_stats = max(results, key=lambda entry: entry[1]["r2"])
print(f"\nBest by Zipf R^2: vocab_size={best_vs}, R^2={best_stats['r2']:.4f}")


Loading TinyStories texts...
Loaded 20000 examples.

=== Training tokenizer with vocab_size=4096 ===
Computing Zipf-style statistics...
Vocab size (target): 4096
Vocab size (in use): 3880
Total tokens (sample): 1052178
Zipf fit R^2: 0.8368  (a=14.309, b=-1.425)

=== Training tokenizer with vocab_size=8192 ===
Computing Zipf-style statistics...
Vocab size (target): 8192
Vocab size (in use): 7132
Total tokens (sample): 1010099
Zipf fit R^2: 0.9823  (a=13.970, b=-1.397)

=== Training tokenizer with vocab_size=16384 ===
Computing Zipf-style statistics...
Vocab size (target): 16384
Vocab size (in use): 9485
Total tokens (sample): 1001119
Zipf fit R^2: 0.9725  (a=14.448, b=-1.478)

=== Summary ===
vocab= 4096 | in_use= 3880 | tokens= 1052178 | R^2=0.8368
vocab= 8192 | in_use= 7132 | tokens= 1010099 | R^2=0.9823
vocab=16384 | in_use= 9485 | tokens= 1001119 | R^2=0.9725

Best by Zipf R^2: vocab_size=8192, R^2=0.9823


In [None]:
# `random` is needed by the story-sampling cell that follows.
import random

# Re-train the best tokenizer using best_vs.
# The sweep kept only the statistics of each run (the tokenizer variable was
# rebound on every iteration), so the winning configuration is trained again
# here on the same `texts` to obtain a tokenizer object to work with.
print(f"\n=== Re-training the best tokenizer with vocab_size={best_vs} ===")
best_tokenizer = train_byte_bpe(texts, vocab_size=best_vs)


=== Re-training the best tokenizer with vocab_size=8192 ===


In [None]:
# Select random stories from the dataset.
# A dedicated, seeded generator makes the selection identical on every
# "Restart & Run All" without touching the global `random` state.
num_sample_stories = 1
sample_rng = random.Random(0)
random_stories = sample_rng.sample(texts, num_sample_stories)

print("\n=== Encoding and Decoding Random Stories ===")
for i, story in enumerate(random_stories):
    print(f"\n--- Story {i+1} ---")
    print("Original:")
    print(story)

    # Encode the story; only a short prefix of the ids is shown to keep
    # the cell output readable.
    encoded_tokens = best_tokenizer.encode(story)
    print("\nEncoded IDs (first 20):", encoded_tokens.ids[:20])

    # Decode the full id sequence back to text for a visual round-trip check.
    decoded_story = best_tokenizer.decode(encoded_tokens.ids)
    print("\nDecoded:")
    print(decoded_story)


=== Encoding and Decoding Random Stories ===

--- Story 1 ---
Original:
Lily wanted to teach her doll how to drink wine. She saw her mom and dad drink wine sometimes, and they said it was good. Lily found a bottle of wine in the kitchen and poured some in a cup. She brought the cup and the doll to her room.

"Look, doll, this is wine. It is a drink for grown-ups. You can try some, but only a little bit, okay?" Lily said to her doll. She held the cup to the doll's mouth and pretended to make her drink. "Mmm, do you like it? It is sweet and sour."

But then, Lily heard a knock on the door. It was her mom. She opened the door and saw Lily and the doll with the cup of wine. She was very angry and surprised.

"Lily, what are you doing? Where did you get that wine? You know you are not allowed to touch that!" her mom said in a loud voice.

Lily felt embarrassed and scared. She dropped the cup and the wine spilled on the floor. She started to cry and hugged her doll.

"I'm sorry, mom. I just