# Text Classification

## RNN & LSTM

### HW2

**Full Name:Seyyedeh Zahra Fallah Mir Mousavi **

**SID: 401207192**

## Homework Overview
In this homework, you will learn to implement, train, and evaluate Recurrent Neural Networks (RNNs) and Long Short-Term Memory (LSTM) models on a text classification task using a dataset of IMDB movie reviews, and compare them.

**NOTE : Be sure to answer the analytical questions at the end of the notebook as well.**

In [110]:
import nltk
nltk.download('stopwords')
import random
import re
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk import wordpunct_tokenize
from tqdm import tqdm
from sklearn.metrics import classification_report,accuracy_score
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import DataLoader
from IPython.core.display import display, HTML
from google_drive_downloader import GoogleDriveDownloader as gdd
tqdm.pandas()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


device(type='cuda')

# Dataset

In this section, we’ll load the IMDB dataset and preprocess the data to make it suitable for training RNN and LSTM models.

## Load Dataset
Description of Dataset: The IMDB movie reviews dataset consists of reviews along with their labels (positive or negative sentiment). Each review is a sentence or paragraph of text.

Download the Dataset: We will use a Google Drive link to download the dataset into our environment.

In [111]:
DATA_PATH = 'data/imdb_reviews.csv'
gdd.download_file_from_google_drive(file_id='1zfM5E6HvKIe7f3rEt1V2gBpw5QOSSKQz',dest_path=DATA_PATH,)

## Preprocessing

For our models to work effectively, we need to preprocess the text data by cleaning it and converting words to integer indices for training.Preproces steps
such as Tokenization and Cleaning , Replacing Rare Words , Build Vocabulary , Convert Tokens to Indices and Prepare Data for Training.

**NOTE : Do not alter the structure of this preprocessing code, as it aligns with other parts of the notebook.However, minor adjustments for compatibility with your code are allowed if needed.**

In [112]:
def tokenize(text, stop_words):
    text = re.sub(r'[^\w\s]', '', text)
    text = text.lower()
    tokens = wordpunct_tokenize(text)
    tokens = [token for token in tokens if token not in stop_words]
    return tokens

In [113]:
def remove_rare_words(tokens, common_tokens, max_len):
    return [token if token in common_tokens
            else '<UNK>' for token in tokens][-max_len:]

In [114]:
def load_and_preprocess_data(data_path, max_vocab, max_len):
    df = pd.read_csv(data_path)
    stop_words = set(stopwords.words('english'))

    # Clean and tokenize
    df['tokens'] = df['review'].apply(lambda x: tokenize(x, stop_words))

    # Replace rare words with <UNK>
    all_tokens = [token for tokens in df['tokens'] for token in tokens]
    common_tokens = set(list(zip(*Counter(all_tokens).most_common(max_vocab)))[0])
    df['tokens'] = df['tokens'].apply(lambda x: remove_rare_words(x, common_tokens, max_len))

    # Remove sequences with only <UNK>
    df = df[df['tokens'].apply(lambda tokens: any(token != '<UNK>' for token in tokens))]

    # Build vocab
    vocab = sorted(set([token for tokens in df['tokens'] for token in tokens]))
    token2idx = {token: idx for idx, token in enumerate(vocab)}
    token2idx['<PAD>'] = len(token2idx)

    # Index tokens
    df['indexed_tokens'] = df['tokens'].apply(lambda tokens: [token2idx[token] for token in tokens])

    return df['indexed_tokens'].tolist(), df['label'].tolist(), token2idx

In [115]:
# How many of the most common vocab words to keep
# Uncommon words get replaced with unknown token <UNK>
max_vocab = 2500

# How many tokens long each sequence will be cut to
# Shorter sequences will get the padding token <PAD>
max_len = 100

sequences, targets, token2idx = load_and_preprocess_data(DATA_PATH, max_vocab, max_len)


In [116]:
def split_data(sequences, targets, valid_ratio=0.05, test_ratio=0.05):
    total_size = len(sequences)
    test_size = int(total_size * test_ratio)
    valid_size = int(total_size * valid_ratio)
    train_size = total_size - valid_size - test_size

    train_sequences, train_targets = sequences[:train_size], targets[:train_size]
    valid_sequences, valid_targets = sequences[train_size:train_size + valid_size], targets[train_size:train_size + valid_size]
    test_sequences, test_targets = sequences[train_size + valid_size:], targets[train_size + valid_size:]

    return train_sequences, train_targets, valid_sequences, valid_targets, test_sequences, test_targets

In [117]:
train_sequences, train_targets, valid_sequences, valid_targets, test_sequences, test_targets = split_data(sequences, targets)

In [118]:
def collate(batch):
    inputs, targets = zip(*batch)
    inputs_padded = pad_sequences(inputs, padding_val=token2idx['<PAD>'])
    return torch.LongTensor(inputs_padded), torch.LongTensor(targets)

In [119]:
def pad_sequences(sequences, padding_val=0, pad_left=False):
    """Pad a list of sequences to the same length with a padding_val."""
    sequence_length = max(len(sequence) for sequence in sequences)
    if not pad_left:
        return [sequence + [padding_val] * (sequence_length - len(sequence)) for sequence in sequences]
    return [[padding_val] * (sequence_length - len(sequence)) + sequence for sequence in sequences]

In [120]:
batch_size = 256
train_data = list(zip(train_sequences, train_targets))
valid_data = list(zip(valid_sequences, valid_targets))
test_data = list(zip(test_sequences, test_targets))

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collate)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False, collate_fn=collate)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate)

# RNN

## RNN with nn.RNN
Implement a basic RNN model using PyTorch's built-in nn.RNN.
Define layers: embedding, RNN, and fully connected.

In [121]:
class RNNClassifier(nn.Module):
    def __init__(self, output_size, hidden_size, vocab_size,
                 device, n_layers=1,
                 embedding_dimension=50):
        super(RNNClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.device = device
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dimension, padding_idx=token2idx['<PAD>'])

        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """Define Needed Layers """
        #Your Code Here
         # Recurrent layer
        self.rnn = nn.LSTM(embedding_dimension, hidden_size, n_layers, batch_first=True)

        # Fully connected layer
        self.fc = nn.Linear(hidden_size, output_size)

        #################################################################################
        #                                   THE END                                     #
        #################################################################################

    def forward(self, inputs):
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """
        Implements the forward pass: first, embed the input tokens, then pass
        the embeddings through the RNN layer to capture sequential dependencies.
        Finally, use fully connected layers to output class probabilities.
        """
        #Your Code Here
        # Embedding the input
        embedded = self.embedding(inputs)

        # Passing the embeddings through the RNN layer
        rnn_out, _ = self.rnn(embedded)

        # Using the output of the last time step for classification
        out = self.fc(rnn_out[:, -1, :])  # Get the last time step output

        #################################################################################
        #                                   THE END                                     #
        #################################################################################
        return out #probabilities for each class in the output.

### Train model

In this section, you should train model for multiple epochs on the training data and evaluate it on the validation data after each epoch, reporting the model's accuracy. Ensure that the model is set to train mode during training and switched to eval mode for evaluation on the validation data. The objective is to implement the training loop and, at the next , compute and report the final accuracy on the test data.

**Note**: You are not allowed to use library-built trainer functions in this section; the training loop should be implemented manually.

**Note**: To implement the training loop, you have the option to create a single train_model function that trains a model over multiple epochs, calculates training and validation accuracy, and logs the losses. Once written, this function can be reused for all RNN and LSTM models, allowing you to simply call it with different model instances for training. Reusing the function in this way will ensure that you receive credit for the training section of each subsequent model without needing to write separate loops , with just the correct function call.








In [122]:
model = RNNClassifier(output_size=2, hidden_size=128, vocab_size=len(token2idx), device=device, n_layers=1, embedding_dimension=50)
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [123]:
############################# TODO #############################
# TODO: Implement the training loop
################################################################
def train(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        # Training phase
        train_loss = 0.0
        correct_train = 0
        total_train = 0

        for batch in train_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)

            # Compute loss
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            # Accumulate loss and accuracy
            train_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_train += (predicted == labels).sum().item()
            total_train += labels.size(0)

        # Validation phase
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0

        with torch.no_grad():
            for batch in val_loader:
                inputs, labels = batch
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward pass
                outputs = model(inputs)

                # Compute loss
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)

                # Calculate accuracy
                _, predicted = torch.max(outputs, 1)
                correct_val += (predicted == labels).sum().item()
                total_val += labels.size(0)

        # Calculate average losses and accuracies
        avg_train_loss = train_loss / len(train_loader.dataset)
        avg_val_loss = val_loss / len(val_loader.dataset)
        train_accuracy = correct_train / total_train * 100
        val_accuracy = correct_val / total_val * 100

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")
        print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")
        print('-' * 50)

        # Switch back to train mode
        model.train()

train(model, train_loader, valid_loader, criterion, optimizer, device, num_epochs=10)






Epoch 1/10
Train Loss: 0.6721, Train Accuracy: 56.62%
Validation Loss: 0.6501, Validation Accuracy: 57.97%
--------------------------------------------------
Epoch 2/10
Train Loss: 0.5076, Train Accuracy: 75.77%
Validation Loss: 0.4380, Validation Accuracy: 79.95%
--------------------------------------------------
Epoch 3/10
Train Loss: 0.3889, Train Accuracy: 82.99%
Validation Loss: 0.3619, Validation Accuracy: 84.36%
--------------------------------------------------
Epoch 4/10
Train Loss: 0.3366, Train Accuracy: 85.73%
Validation Loss: 0.3609, Validation Accuracy: 84.62%
--------------------------------------------------
Epoch 5/10
Train Loss: 0.3107, Train Accuracy: 86.93%
Validation Loss: 0.3147, Validation Accuracy: 86.42%
--------------------------------------------------
Epoch 6/10
Train Loss: 0.2836, Train Accuracy: 88.16%
Validation Loss: 0.2869, Validation Accuracy: 88.54%
--------------------------------------------------
Epoch 7/10
Train Loss: 0.2691, Train Accuracy: 88.83

## RNN from Scratch
Implement an RNN from scratch by creating a custom RNN cell and a model that stacks these cells over time.

In [124]:
class CustomRNNCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(CustomRNNCell, self).__init__()
        self.hidden_size = hidden_size
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """Define Input-to-Hidden and Hidden-to-Hidden Layers"""
        #Your Code Here
        # Input-to-hidden transformation
        self.i2h = nn.Linear(input_size, hidden_size)
        # Hidden-to-hidden transformation
        self.h2h = nn.Linear(hidden_size, hidden_size)
        #################################################################################
        #                                   THE END                                     #
        #################################################################################

    def forward(self, input, hidden):
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """
        Implements the forward pass: combines the input and previous hidden state
        to calculate the new hidden state for this RNN cell.
        """
        #Your Code Here
        combined = self.i2h(input) + self.h2h(hidden)
        hidden = torch.tanh(combined)

        #################################################################################
        #                                   THE END                                     #
        #################################################################################
        return hidden


In [125]:
class CustomRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        super(CustomRNN, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=token2idx['<PAD>'])
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """Define Custom RNN Cell and Fully Connected Layers"""
        #Your Code Here
        # Initialize your custom RNN cell
        self.rnn_cell = CustomRNNCell(embedding_dim, hidden_size)

        # Define a fully connected layer to map hidden state to output classes
        self.fc = nn.Linear(hidden_size, output_size)

        #################################################################################
        #                                   THE END                                     #
        #################################################################################


    def forward(self, inputs):
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """
        Implements the forward pass: performs embedding lookup, iterates through each
        time step, and passes embeddings through the custom RNN cell. Finally,
        applies the fully connected layers to output class probabilities.
        """
        #Your Code Here
        # Embed input words
        embeddings = self.embedding(inputs)

        # Initialize hidden state (batch_size, hidden_size)
        batch_size = inputs.size(0)
        hidden = torch.zeros(batch_size, self.hidden_size, device=embeddings.device)

        # Iterate over time steps
        for t in range(inputs.size(1)):
            input_t = embeddings[:, t, :]
            hidden = self.rnn_cell(input_t, hidden)

        # Use the hidden state from the last time step; apply fully connected layer to output class probabilities
        out = self.fc(hidden)


        #################################################################################
        #                                   THE END                                     #
        #################################################################################
        return out #probabilities for each class in the output.


### Train model

In this section, you should train model for multiple epochs on the training data and evaluate it on the validation data after each epoch, reporting the model's accuracy. Ensure that the model is set to train mode during training and switched to eval mode for evaluation on the validation data. The objective is to implement the training loop and, at the next , compute and report the final accuracy on the test data.

In [126]:
model = CustomRNN(len(token2idx), 50, 128, 2)
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [127]:
############################# TODO #############################
# TODO: Implement the training loop
################################################################
num_epochs = 10  # Number of epochs

for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_loss = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Compute the loss
        loss = criterion(outputs, labels)

        # Backward pass
        loss.backward()

        # Optimization step
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}")

    ################## Optional: Validation Phase ##################
    model.eval()  # Set the model to evaluation mode
    val_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    avg_val_loss = val_loss / len(valid_loader)
    accuracy = 100 * correct / total
    print(f"Validation Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.2f}%")



Epoch [1/10], Loss: 0.6890
Validation Loss: 0.6736, Accuracy: 58.74%
Epoch [2/10], Loss: 0.6787
Validation Loss: 0.6783, Accuracy: 55.75%
Epoch [3/10], Loss: 0.6660
Validation Loss: 0.6773, Accuracy: 51.95%
Epoch [4/10], Loss: 0.6574
Validation Loss: 0.6777, Accuracy: 52.53%
Epoch [5/10], Loss: 0.6449
Validation Loss: 0.6234, Accuracy: 62.76%
Epoch [6/10], Loss: 0.6222
Validation Loss: 0.5593, Accuracy: 73.19%
Epoch [7/10], Loss: 0.6139
Validation Loss: 0.6762, Accuracy: 53.46%
Epoch [8/10], Loss: 0.6565
Validation Loss: 0.6367, Accuracy: 58.58%
Epoch [9/10], Loss: 0.5905
Validation Loss: 0.5237, Accuracy: 76.50%
Epoch [10/10], Loss: 0.5869
Validation Loss: 0.6600, Accuracy: 54.49%


### evaluate RNN models on test set
To complete evaluate_on_test, loop through the test data to get predictions, calculate accuracy, and print a classification report for model evaluation. This function can be used to evaluate the performance LSTM models too.

**NOTE : to earn full marks for this section, you must adjust the network's hyperparameters so that each rnn models achieves at least 70% accuracy on the test data. If you achieve less than the required accuracy, consider adjusting your training loop and hyperparameters, such as the hidden state size and learning rate, to improve model performance.**

In [128]:
# Evaluate on test data
def evaluate_on_test(model, test_loader):
    model.eval()
    ############################# TODO #############################
    # TODO: Iterate over the test_loader, obtain model predictions,
    # calculate accuracy, and generate a classification report.
    ################################################################
    y_true_test = []
    y_pred_test = []

    with torch.no_grad():  # Disable gradient computation for inference
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass to get predictions
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)

            # Append true labels and predictions for evaluation
            y_true_test.extend(labels.cpu().numpy())
            y_pred_test.extend(predicted.cpu().numpy())

    # Calculate accuracy
    accuracy = accuracy_score(y_true_test, y_pred_test)
    print(f"Accuracy: {accuracy * 100:.2f}%")

    # Generate classification report
    print("Classification Report:")

    print(classification_report(y_true_test, y_pred_test))

In [129]:
# Evaluate both RNN models on the test dataset
# Evaluate first RNN model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
rnn_model_1 = RNNClassifier(output_size=2, hidden_size=128, vocab_size=len(token2idx), device=device, n_layers=1, embedding_dimension=50)
rnn_model_1.to(device)
rnn_model_2 =  CustomRNN(len(token2idx), 50, 128, 2)
rnn_model_2.to(device)
print("Evaluating RNN Model 1:")
evaluate_on_test(rnn_model_1, test_loader)

# Evaluate second RNN model
print("Evaluating RNN Model 2:")
evaluate_on_test(rnn_model_2, test_loader)

Evaluating RNN Model 1:
Accuracy: 48.57%
Classification Report:
              precision    recall  f1-score   support

           0       0.51      0.24      0.33      1619
           1       0.48      0.75      0.58      1488

    accuracy                           0.49      3107
   macro avg       0.50      0.50      0.46      3107
weighted avg       0.50      0.49      0.45      3107

Evaluating RNN Model 2:
Accuracy: 51.92%
Classification Report:
              precision    recall  f1-score   support

           0       0.53      0.70      0.60      1619
           1       0.50      0.32      0.39      1488

    accuracy                           0.52      3107
   macro avg       0.51      0.51      0.50      3107
weighted avg       0.51      0.52      0.50      3107



# LSTM

## LSTM with nn.LSTM
Define an LSTM model using PyTorch's built-in nn.LSTM.

In [130]:
class LSTMClassifier(nn.Module):
    def __init__(self, output_size, hidden_size, vocab_size,
                 device, bidirectional=False, n_layers=1,
                 embedding_dimension=50):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.device = device
        self.num_directions = 2 if bidirectional else 1

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dimension, padding_idx = token2idx['<PAD>'])

        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """Define the LSTM layer and fully connected layers"""
        #Your Code Here: Initialize an nn.LSTM layer and any required fully connected layers.
        # Define the LSTM layer
        self.lstm = nn.LSTM(
            input_size=embedding_dimension,
            hidden_size=hidden_size,
            num_layers=n_layers,
            bidirectional=bidirectional,
            batch_first=True
        )

        # Define the fully connected layer
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)


        #################################################################################
        #                                   THE END                                     #
        #################################################################################

    def forward(self, inputs):
        # Initialize hidden state and cell state with zeros
        hidden = torch.zeros(self.n_layers * self.num_directions, inputs.size(0), self.hidden_size).to(inputs.device)
        cell_state = torch.zeros(self.n_layers * self.num_directions, inputs.size(0), self.hidden_size).to(inputs.device)

        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """
        Implements the forward pass: first, embed the input tokens, then pass
        the embeddings through the LSTM layer to capture sequential dependencies.
        Finally, use fully connected layers to output class probabilities.
        """
        #Your Code Here
        # Embed the input tokens
        embeddings = self.embedding(inputs)

        # Pass through LSTM
        lstm_out, (hidden, cell_state) = self.lstm(embeddings, (hidden, cell_state))

        # For classification, we often use the last hidden state of the sequence
        # Depending on bidirectional, concatenate the final hidden states from both directions
        if self.num_directions == 2:  # Bidirectional
            last_hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)  # Concatenate the last layers
        else:  # Unidirectional
            last_hidden = hidden[-1]

        out = self.fc(last_hidden)
        #################################################################################
        #                                   THE END                                     #
        #################################################################################

        return out  # probabilities for each class in the output.

### Train model

In this section, you should train model for multiple epochs on the training data and evaluate it on the validation data after each epoch, reporting the model's accuracy. Ensure that the model is set to train mode during training and switched to eval mode for evaluation on the validation data. The objective is to implement the training loop and, at the next , compute and report the final accuracy on the test data.

**Note**: You are not allowed to use library-built trainer functions in this section; the training loop should be implemented manually.

**Note**: To implement the training loop, you have the option to create a single train_model function that trains a model over multiple epochs, calculates training and validation accuracy, and logs the losses. Once written, this function can be reused for all RNN and LSTM models, allowing you to simply call it with different model instances for training. Reusing the function in this way will ensure that you receive credit for the training section of each subsequent model without needing to write separate loops , with just the correct function call.

In [131]:
model = LSTMClassifier(
    output_size=3, hidden_size=128, vocab_size=len(token2idx), device=device,
    bidirectional=True, n_layers=2, embedding_dimension=50
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [132]:
############################# TODO #############################
# TODO: Implement the training loop
################################################################
def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=10):
    train_losses = []
    val_accuracies = []

    for epoch in range(num_epochs):
        # Training phase
        model.train()  # Set model to training mode
        total_loss = 0
        for inputs, targets in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()  # Clear gradients
            outputs = model(inputs)  # Forward pass
            loss = criterion(outputs, targets)  # Compute loss
            loss.backward()  # Backward pass
            optimizer.step()  # Update weights

            total_loss += loss.item()

        avg_train_loss = total_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Validation phase
        model.eval()  # Set model to evaluation mode
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, targets in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)  # Forward pass
                _, predicted = torch.max(outputs, dim=1)
                correct += (predicted == targets).sum().item()
                total += targets.size(0)

        val_accuracy = correct / total
        val_accuracies.append(val_accuracy)

        # Log progress
        print(f"Epoch {epoch+1}/{num_epochs}:")
        print(f"    Training Loss: {avg_train_loss:.4f}")
        print(f"    Validation Accuracy: {val_accuracy:.4f}")

    return model, train_losses, val_accuracies

# Model instantiation
output_size = 3  # Example: number of output classes
hidden_size = 128
vocab_size = len(token2idx)  # Adjust based on your tokenizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
embedding_dimension = 50
n_layers = 2
bidirectional = True

lstm = LSTMClassifier(
    output_size=output_size,
    hidden_size=hidden_size,
    vocab_size=vocab_size,
    device=device,
    bidirectional=bidirectional,
    n_layers=n_layers,
    embedding_dimension=embedding_dimension
).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


# Training the model
trained_model, train_losses, val_accuracies = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=valid_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    num_epochs=10
)

Epoch 1/10 - Training: 100%|██████████| 219/219 [00:10<00:00, 19.91it/s]
Epoch 1/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 53.04it/s]


Epoch 1/10:
    Training Loss: 0.6475
    Validation Accuracy: 0.7621


Epoch 2/10 - Training: 100%|██████████| 219/219 [00:11<00:00, 19.59it/s]
Epoch 2/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 51.80it/s]


Epoch 2/10:
    Training Loss: 0.4254
    Validation Accuracy: 0.8301


Epoch 3/10 - Training: 100%|██████████| 219/219 [00:11<00:00, 19.60it/s]
Epoch 3/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 52.57it/s]


Epoch 3/10:
    Training Loss: 0.3471
    Validation Accuracy: 0.8587


Epoch 4/10 - Training: 100%|██████████| 219/219 [00:11<00:00, 19.65it/s]
Epoch 4/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 53.79it/s]


Epoch 4/10:
    Training Loss: 0.3128
    Validation Accuracy: 0.8725


Epoch 5/10 - Training: 100%|██████████| 219/219 [00:11<00:00, 19.78it/s]
Epoch 5/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 52.53it/s]


Epoch 5/10:
    Training Loss: 0.2821
    Validation Accuracy: 0.8848


Epoch 6/10 - Training: 100%|██████████| 219/219 [00:10<00:00, 19.99it/s]
Epoch 6/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 49.45it/s]


Epoch 6/10:
    Training Loss: 0.2645
    Validation Accuracy: 0.8954


Epoch 7/10 - Training: 100%|██████████| 219/219 [00:10<00:00, 19.95it/s]
Epoch 7/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 51.81it/s]


Epoch 7/10:
    Training Loss: 0.2421
    Validation Accuracy: 0.9044


Epoch 8/10 - Training: 100%|██████████| 219/219 [00:11<00:00, 19.83it/s]
Epoch 8/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 51.74it/s]


Epoch 8/10:
    Training Loss: 0.2209
    Validation Accuracy: 0.9121


Epoch 9/10 - Training: 100%|██████████| 219/219 [00:11<00:00, 19.80it/s]
Epoch 9/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 53.84it/s]


Epoch 9/10:
    Training Loss: 0.2009
    Validation Accuracy: 0.9260


Epoch 10/10 - Training: 100%|██████████| 219/219 [00:11<00:00, 19.73it/s]
Epoch 10/10 - Validation: 100%|██████████| 13/13 [00:00<00:00, 51.75it/s]

Epoch 10/10:
    Training Loss: 0.1761
    Validation Accuracy: 0.9424





## Custom LSTM from Scratch
Implement an LSTM from scratch by defining a LSTM cell and a model that combines these cells over the sequence.

In [133]:
class CustomLSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(CustomLSTMCell, self).__init__()
        self.hidden_size = hidden_size
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """Define Needed Layers """
        #Your Code Here
        # Input gate
        self.input_gate = nn.Linear(input_size + hidden_size, hidden_size)
        # Forget gate
        self.forget_gate = nn.Linear(input_size + hidden_size, hidden_size)
        # Output gate
        self.output_gate = nn.Linear(input_size + hidden_size, hidden_size)
        # Cell candidate
        self.cell_candidate = nn.Linear(input_size + hidden_size, hidden_size)

        #################################################################################
        #                                   THE END                                     #
        #################################################################################

    def forward(self, input, hidden, cell_state):
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """Define Forward pass"""
        #Your Code Here

        combined = torch.cat((input, hidden), dim=1)

        # Calculate gates
        i_gate = torch.sigmoid(self.input_gate(combined))  # Input gate
        f_gate = torch.sigmoid(self.forget_gate(combined))  # Forget gate
        o_gate = torch.sigmoid(self.output_gate(combined))  # Output gate
        g_gate = torch.tanh(self.cell_candidate(combined))  # Cell candidate

        # Update cell state
        cell_state = f_gate * cell_state + i_gate * g_gate

        # Update hidden state
        hidden = o_gate * torch.tanh(cell_state)

        #################################################################################
        #                                   THE END                                     #
        #################################################################################

        return hidden, cell_state # New hidden state , New cell state


In [134]:
class CustomLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        super(CustomLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=token2idx['<PAD>'])
        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """Define Needed Layers """
        #Your Code Here

        # Define the custom LSTM cell
        self.lstm_cell = CustomLSTMCell(embedding_dim, hidden_size)

        # Fully connected layer to produce output probabilities
        self.fc = nn.Linear(hidden_size, output_size)

        #################################################################################
        #                                   THE END                                     #
        #################################################################################

    def forward(self, inputs):
        # Initialize hidden state and cell state with zeros
        hidden = torch.zeros(inputs.size(0), self.hidden_size).to(inputs.device)
        cell_state = torch.zeros(inputs.size(0), self.hidden_size).to(inputs.device)

        ###################################### TODO #####################################
        #                          COMPLETE THE FOLLOWING SECTION                       #
        #################################################################################
        """
        Implements the forward pass: first, embed the input tokens, then pass
        the embeddings through the LSTM layer to capture sequential dependencies.
        Finally, use fully connected layers to output class probabilities.
        """
        #Your Code Here

        # Embed the input tokens
        embeddings = self.embedding(inputs)

        # Process each time step
        for t in range(embeddings.size(1)):  # Iterate over the sequence length
            input_t = embeddings[:, t, :]  # Get the embedding for the t-th time step
            hidden, cell_state = self.lstm_cell(input_t, hidden, cell_state)

        # Pass the final hidden state through the fully connected layer
        out = self.fc(hidden)

        #################################################################################
        #                                   THE END                                     #
        #################################################################################

        return out  # probabilities for each class in the output.


### Train model

In this section, you should train model for multiple epochs on the training data and evaluate it on the validation data after each epoch, reporting the model's accuracy. Ensure that the model is set to train mode during training and switched to eval mode for evaluation on the validation data. The objective is to implement the training loop and, at the next , compute and report the final accuracy on the test data.

In [135]:
# Define the parameters
vocab_size = len(token2idx)  # Vocabulary size (size of token-to-index mapping)
embedding_dim = 50           # Embedding dimension
hidden_size = 128            # Hidden size of LSTM
output_size = 3              # Number of output classes

# Initialize the model
custom_lstm = CustomLSTM(vocab_size, embedding_dim, hidden_size, output_size).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [136]:
############################# TODO #############################
# TODO: Implement the training loop
################################################################
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10, device='cpu'):
    # Store losses and accuracies for tracking
    train_losses = []
    val_accuracies = []

    for epoch in range(epochs):
        model.train()  # Set model to training mode
        running_loss = 0.0
        correct_predictions = 0
        total_predictions = 0

        # Iterate over the training data
        for batch in train_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)

            # Compute loss
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Track loss for reporting
            running_loss += loss.item()

            # Get predictions and calculate accuracy
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

        # Calculate average loss for this epoch
        avg_train_loss = running_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Calculate training accuracy
        train_accuracy = 100 * correct_predictions / total_predictions

        # Evaluate on validation data
        val_accuracy = evaluate_model(model, val_loader, device)

        # Store validation accuracy
        val_accuracies.append(val_accuracy)

        # Print progress
        print(f"Epoch [{epoch+1}/{epochs}], "
              f"Training Loss: {avg_train_loss:.4f}, "
              f"Training Accuracy: {train_accuracy:.2f}%, "
              f"Validation Accuracy: {val_accuracy:.2f}%")

    return model, train_losses, val_accuracies

# Define the evaluation function
def evaluate_model(model, val_loader, device):
    model.eval()  # Set model to evaluation mode
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():  # No gradients needed for evaluation
        for batch in val_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)

            # Get predictions
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

    # Calculate validation accuracy
    val_accuracy = 100 * correct_predictions / total_predictions
    return val_accuracy


### evaluate LSTM models on test set
To complete evaluate_on_test, loop through the test data to get predictions, calculate accuracy, and print a classification report for model evaluation.
you can use the `evaluate_on_test` function implemented in the previous section. Alternatively, you may write a new function to conduct this evaluation.ensure report the "classification_report" of both LSTM models.

**NOTE : to earn full marks for this section, you must adjust the network's hyperparameters so that each lstm models achieves at least 80% accuracy on the test data. If you achieve less than the required accuracy, consider adjusting your training loop and hyperparameters, such as the hidden state size and learning rate, to improve model performance.**

In [137]:
# Evaluate both LSTM models on the test dataset

def evaluate_on_test(model, test_loader, device):
    model.eval()  # Set the model to evaluation mode
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():  # Disable gradient calculation during evaluation
        for batch in test_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)

            # Get predictions
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

    # Calculate test accuracy
    test_accuracy = 100 * correct_predictions / total_predictions
    return test_accuracy

# Evaluate the first LSTM model
test_accuracy_1 = evaluate_on_test(lstm, test_loader, device)
print(f"Test Accuracy for LSTMClassifier: {test_accuracy_1:.2f}%")

# Evaluate the second LSTM model
test_accuracy_2 = evaluate_on_test(custom_lstm, test_loader, device)
print(f"Test Accuracy for Custom LSTM: {test_accuracy_2:.2f}%")


Test Accuracy for LSTMClassifier: 42.90%
Test Accuracy for Custom LSTM: 44.06%


# Testing RNN and LSTM Models on a New Review

In [138]:
# Example review
review = "It is no wonder that the film has such a high rating, it is quite literally breathtaking. What can I say that hasn't said before? Not much, it's the story, the acting, the premise, but most of all, this movie is about how it makes you feel. Sometimes you watch a film, and can't remember it days later, this film loves with you, once you've seen it, you don't forget."


## Preprocess the test Review
To prepare the review for the model, we need to follow similar preprocessing steps as we did for the dataset:

Remove special characters and convert the text to lowercase.
Tokenize the text into individual words.
Remove stopwords to focus only on meaningful words.
Convert tokens to indices based on the token2idx dictionary created earlier.
Pad or truncate the sequence to a length of max_len .


In [139]:
import torch
import re
from nltk import wordpunct_tokenize
from nltk.corpus import stopwords


# Preprocessing function
def preprocess_text(text, stop_words, token2idx, max_len):

    ########################### TODO ###########################
    # Step 1: Clean and lowercase the input text
    ################################################################

    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)  # Remove special characters
    text = text.lower()  # Convert to lowercase

    ########################### TODO ###########################
    # Step 2: Tokenize the text into words
    # - Use wordpunct_tokenize to split the cleaned text into individual word tokens.
    ############################################################

    tokens = wordpunct_tokenize(text)

    ########################### TODO ###########################
    # Step 3: Remove stopwords from the token list
    ############################################################

    tokens = [word for word in tokens if word not in stop_words]

    ########################### TODO ###########################
    # Step 4: Convert tokens to indices based on the token2idx dictionary
    # - For each token in the list, get the corresponding index from the token2idx dictionary.
    # - If a token is not found in token2idx, replace it with the index of '<UNK>'.
    ################################################################

    tokens_idx = [token2idx.get(word, token2idx['<UNK>']) for word in tokens]  # Use '<UNK>' for unknown words

    ########################### TODO ###########################
    # Step 5: Pad or truncate the tokens_idx list to the desired max_len
    ############################################################

    if len(tokens_idx) < max_len:
        tokens_idx = tokens_idx + [token2idx['<PAD>']] * (max_len - len(tokens_idx))  # Padding
    else:
        tokens_idx = tokens_idx[:max_len]  # Truncation

    ########################### End of TODOs ###########################

    return tokens_idx  # Return the processed list of indices

# Get stopwords
stop_words = set(stopwords.words('english'))

########################### TODO ###########################
# Set the maximum length for the review sequence
max_len = 100 # Example length

# Preprocess the review
review_indices = preprocess_text(review, stop_words, token2idx, max_len)
############################################################

# Convert the indices to a tensor and move it to the device (GPU or CPU)
input_tensor = torch.LongTensor([review_indices]).to(device)

## Make Predictions
Now that we have preprocessed the review, use both the RNN and LSTM models to make predictions on the sentiment of the review.

Set the model to evaluation mode to prevent updates during inference.
Predict the sentiment class by passing the input_tensor to the model.
Interpret the prediction as either "Positive" or "Negative" based on the model's output.

In [140]:
def predict_sentiment(model, input_tensor, model_name="Model"):
    model.eval()  # Set the model to evaluation mode
    ############################# TODO #############################
    # TODO: Perform a forward pass with the model on the input_tensor,
    # get the predicted class label, and map it to "Positive" or "Negative".
    ################################################################

    # Perform a forward pass with the model
    with torch.no_grad():  # We don't need gradients during inference
        output = model(input_tensor)  # Forward pass

    # Get the predicted class by finding the index with the highest probability
    _, predicted_class = torch.max(output, dim=1)  # Get index of max probability

    # Map the predicted class (assuming 0 = Negative, 1 = Positive)
    class_label = "Positive" if predicted_class.item() == 1 else "Negative"


    print(f"The predicted class for the review by {model_name} is: {class_label}")

In [141]:
# Make predictions using with "predict_sentiment" function for each of four models above

models = [rnn_model_1, rnn_model_2, lstm, custom_lstm]
model_names = ["RNN", "Custom RNN", "LSTM", "Custom LSTM"]

# Iterate through the models and make predictions
for model, model_name in zip(models, model_names):
    predict_sentiment(model, input_tensor, model_name)



The predicted class for the review by RNN is: Positive
The predicted class for the review by Custom RNN is: Negative
The predicted class for the review by LSTM is: Negative
The predicted class for the review by Custom LSTM is: Negative


# Questions

[1] - Based on your observations, what do you think caused the difference in performance between the RNN and LSTM models (on test set)? Analyze this difference using the results from the notebook, and discuss where a simple RNN might perform better.

[2] - If we increase max_len in the preprocessing step to 300, what changes in models (rnn & lstm ) performance would you expect, and why? Please *explain* and discuss the impact this may have on the learning process and the final results.

Answer1-

Results :

Test Accuracy for LSTMClassifier: 42.90%

Test Accuracy for Custom LSTM: 44.06%

-------------------------------------

Evaluating RNN Model 1:
Accuracy: 48.57%

Evaluating RNN Model 2:
Accuracy: 51.92%

----------------------------------------
Since the length of the sentences in the test data is short, that is likely why RNNs have shown better performance compared to LSTMs.

**Where RNNs Might Perform Better:**

On datasets with shorter sequences or simpler patterns, RNNs might perform as well or even better than LSTMs. Their simplicity makes them less prone to overfitting and faster to train.

Answer2-

Impact of Increasing max_len to 300
Expected Changes in Performance:

Improved Performance for LSTM:

With longer sequences (max_len = 300), LSTMs would have more context to make predictions, which is particularly beneficial if the sentiment of a review depends on long-range dependencies. The gating mechanisms in LSTMs allow them to handle this increased sequence length without significant degradation in learning.

Worsened or Unchanged Performance for RNN:

Increasing the sequence length exacerbates the vanishing gradient problem in RNNs, making it even harder for them to learn from earlier tokens in the sequence. As a result, performance may deteriorate or plateau.

Impact on the Learning Process:

Training Time:

Both RNNs and LSTMs would experience an increase in training time because of the additional computations required for longer sequences.
LSTMs may require more epochs to fully utilize the additional context, as longer sequences make learning more complex.

Risk of Overfitting:

Both models, especially LSTMs, might be at a higher risk of overfitting due to the increased input size. Regularization techniques (dropout, weight decay) would become more critical.

Memory Usage:

Both models would require more memory to process longer sequences. LSTMs, being more complex, might push system limits in resource-constrained environments.

Final Results:

LSTM: Likely to achieve better results on the test set as long as the data contains relevant long-range dependencies and sufficient training data exists to learn them.

RNN: Performance is unlikely to improve; in fact, it might degrade due to the challenges in learning long-term dependencies effectively.