<a href="https://colab.research.google.com/github/LorenzoAgnolucci/BERT_for_ABSA/blob/master/BERT_for_ABSA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers

In [None]:
from google.colab import drive
drive.mount('/gdrive', force_remount=True)



---


# BERT-pair


---



In [None]:
#@title Choose a dataset and a task { run: "auto", display-mode: "form" }
base_dir = "/gdrive/MyDrive/Machine_Learning" #@param {type:"string"}
dataset_type = "sentihood" #@param ["sentihood", "semeval2014"]
task = "NLI_M" #@param ["QA_M", "NLI_M", "QA_B", "NLI_B"]

In [None]:
import pandas as pd
import random


if dataset_type == "sentihood":
    id2label = {0: "None", 1: "Positive", 2: "Negative"}
    label2id = {"None": 0, "Positive": 1, "Negative": 2}
elif dataset_type == "semeval2014":
    id2label = {0: "positive", 1: "neutral", 2: "negative", 3: "conflict", 4: "none"}
    label2id = {"positive": 0, "neutral" : 1, "negative" : 2, "conflict": 3, "none": 4}

if task.endswith("B"):
    num_classes = 2
else:
    if dataset_type == "sentihood":
        num_classes = 3
    elif dataset_type == "semeval2014":
        num_classes = 5


def get_dataset(path):
    original_sentences = []
    auxiliary_sentences = []
    labels = []
    data = pd.read_csv(path, header=0, sep="\t").values.tolist()
    for row in data:
        original_sentences.append(row[1])
        auxiliary_sentences.append(row[2])
        labels.append(row[3])
    return original_sentences, auxiliary_sentences, labels



train_original_sentences, train_auxiliary_sentences, train_labels = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-pair/train_{task}.csv")
if dataset_type == "sentihood":
    val_original_sentences, val_auxiliary_sentences, val_labels = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-pair/dev_{task}.csv")
elif dataset_type == "semeval2014":
    val_original_sentences, val_auxiliary_sentences, val_labels = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-pair/test_{task}.csv")
test_original_sentences, test_auxiliary_sentences, test_labels = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-pair/test_{task}.csv")

In [None]:
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

train_encodings = tokenizer(train_original_sentences, train_auxiliary_sentences, truncation=True, padding=True)
val_encodings = tokenizer(val_original_sentences, val_auxiliary_sentences, truncation=True, padding=True)
test_encodings = tokenizer(test_original_sentences, test_auxiliary_sentences, truncation=True, padding=True)

In [None]:
import torch

class ABSA_Dataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

train_dataset = ABSA_Dataset(train_encodings, train_labels)
val_dataset = ABSA_Dataset(val_encodings, val_labels)
test_dataset = ABSA_Dataset(test_encodings, test_labels)

In [None]:
import sys
if base_dir not in sys.path:
    sys.path.insert(0, f'{base_dir}/')
import numpy as np
from scipy.special import softmax
import evaluation


def get_test_labels(data_dir, dataset_type):
    original_sentences = []
    auxiliary_sentences = []
    labels = []
    data = pd.read_csv(f"{data_dir}/{dataset_type}/BERT-pair/test_NLI_M.csv", header=0, sep="\t").values.tolist()
    for row in data:
        labels.append(row[3])
    return labels


def get_predictions(data, task, dataset_type):
    predicted_labels = []
    scores = []
    if task.endswith("B"):
        if dataset_type == "sentihood":
            if task.endswith("B"):
                count_aspect_rows = 0
                current_aspect_scores = []
                for row in data:
                    current_aspect_scores.append(row[2])
                    count_aspect_rows += 1
                    if count_aspect_rows % 3 == 0:
                        sum_current_aspect_scores = np.sum(current_aspect_scores)
                        current_aspect_scores = [score / sum_current_aspect_scores for score in current_aspect_scores]
                        scores.append(current_aspect_scores)
                        predicted_labels.append(np.argmax(current_aspect_scores))
                        current_aspect_scores = []
        elif dataset_type == "semeval2014":
            if task.endswith("B"):
                count_aspect_rows = 0
                current_aspect_scores = []
                for row in data:
                    current_aspect_scores.append(row[2])
                    count_aspect_rows += 1
                    if count_aspect_rows % 5 == 0:
                        sum_current_aspect_scores = np.sum(current_aspect_scores)
                        current_aspect_scores = [score / sum_current_aspect_scores for score in current_aspect_scores]
                        scores.append(current_aspect_scores)
                        predicted_labels.append(np.argmax(current_aspect_scores))
                        current_aspect_scores = []
    return predicted_labels, scores


if dataset_type == "sentihood":
    def compute_metrics(predictions):
        scores = [softmax(prediction) for prediction in predictions[0]]
        predicted_labels = [np.argmax(x) for x in scores]
        if task.endswith("B"):
            data = np.insert(scores, 0, predicted_labels, axis=1)
            predicted_labels, scores = get_predictions(data, task, dataset_type)
        test_labels = get_test_labels(f"{base_dir}/data", dataset_type)
        metrics = {}
        metrics["strict_acc"] = evaluation.compute_sentihood_aspect_strict_accuracy(test_labels, predicted_labels)
        metrics["F1"] = evaluation.compute_sentihood_aspect_macro_F1(test_labels, predicted_labels)
        metrics["aspect_AUC"] = evaluation.compute_sentihood_aspect_macro_AUC(test_labels, scores)
        sentiment_macro_AUC, sentiment_accuracy = evaluation.compute_sentihood_sentiment_classification_metrics(test_labels, scores)
        metrics["sentiment_acc"] = sentiment_accuracy
        metrics["sentiment_AUC"] = sentiment_macro_AUC
        return metrics

elif dataset_type == "semeval2014":
    def compute_metrics(predictions):
        scores = [softmax(prediction) for prediction in predictions[0]]
        predicted_labels = [np.argmax(x) for x in scores]
        if task.endswith("B"):
            data = np.insert(scores, 0, predicted_labels, axis=1)
            predicted_labels, scores = get_predictions(data, task, dataset_type)
        test_labels = get_test_labels(f"{base_dir}/data", dataset_type)
        metrics = {}
        p, r, f1 = evaluation.compute_semeval_PRF(test_labels, predicted_labels)
        metrics["P"] = p
        metrics["R"] = r
        metrics["F1"] = f1
        metrics["4-way"] = evaluation.compute_semeval_accuracy(test_labels, predicted_labels, scores, 4)
        metrics["3-way"] = evaluation.compute_semeval_accuracy(test_labels, predicted_labels, scores, 3)
        metrics["binary"] = evaluation.compute_semeval_accuracy(test_labels, predicted_labels, scores, 2)
        return metrics

In [None]:
from transformers import BertForSequenceClassification, Trainer, TrainingArguments, BertConfig

from transformers import logging
logging.set_verbosity_debug()


epochs = 4
batch_size = 24
num_steps = len(train_dataset) * epochs // batch_size
warmup_steps = num_steps // 10  # 10% of the training steps
save_steps = num_steps // epochs    # Save a checkpoint at the end of each epoch


training_args = TrainingArguments(
    output_dir = f'{base_dir}/models/{dataset_type}/BERT-pair/{task}/',          
    num_train_epochs = epochs,              
    per_device_train_batch_size = batch_size,  
    per_device_eval_batch_size = batch_size,   
    warmup_steps = warmup_steps,   
    weight_decay = 0.01,               
    logging_dir = f'{base_dir}/logs/{dataset_type}/BERT-pair/{task}/',            
    logging_steps = 10,
    evaluation_strategy = 'epoch',
    learning_rate = 2e-5,
    save_steps = save_steps
)

config = BertConfig.from_pretrained(
    'bert-base-uncased',
    architectures = ['BertForSequenceClassification'],
    hidden_size = 768,
    num_hidden_layers = 12,
    num_attention_heads = 12,
    hidden_dropout_prob = 0.1,
    num_labels = num_classes
)    

load_finetuned_model = True
if not load_finetuned_model:
    model = BertForSequenceClassification.from_pretrained('bert-base-uncased', config=config)

    trainer = Trainer(
        model=model,                         
        args=training_args,                  
        train_dataset=train_dataset,         
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics             
    )
    trainer.train()

    model.save_pretrained(f"{base_dir}/models/{dataset_type}/BERT-pair/{task}/last_step")

else:
    model = BertForSequenceClassification.from_pretrained(f"{base_dir}/models/{dataset_type}/BERT-pair/{task}/last_step")

    trainer = Trainer(
        model=model,                         
        args=training_args,                  
        train_dataset=train_dataset,         
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics             
    )


In [None]:
evaluation_result = trainer.evaluate(test_dataset)
print(evaluation_result)

In [None]:
import pandas as pd


results = trainer.predict(test_dataset)

scores = [softmax(prediction) for prediction in results.predictions]
predicted_labels = [np.argmax(x) for x in scores]

In [None]:
csv_output = np.insert(scores, 0, predicted_labels, axis=1)
df = pd.DataFrame(csv_output)
df[0] = df[0].astype("int")
if task.endswith("B"):
    header = ["predicted_label", "no", "yes"]
else:
    header = ["predicted_label"]
    for label in label2id.keys():
        header.append(label)
df.to_csv(f"{base_dir}/results/{dataset_type}/BERT-pair/{task}.csv", index=False, header=header)

In [None]:
import sys
if base_dir not in sys.path:
    sys.path.insert(0, f'{base_dir}/')
import evaluation
evaluation.main(task, dataset_type, f"{base_dir}/data", f"{base_dir}/results")



---


# BERT-single


---



In [None]:
#@title Choose a dataset and a task { run: "auto", display-mode: "form" }
base_dir = "/gdrive/MyDrive/Machine_Learning" #@param {type:"string"}
dataset_type = "semeval2014" #@param ["sentihood", "semeval2014"]
task = "single" #@param ["single"]

In [None]:
import pandas as pd
import random


if dataset_type == "sentihood":
    id2label = {0: "None", 1: "Positive", 2: "Negative"}
    label2id = {"None": 0, "Positive": 1, "Negative": 2}
elif dataset_type == "semeval2014":
    id2label = {0: "positive", 1: "neutral", 2: "negative", 3: "conflict", 4: "none"}
    label2id = {"positive": 0, "neutral" : 1, "negative" : 2, "conflict": 3, "none": 4}

if dataset_type == "sentihood":
    num_classes = 3
    locations = ["location_1_", "location_2_"]
    aspects = ["general", "price", "safety", "transit location"]
elif dataset_type == "semeval2014":
    num_classes = 5
    locations = [""]
    aspects = ["ambience", "anecdotes", "food", "price", "service"]


def get_dataset(path):
    original_sentences = []
    labels = []
    data = pd.read_csv(path, header=0, sep="\t").values.tolist()
    for row in data:
        original_sentences.append(row[1])
        labels.append(row[3])
    return original_sentences, labels


train_original_sentences = {}
train_labels = {}
val_original_sentences = {}
val_labels = {}
test_original_sentences = {}
test_labels = {}

for location in locations:
    train_original_sentences[location] = {}
    train_labels[location] = {}
    val_original_sentences[location] = {}
    val_labels[location] = {}
    test_original_sentences[location] = {}
    test_labels[location] = {}
    for aspect in aspects:
        train_original_sentences[location][aspect], train_labels[location][aspect] = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-single/{location}{aspect}/train.csv")
        if dataset_type == "sentihood":
            val_original_sentences[location][aspect], val_labels[location][aspect] = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-single/{location}{aspect}/dev.csv")
        elif dataset_type == "semeval2014":
            val_original_sentences[location][aspect], val_labels[location][aspect] = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-single/{location}{aspect}/test.csv")
        test_original_sentences[location][aspect], test_labels[location][aspect] = get_dataset(f"{base_dir}/data/{dataset_type}/BERT-single/{location}{aspect}/test.csv")

In [None]:
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

train_encodings = {}
val_encodings = {}
test_encodings = {}
for location in locations:
    train_encodings[location] = {}
    val_encodings[location] = {}
    test_encodings[location] = {}
    for aspect in aspects:
        train_encodings[location][aspect] = tokenizer(train_original_sentences[location][aspect], truncation=True, padding=True)
        val_encodings[location][aspect] = tokenizer(val_original_sentences[location][aspect], truncation=True, padding=True)
        test_encodings[location][aspect] = tokenizer(test_original_sentences[location][aspect], truncation=True, padding=True)

In [None]:
import torch

class ABSA_Dataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)


train_dataset = {}
val_dataset = {}
test_dataset = {}
for location in locations:
    train_dataset[location] = {}
    val_dataset[location] = {}
    test_dataset[location] = {}
    for aspect in aspects:
        train_dataset[location][aspect] = ABSA_Dataset(train_encodings[location][aspect], train_labels[location][aspect])
        val_dataset[location][aspect] = ABSA_Dataset(val_encodings[location][aspect], val_labels[location][aspect])
        test_dataset[location][aspect] = ABSA_Dataset(test_encodings[location][aspect], test_labels[location][aspect])

In [None]:
from transformers import BertForSequenceClassification, Trainer, TrainingArguments, BertConfig
from transformers import logging
import gc
import pandas as pd
import numpy as np
from scipy.special import softmax


logging.set_verbosity_debug()


epochs = 4
batch_size = 24

header = ["predicted_label"]
for label in label2id.keys():
    header.append(label)

config = BertConfig.from_pretrained(
        'bert-base-uncased',
        architectures = ['BertForSequenceClassification'],
        hidden_size = 768,
        num_hidden_layers = 12,
        num_attention_heads = 12,
        hidden_dropout_prob = 0.1,
        num_labels = num_classes
    )    

for location in locations:
    for aspect in aspects:
        num_steps = len(train_dataset[location][aspect]) * epochs // batch_size
        warmup_steps = num_steps // 10  # 10% of the training steps
        save_steps = num_steps // epochs    # Save a checkpoint at the end of each epoch


        training_args = TrainingArguments(
            output_dir = f'{base_dir}/models/{dataset_type}/BERT-single/{location}{aspect}/',          
            num_train_epochs = epochs,              
            per_device_train_batch_size = batch_size,  
            per_device_eval_batch_size = batch_size,   
            warmup_steps = warmup_steps,   
            weight_decay = 0.01,               
            logging_dir = f'{base_dir}/logs/{dataset_type}/BERT-single/{location}{aspect}/',            
            logging_steps = 10,
            evaluation_strategy = 'epoch',
            learning_rate = 2e-5,
            save_steps = save_steps,
            seed=21
        )

        model = BertForSequenceClassification.from_pretrained('bert-base-uncased', config=config)

        trainer = Trainer(
            model=model,                         
            args=training_args,                  
            train_dataset=train_dataset[location][aspect],         
            eval_dataset=val_dataset[location][aspect]             
        )

        trainer.train()

        model.save_pretrained(f"{base_dir}/models/{dataset_type}/BERT-single/{location}{aspect}/last_step")

        results = trainer.predict(test_dataset[location][aspect])

        scores = [softmax(prediction) for prediction in results.predictions]
        predicted_labels = [np.argmax(x) for x in scores]

        csv_output = np.insert(scores, 0, predicted_labels, axis=1)
        df = pd.DataFrame(csv_output)
        df[0] = df[0].astype("int")
        df.to_csv(f"{base_dir}/results/{dataset_type}/BERT-single/{location}{aspect}.csv", index=False, header=header)

        del training_args
        del model
        del trainer
        del results
        del scores
        del predicted_labels
        del csv_output
        del df
        gc.collect()


In [None]:
import sys
if base_dir not in sys.path:
    sys.path.insert(0, f'{base_dir}/')
import evaluation


evaluation.main(task, dataset_type, f"{base_dir}/data", f"{base_dir}/results")

# Paper Code

In [None]:
import nltk
nltk.download('punkt')

In [None]:
!git clone https://github.com/HSLCY/ABSA-BERT-pair.git
%cd ABSA-BERT-pair/
%cd generate/
!bash make.sh sentihood
!bash make.sh semeval

In [None]:
%cd ../
!wget https://storage.googleapis.com/bert_models/2020_02_20/uncased_L-12_H-768_A-12.zip
!unzip -d uncased_L-12_H-768_A-12 uncased_L-12_H-768_A-12.zip

In [None]:
!python convert_tf_checkpoint_to_pytorch.py \
--tf_checkpoint_path uncased_L-12_H-768_A-12/bert_model.ckpt \
--bert_config_file uncased_L-12_H-768_A-12/bert_config.json \
--pytorch_dump_path uncased_L-12_H-768_A-12/pytorch_model.bin

In [None]:
!CUDA_VISIBLE_DEVICES=0 python run_classifier_TABSA.py \
--task_name sentihood_QA_M \
--data_dir data/sentihood/bert-pair/ \
--vocab_file uncased_L-12_H-768_A-12/vocab.txt \
--bert_config_file uncased_L-12_H-768_A-12/bert_config.json \
--init_checkpoint uncased_L-12_H-768_A-12/pytorch_model.bin \
--eval_test \
--do_lower_case \
--max_seq_length 512 \
--train_batch_size 8 \
--learning_rate 2e-5 \
--num_train_epochs 4.0 \
--output_dir results/ \
--seed 42

In [None]:
!python evaluation.py --task_name sentihood_QA_M --pred_data_dir results/test_ep_4.txt