Import Libraries

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import os
import re
import math
import numpy as np
from google.colab import files

Load Dataset

In [3]:
# Prompt for the three PTB split files. The captured output of this cell shows
# each upload being saved under its own name in the working directory.
uploaded = files.upload()

train_path = "ptb.train.txt"
valid_path = "ptb.valid.txt"
test_path = "ptb.test.txt"

# Read every split with an explicit encoding so the decoded text does not
# depend on the platform's default locale.
with open(train_path, "r", encoding="utf-8") as f:
    train_text = f.read()

with open(valid_path, "r", encoding="utf-8") as f:
    valid_text = f.read()

with open(test_path, "r", encoding="utf-8") as f:
    test_text = f.read()

# Quick sanity check that the upload contains the expected corpus.
print("Sample from training text:\n", train_text[:200])

Saving ptb.test.txt to ptb.test.txt
Saving ptb.train.txt to ptb.train.txt
Saving ptb.valid.txt to ptb.valid.txt
Sample from training text:
  aer banknote berlitz calloway centrust cluett fromstein gitano guterman hydro-quebec ipo kia memotec mlx nahb punts rake regatta rubens sim snack-food ssangyong swapo wachter 
 pierre <unk> N years o


Tokenization

In [1]:
!pip install tokenizers



In [4]:
from tokenizers import ByteLevelBPETokenizer

# Fit a byte-level BPE vocabulary on the training split only.
tokenizer = ByteLevelBPETokenizer()
tokenizer.train_from_iterator(
    [train_text],
    vocab_size=8000,
    min_frequency=2,
    special_tokens=["<unk>"],
)

# Use the trained tokenizer instead of whitespace split
def tokenize(text):
    """Split ``text`` into token strings with the trained module-level ``tokenizer``."""
    encoding = tokenizer.encode(text)
    return encoding.tokens

# Tokenize the three splits in train / valid / test order.
train_tokens, valid_tokens, test_tokens = (
    tokenize(split_text) for split_text in (train_text, valid_text, test_text)
)

# Build vocab from tokenizer: token string -> id, and the inverse id -> token string.
stoi = tokenizer.get_vocab()
itos = {token_id: token for token, token_id in stoi.items()}
vocab_size = len(stoi)
print("Vocab size:", vocab_size)

def encode(tokens):
    """Map token strings to ids through ``stoi``; tokens not in the vocab get the ``"<unk>"`` id."""
    ids = []
    for token in tokens:
        ids.append(stoi.get(token, stoi["<unk>"]))
    return ids

def decode(indices):
    """Map ids back to their token strings through ``itos`` (raises ``KeyError`` for an unknown id)."""
    return list(map(itos.__getitem__, indices))

# Convert each split's token strings to vocabulary ids (train / valid / test order).
train_ids, valid_ids, test_ids = map(encode, (train_tokens, valid_tokens, test_tokens))

Vocab size: 8000


Dataset Class

In [None]:
class LanguageModelingDataset(Dataset):
    """Sliding-window next-token dataset over a flat sequence of token ids.

    Item ``i`` is the pair ``(x, y)`` where ``x`` holds ``ids[i : i + block_size]``
    and ``y`` holds the same window shifted one position to the right, so
    ``y[t]`` is the token that follows ``x[t]``.

    Args:
        ids: Flat sequence of integer token ids.
        block_size: Number of tokens in each input window.
    """

    def __init__(self, ids, block_size):
        self.ids = ids
        self.block_size = block_size
        # Convert once up front: building a tensor from a Python list slice on
        # every lookup is needlessly slow. Later mutations of ``ids`` are not
        # reflected in this snapshot when ``ids`` is a plain list.
        self._data = torch.as_tensor(ids, dtype=torch.long)

    def __len__(self):
        # Every window needs ``block_size`` inputs plus one extra target token.
        # Clamp at zero because a negative result makes ``len()`` raise.
        return max(0, len(self._data) - self.block_size)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` window pair starting at ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``; without this
                check an out-of-range index would yield truncated windows.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f"index {idx} out of range for dataset of length {len(self)}")
        stop = idx + self.block_size
        # Clone so callers receive independent tensors rather than views of the
        # shared backing buffer.
        x = self._data[idx:stop].clone()
        y = self._data[idx + 1:stop + 1].clone()
        return x, y

block_size = 64

# Wrap the encoded train / validation ids in sliding-window datasets.
train_dataset = LanguageModelingDataset(ids=train_ids, block_size=block_size)
valid_dataset = LanguageModelingDataset(ids=valid_ids, block_size=block_size)

# Only the training windows are shuffled; validation keeps its original order.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=32, shuffle=False)


Positional Encoding

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to a batch of embeddings.

    A ``(1, max_len, d_model)`` table is precomputed and stored as the buffer
    ``pe``: even feature columns hold sines and odd feature columns hold
    cosines of ``position * frequency``.

    Args:
        d_model: Feature size of the embeddings (even or odd).
        max_len: Largest sequence length the table covers.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.max_len = max_len
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)  # even indices
        # For an odd d_model there is one fewer odd column than even columns,
        # so drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])  # odd indices
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the encoding for positions ``0 .. seq_len - 1`` to ``x``.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.

        Raises:
            ValueError: If ``seq_len`` exceeds ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds positional encoding max_len {self.max_len}"
            )
        return x + self.pe[:, :seq_len]


Causal Mask

In [None]:
def generate_causal_mask(size):
    """Return a ``(1, size, size)`` float mask with ones on and below the diagonal.

    Entry ``[0, i, j]`` is 1 when ``j <= i`` and 0 otherwise, i.e. position
    ``i`` may only look at itself and earlier positions.
    """
    lower_triangle = torch.ones(size, size).tril()
    return lower_triangle[None, :, :]

Masked Multi-Head Self-Attention

In [None]:
class MultiHeadAttention(nn.Module):
    """Masked multi-head scaled dot-product self-attention.

    Queries, keys and values come from one fused linear projection; the
    per-head results are concatenated and passed through ``fc_out``.

    Args:
        d_model: Feature size of the input; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.qkv = nn.Linear(d_model, d_model * 3)
        self.fc_out = nn.Linear(d_model, d_model)

    def forward(self, x, mask):
        """Apply masked self-attention.

        Args:
            x: Tensor of shape ``(B, T, C)`` with ``C == d_model``.
            mask: Tensor whose zero entries mark disallowed attention pairs.
                Accepted shapes are ``(T', T')``, ``(1 or B, T', T')`` and
                ``(1 or B, 1 or H, T', T')`` with ``T' >= T``; the top-left
                ``T x T`` corner is used.

        Returns:
            Tensor of shape ``(B, T, C)``.
        """
        B, T, C = x.shape
        qkv = self.qkv(x).chunk(3, dim=-1)
        q, k, v = [t.view(B, T, self.num_heads, self.head_dim).transpose(1, 2) for t in qkv]

        # Give the mask an explicit head axis. Relying on plain broadcasting
        # would line a per-batch (B, T, T) mask up with the head dimension of
        # the (B, H, T, T) score tensor instead of the batch dimension.
        if mask.dim() == 2:
            mask = mask[None, None]
        elif mask.dim() == 3:
            mask = mask.unsqueeze(1)
        mask = mask[..., :T, :T]

        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        scores = scores.masked_fill(mask == 0, float('-inf'))
        attn = torch.softmax(scores, dim=-1)
        # (B, H, T, head_dim) -> (B, T, H * head_dim): merge the heads back together.
        out = (attn @ v).transpose(1, 2).contiguous().view(B, T, C)
        return self.fc_out(out)


Feed-Forward

In [None]:
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        expand = nn.Linear(d_model, d_ff)
        project = nn.Linear(d_ff, d_model)
        self.net = nn.Sequential(expand, nn.ReLU(), project)

    def forward(self, x):
        """Run ``x`` through the MLP and return the result."""
        hidden = self.net(x)
        return hidden


Decoder

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: masked self-attention, then a feed-forward network.

    Each sub-layer is wrapped in a residual connection and followed by its own
    LayerNorm (normalisation is applied after the residual sum).

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward network.
    """

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)

    def forward(self, x, mask):
        """Apply the layer to ``x``, passing ``mask`` to the attention sub-layer."""
        attended = self.attn(x, mask)
        x = self.ln1(x + attended)
        transformed = self.ff(x)
        return self.ln2(x + transformed)


Transformer Decoder

In [None]:
class TransformerDecoder(nn.Module):
    """Decoder-only Transformer language model.

    Token ids are embedded, combined with positional encodings, passed through
    a stack of ``DecoderBlock`` layers under a causal mask, normalised, and
    projected to vocabulary logits.

    Args:
        vocab_size: Number of entries in the vocabulary.
        d_model: Embedding / hidden feature size.
        num_heads: Attention heads per decoder block.
        d_ff: Hidden size of each block's feed-forward network.
        num_layers: Number of decoder blocks.
        max_len: Length of the positional encoding table.
    """

    def __init__(self, vocab_size, d_model=256, num_heads=8, d_ff=1024, num_layers=4, max_len=512):
        super().__init__()
        self.token_embed = nn.Embedding(vocab_size, d_model)
        self.pos_embed = PositionalEncoding(d_model, max_len)
        blocks = [DecoderBlock(d_model, num_heads, d_ff) for _ in range(num_layers)]
        self.layers = nn.ModuleList(blocks)
        self.norm = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Return logits of shape ``(B, T, vocab_size)`` for token ids ``x`` of shape ``(B, T)``."""
        _, seq_len = x.shape
        # The mask is built per call and moved to the input's device.
        mask = generate_causal_mask(seq_len).to(x.device)
        hidden = self.pos_embed(self.token_embed(x))
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.lm_head(self.norm(hidden))


Training

In [None]:
# Checkpoint Utils
def save_checkpoint(model, optimizer, epoch, path="checkpoint.pt"):
    """Write the epoch number plus the model and optimizer state dicts to ``path``.

    Args:
        model: Module whose ``state_dict()`` is stored under ``'model_state'``.
        optimizer: Optimizer whose ``state_dict()`` is stored under ``'optimizer_state'``.
        epoch: Value stored under ``'epoch'``.
        path: Destination file.
    """
    state = {
        'epoch': epoch,
        'model_state': model.state_dict(),
        'optimizer_state': optimizer.state_dict(),
    }
    torch.save(state, path)
    print(f"Checkpoint saved at {path}")

def load_checkpoint(model, optimizer, path="checkpoint.pt", device="cpu"):
    """Restore model and optimizer state from ``path`` and return the stored epoch.

    Args:
        model: Module that receives the saved ``'model_state'``.
        optimizer: Optimizer that receives the saved ``'optimizer_state'``.
        path: Checkpoint file to read.
        device: Passed to ``torch.load`` as ``map_location``.

    Returns:
        The value stored under ``'epoch'`` in the checkpoint.
    """
    # NOTE: torch.load unpickles the file, so only load checkpoints you trust.
    checkpoint = torch.load(path, map_location=device)
    for target, key in ((model, 'model_state'), (optimizer, 'optimizer_state')):
        target.load_state_dict(checkpoint[key])
    print(f"Checkpoint loaded from {path}, epoch {checkpoint['epoch']}")
    return checkpoint['epoch']


# Training Loop with checkpoint
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerDecoder(vocab_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=3e-4)
criterion = nn.CrossEntropyLoss()

# Resume from checkpoint if exists
start_epoch = 1
# Uncomment to resume. Checkpoints are written at the end of every epoch as
# "decoder_checkpoint_epoch{N}.pt", so load the file of the last finished epoch N:
# start_epoch = load_checkpoint(model, optimizer, "decoder_checkpoint_epoch1.pt", device=device) + 1

num_epochs = 3
for epoch in range(start_epoch, num_epochs + 1):
    # Training
    model.train()
    total_loss = 0.0
    train_tokens_seen = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = criterion(logits.view(-1, vocab_size), yb.view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # criterion returns the mean over this batch's targets; weight it by
        # the target count so a smaller final batch does not skew the average.
        total_loss += loss.item() * yb.numel()
        train_tokens_seen += yb.numel()

    # Validation
    val_loss = 0.0
    val_tokens_seen = 0
    model.eval()
    with torch.no_grad():
        for xb, yb in valid_loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits.view(-1, vocab_size), yb.view(-1))
            val_loss += loss.item() * yb.numel()
            val_tokens_seen += yb.numel()

    # Per-token means; max(..., 1) keeps an empty loader from dividing by zero.
    train_avg = total_loss / max(train_tokens_seen, 1)
    val_avg = val_loss / max(val_tokens_seen, 1)
    print(f"Epoch {epoch} | Train Loss: {train_avg:.4f} | Val Loss: {val_avg:.4f}")

    # Save checkpoint after each epoch
    save_checkpoint(model, optimizer, epoch, f"decoder_checkpoint_epoch{epoch}.pt")


Text Generation

In [None]:
def generate_text(model, start_text, stoi, itos, max_new_tokens=50, temperature=1.0, device="cpu", tokenizer=None):
    """Sample a continuation of ``start_text`` from ``model``.

    Args:
        model: Callable mapping a ``(1, T)`` tensor of token ids to logits of
            shape ``(1, T, vocab)``.
        start_text: Prompt text.
        stoi: Token-string -> id mapping; used only when ``tokenizer`` is None.
        itos: Id -> token-string mapping; used only when ``tokenizer`` is None.
        max_new_tokens: Number of tokens to sample.
        temperature: Positive divisor applied to the logits before softmax.
        device: Device on which the id tensor is created.
        tokenizer: Optional tokenizer exposing ``encode(text).ids`` and
            ``decode(ids, skip_special_tokens=...)``. Pass the tokenizer that
            produced the training ids so the prompt is split into the same
            sub-word units and the output is rendered as normal text. When
            omitted, the prompt is split on whitespace and looked up in
            ``stoi``, and the output tokens are joined with spaces.

    Returns:
        The prompt followed by the sampled continuation, as a string.

    Raises:
        ValueError: If ``temperature`` is not positive or the prompt yields no tokens.
    """
    if temperature <= 0:
        raise ValueError("temperature must be > 0")

    if tokenizer is not None:
        prompt_ids = tokenizer.encode(start_text).ids
    else:
        prompt_ids = [stoi.get(t, stoi["<unk>"]) for t in start_text.split()]
    if not prompt_ids:
        raise ValueError("start_text produced no tokens; provide a non-empty prompt")

    input_ids = torch.tensor(prompt_ids, dtype=torch.long, device=device).unsqueeze(0)

    # Longest sequence the model's positional table covers, when the model
    # exposes it as ``pos_embed.max_len``; feeding more positions than that
    # fails inside the positional encoding.
    max_context = getattr(getattr(model, "pos_embed", None), "max_len", None)

    model.eval()
    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Keep only the most recent tokens that fit in the model's context.
            context = input_ids if max_context is None else input_ids[:, -max_context:]
            logits = model(context)[:, -1, :]  # last step logits
            probs = F.softmax(logits / temperature, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)
            input_ids = torch.cat([input_ids, next_id], dim=1)

    output_ids = input_ids[0].tolist()
    if tokenizer is not None:
        # Keep special tokens such as "<unk>" visible in the generated text.
        return tokenizer.decode(output_ids, skip_special_tokens=False)
    return " ".join(itos[i] for i in output_ids)


#Usage
print(generate_text(model, "the company", stoi, itos, max_new_tokens=30, device=device, tokenizer=tokenizer))

