In [1]:
import numpy as np
import tensorflow as tf

from tensorflow.keras.layers import Embedding, Dense, LayerNormalization, Dropout

# Positional Encoding

Positional encodings are position identifiers added to the initial vector representation of each input token so that the transformer knows the order of the sequence.

The transformer does not process the inputs sequentially, but in parallel. Each element combines information from all the other elements through self-attention, but it performs this aggregation on its own, independently of what the other elements do or have already done.

Because the transformer architecture does not model the order of the input anywhere, we must encode the order of the input explicitly. 

In [None]:
def get_angles(pos, i, d):
    """Compute the raw positional-encoding angles (before sin/cos is applied).

    Args:
        pos: Position index or array of position indices.
        i: Embedding-dimension index or array of indices.
        d: Embedding dimension used to normalise the exponent.

    Returns:
        ``pos / 10000 ** (2 * (i // 2) / d)``, broadcast over ``pos`` and ``i``.
    """
    # Indices 2k and 2k+1 share one frequency, hence the floor division by 2.
    exponent = 2 * (i // 2) / d
    wavelength = np.power(10000, exponent)
    return pos / wavelength

In [None]:
def get_positional_encodings(num_words, embed_dim):
    """Build the sinusoidal positional-encoding table.

    Args:
        num_words: Number of positions to encode.
        embed_dim: Length of each encoding vector.

    Returns:
        A float32 tensor of shape (1, num_words, embed_dim).
    """
    positions = np.arange(num_words)[:, np.newaxis]   # (num_words, 1)
    dimensions = np.arange(embed_dim)[np.newaxis, :]  # (1, embed_dim)
    encodings = get_angles(pos=positions, i=dimensions, d=embed_dim)

    # Even columns hold the sine of the angle, odd columns the cosine.
    encodings[:, 0::2] = np.sin(encodings[:, 0::2])
    encodings[:, 1::2] = np.cos(encodings[:, 1::2])

    # Prepend a leading axis of size 1 before converting to a TF tensor.
    return tf.cast(encodings[np.newaxis, ...], dtype=tf.float32)

# Masking

**Padding Mask**

When passing sequences into a transformer model, it is important that they are of uniform length. This can be achieved by padding the sequences with zeros, and truncating sentences that exceed the maximum length of the model.

However, the padded zeros would otherwise affect the softmax calculation. To ensure that padding does not contribute to self-attention, we use a padding mask.

**Look-ahead Mask**

In training, the model has access to the complete correct output of the training example. 

Therefore, while predicting the word at a given position, the Decoder has access both to the target words that precede it and to the target words that follow it. This would allow the Decoder to ‘cheat’ by using target words from future ‘time steps’. The look-ahead mask is used to mask out the target words that appear later in the sequence.

In [2]:
def create_padding_mask(seq):
    """Build a padding mask that flags the zero entries of ``seq``.

    Args:
        seq: Tensor of token ids in which 0 marks padding
            (assumed 2-D, (batch, seq_len) -- confirm against callers).

    Returns:
        A float32 tensor holding 1.0 where ``seq == 0`` and 0.0 elsewhere,
        with two singleton axes inserted after the first axis so it can
        broadcast against attention scores.
    """
    is_padding = tf.math.equal(seq, 0)
    mask = tf.cast(is_padding, tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]

In [None]:
def create_look_ahead_mask(size):
    """Build a (size, size) mask that hides future positions.

    Args:
        size: Sequence length.

    Returns:
        A tensor with 1 strictly above the main diagonal (positions to hide)
        and 0 on and below it.
    """
    # band_part(..., -1, 0) keeps the full lower triangle including the diagonal.
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle

#  Building Transformer Network

### Scaled Dot Product

In [None]:
def scaled_dot_product_attention(q, k, v, mask):
    """Compute scaled dot-product attention.

    Args:
        q: Query tensor.
        k: Key tensor; its last dimension is used for scaling.
        v: Value tensor.
        mask: Tensor broadcastable to the score matrix, with 1 at positions to
            suppress, or None to apply no masking.

    Returns:
        A tuple ``(output, attention_weights)``: the attention-weighted sum of
        ``v`` and the softmax weights over the last axis of the scores.
    """
    # Raw compatibility score of every query against every key.
    scores = tf.matmul(q, k, transpose_b=True)

    # Divide by sqrt(d_k) so the scores do not grow with the key depth, which
    # would otherwise cause exploding values and unstable gradients.
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_scores = scores / tf.math.sqrt(key_depth)

    # Masked positions receive a very large negative score, so the softmax
    # assigns them (near) zero weight.
    if mask is not None:
        scaled_scores = scaled_scores + (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_scores, axis=-1)
    return tf.matmul(attention_weights, v), attention_weights

### Multi-head Attention Layer

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    The inputs are linearly projected to ``d_model`` features, split into
    ``num_heads`` heads of ``d_model // num_heads`` features each, attended
    per head, merged back together and passed through a final Dense layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: Total feature size; must be divisible by ``num_heads``.
            num_heads: Number of attention heads.
        """
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        # Every head must receive an equal share of the features.
        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        # Input projections for queries, keys and values.
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        # Output projection applied after the heads are merged.
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape ``x`` to (batch_size, num_heads, seq_len, depth)."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Apply multi-head attention.

        Note the argument order: values, keys, queries, then the mask.

        Returns:
            A tuple ``(output, attention_weights)``.
        """
        batch_size = tf.shape(q)[0]

        # Project first (q, then k, then v), then split each into heads.
        projected = (self.wq(q), self.wk(k), self.wv(v))
        q, k, v = (self.split_heads(tensor, batch_size) for tensor in projected)

        per_head_output, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)

        # Move the head axis next to depth, then fold both back into d_model.
        merged = tf.transpose(per_head_output, perm=[0, 2, 1, 3])
        merged = tf.reshape(merged, (batch_size, -1, self.d_model))

        return self.dense(merged), attention_weights

### Feed Forward Network

In [None]:
def FullyConnected(embedding_dim, fully_connected_dim):
    """Build the two-layer feed-forward network.

    Args:
        embedding_dim: Output size of the second Dense layer.
        fully_connected_dim: Size of the hidden ReLU layer.

    Returns:
        A ``tf.keras.Sequential`` of Dense(relu) followed by a linear Dense.
    """
    hidden = Dense(fully_connected_dim, activation='relu')
    projection = Dense(embedding_dim)
    return tf.keras.Sequential([hidden, projection])

### Transformer Encoder Layer

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and a feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalization.
    """

    def __init__(self, embedding_dim, num_heads, fully_connected_dim, dropout_rate=0.1, layernorm_eps=1e-6):
        """Create the sub-layers.

        Args:
            embedding_dim: Feature size of the layer's input and output.
            num_heads: Number of attention heads.
            fully_connected_dim: Hidden size of the feed-forward network.
            dropout_rate: Rate used by both dropout layers.
            layernorm_eps: Epsilon used by both layer normalizations.
        """
        super().__init__()

        self.mha = MultiHeadAttention(d_model=embedding_dim, num_heads=num_heads)
        self.ffn = FullyConnected(embedding_dim=embedding_dim,
                                  fully_connected_dim=fully_connected_dim)

        self.layernorm1 = LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = LayerNormalization(epsilon=layernorm_eps)

        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)

    def call(self, x, training, mask):
        """Run the encoder block.

        Args:
            x: Input tensor.
            training: Whether dropout is active.
            mask: Mask forwarded to the attention layer.

        Returns:
            The block's output tensor.
        """
        # Self-attention: values, keys and queries are all the input itself.
        attn_out, _ = self.mha(x, x, x, mask)
        attn_out = self.dropout1(attn_out, training=training)
        # First residual connection + normalization.
        attn_block_out = self.layernorm1(x + attn_out)

        # Position-wise feed-forward network with dropout.
        ffn_out = self.dropout2(self.ffn(attn_block_out), training=training)
        # Second residual connection + normalization gives the layer output.
        return self.layernorm2(ffn_out + attn_block_out)

### Transformer Encoder

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Transformer encoder: embedding, positional encoding and encoder layers."""

    def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size,
               maximum_position_encoding, dropout_rate=0.1, layernorm_eps=1e-6):
        """Create the embedding, positional-encoding table and layer stack.

        Args:
            num_layers: Number of stacked ``EncoderLayer`` blocks.
            embedding_dim: Size of the token embeddings.
            num_heads: Number of attention heads per layer.
            fully_connected_dim: Hidden size of each feed-forward network.
            input_vocab_size: Vocabulary size of the embedding layer.
            maximum_position_encoding: Number of positions precomputed in the
                positional-encoding table.
            dropout_rate: Dropout rate used throughout.
            layernorm_eps: Epsilon for the layer normalizations.
        """
        super(Encoder, self).__init__()

        self.embedding_dim = embedding_dim
        self.num_layers = num_layers

        self.embedding = Embedding(input_vocab_size, self.embedding_dim)
        self.pos_encoding = get_positional_encodings(maximum_position_encoding,
                                                     self.embedding_dim)

        self.enc_layers = [EncoderLayer(embedding_dim=self.embedding_dim,
                                        num_heads=num_heads,
                                        fully_connected_dim=fully_connected_dim,
                                        dropout_rate=dropout_rate,
                                        layernorm_eps=layernorm_eps)
                           for _ in range(self.num_layers)]

        self.dropout = Dropout(dropout_rate)

    def call(self, x, training, mask):
        """Encode a batch of token-id sequences.

        Args:
            x: Token ids; axis 1 is taken as the sequence length.
            training: Whether dropout is active.
            mask: Mask forwarded to every encoder layer.

        Returns:
            The output of the last encoder layer.
        """
        seq_len = tf.shape(x)[1]

        # Embed the tokens and scale by sqrt(embedding_dim).
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
        # Add the positional encodings for the first `seq_len` positions.
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        # `training` is a non-tensor flag, so it is passed by keyword: Keras 3
        # rejects non-tensor values passed positionally to a layer call.
        for enc_layer in self.enc_layers:
            x = enc_layer(x, training=training, mask=mask)

        return x

### Transformer Decoder Layer

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block.

    It chains masked self-attention, attention over the encoder output and a
    feed-forward network. Each sub-layer is followed by dropout, a residual
    connection and layer normalization.
    """

    def __init__(self, embedding_dim, num_heads, fully_connected_dim, dropout_rate=0.1, layernorm_eps=1e-6):
        """Create the sub-layers.

        Args:
            embedding_dim: Feature size of the layer's input and output.
            num_heads: Number of attention heads in each attention block.
            fully_connected_dim: Hidden size of the feed-forward network.
            dropout_rate: Rate used by the three dropout layers.
            layernorm_eps: Epsilon used by the three layer normalizations.
        """
        super().__init__()

        self.mha1 = MultiHeadAttention(d_model=embedding_dim, num_heads=num_heads)
        self.mha2 = MultiHeadAttention(d_model=embedding_dim, num_heads=num_heads)

        self.ffn = FullyConnected(embedding_dim=embedding_dim,
                                  fully_connected_dim=fully_connected_dim)

        self.layernorm1 = LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = LayerNormalization(epsilon=layernorm_eps)
        self.layernorm3 = LayerNormalization(epsilon=layernorm_eps)

        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)
        self.dropout3 = Dropout(dropout_rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Run the decoder block.

        Args:
            x: Decoder input tensor.
            enc_output: Output of the encoder.
            training: Whether dropout is active.
            look_ahead_mask: Mask applied in the self-attention block.
            padding_mask: Mask applied in the encoder-decoder attention block.

        Returns:
            A tuple ``(output, self_attention_weights,
            encoder_decoder_attention_weights)``.
        """
        # Block 1: masked self-attention over the decoder input.
        self_attn, self_attn_weights = self.mha1(x, x, x, look_ahead_mask)
        self_attn = self.dropout1(self_attn, training=training)
        block1_out = self.layernorm1(self_attn + x)

        # Block 2: the argument order is (v, k, q, mask), so values and keys
        # come from the encoder output and queries from block 1.
        cross_attn, cross_attn_weights = self.mha2(
            enc_output, enc_output, block1_out, padding_mask)
        cross_attn = self.dropout2(cross_attn, training=training)
        block2_out = self.layernorm2(cross_attn + block1_out)

        # Block 3: feed-forward network.
        ffn_out = self.dropout3(self.ffn(block2_out), training=training)
        block3_out = self.layernorm3(ffn_out + block2_out)

        return block3_out, self_attn_weights, cross_attn_weights

### Transformer Decoder

In [None]:
class Decoder(tf.keras.layers.Layer):
    """Transformer decoder: embedding, positional encoding and decoder layers."""

    def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, target_vocab_size,
               maximum_position_encoding, dropout_rate=0.1, layernorm_eps=1e-6):
        """Create the embedding, positional-encoding table and layer stack.

        Args:
            num_layers: Number of stacked ``DecoderLayer`` blocks.
            embedding_dim: Size of the token embeddings.
            num_heads: Number of attention heads per attention block.
            fully_connected_dim: Hidden size of each feed-forward network.
            target_vocab_size: Vocabulary size of the embedding layer.
            maximum_position_encoding: Number of positions precomputed in the
                positional-encoding table.
            dropout_rate: Dropout rate used throughout.
            layernorm_eps: Epsilon for the layer normalizations.
        """
        super(Decoder, self).__init__()

        self.embedding_dim = embedding_dim
        self.num_layers = num_layers

        self.embedding = Embedding(target_vocab_size, self.embedding_dim)
        self.pos_encoding = get_positional_encodings(maximum_position_encoding, self.embedding_dim)

        self.dec_layers = [DecoderLayer(embedding_dim=self.embedding_dim,
                                        num_heads=num_heads,
                                        fully_connected_dim=fully_connected_dim,
                                        dropout_rate=dropout_rate,
                                        layernorm_eps=layernorm_eps)
                           for _ in range(self.num_layers)]

        self.dropout = Dropout(dropout_rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Decode a batch of target token-id sequences.

        Args:
            x: Target token ids; axis 1 is taken as the sequence length.
            enc_output: Output of the encoder.
            training: Whether dropout is active.
            look_ahead_mask: Mask for each layer's self-attention block.
            padding_mask: Mask for each layer's encoder-decoder attention block.

        Returns:
            A tuple ``(output, attention_weights)``. ``attention_weights`` is a
            dict holding, for every layer n (1-based), the keys
            ``decoder_layer{n}_block1_self_att`` and
            ``decoder_layer{n}_block2_decenc_att``.
        """
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        # Embed the tokens and scale by sqrt(embedding_dim).
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
        # Add the positional encodings for the first `seq_len` positions.
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        # `training` is a non-tensor flag, so it is passed by keyword: Keras 3
        # rejects non-tensor values passed positionally to a layer call.
        for layer_number, dec_layer in enumerate(self.dec_layers, start=1):
            x, block1, block2 = dec_layer(x, enc_output,
                                          training=training,
                                          look_ahead_mask=look_ahead_mask,
                                          padding_mask=padding_mask)

            # Record both attention maps of this layer for later inspection.
            attention_weights['decoder_layer{}_block1_self_att'.format(layer_number)] = block1
            attention_weights['decoder_layer{}_block2_decenc_att'.format(layer_number)] = block2

        return x, attention_weights

### Transformer (Encoder + Decoder)

In [None]:
class Transformer(tf.keras.Model):
    """
    Complete transformer with an Encoder and a Decoder

    The decoder output is passed through a final Dense layer with a softmax
    activation, so the model returns probabilities over the target vocabulary
    rather than logits.
    """
    def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size,
                 target_vocab_size, max_positional_encoding_input, max_positional_encoding_target,
                 dropout_rate=0.1, layernorm_eps=1e-6):
        """Create the encoder, the decoder and the output projection.

        Args:
            num_layers: Number of layers in both the encoder and the decoder.
            embedding_dim: Size of the token embeddings.
            num_heads: Number of attention heads.
            fully_connected_dim: Hidden size of the feed-forward networks.
            input_vocab_size: Vocabulary size of the encoder embedding.
            target_vocab_size: Vocabulary size of the decoder embedding and of
                the final output layer.
            max_positional_encoding_input: Positions precomputed for the encoder.
            max_positional_encoding_target: Positions precomputed for the decoder.
            dropout_rate: Dropout rate used throughout.
            layernorm_eps: Epsilon for the layer normalizations.
        """
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers=num_layers,
                               embedding_dim=embedding_dim,
                               num_heads=num_heads,
                               fully_connected_dim=fully_connected_dim,
                               input_vocab_size=input_vocab_size,
                               maximum_position_encoding=max_positional_encoding_input,
                               dropout_rate=dropout_rate,
                               layernorm_eps=layernorm_eps)

        self.decoder = Decoder(num_layers=num_layers,
                               embedding_dim=embedding_dim,
                               num_heads=num_heads,
                               fully_connected_dim=fully_connected_dim,
                               target_vocab_size=target_vocab_size,
                               maximum_position_encoding=max_positional_encoding_target,
                               dropout_rate=dropout_rate,
                               layernorm_eps=layernorm_eps)

        self.final_layer = Dense(target_vocab_size, activation='softmax')

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        """Run the full encoder-decoder forward pass.

        Args:
            inp: Source token ids fed to the encoder.
            tar: Target token ids fed to the decoder.
            training: Whether dropout is active.
            enc_padding_mask: Mask for the encoder's self-attention.
            look_ahead_mask: Mask for the decoder's self-attention.
            dec_padding_mask: Mask for the decoder's attention over the
                encoder output.

        Returns:
            A tuple ``(final_output, attention_weights)``: the softmax output
            of the final layer and the decoder's attention-weights dict.
        """
        # `training` is a non-tensor flag, so it is passed by keyword: Keras 3
        # rejects non-tensor values passed positionally to a layer call.
        enc_output = self.encoder(inp, training=training, mask=enc_padding_mask)
        dec_output, attention_weights = self.decoder(tar, enc_output,
                                                     training=training,
                                                     look_ahead_mask=look_ahead_mask,
                                                     padding_mask=dec_padding_mask)

        final_output = self.final_layer(dec_output)

        return final_output, attention_weights