In [None]:
import pandas as pd
import pickle
import numpy as np

import os

import torch
from torch.utils.data import Subset

import transformers
from transformers import AutoTokenizer
from transformers import BertForTokenClassification, Trainer, TrainingArguments

import random
import time

In [None]:
# The class aims to create the appropriate type of tensor data required as input to the Trainer method.
# Takes a BERT-tokenized and "aligned" object and returns the dataset class.

class Preoblikuj_u_Dataset(torch.utils.data.Dataset):
    """Dataset wrapper that serves tokenized encodings and labels as tensors.

    Parameters
    ----------
    encodings : mapping
        Maps each field name to a sequence indexable by example position
        (the object is only accessed through ``.items()`` and ``value[idx]``).
    labels : sequence
        Per-example labels, indexable by example position; its length
        defines the length of the dataset.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        # The dataset has one example per entry in ``labels``.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return a dict of tensors for example ``idx``, including a 'labels' entry."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

In [None]:
def load_from_pickle(pickle_name):
    """Load and return the object stored in the pickle file ``pickle_name``.

    NOTE: unpickling executes arbitrary code embedded in the file, so this
    must only be used on files from a trusted source.
    """
    with open(pickle_name, mode='rb') as pickle_file:
        return pickle.load(pickle_file)

In [None]:
# The function takes the dataset and the name of the file in which it should be pickled.
# The third argument (when explicitly called) forms a subfolder of the BioNER class name that is "hidden"
# for zero-shot and few-shot training, which enables a transparent folder structure of the data.

# Returns nothing, just saves the pickle dataset.

def dump_to_pickle(data_set, file_name, class_name=None):
    """Pickle ``data_set`` to ``Datasets/[class_name/]<file_name>.pkl``.

    Parameters
    ----------
    data_set : object
        Any picklable object (here: dataset / Subset objects).
    file_name : str
        Base name of the output file; the ``.pkl`` extension is appended.
    class_name : str, optional
        When given, the file is written into the ``Datasets/<class_name>``
        sub-folder; otherwise directly into ``Datasets``.

    Returns
    -------
    None
        The function only writes the file (creating folders as needed).
    """
    if class_name is None:
        folders = 'Datasets'
    else:
        folders = os.path.join('Datasets', class_name)
    os.makedirs(folders, exist_ok=True)

    file_path = os.path.join(folders, file_name + '.pkl')
    # The context manager guarantees the file is flushed and closed even if
    # pickling fails; the previous version never closed the handle.
    with open(file_path, 'wb') as outfile:
        pickle.dump(data_set, outfile, protocol=4)

In [None]:
# The function takes only the name of the class that is "hidden" for the purposes of forming datasets
# for zero- and few-shot training on that class.

"""
Dumps 5 pickled datasets into the appropriate files for the given "hidden" class:
    
     1. dataset_withoutOne_train - dataset for zero-shot training, from which all representatives of the "hidden" class are excluded
     2. dataset_withOne_test - dataset for testing*, in which there are only representatives of the "hidden" class
        
     3. dataset_UnseenClass_train_1 - 1-shot training dataset, in which there is 1 non-empty representative of the "hidden" class
     4. dataset_UnseenClass_train_10 - 10-shot training dataset, which contains 10 non-empty representatives of the "hidden" class
     5. dataset_UnseenClass_train_100 - 100-shot training dataset, which contains 100 non-empty representatives of the "hidden" class
    
     NOTE: The SAME validation dataset is used for every class and for every zero- and few-shot training run.
     * The testing dataset per "hidden" class is the same for zero-shot and few-shot, so there is only one!
     ** training representatives - refers to examples from the initial training set - the one that makes up 85% of the total set
"""

def skriva_Dataset_klasu_vraca_TrainValidTest_Subsets(class_name, seed=None):
    """Build and pickle the zero-/few-shot subsets for one "hidden" class.

    Relies on the module-level ``df_train``, ``df_test``, ``dataset_train``
    and ``dataset_test`` objects, and on ``dump_to_pickle`` for persistence.
    The DataFrame index values are used directly as positions into the
    datasets -- assumes both are aligned row-for-row (TODO confirm).

    Parameters
    ----------
    class_name : str
        Value of the ``'klasa'`` column identifying the hidden class.
    seed : int, optional
        When given, the few-shot examples are drawn with a dedicated
        ``random.Random(seed)`` so the selection is reproducible. When
        omitted, the global ``random`` module state is used.

    Returns
    -------
    tuple
        ``(dataset_withoutOne_train, dataset_withOne_test,
        dataset_UnseenClass_train_1, dataset_UnseenClass_train_10,
        dataset_UnseenClass_train_100)`` -- all ``Subset`` objects. Each one
        is also pickled under ``Datasets/<class_name>/``.

    Notes
    -----
    Few-shot examples are drawn WITHOUT replacement, so a k-shot subset
    holds k distinct examples. If fewer than k eligible examples exist, all
    of them are used and a warning is issued.
    """
    import warnings

    rng = random.Random(seed) if seed is not None else random

    # Zero-shot training set: every training row NOT of the hidden class.
    indexes_train = list(df_train.index[df_train['klasa'] != class_name])
    # Test set: only rows of the hidden class.
    indexes_test = list(df_test.index[df_test['klasa'] == class_name])

    dataset_withoutOne_train = Subset(dataset_train, indexes_train)
    dataset_withOne_test = Subset(dataset_test, indexes_test)

    dump_to_pickle(dataset_withoutOne_train, 'dataset_withoutOne_train', class_name=class_name)
    dump_to_pickle(dataset_withOne_test, 'dataset_withOne_test', class_name=class_name)

    # Few-shot candidates: hidden-class rows whose label list sums to > 0,
    # i.e. "non-empty" examples.
    is_hidden_class = df_train['klasa'] == class_name
    has_entity = df_train['labels'].apply(sum) > 0
    lst_indexes_FewShot_train = list(df_train.index[is_hidden_class & has_entity])

    few_shot_subsets = []
    for n_shots in (1, 10, 100):
        n_available = len(lst_indexes_FewShot_train)
        if n_available < n_shots:
            warnings.warn(
                "Only {} non-empty '{}' training examples available; the {}-shot "
                "subset will contain {} examples.".format(
                    n_available, class_name, n_shots, n_available
                )
            )
        # random.sample draws distinct items; random.choices (used before)
        # draws with replacement and could repeat the same example.
        chosen = rng.sample(lst_indexes_FewShot_train, min(n_shots, n_available))
        subset = Subset(dataset_train, chosen)
        dump_to_pickle(subset, 'dataset_UnseenClass_train_' + str(n_shots), class_name=class_name)
        few_shot_subsets.append(subset)

    dataset_UnseenClass_train_1, dataset_UnseenClass_train_10, dataset_UnseenClass_train_100 = few_shot_subsets

    return dataset_withoutOne_train, dataset_withOne_test, dataset_UnseenClass_train_1, dataset_UnseenClass_train_10, dataset_UnseenClass_train_100

In [None]:
# Paths (relative to the notebook's working directory) of the pickled
# train/test objects. Later code indexes them by the 'klasa' and 'labels'
# columns, so they are expected to be pandas DataFrames.
df_train_name = './ALL_DATA_klasa_nova_train.pkl'
df_test_name = './ALL_DATA_klasa_nova_test.pkl'

# Load both objects; they are read as globals when the subsets are built.
df_train = load_from_pickle(df_train_name)
df_test = load_from_pickle(df_test_name)

In [None]:
# Load the previously pickled train / validation / test dataset objects.
# NOTE(review): presumably Preoblikuj_u_Dataset instances whose positions
# match the rows of df_train / df_test -- confirm against the notebook that
# produced these files.
dataset_train = load_from_pickle('./Datasets/dataset_train.pkl')
dataset_valid = load_from_pickle('./Datasets/dataset_valid.pkl')
dataset_test = load_from_pickle('./Datasets/dataset_test.pkl')

In [None]:
# Name of the class (a value of the 'klasa' column) that is hidden from
# zero-shot training; it is also used to build the output folder names.
class_unseen = 'Drug' # change this to the class that should be "hidden" for zero- and few-shot training

In [None]:
# Build (and pickle) the zero-shot training set and the 1/10/100-shot
# training sets for the hidden class. The second returned value (the test
# subset for the hidden class) is not needed in this notebook and is discarded.
train0shot, _, train1shot, train10shot, train100shot = skriva_Dataset_klasu_vraca_TrainValidTest_Subsets(class_unseen)

In [None]:
# Pre-trained checkpoint identifier; the same name is used below to
# initialise the token-classification model.
model_name='dmis-lab/biobert-v1.1'
tokenizer = AutoTokenizer.from_pretrained(model_name)  # load the tokenizer matching the checkpoint

In [None]:
# Zero-shot stage: fine-tune the pre-trained checkpoint on the training data
# from which the hidden class (class_unseen) has been removed, then save the
# resulting model and tokenizer under Results/<class_unseen>/ZeroShot/.

training_args = TrainingArguments(
    output_dir='./Results'+class_unseen+'ZeroShot',   # output folder (folder to store the results)
    num_train_epochs=6,                               # number of training epochs
    per_device_train_batch_size=16,                   # batch size per device during training
    per_device_eval_batch_size=16,                    # batch size for evaluation
    weight_decay=0.01,                                # strength of weight decay
    logging_dir='./Logs'+class_unseen+'ZeroShot',     # folder to store the logs
    #logging_steps=10000,
    #logging_strategy='steps',
    save_strategy='epoch',
    evaluation_strategy='epoch',
    load_best_model_at_end=True
)


model = BertForTokenClassification.from_pretrained(model_name, num_labels=2)

trainer = Trainer(
    model=model,                 # pre-trained model for fine-tuning
    args=training_args,          # training arguments defined above
    train_dataset=train0shot,    # dataset class object for training
    eval_dataset=dataset_valid   # validation dataset loaded above (was the undefined name `valid_dataset`)
)

# Wall-clock training time in seconds (kept in `total_time` for inspection).
start_time = time.time()
trainer.train()
total_time = time.time()-start_time

# Persist the fine-tuned model; the few-shot stage starts from this folder.
model_path = os.path.join('Results', class_unseen, 'ZeroShot', 'Model')
os.makedirs(model_path, exist_ok=True)
model.save_pretrained(model_path)

tokenizer_path = os.path.join('Results', class_unseen, 'ZeroShot','Tokenizer')
os.makedirs(tokenizer_path, exist_ok=True)
tokenizer.save_pretrained(tokenizer_path)

In [None]:
# Few-shot stage: starting from the saved zero-shot model, fine-tune
# separately on the 1-, 10- and 100-shot training sets of the hidden class.
# Each run saves its model and tokenizer under
# Results/<class_unseen>/FewShot/<number of shots>/.

# Every run starts from the same zero-shot checkpoint. This path is kept
# separate from `model_path`, which is reassigned inside the loop.
zero_shot_model_path = os.path.join('Results', class_unseen, 'ZeroShot', 'Model')

# Keyed by the number of shots, so that folder names use "1"/"10"/"100"
# rather than the repr of a Subset object.
few_shot_datasets = {1: train1shot, 10: train10shot, 100: train100shot}

for n_shots, few_shot_dataset in few_shot_datasets.items():
    training_args = TrainingArguments(
        output_dir='./Results'+class_unseen+'FewShot'+str(n_shots),  # output folder (folder to store the results)
        num_train_epochs=10,                                         # number of training epochs
        per_device_train_batch_size=16,                              # batch size per device during training
        per_device_eval_batch_size=16,                               # batch size for evaluation
        weight_decay=0.01,                                           # strength of weight decay
        logging_dir='./Logs'+class_unseen+'FewShot'+str(n_shots),    # folder to store the logs
        #logging_steps=10000,
        #logging_strategy='steps',
        save_strategy='epoch',
        evaluation_strategy='epoch',
        load_best_model_at_end=True
    )

    model0 = BertForTokenClassification.from_pretrained(zero_shot_model_path, num_labels=2)

    trainer = Trainer(
        model=model0,                    # zero-shot model to be fine-tuned further
        args=training_args,              # training arguments defined above
        train_dataset=few_shot_dataset,  # the n-shot training set of this iteration
        eval_dataset=dataset_valid       # validation dataset loaded above
    )

    # Wall-clock training time in seconds (kept in `total_time` for inspection).
    start_time = time.time()
    trainer.train()
    total_time = time.time()-start_time

    model_path = os.path.join('Results', class_unseen, 'FewShot', str(n_shots), 'Model')
    os.makedirs(model_path, exist_ok=True)
    # Save the model trained in this iteration (not the zero-shot `model`).
    model0.save_pretrained(model_path)

    tokenizer_path = os.path.join('Results', class_unseen, 'FewShot', str(n_shots), 'Tokenizer')
    os.makedirs(tokenizer_path, exist_ok=True)
    tokenizer.save_pretrained(tokenizer_path)
