In [1]:
# Implementing self attention
import numpy as np
import math
import os

In [2]:
# Toy problem sizes for the NumPy walkthrough
ll = 8     # length of the input sequence
len_k = 8  # size of the query/key vectors
len_v = 8  # size of the value vectors

# Random stand-ins for the three attention inputs, one row per position
q = np.random.randn(ll, len_k)  # what is expected
k = np.random.randn(ll, len_k)  # what is presented
v = np.random.randn(ll, len_v)  # what is given

# Additive causal mask: 0 on and below the diagonal, -inf above it, so that
# adding it to the score matrix removes every "future" position.
# np.inf is used because the np.infty alias was removed in NumPy 2.0.
mask = np.where(np.tril(np.ones((ll, ll))) == 1, 0.0, -np.inf)

In [3]:
def scaled_dot_product_attention(q,k,v,mask=None):
    """Compute scaled dot-product attention with NumPy.

    Args:
        q: query array whose last axis is the key dimension.
        k: key array with the same last-axis size as ``q``.
        v: value array with one row per key.
        mask: optional additive mask that broadcasts against the score
            matrix (0 keeps an entry, ``-inf`` removes it). A row that is
            masked everywhere produces NaNs, because its softmax is undefined.

    Returns:
        ``(ans, attention)`` where ``attention`` holds the softmax weights
        (each row sums to 1) and ``ans`` is ``attention @ v``.
    """
    len_k = q.shape[-1]

    def softmax(x):
        # Row-wise softmax; subtracting the row maximum keeps exp() from
        # overflowing and does not change the result.
        shifted = x - np.max(x, axis=-1, keepdims=True)
        weights = np.exp(shifted)
        return weights / np.sum(weights, axis=-1, keepdims=True)

    # Dividing by sqrt(len_k) keeps the scores at a moderate scale so the
    # softmax does not saturate as the vectors get longer.
    scores = np.matmul(q, np.swapaxes(k, -1, -2)) / math.sqrt(len_k)

    # The mask has to be applied before the softmax, and the masked scores
    # must be the ones that are actually normalised.
    if mask is not None:
        scores = scores + mask

    # attention[i, j] measures how strongly query i matches key j;
    # the values are then averaged with those weights.
    attention = softmax(scores)
    ans = np.matmul(attention, v)

    return ans, attention

# Run attention on the random q/k/v from the previous cell.
# No mask is passed here, so every position may attend to every other one.
values, attention = scaled_dot_product_attention(q, k, v)

In [4]:
# Implementing multi head attention

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [5]:
# A toy batch: one sentence made of twelve tokens
sentence_length = 12
batch_size = 1

# Width of each token vector going into attention ...
input_dim = 512
# ... and the width attention hands back for each token
d_model = 512

sample_input = torch.randn((batch_size, sentence_length, input_dim))

# One linear projection emits q, k and v side by side, hence the factor 3
qkv_layer = nn.Linear(input_dim, 3 * d_model)

sample_input_qkv = qkv_layer(sample_input)

In [6]:
# "Attention Is All You Need" uses 8 attention heads, so we do the same
num_heads = 8
head_dim = d_model // num_heads

# Split the feature axis into heads; each head's slice still holds its
# q, k and v parts back to back (3 * head_dim values)
sample_input_qkv = sample_input_qkv.reshape(
    batch_size, sentence_length, num_heads, 3 * head_dim
)

In [7]:
# Cut the last axis into three equal pieces to recover q, k and v separately
q, k, v = torch.chunk(sample_input_qkv, 3, dim=-1)

In [8]:
# Next, we build a multi-head attention class that combines all of the
# steps above under one hood.

# Subclassing nn.Module gives us parameter registration and the other nn functionality
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention over a ``(batch, sequence, input_dim)`` tensor.

    A single linear layer produces queries, keys and values for all heads,
    attention is computed independently per head, and the concatenated head
    outputs are mixed by a final linear layer.

    Args:
        input_dim: size of the last axis of the input.
        d_model: total width of the attention output (all heads together).
        num_heads: number of heads; must divide ``d_model``.

    Raises:
        ValueError: if ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, input_dim, d_model, num_heads):
        super().__init__()
        if d_model % num_heads != 0:
            # Without this the reshape in forward() fails with a far less
            # helpful size-mismatch error.
            raise ValueError("d_model must be divisible by num_heads")
        self.input_dim = input_dim
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(input_dim , 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def __scaled_dot_product_attention(self,q,k,v,mask=None):
        """Return ``(weighted values, attention weights)`` for per-head q, k, v.

        ``mask``, when given, is added to the scores before the softmax and
        must broadcast against ``(..., query_len, key_len)``.
        """
        # Scale by the per-head feature size, which is the LAST axis; after the
        # permute in forward(), axis 1 is the number of heads.
        len_k = q.shape[-1]
        scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(len_k)

        # Apply the mask to the scores that are actually normalised.
        if mask is not None:
            scores = scores + mask
        attention = F.softmax(scores, dim=-1)
        ans = torch.matmul(attention, v)

        return ans, attention

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: tensor of shape ``(batch, sequence, input_dim)``.
            mask: optional additive mask broadcastable to
                ``(batch, num_heads, sequence, sequence)``.

        Returns:
            Tensor of shape ``(batch, sequence, d_model)``.
        """
        batch_size, sequence_length, _ = x.size()
        qkv = self.qkv_layer(x)
        # (batch, seq, heads, 3 * head_dim) -> (batch, heads, seq, 3 * head_dim)
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3)
        q, k, v = qkv.chunk(3, dim=-1)
        values, _ = self.__scaled_dot_product_attention(q, k, v, mask)
        # Move the head axis back next to head_dim BEFORE flattening; reshaping
        # the (batch, heads, seq, head_dim) layout directly would interleave
        # different positions into one output row.
        values = values.permute(0, 2, 1, 3)
        values = values.reshape(batch_size, sequence_length, self.num_heads * self.head_dim)
        out = self.linear_layer(values)
        return out


In [9]:
# Smoke test: push one random batch through a freshly initialised attention block
input_dim = 512
d_model = 512
num_heads = 8

batch_size = 16
sequence_length = 32
x = torch.randn( (batch_size, sequence_length, input_dim) )

model = MultiHeadAttention(input_dim, d_model, num_heads)
# Call the module itself rather than .forward(): nn.Module.__call__ is the
# supported entry point and also runs any registered hooks.
out = model(x)

In [10]:
# Adding positional embeddings

import torch
import torch.nn as nn

In [11]:
max_sequence_length = 10
d_model = 6

# Even embedding indices: 0, 2, 4, ...
even_i = torch.arange(0, d_model, 2).float()

# 10000 ** (i / d_model): a large base makes the scale grow quickly across
# the embedding dimensions
even_denominator = torch.pow(10000, even_i / d_model)

# Odd embedding indices: 1, 3, 5, ... each reuses the exponent of the even
# index just before it, hence the "- 1"
odd_i = torch.arange(1, d_model, 2).float()
odd_denominator = torch.pow(10000, (odd_i - 1) / d_model)

# Both denominators hold the same numbers, so one tensor is enough
denominator = even_denominator

# Column vector of positions 0 .. max_sequence_length - 1
position = torch.arange(max_sequence_length, dtype=torch.float).unsqueeze(1)

In [12]:
# Sine and cosine make the encoding periodic in the position, which is
# also convenient for linear transformations of it

# sin feeds the even embedding slots, cos the odd ones
even_pos_embds = torch.sin(position / denominator)
odd_pos_embds = torch.cos(position / denominator)

# Pair each sin with its cos on a new last axis, then flatten that axis away
# so the columns interleave as sin, cos, sin, cos, ...
final_pos_emb = torch.stack((even_pos_embds, odd_pos_embds), dim=2).flatten(start_dim=1, end_dim=2)

In [13]:
class PositionalEncoding(nn.Module):
    """Builds a fixed sinusoidal position table.

    Column ``2i`` holds ``sin(pos / 10000 ** (2i / d_model))`` and column
    ``2i + 1`` holds the matching cosine.

    Args:
        d_model: number of columns in the table (width of a position vector).
        max_sequence_length: number of rows, one per position starting at 0.
    """

    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self):
        """Return the ``(max_sequence_length, d_model)`` encoding tensor."""
        even_i = torch.arange(0, self.d_model, 2).float()
        denominator = torch.pow(10000, even_i/self.d_model)
        # Built as float explicitly so the division below never depends on
        # integer-to-float type promotion.
        position = torch.arange(self.max_sequence_length, dtype=torch.float).reshape(self.max_sequence_length, 1)
        angles = position / denominator
        even_pos_embds = torch.sin(angles)
        odd_pos_embds = torch.cos(angles)
        # Interleave sin/cos column by column.
        final_pos_emb = torch.flatten(torch.stack([even_pos_embds, odd_pos_embds], dim=2), start_dim=1, end_dim=2)
        # With an odd d_model the interleaving yields one column too many;
        # trimming keeps the promised width. This is a no-op for even d_model.
        return final_pos_emb[:, :self.d_model]

In [14]:
m = PositionalEncoding(d_model=6, max_sequence_length=10)
# Invoke the module itself (not .forward()) so nn.Module.__call__ handles the call
m()

tensor([[ 0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000],
        [ 0.8415,  0.5403,  0.0464,  0.9989,  0.0022,  1.0000],
        [ 0.9093, -0.4161,  0.0927,  0.9957,  0.0043,  1.0000],
        [ 0.1411, -0.9900,  0.1388,  0.9903,  0.0065,  1.0000],
        [-0.7568, -0.6536,  0.1846,  0.9828,  0.0086,  1.0000],
        [-0.9589,  0.2837,  0.2300,  0.9732,  0.0108,  0.9999],
        [-0.2794,  0.9602,  0.2749,  0.9615,  0.0129,  0.9999],
        [ 0.6570,  0.7539,  0.3192,  0.9477,  0.0151,  0.9999],
        [ 0.9894, -0.1455,  0.3629,  0.9318,  0.0172,  0.9999],
        [ 0.4121, -0.9111,  0.4057,  0.9140,  0.0194,  0.9998]])

In [30]:
# Implementing layer normalization

# It's quite straightforward, just like standard layer normalization
# in neural networks; it is implemented here for the sake of completeness
import torch
from torch import nn

class LayerNormalization(nn.Module):
    """Layer normalization with a learnable scale (gamma) and shift (beta).

    Args:
        parameters_shape: shape of the trailing axes to normalize over; gamma
            and beta are created with exactly this shape.
        eps: small constant added to the variance so the division never
            hits zero.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps

        # gamma starts at 1 and beta at 0, so the layer begins as a pure
        # normalization and learns any rescaling from there
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        """Normalize ``inputs`` over its last ``len(parameters_shape)`` axes."""
        n_norm_axes = len(self.parameters_shape)
        # -1, -2, ..., one entry per normalized axis
        dims = list(range(-1, -n_norm_axes - 1, -1))

        mean = inputs.mean(dim=dims, keepdim=True)
        centered = inputs - mean
        # Biased (population) variance
        var = centered.pow(2).mean(dim=dims, keepdim=True)
        std = torch.sqrt(var + self.eps)

        return self.gamma * (centered / std) + self.beta

In [31]:
batch_size = 3
sentence_length = 5
embedding_dim = 8
# Note the axis order used for this demo: (sentence_length, batch_size, embedding_dim)
inp= torch.randn(sentence_length, batch_size, embedding_dim)

# Normalize over the last axis only (the embedding dimension)
ln = LayerNormalization(inp.size()[-1:])
# Invoke the module itself (not .forward()) so nn.Module.__call__ handles the call
ln(inp)

tensor([[[ 0.2237,  1.0983,  0.2260,  1.5452, -1.9365, -0.3385, -0.1797,
          -0.6386],
         [-1.8442,  0.3109,  0.0218,  1.8186,  0.7396, -0.4677, -0.6508,
           0.0719],
         [-0.0671,  0.6382,  0.5755, -1.9717, -0.3274,  1.7403, -0.1199,
          -0.4680]],

        [[-0.2946,  0.5916, -1.3892,  1.6275,  0.4714, -1.5742,  0.5317,
           0.0359],
         [ 1.5515, -0.0539, -1.6339, -1.4856,  0.4523,  0.2235,  0.5474,
           0.3987],
         [-1.6624,  0.5187,  0.9759,  0.6120, -0.2483,  1.4459, -1.0857,
          -0.5562]],

        [[ 0.1705, -1.3284,  0.5025, -0.0973,  1.2032, -0.2297,  1.3761,
          -1.5968],
         [ 0.2413, -0.2644, -0.1772,  0.2880, -2.0577, -0.2358,  1.8222,
           0.3836],
         [-0.1622,  0.1519,  0.3245, -0.9380,  2.2040,  0.2185, -0.4294,
          -1.3694]],

        [[ 0.5888, -0.1128, -0.3977, -2.4148,  0.9547,  0.3274,  0.3317,
           0.7228],
         [ 1.0433,  0.2276, -2.0210, -0.6999, -0.5630,  0.1693, 

In [32]:
# Let us create the encoder

# To create an encoder we will need one more component
# that is a normal feed forward NN
class FeedForwardLayer(nn.Module):
    """Two-layer feed-forward network applied to the last axis of its input.

    Pipeline: Linear(d_model -> hidden) -> ReLU -> Dropout -> Linear(hidden -> d_model).

    Args:
        d_model: size of the input and output features.
        hidden: size of the intermediate layer.
        drop_prob: dropout probability used between the two linear layers.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Expand to ``hidden``, apply ReLU and dropout, project back to ``d_model``."""
        expanded = self.relu(self.linear1(x))
        return self.linear2(self.dropout(expanded))

In [33]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention, then a feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalization.

    Args:
        d_model: feature size used throughout the block.
        ffn_hidden: hidden size of the feed-forward network.
        num_heads: number of attention heads.
        drop_prob: dropout probability used in every dropout of the block.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        # Attention sub-layer
        self.attention = MultiHeadAttention(input_dim=d_model, d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        # Feed-forward sub-layer
        self.ffn = FeedForwardLayer(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Run both sub-layers on ``x`` and return a tensor of the same shape."""
        # The encoder attends without a mask
        attended = self.dropout1(self.attention(x, mask=None))
        x = self.norm1(attended + x)

        transformed = self.dropout2(self.ffn(x))
        return self.norm2(transformed + x)

class Encoder(nn.Module):
    """A stack of ``num_layers`` identical ``EncoderLayer`` blocks applied in order.

    Args:
        d_model: feature size shared by every layer.
        ffn_hidden: hidden size of each layer's feed-forward network.
        num_heads: number of attention heads per layer.
        drop_prob: dropout probability used inside each layer.
        num_layers: how many encoder layers to stack.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        blocks = [
            EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = nn.Sequential(*blocks)

    def forward(self, x):
        """Feed ``x`` through every layer and return the final output."""
        return self.layers(x)


In [36]:
# Hyper-parameters for the encoder demo
d_model = 512
num_heads = 8
drop_prob = 0.1
ffn_hidden = 2048
num_layers = 5

# Size of the random demo batch
batch_size = 30
max_sequence_length = 200

encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
# Random stand-in for token embeddings that already include positional encoding
x = torch.randn((batch_size, max_sequence_length, d_model))
out = encoder(x)
out

tensor([[[ 0.1632,  1.7173,  0.3889,  ...,  0.2849,  0.7005,  2.4774],
         [-0.6468,  0.9080, -0.8771,  ..., -0.2510,  0.0443, -0.0343],
         [-0.4627, -1.7106,  2.6224,  ..., -0.8283,  1.4733,  1.3020],
         ...,
         [-0.7132, -0.7054,  0.9903,  ..., -1.6433,  0.3756,  0.2808],
         [-2.1918,  0.0538,  0.4344,  ..., -1.0681, -0.6622,  0.3531],
         [-0.4224, -0.2407,  0.9645,  ..., -0.6999, -1.0355,  0.1379]],

        [[-0.7046, -3.2670,  0.5777,  ...,  0.9631, -1.0930,  0.5923],
         [ 0.0895, -0.7336, -0.0172,  ...,  0.0103,  0.6214,  2.1696],
         [-2.2934,  0.7928,  0.7273,  ..., -1.1329, -2.4574, -0.9656],
         ...,
         [ 1.1689, -0.0733, -0.3280,  ..., -0.0928,  0.0429,  1.2562],
         [-0.5154, -0.4037,  1.6376,  ..., -1.9652, -1.0129, -0.5088],
         [ 0.5483,  0.8973,  0.3489,  ..., -0.6195, -2.2616,  0.8052]],

        [[ 0.7434, -2.6322, -0.1220,  ..., -1.4290, -0.2546,  2.6429],
         [-2.0935,  1.5937, -0.4408,  ...,  0

In [67]:
# Coding decoder

# The decoder needs a multi-head cross-attention module: k and v are
# computed from the encoder output, while q comes from the decoder.
# Otherwise, it works mostly like self-attention.
class MultiHeadCrossAttention(nn.Module):
    """Multi-head attention whose keys/values and queries come from different inputs.

    Keys and values are projected from ``x`` and queries from ``y``; the two
    sequences may have different lengths.

    Args:
        d_model: feature size of both inputs and of the output.
        num_heads: number of heads; must divide ``d_model``.

    Raises:
        ValueError: if ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        if d_model % num_heads != 0:
            # Without this the reshape in forward() fails with a far less
            # helpful size-mismatch error.
            raise ValueError("d_model must be divisible by num_heads")
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Keys and values are projected from the first input (x)
        self.kv_layer = nn.Linear(d_model , 2 * d_model)
        # Queries are projected from the second input (y)
        self.q_layer = nn.Linear(d_model , d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def __scaled_dot_product_attention(self,q,k,v,mask=None):
        """Return ``(weighted values, attention weights)`` for per-head q, k, v.

        ``mask``, when given, is added to the scores before the softmax and
        must broadcast against ``(..., query_len, key_len)``.
        """
        # Scale by the per-head feature size, which is the LAST axis; after the
        # permute in forward(), axis 1 is the number of heads.
        len_k = q.shape[-1]
        scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(len_k)

        # Apply the mask to the scores that are actually normalised.
        if mask is not None:
            scores = scores + mask
        attention = F.softmax(scores, dim=-1)
        ans = torch.matmul(attention, v)

        return ans, attention


    def forward(self, x, y, mask=None):
        """Attend from ``y`` (queries) to ``x`` (keys and values).

        Args:
            x: tensor of shape ``(batch, kv_length, d_model)``.
            y: tensor of shape ``(batch, q_length, d_model)``.
            mask: optional additive mask broadcastable to
                ``(batch, num_heads, q_length, kv_length)``.

        Returns:
            Tensor of shape ``(batch, q_length, d_model)``.
        """
        batch_size, kv_length, _ = x.size()
        # The query length comes from y, not x, so the two sequences are free
        # to differ in length.
        q_length = y.size(1)
        kv = self.kv_layer(x)
        q = self.q_layer(y)

        # Split the feature axis into heads and move the head axis forward:
        # (batch, len, heads, features) -> (batch, heads, len, features)
        kv = kv.reshape(batch_size, kv_length, self.num_heads, 2 * self.head_dim)
        q = q.reshape(batch_size, q_length, self.num_heads, self.head_dim)
        kv = kv.permute(0, 2, 1, 3)
        q = q.permute(0, 2, 1, 3)
        k, v = kv.chunk(2, dim=-1)

        values, _ = self.__scaled_dot_product_attention(q, k, v, mask)
        # Move the head axis back next to head_dim BEFORE flattening; reshaping
        # the (batch, heads, len, head_dim) layout directly would interleave
        # different positions into one output row.
        values = values.permute(0, 2, 1, 3)
        values = values.reshape(batch_size, q_length, self.num_heads * self.head_dim)
        out = self.linear_layer(values)
        return out

In [68]:
# Position-wise feed-forward network: the same Linear -> ReLU -> Dropout -> Linear stack as FeedForwardLayer
class PositionwiseFeedForward(nn.Module):
    """Feed-forward network applied to the last axis of its input.

    Stages, in order: Linear(d_model -> hidden), ReLU, Dropout,
    Linear(hidden -> d_model).

    Args:
        d_model: size of the input and output features.
        hidden: size of the intermediate layer.
        drop_prob: dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Pass ``x`` through the four stages and return the result."""
        for stage in (self.linear1, self.relu, self.dropout, self.linear2):
            x = stage(x)
        return x


class DecoderLayer(nn.Module):
    """One decoder block with three sub-layers.

    In order: masked self-attention on ``y``, cross-attention between ``x``
    and ``y``, and a feed-forward network. Each sub-layer is followed by
    dropout, a residual connection and layer normalization.

    Args:
        d_model: feature size used throughout the block.
        ffn_hidden: hidden size of the feed-forward network.
        num_heads: number of attention heads in both attention modules.
        drop_prob: dropout probability used in every dropout of the block.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        # Sub-layer 1: self-attention over the decoder input
        self.self_attention = MultiHeadAttention(input_dim=d_model, d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        # Sub-layer 2: attention between encoder output and decoder state
        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)
        # Sub-layer 3: position-wise feed-forward network
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, decoder_mask):
        """Update the decoder state ``y`` using ``x`` and return the new ``y``.

        ``decoder_mask`` is handed to the self-attention only; the
        cross-attention is called with ``mask=None``.
        """
        self_attended = self.dropout1(self.self_attention(y, mask=decoder_mask))
        y = self.norm1(self_attended + y)

        cross_attended = self.dropout2(self.encoder_decoder_attention(x, y, mask=None))
        y = self.norm2(cross_attended + y)

        fed_forward = self.dropout3(self.ffn(y))
        return self.norm3(fed_forward + y)


In [69]:
class SequentialDecoder(nn.Sequential):
    """``nn.Sequential`` variant whose layers take ``(x, y, mask)`` and return a new ``y``."""

    def forward(self, *inputs):
        """Thread ``y`` through every contained module.

        ``inputs`` must be exactly ``(x, y, mask)``. ``x`` and ``mask`` are
        passed unchanged to each module, while ``y`` is replaced by each
        module's output in turn.
        """
        source, state, mask = inputs
        for layer in self:
            state = layer(source, state, mask)
        return state

class Decoder(nn.Module):
    """A stack of ``num_layers`` ``DecoderLayer`` blocks run one after another.

    Args:
        d_model: feature size shared by every layer.
        ffn_hidden: hidden size of each layer's feed-forward network.
        num_heads: number of attention heads per layer.
        drop_prob: dropout probability used inside each layer.
        num_layers: how many decoder layers to stack (default 1).
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers=1):
        super().__init__()
        # Build the layers first, then unpack the list into SequentialDecoder,
        # which takes its modules as separate positional arguments
        blocks = [
            DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialDecoder(*blocks)

    def forward(self, x, y, mask):
        """Pass ``(x, y, mask)`` to the layer stack and return the resulting ``y``."""
        return self.layers(x, y, mask)

In [71]:
# Hyper-parameters for the decoder demo
d_model = 512
num_heads = 8
drop_prob = 0.1
ffn_hidden = 2048
num_layers = 5

# Size of the random demo batch
batch_size = 30
max_sequence_length = 200

x = torch.randn((batch_size, max_sequence_length, d_model))  # English sentence positional encoded
y = torch.randn((batch_size, max_sequence_length, d_model))  # Translated sentence positional encoded

# Look-ahead mask: -inf strictly above the diagonal, 0 everywhere else
mask = torch.triu(
    torch.full([max_sequence_length, max_sequence_length], float('-inf')),
    diagonal=1,
)

decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
out = decoder(x, y, mask)
out

tensor([[[-1.4949, -0.8757,  0.8508,  ..., -0.9956, -0.9012, -0.0676],
         [ 1.3021, -0.6950,  0.6334,  ...,  0.4350,  0.1510,  0.4122],
         [ 0.4655, -1.3297, -0.8419,  ..., -0.2157, -0.9272, -2.6891],
         ...,
         [ 0.8866,  0.2122, -1.1576,  ..., -1.3499, -0.8153, -0.8631],
         [-0.0675, -1.9018, -2.2976,  ...,  1.6301, -0.2300, -1.9203],
         [ 0.2834, -0.8018,  1.4648,  ...,  0.5247,  0.9794,  0.1147]],

        [[-1.8060,  0.1756,  0.8302,  ..., -0.3361, -0.1578, -0.3368],
         [ 0.0628,  0.8252, -0.6606,  ...,  1.5227, -1.3413,  0.5388],
         [-0.5399, -0.5199,  0.1767,  ..., -1.7849,  1.1637,  0.7453],
         ...,
         [-1.0900, -0.1581, -0.0456,  ...,  0.1248, -0.3921,  1.2611],
         [ 0.6246,  0.2772,  1.0177,  ...,  0.4094,  0.5543, -0.2030],
         [-1.2364,  0.7276,  1.3087,  ..., -0.7991,  2.2529,  0.3428]],

        [[-0.8552, -0.0092,  0.8440,  ..., -0.5720, -0.9373,  1.2939],
         [ 0.2559, -1.0715,  0.7552,  ...,  0