<a href="https://colab.research.google.com/github/chegmarco1989/AdminLTE/blob/master/Mini_GPT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pathlib import Path
import re
from typing import List, Dict, Tuple
import requests


# Choose the corpus (La Fontaine or Shakespeare) by disabling the loader you do not want.
# The La Fontaine loader below is currently disabled: it is wrapped in a string literal.
"""
def load_corpus():
    url = "https://www.gutenberg.org/files/56327/56327-0.txt"
    text = requests.get(url).text
    # Récupérer uniquement le texte entre les balises de début / fin
    start = text.find("M DCCC LXVIII")+len("M DCCC LXVIII")+1
    end = text.find("TABLE DES FABLES")
    if start == -1 or end == -1:
        cleaned = text  # fallback : on garde tout
    else:
        cleaned = text[start:end]

    # Mise en minuscules
    cleaned = cleaned.lower()

    # Suppression des annotations entre crochets (ex. [illustration], [16])
    cleaned = re.sub(r"\[[^\]]*\]", " ", cleaned)

    # Remplacement des retours‑ligne par des espaces simples
    cleaned = re.sub(r"\s+", " ", cleaned).strip()

    return cleaned
    return text[start:end].lower()
"""



def load_corpus() -> str:
    """Download the Tiny Shakespeare corpus and return it as one string.

    Returns:
        The downloaded text with leading and trailing whitespace removed.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout (30 s).
    """
    url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
    response = requests.get(url, timeout=30)
    # Fail loudly on an HTTP error instead of returning the error page as corpus.
    response.raise_for_status()
    return response.text.strip()



def build_vocab(corpus: str) -> Tuple[List[str], Dict[str, int], Dict[int, str]]:
    """Build the character vocabulary and its two lookup tables.

    Returns:
        A triple ``(vocab, stoi, itos)`` where ``vocab`` is the sorted list of
        distinct characters in ``corpus``, ``stoi`` maps each character to its
        position in ``vocab`` and ``itos`` is the inverse mapping.
    """
    vocab = list(set(corpus))
    vocab.sort()
    stoi = dict(zip(vocab, range(len(vocab))))  # string -> int
    itos = dict(enumerate(vocab))               # int -> string
    return vocab, stoi, itos


def encode(text: str, stoi: Dict[str, int]) -> List[int]:
    """Turn a string into the list of indices of its characters.

    Raises:
        KeyError: If ``text`` contains a character missing from ``stoi``.
    """
    ids: List[int] = []
    for ch in text:
        ids.append(stoi[ch])
    return ids


def decode(indices: List[int], itos: Dict[int, str]) -> str:
    """Turn a list of indices back into the string they spell.

    Raises:
        KeyError: If an index is missing from ``itos``.
    """
    pieces = [itos[i] for i in indices]
    return "".join(pieces)


# Character-level smoke test: download the corpus, build the vocabulary,
# encode the whole text and print a few summary figures.
if __name__ == "__main__":
    corpus = load_corpus()
    vocab, stoi, itos = build_vocab(corpus)
    # The encoded corpus is computed here but not printed by this cell.
    encoded = encode(corpus, stoi)
    # Preview (first 50 characters), corpus length in characters, vocabulary size.
    print("Aperçu du corpus :", corpus[:50], "…")
    print("Taille du corpus  :", len(corpus))
    print("Taille du vocab   :", len(vocab))


Aperçu du corpus : First Citizen:
Before we proceed any further, hear …
Taille du corpus  : 1115393
Taille du vocab   : 65


In [None]:
from collections import Counter, defaultdict
from typing import List, Dict, Tuple
import re


class SimpleBPE:
    """Simplified BPE tokenizer (characters → sub-words).

    Text is lower-cased, runs of whitespace are collapsed, and every
    space-separated word becomes a sequence of characters followed by the
    end-of-word marker ``</w>``. Training repeatedly fuses the most frequent
    adjacent symbol pair until the requested vocabulary size is reached.
    """

    def __init__(self):
        self.stoi: Dict[str, int] = {}
        self.itos: Dict[int, str] = {}
        self.vocab: List[str] = []
        self.merges: List[Tuple[str, str]] = []  # merge history, in learning order
        # Special end-of-word token
        self.EOW = "</w>"

    def _word_to_symbols(self, word: str) -> Tuple[str, ...]:
        """Split a word into single-character symbols plus the end-of-word token."""
        return tuple(list(word) + [self.EOW])

    def _get_stats(self, corpus_sym: List[Tuple[str, ...]]) -> Counter:
        """Count how often each adjacent symbol pair occurs in the corpus."""
        pairs = Counter()
        for symbols in corpus_sym:
            pairs.update(zip(symbols, symbols[1:]))
        return pairs

    def train(self, corpus: str, vocab_size: int = 200) -> None:
        """Learn BPE merges until the vocabulary holds ``vocab_size`` symbols.

        Any previously learned state is discarded, so calling ``train`` again
        on the same instance starts from scratch.

        Args:
            corpus: Raw training text.
            vocab_size: Target vocabulary size (base symbols + merged symbols).
                Training stops earlier if no adjacent pair is left to merge.
        """
        # Basic cleaning: lower-case + single spaces
        corpus = re.sub(r"\s+", " ", corpus.lower().strip())
        words = corpus.split(" ")
        # Corpus as tuples of symbols (empty words are skipped)
        corpus_sym = [self._word_to_symbols(w) for w in words if w]

        # Initial vocabulary: every distinct symbol
        symbol_counter = Counter(s for sym in corpus_sym for s in sym)
        self.vocab = sorted(symbol_counter)
        # Reset the merge history together with the vocabulary; otherwise a
        # second call would keep merges whose symbols are not in the new vocab.
        self.merges = []

        def merge_pair(sym_seq: Tuple[str, ...], pair: Tuple[str, str]) -> Tuple[str, ...]:
            """Fuse every left-to-right occurrence of ``pair`` in the sequence."""
            merged = []
            i = 0
            while i < len(sym_seq):
                if i < len(sym_seq) - 1 and (sym_seq[i], sym_seq[i + 1]) == pair:
                    merged.append(sym_seq[i] + sym_seq[i + 1])
                    i += 2
                else:
                    merged.append(sym_seq[i])
                    i += 1
            return tuple(merged)

        # Main loop
        while len(self.vocab) < vocab_size:
            stats = self._get_stats(corpus_sym)
            if not stats:
                break  # nothing left to merge
            best_pair = stats.most_common(1)[0][0]
            self.merges.append(best_pair)
            # Update the corpus
            corpus_sym = [merge_pair(sym, best_pair) for sym in corpus_sym]
            # Update the vocabulary
            new_symbol = best_pair[0] + best_pair[1]
            self.vocab.append(new_symbol)

        # Build the final stoi/itos tables
        self.stoi = {sym: i for i, sym in enumerate(self.vocab)}
        self.itos = {i: sym for sym, i in self.stoi.items()}

    def _apply_merges(self, word: str) -> List[str]:
        """Replay the learned merges, in learning order, on a single word."""
        symbols = list(word) + [self.EOW]
        for a, b in self.merges:
            i = 0
            while i < len(symbols) - 1:
                if symbols[i] == a and symbols[i + 1] == b:
                    # Do not advance: the fused symbol is re-checked against its new neighbour.
                    symbols[i : i + 2] = [a + b]
                else:
                    i += 1
        return symbols

    def encode(self, text: str) -> List[int]:
        """Encode a string into a list of BPE token ids.

        The text gets the same normalisation as in ``train`` (lower-case,
        collapsed whitespace). A symbol missing from the vocabulary is split
        into characters, and any character still unknown is replaced by the
        id of the end-of-word token. The tokenizer must have been trained.
        """
        tokens: List[int] = []
        # Per-call memo: words repeat a lot in natural text and replaying the
        # whole merge list is the expensive part of encoding.
        word_ids: Dict[str, List[int]] = {}
        for word in re.sub(r"\s+", " ", text.lower().strip()).split(" "):
            ids = word_ids.get(word)
            if ids is None:
                ids = []
                for sym in self._apply_merges(word):
                    if sym not in self.stoi:
                        # OOV symbol: fall back to its individual characters
                        for ch in list(sym):
                            ids.append(self.stoi.get(ch, self.stoi[self.EOW]))
                    else:
                        ids.append(self.stoi[sym])
                word_ids[word] = ids
            tokens.extend(ids)
        return tokens

    def decode(self, ids: List[int]) -> str:
        """Decode a list of token ids into text (without the ``</w>`` markers).

        Words are joined with single spaces; ids missing from the vocabulary
        are rendered as ``"?"``.
        """
        words: List[str] = []
        current = []
        for idx in ids:
            sym = self.itos.get(idx, "?")
            if sym == self.EOW:
                words.append("".join(current))
                current = []
            elif sym.endswith(self.EOW):
                # Merged symbol that carries the end-of-word marker
                current.append(sym[:-len(self.EOW)])
                words.append("".join(current))
                current = []
            else:
                current.append(sym)
        if current:
            # Trailing symbols with no end-of-word marker still form a word
            words.append("".join(current))
        return " ".join(words).strip()


    def vocab_size(self) -> int:
        """Return the number of symbols in the vocabulary."""
        return len(self.vocab)

    def __len__(self):
        """``len(tokenizer)`` is the vocabulary size."""
        return self.vocab_size()


In [None]:
# Quick tokenizer check: train a small BPE vocabulary (120 symbols) on the
# corpus, then encode a short sentence and decode it back.
corpus = load_corpus()
tok = SimpleBPE()
tok.train(corpus, vocab_size=120)

ids = tok.encode("Le chat dort")
# Print the token ids next to the decoded text.
print(ids, tok.decode(ids))


[86, 99, 19, 73, 22, 106, 73] le chat dort


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

def _generate_square_subsequent_mask(t: int, device: torch.device) -> torch.Tensor:
    return torch.triu(torch.ones((t, t), dtype=torch.bool, device=device), diagonal=1)


class TransformerBlock(nn.Module):
    """One Transformer layer: masked self-attention followed by a two-layer MLP.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``.
    """

    def __init__(self, d_model: int, n_heads: int, dropout: float = 0.1) -> None:
        super().__init__()
        hidden_dim = 4 * d_model
        self.attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        self.ff = nn.Sequential(
            nn.Linear(d_model, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, d_model),
        )
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, attn_mask: torch.Tensor) -> torch.Tensor:
        """Apply the block to ``x`` using ``attn_mask`` as the attention mask."""
        # Self-attention sub-layer (attention weights are not requested)
        attended = self.attn(x, x, x, attn_mask=attn_mask, need_weights=False)[0]
        after_attn = self.ln1(x + self.drop(attended))
        # Feed-forward sub-layer
        return self.ln2(after_attn + self.drop(self.ff(after_attn)))


class MiniTransformer(nn.Module):
    """Toy autoregressive Transformer made of ``n_layers`` stacked blocks."""

    def __init__(
        self,
        vocab_size: int,
        d_model: int = 128,
        n_heads: int = 4,
        n_layers: int = 2,
        max_seq_len: int = 64,
        dropout: float = 0.1,
    ) -> None:
        super().__init__()
        self.max_seq_len = max_seq_len

        # Token and learned position embeddings
        self.tok_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(max_seq_len, d_model)

        # Stack of Transformer blocks
        layers = []
        for _ in range(n_layers):
            layers.append(TransformerBlock(d_model, n_heads, dropout))
        self.blocks = nn.ModuleList(layers)

        # Output projection onto the vocabulary
        self.head = nn.Linear(d_model, vocab_size, bias=False)

    def forward(self, idx: torch.Tensor) -> torch.Tensor:
        """Map token ids ``idx`` of shape (B, T) to logits of shape (B, T, vocab_size).

        Raises:
            ValueError: If T is larger than ``max_seq_len``.
        """
        _, T = idx.shape
        if T > self.max_seq_len:
            raise ValueError(
                f"Sequence length {T} exceeds max_seq_len {self.max_seq_len}"
            )

        # Sum of token embeddings and position embeddings
        positions = torch.arange(T, device=idx.device).unsqueeze(0)
        hidden = self.tok_emb(idx) + self.pos_emb(positions)

        # Causal mask shared by every block
        causal_mask = _generate_square_subsequent_mask(T, idx.device)
        for layer in self.blocks:
            hidden = layer(hidden, causal_mask)

        return self.head(hidden)

In [None]:
import random
from typing import Tuple

def build_dataset(
    corpus: str,
    tokenizer: SimpleBPE,
    train_ratio: float = 0.9,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Tokenize the whole corpus and split it into ``(train_ids, val_ids)``.

    The first ``train_ratio`` fraction of the token ids (rounded down) forms
    the training split; the remainder is the validation split.
    """
    encoded = tokenizer.encode(corpus)
    ids = torch.tensor(encoded, dtype=torch.long)
    cut = int(len(ids) * train_ratio)
    train_part = ids[:cut]
    val_part = ids[cut:]
    return train_part, val_part


def get_batch(
    data: torch.Tensor,
    batch_size: int,
    block_size: int,
    device: torch.device,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Sample a random batch ``(X, Y)`` of shape (B, T) from ``data``.

    Each row of ``X`` is a window of ``block_size`` consecutive tokens and the
    matching row of ``Y`` is the same window shifted one token to the right.

    Args:
        data: 1-D tensor of token ids.
        batch_size: Number of windows to draw (B).
        block_size: Length of each window (T).
        device: Device the returned tensors are moved to.

    Raises:
        ValueError: If ``data`` has fewer than ``block_size + 1`` tokens.
    """
    # A window needs block_size inputs plus one extra token for the shifted
    # targets, so valid start offsets are 0 .. len(data) - block_size - 1.
    n_starts = len(data) - block_size
    if n_starts < 1:
        raise ValueError(f"Data length ({len(data)}) is too small for block_size ({block_size})")
    # randint's upper bound is exclusive: passing n_starts makes the last
    # valid window reachable and keeps the minimal-length case from failing.
    starts = torch.randint(0, n_starts, (batch_size,)).tolist()
    x = torch.stack([data[i : i + block_size] for i in starts]).to(device)
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in starts]).to(device)
    return x, y


# Training hyper-parameters shared by the cells below.
BATCH_SIZE = 32        # Number of sequences per batch
BLOCK_SIZE = 64        # Sequence length (in tokens) seen by the model
ITERS       = 3000     # Training iterations (adjust as needed)
EVAL_INT    = 200      # Evaluation frequency
LR          = 1e-3     # Learning rate


# Step 1: download the corpus.
print("🗂️  Chargement du corpus…")
corpus = load_corpus()

# Step 2: train a BPE tokenizer with a 1000-symbol target vocabulary.
print("🧩 Entraînement du tokenizer BPE…")
tok = SimpleBPE()
tok.train(corpus, vocab_size=1000)

# Step 3: encode the corpus and split it into train / validation ids.
print("🪄 Encodage du corpus → ids…")
train_ids, val_ids = build_dataset(corpus, tok)

# Report the size of both splits, in tokens.
print("📏 Longueur jeu train :", len(train_ids), "tokens")
print("📏 Longueur jeu valid :", len(val_ids), "tokens")



🗂️  Chargement du corpus…
🧩 Entraînement du tokenizer BPE…
🪄 Encodage du corpus → ids…
📏 Longueur jeu train : 326548 tokens
📏 Longueur jeu valid : 36284 tokens


In [None]:
# Instantiate the model with one output logit per tokenizer symbol and
# 8 attention heads; every other constructor argument keeps its default.
print("🧠 Instanciation du modèle…")
model = MiniTransformer(vocab_size=len(tok),n_heads=8)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Print the total number of parameter elements.
print(f"📦  Paramètres du modèle : {sum(p.numel() for p in model.parameters())}")

# AdamW over all model parameters, with the learning rate LR.
optimizer = torch.optim.AdamW(model.parameters(), lr=LR)

def estimate_loss() -> Tuple[float, float]:
    """Estimate the mean loss on the train and validation splits (no gradients).

    Reads the notebook globals ``model``, ``train_ids``, ``val_ids``,
    ``BATCH_SIZE``, ``BLOCK_SIZE`` and ``device``. The model is switched to
    eval mode for the measurement and put back in the mode it was in before
    the call, even if an error interrupts the estimation.

    Returns:
        ``(train_loss, val_loss)``, each averaged over 50 random mini-batches.
    """
    eval_batches = 50  # number of mini-batches used for each estimate
    was_training = model.training
    model.eval()
    losses = {}
    try:
        with torch.no_grad():
            for split, data in ("train", train_ids), ("val", val_ids):
                loss_sum = 0.0
                for _ in range(eval_batches):
                    x, y = get_batch(data, BATCH_SIZE, BLOCK_SIZE, device)
                    logits = model(x)
                    # Flatten (B, T, V) logits and (B, T) targets for cross-entropy
                    loss = F.cross_entropy(logits.view(-1, logits.size(-1)), y.view(-1))
                    loss_sum += loss.item()
                losses[split] = loss_sum / eval_batches
    finally:
        # Restore the caller's mode rather than forcing train mode.
        model.train(was_training)
    return losses["train"], losses["val"]

🧠 Instanciation du modèle…
📦  Paramètres du modèle : 660736


In [None]:
# Main training loop: ITERS optimisation steps on random training batches.
print("🚀 Début entraînement…")
for step in range(1, ITERS + 1):
    xb, yb = get_batch(train_ids, BATCH_SIZE, BLOCK_SIZE, device)

    # Forward pass; logits and targets are flattened for cross-entropy.
    logits = model(xb)
    loss = F.cross_entropy(logits.view(-1, logits.size(-1)), yb.view(-1))

    # Clear old gradients, back-propagate, then update the weights.
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

    # Report estimated losses on the first step and every EVAL_INT steps.
    if step % EVAL_INT == 0 or step == 1:
        train_loss, val_loss = estimate_loss()
        print(
            f"Step {step:4d}/{ITERS}  |  Train loss {train_loss:.3f}  |  Val loss {val_loss:.3f}"
        )

🚀 Début entraînement…
Step    1/3000  |  Train loss 6.998  |  Val loss 6.997
Step  200/3000  |  Train loss 5.267  |  Val loss 5.493


KeyboardInterrupt: 

In [None]:
def generate(
    model: MiniTransformer,
    prompt: str,
    max_new_tokens: int = 50,
    temperature: float = 1.0,
) -> str:
    """Sample a continuation of ``prompt`` from ``model`` and return the text.

    Uses the notebook globals ``tok`` (tokenizer) and ``device``.

    Args:
        model: Model to sample from; it is switched to eval mode.
        prompt: Text to continue.
        max_new_tokens: Number of tokens appended to the prompt.
        temperature: Divisor applied to the logits before the softmax.
            ``0`` selects the most likely token at every step (greedy).

    Returns:
        The decoded prompt followed by the generated tokens.
    """
    model.eval()
    idx = torch.tensor(tok.encode(prompt), dtype=torch.long, device=device).unsqueeze(0)  # (1, T)
    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Crop to the model's own context limit so forward() never
            # receives a sequence longer than it supports.
            idx_cond = idx[:, -model.max_seq_len:]
            logits = model(idx_cond)
            next_logits = logits[:, -1, :]
            if temperature == 0:
                # Dividing by zero would give NaN probabilities: go greedy.
                next_id = next_logits.argmax(dim=-1, keepdim=True)
            else:
                probs = torch.softmax(next_logits / temperature, dim=-1)
                next_id = torch.multinomial(probs, num_samples=1)
            idx = torch.cat([idx, next_id], dim=1)
    return tok.decode(idx[0].tolist())

In [None]:
# Sample 50 new tokens from the model at temperature 0.8 and print the text.
prompt = "I am"
print("\n📝 Génération après entraînement :")
print(generate(model, prompt, max_new_tokens=50, temperature=0.8))


📝 Génération après entraînement :
i am no man in a sicion; but i should to do so i change to patiest with you: and shall be your honour. if your son-shoulders and thrive it to your france the
