In [None]:
!pip install simpletransformers

# Libraries & Functions

In [None]:
import pandas as pd 
import numpy as np
from tqdm import tqdm

In [None]:
from simpletransformers.classification import ClassificationModel
import torch

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import LabelEncoder

def calculate_scores(y_test, y_pred, average = "binary"):
    """Compute accuracy, precision, recall and F1 for a set of predictions.

    Args:
        y_test: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_test``.
        average: Averaging mode forwarded to the precision, recall and F1
            metrics (accuracy does not take it).

    Returns:
        list: ``[accuracy, precision, recall, f1]`` in that order.
    """
    # Accuracy is the only metric without an averaging mode, so it is computed
    # separately; the remaining three share one call signature.
    scores = [accuracy_score(y_test, y_pred)]
    for metric in (precision_score, recall_score, f1_score):
        scores.append(metric(y_test, y_pred, average = average))
    return scores

In [None]:
# Each pair is (model type, checkpoint). The two resulting lists are index-aligned
# and are iterated together with zip() by the training loop.
model_names, checkpoint_names = (list(column) for column in zip(*[
    ("distilbert", "ViktorDo/EcoBERT-Pretrained"),
    ("distilbert", "distilbert-base-uncased"),
    ("debertav2", "microsoft/deberta-v3-base"),
    ("electra", "google/electra-base-discriminator"),
]))

In [None]:
# Traits to model (focus_names) and, index-aligned, the dataset column that holds
# each trait's label (focus_codes). The two lists are zipped pairwise further down.
# The trailing comments keep the alternative "Growth Form" / "1.2.1" configuration.
focus_names = ["Life Form"]#["Growth Form"]#, "Life Form"]
focus_codes = ["2.3.1"]#["1.2.1"]#, "2.3.1"]

# Input Data

In [None]:
# Registry of the loaded source tables, keyed by dataset name ("POWO", "WIKI").
raw_datasets = {}

## POWO Dataset

In [None]:
# Load the POWO table, keep one row per distinct description and drop
# descriptions that have 10 or fewer space-separated tokens.
working_dir = "..//input//powo-gift-final//"

df_POWO_Cat = pd.read_excel(working_dir + "POWO_GIFT_Final.xlsx")
df_POWO_Cat_Preproc = df_POWO_Cat.drop_duplicates(subset = ["BERT_description"])
# str() makes the length filter safe for missing descriptions: a NaN cell is a
# float and has no .split(). As the text "nan" it counts as one token and is
# filtered out. This mirrors the filter used for the WIKI dataset.
df_POWO_Cat_Preproc = df_POWO_Cat_Preproc[df_POWO_Cat_Preproc["BERT_description"].apply(lambda x: len(str(x).split(" ")))>10]
raw_datasets["POWO"] = df_POWO_Cat_Preproc

## WIKI Dataset

In [None]:
def fix_WIKI(name, description):
    """Strip every lower-cased part of ``name`` out of ``description``.

    ``name`` is split on single spaces and each part, lower-cased, is removed
    wherever it occurs in the description (plain substring removal, so matches
    inside longer words are removed as well).
    NOTE(review): presumably this hides the entry's own name from the
    classifier input - confirm.

    Args:
        name: Name whose parts should be removed. A non-string value (for
            example NaN from an empty spreadsheet cell) means nothing is removed.
        description: Text to clean; it is converted with ``str()`` first.

    Returns:
        str: The cleaned description with leading/trailing whitespace stripped.
    """
    description = str(description)
    # A missing name is read as a float NaN, which has no .split(); there is
    # nothing to remove in that case, so only normalise the description.
    if not isinstance(name, str):
        return description.strip()
    for n in name.split(" "):
        description = description.replace(n.lower(), "")
    return description.strip()

In [None]:
# Load the WIKI table, keep one row per distinct description, remove each
# entry's own name from its description and drop descriptions that have 10 or
# fewer space-separated tokens.
working_dir = "..//input//wiki-gift-final//"

df_WIKI_Cat = pd.read_excel(working_dir + "WIKI_GIFT_Final.xlsx")
# .copy() gives an independent frame, so the column assignment below cannot
# trigger a SettingWithCopyWarning.
df_WIKI_Cat_Preproc = df_WIKI_Cat.drop_duplicates(subset = ["BERT_description"]).copy()
# Iterate the two columns by label instead of using x[0] / x[1] on a row:
# positional integer lookups on a label-indexed Series are deprecated in recent
# pandas versions.
df_WIKI_Cat_Preproc["BERT_description"] = [
    fix_WIKI(name, description)
    for name, description in zip(df_WIKI_Cat_Preproc["name"], df_WIKI_Cat_Preproc["BERT_description"])
]
df_WIKI_Cat_Preproc = df_WIKI_Cat_Preproc[df_WIKI_Cat_Preproc["BERT_description"].apply(lambda x: len(str(x).split(" ")))>10]
raw_datasets["WIKI"] = df_WIKI_Cat_Preproc

## Preprocess Datasets

In [None]:
# Accepted categories per trait. Each category is numbered by its position in
# the tuple, so the order of the categories is significant.
label_map = {
    trait: {category: code for code, category in enumerate(categories)}
    for trait, categories in (
        ("Growth Form", ("herb", "shrub", "tree")),
        ("Life Form", ("phanerophyte", "chamaephyte", "hemicryptophyte", "cryptophyte", "therophyte")),
    )
}


In [None]:
# Build one train/validation pair per (dataset, trait). The result is keyed by
# (dataset_name, focus_name, "train" | "validation") and each value is a frame
# with the two columns "text" and "labels".
preprocessed_dataset_dict = {}
sample_size = 5000
# One seed for both the row sampling and the split, so a fresh
# "Restart & Run All" reproduces exactly the same train/validation data.
random_seed = 42

for focus_name, focus_code in zip(focus_names, focus_codes):
    valid_labels = list(label_map[focus_name])
    for dataset_name, dataset_raw in raw_datasets.items():
        labelencoder = LabelEncoder()

        # Keep only rows whose trait value is present and is one of the
        # categories listed in label_map for this trait.
        dataset_masked = dataset_raw[dataset_raw[focus_code].notna()]
        dataset_masked = dataset_masked[dataset_masked[focus_code].isin(valid_labels)]
        # Cap the sample at the number of available rows: asking for more rows
        # than exist (without replacement) raises a ValueError.
        dataset_masked = dataset_masked.sample(
            n=min(sample_size, len(dataset_masked)), random_state=random_seed
        )
        dataset_masked[focus_code + "_encoded"] = labelencoder.fit_transform(dataset_masked[focus_code])

        # Split on the index values so the same frame can be sliced for both parts.
        indices_train, indices_test \
            = train_test_split(dataset_masked.index.values, test_size=0.25, random_state=random_seed)

        df_train = dataset_masked.loc[indices_train, ["BERT_description", focus_code + "_encoded"]]
        df_train.columns = ["text", "labels"]
        df_test = dataset_masked.loc[indices_test, ["BERT_description", focus_code + "_encoded"]]
        df_test.columns = ["text", "labels"]

        preprocessed_dataset_dict[dataset_name, focus_name, "train"] = df_train
        preprocessed_dataset_dict[dataset_name, focus_name, "validation"] = df_test

# Model Training & Evaluation

In [None]:
import gc

# Pick the compute device once: CUDA when PyTorch can see a GPU, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using the CPU instead.')

# Start the training section from a clean slate: release cached GPU memory and
# run a garbage-collection pass.
torch.cuda.empty_cache()
gc.collect()

In [None]:
# Fine-tune every (trait, model, dataset) combination and collect one row of
# macro-averaged validation scores per run.
results_list = []

# Training arguments shared by all runs.
train_args = {
    "num_train_epochs": 3,
    "train_batch_size": 8,
    "eval_batch_size": 8,
    "reprocess_input_data": True,
    "overwrite_output_dir": True,
    "save_model_every_epoch": False,
    "save_eval_checkpoints": False,
    "max_seq_length": 512,
}  # not set: "weight_decay": 0.01, "learning_rate": 2e-5

# Iterate the traits explicitly. Relying on a `focus_name` left over from the
# preprocessing loop would silently train on the last trait only.
for focus_name in focus_names:
    for model_name, model_checkpoint in zip(model_names, checkpoint_names):
        for dataset_name in raw_datasets:

            print(model_name, dataset_name)

            df_train = preprocessed_dataset_dict[dataset_name, focus_name, "train"]
            df_validation = preprocessed_dataset_dict[dataset_name, focus_name, "validation"]

            # Size the classification head from the data (labels are encoded as
            # 0..k-1). A larger fixed value adds output classes that never occur
            # in the targets but can still be predicted, which distorts the
            # macro-averaged metrics.
            num_labels = int(max(df_train["labels"].max(), df_validation["labels"].max())) + 1

            # This checkpoint is loaded with the stock DistilBERT tokenizer.
            model_kwargs = {}
            if model_checkpoint == "ViktorDo/EcoBERT-Pretrained":
                model_kwargs["tokenizer_name"] = "distilbert-base-uncased"

            model = ClassificationModel(
                model_name,
                model_checkpoint,
                num_labels = num_labels,
                args = dict(train_args),
                # use_cuda defaults to True, which fails on a CPU-only machine.
                use_cuda = torch.cuda.is_available(),
                **model_kwargs,
            )

            # Train the model
            model.train_model(df_train)

            # Evaluate the model
            result, model_outputs, wrong_predictions = model.eval_model(df_validation)
            predictions = np.argmax(model_outputs, axis=1)
            # Kept for inspection after the run; overwritten by every model
            # evaluated on the same dataset.
            df_validation["prediction"] = predictions
            results = calculate_scores(df_validation["labels"], predictions, average = "macro")
            results_list.append([dataset_name, focus_name] + results + [model_checkpoint])

            # Drop the last reference to the model first; otherwise its weights
            # stay allocated and emptying the cache cannot release them.
            del model
            torch.cuda.empty_cache()
            gc.collect()


df_results = pd.DataFrame(results_list, columns=["Dataset", "Trait", "Accuracy", "Precision", "Recall", "F1-Score", "Model"])

In [None]:
# Display the collected scores: one row per (dataset, trait, model) run.
df_results

In [None]:
# Displays the same results table again; df_results is unchanged since the previous cell.
df_results

In [None]:
# Export only the "Growth Form" rows, and only when that trait was evaluated in
# this run. Writing df_results unconditionally would store another trait's
# scores under the Growth Form filename on a full "Run All".
df_results_growth_form = df_results[df_results["Trait"] == "Growth Form"]
if not df_results_growth_form.empty:
    df_results_growth_form.to_excel("CategoricalTraitClassification_Encoder_Results_Growth_Form.xlsx", index = False)

In [None]:
# Export only the "Life Form" rows, and only when that trait was evaluated in
# this run, so the file never contains scores that belong to another trait.
df_results_life_form = df_results[df_results["Trait"] == "Life Form"]
if not df_results_life_form.empty:
    df_results_life_form.to_excel("CategoricalTraitClassification_Encoder_Results_Life_Form.xlsx", index = False)