In [20]:
import numpy as np
import matplotlib.pyplot as plt

In [21]:
# Define softmax function
def softmax(x):
    """
    Numerically stable softmax over the last axis.

    Args:
        x: array-like of scores; normalization runs along axis -1

    Returns:
        Array with the same shape as x whose last-axis slices sum to 1
    """
    # Subtracting the per-row maximum leaves the result unchanged but keeps
    # np.exp from overflowing on large scores.
    row_peak = np.max(x, axis=-1, keepdims=True)
    unnormalized = np.exp(x - row_peak)
    normalizer = unnormalized.sum(axis=-1, keepdims=True)
    return unnormalized / normalizer

In [None]:
def scaled_dot_product_attention(Q, K, V, mask=None):
    """
    Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V

    Works for a single head (2-D inputs) as well as for stacked heads or
    batches: any leading dimensions are broadcast by the matrix products.
    Query and key/value sequence lengths may differ (as in cross-attention).

    Args:
        Q: queries (..., seq_len_q, d_k)
        K: keys (..., seq_len_k, d_k)
        V: values (..., seq_len_k, d_v)
        mask: optional additive mask broadcastable to
            (..., seq_len_q, seq_len_k); it is added to the scores before
            the softmax, so 0 keeps a position and a large negative value
            suppresses it

    Returns:
        output: (..., seq_len_q, d_v)
        attention_weights: (..., seq_len_q, seq_len_k)
    """
    d_k = Q.shape[-1]

    # Swap only the last two axes of K. For 2-D input this equals K.T, but
    # unlike K.T it does not reverse leading (head/batch) axes.
    scores = (Q @ np.swapaxes(K, -1, -2)) / np.sqrt(d_k)

    if mask is not None:
        scores = scores + mask

    attention_weights = softmax(scores)
    output = attention_weights @ V

    return output, attention_weights

In [None]:
def multi_head_attention(X, d_model, num_heads, mask=None):
    """
    Multi-head self-attention mechanism

    The projection matrices are drawn from np.random on every call (they
    stand in for learned parameters), so two calls on the same input give
    different outputs unless the global NumPy seed is reset in between.

    Args:
        X: input sequence (seq_len, d_model)
        d_model: total dimension
        num_heads: number of attention heads
        mask: optional additive mask (seq_len, seq_len) applied to every
            head's scores, e.g. a look-ahead mask; None means no masking

    Returns:
        output: (seq_len, d_model)
        all_attention_weights: list of (seq_len, seq_len) for each head

    Raises:
        AssertionError: if d_model is not divisible by num_heads
    """
    seq_len, _ = X.shape

    # Ensure d_model is divisible by num_heads
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    d_k = d_model // num_heads  # Dimension per head

    # Initialize weight matrices for all heads
    # In practice, these would be learned parameters
    W_q = np.random.randn(d_model, d_model) * 0.01
    W_k = np.random.randn(d_model, d_model) * 0.01
    W_v = np.random.randn(d_model, d_model) * 0.01
    W_o = np.random.randn(d_model, d_model) * 0.01  # Output projection

    def split_heads(projected):
        # (seq_len, d_model) -> (seq_len, num_heads, d_k) -> (num_heads, seq_len, d_k)
        return projected.reshape(seq_len, num_heads, d_k).transpose(1, 0, 2)

    # Project once for ALL heads, then separate the heads
    Q = split_heads(X @ W_q)
    K = split_heads(X @ W_k)
    V = split_heads(X @ W_v)

    # Apply attention to each head (the same mask is shared by all heads)
    head_outputs = []
    all_attention_weights = []

    for i in range(num_heads):
        head_output, attn_weights = scaled_dot_product_attention(
            Q[i], K[i], V[i], mask=mask
        )
        head_outputs.append(head_output)
        all_attention_weights.append(attn_weights)

    # Concatenate all heads
    # Stacking on axis 1 gives (seq_len, num_heads, d_k), so the reshape
    # lays each position's heads side by side: (seq_len, d_model)
    concat_heads = np.stack(head_outputs, axis=1).reshape(seq_len, d_model)

    # Final linear projection
    output = concat_heads @ W_o

    return output, all_attention_weights

In [None]:
def positional_encoding(seq_len, d_model):
    """
    Generate sinusoidal positional encoding

    Even columns hold sin(pos * w_i) and odd columns hold cos(pos * w_i),
    with w_i = 10000 ** (-2i / d_model). Both even and odd d_model are
    supported; for odd d_model the last column is a sine column with no
    cosine partner.

    Args:
        seq_len: length of sequence
        d_model: dimension of embeddings

    Returns:
        pos_encoding: (seq_len, d_model)
    """
    # Position indices [0, 1, 2, ..., seq_len-1] as a column vector
    position = np.arange(seq_len)[:, np.newaxis]  # (seq_len, 1)

    # One frequency per even column index 0, 2, 4, ...: ceil(d_model / 2) entries
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))

    # Angles are shared by the sine and cosine columns, so compute them once
    angles = position * div_term  # (seq_len, ceil(d_model / 2))

    pos_encoding = np.zeros((seq_len, d_model))

    # Apply sine to even indices
    pos_encoding[:, 0::2] = np.sin(angles)

    # Apply cosine to odd indices. There are only d_model // 2 odd columns,
    # one fewer than the even columns when d_model is odd, so trim the
    # angles to avoid a broadcast error.
    pos_encoding[:, 1::2] = np.cos(angles[:, : d_model // 2])

    return pos_encoding

In [None]:
def layer_norm(x, epsilon=1e-6):
    """
    Normalize each row to zero mean and (approximately) unit spread.

    No learned gain or bias is applied. epsilon is added to the standard
    deviation itself (not to the variance) to avoid division by zero.

    Args:
        x: input (seq_len, d_model)
        epsilon: small constant for numerical stability

    Returns:
        normalized: (seq_len, d_model)
    """
    # Statistics are taken over the feature axis, independently per position
    mu = np.mean(x, axis=-1, keepdims=True)
    sigma = np.std(x, axis=-1, keepdims=True)

    centered = x - mu
    spread = sigma + epsilon
    return centered / spread

In [None]:
def feed_forward(x, d_model, d_ff):
    """
    Position-wise feed-forward network: expand, ReLU, project back.
    FFN(x) = ReLU(xW1 + b1)W2 + b2

    Weights are freshly sampled from np.random on every call (stand-ins
    for learned parameters); both biases are zero.

    Args:
        x: input (seq_len, d_model)
        d_model: model dimension
        d_ff: hidden dimension (typically 4 * d_model)

    Returns:
        output: (seq_len, d_model)
    """
    init_scale = 0.01

    # Sampling order (expansion weights, then projection weights) is kept
    # fixed so seeded runs stay reproducible.
    w_expand = init_scale * np.random.randn(d_model, d_ff)
    w_project = init_scale * np.random.randn(d_ff, d_model)
    bias_expand = np.zeros(d_ff)
    bias_project = np.zeros(d_model)

    # Expand to d_ff and clamp negatives to zero (ReLU)
    pre_activation = x @ w_expand + bias_expand
    activated = np.maximum(0, pre_activation)

    # Project back down to d_model
    return activated @ w_project + bias_project

In [None]:
def encoder_layer(x, d_model, num_heads, d_ff):
    """
    One Transformer encoder block: self-attention and a feed-forward
    network, each wrapped in a residual connection followed by layer norm.

    Args:
        x: input (seq_len, d_model)
        d_model: model dimension
        num_heads: number of attention heads
        d_ff: feed-forward hidden dimension

    Returns:
        output: (seq_len, d_model)
    """
    # Sub-layer 1: multi-head self-attention (attention weights are discarded)
    attended = multi_head_attention(x, d_model, num_heads)[0]
    after_attention = layer_norm(x + attended)

    # Sub-layer 2: position-wise feed-forward network
    transformed = feed_forward(after_attention, d_model, d_ff)
    return layer_norm(after_attention + transformed)

In [None]:
def transformer_encoder(x, num_layers, d_model, num_heads, d_ff):
    """
    Run the input through num_layers encoder layers in sequence.

    Args:
        x: input with positional encoding (seq_len, d_model)
        num_layers: number of encoder layers to stack
        d_model: model dimension
        num_heads: number of attention heads
        d_ff: feed-forward hidden dimension

    Returns:
        output: (seq_len, d_model)
    """
    hidden = x
    # Each layer consumes the previous layer's output
    for _ in range(num_layers):
        hidden = encoder_layer(hidden, d_model, num_heads, d_ff)
    return hidden

In [None]:
# Test encoding: run one encoder layer and the full stack on random input
# and verify that the (seq_len, d_model) shape is preserved.
if __name__ == "__main__":
    # Parameters (same as original Transformer paper)
    seq_len = 10
    d_model = 512
    num_heads = 8
    d_ff = 2048  # Typically 4 * d_model
    num_layers = 6

    # Create input sequence (random embeddings)
    x = np.random.randn(seq_len, d_model)

    # Add positional encoding
    pe = positional_encoding(seq_len, d_model)
    x_with_pos = x + pe

    print("=" * 50)
    print("TRANSFORMER ENCODER TEST")
    print("=" * 50)

    # Test single encoder layer
    print("\n1. Single Encoder Layer:")
    layer_output = encoder_layer(x_with_pos, d_model, num_heads, d_ff)
    print(f"   Input shape: {x_with_pos.shape}")
    print(f"   Output shape: {layer_output.shape}")
    # Actually check the claim before printing the success line
    assert layer_output.shape == x_with_pos.shape, "encoder layer changed the shape"
    print("   ✓ Shapes match!")

    # Test full encoder (num_layers layers stacked); the label follows the
    # configured depth so it cannot drift from num_layers
    print(f"\n2. Full Encoder ({num_layers} layers):")
    encoder_output = transformer_encoder(
        x_with_pos, num_layers, d_model, num_heads, d_ff
    )
    print(f"   Input shape: {x_with_pos.shape}")
    print(f"   Output shape: {encoder_output.shape}")
    assert encoder_output.shape == x_with_pos.shape, "encoder stack changed the shape"
    print("   ✓ Shapes match!")

    # Verify output statistics (the final layer norm should give mean ~0, std ~1)
    print("\n3. Output Statistics:")
    print(f"   Mean: {encoder_output.mean():.4f}")
    print(f"   Std: {encoder_output.std():.4f}")
    print(f"   Min: {encoder_output.min():.4f}")
    print(f"   Max: {encoder_output.max():.4f}")

    print("\n" + "=" * 50)
    print("✓ ENCODER COMPLETE!")
    print("=" * 50)

TRANSFORMER ENCODER TEST

1. Single Encoder Layer:
   Input shape: (10, 512)
   Output shape: (10, 512)
   ✓ Shapes match!

2. Full Encoder (6 layers):
   Input shape: (10, 512)
   Output shape: (10, 512)
   ✓ Shapes match!

3. Output Statistics:
   Mean: -0.0000
   Std: 1.0000
   Min: -3.7993
   Max: 3.5101

✓ ENCODER COMPLETE!


In [None]:
def create_look_ahead_mask(seq_len):
    """
    Build the additive causal mask that blocks attention to later positions.

    Args:
        seq_len: sequence length

    Returns:
        mask: (seq_len, seq_len) float array; 0 on and below the diagonal,
            -1e9 (a finite stand-in for -inf) strictly above it
    """
    blocked_score = -1e9

    # Start fully open, then close every (row, col) pair with col > row
    mask = np.zeros((seq_len, seq_len))
    future_rows, future_cols = np.triu_indices(seq_len, k=1)
    mask[future_rows, future_cols] = blocked_score
    return mask

In [32]:
def cross_attention(decoder_input, encoder_output, d_model, num_heads):
    """
    Multi-head cross-attention: queries come from the decoder, keys and
    values come from the encoder output.

    All four projection matrices are sampled from np.random on every call.

    Args:
        decoder_input: queries from decoder (seq_len_dec, d_model)
        encoder_output: keys/values from encoder (seq_len_enc, d_model)
        d_model: model dimension
        num_heads: number of attention heads

    Returns:
        output: (seq_len_dec, d_model)
        attention_weights: list of attention matrices, one
            (seq_len_dec, seq_len_enc) array per head
    """
    n_dec, _ = decoder_input.shape
    n_enc, _ = encoder_output.shape

    assert d_model % num_heads == 0
    head_dim = d_model // num_heads

    # Sampled in the order query, key, value, output
    w_query, w_key, w_value, w_out = (
        np.random.randn(d_model, d_model) * 0.01 for _ in range(4)
    )

    def to_heads(flat, length):
        # (length, d_model) -> (num_heads, length, head_dim)
        return flat.reshape(length, num_heads, head_dim).transpose(1, 0, 2)

    queries = to_heads(decoder_input @ w_query, n_dec)
    keys = to_heads(encoder_output @ w_key, n_enc)
    values = to_heads(encoder_output @ w_value, n_enc)

    # One (output, weights) pair per head
    per_head = [
        scaled_dot_product_attention(q, k, v)
        for q, k, v in zip(queries, keys, values)
    ]
    head_outputs = [head_out for head_out, _ in per_head]
    all_attention_weights = [head_weights for _, head_weights in per_head]

    # Lay the heads side by side per decoder position, then mix them
    merged = np.stack(head_outputs, axis=1).reshape(n_dec, d_model)
    return merged @ w_out, all_attention_weights

In [None]:
def decoder_layer(x, encoder_output, d_model, num_heads, d_ff):
    """
    Single Transformer decoder layer

    Three sub-layers, each followed by a residual connection and layer norm:
    masked self-attention, cross-attention over the encoder output, and a
    position-wise feed-forward network. All weights are sampled from
    np.random on every call.

    Args:
        x: decoder input (seq_len, d_model)
        encoder_output: output from encoder (seq_len_enc, d_model)
        d_model: model dimension
        num_heads: number of attention heads
        d_ff: feed-forward hidden dimension

    Returns:
        output: (seq_len, d_model)

    Raises:
        AssertionError: if d_model is not divisible by num_heads
    """
    # Fail early with a clear message instead of a reshape error further down
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    seq_len = x.shape[0]
    d_k = d_model // num_heads  # Dimension per head

    # 1. Masked self-attention (decoder can't look ahead)
    mask = create_look_ahead_mask(seq_len)

    # The multi-head attention is computed inline here so that the
    # look-ahead mask can be handed to every head.
    W_q = np.random.randn(d_model, d_model) * 0.01
    W_k = np.random.randn(d_model, d_model) * 0.01
    W_v = np.random.randn(d_model, d_model) * 0.01
    W_o = np.random.randn(d_model, d_model) * 0.01

    # (seq_len, d_model) -> (num_heads, seq_len, d_k)
    Q = (x @ W_q).reshape(seq_len, num_heads, d_k).transpose(1, 0, 2)
    K = (x @ W_k).reshape(seq_len, num_heads, d_k).transpose(1, 0, 2)
    V = (x @ W_v).reshape(seq_len, num_heads, d_k).transpose(1, 0, 2)

    # Per-head attention; only the outputs are needed, weights are dropped
    head_outputs = [
        scaled_dot_product_attention(Q[i], K[i], V[i], mask=mask)[0]
        for i in range(num_heads)
    ]

    # Concatenate heads per position and apply the output projection
    masked_attn_output = np.stack(head_outputs, axis=1).reshape(seq_len, d_model) @ W_o

    # Add & Norm
    x = layer_norm(x + masked_attn_output)

    # 2. Cross-attention to encoder output
    cross_attn_output, _ = cross_attention(x, encoder_output, d_model, num_heads)

    # Add & Norm
    x = layer_norm(x + cross_attn_output)

    # 3. Feed-forward network
    ff_output = feed_forward(x, d_model, d_ff)

    # Add & Norm
    output = layer_norm(x + ff_output)

    return output

In [None]:
def transformer_decoder(x, encoder_output, num_layers, d_model, num_heads, d_ff):
    """
    Run the decoder input through num_layers decoder layers in sequence.
    Every layer attends to the same encoder output.

    Args:
        x: decoder input with positional encoding (seq_len, d_model)
        encoder_output: output from encoder (seq_len_enc, d_model)
        num_layers: number of decoder layers
        d_model: model dimension
        num_heads: number of attention heads
        d_ff: feed-forward hidden dimension

    Returns:
        output: (seq_len, d_model)
    """
    hidden = x
    # Each layer consumes the previous layer's output
    for _ in range(num_layers):
        hidden = decoder_layer(hidden, encoder_output, d_model, num_heads, d_ff)
    return hidden

In [None]:
# Test decoding: run the encoder, then one decoder layer and the full
# decoder stack on random input, and show the look-ahead mask.
if __name__ == "__main__":
    # Parameters
    seq_len_enc = 10  # Encoder sequence length
    seq_len_dec = 8  # Decoder sequence length (can be different)
    d_model = 512
    num_heads = 8
    d_ff = 2048
    num_layers = 6

    print("=" * 50)
    print("TRANSFORMER DECODER TEST")
    print("=" * 50)

    # Create encoder input and get encoder output
    print("\n1. Running Encoder:")
    encoder_input = np.random.randn(seq_len_enc, d_model)
    pe_enc = positional_encoding(seq_len_enc, d_model)
    encoder_input_with_pos = encoder_input + pe_enc
    encoder_output = transformer_encoder(
        encoder_input_with_pos, num_layers, d_model, num_heads, d_ff
    )
    print(f"   Encoder output shape: {encoder_output.shape}")

    # Create decoder input
    print("\n2. Running Decoder:")
    decoder_input = np.random.randn(seq_len_dec, d_model)
    pe_dec = positional_encoding(seq_len_dec, d_model)
    decoder_input_with_pos = decoder_input + pe_dec

    # Test single decoder layer
    print("\n3. Single Decoder Layer:")
    layer_output = decoder_layer(
        decoder_input_with_pos, encoder_output, d_model, num_heads, d_ff
    )
    print(f"   Decoder input shape: {decoder_input_with_pos.shape}")
    print(f"   Encoder output shape: {encoder_output.shape}")
    print(f"   Decoder layer output shape: {layer_output.shape}")

    # Test full decoder; the label follows the configured depth so it
    # cannot drift from num_layers
    print(f"\n4. Full Decoder ({num_layers} layers):")
    decoder_output = transformer_decoder(
        decoder_input_with_pos, encoder_output, num_layers, d_model, num_heads, d_ff
    )
    print(f"   Decoder output shape: {decoder_output.shape}")

    # Test look-ahead mask
    print("\n5. Look-Ahead Mask (prevents future attention):")
    mask = create_look_ahead_mask(5)
    print("   First 5x5 positions:")
    print(mask)
    # The mask uses a large finite negative value, not -inf, so say so
    print("   (0 = can attend, -1e9 = masked)")

    print("\n6. Output Statistics:")
    print(f"   Mean: {decoder_output.mean():.4f}")
    print(f"   Std: {decoder_output.std():.4f}")
    print(f"   Min: {decoder_output.min():.4f}")
    print(f"   Max: {decoder_output.max():.4f}")

    print("\n" + "=" * 50)
    print("✓ DECODER COMPLETE!")
    print("=" * 50)

TRANSFORMER DECODER TEST

1. Running Encoder:
   Encoder output shape: (10, 512)

2. Running Decoder:

3. Single Decoder Layer:
   Decoder input shape: (8, 512)
   Encoder output shape: (10, 512)
   Decoder layer output shape: (8, 512)

4. Full Decoder (6 layers):
   Decoder output shape: (8, 512)

5. Look-Ahead Mask (prevents future attention):
   First 5x5 positions:
[[ 0.e+00 -1.e+09 -1.e+09 -1.e+09 -1.e+09]
 [ 0.e+00  0.e+00 -1.e+09 -1.e+09 -1.e+09]
 [ 0.e+00  0.e+00  0.e+00 -1.e+09 -1.e+09]
 [ 0.e+00  0.e+00  0.e+00  0.e+00 -1.e+09]
 [ 0.e+00  0.e+00  0.e+00  0.e+00  0.e+00]]
   (0 = can attend, -inf = masked)

6. Output Statistics:
   Mean: 0.0000
   Std: 1.0000
   Min: -3.5534
   Max: 3.0543

✓ DECODER COMPLETE!
