# Embedding layer

    Convert each word to input sequence and generate embedding vector.

In [1]:
import torch 
import torch.nn as nn
import math 
import copy 

class Embedding(nn.Module):
    def __init__(self, vocab_size, dmodel = 512) -> None:
        super(Embedding, self).__init__()
        self.vocab_size = vocab_size
        self.dmodel = dmodel    #dimension
        self.embed_layer = nn.Embedding(self.vocab_size, self.dmodel)
    def forward(self, x):
        embed_out = self.embed_layer(x)
        return embed_out * math.sqrt(self.dmodel)

In [28]:
def get_clone(module, num_clones):
    return nn.ModuleList([copy.deepcopy(module) for _ in range(num_clones)])

# Positional Encoding

    
Étant donné que notre modèle ne contient ni récurrence ni convolution, afin que le modèle puisse utiliser l'ordre de la séquence, nous devons injecter des informations sur la position relative ou absolue des tokens dans la séquence. Pour ce faire, nous ajoutons des encodages positionnels aux embeddings d'entrée au bas des piles de l'encodeur et du décodeur. Les encodages positionnels ont la même dimension $d_{\text{modèle}}$ que les embeddings, afin que les deux puissent être additionnés.

In [5]:
class PositionalEncoding(nn.Module):
    def __init__(self, max_seq_len, d_model = 512) -> None:
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        frequency = torch.pow(10000, -torch.arange(0, d_model, 2, dtype = torch.float) / d_model)
        pe = torch.zeros((max_seq_len, d_model))
        pe[:, 0::2] = torch.sin(pos * frequency)
        pe[:, 1::2] = torch.cos(pos * frequency)
        self.register_buffer('pe', pe)
    def forward(self, embed_vect):
        return embed_vect + self.pe 

# Attention Layer

- First we create three vector(Query, Key, Value) from encoder input vector. Here new vector dimension is smaller than embedding vector.

- we compute the dot products of the query with all keys, divide each by square root of d_k. Here d_k is the dimension of key vector. In our case d_k = d_v = d_model/h = 64 (d_model=512).

- we apply softmax function and multiply with value matrix.

- Then we resize output dimension and pass through a linear function.

In [6]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model = 512, n_head = 8, dropout_rate = 0.2) -> None:
        super().__init__()
        self.d_model = d_model
        self.n_head = n_head
        self.dropout = nn.Dropout(p = dropout_rate)
        self.head_dim = int(d_model / n_head)
        self.softmax_layer = nn.Softmax(dim = -1)
        self.w_key = nn.Linear(d_model, d_model)
        self.w_query = nn.Linear(d_model, d_model)
        self.w_value = nn.Linear(d_model, d_model)
        self.output_project = nn.Linear(d_model, d_model)

    def attention(self, key, query, value, mask=None):
        # calculate attenction score
        
        # attention score size for encoder attention = (BS,NH,S,S) , decoder attention = (BS,NH,T,T), encoder-decoder attention = (BS,NH,T,S)
        attention_score = torch.matmul(query, key.transpose(-2,-1)) / math.sqrt(self.head_dim)    # query = (BS,NH,S/T,HD) , key.transpose(-2,-1) = (BS,NH,HD,S/T)

        # apply masking
        if mask is not None:
            attention_score.masked_fill(mask==torch.tensor(False),float("-inf"))

        # pass through softmax layer
        attention_weight = self.softmax_layer(attention_score)

        # multiply with value
        # Final shape of score = (BS,NH,S/T,HD)
        score = torch.matmul(attention_weight,value)
        return score

    def forward(self, key, query, value, mask=None):
        batch_size = key.size()[0]

        # dot product with weight matrices
        # size of key/query/value = (BS,S/T,ED) ,
        # where BS = batch size,
        # S = Source Sequence length, T = target sequence lenth,
        # ED = Embedding dimension,
        # NH = Number Of Head, HD = head dimension
        key, query, value = self.w_key(key), self.w_query(query), self.w_value(value)

        # split vector by number of head and transpose
        # size of key/query/value = (BS,NH,S/T,HD) , where BS = batch size, NH = Number Of Head, ED = Head dimension
        key = key.view(batch_size,-1,self.n_head,self.head_dim).transpose(1, 2)
        query = query.view(batch_size,-1,self.n_head,self.head_dim).transpose(1, 2)
        value = value.view(batch_size,-1,self.n_head,self.head_dim).transpose(1, 2)

        # size of attention_score = (BS,NH,S/T,HD)
        attention_score = self.attention(key,query,value,mask) # size - torch.Size([2, 4, 8, 64]) -> [batch_size, max_seq_len,n_head, head_dim]
        attention_score = self.dropout(attention_score)

        # concatenated output
        attention_score = attention_score.transpose(1,2) # size = (BS,S/T,NH,HD)
        attention_score = attention_score.reshape(batch_size,-1,self.head_dim*self.n_head) # size = (BS,S/T,ED)

        # Pass through linear layer
        attention_out = self.output_project(attention_score)
        return attention_out

# Position-wise Feed-Forward Networks

$ \text{FFN} (x) = \text{ReLU} (x W_1 + b_1) W_2 + b_2$

In [9]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model=512, dropout_rate = 0.2) -> None:
        super().__init__()
        self.d_model = d_model
        hidden_width = 4
        self.dropout = nn.Dropout(p=dropout_rate)
        self.linear1 = nn.Linear(d_model,d_model*hidden_width)
        self.linear2 = nn.Linear(d_model*hidden_width, d_model)
        self.relu = nn.ReLU()
    def forward(self,x):
        transfo_lineaire = self.linear1(x)    #y = xW + b
        relu = self.relu(transfo_lineaire)   #relu = max(0,y)
        drop = self.dropout(relu)    #Dropout désactive de manière aléatoire un certain pourcentage de neurones pour éviter l'overfitting
        return self.linear2(drop)

# Sub-Layer 

    Chaque bloc d'encodeur se compose de deux sous-couches : un mécanisme d'attention multi-têtes et un réseau feed-forward appliqué à chaque position. 

La normalisation par couche : Dans les modèles Transformer, LayerNorm est utilisée après chaque sous-couche (comme l'attention multi-têtes et le réseau feed-forward) pour normaliser les activations, ce qui aide à stabiliser l'entraînement et à améliorer les performances du modèle.

In [10]:
class SubLayer(nn.Module):
    def __init__(self,d_model = 512) -> None:
        super(SubLayer,self).__init__()
        self.norm = nn.LayerNorm(d_model)    #applique une normalisation par couche 
    def forward(self, x, sub_layer_x):
        return self.norm(x + sub_layer_x)

# Encoder Layer

    La couche d'encodeur possède deux sous-couches. La première est un mécanisme d'attention multi-tête, et la seconde est un réseau de neurones entièrement connecté et positionnel de type feed-forward.

In [43]:
class EncodeurLayer(nn.Module):
    def __init__(self,d_model,multi_head_arttention_layer,position_wise_feedforward_layer, dropout_rate = 0.2) -> None:
        super().__init__()
        self.d_model = d_model

        self.multi_head_arttention_layer = multi_head_arttention_layer
        self.dropout1 = nn.Dropout(p=dropout_rate)
        self.sublayer1 = SubLayer(d_model)

        self.position_wise_feedforward_layer = position_wise_feedforward_layer
        self.dropout2 = nn.Dropout(p=dropout_rate)
        self.sublayer2 = SubLayer(d_model)
    def forward(self,vec_representation,src_mask=None):
        # compute self attention
        attention_score =self.multi_head_arttention_layer(key = vec_representation,query = vec_representation,value = vec_representation,mask = src_mask)
        attention_score = self.dropout1(attention_score)
        # Layer Norm
        attention_out = self.sublayer1(vec_representation,attention_score)

        # pass Position Wise Feedforward Network
        position_wise_feedforward_out = self.position_wise_feedforward_layer(attention_out)
        position_wise_feedforward_out = self.dropout2(position_wise_feedforward_out)
        # Layer Norm
        encoder_out = self.sublayer2(attention_out,position_wise_feedforward_out)
        return encoder_out


The encoder is composed of a stack of N = 6 identical layers.

In [44]:
class EncoderBlock(nn.Module):
    def __init__(self,encoder_layer, num_layer = 6) -> None:
        super().__init__()
        self.encoder_layer = encoder_layer
        self.encoder_layer_list = get_clone(self.encoder_layer,num_layer)

    def forward(self,src_embedding,src_mask=None):
        encoder_out = src_embedding
        for encoder_layer in self.encoder_layer_list:
            encoder_out = encoder_layer(encoder_out,src_mask)
        return encoder_out

# Decodeur Layer 

    La couche de décodeur possède trois sous-couches. En plus des deux sous-couches de chaque couche d'encodeur, le décodeur insère une troisième sous-couche, qui effectue une attention multi-tête sur la sortie de la pile d'encodeurs.
    
    La couche de décodeur contient deux sous-couches : l'une est une attention multi-tête masquée, et l'autre est une "attention encodeur-décodeur". Dans les couches d'"attention encodeur-décodeur", les requêtes proviennent de la couche de décodeur précédente, tandis que les clés et les valeurs de mémoire proviennent de la sortie de l'encodeur. Cela permet à chaque position dans le décodeur d'attendre sur toutes les positions de la séquence d'entrée.

In [47]:
class DecoderLayer(nn.Module):
    def __init__(self,d_model,multi_head_arttention_layer,position_wise_feedforward_layer,dropout_rate = 0.2) -> None:
        super().__init__()
        self.d_model = d_model
        self.decoder_attention_layer = copy.deepcopy(multi_head_arttention_layer)
        self.dropout1 = nn.Dropout(p=dropout_rate)
        self.sublayer1 = SubLayer(d_model)

        self.encoder_decoder_attention_layer = copy.deepcopy(multi_head_arttention_layer)
        self.dropout2 = nn.Dropout(p=dropout_rate)
        self.sublayer2 = SubLayer(d_model)

        self.position_wise_feedforward_layer = position_wise_feedforward_layer
        self.dropout3 = nn.Dropout(p=dropout_rate)
        self.sublayer3 = SubLayer(d_model)

    def forward(self,enc,dec,src_mask=None,target_mask=None):
        decoder_attention_out = self.decoder_attention_layer(key = dec,query = dec,value = dec,mask = target_mask)
        decoder_attention_out = self.dropout1(decoder_attention_out)
        decoder_attention_out = self.sublayer1(dec,decoder_attention_out)

        enc_dec_attention_out = self.encoder_decoder_attention_layer(key = enc,query = decoder_attention_out,value = enc,mask = src_mask)
        enc_dec_attention_out = self.dropout2(enc_dec_attention_out)
        enc_dec_attention_out = self.sublayer2(decoder_attention_out,enc_dec_attention_out)

        ffn_out = self.position_wise_feedforward_layer(enc_dec_attention_out)
        ffn_out = self.dropout2(ffn_out)
        ffn_out = self.sublayer2(enc_dec_attention_out,ffn_out)

        return ffn_out



In [48]:
class DecoderBlock(nn.Module):
    def __init__(self,decoder_layer,num_layer = 6) -> None:
        super().__init__()
        self.decoder_layer = decoder_layer
        self.decoder_layer_list = get_clone(self.decoder_layer,num_layer)
        self.layer_norm = nn.LayerNorm(self.decoder_layer.d_model)

    def forward(self,encoder_out_vec,decoder_embedding,src_mask=None,target_mask=None):
        dec_out = decoder_embedding
        for decoder_layer in self.decoder_layer_list:
            dec_out = decoder_layer(enc = encoder_out_vec,dec = dec_out,src_mask = src_mask,target_mask = target_mask)
        return dec_out

In [49]:
class DecoderGenerator(nn.Module):
    def __init__(self,d_model,target_vocab_size) -> None:
        super().__init__()
        self.linear = nn.Linear(d_model,target_vocab_size)
        self.softmax_layer = nn.LogSoftmax(dim=-1)
    def forward(self,target_vec_rep):
        return self.softmax_layer(self.linear(target_vec_rep))

# Transformer Block 

In [50]:
class Transformers(nn.Module):
    def __init__(self,
                 src_seq_len,
                 trg_seq_len,
                 d_model,
                 num_head,
                 dropout_rate = 0.2) -> None:
        super().__init__()
        self.src_seq_len = src_seq_len
        self.trg_seq_len = trg_seq_len
        self.d_model = d_model
        self.num_head = num_head

        self.src_embedding = Embedding(self.src_seq_len, self.d_model)
        self.src_pe = PositionalEncoding(self.trg_seq_len, self.d_model)

        self.multi_head_attention = MultiHeadAttention(self.d_model, self.num_head, dropout_rate)
        self.position_wise_feedforward = PositionWiseFeedForward(self.d_model, dropout_rate)

        self.encoder_layer = EncodeurLayer(d_model, self.multi_head_attention, self.position_wise_feedforward, dropout_rate)
        self.decodeur_layer = DecoderLayer(d_model,self.multi_head_attention,self.position_wise_feedforward,dropout_rate)


        self.encoder_block = EncoderBlock(self.encoder_layer, num_layer = 6)
        self.decodeur_block = DecoderBlock(self.decodeur_layer, num_layer = 6)
        self.decoder_out_gen = DecoderGenerator(self.d_model, self.trg_seq_len)

    def forward(self, src_token_id, target_token_id, src_mask=None,target_mask=None):
        encode_out = self.encode(src_token_id, src_mask)
        decode_out = self.decode(encode_out, target_token_id, src_mask, target_mask)
        return decode_out

    def encode(self, src_token_id, src_mask):
        embed = self.src_embedding(src_token_id) 
        pe_out = self.src_pe(embed)
        encoder_out = self.encoder_block(pe_out, src_mask)
        return encoder_out

    def decode(self, enc_out, trg_token_ids, src_mask=None, target_mask=None):
        embed = self.src_embedding(trg_token_ids)
        pe_out = self.src_pe(embed)
        decoder_out = self.decodeur_block(enc_out, pe_out, src_mask, target_mask)
        decoder_out = self.decoder_out_gen(decoder_out)
        return decoder_out

In [51]:
def get_src_mask(src_token_ids_batch,pad_tok_id):
    batch_size = src_token_ids_batch.size()[0]
    src_mask = (src_token_ids_batch!=pad_tok_id).view(batch_size, 1, 1,-1) #SIZE = (BS,1,1,S)
    return src_mask
def get_trg_mask(trg_token_ids_batch,pad_tok_id):
    batch_size = trg_token_ids_batch.size()[0]
    seq_len = trg_token_ids_batch.size()[1]
    trg_pad_mask = (trg_token_ids_batch!=pad_tok_id).view(batch_size, 1, 1,-1) #SIZE = (BS,1,1,T)
    trg_look_forward = torch.triu(torch.ones(1,1,seq_len,seq_len)).transpose(2,3)
    trg_mask = trg_pad_mask & trg_look_forward
    return trg_mask

UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
low_bound = 3
high_bound = 15
batch_size = 32
src_seq_len = 10
trg_seq_len = 15

src_tensor_size = (batch_size, src_seq_len)  
trh_tensor_size = (batch_size, trg_seq_len)  

src_seq = torch.randint(3, 16, size=src_tensor_size, dtype=torch.int32)
trg_seq = torch.randint(3, 16, size=trh_tensor_size, dtype=torch.int32)
transformer = Transformers(
    src_seq_len = 20,
    trg_seq_len = 20,
    d_model = 512,
    num_head = 8,
    dropout_rate = 0.2
)
src_mask = get_src_mask(src_seq,PAD_IDX)
trg_mask = get_src_mask(trg_seq,PAD_IDX)
output = transformer(src_seq, trg_seq,src_mask,trg_mask)

RuntimeError: The size of tensor a (10) must match the size of tensor b (20) at non-singleton dimension 1