In [525]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy
import numpy as np


![multi_head_attention.png](multi_head_attention.png)\
![Detail_attention_head.png](Detail_attention_head.png)


In [526]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = 1
        
        self.W_q = nn.Linear(d_model, d_model).double()
        self.W_k = nn.Linear(d_model, d_model).double()
        self.W_v = nn.Linear(d_model, d_model).double()
        self.W_o = nn.Linear(d_model, d_model).double()
        
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        output = torch.matmul(attn_probs, V)
        return output
        
    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        
    def combine_heads(self, x):
        batch_size, _, d_k,seq_length, _ = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
    def forward(self, Q, K, V, mask=None):
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        output = self.W_o(self.combine_heads(attn_output))
        return output

In [527]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff).double()
        self.fc2 = nn.Linear(d_ff, d_model).double()
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

![encoder.png](encoder_1.png)

In [528]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model).double()
        self.norm2 = nn.LayerNorm(d_model).double()
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

![decoder.png](decoder_1.png)

In [529]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model).double()
        self.norm2 = nn.LayerNorm(d_model).double()
        self.norm3 = nn.LayerNorm(d_model).double()
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, enc_output, src_mask):
        attn_output = self.self_attn(enc_output, enc_output, enc_output, src_mask)
        x = enc_output
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

![transformer_model.png](transformer_model_1.png)

In [530]:
#num_layers : nombre d'encoder et decoder

class Transformer(nn.Module):
    def __init__(self, d_model, num_heads, num_layers, d_ff, dropout, d_output):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(100, d_model)
        #Création des encoders et decoder au nombre de num_layers chacun
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        #Creation de la couche linéaire finale
        self.fc = nn.Linear(d_model, d_output).double()
        #Creation du dropout
        #self.dropout = nn.Dropout(dropout) -------------A voir-------------

    def generate_mask(self, src):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        return src_mask

    def forward(self, src):
        src_mask = self.generate_mask(src)
        #src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src))) pas besoin deja embedded; on change un peu la fonction

        #-----------------------------Utilisation de dropout-----------------------------
        #src_embedded = self.dropout(src)
        #-----------------------------Utilisation de dropout-----------------------------
        
        #On créé des valeur inutile pour garder la structure de base si on besoin d'une adaptation avec dropout
        print(src.size())
        embedding = nn.Embedding(100, 1)
        embedding(src)
        

        #Couche des N encodeurs
        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)
        
        #Couche des N decodeurs
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(enc_output, src_mask)
            
            
        #Sortie linéaire
        print(dec_output.size())
        output = self.fc(dec_output)
        print(output)
        print(output.size())
        return output

For the optimal performance, we assign 1024 neurons in FFN layers and set 32 as the padding size of embedding.\
We train our model with 25 training epochs, a 0.5 dropout rate, a learning rate of 5e − 4, and a batch size of 128.\
We apply Stochastic Gradient Descent (SGD) algorithm totrain the model and we choose the cross-entropy as the loss function.\
Insipré du site https://towardsdatascience.com/build-your-own-transformer-from-scratch-using-pytorch-84c850470dcb

In [531]:


#----------------------------------Remplir en fonction de l'entrée----------------------------------
X_input = np.load('./X_input.npy')
Y = np.load('./Y.npy')
#----------------------------------Remplir en fonction de l'entrée----------------------------------


d_output = 15 #Nombre de labels
d_model = 32
seq_len = np.shape(X_input)[1] #np.shape(X_input)[1] #Longueur du vecteur d'entrée (d_model) normalement 82
num_heads = 1  #d_model % num_heads == 0, "d_model must be divisible by num_heads"
num_layers = 6 #RTIDS Nombre de répétition des encoders/decoders
d_ff = 1024 #RTIDS dimension du FFN layer
dropout = 0.5 #RTIDS

transformer = Transformer(d_model, num_heads, num_layers, d_ff, dropout, d_output)

print(transformer)


# Generate random sample data
#src_data = torch.randint(1, src_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)  --------------------A changer--------------------
#tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)  --------------------A changer--------------------

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(transformer.parameters(), lr=0.0005)  #RTIDS

transformer.train()

for epoch in range(25):
    i=0
    for raw in X_input:
        running_loss = 0.

        optimizer.zero_grad()
        
        #Changer la valeur 1 pour s_batch si on prend plus de raw (nb_raw==s_batch)
        output = transformer(torch.from_numpy(raw).view(1,seq_len))
        loss = criterion(output, torch.from_numpy(Y[i]))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        i = i+1
    print(f"Epoch: {epoch+1}, Loss: {running_loss}")


Transformer(
  (encoder_embedding): Embedding(100, 32)
  (encoder_layers): ModuleList(
    (0-5): 6 x EncoderLayer(
      (self_attn): MultiHeadAttention(
        (W_q): Linear(in_features=32, out_features=32, bias=True)
        (W_k): Linear(in_features=32, out_features=32, bias=True)
        (W_v): Linear(in_features=32, out_features=32, bias=True)
        (W_o): Linear(in_features=32, out_features=32, bias=True)
      )
      (feed_forward): PositionWiseFeedForward(
        (fc1): Linear(in_features=32, out_features=1024, bias=True)
        (fc2): Linear(in_features=1024, out_features=32, bias=True)
        (relu): ReLU()
      )
      (norm1): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.5, inplace=False)
    )
  )
  (decoder_layers): ModuleList(
    (0-5): 6 x DecoderLayer(
      (self_attn): MultiHeadAttention(
        (W_q): Linear(in_features=32, out_features=32, bias=True)


RuntimeError: Expected tensor for argument #1 'indices' to have one of the following scalar types: Long, Int; but got torch.DoubleTensor instead (while checking arguments for embedding)