## Building GPT Language Model

In [45]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# Prefer the GPU whenever PyTorch can see one; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

cpu


In [75]:
# Read the raw corpus. The text contains non-ASCII punctuation (em dashes,
# curly quotes, the trademark sign), so the encoding is pinned instead of
# relying on the platform default, which differs between operating systems.
# NOTE(review): assumes the sample file is stored as UTF-8 — confirm.
with open("../samples/wizard_of_oz.txt", "r", encoding="utf-8") as f:
    text = f.read()

# Character-level vocabulary: every distinct character, in sorted order.
chars = sorted(set(text))
print(chars)

vocab_size = len(chars)
print(vocab_size)

['\n', ' ', '!', '"', '$', '%', '&', "'", '(', ')', '*', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '—', '‘', '’', '“', '”', '•', '™']
87


In [76]:
import random
import re

# Clean-up passes applied to `text` in order; each entry is (pattern, replacement).
_cleanup_rules = [
    # 1. Drop every character that is not a letter, whitespace, or one of the
    #    punctuation marks listed in the class (digits and quotes are removed).
    (r'[^a-zA-Z\s.,;!?()\'\-—]', ''),
    # 2. A lone newline is just a hard wrap inside a paragraph: turn it into a space.
    (r'(?<!\n)\n(?!\n)', ' '),
    # 3. Any run of two or more newlines becomes exactly one blank-line paragraph break.
    (r'\n{2,}', '\n\n'),
    # 4. Squeeze runs of spaces/tabs down to a single space.
    (r'[ \t]+', ' '),
]

for pattern, replacement in _cleanup_rules:
    text = re.sub(pattern, replacement, text)

# Preview a slice of the cleaned text.
print(text[514:2000])

They cry Oz--Oz! more about Oz, Mr. Baum! and what can I do but obey their commands?

This is Our Book--mine and the children's. For they have flooded me with thousands of suggestions in regard to it, and I have honestly tried to adopt as many of these suggestions as could be fitted into one story.

After the wonderful success of Ozma of Oz it is evident that Dorothy has become a firm fixture in these Oz stories. The little ones all love Dorothy, and as one of my small friends aptly states It isn't a real Oz story without her. So here she is again, as sweet and gentle and innocent as ever, I hope, and the heroine of another strange adventure.

There were many requests from my little correspondents for more about the Wizard. It seems the jolly old fellow made hosts of friends in the first Oz book, in spite of the fact that he frankly acknowledged himself a humbug. The children had heard how he mounted into the sky in a balloon and they were all waiting for him to come down again. So wha

In [78]:
# Recompute the vocabulary from the current contents of `text`:
# the sorted list of distinct characters and its size.
chars = sorted(set(text))
vocab_size = len(chars)

print(chars)
print(vocab_size)

['\n', ' ', '!', "'", '(', ')', ',', '-', '.', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '—']
64


In [79]:
# Lookup tables between characters and integer token ids; a character's id is
# its position in the sorted `chars` list.
string_to_int = {char: i for i, char in enumerate(chars)}
int_to_string = dict(enumerate(chars))


def encode(s):
    """Convert a string into a list of token ids.

    Raises:
        KeyError: if `s` contains a character that is not in `chars`.
    """
    return [string_to_int[c] for c in s]


def decode(l):
    """Convert an iterable of token ids back into a string.

    Raises:
        KeyError: if an id has no entry in the vocabulary.
    """
    return ''.join(int_to_string[i] for i in l)


# Encoding entire text within wizard_of_oz.txt
data = torch.tensor(encode(text), dtype=torch.long)
print(data[:50])

tensor([14, 51, 54, 51, 56, 44, 61,  1, 37, 50, 40,  1, 56, 44, 41,  1, 33, 45,
        62, 37, 54, 40,  1, 45, 50,  1, 25, 62,  0,  0,  1, 11,  1, 16, 37, 45,
        56, 44, 42, 57, 48,  1, 28, 41, 39, 51, 54, 40,  1, 51])


In [80]:
# Hyperparams

# Reproducibility: seed both RNGs used in this notebook (Python's `random`
# drives batch shuffling, torch drives weight init, dropout and sampling),
# so that Restart & Run All gives the same results.
seed = 1337
random.seed(seed)
torch.manual_seed(seed)

# Data pipeline
block_size = 8      # context length: number of tokens per training window
batch_size = 32     # number of windows per mini-batch

# Model
n_embed = 384       # embedding width
n_layer = 4         # number of stacked transformer blocks
n_head = 3          # attention heads per block (head_size = n_embed // n_head)
dropout = 0.2       # dropout probability

# Optimisation
# NOTE(review): this only takes effect if the optimizer cell reads it.
learning_rate = 2e-4

In [81]:
# Contiguous 80:20 split (no shuffling): the first 80% of the token stream is
# used for training, the remaining tail for validation.
n = int(0.8 * len(data))
train_data = data[:n]
val_data = data[n:]

print("Sample_size:", len(train_data), len(val_data))

def get_batch(data, batch_size, block_size):
    """Yield shuffled (input, target) mini-batches for one pass over `data`.

    Every valid window start position (0 .. len(data) - block_size - 1) is
    used at most once per pass, so no window is repeated within a pass. A
    trailing partial batch is dropped so that every batch has the same shape.

    Args:
        data: sequence of token ids that supports slicing (assumed to be a
            1-D tensor, as produced by the encoding cell).
        batch_size: number of windows per batch.
        block_size: number of tokens per window.

    Yields:
        (x, y): two tensors of shape (batch_size, block_size), both moved to
        `device`. `y` holds the same windows as `x` shifted one token ahead.

    Raises:
        ValueError: if `batch_size` or `block_size` is not positive, or if
            `data` is too short to hold a single window plus its target.
            Because this is a generator, the error surfaces on the first
            iteration.
    """
    if batch_size <= 0 or block_size <= 0:
        raise ValueError("batch_size and block_size must be positive")

    # A window starting at s needs data[s : s + block_size + 1] to exist.
    num_windows = len(data) - block_size
    if num_windows <= 0:
        raise ValueError(
            f"data has {len(data)} tokens; need more than block_size={block_size}"
        )

    starts = list(range(num_windows))
    random.shuffle(starts)                  # Shuffle for randomness

    # Only iterate over complete batches.
    usable = (num_windows // batch_size) * batch_size
    for offset in range(0, usable, batch_size):
        batch_starts = starts[offset:offset + batch_size]

        # Generate Input and Target Batch (character level token)
        x = torch.stack([data[s:s + block_size] for s in batch_starts])            # Input
        y = torch.stack([data[s + 1:s + block_size + 1] for s in batch_starts])    # Targets: input shifted by one token

        # Push batches to device (preferably CUDA)
        yield x.to(device), y.to(device)

Sample_size: 194961 48741


### GPT Model (Decoder Only)

In [82]:
class FeedForward(nn.Module):
    """Position-wise MLP: widen to 4x, apply ReLU, project back, then dropout.

    Args:
        n_embed: input and output feature width.
        dropout: probability for the final dropout layer.
    """

    def __init__(self, n_embed, dropout=0.2):
        super().__init__()
        hidden_dim = 4 * n_embed
        layers = [
            nn.Linear(n_embed, hidden_dim),     # expand
            nn.ReLU(),                          # non-linearity
            nn.Linear(hidden_dim, n_embed),     # contract back to the model width
            nn.Dropout(dropout),
        ]
        self.linear_layers = nn.Sequential(*layers)

    def forward(self, x):
        # The last dimension of x must be n_embed; the output has the same shape as x.
        out = self.linear_layers(x)
        return out

**Scaled Dot-Product Attention**

$$w = \frac{q \cdot k^T}{\sqrt{d_k}}$$

where:  
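- $w$ is the matrix of raw (pre-softmax) attention scores.  
- $q$ (query) and $k$ (key) are linear projections of the input embeddings.  
- $d_k$ is the **dimensionality of the key vectors** (i.e., `head_size`).  
- The scaling factor **$\frac{1}{\sqrt{d_k}}$** keeps the magnitude of the dot products from growing with $d_k$, which would otherwise push the softmax towards saturation.  

The scores are then causally masked (future positions set to $-\infty$), normalised with a softmax, and used to take a weighted sum of the value vectors $v$.  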


In [83]:
class Head(nn.Module):
    """One causal self-attention head (scaled dot-product attention).

    Reads the module-level `n_embed`, `block_size` and `dropout` settings.

    Args:
        head_size: output width of the key/query/value projections.
    """

    def __init__(self, head_size):
        super().__init__()
        # Bias-free projections from the embedding space into this head's subspace.
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)

        # Lower-triangular causal mask, stored as a buffer: it is not trained,
        # it is saved in state_dict(), and it is built only once.
        causal_mask = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer("tril", causal_mask)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map x of shape (B, T, C) to attended values of shape (B, T, head_size)."""
        _, seq_len, _ = x.shape
        keys = self.key(x)          # (B, T, head_size)
        queries = self.query(x)     # (B, T, head_size)

        # Similarity of every query with every key, scaled by 1/sqrt(head_size).
        scale = keys.shape[-1] ** -0.5
        scores = queries @ keys.transpose(-2, -1) * scale       # (B, T, T)

        # Block attention to future positions, then normalise each row.
        future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(future, float("-inf"))      # (B, T, T)
        weights = F.softmax(scores, dim=-1)                     # (B, T, T)
        weights = self.dropout(weights)

        # Weighted sum of the value vectors.
        values = self.value(x)      # (B, T, head_size)
        return weights @ values     # (B, T, head_size)
        

class MultiHeadAttention(nn.Module):
    """Several attention heads run on the same input, concatenated and projected.

    Args:
        num_heads: number of `Head` modules to create.
        head_size: output width of each head.
        dropout: dropout probability applied after the output projection.
    """

    def __init__(self, num_heads, head_size, dropout=0.2):
        super().__init__()
        head_list = [Head(head_size) for _ in range(num_heads)]
        self.heads = nn.ModuleList(head_list)

        # Maps the concatenated head outputs back to the module-level n_embed width.
        concat_width = head_size * num_heads
        self.proj = nn.Linear(concat_width, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Each head returns (B, T, head_size); joining them along the feature
        # axis gives (B, T, num_heads * head_size), laid out head by head.
        per_head = [head(x) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)

        projected = self.proj(merged)
        return self.dropout(projected)

In [84]:
class Block(nn.Module):
    """Transformer decoder block: self-attention followed by a feed-forward MLP.

    Each sub-layer is wrapped as LayerNorm(x + sublayer(x)), i.e. the
    normalisation is applied after the residual addition.

    Args:
        n_embed: feature width of the block's input and output.
        n_head: number of attention heads; each head gets
            n_embed // n_head features.
    """
    def __init__(self, n_embed, n_head):
        super().__init__()
        head_size = n_embed // n_head   # Head_size to capture features

        # Pass the notebook-level `dropout` hyperparameter on explicitly.
        # Without this, the attention projection and the MLP silently fall
        # back to their own hard-coded default and ignore the setting.
        # NOTE(review): MultiHeadAttention/Head size their layers from the
        # module-level n_embed, so the n_embed passed here must match it.
        self.self_attn = MultiHeadAttention(n_head, head_size, dropout=dropout)
        self.feed_forward = FeedForward(n_embed, dropout=dropout)
        self.lnorm1 = nn.LayerNorm(n_embed)
        self.lnorm2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        # Attention sub-layer: residual add, then normalise.
        y = self.self_attn(x)
        x = self.lnorm1(x + y)
        # Feed-forward sub-layer: residual add, then normalise.
        y = self.feed_forward(x)
        x = self.lnorm2(x + y)
        return x


In [None]:
# GPT Model
class GPTModel(nn.Module):
    """Decoder-only GPT language model over integer token ids.

    Args:
        vocab_size: number of distinct token ids.
        block_size: maximum context length (size of the learned positional
            embedding table).
        n_embed: embedding width.
        n_head: attention heads per transformer block.
        n_layer: number of stacked `Block` modules.
    """
    def __init__(self, vocab_size, block_size, n_embed, n_head, n_layer):
        super().__init__()
        self.token_embeddings = nn.Embedding(vocab_size, n_embed)
        self.positional_embeddings = nn.Embedding(block_size, n_embed)

        self.decoder_blocks = nn.Sequential(*[Block(n_embed, n_head=n_head) for _ in range(n_layer)])

        self.final_layer = nn.Linear(n_embed, vocab_size)
        self.final_layernorm = nn.LayerNorm(n_embed)

        self.apply(self._init_weights)


    def _init_weights(self, module):
        """
        Initialize proper (gaussian distribution) weights for stable training and convergence
        Docs: https://pytorch.org/docs/stable/nn.init.html#torch.nn.init.normal_
        """
        if isinstance(module, nn.Linear):
            # Initializes weights with a normal (Gaussian) distribution
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                # Set the biases to zero
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            # Initializes embeddings weights with a normal (Gaussian) distribution
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)


    def forward(self, index, targets = None):
        """Compute next-token logits and, if targets are given, the loss.

        Args:
            index: (B, T) tensor of token ids, with T <= block_size.
            targets: optional (B, T) tensor of target token ids.

        Returns:
            (logits, loss). Without targets, logits is (B, T, vocab_size) and
            loss is None. With targets, logits is flattened to
            (B*T, vocab_size) and loss is the cross-entropy.

        Raises:
            ValueError: if T exceeds the positional embedding table.
        """
        B, T = index.shape
        max_context = self.positional_embeddings.num_embeddings
        if T > max_context:
            raise ValueError(
                f"sequence length {T} exceeds the model's block_size {max_context}"
            )

        # Index and targets are both (B, T) tokens of integers
        token_embed = self.token_embeddings(index)      # (B, T, C)

        # Build positions on the same device as the input rather than on a
        # global device, so the model works wherever it has been moved.
        positions = torch.arange(T, device=index.device)
        pos_embed = self.positional_embeddings(positions)   # (T, C)
        x = token_embed + pos_embed     # (B, T, C)
        x = self.decoder_blocks(x)      # (B, T, C)
        x = self.final_layernorm(x)     # (B, T, C)
        logits = self.final_layer(x)    # (B, T, vocab_size)

        if targets is None:
            loss = None
        else:
            # Unpack logits shape to batch, seq_len, class
            B, T, C = logits.shape
            # Reshape 3D logits -> 2D logits, as F.cross_entropy expects (N, C) vs (N,)
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)

            # Compute loss fn
            loss = F.cross_entropy(logits, targets)

        return logits, loss


    def generate(self, index, max_new_tokens):
        """Sample `max_new_tokens` tokens and append them to `index`.

        Each step feeds only the most recent block_size tokens to the model,
        so the running sequence may grow beyond the context length. Sampling
        runs in eval mode (dropout off) and without gradient tracking; the
        previous train/eval mode is restored afterwards.

        Args:
            index: (B, T) tensor of token ids used as the starting context.
            max_new_tokens: number of tokens to sample.

        Returns:
            (B, T + max_new_tokens) tensor: the original context followed by
            the sampled tokens.
        """
        max_context = self.positional_embeddings.num_embeddings
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    # Crop to the last block_size tokens; the positional
                    # table has no entries beyond that.
                    context = index[:, -max_context:]
                    logits, _ = self(context)

                    # Take only last time step
                    logits = logits[:, -1, :]   # (B, C)

                    # Apply softmax to get probs
                    probs = F.softmax(logits, dim=-1)

                    # Sample from distribution
                    index_next = torch.multinomial(probs, num_samples=1)     # (B, 1)

                    # Append sampled index to the running sequence
                    index = torch.cat((index, index_next), dim=1)   # (B, T+1)
        finally:
            self.train(was_training)

        return index

In [89]:
# Build the model from the hyperparameters defined earlier, move it to the
# selected device, and display its module tree.
model = GPTModel(
    vocab_size=vocab_size,
    block_size=block_size,
    n_embed=n_embed,
    n_head=n_head,
    n_layer=n_layer,
)
model = model.to(device)
model

GPTModel(
  (token_embeddings): Embedding(64, 384)
  (positional_embeddings): Embedding(8, 384)
  (decoder_blocks): Sequential(
    (0): Block(
      (self_attn): MultiHeadAttention(
        (heads): ModuleList(
          (0-2): 3 x Head(
            (key): Linear(in_features=384, out_features=128, bias=False)
            (query): Linear(in_features=384, out_features=128, bias=False)
            (value): Linear(in_features=384, out_features=128, bias=False)
            (dropout): Dropout(p=0.2, inplace=False)
          )
        )
        (proj): Linear(in_features=384, out_features=384, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
      )
      (feed_forward): FeedForward(
        (linear_layers): Sequential(
          (0): Linear(in_features=384, out_features=1536, bias=True)
          (1): ReLU()
          (2): Linear(in_features=1536, out_features=384, bias=True)
          (3): Dropout(p=0.2, inplace=False)
        )
      )
      (lnorm1): LayerNorm((384,), eps=1e-0

In [91]:
# Sample from the untrained model: start from a single token with id 0 and
# decode the resulting id sequence back to characters.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
sampled_ids = model.generate(context, max_new_tokens=32)[0].tolist()
generated_chars = decode(sampled_ids)
print(generated_chars)


Hxm?lsDItPQp!-.s(S)l;XXU—hg!fXeV


In [92]:
# Use the learning rate declared with the other hyperparameters instead of a
# second, different hard-coded value, so there is one place to tune it.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Decay LR: multiply it by 0.8 after every 20 calls to scheduler.step().
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.8)
epochs = 100

In [93]:
def _epoch_loss(batches, train_step=False):
    """Return the mean loss of `model` over `batches`.

    When `train_step` is true, an optimizer update is taken after each batch.
    """
    running_loss = 0.0
    batch_count = 0

    for inputs, targets in batches:
        _, batch_loss = model(inputs, targets)

        if train_step:
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

        running_loss += batch_loss.item()
        batch_count += 1

    return running_loss / batch_count


for epoch in range(epochs):
    # Training pass: dropout on, weights updated after every batch.
    model.train()
    avg_train_loss = _epoch_loss(get_batch(train_data, batch_size, block_size), train_step=True)

    # Validation pass: dropout off, no gradient tracking, no updates.
    model.eval()
    with torch.no_grad():
        avg_val_loss = _epoch_loss(get_batch(val_data, batch_size, block_size))

    print(f"Epoch {epoch + 1}/{epochs} | Train Loss: {avg_train_loss:.4f} | Validation Loss: {avg_val_loss:.4f}")

    # Advance the learning-rate schedule once per epoch.
    scheduler.step()

print("Training Complete!")

KeyboardInterrupt: 

In [None]:
# Sample again, starting from a single token with id 0.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(context)

sampled_ids = model.generate(context, max_new_tokens=32)[0].tolist()
generated_chars = decode(sampled_ids)
print(generated_chars)

tensor([[0]], device='cuda:0')

Thes fo ssad he, thtond tazm s, 


In [None]:
# Continue a text prompt: encode it, add a batch dimension, sample 8 more
# tokens, and decode the whole sequence (prompt + continuation).
sample_text = "wizard of oz "
prompt_ids = encode(sample_text)
sample_context = torch.tensor(prompt_ids, dtype=torch.long, device=device).unsqueeze(0)   # (1, T)
full_sequence = model.generate(sample_context, max_new_tokens=8)[0].tolist()
generated_chars = decode(full_sequence)
print(generated_chars)

wizard of oz at beand
