In [2]:
from transformers import AutoTokenizer
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from datasets import load_from_disk
import math
import time

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Load tokenizer. It needs to be the same as the model we used in assignment 1.

model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Configuration

In [4]:
B = 16  # Batch size (e.g., 16, 32, 64)
T = 64  # Sequence length (e.g., 32, 64, 128)
D_MODEL = 128  # Embedding dimension (e.g., 64, 128, 256)
N_HEAD = 4  # Number of attention heads (e.g., 2, 4)
N_LAYER = 2  # Number of transformer layers (e.g., 1, 2)
LEARNING_RATE = 5e-4  # Learning rate (e.g., 1e-3, 5e-4)
NUM_EPOCHS = 5
VOCAB_SIZE = tokenizer.vocab_size  # 30522 for bert-base-uncased
EVAL_INTERVAL = 100
LOG_INTERVAL = 10
CHECKPOINT_PATH = "mini_gpt_checkpoint.pt"

In [5]:
# Set device. I use MPS (Apple Silicon). You can also use "cuda" or "cpu".
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using device: {device}")

Using device: mps


# Model Implementation
Implements a basic transformer-based language model with causal self-attention.
Based on the architecture described in "Attention Is All You Need" paper and GPT models.

In [6]:
class CausalSelfAttention(nn.Module):
    """
    A single head of self-attention with a causal mask to prevent
    the model from looking at future tokens.
    """

    def __init__(self, d_model, head_size):
        super().__init__()
        self.key = nn.Linear(d_model, head_size, bias=False)
        self.query = nn.Linear(d_model, head_size, bias=False)
        self.value = nn.Linear(d_model, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(T, T)))

    def forward(self, x):
        B, T_current, C = x.shape  # Get the current sequence length
        k = self.key(x)   # (B, T_current, head_size)
        q = self.query(x)  # (B, T_current, head_size)

        # Compute attention scores
        wei = q @ k.transpose(-2, -1) * C**-0.5  # (B, T_current, T_current)

        # Create a causal mask dynamically based on the current sequence length
        tril = torch.tril(torch.ones(T_current, T_current, device=x.device))

        wei = wei.masked_fill(tril == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)  # (B, T_current, T_current)

        # Perform weighted aggregation
        v = self.value(x)  # (B, T_current, head_size)
        out = wei @ v    # (B, T_current, head_size)
        return out


class MultiHeadAttention(nn.Module):
    """
    Multiple attention heads running in parallel.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([CausalSelfAttention(
            D_MODEL, head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(D_MODEL, D_MODEL)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.proj(out)
        return out


class FeedForward(nn.Module):
    """
    A simple linear layer followed by a non-linearity.
    """

    def __init__(self, d_model):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, 4 * d_model),
            nn.ReLU(),
            nn.Linear(4 * d_model, d_model)
        )

    def forward(self, x):
        return self.net(x)


class TransformerBlock(nn.Module):
    """
    A single Transformer block, combining multi-head attention and a feed-forward network.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        head_size = d_model // num_heads
        self.sa = MultiHeadAttention(num_heads, head_size)
        self.ffwd = FeedForward(d_model)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


class MiniGPT(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(VOCAB_SIZE, D_MODEL)
        self.position_embedding_table = nn.Embedding(T, D_MODEL)
        self.blocks = nn.Sequential(
            *[TransformerBlock(D_MODEL, N_HEAD) for _ in range(N_LAYER)])
        self.ln_f = nn.LayerNorm(D_MODEL)  # final layer norm
        self.lm_head = nn.Linear(D_MODEL, VOCAB_SIZE)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # Token embeddings and positional embeddings
        tok_emb = self.token_embedding_table(idx)  # (B, T, D_MODEL)
        pos_emb = self.position_embedding_table(
            torch.arange(T, device=device))  # (T, D_MODEL)
        x = tok_emb + pos_emb  # (B, T, D_MODEL)

        # Transformer blocks
        x = self.blocks(x)  # (B, T, D_MODEL)
        x = self.ln_f(x)  # (B, T, D_MODEL)

        # Language model head
        logits = self.lm_head(x)  # (B, T, VOCAB_SIZE)

        loss = None
        if targets is not None:
            logits = logits.view(B * T, VOCAB_SIZE)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        # Generates new tokens based on an input sequence
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -T:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :]  # (B, C)
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
            idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
        return idx

# Training Setup
Load the preprocessed dataset from Assignment 1

In [7]:
try:
    chunked_dataset = load_from_disk("./processed_dataset")
    print("Dataset loaded successfully.")
except FileNotFoundError:
    print("Error: Processed dataset not found. Please run Assignment 1 to create it.")
    exit()

# Define the custom PyTorch Dataset and DataLoader
# The Dataset splits the input into X (input_ids) and Y (target)


class NextTokenPredictionDataset(torch.utils.data.Dataset):
    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        input_ids = item['input_ids']
        
        # Note: We return lists here, as the tokenizer.pad function expects a list of dictionaries.
        # It's cleaner to handle the tensor conversion and padding in the collate function.
        return {'input_ids': input_ids}


def custom_collate_fn(batch):
    # Separate the input_ids and pad them
    input_ids = [item['input_ids'] for item in batch]

    # Pad the batch using the tokenizer
    # We must use a list of lists here for the tokenizer's padding method
    padded_input_ids = tokenizer.pad({'input_ids': input_ids},
                                     padding=True,
                                     return_tensors='pt',
                                     max_length=T)

    # Extract the padded input IDs and attention masks
    x = padded_input_ids['input_ids'][:, :-1]
    y = padded_input_ids['input_ids'][:, 1:]

    # The attention mask is not used by the MiniGPT model in the provided code,
    # but it's good practice to keep track of it if you were to implement it.
    # attention_mask = padded_input_ids['attention_mask'][:, :-1]

    return x, y


# Create DataLoader instances
train_dataset = NextTokenPredictionDataset(chunked_dataset)
train_dataloader = DataLoader(
    train_dataset, batch_size=B, shuffle=True, collate_fn=custom_collate_fn)


# Instantiate the model and optimizer
model = MiniGPT().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

Dataset loaded successfully.


# Training Loop

In [8]:
import numpy as np


# Function to calculate perplexity
@torch.no_grad()
def calculate_perplexity(loss):
    # Perplexity is the exponentiation of the loss (cross-entropy)
    # Smaller perplexity is better
    return math.exp(loss)

model.train()
start_time = time.time()
train_losses = []
perplexities = []
eval_losses = []

print("\nStarting training loop...")
for epoch in range(NUM_EPOCHS):
    epoch_start_time = time.time()
    total_loss = 0
    for i, (X, Y) in enumerate(train_dataloader):
        X, Y = X.to(device), Y.to(device)

        # Forward pass and loss computation
        logits, loss = model(X, Y)

        # Backward pass and optimizer step
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Log metrics every N steps
        if (i + 1) % LOG_INTERVAL == 0:
            avg_loss = total_loss / LOG_INTERVAL
            train_losses.append(avg_loss)
            perplexity = calculate_perplexity(avg_loss)
            perplexities.append(perplexity)
            print(
                f"Epoch {epoch+1}/{NUM_EPOCHS}, Step {i+1}, Loss: {avg_loss:.4f}, Perplexity: {perplexity:.2f}")
            total_loss = 0

    epoch_end_time = time.time()
    print(
        f"Epoch {epoch+1} finished. Time taken: {epoch_end_time - epoch_start_time:.2f}s")

    # Save a model checkpoint after each epoch
    torch.save(model.state_dict(), CHECKPOINT_PATH)
    print(f"Model checkpoint saved to {CHECKPOINT_PATH}")

end_time = time.time()
print(
    f"\nTraining complete! Total time: {(end_time - start_time) / 60:.2f} minutes")
print(f"Final model checkpoint saved to {CHECKPOINT_PATH}")

# --- 4. Final deliverables preparation ---
# This part would typically involve saving the logs for plotting
# and preparing the report.
# Example of saving the metrics for later plotting
np.savez("training_metrics.npz", train_losses=train_losses,
         perplexities=perplexities)

print("\nTraining metrics saved for visualization.")
print("You can now load the 'mini_gpt_checkpoint.pt' file for inference or the 'training_metrics.npz' file for plotting loss and perplexity curves.")

You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.



Starting training loop...




Epoch 1/5, Step 10, Loss: 9.9051, Perplexity: 20032.18
Epoch 1/5, Step 20, Loss: 8.5428, Perplexity: 5129.72
Epoch 1/5, Step 30, Loss: 8.0160, Perplexity: 3028.97
Epoch 1/5, Step 40, Loss: 7.4298, Perplexity: 1685.47
Epoch 1/5, Step 50, Loss: 7.0282, Perplexity: 1128.00
Epoch 1/5, Step 60, Loss: 6.8452, Perplexity: 939.37
Epoch 1/5, Step 70, Loss: 6.7707, Perplexity: 871.96
Epoch 1/5, Step 80, Loss: 6.5292, Perplexity: 684.84
Epoch 1/5, Step 90, Loss: 6.2807, Perplexity: 534.18
Epoch 1/5, Step 100, Loss: 6.4391, Perplexity: 625.82
Epoch 1/5, Step 110, Loss: 6.2999, Perplexity: 544.52
Epoch 1/5, Step 120, Loss: 6.5138, Perplexity: 674.39
Epoch 1/5, Step 130, Loss: 6.3268, Perplexity: 559.38
Epoch 1/5, Step 140, Loss: 6.2095, Perplexity: 497.44
Epoch 1/5, Step 150, Loss: 6.3722, Perplexity: 585.32
Epoch 1/5, Step 160, Loss: 6.0963, Perplexity: 444.21
Epoch 1/5, Step 170, Loss: 6.4435, Perplexity: 628.60
Epoch 1/5, Step 180, Loss: 6.1877, Perplexity: 486.74
Epoch 1/5, Step 190, Loss: 6.35

In [12]:
import torch
import torch.nn as nn

# Define the linear layer
# in_features=4 (input dimension)
# out_features=2 (output dimension)
linear_layer = nn.Linear(in_features=4, out_features=2)

# Print the initialized weight and bias matrices
print("Weights shape:", linear_layer.weight.shape)
print("Bias shape:", linear_layer.bias.shape)

Weights shape: torch.Size([2, 4])
Bias shape: torch.Size([2])


In [10]:
# Create a dummy input tensor (representing a single sample)
# The shape is (batch_size, in_features) -> (1, 4)
input_data = torch.randn(1, 4)
print("\nInput data:\n", input_data)
print("Input shape:", input_data.shape)

# Perform the forward pass (the linear transformation)
output_data = linear_layer(input_data)

print("\nOutput data:\n", output_data)
print("Output shape:", output_data.shape)


Input data:
 tensor([[-1.4292, -1.0276, -0.8672, -0.5103]])
Input shape: torch.Size([1, 4])

Output data:
 tensor([[-0.7717,  0.7331]], grad_fn=<AddmmBackward0>)
Output shape: torch.Size([1, 2])
