In [None]:
# importation des bibliothèques
import torch
import torch.nn as nn
import torch.optim as optim
import math
from torch.utils.data import DataLoader, Dataset
import sentencepiece as spm
from tqdm import tqdm

In [None]:
# Implémente le mécanisme d’attention multi-tête qui 
# permet à chaque mot d'une phrase de regarder tous les autres mots en parallèle 
# et de pondérer leur importance pour mieux comprendre le contexte
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert embed_size % num_heads == 0, "embed_size doit être divisible par num_heads"
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads

        self.query = nn.Linear(embed_size, embed_size)
        self.key = nn.Linear(embed_size, embed_size)
        self.value = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, query, key, value, mask=None):
        batch_size = query.shape[0]

        Q = self.query(query).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        K = self.key(key).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        V = self.value(value).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        attention = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention, V)

        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.head_dim)
        return self.fc_out(output)

In [None]:
# Ajoute un réseau de neurones à chaque couche du Transformer
class FeedForward(nn.Module):
    def __init__(self, embed_size, hidden_size):
        super(FeedForward, self).__init__()
        self.fc1 = nn.Linear(embed_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, embed_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

In [None]:
# Gère l’attention, la normalisation et les connexions résiduelles
class TransformerLayer(nn.Module):
    def __init__(self, embed_size, num_heads, hidden_size, dropout=0.1):
        super(TransformerLayer, self).__init__()
        self.attention = MultiHeadAttention(embed_size, num_heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = FeedForward(embed_size, hidden_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn))
        ff = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff))
        return x

In [None]:
# 	Encode la séquence d’entrée avec MultiHeadAttention
class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, max_length=5000):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.position_encoding = nn.Parameter(torch.zeros(1, max_length, embed_size))
        self.layers = nn.ModuleList([TransformerLayer(embed_size, num_heads, hidden_size) for _ in range(num_layers)])

    def forward(self, x, mask=None):
        x = self.embedding(x) + self.position_encoding[:, :x.shape[1], :]
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [None]:
# Décode la sortie en générant un texte mot par mot
class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, max_length=5000):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.position_encoding = nn.Parameter(torch.zeros(1, max_length, embed_size))
        self.layers = nn.ModuleList([TransformerLayer(embed_size, num_heads, hidden_size) for _ in range(num_layers)])

    def forward(self, x, memory, mask=None):
        x = self.embedding(x) + self.position_encoding[:, :x.shape[1], :]
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [None]:
# Assemble l’encodeur et le décodeur en un modèle complet.
class Transformer(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(vocab_size, embed_size, num_heads, hidden_size, num_layers)
        self.decoder = Decoder(vocab_size, embed_size, num_heads, hidden_size, num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, src, tgt, mask=None):
        memory = self.encoder(src, mask)
        output = self.decoder(tgt, memory, mask)
        return self.fc_out(output)

In [None]:
# Déclare le Transformer avec ses hyperparamètres
embed_size = 256
num_heads = 8
hidden_size = 512
num_layers = 6
dropout = 0.1
vocab_size = 5000

model = Transformer(vocab_size, embed_size, num_heads, hidden_size, num_layers, dropout)

In [45]:
# Entraînement
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)

def train_loop(model, train_loader, optimizer, criterion, epochs=10):
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for input_ids, target_ids in tqdm(train_loader):
            optimizer.zero_grad()
            output = model(input_ids, target_ids)
            loss = criterion(output.view(-1, vocab_size), target_ids.view(-1))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

train_loop(model, train_loader, optimizer, criterion, epochs=30)

100%|██████████| 1148/1148 [03:44<00:00,  5.12it/s]


Epoch 1, Loss: 0.15813016205780064


100%|██████████| 1148/1148 [04:27<00:00,  4.30it/s]


Epoch 2, Loss: 0.0005537557367064856


100%|██████████| 1148/1148 [03:52<00:00,  4.94it/s]


Epoch 3, Loss: 0.00020166858415309388


100%|██████████| 1148/1148 [03:49<00:00,  5.01it/s]


Epoch 4, Loss: 9.581578054734286e-05


100%|██████████| 1148/1148 [06:02<00:00,  3.17it/s] 


Epoch 5, Loss: 4.998864932205029e-05


100%|██████████| 1148/1148 [03:45<00:00,  5.10it/s]


Epoch 6, Loss: 2.6985744522571243e-05


 51%|█████     | 582/1148 [02:03<01:59,  4.72it/s]


KeyboardInterrupt: 

In [None]:
# Utilise le Transformer entraîné pour générer une phrase en prédisant un mot à la fois.
def generate_text_transformer(model, tokenizer, start_text, max_length=50, temperature=1.0):
    model.eval()
    input_ids = torch.tensor([tokenizer.encode(start_text, out_type=int)], dtype=torch.long).unsqueeze(0)
    generated_tokens = input_ids.tolist()[0]

    with torch.no_grad():
        for _ in range(max_length):
            src = torch.tensor([generated_tokens], dtype=torch.long)
            tgt = torch.tensor([generated_tokens], dtype=torch.long)

            output = model(src, tgt)

            probabilities = torch.nn.functional.softmax(output[:, -1, :] / temperature, dim=-1)
            next_token = torch.multinomial(probabilities, num_samples=1).item()

            generated_tokens.append(next_token)

            if next_token == tokenizer.eos_id():
                break

    return tokenizer.decode(generated_tokens)

In [None]:
start_text = "The future of AI is"
print(generate_text_transformer(model, sp, start_text, temperature=1.0))