In [1]:
import os, math, random, time
import numpy as np
from tqdm import tqdm
import sentencepiece as spm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader


In [2]:
def train_sentencepiece(corpus_path: str, model_prefix: str, vocab_size: int = 8000, model_type: str = "bpe"):
    """
    Train sentencepiece tokenizer from raw corpus.
    """
    cmd = f"--input={corpus_path} --model_prefix={model_prefix} --vocab_size={vocab_size} --model_type={model_type} --character_coverage=1.0 --bos_id=-1 --eos_id=-1"
    spm.SentencePieceTrainer.Train(cmd)
    print("Tokenizer saved as:", model_prefix + ".model")

def load_tokenizer(model_file: str):
    sp = spm.SentencePieceProcessor()
    sp.Load(model_file)
    return sp


In [4]:
class RandomWindowTextDataset(Dataset):
    """
    Tokenized corpus dataset: random windows of tokens.
    """
    def __init__(self, token_array: np.ndarray, seq_len: int):
        self.tokens = token_array
        self.seq_len = seq_len

    def __len__(self):
        return max(1, len(self.tokens) - self.seq_len)

    def __getitem__(self, idx):
        if len(self.tokens) <= self.seq_len + 1:
            start = 0
        else:
            start = random.randint(0, len(self.tokens) - self.seq_len - 1)
        x = self.tokens[start:start + self.seq_len].astype(np.int64)
        y = self.tokens[start + 1:start + 1 + self.seq_len].astype(np.int64)
        return torch.from_numpy(x), torch.from_numpy(y)


In [5]:
class GPTConfig:
    def __init__(self, vocab_size, block_size=128, n_layer=4, n_head=4, n_embd=256, dropout=0.1):
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.n_layer = n_layer
        self.n_head = n_head
        self.n_embd = n_embd
        self.dropout = dropout

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.n_head = config.n_head
        self.head_dim = config.n_embd // config.n_head
        self.block_size = config.block_size

        self.qkv = nn.Linear(config.n_embd, 3 * config.n_embd)
        self.out_proj = nn.Linear(config.n_embd, config.n_embd)
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        B, T, C = x.size()
        qkv = self.qkv(x)
        q, k, v = qkv.chunk(3, dim=-1)
        q = q.view(B, T, self.n_head, self.head_dim).transpose(1,2)
        k = k.view(B, T, self.n_head, self.head_dim).transpose(1,2)
        v = v.view(B, T, self.n_head, self.head_dim).transpose(1,2)

        att = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        mask = torch.tril(torch.ones(T, T, device=x.device)).view(1,1,T,T)
        att = att.masked_fill(mask==0, float("-inf"))
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)

        out = att @ v
        out = out.transpose(1,2).contiguous().view(B, T, C)
        out = self.out_proj(out)
        return self.resid_dropout(out)

class MLP(nn.Module):
    def __init__(self, n_embd, dropout):
        super().__init__()
        self.fc1 = nn.Linear(n_embd, 4*n_embd)
        self.fc2 = nn.Linear(4*n_embd, n_embd)
        self.act = nn.GELU()
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        return self.drop(self.fc2(self.act(self.fc1(x))))

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.mlp = MLP(config.n_embd, config.dropout)

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x

class TinyGPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd)
        self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd))
        self.drop = nn.Dropout(config.dropout)
        self.blocks = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

    def forward(self, idx):
        B, T = idx.shape
        tok = self.tok_emb(idx)
        pos = self.pos_emb[:, :T, :]
        x = self.drop(tok + pos)
        for block in self.blocks:
            x = block(x)
        x = self.ln_f(x)
        return self.head(x)


In [6]:
def evaluate(model, dataloader, device):
    model.eval()
    losses = []
    with torch.no_grad():
        for x,y in dataloader:
            x,y = x.to(device), y.to(device)
            logits = model(x)
            B,T,V = logits.size()
            loss = F.cross_entropy(logits.view(B*T,V), y.view(B*T))
            losses.append(loss.item())
    return np.mean(losses)

@torch.no_grad()
def generate_text(model, tokenizer, prompt, max_new_tokens=50, temperature=1.0, top_k=40, device="cpu"):
    model.eval()
    tokens = tokenizer.EncodeAsIds(prompt)
    input_ids = torch.tensor(tokens, dtype=torch.long, device=device)[None,:]
    for _ in range(max_new_tokens):
        idx_cond = input_ids[:, -model.config.block_size:]
        logits = model(idx_cond)
        logits = logits[:, -1, :] / temperature
        if top_k is not None:
            v, ix = torch.topk(logits, top_k, dim=-1)
            probs = F.softmax(v, dim=-1)
            next_id = ix[0, torch.multinomial(probs[0], 1)]
        else:
            probs = F.softmax(logits, dim=-1)
            next_id = torch.multinomial(probs[0], 1)
        input_ids = torch.cat([input_ids, next_id.view(1,1)], dim=1)
    return tokenizer.DecodeIds(input_ids[0].tolist())


In [7]:
# Change this to your dataset file path
corpus_file = "wizard_of_oz.txt"  

# Train tokenizer
if not os.path.exists("spm.model"):
    train_sentencepiece(corpus_file, "spm", vocab_size=8000)
    
tokenizer = load_tokenizer("spm.model")

# Load corpus into tokens
with open(corpus_file, "r", encoding="utf-8") as f:
    raw = f.read()
token_ids = np.array(tokenizer.EncodeAsIds(raw), dtype=np.int32)

# Split train/val
split = int(len(token_ids)*0.95)
train_ids, val_ids = token_ids[:split], token_ids[split:]


Tokenizer saved as: spm.model


In [8]:
block_size = 128
batch_size = 8

train_ds = RandomWindowTextDataset(train_ids, seq_len=block_size)
val_ds   = RandomWindowTextDataset(val_ids, seq_len=block_size)

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=batch_size)


In [9]:
device = "cpu"
cfg = GPTConfig(vocab_size=tokenizer.GetPieceSize(), block_size=block_size,
                n_layer=4, n_head=4, n_embd=256)

model = TinyGPT(cfg).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)


In [10]:
max_steps = 500  # adjust as needed
log_interval = 50

model.train()
step = 0
for epoch in range(10):  # just loop enough until steps reached
    for x,y in train_loader:
        if step >= max_steps:
            break
        x,y = x.to(device), y.to(device)
        logits = model(x)
        B,T,V = logits.size()
        loss = F.cross_entropy(logits.view(B*T,V), y.view(B*T))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step % log_interval == 0:
            val_loss = evaluate(model, val_loader, device)
            print(f"Step {step} | Train loss {loss.item():.4f} | Val loss {val_loss:.4f}")
        step += 1
    if step >= max_steps:
        break


Step 0 | Train loss 9.1077 | Val loss 8.9825
Step 50 | Train loss 5.9681 | Val loss 6.1737
Step 100 | Train loss 5.1545 | Val loss 5.7972
Step 150 | Train loss 4.8863 | Val loss 5.6213
Step 200 | Train loss 4.8833 | Val loss 5.4900
Step 250 | Train loss 4.4175 | Val loss 5.4318
Step 300 | Train loss 4.4837 | Val loss 5.4036
Step 350 | Train loss 4.0470 | Val loss 5.3530
Step 400 | Train loss 4.0362 | Val loss 5.3476
Step 450 | Train loss 4.1814 | Val loss 5.3393


In [11]:
print(generate_text(model, tokenizer, prompt="Once upon a time", max_new_tokens=200, temperature=1.0))


Once upon a time the buggy with the top of a corner. The Gargoyles, and the roof, who was so that we do." "Oh, too," said the buggy and the earth to go away." He, who had no way. As the other that no time I shall we were you when Jim, after of our own big horse, Jim, and Dorothy, for such a little man was a small crack in her face. "But in a moment her breath--or he has been horrified; "and are any way to me to this thing it. Also I suppose you will be afraid I had gone. No one of him. Just then you all my dear we can know this he said to the buggy and you are all the little Wizard, I can't you the piglets and I found the Sorcerer, so you with us up in the floor!" cried Dorothy?" asked the girl. "Why, too," continued another, but there. The Wizard now, and if I left you?" asked
