## Dependencies

In [1]:
import numpy as np
import pandas as pd
from sklearn.linear_model import Lasso
import scipy.stats as stats
from sklearn.metrics import classification_report
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.metrics import log_loss
from sklearn.metrics import classification_report
import sklearn as sk
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import xgboost as xgb
from sklearn.preprocessing import MinMaxScaler
#from ipynb.fs.full.driver_drowsiness_extraction import select_channel
import numpy as np
import pandas as pd
from scipy import stats

In [56]:
def channel_selection(features, labels, channel_list):
    '''
    Select the desired channels from the total feature dataset
    '''
    selected_channels = []
    for channel in channel_list:
        selected_channels.append(features.loc[features['channels'] == channel])
    return (pd.concat(selected_channels).drop('channels', axis=1))

In [54]:
def feature_selection(selected_channels, feature_subset):
    ''' 
    Select the desired subset of features to prepare training data on.
    '''
    selected_features = pd.DataFrame()
    for feature in feature_subset:
        selected_features[feature] = selected_channels[feature]
    # temporary sanity check, will delete
    # print(selected_features.head())
    return selected_features.to_numpy()

## I/O

In [57]:
# load the feature dataset as a dataframe
csv_file = 'eeg_features.csv'
df = pd.read_csv(csv_file,float_precision='round_trip')
df = df.drop('Unnamed: 0', axis=1)

In [58]:
# split the dataset to features and labels
features = df.drop('label', axis=1)
labels = df.iloc[:,-1:]
display_labels = ['drowsy' if label == 1 else 'alert' for label in labels['label'].unique()]

## Feature Selection

In [59]:
channel_list = ['F3', 'F4','C3','Cz','Oz']
selected_channels = channel_selection(features=features, labels=labels, channel_list=channel_list)
X = feature_selection(selected_channels=selected_channels, feature_subset=selected_channels.columns) # select every feature
y = labels[0:2022*len(channel_list)].to_numpy()

In [60]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,random_state=1)

# apply normalization after splitting to avoid leakage
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)


In [61]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
import warnings
from sklearn.exceptions import DataConversionWarning
warnings.filterwarnings(action='ignore', category=DataConversionWarning)

def model_training(model_family, display_labels, stats=False, cm=False):

  if model_family == 'K-NN':
    model = KNeighborsClassifier()
  elif model_family == 'DTC':
    model = DecisionTreeClassifier()
  elif model_family == 'RFC':
    model = RandomForestClassifier(n_estimators=100)
  elif model_family == 'Logistic Regression':
    model = LogisticRegression(max_iter=5000)
  elif model_family == 'SVM':
    model = SVC(C=1.0, kernel='rbf', degree=10, gamma='scale', coef0=0.0, shrinking=True, probability=False, tol=0.001, cache_size=200, class_weight=None, verbose=False, max_iter=-1, decision_function_shape='ovr', break_ties=False, random_state=1)
  elif model_family == 'NN':
    model = MLPClassifier(activation='relu',solver='adam', alpha=1e-2, learning_rate='adaptive', max_iter=1000000, hidden_layer_sizes=(60,2), random_state=1)
  elif model_family == 'GBC':
    model = GradientBoostingClassifier(loss='log_loss',n_estimators=300, learning_rate=0.1, max_depth=10, random_state=1)

  model.fit(X_train, y_train)
  print('Accuracy of {} classifier on training set: {:.8f}'
     .format(model_family, model.score(X_train, y_train)))
  print('Accuracy of {} classifier on test set: {:.8f}'
     .format(model_family, model.score(X_test, y_test)))

  if stats:
    print()
    print("==== Stats for the {} model ====".format(model_family))
    sensitivity = recall_score(y_test, model.predict(X_test))
    print("Sensitivity (Recall):", sensitivity)

    precision = precision_score(y_test, model.predict(X_test))
    print("Precision:", precision)

    accuracy = accuracy_score(y_test, model.predict(X_test))
    print("Accuracy (Recall):", accuracy)
        
    f1 = f1_score(y_test, model.predict(X_test))
    print("F1_score:", f1)

    fpr, tpr, thresholds = roc_curve(y_test, model.predict(X_test))
    auc = roc_auc_score(y_test, model.predict(X_test))
    print("AUC:", auc)

    logloss = log_loss(y_test, model.predict(X_test))
    print("Logloss:", logloss)
    print()

  if cm:
    model_cm = confusion_matrix(y_test, model.predict(X_test))
    model_disp = ConfusionMatrixDisplay(confusion_matrix=model_cm,display_labels=display_labels)
    model_disp.plot()

In [62]:
models = ['GBC']
for model in models:
    model_training(model, display_labels, stats=True, cm=False)
    

Accuracy of GBC classifier on training set: 1.00000000
Accuracy of GBC classifier on test set: 0.81651830

==== Stats for the GBC model ====
Sensitivity (Recall): 0.7928994082840237
Precision: 0.8331606217616581
Accuracy (Recall): 0.8165182987141444
F1_score: 0.8125315816068721
AUC: 0.8165885930309008
Logloss: 6.61335084439291



## Testing and Processing

## Evaluation

In [63]:
# TODO: select different feature subsets, then apply the p-value thresholding to see if there's any inconsistencies
# after that we can either implement the p values as a function to select the top performers of a subset (subset of the subset)
# or we can just select our own features and do the processing on them

# if we're going to use multiple subsets then write another function definition for the preparation of X_train and Y_train etc.
# the function should return x_train y_train x_test y_test and it should take the selected feature subset as the input
# after that we can run the selections in their respective cells to see the performances of different models
# this can also allow us to compare the model performances with respect to feature selection which could make a great performance matrix

# P-Value Thresholding for Feature Selection
p_values = []

X_p = df.drop('channels',axis = 1)
X_p = X_p.drop('label',axis = 1)

y_p = y.flatten()
y_p = pd.Series(y_p)


#y_p = pd.Series(y['0'])
for feature in X_p.columns:
    t_stat, p_value = stats.ttest_ind(X_p[feature][y_p == 0], X_p[feature][y_p == 1])
    p_values.append(p_value)

alpha = 0.05

# Select features with p-values below the significance level
selected_features = [X_p.columns[i] for i, p in enumerate(p_values) if p < alpha]
# Alternatively, you can rank features by p-value
sorted_features = [x for _, x in sorted(zip(p_values, X_p.columns))]

In [None]:
'''
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelEncoder, StandardScaler

#y = y.reset_index(drop=True)

pca = PCA(n_components = 0.999)
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)
#X = dataPCA
variance = pd.DataFrame(pca.explained_variance_ratio_)
print(pca.explained_variance_ratio_)
print(np.sum(pca.explained_variance_ratio_))
'''