# Coding your own RNN

Using this pre-filled notebook, we will code our own RNN for sentence classification. For now, we'll keep using IMDB, as the goal of this part is to understand how an RNN works.

Unlike our previous lab, we will also learn the embedding layer. Which means we need to deal with vocabulary by ourselves.

In [5]:
from functools import partial
from typing import Callable, Dict, Generator, List, Tuple
from collections import Counter, OrderedDict
from datasets import load_dataset
import numpy as np
from sklearn.utils import shuffle
import torch
from torch import nn
from torchtext.vocab import vocab, Vocab
from torchtext.data.utils import get_tokenizer

import matplotlib.pyplot as plt

from tqdm.auto import tqdm

## Dataset
We load the dataset and split the training set in a stratified train/validation set.

In [6]:
dataset = load_dataset("imdb")
train_dataset = dataset["train"].train_test_split(
    stratify_by_column="label", test_size=0.2, seed=42
)
test_df = dataset["test"]
train_df = train_dataset["train"]
valid_df = train_dataset["test"]
train_df.shape, valid_df.shape, test_df.shape

Found cached dataset imdb (C:/Users/antho/.cache/huggingface/datasets/imdb/plain_text/1.0.0/d613c88cf8fa3bab83b4ded3713f1f74830d1100e171db75bbddb80b3345c9c0)


  0%|          | 0/3 [00:00<?, ?it/s]

((20000, 2), (5000, 2), (25000, 2))

## Vocabulary (1 point)

**\[1 point\]** Build your own vocabulary. The [example provided in torchtext documentation](https://pytorch.org/text/stable/vocab.html#id1) might be of help.
* Don't forge to setup the `min_freq` parameter to not include unfrequent noise.
* You will need a tokenizer. Reuse the `basic_english` one from the our previous lab.
* For an RNN we need two special tokens: `<unk>`, for unknown words, and `<pad>` for padding.

In [7]:
unk_token = '<unk>'
pad_token = '<pad>'
tokenizer = get_tokenizer("basic_english", language="en")
corpus = train_df['text']
tokens = []
for text in corpus:
    tokens += tokenizer(text)
counter = Counter(tokens)
sorted_by_freq_tuples = sorted(counter.items(), key=lambda x: x[1], reverse=True)
ordered_dict = OrderedDict(sorted_by_freq_tuples)
vocabulary = vocab(ordered_dict, specials=[unk_token, pad_token], min_freq = 5)
vocabulary.set_default_index(vocabulary[unk_token])

## Vectorize and batch the input (3 points)

As seen in class, our model should take one-hot encoded vectors corresponding to the each token vocabulary id. However, computing a vector x matrix multiplication for every input is unnecessarily costly. Multiplying a one-hot vector with a matrix is the equivalent of taking one row of the matrix. In pyTorch, we provide ids for each token which will be used as input to an `nn.Embedding` layer. The id is simply the row in the embedding matrix.

**\[1 point\]** Fill the `vectorize_text` function returning a 1D torch tensor of `torch.long` for each input text.

In [8]:
def vectorize_text(
    text: str, vocabulary: Vocab, tokenizer: Callable[[str], List[str]]
) -> torch.Tensor:
    """
    Generate a tensor of vocabluary IDs for a given text.
    Args:
        text: the input text.
        vocabulary: a Vocab objects.
        tokenizer: a text tokenizer.
    Returns:
        A tensor of IDs (torch.long).
    """
    tokens = tokenizer(text)

    ids = [vocabulary[token] for token in tokens]

    tensor = torch.tensor(ids, dtype=torch.long)
    return tensor

In [9]:
text_pipeline = partial(vectorize_text, vocabulary=vocabulary, tokenizer=tokenizer)

Check the function is working correctly, especially it should return the right special id for unknown words.

In [10]:
text_pipeline("Some text I am thinking about... ragafqfa")

tensor([  56, 3160,   13,  244,  526,   50,    3,    3,    3,    0])

In [11]:
X_train = [text_pipeline(text) for text in tqdm(train_df["text"])]
y_train = train_df["label"]
X_valid = [text_pipeline(text) for text in tqdm(valid_df["text"])]
y_valid = valid_df["label"]
X_test = [text_pipeline(text) for text in tqdm(test_df["text"])]
y_test = test_df["label"]

  0%|          | 0/20000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/25000 [00:00<?, ?it/s]

To speed up the training process, we turn the inputs into batches, as we did last time. For batches to work, every line must have the same lengths. Last time, it was implicit as only a vector (the average of all embeddings) was provided. This time, every line has the length of a different review.

To go around this problem, we use padding. So every line within a batch is padded to the length of its longest element.

* **\[1 point\]** Fill the data generator function.
* **\[1 point\]** On which side should you pad and why?

In [12]:
def data_generator(
    X: List[torch.tensor], y: List[int], pad_id: int, batch_size: int = 32
) -> Generator[Tuple[torch.Tensor, torch.Tensor], None, None]:
    """
    Yield batches from given input data and labels.
    Args:
        X: a list of tensor (input features).
        y: the corresponding labels.
        batch_size: the size of every batch [32].
    Returns:
        A tuple of tensors (features, labels).
    """
    X, y = shuffle(X, y)
    n_batches = len(X) // batch_size
    if len(X) % batch_size != 0:
        n_batches += 1
    
    for i in range(n_batches):
        start_idx = i * batch_size
        end_idx = start_idx + batch_size
        batch_X = X[start_idx:end_idx]
        batch_y = y[start_idx:end_idx]

        # Padding the batch_X to the length of the longest element
        max_length = max([len(x) for x in batch_X])
        padded_X = []
        for x in batch_X:
            padding_length = max_length - len(x)
            padded_x = torch.nn.functional.pad(x, (0, padding_length), value=pad_id)
            padded_X.append(padded_x)
        
        yield torch.stack(padded_X), torch.tensor(batch_y)

Recurrent neural networks (RNN) read text from left to right. Padding on the right side ensures that the model's attention is focused on the actual content of the sequence rather than the padded tokens. Padding on the right also preserves the order of the tokens.

In [13]:
train_gen = lambda: data_generator(X_train, y_train, vocabulary[pad_token])
valid_gen = lambda: data_generator(X_valid, y_valid, vocabulary[pad_token])
test_gen = lambda: data_generator(X_test, y_test, vocabulary[pad_token])

## Classifier (3 points)

**\[3 points\]** Code your own RNN. Fill the `RNN` class correctly. Remember an RNN has 3 matrices and an embedding layer (see course slide 61).
* The embedding layer turns a one-hot vectors into dense vectors.
* The first matrix (W) connects the embedding to the hidden layer.
  * `embedding_size -> hidden_size`
* The second matrix (U) connect the previous hidden layer to the current one.
  * `hidden_size -> hidden_size`
* These to vectors are added and go through an activation function (e.g. $h_t = tanh(Wx_i+Uh_{t-1})$).
* The last matrix (V) connects the hidden layer to the hidden layer to the output.
  * `hidden_size -> 1`
* Donc forget to add an `init_hidden` function which initialize the first hidden layer to 0.

In [14]:
class RNN(nn.Module):
    """
    The RNN classifier.
    """

    def __init__(self, nb_classes:int,  embedding_size: int, hidden_size: int) -> None:
        """
        Args:
            embedding_size: the dimension of the input embeddings.
            hidden_size: the dimension of the hidden layer.
        """
        super(RNN, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=nb_classes, embedding_dim=embedding_size)
        self.hidden_size = hidden_size
        self.W = nn.Linear(embedding_size, hidden_size)
        self.U = nn.Linear(hidden_size, hidden_size)
        self.activation = nn.Tanh()
        self.V = nn.Linear(hidden_size, 1)

    def init_hidden(self, batch_size: int) -> torch.Tensor:
        """
        Initialize the first hidden layer to zeros.

        Args:
            batch_size: the size of the input batch.
        Returns:
            Initial hidden layer tensor.
        """
        return torch.zeros(batch_size, self.hidden_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: an input tensor.
        Returns:
            Logits.
        """
        embedded = self.embedding(x)
        batch_size = x.size(0)
        hidden = self.init_hidden(batch_size)

        for t in range(x.size(1)):
            input_t = embedded[:, t, :]
            Wx = self.W(input_t)
            Uh = self.U(hidden)
            hidden = self.activation(Wx + Uh)

        output = self.V(hidden)
        return output

## Training (2 points)

Training is a bit different than usual. We will need to sequentially (but in "batch parallel") go through an input, keeping track of the hidden layer, and use the last output as prediction.

**\[2 point\]** Code the training loop.
* Note that for each batch, you need to loop through the whole input and use the output of the last token as input to your criterion.
* Keep the best model evaluated on the validation set.
* Plot the training and validation losses.
* Training will take some time (~30 min on a T4 GPU). Make sure your results appear in the notebook.

In [30]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
device

<module 'torch.cuda' from 'E:\\Program Files (x86)\\anaconda3\\lib\\site-packages\\torch\\cuda\\__init__.py'>

In [31]:
n_embedding = 32
n_hidden = 64
model = RNN(len(vocabulary.get_itos()), n_embedding, n_hidden).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.RMSprop(model.parameters(), lr=0.001)

In [39]:
from tqdm import tqdm

n_samples = len(X_train)
batch_size = 32
n_epochs = 2  # Define the number of training epochs
n_iterations = n_samples // batch_size
best_loss = float('inf')
best_model = None
train_losses = []
valid_losses = []

model = model.to(device)  # Move the model to GPU
criterion = criterion.to(device)  # Move the criterion to GPU

for epoch in range(n_epochs):
    train_loss = 0.0
    model.train()

    with tqdm(total=n_iterations, desc=f"Epoch {epoch+1}/{n_epochs} - Training", leave=False) as pbar:
        for iteration in range(n_iterations):
            X_train_batch, y_train_batch = next(train_gen())

            optimizer.zero_grad()
            output = model(X_train_batch)
            loss = criterion(output[:, -1], y_train_batch.float())
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            pbar.update(1)

    avg_train_loss = total_loss / num_batches
    train_losses.append(avg_train_loss)

    model.eval()
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        with tqdm(valid_gen(), desc=f"Epoch {epoch+1}/{n_epochs} - Validation", leave=False) as pbar:
            for X_valid_batch, y_valid_batch in pbar:
                X_valid_batch = X_valid_batch.to(device)  # Move input data to GPU
                y_valid_batch = y_valid_batch.to(device)  # Move target labels to GPU

                hidden = model.init_hidden(X_valid_batch.size(0)).to(device)  # Initialize hidden state

                # Forward pass
                output = model(X_valid_batch)
                loss = criterion(output[:, -1], y_valid_batch.float())  # Use the last output as prediction

                total_loss += loss.item()
                num_batches += 1

                pbar.set_postfix({"Loss": total_loss / num_batches})

    avg_valid_loss = total_loss / num_batches
    valid_losses.append(avg_valid_loss)

    tqdm.write(f"Epoch {epoch+1}/{n_epochs} - Train Loss: {avg_train_loss:.4f} - Valid Loss: {avg_valid_loss:.4f}")

    if avg_valid_loss < best_loss:
        best_loss = avg_valid_loss
        best_model = model.state_dict()  # Save the model state dictionary

best_model = RNN(len(vocabulary.get_itos()), n_embedding, n_hidden).to(device)  # Create a new instance of the model


                                                                                                                       

Epoch 1/2 - Train Loss: 0.6929 - Valid Loss: 0.6932


                                                                                                                       

Epoch 2/2 - Train Loss: 0.6932 - Valid Loss: 0.6938




NameError: name 'best_model_dict' is not defined

We wanted to add more epochs but we weren't able to run it on GPU so we had to reduce the number of epochs.

## Evaluation (1 point)

* **\[1 point\]** Compute the accuracy for all 3 splits.

In [43]:
from sklearn.metrics import accuracy_score

def compute_accuracy(model: nn.Module, gen) -> float:
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for X_batch, y_batch in gen():
            output = model(X_batch)
            predictions = torch.round(torch.sigmoid(output))  # Apply threshold at 0.5 for binary classification
            correct += (predictions.squeeze() == y_batch).sum().item()
            total += y_batch.size(0)

    accuracy = correct / total
    return accuracy

train_accuracy = compute_accuracy(best_model, train_gen)
valid_accuracy = compute_accuracy(best_model, valid_gen)
test_accuracy = compute_accuracy(best_model, test_gen)

print(f"Train Accuracy: {train_accuracy:.2%}")
print(f"Valid Accuracy: {valid_accuracy:.2%}")
print(f"Test Accuracy: {test_accuracy:.2%}")

Train Accuracy: 50.19%
Valid Accuracy: 50.50%
Test Accuracy: 50.00%
