# Sentiment Analysis on IMDB Rating
Implemented using RNN

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext
import numpy as np
import matplotlib.pyplot as plt
import datasets
import re
from collections import Counter, OrderedDict

## Preprocessing

In [2]:
#Load test and train data
train_data, test_data = datasets.load_dataset('imdb', split=['train','test'])

#Split test data into train (20k) and validate (5k)
from torch.utils.data.dataset import random_split
torch.manual_seed(1)
train_data, valid_data = random_split(list(train_data),[20000,5000])

In [3]:
def tokenizer(text):
    # Remove HTML tags
    text = re.sub('<[^>]*>', '', text)
    # Extract emoticons
    emoticons = re.findall(r'(?::|;|=)(?:-)?(?:\)|\(|D|P)', text.lower())
    # Eliminate excessive whitespace and convert text to lowercase
    text = re.sub(r'[\W]+', ' ', text.lower())
    # Append emoticons at the end, removing the "nose" for standardization
    text = text + ' ' + ' '.join(emoticons).replace('-', '')
    #Split by white space
    tokenized = text.split()
    return tokenized

In [4]:
#How many unique tokens are in the text corpus?
token_counts = Counter()
for review in train_data:
    text = review['text']
    tokens = tokenizer(text)
    token_counts.update(tokens)
print('number of tokens', len(token_counts))

number of tokens 69006


In [5]:
#Map each token to a unique integer. In reverse frequency order. 0 and 1 placeholders
#Sort counter in reverse frequency order
sorted_dict = sorted(
    token_counts.items(), key=lambda x:x[1], reverse=True
)
ordered_dict = OrderedDict(sorted_dict)

#Word_index contains word:index pairs
word_index = {}
counter = 2
for word, freq in ordered_dict.items():
    word_index[word] = counter
    counter += 1

#0 reserverd for padding. 1 reserved for unknown words
word_index['<pad>'] = 0
word_index['<unk>'] = 1

#Demonstrate encoding scheme works
def word_index_conversion(text):
    encoding = []
    tokens = tokenizer(text)
    for token in tokens:
        encoding.append(word_index.get(token,1))
    return encoding

#Testing
print(word_index_conversion("Roses are red"))
print(word_index_conversion("roSes ARE reD :)"))

[11558, 26, 736]
[11558, 26, 736, 2152]


In [6]:
def build_dataloader(batch):
    label_list, text_list, lengths = [], [], []
    for review in batch:
        text = review['text']
        label = review['label']
        label_list.append(label)
        processed_text = torch.tensor(word_index_conversion(text), dtype=torch.int64)
        text_list.append(processed_text)
        lengths.append(processed_text.size(0))
    label_list = torch.tensor(label_list)
    lengths = torch.tensor(lengths)
    #Ensure all sequence in minibatch have same length to store efficiently as tensor
    padded_text_list = nn.utils.rnn.pad_sequence(text_list, batch_first=True)
    return padded_text_list, label_list, lengths

In [7]:
#Load a small sample with batchsize of 4
from torch.utils.data import DataLoader
dataloader = DataLoader(train_data,batch_size=4,shuffle=False, collate_fn=build_dataloader)
text_batch, label_batch, length_batch = next(iter(dataloader))
#Length of text_batch is maximum in the minibatch

In [8]:
#Divide into batches of size 32
batch_size = 32
train_dl = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=build_dataloader)
valid_dl = DataLoader(valid_data, batch_size=batch_size, shuffle=True, collate_fn=build_dataloader)
test_dl = DataLoader(test_data, batch_size=batch_size, shuffle=True, collate_fn=build_dataloader)

## Dimensionality Reduction
One way to encode the index is via one hot encoding. Results in sparse feature vectors \
Suffer from curse of dimensionality \
A better approach is to map each word to a vector of fixed size with real-valued elements \
-> Advantage: Reduction in dimensionality of the feature space \
-> Extraction of salient features since the embedding layer in an NN can be optimized \
Let n be the number of unique words \
Embedding matrix is of size (n+2) x embedding_dim. Reserve 2 spots for \<unknown\> and \<pad\> \
Given integer index i, simply look up the row at index i

# RNN Models

In [11]:
# Save the trained model
def save_model(model, path='sentiment_rnn.pth'):
    torch.save(model.state_dict(), path)
    print(f"Model saved to {path}")

# Load the trained model
def load_model(model, path='sentiment_rnn.pth'):
    model.load_state_dict(torch.load(path))
    model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from {path}")
    return model


In [15]:
# Define the RNN model
class SentimentRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size, num_layers=1):
        super(SentimentRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)  # Embedding layer
        self.rnn = nn.RNN(embed_size, hidden_size, num_layers, batch_first=True, nonlinearity='tanh')
        self.fc = nn.Linear(hidden_size, output_size)  # Fully connected layer

    def forward(self, text, lengths):
        # Apply embedding layer
        embedded = self.embedding(text)
        
        # Pack padded sequence
        packed = nn.utils.rnn.pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=False)
        
        # Pass through RNN
        packed_output, hidden = self.rnn(packed)
        
        # Use the last hidden state
        hidden = hidden[-1]  # Take the last layer's hidden state
        
        # Fully connected layer
        output = self.fc(hidden)
        return output

# Model parameters
vocab_size = len(word_index)
embed_size = 128  # Size of the embedding vectors
hidden_size = 128  # Number of hidden units in RNN
output_size = 2  # Positive or negative sentiment
num_layers = 2  # Number of RNN layers

# Instantiate the model
model = SentimentRNN(vocab_size, embed_size, hidden_size, output_size, num_layers)
print(model)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Move model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = criterion.to(device)

# Training loop
def train_model(model, train_dl, valid_dl, epochs=5):
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for text, labels, lengths in train_dl:
            text, labels, lengths = text.to(device), labels.to(device), lengths.to(device)

            # Forward pass
            outputs = model(text, lengths)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        valid_loss = 0
        model.eval()
        with torch.no_grad():
            for text, labels, lengths in valid_dl:
                text, labels, lengths = text.to(device), labels.to(device), lengths.to(device)
                outputs = model(text, lengths)
                loss = criterion(outputs, labels)
                valid_loss += loss.item()
        
        print(f"Epoch {epoch + 1}/{epochs}, Training Loss: {train_loss / len(train_dl):.4f}, Validation Loss: {valid_loss / len(valid_dl):.4f}")

# Train the model
train_model(model, train_dl, valid_dl, epochs=5)

# Evaluation function
def evaluate_model(model, test_dl):
    correct, total = 0, 0
    model.eval()
    with torch.no_grad():
        for text, labels, lengths in test_dl:
            text, labels, lengths = text.to(device), labels.to(device), lengths.to(device)
            outputs = model(text, lengths)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f"Test Accuracy: {correct / total * 100:.2f}%")

# Evaluate the model on the test set
evaluate_model(model, test_dl)

SentimentRNN(
  (embedding): Embedding(69008, 128, padding_idx=0)
  (rnn): RNN(128, 128, num_layers=2, batch_first=True)
  (fc): Linear(in_features=128, out_features=2, bias=True)
)
Epoch 1/5, Training Loss: 0.6422, Validation Loss: 0.5808
Epoch 2/5, Training Loss: 0.5759, Validation Loss: 0.5958
Epoch 3/5, Training Loss: 0.5442, Validation Loss: 0.6860
Epoch 4/5, Training Loss: 0.5175, Validation Loss: 0.5292
Epoch 5/5, Training Loss: 0.5237, Validation Loss: 0.6161
Test Accuracy: 69.87%


In [16]:
save_model(model, "RNN 2 Layers Embedding 128 Hidden 128 Epochs 5")

Model saved to RNN 2 Layers Embedding 128 Hidden 128 Epochs 5


In [None]:
# Define the RNN model
class SentimentRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size, num_layers=1, dropout=0.5):
        super(SentimentRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)  # Embedding layer
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)  # Fully connected layer
        self.dropout = nn.Dropout(dropout)  # Regularization

    def forward(self, text, lengths):
        # Apply embedding layer
        embedded = self.embedding(text)
        
        # Pack padded sequence
        packed = nn.utils.rnn.pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=False)
        
        # Pass through LSTM
        packed_output, (hidden, _) = self.lstm(packed)
        
        # Use the last hidden state
        hidden = hidden[-1]  # Take the last layer's hidden state
        
        # Apply dropout and fully connected layer
        output = self.fc(self.dropout(hidden))
        return output

# Model parameters
vocab_size = len(word_index)
embed_size = 100  # Size of the embedding vectors
hidden_size = 128  # Number of hidden units in LSTM
output_size = 2  # Positive or negative sentiment
num_layers = 1  # Number of LSTM layers
dropout = 0.5  # Dropout probability

# Instantiate the model
model = SentimentRNN(vocab_size, embed_size, hidden_size, output_size, num_layers, dropout)
print(model)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Move model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = criterion.to(device)

# Training loop
def train_model(model, train_dl, valid_dl, epochs=5):
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for text, labels, lengths in train_dl:
            text, labels, lengths = text.to(device), labels.to(device), lengths.to(device)

            # Forward pass
            outputs = model(text, lengths)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        valid_loss = 0
        model.eval()
        with torch.no_grad():
            for text, labels, lengths in valid_dl:
                text, labels, lengths = text.to(device), labels.to(device), lengths.to(device)
                outputs = model(text, lengths)
                loss = criterion(outputs, labels)
                valid_loss += loss.item()
        
        print(f"Epoch {epoch + 1}/{epochs}, Training Loss: {train_loss / len(train_dl):.4f}, Validation Loss: {valid_loss / len(valid_dl):.4f}")

# Train the model
train_model(model, train_dl, valid_dl, epochs=5)

# Evaluation function
def evaluate_model(model, test_dl):
    correct, total = 0, 0
    model.eval()
    with torch.no_grad():
        for text, labels, lengths in test_dl:
            text, labels, lengths = text.to(device), labels.to(device), lengths.to(device)
            outputs = model(text, lengths)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f"Test Accuracy: {correct / total * 100:.2f}%")

# Evaluate the model on the test set
evaluate_model(model, test_dl)




SentimentRNN(
  (embedding): Embedding(69008, 100, padding_idx=0)
  (lstm): LSTM(100, 128, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=128, out_features=2, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)
Epoch 1/5, Training Loss: 0.5929, Validation Loss: 0.5373
Epoch 2/5, Training Loss: 0.5187, Validation Loss: 0.4915
Epoch 3/5, Training Loss: 0.4405, Validation Loss: 0.5174
Epoch 4/5, Training Loss: 0.4164, Validation Loss: 0.4556
