# NLP Lab (AI Master's): Seq2Seq (LSTM) for Symbolic Addition, Character by Character

This notebook is a **complete lab** for training a **Seq2Seq** (encoder-decoder) model to perform an **addition** written as a string.

---

## 🎯 Pedagogical value (why this problem?)

Symbolic addition is a **rich toy problem** because it forces the model to learn:
- a **sequence → sequence** mapping with **variable lengths**,
- **autoregressive generation** (one character after another),
- a form of **algorithmic reasoning** (carries),
- the limits of Seq2Seq **without attention** (the fixed-size bottleneck),
- the impact of **teacher forcing** and of **generalization** (to longer numbers).

> Unlike sequence reversal, the task here is not "just" reordering:  
> the model must learn a **computation rule** and **propagate a carry**.
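
To make the carry concrete, here is a minimal sketch of the schoolbook algorithm that the model has to approximate implicitly. The helper name `add_digit_strings` is illustrative and not part of the lab code.

```python
def add_digit_strings(a: str, b: str) -> str:
    """Schoolbook addition on decimal strings, right to left, with an explicit carry."""
    i, j = len(a) - 1, len(b) - 1
    carry = 0
    digits = []
    while i >= 0 or j >= 0 or carry:
        da = int(a[i]) if i >= 0 else 0
        db = int(b[j]) if j >= 0 else 0
        total = da + db + carry
        digits.append(str(total % 10))  # digit written at this position
        carry = total // 10             # carry passed to the next position
        i -= 1
        j -= 1
    # Digits were produced least significant first, so reverse them.
    return "".join(reversed(digits)) or "0"

print(add_digit_strings("123", "45"))  # 168
print(add_digit_strings("999", "1"))   # 1000
```

In `999+1` the leading output digit depends on the last input digits, yet the decoder has to emit that leading digit first. This is one reason carries are a common source of errors.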

---

## 🧩 Problem formulation

Input (source): a string such as  
`"123+45"`  

Output (target): the sum as a string  
`"168"`

We work at the **character level**, which gives:
- a very small vocabulary (digits plus symbols),
- errors that are easy to inspect,
- gradual learning.
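
As a quick illustration of the character-level view, the sketch below encodes one example into integer ids. It assumes the same vocabulary layout as the lab (special tokens first, then digits and `+`). The names `vocab`, `char_to_id`, and `encode` are chosen for this example only.

```python
# Same layout as the lab's vocabulary: special tokens first, then digits and "+".
vocab = ["<PAD>", "<SOS>", "<EOS>"] + list("0123456789+")
char_to_id = {ch: i for i, ch in enumerate(vocab)}

def encode(s):
    """Map each character of s to its integer id."""
    return [char_to_id[ch] for ch in s]

src_ids = encode("123+45")
tgt_ids = encode("168")
decoder_input = [char_to_id["<SOS>"]] + tgt_ids   # what the decoder reads
decoder_target = tgt_ids + [char_to_id["<EOS>"]]  # what it must predict

print(src_ids)         # [4, 5, 6, 13, 7, 8]
print(decoder_input)   # [1, 4, 9, 11]
print(decoder_target)  # [4, 9, 11, 2]
```

With only 14 symbols in total, every prediction error can be read directly as a wrong digit at a given position.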

---

## ✅ Lab objectives

By the end of the lab, students will be able to:
1. Generate a synthetic dataset of additions.
2. Build a character vocabulary with `PAD`, `SOS`, `EOS`.
3. Implement an LSTM Seq2Seq model (encoder + decoder).
4. Train with teacher forcing.
5. Evaluate with **exact match** (the whole result is correct).
6. Study generalization by increasing the length of the numbers.
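
To see why exact match is stricter than a per-character score, here is a small sketch that works on decoded strings. The functions `char_accuracy` and `exact_match_str` are simplified, hypothetical stand-ins for the tensor-based metrics defined later in the lab. Extra predicted characters beyond the gold length are ignored here.

```python
def char_accuracy(pred: str, gold: str) -> float:
    """Fraction of gold positions predicted correctly (missing positions count as wrong)."""
    hits = sum(p == g for p, g in zip(pred, gold))
    return hits / max(len(gold), 1)

def exact_match_str(pred: str, gold: str) -> float:
    """1.0 only when the whole predicted string equals the gold string."""
    return float(pred == gold)

pairs = [("168", "168"), ("178", "168"), ("100", "1000")]
for pred, gold in pairs:
    print(f"pred={pred!r} gold={gold!r} "
          f"char_acc={char_accuracy(pred, gold):.2f} "
          f"exact_match={exact_match_str(pred, gold):.0f}")
```

The prediction `"178"` for `"168"` gets two of three characters right but scores 0 on exact match, which is the measure that reflects whether the sum is actually correct.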

---

## 📌 Lab outline

1. Parameters + vocabulary
2. Data generation (a+b)
3. Dataset / DataLoader / padding
4. Models: Encoder / Decoder / Seq2Seq
5. Training + curves
6. Greedy inference + examples
7. Generalization tests (lengths)



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import numpy as np
import random
import math
import matplotlib.pyplot as plt

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device


---

## 1) Lab parameters


In [None]:
# --- Data ---
MAX_DIGITS = 3          # a and b each have between 1 and MAX_DIGITS digits
TRAIN_SIZE = 12000
VALID_SIZE = 1500
TEST_SIZE  = 1500

BATCH_SIZE = 64

# --- Model ---
EMBED_DIM  = 64
HIDDEN_DIM = 128
NUM_LAYERS = 1
DROPOUT    = 0.0

EPOCHS = 15
LR = 1e-3

TEACHER_FORCING_RATIO = 0.7


---

## 2) Character-level vocabulary


In [None]:
SPECIALS = ["<PAD>", "<SOS>", "<EOS>"]
CHARS = list("0123456789+")
itos = SPECIALS + CHARS
stoi = {ch:i for i,ch in enumerate(itos)}

PAD = stoi["<PAD>"]
SOS = stoi["<SOS>"]
EOS = stoi["<EOS>"]

VOCAB_SIZE = len(itos)

itos, VOCAB_SIZE, PAD, SOS, EOS


---

## 3) Generating (source, target) pairs


In [None]:
def sample_addition(max_digits=MAX_DIGITS):
    a = random.randint(0, 10**max_digits - 1)
    b = random.randint(0, 10**max_digits - 1)
    src = f"{a}+{b}"
    tgt = f"{a+b}"
    return src, tgt

for _ in range(5):
    print(sample_addition())
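
Because `a` and `b` are drawn uniformly from `[0, 10**MAX_DIGITS - 1]`, most operands have the maximum number of digits. The sketch below counts this exactly. It also shows a hypothetical alternative sampler, `sample_operand_by_length`, which draws the number of digits first. That sampler is an assumption for experimentation, not part of the lab.

```python
import random
from collections import Counter

MAX_DIGITS = 3  # same value as in the parameters cell

# Digit-length distribution of integers drawn uniformly from [0, 10**MAX_DIGITS - 1].
length_counts = Counter(len(str(n)) for n in range(10**MAX_DIGITS))
print(dict(length_counts))  # {1: 10, 2: 90, 3: 900}

def sample_operand_by_length(max_digits, rng=random):
    """Hypothetical alternative: draw the number of digits first, then the value."""
    k = rng.randint(1, max_digits)
    low = 0 if k == 1 else 10 ** (k - 1)
    return rng.randint(low, 10 ** k - 1)

rng = random.Random(0)
print([sample_operand_by_length(MAX_DIGITS, rng) for _ in range(5)])
```

With uniform sampling, 90% of operands have three digits and only 1% have one, so short additions are rare in the training set. This is worth keeping in mind when reading the error analysis.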


---

## 4) Encoding to ids + padding + DataLoader


In [None]:
def encode_string(s):
    return [stoi[ch] for ch in s]

def pad_batch(seqs, pad_value=PAD):
    max_len = max(len(s) for s in seqs)
    out = []
    lengths = []
    for s in seqs:
        lengths.append(len(s))
        out.append(s + [pad_value]*(max_len - len(s)))
    return torch.tensor(out, dtype=torch.long), torch.tensor(lengths, dtype=torch.long)

class AdditionDataset(Dataset):
    def __init__(self, n, max_digits=MAX_DIGITS):
        self.samples = [sample_addition(max_digits) for _ in range(n)]
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        return self.samples[idx]

def collate_fn(batch):
    src_strs = [b[0] for b in batch]
    tgt_strs = [b[1] for b in batch]

    src_ids = [encode_string(s) for s in src_strs]
    tgt_ids = [encode_string(s) for s in tgt_strs]

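    # Decoder input is the target shifted right (starts with SOS);
    # decoder target is the same sequence ending with EOS.
    tgt_in_ids  = [[SOS] + t for t in tgt_ids]
    tgt_out_ids = [t + [EOS] for t in tgt_ids]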

    src, src_len = pad_batch(src_ids, PAD)
    tgt_in, _ = pad_batch(tgt_in_ids, PAD)
    tgt_out, _ = pad_batch(tgt_out_ids, PAD)

    return src, src_len, tgt_in, tgt_out

train_ds = AdditionDataset(TRAIN_SIZE, MAX_DIGITS)
valid_ds = AdditionDataset(VALID_SIZE, MAX_DIGITS)
test_ds  = AdditionDataset(TEST_SIZE,  MAX_DIGITS)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

src, src_len, tgt_in, tgt_out = next(iter(train_loader))
src.shape, tgt_in.shape, tgt_out.shape


---

## 5) Models: LSTM encoder / decoder


In [None]:
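from torch.nn.utils.rnn import pack_padded_sequence

class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1, dropout=0.0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=PAD)
        self.lstm = nn.LSTM(
            embed_dim, hidden_dim, num_layers=num_layers,
            batch_first=True, dropout=dropout if num_layers > 1 else 0.0
        )
    def forward(self, src):
        emb = self.embedding(src)
        # Pack the batch so the final state is taken at each sequence's last real
        # character rather than after trailing PAD steps (sources are right-padded
        # in training but unpadded in greedy_generate).
        lengths = (src != PAD).sum(dim=1).cpu()
        packed = pack_padded_sequence(emb, lengths, batch_first=True, enforce_sorted=False)
        _, (h, c) = self.lstm(packed)
        return h, c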

class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1, dropout=0.0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=PAD)
        self.lstm = nn.LSTM(
            embed_dim, hidden_dim, num_layers=num_layers,
            batch_first=True, dropout=dropout if num_layers > 1 else 0.0
        )
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, h, c):
        emb = self.embedding(x)
        out, (h, c) = self.lstm(emb, (h, c))
        logits = self.fc(out)
        return logits, h, c


---

## 6) Seq2Seq + Teacher Forcing


In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt_in, teacher_forcing_ratio=0.7):
        B, Ttgt = tgt_in.shape
        V = self.decoder.fc.out_features

        # The encoder's final (h, c) initializes the decoder state.
        h, c = self.encoder(src)

        logits_all = torch.zeros(B, Ttgt, V, device=src.device)

        input_tok = tgt_in[:, 0].unsqueeze(1)  # SOS, shape (B, 1)

        for t in range(Ttgt):
            step_logits, h, c = self.decoder(input_tok, h, c)  # (B, 1, V)
            logits_all[:, t:t+1, :] = step_logits

            pred_tok = step_logits.argmax(-1)  # greedy prediction, shape (B, 1)

            if t + 1 < Ttgt:
                # One draw per time step, shared by the whole batch: feed either
                # the gold token (teacher forcing) or the model's own prediction.
                use_tf = random.random() < teacher_forcing_ratio
                input_tok = tgt_in[:, t+1].unsqueeze(1) if use_tf else pred_tok

        return logits_all
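
The input selection made inside `forward` can be traced by hand on a toy sequence. The sketch below is a pure-Python illustration under a simplifying assumption: the model's predictions are given as a fixed list, whereas in the real loop each prediction depends on the inputs fed before it. The helper `decoder_inputs` is hypothetical.

```python
import random

def decoder_inputs(gold, predicted, teacher_forcing_ratio, rng):
    """Return the token fed to the decoder at each step.

    gold:      gold target characters, e.g. list("168")
    predicted: characters the model emitted at each step (assumed fixed here)
    """
    inputs = ["<SOS>"]  # the first input is always SOS
    for g, p in zip(gold, predicted):
        use_teacher = rng.random() < teacher_forcing_ratio
        inputs.append(g if use_teacher else p)
    return inputs

gold = list("168")
predicted = list("178")  # the model got the middle digit wrong
rng = random.Random(0)

print(decoder_inputs(gold, predicted, 1.0, rng))  # ['<SOS>', '1', '6', '8']
print(decoder_inputs(gold, predicted, 0.0, rng))  # ['<SOS>', '1', '7', '8']
```

With a ratio of 1.0 the decoder never sees its own mistake. With 0.0 the wrong `7` is fed back, which is the situation the model faces at inference time.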


---

## 7) Loss + metrics


In [None]:
criterion = nn.CrossEntropyLoss(ignore_index=PAD)

def token_accuracy(logits, targets, pad_idx=PAD):
    pred = logits.argmax(-1)
    mask = targets != pad_idx
    correct = (pred == targets) & mask
    return correct.sum().item() / mask.sum().item()

def exact_match(logits, targets, pad_idx=PAD):
    pred = logits.argmax(-1)
    mask = targets != pad_idx
    # Only target positions up to and including EOS are scored: the model is
    # never trained on PAD positions, so its predictions there are arbitrary.
    ok = ((pred == targets) | ~mask).all(dim=1)
    return ok.float().mean().item()


---

## 8) Training / evaluation


In [None]:
def train_one_epoch(model, loader, optimizer, criterion, tf_ratio=0.7):
    model.train()
    total_loss, total_tokacc, total_em = 0.0, 0.0, 0.0

    for src, src_len, tgt_in, tgt_out in loader:
        src = src.to(device)
        tgt_in = tgt_in.to(device)
        tgt_out = tgt_out.to(device)

        optimizer.zero_grad()
        logits = model(src, tgt_in, teacher_forcing_ratio=tf_ratio)

        B, T, V = logits.shape
        loss = criterion(logits.reshape(B*T, V), tgt_out.reshape(B*T))

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        total_loss += loss.item()
        total_tokacc += token_accuracy(logits, tgt_out)
        total_em += exact_match(logits, tgt_out)

    n = len(loader)
    return total_loss/n, total_tokacc/n, total_em/n

@torch.no_grad()
def evaluate(model, loader, criterion):
    model.eval()
    total_loss, total_tokacc, total_em = 0.0, 0.0, 0.0

    for src, src_len, tgt_in, tgt_out in loader:
        src = src.to(device)
        tgt_in = tgt_in.to(device)
        tgt_out = tgt_out.to(device)

        logits = model(src, tgt_in, teacher_forcing_ratio=0.0)

        B, T, V = logits.shape
        loss = criterion(logits.reshape(B*T, V), tgt_out.reshape(B*T))

        total_loss += loss.item()
        total_tokacc += token_accuracy(logits, tgt_out)
        total_em += exact_match(logits, tgt_out)

    n = len(loader)
    return total_loss/n, total_tokacc/n, total_em/n


---

## 9) Running the training


In [None]:
encoder = Encoder(VOCAB_SIZE, EMBED_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
decoder = Decoder(VOCAB_SIZE, EMBED_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
model = Seq2Seq(encoder, decoder).to(device)

optimizer = optim.Adam(model.parameters(), lr=LR)

hist = {"tr_loss":[], "va_loss":[], "tr_em":[], "va_em":[]}

for epoch in range(1, EPOCHS+1):
    tr_loss, tr_tok, tr_em = train_one_epoch(model, train_loader, optimizer, criterion, tf_ratio=TEACHER_FORCING_RATIO)
    va_loss, va_tok, va_em = evaluate(model, valid_loader, criterion)

    hist["tr_loss"].append(tr_loss); hist["va_loss"].append(va_loss)
    hist["tr_em"].append(tr_em);     hist["va_em"].append(va_em)

    print(f"Epoch {epoch:02d} | train loss {tr_loss:.4f} EM {tr_em:.3f} | "
          f"valid loss {va_loss:.4f} EM {va_em:.3f}")


---

## 10) Curves


In [None]:
plt.figure()
plt.plot(hist["tr_loss"], label="train loss")
plt.plot(hist["va_loss"], label="valid loss")
plt.xlabel("epoch"); plt.ylabel("loss"); plt.legend(); plt.show()

plt.figure()
plt.plot(hist["tr_em"], label="train exact match")
plt.plot(hist["va_em"], label="valid exact match")
plt.xlabel("epoch"); plt.ylabel("exact match"); plt.legend(); plt.show()


---

## 11) Greedy inference


In [None]:
def decode_ids(ids):
    out = []
    for i in ids:
        if i == PAD or i == SOS:
            continue
        if i == EOS:
            break
        out.append(itos[i])
    return "".join(out)

@torch.no_grad()
def greedy_generate(model, src_str, max_len=12):
    model.eval()
    src_ids = torch.tensor([encode_string(src_str)], dtype=torch.long, device=device)
    h, c = model.encoder(src_ids)

    input_tok = torch.tensor([[SOS]], dtype=torch.long, device=device)
    out_ids = []

    for _ in range(max_len):
        logits, h, c = model.decoder(input_tok, h, c)
        pred = logits.argmax(-1)
        tok = pred.item()
        out_ids.append(tok)
        input_tok = pred
        if tok == EOS:
            break

    return decode_ids(out_ids)

for s in ["3+7", "12+5", "99+1", "123+45", "7+250"]:
    gold = sum(int(x) for x in s.split("+"))  # avoids eval() on the input string
    print(s, "=>", greedy_generate(model, s, max_len=20), "(gold:", gold, ")")


---

## 12) Final test


In [None]:
test_loss, test_tok, test_em = evaluate(model, test_loader, criterion)
print(f"TEST | loss {test_loss:.4f} exact match {test_em:.3f}")


---

## 13) Error analysis (qualitative)


In [None]:
errors = 0
for i in range(400):
    src, tgt = test_ds[i]
    pred = greedy_generate(model, src, max_len=25)
    if pred != tgt:
        print("src :", src)
        print("pred:", pred)
        print("gold:", tgt)
        print("---")
        errors += 1
        if errors >= 10:
            break

print("Errors shown:", errors)


---

## 14) Generalization test (MAX_DIGITS + 1)


In [None]:
@torch.no_grad()
def generalization_test(model, max_digits_test=MAX_DIGITS+1, n=300):
    ok = 0
    for _ in range(n):
        src, tgt = sample_addition(max_digits=max_digits_test)
        pred = greedy_generate(model, src, max_len=40)
        ok += int(pred == tgt)
    return ok / n

gen_em = generalization_test(model, max_digits_test=MAX_DIGITS+1, n=300)
print(f"Generalization exact match (test digits={MAX_DIGITS+1}): {gen_em:.3f}")


---

## 15) Questions to hand in (1-2 page report)

1. Why is symbolic addition harder than sequence reversal?  
2. Explain the roles of `SOS`, `EOS`, and `PAD`.  
3. Identify an error caused by a carry and explain it.  
4. Compare `TEACHER_FORCING_RATIO=0.0` vs `0.7` vs `0.9`.  
5. Discuss generalization to longer numbers.  
6. Propose an improvement: attention, a Transformer, or reversing the source.

---

## 16) Extensions (optional)

- Reverse the source and compare: does it improve performance?
- Try a GRU.
- Add attention (T3).
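
"Reversing the source" can be read in at least two ways, so here is a hedged sketch of both. The helper names `reverse_whole` and `reverse_operands` are hypothetical. Which variant helps, if any, is left for the experiment to decide.

```python
def reverse_whole(src: str) -> str:
    """Reverse the entire source string, operator included."""
    return src[::-1]

def reverse_operands(src: str) -> str:
    """Reverse the digits of each operand but keep the operand order."""
    a, b = src.split("+")
    return f"{a[::-1]}+{b[::-1]}"

print(reverse_whole("123+45"))     # 54+321
print(reverse_operands("123+45"))  # 321+54
```

One way to run the comparison is to apply one of these helpers to `src` inside `sample_addition`, leave the target unchanged, and retrain with the same seed and hyperparameters.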
