# GPT from scratch

In [9]:
import os
import requests
import tiktoken
import numpy as np

input_file_path = os.path.join('Data', 'input.txt')
if not os.path.exists(input_file_path):
    data_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
    with open(input_file_path, 'w', encoding='utf-8') as f:
        f.write(requests.get(data_url).text)

with open(input_file_path, 'r', encoding='utf-8') as f:
    text = f.read()


##### Tokenizing dataset

In [10]:
# this is just for character-level tokentization as i only have a mac M4
# Sentencepiece is whats used commonly within the NLP community
chars = sorted(list(set(text)))
vocab_size = len(chars) # gpt2 is around 50000

# Encode and decode 
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string


In [16]:
import torch
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(1337)
class CharDataset(Dataset):
    def __init__(self, data, block_size):
        self.data = data
        self.block_size = block_size
        
    def __len__(self):
        return len(self.data) - self.block_size - 1
    
    def __getitem__(self, idx):
        # Get a chunk of text of size block_size + 1
        chunk = self.data[idx:idx + self.block_size + 1]
    
        # Split into input and target
        x = torch.tensor(chunk[:-1], dtype=torch.long)
        y = torch.tensor(chunk[1:], dtype=torch.long)
        return x, y


tokenized_data = torch.tensor(encode(text), dtype=torch.long)
n = len(tokenized_data)
train_data = tokenized_data[:int(n*0.9)]
val_data = tokenized_data[int(n*0.9):]

train_dataset = CharDataset(train_data, block_size=16)
val_dataset = CharDataset(val_data, block_size=16)

# Create data loaders
batch_size = 4  
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


for batch_idx, (x_batch, y_batch) in enumerate(train_loader):
    if batch_idx == 0:  # Just show the first batch
        print("Input batch shape:", x_batch.shape)
        print("Target batch shape:", y_batch.shape)
        print("Sample input:", x_batch[0])
        print("Sample target:", y_batch[0])
        break

Input batch shape: torch.Size([4, 16])
Target batch shape: torch.Size([4, 16])
Sample input: tensor([57, 58, 47, 50, 47, 58, 63,  0, 32, 53,  1, 57, 43, 43, 49,  1])
Sample target: tensor([58, 47, 50, 47, 58, 63,  0, 32, 53,  1, 57, 43, 43, 49,  1, 58])


  x = torch.tensor(chunk[:-1], dtype=torch.long)
  y = torch.tensor(chunk[1:], dtype=torch.long)


## BIgram LM

In [None]:
import torch.nn as nn
from torch.nn import functional as F 

torch.manual_seed(1337)

class BigramlanguageModel(nn.Module):

    def __init__(self, vocab_size) -> None:
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size) # used to pluck out the embedding of each input idx
        # The second dimension is also vocab_size because each token's embedding is used directly as logits
    
    def forward(self, idx, targets=None):

        # idx and targets are both (B, T) tensor of integers
        logits = self.token_embedding_table(idx) # (B, T, C) (batch, time, channel) (4, 8, vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(-1)
            loss = F.cross_entropy(logits, targets) # cross entripy exepects (B,C, T)
        
        return logits, loss

    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            logits, loss = self(idx)
            logits = logits[:, -1, :] # Focus on the last time step
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

blm= BigramlanguageModel(vocab_size)
logits, loss = blm(x_batch, y_batch)
print(logits.shape, loss)

idx = torch.zeros((1, 1), dtype=torch.long)
idx = blm.generate(idx, max_new_tokens=50)[0].tolist()
print(decode(idx))

torch.Size([64, 65]) tensor(4.7988, grad_fn=<NllLossBackward0>)

SKIcLT;AcELMoTbvZv C?nq-QE33:CJqkOKH-q;:la!oiywkHj


This is a simplified approach where the model learns to predict the next token based solely on the current token's embedding, without any additional processing. In more complex models (like full Transformers), these embeddings would be further processed through multiple layers of self-attention and feed-forward networks before producing the final logits.


In [18]:
# Training
optimizer = torch.optim.AdamW(blm.parameters(), lr=1e-3)

# Create data loaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Training
num_epochs = 5

for epoch in range(num_epochs):
    # Training phase
    blm.train()  # Set the model to training mode
    train_loss = 0.0
    train_batches = 0
    
    for x_batch, y_batch in train_loader:
        x_batch, y_batch = x_batch, y_batch
        
        optimizer.zero_grad(set_to_none=True)
        _, loss = blm(x_batch, y_batch)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        train_batches += 1
    
    # Calculate average training loss for the epoch
    avg_train_loss = train_loss / train_batches
    
    # Validation phase
    blm.eval()  # Set the model to evaluation mode
    val_loss = 0.0
    val_batches = 0
    
    with torch.no_grad():  # No need to track gradients during validation
        for x_batch, y_batch in val_loader:
            x_batch, y_batch = x_batch, y_batch
            _, loss = blm(x_batch, y_batch)
            val_loss += loss.item()
            val_batches += 1
    
    # Calculate average validation loss
    avg_val_loss = val_loss / val_batches
    
    print(f'Epoch {epoch + 1}/{num_epochs}, ' 
          f'Train Loss: {avg_train_loss:.4f}, '
          f'Val Loss: {avg_val_loss:.4f}')


  x = torch.tensor(chunk[:-1], dtype=torch.long)
  y = torch.tensor(chunk[1:], dtype=torch.long)


Epoch 1/5, Train Loss: 2.5626, Val Loss: 2.4875
Epoch 2/5, Train Loss: 2.4526, Val Loss: 2.4894
Epoch 3/5, Train Loss: 2.4525, Val Loss: 2.4905
Epoch 4/5, Train Loss: 2.4525, Val Loss: 2.4898
Epoch 5/5, Train Loss: 2.4525, Val Loss: 2.4901


In [21]:
idx = torch.zeros((1, 1), dtype=torch.long)
idx = blm.generate(idx, max_new_tokens=100)[0].tolist()
print(decode(idx))


BRDY LELERILA:
Whistr.

Age us r f istend aly n;

ORole tars otat CATousur,

Al lillong m ato s-bly,


##### The loss plateaus at 2.45. The model predictions do look like words but it still sucks because we are only predicting based on a context of one single token.
-> > the objective is to broaden this context and see how that enhances our predictions.

In [49]:
class BigramMeanModel(nn.Module):
    def __init__(self, vocab_size, block_size):
        super().__init__()
        self.token_embeddings = nn.Embedding(vocab_size, vocab_size)
        self.block_size = block_size
        
    def forward(self, idx, targets=None):
        B, T = idx.shape
        device = idx.device
        
        # Ensure we don't process more than block_size tokens
        idx = idx[:, -self.block_size:]  # (B, min(T, block_size))
        T = idx.shape[1]  # Update T to actual sequence length
        
        # Get token embeddings: (B, T, C)
        token_embeddings = self.token_embeddings(idx)
        
        # Create a lower triangular matrix for masking (including current token)
        mask = torch.tril(torch.ones(T, T, device=device))  # (T, T)
        mask = mask.unsqueeze(0)  # (1, T, T)
        
        # Compute cumulative sum of embeddings and counts
        cum_embeddings = torch.einsum('btc,bts->bsc', token_embeddings, mask)  # (B, T, C)
        counts = mask.sum(dim=2, keepdim=True)  # (1, T, 1)
        mean_embeddings = cum_embeddings / (counts + 1e-8)  # (B, T, C)
        
        # Project to vocabulary space
        logits = mean_embeddings @ self.token_embeddings.weight.T  # (B, T, V)
        
        if targets is None:
            loss = None
        else:
            # For loss, we predict the next token
            logits = logits[:, :-1, :]  # (B, T-1, V)
            targets = targets[..., 1:T]  # (B, T-1)
            
            # Reshape for cross_entropy
            logits_flat = logits.reshape(-1, logits.size(-1))  # (B*(T-1), V)
            targets_flat = targets.reshape(-1)  # (B*(T-1),)
            loss = F.cross_entropy(logits_flat, targets_flat)
            
        return logits, loss
    
    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # Crop idx to the last block_size tokens
            idx_cond = idx[:, -self.block_size:]
            # Get the predictions
            logits, _ = self(idx_cond)
            # Focus on the last time step
            logits = logits[:, -1, :]  # (B, V)
            # Get the probabilities
            probs = F.softmax(logits, dim=-1)
            # Sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
            # Append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
        return idx

block_size = 5  # context window
bmm = BigramMeanModel(vocab_size, block_size)

# This should now work with any sequence length
x_batch = torch.randint(0, vocab_size, (4, 10))  # Batch of 4 sequences, each of length 10
y_batch = torch.randint(0, vocab_size, (4, 10))

logits, loss = bmm(x_batch, y_batch)
print(logits.shape, loss)

# Initialize with a non-zero starting token
context = torch.ones((8, 8), dtype=torch.long)  # Start with token 1
generated = bmm.generate(context, max_new_tokens=10)
print(generated[0].tolist())  # Convert to list for better readability

torch.Size([4, 4, 65]) tensor(42.6758, grad_fn=<NllLossBackward0>)
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


In [50]:
torch.ones((8, 8), dtype=torch.long) 

tensor([[1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1]])

# Self-Attention

In [None]:

# This should now work with any sequence length
x_batch = torch.randint(0, vocab_size, (4, 8))  # Batch of 4 sequences, each of length 10
y_batch = torch.randint(0, vocab_size, (4, 8))
print(x_batch)  # Should print: torch.Size([4, 7, vocab_size]) tensor(...)

logits, loss = bmm(x_batch, y_batch)


tensor([[11,  0,  6, 37, 14, 58, 37, 15],
        [35, 54, 13, 23, 35,  3,  1, 42],
        [49, 50,  6, 56,  2, 13, 31, 41],
        [22, 19, 28, 23,  8, 18, 23, 19]])
