# Sentiment Analysis

## Imports & Configs

In [None]:
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
from torch.utils.data import Dataset , DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score
from tqdm import tqdm
import nltk
nltk.download('punkt')

# choose the GPU when CUDA is usable, otherwise run everything on the CPU
if torch.cuda.is_available():
  device = torch.device('cuda')
else:
  device = torch.device('cpu')
device


## Load Dataset (IMDB from Hugging Face)

In [None]:
from datasets import load_dataset
# fetch the IMDB dataset and pull the raw texts and labels out of each split
dataset = load_dataset("imdb")

train_split, test_split = dataset['train'], dataset['test']
train_texts, train_labels = train_split['text'], train_split['label']
test_texts, test_labels = test_split['text'], test_split['label']

In [None]:
# sanity check: report how many examples each split holds
print(f'Train size: {len(train_texts)}')
print(f'Test size: {len(test_texts)}')
# trailing expression: shows one (text, label) training pair when run as a notebook cell
train_texts[1] , train_labels[1]

## Preprocessing

In [None]:
# text cleaning utility
def clean_text(string):
  """Normalise a raw review into lowercase, single-space-separated text.

  Steps, in order: lowercase; replace HTML line-break tags with a space;
  remove URLs and @mentions; replace every character that is not a lowercase
  letter, digit, whitespace or apostrophe with a space; collapse whitespace
  runs and trim the ends.

  Args:
    string: the raw text to clean.

  Returns:
    The cleaned text (possibly the empty string).
  """
  string = string.lower()
  # "<br />" tags would otherwise lose their punctuation below and survive as a
  # spurious "br" token; handle them first so a URL directly before a tag ends there
  string = re.sub(r"<br\s*/?>", " ", string)
  # match http:// as well as https:// (plus bare www. links); the previous
  # pattern only caught text starting with "https"
  string = re.sub(r"(?:https?://|www\.)\S+", "", string)
  string = re.sub(r"@\w+", "", string)
  string = re.sub(r"[^a-z0-9\s']", " ", string)
  return re.sub(r"\s+", " ", string).strip()

In [None]:
# applying utility on train-test data
# normalise every review in both splits with clean_text
train_texts = list(map(clean_text, train_texts))
test_texts = list(map(clean_text, test_texts))

## Tokenization & Vocabulary

In [None]:
from collections import Counter

def build_vocab(texts, min_freq=2, max_size=200000):
  """Build word<->index lookups from whitespace-tokenised texts.

  Args:
    texts: iterable of strings; each is split on whitespace.
    min_freq: a word is kept only if it occurs at least this many times.
    max_size: only the ``max_size`` most frequent words are considered
      (the two special tokens are not counted against this limit).

  Returns:
    (stoi, itos): ``itos`` is the index-ordered token list, starting with
    "<PAD>" (index 0) and "<OOV>" (index 1) followed by the kept words in
    descending frequency; ``stoi`` maps each token back to its index.
  """
  word_counts = Counter(word for text in texts for word in text.split())

  itos = ["<PAD>", "<OOV>"]
  for word, freq in word_counts.most_common(max_size):
    if freq >= min_freq:
      itos.append(word)

  stoi = dict(zip(itos, range(len(itos))))
  return stoi, itos

# the vocabulary is built from the training split only
stoi, itos = build_vocab(texts=train_texts)
vocab_size = len(stoi)
print("Vocab Size: ", vocab_size)

In [None]:
def texts_to_sequence(texts, stoi):
  """Encode each text as a 1-D ``torch.long`` tensor of vocabulary indices.

  Texts are split on whitespace; words missing from ``stoi`` are mapped to
  the index stored under "<OOV>".

  Returns:
    A list with one tensor per input text, in input order.
  """
  def encode(text):
    ids = [stoi.get(word, stoi["<OOV>"]) for word in text.split()]
    return torch.tensor(ids, dtype=torch.long)

  return [encode(text) for text in texts]

# encode both splits with the vocabulary (stoi) built above
train_seq, test_seq = (
    texts_to_sequence(split, stoi) for split in (train_texts, test_texts)
)

## Dataset & DataLoader

In [None]:
class TextDataset(Dataset):
  """Dataset pairing pre-encoded sequences with their labels."""

  def __init__(self, seqs, labels):
    # seqs and labels are indexed in parallel by __getitem__
    self.seqs, self.labels = seqs, labels

  def __len__(self):
    return len(self.seqs)

  def __getitem__(self, idx):
    """Return ``(sequence, label)`` with the label as a float tensor."""
    seq = self.seqs[idx]
    label = torch.tensor(self.labels[idx], dtype=torch.float)
    return seq, label

def collate_fn(batch):
  """Collate ``(sequence, label)`` pairs into padded batch tensors.

  Returns:
    (padded, lengths, labels): sequences right-padded with 0 to the longest
    one in the batch (batch-first), the original length of every sequence,
    and the labels stacked into a single tensor.
  """
  sequences, targets = zip(*batch)
  seq_lengths = torch.tensor(list(map(len, sequences)))
  padded = pad_sequence(sequences, batch_first=True, padding_value=0)
  return padded, seq_lengths, torch.stack(targets)


In [None]:
# wrap the encoded splits as datasets
train_ds = TextDataset(seqs=train_seq, labels=train_labels)
test_ds = TextDataset(seqs=test_seq, labels=test_labels)

# both loaders use batches of 64 collated by collate_fn; only training is shuffled
train_loader = DataLoader(train_ds, batch_size=64, shuffle=True,
                          collate_fn=collate_fn)
test_loader = DataLoader(test_ds, batch_size=64, collate_fn=collate_fn)

## Defining Model (LSTM)

In [None]:
class RNNClassifier(nn.Module):
  """Embedding -> (bi)LSTM -> small MLP head emitting one logit per sequence.

  Args:
    vocab_size: number of rows in the embedding table; index 0 is the
      padding index.
    emb_dim: embedding size.
    hidden_dim: LSTM hidden size per direction.
    n_layers: number of stacked LSTM layers.
    bidirectional: whether the LSTM also reads the sequence right-to-left.
    dropout: dropout between LSTM layers; only applied when ``n_layers > 1``.
      The classifier head uses its own fixed dropout of 0.5.
  """

  def __init__(self, vocab_size, emb_dim = 128, hidden_dim = 128, n_layers = 1,
               bidirectional = True, dropout = 0.5):

    super().__init__()
    self.emb = nn.Embedding(vocab_size, emb_dim , padding_idx=0)
    self.lstm = nn.LSTM(emb_dim, hidden_dim, num_layers= n_layers,
                        batch_first = True, bidirectional=bidirectional,
                        dropout= dropout if n_layers > 1 else 0)
    self.fc = nn.Sequential(
        nn.Linear(hidden_dim * (2 if bidirectional else 1 ), 64),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(64,1)
    )

  def forward(self,x, lengths = None):
    """Return one logit per sequence.

    Args:
      x: batch-first tensor of token indices, right-padded with index 0.
      lengths: optional true (unpadded) length of every row of ``x``. When
        given, the LSTM only consumes the real tokens, so the result does not
        depend on how much padding the batch added. When omitted, the full
        padded rows are processed.

    Returns:
      Tensor with one logit per row of ``x``.
    """
    emb = self.emb(x)
    if lengths is not None:
      # pack_padded_sequence needs CPU lengths >= 1; clamp so a row that is
      # empty after cleaning is read as a single padding token instead of raising
      lengths_cpu = torch.as_tensor(lengths).cpu().clamp(min=1)
      emb = nn.utils.rnn.pack_padded_sequence(
          emb, lengths_cpu, batch_first=True, enforce_sorted=False)
    _, (h, _) = self.lstm(emb)
    # h[-2] / h[-1] are the last layer's forward / backward final states
    h_last = torch.cat((h[-2], h[-1]), dim = 1) if self.lstm.bidirectional else h[-1]
    logits = self.fc(h_last)
    return logits.squeeze(1)


## Training Loop

In [None]:
# Training section

# instantiate the classifier on the selected device and optimise it with Adam
model = RNNClassifier(vocab_size)
model = model.to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

def train_one_epoch(model,dataloader):
  """Run one optimisation pass over ``dataloader``.

  Relies on the module-level ``optimizer`` and ``device``.

  Args:
    model: module called as ``model(X, lengths)`` and returning one logit per
      example.
    dataloader: yields ``(X, lengths, y)`` batches.

  Returns:
    (avg_loss, acc, f1): the per-example mean binary-cross-entropy loss over
    the epoch (a Python float), followed by accuracy and F1 of the
    thresholded predictions collected while training.
  """
  model.train()
  total_loss, preds, trues = 0.0, [], []
  for X, lengths, y in tqdm(dataloader):
    X, y = X.to(device), y.to(device)
    optimizer.zero_grad()
    logits = model(X, lengths)
    loss = F.binary_cross_entropy_with_logits(logits, y)
    loss.backward()
    optimizer.step()

    # loss is a batch mean; weight by batch size so the epoch figure is per-example
    total_loss += loss.item() * X.size(0)
    pred = (torch.sigmoid(logits) > 0.5).int().cpu().numpy()
    preds.extend(pred)
    trues.extend(y.cpu().numpy())

  avg_loss = total_loss / len(dataloader.dataset)
  acc = accuracy_score(trues, preds)
  f1 = f1_score(trues, preds)
  # return the epoch average (not the last batch's loss tensor), loss first,
  # which is the order the epoch loop unpacks: loss, acc, f1
  return avg_loss, acc, f1



In [None]:
# Evaluation section
@torch.no_grad()
def evaluate(model, dataloader):
  """Score ``model`` on ``dataloader`` without tracking gradients.

  Puts the model in eval mode, thresholds the sigmoid of each logit at 0.5
  and compares the result with the labels. Relies on the module-level
  ``device``.

  Returns:
    (acc, f1) over every example the loader yields.
  """
  model.eval()
  y_pred, y_true = [], []
  for batch_x, batch_lengths, batch_y in dataloader:
    batch_x = batch_x.to(device)
    batch_y = batch_y.to(device)
    probs = torch.sigmoid(model(batch_x, batch_lengths))
    y_pred.extend((probs > 0.5).int().cpu().numpy())
    y_true.extend(batch_y.cpu().numpy())

  return accuracy_score(y_true, y_pred), f1_score(y_true, y_pred)

## Train...!

In [None]:
# train for a fixed number of epochs, scoring on the test split after each one
epochs = 3
for epoch in range(epochs):
  # unpacked as (loss, acc, f1); train_one_epoch's return order has to match
  train_metrics = train_one_epoch(model, train_loader)
  loss, acc, f1 = train_metrics
  eval_metrics = evaluate(model, test_loader)
  val_acc, val_f1 = eval_metrics
  print(f"epoch {epoch+1}/{epochs} | Loss = {loss:.4f} | Train Acc: {acc:.3f} | Val Acc: {val_acc:.3f} | F1: {val_f1:.3f}")