In [None]:
import numpy as np
import pandas as pd
import torch
import time

import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.lm import Vocabulary

import re
import spacy
import string
from sklearn.model_selection import train_test_split

from torch.utils.data import Dataset, TensorDataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

In [None]:
# Fetch the WordNet corpus; the WordNetLemmatizer used in `lemma` relies on it.
nltk.download('wordnet')

In [None]:
# Fetch the stopword corpus read by stopwords.words('english') further down.
nltk.download('stopwords')

In [None]:
# Load the review dataset; the path is relative to the notebook's working directory.
df = pd.read_csv('./dataset/IMDB Movie Reviews/movie_data.csv')
# Show the last rows as a quick check that the file parsed as expected.
df.tail()

In [None]:
# Peek at the raw text of the review stored under index label 1, before any cleaning.
df['review'][1]

In [None]:
# Class balance: number of reviews per sentiment value.
df['sentiment'] .value_counts()

In [None]:
# Count of missing values in each column.
df.isnull().sum()

In [None]:
# Number of rows that are exact duplicates of an earlier row.
df.duplicated().sum()

In [None]:
# Remove exact duplicate rows in place. The surviving rows keep their original
# index labels, so the index has gaps from this point on.
df.drop_duplicates(inplace=True)
# Re-check: duplicate count after the drop, the resulting class balance and shape.
print(df.duplicated().sum())
print(df['sentiment'].value_counts())
print(df.shape)

In [None]:
def remove_tags(text):
    """Strip HTML-style tags from ``text``.

    Every ``<...>`` span (shortest match) is replaced by a single space rather
    than deleted outright. Deleting it would glue together words that are
    separated only by markup, e.g. ``"great.<br /><br />The"`` would become
    ``"great.The"`` and later turn into the junk token ``"greatthe"``.

    Args:
        text: review text that may contain markup.

    Returns:
        The text with each tag replaced by one space. Runs of spaces are not
        collapsed here.
    """
    return re.sub(r'<.*?>', ' ', text)

# Strip markup from every review, overwriting the 'review' column.
df['review'] = df['review'].apply(remove_tags)
# Preview the first rows after this cleaning step.
df.head()

In [None]:
# Lower-case all reviews so the later stopword and vocabulary lookups are case-insensitive.
df['review'] = df['review'].str.lower()
# Preview the first rows after this cleaning step.
df.head()

In [None]:
# English stopword list from NLTK; used below to filter review tokens.
sw_list = stopwords.words('english')
# sw_list

In [None]:
# Build the lookup set once: membership tests on a set are O(1), whereas the
# original scanned the whole stopword list for every word of every review.
sw_set = set(sw_list)

# Drop stopwords. Splitting on whitespace and re-joining with single spaces
# also normalises any runs of whitespace in the review.
df['review'] = df['review'].apply(lambda x: ' '.join([word for word in x.split() if word not in sw_set]))
# Preview the first rows after this cleaning step.
df.head()

In [None]:
# Characters to delete: the ASCII punctuation set from the standard library.
exclude = string.punctuation

# Deletion table for str.translate, built once instead of on every call.
_PUNCT_TABLE = str.maketrans('', '', exclude)

def remove_punctuations(text):
    """Return ``text`` with every character listed in ``exclude`` deleted.

    Characters are removed, not replaced, so ``"it's"`` becomes ``"its"``.
    """
    return text.translate(_PUNCT_TABLE)

# Delete punctuation characters from every review, overwriting the 'review' column.
df['review'] = df['review'].apply(remove_punctuations)
# Preview the first rows after this cleaning step.
df.head()

In [None]:
def remove_numbers(text):
    """Delete every run of digit characters from ``text``.

    Surrounding characters are left untouched, so ``"abc123def"`` becomes
    ``"abcdef"``.
    """
    digit_runs = re.compile(r'\d+')
    stripped = digit_runs.sub('', text)
    return stripped

# Delete digit runs from every review, overwriting the 'review' column.
df['review'] = df['review'].apply(remove_numbers)
# Preview the first rows after this cleaning step.
df.head()

In [None]:
# from textblob import TextBlob

# def correct_word(text):
#     return str(TextBlob(text).correct())

# df['review'] = df['review'].apply(correct_word)

In [None]:
def lemma(text):
    """Lemmatize each whitespace-separated word of ``text``.

    Words are passed one at a time to a WordNetLemmatizer using its default
    settings, and the results are re-joined with single spaces.
    """
    wordnet = WordNetLemmatizer()
    lemmas = []
    for token in text.split():
        lemmas.append(wordnet.lemmatize(token))
    return ' '.join(lemmas)

# Lemmatize every review, overwriting the 'review' column.
df['review'] = df['review'].apply(lemma)
# Preview the first rows after this cleaning step.
df.head()

## DataLoader

In [None]:
# Load the small English spaCy pipeline. In the cells shown here only its
# tokenizer is used (see CustomVocabulary.tokenizer_eng).
spacy_eng = spacy.load("en_core_web_sm")

In [None]:
class CustomVocabulary:
    """Token <-> index lookup tables with four reserved special tokens.

    Indices 0-3 are fixed to <PAD>, <SOS>, <EOS> and <UNK>. Every other word
    receives its index when `build_vocabulary` is called.
    """

    def __init__(self, freq_threshold):
        # Passed to nltk's Vocabulary as `unk_cutoff` when the tables are built.
        self.freq_threshold = freq_threshold

        specials = ("<PAD>", "<SOS>", "<EOS>", "<UNK>")
        self.itos = dict(enumerate(specials))
        self.stoi = {token: index for index, token in enumerate(specials)}

    def __len__(self):
        """Number of entries in the index-to-token table."""
        return len(self.itos)

    @staticmethod
    def tokenizer_eng(text):
        """Split `text` with the spaCy tokenizer and lower-case every token."""
        lowered = []
        for tok in spacy_eng.tokenizer(text):
            lowered.append(tok.text.lower())
        return lowered

    def build_vocabulary(self, sentence_list):
        """Tokenize every sentence and register the resulting words.

        The flat token stream is handed to nltk's Vocabulary together with the
        frequency threshold; each word that Vocabulary yields on iteration and
        that is not already known gets the next free index.
        """
        all_tokens = []
        for sentence in sentence_list:
            all_tokens.extend(self.tokenizer_eng(sentence))

        counted = Vocabulary(all_tokens, unk_cutoff=self.freq_threshold)

        # Numbering starts right after the four reserved tokens.
        next_index = 4
        for word in counted:
            if word in self.stoi:
                # Already mapped (e.g. a special token) - keep its index.
                continue
            self.stoi[word] = next_index
            self.itos[next_index] = word
            next_index += 1

    def numericalize(self, text):
        """Convert `text` to a list of indices; unknown tokens map to <UNK>."""
        ids = []
        for token in self.tokenizer_eng(text):
            ids.append(self.stoi.get(token, self.stoi["<UNK>"]))
        return ids

In [None]:
# Build the shared vocabulary with a frequency threshold of 5.
# NOTE(review): the vocabulary is built from every review in `df`, i.e. before
# the train/valid/test split further down - confirm that this is intended.
vocab = CustomVocabulary(freq_threshold=5)
all_reviews = df['review'].tolist()
vocab.build_vocabulary(all_reviews)

In [None]:
class IMDB_Dataset(Dataset):
    """Map-style dataset yielding ``(token_ids, label)`` tensor pairs.

    Args:
        data: DataFrame with a 'review' column of cleaned text and a
            'sentiment' column of class labels (converted to int64, so they
            are assumed to be numeric).
        vocabulary: optional object exposing ``stoi`` and ``numericalize``.
            When omitted, the module-level ``vocab`` is looked up each time an
            item is fetched, which is what the original class always did.
    """

    def __init__(self, data, vocabulary=None):
        self.data = data
        self.vocabulary = vocabulary

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        lookup = self.vocabulary if self.vocabulary is not None else vocab

        # Positional access (.iloc) works whatever the frame's index labels
        # are. Label-based access only worked on a freshly reset index.
        review = self.data['review'].iloc[index]
        label = self.data['sentiment'].iloc[index]

        # Wrap the review in start/end markers.
        token_ids = [lookup.stoi["<SOS>"]]
        token_ids += lookup.numericalize(review)
        token_ids.append(lookup.stoi["<EOS>"])

        # Token ids are indices, so they are stored as integers, not float32.
        return torch.tensor(token_ids, dtype=torch.int64), torch.tensor(label, dtype=torch.int64)

In [None]:
class CustomCollate:
    """Collate callable that brings every review in a batch to a fixed length.

    Reviews shorter than `seq_length` are padded on the left with `pad_idx`;
    longer ones keep only their first `seq_length` entries.
    """

    def __init__(self, pad_idx, seq_length=250):
        self.seq_length = seq_length
        self.pad_idx = pad_idx

    def pad_text(self, encoded_reviews):
        """Return a NumPy array with one row of `seq_length` ids per review."""
        target = self.seq_length
        rows = []
        for encoded in encoded_reviews:
            ids = encoded.tolist()  # tensor -> plain Python list
            shortfall = target - len(ids)
            if shortfall > 0:
                # Too short: prepend padding so the real tokens sit at the end.
                rows.append([self.pad_idx] * shortfall + ids)
            else:
                # Long enough: cut off everything past the target length.
                rows.append(ids[:target])
        return np.array(rows)

    def __call__(self, batch):
        """Collate `(review, label)` samples into two int64 tensors."""
        encoded = [sample[0] for sample in batch]
        padded = torch.tensor(self.pad_text(encoded), dtype=torch.int64)
        targets = [sample[1] for sample in batch]
        return padded, torch.tensor(targets, dtype=torch.int64)

In [None]:
def get_loader(data, batch_size, shuffle=False, drop_last=False):
    """Wrap `data` in an IMDB_Dataset and return a DataLoader over it.

    Batches are assembled by a CustomCollate that pads with the vocabulary's
    <PAD> index and uses its default sequence length.
    """
    dataset = IMDB_Dataset(data)
    collate = CustomCollate(vocab.stoi["<PAD>"])
    return DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        drop_last=drop_last,
        shuffle=shuffle,
        collate_fn=collate,
    )

In [None]:
# 80 / 10 / 10 split: the validation and test sets share what training leaves over.
train_ratio = 0.8
valid_ratio = (1 - train_ratio)/2
total = len(df)
train_cutoff = int(total * train_ratio)
valid_cutoff = int(total * (1 - valid_ratio))

# NOTE(review): rows are split in file order without shuffling - confirm the
# CSV is not sorted by sentiment, otherwise the splits will be unbalanced.
columns = ['review', 'sentiment']

# Slice by position, then give each split its own fresh 0..n-1 index.
# reset_index(drop=True) returns a new frame, which avoids mutating a slice of
# `df` in place (SettingWithCopyWarning) and does not add an 'index' column.
train_data = df[columns].iloc[:train_cutoff].reset_index(drop=True)
valid_data = df[columns].iloc[train_cutoff:valid_cutoff].reset_index(drop=True)
test_data = df[columns].iloc[valid_cutoff:].reset_index(drop=True)

# The three splits must partition the data exactly.
assert len(train_data) + len(valid_data) + len(test_data) == total

In [None]:
# One loader per split. Only the training loader shuffles, so validation and
# test batches come in a fixed order.
train_loader = get_loader(train_data, batch_size=128, shuffle=True, drop_last=False)
valid_loader = get_loader(valid_data, batch_size=128, shuffle=False, drop_last=False)
test_loader = get_loader(test_data, batch_size=128, shuffle=False, drop_last=False)

In [None]:
# Sanity check: print the input and target shapes of the first batch of each
# split. One loop over the loaders replaces three copy-pasted loops.
for loader in (train_loader, valid_loader, test_loader):
    for inp, tar in loader:
        print(inp.shape)
        print(tar.shape)
        break  # the first batch is enough

### LSTM Model

In [None]:
class RNN(torch.nn.Module):
    """Bidirectional LSTM classifier over sequences of token ids.

    Token ids are embedded and run through a bidirectional LSTM; the final
    hidden states of both directions are concatenated and projected to
    `output_dim` logits.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        super().__init__()

        # Layer creation order is kept as embedding -> LSTM -> linear, because
        # it determines the order in which a seeded RNG initialises the weights.
        self.embedding = torch.nn.Embedding(num_embeddings=input_dim,
                                            embedding_dim=embedding_dim)
        self.rnn = torch.nn.LSTM(input_size=embedding_dim,
                                 hidden_size=hidden_dim,
                                 bidirectional=True)

        # Two directions are concatenated, hence twice the hidden size.
        self.fc = torch.nn.Linear(in_features=2 * hidden_dim,
                                  out_features=output_dim)

    def forward(self, text):
        """Return logits of shape [batch size, output dim].

        `text` is expected as [sentence length, batch size], because the LSTM
        is not constructed with batch_first.
        """
        # [sentence length, batch size, embedding dim]
        embedded = self.embedding(text)

        # Only the final hidden states are needed: [2, batch size, hidden dim],
        # one slice per direction.
        _, (final_hidden, _) = self.rnn(embedded)

        forward_state = final_hidden[-2]
        backward_state = final_hidden[-1]

        # [batch size, hidden dim * 2]
        features = torch.cat([forward_state, backward_state], dim=1)

        return self.fc(features)

In [None]:
# Seed PyTorch's RNG so weight initialisation and shuffling are repeatable.
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)

# NOTE(review): VOCABULARY_SIZE is not referenced by the cells shown here; the
# model's embedding is sized from len(vocab.stoi) instead.
VOCABULARY_SIZE = 20000
LEARNING_RATE = 0.005  # step size intended for the optimizer
# NOTE(review): the data loaders above were already built with a literal
# batch size of 128 rather than this constant.
BATCH_SIZE = 128
NUM_EPOCHS = 15  # number of passes over the training loader
# Use the first CUDA device when one is available, otherwise the CPU.
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

EMBEDDING_DIM = 128  # size of each token embedding
HIDDEN_DIM = 256     # LSTM hidden size per direction
NUM_CLASSES = 2      # number of output logits

In [None]:
# Show which device was selected for training.
print(DEVICE)

In [None]:
# Re-seed right before construction so the initial weights do not depend on
# how many random numbers earlier cells consumed.
torch.manual_seed(RANDOM_SEED)
model = RNN(input_dim=len(vocab.stoi),  # one embedding row per vocabulary entry
            embedding_dim=EMBEDDING_DIM,
            hidden_dim=HIDDEN_DIM,
            output_dim=NUM_CLASSES # could use 1 for binary classification
)

model = model.to(DEVICE)
# Use the configured constant instead of repeating the literal 0.005.
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [None]:
def compute_accuracy(model, data_loader, device):
    """Return the classification accuracy of `model` on `data_loader`, in percent.

    Args:
        model: module mapping a [sequence length, batch size] tensor of token
            ids to [batch size, num classes] logits.
        data_loader: iterable of `(features, targets)` batches, with features
            shaped [batch size, sequence length].
        device: device the batches are moved to before the forward pass.

    Returns:
        A 0-dim float tensor holding the accuracy in percent.

    Raises:
        ValueError: if `data_loader` yields no examples.

    The model is switched to eval mode and left in it.
    """
    model.eval()

    correct_pred, num_examples = 0, 0

    with torch.no_grad():
        # Iterate the loader that was passed in. The previous version always
        # read the global training loader, so every reported accuracy was the
        # training accuracy.
        for features, targets in data_loader:
            # The model expects [sequence length, batch size].
            features = torch.transpose(features, 1, 0).long().to(device)
            targets = targets.long().to(device)

            logits = model(features)
            _, predicted_labels = torch.max(logits, 1)

            num_examples += targets.size(0)
            correct_pred += (predicted_labels == targets).sum()

    if num_examples == 0:
        raise ValueError("data_loader yielded no examples")

    return correct_pred.float() / num_examples * 100

In [None]:
# Train for NUM_EPOCHS epochs. The loss is logged every 50 batches, accuracy
# is reported after each epoch, and test accuracy is reported once at the end.
start_time = time.time()

for epoch in range(NUM_EPOCHS):
    # compute_accuracy puts the model in eval mode, so training mode has to be
    # re-enabled at the start of every epoch.
    model.train()
    for batch_idx, (text, labels) in enumerate(train_loader):

        # The collate function yields [batch, sequence]; the LSTM is not
        # batch_first and therefore wants [sequence, batch].
        text = torch.transpose(text, 1, 0)
        labels = labels.long()

        text = text.long().to(DEVICE)
        labels = labels.to(DEVICE)

        # print('input', text.shape)
        # print('label', labels.shape)

        ### FORWARD AND BACK PROP
        logits = model(text)

        # print('logits', logits.shape)

        loss = F.cross_entropy(logits, labels)
        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()

        loss.backward()

        ### UPDATE MODEL PARAMETERS
        optimizer.step()


        ### LOGGING
        # `not batch_idx % 50` is true on batches 0, 50, 100, ...
        if not batch_idx % 50:
            print (f'Epoch: {epoch+1:03d}/{NUM_EPOCHS:03d} | '
                   f'Batch {batch_idx:03d}/{len(train_loader):03d} | '
                   f'Loss: {loss:.4f}')


    # End-of-epoch evaluation with gradient tracking disabled.
    with torch.set_grad_enabled(False):
        print(f'training accuracy: '
              f'{compute_accuracy(model, train_loader, DEVICE):.2f}%'
              f'\nvalid accuracy: '
              f'{compute_accuracy(model, valid_loader, DEVICE):.2f}%')

    print(f'Time elapsed: {(time.time() - start_time)/60:.2f} min')

print(f'Total Training Time: {(time.time() - start_time)/60:.2f} min')
print(f'Test accuracy: {compute_accuracy(model, test_loader, DEVICE):.2f}%')

In [None]:
def pad_single_text(review, seq_length, pad_idx=0):
    """Bring one encoded review to exactly `seq_length` entries.

    Args:
        review: sequence of token ids.
        seq_length: target length.
        pad_idx: id used for padding. The default 0 matches the previously
            hard-coded value.

    Returns:
        A list of `seq_length` ids. Longer inputs keep their first
        `seq_length` entries; shorter inputs are padded on the left.
    """
    ids = list(review)  # accept any sequence, not only lists
    if len(ids) >= seq_length:
        return ids[:seq_length]
    return [pad_idx] * (seq_length - len(ids)) + ids

In [None]:
def preprocess(text, seq_length=250):
    """Turn a raw review string into a fixed-length tensor of token ids.

    Applies the same cleaning steps, in the same order, as the training data
    received: tag removal, lower-casing, stopword removal, punctuation
    removal, digit removal and lemmatization. The result is then wrapped in
    <SOS>/<EOS>, numericalized with the shared `vocab`, and padded or
    truncated to `seq_length`.

    Args:
        text: raw review text.
        seq_length: length of the returned tensor. The default 250 matches
            CustomCollate's default sequence length.

    Returns:
        A 1-D int64 tensor with `seq_length` entries.
    """
    text = remove_tags(text)
    text = text.lower()
    text = ' '.join([word for word in text.split() if word not in (sw_list)])
    text = remove_punctuations(text)
    text = remove_numbers(text)
    text = lemma(text)

    tokenized = [vocab.stoi["<SOS>"]]
    tokenized += vocab.numericalize(text)
    tokenized.append(vocab.stoi["<EOS>"])

    pad_texts = pad_single_text(tokenized, seq_length)
    return torch.tensor(pad_texts, dtype=torch.int64)

In [None]:
def predict_sentiment(model, sentence):
    """Print the model's confidence in its predicted sentiment for `sentence`.

    The sentence is preprocessed, reshaped to [sequence length, 1] (a batch
    of one), and passed through the model. The softmax probability of class
    index 1 is treated as the "positive" probability - this assumes label 1
    means positive; confirm against the dataset's label encoding.

    Nothing is returned; the result is printed.
    """
    model.eval()
    tensor = preprocess(sentence).view(-1, 1).to(DEVICE)

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        prediction = torch.nn.functional.softmax(model(tensor), dim=1)
    prob_positive = prediction[0][1].item()

    if prob_positive >= 0.5:
        print('Probability positive:', prob_positive)
    else:
        print('Probability negative:', 1 - prob_positive)

# Smoke test with a clearly positive sentence.
predict_sentiment(model, "This is such an awesome movie, I really love it!")

In [None]:
# Smoke test with a clearly negative sentence.
predict_sentiment(model, "I really hate this movie, it is really bad and sucks!")

In [None]:
# Smoke test with a short, misspelled, lukewarm sentence.
predict_sentiment(model, "coulde be much more better")