In [1]:
import torch
import torchvision
import matplotlib.pyplot as plt
from torch import nn
import math

# Création de la classe Transformers

In [3]:
def check_for_nans(tensor, tensor_name):
    if torch.isnan(tensor).any():
        print(f"NaN detected in {tensor_name}")

Création des différentes couches

In [76]:
class FeedForwardNetwork(nn.Module):
    
    def __init__(self, d_model, d_ff):
        super(FeedForwardNetwork, self).__init__()
        
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

In [78]:
# Optionel : Coder notre propre couche multi-attention head (pour ne pas utiliser celle fournit par torch.nn)

In [80]:
def create_mask(x):
    len = x.size(0)
    mask = torch.triu(torch.ones(len,len), diagonal = 1) * (-1e9) # Matrice triangulaire supérieur de valeur -inf
    return mask

In [82]:
class PositionalEncoding(nn.Module):

    def __init__(self, max_length, d_model):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_length, d_model)
        position = torch.arange(0, max_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]
        

In [84]:
class Encoder(nn.Module):

    def __init__(self, d_model, n_heads, d_ff):
        super(Encoder, self).__init__()

        self.attention = torch.nn.MultiheadAttention(d_model, n_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.norm2 = nn.LayerNorm(d_model)
        

    def forward(self, x):
        attention_output, wei = self.attention(x, x, x)
        x = self.norm1(x + attention_output)
        ffn_output = self.ffn(x)
        x = self.norm2(x + ffn_output)
        return x



In [86]:
class Decoder(nn.Module):

    def __init__(self, d_model, n_heads, d_ff):
        super(Decoder, self).__init__()

        self.attention1 = torch.nn.MultiheadAttention(d_model, n_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.attention2 = torch.nn.MultiheadAttention(d_model, n_heads)
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x, enc_output, mask):
        attention_output1, wei = self.attention1(x, x, x,attn_mask = mask)
        x = self.norm1(x + attention_output1) 
        attention_output2, wei = self.attention2(x, enc_output, enc_output)
        x = self.norm2(x + attention_output2)
        ffn_output = self.ffn(x)
        x = self.norm3(x + ffn_output)
        return x



In [88]:
class Transformer(nn.Module):

    def __init__(self, vocab_size, target_size, max_length, d_model, num_heads, d_ff, n_layers):
        super(Transformer, self).__init__()
        
        self.enc_embedding = nn.Embedding(vocab_size, d_model)
        self.dec_embedding = nn.Embedding(target_size, d_model)
        self.positional_encoding = PositionalEncoding(max_length, d_model)
        
        self.encoder_layers = [Encoder(d_model, num_heads,d_ff) for i in range(n_layers)]
        self.decoder_layers = [Decoder(d_model, num_heads, d_ff) for i in range(n_layers)]

        self.linear = nn.Linear(d_model, target_size)

    def check_for_nans(self):
        for name, param in self.named_parameters():
            if torch.isnan(param).any():
                print(f"NaN detected in parameter: {name}")

    def forward(self,inp,out):

        check_for_nans(inp,"inp")
        check_for_nans(out, "out")
        
        mask = create_mask(out)

        check_for_nans(mask, "mask")
        
        out_embedded = self.positional_encoding(self.dec_embedding(out))
        inp_embedded = self.positional_encoding(self.enc_embedding(inp))

        check_for_nans(out_embedded, "out_embedded")
        check_for_nans(inp_embedded, "inp_embedded")
        
        enc_output = inp_embedded
        for encoder in self.encoder_layers:
            enc_output = encoder(enc_output)

        check_for_nans(enc_output, "enc_output")
        
        dec_output = out_embedded
        for decoder in self.decoder_layers:
            dec_output = decoder(dec_output, enc_output, mask)

        check_for_nans(dec_output, "dec_output")
        
        output = self.linear(dec_output)

        check_for_nans(output, "output")
        return output   

# Mise en forme des données

In [158]:
import nltk
import csv
from nltk.tokenize import word_tokenize
from nltk.stem.porter import PorterStemmer
import string
punctuation = string.punctuation

In [160]:
x = []
y = []
with open("eng_-french.csv",encoding = "utf-8") as file:
    reader = csv.reader(file)
    for row in reader:
        y.append(row[0])
        x.append(row[-1])
    x.pop(0)
    y.pop(0)

In [162]:
x_red = x[:10000]
y_red = y[:10000]

In [164]:
from sklearn.model_selection import train_test_split

In [166]:
x_train, x_test, y_train, y_test = train_test_split(x_red, y_red, test_size = 0.2)

In [168]:
x_train = [word_tokenize(word) for word in x_train]
y_train = [word_tokenize(word) for word in y_train]
x_test = [word_tokenize(word) for word in x_test]
y_test = [word_tokenize(word) for word in y_test]

In [169]:
for i in range(len(x_train)):
    for j in range(len(x_train[i])-1,-1,-1):
        if x_train[i][j] in punctuation:
            x_train[i].pop(j)

for i in range(len(y_train)):
    for j in range(len(y_train[i])-1,-1,-1):
        if y_train[i][j] in punctuation:
            y_train[i].pop(j)

for i in range(len(x_test)):
    for j in range(len(x_test[i])-1,-1,-1):
        if x_test[i][j] in punctuation:
            x_test[i].pop(j)

for i in range(len(y_test)):
    for j in range(len(y_test[i])-1,-1,-1):
        if y_test[i][j] in punctuation:
            y_test[i].pop(j)


In [172]:
stemmer = PorterStemmer()
for i in range(len(y_train)):
    y_train[i] = [stemmer.stem(word) for word in y_train[i]]

for i in range(len(x_train)):
    x_train[i] = [stemmer.stem(word) for word in x_train[i]]

for i in range(len(y_test)):
    y_test[i] = [stemmer.stem(word) for word in y_test[i]]

for i in range(len(x_test)):
    x_test[i] = [stemmer.stem(word) for word in x_test[i]]


In [173]:
vocab = []
target_vocab = []
for sentence in x_train:
    for word in sentence:
        if word not in vocab:
            vocab.append(word)

for sentence in y_train:
    for word in sentence:
        if word not in target_vocab:
            target_vocab.append(word)

print(len(vocab))
print(len(target_vocab))

3391
1587


In [176]:
from collections import Counter

In [178]:
cnt = Counter()
for sentence in x_train:
    for word in sentence:
        cnt[word] += 1

li = cnt.most_common(len(vocab))
vocab = {}
for i in range(len(li)):
    word, n = li[i]
    vocab[word] = i + 4

In [180]:
cnt = Counter()
for sentence in y_train:
    for word in sentence:
        cnt[word] += 1

li = cnt.most_common(len(target_vocab))
target_vocab = {}
for i in range(len(li)):
    word, n = li[i]
    target_vocab[word] = i + 4

In [182]:
for sentence in x_train:
    for i in range(len(sentence) - 1, -1, -1):
        if sentence[i] not in vocab:
            sentence.pop(i)
        else:
            sentence[i] = vocab[sentence[i]]

for sentence in x_test:
    for i in range(len(sentence) - 1, -1, -1):
        if sentence[i] not in vocab:
            sentence.pop(i)
        else:
            sentence[i] = vocab[sentence[i]]

In [184]:
for sentence in y_train:
    for i in range(len(sentence) - 1, -1, -1):
        if sentence[i] not in target_vocab:
            sentence.pop(i)
        else:
            sentence[i] = target_vocab[sentence[i]]

for sentence in y_test:
    for i in range(len(sentence) - 1, -1, -1):
        if sentence[i] not in target_vocab:
            sentence.pop(i)
        else:
            sentence[i] = target_vocab[sentence[i]]

In [186]:
def make_vector(li, max_length):
    if len(li) > max_length:
        return [1] + li[:max_length] + [2]
    else:
        return [1] + li + [3 for i in range(max_length - len(li))] + [2]

In [188]:
for i in range(len(x_train)):
    x_train[i] = make_vector(x_train[i], 40)
for i in range(len(y_train)):
    y_train[i] = make_vector(y_train[i], 40)

for i in range(len(x_test)):
    x_test[i] = make_vector(x_test[i], 40)
for i in range(len(y_test)):
    y_test[i] = make_vector(y_test[i], 40)

In [190]:
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset
from torch import IntTensor

In [192]:
# Création des Dataloader
trainset = TensorDataset(IntTensor(x_train), IntTensor(y_train))
train_dataloader = DataLoader(trainset, batch_size=32, shuffle=True, drop_last=True)

testset = TensorDataset(IntTensor(x_test), IntTensor(y_test))
test_dataloader = DataLoader(testset, batch_size=1, shuffle=True, drop_last=True)

# Entrainement du transformer

In [262]:
from torch.optim import Adam

In [264]:
tf1 = Transformer(len(vocab) + 4,len(target_vocab) + 4, 42, 512, 8, 1024, 6)

In [266]:
adam = Adam(tf1.parameters(),lr=1e-5)
loss_fn = nn.CrossEntropyLoss(reduction='mean')

In [268]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        batch_size = len(y)
        pred = model(X, y)
        pred = pred.view(-1, pred.size(-1))  # [batch_size * seq_len, num_classes]
        y = y.view(-1)  # [batch_size * seq_len]
        y = y.long()
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * batch_size + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [270]:
nb_epoch = 3
for i in range(nb_epoch):
    train_loop(train_dataloader, tf1,loss_fn, adam)

loss: 8.246761  [   32/ 8000]
loss: 7.561241  [ 3232/ 8000]
loss: 6.849017  [ 6432/ 8000]
loss: 6.491234  [   32/ 8000]
loss: 5.809368  [ 3232/ 8000]
loss: 5.106065  [ 6432/ 8000]
loss: 4.766796  [   32/ 8000]
loss: 4.098043  [ 3232/ 8000]
loss: 3.438899  [ 6432/ 8000]


# Prédictions

In [286]:
def greedy_decode(model, src_tensor, max_len=10, start_token=1, end_token=2):
    output = []
    # Initial decoder token (start token for each batch)
    batch_size = src_tensor.shape[0]  # Get batch size from the source tensor (second dimension)
    dec_input = torch.full((1, batch_size), start_token, dtype=torch.long)  # Shape: (1, batch_size)

    # Transpose the src_tensor to match (seq_len, batch_size, embedding_size)
    src_tensor = src_tensor.transpose(0, 1)  # Transpose to (batch_size, seq_len, embedding_size)
    
    for i in range(max_len):
        # Get decoder output from the model (shape: batch_size, seq_len, vocab_size)
        dec_output = model(src_tensor, dec_input)  # Assuming model works with the new input shape

        # Select the most likely token (argmax over vocab_size)
        next_token = dec_output[-1, :, :].argmax(dim=-1)  # Shape: (batch_size,)
        
        # Append the next token for each sequence in the batch
        output.extend(next_token.tolist())  # This will append all tokens for the current batch
        
        # Convert next_token to shape (batch_size, 1) for concatenation
        next_token = next_token.unsqueeze(0)  # Shape: (1, batch_size)
        next_token = next_token.transpose(0,1)
        
        # Concatenate next_token along the sequence dimension (dim=1)
        dec_input = torch.cat([dec_input, next_token], dim=0)  # Shape: (seq_len+1, batch_size)


        # Stop if all sequences generate the end token
        if (next_token == end_token).all():  # If all sequences generate end_token
            break

    return output


In [288]:
def decode(x):
    for i in range(len(x)):
        if x[i] <= 3:
            x[i] = ""
        else:    
            for keys in target_vocab.keys():
                if x[i] == target_vocab[keys]:
                    x[i] = keys
    sentence = ""
    for i in range(len(x)):
        sentence += x[i] + " "
    return sentence

In [290]:
from nltk.translate.bleu_score import sentence_bleu

In [292]:
def evaluate_model(model, dataloader):
    model.eval()
    total_bleu = 0
    for  batch, (X, y) in enumerate(dataloader): 
        predicted_tokens = greedy_decode(model, X)
        predicted_translation = decode(predicted_tokens)
        
        target = decode(y.tolist()[0])
        bleu_score = sentence_bleu(target,predicted_translation )
        total_bleu += bleu_score

    avg_bleu = total_bleu / len(dataloader)
    print(f"Average BLEU Score: {avg_bleu}")

In [258]:
evaluate_model(tf1, test_dataloader)

KeyboardInterrupt: 

In [316]:
def translate(x,model = tf1):
    x = word_tokenize(x)
    for i in range(len(x) - 1, -1, -1):
        if x[i] in punctuation:
            x.pop(i)
    x = [stemmer.stem(word) for word in x]
    for i in range(len(x) - 1, -1, -1):
        if x[i] not in vocab:
            x.pop(i)
        else:
            x[i] = vocab[x[i]]
    x = make_vector(x, 40)
    x = [x]
    xset = TensorDataset(IntTensor(x), IntTensor(x))
    x_dataloader = DataLoader(xset, batch_size=1, shuffle=True, drop_last=True)
    
    for  batch, (X, y) in enumerate(x_dataloader):
        predicted_tokens = greedy_decode(model, X)
        print(predicted_tokens)
        print(batch)
        x = decode(predicted_tokens)   
    return x
    

In [320]:
translate("Etre ou ne pas etre")

[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
0


'          '