# Notes & Tips

- Use `--pretrained` (or load with `GPT.from_pretrained('gpt2')`) for much faster convergence to low loss.
- For best performance on Colab T4: use mixed precision (AMP), and increase the batch size and the number of gradient-accumulation steps to reach a large effective batch size.
- Save checkpoints to Drive to avoid losing progress when the runtime disconnects.
- To get to a validation loss < 0.1 quickly, you can fine-tune a pretrained model on a small dataset (but that will overfit; use a validation set to monitor generalization).


In [None]:
# Text generation example (load best checkpoint and generate)
from pathlib import Path

ckpt_path = Path(DRIVE_CKPT_DIR) / 'ckpt_best.pth'
if ckpt_path.exists():
    print('Loading checkpoint', ckpt_path)
    load_checkpoint(str(ckpt_path), model)
else:
    print('Best checkpoint not found, using current model state')

num_return_sequences = 1  # number of independent samples drawn from the same prompt
max_length = 100          # total length in tokens (prompt + generated continuation)
prompt_len = 16           # number of training tokens used as the prompt

# Initialize the generation sequence from a training batch.
x, _ = train_loader.next_batch(split='train')
# Keep only a short prefix of the first example as the prompt. A full row holds
# T tokens, so using it unchanged leaves nothing to generate whenever
# T >= max_length (e.g. T=128 with max_length=100): the loop below would never run.
prompt_len = max(1, min(prompt_len, max_length - 1, x.size(1)))
x = x[:1, :prompt_len].to(device).repeat(num_return_sequences, 1)

# The position-embedding table limits how many tokens the model can attend to;
# crop the context to that window when the model exposes it.
block_size = getattr(getattr(model, 'config', None), 'block_size', None)

was_training = model.training
model.eval()
try:
    with torch.no_grad():
        while x.size(1) < max_length:
            context = x if block_size is None else x[:, -block_size:]
            logits, _ = model(context)
            # Only the prediction for the last position is needed.
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            # Top-k sampling (k=50): sample among the 50 most likely tokens.
            topk_probs, topk_indices = torch.topk(probs, 50, dim=-1)
            ix = torch.multinomial(topk_probs, 1)
            # Map the sampled rank back to the vocabulary id.
            xcol = torch.gather(topk_indices, -1, ix)
            x = torch.cat((x, xcol), dim=1)
finally:
    # Leave the model in whatever mode it was in before sampling.
    model.train(was_training)

for tokens in x.tolist():
    print('Generated text:\n', train_loader.enc.decode(tokens))


In [None]:
# Example: quick run (pretrained) - adjust hyperparameters as needed
# Make sure you have ./input.txt in the notebook working directory (upload or copy from Drive)
# This cell defines the notebook-level names `train_loader` and `model`, which the
# text-generation cell reads afterwards.

# Example settings
B = 8  # sequences per batch (handed to DataLoaderLite)
T = 128  # tokens per sequence (handed to DataLoaderLite)
max_steps = 2000  # increase for real training
lr = 3e-4  # learning rate forwarded to train_simple
accum_steps = 1  # micro-batches accumulated before each optimizer step

# Create data loader (point path to your input.txt)
# The shell line only lists the file (or prints a hint); it does not stop the cell,
# so DataLoaderLite below still fails if input.txt is missing.
!ls -lh input.txt || echo 'input.txt not found; place it in the notebook dir or copy from Drive'
# val_fraction=0.05 asks the loader to hold out the last 5% of the tokens for validation.
train_loader = DataLoaderLite(B=B, T=T, path='input.txt', val_fraction=0.05)

# Load pretrained GPT-2 into our model class (recommended)
model = GPT.from_pretrained('gpt2')
model.to(device)

# Kick off training (saves checkpoints to DRIVE_CKPT_DIR)
# Validation runs every 200 steps and a numbered checkpoint is written every 500 steps.
train_simple(model, train_loader, device, ckpt_dir=DRIVE_CKPT_DIR, batch_size=B, seq_len=T, max_steps=max_steps, lr=lr, weight_decay=0.1, accum_steps=accum_steps, val_interval=200, save_interval=500)


In [None]:
# Training utilities: evaluate, checkpoint saving/loading, train
import types

def evaluate(model, data_loader, device, use_amp=False, max_batches=None):
    """Return the mean validation loss of ``model`` over ``data_loader``.

    Batches are requested with ``data_loader.next_batch(split='val')`` until
    one of the following happens: ``max_batches`` batches were scored, the
    loader's ``val_position`` cursor wraps back to 0 (one full pass), or the
    loader raises.

    Args:
        model: module called as ``model(x, y)`` and returning ``(logits, loss)``.
        data_loader: object providing ``next_batch(split=...)`` and a
            ``val_position`` attribute.
        device: device the batches are moved to.
        use_amp: run the forward pass under CUDA autocast.
        max_batches: optional cap on the number of batches; ``None`` means a
            full pass.

    Returns:
        The average loss as a float, or ``float('inf')`` when no batch could
        be evaluated.

    The model is always left in training mode, including when the forward
    pass raises.
    """
    # Start every evaluation at the beginning of the validation split. Without
    # this, a capped evaluation leaves the cursor mid-stream and the next call
    # scores a different set of batches, which makes successive validation
    # losses (and any "best so far" comparison) inconsistent.
    if hasattr(data_loader, 'val_position'):
        data_loader.val_position = 0

    losses = []
    model.eval()
    try:
        with torch.no_grad():
            while max_batches is None or len(losses) < max_batches:
                try:
                    x, y = data_loader.next_batch(split='val')
                except Exception as exc:
                    # Best effort: score what we have, but say why we stopped
                    # instead of hiding a loader failure.
                    print(f'[eval] stopping early, could not fetch a validation batch: {exc!r}')
                    break
                x, y = x.to(device), y.to(device)
                with torch.cuda.amp.autocast(enabled=use_amp):
                    _, loss = model(x, y)
                losses.append(loss.item())
                # The loader resets its cursor to 0 once the split is exhausted.
                if data_loader.val_position == 0:
                    break
    finally:
        # Never leave the caller's model stuck in eval mode.
        model.train()
    return sum(losses) / len(losses) if losses else float('inf')


def save_checkpoint(path, model, optimizer, scaler, step, best_val):
    """Write a training checkpoint to ``path``.

    The checkpoint is a dict with the keys ``model_state``,
    ``optimizer_state``, ``scaler_state``, ``step`` and ``best_val``; the
    optimizer/scaler entries are ``None`` when the corresponding argument is
    ``None``.

    Args:
        path: destination file (``str`` or ``os.PathLike``). A file-like
            object is passed straight to ``torch.save``.
        model: module whose ``state_dict()`` is stored.
        optimizer: optimizer to store, or ``None``.
        scaler: gradient scaler to store, or ``None``.
        step: training step recorded in the checkpoint.
        best_val: best validation loss recorded in the checkpoint.

    For filesystem paths the data is first written to ``<path>.tmp`` and then
    renamed over the target, so an interrupted save (e.g. a runtime
    disconnect) cannot leave a truncated file in place of a previously good
    checkpoint. Missing parent directories are created.
    """
    import os

    ckpt = {
        'model_state': model.state_dict(),
        'optimizer_state': optimizer.state_dict() if optimizer is not None else None,
        'scaler_state': scaler.state_dict() if scaler is not None else None,
        'step': step,
        'best_val': best_val,
    }

    if not isinstance(path, (str, os.PathLike)):
        # File-like destination: nothing to rename, keep the direct write.
        torch.save(ckpt, path)
        print('Saved checkpoint to', path)
        return

    target = os.fspath(path)
    parent = os.path.dirname(target)
    if parent:
        os.makedirs(parent, exist_ok=True)

    tmp_path = target + '.tmp'
    try:
        torch.save(ckpt, tmp_path)
        # Atomic on the same filesystem: readers see the old or the new file.
        os.replace(tmp_path, target)
    except BaseException:
        # Do not leave a partial temporary file behind.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    print('Saved checkpoint to', path)


def load_checkpoint(path, model, optimizer=None, scaler=None):
    """Restore training state from a checkpoint file.

    The file is read onto the CPU and its ``model_state`` entry is loaded into
    ``model``. ``optimizer`` and ``scaler`` are restored only when they are
    passed in and the checkpoint holds a non-``None`` state for them.

    Returns:
        ``(step, best_val)`` from the checkpoint, defaulting to
        ``(0, float('inf'))`` when those keys are absent.
    """
    # NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
    state = torch.load(path, map_location='cpu')
    model.load_state_dict(state['model_state'])

    optional_targets = (
        (optimizer, 'optimizer_state'),
        (scaler, 'scaler_state'),
    )
    for target, key in optional_targets:
        if target is None:
            continue
        saved = state.get(key)
        if saved is not None:
            target.load_state_dict(saved)

    step = state.get('step', 0)
    best_val = state.get('best_val', float('inf'))
    return step, best_val


def train_simple(
    model,
    train_loader,
    device,
    ckpt_dir,
    batch_size=8,
    seq_len=128,
    max_steps=2000,
    lr=3e-4,
    weight_decay=0.1,
    accum_steps=1,
    val_interval=200,
    save_interval=500,
    target_loss=0.099999,
    use_pretrained=False,
):
    """Train ``model`` with AdamW, periodic validation and checkpointing.

    Args:
        model: module called as ``model(x, y)`` and returning ``(logits, loss)``.
        train_loader: object providing ``next_batch(split=...)``; it is used
            for both training batches and validation (via ``evaluate``).
        device: ``torch.device`` (or device string) the batches are moved to.
            Mixed precision is enabled when its type is ``'cuda'``.
        ckpt_dir: directory receiving ``ckpt_best.pth``, ``ckpt_step<N>.pth``
            and ``ckpt_final.pth``; created if missing.
        batch_size, seq_len, use_pretrained: accepted for interface
            compatibility; this function does not read them (the batch shape
            comes from ``train_loader``).
        max_steps: maximum number of optimizer steps.
        lr, weight_decay: AdamW hyperparameters.
        accum_steps: micro-batches accumulated per optimizer step (>= 1).
        val_interval: validate every this many steps; 0/None disables it.
        save_interval: write a numbered checkpoint every this many steps;
            0/None disables it.
        target_loss: training stops early once the validation loss drops
            below this value.

    Raises:
        ValueError: if ``accum_steps`` is smaller than 1.
    """
    # Imported here because no cell of the notebook imports tqdm, which made
    # the progress bar below fail with NameError. `os` is imported locally so
    # the function does not depend on the Drive cell having run first.
    import os

    from tqdm.auto import tqdm

    if accum_steps < 1:
        raise ValueError(f'accum_steps must be >= 1, got {accum_steps}')

    use_amp = (torch.device(device).type == 'cuda')
    os.makedirs(ckpt_dir, exist_ok=True)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    # With enabled=False the scaler is a pass-through, so the same code path
    # serves CPU and GPU runs.
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
    best_val = float('inf')
    step = 0
    model.train()
    pbar = tqdm(total=max_steps)
    try:
        while step < max_steps:
            optimizer.zero_grad()
            total_loss = 0.0
            for _ in range(accum_steps):
                x, y = train_loader.next_batch(split='train')
                x, y = x.to(device), y.to(device)
                with torch.cuda.amp.autocast(enabled=use_amp):
                    _, loss = model(x, y)
                # Scale so the accumulated gradient is the mean over micro-batches.
                loss = loss / accum_steps
                scaler.scale(loss).backward()
                total_loss += loss.item()
            scaler.step(optimizer)
            scaler.update()
            step += 1
            pbar.update(1)
            if step % 10 == 0:
                pbar.set_postfix({'loss': f"{total_loss:.6f}", 'step': step})
            if val_interval and step % val_interval == 0:
                val_loss = evaluate(model, train_loader, device, use_amp, max_batches=32)
                print(f"[val] step {step} loss {val_loss:.6f}")
                if val_loss < best_val:
                    best_val = val_loss
                    save_checkpoint(os.path.join(ckpt_dir, 'ckpt_best.pth'), model, optimizer, scaler, step, best_val)
                if val_loss < target_loss:
                    print(f"Target val loss {target_loss} reached at step {step} (val {val_loss:.6f}).")
                    break
            if save_interval and step % save_interval == 0:
                save_checkpoint(os.path.join(ckpt_dir, f'ckpt_step{step}.pth'), model, optimizer, scaler, step, best_val)
    finally:
        # Release the progress bar even when training is interrupted.
        pbar.close()
    save_checkpoint(os.path.join(ckpt_dir, 'ckpt_final.pth'), model, optimizer, scaler, step, best_val)
    print('Training finished. Best val:', best_val)


In [None]:
# Data loader (loads text and tokenizes with tiktoken)
import tiktoken

class DataLoaderLite:
    """Sequential batch loader over a GPT-2-tokenized text file.

    The file is tokenized once with tiktoken's ``gpt2`` encoding. The last
    ``val_fraction`` of the tokens becomes the validation split, unless that
    split would be too small to hold a single batch (``B * T + 1`` tokens), in
    which case everything is used for training and validation requests are
    served from the training tokens.

    Attributes set by the constructor: ``B``, ``T``, ``enc``, ``tokens``,
    ``val_split``, ``train_tokens``, ``val_tokens``, ``current_position``
    (training cursor) and ``val_position`` (validation cursor).
    """

    def __init__(self, B, T, path='input.txt', val_fraction=0.05):
        """Load and tokenize ``path`` and split it into train/val tokens.

        Raises:
            ValueError: if ``B`` or ``T`` is not positive, or if the training
                split holds fewer than ``B * T + 1`` tokens (too few for one
                batch).
        """
        if B < 1 or T < 1:
            raise ValueError(f'B and T must be positive, got B={B}, T={T}')
        self.B = B
        self.T = T
        with open(path, 'r', encoding='utf-8') as f:
            text = f.read()
        self.enc = tiktoken.get_encoding('gpt2')
        tokens = self.enc.encode(text)
        self.tokens = torch.tensor(tokens, dtype=torch.long)

        # One batch needs B*T inputs plus one extra token for the shifted targets.
        min_tokens = B * T + 1
        self.val_split = int(val_fraction * len(self.tokens))
        if self.val_split < min_tokens:
            self.val_split = 0
        if self.val_split > 0:
            self.val_tokens = self.tokens[-self.val_split:]
            self.train_tokens = self.tokens[:-self.val_split]
        else:
            self.val_tokens = torch.tensor([], dtype=self.tokens.dtype)
            self.train_tokens = self.tokens
        print(f'loaded {len(self.tokens)} tokens (train={len(self.train_tokens)}, val={len(self.val_tokens)})')
        print(f'1 epoch = {len(self.train_tokens) // (B * T)} batches')

        # Fail here with a clear message instead of later with an opaque
        # tensor-reshape error inside next_batch.
        if len(self.train_tokens) < min_tokens:
            raise ValueError(
                f'{path!r} yields {len(self.train_tokens)} training tokens, but one batch '
                f'needs at least B*T+1 = {min_tokens}; reduce B/T or provide more text'
            )
        self.current_position = 0
        self.val_position = 0

    def _take(self, tokens, pos):
        """Cut one ``(x, y)`` batch from ``tokens`` at ``pos``; return it with the next cursor."""
        B, T = self.B, self.T
        span = B * T
        # Restart from the beginning if the cursor does not leave room for a batch.
        if pos + span + 1 > len(tokens):
            pos = 0
        buf = tokens[pos: pos + span + 1]
        x = buf[:-1].view(B, T)
        y = buf[1:].view(B, T)  # targets are the inputs shifted by one token
        next_pos = pos + span
        # Wrap to 0 when the following batch would run past the end.
        if next_pos + span + 1 > len(tokens):
            next_pos = 0
        return x, y, next_pos

    def next_batch(self, split='train'):
        """Return the next ``(x, y)`` pair of shape ``(B, T)`` and advance the cursor.

        ``split='train'`` reads the training tokens. Any other value reads the
        validation tokens, falling back to the training tokens (and the
        training cursor) when there is no validation split.
        """
        if split == 'train' or len(self.val_tokens) == 0:
            x, y, self.current_position = self._take(self.train_tokens, self.current_position)
        else:
            x, y, self.val_position = self._take(self.val_tokens, self.val_position)
        return x, y


In [None]:
# GPT model and config
from dataclasses import dataclass

@dataclass
class GPTConfig:
    """Hyperparameters of the GPT model; the defaults match the 'gpt2' entry used by GPT.from_pretrained."""

    # Number of rows in the position-embedding table and side length of the causal mask.
    block_size: int = 1024
    # Number of rows in the token-embedding table / width of the output logits.
    vocab_size: int = 50257
    # Number of stacked transformer blocks.
    n_layer: int = 12
    # Number of attention heads per block; must divide n_embd.
    n_head: int = 12
    # Width of the embeddings and of every block's hidden state.
    n_embd: int = 768

class GPT(nn.Module):
    """Decoder-only transformer language model with a GPT-2 compatible layout.

    Submodules: token embedding ``transformer.wte``, position embedding
    ``transformer.wpe``, ``config.n_layer`` ``Block`` modules in
    ``transformer.h``, a final LayerNorm ``transformer.ln_f`` and the output
    projection ``lm_head`` (no bias), whose weight is shared with ``wte``.
    """

    # The annotation is quoted so the class does not need GPTConfig to be
    # defined at class-creation time; any object with the same attributes works.
    def __init__(self, config: "GPTConfig"):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size, config.n_embd),
            wpe = nn.Embedding(config.block_size, config.n_embd),
            h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            ln_f = nn.LayerNorm(config.n_embd),
        ))
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # Weight tying: input embedding and output projection share one tensor.
        self.transformer.wte.weight = self.lm_head.weight
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialize Linear/Embedding weights from N(0, 0.02^2) and zero the biases.

        Linear layers flagged with ``NANGPT_SCALE_INIT`` use a standard
        deviation scaled by ``(2 * n_layer) ** -0.5``.
        """
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, 'NANGPT_SCALE_INIT'):
                std *= (2 * self.config.n_layer) ** -0.5
            torch.nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, when ``targets`` is given, the loss.

        Args:
            idx: integer token ids of shape ``(B, T)`` with ``T <= block_size``.
            targets: optional token ids of shape ``(B, T)``.

        Returns:
            ``(logits, loss)`` where ``logits`` has shape ``(B, T, vocab_size)``
            and ``loss`` is the cross-entropy over all positions, or ``None``
            when ``targets`` is ``None``.

        Raises:
            ValueError: if ``T`` exceeds ``config.block_size``.
        """
        B, T = idx.size()
        if T > self.config.block_size:
            # Otherwise the position-embedding lookup fails with an index error
            # (a device-side assert on CUDA) that is hard to trace back.
            raise ValueError(
                f'cannot forward a sequence of length {T}; block_size is {self.config.block_size}'
            )
        pos = torch.arange(0, T, dtype=torch.long, device=idx.device)
        pos_emb = self.transformer.wpe(pos)
        tok_emb = self.transformer.wte(idx)
        x = tok_emb + pos_emb  # (T, C) position embeddings broadcast over the batch
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)
        logits = self.lm_head(x)
        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss

    @classmethod
    def from_pretrained(cls, model_type='gpt2'):
        """Build a model and copy the Hugging Face GPT-2 weights into it.

        Args:
            model_type: one of ``'gpt2'``, ``'gpt2-medium'``, ``'gpt2-large'``,
                ``'gpt2-xl'``.

        Raises:
            KeyError: if ``model_type`` is not one of the names above.
            ValueError: if the parameter names or shapes of the two models do
                not line up.
        """
        # Imported here because no cell of the notebook imports it, which made
        # this method fail with NameError.
        from transformers import GPT2LMHeadModel

        print('Loading pretrained from HF:', model_type)
        config_map = {
            'gpt2': dict(n_layer=12, n_head=12, n_embd=768),
            'gpt2-medium': dict(n_layer=24, n_head=16, n_embd=1024),
            'gpt2-large': dict(n_layer=36, n_head=20, n_embd=1280),
            'gpt2-xl': dict(n_layer=48, n_head=25, n_embd=1600),
        }
        if model_type not in config_map:
            raise KeyError(f'unknown model_type {model_type!r}; expected one of {sorted(config_map)}')
        cfg = GPTConfig(**config_map[model_type])
        model = cls(cfg)
        # load HF weights and copy
        hf = GPT2LMHeadModel.from_pretrained(model_type)
        sd = model.state_dict()
        sd_hf = hf.state_dict()
        # The causal-mask buffers are not learned weights; leave them out on both sides.
        sd_keys = [k for k in sd if not k.endswith('.attn.bias')]
        sd_hf_keys = [k for k in sd_hf if not k.endswith(('.attn.masked_bias', '.attn.bias'))]
        # Match parameters by name rather than by position in the two
        # state dicts, so a change in registration order cannot silently copy
        # a tensor into the wrong parameter.
        if set(sd_keys) != set(sd_hf_keys):
            differing = sorted(set(sd_keys) ^ set(sd_hf_keys))
            raise ValueError(f'mismatched keys when loading HF model: {differing[:5]}')
        # These HF weights are stored as (in, out) while nn.Linear keeps
        # (out, in), so they are transposed on copy.
        transposed = ('attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight')
        with torch.no_grad():
            for k in sd_keys:
                source = sd_hf[k].t() if k.endswith(transposed) else sd_hf[k]
                if source.shape != sd[k].shape:
                    raise ValueError(
                        f'shape mismatch for {k}: HF {tuple(source.shape)} vs model {tuple(sd[k].shape)}'
                    )
                sd[k].copy_(source)
        return model


In [None]:
# Model architecture (CausalSelfAttention, MLP, Block)
import math
import torch.nn.functional as F
from torch import nn

class CausalSelfAttention(nn.Module):
    """Multi-head self-attention in which a position only attends to itself and earlier positions."""

    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        # A single projection produces queries, keys and values together.
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd)
        # Output projection; the flag is read by GPT._init_weights to scale its init.
        self.c_proj = nn.Linear(config.n_embd, config.n_embd)
        self.c_proj.NANGPT_SCALE_INIT = 1
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        # Lower-triangular mask of ones, stored as a buffer named "bias".
        size = config.block_size
        lower_triangle = torch.tril(torch.ones(size, size))
        self.register_buffer("bias", lower_triangle.view(1, 1, size, size))

    def forward(self, x):
        """Apply causal attention to ``x`` of shape (B, T, C) and return the same shape."""
        batch, seq_len, width = x.size()
        head_dim = width // self.n_head

        def split_heads(t):
            # (B, T, C) -> (B, n_head, T, head_dim)
            return t.view(batch, seq_len, self.n_head, head_dim).transpose(1, 2)

        q, k, v = (split_heads(part) for part in self.c_attn(x).split(self.n_embd, dim=2))

        scores = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(head_dim))
        # Positions above the diagonal are future tokens: -inf gives them zero weight.
        visible = self.bias[:, :, :seq_len, :seq_len]
        scores = scores.masked_fill(visible == 0, float('-inf'))
        weights = F.softmax(scores, dim=-1)

        # (B, n_head, T, head_dim) -> (B, T, C), concatenating the heads again.
        merged = (weights @ v).transpose(1, 2).contiguous().view(batch, seq_len, width)
        return self.c_proj(merged)

class MLP(nn.Module):
    """Position-wise feed-forward network: expand to 4x width, tanh-approximated GELU, project back."""

    def __init__(self, config):
        super().__init__()
        hidden = 4 * config.n_embd
        self.c_fc = nn.Linear(config.n_embd, hidden)
        self.gelu = nn.GELU(approximate='tanh')
        self.c_proj = nn.Linear(hidden, config.n_embd)
        # Flag read by GPT._init_weights to scale this projection's init.
        self.c_proj.NANGPT_SCALE_INIT = 1

    def forward(self, x):
        """Return ``c_proj(gelu(c_fc(x)))``; the last dimension keeps its size."""
        return self.c_proj(self.gelu(self.c_fc(x)))

class Block(nn.Module):
    """One transformer layer: attention and MLP sub-layers, each applied to a LayerNorm of its input and added back residually."""

    def __init__(self, config):
        super().__init__()
        width = config.n_embd
        self.ln_1 = nn.LayerNorm(width)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = nn.LayerNorm(width)
        self.mlp = MLP(config)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates (same shape as the input)."""
        # The normalization is applied to the branch input only; the residual
        # path carries the un-normalized activations.
        after_attention = x + self.attn(self.ln_1(x))
        return after_attention + self.mlp(self.ln_2(after_attention))


In [None]:
# Device configuration and seeds
import random
import numpy as np

# Fixed seed shared by every random number generator the notebook uses.
seed = 1337

# Use the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('Using device:', device)

# Seed Python, NumPy and PyTorch so repeated runs draw the same random numbers.
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
if device.type == 'cuda':
    torch.cuda.manual_seed_all(seed)


In [None]:
# Upload input.txt from local machine (optional)
# Run this cell and choose your file; it will be stored as ./input.txt
from google.colab import files
# files.upload() opens the browser file picker and returns a dict that maps
# each uploaded file name to its content as bytes.
uploaded = files.upload()
if 'input.txt' in uploaded:
    print('input.txt uploaded')
elif uploaded:
    # The file arrived under a different name (for example because it was named
    # differently locally): list what was received and store the first file as
    # ./input.txt, so the cell does what its header promises.
    for fname in uploaded:
        print('uploaded', fname)
    source_name = next(iter(uploaded))
    with open('input.txt', 'wb') as f:
        f.write(uploaded[source_name])
    print(f'copied {source_name} to ./input.txt')
else:
    print('No file was uploaded; ./input.txt was not created')


In [None]:
# Mount Google Drive and prepare checkpoint directory
from google.colab import drive, files
import os

# Mount Google Drive under /content/drive.
drive.mount('/content/drive')
# Checkpoint directory on Drive; the training and text-generation cells read
# this notebook-level name.
DRIVE_CKPT_DIR = '/content/drive/MyDrive/gpt_ckpts'
# exist_ok=True lets this cell be re-run without failing on an existing directory.
os.makedirs(DRIVE_CKPT_DIR, exist_ok=True)
print('Checkpoints will be saved to:', DRIVE_CKPT_DIR)

# Pointer to the optional upload cell for users whose dataset is not on Drive.
print('\nIf you want to upload a local input.txt, run the cell below to upload and copy it to ./input.txt')


In [None]:
# Setup and install dependencies
# Packages used by the other cells: transformers (source of the pretrained GPT-2
# weights), tiktoken (tokenizer in DataLoaderLite) and tqdm (progress bar in
# train_simple). NOTE(review): accelerate is installed but not referenced by any
# cell visible here - confirm it is still needed.
!pip install -q transformers tiktoken tqdm accelerate

import torch
# Report the installed torch version and whether a CUDA device is visible.
print('torch', torch.__version__, 'cuda available:', torch.cuda.is_available())
# Show the GPU attached to this runtime.
!nvidia-smi


# Colab: Train minGPT (GPT-2 small / 124M) on a T4 GPU

This notebook prepares the environment, mounts Google Drive for checkpoints, loads data, and trains a decoder-only model based on the `main.py` code in this repo.

Instructions:
- Runtime → Change runtime type → GPU (T4 recommended)
- Mount Drive (below) to save checkpoints and load your `input.txt` dataset
- Consider using `--pretrained` (pretrained GPT-2, loaded in this notebook with `GPT.from_pretrained('gpt2')`) for much faster convergence
