# TP : Implémentation d'un petit LLM avec PyTorch

Ce TP vous guidera à travers les étapes de construction et d'entraînement d'un petit modèle de langage (LLM) en utilisant PyTorch. Vous apprendrez à :

1.  Charger et préparer des données textuelles.
2.  Implémenter une architecture Transformer simplifiée (type decoder).
3.  Entraîner le modèle sur un corpus de texte.
4.  Visualiser les cartes d'attention.
5.  Générer du texte.
6.  Implémenter une version simplifiée de RLHF pour interagir avec votre modèle.

### Partie 1 : Chargement et préparation des données

Pour ce TP, nous allons utiliser le dataset **"rayml/french_gutenberg"** disponible sur Hugging Face Datasets. Ce dataset contient un grand nombre de livres du projet Gutenberg en français. Nous allons charger une partie du dataset et effectuer un prétraitement minimal.

In [None]:
# Installation des dépendances nécessaires
!pip install torch datasets transformers tqdm matplotlib

In [None]:
# Configuration
SEQUENCE_LENGTH = 128  # Longueur de séquence désirée
BATCH_SIZE = 32

from datasets import load_dataset
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer  # Utilisation de GPT2Tokenizer
import torch

# 1. Charger le dataset
dataset = load_dataset("rayml/french_gutenberg", split="train[:10%]")

# 2. Choisir une colonne de texte
text_column = "content"

# 3. Charger le tokenizer GPT-2
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token # Très important pour le padding avec GPT2

# 4. Fonction de tokenisation
def tokenize_function(examples):
    # Tokeniser le texte complet sans truncation
    return tokenizer(examples[text_column], padding=False, truncation=False)

# 5. Tokeniser le dataset
tokenized_dataset = dataset.map(tokenize_function, batched=True)

# 6. Créer un Dataset PyTorch qui extrait des séquences de longueur fixe
class TextDataset(Dataset):
    def __init__(self, dataset, seq_length):
        self.dataset = dataset
        self.seq_length = seq_length
        self.stride = seq_length // 2  # Utiliser un stride égal à la moitié de la longueur de séquence
        self.total_sequences = 0
        self.cumulative_lengths = []
        
        # Calculer le nombre total de séquences possibles avec le stride modifié
        for item in dataset:
            num_sequences = max(0, (len(item['input_ids']) - seq_length) // self.stride + 1)
            self.total_sequences += num_sequences
            self.cumulative_lengths.append(self.total_sequences)

    def __len__(self):
        return self.total_sequences

    def __getitem__(self, idx):
        # Trouver dans quel texte se trouve la séquence demandée
        text_idx = 0
        while text_idx < len(self.cumulative_lengths) and idx >= self.cumulative_lengths[text_idx]:
            text_idx += 1
        
        # Calculer la position de début dans ce texte
        if text_idx == 0:
            sequence_idx = idx
        else:
            sequence_idx = idx - self.cumulative_lengths[text_idx - 1]
            
        # Calculer la position de début en tenant compte du stride
        start_pos = sequence_idx * self.stride
            
        # Extraire la séquence
        input_ids = self.dataset[text_idx]['input_ids'][start_pos:start_pos + self.seq_length]
        
        # S'assurer que la séquence a la bonne longueur (important pour la dernière séquence)
        if len(input_ids) < self.seq_length:
            # Padding pour la dernière séquence si nécessaire
            input_ids = input_ids + [tokenizer.pad_token_id] * (self.seq_length - len(input_ids))
            
        input_ids = torch.tensor(input_ids)
        
        # Créer le masque d'attention (1 pour les tokens réels, 0 pour le padding)
        attention_mask = torch.ones_like(input_ids)
        
        # Créer les labels (décalés d'une position)
        labels = input_ids[1:].clone()
        labels = torch.cat([labels, torch.tensor([tokenizer.pad_token_id])])

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels,
        }

# 7. Créer un DataLoader PyTorch
train_dataset = TextDataset(tokenized_dataset, SEQUENCE_LENGTH)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Exemple d'utilisation du DataLoader
batch = next(iter(train_dataloader))
print(f"Shape des input_ids : {batch['input_ids'].shape}")
print(f"Shape du attention_mask : {batch['attention_mask'].shape}")
print(f"Shape des labels : {batch['labels'].shape}")

# Afficher 2 séquences consécutives pour vérifier le chevauchement
print("\nVérification du chevauchement entre séquences consécutives:")
seq1 = train_dataset[0]  # Première séquence
seq2 = train_dataset[1]  # Deuxième séquence (devrait avoir un chevauchement de 50%)

# Décoder les tokens en texte
text1 = tokenizer.decode(seq1['input_ids'])
text2 = tokenizer.decode(seq2['input_ids'])

print("\nSéquence 1:")
print(text1)
print("\nSéquence 2:")
print(text2)

# Vérifier la taille du chevauchement
overlap_size = SEQUENCE_LENGTH // 2
print(f"\nTaille théorique du chevauchement: {overlap_size} tokens (50% de {SEQUENCE_LENGTH})")

# Visualiser la première moitié de la séquence 2 (qui devrait correspondre à la deuxième moitié de la séquence 1)
print("\nDernière moitié de la séquence 1:")
print(tokenizer.decode(seq1['input_ids'][overlap_size:]))
print("\nPremière moitié de la séquence 2:")
print(tokenizer.decode(seq2['input_ids'][:overlap_size]))

### Partie 2 : Implémentation du modèle

Nous allons implémenter un modèle de type Transformer (decoder) en PyTorch. L'implémentation sera faite bloc par bloc, en commençant par le bloc d'attention.

#### 2.1 Bloc d'attention

Le bloc d'attention est le cœur du modèle Transformer. Il permet au modèle de pondérer l'importance des différents mots de la séquence d'entrée lors du traitement d'un mot donné.

##### 2.1.1 Projection en Q, K, V

La première étape consiste à projeter les embeddings des mots d'entrée en trois vecteurs : Query (Q), Key (K) et Value (V). Ces projections sont effectuées à l'aide de couches linéaires.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        assert d_model % num_heads == 0, "d_model doit être divisible par num_heads"

        self.d_k = d_model // num_heads  # Dimension de chaque tête
        self.W_q = nn.Linear(d_model, d_model)  # Couches linéaires pour Q, K et V
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)  # Couche linéaire pour la sortie

    def forward(self, x, attention_mask=None):
        """
        x: (batch_size, seq_len, d_model)
        attention_mask: (batch_size, seq_len)  (optionnel, pour le padding)
        """
        batch_size, seq_len, _ = x.size()

        # 1. Projection en Q, K, V
        q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # (batch_size, num_heads, seq_len, d_k)
        k = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # (batch_size, num_heads, seq_len, d_k)
        v = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # (batch_size, num_heads, seq_len, d_k)

        # 2. Calcul de l'attention
        attention_scores = torch.matmul(q, k.transpose(-2, -1)) / (self.d_k ** 0.5)  # (batch_size, num_heads, seq_len, seq_len)

        # Masque causal pour que le modèle n'attende pas sur le futur - Créé dynamiquement selon la séquence
        causal_mask = torch.tril(torch.ones(seq_len, seq_len)).view(1, 1, seq_len, seq_len).to(x.device)
        attention_scores = attention_scores.masked_fill(causal_mask == 0, float('-inf'))

        # Appliquer le masque de padding si fourni
        if attention_mask is not None:
            # Reformater le masque pour qu'il soit compatible avec la forme des scores d'attention
            # attention_mask: (batch_size, seq_len) -> (batch_size, 1, 1, seq_len)
            padding_mask = attention_mask.unsqueeze(1).unsqueeze(2)
            # Créer un masque complet pour tous les tokens cibles
            # Pour chaque position source : si le token est masqué, toutes les positions cibles verront son attention masquée
            padding_mask = padding_mask.expand(-1, -1, seq_len, -1)
            attention_scores = attention_scores.masked_fill(padding_mask == 0, float('-inf'))

        attention_probs = F.softmax(attention_scores, dim=-1)  # (batch_size, num_heads, seq_len, seq_len)

        # 3. Pondération des valeurs
        attended_values = torch.matmul(attention_probs, v)  # (batch_size, num_heads, seq_len, d_k)
        attended_values = attended_values.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)  # (batch_size, seq_len, d_model)

        # 4. Projection de sortie
        output = self.W_o(attended_values)  # (batch_size, seq_len, d_model)

        return output, attention_probs # Retourner aussi les probabilités d'attention pour la visualisation

##### 2.1.2 Calcul de l'attention

L'attention est calculée en effectuant un produit scalaire entre les vecteurs Q et K, en appliquant une fonction softmax et en pondérant les vecteurs V en conséquence. On utilise aussi un masque causal.

In [None]:
class MLP(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.gelu = nn.GELU()  # Utiliser GELU au lieu de ReLU
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.fc2(self.gelu(self.fc1(x)))

#### 2.3 Tête de classification

La tête de classification est une simple couche linéaire qui projette la sortie du modèle vers l'espace des tokens du vocabulaire.

In [None]:
class Head(nn.Module):
    def __init__(self, d_model, vocab_size):
        super().__init__()
        self.fc = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        return self.fc(x)

#### 2.4 Modèle complet

Nous pouvons maintenant assembler les différents blocs pour construire le modèle complet.

In [None]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_len):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = nn.Embedding(max_seq_len, d_model)
        self.layers = nn.ModuleList([layer for _ in range(num_layers) for layer in [MultiHeadAttention(d_model, num_heads), MLP(d_model, d_ff)]])
        self.head = Head(d_model, vocab_size)
        self.max_seq_len = max_seq_len
        self.d_model = d_model

        self.register_buffer('position_ids', torch.arange(max_seq_len).expand((1, -1)))

    def forward(self, idx, attention_mask=None):
        """
        idx: (batch_size, seq_len)
        attention_mask: (batch_size, seq_len)
        """
        batch_size, seq_len = idx.size()
        assert seq_len <= self.max_seq_len, f"Sequence length ({seq_len}) exceeds maximum sequence length ({self.max_seq_len})"

        # 1. Embeddings
        token_embeddings = self.token_embedding(idx)  # (batch_size, seq_len, d_model)
        position_embeddings = self.position_embedding(self.position_ids[:, :seq_len])  # (1, seq_len, d_model) -> (batch_size, seq_len, d_model)
        x = token_embeddings + position_embeddings  # (batch_size, seq_len, d_model)

        # 2. Blocs Transformer
        all_attention_probs = [] # Pour stocker les poids d'attention de chaque couche
        for layer in self.layers:
            if isinstance(layer, MultiHeadAttention):
                x, attention_probs = layer(x, attention_mask) # Récupérer les poids d'attention
                all_attention_probs.append(attention_probs)
            else:
                x = layer(x) # Si c'est un MLP, on ne récupère pas les poids

        # 3. Tête de classification
        logits = self.head(x)  # (batch_size, seq_len, vocab_size)

        return logits, all_attention_probs

### Partie 3 : Entraînement du modèle

Nous allons maintenant instancier le modèle, définir la fonction de perte et l'optimiseur, et lancer l'entraînement.

In [None]:
import torch.optim as optim
from tqdm import tqdm
import matplotlib.pyplot as plt

# 1. Instancier le modèle
vocab_size = len(tokenizer.get_vocab())
d_model = 256  # Dimension de l'embedding et des hidden states
num_heads = 4
num_layers = 2
d_ff = 512  # Dimension de la couche feedforward
max_seq_len = 128

model = Transformer(vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_len)

# 2. Définir la fonction de perte et l'optimiseur
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id) # Ignorer le padding dans le calcul de la loss
optimizer = optim.AdamW(model.parameters(), lr=1e-3)

# 3. Définir le nombre d'epochs
num_epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 4. Boucle d'entraînement
best_loss = float('inf')
train_losses = []
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    progress_bar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False) # Ajouter une barre de progression
    for batch in progress_bar:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs, _ = model(input_ids, attention_mask) # Ne pas oublier de passer le attention_mask
        # Les logits ont la forme (batch_size, seq_len, vocab_size)
        # La loss CrossEntropy attend (batch_size * seq_len, vocab_size) et (batch_size * seq_len)
        loss = criterion(outputs.view(-1, vocab_size), labels.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        progress_bar.set_postfix({'loss': loss.item()}) # Mise à jour de la loss dans la barre de progression

    avg_loss = total_loss / len(train_dataloader)
    train_losses.append(avg_loss)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")
    
    # Sauvegarder le meilleur modèle
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'epoch': epoch,
            'loss': avg_loss,
            'vocab_size': vocab_size,
            'd_model': d_model,
            'num_heads': num_heads,
            'num_layers': num_layers,
            'd_ff': d_ff,
            'max_seq_len': max_seq_len,
        }, 'best_model_checkpoint.pth')
        print(f"Meilleur modèle sauvegardé avec une loss de {avg_loss:.4f}")
    
    # Sauvegarde régulière
    if (epoch + 1) % 5 == 0:
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'epoch': epoch,
            'loss': avg_loss,
            'vocab_size': vocab_size,
            'd_model': d_model,
            'num_heads': num_heads,
            'num_layers': num_layers,
            'd_ff': d_ff,
            'max_seq_len': max_seq_len,
        }, f'model_checkpoint_epoch_{epoch+1}.pth')

# Fonction pour charger un modèle sauvegardé
def load_model(checkpoint_path, device):
    checkpoint = torch.load(checkpoint_path, map_location=device)
    
    # Recréer le modèle avec les mêmes hyperparamètres
    model = Transformer(
        checkpoint['vocab_size'],
        checkpoint['d_model'],
        checkpoint['num_heads'],
        checkpoint['num_layers'],
        checkpoint['d_ff'],
        checkpoint['max_seq_len']
    )
    
    # Charger les poids
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    
    return model

# 5. Visualiser la loss
plt.plot(train_losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.show()

### Explications

* **Instanciation du modèle :** Nous instancions le modèle `Transformer` avec les hyperparamètres choisis. Nous récupérons la taille du vocabulaire à partir du tokenizer.

* **Fonction de perte et optimiseur :** Nous utilisons la `CrossEntropyLoss` et l'optimiseur `AdamW`. Il est important d'ignorer le padding lors du calcul de la loss.

* **Boucle d'entraînement :**

    * Nous entraînons le modèle pendant le nombre d'epochs spécifié.

    * Nous utilisons une boucle `tqdm` pour afficher une barre de progression pendant l'entraînement.

    * Pour chaque batch :

        * Nous déplaçons les données sur le GPU si disponible.

        * Nous effectuons une passe forward, calculons la loss, effectuons la rétropropagation et mettons à jour les poids.

        * Nous stockons la loss pour la visualiser plus tard.

* **Visualisation de la loss :** Nous utilisons Matplotlib pour tracer la courbe de la loss d'entraînement.

N'oubliez pas de lancer le code dans un environnement avec PyTorch installé. La loss devrait décroître au fil des epochs, indiquant que le modèle apprend.

### Partie 4 : Visualisation des cartes d'attention

Maintenant que notre modèle est entraîné, nous allons visualiser les cartes d'attention pour mieux comprendre comment il traite l'information. Cette visualisation nous permettra de voir quels mots le modèle considère comme importants lorsqu'il prédit le mot suivant.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Fonction pour visualiser les cartes d'attention
def plot_attention_maps(model, text, tokenizer, device):
    # Tokeniser le texte
    tokens = tokenizer.tokenize(text)
    token_ids = tokenizer.encode(text, return_tensors='pt').to(device)
    
    # Créer un masque d'attention
    attention_mask = torch.ones_like(token_ids)
    
    # Obtenir les probabilités d'attention
    model.eval()
    with torch.no_grad():
        _, attention_probs = model(token_ids, attention_mask)
    
    # Convertir les tokens en texte lisible
    token_texts = [tokenizer.decode([id]) for id in token_ids[0]]
    
    # Afficher les cartes d'attention pour chaque couche et chaque tête
    num_layers = len(attention_probs) // 2  # Diviser par 2 car chaque couche contient une attention et un MLP
    
    fig, axs = plt.subplots(num_layers, model.layers[0].num_heads, 
                           figsize=(model.layers[0].num_heads * 3, num_layers * 3))
    
    if num_layers == 1:
        axs = np.array([axs])  # Convertir en tableau 2D pour indexation uniforme
    
    layer_idx = 0
    for i, layer_attn in enumerate(attention_probs):
        # Les attention_probs sont dans les couches d'attention (pas dans les MLP)
        for head in range(model.layers[0].num_heads):
            # Récupérer les scores d'attention pour cette tête
            attn = layer_attn[0, head, :len(token_texts), :len(token_texts)].cpu().numpy()
            
            # Créer un heatmap
            sns.heatmap(attn, 
                      cmap='viridis', 
                      xticklabels=token_texts,
                      yticklabels=token_texts,
                      ax=axs[layer_idx, head])
            
            axs[layer_idx, head].set_title(f'Couche {layer_idx+1}, Tête {head+1}')
            plt.setp(axs[layer_idx, head].get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
        
        layer_idx += 1
    
    plt.tight_layout()
    plt.show()

# Exemple de phrase à analyser
test_text = "Le petit chat noir dort sur le canapé."

# Visualiser les cartes d'attention
plot_attention_maps(model, test_text, tokenizer, device)

Les cartes d'attention montrent comment chaque mot est lié aux autres mots dans la phrase. Plus la couleur est vive, plus l'attention est forte. Voici quelques observations intéressantes à rechercher :

1. **Motifs diagonaux** : Indiquent une attention sur le mot lui-même ou ses voisins immédiats.
2. **Motifs verticaux** : Montrent quels mots sont les plus influents dans la phrase.
3. **Spécialisation des têtes** : Certaines têtes peuvent se spécialiser dans les relations grammaticales, d'autres dans les relations sémantiques.
4. **Différences entre couches** : Les couches inférieures capturent souvent des informations syntaxiques, tandis que les couches supérieures captent des informations plus sémantiques.

Vous pouvez essayer avec différentes phrases pour voir comment le modèle réagit à différents types de texte.

### Partie 5 : Génération de texte

Maintenant que notre modèle est entraîné, nous allons l'utiliser pour générer du texte. La génération se fait de manière auto-régressive, c'est-à-dire que nous utilisons les prédictions précédentes comme entrée pour les prédictions suivantes.

In [None]:
def generate_text(model, tokenizer, prompt, max_length=100, temperature=1.0, 
                 top_k=0, greedy=False, device='cpu'):
    """
    Génère du texte à partir d'une amorce (prompt).
    
    Args:
        model: Le modèle entraîné
        tokenizer: Le tokenizer utilisé
        prompt: L'amorce (texte initial)
        max_length: Nombre maximum de tokens à générer
        temperature: Contrôle la randomisation (1.0 = normal, <1.0 = plus conservateur, >1.0 = plus aléatoire)
        top_k: Si >0, limite le choix aux top_k tokens les plus probables
        greedy: Si True, utilise une sélection déterministe (toujours le token le plus probable)
        device: Le device (cpu ou cuda)
    
    Returns:
        Le texte généré
    """
    model.eval()  # Mode évaluation
    
    # Tokenisation de l'amorce
    input_ids = tokenizer.encode(prompt, return_tensors='pt').to(device)
    
    # Position initiale (longueur de l'amorce)
    cur_len = input_ids.shape[1]
    
    # Générer jusqu'à atteindre max_length
    for _ in range(max_length):
        # Créer un masque d'attention (tout à 1)
        attention_mask = torch.ones(1, cur_len, device=device)
        
        # Obtenir les prédictions du modèle
        with torch.no_grad():
            outputs, _ = model(input_ids, attention_mask)
        
        # On ne s'intéresse qu'à la prédiction pour le dernier token
        next_token_logits = outputs[0, -1, :]
        
        # Appliquer température
        if temperature != 1.0:
            next_token_logits = next_token_logits / temperature
        
        # Stratégie de génération
        if greedy:
            # Greedy decoding (prendre le token le plus probable)
            next_token = torch.argmax(next_token_logits, dim=-1).unsqueeze(0)
        else:
            # Sampling avec ou sans top-k
            if top_k > 0:
                # Garder uniquement les top_k tokens les plus probables
                indices_to_remove = torch.topk(next_token_logits, top_k)[1]
                mask = torch.ones_like(next_token_logits, dtype=torch.bool)
                mask[indices_to_remove] = False
                next_token_logits[mask] = float('-inf')
            
            # Appliquer softmax pour obtenir une distribution de probabilités
            probs = torch.nn.functional.softmax(next_token_logits, dim=-1)
            
            # Échantillonner un token selon cette distribution
            next_token = torch.multinomial(probs, num_samples=1)
        
        # Ajouter le token généré à l'entrée
        input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)
        cur_len += 1
        
        # Arrêter si on génère un token de fin de séquence
        if next_token.item() == tokenizer.eos_token_id:
            break
    
    # Décoder les tokens en texte
    generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    
    return generated_text

# Amorce littéraire
prompt = "Il était une fois, dans une forêt sombre et mystérieuse,"

# Différentes stratégies de génération
print("Génération greedy (déterministe):")
greedy_text = generate_text(model, tokenizer, prompt, max_length=50, greedy=True, device=device)
print(greedy_text)
print("\n" + "-"*50 + "\n")

print("Génération avec sampling (temperature=0.7):")
sampling_text = generate_text(model, tokenizer, prompt, max_length=50, temperature=0.7, device=device)
print(sampling_text)
print("\n" + "-"*50 + "\n")

print("Génération avec top-k sampling (k=40, temperature=0.9):")
topk_text = generate_text(model, tokenizer, prompt, max_length=50, temperature=0.9, top_k=40, device=device)
print(topk_text)

Vous pouvez expérimenter avec différentes amorces et paramètres de génération :

1. **Temperature** : 
   - Valeur basse (ex. 0.3) : texte plus prévisible et répétitif
   - Valeur haute (ex. 1.5) : texte plus aléatoire et créatif

2. **Top-k** : 
   - Valeur basse (ex. 10) : limite fortement les choix possibles
   - Valeur haute (ex. 50) : permet plus de diversité

3. **Amorces** :
   - Essayez différents débuts d'histoire
   - Variez le style (conte, roman, poésie, etc.)
   - Posez des questions pour voir comment le modèle y répond

Plus vous avez entraîné longtemps votre modèle, plus la qualité du texte généré sera élevée. Il est également intéressant de comparer les différentes stratégies de génération sur la même amorce pour voir les variations dans le style et la cohérence du texte produit.

### Partie 6 : Interagir avec le modèle grâce à un simple RLHF

Dans cette partie, nous allons implémenter une version très simplifiée du RLHF (Reinforcement Learning from Human Feedback) pour permettre une interaction avec notre modèle et l'améliorer en fonction de nos retours. Cette technique est à la base des modèles comme ChatGPT et Claude.

In [None]:
import numpy as np
from IPython.display import clear_output
import time

class SimpleRLHF:
    def __init__(self, model, tokenizer, device, learning_rate=1e-5):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
        self.conversations = []  # Pour stocker l'historique des conversations
        self.rewards = []  # Pour stocker les récompenses
        
    def generate_response(self, prompt, max_length=50, temperature=0.7, top_k=40):
        """Génère une réponse à partir d'un prompt"""
        return generate_text(self.model, self.tokenizer, prompt, 
                            max_length=max_length, 
                            temperature=temperature,
                            top_k=top_k, 
                            device=self.device)
    
    def update_model(self, prompt, response, reward):
        """Met à jour le modèle en fonction de la récompense"""
        # Tokeniser l'entrée et la sortie
        input_ids = self.tokenizer.encode(prompt, return_tensors='pt').to(self.device)
        target_ids = self.tokenizer.encode(response, return_tensors='pt').to(self.device)
        
        # Créer des masques d'attention
        input_mask = torch.ones_like(input_ids)
        
        # Forward pass
        self.model.train()
        self.optimizer.zero_grad()
        
        # Calculer les logits pour l'entrée
        logits, _ = self.model(input_ids, input_mask)
        
        # Créer une target avec les probabilités de reward plus élevées pour les tokens de la réponse
        # Version simplifiée: augmenter ou diminuer les probabilités selon la récompense
        reward_factor = reward  # Entre -1 et 1
        
        # Préparer les labels
        labels = torch.zeros(logits.shape[0], logits.shape[1], logits.shape[2]).to(self.device)
        
        # Encourager ou décourager les tokens de la réponse selon la récompense
        for i in range(min(target_ids.shape[1], logits.shape[1])):
            if i < target_ids.shape[1]:
                target_idx = target_ids[0, i].item()
                labels[0, i, target_idx] = 1.0 * (1 + reward_factor)
        
        # Calculer la loss (KL divergence simplifiée)
        loss = -torch.sum(labels * torch.log_softmax(logits, dim=-1))
        
        # Rétropropagation
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def chat(self, initial_prompt="Bonjour, comment puis-je vous aider?", turns=5):
        """Interface de chat interactive avec feedback utilisateur"""
        conversation = [initial_prompt]
        print(f"Modèle: {initial_prompt}")
        
        for i in range(turns):
            # Obtenir l'entrée utilisateur
            user_input = input("Vous: ")
            conversation.append(user_input)
            
            # Générer une réponse
            full_prompt = " ".join(conversation)
            response = self.generate_response(full_prompt)
            
            # Extraire seulement la partie générée (après la dernière entrée utilisateur)
            response_only = response[len(full_prompt):]
            
            # Afficher la réponse
            print(f"Modèle: {response_only}")
            conversation.append(response_only)
            
            # Demander un feedback
            while True:
                feedback = input("Votre feedback (-1 à +1) où +1 est excellent: ")
                try:
                    reward = float(feedback)
                    if -1 <= reward <= 1:
                        break
                    else:
                        print("Veuillez entrer une valeur entre -1 et 1")
                except ValueError:
                    print("Veuillez entrer un nombre valide")
            
            # Mettre à jour le modèle avec le feedback
            loss = self.update_model(full_prompt, response_only, reward)
            
            # Stocker pour analyse
            self.conversations.append((full_prompt, response_only))
            self.rewards.append(reward)
            
            print(f"Modèle mis à jour. Loss: {loss:.4f}")
            
        # Sauvegarder le modèle après la session de RLHF
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'vocab_size': self.model.head.fc.out_features,
            'd_model': self.model.d_model,
            'num_heads': self.model.layers[0].num_heads,
            'num_layers': len([l for l in self.model.layers if isinstance(l, MultiHeadAttention)]),
            'd_ff': self.model.layers[1].fc1.out_features,
            'max_seq_len': self.model.max_seq_len,
        }, 'rlhf_tuned_model.pth')
        
        print("Session de chat terminée et modèle sauvegardé.")
        
    def plot_rewards(self):
        """Affiche un graphique des récompenses obtenues"""
        plt.figure(figsize=(10, 5))
        plt.plot(self.rewards, marker='o')
        plt.axhline(y=0, color='r', linestyle='-', alpha=0.3)
        plt.title('Évolution des récompenses')
        plt.xlabel('Interaction')
        plt.ylabel('Récompense')
        plt.grid(True)
        plt.show()

# Création de l'instance RLHF
rlhf_trainer = SimpleRLHF(model, tokenizer, device)

# Démarrer une session de chat interactive
print("Commençons une conversation. Donnez un feedback après chaque réponse du modèle.")
rlhf_trainer.chat(initial_prompt="Bonjour! Je suis un assistant littéraire. Comment puis-je vous aider aujourd'hui?")

# Visualiser l'évolution des récompenses
rlhf_trainer.plot_rewards()

Dans cette implémentation simplifiée du RLHF :

1. Nous avons créé une classe `SimpleRLHF` qui permet d'interagir avec notre modèle.
2. À chaque tour de conversation, l'utilisateur donne une note entre -1 et +1 pour évaluer la réponse du modèle.
3. Le modèle est mis à jour en fonction de cette récompense, en renforçant les tokens qui ont conduit à des réponses bien notées et en affaiblissant ceux des réponses mal notées.
4. L'historique des conversations et des récompenses est enregistré, et nous pouvons visualiser l'évolution des performances du modèle.

Ce processus est une version très simplifiée du RLHF utilisé pour former des modèles comme ChatGPT, mais il illustre le principe fondamental : les modèles peuvent apprendre de nos retours pour s'améliorer progressivement.

Essayez de discuter avec votre modèle et observez s'il s'améliore au fur et à mesure des interactions !

### Conclusion

Dans ce TP, nous avons :
1. Chargé et préparé un corpus de texte français
2. Implémenté une architecture Transformer simplifiée
3. Entraîné notre modèle sur le corpus
4. Visualisé les mécanismes d'attention
5. Utilisé le modèle pour générer du texte
6. Implémenté une version simplifiée de RLHF pour interagir avec notre modèle

Ce petit LLM est bien sûr très simple comparé aux modèles comme GPT-3/4 ou LLaMA, mais il illustre les principes fondamentaux qui régissent ces modèles plus avancés. Pour aller plus loin, vous pourriez :

- Augmenter la taille du modèle (couches, têtes d'attention, dimension d'embedding)
- Entraîner sur un corpus plus large
- Améliorer l'algorithme de RLHF avec une fonction de récompense plus sophistiquée
- Ajouter des mécanismes comme l'instruction-tuning