In [1]:
import numpy as np

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [5]:
# Report whether this PyTorch installation can use a CUDA GPU.
torch.cuda.is_available()

False

## Scaled Dot Product Self-Attention

In [3]:
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_model)) V``.

    Args:
        d_model: Value used for the scaling factor; raw scores are divided by
            ``sqrt(d_model)``. This should be the size of the last axis of
            ``query``/``key`` (i.e. the per-head size when the inputs have
            already been split into heads).
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, dropout=0.1):
        super(ScaledDotProductAttention, self).__init__()
        self.scale = math.sqrt(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` over ``key``/``value``.

        Args:
            query: Tensor of shape ``(..., q_len, dim)``.
            key: Tensor of shape ``(..., k_len, dim)``.
            value: Tensor of shape ``(..., k_len, v_dim)``.
            mask: Optional tensor broadcastable to ``(..., q_len, k_len)``.
                Positions where ``mask == 0`` are excluded from attention.

        Returns:
            Tuple ``(output, attention_weights)`` where ``output`` has shape
            ``(..., q_len, v_dim)`` and ``attention_weights`` has shape
            ``(..., q_len, k_len)`` (after dropout).
        """
        # Similarity of every query position with every key position.
        scores = torch.matmul(query, key.transpose(-2, -1)) / self.scale

        if mask is not None:
            # Fill with the most negative *finite* value instead of -inf.
            # With -inf, a query row whose keys are all masked turns into
            # softmax([-inf, ...]) = NaN, which then contaminates the output
            # and every gradient. A finite fill still underflows to exactly 0
            # weight next to any unmasked score, and degrades to uniform
            # weights (not NaN) when the whole row is masked.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        # Normalise over the key axis, then regularise the weights.
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        # Weighted sum of values.
        output = torch.matmul(attention_weights, value)
        return output, attention_weights


## Multi-Head Attention

In [4]:
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention over a single input sequence.

    The input is projected to queries, keys and values, split into
    ``num_heads`` heads of size ``d_model // num_heads``, attended per head,
    re-joined and passed through an output projection followed by dropout.

    Args:
        d_model: Feature size of the input and output; must be divisible by
            ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability used on the attention weights and on the
            projected output.

    Raises:
        AssertionError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Linear layers for query, key, value
        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)
        # The dot products are taken between per-head vectors of size
        # head_dim, so the scores must be scaled by sqrt(head_dim), not
        # sqrt(d_model); the latter over-flattens the softmax by a factor of
        # sqrt(num_heads).
        self.attention = ScaledDotProductAttention(self.head_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, tensor, batch_size, seq_len):
        """Reshape (batch, seq, d_model) -> (batch, num_heads, seq, head_dim)."""
        return tensor.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch, num_heads, seq_len, seq_len)``; positions where
                ``mask == 0`` are not attended to.

        Returns:
            Tensor of shape ``(batch, seq_len, d_model)``.
        """
        batch_size, seq_len, d_model = x.size()

        # Linear projections, then split into heads.
        query = self._split_heads(self.query(x), batch_size, seq_len)
        key = self._split_heads(self.key(x), batch_size, seq_len)
        value = self._split_heads(self.value(x), batch_size, seq_len)

        # Scaled dot-product attention (per head).
        attention_output, _ = self.attention(query, key, value, mask)

        # Concatenate heads: (batch, heads, seq, head_dim) -> (batch, seq, d_model).
        # .contiguous() is required because transpose returns a strided view
        # that .view() cannot reshape directly.
        attention_output = attention_output.transpose(1, 2).contiguous()
        attention_output = attention_output.view(batch_size, seq_len, d_model)

        # Final linear layer
        output = self.fc_out(attention_output)
        return self.dropout(output)


## Feed Forward Network

In [6]:
class FeedForwardNetwork(nn.Module):
    """Two-layer MLP: Linear -> ReLU -> Dropout -> Linear -> Dropout.

    Args:
        d_model: Input and output feature size.
        ff_dim: Width of the hidden layer.
        dropout: Dropout probability, applied after the activation and again
            after the second linear layer.
    """

    def __init__(self, d_model, ff_dim, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, ff_dim)
        self.fc2 = nn.Linear(ff_dim, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map ``x`` (last axis of size ``d_model``) to a tensor of the same shape."""
        # Expand to ff_dim, apply the non-linearity, then regularise.
        hidden = self.dropout(F.relu(self.fc1(x)))
        # Project back to d_model and regularise once more.
        projected = self.fc2(hidden)
        return self.dropout(projected)


## Transformer Block

In [7]:
class TransformerBlock(nn.Module):
    """One encoder block: self-attention and a feed-forward network, each
    wrapped in a residual connection followed by layer normalisation.

    Args:
        d_model: Feature size of the block's input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout probability forwarded to both sub-layers.
    """

    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.ffn = FeedForwardNetwork(d_model, ff_dim, dropout)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Run the block on ``x``; ``mask`` is passed through to the attention layer."""
        # Sub-layer 1: attention, residual add, normalise.
        attended = self.norm1(x + self.attention(x, mask))
        # Sub-layer 2: feed-forward, residual add, normalise.
        return self.norm2(attended + self.ffn(attended))


## Positional Encoding

In [8]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch of embeddings.

    Even feature indices hold ``sin(pos * w_i)`` and odd indices hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    The table is registered as a non-persistent buffer named ``encoding``:
    it follows the module through ``.to()`` / ``.cuda()`` but is not written
    to ``state_dict``, so the module's checkpoint contents stay empty.

    Args:
        d_model: Feature size of the embeddings (odd sizes are supported).
        max_seq_len: Longest sequence length the table covers.
    """

    def __init__(self, d_model, max_seq_len):
        super(PositionalEncoding, self).__init__()
        positions = torch.arange(max_seq_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float32) * -(math.log(10000.0) / d_model)
        )
        angles = positions * div_term  # (max_seq_len, ceil(d_model / 2))

        encoding = torch.zeros(max_seq_len, d_model)
        encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns,
        # so trim the angles to the number of cosine slots available.
        encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Shape (1, max_seq_len, d_model) so it broadcasts over the batch axis.
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``.

        Raises:
            ValueError: If ``seq_len`` exceeds the ``max_seq_len`` the table
                was built for.
        """
        seq_len = x.size(1)
        max_seq_len = self.encoding.size(1)
        if seq_len > max_seq_len:
            # Slicing past the end would silently return a shorter table and
            # fail (or mis-broadcast) in the addition below.
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_seq_len {max_seq_len}"
            )
        # .to() is a no-op when the buffer already lives on x's device.
        return x + self.encoding[:, :seq_len, :].to(x.device)


## Full Transformer Model

In [9]:
class MiniTransformerFromScratch(nn.Module):
    """Token embedding + positional encoding, a stack of transformer blocks,
    and a final linear projection to vocabulary size.

    Args:
        vocab_size: Number of rows in the embedding table and size of the
            output's last axis.
        d_model: Embedding / hidden feature size.
        num_heads: Attention heads per block.
        ff_dim: Hidden width of each block's feed-forward network.
        num_layers: Number of stacked ``TransformerBlock`` modules.
        max_seq_len: Maximum sequence length given to the positional encoding.
        dropout: Dropout probability forwarded to every block.
    """

    def __init__(self, vocab_size, d_model, num_heads, ff_dim, num_layers, max_seq_len, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_len)
        blocks = [
            TransformerBlock(d_model, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(blocks)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x, mask=None):
        """Map token ids ``x`` to per-position scores over the vocabulary.

        ``mask`` is handed unchanged to every block.
        """
        # Look up embeddings and add position information.
        hidden = self.positional_encoding(self.embedding(x))

        # Run the stack of blocks in order.
        for block in self.layers:
            hidden = block(hidden, mask)

        # Project each position to vocab_size outputs.
        return self.fc_out(hidden)


## Running the Model

In [10]:
# Model parameters
vocab_size = 5000
d_model = 128
num_heads = 4
ff_dim = 512
num_layers = 2
max_seq_len = 128
batch_size = 8

# Seed the global RNG so that weight initialisation, the random token ids and
# the dropout masks are the same on every Restart & Run All.
torch.manual_seed(42)

# Instantiate model
model = MiniTransformerFromScratch(vocab_size, d_model, num_heads, ff_dim, num_layers, max_seq_len)
print(model)

# Example input: random token ids of shape (batch_size, max_seq_len).
# The sequence length is taken from max_seq_len instead of repeating the
# literal, so the two cannot drift apart.
dummy_input = torch.randint(0, vocab_size, (batch_size, max_seq_len))
output = model(dummy_input)

# Check the expected output shape instead of only stating it in a comment.
assert output.shape == (batch_size, max_seq_len, vocab_size)
print(output.shape)  # Output: [batch_size, max_seq_len, vocab_size]


MiniTransformerFromScratch(
  (embedding): Embedding(5000, 128)
  (positional_encoding): PositionalEncoding()
  (layers): ModuleList(
    (0-1): 2 x TransformerBlock(
      (attention): MultiHeadAttention(
        (query): Linear(in_features=128, out_features=128, bias=True)
        (key): Linear(in_features=128, out_features=128, bias=True)
        (value): Linear(in_features=128, out_features=128, bias=True)
        (fc_out): Linear(in_features=128, out_features=128, bias=True)
        (attention): ScaledDotProductAttention(
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (ffn): FeedForwardNetwork(
        (fc1): Linear(in_features=128, out_features=512, bias=True)
        (fc2): Linear(in_features=512, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (norm2): LayerNorm((128,), eps=1e-05, elementwis

In [11]:
# Display the tensor returned by model(dummy_input) in the previous cell.
output

tensor([[[ 0.8416,  0.3240, -0.3630,  ...,  0.0953, -0.4463,  0.1273],
         [ 0.4044,  0.4454,  0.2501,  ...,  0.9064, -0.6169, -0.2953],
         [-0.3364,  0.4424, -0.5158,  ...,  0.6443, -0.4452,  0.5002],
         ...,
         [ 1.4170, -1.0453,  0.5021,  ...,  0.4204, -0.5039, -0.7998],
         [ 0.5345, -0.8295, -0.9758,  ...,  0.2017, -0.6027, -0.5465],
         [-0.3442, -0.4099, -0.0974,  ...,  0.1960, -0.6370,  0.4604]],

        [[-0.3406,  0.2580, -0.4935,  ...,  0.9173, -0.2970,  0.5329],
         [ 0.7357, -0.7487, -0.4306,  ...,  0.9837,  0.3500,  0.1061],
         [-0.1801,  1.3837, -1.1732,  ...,  0.3499, -0.1861,  1.2262],
         ...,
         [ 0.0654,  0.4164, -0.3795,  ..., -0.5725, -0.6172,  1.0195],
         [ 0.2763, -1.2397,  0.6114,  ..., -0.1280,  0.1388,  0.7767],
         [ 0.4981,  0.4101,  0.8199,  ...,  1.1147, -0.9355,  1.1054]],

        [[ 0.5130, -0.0166, -0.1584,  ..., -0.0459, -0.4756,  0.8818],
         [-0.4252,  1.6053, -1.1361,  ...,  0