In [None]:
!pip install torch==2.2.0 torchtext==0.16.2 numpy\<2.0
!pip install portalocker>=2.0.0

In [None]:
import torch
import torchtext
import numpy
import portalocker
import torchdata
import math


from torch import nn
from torch import Tensor
from torch.nn import Transformer
from torchtext.datasets import multi30k, Multi30k
from typing import Iterable, List
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.transforms import SentencePieceTokenizer

## Tokenisation (BPE) / Construction du vocabulaire :

In [None]:
import sentencepiece as spm
import tempfile
import os

SRC_LANGUAGE = 'en'
TGT_LANGUAGE = 'de'
# Position of each language inside a (source, target) sample tuple.
LANGUAGE_MAP = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}
# Special-token ids; `special_symbols` lists the matching tokens in the same
# order, and the same ids are handed to the SentencePiece trainer below.
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iter_for_spm = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))


# SentencePiece trains from a text file, so the training sentences are dumped
# into a temporary file first. delete=False keeps the file on disk after it is
# closed so the trainer can reopen it by path.
temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8')
temp_file_path = temp_file.name

try:
    # Write every sentence (English and German) on its own line; the file is
    # closed when the `with` block exits, before the trainer reads it.
    with temp_file:
        for src_sentence, tgt_sentence in train_iter_for_spm:
            temp_file.write(src_sentence.strip() + '\n')
            temp_file.write(tgt_sentence.strip() + '\n')

    # Train a single BPE model shared by both languages; writes bpe.model / bpe.vocab.
    spm.SentencePieceTrainer.train(
        input=temp_file_path,
        model_prefix='bpe',
        vocab_size=10000,
        model_type='bpe',
        bos_id=BOS_IDX,
        eos_id=EOS_IDX,
        unk_id=UNK_IDX,
        pad_id=PAD_IDX,
        character_coverage=1.0,
        byte_fallback=True
    )
finally:
    # Clean up the corpus file even when writing or training fails.
    os.remove(temp_file_path)

# Fresh handle on the training split for the rest of the notebook.
train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))


# Tokenizers for English and German: token_transform[LANGUAGE]("...") returns
# the list of BPE pieces for the sentence. Both use the same trained model file.
token_transform = {
    SRC_LANGUAGE: SentencePieceTokenizer("bpe.model"),
    TGT_LANGUAGE: SentencePieceTokenizer("bpe.model")
}

def yield_tokens(data_iter, language):
    """Yield the token list of every sentence of one language in a dataset.

    Args:
        data_iter: iterable of (source, target) sentence pairs.
        language: SRC_LANGUAGE or TGT_LANGUAGE; selects both the tuple column
            to read and the tokenizer to apply.

    Yields:
        The output of ``token_transform[language]`` for each selected sentence.

    Raises:
        KeyError: if ``language`` is not a known language code (raised when the
            generator is first advanced).
    """
    # Resolve the loop-invariant lookups once, using the shared module-level
    # mapping instead of rebuilding a private copy on every call.
    column = LANGUAGE_MAP[language]
    tokenize = token_transform[language]
    for sample in data_iter:
        yield tokenize(sample[column])

# One vocabulary per language, built from the BPE pieces seen in the training
# split. The special symbols come first, in the order given by `special_symbols`,
# and unknown tokens fall back to UNK_IDX.
vocab_transform = {}
for lang in (SRC_LANGUAGE, TGT_LANGUAGE):
    # A fresh dataset handle is created for each language's pass over the data.
    current_train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    token_stream = yield_tokens(current_train_iter, lang)
    vocab = build_vocab_from_iterator(
        token_stream,
        min_freq=1,
        specials=special_symbols,
        special_first=True,
    )
    vocab.set_default_index(UNK_IDX)
    vocab_transform[lang] = vocab


In [None]:
# Sanity check: run the source-language tokenizer on one sentence and display the resulting pieces.
token_transform[SRC_LANGUAGE]("The tokenizer is preprocessing the text.")

['▁The',
 '▁to',
 'ken',
 'iz',
 'er',
 '▁is',
 '▁prep',
 'ro',
 'cess',
 'ing',
 '▁the',
 '▁text',
 '.']

## Création du dataloader

In [None]:
from torch.nn.utils.rnn import pad_sequence

def sequential_transforms(*transforms):
    """Compose several single-argument callables into one.

    Args:
        *transforms: callables applied left to right; each receives the
            previous one's return value.

    Returns:
        A function that feeds its input through every transform in order and
        returns the final value (the input itself when no transform is given).
    """
    def apply_pipeline(txt_input):
        result = txt_input
        for step in transforms:
            result = step(result)
        return result

    return apply_pipeline


def add_bos_and_eos(token_ids: List[int]):
    """Wrap a list of token ids with BOS/EOS and return it as a 1-D tensor.

    Args:
        token_ids: vocabulary indices of one sentence (may be empty).

    Returns:
        A ``torch.long`` tensor ``[BOS_IDX, *token_ids, EOS_IDX]``.
    """
    # The dtype is pinned explicitly: torch.tensor([]) defaults to float32, so an
    # empty sentence would otherwise turn the concatenated result into floats.
    return torch.cat((torch.tensor([BOS_IDX], dtype=torch.long),
                      torch.tensor(token_ids, dtype=torch.long),
                      torch.tensor([EOS_IDX], dtype=torch.long)))

# Full raw-string -> tensor pipeline for each language.
text_transform = {}
for ln in (SRC_LANGUAGE, TGT_LANGUAGE):
    pipeline_steps = (
        token_transform[ln],   # tokenization: str -> list of BPE pieces
        vocab_transform[ln],   # numericalization: pieces -> vocabulary indices
        add_bos_and_eos,       # add BOS/EOS and build the tensor
    )
    text_transform[ln] = sequential_transforms(*pipeline_steps)


def collate_fn(batch):
    """Turn a list of raw (source, target) sentence pairs into padded tensors.

    Each sentence has its trailing newlines removed and is passed through the
    matching ``text_transform`` pipeline; the resulting sequences are then
    padded with PAD_IDX by ``pad_sequence``.

    Args:
        batch: list of (source sentence, target sentence) string pairs.

    Returns:
        Tuple ``(src_batch, tgt_batch)`` of padded tensors.
    """
    encode_src = text_transform[SRC_LANGUAGE]
    encode_tgt = text_transform[TGT_LANGUAGE]

    src_sequences = [encode_src(src_sample.rstrip("\n")) for src_sample, _ in batch]
    tgt_sequences = [encode_tgt(tgt_sample.rstrip("\n")) for _, tgt_sample in batch]

    return (pad_sequence(src_sequences, padding_value=PAD_IDX),
            pad_sequence(tgt_sequences, padding_value=PAD_IDX))

Loading

In [None]:
from torch.utils.data import DataLoader
BATCH_SIZE = 128

# Number of training batches per epoch. Counting the raw samples and dividing
# (rounding up, since the last partial batch is kept) avoids tokenizing and
# collating the whole training set just to measure its length.
num_train_samples = sum(1 for _ in train_iter)
size_dataloader = (num_train_samples + BATCH_SIZE - 1) // BATCH_SIZE

train_dataloader = DataLoader(train_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)
val_iter = Multi30k(split='valid', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
val_dataloader = DataLoader(val_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

## MODELE

Encodage positionnel (positional encoding) sinusoïdal classique :

In [None]:
# Re-declared here so this cell does not depend on the tokenizer cell having run.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to token embeddings.

    Even feature columns hold ``sin(pos * den)`` and odd columns hold
    ``cos(pos * den)``. The table is stored as a non-trainable buffer of shape
    ``(maxlen, 1, emb_size)``.

    Args:
        emb_size: embedding dimension (even or odd).
        dropout: dropout probability applied to the summed embeddings.
        maxlen: number of positions precomputed in the table.
    """

    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super().__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        angles = pos * den  # (maxlen, ceil(emb_size / 2))

        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(angles)
        # With an odd emb_size there is one fewer odd column than even columns,
        # so the cosine part is truncated to fit; for even sizes this is a no-op.
        pos_embedding[:, 1::2] = torch.cos(angles[:, :emb_size // 2])
        # Insert a singleton axis so the table broadcasts over the middle dimension.
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        """Add the encodings of the first ``token_embedding.size(0)`` positions, then apply dropout."""
        seq_len = token_embedding.size(0)
        sum_embed = token_embedding + self.pos_embedding[:seq_len, :]
        return self.dropout(sum_embed)

Création des masques d'attention

In [None]:
def generate_square_subsequent_mask(mask_size):
    """Return a (mask_size, mask_size) boolean mask on DEVICE.

    Entries strictly above the main diagonal are True and all others are False.
    """
    all_true = torch.ones((mask_size, mask_size), dtype=torch.bool, device=DEVICE)
    return all_true.triu(diagonal=1)


def create_mask(src, tgt):
    """Build the four masks passed to the model for one batch.

    Args:
        src: source token ids, sequence length on axis 0.
        tgt: target token ids, sequence length on axis 0.

    Returns:
        Tuple ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)``:
        an all-False square mask for the source, the upper-triangular mask from
        ``generate_square_subsequent_mask`` for the target, and two padding
        masks that are True where the token equals PAD_IDX (axes 0 and 1
        swapped relative to the inputs).
    """
    src_len, tgt_len = src.shape[0], tgt.shape[0]

    # The source mask is all False.
    src_mask = torch.zeros((src_len, src_len), dtype=torch.bool, device=DEVICE)
    tgt_mask = generate_square_subsequent_mask(tgt_len).to(DEVICE)

    src_padding_mask = src.eq(PAD_IDX).transpose(0, 1)
    tgt_padding_mask = tgt.eq(PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

Modèle

In [None]:
class TokenEmbedding(nn.Module):
    """Embedding lookup whose output is scaled by ``sqrt(emb_size)``.

    Args:
        vocab_size: number of rows in the embedding table.
        emb_size: dimension of each embedding vector.
    """

    def __init__(self, vocab_size: int, emb_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        """Look up ``tokens`` (cast to int64) and multiply the vectors by sqrt(emb_size)."""
        scale = math.sqrt(self.emb_size)
        embedded = self.embedding(tokens.long())
        return embedded * scale

In [None]:
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder translation model built on ``torch.nn.Transformer``.

    Token ids are embedded with ``TokenEmbedding``, summed with the shared
    ``PositionalEncoding``, passed through the transformer, and projected to
    target-vocabulary logits by ``unembed``.

    Args:
        num_encoder_layers: number of encoder layers.
        num_decoder_layers: number of decoder layers.
        emb_size: model / embedding dimension.
        nhead: number of attention heads.
        src_vocab_size: size of the source vocabulary.
        tgt_vocab_size: size of the target vocabulary (also the logit dimension).
        dim_feedforward: hidden size of the feed-forward sub-layers.
        dropout: dropout probability used by the transformer and the positional encoding.
    """

    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super().__init__()
        # Sub-modules are created in the same order as before so that parameter
        # registration order (and any seeded initialisation) is unchanged.
        transformer_config = dict(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.transformer = Transformer(**transformer_config)
        self.unembed = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

    def _embed_src(self, src: Tensor):
        """Source token embedding plus positional encoding."""
        return self.positional_encoding(self.src_tok_emb(src))

    def _embed_tgt(self, tgt: Tensor):
        """Target token embedding plus positional encoding."""
        return self.positional_encoding(self.tgt_tok_emb(tgt))

    def forward(self,
                src: Tensor,
                trg: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        """Run the full encoder-decoder and return target-vocabulary logits."""
        # Embedding + positional encoding (source first, then target).
        src_emb = self._embed_src(src)
        tgt_emb = self._embed_tgt(trg)

        # Full transformer pass (encoder + decoder).
        hidden = self.transformer(
            src_emb,
            tgt_emb,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )

        # Projection onto the target vocabulary.
        return self.unembed(hidden)

    def encode(self, src: Tensor, src_mask: Tensor):
        """Run only the encoder on ``src`` and return its output (the memory)."""
        return self.transformer.encoder(self._embed_src(src), mask=src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        """Run only the decoder on ``tgt`` against ``memory``; the result is not yet projected to logits."""
        return self.transformer.decoder(self._embed_tgt(tgt), memory, tgt_mask=tgt_mask)

In [23]:
torch.manual_seed(0)

# Hyperparameters
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 2048
NUM_ENCODER_LAYERS = 4
NUM_DECODER_LAYERS = 4
lr = 0.0003
NUM_EPOCHS = 10

transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE, NHEAD,
                                 SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

# Xavier-uniform initialisation of every parameter with more than one dimension;
# 1-D parameters (e.g. biases) keep their default initialisation.
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)
transformer = transformer.to(DEVICE)

# Loss and optimizer. Padding positions are excluded from the loss.
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)
# The learning rate comes from the `lr` hyperparameter above instead of a
# duplicated literal, so editing `lr` actually changes the optimizer.
optimizer = torch.optim.Adam(transformer.parameters(), lr=lr, betas=(0.9, 0.98), eps=1e-9)

## Training

In [24]:
def train_epoch(model, optimizer, loss_fn, train_dataloader):
    """Train ``model`` for one pass over ``train_dataloader``.

    Args:
        model: called as ``model(src, tgt_input, src_mask, tgt_mask,
            src_padding_mask, tgt_padding_mask, memory_key_padding_mask)``.
        optimizer: optimizer stepped once per batch.
        loss_fn: loss applied to flattened logits and flattened target ids.
        train_dataloader: iterable of (src, tgt) tensor batches, sequence on axis 0.

    Returns:
        Tuple ``(mean batch loss, list of per-batch losses)``.
    """
    model.train()
    batch_losses = []
    running_loss = 0.0

    for src_batch, tgt_batch in train_dataloader:
        src_batch = src_batch.to(DEVICE)
        tgt_batch = tgt_batch.to(DEVICE)

        # Shift by one position: the decoder is fed everything but the last
        # step and must predict everything but the first step.
        decoder_input = tgt_batch[:-1, :]
        decoder_target = tgt_batch[1:, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src_batch, decoder_input)

        # Clear the gradients left over from the previous batch.
        optimizer.zero_grad()

        # Forward pass; the source padding mask is also used for the memory.
        logits = model(src_batch, decoder_input, src_mask, tgt_mask,
                       src_padding_mask, tgt_padding_mask, src_padding_mask)

        vocab_dim = logits.shape[-1]
        loss = loss_fn(logits.reshape(-1, vocab_dim), decoder_target.reshape(-1))

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        batch_losses.append(loss_value)
        running_loss += loss_value

    return running_loss / len(batch_losses), batch_losses


def evaluate(model, loss_fn, val_dataloader):
    """Compute the mean batch loss of ``model`` over ``val_dataloader``.

    The model is put in eval mode and gradients are disabled.

    Args:
        model: called with the same seven positional arguments as during training.
        loss_fn: loss applied to flattened logits and flattened target ids.
        val_dataloader: iterable of (src, tgt) tensor batches, sequence on axis 0.

    Returns:
        The mean of the per-batch losses.
    """
    model.eval()
    batch_losses = []
    running_loss = 0.0

    with torch.no_grad():
        for src_batch, tgt_batch in val_dataloader:
            src_batch = src_batch.to(DEVICE)
            tgt_batch = tgt_batch.to(DEVICE)

            # Same one-step shift as in training.
            decoder_input = tgt_batch[:-1, :]
            decoder_target = tgt_batch[1:, :]

            src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src_batch, decoder_input)
            logits = model(src_batch, decoder_input, src_mask, tgt_mask,
                           src_padding_mask, tgt_padding_mask, src_padding_mask)

            vocab_dim = logits.shape[-1]
            loss_value = loss_fn(logits.reshape(-1, vocab_dim), decoder_target.reshape(-1)).item()
            batch_losses.append(loss_value)
            running_loss += loss_value

    return running_loss / len(batch_losses)

In [25]:
# Train for NUM_EPOCHS epochs, reporting the mean training and validation loss after each one.
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss, _ = train_epoch(
        model=transformer,
        optimizer=optimizer,
        loss_fn=loss_fn,
        train_dataloader=train_dataloader,
    )
    val_loss = evaluate(model=transformer, loss_fn=loss_fn, val_dataloader=val_dataloader)
    print(f"Epoch {epoch} | Train loss: {train_loss:.4f} | Val loss: {val_loss:.4f}")

Epoch 1 | Train loss: 5.1151 | Val loss: 4.3615
Epoch 2 | Train loss: 3.9818 | Val loss: 3.8046
Epoch 3 | Train loss: 3.5238 | Val loss: 3.4924
Epoch 4 | Train loss: 3.0967 | Val loss: 3.0830
Epoch 5 | Train loss: 2.5942 | Val loss: 2.6418
Epoch 6 | Train loss: 2.1499 | Val loss: 2.3442
Epoch 7 | Train loss: 1.8262 | Val loss: 2.1725
Epoch 8 | Train loss: 1.5837 | Val loss: 2.0940
Epoch 9 | Train loss: 1.3825 | Val loss: 2.0683
Epoch 10 | Train loss: 1.2197 | Val loss: 2.0437


## INFERENCE

BEAM SEARCH pour la traduction

In [26]:
import torch.nn.functional as F
def beam_search_decode(model, src, src_mask, max_len, start_symbol, K=5,length_penalty=0.7):
    """Decode one source sequence with beam search.

    Args:
        model: exposes ``encode``, ``decode`` and ``unembed``.
        src: source token ids, sequence on axis 0.
        src_mask: attention mask passed to ``model.encode``.
        max_len: at most ``max_len - 1`` tokens are generated after the start symbol.
        start_symbol: id of the first token of every hypothesis.
        K: beam width, also the number of continuations tried per hypothesis.
        length_penalty: exponent applied to the hypothesis length when ranking.

    Returns:
        The token-id tensor (sequence on axis 0) of the best-ranked hypothesis.
    """
    def normalized_score(hypothesis):
        # Cumulative log-probability divided by length ** length_penalty, so
        # that short sequences are not systematically preferred.
        tokens, log_prob, _ = hypothesis
        return log_prob / (tokens.size(0) ** length_penalty)

    model.eval()
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    with torch.no_grad():
        memory = model.encode(src, src_mask)
        start_seq = torch.full((1, src.shape[1]), start_symbol, dtype=torch.long, device=DEVICE)
        # Each hypothesis is (token tensor, cumulative log-prob, finished flag).
        beams = [(start_seq, 0.0, False)]

        for _step in range(max_len - 1):
            # Stop as soon as every hypothesis in the beam has produced EOS.
            if all(done for _, _, done in beams):
                break

            candidates = []
            for seq, score, done in beams:
                if done:
                    # Finished hypotheses are carried over unchanged.
                    candidates.append((seq, score, True))
                    continue

                # Otherwise score the next token of this hypothesis.
                causal_mask = generate_square_subsequent_mask(seq.size(0)).to(DEVICE)
                decoded = model.decode(seq, memory, causal_mask)
                last_step_logits = model.unembed(decoded)[-1, 0, :]
                next_log_probs = F.log_softmax(last_step_logits, dim=-1)

                # Extend the hypothesis with each of its K most likely next tokens.
                top_log_probs, top_tokens = torch.topk(next_log_probs, K)
                for token_log_prob, token in zip(top_log_probs.tolist(), top_tokens.tolist()):
                    step = torch.tensor([[token]], dtype=torch.long, device=DEVICE)
                    candidates.append((
                        torch.cat([seq, step], dim=0),
                        score + token_log_prob,
                        token == EOS_IDX,  # the hypothesis is complete once EOS is emitted
                    ))

            # Keep only the K best of the (at most K * K) candidates.
            beams = sorted(candidates, key=normalized_score, reverse=True)[:K]

        # Pick the best hypothesis of the final beam.
        best_seq, _, _ = max(beams, key=normalized_score)
        return best_seq

TRADUCTEUR : Anglais -> Allemand


In [27]:
def detokenize_sentencepiece(tokens):
    """Join SentencePiece pieces back into a plain string.

    The pieces are concatenated, every "▁" marker is replaced by a space, and
    leading/trailing whitespace is stripped.
    """
    joined = "".join(tokens)
    spaced = joined.replace("▁", " ")
    return spaced.strip()


def Beam_translate(model: torch.nn.Module, src_sentence: str):
    """Translate one source-language sentence with beam search.

    Args:
        model: model handed to ``beam_search_decode``.
        src_sentence: raw sentence in the source language.

    Returns:
        The decoded target sentence with the "<bos>" / "<eos>" markers removed.
    """
    model.eval()

    # Encode the sentence as a single-column tensor of token ids.
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    num_tokens = src.shape[0]
    src_mask = torch.zeros(num_tokens, num_tokens, dtype=torch.bool)

    best_hypothesis = beam_search_decode(
        model,
        src,
        src_mask,
        max_len=num_tokens + 5,
        start_symbol=BOS_IDX,
        K=5,
        length_penalty=0.7,
    )
    token_ids = best_hypothesis.flatten().cpu().tolist()

    pieces = vocab_transform[TGT_LANGUAGE].lookup_tokens(token_ids)
    sentence = detokenize_sentencepiece(pieces)
    for marker in ("<bos>", "<eos>"):
        sentence = sentence.replace(marker, "")
    return sentence.strip()


In [28]:
# Example sentences: image-caption style inputs, plus one out-of-domain sentence at the end.
demo_sentences = [
    "The woman is sleeping",
    "The cat is playing with the dog.",
    "Two children are playing in the snow.",
    "A man is riding a bicycle down the street.",
    "Several people are sitting at a table outside.",
    "A dog is running through a field.",
    "A woman is holding a baby in her arms.",
    "Three men are standing near a car.",
    "A child is eating ice cream.",
    "I'm hungry, I want to eat",
]
for demo_sentence in demo_sentences:
    print(Beam_translate(transformer, demo_sentence))

Eine Frau schläft auf dem schläft.
Eine Katze spielt mit dem Hund.
Zwei Kinder spielen im Schnee.
Ein Mann fährt auf einem Fahrrad die Straße entlang.
Mehrere Personen sitzen draußen an einem Tisch.
Ein Hund läuft über ein Feld.
Eine Frau hält ein Baby in den Armen.
Drei Männer stehen in der Nähe eines Autos.
Ein Kind isst ein Eis.
Ich vergnügen sich, um zu essen.


A woman is sleeping on the bed. (mots en plus)

A cat is playing with the dog.

Two children are playing in the snow.

A man is riding a bicycle down the street.

Several people are sitting outside at a table.

A dog is running across a field.

A woman is holding a baby in her arms.

Three men are standing near a car.

A child is eating an ice cream.

They are enjoying themselves to eat. (complètement faux dès que l'on sort du registre « description d'image »)

Conclusion :

Dans l’ensemble, le modèle produit des traductions grammaticalement correctes et sémantiquement cohérentes pour des phrases simples de description. On observe toutefois une forte homogénéité dans les structures et le vocabulaire, qui reflète directement le biais du jeu de données utilisé (phrases descriptives, scènes neutres, langage standardisé, issu de descriptions d’images).

Ce biais limite la diversité lexicale et la capacité de généralisation du modèle à des phrases plus complexes, abstraites ou hors domaine. Les résultats sont donc satisfaisants dans le cadre restreint du dataset, mais ils ne garantissent pas encore des performances robustes en conditions réelles ou sur des textes plus riches et variés.