# Imports and Setup

In [None]:
%load_ext autoreload
%autoreload 2

from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [None]:
import numpy as np
import pandas as pd
import copy
import torch
from transformers import TrainingArguments, DataCollatorWithPadding
from datasets import load_metric, Metric, load_dataset
from sklearn.metrics import f1_score
from tqdm import tqdm 
from datasets import Features, Value, ClassLabel
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from src.data_loading import load_raw_datasets
from src.data_processing import get_raw_x_y, encode_labels
from src.constants import PATH_PREPROCESSED_DATA

# Pick the CUDA device when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('Using device:', device)

# Data Loading and Preprocessing

In [None]:
# Load the three raw splits and export each one as a two-column CSV
# ("text", "label") named "<split>_with_labels.csv" under PATH_PREPROCESSED_DATA.
train, validation, test = load_raw_datasets()

set_dict = dict(train=train, validation=validation, test=test)

for split_name, split_data in set_dict.items():
    texts, raw_labels = get_raw_x_y(split_data)

    frame = pd.DataFrame()
    frame['text'] = texts
    frame['label'] = encode_labels(raw_labels)

    # index=False keeps the CSV limited to the two named columns.
    frame.to_csv(f"{PATH_PREPROCESSED_DATA}{split_name}_with_labels.csv", index=False)

In [None]:
# construct the new compatible dataset object
# Every path is PATH_PREPROCESSED_DATA + "<split>_with_labels.csv", i.e. exactly
# the pattern used when the CSV files are written earlier in this notebook, so
# all three splits resolve to the files that were actually produced.
data_files = {"train": PATH_PREPROCESSED_DATA+"train_with_labels.csv", 
              "test": PATH_PREPROCESSED_DATA+"test_with_labels.csv", 
              "validation": PATH_PREPROCESSED_DATA+"validation_with_labels.csv"}

# Declare the CSV schema explicitly: a string "text" column and a categorical "label" column.
# NOTE(review): the order of class_names presumably has to match the encoding
# produced by encode_labels — confirm against src.data_processing.
class_names = ['BACKGROUND','METHODS','RESULTS','OBJECTIVE','CONCLUSIONS']
abstract_features = Features({'text': Value('string'), 'label': ClassLabel(names=class_names)})

dataset = load_dataset('csv', data_files=data_files, features=abstract_features)

In [None]:
# Load the PubMedBERT tokenizer and tokenize every split of `dataset`.

tokenizer_checkpoint = "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract"
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)


def tokenize_function(examples):
    """Tokenize the "text" field of a batch of examples with truncation enabled."""
    texts = examples["text"]
    return tokenizer(texts, truncation=True)


# batched=True passes groups of examples to tokenize_function instead of one at a time.
tokenized_datasets = dataset.map(tokenize_function, batched=True)

# Display the tokenizer configuration as the cell output.
tokenizer

In [None]:
# get a random smaller subset of the whole dataset
# CHANGE HERE THE SUBSAMPLE SIZES FOR THE TRAINING AND EVALUATION DATASET IF YOU LIKE
# we tested and reported the following subset sizes for the training set: 1k,10k, 20k
TRAIN_SUBSET_SIZE = 1000
EVAL_SUBSET_SIZE = 1000
SUBSET_SEED = 42  # fixed seed so the same rows are drawn on every run


def random_subset(split, size, seed=SUBSET_SEED):
    """Return up to `size` rows of `split`, drawn via a seeded shuffle.

    Args:
        split: Dataset split providing `shuffle`, `select` and `len()`.
        size: Requested number of rows.
        seed: Seed passed to `shuffle`.

    Returns:
        The shuffled split restricted to its first `min(size, len(split))` rows.
        Capping the size means a split smaller than `size` is returned whole
        instead of failing on out-of-range indices.
    """
    return split.shuffle(seed=seed).select(range(min(size, len(split))))


small_train_dataset = random_subset(tokenized_datasets["train"], TRAIN_SUBSET_SIZE)

small_eval_dataset = random_subset(tokenized_datasets["validation"], EVAL_SUBSET_SIZE)

# Full, unshuffled splits.
large_train_dataset = tokenized_datasets["train"]
large_eval_dataset = tokenized_datasets["validation"]
large_test_dataset = tokenized_datasets["test"]

small_train_dataset

# Load Pretrained Model and Run Experiments

In [None]:
# Load the pretrained PubMedBERT checkpoint configured for 5 output labels.
model_checkpoint = "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract"
model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint, num_labels=5)

# Keep an independent snapshot of the freshly loaded model for the later
# experiment that fine-tunes all layers.
model_original = copy.deepcopy(model)

In [None]:
# Freeze every parameter of the base model so that only the parameters outside
# `model.base_model` (the last layer) are updated during training.
model.base_model.requires_grad_(False)

In [None]:
# construct metrics function
def compute_metrics(eval_pred):
    """Compute accuracy and weighted/macro F1 for one evaluation pass.

    The metrics are computed directly with numpy and the already imported
    scikit-learn `f1_score`, so no metric object has to be loaded each time
    the function is called.

    Args:
        eval_pred: Two-element sequence `(logits, labels)`. The predicted class
            is the argmax of `logits` over its last axis.

    Returns:
        dict with float values under the keys 'accuracy', 'f1_weighted'
        and 'f1_macro'.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    labels = np.asarray(labels)

    return {
        # Fraction of positions where prediction and reference agree.
        'accuracy': float(np.mean(predictions == labels)),
        'f1_weighted': float(f1_score(labels, predictions, average="weighted")),
        'f1_macro': float(f1_score(labels, predictions, average="macro")),
    }
    
# Training configuration: everything is left at its default except the output
# directory and running an evaluation at the end of each epoch.
training_args = TrainingArguments(
    evaluation_strategy="epoch",
    output_dir="test_trainer",
)

# Collator built from the tokenizer; it is responsible for padding the batches.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Trainer for the first experiment, operating on the subsampled train/validation sets.
trainer = Trainer(
    args=training_args,
    model=model,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
)

In [None]:
# Fine-tune the pretrained model on the last layer only: when the cells are run
# in order, the parameters of `model.base_model` were frozen above, so they are
# not updated here. Training uses `small_train_dataset`; `small_eval_dataset`
# is evaluated once per epoch (evaluation_strategy="epoch").
trainer.train()

In [None]:
# Test the fine-tuned model on the full test split
# (`large_test_dataset` is `tokenized_datasets["test"]`).
# As the last expression of the cell, the returned prediction output is displayed.
trainer.predict(large_test_dataset)

In [None]:
# get the original model with all layers unfrozen
# `model_original` was snapshotted before any parameter was frozen. Train a
# fresh copy of it rather than the snapshot itself, so the snapshot keeps its
# pretrained weights and re-running this cell always starts from the same state.
model = copy.deepcopy(model_original)

# get a new trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
    compute_metrics=compute_metrics,
    data_collator=data_collator,
)

In [None]:
# Fine-tune the pretrained model on all layers: `trainer` now wraps the model
# taken from `model_original`, which was copied before any parameter was
# frozen, so every parameter is trainable. The training/evaluation subsets and
# the training arguments are the same as in the last-layer experiment.
trainer.train()

In [None]:
# Test the fully fine-tuned model on the full test split
# (`large_test_dataset` is `tokenized_datasets["test"]`).
# As the last expression of the cell, the returned prediction output is displayed.
trainer.predict(large_test_dataset)