#### Chapter 5 : Design Patterns for Transformers


##### Positional Encoding sample code

In [None]:
from time import time

import numpy as np
import tensorflow as tf
# Function to generate the positional encoding of a sentence
def positional_encoding(L, d_model):
    """Build the sinusoidal positional-encoding table for a sequence.

    Even columns hold ``sin(pos / 10000**(i / d_model))`` and each following
    odd column holds the cosine of the same angle, i.e. column ``i`` (odd)
    uses the exponent of column ``i - 1``.

    Args:
        L: Number of positions (rows) to encode.
        d_model: Width of each positional vector (columns).

    Returns:
        A NumPy array of shape ``(L, d_model)``.
    """
    positions = np.arange(L, dtype=np.float64)[:, np.newaxis]
    dims = np.arange(d_model)
    # Each sin/cos pair shares the exponent of its even column index.
    pair_index = dims - (dims % 2)
    angles = positions / np.power(10000.0, pair_index / d_model)

    pos_enc = np.zeros((L, d_model))
    pos_enc[:, 0::2] = np.sin(angles[:, 0::2])
    pos_enc[:, 1::2] = np.cos(angles[:, 1::2])
    return pos_enc


In [None]:
# Build a toy sentence and compute one positional vector per word
sentence = "Hello world"
words = sentence.split()
L = len(words)  # number of whitespace-separated words
d_model = 16  # width of each positional vector

pos_enc = positional_encoding(L=L, d_model=d_model)
print(pos_enc)


Building positional encoding in the class for Transformers

In [None]:
# Hyper-parameters shared by the layers defined below
config = {
    "seq": 64,
    "encode_vocab_size": 2320,  # rows of the encoder embedding table
    "decode_vocab_size": 3667,  # rows of the decoder embedding table
    "inp_seq_dim": 15,  # number of positions pre-computed by the positional encoding
    "HL": 128,  # embedding width
}

In [None]:
# Class to prepare the input data with embedding and positional encoding

class InputTransformer(tf.keras.layers.Layer):
    """Token embedding followed by a fixed sinusoidal positional encoding.

    The layer owns two embedding tables, one sized by
    ``config["encode_vocab_size"]`` and one by ``config["decode_vocab_size"]``,
    both of width ``config["HL"]``. ``call`` picks one of them through its
    ``decode`` flag.

    The positional table is computed once for ``config["inp_seq_dim"]``
    positions, so inputs whose second axis is longer than that cannot be
    encoded by this layer.
    """

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        # Referenced through tf.keras.layers: the bare name `Embedding` is
        # never imported in this notebook.
        self.embedding_encode = tf.keras.layers.Embedding(config["encode_vocab_size"], config["HL"])
        self.embedding_decode = tf.keras.layers.Embedding(config["decode_vocab_size"], config["HL"])
        self.positional_encoding = self.create_positional_encoding(config["inp_seq_dim"], config["HL"])

    def create_positional_encoding(self, max_seq_length, d_model):
        """Return a ``(1, max_seq_length, ...)`` tensor of sin/cos position features.

        The sine features for all frequencies are concatenated with the cosine
        features along the last axis (first half sin, second half cos), rather
        than interleaved column by column.
        """
        position = tf.expand_dims(tf.range(max_seq_length, dtype=tf.float32), axis=1)
        # exp(-log(10000) * 2k / d_model) == 1 / 10000**(2k / d_model)
        div_term = tf.exp(tf.range(0, d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / d_model))
        angles = position * div_term
        pos_enc = tf.concat([tf.sin(angles), tf.cos(angles)], axis=-1)
        # Leading axis of size 1 lets the table broadcast over the batch.
        return tf.expand_dims(pos_enc, axis=0)

    def call(self, inputs, decode=False):
        """Embed ``inputs`` and add the positional encoding.

        Args:
            inputs: Token ids; axis 1 is treated as the sequence axis.
            decode: Use the decoder embedding table when True, otherwise the
                encoder table.

        Returns:
            The embedded sequence with positional features added.
        """
        seq_length = tf.shape(inputs)[1]
        embedding = self.embedding_decode if decode else self.embedding_encode
        embedded_seq = embedding(inputs)
        # Only the first `seq_length` pre-computed positions are needed.
        return embedded_seq + self.positional_encoding[:, :seq_length, :]


#### Scaled Dot Product attention

In [None]:
import tensorflow as tf
from tensorflow import math, matmul, reshape, shape, transpose, cast, float32 

from tensorflow.keras.layers import Dense, Layer
from tensorflow.keras.backend import softmax


In [None]:
# Hyper-parameters shared by the layers defined below
config = {
    "seq": 64,
    "encode_vocab_size": 2320,
    "decode_vocab_size": 3667,
    "inp_seq_dim": 15,
    "HL": 128,
    "dim_QKV": 64,  # read by the attention layers
}

In [None]:
# Scaled-Dot Product Attention implementation
class ScaledDotProdAttn(Layer):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``."""

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.config = config

    def call(self, Q, K, V, mask=None):
        '''
        Inputs : Q, K, V whose last two axes are (sequence length, depth);
                 any leading axes (batch, heads) are carried through.
        mask   : optional tensor broadcastable to the score matrix, holding 1
                 at positions that must NOT be attended to.
        returns: attention output with the same leading axes as Q and the
                 depth of V.
        '''
        # Scale by the depth of the keys actually received. When this layer is
        # called on split heads, that depth is the per-head depth, which is not
        # the same number as config["dim_QKV"].
        depth = cast(shape(K)[-1], float32)
        Attnscores = matmul(Q, K, transpose_b=True) / math.sqrt(depth)
        # A large negative offset drives masked positions to ~0 after softmax
        if mask is not None:
            Attnscores += -1e9 * mask
        # Normalise the scores over the key axis
        Attnweights = softmax(Attnscores)
        # Weighted sum of the values
        return matmul(Attnweights, V)

#### Multihead Attention

In [None]:
# Hyper-parameters shared by the layers defined below
config = {
    "seq": 64,
    "encode_vocab_size": 2320,
    "decode_vocab_size": 3667,
    "inp_seq_dim": 15,
    "HL": 128,
    "dim_QKV": 64,
    "h": 8,  # number of attention heads
    "HFF": 512,  # hidden width of the feed-forward block
    "prob": 0.1,  # dropout rate
    "n_layers": 2,  # number of stacked encoder / decoder layers
}

In [None]:
class MultiheadAttn(Layer):
    """Multi-head attention built on ``ScaledDotProdAttn``.

    Queries, keys and values are each projected to ``config["HL"]`` features,
    split into ``config["h"]`` heads, attended per head, merged back and
    passed through a final projection of width ``config["HL"]``.
    """

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.attention = ScaledDotProdAttn(config)
        self.heads = config["h"]
        # Stored for reference; not read anywhere else in this class.
        self.d_QKV = config["dim_QKV"]
        self.hl = config["HL"]
        # Input projections for queries, keys and values (created in that order)
        self.WQ, self.WK, self.WV = (Dense(self.hl) for _ in range(3))
        # Output projection applied to the merged heads
        self.WO = Dense(self.hl)

    def split_heads(self, tensor):
        """Reshape ``(bs, seq, features)`` into ``(bs, heads, seq, head_depth)``."""
        dims = shape(tensor)
        per_head = tf.reshape(tensor, (dims[0], dims[1], self.heads, -1))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, q, k, v, mask=None):
        """Attend ``q`` over ``k``/``v`` and return a ``(bs, seq, hl)`` tensor."""
        # Project, then split each projection into heads
        Q = self.split_heads(self.WQ(q))
        K = self.split_heads(self.WK(k))
        V = self.split_heads(self.WV(v))
        # Per-head attention: (bs, heads, seq, head_depth)
        per_head_attn = self.attention(Q, K, V, mask)
        # Bring the sequence axis back in front of the heads, then merge heads
        merged = tf.transpose(per_head_attn, perm=[0, 2, 1, 3])
        output = tf.reshape(merged, (shape(merged)[0], -1, self.hl))
        return self.WO(output)

#### Encoder

In [None]:
# Residual + Normalisation Layer
class ResidNorm(Layer):
    """Residual connection followed by layer normalisation."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Referenced through tf.keras.layers: the bare name
        # `LayerNormalization` is never imported in this notebook.
        self.resid_norm = tf.keras.layers.LayerNormalization()

    def call(self, x, output):
        """Return ``LayerNorm(x + output)``.

        Args:
            x: Input of the sub-layer (the residual branch).
            output: Output of the sub-layer; must be addable to ``x``.
        """
        # The residual connection sums the sub-layer input and output
        return self.resid_norm(x + output)

In [None]:
# Feed forward layer
class FeedForward(Layer):
    """Position-wise feed-forward block: Dense(HFF) -> ReLU -> Dense(HL)."""

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        # First fully connected layer, width config["HFF"]
        self.ff1 = Dense(config["HFF"])
        # Second fully connected layer, projecting back to config["HL"]
        self.ff2 = Dense(config["HL"])
        # Referenced through tf.keras.layers: the bare name `ReLU` is never
        # imported in this notebook.
        self.activation = tf.keras.layers.ReLU()

    def call(self, x):
        """Apply the two dense layers with a ReLU in between."""
        hidden = self.activation(self.ff1(x))
        return self.ff2(hidden)

In [None]:
# Class for a layer of encoder
class EncodLayer(Layer):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer is followed by dropout and its own residual + layer
    normalisation block, so the two normalisations learn separate parameters.
    """

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.mha = MultiheadAttn(config)
        # Referenced through tf.keras.layers: the bare name `Dropout` is never
        # imported in this notebook. Dropout holds no weights, so one instance
        # can serve both sub-layers.
        self.dropout = tf.keras.layers.Dropout(config["prob"])
        self.residnorm1 = ResidNorm()
        self.ff = FeedForward(config)
        self.residnorm2 = ResidNorm()

    def call(self, x, padd_mask, training):
        """Run the layer.

        Args:
            x: Layer input, shape (bs, seq, dim).
            padd_mask: Mask handed to the attention layer.
            training: Forwarded to dropout.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Multi-head self-attention sub-layer
        mha_output = self.mha(x, x, x, padd_mask)  # (bs, seq, dim)
        mha_output = self.dropout(mha_output, training=training)
        attn_normed = self.residnorm1(x, mha_output)  # (bs, seq, dim)
        # Feed-forward sub-layer
        ff_output = self.ff(attn_normed)  # (bs, seq, dim)
        ff_output = self.dropout(ff_output, training=training)
        return self.residnorm2(attn_normed, ff_output)

In [None]:
# Implementing the Encoder
class Encoder(Layer):
    """Stack of ``config["n_layers"]`` encoder layers on top of the input transform."""

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.input_tran = InputTransformer(config)
        # Referenced through tf.keras.layers: the bare name `Dropout` is never
        # imported in this notebook.
        self.dropout = tf.keras.layers.Dropout(config["prob"])
        self.encoder_layer = [EncodLayer(config) for _ in range(config["n_layers"])]

    def call(self, input_seq, padding_mask, training):
        """Encode ``input_seq``.

        Args:
            input_seq: Source token ids.
            padding_mask: Mask forwarded to every encoder layer.
            training: Forwarded to the dropout layers.

        Returns:
            Output of the last encoder layer, shape (bs, seq, dim).
        """
        # Embedding + positional encoding (encoder embedding table)
        x = self.input_tran(input_seq)  # (bs, seq, dim)
        x = self.dropout(x, training=training)
        # Feed the result through each encoder layer in turn
        for layer in self.encoder_layer:
            x = layer(x, padding_mask, training)
        return x

#### Decoder

First, build a sample matrix to demonstrate masks

In [None]:
# Shape (rows, columns) of the square demo matrix
matrix_size = (7, 7)
# Random normal samples standing in for the decoder input
input_matrix = tf.random.normal(shape=matrix_size)
input_matrix


In [None]:
# All-ones matrix with the same shape as the demo input
ones_matrix = tf.ones(shape=matrix_size)
# Keep the whole lower triangle (num_lower=-1) and nothing above the diagonal (num_upper=0)
lower_triangle_mask = tf.linalg.band_part(ones_matrix, num_lower=-1, num_upper=0)
lower_triangle_mask


In [None]:
# Element-wise product zeroes every entry above the diagonal
masked_input = tf.multiply(input_matrix, lower_triangle_mask)
masked_input


##### Decoder layer and decoder

In [None]:
# Class for a layer of decoder
class DecodeLayer(Layer):
    """One decoder layer: self-attention, cross-attention and feed-forward.

    Each of the three sub-layers is followed by dropout and its own residual +
    layer normalisation block, so the normalisations learn separate parameters.
    """

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.mha1 = MultiheadAttn(config)  # self-attention over the decoder input
        # Referenced through tf.keras.layers: the bare name `Dropout` is never
        # imported in this notebook. Dropout holds no weights, so one instance
        # can serve all sub-layers.
        self.dropout = tf.keras.layers.Dropout(config["prob"])
        self.residnorm1 = ResidNorm()
        self.mha2 = MultiheadAttn(config)  # cross-attention over the encoder output
        self.residnorm2 = ResidNorm()
        self.ff = FeedForward(config)
        self.residnorm3 = ResidNorm()

    def call(self, x, encoder_output, la_mask, padd_mask, training):
        """Run the layer.

        Args:
            x: Decoder-side input, shape (bs, seq, dim).
            encoder_output: Output of the encoder, used as keys and values of
                the cross-attention.
            la_mask: Mask applied in the self-attention.
            padd_mask: Mask applied in the cross-attention.
            training: Forwarded to dropout.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Self-attention sub-layer
        mha1_output = self.mha1(x, x, x, la_mask)  # (bs, seq, dim)
        mha1_output = self.dropout(mha1_output, training=training)
        self_normed = self.residnorm1(x, mha1_output)  # (bs, seq, dim)
        # Cross-attention sub-layer: uses its own attention weights (mha2),
        # queries from the decoder, keys/values from the encoder
        mha2_output = self.mha2(self_normed, encoder_output, encoder_output, padd_mask)  # (bs, seq, dim)
        mha2_output = self.dropout(mha2_output, training=training)
        cross_normed = self.residnorm2(self_normed, mha2_output)  # (bs, seq, dim)
        # Feed-forward sub-layer
        ff_output = self.ff(cross_normed)  # (bs, seq, dim)
        ff_output = self.dropout(ff_output, training=training)
        return self.residnorm3(cross_normed, ff_output)

In [None]:
# Implementing the Decoder
class Decoder(Layer):
    """Stack of ``config["n_layers"]`` decoder layers on top of the input transform."""

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.input_tran = InputTransformer(config)
        # Referenced through tf.keras.layers: the bare name `Dropout` is never
        # imported in this notebook.
        self.dropout = tf.keras.layers.Dropout(config["prob"])
        self.decoder_layer = [DecodeLayer(config) for _ in range(config["n_layers"])]

    def call(self, target_output, encoder_output, la_mask, padd_mask, training):
        """Decode ``target_output`` against ``encoder_output``.

        Args:
            target_output: Target token ids fed to the decoder.
            encoder_output: Output of the encoder.
            la_mask: Mask forwarded to each layer's self-attention.
            padd_mask: Mask forwarded to each layer's cross-attention.
            training: Forwarded to the dropout layers.

        Returns:
            Output of the last decoder layer, shape (bs, seq, dim).
        """
        # Embedding + positional encoding (decoder embedding table)
        x = self.input_tran(target_output, True)  # (bs, seq, dim)
        x = self.dropout(x, training=training)
        # Feed the result through each decoder layer in turn
        for layer in self.decoder_layer:
            x = layer(x, encoder_output, la_mask, padd_mask, training)
        return x

#### Complete transformer model

In [None]:
def create_padding_mask(self, inputs):
    """Return a float mask that is 1.0 wherever ``inputs`` equals 0.

    Two size-1 axes are inserted after the first axis, so a
    (batch_size, seq_length) input yields (batch_size, 1, 1, seq_length).

    NOTE(review): ``self`` is not used by this module-level function; it is
    kept only so the signature stays unchanged.
    """
    mask = tf.cast(tf.math.equal(inputs, 0), tf.float32)  # (batch_size, seq_length)
    for _ in range(2):
        mask = tf.expand_dims(mask, axis=1)
    return mask

In [None]:
# Create the look ahead mask
def create_look_ahead_mask(size):
    """Return a (size, size) matrix with 1 strictly above the diagonal, 0 elsewhere."""
    # Lower triangle (diagonal included) of an all-ones matrix
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    # Complement: only the entries above the diagonal remain 1
    return 1 - lower_triangle

In [None]:
class TransformerModel(tf.keras.Model):
    """Encoder-decoder transformer ending in a dense projection.

    The final dense layer has ``config["decode_vocab_size"]`` units and no
    activation, so the model returns unnormalised scores per target token.
    """

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        # Initialise the encoder, decoder and the final dense layer
        self.encoder = Encoder(config)
        self.decoder = Decoder(config)
        self.denseLayer = Dense(config["decode_vocab_size"])

    def create_padding_mask(self, inputs):
        """Return a float mask that is 1.0 wherever ``inputs`` equals 0.

        A (batch_size, seq_length) input yields (batch_size, 1, 1, seq_length).
        """
        mask = tf.cast(tf.math.equal(inputs, 0), tf.float32)  # (batch_size, seq_length)
        return tf.expand_dims(tf.expand_dims(mask, axis=1), axis=1)

    def create_look_ahead_mask(self, size):
        """Return a (size, size) matrix with 1 strictly above the diagonal."""
        return 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)

    def call(self, encoder_input, decoder_input, training=False):
        """Run the full model.

        Args:
            encoder_input: Source token ids; 0 is treated as padding.
            decoder_input: Target token ids fed to the decoder.
            training: Forwarded to the dropout layers; defaults to False so
                the model can be called for inference without the flag.

        Returns:
            Scores over the target vocabulary for every decoder position.
        """
        # The encoder padding mask is reused for the decoder's cross-attention
        enc_pad_mask = self.create_padding_mask(encoder_input)
        # tf.shape gives the runtime length even when the static shape is unknown
        dec_look_ahead_mask = self.create_look_ahead_mask(tf.shape(decoder_input)[1])
        encoder_output = self.encoder(encoder_input, enc_pad_mask, training)
        decoder_output = self.decoder(decoder_input, encoder_output, dec_look_ahead_mask, enc_pad_mask, training)
        # Final dense layer applied to the decoder output
        return self.denseLayer(decoder_output)

##### Training the transformer model

Download the data from the link below and save it on your local machine (the code below expects it at `data/english-german-both.pkl`).

https://github.com/Rishav09/Neural-Machine-Translation-System/blob/master/english-german-both.pkl

In [None]:
from pickle import load
# Define the path to the data
dataPath = "data/english-german-both.pkl"
# NOTE(review): unpickling can execute arbitrary code embedded in the file;
# only load a copy obtained from a source you trust.
# Load the data using pickle; the context manager closes the file afterwards
with open(dataPath, 'rb') as data_file:
    raw_dataset = load(data_file)


In [None]:
# Include start and end of string tokens on both columns of every row.
# NOTE: this rewrites raw_dataset in place, so re-running the cell wraps the
# sentences a second time; reload the data before re-running.
for i in range(raw_dataset[:, 0].size):
    raw_dataset[i, 0] = "<START> " + raw_dataset[i, 0] + " <EOS>"
    raw_dataset[i, 1] = "<START> " + raw_dataset[i, 1] + " <EOS>"


In [None]:
# Fit a tokenizer
def create_tokenizer(dataset):
    """Create a Keras ``Tokenizer`` and fit it on ``dataset``.

    Args:
        dataset: Iterable of sentences (strings).

    Returns:
        The fitted tokenizer.
    """
    # Referenced through tf.keras: the bare name `Tokenizer` is never imported
    # in this notebook.
    tokenizer = tf.keras.preprocessing.text.Tokenizer()
    tokenizer.fit_on_texts(dataset)
    return tokenizer
# Find the sequence length
def find_seq_length(dataset):
    """Return the number of whitespace-separated words in the longest sentence."""
    word_counts = [len(sentence.split()) for sentence in dataset]
    return max(word_counts)
# Find the vocabulary size
def find_vocab_size(tokenizer, dataset):
    """Fit ``tokenizer`` on ``dataset`` and return ``len(word_index) + 1``.

    Note that the tokenizer passed in is fitted (mutated) by this call.
    """
    tokenizer.fit_on_texts(dataset)
    vocabulary = tokenizer.word_index
    return 1 + len(vocabulary)


In [None]:
from numpy.random import shuffle

In [None]:
# Random shuffle the dataset (in place)
shuffle(raw_dataset)
# Split into train and validation sets
train_split = 0.9
# Row count of the 2-D dataset; `.shape` itself is a tuple and cannot be
# multiplied by a float
n_train = int(raw_dataset.shape[0] * train_split)
train = raw_dataset[:n_train]
val = raw_dataset[n_train:]

# Prepare tokenizer for the encoder

enc_tokenizer = create_tokenizer(train[:, 0])
enc_seq_length = find_seq_length(train[:, 0])
enc_vocab_size = find_vocab_size(enc_tokenizer, train[:, 0])

# Prepare tokenizer for the decoder

dec_tokenizer = create_tokenizer(train[:, 1])
dec_seq_length = find_seq_length(train[:, 1])
dec_vocab_size = find_vocab_size(dec_tokenizer, train[:, 1])

# NOTE(review): the vocabulary sizes and sequence lengths computed here are not
# written back into `config`, which carries hard-coded values; confirm they are
# large enough for this split before building the model.


In [None]:
# Encode and pad the encoder sequences (zeros appended after each sentence)
trainX = enc_tokenizer.texts_to_sequences(train[:, 0])
trainX = tf.keras.preprocessing.sequence.pad_sequences(trainX, maxlen=enc_seq_length, padding='post')

# Encode and pad the decoder sequences (zeros appended after each sentence)
trainY = dec_tokenizer.texts_to_sequences(train[:, 1])
trainY = tf.keras.preprocessing.sequence.pad_sequences(trainY, maxlen=dec_seq_length, padding='post')

# Converting the data set to tensors.
# tf.* names are used because the bare names `convert_to_tensor` and `int64`
# are not in scope when this cell runs.

trainX = tf.convert_to_tensor(trainX, dtype=tf.int64)
trainY = tf.convert_to_tensor(trainY, dtype=tf.int64)


In [None]:
from tensorflow import data

In [None]:
batch_size = 16
# Training data set of (encoder input, decoder input, decoder target) triples:
# the decoder input drops the last token of each target sentence and the
# decoder target drops the first, so the two are shifted by one position.
train_dataset = data.Dataset.from_tensor_slices(
    (trainX, trainY[:, :-1], trainY[:, 1:])
).batch(batch_size)

In [None]:
from tensorflow.keras.optimizers.schedules import LearningRateSchedule
# Custom scheduler for the optimizer
class LRScheduler(LearningRateSchedule):
    """Learning-rate schedule with a linear warm-up then inverse-sqrt decay.

    Returns ``d_model**-0.5 * min(step**-0.5, step * warmup_steps**-1.5)``.
    """

    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        # Kept as a float32 tensor so rsqrt can be applied to it directly
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        step = tf.cast(step, dtype=tf.float32)
        decay_term = tf.math.rsqrt(step)
        warmup_term = step * (self.warmup_steps ** -1.5)
        # The smaller term wins: warm-up first, decay afterwards
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(decay_term, warmup_term)


In [None]:
# Defining the optimiser
optimizer = tf.keras.optimizers.Adam(
    learning_rate=LRScheduler(config["HL"]),  # schedule scaled by the model width
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

In [None]:
# Masked loss function 
def masked_loss(label, pred):
    """Mean sparse categorical cross-entropy over non-padding positions.

    Args:
        label: Integer target ids; 0 marks padding and is excluded.
        pred: Unnormalised scores (logits) over the vocabulary.

    Returns:
        Sum of the per-token losses at non-padding positions divided by the
        number of such positions.
    """
    mask = label != 0
    # Referenced through tf.keras.losses: the bare name
    # `sparse_categorical_crossentropy` is never imported in this notebook.
    loss = tf.keras.losses.sparse_categorical_crossentropy(label, pred, from_logits=True)
    mask = tf.cast(mask, dtype=loss.dtype)
    # Zero the loss at padding positions before averaging
    loss *= mask
    return tf.reduce_sum(loss) / tf.reduce_sum(mask)

# Masked accuracy function
def masked_accuracy(label, pred):
    """Fraction of non-padding positions whose arg-max prediction equals the label."""
    # Zero padding values are excluded from the computation of accuracy
    not_padding = label != 0
    predicted_ids = tf.argmax(pred, axis=2)
    label = tf.cast(label, predicted_ids.dtype)
    correct = (label == predicted_ids) & not_padding

    n_correct = tf.reduce_sum(tf.cast(correct, dtype=tf.float32))
    n_counted = tf.reduce_sum(tf.cast(not_padding, dtype=tf.float32))
    return n_correct / n_counted

In [None]:
from tensorflow import data, train, math, reduce_sum, cast, equal, argmax,float32, GradientTape, TensorSpec, function, int64


In [None]:
@function
def train_step(encoder_input, decoder_input, decoder_output):
    """Run one optimisation step and update the running loss/accuracy metrics.

    Reads the module-level ``train_model``, ``optimizer``, ``train_loss`` and
    ``train_accuracy``.
    """
    with GradientTape() as tape:
        # Forward pass of the model to generate a prediction
        prediction = train_model(encoder_input, decoder_input, training=True)
        # Training loss, ignoring padded target positions
        loss = masked_loss(decoder_output, prediction)
    # Training accuracy, ignoring padded target positions
    accuracy = masked_accuracy(decoder_output, prediction)
    # Gradients of the loss with respect to the trainable variables
    weights = train_model.trainable_weights
    gradients = tape.gradient(loss, weights)
    # Update the trainable variables
    optimizer.apply_gradients(zip(gradients, weights))
    train_loss(loss)
    train_accuracy(accuracy)

In [None]:
from tensorflow.keras.metrics import Mean

In [None]:
# Initialise the metrics for monitoring
train_loss = Mean(name='train_loss')
train_accuracy = Mean(name='train_accuracy')

# Instantiate the transformer model
train_model = TransformerModel(config)

# Start the training
epochs = 10
for epoch in range(epochs):
    # The metrics accumulate across steps, so clear them for each epoch
    train_loss.reset_states()
    train_accuracy.reset_states()
    print("\nStart of epoch %d" % (epoch + 1))
    start_time = time()
    for step, (encoder_input, decoder_input, decoder_output) in enumerate(train_dataset):
        train_step(encoder_input, decoder_input, decoder_output)
        if step % 50 == 0:
            print(f"Epoch {epoch+1} Step {step} Loss {train_loss.result():.3f} "
            + f"Accuracy {train_accuracy.result():.3f}")
    # Print epoch number, loss, accuracy and wall-clock time at the end of every epoch
    print(f"Epoch {epoch+1}: Training Loss {train_loss.result():.4f}, "
          + f"Training Accuracy {train_accuracy.result():.4f}, "
          + f"Time {time() - start_time:.2f}s")
