Transformer (From Scratch)

In [1]:
# gere les structures de donnees de bases, le calcul du gradient automatique
import torch
# contient les blocs de construction des RN, couches pre-construites
import torch.nn as nn
# contient les algos d'optimisation
import torch.optim as optim
# pour creer les dataloaders (gere les flux de donnees)
import torch.utils.data as data
# contient les fonctions mathematiques
import math
# pour copier les onjets
import copy

In [2]:
class AttentionMultiTetes(nn.Module):
    def __init__(self, d_modele, nb_tetes):
        super(AttentionMultiTetes, self).__init__()
        # S'assurer que la dimension du modèle (d_modele) est divisible par le nombre de têtes
        assert d_modele % nb_tetes == 0, "d_modele doit être divisible par nb_tetes"

        # Initialisation des dimensions
        self.d_modele = d_modele # Dimension du modèle
        self.nb_tetes = nb_tetes # Nombre de têtes d'attention
        self.d_k = d_modele // nb_tetes # Dimension des clés, requêtes et valeurs de chaque tête

        # Couches linéaires pour la transformation des entrées
        self.W_q = nn.Linear(d_modele, d_modele) # Transformation des Requêtes (Query)
        self.W_k = nn.Linear(d_modele, d_modele) # Transformation des Clés (Key)
        self.W_v = nn.Linear(d_modele, d_modele) # Transformation des Valeurs (Value)
        self.W_o = nn.Linear(d_modele, d_modele) # Transformation de Sortie (Output)

    def attention_produit_scalaire_normalisee(self, Q, K, V, masque=None):
        # Calcul des scores d'attention
        scores_attn = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Appliquer le masque si fourni (utile pour ignorer le rembourrage/padding)
        if masque is not None:
            scores_attn = scores_attn.masked_fill(masque == 0, -1e9)

        # Application du Softmax pour obtenir les probabilités d'attention
        probs_attn = torch.softmax(scores_attn, dim=-1)

        # Multiplier par les valeurs pour obtenir la sortie finale
        sortie = torch.matmul(probs_attn, V)
        return sortie

    def separer_tetes(self, x):
        # Redimensionner l'entrée pour avoir plusieurs têtes d'attention
        taille_batch, long_seq, d_modele = x.size()
        return x.view(taille_batch, long_seq, self.nb_tetes, self.d_k).transpose(1, 2)

    def combiner_tetes(self, x):
        # Combiner les têtes multiples pour revenir à la forme originale
        taille_batch, _, long_seq, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(taille_batch, long_seq, self.d_modele)

    def forward(self, Q, K, V, masque=None):
        # Appliquer les transformations linéaires et séparer les têtes
        Q = self.separer_tetes(self.W_q(Q))
        K = self.separer_tetes(self.W_k(K))
        V = self.separer_tetes(self.W_v(V))

        # Effectuer l'attention par produit scalaire normalisée
        sortie_attn = self.attention_produit_scalaire_normalisee(Q, K, V, masque)

        # Combiner les têtes et appliquer la transformation de sortie finale
        sortie = self.W_o(self.combiner_tetes(sortie_attn))
        return sortie

In [3]:
class ReseauNeuronesPositionnel(nn.Module):
    def __init__(self, d_modele, d_ff):
        super(ReseauNeuronesPositionnel, self).__init__()
        # Première couche linéaire (expansion de la dimension)
        self.fc1 = nn.Linear(d_modele, d_ff)
        # Seconde couche linéaire (retour à la dimension originale du modèle)
        self.fc2 = nn.Linear(d_ff, d_modele)
        # Fonction d'activation non-linéaire
        self.relu = nn.ReLU()

    def forward(self, x):
        # Passage dans la première couche, activation ReLU, puis seconde couche
        return self.fc2(self.relu(self.fc1(x)))

In [4]:
class EncodagePositionnel(nn.Module):
    def __init__(self, d_modele, long_max_seq):
        super(EncodagePositionnel, self).__init__()

        # Création d'une matrice d'encodage positionnel (pe) remplie de zéros
        pe = torch.zeros(long_max_seq, d_modele)

        # Création d'un vecteur de positions (0, 1, 2, ..., long_max_seq)
        position = torch.arange(0, long_max_seq, dtype=torch.float).unsqueeze(1)

        # Calcul du terme de division pour les fréquences sinus et cosinus
        terme_div = torch.exp(torch.arange(0, d_modele, 2).float() * -(math.log(10000.0) / d_modele))

        # Application de la fonction sinus aux indices pairs (0, 2, 4...)
        pe[:, 0::2] = torch.sin(position * terme_div)

        # Application de la fonction cosinus aux indices impairs (1, 3, 5...)
        pe[:, 1::2] = torch.cos(position * terme_div)

        # Enregistrer 'pe' comme un buffer (ne sera pas considéré comme un paramètre entraînable)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        # Ajoute l'encodage positionnel aux plongements (embeddings) d'entrée
        return x + self.pe[:, :x.size(1)]

In [5]:
class CoucheEncodeur(nn.Module):
    def __init__(self, d_modele, nb_tetes, d_ff, dropout):
        super(CoucheEncodeur, self).__init__()
        # Auto-attention multi-têtes
        self.auto_attn = AttentionMultiTetes(d_modele, nb_tetes) ### ici
        # Réseau de neurones positionnel (Feed-Forward)
        self.reseau_positionnel = ReseauNeuronesPositionnel(d_modele, d_ff)
        # Couches de normalisation
        self.norm1 = nn.LayerNorm(d_modele)
        self.norm2 = nn.LayerNorm(d_modele)
        # Couche de dropout pour la régularisation
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, masque):
        # Étape 1 : Auto-attention et connexion résiduelle suivie d'une normalisation
        sortie_attn = self.auto_attn(x, x, x, masque)
        x = self.norm1(x + self.dropout(sortie_attn))

        # Étape 2 : Réseau Feed-Forward et connexion résiduelle suivie d'une normalisation
        sortie_ff = self.reseau_positionnel(x)
        x = self.norm2(x + self.dropout(sortie_ff))

        return x

In [6]:
class CoucheDecodeur(nn.Module):
    def __init__(self, d_modele, nb_tetes, d_ff, dropout):
        super(CoucheDecodeur, self).__init__()
        # Auto-attention pour les tokens de la cible (déjà générés)
        self.auto_attn = AttentionMultiTetes(d_modele, nb_tetes)
        # Attention croisée pour regarder la sortie de l'encodeur
        self.attn_croisee = AttentionMultiTetes(d_modele, nb_tetes)
        # Réseau de neurones positionnel (Feed-Forward)
        self.reseau_positionnel = ReseauNeuronesPositionnel(d_modele, d_ff)

        # Couches de normalisation
        self.norm1 = nn.LayerNorm(d_modele)
        self.norm2 = nn.LayerNorm(d_modele)
        self.norm3 = nn.LayerNorm(d_modele)
        # Couche de dropout pour la régularisation
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sortie_encodeur, masque_src, masque_tgt):
        # Étape 1 : Auto-attention sur la cible avec masque (pour ne pas voir le futur)
        sortie_auto_attn = self.auto_attn(x, x, x, masque_tgt)
        x = self.norm1(x + self.dropout(sortie_auto_attn))

        # Étape 2 : Attention croisée (Requête vient du décodeur, Clé/Valeur de l'encodeur)
        sortie_attn_croisee = self.attn_croisee(x, sortie_encodeur, sortie_encodeur, masque_src)
        x = self.norm2(x + self.dropout(sortie_attn_croisee))

        # Étape 3 : Réseau Feed-Forward
        sortie_ff = self.reseau_positionnel(x)
        x = self.norm3(x + self.dropout(sortie_ff))

        return x

In [7]:
class Transformer(nn.Module):
    def __init__(self, taille_vocab_src, taille_vocab_tgt, d_modele, nb_tetes, nb_couches, d_ff, long_max_seq, dropout):
        super(Transformer, self).__init__()
        # Couches de plongement (embeddings) pour la source et la cible
        self.plongement_encodeur = nn.Embedding(taille_vocab_src, d_modele)
        self.plongement_decodeur = nn.Embedding(taille_vocab_tgt, d_modele)
        # Module d'encodage positionnel
        self.encodage_positionnel = EncodagePositionnel(d_modele, long_max_seq)

        # Listes de couches pour l'encodeur et le décodeur
        self.couches_encodeur = nn.ModuleList([CoucheEncodeur(d_modele, nb_tetes, d_ff, dropout) for _ in range(nb_couches)])
        self.couches_decodeur = nn.ModuleList([CoucheDecodeur(d_modele, nb_tetes, d_ff, dropout) for _ in range(nb_couches)])

        # Couche linéaire finale pour la prédiction des mots
        self.fc = nn.Linear(d_modele, taille_vocab_tgt)
        self.dropout = nn.Dropout(dropout)

    def generer_masque(self, src, tgt):
        # Masque pour ignorer les jetons de rembourrage (padding) dans la source
        masque_src = (src != 0).unsqueeze(1).unsqueeze(2)
        # Masque pour ignorer le rembourrage dans la cible
        masque_tgt = (tgt != 0).unsqueeze(1).unsqueeze(3)

        # Masque triangulaire pour empêcher le décodeur de regarder les mots futurs
        long_seq = tgt.size(1)
        masque_causal = (1 - torch.triu(torch.ones(1, long_seq, long_seq), diagonal=1)).bool()
        masque_tgt = masque_tgt & masque_causal

        return masque_src, masque_tgt

    def forward(self, src, tgt):
        # 1. Génération des masques
        masque_src, masque_tgt = self.generer_masque(src, tgt)

        # 2. Préparation des entrées (Embedding + Encodage Positionnel)
        src_embedded = self.dropout(self.encodage_positionnel(self.plongement_encodeur(src)))
        tgt_embedded = self.dropout(self.encodage_positionnel(self.plongement_decodeur(tgt)))

        # 3. Passage à travers les couches de l'encodeur
        sortie_encodeur = src_embedded
        for couche_enc in self.couches_encodeur:
            sortie_encodeur = couche_enc(sortie_encodeur, masque_src)

        # 4. Passage à travers les couches du décodeur
        sortie_decodeur = tgt_embedded
        for couche_dec in self.couches_decodeur:
            sortie_decodeur = couche_dec(sortie_decodeur, sortie_encodeur, masque_src, masque_tgt)

        # 5. Projection finale vers le vocabulaire de sortie
        output = self.fc(sortie_decodeur)
        return output

In [9]:
# Configuration des hyperparamètres
taille_vocab_src = 5000
taille_vocab_tgt = 5000
d_modele = 512
nb_tetes = 8
nb_couches = 6
d_ff = 2048
long_max_seq = 5000  # Augmenté pour tester les limites du modèle
dropout = 0.1

# Initialisation du modèle avec les noms de variables traduits
transformer = Transformer(
    taille_vocab_src,
    taille_vocab_tgt,
    d_modele,
    nb_tetes,
    nb_couches,
    d_ff,
    long_max_seq,
    dropout
)

# Génération de données d'exemple aléatoires
# src_donnees : (taille_batch, long_sequence)
taille_batch = 64
src_donnees = torch.randint(1, taille_vocab_src, (taille_batch, long_max_seq))
tgt_donnees = torch.randint(1, taille_vocab_tgt, (taille_batch, long_max_seq))

In [13]:
import time
import matplotlib.pyplot as plt

def benchmark_transformer_performances(modele, device="cuda"):
    tailles_sequence = [128, 256, 512, 1024, 1536, 2048]
    temps_execution = []
    memoire_allouee = []

    modele.to(device)
    modele.eval()

    print(f"Début du benchmark sur {device}...")

    for taille in tailles_sequence:
        # Création de données fictives (Batch de 1)
        src = torch.randint(1, 5000, (1, taille)).to(device)
        tgt = torch.randint(1, 5000, (1, taille)).to(device)

        torch.cuda.synchronize() # Attendre que le GPU soit prêt
        torch.cuda.empty_cache()
        mem_initiale = torch.cuda.memory_allocated(device) / (1024**2)

        debut = time.time()
        try:
            with torch.no_grad():
                _ = modele(src, tgt[:, :-1])

            torch.cuda.synchronize()
            fin = time.time()

            mem_finale = torch.cuda.max_memory_allocated(device) / (1024**2)

            temps_execution.append(fin - debut)
            memoire_allouee.append(mem_finale - mem_initiale)
            print(f"Taille {taille} : {fin-debut:.4f}s | {mem_finale:.2f} MB")

        except RuntimeError: # En cas de Out of Memory (OOM)
            print(f"Taille {taille} : ÉCHEC (Mémoire insuffisante)")
            temps_execution.append(None)
            memoire_allouee.append(None)
            break

    return tailles_sequence, temps_execution, memoire_allouee

# Exécution
tailles, temps, memoire = benchmark_transformer_performances(transformer, device="gpu")

RuntimeError: Expected one of cpu, cuda, ipu, xpu, mkldnn, opengl, opencl, ideep, hip, ve, fpga, maia, xla, lazy, vulkan, mps, meta, hpu, mtia, privateuseone device type at start of device string: gpu

In [None]:
def tracer_graphiques(tailles, temps, memoire):
    fig, ax1 = plt.subplots(figsize=(10, 6))

    # Courbe du temps
    color = 'tab:red'
    ax1.set_xlabel('Longueur de la séquence (n)')
    ax1.set_ylabel('Temps d\'exécution (s)', color=color)
    ax1.plot(tailles[:len(temps)], temps, marker='o', color=color, label="Temps (Quadratique)")
    ax1.tick_params(axis='y', labelcolor=color)

    # Courbe de la mémoire
    ax2 = ax1.twinx()
    color = 'tab:blue'
    ax2.set_ylabel('Mémoire GPU (MB)', color=color)
    ax2.plot(tailles[:len(memoire)], memoire, marker='s', color=color, label="Mémoire (Quadratique)")
    ax2.tick_params(axis='y', labelcolor=color)

    plt.title("L'impact de la complexité quadratique n²")
    plt.grid(True, linestyle='--')
    plt.show()

tracer_graphiques(tailles, temps, memoire)

In [None]:
# Définition de la fonction de perte (entropie croisée)
# On ignore l'index 0 qui correspond généralement au rembourrage (padding)
criterion = nn.CrossEntropyLoss(ignore_index=0)

# Configuration de l'optimiseur Adam avec les paramètres du papier original
optimiser = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# Passage du modèle en mode entraînement
transformer.train()

for epoch in range(100):
    # Réinitialisation des gradients (indispensable à chaque étape)
    optimiser.zero_grad()

    # Passage en avant (Forward pass) : on donne la source et la cible (décalée d'un mot)
    # tgt_donnees[:, :-1] signifie qu'on donne tout sauf le dernier mot
    sortie = transformer(src_donnees, tgt_donnees[:, :-1])

    # Calcul de la perte : on compare la prédiction avec la cible attendue (décalée à droite)
    # view(-1) permet d'aplatir les tenseurs pour le calcul de l'entropie croisée
    perte = criterion(sortie.contiguous().view(-1, taille_vocab_tgt),
                   tgt_donnees[:, 1:].contiguous().view(-1))

    # Rétropropagation de l'erreur (Calcul des gradients)
    perte.backward()

    # Mise à jour des poids du Transformer
    optimiser.step()

    # Affichage de la perte pour suivre la progression
    print(f"Époque: {epoch+1}, Perte: {perte.item():.4f}")

In [None]:
# Passage du modèle en mode évaluation (désactive le dropout)
transformer.eval()

# Génération de données de validation aléatoires
# val_src_donnees : données sources de validation
# val_tgt_donnees : données cibles de validation
taille_batch = 64
val_src_donnees = torch.randint(1, taille_vocab_src, (taille_batch, long_max_seq))
val_tgt_donnees = torch.randint(1, taille_vocab_tgt, (taille_batch, long_max_seq))

# Désactivation du calcul des gradients (économie de mémoire et de calcul)
with torch.no_grad():
    # Passage en avant sur les données de validation
    val_sortie = transformer(val_src_donnees, val_tgt_donnees[:, :-1])

    # Calcul de la perte de validation
    perte_val = criterion(val_sortie.contiguous().view(-1, taille_vocab_tgt),
                        val_tgt_donnees[:, 1:].contiguous().view(-1))

    print(f"Perte de Validation : {perte_val.item():.4f}")

Nous allons utiliser un tokeniser pré-entraîné (celui de GPT-2 ou BERT) pour gagner du temps sur la gestion du vocabulaire

In [None]:
from transformers import AutoTokenizer

# Utilisation du tokeniser de GPT-2
nom_tokeniser = "gpt2"
tokeniser = AutoTokenizer.from_pretrained(nom_tokeniser)

# Ajout d'un jeton de rembourrage (padding) car GPT-2 n'en a pas par défaut
tokeniser.pad_token = tokeniser.eos_token

In [None]:
# environ 100 mots
document_court = """
L'intelligence artificielle transforme radicalement notre façon de travailler.
Les modèles de langage, comme les Transformers, permettent aujourd'hui de
résumer des textes, de traduire des langues et de générer du code.
Cependant, ces modèles font face à un défi majeur : la gestion des documents
très longs à cause de leur complexité mathématique quadratique.
"""

# Encodage du texte
entrees = tokeniser(document_court, return_tensors="pt", padding=True, truncation=True, max_length=long_max_seq)
src_doc = entrees['input_ids'] # Notre source pour le Transformer

In [None]:
# On simule une cible (le début d'un résumé par exemple)
cible_fictive = torch.randint(1, taille_vocab_tgt, (1, 10))

transformer.eval()
with torch.no_grad():
    # Passage du vrai document dans le modèle
    prediction = transformer(src_doc, cible_fictive)

print(f"Forme de l'entrée : {src_doc.shape}") # (1, longueur_du_texte)
print(f"Forme de la sortie : {prediction.shape}")

In [None]:
document_long = "cava comee cai joe amdje oa"

In [None]:
# 1. On s'assure que le vocabulaire correspond au tokeniser
taille_vocab = len(tokeniser) # Généralement 50257 pour GPT-2

# 2. On réinitialise le modèle correctement
transformer = Transformer(
    taille_vocab_src=taille_vocab,
    taille_vocab_tgt=taille_vocab,
    d_modele=512,
    nb_tetes=8,
    nb_couches=6,
    d_ff=2048,
    long_max_seq=10000, # Doit être > à ton document
    dropout=0.1
)

# 3. On encode ton document long
# Imaginons que 'document_long' est ta variable texte
entrees = tokeniser(document_long, return_tensors="pt", truncation=True, max_length=10000)
src_doc = entrees['input_ids']

# 4. Pour le test de complexité, on simule une cible de résumé de 100 mots
cible_resume = torch.randint(1, taille_vocab, (1, 100))

# 5. Appel
prediction = transformer(src_doc, cible_resume)