# Baseline GPT model
* One-headed decoder-only self-attention transformer model

In [38]:
import torch
import torch.nn as nn
from torch.nn import functional as F

torch.manual_seed(1337)

# wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()


# here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from characters to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]


In [39]:
batch_size, block_size, n_embed = 4, 8, 32  # AKA: B, T, C (Batch, Token, Channel)

In [40]:
# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    return x, y

x, y = get_batch('train')

In [41]:
word_embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=n_embed)
pos_embed = nn.Embedding(num_embeddings=block_size, embedding_dim=n_embed)

# word_embed(torch.tensor(123))
# word_embed(x)
# pos_embed(torch.arange(block_size))
# (word_embed(x) + pos_embed(torch.arange(block_size))).shape

In [42]:
class SA(nn.Module):
    def __init__(self):
        super().__init__()
        self.value = nn.Linear(n_embed, n_embed)
        self.key = nn.Linear(n_embed, n_embed)
        self.query = nn.Linear(n_embed, n_embed)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        
    def forward(self, x):
        B, T, C = x.shape
        v, k, q = self.value(x), self.key(x), self.query(x)
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        # perform the weighted aggregation of the values
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out
    
class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity to get probs for """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, n_embd),
            nn.ReLU(),
            nn.Linear(n_embd, vocab_size),
        )

    def forward(self, x):
        return self.net(x)

class GPT(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.word_embed = nn.Embedding(vocab_size, n_embed)
        self.pos_embed = nn.Embedding(block_size, n_embed)
        self.sa = SA()
        self.net = FeedFoward(n_embed)
        
    def forward(self, x, y=None):
        B, T = x.shape
        # print(f"*******B: {B}, T: {T}*******")
        x = self.word_embed(x)
        # print(x.shape)
        x = x + self.pos_embed(torch.arange(T))
        x = x + self.sa(x) # Residual connection
        logits = self.net(x)
        
        if not y: 
            loss = None
        else:
            # https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
            
        return logits, loss # loss is needed for training steps. logits is needed as probs for token generation.
    
    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

In [43]:
idx = torch.zeros((1, 1), dtype=torch.long)
gpt = GPT()

In [44]:
optimizer = torch.optim.AdamW(gpt.parameters(), lr=3e-4)

max_iters = 1_000
for iter in range(max_iters):

    # # every once in a while evaluate the loss on train and val sets
    # if iter % eval_interval == 0 or iter == max_iters - 1:
    #     losses = estimate_loss()
    #     print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = gpt(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()


RuntimeError: Boolean value of Tensor with more than one value is ambiguous

In [33]:
for _ in range(10):
    print(_)
    print(f"idx: {idx}")
    idx_cond = idx[:, -block_size:]
    print(f"idx_cond: {idx_cond}")
    # get the predictions
    logits, loss = gpt(idx_cond)
    # focus only on the last time step
    logits = logits[:, -1, :] # becomes (B, C)
    # apply softmax to get probabilities
    probs = F.softmax(logits, dim=-1) # (B, C)
    # sample from the distribution
    idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
    print(f"idx_next: {idx_next}")
    # append sampled index to the running sequence
    idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
    print("=========\n")


0
idx: tensor([[0]])
idx_cond: tensor([[0]])
idx_next: tensor([[24]])

1
idx: tensor([[ 0, 24]])
idx_cond: tensor([[ 0, 24]])
idx_next: tensor([[49]])

2
idx: tensor([[ 0, 24, 49]])
idx_cond: tensor([[ 0, 24, 49]])
idx_next: tensor([[18]])

3
idx: tensor([[ 0, 24, 49, 18]])
idx_cond: tensor([[ 0, 24, 49, 18]])
idx_next: tensor([[63]])

4
idx: tensor([[ 0, 24, 49, 18, 63]])
idx_cond: tensor([[ 0, 24, 49, 18, 63]])
idx_next: tensor([[5]])

5
idx: tensor([[ 0, 24, 49, 18, 63,  5]])
idx_cond: tensor([[ 0, 24, 49, 18, 63,  5]])
idx_next: tensor([[37]])

6
idx: tensor([[ 0, 24, 49, 18, 63,  5, 37]])
idx_cond: tensor([[ 0, 24, 49, 18, 63,  5, 37]])
idx_next: tensor([[52]])

7
idx: tensor([[ 0, 24, 49, 18, 63,  5, 37, 52]])
idx_cond: tensor([[ 0, 24, 49, 18, 63,  5, 37, 52]])
idx_next: tensor([[57]])

8
idx: tensor([[ 0, 24, 49, 18, 63,  5, 37, 52, 57]])
idx_cond: tensor([[24, 49, 18, 63,  5, 37, 52, 57]])
idx_next: tensor([[18]])

9
idx: tensor([[ 0, 24, 49, 18, 63,  5, 37, 52, 57, 18]])
idx_

In [16]:
idx_cond

tensor([[0]])