In [1]:
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import tiktoken
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


class GPTTrainer:
    """Bundle a GPT-style decoder model with its tokenizer, data loaders and helpers.

    Attributes:
        cfg: Model configuration dict. The keys read by this class are
            "vocab_size", "context_length", "emb_dim", "n_heads", "n_layers",
            "drop_rate" and "qkv_bias".
        tokenizer: The tiktoken "gpt2" encoding.
        device: Device the model lives on and batches are moved to.
        model: The ``GPTModel`` produced by :meth:`build_model`.
        train_loader: Training ``DataLoader`` or ``None`` if no text was given.
        val_loader: Validation ``DataLoader``; ``None`` until
            :meth:`build_dataloaders` is called.
    """

    def __init__(self, cfg, text=None, batch_size=4, max_length=256, stride=128, device=None):
        """Create the tokenizer and model, and optionally a training loader.

        Args:
            cfg: Model configuration dict (see class docstring).
            text: Optional training text; when falsy no loader is created.
            batch_size: Batch size for the training loader.
            max_length: Number of tokens per training window.
            stride: Step (in tokens) between consecutive window starts.
            device: Target device; defaults to "cuda" when available, else "cpu".
        """
        self.cfg = cfg
        self.tokenizer = tiktoken.get_encoding("gpt2")
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self.build_model().to(self.device)
        self.train_loader = self.create_loader(text, batch_size, max_length, stride) if text else None
        # Defined up front so the attribute exists before build_dataloaders() runs.
        self.val_loader = None

    def build_model(self):
        """Construct and return a new ``GPTModel`` configured from ``self.cfg``.

        The building blocks are defined locally. Their attribute names determine
        the state-dict keys, so they must stay stable for saved checkpoints to load.
        """

        class LayerNorm(nn.Module):
            """Layer normalisation over the last dimension with learnable scale/shift."""

            def __init__(self, emb_dim):
                super().__init__()
                self.eps = 1e-5
                self.scale = nn.Parameter(torch.ones(emb_dim))
                self.shift = nn.Parameter(torch.zeros(emb_dim))

            def forward(self, x):
                mean = x.mean(dim=-1, keepdim=True)
                # Biased variance (divide by N) is used for the normalisation.
                var = x.var(dim=-1, keepdim=True, unbiased=False)
                return self.scale * (x - mean) / torch.sqrt(var + self.eps) + self.shift

        class GELU(nn.Module):
            """Tanh-based approximation of the GELU activation."""

            def forward(self, x):
                return 0.5 * x * (1 + torch.tanh(torch.sqrt(torch.tensor(2.0 / torch.pi)) * (x + 0.044715 * x.pow(3))))

        class FeedForward(nn.Module):
            """Position-wise MLP: expand to 4x emb_dim, GELU, project back."""

            def __init__(self, cfg):
                super().__init__()
                self.net = nn.Sequential(
                    nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
                    GELU(),
                    nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"])
                )

            def forward(self, x):
                return self.net(x)

        class MultiHeadAttention(nn.Module):
            """Multi-head self-attention with a causal mask."""

            def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias):
                super().__init__()
                if d_out % num_heads != 0:
                    # Fail early instead of with an opaque view() error in forward().
                    raise ValueError("d_out must be divisible by num_heads")
                self.num_heads = num_heads
                self.head_dim = d_out // num_heads
                self.query = nn.Linear(d_in, d_out, bias=qkv_bias)
                self.key = nn.Linear(d_in, d_out, bias=qkv_bias)
                self.value = nn.Linear(d_in, d_out, bias=qkv_bias)
                self.proj = nn.Linear(d_out, d_out)
                self.dropout = nn.Dropout(dropout)
                # Ones strictly above the diagonal mark the positions that get masked out.
                self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal=1))

            def forward(self, x):
                B, T, _ = x.size()
                # Split the projection into heads: (B, T, d_out) -> (B, num_heads, T, head_dim).
                q = self.query(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)
                k = self.key(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)
                v = self.value(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)

                att = (q @ k.transpose(-2, -1)) / (self.head_dim ** 0.5)
                mask = self.mask[:T, :T].bool()
                # -inf scores become zero weights after the softmax.
                att.masked_fill_(mask, float('-inf'))
                att = torch.softmax(att, dim=-1)
                att = self.dropout(att)
                # Merge the heads back: (B, num_heads, T, head_dim) -> (B, T, d_out).
                out = (att @ v).transpose(1, 2).contiguous().view(B, T, -1)
                return self.proj(out)

        class TransformerBlock(nn.Module):
            """Pre-norm block: attention and feed-forward, each with a residual connection."""

            def __init__(self, cfg):
                super().__init__()
                self.att = MultiHeadAttention(cfg["emb_dim"], cfg["emb_dim"], cfg["context_length"], cfg["drop_rate"], cfg["n_heads"], cfg["qkv_bias"])
                self.ff = FeedForward(cfg)
                self.norm1 = LayerNorm(cfg["emb_dim"])
                self.norm2 = LayerNorm(cfg["emb_dim"])
                self.drop = nn.Dropout(cfg["drop_rate"])

            def forward(self, x):
                x = x + self.drop(self.att(self.norm1(x)))
                x = x + self.drop(self.ff(self.norm2(x)))
                return x

        class GPTModel(nn.Module):
            """Token + position embeddings, a stack of transformer blocks, and an output head."""

            def __init__(self, cfg):
                super().__init__()
                self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
                self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
                self.drop_emb = nn.Dropout(cfg["drop_rate"])
                self.blocks = nn.Sequential(*[TransformerBlock(cfg) for _ in range(cfg["n_layers"])])
                self.norm = LayerNorm(cfg["emb_dim"])
                self.out = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

            def forward(self, x):
                """Map token ids of shape (B, T) to logits of shape (B, T, vocab_size)."""
                B, T = x.shape
                # Position embeddings of shape (T, emb_dim) broadcast across the batch.
                x = self.tok_emb(x) + self.pos_emb(torch.arange(T, device=x.device))
                x = self.drop_emb(x)
                x = self.blocks(x)
                x = self.norm(x)
                return self.out(x)

        return GPTModel(self.cfg)

    def create_loader(self, text, batch_size, max_length, stride, drop_last=True, shuffle=True):
        """Tokenise ``text`` and return a ``DataLoader`` of (input, target) windows.

        Each input is ``max_length`` consecutive tokens and its target is the same
        window shifted one token to the right. Window starts are ``stride`` apart.

        Args:
            text: Raw text to tokenise.
            batch_size: Number of windows per batch.
            max_length: Tokens per window.
            stride: Distance between consecutive window starts.
            drop_last: Whether to drop a final batch smaller than ``batch_size``.
            shuffle: Whether to reshuffle the windows each epoch.

        Returns:
            A ``DataLoader`` yielding ``(inputs, targets)`` tensor batches.
        """

        class GPTDataset(Dataset):
            """Sliding-window next-token-prediction dataset over one tokenised text."""

            def __init__(self, text, tokenizer, max_length, stride):
                tokens = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
                self.inputs = []
                self.targets = []
                # The range stops at len(tokens) - max_length so every target
                # window (shifted by one) is still full length.
                for start in range(0, len(tokens) - max_length, stride):
                    end = start + max_length
                    self.inputs.append(torch.tensor(tokens[start:end]))
                    self.targets.append(torch.tensor(tokens[start + 1:end + 1]))

            def __len__(self):
                return len(self.inputs)

            def __getitem__(self, i):
                return self.inputs[i], self.targets[i]

        dataset = GPTDataset(text, self.tokenizer, max_length, stride)
        # Honour the caller's choices; these were previously hardcoded to True,
        # which shuffled and truncated the validation loader as well.
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)

    def calc_loss_batch(self, inputs, targets):
        """Return the mean cross-entropy loss of the model on one batch.

        Both tensors are moved to ``self.device`` before the forward pass.
        """
        inputs, targets = inputs.to(self.device), targets.to(self.device)
        logits = self.model(inputs)
        # Collapse batch and sequence dimensions so every token is one sample.
        return nn.functional.cross_entropy(logits.flatten(0, 1), targets.flatten())

    def calc_loss_loader(self, loader, num_batches=None):
        """Average the per-batch loss over a loader without tracking gradients.

        Args:
            loader: Iterable of ``(inputs, targets)`` batches.
            num_batches: Optional cap on the number of batches evaluated.
                ``None`` or ``0`` means "use every batch".

        Returns:
            The mean of the per-batch losses as a float, or ``nan`` when the
            loader yields no batches.

        The model is switched to eval mode for the evaluation and then restored
        to whichever mode it was in when this method was called.
        """
        was_training = self.model.training
        self.model.eval()
        total, count = 0.0, 0
        try:
            with torch.no_grad():
                for i, (x, y) in enumerate(loader):
                    if num_batches and i >= num_batches:
                        break
                    total += self.calc_loss_batch(x, y).item()
                    count += 1
        finally:
            self.model.train(was_training)
        return total / count if count else float("nan")

    def generate(self, start_text, max_new_tokens=50):
        """Greedily extend ``start_text`` by ``max_new_tokens`` tokens.

        Args:
            start_text: Prompt; must encode to at least one token.
            max_new_tokens: Number of tokens to append.

        Returns:
            The decoded text (prompt plus generated continuation).

        Raises:
            ValueError: If ``start_text`` encodes to no tokens.

        Generation runs in eval mode without gradient tracking; the model's
        previous train/eval mode is restored afterwards.
        """
        token_ids = self.tokenizer.encode(start_text)
        if not token_ids:
            raise ValueError("start_text must encode to at least one token")

        idx = torch.tensor(token_ids, dtype=torch.long, device=self.device).unsqueeze(0)
        context_size = self.cfg["context_length"]

        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    # Only the most recent context_size tokens are fed to the model.
                    idx_cond = idx[:, -context_size:]
                    logits = self.model(idx_cond)
                    # Softmax is monotonic, so the argmax of the raw logits
                    # selects the same token as the argmax of the probabilities.
                    idx_next = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)
                    idx = torch.cat([idx, idx_next], dim=1)
        finally:
            self.model.train(was_training)
        return self.tokenizer.decode(idx[0].tolist())

    def plot_losses(self, epochs_seen, tokens_seen, train_losses, val_losses, save_path="loss-plot.pdf"):
        """Plot training and validation loss against epochs, with a token-count top axis.

        Args:
            epochs_seen: X values (epochs) for both loss curves.
            tokens_seen: X values for the secondary "Tokens seen" axis.
            train_losses: Training loss values.
            val_losses: Validation loss values.
            save_path: File the figure is written to; pass ``None`` to skip saving.
        """
        fig, ax1 = plt.subplots(figsize=(5, 3))
        ax1.plot(epochs_seen, train_losses, label="Training loss")
        ax1.plot(epochs_seen, val_losses, label="Validation loss", linestyle="-.")
        ax1.set_xlabel("Epochs")
        ax1.set_ylabel("Loss")
        ax1.legend(loc="upper right")
        ax1.xaxis.set_major_locator(MaxNLocator(integer=True))

        # Invisible curve whose only purpose is to give the top axis its tick range.
        ax2 = ax1.twiny()
        ax2.plot(tokens_seen, train_losses, alpha=0)
        ax2.set_xlabel("Tokens seen")

        fig.tight_layout()
        if save_path is not None:
            fig.savefig(save_path)
        plt.show()

    def build_dataloaders(self, train_text, val_text, batch_size=4, max_length=256, stride=128, drop_last=True):
        """Create ``self.train_loader`` and ``self.val_loader`` from two texts.

        The training loader is shuffled and uses the given ``drop_last``; the
        validation loader keeps its order and keeps the final partial batch.
        """
        self.train_loader = self.create_loader(train_text, batch_size, max_length, stride, drop_last=drop_last, shuffle=True)
        self.val_loader = self.create_loader(val_text, batch_size, max_length, stride, drop_last=False, shuffle=False)


In [5]:
import torch
# Model configuration passed to GPTTrainer; every key below is read when the model is built.
GPT_CONFIG_124M = dict(
    vocab_size=50257,     # Vocabulary size
    context_length=256,   # Shortened context length (orig: 1024)
    emb_dim=768,          # Embedding dimension
    n_heads=12,           # Number of attention heads
    n_layers=12,          # Number of transformer blocks
    drop_rate=0.1,        # Dropout rate
    qkv_bias=False,       # Whether to use bias in QKV projections
)

In [7]:
# Build the trainer without training text, so only the tokenizer and a freshly
# initialised model are created.
trainer = GPTTrainer(GPT_CONFIG_124M)
model = trainer.model

# The trainer picks "cuda" when available and "cpu" otherwise; reuse that choice
# so the checkpoint tensors are mapped onto the device the model already lives on.
device = torch.device(trainer.device)

# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source (consider weights_only=True if the installed PyTorch supports it).
model.load_state_dict(torch.load("model.pth", map_location=device))

# Inference mode: disables dropout. The trailing ';' suppresses the cell's repr output.
model.eval();

In [11]:
# Prompt that the model continues with greedy decoding.
start_text = "Every effort moves you"

# Despite its name, `token_ids` holds the decoded text: generate() returns the
# string produced by the tokenizer's decode step, not a tensor of ids.
token_ids = trainer.generate(start_text, max_new_tokens=10)

print("Generated text:\n", token_ids)

Generated text:
 Every effort moves you?"

"Yes--quite insensible to
