# 1) Importations

In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# 2) Preprocessing input et output embedding

In [2]:
input_embedding = [["Salut", "comment", "ca", "va", "?"]] #1 batch de 1 sequence
output_embedding = [["<START>", "Hi", "how", "are", "you", "?"]]

NB_ENCODER = 6
NB_DECODER = 6

In [3]:
def get_vocabulary(sequences):
    token_to_info = {}
    for sequence in sequences:
        for word in sequence:
            if word not in token_to_info:                #Pas de doublons dans les tokens
                token_to_info[word] = len(token_to_info) #On donne un ID au token, qui sera la longueur de la liste de token
    return token_to_info

In [4]:
input_voc = get_vocabulary(input_embedding)
output_voc = get_vocabulary(output_embedding)
print(input_voc)
print(output_voc)

{'Salut': 0, 'comment': 1, 'ca': 2, 'va': 3, '?': 4}
{'<START>': 0, 'Hi': 1, 'how': 2, 'are': 3, 'you': 4, '?': 5}


### --> On ajoute les tokens spécifiques

In [5]:
input_voc["<START>"] = len(input_voc)
input_voc["<END>"] = len(input_voc)
input_voc["<PAD >"] = len(input_voc) #Le padding est utile pr remplir les sequences n'étant pas de même taille que d'autres

output_voc["<END>"] = len(input_voc) #Attention à ne pas ajouter <START> ici si déjà fait dans le output_embedding !!!
output_voc["<PAD >"] = len(input_voc)

print(input_voc)
print(output_voc)

NB_TOKEN = len(input_embedding[0])
SEQ_LEN = len(input_embedding[0])

NB_TOKEN_OUT = len(output_embedding[0])
SEQ_LEN_OUT = len(output_embedding[0])

{'Salut': 0, 'comment': 1, 'ca': 2, 'va': 3, '?': 4, '<START>': 5, '<END>': 6, '<PAD >': 7}
{'<START>': 0, 'Hi': 1, 'how': 2, 'are': 3, 'you': 4, '?': 5, '<END>': 8, '<PAD >': 8}


### --> Transformation des mots en int pour notre modèle

In [6]:
def sequences_to_int(sequences, voc):
    for sequence in sequences:
        for index, word in enumerate(sequence):
            sequence[index] = voc[word]
    return np.array(sequences)

In [7]:
input_seq = sequences_to_int(input_embedding, input_voc)
output_seq = sequences_to_int(output_embedding, output_voc)
print(input_seq)
print(output_seq)

[[0 1 2 3 4]]
[[0 1 2 3 4 5]]


# 3) Layers

### --> Input embedding layer

In [8]:
class Embedding(tf.keras.layers.Layer):
    def __init__(self, nb_token, **kwargs):
        self.nb_token = nb_token
        super(**kwargs).__init__()
        
    def build(self, input_shape):
        self.word_embedding = tf.keras.layers.Embedding(self.nb_token, 256)
        super().build(input_shape)
        
    def call(self, x):
        embed = self.word_embedding(x)
        return embed

### --> Scaled Dot-Product Attention

In [9]:
class ScaledDotProductAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super(**kwargs).__init__()
        
    def build(self, input_shape):
        self.query_layer = tf.keras.layers.Dense(256)
        self.key_layer   = tf.keras.layers.Dense(256)
        self.value_layer = tf.keras.layers.Dense(256)
        super().build(input_shape)
        
    def call(self, x):
        Q = self.query_layer(x)
        K = self.key_layer(x)
        V = self.value_layer(x)
        QK = tf.matmul(Q, K, transpose_b=True)
        QK = QK / tf.math.sqrt(256.0)           #Normalise  les valeurs
        softmax_QK = tf.nn.softmax(QK, axis=-1) #Attention pour chaque mots de la sequence
        attention = tf.matmul(softmax_QK, V)    #Applique notre attention aux V
        return attention

In [10]:
def test_ScaledDotProductAttention():
    layer_input = tf.keras.Input(shape=(SEQ_LEN)) #Taille sequence : 5 (On peut gerer les autres tailles avec de PAD)
    embedding = Embedding(nb_token=NB_TOKEN)(layer_input)
    attention = ScaledDotProductAttention()(embedding)
    model = tf.keras.Model(layer_input, attention)
    model.summary()
    return model
    
model_test = test_ScaledDotProductAttention()
out = model_test(input_seq)
print(out.shape)

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 5)]               0         
_________________________________________________________________
embedding (Embedding)        (None, 5, 256)            1280      
_________________________________________________________________
scaled_dot_product_attention (None, 5, 256)            197376    
Total params: 198,656
Trainable params: 198,656
Non-trainable params: 0
_________________________________________________________________
(1, 5, 256)


### --> Multi Head Attention / Masket Multi Head Attention

In [11]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, dim=256, nb_head=8, **kwargs): #dim doit être divisible par nb_head
        self.dim = dim
        self.head_dim = dim//nb_head
        self.nb_head = nb_head
        super(**kwargs).__init__()
        
    def build(self, input_shape):
        self.query_layer = tf.keras.layers.Dense(256)
        self.key_layer   = tf.keras.layers.Dense(256)
        self.value_layer = tf.keras.layers.Dense(256)
        self.output_layer = tf.keras.layers.Dense(256)
        super().build(input_shape)
        
    def mask_softmax(self, x, mask):
        x_exp = tf.math.exp(x)
        x_exp_masked = x_exp * mask
        x_exp_sum = tf.reduce_sum(x_exp_masked, axis=-1)
        x_exp_sum = tf.expand_dims(x_exp_sum, axis=-1)
        softmax = x_exp_masked / x_exp_sum
        return softmax
        
    def call(self, x, mask=None):
        in_Q, in_K, in_V = x
        
        Q = self.query_layer(in_Q)
        K = self.key_layer(in_K)
        V = self.value_layer(in_V)
        
        batch_size = tf.shape(Q)[0]
        Q_seq_len = tf.shape(Q)[1]
        K_seq_len = tf.shape(K)[1]
        V_seq_len = tf.shape(V)[1]
        
        Q = tf.reshape(Q, [batch_size, Q_seq_len, self.nb_head, self.head_dim])
        K = tf.reshape(K, [batch_size, K_seq_len, self.nb_head, self.head_dim])
        V = tf.reshape(V, [batch_size, V_seq_len, self.nb_head, self.head_dim])
          
        Q = tf.transpose(Q, [0, 2, 1, 3])
        K = tf.transpose(K, [0, 2, 1, 3])
        V = tf.transpose(V, [0, 2, 1, 3])

        Q = tf.reshape(Q, [batch_size * self.nb_head, Q_seq_len, self.head_dim])
        K = tf.reshape(K, [batch_size * self.nb_head, K_seq_len, self.head_dim])
        V = tf.reshape(V, [batch_size * self.nb_head, V_seq_len, self.head_dim])
        
        #Scaled dot product attention
        QK = tf.matmul(Q, K, transpose_b=True)
        QK = QK / tf.math.sqrt(float(self.dim)) #Normalise  les valeurs
        
        #Mask
        #Ici on veut éviter de donner les réponses au réseau dans le decoder
        #On veut donc que l'attention ne porte que sur les éléments précédents ainsi que de la self-attention
        #[1, 0, 0]
        #[1, 1, 0]
        #[1, 1, 1]
        #Voila ce qu'on veut obtenir
        if mask is not None:
            QK = QK * mask
            softmax_QK = self.mask_softmax(QK, mask) #Softmax custom pour éviter les problèmes liés aux valuers à 0 à cause du mask
        else:
            softmax_QK = tf.nn.softmax(QK, axis=-1)  #Attention pour chaque mots de la sequence
        
        attention = tf.matmul(softmax_QK, V)         #Applique notre attention aux V
        
        #Concatenation des scaled dot product attention
        attention = tf.reshape(attention, [batch_size, self.nb_head, Q_seq_len, self.head_dim])
        attention = tf.transpose(attention, [0, 2, 1, 3])
        attention = tf.reshape(attention, [batch_size, Q_seq_len, self.nb_head * self.head_dim])
        
        out_attention = self.output_layer(attention)
        
        return out_attention

In [12]:
def test_MultiHeadAttention():
    layer_input = tf.keras.Input(shape=(SEQ_LEN_OUT))
    embedding = Embedding(nb_token=NB_TOKEN_OUT)(layer_input)
    attention = MultiHeadAttention()((embedding, embedding, embedding))
    model = tf.keras.Model(layer_input, attention)
    model.summary()
    return model
    
model_test = test_MultiHeadAttention()
out = model_test(output_seq)
print(out.shape)

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 6)]          0                                            
__________________________________________________________________________________________________
embedding_1 (Embedding)         (None, 6, 256)       1536        input_2[0][0]                    
__________________________________________________________________________________________________
multi_head_attention (MultiHead (None, None, 256)    263168      embedding_1[0][0]                
                                                                 embedding_1[0][0]                
                                                                 embedding_1[0][0]                
Total params: 264,704
Trainable params: 264,704
Non-trainable params: 0
____________________

In [13]:
def test_MaskedMultiHeadAttention():
    layer_input = tf.keras.Input(shape=(SEQ_LEN_OUT)) #Taille sequence : 6 (On peut gerer les autres tailles avec de PAD)
    embedding = Embedding(nb_token=NB_TOKEN_OUT)(layer_input)
    
    #Mask
    mask = tf.sequence_mask(tf.range(SEQ_LEN_OUT) + 1, SEQ_LEN_OUT)
    mask = tf.cast(mask, tf.float32)
    mask = tf.expand_dims(mask, axis=0)
        
    attention = MultiHeadAttention()((embedding, embedding, embedding), mask=None)
    model = tf.keras.Model(layer_input, attention)
    model.summary()
    
    return model
    
model_test = test_MaskedMultiHeadAttention()
out = model_test(output_seq)
print(out.shape)

Model: "model_2"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            [(None, 6)]          0                                            
__________________________________________________________________________________________________
embedding_2 (Embedding)         (None, 6, 256)       1536        input_3[0][0]                    
__________________________________________________________________________________________________
multi_head_attention_1 (MultiHe (None, None, 256)    263168      embedding_2[0][0]                
                                                                 embedding_2[0][0]                
                                                                 embedding_2[0][0]                
Total params: 264,704
Trainable params: 264,704
Non-trainable params: 0
____________________

### --> Encoder layer

In [14]:
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super(**kwargs).__init__()
        
    def build(self, input_shape):
        self.multi_head_attention = MultiHeadAttention()
        self.norm = tf.keras.layers.LayerNormalization()
        self.feed_forward = tf.keras.layers.Dense(256)
        super().build(input_shape)
        
    def call(self, x):
        attention = self.multi_head_attention((x, x, x)) #Multi Head Attention, x x x because self attention
        post_attention = self.norm(attention + x)        #Add & Norm
        feed_forward = self.feed_forward(post_attention) #Feed Forward
        enc_output = self.norm(x + post_attention)       #2nd Add & Norm
        return enc_output

In [15]:
def test_EncoderLayer():
    layer_input = tf.keras.Input(shape=(SEQ_LEN)) #Taille sequence : 5 (On peut gerer les autres tailles avec de PAD)
    input_embedding = Embedding(nb_token=NB_TOKEN)(layer_input)
    encoder_output = EncoderLayer()(input_embedding)
    model = tf.keras.Model(layer_input, encoder_output)
    model.summary()
    return model
    
model_test = test_EncoderLayer()

Model: "model_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_4 (InputLayer)         [(None, 5)]               0         
_________________________________________________________________
embedding_3 (Embedding)      (None, 5, 256)            1280      
_________________________________________________________________
encoder_layer (EncoderLayer) (None, 5, 256)            329472    
Total params: 330,752
Trainable params: 330,752
Non-trainable params: 0
_________________________________________________________________


### --> Decoder Layer

In [16]:
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super(**kwargs).__init__()
        
    def build(self, input_shape):
        self.multi_head_self_attention = MultiHeadAttention()
        self.norm = tf.keras.layers.LayerNormalization()
        self.output_layer = tf.keras.layers.Dense(256)
        super().build(input_shape)
        
    def call(self, x):
        encoder_output, output_embedding, mask = x
        self_attention = self.multi_head_self_attention((output_embedding, output_embedding, output_embedding), mask=mask) #Masked ulti Head Self Attention
        post_self_attention = self.norm(output_embedding + self_attention)                                                 #Skip
        encoder_attention = self.multi_head_self_attention((post_self_attention, encoder_output, encoder_output))          #Multi Head Attention
        post_encoder_attention = self.norm(encoder_attention + post_self_attention)                                        #Skip
        output = self.output_layer(post_self_attention)                                                                    #Output
        decoder_output = self.norm(output + post_encoder_attention)                                                        #Skip
        return decoder_output

In [17]:
def test_DecoderLayer():
    layer_input = tf.keras.Input(shape=(SEQ_LEN)) #Taille sequence : 5 (On peut gerer les autres tailles avec de PAD)
    input_embedding = Embedding(nb_token=NB_TOKEN)(layer_input)
    encoder_output = EncoderLayer()(input_embedding)
    model = tf.keras.Model(layer_input, encoder_output)
    model.summary()
    return model
    
model_test = test_EncoderLayer()

Model: "model_4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_5 (InputLayer)         [(None, 5)]               0         
_________________________________________________________________
embedding_4 (Embedding)      (None, 5, 256)            1280      
_________________________________________________________________
encoder_layer_1 (EncoderLaye (None, 5, 256)            329472    
Total params: 330,752
Trainable params: 330,752
Non-trainable params: 0
_________________________________________________________________


# 4) Encoder

In [18]:
class Encoder(tf.keras.layers.Layer):
    def __init__(self, nb_encoder, **kwargs):
        self.nb_encoder = nb_encoder
        super(**kwargs).__init__()
        
    def build(self, input_shape):
        self.encoder_layers = []
        for n in range(self.nb_encoder):
            self.encoder_layers.append(EncoderLayer())
        super().build(input_shape)
        
    def call(self, x):
        for encoder_layer in self.encoder_layers:
            x = encoder_layer(x)
        return x

In [19]:
def test_Encoder():
    layer_input = tf.keras.Input(shape=(SEQ_LEN)) #Taille sequence : 5 (On peut gerer les autres tailles avec de PAD)
    input_embedding = Embedding(nb_token=NB_TOKEN)(layer_input)
    encoder_output = Encoder(NB_ENCODER)(input_embedding)
    model = tf.keras.Model(layer_input, encoder_output)
    model.summary()
    return model
    
model_test = test_Encoder()
out = model_test(input_seq)
print(out.shape)

Model: "model_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_6 (InputLayer)         [(None, 5)]               0         
_________________________________________________________________
embedding_5 (Embedding)      (None, 5, 256)            1280      
_________________________________________________________________
encoder (Encoder)            (None, 5, 256)            1976832   
Total params: 1,978,112
Trainable params: 1,978,112
Non-trainable params: 0
_________________________________________________________________
(1, 5, 256)


# 5) Decoder

In [20]:
class Decoder(tf.keras.layers.Layer):
    def __init__(self, nb_decoder, **kwargs):
        self.nb_encoder = nb_decoder
        super(**kwargs).__init__()
        
    def build(self, input_shape):
        self.decoder_layers = []
        for n in range(self.nb_encoder):
            self.decoder_layers.append(DecoderLayer())
        super().build(input_shape)
        
    def call(self, x):
        encoder_output, output_embedding, mask = x
        decoder_output = output_embedding
        for decoder_layer in self.decoder_layers:
            decoder_output = decoder_layer((encoder_output, decoder_output, mask))
        return decoder_output

In [21]:
def build_transformer():
    input_token  = tf.keras.Input(shape=(SEQ_LEN)) #Taille sequence : 5 (On peut gerer les autres tailles avec de PAD)
    output_token = tf.keras.Input(shape=(SEQ_LEN_OUT)) #Taille sequence : 6 

    #Positional encoding
    input_pos  = Embedding(nb_token=NB_TOKEN)(tf.range(SEQ_LEN))
    output_pos = Embedding(nb_token=NB_TOKEN_OUT)(tf.range(SEQ_LEN_OUT))
    
    #Embeddings
    input_embedding  = Embedding(nb_token=NB_TOKEN)(input_token)
    output_embedding = Embedding(nb_token=NB_TOKEN_OUT)(output_token)
    
    #Add positional encoding
    input_embedding  = input_embedding  + input_pos
    output_embedding = output_embedding + output_pos
    
    #Encoder
    encoder_output = Encoder(nb_encoder=NB_ENCODER)(input_embedding)
    
    #Mask
    mask = tf.sequence_mask(tf.range(SEQ_LEN_OUT) + 1, SEQ_LEN_OUT)
    mask = tf.cast(mask, tf.float32)
    mask = tf.expand_dims(mask, axis=0)
    
    #Decoder
    decoder_output = Decoder(NB_DECODER)((encoder_output, output_embedding, mask))
    
    #Predictions
    output_prediction = tf.keras.layers.Dense(len(output_voc))(decoder_output)
    predictions = tf.nn.softmax(output_prediction, axis=-1)
    
    model = tf.keras.Model([input_token, output_token], predictions)
    model.summary()
    return model
    
transformer = build_transformer()
output = transformer((input_seq, output_seq))
print(output.shape)
print(output)

Model: "model_6"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_7 (InputLayer)            [(None, 5)]          0                                            
__________________________________________________________________________________________________
embedding_10 (Embedding)        (None, 5, 256)       1280        input_7[0][0]                    
__________________________________________________________________________________________________
input_8 (InputLayer)            [(None, 6)]          0                                            
__________________________________________________________________________________________________
tf.__operators__.add (TFOpLambd (None, 5, 256)       0           embedding_10[0][0]               
____________________________________________________________________________________________