In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import torch
import os
import random
import csv
import gc

In [None]:
# Configure GPU visibility before CUDA is first queried below.
# PCI_BUS_ID makes CUDA enumerate devices in PCI bus order.
# NOTE(review): an empty CUDA_VISIBLE_DEVICES exposes no GPU to this process,
# so `device` resolves to CPU unless a device index is filled in here.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "",
})

# Pick the accelerator when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

In [None]:
# Single seed shared by every random number generator used in this notebook.
random_seed = 42

# Python's and NumPy's global generators.
random.seed(random_seed)
np.random.seed(random_seed)

# PyTorch: CPU generator, current CUDA device, and all CUDA devices.
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# Ask cuDNN for deterministic kernels and switch off its auto-tuner.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [None]:
# Root directory against which every other location in this notebook is resolved.
path = "."

# Training artefacts (checkpoints and logs) are written below the root.
output_path = os.path.join(path, "output")
logging_path = os.path.join(path, "logs")

# Checkpoints and the dataset are addressed relative to the parent of the root.
# The components are joined with os.path.join because plain concatenation of
# "." and "../model" produced ".../model", a directory literally named "...".
model_path = os.path.join(path, "..", "model")
data_path = os.path.join(path, "..", "dataset", "HUE_Summary_Retrieval")

# One CSV per split of the HUE Summary Retrieval dataset.
train_data_path = os.path.join(data_path, "HUE_Summary_Retrieval_train.csv")
eval_data_path = os.path.join(data_path, "HUE_Summary_Retrieval_dev.csv")
test_data_path = os.path.join(data_path, "HUE_Summary_Retrieval_test.csv")

In [None]:
# Registry of the model variants handled by this notebook.
# Each entry names where the tokenizer and weights come from ("tokenizer",
# "model") and where that run's checkpoints and logs go ("output", "logs").
model_map = {
    # Multilingual vocabulary with an empty "model" entry (no checkpoint path).
    "bert-not-pretrained": {
        "tokenizer": "bert-base-multilingual-cased",
        "model": "",
        "output": f"{output_path}/bert-not-pretrained",
        "logs": f"{logging_path}/bert-not-pretrained",
    },
    "AnchiBERT": {
        "tokenizer": f"{path}/model/AnchiBERT",  # path to AnchiBERT
        "model": f"{path}/model/AnchiBERT",  # path to AnchiBERT
        "output": f"{output_path}/AnchiBERT",
        "logs": f"{logging_path}/AnchiBERT",
    },
    # Hub identifier; artefact directories use the full checkpoint name.
    "mBERT": {
        "tokenizer": "bert-base-multilingual-cased",
        "model": "bert-base-multilingual-cased",
        "output": f"{output_path}/bert-base-multilingual-cased",
        "logs": f"{logging_path}/bert-base-multilingual-cased",
    },
    # The two "+AJD-DRS" variants are loaded from directories under model_path.
    "AnchiBERT+AJD-DRS": {
        "tokenizer": f"{model_path}/AnchiBERT+AJD-DRS",
        "model": f"{model_path}/AnchiBERT+AJD-DRS",
        "output": f"{output_path}/AnchiBERT+AJD-DRS",
        "logs": f"{logging_path}/AnchiBERT+AJD-DRS",
    },
    "mBERT+AJD-DRS": {
        "tokenizer": f"{model_path}/mBERT+AJD-DRS",
        "model": f"{model_path}/mBERT+AJD-DRS",
        "output": f"{output_path}/mBERT+AJD-DRS",
        "logs": f"{logging_path}/mBERT+AJD-DRS",
    },
}

In [None]:
def _read_csv_skipping_bad_lines(csv_path):
    """Read a CSV file, skipping (with a warning) rows that cannot be parsed.

    `error_bad_lines` was deprecated in pandas 1.3 and removed in pandas 2.0,
    where passing it raises TypeError. `on_bad_lines="warn"` is its replacement
    and keeps the old "skip the row but report it" behaviour. Older pandas
    releases that do not know `on_bad_lines` fall back to the legacy keyword.

    Args:
        csv_path: Location of the CSV file to load.

    Returns:
        A DataFrame holding every row that could be parsed.
    """
    try:
        return pd.read_csv(csv_path, on_bad_lines="warn")
    except TypeError:
        # pandas < 1.3: `on_bad_lines` is not an accepted keyword yet.
        return pd.read_csv(csv_path, error_bad_lines=False)


# Load the three splits and drop every row that has a missing value in any column.
train_data = _read_csv_skipping_bad_lines(train_data_path).dropna()
eval_data = _read_csv_skipping_bad_lines(eval_data_path).dropna()
test_data = _read_csv_skipping_bad_lines(test_data_path).dropna()
train_data

In [None]:
# Pull the "text" and "label" columns of each split out as plain Python lists,
# one pair of parallel lists per split.
train_texts = train_data["text"].tolist()
train_labels = train_data["label"].tolist()

eval_texts = eval_data["text"].tolist()
eval_labels = eval_data["label"].tolist()

test_texts = test_data["text"].tolist()
test_labels = test_data["label"].tolist()

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Dataset that pairs encoded inputs with their labels.

    `encodings` is a mapping from field name to an indexable collection with
    one entry per example; `labels` is an indexable collection of the same
    length. The dataset size is taken from `labels`.
    """

    def __init__(self, encodings, labels):
        """Store the encodings and labels without copying them."""
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        """Return the number of examples, i.e. the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return example `idx` as a dict of tensors.

        The dict holds one tensor per encoding field plus the example's label
        under the key "labels".
        """
        sample = {}
        for field, column in self.encodings.items():
            sample[field] = torch.tensor(column[idx])
        sample["labels"] = torch.tensor(self.labels[idx])
        return sample

In [None]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score

def compute_metrics(p):
    """Score one evaluation pass.

    Args:
        p: A pair that unpacks into (scores, labels). The predicted class of
            each example is the argmax of `scores` along axis 1.

    Returns:
        A dict with "accuracy", "precision", "recall" and "f1"; the last three
        are averaged with average="weighted".
    """
    scores, gold = p
    predicted = np.argmax(scores, axis=1)

    accuracy = accuracy_score(y_true=gold, y_pred=predicted)
    # The three class-wise metrics share the same call signature.
    weighted = {
        metric_name: scorer(y_true=gold, y_pred=predicted, average="weighted")
        for metric_name, scorer in (
            ("recall", recall_score),
            ("precision", precision_score),
            ("f1", f1_score),
        )
    }

    return {
        "accuracy": accuracy,
        "precision": weighted["precision"],
        "recall": weighted["recall"],
        "f1": weighted["f1"],
    }

In [None]:
from transformers import BertTokenizer, BertConfig, BertForSequenceClassification, EarlyStoppingCallback, Trainer, TrainingArguments

# Results per model, keyed like `model_map`. Entries start as empty lists and
# are replaced once the corresponding model has finished training:
#   evaluate[name] -> metrics returned by trainer.evaluate() on the dev split
#   predict[name]  -> output of trainer.predict() on the held-out test split
predict, evaluate = {k: [] for k in model_map.keys()}, {k: [] for k in model_map.keys()}

# Number of distinct labels in the training split; the same for every model,
# so it is computed once instead of inside the loop.
# NOTE(review): assumes labels are integer class ids 0..n-1 - confirm.
num_labels = len(set(train_labels))

for model_name, model_dict in model_map.items():
    tokenizer = BertTokenizer.from_pretrained(model_dict["tokenizer"])

    # Every model brings its own tokenizer, so the splits are re-encoded per model.
    train_encodings = tokenizer(train_texts, truncation=True, padding=True)
    eval_encodings = tokenizer(eval_texts, truncation=True, padding=True)
    test_encodings = tokenizer(test_texts, truncation=True, padding=True)

    train_dataset = CustomDataset(train_encodings, train_labels)
    eval_dataset = CustomDataset(eval_encodings, eval_labels)
    test_dataset = CustomDataset(test_encodings, test_labels)

    # Logging, evaluation and checkpointing all happen every 5000 steps.
    # The best checkpoint (lowest loss) is reloaded once training ends.
    # NOTE(review): newer transformers releases rename `evaluation_strategy`
    # to `eval_strategy` - confirm against the installed version.
    args = TrainingArguments(
        output_dir=model_dict["output"],
        overwrite_output_dir=True,
        logging_dir=model_dict["logs"],
        logging_steps=5000,
        logging_strategy="steps",
        num_train_epochs=30,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        warmup_steps=5000,
        weight_decay=0.01,
        load_best_model_at_end=True,
        evaluation_strategy="steps",
        eval_steps=5000,
        save_steps=5000,
        metric_for_best_model="loss",
        do_train=True,
        do_eval=True,
        do_predict=True
    )

    if model_name == "bert-not-pretrained":
        # Baseline built from a default BertConfig instead of a checkpoint;
        # only the vocabulary size is taken from the tokenizer.
        config = BertConfig()
        config.num_labels = num_labels
        config.vocab_size = tokenizer.vocab_size
        model = BertForSequenceClassification(config)
    else:
        model = BertForSequenceClassification.from_pretrained(model_dict["model"], num_labels=num_labels)

    trainer = Trainer(
        model=model,
        args=args,
        # Stop after 3 consecutive evaluations without improvement.
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
        compute_metrics=compute_metrics,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset
    )

    trainer.train()
    evaluate[model_name] = trainer.evaluate()
    # Predict on the test split. The original passed `eval_dataset` here, which
    # repeated the dev-set evaluation above and left `test_dataset` unused.
    predict[model_name] = trainer.predict(test_dataset)

    # Release this model's objects before the next one is loaded.
    del tokenizer, model, args, trainer
    gc.collect()
    torch.cuda.empty_cache()