# Transformer Module

##### Parameters

*   **Block size**: Max seq length
*   **Embedding size**: size of the vector used to represent each token.
*   **Number of heads**: number of heads in the multi-head attention
*   **Head size:** the size of each head
*   **Number of blocks** (layers): how many times we duplicate the entire block (look up the transformer architecture)

In [1]:
import sys
# Add the parent directory to the module search path.
# NOTE(review): presumably this is what lets `minbpe` be imported from this notebook's folder -- confirm.
sys.path.append('..')

## Load Tokenizer

In [2]:
from minbpe import BasicTokenizer

# Create an empty tokenizer and populate it from the saved model file
# (path is relative to the directory this notebook is run from).
tokenizer = BasicTokenizer()
tokenizer.load(model_file="../output/tokenizer/my_tokenizer.model")

## Helper Function to get the vocab size

In [3]:
def get_vocab_size(tokenizer: BasicTokenizer) -> int:
    """Return the vocabulary size of ``tokenizer``, special tokens included.

    The result is ``len(tokenizer.vocab) + len(tokenizer.special_tokens)``.

    NOTE(review): this assumes the special tokens are not already stored in
    ``tokenizer.vocab``; if they are, they would be counted twice -- confirm
    against the tokenizer implementation.
    """
    regular_count = len(tokenizer.vocab)
    special_count = len(tokenizer.special_tokens)
    return regular_count + special_count

## Creating the Model

Based on Andrej Karpathy's implementation of GPT-2

In [4]:
import torch
torch.manual_seed(6464)  # fixed seed so that random weight initialization and sampling are reproducible

<torch._C.Generator at 0x2777f47c5d0>

##### Hyperparameters


In [5]:
block_size = 256  # maximum sequence (context) length, in tokens
n_embed = 384  # embedding dimension of each token vector
n_head = 6  # number of attention heads per block (Block uses head_size = n_embed // n_head)
n_layer = 6  # number of transformer blocks stacked on top of each other
dropout = 0.2  # dropout probability handed to every nn.Dropout in the model

vocab_size = get_vocab_size(tokenizer)  # regular vocabulary entries + special tokens

# use the GPU when one is available, otherwise fall back to the CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [6]:
print(device)

cuda


In [7]:
# print(torch.version.cuda)
# print(torch.cuda.is_available())

##### HEAD class

In [8]:
from typing import Optional, Tuple
import torch
import torch.nn as nn
from torch.nn import functional as F

class Head(nn.Module):
    """A single causal self-attention head.

    Reads the module-level hyperparameters ``n_embed``, ``block_size`` and
    ``dropout`` when it is constructed.
    """

    def __init__(self, head_size: int) -> None:
        super().__init__()

        # Learned, bias-free projections from the embedding space (n_embed)
        # down to the per-head dimension (head_size).
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)

        # Causal attention mask: ones on and below the main diagonal, zeros above.
        # register_buffer keeps it as part of the module's state (it moves with
        # .to(device)) without turning it into a learnable parameter.
        lower_triangle = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer('tril', lower_triangle)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply masked scaled dot-product attention.

        Args:
            x: tensor of shape (B, T, C) = (batch, time-steps, n_embed).

        Returns:
            Tensor of shape (B, T, head_size).
        """
        _batch, seq_len, _channels = x.shape

        queries = self.query(x)  # (B, T, hs)
        keys = self.key(x)  # (B, T, hs)
        values = self.value(x)  # (B, T, hs)

        # Scaled attention scores: (B, T, hs) @ (B, hs, T) -> (B, T, T)
        scale = keys.shape[-1] ** (-0.50)
        scores = torch.matmul(queries, keys.transpose(-2, -1)) * scale

        # Crop the mask to the current sequence length; positions above the
        # diagonal (the future) get -inf so softmax gives them zero weight.
        is_future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(is_future, float('-inf'))

        attention = F.softmax(scores, dim=-1)  # (B, T, T)
        attention = self.dropout(attention)

        # Weighted sum of the values: (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return torch.matmul(attention, values)

##### Multi Head Attention class

In [9]:
class MultiHeadAttention(nn.Module):
    """Several self-attention heads evaluated on the same input, then merged.

    Reads the module-level ``n_embed`` and ``dropout`` when it is constructed.
    """

    def __init__(self, num_heads: int, head_size: int) -> None:
        super().__init__()

        # One independent Head per requested attention head.
        attention_heads = [Head(head_size) for _ in range(num_heads)]
        self.heads = nn.ModuleList(attention_heads)

        # Maps the concatenated head outputs back to the embedding dimension.
        concat_dim = head_size * num_heads
        self.projection = nn.Linear(concat_dim, n_embed)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run every head on ``x``, concatenate on the last axis, project, apply dropout."""
        per_head = [head(x) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)  # (..., head_size * num_heads)
        projected = self.projection(merged)  # back to n_embed
        return self.dropout(projected)

##### Feed Forward class

In [10]:
class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> ReLU -> Linear -> Dropout.

    Reads the module-level ``dropout`` probability when it is constructed.
    """

    def __init__(self, n_embed: int) -> None:
        super().__init__()

        # The hidden layer is 4x wider than the embedding so the model has more
        # room to learn complex patterns before projecting back down.
        hidden_dim = 4 * n_embed
        layers = [
            nn.Linear(n_embed, hidden_dim),
            nn.ReLU(),  # non-linearity between the two linear maps
            nn.Linear(hidden_dim, n_embed),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the MLP to ``x``; the last dimension stays ``n_embed``."""
        return self.net(x)

##### Block class

In [11]:
class Block(nn.Module):
    """One transformer block: self-attention and a feed-forward MLP, each wrapped
    in a residual connection with LayerNorm applied to its input.

    NOTE(review): MultiHeadAttention and Head read the module-level ``n_embed``
    rather than the ``n_embed`` passed here, so the two are expected to match.
    """

    def __init__(self, n_embed: int, n_head:int) -> None:
        super().__init__()

        # Split the embedding dimension evenly across the heads (integer division).
        per_head_dim = n_embed // n_head

        self.self_attention = MultiHeadAttention(n_head, per_head_dim)
        self.feed_forward = FeedForward(n_embed)
        self.layer_norm_1 = nn.LayerNorm(n_embed)  # normalizes the attention input
        self.layer_norm_2 = nn.LayerNorm(n_embed)  # normalizes the feed-forward input

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` after the attention and feed-forward residual updates."""
        # Normalization is applied before each sub-layer; the sub-layer output
        # is then added back onto its un-normalized input.
        attended = self.self_attention(self.layer_norm_1(x))
        x = x + attended
        transformed = self.feed_forward(self.layer_norm_2(x))
        return x + transformed


##### Final Assembly

In [12]:
class GPTLanguageModel(nn.Module):
    """GPT-style language model.

    Token and learned position embeddings are summed, passed through a stack of
    ``Block`` modules and a final LayerNorm, then projected to vocabulary logits.
    Sizes come from the module-level hyperparameters ``vocab_size``, ``n_embed``,
    ``block_size``, ``n_head`` and ``n_layer`` at construction time.
    """

    def __init__(self) -> None:
        super().__init__()

        # Lookup tables that turn token ids and position indices into vectors.
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.position_embedding_table = nn.Embedding(block_size, n_embed)

        # Sequential stack of n_layer Block modules. The "*" unpacks the list so
        # each Block is passed to nn.Sequential as a separate argument.
        self.blocks = nn.Sequential(
            *[Block(n_embed, n_head = n_head) for _ in range(n_layer)]
        )

        self.final_layer_norm = nn.LayerNorm(n_embed)

        # Projects from the model's internal dimension (n_embed) to one raw score
        # (logit) per vocabulary entry; the highest score marks the most likely
        # next token.
        self.final_linear_layer = nn.Linear(n_embed, vocab_size)

        self.apply(self._init_weights)

    def _init_weights(self, module: nn.Module) -> None:
        """Initialize ``module`` in place (called for every submodule via ``apply``).

        Linear and Embedding weights are drawn from N(0, 0.02); Linear biases are
        zeroed. Other module types keep their default initialization.
        """
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, input_tokens: torch.Tensor, targets: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Forward pass of the model.

        Args:
            input_tokens: integer token ids of shape (batch_size, sequence_length).
                The sequence length must not exceed the size of the position
                embedding table.
            targets: optional integer token ids with the same shape as
                ``input_tokens``, holding the correct next token for each position.

        Returns:
            Tuple ``(logits, loss)``.
            Without targets: logits has shape (batch_size, sequence_length,
            vocab_size) and loss is None.
            With targets: logits is flattened to (batch_size * sequence_length,
            vocab_size) and loss is the cross-entropy between the logits and the
            target ids (used in the backward pass to update the weights).

        Raises:
            ValueError: if the sequence is longer than the position embedding table.
        """
        B, T = input_tokens.shape

        max_positions = self.position_embedding_table.num_embeddings
        if T > max_positions:
            # Fail early with a readable message instead of an out-of-range
            # embedding lookup.
            raise ValueError(
                f'sequence length {T} exceeds the maximum context length {max_positions}'
            )

        # C -> channels (n_embed), T -> time-steps (sequence length)
        token_embedding = self.token_embedding_table(input_tokens)  # (B, T, C)

        # Build the position indices on the same device as the input so the model
        # works wherever it and its inputs live, independent of any global setting.
        positions = torch.arange(T, device=input_tokens.device)
        positional_embedding = self.position_embedding_table(positions)  # (T, C)

        # Attach position information to the hidden states (RoPE is a newer alternative).
        x = token_embedding + positional_embedding  # (B, T, C), broadcast over the batch

        x = self.blocks(x)  # (B, T, C)
        x = self.final_layer_norm(x)  # (B, T, C)
        logits = self.final_linear_layer(x)  # (B, T, vocab_size)

        if targets is None:
            return logits, None

        B, T, C = logits.shape

        # cross_entropy expects (N, classes) logits and (N,) class ids, so flatten
        # the batch and time dimensions. reshape also accepts non-contiguous targets.
        logits = logits.view(B * T, C)
        loss = F.cross_entropy(logits, targets.reshape(B * T))

        return logits, loss

    def generate(self, input_tokens: torch.Tensor, max_new_tokens: int) -> torch.Tensor:
        """Sample new tokens autoregressively, given a context.

        Runs without gradient tracking. The module's train/eval mode is not
        changed here, so dropout is active unless the caller switched to eval mode.

        Args:
            input_tokens: starting token ids of shape (batch_size, sequence_length).
            max_new_tokens: number of new tokens to generate.

        Returns:
            Tensor of shape (batch_size, sequence_length + max_new_tokens): the
            input context followed by the sampled tokens.
        """
        context_window = self.position_embedding_table.num_embeddings

        # Sampling never needs gradients; skipping autograd avoids building a
        # graph for every generated token.
        with torch.no_grad():
            for _ in range(max_new_tokens):
                # The model has a fixed context window, so on each step feed it
                # only the most recent tokens.
                cropped_input = input_tokens[:, -context_window:]

                logits, _ = self(cropped_input)  # (B, T, vocab_size)

                # Keep only the scores for the last position: all batch items,
                # last time-step, all vocabulary entries.
                logits = logits[:, -1, :]  # (B, vocab_size)

                # Convert the raw scores into a probability distribution.
                probs = F.softmax(logits, dim=-1)  # (B, vocab_size)

                # Sample the next token from that distribution.
                idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)

                # Append the sampled token to the running sequence; this becomes
                # the input for the next iteration.
                input_tokens = torch.cat((input_tokens, idx_next), dim=1)  # (B, T + 1)

        return input_tokens

###### IMPROVEMENTS FOR ABOVE CELL

1. ***The Modern Standard: Variance-Aware Initialization*** 

Most modern deep learning frameworks, including PyTorch, now use more advanced default methods. The most common are:

* **Kaiming (He) Initialization**: The standard for layers that are followed by a ReLU activation.

* **Xavier (Glorot) Initialization**: The standard for layers followed by activations like tanh or sigmoid.
  
  
2. ***Use RoPE Embeddings***

## Parameters & Dummy input

In [13]:
# Build the model from the hyperparameters defined earlier and move it to the selected device.
model = GPTLanguageModel()
model = model.to(device)

# Total number of parameters, reported in millions.
print(sum(param.numel() for param in model.parameters())/1e6, 'M Parameters')

11.533321 M Parameters


In [14]:
# Smoke test: push one random sequence through the freshly created model.
batch_size = 1
seq_length = 6

# random token ids in [0, vocab_size), shape (batch_size, seq_length)
x = torch.randint(0, vocab_size, (batch_size, seq_length))
x = x.to(device)

# no targets are passed, so the returned loss is None
logits, loss = model(x)
print(logits.shape, loss)

torch.Size([1, 6, 1033]) None


In [15]:
logits

tensor([[[ 0.5514, -0.1546,  0.3715,  ..., -0.4380,  0.5407, -0.2988],
         [-0.2836, -0.7895,  0.4703,  ..., -0.2010,  0.2372, -0.1207],
         [-0.0333,  0.1205,  0.1493,  ..., -0.0658,  0.2216, -0.0035],
         [ 0.4835,  0.1945, -0.1923,  ...,  0.1604, -0.0217, -0.1989],
         [ 0.2329, -0.4434,  0.1450,  ...,  0.2638, -0.2986,  0.1897],
         [ 0.7445, -0.2712,  0.1245,  ...,  0.1726, -0.3442,  0.2944]]],
       device='cuda:0', grad_fn=<ViewBackward0>)

## Model Summary

In [16]:
def print_model_structure(model: torch.nn.Module, indent: str = '') -> None:
    '''
        Print the model's structure as an indented tree.

        One line is printed per submodule, showing its name, its class name and
        the number of parameters it holds (including those of its own children).
        The function then recurses into that submodule with a deeper indent.

        Args:
            model: module whose children are printed
            indent: prefix for every printed line; grows by '|  ' per nesting level
    '''

    for child_name, submodule in model.named_children():
        param_count = sum(weight.numel() for weight in submodule.parameters())
        kind = submodule.__class__.__name__
        print(f'{indent}|- {child_name}: {kind} ({param_count:,} parameters)')

        deeper_indent = indent + '|  '
        print_model_structure(submodule, deeper_indent)

# Print a heading, then the full module tree of the model.
print('     Model Structure\n\n')
print_model_structure(model)

     Model Structure


|- token_embedding_table: Embedding (396,672 parameters)
|- position_embedding_table: Embedding (98,304 parameters)
|- blocks: Sequential (10,639,872 parameters)
|  |- 0: Block (1,773,312 parameters)
|  |  |- self_attention: MultiHeadAttention (590,208 parameters)
|  |  |  |- heads: ModuleList (442,368 parameters)
|  |  |  |  |- 0: Head (73,728 parameters)
|  |  |  |  |  |- key: Linear (24,576 parameters)
|  |  |  |  |  |- query: Linear (24,576 parameters)
|  |  |  |  |  |- value: Linear (24,576 parameters)
|  |  |  |  |  |- dropout: Dropout (0 parameters)
|  |  |  |  |- 1: Head (73,728 parameters)
|  |  |  |  |  |- key: Linear (24,576 parameters)
|  |  |  |  |  |- query: Linear (24,576 parameters)
|  |  |  |  |  |- value: Linear (24,576 parameters)
|  |  |  |  |  |- dropout: Dropout (0 parameters)
|  |  |  |  |- 2: Head (73,728 parameters)
|  |  |  |  |  |- key: Linear (24,576 parameters)
|  |  |  |  |  |- query: Linear (24,576 parameters)
|  |  |  |  |  |- valu

In [17]:
import pandas as pd

def get_model_stats(model: torch.nn.Module) -> pd.DataFrame:
    '''
        Build a DataFrame with per-layer parameter statistics.

        Only leaf modules (modules without children) are reported; containers
        are skipped. Each row holds the layer's qualified name, its class name,
        its total parameter count and the count of parameters that have
        requires_grad set.
    '''

    rows = []
    for layer_name, layer in model.named_modules():
        has_children = len(list(layer.children())) != 0
        if has_children:
            continue

        weights = list(layer.parameters())
        total = sum(weight.numel() for weight in weights)
        trainable = sum(weight.numel() for weight in weights if weight.requires_grad)

        rows.append({
            'Layer Name': layer_name,
            'Type': layer.__class__.__name__,
            'Total Parameters': total,
            'Trainable Parameters': trainable,
        })

    return pd.DataFrame(rows)

# Collect the per-layer statistics for the model and display them.
stats_df = get_model_stats(model)

stats_df

Unnamed: 0,Layer Name,Type,Total Parameters,Trainable Parameters
0,token_embedding_table,Embedding,396672,396672
1,position_embedding_table,Embedding,98304,98304
2,blocks.0.self_attention.heads.0.key,Linear,24576,24576
3,blocks.0.self_attention.heads.0.query,Linear,24576,24576
4,blocks.0.self_attention.heads.0.value,Linear,24576,24576
...,...,...,...,...
191,blocks.5.feed_forward.net.3,Dropout,0,0
192,blocks.5.layer_norm_1,LayerNorm,768,768
193,blocks.5.layer_norm_2,LayerNorm,768,768
194,final_layer_norm,LayerNorm,768,768


In [18]:
# Keep only the layers where the total and trainable counts differ,
# i.e. layers with at least one parameter that has requires_grad=False.
partially_trainable_params = stats_df[stats_df['Total Parameters'] != stats_df['Trainable Parameters']]
partially_trainable_params

Unnamed: 0,Layer Name,Type,Total Parameters,Trainable Parameters


All the parameters are trainable because we first need to train the base model. This will be different in the fine-tuning stage.