# Load Packages

In [None]:
torch.__version__

In [38]:
!pip install -q tiktoken

In [39]:
import tiktoken
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [40]:
torch.manual_seed(42)

<torch._C.Generator at 0x7d3dc8155dd0>

In [41]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/refs/heads/master/data/tinyshakespeare/input.txt

--2025-04-05 17:49:13--  https://raw.githubusercontent.com/karpathy/char-rnn/refs/heads/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.1’


2025-04-05 17:49:13 (115 MB/s) - ‘input.txt.1’ saved [1115394/1115394]



In [42]:
with open('input.txt') as file:
    corpus = file.read()

corpus[:100]

'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'

# Build Model

In [43]:
GPT2_SMALL_CONFIG = {
    "vocab_size": 50257,
    "context_length": 1024,       # Max sequence length (n_positions)
    "embedding_dim": 768,         # Hidden size (n_embd)
    "n_attention_heads": 12,      # Number of attention heads
    "n_layers": 12,               # Number of transformer blocks
    "dropout_rate": 0.1,
    "qkv_bias": True
}

In [44]:
class LayerNorm(nn.Module):
    def __init__(self, embedding_dim, eps=1e-12):
        super(LayerNorm, self).__init__()
        self.eps = eps
        self.scale = nn.Parameter(torch.ones(embedding_dim))
        self.shift = nn.Parameter(torch.zeros(embedding_dim))

    def forward(self, x):
        mean = torch.mean(x, dim=-1, keepdim=True)
        var = torch.var(x, dim=-1, keepdim=True)
        normalized_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * normalized_x + self.shift

In [45]:
class GELU(nn.Module):
    def __init__(self) -> None:
        super(GELU, self).__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(torch.sqrt(torch.tensor(2 / torch.pi)) *
                                         (x + 0.044715 * torch.pow(x, 3))))

In [46]:
class FeedForward(nn.Module):
    def __init__(self, embedding_dim):
        super(FeedForward, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(embedding_dim, 4 * embedding_dim),
            GELU(),
            nn.Linear(4 * embedding_dim, embedding_dim),
        )

    def forward(self, x):
        return self.layers(x)

In [47]:
class MultiHeadAttention(nn.Module):
    def __init__(self, in_dim, out_dim, context_length, n_heads, dropout, qkv_bias):
        super(MultiHeadAttention, self).__init__()

        assert(out_dim % n_heads == 0), "out_dim must be divisible by n_heads"

        self.out_dim = out_dim
        self.n_heads = n_heads
        self.head_dim = out_dim // n_heads
        self.query_weights = nn.Linear(in_dim, out_dim, bias=qkv_bias)
        self.key_weights = nn.Linear(in_dim, out_dim, bias=qkv_bias)
        self.value_weights = nn.Linear(in_dim, out_dim, bias=qkv_bias)
        self.out_weights = nn.Linear(out_dim, out_dim)
        self.dropout = nn.Dropout(dropout)
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length), diagonal=1)
        )

    def forward(self, x):
        batch_size, context_length, in_dim = x.shape

        queries = self.query_weights(x)
        keys = self.key_weights(x)
        values = self.value_weights(x)

        queries = queries.view(batch_size, context_length, self.n_heads, self.head_dim)
        keys = keys.view(batch_size, context_length, self.n_heads, self.head_dim)
        values = values.view(batch_size, context_length, self.n_heads, self.head_dim)

        queries = queries.transpose(1, 2)
        keys = keys.transpose(1, 2)
        values = values.transpose(1, 2)

        scores = queries @ keys.transpose(-2, -1)
        mask_bool = self.mask.bool()[:context_length, :context_length]
        scores.masked_fill_(mask_bool, -torch.inf)

        attention_weights = torch.softmax(scores / (keys.shape[-1] ** 0.5), dim=-1)
        attention_weights = self.dropout(attention_weights)

        context_vec = attention_weights @ values
        context_vec = context_vec.transpose(1, 2).contiguous().view(
            batch_size, context_length, self.out_dim
        )
        context_vec = self.out_weights(context_vec)

        return context_vec

In [48]:
class TransformerBlock(nn.Module):
    def __init__(self, cfg):
        super(TransformerBlock, self).__init__()
        self.attention = MultiHeadAttention(
            cfg["embedding_dim"],
            cfg["embedding_dim"],
            cfg["context_length"],
            cfg["n_attention_heads"],
            cfg["dropout_rate"],
            cfg["qkv_bias"],
        )
        self.feed_forward = FeedForward(cfg["embedding_dim"])
        self.layer_norm1 = LayerNorm(cfg["embedding_dim"])
        self.layer_norm2 = LayerNorm(cfg["embedding_dim"])
        self.dropout = nn.Dropout(cfg["dropout_rate"])

    def forward(self, x):
        shortcut = x
        x = self.layer_norm1(x)
        x = self.attention(x)
        x = self.dropout(x)
        x = shortcut + x

        shortcut = x
        x = self.layer_norm2(x)
        x = self.feed_forward(x)
        x = self.dropout(x)
        x = shortcut + x

        return x

In [49]:
class GPT(nn.Module):
    def __init__(self, cfg):
        super(GPT, self).__init__()
        self.token_embedding = nn.Embedding(cfg["vocab_size"], cfg["embedding_dim"])
        self.position_embedding = nn.Embedding(cfg["context_length"], cfg["embedding_dim"])
        self.blocks = nn.Sequential(*[TransformerBlock(cfg) for _ in range(cfg["n_layers"])])
        self.dropout = nn.Dropout(cfg["dropout_rate"])
        self.layer_norm = LayerNorm(cfg["embedding_dim"])
        self.out_head = nn.Linear(cfg["embedding_dim"], cfg["vocab_size"])

    def forward(self, x):
        batch_size, context_length = x.shape
        token_embeddings = self.token_embedding(x)
        position_embeddings = self.position_embedding(
            torch.arange(context_length, dtype=torch.long, device=x.device))
        x = token_embeddings + position_embeddings

        x = self.dropout(x)
        x = self.blocks(x)
        x = self.layer_norm(x)
        logits = self.out_head(x)
        return logits


In [50]:
def encode_text(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)
    return encoded_tensor

def decode_tokens(token_ids, tokenizer):
    flat = token_ids.squeeze(0)
    return tokenizer.decode(flat.tolist())

# Train Corpus

In [51]:
class GPTDataset(Dataset):
    def __init__(self, txt, tokenizer, max_length, stride):
        self.input_ids = []
        self.target_ids = []

        token_ids = tokenizer.encode(txt)

        for i in range(0, len(token_ids) - max_length, stride):
            input_chunk = token_ids[i:i + max_length]
            target_chunk = token_ids[i + 1: i + max_length + 1]
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]


In [52]:
def create_dataloader(corpus, batch_size=4, max_length=256,
                         stride=128, shuffle=True, drop_last=True,
                         num_workers=0):
    tokenizer = tiktoken.get_encoding("gpt2")
    dataset = GPTDataset(corpus, tokenizer, max_length, stride)
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers
    )

    return dataloader

In [53]:
train_size = int(0.9 * len(corpus))
train_text, val_text = corpus[:train_size], corpus[train_size:]

In [54]:
train_loader = create_dataloader(
    train_text,
    batch_size=2,
    max_length=GPT2_SMALL_CONFIG["context_length"],
    stride=GPT2_SMALL_CONFIG["context_length"],
    drop_last=True,
    shuffle=True,
    num_workers=0
)
val_loader = create_dataloader(
    val_text,
    batch_size=2,
    max_length=GPT2_SMALL_CONFIG["context_length"],
    stride=GPT2_SMALL_CONFIG["context_length"],
    drop_last=False,
    shuffle=False,
    num_workers=0
)

In [55]:
def calc_loss_batch(input_batch, target_batch, model, device):
    input_batch = input_batch.to(device)
    target_batch = target_batch.to(device)
    logits = model(input_batch)
    loss = torch.nn.functional.cross_entropy(
        logits.flatten(0, 1), target_batch.flatten())

    return loss

In [56]:
def calc_loss(data_loader, model, device, num_batches=None):
    total_loss = 0.
    if len(data_loader) == 0:
        return float("nan")
    elif num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))

    for i, (input_batch, target_batch) in enumerate(data_loader):
        if i < num_batches:
            loss = calc_loss_batch(
                input_batch, target_batch, model, device)
            total_loss += loss.item()
        else:
            break

    return total_loss / num_batches

In [57]:
def evaluate_model(model, train_loader, val_loader, device, eval_iter):
    model.eval()
    with torch.no_grad():
        train_loss = calc_loss(
            train_loader, model, device, num_batches=eval_iter
        )
        val_loss = calc_loss(
            val_loader, model, device, num_batches=eval_iter
        )
    model.train()
    return train_loss, val_loss

In [58]:
def generate(model, idx, max_new_tokens, context_size,
             temperature=0.0, top_k=None, eos_id=None):
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)

        logits = logits[:, -1, :]
        if top_k is not None:
            top_logits, _ = torch.topk(logits, top_k)
            min_val = top_logits[:, -1]
            logits = torch.where(
                logits < min_val,
                torch.tensor(float('-inf')).to(logits.device),
                logits
            )

        if temperature > 0.0:
            logits = logits / temperature
            probs = torch.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
        else:
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)

        if idx_next == eos_id:
            break
        idx = torch.cat((idx, idx_next), dim=1)

    return idx

In [59]:
def train_model(model, train_loader, val_loader,
                       optimizer, device, num_epochs,
                       eval_freq, eval_iter, start_context):
    train_losses, val_losses, track_tokens_seen = [], [], []
    tokens_seen, global_step = 0, -1

    for epoch in range(num_epochs):
        model.train()
        for input_batch, target_batch in train_loader:
            optimizer.zero_grad()
            loss = calc_loss_batch(
                input_batch, target_batch, model, device
            )
            loss.backward()
            optimizer.step()
            tokens_seen += input_batch.numel()
            global_step += 1

            if global_step % eval_freq == 0:
                train_loss, val_loss = evaluate_model(
                    model, train_loader, val_loader, device, eval_iter)
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                track_tokens_seen.append(tokens_seen)
                print(f"Ep {epoch+1} (Step {global_step:06d}): "
                      f"Train loss {train_loss:.3f}, "
                      f"Val loss {val_loss:.3f}"
                )

                generate(
                    model, start_context, 50, GPT2_SMALL_CONFIG["context_length"],
                    temperature=1.3
                )

    return train_losses, val_losses, track_tokens_seen

In [60]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = GPT(GPT2_SMALL_CONFIG).to(device)
tokenizer = tiktoken.get_encoding("gpt2")
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-4)

In [61]:
train_model(
    model, train_loader, val_loader, optimizer, device,
    5, 100, 100, encode_text('Alone, wind howled', tokenizer).to(device))

Ep 1 (Step 000000): Train loss 9.287, Val loss 9.207
Ep 1 (Step 000100): Train loss 5.835, Val loss 6.019
Ep 2 (Step 000200): Train loss 5.255, Val loss 5.604
Ep 3 (Step 000300): Train loss 4.846, Val loss 5.495
Ep 3 (Step 000400): Train loss 4.562, Val loss 5.272
Ep 4 (Step 000500): Train loss 4.310, Val loss 5.153
Ep 5 (Step 000600): Train loss 4.053, Val loss 5.171
Ep 5 (Step 000700): Train loss 3.813, Val loss 5.141


([9.286900596618652,
  5.835445618629455,
  5.255203905105591,
  4.846249089241028,
  4.561634974479675,
  4.309967260360718,
  4.053466455936432,
  3.8125492548942566],
 [9.206994745466444,
  6.019253810246785,
  5.604188892576429,
  5.495038323932224,
  5.271844651963976,
  5.15303021007114,
  5.17082945505778,
  5.140641530354817],
 [2048, 206848, 411648, 616448, 821248, 1026048, 1230848, 1435648])

In [63]:
torch.save(model.state_dict(), 'gpt2-small.pth')

In [65]:
res = generate(
    model, encode_text('Alone, wind howled', tokenizer).to(device), 50, GPT2_SMALL_CONFIG["context_length"],
    temperature=1.3)

print(decode_tokens(res, tokenizer))

Alone, wind howled Hermione was; circling do meet welcome.
PETERIA:
se limit meet without fellowship, whom he-- Reaper v felonapWound for what stay and patience.
pherd:
Every thousand otherft a king nor sentenced York shall lie


In [78]:
res = generate(
    model,
    encode_text('Once upon a time', tokenizer).to(device),
    50, GPT2_SMALL_CONFIG["context_length"],
    temperature=1.3)

print(decode_tokens(res, tokenizer))

Once upon a time
Plagles SRver I am the prince from dark?
Ay, instruct sovereign worsories tunes conj torts such deceit and tumultuous prodig your bed in craftpy.

Thy time the chalmost M blackmail infection think can Warwick shall


In [75]:
train_model(
    model, train_loader, val_loader, optimizer, device,
    1, 100, 100, encode_text('Alone, wind howled', tokenizer).to(device))

Ep 1 (Step 000000): Train loss 3.780, Val loss 5.194
Ep 1 (Step 000100): Train loss 3.468, Val loss 5.194


([3.7798954343795774, 3.46810697555542],
 [5.193729877471924, 5.194102552202013],
 [2048, 206848])