# 4 Implementing a GPT model from scratch to generate text

# 4.1 Coding an LLM architecture

In [4]:
GPT_CONFIG_128M = {
    "vocab_size": 500257,      # vocabulary size
    "context_length": 1024,    #context length
    "emb_dim": 768,            # Embedding dimension
    "n_heads": 12,             # number of attention head
    "n_layers": 12,            # number of layers
    "drop_rate": 0.1,          # dropout rate
    "qkv_bias":False           # Query-key-value bias
}
cfg = GPT_CONFIG_128M

In [3]:
import torch
import torch.nn as nn

In [40]:
import torch
import torch.nn as nn


class DummyGPTModel(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.tok_emb =nn.Embedding(cfg["vocab_size"],cfg["emb_dim"])
    self.pos_emb =nn.Embedding(cfg["context_length"],cfg["emb_dim"])
    self.drop_emb = nn.Dropout(cfg["drop_rate"])

    # Use a placeholder for TransformerBlock
    self.trf_blocks = nn.Sequential(
        *[DummyTransformerBlock(cfg) for _ in range (cfg["n_layers"])])

    # Use a place holder for LayerNorm
    self.final_norm = LayerNorm(cfg["emb_dim"])
    self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)


  def forward(self, in_idx):
    batch_size, seq_len, = in_idx.shape
    tok_embeds = self.tok_emb(in_idx)
    pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))
    x = tok_embeds + pos_embeds
    x = self.drop_emb(x)
    x = self.trf_blocks(x)
    x = self.final_norm(x)
    logits = self.out_head(x)
    return logits

In [5]:
  class DummyTransformerBlock(nn.Module):
     def __init__(self, cfg):
         super().__init__()
        # A simple placeholder


     def forward(self,x):
      # This block does nothing and returns its input
        return x


In [6]:
  class DummyLayerNorm(nn.Module):
      def __init__(self, normalized_shape, eps=1e-5):
        super().__init__()
        # Parameters here are just mimic the LayerNorm interface

      def forward(self, x):
        # This block does nothing and returns its input
        return x

In [7]:
  import tiktoken


  tokenizer = tiktoken.get_encoding("gpt2")
  batch = []
  txt1 = "Every effort moves you"
  txt2 = "Every day holds a"

  batch.append(torch.tensor(tokenizer.encode(txt1)))
  batch.append(torch.tensor(tokenizer.encode(txt2)))
  batch = torch.stack(batch,dim=0)
  print(batch)


tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])


In [10]:
torch.manual_seed(123)
model = DummyGPTModel(cfg)
logits = model(batch)
print("output shape:",logits.shape)
print(logits)

output shape: torch.Size([2, 4, 500257])
tensor([[[ 0.2593, -0.9350, -0.6097,  ...,  1.2670, -0.8415,  0.4850],
         [-0.1641, -0.5332,  1.0188,  ...,  0.3370,  0.3371,  0.8372],
         [-0.4035,  1.2549,  1.3933,  ...,  0.3039, -1.2841,  0.3997],
         [ 0.3540,  1.3452, -0.3613,  ...,  0.1178, -1.0502, -0.4579]],

        [[ 0.6703, -0.7034, -0.2323,  ...,  1.1014, -1.0304,  0.7107],
         [-1.6335, -0.7888,  0.0065,  ..., -0.1306, -0.0326,  0.1084],
         [-0.3380,  1.7104,  0.5594,  ...,  0.2230, -0.7844, -0.0983],
         [-0.3292,  0.3263,  1.0420,  ...,  0.2344,  0.7905,  0.5320]]],
       grad_fn=<UnsafeViewBackward0>)


# 4.2 Normalizing activations with layer normalization

In [18]:
torch.manual_seed(123)

batch_example = torch.randn(2,5)
batch_example

tensor([[-0.1115,  0.1204, -0.3696, -0.2404, -1.1969],
        [ 0.2093, -0.9724, -0.7550,  0.3239, -0.1085]])

In [19]:
layer = nn.Sequential(nn.Linear(5,6),nn.ReLU())
out = layer(batch_example)


In [20]:
out

tensor([[0.2260, 0.3470, 0.0000, 0.2216, 0.0000, 0.0000],
        [0.2133, 0.2394, 0.0000, 0.5198, 0.3297, 0.0000]],
       grad_fn=<ReluBackward0>)

In [27]:
mean = out.mean(dim=-1, keepdim=True)
mean

tensor([[0.1324],
        [0.2170]], grad_fn=<MeanBackward1>)

In [28]:
var = out.var(dim=-1, keepdim=True)
var

tensor([[0.0231],
        [0.0398]], grad_fn=<VarBackward0>)

In [32]:
(out - mean).mean(dim=1)

tensor([    -0.0000,      0.0000], grad_fn=<MeanBackward1>)

In [30]:
torch.set_printoptions(sci_mode=False)

In [33]:
normed = ((out - mean) /torch.sqrt(var))
normed.var(dim=1, keepdim=True)

tensor([[1.0000],
        [1.0000]], grad_fn=<VarBackward0>)

In [10]:
class LayerNormal(nn.Module):
  def __init__(self, emb_dim):
    super().__init__()
    self.esp = 1e-5
    self.scale = nn.Parameter(torch.ones(emb_dim))
    self.shift = nn.Parameter(torch.zeros(emb_dim))

  def forward(self, x):
    mean = x.mean(dim=-1, keepdim=True)
    var = x.var(dim=-1, keepdim=True, unbiased=True)
    norm_x = ((x - mean) / torch.sqrt(var))
    return self.scale * ((x - mean) / torch.sqrt(var +self.esp)) + self.shift

In [39]:
ln = LayerNormal(6)
outputs_normed = ln(out)
outputs_normed

tensor([[ 0.6157,  1.4123, -0.8717,  0.5871, -0.8717, -0.8717],
        [-0.0189,  0.1121, -1.0875,  1.5171,  0.5647, -1.0875]],
       grad_fn=<AddBackward0>)

# 4.3 Implementing a feed forward network with GELU activations

In [47]:
class GELU (nn.Module):
  def __init__(self):
    super().__init__()

    def forward(self, x):
      return 0.5 * x * (1 + torch.tanh(torch.sqrt(2 / torch.pi) *
       (x + 0.044715 * torch.pow(x, 3))))

In [9]:
class FeedForward(nn.Module):
  def __init__(self,cfg):
    super().__init__()
    self.layer = nn.Sequential(nn.Linear(cfg["emb_dim"], 4*cfg["emb_dim"]),
    nn.GELU(),
    nn.Linear(4*cfg["emb_dim"], cfg["emb_dim"]),)


  def forward(self,x):

    return self.layer(x)


In [44]:
ffn = FeedForward(cfg)


In [46]:
x= torch.randn(2,3,768)
ffn(x).shape

torch.Size([2, 3, 768])

# 4.4 Adding shortcut connection

In [64]:
class ExampleDeepNeuralNetwork(nn.Module):
  def __init__(self, layer_size, use_shortcut):
    super().__init__()
    self.use_shortcut = use_shortcut
    self.layers = nn.ModuleList([
         nn.Sequential(nn.Linear(layer_size[i], layer_size[i+1]), nn.GELU())
         for i in range(len(layer_size) - 1)
          ])
  def forward(self, x):
    for layer in self.layers:
      layer_output = layer(x)
      #  Check if shortcut can be applied
      if self.use_shortcut and x.shape == layer_output.shape:
        x = x + layer_output

      else:
        x = layer_output
    return x


def print_gradients(model,x):
    # forward pass
    output = model(x)
    target = torch.tensor(0.)

    # calculate loss based on how close the target and output are
    loss = nn.MSELoss()
    loss = loss(output, target)

    # Backward pass to calculate the gradients
    loss.backward()


    for name, param in model.named_parameters():
      if 'weight' in name:
        # print mean absolute gradient of the weights
        print(f"{name} has gradient mean of {param.grad.abs().mean().item()}")

In [52]:
layer_size = [3,3,3,3,3,1]
x= torch.tensor([1., 0.,-1])

In [65]:
torch.manual_seed(123)
model_without_shortcut = ExampleDeepNeuralNetwork(layer_size, use_shortcut=False)

print_gradients(model_without_shortcut, x)

layers.0.0.weight has gradient mean of 0.00020174118981231004
layers.1.0.weight has gradient mean of 0.00012011769285891205
layers.2.0.weight has gradient mean of 0.0007152436301112175
layers.3.0.weight has gradient mean of 0.00139885104727
layers.4.0.weight has gradient mean of 0.005049602594226599


In [66]:
torch.manual_seed(123)
model_without_shortcut = ExampleDeepNeuralNetwork(layer_size, use_shortcut=True)

print_gradients(model_without_shortcut, x)

layers.0.0.weight has gradient mean of 0.22186800837516785
layers.1.0.weight has gradient mean of 0.20709273219108582
layers.2.0.weight has gradient mean of 0.3292388319969177
layers.3.0.weight has gradient mean of 0.2667772173881531
layers.4.0.weight has gradient mean of 1.3268063068389893


# 4.5 Connecting attention and layers in a transformer block

In [7]:
class TransformerBlock(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.att = MultiHeadAttention(
        d_in = cfg["emb_dim"],
        d_out = cfg["emb_dim"],
        context_length =cfg["context_length"],
        num_heads=cfg["n_heads"],
        dropout = cfg["drop_rate"],
        qkv_bias = cfg["qkv_bias"]
        )
    self.ffn = FeedForward(cfg)
    self.norm1 = LayerNormal(cfg["emb_dim"])
    self.norm2 = LayerNormal(cfg["emb_dim"])
    self.drop_out = nn.Dropout(cfg["drop_rate"])


  def forward(self, x):
    shortcut = x
    x = self.norm1(x)
    x =self.att(x)
    x = self.drop_out(x)
    x = x + shortcut
    x = self.norm2(x)
    x = self.ffn(x)
    x = self.drop_out(x)
    x = x + shortcut
    return x



In [71]:
torch.manual_seed(123)
x = torch.randn(2,4, 768)
block = TransformerBlock(cfg)
block(x).shape

torch.Size([2, 4, 768])

from chapter 3

In [8]:
class MultiHeadAttention(nn.Module):
  def __init__(self,d_in, d_out,context_length, dropout, num_heads=2, qkv_bias=False):
    super().__init__()
    assert(d_out % num_heads == 0), \
        "d_out must be divisible by num_heads"

    self.d_out = d_out
    self.num_heads = num_heads
    self.head_dim = d_out // num_heads # reduce the projection dim to match desired output dim

    self.W_query = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_key = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_value = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
    self.out_proj = torch.nn.Linear(d_out, d_out)
    self.dropout = torch.nn.Dropout(dropout)
    self.register_buffer("mask", torch.triu(torch.ones(context_length, context_length), diagonal=1))


  def forward(self, x):
    b, num_tokens, d_in = x.shape # Shape : (b, num_tokens, d_out)
    queries = self.W_query(x)
    keys = self.W_key(x)
    values = self.W_value(x)

    # we implicitly split the matrix by adding a ` num_heads` dimention
    # unroll llast dim : (b, num_tokens, d_out) -->(b, num_tokens,  head_dim)

    keys = keys.reshape(b, num_tokens, self.num_heads, self.head_dim)
    queries = queries.reshape(b, num_tokens, self.num_heads, self.head_dim)
    values = values.reshape(b, num_tokens, self.num_heads, self.head_dim)

    #Transpose: ((b, num_tokens, num_heads, head_dim) -->(b, num_heads, num_tokens,  head_dim)
    keys = keys.transpose(1,2)
    queries = queries.transpose(1,2)
    values = values.transpose(1,2)

    # Compute scaled dot-product attention (selfattention) with a causal mask

    attn_scores = queries @ keys.transpose(2,3) # dot product for each head

    # Original mask truncated to the number of tokens and converted to boolean

    mask_bool = self.mask[:num_tokens, :num_tokens].bool()

    # Use the mask to fill attention scores
    attn_scores.masked_fill_(mask_bool, -torch.inf)
    attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5,dim=-1)
    attn_weights = self.dropout(attn_weights)

    # Shape: (b, num_tokens, num_heads, head_dim)
    context_vec = (attn_weights @ values).transpose(1,2)

    # Combine heads, where self.d_out = self.num_heads * self.head_dim
    context_vec = context_vec.reshape(b, num_tokens, self.d_out)

    contex_vec = self.out_proj(context_vec) # optional projection

    return context_vec

# 4.6 Coding the GPT model

In [2]:
  import tiktoken


  tokenizer = tiktoken.get_encoding("gpt2")
  batch = []
  txt1 = "Every effort moves you"
  txt2 = "Every day holds a"

  batch.append(torch.tensor(tokenizer.encode(txt1)))
  batch.append(torch.tensor(tokenizer.encode(txt2)))
  batch = torch.stack(batch,dim=0)
  print(batch)


tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])


In [5]:
GPT_CONFIG_128M = {
    "vocab_size": 500257,      # vocabulary size
    "context_length": 1024,    #context length
    "emb_dim": 768,            # Embedding dimension
    "n_heads": 12,             # number of attention head
    "n_layers": 12,            # number of layers
    "drop_rate": 0.1,          # dropout rate
    "qkv_bias":False           # Query-key-value bias
}
cfg = GPT_CONFIG_128M

In [1]:
import torch
import torch.nn as nn


class GPTModel(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.tok_emb =nn.Embedding(cfg["vocab_size"],cfg["emb_dim"])
    self.pos_emb =nn.Embedding(cfg["context_length"],cfg["emb_dim"])
    self.drop_emb = nn.Dropout(cfg["drop_rate"])
    self.trf_blocks = nn.Sequential(
        *[TransformerBlock(cfg) for _ in range (cfg["n_layers"])])

    self.final_norm = LayerNormal(cfg["emb_dim"])
    self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)


  def forward(self, in_idx):
    batch_size, seq_len, = in_idx.shape
    tok_embeds = self.tok_emb(in_idx)
    pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))
    x = tok_embeds + pos_embeds
    x = self.drop_emb(x)
    x = self.trf_blocks(x)
    x = self.final_norm(x)
    logits = self.out_head(x)
    return logits

In [11]:
torch.manual_seed(123)

model = GPTModel(cfg)
out = model(batch)

In [12]:
out.shape

torch.Size([2, 4, 500257])