In [1]:
!pip install torch transformers datasets matplotlib seaborn pandas tqdm



Collecting torch
  Downloading torch-2.9.1-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (30 kB)
Collecting transformers
  Downloading transformers-4.57.3-py3-none-any.whl.metadata (43 kB)
Collecting datasets
  Downloading datasets-4.4.1-py3-none-any.whl.metadata (19 kB)
Collecting matplotlib
  Downloading matplotlib-3.10.7-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (11 kB)
Collecting seaborn
  Downloading seaborn-0.13.2-py3-none-any.whl.metadata (5.4 kB)
Collecting pandas
  Downloading pandas-2.3.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (91 kB)
Collecting filelock (from torch)
  Downloading filelock-3.20.0-py3-none-any.whl.metadata (2.1 kB)
Collecting sympy>=1.13.3 (from torch)
  Downloading sympy-1.14.0-py3-none-any.whl.metadata (12 kB)
Collecting networkx>=2.5.1 (from torch)
  Downloading networkx-3.6-py3-none-any.whl.metadata (6.8 kB)
Collecting fsspec>=0.8.5 (from torch)
  Downloading fsspec-2025.10.0-py3-none-any.whl.metadat

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from datasets import load_dataset
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from tqdm.notebook import tqdm
import math

# Seed PyTorch's global RNG so weight initialisation, dropout and DataLoader
# shuffling are repeatable from one Restart & Run All to the next.
SEED = 42
torch.manual_seed(SEED)

# Run on the GPU when PyTorch can see one, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


In [2]:
# Number of leading examples kept from each split (small subsets keep training fast)
NUM_TRAIN_SAMPLES = 10000
NUM_TEST_SAMPLES = 2000

# AG News corpus plus the tokenizer used to encode it
dataset = load_dataset("ag_news")
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

# Keep only the first N rows of each split
train_indices = range(NUM_TRAIN_SAMPLES)
test_indices = range(NUM_TEST_SAMPLES)
train_data = dataset['train'].select(train_indices)
test_data = dataset['test'].select(test_indices)

print(f"Train samples: {len(train_data)}")
print(f"Test samples: {len(test_data)}")

Train samples: 10000
Test samples: 2000


In [3]:
class SimpleDataset(torch.utils.data.Dataset):
    """In-memory dataset of pre-tokenized texts and their labels.

    Every text is encoded once, at construction time, with the module-level
    ``tokenizer``; sequences are truncated or padded to exactly ``max_len``
    tokens and returned as PyTorch tensors.

    Args:
        texts: The raw strings to encode.
        labels: Labels aligned with ``texts``; converted with ``torch.tensor``.
        max_len: Fixed token length each example is truncated/padded to.
    """

    def __init__(self, texts, labels, max_len=64):
        encode_options = dict(
            truncation=True,
            padding='max_length',
            max_length=max_len,
            return_tensors='pt',
        )
        self.encodings = tokenizer(texts, **encode_options)
        self.labels = torch.tensor(labels)

    def __len__(self):
        # There is exactly one label per example
        return len(self.labels)

    def __getitem__(self, idx):
        # Only the token ids are exposed; other tokenizer outputs are not returned
        token_ids = self.encodings['input_ids'][idx]
        label = self.labels[idx]
        return {'input_ids': token_ids, 'labels': label}

# Materialize the dataset columns as plain Python lists before tokenizing
train_texts, train_labels = list(train_data['text']), list(train_data['label'])
test_texts, test_labels = list(test_data['text']), list(test_data['label'])

train_dataset = SimpleDataset(train_texts, train_labels)
test_dataset = SimpleDataset(test_texts, test_labels)

# Only the training batches are shuffled; the evaluation order stays fixed
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32)

In [4]:
class SinusoidalPositioning(nn.Module):
    """Fixed sine/cosine positional encoding that is added to the input.

    Even feature indices hold ``sin(pos * w_i)`` and odd feature indices hold
    ``cos(pos * w_i)`` with ``w_i = exp(-2i * ln(10000) / d_model)``. The table
    is stored as a non-trainable buffer ``pe`` of shape
    ``(1, max_len, d_model)``.

    Args:
        d_model: Feature dimension of the encoding. Odd values are supported.
        max_len: Number of positions the table covers.
    """
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so the cosine table is trimmed to the number of odd columns.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))
    
    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is expected to be ``(batch, seq_len, d_model)``.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        max_len = self.pe.size(1)
        if seq_len > max_len:
            # Slicing past the table would silently truncate it and then fail
            # (or mis-broadcast) in the addition below.
            raise ValueError(
                f"Sequence length {seq_len} exceeds positional table size {max_len}"
            )
        return x + self.pe[:, :seq_len]

class LearnedPositioning(nn.Module):
    """Trainable absolute position embeddings that are added to the input.

    Args:
        d_model: Size of each position vector.
        max_len: Number of distinct positions in the embedding table.
    """
    def __init__(self, d_model, max_len=512):
        super().__init__()
        self.pos_embedding = nn.Embedding(num_embeddings=max_len, embedding_dim=d_model)
    
    def forward(self, x):
        # Position indices 0..seq_len-1, created on the same device as the input
        seq_len = x.size(1)
        index = torch.arange(seq_len, device=x.device)
        # The leading singleton axis lets the looked-up vectors broadcast over the batch
        pos_vectors = self.pos_embedding(index.unsqueeze(0))
        return x + pos_vectors

class RoPE(nn.Module):
    """Rotary position embedding applied to query and key tensors.

    Consecutive feature pairs ``(x[2i], x[2i+1])`` are rotated by the angle
    ``pos / 10000 ** (2i / d_model)``. The sine and cosine tables for
    ``max_len`` positions are computed once and stored as the buffers ``sin``
    and ``cos``, each of shape ``(max_len, d_model // 2)`` for even ``d_model``.

    Args:
        d_model: Size of the feature axis being rotated.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_len=512):
        super().__init__()
        self.d_model = d_model

        # angle[p, i] = p * 10000 ** (-2i / d_model)
        exponents = torch.arange(0, d_model, 2).float() / d_model
        inv_freq = 1.0 / (10000 ** exponents)
        positions = torch.arange(max_len).float()
        angles = positions[:, None] * inv_freq[None, :]

        # Buffers follow the module across devices and are part of its state dict
        self.register_buffer('sin', angles.sin())
        self.register_buffer('cos', angles.cos())

    @staticmethod
    def _rotate(x, sin, cos):
        """Rotate each (even, odd) feature pair of ``x`` and re-interleave the result."""
        even, odd = x[..., 0::2], x[..., 1::2]
        rotated_even = even * cos - odd * sin
        rotated_odd = even * sin + odd * cos
        # Stacking on a new last axis and flattening restores the even/odd interleaving
        return torch.stack((rotated_even, rotated_odd), dim=-1).flatten(-2)

    def forward(self, q, k):
        """Return rotated ``(q, k)``; the sequence length is read from axis 2 of ``q``."""
        seq_len = q.size(2)
        # Two leading singleton axes -> [1, 1, seq_len, d//2], broadcast over the first two axes
        sin = self.sin[:seq_len, :][None, None]
        cos = self.cos[:seq_len, :][None, None]
        return self._rotate(q, sin, cos), self._rotate(k, sin, cos)

print('Positional encodings defined with optimized RoPE caching')

Positional encodings defined with optimized RoPE caching


In [5]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values, split into
    ``n_heads`` heads of size ``d_model // n_heads``, attended, re-merged and
    passed through an output projection. No attention mask is applied, so
    every position (padding included) attends to every other position.

    Args:
        d_model: Input/output feature size; must be divisible by ``n_heads``.
        n_heads: Number of attention heads (positive).
        dropout: Dropout probability applied to the attention weights.
        use_rope: If True, rotate Q and K with ``RoPE`` before the dot product.
            The per-head size must then be even because RoPE rotates feature pairs.

    Raises:
        ValueError: If ``n_heads`` is not positive, ``d_model`` is not divisible
            by ``n_heads``, or ``use_rope`` is set with an odd per-head size.
    """
    def __init__(self, d_model, n_heads, dropout=0.1, use_rope=False):
        super().__init__()
        # Fail early with a clear message instead of an opaque view() error in forward
        if n_heads <= 0 or d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive n_heads ({n_heads})"
            )
        if use_rope and (d_model // n_heads) % 2 != 0:
            raise ValueError(
                f"RoPE needs an even head dimension, got {d_model // n_heads}"
            )

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.use_rope = use_rope
        
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)
        
        # Add attention dropout
        self.attn_dropout = nn.Dropout(dropout)
        
        if use_rope:
            # Rotation acts per head, so the table is built for d_k (RoPE's default max_len applies)
            self.rope = RoPE(self.d_k)
    
    def forward(self, x):
        """Self-attend over ``x`` of shape ``(batch, seq_len, d_model)``; output has the same shape."""
        batch_size, seq_len, _ = x.size()
        
        # Linear projections, reshaped to (batch, n_heads, seq_len, d_k)
        Q = self.q_linear(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        K = self.k_linear(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        V = self.v_linear(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        
        # Apply RoPE if needed
        if self.use_rope:
            Q, K = self.rope(Q, K)
        
        # Attention: softmax over the key axis of the scaled dot products
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        attn = F.softmax(scores, dim=-1)
        attn = self.attn_dropout(attn)  # Apply dropout to attention weights
        out = torch.matmul(attn, V)
        
        # Merge the heads back into d_model features, then apply the output projection
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        return self.out(out)

print('MultiHeadAttention defined with attention dropout')

MultiHeadAttention defined with attention dropout


In [6]:
class FeedForward(nn.Module):
    """Position-wise feed-forward sublayer with a selectable activation.

    ``'relu'`` and ``'gelu'`` use ``linear2(act(linear1(x)))``. ``'swiglu'``
    uses a gated form, ``linear2(silu(gate_proj(x)) * value_proj(x))``, and
    therefore has no ``linear1``.

    Args:
        d_model: Input/output feature size.
        d_ff: Hidden feature size.
        activation: One of ``'relu'``, ``'gelu'`` or ``'swiglu'``.

    Raises:
        ValueError: If ``activation`` is not one of the supported names.
    """
    _SUPPORTED_ACTIVATIONS = ('relu', 'gelu', 'swiglu')

    def __init__(self, d_model, d_ff, activation='relu'):
        super().__init__()
        # Reject unknown names up front; otherwise forward would fall through
        # every branch and silently return None.
        if activation not in self._SUPPORTED_ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation {activation!r}; "
                f"expected one of {self._SUPPORTED_ACTIVATIONS}"
            )
        self.activation = activation
        
        if activation == 'swiglu':
            # SwiGLU requires separate gate and value projections
            self.gate_proj = nn.Linear(d_model, d_ff)
            self.value_proj = nn.Linear(d_model, d_ff)
            self.linear2 = nn.Linear(d_ff, d_model)
        else:
            self.linear1 = nn.Linear(d_model, d_ff)
            self.linear2 = nn.Linear(d_ff, d_model)
    
    def forward(self, x):
        """Apply the feed-forward transform along the last axis of ``x``."""
        if self.activation == 'relu':
            return self.linear2(F.relu(self.linear1(x)))
        elif self.activation == 'gelu':
            return self.linear2(F.gelu(self.linear1(x)))
        elif self.activation == 'swiglu':
            # SwiGLU: separate gate and value, then element-wise multiply
            gate = F.silu(self.gate_proj(x))
            value = self.value_proj(x)
            return self.linear2(gate * value)
        # Only reachable if self.activation was changed after construction
        raise ValueError(f"Unsupported activation {self.activation!r}")

print('FeedForward defined with correct SwiGLU implementation')

FeedForward defined with correct SwiGLU implementation


In [7]:
class TransformerLayer(nn.Module):
    """One encoder block: self-attention and feed-forward, each in a residual branch.

    Args:
        d_model: Feature size of the block's input and output.
        n_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sublayer.
        dropout: Dropout probability for the attention weights and for each
            sublayer output before it is added to the residual.
        pre_norm: If True, LayerNorm is applied to each sublayer's input
            (Pre-LN); otherwise to the residual sum (Post-LN).
        use_rope: Forwarded to ``MultiHeadAttention``.
        activation: Forwarded to ``FeedForward``.
    """
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1, 
                 pre_norm=False, use_rope=False, activation='relu'):
        super().__init__()
        self.pre_norm = pre_norm

        # Sublayers
        self.attn = MultiHeadAttention(d_model, n_heads, dropout=dropout, use_rope=use_rope)
        self.ffn = FeedForward(d_model, d_ff, activation=activation)

        # One LayerNorm per sublayer and a shared residual dropout
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        if not self.pre_norm:
            # Post-LN: normalize the sum of the residual and the sublayer output
            attended = self.norm1(x + self.dropout(self.attn(x)))
            return self.norm2(attended + self.dropout(self.ffn(attended)))

        # Pre-LN: normalize only the sublayer input; the residual path is untouched
        attn_out = self.attn(self.norm1(x))
        x = x + self.dropout(attn_out)
        ffn_out = self.ffn(self.norm2(x))
        return x + self.dropout(ffn_out)

print('TransformerLayer defined')

TransformerLayer defined


In [8]:
class TransformerClassifier(nn.Module):
    """Transformer encoder with a linear classification head on the first token.

    Args:
        vocab_size: Number of rows in the token embedding table.
        d_model: Feature size used throughout the model.
        n_layers: Number of ``TransformerLayer`` blocks.
        n_heads: Attention heads per block.
        d_ff: Hidden size of each feed-forward sublayer.
        num_classes: Number of output logits.
        max_len: Number of positions for the sinusoidal/learned encodings
            (not passed to the attention layers, so it does not affect RoPE).
        dropout: Dropout probability for the embeddings and inside each block.
        pre_norm: Use Pre-LN blocks plus a final LayerNorm when True.
        pos_type: ``'sinusoidal'``, ``'learned'`` or ``'rope'``.
        activation: Feed-forward activation, forwarded to each block.

    Raises:
        ValueError: If ``pos_type`` is not one of the supported names.
    """
    _POS_TYPES = ('sinusoidal', 'learned', 'rope')

    def __init__(self, vocab_size, d_model, n_layers, n_heads, d_ff, num_classes,
                 max_len=512, dropout=0.1, pre_norm=False, pos_type='sinusoidal', 
                 activation='relu'):
        super().__init__()

        # An unknown name used to leave `pos_encoder` undefined and only failed
        # later, inside forward, with an AttributeError.
        if pos_type not in self._POS_TYPES:
            raise ValueError(
                f"Unsupported pos_type {pos_type!r}; expected one of {self._POS_TYPES}"
            )
        
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        
        # Positional encoding
        if pos_type == 'sinusoidal':
            self.pos_encoder = SinusoidalPositioning(d_model, max_len)
        elif pos_type == 'learned':
            self.pos_encoder = LearnedPositioning(d_model, max_len)
        else:
            self.pos_encoder = None  # RoPE applied in attention
        
        use_rope = (pos_type == 'rope')
        
        # Transformer layers
        self.layers = nn.ModuleList([
            TransformerLayer(d_model, n_heads, d_ff, dropout, pre_norm, use_rope, activation)
            for _ in range(n_layers)
        ])
        
        # Final layer norm for Pre-LN architectures
        self.final_norm = nn.LayerNorm(d_model) if pre_norm else None
        
        self.classifier = nn.Linear(d_model, num_classes)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        """Map token ids ``x`` (indexed as ``(batch, seq_len)``) to ``(batch, num_classes)`` logits."""
        # Embedding with scaling (as per original Transformer paper)
        x = self.embedding(x) * math.sqrt(self.d_model)
        
        # Add positional encoding (if not RoPE)
        if self.pos_encoder is not None:
            x = self.pos_encoder(x)
        
        x = self.dropout(x)
        
        # Transformer layers
        for layer in self.layers:
            x = layer(x)
        
        # Apply final norm for Pre-LN
        if self.final_norm is not None:
            x = self.final_norm(x)
        
        # Classification (use [CLS] token = first token)
        x = x[:, 0, :]
        return self.classifier(x)

print('TransformerClassifier defined with embedding scaling and final Pre-LN norm')

TransformerClassifier defined with embedding scaling and final Pre-LN norm


In [9]:
def train_model(model, train_loader, test_loader, epochs=10, lr=1e-4):
    """Train ``model`` with Adam and cross-entropy, evaluating after every epoch.

    The model is moved to the module-level ``device``. Each batch is expected
    to be a dict with ``'input_ids'`` and ``'labels'`` tensors. After every
    epoch one summary line is printed.

    Args:
        model: Module mapping ``input_ids`` to class logits.
        train_loader: Iterable of training batches; must support ``len()``.
        test_loader: Iterable of evaluation batches.
        epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.

    Returns:
        Dict with per-epoch lists ``'train_loss'`` (mean batch loss),
        ``'train_acc'`` and ``'test_acc'`` (both in percent).
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    def count_hits(logits, targets):
        # Number of rows whose highest-scoring class equals the target
        return logits.max(1).indices.eq(targets).sum().item()

    def to_percent(hits, seen):
        return 100. * hits / seen

    history = {key: [] for key in ('train_loss', 'train_acc', 'test_acc')}

    for epoch in range(epochs):
        # ---- optimisation pass ----
        model.train()
        train_loss, hits, seen = 0, 0, 0

        for batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}'):
            input_ids = batch['input_ids'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            hits += count_hits(outputs, labels)
            seen += labels.size(0)

        train_acc = to_percent(hits, seen)

        # ---- evaluation pass (no gradients, dropout disabled) ----
        model.eval()
        hits, seen = 0, 0

        with torch.no_grad():
            for batch in test_loader:
                input_ids = batch['input_ids'].to(device)
                labels = batch['labels'].to(device)
                hits += count_hits(model(input_ids), labels)
                seen += labels.size(0)

        test_acc = to_percent(hits, seen)

        # ---- bookkeeping ----
        history['train_loss'].append(train_loss / len(train_loader))
        history['train_acc'].append(train_acc)
        history['test_acc'].append(test_acc)

        print(f'Train Loss: {train_loss/len(train_loader):.3f}, Train Acc: {train_acc:.2f}%, Test Acc: {test_acc:.2f}%')

    return history

print('done')

done


In [10]:
# Hyperparameters shared by every model variant; unpacked into TransformerClassifier(**config, ...)
config = {
    'vocab_size': 30522,  # presumably the bert-base-uncased vocabulary size — verify against tokenizer.vocab_size
    'd_model': 128,
    'n_layers': 4,
    'n_heads': 4,
    'd_ff': 512,
    'num_classes': 4,
    'max_len': 64,  # same value as SimpleDataset's default max_len
    'dropout': 0.1
}

# Define the architecture variants to compare (two are listed here):
# each entry holds the TransformerClassifier keyword arguments that differ between variants
variants = {
    'Original': {'pre_norm': False, 'pos_type': 'sinusoidal', 'activation': 'relu'},
    'Modern': {'pre_norm': True, 'pos_type': 'rope', 'activation': 'swiglu'},
}

print('done')

done


In [11]:
# Maps a variant name to the history dict returned by train_model for that variant
results = dict()
print('Ready to train models individually!')

Ready to train models individually!


In [None]:
# Train Variant 1: Original (Post-LN + Sinusoidal + ReLU)
banner = "="*50
print(banner)
print("Training: Original")
print(banner)

# NOTE(review): this run uses epochs=10 while the Modern variant is trained with
# epochs=5, so the two final accuracies are not measured at the same training budget.
model_original = TransformerClassifier(**config, **variants['Original'])
history_original = train_model(model_original, train_loader, test_loader, epochs=10)
results['Original'] = history_original

# Persist the trained weights next to the notebook
torch.save(model_original.state_dict(), 'Original_model.pt')
print(f"✓ Original completed! Final Test Acc: {history_original['test_acc'][-1]:.2f}%")

In [12]:
# Train Variant 2: Modern (Pre-LN + RoPE + SwiGLU)
banner = "="*50
print(banner)
print("Training: Modern")
print(banner)

# Architecture flags come from the shared `variants` table
model_modern = TransformerClassifier(**config, **variants['Modern'])
history_modern = train_model(model_modern, train_loader, test_loader, epochs=5)
results['Modern'] = history_modern

# Persist the trained weights next to the notebook
torch.save(model_modern.state_dict(), 'Modern_model.pt')
print(f"✓ Modern completed! Final Test Acc: {history_modern['test_acc'][-1]:.2f}%")

Training: Modern


Epoch 1/5:   0%|          | 0/313 [00:00<?, ?it/s]

Train Loss: 1.381, Train Acc: 28.84%, Test Acc: 37.10%


Epoch 2/5:   0%|          | 0/313 [00:00<?, ?it/s]

Train Loss: 1.125, Train Acc: 50.14%, Test Acc: 55.00%


Epoch 3/5:   0%|          | 0/313 [00:00<?, ?it/s]

Train Loss: 0.864, Train Acc: 65.36%, Test Acc: 67.60%


Epoch 4/5:   0%|          | 0/313 [00:00<?, ?it/s]

Train Loss: 0.697, Train Acc: 73.18%, Test Acc: 73.25%


Epoch 5/5:   0%|          | 0/313 [00:00<?, ?it/s]

Train Loss: 0.596, Train Acc: 77.93%, Test Acc: 75.40%
✓ Modern completed! Final Test Acc: 75.40%


In [None]:
# Summary of all results: one line per trained variant with its last-epoch test accuracy
print("\n" + "="*60)
print("TRAINING COMPLETE - SUMMARY")
print("="*60)

if not results:
    print("No results yet. Run the training cells above.")
else:
    for name in results:
        history = results[name]
        final_acc = history['test_acc'][-1]
        print(f"{name:20s}: {final_acc:.2f}%")
    print("\n✓ All trained models saved!")

In [None]:
# Plot test accuracies: per-epoch curves on the left, final values on the right
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))
curve_ax, bar_ax = axes

# Left panel: test accuracy after each epoch, one line per trained model
for name, history in results.items():
    curve_ax.plot(history['test_acc'], label=name, marker='o')

curve_ax.set_title('Test Accuracy Over Time')
curve_ax.set_xlabel('Epoch')
curve_ax.set_ylabel('Test Accuracy (%)')
curve_ax.legend()
curve_ax.grid(True, alpha=0.3)

# Right panel: last-epoch test accuracy of each model
final_accs = {}
for name, history in results.items():
    final_accs[name] = history['test_acc'][-1]
names, accs = list(final_accs), list(final_accs.values())

bar_ax.barh(names, accs)
bar_ax.set_title('Final Test Accuracy Comparison')
bar_ax.set_xlabel('Test Accuracy (%)')
bar_ax.grid(True, alpha=0.3, axis='x')

# Write the figure to disk before showing it
plt.tight_layout()
plt.savefig('results_comparison.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Create summary table of last-epoch test accuracy, best model first.
# Built directly from `results` so this cell does not depend on the plotting cell having run.
summary = pd.DataFrame({
    'Model': list(results.keys()),
    'Test Accuracy (%)': [run['test_acc'][-1] for run in results.values()],
})
summary = summary.sort_values('Test Accuracy (%)', ascending=False)

# The improvement column needs the 'Original' baseline; when that variant has
# not been trained, skip the column instead of failing with an IndexError.
if 'Original' in results:
    baseline_acc = results['Original']['test_acc'][-1]
    summary['Improvement over Original'] = summary['Test Accuracy (%)'] - baseline_acc
else:
    print("Note: 'Original' has not been trained; skipping the 'Improvement over Original' column.")

print(summary.to_string(index=False))