In [26]:
# Cell 1: Imports and config
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchtext.datasets import PennTreebank
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

import random
import numpy as np

# Seed every RNG the notebook can draw from so that Restart & Run All gives the
# same initial weights, sampled batches and generated text on each run.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


Using device: cpu


In [2]:
# Cell 2: Load PTB
def load_ptb_split(split="train"):
    """Materialise one Penn Treebank split as a list.

    `split` is forwarded unchanged to `PennTreebank`; the iterable it returns
    is drained eagerly so the result can be traversed more than once.
    """
    return list(PennTreebank(split=split))

# Load the three splits in a fixed order: train, validation, test.
train_lines, val_lines, test_lines = (
    load_ptb_split(split_name) for split_name in ("train", "valid", "test")
)

In [3]:
# Cell 3: Tokenizer and Vocab
# Shared tokenizer (torchtext's "basic_english" setting). The same callable is
# used to build the vocabulary, encode the corpus and encode generation prompts.
tokenizer = get_tokenizer("basic_english")

def yield_tokens(lines):
    """Lazily produce the tokenizer output for each entry of `lines`, in order."""
    yield from map(tokenizer, lines)

# Build the vocabulary from the training split only; "<unk>" is registered as a
# special token.
vocab = build_vocab_from_iterator(yield_tokens(train_lines), specials=["<unk>"])
# Look-ups of tokens that are not in the vocabulary fall back to the "<unk>" index.
vocab.set_default_index(vocab["<unk>"])
# Also used later as the size of the model's embedding table and output layer.
vocab_size = len(vocab)
print("Vocab size:", vocab_size)


Vocab size: 9922


In [4]:
# Cell 4: Convert text to tensor
def process_data(lines):
    """Encode `lines` into one flat tensor of vocabulary indices.

    Each line is tokenised with the notebook-level `tokenizer` and mapped to
    ids through `vocab`; the per-line id tensors are then joined end to end,
    so line boundaries are not preserved in the result.
    """
    encoded = []
    for text in lines:
        token_ids = vocab(tokenizer(text))
        encoded.append(torch.tensor(token_ids, dtype=torch.long))
    return torch.cat(encoded)

# Encode every split with the same vocabulary, in train / validation / test order.
train_data, val_data, test_data = map(
    process_data, (train_lines, val_lines, test_lines)
)
print("Train data tokens:", train_data.size(0))


Train data tokens: 924412


In [5]:
# Cell 5: Get batches
def get_batch(data, block_size, batch_size):
    """Sample a random batch of input/target windows from a token tensor.

    Args:
        data: token-id tensor, sliced along its first dimension.
        block_size: number of tokens in each window.
        batch_size: number of windows to draw.

    Returns:
        `(x, y)`, both moved to the notebook-level `device`. Each stacks
        `batch_size` windows of `block_size` tokens; `y` holds the same
        windows as `x` advanced by one position (the next-token targets).
    """
    # Exclusive upper bound for start offsets, leaving room for the shifted window.
    start_limit = len(data) - block_size - 1
    starts = torch.randint(start_limit, (batch_size,)).tolist()
    inputs = torch.stack([data[s:s + block_size] for s in starts])
    targets = torch.stack([data[s + 1:s + 1 + block_size] for s in starts])
    return inputs.to(device), targets.to(device)

# Tokens per sampled window; also handed to GPT as its maximum sequence length.
block_size = 64
# Number of windows drawn per call to get_batch.
batch_size = 32


In [6]:
# Cell 6: Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to a batch of embeddings.

    A `[1, max_len, d_model]` table is computed once at construction and
    registered as the buffer `pe`, so it follows the module across devices and
    is stored in the state dict. Even feature columns hold sines, odd feature
    columns hold cosines.

    Args:
        d_model: size of the last dimension of the inputs; odd values are
            supported.
        max_len: number of positions to precompute.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        angles = pos * div_term  # [max_len, ceil(d_model / 2)]
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so trim the angles to the slots that exist.
        # For an even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return `x` plus the encodings of its first `x.size(1)` positions.

        `x` is indexed as `[batch, seq, d_model]`; `seq` must not exceed the
        `max_len` given at construction.
        """
        return x + self.pe[:, :x.size(1)]


In [7]:
# Cell 7: Attention function with optional masking
def attention(q, k, v, mask=None, dropout=None):
    """Scaled dot-product attention.

    Args:
        q, k, v: tensors whose last two dimensions are (positions, features);
            the in-file caller passes [B, heads, T, d_k].
        mask: optional 4-D tensor. Entries equal to 0 mark query/key pairs
            that are excluded from attention. It is cropped to the last two
            dimensions of the score matrix before use.
        dropout: optional callable applied to the attention weights.

    Returns:
        The attention-weighted combination of `v`.
    """
    scale = math.sqrt(q.size(-1))
    scores = q.matmul(k.transpose(-2, -1)) / scale  # [B, heads, T, T]

    if mask is not None:
        n_query, n_key = scores.size(-2), scores.size(-1)
        blocked = mask[:, :, :n_query, :n_key] == 0
        scores = scores.masked_fill(blocked, float('-inf'))

    weights = F.softmax(scores, dim=-1)
    if dropout is not None:
        weights = dropout(weights)

    return weights.matmul(v)  # [B, heads, T, d_k]


In [8]:
# Cell 8: Multi-head attention
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention with a single fused Q/K/V projection.

    Args:
        embed_dim: feature size of inputs and outputs; must be divisible by
            `num_heads`.
        num_heads: number of attention heads.
        dropout: drop probability applied to the attention weights.
    """

    def __init__(self, embed_dim, num_heads, dropout=0.1):
        super().__init__()
        assert embed_dim % num_heads == 0
        self.d_k = embed_dim // num_heads  # per-head feature size
        self.num_heads = num_heads

        # One linear layer produces queries, keys and values side by side.
        self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Self-attend over `x` of shape (B, T, C) and return (B, T, C)."""
        batch, seq_len, channels = x.size()

        # (B, T, 3*C) -> (B, T, 3, heads, d_k)
        fused = self.qkv_proj(x).view(batch, seq_len, 3, self.num_heads, self.d_k)
        # Split the fused axis, then move heads ahead of time: (B, heads, T, d_k).
        q, k, v = (part.transpose(1, 2) for part in fused.unbind(dim=2))

        per_head = attention(q, k, v, mask=mask, dropout=self.dropout)

        # (B, heads, T, d_k) -> (B, T, heads, d_k) -> (B, T, C)
        merged = per_head.transpose(1, 2).contiguous().view(batch, seq_len, channels)
        return self.out_proj(merged)




In [9]:
# Cell 9: Position-wise feedforward network
class FeedForward(nn.Module):
    """Position-wise two-layer MLP: expand to `ff_dim`, ReLU, project back.

    Dropout with probability `dropout` follows both the activation and the
    output projection.
    """

    def __init__(self, embed_dim, ff_dim, dropout=0.1):
        super().__init__()
        layers = [
            nn.Linear(embed_dim, ff_dim),   # expand
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, embed_dim),   # project back
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP over the last dimension of `x`."""
        transformed = self.net(x)
        return transformed


In [10]:
# Cell 10: One decoder block
class GPTBlock(nn.Module):
    """One pre-norm decoder block.

    Each sub-layer (masked self-attention, then the feed-forward network)
    reads a LayerNorm-ed copy of the stream and adds its output back onto the
    un-normalised stream.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.ln1 = nn.LayerNorm(embed_dim)
        self.ln2 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadAttention(embed_dim, num_heads, dropout=dropout)
        self.ff = FeedForward(embed_dim, ff_dim, dropout=dropout)

    def forward(self, x, mask=None):
        """Run attention and feed-forward sub-layers, each with a residual add."""
        attn_out = self.attn(self.ln1(x), mask)
        x = x + attn_out
        ff_out = self.ff(self.ln2(x))
        return x + ff_out


In [11]:
# Cell 11: Full GPT model
class GPT(nn.Module):
    """Decoder-only Transformer language model.

    Pipeline: token embedding -> sinusoidal positional encoding ->
    `num_layers` GPTBlocks under a causal mask -> final LayerNorm -> linear
    head producing one logit per vocabulary entry.

    Args:
        vocab_size: number of token ids (embedding rows and output logits).
        embed_dim: width of the residual stream.
        block_size: maximum sequence length accepted by `forward`; also the
            number of positions precomputed by the positional encoding.
        num_layers: number of stacked GPTBlocks.
        num_heads: attention heads per block.
        ff_dim: hidden width of each block's feed-forward network.
        dropout: drop probability forwarded to every block.
    """

    def __init__(self, vocab_size, embed_dim, block_size, num_layers, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.token_embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_enc = PositionalEncoding(embed_dim, max_len=block_size)
        self.blocks = nn.ModuleList([
            GPTBlock(embed_dim, num_heads, ff_dim, dropout) for _ in range(num_layers)
        ])
        self.ln_f = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)

        self.block_size = block_size
        # Re-initialise every Linear/Embedding submodule (see _init_weights).
        self.apply(self._init_weights)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, when `targets` is given, the loss.

        Args:
            idx: (B, T) tensor of token ids with T <= `block_size`.
            targets: optional tensor of token ids with B*T elements.

        Returns:
            `(logits, loss)`; `logits` is (B, T, vocab_size) and `loss` is the
            mean cross-entropy over all positions, or None without targets.
        """
        B, T = idx.size()
        assert T <= self.block_size, "Sequence length exceeds block size."

        tok_emb = self.token_embed(idx)           # (B, T, C)
        x = self.pos_enc(tok_emb)                 # (B, T, C)

        # Lower-triangular causal mask: position t may attend to positions <= t.
        # It is created on the input's own device so the model does not depend
        # on a notebook-level `device` variable matching where it actually lives.
        mask = torch.tril(torch.ones(T, T, device=idx.device))
        mask = mask.unsqueeze(0).unsqueeze(0)     # [1, 1, T, T]

        for block in self.blocks:
            x = block(x, mask=mask)
        x = self.ln_f(x)
        logits = self.head(x)                     # (B, T, vocab_size)

        if targets is None:
            return logits, None

        # reshape (rather than view) also accepts non-contiguous targets.
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.reshape(-1))
        return logits, loss

    def _init_weights(self, module):
        """Draw Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if isinstance(module, (nn.Linear, nn.Embedding)):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                nn.init.zeros_(module.bias)


In [12]:
# Cell 12: Define hyperparameters and initialize model
# Model hyperparameters. The same names are read again when the checkpoint
# config is saved at the end of the notebook.
embed_dim = 256   # width of token embeddings / residual stream
num_heads = 4     # attention heads per block (embed_dim must be divisible by this)
num_layers = 4    # number of stacked GPTBlocks
ff_dim = 1024     # hidden width of each block's feed-forward network
dropout = 0.1     # drop probability passed to every block

# Build the model and move it to the device selected in the first cell.
model = GPT(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    block_size=block_size,
    num_layers=num_layers,
    num_heads=num_heads,
    ff_dim=ff_dim,
    dropout=dropout
).to(device)

# NOTE: train() reads this `optimizer` as a notebook-level global rather than
# receiving it as an argument.
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)


In [16]:
# Cell 13: Training loop
def train(model, train_data, val_data, epochs=5, batch_size=32, block_size=64,
          eval_interval=500, steps_per_epoch=1400):
    """Train `model` on randomly sampled windows of `train_data`.

    An "epoch" here is `steps_per_epoch` optimiser steps on random batches,
    not a full pass over the data. The notebook-level `optimizer`,
    `get_batch` and `evaluate` are used.

    Args:
        model: module whose call `model(x, y)` returns `(logits, loss)`.
        train_data: token tensor that training batches are sampled from.
        val_data: token tensor used for the periodic validation estimate.
        epochs: number of epochs to run.
        batch_size: windows per batch.
        block_size: tokens per window.
        eval_interval: a validation loss is printed whenever the step index
            is a positive multiple of this value.
        steps_per_epoch: optimiser steps per epoch.
    """
    model.train()

    for epoch in range(epochs):
        total_loss = 0.0
        for step in range(steps_per_epoch):
            xb, yb = get_batch(train_data, block_size, batch_size)

            _, loss = model(xb, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            step_loss = loss.item()
            total_loss += step_loss

            if step % eval_interval == 0 and step > 0:
                val_loss = evaluate(model, val_data, block_size, batch_size)
                print(f"Epoch {epoch+1}, Step {step}, Train Loss: {step_loss:.4f}, Val Loss: {val_loss:.4f}")

        # Average over the number of steps actually taken. The last loop index
        # is one less than that count, and is undefined if the loop never ran.
        avg_loss = total_loss / max(steps_per_epoch, 1)
        print(f"Epoch {epoch+1} completed | Avg Train Loss: {avg_loss:.4f}")


In [17]:
# Cell 14: Evaluation function
@torch.no_grad()
def evaluate(model, data, block_size, batch_size, num_batches=50):
    """Estimate the mean loss of `model` on `data` from random mini-batches.

    This is a Monte Carlo estimate over `num_batches` batches drawn with the
    notebook-level `get_batch`, not a full pass over `data`. Gradients are
    disabled, and the model is put in eval mode for the duration of the call.

    Args:
        model: module whose call `model(x, y)` returns `(logits, loss)`.
        data: token tensor to sample evaluation batches from.
        block_size: tokens per window.
        batch_size: windows per batch.
        num_batches: number of batches to average over; must be at least 1.

    Returns:
        The arithmetic mean of the per-batch losses as a Python float.

    Raises:
        ValueError: if `num_batches` is less than 1.
    """
    if num_batches < 1:
        raise ValueError("num_batches must be at least 1")

    was_training = model.training
    model.eval()
    try:
        losses = []
        for _ in range(num_batches):
            xb, yb = get_batch(data, block_size, batch_size)
            _, loss = model(xb, yb)
            losses.append(loss.item())
    finally:
        # Put the model back in the mode the caller had it in, rather than
        # forcing train mode unconditionally.
        model.train(was_training)
    return sum(losses) / len(losses)


In [18]:
# Cell 15: Run training
# Launch training. `eval_interval=200` overrides train()'s default of 500, so a
# validation estimate is printed every 200 steps within each epoch.
train(
    model=model,
    train_data=train_data,
    val_data=val_data,
    epochs=5,
    batch_size=batch_size,
    block_size=block_size,
    eval_interval=200
)


Epoch 1, Step 200, Train Loss: 4.4192, Val Loss: 4.7861
Epoch 1, Step 400, Train Loss: 4.2175, Val Loss: 4.7714
Epoch 1, Step 600, Train Loss: 4.4711, Val Loss: 4.7919
Epoch 1, Step 800, Train Loss: 4.2877, Val Loss: 4.7762
Epoch 1, Step 1000, Train Loss: 4.0098, Val Loss: 4.7211
Epoch 1, Step 1200, Train Loss: 4.0254, Val Loss: 4.7668
Epoch 1 completed | Avg Train Loss: 4.1910
Epoch 2, Step 200, Train Loss: 4.2112, Val Loss: 4.6928
Epoch 2, Step 400, Train Loss: 3.8234, Val Loss: 4.7334
Epoch 2, Step 600, Train Loss: 3.8045, Val Loss: 4.7407
Epoch 2, Step 800, Train Loss: 3.8953, Val Loss: 4.7343
Epoch 2, Step 1000, Train Loss: 4.1481, Val Loss: 4.7125
Epoch 2, Step 1200, Train Loss: 3.9769, Val Loss: 4.7365
Epoch 2 completed | Avg Train Loss: 3.9385
Epoch 3, Step 200, Train Loss: 3.8361, Val Loss: 4.7048
Epoch 3, Step 400, Train Loss: 3.8071, Val Loss: 4.7656
Epoch 3, Step 600, Train Loss: 3.6637, Val Loss: 4.7661
Epoch 3, Step 800, Train Loss: 3.4489, Val Loss: 4.7519
Epoch 3, Step 

In [19]:
# Cell 16: Final evaluation on test set
# evaluate() averages the loss over randomly sampled batches, so this figure is
# an estimate of the test loss rather than an exact full-pass value.
test_loss = evaluate(model, test_data, block_size, batch_size)
print(f"Final Test Loss: {test_loss:.4f}")

Final Test Loss: 4.8508


In [20]:
# Cell 17: Text generation function
def generate_text(model, vocab, tokenizer, prompt, max_new_tokens=50):
    """Sample a continuation of `prompt` from `model`, one token at a time.

    The model is switched to eval mode and left there. Sampling runs without
    gradient tracking. The context fed to the model is cropped to the model's
    own `block_size`, and tensors are created on the device that holds the
    model's parameters.

    Args:
        model: module with a `block_size` attribute whose call `model(idx)`
            returns `(logits, _)` with logits of shape (1, T, vocab_size).
        vocab: callable mapping a token list to ids, with `lookup_tokens`
            for the reverse direction.
        tokenizer: callable turning the prompt string into a token list.
        prompt: text to start from; must yield at least one token.
        max_new_tokens: number of tokens to sample.

    Returns:
        The prompt tokens followed by the sampled tokens, joined by spaces.

    Raises:
        ValueError: if the prompt tokenises to nothing.
    """
    model.eval()
    model_device = next(model.parameters()).device
    context_len = model.block_size

    prompt_ids = vocab(tokenizer(prompt))
    if not prompt_ids:
        # Without at least one token there is no last position to predict from.
        raise ValueError("prompt must contain at least one token")
    idx = torch.tensor(prompt_ids, dtype=torch.long, device=model_device).unsqueeze(0)  # [1, T]

    # Inference only: skip building an autograd graph for every step.
    with torch.no_grad():
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -context_len:]  # keep at most the last block_size tokens
            logits, _ = model(idx_cond)
            next_token_logits = logits[:, -1, :]  # [1, vocab_size]
            probs = F.softmax(next_token_logits, dim=-1)  # [1, vocab_size]
            next_token = torch.multinomial(probs, num_samples=1)  # [1, 1]
            idx = torch.cat((idx, next_token), dim=1)  # append to sequence

    output_tokens = idx[0].tolist()
    return ' '.join(vocab.lookup_tokens(output_tokens))


In [25]:
# Cell 18: Run generation from a prompt
# Sample 50 tokens after the prompt. Tokens are drawn with torch.multinomial,
# so the text differs from run to run unless the RNG state is fixed.
prompt = "In the future,"
generated = generate_text(model, vocab, tokenizer, prompt, max_new_tokens=50)
print("Generated text:\n")
print(generated)


Generated text:

in the future <unk> funds have fallen to several percentage points on the decline in yields on the three-month cds the derivative markets fell higher than the the average yield on six-month treasury bills was slightly stronger says <unk> <unk> a vice president for finance at credit suisse the show <unk> several health disasters


In [27]:
# Cell 19: Save trained model
# Checkpoint path is relative to the notebook's working directory.
model_path = "gpt_ptb_model.pth"
# Save the weights together with everything needed to rebuild the model: the
# vocabulary and the constructor arguments originally passed to GPT.
# NOTE(review): `vocab` is stored as the live object, so it is pickled into the
# checkpoint; loading presumably needs torchtext importable and may not work
# with `torch.load(..., weights_only=True)` - confirm before relying on it.
torch.save({
    "model_state_dict": model.state_dict(),
    "vocab": vocab,
    "config": {
        "vocab_size": vocab_size,
        "embed_dim": embed_dim,
        "block_size": block_size,
        "num_layers": num_layers,
        "num_heads": num_heads,
        "ff_dim": ff_dim,
        "dropout": dropout
    }
}, model_path)

print(f"Model saved to {model_path}")


Model saved to gpt_ptb_model.pth
