In [3]:
import torch
import torch.nn as nn
from torch.nn.functional import softmax

In [4]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch-first sequence tensor.

    Even feature columns hold ``sin(position * div_term)`` and odd columns hold
    ``cos(position * div_term)``. The table is registered as the non-trainable
    buffer ``pe`` with shape ``(max_len, 1, d_model)``.

    Args:
        d_model: Size of the feature (last) dimension of the inputs.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim div_term to the number of odd columns (a no-op when d_model is even).
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Buffer layout: (max_len, 1, d_model).
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``, the batch-first
                layout used by the attention modules in this file.

        Raises:
            ValueError: If ``seq_len`` exceeds the number of precomputed positions.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        # (seq_len, 1, d_model) -> (1, seq_len, d_model) so the table is indexed by
        # token position and broadcast over the batch dimension.
        return x + self.pe[:seq_len].transpose(0, 1)

In [5]:
class ScaledDotProductAttention(nn.Module):
    """Compute ``softmax(Q @ K^T / sqrt(d_k)) @ V`` with a zero-means-masked mask.

    Args:
        d_k: Value whose square root divides the raw attention scores.
    """

    def __init__(self, d_k):
        super().__init__()
        self.d_k = d_k

    def forward(self, Q, K, V, attn_mask):
        """Return ``(context, attn)``.

        Score entries where ``attn_mask == 0`` are overwritten in place with
        ``-1e9`` before the softmax over the last dimension.
        """
        scale = torch.tensor(self.d_k).float().sqrt()
        keys_t = K.transpose(-2, -1)
        scores = torch.matmul(Q, keys_t) / scale

        blocked = attn_mask == 0
        scores.masked_fill_(blocked, -1e9)

        attn = softmax(scores, dim=-1)
        return torch.matmul(attn, V), attn

In [6]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention over batch-first inputs.

    Q, K and V are linearly projected, split into ``n_heads`` heads of size
    ``d_model // n_heads``, attended with ``ScaledDotProductAttention``, then
    concatenated and passed through a final linear layer.

    Args:
        d_model: Feature size of the inputs and of the output.
        n_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        if d_model % n_heads != 0:
            # Otherwise the per-head view in forward() would either fail or
            # silently reinterpret the sequence dimension.
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_heads ({n_heads})"
            )
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.d_v = d_model // n_heads

        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        self.linear = nn.Linear(n_heads * self.d_v, d_model)

    def forward(self, Q, K, V, attn_mask):
        """Return ``(output, attn)``.

        Args:
            Q, K, V: Tensors whose first dimension is the batch and whose last
                dimension is ``d_model``.
            attn_mask: Mask in which zeros mark positions to exclude. Either a
                2-D ``(batch, key_len)`` key-padding mask, or a 3-D mask that
                broadcasts against ``(batch, query_len, key_len)``.

        Returns:
            output: ``(batch, query_len, d_model)`` tensor.
            attn: ``(batch, n_heads, query_len, key_len)`` attention weights.
        """
        batch_size = Q.size(0)

        # (batch, len, d_model) -> (batch, n_heads, len, d_head)
        Q = self.W_Q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = self.W_K(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = self.W_V(V).view(batch_size, -1, self.n_heads, self.d_v).transpose(1, 2)

        if attn_mask.dim() == 2:
            # Key-padding mask: add head and query axes so it works for any batch size.
            attn_mask = attn_mask[:, None, None, :]
        else:
            # Add a head axis; broadcasting replaces the explicit per-head repeat.
            attn_mask = attn_mask.unsqueeze(1)

        context, attn = ScaledDotProductAttention(self.d_k)(Q, K, V, attn_mask)
        # Merge the heads back: (batch, n_heads, len, d_v) -> (batch, len, n_heads * d_v)
        context = context.transpose(1, 2).reshape(batch_size, -1, self.n_heads * self.d_v)
        output = self.linear(context)
        return output, attn

In [7]:
class PoswiseFeedForwardNet(nn.Module):
    """Feed-forward block: ``Linear(d_model, d_ff) -> ReLU -> Linear(d_ff, d_model)``.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.fc2 = nn.Linear(in_features=d_ff, out_features=d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Expand ``x`` to ``d_ff`` features, apply ReLU, and project back."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

In [8]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention then feed-forward.

    Each sub-layer's output goes through dropout, is added to that sub-layer's
    input, and is then layer-normalised.

    Args:
        d_model: Feature size.
        n_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.enc_self_attn = MultiHeadAttention(d_model, n_heads)
        self.pos_ffn = PoswiseFeedForwardNet(d_model, d_ff)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, inputs, enc_self_attn_mask):
        """Return ``(outputs, attn)`` for one pass through the layer."""
        # Self-attention sub-layer (queries, keys and values are all `inputs`).
        attn_out, attn = self.enc_self_attn(inputs, inputs, inputs, enc_self_attn_mask)
        hidden = self.layer_norm1(inputs + self.dropout1(attn_out))

        # Feed-forward sub-layer.
        ffn_out = self.pos_ffn(hidden)
        outputs = self.layer_norm2(hidden + self.dropout2(ffn_out))
        return outputs, attn

class DecoderLayer(nn.Module):
    """One decoder layer: self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer's output goes through dropout, is added to that sub-layer's
    input (residual connection), and is then layer-normalised.

    Args:
        d_model: Feature size.
        n_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.dec_self_attn = MultiHeadAttention(d_model, n_heads)
        self.dec_enc_attn = MultiHeadAttention(d_model, n_heads)
        self.pos_ffn = PoswiseFeedForwardNet(d_model, d_ff)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.layer_norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        """Return ``(outputs, dec_self_attn, dec_enc_attn)``.

        Args:
            inputs: Decoder-side hidden states.
            enc_outputs: Encoder outputs, used as keys and values of the
                encoder-decoder attention.
            dec_self_attn_mask: Mask for the decoder self-attention.
            dec_enc_attn_mask: Mask for the encoder-decoder attention.
        """
        # Self-attention sub-layer.
        self_attn_out, dec_self_attn = self.dec_self_attn(inputs, inputs, inputs, dec_self_attn_mask)
        outputs = self.layer_norm1(inputs + self.dropout1(self_attn_out))

        # Encoder-decoder attention sub-layer. The attention result is kept in its own
        # variable so the residual adds it to the sub-layer *input*, not to itself.
        cross_attn_out, dec_enc_attn = self.dec_enc_attn(outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)
        outputs = self.layer_norm2(outputs + self.dropout2(cross_attn_out))

        # Feed-forward sub-layer.
        outputs = self.layer_norm3(outputs + self.dropout3(self.pos_ffn(outputs)))

        return outputs, dec_self_attn, dec_enc_attn

class Encoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of ``EncoderLayer``s.

    Args:
        vocab_size: Number of rows in the source embedding table.
        d_model: Feature size.
        n_layers: Number of encoder layers.
        n_heads: Number of attention heads per layer.
        d_ff: Hidden size of each layer's feed-forward sub-layer.
        dropout: Dropout probability (embeddings and layers).
        max_len: Number of positions precomputed by the positional encoding.
    """

    def __init__(self, vocab_size, d_model, n_layers, n_heads, d_ff, dropout=0.1, max_len=5000):
        super().__init__()
        self.src_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model, max_len)

        stack = []
        for _ in range(n_layers):
            stack.append(EncoderLayer(d_model, n_heads, d_ff, dropout))
        self.layers = nn.ModuleList(stack)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, src_mask):
        """Return ``(hidden, enc_self_attns)``, one attention entry per layer."""
        embedded = self.src_emb(x)
        hidden = self.dropout(self.pos_emb(embedded))

        enc_self_attns = []
        for encoder_layer in self.layers:
            hidden, layer_attn = encoder_layer(hidden, src_mask)
            enc_self_attns.append(layer_attn)
        return hidden, enc_self_attns

class Decoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of ``DecoderLayer``s.

    Args:
        vocab_size: Number of rows in the target embedding table.
        d_model: Feature size.
        n_layers: Number of decoder layers.
        n_heads: Number of attention heads per layer.
        d_ff: Hidden size of each layer's feed-forward sub-layer.
        dropout: Dropout probability (embeddings and layers).
        max_len: Number of positions precomputed by the positional encoding.
    """

    def __init__(self, vocab_size, d_model, n_layers, n_heads, d_ff, dropout=0.1, max_len=5000):
        super().__init__()
        self.tgt_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model, max_len)

        stack = []
        for _ in range(n_layers):
            stack.append(DecoderLayer(d_model, n_heads, d_ff, dropout))
        self.layers = nn.ModuleList(stack)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, enc_outputs, src_mask, tgt_mask):
        """Return ``(hidden, dec_self_attns, dec_enc_attns)``.

        ``tgt_mask`` is handed to each layer as its self-attention mask and
        ``src_mask`` as its encoder-decoder attention mask.
        """
        embedded = self.tgt_emb(x)
        hidden = self.dropout(self.pos_emb(embedded))

        dec_self_attns = []
        dec_enc_attns = []
        for decoder_layer in self.layers:
            hidden, self_attn, cross_attn = decoder_layer(hidden, enc_outputs, tgt_mask, src_mask)
            dec_self_attns.append(self_attn)
            dec_enc_attns.append(cross_attn)
        return hidden, dec_self_attns, dec_enc_attns

In [9]:
class Transformer(nn.Module):
    """Encoder-decoder model with a bias-free linear projection to the target vocabulary.

    Args:
        src_vocab_size: Source vocabulary size (encoder embedding rows).
        tgt_vocab_size: Target vocabulary size (decoder embedding rows and logits width).
        d_model: Feature size.
        n_layers: Number of layers in the encoder and in the decoder.
        n_heads: Number of attention heads per layer.
        d_ff: Hidden size of the feed-forward sub-layers.
        dropout: Dropout probability.
        max_len: Number of positions precomputed by the positional encodings.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_layers, n_heads, d_ff, dropout=0.1, max_len=5000):
        super().__init__()
        self.encoder = Encoder(src_vocab_size, d_model, n_layers, n_heads, d_ff, dropout, max_len)
        self.decoder = Decoder(tgt_vocab_size, d_model, n_layers, n_heads, d_ff, dropout, max_len)
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False)

    def forward(self, enc_inputs, dec_inputs, src_mask, tgt_mask):
        """Return ``(logits, enc_self_attns, dec_self_attns, dec_enc_attns)``.

        ``logits`` is flattened to two dimensions: every leading dimension of the
        projected decoder output is merged, the last one is kept.
        """
        memory, enc_self_attns = self.encoder(enc_inputs, src_mask)
        decoded, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, memory, src_mask, tgt_mask)

        logits = self.projection(decoded)
        vocab_dim = logits.size(-1)
        flat_logits = logits.view(-1, vocab_dim)
        return flat_logits, enc_self_attns, dec_self_attns, dec_enc_attns

In [10]:
# Hyperparameters
src_vocab_size = 8192  # Source vocabulary size
tgt_vocab_size = 8192  # Target vocabulary size
d_model = 512          # Model dimension
n_layers = 6           # Number of layers in the encoder and in the decoder
n_heads = 8            # Number of attention heads
d_ff = 2048            # Dimension of the feed-forward layer

# Instantiate the model
model = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    n_layers=n_layers,
    n_heads=n_heads,
    d_ff=d_ff,
)

# ... (Code to build the src_mask and tgt_mask masks,
#      load the training data, define the loss function,
#      the optimizer, etc.)

# Training loop (simplified)
# for epoch in range(num_epochs):
#     for enc_inputs, dec_inputs, src_mask, tgt_mask in train_loader:
#         # ... (Code to compute the model output and the loss,
#         #      update the model parameters, etc.)

In [11]:
# Rebuild `model` from the hyperparameters defined earlier in the notebook
# (this rebinds the name to a newly constructed Transformer).
model = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    n_layers=n_layers,
    n_heads=n_heads,
    d_ff=d_ff,
)

In [12]:
# Serialise the whole model object (not just its state_dict) to disk.
torch.save(obj=model, f='transformer.pth')

In [13]:
# Plot the model
from torchviz import make_dot

# Dummy batch: a single sequence of five token ids on each side.
enc_inputs = torch.tensor([[1, 2, 3, 4, 5]])
dec_inputs = torch.tensor([[1, 2, 3, 4, 5]])

# Masks are (batch, query_len, key_len); zeros mark positions that are hidden.
# The source mask lets every position attend everywhere.
src_mask = torch.ones(1, 5, 5)
# The target mask is lower-triangular so a decoder position cannot attend to later ones.
tgt_mask = torch.tril(torch.ones(1, 5, 5))

out = model(enc_inputs, dec_inputs, src_mask, tgt_mask)

# Render the autograd graph of the logits to "attached.svg"
make_dot(out[0], params=dict(model.named_parameters()), show_attrs=True).render("attached", format="svg")

'attached.svg'

In [14]:
# Display the graph
# make_dot(out[0], params=dict(model.named_parameters()))

In [15]:
# NOTE(review): `parameters` is referenced without being called, so this cell displays
# the bound-method repr (which embeds the module tree of `model`), not the parameters.
model.parameters

<bound method Module.parameters of Transformer(
  (encoder): Encoder(
    (src_emb): Embedding(8192, 512)
    (pos_emb): PositionalEncoding()
    (layers): ModuleList(
      (0-5): 6 x EncoderLayer(
        (enc_self_attn): MultiHeadAttention(
          (W_Q): Linear(in_features=512, out_features=512, bias=True)
          (W_K): Linear(in_features=512, out_features=512, bias=True)
          (W_V): Linear(in_features=512, out_features=512, bias=True)
          (linear): Linear(in_features=512, out_features=512, bias=True)
        )
        (pos_ffn): PoswiseFeedForwardNet(
          (fc1): Linear(in_features=512, out_features=2048, bias=True)
          (fc2): Linear(in_features=2048, out_features=512, bias=True)
          (relu): ReLU()
        )
        (layer_norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (layer_norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inpla