# Topic 1

**Positional Encoding**: Unlike recurrent (sequential) models, transformer-based LLMs have no recurrence, so on their own they have no notion of token order. Positional encoding injects the position of each token into its representation so that the model can make use of the sequence order.

Positional encoding is used to give the model this ordering information.

## *Imports*

In [51]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import math
import sentencepiece as spm
from datasets import Dataset

import warnings

In [52]:
# Suppress FutureWarning messages for the rest of the session.
warnings.filterwarnings('ignore', category=FutureWarning)

##### Device Setup

In [53]:
# Prefer CUDA when PyTorch reports it as available; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Device : {device}')

Device : cuda


## *Pos Encoding*

In [54]:
def positonal_encoding(seq_length , d_model):
    """Build the sinusoidal positional-encoding table.

    Even columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        seq_length: Number of positions (rows) to generate.
        d_model: Width of each encoding (columns). May be even or odd.

    Returns:
        A float32 tensor of shape ``(seq_length, d_model)`` created on the
        default device; callers move it to the target device themselves.
    """
    positions = torch.arange(seq_length, dtype=torch.float32).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, d_model, 2).float() * (-math.log(10_000) / d_model)
    )

    # One angle per (position, frequency) pair: (seq_length, ceil(d_model / 2)).
    angles = positions * div_term

    pe = torch.zeros(seq_length, d_model)
    pe[:, 0::2] = torch.sin(angles)
    # With an odd d_model there is one fewer odd column than even column, so
    # drop the surplus frequency instead of failing with a shape mismatch.
    pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

    return pe

# Attention Mechanism 

The core of the transformer is **attention**. We will start with *scaled dot-product attention*.

1. In this mechanism, the token at each position attends to the other tokens in the sequence

2. It weighs how important each token's embedding is to the token being processed

3. Softmax converts these weights to probabilities

4. The dot product helps to see similarity between the tokens

## *Scaled Dot product Attention*

In [55]:
def scaled_dot_product_attention(Q , K , V):
    """Compute scaled dot-product attention.

    Args:
        Q: Query tensor whose last dimension is ``d_k``; the example in this
            file uses shape ``(batch, seq_length, d_k)``.
        K: Key tensor; its last two dimensions are swapped before the product.
        V: Value tensor combined using the attention weights.

    Returns:
        A tuple ``(output, attn_weights)``: the weighted sum of ``V`` and the
        softmax-normalised weights (each row sums to 1 over the last axis).
    """
    # sqrt(d_k) as a float32 scalar tensor.
    scale = torch.sqrt(torch.tensor(Q.size(-1), dtype = torch.float32))

    # Swap the last two axes of K and apply the 1/sqrt(d_k) scaling to the
    # keys up front; this equals scaling the Q.K^T product afterwards.
    scaled_keys_t = K.transpose(-2 , -1) / scale

    # Similarity of every query with every key, normalised over the key axis.
    attn_weights = torch.matmul(Q , scaled_keys_t).softmax(dim = -1)

    return torch.matmul(attn_weights , V) , attn_weights



# Example input (batch size = 1, seq_len = 4, d_k = 8).
# The three tensors are drawn in the order Q, K, V.
Q, K, V = (torch.rand(1, 4, 8) for _ in range(3))

# Run the attention function once and show both of its results.
output, attn_weights = scaled_dot_product_attention(Q, K, V)
print("Output:\n", output)
print("Attention Weights:\n", attn_weights)
    

Output:
 tensor([[[0.5216, 0.7006, 0.6354, 0.5056, 0.5217, 0.5956, 0.4607, 0.4217],
         [0.5254, 0.6957, 0.6442, 0.5142, 0.5234, 0.5924, 0.4686, 0.4219],
         [0.5253, 0.7074, 0.6406, 0.5035, 0.5170, 0.5959, 0.4565, 0.4170],
         [0.5337, 0.6973, 0.6407, 0.5128, 0.5122, 0.5892, 0.4868, 0.4318]]])
Attention Weights:
 tensor([[[0.2235, 0.3092, 0.2360, 0.2313],
         [0.2337, 0.2857, 0.2515, 0.2290],
         [0.2332, 0.3043, 0.2448, 0.2177],
         [0.2521, 0.2797, 0.2352, 0.2331]]])


# *Multi Head Attention*

In this technique, several attention heads run in parallel, each attending over its own slice of the model dimension. This is significantly more effective than a single scaled dot-product attention.

In [56]:
class Multi_Head_Attention(nn.Module):
    """Multi-head scaled dot-product attention.

    The inputs are linearly projected, split into ``num_heads`` heads of
    width ``d_model // num_heads``, attended independently, concatenated
    back to ``d_model`` and passed through an output projection.

    Args:
        d_model: Width of the input and output features.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()

        # Fail loudly: continuing with a non-divisible head count would make
        # the head split in forward() impossible.
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f'd_model ({d_model}) must be divisible by a positive '
                f'num_heads ({num_heads})'
            )

        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Linear projections for queries, keys and values.
        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)

        # Output projection applied after the heads are concatenated.
        self.projection = nn.Linear(d_model, d_model)

    def single_dot_attention(self, Q, K, V, mask = None):
        """Scaled dot-product attention over the last two dimensions.

        Args:
            Q, K, V: Tensors whose last dimension is the per-head width.
            mask: Optional tensor broadcastable to the score matrix. A
                floating-point mask is added to the scores (use ``-inf`` to
                block a position); a boolean mask blocks the positions where
                it is ``True``.

        Returns:
            ``(output, attention_weights)``.
        """
        d_k = Q.size(-1)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)

        if mask is not None:
            if mask.dtype == torch.bool:
                scores = scores.masked_fill(mask, float('-inf'))
            else:
                scores = scores + mask

        attention_weights = torch.softmax(scores, dim = -1)

        output = torch.matmul(attention_weights, V)

        return output, attention_weights

    def _split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, head_dim)."""
        # view() returns a new tensor, so its result must be returned.
        return x.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, query, key, value, mask = None):
        """Apply multi-head attention.

        Args:
            query: Tensor of shape ``(batch, q_len, d_model)``.
            key: Tensor of shape ``(batch, k_len, d_model)``.
            value: Tensor of shape ``(batch, k_len, d_model)``.
            mask: Optional attention mask broadcastable to
                ``(batch, num_heads, q_len, k_len)``; see
                ``single_dot_attention`` for the accepted forms.

        Returns:
            ``(projection_out, attn)`` where ``projection_out`` has shape
            ``(batch, q_len, d_model)`` and ``attn`` has shape
            ``(batch, num_heads, q_len, k_len)``.
        """
        batch_size = query.size(0)

        # Project, then split the feature dimension across the heads.
        Q = self._split_heads(self.W_Q(query), batch_size)
        K = self._split_heads(self.W_K(key), batch_size)
        V = self._split_heads(self.W_V(value), batch_size)

        output, attn = self.single_dot_attention(Q, K, V, mask)

        # Merge the heads back: (batch, heads, q_len, head_dim) -> (batch, q_len, d_model).
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        projection_out = self.projection(output)

        return projection_out, attn
    

# *Encoder Layer*

It brings together all of the following layers:

1. Layer Normalization (optional)

2. Multi-Head Attention (MHA)

3. Layer Normalization

4. Feed-Forward Network

5. Layer Normalization

In [57]:
class Transformer_Encoder(nn.Module):
    """One encoder layer: LayerNorm -> self-attention -> LayerNorm -> FFN -> LayerNorm.

    Args:
        d_model: Feature width used by the norms, attention and FFN input/output.
        num_heads: Number of heads handed to ``Multi_Head_Attention``.
        d_ff: Hidden width of the feed-forward network.
        dropout: Probability used by both dropout layers (default 0.1).
    """

    def __init__(self, d_model, num_heads, d_ff, dropout = 0.1):
        super().__init__()

        # Normalisation applied to the incoming activations.
        self.norm1 = nn.LayerNorm(d_model)

        # Self-attention sub-layer.
        self.MHA = Multi_Head_Attention(d_model, num_heads)

        # Normalisation applied after the attention residual.
        self.norm2 = nn.LayerNorm(d_model)

        # Position-wise feed-forward network.
        # NOTE(review): the 2nd and 3rd Linear layers sit back to back with no
        # activation between them — confirm this is intended.
        ffn_layers = [
            nn.Linear(d_model, d_ff),
            nn.SiLU(),
            nn.Linear(d_ff, d_ff),
            nn.Linear(d_ff, d_ff),
            nn.SiLU(),
            nn.Linear(d_ff, d_model),
        ]
        self.FFN = nn.Sequential(*ffn_layers)

        # Normalisation applied after the feed-forward residual.
        self.norm3 = nn.LayerNorm(d_model)

        # One dropout per residual branch.
        self.drop_1 = nn.Dropout(dropout)
        self.drop_2 = nn.Dropout(dropout)

    def forward(self, x):
        """Run the layer on ``x`` and return a tensor of the same shape.

        ``x`` is normalised first, and it is this normalised tensor (not the
        raw input) that serves as query, key, value and residual for the
        attention sub-layer.
        """
        normed = self.norm1(x)

        attended, _ = self.MHA(normed, normed, normed)
        hidden = self.norm2(normed + self.drop_1(attended))

        transformed = self.FFN(hidden)
        return self.norm3(hidden + self.drop_2(transformed))
        

# *Stack*

Here we stack several encoder layers one after another, so that each layer refines the output of the previous one.

In [58]:
class Transformer_Stack(nn.Module):
    """A stack of ``num_layer`` encoder layers applied one after another.

    Args:
        d_model: Feature width of the inputs and of every layer.
        d_ff: Hidden width of each layer's feed-forward network.
        num_heads: Number of attention heads per layer.
        num_layer: How many ``Transformer_Encoder`` layers to create.
    """

    def __init__(self, d_model, d_ff, num_heads, num_layer):
        super().__init__()

        self.d_model = d_model

        # Transformer_Encoder takes (d_model, num_heads, d_ff) in that order.
        encoders = [
            Transformer_Encoder(d_model, num_heads, d_ff)
            for _ in range(num_layer)
        ]
        self.layer = nn.ModuleList(encoders)

    def forward(self, x):
        """Add positional encodings to ``x`` and run it through every layer.

        Dimension 1 of ``x`` is used as the sequence length; the encoding
        table gets a leading axis of size 1 so it broadcasts over dimension 0.
        """
        positions = positonal_encoding(x.size(1), self.d_model)
        hidden = x + positions.unsqueeze(0).to(x.device)

        for encoder in self.layer:
            hidden = encoder(hidden)

        return hidden
        

### Initializing

In [59]:
# Six encoder layers, model width 512, 8 heads, feed-forward width 2048.
encoder_stack = Transformer_Stack(d_model=512, d_ff=2048, num_heads=8, num_layer=6)

# Smoke test on random activations shaped (batch_size, seq_len, d_model);
# only the output shape is printed.
dummy_input = torch.rand(32, 50, 512)
out = encoder_stack(dummy_input)
print(out.shape)

torch.Size([32, 50, 512])


# *Transformer Decoder*

In [60]:
class Transformer_Decoder(nn.Module):
    """One decoder layer: self-attention, encoder-decoder attention, then FFN.

    Each sub-layer is preceded by its own LayerNorm and followed by dropout
    and a residual connection; a fourth LayerNorm is applied to the result.

    Args:
        d_model: Feature width used by the norms, attentions and FFN input/output.
        d_ff: Hidden width of the feed-forward network.
        num_heads: Number of heads handed to both ``Multi_Head_Attention`` modules.
        dropout: Probability used by all three dropout layers (default 0.1).
    """

    def __init__(self, d_model, d_ff, num_heads, dropout = 0.1):
        super().__init__()

        # Self-attention sub-layer (this is the one that receives the mask).
        self.norm_1 = nn.LayerNorm(d_model)
        self.masked = Multi_Head_Attention(d_model, num_heads)

        # Attention over the encoder output.
        self.norm_2 = nn.LayerNorm(d_model)
        self.enc_dec = Multi_Head_Attention(d_model, num_heads)

        # Feed-forward sub-layer.
        # NOTE(review): the network ends with a SiLU after the last Linear —
        # confirm that an activation on the FFN output is intended.
        self.norm_3 = nn.LayerNorm(d_model)
        ffn_layers = [
            nn.Linear(d_model, d_ff),
            nn.SiLU(),
            nn.Linear(d_ff, d_ff),
            nn.SiLU(),
            nn.Linear(d_ff, d_model),
            nn.SiLU(),
        ]
        self.FFN = nn.Sequential(*ffn_layers)

        # Final normalisation of the layer output.
        self.norm_4 = nn.LayerNorm(d_model)

        # One dropout per residual branch.
        self.drop_1 = nn.Dropout(dropout)
        self.drop_2 = nn.Dropout(dropout)
        self.drop_3 = nn.Dropout(dropout)

    def forward(self, x, enc_out, mask = None):
        """Run the decoder layer.

        Args:
            x: Decoder-side activations.
            enc_out: Encoder output, used as key and value of the second attention.
            mask: Optional mask forwarded to the self-attention module as its
                fourth argument; the encoder-decoder attention gets no mask.

        Returns:
            The normalised layer output.
        """
        # LayerNorm -> self-attention -> dropout -> residual.
        normed = self.norm_1(x)
        self_attended, _ = self.masked(normed, normed, normed , mask)
        hidden = x + self.drop_1(self_attended)

        # LayerNorm -> encoder-decoder attention -> dropout -> residual.
        normed = self.norm_2(hidden)
        cross_attended, _ = self.enc_dec(normed, enc_out, enc_out)
        hidden = hidden + self.drop_2(cross_attended)

        # LayerNorm -> FFN -> dropout -> residual -> final LayerNorm.
        transformed = self.FFN(self.norm_3(hidden))
        return self.norm_4(hidden + self.drop_3(transformed))

# *Masking*

Masking prevents the transformer from seeing future tokens, so it learns to predict each token only from what it has produced so far.
Without it the transformer never really learns to generate, which shows up as large losses and increased training time.

In [61]:
def generate_mask(seq_length):
    """Return a ``(seq_length, seq_length)`` additive look-ahead mask.

    Entries strictly above the main diagonal are ``-inf``; entries on and
    below the diagonal are ``0``.
    """
    # True exactly where the column index is greater than the row index.
    blocked = torch.ones(seq_length, seq_length).triu(diagonal = 1) == 1

    return torch.zeros(seq_length, seq_length).masked_fill(blocked , -torch.inf)

Stacking the Decoding Layers

# *Encoder Decoder Stack*

In [62]:
class Encoder_Decoder_Stack(nn.Module):
    """A stack of ``num_layer`` decoder layers applied one after another.

    Args:
        d_model: Feature width of the inputs and of every layer.
        d_ff: Hidden width of each layer's feed-forward network.
        num_heads: Number of attention heads per layer.
        num_layer: How many ``Transformer_Decoder`` layers to create.
    """

    def __init__(self, d_model, d_ff, num_heads, num_layer):
        super().__init__()

        self.layer = nn.ModuleList([Transformer_Decoder(d_model, d_ff, num_heads)
                                    for _ in range(num_layer)])

        self.d_model = d_model

    def forward(self, x, enc_out):
        """Add positional encodings and run ``x`` through every decoder layer.

        Args:
            x: Decoder-side activations of shape ``(batch, seq_len, d_model)``,
                the same layout ``Transformer_Stack`` uses.
            enc_out: Encoder output handed to every layer.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # The sequence axis is dimension 1; dimension 0 is the batch.
        seq_length = x.size(1)

        # Leading axis of size 1 so the table broadcasts over the batch.
        pe = positonal_encoding(seq_length, self.d_model).unsqueeze(0).to(x.device)

        x = x + pe

        # Look-ahead mask of shape (seq_len, seq_len), shared by all layers.
        mask = generate_mask(seq_length).to(x.device)

        for layer in self.layer:
            x = layer(x, enc_out, mask)

        return x

### *Initialize*

In [63]:
# Six decoder layers, model width 512, 8 heads, feed-forward width 2048.
decoder_stack = Encoder_Decoder_Stack(d_model=512, d_ff=2048, num_heads=8, num_layer=6)

# NOTE(review): the forward pass below runs `encoder_stack`, not the
# `decoder_stack` built above — confirm whether the decoder was meant to be
# exercised here (it would also need an encoder output as second argument).
dummy_input = torch.rand(32, 50, 512)  # (batch_size, seq_len, d_model)
out = encoder_stack(dummy_input)
print(out.shape)

torch.Size([32, 50, 512])


# *Full Transformer*


In [64]:
class Full_Transformer(nn.Module):
    """Encoder-decoder transformer that maps source ids to target-vocabulary logits.

    Args:
        src_vocab_size: Number of rows in the source embedding table.
        tgt_vocab_size: Number of rows in the target embedding table and
            width of the output logits.
        d_ff: Hidden width of the feed-forward networks.
        d_model: Embedding / model width.
        num_heads: Number of attention heads per layer.
        num_layers: Number of layers in both the encoder and decoder stacks.
        dropout: Probability of the dropout applied to the embeddings (default 0.1).
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_ff, d_model, num_heads, num_layers, dropout = 0.1):
        super().__init__()

        self.d_model = d_model

        # Token embeddings for the source and target vocabularies.
        self.src_embedd = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedd = nn.Embedding(tgt_vocab_size, d_model)

        # Encoder and decoder stacks.
        self.encoder = Transformer_Stack(d_model, d_ff, num_heads, num_layers)
        self.decoder = Encoder_Decoder_Stack(d_model, d_ff, num_heads, num_layers)

        # Final projection from model width to target-vocabulary logits.
        self.projection = nn.Linear(d_model, tgt_vocab_size)

        # Dropout applied to the scaled embeddings.
        self.drop = nn.Dropout(dropout)

    def forward(self, src, tgt):
        """Compute target-vocabulary logits.

        Args:
            src: Integer token ids for the source side, assumed to be laid
                out as ``(batch, src_len)`` — the batch-first layout the
                encoder stack expects after embedding.
            tgt: Integer token ids for the target side, assumed ``(batch, tgt_len)``.

        Returns:
            Logits whose last dimension is ``tgt_vocab_size``.
        """
        # Scale the embeddings by sqrt(d_model).
        src = self.src_embedd(src) * math.sqrt(self.d_model)
        tgt = self.tgt_embedd(tgt) * math.sqrt(self.d_model)

        # Positional encodings are NOT added here: both the encoder and the
        # decoder stack add them at the start of their own forward pass, so
        # adding them in this method as well would apply them twice.
        src = self.drop(src)
        tgt = self.drop(tgt)

        # Encoder pass.
        enc_out = self.encoder(src)

        # Decoder pass, attending over the encoder output.
        dec_out = self.decoder(tgt, enc_out)

        # Project to vocabulary logits.
        out = self.projection(dec_out)

        return out

# *Tokenizer*

Transformers work on numeric ids rather than words, which is why a tokenizer is needed.
The **tokenizer** used for this transformer is **SentencePiece** rather than one built from scratch, because it was not possible for me to train a hand-built tokenizer on such a large dataset; I would eventually run out of **memory and computation**.

In [65]:
# Train a SentencePiece model on the corpus file; the outputs are written
# using the 'HN_Tokenizer' prefix.
spm.SentencePieceTrainer.train(
    # Raw string: the Windows path contains backslashes ('\L', '\H') that are
    # invalid escape sequences in a normal string literal. The resulting
    # path text is unchanged.
    input = r'C:\LLM & Agents\HN.json',
    model_prefix = 'HN_Tokenizer',
    vocab_size = 800
)


In [66]:
# Create an empty SentencePiece processor; the model is attached below.
sp = spm.SentencePieceProcessor()

# Load the tokenizer model file (presumably the one produced by the training
# cell with model_prefix 'HN_Tokenizer' — confirm the file is in the working directory).
sp.Load('HN_Tokenizer.model')

# Round-trip a sample sentence: text -> ids -> text.
# `int` is passed positionally as the second argument — presumably the
# output type, requesting integer ids; confirm against the sentencepiece docs.
tokens = sp.Encode('What is the first law and most important principle' , int)
print(tokens)

# Turn the ids back into text to check the round trip.
decode = sp.decode(tokens)
print(decode)

[4, 235, 505, 58, 90, 28, 142, 114, 219, 4, 18, 4, 15, 288, 219, 4, 743, 89, 744, 143, 12, 4, 59, 26, 78, 54]
What is the first law and most important principle


In [67]:
# Source and target use the same vocabulary size.
tgt_vocab_size = 800
src_vocab_size = tgt_vocab_size

# Build the full model, then move its parameters to the selected device.
Champion = Full_Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_ff=2048,
    d_model=512,
    num_heads=8,
    num_layers=6,
)
Champion = Champion.to(device)
