# Settings

## Constant

In [32]:
import os

PATH_DATA='./TD2D'
sensor_data = ['PPG.csv', 'ECG.csv', 'EDA.csv', 'HR.csv', 'diameter.csv', 'fixations.csv', 'gazePositions.csv']

## Utility Functions

In [27]:
import pandas as pd
import numpy as np

# Dataset Overview

## preproccessing

In [28]:
label = pd.DataFrame()

with os.scandir(PATH_DATA) as root:
    for pid in root: #D1~D50
        driver = os.path.basename(pid)[1:]

        if driver.__contains__('idea'):
            continue
        elif driver.__contains__('xlsx'):
            continue
        elif driver.__contains__('DS_Store'):
            continue

        if os.path.isdir(pid):
            with os.scandir(pid) as p:
                label_driver = pd.DataFrame()
                for task in p:  # e.g., audiobook listening
                    if os.path.isdir(task):
                        with os.scandir(task) as t:
                            secondary_task = os.path.basename(task)
                            #참조할 cases information csv file
                            cases_information = pd.read_csv(os.path.abspath(task)+'/takeoverScenarioMeasurements.csv', sep=',')
                            #label
                            label_task = pd.DataFrame([[driver, secondary_task]])
                            label_task = pd.concat([label_task, cases_information[['takeoverResult','reactionTime', 'NASA-TLX']]], axis=1, ignore_index=True)
                            label_driver = pd.concat([label_driver, label_task], axis=0, ignore_index=True)
                label = pd.concat([label, label_driver], axis=0, ignore_index=True)
label.columns = ['driver', 'secondaryTask', 'takeoverResult', 'reactionTime', 'NASA_TLX']


## Descriptive Statistics

In [30]:

# Define secondary task categories
def categorize_task(task):
    if task == 'baseline':
        return '<Without secondary task>'
    elif task in ['0back', '1back', '2back', 'audiobook listening', 'auditory gaming', 'auditory texting']:
        return '<Auditory tasks>'
    elif task in ['ebook reading', 'texting', 'gaming']:
        return '<Visual tasks>'
    return '<Unknown>'

label['Category'] = label['secondaryTask'].apply(categorize_task)

# Calculate success ratio
success_ratios = label.groupby('secondaryTask').apply(lambda x: (x['takeoverResult'] == 'Success').mean() * 100)

# Calculate means and standard deviations for reaction time and workload
stats = label.groupby('secondaryTask').agg(
    reactionTime_Mean=('reactionTime', 'mean'),
    reactionTime_SD=('reactionTime', 'std'),
    workload_Mean=('NASA_TLX', 'mean'),
    workload_SD=('NASA_TLX', 'std')
)

# Merge success ratios
stats = stats.join(success_ratios.rename('successRatio'))

# Add category information to the grouped statistics
stats = stats.reset_index().merge(label[['secondaryTask', 'Category']].drop_duplicates(), on='secondaryTask').set_index('secondaryTask')

# Reorder columns
stats = stats[['Category', 'successRatio', 'reactionTime_Mean', 'reactionTime_SD', 'workload_Mean', 'workload_SD']]

# Set custom order for secondary tasks
task_order = [
    'baseline', '0back', '1back', '2back', 'audiobook listening',
    'auditory texting', 'auditory gaming', 'ebook reading', 'texting', 'gaming'
]
stats.index = pd.CategoricalIndex(stats.index, categories=task_order, ordered=True)
stats = stats.sort_index()

# Define a function to format the output like the provided table
def format_stats_table_with_headers(label):
    table = "Secondary Task                  Success Ratio (%)   Reaction Time (ms)        Perceived Workload\n"
    table += "                                                    Mean       SD             Mean     SD\n"
    current_category = None
    for index, row in label.iterrows():
        if row['Category'] != current_category:
            current_category = row['Category']
            table += f"{current_category}\n"
        table += f"{index:<30} {row['successRatio']:>5.1f}                {row['reactionTime_Mean']:>6.2f}     {row['reactionTime_SD']:>6.2f}        {row['workload_Mean']:>6.2f}    {row['workload_SD']:>6.2f}\n"
    return table

# Format the table and print it
formatted_stats_table = format_stats_table_with_headers(stats)
print(formatted_stats_table)

Secondary Task                  Success Ratio (%)   Reaction Time (ms)        Perceived Workload
                                                    Mean       SD             Mean     SD
<Without secondary task>
baseline                        86.0                880.98     201.80         29.60     22.99
<Auditory tasks>
0back                           82.0                883.06     235.94         33.37     21.18
1back                           68.0                955.50     253.31         51.73     20.09
2back                           72.0                978.62     209.32         66.91     16.44
audiobook listening             76.0                905.26     213.36         38.28     18.95
auditory texting                68.0                1016.90     264.46         45.81     20.92
auditory gaming                 74.0                980.86     281.65         41.99     21.83
<Visual tasks>
ebook reading                   36.0                1294.30     483.59         71.25     15.06
te

### Correlation

In [9]:
from scipy.stats import pearsonr

x = label.copy()
x.takeoverResult = x.takeoverResult.apply(lambda x: 1 if x == 'Success' else 0)
x = x[['reactionTime', 'takeoverResult', 'NASA_TLX']]

corr_x = x.corr()
p_x = x.corr(method=lambda x, y: pearsonr(x, y)[1]) - np.eye(len(x.columns))

print("The correlation is:")
print(corr_x, "\n")
print("The p-value is:")
print(p_x)

The correlation is:
                reactionTime  takeoverResult  NASA_TLX
reactionTime        1.000000       -0.598944  0.285091
takeoverResult     -0.598944        1.000000 -0.326345
NASA_TLX            0.285091       -0.326345  1.000000 

The p-value is:
                reactionTime  takeoverResult      NASA_TLX
reactionTime    0.000000e+00    5.329990e-50  8.351804e-11
takeoverResult  5.329990e-50    0.000000e+00  7.159309e-14
NASA_TLX        8.351804e-11    7.159309e-14  0.000000e+00


# Preprocessing and Feature Extraction

In [10]:
def _extract(data: pd.DataFrame): # return features as a dataframe
    features = pd.DataFrame([[data.min(), data.max(), data.mean(), data.skew(), data.kurtosis()]])
    return features
FEATURE_NUMBER = 5 # min, max, mean, skewness, kurtosis
VALUE_NUMBER = 13 # sum of values
DATA_DURATION = 10000 # 10 s

feature = pd.DataFrame()
label = pd.DataFrame()

with os.scandir(PATH_DATA) as root:
    for pid in root: #D1~D50
        driver = os.path.basename(pid)[1:]

        if driver.__contains__('idea'):
            continue
        elif driver.__contains__('xlsx'):
            continue
        elif driver.__contains__('DS_Store'):
            continue

        if os.path.isdir(pid):
            with os.scandir(pid) as p:
                feature_driver = pd.DataFrame()
                label_driver = pd.DataFrame()
                for task in p:  # 01_task_name
                    if os.path.isdir(task):
                        with os.scandir(task) as t:
                            # takeoverScenarioMeasurements csv file
                            takeoverScenarioMeasurements = pd.read_csv(os.path.abspath(task)+'/takeoverScenarioMeasurements.csv', sep=',')
                            critical_event_occurrence_time = takeoverScenarioMeasurements.criticalEventOccurrenceTime.values[0]

                            # Dataframe for feature extract
                            feature_task = pd.DataFrame([driver], columns=['driver'])

                            # Label
                            label_task = pd.DataFrame([driver], columns=['driver'])
                            label_task = pd.concat([label_task, takeoverScenarioMeasurements[['takeoverResult', 'reactionTime', 'NASA-TLX']]], axis=1, ignore_index=True)

                            csvs = ['fixations', 'gazePositions', 'diameter', 'HR', 'PPG', 'ECG', 'EDA']
                            for csv in t:   # e.g., ECG.csv
                                f = os.path.basename(csv)
                                if sensor_data.__contains__(f):
                                    file = pd.read_csv(csv, sep=',')
                                    
                                    #selecting 10 seconds before critical event occurrence 
                                    idx_not_selected = file[(file['timestamp'] < (float(critical_event_occurrence_time) - DATA_DURATION)) | (file['timestamp'] > float(critical_event_occurrence_time))].index
                                    file = file.drop(idx_not_selected)

                                    if file.empty:
                                        continue

                                    # get feature
                                    if f == 'fixations.csv':
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 1])], axis=1, ignore_index=True)
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 2])], axis=1, ignore_index=True)
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 3])], axis=1, ignore_index=True)
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 4])], axis=1, ignore_index=True)
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 5])], axis=1, ignore_index=True)
                                    elif f == 'gazePositions.csv':
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 1])], axis=1, ignore_index=True)
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 2])], axis=1, ignore_index=True)
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 3])], axis=1, ignore_index=True)
                                    elif f == 'diameter.csv':
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 1])], axis=1, ignore_index=True)
                                    elif f == 'HR.csv':
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 1])], axis=1, ignore_index=True)
                                    elif f == 'PPG.csv':
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 1])], axis=1, ignore_index=True)
                                    elif f == 'ECG.csv':
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 1])], axis=1, ignore_index=True)
                                    elif f == 'EDA.csv':
                                        feature_task = pd.concat([feature_task, _extract(file.iloc[:, 1])], axis=1, ignore_index=True)
                            feature_driver = pd.concat([feature_driver, feature_task], axis=0, ignore_index=True)
                            label_driver = pd.concat([label_driver, label_task], axis=0, ignore_index=True)
                if (not feature_driver.isna().values.any()) & (len(feature_driver.columns) == 1+VALUE_NUMBER*FEATURE_NUMBER):
                    feature = pd.concat([feature, feature_driver], axis=0, ignore_index=True)
                    label = pd.concat([label, label_driver], axis=0, ignore_index=True)
label.columns = ['driver', 'takeoverResult', 'reactionTime', 'NASA-TLX']

# Model Building and Evaluation

In [11]:
import numpy as np
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import xgboost as xgb
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, f1_score
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import RFE, RFECV

RANDOM_STATE = 42
MAX_DEPTH = None
LEARNING_RATE = 0.2

ESTIMATOR_DUMMY_CLF = DummyClassifier(strategy='prior')
ESTIMATOR_RF_CLF = RandomForestClassifier(random_state=RANDOM_STATE)
ESTIMATOR_XGB_CLF = XGBClassifier(random_state=RANDOM_STATE)
ESTIMATOR_LGBM_CLF = LGBMClassifier(random_state=RANDOM_STATE, verbose=-1, importance_type='gain')

In [12]:
X = np.array(feature.iloc[:, 1:])
y_tr = np.array(label.iloc[:, [1]]).ravel()
y_tr = np.where(y_tr == "Success", 1, 0)

from sklearn.model_selection import LeaveOneGroupOut

logo = LeaveOneGroupOut()
groups = np.array(feature.iloc[:,[0]]).ravel()

In [13]:
sm = SMOTE(random_state=RANDOM_STATE)

N_FEATURES = 20

DUMMY_accuracies = []
DUMMY_f1_scores = []
RF_accuracies = []
RF_f1_scores = []
XGB_accuracies = []
XGB_f1_scores = []
LGBM_accuracies = []
LGBM_f1_scores = []
RF_rankings = [0] * 65
XGB_rankings = [0] * 65
LGBM_rankings = [0] * 65

#classifier
for train_idx, test_idx in logo.split(X, y_tr, groups):
    X_train, X_test = X[train_idx], X[test_idx]
    y_clf_train, y_clf_test = y_tr[train_idx], y_tr[test_idx]
    
    #oversample
    X_train, y_clf_train = sm.fit_resample(X_train, y_clf_train)

    #Recursive Feature Elimination
    
    #RF model building
    selector = RFE(ESTIMATOR_RF_CLF, n_features_to_select=N_FEATURES, step=5)
    selector = selector.fit(X_train, y_clf_train)
    y_rf_clf_pred = selector.predict(X_test)
    
    RF_accuracy = selector.score(X_test, y_clf_test)
    RF_f1_score = f1_score(y_clf_test, y_rf_clf_pred, average='macro')
    RF_accuracies.append(RF_accuracy)
    RF_f1_scores.append(RF_f1_score)
    RF_rankings = RF_rankings + selector.ranking_
    
    #XGB model building
    selector = RFE(ESTIMATOR_XGB_CLF, n_features_to_select=N_FEATURES, step=5)
    selector = selector.fit(X_train, y_clf_train)
    y_xgb_clf_pred = selector.predict(X_test)
    
    XGB_accuracy = selector.score(X_test, y_clf_test)
    XGB_f1_score = f1_score(y_clf_test, y_xgb_clf_pred, average='macro')
    XGB_accuracies.append(XGB_accuracy)
    XGB_f1_scores.append(XGB_f1_score)
    XGB_rankings = XGB_rankings + selector.ranking_

    #LGBM model building
    selector = RFE(ESTIMATOR_LGBM_CLF, n_features_to_select=N_FEATURES, step=5)
    selector = selector.fit(X_train, y_clf_train)
    y_lgbm_clf_pred = selector.predict(X_test)
    
    LGBM_accuracy = selector.score(X_test, y_clf_test)
    LGBM_f1_score = f1_score(y_clf_test, y_lgbm_clf_pred, average='macro')
    LGBM_accuracies.append(LGBM_accuracy)
    LGBM_f1_scores.append(LGBM_f1_score)
    LGBM_rankings = LGBM_rankings + selector.ranking_

    #DUMMY model building
    ESTIMATOR_DUMMY_CLF.fit(X_train, y_clf_train.ravel())
    y_dummy_clf_pred = ESTIMATOR_DUMMY_CLF.predict(X_test)
    
    #DUMMY model evaluation
    DUMMY_accuracy = ESTIMATOR_DUMMY_CLF.score(X_test, y_clf_test)
    DUMMY_f1_score = f1_score(y_clf_test, y_dummy_clf_pred, average='macro')

    DUMMY_accuracies.append(DUMMY_accuracy)
    DUMMY_f1_scores.append(DUMMY_f1_score)

# Calculate averages and standard deviations
RF_avg_accuracy = np.mean(RF_accuracies)
RF_std_accuracy = np.std(RF_accuracies)
RF_avg_f1_score = np.mean(RF_f1_scores)
RF_std_f1 = np.std(RF_f1_scores)

XGB_avg_accuracy = np.mean(XGB_accuracies)
XGB_std_accuracy = np.std(XGB_accuracies)
XGB_avg_f1_score = np.mean(XGB_f1_scores)
XGB_std_f1 = np.std(XGB_f1_scores)

LGBM_avg_accuracy = np.mean(LGBM_accuracies)
LGBM_std_accuracy = np.std(LGBM_accuracies)
LGBM_avg_f1_score = np.mean(LGBM_f1_scores)
LGBM_std_f1 = np.std(LGBM_f1_scores)

DUMMY_avg_accuracy = np.mean(DUMMY_accuracies)
DUMMY_std_accuracy = np.std(DUMMY_accuracies)
DUMMY_avg_f1_score = np.mean(DUMMY_f1_scores)
DUMMY_std_f1 = np.std(DUMMY_f1_scores)

In [14]:

# Function to format the results into a table
def format_results_table(results):
    table = (
        "                      Avg. F1 (SD)  Avg. Accuracy (SD) \n"
    )
    for model, (avg_f1, std_f1, avg_acc, std_acc) in results.items():
        table += f"{model:<20}  {avg_f1:.2f} ({std_f1:.2f})   {avg_acc:.2f} ({std_acc:.2f}) \n"
    return table

# Results dictionary
results = {
    "Baseline": (DUMMY_avg_f1_score, DUMMY_std_f1, DUMMY_avg_accuracy, DUMMY_std_accuracy),
    "Random Forest": (RF_avg_f1_score, RF_std_f1, RF_avg_accuracy, RF_std_accuracy),
    "XGBoost": (XGB_avg_f1_score, XGB_std_f1, XGB_avg_accuracy, XGB_std_accuracy),
    "LightGBM": (LGBM_avg_f1_score, LGBM_std_f1, LGBM_avg_accuracy, LGBM_std_accuracy),
}

# Format and print the results table
formatted_results_table = format_results_table(results)
print(formatted_results_table)

                      Avg. F1 (SD)  Avg. Accuracy (SD) 
Baseline              0.22 (0.14)   0.33 (0.25) 
Random Forest         0.61 (0.18)   0.71 (0.14) 
XGBoost               0.56 (0.18)   0.68 (0.14) 
LightGBM              0.57 (0.21)   0.69 (0.18) 

