<a href="https://colab.research.google.com/github/Zinni98/Sentiment-analysis-project/blob/main/project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Polarity Classification

### Get the data

In [1]:
import nltk
import torch
# Fetch the NLTK resources: the "punkt" tokenizer models plus the
# "movie_reviews" and "subjectivity" corpora.
nltk.download("punkt")
nltk.download("movie_reviews")
nltk.download("subjectivity")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package movie_reviews to /root/nltk_data...
[nltk_data]   Unzipping corpora/movie_reviews.zip.
[nltk_data] Downloading package subjectivity to /root/nltk_data...
[nltk_data]   Unzipping corpora/subjectivity.zip.


True

## TODO: Use spaCy for negation marking

In [2]:
from nltk.corpus import movie_reviews
from nltk.sentiment.util import mark_negation
import numpy as np


class MovieReviewsCorpus():
  """
  Wrapper around the NLTK ``movie_reviews`` corpus for polarity classification.

  Every sequence exposed by this class is ordered positive documents first,
  negative documents second, and labels are 0 for positive and 1 for negative.

  Attributes set by ``__init__``:
    mr                  the NLTK corpus reader
    corpus              list of documents, each a flat list of tokens
    labels              list with one 0/1 label per document in ``corpus``
    unprocessed_corpus  the documents as returned by ``paras`` (document -> sentence -> token)
    corpus_words        every token of ``corpus`` in order
    vocab               dict mapping each token to its number of occurrences
  """
  def __init__(self):
    self.mr = movie_reviews
    # list of documents, each document is a list containing words of that document
    self.corpus, self.labels = self._flatten()
    self.unprocessed_corpus = self._get_corpus()
    self.corpus_words = self.get_corpus_words()
    self.vocab = self._create_vocab()


  def _list_to_str(self, doc) -> str:
    """
    Put all elements of the list into a single string, separating each element with a space.

    ``doc`` is a list of sentences, each sentence being a list of tokens.
    """
    return " ".join([w for sent in doc for w in sent])


  def _flatten(self):
    """
    Build the document-level dataset.

    Returns
    -------
    tuple(list[list[str]], list[int])
      The documents (each one a flat list of tokens, positive documents first)
      and the matching labels (0 = positive, 1 = negative).
    """

    # 3 nested lists: each list contains a document, each inner list contains a phrase (until full stop), each phrase contains words.
    pos = self.mr.paras(categories = "pos")
    neg = self.mr.paras(categories = "neg")

    corpus = [self._list_to_str(d).split(" ") for d in pos] + [self._list_to_str(d).split(" ") for d in neg]
    labels = [0] * len(pos) + [1] * len(neg)
    return corpus, labels

  def _get_corpus(self):
    """
    Return the documents without flattening (document -> sentence -> token).

    Positive documents come first so that index ``i`` refers to the same
    document as ``self.corpus[i]`` and ``self.labels[i]``.
    """
    pos = self.mr.paras(categories = "pos")
    neg = self.mr.paras(categories = "neg")

    return pos + neg

  def movie_reviews_dataset_raw(self):
    """
    Returns the dataset containing:

    - A list of all the documents
    - The corresponding label for each document

    Returns
    -------
    tuple(list, list)
      The dataset: first element is the list of the documents, the second element of the tuple is the associated label (0 = positive, 1 = negative) for each document
    """

    return self.corpus, self.labels

  def get_sentence_ds(self):
    """
    Build the sentence-level dataset.

    Returns
    -------
    tuple(list[list[str]], numpy.ndarray)
      All sentences of the corpus (positive ones first) and one label per
      sentence (0 = positive, 1 = negative), inherited from its document.
    """
    pos = self.mr.paras(categories = "pos")
    neg = self.mr.paras(categories = "neg")

    pos = [phrase for doc in pos for phrase in doc]
    neg = [phrase for doc in neg for phrase in doc]

    # The sentence order must follow the label order (positive block, then negative block).
    labels = np.array([0] * len(pos) + [1] * len(neg))
    corpus = pos + neg
    return corpus, labels


  def get_corpus_words(self) -> list:
    """Return every token of ``self.corpus`` as one flat list."""
    return [w for doc in self.corpus for w in doc]

  def _create_vocab(self):
    """Count how many times each token of ``self.corpus_words`` occurs."""
    vocab = dict()
    for word in self.corpus_words:
      vocab[word] = vocab.get(word, 0) + 1
    return vocab

  def __len__(self):
    """Number of documents in the corpus."""
    return len(self.corpus)


# NOTE: disabled experiment. The class below is wrapped in a string literal, so it is
# never defined; evaluating this cell only echoes the string.
"""class MovieReviewsCorpusNegMarked(MovieReviewsCorpus):
  def __init__(self):
    super().__init__()
    self._neg_marking()
    self.corpus_words = self.get_corpus_words()
    self.vocab = self._create_vocab()
  
  def _neg_marking(self):
    negated_corpus = [self._mark(doc) for doc in self.corpus]
    self.corpus = negated_corpus
  
  def _mark(self, doc):
    # negates the whole document
    negated_doc = mark_negation(doc, double_neg_flip=True)
    return negated_doc"""

'class MovieReviewsCorpusNegMarked(MovieReviewsCorpus):\n  def __init__(self):\n    super().__init__()\n    self._neg_marking()\n    self.corpus_words = self.get_corpus_words()\n    self.vocab = self._create_vocab()\n  \n  def _neg_marking(self):\n    negated_corpus = [self._mark(doc) for doc in self.corpus]\n    self.corpus = negated_corpus\n  \n  def _mark(self, doc):\n    # negates the whole document\n    negated_doc = mark_negation(doc, double_neg_flip=True)\n    return negated_doc'

In [None]:
from torch.utils.data import Dataset
from torchtext.vocab import GloVe

class MovieReviewsDataset(Dataset):
  """
  Torch dataset that stores every document as a tensor of word embeddings.

  Parameters
  ----------
  raw_dataset : tuple(list[list[str]], list)
    The documents (lists of tokens) and one label per document.
  vectors : optional
    Object exposing ``get_vecs_by_tokens(list_of_tokens)`` (for instance an
    already loaded torchtext ``GloVe``). When omitted, GloVe 840B/300d is
    loaded, as before.

  Attributes
  ----------
  corpus : numpy.ndarray (dtype=object)
    One ``(n_tokens, embedding_dim)`` tensor per document.
  labels : numpy.ndarray (dtype=object)
    One label per document.
  max_element : int
    Length, in tokens, of the longest document (0 for an empty dataset).
  """
  def __init__(self, raw_dataset, vectors = None):
    documents, labels = raw_dataset[0], raw_dataset[1]

    # Fill the object array slot by slot: np.array(documents, dtype=object) builds
    # a 2-D array of strings whenever all documents happen to share the same
    # length, and a tensor can then no longer be stored per document.
    self.corpus = np.empty(len(documents), dtype = object)
    for idx, doc in enumerate(documents):
      self.corpus[idx] = doc

    self.labels = np.array(labels, dtype = object)
    self.max_element = max((len(doc) for doc in documents), default = 0)
    self.vectors = vectors
    self.elements_to_tensor()

  def __len__(self):
    return len(self.corpus)

  def elements_to_tensor(self):
    """Replace each token list in ``self.corpus`` by its embedding tensor."""
    global_vectors = getattr(self, "vectors", None)
    if global_vectors is None:
      global_vectors = GloVe(name='840B', dim=300)
    embedding_dim = getattr(global_vectors, "dim", 300)

    for idx, item in enumerate(self.corpus):
      tokens = list(item)
      if tokens:
        # One lookup per document instead of one per token.
        self.corpus[idx] = global_vectors.get_vecs_by_tokens(tokens)
      else:
        # Stacking zero vectors is not possible, so build the empty tensor directly.
        self.corpus[idx] = torch.empty(0, embedding_dim)

  def __getitem__(self, index):
    """Return the ``(embedded document, label)`` pair at ``index``."""
    item = self.corpus[index]
    label = self.labels[index]
    return (item, label)

### Create the model class
Let's first try with a simple BiLSTM

In [None]:
from unicodedata import bidirectional
import torch.nn as nn
from torch.autograd import Variable
from torch.nn.utils.rnn import pad_packed_sequence
import torch.nn.functional as F

class BiLSTM(nn.Module):
  """
  Two-layer bidirectional LSTM followed by a small classification head.

  Parameters
  ----------
  device : str
    Kept for backward compatibility and stored in ``self.device``. The initial
    states are created on the device of the LSTM weights.
  input_size : int
    Size of each input embedding.
  hidden_size : int
    Hidden size of each LSTM direction.
  output_size : int
    Number of classes.
  """
  def __init__(self, device = "cuda", input_size = 300, hidden_size = 128, output_size = 2):
    super(BiLSTM, self).__init__()
    self.hidden_size = hidden_size
    self.device = device
    self.num_layers = 2
    self.lstm = nn.LSTM(input_size, hidden_size, batch_first = True, bidirectional=True, num_layers = self.num_layers)
    self.fc = nn.Sequential(nn.ReLU(),
                            nn.BatchNorm1d(hidden_size*2, eps = 1e-08),
                            nn.Dropout(0.3),
                            nn.Linear(hidden_size*2, output_size)
                            )

  def init_hidden(self, batch_size):
    """
    Return zero-initialised ``(h_0, c_0)`` states, each of shape
    ``(num_layers * 2, batch_size, hidden_size)``.
    """
    # Follow the LSTM weights so the states stay valid after ``.to(...)`` / ``.half()``.
    reference = next(self.lstm.parameters())
    shape = (self.num_layers * 2, batch_size, self.hidden_size)
    return (torch.zeros(shape, device = reference.device, dtype = reference.dtype),
            torch.zeros(shape, device = reference.device, dtype = reference.dtype))

  def forward(self, x):
    """
    Classify a batch of documents.

    Parameters
    ----------
    x : torch.nn.utils.rnn.PackedSequence
      Packed batch of embedded documents.

    Returns
    -------
    torch.Tensor
      Class scores of shape ``(batch_size, output_size)``.
    """
    batch_size = x.batch_sizes[0].item()
    hidden = self.init_hidden(batch_size)

    # h_n: (num_layers * 2, batch_size, hidden_size), ordered layer by layer with
    # the forward direction before the backward one.
    _, (h_n, _) = self.lstm(x, hidden)

    # Summarise each document with the final state of both directions of the last
    # layer. The output at the last time step is not a good summary: there the
    # backward direction has only read a single token.
    out = torch.cat((h_n[-2], h_n[-1]), dim = 1)
    out = self.fc(out)

    return out
    

# Inspired by https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html#the-decoder
class AttnDecoderRNN(nn.Module):
    """
    Single-step attention decoder built around an LSTM cell.

    Parameters
    ----------
    hidden_size : int
        Size of the decoder input, of the LSTM state and of each encoder output.
    output_size : int
        Number of output classes.
    max_length : int
        Number of encoder positions the attention distributes its weights over.
    dropout_p : float
        Dropout probability applied to the decoder input.
    device : str
        Device on which ``initHidden`` creates the initial state.
    """
    def __init__(self, hidden_size, output_size, max_length, dropout_p=0.1, device = "cuda"):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length
        # Stored so that initHidden no longer depends on a global ``device`` name.
        self.device = device

        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.lstm = nn.LSTM(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        """
        Run one decoding step.

        Parameters
        ----------
        input : torch.Tensor
            Decoder input of shape ``(1, 1, hidden_size)``.
        hidden : tuple(torch.Tensor, torch.Tensor) or torch.Tensor
            LSTM state ``(h, c)``, each of shape ``(1, 1, hidden_size)``. A bare
            tensor is accepted as ``h`` and paired with a zero cell state.
        encoder_outputs : torch.Tensor
            Encoder outputs of shape ``(max_length, hidden_size)``.

        Returns
        -------
        tuple
            ``(log-probabilities (1, output_size), new (h, c) state,
            attention weights (1, max_length))``.
        """
        # nn.LSTM only accepts an (h, c) pair as state.
        if not isinstance(hidden, (tuple, list)):
            hidden = (hidden, torch.zeros_like(hidden))
        h_state = hidden[0]

        embedded = self.dropout(input)

        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], h_state[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        output = F.relu(output)
        output, hidden = self.lstm(output, (hidden[0], hidden[1]))

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        """Return a zero ``(h_0, c_0)`` LSTM state, each of shape ``(1, 1, hidden_size)``."""
        return (torch.zeros(1, 1, self.hidden_size, device=self.device),
                torch.zeros(1, 1, self.hidden_size, device=self.device))




In [None]:
def training_step(net, data_loader, optimizer, cost_function, device = 'cuda'):
  """
  Train ``net`` for one epoch.

  Parameters
  ----------
  net : torch.nn.Module
    Model to train (switched to training mode).
  data_loader : iterable
    Yields ``(inputs, targets)`` batches; both must support ``.to(device)``.
  optimizer
    Object exposing ``zero_grad()`` and ``step()``.
  cost_function : callable
    Loss taking ``(outputs, targets)``. Its ``reduction`` attribute, when
    present, tells whether it already sums over the batch; otherwise the loss
    is treated as a batch mean.
  device : str
    Device the batches are moved to.

  Returns
  -------
  tuple(float, float)
    Average loss per sample and accuracy in percent.

  Raises
  ------
  ZeroDivisionError
    If ``data_loader`` yields no sample.
  """
  cumulative_loss = 0.0
  cumulative_accuracy = 0
  samples = 0
  loss_is_batch_sum = getattr(cost_function, "reduction", "mean") == "sum"

  net.train()

  for batch_idx, (inputs, targets) in enumerate(data_loader):

    inputs = inputs.to(device)
    targets = targets.to(device)
    in_size = targets.size(dim=0)

    # Clear the gradients before the backward pass so that nothing left over
    # from earlier code leaks into the first update.
    optimizer.zero_grad()

    outputs = net(inputs)

    loss = cost_function(outputs, targets)

    loss.backward()

    optimizer.step()

    samples += in_size
    # A batch-mean loss has to be weighted by the batch size, otherwise dividing
    # by the number of samples below does not give a per-sample average.
    batch_loss = loss.item()
    cumulative_loss += batch_loss if loss_is_batch_sum else batch_loss * in_size
    _, predicted = outputs.max(dim=1)

    cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_loss/samples, (cumulative_accuracy/samples)*100

In [None]:
def test_step(net, data_loader, cost_function, device = 'cuda'):
  cumulative_loss = 0
  cumulative_accuracy = 0
  samples = 0

  net.eval()

  with torch.no_grad():

    for batch_idx, (inputs, targets) in enumerate(data_loader):
      inputs = inputs.to(device)
      targets = targets.to(device)
      in_size = targets.size(dim=0)

      outputs = net(inputs)

      loss = cost_function(outputs, targets)

      samples += in_size
      cumulative_loss += loss.item()
      _, predicted = outputs.max(dim=1)

      cumulative_accuracy += predicted.eq(targets).sum().item()

    return cumulative_loss/samples, (cumulative_accuracy/samples)*100


In [None]:
from abc import ABC, abstractmethod
import torch.optim as optim

class AnnealingOptimizer(torch.optim.Optimizer, ABC):
  """
  Base class for optimizers whose learning rate follows an annealing schedule.

  The class is a thin wrapper: subclasses must create the actual torch
  optimizer and store it in ``self.optimizer``. ``torch.optim.Optimizer.__init__``
  is not called here, so parameter groups and optimizer state live in the
  wrapped optimizer only.

  Parameters
  ----------
  model
    Model to optimise. Unused by the base class; subclasses read its parameters.
  nr_epochs : int
    Total number of training epochs; must be positive.
  lr : float
    Base learning rate; must be non-negative.
  epoch : int
    Epoch to start (or resume) from; must be non-negative.

  Raises
  ------
  ValueError
    If one of the values above is out of range.
  """
  def __init__(self, model, nr_epochs, lr: float = 0.001, epoch: int = 0) -> None:
    if not 0.0 <= lr:
      raise ValueError(f"Invalid learning rate: {lr}")
    if not 0 <= epoch:
      raise ValueError(f"Invalid epoch value: {epoch}")
    # nr_epochs is a divisor in _compute_lr
    if not 0 < nr_epochs:
      raise ValueError(f"Invalid number of epochs: {nr_epochs}")

    self.nr_epochs = nr_epochs
    self.epoch = epoch
    self._alpha = 10
    self._beta = 0.75
    self._base_lr = lr
    # Ratio between the learning rate of the classifier and of the feature extractor
    self._classifier_lr_factor = 10

  def update_lr(self):
    """
    Updates the learning rate using the annealing strategy.
    For the annealing strategy to work correctly, this method should be called at every epoch during the network training.

    Parameter groups named "fe" get the annealed learning rate; every other group gets a learning rate
    10 times bigger, as proposed in the [Symnet paper](https://arxiv.org/pdf/1904.04663.pdf)
    """
    self.epoch += 1
    new_lr = self._compute_lr()
    for g in self.optimizer.param_groups:
      # .get: a group without a "name" entry is treated like a classifier group
      if g.get("name") == "fe":
        g["lr"] = new_lr
      else:
        g["lr"] = new_lr * self._classifier_lr_factor


  def _compute_lr(self):
    """
    Computes the learning rate for the current epoch:
    base_lr / (1 + alpha * epoch / nr_epochs) ** beta

    Returns
    -------
    float
      updated learning rate
    """
    etap = 1 / ((1 + self._alpha * self.epoch / self.nr_epochs ) ** self._beta)
    return self._base_lr * etap

  def step(self, closure=None):
    """Forward to the wrapped optimizer and return whatever its ``step`` returns."""
    return self.optimizer.step(closure)

  def zero_grad(self, *args, **kwargs):
    """Forward to the wrapped optimizer's ``zero_grad``."""
    self.optimizer.zero_grad(*args, **kwargs)


class BiLSTMOptimizer(AnnealingOptimizer):
  """
  Annealing SGD optimizer for the BiLSTM model.

  The LSTM parameters form the "fe" (feature extractor) group and the
  parameters of the ``fc`` head form the "classifier" group, which is trained
  with a learning rate 10 times bigger.

  Parameters
  ----------
  model
    Model exposing ``lstm`` and ``fc`` sub-modules.
  nr_epochs : int
    Total number of training epochs.
  lr : float
    Base learning rate.
  epoch : int
    Epoch to start (or resume) from.
  momentum : float
    SGD momentum.
  """
  def __init__(self, model, nr_epochs, lr: float = 0.001, epoch: int = 0, momentum: float = 0.9) -> None:
    super(BiLSTMOptimizer ,self).__init__(model, nr_epochs, lr, epoch)

    # Both groups start from the annealed value for ``epoch`` so that resuming at
    # epoch > 0 does not restart the feature extractor from the base rate.
    # For epoch == 0 this is exactly ``lr``.
    initial_lr = self._compute_lr()

    # Note that names for parameters group are important in order to update each group differently
    self.optimizer = optim.SGD([
                {'params': model.lstm.parameters(), "lr": initial_lr, "name": "fe"},
                {'params': model.fc.parameters(), "lr": initial_lr*10, "name": "classifier"}
            ], lr=lr, momentum=momentum)
    

In [None]:
from torch.utils.data import DataLoader
from torch.optim import Adam
import torch.nn as nn

def main(train_loader, test_loader, max_element, device = "cuda", epochs = 10):
  """
  Train a BiLSTM classifier and report its accuracy on the test set.

  Parameters
  ----------
  train_loader, test_loader
    Loaders yielding ``(packed inputs, targets)`` batches.
  max_element : int
    Length of the longest document. Currently unused; kept for the callers.
  device : str
    Device on which the model is trained.
  epochs : int
    Number of training epochs.

  Returns
  -------
  float
    Test accuracy (in percent) after the last epoch.
  """

  net = BiLSTM(device = device).to(device)

  optimizer = Adam(net.parameters(), 0.001, betas = (0.9, 0.999), amsgrad=True)

  cost_function = nn.CrossEntropyLoss()

  test_accuracy = None
  for e in range(epochs):
    print(f"epoch {e}:")
    train_loss, train_accuracy = training_step(net, train_loader, optimizer, cost_function, device)
    print(f"Training loss: {train_loss} \n Training accuracy: {train_accuracy}")
    test_loss, test_accuracy = test_step(net, test_loader, cost_function, device)
    print(f"Test loss: {test_loss} \n Test accuracy: {test_accuracy}")
    print("------------------------------------------------------------------")

  # The accuracy measured after the last epoch is already the final one; a
  # separate evaluation is only needed when no epoch was run.
  if test_accuracy is None:
    _, test_accuracy = test_step(net, test_loader, cost_function, device)


  return test_accuracy


In [None]:
from typing import List
from torch.nn.utils.rnn import pack_padded_sequence
from torch.utils.data import Subset
from sklearn.model_selection import train_test_split

def pad(batch, max_size):
  """
  Zero-pad, in place, every 2-D tensor of ``batch`` along dim 0 up to ``max_size`` rows.

  Parameters
  ----------
  batch : mutable sequence of torch.Tensor
    Tensors sharing the same size along dim 1.
  max_size : int
    Number of rows every tensor must have after padding.

  Returns
  -------
  The same ``batch`` object, with each element replaced by its padded version.
  """
  zero_row = torch.zeros(batch[0].size(dim=1))
  for position, sample in enumerate(batch):
    missing_rows = max_size - sample.size(dim = 0)
    filler = zero_row.repeat((missing_rows, 1))
    batch[position] = torch.cat((sample, filler), dim = 0)
  return batch

def batch_to_tensor(X: List[torch.tensor], max_size):
  """
  Copy a list of padded documents into one ``(len(X), max_size, embedding_dim)`` tensor.

  ``embedding_dim`` is read from the first element; every element is written
  into its own slice of a zero-initialised tensor.
  """
  n_docs = len(X)
  embedding_dim = X[0].size(dim = 1)
  stacked = torch.zeros(n_docs, max_size, embedding_dim)
  for row, document in zip(range(n_docs), X):
    stacked[row] = document
  return stacked

def sort_ds(X, Y):
  """
  Reorder a batch so that the longest document comes first.

  Parameters
  ----------
  X : numpy.ndarray
    Array of tensors; the length of a document is its size along dim 0.
  Y : numpy.ndarray
    Labels, aligned with ``X``.

  Returns
  -------
  tuple
    ``X`` and ``Y`` in decreasing order of document length, plus a tensor
    holding the lengths in that same order.
  """
  lengths = np.array([doc.size(dim = 0) for doc in X])
  longest_first = np.argsort(lengths)[::-1]

  sorted_lengths = torch.from_numpy(lengths[longest_first].copy())

  return X[longest_first], Y[longest_first], sorted_lengths



def collate(batch):
  """
  Turn a list of ``(embedded document, label)`` pairs into a packed batch.

  Parameters
  ----------
  batch : list of tuple(torch.Tensor, label)
    Documents as ``(n_tokens, embedding_dim)`` tensors with their labels.

  Returns
  -------
  tuple(torch.nn.utils.rnn.PackedSequence, torch.Tensor)
    The documents, sorted by decreasing length, zero-padded and packed, and
    the labels in that same order.
  """
  # Imported here so this function does not depend on the imports of another cell.
  from torch.nn.utils.rnn import pad_sequence

  X, Y = zip(*batch)

  # Sort by decreasing length, as required by pack_padded_sequence. The tensors
  # stay in a plain Python list: converting variable-length tensors with
  # np.array(...) is unreliable (ragged input).
  lengths = np.array([doc.size(dim = 0) for doc in X])
  order = np.argsort(lengths)[::-1]

  X_sorted = [X[i] for i in order]
  Y_sorted = np.array(Y)[order]
  document_lengths = torch.from_numpy(lengths[order].copy())

  # Zero-pad every document to the longest one: (batch, max_length, embedding_dim)
  X_tensor = pad_sequence(X_sorted, batch_first = True)

  # Return the padded sequence object
  X_final = pack_padded_sequence(X_tensor, document_lengths, batch_first=True)
  return X_final, torch.from_numpy(np.ascontiguousarray(Y_sorted))

def get_data(batch_size: int, collate_fn, test_size: float = 0.2, random_state: int = 42):
  """
  Build the train and test loaders for the movie reviews dataset.

  Parameters
  ----------
  batch_size : int
    Number of documents per batch.
  collate_fn : callable
    Function turning a list of ``(document, label)`` pairs into a batch.
  test_size : float
    Fraction of the documents held out for testing.
  random_state : int
    Seed of the stratified train/test split.

  Returns
  -------
  tuple
    ``(train_loader, test_loader, max_element)`` where ``max_element`` is the
    length of the longest document of the dataset.
  """
  corpus = MovieReviewsCorpus()

  dataset = MovieReviewsDataset(corpus.movie_reviews_dataset_raw())

  max_element = dataset.max_element

  # Stratified random split

  train_indexes, test_indexes = train_test_split(list(range(len(dataset.labels))), test_size = test_size,
                                                 stratify = dataset.labels, random_state = random_state)

  train_ds = Subset(dataset, train_indexes)
  test_ds = Subset(dataset, test_indexes)

  # Pinned memory only helps (and is only available) when a GPU is present.
  pin_memory = torch.cuda.is_available()

  # Reshuffle the training documents at every epoch so the batches differ between epochs.
  train_loader = DataLoader(train_ds, batch_size = batch_size, collate_fn = collate_fn,
                            shuffle = True, pin_memory = pin_memory)
  test_loader = DataLoader(test_ds, batch_size = batch_size, collate_fn = collate_fn, pin_memory = pin_memory)

  return train_loader, test_loader, max_element

In [None]:
# Build the loaders once, with batches of 128 documents packed by `collate`.
train_loader, test_loader, max_element = get_data(128, collate)

  


In [None]:
# Train on the GPU when one is available and fall back to the CPU otherwise.
accuracy = main(train_loader, test_loader, max_element,
                device = "cuda" if torch.cuda.is_available() else "cpu", epochs = 50)

print(f"Overall accuracy: {accuracy}")

epoch 0:




tensor([0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0,
        0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0,
        1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1,
        0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1,
        0, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 1,
        1, 0, 1, 0, 0, 0, 1, 1], device='cuda:0')
tensor([[-3.6912e-03,  7.0637e-01],
        [-4.3693e-01, -3.9087e-01],
        [ 1.0633e+00,  7.6288e-02],
        [ 5.4099e-01,  5.5560e-02],
        [-3.3789e-01,  3.3105e-01],
        [-4.6595e-01, -4.0238e-01],
        [ 1.8964e-02,  4.3120e-01],
        [-5.6574e-01,  5.9941e-01],
        [-1.7378e-01,  1.3300e-01],
        [-3.3249e-01,  6.6812e-02],
        [-4.1974e-01, -6.7901e-01],
        [-4.2615e-01, -9.3330e-02],
        [ 6.2844e-01,  3.4237e-01],
        [-5.5788e-01,  7.8848e-01],
        [ 8.5738e-01, -7.6176e-02],
        [ 

KeyboardInterrupt: ignored

In [None]:
"""
tensor([1315, 1222, 1011, 1010,  936,  862,  814,  807,  807,  764,  718,  515,
         495,  388,  344,  323])
tensor([1617, 1361, 1311, 1178, 1081, 1068,  958,  941,  925,  768,  688,  619,
         604,  573,  484,  405])
"""

# First try to classify each document as a whole; then try to classify each phrase of a document separately and aggregate the results (if a document has more positive phrases than negative ones it is positive, otherwise negative). (Also try weighting each phrase by the number of sentiment lexemes it contains.)

### Training procedure

### Main function containing also cross validation

### (Possible improvement, apply UDA to GLOVE)

In [None]:
from nltk.tokenize.stanford import StanfordTokenizer
from torchtext.vocab import GloVe

class VectorizerPipeline():
  """
  Configurable text vectorization pipeline (work in progress).

  Parameters
  ----------
  corpus
    Corpus the pipeline works on; stored as is.
  pipe : dict
    Maps a step name ("tokenizer", "embedding", "lemmatizer",
    "stop-word-removal") to the implementation to use for it. The dict is only
    read, never modified. A falsy value leaves every step disabled.
  embedding_size : int
    Size of the embeddings; a falsy value falls back to 300.

  Raises
  ------
  KeyError
    If ``pipe`` contains an unknown step.
  ValueError
    If ``pipe`` selects an implementation that is not allowed for a step.
  """

  def __init__(self, corpus, pipe= {"tokenizer": "stanford", "embedding": "glove"}, embedding_size: int = 300):
    self.corpus = corpus
    self.embedding_size = embedding_size or 300

    self._allowed = {
        "tokenizer": ["stanford"],
        "embedding": ["glove"],
        "lemmatizer": [],
        "stop-word-removal": [],
    }
    # Every known step starts disabled.
    self.pipe = dict.fromkeys(self._allowed)

    if pipe:
      for key, value in pipe.items():
        if key not in self._allowed:
          raise KeyError(f"Invalid step in the pipeline: {key}. \n valid steps are {list(self._allowed.keys())}")
        if value not in self._allowed[key]:
          raise ValueError(f"Invalid type of {key}. \n Valid {key}s are {self._allowed[key]}")
        self.pipe[key] = value


  def tokenization(self, batch):
    """Tokenize every string of ``batch`` and return the list of token lists."""
    tok = StanfordTokenizer()
    # The tokenizer object itself is not callable; tokenizing goes through .tokenize()
    X = [tok.tokenize(x) for x in batch]
    return X

  def embedding(self, batch):
    """
    Embed a batch of tokenized documents.

    TODO: not implemented yet, currently returns None.
    """
    return None

  def vectorize(self, batch):
    """
    Collate function: turn a list of ``(document, label)`` pairs into a batch.

    TODO: not implemented yet, currently returns None.
    """
    # Documents come first, labels second.
    X, Y = list(zip(*batch))
    return None








ModuleNotFoundError: ignored

In [None]:
from torch.utils.data import DataLoader

corpus = MovieReviewsCorpus()

dataset = MovieReviewsDataset(corpus.movie_reviews_dataset_raw())

pipeline = VectorizerPipeline(corpus)

# Hand over the bound method itself: DataLoader calls collate_fn once per batch,
# whereas calling vectorize() here without a batch raises a TypeError.
dataloader = DataLoader(dataset, batch_size = 64, collate_fn = pipeline.vectorize)

KeyboardInterrupt: ignored

In [None]:
import operator
from tqdm import tqdm
from torchtext.vocab import GloVe
import torch

# Rebuild the corpus; its `vocab` attribute holds the frequency of every token.
corpus = MovieReviewsCorpus()

# GloVe "840B" vectors with 300 dimensions, loaded through torchtext.
global_vectors = GloVe(name='840B', dim=300)

def check_coverage(vocab,embeddings_index):
    """
    Report how much of a vocabulary is covered by a set of embeddings.

    A word counts as covered when its embedding has at least one non-zero
    component; an all-zero vector is treated as "no embedding available".

    Parameters
    ----------
    vocab : dict
        Maps each word to its number of occurrences in the text.
    embeddings_index
        Object exposing ``get_vecs_by_tokens(word)`` returning a tensor.

    Returns
    -------
    list[tuple[str, int]]
        The uncovered words with their frequencies, most frequent first.
    """
    covered_words = 0
    oov = {}
    k = 0  # occurrences of covered words
    i = 0  # occurrences of uncovered words
    for word in tqdm(vocab):
        try:
            vector = embeddings_index.get_vecs_by_tokens(word)
        except KeyError:
            # Embedding sources that raise for unknown words are handled like
            # those returning a null vector.
            vector = None

        if vector is None or torch.count_nonzero(vector).item() == 0:
            oov[word] = vocab[word]
            i += vocab[word]
        else:
            covered_words += 1
            k += vocab[word]

    # An empty vocabulary is reported as 0% instead of dividing by zero.
    vocab_share = covered_words / len(vocab) if len(vocab) else 0.0
    text_share = k / (k + i) if (k + i) else 0.0

    print()
    print('Found embeddings for {:.2%} of vocab'.format(vocab_share))
    print('Found embeddings for  {:.2%} of all text'.format(text_share))
    sorted_x = sorted(oov.items(), key=operator.itemgetter(1))[::-1]

    return sorted_x


# Words of the corpus without a GloVe embedding, with their frequencies (most frequent first).
oov = check_coverage(corpus.vocab, global_vectors)