<a href="https://colab.research.google.com/github/Hasinireddy-Ainavole/attention-and-transformers/blob/main/attention_and_transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

def scaled_dot_product_attention(Q, K, V, mask=None):
    """
    Compute scaled dot-product attention: softmax(Q · K^T / sqrt(d_k)) · V.

    The last two axes of each input are treated as (sequence, feature); any
    leading axes are batch axes and are broadcast by ``np.matmul``. This means
    the usual (batch_size, seq_len, d) inputs work, and so do unbatched
    (seq_len, d) and multi-head (batch_size, num_heads, seq_len, d) inputs.

    Args:
        Q: Query array of shape (..., seq_len_q, d_k)
        K: Key array of shape (..., seq_len_k, d_k)
        V: Value array of shape (..., seq_len_k, d_v); it must have one row
            per key because the weights over keys are used to mix its rows
        mask: Optional array broadcastable to (..., seq_len_q, seq_len_k).
            Positions where ``mask == 0`` are excluded from attention.

    Returns:
        context: Context vectors of shape (..., seq_len_q, d_v)
        attention_weights: Attention weights of shape (..., seq_len_q, seq_len_k)
    """
    # Get the dimension of the key vectors
    d_k = K.shape[-1]

    # Step 1: Compute attention scores (Q · K^T)
    # Only the last two axes of K are swapped, so the number of leading
    # batch axes does not matter.
    scores = np.matmul(Q, np.swapaxes(K, -1, -2))

    # Step 2: Scale by sqrt(d_k)
    scaled_scores = scores / np.sqrt(d_k)

    # Step 3: Apply mask if provided (optional)
    # A large negative score becomes a weight of exactly 0 after softmax.
    if mask is not None:
        scaled_scores = np.where(mask == 0, -1e9, scaled_scores)

    # Step 4: Apply softmax to get attention weights
    # Softmax is applied along the last dimension (over keys)
    attention_weights = softmax(scaled_scores, axis=-1)

    # Step 5: Compute context vector (weighted sum of values)
    context = np.matmul(attention_weights, V)

    return context, attention_weights


def softmax(x, axis=-1):
    """
    Normalize x into a probability distribution along ``axis``.

    The maximum along ``axis`` is subtracted before exponentiating. This
    leaves the result mathematically unchanged but keeps ``np.exp`` from
    overflowing on large inputs.
    """
    peak = np.max(x, axis=axis, keepdims=True)
    unnormalized = np.exp(x - peak)
    normalizer = np.sum(unnormalized, axis=axis, keepdims=True)
    return unnormalized / normalizer


# Example usage and testing
# Runs the NumPy attention function on random inputs, prints the results and
# asserts the invariants the printed text claims (rows sum to 1, future
# positions get zero weight under a causal mask).
if __name__ == "__main__":
    # Set random seed for reproducibility
    np.random.seed(42)

    # Define dimensions
    batch_size = 2
    seq_len_q = 4  # Query sequence length
    seq_len_k = 4  # Key sequence length (same as value)
    d_k = 8        # Dimension of key/query vectors
    d_v = 8        # Dimension of value vectors

    # Create random query, key, value matrices
    # (the Q, K, V draw order is kept so the seeded values stay the same)
    Q = np.random.randn(batch_size, seq_len_q, d_k)
    K = np.random.randn(batch_size, seq_len_k, d_k)
    V = np.random.randn(batch_size, seq_len_k, d_v)

    print("Input Shapes:")
    print(f"Q shape: {Q.shape}")
    print(f"K shape: {K.shape}")
    print(f"V shape: {V.shape}")
    print()

    # Compute attention
    context, attention_weights = scaled_dot_product_attention(Q, K, V)

    print("Output Shapes:")
    print(f"Context shape: {context.shape}")
    print(f"Attention weights shape: {attention_weights.shape}")
    print()

    print("Attention Weights (first batch):")
    print(attention_weights[0])
    print()

    # Verify that attention weights sum to 1 along the last dimension
    print("Sum of attention weights (should be ~1.0 for each query):")
    print(np.sum(attention_weights[0], axis=-1))
    print()
    # Enforce the invariant for every batch entry instead of only showing it.
    np.testing.assert_allclose(np.sum(attention_weights, axis=-1), 1.0)

    print("Context Vector (first batch, first query):")
    print(context[0, 0])

    # Example with masking (e.g., for causal attention)
    print("\n" + "="*50)
    print("Example with Causal Masking:")
    print("="*50)

    # Create causal mask (lower triangular matrix) for every batch entry.
    # np.tril acts on the last two axes, so one call builds the batched mask.
    causal_mask = np.tril(np.ones((batch_size, seq_len_q, seq_len_k)))

    context_masked, attention_weights_masked = scaled_dot_product_attention(
        Q, K, V, mask=causal_mask
    )

    # Masked (future) positions must get exactly zero weight, and the
    # remaining weights must still form a distribution.
    assert np.all(attention_weights_masked[causal_mask == 0] == 0.0), \
        "Future positions received non-zero attention!"
    np.testing.assert_allclose(np.sum(attention_weights_masked, axis=-1), 1.0)

    print("\nCausal Attention Weights (first batch):")
    print(attention_weights_masked[0])
    print("\nNote: Upper triangle is zero (future positions masked)")

Input Shapes:
Q shape: (2, 4, 8)
K shape: (2, 4, 8)
V shape: (2, 4, 8)

Output Shapes:
Context shape: (2, 4, 8)
Attention weights shape: (2, 4, 4)

Attention Weights (first batch):
[[0.50258161 0.07125796 0.29559999 0.13056044]
 [0.1356621  0.4668456  0.09868613 0.29880617]
 [0.04296256 0.73176166 0.07266276 0.15261301]
 [0.54250442 0.19083811 0.14071448 0.12594299]]

Sum of attention weights (should be ~1.0 for each query):
[1. 1. 1. 1.]

Context Vector (first batch, first query):
[-0.01769307 -0.01455707 -1.04874428 -0.53684339 -0.11980371  0.48081568
 -0.65812311  0.98025517]

Example with Causal Masking:

Causal Attention Weights (first batch):
[[1.         0.         0.         0.        ]
 [0.22516244 0.77483756 0.         0.        ]
 [0.05070005 0.86355075 0.0857492  0.        ]
 [0.54250442 0.19083811 0.14071448 0.12594299]]

Note: Upper triangle is zero (future positions masked)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class MultiHeadAttention(nn.Module):
    """Multi-Head Self-Attention mechanism.

    The input is projected to queries, keys and values, split into
    ``num_heads`` heads of size ``d_model // num_heads``, attended per head
    with scaled dot-product attention, then concatenated and projected back
    to ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        """
        Args:
            d_model: Model dimension (embedding size)
            num_heads: Number of attention heads; must divide d_model
        """
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # Dimension per head

        # Linear projections for Q, K, V
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # Output projection
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        Compute scaled dot-product attention.

        Args:
            Q, K, V: Shape (batch_size, num_heads, seq_len, d_k)
            mask: Optional tensor broadcastable to
                (batch_size, num_heads, seq_len, seq_len). Positions where
                ``mask == 0`` are excluded from attention.

        Returns:
            context: Shape (batch_size, num_heads, seq_len, d_k)
            attention_weights: Shape (batch_size, num_heads, seq_len, seq_len)
        """
        # Compute attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Apply mask if provided
        if mask is not None:
            # Fill with the most negative value the dtype can hold. A fixed
            # -1e9 cannot be represented in half precision, while the result
            # is the same for float32: masked positions get zero weight.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        # Apply softmax
        attention_weights = F.softmax(scores, dim=-1)

        # Compute context
        context = torch.matmul(attention_weights, V)

        return context, attention_weights

    def forward(self, x, mask=None):
        """
        Args:
            x: Input tensor of shape (batch_size, seq_len, d_model)
            mask: Optional attention mask; positions where ``mask == 0`` are
                blocked. Accepted shapes:
                  - (seq_len, seq_len): shared by all batch entries and heads
                  - (batch_size, seq_len, seq_len): one mask per batch entry,
                    shared by all heads
                  - any 4-D tensor broadcastable to
                    (batch_size, num_heads, seq_len, seq_len)

        Returns:
            output: Shape (batch_size, seq_len, d_model)
        """
        batch_size, seq_len, d_model = x.size()

        # Linear projections and split into multiple heads
        # Shape: (batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, d_k)
        Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

        # A 3-D mask is per batch entry. Without an explicit head axis,
        # broadcasting would line its batch axis up with the head axis of the
        # scores, which fails or silently masks per head when the sizes match.
        if mask is not None and mask.dim() == 3:
            mask = mask.unsqueeze(1)

        # Apply attention
        context, attention_weights = self.scaled_dot_product_attention(Q, K, V, mask)

        # Concatenate heads
        # Shape: (batch_size, num_heads, seq_len, d_k) -> (batch_size, seq_len, d_model)
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)

        # Final linear projection
        output = self.W_o(context)

        return output


class FeedForwardNetwork(nn.Module):
    """Position-wise MLP: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        Build the two linear layers and the dropout between them.

        Args:
            d_model: Size of the input and output features
            d_ff: Size of the hidden layer
            dropout: Dropout probability applied to the hidden activations
        """
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Apply the feed-forward transform to the last dimension of x.

        Args:
            x: Input tensor of shape (batch_size, seq_len, d_model)

        Returns:
            Tensor of shape (batch_size, seq_len, d_model)
        """
        hidden = F.relu(self.linear1(x))  # expand to d_ff and rectify
        hidden = self.dropout(hidden)     # active only in training mode
        return self.linear2(hidden)       # project back to d_model


class TransformerEncoderBlock(nn.Module):
    """One encoder layer: self-attention and a feed-forward network, each
    followed by dropout, a residual connection and layer normalization."""

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        """
        Create the two sub-layers plus their normalization and dropout modules.

        Args:
            d_model: Model dimension (embedding size)
            num_heads: Number of attention heads
            d_ff: Hidden dimension of the feed-forward network
            dropout: Dropout rate used in the feed-forward network and on
                both sub-layer outputs
        """
        super().__init__()

        # Sub-layers
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForwardNetwork(d_model, d_ff, dropout)

        # One LayerNorm per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        # One Dropout per sub-layer output
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """
        Run the block on a batch of sequences.

        Args:
            x: Input tensor of shape (batch_size, seq_len, d_model)
            mask: Optional attention mask passed to the attention sub-layer

        Returns:
            Tensor of shape (batch_size, seq_len, d_model)
        """
        # Sub-layer 1: self-attention, then Add & Norm
        attended = self.dropout1(self.attention(x, mask))
        x = self.norm1(x + attended)

        # Sub-layer 2: feed-forward, then Add & Norm
        transformed = self.dropout2(self.ffn(x))
        return self.norm2(x + transformed)


# Example usage and testing
# Builds one encoder block, runs it on random input in training and evaluation
# mode, prints a summary and asserts the properties the printed text reports.
if __name__ == "__main__":
    # Set random seed for reproducibility
    torch.manual_seed(42)

    # Define dimensions as per the question
    batch_size = 32
    seq_len = 10
    d_model = 512      # Model dimension
    num_heads = 8      # Number of attention heads
    d_ff = 2048        # Feed-forward hidden dimension
    dropout = 0.1

    print("="*60)
    print("Transformer Encoder Block - Configuration")
    print("="*60)
    print(f"Batch size: {batch_size}")
    print(f"Sequence length: {seq_len}")
    print(f"Model dimension (d_model): {d_model}")
    print(f"Number of heads: {num_heads}")
    print(f"Feed-forward dimension (d_ff): {d_ff}")
    print(f"Dropout rate: {dropout}")
    print()

    # Initialize the encoder block
    encoder_block = TransformerEncoderBlock(d_model, num_heads, d_ff, dropout)

    # Create sample input (batch of 32 sentences, each with 10 tokens)
    x = torch.randn(batch_size, seq_len, d_model)

    print("Input shape:", x.shape)
    print()

    # Forward pass (the module is in training mode here, so dropout is active)
    output = encoder_block(x)

    print("="*60)
    print("Output Verification")
    print("="*60)
    print(f"Output shape: {output.shape}")
    print(f"Expected shape: torch.Size([{batch_size}, {seq_len}, {d_model}])")
    print()

    # Verify output shape
    assert output.shape == (batch_size, seq_len, d_model), "Output shape mismatch!"
    print("✓ Output shape verification passed!")
    print()

    # Display model architecture
    print("="*60)
    print("Model Architecture Summary")
    print("="*60)
    print(encoder_block)
    print()

    # Count parameters
    total_params = sum(p.numel() for p in encoder_block.parameters())
    trainable_params = sum(p.numel() for p in encoder_block.parameters() if p.requires_grad)

    print("="*60)
    print("Parameter Count")
    print("="*60)
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")
    print()

    # Test with evaluation mode
    # Dropout is disabled in eval mode, so two passes over the same input
    # should produce the same output.
    encoder_block.eval()
    with torch.no_grad():
        output_eval = encoder_block(x)
        output_eval_repeat = encoder_block(x)

    print("="*60)
    print("Evaluation Mode Test")
    print("="*60)
    print(f"Evaluation output shape: {output_eval.shape}")

    # Check the result before reporting success.
    assert output_eval.shape == (batch_size, seq_len, d_model), \
        "Evaluation output shape mismatch!"
    assert torch.allclose(output_eval, output_eval_repeat), \
        "Evaluation output is not deterministic!"
    print("✓ Evaluation mode test passed!")

Transformer Encoder Block - Configuration
Batch size: 32
Sequence length: 10
Model dimension (d_model): 512
Number of heads: 8
Feed-forward dimension (d_ff): 2048
Dropout rate: 0.1

Input shape: torch.Size([32, 10, 512])

Output Verification
Output shape: torch.Size([32, 10, 512])
Expected shape: torch.Size([32, 10, 512])

✓ Output shape verification passed!

Model Architecture Summary
TransformerEncoderBlock(
  (attention): MultiHeadAttention(
    (W_q): Linear(in_features=512, out_features=512, bias=True)
    (W_k): Linear(in_features=512, out_features=512, bias=True)
    (W_v): Linear(in_features=512, out_features=512, bias=True)
    (W_o): Linear(in_features=512, out_features=512, bias=True)
  )
  (ffn): FeedForwardNetwork(
    (linear1): Linear(in_features=512, out_features=2048, bias=True)
    (linear2): Linear(in_features=2048, out_features=512, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
  (norm2):