## Implémentation du modèle de traduction de *Cho et al* __(2014)__
https://arxiv.org/pdf/1406.1078.pdf

In [46]:
import torch
import torch.nn as nn
import torch.functional as F

from torchtext import datasets
from torch.utils.data import DataLoader

from torchsummary import summary

In [47]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

### Hugging Face Tokenizer
On va utiliser un tokenizer BPE avec une séparation à minima sur les espaces.

In [48]:
from tokenizers import Tokenizer
from tokenizers.models import BPE

from tokenizers.trainers import BpeTrainer

from tokenizers.normalizers import NFD
from tokenizers.pre_tokenizers import Whitespace

In [49]:
tokenizer = Tokenizer(BPE(unk_token="[UNK]"))

tokenizer.normalizer = NFD()
tokenizer.pre_tokenizer = Whitespace()

special_tokens = ["[UNK]", "[CLS]", "[PAD]", "[SEP]", "[MASK]"]
trainer =  BpeTrainer(
    vocab=10000,
    special_tokens=special_tokens,
)

Ignored unknown kwargs option vocab


In [50]:
def train_tokenizer(data, tokenizer, trainer, save=False):

    files = ''
    tokenizer.train(files, trainer)
    
    # Saving
    if save:
        tokenizer.save('tokenizers/001.json')
    
    return tokenizer

In [51]:
# Test

data = ['blabla']

tokenizer.train_from_iterator(data, trainer)






In [52]:
output = tokenizer.encode('This is bla')
output.tokens

['[UNK]', '[UNK]', '[UNK]', '[UNK]', '[UNK]', '[UNK]', 'bla']

### Encodeur RNN

In [62]:
class Encoder(nn.Module):

    def __init__(self, emb_dim, hidden_size, num_layers=1, vocab_size=1000) -> None:
        super(Encoder, self).__init__()

        self.num_layers = num_layers
        self.emb_dim = emb_dim
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size

        self.embedding = nn.Embedding(
            num_embeddings=self.vocab_size, 
            embedding_dim=self.emb_dim,
            )

        self.rnn = nn.RNN(
            input_size=self.emb_dim,
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            nonlinearity='tanh', # tanh par défaut, 'relu' possible
            batch_first=True, # Sortie (batch_size, sequence_length, hidden_size)
        )

    def forward(self, input_sequence, hidden):
        # Input,  :   [batch_size, sequence_length], 
        # Hidden  :   [batch_size, sequence_length, hidden_size]
        
        # Embedding :       [batch_size, sequence_length, embedding_dim] (emb_dim = hidden_size ici)
        # Output :          [batch_size, sequence_length, hidden_size]

        embedded = self.embedding(input_sequence)            # Entrée
        output_sequence, hidden = self.rnn(embedded, hidden) # État précédent

        return output_sequence, hidden

    def h_init(self, batch_size):
        return torch.zeros(self.num_layers, batch_size, self.hidden_size, device=device)



#### • Vérifications 

In [63]:
encodeur = Encoder(
    emb_dim=100, 
    hidden_size=50,
    num_layers=2)

# batch_size, sequence_length
emb_input = torch.randint(1, size=(32, 10))   

# num_layer, sequence_length, hidden_size
hidden_input = torch.randint(1, size=(2, 32, 50), dtype=torch.float)

In [64]:
res = encodeur(emb_input, hidden_input) # output, hidden
res[0].shape, res[1].shape

(torch.Size([32, 10, 50]), torch.Size([2, 32, 50]))

### Décodeur

In [65]:
class Decodeur(nn.Module):

    def __init__(self, emb_dim, hidden_size, num_layers=1, vocab_size=1000) -> None:
        super(Decodeur, self).__init__()
        
        self.embedding_dim = emb_dim        # Dimension des y_t
        self.vocab_size = vocab_size        # Dimension des prédictions
        self.num_layers = num_layers
        self.hidden_size = hidden_size


        self.embedding = nn.Embedding(
            num_embeddings=self.vocab_size,
            embedding_dim=self.embedding_dim
        )

        self.rnn = nn.RNN(
            input_size = self.embedding_dim + self.hidden_size, # Concaténation entre "input" et context
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            nonlinearity='tanh',
            batch_first=True,
        )

        self.linear = nn.Linear(
            in_features = 2 * self.hidden_size + self.embedding_dim,
            out_features=self.vocab_size, # Dimension des pred
            bias=False
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_sequence, context):
        
        # pred : batch_size, sequence_length, decoder_vocab_size
        # input : batch_size, sequence_length
    
        seq_length = input_sequence.shape[1]
        emb_input_seq = self.embedding(input_sequence) # batch_size, sequence_length, emb_dim

        # Voir cours RNN papier pour explications sur la reproduction du contexte
        c = context[-1]
        c = torch.unsqueeze(c, dim=1)
        c = torch.cat((c, ) * seq_length, dim=1)

        z_concat = torch.cat((emb_input_seq, c), dim=2) 

        output_seq, hidden =  self.rnn(z_concat, context)
        output_concat = torch.cat((z_concat, output_seq), dim=2)

        y = self.linear(output_concat)
        sequence_preds = self.softmax(y)

        print(f"emb : {emb_input_seq.shape}")
        print(f"c : {c.shape}")
        print(f"z_concat : {z_concat.shape}")
        print(f"hidden : {hidden.shape}")
        print(f"output_concat : {output_concat.shape}")
        print(f"sequence_preds : {sequence_preds.shape}")

        return sequence_preds, hidden

    def h_init(self, batch_size):
        # num_layers, batch_size=1, output_dim
        y_init =  torch.zeros(size=(self.num_layers, batch_size, self.embedding_dim))
        return y_init

In [66]:
decodeur = Decodeur(
    emb_dim=150, 
    hidden_size=60,
    num_layers=2, 
    vocab_size=1000)

# batch_size,  (Prédiction à l'instant t-1)
dec_input_sequence = torch.randint(1000, size=(32, 15)) 

#  num_layers, batch_size, hidden_size
context = torch.randint(1, size=(2, 32, 60), dtype=torch.float) # Fait office de h_init


dec_res = decodeur(dec_input_sequence, context) # context fait office de

emb : torch.Size([32, 15, 150])
c : torch.Size([32, 15, 60])
z_concat : torch.Size([32, 15, 210])
hidden : torch.Size([2, 32, 60])
output_concat : torch.Size([32, 15, 270])
sequence_preds : torch.Size([32, 15, 1000])


### Assemblage Encodeur - Décodeur

In [71]:
class Seq2Seq(nn.Module):

    def __init__(self, encoder, decoder) -> None:
        super(Seq2Seq, self).__init__()

        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source_seq, target_seq):
        
        # source_sequence : batch_size, seq_len, encodeur_vocab_size
        # target_sequence : batch_size, seq_len, decodeur_vocab_size
        batch_size = source_seq.shape[0]
        h_init = self.encoder.h_init(batch_size)

        _, context = self.encoder(source_seq, h_init) # Seul le contexte nous intéresse
        pred_seq, _ = self.decoder(target_seq, context) # Seul les résultats de la dernière couche nous intéressent

        preds = torch.argmax(pred_seq, dim=2) # On cherche le max sur la dernière dim
        print(f"preds : {preds.shape}")

        return preds



In [72]:
config = {
    "encoder_embedding_dim": 150,
    "encoder_vocab_size": 15000,
    "encoder_num_layers": 2,
    "decoder_embedding_dim": 100,
    "decoder_vocab_size": 10000,
    "decoder_num_layers": 2,
    "hidden_size": 50,
}

encodeur = Encoder(
    emb_dim=config["encoder_embedding_dim"],
    hidden_size=config["hidden_size"],
    num_layers=config["encoder_num_layers"],
    vocab_size=config["encoder_vocab_size"]
)

decodeur = Decodeur(
    emb_dim=config["decoder_embedding_dim"],
    hidden_size=config["hidden_size"],
    num_layers=config["decoder_num_layers"],
    vocab_size=config["decoder_vocab_size"]
)

In [73]:
seq2seq = Seq2Seq(encodeur, decodeur)

In [75]:
source_seq = torch.randint(15000, size=(32, 10))
target_seq = torch.randint(10000, size=(32, 9))

res = seq2seq(source_seq, target_seq)
res.shape

emb : torch.Size([32, 9, 100])
c : torch.Size([32, 9, 50])
z_concat : torch.Size([32, 9, 150])
hidden : torch.Size([2, 32, 50])
output_concat : torch.Size([32, 9, 200])
sequence_preds : torch.Size([32, 9, 10000])
preds : torch.Size([32, 9])


torch.Size([32, 9])

In [78]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(seq2seq):,} trainable parameters')

The model has 5,280,400 trainable parameters
