In [None]:
from sklearn.metrics import balanced_accuracy_score, f1_score, roc_auc_score, confusion_matrix, log_loss
import pandas as pd
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier

from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
from typing import Tuple 
from sklearn.model_selection import StratifiedGroupKFold

In [None]:
def load_data(corpus: str):
    """
    Read the feature CSV of one corpus and derive its evaluation subsets.

    Corpus
        - gREL
        - GoogleNQ

    Returns a 3-tuple ``(all_rows, agree_rows, topical_rows)``:
        - all_rows: the complete table read from disk.
        - agree_rows: rows whose "label" equals "system_label"; for gREL the
          rows with "gREL_label" == "t" are additionally removed.
        - topical_rows: for gREL the rows with "gREL_label" == "t",
          otherwise ``None``.
    """
    # The data splits are from the codes of https://github.com/DFKI-Interactive-Machine-Learning/gazeRE-dataset
    everything = pd.read_csv(f"../Data/Feature_Data/{corpus}.csv")

    # Common to both corpora: keep rows where the two label columns coincide.
    labels_match = everything["label"] == everything["system_label"]
    agreed = everything.loc[labels_match]

    if corpus != "gREL":
        # Only gREL has a "gREL_label" column to split on.
        return everything, agreed, None

    agreed = agreed.loc[agreed["gREL_label"] != "t"]
    topical = everything.loc[everything["gREL_label"] == "t"]
    return everything, agreed, topical

In [None]:
def prepare_data(df: pd.DataFrame, feature_list: list) -> Tuple[list, list, list]:
    """
    Turn a feature table into plain Python lists.

    Returns ``(samples, labels, groups)`` where
        - samples: one list of feature values per row, columns in the order
          given by ``feature_list``;
        - labels: the "label" column cast to int, as a flat list;
        - groups: the "user_id" column, each value wrapped in its own
          one-element list (``[[id], [id], ...]``).
    """
    samples = df.loc[:, feature_list].to_numpy().tolist()
    labels = df["label"].astype(int).tolist()
    # Keep the one-element-list-per-row shape: callers flatten it themselves.
    groups = [[user] for user in df["user_id"].tolist()]
    return samples, labels, groups

In [None]:
def evaluate_results(true_labels: list, prediction_labels: list, prediction_proba: list):
    """
    Compute the binary-classification metrics for one test fold.

    Parameters
        true_labels: ground-truth labels (0 or 1).
        prediction_labels: hard predictions (0 or 1) for the same samples.
        prediction_proba: predicted probability of the positive class (1)
            for the same samples.

    Returns
        ``(f1, balanced_accuracy, tpr, fpr, log_loss, roc_auc, tn, fp, fn, tp)``.
        Metrics whose denominator is zero are reported as 0 (with a printed
        notice); ROC AUC is reported as NaN when only one class is present
        in ``true_labels``.
    """
    accuracy = balanced_accuracy_score(true_labels, prediction_labels)
    tn, fp, fn, tp = confusion_matrix(y_true=true_labels, y_pred=prediction_labels, labels=[0, 1]).ravel()

    # F1 has a zero denominator only when there are neither actual nor predicted positives.
    if (tp + fp) == 0 and (tp + fn) == 0:
        print("F1 Zero")
        f1_result = 0
    else:
        f1_result = f1_score(true_labels, prediction_labels, labels=[0, 1])

    # No actual positives in this fold -> true-positive rate is undefined.
    if tp + fn == 0:
        print("TPR Zero")
        tpr = 0
    else:
        tpr = tp / (tp + fn)

    # No actual negatives in this fold -> false-positive rate is undefined.
    if tn + fp == 0:
        print("FPR Zero")
        fpr = 0
    else:
        fpr = fp / (tn + fp)

    # Log loss must be computed from probabilities; feeding it the hard 0/1
    # predictions only measures the clipping constant times the error count.
    loss_score = log_loss(y_true=true_labels, y_pred=prediction_proba, labels=[0, 1])

    # ROC AUC is undefined when the fold holds a single class; report NaN
    # instead of calling the scorer on input it cannot handle.
    if (tp + fn) == 0 or (tn + fp) == 0:
        print("AUC Undefined")
        roc_auc = float("nan")
    else:
        roc_auc = roc_auc_score(y_true=true_labels, y_score=prediction_proba, average="weighted", labels=[0, 1])

    return f1_result, accuracy, tpr, fpr, loss_score, roc_auc, tn, fp, fn, tp

In [None]:
# Column names of the full feature set (17 features) used for training.
list_all_features = [
    "f_fixn_n",
    "f_fixn_dur_sum",
    "f_fixn_dur_avg",
    "f_fixn_dur_sd",
    "f_scan_distance_h",
    "f_scan_distance_v",
    "f_scan_distance_euclid",
    "f_scan_hv_ratio",
    "f_avg_sacc_length",
    "f_scan_speed_h",
    "f_scan_speed_v",
    "f_scan_speed",
    "f_box_area",
    "f_box_area_per_time",
    "f_fixns_per_box_area",
    "f_hull_area_per_time",
    "f_fixns_per_hull_area",
]

# Reduced feature set: only the two hull-area based columns.
list_two_features = [
    "f_hull_area_per_time",
    "f_fixns_per_hull_area",
]

def _build_classifier(which_model: str, seed: int):
    """Return an unfitted classifier (or imblearn pipeline) for the given model key."""
    if which_model == "RF":
        return RandomForestClassifier(n_estimators=100, random_state=seed)
    if which_model == "SVM_SMOTE":
        # SMOTE and SVC(probability=True) both draw random numbers; seed them
        # so that repeated runs give the same scores.
        return make_pipeline(SMOTE(random_state=seed), StandardScaler(),
                             SVC(kernel="rbf", C=1, probability=True, random_state=seed))
    if which_model == "RF_SMOTE":
        return make_pipeline(SMOTE(random_state=seed), StandardScaler(),
                             RandomForestClassifier(n_estimators=100, random_state=seed))
    # Any other key (the notebook passes "SVM") falls back to a plain SVC.
    return SVC(probability=True, random_state=seed)


def process_df_stratified_group_folds(df: pd.DataFrame, corpus: str, data_type: str, which_model: str):
    """
    Run 5-fold stratified, user-grouped cross-validation and save the per-fold metrics.

    The evaluation is repeated for each feature set (``list_all_features`` and
    ``list_two_features``). For every feature set one CSV is written to
    ``../Results/Traditional_ML/{corpus}_{which_model}_{data_type}_{n}_Features.csv``
    (``n`` = number of features) holding one row per fold. The file is
    rewritten after every fold so partial results survive an interruption.

    Parameters
        df: table with the feature columns plus "label" and "user_id".
        corpus: corpus name, used only in the output file name.
        data_type: subset name (e.g. "Agree"), used only in the output file name.
        which_model: "RF", "SVM_SMOTE" or "RF_SMOTE"; any other value selects
            a plain SVC.

    Returns
        None; results are written to disk.
    """
    result_columns = ["user_id", "f1_score_testing", "accuracy_testing", "tpr_testing", "fpr_testing",
                      "loss_testing", "auc_testing", "tn_testing", "fp_testing", "fn_testing", "tp_testing",
                      "feature_list"]
    seed = 256

    for feature_list in [list_all_features, list_two_features]:
        output_path = f'../Results/Traditional_ML/{corpus}_{which_model}_{data_type}_{len(feature_list)}_Features.csv'
        # Start from an empty history for every feature set so that each CSV
        # holds only the folds evaluated with that feature set.
        fold_records = []

        # Cross-validation loop for the testing data
        fold_splits = StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=seed)
        dataset_all, labels_all, groups_raw = prepare_data(df, feature_list)
        # prepare_data wraps every user id in a one-element list; the splitter
        # expects one group value per sample, so unwrap them.
        groups_all = [entry[0] if isinstance(entry, (list, tuple)) else entry for entry in groups_raw]

        for training_index, testing_index in fold_splits.split(dataset_all, labels_all, groups_all):
            # Prepare data for training
            training_samples = [dataset_all[k] for k in training_index]
            training_labels = [labels_all[k] for k in training_index]
            # Prepare data for testing
            testing_samples = [dataset_all[k] for k in testing_index]
            testing_labels = [labels_all[k] for k in testing_index]
            # Users held out in this fold, de-duplicated in order of appearance.
            testing_users = list(dict.fromkeys(groups_all[k] for k in testing_index))
            # str() so that numeric user ids can be joined as well.
            testing_users_joined = ', '.join(map(str, testing_users))

            # Train a fresh model for this fold
            model = _build_classifier(which_model, seed)
            model.fit(training_samples, training_labels)

            # Test the model
            predicted_labels = model.predict(testing_samples)
            positive_proba = model.predict_proba(testing_samples)[:, 1]  # keep probabilities for the positive outcome only
            (testing_f1_score, testing_accuracy, testing_tpr, testing_fpr, testing_loss,
             testing_auc, testing_tn, testing_fp, testing_fn, testing_tp) = evaluate_results(
                true_labels=testing_labels, prediction_labels=predicted_labels, prediction_proba=positive_proba)

            fold_records.append({
                "user_id": testing_users_joined,
                "f1_score_testing": testing_f1_score,
                "accuracy_testing": testing_accuracy,
                "tpr_testing": testing_tpr,
                "fpr_testing": testing_fpr,
                "loss_testing": testing_loss,
                "auc_testing": testing_auc,
                "tn_testing": testing_tn,
                "fp_testing": testing_fp,
                "fn_testing": testing_fn,
                "tp_testing": testing_tp,
                "feature_list": len(feature_list),
            })

            # Save the results gathered so far for this feature set
            pd.DataFrame(fold_records, columns=result_columns).to_csv(output_path, index=False)

# Stratified Groups

In [None]:
# Run every corpus / model combination.
#
# Corpus
#     - gREL
#     - GoogleNQ
# Which_Model
#     - RF
#     - SVM
#     - RF_SMOTE
#     - SVM_SMOTE
for Corpus in ["gREL", "GoogleNQ"]:
    # The data depends only on the corpus, so read it once per corpus
    # instead of once per model.
    scanpath_df_all, scanpath_df_agree, scanpath_df_topical = load_data(corpus=Corpus)
    for Which_Model in ["RF_SMOTE", "RF", "SVM", "SVM_SMOTE"]:
        print("Agree")
        process_df_stratified_group_folds(df=scanpath_df_agree, corpus=Corpus, data_type="Agree", which_model=Which_Model)
        # Only gREL provides a topical subset (load_data returns None otherwise).
        if Corpus == "gREL":
            print("Topical")
            process_df_stratified_group_folds(df=scanpath_df_topical, corpus=Corpus, data_type="Topical", which_model=Which_Model)
        print("All")
        process_df_stratified_group_folds(df=scanpath_df_all, corpus=Corpus, data_type="All", which_model=Which_Model)
        print(f"{Corpus} {Which_Model} Finished")
    print(f"{Corpus} Finished")