In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [None]:
# check for GPU for faster computation
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [None]:
batch_size = 64         # number of samples to process in parallel
block_size = 256        # context window size
max_iters = 5000        # number of training iterations
eval_interval = 500     # number of iterations between evaluations
learning_rate = 3e-4    # learning rate for the optimizer
eval_iters = 200        # number of iterations to evaluate the model
n_embd = 384            # embedding dimension
n_head = 6              # number of attention heads
n_layer = 6             # number of transformer layers
dropout = 0.2           # dropout rate

In [None]:
# load the text data
with open('wizard_of_oz.txt', 'r', encoding='utf-8') as file:
    text = file.read()
chars = ''.join(sorted(set(text)))
print(chars)
vocab_size = len(chars)


 !"&'()*,-.0123456789:;?ABCDEFGHIJKLMNOPQRSTUVWXYZ[]_abcdefghijklmnopqrstuvwxyz﻿


In [None]:
stoi = { ch:i for i, ch in enumerate(chars) }     # mapping: string -> int
itos = { i:ch for i, ch in enumerate(chars) }     # mapping: int -> string
encode = lambda s: [stoi[c] for c in s]           # convert string to list of integers
decode = lambda l: ''.join([itos[i] for i in l])  # convert list of integers to string

data = torch.tensor(encode(text), dtype=torch.long)
print(data)

tensor([80, 44, 61,  ..., 29, 67, 57])


232458

In [15]:
# split the data into training (80%) and validation (20%)
n = int(0.8 * len(data))
train_data = data[:n]
val_data = data[n:]

In [17]:
def get_batch(split='train'):
    """
    Get a random batch of data from the training or validation set.
    """
    data = train_data if split == 'train' else val_data
    starting_indices = torch.randint(len(data) - block_size, (batch_size, 1)) # (batch_size, 1)
    x = torch.stack([data[i:i+block_size] for i in starting_indices]) # (batch_size, block_size)
    y = torch.stack([data[i+1:i+block_size+1] for i in starting_indices]) # (batch_size, block_size)
    x, y = x.to(device), y.to(device)
    return x, y

In [None]:
@torch.no_grad() # disable gradient computation because we are only evaluating
def estimate_loss(model):
    out = {}
    model.eval() # set the model to evaluation mode
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters) # store the loss for each iteration
        for k in range(eval_iters):
            X, Y = get_batch(split) # get random batch
            logits, loss = model.forward(X, Y) # pass through the neural network and return loss
            losses[k] = loss.item() # store the loss
        out[split] = losses.mean()
    model.train() # set the model back to training mode
    return out # {'train': train_loss, 'val': val_loss}

In [None]:
class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

In [None]:
class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [None]:
class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

In [None]:
class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [None]:
class LLM(nn.Module):
    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size) # output layer

        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model = LLM()
m = model.to(device)

In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters+1):
    if iter % eval_interval == 0:
        losses = estimate_loss(model)
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    xb, yb = get_batch('train') # get a random batch of data

    logits, loss = model(xb, yb) # forward pass through the neural network, compute logits and loss
    optimizer.zero_grad(set_to_none=True) # clear previous gradients
    loss.backward() # backpropagate to compute gradients
    optimizer.step() # update the model weights

In [22]:
context = torch.zeros((1, 1), dtype=torch.long, device=device) # initialize context
print(decode(m.generate(context, max_new_tokens=500)[0].tolist())) # generate text


gaing soring Veres annouched of the girles eachd insonse;
ficely to from up pitlet. But soome Wizard prosething apporen there
mitter one for rom the buggy and of they the back with a Land see reantted
olklver walk the know Kinged his ladt akige and of struesh turt had Tile overyfor dingscopent
somently eep siving sily.

"Ohad buends," down, "he rettle bigs. "I've do milto
me.
"I a have was you centlet," retureked und turnedied. "But I'm treetter
peclue sitenting out is yhis arve is,eennow deer w
