## [Download the IMDB Dataset](https://medium.com/@sayedebad.777/building-rnn-lstm-and-gru-from-scratch-e27ebb0f8e0f)

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from keras.src.utils.module_utils import tensorflow
from torch.utils.data import DataLoader, TensorDataset
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing import sequence
import numpy as np

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
print(tensorflow.__version__)

Using device: cpu
2.18.0


In [7]:
# Define the maximum number of words to include in the vocabulary
max_features = 5000

# Define the maximum length to which sequences will be padded
max_len = 500

# Load the IMDB dataset with the specified vocabulary size
# x_train, y_train: Training data and labels
# x_test, y_test: Test data and labels
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb.npz
[1m17464789/17464789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 0us/step


In [12]:
# Pad sequences to ensure uniform length 
# (ensures all input data samples have the same sequence length for model training)
x_train = sequence.pad_sequences(x_train, maxlen=max_len)
x_test = sequence.pad_sequences(x_test, maxlen=max_len)

# Convert the padded sequences and their corresponding labels into PyTorch tensors 
# (prepares data for usage in PyTorch DataLoader and models)
x_train = torch.as_tensor(x_train, dtype=torch.long)
y_train = torch.as_tensor(y_train, dtype=torch.float32)
x_test = torch.as_tensor(x_test, dtype=torch.long)
y_test = torch.as_tensor(y_test, dtype=torch.float32)

In [13]:
# Create DataLoader
batch_size = 64

# TensorDataset groups multiple tensors into a dataset. 
# Here, it combines the input data (x_train/x_test) and the labels (y_train/y_test).
train_data = TensorDataset(x_train, y_train)

# DataLoader is used to efficiently handle batching and shuffling of data during training and testing.
# Parameters: batch_size specifies the size of the batches, shuffle randomizes the dataset order.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

test_data = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

## RNN Model

In [16]:
# Define the RNNModel class, a PyTorch neural network model for sentiment analysis
class RNNModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers):
        super(RNNModel, self).__init__()
        # Embedding layer for converting word indices to dense vectors
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # RNN (Recurrent Neural Network) layer with specified dimensions and number of layers
        # batch_first=True ensures input batches have shape (batch_size, sequence_length, features)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        # Fully connected layer to map hidden states to the output dimension
        self.fc = nn.Linear(hidden_dim, output_dim)
        # Sigmoid activation function to produce probabilities for binary classification
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Pass input through the embedding layer to get word embeddings
        x = self.embedding(x)
        # Pass the embeddings through the RNN layer; only use the last hidden state
        x, _ = self.rnn(x)
        # Extract the last time step's output
        x = x[:, -1, :]
        # Pass the last hidden state through the fully connected layer
        x = self.fc(x)
        # Apply the sigmoid activation to get the final output (binary probability)
        x = self.sigmoid(x)
        return x


# Define model hyperparameters
vocab_size = max_features  # Vocabulary size, i.e., max number of unique tokens
embedding_dim = 128  # Dimension of word embeddings
hidden_dim = 128  # Dimension of hidden state in the RNN
output_dim = 1  # Single output for binary classification (positive/negative sentiment)
num_layers = 3  # Number of layers for the RNN

# Initialize the model with defined hyperparameters and send it to the device (CPU or GPU)
rnn_model = RNNModel(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers).to(device)

# Define binary cross-entropy loss for binary classification tasks
criterion = nn.BCELoss()

# Define the optimizer (Adam) for updating model parameters during training
optimizer = optim.Adam(rnn_model.parameters(), lr=0.001)

In [17]:
# Training Loop
num_epochs = 5  # Define the number of epochs for training

for epoch in range(num_epochs):  # Loop over each epoch
    rnn_model.train()  # Set the model to training mode
    total_loss = 0  # Initialize the total loss for this epoch
    for inputs, targets in train_loader:  # Iterate over batches in the training data
        inputs = inputs.to(device)  # Move input data to the specified device (CPU/GPU)
        targets = targets.to(device)  # Move target labels to the specified device
        optimizer.zero_grad()  # Reset gradients to zero before backpropagation
        outputs = rnn_model(inputs)  # Perform the forward pass through the RNN model
        loss = criterion(outputs.squeeze(), targets)  # Compute the binary cross-entropy loss
        loss.backward()  # Backpropagate the loss to calculate gradients
        optimizer.step()  # Update model parameters using the optimizer
        total_loss += loss.item()  # Accumulate the batch loss into the total loss

    avg_loss = total_loss / len(train_loader)  # Calculate average loss over all batches
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}')  # Print the epoch loss

Epoch [1/5], Loss: 0.6615
Epoch [2/5], Loss: 0.6542
Epoch [3/5], Loss: 0.6153
Epoch [4/5], Loss: 0.5841
Epoch [5/5], Loss: 0.6357


In [19]:
# Evaluate the model
rnn_model.eval()  # Set the model to evaluation mode (disables features like dropout)
correct, total = 0, 0  # Initialize counters for correct predictions and total samples
with torch.no_grad():  # Disable gradient computation for memory efficiency during evaluation
    for inputs, targets in test_loader:  # Iterate over batches from the test DataLoader
        inputs, targets = inputs.to(device), targets.to(device)  # Move inputs and targets to the device (CPU/GPU)
        outputs = rnn_model(inputs)  # Perform forward pass to get model predictions
        predicted = (outputs.squeeze() > 0.5).float()  # Apply threshold (0.5) to generate binary predictions
        total += targets.size(0)  # Add the number of samples in the current batch to the total
        correct += (predicted == targets).sum().item()  # Count the correctly predicted samples

accuracy = correct / total  # Calculate overall accuracy as the ratio of correct predictions to total samples
print(f'Test Accuracy: {accuracy * 100:.2f}% - {accuracy:.4f}')  # Print accuracy in percentage and decimal format

Test Accuracy: 60.12% - 0.6012


## LSTM Model

In [20]:
# Define the LSTMModel class for text classification
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embdding_dim, hidden_dim, output_dim, num_layers):
        super(LSTMModel, self).__init__()
        # Embedding layer for mapping word indices to dense vector representations
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # LSTM layer for sequential data processing
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        # Fully connected layer for mapping hidden state features to output
        self.fc = nn.Linear(hidden_dim, output_dim)
        # Sigmoid activation function for binary classification
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Compute word embeddings
        x = self.embedding(x)
        # Process embeddings with the LSTM layer
        x, _ = self.lstm(x)
        # Extract features from the last time-step
        x = x[:, -1, :]
        # Apply the fully connected layer to the extracted features
        x = self.fc(x)
        # Apply the sigmoid activation to get probabilities
        x = self.sigmoid(x)
        return x


# Model hyperparameters
vocab_size = max_features  # Size of the vocabulary (number of unique tokens to use)
embedding_dim = 128  # Dimensionality of the embeddings for each token
hidden_dim = 128  # Dimensionality of the hidden state in the LSTM
output_dim = 1  # Single output for binary classification (positive/negative sentiment)
num_layers = 3  # Number of stacked LSTM layers in the network

# Initialize the model, send it to the available device (CPU or GPU)
lstm_model = LSTMModel(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers).to(device)

# Define the loss function to measure the model's performance (Binary Cross-Entropy Loss)
criterion = nn.BCELoss()

# Define the optimizer for parameter updates during training (Adam optimizer)
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)

In [21]:
# Training loop for the LSTM model
# num_epochs defines the number of full passes through the dataset for training
num_epochs = 5

for epoch in range(num_epochs):  # Iterate over epochs
    lstm_model.train()  # Set the model to training mode (enables layers like dropout if present)
    total_loss = 0  # Initialize variable to accumulate total loss for the epoch

    # Iterate over batches of data from the training DataLoader
    for inputs, targets in train_loader:
        # Move inputs and targets to the device (CPU/GPU) to facilitate computation
        inputs, targets = inputs.to(device), targets.to(device)

        # Reset gradients to ensure they don’t accumulate from previous steps
        optimizer.zero_grad()

        # Perform the forward pass through the model to get predictions
        outputs = lstm_model(inputs)

        # Compute the loss between predictions and actual targets
        loss = criterion(outputs.squeeze(), targets)

        # Perform backpropagation to compute gradients
        loss.backward()

        # Update model parameters based on computed gradients
        optimizer.step()

        # Add the current batch's loss to the total loss
        total_loss += loss.item()

    # Compute the average loss over all batches in the epoch
    avg_loss = total_loss / len(train_loader)

    # Print the loss for the epoch to monitor training progress
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}')

Epoch [1/5], Loss: 0.5862
Epoch [2/5], Loss: 0.4440
Epoch [3/5], Loss: 0.3117
Epoch [4/5], Loss: 0.2437
Epoch [5/5], Loss: 0.1956


In [22]:
# Evaluate the model's performance on the test dataset
lstm_model.eval()  # Switch the model to evaluation mode, disabling dropout and other training-only behaviors
correct = 0  # Initialize a counter for the number of correct predictions
total = 0  # Initialize a counter for the total number of samples

# Disable gradient computation during evaluation to save memory and improve computation speed
with torch.no_grad():
    # Iterate through the test data batches
    for inputs, targets in test_loader:
        # Move input data and target labels to the selected device (CPU or GPU)
        inputs, targets = inputs.to(device), targets.to(device)

        # Perform a forward pass through the LSTM model to get the predicted probabilities
        outputs = lstm_model(inputs)

        # Apply a threshold of 0.5 to convert probabilities into binary predictions (0 or 1)
        predicted = (outputs.squeeze() >= 0.5).float()

        # Update the total number of samples and count correct predictions
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

# Compute the overall test accuracy as the ratio of correctly predicted samples to the total samples
accuracy = correct / total
# Print the test accuracy, formatted to display four decimal places
print(f'Test Accuracy: {accuracy:.4f}')

Test Accuracy: 0.8810


## GRU Model

In [23]:
class GRUModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers):
        super(GRUModel, self).__init__()
        # Embedding layer maps integer-encoded words to dense vectors of a fixed size (embedding_dim)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # GRU layer processes input sequences to capture temporal dependencies.
        # embedding_dim is the input size, hidden_dim is the output size of each GRU cell, 
        # and num_layers specifies the number of stacked GRU layers.
        # batch_first=True ensures input tensors have the shape (batch_size, seq_len, features).
        self.gru = nn.GRU(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)

        # Fully connected layer maps the final hidden state of the GRU to the desired output dimension.
        self.fc = nn.Linear(hidden_dim, output_dim)

        # Activation function (Sigmoid) outputs probabilities for binary classification.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Step 1: Convert input word indices into dense embeddings.
        x = self.embedding(x)

        # Step 2: Process embeddings with the GRU layer; the (_, _) outputs are hidden states.
        x, _ = self.gru(x)

        # Step 3: Extract the last hidden state across the sequence (x[:, -1, :]).
        x = x[:, -1, :]

        # Step 4: Pass the final hidden state through the fully connected layer.
        x = self.fc(x)

        # Step 5: Apply the sigmoid activation function to the output.
        x = self.sigmoid(x)
        return x


# Model hyperparameters (parameters for model configuration):
vocab_size = max_features  # Size of the vocabulary (number of unique tokens to use in embeddings).
embedding_dim = 128  # Number of dimensions in the embedding vectors.
hidden_dim = 128  # Number of hidden units in the GRU cell.
output_dim = 1  # Single output for binary classification (positive or negative sentiment).
num_layers = 3  # Number of stacked GRU layers.

# Initialize the GRU model with the defined hyperparameters.
gru_model = GRUModel(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers)

# Define the loss function: Binary Cross Entropy (suitable for binary classification tasks).
criterion = nn.BCELoss()

# Define the optimizer: Adam optimizer adjusts the GRU model’s parameters to minimize the loss.
optimizer = optim.Adam(gru_model.parameters(), lr=0.001)

In [24]:
# Training loop
num_epochs = 5  # Define the number of epochs (full passes through training data).

for epoch in range(num_epochs):  # Outer loop over epochs, controlling the progress of training.
    gru_model.train()  # Set the GRU model to training mode (enables features like dropout if applied).

    for inputs, targets in train_loader:  # Inner loop iterating over batches of training data.
        inputs, targets = inputs.to(device), targets.to(
            device)  # Move inputs and targets to the computation device (CPU/GPU).
        optimizer.zero_grad()  # Clear gradients from the previous step to ensure they don't accumulate.
        outputs = gru_model(inputs)  # Perform a forward pass through the model to compute predictions.
        loss = criterion(outputs.squeeze(),
                         targets)  # Calculate the loss (Binary Cross Entropy) based on model outputs and target labels.
        loss.backward()  # Backpropagate the computed loss to calculate gradients for all model parameters.
        optimizer.step()  # Update model parameters using the computed gradients and optimizer.
        total_loss += loss.item()  # Accumulate the current batch's loss into the total loss for the epoch.

    avg_loss = total_loss / len(train_loader)  # Compute the average loss across all training batches.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}')  # Output the average training loss for this epoch.

Epoch [1/5], Loss: 0.7172
Epoch [2/5], Loss: 1.0077
Epoch [3/5], Loss: 1.2188
Epoch [4/5], Loss: 1.3746
Epoch [5/5], Loss: 1.4816


In [25]:
# Evaluate the model
gru_model.eval()  # Set the GRU model to evaluation mode (disables dropout and similar layers).
correct = 0  # Initialize a counter for correct predictions.
total = 0  # Initialize a counter for the total number of samples.

# Disable gradient computation for evaluation to save memory and computational resources.
with torch.no_grad():
    # Loop through batches from the test DataLoader to compute predictions.
    for inputs, targets in test_loader:
        # Move the inputs and targets to the specified device (CPU/GPU).
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass: compute the model's output probabilities for the current batch.
        outputs = gru_model(inputs)

        # Apply a threshold to the output probabilities to generate binary predictions.
        predicted = (outputs.squeeze() >= 0.5).float()

        # Update the total with the batch size.
        total += targets.size(0)

        # Count the number of correct predictions in the current batch.
        correct += (predicted == targets).sum().item()

# Calculate accuracy as the ratio of correct predictions to the total samples.
accuracy = correct / total

# Print the test accuracy formatted to four decimal places.
print(f'Test Accuracy: {accuracy:.4f}')

Test Accuracy: 0.8922
