Demo for aspect-based Sentiment Analysis shown at the jambit CoffeeTalks on 6th of March, 2020.

Based mainly on [Ben Trevett's PyTorch Sentiment Analysis](https://github.com/bentrevett/pytorch-sentiment-analysis). The training and testing data is a prepared csv version of the restaurant data from the SemEval 2014 task on Aspect-based Sentiment Analysis.

(c) Wiltrud Kessler


In [0]:
# Mount Google Drive into the Colab runtime at /content/drive.
# The csv files are later read from '/content/drive/My Drive/semeval'.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# Import pytorch and torchtext libraries
from torchtext import data
import torch
import torch.optim as optim
import torch.nn as nn
import time
import numpy as np
from sklearn.metrics import confusion_matrix

In [0]:
# Task settings

# The data we use has sentiment polarity annotations and aspect annotations.
# There are 4 possible sentiment labels: '1', '-1', '0', and 'conflict'.
# There are two types of aspect annotations in the data: 'category' (5 different aspect categories) and 'term' (actual aspect words).
# Choose here which setting to run:
# Demo 1: Polarity classification -> labeltype 'category', use_aspect_label False. Play around with polarities, if you like.
# Demo 2: Aspect category classification -> labeltype 'category', use_aspect_label True.
# Demo 3: Aspect term classification -> labeltype 'term', use_aspect_label True.
# Note: labeltype and polarities are also used to build the prefix of the csv file names,
# so a matching set of files has to exist for the chosen combination.
labeltype = 'category'
polarities = ['1', '-1', '0', 'conflict']
use_aspect_label = True

In [0]:
# Load the data from csv

# torchtext field definitions; only TEXT and LABEL are wired into `fields` below.
ID = data.Field()
TEXT = data.Field()
ASPECT = data.Field()
POLARITY = data.Field()
LABEL = data.LabelField()

# Column layout of the csv files:
#   0: sent.id   1: text   2: ex.id   3: aspect   4: polarity
# Columns mapped to (None, None) are not loaded. Either the aspect column
# or the polarity column becomes the label, depending on the task setting.
label_column = 3 if use_aspect_label else 4
fields = [(None, None)] * 5
fields[1] = ('text', TEXT)
fields[label_column] = ('label', LABEL)

# The data is already split into training/validation/test files that share
# a common prefix built from the label type and the polarity set.
polarity_part = ".".join(polarities)
prefix = f'semeval2014_restaurants_{labeltype}_{polarity_part}'
print(f'Loading data from {prefix}')
split_files = {
    'train': prefix + '_train.csv',
    'validation': prefix + '_val.csv',
    'test': prefix + '_test.csv',
}
train_data, valid_data, test_data = data.TabularDataset.splits(
    path='/content/drive/My Drive/semeval',
    format='csv',
    fields=fields,
    skip_header=True,
    **split_files,
)

# Check if we loaded the right data by the number of examples
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

# Print some examples
print(vars(train_data.examples[1]))
print(vars(valid_data.examples[0]))
print(vars(test_data.examples[0]))

In [0]:
# Build the vocabulary

BATCH_SIZE = 64
MAX_VOCAB_SIZE = 25_000
MAX_LABEL_SIZE = 25_000 # take all

# Both vocabularies are derived from the training split only, so nothing
# from the validation/test data leaks into the model's input space.
TEXT.build_vocab(train_data, max_size=MAX_VOCAB_SIZE)
LABEL.build_vocab(train_data, max_size=MAX_LABEL_SIZE)

# Look at the numbers a bit
print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Most frequent vocabulary words: {TEXT.vocab.freqs.most_common(20)}")

print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}")
print(f"Classes distribution: {LABEL.vocab.freqs}")

# Extra statistics for the aspect term setting, where labels are words
if use_aspect_label and labeltype == 'term':
    label_counts = LABEL.vocab.freqs.most_common(MAX_LABEL_SIZE)
    print(f"Most frequent class words: {label_counts}")
    num = sum(count for _, count in label_counts)
    if len(LABEL.vocab) > MAX_LABEL_SIZE:
        print(f"Words with real label: {num} Words with default label {len(train_data)-num}")

# Move computations to GPU if available
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

# Batch iterators over the three splits, used by the training and
# evaluation loops later on. Examples are bucketed by text length.
all_splits = (train_data, valid_data, test_data)
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    all_splits,
    batch_size=BATCH_SIZE,
    sort_key=lambda example: len(example.text),
    sort_within_batch=False,
    device=device,
)

In [0]:
# Define the machine learning model (Recurrent Neural Network) and its evaluation

# The RNN itself
class RNN(nn.Module):
    """Plain recurrent text classifier.

    Token indices are embedded, fed through an ``nn.RNN``, and the final
    hidden state is mapped to one score per output class by a linear layer.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        """Create the embedding, recurrent and output layers.

        Args:
            input_dim: Number of rows in the embedding table.
            embedding_dim: Size of each embedding vector.
            hidden_dim: Size of the RNN hidden state.
            output_dim: Number of scores produced per example.
        """
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        """Return unnormalised class scores for a batch of token indices.

        Args:
            text: Integer tensor of token indices. ``nn.RNN`` is used with
                its default sequence-first layout, i.e. (seq_len, batch).

        Returns:
            Tensor of shape (batch, output_dim).
        """
        outputs, final_hidden = self.rnn(self.embedding(text))
        last_state = final_hidden.squeeze(0)
        # Sanity check: the output at the last time step must be the same
        # as the final hidden state returned by the RNN.
        assert torch.equal(outputs[-1, :, :], last_state)
        return self.fc(last_state)

# Define the metric for evaluation = accuracy
def categorical_accuracy(preds, y, labels):
    """Compute the accuracy and the confusion matrix for one batch.

    Args:
        preds: Tensor of per-class scores with shape (batch, n_classes).
        y: Tensor of gold class indices with shape (batch,).
        labels: Class indices that fix the row/column order of the
            confusion matrix.

    Returns:
        A tuple ``(accuracy, matrix)``. ``accuracy`` is a one-element float
        tensor on the CPU holding the fraction of correct predictions.
        ``matrix`` is the result of ``confusion_matrix`` with the gold
        labels as first and the predictions as second argument.
    """
    predicted = preds.argmax(dim=1)  # index of the highest score per example
    correct = predicted.eq(y)

    # The tensors may live on the GPU. NumPy arrays (needed by scikit-learn)
    # cannot be created from CUDA tensors, and the division below would mix
    # devices, so everything is copied to the host first.
    gold_np = y.detach().cpu().numpy()
    predicted_np = predicted.detach().cpu().numpy()

    accuracy = correct.sum().cpu() / torch.FloatTensor([y.shape[0]])
    return accuracy, confusion_matrix(gold_np, predicted_np, labels=labels)

# Define training of the model
def train(model, iterator, optimizer, criterion):
    """Run one pass over ``iterator`` and update the model after each batch.

    Args:
        model: Network to train; must expose ``fc.out_features``.
        iterator: Iterable of batches with ``text`` and ``label`` attributes.
        optimizer: Optimizer holding the model parameters.
        criterion: Loss function called as ``criterion(predictions, labels)``.

    Returns:
        A tuple ``(loss, accuracy, matrix)``: the summed batch losses and
        batch accuracies, each divided by ``len(iterator)``, and the sum of
        the per-batch confusion matrices.
    """
    model.train()  # switch the model to training mode
    class_ids = range(0, model.fc.out_features)
    total_loss, total_acc, total_matrix = 0, 0, 0

    for batch in iterator:
        optimizer.zero_grad()  # drop gradients left over from the previous batch
        scores = model(batch.text)
        batch_loss = criterion(scores, batch.label)
        batch_acc, batch_matrix = categorical_accuracy(scores, batch.label, class_ids)

        # Backpropagate and apply the parameter update
        batch_loss.backward()
        optimizer.step()

        total_loss += batch_loss.item()
        total_acc += batch_acc.item()
        total_matrix += batch_matrix

    n_batches = len(iterator)
    return total_loss / n_batches, total_acc / n_batches, total_matrix

# Define evaluation of the model
def evaluate(model, iterator, criterion):
    """Score the model on ``iterator`` without updating any parameters.

    Args:
        model: Network to evaluate; must expose ``fc.out_features``.
        iterator: Iterable of batches with ``text`` and ``label`` attributes.
        criterion: Loss function called as ``criterion(predictions, labels)``.

    Returns:
        A tuple ``(loss, accuracy, matrix)``: the summed batch losses and
        batch accuracies, each divided by ``len(iterator)``, and the sum of
        the per-batch confusion matrices.
    """
    model.eval()  # switch the model to evaluation mode
    class_ids = range(0, model.fc.out_features)
    total_loss, total_acc, total_matrix = 0, 0, 0

    # No optimizer step happens here, so gradient tracking is switched off
    with torch.no_grad():
        for batch in iterator:
            scores = model(batch.text)
            batch_loss = criterion(scores, batch.label)
            batch_acc, batch_matrix = categorical_accuracy(scores, batch.label, class_ids)

            total_loss += batch_loss.item()
            total_acc += batch_acc.item()
            total_matrix += batch_matrix

    n_batches = len(iterator)
    return total_loss / n_batches, total_acc / n_batches, total_matrix

# Debug function that counts the parameters of the model
def count_parameters(model):
    """Return the total number of elements in all trainable parameters.

    Parameters whose ``requires_grad`` flag is off are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Debug function to show how much time one training iteration takes
def epoch_time(start_time, end_time):
    """Split the span between two timestamps into whole minutes and seconds.

    Args:
        start_time: Start timestamp in seconds.
        end_time: End timestamp in seconds.

    Returns:
        A tuple ``(minutes, seconds)`` of ints; fractions are truncated.
    """
    elapsed = end_time - start_time
    minutes = int(elapsed / 60)
    seconds = int(elapsed - (minutes * 60))
    return minutes, seconds


In [0]:
# Actually define and train the model

# Hyperparameters. The vocabulary sizes fix the input and output dimensions;
# the embedding/hidden sizes and the epoch count are free to tune.
INPUT_DIM = len(TEXT.vocab)    # one embedding row per vocabulary entry
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = len(LABEL.vocab)  # one output score per label
N_EPOCHS = 10                  # number of passes over the training data

# Get an instance of our RNN class
model = RNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM)
print(model)
print(f'The model has {count_parameters(model):,} trainable parameters')

# Optimizer and loss function
optimizer = optim.SGD(model.parameters(), lr=1e-3)

# Move calculation to GPU, if we have one
model = model.to(device)
criterion = nn.CrossEntropyLoss().to(device)

best_valid_loss = float('inf')

# Train for N_EPOCHS rounds. After each round the model is scored on the
# validation data, and the weights are saved whenever the validation loss
# is lower than anything seen so far.
print("Labels (in order): " + str(LABEL.vocab.itos))
for epoch in range(N_EPOCHS):

    start_time = time.time()
    train_loss, train_acc, train_matrix = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc, valid_matrix = evaluate(model, valid_iterator, criterion)
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    if best_valid_loss > valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'absa1.pt')

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f}      | Train Accuracy: {train_acc*100:.2f}%')
    print(f'\tValidation Loss: {valid_loss:.3f} | Validation Accuracy: {valid_acc*100:.2f}%')


In [0]:
# Evaluate the model on the test data

# Restore the weights stored in 'absa1.pt' (written by the training loop
# whenever the validation loss improved) before scoring the test split.
best_state = torch.load('absa1.pt')
model.load_state_dict(best_state)

test_loss, test_acc, test_matrix = evaluate(model, test_iterator, criterion)

print(f'Test Accuracy: {test_acc*100:.2f}%')

In [0]:
# Try out a few examples by hand

from nltk.tokenize import WordPunctTokenizer

# Sentence tokenization (the same that has been done for the train and test data)
def custom_tokenize(text):
    """Tokenize ``text`` and keep only the purely alphanumeric tokens.

    The text is split with nltk's ``WordPunctTokenizer``; tokens for which
    ``str.isalnum()`` is false (e.g. punctuation) are dropped.

    Returns:
        List of the remaining tokens in their original order.
    """
    return [
        token
        for token in WordPunctTokenizer().tokenize(text)
        if token.isalnum()
    ]

# Give the sentence to the model and receive a prediction for it
def predict_sentiment(model, sentence):
    """Classify a single raw sentence with the given model.

    The sentence is tokenized with ``custom_tokenize``, mapped to indices of
    the ``TEXT`` vocabulary and passed to the model as a batch of size one.

    Args:
        model: Network called with a (seq_len, 1) tensor of token indices
            and returning one row of class scores.
        sentence: The raw input text.

    Returns:
        A tuple ``(probabilities, index)``: the class probabilities as a
        nested list (one row) and the index of the most probable class,
        which can be looked up in ``LABEL.vocab.itos``.
    """
    model.eval()  # put the model in evaluation mode
    tokenized = custom_tokenize(sentence)
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    # Column vector: sequence along dim 0, batch of size one along dim 1
    tensor = torch.LongTensor(indexed).to(device).unsqueeze(1)

    # Pure inference, so no gradient bookkeeping is needed.
    with torch.no_grad():
        # The model is trained with CrossEntropyLoss, so its outputs are
        # multi-class logits. Softmax turns them into a distribution over
        # the classes; a per-class sigmoid would not sum to one.
        predictions = torch.softmax(model(tensor), dim=1)

    max_prediction = predictions.argmax(dim=1)
    return predictions.tolist(), max_prediction.item()

# Hand-picked (sentence, gold label) pairs, one set per task setting.
if not use_aspect_label:
    # POLARITY examples
    # Label here is human-readable, i.e. 0 is neutral, 1 is positive, -1 is negative
    sentences = [
        ("The food was delicious.", "1"),
        ("This is by far my favorite place in the neighborhood", "1"),
        ("The sushi was awful!", "-1"),
        ("Service was prompt, friendly and great.", "1"),
        ("The website and rating makes this place look wonderful but in reality it was very disappointing.", "conflict"),
        ("I know because I live nearby.", "0"),
    ]
elif labeltype == 'category':
    # ASPECT CATEGORY examples
    sentences = [
        ("The food was delicious.", "food"),
        ("This is by far my favorite place in the neighborhood", "anecdotes miscellaneous"),
        ("The sushi was awful!", "food"),
        ("Service was prompt, friendly and great.", "service"),
        ("in the neighborhood it is well worth the price you pay for them.", "price"),
    ]
elif labeltype == 'term':
    # ASPECT TERM examples
    sentences = [
        ("The food was delicious.", "food"),
        ("The sushi was awful!", "sushi"),
        ("My pick for best pizza restaurant anywhere!", "pizza"),
        ("The atmosphere isn't the greatest , but I suppose that's how they keep the prices down .", "atmosphere"),
        ("The atmosphere isn't the greatest , but I suppose that's how they keep the prices down .", "prices"),
        ("the desert was good.", "desert"),
    ]

# Run every example through the model and report whether the predicted
# label matches the gold label. The class probabilities are available in
# prediction[0] if they should be printed as well.
for s in sentences:
    prediction = predict_sentiment(model, s[0])
    predicted_label = LABEL.vocab.itos[prediction[1]]
    is_correct = str(s[1]) == predicted_label
    print(f'Sentence: {s[0]}\n   {is_correct}! - Gold label: {s[1]} - Predicted label: {predicted_label}')

In [0]:
# Classify one additional (sentence, gold label) pair and report the result.
s = ("Service was very prompt but slightly rushed.", "service")
prediction = predict_sentiment(model, s[0])
predicted_label = LABEL.vocab.itos[prediction[1]]
is_correct = str(s[1]) == predicted_label
print(f'Sentence: {s[0]}\n   {is_correct}! - Gold label: {s[1]} - Predicted label: {predicted_label}')