In [None]:
# Install the third-party packages imported below that may be missing from the kernel's
# environment (`lime` provides `lime_text`, `scikit-plot` provides `scikitplot`).
%pip install lime
%pip install scikit-plot

In [None]:
import utils_data as utils

import torch
import torch.nn as nn 
import torch.nn.functional as F
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.functional import to_map_style_dataset
from torch.utils.data import DataLoader
from torch.utils.data import WeightedRandomSampler
from torch.optim import Adam

from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import scikitplot as skplt
import matplotlib.pyplot as plt
from lime import lime_text
from collections import Counter
import numpy as np
import gc

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

### Importation du dataset

In [None]:
# Load the content of the different datasets: pre-split train / validation texts and labels,
# plus the test texts.
text_train, text_val = utils.get_data_split()
label_train, label_val = utils.get_labels_split()
text_test = utils.get_test()

# Merge texts and labels into a list of (text, label) tuples.
# NOTE: `merged_train` and `merged_val` are reassigned by the randomized split in the next cell.
merged_train = utils.merge_data_labels(text_train, label_train)
merged_val = utils.merge_data_labels(text_val, label_val)

In [None]:
# Randomized train / validation split (80% / 20%) over the full labelled dataset.
text_all = utils.get_data()
labels_all = utils.get_labels()
limit = int(len(labels_all) * 0.8) + 1  # index of the first validation example

merged_all = utils.merge_data_labels(text_all, labels_all)

# Shuffle in place with a fixed seed so that "Restart & Run All" reproduces the same split
# (an unseeded shuffle yields a different train / validation partition on every run).
split_seed = 42
np.random.default_rng(split_seed).shuffle(merged_all)

# Everything before `limit` is used for training, the rest for validation.
merged_train, merged_val = merged_all[:limit], merged_all[limit:]

Note importante : toute la suite est inspirée de : https://coderzcolumn.com/tutorials/artificial-intelligence/pytorch-rnn-for-text-classification-tasks, ainsi que du cours IFT 6135-A2022.

### Création de tokens à partir des phrases

In [None]:
# The vocabulary is built from the words of the dataset; texts are first split into tokens
# with torchtext's "basic_english" tokenizer.
tokenizer = get_tokenizer("basic_english")

def build_vocabulary(datasets):
    """Lazily yield the token list of every text found in `datasets`.

    Args:
        datasets: iterable of datasets, each one an iterable of raw text strings.

    Yields:
        The result of `tokenizer(text)` for each text, dataset after dataset.
    """
    yield from (tokenizer(text) for dataset in datasets for text in dataset)

# "<PAD>" is registered first so that it receives index 0, which is the value the batch
# vectorizers use to pad short texts. Without it, index 0 was "<UNK>", so padding and
# out-of-vocabulary words shared the same embedding.
vocab = build_vocab_from_iterator(build_vocabulary([text_all]), min_freq=1, specials=["<PAD>", "<UNK>"])
# Any token missing from the vocabulary maps to "<UNK>".
vocab.set_default_index(vocab["<UNK>"])

### Hyperparamètres

In [None]:
# Class names; the position in the list is used as the integer label.
target_classes = ["negative", "neutral", "positive"]
# Every text is padded / truncated to this many tokens.
max_words = 75
# Embedding dimension (also the GRU input size).
embed_len = 75
# GRU hidden-state size (also the input size of the final linear layer).
hidden_dim = 75
# Number of stacked GRU layers.
n_layers = 5
# Dropout probability applied to the embeddings.
p_dropout = 0.3
# Number of training epochs.
epochs = 15
# Learning rate of the Adam optimizer.
learning_rate = 0.001

### Préparation des données

In [None]:
# Count the examples per class in the selected training data, then derive per-example
# sampling weights so that the sampler draws every class with the same overall probability.
# Labels are read directly from the (text, label) pairs: converting the whole list to a NumPy
# array would allocate a fixed-width string array holding every text.
label_train = np.asarray([int(label) for _, label in merged_train], dtype=int)
counter = Counter(label_train.tolist())

# Index the weights by label value (0 .. max label) rather than by the number of distinct
# labels, and give an absent class a weight of 0 instead of dividing by zero.
n_classes = int(label_train.max()) + 1
class_weights = [len(label_train) / counter[c] if counter[c] else 0.0 for c in range(n_classes)]
weights = [class_weights[label] for label in label_train]
sampler = WeightedRandomSampler(torch.DoubleTensor(weights), len(label_train))

In [None]:
# Wrap the raw Python data into map-style datasets that PyTorch DataLoaders can consume.
data_train = to_map_style_dataset(merged_train)
data_val = to_map_style_dataset(merged_val)
data_test = to_map_style_dataset(text_test)

def pad_or_truncate(tokens, length, pad_value=0):
    """Return a list of exactly `length` items built from `tokens`.

    Shorter inputs are right-padded with `pad_value`; longer inputs are cut after
    `length` items. The input list is not modified.
    """
    if len(tokens) < length:
        return tokens + [pad_value] * (length - len(tokens))
    return tokens[:length]


def encode_texts(texts):
    """Tokenize and index every text, then bring all of them to `max_words` tokens.

    Returns:
        An int32 tensor with one row of `max_words` token indices per text.
    """
    token_ids = [pad_or_truncate(vocab(tokenizer(text)), max_words) for text in texts]
    return torch.tensor(token_ids, dtype=torch.int32)


# Collate function turning a batch of (text, label) pairs into numeric tensors.
def vectorize_batch(batch):
    """Collate labelled examples.

    Args:
        batch: sequence of (text, label) pairs.

    Returns:
        (X, Y): int32 token-index tensor and int64 label tensor.
    """
    X, Y = zip(*batch)
    return encode_texts(X), torch.tensor(Y, dtype=torch.long)


# Collate function for unlabelled data (e.g. the test set): the batch holds texts only.
def vectorize_test_batch(batch):
    """Collate unlabelled examples.

    Args:
        batch: iterable of raw texts.

    Returns:
        An int32 token-index tensor.
    """
    return encode_texts(batch)

# Build the DataLoaders fed to the PyTorch model. The training loader draws its examples
# through the weighted sampler; validation and test loaders iterate in order.
loader_batch_size = 1024
train_loader = DataLoader(data_train, batch_size=loader_batch_size, sampler=sampler, collate_fn=vectorize_batch)
val_loader = DataLoader(data_val, batch_size=loader_batch_size, collate_fn=vectorize_batch)
test_loader = DataLoader(data_test, batch_size=loader_batch_size, collate_fn=vectorize_test_batch)

### Définition du RNN (GRU)

In [None]:
# Architecture of the GRU classifier
class RNNClassifier(nn.Module):
    """Embedding -> dropout -> multi-layer GRU -> linear layer -> softmax.

    Every constructor argument is optional. When omitted it falls back to the
    notebook-level setting of the same role, so `RNNClassifier()` builds exactly the
    same network as before.

    Args:
        vocab_size: number of rows of the embedding table (default: `len(vocab)`).
        embedding_dim: embedding size and GRU input size (default: `embed_len`).
        hidden_size: GRU hidden-state size (default: `hidden_dim`).
        num_layers: number of stacked GRU layers (default: `n_layers`).
        num_classes: number of output classes (default: `len(target_classes)`).
        dropout: dropout probability applied to the embeddings (default: `p_dropout`).
    """

    def __init__(self, vocab_size=None, embedding_dim=None, hidden_size=None,
                 num_layers=None, num_classes=None, dropout=None):
        super().__init__()
        vocab_size = len(vocab) if vocab_size is None else vocab_size
        embedding_dim = embed_len if embedding_dim is None else embedding_dim
        hidden_size = hidden_dim if hidden_size is None else hidden_size
        num_layers = n_layers if num_layers is None else num_layers
        num_classes = len(target_classes) if num_classes is None else num_classes
        dropout = p_dropout if dropout is None else dropout

        self.embedding_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.dropout = nn.Dropout(dropout)
        self.rnn = nn.GRU(input_size=embedding_dim, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, num_classes)

    def forward(self, X_batch):
        """Return class probabilities for a batch of token-index sequences.

        Args:
            X_batch: integer tensor of token indices, one sequence per row
                (`batch_first=True`), located on the same device as the model.

        Returns:
            Tensor with one row per sequence and one softmax probability per class.
        """
        # The embeddings are produced on the module's own device, so no explicit
        # transfer to the global `device` is needed here.
        embeddings = self.embedding_layer(X_batch)
        output, _ = self.rnn(self.dropout(embeddings))
        # Classify from the GRU output at the last time step.
        return F.softmax(self.linear(output[:, -1]), dim=-1)

### Entraînement du RNN

In [None]:
# Evaluation function of the RNN
def evaluate(model, loss_fn, val_loader):
    """Compute and print the loss and accuracy of `model` on `val_loader`.

    The model is switched to evaluation mode (and left in it); gradients are disabled.

    Args:
        model: network called as `model(X)`.
        loss_fn: callable `loss_fn(preds, Y)` returning a scalar tensor.
        val_loader: iterable of `(X, Y)` batches.

    Returns:
        (loss, accuracy): the mean of the per-batch losses as a 0-d tensor, and the
        accuracy computed by `accuracy_score` over all examples.
    """
    model.eval()
    Y_true, Y_preds, losses = [], [], []
    with torch.no_grad():
        for X, Y in val_loader:
            Y_true.append(Y)
            X, Y = X.to(device), Y.to(device)
            preds = model(X)
            losses.append(loss_fn(preds, Y).item())
            # Move predictions off the accelerator right away instead of keeping them all there.
            Y_preds.append(preds.argmax(dim=-1).cpu())

    # Metrics are computed once and reused for both the printout and the return value.
    val_loss = torch.tensor(losses).mean()
    val_acc = accuracy_score(torch.cat(Y_true).cpu().numpy(), torch.cat(Y_preds).numpy())

    print("Valid Loss : {:.3f} | Valid Acc : {:.3f}".format(val_loss, val_acc))

    return val_loss, val_acc
# Training function of the RNN
def train(model, loss_fn, optimizer, train_loader, val_loader, epochs=10):
    """Train `model` for `epochs` epochs, evaluating on `val_loader` after each one.

    Training mode is enabled at the start of every epoch, so the first epoch does not
    depend on the mode the model was left in. Because `evaluate` runs last, the model
    is in evaluation mode when this function returns, ready for inference.

    Args:
        model: network called as `model(X)`.
        loss_fn: callable `loss_fn(preds, Y)` returning a scalar tensor.
        optimizer: optimizer over the model's parameters.
        train_loader: iterable of `(X, Y)` training batches.
        val_loader: iterable of `(X, Y)` validation batches.
        epochs: number of passes over `train_loader`.

    Returns:
        (train_losses, val_losses, train_accs, val_accs): one entry per epoch.
    """
    train_losses, val_losses, train_accs, val_accs = [], [], [], []
    for epoch in range(1, epochs + 1):
        model.train()
        Y_true, Y_preds, losses = [], [], []
        for X, Y in tqdm(train_loader):
            Y_true.append(Y)
            X, Y = X.to(device), Y.to(device)
            optimizer.zero_grad()
            preds = model(X)
            loss = loss_fn(preds, Y)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
            # Keep only the predicted class indices, off the accelerator.
            Y_preds.append(preds.argmax(dim=-1).cpu())

        # Epoch metrics are computed once and reused for the printout and the history.
        epoch_loss = torch.tensor(losses).mean()
        epoch_acc = accuracy_score(torch.cat(Y_true).cpu().numpy(), torch.cat(Y_preds).numpy())

        print("Epoch {} | Train Loss : {:.3f} | Train acc : {:.3f}".format(epoch, epoch_loss, epoch_acc))
        train_losses.append(epoch_loss)
        train_accs.append(epoch_acc)

        val_loss, val_acc = evaluate(model, loss_fn, val_loader)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

    return train_losses, val_losses, train_accs, val_accs

In [None]:
# Declare the model, the loss function and the optimizer, then launch the training.
#
# RNNClassifier.forward already returns softmax probabilities, whereas nn.CrossEntropyLoss
# expects raw logits and applies log-softmax itself. Feeding it probabilities normalizes the
# outputs twice and flattens the gradients, so the loss used here is the negative
# log-likelihood of the predicted probabilities.
def probability_nll_loss(probs, targets):
    """Negative log-likelihood of `targets` under the class probabilities `probs`."""
    # Clamp away exact zeros so the logarithm stays finite.
    return F.nll_loss(torch.log(probs.clamp_min(1e-12)), targets)

loss_fn = probability_nll_loss
rnn_classifier = RNNClassifier().to(device)
optimizer = Adam(rnn_classifier.parameters(), lr=learning_rate)

train_loss, val_loss, train_acc, val_acc = train(rnn_classifier, loss_fn, optimizer, train_loader, val_loader, epochs)

### Prédictions sur l'ensemble de test

In [None]:
# Prediction function for an unlabelled set
def predict(model, loader):
    """Return the predicted class index of every example served by `loader`.

    Inference runs in evaluation mode (dropout disabled) and without gradient
    tracking; the model's previous training / evaluation mode is restored afterwards.

    Args:
        model: network called as `model(X)`, returning one score per class.
        loader: iterable of input batches `X` (no labels).

    Returns:
        1-D NumPy array of predicted class indices (empty if `loader` yields nothing).
    """
    was_training = model.training
    model.eval()
    batch_preds = []
    try:
        with torch.no_grad():
            for X in tqdm(loader):
                batch_preds.append(model(X.to(device)).cpu())
    finally:
        # Hand the model back in the mode the caller left it in.
        model.train(was_training)

    if not batch_preds:
        return np.empty(0, dtype=np.int64)

    return torch.cat(batch_preds).argmax(dim=-1).numpy()

# Predicted class index for every example of the test set.
Y_preds = predict(rnn_classifier, test_loader)

### Enregistrement des résultats

In [None]:
# Save the test-set predictions (per the original note, into the "data" folder).
utils.save_results(Y_preds, "RNN")

### Explicabilité : Matrice de confusion et LIME

In [None]:
# Predict on the validation set the model was actually validated on.
# Texts and labels are taken from `merged_val` (the randomized split) rather than from
# `text_val` / `label_val`, which belong to the earlier fixed split and therefore overlap
# with the examples used for training.
val_texts = [text for text, _ in merged_val]
val_loader_test = DataLoader(val_texts, batch_size=1024, collate_fn=vectorize_test_batch)
Y_preds_val = predict(rnn_classifier, val_loader_test)
Y_actual_val = np.asarray([int(label) for _, label in merged_val], dtype=int)

In [None]:
# Display the normalized confusion matrix, using class names instead of integer labels.
actual_names = [target_classes[label] for label in Y_actual_val]
predicted_names = [target_classes[label] for label in Y_preds_val]
skplt.metrics.plot_confusion_matrix(
    actual_names,
    predicted_names,
    normalize=True,
    title="Confusion Matrix",
    cmap="Purples",
    hide_zeros=False,
    figsize=(5, 5),
);
plt.xticks(rotation=90);

In [None]:
# Gather the validation texts and labels used for the LIME explanation.
X_test_text = [text for text, _ in merged_val]
Y_test = [label for _, label in merged_val]

explainer = lime_text.LimeTextExplainer(class_names=target_classes, verbose=True)

# Prediction function handed to LIME
def make_predictions_lime(X_batch_text):
    """Return the class probabilities of `rnn_classifier` for a batch of raw texts.

    The texts go through the same encoding as the test loader (`vectorize_test_batch`).
    The model is put in evaluation mode so that dropout does not add noise to the
    probabilities LIME fits its explanation on.

    Args:
        X_batch_text: iterable of raw text strings.

    Returns:
        NumPy array with one row per text and one probability per class.
    """
    rnn_classifier.eval()
    with torch.no_grad():
        preds = rnn_classifier(vectorize_test_batch(X_batch_text).to(device))
    return preds.cpu().numpy()

# Pick a random validation example and compute the model's prediction for it.
# `randint` returns a scalar directly; calling int() on a 1-element array is deprecated in NumPy.
idx = int(np.random.randint(len(Y_test)))
rnn_classifier.eval()  # disable dropout for inference
with torch.no_grad():
    # Same encoding as the test loader; the slice keeps a batch dimension of 1.
    preds = rnn_classifier(vectorize_test_batch(X_test_text[idx:idx+1]).to(device))

# Ask LIME to explain the prediction for the selected example, with respect to its true class.
true_label = Y_test[idx]
explanation = explainer.explain_instance(
    X_test_text[idx],
    classifier_fn=make_predictions_lime,
    labels=[true_label],
)
explanation.show_in_notebook()

# Compare the predicted class with the actual one.
print("Prediction : ", target_classes[preds.argmax()])
print("Actual :     ", target_classes[true_label])