In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
from model import TinyGPT
from data import CharDataset, CharTokenizer

In [27]:
import requests, os

url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
if not os.path.exists("input.txt"):
    with open("input.txt", "w", encoding="utf-8") as f:
        f.write(requests.get(url).text)

with open("input.txt", "r", encoding="utf-8") as f:
    text = f.read()

print("Dataset length:", len(text))
print("Sample:\n", text[:500])


Dataset length: 1115394
Sample:
 First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor


### Create BPE Tokenizer

In [None]:
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders, normalizers

# Create a BPE tokenizer
tokenizer = Tokenizer(models.BPE())
tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel()

tokenizer.normalizer = normalizers.Sequence([
    normalizers.NFD(), # splits accented letters into base + accent mark
    normalizers.StripAccents() # removes the accent mark
])

tokenizer.decoder = decoders.ByteLevel()

trainer = trainers.BpeTrainer(vocab_size=5000, special_tokens=["<pad>", "<unk>"])
tokenizer.train(files=["input.txt"], trainer=trainer)

# Save and reload
tokenizer.save("bpe_tokenizer.json")

In [63]:
from tokenizers import Tokenizer
tokenizer = Tokenizer.from_file("bpe_tokenizer.json")

encoded = tokenizer.encode("Hello world!")
print(encoded.tokens)   # subword tokens
print(encoded.ids)      # corresponding IDs


['ĠH', 'ello', 'Ġworld', '!']
[141, 3709, 696, 2]


In [None]:
def encode(text):
    return tokenizer.encode(text).ids

def decode(ids):
    return tokenizer.decode(ids)

# Encode full dataset
with open("input.txt", "r", encoding="utf-8") as f:
    text = f.read()

data = torch.tensor(encode(text), dtype=torch.long)

# Train/val split
n = int(0.9 * len(data))
train_data, val_data = data[:n], data[n:]

dataset = {
    "train": train_data,
    "val": val_data
}

vocab_size = tokenizer.get_vocab_size()
block_size = 128

In [31]:
# # Example toy dataset
# text = "abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ"
# tok = CharTokenizer(text)
# print("Vocab size:", tok.vocab_size)

# # Encode the full text into tokens
# data = torch.tensor(tok.encode(text), dtype=torch.long)

# # 90% train, 10% val split
# n = int(0.9 * len(data))
# train_data = data[:n]
# val_data = data[n:]

# dataset = {
#     "train": train_data,
#     "val": val_data
# }

# block_size = 8
# vocab_size = tok.vocab_size


In [32]:
def get_batch(split, data, batch_size=4, block_size=8, device="cuda"):
    # split: "train" or "val"
    # data: dict with {"train": tensor, "val": tensor}
    
    # pick the right dataset
    data_split = data[split]

    # pick random starting indices
    ix = torch.randint(len(data_split) - block_size, (batch_size,))
    
    # slice out input (x) and target (y)
    x = torch.stack([data_split[i:i+block_size] for i in ix])
    y = torch.stack([data_split[i+1:i+block_size+1] for i in ix])
    
    return x.to(device), y.to(device)

In [33]:
import torch.nn.functional as F
@torch.no_grad()
def evaluate(model, data, split, batch_size=32, block_size=8, device="cpu"):
    model.eval()
    losses = []
    for _ in range(100):  # sample 100 mini-batches for val
        x, y = get_batch(split, data, batch_size=batch_size, block_size=block_size, device=device)
        logits = model(x)
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), y.view(-1))
        losses.append(loss.item())
    return sum(losses) / len(losses)


In [34]:
import math
import csv

device = "cpu"
print("Using device:", device)

model = TinyGPT(
    vocab_size=vocab_size,
    d_model=128,
    n_heads=4,
    n_layers=2,
    max_seq_len=block_size
).to(device)

optimizer = optim.AdamW(model.parameters(), lr=3e-4)
loss_fn = nn.CrossEntropyLoss()

with open("metrics/metrics.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["step", "train_loss", "val_loss", "val_ppl"])

# Training loop
for step in range(2500):
    # 1. get batch
    x, y = get_batch("train", dataset, block_size=block_size, device=device)

    # 2. forward
    logits = model(x)

    # 3. compute loss
    loss = loss_fn(logits.view(-1, logits.size(-1)), y.view(-1))

    # 4. backward
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if step % 50 == 0:
        val_loss = evaluate(model, dataset, "val", block_size=block_size, device=device)
        train_loss = loss.item()
        with open("metrics/metrics.csv", "a", newline="") as f:
            writer = csv.writer(f)
            writer.writerow([step, train_loss, val_loss, math.exp(val_loss)])
        print(f"Step {step}, train loss = {train_loss:.4f}, val loss = {val_loss:.4f}, val ppl = {math.exp(val_loss):.2f}")


Using device: cpu
Step 0, train loss = 85.9706, val loss = 84.1309, val ppl = 3448178346347383671907880052107247616.00
Step 50, train loss = 29.7327, val loss = 28.0439, val ppl = 1511179072050.17
Step 100, train loss = 22.0310, val loss = 20.8129, val ppl = 1093793479.46
Step 150, train loss = 18.4586, val loss = 17.4859, val ppl = 39268022.10
Step 200, train loss = 14.2242, val loss = 15.2872, val ppl = 4356382.78
Step 250, train loss = 11.9233, val loss = 13.7279, val ppl = 916148.13
Step 300, train loss = 12.8864, val loss = 12.5391, val ppl = 279025.76
Step 350, train loss = 10.2429, val loss = 11.6767, val ppl = 117799.57
Step 400, train loss = 13.2997, val loss = 11.1662, val ppl = 70702.62
Step 450, train loss = 12.7010, val loss = 10.4171, val ppl = 33425.61
Step 500, train loss = 9.0996, val loss = 9.9958, val ppl = 21934.90
Step 550, train loss = 8.8640, val loss = 9.5351, val ppl = 13836.67
Step 600, train loss = 6.8515, val loss = 9.1199, val ppl = 9135.46
Step 650, train 

In [55]:
import torch
import torch.nn.functional as F

@torch.no_grad()
def sample(
    model,
    tokenizer,
    start,
    max_new_tokens: int = 50,
    temperature: float = 1.0,
    top_k: int | None = None,
    top_p: float | None = None,
    device: str | None = None,
):
    """
    Robust autoregressive sampling for TinyGPT-like models.

    - model: your TinyGPT instance (should have model.pos_emb)
    - tokenizer: your CharTokenizer with encode()/decode()
    - start: prompt string
    - max_new_tokens: how many tokens to generate
    - temperature: >0 float (small <1 => deterministic), 0 => greedy
    - top_k: keep only top_k logits (int) if not None
    - top_p: nucleus probability threshold (0<p<1) if not None
    """
    model.eval()
    if device is None:
        device = next(model.parameters()).device

    # encode prompt
    if isinstance(start, str):
        input_ids = tokenizer.encode(start).ids
    elif isinstance(start, list):
        input_ids = start
    else:
        raise ValueError("start must be a string or list of token IDs")
    
    if len(input_ids) == 0:
        raise ValueError("Prompt must be non-empty (or handle a default token).")
    # make tensor shape (1, seq)
    generated = torch.tensor([input_ids], dtype=torch.long, device=device)

    # max context length supported by the model's positional embeddings
    # we read num_embeddings (safe even if model doesn't store max_seq_len explicitly)
    max_context = model.pos_emb.num_embeddings

    for _ in range(max_new_tokens):
        # trim to model context window (use only the last max_context tokens)
        if generated.size(1) > max_context:
            input_ids_tensor = generated[:, -max_context:]
        else:
            input_ids_tensor = generated

        # forward pass to get logits: shape (1, seq, vocab)
        logits = model(input_ids_tensor)  
        logits = logits[:, -1, :]  # take logits for the last position -> shape (1, vocab)

        # temperature handling
        if temperature == 0:
            # greedy
            next_token = torch.argmax(logits, dim=-1, keepdim=True)  # shape (1,1)
            generated = torch.cat((generated, next_token), dim=1)
            continue
        else:
            logits = logits / float(temperature)

        # top-k filtering (optional)
        if top_k is not None and top_k > 0:
            top_k = min(top_k, logits.size(-1))  # don't exceed vocab
            # get the kth largest logit value for each batch row and mask below it
            values, _ = torch.topk(logits, top_k, dim=-1)
            min_values = values[..., -1, None]  # threshold
            logits = torch.where(logits < min_values, torch.full_like(logits, float("-inf")), logits)

        # top-p (nucleus) filtering (optional)
        if top_p is not None and 0.0 < top_p < 1.0:
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            sorted_probs = F.softmax(sorted_logits, dim=-1)
            cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

            # mask tokens with cumulative prob > top_p
            sorted_mask = cumulative_probs > top_p
            # keep at least one token: shift mask right so first token included
            sorted_mask[..., 1:] = sorted_mask[..., :-1].clone()
            sorted_mask[..., 0] = False

            # now set logits of tokens to remove to -inf
            indices_to_remove = sorted_mask.scatter(-1, sorted_indices, sorted_mask)
            logits = logits.masked_fill(indices_to_remove, float("-inf"))

        # convert to probabilities
        probs = F.softmax(logits, dim=-1)  # (1, vocab)

        # sample
        next_token = torch.multinomial(probs, num_samples=1)  # (1,1)
        generated = torch.cat((generated, next_token), dim=1)

    # return the entire sequence as text (prompt + generated)
    return tokenizer.decode(generated[0].tolist())


In [108]:
start_ids = tokenizer.encode("You are all resolved").ids

text = sample(model, tokenizer=tokenizer, start=start_ids, max_new_tokens=30,
              temperature=0.8, top_k=20, top_p=0.9, device="cpu")
print(text)


 You are all resolved what what what what what what what what what what what what what what what what what what what what what what what what what what what what what what


In [64]:
encoded = tokenizer.encode("Hello world!")
print(encoded.tokens)  # ['ĠH', 'ello', 'Ġworld', '!']
print(tokenizer.decode(encoded.ids))  # Hello world!


['ĠH', 'ello', 'Ġworld', '!']
 Hello world!
