In [21]:
import re
import torch
import torch.nn.functional as F
import torch.nn as nn
import random
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import os

# Deep Learning for NLP - lab exercise 1

## Data


In [22]:
# Tokenize a sentence
def clean_str(string: str, tolower:bool=True) -> str:
    """
    Tokenization/string cleaning.
    Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
    string = re.sub(r"\'s", " \'s", string)
    string = re.sub(r"\'ve", " \'ve", string)
    string = re.sub(r"n\'t", " n\'t", string)
    string = re.sub(r"\'re", " \'re", string)
    string = re.sub(r"\'d", " \'d", string)
    string = re.sub(r"\'ll", " \'ll", string)
    string = re.sub(r",", " , ", string)
    string = re.sub(r"!", " ! ", string)
    string = re.sub(r"\(", " ( ", string)
    string = re.sub(r"\)", " ) ", string)
    string = re.sub(r"\?", " ? ", string)
    string = re.sub(r"\s{2,}", " ", string)
    if tolower:
        string = string.lower()
    return string.strip()


# reads the content of the file passed as an argument.
# if limit > 0, this function will return only the first "limit" sentences in the file.
def loadTexts(filename: str, limit:int=-1) -> list[list[str]]:
    dataset =[]
    with open(filename) as f:
        line: str = f.readline()
        cpt=1
        skip=0
        while line :
            cleanline = clean_str(f.readline()).split()
            if cleanline: 
                dataset.append(cleanline)
            else: 
                line = f.readline()
                skip+=1
                continue
            if limit > 0 and cpt >= limit: 
                break
            line = f.readline()
            cpt+=1        

        print("Load ", cpt, " lines from ", filename , " / ", skip ," lines discarded")
    return dataset


The following cell load the first 5000 sentences in each review set.


In [23]:
LIM = 10000
txtfile = "../data/imdb/imdb.pos"  # path of the file containing positive reviews
postxt = loadTexts(txtfile,limit=LIM)

txtfile = "../data/imdb/imdb.neg"  # path of the file containing negative reviews
negtxt = loadTexts(txtfile,limit=LIM)

Load  10000  lines from  ../data/imdb/imdb.pos  /  1  lines discarded
Load  10000  lines from  ../data/imdb/imdb.neg  /  3  lines discarded


Split the data between train / dev / test.


In [24]:
reviews = postxt + negtxt # Combine all reviews in a list
labels = [1] * len(postxt) + [0] * len(negtxt) # Create labels

from sklearn.model_selection import train_test_split

txt_train, txt_test_dev, label_train, label_test_dev = train_test_split(reviews, labels, # Splitting both reviews and labels
                                                                        train_size=0.7, # Keeping 70 % of the dataset as training data
                                                                        stratify=labels, # Using stratify to ensure both sets keep a 50/50 ratio betwwen positive and negative reviews 
                                                                        random_state=42) # Setting random_state for replicability


txt_test, txt_dev, label_test, label_dev = train_test_split(txt_test_dev, label_test_dev, # Splitting test and dev sets
                                                            train_size=0.5, # Keeping 15 % of the dataset as test/dev data
                                                            stratify=label_test_dev, # Using stratify to ensure both sets keep a 50/50 ratio betwwen positive and negative reviews 
                                                            random_state=42) # Setting random_state for replicability

# Converting data to Pytorch tensors


In [25]:
def build_vocab(sentences: list[list[str]]) -> dict[str, int]:
    """Generate the vocabulary used in a list of sentences.

    Args:
        sentences (list[list[str]]): List of sentences. Sentences should already be split by words/tokens

    Returns:
        dict[str, int]: Vocabulary, in the form of a dictionnary where each word is assigned to an integer
    """
    vocab = {'<UNK>' : 0, '<PAD>' : 1, '<BOS>' : 2, '<EOS>' : 3} # Initializing the vocab with an base tokens
    index = 4 # Starting BOW from index 1 

    for sentence in sentences: # Iterating through the sentences
        for word in sentence: # Iterating throigh each word in the sentence
            if word not in vocab: # If we didn't see the word already
                vocab[word] = index # We had it to the vocabulary
                index +=1 # We adjust the running index
    
    return vocab

def sentences_to_tensor(sentences:list[list[str]], vocab:dict[str, int], padding_size:int) -> torch.Tensor:

    tokenized_sentences = [
        [vocab.get('<BOS>')] + [vocab.get(word, vocab.get('<UNK>')) for word in sentence] + [vocab.get('<EOS>')] # Create the list of tokenized word. We add <BOS> and <EOS> for delimiting the sentence. If the word is not in the vocab dictionnary, we had the encoding for the <UNK> token
        for sentence in sentences
    ]

    tokenized_sentences = [
        tokens[:padding_size] + [vocab.get('<PAD>')] * max(0, padding_size - len(tokens))
        for tokens in tokenized_sentences
    ]

    tensor_data = torch.tensor(tokenized_sentences, dtype=torch.long)

    return tensor_data

vocab = build_vocab(txt_train)

tensor_txt_train = sentences_to_tensor(txt_train, vocab, max([len(sentence) for sentence in reviews]))
tensor_txt_dev = sentences_to_tensor(txt_dev, vocab, max([len(sentence) for sentence in reviews]))
tensor_txt_test = sentences_to_tensor(txt_test, vocab, max([len(sentence) for sentence in reviews]))

tensor_label_train = torch.tensor(label_train, dtype=torch.long)
tensor_label_dev = torch.tensor(label_dev, dtype=torch.long)
tensor_label_test = torch.tensor(label_test, dtype=torch.long)

# Neural network definition


In [26]:
# Naïve classifier
class Naive_classifier(nn.Module):
    """A simple naïve classification model. returns at random the class of a model.
    """
    def __init__(self) -> None:
        """Initialize model
        """
        super().__init__()

    def forward(self, inputs:torch.Tensor) -> torch.Tensor:
        """forward method for "training"

        Args:
            inputs (list[torch.Tensor]): batch input of the model

        Returns:
            torch.Tensor: random predictions. -1 predicts a negative class, 1 a positive class
        """
        batch_size = inputs.size(0)
        random.seed(42)
        random_predictions = torch.tensor([random.randint(0, 1) for _ in range(batch_size)])
        return random_predictions

# BAG of word classifier
class CBOW_classifier(nn.Module):
    """A simple Bag of words classifier
    """
    def __init__(self, vocab_size:int, embedding_dim:int) -> None:
        """Initialize the BagOfWords model

        Args:
            vocab_size (int):Size of the vocabulary, to create the embedding space
            embedding_dim (int): Dimension of the embedding space
        """
        super(CBOW_classifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.MLP = nn.Sequential(
            nn.Linear(embedding_dim, embedding_dim //2),
            nn.ReLU(),
            nn.Linear(embedding_dim //2, 1)
        )
        
    def forward(self, inputs:torch.Tensor) -> torch.Tensor:
        """Forward pass of the model

        Args:
            inputs (torch.Tensor): Batched inputs of the model

        Returns:
            torch.Tensor: logits output of the model
        """
        # Get embeddings and sum along the axis
        embedded = self.embedding(inputs) # (batch_size, sentence_len, embedding_dim)
        embedded = embedded.sum(dim=1) # (batch_size, embedding_dim)

        # Pass through the MLP to get the logits outputs
        logits = self.MLP(embedded) # (batch_size, 1)

        return logits
    

class CNN_classifier(nn.Module):
    """Based on the paper "Convolutional Neural Networks for Sentence Classification, Yoon Kim, 2014
    """
    def __init__(self, vocab_size:int, embedding_dim:int, num_filters:int, filter_sizes:list[int], dropout:int=0.5) -> None:
        """Initialize the CNN model

        Args:
            vocab_size (int): Size of the vocabulary
            embedsing_dim (int): Dimesnion of the embedding space
            num_filters (int):Number of convolutional filters
            filter_sizes (list[int]): List of sizes of the convolutional filters. Size 1 focus on unigrams, 2 on bigrams, 3 on trigrams ... 
            dropout (int, optional): _description_. Defaults to 0.5.
        """
        super(CNN_classifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.convs = nn.ModuleList([nn.Conv1d(in_channels=embedding_dim,
                                             out_channels=num_filters,
                                             kernel_size = fs)
                                    for fs in filter_sizes])

        self.dropout = nn.Dropout(dropout)
        self.MLP = nn.Sequential(
            nn.Linear(num_filters * len(filter_sizes), (num_filters * len(filter_sizes)) //2),
            nn.ReLU(),
            nn.Linear((num_filters * len(filter_sizes)) //2, 1)
        )

    def forward(self, inputs:torch.Tensor)-> torch.Tensor:
        """Forward pass of the model

        Args:
            inputs (torch.Tensor): Batch of input tensors

        Returns:
            torch.Tensor: logits output of the model
        """

        embedded = self.embedding(inputs) # (batch_size, sentence_len, embedding_dim) 
        embedded = embedded.permute(0,2,1) # (batch_size, embedding_dim, sentence_len)

        out_conv = [F.relu(conv(embedded)).max(dim=2)[0]
                    for conv in self.convs]
        out_conv = torch.cat(out_conv, dim=1) # (batch_size, num-filters * len(filter_sizes))

        out_conv = self.dropout(out_conv) # (batch_size, num-filters * len(filter_sizes))

        logits = self.MLP(out_conv) # (batch_size, 1)

        return logits

## Training loop


In [27]:
def train_model(model:nn.Module, train_loader:DataLoader, dev_loader:DataLoader, num_epochs:int, criterion:nn.Module, optimizer:optim.Optimizer, device:torch.device, patience:int=2, min_delta:float=0.001) -> None:
    """Training a model passed as input

    Args:
        model (nn.Module): model to train
        train_loader (DataLoader): train set, as a DataLoader
        dev_loader (DataLoader): dev set, as a DataLoader
        num_epochs (int): Number of training epochs
        criterion (nn.Module): Loss fuction
        optimizer (optim.Optimizer): Optimizer for training
        device (torch.device): Device to run the training on
        patience (int): Number of epochs to wait before checking for early stopping. Defaults to 2.
        min_delta (float): Minimum variation in dev loss to check for early stopping. Defaults to 0.001
    """

    model.to(device)
    best_dev_loss = float('inf')
    no_improvement_epochs = 0
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0

        for sentences, labels in train_loader:
            sentences, labels = sentences.to(device), labels.to(device)

            # Forward pass
            logits = model(sentences)
            loss = criterion(logits.squeeze().float(), labels.float())

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Dev loop
        model.eval()
        dev_loss, correct, total = 0, 0, 0
        with torch.no_grad():
            for sentences, labels in dev_loader:
                sentences, labels = sentences.to(device), labels.to(device)

                logits = model(sentences)
                loss = criterion(logits.squeeze().float(), labels.float())
                dev_loss += loss.item()

                # Calculate accuracy
                predictions = (torch.sigmoid(logits.squeeze()) > 0.5).long()
                correct += (predictions == labels.long()).sum().item()
                total += labels.size(0)

        train_loss /= len(train_loader)
        dev_loss /= len(dev_loader)

        # Early stopping
        if dev_loss < best_dev_loss - min_delta:
            best_dev_loss = dev_loss
            no_improvement_epochs = 0
            print("Dev loss improved, saving current model.")
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print("Patience reache. Early stopping triggered.")
            break

        print(f"Epoch [{epoch+1}/{num_epochs}], "
              f"Train Loss: {train_loss:.2f}, "
              f"Dev Loss: {dev_loss:.2f}, "
              f"Dev Accuraccy: {correct/total:.2f}")
        
    print("Loading best model.")
    model.load_state_dict(torch.load('best_model.pth', weights_only=True))
    os.remove('best_model.pth')
    print("Saved model deleted.")

In [28]:
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

train_dataset = TensorDataset(tensor_txt_train, tensor_label_train)
train_loader = DataLoader(train_dataset, batch_size = 16)

dev_dataset = TensorDataset(tensor_txt_dev, tensor_label_dev)
dev_loader = DataLoader(dev_dataset, batch_size = 16)


BOW_model = CBOW_classifier(vocab_size=len(vocab), embedding_dim=128)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(BOW_model.parameters())

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

num_epochs = 10

train_model(BOW_model, train_loader, dev_loader, num_epochs, criterion, optimizer, device)

Dev loss improved, saving current model.
Epoch [1/10], Train Loss: 0.74, Dev Loss: 0.68, Dev Accuraccy: 0.59
Epoch [2/10], Train Loss: 0.55, Dev Loss: 0.69, Dev Accuraccy: 0.63
Dev loss improved, saving current model.
Epoch [3/10], Train Loss: 0.45, Dev Loss: 0.67, Dev Accuraccy: 0.68
Epoch [4/10], Train Loss: 0.38, Dev Loss: 0.70, Dev Accuraccy: 0.69
Patience reache. Early stopping triggered.
Loading best model.
Saved model deleted.


In [29]:
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

train_dataset = TensorDataset(tensor_txt_train, tensor_label_train)
train_loader = DataLoader(train_dataset, batch_size = 16)

dev_dataset = TensorDataset(tensor_txt_dev, tensor_label_dev)
dev_loader = DataLoader(dev_dataset, batch_size = 16)


CNN_model = CNN_classifier(vocab_size=len(vocab), embedding_dim = 128, num_filters=10, filter_sizes=[2, 3, 4])
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(CNN_model.parameters())

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

num_epochs = 10

train_model(CNN_model, train_loader, dev_loader, num_epochs, criterion, optimizer, device)

Dev loss improved, saving current model.
Epoch [1/10], Train Loss: 0.62, Dev Loss: 0.53, Dev Accuraccy: 0.74
Dev loss improved, saving current model.
Epoch [2/10], Train Loss: 0.49, Dev Loss: 0.46, Dev Accuraccy: 0.78
Dev loss improved, saving current model.
Epoch [3/10], Train Loss: 0.41, Dev Loss: 0.43, Dev Accuraccy: 0.79
Dev loss improved, saving current model.
Epoch [4/10], Train Loss: 0.35, Dev Loss: 0.42, Dev Accuraccy: 0.79
Epoch [5/10], Train Loss: 0.30, Dev Loss: 0.43, Dev Accuraccy: 0.79
Patience reache. Early stopping triggered.
Loading best model.
Saved model deleted.


## Predictions on test set


In [30]:
from sklearn.metrics import accuracy_score, classification_report

def evaluate_model(model:nn.Module, test_loader:DataLoader) -> tuple[float, str | dict]:
    """Evaluation function

    Args:
        model (nn.Module): Model to evaluate
        test_data (DataLoader): Test set to evaluate the model on
        test_labels (torch.Tensor): Test labels

    Returns:
        tuple[Float, str | dict]: accuarcy score and accuracy report from sklearn.metrics
    """
    predictions = []
    true_labels = []
    with torch.no_grad():
        for sentences, labels in test_loader:
            sentences, labels = sentences.to(device), labels.to(device)
            true_labels.extend(labels.tolist())

            # Forward pass
            logits = model(sentences)
            preds = (logits.sigmoid() > 0.5).long().tolist()
            predictions.extend(preds)

    accuracy = accuracy_score(true_labels, predictions)
    report = classification_report(true_labels, predictions)

    return accuracy, report


In [31]:
test_dataset = TensorDataset(tensor_txt_test, tensor_label_test)
test_loader = DataLoader(test_dataset, batch_size = 16)

naive_model = Naive_classifier()
naive_accuracy, naive_report = evaluate_model(naive_model, test_loader)
print("Naive Classifier (Random Predictions) Results")
print(f"Accuracy: {naive_accuracy:.2f}")
print(naive_report)

BOW_model.eval()
BOW_accuracy, BOW_report = evaluate_model(BOW_model, test_loader)
print("BOW Classifier Results")
print(f"Accuracy: {BOW_accuracy:.2f}")
print(BOW_report)

CNN_model.eval()
CNN_accuracy, CNN_report = evaluate_model(CNN_model, test_loader)
print("CNN Classifier Results")
print(f"Accuracy: {CNN_accuracy:.2f}")
print(CNN_report)

Naive Classifier (Random Predictions) Results
Accuracy: 0.50
              precision    recall  f1-score   support

           0       0.50      0.87      0.64      1500
           1       0.50      0.12      0.20      1500

    accuracy                           0.50      3000
   macro avg       0.50      0.50      0.42      3000
weighted avg       0.50      0.50      0.42      3000

BOW Classifier Results
Accuracy: 0.68
              precision    recall  f1-score   support

           0       0.92      0.40      0.56      1500
           1       0.62      0.96      0.75      1500

    accuracy                           0.68      3000
   macro avg       0.77      0.68      0.66      3000
weighted avg       0.77      0.68      0.66      3000

CNN Classifier Results
Accuracy: 0.79
              precision    recall  f1-score   support

           0       0.77      0.83      0.80      1500
           1       0.82      0.75      0.78      1500

    accuracy                           0.79  