In [2]:
import re
import numpy as np
import torch as th
import torch.autograd as ag
import torch.nn.functional as F
import torch.nn as nn
import random
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence


# Deep Learning for NLP - lab exercise 1

In this first lab exercise we will implement a simple bag-of-word classifier, i.e. a classifier that ignores the sequential structure of the sentence, and a classifier based on a convolutional neural network (CNN). The goal is to predict if a sentence is a positive or negative review of a movie. We will use a dataset constructed from IMDB.

1. Load and clean the data
2. Preprocess the data for the NN
3. Module definition
4. Train the network!   

We will implement this model with Pytorch, the most popular deep learning framework for Natural Language Processing. You can use the following links for help:

- turorials: http://pytorch.org/tutorials/   
- documentation: http://pytorch.org/docs/master/

## Data

The data can be download here: http://caio-corro.fr/dl4nlp/imdb.zip   

There are two files: one with positive reviews (imdb.pos) and one with negative reviews (imdb.neg). Each file contains 300000 reviews, one per line.   

The following functions can be used to load and clean the data.

In [3]:
# Tokenize a sentence
def clean_str(string, tolower=True):
    """
    Tokenization/string cleaning.
    Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
    string = re.sub(r"\'s", " \'s", string)
    string = re.sub(r"\'ve", " \'ve", string)
    string = re.sub(r"n\'t", " n\'t", string)
    string = re.sub(r"\'re", " \'re", string)
    string = re.sub(r"\'d", " \'d", string)
    string = re.sub(r"\'ll", " \'ll", string)
    string = re.sub(r",", " , ", string)
    string = re.sub(r"!", " ! ", string)
    string = re.sub(r"\(", " \( ", string)
    string = re.sub(r"\)", " \) ", string)
    string = re.sub(r"\?", " \? ", string)
    string = re.sub(r"\s{2,}", " ", string)
    if tolower:
        string = string.lower()
    return string.strip()


# reads the content of the file passed as an argument.
# if limit > 0, this function will return only the first "limit" sentences in the file.
def loadTexts(filename, limit=-1):
    dataset=[]
    with open(filename) as f:
        line = f.readline()
        cpt=1
        skip=0
        while line :
            cleanline = clean_str(f.readline()).split()
            if cleanline:
                dataset.append(cleanline)
            else:
                line = f.readline()
                skip+=1
                continue
            if limit > 0 and cpt >= limit:
                break
            line = f.readline()
            cpt+=1

        print("Load ", cpt, " lines from ", filename , " / ", skip ," lines discarded")
    return dataset

The following cell load the first 5000 sentences in each review set.

In [4]:
LIM = 5000
txtfile = './imdb.pos'  # path of the file containing positive reviews
postxt = loadTexts(txtfile,limit=LIM)

txtfile = './imdb.neg' # path of the file containing negative reviews
negtxt = loadTexts(txtfile,limit=LIM)

Load  5000  lines from  ./imdb.pos  /  1  lines discarded
Load  5000  lines from  ./imdb.neg  /  1  lines discarded


Split the data between train / dev / test, for example by creating lists txt_train, label_train, txt_dev, ... You should take care to keep a 50/50 ratio between positive and negative instances in each set.

In [5]:
# 70% des data pour train, 15% pour dev et test
txt_train = postxt[:int(LIM*0.7)] + negtxt[:int(LIM*0.7)]
txt_dev = postxt[int(LIM*0.7):int(LIM*0.85)] + negtxt[int(LIM*0.7):int(LIM*0.85)]
txt_test = postxt[int(LIM*0.85):] + negtxt[int(LIM*0.85):]

In [None]:
'''
def replace_with_unk(sentence, unk_prob=0.025):
    return [word if random.random() > unk_prob else '<unk>' for word in sentence]
#fonction pour placer de maniere aleatoire des "unk"
train_data_with_unk = [replace_with_unk(sentence) for sentence in txt_train]
'''

In [6]:
# on fait les labels
def create_labels(texts, label):
    return [label] * len(texts)

labels_train = create_labels(postxt[:int(LIM*0.7)], 1) + create_labels(negtxt[:int(LIM*0.7)], 0)
labels_dev = create_labels(postxt[int(LIM*0.7):int(LIM*0.85)], 1) + create_labels(negtxt[int(LIM*0.7):int(LIM*0.85)], 0)
labels_test = create_labels(postxt[int(LIM*0.85):], 1) + create_labels(negtxt[int(LIM*0.85):], 0)


# Converting data to Pytorch tensors

We will first convert data to Pytorch tensors so they can be used in a neural network. To do that, you must first create a dictionnary that will map words to integers. Add to the dictionnary only words that are in the training set (be sure to understand why we do that!).

Then, you can convert the data to tensors:

- use tensors of longs: both the sentence and the label will be represented as integers, not floats!
- these tensors do not require a gradient

A tensor representing a sentence is composed of the integer representation of each word, e.g. [10, 256, 3, 4]. Note that some words in the dev and test sets may not be in the dictionnary! (i.e. unknown words) You can just skip them, even if this is a bad idea in general.

In [7]:
#on construit un dictionnaire de mot a partir de l'ensemble de train

word_dict = {}
for sentence in txt_train:
    for word in sentence:
        if word not in word_dict:
            word_dict[word] = len(word_dict) + 1

pad_token = '<pad>'
word_dict[pad_token] = len(word_dict)
unk_token = '<unk>'
word_dict[unk_token] = len(word_dict)

In [8]:
#convertion data tensor
def sentence_to_tensor(sentence, word_dict):
    return th.LongTensor([word_dict.get(word, word_dict.get('<unk>', 0)) for word in sentence])
#la fonction get permet de mapper les mots de la sentence en fonction du dictionnaire, si le mot n'est pas trouvé dans le dictionnaire on met "unk"

In [9]:
def collate_fn(batch, word_dict, pad_token='<pad>'):

    # Récupérer l'indice du token de padding
    pad_idx = word_dict.get(pad_token, None)

    # Séparer les textes et les étiquettes dans le batch
    text_tensors = [sentence_tensor for sentence_tensor, _ in batch]
    labels = [label for _, label in batch]

    # Ajouter du padding aux textes pour les rendre de taille égale
    padded_texts = pad_sequence(text_tensors, batch_first=True, padding_value=pad_idx)

    # Convertir les labels en tenseur
    labels_tensor = th.tensor(labels, dtype=th.float if isinstance(labels[0], float) else th.long)

    return padded_texts, labels_tensor


In [10]:
#on convertit en tensor en ajoutant les labels
batch_size = 64
train_tensors = [(sentence_to_tensor(sentence, word_dict), label) for sentence, label in zip(txt_train, labels_train)]
dev_tensors = [(sentence_to_tensor(sentence, word_dict), label) for sentence, label in zip(txt_dev, labels_dev)]
test_tensors = [(sentence_to_tensor(sentence, word_dict), label) for sentence, label in zip(txt_test, labels_test)]
train_loader = DataLoader(train_tensors, batch_size=batch_size, shuffle=True, collate_fn=lambda batch: collate_fn(batch, word_dict))
dev_loader = DataLoader(dev_tensors, batch_size=batch_size, shuffle=True, collate_fn=lambda batch: collate_fn(batch, word_dict))
test_loader = DataLoader(test_tensors, batch_size=batch_size, shuffle=True, collate_fn=lambda batch: collate_fn(batch, word_dict))


# Neural network definition

You need to implement two networks:

- a simple bag of word model (note: it may be better to take the mean of input embeddings that the sum)
- a simple CNN as described in the course   

To simplify code, you can assume the input will always be a single sentence first, and then implement batched inputs. In the case of batched inputs, give to the forward function a (python) list of tensors.

The bag of word neural network should be defined as follows:

- take as input a tensor that is a sequence of integers indexing word embeddings
- retrieve the word embeddings from an embedding table
- construct the "input" of the MLP by summing (or computing the mean) over all embeddings (i.e. bag-of-word model)
- build a hidden represention using a MLP (1 layer? 2 layers? experiment! but maybe first try wihout any hidden layer...)
- project the hidden representation to the output space: it is a binary classification task, so the output space is a scalar where a negative (resp. positive) value means the review is negative (resp. positive).


The CNN is a little bit more tricky to implement. The goal is that you implement the one presented in the first lecture. Importantly, you should add "padding" tokens before and after the sentence so you can have a convolution even when there is a single word in the input. For example, if you input sentence is ["word"], you want to instead consider the sentence ["<BOS>", "word", "<EOS>"] if your window is of size 2 or 3. You can do this either directly when you load the data, or you can do that in the neural network module.

In [11]:
# BAG of word classifier
vocab_size = len(word_dict)
class CBOW_classifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(CBOW_classifier, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.fc = nn.Linear(embedding_dim, 1)
        self.sigmoid = nn.Sigmoid()  # pour la classification binaire


    def forward(self, inputs):
        # TODO
        # Récupérer les embeddings pour les mots dans la séquence
        embedded = self.embeddings(inputs)
        #on fait la moyenne des embeddings
        sentence_embedding = embedded.mean(dim=1)

        hidden = self.fc(sentence_embedding)

        output = self.sigmoid(hidden)
        return output

In [16]:
# BAG of word classifier
vocab_size = len(word_dict)
class CBOW_classifier_2(nn.Module):
    def __init__(self, vocab_size, embedding_dim,hidden_dim):
        super(CBOW_classifier_2, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.hidden = nn.Linear(embedding_dim, hidden_dim)
        self.relu = nn.ReLU()  # Fonction d'activation pour la couche cachée
        # Output layer
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()  # pour la classification binaire


    def forward(self, inputs):
       # Obtenir les embeddings pour les mots dans la séquence
        embedded = self.embeddings(inputs)
        # Calculer la moyenne des embeddings
        sentence_embedding = embedded.mean(dim=1)

        # Propagation à travers la couche cachée
        hidden = self.relu(self.hidden(sentence_embedding))

        # Propagation finale
        output = self.sigmoid(self.fc(hidden))
        return output


In [19]:
model_one_layer = CBOW_classifier_2(vocab_size=len(word_dict), embedding_dim=300, hidden_dim=150)


In [72]:
# Modèle CNN pour la classification
class CNN_classifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, kernel_size, num_filters):
        super(CNN_classifier, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)

        self.conv = nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=kernel_size)

        # Max pooling pour réduire la dimensionnalité
        self.pool = nn.AdaptiveMaxPool1d(1)

        # Classification finale
        self.fc = nn.Linear(num_filters, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, inputs):
        embedded = self.embeddings(inputs) # [batch_size, seq_length, embedding_dim]

        embedded = embedded.permute(0, 2, 1)  # [batch_size, embedding_dim, seq_length]

        # Passer dans la convolution
        conv_out = self.conv(embedded)  # [batch_size, num_filters, seq_length - kernel_size + 1]

        # Appliquer le pooling
        pooled_out = self.pool(conv_out)  # [batch_size, num_filters, 1]

        # Passer dans la couche lineaire
        pooled_out = pooled_out.view(pooled_out.size(0), -1)  # [batch_size, num_filters]
        hidden = self.fc(pooled_out)  # [batch_size, 1]

        # Appliquer la fonction d'activation
        output = self.sigmoid(hidden)  # [batch_size, 1]

        return output

# Loss function

Create a loss function builder.

- Pytorch loss functions are documented here: https://pytorch.org/docs/stable/nn.html#loss-functions
- In our case, we are interested in BCELoss and BCEWithLogitsLoss. Read their documentation and choose the one that fits with your network output

In [23]:
def build_loss_function():
    return nn.BCELoss()  # BCELoss, applique une perte cross-entropy binaire
    # pour BCELoss on doit appliquer la fonction d'activation sigmoid, ce qui n'est pas le cas avec BCEWithLogitsLoss

# Training loop

Write your training loop!

- parameterizable number of epochs
- at each epoch, print the mean loss and the dev accuracy

In [20]:
def train(model, train_data, dev_data, loss_fn, optimizer, num_epochs=10, batch_size=64):
    """

    :param model: Le modèle à entraîner (CNN ou bag-of-words).
    :param train_data: Données d'entraînement sous forme de liste de tuples (texte, label).
    :param dev_data: Données de développement sous forme de liste de tuples (texte, label).
    :param loss_fn: Fonction de perte (BCELoss).
    :param optimizer: Optimiseur
    :param num_epochs: Nombre d'époques pour l'entraînement.
    :param batch_size: Taille du batch pour l'entraînement.
    """

    #  DataLoader pour l'entraînement et la validation, on realise ici le shuffle des données.
    train_loader = train_data
    dev_loader =  dev_data

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        correct_train = 0
        total_train = 0

        # Entraîner le modèle par lots
        for texts, labels in train_loader:
            optimizer.zero_grad()  # Réinitialiser les gradients
            outputs = model(texts)  # Propagation avant
            loss = loss_fn(outputs.squeeze(), labels.float())  # Calcul de la perte
            loss.backward()  # backpropagation
            optimizer.step()  # Mise à jour des poids

            epoch_loss += loss.item()  # Ajouter la perte de ce lot
            predicted = (outputs.squeeze() > 0.5).float()  # Prédire 0 ou 1 avec un seuil de 0.5
            correct_train += (predicted == labels).sum().item()  # Nombre de prédictions correctes
            total_train += labels.size(0)  # Nombre total d'exemples

        # Calcul de la précision sur l'ensemble d'entraînement
        train_accuracy = correct_train / total_train * 100  #pour afficher en pourcentage

        # Évaluation sur l'ensemble de validation (dev)
        model.eval()
        correct_dev = 0
        total_dev = 0

        with th.no_grad():  # Désactiver les gradients pour la validation
            for texts, labels in dev_loader:
                outputs = model(texts)
                predicted = (outputs.squeeze() > 0.5).float()
                correct_dev += (predicted == labels).sum().item()
                total_dev += labels.size(0)

        dev_accuracy = correct_dev / total_dev * 100

        # Affichage des résultats
        print(f"Époque {epoch+1}/{num_epochs}")
        print(f"  Perte entraînement: {epoch_loss/len(train_loader):.4f}")
        print(f"  Précision entraînement: {train_accuracy:.2f}%")
        print(f"  Précision validation: {dev_accuracy:.2f}%\n")
    return model

In [21]:
def test(model, test_data, loss_fn):

    model.eval()
    correct_test = 0
    total_test = 0
    test_loss = 0.0

    with th.no_grad():
        for texts, labels in test_data:
            outputs = model(texts)

            loss = loss_fn(outputs.squeeze(), labels.float())
            test_loss += loss.item()

            # Calcul des prédictions
            predicted = (outputs.squeeze() > 0.5).float()

            # Compter le nombre de prédictions correctes
            correct_test += (predicted == labels).sum().item()
            total_test += labels.size(0)

    test_accuracy = correct_test / total_test * 100
    avg_test_loss = test_loss / len(test_data)

    print(f"Perte de test: {avg_test_loss:.4f}")
    print(f"Précision de test: {test_accuracy:.2f}%")

    return test_accuracy, avg_test_loss


In [29]:
loss_fn = build_loss_function()
train_data = train_tensors
dev_data = dev_tensors
model_1 = CBOW_classifier(len(word_dict), 300)
#model_2 = CNN_classifier(len(word_dict), 300, 5, 50)
optimizer = th.optim.Adam(model_1.parameters(), lr=0.001)


In [30]:
model_1_train = train(model_1, train_loader, dev_loader, loss_fn, optimizer)

Époque 1/10
  Perte entraînement: 0.6853
  Précision entraînement: 55.49%
  Précision validation: 66.13%

Époque 2/10
  Perte entraînement: 0.6575
  Précision entraînement: 66.24%
  Précision validation: 64.80%

Époque 3/10
  Perte entraînement: 0.6207
  Précision entraînement: 73.43%
  Précision validation: 69.33%

Époque 4/10
  Perte entraînement: 0.5766
  Précision entraînement: 76.96%
  Précision validation: 73.47%

Époque 5/10
  Perte entraînement: 0.5280
  Précision entraînement: 80.37%
  Précision validation: 76.60%

Époque 6/10
  Perte entraînement: 0.4855
  Précision entraînement: 82.01%
  Précision validation: 77.33%

Époque 7/10
  Perte entraînement: 0.4425
  Précision entraînement: 85.24%
  Précision validation: 79.00%

Époque 8/10
  Perte entraînement: 0.4057
  Précision entraînement: 86.57%
  Précision validation: 78.13%

Époque 9/10
  Perte entraînement: 0.3742
  Précision entraînement: 87.97%
  Précision validation: 78.87%

Époque 10/10
  Perte entraînement: 0.3440
  Pr

In [28]:
accuracy, loss = test(model_1_train, test_loader, loss_fn)

Perte de test: 0.5678
Précision de test: 76.87%


In [None]:
train(model_2, train_loader, dev_loader, loss_fn, optimizer)

Époque 1/10
  Perte entraînement: 0.7073
  Précision entraînement: 49.40%
  Précision validation: 48.80%

Époque 2/10
  Perte entraînement: 0.7074
  Précision entraînement: 49.43%
  Précision validation: 48.80%

Époque 3/10
  Perte entraînement: 0.7072
  Précision entraînement: 49.47%
  Précision validation: 48.60%

Époque 4/10
  Perte entraînement: 0.7073
  Précision entraînement: 49.39%
  Précision validation: 48.93%

Époque 5/10
  Perte entraînement: 0.7073
  Précision entraînement: 49.46%
  Précision validation: 48.73%

Époque 6/10
  Perte entraînement: 0.7073
  Précision entraînement: 49.43%
  Précision validation: 48.87%

Époque 7/10
  Perte entraînement: 0.7072
  Précision entraînement: 49.41%
  Précision validation: 48.80%

Époque 8/10
  Perte entraînement: 0.7074
  Précision entraînement: 49.33%
  Précision validation: 48.73%

Époque 9/10
  Perte entraînement: 0.7073
  Précision entraînement: 49.37%
  Précision validation: 48.80%

Époque 10/10
  Perte entraînement: 0.7073
  Pr