In [1]:
import os
from google.colab import drive
drive.mount('/content/drive')

main_directory = os.path.join('drive/MyDrive/Colab Notebooks/projects/NER_with_BERT')
if not os.path.exists(main_directory):
    os.makedirs(main_directory)
%cd -q $main_directory

Mounted at /content/drive


In [2]:
!pip install torch~=2.0.0 transformers~=4.24.0 scikit-learn~=1.2.1 tqdm~=4.65.0 datasets~=2.13.1 pandas~=1.5.3 nltk~=3.7

Collecting transformers~=4.24.0
  Downloading transformers-4.24.0-py3-none-any.whl (5.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.5/5.5 MB[0m [31m34.8 MB/s[0m eta [36m0:00:00[0m
Collecting tqdm~=4.65.0
  Downloading tqdm-4.65.2-py3-none-any.whl (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.1/77.1 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting datasets~=2.13.1
  Downloading datasets-2.13.1-py3-none-any.whl (486 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m486.2/486.2 kB[0m [31m30.6 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.10.0 (from transformers~=4.24.0)
  Downloading huggingface_hub-0.16.4-py3-none-any.whl (268 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m268.8/268.8 kB[0m [31m28.1 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers~=4.24.0)
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_1

In [3]:
import json
import os
import nltk
import pandas as pd
import pickle
import torch
import torch.nn as nn
import torch.cuda
import random
import seaborn as sns
import matplotlib.pyplot as plt

from datasets import load_dataset
from sklearn.metrics import f1_score, confusion_matrix
from torch.utils.data import DataLoader
from torch.optim import AdamW
from tqdm import tqdm
from transformers import AutoTokenizer, BertModel, DataCollatorForTokenClassification
from datasets.arrow_dataset import Dataset
from transformers.tokenization_utils_base import BatchEncoding

nltk.download('punkt')
from nltk.tokenize import word_tokenize, sent_tokenize

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [4]:
class DataProcess:
    """
    Class is utilized to load and process the Huggingface dataset, according to the provided parameters
    """

    def __init__(self, ds_name: str, model_checkpoint: str, data_path: str):
        """
        Method is utilized as an initializer for data processing object
        :param ds_name: Huggingface dataset name (as path specified in the website)
        :param model_checkpoint: Huggingface model checkpoint for the transformer/tokenizer (e.g., bert-base-cased)
        :param data_path: directory to save tokenizer
        """
        self.datasets = self.get_dataset(ds_name)
        self.tokenizer = self.set_tokenizer(data_path, model_checkpoint)

    @staticmethod
    def set_tokenizer(data_path: str, model_checkpoint: str) -> AutoTokenizer:
        """
        Method is utilized to set the tokenizer according to the given path and tokenizer checkpoint in the Huggingface.
        If one wants to use the same model, vocabulary must be same as the model was trained on (e.g., inference)
        :param data_path: directory to save tokenizer. In case of reuse of the model vocabulary info must be same
        :param model_checkpoint: Huggingface model checkpoint for the transformer/tokenizer (e.g., bert-base-cased)
        :return: saved or new-set AutoTokenizer object
        """
        check_dir(data_path)
        tokenizer_path = os.path.join(data_path, 'tokenizer')
        if not os.path.exists(tokenizer_path):
            tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
            tokenizer.save_pretrained(tokenizer_path)
        return AutoTokenizer.from_pretrained(tokenizer_path)

    @staticmethod
    def get_dataset(ds_path: str):
        """
        Method is utilized to load dataset from the given dataset path of the Huggingface dataset
        :param ds_path: path for the Huggingface dataset
        :return: Required dataset for the task
        """
        return load_dataset(ds_path)

    @staticmethod
    def align_labels_with_tokens(labels: list, word_ids: list) -> list:
        """
        Method is utilized as it was given in the Huggingface website. It is utilized to align tokenization results with
        the given tokenized data
        :param labels: labels from the originally tokenized dataset
        :param word_ids: word indexes data from the tokenization process
        :return: list of aligned labels
        """

        new_labels = []
        current_word = None
        for word_id in word_ids:
            if word_id != current_word:
                # Start of a new word!
                current_word = word_id
                label = -100 if word_id is None else labels[word_id]
                new_labels.append(label)
            elif word_id is None:
                # Special token
                new_labels.append(-100)
            else:
                # Same word as previous token
                label = labels[word_id]
                # If the label is B-XXX we change it to I-XXX
                if label % 2 == 1:
                    label += 1
                new_labels.append(label)

        return new_labels

    def tokenize_and_align_labels(self, examples: Dataset) -> BatchEncoding:
        """
        Method is utilized to create aligned labels for an instance in dataset (method is same as in Huggingface)
        :param examples: dictionary for dataset instance
        :return: dictionary for the specified data instance
        """
        tokenized_inputs = self.tokenizer(examples["tokens"],
                                          truncation=True,
                                          is_split_into_words=True)

        all_labels = examples["ner_tags"]
        new_labels = []
        for i, labels in enumerate(all_labels):
            word_ids = tokenized_inputs.word_ids(i)
            new_labels.append(self.align_labels_with_tokens(labels, word_ids))

        tokenized_inputs["labels"] = new_labels
        return tokenized_inputs

    def process(self):
        """
        Method is utilized to process all datasets according to required format
        :return: resulting datasets for the project
        """

        tokenized_datasets = self.datasets.map(
            self.tokenize_and_align_labels,
            batched=True,
            remove_columns=self.datasets['train'].column_names
        )
        return tokenized_datasets


In [5]:
class NERClassifierBERT(nn.Module):
    """
    Class is utilized to set NER classifier which utilized BERT model as an encoder
    """

    def __init__(self, hp: dict):
        """
        Method is utilized to set model layers as an initializer
        :param hp: dictionary which includes experiment parameters
        """
        super(NERClassifierBERT, self).__init__()
        self.bert = BertModel.from_pretrained(hp['model_checkpoint'])
        self.dropout = nn.Dropout(hp['dropout'])
        self.dropout2 = nn.Dropout(hp['dropout'])

        self.fc = nn.Linear(self.bert.config.hidden_size, len(hp['id2label']))

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """
        Method is utilized as feed forward performer of the model
        :param input_ids: input indexes for the provided sequence in shape of [batch size, max length]
        :param attention_mask: attention mask for the provided sequence in shape of [batch size, max length]
        :return: predictions of the classifier in shape of [batch size, max length, number of labels]
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        drop = outputs.last_hidden_state
        outputs = self.fc(drop)
        return outputs


In [6]:

class Train:
    """
    Class is utilized to load datasets and train the model for number of provided epochs.
    """

    def __init__(self, hp: dict, process: DataProcess, device: str, exp_num: int):
        """
        Method is utilized as initialized of the training object
        :param hp: hyperparameters for the model and experiment
        :param process: process object is utilized to load dataset and prepare it for training
        :param device: can be cuda or cpu
        :param exp_num: experiment number
        """
        self.process = process
        self.exp_num = exp_num
        self.hp = hp
        self.device = device

        self.datasets = process.process()
        self.collator = self.set_collator()
        self.experiment_dir = self.set_environment()
        self.classifier = self.set_model()
        self.optimizer = self.set_optimizer()
        self.loss_fn = nn.CrossEntropyLoss()

    def set_collator(self) -> DataCollatorForTokenClassification:
        """
        Method is utilized to create data collator object for the experiment
        :return: DataCollator object will be used for post-processing purposes
        """
        return DataCollatorForTokenClassification(
            tokenizer=self.process.tokenizer,
            max_length=self.hp['max_length'],
            padding='max_length'
        )

    def set_model(self) -> NERClassifierBERT:
        """
        Method is utilized to set the model
        :return: Classifier for the given experiment
        """
        return NERClassifierBERT(self.hp).to(self.device)

    def set_optimizer(self) -> AdamW:
        """
        Method is utilized to set the optimizer
        :return: Optimization object for the experiment
        """
        return AdamW(self.classifier.parameters(), lr=self.hp['learning_rate'], weight_decay=0.0001)

    def save_parameters(self, project_parameters: dict) -> None:
        """
        Method is utilized to create corresponding output folder for experiment and save project parameters in it
        :param project_parameters: dictionary in which experiment parameters are set
        :return: None
        """
        outputs_path = os.path.join(self.experiment_dir, 'outputs')
        check_dir(outputs_path)
        parameters_path = os.path.join(outputs_path, 'parameters.pickle')
        if not os.path.exists(parameters_path):
            print(f'Project Parameters for experiment {self.exp_num} were saved successfully!')
            print(f'{"<" * 20}{">" * 20} \n')
            project_parameters['hyperparams'] = self.hp
            with open(parameters_path, 'wb') as params:
                pickle.dump(project_parameters, params)

    def set_environment(self) -> str:
        """
        Method is utilized to create the experimental environment
        :return: experiment directory
        """
        results_dir = 'results'
        check_dir(results_dir)
        experiment_dir = os.path.join(results_dir, f'experiment_{self.exp_num}')
        check_dir(experiment_dir)
        return experiment_dir

    def get_data(self, ds_name: str) -> DataLoader:
        """
        Method is utilized to collect datasets according to the provided dataset type (train, validation, test)
        :param ds_name: specifies which data must be loaded
        :return: batched data for the provided dataset type
        """

        return DataLoader(dataset=self.datasets[ds_name],
                          collate_fn=self.collator,
                          shuffle=True,
                          batch_size=self.hp['batch_size'])

    def load_model_results(self) -> dict:
        """
        Method is utilized to load model results, which are collected during the training process
        :return: dictionary which holds each epoch's results
        """
        results_path = os.path.join(self.experiment_dir, 'outputs/results.pickle')
        if not os.path.exists(results_path):
            raise FileNotFoundError('No such directory, for this you need to train the model first!')

        with open(results_path, 'rb') as result_data:
            result_dict = pickle.load(result_data)
        return result_dict

    def load_model(self, epoch_choice: int) -> None:
        """
        Method is utilized to load model for the given epoch choice
        :param epoch_choice: integer specifies which epoch's model will be loaded
        :return: None
        """
        result_dict = self.load_model_results()
        data = pd.DataFrame(result_dict)
        ckpt_dir = os.path.join(self.experiment_dir, 'checkpoints')
        if not os.path.exists(ckpt_dir):
            raise FileNotFoundError('No checkpoints folder was found! Make sure that you trained the model!')

        request = data[data['epoch'] == epoch_choice]
        model_name = f"model_epoch_{epoch_choice}_f1_{request['f1_macro'].item():.4f}_" \
                     f"loss_{request['dev_loss'].item():.4f}_acc_{request['dev_accuracy'].item():.4f}"
        optim_name = f"optim_epoch_{epoch_choice}"
        model_path = os.path.join(ckpt_dir, model_name)
        if not os.path.exists(model_path):
            raise FileNotFoundError(f'Model was not trained at least for {epoch_choice} epochs or there is typo! '
                                    f'Check it first!')
        optim_path = os.path.join(ckpt_dir, optim_name)

        self.classifier.load_state_dict(torch.load(model_path))
        self.classifier.eval()
        self.optimizer.load_state_dict(torch.load(optim_path))

    def decision_maker(self, project_parameters: dict) -> int:
        """
        Method is utilized to select epoch according to the user's choices for model loading scenario
        :param project_parameters: experiment parameters in which user's choice is kept for loading decision
        :return: epoch choice according to the provided information
        """
        if project_parameters['load_best']:
            if project_parameters['epoch_choice']:
                print(
                    'WARNING: Best choice and epoch choice were made together! '
                    'In such cases best choice is prioritized!'
                )
            return self.get_best_epoch(project_parameters['best_choice'])

        else:
            return project_parameters['epoch_choice']

    def resume_training(self, project_parameters: dict) -> int:
        """
        Method is utilized to decide which epoch must be the initial for the training. If resume training was set to
        True, the recent epoch + 1 will be set as initial for training, otherwise it is zero.
        In case the parameter is False (start new training) and given experiment directory already has some relevant
        data, user will not be allowed to train the model, to prevent overwriting issues.
        :param project_parameters: experiment parameters dictionary
        :return: integer value for epoch choice as initial epoch
        """
        if project_parameters['resume_training']:
            epoch_choice = self.get_best_epoch('epoch')
            self.load_model(epoch_choice)
            return epoch_choice
        else:
            ckpt_dir = os.path.join(self.experiment_dir, 'checkpoints')
            if os.path.exists(ckpt_dir) and os.listdir(ckpt_dir):
                raise SystemError('Folder is not empty and your choice tries to overwrite, since you do not '
                                  'try to resume training! Be careful about your choice!')
            else:
                return 0

    def step_process(self, batch_data: dict, train: bool = True) -> tuple:
        """
        Method is utilized to perform training (when train is True) or validation / test (when train is False) step for
        the given batch data
        :param batch_data: dictionary includes input_ids, attention_mask and labels for the given batch
        :param train: boolean variable specifies nature of the step
        :return: tuple of loss value for the step and predictions of the model
        """
        input_ids = batch_data['input_ids'].to(self.device)
        attention_mask = batch_data['attention_mask'].to(self.device)
        targets = batch_data['labels'].to(self.device)

        outputs = self.classifier(input_ids, attention_mask)
        predictions = outputs.view(-1, outputs.shape[-1])
        ner = targets.view(-1)

        if train:
            self.optimizer.zero_grad()
            loss = self.loss_fn(predictions, ner)
            loss.backward()
            self.optimizer.step()
        else:
            loss = self.loss_fn(predictions, ner)

        return loss.item(), predictions

    @staticmethod
    def compute_accuracy(targets: torch.Tensor, outputs: torch.Tensor) -> tuple:
        """
        Method is utilized to compute the accuracy for each step (of each phase, since procedure remains same)
        :param targets: Tensor for the labels in shape of [batch size, max length]
        :param outputs: Tensor for the predictions in shape of [batch size, max length, num classes]
        :return: tuple of following elements:
                number of correct predictions for specific step
                list of non-padded target tokens
                list of non-padded output tokens
                Note: all collected in one list, so don't try to check sequence based. It is not necessary for f1 score
        """
        predictions = torch.argmax(outputs, -1).tolist()
        labels = targets.view(-1).tolist()
        corrects = list()
        original_targets = list()
        original_predictions = list()
        for t, p in zip(labels, predictions):
            if t != -100:
                corrects.append(t == p)
                original_targets.append(t)
                original_predictions.append(p)

        return sum(corrects), original_targets, original_predictions

    def development(self, validation_loader: DataLoader, num_valid_batches: int, test: bool = False) -> tuple:
        """
        Method is utilized to perform either validation (test=False) or test (test=True) process
        :param validation_loader: DataLoader object for validation dataset
        :param num_valid_batches: number of batches in validation dataset (required for tqdm and average loss)
        :param test: boolean variable to specify whether validation (False) or test (True) process is performed
        :return: tuple for following elements:
                dev_acc: accuracy for specific phase
                dev_loss: loss for specific phase
                f1_macro: f1 score based on macro average
                f1_micro: f1 score based on micro average
        """
        self.classifier.eval()
        targets = list()
        outputs = list()
        with torch.no_grad():
            validation_iterator = tqdm(iterable=validation_loader, total=num_valid_batches, leave=True)
            validation_loss = 0
            validation_accuracy = 0
            validation_instances = 0
            for batch in validation_iterator:
                validation_step_loss, ner_out = self.step_process(batch, train=False)
                validation_loss += validation_step_loss
                step_accuracy, targets_, outputs_ = self.compute_accuracy(batch['labels'], ner_out)
                targets.extend(targets_)
                outputs.extend(outputs_)
                validation_accuracy += step_accuracy
                validation_instances += len(targets_)
                validation_iterator.set_description(
                    desc=f"{'Test' if test else 'Validation'}: Loss: {validation_loss / num_valid_batches: .4f}"
                         f" Accuracy: {validation_accuracy / validation_instances: .4f}"
                )
            f1_macro = f1_score(targets, outputs, average='macro')
            f1_micro = f1_score(targets, outputs, average='micro')
            print(f'F1 scores => macro: {f1_macro: .4f}, micro: {f1_micro: .4f}')
        dev_accuracy = validation_accuracy / validation_instances
        dev_loss = validation_loss / num_valid_batches
        return dev_accuracy, dev_loss, f1_macro, f1_micro

    def train_model(self, project_parameters: dict) -> None:
        """
        Method is utilized to process all steps of the training which are:
            Train model for each epoch and validate;
            Save epoch results;
            Test the model when training is over
            Note: Resume training is also called and checked in this method
        :param project_parameters: experiment parameters dictionary
        :return: None
        """
        num_epochs = project_parameters['epochs']
        dataloaders = {ds_name: self.get_data(ds_name) for ds_name in ['train', 'test', 'validation']}
        self.save_parameters(project_parameters)
        num_train_batches = len(dataloaders['train'])
        num_validation_batches = len(dataloaders['validation'])
        num_test_batches = len(dataloaders['test'])
        init = self.resume_training(project_parameters)

        for epoch in range(init, num_epochs):
            self.classifier.train()
            epoch_loss = 0
            epoch_accuracy = 0
            train_instances = 0
            train_iterator = tqdm(dataloaders['train'], total=num_train_batches, leave=True)
            for batch_data in train_iterator:
                step_loss, out = self.step_process(batch_data, train=True)
                epoch_loss += step_loss
                num_correct, non_pads, _ = self.compute_accuracy(batch_data['labels'], out)
                epoch_accuracy += num_correct
                train_instances += len(non_pads)
                train_iterator.set_description(f"Epoch: {epoch + 1} "
                                               f"Loss: {epoch_loss / num_train_batches: .4f} "
                                               f"Accuracy: {epoch_accuracy / train_instances: .4f}")

            dev_accuracy, dev_loss, f1_macro, f1_micro = self.development(dataloaders['validation'],
                                                                          num_validation_batches)

            epoch_dict = {
                'epoch': epoch + 1,
                'dev_loss': dev_loss,
                'dev_accuracy': dev_accuracy,
                'train_loss': epoch_loss / num_train_batches,
                'train_accuracy': epoch_accuracy / train_instances,
                'f1_macro': f1_macro,
                'f1_micro': f1_micro
            }
            self.save_results(epoch_dict)

        test_accuracy, test_loss, f1_macro_test, f1_micro_test = self.development(dataloaders['test'], num_test_batches,
                                                                                  test=True)
        test_dict = {
            'num_epochs': num_epochs,
            'test_accuracy': test_accuracy,
            'test_loss': test_loss,
            'f1_macro': f1_macro_test,
            'f1_micro': f1_micro_test
        }
        self.save_results(test_dict, test=True)

    def save_results(self, results_dict: dict, test: bool = False) -> None:
        """
        Method is utilized to save the epoch results (train and development in one dictionary)
        :param results_dict: dictionary in which all training and development values are set (if test is False)
        :param test: boolean variable specifies whether training and development outcomes will be saved or test results
        :return: None
        """
        directory = os.path.join(self.experiment_dir, 'outputs')
        results_file = os.path.join(directory, f"{'results_test' if test else 'results'}.pickle")
        if not os.path.exists(results_file):
            results = {key: list() for key in results_dict.keys()}
        else:
            with open(results_file, 'rb') as result_data:
                results = pickle.load(result_data)
        for key, details in results_dict.items():
            results[key].append(details)
        with open(results_file, 'wb') as result_data:
            pickle.dump(results, result_data)

        if test:
            print(f"Test results were saved after training of {results_dict['num_epochs']} epochs")
        else:
            self.save_model_parameters(results_dict)
            print(f"Epoch results were added to the existing data for epoch {results_dict['epoch']}")
            print(f'\n{"<" * 20}{">" * 20}')

    def save_model_parameters(self, results_dict: dict):
        """
        Method is utilized to save model and optimizer parameters for specific epoch.
        :param results_dict: dictionary in which all training and development values are set
        :return: None
        """
        ckpt_dir = os.path.join(self.experiment_dir, 'checkpoints')
        check_dir(ckpt_dir)
        model_name = f"model_epoch_{results_dict['epoch']}_f1_{results_dict['f1_macro']:.4f}_" \
                     f"loss_{results_dict['dev_loss']:.4f}_acc_{results_dict['dev_accuracy']:.4f}"
        optim_name = f"optim_epoch_{results_dict['epoch']}"
        model_path = os.path.join(ckpt_dir, model_name)
        optim_path = os.path.join(ckpt_dir, optim_name)
        torch.save(self.classifier.state_dict(), model_path)
        torch.save(self.optimizer.state_dict(), optim_path)
        print(f"Model and Optimizer parameters were saved for epoch {results_dict['epoch']}")

    def get_best_epoch(self, user_choice: str) -> int:
        """
        Method is utilized to get epoch value for the specific user choice
        :param user_choice: can be f1_macro, dev_acc, dev_loss
        :return: integer value for epoch which corresponds to the best value of the given choice
        """
        results_dict = self.load_model_results()
        data = pd.DataFrame(results_dict)
        if user_choice == 'dev_loss':
            choice = min(data[user_choice])
        else:
            choice = max(data[user_choice])
        request = data[data[user_choice] == choice]
        epoch = request['epoch'].item()
        print(f'According to the best choice selection, epoch {epoch} was chosen!')
        return epoch


In [7]:
class Inference:
    """
    Class is utilized to infer with the pre-trained model as a playground environment
    """

    def __init__(self, hp: dict, process: DataProcess, device: str, infer_parameters: dict):
        """
        Method is utilized as an initializer to set the inference environment
        :param hp: hyperparameters for the model setup
        :param process: data processing object will be utilized to set the model input
        :param device: can either be cuda or cpu
        :param infer_parameters: dictionary which includes inference parameters
        """
        self.hp = hp
        self.device = device
        self.process = process
        self.infer_parameters = infer_parameters
        self.vocabulary = self.process.tokenizer.vocab
        self.collator = self.set_collator()
        self.experiment_dir = self.set_experiment_environment()
        self.classifier = self.set_model()

    def set_experiment_environment(self) -> str:
        """
        Method is utilized to set experiment environment which data will be used for model setup
        :return: directory for the experiment data
        """
        experiment_path = os.path.join(f'results/experiment_{self.infer_parameters["experiment_num"]}')
        return experiment_path

    def set_collator(self) -> DataCollatorForTokenClassification:
        """
        Method is utilized to create data collator object for the experiment
        :return: DataCollator object will be used for post-processing purposes
        """
        return DataCollatorForTokenClassification(
            tokenizer=self.process.tokenizer,
            max_length=self.hp['max_length'],
            padding='max_length'
        )

    def set_model(self) -> NERClassifierBERT:
        """
        Method is utilized to set the model
        :return: Classifier for the given experiment
        """
        return NERClassifierBERT(self.hp).to(self.device)

    def get_best_epoch(self, user_choice: str) -> int:
        """
        Method is utilized to get epoch value for the specific user choice
        :param user_choice: can be f1_macro, dev_acc, dev_loss
        :return: integer value for epoch which corresponds to the best value of the given choice
        """
        results_dict = self.load_model_results()
        data = pd.DataFrame(results_dict)
        if user_choice == 'dev_loss':
            choice = min(data[user_choice])
        else:
            choice = max(data[user_choice])
        request = data[data[user_choice] == choice]
        epoch = request['epoch'].item()
        print(f'According to the best choice selection, epoch {epoch} was chosen!')
        return epoch

    def load_model_results(self) -> dict:
        """
        Method is utilized to load model results, which are collected during the training process
        :return: dictionary which holds each epoch's results
        """
        results_path = os.path.join(self.experiment_dir, 'outputs/results.pickle')
        if not os.path.exists(results_path):
            raise FileNotFoundError('No such directory, for this you need to train the model first!')

        with open(results_path, 'rb') as result_data:
            result_dict = pickle.load(result_data)
        return result_dict

    def load_model(self, epoch_choice: int) -> None:
        """
        Method is utilized to load model for the given epoch choice
        :param epoch_choice: integer specifies which epoch's model will be loaded
        :return: None
        """
        result_dict = self.load_model_results()
        data = pd.DataFrame(result_dict)
        ckpt_dir = os.path.join(self.experiment_dir, 'checkpoints')
        if not os.path.exists(ckpt_dir):
            raise FileNotFoundError('No checkpoints folder was found! Make sure that you trained the model!')

        request = data[data['epoch'] == epoch_choice]
        model_name = f"model_epoch_{epoch_choice}_f1_{request['f1_macro'].item():.4f}_" \
                     f"loss_{request['dev_loss'].item():.4f}_acc_{request['dev_accuracy'].item():.4f}"
        model_path = os.path.join(ckpt_dir, model_name)
        if not os.path.exists(model_path):
            raise FileNotFoundError(f'Model was not trained at least for {epoch_choice} epochs or there is typo! '
                                    f'Check it first!')

        self.classifier.load_state_dict(torch.load(model_path))
        self.classifier.eval()

    def decision_maker(self, project_parameters: dict) -> int:
        """
        Method is utilized to select epoch according to the user's choices for model loading scenario
        :param project_parameters: experiment parameters in which user's choice is kept for loading decision
        :return: epoch choice according to the provided information
        """
        if project_parameters['load_best']:
            if project_parameters['epoch_choice']:
                print(
                    'WARNING: Best choice and epoch choice were made together! '
                    'In such cases best choice is prioritized!'
                )
            return self.get_best_epoch(project_parameters['load_choice'])

        else:
            return project_parameters['epoch_choice']

    @staticmethod
    def pretty_combiner(sequence: list) -> list:
        """
        Method is utilized to prevent mis-tokenization because of combinations of apostrophes and specific examples such
        as 'gonna' and 'wanna'
        :param sequence: list of tokens which were collected by nltk tokenizer
        :return: list of tokens which can be seen as more correct combination
        """
        result = list()
        na_list = ['gon', 'wan']
        shortcuts = ["'s", "'d", "n't", "'m", "'ve", "'ll", "na"]
        count = 0
        for idx, token in enumerate(sequence):
            if token not in shortcuts or idx == 0:
                result.append(token)
                count += 1
            else:
                if token == 'na' and result[count - 1] not in na_list:
                    result.append(token)
                else:
                    new_token = result[count - 1] + token
                    result[count - 1] = new_token
        return result

    @staticmethod
    def make_alignment(original_tokens: list, bert_tokens: list) -> tuple:
        """
        Method is utilized to make alignment between clean sequence and model input sequence. Example:
        Clean sequence: ["I'm", "going", "home", "."]
        Model input: ["[CLS]", "I", "'", "m", "going", "home", ".", "[SEP]"]
        Our desired output is in the length of the clean sequence, thus predictions of 'I', ''', 'm' will be processed
        in specific manner. In order to eliminate prospective confusion in indexes we align them
        :param original_tokens: clean sequence of tokens
        :param bert_tokens: sequence of tokenization results for transformer model
        :return: tuple of following elements:
                result of alignment: "[CLS]", "I'm", "going", "home", ".", "[SEP]";
                alignment map, specifies combination indexes: {0: [0], 1: [1, 2, 3], 2: [4], 3:[5], 4: [6], 5: [7]}
        """
        new_set = [bert_tokens[0]] + original_tokens + [bert_tokens[-1]]

        result_list = list()
        alignment_map = dict()
        bert_idx = 0
        for idx, token in enumerate(new_set):
            alignment_map[idx] = [bert_idx]

            if token == bert_tokens[bert_idx]:
                result_list.append(token)
                bert_idx += 1
            else:
                tok = bert_tokens[bert_idx]
                for cur_idx in range(bert_idx + 1, len(bert_tokens)):
                    tok += bert_tokens[cur_idx]
                    alignment_map[idx].append(cur_idx)
                    if tok == token:
                        result_list.append(tok)
                        bert_idx = cur_idx + 1
                        break

        return result_list, alignment_map

    @staticmethod
    def clean_model_input(model_text: list):
        """
        Model is utilized to fix ## tokenization as a result of AutoTokenizer usage. It happens when the longer words
        are given as input to the tokenizer. The first syllable will be without ## and the rest will be as starting with
        ##. This method cleans them and returns clean version. The rest will be handled by pretty combiner method, if
        needed.
        Example: "[CLS]", "I", "do", "some", "am", "##bi", "##gu", "##ous", "works", "[SEP]"
        Result: "[CLS]", "I", "do", "some", "am", "##bi", "##gu", "##ous", "works", "[SEP]"
        After pretty combiner: '[CLS]', 'I', 'do', 'some', 'ambiguous', 'works', '[SEP]'
        Alignment map: {0: [0], 1: [1], 2: [2], 3: [3], 4: [4, 5, 6, 7], 5: [8], 6: [9]}
        :param model_text: list of tokens as output of the tokenizer
        :return: list of clean tokens (in case ## was detected, otherwise input itself)
        """
        result = list()
        for token in model_text:
            if '##' in token:
                result.append(token.replace('##', ''))
            else:
                result.append(token)
        return result

    def process_input(self, input_text: str) -> tuple:
        """
        Method is utilized to process the input text, which is provided by user as a sequence of characters. Then it
        will be put into the desired shape to perform classification.
        :param input_text: string object as an input sequence of characters
        :return: tuple of the following elements:
                clean_text: list of pretty combined tokens
                alignment_map: dictionary of alignment setup
                model_input: data can be used as model input
        """

        original_tokens = word_tokenize(input_text)
        main_tokens = self.pretty_combiner(original_tokens)

        model_input = self.process.tokenizer(input_text)
        alignment_model_data = self.clean_model_input(model_input.tokens())
        clean_text, alignment_map = self.make_alignment(main_tokens, alignment_model_data)
        return clean_text, alignment_map, model_input

    def process_out(self, clean_data: list, alignment_map: dict, predictions: torch.Tensor) -> None:
        """
        Method is utilized to process the output of the model for each scenario that can occur
        :param clean_data: list of pretty combined tokens
        :param alignment_map: dictionary for alignment setup
        :param predictions: prediction tensor with padding elements removed
        :return: Nothing, it just prints the output
        """
        prediction_result = list()

        for idx, token in enumerate(clean_data):

            if len(alignment_map[idx]) == 1:

                result = torch.argmax(predictions[:, alignment_map[idx][0], :], -1).item()

                prediction_result.append(result)

            else:
                init = 3 * predictions[:, alignment_map[idx][0]]

                summed = init + torch.sum(predictions[:, alignment_map[idx][1:len(alignment_map[idx])], :], 1)
                prediction_result.append(torch.argmax(summed, -1).item())

        decoded = [self.hp['id2label'][idx] for idx in prediction_result]
        result = [(token, decoded[idx]) for idx, token in enumerate(clean_data)]
        print(clean_data[1: len(clean_data)])
        print(result[1: len(result)])

    def infer(self, infer_parameters):
        user_choice = self.decision_maker(infer_parameters)
        self.load_model(user_choice)
        input_text = input('Please provide your text: ')
        clean_text, alignment_map, model_input = self.process_input(input_text)

        input_data = self.collator([each for each in [model_input]])
        predictions = self.classifier(input_data['input_ids'].to(self.device),
                                      input_data['attention_mask'].to(self.device))
        non_pad_out = predictions[:, 0: len(model_input.tokens()), :]

        self.process_out(clean_text, alignment_map, non_pad_out)


In [8]:
class Statistics:
    def __init__(self, hp, process, device, experiment_num, data_choice):
        self.hp = hp
        self.process = process
        self.device = device
        self.experiment_num = experiment_num
        self.data_choice = data_choice
        self.experiment_path = self.set_experiment_environment()
        self.classifier = self.set_model()
        self.collator = self.set_collator()

    def set_experiment_environment(self) -> str:
        """
        Method is utilized to set experiment environment which data will be used for model setup
        :return: directory for the experiment data
        """
        experiment_path = os.path.join('results', f'experiment_{self.experiment_num}')
        return experiment_path

    def set_model(self):
        return NERClassifierBERT(self.hp).to(self.device)

    def get_model_paths(self):
        result_dir = os.path.join(self.experiment_path, 'outputs/results.pickle')
        ckpt_dir = os.path.join(self.experiment_path, 'checkpoints')
        with open(result_dir, 'rb') as result_data:
            results_dict = pickle.load(result_data)
        paths = dict()
        for each in ['dev_loss', 'dev_accuracy', 'f1_macro']:
            if each == 'dev_loss':
                value = min(results_dict[each])
            else:
                value = max(results_dict[each])
            val_idx = results_dict[each].index(value)

            model_name = (f"model_epoch_{results_dict['epoch'][val_idx]}_"
                          f"f1_{results_dict['f1_macro'][val_idx]:.4f}_"
                          f"loss_{results_dict['dev_loss'][val_idx]:.4f}_"
                          f"acc_{results_dict['dev_accuracy'][val_idx]:.4f}")
            path = os.path.join(ckpt_dir, model_name)
            paths[each] = path
        return paths

    def set_collator(self) -> DataCollatorForTokenClassification:
        """
        Method is utilized to create data collator object for the experiment
        :return: DataCollator object will be used for post-processing purposes
        """
        return DataCollatorForTokenClassification(
            tokenizer=self.process.tokenizer,
            max_length=self.hp['max_length'],
            padding='max_length'
        )

    def load_model(self, path):
        self.classifier.load_state_dict(torch.load(path))
        self.classifier.eval()

    def get_data(self, statistics_parameters):
        datasets = self.process.process()
        return DataLoader(dataset=datasets[statistics_parameters['statistics_data_choice']],
                          collate_fn=self.collator,
                          shuffle=True,
                          batch_size=self.hp['batch_size'])

    def collect_results(self, path, statistics_parameters):
        self.load_model(path)
        dataloader = self.get_data(statistics_parameters)
        infer_iterator = tqdm(dataloader, total=len(dataloader), leave=True)
        predictions = list()
        targets = list()
        with torch.no_grad():
            for batch in infer_iterator:
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                output = self.classifier(input_ids=input_ids, attention_mask=attention_mask)
                batch_targets, batch_predictions = self.clean_special_tokens(output, batch['labels'])
                targets.extend(batch_targets)
                predictions.extend(batch_predictions)
        return targets, predictions

    @staticmethod
    def clean_special_tokens(out, labels):
        batch_predictions = list()
        batch_targets = list()
        targets = labels.view(-1).tolist()
        predictions = out.reshape(-1, out.shape[-1])
        preds = torch.argmax(predictions, -1).tolist()
        for t, p in zip(targets, preds):
            if t != -100:
                batch_predictions.append(p)
                batch_targets.append(t)

        return batch_targets, batch_predictions

    def save_predictions(self, output_path, choice, model_path, statistics_parameters):
        file_name = f"prediction_{choice}_{statistics_parameters['statistics_data_choice']}.pickle"
        prediction_path = os.path.join(output_path, file_name)
        if not os.path.exists(prediction_path):
            targets, predictions = self.collect_results(model_path, statistics_parameters)
            targets_none = list()
            predictions_none = list()
            for t, p in zip(targets, predictions):
                if t != 0 and p != 0:
                    targets_none.append(t)
                    predictions_none.append(p)
            results = {
                'targets': targets,
                'predictions': predictions,
                'targets_none': targets_none,
                'predictions_none': predictions_none
            }
            with open(prediction_path, 'wb') as prediction_data:
                pickle.dump(results, prediction_data)
        with open(prediction_path, 'rb') as prediction_data:
            results = pickle.load(prediction_data)

        return results

    def plot_confusion(self, targets, predictions, graph_path, graph_word):
        conf_matrix = confusion_matrix(targets, predictions)
        plt.figure(figsize=(12, 12), dpi=100)
        sns.set_palette('tab10')
        sns.set(font_scale=1.1)

        ax = sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='tab20c')

        classes = list(self.hp['label2id'].keys())
        labels = classes[1::] if 'without' in graph_word else classes

        ax.set_xlabel("Predicted Labels", fontsize=14, labelpad=20)
        ax.xaxis.set_ticklabels(labels)

        ax.set_ylabel("Actual Labels", fontsize=14, labelpad=20)
        ax.yaxis.set_ticklabels(labels)

        ax.set_title(f"Confusion Matrix based on {graph_word}", fontsize=14, pad=20)
        plt.savefig(graph_path)
        plt.show()

    def generate_confusion(self, model_path, choice, statistics_parameters):
        dataset = statistics_parameters['statistics_data_choice']

        if choice == 'dev_loss':
            graph_word = f'Validation Loss ({dataset.title()} dataset)'
        elif choice == 'dev_accuracy':
            graph_word = f'Validation Accuracy ({dataset.title()} dataset)'
        else:
            graph_word = f'F1 Score ({dataset.title()} dataset)'
        outputs_path = os.path.join(self.experiment_path, 'outputs')
        results = self.save_predictions(outputs_path, choice, model_path, statistics_parameters)

        graph_path = os.path.join(outputs_path, f'confusion_{dataset}_data_{choice}.png')
        graph_path_none = os.path.join(outputs_path, f'confusion_{dataset}_data_{choice}_none_o.png')
        self.plot_confusion(results['targets'], results['predictions'], graph_path, graph_word)
        self.plot_confusion(results['targets_none'], results['predictions_none'], graph_path_none,
                            graph_word + ' without O label')

    def show_statistics(self, statistics_parameters):
        paths_dict = self.get_model_paths()
        for metric, path in paths_dict.items():
            self.generate_confusion(path, metric, statistics_parameters)


In [9]:
def create_dataset_name(parameters):
    choices = ['cased', 'punctuation', 'stopwords']
    ds_path = 'dataset'
    for each in choices:
        if parameters[each]:
            ds_path += f'_{each}'
    remove_options = {choice: parameters[choice] for choice in choices}
    return ds_path, remove_options

def get_hyperparameters(parameters):
    req_data = ['learning_rate', 'batch_size', 'max_length', 'dropout', 'weight_decay', 'model_checkpoint']
    hp = {data: parameters[data] for data in req_data}
    return hp

def set_parameters():
    return {
        "experiment_num": 6,
        "cased": True,
        "punctuation": True,
        "stopwords": True,
        "epochs": 3,
        "learning_rate": 0.0001,
        "batch_size": 16,
        "weight_decay": 0.0001,
        "train": True,
        "infer": True,
        "resume_training": False,
        "epoch_choice": 1,
        "load_best": False,
        "load_choice": 'f1_macro',
        "dropout": 0.3,
        "max_length": 180,
        "model_checkpoint": 'bert-base-cased',
        "stats": True,
        "statistics_data_choice": 'test'
    }

def get_parameters() -> dict:
    """
    Method is utilized to transform Namespace object into dict (will be used by project)
    :return: dictionary that includes all user-defined project parameters
    """

    return set_parameters()

def check_dir(directory: str):
    if not os.path.exists(directory):
        os.makedirs(directory)

def setup_labels():

    with open('dataset_infos.json', 'r') as dataset_info:
        ds_info = json.load(dataset_info)
    features = ds_info['conllpp']['features']

    ner_tags = {label: idx for idx, label in enumerate(features['ner_tags']['feature']['names'])}
    pos_tags = {label: idx for idx, label in enumerate(features['pos_tags']['feature']['names'])}

    return ner_tags, pos_tags




In [10]:
def __main__():
    random.seed(42)
    project_parameters = get_parameters()
    hp = get_hyperparameters(project_parameters)
    data_path = 'dataset_parameters'
    process = DataProcess('conllpp', model_checkpoint=hp['model_checkpoint'], data_path=data_path)

    labels = process.datasets['train'].features['ner_tags'].feature.names
    hp['label2id'] = {label: idx for idx, label in enumerate(labels)}
    hp['id2label'] = {idx: label for idx, label in enumerate(labels)}
    #
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    if project_parameters['train']:
        trainer = Train(hp, process, device, project_parameters['experiment_num'])
        trainer.train_model(project_parameters)
    elif project_parameters['stats']:
        stat_data = ['experiment_num', 'statistics_data_choice', 'stats']
        statistics_parameters = {parameter: value for parameter, value in project_parameters.items()
                                 if parameter in stat_data}
        statistics = Statistics(hp, process, device, project_parameters['experiment_num'], 'validation')
        statistics.show_statistics(statistics_parameters)
    if project_parameters['train']:
        infer_options = ['load_choice', 'load_best', 'epoch_choice', 'experiment_num']
        infer_parameters = {option: project_parameters[option] for option in infer_options}
        inference = Inference(hp, process, device, infer_parameters)
        inference.infer(infer_parameters)


if __name__ == '__main__':
    __main__()


Downloading builder script:   0%|          | 0.00/8.73k [00:00<?, ?B/s]

Downloading metadata:   0%|          | 0.00/3.35k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/7.70k [00:00<?, ?B/s]

Downloading and preparing dataset conllpp/conllpp to /root/.cache/huggingface/datasets/conllpp/conllpp/1.0.0/04f15f257dff3fe0fb36e049b73d51ecdf382698682f5e590b7fb13898206ba2...


Downloading data files:   0%|          | 0/3 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/650k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/163k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/141k [00:00<?, ?B/s]

Extracting data files:   0%|          | 0/3 [00:00<?, ?it/s]

Generating train split:   0%|          | 0/14041 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3250 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/3453 [00:00<?, ? examples/s]

Dataset conllpp downloaded and prepared to /root/.cache/huggingface/datasets/conllpp/conllpp/1.0.0/04f15f257dff3fe0fb36e049b73d51ecdf382698682f5e590b7fb13898206ba2. Subsequent calls will reuse this data.


  0%|          | 0/3 [00:00<?, ?it/s]

Map:   0%|          | 0/14041 [00:00<?, ? examples/s]

Map:   0%|          | 0/3250 [00:00<?, ? examples/s]

Map:   0%|          | 0/3453 [00:00<?, ? examples/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/436M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.seq_relationship.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Project Parameters for experiment 6 were saved successfully!
<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>> 



  0%|          | 0/878 [00:00<?, ?it/s]You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.
Epoch: 1 Loss:  0.1362 Accuracy:  0.9611: 100%|██████████| 878/878 [06:44<00:00,  2.17it/s]
Validation: Loss:  0.0781 Accuracy:  0.9766: 100%|██████████| 204/204 [00:33<00:00,  6.00it/s]


F1 scores => macro:  0.9181, micro:  0.9766
Model and Optimizer parameters were saved for epoch 1
Epoch results were added to the existing data for epoch 1

<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>>


Epoch: 2 Loss:  0.0597 Accuracy:  0.9832: 100%|██████████| 878/878 [06:45<00:00,  2.16it/s]
Validation: Loss:  0.0784 Accuracy:  0.9779: 100%|██████████| 204/204 [00:33<00:00,  6.01it/s]


F1 scores => macro:  0.9236, micro:  0.9779
Model and Optimizer parameters were saved for epoch 2
Epoch results were added to the existing data for epoch 2

<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>>


Epoch: 3 Loss:  0.0502 Accuracy:  0.9858: 100%|██████████| 878/878 [06:46<00:00,  2.16it/s]
Validation: Loss:  0.0800 Accuracy:  0.9790: 100%|██████████| 204/204 [00:33<00:00,  6.02it/s]


F1 scores => macro:  0.9261, micro:  0.9790
Model and Optimizer parameters were saved for epoch 3
Epoch results were added to the existing data for epoch 3

<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>>


Test: Loss:  0.1419 Accuracy:  0.9645: 100%|██████████| 216/216 [00:36<00:00,  5.99it/s]


F1 scores => macro:  0.8809, micro:  0.9645
Test results were saved after training of 3 epochs


Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.seq_relationship.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Please provide your text: I studied my master's degree in Germany, Paderborn at Paderborn University.
['I', 'studied', 'my', "master's", 'degree', 'in', 'Germany', ',', 'Paderborn', 'at', 'Paderborn', 'University', '.', '[SEP]']
[('I', 'O'), ('studied', 'O'), ('my', 'O'), ("master's", 'O'), ('degree', 'O'), ('in', 'O'), ('Germany', 'B-LOC'), (',', 'O'), ('Paderborn', 'B-PER'), ('at', 'O'), ('Paderborn', 'I-ORG'), ('University', 'I-ORG'), ('.', 'O'), ('[SEP]', 'O')]
