# Creating the GPT Model

## Stage 1: Proof of Concept with Small Dataset

In this initial stage, I'll develop a functioning GPT-2 architecture from scratch and test it on a small text sample, namely the Sherlock Holmes texts. This approach allows the implementation to be validated while working within reasonable computational constraints.

**Objectives:**
- Build all essential components of the transformer architecture
- Train the model on a limited dataset to demonstrate operational functionality
- Establish evaluation metrics for baseline performance
- Document architectural decisions and implementation details

While this initial model won't produce state-of-the-art text generation results, it will serve as a critical proof of concept, confirming that the implementation correctly reflects the theoretical foundations of transformer-based language models.

# Data preparation and sampling

Below is a diagram showing all of the stages of designing an LLM. I will be using Byte Pair Encoding on the Sherlock Holmes texts in the code blocks below to complete Stage 1, part 1.

![GPT Structure](resources/Stage_1_of_LLM_Design.jpg)

In [3]:
%pip install tiktoken
%pip install tqdm
from tqdm.auto import tqdm 
import time
import tiktoken
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader



Below is the general configuration for the GPT model. It is the same as the GPT-2 124M configuration apart from context_length, which is 1024 in the real training and shortened to 256 here. Remember that the context length of an LLM is the maximum number of tokens the model can consider as input when generating text, which influences its ability to maintain coherence and capture long-range dependencies.

In [4]:
GPT_CONFIG_124M = {
    "vocab_size": 50257,   # Vocabulary size
    "context_length": 256, # Shortened context length (orig: 1024)
    "emb_dim": 768,        # Embedding dimension
    "n_heads": 12,         # Number of attention heads
    "n_layers": 12,        # Number of layers
    "drop_rate": 0.1,      # Dropout rate
    "qkv_bias": False      # Query-key-value bias
}

Below I create a sliding window that splits the tokenized text into chunks for later processing; a graphical representation follows. Note that the stride is important because it governs how much consecutive chunks overlap: a stride smaller than the chunk length repeats tokens across chunks, which can negatively affect training by encouraging overfitting.


![GPT Structure](resources/window.jpg)
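To make the effect of the stride concrete, here is a minimal sketch of the same windowing logic in plain Python. The helper name `sliding_windows` and the toy ids are assumptions for illustration only; the real dataset class below works on tokenizer output and returns tensors.

```python
def sliding_windows(token_ids, max_length, stride):
    """Return (input, target) pairs; each target is its input shifted by one token."""
    pairs = []
    for i in range(0, len(token_ids) - max_length, stride):
        inputs = token_ids[i:i + max_length]
        targets = token_ids[i + 1:i + max_length + 1]
        pairs.append((inputs, targets))
    return pairs


toy_ids = list(range(10))  # hypothetical stand-in for real token ids

# stride < max_length: consecutive windows share tokens
overlapping = sliding_windows(toy_ids, max_length=4, stride=2)

# stride == max_length: windows do not overlap
non_overlapping = sliding_windows(toy_ids, max_length=4, stride=4)

for inputs, targets in overlapping:
    print(inputs, "->", targets)
```

With a stride equal to the window length, every token appears in at most one input window, which is the setting used later when the train and validation loaders are built.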

In [None]:
class GPTDataset(Dataset):
    """
    Custom dataset for GPT training that converts text into overlapping token sequences.
    
    This class splits a tokenized text into chunks of specified length with a stride,
    creating input-target pairs where the target is the input shifted by one position.
    """
    def __init__(self, txt, tokenizer, max_length, stride):
        self.input_ids = []
        self.target_ids = []

        # Tokenize the entire text
        token_ids = tokenizer.encode(txt, allowed_special={"<|endoftext|>"})

        # Use a sliding window to chunk the book into overlapping sequences of max_length
        for i in range(0, len(token_ids) - max_length, stride):
            input_chunk = token_ids[i:i + max_length]
            target_chunk = token_ids[i + 1: i + max_length + 1]
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]


def create_dataloader(txt, batch_size=2, max_length=256, 
                         stride=128, shuffle=True, drop_last=True,
                         num_workers=0):
    """
    Create a DataLoader for GPT training.
    """

    # Initialize the tokenizer
    tokenizer = tiktoken.get_encoding("gpt2")    

    # Create dataset
    dataset = GPTDataset(txt, tokenizer, max_length, stride)

    # Create dataloader
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers
    )

    return dataloader

The first thing to do is turn the text into tokens. Modern LLMs such as GPT-2 use Byte Pair Encoding, a subword tokenizer; the GPT-2 vocabulary has 50257 unique possible ids. Each token id is then mapped to a vector in a higher-dimensional space (768 dimensions for GPT-2) to give the tokens meaning in relation to each other. These token embeddings are then added to positional embeddings to make the input embeddings.
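As a rough sketch of that last step, the toy example below looks up a token vector and a position vector and adds them. The sizes and the random tables are assumptions chosen to keep the output readable; in the real model both tables are `nn.Embedding` layers whose values are learned.

```python
import random

random.seed(0)

# Toy sizes for illustration; GPT-2 uses 50257, 1024, and 768
vocab_size, context_length, emb_dim = 10, 4, 3

# Lookup tables of vectors (random here, learned during training in the real model)
tok_emb = [[random.gauss(0, 1) for _ in range(emb_dim)] for _ in range(vocab_size)]
pos_emb = [[random.gauss(0, 1) for _ in range(emb_dim)] for _ in range(context_length)]

token_ids = [7, 2, 7]  # the same token id appears at positions 0 and 2

# Input embedding = token embedding + positional embedding, element by element
input_embeddings = [
    [t + p for t, p in zip(tok_emb[tok], pos_emb[pos])]
    for pos, tok in enumerate(token_ids)
]

for pos, vec in enumerate(input_embeddings):
    print(pos, [round(v, 3) for v in vec])
```

The two occurrences of token 7 end up with different input embeddings because their positional vectors differ, which is how the model can tell positions apart.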

The diagram below shows the attention mechanism, which is central to the LLM workflow. This is a very important part, where we take token embeddings and turn them into context vectors. The mechanism does this by calculating relationships between tokens using queries, keys, and values, then weighting and summing the values to create a context vector for each token that captures the importance of the other tokens in the sequence. This allows the model to understand the context and relationships within the input.

Multiplying the queries by the transposed keys gives the attention scores. Future positions are masked, the scores are scaled by the square root of the key dimension, and a softmax turns them into attention weights, to which dropout is applied. Finally, the attention weights are multiplied by the value matrix to give the context vectors. This describes one attention head; my code uses 12 heads.

![GPT Structure](resources/multihead.jpg)
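Before the full multi-head module, here is a minimal single-head sketch of the computation in plain Python. As a simplifying assumption, the inputs are used directly as queries, keys, and values (no learned projections) and dropout is left out; the function names are made up for this illustration.

```python
import math


def softmax(row):
    """Numerically stable softmax; exp(-inf) evaluates to 0.0 for masked entries."""
    peak = max(row)
    exps = [math.exp(v - peak) for v in row]
    total = sum(exps)
    return [e / total for e in exps]


def causal_attention(queries, keys, values):
    """Single-head scaled dot-product attention with a causal mask."""
    head_dim = len(keys[0])
    attn_weights, context_vecs = [], []
    for i, q in enumerate(queries):
        # Score against each key; positions after i are masked with -inf
        scores = [
            sum(a * b for a, b in zip(q, k)) / math.sqrt(head_dim)
            if j <= i else float("-inf")
            for j, k in enumerate(keys)
        ]
        weights = softmax(scores)
        attn_weights.append(weights)
        # Context vector = attention-weighted sum of the value vectors
        context_vecs.append([
            sum(w * v[c] for w, v in zip(weights, values))
            for c in range(len(values[0]))
        ])
    return attn_weights, context_vecs


x = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]  # three toy token vectors
attn_weights, context_vecs = causal_attention(x, x, x)

for row in attn_weights:
    print([round(w, 3) for w in row])
```

Each row of weights sums to one, and the entries above the diagonal are zero, so the first token can only attend to itself.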

In [None]:
class MultiHeadAttention(nn.Module):
    """
    Multi-head attention mechanism as described in the 'Attention is All You Need' paper.
    
    This implementation includes causal (masked) self-attention to ensure that
    predictions for a position can only depend on known outputs from previous positions.
    """
    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert (d_out % num_heads == 0)

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)
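        # Upper-triangular mask (1s above the diagonal) marks future positions
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length), diagonal=1)
        )

    def forward(self, x):
        b, num_tokens, d_in = x.shape

        # Project inputs to keys, queries, and values: (b, num_tokens, d_out)
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        # Split d_out into heads: (b, num_tokens, num_heads, head_dim)
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)

        # Move the head dimension forward: (b, num_heads, num_tokens, head_dim)
        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)

        # Dot-product attention scores per head: (b, num_heads, num_tokens, num_tokens)
        attn_scores = queries @ keys.transpose(2, 3)

        # Hide future tokens by setting their scores to -inf before the softmax
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
        attn_scores.masked_fill_(mask_bool, -torch.inf)

        # Scale by sqrt(head_dim), normalize, then apply dropout to the weights
        attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Weighted sum of values, then merge heads back: (b, num_tokens, d_out)
        context_vec = (attn_weights @ values).transpose(1, 2)
        context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)
        context_vec = self.out_proj(context_vec)  # Combine information across heads
        return context_vec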

Two further steps in the code above are worth noting. The masked fill hides the tokens that come after the current token, because the transformer has to predict the next word without seeing it. If you imagine the attention score matrix, this mask covers the triangle above the main diagonal. There is also dropout, where random elements are zeroed during training so that lazy neurons are forced to be active and the model cannot rely on a few units to do the lifting.
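The dropout step can be sketched in a few lines of plain Python. This is an illustration under assumptions: the function below is hypothetical, and a rate of 0.5 is used so the effect is easy to see, whereas the config in this notebook uses 0.1. Like `nn.Dropout` in training mode, it scales the surviving elements by 1 / (1 - p) so the expected sum stays the same.

```python
import random


def dropout(values, p, rng):
    """Inverted dropout: zero each element with probability p, scale survivors by 1/(1-p)."""
    scale = 1.0 / (1.0 - p)
    return [0.0 if rng.random() < p else v * scale for v in values]


rng = random.Random(123)          # fixed seed so the sketch is repeatable
weights = [0.25, 0.25, 0.25, 0.25]  # toy attention weights for one token
dropped = dropout(weights, p=0.5, rng=rng)

print(dropped)
```

At evaluation time dropout is switched off (`model.eval()`), so the weights pass through unchanged.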

In [None]:
class LayerNorm(nn.Module):
    """
    Layer normalization module that normalizes inputs across the feature dimension.
    
    This is a custom implementation of Layer Normalization as described in the paper
    'Layer Normalization' by Jimmy Lei Ba et al.
    """
    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift

class GELU(nn.Module):
    """
    Gaussian Error Linear Unit activation function module.
    
    This is an implementation of GELU as described in the paper
    'Gaussian Error Linear Units (GELUs)' by Dan Hendrycks and Kevin Gimpel.
    GELU provides a smooth, non-linear activation that performs well in
    transformer-based language models.
    """
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(
            torch.sqrt(torch.tensor(2.0 / torch.pi)) * 
            (x + 0.044715 * torch.pow(x, 3))
        ))


class FeedForward(nn.Module):
    """
    Position-wise feed-forward network used in transformer blocks.
    
    This implements the FFN as described in 'Attention is All You Need',
    consisting of two linear transformations with a GELU activation in between.
    """
    def __init__(self, cfg):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]), ## Expansion
            GELU(), ## Activation
            nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]), ## Contraction
        )

    def forward(self, x):
        return self.layers(x)
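The `GELU` class above uses the tanh approximation of the activation. As a quick sanity check, this sketch compares it with the exact definition, 0.5 · x · (1 + erf(x / √2)), using only the standard library. The function names are assumptions for this illustration.

```python
import math


def gelu_tanh(x):
    """Tanh approximation of GELU, matching the formula in the GELU class above."""
    inner = math.sqrt(2.0 / math.pi) * (x + 0.044715 * x ** 3)
    return 0.5 * x * (1.0 + math.tanh(inner))


def gelu_exact(x):
    """Exact GELU: x times the standard normal CDF of x."""
    return 0.5 * x * (1.0 + math.erf(x / math.sqrt(2.0)))


sample_points = (-3.0, -1.0, 0.0, 1.0, 3.0)
max_gap = max(abs(gelu_tanh(x) - gelu_exact(x)) for x in sample_points)

for x in sample_points:
    print(f"x={x:+.1f}  tanh approx={gelu_tanh(x):+.5f}  exact={gelu_exact(x):+.5f}")
print(f"largest gap on these points: {max_gap:.6f}")
```

The two versions agree closely on these points, which is why the cheaper approximation is a common choice.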

![GPT Structure](resources/transformer.jpg)

The diagram above is a graphical representation of the steps in the code below.

In [None]:
class TransformerBlock(nn.Module):
    """
    Complete transformer block as used in GPT architecture.
    
    Each block includes a multi-head self-attention layer followed by a feed-forward
    network, with layer normalization and residual connections.
    """
    def __init__(self, cfg):
        super().__init__()
        self.att = MultiHeadAttention(
            d_in=cfg["emb_dim"],
            d_out=cfg["emb_dim"],
            context_length=cfg["context_length"],
            num_heads=cfg["n_heads"], 
            dropout=cfg["drop_rate"],
            qkv_bias=cfg["qkv_bias"])
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(cfg["emb_dim"])
        self.norm2 = LayerNorm(cfg["emb_dim"])
        self.drop_shortcut = nn.Dropout(cfg["drop_rate"])

    def forward(self, x):
        # Shortcut connection for attention block
        shortcut = x
        x = self.norm1(x)
        x = self.att(x)  # Shape [batch_size, num_tokens, emb_size]
        x = self.drop_shortcut(x)
        x = x + shortcut  # Add the original input back

        # Shortcut connection for feed forward block
        shortcut = x
        x = self.norm2(x)
        x = self.ff(x)
        x = self.drop_shortcut(x)
        x = x + shortcut  # Add the original input back

        return x

In [None]:
class GPTModel(nn.Module):
    """
    Complete GPT model implementation based on the GPT-2 architecture.
    
    This model includes token and position embeddings, multiple transformer blocks,
    and a final output head for token prediction.
    """
    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        
        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])])
        
        self.final_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(
            cfg["emb_dim"], cfg["vocab_size"], bias=False
        )

    def forward(self, in_idx):
        batch_size, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))
        x = tok_embeds + pos_embeds  # Shape [batch_size, num_tokens, emb_size]
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits

torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
model.eval()

In [None]:

def generate_text_simple(model, idx, max_new_tokens, context_size):
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        
        with torch.no_grad():
            logits = model(idx_cond)
        
        logits = logits[:, -1, :]
        
        probas = torch.softmax(logits, dim=-1)
        
        idx_next = torch.argmax(probas, dim=-1, keepdim=True)
        
        idx = torch.cat((idx, idx_next), dim=1)
    
    return idx

![GPT Structure](resources/Stage_2_of_LLM_Design.jpg)


In [11]:
tokenizer = tiktoken.get_encoding("gpt2")

def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0) # add batch dimension
    return encoded_tensor

def token_ids_to_text(token_ids, tokenizer):
    flat = token_ids.squeeze(0) # remove batch dimension
    return tokenizer.decode(flat.tolist())

start_context = "Hello I am"



token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(start_context, tokenizer),
    max_new_tokens=10,
    context_size=GPT_CONFIG_124M["context_length"]
)

print("Output text:\n", token_ids_to_text(token_ids, tokenizer))

Output text:
 Hello I am 950ackersframeuffمブ pi…] materials polish


**Note:** The file path below will need to be changed when run locally or on Colab.

In [12]:
with open("resources/holmes.txt", "r", encoding="utf-8") as h:
    holmes_text = h.read()

In [None]:
if torch.cuda.is_available():
   device = torch.device("cuda")
elif torch.backends.mps.is_available():
   device = torch.device("mps")
else:
   device = torch.device("cpu")

# Train/validation ratio
train_ratio = 0.90
split_idx = int(train_ratio * len(holmes_text))
train_data = holmes_text[:split_idx]
val_data = holmes_text[split_idx:]


torch.manual_seed(42)

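def create_dataloader(data, batch_size, max_length, stride, drop_last, shuffle, num_workers, device):
    """
    Create a DataLoader for train or validation data.

    This redefinition replaces the earlier create_dataloader and reuses the
    module-level tokenizer. The device argument is accepted but not used here;
    batches are moved to the device later, in calc_loss_batch.
    """
    dataset = GPTDataset(data, tokenizer, max_length, stride)
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
    )
    return dataloader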

train_loader = create_dataloader(
    train_data,
    batch_size=16,
    max_length=GPT_CONFIG_124M["context_length"],
    stride=GPT_CONFIG_124M["context_length"],
    drop_last=True,
    shuffle=True,
    num_workers=0,
    device=device
)

val_loader = create_dataloader(
    val_data,
    batch_size=16,
    max_length=GPT_CONFIG_124M["context_length"],
    stride=GPT_CONFIG_124M["context_length"],
    drop_last=False,
    shuffle=False,
    num_workers=0,
    device=device
)

In [None]:
#this is just an information cell to show sizes and can be removed if desired

train_tokens = 0
for input_batch, target_batch in train_loader:
    train_tokens += input_batch.numel()

val_tokens = 0
for input_batch, target_batch in val_loader:
    val_tokens += input_batch.numel()

print("Training tokens:", train_tokens)
print("Validation tokens:", val_tokens)
print("All tokens:", train_tokens + val_tokens)

Training tokens: 65536
Validation tokens: 7424
All tokens: 72960
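The printed counts can be cross-checked against the loader settings with a little arithmetic. This sketch only uses the numbers shown above together with the batch size and context length passed to the loaders.

```python
context_length = 256
batch_size = 16
train_tokens = 65536
val_tokens = 7424

# Each sequence holds context_length tokens
train_sequences = train_tokens // context_length
val_sequences = val_tokens // context_length

# Training uses drop_last=True, so only full batches are counted
train_batches = train_sequences // batch_size

# Validation uses drop_last=False, so a final partial batch is kept
val_full_batches, val_remainder = divmod(val_sequences, batch_size)

print("Training sequences:", train_sequences, "in", train_batches, "batches")
print("Validation sequences:", val_sequences,
      "=", val_full_batches, "full batch plus", val_remainder, "sequences")
```

So one epoch is 16 optimizer steps, and the validation set is small: a single full batch plus a partial one.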


In [None]:
# Function to calculate loss for a single batch
def calc_loss_batch(input_batch, target_batch, model, device):
    """
    Calculate cross-entropy loss for a single batch.
    
    This function handles potential memory errors by falling back to CPU if needed.
    """
    try:
        input_batch, target_batch = input_batch.to(device), target_batch.to(device)
        logits = model(input_batch)
        loss = torch.nn.functional.cross_entropy(logits.reshape(-1, logits.size(-1)), target_batch.reshape(-1))
        return loss
    except RuntimeError as e:
        if "MPS backend out of memory" in str(e):
            print("MPS memory error, falling back to CPU for this batch")
            model_cpu = model.to("cpu")
            input_batch_cpu, target_batch_cpu = input_batch.to("cpu"), target_batch.to("cpu")
            logits = model_cpu(input_batch_cpu)
            loss = torch.nn.functional.cross_entropy(logits.reshape(-1, logits.size(-1)), target_batch_cpu.reshape(-1))
            model.to(device)  
            return loss.to(device)
        else:
            raise 
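To give the loss values some scale, here is a minimal sketch of cross-entropy for a single prediction in plain Python. The helper is an assumption for illustration and is not how `torch.nn.functional.cross_entropy` is implemented internally, although it computes the same quantity for one target. A model that spreads its probability evenly over the whole vocabulary scores ln(50257), roughly 10.8, so an untrained model should start near that value.

```python
import math


def cross_entropy(logits, target):
    """Negative log-probability of the target id under softmax(logits)."""
    peak = max(logits)
    log_sum_exp = peak + math.log(sum(math.exp(v - peak) for v in logits))
    return log_sum_exp - logits[target]


vocab_size = 50257

# Equal logits mean every token is equally likely
uniform_logits = [0.0] * vocab_size
baseline = cross_entropy(uniform_logits, target=42)

# A confident, correct prediction gives a loss close to zero
confident_logits = [0.0] * vocab_size
confident_logits[42] = 30.0
confident = cross_entropy(confident_logits, target=42)

print(f"uniform guess: {baseline:.3f}")
print(f"confident and correct: {confident:.6f}")
```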

In [None]:
def calc_loss_loader(data_loader, model, device, max_batches=None):
    """
    Calculate average loss across multiple batches with progress bar.
    """
    total_loss = 0.0
    batch_count = 0

    # Cap max_batches at the loader length so the progress bar total is accurate
    if max_batches is None:
        max_batches = len(data_loader)
    else:
        max_batches = min(max_batches, len(data_loader))
    pbar = tqdm(total=max_batches, desc="Calculating loss", leave=False, position=2)

    for batch_idx, (inputs, targets) in enumerate(data_loader):
        # Stop once max_batches have been processed
        if batch_idx >= max_batches:
            break

        inputs, targets = inputs.to(device), targets.to(device)
        loss = calc_loss_batch(inputs, targets, model, device)
        total_loss += loss.item()
        batch_count += 1
        pbar.update(1)

    pbar.close()
    return total_loss / max(1, batch_count)  # Avoid division by zero

In [15]:
def generate_and_print_sample(model, tokenizer, device, start_context):
    model.eval()
    context_size = model.pos_emb.weight.shape[0]
    encoded = text_to_token_ids(start_context, tokenizer).to(device)
    with torch.no_grad():
        token_ids = generate_text_simple(
            model=model, idx=encoded,
            max_new_tokens=50, context_size=context_size
        )
    decoded_text = token_ids_to_text(token_ids, tokenizer)
    print(decoded_text.replace("\n", " "))  # Compact print format
    model.train()

In [None]:
def evaluate_model(model, train_loader, val_loader, device, eval_iter, progress_callback=None):
    """Evaluate model on train and validation sets with progress tracking"""
    model.eval()
    with torch.no_grad():
        # Evaluate on training set
        train_loss = calc_loss_loader(train_loader, model, device, max_batches=eval_iter)
        if progress_callback:
            progress_callback(1)  # Update progress by 1 step

        # Evaluate on validation set
        val_loss = calc_loss_loader(val_loader, model, device, max_batches=eval_iter)
        if progress_callback:
            progress_callback(1)  # Update progress by 1 step

    model.train()
    return train_loss, val_loss

Here is a diagram of how the training loop works:

![GPT Structure](resources/train_process.jpg)

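The trainable weights in this model are the token embeddings, the positional embeddings, the query, key, and value matrices and the output projection in each attention layer, the two linear layers in each feed-forward network, the scale and shift in each LayerNorm, and the final output head.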
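As a rough check on where those weights sit, the sketch below counts parameters directly from the config, following the layer shapes defined in the classes above. The function name is an assumption for this illustration.

```python
GPT_CONFIG_124M = {
    "vocab_size": 50257,
    "context_length": 256,
    "emb_dim": 768,
    "n_heads": 12,
    "n_layers": 12,
    "drop_rate": 0.1,
    "qkv_bias": False,
}


def count_parameters(cfg):
    """Count trainable parameters per component from the layer shapes."""
    d, v, c = cfg["emb_dim"], cfg["vocab_size"], cfg["context_length"]
    qkv_bias = d if cfg["qkv_bias"] else 0
    attention = 3 * (d * d + qkv_bias) + (d * d + d)      # W_query, W_key, W_value, out_proj
    feed_forward = (d * 4 * d + 4 * d) + (4 * d * d + d)  # expansion and contraction layers
    layer_norms = 2 * (2 * d)                             # scale and shift, two norms per block
    block = attention + feed_forward + layer_norms
    return {
        "token_embedding": v * d,
        "positional_embedding": c * d,
        "transformer_blocks": cfg["n_layers"] * block,
        "final_norm": 2 * d,
        "output_head": d * v,
    }


counts = count_parameters(GPT_CONFIG_124M)
total = sum(counts.values())

for name, n in counts.items():
    print(f"{name:>22}: {n:>12,}")
print(f"{'total':>22}: {total:>12,}")
```

The total comes out above 124M because this model keeps a separate output head. The original GPT-2 shares the token embedding matrix with the output layer, and subtracting the output head here gives roughly 124M.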

In [None]:
def train_model_simple(model, train_loader, val_loader, optimizer, device, num_epochs,
                       eval_freq, eval_iter, start_context, tokenizer):
    train_losses, val_losses, track_tokens_seen = [], [], []
    tokens_seen, global_step = 0, -1

    model.to(device)

    for epoch in range(num_epochs):
        model.train() 
        print(f"\nEpoch {epoch+1}/{num_epochs} started.")

        epoch_progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}",
                             total=len(train_loader), position=0)

        for batch_idx, (input_batch, target_batch) in enumerate(epoch_progress):
            try:
                input_batch = input_batch.to(device)
                target_batch = target_batch.to(device)

                optimizer.zero_grad() 
                loss = calc_loss_batch(input_batch, target_batch, model, device)
                loss.backward()  
                optimizer.step()  
                tokens_seen += input_batch.numel()
                global_step += 1

                epoch_progress.set_postfix({
                    'loss': f"{loss.item():.3f}",
                    'tokens': tokens_seen
                })

                if global_step % eval_freq == 0:
                    print(f"\nEvaluating at step {global_step}...")

                    with tqdm(total=2, desc="Evaluation", position=1) as eval_progress:
                        train_loss, val_loss = evaluate_model(
                            model, train_loader, val_loader, device, eval_iter,
                            progress_callback=lambda x: eval_progress.update(x))

                    train_losses.append(train_loss)
                    val_losses.append(val_loss)
                    track_tokens_seen.append(tokens_seen)
                    print(f"Epoch {epoch+1} (Step {global_step:06d}): "
                          f"Train loss {train_loss:.3f}, Val loss {val_loss:.3f}")

            except RuntimeError as e:
                if "MPS" in str(e):
                    print(f"MPS memory error at Epoch {epoch+1}, Batch {batch_idx+1}. Falling back to CPU.")
                    device_cpu = torch.device("cpu")
                    input_batch = input_batch.to(device_cpu)
                    target_batch = target_batch.to(device_cpu)
                    model.to(device_cpu)

                    optimizer.zero_grad()
                    loss = calc_loss_batch(input_batch, target_batch, model, device_cpu)
                    loss.backward()
                    optimizer.step()
                    tokens_seen += input_batch.numel()
                    global_step += 1

                    epoch_progress.set_postfix({
                        'loss (CPU)': f"{loss.item():.3f}",
                        'tokens': tokens_seen
                    })

                    model.to(device)
                    input_batch = input_batch.to(device)
                    target_batch = target_batch.to(device)
                else:
                    raise e

        epoch_progress.close()

        print(f"\n{'='*50}")
        print(f"EPOCH {epoch+1} COMPLETED - MODEL GENERATION:")
        
        model.eval()
        context_size = model.pos_emb.weight.shape[0]
        encoded = text_to_token_ids(start_context, tokenizer).to(device)
        with torch.no_grad():
            token_ids = generate_text_simple(
                model=model, idx=encoded,
                max_new_tokens=50, context_size=context_size
            )
        
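        full_text = token_ids_to_text(token_ids, tokenizer)
        continuation = full_text[len(start_context):]
        # Print the sample announced by the header above (compact, single line)
        print(f"{start_context}{continuation}".replace("\n", " "))
        print(f"{'='*50}")

        model.train()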

    return train_losses, val_losses, track_tokens_seen

In [None]:
start_time = time.time()

torch.manual_seed(42)
model = GPTModel(GPT_CONFIG_124M)
model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0004, weight_decay=0.1)

num_epochs = 10
train_losses, val_losses, tokens_seen = train_model_simple(
    model, train_loader, val_loader, optimizer, device,
    num_epochs=num_epochs, eval_freq=20, eval_iter=1,
    start_context="Every effort moves you", tokenizer=tokenizer
)

end_time = time.time()
execution_time_minutes = (end_time - start_time) / 60
print(f"Training completed in {execution_time_minutes:.2f} minutes.")

In [None]:
def generate(model, idx, max_new_tokens, context_size, temperature=0.0, top_k=None, eos_id=None):
    # Ensure model and idx are on the same device
    device = next(model.parameters()).device
    idx = idx.to(device)

    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
        logits = logits[:, -1, :]

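        if top_k is not None:
            # Keep only the top_k logits per row; mask everything else with -inf
            top_logits, _ = torch.topk(logits, top_k)
            min_val = top_logits[:, -1:]  # (batch_size, 1) so the comparison broadcasts per row
            neg_inf = torch.tensor(float("-inf"), device=device)
            logits = torch.where(logits < min_val, neg_inf, logits)

        if temperature > 0.0:
            # Temperature > 1 flattens the distribution, < 1 sharpens it
            logits = logits / temperature

            probs = torch.softmax(logits, dim=-1)  # (batch_size, vocab_size)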

            idx_next = torch.multinomial(probs, num_samples=1)  # (batch_size, 1)

        else:
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)  # (batch_size, 1)

        if eos_id is not None and (idx_next == eos_id).all():  
            break

        idx = torch.cat((idx, idx_next), dim=1)  # (batch_size, num_tokens+1)

    return idx
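The two sampling controls in `generate` can be seen on a toy example. This is a plain-Python sketch with made-up logits and hypothetical helper names, not the function above: top-k masks everything outside the k largest logits, and the temperature divides the logits before the softmax.

```python
import math


def top_k_filter(logits, k):
    """Keep the k largest logits and set the rest to -inf."""
    threshold = sorted(logits, reverse=True)[k - 1]
    return [v if v >= threshold else float("-inf") for v in logits]


def softmax_with_temperature(logits, temperature):
    """Divide logits by the temperature, then apply a stable softmax."""
    scaled = [v / temperature for v in logits]
    peak = max(scaled)
    exps = [math.exp(v - peak) for v in scaled]  # exp(-inf) is 0.0 for masked ids
    total = sum(exps)
    return [e / total for e in exps]


logits = [4.0, 2.0, 1.0, 0.5, -1.0]  # toy scores for a five-token vocabulary
filtered = top_k_filter(logits, k=3)

sharp = softmax_with_temperature(filtered, temperature=0.5)
flat = softmax_with_temperature(filtered, temperature=2.0)

print("filtered:", filtered)
print("T=0.5:", [round(p, 3) for p in sharp])
print("T=2.0:", [round(p, 3) for p in flat])
```

A low temperature concentrates probability on the top token, while a high temperature spreads it across the surviving candidates; the masked tokens keep a probability of zero in both cases.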

In [None]:
torch.manual_seed(123)

token_ids = generate(
    model=model,
    idx=text_to_token_ids("Every effort moves you", tokenizer),
    max_new_tokens=15,
    context_size=GPT_CONFIG_124M["context_length"],
    top_k=25,
    temperature=1.4
)

print("Output text:\n", token_ids_to_text(token_ids, tokenizer))