## This file includes all data analysis done with the mouse feature datasets

### Package imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# sk learn modules

# pipeline module
from sklearn.pipeline import Pipeline

# model validation modules
from sklearn.model_selection import permutation_test_score
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_validate

# data preprocessing modules
from sklearn.preprocessing import RobustScaler

# import classifier algorithms
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier

# import regression algorithms
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor


# Create custom classes for data transformation in the sklearn pipeline
from sklearn.base import TransformerMixin, BaseEstimator


import pingouin as pg
# Vallat, R. (2018). Pingouin: statistics in Python. Journal of Open Source Software, 3(31), 1026,
# https://doi.org/10.21105/joss.01026

### Import the mouse task analysis dataset

#### Chose the file of one mouse task that you want to analyse

In [None]:
# read the dataset
# replace mouse_task_file_name with the name of your chosen dataset for mouse feature analysis
dataset = pd.read_csv("mmouse_task_file.csv", sep="\t", encoding="utf-8", index_col="ID")

# choose the task_name of the file
# Important: do not change the task_name string, it must match the string of the task name appended to each mouse
# feature in the file, e.g. mean_speed_PointClick
task_name = "PointClick"
# task_name = "DragDrop"
# task_name = "Slider"
# task_name = "FollowCircle"

### Before the analysis, do some transformations to the task names for later steps

In [None]:
# rename the non_features to be handable by the custom transformer (bad solution, but the "fastest" at the point it
# was done --> The data need the same name structure to be able to transform it correctly in later analysis steps
dataset.rename(columns={
    "Pr_arousal": "arousal_Pr_" + task_name,
    "Pr_valence": "valence_Pr_" + task_name,
    "Co_valence": "valence_Con_" + task_name,
    "Co_arousal": "arousal_Con_" + task_name,
    "Pr_stress": "stress_Pr_" + task_name,
    "Co_stress": "stress_Con_" + task_name,
    "Pr_MDBF_GS": "MDBF_GS_Pr_" + task_name,
    "Pr_MDBF_RU": "MDBF_RU_Pr_" + task_name,
    "Pr_MDBF_WM": "MDBF_WM_Pr_" + task_name,
    "Co_MDBF_GS": "MDBF_GS_Con_" + task_name,
    "Co_MDBF_RU": "MDBF_RU_Con_" + task_name,
    "Co_MDBF_WM": "MDBF_WM_Con_" + task_name
}, inplace=True)

# get the names of all input features
all_names = list(dataset.columns)

# get a list with all non mouse features
non_features = ["arousal_Pr_" + task_name, "valence_Pr_" + task_name, "valence_Con_" + task_name,
                "arousal_Con_" + task_name, "MDBF_GS_Pr_" + task_name, "MDBF_RU_Pr_" + task_name,
                "MDBF_WM_Pr_" + task_name, "MDBF_GS_Con_" + task_name, "MDBF_RU_Con_" + task_name,
                "MDBF_WM_Con_" + task_name, "stress_Pr_" + task_name, "stress_Con_" + task_name,
                "jahr", "sex", "schule", "condition"]

# get a list with the names of the mouse features and the name of the condition and practice phase string
mouse_feature_names = []
condition_name_string, practice_name_string = None, None
for name in all_names:

    if name not in non_features:

        if "Pr_" in name:
            mouse_feature_names.append(name[:name.find("Pr_")])
            condition_name_string = name[name.find("Pr_"):]
        elif "Con_" in name:
            practice_name_string = name[name.find("Con_"):]

### Get some descriptive statistics about the mouse usage features

In [7]:
# get the descriptive stats of all independent variables variables

# get the data from the high stress condition, drop the non dependent variables, get the descriptive stats and sort
# it by column name
# entire sample
descriptive_hs_data = dataset.loc[dataset["condition"] == 0].drop(non_features, axis=1).describe().sort_index(axis=1)
descriptive_ls_data = dataset.loc[dataset["condition"] == 1].drop(non_features, axis=1).describe().sort_index(axis=1)

# save the descriptive stats as an excel file
with pd.ExcelWriter("Descriptive_Data_Mouse_Usage_" + task_name + ".xlsx") as writer:
    descriptive_hs_data.to_excel(writer, sheet_name="High_Stress_Condition")
    descriptive_ls_data.to_excel(writer, sheet_name="Low_Stress_Condition")

#### create histograms of the variables

In [None]:
# plot the mouse usage features

for i, col in enumerate(dataset.columns):
    if col not in non_features:
        plt.figure(i)
        sns.distplot(dataset[col], kde=False)
        plt.show()

In [None]:
# plot the dependent variables
for i, col in enumerate(non_features):
    plt.figure(i)
    sns.distplot(dataset[col], kde=False)
    plt.show()

#### create a Pairplot to plot different variables against each other

In [None]:
# a plot with all possible variables is too much, therefore plot selected variables
# an example:
some_features = [i + condition_name_string for i in mouse_feature_names[:1]]
some_features.extend(["valence_Con_" + task_name, "arousal_Con_" + task_name])
data_to_plot = dataset.loc[:, some_features]

sns.pairplot(data_to_plot)

plt.show()

## Calculate mixed anovas for each mouse usage feature

### same procedure as in the manipulation check, but with the mouse usage data

In [10]:
# prepare the dataset for the ANOVA

# helper to split the prepare the dataset for the anova and change the data from a wide to a long format
def create_anova_df(df, variable, condition, practice):

    # assign a subject number to each subject
    df["subject"] = np.arange(len(df))
    # replace the condition number with a string
    df["condition"].replace({0: "HS", 1: "LS"}, inplace=True)

    # change the format
    df = pd.melt(df, id_vars=["subject", "condition"], value_vars=[variable + practice, variable + condition],
                 var_name="Pr-Con", value_name=variable)

    # change the name of the column to practice and condition for clearer reading of the results
    df["Pr-Con"].replace({variable + practice: "Pr", variable + condition: "Con"}, inplace=True)

    return df

In [11]:
# Create PointPlots to visualize the manipulation check results
# Tutorial here: https://raphaelvallat.com/pingouin.html
def plot_anova(data, variable):

    # data visualization
    sns.set()
    sns.pointplot(data=data, x="Pr-Con", y=variable, hue="condition", dodge=True, markers=['o', 's'],
                  capsize=.1, errwidth=1, palette='colorblind')

    plt.title("Pointplot with " + variable)
    # show the plot
    plt.show()
    # save the plot
    # plt.savefig('ANOVA' + variable + '.png')

In [12]:
# calculate the anova results

def calc_anova(data, variable):

    # calculate the anova
    aov = pg.mixed_anova(dv=variable, within="Pr-Con", between="condition", subject="subject", data=data)
    print("Repeated measures ANOVA with " + variable)
    pg.print_table(aov)
    posthocs = pg.pairwise_ttests(dv=variable, within='Pr-Con', between='condition',
                                  subject='subject', data=data)
    pg.print_table(posthocs)
    print("\n" + "\n")

    # return the anova and post hoc dataframes with an added index layer that is the variable name
    return pd.concat([aov], keys=[variable]), pd.concat([posthocs], keys=[variable])

In [13]:
# helper function to get a dataframe with all anova results per task
def get_anova_results(dataframe, variable_list, condition, practice):

    # initialize dataframes
    anova_df = pd.DataFrame()
    posthoc_df = pd.DataFrame()

    # loop the variable list and add the result dataframes to the initialized dataframes
    for variable in variable_list:

        anova_data = create_anova_df(dataframe, variable, condition, practice)

        # plot the anova
        # plot_anova(anova_data, variable)
        # calc the anova
        anova, posthoc = calc_anova(anova_data, variable)

        anova_df = pd.concat([anova_df, anova])
        posthoc_df = pd.concat([posthoc_df, posthoc])

    return anova_df, posthoc_df

In [None]:
# Calculate the ANOVA for all mouse features and save the results in a dataframe
# entire sample
anova_results_df, anova_posthocs_df = get_anova_results(dataset, mouse_feature_names, condition_name_string,
                                                        practice_name_string)

# save the descriptive stats as an excel file
with pd.ExcelWriter("ANOVAS_Mouse_Parameters_" + task_name + ".xlsx") as writer:
    anova_results_df.to_excel(writer, sheet_name="mixed_ANOVA_Results")
    anova_posthocs_df.to_excel(writer, sheet_name="POSTHOC_Results")

## Machine Learning Analysis

### Helper Class for custom variable transformation in the sklearn pipeline

In [16]:
class CustomStandardization(BaseEstimator, TransformerMixin):

    """
    does custom transformation on the condition data by using the practice data to take individual differences into
    account
    1. use the difference between the condition and practice trial
    2. only use the condition trial
    """

    def __init__(self, method="ignore_practice"):
        self.method = method

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):

        feat_names = []
        cond_name_string, pra_name_string = None, None
        for col in list(X.columns):
            if "Pr_" in col:
                feat_names.append(col[:col.find("Pr_")])
                pra_name_string = col[col.find("Pr_"):]
            elif "Con_" in col:
                cond_name_string = col[col.find("Con_"):]

        if self.method == "ignore_practice":

            # get only the names of the condition columns
            condition_cols = [col for col in list(X.columns) if "Con_" in col]

            return X.loc[:, condition_cols]

        elif self.method == "Difference":

            # calculate the difference between the condition phase and baseline data
            diff_df = pd.DataFrame()
            for feat in feat_names:
                diff_df["Diff_" + feat] = X[feat + cond_name_string] - X[feat + pra_name_string]

            return diff_df

        else:

            print("Chosen method " + self.method + " does not exist. Defaulted to ignore_practice")
            return X

## Condition Classification Analysis

### Permuation test helper functions

#### calculate the results of 5-fold cross validation followed by a permutation test

In [19]:
# calculates a k-fold cross val score and compares the mean performance score with a distribution of n models that
# were trained with permutated class labels
# more information can be found here:
# https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.permutation_test_score.html
def ml_permutation_test(iv, dv, standard_method, algorithm, procedure_title):

    # save the results of the permutation test
    results = {}

    # make a pipeline that does the preprocessing and outputs the cross validation score
    pipeline = Pipeline([
        ("custom_transformation", CustomStandardization(method=standard_method)),
        ("robust_scaler", RobustScaler()),
        ("clf", algorithm)
    ])

    # create a stratified kFold Splitter
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # get the mean cross validation score, the permuation scores and the
    # p-value of the permutation test
    score, permutation_scores, pvalue = permutation_test_score(
        pipeline, iv, dv, scoring="accuracy", cv=cv, n_permutations=500, n_jobs=1)

    # get the statistical values of the permutation
    perm_score = np.mean(permutation_scores)
    perm_std = np.std(permutation_scores)
    # Get the significance threshold (the classification model must be better than 95% of the permutated models)
    sig_tresh = np.percentile(permutation_scores, 95.0)

    results["Mean_score"] = np.round(score, 2) * 100
    # results["Std_score"] = np.round(std, 2) * 100
    results["p_value"] = np.round(pvalue, 5)
    results["Mean_Permutation_score"] = np.round(perm_score, 2) * 100
    results["Std_Permutation_score"] = np.round(perm_std, 2) * 100
    results["Sig_Treshold"] = sig_tresh * 100

    plot_permutation_test_results(permutation_scores * 100,
                                  score * 100, np.round(pvalue, 3),
                                  np.round(sig_tresh, 2) * 100, procedure_title)

    return results

#### Plot the permutation test results

In [18]:
# plot the results of the permutation procedure
def plot_permutation_test_results(permutation_scores, score, pvalue, p_tresh, plot_title):
    # Create and save a plot of the procedure
    plt.hist(permutation_scores, bins=np.arange(np.min(permutation_scores), np.max(permutation_scores), 1),
             alpha=0.6, rwidth=1.0, edgecolor="black", color="Blue")
    ylim = plt.ylim()
    plt.plot(2 * [score], ylim, "--r", linewidth=3,
             label="Mean Accuracy: %s, p = %s" % (np.round(np.mean(score), 2), pvalue))
    plt.plot(2 * [p_tresh], ylim, "--b", linewidth=2, label="Sig. Threshold: " "%s" % p_tresh)
    plt.plot(2 * [np.mean(permutation_scores)], ylim, "--g", linewidth=3,
             label="Mean Permutation Score: " "%s" % (np.round(np.mean(permutation_scores), 2)))
    plt.ylim(ylim)
    plt.xlabel("Accuracy Score")
    plt.ylabel("Frequency")
    plt.title(plot_title)
    plt.legend(loc="upper left")
    
    # show the plot
    plt.show()
    
    # save the plot
#     plt.savefig("Permutation_Test_" + plot_title + ".png")
    
    plt.close()

### get the classification model options

In [20]:
# Create a dictionary with all the different model options that the models can be build and tested in a loop
# the options are:
#   - Using the logistic regression, the support vector machine or the random forest algorithm
#   - Control for baseline mouse usage or ignore the practice data
# other options can be added or options can be removed

model_options = {
    "Alg": {
        "LogRegr": LogisticRegression(solver="liblinear"),
        "SVM": SVC(gamma="scale"),
        "RF": RandomForestClassifier(n_estimators=50)},
    "Pr_inclusion": {
        "Ignore": "ignore_practice",
        "Diff": "Difference"}
}

### Do the permuation test

In [None]:
# Permuation Test (caution, this procedure takes long to run, because every classification option is additionally
# done 500 times on permutated class labels)

# get a dict to save the classifier results:

permutation_results = {}

# get the predictors
predictors = dataset.drop(non_features, axis=1)
class_label = dataset["condition"]

# loop over the classification options and save the results of the classification
for option in model_options["Pr_inclusion"]:
    for alg in model_options["Alg"]:
        print("Permutation test classification with Mouse Data and " + option + alg)
        permutation_results[(option, alg)] = \
                    ml_permutation_test(predictors,
                                        class_label,
                                        model_options["Pr_inclusion"][option],
                                        model_options["Alg"][alg],
                                        "Permutation Test with " + option + " " + alg)

# create a multiindexed dataframe from the results dictionary
permutation_results_df = pd.DataFrame(permutation_results).T

# Save the results
permutation_results_df.to_excel("Classification_Permutation_Test_Results_" + task_name + ".xlsx")

## Regression on Valence and Arousal

### 5-fold-cross validation helper function

In [None]:
# do a k-fold cross validation for regression
def k_fold_cross_val_regression(iv, dv, standard_method, algorithm):

    # save the results
    results = {}

    # make a pipeline that does the preprocessing and outputs the cross validation score
    pipeline = Pipeline([
        ("custom_transformation", CustomStandardization(method=standard_method)),
        ("robust_scaler", RobustScaler()),
        ("clf", algorithm)
    ])

    # get a cross validation splitter
    cv = KFold(n_splits=5, shuffle=True)

    metrics = {"nMAE": "neg_mean_absolute_error", "R2_score": "r2"}

    scores = cross_validate(pipeline, iv, dv, cv=cv, scoring=metrics, return_train_score=True)

    for key in scores:
        results["mean_" + key] = np.round(np.mean(scores[key]), 5)
        results["std_" + key] = np.round(np.std(scores[key]), 5)

    return results

### get the regression model options

In [None]:
# Create a dictionary that holds all information about the different options of the regression analysis
# (Similar to the classification)

# Regression on valence and arousal
sam_regression_options = {
    "Alg": {
        "LinReg": LinearRegression(),
        "RFReg": RandomForestRegressor(n_estimators=50),
        "SVR": SVR(gamma="auto")}
}

### Helper Function to save the regression results

In [None]:
# the item name and information about baseline inclusion are passed as parameters to the helper function
def get_self_report_regression_results(item_name, handle_practice):
    
    sam_regression_results = {}

    # get the predictors (mouse usage features)
    predictors = dataset.drop(non_features, axis=1)
    # get the self-report item
    if "Con" in item_name:
        # only use the condition valence/arousal score
        dependent_variable = dataset[item_name]
    else:
        # use the valence/arousal difference score between the condition and practice phase
        dependent_variable = dataset[item_name + "_Con_" + task_name] \
                                 - dataset[item_name + "_Pr_" + task_name]
    
    # loop over the regression options
    for alg in sam_regression_options["Alg"]:
        print("Regression with 5foldCV and " + item_name + alg)
        sam_regression_results[(item_name, alg)] = \
                        k_fold_cross_val_regression(iv=predictors,
                                                    dv=dependent_variable,
                                                    standard_method=handle_practice,
                                                    algorithm=sam_regression_options["Alg"][alg])


    # create a multiindexed dataframe from the results dictionary
    sam_regression_results_df = pd.DataFrame(sam_regression_results).T
    
    return sam_regression_results_df

### Do the analysis for each dependent variable and save the results

##### Regression without including the baseline of the practice phase

In [None]:
# SAM prediction
# take the sam items of the condition phase
sam_items = ["valence_Con_" + task_name, "arousal_Con_" + task_name]

# intiate an empty dataframe
self_report_reg_no_baseline = pd.DataFrame()

for item in sam_items:
    # calculate the results of the regression analysis
    results = get_self_report_regression_results(item_name=item, handle_practice="ignore_practice")
    # save them into the dataframe
    self_report_reg_no_baseline = pd.concat([self_report_reg_no_baseline, results])

# save the dataset
self_report_reg_no_baseline.to_excel("Reggression_Results_No_Baseline" + task_name + ".xlsx")

##### Regression that includes the baseline of the practice phase

In [None]:
# also use the practice data and predict the difference score between the practice and actual condition
# get the dataset

# The MDBF Items could be added here (to make the table even bigger :))
sam_items = ["valence", "arousal"]

# save the results
self_report_reg_baseline = pd.DataFrame()

for item in sam_items:
    # get the results
    results = get_self_report_regression_results(item_name=item, handle_practice="Difference")
    # save the results in a dataframe
    self_report_reg_baseline = pd.concat([self_report_reg_baseline, results])

self_report_reg_baseline.to_excel("Difference_Score_Reggression_Results_" + task_name + ".xlsx")

## Task versus Task classification

In [None]:
# Point-and-Click Task versus Drag-and-Drop Task

# import and read the datasets

# Replace the dataset file names with your dataset filenames (and directory)
pointclick_data = pd.read_csv("point_click_task_dataset.csv", sep="\t", encoding="utf-8", index_col="ID")
dragdrop_data = pd.read_csv("drag_drop_task_dataset.csv", sep="\t", encoding="utf-8", index_col="ID")

In [None]:
# get the condition column names (only of the condition, not the practice)
pointclick_cols = [i for i in list(pointclick_data.columns) if "Con_PointClick" in i]
dragdrop_cols = [i for i in list(dragdrop_data.columns) if "Con_DragDrop" in i]

# get a list with the new shared column names (name doesnt include the task name anymore, only the feature name, e.g.
# mouse_speed_mean)
renamed_cols = [i[:-len("Con_PointClick")] for i in pointclick_cols]

In [None]:
# get the dataframes with the relevant columns, rename the column and add a "task indicator" (classification prediction)
pointclick_data = pointclick_data.loc[:, pointclick_cols]
pointclick_data.columns = renamed_cols
pointclick_data["task"] = 0

dragdrop_data = dragdrop_data.loc[:, dragdrop_cols]
dragdrop_data.columns = renamed_cols
dragdrop_data["task"] = 1

# merge the two dataframes to one
pc_vs_dd = pd.concat([pointclick_data, dragdrop_data], axis=0)

# check if everything worked as intended
print("Point-Click-Data shape:", pointclick_data.shape)
print("Drag-Drop-Data shape:", dragdrop_data.shape)
print("Merged Dataset shape", pc_vs_dd.shape)

In [None]:
# get the predictors and the dependent variable
X = pc_vs_dd.drop(["task"], axis=1)
Y = pc_vs_dd["task"]

# create a pipeline with the scaler and the machine learning algorithm (here support vector classification is used
# exmeplarly)
pipeline = Pipeline([
        ("robust_scaler", RobustScaler()),
        ("clf", SVC(gamma="scale"))
    ])

# get the scores from 5-fold-cross validation
task_vs_task_scores = cross_val_score(pipeline, X, Y, cv=5)

print("\n")
print("-----------")
print("Mean Accuracy in the Task Versus Task Classification (5-fold-cv): ", np.mean(task_vs_task_scores))