In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np

In [3]:
class PositionalEncoding(nn.Module):
    """
    Sinusoidal positional encoding that is added to token embeddings.

    Since Transformers have neither recurrence nor convolution, position
    information is injected explicitly using sine and cosine functions of
    different frequencies:

    PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
    PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))

    where:
        - pos is the position in the sequence
        - i is the dimension-pair index
        - d_model is the embedding dimension

    The table is precomputed once for ``max_len`` positions and stored as the
    buffer ``pe`` of shape (1, max_len, d_model).
    """

    def __init__(self, d_model, max_len=5000, dropout=0.1):
        """
        Args:
            d_model: Dimension of the model (embedding size); may be even or odd
            max_len: Maximum sequence length the table is built for
            dropout: Dropout probability applied after adding the encoding
        """
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Table of shape (max_len, d_model) that will hold the encodings
        pe = torch.zeros(max_len, d_model)

        # Position indices [0, 1, ..., max_len-1] as a column: (max_len, 1)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # Inverse frequencies, computed in log space:
        # [1, 10000^(-2/d_model), 10000^(-4/d_model), ...]
        # Length is ceil(d_model / 2): one entry per even column index.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-math.log(10000.0) / d_model))

        # (max_len, ceil(d_model / 2)) matrix of pos * inverse_frequency
        angles = position * div_term

        # Sine goes to the even columns (2i)
        pe[:, 0::2] = torch.sin(angles)

        # Cosine goes to the odd columns (2i+1). There are only d_model // 2
        # odd columns, so for an odd d_model the last frequency is dropped;
        # without the slice the assignment fails with a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Add a batch dimension: (max_len, d_model) -> (1, max_len, d_model)
        pe = pe.unsqueeze(0)

        # Buffer: not a trainable parameter, but saved in the state dict and
        # moved together with the module by .to()/.cuda()
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Args:
            x: Input embeddings of shape (batch_size, seq_len, d_model)

        Returns:
            Embeddings with positional encoding added, followed by dropout

        Raises:
            RuntimeError: If seq_len exceeds the max_len the table was built
                for. (PyTorch already raised RuntimeError from the failed
                broadcast in most such cases; the check makes the message
                actionable and also covers max_len == 1, where the single
                row would otherwise be silently broadcast.)
        """
        seq_len = x.size(1)
        max_len = self.pe.size(1)
        if seq_len > max_len:
            raise RuntimeError(
                f"Sequence length {seq_len} exceeds the maximum length "
                f"{max_len} supported by this PositionalEncoding"
            )

        # Slice the table to the actual sequence length; broadcasts over batch
        x = x + self.pe[:, :seq_len, :]
        return self.dropout(x)

In [4]:
class MultiHeadAttention(nn.Module):
    """
    Multi-Head Attention allows the model to jointly attend to information from
    different representation subspaces at different positions.

    The attention mechanism consists of:
    1. Linear projections of queries, keys, and values
    2. Scaled dot-product attention, computed independently per head
    3. Concatenation of attention outputs from multiple heads
    4. Final linear projection

    Attention(Q, K, V) = softmax(QK^T / sqrt(d_k)) * V

    where:
        - Q: Queries matrix
        - K: Keys matrix
        - V: Values matrix
        - d_k: Dimension of keys (d_model // num_heads)
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        """
        Args:
            d_model: Dimension of the model
            num_heads: Number of attention heads; must divide d_model
            dropout: Dropout probability applied to the attention weights
        """
        super(MultiHeadAttention, self).__init__()

        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # Dimension of each head

        # Linear layers for Q, K, V projections
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # Final linear layer after concatenating heads
        self.W_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        Compute scaled dot-product attention.

        Args:
            Q: Queries of shape (batch_size, num_heads, seq_len_q, d_k)
            K: Keys of shape (batch_size, num_heads, seq_len_k, d_k)
            V: Values of shape (batch_size, num_heads, seq_len_k, d_k)
            mask: Optional mask broadcastable to
                (batch_size, num_heads, seq_len_q, seq_len_k); positions where
                the mask equals 0 (or False) are excluded from attention

        Returns:
            output: Attention output of shape (batch_size, num_heads, seq_len_q, d_k)
            attention_weights: Weights of shape
                (batch_size, num_heads, seq_len_q, seq_len_k), after dropout
        """
        # Attention scores: Q @ K^T / sqrt(d_k)
        # Shape: (batch_size, num_heads, seq_len_q, seq_len_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # Fill masked positions with the most negative finite value of the
            # score dtype. A hard-coded -1e9 is not representable in float16,
            # which makes masked_fill fail under half precision.
            fill_value = torch.finfo(scores.dtype).min
            scores = scores.masked_fill(mask == 0, fill_value)

        # Normalise over the key dimension, then apply dropout to the weights
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        # Weighted sum of values
        # Shape: (batch_size, num_heads, seq_len_q, d_k)
        output = torch.matmul(attention_weights, V)

        return output, attention_weights

    def forward(self, query, key, value, mask=None):
        """
        Args:
            query: Query tensor of shape (batch_size, seq_len_q, d_model)
            key: Key tensor of shape (batch_size, seq_len_k, d_model)
            value: Value tensor of shape (batch_size, seq_len_k, d_model)
            mask: Optional mask tensor, passed through to
                scaled_dot_product_attention

        Returns:
            output: Multi-head attention output of shape
                (batch_size, seq_len_q, d_model)
            attention_weights: Attention weights for visualization, shape
                (batch_size, num_heads, seq_len_q, seq_len_k)
        """
        batch_size = query.size(0)

        # 1. Linear projections: each stays (batch_size, seq_len, d_model)
        Q = self.W_q(query)
        K = self.W_k(key)
        V = self.W_v(value)

        # 2. Split the last dimension into heads and move the head axis forward:
        #    (batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, d_k)
        Q = Q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # 3. Scaled dot-product attention on all heads at once
        attn_output, attention_weights = self.scaled_dot_product_attention(Q, K, V, mask)

        # 4. Concatenate heads back: (batch_size, seq_len_q, d_model).
        #    transpose() yields a non-contiguous tensor, so contiguous() is
        #    required before view().
        attn_output = attn_output.transpose(1, 2).contiguous().view(
            batch_size, -1, self.d_model
        )

        # 5. Final output projection
        output = self.W_o(attn_output)

        return output, attention_weights

In [5]:
class PositionWiseFeedForward(nn.Module):
    """
    Two-layer feed-forward network applied independently at every position.

    FFN(x) = max(0, xW1 + b1)W2 + b2

    The same pair of linear layers is shared across all positions; dropout is
    applied to the hidden activations between the two layers.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        Args:
            d_model: Input and output width of the block
            d_ff: Width of the hidden layer (typically 4 * d_model)
            dropout: Dropout probability for the hidden activations
        """
        super().__init__()

        # Expansion (d_model -> d_ff) followed by contraction (d_ff -> d_model)
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, d_model)

        Returns:
            Tensor of shape (batch_size, seq_len, d_model)
        """
        # Expand, rectify and regularise the hidden representation ...
        hidden = self.dropout(F.relu(self.linear1(x)))

        # ... then project it back to the model width
        return self.linear2(hidden)


In [6]:
class EncoderLayer(nn.Module):
    """
    One Transformer encoder layer.

    Two sub-layers are applied in sequence, each wrapped as
    ``LayerNorm(x + Dropout(sublayer(x)))`` (post-norm residual):
    1. Multi-head self-attention
    2. Position-wise feed-forward network
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        """
        Args:
            d_model: Dimension of the model
            num_heads: Number of attention heads
            d_ff: Hidden width of the feed-forward network
            dropout: Dropout probability used by every dropout in the layer
        """
        super().__init__()

        # Sub-layers
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff, dropout)

        # One LayerNorm per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        # One residual-branch dropout per sub-layer
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, d_model)
            mask: Optional attention mask forwarded to the self-attention

        Returns:
            Tensor of shape (batch_size, seq_len, d_model)
        """
        # Self-attention sub-layer: queries, keys and values are all x.
        # The attention weights are not needed here and are discarded.
        attended, _weights = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout1(attended))

        # Feed-forward sub-layer
        transformed = self.feed_forward(x)
        return self.norm2(x + self.dropout2(transformed))

In [7]:
class DecoderLayer(nn.Module):
    """
    One Transformer decoder layer.

    Three sub-layers are applied in sequence, each wrapped as
    ``LayerNorm(x + Dropout(sublayer(x)))`` (post-norm residual):
    1. Masked multi-head self-attention over the decoder input
    2. Multi-head cross-attention over the encoder output
    3. Position-wise feed-forward network

    The self-attention receives ``tgt_mask`` so that positions can be
    prevented from attending to subsequent positions (autoregressive property).
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        """
        Args:
            d_model: Dimension of the model
            num_heads: Number of attention heads
            d_ff: Hidden width of the feed-forward network
            dropout: Dropout probability used by every dropout in the layer
        """
        super().__init__()

        # Sub-layers: self-attention, encoder-decoder attention, feed-forward
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff, dropout)

        # One LayerNorm per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        # One residual-branch dropout per sub-layer
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """
        Args:
            x: Decoder input of shape (batch_size, tgt_seq_len, d_model)
            encoder_output: Encoder output of shape (batch_size, src_seq_len, d_model)
            src_mask: Mask applied in the cross-attention over encoder positions
            tgt_mask: Mask applied in the self-attention (e.g. to hide future positions)

        Returns:
            Tensor of shape (batch_size, tgt_seq_len, d_model)
        """
        # Sub-layer 1: masked self-attention (attention weights discarded)
        own_context, _weights = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout1(own_context))

        # Sub-layer 2: cross-attention; queries come from the decoder state,
        # keys and values from the encoder output
        memory_context, _weights = self.cross_attn(
            x, encoder_output, encoder_output, src_mask
        )
        x = self.norm2(x + self.dropout2(memory_context))

        # Sub-layer 3: position-wise feed-forward
        transformed = self.feed_forward(x)
        return self.norm3(x + self.dropout3(transformed))

In [8]:
class TransformerEncoder(nn.Module):
    """
    Transformer encoder stack.

    Pipeline:
    1. Token embedding, scaled by sqrt(d_model), plus positional encoding
    2. N stacked encoder layers

    The result is a continuous representation of the source sequence that the
    decoder can attend to.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff,
                 max_len=5000, dropout=0.1):
        """
        Args:
            vocab_size: Size of the vocabulary
            d_model: Dimension of the model
            num_heads: Number of attention heads
            num_layers: Number of encoder layers
            d_ff: Hidden width of the feed-forward networks
            max_len: Maximum sequence length
            dropout: Dropout probability
        """
        super().__init__()

        self.d_model = d_model

        # Token lookup table and positional encoding
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout)

        # Build the stack one layer at a time
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(EncoderLayer(d_model, num_heads, d_ff, dropout))

        # Registered on the module, but forward() below never applies it
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        """
        Args:
            src: Source token indices of shape (batch_size, src_seq_len)
            src_mask: Optional mask handed to every encoder layer

        Returns:
            Encoder output of shape (batch_size, src_seq_len, d_model)
        """
        # Embeddings are multiplied by sqrt(d_model) before the positional
        # encoding is added
        scale = math.sqrt(self.d_model)
        hidden = self.pos_encoding(self.embedding(src) * scale)

        # Run the stack in order
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, src_mask)

        return hidden

In [9]:

# NOTE(review): the next notebook cell defines TransformerDecoder again; after
# running all cells, that later definition is the one bound to the name.
class TransformerDecoder(nn.Module):
    """
    Transformer decoder stack.

    Pipeline:
    1. Token embedding, scaled by sqrt(d_model), plus positional encoding
    2. N stacked decoder layers, each attending to the encoder output
    3. Linear projection to vocabulary logits
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff,
                 max_len=5000, dropout=0.1):
        """
        Args:
            vocab_size: Size of the vocabulary
            d_model: Dimension of the model
            num_heads: Number of attention heads
            num_layers: Number of decoder layers
            d_ff: Hidden width of the feed-forward networks
            max_len: Maximum sequence length
            dropout: Dropout probability
        """
        super().__init__()

        self.d_model = d_model

        # Token lookup table and positional encoding
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout)

        # Build the stack one layer at a time
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(DecoderLayer(d_model, num_heads, d_ff, dropout))

        # Maps decoder states to one logit per vocabulary entry
        self.fc_out = nn.Linear(d_model, vocab_size)

        # Registered on the module, but forward() below never applies it
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, encoder_output, src_mask=None, tgt_mask=None):
        """
        Args:
            tgt: Target token indices of shape (batch_size, tgt_seq_len)
            encoder_output: Encoder output of shape (batch_size, src_seq_len, d_model)
            src_mask: Optional source mask handed to every decoder layer
            tgt_mask: Optional target mask handed to every decoder layer

        Returns:
            Logits of shape (batch_size, tgt_seq_len, vocab_size)
        """
        # Scaled embeddings plus positional encoding
        scale = math.sqrt(self.d_model)
        hidden = self.pos_encoding(self.embedding(tgt) * scale)

        # Run the stack in order
        for decoder_layer in self.layers:
            hidden = decoder_layer(hidden, encoder_output, src_mask, tgt_mask)

        # Project to vocabulary size
        return self.fc_out(hidden)

In [10]:
# NOTE(review): TransformerDecoder is already defined in the preceding notebook
# cell with the same behavior. This second definition rebinds the name and is
# the one Transformer uses; consider deleting one of the two copies.
class TransformerDecoder(nn.Module):
    """
    Decoder half of the Transformer.

    Stages:
    1. Output embedding (scaled by sqrt(d_model)) + positional encoding
    2. A stack of N decoder layers conditioned on the encoder output
    3. A final linear layer producing vocabulary logits
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff,
                 max_len=5000, dropout=0.1):
        """
        Args:
            vocab_size: Size of the vocabulary
            d_model: Dimension of the model
            num_heads: Number of attention heads
            num_layers: Number of decoder layers
            d_ff: Dimension of the feed-forward layer
            max_len: Maximum sequence length
            dropout: Dropout probability
        """
        super().__init__()

        self.d_model = d_model

        # Input side: embedding followed by positional encoding
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout)

        # The N decoder layers
        decoder_stack = []
        for _ in range(num_layers):
            decoder_stack.append(DecoderLayer(d_model, num_heads, d_ff, dropout))
        self.layers = nn.ModuleList(decoder_stack)

        # Output side: projection to the vocabulary
        self.fc_out = nn.Linear(d_model, vocab_size)

        # Kept as a submodule; not applied anywhere in forward()
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, encoder_output, src_mask=None, tgt_mask=None):
        """
        Args:
            tgt: Target sequence of shape (batch_size, tgt_seq_len)
            encoder_output: Encoder output of shape (batch_size, src_seq_len, d_model)
            src_mask: Optional source mask
            tgt_mask: Optional target mask (for masking future positions)

        Returns:
            Decoder output of shape (batch_size, tgt_seq_len, vocab_size)
        """
        # Embed, scale, and add position information
        embedded = self.embedding(tgt) * math.sqrt(self.d_model)
        state = self.pos_encoding(embedded)

        # Each layer refines the state using the encoder output
        for block in self.layers:
            state = block(state, encoder_output, src_mask, tgt_mask)

        # Vocabulary logits
        logits = self.fc_out(state)
        return logits


# ============================================================================
# 8. COMPLETE TRANSFORMER MODEL
# ============================================================================

class Transformer(nn.Module):
    """
    Encoder-decoder Transformer for sequence-to-sequence tasks such as machine
    translation or text summarization.

    Data flow:
        Source -> Encoder -> Encoder Output
        Target -> Decoder (with Encoder Output) -> Output Logits

    Padding and causal masks are derived from the token indices inside
    forward().
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8,
                 num_encoder_layers=6, num_decoder_layers=6, d_ff=2048,
                 max_len=5000, dropout=0.1):
        """
        Args:
            src_vocab_size: Source vocabulary size
            tgt_vocab_size: Target vocabulary size
            d_model: Dimension of the model (default: 512)
            num_heads: Number of attention heads (default: 8)
            num_encoder_layers: Number of encoder layers (default: 6)
            num_decoder_layers: Number of decoder layers (default: 6)
            d_ff: Dimension of feed-forward layer (default: 2048)
            max_len: Maximum sequence length (default: 5000)
            dropout: Dropout probability (default: 0.1)
        """
        super().__init__()

        # Source-side stack
        self.encoder = TransformerEncoder(
            src_vocab_size, d_model, num_heads, num_encoder_layers,
            d_ff, max_len, dropout
        )

        # Target-side stack
        self.decoder = TransformerDecoder(
            tgt_vocab_size, d_model, num_heads, num_decoder_layers,
            d_ff, max_len, dropout
        )

    def make_src_mask(self, src, src_pad_idx=0):
        """
        Build a boolean mask that is False at source padding positions.

        Args:
            src: Source sequence of shape (batch_size, src_seq_len)
            src_pad_idx: Index of padding token

        Returns:
            Source mask of shape (batch_size, 1, 1, src_seq_len)
        """
        is_real_token = src != src_pad_idx
        # Insert two singleton axes after the batch axis
        return is_real_token[:, None, None]

    def make_tgt_mask(self, tgt, tgt_pad_idx=0):
        """
        Build a boolean target mask that is True only where a key position is
        both a non-padding token and not later than the query position.

        Args:
            tgt: Target sequence of shape (batch_size, tgt_seq_len)
            tgt_pad_idx: Index of padding token

        Returns:
            Target mask of shape (batch_size, 1, tgt_seq_len, tgt_seq_len)
        """
        _, tgt_len = tgt.shape

        # Padding part: (batch_size, 1, 1, tgt_seq_len)
        is_real_token = (tgt != tgt_pad_idx)[:, None, None]

        # Causal part: lower-triangular (tgt_seq_len, tgt_seq_len)
        no_look_ahead = torch.ones((tgt_len, tgt_len), device=tgt.device).tril().bool()

        # Broadcasting the AND gives (batch_size, 1, tgt_seq_len, tgt_seq_len)
        return is_real_token & no_look_ahead

    def forward(self, src, tgt, src_pad_idx=0, tgt_pad_idx=0):
        """
        Run the full encoder-decoder pass.

        Args:
            src: Source sequence of shape (batch_size, src_seq_len)
            tgt: Target sequence of shape (batch_size, tgt_seq_len)
            src_pad_idx: Source padding token index
            tgt_pad_idx: Target padding token index

        Returns:
            Output logits of shape (batch_size, tgt_seq_len, tgt_vocab_size)
        """
        # Masks are derived from the raw token indices
        src_mask = self.make_src_mask(src, src_pad_idx)
        tgt_mask = self.make_tgt_mask(tgt, tgt_pad_idx)

        # Encode the source, then decode the target against it; the source
        # mask is reused by the decoder
        memory = self.encoder(src, src_mask)
        return self.decoder(tgt, memory, src_mask, tgt_mask)

In [11]:
def create_sample_transformer():
    """
    Build a Transformer with a fixed set of example hyperparameters.

    Configuration used:
    - source and target vocabularies of 10000 tokens each
    - d_model = 512, num_heads = 8, d_ff = 2048
    - 6 encoder layers and 6 decoder layers
    - max_len = 100, dropout = 0.1

    Returns:
        The freshly constructed Transformer instance
    """
    hyperparameters = dict(
        src_vocab_size=10000,  # Source language vocabulary
        tgt_vocab_size=10000,  # Target language vocabulary
        d_model=512,
        num_heads=8,
        num_encoder_layers=6,
        num_decoder_layers=6,
        d_ff=2048,
        max_len=100,
        dropout=0.1,
    )
    return Transformer(**hyperparameters)


In [12]:
# Build the sample model; as the cell's last expression, the notebook displays
# the returned module's layer tree.
create_sample_transformer()

Transformer(
  (encoder): TransformerEncoder(
    (embedding): Embedding(10000, 512)
    (pos_encoding): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (layers): ModuleList(
      (0-5): 6 x EncoderLayer(
        (self_attn): MultiHeadAttention(
          (W_q): Linear(in_features=512, out_features=512, bias=True)
          (W_k): Linear(in_features=512, out_features=512, bias=True)
          (W_v): Linear(in_features=512, out_features=512, bias=True)
          (W_o): Linear(in_features=512, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (feed_forward): PositionWiseFeedForward(
          (linear1): Linear(in_features=512, out_features=2048, bias=True)
          (linear2): Linear(in_features=2048, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((512,), eps=1e-05, elemen