# Transformer from Scratch

Author: Deeepwin, Jason Brownlee [(Machine Learning Mastery)](https://machinelearningmastery.com)   
Date: 05.11.2022 
***

## Setup

In [26]:
import os
import numpy as np
import tensorflow as tf

from tensorflow import math, matmul, reshape, shape, transpose, cast, float32, linalg, ones, maximum, newaxis
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, LayerNormalization, Layer, ReLU, Dropout, TextVectorization, Embedding, Input
 
from keras.backend import softmax

In [27]:
np.set_printoptions(linewidth=160)

# disable GPU
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

## Configuration

In [28]:
num_heads       = 8         # Number of self-attention heads
d_k             = 64        # Dimensionality of the linearly projected queries and keys
d_v             = 64        # Dimensionality of the linearly projected values
dense_dim       = 2048      # Dimensionality of the inner fully connected layer
embed_dim       = 512       # Dimensionality of the model sub-layers' outputs
enc_num_layers  = 6         # Number of layers in the encoder stack
 
dropout_rate    = 0.1       # Frequency of dropping the input units in the dropout layers

vocab_size      = 15000     # Vocabulary size
sequence_length = 20        # Maximum length of the input sequence

batch_size      = 32        # Batch size from the training process

## Attention Layer

See [(Link)](https://machinelearningmastery.com/how-to-implement-multi-head-attention-from-scratch-in-tensorflow-and-keras) and [(Link)](https://machinelearningmastery.com/how-to-implement-scaled-dot-product-attention-from-scratch-in-tensorflow-and-keras)

![image](pics/dot_product_1.png)

You may note that the scaled dot-product attention can also apply a mask to the attention scores before feeding them into the softmax function. 

Since the word embeddings are zero-padded to a specific sequence length, a ***padding mask needs to be introduced in order to prevent the zero tokens from being processed*** along with the input in both the encoder and decoder stages. Furthermore, a look-ahead mask is also required to prevent the decoder from attending to succeeding words, such that the prediction for a particular word can only depend on known outputs for the words that come before it.

These look-ahead and padding masks are applied inside the scaled dot-product attention set to -infinity all the values in the input to the softmax function that should not be considered. For each of these large negative inputs, the softmax function will, in turn, produce an output value that is close to zero, effectively masking them out. The use of these masks will become clearer when you progress to the implementation of the encoder and decoder blocks in separate tutorials.

In [29]:
# Implementing the Scaled-Dot Product Attention
class DotProductAttention(Layer):

    def __init__(self, **kwargs):
    
        super(DotProductAttention, self).__init__(**kwargs)
 
    def call(self, queries, keys, values, d_k, mask=None):
    
        # Scoring the queries against the keys after transposing the latter, and scaling
        scores = matmul(queries, keys, transpose_b=True) / math.sqrt(cast(d_k, float32))
 
        # Apply mask to the attention scores
        if mask is not None:
            scores += -1e9 * mask
 
        # Computing the weights by a softmax operation
        weights = softmax(scores)
 
        # Computing the attention by a weighted sum of the value vectors
        return matmul(weights, values)
 
# Implementing the Multi-Head Attention
class MultiHeadAttention(Layer):

    def __init__(self, num_heads, d_k, d_v, embed_dim, **kwargs):
    
        super(MultiHeadAttention, self).__init__(**kwargs)

        self.attention = DotProductAttention()      # Scaled dot product attention
        self.heads = num_heads                      # Number of attention heads to use
        self.d_k = d_k                              # Dimensionality of the linearly projected queries and keys
        self.d_v = d_v                              # Dimensionality of the linearly projected values
        self.embed_dim = embed_dim                  # Dimensionality of the model
        self.W_q = Dense(d_k)                       # Learned projection matrix for the queries
        self.W_k = Dense(d_k)                       # Learned projection matrix for the keys
        self.W_v = Dense(d_v)                       # Learned projection matrix for the values
        self.W_o = Dense(embed_dim)                 # Learned projection matrix for the multi-head output
 
    def reshape_tensor(self, x, heads, flag):
        if flag:
            # Tensor shape after reshaping and transposing: (batch_size, heads, seq_length, -1)
            x = reshape(x, shape=(shape(x)[0], shape(x)[1], heads, -1))
            x = transpose(x, perm=(0, 2, 1, 3))
        else:
            # Reverting the reshaping and transposing operations: (batch_size, seq_length, d_k)
            x = transpose(x, perm=(0, 2, 1, 3))
            x = reshape(x, shape=(shape(x)[0], shape(x)[1], self.d_k))
        return x
 
    def call(self, queries, keys, values, mask=None):
 
        # Rearrange the queries to be able to compute all heads in parallel
        q_reshaped = self.reshape_tensor(self.W_q(queries), self.heads, True)
        
        # Resulting tensor shape: (batch_size, heads, input_seq_length, -1)
 
        # Rearrange the keys to be able to compute all heads in parallel
        k_reshaped = self.reshape_tensor(self.W_k(keys), self.heads, True)
        # Resulting tensor shape: (batch_size, heads, input_seq_length, -1)
 
        # Rearrange the values to be able to compute all heads in parallel
        v_reshaped = self.reshape_tensor(self.W_v(values), self.heads, True)
        # Resulting tensor shape: (batch_size, heads, input_seq_length, -1)
 
        # Compute the multi-head attention output using the reshaped queries, keys and values
        o_reshaped = self.attention(q_reshaped, k_reshaped, v_reshaped, self.d_k, mask)
        # Resulting tensor shape: (batch_size, heads, input_seq_length, -1)
 
        # Rearrange back the output into concatenated form
        output = self.reshape_tensor(o_reshaped, self.heads, False)
        # Resulting tensor shape: (batch_size, input_seq_length, d_v)
 
        # Apply one final linear projection to the output to generate the multi-head attention
        # Resulting tensor shape: (batch_size, input_seq_length, embed_dim)
        return self.W_o(output)


### Test

In [30]:
from numpy import random
 
queries     = random.random((batch_size, sequence_length, d_k))
keys        = random.random((batch_size, sequence_length, d_k))
values      = random.random((batch_size, sequence_length, d_v))
 
output = MultiHeadAttention(num_heads, d_k, d_v, embed_dim)(queries, keys, values);
output.numpy().shape

(32, 20, 512)

## Embedding Layer

See [(Link)](https://machinelearningmastery.com/the-transformer-positional-encoding-layer-in-keras-part-2/)

In [31]:

class PositionEmbeddingFixedWeights(Layer):

    def __init__(self, sequence_length, vocab_size, output_dim, **kwargs):
    
        super(PositionEmbeddingFixedWeights, self).__init__(**kwargs)

        word_embedding_matrix = self.get_position_encoding(vocab_size, output_dim)   
        position_embedding_matrix = self.get_position_encoding(sequence_length, output_dim)     

        self.word_embedding_layer = Embedding(
            input_dim=vocab_size, output_dim=output_dim,
            weights=[word_embedding_matrix],
            trainable=False
        )
        self.position_embedding_layer = Embedding(
            input_dim=sequence_length, output_dim=output_dim,
            weights=[position_embedding_matrix],
            trainable=False
        )
             
    def get_position_encoding(self, seq_len, d, n=10000):
        P = np.zeros((seq_len, d))
        for k in range(seq_len):
            for i in np.arange(int(d/2)):
                denominator = np.power(n, 2*i/d)
                P[k, 2*i] = np.sin(k/denominator)
                P[k, 2*i+1] = np.cos(k/denominator)
        return P
 
    def call(self, inputs):        
        position_indices = tf.range(tf.shape(inputs)[-1])
        embedded_words = self.word_embedding_layer(inputs)
        embedded_indices = self.position_embedding_layer(position_indices)
        return embedded_words + embedded_indices

### Test

In [32]:
sentences = [["I am a robot"], ["you too robot"]]
sentence_data = tf.data.Dataset.from_tensor_slices(sentences)

# Create the TextVectorization layer
vectorize_layer = TextVectorization(max_tokens=vocab_size,
                                    output_sequence_length=vocab_size)

# Train the layer to create a dictionary
vectorize_layer.adapt(sentence_data)

# Convert all sentences to tensors
word_tensors = tf.convert_to_tensor(sentences, dtype=tf.string)

# Use the word tensors to get vectorized phrases
vectorized_words = vectorize_layer(word_tensors)
print("Vocabulary: ", vectorize_layer.get_vocabulary())
print("Vectorized words: ", vectorized_words.numpy())

Vocabulary:  ['', '[UNK]', 'robot', 'you', 'too', 'i', 'am', 'a']
Vectorized words:  [[5 6 7 ... 0 0 0]
 [3 4 2 ... 0 0 0]]


In [33]:
attnisallyouneed_embedding = PositionEmbeddingFixedWeights(sequence_length, vocab_size, sequence_length)
attnisallyouneed_output = attnisallyouneed_embedding(vectorized_words)
attnisallyouneed_output.numpy().shape

InvalidArgumentError: Exception encountered when calling layer "embedding_11" "                 f"(type Embedding).

{{function_node __wrapped__ResourceGather_device_/job:localhost/replica:0/task:0/device:CPU:0}} indices[11875] = 11875 is not in [0, 20) [Op:ResourceGather]

Call arguments received by layer "embedding_11" "                 f"(type Embedding):
  • inputs=tf.Tensor(shape=(15000,), dtype=int32)

## Encoder

See [(Link)](https://machinelearningmastery.com/implementing-the-transformer-encoder-from-scratch-in-tensorflow-and-keras)

In [None]:
# Implementing the Add & Norm Layer
class AddNormalization(Layer):

    def __init__(self, **kwargs):
    
        super(AddNormalization, self).__init__(**kwargs)
        self.layer_norm = LayerNormalization()  # Layer normalization layer
 
    def call(self, x, sublayer_x):
    
        # The sublayer input and output need to be of the same shape to be summed
        add = x + sublayer_x
 
        # Apply layer normalization to the sum
        return self.layer_norm(add)
 
# Implementing the Feed-Forward Layer
class FeedForward(Layer):
    
    def __init__(self, dense_dim, embed_dim, **kwargs):
        super(FeedForward, self).__init__(**kwargs)
        self.fully_connected1 = Dense(dense_dim)    # First fully connected layer
        self.fully_connected2 = Dense(embed_dim)    # Second fully connected layer
        self.activation = ReLU()                    # ReLU activation layer
 
    def call(self, x):
        
        # The input is passed into the two fully-connected layers, with a ReLU in between
        x_fc1 = self.fully_connected1(x)
 
        return self.fully_connected2(self.activation(x_fc1))
 
# Implementing the Encoder Layer
class EncoderLayer(Layer):
    def __init__(self, sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, rate, **kwargs):
        super(EncoderLayer, self).__init__(**kwargs)
        self.build(input_shape=[None, sequence_length, embed_dim])
        self.sequence_length = sequence_length
        self.embed_dim = embed_dim
        self.multihead_attention = MultiHeadAttention(num_heads, d_k, d_v, embed_dim)
        self.dropout1 = Dropout(rate)
        self.add_norm1 = AddNormalization()
        self.feed_forward = FeedForward(dense_dim, embed_dim)
        self.dropout2 = Dropout(rate)
        self.add_norm2 = AddNormalization()
 
    def build_graph(self):
        input_layer = Input(shape=(self.sequence_length, self.embed_dim))
        return Model(inputs=[input_layer], outputs=self.call(input_layer, None, True))

    def call(self, x, padding_mask, training):

        # Multi-head attention layer
        multihead_output = self.multihead_attention(x, x, x, padding_mask)
        # Expected output shape = (batch_size, sequence_length, embed_dim)
 
        # Add in a dropout layer
        multihead_output = self.dropout1(multihead_output, training=training)
 
        # Followed by an Add & Norm layer
        addnorm_output = self.add_norm1(x, multihead_output)
        # Expected output shape = (batch_size, sequence_length, embed_dim)
 
        # Followed by a fully connected layer
        feedforward_output = self.feed_forward(addnorm_output)
        # Expected output shape = (batch_size, sequence_length, embed_dim)
 
        # Add in another dropout layer
        feedforward_output = self.dropout2(feedforward_output, training=training)
 
        # Followed by another Add & Norm layer
        return self.add_norm2(addnorm_output, feedforward_output)
 
# Implementing the Encoder
class Encoder(Layer):

    def __init__(self, vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, enc_num_layers, rate, **kwargs):

        super(Encoder, self).__init__(**kwargs)
        
        self.pos_encoding   = PositionEmbeddingFixedWeights(sequence_length, vocab_size, embed_dim)
        self.dropout        = Dropout(rate)
        self.encoder_layer  = [EncoderLayer(sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, rate) for _ in range(enc_num_layers)]
 
    def call(self, input_sentence, padding_mask, training):
        
        # Generate the positional encoding
        pos_encoding_output = self.pos_encoding(input_sentence)
        # Expected output shape = (batch_size, sequence_length, embed_dim)
 
        # Add in a dropout layer
        x = self.dropout(pos_encoding_output, training=training)
 
        # Pass on the positional encoded values to each encoder layer
        for i, layer in enumerate(self.encoder_layer):
            x = layer(x, padding_mask, training)
 
        return x


### Test

In [None]:
enc_input = random.random((batch_size, sequence_length))

(32, 20)

In [None]:
if False:
    output = Encoder(vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, enc_num_layers, dropout_rate)(enc_input, None, True)
    output.numpy().shape
    output.build_graph().summary()

## Decoder

See [(Link)](https://machinelearningmastery.com/implementing-the-transformer-decoder-from-scratch-in-tensorflow-and-keras)   

In [34]:
# Implementing the Decoder Layer
class DecoderLayer(Layer):
    def __init__(self, sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, rate, **kwargs):
        
        super(DecoderLayer, self).__init__(**kwargs)

        self.build(input_shape=[None, sequence_length, embed_dim])
        self.sequence_length = sequence_length
        self.embed_dim = embed_dim
        self.multihead_attention1 = MultiHeadAttention(num_heads, d_k, d_v, embed_dim)
        self.dropout1 = Dropout(rate)
        self.add_norm1 = AddNormalization()
        self.multihead_attention2 = MultiHeadAttention(num_heads, d_k, d_v, embed_dim)
        self.dropout2 = Dropout(rate)
        self.add_norm2 = AddNormalization()
        self.feed_forward = FeedForward(dense_dim, embed_dim)
        self.dropout3 = Dropout(rate)
        self.add_norm3 = AddNormalization()

    def build_graph(self):
        input_layer = Input(shape=(self.sequence_length, self.embed_dim))
        return Model(inputs=[input_layer], outputs=self.call(input_layer, input_layer, None, None, True))
        
    def call(self, x, encoder_output, lookahead_mask, padding_mask, training):

        # Multi-head attention layer                                            # x = encoder_output = (32, 20, 512)
                                                                                # lookahead_mask = (32, 1, 20, 20)
                                                                                # padding_mask = (32, 1, 1, 20)
        multihead_output1 = self.multihead_attention1(x, x, x, lookahead_mask)
        # Expected output shape = (batch_size, sequence_length, embed_dim)
 
        # Add in a dropout layer
        multihead_output1 = self.dropout1(multihead_output1, training=training)
 
        # Followed by an Add & Norm layer
        addnorm_output1 = self.add_norm1(x, multihead_output1)
        # Expected output shape = (batch_size, sequence_length, embed_dim)
 
        # Followed by another multi-head attention layer
        multihead_output2 = self.multihead_attention2(addnorm_output1, encoder_output, encoder_output, padding_mask)
 
        # Add in another dropout layer
        multihead_output2 = self.dropout2(multihead_output2, training=training)
 
        # Followed by another Add & Norm layer
        addnorm_output2 = self.add_norm1(addnorm_output1, multihead_output2)
 
        # Followed by a fully connected layer
        feedforward_output = self.feed_forward(addnorm_output2)
        # Expected output shape = (batch_size, sequence_length, embed_dim)
 
        # Add in another dropout layer
        feedforward_output = self.dropout3(feedforward_output, training=training)
 
        # Followed by another Add & Norm layer
        return self.add_norm3(addnorm_output2, feedforward_output)
 
# Implementing the Decoder
class Decoder(Layer):

    def __init__(self, vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, enc_num_layers, rate, **kwargs):
    
        super(Decoder, self).__init__(**kwargs)
    
        self.pos_encoding = PositionEmbeddingFixedWeights(sequence_length, vocab_size, embed_dim)
        self.dropout = Dropout(rate)
        self.decoder_layer = [DecoderLayer(sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, rate) for _ in range(enc_num_layers)]
 
    def call(self, output_target, encoder_output, lookahead_mask, padding_mask, training):

        # Generate the positional encoding                                          # output_target = (32, 20)
        pos_encoding_output = self.pos_encoding(output_target)                      # (32, 20, 512)
        # Expected output shape = (number of sentences, sequence_length, embed_dim)
 
        # Add in a dropout layer
        x = self.dropout(pos_encoding_output, training=training)                    # (32, 20, 512)
 
        # Pass on the positional encoded values to each encoder layer
        for i, layer in enumerate(self.decoder_layer):                              # encoder_output = (32, 20, 512)
                                                                                    # lookahead_mask = (32, 1, 20, 20)
            x = layer(x, encoder_output, lookahead_mask, padding_mask, training)    # (32, 20, 512)
 
        return x

### Test

In [35]:
dec_input = random.random((batch_size, sequence_length))
enc_output = random.random((batch_size, sequence_length, embed_dim))

In [36]:
if False:
    output = Decoder(vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, enc_num_layers, dropout_rate)(dec_input, enc_output, None, True)
    output.numpy().shape
    output.build_graph().summary()

## Transformer Model

See [(Link)](https://machinelearningmastery.com/joining-the-transformer-encoder-and-decoder-and-masking/)

In [37]:

class TransformerModel(Model):
    
    def __init__(self, vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, d_ff_inner, enc_num_layers, rate, **kwargs):

        super(TransformerModel, self).__init__(**kwargs)
 
        # Set up the encoder
        self.encoder = Encoder(vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, d_ff_inner, enc_num_layers, rate)
 
        # Set up the decoder
        self.decoder = Decoder(vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, d_ff_inner, enc_num_layers, rate)
 
        # Define the final dense layer
        self.model_last_layer = Dense(vocab_size)
 
    def call(self, encoder_input, decoder_input, training):
 
        # Create padding mask to mask the encoder inputs and the encoder outputs in the decoder
        enc_padding_mask = self.padding_mask(encoder_input)
 
        # Create and combine padding and look-ahead masks to be fed into the decoder
        dec_in_padding_mask = self.padding_mask(decoder_input)
        dec_in_lookahead_mask = self.lookahead_mask(decoder_input.shape[1])
        dec_in_lookahead_mask = maximum(dec_in_padding_mask, dec_in_lookahead_mask)
 
        # Feed the input into the encoder
        encoder_output = self.encoder(encoder_input, enc_padding_mask, training)
 
        # Feed the encoder output into the decoder
        decoder_output = self.decoder(decoder_input, encoder_output, dec_in_lookahead_mask, enc_padding_mask, training)
 
        # Pass the decoder output through a final dense layer
        model_output = self.model_last_layer(decoder_output)
 
        return model_output

    def padding_mask(self, input):

        # Create mask which marks the zero padding values in the input by a 1.0
        mask = math.equal(input, 0)
        mask = cast(mask, float32)
 
        # The shape of the mask should be broadcastable to the shape
        # of the attention weights that it will be masking later on
        return mask[:, newaxis, newaxis, :]
 
    def lookahead_mask(self, shape):
        
        # Mask out future entries by marking them with a 1.0
        mask = 1 - linalg.band_part(ones((shape, shape)), -1, 0)
 
        return mask

## Transformer

In [38]:
# build transformer model
transformer = TransformerModel(vocab_size, sequence_length, num_heads, d_k, d_v, embed_dim, dense_dim, enc_num_layers, dropout_rate)

### Test

In [39]:
enc_input.shape, dec_input.shape

((32, 20), (32, 20))

In [41]:
output = transformer(enc_input, dec_input)
output.numpy().shape



Let's test the decoder part.

In [19]:
# Create padding mask to mask the encoder inputs and the encoder outputs in the decoder
enc_padding_mask = transformer.padding_mask(enc_input)

# Create and combine padding and look-ahead masks to be fed into the decoder
dec_in_padding_mask = transformer.padding_mask(dec_input)
dec_in_lookahead_mask = transformer.lookahead_mask(dec_input.shape[1])
dec_in_lookahead_mask = maximum(dec_in_padding_mask, dec_in_lookahead_mask)

# Feed the encoder output into the decoder
decoder_output = transformer.decoder(dec_input, enc_output, dec_in_lookahead_mask, enc_padding_mask)

