Name: Aditya Singh | [LinkedIn](https://www.linkedin.com/in/adityasingh2022/) | [GitHub](https://github.com/adityasinghcoding)

# __Transformer from Scratch__

A `Transformer` (transform/translate) models the meaning of data by weighing how strongly each element of a sequence relates to the others, then transforming each element's representation according to those relations. 
<br>___Note:___ A Transformer typically acts as a building block inside a larger model.

---
### __Components of Transformer__
1. ___Encoder Layer___
   - Input Embeddings
   - Positional Encoding
   - Self-Attention Layer
   - Feed Forward Network
2. ___Decoder Layer___
   - Masked Self-Attention
   - Encoder-Decoder Attention
   - Feed-Forward Network

---
**Attention**: It finds, measures, and evaluates the relation of one word (or any piece of data) to the other data present in the same batch/sequence/matrix. 
Two forms of attention are covered here:
- **Self-Attention**: Evaluates the relation of each word/data item to every other item in the same sequence. 
- **Multi-Head Attention**: Several self-attention operations run in parallel to capture more complex relations between entities (data). Each "head" is one attention mechanism.
--- 
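As a rough illustration of the difference between a single self-attention and multi-head attention, the sketch below splits each token's features into heads, runs scaled dot-product attention per head, and concatenates the results. The function name `multi_head_self_attention`, the sizes, and the use of the raw input as Q, K, and V (no learned projections) are simplifying assumptions, not part of the notebook's implementation.

```python
import numpy as np

def softmax(x, axis=-1):
    # Subtract the row maximum for numerical stability
    e = np.exp(x - x.max(axis=axis, keepdims=True))
    return e / e.sum(axis=axis, keepdims=True)

def multi_head_self_attention(x, num_heads):
    # x: (seq_len, embed_dim); Q = K = V = x here, an illustrative simplification
    seq_len, embed_dim = x.shape
    assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"
    head_dim = embed_dim // num_heads
    # Split features into heads: (num_heads, seq_len, head_dim)
    heads = x.reshape(seq_len, num_heads, head_dim).transpose(1, 0, 2)
    # Scaled dot-product scores per head: (num_heads, seq_len, seq_len)
    scores = heads @ heads.transpose(0, 2, 1) / np.sqrt(head_dim)
    weights = softmax(scores, axis=-1)
    # Weighted sum of values per head, then concatenate the heads again
    out = weights @ heads
    return out.transpose(1, 0, 2).reshape(seq_len, embed_dim), weights

rng = np.random.default_rng(0)
tokens = rng.normal(size=(4, 8))   # 4 tokens, 8 features (hypothetical sizes)
output, weights = multi_head_self_attention(tokens, num_heads=2)
print(output.shape)   # (4, 8)
print(weights.shape)  # (2, 4, 4): one attention matrix per head
```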
### __Architecture of Transformers__
#### __Encoder Layer__
- **Input** (Token embeddings are the raw input to the Transformer.)
   Each token's embedding is projected into 3 vectors: Q, K, V. Each token has its own Q, K, V.
- **Positional Encoding** (Information about token order, encoded with sine/cosine functions)
- **Self-Attention Layer** (Captures and evaluates relations; in technical terms, computes attention weights)
   - $\mathrm{Attention}(Q,K,V) = \mathrm{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$ <br>_Softmax normalizes the scores into a probability distribution (each row sums to 1)._
      - _Q, K, V:_ Query, Key, Value
      - ${QK^T}$: Computes the scores (the interaction of each query with every key). <br>_Represents the strength of attention._
      - _V_: Value vectors, a dynamic dictionary. <br>_It holds the content that is combined with the Q-K attention weights through multiplication to produce the final output. V carries the contextual content of the language._
   - $\sqrt{d_k}$: Scales down large dot products, which would otherwise saturate the softmax and cause gradient issues. <br> ${d_k}$ is the dimension of the keys.<br>___"d"___ stands for dimensionality.
- __Feed-Forward Network__ (Simple Neural Network)

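To make the role of $\sqrt{d_k}$ more concrete, here is a minimal sketch comparing softmax weights with and without the scaling. The key dimension of 512, the random vectors, and the helper name `softmax` are assumptions chosen for illustration.

```python
import numpy as np

def softmax(x):
    e = np.exp(x - x.max())
    return e / e.sum()

rng = np.random.default_rng(0)
d_k = 512                              # hypothetical key dimension
q = rng.normal(size=d_k)               # one query vector
K = rng.normal(size=(6, d_k))          # six key vectors

raw_scores = K @ q                     # spread of these scores grows like sqrt(d_k)
scaled_scores = raw_scores / np.sqrt(d_k)

unscaled_weights = softmax(raw_scores)
scaled_weights = softmax(scaled_scores)
print("max weight without scaling:", unscaled_weights.max())
print("max weight with scaling:   ", scaled_weights.max())
```

Without the scaling, one key tends to take almost all of the weight, which leaves very small gradients for the others. Dividing by $\sqrt{d_k}$ keeps the distribution softer.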
#### __Decoder Layer__
- __Masked-Self Attention__ (Hiding future tokens during training)
- __Encoder-Decoder Attention__ (___Q___ from Decoder, ___K & V___ from Encoder. Encoder output used to focus on relevant input parts)
---
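The masking step in the decoder can be sketched as follows: scores for future positions are replaced with a very large negative number before the softmax, so their weights become zero. The sizes and random inputs are hypothetical, and the value `-1e20` is used as a stand-in for negative infinity.

```python
import numpy as np

def softmax(x, axis=-1):
    e = np.exp(x - x.max(axis=axis, keepdims=True))
    return e / e.sum(axis=axis, keepdims=True)

seq_len, d_k = 4, 8                       # hypothetical sizes
rng = np.random.default_rng(0)
Q = rng.normal(size=(seq_len, d_k))
K = rng.normal(size=(seq_len, d_k))

scores = Q @ K.T / np.sqrt(d_k)
# True above the diagonal marks "future" positions that must be hidden
future = np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)
masked_scores = np.where(future, -1e20, scores)
weights = softmax(masked_scores)

print(np.round(weights, 2))  # upper triangle is 0: token i only attends to tokens 0..i
```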
### How __Q, K, V__ Work in Transformers:
   1. __Input Tokens__ → Projected into __Keys (K)__ and __Values (V)__.
      - Think: Each token writes its __data (V)__ and a __label (K)__ into a dictionary.
   2. __Queries (Q)__ "search" this dictionary by comparing __Q__ to all __Ks__.
   3. The best-matching __Ks__ (highest attention scores) retrieve their corresponding __Vs__.
   4. The output is a __weighted blend__ of the retrieved __Vs__.

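The four steps above can be sketched as a "soft" dictionary lookup. The keys, values, and query below are hand-picked toy numbers (assumptions for illustration), chosen so that the query mostly matches the second key.

```python
import numpy as np

def softmax(x):
    e = np.exp(x - x.max())
    return e / e.sum()

# Step 1: three tokens write a label (K) and data (V) into the "dictionary"
K = np.array([[1.0, 0.0, 0.0],
              [0.0, 1.0, 0.0],
              [0.0, 0.0, 1.0]])
V = np.array([[10.0, 0.0],
              [0.0, 20.0],
              [5.0, 5.0]])

# Step 2: a query that mostly matches the second key
q = np.array([0.0, 8.0, 0.0])

scores = K @ q / np.sqrt(K.shape[-1])   # similarity of the query to every key
weights = softmax(scores)               # Step 3: soft match, weights sum to 1
output = weights @ V                    # Step 4: weighted blend of the values

print(np.round(weights, 3))
print(np.round(output, 3))
```

Unlike a regular dictionary, the lookup never returns exactly one entry: every value contributes in proportion to how well its key matches the query.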
In [1]:
import numpy as np
import random

# Implementation method 1

In [9]:
Embedding = np.array([[1,2,3,4,5]]) # Vectorized input, (1,5)

# Hyperparameters Initialization
embedding_dim = Embedding.shape[-1] # Feature dimension (5); len(Embedding) would return the number of tokens (1)
qk_dim = embedding_dim
v_dim = embedding_dim

'''
-------------------------------------------------------------------------------------
'''
def SoftMax(input, axis= -1):
   exp_input = np.exp(input - np.max(input, axis=axis, keepdims= True)) # Subtracting the maximum value keeps the exponentials from overflowing
   return exp_input / exp_input.sum(axis = axis, keepdims = True) # keepdims = True keeps the reduced axis so the division broadcasts correctly

def SelfAttention(Q, K, V):
   QK = np.dot(Q, K.T) # scores = Query x Transposed_Keys
   scaled_QK = QK / np.sqrt(K.shape[-1]) # scaled_scores = scores / sqrt(d_k), where d_k is the key dimension
   attention_weights = SoftMax(scaled_QK, axis = -1) # Softmax over the last axis turns each row of scores into probabilities that sum to 1
   attention = np.dot(attention_weights, V) # attention = attention_weights x Values

   # "attention" returned for further processing in model in the next layer.
   # "attention_weights" returned for understanding how the model is making decisions.
   return attention, attention_weights
'''
-------------------------------------------------------------------------------------
'''
# # Transformer Initialization without class 
# # Initializing the weights of Q, K, V
# WQ = np.random.randn(embedding_dim, qk_dim) # The 2 arguments are the matrix dimensions, e.g. (5 x 5) // Note: About 99.7% of the values fall between -3 and +3
# WK = np.random.randn(embedding_dim, qk_dim) 
# WV = np.random.randn(embedding_dim, v_dim)

# # Computing Q, K, V from input embedding. In simple words multiplying weights(WQ, WK, WV) with inputs(Embeddings).
# Q = np.dot(Embedding, WQ)
# K = np.dot(Embedding, WK)
# V = np.dot(Embedding, WV)

'''
-------------------------------------------------------------------------------------
'''
# Transformer Initialization with class
class TransformerLayer:
   def __init__(self, dim):
      # Layer specific parameters
      self.WQ = np.random.randn(dim, dim) * np.sqrt(2./dim)
      self.WK = np.random.randn(dim, dim) * np.sqrt(2./dim)
      self.WV = np.random.randn(dim, dim) * np.sqrt(2./dim)

      # Layer normalization, applied after the residual connection in __call__ to stabilize the activations
      self.normalize = Layer_Normalization(dim)

   def __call__(self, input):
      residual = input
      Q = np.dot(input, self.WQ)
      K = np.dot(input, self.WK)
      V = np.dot(input, self.WV)

      # Calculating initial attention
      attention, _ = SelfAttention(Q, K, V)
      
      # Adding residual connection (original input) + applying layer normalization
      return self.normalize.forward(attention + residual)
       

'''
-------------------------------------------------------------------------------------
'''
# Feed Forward Neural Network without Class implementation
# He Initialization. It preserves the variance across layers & is used for ReLU networks to maintain stable gradients
def initializing_ffn(input_features, neurons, output_predicted):
   '''
   Variance = 2 / input_dim
   Standard deviation = sqrt(variance)

   This keeps the scale of the activations roughly constant from layer to layer, which mitigates gradient issues in training.
   '''
   w1 = np.random.randn(input_features, neurons) * np.sqrt(2./input_features) # sqrt(2./input_dim) scales the random values.
   b1 = np.zeros(neurons)
   w2 = np.random.randn(neurons, output_predicted) * np.sqrt(2./neurons)
   b2 = np.zeros(output_predicted)
   return w1, b1, w2, b2

'''
-------------------------------------------------------------------------------------
'''

# Feed Forward Neural Network with Class implementation
class FeedForwardNetwork:
   def __init__(self, input_features, neurons, output_predicted):
      # HE Initialization
      self.w1 = np.random.randn(input_features, neurons) * np.sqrt(2./input_features)
      self.b1 = np.zeros(neurons)

      self.w2 = np.random.randn(neurons, output_predicted) * np.sqrt(2./neurons)
      self.b2 = np.zeros(output_predicted)

   # Forward pass through the two linear layers
   def forward_pass(self, input):
      # First linear layer followed by the ReLU activation function.
      self.neurons = np.maximum(0,np.dot(input,self.w1)+ self.b1)

      # 2nd(final) linear layer output without any activation function.
      output = np.dot(self.neurons,self.w2)+ self.b2
      return output
'''
-------------------------------------------------------------------------------------
'''
# Positional Encoding (Sinusoidal)
# Helping Transformer to understand word order, position information of embeddings.
def positional_encoding(sequence_length, embedding_dim):
   # Creating position indices (0, 1, 2,... seq_len-1)
   position = np.arange(sequence_length)[:, np.newaxis] # Convert to column vector
   
   # Calculating no. of (sine, cosine) frequency pairs needed
   num_pairs = embedding_dim // 2

   # Calculating the division term for the sinusoidal functions: 10000^(-2i / embedding_dim)
   i = np.arange(num_pairs) # Pair index; for an odd embedding_dim the last column has no pair and stays 0
   div_term = np.exp(i * (-np.log(10000.0) * 2 / embedding_dim))

   # Initializing empty positional encoding matrix
   pe = np.zeros((sequence_length, embedding_dim))

   # Calculating the column indices explicitly
   even_indices = 2 * i
   odd_indices = 2 * i + 1

   # Filling even indices with sine values
   pe[:, even_indices] = np.sin(position * div_term) # Columns 0, 2, 4, ...

   # Filling odd indices with cosine values
   pe[:, odd_indices] = np.cos(position * div_term) # Columns 1, 3, 5, ...

   return pe


'''
-------------------------------------------------------------------------------------
'''

# LAYER NORMALIZATION
# Stabilizing the network by normalizing the values in each layer
class Layer_Normalization:
   def __init__(self, dim):
      # Learnable parameters - network will adjust these during training
      self.gamma = np.ones(dim) # Scaling factor (initially no scaling)
      self.beta = np.zeros(dim) # Shifting factor (initially no shift)
   
   def forward(self, input):
      # Calculating mean & standard deviation across features/input (last dimension)
      mean = input.mean(axis = -1, keepdims = True) # Keeping the dimensions for broadcasting
      standard_deviation = input.std(axis = -1, keepdims = True)

      # Normalizing: (input - mean) / standard_deviation, & then scale/shift
      # 1e-6: 1 x 10^(-6) or 0.000001, which prevents/protects from division by zero
      return self.gamma * (input - mean) / (standard_deviation + 1e-6) + self.beta       

'''
-------------------------------------------------------------------------------------
'''   

# TRANSFORMER LAYER STACKING
# Repeating/Iterating 6 times as in the original Transformer paper
# for _ in range(6):
#    # Storing previous output for residual connection
#    residual = attention

#    # Self Attention using current representation
#    attention, _ = SelfAttention(attention, attention, attention)

#    # Adding residual connection + normalize
#    attention = Layer_Normalization(attention.shape[-1]).forward(attention + residual)

# Adding positional encoding to the original embedding before the attention layers
sequence_length = Embedding.shape[0] # Number of token(s)
pe = positional_encoding(sequence_length, Embedding.shape[-1])
Embedding_Copy = Embedding + pe # Combining content with position info

# Creating a stack of transformer layers
transformer_layers = [TransformerLayer(5) for _ in range(6)]

# Processing through layers
attention = Embedding_Copy.copy() # The position-aware embedding is the input to the first layer
for layer in transformer_layers: # Looping over all layers of the transformer
   attention = layer(attention) # Each layer's output feeds the next layer

# Using FFN (with class)
ffn = FeedForwardNetwork(input_features = 5, neurons = 2048, output_predicted = 5)
ffn_output = ffn.forward_pass(attention)
print("\nTransformer Output:\n",ffn_output)


Transformer Output:
 [[-1.40276885 -0.33733117 -2.87379079  1.36090018 -0.43000261]]


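As a side check on the He initialization used in the NumPy feed-forward network above, the sketch below compares the size of ReLU activations with and without the `sqrt(2 / input_dim)` scaling. The layer sizes, sample count, and helper name `relu_layer` are assumptions for illustration only.

```python
import numpy as np

rng = np.random.default_rng(0)
fan_in, fan_out = 512, 512                    # hypothetical layer sizes
x = rng.normal(size=(2000, fan_in))           # inputs with unit variance

w_he = rng.normal(size=(fan_in, fan_out)) * np.sqrt(2.0 / fan_in)
w_plain = rng.normal(size=(fan_in, fan_out))  # no scaling, for comparison

def relu_layer(inputs, weights):
    return np.maximum(0, inputs @ weights)

# Mean squared activation after the layer; the inputs have a mean square of about 1
ms_he = np.mean(relu_layer(x, w_he) ** 2)
ms_plain = np.mean(relu_layer(x, w_plain) ** 2)
print("He init: ", ms_he)      # stays near 1
print("Unscaled:", ms_plain)   # grows by roughly fan_in / 2
```

With the scaling, the activations leave the layer at about the same scale they entered, which is what keeps signals and gradients from exploding or vanishing as layers are stacked.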
# Implementation method 2

In [10]:
import torch 
import torch.nn as nn
import torch.nn.functional as F

### Self-Attention Layer

In [17]:
class SelfAttention(nn.Module):
   def __init__(self, embed_size, heads):
      super(SelfAttention, self).__init__()
      self.embed_size = embed_size # Dimension of input embeddings (e.g., 512)
      self.heads= heads # Number of attention heads (e.g., 8)
      self.head_dim = embed_size // heads # Dimension per head (e.g., 512/8=64)

      # Ensuring embed_size is divisible by the number of heads
      assert self.head_dim*heads == embed_size, "Embed size must be divisible by heads"

      # Linear layers for Q, K, V
      # Linear layers to project embeddings into Query (Q), Key (K), Value (V) vectors
      self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
      self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
      self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)

      # Final linear layer to combine outputs from all heads
      self.fc_out = nn.Linear(embed_size, embed_size)
   
   def forward(self, values, keys, queries, mask=None):
      # Get batch size (N) and sequence lengths for values, keys, queries
      N = queries.shape[0] #Batch Size
      value_len, key_len, query_len = values.shape[1], keys.shape[1], queries.shape[1]

      # Splitting embeddings into multiple heads (reshape for parallel computation)
      # New shape: (N, seq_len, heads, head_dim)
      values = values.reshape(N, value_len, self.heads, self.head_dim)
      keys = keys.reshape(N, key_len, self.heads, self.head_dim)
      queries = queries.reshape(N, query_len, self.heads, self.head_dim)
      
      # Computing Q, K, V
      # Projecting embeddings to Q, K, V using linear layers
      values = self.values(values)
      keys = self.keys(keys)
      queries = self.queries(queries)
      '''
      Attention scores: Q * K^T (scaled by sqrt(d_k) inside the softmax below)
      Einstein summation: (batch, query_len, heads, head_dim) x (batch, key_len, heads, head_dim)
      
      nqhd, nkhd -> nhqk; notation to describe the interaction of 2 tensors or more.
      (nqhd): Queries, (nkhd): keys, (nhqk): Result/Resulting shape

      n:	Batch size (number of sequences)
      q:	Query sequence length
      k:	Key sequence length
      h:	Number of attention heads
      d:	Dimension per head (head_dim)

      - nqhd,nkhd->nhqk computes all pairwise interactions between queries and keys across batches and heads.
      - This is the core step in self-attention to determine how words in a sequence relate to each other.
      '''
      energy = torch.einsum("nqhd, nkhd->nhqk", [queries, keys])  

      # Applying mask (if provided) to ignore certain positions (e.g., padding or future tokens)
      if mask is not None:
         # Replace masked positions with a very large negative value so softmax gives them ~0 weight
         energy = energy.masked_fill(mask == 0, float("-1e20")) 
      
      # Scaling by sqrt(d_k) (the per-head key dimension) for stability, then normalizing with softmax over the key axis
      attention = torch.softmax(energy / (self.head_dim**(0.5)), dim = 3)

      # Computing weighted sum of values using attention scores
      # Result shape: (batch, query_len, heads, head_dim)
      out = torch.einsum("nhql, nlhd->nqhd", [attention, values])
      
      # Reshaping back to (batch, query_len, embed_size) and pass through final linear layer
      out = out.reshape(N, query_len, self.embed_size)
      out = self.fc_out(out)
      return out
       

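For readers unfamiliar with einsum notation, the sketch below shows in NumPy that the `nqhd,nkhd->nhqk` pattern used in the attention layer above is the same as moving the head axis next to the batch axis and computing a batched `Q @ K^T`. The tensor sizes are hypothetical, and NumPy is used here only to keep the example small.

```python
import numpy as np

rng = np.random.default_rng(0)
n, q_len, k_len, h, d = 2, 3, 5, 4, 8          # hypothetical sizes
queries = rng.normal(size=(n, q_len, h, d))
keys = rng.normal(size=(n, k_len, h, d))

# Einsum form: sum over d for every (batch, head, query, key) combination
energy_einsum = np.einsum("nqhd,nkhd->nhqk", queries, keys)

# Equivalent explicit form: move heads next to batch, then batched Q @ K^T
q_heads = queries.transpose(0, 2, 1, 3)          # (n, h, q_len, d)
k_heads_t = keys.transpose(0, 2, 3, 1)           # (n, h, d, k_len)
energy_matmul = q_heads @ k_heads_t              # (n, h, q_len, k_len)

print(energy_einsum.shape)                       # (2, 4, 3, 5)
print(np.allclose(energy_einsum, energy_matmul)) # True
```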
### Positional Encoding

In [18]:
class PositionalEncoding(nn.Module):
   def __init__(self, embed_size, max_seq_len):
      super(PositionalEncoding, self).__init__()

      # Creating a matrix of shape (max_seq_len, embed_size) initialized to zeros
      pe = torch.zeros(max_seq_len, embed_size)

      # Generating positions from 0 to max_seq_len-1
      position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)

      # Computing divisor term for scaling positional encoding
      # Using exp and log to avoid numerical instability
      div_term = torch.exp(torch.arange(0, embed_size, 2).float()*(-torch.log(torch.tensor(10000.0)) / embed_size))
      
      # Applying sine to even indices and cosine to odd indices of the embedding dimension
      pe[:, 0::2] = torch.sin(position * div_term) # Even dimensions
      pe[:, 1::2] = torch.cos(position * div_term) # Odd dimensions

      # Register as a buffer (non-trainable parameter) for saving/loading
      self.register_buffer("pe", pe.unsqueeze(0)) # (1, max_seq_len, embed_size)


   def forward(self, x):
      # Add positional encoding to input embeddings
      # x shape: (batch, seq_len, embed_size)
      # pe shape: (1, max_seq_len, embed_size) → automatically broadcasted
      return x + self.pe[:, :x.shape[1], :]
         
          

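A few properties of the sinusoidal encoding can be checked with a small NumPy version of the same formula. The function name `sinusoidal_pe` and the sizes are assumptions, and the sketch assumes an even embedding size.

```python
import numpy as np

def sinusoidal_pe(seq_len, embed_size):
    # Assumes an even embed_size so every sine column has a cosine partner
    position = np.arange(seq_len)[:, None]
    div_term = np.exp(np.arange(0, embed_size, 2) * (-np.log(10000.0) / embed_size))
    pe = np.zeros((seq_len, embed_size))
    pe[:, 0::2] = np.sin(position * div_term)
    pe[:, 1::2] = np.cos(position * div_term)
    return pe

pe = sinusoidal_pe(seq_len=10, embed_size=16)
print(pe.shape)                       # (10, 16)
print(pe[0, :4])                      # position 0: sin(0)=0 and cos(0)=1 alternate
print(np.abs(pe).max() <= 1.0)        # True: every entry lies in [-1, 1]
# Each (sin, cos) pair has unit norm, so every row has squared norm embed_size / 2
print(np.allclose((pe ** 2).sum(axis=1), 8.0))  # True
```

Because the values are bounded and every position has the same norm, adding the encoding shifts the embeddings by a comparable amount at every position.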
### Transformer Block (Encoder Layer)

In [19]:
class TransformerBlock(nn.Module):
   def __init__(self, embed_size, heads, dropout = 0.1):
      super(TransformerBlock, self).__init__()

      # Multi-head self-attention layer
      self.attention = SelfAttention(embed_size, heads)

      # Layer normalization for stabilizing training
      self.norm1 = nn.LayerNorm(embed_size)
      self.norm2 = nn.LayerNorm(embed_size)

      # Feed-forward network (expands and contracts embeddings)
      self.ff = nn.Sequential(
         nn.Linear(embed_size, 4 * embed_size), # Expand to 4 * embed_size
         nn.ReLU(), # Non-linearity
         nn.Linear(4 * embed_size, embed_size), # Contract back to embed_size
      )

      # Dropout for regularization
      self.dropout = nn.Dropout(dropout)
      
   def forward(self, x, mask = None):
      # Step 1: Compute self-attention
      attention = self.attention(x, x, x, mask)

      # Step 2: Residual connection + layer norm
      x = self.norm1(attention + x) # Residual skip connection
      x = self.dropout(x)

      # Step 3: Feed-forward network
      ff = self.ff(x)

      # Step 4: Residual connection + layer norm
      x = self.norm2(ff + x)
      x = self.dropout(x)
      return x
          

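The "residual connection + layer norm" step used twice in the block above can be illustrated in NumPy. This is a sketch under assumptions: the learnable scale and shift are left out (they start at 1 and 0), the tensor sizes are made up, and `sublayer_out` is a random stand-in for the attention or feed-forward output.

```python
import numpy as np

def layer_norm(x, eps=1e-5):
    # Normalize each token's feature vector (last axis) to zero mean, unit variance
    mean = x.mean(axis=-1, keepdims=True)
    var = x.var(axis=-1, keepdims=True)
    return (x - mean) / np.sqrt(var + eps)

rng = np.random.default_rng(0)
x = rng.normal(loc=3.0, scale=5.0, size=(2, 4, 16))   # (batch, seq_len, embed_size), hypothetical
sublayer_out = rng.normal(size=x.shape)               # stand-in for attention or feed-forward output

y = layer_norm(sublayer_out + x)                      # residual connection, then layer norm
print(np.allclose(y.mean(axis=-1), 0.0, atol=1e-6))   # True
print(np.allclose(y.std(axis=-1), 1.0, atol=1e-3))    # True
```

Whatever the scale of the input, each token leaves the step with features centered at 0 and a spread of about 1, which keeps stacked blocks numerically well behaved.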
### Full Transformer (Encoder-Only Stack)

In [20]:
class Transformer(nn.Module):
   def __init__(self, src_vocab_size, embed_size, num_layers, heads, max_seq_len, dropout = 0.1):
      super(Transformer, self).__init__()

      # Embedding layer to convert token IDs to vectors
      self.embed = nn.Embedding(src_vocab_size, embed_size)

      # Positional encoding to add sequence information
      self.pe = PositionalEncoding(embed_size, max_seq_len)

      # Stack multiple transformer blocks (encoder layers)
      self.layers = nn.ModuleList([
         TransformerBlock(embed_size, heads, dropout)
         for _ in range(num_layers)
      ])

      # Final linear layer to project embeddings back to vocabulary size
      self.fc_out = nn.Linear(embed_size, src_vocab_size)


   def forward(self, x, mask = None):
      # Step 1: Convert token IDs to embeddings
      x = self.embed(x) # (batch, seq_len) → (batch, seq_len, embed_size)

      # Step 2: Add positional encoding
      x = self.pe(x)

      # Step 3: Pass through each transformer block
      for layer in self.layers:
         x = layer(x,mask)

      # Step 4: Project embeddings to vocabulary logits
      x = self.fc_out(x) # (batch, seq_len, vocab_size)
      return x
          
       

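The model above returns logits of shape `(batch, seq_len, vocab_size)`. A classification loss such as cross-entropy is usually computed by flattening the batch and sequence axes so that every token position becomes one prediction. The NumPy sketch below illustrates that idea. The helper `cross_entropy`, the sizes, and the random logits are assumptions, and the function is a simplified stand-in rather than PyTorch's implementation.

```python
import numpy as np

def cross_entropy(logits, targets):
    # logits: (num_items, vocab_size); targets: (num_items,) integer class ids
    shifted = logits - logits.max(axis=-1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=-1, keepdims=True))
    return -log_probs[np.arange(len(targets)), targets].mean()

batch, seq_len, vocab_size = 32, 10, 10          # hypothetical sizes
rng = np.random.default_rng(0)
logits = rng.normal(size=(batch, seq_len, vocab_size)) * 0.01   # nearly uniform predictions
targets = rng.integers(0, vocab_size, size=(batch, seq_len))

# Flatten (batch, seq_len) into one axis so each token is one classification example
loss = cross_entropy(logits.reshape(-1, vocab_size), targets.reshape(-1))
print(loss, np.log(vocab_size))   # both close to 2.30
```

A model that has learned nothing predicts a nearly uniform distribution, so its loss sits near `ln(vocab_size)`. That gives a useful reference point when reading a loss curve.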
### Training a Toy Example

In [21]:
# Hyperparameters
embed_size = 128 # Dimension of embeddings
heads = 8 # Number of attention heads
num_layers = 3 # Number of transformer blocks
max_seq_len = 10 # Maximum sequence length
vocab_size = 10 # Vocabulary size (e.g., 10 tokens: 0-9)

# Initializing model, loss, and optimizer
model = Transformer(vocab_size, embed_size, num_layers, heads, max_seq_len)
criterion = nn.CrossEntropyLoss() # For classification tasks
optimizer = torch.optim.Adam(model.parameters(), lr= 0.001)

# Generating toy data (input and target are the same for a copy task)
src = torch.randint(0, vocab_size, (32, max_seq_len)) # Fake input (batch_size=32)
trg = src.clone() # Target is same as input (simple copy task)


# Training loop 
print("Transformer Output:\n")
for epoch in range(100):
   # Forward pass: compute model predictions
   output = model(src) # Shape: (batch, seq_len, vocab_size)

   # Compute loss (flatten batch and sequence dimensions for cross-entropy)
   loss = criterion(output.view(-1, vocab_size), trg.view(-1))

   # Backpropagation
   optimizer.zero_grad() # Clear gradients
   loss.backward() # Compute gradients
   optimizer.step() # Update weights
   print(f"Epoch {epoch}, Loss: {loss.item()}")

Transformer Output:

Epoch 0, Loss: 2.505314826965332
Epoch 1, Loss: 1.7464202642440796
Epoch 2, Loss: 1.1973932981491089
Epoch 3, Loss: 0.7467731237411499
Epoch 4, Loss: 0.4208246171474457
Epoch 5, Loss: 0.24964067339897156
Epoch 6, Loss: 0.14512111246585846
Epoch 7, Loss: 0.10315585136413574
Epoch 8, Loss: 0.07764178514480591
Epoch 9, Loss: 0.06334101408720016
Epoch 10, Loss: 0.04884720221161842
Epoch 11, Loss: 0.042373571544885635
Epoch 12, Loss: 0.03544330224394798
Epoch 13, Loss: 0.032105155289173126
Epoch 14, Loss: 0.027438536286354065
Epoch 15, Loss: 0.025192206725478172
Epoch 16, Loss: 0.022478139027953148
Epoch 17, Loss: 0.019663888961076736
Epoch 18, Loss: 0.01893792673945427
Epoch 19, Loss: 0.017982598394155502
Epoch 20, Loss: 0.016582416370511055
Epoch 21, Loss: 0.01589564047753811
Epoch 22, Loss: 0.014367098920047283
Epoch 23, Loss: 0.013960190117359161
Epoch 24, Loss: 0.013418605551123619
Epoch 25, Loss: 0.012272506020963192
Epoch 26, Loss: 0.012029631994664669
Epoch 27, 