# Transformer Layers

Given an input X of shape (T, d), write code using numpy that feeds it through an encoder layer and a decoder layer of the Transformer. You will also have to implement the internal workings of the scaled dot-product attention and multi-head attention layers. For convenience, take the projection dimension to be the same (dim_size = d) for the Query, Key and Value projections. Only forward propagation is expected to be implemented; it is executed by the `forward` method of each class.
<center>
<img src="./fig/Transformer.png" width="324" height="470">
</center>
<br>
<center>
<img src="./fig/Attention-layers.png" width="550" height="350">
</center>


In [2]:
import numpy as np

In [3]:
class ScaledDotProductAttention:
    """One attention head computing ``softmax(Q K^T / sqrt(d_k)) V``.

    The instance also holds the query/key/value projection parameters of
    the head. ``forward`` does not apply them: it expects inputs that have
    already been projected.
    """

    def __init__(self, dim_size):
        """Draw random projection parameters for this head.

        Args:
            dim_size: Model dimension ``d``; each projection maps ``d -> d``.
        """
        self.query_weights = np.random.randn(dim_size, dim_size)
        self.query_bias = np.random.randn(dim_size)

        self.key_weights = np.random.randn(dim_size, dim_size)
        self.key_bias = np.random.randn(dim_size)

        self.value_weights = np.random.randn(dim_size, dim_size)
        self.value_bias = np.random.randn(dim_size)

    def forward(self, queries, keys, values):
        """Return the attention-weighted combination of ``values``.

        Args:
            queries: Array of shape ``(T_q, d_k)``.
            keys: Array of shape ``(T_k, d_k)``.
            values: Array of shape ``(T_k, d_v)``.

        Returns:
            Array of shape ``(T_q, d_v)``; row ``i`` is the convex
            combination of the rows of ``values`` weighted by how strongly
            query ``i`` matches each key.
        """
        # Divide out-of-place: an in-place ``/=`` cannot store the float
        # result when the inputs (and hence the matmul result) are integers.
        scale = keys.shape[-1] ** 0.5
        scores = np.matmul(queries, keys.T) / scale

        # Softmax is invariant to subtracting a per-row constant; shifting
        # by the row maximum keeps every exponent <= 0 so np.exp cannot
        # overflow to inf (which would yield nan after normalisation).
        shifted = scores - np.max(scores, axis=-1, keepdims=True)
        exp_scores = np.exp(shifted)
        attention_weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)

        return np.matmul(attention_weights, values)

In [4]:
class MultiHeadAttention:
    """Multi-head attention built from independent scaled dot-product heads.

    Every head projects its inputs from ``d`` to ``d`` dimensions, so the
    concatenated head outputs are ``num_heads * d`` wide and the final
    linear layer maps them back to ``d``.
    """

    def __init__(self, dim_size, num_heads):
        """Create the heads and the output projection.

        Args:
            dim_size: Model dimension ``d``.
            num_heads: Number of attention heads.
        """
        # for each head, we create an SDPA (ScaledDotProductAttention)
        # object that performs scaled dot-product attention
        self.SDPA_heads = [ScaledDotProductAttention(dim_size) for _ in range(num_heads)]
        # The concatenation of all heads has num_heads * dim_size columns;
        # a (dim_size, dim_size) matrix would only fit a single head.
        self.linear_weights = np.random.randn(num_heads * dim_size, dim_size)
        self.linear_bias = np.random.randn(dim_size)

    def forward(self, Q, K, V, Mask=None):
        """Run every head on its own projection of Q, K, V and merge them.

        Args:
            Q: Queries, shape ``(T_q, d)``.
            K: Keys, shape ``(T_k, d)``.
            V: Values, shape ``(T_k, d)``.
            Mask: Accepted for interface compatibility.
                NOTE(review): it is currently NOT applied to the attention
                scores; callers must not rely on masking.

        Returns:
            Array of shape ``(T_q, d)``.
        """
        head_outputs = []
        for head in self.SDPA_heads:
            # project the query, key and value with this head's
            # W_q, W_k, W_v before attending
            q = np.matmul(Q, head.query_weights) + head.query_bias
            k = np.matmul(K, head.key_weights) + head.key_bias
            v = np.matmul(V, head.value_weights) + head.value_bias
            head_outputs.append(head.forward(q, k, v))

        # concatenate the head outputs along the feature axis
        # and re-project them with a linear function
        concatenated = np.concatenate(head_outputs, axis=-1)
        return np.matmul(concatenated, self.linear_weights) + self.linear_bias

In [5]:
class Encoder:
    """Transformer encoder layer (forward pass only).

    Self-attention and a single linear + ReLU feed-forward layer, each
    followed by a residual connection and layer normalisation.
    """

    def __init__(self, dim_size, num_heads):
        """Create the attention layer and the feed-forward parameters.

        Args:
            dim_size: Model dimension ``d``.
            num_heads: Number of heads of the self-attention layer.
        """
        # declare the MHA layer
        self.MHA = MultiHeadAttention(dim_size=dim_size, num_heads=num_heads)
        # weights for the feed-forward layer
        self.linear_weights = np.random.randn(dim_size, dim_size)
        self.linear_bias = np.random.randn(dim_size)

    @staticmethod
    def _layer_norm(x, eps=1e-6):
        """Normalise each row of ``x`` to zero mean and unit variance."""
        # keepdims=True keeps the statistics as (T, 1) so they broadcast
        # across the feature axis; eps avoids 0/0 on constant rows.
        mean = np.mean(x, axis=-1, keepdims=True)
        variance = np.var(x, axis=-1, keepdims=True)
        return (x - mean) / np.sqrt(variance + eps)

    def forward(self, X):
        """Return the encoder output for ``X``.

        Args:
            X: Input of shape ``(T, d)``.

        Returns:
            Array of shape ``(T, d)`` whose rows are layer-normalised.
        """
        # sub-layer 1: self-attention; the statistics are taken from the
        # residual sum itself, not from the attention output alone
        attended = self.MHA.forward(X, X, X)
        sublayer1_out = self._layer_norm(X + attended)

        # sub-layer 2: feed-forward with ReLU, residual, layer norm
        ff_out = np.maximum(
            0, np.matmul(sublayer1_out, self.linear_weights) + self.linear_bias
        )
        return self._layer_norm(sublayer1_out + ff_out)

In [6]:
class Decoder:
    """Transformer decoder layer (forward pass only).

    Self-attention over the decoder input, cross-attention over the encoder
    output, and a single linear + ReLU feed-forward layer; each sub-layer
    is followed by a residual connection and layer normalisation.

    NOTE(review): no mask is passed to the self-attention layer, so it is
    not causally masked.
    """

    def __init__(self, dim_size, num_heads1, num_heads2):
        """Create both attention layers and the feed-forward parameters.

        Args:
            dim_size: Model dimension ``d``.
            num_heads1: Number of heads of the self-attention layer.
            num_heads2: Number of heads of the cross-attention layer.
        """
        # declare the MHA layers
        self.MHA1 = MultiHeadAttention(dim_size=dim_size, num_heads=num_heads1)
        self.MHA2 = MultiHeadAttention(dim_size=dim_size, num_heads=num_heads2)

        # weights for the feed-forward layer
        self.linear_weights = np.random.randn(dim_size, dim_size)
        self.linear_bias = np.random.randn(dim_size)

    @staticmethod
    def _layer_norm(x, eps=1e-6):
        """Normalise each row of ``x`` to zero mean and unit variance."""
        # keepdims=True keeps the statistics as (T, 1) so they broadcast
        # across the feature axis; eps avoids 0/0 on constant rows.
        mean = np.mean(x, axis=-1, keepdims=True)
        variance = np.var(x, axis=-1, keepdims=True)
        return (x - mean) / np.sqrt(variance + eps)

    def forward(self, encoder_out, x):
        """Return the decoder output.

        Args:
            encoder_out: Encoder output, shape ``(T_enc, d)``.
            x: Decoder input, shape ``(T_dec, d)``.

        Returns:
            Array of shape ``(T_dec, d)`` whose rows are layer-normalised.
        """
        # sub-layer 1: self-attention over the decoder input
        self_attended = self._layer_norm(self.MHA1.forward(x, x, x) + x)

        # sub-layer 2: cross-attention. Queries come from the decoder while
        # keys and values come from the encoder, so the result has one row
        # per decoder position and can be added to the residual.
        cross = self.MHA2.forward(self_attended, encoder_out, encoder_out)
        cross_attended = self._layer_norm(cross + self_attended)

        # sub-layer 3: feed-forward with ReLU, residual, layer norm
        ff_out = np.maximum(
            0, np.matmul(cross_attended, self.linear_weights) + self.linear_bias
        )
        return self._layer_norm(ff_out + cross_attended)