# In [2]:
import numpy as np
import tensorflow_datasets as tfds
import tensorflow_text as text
import tensorflow as tf
from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert


#_____________________________________

def angle(pos,i,dimension):
    """Return the positional-encoding angle ``pos / 10000**(2*(i//2)/dimension)``.

    Args:
        pos: Position index (scalar or NumPy array).
        i: Embedding-dimension index (scalar or NumPy array). Because of the
            floor division, indices ``2k`` and ``2k+1`` share one exponent.
        dimension: Size of the embedding dimension used to normalise the
            exponent.

    Returns:
        ``pos`` divided by the per-dimension wavelength, broadcast over
        ``pos`` and ``i``.
    """
    pair_exponent = (2 * (i//2)) / np.float32(dimension)
    wavelength = np.power(10000, pair_exponent)
    return pos / wavelength
   
def positional(pos,dimension):
    """Build the sinusoidal positional-encoding table.

    Args:
        pos: Number of positions to encode (rows of the table).
        dimension: Size of the embedding dimension (columns of the table).

    Returns:
        A ``tf.float32`` tensor of shape ``(1, pos, dimension)``. Even
        columns hold ``sin(angle)`` and odd columns hold ``cos(angle)``.
    """
    posmatrix = np.arange(pos)[:, np.newaxis]          # (pos, 1)
    dimmatrix = np.arange(dimension)[np.newaxis, :]    # (1, dimension)
    angles = angle(posmatrix, dimmatrix, dimension)    # (pos, dimension)

    # Vectorised replacement for the per-element Python loop: strided slices
    # pick the even / odd columns and NumPy applies sin / cos in one call each.
    angles[:, 0::2] = np.sin(angles[:, 0::2])
    angles[:, 1::2] = np.cos(angles[:, 1::2])

    # Leading axis of size 1 so the table broadcasts over the batch dimension.
    angles = angles[np.newaxis, ...]
    return tf.cast(angles, dtype=tf.float32)
# The positional encoding is now fully defined.

def paddingmask(data):
    """Mark padding tokens (id 0) with 1.0 and every other token with 0.0.

    Args:
        data: Token ids; expected shape is (batch_size, sequence).

    Returns:
        A float32 mask with two singleton axes inserted after the first
        axis, i.e. (batch_size, 1, 1, sequence) for a 2-D input, so it can be
        broadcast against attention logits.
    """
    # equal() yields True where the token id is 0; the cast turns
    # True into 1.0 and False into 0.0.
    is_padding = tf.math.equal(data, 0)
    mask = tf.cast(is_padding, tf.float32)

    return mask[:, tf.newaxis, tf.newaxis, :]


def create_padding_mask(seq):
    """Return a float32 mask that is 1.0 wherever ``seq`` equals 0.

    Args:
        seq: Token ids; expected shape is (batch_size, seq_len).

    Returns:
        The mask with two extra singleton axes so it can be added to the
        attention logits: (batch_size, 1, 1, seq_len) for a 2-D input.
    """
    pad_positions = tf.cast(tf.equal(seq, 0), tf.float32)

    # Insert the broadcast axes between the batch axis and the sequence axis.
    return pad_positions[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)

def lookahead(dimension): #Dimensions has shape (dim,dim)
    """Build a look-ahead mask: 1.0 strictly above the diagonal, 0.0 elsewhere.

    Row ``r`` therefore has 0.0 in columns ``0..r`` and 1.0 in the columns
    after it; each successive row gains one more leading 0.0.

    Args:
        dimension: Shape tuple of the mask, normally
            ``(sequence, sequence)`` for the maximum sequence length.

    Returns:
        A tensor of the requested shape. The dtype is whatever
        ``np.ones`` produces by default (float64), unchanged from the
        previous implementation; cast before combining it with float32
        logits.
    """
    # np.triu with k=1 keeps only the entries above the main diagonal, which
    # is exactly what the former row-by-row zeroing loop produced, and it also
    # works when the shape has more rows than columns.
    res = np.triu(np.ones(dimension), k=1)

    return tf.convert_to_tensor(res)



# This is the look-ahead mask from the TensorFlow Transformer tutorial; the concept is the same as `lookahead` above, and it is the version used below.
def create_look_ahead_mask(size):
    """Return a (size, size) mask with 1.0 above the diagonal and 0.0 elsewhere.

    Args:
        size: Sequence length.

    Returns:
        ``1 - lower_triangle(ones)``, so position ``r`` is flagged (1.0) for
        every column after ``r``.
    """
    all_ones = tf.ones((size, size))
    # band_part(x, -1, 0) keeps the full lower triangle including the diagonal.
    lower_triangle = tf.linalg.band_part(all_ones, -1, 0)
    return 1 - lower_triangle  # (seq_len, seq_len)

def dotproductattention(q,k,v,mask):
    """Compute scaled dot-product attention.

    Args:
        q: Query tensor; last axis is the key depth.
        k: Key tensor; its last axis must match that of ``q``.
        v: Value tensor; its second-to-last axis must match that of ``k``.
        mask: Optional tensor broadcastable to the attention logits. Entries
            equal to 1.0 are pushed to a large negative logit so they receive
            (almost) zero weight after the softmax. ``None`` disables masking.

    Returns:
        A tuple ``(output, attention_weights)`` where ``attention_weights`` is
        the softmax over the last axis of the scaled logits and ``output`` is
        ``attention_weights @ v``.
    """
    # Raw similarity scores: q @ k^T.
    logits = tf.matmul(q, k, transpose_b=True)

    # Scale by sqrt(d_k). tf.cast keeps the depth as a tensor, so this also
    # works when the function is traced into a graph (tf.function / Keras
    # fit), where Python's float() cannot convert a symbolic tensor.
    kdim = tf.cast(tf.shape(k)[-1], logits.dtype)
    logits = logits / tf.math.sqrt(kdim)

    # Masked positions get a very negative logit so softmax drives them to ~0.
    if mask is not None:
        logits += mask * -1e9

    # Normalise over the key axis (the last one).
    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights
    
    
class MultiAttention(tf.keras.layers.Layer):
    """Multi-head attention built on top of ``dotproductattention``.

    The model dimension is projected for q, k and v, split into
    ``layersnum`` heads of size ``modeldim // layersnum``, attended per head,
    then concatenated and passed through a final dense projection.
    """

    def __init__(self,modeldim,layersnum):
        """Create the projections.

        Args:
            modeldim: Width of the model (output size of every projection).
            layersnum: Number of attention heads; must divide ``modeldim``.
        """
        # Initialise the Keras base layer before attaching sub-layers.
        super().__init__()

        # Bookkeeping values.
        self.modeldim = modeldim
        self.layersnum = layersnum
        assert modeldim % self.layersnum == 0
        self.depth = modeldim // self.layersnum   # per-head width

        # Input projections for the queries, keys and values.
        self.wq = tf.keras.layers.Dense(modeldim)
        self.wk = tf.keras.layers.Dense(modeldim)
        self.wv = tf.keras.layers.Dense(modeldim)

        # Output projection applied after the heads are concatenated.
        self.dense = tf.keras.layers.Dense(modeldim)

    # The head splitting below follows the TensorFlow Transformer tutorial:
    # the last axis is split into (layersnum, depth).
    def split_heads(self, x, batch_size):
        """Split the last axis into heads and move the head axis forward.

        Returns a tensor of shape (batch_size, layersnum, seq_len, depth).
        """
        per_head = tf.reshape(x, (batch_size, -1, self.layersnum, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Run multi-head attention.

        Args:
            v: Values, expected (batch_size, seq_len_v, d_model).
            k: Keys, expected (batch_size, seq_len_k, d_model).
            q: Queries, expected (batch_size, seq_len_q, d_model).
            mask: Mask forwarded to ``dotproductattention`` (may be ``None``).

        Returns:
            ``(output, attention_weights)``; output is
            (batch_size, seq_len_q, d_model) and the weights are
            (batch_size, layersnum, seq_len_q, seq_len_k).
        """
        batch_size = tf.shape(q)[0]

        # Project, then split every projection into heads:
        # (batch_size, seq_len, d_model) -> (batch_size, layersnum, seq_len, depth)
        q = self.split_heads(self.wq(q), batch_size)
        k = self.split_heads(self.wk(k), batch_size)
        v = self.split_heads(self.wv(v), batch_size)

        # heads_out: (batch_size, layersnum, seq_len_q, depth)
        # attention_weights: (batch_size, layersnum, seq_len_q, seq_len_k)
        heads_out, attention_weights = dotproductattention(q, k, v, mask)

        # Bring the head axis back next to depth and merge the two.
        heads_out = tf.transpose(heads_out, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, layersnum, depth)
        merged = tf.reshape(heads_out, (batch_size, -1, self.modeldim))  # (batch_size, seq_len_q, d_model)

        return self.dense(merged), attention_weights
    
    
def point_wise_feed_forward_network(d_model, dff):
    """Return the two-layer position-wise feed-forward block.

    Args:
        d_model: Output width of the block.
        dff: Width of the hidden ReLU layer.

    Returns:
        A ``tf.keras.Sequential`` of ``Dense(dff, relu)`` followed by
        ``Dense(d_model)``.
    """
    hidden = tf.keras.layers.Dense(dff, activation='relu')  # (batch_size, seq_len, dff)
    projection = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([hidden, projection])


#here we have the definition of the layers
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and a feed-forward network, each
    followed by dropout, a residual connection and layer normalisation."""

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """
        Args:
            d_model: Model width.
            num_heads: Number of attention heads.
            dff: Hidden width of the feed-forward network.
            rate: Dropout rate.
        """
        super().__init__()

        # Sub-layers: attention and feed-forward.
        self.mha = MultiAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        # One normalisation and one dropout per sub-layer.
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        """Apply the block to ``x`` (expected (batch_size, input_seq_len, d_model)).

        Args:
            x: Input sequence representation.
            training: Forwarded to the dropout layers.
            mask: Attention mask forwarded to the attention layer.

        Returns:
            A tensor with the same shape as ``x``.
        """
        # Self-attention: the same tensor serves as values, keys and queries.
        attended, _ = self.mha(x, x, x, mask)
        attended = self.dropout1(attended, training=training)
        normed_attention = self.layernorm1(x + attended)  # residual + norm

        # Position-wise feed-forward with its own residual + norm.
        transformed = self.ffn(normed_attention)
        transformed = self.dropout2(transformed, training=training)
        return self.layernorm2(normed_attention + transformed)
    
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: masked self-attention, attention over the encoder
    output, and a feed-forward network, each followed by dropout, a residual
    connection and layer normalisation."""

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """
        Args:
            d_model: Model width.
            num_heads: Number of attention heads.
            dff: Hidden width of the feed-forward network.
            rate: Dropout rate.
        """
        super().__init__()

        # mha1: self-attention on the target; mha2: attention over the encoder output.
        self.mha1 = MultiAttention(d_model, num_heads)
        self.mha2 = MultiAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
               look_ahead_mask, padding_mask):
        """Apply the block.

        Args:
            x: Target representation, expected (batch_size, target_seq_len, d_model).
            enc_output: Encoder output, expected (batch_size, input_seq_len, d_model).
            training: Forwarded to the dropout layers.
            look_ahead_mask: Mask for the self-attention block.
            padding_mask: Mask for the encoder-decoder attention block.

        Returns:
            ``(output, self_attention_weights, cross_attention_weights)``.
        """
        # Block 1: self-attention over the target sequence.
        self_attn, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        self_attn = self.dropout1(self_attn, training=training)
        after_self = self.layernorm1(self_attn + x)

        # Block 2: values and keys come from the encoder, queries from block 1.
        cross_attn, attn_weights_block2 = self.mha2(
            enc_output, enc_output, after_self, padding_mask)
        cross_attn = self.dropout2(cross_attn, training=training)
        after_cross = self.layernorm2(cross_attn + after_self)

        # Block 3: position-wise feed-forward.
        ffn_output = self.ffn(after_cross)
        ffn_output = self.dropout3(ffn_output, training=training)
        result = self.layernorm3(ffn_output + after_cross)

        return result, attn_weights_block1, attn_weights_block2


#AND finally we define the encoder and the decoder
class Encoder(tf.keras.layers.Layer):
    """Token embedding plus positional encoding followed by a stack of
    ``EncoderLayer`` blocks."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                   maximum_position_encoding, rate=0.1):
        """
        Args:
            num_layers: Number of stacked encoder layers.
            d_model: Model width (embedding size).
            num_heads: Attention heads per layer.
            dff: Hidden width of each feed-forward network.
            input_vocab_size: Size of the input vocabulary.
            maximum_position_encoding: Number of positions in the
                pre-computed positional-encoding table.
            rate: Dropout rate.
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        # Pre-computed table of shape (1, maximum_position_encoding, d_model).
        self.pos_encoding = positional(maximum_position_encoding, self.d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        """Encode a batch of token ids.

        Args:
            x: Token ids, expected (batch_size, input_seq_len).
            training: Forwarded to dropout and to every encoder layer.
            mask: Attention mask forwarded to every encoder layer.

        Returns:
            Tensor of shape (batch_size, input_seq_len, d_model).
        """
        seq_len = tf.shape(x)[1]

        # Embed the tokens, scale by sqrt(d_model), then add the positional
        # encoding for the first seq_len positions.
        embedded = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        embedded = embedded * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        embedded = embedded + self.pos_encoding[:, :seq_len, :]

        hidden = self.dropout(embedded, training=training)

        for layer in self.enc_layers:
            hidden = layer(hidden, training, mask)

        return hidden  # (batch_size, input_seq_len, d_model)
class Decoder(tf.keras.layers.Layer):
    """Token embedding plus positional encoding followed by a stack of
    ``DecoderLayer`` blocks; also collects every layer's attention weights."""

    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
               maximum_position_encoding, rate=0.1):
        """
        Args:
            num_layers: Number of stacked decoder layers.
            d_model: Model width (embedding size).
            num_heads: Attention heads per layer.
            dff: Hidden width of each feed-forward network.
            target_vocab_size: Size of the target vocabulary.
            maximum_position_encoding: Number of positions in the
                pre-computed positional-encoding table.
            rate: Dropout rate.
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
               look_ahead_mask, padding_mask):
        """Decode a batch of target token ids.

        Args:
            x: Target token ids, expected (batch_size, target_seq_len).
            enc_output: Encoder output attended to by every layer.
            training: Forwarded to dropout and to every decoder layer.
            look_ahead_mask: Mask for each layer's self-attention block.
            padding_mask: Mask for each layer's encoder-decoder attention block.

        Returns:
            ``(output, attention_weights)`` where ``output`` is
            (batch_size, target_seq_len, d_model) and ``attention_weights``
            maps ``decoder_layer{n}_block{1,2}`` to that block's weights.
        """
        seq_len = tf.shape(x)[1]

        # Embed, scale by sqrt(d_model) and add the positional encoding.
        hidden = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        hidden = hidden * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        hidden = hidden + self.pos_encoding[:, :seq_len, :]

        hidden = self.dropout(hidden, training=training)

        attention_weights = {}
        for i, layer in enumerate(self.dec_layers):
            hidden, block1, block2 = layer(hidden, enc_output, training,
                                           look_ahead_mask, padding_mask)

            # Keys are 1-based: decoder_layer1_block1, decoder_layer1_block2, ...
            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2

        # hidden.shape == (batch_size, target_seq_len, d_model)
        return hidden, attention_weights
    
    
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer with a final dense projection onto the
    target vocabulary. Masks are derived from the inputs inside ``call``."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                   target_vocab_size, pe_input, pe_target, rate=0.1):
        """
        Args:
            num_layers: Number of layers in both the encoder and the decoder.
            d_model: Model width.
            num_heads: Attention heads per layer.
            dff: Hidden width of each feed-forward network.
            input_vocab_size: Size of the input vocabulary.
            target_vocab_size: Size of the target vocabulary (also the width
                of the final projection).
            pe_input: Positional-encoding table length for the encoder.
            pe_target: Positional-encoding table length for the decoder.
            rate: Dropout rate.
        """
        super().__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, dff,
                               input_vocab_size, pe_input, rate)

        self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                               target_vocab_size, pe_target, rate)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs, training):
        """Run the full model.

        Args:
            inputs: A pair ``(inp, tar)`` of input and target token ids;
                Keras models prefer all inputs in the first argument.
            training: Forwarded to the encoder and the decoder.

        Returns:
            ``(logits, attention_weights)``; logits are expected to be
            (batch_size, tar_seq_len, target_vocab_size).
        """
        inp, tar = inputs

        enc_padding_mask, look_ahead_mask, dec_padding_mask = self.create_masks(inp, tar)

        # (batch_size, inp_seq_len, d_model)
        enc_output = self.encoder(inp, training, enc_padding_mask)

        # dec_output.shape == (batch_size, tar_seq_len, d_model)
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, dec_padding_mask)

        # Project every target position onto the vocabulary.
        logits = self.final_layer(dec_output)

        return logits, attention_weights

    def create_masks(self, inp, tar):
        """Build the three masks used by the encoder and decoder.

        Returns:
            ``(enc_padding_mask, look_ahead_mask, dec_padding_mask)``.
        """
        # Padding mask for the encoder's self-attention.
        enc_padding_mask = paddingmask(inp)

        # Padding mask for the decoder's second attention block, which
        # attends over the encoder outputs (hence it is built from inp).
        dec_padding_mask = paddingmask(inp)

        # Mask for the decoder's first attention block: a position is hidden
        # if it is either a future token or a padding token in the target.
        future_mask = create_look_ahead_mask(tf.shape(tar)[1])
        target_padding_mask = create_padding_mask(tar)
        look_ahead_mask = tf.maximum(target_padding_mask, future_mask)

        return enc_padding_mask, look_ahead_mask, dec_padding_mask