# **Package import**

In [None]:
import os
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

import random
random.seed(42)

import torch.nn as nn
import torch.optim as optim
import yaml
import joblib
import pandas as pd 
import numpy as np

from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_curve, roc_auc_score

import warnings
warnings.filterwarnings('ignore')

## VAE Model

In [None]:
class VAE(nn.Module):
    """Fully connected variational autoencoder.

    The encoder passes the input through two Linear+ReLU layers
    (``input_dim -> hid_dim_1 -> hid_dim_2``); two separate linear heads then
    produce the mean and the log-variance of the latent Gaussian
    (``latent_dim`` values each). The decoder mirrors the encoder
    (``latent_dim -> hid_dim_2 -> hid_dim_1 -> input_dim``) and ends with a
    plain linear layer, so reconstructions are unbounded real values.

    Args:
        input_dim: Number of features per input sample.
        hid_dim_1: Width of the first encoder / last decoder hidden layer.
        hid_dim_2: Width of the second encoder / first decoder hidden layer.
        latent_dim: Dimensionality of the latent space.
    """

    def __init__(self, input_dim, hid_dim_1, hid_dim_2, latent_dim):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hid_dim_1),
            nn.ReLU(),
            nn.Linear(hid_dim_1, hid_dim_2),
            nn.ReLU(),
        )
        # Heads that parameterize the approximate posterior q(z|x).
        self.mu = nn.Linear(hid_dim_2, latent_dim)
        self.log_var = nn.Linear(hid_dim_2, latent_dim)

        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hid_dim_2),
            nn.ReLU(),
            nn.Linear(hid_dim_2, hid_dim_1),
            nn.ReLU(),
            nn.Linear(hid_dim_1, input_dim),
        )

    def reparameterize(self, mu, log_var):
        """Sample ``z = mu + eps * std`` with ``eps ~ N(0, I)``.

        Args:
            mu: Latent means.
            log_var: Latent log-variances, same shape as ``mu``.

        Returns:
            A tensor with the same shape as ``mu``.
        """
        std = torch.exp(0.5 * log_var)
        # randn_like allocates the noise on std's own device and dtype, so the
        # module works wherever it is placed and needs no global `device`.
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        """Encode ``x``, sample a latent code and decode it.

        Returns:
            Tuple ``(decoded, mu, log_var)``.
        """
        encoded = self.encoder(x)
        mu = self.mu(encoded)
        log_var = self.log_var(encoded)
        z = self.reparameterize(mu, log_var)
        decoded = self.decoder(z)
        return decoded, mu, log_var

def loss_function(recon_x, x, mu, log_var):
    """Return the VAE training loss: reconstruction error plus KL divergence.

    Both terms are summed over the whole batch so they live on the same scale.
    The previous version averaged the squared error over every element while
    summing the KL term, which made the KL term dominate the objective.

    Args:
        recon_x: Decoder output, same shape as ``x``.
        x: Original input batch.
        mu: Latent means produced by the encoder.
        log_var: Latent log-variances produced by the encoder.

    Returns:
        Scalar tensor ``sum((recon_x - x)^2) + KL(q(z|x) || N(0, I))``.
    """
    recon_loss = nn.MSELoss(reduction="sum")(recon_x, x)
    # Closed-form KL divergence between N(mu, exp(log_var)) and N(0, I).
    kl_loss = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return recon_loss + kl_loss

def generate_new_data(vae_model, latent_dim, num_samples=100):
    """Decode random latent vectors into synthetic samples.

    Latent codes are drawn from a standard normal distribution and passed
    through ``vae_model.decoder``. The model is switched to eval mode and is
    left in that mode.

    Args:
        vae_model: Module exposing a ``decoder`` sub-module.
        latent_dim: Size of the latent vector the decoder expects.
        num_samples: Number of synthetic samples to generate.

    Returns:
        NumPy array with one generated sample per row.
    """
    vae_model.eval()
    # Sample on the model's own device/dtype instead of relying on a global
    # `device`, so the helper works for any model placement.
    first_param = next(vae_model.parameters(), None)
    with torch.no_grad():
        if first_param is None:
            z = torch.randn(num_samples, latent_dim)
        else:
            z = torch.randn(
                num_samples,
                latent_dim,
                device=first_param.device,
                dtype=first_param.dtype,
            )
        generated_data = vae_model.decoder(z)
        return generated_data.cpu().numpy()

In [None]:
# Remember the working directory; the feature, model and result paths below are built from it.
current_path = os.getcwd()
print(f"Current path: {current_path}")

In [None]:
# Reference user groups: the base group and its four leave-one-out subsets
basegroup_list = ['P21', 'P22', 'P23', 'P24']  # 4th, 10th, 16th, 22nd
subgroup1_list = [p for p in basegroup_list if p != 'P21']
subgroup2_list = [p for p in basegroup_list if p != 'P22']
subgroup3_list = [p for p in basegroup_list if p != 'P23']
subgroup4_list = [p for p in basegroup_list if p != 'P24']

# Directory that holds the feature dataframes and the feature-category file
feature_path = os.path.join(current_path, 'feature_data_csv(24)')

# Mapping from feature category name to the feature columns in that category.
# NOTE(review): yaml.full_load accepts more than plain data; yaml.safe_load is
# presumably sufficient for this file - confirm its contents before switching.
with open(os.path.join(feature_path, 'Feature_category.yaml'), 'r') as f:
    feature_cat = yaml.full_load(f)

# Directory where the trained models are stored
os.makedirs(os.path.join(current_path, 'Trainined_model'), exist_ok=True)

# Directory where the result files are stored
save_folder = 'Result-EER'
os.makedirs(os.path.join(current_path, save_folder), exist_ok=True)

## Parameters

In [None]:
# Tasks to evaluate; remove an entry to skip that task
task_list = ['grabbing', 'pointing', 'typing']

# Training scenarios to evaluate
scenario_list = ['Scen1', 'Scen2']

# Classifier models to evaluate, keyed by a short model name
model_list = dict(
    RF=RandomForestClassifier(random_state=42),
    SVM=SVC(probability=True, random_state=42),
    LR=LogisticRegression(random_state=42),
    # GradientBoost=GradientBoostingClassifier(random_state=42),
    # DecisionTree=DecisionTreeClassifier(random_state=42),
)

# Reference user groups to evaluate, keyed by group name
ref_list = dict(
    BaseGroup=basegroup_list,
    # SubGroup1=subgroup1_list,
    # SubGroup2=subgroup2_list,
    # SubGroup3=subgroup3_list,
    # SubGroup4=subgroup4_list,
)

# Data augmentation techniques to evaluate
data_aug_list = [
    'NoAug',
    # 'SMOTE',
    # 'VAE',
]

# Feature categories whose features are used
feature_category_list = ['movement', 'spatial', 'orientation', 'interaction']

## Main

In [None]:
def _split_user_data(users_df, user, train_study):
    """Split the non-reference data into train/test frames for one genuine user.

    "Scen1" trains on the user's "Study1" rows and tests on all other studies;
    "Scen2" trains on everything except "Study3" and tests on "Study3".

    Returns:
        Tuple ``(train_normal_df, test_normal_df, test_illegal_df)``: the
        user's training rows, the user's test rows, and the other users' rows
        from the test studies.

    Raises:
        ValueError: If ``train_study`` is not a known scenario.
    """
    if train_study == "Scen1":
        is_train = users_df["Study"] == "Study1"
    elif train_study == "Scen2":
        is_train = users_df["Study"] != "Study3"
    else:
        raise ValueError(f"Unknown training scenario: {train_study!r}")
    is_user = users_df["Name"] == user
    is_test = ~is_train
    return (
        users_df[is_user & is_train],
        users_df[is_user & is_test],
        users_df[~is_user & is_test],
    )


def _augment_with_vae(train_normal_df, feature_list, latent_dim=16, epochs=1000,
                      lr=0.001, num_samples=330):
    """Train a VAE on the user's samples and append ``num_samples`` generated rows."""
    train_tensor = torch.FloatTensor(train_normal_df[feature_list].to_numpy()).to(device)
    vae = VAE(input_dim=train_tensor.shape[1], hid_dim_1=128, hid_dim_2=64,
              latent_dim=latent_dim).to(device)
    optimizer = optim.Adam(vae.parameters(), lr=lr)

    # Full-batch training: one optimizer step per epoch.
    for _ in range(epochs):
        optimizer.zero_grad()
        reconstructed, mu, log_var = vae(train_tensor)
        loss = loss_function(reconstructed, train_tensor, mu, log_var)
        loss.backward()
        optimizer.step()

    generated = generate_new_data(vae, latent_dim=latent_dim, num_samples=num_samples)
    generated_df = pd.DataFrame(generated, columns=feature_list)
    return pd.concat([train_normal_df[feature_list], generated_df], ignore_index=True)


def _augment_with_smote(train_normal_df, reference_df, feature_list):
    """Run SMOTE on user-vs-reference data and return the user-class rows."""
    X_pos = train_normal_df[feature_list]
    X_neg = reference_df[feature_list]
    X_all = pd.concat([X_pos, X_neg], ignore_index=True)
    y_all = pd.Series([True] * len(X_pos) + [False] * len(X_neg))

    X_resampled, y_resampled = SMOTE(random_state=42).fit_resample(X_all, y_all)

    resampled_df = pd.DataFrame(X_resampled, columns=feature_list)
    resampled_df["label"] = np.asarray(y_resampled)
    # Only the (possibly oversampled) genuine-user rows are kept; the reference
    # rows are added back unchanged when the classifier is trained.
    return resampled_df[resampled_df["label"] == True]


def _augment_training_data(aug, train_normal_df, reference_df, feature_list):
    """Apply the augmentation named by ``aug`` to the genuine user's training rows.

    Raises:
        ValueError: If ``aug`` is not "NoAug", "SMOTE" or "VAE". The run is
            aborted rather than continuing unaugmented, which would store
            results under a misleading augmentation name.
    """
    if aug == "NoAug":
        return train_normal_df
    if aug == "VAE":
        return _augment_with_vae(train_normal_df, feature_list)
    if aug == "SMOTE":
        return _augment_with_smote(train_normal_df, reference_df, feature_list)
    raise ValueError(f"Unknown data augmentation technique: {aug!r}")


def _equal_error_rate(y_true, y_score):
    """Return the false-acceptance rate at the ROC point where FAR and FRR are closest."""
    far, tpr, _ = roc_curve(y_true, y_score)
    frr = 1 - tpr
    return far[np.nanargmin(np.abs(far - frr))]


feature_list = [feat for cate in feature_category_list for feat in feature_cat[cate]]
# NOTE(review): 'Referenec_set' is misspelled but is kept as-is because it is
# the header written to All-Result.csv and may be read by downstream code.
all_result_col = ['Model', 'Train_study', 'Referenec_set', 'Aug']
all_result_col += [f"{t}_{s}" for s in ["median", "iqr"] for t in task_list]
result_column = ['Task', 'Name', 'EER']

# The feature tables do not depend on scenario/model/reference/augmentation,
# so each one is read once instead of once per configuration.
task_frames = {
    task: pd.read_csv(os.path.join(feature_path, f"{task}_Dataframe_all.csv"))
    for task in task_list
}

all_rows = []
all_df = pd.DataFrame(all_rows, columns=all_result_col)

for train_study in scenario_list:
    print(f"\n[[Train Study: {train_study}]]")
    for m_name, model in model_list.items():
        print(f"Binary Model [{m_name}]")
        for ref_name, ref in ref_list.items():
            print(f"  [{ref_name}]")
            for aug in data_aug_list:
                print(f"  ------ {aug} ------")

                # One [task, user, EER] row per evaluated user.
                result_rows = []

                for task in task_list:
                    Dataframe_all = task_frames[task]

                    # Negative training class: rows of the selected reference users.
                    reference_df = Dataframe_all[Dataframe_all["Name"].isin(ref)]
                    # Evaluated users always exclude the full base group, even
                    # when only a subgroup is used as the reference set.
                    Dataframe_Users = Dataframe_all[~Dataframe_all["Name"].isin(basegroup_list)]
                    user_list = Dataframe_Users["Name"].unique().tolist()

                    for user in user_list:
                        # (0) ========== Data Preparation ==========
                        train_normal_df, test_normal_df, test_illegal_df = _split_user_data(
                            Dataframe_Users, user, train_study
                        )

                        # (1) ========== Data Augmentation ==========
                        train_normal_df = _augment_training_data(
                            aug, train_normal_df, reference_df, feature_list
                        )

                        # (2) ========== Training Binary Classifier Model ==========
                        X_train_t = train_normal_df[feature_list]
                        X_train_f = reference_df[feature_list]
                        X_train = pd.concat([X_train_t, X_train_f], ignore_index=True)
                        y_train = pd.Series([True] * len(X_train_t) + [False] * len(X_train_f))

                        stdscaler = StandardScaler()
                        X_train = stdscaler.fit_transform(X_train)
                        model.fit(X_train, y_train)

                        # NOTE(review): only the classifier is persisted; the fitted
                        # scaler is not saved, so the pickle alone cannot score raw
                        # features.
                        joblib.dump(model, os.path.join(current_path, "Trainined_model", f"{m_name}_{task}_{user}_{ref_name}_{aug}_{train_study}.pkl"))

                        # (3) ========== Testing Binary Classifier Model ==========
                        X_test_t = test_normal_df[feature_list]
                        X_test_f = test_illegal_df[feature_list]
                        X_test = pd.concat([X_test_t, X_test_f], ignore_index=True)
                        y_test = pd.Series([True] * len(X_test_t) + [False] * len(X_test_f)).values

                        X_test = stdscaler.transform(X_test)
                        # Column 1 is the probability of the True (genuine user) class.
                        y_test_prob = model.predict_proba(X_test)[:, 1]

                        eer = _equal_error_rate(y_test, y_test_prob)
                        result_rows.append([task, user, round(eer, 4)])

                result_df = pd.DataFrame(result_rows, columns=result_column)

                eer_by_task = result_df.groupby('Task')['EER']
                median_eer = eer_by_task.median().round(4)
                iqr_eer = eer_by_task.apply(lambda x: round(np.percentile(x, 75) - np.percentile(x, 25), 4))
                for t_name in result_df['Task'].unique().tolist():
                    print(f"    {t_name:<8} median EER: {median_eer[t_name]:.4f} | IQR: {iqr_eer[t_name]:.4f}")

                # Key each value by its column name so it always lands under the
                # matching header. The previous positional list interleaved
                # median/IQR per task while the header lists all medians first.
                summary = {
                    'Model': m_name,
                    'Train_study': train_study,
                    'Referenec_set': ref_name,
                    'Aug': aug,
                }
                for t in task_list:
                    summary[f"{t}_median"] = median_eer[t]
                    summary[f"{t}_iqr"] = iqr_eer[t]
                all_rows.append(summary)
                # Rebuilt after every configuration so partial results stay
                # available if a later configuration fails.
                all_df = pd.DataFrame(all_rows, columns=all_result_col)

                result_df.to_csv(os.path.join(current_path, save_folder, f"Result-{m_name}-{ref_name}-{aug}-{train_study}.csv"), index=False)

all_df.to_csv(os.path.join(current_path, save_folder, "All-Result.csv"), index=False)