[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github.com/JeryBan/ai-powered-review-summary/blob/main/sentiment_analysis.ipynb)

# Review Classifier based on Sentiment Analysis
---

# Table of Contents

- [Dataframe & text preprocess](#Dataframe-&-text-preprocess)
- [Setup Configurations](#Setup-Configurations)
- [Getting Pretrained Embeddings](#Getting-Pretrained-Embeddings)
- [Creating the Model](#Creating-the-Model)
- [Creating Datasets and Dataloaders](#Creating-Datasets-and-Dataloaders)
- [Training and Evaluation](#Training-and-Evaluation)

In [165]:
# for colab
# !pip install torchmetrics
# !pip install torchinfo

# Dataframe & text preprocess

In [39]:
import pandas as pd
from pathlib import Path

dataset_dir = Path('./data')
csv_dir = dataset_dir / 'raw' / 'Restaurant reviews.csv'

df = pd.read_csv(csv_dir, usecols=['Review', 'Rating']).dropna()
df

Unnamed: 0,Review,Rating
0,"The ambience was good, food was quite good . h...",5
1,Ambience is too good for a pleasant evening. S...,5
2,A must try.. great food great ambience. Thnx f...,5
3,Soumen das and Arun was a great guy. Only beca...,5
4,Food is good.we ordered Kodi drumsticks and ba...,5
...,...,...
9995,Madhumathi Mahajan Well to start with nice cou...,3
9996,This place has never disappointed us.. The foo...,4.5
9997,"Bad rating is mainly because of ""Chicken Bone ...",1.5
9998,I personally love and prefer Chinese Food. Had...,4


In [40]:
from src.utils.df_setup import clean_dataframe

df = clean_dataframe(df)

* turn ratings to binary labels for the sentiment analysis

In [41]:
df.loc[:, 'Rating'] = df['Rating'].astype(float)
df.loc[:, 'Rating'] = df['Rating'].map(lambda rating: 1 if rating > 3 else 0)

In [42]:
# check for duplicates or missing values
duplicate_index = df.index[df.index.duplicated()]
print('Duplicates:')
print(len(duplicate_index))

print('\nMissing indexes:')
missing_index = set(range(len(df))) - set(df.index)
print(len(missing_index))

Duplicates:
0

Missing indexes:
57


In [43]:
df = df.reindex(range(len(df)))
missing_index = set(range(len(df))) - set(df.index)
df = df.dropna()
print('Missing indexes:')
print(len(missing_index))

Missing indexes:
0


In [44]:
# clean reviews but keep the verbs to help in sentiment analysis
from src.utils.text_setup import TextCleaner

f = TextCleaner(remove_verbs=False)
# df['cleaned-reviews'] = df['Review'].map(lambda review: f.clean(review))
# df.to_csv(dataset_dir / 'processed' /'clean_with_verbs.csv', index=False)

In [1]:
import pandas as pd
df = pd.read_csv('./data/processed/clean_with_verbs.csv')
df

Unnamed: 0,Review,Rating,cleaned-reviews
0,"The ambience was good, food was quite good . h...",1,ambience good food quite good saturday lunch c...
1,Ambience is too good for a pleasant evening. S...,1,ambience good pleasant evening service prompt ...
2,A must try.. great food great ambience. Thnx f...,1,must try great food great ambience thnx servic...
3,Soumen das and Arun was a great guy. Only beca...,1,soumen das arun great guy behavior sincerety g...
4,Food is good.we ordered Kodi drumsticks and ba...,1,food good.we order kodi drumstick basket mutto...
...,...,...,...
9881,Been looking for Chinese food around gachibowl...,1,look chinese food around gachibowli find place...
9882,I am amazed at the quality of food and service...,1,amazed quality food service place provide opul...
9883,The food was amazing. Do not forget to try 'Mo...,1,food amazing forget try mou chi kay amazing si...
9884,We ordered from here via swiggy:\n\nWe ordered...,1,order via swiggy order stuff mushroom little s...


# Setup Configurations

In [2]:
import torch
from torch import nn

from torchinfo import summary
from torchmetrics import Accuracy, F1Score

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [3]:
class conf:
    # training
    EPOCHS = 1
    LOSS_FN = nn.BCELoss()
    LR = 0.001
    BATCH_SIZE = 32
    # metrics
    ACC_FN = Accuracy(task='binary').to(device)
    F1 = F1Score(task='binary').to(device)
    # model
    EMBEDDING_DIM = 200
    HIDDEN_UNITS = 128
    ATTN_HEADS = 1
    LSTM_LAYERS = 2
            

In [4]:
df['cleaned-reviews'] = df['cleaned-reviews'].astype(str)
df['Rating'] = df['Rating'].astype(int)
reviews = df['cleaned-reviews'].values.tolist()
labels = df['Rating'].values.tolist()
reviews[:2]

['ambience good food quite good saturday lunch cost effective good place sate brunch one also chill friend parent waiter soumen das really courteous helpful',
 'ambience good pleasant evening service prompt food good good experience soumen das kudo service']

In [10]:
from src.utils.vocab import create_vocabulary

vocab = create_vocabulary(reviews, min_freq=3)
vocab.save('./data/saved_models/sentiment_vocab.pt')

# Getting Pretrained Embeddings

In [46]:
# %%writefile src/utils/embeddings.py

from pathlib import Path
import zipfile
import requests
import os

def download_weights():
    '''Downloads glove.6B.200d.txt , pretrained word weights
       and returns the path of the downloaded file.'''
    file_destination = Path('data/saved_models')
    
    print('Downloading weights...')
    with open('glove.6B.200d.zip', 'wb') as f:
        response = requests.get('https://storage.googleapis.com/kaggle-data-sets/13926/18767/bundle/archive.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20240229%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20240229T114040Z&X-Goog-Expires=259200&X-Goog-SignedHeaders=host&X-Goog-Signature=4c1c59d7ff03b749bc5a3d74c5211a9dc203f9b3d4582ec867eb582e4ed7725d935ccb77dc1b7dc5b0216245492c765b0091c3519e266f750447a56cd34b5fa8c6de769c722c1ad37ddd891a9926ee87d7823d889da35922efbe07bfc4abf9991680951b4a246d3b3fc0b2ca7e2b6524d03f85a5718c2d04a0bf45150eb504707e64230862fb1439ea5ffab680cdff33c50b493e625702b4f8d594d2bc6a1c7116f4b63edf2fbb65eef09510dc9b9687997bfb331cdb414540b758db3d904d33dd129a09f41ece8e9223138fe6bff3ff62a77b8737dee24ff2175b37593121500445b822d863bbf2919cfcb1885947ab24e3a14d3afdcba79a211569379d9d44')
        f.write(response.content)

    print(f'Unziping to {file_destination} ...')
    with zipfile.ZipFile('glove.6B.200d.zip', 'r') as zip_ref:
        zip_ref.extractall(file_destination)

    os.remove('glove.6B.200d.zip')
    print('Done')

    return file_destination / 'glove.6B.200d.txt'

from gensim.corpora import Dictionary
import numpy as np
import torch

def get_embedding_matrix(weights_path):
    '''Populates a matrix with the pretrained weights for every word
       in the vocabulary present in the weights file.'''
    vocab = Dictionary.load('./data/saved_models/sentiment_vocab.pt')
    stoi = vocab.token2id
    
    embeddings_index = {}
    
    f = open(weights_path)
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs
    f.close()
    
    embedding_matrix = np.zeros((len(stoi) + 1, 200))
    
    for word, i in stoi.items():
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            # words not found in embedding index will be all-zeros.
            embedding_matrix[i] = embedding_vector

    return torch.tensor(embedding_matrix, dtype=torch.float)
    

In [12]:
weights_path = Path('data/saved_models/glove.6B.200d.txt') #download_weights()

embedding_matrix = get_embedding_matrix(weights_path)

In [13]:
embedding_matrix.shape

torch.Size([4944, 200])

# Creating the Model

In [15]:
# %%writefile src/models/lstm_model.py

import torch
from torch import nn

class SelfAttention(nn.Module):
    def __init__(self, embedding_dim: int, num_heads: int):
        super(SelfAttention, self).__init__()

        self.attn = nn.MultiheadAttention(embed_dim = embedding_dim,
                                          num_heads = num_heads,
                                          batch_first = True)

    def forward(self, embeddings):
        output, _ = self.attn(embeddings, embeddings, embeddings)
        return output
        

class LSTMClassifier(nn.Module):
    def __init__(self, embedding_matrix, embedding_dim, hidden_units, lstm_layers: int, attn_heads: int):
        super(LSTMClassifier, self).__init__()
 
        self.embeddings = nn.Embedding.from_pretrained(embeddings = embedding_matrix, freeze = True)

        self.attn_layer = SelfAttention(embedding_dim = embedding_dim,
                                        num_heads = attn_heads)
        
        self.lstm = nn.LSTM(embedding_dim, hidden_units, 
                            num_layers = lstm_layers, 
                            batch_first = True, 
                            bidirectional = True)
        
        self.fc = nn.Sequential(nn.Linear(in_features = hidden_units * 2, out_features = hidden_units),
                                nn.Linear(in_features = hidden_units, out_features = 1),
                                nn.Dropout(0.2))
        
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
    
        embeddings = self.embeddings(x)

        attn_out = self.attn_layer(embeddings)
    
        lstm_out, _ = self.lstm(attn_out)
    
        lstm_out = lstm_out[:, -1, :]
        
        fc_out = self.fc(lstm_out).squeeze(1)

        output = self.sigmoid(fc_out)

        return output

In [16]:
model = LSTMClassifier(embedding_matrix=embedding_matrix,
                       embedding_dim=conf.EMBEDDING_DIM, 
                       hidden_units=conf.HIDDEN_UNITS,
                       lstm_layers=conf.LSTM_LAYERS,
                       attn_heads=conf.ATTN_HEADS)

In [36]:
summary(model=model,
        input_data=train_dataset[0][0].unsqueeze(0),
        col_names=['input_size', 'output_size', 'num_params'])

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
LSTMClassifier                           [1, 200]                  [1]                       --
├─Embedding: 1-1                         [1, 200]                  [1, 200, 200]             (988,800)
├─SelfAttention: 1-2                     [1, 200, 200]             [1, 200, 200]             --
│    └─MultiheadAttention: 2-1           [1, 200, 200]             [1, 200, 200]             160,800
├─LSTM: 1-3                              [1, 200, 200]             [1, 200, 256]             733,184
├─Sequential: 1-4                        [1, 256]                  [1, 1]                    --
│    └─Linear: 2-2                       [1, 256]                  [1, 128]                  32,896
│    └─Linear: 2-3                       [1, 128]                  [1, 1]                    129
│    └─Dropout: 2-4                      [1, 1]                    [1, 1]                    --
├─Sigmoid: 1-

# Creating Dataset and Dataloaders

In [48]:
# %%writefile src/utils/data.py

from typing import List
import os

import torch
from torch.utils.data import Dataset, DataLoader

from torchtext.functional import to_tensor
import torchtext.transforms as T

from gensim.corpora import Dictionary

class CustomDataset(Dataset):
    '''Creates a torch.utils.data.Dataset and applies given transformations.'''
    def __init__(self, data: List[str], labels: list):
        super().__init__()

        self.labels = torch.tensor(labels, dtype=torch.float)
        self.data = custom_transforms(data)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index], self.labels[index]
        

def train_test_split(data: List[str], labels: list, split_point: float = 0.7):
    '''Splits the data to train and test segments according to split_point.'''
    split = int(split_point * len(data))
    
    train_data = data[0 : split]
    train_labels = labels[0 : split]

    test_data = data[split : len(data) -1]
    test_labels = labels[split : len(labels) -1]

    return train_data, train_labels, test_data, test_labels

def create_dataloaders(train_dataset, test_dataset, batch_size: int):
    '''Creates train and test dataloaders of given batch size.'''
    train_dataloader = DataLoader(dataset=train_dataset,
                                  batch_size=batch_size,
                                  num_workers=os.cpu_count(),
                                  shuffle=False)

    test_dataloader = DataLoader(dataset=test_dataset,
                                  batch_size=batch_size,
                                  num_workers=os.cpu_count(),
                                  shuffle=False)

    return train_dataloader, test_dataloader

def custom_transforms(data: List[str], max_seq_len: int = 200) -> torch.Tensor:
    '''Converts words to ids truncates and returns sentences as tensors.'''
    vocab = Dictionary.load('./data/saved_models/sentiment_vocab.pt')
    
    f = T.Truncate(max_seq_len=max_seq_len)
    
    sent2ids = [vocab.doc2idx(sentence.split(' '), unknown_word_index=1) for sentence in data]
    sent2ids = f(sent2ids)
    
    return to_tensor(sent2ids, padding_value=0)
    

In [33]:
train_data, train_labels, test_data, test_labels = train_test_split(reviews, labels)

train_dataset = CustomDataset(data=train_data, labels=train_labels)
test_dataset = CustomDataset(data=test_data, labels=test_labels)

len(train_dataset), len(test_dataset)

(6920, 2965)

In [34]:
train_dataloader, test_dataloader = create_dataloaders(train_dataset, test_dataset, conf.BATCH_SIZE)

len(train_dataloader), len(test_dataloader)

(217, 93)

# Training and Evaluation

In [35]:
# %%writefile src/training/training.py

import torch
from torch import nn

def train_step(model: nn.Module,
               loss_fn: nn.Module,
               optimizer: torch.optim,
               acc_fn,
               dataloader: torch.utils.data,
               device: torch.device):
    '''Training step during model training'''

    model.train()
    model.to(device)

    train_loss, train_acc = 0, 0

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        logits = model(X)
        preds = torch.round(logits)
        
        loss = loss_fn(logits, y)
        acc = acc_fn(preds, y)

        train_loss += loss.item()
        train_acc += acc.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss /= len(dataloader)
    train_acc /= len(dataloader)


    return train_loss, train_acc



def test_step(model: nn.Module,
              loss_fn: nn.Module,
              acc_fn,
              f1,
              dataloader: torch.utils.data,
              device: torch.device):
    '''Test step during evaluation'''

    model.eval()
    model.to(device)

    test_loss, test_acc, f1_score = 0, 0, 0

    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
    
            test_logits = model(X)
            test_preds = torch.round(test_logits)
            
            loss = loss_fn(test_logits, y).item()
            acc = acc_fn(test_preds, y).item()
            score = f1(test_preds, y).item()

            test_loss += loss
            test_acc += acc
            f1_score += score

        test_loss /= len(dataloader)
        test_acc /= len(dataloader)
        f1_score /= len(dataloader)

    return test_loss, test_acc, f1_score



def train(model: nn.Module,
         loss_fn: nn.Module,
         optimizer: torch.optim,
         train_dataloader: torch.utils.data,
         test_dataloader: torch.utils.data,
         acc_fn,
         f1,
         epochs: int,
         device: torch.device):

    results = {
        'train_loss': [],
        'train_acc': [],
        'test_loss': [],
        'test_acc': [],
        'f1_score': []
    }

    best_test_loss = float('inf')

    for epoch in range(epochs):
        print(f'\nEpoch: {epoch}\n--------')

        train_loss, train_acc = train_step(model=model, loss_fn=loss_fn, optimizer=optimizer, acc_fn=acc_fn, dataloader=train_dataloader, device=device)
        results['train_loss'].append(train_loss)
        results['train_acc'].append(train_acc)

        print()
        
        test_loss, test_acc, f1_score = test_step(model=model, loss_fn=loss_fn, acc_fn=acc_fn, f1=f1, dataloader=test_dataloader, device=device)
        results['test_loss'].append(test_loss)
        results['test_acc'].append(test_acc)
        results['f1_score'].append(f1_score)

        if test_loss < best_test_loss:
                best_test_loss = test_loss
                torch.save(model.state_dict(), f'lstm-{epochs}epochs-{conf.LSTM_LAYERS}lstm-{conf.ATTN_HEADS}attn-{conf.BATCH_SIZE}.pth')
                print('saved')

        print(f'train loss: {train_loss:.4f} | train acc: {train_acc:.2f}%')
        print(f'test loss: {test_loss:.4f} | test acc: {test_acc:.2f}%')
        print(f'f1 score: {f1_score:.2f}')

    return results


In [154]:
model = LSTMClassifier(embedding_matrix=embedding_matrix, 
                       embedding_dim=conf.EMBEDDING_DIM, 
                       hidden_units=conf.HIDDEN_UNITS,
                       lstm_layers=conf.LSTM_LAYERS,
                       attn_heads=conf.ATTN_HEADS)

optimizer = torch.optim.Adam(model.parameters(), lr=conf.LR)

In [None]:
results = train(model=model,
               loss_fn=conf.LOSS_FN,
               optimizer=optimizer,
               train_dataloader=train_dataloader,
               test_dataloader=test_dataloader,
               acc_fn=conf.ACC_FN,
               f1=conf.F1,
               epochs=conf.EPOCHS,
               device=device)

In [9]:
# %%writefile src/evaluation/plot.py

from typing import Dict, List
import matplotlib.pyplot as plt

def plot_loss_curves(results: Dict[str, List[float]]):
    """Plots training curves of a results dictionary."""
    
    loss = results['train_loss']
    test_loss = results['test_loss']

    accuracy = results['train_acc']
    test_accuracy = results['test_acc']

    f1_score = results['f1_score']

    epochs = range(len(results['train_loss']))

    plt.figure(figsize=(15, 7))

    # loss
    plt.subplot(1, 3, 1)
    plt.plot(epochs, loss, label='train_loss')
    plt.plot(epochs, test_loss, label='test_loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    # accuracy
    plt.subplot(1, 3, 2)
    plt.plot(epochs, accuracy, label='train_accuracy')
    plt.plot(epochs, test_accuracy, label='test_accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epochs')
    plt.legend();

    # f1 score
    plt.subplot(1, 3, 3)
    plt.plot(epochs, f1_score, label='f1_score')
    plt.title('F1 Score')
    plt.xlabel('Epochs')
    plt.legend();

In [None]:
plot_loss_curves(results)