In [None]:
import re
import string
import warnings
from collections import Counter

import nltk
import numpy as np
import pandas as pd
import spacy
import torch
import torch.nn as nn
from IPython.display import clear_output
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
warnings.filterwarnings('ignore')

In [None]:
# Environment setup: install gdown, then fetch the data files with shell
# commands (IPython `!` escapes).
# NOTE(review): wget saves into the current working directory while the later
# commands use absolute /content/... paths, so this assumes the notebook runs
# with /content as its working directory (e.g. Colab) — TODO confirm.
from IPython.display import clear_output
!pip install gdown
# Download a file by its Google Drive id (presumably the review dataset — TODO confirm).
!gdown 1VmpeZgh9reH3dUYRUlaqQsj2mh3hhdb-

# Download the GloVe 6B archive.
!wget https://nlp.stanford.edu/data/glove.6B.zip

!unzip /content/glove.6B.zip

# Remove the archive and the 50d/100d/200d vector files; anything else that
# was extracted is kept.
# NOTE(review): none of the cells visible here read the remaining GloVe files.
!rm -rf /content/glove.6B.zip
!rm /content/glove.6B.50d.txt
!rm /content/glove.6B.100d.txt
!rm /content/glove.6B.200d.txt

# Hide the install/download output produced above.
clear_output()

In [None]:
# Download the NLTK 'stopwords' corpus and load the English stop-word list.
nltk.download('stopwords')
import string
from nltk.corpus import stopwords
# NOTE(review): in the cells visible here `stop_words` is only referenced by
# the commented-out regex below, so stop words are not actually removed.
stop_words = stopwords.words("english")
# stop_words_regex = re.compile(r'\b(?:{})\b'.format('|'.join(stop_words)))
# Matches every character that is neither a word character nor whitespace.
punctuations_regex = re.compile(r'[^\w\s]')


def tokenise(text):
    """Strip punctuation from ``text`` and split it on whitespace.

    Args:
        text: Raw review text. Non-string values (e.g. ``None`` or the float
            NaN that pandas uses for missing cells) are treated as empty.

    Returns:
        list[str]: The whitespace-separated tokens with their original case;
        an empty list for empty or non-string input.
    """
    if not isinstance(text, str):
        # A missing cell would otherwise make the regex substitution raise
        # TypeError and abort the whole ``.apply`` call.
        return []
    return punctuations_regex.sub("", text).split()

# NOTE(review): `review_df` is not created in any cell visible here — it must
# be loaded (presumably from the file fetched with gdown) before this runs.
# The source column name "Verbatim Feedback " includes a trailing space.
review_df["tokenised_review"] = review_df["Verbatim Feedback "].apply(tokenise)

In [None]:
class Vocab:
    """Token-to-index vocabulary with fixed-length sequence encoding.

    Index 0 is reserved for unknown tokens (key ``"unk"``) and index 1 for
    padding (key ``"pad"``). Corpus words receive consecutive indices starting
    at 2, assigned in sorted order.

    Args:
        text: Iterable of token sequences used to build the vocabulary.
        max_text_length: Length every encoded sequence is padded or truncated to.
    """

    def __init__(self, text, max_text_length=205):
        self.text = text
        self.max_text_length = max_text_length
        # Defined before create_vocab() so it can reuse the reserved indices.
        self.unk_index = 0
        self.pad_index = 1
        self.vocab = self.create_vocab()

    def create_vocab(self):
        """Build the ``{token: index}`` mapping from ``self.text``.

        Returns:
            dict: Every distinct corpus token mapped to a unique index >= 2,
            plus the reserved ``"unk"`` and ``"pad"`` entries. The indices
            form the contiguous range ``0 .. len(dict) - 1``.
        """
        reserved = {"unk": self.unk_index, "pad": self.pad_index}
        # A set de-duplicates in constant time per token; the previous
        # list-membership test was quadratic in the vocabulary size.
        words = {token for tokens in self.text for token in tokens}
        # Corpus tokens spelled like a reserved key must not consume an index
        # of their own: the reserved entry overrides them anyway, and the
        # resulting gap would make the largest index exceed vocab_size() - 1
        # (an out-of-range lookup for an embedding sized by vocab_size()).
        words.difference_update(reserved)
        vocab_dict = {word: index for index, word in enumerate(sorted(words), start=2)}
        vocab_dict.update(reserved)
        return vocab_dict

    def vocab_size(self):
        """Return the number of entries, reserved ones included."""
        return len(self.vocab)

    def token2index(self, tokens):
        """Encode ``tokens`` as an index array of length ``max_text_length``.

        Sequences longer than ``max_text_length`` are truncated; shorter ones
        are right-padded with ``pad_index``. Tokens missing from the
        vocabulary map to ``unk_index``.

        Args:
            tokens: Sliceable sequence of tokens.

        Returns:
            numpy.ndarray: 1-D integer array of length ``max_text_length``.
        """
        embed = np.array([self.pad_index] * self.max_text_length)
        for index, token in enumerate(tokens[:self.max_text_length]):
            embed[index] = self.vocab.get(token, self.unk_index)
        return embed

In [None]:
# Fit the vocabulary on every tokenised review, then encode each review as a
# fixed-length index array (10 tokens here rather than the class default).
text = review_df["tokenised_review"].tolist()
train_vocab = Vocab(text=text, max_text_length=10)
vocab_size = train_vocab.vocab_size()

review_df["embedded_review"] = review_df["tokenised_review"].apply(train_vocab.token2index)

In [None]:
# Features are the encoded reviews; labels come from the sentiment column.
X = list(review_df["embedded_review"])
y = list(review_df["Sentiment (1=Positive & 0= Negative)"])

# A fixed random_state makes the shuffled 80/20 split identical on every run,
# so results survive Restart & Run All.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=True, random_state=42
)

In [None]:
class load_dataset(Dataset):
    """Map-style dataset pairing encoded samples with their labels.

    Args:
        X: Indexable collection of NumPy arrays, one per sample.
        y: Indexable collection of labels aligned with ``X``.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        """Return the number of samples, taken from the label collection."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return ``(features, label)`` with the features as a float32 tensor."""
        features = self.X[idx].astype(np.float32)
        label = self.y[idx]
        return torch.from_numpy(features), label

In [None]:
train_dataset = load_dataset(X_train, y_train)
batch_size = 16
# Reshuffle at the start of every epoch; without shuffle=True the optimiser
# sees the exact same sequence of mini-batches in each epoch.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class LSTM(nn.Module):
    """Embedding -> (bi)LSTM -> feed-forward head for sequence classification.

    Args:
        vocab_size: Number of rows in the embedding table; every token index
            fed to ``forward`` must be smaller than this.
        hidden_dim: LSTM hidden size per direction.
        output_dim: Number of values produced per sample.
        num_of_layers: Number of stacked LSTM layers.
        embedding_dim: Size of each embedding vector.
        bidirectional: Whether the LSTM reads the sequence in both directions.
    """

    def __init__(self, vocab_size, hidden_dim, output_dim, num_of_layers, embedding_dim=300, bidirectional=True):
        super().__init__()
        self.bidirectional = bidirectional

        # Trainable embedding table with default initialisation; no
        # pretrained vectors are loaded in this class.
        self.word2vec = nn.Embedding(vocab_size, embedding_dim)
        # Inter-layer dropout only exists when layers are stacked; passing a
        # non-zero value with a single layer has no effect and makes PyTorch
        # emit a UserWarning, so it is disabled in that case.
        lstm_dropout = 0.2 if num_of_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size=embedding_dim,
                            hidden_size=hidden_dim,
                            num_layers=num_of_layers,
                            batch_first=True,
                            dropout=lstm_dropout,
                            bidirectional=self.bidirectional)
        # Both directions' final states are concatenated in forward().
        lstm_features = hidden_dim * (2 if bidirectional else 1)
        # NOTE(review): the head has no non-linear activation between its
        # Linear layers, so in eval mode it reduces to a single affine map.
        # Layer positions are kept as-is so existing state_dict keys still load.
        self.fc = nn.Sequential(nn.Linear(lstm_features, 256),
                                nn.Dropout(),
                                nn.Linear(256, 64),
                                nn.Dropout(),
                                nn.Linear(64, 16),
                                nn.Dropout(),
                                nn.Linear(16, 4),
                                nn.Dropout(),
                                nn.Linear(4, output_dim),
                               )

    def forward(self, text):
        """Compute one output row per input sequence.

        Args:
            text: Integer tensor of token indices, shape ``(batch, seq_len)``
                (the LSTM is built with ``batch_first=True``).

        Returns:
            Tensor of shape ``(batch, output_dim)``. No final activation is
            applied here.
        """
        text = self.word2vec(text)
        output, (hidden_state, cell_state) = self.lstm(text)

        if self.bidirectional:
            # The last two entries are the top layer's forward and backward
            # final hidden states.
            hidden_state = torch.cat((hidden_state[-2, :, :], hidden_state[-1, :, :]), dim=1)
        else:
            hidden_state = hidden_state[-1]

        return self.fc(hidden_state)

In [None]:
def train_model(model, train_loader, device, epochs=20, lr=3e-4):
    """Fit ``model`` with Adam and binary cross-entropy computed on logits.

    Args:
        model: Module whose output is compared against targets reshaped to
            ``(-1, 1)``.
        train_loader: Iterable yielding ``(features, labels)`` batches.
        device: Device the model and every batch are moved to.
        epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.

    Returns:
        None. Progress and the latest batch loss are shown on a tqdm bar.
    """
    model = model.to(device)
    loss_function = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        # Make sure dropout is active at the start of every epoch.
        model.train()
        train_loop = tqdm(enumerate(train_loader), total=len(train_loader), colour="green")
        for _, (features, labels) in train_loop:
            # Features are cast back to integer indices for the embedding lookup.
            features = features.to(device).long()
            targets = labels.to(device).reshape(-1, 1).float()

            logits = model(features)
            optimizer.zero_grad()
            loss = loss_function(logits, targets)
            loss.backward()
            optimizer.step()

            train_loop.set_description(f"Epoch [{epoch+1}/{epochs}]")
            train_loop.set_postfix(loss=loss.item())

In [None]:
# Model hyper-parameters.
hidden_dim = 512
output_dim = 1
num_of_layers = 2
embedding_dim = 100
lstm_model = LSTM(
    vocab_size=vocab_size,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
    num_of_layers=num_of_layers,
    embedding_dim=embedding_dim,
)

# Training configuration: use the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
epochs = 20
lr = 3e-4
train_model(lstm_model, train_loader, device, epochs=epochs, lr=lr)

In [None]:
# Evaluation data is served one sample at a time, in its original order.
batch_size = 1
test_dataset = load_dataset(X=X_test, y=y_test)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Collect a hard 0/1 prediction for every test sample.
predictions = []

# Switch dropout off: the model is left in training mode by the training loop,
# which would make the predictions random.
lstm_model.eval()

test_loop = tqdm(enumerate(test_loader), total=len(test_loader), colour="green")
test_loop.set_description("Calculating accuracy..")

# No gradients are needed for inference.
with torch.no_grad():
    # Loop variables are named so they do not overwrite the global X / y lists.
    for _, (batch_tokens, _batch_labels) in test_loop:
        batch_tokens = batch_tokens.to(device).long()
        logits = lstm_model(batch_tokens)
        # The model is trained with BCEWithLogitsLoss, so its outputs are
        # logits: map them to probabilities before applying the 0.5 cut-off.
        probabilities = torch.sigmoid(logits)
        # Flattening keeps this correct for any batch size, not only 1.
        predictions.extend((probabilities >= 0.5).long().view(-1).tolist())

In [None]:
# scikit-learn's signature is f1_score(y_true, y_pred): ground truth first.
f1_score(y_test, predictions)