In [None]:
import matplotlib.pyplot as plt
import torch
import time

from model import *

# Preset switch read by the hyperparameter cell: True selects the "large"
# model settings, False keeps the "small" ones.
USE_LARGE_MODEL = True

In [None]:
# Seed torch's RNG so batch sampling and weight initialisation are repeatable.
torch.manual_seed(42)

# Model hyperparameters: pick one of two presets.
#   block_size - length of each training sequence drawn by get_batch
#   batch_size - number of sequences per batch
if USE_LARGE_MODEL:
    # large preset
    block_size, batch_size = 256, 64
    d_model, n_heads, n_blocks = 384, 6, 6
else:
    # small preset
    block_size, batch_size = 8, 32
    d_model, n_heads, n_blocks = 32, 4, 4

# Values shared by (or derived identically in) both presets.
d_head = d_model // n_heads
d_inner = 4 * d_model
dropout = 0.2

# Training parameters
max_iters = 5000      # number of optimizer steps
eval_interval = 50    # estimate train/val loss every this many steps
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# load data
# Read the whole training corpus into one string. The encoding is passed
# explicitly so decoding does not depend on the platform's locale default.
with open("data/tiny_shakespeare.txt", "r", encoding="utf-8") as f:
    text = f.read()

# Show the first 200 characters as a quick sanity check.
print(text[:200])

In [None]:
# tokenizer
# Character-level vocabulary: each distinct character of the corpus gets an
# integer id, assigned in sorted order.
chars = sorted(set(text))
vocab_size = len(chars)

i2c = dict(enumerate(chars))           # id -> character
c2i = {c: i for i, c in i2c.items()}   # character -> id


def encode(s):
    """Return the list of ids for the characters of `s` (KeyError on unknown characters)."""
    return [c2i[c] for c in s]


def decode(l):
    """Return the string spelled by the iterable of ids `l`."""
    return "".join(i2c[i] for i in l)

In [None]:
# train and validation data
# Encode the full corpus once, then hold out the trailing 10% for validation.
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data))  # index of the first validation token

train_data, val_data = data[:n], data[n:]

In [None]:
# data loader
def get_batch(split):
    """Sample a random batch of input/target sequences.

    Args:
        split: "train" draws from `train_data`; any other value draws from
            `val_data`.

    Returns:
        A pair `(x, y)` of tensors on `device`, each stacking `batch_size`
        slices of length `block_size`. `y` is `x` shifted one position to the
        right in the source data, i.e. the next token for every position.
    """
    source = train_data if split == "train" else val_data

    # Highest valid start leaves room for block_size inputs plus one extra
    # token for the shifted target.
    starts = torch.randint(len(source) - block_size, (batch_size,))

    inputs, targets = [], []
    for start in starts:
        stop = start + block_size
        inputs.append(source[start:stop])
        targets.append(source[start + 1 : stop + 1])

    return torch.stack(inputs).to(device), torch.stack(targets).to(device)

In [None]:
# loss
def estimate_loss(model, eval_iters=200):
    """Estimate the mean loss of `model` on the train and validation splits.

    The model is put in eval mode and run with gradient tracking disabled, so
    no autograd graph is built for the evaluation forward passes. The model is
    switched back to train mode before returning, even if evaluation raises.

    Args:
        model: callable as `model(x, y)` returning `(logits, loss)`, and
            providing `eval()` / `train()`.
        eval_iters: number of random batches averaged per split.

    Returns:
        dict with keys "train" and "val", each a 0-dim tensor holding the mean
        loss over `eval_iters` batches (NaN when `eval_iters` is 0).
    """
    out = {}
    model.eval()
    try:
        # Evaluation never calls backward(), so tracking gradients would only
        # cost memory and time.
        with torch.no_grad():
            for split in ["train", "val"]:
                losses = torch.zeros(eval_iters)
                for i in range(eval_iters):
                    x, y = get_batch(split)
                    _logits, loss = model(x, y)
                    losses[i] = loss.item()
                out[split] = losses.mean()
    finally:
        # Always hand the model back in train mode, as callers expect.
        model.train()
    return out

In [None]:
# model
# Build the network from the hyperparameters chosen above, then move it to
# the selected device.
language_model = LanguageModel(
    vocab_size=vocab_size,
    block_size=block_size,
    d_model=d_model,
    n_heads=n_heads,
    d_head=d_head,
    n_blocks=n_blocks,
    d_inner=d_inner,
    dropout=dropout,
)
language_model = language_model.to(device)

# number of trainable parameters (only those with requires_grad set)
n_params = sum(
    param.numel()
    for param in filter(lambda t: t.requires_grad, language_model.parameters())
)

In [None]:
# optimizer
# AdamW over every model parameter with a fixed learning rate of 1e-3.
optimizer = torch.optim.AdamW(params=language_model.parameters(), lr=1e-3)

In [None]:
# training
# Run `max_iters` optimizer steps on random training batches, periodically
# recording the estimated train/validation loss in `loss_history`.
t_start = time.time()
loss_history = {
    "train": [],
    "val": [],
    "step": [],
}
# The loop variable is `step`, not `iter`, so the builtin iter() is not
# shadowed for the rest of the notebook session.
for step in range(max_iters):

    # Evaluate current performance every `eval_interval` steps, and also on
    # the last iteration so the history does not stop short of the end of
    # training.
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss(language_model)
        ts = (time.time() - t_start)
        # end="\r" makes each report overwrite the previous one in the output.
        print(f"step: {step:04d}, ts: {ts/60:.2f} min, train loss: {losses['train']:.4f}, val loss: {losses['val']:.4f}", end="\r")
        loss_history["train"].append(float(losses["train"]))
        loss_history["val"].append(float(losses["val"]))
        loss_history["step"].append(step)

    # next batch
    xb, yb = get_batch("train")

    # train: forward pass, clear old gradients, backpropagate, update weights
    logits, loss = language_model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print("\n")
print(f"Number of parameters: {n_params}")
# Wall-clock time for the whole cell, including the periodic evaluations.
print(f"Training time:        {(time.time() - t_start)/60:.2f} min")

In [None]:
# Plot the recorded train and validation loss against the training step.
fig, ax = plt.subplots()
ax.plot(loss_history["step"], loss_history["train"], label="train")
ax.plot(loss_history["step"], loss_history["val"], label="val")
ax.set_xlabel("step")
ax.set_ylabel("loss")
ax.set_title("Loss history")
ax.legend()
plt.show()

In [None]:
# generate text
# Sample in eval mode and without gradient tracking: the model was built with
# dropout and is left in train mode by the training cell, and no autograd
# graph is needed for sampling.
# NOTE: the model stays in eval mode after this cell; estimate_loss() switches
# it back to train mode the next time the training cell runs.
language_model.eval()

# Start from a single token with id 0, i.e. the first character of the sorted
# vocabulary; the tensor holds one sequence of length one.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
with torch.no_grad():
    # [0] selects the first entry of the result -- assumes generate() returns
    # one token-id sequence per row of `context` (TODO confirm in model.py).
    ret = language_model.generate(context, max_new_tokens=500)[0].tolist()
print(decode(ret))