# Using PyTorch's implementation

In this second part, we use PyTorch's implementation of RNNs and LSTMs. Again, as we are focusing on understanding the model and the library, we will keep using the IMDB dataset. The good news is that training is much faster with PyTorch's implementations.

In [1]:
from typing import Callable, List, Tuple, Generator
from functools import partial
import numpy as np
from datasets import load_dataset
from sklearn.utils import shuffle
import torch
from torch import nn
from torchtext.vocab import Vocab, build_vocab_from_iterator
from torchtext.data.utils import get_tokenizer

from tqdm.auto import tqdm
import torch.nn.functional as F

## From dataset to batch inputs

You already know what to do here, it's the same as in the previous notebook.

In [2]:
# Load the IMDB dataset and carve a validation split out of its training split.
dataset = load_dataset("imdb")
# Hold out 20% of the training split, stratified on the "label" column; the
# seed is fixed so the same split is produced on every run.
train_dataset = dataset["train"].train_test_split(
    stratify_by_column="label", test_size=0.2, seed=42
)
test_df = dataset["test"]
train_df = train_dataset["train"]
valid_df = train_dataset["test"]  # held-out part of the training split, used as validation data
# Last expression of the cell: displays the shapes of the three splits.
train_df.shape, valid_df.shape, test_df.shape

Found cached dataset imdb (/Users/louis/.cache/huggingface/datasets/imdb/plain_text/1.0.0/d613c88cf8fa3bab83b4ded3713f1f74830d1100e171db75bbddb80b3345c9c0)


  0%|          | 0/3 [00:00<?, ?it/s]

Loading cached split indices for dataset at /Users/louis/.cache/huggingface/datasets/imdb/plain_text/1.0.0/d613c88cf8fa3bab83b4ded3713f1f74830d1100e171db75bbddb80b3345c9c0/cache-5f37fd0866e4f89f.arrow and /Users/louis/.cache/huggingface/datasets/imdb/plain_text/1.0.0/d613c88cf8fa3bab83b4ded3713f1f74830d1100e171db75bbddb80b3345c9c0/cache-dd5732a0e6ac784c.arrow


((20000, 2), (5000, 2), (25000, 2))

In [3]:
# Tokenizer used for every text in this notebook.
tokenizer = get_tokenizer("basic_english", language="en")
def yield_tokens(data_iter):
    """Yield the token list of the "text" field of every item in data_iter."""
    for doc in data_iter:
        yield tokenizer(doc["text"])
# The vocabulary is built from the training split only; "<unk>" and "<pad>"
# are registered as special tokens.
vocabulary = build_vocab_from_iterator(yield_tokens(train_df), min_freq=1, specials=["<unk>", "<pad>"])
# Tokens that are not in the vocabulary are mapped to the "<unk>" index.
vocabulary.set_default_index(vocabulary["<unk>"])

In [4]:
# Padding token; it is looked up in the vocabulary to get the padding ID, so
# it has to be one of the vocabulary's registered tokens.
pad_token = "<pad>"

In [5]:
def vectorize_text(
    text: str, vocabulary: "Vocab", tokenizer: Callable[[str], List[str]]
) -> torch.Tensor:
    """
    Generate a tensor of vocabulary IDs for a given text.
    Args:
        text: the input text.
        vocabulary: a Vocab object; its `forward` method maps a list of
            tokens to a list of IDs.
        tokenizer: a text tokenizer.
    Returns:
        A 1-D tensor of IDs (torch.long); it is empty when the text yields no
        tokens.
    """
    # The dtype is explicit because torch.tensor([]) would otherwise default
    # to float32, breaking the torch.long contract for token-less texts.
    return torch.tensor(vocabulary.forward(tokenizer(text)), dtype=torch.long)


In [6]:
# Bind the vocabulary and the tokenizer once, so that text_pipeline(text)
# only needs the raw string.
text_pipeline = partial(vectorize_text, vocabulary=vocabulary, tokenizer=tokenizer)

In [7]:
# Turn every text of each split into a tensor of vocabulary IDs (one tensor
# per text, lengths differ); labels are taken from the "label" column as is.
X_train = [text_pipeline(text) for text in tqdm(train_df["text"])]
y_train = train_df["label"]
X_valid = [text_pipeline(text) for text in tqdm(valid_df["text"])]
y_valid = valid_df["label"]
X_test = [text_pipeline(text) for text in tqdm(test_df["text"])]
y_test = test_df["label"]

  0%|          | 0/20000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/25000 [00:00<?, ?it/s]

In [8]:
def data_generator(
    X: List[torch.Tensor], y: List[int], pad_id: int, batch_size: int = 32
) -> Generator[Tuple[torch.Tensor, torch.Tensor], None, None]:
    """
    Yield shuffled, padded batches from given input data and labels.
    Args:
        X: a list of 1-D tensors (input features), possibly of different lengths.
        y: the corresponding labels.
        pad_id: the ID appended to the shorter sequences of a batch so that
            all of them reach the length of the longest one in that batch.
        batch_size: the size of every batch [32].
    Returns:
        A generator of tuples of tensors (features, labels). Features have
        shape (batch, longest sequence in the batch); the last batch may hold
        fewer than batch_size items.
    """
    # Local import: the file's import cell does not bring this helper in.
    from torch.nn.utils.rnn import pad_sequence

    # Data and labels are shuffled together, so pairs stay aligned and every
    # call (i.e. every epoch) sees a new order.
    X, y = shuffle(X, y)

    for start in range(0, len(X), batch_size):
        batch_X = X[start:start + batch_size]
        batch_y = y[start:start + batch_size]
        # pad_sequence right-pads every sequence to the batch maximum and
        # stacks them; batch_first=True gives the (batch, seq) layout.
        yield (
            pad_sequence(batch_X, batch_first=True, padding_value=pad_id),
            torch.tensor(batch_y),
        )

In [9]:
def train_gen():
    """Return a fresh batch generator over the training split."""
    return data_generator(X_train, y_train, vocabulary[pad_token])


def valid_gen():
    """Return a fresh batch generator over the validation split."""
    return data_generator(X_valid, y_valid, vocabulary[pad_token])


def test_gen():
    """Return a fresh batch generator over the test split."""
    return data_generator(X_test, y_test, vocabulary[pad_token])

## The classifier

The implementation below shows how to use the [RNN](https://pytorch.org/docs/stable/generated/torch.nn.RNN.html) implementation provided by PyTorch to code a simple RNN.

In [10]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [11]:
class RNN(nn.Module):
    """A simple RNN classifier with word embeddings.

    Token IDs are embedded, run through a (possibly multi-layer) nn.RNN, and
    the output of the last time step is projected to `n_outputs` values.
    """
    def __init__(self, vocab_size: int, embed_size: int, hidden_size: int, n_layers: int, n_outputs: int) -> None:
        """
        Args:
            vocab_size: vocabulary size.
            embed_size: embedding dimensions.
            hidden_size: hidden layer size.
            n_layers: the number of layers.
            n_outputs: the number of output classes.
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.n_outputs = n_outputs

        # The word embedding layer.
        self.embed = nn.Embedding(self.vocab_size, self.embed_size)
        # The RNN
        self.rnn = nn.RNN(
            input_size=self.embed_size,
            hidden_size=self.hidden_size,
            num_layers=self.n_layers,
            batch_first=True,  # Changes the order of dimension to put the batches first.
        )
        # A fully connected layer projecting the RNN's output to the output values used for classification.
        self.fc = nn.Linear(self.hidden_size, self.n_outputs)

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Function called when the model is called with data as input.
        Args:
            X: a tensor of token IDs of dimensions batch_size, sequence length.
        Returns:
            A tensor of dimensions batch_size, n_outputs.
        """
        # The initial hidden state is created on the input's device, so the
        # module does not depend on any module-level device variable.
        h0 = torch.zeros(self.n_layers, X.size(0), self.hidden_size, device=X.device)

        out = self.embed(X)
        # out contains the output layer of all words in the sequence.
        # First dim is batch, second the word in the sequence, third is the vector itself.
        # The second returned value is the last hidden state of every layer.
        # Only use it if you want to access the intermediate layer values of a
        # multilayer model.
        out, _ = self.rnn(out, h0)
        # Getting the last value only.
        # NOTE: when a batch is right-padded, this last step is a padding
        # position for the shorter sequences.
        out = out[:, -1, :]

        # Linear projection.
        out = self.fc(out)

        return out

## Training (1 point)

**\[1 point\]** Code the training function.
* Note that we are using a function, as we will use it on several models here.
* PyTorch's RNN implementation doesn't need to be looped over manually. As commented in the `forward` function above, `out` contains the output layer for all words in the sequence, and taking its last value is what we need.

In [12]:
def _batch_loss(
    model: nn.Module,
    criterion: Callable,
    data: torch.Tensor,
    labels: torch.Tensor,
    target_device: torch.device,
) -> torch.Tensor:
    """Return the criterion's loss for one (data, labels) batch on target_device."""
    data = data.to(target_device)
    labels = labels.to(target_device).float()
    output = model(data)
    # A single-logit head outputs (batch, 1) while the labels are (batch,);
    # BCEWithLogitsLoss rejects mismatched shapes. Only the trailing axis is
    # dropped, so a batch holding a single example keeps its batch dimension.
    if output.dim() == labels.dim() + 1 and output.size(-1) == 1:
        output = output.squeeze(-1)
    return criterion(output, labels)


def train(
    model: nn.Module,
    criterion: Callable,
    optimizer: torch.optim.Optimizer,
    n_epochs: int,
    train_gen: Callable,
    valid_gen: Callable,
) -> Tuple[nn.Module, List[float], List[float]]:
    """Train a model using mini-batch gradient descent.
    Args:
        model: a class inheriting from nn.Module.
        criterion: a loss criterion.
        optimizer: an optimizer (e.g. Adam, RMSprop, ...).
        n_epochs: the number of training epochs.
        train_gen: a callable returning an iterable of batches (data, labels).
        valid_gen: a callable returning an iterable of batches (data, labels).
    Returns:
        A tuple: [best_model (by validation loss), training losses, validation losses].
        best_model is a snapshot (deep copy) taken at the epoch with the lowest
        validation loss; `model` itself ends in its last-epoch state.
        best_model is None when no epoch improved on the initial infinite loss
        (e.g. n_epochs == 0).
    """
    # Local import: the file's import cell does not bring `copy` in.
    import copy

    # Batches are moved to wherever the model lives.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    train_losses, valid_losses = [], []
    best_loss = float('inf')
    best_model = None

    for _ in tqdm(range(n_epochs)):
        # Training pass.
        model.train()
        train_loss = 0.0
        num_train_batches = 0

        for data, labels in train_gen():
            optimizer.zero_grad()
            loss = _batch_loss(model, criterion, data, labels, target_device)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            num_train_batches += 1

        train_loss /= num_train_batches
        train_losses.append(train_loss)

        # Validation pass (no gradient tracking, layers in evaluation mode).
        model.eval()
        valid_loss = 0.0
        num_valid_batches = 0

        with torch.no_grad():
            for data, labels in valid_gen():
                loss = _batch_loss(model, criterion, data, labels, target_device)
                valid_loss += loss.item()
                num_valid_batches += 1

        valid_loss /= num_valid_batches
        valid_losses.append(valid_loss)

        # Keep the best model according to the validation loss. A deep copy is
        # required: keeping a plain reference would just alias `model`, whose
        # weights keep changing during the following epochs.
        if valid_loss < best_loss:
            best_loss = valid_loss
            best_model = copy.deepcopy(model)

    return best_model, train_losses, valid_losses

In [13]:
# Hyper-parameters and training objects for the RNN classifier.
n_embedding = 32
n_hidden = 64
# The model outputs raw logits, hence the "with logits" variant of the loss.
criterion = nn.BCEWithLogitsLoss()
# Positional arguments: vocab size, embedding size, hidden size, 2 layers, 1 output.
rnn_model = RNN(len(vocabulary.get_itos()), n_embedding, n_hidden, 2, 1).to(device)
optimizer = torch.optim.RMSprop(rnn_model.parameters(), lr=0.001)

In [None]:
# Train the RNN for 15 epochs. The returned tuple is not captured here;
# rnn_model itself is updated through the optimizer.
train(rnn_model, criterion, optimizer, 15, train_gen, valid_gen)

*  Add an accuracy function and report the accuracy of the training, validation, and test set.

In [16]:
def accuracy(model: nn.Module, data_gen: Callable) -> float:
    """Compute the accuracy of the model.
    Args:
        model: the model to evaluate. Its output is read as one logit per
            example when the last dimension is 1 (or the output is 1-D), and
            as per-class scores otherwise.
        data_gen: a callable returning an iterable of batches (data, labels).
    Returns:
        The fraction of examples of the given data whose predicted class
        equals the label; 0.0 when the generator yields no example.
    """
    model.eval()
    # Batches are moved to wherever the model lives (left untouched for a
    # parameter-less model).
    first_param = next(model.parameters(), None)
    total = 0
    correct = 0

    with torch.no_grad():
        for data, labels in data_gen():
            if first_param is not None:
                data = data.to(first_param.device)
                labels = labels.to(first_param.device)
            scores = model(data)
            if scores.dim() == 1 or scores.size(-1) == 1:
                # Single logit: a positive logit means sigmoid > 0.5, i.e.
                # class 1. An argmax over one column would always return 0.
                predicted = (scores.reshape(-1) > 0).long()
            else:
                predicted = scores.argmax(dim=-1)
            total += labels.size(0)
            correct += (predicted == labels.reshape(-1).long()).sum().item()

    return correct / total if total else 0.0


In [None]:
# Report the RNN's accuracy on each of the three splits.
print('Accuracy on the training set: ', accuracy(rnn_model, train_gen))
print('Accuracy on the validation set: ', accuracy(rnn_model, valid_gen))
print('Accuracy on the test set: ', accuracy(rnn_model, test_gen))

*  Create an LSTM class which uses an LSTM instead of an RNN.

In [17]:
class LSTM(nn.Module):
    """A simple LSTM classifier with word embeddings.

    Token IDs are embedded, run through a (possibly multi-layer) nn.LSTM, and
    the output of the last time step is projected to `n_outputs` values.
    """
    def __init__(self, vocab_size: int, embed_size: int, hidden_size: int, n_layers: int, n_outputs: int) -> None:
        """
        Args:
            vocab_size: vocabulary size.
            embed_size: embedding dimensions.
            hidden_size: hidden layer size.
            n_layers: the number of layers.
            n_outputs: the number of output classes.
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.n_outputs = n_outputs

        # The word embedding layer.
        self.embed = nn.Embedding(self.vocab_size, self.embed_size)
        # The LSTM
        self.lstm = nn.LSTM(
            input_size=self.embed_size,
            hidden_size=self.hidden_size,
            num_layers=self.n_layers,
            batch_first=True,  # Changes the order of dimension to put the batches first.
        )
        # A fully connected layer to project the LSTM's output to the desired number of output classes.
        self.fc = nn.Linear(self.hidden_size, self.n_outputs)

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Function called when the model is called with data as input.
        Args:
            X: a tensor of token IDs of dimensions batch_size, sequence length.
        Returns:
            A tensor of dimensions batch_size, n_outputs.
        """
        # Initial hidden and cell states are created on the input's device, so
        # the module does not depend on any module-level device variable.
        state_shape = (self.n_layers, X.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=X.device)
        c0 = torch.zeros(state_shape, device=X.device)

        out = self.embed(X)
        # out holds the top layer's output for every position of the sequence
        # (batch, sequence, hidden); the final (h, c) states are not needed.
        out, _ = self.lstm(out, (h0, c0))
        # Keep the last time step only.
        # NOTE: when a batch is right-padded, this last step is a padding
        # position for the shorter sequences.
        out = out[:, -1, :]

        # Linear projection.
        out = self.fc(out)

        return out


In [18]:
# Hyper-parameters and training objects for the LSTM classifier.
n_embedding = 32
n_hidden = 64
n_layers = 2
# Positional arguments: vocab size, embedding size, hidden size, layers, 1 output.
lstm_model = LSTM(len(vocabulary.get_itos()), n_embedding, n_hidden, n_layers, 1).to(device)
# The model outputs raw logits, hence the "with logits" variant of the loss.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.RMSprop(lstm_model.parameters(), lr=0.001)

In [None]:
# Train the LSTM for 15 epochs. The returned tuple is not captured here;
# lstm_model itself is updated through the optimizer.
train(lstm_model, criterion, optimizer, 15, train_gen, valid_gen)

In [None]:
# Report the LSTM's accuracy on each of the three splits.
print('Accuracy on the training set: ', accuracy(lstm_model, train_gen))
print('Accuracy on the validation set: ', accuracy(lstm_model, valid_gen))
print('Accuracy on the test set: ', accuracy(lstm_model, test_gen))

Comparing RNN and LSTM models

# TODO

* Implement a function which takes any text and returns the model's prediction.
    * The function should have a string as input and return a class (0 or 1) and its confidence (between 0 and 1).

Implement a function which takes any text and returns the model's prediction.

In [19]:
def lstm_predict(
    text: str,
    *,
    model: "nn.Module | None" = None,
    pipeline: "Callable[[str], torch.Tensor] | None" = None,
) -> Tuple[int, float]:
    """Classify a raw text with a single-logit binary classifier.
    Args:
        text: the text to classify.
        model: the classifier to use; defaults to the notebook's trained
            `lstm_model`.
        pipeline: a callable turning a text into a 1-D tensor of token IDs;
            defaults to the notebook's `text_pipeline`.
    Returns:
        A tuple (predicted class, confidence): the class is 0 or 1 and the
        confidence is the probability the model assigns to that class
        (between 0.5 and 1).
    Raises:
        ValueError: if the text produces no token.
    """
    if model is None:
        model = lstm_model
    if pipeline is None:
        pipeline = text_pipeline

    # The raw string has to be tokenised and mapped to vocabulary IDs first.
    token_ids = pipeline(text)
    if token_ids.numel() == 0:
        raise ValueError("Cannot classify a text that produces no tokens.")

    # Put the input wherever the model lives.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        token_ids = token_ids.to(first_param.device)

    model.eval()
    with torch.no_grad():
        # unsqueeze(0) adds the batch dimension: one sequence per call.
        logit = model(token_ids.unsqueeze(0)).reshape(-1)[0]
        # With a single output logit, the sigmoid gives P(class == 1); a
        # softmax over that single value would always return 1.0.
        positive_prob = torch.sigmoid(logit).item()

    predicted_class = int(positive_prob >= 0.5)
    confidence = positive_prob if predicted_class == 1 else 1.0 - positive_prob
    return predicted_class, confidence


* With your best classifier, look at two wrongly classified examples on the test set. Try explaining why the model was wrong.