<a href="https://colab.research.google.com/github/davidemichelon11/NLU/blob/main/NLU_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [58]:
import nltk
import numpy as np
from nltk.corpus import movie_reviews
from nltk.corpus import subjectivity

# Download the NLTK data packages this notebook relies on (tokenizer models,
# the VADER lexicon and the subjectivity corpus). Packages that are already
# present are reported as up to date.
nltk.download('punkt')
nltk.download('vader_lexicon')
nltk.download('subjectivity')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
[nltk_data] Downloading package subjectivity to /root/nltk_data...
[nltk_data]   Package subjectivity is already up-to-date!


True


**BASELINE SUBJECTIVITY**



In [59]:
from sklearn import svm
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import classification_report
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import cross_validate
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline

In [60]:
def doc2string(doc):
  """Flatten a document (an iterable of token sequences) into one space-separated string."""
  tokens = []
  for sent in doc:
    tokens.extend(sent)
  return " ".join(tokens)

def sent2string(sent):
  """Join the tokens of a single sentence with single spaces."""
  return " ".join(sent)

In [61]:
vectorizer = CountVectorizer()
classifier_NB = MultinomialNB()

# Each entry is (token list, label) for one sentence of the subjectivity corpus.
subj_docs = [(sent, 'subj') for sent in subjectivity.sents(categories='subj')]
obj_docs = [(sent, 'obj') for sent in subjectivity.sents(categories='obj')]

corpus = [sent2string(d[0]).lower() for d in subj_docs] + [sent2string(d[0]).lower() for d in obj_docs]
labels = np.array(['subj'] * len(subj_docs) + ['obj'] * len(obj_docs))

# Cross-validate vectorizer + classifier as a single pipeline: the vocabulary is
# then learned from the training folds only, instead of from the whole corpus
# (which would leak information from the held-out fold into the features).
nb_pipeline = make_pipeline(vectorizer, classifier_NB)
scores = cross_validate(nb_pipeline, corpus, labels, cv=StratifiedKFold(n_splits=10), scoring=['f1_micro'])
average = sum(scores['test_f1_micro'])/len(scores['test_f1_micro'])
print(round(average, 3))

0.921


In [62]:
# NB and SVM for subj
classifier_NB2_subj = MultinomialNB()
classifier_SVM_subj = svm.SVC()

corpus = [sent2string(d[0]).lower() for d in subj_docs] + [sent2string(d[0]).lower() for d in obj_docs]
labels = np.array(['subj'] * len(subj_docs) + ['obj'] * len(obj_docs))
train_samples, test_samples, train_labels, test_labels = train_test_split(corpus, labels, test_size=0.3)

# Learn the vocabulary from the training split only and merely transform the
# test split, so that no information about the test sentences reaches the model.
# `vectorizer` stays fitted and is the one paired with classifier_NB2_subj.
train_vectors = vectorizer.fit_transform(train_samples)
test_vectors = vectorizer.transform(test_samples)

# Naive Bayes
classifier_NB2_subj.fit(train_vectors, train_labels)
labels_pred_NB2 = classifier_NB2_subj.predict(test_vectors)
print(classification_report(test_labels, labels_pred_NB2, digits=3))

# SVM
classifier_SVM_subj.fit(train_vectors, train_labels)
labels_pred_SVM = classifier_SVM_subj.predict(test_vectors)
print(classification_report(test_labels, labels_pred_SVM, digits=3))

              precision    recall  f1-score   support

         obj      0.933     0.905     0.919      1516
        subj      0.906     0.934     0.920      1484

    accuracy                          0.919      3000
   macro avg      0.920     0.919     0.919      3000
weighted avg      0.920     0.919     0.919      3000

              precision    recall  f1-score   support

         obj      0.894     0.872     0.883      1516
        subj      0.873     0.895     0.884      1484

    accuracy                          0.883      3000
   macro avg      0.883     0.883     0.883      3000
weighted avg      0.884     0.883     0.883      3000



**BASELINE via SVM - SA**

In [63]:
# Download the movie review corpus and load it by polarity.
nltk.download('movie_reviews')
mr = movie_reviews  # short alias for the corpus reader
# The rest of the notebook treats each element of neg/pos as one review:
# a list of sentences, each sentence being a list of tokens.
neg = mr.paras(categories='neg')
pos = mr.paras(categories='pos')

[nltk_data] Downloading package movie_reviews to /root/nltk_data...
[nltk_data]   Package movie_reviews is already up-to-date!


In [64]:
vectorizer2 = CountVectorizer()
classifier_sa = svm.SVC()

# One string per review; label 0 = positive review, 1 = negative review.
corpus = [doc2string(p) for p in pos] + [doc2string(n) for n in neg]
labels = np.array([0] * len(pos) + [1] * len(neg))
train_samples, test_samples, train_labels, test_labels = train_test_split(corpus, labels, test_size=0.1)

# Fit the vocabulary on the training reviews only; the test reviews are just
# transformed, so they cannot influence the feature space.
train_vectors = vectorizer2.fit_transform(train_samples)
test_vectors = vectorizer2.transform(test_samples)

classifier_sa.fit(train_vectors, train_labels)
labels_pred = classifier_sa.predict(test_vectors)

print(classification_report(test_labels, labels_pred, digits=3))

              precision    recall  f1-score   support

           0      0.759     0.670     0.712        94
           1      0.735     0.811     0.771       106

    accuracy                          0.745       200
   macro avg      0.747     0.741     0.742       200
weighted avg      0.746     0.745     0.743       200



In [65]:
# For each review, remove obj sentences and compute the SVM
# A separate vectorizer and SVM are used for the filtered reviews so that the
# objects fitted in the previous cells are left untouched.
vectorizer3 = CountVectorizer()
classifier_sa2 = svm.SVC()

def get_new_rev(original):
  """Keep only the sentences of each review that are classified as subjective.

  Args:
    original: iterable of reviews; each review is a sequence of sentences and
      each sentence is a sequence of tokens.

  Returns:
    A list with one entry per input review (same order) holding only the
    sentences for which `classifier_NB2_subj` predicts 'subj'. A review with
    no such sentence yields an empty list.

  Relies on the notebook-level `vectorizer` and `classifier_NB2_subj`, which
  must already be fitted.
  """
  new_list = []
  for rev in original:
    sentences = list(rev)
    if not sentences:
      # Nothing to classify; avoid calling predict() on an empty batch.
      new_list.append([])
      continue
    # Vectorize and classify all sentences of the review in a single call and
    # keep the matrix sparse, instead of one dense transform/predict per sentence.
    sent_vectors = vectorizer.transform([sent2string(s) for s in sentences])
    predicted = classifier_NB2_subj.predict(sent_vectors)
    new_list.append([s for s, label in zip(sentences, predicted) if label == 'subj'])
  return new_list
              
# Reviews reduced to their subjective sentences.
new_pos = get_new_rev(pos)
new_neg = get_new_rev(neg)

# One string per filtered review; label 0 = positive review, 1 = negative review.
corpus_ = [doc2string(p) for p in new_pos] + [doc2string(n) for n in new_neg]
labels_ = np.array([0] * len(new_pos) + [1] * len(new_neg))
train_samples_, test_samples_, train_labels_, test_labels_ = train_test_split(corpus_, labels_, test_size=0.1)

# Fit the vocabulary on the training reviews only; the test reviews are just
# transformed, so they cannot influence the feature space.
train_vectors_ = vectorizer3.fit_transform(train_samples_)
test_vectors_ = vectorizer3.transform(test_samples_)

classifier_sa2.fit(train_vectors_, train_labels_)
labels_pred_ = classifier_sa2.predict(test_vectors_)

print(classification_report(test_labels_, labels_pred_, digits=3))

              precision    recall  f1-score   support

           0      0.825     0.748     0.784       107
           1      0.738     0.817     0.776        93

    accuracy                          0.780       200
   macro avg      0.781     0.782     0.780       200
weighted avg      0.784     0.780     0.780       200

accuracy:  0.78


**VADER Baseline**

In [66]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer, VaderConstants

In [67]:
# Analyse complete review
analyzer = SentimentIntensityAnalyzer()
# Reviews are scored in the order pos + neg and a positive prediction is encoded
# as 0, so the gold labels must be 0 for the `pos` reviews followed by 1 for the
# `neg` reviews. (Building them as neg-then-pos only lines up when both classes
# happen to have the same size.)
labels_vader = np.array([0] * len(pos) + [1] * len(neg))
prediction_val = [analyzer.polarity_scores(doc2string(v)) for v in (pos + neg)]
# 0 = positive when the 'pos' score is strictly larger than the 'neg' score, else 1.
prediction_labels = [0 if p['pos'] > p['neg'] else 1 for p in prediction_val]

print(classification_report(labels_vader, prediction_labels, digits=3))

              precision    recall  f1-score   support

           0      0.583     0.842     0.689      1000
           1      0.715     0.397     0.511      1000

    accuracy                          0.620      2000
   macro avg      0.649     0.619     0.600      2000
weighted avg      0.649     0.620     0.600      2000



In [68]:
# Analyse each sentence of a review and let the sentences vote on its polarity.
def vader_sentence_vote(reviews, weighted=False):
  """Predict a polarity label for each review from per-sentence VADER scores.

  Every sentence votes for the side ('pos' or 'neg') with the larger VADER
  score; a tie counts as a negative vote.

  Args:
    reviews: iterable of reviews, each a sequence of sentences (token sequences).
    weighted: if False every sentence contributes 1 to its side; if True it
      contributes the VADER score of the winning side.

  Returns:
    List with one label per review: 0 when the positive total is strictly
    larger than the negative total, otherwise 1.

  Uses the notebook-level `analyzer`.
  """
  predictions = []
  for rev in reviews:
    pos_total = 0
    neg_total = 0
    for sent in rev:
      p = analyzer.polarity_scores(" ".join(sent))
      if p['pos'] > p['neg']:
        pos_total += p['pos'] if weighted else 1
      else:
        neg_total += p['neg'] if weighted else 1
    predictions.append(0 if pos_total > neg_total else 1)
  return predictions

# Each sentence contributes 1 to its side.
prediction_labels = vader_sentence_vote(pos + neg, weighted=False)
print(classification_report(labels_vader, prediction_labels, digits=3))

# Each sentence contributes its VADER score. Both reports are evaluated against
# labels_vader; the generic `labels` variable is reassigned by other cells and
# is not guaranteed to describe the movie reviews when this cell runs.
prediction_labels = vader_sentence_vote(pos + neg, weighted=True)
print(classification_report(labels_vader, prediction_labels, digits=3))

              precision    recall  f1-score   support

           0      0.698     0.500     0.583      1000
           1      0.611     0.784     0.687      1000

    accuracy                          0.642      2000
   macro avg      0.654     0.642     0.635      2000
weighted avg      0.654     0.642     0.635      2000

              precision    recall  f1-score   support

           0      0.602     0.843     0.702      1000
           1      0.738     0.442     0.553      1000

    accuracy                          0.642      2000
   macro avg      0.670     0.642     0.628      2000
weighted avg      0.670     0.642     0.628      2000



TODO (if time permits): add another baseline that uses VADER's `neu` score.
TODO: also add `_NEG` negation marking to the baselines.

In [193]:
import torch
import torch.nn as nn
import torch.utils.data as data
from torch.utils.data import DataLoader

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Loss used by the training loop below.
criterion = nn.CrossEntropyLoss()

In [194]:
import unicodedata
import re

# Vocabulary built by scanning all reviews; it defines the columns of the
# [n_reviews x n_words] count matrix.
class Rev2Vec:
  """Vocabulary that maps each distinct word to an index and counts its occurrences.

  Attributes are documented next to their initialisation in __init__.
  """

  def __init__(self):
    # word -> integer index, assigned in order of first appearance
    self.word2index = {}
    # word -> number of times the word has been added
    self.word2count = {}
    # number of distinct words seen so far (also the next free index)
    self.n_words = 0

  def addRev(self, rev):
    """Add every whitespace-separated word of the string `rev` to the vocabulary."""
    # split() without an argument discards the empty strings produced by
    # leading, trailing or repeated spaces, so '' never becomes a vocabulary
    # entry and tokenization matches the `.split()` used when encoding reviews.
    for word in rev.split():
      self.addWord(word)

  def addWord(self, word):
    """Register one occurrence of `word`, assigning a new index on first sight."""
    if word not in self.word2index:
      self.word2index[word] = self.n_words
      self.word2count[word] = 1
      self.n_words += 1
    else:
      self.word2count[word] += 1

def unicodeToAscii(s):
  """Remove combining marks: NFD-decompose `s` and drop every character of category 'Mn'."""
  decomposed = unicodedata.normalize('NFD', s)
  kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
  return ''.join(kept)

def normalizeString(s):
  """Lower-case and trim `s`, strip combining marks, and reduce it to letters and . ! ? separated by spaces."""
  cleaned = unicodeToAscii(s.lower().strip())
  # Put a space in front of each . ! ? so the punctuation mark becomes its own token.
  cleaned = re.sub(r"([.!?])", r" \1", cleaned)
  # Replace every run of characters other than letters and . ! ? with one space.
  return re.sub(r"[^a-zA-Z.!?]+", r" ", cleaned)

def prepare_data(reviews: list):
  """Build and return a Rev2Vec vocabulary from the normalized text of every review."""
  vocabulary = Rev2Vec()
  for normalized in map(normalizeString, reviews):
    vocabulary.addRev(normalized)
  return vocabulary

def create_dataset(v, reviews):
  """Encode reviews as a dense bag-of-words count matrix.

  Args:
    v: vocabulary object exposing `n_words` and `word2index` (e.g. a Rev2Vec).
    reviews: sequence of review strings; each is passed through normalizeString
      and split on whitespace.

  Returns:
    Tensor of shape (len(reviews), v.n_words) where entry [i, j] counts how
    often the word with index j occurs in review i. Words that are not in the
    vocabulary are ignored.
  """
  revs2Tens = torch.zeros(len(reviews), v.n_words)
  for i, rev in enumerate(reviews):
    for w in normalizeString(rev).split():
      idx = v.word2index.get(w)
      if idx is None:
        # Out-of-vocabulary word: skip it instead of failing with a KeyError.
        continue
      revs2Tens[i, idx] += 1
  return revs2Tens

In [195]:
class ObjDataset (data.Dataset):
  """Dataset that pairs each encoded review with its class label."""

  def __init__(self, rev, labels):
    # rev: indexable collection of samples; labels: indexable collection of targets
    self.rev, self.labels = rev, labels

  def __len__(self):
    """Number of samples, taken from the length of `rev`."""
    return len(self.rev)

  def __getitem__(self, idx: int):
    """Return (sample, label) for position `idx`, with the label wrapped in a tensor."""
    sample = self.rev[idx]
    target = torch.tensor(self.labels[idx])
    return sample, target

In [196]:
batch_size = 100

corpus = [sent2string(d[0]).lower() for d in subj_docs] + [sent2string(d[0]).lower() for d in obj_docs]
# 0 = subjective, 1 = objective; each class gets as many labels as it has sentences.
labels = np.append(np.zeros(len(subj_docs), dtype=int), np.ones(len(obj_docs), dtype=int))

# Build the vocabulary explicitly in this cell so that it does not depend on a
# variable left over from an earlier kernel session.
v = prepare_data(corpus)
dataset_tor = create_dataset(v, corpus)
train_samples, test_samples, train_labels, test_labels = train_test_split(dataset_tor, labels, test_size=0.3)
train_dataset = ObjDataset(train_samples, train_labels)
test_dataset = ObjDataset(test_samples, test_labels)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size)

In [197]:
class RNN(nn.Module):
  """One recurrent step built from two linear layers over [input, hidden].

  forward() returns log-probabilities over the output classes together with
  the next hidden state.
  """

  def __init__(self, input_size, hidden_size, output_size):
    super().__init__()

    self.input_size, self.hidden_size, self.output_size = input_size, hidden_size, output_size

    # Both layers read the concatenation of the input and the hidden state.
    joint_size = input_size + hidden_size
    self.i2h = nn.Linear(joint_size, hidden_size)
    self.i2o = nn.Linear(joint_size, output_size)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, input, hidden):
    """Concatenate `input` and `hidden` along dim 1 and return (log-probabilities, next hidden)."""
    combined = torch.cat((input, hidden), 1)
    next_hidden = self.i2h(combined)
    log_probs = self.softmax(self.i2o(combined))
    return log_probs, next_hidden

  def init_hidden(self,shape=1):
    """Return an all-zero hidden state with `shape` rows and `hidden_size` columns."""
    return torch.zeros(shape, self.hidden_size)

class RNNcat(nn.Module):
  """Recurrent step that is additionally conditioned on a category vector.

  Args:
    input_size: number of input features per step.
    hidden_size: size of the hidden state.
    output_size: number of output classes.
    category_size: width of the category vector passed to forward(). When
      omitted, the notebook-level `n_categories` is used, which is what the
      constructor read implicitly before this parameter existed.
  """

  def __init__(self, input_size, hidden_size, output_size, category_size=None):
    # super(RNN, self) would raise a TypeError here because RNNcat does not
    # derive from RNN; the zero-argument form always refers to this class.
    super().__init__()
    if category_size is None:
      category_size = n_categories
    self.hidden_size = hidden_size

    joint_size = category_size + input_size + hidden_size
    self.i2h = nn.Linear(joint_size, hidden_size)
    self.i2o = nn.Linear(joint_size, output_size)
    self.o2o = nn.Linear(hidden_size + output_size, output_size)
    self.dropout = nn.Dropout(0.1)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, category, input, hidden):
    """Return (log-probabilities, next hidden) for one step.

    `category`, `input` and `hidden` are concatenated along dim 1, so they must
    have the same number of rows.
    """
    input_combined = torch.cat((category, input, hidden), 1)
    hidden = self.i2h(input_combined)
    output = self.i2o(input_combined)
    output_combined = torch.cat((hidden, output), 1)
    output = self.o2o(output_combined)
    output = self.dropout(output)
    output = self.softmax(output)
    return output, hidden

  def initHidden(self):
    """Return an all-zero hidden state of shape (1, hidden_size)."""
    return torch.zeros(1, self.hidden_size)
# class RNN(nn.Module):
#   def __init__(self, input_size, hidden_size, output_size):
#     super(RNN, self).__init__()
    
#     self.input_size = input_size
#     self.hidden_size = hidden_size
#     self.output_size = output_size
    
#     self.i2h = nn.RNN(input_size, hidden_size)
#     self.i2o = nn.Linear(hidden_size, output_size)
  
#   def forward(self, input, hidden=None):
#     if hidden==None:
#       hidden = self.init_hidden(input.shape[1])
#     print(input.shape)
#     print(hidden.shape)
#     output, _ = self.i2h(input, hidden)
#     output = self.i2o(output[-1])
#     return output

#   def init_hidden(self,shape=1):
#     return torch.zeros(shape, self.hidden_size)

In [198]:
# training
def train(rnn, optimizer, train_loader, hidden):
  """Run one training epoch and return the training accuracy in percent.

  Args:
    rnn: model called as rnn(x, hidden) and returning (outputs, next_hidden).
    optimizer: optimizer over the model parameters.
    train_loader: iterable of (inputs, labels) batches.
    hidden: initial hidden state with at least as many rows as the largest
      batch. The same initial state is fed to every batch; the hidden state
      returned by the model is not carried over.

  Returns:
    Percentage of correctly classified training samples, or 0.0 when the
    loader yields no samples.

  Uses the notebook-level `device` and `criterion`.
  """
  cumulative_accuracy = 0
  samples = 0

  rnn.train()
  for x, y in train_loader:
    x, y = x.to(device), y.to(device)
    # Match the hidden state to this batch: the last batch can be smaller than
    # the others, and the state must live on the same device as the inputs.
    batch_hidden = hidden[:x.shape[0]].to(device)
    outputs, _ = rnn(x, batch_hidden)
    loss = criterion(outputs, y)
    _, predicted = outputs.max(1)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    samples += x.shape[0]
    cumulative_accuracy += predicted.eq(y).sum().item()
  if samples == 0:
    return 0.0
  return cumulative_accuracy/samples*100

In [199]:
def evaluate(rnn, test_loader, hidden):
  """Return the accuracy (in percent) of `rnn` on the batches of `test_loader`.

  Args:
    rnn: model called as rnn(x, hidden) and returning (outputs, next_hidden).
    test_loader: iterable of (inputs, labels) batches.
    hidden: initial hidden state with at least as many rows as the largest
      batch; the same initial state is fed to every batch.

  Returns:
    Percentage of correctly classified samples, or 0.0 when the loader yields
    no samples.

  Uses the notebook-level `device`.
  """
  cumulative_accuracy = 0
  samples = 0

  # Evaluate in eval mode without building the autograd graph, then restore
  # whatever mode the model was in so that later training is unaffected.
  was_training = rnn.training
  rnn.eval()
  try:
    with torch.no_grad():
      for x, y in test_loader:
        x, y = x.to(device), y.to(device)
        # Match the hidden state to this batch (size and device).
        batch_hidden = hidden[:x.shape[0]].to(device)
        outputs, _ = rnn(x, batch_hidden)
        _, predicted = outputs.max(1)
        samples += x.shape[0]
        cumulative_accuracy += predicted.eq(y).sum().item()
  finally:
    rnn.train(was_training)
  if samples == 0:
    return 0.0
  return cumulative_accuracy/samples*100

In [200]:
epochs = 10
learning_rate = 0.005
n_hidden = 128
n_categories = 2
current_loss = 0

n_features = v.n_words
# Keep the model and the initial hidden state on the same device as the batches.
hidden = torch.zeros(batch_size, n_hidden, device=device)
rnn = RNN(n_features, n_hidden, n_categories).to(device)
# RNNcat is not used in this experiment, so it is not instantiated here. If it
# is needed, pass the sizes as separate positional arguments rather than as a
# single tuple.
optimizer = torch.optim.SGD(rnn.parameters(), lr=learning_rate)

#Evaluation before training
test_accuracy = evaluate(rnn, test_loader, hidden)
print('Before training, test accuracy: {:.2f}'.format(test_accuracy))
for e in range(epochs):
  train_accuracy = train(rnn, optimizer, train_loader, hidden)
#Evaluation after training
test_accuracy = evaluate(rnn, test_loader, hidden)
print('After training, test accuracy: {:.2f}'.format(test_accuracy))

Before training, test accuracy: 48.00
After training, test accuracy: 78.80
