In [None]:
import sys                                                                                                                                                                                             
sys.path.insert(0, os.path.join(os.path.dirname("__file__"), ".."))  # add src/ to path

import os
import math
import numpy as np
import random
import logging
import importlib

import torch
import torch.nn as nn
import torch.functional as F
from torch.nn import RMSNorm
from torch.amp import autocast, GradScaler

from torch.utils.data import Dataset, DataLoader
import json
import glob
import gzip
import bz2

from transformers import AutoTokenizer
import tiktoken


from tqdm.auto import tqdm, trange
import time

import models.gpt2 as gpt2
importlib.reload(gpt2)  # Reload to get latest changes

print("✅ All imports successful!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name()}")


✅ All imports successful!
PyTorch version: 2.6.0
CUDA available: False


## Tokenizer Setup and Testing

In [None]:
importlib.reload(gpt2)  # Reload to get latest changes

# Set up the tokenizer
print("Setting up tokenizer...")
tokenizer = gpt2.setup_tokenizer()

# Test tokenization
test_text = "The quick brown fox jumps over the lazy dog."
tokens = tokenizer.encode(test_text)
decoded = tokenizer.decode(tokens)

print(f"\nTest text: '{test_text}'")
print(f"Tokens: {tokens}")
print(f"Decoded: '{decoded}'")
print(f"Vocabulary size: {tokenizer.vocab_size}")

# Find special token IDs
special_tokens = ["<|user|>", "<|assistant|>", "<|end|>", "<|system|>", "<|pad|>"]
print(f"\nSpecial token IDs:")
for token in special_tokens:
    token_id = tokenizer.convert_tokens_to_ids(token)
    print(f"  {token}: {token_id}")

# Calculate actual vocabulary size needed
max_token_id = max(tokenizer.convert_tokens_to_ids(token) for token in special_tokens)
actual_vocab_size = max_token_id + 1
print(f"\nActual vocabulary size needed: {actual_vocab_size}")
print(f"Difference from tokenizer vocab size: {actual_vocab_size - tokenizer.vocab_size}")

print("\n✅ Tokenizer setup complete!")


Setting up tokenizer...

Test text: 'The quick brown fox jumps over the lazy dog.'
Tokens: [464, 2068, 7586, 21831, 18045, 625, 262, 16931, 3290, 13]
Decoded: 'The quick brown fox jumps over the lazy dog.'
Vocabulary size: 50257

Special token IDs:
  <|user|>: 50259
  <|assistant|>: 50260
  <|end|>: 50261
  <|system|>: 50258
  <|pad|>: 50257

Actual vocabulary size needed: 50262
Difference from tokenizer vocab size: 5

✅ Tokenizer setup complete!


## GPTEmbedding Layer Testing

In [None]:
importlib.reload(gpt2)  # Reload to get latest changes

# Test parameters
vocab_size = 1000
emb_dim = 8
context_length = 256
batch_size = 2
seq_length = 6

# Create random token IDs
token_ids = torch.randint(0, vocab_size, (batch_size, seq_length))
print(f"Input token IDs shape: {token_ids.shape}")
print(f"Sample token IDs: {token_ids[0]}")

# Initialize and test the embedding layer
print("\nTesting GPTEmbedding layer...")
embedding_layer = gpt2.GPTEmbedding(vocab_size, emb_dim, context_length)
output = embedding_layer(token_ids)

# Verify output
print(f"Output shape: {output.shape}")
print(f"Expected shape: {(batch_size, seq_length, emb_dim)}")
print(f"Output sample (first token): {output[0, 0, :5]}")

# Sanity checks
assert output.shape == (batch_size, seq_length, emb_dim), \
    f"Expected output shape {(batch_size, seq_length, emb_dim)}, got {output.shape}"

# Check that embeddings are different for different tokens
if not torch.allclose(output[0, 0], output[0, 1]):
    print("✅ Different tokens produce different embeddings")
else:
    print("⚠️  Warning: Different tokens produce similar embeddings")

print("\n✅ GPTEmbedding layer test passed!")


Input token IDs shape: torch.Size([2, 6])
Sample token IDs: tensor([714, 792, 689,  27, 593, 683])

Testing GPTEmbedding layer...
Output shape: torch.Size([2, 6, 8])
Expected shape: (2, 6, 8)
Output sample (first token): tensor([ 0.3039, -1.1265, -0.6162,  0.0638,  0.5015], grad_fn=<SliceBackward0>)
✅ Different tokens produce different embeddings

✅ GPTEmbedding layer test passed!


## Model Component Testing

In [19]:
# Cell 4: Test MultiHeadAttention with RoPE
importlib.reload(gpt2)  # Reload to get latest changes

# Test parameters
torch.manual_seed(123)  # For reproducible results
d_in = 16
d_out = d_in
num_heads = 4
num_kv_heads = 2
context_length = 32
dropout = 0.0
batch_size = 3
seq_len = 7

# Create random input tensor
x = torch.randn(batch_size, seq_len, d_in)
print(f"Input shape: {x.shape}")

# Initialize MultiHeadAttention with RoPE
print("\nTesting MultiHeadAttention with RoPE...")
mha = gpt2.MultiHeadAttention(d_in=d_in, max_seq_length=context_length, dropout=dropout, num_heads=num_heads, num_kv_heads=num_kv_heads, bias_qkv=True)
out = mha(x)[0]

# Verify output
print(f"Output shape: {out.shape}")
print(f"Expected shape: {(batch_size, seq_len, d_out)}")
print(f"Output sample: {out[0, 0, :5]}")

# Sanity checks
assert out.shape == (batch_size, seq_len, d_out), \
    f"Expected output shape {(batch_size, seq_len, d_out)}, got {out.shape}"
assert not torch.isnan(out).any(), "Output contains NaNs!"

# Test RoPE positional sensitivity
seq = torch.randn(1, 2, d_in)
seq_shifted = torch.cat([torch.zeros(1, 3, d_in), seq], dim=1)  # same tokens at later positions

out1 = mha(seq)[0]
out2 = mha(seq_shifted)[0][:, -2:, :]

if not torch.allclose(out1, out2, atol=1e-5):
    print(":white_check_mark: RoPE working: outputs differ for same tokens at different positions")
else:
    print(":warning: RoPE may not be applied correctly")

# The outputs should be different due to RoPE encoding different positions
if not torch.allclose(out1, out2):
    print("✅ RoPE is working: same tokens at different positions produce different outputs")
else:
    print("⚠️  Warning: RoPE might not be working correctly")

print("\n✅ MultiHeadAttention with RoPE test passed!")


Input shape: torch.Size([3, 7, 16])

Testing MultiHeadAttention with RoPE...
Output shape: torch.Size([3, 7, 16])
Expected shape: (3, 7, 16)
Output sample: tensor([-0.2015,  0.3772,  0.2905, -0.0066, -0.2134], grad_fn=<SliceBackward0>)
:white_check_mark: RoPE working: outputs differ for same tokens at different positions
✅ RoPE is working: same tokens at different positions produce different outputs

✅ MultiHeadAttention with RoPE test passed!


In [25]:
# Cell 5: Test SwiGLU Activation Function
importlib.reload(gpt2)  # Reload to get latest changes

# Test parameters
d_model = 16
d_ff = 32
batch_size = 4
seq_len = 8

# Create test input
x = torch.randn(batch_size, seq_len, d_model)
print(f"Input shape: {x.shape}")
print(f"Input sample: {x[0, 0, :5]}")

# Initialize and test SwiGLU
print("\nTesting SwiGLU activation...")
swiglu = gpt2.SwiGLU(d_model, d_ff, d_model)
out = swiglu(x)

# Verify output
print(f"Output shape: {out.shape}")
print(f"Expected shape: {(batch_size, seq_len, d_model)}")
print(f"Output sample: {out[0, 0, :5]}")

# Sanity checks
assert out.shape == (batch_size, seq_len, d_model), \
    f"Expected output shape {(batch_size, seq_len, d_model)}, got {out.shape}"
assert not torch.isnan(out).any(), "Output contains NaNs!"

# Test that SwiGLU is non-linear
# Create two different inputs
x1 = torch.randn(1, 1, d_model)
x2 = torch.randn(1, 1, d_model)
out1 = swiglu(x1)
out2 = swiglu(x2)

# Test linearity: SwiGLU(x1 + x2) should NOT equal SwiGLU(x1) + SwiGLU(x2)
combined_input = x1 + x2
combined_output = swiglu(combined_input)
sum_outputs = out1 + out2

if not torch.allclose(combined_output, sum_outputs):
    print("✅ SwiGLU is non-linear (as expected)")
else:
    print("⚠️  Warning: SwiGLU appears to be linear")

print("\n✅ SwiGLU activation test passed!")


Input shape: torch.Size([4, 8, 16])
Input sample: tensor([-0.2807, -0.3287,  0.4688,  1.5427, -1.2366])

Testing SwiGLU activation...
Output shape: torch.Size([4, 8, 16])
Expected shape: (4, 8, 16)
Output sample: tensor([-0.0409, -0.0312, -0.1020,  0.0339,  0.0536], grad_fn=<SliceBackward0>)
✅ SwiGLU is non-linear (as expected)

✅ SwiGLU activation test passed!


In [26]:
# Cell 6: Test FeedForward Layer
importlib.reload(gpt2)  # Reload to get latest changes

# Test parameters
emb_dim = 16
batch_size = 10
seq_len = 4

# Create test input
x = torch.randn(batch_size, seq_len, emb_dim)
print(f"Input shape: {x.shape}")
print(f"Input sample: {x[0, 0, :5]}")

# Initialize and test FeedForward
print("\nTesting FeedForward layer...")
ff = gpt2.FeedForward(emb_dim)
out = ff(x)

# Verify output
print(f"Output shape: {out.shape}")
print(f"Expected shape: {(batch_size, seq_len, emb_dim)}")
print(f"Output sample: {out[0, 0, :5]}")

# Sanity checks
assert out.shape == (batch_size, seq_len, emb_dim), \
    f"Expected output shape {(batch_size, seq_len, emb_dim)}, got {out.shape}"
assert not torch.isnan(out).any(), "Output contains NaNs!"

# Test that FeedForward transforms the input
if not torch.allclose(x, out):
    print("✅ FeedForward transforms the input (as expected)")
else:
    print("⚠️  Warning: FeedForward doesn't seem to transform the input")

print("\n✅ FeedForward layer test passed!")


Input shape: torch.Size([10, 4, 16])
Input sample: tensor([-0.2383, -0.4250, -0.7056, -0.7724,  0.2178])

Testing FeedForward layer...
Output shape: torch.Size([10, 4, 16])
Expected shape: (10, 4, 16)
Output sample: tensor([ 0.1601,  0.0216, -0.1432,  0.0224,  0.2945], grad_fn=<SliceBackward0>)
✅ FeedForward transforms the input (as expected)

✅ FeedForward layer test passed!


In [28]:
# Cell 7: Test TransformerBlock
importlib.reload(gpt2)  # Reload to get latest changes

# Test configuration
OG_GPT_CONFIG = {
    "vocab_size": 50257,
    "context_length": 1024,
    "emb_dim": 768,
    "n_heads": 12,
    "n_layers": 12,
    "drop_rate": 0.1,
    "qkv_bias": False
}

# Create test input
torch.manual_seed(123)
x = torch.rand(2, 4, OG_GPT_CONFIG["emb_dim"])
print(f"Input shape: {x.shape}")

# Initialize and test TransformerBlock
print("\nTesting TransformerBlock...")
block = gpt2.TransformerBlock(OG_GPT_CONFIG)
output = block(x)[0]

# Verify output
print(f"Output shape: {output.shape}")
print(f"Expected shape: {(2, 4, OG_GPT_CONFIG['emb_dim'])}")

# Sanity checks
assert output.shape == (2, 4, OG_GPT_CONFIG["emb_dim"]), \
    f"Expected output shape {(2, 4, OG_GPT_CONFIG['emb_dim'])}, got {output.shape}"
assert not torch.isnan(output).any(), "Output contains NaNs!"

# Test that the block transforms the input
if not torch.allclose(x, output):
    print("✅ TransformerBlock transforms the input (as expected)")
else:
    print("⚠️  Warning: TransformerBlock doesn't seem to transform the input")

print("\n✅ TransformerBlock test passed!")


Input shape: torch.Size([2, 4, 768])

Testing TransformerBlock...
Output shape: torch.Size([2, 4, 768])
Expected shape: (2, 4, 768)
✅ TransformerBlock transforms the input (as expected)

✅ TransformerBlock test passed!


In [32]:
# Cell 8: Test Complete GPTModel
importlib.reload(gpt2)  # Reload to get latest changes

# Calculate vocabulary size from tokenizer
special_tokens = ["<|user|>", "<|assistant|>", "<|end|>", "<|system|>", "<|pad|>"]
max_token_id = max(tokenizer.convert_tokens_to_ids(token) for token in special_tokens)
actual_vocab_size = max_token_id + 1

# Test configuration
CUSTOM_GPT_CONFIG = {
    "vocab_size": actual_vocab_size,
    "context_length": 1024,
    "emb_dim": 512,
    "n_heads": 8,
    "n_layers": 12,
    "drop_rate": 0.1,
    "qkv_bias": False
}

print(f"Using vocabulary size: {actual_vocab_size}")

# Test with real tokenized text
sentence = "The quick brown fox jumps over the lazy dog."
token_ids = tokenizer.encode(sentence)
token_ids = torch.tensor(token_ids)
print(f"Input sentence: '{sentence}'")
print(f"Token IDs: {token_ids}")
print(f"Token IDs shape: {token_ids.unsqueeze(0).shape}")

# Initialize and test GPTModel
print("\nTesting GPTModel...")
gpt_model = gpt2.GPT(CUSTOM_GPT_CONFIG)
output = gpt_model(token_ids.unsqueeze(0))[0]

# Verify output
print(f"Output shape: {output.shape}")
print(f"Expected shape: {(1, len(token_ids), actual_vocab_size)}")

# Sanity checks
assert output.shape == (1, len(token_ids), actual_vocab_size), \
    f"Expected output shape {(1, len(token_ids), actual_vocab_size)}, got {output.shape}"
assert not torch.isnan(output).any(), "Output contains NaNs!"

# Check that logits are reasonable (not all the same)
logits_variance = output.var()
print(f"Logits variance: {logits_variance:.4f}")
if logits_variance > 0.01:
    print("✅ Logits have reasonable variance")
else:
    print("⚠️  Warning: Logits have very low variance")

print("\n✅ GPTModel test passed!")


Using vocabulary size: 50262
Input sentence: 'The quick brown fox jumps over the lazy dog.'
Token IDs: tensor([  464,  2068,  7586, 21831, 18045,   625,   262, 16931,  3290,    13])
Token IDs shape: torch.Size([1, 10])

Testing GPTModel...
Output shape: torch.Size([1, 10, 50262])
Expected shape: (1, 10, 50262)
Logits variance: 515.8440
✅ Logits have reasonable variance

✅ GPTModel test passed!


## Text Generation Testing

In [None]:
importlib.reload(gpt2)  # Reload to get latest changes

# Test text generation
start_context = "The quick brown fox"
print(f"Starting context: '{start_context}'")

# Generate text (will be random since model is untrained)
full_text = gpt2.generate_text(
    start_context=start_context,
    tokenizer=tokenizer,
    model=gpt_model,
    max_new_tokens=10,
    context_size=CUSTOM_GPT_CONFIG["context_length"]
)

print(f"Generated text: '{full_text}'")
print("\nNote: The generated text will be random since the model is untrained.")
print("This is expected! After training, the model should generate more coherent text.")

print("\n✅ Text generation test passed!")


Starting context: 'The quick brown fox'
Output: tensor([[  464,  2068,  7586, 21831, 21831, 21831, 21831, 21831, 21831, 21831,
         21831, 21831, 21831, 21831]])
Output length: 14
Generated text: 'The quick brown fox fox fox fox fox fox fox fox fox fox fox'

Note: The generated text will be random since the model is untrained.
This is expected! After training, the model should generate more coherent text.

✅ Text generation test passed!


## Dataset Creation Testing

In [None]:
##

## DataLoader Creation Testing

## Dataset Creating Testing