In [121]:
import torch
import torch.nn as nn
import math
import copy

In [122]:
class Embedding(nn.Module):
    """Token-id to vector lookup whose output is scaled by sqrt(d_model).

    Args:
        vocab_size: number of rows in the embedding table.
        d_model: width of each embedding vector (model dimension).
    """

    def __init__(self, vocab_size, d_model=512) -> None:
        super().__init__()
        self.vocab_size, self.d_model = vocab_size, d_model
        # Learnable lookup table of shape (vocab_size, d_model).
        self.embed_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

    def forward(self, x):
        """Look up the ids in ``x`` and multiply the vectors by sqrt(d_model)."""
        # The paper (page 5) multiplies the embedding weights by sqrt(d_model).
        scale = math.sqrt(self.d_model)
        looked_up = self.embed_layer(x)
        return looked_up * scale

In [123]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a sequence of embeddings.

    A table ``pe`` of shape (max_seq_len, d_model) is precomputed once:
    even columns hold ``sin(pos / 10000^(2i/d_model))`` and odd columns hold
    ``cos(pos / 10000^(2i/d_model))``.

    Args:
        max_seq_len: largest sequence length the table can serve.
        d_model: width of each embedding vector (model dimension).
    """

    def __init__(self, max_seq_len, d_model=512) -> None:
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model

        # Column vector of positions: shape (max_seq_len, 1).
        pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        # 10000^(-2i/d_model) == 1 / 10000^(2i/d_model): one frequency per
        # sin/cos column pair.
        frequency = torch.pow(10000, -torch.arange(0, d_model, 2, dtype=torch.float) / self.d_model)
        angles = pos * frequency

        pe = torch.zeros((max_seq_len, d_model))
        pe[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer odd column than even columns,
        # so the cosine part is trimmed to fit.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A buffer follows the module across devices and is saved in the
        # state dict, but it is not a parameter and receives no gradient update.
        self.register_buffer('pe', pe)

    def forward(self, embed_vect):
        """Return ``embed_vect`` plus the encodings of its first positions.

        Args:
            embed_vect: tensor whose last two dimensions are
                (sequence length, d_model).

        Raises:
            ValueError: if the sequence is longer than ``max_seq_len``.
        """
        seq_len = embed_vect.size(-2)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {self.pe.size(0)}"
            )
        # Slice the table so sequences shorter than max_seq_len also work
        # (adding the full table only works for exactly max_seq_len tokens).
        return embed_vect + self.pe[:seq_len]

In [124]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Shape abbreviations used below:
        BS  - batch size
        NH  - number of heads
        S/T - source / target sequence length
        HD  - head dimension (d_model // n_heads)
        ED  - embedding dimension (d_model)

    Args:
        d_model: model / embedding dimension; must be divisible by ``n_heads``.
        n_heads: number of attention heads.
        dropout_rate: dropout probability applied to the per-head attention output.

    Raises:
        ValueError: if ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, d_model=512, n_heads=8, dropout_rate=0.2) -> None:
        super(MultiHeadAttention, self).__init__()
        # Without this check the head split in forward() would either fail
        # with an obscure view error or silently mis-shape the tensors.
        if d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_heads ({n_heads})"
            )
        self.d_model = d_model
        self.n_heads = n_heads
        self.dropout = nn.Dropout(p=dropout_rate)
        self.head_dim = d_model // n_heads
        self.softmax_layer = nn.Softmax(dim=-1)
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)

    def attention(self, query, key, value, mask=None):
        """Scaled dot-product attention over already-split heads.

        Args:
            query: (BS, NH, S/T, HD)
            key:   (BS, NH, S/T, HD)
            value: (BS, NH, S/T, HD)
            mask: optional tensor broadcastable to the score matrix; positions
                where it is False / 0 are excluded from attention.

        Returns:
            Tensor of shape (BS, NH, S/T, HD).

        Score matrix sizes: encoder self-attention (BS, NH, S, S), decoder
        self-attention (BS, NH, T, T), encoder-decoder attention (BS, NH, T, S).

        Note: a query row whose keys are all masked becomes all ``-inf`` and
        the softmax yields NaN for that row.
        """
        # Dot product of query and key, scaled by sqrt(head_dim).
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            # Compare against the scalar 0 instead of allocating a fresh
            # tensor on every call; works for bool and 0/1 integer masks.
            scores = scores.masked_fill(mask == 0, float("-inf"))

        attention_weight = self.softmax_layer(scores)

        # Weighted sum of the values: (BS, NH, S/T, HD).
        return torch.matmul(attention_weight, value)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, then merge the heads.

        Args:
            query, key, value: tensors of shape (BS, S/T, ED).
            mask: optional mask forwarded to :meth:`attention`.

        Returns:
            Tensor of shape (BS, S/T, ED), with the sequence length of ``query``.
        """
        batch_size = key.size(0)

        # Linear projections; shapes stay (BS, S/T, ED).
        key, query, value = self.k_linear(key), self.q_linear(query), self.v_linear(value)

        # Split ED into NH heads of HD each and move the head axis forward:
        # (BS, S/T, ED) -> (BS, S/T, NH, HD) -> (BS, NH, S/T, HD).
        key = key.view(batch_size, -1, self.n_heads, self.head_dim).transpose(1, 2)
        query = query.view(batch_size, -1, self.n_heads, self.head_dim).transpose(1, 2)
        value = value.view(batch_size, -1, self.n_heads, self.head_dim).transpose(1, 2)

        # Per-head attention output: (BS, NH, S/T, HD).
        attention_output = self.attention(query, key, value, mask)
        attention_output = self.dropout(attention_output)

        # Concatenate heads: (BS, NH, S/T, HD) -> (BS, S/T, NH, HD) -> (BS, S/T, ED).
        attention_output = attention_output.transpose(1, 2).reshape(
            batch_size, -1, self.head_dim * self.n_heads
        )

        # Final output projection.
        return self.out(attention_output)

In [125]:
# Page 5, section 3.3
import torch.nn.functional as F
class PositionWiseFeedForward(nn.Module):
    """Position-wise feed-forward network (paper section 3.3).

    Two linear transformations with a ReLU in between, applied to the last
    dimension of the input:

        FFN(x) = max(0, xW1 + b1)W2 + b2

    Dropout is applied to the hidden activation before the second projection.

    Args:
        d_model (int, optional): input and output width. Defaults to 512.
        dropout_rate (float, optional): dropout probability on the hidden
            activation. Defaults to 0.2.
        hidden_width (int, optional): expansion factor; the hidden layer has
            ``d_model * hidden_width`` units. Defaults to 4.
    """
    def __init__(self, d_model=512, dropout_rate=0.2, hidden_width=4) -> None:
        super(PositionWiseFeedForward, self).__init__()
        self.d_model = d_model
        self.hidden_width = hidden_width
        hidden_dim = d_model * hidden_width
        self.linear1 = nn.Linear(d_model, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, d_model)
        self.dropout = nn.Dropout(p=dropout_rate)

    def forward(self, x):
        """Apply linear1 -> ReLU -> dropout -> linear2 and return the result."""
        hidden = F.relu(self.linear1(x))
        return self.linear2(self.dropout(hidden))

In [126]:
# Page 3, section 3.1
class SubLayer(nn.Module):
    """Residual connection followed by layer normalisation (paper section 3.1).

    Computes LayerNorm(x + Sublayer(x)), where the caller supplies the
    already-computed sub-layer output.

    Args:
        d_model: size of the last dimension that is normalised.
    """

    def __init__(self, d_model=512) -> None:
        super().__init__()
        self.norm = nn.LayerNorm(normalized_shape=d_model)

    def forward(self, x, sub_layer_x):
        """Return the layer-normalised sum of ``x`` and ``sub_layer_x``."""
        residual_sum = x + sub_layer_x
        return self.norm(residual_sum)

In [127]:
# Page 3, section 3.1
class EncoderLayer(nn.Module):
    """One encoder layer (paper section 3.1).

    Two sub-layers, each followed by dropout, a residual connection and
    layer normalisation:
      1. multi-head self-attention
      2. position-wise feed-forward network

    Args:
        d_model: model dimension.
        n_heads: number of attention heads.
        dropout_rate: dropout probability used throughout the layer.
    """

    def __init__(self, d_model=512, n_heads=8, dropout_rate=0.2) -> None:
        super().__init__()
        self.d_model = d_model

        # Sub-layer 1: self-attention + add & norm.
        self.self_attention = MultiHeadAttention(d_model, n_heads, dropout_rate)
        self.sub_layer1 = SubLayer(d_model)
        self.dropout1 = nn.Dropout(p=dropout_rate)

        # Sub-layer 2: feed-forward + add & norm.
        self.feed_forward = PositionWiseFeedForward(d_model, dropout_rate)
        self.sub_layer2 = SubLayer(d_model)
        self.dropout2 = nn.Dropout(p=dropout_rate)

    def forward(self, vec_representation, src_mask=None):
        """Run self-attention and the feed-forward network over the input.

        Args:
            vec_representation: input tensor; used as query, key and value.
            src_mask: optional mask passed to the self-attention.

        Returns:
            Output of the second add & norm step.
        """
        # Self-attention, dropout, then residual + norm.
        attended = self.dropout1(
            self.self_attention(
                query=vec_representation,
                key=vec_representation,
                value=vec_representation,
                mask=src_mask,
            )
        )
        normed_attention = self.sub_layer1(vec_representation, attended)

        # Feed-forward, dropout, then residual + norm.
        transformed = self.dropout2(self.feed_forward(normed_attention))
        return self.sub_layer2(normed_attention, transformed)

In [128]:
# Page 3, section 3.1
class EncoderBlock(nn.Module):
    """Stack of identical encoder layers (N = 6 in the paper).

    Args:
        encoder_layer: template layer; each stacked layer is an independent
            deep copy of it.
        n_layers: number of copies to stack.
    """

    def __init__(self, encoder_layer, n_layers=6) -> None:
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(n_layers):
            self.layers.append(copy.deepcopy(encoder_layer))

    def forward(self, src_embedding, src_mask=None):
        """Pass ``src_embedding`` through every layer in order."""
        hidden = src_embedding
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, src_mask)
        return hidden

In [129]:
class DecoderLayer(nn.Module):
    """One decoder layer with three sub-layers.

    In addition to the two sub-layers found in an encoder layer, the decoder
    inserts a third one that attends over the encoder output:
      1. masked multi-head self-attention over the decoder input
      2. encoder-decoder attention: queries come from the previous decoder
         sub-layer, keys and values come from the encoder output, so every
         decoder position can attend over all input positions
      3. position-wise feed-forward network

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation.

    Args:
        d_model: model dimension.
        multi_head_attention_layer: template attention module; it is deep-copied
            twice, so both attention sub-layers start from identical weights.
        position_wise_feedforward_layer: template feed-forward module (deep-copied).
        dropout_rate: dropout probability applied after each sub-layer.
    """

    def __init__(self, d_model, multi_head_attention_layer, position_wise_feedforward_layer, dropout_rate = 0.2) -> None:
        super().__init__()
        self.d_model = d_model

        # Sub-layer 1: masked self-attention.
        self.decoder_attention_layer = copy.deepcopy(multi_head_attention_layer)
        self.sub_layer1 = SubLayer(d_model)
        self.dropout1 = nn.Dropout(p=dropout_rate)

        # Sub-layer 2: attention over the encoder output.
        self.encoder_decoder_attention_layer = copy.deepcopy(multi_head_attention_layer)
        self.sub_layer2 = SubLayer(d_model)
        self.dropout2 = nn.Dropout(p=dropout_rate)

        # Sub-layer 3: feed-forward network.
        self.feed_forward = copy.deepcopy(position_wise_feedforward_layer)
        self.sub_layer3 = SubLayer(d_model)
        self.dropout3 = nn.Dropout(p=dropout_rate)

    def forward(self,enc,dec,src_mask=None,target_mask=None):
        """Run the three decoder sub-layers.

        Args:
            enc: encoder output, used as keys and values of the second sub-layer.
            dec: decoder input (embeddings or the previous decoder layer's output).
            src_mask: optional mask for the encoder-decoder attention.
            target_mask: optional mask for the decoder self-attention.

        Returns:
            Output of the third add & norm step.
        """
        # 1) self-attention over the decoder input
        self_attended = self.dropout1(
            self.decoder_attention_layer(key=dec, query=dec, value=dec, mask=target_mask)
        )
        after_self_attention = self.sub_layer1(dec, self_attended)

        # 2) encoder-decoder attention
        cross_attended = self.dropout2(
            self.encoder_decoder_attention_layer(
                key=enc, query=after_self_attention, value=enc, mask=src_mask
            )
        )
        after_cross_attention = self.sub_layer2(after_self_attention, cross_attended)

        # 3) feed-forward network
        transformed = self.dropout3(self.feed_forward(after_cross_attention))
        return self.sub_layer3(after_cross_attention, transformed)

In [130]:
class DecoderBlock(nn.Module):
    """Stack of identical decoder layers (N = 6 in the paper).

    Args:
        decoder_layer: template layer exposing a ``d_model`` attribute; each
            stacked layer is an independent deep copy of it.
        n_layers: number of copies to stack.
    """

    def __init__(self, decoder_layer, n_layers=6) -> None:
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(n_layers):
            self.layers.append(copy.deepcopy(decoder_layer))
        # NOTE(review): this norm is created but never applied in forward();
        # it is kept so the module's parameters / state dict stay unchanged.
        self.layer_norm = nn.LayerNorm(decoder_layer.d_model)

    def forward(self, encoder_out_vec, decoder_embedding, src_mask=None, target_mask=None):
        """Pass ``decoder_embedding`` through every layer, attending to ``encoder_out_vec``."""
        hidden = decoder_embedding
        for decoder_layer in self.layers:
            hidden = decoder_layer(
                enc=encoder_out_vec,
                dec=hidden,
                src_mask=src_mask,
                target_mask=target_mask,
            )
        return hidden

In [131]:
class DecoderGenerator(nn.Module):
    """Output head: linear projection to vocabulary size followed by log-softmax.

    Args:
        d_model: width of the incoming decoder vectors.
        vocab_size: number of output classes (size of the last output dimension).
    """

    def __init__(self, d_model, vocab_size) -> None:
        super().__init__()
        self.linear = nn.Linear(in_features=d_model, out_features=vocab_size)
        # Despite the attribute name, this is a *log*-softmax over the last dimension.
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        """Return log-probabilities over the vocabulary for every position in ``x``."""
        logits = self.linear(x)
        return self.softmax(logits)

In [132]:
class Transformers(nn.Module):
    """Full encoder-decoder Transformer assembled from the layers above.

    Args:
        src_seq_len: maximum source sequence length (size of the source
            positional-encoding table).
        trg_seq_len: maximum target sequence length (size of the target
            positional-encoding table).
        d_model: model dimension.
        num_head: number of attention heads.
        dropout_rate: dropout probability used throughout the model.
        src_vocab_size: number of distinct source token ids. Defaults to
            ``src_seq_len``, which is what the embedding table was sized by
            before this parameter existed.
        trg_vocab_size: number of distinct target token ids; also the size of
            the generator output. Defaults to ``trg_seq_len`` for the same reason.
    """

    def __init__(self, src_seq_len, trg_seq_len, d_model, num_head, dropout_rate=0.2,
                 src_vocab_size=None, trg_vocab_size=None) -> None:
        super(Transformers, self).__init__()
        # Sequence length and vocabulary size are unrelated quantities; fall
        # back to the sequence lengths only to keep existing callers working.
        if src_vocab_size is None:
            src_vocab_size = src_seq_len
        if trg_vocab_size is None:
            trg_vocab_size = trg_seq_len

        self.src_seq_len = src_seq_len
        self.trg_seq_len = trg_seq_len
        self.src_vocab_size = src_vocab_size
        self.trg_vocab_size = trg_vocab_size
        self.d_model = d_model
        self.num_head = num_head

        self.src_embedding = Embedding(src_vocab_size, d_model)
        self.src_positional_encoding = PositionalEncoding(src_seq_len, d_model)

        self.trg_embedding = Embedding(trg_vocab_size, d_model)
        self.trg_positional_encoding = PositionalEncoding(trg_seq_len, d_model)

        # NOTE(review): the four modules below serve only as templates (the
        # blocks and the decoder layer deep-copy them), yet being attributes
        # they are still registered as sub-modules with their own parameters.
        self.multi_head_attention = MultiHeadAttention(d_model, num_head, dropout_rate)
        self.position_wise_feedforward = PositionWiseFeedForward(d_model, dropout_rate)

        self.encoder_layer = EncoderLayer(d_model, num_head, dropout_rate)
        self.encoder_block = EncoderBlock(self.encoder_layer)

        self.decoder_layer = DecoderLayer(d_model, self.multi_head_attention, self.position_wise_feedforward, dropout_rate)
        self.decoder_block = DecoderBlock(self.decoder_layer)

        self.decoder_generator = DecoderGenerator(d_model, trg_vocab_size)

    def forward(self, src_token_id, target_token_id, src_mask=None, target_mask=None):
        """Encode the source ids, decode the target ids, return log-probabilities.

        Args:
            src_token_id: source token ids.
            target_token_id: target token ids.
            src_mask: optional mask for source positions.
            target_mask: optional mask for target positions.

        Returns:
            The output of :meth:`decode`.
        """
        encode_out = self.encode(src_token_id, src_mask)
        return self.decode(target_token_id, encode_out, src_mask, target_mask)

    def encode(self, src_token_id, src_mask):
        """Embed the source ids, add positional encodings and run the encoder stack."""
        src_embedding = self.src_embedding(src_token_id)
        src_embedding = self.src_positional_encoding(src_embedding)
        return self.encoder_block(src_embedding, src_mask)

    def decode(self, target_token_id, encode_out, src_mask, target_mask):
        """Embed the target ids, run the decoder stack and the output generator."""
        # Use the *target* embedding and positional encoding; the source-side
        # modules were used here before, leaving the target ones untrained.
        embed = self.trg_embedding(target_token_id)
        pe_out = self.trg_positional_encoding(embed)
        decode_out = self.decoder_block(encode_out, pe_out, src_mask, target_mask)
        return self.decoder_generator(decode_out)

In [None]:
def get_src_mask(src_token_ids_batch,pad_tok_id):
    """Build the source padding mask.

    Returns a boolean tensor of size (BS, 1, 1, S) that is True wherever the
    token id differs from ``pad_tok_id``.
    """
    n_rows = src_token_ids_batch.size(0)
    not_padding = src_token_ids_batch != pad_tok_id
    return not_padding.view(n_rows, 1, 1, -1)
def get_trg_mask(trg_token_ids_batch,pad_tok_id):
    """Build the target mask: padding mask combined with a causal mask.

    Args:
        trg_token_ids_batch: token ids of size (BS, T).
        pad_tok_id: id of the padding token.

    Returns:
        Boolean tensor of size (BS, 1, T, T), on the same device as the input.
        Entry [b, 0, i, j] is True when position i may attend to position j,
        i.e. j <= i and token j of sequence b is not padding.
    """
    batch_size, seq_len = trg_token_ids_batch.size(0), trg_token_ids_batch.size(1)
    trg_pad_mask = (trg_token_ids_batch != pad_tok_id).view(batch_size, 1, 1, -1)  # (BS,1,1,T)
    # Lower-triangular "no peeking ahead" mask, created on the input's device
    # so the `&` below does not mix devices.
    trg_look_forward = torch.tril(
        torch.ones(1, 1, seq_len, seq_len, dtype=torch.bool, device=trg_token_ids_batch.device)
    )
    return trg_pad_mask & trg_look_forward

# ---------------------------------------------------------------------------
# Smoke test: push one random batch through the model.
# ---------------------------------------------------------------------------

# Ids of the special tokens.
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3

# Inclusive range of the randomly drawn token ids.
low_bound = 3
high_bound = 15

batch_size = 32
# Transformers sizes both its embedding tables and its positional-encoding
# tables from these values, so every token id must stay below them.
src_seq_len = 20
trg_seq_len = 20

# Shapes of the (batch, sequence) id tensors.
src_tensor_size = (batch_size, src_seq_len)
trg_tensor_size = (batch_size, trg_seq_len)

# Random source and target sequences; torch.randint excludes the upper bound,
# hence the + 1.
src_seq = torch.randint(low=low_bound, high=high_bound + 1, size=src_tensor_size, dtype=torch.int32)
trg_seq = torch.randint(low=low_bound, high=high_bound + 1, size=trg_tensor_size, dtype=torch.int32)

# Build the model.
transformer = Transformers(
    src_seq_len=src_seq_len,
    trg_seq_len=trg_seq_len,
    d_model=512,
    num_head=8,
    dropout_rate=0.2,
)

# Padding mask for the source, padding + causal mask for the target.
src_mask = get_src_mask(src_seq, PAD_IDX)
trg_mask = get_trg_mask(trg_seq, PAD_IDX)

# Forward pass.
output = transformer(src_seq, trg_seq, src_mask, trg_mask)