# Implémentation du réseau proposé dans l'article "Attention is all you need"

In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import LayerNormalization, Conv1D

## Calculer l'attention mise à l'échelle


\begin{equation*}
\mathbf A(\mathbf X) = \mathbf V(\mathbf X).Softmax(\frac{\mathbf K(\mathbf X)^T\mathbf Q(\mathbf X)}{\sqrt{d_q}})
\end{equation*}

On pourra utiliser les fonctions de tensorflow `matmul` et `softmax`. On retournera l'attention ainsi que les poids calculés par le softmax

In [None]:
def AttentionEchelle(Q, K, V):
    """
    Entrée :
        Q -- requetes
        K -- clés
        V -- valeurs

    Sortie :
        A(X), Poids d'attention
    """

    return A, poids_attention

On donne une classe python pour coder un mécanisme d'attention multiple. H est le nombre de têtes, dim_e est la taille des représentations (embeddings), dq est la taille de Q et K, et dv la taille de V

In [None]:
class Multihead_Attention(tf.keras.layers.Layer):
    def __init__(self, H, dim_e, dq, dv):

        super(Multihead_Attention, self).__init__()

        #On initialisation des matrices de poids
        initializer = tf.keras.initializers.GlorotUniform(seed=42)
        self.WQ = tf.Variable(initializer(shape=(H, dim_e, dq)), trainable=True)
        self.WK = tf.Variable(initializer(shape=(H, dim_e, dq)), trainable=True)
        self.WV = tf.Variable(initializer(shape=(H, dim_e, dv)), trainable=True)
        self.WO = tf.Variable(initializer(shape=(H*dv,dim_e)), trainable=True)


    # Calcul des poids d'attention : on utilise la fonction précédente
    def call(self, Q, K, V):

        Qh= np.dot(Q, self.WQ)
        Kh= np.dot(K, self.WK)
        Vh= np.dot(V, self.WV)

        #Transposition
        Qh=tf.transpose(Qh, [0,2,1,3])
        Kh=tf.transpose(Kh, [0,2,1,3])
        Vh=tf.transpose(Vh, [0,2,1,3])
        Ah,_=AttentionEchelle(Qh, Kh, Vh)
        A = tf.reshape(Ah,(Ah.shape[0],Ah.shape[2],Ah.shape[1]*Ah.shape[3]))
        A= np.dot(A, self.WO)

        return A

On donne une classe implémentant un réseau complètement connecté. dim_e est la taille de l'embedding, dim_h la taille de la couche cachée

In [None]:
class MLP(tf.keras.layers.Layer):
    def __init__(self, dim_e, dim_h):

        super(MLP, self).__init__()
        self.layer1 = Conv1D(filters=dim_h, kernel_size=1,activation="relu")
        self.layer2 = Conv1D(filters=dim_e, kernel_size=1)


    def call(self, x):
        x=self.layer1(x)
        fnn_layer_out=self.layer2(x)


        return fnn_layer_out

On donne une classe permettant de précalculer une matrice contenant les encodages de position.

In [None]:
def positional_encoding(positions, d):

    #Vecteur colonne contenant l'ensemble des positions de 0 à positions
    pos=np.arange(positions)[:, np.newaxis]
    #Vecteur ligne contenant les entiers de 0 à d-1 (dimensions)
    k= np.arange(d)[np.newaxis, :]
    i = k//2
    #Matrice des angles
    angles = pos/(10000**(2*i/d))

    angles[:, 0::2] = np.sin(angles[:, 0::2])
    angles[:, 1::2] = np.cos(angles[:, 1::2])

    #Ajout d'un axe pour le traitement par batch
    pos_encoding = angles[np.newaxis, ...]
    return tf.cast(pos_encoding, dtype=tf.float32)

## Construire l'encodeur comme spécifié dans la figure du cours.
On donne le squelette de la classe à compléter. Ici :
- H :nombre de têtes
- dim_e: dimension de la représentation
- dq : Taille de Q et K
- dv : taille de V
- dim_h : dimension de la couche cachée du MLP
- eta : paramètre de régularisation de la couche de normalisation

In [None]:
class EncoderLayer(tf.keras.layers.Layer):

    def __init__(self, H, dim_e, dq, dv, dim_h, layernorm_eta=1e-5):

        super(EncoderLayer, self).__init__()
        self.mha = Multihead_Attention(H, dim_e, dq, dv)
        self.mlp = MLP(dim_e, dim_h)
        self.layernorm1 = LayerNormalization(epsilon=layernorm_eta)
        self.layernorm2 = LayerNormalization(epsilon=layernorm_eta)


    def call(self, x):
        """
        A compléter (figure du cours) : x est le tenseur de données, de taille (batch_size,N,dim_e).
        En retour, on attend un tenseur de taille (batch_size,N,dim_e)
        Attention, ne pas oublier les connexions résiduelles !!
        """
        return encoder_layer_out

L'encodeur est un ensemble de K couches d'encodeurs

In [None]:
class Encoder(tf.keras.layers.Layer):

    def __init__(self, K, H, dim_e, dq, dv, dim_h,layernorm_eta=1e-6):
        super(Encoder, self).__init__()

        self.layers=[EncoderLayer(H, dim_e, dq, dv, dim_h,layernorm_eta=layernorm_eta)
                                  for i in range(K)]

    def call(self, x):
        for layer in self.layers:
            x = layer(x)

        return x


## Faire le même travail pour le décodeur, en fonction de la figure du cours

In [None]:
class DecoderLayer(tf.keras.layers.Layer):

    def __init__(self, H, dim_e, dq, dv, dim_h,layernorm_eta=1e-6):

        super(DecoderLayer, self).__init__()

        self.mha1 = Multihead_Attention(H, dim_e, dq, dv)
        self.mha2 = Multihead_Attention(H, dim_e, dq, dv)
        self.mlp = MLP(dim_e, dim_h)
        self.layernorm1 = LayerNormalization(epsilon=layernorm_eta)
        self.layernorm2 = LayerNormalization(epsilon=layernorm_eta)
        self.layernorm3 = LayerNormalization(epsilon=layernorm_eta)

    def call(self, x, encoder_output):

        """
        A compléter (figure du cours) :
        x est le tenseur de données, de taille (batch_size,N,dim_e).
        encoder_output est la sortie de l'encodeur
        Attention, ne pas oublier les connexions résiduelles !!
        """

        return decoder_layer_out

Le décodeur est un ensemble de K couches de décodeurs

In [None]:
class Decoder(tf.keras.layers.Layer):

    def __init__(self, K, H, dim_e, dq, dv, dim_h,  layernorm_eta=1e-6):

        super(Decoder, self).__init__()
        self.layers=[DecoderLayer(H, dim_e, dq, dv, dim_h,layernorm_eta=layernorm_eta)
                                  for i in range(K)]

    def call(self, x, encoder_output, training=False):
        for layer in self.layers:
            x = layer(x,encoder_output)

        return x

## Construire l'assemblage de l'encodeur et du décodeur pour produire le transformer.

vous devez compléter la fonction call

In [None]:
class Transformer(tf.keras.Model):

    def __init__(self, N, H, dim_e, dq, dv, dim_h,
                 vocab_size, max_positional_encoding,
                 layernorm_eta=1e-6):

        super(Transformer, self).__init__()

        initializer = tf.keras.initializers.GlorotUniform()
        self.embedding = tf.Variable(initializer(shape=(vocab_size, dim_e)), trainable=True)
        self.PE = positional_encoding(max_positional_encoding, dim_e)
        self.encoder = Encoder(N, H, dim_e, dq, dv, dim_h, layernorm_eta=layernorm_eta)
        self.decoder = Decoder(N, H, dim_e, dq, dv, dim_h, layernorm_eta=layernorm_eta)



    def call(self, x, y):
        """
        A faire
        """
        return pred

## Pour vérifier que votre code fonctionne, on instantie le problème avec les valeurs données dans l'article,
On affiche un résumé du transformer. Vous devez obtenir 44,116,480 paramètres entraînables.

In [None]:
N, H, dim_e, dq, dv, dim_h, vocab_size, T, batch_size = 6, 8, 512, 64, 64, 2048,29, 11,3

transformer = Transformer(N, H, dim_e, dq, dv, dim_h, vocab_size, T)

input_shape = (None, T,vocab_size)
x = tf.random.uniform((batch_size, T, vocab_size))
y =  tf.random.uniform((batch_size, T, vocab_size))

pred = transformer(x,y)
transformer.summary()