## Fine-tune a BERT model on a custom dataset

### 1. install libraries

In [None]:
! pip install transformers

In [None]:
! pip install imbalanced-learn

### 2. load/define data set

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import RandomOverSampler
from transformers import AutoTokenizer
import numpy as np
DATA_DIR = "./data"


def applylimit(df: pd.DataFrame, n: int = 2) -> pd.DataFrame:
    """
    Drop the rows that belong to rare labels.

    :param df: data frame with 'label' column
    :param n: sample-count threshold; a label survives only when it
        occurs strictly more than n times
    :return: the rows of df whose label occurs more than n times
    """
    sizes = df.groupby('label').size()
    frequent = sizes[sizes > n].index
    return df[df['label'].isin(frequent)]


def encodelabels(df: pd.DataFrame) -> pd.DataFrame:
    """
    Encode the values of the 'label' column as consecutive integers.

    Labels are dense-ranked in descending order and shifted to start at 0,
    so the greatest label value is encoded as 0, the next one as 1, etc.
    Equal labels always receive the same code.

    The input frame is not modified: a new frame is returned. Writing the
    column into the argument would mutate the caller's data and, when the
    argument is a filtered slice of another frame, assign into a slice.

    :param df: data frame with 'label' column
    :return: copy of df with an additional 'intlabel' column containing encoded labels
    """
    codes = df['label'].rank(method='dense', ascending=False).astype(int) - 1
    return df.assign(intlabel=codes)


def loadpreprocesseddata(path: str) -> pd.DataFrame:
    """
    Read a csv file, filter it with applylimit (n=2) and add integer label codes.

    :param path: path to a csv file with 'label' column
    :return: data frame restricted to the labels kept by applylimit,
        with labels mapped to integer values in the 'intlabel' column
    """
    raw = pd.read_csv(path)
    filtered = applylimit(df=raw, n=2)
    return encodelabels(df=filtered)


def getmapping(data: pd.DataFrame) -> dict:
    """
    Build a lookup table from integer label codes back to label names.

    :param data: data frame with 'intlabel' column containing int values
        and 'label' column containing str values
    :return: dict mapping every distinct 'intlabel' value to the first
        'label' value found among the rows carrying that code
    """
    return {
        code: data.loc[data['intlabel'] == code, 'label'].unique()[0]
        for code in data.intlabel.unique()
    }


def splitdata(data: pd.DataFrame, ratio: float = 0.2) -> dict:
    """
    Randomly split texts and their integer labels into train and test parts.

    :param data: data frame with 'text' and 'intlabel' columns
    :param ratio: ratio of a test set to a data set
    :return: {"train": {...}, "test": {...}} where each inner dict has
        a 'text' list and a 'label' list
    """
    texts_train, texts_test, labels_train, labels_test = train_test_split(
        data['text'].tolist(), data['intlabel'].tolist(), test_size=ratio)
    train = {"text": texts_train, "label": labels_train}
    test = {"text": texts_test, "label": labels_test}
    return {"train": train, "test": test}


def balancedata(data: dict) -> dict:
    """
    Resample a dataset with imblearn's RandomOverSampler (random_state=42).

    :param data: dictionary with 'text' and 'label' keys
    :return: dictionary with the resampled 'text' list and the matching
        'label' values returned by the sampler
    """
    # fit_resample works on a 2-D feature matrix: turn the texts into one column
    column = np.asarray(data["text"])[:, np.newaxis]
    oversampler = RandomOverSampler(random_state=42)
    resampled, labels = oversampler.fit_resample(column, data["label"])
    # back from a single-column matrix to a plain list of texts
    return {"text": resampled.flatten().tolist(), "label": labels}


def encodefeatures(data: dict, tokenizer) -> list:
    """
    Tokenize all texts at once and pair every encoding with its label.

    :param data: dictionary with 'text' and 'label' keys
    :param tokenizer: callable invoked as tokenizer(texts, truncation=True, padding=True);
        its result must provide 'input_ids' and 'attention_mask' sequences
    :return: list of dicts with 'label', 'input_ids' and 'attention_mask' keys,
        one per sample
    """
    encoded = tokenizer(data["text"], truncation=True, padding=True)
    features = []
    for lbl, ids, mask in zip(data["label"], encoded['input_ids'], encoded['attention_mask']):
        features.append({'label': lbl, 'input_ids': ids, 'attention_mask': mask})
    return features


def countlabels(data: dict) -> pd.DataFrame:
    """
    Count how many samples every label has.

    :param data: dictionary with a 'label' key and one feature key
    :return: data frame with a 'label' column and a 'counts' column
        containing number of samples per label
    """
    frame = pd.DataFrame(data)
    per_label = frame.groupby('label').size()
    return per_label.rename('counts').reset_index()


class DataManager:
    def __init__(self, path: str, tokenizer):
        """
        Load the data and build the first train/test split.

        :param path: relative to a data folder path to a csv file with two columns 'text', 'label'
        :param tokenizer: encode text into vectors with integer values.
            loaded with from_pretrained() function for a model that is about to be tuned
        """
        # the tokenizer has to be in place before the first split is encoded
        self.tokenizer = tokenizer
        self.datasets = {}
        self.trainset = []
        self.testset = []
        # sets self.data, self.labelmapping, self.nlabels and resamples the sets
        self.reloaddata(path=path)

    def reloaddata(self, path: str):
        """
        Read and preprocess the raw data again, then draw a new train/test split

        :param path: relative to a data folder path to a csv file with two columns 'text', 'label'
        """
        self.data = loadpreprocesseddata(path=f"{DATA_DIR}/{path}")
        self.labelmapping = getmapping(data=self.data)
        self.nlabels = len(self.labelmapping)
        self.resamplesets()

    def resamplesets(self):
        """
        Randomly split data into train and test sets.
        Only the train part is passed through balancedata; both parts are
        then encoded with the tokenizer.
        """
        split = splitdata(data=self.data)
        split["train"] = balancedata(data=split["train"])
        self.datasets = split
        self.trainset = encodefeatures(data=split["train"], tokenizer=self.tokenizer)
        self.testset = encodefeatures(data=split["test"], tokenizer=self.tokenizer)

    def getdistribution(self, name: str):
        """
        :param name: name of a subset: train/test
        :return: data frame containing number of
            samples per label in the requested subset
        """
        subset = self.datasets[name]
        return countlabels(data=subset)


### 4. load pretrained model

In [5]:
from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification as AMSC
from transformers import TrainingArguments, Trainer
import numpy as np
import time
import matplotlib.pyplot as plt
CONFIG_DIR = "./config"


def showlearningcurve(loss: list, evalloss: list):
    """
    Plot train loss (blue) and evaluation loss (red) against the epoch index.

    :param loss: list of train loss values
    :param evalloss: list of evaluation loss values
    """
    xs = np.arange(len(loss))
    plt.figure()
    for values, colour in ((loss, 'b'), (evalloss, 'r')):
        plt.plot(xs, values, color=colour)
    plt.xlabel("epoch")
    plt.ylabel("loss")
    # legend entries follow the order in which the curves were drawn
    plt.legend(['train loss', 'test loss'])
    plt.show()


class FineTuner:
    def __init__(self, config: dict):
        """
        :param config: configuration with training parameters.
            required are: 'modelname', 'datapath', 'lr', 'nepochs'
        """
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(self.config["modelname"])
        self.dm = DataManager(path=self.config["datapath"], tokenizer=self.tokenizer)
        self.model = AMSC.from_pretrained(self.config["modelname"], num_labels=self.dm.nlabels)
        self.args = self.getargs()
        self.trainer = self.gettrainer()

    def reloaddata(self):
        """
        Reload and preprocess raw data.

        The trainer is rebuilt afterwards because it keeps the dataset
        objects it was constructed with. The model itself is not reloaded
        here; call reloadmodel() as well if the set of labels has changed.
        """
        self.dm.reloaddata(path=self.config["datapath"])
        self.trainer = self.gettrainer()

    def resample(self):
        """
        Randomly resample train and test sets.

        The data manager replaces its train/test lists with new objects, so
        the trainer is rebuilt to train and evaluate on the new split.
        """
        self.dm.resamplesets()
        self.trainer = self.gettrainer()

    def reloadmodel(self):
        """
        Reload model for fine tuning.

        The trainer is rebuilt as well; otherwise it would go on training
        the previous, already fine-tuned model instance.
        """
        self.model = AMSC.from_pretrained(self.config["modelname"], num_labels=self.dm.nlabels)
        self.trainer = self.gettrainer()

    def getargs(self) -> TrainingArguments:
        """
        :return: configured training arguments; learning rate and number of
            epochs are read from the 'lr' and 'nepochs' config entries
        """
        return TrainingArguments(
            output_dir="./tunedbert",
            do_eval=True,
            evaluation_strategy="epoch",
            learning_rate=self.config["lr"],
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            logging_strategy="epoch",
            num_train_epochs=self.config["nepochs"],
            weight_decay=0.01,
        )

    def gettrainer(self) -> Trainer:
        """
        :return: trainer bound to the current model, training arguments
            and the data manager's current train/test sets
        """
        return Trainer(
            model=self.model,
            args=self.args,
            train_dataset=self.dm.trainset,
            eval_dataset=self.dm.testset,
            tokenizer=self.tokenizer,
        )

    def train(self, learningcurve: bool = False) -> tuple:
        """
        Run the training loop and collect the logged losses.

        :param learningcurve: if true show learning curve after training
        :return: (training loss, evaluation loss)
        """
        self.trainer.train()
        history = self.trainer.state.log_history
        # pick entries by key instead of by position, so the result does not
        # depend on the exact interleaving of train/eval/summary log entries
        loss = [entry['loss'] for entry in history if 'loss' in entry]
        evalloss = [entry['eval_loss'] for entry in history if 'eval_loss' in entry]
        if learningcurve:
            showlearningcurve(loss=loss, evalloss=evalloss)
        return loss, evalloss

    def predictbatch(self, batch: list) -> np.ndarray:
        """
        :param batch: list of encoded inputs
        :return: numpy array of predicted labels
        """
        predictions = self.trainer.predict(batch)
        return np.argmax(predictions.predictions, axis=1)

    def humanpredict(self, sentence: str) -> str:
        """
        Predict the label of a single raw sentence.

        :param sentence: text to classify
        :return: predicted label, translated back to its original name
            through the data manager's label mapping
        """
        encoding = self.tokenizer(sentence, truncation=True)
        batch = [{'input_ids': encoding['input_ids'],
                  'attention_mask': encoding['attention_mask']}]
        predicted = int(self.predictbatch(batch=batch)[0])
        return self.dm.labelmapping[predicted]


### 6. Test trained model

In [7]:
import numpy as np
import yaml
import sys


def loadconfig(path: str) -> dict:
    """
    Read a YAML configuration file.

    Exits the program with an error message when the file cannot be
    opened/read or when its content is not valid YAML.

    :param path: path to a configuration file
    :return: configurations as a dictionary
    """
    # open() sits inside the try so that a missing or unreadable file is
    # reported too; parse errors are raised by yaml as YAMLError, not IOError
    try:
        with open(path) as f:
            return yaml.load(stream=f, Loader=yaml.FullLoader)
    except OSError as e:
        sys.exit(f"FAILED TO LOAD CONFIG {path}: {e}")
    except yaml.YAMLError as e:
        sys.exit(f"FAILED TO LOAD CONFIG {path}: {e}")



class Evaluator:
    def __init__(self):
        """
        Load the fine tuning configuration and set up the tuner.
        Results of every evaluation run are collected in self.trials.
        """
        self.trials = []
        config = loadconfig(path=f"{CONFIG_DIR}/finetune.yaml")
        self.tuner = FineTuner(config=config)

    def evaluate(self) -> dict:
        """
        Train and evaluate fine tuned model
        :return: dictionary with results: 'accuracy', 'correct', 'total',
            'predicted' and 'groundtruth'
        """
        self.tuner.train()
        batch = self.tuner.dm.testset
        predictions = self.tuner.predictbatch(batch=batch)
        groundtruth = np.array([entry['label'] for entry in batch])
        correct = np.sum(predictions == groundtruth)
        accuracy = correct / groundtruth.shape[0]
        return {"accuracy": accuracy,
                "correct": correct,
                "total": groundtruth.shape[0],
                "predicted": predictions,
                "groundtruth": groundtruth}

    def processresults(self) -> dict:
        """
        :return: mean accuracy and summed 'correct'/'total' counts over all
            trials combined; all zeros when no trial has been run yet
        """
        results = {"accuracy": 0, "correct": 0, "total": 0}
        if not self.trials:
            # nothing to average: avoid dividing by zero
            return results
        for trial in self.trials:
            for key in results:
                results[key] += trial[key]
        results["accuracy"] /= len(self.trials)
        return results

    def runevaluation(self, n: int = 5) -> dict:
        """
        Run evaluations n times and return mean score

        :param n: number of evaluation iterations
        :return: dictionary with results
        """
        for i in range(n):
            self.trials.append(self.evaluate())
            print(f"TRIAL {i}; ACCURACY: {self.trials[-1]['accuracy']}")
            self.tuner.resample()
            self.tuner.reloadmodel()
            # A trainer keeps the model and dataset objects it was built
            # with, so bind a new one to the reloaded model and the new
            # split; otherwise the next trial would continue training the
            # previous model on the previous train set.
            self.tuner.trainer = self.tuner.gettrainer()
        return self.processresults()