<a href="https://colab.research.google.com/github/Cpt-Shaan/deep-stuff/blob/main/Sequential-Models/sentiment_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Sentiment analysis on the IMDB movie-review dataset using an LSTM network (configured below with two stacked layers).

Setting Device

In [None]:
import torch
# Prefer the GPU when PyTorch can see one; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

Reading CSV File and storing reviews and labels.

In [None]:
import csv

reviews = []
labels = []
# newline="" lets the csv module itself deal with line breaks embedded in quoted
# review text, and an explicit encoding keeps the read independent of the
# platform's default codec.
with open("IMDB Dataset.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    next(reader, None)  # Skip the header row (no error if the file is empty)
    for row in reader:
        # A usable row has at least (review, sentiment); anything shorter is malformed.
        if len(row) > 1:
            reviews.append(row[0])
            labels.append(row[1])
        else:
            # Show only a short prefix so one broken row cannot flood the cell output.
            print(f"Skipping row: {str(row)[:100]}...")

Skipping row: ["I think that the basic idea of any movie is to entertain or to inform. If you want information you are looking at true life movies and historical movies. Sometimes these are one of the same. The other side of the coin is to entertain. Did Hitch entertain me? Yes it did. Okay the formula is standard. Boy meets girl or in this case boys met girls. They get together have a falling out then get back together. However the way it happened in this movie was refreshing. I particularly liked the bar scene with Hitch and Sara. The Allegra Albert romance was a delight to watch unfold, most REAL men are shy when it comes to wooing the woman of their dreams and had I had Hitch's advice I would probably have got my wife up the altar in half the time.I read the first comment on this film that appeared to suggest that this movie was played safely and good have had a few more laughs. I tend to disagree there are so many laughs you can pack into a romantic comedy without turning it into 

In [None]:
# Sanity check: both lists should report the same number of rows.
print(len(reviews), len(labels), sep="\n")

14209
14209


In [None]:
from string import punctuation

Lowercasing the text and removing punctuation as pre-processing.

In [None]:
def preprocess_text(text):
    """Return ``text`` lower-cased with every ``string.punctuation`` character removed."""
    # Translation table that maps each punctuation character to "delete".
    strip_table = str.maketrans("", "", punctuation)
    return text.lower().translate(strip_table)

# preprocess_text returns a new string (str is immutable), so the cleaned text
# must be stored back; calling it and dropping the result leaves `reviews`
# untouched. The step is idempotent, so re-running this cell is safe.
reviews = [preprocess_text(review) for review in reviews]


Building the vocabulary: every word is assigned an integer ID (the most frequent word gets ID 1), with lookup tables in both directions (word → ID and ID → word).

In [None]:
from collections import Counter

# Flatten every review into one long list of whitespace-separated tokens.
all_words = []
for review in reviews:
    all_words.extend(review.split())

# Rank tokens from most to least frequent; ties keep first-seen order.
word_counts = Counter(all_words)
word_list = [word for word, _ in word_counts.most_common()]

# IDs start at 1, which leaves 0 unused by the vocabulary.
vocab_to_int = dict(zip(word_list, range(1, len(word_list) + 1)))
int_to_vocab = dict(enumerate(word_list, start=1))

# Replace each review by the list of its token IDs.
encoded_reviews = [list(map(vocab_to_int.get, review.split())) for review in reviews]

In [None]:
# Binary targets: 1 for a 'positive' label, 0 for anything else.
encoded_labels = [int(label == 'positive') for label in labels]
print(len(encoded_reviews), len(encoded_labels), sep="\n")

14209
14209


Padding (or truncating) each encoded review to a fixed length so that every sequence has the same size.

In [None]:
import numpy as np

# Flag the reviews that still contain at least one token, then keep only the
# matching (label, review) pairs so both collections stay aligned.
non_empty = [len(review) > 0 for review in encoded_reviews]
encoded_labels = np.array([label for label, keep in zip(encoded_labels, non_empty) if keep])
encoded_reviews = [review for review, keep in zip(encoded_reviews, non_empty) if keep]

def pad_text(encoded_reviews, seq_length):
    """Force every encoded review to exactly ``seq_length`` tokens.

    Reviews with ``seq_length`` tokens or more keep only their first
    ``seq_length`` entries; shorter ones are left-padded with 0.
    The resulting rows are returned as a NumPy array.
    """

    def fit(review):
        missing = seq_length - len(review)
        if missing > 0:
            return [0] * missing + review
        return review[:seq_length]

    return np.array([fit(review) for review in encoded_reviews])


# Keep at most the first 200 tokens of each review; shorter reviews are left-padded with 0.
padded_reviews = pad_text(encoded_reviews, 200)

Splitting Training, Validation and Testing dataset.

In [None]:
from torch.utils.data import TensorDataset, DataLoader

# 80% of the rows go to training; the remainder is halved into validation and test.
train_ratio = 0.8
valid_ratio = (1 - train_ratio)/2
total = padded_reviews.shape[0]
train_cutoff = int(total * train_ratio)
valid_cutoff = int(total * (1 - valid_ratio))

# Contiguous slices in file order:
# [0, train_cutoff) -> train, [train_cutoff, valid_cutoff) -> validation, rest -> test.
train_x = torch.tensor(padded_reviews[:train_cutoff])
train_y = torch.tensor(encoded_labels[:train_cutoff])
valid_x = torch.tensor(padded_reviews[train_cutoff : valid_cutoff])
valid_y = torch.tensor(encoded_labels[train_cutoff : valid_cutoff])
test_x = torch.tensor(padded_reviews[valid_cutoff:])
test_y = torch.tensor(encoded_labels[valid_cutoff:])

train_data = TensorDataset(train_x, train_y)
valid_data = TensorDataset(valid_x, valid_y)
test_data = TensorDataset(test_x, test_y)

# All three loaders share the same batching options.
batch_size = 50
loader_options = dict(batch_size = batch_size, shuffle = True)
train_loader = DataLoader(train_data, **loader_options)
valid_loader = DataLoader(valid_data, **loader_options)
test_loader = DataLoader(test_data, **loader_options)


Model architecture using LSTM cells.

In [None]:
from torch import nn

class SentimentRNN(nn.Module):
    """Embedding -> stacked LSTM -> dropout -> linear -> sigmoid classifier.

    Args:
        n_vocab: Number of rows in the embedding table; must be larger than the
            biggest token id fed to the model (including any padding id).
        n_embed: Size of each embedding vector.
        n_hidden: Hidden size of the LSTM.
        n_output: Number of output features of the final linear layer.
        n_layers: Number of stacked LSTM layers.
        drop_p: Dropout probability, used between LSTM layers and before the
            linear layer.
    """

    def __init__(self, n_vocab, n_embed, n_hidden, n_output, n_layers, drop_p = 0.5):
        super().__init__()

        self.n_vocab = n_vocab
        self.n_layers = n_layers
        self.n_hidden = n_hidden

        self.embedding = nn.Embedding(n_vocab, n_embed)
        # forward() and init_hidden() work with an (h, c) pair, so the recurrent
        # layer has to be an LSTM stored under the name forward() uses.
        # nn.LSTM only applies dropout *between* layers, so it is disabled for a
        # single layer to avoid the PyTorch warning.
        self.lstm = nn.LSTM(n_embed, n_hidden, n_layers, batch_first = True,
                            dropout = drop_p if n_layers > 1 else 0.0)
        self.dropout = nn.Dropout(drop_p)
        self.fc = nn.Linear(n_hidden, n_output)
        self.sigmoid = nn.Sigmoid()

    def forward (self, input_words, hidden = None):
        """Score a batch of token-id sequences.

        Args:
            input_words: Integer tensor of shape (batch, seq_len).
            hidden: Optional (h, c) initial state, e.g. from ``init_hidden``;
                ``None`` lets the LSTM start from zeros.

        Returns:
            ``(scores, h)`` where ``scores`` has shape (batch,) and holds the
            sigmoid output for the last time step of each sequence, and ``h``
            is the final (h, c) state of the LSTM.
        """
        embedded_words = self.embedding(input_words)
        lstm_out, h = self.lstm(embedded_words, hidden)

        # Only the last time step is used for the prediction. Indexing it
        # directly works for any batch size (no dependence on a global
        # batch_size) and avoids running the linear layer on every time step.
        last_step = lstm_out[:, -1, :]
        fc_out = self.fc(self.dropout(last_step))
        sigmoid_out = self.sigmoid(fc_out)

        # Keep the (batch,) output shape: last output feature of the last step.
        sigmoid_last = sigmoid_out[:, -1]

        return sigmoid_last, h


    def init_hidden (self, batch_size):
        """Return a zero-filled (h, c) pair, each of shape (n_layers, batch_size, n_hidden).

        The tensors are created with the dtype and device of the model's own
        parameters, so they always live where the model lives.
        """
        weights = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)
        h = (weights.new_zeros(shape), weights.new_zeros(shape))

        return h

Initializing Hyperparameters

In [None]:
# Token ids start at 1 and id 0 is used for padding, so the embedding table
# needs one row more than there are distinct words; otherwise the highest id
# is out of range for nn.Embedding.
n_vocab = len(vocab_to_int) + 1
n_embed = 400    # embedding vector size
n_hidden = 512   # recurrent hidden size
n_output = 1     # single sigmoid score per review
n_layers = 2     # stacked recurrent layers

net = SentimentRNN(n_vocab, n_embed, n_hidden, n_output, n_layers)
net = net.to(device)

Training process using the Adam optimizer.

In [None]:
from torch import optim

criterion = nn.BCELoss()
optimizer = optim.Adam(net.parameters(), lr = 0.0001)

print_every = 50   # run validation and report every N training steps
step = 0
n_epochs = 50
clip = 5  # max gradient norm, to limit exploding gradients in the recurrent layers

for epoch in range(n_epochs):
    net.train()

    # The loop variables are deliberately not called `labels`, so the list of
    # raw dataset labels defined earlier in the notebook is not overwritten.
    for inputs, targets in train_loader:
        step += 1
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        # Each batch holds independent, shuffled reviews, so the recurrent
        # state returned by the model is not carried over between batches.
        output, _ = net(inputs)

        # Flatten to 1-D (unlike squeeze(), this keeps a batch of one as a
        # vector) and trim to the number of targets; the trim is a no-op when
        # the model returns exactly one score per sample.
        output = output.reshape(-1)[:targets.size(0)]

        loss = criterion(output, targets.float())
        loss.backward()
        # In-place variant; the un-suffixed clip_grad_norm is deprecated.
        nn.utils.clip_grad_norm_(net.parameters(), clip)
        optimizer.step()

        if (step % print_every) == 0:
            net.eval()
            valid_losses = []

            # No gradients are needed for validation; skipping them saves memory and time.
            with torch.no_grad():
                for v_inputs, v_targets in valid_loader:
                    v_inputs, v_targets = v_inputs.to(device), v_targets.to(device)

                    v_output, _ = net(v_inputs)
                    v_output = v_output.reshape(-1)[:v_targets.size(0)]

                    v_loss = criterion(v_output, v_targets.float())
                    valid_losses.append(v_loss.item())

            print("Epoch: {}/{}".format((epoch+1), n_epochs),
                  "Step: {}".format(step),
                  "Training Loss: {:.4f}".format(loss.item()),
                  "Validation Loss: {:.4f}".format(np.mean(valid_losses)))
            net.train()

  nn.utils.clip_grad_norm(net.parameters(), clip)


Epoch: 1/50 Step: 50 Training Loss: 0.7265 Validation Loss: 0.6948
Epoch: 1/50 Step: 100 Training Loss: 0.6879 Validation Loss: 0.6941
Epoch: 1/50 Step: 150 Training Loss: 0.7156 Validation Loss: 0.6931
Epoch: 1/50 Step: 200 Training Loss: 0.7111 Validation Loss: 0.6937
Epoch: 2/50 Step: 250 Training Loss: 0.6982 Validation Loss: 0.6932
Epoch: 2/50 Step: 300 Training Loss: 0.7115 Validation Loss: 0.6926
Epoch: 2/50 Step: 350 Training Loss: 0.7121 Validation Loss: 0.6941
Epoch: 2/50 Step: 400 Training Loss: 0.7187 Validation Loss: 0.6948
Epoch: 2/50 Step: 450 Training Loss: 0.6746 Validation Loss: 0.6959
Epoch: 3/50 Step: 500 Training Loss: 0.7055 Validation Loss: 0.6931
Epoch: 3/50 Step: 550 Training Loss: 0.6900 Validation Loss: 0.6935
Epoch: 3/50 Step: 600 Training Loss: 0.6898 Validation Loss: 0.6936
Epoch: 3/50 Step: 650 Training Loss: 0.7060 Validation Loss: 0.6959
Epoch: 4/50 Step: 700 Training Loss: 0.6955 Validation Loss: 0.6941
Epoch: 4/50 Step: 750 Training Loss: 0.6868 Valid

Evaluating Accuracy and Loss on the Test Set.

In [None]:
net.eval()
test_losses = []
num_correct = 0

# Evaluation needs no gradients; disabling them saves memory and time.
with torch.no_grad():
    # Loop variables are not named `labels` so the raw dataset label list
    # defined earlier in the notebook is left intact.
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        test_output, _ = net(inputs)

        # Flatten to 1-D (keeps a batch of one as a vector) and trim to the
        # number of targets; a no-op when there is one score per sample.
        test_output = test_output.reshape(-1)[:targets.size(0)]

        loss = criterion(test_output, targets.float())
        test_losses.append(loss.item())

        # A score of 0.5 or more rounds to the positive class.
        preds = torch.round(test_output)
        num_correct += (preds == targets.float()).sum().item()

print("Test Loss: {:.4f}".format(np.mean(test_losses)))
print("Test Accuracy: {:.2f}".format(num_correct/len(test_loader.dataset)))

Test Loss: 1.4218
Test Accuracy: 0.68


Function for predicting custom review


In [None]:
def predict(net, review, seq_length=200):
    """Classify a free-text review as positive or negative.

    Args:
        net: Trained model; calling it on a (1, seq_length) tensor of token ids
            must return ``(scores, hidden)``.
        review: Raw review text.
        seq_length: Length the encoded review is padded/truncated to.

    Returns:
        A human-readable verdict string, or ``None`` (after printing a notice)
        when none of the review's words are in the vocabulary.
    """
    words = preprocess_text(review).split()
    # Words that never appeared in the training vocabulary are dropped.
    encoded_words = [vocab_to_int[word] for word in words if word in vocab_to_int]

    if not encoded_words:
        print("Your review must contain at least 1 word present in the vocabulary!")
        return None

    # Run on whatever device the model's parameters live on.
    device = next(net.parameters()).device
    padded_words = torch.from_numpy(pad_text([encoded_words], seq_length)).to(device)

    net.eval()
    with torch.no_grad():  # inference only
        output, _ = net(padded_words)

    # The score for the final time step of the single review is the last
    # element of the flattened output, whether the model returns one value per
    # sample or a longer vector.
    pred_value = torch.round(output.reshape(-1)[-1]).item()

    msg = "This is a positive review." if pred_value == 1 else "This is a negative review."

    return msg

In [None]:
# Two short examples with clearly opposite sentiment.
review1, review2 = "very good", "very bad movie"
for sample in (review1, review2):
    print(predict(net, sample))

This is a positive review.
This is a negative review.


In [None]:
# Same sentence frame, differing only in the final adjective.
review3, review4 = "this movie was horrible", "this movie was great"
for sample in (review3, review4):
    print(predict(net, sample))

This is a negative review.
This is a positive review.


In [None]:
# Save only the learned parameters (the state_dict), not the whole module object.
state = net.state_dict()
torch.save(state, 'model.pth')

In [None]:
# Download the saved parameters through the browser when running in Colab.
# google.colab only exists inside Colab; elsewhere the file is already on the
# local disk, so the cell should not fail a full notebook run.
try:
    from google.colab import files
except ImportError:
    print("Not running in Colab; 'model.pth' is in the current working directory.")
else:
    files.download('model.pth')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>