# Coding your own RNN

Using this pre-filled notebook, we will code our own RNN for sentence classification. For now, we'll keep using IMDB, as the goal of this part is to understand how an RNN works.

Unlike our previous lab, we will also learn the embedding layer, which means we need to handle the vocabulary ourselves.

In [19]:
from functools import partial
from typing import Callable, Dict, Generator, List, Tuple

from datasets import load_dataset
import numpy as np
from sklearn.utils import shuffle
import torch
from torch import nn
from torchtext.vocab import vocab, Vocab
from torchtext.data.utils import get_tokenizer

from tqdm.auto import tqdm
from collections import OrderedDict, Counter

import copy

## Dataset
We load the dataset and split the training set into stratified training and validation sets.

In [2]:
dataset = load_dataset("imdb")
train_dataset = dataset["train"].train_test_split(
    stratify_by_column="label", test_size=0.2, seed=42
)
test_df = dataset["test"]
train_df = train_dataset["train"]
valid_df = train_dataset["test"]
train_df.shape, valid_df.shape, test_df.shape

((20000, 2), (5000, 2), (25000, 2))

## Vocabulary (1 point)

**\[1 point\]** Build your own vocabulary. The [example provided in torchtext documentation](https://pytorch.org/text/stable/vocab.html#id1) might be of help.
* Don't forget to set the `min_freq` parameter so that infrequent, noisy tokens are excluded.
* You will need a tokenizer. Reuse the `basic_english` one from our previous lab.
* For an RNN we need two special tokens: `<unk>`, for unknown words, and `<pad>` for padding.
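
To make the effect of `min_freq` and of the special tokens concrete, here is a minimal pure-Python sketch. It is not torchtext's implementation: `build_token_ids` is a hypothetical helper introduced only for illustration, and the toy sentence is made up.

```python
from collections import Counter
from typing import Dict, List


def build_token_ids(tokens: List[str], min_freq: int, specials: List[str]) -> Dict[str, int]:
    """Hypothetical helper: map tokens seen at least `min_freq` times to ids."""
    counts = Counter(tokens)
    # Most frequent first; ties are broken alphabetically to stay deterministic.
    kept = sorted(
        (t for t, c in counts.items() if c >= min_freq),
        key=lambda t: (-counts[t], t),
    )
    # Special tokens take the first ids, as in the lab (<pad> = 0, <unk> = 1).
    return {token: idx for idx, token in enumerate(list(specials) + kept)}


tokens = "the movie was good the plot was thin the end".split()
token_ids = build_token_ids(tokens, min_freq=2, specials=["<pad>", "<unk>"])
unk_id = token_ids["<unk>"]

print(token_ids)                      # {'<pad>': 0, '<unk>': 1, 'the': 2, 'was': 3}
print(token_ids.get("plot", unk_id))  # 1, because "plot" is below min_freq
```

Words below the threshold are simply absent from the mapping, which is why a default index pointing at `<unk>` is needed at lookup time.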

In [4]:
min_freq = 5
tokenizer = get_tokenizer("basic_english", language="en")

tokens = []
unk_token = "<unk>"
pad_token = "<pad>"
for text in tqdm(train_df['text'], total=len(train_df)):
    tokens += tokenizer(text)
vocabulary = vocab(OrderedDict(sorted(Counter(tokens).items(), key=lambda x: x[1], reverse=True)), min_freq=min_freq, specials=[pad_token, unk_token])
vocabulary.set_default_index(vocabulary[unk_token])

Testing the vocabulary

In [5]:
print(vocabulary['<unk>'], vocabulary['<pad>'], vocabulary['out of vocabulary'], vocabulary['the'])
# get the word at index 2 in vocab
print(vocabulary.get_itos()[2])

1 0 1 2
the


## Vectorize and batch the input (3 points)

As seen in class, our model should take one-hot encoded vectors corresponding to each token's vocabulary id. However, computing a vector-matrix multiplication for every input is unnecessarily costly: multiplying a one-hot vector by a matrix is equivalent to selecting one row of that matrix. In PyTorch, we instead provide the id of each token as input to an `nn.Embedding` layer. The id is simply the row index in the embedding matrix.
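
The equivalence between the one-hot product and the row lookup can be checked on a toy example. This is a small sketch with made-up sizes (`vocab_size = 5`, `embedding_size = 3`), not part of the lab's solution.

```python
import torch
from torch import nn

torch.manual_seed(0)
vocab_size, embedding_size = 5, 3  # toy sizes chosen for illustration
embedding = nn.Embedding(vocab_size, embedding_size)

token_id = torch.tensor(2)
one_hot = nn.functional.one_hot(token_id, num_classes=vocab_size).float()

# Multiplying the one-hot vector by the embedding matrix selects row 2 ...
via_matmul = one_hot @ embedding.weight
# ... which is what the lookup returns directly, without any multiplication.
via_lookup = embedding(token_id)

same = torch.allclose(via_matmul, via_lookup)
print(same)
```

The lookup avoids building a vector of size `vocab_size` for every token, which is the cost the paragraph above refers to.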

**\[1 point\]** Fill in the `vectorize_text` function so that it returns a 1D tensor of dtype `torch.long` for each input text.

In [6]:
def vectorize_text(
    text: str, vocabulary: Vocab, tokenizer: Callable[[str], List[str]]
) -> torch.Tensor:
    """
    Generate a tensor of vocabulary IDs for a given text.
    Args:
        text: the input text.
        vocabulary: a Vocab object.
        tokenizer: a text tokenizer.
    Returns:
        A tensor of IDs (torch.long).
    """
    return torch.tensor(
        [vocabulary[token] for token in tokenizer(text)], dtype=torch.long
    )

Testing `vectorize_text`:

In [7]:
# Test the function
vectorize_text("This is a test", vocabulary, tokenizer)

tensor([  14,   10,    6, 2148])

In [8]:
text_pipeline = partial(vectorize_text, vocabulary=vocabulary, tokenizer=tokenizer)

Check that the function works correctly; in particular, it should return the `<unk>` id for unknown words.

In [9]:
text_pipeline("Some text I am thinking about... ragafqfa")

tensor([  56, 3160,   13,  244,  526,   50,    3,    3,    3,    1])

In [10]:
X_train = [text_pipeline(text) for text in tqdm(train_df["text"])]
y_train = train_df["label"]
X_valid = [text_pipeline(text) for text in tqdm(valid_df["text"])]
y_valid = valid_df["label"]
X_test = [text_pipeline(text) for text in tqdm(test_df["text"])]
y_test = test_df["label"]

To speed up the training process, we turn the inputs into batches, as we did last time. For batching to work, every row must have the same length. Last time this was implicit, as each review was reduced to a single vector (the average of all its embeddings). This time, every row has the length of a different review.

To get around this problem, we use padding: every row within a batch is padded to the length of the longest element in that batch.

* **\[1 point\]** Fill the data generator function.
* **\[1 point\]** On which side should you pad and why?

Recurrent neural networks (RNNs) process sequences step by step, from left to right. Padding on either side keeps the words of each review in their original order, so the real question is where the padding sits relative to the time step used for the prediction.
With right-padding (padding tokens appended at the end), the last time steps of every shorter review are `<pad>` tokens, so the final hidden state keeps being updated by padding after the last real word. With left-padding, the last time step is always the last real word, so the final output is computed right after the model has read the review. This matters here because the prediction is taken from the last output.
PyTorch's built-in RNN modules do not require a particular padding side. Right-padded batches can be passed through `torch.nn.utils.rnn.pack_padded_sequence` so that the padded steps are skipped, and left-padding is a simple alternative when packing is not used.

So we will use right-padding here and left-padding in the second notebook.
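
To make the two options concrete, the sketch below pads a toy batch on each side. `pad_batch` is a hypothetical helper working on plain Python lists, introduced only for illustration; the ids are arbitrary and `0` stands for the `<pad>` id.

```python
from typing import List


def pad_batch(seqs: List[List[int]], pad_id: int, side: str = "right") -> List[List[int]]:
    """Hypothetical helper: pad every sequence to the length of the longest one."""
    max_len = max(len(s) for s in seqs)
    padded = []
    for s in seqs:
        filler = [pad_id] * (max_len - len(s))
        padded.append(s + filler if side == "right" else filler + s)
    return padded


batch = [[14, 10, 6, 2148], [56, 3160]]
right = pad_batch(batch, pad_id=0, side="right")
left = pad_batch(batch, pad_id=0, side="left")

print(right)  # [[14, 10, 6, 2148], [56, 3160, 0, 0]]
print(left)   # [[14, 10, 6, 2148], [0, 0, 56, 3160]]
```

In the right-padded batch the last column of the second row is padding, whereas in the left-padded batch the last column always holds a real token.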

In [13]:
def data_generator(
    X: List[torch.Tensor], y: List[int], pad_id: int, batch_size: int = 32
) -> Generator[Tuple[torch.Tensor, torch.Tensor], None, None]:
    """
    Yield batches from given input data and labels.
    Incomplete trailing batches are dropped.
    Args:
        X: a list of tensors (input features).
        y: the corresponding labels.
        pad_id: the vocabulary id used for padding.
        batch_size: the size of every batch [32].
    Returns:
        A tuple of tensors (features, labels).
    """
    n_samples = len(X)
    n_batches = n_samples // batch_size
    for i in range(n_batches):
        X_batch = X[i * batch_size : (i + 1) * batch_size]
        y_batch = y[i * batch_size : (i + 1) * batch_size]
        max_len = max(len(x) for x in X_batch)
        # Right-pad every sequence to the length of the longest one in the batch.
        # torch.full keeps the padding in torch.long even when no padding is needed.
        X_batch = torch.stack(
            [
                torch.cat((x, torch.full((max_len - len(x),), pad_id, dtype=torch.long)))
                for x in X_batch
            ]
        )
        yield X_batch, torch.tensor(y_batch, dtype=torch.long)

In [14]:
train_gen = lambda: data_generator(X_train, y_train, vocabulary[pad_token])
valid_gen = lambda: data_generator(X_valid, y_valid, vocabulary[pad_token])
test_gen = lambda: data_generator(X_test, y_test, vocabulary[pad_token])

## Classifier (3 points)

**\[3 points\]** Code your own RNN. Fill in the `RNN` class correctly. Remember that an RNN has 3 matrices and an embedding layer (see course slide 61).
* The embedding layer turns one-hot vectors into dense vectors.
* The first matrix (W) connects the embedding to the hidden layer.
  * `embedding_size -> hidden_size`
* The second matrix (U) connects the previous hidden layer to the current one.
  * `hidden_size -> hidden_size`
* These two vectors are added and go through an activation function (e.g. $h_t = \tanh(Wx_t+Uh_{t-1})$).
* The last matrix (V) connects the hidden layer to the output.
  * `hidden_size -> 1`
* Don't forget to add an `init_hidden` function which initializes the first hidden layer to 0.
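
Written out for one review with token ids $w_1, \dots, w_T$, the bullet points above correspond to the recurrence below. This is a sketch that omits bias terms, as the formula above does. $E$ (the embedding matrix) and $\hat{y}$ (the predicted probability) are symbols introduced here for illustration. In code, the sigmoid $\sigma$ is typically folded into the loss, as `nn.BCEWithLogitsLoss` does.

```latex
\begin{aligned}
x_t     &= E[w_t]                       && \text{row } w_t \text{ of the embedding matrix} \\
h_0     &= 0                            && \text{initial hidden state} \\
h_t     &= \tanh(W x_t + U h_{t-1})     && t = 1, \dots, T \\
\hat{y} &= \sigma(V h_T)                && \text{prediction from the last hidden state}
\end{aligned}
```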

In [15]:
# Custom RNN
class RNN(nn.Module):
    def __init__(self, vocab_size, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.output_size = 1

        # Define the layers
        # Embedding: token id -> dense vector of size input_size
        self.embedding = nn.Embedding(vocab_size, input_size)
        # W: embedding -> hidden (the bias is carried by h2h)
        self.i2h = nn.Linear(input_size, hidden_size, bias=False)
        # U: previous hidden -> hidden
        self.h2h = nn.Linear(hidden_size, hidden_size)
        # V: hidden -> output logit
        self.h2o = nn.Linear(hidden_size, self.output_size)

    def forward(self, x, hidden_state) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process one time step and return the output and tanh(i2h + h2h).
        Inputs
        ------
        x: Token ids for the current time step, shape (batch,) or (batch, 1)
        hidden_state: Previous hidden state, shape (batch, hidden_size)
        Outputs
        -------
        out: Linear output (no sigmoid: BCEWithLogitsLoss applies it internally)
        hidden_state: New hidden state, shape (batch, hidden_size)
        """
        # Look up the embeddings; ids passed as floats are cast back to long.
        x = self.embedding(x.reshape(-1).long())
        hidden_state = torch.tanh(self.i2h(x) + self.h2h(hidden_state))
        out = self.h2o(hidden_state)
        return out, hidden_state

    def init_hidden(self, batch_size=1) -> torch.Tensor:
        """
        Helper function.
        Returns a zero hidden state with the specified batch size. Defaults to 1.
        """
        return torch.zeros(batch_size, self.hidden_size, requires_grad=False)

## Training (2 points)

Training is a bit different than usual. We will need to sequentially (but in "batch parallel") go through an input, keeping track of the hidden layer, and use the last output as prediction.

**\[2 points\]** Code the training loop.
* Note that for each batch, you need to loop through the whole input and use the output of the last token as input to your criterion.
* Keep the best model evaluated on the validation set.
* Plot the training and validation losses.
* Training will take some time (~30 min on a T4 GPU). Make sure your results appear in the notebook.
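
The first bullet can be sketched on toy data as follows. This is a minimal sketch, not the lab's solution: it swaps in PyTorch's `nn.RNNCell` for the hand-written class, and all sizes, the random batch and the labels are made up for illustration.

```python
import torch
from torch import nn

torch.manual_seed(0)
batch_size, seq_len, vocab_size = 4, 6, 10  # toy sizes (assumptions)
embedding_size, hidden_size = 8, 16

embedding = nn.Embedding(vocab_size, embedding_size)
cell = nn.RNNCell(embedding_size, hidden_size)  # tanh(W x + U h), with biases
readout = nn.Linear(hidden_size, 1)             # plays the role of V
criterion = nn.BCEWithLogitsLoss()

X = torch.randint(0, vocab_size, (batch_size, seq_len))  # random token ids
y = torch.tensor([0.0, 1.0, 1.0, 0.0])                   # made-up labels

# Every row of the batch advances one token at a time ("batch parallel").
hidden = torch.zeros(batch_size, hidden_size)
for t in range(seq_len):
    hidden = cell(embedding(X[:, t]), hidden)

# Only the state after the last token is used for the prediction and the loss.
logits = readout(hidden).squeeze(1)
loss = criterion(logits, y)
loss.backward()
print(logits.shape, loss.item())
```

Because the loss depends on the last hidden state, which depends on all previous ones, `loss.backward()` propagates gradients through every time step.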

Since training can take a long time, let's use CUDA if it is available.

In [16]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
device

'cpu'

Defining the model, the loss and the optimizer, and a function computing the accuracy from predictions:

In [17]:
n_embedding = 32
n_hidden = 64
model = RNN(len(vocabulary.get_itos()), n_embedding, n_hidden).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.RMSprop(model.parameters(), lr=0.001)

In [18]:
def accuracy(y_pred: torch.Tensor, y_true: torch.Tensor) -> float:
    """
    Compute the accuracy of a model.
    Args:
        y_pred: the predictions of the model.
        y_true: the true labels.
    Returns:
        The accuracy of the model.
    """
    with torch.no_grad():
        y_pred = torch.sigmoid(y_pred)
        y_pred = torch.round(y_pred)
        return (y_pred == y_true).sum().item() / len(y_true)


Let's begin the training of the RNN model

In [20]:
n_epochs = 1
batch_size = 32
train_losses = {}
best_model = None
accuracy_list = []
for epoch in range(n_epochs):
    epoch_losses = list()
    # Reset here so training accuracy is not mixed with the previous epoch's validation accuracy
    accuracy_list = []
    # Set model to train mode
    model.train()
    for (X_batch, y_batch) in tqdm(train_gen()):
        if X_batch.shape[0] != batch_size:
            continue
        # Reset gradients
        optimizer.zero_grad()
        # Forward pass
        hidden = model.init_hidden(X_batch.size(0))  # Using dynamic batch size
        # To device
        X_batch, y_batch, hidden = X_batch.to(device), y_batch.to(device), hidden.to(device)

        loss = 0
        for c in range(X_batch.shape[1]):
            out, hidden = model(X_batch[:, c].reshape(X_batch.shape[0], 1).float(), hidden)
            l = criterion(out, y_batch.reshape(X_batch.shape[0], 1).float())
            loss += l

        # Compute gradients
        loss.backward()

        # Clip gradients to limit exploding gradients, then update the parameters
        nn.utils.clip_grad_norm_(model.parameters(), 3)
        optimizer.step()

        epoch_losses.append(loss.detach().item() / X_batch.shape[1])
        accuracy_list.append(accuracy(out, y_batch.reshape(X_batch.shape[0], 1).float()))

    train_losses[epoch] = torch.tensor(epoch_losses).mean()
    print(f'=> epoch: {epoch + 1}, loss: {train_losses[epoch]}')
    print(f'=> epoch: {epoch + 1}, accuracy: {torch.tensor(accuracy_list).mean()}')

    # compute validation loss, keep the best model
    model.eval()
    accuracy_list = []
    with torch.no_grad():
        valid_losses = list()
        for X_batch, y_batch in tqdm(valid_gen()):
            if X_batch.shape[0] != batch_size:
                continue
            hidden = model.init_hidden(X_batch.size(0))
            X_batch, y_batch, hidden = X_batch.to(device), y_batch.to(device), hidden.to(device)

            loss = 0
            for c in range(X_batch.shape[1]):
                out, hidden = model(X_batch[:, c].reshape(X_batch.shape[0], 1).float(), hidden)
                l = criterion(out, y_batch.reshape(X_batch.shape[0], 1).float())
                loss += l

            valid_losses.append(loss.detach().item() / X_batch.shape[1])
            accuracy_list.append(accuracy(out, y_batch.reshape(X_batch.shape[0], 1).float()))

        valid_loss = torch.tensor(valid_losses).mean()
        print(f'=> epoch: {epoch + 1}, validation loss: {valid_loss}')
        print(f'=> epoch: {epoch + 1}, validation accuracy: {torch.tensor(accuracy_list).mean()}')
        # Keep the model with the lowest validation loss seen so far
        if best_model is None or valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            best_model = copy.deepcopy(model)

0it [00:00, ?it/s]

KeyboardInterrupt: 

Now let's test it

In [21]:
# Test the model
best_model.eval()
accuracy_list = []
with torch.no_grad():
    test_losses = list()
    for X_batch, y_batch in tqdm(test_gen()):
        if X_batch.shape[0] != batch_size:
            continue
        hidden = best_model.init_hidden(X_batch.size(0))
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        hidden = hidden.to(device)

        loss = 0
        for c in range(X_batch.shape[1]):
            out, hidden = best_model(X_batch[:, c].reshape(X_batch.shape[0], 1).float(), hidden)
            l = criterion(out, y_batch.reshape(X_batch.shape[0], 1).float())
            loss += l

        test_losses.append(loss.detach().item() / X_batch.shape[1])
        accuracy_list.append(accuracy(out, y_batch.reshape(X_batch.shape[0], 1).float()))

    test_loss = torch.tensor(test_losses).mean()
    print(f'=> test loss: {test_loss}')
    print(f'=> test accuracy: {torch.tensor(accuracy_list).mean()}')

AttributeError: 'NoneType' object has no attribute 'eval'

## Evaluation (1 point)

* **\[1 point\]** Compute the accuracy for all 3 splits.
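
One way to cover all three splits is a single helper applied to each batch generator. The sketch below is only an illustration under stated assumptions: `split_accuracy` and `TinyRNN` are hypothetical names, `TinyRNN` is a stand-in built on `nn.RNNCell` whose `forward` takes a 1D tensor of token ids per step, and the three "splits" are random toy batches rather than IMDB data.

```python
import torch
from torch import nn


def split_accuracy(model, batches, device="cpu"):
    """Accuracy over an iterable of (X_batch, y_batch), counted per example."""
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for X_batch, y_batch in batches:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            hidden = model.init_hidden(X_batch.size(0)).to(device)
            for t in range(X_batch.size(1)):
                out, hidden = model(X_batch[:, t], hidden)
            preds = (out.squeeze(1) > 0).long()  # logit > 0 <=> sigmoid > 0.5
            correct += (preds == y_batch).sum().item()
            total += y_batch.numel()
    return correct / total if total else float("nan")


class TinyRNN(nn.Module):
    """Hypothetical stand-in with the same interface as the lab's RNN."""

    def __init__(self, vocab_size=10, embedding_size=4, hidden_size=8):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.cell = nn.RNNCell(embedding_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, 1)

    def forward(self, x, hidden):
        hidden = self.cell(self.embedding(x), hidden)
        return self.h2o(hidden), hidden

    def init_hidden(self, batch_size=1):
        return torch.zeros(batch_size, self.hidden_size)


torch.manual_seed(0)
# Toy stand-ins for the train/valid/test generators: one random batch each.
splits = {
    name: [(torch.randint(0, 10, (4, 5)), torch.randint(0, 2, (4,)))]
    for name in ("train", "valid", "test")
}
model = TinyRNN()
accuracies = {name: split_accuracy(model, batches) for name, batches in splits.items()}
print(accuracies)
```

Counting correct predictions over all examples, rather than averaging per-batch accuracies, also returns `nan` explicitly when a generator yields no batch.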

In [None]:
# Evaluation
best_model.eval()
with torch.no_grad():
    test_accuracies = list()
    for X_batch, y_batch in test_gen():
        if X_batch.shape[0] != batch_size:
            continue
        hidden = best_model.init_hidden(X_batch.size(0))
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        hidden = hidden.to(device)

        loss = 0
        for c in range(X_batch.shape[1]):
            out, hidden = best_model(X_batch[:, c].reshape(X_batch.shape[0], 1).float(), hidden)
            l = criterion(out, y_batch.reshape(X_batch.shape[0], 1).float())
            loss += l

        test_accuracies.append(accuracy(out, y_batch.reshape(X_batch.shape[0], 1).float()))

    test_accuracy = torch.tensor(test_accuracies).mean()
    print(f'=> test accuracy: {test_accuracy}')

=> test accuracy: nan
