# Sentiment Analysis on Amazon Customer Reviews Using Parallel Deep Learning

<b>Course:</b> CSYE7105 - High Performance Parallel Machine Learning & AI

<b>Team 4:</b> Anshul Chaudhary and Dev Mithunisvar Premraj

<b>Instructor:</b> Prof. Handan Liu


<div style="background-color: #d3d3d3; color: #000000; border: 1px solid #a9a9a9; padding: 10px; border-radius: 5px;">
    <strong>Note: </strong> Install the requirements using the following steps:
    <ol>
    <li> Activate virtual env </li>
    <li> pip install -r requirements.txt </li>
    </ol>
</div>

### Data Processing 

#### Import Statements

In [1]:
# Data Processing Packages
import pandas as pd
import dask.dataframe as dd
from dask.distributed import performance_report
from dask.distributed import Client
import re

#### Client Setup

In [2]:
# Local Dask cluster used for the preprocessing below; its dashboard lets us
# monitor worker resources while Dask is processing.
# 8 worker processes with one thread each, each capped at 8GB of memory.
client = Client(
    n_workers=8,
    threads_per_worker=1,
    memory_limit='8GB',
)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 8,Total memory: 59.60 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:38373,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 59.60 GiB

0,1
Comm: tcp://127.0.0.1:35252,Total threads: 1
Dashboard: http://127.0.0.1:46737/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:43995,
Local directory: /tmp/dask-scratch-space/worker-c8dyq2fl,Local directory: /tmp/dask-scratch-space/worker-c8dyq2fl

0,1
Comm: tcp://127.0.0.1:33911,Total threads: 1
Dashboard: http://127.0.0.1:32882/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:32957,
Local directory: /tmp/dask-scratch-space/worker-9urz2j0o,Local directory: /tmp/dask-scratch-space/worker-9urz2j0o

0,1
Comm: tcp://127.0.0.1:40322,Total threads: 1
Dashboard: http://127.0.0.1:35993/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:35652,
Local directory: /tmp/dask-scratch-space/worker-ykcid93l,Local directory: /tmp/dask-scratch-space/worker-ykcid93l

0,1
Comm: tcp://127.0.0.1:33801,Total threads: 1
Dashboard: http://127.0.0.1:46794/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:46616,
Local directory: /tmp/dask-scratch-space/worker-dlkxp1gi,Local directory: /tmp/dask-scratch-space/worker-dlkxp1gi

0,1
Comm: tcp://127.0.0.1:35614,Total threads: 1
Dashboard: http://127.0.0.1:46067/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:45576,
Local directory: /tmp/dask-scratch-space/worker-kbs74lgo,Local directory: /tmp/dask-scratch-space/worker-kbs74lgo

0,1
Comm: tcp://127.0.0.1:37251,Total threads: 1
Dashboard: http://127.0.0.1:45055/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:39913,
Local directory: /tmp/dask-scratch-space/worker-10pxl9i8,Local directory: /tmp/dask-scratch-space/worker-10pxl9i8

0,1
Comm: tcp://127.0.0.1:42882,Total threads: 1
Dashboard: http://127.0.0.1:45880/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:33857,
Local directory: /tmp/dask-scratch-space/worker-agxzhwi0,Local directory: /tmp/dask-scratch-space/worker-agxzhwi0

0,1
Comm: tcp://127.0.0.1:38341,Total threads: 1
Dashboard: http://127.0.0.1:37151/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:39603,
Local directory: /tmp/dask-scratch-space/worker-lk8qkib3,Local directory: /tmp/dask-scratch-space/worker-lk8qkib3


#### Load Data in Dask DataFrame

In [3]:
# Source data paths (relative to the notebook's working directory)
dataset_root = '../dataset'
train_path = f'{dataset_root}/train/train.ft.txt'
test_path = f'{dataset_root}/test/test.ft.txt'

In [4]:
# Load the raw files lazily into Dask DataFrames. With a tab delimiter and no
# header, each line lands in a single string column named 0 (see the preview
# output below); this presumes the review text itself contains no tabs.
csv_options = dict(delimiter='\t', header=None, dtype=str)
train_data_dask = dd.read_csv(train_path, **csv_options)
test_data_dask = dd.read_csv(test_path, **csv_options)

In [5]:
train_data_dask

Unnamed: 0_level_0,0
npartitions=24,Unnamed: 1_level_1
,string
,...
...,...
,...
,...


In [6]:
test_data_dask

Unnamed: 0_level_0,0
npartitions=2,Unnamed: 1_level_1
,string
,...
,...


In [7]:
train_data_dask.head(1)

Unnamed: 0,0
0,__label__2 Stuning even for the non-gamer: Thi...


**Observations:**

- ```train_data_dask``` has `npartitions=24`, i.e. the training data is divided into 24 partitions. Dask can process these partitions in parallel, distributing the workload across multiple cores or machines.
- ```test_data_dask``` has `npartitions=2`, i.e. the test data is divided into only 2 partitions, so at most 2 workers can process it at the same time.
- We can repartition the test data into 4 partitions to make better use of the allotted cores.


**Data Cleaning Required:**

- When the data is read, each line is loaded as a single string of the form `__label__<k> <review text>`. We therefore need to split each line into the label and the review text, remove the `__label__` prefix from the label, and convert the label to an integer.

#### Data Cleaning

In [8]:
# Repartition the test data from 2 into 4 partitions so more workers can
# process it concurrently.
n_test_partitions = 4
test_data_dask = test_data_dask.repartition(npartitions=n_test_partitions)

In [9]:
# Materialise both collections in the workers' memory once, so the chained
# map_partitions steps below start from cached partitions instead of
# re-reading and re-parsing the source files.
train_data_dask, test_data_dask = train_data_dask.persist(), test_data_dask.persist()

In [10]:
# function to process the data and split the label and review text
def process_data_dask(df):
    """Split each raw line of a partition into a label and a review.

    Args:
        df: pandas DataFrame (one Dask partition) whose column ``0`` holds raw
            lines of the form ``"__label__<k> <review text>"``.

    Returns:
        A new DataFrame with the same index and two columns:
        ``'label'``, the first space-separated token with the ``__label__``
        prefix removed (still a string), and ``'review'``, the rest of the
        line with surrounding whitespace stripped. The review is missing (NaN)
        for a line without any space.
    """
    # A single regex pass replaces the original two identical str.split calls.
    # Unlike str.split(expand=True), which only creates column 1 when some row
    # contains a space, extract always yields both named columns, even for an
    # empty partition.
    parts = df[0].str.extract(r'^(?P<label>[^ ]*)(?: (?P<review>.*))?$')
    return pd.DataFrame({
        'label': parts['label'].str.replace('__label__', '', regex=False),
        'review': parts['review'].str.strip(),
    })

In [11]:
# Empty DataFrame describing the output schema of process_data_dask, passed as
# `meta` so Dask knows the result's columns without sampling the function.
output_columns = ['label', 'review']
meta = pd.DataFrame(columns=output_columns, dtype=str)

In [12]:
# Split every partition's raw lines into 'label' and 'review' columns, for the
# train and the test collection alike.
train_data_dask, test_data_dask = (
    ddf.map_partitions(process_data_dask, meta=meta)
    for ddf in (train_data_dask, test_data_dask)
)

In [13]:
# Label recoding used to turn the task into 0/1 binary classification:
#   "2" -> "1"  (good review)
#   "1" -> "0"  (bad review)
# Keys and values are strings because the labels are still strings at this
# stage (they are only converted with int() later, in SentimentDataset).
replace_dict = {"2": "1", "1": "0"}

In [14]:
# function to replace the labels
def replace_labels(df, mapping=None):
    """Recode the 'label' column of one partition using a value mapping.

    Args:
        df: pandas DataFrame (one Dask partition) with a 'label' column.
        mapping: dict of old label -> new label. Defaults to the module-level
            ``replace_dict`` ("2" -> "1", "1" -> "0").

    Returns:
        A new DataFrame equal to ``df`` except for the recoded 'label' column.
        Labels not present in ``mapping`` are left unchanged.
    """
    if mapping is None:
        mapping = replace_dict
    # assign() returns a copy: functions passed to map_partitions should not
    # mutate the partition they receive (it may be a cached/persisted block).
    return df.assign(label=df['label'].replace(mapping))

In [15]:
# Recode the labels to 0/1 in every partition of both collections.
train_data_dask, test_data_dask = (
    ddf.map_partitions(replace_labels) for ddf in (train_data_dask, test_data_dask)
)

In [16]:
# Text data cleaning
def text_cleaning(text):
    """Normalise one review string.

    Lower-cases ``text``, deletes every character that is not an ASCII letter
    (A-Z / a-z) or whitespace, then strips leading/trailing whitespace.
    Non-ASCII letters such as 'é' are therefore removed as well.
    """
    lowered = text.lower()
    letters_and_spaces = re.sub(r'[^A-Za-z\s]', '', lowered)
    return letters_and_spaces.strip()

In [17]:
# function to clean the reviews and drop the null values
def clean_reviews(df, cleaner=None):
    """Drop rows with missing values, then clean the 'review' text.

    Args:
        df: pandas DataFrame (one Dask partition) with a 'review' column.
        cleaner: callable applied to each review string. Defaults to
            ``text_cleaning``.

    Returns:
        A new DataFrame without rows containing nulls, with 'review' cleaned.
    """
    if cleaner is None:
        cleaner = text_cleaning
    # Drop nulls BEFORE cleaning: the cleaner calls str methods, so a missing
    # review (NaN / pd.NA) would raise instead of being dropped.
    df = df.dropna()
    # assign() avoids mutating the caller's partition (and chained-assignment
    # warnings on the dropna() result).
    return df.assign(review=df['review'].apply(cleaner))

In [18]:
# Clean the review text (and drop null rows) in every partition of both collections.
train_data_dask, test_data_dask = (
    ddf.map_partitions(clean_reviews) for ddf in (train_data_dask, test_data_dask)
)

In [19]:
# Materialise both processed datasets as pandas DataFrames. A single
# dask.compute call hands both task graphs to the scheduler at once, so the
# train and test work can run concurrently instead of back-to-back. The run is
# profiled into dask_report.html.
import dask

with performance_report(filename="dask_report.html"):
    train_data_processed, test_data_processed = dask.compute(train_data_dask, test_data_dask)

In [20]:
client.close()

In [21]:
train_data_processed.head()

Unnamed: 0,label,review
0,1,stuning even for the nongamer this sound track...
1,1,the best soundtrack ever to anything im readin...
2,1,amazing this soundtrack is my favorite music o...
3,1,excellent soundtrack i truly like this soundtr...
4,1,remember pull your jaw off the floor after hea...


In [22]:
test_data_processed.head()

Unnamed: 0,label,review
0,1,great cd my lovely pat has one of the great vo...
1,1,one of the best game music soundtracks for a ...
2,0,batteries died within a year i bought this ch...
3,1,works fine but maha energy is better check out...
4,1,great for the nonaudiophile reviewed quite a b...


In [23]:
test_data_processed.shape

(400000, 2)

####  Split Train and Validation Data

In [23]:
from sklearn.model_selection import train_test_split

In [24]:
# Hold out 20% of the processed training data for validation (80/20 split);
# the fixed random_state makes the split reproducible.
train_data, val_data = train_test_split(
    train_data_processed,
    test_size=0.2,
    random_state=42,
)

### Tokenization & Dataset Preparation

#### Import Statements

In [39]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from transformers import AutoTokenizer

#### Dataset class definition

In [26]:
class SentimentDataset(Dataset):
    """Map-style dataset yielding ``(input_ids, label)`` pairs from a DataFrame.

    Reviews are tokenized lazily, one item at a time, in ``__getitem__``.

    Args:
        data: DataFrame with a 'review' text column and a 'label' column whose
            values are accepted by ``int()``. Rows are addressed by position
            (``iloc``), so the DataFrame's index does not matter.
        tokenizer: callable tokenizer (e.g. a Hugging Face AutoTokenizer)
            called with truncation/padding to ``max_length`` and
            ``return_tensors="pt"``.
        max_length: number of tokens every sequence is truncated/padded to.
    """

    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return the padded token ids of row ``idx`` and its float32 label."""
        row = self.data.iloc[idx]
        target = torch.tensor(int(row['label']), dtype=torch.float32)

        encoded = self.tokenizer(
            row['review'],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors="pt",
        )
        # Tokenizer output carries a leading batch dimension of 1; drop it.
        # NOTE(review): only input_ids are returned; the attention mask is
        # discarded, so downstream models see the padding tokens as input.
        token_ids = encoded['input_ids'].squeeze(0)

        return token_ids, target

In [27]:
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

In [40]:
# Map-style datasets for the train / validation splits; each review is
# tokenized lazily when its item is requested.
train_dataset, val_dataset = (
    SentimentDataset(split, tokenizer) for split in (train_data, val_data)
)

In [29]:
# Single-process loaders with batches of 32; only the training loader shuffles.
# NOTE(review): train() below builds its own DistributedSampler-based loaders
# and does not use these two.
loader_batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=loader_batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=loader_batch_size, shuffle=False)

### Model Definition

In [36]:
world_size = 4

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader

import torch.multiprocessing as mp
import re
import pandas as pd
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split
import dask.dataframe as dd

In [42]:
class SentimentLSTM(nn.Module):
    """Embedding -> single-layer LSTM -> Linear -> Sigmoid classifier.

    Args:
        vocab_size: number of token ids the embedding table covers.
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden-state size.
        output_dim: number of sigmoid outputs (1 for binary sentiment).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        # Layers are created in the same order as before (embedding, lstm, fc)
        # so seeded weight initialisation is unchanged.
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map token ids to probabilities in (0, 1).

        ``x`` is presumably a (batch, seq_len) tensor of token ids (the LSTM is
        batch_first). Only the final hidden state of the last LSTM layer feeds
        the classifier.
        NOTE(review): with right-padded inputs that final state is computed
        after the padding tokens.
        """
        embedded = self.embedding(x)
        _, (h_n, _) = self.lstm(embedded)
        scores = self.fc(h_n[-1])
        return self.sigmoid(scores)

In [48]:
def setup(rank, world_size):
    try:
        print(f"Initializing process group for rank {rank} out of {world_size} total processes")
        dist.init_process_group("gloo", rank=rank, world_size=world_size)  # Use 'gloo' for CPU
    except Exception as e:
        print(f"Error initializing process group for rank {rank}: {e}")


def cleanup():
  dist.destroy_process_group()

In [49]:
def _resolve_device(rank):
    """Pick this rank's device: a CUDA GPU when available, else the CPU."""
    if torch.cuda.is_available():
        return torch.device(f"cuda:{rank % torch.cuda.device_count()}")
    return torch.device("cpu")


def _global_mean(total, count, device):
    """Sum ``(total, count)`` over all ranks and return ``total / count``.

    Every rank must call this the same number of times (it is a collective).
    Returns NaN when no samples were seen anywhere.
    """
    # NCCL only reduces CUDA tensors; gloo reduces CPU tensors.
    reduce_device = device if dist.get_backend() == "nccl" else torch.device("cpu")
    stats = torch.tensor([total, float(count)], dtype=torch.float64, device=reduce_device)
    dist.all_reduce(stats, op=dist.ReduceOp.SUM)
    total_sum, total_count = stats.tolist()
    return total_sum / total_count if total_count else float("nan")


def train(rank, world_size, train_df=None, val_df=None, tok=None,
          epochs=5, batch_size=32, lr=0.001):
    """Train SentimentLSTM with DistributedDataParallel as one rank.

    Launch once per process with ``rank`` in ``range(world_size)``.

    Args:
        rank: index of this process in the process group.
        world_size: total number of processes.
        train_df, val_df: DataFrames with 'review' and 'label' columns. They
            default to the notebook globals ``train_data`` / ``val_data``;
            pass them explicitly when the child process does not inherit the
            notebook's globals (e.g. the 'spawn' start method).
        tok: tokenizer; defaults to the global ``tokenizer``.
        epochs, batch_size, lr: training hyper-parameters (defaults match the
            previously hard-coded values).

    Each epoch prints the sample-weighted mean training and validation loss,
    aggregated over all ranks.

    Raises:
        Any error from setup, training or validation. It is printed with the
        rank and re-raised, so the launcher sees a failing process.
    """
    try:
        train_df = train_data if train_df is None else train_df
        val_df = val_data if val_df is None else val_df
        tok = tokenizer if tok is None else tok

        print(f"Rank {rank} starting")
        setup(rank, world_size)  # DDP setup
        device = _resolve_device(rank)

        # Dataset and Dataloader: each rank reads its own shard via DistributedSampler.
        train_dataset = SentimentDataset(train_df, tok)
        val_dataset = SentimentDataset(val_df, tok)
        train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
        val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, sampler=val_sampler)

        # Model Setup. device_ids must be None for a CPU module: DDP rejects
        # device_ids=[rank] when the parameters live on the CPU (gloo setup).
        model = SentimentLSTM(vocab_size=len(tok.get_vocab()), embedding_dim=64,
                              hidden_dim=128, output_dim=1).to(device)
        model = DDP(model, device_ids=[device.index] if device.type == "cuda" else None)

        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        for epoch in range(epochs):
            print(f"Rank {rank}, Epoch {epoch} training started")
            # Without set_epoch the sampler reuses the same shuffle every epoch.
            train_sampler.set_epoch(epoch)

            model.train()
            train_total, train_count = 0.0, 0
            for tokens, labels in train_loader:
                tokens, labels = tokens.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(tokens)
                # squeeze(-1), not squeeze(): for a batch of one, squeeze()
                # gives a 0-d tensor that BCELoss rejects against a (1,) target.
                loss = criterion(outputs.squeeze(-1), labels)
                loss.backward()
                optimizer.step()
                train_total += loss.item() * labels.size(0)
                train_count += labels.size(0)

            # Validation Step
            model.eval()
            val_total, val_count = 0.0, 0
            with torch.no_grad():
                for tokens, labels in val_loader:
                    tokens, labels = tokens.to(device), labels.to(device)
                    outputs = model(tokens)
                    val_total += criterion(outputs.squeeze(-1), labels).item() * labels.size(0)
                    val_count += labels.size(0)

            train_loss = _global_mean(train_total, train_count, device)
            val_loss = _global_mean(val_total, val_count, device)
            print(f"Rank {rank}, Epoch {epoch}, Training Loss: {train_loss}, Validation Loss: {val_loss}")

        print(f"Rank {rank} finished training")
    except Exception as e:
        print(f"Error occurred in rank {rank}: {e}")
        raise
    finally:
        # Only tear down a group that was actually created, so a failed setup
        # does not trigger a second, misleading error here.
        if dist.is_initialized():
            cleanup()



In [52]:
def test(rank, world_size):
  print(rank, world_size)

In [None]:
def test(rank, world_size):
  print(rank, world_size)

world_size = 1  # Start with 2 processes instead of 4
mp.spawn(test, args=(world_size,), nprocs=world_size, join=True)


ProcessExitedException: process 0 terminated with exit code 1