# Text Classification

## RNN & LSTM

In this project, I will implement, train, and evaluate Recurrent Neural Networks (RNNs) and Long Short-Term Memory (LSTM) models on a text classification task using a dataset of IMDB movie reviews, and compare them.

In [None]:
import nltk
nltk.download('stopwords')
import random
import re
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk import wordpunct_tokenize
from tqdm import tqdm
from sklearn.metrics import classification_report
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import DataLoader
from IPython.core.display import display, HTML
# Enable tqdm's pandas integration for this session.
tqdm.pandas()

# Prefer the GPU when CUDA is available; otherwise run everything on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device  # bare expression so the notebook echoes the selected device

# Dataset

In this section, we’ll load the IMDB dataset and preprocess the data to make it suitable for training RNN and LSTM models.

## Load Dataset
Description of Dataset: The IMDB movie reviews dataset consists of reviews along with their labels (positive or negative sentiment). Each review is a sentence or paragraph of text.

Download the Dataset: We will use a Google Drive link to download the dataset into our environment.

In [None]:
# Path of the reviews CSV that the preprocessing step reads.
DATA_PATH = 'data/imdb_reviews.csv'
# The download step below is disabled.
# NOTE(review): `gdd` is not imported in the visible code, so the CSV is
# presumably expected to already exist at DATA_PATH — confirm.
# gdd.download_file_from_google_drive(file_id='1zfM5E6HvKIe7f3rEt1V2gBpw5QOSSKQz',dest_path=DATA_PATH,)

## Preprocessing

For our models to work effectively, we need to preprocess the text data by cleaning it and converting words to integer indices for training. The preprocessing steps are: tokenization and cleaning, replacing rare words, building the vocabulary, converting tokens to indices, and preparing the data for training.

In [None]:
def tokenize(text, stop_words):
    """Turn raw review text into a list of lower-case, stop-word-free tokens.

    Every character that is neither a word character nor whitespace is
    deleted first (so "don't" becomes "dont"), the result is lower-cased and
    split with ``wordpunct_tokenize``, and tokens found in ``stop_words`` are
    dropped.

    Args:
        text: The raw review string.
        stop_words: Container of tokens to discard (membership-tested).

    Returns:
        The remaining tokens, in their original order.
    """
    cleaned = re.sub(r'[^\w\s]', '', text).lower()
    return [word for word in wordpunct_tokenize(cleaned) if word not in stop_words]

In [None]:
def remove_rare_words(tokens, common_tokens, max_len):
    """Keep the last ``max_len`` tokens and replace out-of-vocabulary ones.

    Args:
        tokens: Iterable of token strings for one review.
        common_tokens: Container of in-vocabulary tokens (ideally a ``set``
            so membership tests are O(1)).
        max_len: Maximum number of tokens to keep. The *trailing* tokens of
            the review are the ones retained.

    Returns:
        A new list of at most ``max_len`` tokens in which every token that is
        not in ``common_tokens`` has been replaced by ``'<UNK>'``. An empty
        list is returned when ``max_len`` is zero or negative.
    """
    # Guard explicitly: `seq[-0:]` is `seq[0:]`, i.e. the *whole* sequence,
    # so a zero budget would otherwise keep everything.
    if max_len <= 0:
        return []
    # Truncate first so membership tests only run on tokens that survive.
    tail = list(tokens)[-max_len:]
    return [token if token in common_tokens else '<UNK>' for token in tail]

In [None]:
def load_and_preprocess_data(data_path, max_vocab, max_len):
    """Load the reviews CSV and convert every review to a list of token ids.

    The CSV must provide a ``review`` column (raw text) and a ``label``
    column. Each review is tokenised with ``tokenize``, restricted to the
    ``max_vocab`` most frequent tokens (others become ``'<UNK>'``) and cut to
    its last ``max_len`` tokens by ``remove_rare_words``. Reviews that end up
    empty or made only of ``'<UNK>'`` are dropped.

    Args:
        data_path: Path of the CSV file.
        max_vocab: Number of most frequent tokens to keep in the vocabulary.
        max_len: Maximum number of tokens kept per review.

    Returns:
        A tuple ``(sequences, labels, token2idx)`` where ``sequences`` is a
        list of lists of token ids, ``labels`` is the matching list taken
        from the ``label`` column, and ``token2idx`` maps each token to its
        id. ``token2idx`` always contains ``'<UNK>'`` and ``'<PAD>'``;
        ``'<PAD>'`` has the highest id.
    """
    df = pd.read_csv(data_path)
    stop_words = set(stopwords.words('english'))

    df['tokens'] = df['review'].apply(lambda text: tokenize(text, stop_words))

    # Count per review instead of materialising one flat list of all tokens.
    token_counts = Counter()
    for tokens in df['tokens']:
        token_counts.update(tokens)
    # A set comprehension also works when the corpus yields no tokens at all
    # (the zip(*...)[0] idiom raises IndexError in that case).
    common_tokens = {token for token, _ in token_counts.most_common(max_vocab)}
    df['tokens'] = df['tokens'].apply(
        lambda tokens: remove_rare_words(tokens, common_tokens, max_len))

    # Drop reviews with no informative token left. `.copy()` makes the result
    # an independent frame so the column assignment below is unambiguous.
    has_known_token = df['tokens'].apply(
        lambda tokens: any(token != '<UNK>' for token in tokens))
    df = df[has_known_token].copy()

    # Always reserve an id for '<UNK>': inference code looks it up even when
    # no training token happened to be rare. When '<UNK>' already occurs in
    # the data this leaves the id assignment unchanged.
    vocab = sorted({token for tokens in df['tokens'] for token in tokens} | {'<UNK>'})
    token2idx = {token: idx for idx, token in enumerate(vocab)}
    token2idx['<PAD>'] = len(token2idx)

    df['indexed_tokens'] = df['tokens'].apply(
        lambda tokens: [token2idx[token] for token in tokens])

    return df['indexed_tokens'].tolist(), df['label'].tolist(), token2idx

In [None]:
# Only the `max_vocab` most frequent tokens keep their identity; every other
# token is replaced by '<UNK>' during preprocessing.
max_vocab = 2500

# Upper bound on tokens kept per review (the trailing tokens are retained).
max_len = 100

# Runs the full pipeline: read CSV, tokenise, build vocabulary, index tokens.
sequences, targets, token2idx = load_and_preprocess_data(DATA_PATH, max_vocab, max_len)


In [None]:
def split_data(sequences, targets, valid_ratio=0.05, test_ratio=0.05,
               shuffle=False, seed=None):
    """Split parallel ``sequences``/``targets`` into train, validation and test.

    The data is cut in order: the first part is the training set, followed by
    the validation set and finally the test set.

    Args:
        sequences: Indexable collection of examples.
        targets: Labels aligned with ``sequences`` (same length).
        valid_ratio: Fraction of the data used for validation.
        test_ratio: Fraction of the data used for testing.
        shuffle: When true, examples are permuted (jointly with their labels)
            before splitting. Defaults to ``False``, which preserves the
            original in-order behaviour. Shuffling is advisable when the
            source file may be ordered by label, because an in-order split
            then yields unrepresentative validation/test sets.
        seed: Seed for the shuffle, for reproducible splits.

    Returns:
        ``(train_sequences, train_targets, valid_sequences, valid_targets,
        test_sequences, test_targets)``.

    Raises:
        ValueError: If the inputs differ in length, a ratio lies outside
            ``[0, 1]``, or the two ratios leave a negative training size.
    """
    total_size = len(sequences)
    if len(targets) != total_size:
        raise ValueError(
            f"sequences and targets differ in length: {total_size} vs {len(targets)}")
    if not (0 <= valid_ratio <= 1 and 0 <= test_ratio <= 1):
        raise ValueError("valid_ratio and test_ratio must each lie in [0, 1]")

    test_size = int(total_size * test_ratio)
    valid_size = int(total_size * valid_ratio)
    train_size = total_size - valid_size - test_size
    # A negative size would turn the slices below into overlapping
    # "from the end" slices and silently leak examples across splits.
    if train_size < 0:
        raise ValueError("valid_ratio + test_ratio must not exceed 1")

    if shuffle:
        order = list(range(total_size))
        random.Random(seed).shuffle(order)
        sequences = [sequences[i] for i in order]
        targets = [targets[i] for i in order]

    valid_end = train_size + valid_size
    return (sequences[:train_size], targets[:train_size],
            sequences[train_size:valid_end], targets[train_size:valid_end],
            sequences[valid_end:], targets[valid_end:])

In [None]:
# Default ratios: 5% validation, 5% test, remaining 90% training (split in file order).
train_sequences, train_targets, valid_sequences, valid_targets, test_sequences, test_targets = split_data(sequences, targets)

In [None]:
def collate(batch):
    """Assemble a list of ``(sequence, target)`` pairs into two tensors.

    Sequences are padded (using ``pad_sequences`` defaults, i.e. on the
    right) with the ``'<PAD>'`` id from the module-level ``token2idx`` so
    that every row of the batch has the same length.

    Args:
        batch: Iterable of ``(sequence, target)`` pairs.

    Returns:
        ``(inputs, targets)`` as ``torch.LongTensor`` objects.
    """
    sequences, labels = zip(*batch)
    pad_id = token2idx['<PAD>']
    padded = pad_sequences(sequences, padding_val=pad_id)
    return torch.LongTensor(padded), torch.LongTensor(labels)

In [None]:
def pad_sequences(sequences, padding_val=0, pad_left=False):
    """Pad a collection of sequences to the length of the longest one.

    Args:
        sequences: Iterable of sequences (lists, tuples, ...).
        padding_val: Value used to fill the missing positions.
        pad_left: When true, padding is inserted before the sequence;
            otherwise (default) it is appended after it.

    Returns:
        A list of new lists, all of the same length. An empty input yields an
        empty list. The inputs are never modified.
    """
    # Copy into lists: tuples (or other sequence types) cannot be
    # concatenated with the list of padding values.
    rows = [list(sequence) for sequence in sequences]
    if not rows:
        # max() of an empty collection would raise ValueError.
        return []
    target_len = max(len(row) for row in rows)
    if pad_left:
        return [[padding_val] * (target_len - len(row)) + row for row in rows]
    return [row + [padding_val] * (target_len - len(row)) for row in rows]

In [None]:
batch_size = 256
# Pair every index sequence with its label so each dataset item is a
# (sequence, target) tuple, which is the shape `collate` unpacks.
train_data = list(zip(train_sequences, train_targets))
valid_data = list(zip(valid_sequences, valid_targets))
test_data = list(zip(test_sequences, test_targets))

# Only the training loader shuffles; all three pad per batch through `collate`.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collate)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False, collate_fn=collate)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate)

# RNN

## RNN with nn.RNN
Implement a basic RNN model using PyTorch's built-in nn.RNN.

Layers: embedding, RNN, and fully connected.

In [None]:
class RNNClassifier(nn.Module):
    """Sentiment classifier: embedding -> ``nn.RNN`` -> linear layer.

    Inputs are batches of token ids padded on the right with ``padding_idx``.
    The classifier uses the hidden state of the top RNN layer taken at the
    last real token of every sequence, so trailing padding does not influence
    the prediction.

    Args:
        output_size: Number of classes (size of the logit vector).
        hidden_size: Width of the RNN hidden state.
        vocab_size: Number of rows of the embedding table.
        device: Stored on the instance for reference; the module itself is
            moved with ``.to(...)`` by the caller.
        n_layers: Number of stacked RNN layers.
        embedding_dimension: Size of each token embedding.
        padding_idx: Id of the padding token. When omitted, the id of
            ``'<PAD>'`` in the module-level ``token2idx`` is used.
    """

    def __init__(self, output_size, hidden_size, vocab_size,
                 device, n_layers=1,
                 embedding_dimension=50, padding_idx=None):
        super().__init__()
        if padding_idx is None:
            # Backward-compatible default: the notebook-level vocabulary.
            padding_idx = token2idx['<PAD>']
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.device = device
        self.embedding = nn.Embedding(vocab_size, embedding_dimension, padding_idx=padding_idx)
        self.rnn = nn.RNN(input_size=embedding_dimension,
                          hidden_size=hidden_size,
                          num_layers=n_layers,
                          batch_first=True,
                          nonlinearity='tanh')
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, inputs):
        """Return class logits of shape ``(batch, output_size)``.

        Args:
            inputs: ``(batch, seq_len)`` integer tensor of token ids, padded
                on the right with the padding id.
        """
        embedded = self.embedding(inputs)
        # Without packing, the final hidden state is the one reached *after*
        # the RNN has consumed all trailing padding steps, which washes out
        # the signal of short reviews. Packing stops each sequence at its
        # true length. Rows made only of padding are treated as length 1,
        # because packing requires strictly positive lengths.
        lengths = (inputs != self.embedding.padding_idx).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False)
        _, hidden = self.rnn(packed)
        # hidden[-1] is the last layer's final state for every sequence.
        return self.fc(hidden[-1])  # logits for each class in the output.

### Train model









In [None]:
def _run_batches(model, batches, criterion, device, optimizer=None):
    """Run one pass over ``batches`` and return ``(mean_loss, accuracy)``.

    When ``optimizer`` is given, every batch performs a gradient step;
    otherwise the batches are only scored.
    """
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    for inputs, targets in batches:
        inputs, targets = inputs.to(device), targets.to(device)
        if optimizer is not None:
            optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        if optimizer is not None:
            loss.backward()
            optimizer.step()

        # The criterion value is weighted by the batch size so that the final
        # division yields a per-example average.
        loss_sum += loss.item() * inputs.size(0)
        predicted = torch.max(outputs, 1)[1]
        n_correct += (predicted == targets).sum().item()
        n_seen += targets.size(0)
    return loss_sum / n_seen, n_correct / n_seen


def train_model(model, train_loader, valid_loader, criterion, optimizer, device, num_epochs=10):
    """Train ``model`` and report loss/accuracy on both loaders every epoch.

    Args:
        model: Module mapping a batch of inputs to class logits.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        valid_loader: Iterable of ``(inputs, targets)`` validation batches.
        criterion: Loss callable applied to ``(logits, targets)``.
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the model and every batch are moved to.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        The same ``model`` object, after training.
    """
    model.to(device)
    for epoch in range(1, num_epochs + 1):
        model.train()
        train_loss, train_acc = _run_batches(
            model, tqdm(train_loader, desc=f"Training Epoch {epoch}"),
            criterion, device, optimizer)

        # Validation
        model.eval()
        with torch.no_grad():
            val_loss, val_acc = _run_batches(
                model, tqdm(valid_loader, desc=f"Validation Epoch {epoch}"),
                criterion, device)

        print(f"Epoch {epoch}: Train Loss={train_loss:.4f}, Train Acc={train_acc:.4f}, Val Loss={val_loss:.4f}, Val Acc={val_acc:.4f}")
    return model


In [None]:
# Parameters
output_size = 2  # two classes: negative (0) and positive (1)
hidden_size = 128  # width of the recurrent hidden state (tunable)
vocab_size = len(token2idx)  # counts every vocabulary entry, including '<PAD>'
n_layers = 1
embedding_dimension = 50

rnn_model = RNNClassifier(output_size=output_size,
                          hidden_size=hidden_size,
                          vocab_size=vocab_size,
                          device=device,
                          n_layers=n_layers,
                          embedding_dimension=embedding_dimension)

# Cross-entropy over the two logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(rnn_model.parameters(), lr=0.001)

# `num_epochs` is reused by the later LSTM training cells.
num_epochs = 10
rnn_model = train_model(rnn_model, train_loader, valid_loader, criterion, optimizer, device, num_epochs=num_epochs)

## RNN from Scratch
Implement an RNN from scratch by creating a custom RNN cell and a model that stacks these cells over time.

In [None]:
class CustomRNNCell(nn.Module):
    """A single Elman RNN step: ``h' = tanh(W_ih x + b_ih + W_hh h + b_hh)``.

    Args:
        input_size: Number of features of the step input.
        hidden_size: Number of features of the hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.input2hidden = nn.Linear(input_size, hidden_size)
        self.hidden2hidden = nn.Linear(hidden_size, hidden_size)

    def forward(self, input, hidden):
        """Return the next hidden state for one time step.

        Args:
            input: Tensor whose last dimension is ``input_size``.
            hidden: Previous hidden state, last dimension ``hidden_size``.
        """
        from_input = self.input2hidden(input)
        from_state = self.hidden2hidden(hidden)
        return torch.tanh(from_input + from_state)

In [None]:
class CustomRNN(nn.Module):
    """Sentiment classifier built on a hand-written RNN cell.

    The model embeds the token ids, unrolls ``CustomRNNCell`` over the time
    axis and classifies from the final hidden state. Padding positions leave
    the hidden state untouched, so padded and unpadded versions of the same
    review produce the same logits.

    Args:
        vocab_size: Number of rows of the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_size: Width of the hidden state.
        output_size: Number of classes.
        padding_idx: Id of the padding token. When omitted, the id of
            ``'<PAD>'`` in the module-level ``token2idx`` is used.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size,
                 padding_idx=None):
        super().__init__()
        if padding_idx is None:
            # Backward-compatible default: the notebook-level vocabulary.
            padding_idx = token2idx['<PAD>']
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.rnn_cell = CustomRNNCell(input_size=embedding_dim, hidden_size=hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, inputs):
        """Return class logits of shape ``(batch, output_size)``.

        Args:
            inputs: ``(batch, seq_len)`` integer tensor of token ids.
        """
        embedded = self.embedding(inputs)
        batch_size, seq_length, _ = embedded.size()
        # new_zeros keeps the state on the same device *and* dtype as the
        # embeddings (relevant if the model is cast to another precision).
        hidden = embedded.new_zeros(batch_size, self.hidden_size)
        # (batch, seq_len, 1) mask, true where the position holds a real token.
        is_token = (inputs != self.embedding.padding_idx).unsqueeze(-1)

        for t in range(seq_length):
            candidate = self.rnn_cell(embedded[:, t, :], hidden)
            # Skip the update on padding steps; otherwise the final state
            # would be the one reached after consuming the padding.
            hidden = torch.where(is_token[:, t], candidate, hidden)

        return self.fc(hidden)  # logits for each class in the output.

### Train model

In [None]:
# Same hyper-parameters as the built-in RNN so the two models are comparable.
output_size = 2 
hidden_size = 128 
vocab_size = len(token2idx)
n_layers = 1  # not consumed by CustomRNN, which takes no layer-count argument
embedding_dimension = 50

custom_rnn_model = CustomRNN(vocab_size=vocab_size,
                             embedding_dim=embedding_dimension,
                             hidden_size=hidden_size,
                             output_size=output_size)

criterion_custom_rnn = nn.CrossEntropyLoss()
optimizer_custom_rnn = optim.Adam(custom_rnn_model.parameters(), lr=0.001)

# No num_epochs argument here, so train_model's default (10) applies.
custom_rnn_model = train_model(custom_rnn_model, train_loader, valid_loader, criterion_custom_rnn, optimizer_custom_rnn, device)

### Evaluate RNN models on test set

In [None]:
def evaluate_on_test(model, test_loader, device=None):
    """Print a classification report for ``model`` on ``test_loader``.

    Args:
        model: Module mapping a batch of inputs to class logits.
        test_loader: Iterable of ``(inputs, targets)`` batches.
        device: Device the batches are moved to. Defaults to the device of
            the model's first parameter (CPU for a parameter-free model), so
            the function does not depend on a module-level ``device``.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    y_true_test = []
    y_pred_test = []
    with torch.no_grad():
        for inputs, targets in tqdm(test_loader, desc="Evaluating on Test Set"):
            outputs = model(inputs.to(device))
            _, predicted = torch.max(outputs, 1)
            y_true_test.extend(targets.tolist())
            y_pred_test.extend(predicted.tolist())
    # `labels` pins the report to both classes. Without it, a test slice that
    # happens to contain a single class makes sklearn reject the two
    # target_names with a ValueError.
    print(classification_report(y_true_test, y_pred_test,
                                labels=[0, 1],
                                target_names=['Negative', 'Positive']))

In [None]:
# Report precision/recall/F1 on the held-out test split for both RNN variants.
print("Evaluating Built-in RNN Model on Test Set:")
evaluate_on_test(rnn_model, test_loader)

print("\nEvaluating Custom RNN Model on Test Set:")
evaluate_on_test(custom_rnn_model, test_loader)

# LSTM

## LSTM with nn.LSTM
Define an LSTM model using PyTorch's built-in nn.LSTM.

In [None]:
class LSTMClassifier(nn.Module):
    """Sentiment classifier: embedding -> ``nn.LSTM`` -> linear layer.

    Inputs are batches of token ids padded on the right with ``padding_idx``.
    Sequences are packed by their true length, so the final hidden state(s)
    used for classification are not affected by padding. This matters for
    both directions of a bidirectional LSTM, whose backward pass would
    otherwise start on padding.

    Args:
        output_size: Number of classes.
        hidden_size: Width of the LSTM hidden state (per direction).
        vocab_size: Number of rows of the embedding table.
        device: Stored on the instance for reference; the module itself is
            moved with ``.to(...)`` by the caller.
        bidirectional: Whether to run the LSTM in both directions.
        n_layers: Number of stacked LSTM layers.
        embedding_dimension: Size of each token embedding.
        padding_idx: Id of the padding token. When omitted, the id of
            ``'<PAD>'`` in the module-level ``token2idx`` is used.
    """

    def __init__(self, output_size, hidden_size, vocab_size,
                 device, bidirectional=False, n_layers=1,
                 embedding_dimension=50, padding_idx=None):
        super().__init__()
        if padding_idx is None:
            # Backward-compatible default: the notebook-level vocabulary.
            padding_idx = token2idx['<PAD>']
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.device = device
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        self.embedding = nn.Embedding(vocab_size, embedding_dimension, padding_idx=padding_idx)
        self.lstm = nn.LSTM(input_size=embedding_dimension,
                            hidden_size=hidden_size,
                            num_layers=n_layers,
                            bidirectional=bidirectional,
                            batch_first=True)
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)

    def forward(self, inputs):
        """Return class logits of shape ``(batch, output_size)``.

        Args:
            inputs: ``(batch, seq_len)`` integer tensor of token ids, padded
                on the right with the padding id.
        """
        embedded = self.embedding(inputs)
        # Real length of every row; all-padding rows count as length 1
        # because packing requires strictly positive lengths.
        lengths = (inputs != self.embedding.padding_idx).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False)
        # No explicit (h0, c0): nn.LSTM starts from zeros when omitted.
        _, (hn, _) = self.lstm(packed)
        if self.bidirectional:
            # hn is laid out as (n_layers * num_directions, batch, hidden);
            # separate the layer and direction axes to pick the top layer.
            hn = hn.reshape(self.n_layers, self.num_directions, inputs.size(0), self.hidden_size)
            hn_forward = hn[-1, 0, :, :]
            hn_backward = hn[-1, 1, :, :]
            out = self.fc(torch.cat((hn_forward, hn_backward), dim=1))
        else:
            out = self.fc(hn[-1])
        return out

### Train model


In [None]:
# Reuses the hyper-parameters defined in the RNN cells; unidirectional LSTM.
lstm_model = LSTMClassifier(output_size=output_size,
                            hidden_size=hidden_size,
                            vocab_size=vocab_size,
                            device=device,
                            bidirectional=False,
                            n_layers=n_layers,
                            embedding_dimension=embedding_dimension)

criterion_lstm = nn.CrossEntropyLoss()
optimizer_lstm = optim.Adam(lstm_model.parameters(), lr=0.001)

lstm_model = train_model(lstm_model, train_loader, valid_loader, criterion_lstm, optimizer_lstm, device, num_epochs=num_epochs)

## Custom LSTM from Scratch
Implement an LSTM from scratch by defining an LSTM cell and a model that applies this cell at every step of the sequence.

In [None]:
class CustomLSTMCell(nn.Module):
    """A single LSTM step with one linear layer per gate.

    Every gate reads the concatenation of the step input and the previous
    hidden state.

    Args:
        input_size: Number of features of the step input.
        hidden_size: Number of features of the hidden and cell states.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        gate_in = input_size + hidden_size
        self.input_gate = nn.Linear(gate_in, hidden_size)
        self.forget_gate = nn.Linear(gate_in, hidden_size)
        self.output_gate = nn.Linear(gate_in, hidden_size)
        self.cell_gate = nn.Linear(gate_in, hidden_size)

    def forward(self, input, hidden, cell_state):
        """Advance one time step.

        Args:
            input: ``(batch, input_size)`` step input.
            hidden: ``(batch, hidden_size)`` previous hidden state.
            cell_state: ``(batch, hidden_size)`` previous cell state.

        Returns:
            ``(new_hidden, new_cell_state)``.
        """
        stacked = torch.cat((input, hidden), dim=1)
        write = torch.sigmoid(self.input_gate(stacked))      # how much new content to store
        keep = torch.sigmoid(self.forget_gate(stacked))      # how much old memory to retain
        expose = torch.sigmoid(self.output_gate(stacked))    # how much memory to reveal
        candidate = torch.tanh(self.cell_gate(stacked))      # proposed new content
        next_cell = keep * cell_state + write * candidate
        next_hidden = expose * torch.tanh(next_cell)
        return next_hidden, next_cell

In [None]:
class CustomLSTM(nn.Module):
    """Sentiment classifier built on a hand-written LSTM cell.

    The model embeds the token ids, unrolls ``CustomLSTMCell`` over the time
    axis and classifies from the final hidden state. Padding positions leave
    both the hidden and the cell state untouched, so padded and unpadded
    versions of the same review produce the same logits.

    Args:
        vocab_size: Number of rows of the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_size: Width of the hidden and cell states.
        output_size: Number of classes.
        padding_idx: Id of the padding token. When omitted, the id of
            ``'<PAD>'`` in the module-level ``token2idx`` is used.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size,
                 padding_idx=None):
        super().__init__()
        if padding_idx is None:
            # Backward-compatible default: the notebook-level vocabulary.
            padding_idx = token2idx['<PAD>']
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.lstm_cell = CustomLSTMCell(input_size=embedding_dim, hidden_size=hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, inputs):
        """Return class logits of shape ``(batch, output_size)``.

        Args:
            inputs: ``(batch, seq_len)`` integer tensor of token ids.
        """
        embedded = self.embedding(inputs)
        batch_size, seq_length, _ = embedded.size()
        # new_zeros keeps both states on the embeddings' device and dtype.
        hidden = embedded.new_zeros(batch_size, self.hidden_size)
        cell_state = embedded.new_zeros(batch_size, self.hidden_size)
        # (batch, seq_len, 1) mask, true where the position holds a real token.
        is_token = (inputs != self.embedding.padding_idx).unsqueeze(-1)

        for t in range(seq_length):
            next_hidden, next_cell = self.lstm_cell(embedded[:, t, :], hidden, cell_state)
            # Freeze both states on padding steps; otherwise the final state
            # would be the one reached after consuming the padding.
            step_mask = is_token[:, t]
            hidden = torch.where(step_mask, next_hidden, hidden)
            cell_state = torch.where(step_mask, next_cell, cell_state)

        return self.fc(hidden)

### Train model


In [None]:
# Same hyper-parameters as the built-in LSTM so the two models are comparable.
custom_lstm_model = CustomLSTM(vocab_size=vocab_size,
                               embedding_dim=embedding_dimension,
                               hidden_size=hidden_size,
                               output_size=output_size)

criterion_custom_lstm = nn.CrossEntropyLoss()
optimizer_custom_lstm = optim.Adam(custom_lstm_model.parameters(), lr=0.001)

custom_lstm_model = train_model(custom_lstm_model, train_loader, valid_loader, criterion_custom_lstm, optimizer_custom_lstm, device, num_epochs=num_epochs)


### Evaluate LSTM models on test set

In [None]:
# Report precision/recall/F1 on the held-out test split for both LSTM variants.
print("Evaluating Built-in LSTM Model on Test Set:")
evaluate_on_test(lstm_model, test_loader)

print("\nEvaluating Custom LSTM Model on Test Set:")
evaluate_on_test(custom_lstm_model, test_loader)

## Testing RNN and LSTM Models on a New Review

In [None]:
# Example review: a free-text sample used to sanity-check the four trained
# models on input that does not come from the CSV.
review = "It is no wonder that the film has such a high rating, it is quite literally breathtaking. What can I say that hasn't said before? Not much, it's the story, the acting, the premise, but most of all, this movie is about how it makes you feel. Sometimes you watch a film, and can't remember it days later, this film loves with you, once you've seen it, you don't forget."

### Preprocess the test Review
To prepare the review for the model, we need to follow similar preprocessing steps as we did for the dataset:

Remove special characters and convert the text to lowercase.

Tokenize the text into individual words.

Remove stopwords to focus only on meaningful words.

Convert tokens to indices based on the token2idx dictionary created earlier.

Pad or truncate the sequence to a length of max_len.


In [None]:
import torch
import re
from nltk import wordpunct_tokenize
from nltk.corpus import stopwords

def preprocess_text(text, stop_words, token2idx, max_len):
    """Convert one raw review into exactly ``max_len`` token ids.

    The steps mirror the training pipeline: strip punctuation, lower-case,
    tokenise, drop stop words, map unknown tokens to ``'<UNK>'`` and keep the
    *last* ``max_len`` tokens. Shorter reviews are padded on the right with
    ``'<PAD>'``.

    Args:
        text: The raw review string.
        stop_words: Container of tokens to discard.
        token2idx: Mapping from token to integer id.
        max_len: Length of the returned list.

    Returns:
        A list of ``max_len`` token ids (empty when ``max_len <= 0``).

    Raises:
        KeyError: If the review contains an out-of-vocabulary token and
            ``token2idx`` has no ``'<UNK>'`` entry, or if padding is needed
            and ``token2idx`` has no ``'<PAD>'`` entry.
    """
    text = re.sub(r'[^\w\s]', '', text).lower()
    tokens = [token for token in wordpunct_tokenize(text) if token not in stop_words]

    # Look '<UNK>' up once and lazily: evaluating token2idx['<UNK>'] as the
    # .get() default would raise even when every token is in the vocabulary.
    unk_idx = token2idx.get('<UNK>')
    tokens_idx = []
    for token in tokens:
        idx = token2idx.get(token, unk_idx)
        if idx is None:
            raise KeyError(f"token {token!r} is out of vocabulary and token2idx has no '<UNK>' entry")
        tokens_idx.append(idx)

    if max_len <= 0:
        return []
    # Keep the trailing tokens, matching the `[-max_len:]` truncation applied
    # to the training reviews; keeping the leading ones would show the models
    # a different part of long reviews than they were trained on.
    tokens_idx = tokens_idx[-max_len:]
    missing = max_len - len(tokens_idx)
    if missing > 0:
        tokens_idx = tokens_idx + [token2idx['<PAD>']] * missing
    return tokens_idx

# Same English stop-word list as the one used when preprocessing the dataset.
stop_words = set(stopwords.words('english'))
review_indices = preprocess_text(review, stop_words, token2idx, max_len)
print(f"Processed review indices: {review_indices}")
# The extra list level gives the tensor a leading batch dimension of size 1.
input_tensor = torch.LongTensor([review_indices]).to(device)

### Make Predictions
Now that we have preprocessed the review, use both the RNN and LSTM models to make predictions on the sentiment of the review.

Set the model to evaluation mode and disable gradient tracking, since no parameters are updated during inference.
Predict the sentiment class by passing the input_tensor to the model.
Interpret the prediction as either "Positive" or "Negative" based on the model's output.

In [None]:
def predict_sentiment(model, input_tensor, model_name="Model"):
    """Print the sentiment ``model`` predicts for a single encoded review.

    Args:
        model: Module mapping a ``(1, seq_len)`` batch of token ids to a
            ``(1, 2)`` tensor of logits, where index 1 means "Positive".
        input_tensor: The encoded review, with a batch dimension of size 1.
        model_name: Label used in the printed message.
    """
    model.eval()
    # Run on whatever device the model lives on instead of relying on a
    # module-level `device`; a parameter-free model leaves the input in place.
    first_param = next(model.parameters(), None)
    with torch.no_grad():
        if first_param is not None:
            input_tensor = input_tensor.to(first_param.device)
        outputs = model(input_tensor)
        _, predicted = torch.max(outputs, 1)
        # .item() requires exactly one prediction, i.e. a batch of size 1.
        class_label = "Positive" if predicted.item() == 1 else "Negative"
    print(f"The predicted class for the review by {model_name} is: {class_label}")

# Run the same encoded review through all four trained models for comparison.
predict_sentiment(rnn_model, input_tensor, model_name="Built-in RNN")
predict_sentiment(custom_rnn_model, input_tensor, model_name="Custom RNN")
predict_sentiment(lstm_model, input_tensor, model_name="Built-in LSTM")
predict_sentiment(custom_lstm_model, input_tensor, model_name="Custom LSTM")