In [1]:
__author__ = "Federico Motta"
__copyright__ = "2023 Federico Motta <federico.motta@unimore.it>"
__license__ = """
This notebook is free software: you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of
the License, or any later version.

This notebook is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this notebook. If not, see http://www.gnu.org/licenses
"""
__version__ = "2023-12-06"

## Overview

This Jupyter notebook implements an example of supervised sentiment
analysis in a ternary label setting (positive, negative, neutral).

The ultimate goal is to make accurate predictions in multiple domains.

Three example datasets will be used:

- DynaSent Round 1
- DynaSent Round 2
- Stanford Sentiment Treebank

Two of them (DynaSent Round 1 and Round 2) will be used as the training set; the remaining one (the Stanford Sentiment Treebank) will be used as the test set.

In [2]:
# from sklearn.feature_extraction import DictVectorizer
# from sklearn.linear_model import LogisticRegression
from collections import Counter
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from transformers import (
    BertForSequenceClassification,
    BertTokenizerFast,
    RobertaForSequenceClassification,
    RobertaTokenizerFast,
    Trainer,
    TrainingArguments,
)
import numpy as np
import pandas as pd
import torch

ModuleNotFoundError: No module named 'datasets'

## Datasets

The [DynaSent dataset](https://huggingface.co/datasets/dynabench/dynasent) is a
ternary sentiment benchmark consisting of two rounds.

### DynaSent round 1

Round 1 collects naturally occurring sentences from the
[Yelp Academic Dataset](https://www.yelp.com/dataset); these samples
were later labeled by crowdworkers.

Hint: there is a lot of metadata; for simplicity we will focus on
just the sentences and their labels.

In [3]:
# Load every split of DynaSent round 1 (configuration "dynabench.dynasent.r1.all").
dynasent_r1 = load_dataset("dynabench/dynasent", "dynabench.dynasent.r1.all")

NameError: name 'load_dataset' is not defined

In [None]:
# Display the loaded object to inspect what it contains.
dynasent_r1

: 

Splits:

In [None]:
# Label distribution of the DynaSent round 1 train and validation splits,
# printed from the most to the least frequent label.
for splitname in ("train", "validation"):
    print(splitname)
    label_counts = Counter(dynasent_r1[splitname]["gold_label"])
    for label, count in label_counts.most_common():
        print(f"\t{label:>16s}: {count:5d}")

: 

### DynaSent round 2

In Round 2, crowdworkers used the
[Dynabench](https://dynabench.org) platform to edit the sentences from the
previous round so that they express a particular
sentiment.

In [None]:
# Load every split of DynaSent round 2 (configuration "dynabench.dynasent.r2.all").
dynasent_r2 = load_dataset("dynabench/dynasent", "dynabench.dynasent.r2.all")

: 

In [None]:
# Label distribution of the DynaSent round 2 train and validation splits,
# printed from the most to the least frequent label.
for splitname in ("train", "validation"):
    print(splitname)
    label_counts = Counter(dynasent_r2[splitname]["gold_label"])
    for label, count in label_counts.most_common():
        print(f"\t{label:>16s}: {count:5d}")

: 

### Stanford Sentiment Treebank

The [Stanford Sentiment Treebank (SST)](https://nlp.stanford.edu/sentiment/treebank.html)
consists of sentences from Rotten Tomatoes Movie Reviews. For
homogeneity with the previous datasets we will use the ternary version (SST-3).

Please note that SST samples are labeled both at the phrase level and
at the sentence level; for the scope of this demo this is an
unnecessary level of detail, thus we will only use the sentence-level labels.

In [None]:
# Load the "SetFit/sst5" dataset. Its "label_text" column is reduced to three
# classes further down, which is why the variable is already called `sst3`.
sst3 = load_dataset("SetFit/sst5")

: 

In [None]:
# Display the loaded object to inspect what it contains.
sst3

: 

In [None]:
# Distribution of the original "label_text" values in the SST train and
# validation splits, printed from the most to the least frequent label.
for splitname in ("train", "validation"):
    print(splitname)
    label_counts = Counter(sst3[splitname]["label_text"])
    for label, count in label_counts.most_common():
        print(f"\t{label:>16s}: {count:5d}")

: 

The labels are not yet aligned with our ternary task, and the
dataset uses different keys from those in DynaSent, so we first need to
homogenize the dataset.

In [None]:
# Align SST with the DynaSent schema by adding two columns to every split:
#   - "gold_label": the last space-separated word of "label_text", so a
#     two-word label such as "very positive" would collapse to "positive";
#   - "sentence": a copy of the "text" column.
# A column that is already present is skipped, so the cell can be executed
# again without `add_column` failing on a duplicate column name.
for splitname in ("train", "validation", "test"):
    split = sst3[splitname]
    if "gold_label" not in split.column_names:
        ternary_labels = [s.split(" ")[-1] for s in split["label_text"]]
        split = split.add_column("gold_label", ternary_labels)
    if "sentence" not in split.column_names:
        split = split.add_column("sentence", split["text"])
    sst3[splitname] = split

: 

In [None]:
# Distribution of the homogenized "gold_label" values in the SST train and
# validation splits, printed from the most to the least frequent label.
for splitname in ("train", "validation"):
    print(splitname)
    label_counts = Counter(sst3[splitname]["gold_label"])
    for label, count in label_counts.most_common():
        print(f"\t{label:>16s}: {count}")

: 

In [None]:
# Build the training pool from the train splits of both DynaSent rounds:
# keep only the sentence and its gold label, rename them to the generic
# "text" / "label" columns, drop repeated sentences and cap the size at
# 10,000 rows to keep the demo fast.
training_set = (
    pd.concat(
        [
            dynasent_r1["train"].to_pandas().loc[:, ["sentence", "gold_label"]],
            dynasent_r2["train"].to_pandas().loc[:, ["sentence", "gold_label"]],
        ],
        axis=0,
    )
    .rename(columns={"sentence": "text", "gold_label": "label"})
    .astype({"text": "string"})
    .drop_duplicates(subset="text")
    # Draw the subsample at random (with a fixed seed) rather than taking the
    # head of the frame: round 1 comes first in the concatenation, so slicing
    # the first 10,000 rows would leave round 2 out entirely whenever round 1
    # alone has at least that many unique sentences.
    # NOTE(review): compare with the split sizes printed above to confirm.
    .pipe(lambda _df: _df.sample(n=min(len(_df), 10000), random_state=42))
    .reset_index(drop=True)
)

: 

In [None]:
# Display the assembled training pool ("text" and "label" columns).
training_set

: 

In [None]:
# Build the test pool from every SST split: keep the text and the ternary
# gold label, rename the latter to "label", drop repeated sentences and keep
# the first 1000 rows.
test_set = (
    pd.concat(
        [split.to_pandas()[["text", "gold_label"]] for split in sst3.values()],
        axis=0,
    )
    .rename(columns={"gold_label": "label"})
    .astype({"text": "string"})
    .drop_duplicates(subset="text")
    .reset_index(drop=True)
    .head(1000)
)

: 

In [None]:
# Display the assembled test pool ("text" and "label" columns).
test_set

: 

In [None]:
# Integer class id for each textual sentiment.
# The ids must be exactly 0 .. num_labels-1: the sequence-classification model
# is created with `num_labels=len(sentiment_mapping)` logits, its loss expects
# the target to be the index of the correct logit (a negative id such as -1 is
# rejected), and `compute_metrics` compares the labels against `argmax` output,
# which is always in that same range.
sentiment_mapping = dict(negative=0, neutral=1, positive=2)
assert sorted(sentiment_mapping.values()) == list(range(len(sentiment_mapping))), (
    "class ids must be the contiguous range 0 .. num_labels-1"
)

# Every label found in either pool must have a class id; otherwise the
# encoding step would silently leave the raw string in place.
unknown_labels = {
    label
    for _df in (training_set, test_set)
    for label in _df["label"].unique()
    if label not in sentiment_mapping
}
assert not unknown_labels, f"labels missing from sentiment_mapping: {unknown_labels}"

: 

In [None]:
# Encode the training labels: each sentiment string is replaced by its integer
# id from `sentiment_mapping`. The trailing expression displays the result.
training_set = training_set.assign(
    label=training_set["label"].replace(sentiment_mapping)
)
training_set

: 

In [None]:
# Encode the test labels: each sentiment string is replaced by its integer
# id from `sentiment_mapping`. The trailing expression displays the result.
test_set = test_set.assign(
    label=test_set["label"].replace(sentiment_mapping)
)
test_set

: 

In [None]:
# Hold out 20% of the training pool for validation. The fixed seed pins the
# shuffle, so the same rows land in each part on every run and results stay
# comparable between executions.
df_train, df_valid = train_test_split(training_set, train_size=0.8, random_state=42)
df_test = test_set
# `training_set` and `test_set` are deliberately kept alive: deleting them
# here would make this cell raise a NameError when executed a second time.

: 

In [None]:
# Print the (rows, columns) shape of the train, validation and test frames,
# in that order.
for frame in (df_train, df_valid, df_test):
    print(frame.shape)

: 

In [None]:
# Load the pretrained fast tokenizers. "bert-base-cased" is the same checkpoint
# the BERT classifier is created from later in the notebook.
bert_tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
roberta_tokenizer = RobertaTokenizerFast.from_pretrained("roberta-base")

: 

In [None]:
# Tokenize the train, validation and test texts with the BERT tokenizer.
# Each frame is encoded by its own call, with truncation and padding enabled.
encodings_train_bert, encodings_valid_bert, encodings_test_bert = (
    bert_tokenizer(frame["text"].to_list(), truncation=True, padding=True)
    for frame in (df_train, df_valid, df_test)
)

: 

# Exercise: repeat the same procedure with RoBERTa

In [None]:
# TODO: tokenize df_train, df_valid and df_test with roberta_tokenizer, as done for BERT above

: 

In [None]:
# Dataset wrapper (not a DataLoader): pairs tokenizer encodings with labels
class MyDataset(torch.utils.data.Dataset):
    """Expose tokenizer encodings and their labels as an indexable dataset.

    Parameters
    ----------
    encodings
        Mapping from field name to a per-sample indexable collection
        (anything offering ``.items()`` whose values support ``values[idx]``).
    labels
        Indexable collection holding one label per sample.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Return the number of samples, taken from the label collection."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors.

        The dict holds one tensor per encoding field, followed by the label
        under the ``"labels"`` key.
        """
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample["labels"] = torch.tensor(self.labels[idx])
        return sample

: 

In [None]:
# Wrap each BERT encoding together with the labels of the frame it came from.
dataset_train_bert, dataset_valid_bert, dataset_test_bert = (
    MyDataset(encodings, frame["label"].to_list())
    for encodings, frame in (
        (encodings_train_bert, df_train),
        (encodings_valid_bert, df_valid),
        (encodings_test_bert, df_test),
    )
)

: 

In [None]:
def train_model(model, train_dataset, val_dataset, model_path):
    """Fine-tune ``model`` with a ``Trainer`` and save it to ``model_path``.

    Parameters
    ----------
    model
        Model to train; it must provide ``save_pretrained``.
    train_dataset
        Dataset handed to the trainer as ``train_dataset``.
    val_dataset
        Dataset handed to the trainer as ``eval_dataset``.
    model_path
        Destination passed to ``model.save_pretrained`` once training ends.

    Returns
    -------
    None
    """
    # Demo-sized hyper-parameters; the trailing comments keep the alternative
    # values noted next to the original settings.
    hyperparameters = dict(
        output_dir="./results",
        num_train_epochs=1,  # 5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,  # 64,
        warmup_steps=10,  # 500,
        weight_decay=0.10,  # 0.01,
        logging_dir="./logs",
    )
    trainer = Trainer(
        model=model,
        args=TrainingArguments(**hyperparameters),
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
    )
    trainer.train()
    model.save_pretrained(model_path)

: 

In [None]:
# Pretrained "bert-base-cased" model with a classification head that has one
# output per entry of `sentiment_mapping`.
bert_model = BertForSequenceClassification.from_pretrained(
    "bert-base-cased",
    num_labels=len(sentiment_mapping),
)

: 

In [None]:
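# TODO: create the RoBERTa counterpart of the model above, e.g. with
# RobertaForSequenceClassification.from_pretrained("roberta-base", num_labels=...)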

: 

In [None]:
# Fine-tune BERT on the training dataset and save it under "<model_path>/bert_model".
model_path = "./model"
train_model(
    model=bert_model,
    train_dataset=dataset_train_bert,
    val_dataset=dataset_valid_bert,
    model_path=f"{model_path}/bert_model",
)

: 

In [None]:
# TODO: fine-tune the RoBERTa model with train_model, saving it under
# f"{model_path}/roberta_model"

: 

In [None]:
def compute_metrics(pred):
    """Compute classification metrics for a prediction object.

    Parameters
    ----------
    pred
        Object exposing ``label_ids`` (the true class ids) and
        ``predictions`` (per-class scores; the predicted class is the
        ``argmax`` over the last axis).

    Returns
    -------
    dict
        ``accuracy``, support-weighted ``f1`` / ``precision`` / ``recall``
        and ``exact_match`` (fraction of predictions equal to their label).
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)

    # zero_division=0 keeps the score scikit-learn already falls back to when
    # a class is never predicted (or never present) but without emitting an
    # undefined-metric warning for it, which is common after a short training.
    precision = precision_score(labels, preds, average="weighted", zero_division=0)
    recall = recall_score(labels, preds, average="weighted", zero_division=0)
    f1 = f1_score(labels, preds, average="weighted", zero_division=0)
    acc = accuracy_score(labels, preds)

    # Element-wise comparison instead of a Python loop over single values.
    # With one predicted class per sample this is the same quantity as
    # accuracy; it is kept as a separate key for existing consumers.
    em = np.mean(np.asarray(preds) == np.asarray(labels))
    return {
        "accuracy": acc,
        "f1": f1,
        "precision": precision,
        "recall": recall,
        "exact_match": em,
    }


def evaluate_model(model, dataset):
    """Evaluate ``model`` on ``dataset`` and return the resulting metrics.

    A fresh ``Trainer`` is created around the model with ``compute_metrics``
    as its metric function; the return value is whatever its ``evaluate``
    method returns for ``dataset``.
    """
    evaluator = Trainer(model=model, compute_metrics=compute_metrics)
    metrics = evaluator.evaluate(dataset)
    return metrics

: 

In [None]:
# Evaluate the BERT model on the test dataset (built from the SST samples)
# and print the returned metrics.
bert_result = evaluate_model(bert_model, dataset_test_bert)
print("BERT Evaluation Result: ", bert_result)

: 

In [None]:
# TODO: evaluate the RoBERTa model on its own test dataset with evaluate_model


: 

In [None]:
# Display the state dict of the BERT model.
# NOTE(review): this prints every entry, so the output is likely very long —
# consider showing only the keys or shapes.
bert_model.state_dict()

: 

In [None]:
# TODO: inspect the state dict of the RoBERTa model in the same way


: 