<a href="https://colab.research.google.com/github/Michael-Sylvester/Ashesi-Deep-Learning/blob/main/From_Scratch_Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer Model from scratch

We'll be training a character-level Transformer on the works of Shakespeare so that it can generate new Shakespeare-style text.

In [9]:
import kagglehub
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import os


In [11]:
# Fetch the Shakespeare dataset through kagglehub; the value it returns is
# printed below so the reader can see where the files live.
data_path = kagglehub.dataset_download("nikhilxb/shakespeare")
print(f"Path to dataset files: {data_path}")

Using Colab cache for faster access to the 'shakespeare' dataset.
Path to dataset files: /kaggle/input/shakespeare


In [13]:

# Load text data
# Build the file location from the directory reported by the download cell
# instead of hard-coding the Colab mount point, so the notebook also runs
# where kagglehub stores the dataset somewhere else.
text_path = os.path.join(data_path, "final", "shakeGPT", "final.txt")

# Read with an explicit encoding so the vocabulary does not depend on the
# platform's default locale.
with open(text_path, 'r', encoding='utf-8') as f:
    text = f.read()

# Create vocabulary from unique characters (sorted, so ids are deterministic)
chars = sorted(set(text))
stoi = {ch: i for i, ch in enumerate(chars)}  # character -> integer id
itos = {i: ch for i, ch in enumerate(chars)}  # integer id -> character
def encode(s):
    """Translate the string `s` into a list of integer ids, one per character.

    Each character is looked up in the module-level `stoi` table; a character
    that is not in the table raises KeyError.
    """
    ids = []
    for ch in s:
        ids.append(stoi[ch])
    return ids
def decode(l):
    """Turn an iterable of integer ids `l` back into a string.

    Each id is looked up in the module-level `itos` table and the resulting
    characters are concatenated; an unknown id raises KeyError.
    """
    return ''.join(itos[token_id] for token_id in l)
# Hold out the last 10% of the characters for validation; the first 90% is
# used for training.
n = len(text)
split_idx = int(n * 0.9)
train_data, val_data = text[:split_idx], text[split_idx:]
# Number of distinct characters, i.e. the size of the model's output layer
vocab_size = len(chars)

# Self attention block

In [16]:
class SelfAttention(nn.Module):
    """A single head of causal (masked) scaled dot-product self-attention.

    Args:
        embed_size: width of the last dimension of the input.
        head_size: width of the key/query/value projections and of the output.
        block_size: side length of the stored lower-triangular mask.
    """

    def __init__(self, embed_size, head_size, block_size):
        super().__init__()
        self.head_size = head_size

        def projection():
            # Bias-free linear map from the embedding width to the head width.
            return nn.Linear(embed_size, head_size, bias=False)

        # Creation order (key, query, value) is kept so seeded initialisation
        # yields the same weights.
        self.key = projection()
        self.query = projection()
        self.value = projection()

        # Lower-triangular matrix of ones: entry (i, j) is 1 when position i
        # is allowed to attend to position j (j <= i). Registered as a buffer
        # so it follows the module across devices without being a parameter.
        causal = torch.ones(block_size, block_size).tril()
        self.register_buffer('tril', causal)

    def forward(self, x):
        """Apply masked attention to `x` of shape (B, T, embed_size).

        Returns a tensor of shape (B, T, head_size).
        """
        _, seq_len, _ = x.shape
        queries = self.query(x)   # (B, T, head_size)
        keys = self.key(x)        # (B, T, head_size)
        values = self.value(x)    # (B, T, head_size)

        # Pairwise similarity, scaled by sqrt(head_size): (B, T, T)
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / math.sqrt(self.head_size)

        # Positions in the future get -inf so softmax assigns them weight 0.
        is_future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(is_future, float('-inf'))

        weights = F.softmax(scores, dim=-1)
        # Weighted sum of the value vectors: (B, T, head_size)
        return torch.matmul(weights, values)


# Smoke-test the self-attention layer on one random sequence
block_size = 128
embed_size = 256
head_size = 64
att_layer = SelfAttention(embed_size, head_size, block_size)
x = torch.randn(1, block_size, embed_size)
output = att_layer(x)

# Actually verify the result instead of only computing it: the head must map
# (1, T, embed_size) -> (1, T, head_size), and the -inf masking must not
# produce NaNs after the softmax.
assert output.shape == (1, block_size, head_size), f"unexpected output shape {tuple(output.shape)}"
assert not torch.isnan(output).any(), "self-attention produced NaN values"

# Positional encoding block

In [17]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding that is added to token embeddings.

    Even feature columns hold sin(pos * freq) and odd columns hold
    cos(pos * freq), with freq = 10000^(-2i / embed_size) for column pair i.

    Args:
        embed_size: width of the embeddings; may be even or odd.
        max_len: number of positions for which encodings are precomputed.
    """

    def __init__(self, embed_size, max_len):
        super().__init__()
        pos = torch.arange(0, max_len).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, embed_size, 2) * -(math.log(10000.0) / embed_size))
        angles = pos * div_term  # (max_len, ceil(embed_size / 2))

        pe = torch.zeros(max_len, embed_size)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd embed_size there is one fewer odd column than even
        # columns, so drop the last frequency for the cosine half. For an even
        # embed_size the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : embed_size // 2])

        pe = pe.unsqueeze(0)  # Shape: (1, max_len, embed_size)
        # Buffer, not parameter: moves with the module but is never trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encodings for the first x.size(1) positions to `x`.

        `x` is indexed as (batch, time, embed_size); the encoding broadcasts
        over the batch dimension.
        """
        x = x + self.pe[:, :x.size(1)]
        return x

# Transformer Block

In [14]:
class TransformerBlock(nn.Module):
    """Pre-LayerNorm transformer block: self-attention then a feed-forward MLP.

    Both sub-layers are wrapped in residual connections.

    Args:
        embed_size: width of the residual stream.
        head_size: width of the attention head's output.
        block_size: maximum sequence length, forwarded to SelfAttention.
        ffwd_hidden_mult: hidden width of the MLP as a multiple of embed_size.
        dropout: dropout probability applied at the end of the MLP.
    """

    def __init__(self, embed_size, head_size, block_size, ffwd_hidden_mult=4, dropout=0.1):
        super().__init__()
        self.sa = SelfAttention(embed_size, head_size, block_size)
        # The residual add needs the attention output to be embed_size wide.
        # When the head width differs, project it back; when it already
        # matches, use a parameter-free identity so existing configurations
        # keep exactly the same parameters and outputs.
        if head_size == embed_size:
            self.proj = nn.Identity()
        else:
            self.proj = nn.Linear(head_size, embed_size)
        self.ln1 = nn.LayerNorm(embed_size)
        self.ffwd = nn.Sequential(
            nn.Linear(embed_size, ffwd_hidden_mult * embed_size),
            nn.ReLU(),
            nn.Linear(ffwd_hidden_mult * embed_size, embed_size),
            nn.Dropout(dropout)
        )
        self.ln2 = nn.LayerNorm(embed_size)

    def forward(self, x):
        """Transform `x` of shape (B, T, embed_size); the shape is preserved."""
        # Apply self-attention with a residual connection
        x = x + self.proj(self.sa(self.ln1(x)))
        # Apply feedforward network with a residual connection
        x = x + self.ffwd(self.ln2(x))
        return x

# Full Transformer Model

In [26]:
class TransformerModel(nn.Module):
    """Decoder-only transformer language model over integer token ids.

    Args:
        vocab_size: number of distinct token ids.
        embed_size: width of the token embeddings / residual stream.
        head_size: attention head width passed to every TransformerBlock.
        block_size: maximum context length the model can attend over.
        num_layers: number of stacked TransformerBlocks.
        dropout: dropout probability passed to every TransformerBlock.
    """

    def __init__(self, vocab_size, embed_size, head_size, block_size, num_layers, dropout=0.1):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_encoding = PositionalEncoding(embed_size, block_size)
        self.blocks = nn.Sequential(*[TransformerBlock(embed_size, head_size, block_size, dropout=dropout)
                                        for _ in range(num_layers)])
        self.ln_f = nn.LayerNorm(embed_size)
        self.head = nn.Linear(embed_size, vocab_size)
        self.block_size = block_size # Store block_size for use in generate

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the cross-entropy loss.

        Args:
            idx: integer token ids of shape (B, T).
            targets: optional integer token ids with B*T elements.

        Returns:
            (logits, loss). Without targets, logits has shape
            (B, T, vocab_size) and loss is None. With targets, logits is
            flattened to (B*T, vocab_size) and loss is a scalar tensor.
        """
        x = self.token_embedding(idx)
        x = self.positional_encoding(x)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.head(x)

        if targets is None:
            return logits, None

        B, T, C = logits.shape
        logits = logits.view(B*T, C)
        # reshape (not view) so non-contiguous targets, e.g. slices of a
        # larger tensor, are accepted as well.
        loss = F.cross_entropy(logits, targets.reshape(B*T))
        return logits, loss

    @torch.no_grad()  # sampling never needs gradients; avoids building graphs
    def generate(self, idx, max_new_tokens):
        """Autoregressively sample `max_new_tokens` tokens after `idx`.

        Args:
            idx: integer token ids of shape (B, T) used as the prompt.
            max_new_tokens: number of tokens to append.

        Returns:
            Tensor of shape (B, T + max_new_tokens) containing the prompt
            followed by the sampled tokens.
        """
        # Sample with dropout disabled, then restore whatever mode the caller
        # had so a training loop that calls generate() is not affected.
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # Keep at most the last block_size tokens as context; a
                # negative slice start is clamped, so shorter prompts pass
                # through unchanged.
                idx_cond = idx[:, -self.block_size:]
                logits, _ = self(idx_cond)
                # Only the prediction for the last position is needed.
                probs = F.softmax(logits[:, -1, :], dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)
        return idx

# Model instantiation

In [22]:
# Hyperparameters of the full model
num_layers = 4
block_size = 128
embed_size = 256
# The attention head width is set equal to the embedding width
head_size = embed_size

transformer_model = TransformerModel(
    vocab_size=vocab_size,
    embed_size=embed_size,
    head_size=head_size,
    block_size=block_size,
    num_layers=num_layers,
)

In [23]:
# Encode both text splits once, as 1-D LongTensors of character ids
train_data_tensor = torch.tensor(encode(train_data), dtype=torch.long)
val_data_tensor = torch.tensor(encode(val_data), dtype=torch.long)

# Number of sequences processed in parallel per batch
batch_size = 32


def get_batch(split):
    """Sample a random batch of (input, target) sequences.

    Args:
        split: 'train' selects the training tensor; any other value selects
            the validation tensor.

    Returns:
        (x, y), each of shape (batch_size, block_size). y is x shifted one
        position to the right, i.e. the next-character targets. Both tensors
        are on CUDA when it is available, otherwise on the CPU.
    """
    source = train_data_tensor if split == 'train' else val_data_tensor

    # One random start offset per sequence; the upper bound leaves room for
    # the extra target character at the end of each window.
    starts = torch.randint(len(source) - block_size, (batch_size,))

    # Index matrix of shape (batch_size, block_size): row r holds the
    # positions starts[r], starts[r] + 1, ..., starts[r] + block_size - 1.
    positions = starts.unsqueeze(1) + torch.arange(block_size)
    inputs = source[positions]
    labels = source[positions + 1]

    target_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    return inputs.to(target_device), labels.to(target_device)

In [24]:
# Move model to GPU if available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
transformer_model = transformer_model.to(device)

# Create the optimizer (this is the mechanic that updates the weights)
optimizer = torch.optim.AdamW(transformer_model.parameters(), lr=1e-3)

# Training Settings
max_iters = 1000  # For a quick academic run. Increase to 5000 for better results.
eval_interval = 200

print(f"Starting training on {device}...")

transformer_model.train()
# The loop variable is named `step` rather than `iter`, so the builtin iter()
# is not shadowed in the notebook namespace after this cell has run.
for step in range(max_iters):

    # Periodically report the loss on one random validation batch.
    if step % eval_interval == 0:
        # Evaluate with dropout disabled and without building an autograd
        # graph, then switch back to training mode.
        transformer_model.eval()
        with torch.no_grad():
            xb, yb = get_batch('val')
            _, val_loss = transformer_model(xb, yb)
        transformer_model.train()
        print(f"Step {step}: Validation Loss = {val_loss.item():.4f}")

    # 1. Get a batch of training data
    xb, yb = get_batch('train')

    # 2. Forward pass (Make predictions)
    logits, loss = transformer_model(xb, yb)

    # 3. Backward pass (Calculate gradients)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()

    # 4. Update weights
    optimizer.step()

# Note: this is the loss of the last *training* batch, not a validation loss.
print(f"Final Loss: {loss.item():.4f}")

Starting training on cpu...
Step 0: Validation Loss = 4.6267
Step 200: Validation Loss = 2.1690
Step 400: Validation Loss = 1.9271
Step 600: Validation Loss = 1.8713
Step 800: Validation Loss = 1.7877
Final Loss: 1.5510


In [29]:
# Sample 120 new characters. The prompt is the single token id 0, which is the
# first character of the sorted vocabulary (not a special "blank" token), and
# it is included at the start of the decoded text.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated = transformer_model.generate(context, max_new_tokens=120)
print(decode(generated[0].tolist()))

	t-to the to Jew my to-staid's you my daresA sir. A fear away'd all no prayer fuiltyle but pray lost.
SOMER
'Tis my daugh
