# Implementing GPT-2 model:

In [1]:
GPT_CONFIG_124M = {
  "vocab_size": 50257, # Vocabulary size
  "context_length": 1024, # Context length
  "emb_dim": 768, # Embedding dimension
  "n_heads": 12, # Number of attention heads
  "n_layers": 12, # Number of layers
  "drop_rate": 0.1, # Dropout rate
  "qkv_bias": False # Query-Key-Value bias
}

In [2]:
import torch
from torch import nn

In [3]:
class DummyGPTModel(nn.Module):
  def __init__(self, cfg_map):
    super().__init__()

    # embedding components
    self.emb_layer = nn.Embedding(cfg_map['vocab_size'], cfg_map['emb_dim'])
    self.pos_emb_layer = nn.Embedding(cfg_map['context_length'], cfg_map['emb_dim'])

    # huh
    self.dropout = nn.Dropout(cfg_map['drop_rate'])

    # transformer
    self.trfm_block = nn.Sequential(*[TransformerBlock(cfg_map) for i in range(cfg_map['n_layers'])])

    self.final_norm = LayerNorm(cfg_map['emb_dim'])

    # convert to logits
    self.out_head = nn.Linear(cfg_map['emb_dim'], cfg_map['vocab_size'], bias = False)

  def forward(self, in_idx):
    batch_size, seq_len = in_idx.shape
    tok_embed = self.emb_layer(in_idx)
    pos_embed = self.pos_emb_layer(torch.arange(seq_len, device = in_idx.device))
    x = tok_embed + pos_embed
    x = self.dropout(x)
    x = self.trfm_block(x)
    x = self.final_norm(x)

    logits = self.out_head(x)

    return logits

class TransformerBlock(nn.Module):
  def __init__(self, cfg_map):
    super().__init__()
    self.norm_1 = LayerNorm(cfg_map['emb_dim'])
    self.multihead_attention = MultiheadAttention(cfg_map['emb_dim'], cfg_map['emb_dim'], cfg_map['drop_rate'], cfg_map['context_length'], cfg_map['n_heads'])
    self.dropout = nn.Dropout(cfg_map['drop_rate'])
    self.norm_2 = LayerNorm(cfg_map['emb_dim'])
    self.ffw = FeedForward(cfg_map)
  def forward(self, x):
    shortcut_x  = x
    x = self.norm_1(x)
    x = self.multihead_attention(x)
    x = self.dropout(x)
    x = shortcut_x + x

    shortcut_x = x
    x = self.norm_2(x)
    x = self.ffw(x)
    x = self.dropout(x)
    x = x + shortcut_x
    return x

class LayerNorm(nn.Module):
  def __init__(self, emb_dim, eps=1e-5):
    super().__init__()
    self.epsilon = eps

    # learnable params to tweak the layer norm
    self.scale = nn.Parameter(torch.ones(emb_dim))
    self.shift = nn.Parameter(torch.zeros(emb_dim))

  def forward(self, x):
    mean = torch.mean(x, dim = -1, keepdim = True)
    var = torch.var(x, dim = -1, keepdim = True, correction = False)
    norm_x = (x - mean) / (var + self.epsilon)**0.5
    return self.scale * norm_x + self.shift

class GELULayer(nn.Module):
  def __init__(self):
    super().__init__()

  def forward(self, x):
    return 0.5 * x * (1 + torch.tanh((2/torch.pi)**0.5 * (x + 0.044715 * x**3)))

class FeedForward(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.layers = nn.Sequential(
        nn.Linear(cfg['emb_dim'], 4 * cfg['emb_dim']), # domain expansion
        GELULayer(), # just gelu for non-linear
        nn.Linear(4 * cfg['emb_dim'], cfg['emb_dim']), # domain contraction
    )

  def forward(self, x):
    return self.layers(x)

class MultiheadAttention(nn.Module):
  def __init__(self, d_in, d_out, drop_out_rate, context_length, num_heads, ena_bias = False):
    super().__init__()

    assert (d_out % num_heads == 0), \
      "d_out must be divisible by num_heads"

    self.d_in = d_in
    self.d_out = d_out
    self.num_heads = num_heads
    self.head_dim = self.d_out // self.num_heads

    self.W_Q = nn.Linear(d_in, d_out, bias = ena_bias)
    self.W_K = nn.Linear(d_in, d_out, bias = ena_bias)
    self.W_V = nn.Linear(d_in, d_out, bias = ena_bias)

    # projection?
    self.out_proj = nn.Linear(d_out, d_out)

    self.drop_out_layer = nn.Dropout(drop_out_rate)
    self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal = 1))

  def forward(self, x):

    batch, num_tokens, d_in = x.shape

    queries = self.W_Q(x)
    keys = self.W_K(x)
    values = self.W_V(x)

    queries = queries.view(batch, num_tokens, self.num_heads, self.head_dim)
    keys = keys.view(batch, num_tokens, self.num_heads, self.head_dim)
    values = values.view(batch, num_tokens, self.num_heads, self.head_dim)

    queries = queries.transpose(1,2)
    keys = keys.transpose(1,2)
    values = values.transpose(1,2)

    attention_score = queries @ keys.transpose(2, 3)

    mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
    attention_score.masked_fill_(mask_bool, -torch.inf)
    attention_score = attention_score / self.head_dim**0.5
    attention_weight = torch.softmax(attention_score, dim = -1)
    attention_weight = self.drop_out_layer(attention_weight)

    context_vectors = (attention_weight @ values).transpose(1, 2)

    context_vectors = context_vectors.contiguous().view(batch, num_tokens, self.d_out)

    # combs for learning relationship of head's results
    context_vectors = self.out_proj(context_vectors)

    return context_vectors


In [4]:
# this block of code for demonstrating the shortcut connection procedure
class ExampleNeuralNetwork(nn.Module):
  def __init__(self, shortcut_connection, size_arr):
    super().__init__()
    self.shortcut_connection = shortcut_connection
    self.layers = nn.Sequential(
        nn.Linear(size_arr[0], size_arr[1], GELULayer()),
        nn.Linear(size_arr[1], size_arr[2], GELULayer()),
        nn.Linear(size_arr[2], size_arr[3], GELULayer()),
        nn.Linear(size_arr[3], size_arr[4], GELULayer()),
        nn.Linear(size_arr[4], size_arr[5], GELULayer())
    )

  def forward(self, x):
    for layer in self.layers:
      x_out = layer(x)

      if self.shortcut_connection and x.shape == x_out.shape:
        x = x_out + x
      else:
        x = x_out
    return x

In [5]:
def print_gradient(model, x):
  output = model(x)
  target = torch.tensor([[0.]])

  loss = nn.MSELoss()
  loss = loss(output, target)

  loss.backward()

  for name, param in model.named_parameters():
    if 'weight' in name:
      print(f"{name} has gradient mean of {param.grad.abs().mean().item()}")

In [6]:
size_arr = [3, 3, 3, 3, 3, 1]
sample_input = torch.tensor([[1., 0., -1.]])

torch.manual_seed(42)

model_without_connection = ExampleNeuralNetwork(shortcut_connection=False, size_arr = size_arr)
print_gradient(model_without_connection, sample_input)

print("------------------------------------------------------------------------------------")

model_with_connection = ExampleNeuralNetwork(shortcut_connection=True, size_arr = size_arr)
print_gradient(model_with_connection, sample_input)

layers.0.weight has gradient mean of 0.010758272372186184
layers.1.weight has gradient mean of 0.02833496779203415
layers.2.weight has gradient mean of 0.031643930822610855
layers.3.weight has gradient mean of 0.12355596572160721
layers.4.weight has gradient mean of 0.18377549946308136
------------------------------------------------------------------------------------
layers.0.weight has gradient mean of 0.08325278759002686
layers.1.weight has gradient mean of 0.02883332222700119
layers.2.weight has gradient mean of 0.06909593939781189
layers.3.weight has gradient mean of 0.13771231472492218
layers.4.weight has gradient mean of 1.082464575767517


In [7]:
# this only for testing dumb gpt
import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")

batch = []

input_1 = "hello, how are u?"
input_2 = "plato is drinking water."

batch.append(torch.tensor(tokenizer.encode(input_1)))
batch.append(torch.tensor(tokenizer.encode(input_2)))

batch = torch.stack(batch, dim = 0)

print(batch)

torch.manual_seed(123)

dumb_gpt = DummyGPTModel(GPT_CONFIG_124M)

logits = dumb_gpt(batch)

logits
print(logits.shape)


tensor([[31373,    11,   703,   389,   334,    30],
        [  489,  5549,   318,  7722,  1660,    13]])
torch.Size([2, 6, 50257])


In [8]:
# testing normalization layer
torch.manual_seed(42)
toy_input = torch.randn(3, 10)

print("Mean before normalize:")
print(torch.mean(toy_input, dim = -1, keepdim = True))

print("Var before normalize:")
print(torch.var(toy_input, dim = -1, keepdim = True, correction = False))


norm_layer = LayerNorm(10)
norm_input = norm_layer(toy_input)

print("Mean after normalize:")
print(torch.mean(norm_input, dim = -1, keepdim = True))

print("Var after normalize:")
print(torch.var(norm_input, dim = -1, keepdim = True, correction = False))

Mean before normalize:
tensor([[ 0.0902],
        [-0.6379],
        [-0.1798]])
Var before normalize:
tensor([[1.8933],
        [0.9988],
        [1.1437]])
Mean after normalize:
tensor([[ 0.0000e+00],
        [-2.5332e-08],
        [ 8.9407e-09]], grad_fn=<MeanBackward1>)
Var after normalize:
tensor([[1.0000],
        [1.0000],
        [1.0000]], grad_fn=<VarBackward0>)


In [9]:
# testing feed forward layer
ffn = FeedForward(GPT_CONFIG_124M)
x = torch.rand(2, 3, 768) #1
out = ffn(x)
print(out.shape)

torch.Size([2, 3, 768])


In [10]:
# testing transformer block
torch.manual_seed(123)
x = torch.rand(2, 4, 768) #1
block = TransformerBlock(GPT_CONFIG_124M)
output = block(x)
print("Input shape:", x.shape)
print("Output shape:", output.shape)

Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 768])
