# Créer un LLM from Scratch

Ce notebook explique pas à pas comment construire un Large Language Model (LLM) de type GPT.

## Plan
1. **Tokenization** - Convertir du texte en nombres
2. **Embeddings** - Représenter les tokens dans un espace vectoriel
3. **Attention** - Le mécanisme clé des Transformers
4. **Architecture GPT** - Assembler les blocs
5. **Entraînement** - Apprendre à prédire le prochain token
6. **Génération** - Produire du texte

In [1]:
# Imports nécessaires
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from collections import defaultdict
import regex as re

# Vérifie le device disponible
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")  # Mac M1/M2/M3
else:
    device = torch.device("cpu")

print(f"Device: {device}")

Device: mps


---
# 0. Téléchargement des données (Wikipedia FR)

Avant de commencer, on télécharge un corpus de texte français depuis Wikipedia.

**Options disponibles:**
- `small` : ~100MB, ~50 000 articles (recommandé pour débuter)
- `medium` : ~500MB, ~250 000 articles
- `large` : ~2GB, ~1 000 000 articles

Le téléchargement peut prendre quelques minutes.

In [None]:
import os
from datasets import load_dataset
from tqdm import tqdm

def download_wikipedia_fr(size: str = "small", output_dir: str = "data"):
    """
    Télécharge Wikipedia français.
    
    Args:
        size: "small" (~100MB), "medium" (~500MB), "large" (~2GB)
        output_dir: Dossier de sortie
    """
    n_articles = {"small": 50_000, "medium": 250_000, "large": 1_000_000}
    n = n_articles.get(size, 50_000)
    
    output_path = os.path.join(output_dir, f"wikipedia_fr_{size}.txt")
    
    # Vérifie si déjà téléchargé
    if os.path.exists(output_path):
        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        print(f"✓ Données déjà téléchargées: {output_path} ({size_mb:.1f} MB)")
        return output_path
    
    print(f"Téléchargement de Wikipedia FR ({size}: {n:,} articles)...")
    print("Cela peut prendre quelques minutes...\n")
    
    # Charge le dataset en streaming
    dataset = load_dataset(
        "wikimedia/wikipedia",
        "20231101.fr",
        split="train",
        streaming=True
    )
    
    os.makedirs(output_dir, exist_ok=True)
    
    total_chars = 0
    count = 0
    with open(output_path, "w", encoding="utf-8") as f:
        for article in tqdm(dataset, total=n, desc="Téléchargement"):
            if count >= n:
                break
            
            text = article["text"]
            if len(text) < 500:  # Ignore les articles trop courts
                continue
            
            f.write(f"# {article['title']}\n\n")
            f.write(text)
            f.write("\n\n---\n\n")
            
            total_chars += len(text)
            count += 1
    
    size_mb = os.path.getsize(output_path) / (1024 * 1024)
    print(f"\n✓ Téléchargement terminé!")
    print(f"  Fichier: {output_path}")
    print(f"  Taille: {size_mb:.1f} MB")
    print(f"  Articles: {count:,}")
    
    return output_path

# Télécharge les données (change "small" en "medium" ou "large" si tu veux plus de données)
DATA_PATH = download_wikipedia_fr(size="small")

---
# 1. Tokenization (BPE)

## Pourquoi tokenizer ?

Un modèle ne comprend pas le texte directement. Il faut convertir les mots en **nombres**.

### Approches possibles :

| Méthode | Exemple "hello" | Problème |
|---------|-----------------|----------|
| Par caractère | `[h, e, l, l, o]` → `[104, 101, 108, 108, 111]` | Séquences très longues |
| Par mot | `[hello]` → `[2847]` | Vocabulaire énorme, mots inconnus |
| **BPE (sous-mots)** | `[hel, lo]` → `[892, 341]` | Bon compromis ✓ |

## Comment fonctionne BPE ?

1. Commence avec les 256 bytes comme vocabulaire de base
2. Compte les paires de tokens adjacents les plus fréquentes
3. Fusionne la paire la plus fréquente → nouveau token
4. Répète jusqu'à atteindre la taille de vocabulaire souhaitée

In [4]:
# Pattern de pré-tokenization (sépare en mots, nombres, ponctuation)
GPT2_PATTERN = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""

class BPETokenizer:
    """
    Tokenizer Byte Pair Encoding simplifié.
    """
    
    def __init__(self, vocab_size: int = 1000):
        self.vocab_size = vocab_size
        self.merges = {}  # (token1, token2) -> nouveau_token
        self.vocab = {}   # bytes -> id
        self.inverse_vocab = {}  # id -> bytes
        
        # Tokens spéciaux
        self.special_tokens = {
            "<|pad|>": 0,
            "<|unk|>": 1, 
            "<|bos|>": 2,  # Beginning of sequence
            "<|eos|>": 3,  # End of sequence
        }
        
        self.pattern = re.compile(GPT2_PATTERN)
        self._init_base_vocab()
    
    def _init_base_vocab(self):
        """Initialise avec les 256 bytes + tokens spéciaux."""
        # Tokens spéciaux d'abord
        for token, idx in self.special_tokens.items():
            self.vocab[token.encode()] = idx
            self.inverse_vocab[idx] = token.encode()
        
        # Puis les 256 bytes possibles
        for i in range(256):
            byte = bytes([i])
            idx = i + len(self.special_tokens)
            self.vocab[byte] = idx
            self.inverse_vocab[idx] = byte
    
    def _get_stats(self, token_ids: list[list[int]]) -> dict:
        """Compte la fréquence de chaque paire adjacente."""
        stats = defaultdict(int)
        for ids in token_ids:
            for i in range(len(ids) - 1):
                pair = (ids[i], ids[i + 1])
                stats[pair] += 1
        return stats
    
    def _merge(self, token_ids: list[list[int]], pair: tuple, new_id: int) -> list[list[int]]:
        """Fusionne toutes les occurrences d'une paire."""
        new_token_ids = []
        for ids in token_ids:
            new_ids = []
            i = 0
            while i < len(ids):
                if i < len(ids) - 1 and (ids[i], ids[i + 1]) == pair:
                    new_ids.append(new_id)
                    i += 2
                else:
                    new_ids.append(ids[i])
                    i += 1
            new_token_ids.append(new_ids)
        return new_token_ids
    
    def train(self, text: str, verbose: bool = True):
        """Entraîne le tokenizer sur un corpus."""
        # Pré-tokenization
        chunks = self.pattern.findall(text)
        
        # Convertit en bytes
        token_ids = []
        for chunk in chunks:
            ids = [self.vocab[bytes([b])] for b in chunk.encode()]
            token_ids.append(ids)
        
        n_merges = self.vocab_size - len(self.vocab)
        if verbose:
            print(f"Entraînement: {n_merges} merges à effectuer")
        
        for i in range(n_merges):
            stats = self._get_stats(token_ids)
            if not stats:
                break
            
            # Trouve la paire la plus fréquente
            best_pair = max(stats, key=stats.get)
            if stats[best_pair] < 2:
                break
            
            # Crée un nouveau token
            new_id = len(self.vocab)
            self.merges[best_pair] = new_id
            
            # Concatène les bytes
            new_token = self.inverse_vocab[best_pair[0]] + self.inverse_vocab[best_pair[1]]
            self.vocab[new_token] = new_id
            self.inverse_vocab[new_id] = new_token
            
            # Applique le merge
            token_ids = self._merge(token_ids, best_pair, new_id)
            
            if verbose and (i + 1) % 100 == 0:
                print(f"  Merge {i + 1}/{n_merges}")
        
        print(f"Vocabulaire final: {len(self.vocab)} tokens")
    
    def encode(self, text: str) -> list[int]:
        """Encode du texte en IDs."""
        chunks = self.pattern.findall(text)
        all_ids = [self.special_tokens["<|bos|>"]]
        
        for chunk in chunks:
            ids = [self.vocab[bytes([b])] for b in chunk.encode()]
            # Applique les merges
            for pair, new_id in self.merges.items():
                ids = self._merge([ids], pair, new_id)[0]
            all_ids.extend(ids)
        
        all_ids.append(self.special_tokens["<|eos|>"])
        return all_ids
    
    def decode(self, ids: list[int]) -> str:
        """Décode des IDs en texte."""
        special_ids = set(self.special_tokens.values())
        byte_list = []
        for id_ in ids:
            if id_ not in special_ids and id_ in self.inverse_vocab:
                byte_list.append(self.inverse_vocab[id_])
        return b"".join(byte_list).decode("utf-8", errors="replace")
    
    def __len__(self):
        return len(self.vocab)

In [5]:
# Entraîne le tokenizer sur les données Wikipedia
print("Chargement des données Wikipedia pour le tokenizer...")

# On utilise une partie des données pour entraîner le tokenizer (plus rapide)
with open(DATA_PATH, "r", encoding="utf-8") as f:
    # Lit les premiers 10MB pour le tokenizer (suffisant pour apprendre le vocabulaire)
    texte_tokenizer = f.read(10_000_000)

print(f"Texte chargé: {len(texte_tokenizer):,} caractères")

# Entraîne le tokenizer avec un vocabulaire plus grand pour Wikipedia
tokenizer = BPETokenizer(vocab_size=4000)
tokenizer.train(texte_tokenizer)

Chargement des données Wikipedia pour le tokenizer...


NameError: name 'DATA_PATH' is not defined

In [None]:
# Test encode/decode
texte_test = "Bonjour, comment ça va ?"

encoded = tokenizer.encode(texte_test)
decoded = tokenizer.decode(encoded)

print(f"Texte original : {texte_test}")
print(f"Encodé (IDs)   : {encoded}")
print(f"Décodé         : {decoded}")
print(f"\nNombre de tokens: {len(encoded)} (vs {len(texte_test)} caractères)")

---
# 2. Embeddings

## Pourquoi des embeddings ?

Les IDs de tokens sont juste des nombres (0, 1, 2...). Le modèle a besoin de **vecteurs** pour calculer.

Un **embedding** transforme chaque ID en un vecteur de dimension `d_model` (par ex. 384).

```
Token ID: 42  →  Embedding: [0.12, -0.45, 0.78, ..., 0.33]  (384 dimensions)
```

Ces vecteurs sont **appris** pendant l'entraînement. Des mots similaires auront des vecteurs proches.

In [None]:
# Exemple simple d'embedding
vocab_size = 500
d_model = 384  # Dimension des vecteurs

# Crée une table d'embedding (matrice vocab_size x d_model)
embedding = nn.Embedding(vocab_size, d_model)

# Exemple: convertit des IDs en vecteurs
token_ids = torch.tensor([10, 42, 100])  # 3 tokens
vectors = embedding(token_ids)

print(f"Input shape  : {token_ids.shape}  (3 tokens)")
print(f"Output shape : {vectors.shape}  (3 vecteurs de {d_model} dimensions)")
print(f"\nVecteur du token 42:\n{vectors[1][:10]}...")  # Premiers éléments

---
# 3. Positional Encoding (RoPE)

## Le problème

L'attention traite tous les tokens **en parallèle**. Sans information de position, le modèle ne sait pas quel mot vient avant l'autre !

"Le chat mange la souris" vs "La souris mange le chat" → même embeddings !

## Solution: RoPE (Rotary Position Embedding)

RoPE encode la position par une **rotation** dans l'espace des vecteurs. Plus moderne et efficace que les positional encodings classiques.

In [None]:
class RotaryPositionalEmbedding(nn.Module):
    """
    RoPE: encode les positions par rotation.
    Utilisé dans LLaMA, Mistral, etc.
    """
    
    def __init__(self, dim: int, max_seq_len: int = 2048, base: float = 10000.0):
        super().__init__()
        self.dim = dim
        self.max_seq_len = max_seq_len
        
        # Calcul des fréquences de rotation
        inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq)
        
        # Pré-calcul des cos et sin pour chaque position
        positions = torch.arange(max_seq_len)
        freqs = torch.outer(positions, inv_freq)
        self.register_buffer("cos_cached", freqs.cos())
        self.register_buffer("sin_cached", freqs.sin())
    
    def forward(self, seq_len: int):
        return self.cos_cached[:seq_len], self.sin_cached[:seq_len]


def apply_rotary_emb(q, k, cos, sin):
    """Applique la rotation à Q et K."""
    # Sépare en deux moitiés
    q1, q2 = q[..., ::2], q[..., 1::2]
    k1, k2 = k[..., ::2], k[..., 1::2]
    
    # Reshape pour broadcasting
    cos = cos.unsqueeze(0).unsqueeze(0)
    sin = sin.unsqueeze(0).unsqueeze(0)
    
    # Rotation: (a + bi) * (cos + i*sin) = (a*cos - b*sin) + i*(a*sin + b*cos)
    q_rot = torch.cat([q1 * cos - q2 * sin, q1 * sin + q2 * cos], dim=-1)
    k_rot = torch.cat([k1 * cos - k2 * sin, k1 * sin + k2 * cos], dim=-1)
    
    return q_rot, k_rot

In [None]:
# Visualisation des fréquences RoPE
import matplotlib.pyplot as plt

rope = RotaryPositionalEmbedding(dim=64, max_seq_len=100)
cos, sin = rope(100)

fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].imshow(cos.numpy(), aspect='auto', cmap='coolwarm')
axes[0].set_title('Cosinus (positions x dimensions)')
axes[0].set_xlabel('Dimension')
axes[0].set_ylabel('Position')

axes[1].imshow(sin.numpy(), aspect='auto', cmap='coolwarm')
axes[1].set_title('Sinus (positions x dimensions)')
axes[1].set_xlabel('Dimension')
axes[1].set_ylabel('Position')

plt.tight_layout()
plt.show()

---
# 4. Self-Attention

## L'idée clé

L'attention permet à chaque token de "regarder" les autres tokens pour comprendre le contexte.

**Exemple:** Dans "Le chat dort sur le canapé"
- "dort" doit regarder "chat" pour savoir QUI dort
- "canapé" doit regarder "sur" pour comprendre la relation spatiale

## Mécanisme Q, K, V

Pour chaque token, on calcule 3 vecteurs:
- **Q (Query)**: "Que cherche ce token ?"
- **K (Key)**: "Qu'est-ce que ce token offre ?"
- **V (Value)**: "Quelle information ce token contient ?"

```
Attention(Q, K, V) = softmax(Q @ K^T / √d) @ V
```

## Attention Causale (pour GPT)

Un token ne peut voir que les tokens **précédents** (pas le futur). On utilise un masque triangulaire.

In [None]:
class MultiHeadAttention(nn.Module):
    """
    Multi-Head Self-Attention avec masque causal.
    
    "Multi-Head" = on fait plusieurs attentions en parallèle,
    chacune se spécialisant sur différents aspects.
    """
    
    def __init__(self, d_model: int, n_heads: int, max_seq_len: int = 256, dropout: float = 0.1):
        super().__init__()
        self.n_heads = n_heads
        self.d_model = d_model
        self.head_dim = d_model // n_heads
        
        assert d_model % n_heads == 0, "d_model doit être divisible par n_heads"
        
        # Projections linéaires pour Q, K, V et Output
        self.q_proj = nn.Linear(d_model, d_model, bias=False)
        self.k_proj = nn.Linear(d_model, d_model, bias=False)
        self.v_proj = nn.Linear(d_model, d_model, bias=False)
        self.o_proj = nn.Linear(d_model, d_model, bias=False)
        
        self.dropout = nn.Dropout(dropout)
        self.rope = RotaryPositionalEmbedding(self.head_dim, max_seq_len)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_len, d_model)
        """
        batch_size, seq_len, _ = x.shape
        
        # 1. Projections linéaires
        q = self.q_proj(x)
        k = self.k_proj(x)
        v = self.v_proj(x)
        
        # 2. Reshape pour multi-head: (batch, seq, n_heads, head_dim)
        q = q.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        k = k.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        v = v.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        # Shape maintenant: (batch, n_heads, seq, head_dim)
        
        # 3. Applique RoPE (positional encoding)
        cos, sin = self.rope(seq_len)
        q, k = apply_rotary_emb(q, k, cos, sin)
        
        # 4. Calcul de l'attention: Q @ K^T / sqrt(d)
        scale = math.sqrt(self.head_dim)
        scores = torch.matmul(q, k.transpose(-2, -1)) / scale
        # Shape: (batch, n_heads, seq, seq)
        
        # 5. Masque causal (triangle inférieur)
        mask = torch.triu(torch.ones(seq_len, seq_len, device=x.device), diagonal=1).bool()
        scores = scores.masked_fill(mask, float("-inf"))
        
        # 6. Softmax et dropout
        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)
        
        # 7. Attention @ V
        output = torch.matmul(attn_weights, v)
        # Shape: (batch, n_heads, seq, head_dim)
        
        # 8. Recombine les heads
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        
        # 9. Projection de sortie
        return self.o_proj(output)

In [None]:
# Visualisation du masque causal
seq_len = 8
mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1)

plt.figure(figsize=(6, 5))
plt.imshow(1 - mask, cmap='Blues')
plt.title("Masque Causal\n(blanc = peut voir, bleu = masqué)")
plt.xlabel("Position K (clé)")
plt.ylabel("Position Q (requête)")
plt.colorbar()

# Ajoute les labels
tokens = ["Le", "chat", "dort", "sur", "le", "canapé", ".", "<eos>"]
plt.xticks(range(seq_len), tokens, rotation=45)
plt.yticks(range(seq_len), tokens)
plt.tight_layout()
plt.show()

print("Exemple: 'dort' (position 2) peut voir 'Le', 'chat', 'dort'")
print("         mais pas 'sur', 'le', 'canapé'...")

In [None]:
# Test de l'attention
d_model = 128
n_heads = 4
batch_size = 2
seq_len = 10

attn = MultiHeadAttention(d_model, n_heads)
x = torch.randn(batch_size, seq_len, d_model)

output = attn(x)
print(f"Input shape : {x.shape}")
print(f"Output shape: {output.shape}")

---
# 5. Feed-Forward Network (SwiGLU)

Après l'attention, chaque token passe par un réseau feed-forward.

**SwiGLU** est une activation moderne (utilisée dans LLaMA, PaLM):
```
SwiGLU(x) = Swish(xW1) * (xW3)
```

Plus efficace que ReLU ou GELU classique.

In [None]:
class FeedForward(nn.Module):
    """
    Feed-Forward avec activation SwiGLU.
    """
    
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        # Pour SwiGLU, on ajuste la dimension cachée
        hidden_dim = int(2 * d_ff / 3)
        hidden_dim = ((hidden_dim + 7) // 8) * 8  # Multiple de 8 (optimisation GPU)
        
        self.w1 = nn.Linear(d_model, hidden_dim, bias=False)
        self.w2 = nn.Linear(hidden_dim, d_model, bias=False)
        self.w3 = nn.Linear(d_model, hidden_dim, bias=False)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # SwiGLU: swish(xW1) * (xW3)
        return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x)))

In [None]:
# Comparaison des activations
x = torch.linspace(-3, 3, 100)

plt.figure(figsize=(10, 4))
plt.plot(x, F.relu(x), label='ReLU')
plt.plot(x, F.gelu(x), label='GELU')
plt.plot(x, F.silu(x), label='SiLU/Swish')
plt.legend()
plt.title('Fonctions d\'activation')
plt.xlabel('x')
plt.ylabel('activation(x)')
plt.grid(True, alpha=0.3)
plt.show()

---
# 6. RMSNorm

La normalisation stabilise l'entraînement. **RMSNorm** est plus simple que LayerNorm:

```
RMSNorm(x) = x / RMS(x) * γ
où RMS(x) = sqrt(mean(x²))
```

Pas besoin de soustraire la moyenne → plus rapide.

In [None]:
class RMSNorm(nn.Module):
    """Root Mean Square Layer Normalization."""
    
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))  # γ (learnable)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        rms = torch.sqrt(torch.mean(x ** 2, dim=-1, keepdim=True) + self.eps)
        return (x / rms) * self.weight

---
# 7. Transformer Block

Un bloc Transformer combine tous les composants:

```
x → RMSNorm → Attention → + → RMSNorm → FeedForward → + → output
      ↑______________________|       ↑_________________|
           (résiduel)                   (résiduel)
```

Les connexions **résiduelles** (`+`) permettent au gradient de circuler facilement.

In [None]:
class TransformerBlock(nn.Module):
    """Un bloc Transformer complet."""
    
    def __init__(self, d_model: int, n_heads: int, d_ff: int, 
                 max_seq_len: int = 256, dropout: float = 0.1):
        super().__init__()
        self.attn_norm = RMSNorm(d_model)
        self.attn = MultiHeadAttention(d_model, n_heads, max_seq_len, dropout)
        self.ff_norm = RMSNorm(d_model)
        self.ff = FeedForward(d_model, d_ff, dropout)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Attention avec résiduel
        x = x + self.attn(self.attn_norm(x))
        # Feed-forward avec résiduel
        x = x + self.ff(self.ff_norm(x))
        return x

---
# 8. Le Modèle GPT Complet

On assemble tout:

```
Input IDs
    ↓
Token Embedding
    ↓
Transformer Block 1
    ↓
Transformer Block 2
    ↓
    ...
    ↓
Transformer Block N
    ↓
RMSNorm
    ↓
LM Head (projection vers vocabulaire)
    ↓
Logits (probabilités pour chaque token)
```

In [None]:
class GPT(nn.Module):
    """Modèle GPT complet."""
    
    def __init__(self, vocab_size: int, d_model: int = 384, n_heads: int = 6,
                 n_layers: int = 6, d_ff: int = 1536, max_seq_len: int = 256,
                 dropout: float = 0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.max_seq_len = max_seq_len
        
        # Token Embedding
        self.tok_emb = nn.Embedding(vocab_size, d_model)
        
        # Transformer Blocks
        self.layers = nn.ModuleList([
            TransformerBlock(d_model, n_heads, d_ff, max_seq_len, dropout)
            for _ in range(n_layers)
        ])
        
        # Normalisation finale
        self.norm = RMSNorm(d_model)
        
        # LM Head: projette vers le vocabulaire
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
        
        # Weight tying: partage les poids embedding <-> lm_head
        self.tok_emb.weight = self.lm_head.weight
        
        # Initialisation
        self.apply(self._init_weights)
        
        # Compte les paramètres
        n_params = sum(p.numel() for p in self.parameters())
        print(f"Paramètres: {n_params / 1e6:.2f}M")
    
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
    
    def forward(self, input_ids: torch.Tensor, labels: torch.Tensor = None):
        """
        input_ids: (batch, seq_len)
        labels: (batch, seq_len) - optionnel, pour calculer la loss
        """
        # Token embedding
        x = self.tok_emb(input_ids)
        
        # Passe à travers les blocs
        for layer in self.layers:
            x = layer(x)
        
        # Normalisation finale
        x = self.norm(x)
        
        # Projection vers vocabulaire
        logits = self.lm_head(x)
        
        # Calcul de la loss si labels fournis
        loss = None
        if labels is not None:
            # Shift: prédire le token suivant
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            loss = F.cross_entropy(
                shift_logits.view(-1, self.vocab_size),
                shift_labels.view(-1)
            )
        
        return {"logits": logits, "loss": loss}
    
    @torch.no_grad()
    def generate(self, input_ids: torch.Tensor, max_new_tokens: int = 50,
                 temperature: float = 1.0, top_k: int = 50):
        """Génère du texte auto-régressivement."""
        self.eval()
        
        for _ in range(max_new_tokens):
            # Tronque si trop long
            idx = input_ids if input_ids.shape[1] <= self.max_seq_len else input_ids[:, -self.max_seq_len:]
            
            # Forward
            logits = self(idx)["logits"][:, -1, :]  # Dernier token
            
            # Température
            logits = logits / temperature
            
            # Top-k
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = float("-inf")
            
            # Échantillonne
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            
            # Ajoute au contexte
            input_ids = torch.cat([input_ids, next_token], dim=1)
        
        return input_ids

In [None]:
# Crée le modèle avec des paramètres adaptés à Wikipedia
model = GPT(
    vocab_size=len(tokenizer),
    d_model=384,      # Plus grand pour mieux capturer la complexité
    n_heads=6,
    n_layers=6,
    d_ff=1536,
    max_seq_len=256,
    dropout=0.1
).to(device)

# Test forward pass
test_ids = torch.randint(0, len(tokenizer), (2, 64)).to(device)
output = model(test_ids, labels=test_ids)

print(f"\nInput shape : {test_ids.shape}")
print(f"Logits shape: {output['logits'].shape}")
print(f"Loss        : {output['loss'].item():.4f}")

---
# 9. Entraînement

Le modèle apprend à **prédire le prochain token** (language modeling).

Pour chaque séquence:
```
Input:  [Le, chat, dort, sur]
Target: [chat, dort, sur, le]   (décalé de 1)
```

On minimise la **cross-entropy** entre les prédictions et les vrais tokens.

In [None]:
# Prépare les données d'entraînement depuis Wikipedia FR
print("Chargement et tokenization des données Wikipedia...")

with open(DATA_PATH, "r", encoding="utf-8") as f:
    texte_complet = f.read()

print(f"Texte chargé: {len(texte_complet):,} caractères ({len(texte_complet) / 1e6:.1f} MB)")

# Tokenize tout le texte (peut prendre un moment)
print("Tokenization en cours...")
all_tokens = tokenizer.encode(texte_complet)
print(f"Nombre total de tokens: {len(all_tokens):,}")

In [None]:
from torch.utils.data import Dataset, DataLoader
import os

# Crée le dossier checkpoints
os.makedirs("checkpoints", exist_ok=True)

class TextDataset(Dataset):
    def __init__(self, tokens: list[int], seq_len: int = 128):
        self.tokens = tokens
        self.seq_len = seq_len
    
    def __len__(self):
        return max(0, len(self.tokens) - self.seq_len)
    
    def __getitem__(self, idx):
        chunk = self.tokens[idx:idx + self.seq_len]
        return {
            "input_ids": torch.tensor(chunk, dtype=torch.long),
            "labels": torch.tensor(chunk, dtype=torch.long)
        }

# Crée le dataset et dataloader
# seq_len=128 pour des séquences plus longues (Wikipedia a des articles longs)
dataset = TextDataset(all_tokens, seq_len=128)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=0)

print(f"Nombre de séquences: {len(dataset):,}")
print(f"Nombre de batches: {len(dataloader):,}")

In [None]:
from tqdm import tqdm

# Optimiseur
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=0.1)

# Entraînement
# Note: avec Wikipedia, 1-2 epochs suffisent souvent car beaucoup de données
n_epochs = 2
losses = []

model.train()
for epoch in range(n_epochs):
    epoch_loss = 0
    pbar = tqdm(dataloader, desc=f"Epoch {epoch + 1}/{n_epochs}")
    
    for batch in pbar:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        
        # Forward
        output = model(input_ids, labels=labels)
        loss = output["loss"]
        
        # Backward
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # Gradient clipping
        optimizer.step()
        
        epoch_loss += loss.item()
        losses.append(loss.item())
        pbar.set_postfix({"loss": f"{loss.item():.4f}"})
    
    avg_loss = epoch_loss / len(dataloader)
    print(f"Epoch {epoch + 1} - Loss moyenne: {avg_loss:.4f}")
    
    # Sauvegarde un checkpoint après chaque epoch
    torch.save({
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "epoch": epoch,
        "loss": avg_loss,
    }, f"checkpoints/model_epoch_{epoch + 1}.pt")
    print(f"  → Checkpoint sauvegardé")

In [None]:
# Visualise la loss
plt.figure(figsize=(10, 4))
plt.plot(losses)
plt.xlabel("Step")
plt.ylabel("Loss")
plt.title("Courbe d'entraînement")
plt.grid(True, alpha=0.3)
plt.show()

---
# 10. Génération de Texte

Maintenant qu'il est entraîné, le modèle peut générer du texte !

## Paramètres de génération

- **temperature**: Contrôle la "créativité"
  - < 1.0 : Plus déterministe, répétitif
  - = 1.0 : Normal
  - > 1.0 : Plus aléatoire, créatif

- **top_k**: Ne garde que les k tokens les plus probables

In [None]:
def generate_text(prompt: str, max_tokens: int = 50, temperature: float = 0.8, top_k: int = 40):
    """Génère du texte à partir d'un prompt."""
    model.eval()
    
    # Encode le prompt
    input_ids = torch.tensor([tokenizer.encode(prompt)]).to(device)
    
    # Génère
    output_ids = model.generate(
        input_ids,
        max_new_tokens=max_tokens,
        temperature=temperature,
        top_k=top_k
    )
    
    # Décode
    return tokenizer.decode(output_ids[0].tolist())

In [None]:
# Test de génération avec des prompts Wikipedia
prompts = [
    "La France est",
    "L'histoire de",
    "Paris est une ville",
    "La science permet de",
    "En 1789,",
]

for prompt in prompts:
    print(f"\n{'='*60}")
    print(f"Prompt: {prompt}")
    print(f"{'='*60}")
    generated = generate_text(prompt, max_tokens=60, temperature=0.8)
    print(generated)

In [None]:
# Expérimente avec la température
prompt = "Bonjour"

print("Effet de la température:\n")
for temp in [0.3, 0.7, 1.0, 1.5]:
    print(f"Temperature = {temp}:")
    print(generate_text(prompt, max_tokens=30, temperature=temp))
    print()

---
# Résumé

## Ce qu'on a construit:

1. **Tokenizer BPE** - Convertit texte ↔ nombres
2. **Embeddings** - Représente les tokens en vecteurs
3. **RoPE** - Encode les positions par rotation
4. **Multi-Head Attention** - Permet aux tokens de "se regarder"
5. **Feed-Forward (SwiGLU)** - Traitement non-linéaire
6. **RMSNorm** - Stabilise l'entraînement
7. **GPT** - Assemble tout en un modèle

## Pour aller plus loin:

- Entraîner sur plus de données (Wikipedia, livres, etc.)
- Augmenter la taille du modèle
- Ajouter Flash Attention pour l'efficacité
- Fine-tuning avec instruction following
- RLHF (Reinforcement Learning from Human Feedback)