In [1]:
import tensorflow as tf
import numpy as np
import tensorflow_probability as tfp

# Scaled Dot-Product attention function

In [2]:
def dot_product_attn(q, k, v, mask = None):
    '''
    Scaled dot-product attention.

    Arguments
    q, k, v: query, key and value tensors; k is transposed on its last two
             axes before being multiplied with q
    mask: optional tensor of 1s (keep) and 0s (hide); hidden entries get a
          -1e9 penalty added to their score before the softmax

    Returns the softmax-weighted combination of v.
    '''

    # scores are scaled by 1/sqrt(d), d being the size of q's last axis
    depth = q.get_shape()[-1]
    scale = 1/depth**0.5

    scores = tf.multiply(scale, tf.matmul(q, k, transpose_b = True))

    if mask is not None:
        penalty = (1. - mask) * -1e9
        scores = tf.add(scores, penalty)

    weights = tf.nn.softmax(scores, axis = -1)

    return tf.matmul(weights, v)

In [3]:
# toy batch: 2 sequences, 5 positions, width 12
m, k, d_mod = 2, 5, 12
Q, K, V = (tf.random.uniform((m, k, d_mod)) for _ in range(3))

# mask of 1s (keep) and 0s (hide) per sequence, with a middle axis inserted so
# it broadcasts over the query positions; note that this rebinds `m`
m = tf.convert_to_tensor(np.array([[1,1,0,0,0],[1,1,1,1,0]]), dtype = 'float32')[:, tf.newaxis, :]

In [4]:
# shape check of the masked attention output
dot_product_attn(Q, K, V, mask = m).shape

TensorShape([2, 5, 12])

# Multihead Projection

In [5]:
class MultiHeadProjection(tf.keras.layers.Layer):
    '''
    Applies one affine projection per head to a rank-3 input.

    Arguments
    projected_dim: size of the last axis produced for every head
    heads: number of independent projections
    '''

    def __init__(self, projected_dim, heads = 8, **kwargs):
        super().__init__(**kwargs)
        self.projected_dim = projected_dim
        self.h = heads

    def build(self, input_shape):

        assert(len(input_shape) == 3), 'Expected input of rank 3: (m, Tx, d_model)'

        self.m, self.k, self.model_dim = input_shape

        # one (model_dim, projected_dim) kernel per head
        kernel_shape = (self.h, self.model_dim, self.projected_dim)
        # one bias row per head; the singleton axis broadcasts over positions
        bias_shape = (self.h, 1, self.projected_dim)

        self.W = self.add_weight(shape = kernel_shape, initializer = 'glorot_normal', trainable = True)
        self.b = self.add_weight(shape = bias_shape, initializer = 'Zeros', trainable = True)

    def call(self, X):

        # insert a head axis at position 1 so the matmul broadcasts over heads
        per_head = tf.matmul(tf.expand_dims(X, 1), self.W)

        return tf.add(per_head, self.b)

In [6]:
# 8 heads, each projecting down to width 8
projer = MultiHeadProjection(projected_dim = 8, heads = 8)
x = tf.random.normal(shape = (2, 5, 64))
print(x.get_shape())

(2, 5, 64)


In [7]:
# the projection inserts a head axis: (m, heads, Tx, projected_dim)
projer(x).shape

TensorShape([2, 8, 5, 8])

# Multi-Head Attention Layer

In [8]:
class AttentionLayer(tf.keras.layers.Layer):
    '''
    Multi-head attention: per-head projections of Q, K and V, scaled
    dot-product attention on every head, then the heads are merged and passed
    through a dense layer.

    Arguments
    projected_dim: width of every head
    heads: number of heads
    '''

    def __init__(self, projected_dim, heads = 8, **kwargs):
        super().__init__(**kwargs)
        self.projected_dim = projected_dim
        self.h = heads

    def build(self, input_shape):

        for shape in input_shape:
            assert(len(shape) == 3), 'Expected input shape of (m, Tx, d)'

        # one independent projection layer per input (Q, K, V)
        self.projQ, self.projK, self.projV = [
            MultiHeadProjection(self.projected_dim, self.h) for _ in input_shape
        ]

        # the output width follows the last axis of the last input
        _, _, output_d = input_shape[-1]

        # merges the head axis and the per-head width into a single axis
        self.reshaper = tf.keras.layers.Reshape(target_shape = (-1, self.projected_dim * self.h))

        self.dense = tf.keras.layers.Dense(output_d)

    def call(self, X, mask = None):
        '''
        Arguments
        X: list of (Q, K, V)
        mask: for softmax layer
        '''

        query, key, value = X

        heads_q = self.projQ(query)
        heads_k = self.projK(key)
        heads_v = self.projV(value)

        per_head = dot_product_attn(heads_q, heads_k, heads_v, mask = mask)

        # swap the head and position axes so the heads of a position are adjacent
        merged = self.reshaper(tf.transpose(per_head, perm=[0, 2, 1, 3]))

        return self.dense(merged)

In [9]:
# 8 heads of width 8 over a width-64 input
attn = AttentionLayer(projected_dim = 8, heads = 8)
x = tf.random.normal(shape = (2, 5, 64))
print(x.get_shape())

(2, 5, 64)


In [10]:
# self-attention: the same tensor serves as Q, K and V
attn((x, x, x)).shape

TensorShape([2, 5, 64])

# Fully Connected Layer

In [11]:
class FCNNLayer(tf.keras.layers.Layer):
    '''
    Two stacked dense layers: a ReLU layer of width dff followed by a linear
    layer of width d_model.

    Arguments
    d_model: output width
    dff: hidden width
    '''

    def __init__(self, d_model, dff, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.dff = dff

    def build(self, input_shape):
        self.dense1 = tf.keras.layers.Dense(self.dff, activation = 'relu')
        self.dense2 = tf.keras.layers.Dense(self.d_model, activation = 'linear')

    def call(self, X):
        hidden = self.dense1(X)
        return self.dense2(hidden)

In [12]:
# width-512 input with a hidden width of 2048
x = tf.random.normal(shape = (2, 5, 512))
f = FCNNLayer(d_model = 512, dff = 2048)

In [13]:
# bind the result to its own name so that re-running this cell does not feed
# the previous output back into the layer
fcnn_out = f(x)
fcnn_out.get_shape()

TensorShape([2, 5, 512])

# Encoder Layer

In [14]:
class TransformerEncoder(tf.keras.layers.Layer):
    '''
    One encoder block: self-attention followed by a feed-forward block, each
    wrapped in dropout, a residual connection and layer normalization.

    Arguments
    dff: hidden width of the feed-forward block
    heads: number of attention heads
    dropout: dropout rate applied to the output of both sub-blocks
    '''

    def __init__(self, dff = 2048, heads = 8, dropout = 0.1, **kwargs):
        super().__init__(**kwargs)
        self.dff = dff
        self.h = heads
        self.dropout = dropout

    def build(self, input_shape):
        assert(len(input_shape) == 3), 'Expected input shape of (m, Tx, d)'

        self.m, self.k, self.d_model = input_shape

        # every head works on an equal (floored) share of the model width
        self.projected_dim = self.d_model//self.h

        # self-attention sub-block
        self.attn = AttentionLayer(self.projected_dim, self.h)
        self.drop1 = tf.keras.layers.Dropout(self.dropout)
        self.norm1 = tf.keras.layers.LayerNormalization()

        # feed-forward sub-block
        self.fcnn = FCNNLayer(self.d_model, self.dff)
        self.drop2 = tf.keras.layers.Dropout(self.dropout)
        self.norm2 = tf.keras.layers.LayerNormalization()

    def call(self, X, training = True, mask = None):
        '''
        Arguments
        X: input sequence, used as Q, K and V of the self-attention
        training: forwarded to the dropout layers
        mask: forwarded to the attention layer
        '''

        attended = self.attn([X,X,X], mask = mask)
        attended = self.drop1(attended, training = training)
        X = self.norm1(attended + X)

        transformed = self.fcnn(X)
        transformed = self.drop2(transformed, training = training)
        X = self.norm2(transformed + X)

        return X

In [89]:
# encoder block with default settings on a width-512 input
x = tf.random.normal(shape = (2, 5, 512))
encoder_layer = TransformerEncoder()

In [90]:
# `y` is reused below as the encoder output fed to the decoder block
y = encoder_layer(x)
y.shape

TensorShape([2, 5, 512])

# Decoder Layer

In [17]:
class TransformerDecoder(tf.keras.layers.Layer):
    '''
    One decoder block: self-attention, attention over the encoder output and
    a feed-forward block, each wrapped in dropout, a residual connection and
    layer normalization.

    Arguments
    dff: hidden width of the feed-forward block
    heads: number of attention heads
    dropout: dropout rate applied to the output of all three sub-blocks
    '''

    def __init__(self, dff = 2048, heads = 8, dropout = 0.1, **kwargs):
        super().__init__(**kwargs)
        self.dff = dff
        self.h = heads
        self.dropout = dropout

    def build(self, input_shape):
        assert(len(input_shape) == 3), 'Expected input shape of (m, Tx, d)'

        self.m, self.k, self.d_model = input_shape

        # every head works on an equal (floored) share of the model width
        self.projected_dim = self.d_model//self.h

        # self-attention over the decoder sequence
        self.intr_attn = AttentionLayer(self.projected_dim, self.h)
        self.drop1 = tf.keras.layers.Dropout(self.dropout)
        self.norm1 = tf.keras.layers.LayerNormalization()

        # attention with the decoder as query and the encoder output as key/value
        self.enc_dec_attn = AttentionLayer(self.projected_dim, self.h)
        self.drop2 = tf.keras.layers.Dropout(self.dropout)
        self.norm2 = tf.keras.layers.LayerNormalization()

        # feed-forward sub-block
        self.fcnn = FCNNLayer(self.d_model, self.dff)
        self.drop3 = tf.keras.layers.Dropout(self.dropout)
        self.norm3 = tf.keras.layers.LayerNormalization()

    def call(self, X, encoder_output, lookahead_mask = None, encoder_padding_mask = None, training = True):
        '''
        Arguments
        X: decoder sequence
        encoder_output: key/value source of the second attention
        lookahead_mask: mask for the self-attention
        encoder_padding_mask: mask for the attention over the encoder output
        training: forwarded to the dropout layers
        '''

        # sub-block 1: self-attention
        self_attended = self.intr_attn([X,X,X], mask = lookahead_mask)
        self_attended = self.drop1(self_attended, training = training)
        X = self.norm1(self_attended + X)

        # sub-block 2: attention over the encoder output
        cross_attended = self.enc_dec_attn([X,encoder_output, encoder_output], mask = encoder_padding_mask)
        cross_attended = self.drop2(cross_attended, training = training)
        X = self.norm2(cross_attended + X)

        # sub-block 3: feed-forward
        transformed = self.fcnn(X)
        transformed = self.drop3(transformed, training = training)
        X = self.norm3(transformed + X)

        return X

In [18]:
# decoder block with default settings on a width-512 input
x = tf.random.normal(shape = (2, 5, 512))
decoder_layer = TransformerDecoder()

In [19]:
# `y` is the encoder block output computed in an earlier cell
decoder_layer(x, y).shape

TensorShape([2, 5, 512])

# Positional Encoding Layer

In [20]:
class PositionalEmbedding(tf.keras.layers.Layer):
    '''
    Scales an embedded sequence by sqrt(d_model) and adds fixed sinusoidal
    position encodings to it.

    The encoding table is computed once in build() for the sequence length and
    width of the first input, so later inputs must be broadcast-compatible
    with that (1, Tx, d_model) table.
    '''

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):

        (self.m, self.k, self.d_model) = input_shape

        positions = np.arange(self.k).reshape(-1,1)   # (Tx, 1)
        dims = np.arange(self.d_model)                # (d_model,)

        # Dimensions 2i and 2i+1 form a (sin, cos) pair that shares the
        # frequency 1 / 10000^(2i / d_model), hence the integer division.
        inv_freq = 1 / np.power(10000, 2 * (dims // 2) / self.d_model)

        angles = positions * inv_freq                 # (Tx, d_model)

        # sin on even dimensions, cos on odd ones; selecting with np.where
        # (instead of index arrays) also works when d_model is odd
        embeddings = np.where(dims % 2 == 0, np.sin(angles), np.cos(angles))

        self.embeddings = tf.convert_to_tensor(np.expand_dims(embeddings, 0), dtype = 'float32')

    def call(self, X):
        # Only the token embeddings are scaled; the position encodings are
        # added afterwards so they keep their [-1, 1] range.
        return tf.multiply(X, self.d_model**0.5) + self.embeddings

In [21]:
# constant input, so the output shows the position encodings directly
pe = PositionalEmbedding()
x = tf.ones(shape = (3, 5, 12))

In [22]:
# note: rebinds `y`
y = pe(x)
y.shape

TensorShape([3, 5, 12])

# Encoder Stack

In [23]:
class EncoderStack(tf.keras.layers.Layer):
    '''
    Token embedding, positional embedding and a stack of encoder blocks.

    Arguments
    num_classes: vocabulary size of the embedding (id 0 is masked)
    d_model: embedding width
    num_layers: number of encoder blocks
    num_heads: attention heads per block
    dropout: dropout rate of every block
    dff: hidden width of the feed-forward block
    '''

    def __init__(self, num_classes, d_model = 512, num_layers = 6, num_heads = 8, dropout = 0.1, dff = 2048, **kwargs):
        super().__init__(**kwargs)
        self.num_classes = num_classes
        self.d_model = d_model
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dropout = dropout
        self.dff = dff

    def build(self, input_shape):

        # the unpacking fails unless the input is a rank-2 batch of token ids
        (m, k) = input_shape

        self.embedding = tf.keras.layers.Embedding(self.num_classes, self.d_model, mask_zero = True)
        self.positional_embedding = PositionalEmbedding()

        blocks = []
        for _ in range(self.num_layers):
            blocks.append(
                TransformerEncoder(dff = self.dff, heads = self.num_heads, dropout = self.dropout)
            )
        self.encoders = blocks

    def call(self, seqs, training = True):
        '''
        Arguments
        seqs: batch of token ids (m, Tx)
        training: forwarded to every encoder block

        Returns the encoded sequence and the padding mask used for attention.
        '''

        X = self.embedding(seqs)

        # expand the embedding's mask from (m, Tx) to (m, 1, 1, Tx) for the multihead softmax
        padding = self.embedding.compute_mask(seqs)
        encoder_mask = tf.dtypes.cast(padding, 'float32')[:, tf.newaxis, tf.newaxis, :]

        X = self.positional_embedding(X)

        for block in self.encoders:
            X = block(X, mask = encoder_mask, training = training)

        return X, encoder_mask

# Decoder Stack

In [24]:
class DecoderStack(tf.keras.layers.Layer):
    '''
    Token embedding, positional embedding, a stack of decoder blocks and an
    output projection that reuses the (transposed) embedding matrix.

    Arguments
    num_classes: vocabulary size of the embedding (id 0 is masked)
    d_model: embedding width
    num_layers: number of decoder blocks
    num_heads: attention heads per block
    dropout: dropout rate of every block
    dff: hidden width of the feed-forward block
    '''

    def __init__(self, num_classes, d_model = 512, num_layers = 6, num_heads = 8, dropout = 0.1, dff = 2048, **kwargs):
        super().__init__(**kwargs)
        self.num_classes = num_classes
        self.d_model = d_model
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dropout = dropout
        self.dff = dff

    def build(self, input_shape):

        assert(len(input_shape) == 3), 'Expected input with len 3 in the form of (decoder_input, encoder_output, encoder_mask)'
        assert(input_shape[0][1] == input_shape[1][1]), 'Expected encoder output and decoder input to have same time dimension'

        (m, k) = input_shape[0]

        # Lower-triangular (k, k) matrix of ones: position t may attend to
        # positions <= t only. band_part takes the integer sequence length
        # directly, avoiding the float element count that was previously
        # passed to tf.ones as a shape.
        self.trailing_mask = tf.linalg.band_part(tf.ones((k, k)), -1, 0)

        self.embedding = tf.keras.layers.Embedding(self.num_classes, self.d_model, mask_zero = True)

        self.positional_embedding = PositionalEmbedding()

        self.decoders = [
            TransformerDecoder(dff = self.dff, heads = self.num_heads, dropout = self.dropout)
            for _ in range(self.num_layers)
        ]

    def call(self, inputs, training = True):
        '''
        Arguments
        inputs: tuple of (decoder token ids (m, Tx), encoder output, encoder mask)
        training: forwarded to every decoder block

        Returns one score per vocabulary entry for every position.
        '''

        (seqs, encoder_output, encoder_mask) = inputs

        X = self.embedding(seqs)

        # expand the embedding's mask from (m, Tx) to (m, 1, 1, Tx) for the multihead softmax
        decoder_padding_mask = tf.dtypes.cast(self.embedding.compute_mask(seqs), 'float32')[:, tf.newaxis, tf.newaxis, :]
        # combining it with the (Tx, Tx) look-ahead mask broadcasts to (m, 1, Tx, Tx)
        decoder_mask = tf.multiply(decoder_padding_mask, self.trailing_mask)

        X = self.positional_embedding(X)

        for decoder in self.decoders:
            X = decoder(X, encoder_output, lookahead_mask = decoder_mask,
                        encoder_padding_mask = encoder_mask, training = training)

        # output projection shares its weights with the input embedding
        X = tf.matmul(X, tf.transpose(self.embedding.embeddings))

        return X

In [25]:
# three zero-padded token-id sequences of length 7 (id 0 is masked by the embedding)
x = np.array([
    [1, 2, 3, 4, 0, 0, 0],
    [2, 3, 4, 0, 0, 0, 0],
    [1, 5, 0, 0, 0, 0, 0],
])
print(x.shape)

# `x` is rebound to the encoder output and `m` to its padding mask
encode = EncoderStack(num_classes = 100)
x, m = encode(x)

(3, 7)


In [26]:
# decoder token ids; `x` and `m` are the encoder output and mask from the previous cell
y = np.array([
    [1, 2, 3, 4, 0, 0, 0],
    [2, 3, 4, 0, 0, 0, 0],
    [1, 5, 0, 0, 0, 0, 0],
])
decode = DecoderStack(num_classes = 100)

y = decode((y, x, m))
y.shape

TensorShape([3, 7, 100])

# Transformer Model

In [27]:
def Transformer(num_classes, max_seq_len, d_model = 512, num_layers = 6, num_heads = 8, dropout = 0.1, dff = 2048):
    '''
    Builds a Keras model that wires an EncoderStack into a DecoderStack.

    Arguments
    num_classes: vocabulary size shared by encoder and decoder
    max_seq_len: length of both input sequences
    d_model, num_layers, num_heads, dropout, dff: forwarded to both stacks

    Returns a tf.keras.Model taking [encoder ids, decoder ids] and producing
    the decoder stack output.
    '''
    stack_args = (num_classes, d_model, num_layers, num_heads, dropout, dff)

    X = tf.keras.Input(shape = (max_seq_len,))
    Y = tf.keras.Input(shape = (max_seq_len,))

    encoder = EncoderStack(*stack_args)
    enc_output, encoder_mask = encoder(X)

    decoder = DecoderStack(*stack_args)
    logits = decoder((Y, enc_output, encoder_mask))

    return tf.keras.Model(inputs = [X,Y], outputs = [logits])

In [28]:
# identical zero-padded id sequences for the encoder (x) and the decoder (y)
x = np.array([
    [1, 2, 3, 4, 0, 0, 0],
    [2, 3, 4, 0, 0, 0, 0],
    [1, 5, 0, 0, 0, 0, 0],
])
y = np.array([
    [1, 2, 3, 4, 0, 0, 0],
    [2, 3, 4, 0, 0, 0, 0],
    [1, 5, 0, 0, 0, 0, 0],
])

# vocabulary of 10 ids, sequences of length 7
tr = Transformer(num_classes = 10, max_seq_len = 7)

In [29]:
# one score per vocabulary entry for every position
np.shape(tr.predict([x, y]))

(3, 7, 10)

# Loss Function

In [82]:
class TransformerLoss():

    '''
    Cross-entropy over non-padding positions, with optional label smoothing.

    epsilon: label smoothing factor [0,1]

    Calling the instance
    labels: class ids for sparse crossentropy (m, Tx); id 0 marks padding
    logits: output from linear dense layer (m, Tx, num_classes)
    returns the loss averaged over the positions whose label is not 0
    '''

    def __init__(self, epsilon = 0.):
        self.epsilon = epsilon

    def __call__(self, labels, logits):

        num_classes = logits.get_shape()[-1]

        # tf.one_hot accepts tensors as well as arrays (to_categorical is NumPy-only)
        labels = tf.cast(labels, 'int32')
        expanded_y = tf.one_hot(labels, depth = num_classes) #shape = (m, Tx, num_classes)

        losses = tf.keras.losses.categorical_crossentropy(expanded_y, logits, from_logits=True, label_smoothing = self.epsilon)

        # 1 for real tokens, 0 for padding -- computed from the `labels`
        # argument, not from a notebook-level variable
        mask = tf.cast(tf.math.not_equal(labels, 0), losses.dtype)

        # average over the real tokens only; the maximum guards against a
        # batch that consists of padding alone
        return tf.reduce_sum(losses * mask) / tf.maximum(tf.reduce_sum(mask), 1.)

In [83]:
# random scores for 12 classes and zero-padded integer labels
logits = tf.random.normal(shape = (2, 5, 12))

y_true = np.array([
    [1, 2, 3, 0, 0],
    [2, 2, 3, 3, 0],
])

logits.shape, y_true.shape

(TensorShape([2, 5, 12]), (2, 5))

In [86]:
# label smoothing factor of 0.1
losser = TransformerLoss(0.1)
losser(labels = y_true, logits = logits)

<tf.Tensor: id=16242, shape=(), dtype=float32, numpy=3.642079>

# Optimizer

In [88]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    '''
    Learning rate d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5):
    it grows linearly during the first warmup_steps steps and decays with the
    inverse square root of the step afterwards.

    d_model: model width used to scale the rate
    warmup_steps: number of steps of linear growth
    '''

    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()

        self.d_model = tf.cast(d_model, tf.float32)

        self.warmup_steps = warmup_steps

    def __call__(self, step):
        # step may arrive as an integer (e.g. an optimizer's iteration
        # counter); rsqrt is only defined for floating-point inputs
        step = tf.cast(step, tf.float32)

        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

In [91]:
def TransformerOptimizer(d_model):
    '''
    Adam optimizer driven by a CustomSchedule built for the given d_model.

    d_model: model width forwarded to the learning-rate schedule
    '''
    schedule = CustomSchedule(d_model)
    adam_settings = dict(beta_1=0.9, beta_2=0.98, epsilon=1e-9)

    return tf.keras.optimizers.Adam(schedule, **adam_settings)