<a href="https://colab.research.google.com/github/gupta24789/sentiment-analysis/blob/main/sentiment_lstm_lighting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# !pip install -q lightning
# !pip install -q neattext

In [None]:
import pandas as pd
import numpy as np
import neattext as nt
import itertools

import warnings
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.nn.utils.rnn import  pad_sequence
from torch.utils.data import Dataset, DataLoader, TensorDataset


from torchmetrics import Accuracy
import pytorch_lightning as pl
from lightning.pytorch.loggers import CSVLogger
from lightning.pytorch.callbacks import TQDMProgressBar

warnings.filterwarnings('ignore')

In [None]:
# Load the train/validation splits and keep only the raw tweet text and its label;
# rows missing either value are dropped and the index is renumbered from 0.
train_df = (
  pd.read_csv("https://raw.githubusercontent.com/gupta24789/sentiment-analysis/main/data/train.csv")
    .loc[:, ['raw_tweet', 'label']]
    .dropna()
    .reset_index(drop = True)
)
val_df = (
  pd.read_csv("https://raw.githubusercontent.com/gupta24789/sentiment-analysis/main/data/val.csv")
    .loc[:, ['raw_tweet', 'label']]
    .dropna()
    .reset_index(drop = True)
)

In [None]:
## Clean the data
def custom_clean_text(x):
  """Return a cleaned, lower-cased version of the tweet text `x`.

  The text is wrapped in a neattext ``TextFrame`` and passed through its
  remove_* helpers in the order listed below; the resulting text is then
  lower-cased.
  """
  frame = nt.TextFrame(x)
  frame = (
    frame.remove_stopwords()
         .remove_urls()
         .remove_emails()
         .remove_dates()
         .remove_puncts()
         .remove_numbers()
         .remove_userhandles()
         .remove_multiple_spaces()
  )
  return frame.text.lower()

# Store the cleaned tweet next to the raw one in both splits.
train_df['processed_text'] = train_df['raw_tweet'].apply(custom_clean_text)
val_df['processed_text'] = val_df['raw_tweet'].apply(custom_clean_text)

In [None]:
# Preview the first four rows to compare raw_tweet against processed_text.
train_df.head(4)

Unnamed: 0,raw_tweet,label,processed_text
0,Want to say a huge thanks to @WarriorAssaultS ...,1.0,want huge thanks #ff thanks support :)
1,@jaynehh_ you just need a job and get a letter...,1.0,need job letter work place saying work letter...
2,"@knhillrocks HA yes, make it quick tho :D",1.0,ha yes quick tho :d
3,@shartyboy Thanks for texting me back :)) I'm ...,1.0,thanks texting :)) im texting tomorrow :))


In [None]:
# Inputs are the cleaned tweets; targets are the labels, for each split.
X_train, y_train = train_df['processed_text'], train_df['label']
X_val, y_val = val_df['processed_text'], val_df['label']

## Create Vocab

In [None]:
# Special tokens come first so that '__PAD__' is id 0 and '__UNK__' is id 1.
special_tokens = ['__PAD__','__UNK__']

# Collect every distinct token of the training tweets (same " " split that the
# encoder uses). The set is sorted because plain set iteration order changes
# between kernel restarts (string hash randomisation), which would silently
# re-assign token ids and invalidate any previously saved checkpoint.
# Special tokens are excluded so they can never appear twice in `words`.
train_tokens = set(itertools.chain.from_iterable(text.split(" ") for text in train_df.processed_text))
words = special_tokens + sorted(train_tokens - set(special_tokens))

# Forward and reverse lookup tables between tokens and integer ids.
token2idx = {w:i for i,w in enumerate(words)}
idx2tokens = {i:w for i,w in enumerate(words)}
print(f"vocab size : {len(token2idx)}")

vocab size : 11332


## Convert words to numbers

In [None]:
def convert_token_to_number(tweet, verbose = False):
  """Encode a tweet as a list of vocabulary ids.

  The tweet is split on single spaces and each piece is looked up in the
  module-level `token2idx` mapping; pieces that are not in the mapping are
  replaced by the id of '__UNK__'.

  Args:
    tweet: text to encode.
    verbose: when True, print the UNK id and the tweet before encoding.

  Returns:
    A list of integer ids, one per space-separated piece of `tweet`.
  """
  unk_token_id = token2idx['__UNK__']

  if verbose:
    print(f"UNK TOKEN ID : {unk_token_id}")
    print(f"RAW TWEET : {tweet}")

  return [token2idx.get(token, unk_token_id) for token in tweet.split(" ")]

In [None]:
# Turn every cleaned tweet into its list of vocabulary ids.
X_train_encoded = X_train.apply(convert_token_to_number)
X_val_encoded = X_val.apply(convert_token_to_number)

In [None]:
def data_collator(batch):
  """Collate (token_ids, label) pairs into a right-padded batch.

  Args:
    batch: sequence of items where item[0] is a list of token ids and
      item[1] is the label.

  Returns:
    (features, labels): `features` is an int64 tensor of shape
    (batch_size, longest_sequence) padded on the right with the '__PAD__' id
    from the module-level `token2idx`; `labels` is a float32 tensor of shape
    (batch_size,).
  """
  # dtype is pinned to long: torch.tensor([]) would otherwise default to
  # float32, making the padded batch float and unusable as embedding indices.
  features = [torch.tensor(item[0], dtype = torch.long) for item in batch]
  labels = torch.tensor([item[1] for item in batch], dtype = torch.float32)

  features = pad_sequence(features, batch_first=True, padding_value= token2idx['__PAD__'])

  return (features, labels)

In [None]:
batch_size = 32

# The training loader reshuffles every epoch so that each mini-batch mixes
# both labels regardless of the row order of the CSV; validation keeps a fixed
# order so its metrics are comparable between epochs.
train_dl = DataLoader(list(zip(X_train_encoded, y_train)), batch_size = batch_size, shuffle = True, collate_fn = data_collator)
val_dl = DataLoader(list(zip(X_val_encoded, y_val)), batch_size = batch_size, shuffle = False, collate_fn = data_collator)

In [None]:
# Pull a single batch from the training loader and inspect its tensor shapes.
example = next(iter(train_dl))
features, labels = example
(features.shape, labels.shape)

(torch.Size([32, 15]), torch.Size([32]))

In [None]:
class RNNModel(pl.LightningModule):
  """Embedding -> LSTM -> Dropout -> Linear -> Sigmoid sentiment classifier.

  Args:
    vocab_size: number of rows in the embedding table.
    emb_dim: embedding dimension.
    hidden_dim: LSTM hidden size.
    num_layers: number of stacked LSTM layers.
    batch_size: accepted for backward compatibility; not used by the model.
    num_classes: output width of the linear layer (1 for the BCE setup used here).
    learning_rate: Adam learning rate.
    pad_idx: id used to right-pad the input batches. The classifier reads the
      LSTM output at the last non-pad position of every sequence. Pass None to
      read the last timestep of the padded batch instead.
  """

  def __init__(self, vocab_size , emb_dim, hidden_dim, num_layers, batch_size, num_classes, learning_rate, pad_idx = 0):
    super().__init__()
    ## define variable
    self.learning_rate = learning_rate
    self.pad_idx = pad_idx

    ## define layers
    ## The embedding layer takes the vocab size and the embeddings size as input;
    ## padding_idx keeps the pad embedding out of the gradient updates.
    self.embedding = nn.Embedding(num_embeddings= vocab_size, embedding_dim= emb_dim, padding_idx= pad_idx)
    ## The LSTM layer takes in the the embedding size and the hidden vector size.
    self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first = True, num_layers= num_layers,bidirectional=False)
    self.dropout = nn.Dropout(0.1)
    ## Not used in forward(); kept so the module layout is unchanged.
    self.relu = nn.ReLU()
    self.linear = nn.Linear(hidden_dim , num_classes)
    self.sigmoid = nn.Sigmoid()

    ## loss & metrics
    ## BCELoss expects probabilities, which is why forward() ends with a sigmoid.
    self.loss_fn = nn.BCELoss()
    self.train_accuracy = Accuracy(task = 'binary', num_classes= 2, threshold = 0.5)
    self.val_accuracy = Accuracy(task = 'binary', num_classes = 2, threshold= 0.5)


  def forward(self, features, verbose = False):
    """Return (probabilities, lstm_hidden) for a batch of token ids.

    Args:
      features: integer tensor of shape (batch_size, max_len), right-padded
        with `pad_idx`.
      verbose: when True, print the shape of every intermediate tensor.

    Returns:
      sigmoid_out: tensor of shape (batch_size, num_classes) with values in (0, 1).
      lstm_hidden: the (h_n, c_n) tuple returned by nn.LSTM for the padded batch.
    """
    ## features shape : (batch_size, max_len)
    emb_out = self.embedding(features)
    ## emb_out shape : (batch_size, max_len, emb_dim)
    lstm_out, lstm_hidden = self.lstm(emb_out)
    ## lstm_out shape : (batch_size, max_len, hidden_dim)

    ## Pick the output at the last *real* token of every sequence. Taking
    ## lstm_out[:, -1] would read the state after the LSTM has consumed the
    ## padding of all sequences shorter than the longest one in the batch.
    ## Assumes padding only appears at the end of a sequence (pad_sequence layout).
    if self.pad_idx is None:
      last_out = lstm_out[:, -1]
    else:
      lengths = (features != self.pad_idx).sum(dim=1).clamp(min=1)
      batch_index = torch.arange(features.size(0), device=features.device)
      last_out = lstm_out[batch_index, lengths - 1]
    ## last_out shape : (batch_size, hidden_dim)
    dropout_out = self.dropout(last_out)
    ## dropout_out shape : (batch_size, hidden_dim)
    linear_out = self.linear(dropout_out)
    ## linear_out shape : (batch_size, num_classes)
    out = linear_out
    sigmoid_out = self.sigmoid(out)
    ## sigmoid_out shape : (batch_size, num_classes)

    if verbose:
      print(f"Input shape : {features.shape}")
      print(f"EMB shape : {emb_out.shape}")
      print(f"LSTM shape : {lstm_out.shape}")
      print(f"dropout_out shape : {dropout_out.shape}")
      print(f"linear_out shape : {linear_out.shape}")
      print(f"out shape : {out.shape}")
      print(f"sigmoid_out shape : {sigmoid_out.shape}")

    return sigmoid_out, lstm_hidden


  def _shared_step(self, batch):
    ## Forward pass + BCE loss shared by the training and validation steps.
    features, labels = batch[0], batch[1]
    probs, _ = self(features)
    probs = probs.squeeze(dim=1)
    ## BCELoss requires float targets.
    labels = labels.float()
    loss = self.loss_fn(probs, labels)
    return loss, probs, labels

  def training_step(self, batch, batch_idx):
    """Compute and log the training loss/accuracy; returns the loss to optimise."""
    loss, probs, labels = self._shared_step(batch)
    self.train_accuracy(probs, labels)
    self.log_dict({"train_loss": loss, "train_acc": self.train_accuracy}, on_step = False, on_epoch = True, prog_bar = True)
    ## The loss must be returned: when training_step returns None Lightning
    ## skips the backward pass / optimizer step and the model never learns.
    return loss

  def validation_step(self,batch, batch_idx):
    """Compute and log the validation loss/accuracy for one batch."""
    loss, probs, labels = self._shared_step(batch)
    self.val_accuracy(probs, labels)
    self.log_dict({"val_loss": loss, "val_acc":  self.val_accuracy}, on_step = False, on_epoch = True, prog_bar = True)

  def on_validation_epoch_end(self):
    ## No manual reset(): the metric object is logged through log_dict, so
    ## Lightning computes and resets it itself at the end of the epoch.
    ## Resetting here would wipe the state before the logged value is computed.
    print(f"Epoch : {self.current_epoch} val accuracy : {self.val_accuracy.compute()}")

  def configure_optimizers(self):
    """Adam over all model parameters with the configured learning rate."""
    optimizer = optim.Adam(self.parameters(), lr = self.learning_rate)
    return optimizer

In [None]:
# ## Test the model
# model = RNNModel(len(token2idx), 100, 64, 1, batch_size, 1, learning_rate =0.001)


# logits = model(features, verbose = True)
# logits = logits.squeeze(dim=1)
# loss = model.loss_fn(logits, labels)
# print(f"\nLoss: {loss}")

In [None]:
## Build Trainer
## Seed python / numpy / torch so weight initialisation (and any shuffling)
## is repeatable across Restart & Run All.
pl.seed_everything(42, workers=True)

model = RNNModel(len(token2idx), 100, 256, 2, batch_size, 1, learning_rate=0.00001)

## logger
## Taken from the same `pl` (pytorch_lightning) namespace as the Trainer below.
logger = pl.loggers.CSVLogger("logs", name="sentiment_analysis")

## checkpoints
## The placeholders in `filename` must match the keys logged by the model
## ("val_loss", "val_acc"); an unknown key is never filled with a real value.
checkpoint_callback  = pl.callbacks.ModelCheckpoint(
                                                filename='{epoch}-{val_loss:.2f}-{val_acc:.2f}',
                                                every_n_epochs = 2,
                                                save_top_k = -1,
                                                monitor='val_loss',
                                                )


## Validation runs every 2nd epoch, matching the checkpoint cadence above so
## that the monitored val_loss is fresh whenever a checkpoint is written.
trainer = pl.Trainer(accelerator="cpu",
                     max_epochs = 10,
                     check_val_every_n_epoch=2,
                     callbacks=[checkpoint_callback],
                     logger=logger

                    )

## Train the Model
trainer.fit(model, train_dl, val_dl)

INFO: GPU available: False, used: False
INFO:lightning.pytorch.utilities.rank_zero:GPU available: False, used: False
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: IPU available: False, using: 0 IPUs
INFO:lightning.pytorch.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.callbacks.model_summary:
  | Name           | Type           | Params
--------------------------------------------------
0 | embedding      | Embedding      | 1.1 M 
1 | lstm           | LSTM           | 892 K 
2 | dropout        | Dropout        | 0     
3 | relu           | ReLU           | 0     
4 | linear         | Linear         | 257   
5 | sigmoid        | Sigmoid        | 0     
6 | loss_fn        | BCELoss        | 0     
7 | train_accuracy | BinaryAccuracy | 0     
8 | va

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Epoch : 0 val accuracy : 0.0


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Epoch : 1 val accuracy : 0.5


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch : 3 val accuracy : 0.5


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch : 5 val accuracy : 0.5


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch : 7 val accuracy : 0.5
