# Sentiment classifier with LSTM

In this notebook, we will implement a simple sentiment classifier using an LSTM. Follow the notebook, complete the missing part, answer the questions and apply the asked modifications.



In [None]:
!pip install datasets

In [None]:
from collections import defaultdict
from copy import deepcopy
import re
from typing import Callable, List, Tuple

from datasets import load_dataset
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
import torch.nn.functional as F
from tqdm.auto import tqdm
from datetime import datetime
from sklearn.utils import shuffle

## Downloading the dataset

Using the datasets library, we load the imdb dataset.

In [None]:
dataset = load_dataset("imdb")

In [None]:
dataset

In [None]:
# We do not need the "unsupervised" split.
dataset.pop("unsupervised")

In [None]:
dataset

In [None]:
dataset["train"][:2]

## Pretreatment **(1 point)**

Code the `pretreatment` function which clean the input text. Look at the dataset and deduce which treatment is needed.

In [None]:
from string import punctuation


def pretreatment(text: str) -> str:
	"""Clean IMDB text entries.
    Args:
        text: an input string.
    Returns:
        The cleaned text.
    """
	# Your code here
	lower = text.lower()
	return lower.translate(str.maketrans('', '', punctuation))

In [None]:
# This applies the pretreatment function to all
clean_dataset = dataset.map(lambda x: {"text": pretreatment(x["text"]), "label": x["label"]})

Let's see what the text now look like.

In [None]:
clean_dataset["train"]["text"][:3]

Let's take a quick look at the labels. Notice that the labels are ordered in the training set starting by the negative reviews (0), followed by the positive ones (1). Training neural networks on this kind of configuration tends to considerably affect their performances. So the dataset will have to be shuffled.

In [None]:
clean_dataset["train"]["label"][12490:12510]

## Train/validation split **(1 point)**

In our example, we consider the test split as production data. Which means, we need to treat it as if we never see it during the training process. To experiment on the model, we need to split the training set into a training and validation set. See [here](https://huggingface.co/course/chapter5/3?fw=pt#creating-a-validation-set) on how to do so with the `Datasets` library.

Don't forget to **stratify** your split (we need to have the same proportion of class in both training and validation set).

In [None]:
from datasets import DatasetDict
clean_dataset = DatasetDict({
    'train': load_dataset('imdb', split='train[:40%]+train[50%:90%]'),
    'validation': load_dataset('imdb', split='train[40%:50%]+train[90%:]'),
    'test': clean_dataset['test']
}).shuffle(seed=42)
'''
l = int(25000/2)
perc = 0.80
lperc= int(l*perc)

train_text =clean_dataset['train']['text'][:lperc] +  clean_dataset['train']['text'][l:l+lperc]
val_text = clean_dataset['train']['text'][lperc:l] +  clean_dataset['train']['text'][l+lperc:2*l]

train_label=clean_dataset['train']['label'][:lperc] +  clean_dataset['train']['label'][l:l+lperc]
val_label= clean_dataset['train']['label'][lperc:l] +  clean_dataset['train']['label'][l+lperc:2*l]

train = {'text': train_text, 'label': train_label}
val = {'text': val_text, 'label': val_label}
'''

In [None]:
clean_dataset['train']['label'][:10]

In [None]:
# from sklearn.model_selection import train_test_split
# X_train,X_val,Y_train,Y_val = train_test_split(clean_dataset["train"]["text"],clean_dataset["train"]["label"],test_size=0.2,shuffle=True)

In [None]:
# sum(Y_train)

In [None]:
clean_dataset

In [None]:
# Check here that the dataset is stratified.

## Categorical encoding of the vocabulary **(2 points)**

We can't feed word to a neural network. A usual solution is to turn words into categorical data by using one-hot encoding. To avoid an explosion in vocabulary size, we will only keep words which appear more than a certain amount of time.

The `Vocabulary` class below will do that for us.

In [None]:
UNK_TOKEN = "<UNK>"
PAD_TOKEN = "<PAD>"


class Vocabulary:
	"""Vocabulary manager on a collection.
    """

	def __init__(self) -> None:
		"""No parameters to provide.
        """
		# Index to word mapping.
		self.index2word = [PAD_TOKEN, UNK_TOKEN]
		# Word to index mapping.
		self.word2index = {value: key for key, value in enumerate(self.index2word)}
		# Word counter.
		self.word2count = defaultdict(int)

	def add_word(self, word: str) -> None:
		"""Increments the count of a word to the vocabulary.
        Args:
            word: the word.
        """
		self.word2count[word] += 1
		if not word in self.word2index:
			self.word2index[word] = len(self.index2word)
			self.index2word.append(word)

	def add_text(self, text: str, separator: str = " ") -> None:
		"""Add the words given in a text to our vocabulary.
        Args:
            text: a sequence of words separated by a given separator.
            separator: the separator used to split our text (default is " ").
        """
		for word in text.split(separator):
			self.add_word(word)

	def get_index(self, word: str) -> int:
		"""Returns the index of a given word in our vocabulary.
        If the word is not in the vocabulary, returns the index for UNK_TOKEN.
        Args:
            word: a string.
        Returns:
            The corresponding index or the index for UNK_TOKEN.
        """
		return (
			self.word2index[word]
			if word in self.word2index
			else self.word2index[UNK_TOKEN]
		)

	def get_word(self, index: int) -> str:
		"""Returns the word at a given index in our vocabulary.
        Args:
            index: the word position in our vocabulary.
        Returns:
            The word corresponding to the given index.
        """
		return self.index2word[index]

	def get_word_count(self, word: str) -> int:
		"""Returns the number of occurences for a given word.
        Raise a 
        Args:
            The word.
        Returns:
            Its number of measured occurences.
        """
		return self.word2count[word]

	def get_vocabulary(self) -> List[str]:
		"""Returns a copy of the whole vocabulary list.
        Returns:
            A list of words.
        """
		return deepcopy(self.index2word)

	def __len__(self) -> int:
		"""len() function.
        Returns:
            The number of words in the vocabulary.
        """
		return len(self.index2word)

	def trim_vocabulary(self, min_occurences: int = 5) -> None:
		"""Trim the vocabulary based on the number of occurrences of each words.
        Note that whole counts of deleted words are added to the UNK_TOKEN counts.
        Args:
            min_occurences: the minimum number of occurences for a word to be kept.
        """
		to_delete = {
			word for word, count in self.word2count.items() if count < min_occurences
		}
		new_word2count = defaultdict(int)
		for word, count in self.word2count.items():
			if word not in to_delete:
				new_word2count[word] = count
			else:
				new_word2count[UNK_TOKEN] += count
		new_index2word = [word for word in self.index2word if word not in to_delete]
		new_word2index = {word: index for index, word in enumerate(new_index2word)}

		self.word2count = new_word2count
		self.index2word = new_index2word
		self.word2index = new_word2index

**(1 point)** Get the vocabulary on both the training and validation set using the `Vocabulary` class. Remember, we don't use the test set here as we consider it as proxy production data. The trim it down as you see fit (around 20K words in the vocabulary is a good value).

In [None]:
vocabulary = Vocabulary()
for rev in clean_dataset['train']['text']:
  vocabulary.add_text(rev)
for rev in clean_dataset['validation']['text']:
  vocabulary.add_text(rev)
# Your code...
vocabulary.__len__()

In [None]:
vocabulary.trim_vocabulary(15)
vocabulary.__len__()

**(1 point)** Fill the encoding and decoding functions. The encoding function takes a text as input and returns a list IDs corresponding to the index of each word in the vocabulary. The decoding function reverse the process, turning a list of IDs into a text. Make sure the encoding function returns a numpy array.

In [None]:
# Encoding and decoding function

def encode_text(text: str) -> np.ndarray:
  tokens = text.split()
  return np.array(list(map(lambda word: vocabulary.get_index(word), tokens)))

def decode_text(encoded_text: np.ndarray) -> str:
  tokens = list(map(lambda index: vocabulary.get_word(index), encoded_text))
  return " ".join(tokens)

To make sure everything went well, we compare a text before and after encoding and then decoding it. You should see rare words / typos replaced by the `<UNK>` token.

In [None]:
# Apply the encoding function to the entire dataset.
encoded_dataset = clean_dataset.map(lambda x: {"text": encode_text(x["text"]), "label": x["label"]})

In [None]:
type(encoded_dataset["train"]["text"][0][0])

In [None]:
clean_dataset["train"]["text"][0], decode_text(encoded_dataset["train"]["text"][0])

## Batch preparation **(1 point)**

To speed up learning, and take advantage of the GPU architecture, we provide data to the model by batches. Since all line in the same batch need to have the same length, we pad lines to the maximum length of each batch.

In [None]:
def data_generator(X: np.ndarray, y: np.ndarray, batch_size: int = 32, pad_right: bool = False) -> Tuple[
	np.ndarray, np.ndarray]:
	"""Generate randomly ordered batches of data+labels.
    Args:
        X: the input data.
        y: the corresponding labels.
        batch_size: the size of each batch [32].
        pad_right: if true, the padding is done on the right [False].
    """

	X, y = shuffle(X, y)
	n_batches = int(np.ceil(len(y) / batch_size))

	for i in range(n_batches):

		end = min((i + 1) * batch_size, len(y))

		X_batch = X[i * batch_size:end]
		y_batch = y[i * batch_size:end]

		# Padding to max ength size within the batch
		max_len = np.max([len(x) for x in X_batch])
		for j in range(len(X_batch)):
			x = X_batch[j]
			pad = [vocabulary.get_index(PAD_TOKEN)] * (max_len - len(x))
			X_batch[j] = x + pad if pad_right else pad + x

		X_batch = torch.from_numpy(np.array(X_batch)).long()
		y_batch = torch.from_numpy(np.array(y_batch)).long()

		# Yielding results, so every time the function is called, it starts again from here.
		yield X_batch, y_batch

Let's see what the batches look like.

In [None]:
for inputs, labels in data_generator(encoded_dataset["train"]["text"], encoded_dataset["train"]["label"]):
	print("inputs", inputs, "shape:", inputs.shape)
	print("labels", labels, "shape:", labels.shape)
	break

**(1 point)** Question: On which side should we pad the data for our use case and why?

## The model **(13 points)**

We use a simple RNN with a configurable number of layers.

In [None]:
# Before starting, let's set up the device. A GPU if available, else the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
class RNN(nn.Module):
	"""A simple RNN module with word embeddings.
    """

	def __init__(self, vocab_size: int, embed_size: int, hidden_size: int, n_layers: int, n_outputs: int) -> None:
		"""
        Args:
            vocab_size: vocabulary size.
            embed_size: embedding dimensions.
            hidden_size: hidden layer size.
            n_layers: the number of layers.
            n_outputs: the number of output classes.
        """
		super().__init__()
		self.vocab_size = vocab_size
		self.embed_size = embed_size
		self.hidden_size = hidden_size
		self.n_layers = n_layers
		self.n_outputs = n_outputs

		# The word embedding layer.
		self.embed = nn.Embedding(self.vocab_size, self.embed_size)
		# The RNN
		self.rnn = nn.RNN(
			input_size=self.embed_size,
			hidden_size=self.hidden_size,
			num_layers=self.n_layers,
			batch_first=True,  # Changes the order of dimension to put the batches first.
		)
		# A fully connected layer to project the RNN's output to only one output used for classification.
		self.fc = nn.Linear(self.hidden_size, self.n_outputs)

	def forward(self, X: torch.Tensor) -> torch.Tensor:
		"""Function called when the model is called with data as input.
        Args:
            X: the input tensor of dimensions batch_size, sequence length, vocab size (actually just an int).
        Returns:
            The resulting tensor of dimension batch_size, sequence length, output dimensions.
        """
		h0 = torch.zeros(self.n_layers, X.size(0), self.hidden_size).to(device)

		out = self.embed(X)
		# out contains the output layer of all words in the sequence.
		# First dim is batch, second the word in the sequence, third is the vector itself.
		# The second output value is the last vector of all intermediate layer.
		# Only use it if you want to access the intermediate layer values of a
		# multilayer model.
		out, _ = self.rnn(out, h0)
		# Getting the last value only.
		out = out[:, -1, :]

		# Linear projection.
		out = self.fc(out)

		return out

Note that we do not pass the output through a sigmoid function. This is because pyTorch implements some code optimization within the `BCEWithLogitsLoss` we'll see later.

In [None]:
def my_accuracy(outputs: torch.Tensor, labels: torch.Tensor):
    outputs = torch.sigmoid(outputs)
    outputs = torch.where(outputs > 0.5, 1, 0)
    acc = (torch.sum(outputs == labels).float()/labels.size()[0]).item()
    return acc

In [None]:
from copy import deepcopy

def train(
		model: nn.Module,
		criterion: Callable,
		optimizier: torch.optim.Optimizer,
		n_epochs: int,
		train_gen: Callable,
		valid_gen: Callable,
) -> Tuple[List[float], List[float]]:
	"""Train a model using a batch gradient descent.
    Args:
        model: a class inheriting from nn.Module.
        criterion: a loss criterion.
        optimizer: an optimizer (e.g. Adam, RMSprop, ...).
        n_epochs: the number of training epochs.
        train_gen: a callable function returing a batch (data, labels).
        valid_gen: a callable function returing a batch (data, labels).
    """
	train_losses = np.zeros(n_epochs)
	valid_losses = np.zeros(n_epochs)
	best_val_loss = float('inf')
	best_model = deepcopy(model)
	train_accuracies = np.zeros(n_epochs)
	valid_accuracies = np.zeros(n_epochs)

	for epoch in range(n_epochs):

		t0 = datetime.now()
		model.train()
		train_loss = []
		train_accuracy = []

		# Training loop.
		for inputs, labels in train_gen():
			# labels are of dimension (N,) we turn them into (N, 1).
			labels = labels.view(-1, 1).float()
			# Put them on the GPU.
			inputs, labels = inputs.to(device), labels.to(device)

			# Reset the gradient.
			optimizer.zero_grad()

			outputs = model(inputs)
			train_accuracy.append(my_accuracy(outputs, labels))
			loss = criterion(outputs, labels)
			loss.backward()
			optimizier.step()
			
			train_loss.append(loss.item())  # .item() detach the value from GPU.
   
		train_losses[epoch] = np.mean(train_loss)
		train_accuracies[epoch] = np.mean(train_accuracy)

		model.eval()
		valid_loss = []
		valid_accuracy = []
		# Evaluation loop.
		for inputs, labels in valid_gen():
			labels = labels.view(-1, 1).float()
			inputs, labels = inputs.to(device), labels.to(device)

			outputs = model(inputs)
			my_accuracy(outputs, labels)
			valid_accuracy.append(my_accuracy(outputs, labels))
			loss = criterion(outputs, labels)

			valid_loss.append(loss.item())

		valid_losses[epoch] = np.mean(valid_loss)
		valid_accuracies[epoch] = np.mean(valid_accuracy)
  
		if valid_losses[epoch] < best_val_loss:
			best_val_loss = valid_losses[epoch]
			best_model = deepcopy(model)
		print(
			f"Epoch: {epoch}, training loss: {train_losses[epoch]}, validation loss: {valid_losses[epoch]}, training accuracy: {train_accuracies[epoch]}, validation accuracy: {valid_accuracies[epoch]}, in {datetime.now() - t0}")
	return train_losses, valid_losses, best_model

We setup the model, criterion (a binary cross entropy), and the optimizer (Adam).

Note that `BCEWithLogitsLoss` use a mathematical trick to incorporate the sigmoid function in its computation. This trick makes the learning process go slightly faster and is the reason why we didn't put a sigmoid in the forward function of the model.

In [None]:
model = RNN(len(vocabulary), 32, 64, 1, 1).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())

We get the 3 generators.

In [None]:
train_gen = lambda: data_generator(encoded_dataset["train"]["text"], encoded_dataset["train"]["label"])
valid_gen = lambda: data_generator(encoded_dataset["validation"]["text"], encoded_dataset["validation"]["label"])
test_gen = lambda: data_generator(encoded_dataset["test"]["text"], encoded_dataset["test"]["label"])

And train the model.

In [None]:
train_losses, valid_losses, model = train(model, criterion, optimizer, 20, train_gen, valid_gen)

We can look at the training and validation loss.

In [None]:
plt.plot(train_losses, label="Training loss")
plt.plot(valid_losses, label="Validation loss")
plt.legend()

For the assignment, code the following.
* **(2 points)** The model validation loss should go down and then up. It means the model starts overfitting after a certain number of iterations. Modify the `train` function so it returns the model found with the best validation loss.
* **(2 points)** Add an accuracy function and report the accuracy of the training and test set.
* **(3 points)** Create an LSTM class which uses an LSTM instead of an RNN. Compare its results with the RNN.
* **(2 point)** Implement a function which takes any text and return the model's prediction.
    * The function should have a string as input and return a class (0 or 1) and its probability (score out of a [sigmoid](https://pytorch.org/docs/stable/generated/torch.sigmoid.html)).
    * Don't forget to make the text go through the same pretreatment and encoding you used to train your model.
* **(3 points)** Create a bidirectional LSTM (BiLSTM) class to classify your sentences. Report the accuracy on the training and test data.
    * To combine the last output of both direction, you can concatenate, add, or max-pool them. Please document your choice.
* **(1 point)** With your best classifier, look at two wrongly classified examples on the test set. Try explaining why the model was wrong.
* **(Bonus)** Try finding better hyperparameters (dimensions, number of layers, ...). Document your experiments and results.