# Implementing a Recurrent Neural Network for Sequential Data

In this exercise we will implement two recurrent neural networks, a vanilla RNN and a GRU, for text classification, and evaluate them on the Text Document Classification Dataset. 😀

## Loading the Text Document Classification Dataset

The dataset contains text documents, each labeled with one of five categories:
Politics = 0, Sport = 1, Technology = 2, Entertainment = 3, Business = 4.

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

# Load the dataset from a CSV file in the current working directory.
data = pd.read_csv("./df_file.csv")
# Print (rows, columns) as a quick sanity check on the load.
print(data.shape)

def preprocess_data(data, text_column, label_column):
    """Extract the text and label columns of a DataFrame as arrays.

    Args:
        data: DataFrame holding one document per row.
        text_column: Name of the column containing the raw text.
        label_column: Name of the column containing the class label.

    Returns:
        A ``(texts, labels)`` tuple with the underlying values of the two
        columns, in row order.

    Raises:
        KeyError: If either column name is not present in ``data``.
    """
    return data[text_column].values, data[label_column].values

# Pull the 'Text' and 'Label' columns out of the DataFrame as arrays.
texts, labels = preprocess_data(data, 'Text', 'Label')

(2225, 2)


In [None]:
# Build the tokenizer and fit its word index on the full corpus.
# NOTE(review): num_words presumably limits encoding to the most frequent
# words; confirm the exact cutoff against the Keras Tokenizer docs.
tokenizer = Tokenizer(num_words=5000)  # num_words: vocabulary size limit
tokenizer.fit_on_texts(texts)
# Encode each document as a list of integer word indices.
sequences = tokenizer.texts_to_sequences(texts)

# Bring every encoded document to a fixed length of max_seq_length.
# (padding/truncation side is left at the pad_sequences defaults)
max_seq_length = 100
padded_sequences = pad_sequences(sequences, maxlen=max_seq_length)

# Prints (num_documents, max_seq_length).
print(padded_sequences.shape)

(2225, 100)


In [None]:
# Hold out 30 % of the data, then cut that hold-out in half:
# 70 % train / 15 % validation / 15 % test, with a fixed seed.
train_features, test_features, train_labels, test_labels = train_test_split(
    padded_sequences, labels, test_size=0.3, random_state=42
)
valid_features, test_features, valid_labels, test_labels = train_test_split(
    test_features, test_labels, test_size=0.5, random_state=42
)

# Convert every split to torch.long tensors.
(
    train_features, train_labels,
    valid_features, valid_labels,
    test_features, test_labels,
) = (
    torch.tensor(split, dtype=torch.long)
    for split in (
        train_features, train_labels,
        valid_features, valid_labels,
        test_features, test_labels,
    )
)

# Wrap each (features, labels) pair in a dataset ...
train_dataset = TensorDataset(train_features, train_labels)
valid_dataset = TensorDataset(valid_features, valid_labels)
test_dataset = TensorDataset(test_features, test_labels)

# ... and batch it; only the training data is reshuffled.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Peek at the first training batch and print its tensor shapes.
batch = next(iter(train_loader))
inputs, targets = batch
print(f"Input batch shape: {inputs.shape}")
print(f"Target batch shape: {targets.shape}")


Input batch shape: torch.Size([32, 100])
Target batch shape: torch.Size([32])


Helper functions for evaluating a model on the validation and test sets.

In [None]:
def evaluate_model(model, val_loader):
    """Run one gradient-free pass over a validation loader and report metrics.

    Reads the module-level ``device`` and ``criterion``; both must be defined
    before this function is called.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        val_loader: Iterable of batches where ``batch[0]`` holds the inputs
            and ``batch[1]`` the integer class targets.

    Returns:
        The mean of the per-batch losses (sum of batch losses divided by the
        number of batches).

    Raises:
        ZeroDivisionError: If ``val_loader`` yields no batches.
    """
    model.eval()
    running_loss, n_correct, n_seen = 0.0, 0, 0

    with torch.no_grad():
        for batch in val_loader:
            features, targets = batch[0].to(device), batch[1].to(device)
            logits = model(features)

            running_loss += criterion(logits, targets).item()

            # The predicted class is the index of the largest logit per row.
            n_correct += (logits.argmax(dim=1) == targets).sum().item()
            n_seen += targets.size(0)

    mean_loss = running_loss / len(val_loader)
    acc_pct = 100 * n_correct / n_seen
    print(f'Validation Loss: {mean_loss:.4f}, Accuracy: {acc_pct:.2f}%')
    return mean_loss

def test_model(model, test_loader):
    """Evaluate ``model`` on the test loader and print loss and accuracy.

    Reads the module-level ``device`` and ``criterion``; both must be defined
    before this function is called. Nothing is returned; the metrics are only
    printed.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        test_loader: Iterable of batches where ``batch[0]`` holds the inputs
            and ``batch[1]`` the integer class targets.

    Raises:
        ZeroDivisionError: If ``test_loader`` yields no batches.
    """
    model.eval()
    loss_sum = 0.0
    hits = 0
    seen = 0

    with torch.no_grad():
        for batch in test_loader:
            features = batch[0].to(device)
            targets = batch[1].to(device)
            logits = model(features)

            loss_sum += criterion(logits, targets).item()

            # Index of the highest-scoring class for every sample.
            guesses = logits.argmax(dim=1)
            seen += targets.size(0)
            hits += (guesses == targets).sum().item()

    mean_loss = loss_sum / len(test_loader)
    acc_pct = 100 * hits / seen
    print(f'Test Loss: {mean_loss:.4f}, Accuracy: {acc_pct:.2f}%')

## Vanilla RNN Implementation

Here we will implement a vanilla RNN model.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class VanillaRNN(nn.Module):
    """Single-layer Elman RNN classifier over token-index sequences.

    Each token is embedded, the hidden state is updated once per time step as
    ``h_t = tanh(Wx(x_t) + Wh(h_{t-1}))``, and the hidden state after the last
    time step is mapped to class logits.

    Args:
        vocab_size: Number of rows in the embedding table; every input index
            must be in ``[0, vocab_size)``.
        embedding_dim: Size of each token embedding.
        hidden_size: Size of the recurrent hidden state.
        output_size: Number of output classes (logits).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        super(VanillaRNN, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

        # Weights for input to hidden connection
        self.Wx = nn.Linear(embedding_dim, hidden_size)
        # Weights for hidden to hidden connection
        self.Wh = nn.Linear(hidden_size, hidden_size)
        # Fully connected layer to map hidden state to output
        self.fc = nn.Linear(hidden_size, output_size)

        # Activation function (tanh) for the hidden state
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Return class logits for a batch of token-index sequences.

        Args:
            x: Integer tensor of shape ``(batch, seq_len)``.

        Returns:
            Float tensor of shape ``(batch, output_size)`` with unnormalised
            class scores.
        """
        x = self.embedding(x)  # (batch, seq_len, embedding_dim)
        batch_size, seq_len, _ = x.size()

        # Initialize hidden state with zeros, directly on the input's device
        # and dtype so no extra host-to-device copy is needed.
        h = torch.zeros(batch_size, self.hidden_size, device=x.device, dtype=x.dtype)

        # Iterate over each time step
        for t in range(seq_len):
            xt = x[:, t, :]  # Select the t-th time step input
            h = self.tanh(self.Wx(xt) + self.Wh(h))  # Update hidden state

        # Use the hidden state from the last time step to predict the output
        out = self.fc(h)
        return out

In [None]:
# model params
vocab_size = 5000      # number of embedding rows (same value as the Tokenizer's num_words)
embedding_dim = 64     # size of each token embedding vector
hidden_size = 128      # size of the recurrent hidden state
output_size = 5        # one logit per document category

# build vanilla RNN model (it is moved to `device` in the training cell)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = VanillaRNN(vocab_size, embedding_dim, hidden_size, output_size)

Here we will train the vanilla RNN model, validating it after every epoch.

In [None]:
# hyperparameters
learning_rate = 0.001
num_epochs = 10

# Move the model to the target device before the optimizer is constructed, so
# the optimizer is built over the parameters that are actually trained.
model.to(device)

# loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# start training; keep the checkpoint with the lowest validation loss
best_val_loss = float('inf')
for epoch in range(num_epochs):
    model.train()  # evaluate_model() leaves the model in eval mode
    epoch_loss = 0.0
    num_batches = 0

    for batch in train_loader:
        inputs = batch[0].to(device)
        # Bound to `targets`, not `labels`, so the dataset-level `labels`
        # array created when the CSV was loaded is not overwritten.
        targets = batch[1].to(device)

        outputs = model(inputs)

        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Report the mean loss over the whole epoch instead of only the last
    # mini-batch; max(..., 1) keeps an empty loader from dividing by zero.
    avg_train_loss = epoch_loss / max(num_batches, 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_train_loss:.4f}")
    val_loss = evaluate_model(model, valid_loader)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'rnn_best_model.pth')
        print(f"Best model saved with validation loss: {val_loss:.4f}")

Here you can get your final test accuracy.

In [None]:
# Training saves the weights with the lowest validation loss to
# 'rnn_best_model.pth'. Restore them so the test metric reflects the selected
# model rather than whatever the final epoch ended on.
try:
    model.load_state_dict(torch.load('rnn_best_model.pth', map_location=device))
except FileNotFoundError:
    # No checkpoint was written (e.g. training was skipped): fall back to the
    # weights currently held in memory.
    print("No saved checkpoint found; evaluating the current model weights.")

print("Evaluating on test set:")
test_model(model, test_loader)

## GRU (Gated Recurrent Unit) Implementation

Here we will implement a GRU model.

In [None]:
import torch
import torch.nn as nn

class GRUCell(nn.Module):
    """Single gated recurrent unit step.

    Computes, for input ``x`` and previous hidden state ``h_prev``::

        z_t     = sigmoid(W_z x + U_z h_prev)            # update gate
        r_t     = sigmoid(W_r x + U_r h_prev)            # reset gate
        h_tilde = tanh(W_h x + U_h (r_t * h_prev))       # candidate state
        h_t     = z_t * h_prev + (1 - z_t) * h_tilde     # new hidden state

    Args:
        input_size: Number of features in the input ``x``.
        hidden_size: Number of features in the hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super(GRUCell, self).__init__()
        self.hidden_size = hidden_size

        # Update gate
        self.W_z = nn.Linear(input_size, hidden_size)
        self.U_z = nn.Linear(hidden_size, hidden_size)

        # Reset gate
        self.W_r = nn.Linear(input_size, hidden_size)
        self.U_r = nn.Linear(hidden_size, hidden_size)

        # Candidate hidden state
        self.W_h = nn.Linear(input_size, hidden_size)
        self.U_h = nn.Linear(hidden_size, hidden_size)

    def forward(self, x, h_prev):
        """Advance the hidden state by one time step.

        Args:
            x: Input tensor of shape ``(batch, input_size)``.
            h_prev: Previous hidden state of shape ``(batch, hidden_size)``.

        Returns:
            New hidden state of shape ``(batch, hidden_size)``.
        """
        # Update gate: how much of the previous hidden state (h_prev) is
        # carried forward to the next hidden state.
        z_t = torch.sigmoid(self.W_z(x) + self.U_z(h_prev))

        # Reset gate: how much of the previous hidden state is ignored when
        # computing the candidate hidden state.
        r_t = torch.sigmoid(self.W_r(x) + self.U_r(h_prev))

        # Candidate hidden state: combines the current input with the
        # reset-gated previous hidden state.
        h_tilde = torch.tanh(self.W_h(x) + self.U_h(r_t * h_prev))

        # New hidden state: blend of the previous hidden state and the
        # candidate, weighted by the update gate.
        h_t = z_t * h_prev + (1 - z_t) * h_tilde

        return h_t

class GRUModel(nn.Module):
    """GRU-based classifier over token-index sequences.

    Tokens are embedded, a single ``GRUCell`` is applied step by step along
    the sequence, and the hidden state after the last step is mapped to class
    logits.

    Args:
        vocab_size: Number of rows in the embedding table; every input index
            must be in ``[0, vocab_size)``.
        embedding_dim: Size of each token embedding (the cell's input size).
        hidden_size: Size of the recurrent hidden state.
        output_size: Number of output classes (logits).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        super(GRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.gru_cell = GRUCell(embedding_dim, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return class logits for a batch of token-index sequences.

        Args:
            x: Integer tensor of shape ``(batch, seq_len)``.

        Returns:
            Float tensor of shape ``(batch, output_size)`` with unnormalised
            class scores.
        """
        x = self.embedding(x)  # (batch, seq_len, embedding_dim)
        batch_size, seq_len, _ = x.size()
        # Zero initial state, created directly on the input's device and dtype.
        h_t = torch.zeros(batch_size, self.hidden_size, device=x.device, dtype=x.dtype)

        for t in range(seq_len):
            # Feed the t-th token embedding together with the running state.
            h_t = self.gru_cell(x[:, t, :], h_t)

        out = self.fc(h_t)
        return out


In [None]:
# model params
vocab_size = 5000      # number of embedding rows (same value as the Tokenizer's num_words)
embedding_dim = 64     # size of each token embedding vector
hidden_size = 128      # size of the recurrent hidden state
output_size = 5        # one logit per document category

# build GRU model (it is moved to `device` in the training cell)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GRUModel(vocab_size, embedding_dim, hidden_size, output_size)

Here we will train our GRU model, validating it after every epoch.

In [None]:
# hyperparameters
learning_rate = 0.001
num_epochs = 20

# Move the model to the target device before the optimizer is constructed, so
# the optimizer is built over the parameters that are actually trained.
model.to(device)

# loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# start training; keep the checkpoint with the lowest validation loss
best_val_loss = float('inf')
for epoch in range(num_epochs):
    model.train()  # evaluate_model() leaves the model in eval mode
    epoch_loss = 0.0
    num_batches = 0

    for batch in train_loader:
        inputs = batch[0].to(device)
        # Bound to `targets`, not `labels`, so the dataset-level `labels`
        # array created when the CSV was loaded is not overwritten.
        targets = batch[1].to(device)

        outputs = model(inputs)

        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Report the mean loss over the whole epoch instead of only the last
    # mini-batch; max(..., 1) keeps an empty loader from dividing by zero.
    avg_train_loss = epoch_loss / max(num_batches, 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_train_loss:.4f}")
    val_loss = evaluate_model(model, valid_loader)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'gru_best_model.pth')
        print(f"Best model saved with validation loss: {val_loss:.4f}")

Here you can get your final test accuracy.

In [None]:
# Training saves the weights with the lowest validation loss to
# 'gru_best_model.pth'. Restore them so the test metric reflects the selected
# model rather than whatever the final epoch ended on.
try:
    model.load_state_dict(torch.load('gru_best_model.pth', map_location=device))
except FileNotFoundError:
    # No checkpoint was written (e.g. training was skipped): fall back to the
    # weights currently held in memory.
    print("No saved checkpoint found; evaluating the current model weights.")

print("Evaluating on test set:")
test_model(model, test_loader)