# Seq2Seq English–French Translation (Colab)

Notebook d'entraînement pour le **Sujet 11 : Traduction automatique simplifiée avec Seq2Seq**.

- Modèle : encodeur–décodeur LSTM (Seq2Seq) en PyTorch.
- Données : corpus de paires de phrases anglais–français (fichier CSV).
- Objectif : entraîner un mini-traducteur et tester quelques phrases.


In [None]:
# Installation des dépendances de base
!pip install -q torch pandas


In [None]:
# 1) Charger les données
# Option simple : uploader manuellement le fichier CSV depuis votre machine.
# Le fichier doit contenir deux colonnes : colonne 0 = anglais, colonne 1 = français.
# Exemple : utilisez le dataset Kaggle "Language Translation English-French" et exportez un CSV.

from google.colab import files
import io
import pandas as pd

# Ask the user for a CSV through the Colab upload widget. The returned
# mapping is keyed by file name and holds the raw bytes of each file.
print("Veuillez sélectionner votre fichier CSV (ex: eng_-french.csv)...")
uploaded = files.upload()

# If nothing was uploaded (e.g. the dialog was dismissed) there is no first
# file to pick: stop here with an explicit message instead of an IndexError.
if not uploaded:
    raise RuntimeError(
        "Aucun fichier n'a été uploadé : relancez la cellule et sélectionnez un fichier CSV."
    )

# Only the first uploaded file is used.
csv_name = next(iter(uploaded))
print(f"Fichier reçu : {csv_name}")

# Parse the uploaded bytes, then write them back under a fixed local name so
# the rest of the notebook can rely on DATA_PATH.
df = pd.read_csv(io.BytesIO(uploaded[csv_name]))
DATA_PATH = "eng_-french_colab.csv"
df.to_csv(DATA_PATH, index=False)
print(f"Données sauvegardées sous {DATA_PATH}")


In [None]:
# 2) Prétraitement des données et dataset PyTorch

import unicodedata
import re
import torch
from torch.utils.data import Dataset, DataLoader

# Reserved vocabulary indices shared by both languages; they line up with the
# entries pre-filled in Lang.index2word.
SOS_token = 0  # prepended to every sentence by tensorFromSentence
EOS_token = 1  # appended to every sentence by tensorFromSentence
UNK_token = 2  # fallback index for words missing from the vocabulary
PAD_token = 3  # filler value used when padding a batch to a common length

class Lang:
    """Vocabulary of one language: word <-> index tables plus word counts.

    Indices 0-3 are reserved for the SOS, EOS, UNK and PAD markers, so the
    first real word receives index 4.
    """

    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        # The reverse table starts with the four reserved markers.
        self.index2word = dict(enumerate(("SOS", "EOS", "UNK", "PAD")))
        self.n_words = len(self.index2word)  # Count SOS, EOS, UNK, PAD

    def addSentence(self, sentence):
        """Register every single-space-separated token of ``sentence``."""
        for token in sentence.split(' '):
            self.addWord(token)

    def addWord(self, word):
        """Count ``word``, assigning it the next free index on first sight."""
        if word in self.word2index:
            self.word2count[word] += 1
            return

        new_index = self.n_words
        self.word2index[word] = new_index
        self.word2count[word] = 1
        self.index2word[new_index] = word
        self.n_words = new_index + 1

# Ligatures have no canonical decomposition, so NFD leaves them untouched;
# they must be spelled out explicitly to end up as plain ASCII letters.
_LIGATURE_TABLE = str.maketrans({
    "\u0153": "oe",  # oe ligature, lowercase
    "\u0152": "OE",  # oe ligature, uppercase
    "\u00e6": "ae",  # ae ligature, lowercase
    "\u00c6": "AE",  # ae ligature, uppercase
})


def unicodeToAscii(s):
    """Strip diacritics from ``s`` and spell out oe/ae ligatures.

    The text is decomposed (NFD) so that accents become separate combining
    marks (category ``Mn``), which are then dropped. Ligatures are
    transliterated beforehand because decomposition does not split them.

    Args:
        s: Text to simplify.

    Returns:
        The text without combining marks. Characters that are neither
        accented letters nor handled ligatures are returned unchanged.
    """
    decomposed = unicodedata.normalize('NFD', s.translate(_LIGATURE_TABLE))
    return ''.join(c for c in decomposed if unicodedata.category(c) != 'Mn')

def normalizeString(s):
    """Lowercase ``s``, strip accents and keep only letters and ``. ! ?``.

    A space is inserted before each ``.``, ``!`` or ``?``; every run of other
    non-letter characters is then collapsed into a single space.
    """
    text = unicodeToAscii(s.lower().strip())
    substitutions = (
        (r"([.!?])", r" \1"),        # detach sentence punctuation
        (r"[^a-zA-Z.!?]+", r" "),    # squash everything else into one space
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

def read_data(path, limit=None):
    """Load sentence pairs from a CSV file and normalize both sides.

    Args:
        path: Path of the CSV file. The first column is read as the English
            sentence and the second as the French one.
        limit: Keep only the first ``limit`` rows. A falsy value (``None`` or
            ``0``) keeps every row.

    Returns:
        A list of ``[english, french]`` pairs of normalized strings. Rows in
        which either of the two cells is missing are skipped.
    """
    df_local = pd.read_csv(path)
    if limit:
        df_local = df_local.head(limit)

    # A missing cell is read as NaN, and str(NaN) would inject the bogus
    # word "nan" into the corpus, so incomplete rows are dropped up front.
    complete = df_local[df_local.iloc[:, :2].notna().all(axis=1)]

    # Walk the two columns directly instead of doing one iloc lookup per cell.
    return [
        [normalizeString(str(eng)), normalizeString(str(fra))]
        for eng, fra in zip(complete.iloc[:, 0], complete.iloc[:, 1])
    ]

def filterPair(p, max_length=15):
    """Tell whether both sentences of pair ``p`` are shorter than ``max_length``.

    Length is the number of single-space-separated tokens; the limit is
    exclusive.
    """
    if len(p[0].split(' ')) >= max_length:
        return False
    return len(p[1].split(' ')) < max_length

def filterPairs(pairs, max_length=15):
    """Return, in order, the pairs accepted by ``filterPair``."""
    kept = []
    for candidate in pairs:
        if filterPair(candidate, max_length):
            kept.append(candidate)
    return kept

def prepareData(path, limit=None):
    """Read the corpus, drop over-long pairs and build both vocabularies.

    Args:
        path: CSV file handed to ``read_data``.
        limit: Optional row limit handed to ``read_data``.

    Returns:
        ``(input_lang, output_lang, pairs)``: the "eng" and "fra" ``Lang``
        vocabularies and the filtered list of sentence pairs.
    """
    raw_pairs = read_data(path, limit)
    print(f"Read {len(raw_pairs)} sentence pairs")
    pairs = filterPairs(raw_pairs)
    print(f"Trimmed to {len(pairs)} sentence pairs")

    input_lang, output_lang = Lang("eng"), Lang("fra")

    # Element 0 of each pair feeds the source vocabulary, element 1 the target.
    for pair in pairs:
        source_sentence, target_sentence = pair[0], pair[1]
        input_lang.addSentence(source_sentence)
        output_lang.addSentence(target_sentence)

    print("Counted words:")
    for lang in (input_lang, output_lang):
        print(lang.name, lang.n_words)

    return input_lang, output_lang, pairs

def indexesFromSentence(lang, sentence):
    """Map each single-space-separated token to its index in ``lang``.

    Tokens missing from ``lang.word2index`` are mapped to ``UNK_token``.
    """
    lookup = lang.word2index
    indexes = []
    for token in sentence.split(' '):
        indexes.append(lookup.get(token, UNK_token))
    return indexes

def tensorFromSentence(lang, sentence):
    """Encode ``sentence`` as a 1-D long tensor framed by SOS and EOS."""
    token_ids = [SOS_token, *indexesFromSentence(lang, sentence), EOS_token]
    return torch.tensor(token_ids, dtype=torch.long)

class TranslationDataset(Dataset):
    """Dataset over sentence pairs that yields index tensors plus raw text."""

    def __init__(self, pairs, input_lang, output_lang):
        self.pairs = pairs
        self.input_lang = input_lang
        self.output_lang = output_lang

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(input_tensor, target_tensor, source_text, target_text)``."""
        pair = self.pairs[idx]
        source_text, target_text = pair[0], pair[1]
        return (
            tensorFromSentence(self.input_lang, source_text),
            tensorFromSentence(self.output_lang, target_text),
            source_text,
            target_text,
        )

def collate_fn(batch):
    """Assemble dataset items into padded source/target batches.

    Args:
        batch: Sequence of 4-tuples whose first two entries are the source
            and target index tensors; the last two entries are ignored.

    Returns:
        ``(padded_sources, padded_targets, source_lengths, target_lengths)``.
        The tensors are padded with ``PAD_token``; the lengths are lists
        holding each sequence's length before padding.
    """
    sources, targets, _, _ = zip(*batch)
    pad_sequence = torch.nn.utils.rnn.pad_sequence

    source_lengths = list(map(len, sources))
    target_lengths = list(map(len, targets))

    padded_sources = pad_sequence(sources, padding_value=PAD_token)
    padded_targets = pad_sequence(targets, padding_value=PAD_token)

    return padded_sources, padded_targets, source_lengths, target_lengths


In [None]:
# 3) Modèle Seq2Seq (encodeur–décodeur LSTM)

import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class EncoderRNN(nn.Module):
    """LSTM encoder: embeds source token ids and runs them through an LSTM."""

    def __init__(self, input_vocab_size, embedding_dim, hidden_size, num_layers=1, dropout=0.1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Dropout is handed to the LSTM only when there is more than one layer.
        lstm_dropout = dropout if num_layers > 1 else 0.0

        self.embedding = nn.Embedding(input_vocab_size, embedding_dim, padding_idx=PAD_token)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers=num_layers, dropout=lstm_dropout)

    def forward(self, src, src_lengths):
        """Encode a padded batch.

        Args:
            src: Token ids, shape (src_len, batch_size).
            src_lengths: Unpadded length of each sequence in the batch; the
                batch does not have to be sorted by length.

        Returns:
            ``(outputs, (hidden, cell))``: the re-padded LSTM outputs and the
            final LSTM state.
        """
        # Packing keeps the LSTM from consuming the padding positions.
        packed_input = pack_padded_sequence(self.embedding(src), src_lengths, enforce_sorted=False)
        packed_outputs, state = self.lstm(packed_input)
        outputs, _ = pad_packed_sequence(packed_outputs)
        hidden, cell = state
        return outputs, (hidden, cell)

class DecoderRNN(nn.Module):
    """LSTM decoder that predicts one target token per call."""

    def __init__(self, output_vocab_size, embedding_dim, hidden_size, num_layers=1, dropout=0.1):
        super().__init__()
        self.output_vocab_size = output_vocab_size

        # Dropout is handed to the LSTM only when there is more than one layer.
        lstm_dropout = dropout if num_layers > 1 else 0.0

        self.embedding = nn.Embedding(output_vocab_size, embedding_dim, padding_idx=PAD_token)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers=num_layers, dropout=lstm_dropout)
        self.fc_out = nn.Linear(hidden_size, output_vocab_size)

    def forward(self, input_step, hidden, cell):
        """Run a single decoding step.

        Args:
            input_step: Current input token ids, shape (batch_size,).
            hidden: LSTM hidden state from the previous step (or the encoder).
            cell: LSTM cell state from the previous step (or the encoder).

        Returns:
            ``(predictions, hidden, cell)`` where ``predictions`` has shape
            (batch_size, vocab_size) and the states are the updated ones.
        """
        # The LSTM expects a time axis, so add one of length 1.
        step_embedding = self.embedding(input_step.unsqueeze(0))  # (1, batch_size, embedding_dim)
        lstm_out, (hidden, cell) = self.lstm(step_embedding, (hidden, cell))
        predictions = self.fc_out(lstm_out.squeeze(0))  # (batch_size, vocab_size)
        return predictions, hidden, cell

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        # Kept for callers that read it; forward() follows the device of its
        # inputs so the model keeps working after being moved.
        self.device = device

    def forward(self, src, src_lengths, trg, teacher_forcing_ratio=0.5):
        """Produce decoder scores for every target position.

        Args:
            src: Source token ids, shape (src_len, batch_size).
            src_lengths: Unpadded length of each source sequence.
            trg: Target token ids, shape (trg_len, batch_size); row 0 is used
                as the first decoder input.
            teacher_forcing_ratio: Probability, drawn once per step for the
                whole batch, of feeding the ground-truth token instead of the
                model's own prediction as the next input.

        Returns:
            Tensor of shape (trg_len, batch_size, vocab_size). Row 0 is never
            written and stays at zero.
        """
        trg_len, batch_size = trg.size(0), trg.size(1)
        vocab_size = self.decoder.output_vocab_size

        # Allocate next to the targets rather than on the device recorded at
        # construction time, which may be stale if the model was moved.
        outputs = torch.zeros(trg_len, batch_size, vocab_size, device=trg.device)

        _, (hidden, cell) = self.encoder(src, src_lengths)

        # First decoder input: the <SOS> row of the targets.
        input_step = trg[0, :]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input_step, hidden, cell)
            outputs[t] = output

            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            # The greedy prediction is only needed when not teacher forcing.
            input_step = trg[t] if teacher_force else output.argmax(1)

        return outputs


In [None]:
# 4) Entraînement du modèle Seq2Seq

import math
import time

# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Hyperparameters
EMBEDDING_DIM = 256
HIDDEN_SIZE = 256
NUM_LAYERS = 1
DROPOUT = 0.1  # only forwarded to the LSTMs when NUM_LAYERS > 1

BATCH_SIZE = 32
N_EPOCHS = 5          # Can be increased if you have the time
LEARNING_RATE = 1e-3
TEACHER_FORCING_RATIO = 0.5
GRAD_CLIP = 1.0  # max gradient norm passed to clip_grad_norm_

# Cap the number of pairs to go faster (e.g. 50_000), or None to use them all
LIMIT_PAIRS = 50000

# Build the vocabularies and the filtered sentence pairs from the saved CSV.
input_lang, output_lang, pairs = prepareData(DATA_PATH, limit=LIMIT_PAIRS)

# Batches are reshuffled every epoch and padded by collate_fn.
dataset = TranslationDataset(pairs, input_lang, output_lang)
dataloader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)

# Encoder and decoder share the same embedding/hidden sizes; each one is sized
# to its own language's vocabulary.
encoder = EncoderRNN(
    input_vocab_size=input_lang.n_words,
    embedding_dim=EMBEDDING_DIM,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT,
).to(device)

decoder = DecoderRNN(
    output_vocab_size=output_lang.n_words,
    embedding_dim=EMBEDDING_DIM,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT,
).to(device)

model = Seq2Seq(encoder, decoder, device).to(device)

# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_token)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

def train_epoch():
    """Train the global ``model`` for one pass over ``dataloader``.

    Returns:
        The mean of the per-batch losses.
    """
    model.train()
    running_loss = 0.0
    for src, trg, src_lengths, _ in dataloader:
        src, trg = src.to(device), trg.to(device)

        optimizer.zero_grad()
        logits = model(src, src_lengths, trg, teacher_forcing_ratio=TEACHER_FORCING_RATIO)

        # logits: (trg_len, batch_size, vocab_size). Position 0 is dropped on
        # both sides and the rest is flattened for the loss.
        vocab_size = logits.shape[-1]
        flat_logits = logits[1:].reshape(-1, vocab_size)
        flat_targets = trg[1:].reshape(-1)

        loss = criterion(flat_logits, flat_targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), GRAD_CLIP)
        optimizer.step()

        running_loss += loss.item()
    return running_loss / len(dataloader)

# Run N_EPOCHS training passes, reporting the mean loss and wall-clock time of
# each one.
for epoch in range(1, N_EPOCHS + 1):
    start_time = time.time()
    train_loss = train_epoch()
    elapsed = time.time() - start_time
    print(f"Epoch {epoch}/{N_EPOCHS} - loss: {train_loss:.4f} - time: {elapsed:.1f}s")

# Save everything needed to rebuild the translator: both sets of weights, the
# two vocabularies and the architecture settings.
# NOTE(review): the Lang objects are stored as-is, so reading this file back
# presumably requires the Lang class to be defined and full (not weights-only)
# unpickling; confirm against the loading code.
MODEL_PATH = "seq2seq_en_fr_colab.pt"
torch.save(
    {
        "encoder_state_dict": encoder.state_dict(),
        "decoder_state_dict": decoder.state_dict(),
        "input_lang": input_lang,
        "output_lang": output_lang,
        "config": {
            "embedding_dim": EMBEDDING_DIM,
            "hidden_size": HIDDEN_SIZE,
            "num_layers": NUM_LAYERS,
            "dropout": DROPOUT,
        },
    },
    MODEL_PATH,
)
print(f"Modèle sauvegardé dans {MODEL_PATH}")


In [None]:
# 5) Fonction de traduction pour tester le modèle

def translate_sentence(sentence, model, input_lang, output_lang, device, max_length=30):
    """Greedily translate one sentence with a trained model.

    Args:
        sentence: Raw source sentence; it is normalized before encoding.
        model: Object exposing ``encoder`` and ``decoder``; it is switched to
            eval mode.
        input_lang: Source vocabulary used to encode the sentence.
        output_lang: Target vocabulary used to turn indices back into words.
        device: Device on which the tensors are created.
        max_length: Maximum number of decoding steps.

    Returns:
        The decoded words joined by single spaces, without the EOS marker.
    """
    model.eval()
    decoded_tokens = []
    with torch.no_grad():
        src_tensor = tensorFromSentence(input_lang, normalizeString(sentence)).to(device)
        src_length = [src_tensor.size(0)]
        # The encoder expects (seq_len, batch); use a batch of one.
        _, (hidden, cell) = model.encoder(src_tensor.unsqueeze(1), src_length)

        input_token = torch.tensor([SOS_token], dtype=torch.long, device=device)
        for _ in range(max_length):
            scores, hidden, cell = model.decoder(input_token, hidden, cell)
            best = scores.argmax(1)
            token_id = best.item()
            # Stop at EOS without emitting it.
            if token_id == EOS_token:
                break
            decoded_tokens.append(token_id)
            input_token = best

    return " ".join(output_lang.index2word.get(idx, "<UNK>") for idx in decoded_tokens)

# Quick example after training:
test_sentence = "i am a student ."
print("Anglais :", test_sentence)
print("Français :", translate_sentence(test_sentence, model, input_lang, output_lang, device))
