# Transformer Encoder in Keras

In [1]:
import tensorflow as tf
import numpy as np
from tensorflow import convert_to_tensor, string, matmul, math, cast, float32, reshape, shape, transpose
from tensorflow.keras.layers import TextVectorization, Embedding, Layer
from tensorflow.keras.layers import LayerNormalization, Layer, Dense, ReLU, Dropout
from numpy import random
from tensorflow.keras.backend import softmax

In [2]:
class FeedForward(Layer):
    """Feed-forward sub-layer: Dense(d_ff) -> ReLU -> Dense(d_model).

    Args:
        d_ff: Number of units in the first (inner) dense layer.
        d_model: Number of units in the second (output) dense layer.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, d_ff, d_model, **kwargs):
        super().__init__(**kwargs)
        # Inner projection to d_ff units.
        self.fully_connected_1 = Dense(d_ff)
        # Output projection to d_model units.
        self.fully_connected_2 = Dense(d_model)
        # Non-linearity applied between the two projections.
        self.activation = ReLU()

    def call(self, x):
        """Run ``x`` through the first dense layer, the ReLU, then the second dense layer."""
        hidden = self.fully_connected_1(x)
        activated = self.activation(hidden)
        return self.fully_connected_2(activated)

In [3]:
class AddNormalization(Layer):
    """Residual addition followed by layer normalization.

    Args:
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Layer normalization applied to the summed tensors.
        self.layer_norm = LayerNormalization()

    def call(self, x, sublayer_x):
        """Return the layer-normalized sum of ``x`` and ``sublayer_x``.

        Args:
            x: The sub-layer's input (the residual branch).
            sublayer_x: The sub-layer's output.
        """
        return self.layer_norm(x + sublayer_x)


In [4]:
class DotProductAttention(Layer):
    """Scaled dot-product attention.

    Args:
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    # The constructor was previously misspelled ``__int__``, which meant it was
    # never run as the initializer and instead defined a bogus int() hook.
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, queries, keys, values, d_k, mask=None):
        """Attend over ``values`` using weights derived from ``queries`` and ``keys``.

        Args:
            queries: Query tensor.
            keys: Key tensor; its last two axes are transposed before the product.
            values: Value tensor, combined using the attention weights.
            d_k: Scaling dimension; scores are divided by ``sqrt(d_k)``.
            mask: Optional tensor added to the scores after being multiplied by
                -1e9, so non-zero entries receive a near-zero softmax weight.

        Returns:
            The product of the softmax-normalized scores and ``values``.
        """
        # Score the queries against the transposed keys, then scale.
        scores = matmul(queries, keys, transpose_b=True) / math.sqrt(cast(d_k, float32))

        # Push masked positions towards -infinity before the softmax.
        if mask is not None:
            scores += -1e9 * mask

        # Normalize the scores into attention weights (softmax's default axis).
        weights = softmax(scores)

        # Weighted sum of the value vectors.
        return matmul(weights, values)

In [5]:
class MultiHeadAttention(Layer):
    """Multi-head attention built on top of ``DotProductAttention``.

    Args:
        h: Number of attention heads.
        d_k: Number of units in the query and key projections.
        d_v: Number of units in the value projection.
        d_model: Number of units in the output projection.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    # NOTE(review): the projections have d_k / d_v units in total and are then
    # split across the heads, so each head works on d_k / h features while the
    # scores are still scaled by sqrt(d_k) — confirm this is intended.

    def __init__(self, h, d_k, d_v, d_model, **kwargs):
        super().__init__(**kwargs)
        # Scaled dot-product attention shared by all heads.
        self.attention = DotProductAttention()
        # Number of attention heads.
        self.heads = h
        # Sizes of the query/key and value projections.
        self.d_k = d_k
        self.d_v = d_v
        # Learned projections for queries, keys and values.
        self.W_q = Dense(d_k)
        self.W_k = Dense(d_k)
        self.W_v = Dense(d_v)
        # Learned projection applied to the merged multi-head output.
        self.W_o = Dense(d_model)

    def reshape_tensor(self, x, heads, flag):
        """Split the last axis into heads (``flag`` truthy) or merge heads back.

        Args:
            x: Tensor to rearrange; rank 3 when splitting, rank 4 when merging.
            heads: Number of heads to split into; unused when merging.
            flag: Truthy to split into heads, falsy to merge them back.

        Returns:
            When splitting: ``x`` reshaped to (axis0, axis1, heads, -1) and then
            transposed with perm (0, 2, 1, 3). When merging: ``x`` transposed
            with perm (0, 2, 1, 3) and its last two axes flattened into one.
        """
        if not flag:
            # Undo the split: swap axes 1 and 2 back, then flatten the last two axes.
            swapped = transpose(x, perm=(0, 2, 1, 3))
            return reshape(swapped, shape=(shape(swapped)[0], shape(swapped)[1], -1))

        # Split: carve the last axis into `heads` groups, then move heads to axis 1.
        grouped = reshape(x, shape=(shape(x)[0], shape(x)[1], heads, -1))
        return transpose(grouped, perm=(0, 2, 1, 3))

    def call(self, queries, keys, values, mask=None):
        """Project the inputs, attend per head, merge the heads and project the result.

        Args:
            queries: Input passed through the query projection.
            keys: Input passed through the key projection.
            values: Input passed through the value projection.
            mask: Optional mask handed straight to the attention layer.

        Returns:
            The merged multi-head attention output after the ``W_o`` projection.
        """
        # Project each input and rearrange it so heads sit on axis 1.
        q_heads = self.reshape_tensor(self.W_q(queries), self.heads, True)
        k_heads = self.reshape_tensor(self.W_k(keys), self.heads, True)
        v_heads = self.reshape_tensor(self.W_v(values), self.heads, True)

        # Scaled dot-product attention on the per-head tensors.
        attended = self.attention(q_heads, k_heads, v_heads, self.d_k, mask)

        # Merge the heads back together and apply the output projection.
        merged = self.reshape_tensor(attended, self.heads, False)
        return self.W_o(merged)

In [6]:
class EncoderLayer(Layer):
    """One encoder block: self-attention and feed-forward, each with dropout and Add & Norm.

    Args:
        h: Number of attention heads.
        d_k: Size of the query/key projections.
        d_v: Size of the value projection.
        d_model: Output size of the attention and feed-forward sub-layers.
        d_ff: Size of the inner feed-forward layer.
        rate: Dropout rate used by both dropout layers.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, h, d_k, d_v, d_model, d_ff, rate, **kwargs):
        super().__init__(**kwargs)
        # Self-attention sub-layer with its dropout and Add & Norm.
        self.multihead_attention = MultiHeadAttention(h, d_k, d_v, d_model)
        self.dropout1 = Dropout(rate)
        self.add_norm1 = AddNormalization()
        # Feed-forward sub-layer with its dropout and Add & Norm.
        self.feed_forward = FeedForward(d_ff, d_model)
        self.dropout2 = Dropout(rate)
        self.add_norm2 = AddNormalization()

    def call(self, x, padding_mask, training):
        """Apply the encoder block to ``x``.

        Args:
            x: Input tensor, used as queries, keys and values of the attention.
            padding_mask: Mask passed to the attention sub-layer (may be None).
            training: Passed to both dropout layers as their ``training`` flag.

        Returns:
            Output of the second Add & Norm step; expected shape
            (batch_size, sequence_length, d_model).
        """
        # Self-attention: x serves as queries, keys and values.
        attention_out = self.multihead_attention(x, x, x, padding_mask)
        attention_out = self.dropout1(attention_out, training=training)

        # First residual connection + normalization.
        attention_block = self.add_norm1(x, attention_out)

        # Feed-forward network followed by dropout.
        ff_out = self.feed_forward(attention_block)
        ff_out = self.dropout2(ff_out, training=training)

        # Second residual connection + normalization.
        return self.add_norm2(attention_block, ff_out)

In [7]:
class PositionEmbeddingFixedWeights(Layer):
    """Sum of two frozen embeddings (token ids and positions) with sinusoidal weights.

    Both embedding tables are filled by ``get_position_encoding`` and created
    with ``trainable=False``.

    Args:
        seq_length: Number of rows in the position embedding table.
        vocab_size: Number of rows in the word embedding table.
        output_dim: Width of both embedding tables.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, seq_length, vocab_size, output_dim, **kwargs):
        super().__init__(**kwargs)
        word_table = self.get_position_encoding(vocab_size, output_dim)
        position_table = self.get_position_encoding(seq_length, output_dim)

        # Frozen lookup table indexed by token id.
        self.word_embedding_layer = Embedding(
            input_dim=vocab_size,
            output_dim=output_dim,
            weights=[word_table],
            trainable=False,
        )
        # Frozen lookup table indexed by position.
        self.position_embedding_layer = Embedding(
            input_dim=seq_length,
            output_dim=output_dim,
            weights=[position_table],
            trainable=False,
        )

    def get_position_encoding(self, seq_len, d, n=10000):
        """Build a (seq_len, d) table of sine/cosine encodings.

        For row ``k`` and pair index ``i``, column ``2*i`` holds
        ``sin(k / n**(2*i/d))`` and column ``2*i + 1`` holds the matching cosine.

        Args:
            seq_len: Number of rows.
            d: Number of columns; only the first ``2 * int(d/2)`` are filled.
            n: Base of the geometric progression of wavelengths.

        Returns:
            A NumPy array of shape (seq_len, d).
        """
        table = np.zeros((seq_len, d))
        # The denominator depends only on the column pair, so compute it once per pair.
        for pair in range(int(d / 2)):
            denominator = np.power(n, 2 * pair / d)
            even_col = 2 * pair
            odd_col = even_col + 1
            for pos in range(seq_len):
                angle = pos / denominator
                table[pos, even_col] = np.sin(angle)
                table[pos, odd_col] = np.cos(angle)
        return table

    def call(self, inputs):
        """Return word embeddings of ``inputs`` plus embeddings of their positions.

        Positions are ``0 .. size_of_last_axis - 1`` of ``inputs``.
        """
        last_axis_len = tf.shape(inputs)[-1]
        position_indices = tf.range(last_axis_len)
        word_part = self.word_embedding_layer(inputs)
        position_part = self.position_embedding_layer(position_indices)
        return word_part + position_part

In [8]:
class Encoder(Layer):
    """Encoder stack: fixed embeddings, dropout, then ``n`` encoder layers.

    Args:
        vocab_size: Size of the input vocabulary.
        sequence_length: Maximum input sequence length.
        h: Number of attention heads per encoder layer.
        d_k: Size of the query/key projections.
        d_v: Size of the value projection.
        d_model: Embedding width and sub-layer output size.
        d_ff: Size of the inner feed-forward layer.
        n: Number of encoder layers in the stack.
        rate: Dropout rate.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, vocab_size, sequence_length, h, d_k, d_v, d_model, d_ff, n, rate, **kwargs):
        super().__init__(**kwargs)
        self.pos_encoding = PositionEmbeddingFixedWeights(sequence_length, vocab_size, d_model)
        self.dropout = Dropout(rate)
        # One independent EncoderLayer per level of the stack.
        self.encoder_layer = [
            EncoderLayer(h, d_k, d_v, d_model, d_ff, rate) for _ in range(n)
        ]

    def call(self, input_sentence, padding_mask, training: bool):
        """Encode ``input_sentence``.

        Args:
            input_sentence: Input passed to the embedding layer.
            padding_mask: Mask forwarded to every encoder layer (may be None).
            training: Forwarded to the dropout layer and to every encoder layer.

        Returns:
            The output of the last encoder layer.
        """
        # Embed tokens and positions; expected shape (batch_size, sequence_length, d_model).
        embedded = self.pos_encoding(input_sentence)

        # Dropout on the embeddings before the stack.
        hidden = self.dropout(embedded, training=training)

        # Feed the result through each encoder layer in order.
        for block in self.encoder_layer:
            hidden = block(hidden, padding_mask, training)
        return hidden
 

### Test your code

In [9]:
# Attention settings
h = 8  # number of self-attention heads
d_k = 64  # size of the linearly projected queries and keys
d_v = 64  # size of the linearly projected values

# Model and feed-forward widths
d_model = 512  # size of the outputs of the model sub-layers
d_ff = 2048  # size of the inner fully connected layer

# Stack depth and training settings
n = 6  # number of layers in the encoder stack
batch_size = 64  # batch size from the training process
dropout_rate = 0.1  # fraction of input units dropped in the dropout layers

In [10]:
enc_vocab_size = 20  # Vocabulary size for the encoder
input_seq_length = 5  # Maximum length of the input sequence
# The encoder looks its input up in an Embedding, so the dummy batch must hold
# integer token ids in [0, enc_vocab_size). random.random() returned floats in
# [0, 1), which cannot cover the vocabulary.
input_seq = random.randint(0, enc_vocab_size, size=(batch_size, input_seq_length))

In [11]:
# Build the encoder; keyword arguments make the long parameter list self-describing.
encoder = Encoder(
    vocab_size=enc_vocab_size,
    sequence_length=input_seq_length,
    h=h,
    d_k=d_k,
    d_v=d_v,
    d_model=d_model,
    d_ff=d_ff,
    n=n,
    rate=dropout_rate,
)
# Uncomment to run a forward pass with no padding mask and training=True:
# print(encoder(input_seq, None, True))