# GPT Model

In this notebook we will implement the GPT-2 model. 

To start, we implement a dummy class that shows the architecture of the neural network. 

Then, we implement the layers we mocked in the dummy GPT model.

Finally, we put everything back together with a few additions.

In [6]:
import torch
import torch.nn as nn

## Dummy GPT

We implement a dummy GPT class below, with the following architecture:

```
DummyGPT(
  (dropout): Dropout(p=0.1, inplace=False)
  (tokenEmbed): Embedding(50257, 768)
  (pos_emb): Embedding(256, 768)
  (normLayer): DummyLayerNorm()
  (trf_blocks): Sequential(
    (0): DummyTransformer()
    (1): DummyTransformer()
    (2): DummyTransformer()
    (3): DummyTransformer()
    (4): DummyTransformer()
    (5): DummyTransformer()
    (6): DummyTransformer()
    (7): DummyTransformer()
    (8): DummyTransformer()
    (9): DummyTransformer()
    (10): DummyTransformer()
    (11): DummyTransformer()
  )
  (out_head): Linear(in_features=768, out_features=50257, bias=False)
)
```

The model will be initialized using a configuration dictionary that will contain the number of layers and other model parameters. 

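We use the parameters of the smallest GPT-2 model for reference, except for the context length, which is shortened to 256 tokens (GPT-2 uses 1024):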

In [None]:
cfg = {
    "vocab_size": 50257, # Number of tokens in vocabulary.
    "context_length": 256, # Max. number of tokens the LLM sees per run.
    "emb_dim": 768, # Size of the internal embeddings used in the LLM attention mechanism.
    "n_layers": 12, # Number of transformer layers.
    "drop_rate": 0.1, # Feature dropout rate.
    "n_heads": 12, # Number of attention heads per multi-head attention block.
    "qkv_bias": False, # Whether the query/key/value projections use a bias term.
}


In [8]:
class DummyLayerNorm(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x

class DummyTransformer(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x

In [None]:
class DummyGPT(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        # Dropout
        self.dropout = nn.Dropout(cfg["drop_rate"])
        # Token embedding
        self.tokenEmbed = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        # Positional embedding
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        # Normalization
        self.normLayer = DummyLayerNorm()
        # Transformer Blocks
        self.trf_blocks = nn.Sequential(*[
            DummyTransformer() for _ in range(cfg["n_layers"])
        ])
        # Linear projection.
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, x):
        b, seqLen = x.shape
        tokEmbeds = self.tokenEmbed(x)
        posEmbeds = self.pos_emb(torch.arange(seqLen, device=x.device))
        x = tokEmbeds + posEmbeds
        x = self.dropout(x)
        x = self.normLayer(x)
        for l in self.trf_blocks:
            x = l(x)
        return self.out_head(x)

In [10]:
# Initialize model.
dummyGpt = DummyGPT(cfg)
dummyGpt

DummyGPT(
  (dropout): Dropout(p=0.1, inplace=False)
  (tokenEmbed): Embedding(50257, 768)
  (pos_emb): Embedding(256, 768)
  (normLayer): DummyLayerNorm()
  (trf_blocks): Sequential(
    (0): DummyTransformer()
    (1): DummyTransformer()
    (2): DummyTransformer()
    (3): DummyTransformer()
    (4): DummyTransformer()
    (5): DummyTransformer()
    (6): DummyTransformer()
    (7): DummyTransformer()
    (8): DummyTransformer()
    (9): DummyTransformer()
    (10): DummyTransformer()
    (11): DummyTransformer()
  )
  (out_head): Linear(in_features=768, out_features=50257, bias=False)
)
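
The way `forward` combines token and positional embeddings can be sketched with toy sizes. The sizes and token ids below are made up for illustration and are not the notebook's configuration; the point is only that the `[seq_len, emb_dim]` positional tensor is broadcast over the batch dimension.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
# Toy sizes, chosen only for illustration.
vocab_size, context_length, emb_dim = 10, 6, 4

tok_emb = nn.Embedding(vocab_size, emb_dim)
pos_emb = nn.Embedding(context_length, emb_dim)

token_ids = torch.tensor([[1, 5, 2], [7, 0, 3]])  # [batch=2, seq_len=3]
seq_len = token_ids.shape[1]

tok = tok_emb(token_ids)              # [2, 3, 4]
pos = pos_emb(torch.arange(seq_len))  # [3, 4], one vector per position
x = tok + pos                         # broadcast over the batch -> [2, 3, 4]

print(tok.shape, pos.shape, x.shape)
```

Every sequence in the batch receives the same positional vectors, which is why the positional lookup only depends on `seq_len` and not on the token ids.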

In [11]:
# Define a batch to be used for testing purposes
import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")
batch = []
txt1 = "Every effort moves you"
txt2 = "Every day holds a"
batch.append(torch.tensor(tokenizer.encode(txt1)))
batch.append(torch.tensor(tokenizer.encode(txt2)))
batch = torch.stack(batch, dim=0)

In [None]:
# Run the model on the batch.
# The model isn't trained, so all outputs are gibberish.
torch.manual_seed(123)

logits = dummyGpt(batch)  # shape: [batch, seqLen, vocab_size]
pred_ids = logits.argmax(dim=-1)  # shape: [batch, seqLen]
print(batch)
for i in range(pred_ids.shape[0]):
    print(tokenizer.decode(pred_ids[i].tolist()))


tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])
private privileDust proclaimed
IsnAPH guided predecessors


## Layer Normalization

The next step is to implement the layer normalization class.

Layer normalization keeps model training stable by reducing internal covariate shift.

Let's explain in more detail:

- Covariate: An input feature of an ML model.
- Shift: A change in distribution.
- Internal: Refers to internal covariates, i.e. the activations inside the neural network.


### Why does layer normalization help reduce internal covariate shift?

During training, the model weights are updated at every step, so the distribution of each layer's outputs keeps changing. Without normalization, each layer has to keep adapting to the new distribution of the previous layer's outputs. Layer normalization rescales each input vector to mean 0 and variance 1 across its features, so the distribution a layer sees stays stable from one iteration to the next.

### How does it work?

For an input vector `X = [x1, x2, ..., xn]`, layer normalization computes, for each element `xi`:

`zi = (xi - mean(X)) / std(X)`

In practice, layer normalization adds a small constant for numerical stability and two learnable parameters that let the model rescale and shift the normalized values:

`zi = γ * (xi - mean(X)) / sqrt(var(X) + ε) + β`

Where:
- `γ`: Learnable scale.
- `ε`: Small constant that avoids division by zero.
- `β`: Learnable shift.
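
As a small worked example, the normalization can be computed by hand on a single vector and compared against PyTorch's functional layer norm. This is a sketch under a few assumptions: the input values and `eps = 1e-5` are chosen for illustration, `ε` is added to the variance inside the square root, and `γ = 1`, `β = 0` so only the normalization itself is visible.

```python
import torch
import torch.nn.functional as F

# Toy input: one vector with four features (illustrative values).
x = torch.tensor([[1.0, 2.0, 3.0, 4.0]])
eps = 1e-5  # assumed value; any small positive constant works here

mean = x.mean(dim=-1, keepdim=True)                # 2.5
var = x.var(dim=-1, keepdim=True, unbiased=False)  # 1.25 (divide by n)
z = (x - mean) / torch.sqrt(var + eps)

# With scale = 1 and shift = 0 this should match PyTorch's own layer norm.
reference = F.layer_norm(x, normalized_shape=(4,), eps=eps)

print(z)
print(torch.allclose(z, reference, atol=1e-6))
```

Note that the variance is the biased one (divided by `n`, not `n - 1`), which is what `unbiased=False` selects.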

In [None]:
class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        # Scale parameter.
        self.scale = nn.Parameter(torch.ones(emb_dim))
        # Shift parameter.
        self.shift = nn.Parameter(torch.zeros(emb_dim))
        # Epsilon constant. Avoids division by zero.
        self.eps = 1e-8
    
    def forward(self, x):
        # Normalize over the last (feature) dimension.
        mean = x.mean(dim=-1, keepdim=True)
        # Biased variance (divide by n instead of n - 1).
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        normX = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * normX + self.shift

In [14]:
# Try LayerNorm.
torch.manual_seed(123)
torch.set_printoptions(sci_mode=False)

batch = torch.randn(2, 5)
print("Mean:\n ", batch.mean(dim=-1))
print("Variance :\n ", batch.var(dim=-1,  unbiased=False))

ln = LayerNorm(5)
out = ln(batch)
print("Norm. Mean:\n ", out.mean(dim=-1))
print("Norm. Variance :\n ", out.var(dim=-1,  unbiased=False))

Mean:
  tensor([-0.3596, -0.2606])
Variance :
  tensor([0.2015, 0.2673])
Norm. Mean:
  tensor([    -0.0000,      0.0000], grad_fn=<MeanBackward1>)
Norm. Variance :
  tensor([1.0000, 1.0000], grad_fn=<VarBackward0>)


## GELU activation

In [15]:
class GELU(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(
            torch.sqrt(torch.tensor(2/torch.pi)) * 
            (x + 0.044715 * torch.pow(x,3))
        ))

In [16]:
GELU()(batch)

tensor([[-0.0508,  0.0659, -0.1315, -0.0974, -0.1387],
        [ 0.1220, -0.1610, -0.1700,  0.2031, -0.0496]])
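
The `GELU` class above uses the tanh approximation of GELU. As a sanity check, the same formula can be compared with PyTorch's built-in version. This is a minimal sketch: `gelu_tanh` is a hypothetical helper name, and the `approximate="tanh"` argument assumes a reasonably recent PyTorch release.

```python
import math
import torch
import torch.nn.functional as F

def gelu_tanh(x: torch.Tensor) -> torch.Tensor:
    # Tanh approximation of GELU, same formula as the GELU class above.
    return 0.5 * x * (1.0 + torch.tanh(
        math.sqrt(2.0 / math.pi) * (x + 0.044715 * x.pow(3))
    ))

x = torch.linspace(-3.0, 3.0, steps=7)
approx = gelu_tanh(x)
builtin = F.gelu(x, approximate="tanh")  # PyTorch's tanh approximation
exact = F.gelu(x)                        # erf-based definition

print(torch.allclose(approx, builtin, atol=1e-6))
print((approx - exact).abs().max())      # small approximation error
```

The approximation stays very close to the erf-based definition, so either form could be used in the feed-forward layers.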

## FeedForward

In [None]:
class FeedForwardModule(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]

        # Sequence of layers: linear -> GELU -> linear.
        # The first linear layer expands the input to 4x the embedding dimension,
        # whereas the second one reduces it back to the original embedding dimension.
        self.layers = nn.Sequential(
            nn.Linear(emb_dim, 4 * emb_dim),
            GELU(),
            nn.Linear(4 * emb_dim, emb_dim)
        )

    def forward(self, x):
        return self.layers(x)

In [18]:
FeedForwardModule({"emb_dim": 5})(batch)

tensor([[-0.0692,  0.1527, -0.0808,  0.0560,  0.1473],
        [ 0.0020,  0.1303, -0.2325,  0.0055,  0.2685]],
       grad_fn=<AddmmBackward0>)
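
To make the expand-then-contract pattern visible, the two linear layers can be applied step by step and the intermediate shape inspected. This is a sketch with a toy embedding size; `nn.GELU` stands in for the custom `GELU` class so that the snippet runs on its own.

```python
import torch
import torch.nn as nn

emb_dim = 8  # toy size for illustration; the notebook's config uses 768
expand = nn.Linear(emb_dim, 4 * emb_dim)
contract = nn.Linear(4 * emb_dim, emb_dim)
act = nn.GELU()  # built-in GELU, standing in for the custom class

x = torch.randn(2, 3, emb_dim)  # [batch, seq_len, emb_dim]
hidden = act(expand(x))         # [2, 3, 32]: 4x wider hidden representation
out = contract(hidden)          # [2, 3, 8]: back to the input shape

print(x.shape, hidden.shape, out.shape)
```

Because the output shape equals the input shape, the module can be wrapped in a shortcut connection and stacked any number of times.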

## Transformer Module

In [19]:
# Load attention modules from previous ipynb.
%run coding-attention-mechanisms.ipynb

tensor([[0.2685, 0.7413],
        [0.2738, 0.7564],
        [0.2668, 0.7366],
        [0.2618, 0.7218],
        [0.2712, 0.7495]], grad_fn=<MmBackward0>)
tensor([[-0.4927, -0.0791],
        [-0.4938, -0.0806],
        [-0.4924, -0.0851],
        [-0.4923, -0.0819],
        [-0.4928, -0.0853]], grad_fn=<MmBackward0>)
Self Attention V2 output: 
 tensor([[-0.4927, -0.0791],
        [-0.4938, -0.0806],
        [-0.4924, -0.0851],
        [-0.4923, -0.0819],
        [-0.4928, -0.0853]], grad_fn=<MmBackward0>)
Self Attention V1 output: 
 tensor([[-0.4927, -0.0791],
        [-0.4938, -0.0806],
        [-0.4924, -0.0851],
        [-0.4923, -0.0819],
        [-0.4928, -0.0853]], grad_fn=<MmBackward0>)
batch.shape: torch.Size([2, 5, 3])
contextVecs.shape: torch.Size([2, 5, 2])
batch.shape: torch.Size([2, 5, 3])
contextVecs.shape: torch.Size([2, 5, 4])
batch.shape: torch.Size([2, 5, 3])
contextVecs.shape: torch.Size([2, 5, 2])
batch.shape: torch.Size([2, 1024, 768])
contextVecs.shape: torch.Size(

In [None]:
class TransformerBlock(nn.Module):
    
    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        self.norm1 = LayerNorm(emb_dim)
        self.norm2 = LayerNorm(emb_dim)
        self.dropout = nn.Dropout(cfg["drop_rate"])
        self.ff = FeedForwardModule(cfg)
        self.att = MultiHeadAttention(
            emb_dim, emb_dim, 
            context_length=cfg["context_length"],
            n_heads=cfg["n_heads"],
            dropout=cfg["drop_rate"],
            qkv_bias=cfg["qkv_bias"]
        )

        
    def forward(self, x):
        shortcut = x

        # LayerNorm
        x = self.norm1(x)
        # MultiHeadAttention
        x = self.att(x)
        # Dropout
        x = self.dropout(x)
        # Shortcut
        x = x + shortcut

        shortcut = x
        # LayerNorm
        x = self.norm2(x)
        # Forward
        x = self.ff(x)
        # Dropout
        x = self.dropout(x)
        # Shortcut
        x = x + shortcut

        return x

In [21]:
# Test transformer block
TransformerBlock(cfg)(torch.ones(1,1,768))[:,:,:10]

tensor([[[0.9744, 0.7451, 1.1016, 1.0350, 1.0850, 1.0020, 0.8070, 0.7283,
          1.0585, 1.0846]]], grad_fn=<SliceBackward0>)

## GPT Module

In [None]:
class GPTModel(nn.Module):

    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.trf_blocks = nn.Sequential(*[
            TransformerBlock(cfg) for _ in range(cfg["n_layers"])
        ])
        self.dropout = nn.Dropout(cfg["drop_rate"])
        self.final_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, x):
        # TokEmbed & PosEmbed
        b, seqLen = x.shape
        tokens = self.tok_emb(x)
        pos = self.pos_emb(torch.arange(seqLen, device=x.device))
        x = tokens + pos
        # Dropout
        x = self.dropout(x)
        # Transformer blocks
        x = self.trf_blocks(x)
        # LayerNorm
        x = self.final_norm(x)
        # Output heads
        logits = self.out_head(x)
        
        return logits

In [23]:
# Test the GPT Module
import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")
batch = []
txt1 = "Every effort moves you"
txt2 = "Every day holds a"
batch.append(torch.tensor(tokenizer.encode(txt1)))
batch.append(torch.tensor(tokenizer.encode(txt2)))
batch = torch.stack(batch, dim=0)

model = GPTModel(cfg)

### Generating Text

- **ArgMax**: Choose the token with the highest score (greedy decoding).

- **TopK**: Sample only from the `k` tokens with the highest scores; all other tokens are masked out.

- **Temperature**: Logits are divided by the temperature before the softmax. A temperature > 1 flattens the distribution, making lower-scored tokens more likely to be sampled. A temperature < 1 sharpens it, making the highest-scored tokens more likely to be selected.
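
The three strategies can be illustrated on a toy logits vector. The scores and the values of `k` and the temperatures below are made up for illustration; the sketch only shows how each step reshapes the distribution.

```python
import torch

torch.manual_seed(0)
logits = torch.tensor([[2.0, 1.0, 0.5, -1.0]])  # toy scores over 4 tokens

# ArgMax: pick the highest-scoring token.
greedy = torch.argmax(logits, dim=-1, keepdim=True)

# TopK: keep the k largest logits per row, mask the rest with -inf.
k = 2
top_vals, _ = torch.topk(logits, k)
masked = torch.where(logits < top_vals[:, -1:],
                     torch.full_like(logits, float("-inf")),
                     logits)

# Temperature: divide the logits before the softmax.
probs = {t: torch.softmax(masked / t, dim=-1) for t in (0.5, 1.0, 2.0)}
for t, p in probs.items():
    print(t, p)

# Sample one token id from the top-k distribution at temperature 1.
sample = torch.multinomial(probs[1.0], num_samples=1)
print(greedy, sample)
```

The masked tokens end up with probability 0, and the probability of the top token shrinks as the temperature grows.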

In [None]:
# Define a function that runs the model up to `max_new_tokens` times.
# It appends the predicted token ids to the input `idx` tensor.
# The output is a tensor of token ids; it needs to be decoded with the tokenizer.
def generateText(model: nn.Module, idx: torch.Tensor, max_new_tokens: int, context_size: int,
                 temperature: float = 0.0, top_k=None, eos_id=None):
    for _ in range(max_new_tokens):
        # Keep only the last `context_size` tokens as context.
        idxCond = idx[:, -context_size:]

        with torch.no_grad():
            logits = model(idxCond)
        # Keep only the logits of the last position.
        logits = logits[:, -1, :]

        # Apply top-k filtering if specified.
        if top_k is not None:
            top_logits, _ = torch.topk(logits, top_k)
            # Smallest kept logit per row; shape [batch, 1] so it broadcasts per row.
            min_val = top_logits[:, -1:]
            logits = torch.where(logits < min_val,
                                 torch.tensor(float('-inf')).to(logits.device),
                                 logits)

        if temperature > 0.0:
            # Sample from the temperature-scaled distribution.
            logits = logits / temperature
            probas = torch.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probas, num_samples=1)
        else:
            # Greedy decoding.
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)
        # Stop early once every sequence in the batch produced the end-of-sequence token.
        if eos_id is not None and (idx_next == eos_id).all():
            break
        idx = torch.cat((idx, idx_next), dim=1)
    return idx

In [25]:
# Generate start tensor.
startContext = "Hello, I am"
encoded = tokenizer.encode(startContext)
print("encoded:", encoded)
encodedTensor = torch.tensor(encoded).unsqueeze(0)
print("encoded_tensor.shape:", encodedTensor.shape)

encoded: [15496, 11, 314, 716]
encoded_tensor.shape: torch.Size([1, 4])


In [26]:
# Run GPT and decode outputs.
model.eval()
out = generateText(
    model=model,
    idx=encodedTensor,
    max_new_tokens=6,
    context_size=cfg["context_length"]
)

print("Output:", out)
print("Output length:", len(out[0]))
decodedText = tokenizer.decode(out.squeeze(0).tolist())
print(decodedText)

Output: tensor([[15496,    11,   314,   716, 24111, 43446, 12663, 18650, 28505, 27960]])
Output length: 10
Hello, I amFrench rebirth fundra Oracle Worm Midnight
