Importing libs

In [5]:
import tensorflow as tf
import tensorflow.keras.layers as L
from tensorflow.keras.layers import *
import matplotlib.pyplot as plt
import pandas as pd
import time
import numpy as np

Attention (Single, Scaled)

`Q` query tensor

`K` key tensor

`V` value tensor

In [4]:
# Scaled dot-product attention
def dp_attention(Q, K, V, mask=None):
    """Compute scaled dot-product attention.

    The scores ``Q @ K^T`` are divided by ``sqrt(dk)``, where ``dk`` is the
    size of the last axis of ``K``, optionally masked, and normalised with a
    softmax over the last (key) axis.

    Args:
        Q: query tensor; its last axis must match the last axis of ``K``.
        K: key tensor.
        V: value tensor; multiplied by the attention weights.
        mask: optional tensor broadcastable to the score tensor. A value of 1
            keeps a position, a value of 0 hides it (a large negative number
            is added to its score before the softmax).

    Returns:
        A tuple ``(output, weights)``: the weighted sum of the values and the
        softmax attention weights.
    """
    scores = tf.matmul(Q, K, transpose_b=True)

    # tf.shape (rather than the static K.shape) keeps this working when the
    # depth is only known at run time; the cast keeps the scale in the same
    # dtype as the scores.
    dk = tf.cast(tf.shape(K)[-1], scores.dtype)
    scaled_dot_product = scores / tf.math.sqrt(dk)

    # `is not None` instead of `!= None`: the latter compares elementwise
    # when the mask is a NumPy array and cannot be used as a condition.
    if mask is not None:
        keep = tf.cast(mask, scaled_dot_product.dtype)
        scaled_dot_product += (1.0 - keep) * (-1e9)

    weights = tf.nn.softmax(scaled_dot_product, axis=-1)
    output = tf.matmul(weights, V)

    return output, weights

Attention (Multihead)

`Q` query tensor

`K` key tensor

`V` value tensor

`nH` is the number of heads

`d_model` is the embedding dimension

`dk` is the depth of Q and K

`dv` is the depth of V

In [8]:
class MultiHeadAttention(L.Layer):
    """Multi-head attention with one projection matrix per head.

    Args:
        nH: number of attention heads.
        d_model: size of the last axis of the inputs and of the output.
        dk: per-head depth of the projected queries and keys.
        dv: per-head depth of the projected values.
    """

    def __init__(self, nH, d_model, dk, dv):
        super(MultiHeadAttention, self).__init__()
        self.nH = nH
        self.dv = dv

        # Each weight gets its own initializer (created from the string
        # identifier): newer Keras releases return identical values when one
        # unseeded initializer instance is called repeatedly, which would
        # start WQ and WK out equal.
        self.WQ = self.add_weight(
            name="WQ", shape=(nH, d_model, dk),
            initializer="glorot_uniform", trainable=True)
        self.WK = self.add_weight(
            name="WK", shape=(nH, d_model, dk),
            initializer="glorot_uniform", trainable=True)
        # Values are projected to depth dv (not dk).
        self.WV = self.add_weight(
            name="WV", shape=(nH, d_model, dv),
            initializer="glorot_uniform", trainable=True)
        # The output projection maps the concatenated heads back to d_model.
        self.WO = self.add_weight(
            name="WO", shape=(nH * dv, d_model),
            initializer="glorot_uniform", trainable=True)

    def call(self, Q, K, V, mask=None):
        """Apply multi-head attention.

        Args:
            Q: queries, shape (batch, seq_q, d_model).
            K: keys, shape (batch, seq_k, d_model).
            V: values, shape (batch, seq_k, d_model).
            mask: optional mask with 1 = keep and 0 = hide. A rank-3 mask,
                e.g. (batch, 1, seq_k) from `pad_mask` or (1, seq, seq) from
                `create_look_ahead_mask`, gets a head axis inserted so that it
                broadcasts over the (batch, nH, seq_q, seq_k) scores. Masks of
                any other rank are passed through unchanged.

        Returns:
            Tensor of shape (batch, seq_q, d_model).
        """
        # (batch, seq, d_model) x (nH, d_model, depth) -> (batch, nH, seq, depth)
        Qh = tf.einsum("bsd,hdk->bhsk", Q, self.WQ)
        Kh = tf.einsum("bsd,hdk->bhsk", K, self.WK)
        Vh = tf.einsum("bsd,hdv->bhsv", V, self.WV)

        if mask is not None:
            mask = tf.convert_to_tensor(mask)
            if mask.shape.rank == 3:
                mask = mask[:, tf.newaxis, :, :]

        Ah, _ = dp_attention(Qh, Kh, Vh, mask=mask)

        # Bring the head axis next to the depth axis BEFORE flattening;
        # reshaping (batch, nH, seq, dv) directly would mix heads and positions.
        Ah = tf.transpose(Ah, [0, 2, 1, 3])
        dims = tf.shape(Ah)  # dynamic, so an unknown batch size is fine
        A = tf.reshape(Ah, (dims[0], dims[1], self.nH * self.dv))

        # (batch, seq_q, nH*dv) x (nH*dv, d_model) -> (batch, seq_q, d_model)
        A = tf.einsum("bsf,fd->bsd", A, self.WO)

        return A

Feed forward network

In [10]:
class FNNLayer(L.Layer):
    """Feed-forward network made of two kernel-size-1 convolutions.

    Args:
        d_model: number of filters of the second (output) convolution.
        dims: number of filters of the first, ReLU-activated convolution.
    """

    def __init__(self, d_model, dims):
        super().__init__()
        # A kernel size of 1 means every position is transformed on its own.
        self.layer1 = L.Conv1D(dims, 1, activation="relu")
        self.layer2 = L.Conv1D(d_model, 1)

    def call(self, x):
        """Run `x` through the ReLU convolution, then the linear one."""
        return self.layer2(self.layer1(x))

Positional encoding

In [11]:
def pos_encoding(max_positions, d):
    """Build the sinusoidal positional-encoding table.

    Args:
        max_positions: number of positions (rows) to encode.
        d: encoding size (columns).

    Returns:
        A float32 tensor of shape (1, max_positions, d). Even columns hold
        sines and odd columns hold cosines of position / 10000**(2*i/d),
        where i = column // 2.
    """
    positions = np.expand_dims(np.arange(max_positions), axis=1)
    columns = np.expand_dims(np.arange(d), axis=0)

    # Columns 2i and 2i+1 share the same frequency.
    exponents = 2 * (columns // 2) / d
    table = positions / (10000 ** exponents)

    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Leading axis of size 1 so the table broadcasts over a batch.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

In [13]:
# Padding mask
def pad_mask(decoder_token_ids):
    """Return a float mask that is 1.0 where the id is non-zero and 0.0 where it is 0.

    Args:
        decoder_token_ids: rank-2 tensor of ids in which 0 marks padding.

    Returns:
        A float32 tensor with an extra axis of size 1 inserted at position 1.
    """
    is_real_token = tf.math.not_equal(decoder_token_ids, 0)
    keep = tf.cast(is_real_token, tf.float32)

    return keep[:, tf.newaxis, :]

In [14]:
# Look-ahead mask
def create_look_ahead_mask(sequence_length):
    """Return a (1, sequence_length, sequence_length) lower-triangular mask of ones.

    Entry [0, i, j] is 1.0 when j <= i and 0.0 otherwise.
    """
    all_ones = tf.ones((1, sequence_length, sequence_length))
    # num_lower=-1 keeps the whole lower triangle; num_upper=0 drops
    # everything above the main diagonal.
    return tf.linalg.band_part(all_ones, num_lower=-1, num_upper=0)

Encoder block (the original Transformer stacks 6 of these)

In [16]:
class EncoderBlock(L.Layer):
    """One encoder block: self-attention and a feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation.

    Args:
        H: number of attention heads.
        d_model: model (embedding) dimension.
        dk: depth of queries and keys.
        dv: depth of values.
        dims: number of filters of the feed-forward network's first layer.
        dropout_rate: rate used by both dropout layers.
        layernorm_eps: epsilon used by both layer normalisations.
    """

    def __init__(self, H, d_model, dk, dv, dims, dropout_rate=0.1, layernorm_eps = 1e-6):
        super().__init__()

        # Sub-layers
        self.mha = MultiHeadAttention(H, d_model, dk, dv)
        self.ffn = FNNLayer(d_model, dims)

        # One normalisation per sub-layer
        self.layernorm1 = L.LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = L.LayerNormalization(epsilon=layernorm_eps)

        # One dropout per sub-layer
        self.dropout_mha = L.Dropout(dropout_rate)
        self.dropout_ffn = L.Dropout(dropout_rate)

    def call(self, x, training=False, mask=None):
        """Run the block on `x`; `mask` is forwarded to the attention layer."""
        # Self-attention: x serves as query, key and value.
        attended = self.dropout_mha(self.mha(x, x, x, mask=mask), training=training)
        normed = self.layernorm1(x + attended)

        transformed = self.dropout_ffn(self.ffn(normed), training=training)
        return self.layernorm2(transformed + normed)

Encoder

In [18]:
class Encoder(tf.keras.layers.Layer):
    """A stack of N `EncoderBlock`s applied one after another.

    Args:
        N: number of encoder blocks.
        H, d_model, dk, dv, dims, dropout_rate, layernorm_eps: forwarded
            unchanged to every `EncoderBlock`.
    """

    def __init__(self, N, H, d_model, dk, dv, dims, dropout_rate=0.1, layernorm_eps=1e-6):
        super(Encoder, self).__init__()
        # EncoderBlock is the class defined in this notebook; there is no
        # `EncoderLayer` in tensorflow.keras.layers.
        self.layers = [
            EncoderBlock(H, d_model, dk, dv, dims,
                         dropout_rate=dropout_rate, layernorm_eps=layernorm_eps)
            for _ in range(N)
        ]

    def call(self, x, training=False, mask=None):
        """Feed `x` through every block, passing `training` and `mask` to each."""
        for layer in self.layers:
            x = layer(x, training=training, mask=mask)
        return x

Decoder block

In [20]:
class DecoderBlock(tf.keras.layers.Layer):
    """One decoder block.

    It chains masked self-attention, attention over the encoder output and a
    feed-forward network. Each sub-layer is followed by dropout, a residual
    connection and layer normalisation.

    Args:
        H: number of attention heads.
        d_model: model (embedding) dimension.
        dk: depth of queries and keys.
        dv: depth of values.
        dims: number of filters of the feed-forward network's first layer.
        dropout_rate: rate used by all dropout layers.
        layernorm_eps: epsilon used by all layer normalisations.
    """

    def __init__(self, H, d_model, dk, dv, dims, dropout_rate=0.1, layernorm_eps=1e-6):
        super(DecoderBlock, self).__init__()
        self.mha1 = MultiHeadAttention(H, d_model, dk, dv)
        self.mha2 = MultiHeadAttention(H, d_model, dk, dv)
        self.ffn = FNNLayer(d_model, dims)
        self.layernorm1 = L.LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = L.LayerNormalization(epsilon=layernorm_eps)
        self.layernorm3 = L.LayerNormalization(epsilon=layernorm_eps)
        self.dropout_mha1 = L.Dropout(dropout_rate)
        self.dropout_mha2 = L.Dropout(dropout_rate)
        self.dropout_ffn = L.Dropout(dropout_rate)

    def call(self, x, encoder_output, training=False, look_ahead_mask=None, padding_mask=None):
        """Run the block.

        Args:
            x: decoder input.
            encoder_output: output of the encoder; used as keys and values of
                the second attention.
            training: forwarded to the dropout layers.
            look_ahead_mask: mask for the self-attention.
            padding_mask: mask for the attention over `encoder_output`.

        Returns:
            The block output after the third layer normalisation.
        """
        # 1st attention: masked self-attention over the decoder input
        A1 = self.mha1(x, x, x, mask=look_ahead_mask)
        A1 = self.dropout_mha1(A1, training=training)

        # Residual connection + layer normalization
        out1 = self.layernorm1(x + A1)

        # 2nd attention: encoder-decoder attention. The query must be the
        # output of the first sub-layer (out1), not the raw input x, so that
        # the self-attention result actually drives what is looked up in the
        # encoder output.
        A2 = self.mha2(out1, encoder_output, encoder_output, mask=padding_mask)
        A2 = self.dropout_mha2(A2, training=training)

        # Residual connection + layer normalization
        out2 = self.layernorm2(out1 + A2)

        # Pointwise ffn
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout_ffn(ffn_output, training=training)

        decoder_layer_out = self.layernorm3(ffn_output + out2)
        return decoder_layer_out

Decoder

In [21]:
class Decoder(tf.keras.layers.Layer):
    """A stack of N `DecoderBlock`s applied one after another.

    Args:
        N: number of decoder blocks.
        H, d_model, dk, dv, dims, dropout_rate, layernorm_eps: forwarded
            unchanged to every `DecoderBlock`.
    """

    def __init__(self, N, H, d_model, dk, dv, dims, dropout_rate=0.1, layernorm_eps=1e-6):
        super(Decoder, self).__init__()

        self.layers = [
            DecoderBlock(H, d_model, dk, dv, dims,
                         dropout_rate=dropout_rate, layernorm_eps=layernorm_eps)
            for _ in range(N)
        ]

    def call(self, x, encoder_output, training=False, look_ahead_mask=None, padding_mask=None):
        """Feed `x` through every block; each block sees the same `encoder_output`."""
        for layer in self.layers:
            # `training` is forwarded explicitly (as Encoder does) so the
            # blocks' dropout follows the flag given to this layer.
            x = layer(x, encoder_output, training=training,
                      look_ahead_mask=look_ahead_mask, padding_mask=padding_mask)

        return x

Transformer model

In [22]:
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer with a single shared embedding matrix.

    The same (vocab_size, d_model) matrix embeds the encoder input, embeds the
    decoder input and projects the decoder output back onto the vocabulary.

    Args:
        N: number of encoder blocks and of decoder blocks.
        H: number of attention heads.
        d_model: model (embedding) dimension.
        dk: depth of queries and keys.
        dv: depth of values.
        dims: number of filters of the feed-forward networks' first layer.
        vocab_size: number of rows of the embedding matrix.
        max_positional_encoding: number of positions in the positional table.
        dropout_rate: dropout rate used throughout the model.
        layernorm_eps: epsilon used by the layer normalisations.
    """

    def __init__(self, N, H, d_model, dk, dv, dims, vocab_size, max_positional_encoding, dropout_rate=0.1, layernorm_eps=1e-6):
        super(Transformer, self).__init__()

        # Created with add_weight so Keras registers it as a trainable weight
        # of the model.
        self.embedding = self.add_weight(
            name="embedding", shape=(vocab_size, d_model),
            initializer=tf.keras.initializers.GlorotUniform(), trainable=True)
        self.PE = pos_encoding(max_positional_encoding, d_model)

        self.dropout_encoding_input = L.Dropout(dropout_rate)
        self.dropout_decoding_input = L.Dropout(dropout_rate)

        self.encoder = Encoder(N, H, d_model, dk, dv, dims, dropout_rate=dropout_rate, layernorm_eps=layernorm_eps)
        self.decoder = Decoder(N, H, d_model, dk, dv, dims, dropout_rate=dropout_rate, layernorm_eps=layernorm_eps)

    def call(self, x, y, training=False, enc_padding_mask=None, look_ahead_mask=None, dec_padding_mask=None):
        """Run the model.

        Args:
            x: encoder input. It is matrix-multiplied with the embedding, so
                its last axis must have size vocab_size. Presumably one-hot
                token vectors of shape (batch, source_len, vocab_size); TODO
                confirm against the caller.
            y: decoder input, same convention as `x`.
            training: forwarded to dropout, encoder and decoder.
            enc_padding_mask: mask passed to the encoder.
            look_ahead_mask: mask for the decoder self-attention.
            dec_padding_mask: mask for the decoder's attention over the
                encoder output.

        Returns:
            Probabilities with the same layout as before this fix: the
            vocabulary on axis 1 and the target positions on the last axis,
            i.e. (batch, vocab_size, target_len) for a decoder output of
            (batch, target_len, d_model). They sum to 1 over the vocabulary
            axis.
        """
        x = tf.matmul(x, self.embedding)
        # Use only as many positions as the sequence has; adding the full
        # table only works when the length equals max_positional_encoding.
        x = x + self.PE[:, :tf.shape(x)[1], :]
        x = self.dropout_encoding_input(x, training=training)

        encoder_output = self.encoder(x, training=training, mask=enc_padding_mask)

        y = tf.matmul(y, self.embedding)
        y = y + self.PE[:, :tf.shape(y)[1], :]
        y = self.dropout_decoding_input(y, training=training)

        dec_output = self.decoder(y, encoder_output, training=training, look_ahead_mask=look_ahead_mask, padding_mask=dec_padding_mask)

        # embedding @ dec_output^T puts the vocabulary on axis 1 and the
        # positions on the last axis.
        pred = tf.matmul(self.embedding, dec_output, transpose_b=True)
        # Normalise over the vocabulary axis; the default (last) axis would
        # normalise across sequence positions instead.
        pred = tf.nn.softmax(pred, axis=1)

        return pred

TODO: `compilation` and `learning rate visualization`