# Imports

In [245]:
import numpy as np
from typing import Optional, Tuple

# Implement a numerically stable softmax

In [246]:
def softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
    """Compute the softmax of each element along an axis of X."""
    # Subtract the max for numerical stability
    x_max = np.max(x, axis=axis, keepdims=True)
    e_x = np.exp(x - x_max)
    sum_e_x = np.sum(e_x, axis=axis, keepdims=True)
    return e_x / sum_e_x

In [247]:
# Testing softmax function
x = np.array([1,2,3,0])
print("Softmax result:\n", softmax(x, axis=-1))
print("Sum along axis -1 (should be 1):\n", np.sum(softmax(x, axis=-1), axis=-1))

Softmax result:
 [0.08714432 0.23688282 0.64391426 0.0320586 ]
Sum along axis -1 (should be 1):
 1.0


# Implement the scaled dot-prodcut attention mechanism

$$\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$$

In [248]:
def scaled_dot_product_attention(Q: np.ndarray, K: np.ndarray, V: np.ndarray, mask: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
    """Compute scaled dot-product attention.
    Args:
        Q: queries shape (batch, seq_len_q, depth)
        K: keys shape (batch, seq_len_k, depth)
        V: values shape (batch, seq_len_v, depth_v)
        mask: optional mask broadcastable to (batch, seq_len_q, seq_len_k) where masked positions are True/1
    Returns:
        output: shape (batch, seq_len_q, depth_v)
        attention_weights: shape (batch, seq_len_q, seq_len_k)
    Notes:
        This implementation assumes K and V have compatible sequence lengths and uses the last dimension as depth.
    """
    # Compute raw scores: (batch, seq_q, seq_k)
    
    # Scaling factor
    dk = Q.shape[-1] # equivalent to the depth of K
    # Swap the last two axes of K same as K.T in the formula
    scores = np.matmul(Q, np.swapaxes(K, -1, -2)) / np.sqrt(dk)

    if mask is not None:
        # set large negative value to the masked positions
        scores = np.where(mask, -1e10, scores)
    '''
    ###
    Attention: Expresses the relevance of each key to a given query.
    Output = softmax(QK^T / sqrt(dk)) V
    ###
    '''
    # Apply softmax to get attention weights
    attn = softmax(scores, axis=-1)
    # Compute the final output
    output = np.matmul(attn, V)
    return output, attn



In [260]:
# Testing scaled dot-product attention
batch, seq_q, seq_k, depth = 1, 2, 2, 2
Q = np.random.randn(batch, seq_q, depth)
K = np.random.randn(batch, seq_k, depth)
V = np.random.randn(batch, seq_k, depth)
print("Q:\n", Q)
mask = np.random.rand(batch, seq_q, seq_k) > 0.5
print("Mask:\n", mask.astype(int))
output, attn = scaled_dot_product_attention(Q, K, V, mask)
print("Output shape:", output.shape) 
print(output)
print("Attention shape:", attn.shape) 
print(attn)

Q:
 [[[ 0.81362694 -0.28537062]
  [-0.085753   -0.53365127]]]
Mask:
 [[[1 1]
  [0 0]]]
Output shape: (1, 2, 2)
[[[0.20485025 0.11794609]
  [0.09490977 0.01956671]]]
Attention shape: (1, 2, 2)
[[[0.5        0.5       ]
  [0.41702649 0.58297351]]]


# Implement positional encoding

inject information about the position of the tokens in a sequence <br>
Important as:
Eve likes Adam is not equal to Adam likes Eve

In [261]:
def positional_encoding(max_len: int, d_model: int) -> np.ndarray:
    """Generate sinusoidal positional encodings as in the original paper.
    Returns array of shape (max_len, d_model).
    Args:
        max_len: maximum length of the sequence
        d_model: dimension of the model's embeddings
    """
    # Create a position matrix
    position = np.arange(max_len)[:, np.newaxis]  # (max_len, 1)
    # Create a constant for numerical stability
    # Compute the positional encodings
    div_term = np.exp(np.arange(0, d_model, 2) * - (np.log(10000.0) / d_model))
    # Create an all zero matrix to be filled with encodings
    positional_encoding = np.zeros((max_len, d_model))
    # Apply sin to the evenly indexed positions
    positional_encoding[:, 0::2] = np.sin(position * div_term)
    
    # Fix the odd dimensionality issue
    if d_model % 2 == 1:
        no_odd_dims = positional_encoding[:, 1::2].shape[1]
        positional_encoding[:, 1::2] = np.cos(position * div_term[:no_odd_dims])
    else:
        # Apply cos to the others
        positional_encoding[:, 1::2] = np.cos(position * div_term)
    return positional_encoding

In [264]:
# Test positional encoding
max_len = 10
d_model = 3
pos_encoding = positional_encoding(max_len, d_model)
print("Positional Encoding shape:", pos_encoding.shape)
print(pos_encoding)

Positional Encoding shape: (10, 3)
[[ 0.          1.          0.        ]
 [ 0.84147098  0.54030231  0.00215443]
 [ 0.90929743 -0.41614684  0.00430886]
 [ 0.14112001 -0.9899925   0.00646326]
 [-0.7568025  -0.65364362  0.00861763]
 [-0.95892427  0.28366219  0.01077197]
 [-0.2794155   0.96017029  0.01292625]
 [ 0.6569866   0.75390225  0.01508047]
 [ 0.98935825 -0.14550003  0.01723462]
 [ 0.41211849 -0.91113026  0.0193887 ]]


# Multi headed attention

Split the input into multiple heads <br>
$${MultiHead(Q,K,V)=Concat(head_1,head_2,...,head_h)*W^O}$$
where
$${head_i=Attention(QW_i^Q,KW_i^K,VW_i^V)}$$

In [265]:
class MultiHeadAttention:
    def __init__(self, d_model: int, num_heads: int, seed: int = 42):
        """
        Args:
            d_model: model dimensionality
            num_heads: number of attention heads
            seed: random seed for weight initialization and reproducibility
        """
        assert d_model % num_heads == 0, "It is required for the model dimension to be divisible by the num. of heads."
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        
        # Initialise a random state
        generator = np.random.RandomState(seed)

        # Initialise weights
        # inside __init__
        std = 1.0 / np.sqrt(d_model)
        self.Wq = generator.normal(scale=std, size=(d_model, d_model))
        self.Wk = generator.normal(scale=std, size=(d_model, d_model))
        self.Wv = generator.normal(scale=std, size=(d_model, d_model))
        self.Wo = generator.normal(scale=std, size=(d_model, d_model))
        # self.Wq = generator.normal(scale=0.2, size=(d_model, d_model))
        # self.Wk = generator.normal(scale=0.2, size=(d_model, d_model))
        # self.Wv = generator.normal(scale=0.2, size=(d_model, d_model))
        # self.Wo = generator.normal(scale=0.2, size=(d_model, d_model))
        
    def split_heads(self, x: np.ndarray) -> np.ndarray:
        """
        Split the last dimension into (num_heads, depth).
        Args:
            x: input of shape (batch_size, seq_len, d_model)
        Returns:
            reshaped x of shape (batch_size, num_heads, seq_len, depth)
        """
        # x.shape: (batch_size, seq_len, d_model)
        batch_size = x.shape[0]
        sequence_length = x.shape[1]
        x = x.reshape(batch_size, sequence_length, self.num_heads, self.depth)
        return np.transpose(x, (0, 2, 1, 3))
        
        
    def combine_heads(self, x: np.ndarray) -> np.ndarray:
        """
        Combine the heads to get back to original d_model dimension.
        Args:
            x: input of shape (batch_size, num_heads, seq_len, depth)
            Returns:
            reshaped x of shape (batch_size, seq_len, d_model)
        """
        x = np.transpose(x, (0, 2, 1, 3))  # (batch_size, seq_len, ...)
        batch_size = x.shape[0]
        sequence_length = x.shape[1]
        return x.reshape(batch_size, sequence_length, self.d_model)
    
    def __call__(self, Q: np.ndarray, K: np.ndarray, V: np.ndarray, mask: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        Calculate the multi headed attention.
        Return a tuple of (output, attention_weights)
        Args:
            Q: queries shape (batch_size, seq_len_q, d_model)
            K: keys shape (batch_size, seq_len_k, d_model)
            V: values shape (batch_size, seq_len_v, d_model)
            mask: optional mask broadcastable to (batch_size, num_heads, seq_len_q, seq_len_k)
        """
        # Linearly project Q, K, and V
        q = Q @ self.Wq  # (batch_size, seq_len_q, d_model)
        k = K @ self.Wk  
        v = V @ self.Wv  
        
        # Split the heads into multiple ones
        qheads = self.split_heads(q)  # (batch_size, num_heads, seq_len_q, depth)
        kheads = self.split_heads(k)  # (batch_size, num_heads, seq_len_k, depth)
        vheads = self.split_heads(v)  # (batch_size, num_heads, seq_len_v, depth)
        
        # Batched attention
        # Way more efficient than a standard for loop over the heads
        # batch, num_heads, sequence_q, depth
        batch, num_heads, sequence_q, depth = qheads.shape
        _q = qheads.reshape(batch * num_heads, sequence_q, depth)
        _k = kheads.reshape(batch * num_heads, kheads.shape[2], depth)
        _v = vheads.reshape(batch * num_heads, vheads.shape[2], depth)
        
        # We need to expand the mask for all the heads
        if mask is not None:
            mask = np.repeat(mask, self.num_heads, axis=0)

        output, attention = scaled_dot_product_attention(_q, _k, _v, mask)
        
        # Reashape everything back for combining heads
        output = output.reshape(batch, num_heads, sequence_q, depth)
        combined_output = self.combine_heads(output)  # (batch_size, seq_len_q, d_model)
        output = combined_output @ self.Wo  # Multiplying with the final weight 
        
        # Reshape attention back to (batch_size, num_heads, seq_len_q, seq_len_k)
        attention = attention.reshape(batch, num_heads, attention.shape[1], attention.shape[2])

        return output, attention

In [269]:
#Test multi headed attention
batch, seq_q, seq_k, depth = 1, 2, 2, 2
Q = np.random.randn(batch, seq_q, depth)
K = np.random.randn(batch, seq_k, depth)
V = np.random.randn(batch, seq_k, depth)
print("Q:\n", Q)
mask = np.random.rand(batch, seq_q, seq_k) > 0.5
print("Mask:\n", mask.astype(int))

mha = MultiHeadAttention(d_model = depth, num_heads = 2, seed = 42)
output, attention = mha(Q, K, V, mask)
print("Output:\n", output)
print("Attention:\n", attention)

Q:
 [[[-0.42421834 -0.85574172]
  [-0.29170982 -0.35202485]]]
Mask:
 [[[1 1]
  [0 1]]]
Output:
 [[[ 0.47520055 -0.46817396]
  [ 0.35769828 -0.29930607]]]
Attention:
 [[[[0.5 0.5]
   [1.  0. ]]

  [[0.5 0.5]
   [1.  0. ]]]]


# Position wise Feed Forward Network

$$FFN(x) = \text{Linear}_2(\text{ReLU}(\text{Linear}_1(x))) $$
$$FFN(x) = \max(0, xW\_1 + b\_1)W\_2 + b\_2 $$
Based on the original paper: <br>
"This consists of two linear transformations with a ReLU activation in between."

In [270]:
class PositionwiseFeedForward:
    def __init__(self, d_model: int, d_ff: int, seed: int = 42):
        """
        Args:
            d_model: model dimensionality
            d_ff: dimensionality of the feed forward network
            seed: random seed for weight initialization and reproducibility
        """
        # Random state generator
        generator = np.random.RandomState(seed)
        self.W1 = generator.normal(scale=0.2, size=(d_model, d_ff))
        self.b1 = np.zeros((d_ff,))
        self.W2 = generator.normal(scale=0.2, size=(d_ff, d_model))
        self.b2 = np.zeros((d_model,))
        
    def __call__(self, x: np.ndarray) -> np.ndarray:
        """
        Args:
            x: input tensor of shape (batch_size, seq_length, d_model)

        Returns:
            output tensor of shape (batch_size, seq_length, d_model)
        """
        # Apply the first linear transformation
        x = x @ self.W1 + self.b1
        x = np.maximum(0, x)  # ReLU
        x = x @ self.W2 + self.b2
        return x


In [273]:
# Test Position Wise Feed Forward Network
d_model = 4
d_ff = 8
ffn = PositionWiseFeedForward(d_model, d_ff, seed=42)
print(ffn(np.random.rand(1, 3, d_model)))

[[[ 0.01458096 -0.08822238 -0.00927982 -0.00983293]
  [ 0.00263115 -0.03944536 -0.01325151 -0.01272889]
  [ 0.02267821 -0.14046964 -0.02002302 -0.01371408]]]


# Implement the layer normalisation

needed for the encoding and decoding stack

In [274]:
# No standard layer normalisation in the original paper, thus this is one possible one. 
def layer_normalization(x: np.ndarray, epsilon: float = 1e-10) -> np.ndarray:
    """
    Layer for normalising an input ndarray/tensor.
    Args:
        x: input tensor of shape (..., features)
        epsilon: small float added to variance to avoid dividing by zero
    """
    mean = np.mean(x, axis=-1, keepdims=True)
    var = np.var(x, axis=-1, keepdims=True)
    return (x - mean) / np.sqrt(var + epsilon)

In [279]:
# Test layer normalization
x = np.random.rand(2, 3, 4)
print("Before Layer Norm:\n", x)
x = layer_normalization(x)
print("After Layer Norm:\n", x)

Before Layer Norm:
 [[[0.97428919 0.74905569 0.74863568 0.8047723 ]
  [0.10825874 0.51662637 0.68566105 0.48331562]
  [0.80844796 0.34834711 0.46362344 0.93173749]]

 [[0.46096797 0.75113865 0.8247389  0.35784697]
  [0.51525289 0.93192136 0.63555553 0.68139192]
  [0.79260925 0.77292006 0.86060247 0.41410598]]]
After Layer Norm:
 [[[ 1.67835292 -0.75890644 -0.76345136 -0.15599512]
  [-1.61335633  0.32323837  1.12484859  0.16526937]
  [ 0.711211   -1.20904476 -0.72793276  1.22576652]]

 [[-0.70801805  0.78390927  1.16232872 -1.23821994]
  [-1.15843505  1.58755497 -0.36559883 -0.0635211 ]
  [ 0.47459353  0.36139692  0.86549868 -1.70148913]]]


# Encoder layer

MHA -> ADD&NORM -> FF -> ADD&NORM

In [280]:
class EncoderLayer:
    def __init__(self, d_model: int, num_heads: int, d_ff: int):
        """
        Args:
            d_model: model dimensionality
            num_heads: number of attention heads
            d_ff: dimensionality of the feed forward network
            seed: random seed for weight initialization and reproducibility
        """
        self.seed = 42
        self.mha = MultiHeadAttention(d_model, num_heads, self.seed)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, self.seed)
        
    def __call__(self, x: np.ndarray, mask: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Args:
            x: input tensor of shape (batch_size, seq_length, d_model)
            mask: optional mask broadcastable to (batch_size, num_heads, seq_len_q, seq_len_k)
        Returns:
            output tensor of shape (batch_size, seq_length, d_model)
        """
        attention_output, _ = self.mha(x, x, x, mask)  # Self-attention
        x = x + attention_output  # Residual connection
        ann1 = layer_normalization(x)  # Add & Norm
        
        ffn_output = self.ffn(ann1) # Feed Forward Network
        x = ann1 + ffn_output  # Residual connection
        output = layer_normalization(x)  # Add & Norm
        
        return output
        
        

In [282]:
# Test encoder layer
d_model = 4
num_heads = 2
d_ff = 8
input_embeddings = np.random.rand(1, 3, 4)
positional_encodings = positional_encoding(max_len=3, d_model=d_model)
input_with_pos = input_embeddings + positional_encodings[np.newaxis, :3, :]
print("Input with positional encodings:\n", input_with_pos)

encoder_layer = EncoderLayer(d_model=d_model, num_heads=num_heads, d_ff=d_ff)
output = encoder_layer(input_with_pos)
print("Encoder output:\n", output)

Input with positional encodings:
 [[[0.80445011 1.32273761 0.71461465 1.90757689]
  [1.5240724  1.07455612 0.34228796 1.7038028 ]
  [1.17866779 0.45362952 0.57496512 1.95711781]]]
Encoder output:
 [[[ 0.76886472  0.90609297 -1.60725184 -0.06770585]
  [ 1.16918237  0.50319328 -1.5366837  -0.13569195]
  [ 1.21547546  0.13433746 -1.56753611  0.21772319]]]


# Decoder layer

MMHA -> A&N -> MHA -> A&N -> FF -> A&N -> Lin -> Softmax

In [283]:
class DecoderLayer:
    def __init__(self, d_model: int, num_heads: int, d_ff: int):
        """
        Args:
            d_model: model dimensionality
            num_heads: number of attention heads
            d_ff: dimensionality of the feed forward network
            seed: random seed for weight initialization and reproducibility
        """
        self.seed = 42
        self.masked_mha1 = MultiHeadAttention(d_model, num_heads, self.seed)  # Masked MHA
        self.encoder_decoder_mha2 = MultiHeadAttention(d_model, num_heads, self.seed)  # MHA with encoder output
        self.ffn = PositionwiseFeedForward(d_model, d_ff, self.seed)
        
    def __call__(self, x: np.ndarray, encoder_output: np.ndarray, look_ahead_mask: Optional[np.ndarray] = None, padding_mask: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Args:
            x: input tensor of shape (batch_size, target_seq_length, d_model)
            enc_output: encoder output tensor of shape (batch_size, input_seq_length, d_model)
            look_ahead_mask: triangular mask to prevent attending future tokens
            padding_mask: optional mask for the second MHA
        Returns:
            output tensor of shape (batch_size, target_seq_length, d_model)
        """
        # Masked Multi-Head Attention
        attention_1, _ = self.masked_mha1(x, x, x, look_ahead_mask)  # Self-attention with look-ahead mask
        x = x + attention_1  # Residual connection
        ann1 = layer_normalization(x)  # Add & Norm
        
        # Multi-Head Attention with encoder output
        attention_2, _ = self.encoder_decoder_mha2(ann1, encoder_output, encoder_output, padding_mask)
        x = ann1 + attention_2  # Residual connection
        ann2 = layer_normalization(x)  # Add & Norm
        
        # Feed Forward Network
        ffn_output = self.ffn(ann2)
        x = ann2 + ffn_output  # Residual connection
        output = layer_normalization(x)  # Add & Norm
        
        return output


In [284]:
# Test decoder layer with the previous encoder output
d_model = 4
num_heads = 2
d_ff = 8

decoder_layer = DecoderLayer(d_model=d_model, num_heads=num_heads, d_ff=d_ff)
decoder_input = np.random.rand(1, 3, 4)
decoder_pos_enc = positional_encoding(max_len=3, d_model=d_model)
# Triangular look-ahead mask
look_ahead_mask = np.triu(np.ones((1, 3, 3)), k=1).astype(bool)
decoder_output = decoder_layer(decoder_input + decoder_pos_enc, output, look_ahead_mask=look_ahead_mask)

print("Decoder output:\n", decoder_output)

Decoder output:
 [[[ 1.02720323  0.87541866 -1.38056066 -0.52206124]
  [ 1.37708826  0.48709442 -1.18584163 -0.67834106]
  [ 1.54990391 -0.32052063 -1.22271004 -0.00667324]]]


# Encoder stack

In [285]:
class Encoder:
    def __init__(self, num_layers: int, d_model: int, num_heads: int, d_ff: int):
        """
        Args:
            num_layers: number of encoder layers
            d_model: model dimensionality
            num_heads: number of attention heads
            d_ff: dimensionality of the feed forward network
        """
        self.layers = [EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]
        
    def __call__(self, x: np.ndarray, mask: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Args:
            x: input tensor of shape (batch_size, seq_length, d_model)
            mask: optional mask broadcastable to (batch_size, num_heads, seq_len_q, seq_len_k)
        Returns:
            output tensor of shape (batch_size, seq_length, d_model)
        """
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [286]:
# Test encoder

# Test encoder layer
d_model = 4
num_heads = 2
d_ff = 8
input_embeddings = np.random.rand(1, 3, 4)
positional_encodings = positional_encoding(max_len=3, d_model=d_model)
input_with_pos = input_embeddings + positional_encodings[np.newaxis, :3, :]
print("Input with positional encodings:\n", input_with_pos)

encoder = Encoder(num_layers=10, d_model=d_model, num_heads=num_heads, d_ff=d_ff)
output = encoder(input_with_pos)
print("Encoder output:\n", output)

Input with positional encodings:
 [[[0.67408879 1.11600415 0.07145756 1.6237182 ]
  [1.630549   0.57626199 0.77138863 1.21269814]
  [1.50757462 0.3350594  0.20329144 1.80743073]]]
Encoder output:
 [[[ 1.03662878  0.58078776 -1.60872959 -0.00868695]
  [ 1.09657336  0.45023325 -1.60961845  0.06281184]
  [ 1.03892233  0.47270247 -1.63748354  0.12585874]]]


# Decoder stack


In [287]:
class Decoder:
    def __init__(self, num_layers: int, d_model: int, num_heads: int, d_ff: int):
        """
        Args:
            num_layers: number of decoder layers
            d_model: model dimensionality
            num_heads: number of attention heads
            d_ff: dimensionality of the feed forward network
        """
        self.layers = [DecoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]
        
    def __call__(self, x: np.ndarray, encoder_output: np.ndarray, look_ahead_mask: Optional[np.ndarray] = None, padding_mask: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Args:
            x: input tensor of shape (batch_size, target_seq_length, d_model)
            enc_output: encoder output tensor of shape (batch_size, input_seq_length, d_model)
            look_ahead_mask: triangular mask to prevent attending future tokens
            padding_mask: optional mask for the second MHA
        """
        for layer in self.layers:
            x = layer(x, encoder_output, look_ahead_mask, padding_mask)
        return x

In [288]:
# Test decoder with encoder output
d_model = 4
num_heads = 2
d_ff = 8
decoder = Decoder(num_layers=10, d_model=d_model, num_heads=num_heads, d_ff=d_ff)
decoder_input = np.random.rand(1, 3, 4)
decoder_output = decoder(decoder_input, output)
print("Decoder output:\n", decoder_output)

Decoder output:
 [[[ 1.15950012  0.4638494  -1.56093152 -0.06241799]
  [ 1.17375064  0.47676652 -1.54390302 -0.10661414]
  [ 1.15429576  0.44697933 -1.57062775 -0.03064734]]]


# Full transformer architecture

In [289]:
class Transformer:
    def __init__(self, num_encoder_layers: int, num_decoder_layers: int, d_model: int, num_heads: int, d_ff: int):
        """
        Args:
            num_encoder_layers: number of encoder layers
            num_decoder_layers: number of decoder layers
            d_model: model dimensionality
            num_heads: number of attention heads
            d_ff: dimensionality of the feed forward network
        """
        self.encoder = Encoder(num_encoder_layers, d_model, num_heads, d_ff)
        self.decoder = Decoder(num_decoder_layers, d_model, num_heads, d_ff)

    def __call__(self, encoder_input: np.ndarray, decoder_input: np.ndarray, look_ahead_mask: Optional[np.ndarray] = None, padding_mask: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Args:
            encoder_input: input tensor for the encoder of shape (batch_size, input_seq_length, d_model)
            decoder_input: input tensor for the decoder of shape (batch_size, target_seq_length, d_model)
            look_ahead_mask: optional mask for the decoder
            padding_mask: optional mask for the encoder
        """
        encoder_output = self.encoder(encoder_input, padding_mask)
        decoder_output = self.decoder(decoder_input, encoder_output, look_ahead_mask, padding_mask)
        return decoder_output

In [291]:
# Test full transformer architecture
d_model = 4
num_heads = 2
d_ff = 8
transformer = Transformer(num_encoder_layers=2, num_decoder_layers=2, d_model=d_model, num_heads=num_heads, d_ff=d_ff)
encoder_input = np.random.rand(1, 3, 4)
decoder_input = np.random.rand(1, 2, 4)
output = transformer(encoder_input, decoder_input)
print("Transformer output:\n", output)

Transformer output:
 [[[-0.48432267  0.71760103 -1.38614123  1.15286287]
  [-0.32871729  0.43114645 -1.41151084  1.30908167]]]


# Unit tests

In [294]:
import unittest

class TestTransformerComponents(unittest.TestCase):
    def test_scaled_dot_product_attention_shape_and_probs(self):
        batch, seq_q, seq_k, depth = 2, 3, 4, 8
        Q = np.random.randn(batch, seq_q, depth)
        K = np.random.randn(batch, seq_k, depth)
        V = np.random.randn(batch, seq_k, depth)
        out, attn = scaled_dot_product_attention(Q, K, V)
        self.assertEqual(out.shape, (batch, seq_q, depth))
        self.assertEqual(attn.shape, (batch, seq_q, seq_k))
        # attention weights should sum to 1 across keys (allow small numerical error)
        s = np.sum(attn, axis=-1)
        self.assertTrue(np.allclose(s, 1.0, atol=1e-5))

    def test_positional_encoding(self):
        pe = positional_encoding(10, 16)
        self.assertEqual(pe.shape, (10, 16))
        # positions should differ
        self.assertFalse(np.allclose(pe[0], pe[1]))

    def test_multi_head_attention_shapes(self):
        batch, seq, d_model, heads = 2, 5, 16, 4
        mha = MultiHeadAttention(d_model, heads)
        x = np.random.randn(batch, seq, d_model)
        out, attn = mha(x, x, x)
        self.assertEqual(out.shape, (batch, seq, d_model))
        self.assertEqual(attn.shape, (batch, heads, seq, seq))

    def test_ffn_shape(self):
        batch, seq, d_model, d_ff = 2, 6, 16, 32
        ffn = PositionwiseFeedForward(d_model, d_ff)
        x = np.random.randn(batch, seq, d_model)
        y = ffn(x)
        self.assertEqual(y.shape, (batch, seq, d_model))

    def test_encoder_decoder_forward_shapes(self):
        batch, seq_inp, seq_tar, d_model = 2, 7, 5, 16
        enc = Encoder(num_layers=2, d_model=d_model, num_heads=4, d_ff=64)
        dec = Decoder(num_layers=2, d_model=d_model, num_heads=4, d_ff=64)
        inp = np.random.randn(batch, seq_inp, d_model)
        tar = np.random.randn(batch, seq_tar, d_model)
        enc_out = enc(inp)
        dec_out = dec(tar, enc_out)
        self.assertEqual(enc_out.shape, (batch, seq_inp, d_model))
        self.assertEqual(dec_out.shape, (batch, seq_tar, d_model))

    def test_transformer_forward_shape(self):
        batch, seq_inp, seq_tar, d_model = 2, 6, 4, 16
        model = Transformer(num_encoder_layers=2, num_decoder_layers=2, d_model=d_model, num_heads=4, d_ff=64)
        inp = np.random.randn(batch, seq_inp, d_model)
        tar = np.random.randn(batch, seq_tar, d_model)
        out = model(inp, tar)
        self.assertEqual(out.shape, (batch, seq_tar, d_model))

# Run tests and print results
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestTransformerComponents)
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
if not result.wasSuccessful():
    raise SystemExit('Some tests failed')

test_encoder_decoder_forward_shapes (__main__.TestTransformerComponents) ... ok
test_ffn_shape (__main__.TestTransformerComponents) ... ok
test_ffn_shape (__main__.TestTransformerComponents) ... ok
test_multi_head_attention_shapes (__main__.TestTransformerComponents) ... ok
test_positional_encoding (__main__.TestTransformerComponents) ... ok
test_scaled_dot_product_attention_shape_and_probs (__main__.TestTransformerComponents) ... ok
test_transformer_forward_shape (__main__.TestTransformerComponents) ... ok
test_multi_head_attention_shapes (__main__.TestTransformerComponents) ... ok
test_positional_encoding (__main__.TestTransformerComponents) ... ok
test_scaled_dot_product_attention_shape_and_probs (__main__.TestTransformerComponents) ... ok
test_transformer_forward_shape (__main__.TestTransformerComponents) ... ok

----------------------------------------------------------------------
Ran 6 tests in 0.035s

OK
ok

----------------------------------------------------------------------