<a href="https://colab.research.google.com/github/ShantanuKadam3115/MachineLearningBasics/blob/ML_implementations/miniTransformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [29]:
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    A table of shape [1, max_len, d_model] is computed once in ``__init__``
    and registered as the buffer ``pe`` (not a trainable parameter). Even
    feature columns hold sines and odd feature columns hold cosines of
    ``position * 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension; may be even or odd.
        max_len: Number of positions to precompute, i.e. the longest
            sequence ``forward`` accepts.
    """

    def __init__(self, d_model, max_len = 5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000) / d_model))

        # [max_len, ceil(d_model / 2)] angle for every (position, frequency) pair.
        angles = position * div_term

        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # drop the last frequency; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Leading batch axis of size 1 so the table broadcasts over the batch.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Args:
            x: Tensor indexed as [Batch, Seq, d_model].

        Raises:
            ValueError: If the sequence length exceeds the precomputed table.
        """
        seq_len = x.size(1)
        table_len = self.pe.size(1)
        if seq_len > table_len:
            # Without this check the slice is silently truncated and the add
            # either fails with an opaque broadcast error or (table_len == 1)
            # reuses position 0 for every token.
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {table_len}"
            )
        return x + self.pe[:, :seq_len]



In [30]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input, split
    into ``num_heads`` heads of width ``d_model // num_heads``, attended
    independently, then concatenated and passed through an output projection.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()

        # Validate before deriving head_dim so a bad configuration is reported
        # before any dependent attribute is computed.
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Linear layers to project Input to Q, K, V
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # Final output projection
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape [Batch, Seq, d_model].
            mask: Optional tensor broadcastable to the score shape
                [Batch, Heads, Seq, Seq]. Entries equal to 0 mark key
                positions that must not be attended to.

        Returns:
            Tensor of shape [Batch, Seq, d_model].
        """
        batch_size, seq_len, _ = x.shape

        # 1. Linear Projection (Create Q, K, V)
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)

        # 2. Split into Heads (Reshape)
        # Shape: [Batch, Seq, Heads, Dim] -> [Batch, Heads, Seq, Dim]
        Q = Q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        # 3. Scaled Dot-Product Attention
        # Scores = (Q @ K^T) / sqrt(d_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if mask is not None:
            # Use the most negative value representable in the score dtype.
            # A literal -1e9 cannot be stored in float16 and makes masked_fill
            # raise an overflow error under half precision / autocast; in
            # float32 both values give the same softmax result.
            blocked = torch.finfo(scores.dtype).min
            scores = scores.masked_fill(mask == 0, blocked)

        attention_weights = torch.softmax(scores, dim=-1)

        # 4. Weighted Sum (Weights @ V)
        out = torch.matmul(attention_weights, V)

        # 5. Concatenate Heads and Final Linear
        # [Batch, Heads, Seq, Dim] -> [Batch, Seq, Heads * Dim]
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        return self.W_o(out)

In [31]:
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Width of the hidden layer.
    """

    def __init__(self, d_model, d_ff=2048):
        super().__init__()
        # Layers are created in application order and stored in a Sequential,
        # so they are addressable as net[0], net[1], net[2].
        expand = nn.Linear(d_model, d_ff)
        activation = nn.ReLU()
        project = nn.Linear(d_ff, d_model)
        self.net = nn.Sequential(expand, activation, project)

    def forward(self, x):
        """Return the MLP output; the last dimension stays ``d_model``."""
        transformed = self.net(x)
        return transformed

In [32]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention and feed-forward, each with Add & Norm.

    Uses the Post-Norm arrangement: LayerNorm is applied after the residual
    addition, rather than before the sub-layer (Pre-Norm).

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer's output
            before it is added to the residual.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()

        # Attention sub-layer and its Add & Norm companions.
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)

        # Feed-forward sub-layer and its Add & Norm companions.
        self.ff = FeedForward(d_model, d_ff)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run both sub-layers on ``x``; ``mask`` is passed to the attention."""
        # 1. Attention sub-layer: residual add, then normalise.
        attended = self.dropout1(self.attention(x, mask))
        x = self.norm1(x + attended)

        # 2. Feed-forward sub-layer: residual add, then normalise.
        transformed = self.dropout2(self.ff(x))
        return self.norm2(x + transformed)

In [35]:
class MiniTransformer(nn.Module):
    """Encoder-only Transformer mapping token indices to per-position vocabulary scores.

    Pipeline: token embedding (scaled by sqrt(d_model)) -> positional
    encoding -> ``num_layers`` encoder blocks -> linear projection to the
    vocabulary. No softmax is applied, so the output is raw scores (logits).

    Args:
        vocab_size: Number of distinct token ids; also the output width.
        d_model: Embedding / hidden feature size.
        num_heads: Attention heads per encoder block.
        num_layers: Number of stacked encoder blocks.
        d_ff: Hidden width of each block's feed-forward sub-layer.
        max_len: Number of positions handed to the positional encoding.
        dropout: Dropout probability forwarded to every encoder block.
            Defaults to 0.1, the value the blocks used before this was
            configurable.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout=0.1):
        super(MiniTransformer, self).__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)

        self.pos_encoder = PositionalEncoding(d_model, max_len)

        # Stack multiple Encoder Blocks
        self.layers = nn.ModuleList([
            EncoderBlock(d_model, num_heads, d_ff, dropout=dropout)
            for _ in range(num_layers)
        ])

        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x, mask=None):
        """Compute vocabulary scores for every position.

        Args:
            x: Integer token ids of shape [Batch, Seq_Len].
            mask: Optional attention mask handed unchanged to every block.

        Returns:
            Tensor of shape [Batch, Seq_Len, vocab_size] (unnormalized).
        """
        # 1. Embed + Position
        # Scale embeddings by sqrt(d_model) before adding positional encodings.
        x = self.embedding(x) * math.sqrt(self.embedding.embedding_dim)
        x = self.pos_encoder(x)

        # 2. Pass through all Transformer Layers
        for layer in self.layers:
            x = layer(x, mask)

        # 3. Final Prediction
        return self.fc_out(x)

# --- TEST IT ---
# Smoke test: build a model and check the output shape of one forward pass.
# Vocab size 1000, Embedding Dim 512, 8 Heads, 6 Layers
model = MiniTransformer(
    vocab_size=1000,
    d_model=512,
    num_heads=8,
    num_layers=6,
    d_ff=2048,
    max_len=100,
)

# Dummy Input: Batch of 2 sentences, length 10, token ids drawn from [0, 1000)
dummy_input = torch.randint(low=0, high=1000, size=(2, 10))

# Forward Pass
output = model(dummy_input)
print(f"Input Shape: {dummy_input.shape}")
print(f"Output Shape: {output.shape}")
# Output should be [2, 10, 1000] -> one unnormalized score per vocabulary
# entry for every position in the sequence (no softmax has been applied).

Input Shape: torch.Size([2, 10])
Output Shape: torch.Size([2, 10, 1000])
