<a href="https://colab.research.google.com/github/anoushkabhat13/BCI_Project_Drowsiness_Detection/blob/main/BCI_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import mne
from mne.datasets.sleep_physionet.age import fetch_data
from mne.time_frequency import psd_welch
from mne.io import Raw

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn import preprocessing
from sklearn.linear_model import LogisticRegression

from pandas import read_csv
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import scipy.signal as sig        

In [None]:
eeg = mne.datasets.sleep_physionet.age.fetch_data(list(range(0, 83)), recording=(1, 2), path=None, force_update=False, base_url='https://physionet.org/physiobank/database/sleep-edfx/sleep-cassette/', on_missing='ignore',verbose=None)

# get annotations and raw eeg files for data
edf_array = []
annotation_array = []

for file in eeg:
    # index 0 is the recording, index 1 is the annotations
    #Fpz-Cz pre-frontal midline - central midline
    # muse uses frontal lobe FP and temporal lobe 
    edf = mne.io.read_raw_edf(file[0], misc=["EEG Fpz-Cz"])
    annotation = mne.read_annotations(file[1])
    edf.set_annotations(annotation, emit_warning=False)
    # get the EEG signals
    edf_array.append(edf)
    annotation_array.append(annotation)

In [None]:
# gets data in array format (similar to csv file) and runs preprocessing
raw_data = []
for i in range(0, len(edf_array)):
    print(i)
    data = edf_array[i].get_data()[0]
    raw_data.append(preprocessing.scale(data))
print(raw_data)

In [None]:
# dataframe - 2 columns - first colume dataframe with all eeg data, second one time points of events, third one annotations
sleep_stages = {'Sleep stage W': 0,'Sleep stage 1': 1}
all_data = pd.DataFrame()
full_df = list()

for i in range(len(raw_data)):
    events, event_ids = mne.events_from_annotations(edf_array[i], event_id=sleep_stages)
    epochs = mne.Epochs(edf_array[i], events, event_ids, 0, 10, baseline=None, preload=True)
    dataf = epochs.to_data_frame()
    dataf.sort_index(inplace=True)
    dataf = dataf.drop(columns=['EEG Pz-Oz', 'EOG horizontal', 'Resp oro-nasal',
       'EMG submental', 'Temp rectal', 'Event marker'])
    dataf['condition'].replace({"Sleep stage W": "0", "Sleep stage 1": "1"}, inplace=True)
    subject = [i]*dataf.shape[0]
    dataf['subject'] = subject
    full_df.append(dataf)
    
    all_data = pd.concat([all_data, dataf], axis=0)

In [None]:
grouped = all_data.groupby("subject")
sig_0 = [[]]
sig_1 = [[]]

for i in range(len(grouped)):
    # group by subject
    subject_grouped = grouped.get_group(i)
    
    # group subject data by condition
    sub_grouped = subject_grouped.groupby("condition")
    subgrp_0 = sub_grouped.get_group("0")
    subgrp_1 = sub_grouped.get_group("1")
    
    sig_0.append(subgrp_0["EEG Fpz-Cz"].values[:10])
    sig_1.append(subgrp_1["EEG Fpz-Cz"].values[:10])

sig_0.pop(0)
sig_1.pop(0)

total_array = sig_0 + sig_1
events_array = ([0]*len(grouped)) + ([1]*len(grouped))
new_data = {"EEG Signal": total_array, "Events": events_array}
new_dataframe = pd.DataFrame(new_data)
new_dataframe

X = new_dataframe["EEG Signal"]
y = new_dataframe["Events"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.30)

In [None]:
a = new_dataframe.iloc[:,0] # the eeg signals
#print(new_dataframe.iloc[:, 1])
b = new_dataframe.iloc[:, 1]
A_train, A_test, b_train, b_test = train_test_split(a, b, test_size =0.3, random_state=42)

# make the data used in the model into 3d array
xarr = np.array(total_array)
xarr.shape #200 samples 50 epochs per sample 1 channel
newx = xarr[:, :, np.newaxis] # make to 3d

#feature selection
import scipy.stats
import entropy as ent
import tsfel
def mean(x):
    return np.mean(x, axis=None)
def abs_diff_signal(x):
    return np.sum(np.abs(np.diff(x, axis=-1)), axis=None)
def std(x):
    return np.std(x, axis=None)
def rms(x):
    return np.sqrt(np.mean(x**2, axis=None))
def ptp(x):
    return np.ptp(x, axis=None)
def ent(x):
    pd_series = pd.Series(x)
    p_data = pd_series.value_counts()
    return scipy.stats.entropy(p_data)
def psd(x):
    freqs, psd = signal.welch(x, 100, nperseg = 400)
    freq_res = freqs[1]-freqs[0]

gmean = []
gstd = []
grms = []
gent = []
gptp = []
for d in newx:
    gmean.append(mean(d))
    gstd.append(std(d))
    grms.append(rms(d))
    gent.append(ent(d.flatten()))
    gptp.append(ptp(d))
pwr = []
dis = []
for d in xarr:
    pwr.append(tsfel.feature_extraction.features.max_power_spectrum(d, 100))
    dis.append(tsfel.feature_extraction.features.pk_pk_distance(d))

new_dataframe["mean"] = gmean
new_dataframe["std"] = gstd
new_dataframe["ptp"] = gptp
new_dataframe["rms"] = grms
new_dataframe["ent"] = gent
new_dataframe["pwr"] = pwr
new_dataframe["dis"] =  dis
print(new_dataframe)

In [None]:
from mlxtend.feature_selection import SequentialFeatureSelector as SFS

clf = RandomForestClassifier(n_estimators=100, n_jobs=1)

#X = new_dataframe[["mean", "std", "dis", "rms", "ent", "ptp", "pwr"]]

# Build step forward feature selection
sfs1 = SFS(clf,
           k_features=4,
           forward=True,
           floating=False,
           verbose=2,
           scoring='accuracy',
           cv=2)

# Perform SFFS
sfs1 = sfs1.fit(X_train, y_train)
print(sfs1.k_feature_idx_)

In [None]:
def check_column_score(cols):
    '''
    Trains and evaluates the model via cross-validation on the columns
    of the dataset with select indices.
    '''
    
    print("training with columns " + str(cols))

    SVM = svm.SVC()
    return cross_val_score(SVM, X_train[cols], y_train, cv = 5).mean() 
    # average CV score for each subset of the predictor data

In [None]:
def visualize_confusion_matrix(score):
    plt.figure(figsize=(9,9))
    sns.heatmap(c, annot=True, fmt=".3f", linewidths=.5, square = True, cmap = 'Blues_r')
    plt.ylabel('Actual label')
    plt.xlabel('Predicted label')
    all_sample_title = 'Accuracy Score: {0}'.format(score)
    plt.title(all_sample_title, size = 15)

In [None]:
np.random.seed(200)
powers = 20
gamma_pool = [10**k for k in range(-powers, powers+1)]
# Initialize the score of the highest-performing model to negative infinity
best_score = -np.inf
best_g = -np.inf
# Loop through each potential value for gamma, create an SVM model with that
# gamma value, cross-validate that model on the training set, and save the
# best score among all of these models
#X = new_dataframe.loc[:, ["mean", "std", "rms"]].values
X = new_dataframe.loc[:, ["mean", "std", "rms", "dis"]].values
y = new_dataframe["Events"].to_list()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.30)


for g in gamma_pool:
    SVM = svm.SVC(gamma = g)
    score = cross_val_score(SVM, X_train, y_train, cv = 5).mean()
    
    # If the average CV score for an SVM model with the current gamma is
    # greater than the best average CV score so far, make it the
    # new best average CV score and store its gamma value
    if score > best_score:
        # Perform update
        best_score = score
        best_g = g

# SEE OPTIMAL GAMMA VALUE AND BEST CV SCORE
print(best_g, best_score)

# Use best_g to evaluate our model on the test set
SVM = svm.SVC(gamma = best_g)
# Train model on training set
SVM.fit(X_train, y_train)
# Score model on test set to see how it performs on unseen data
SVM.score(X_test, y_test)

# Save predictions our model makes about sleep stage in the test set
y_test_pred = SVM.predict(X_test)

# Import the confusion_matrix() function from the metrics package in sklearn
from sklearn.metrics import confusion_matrix

print("Accuracy:",metrics.accuracy_score(y_test, y_test_pred))
score = metrics.accuracy_score(y_test, y_test_pred)

# Evaluate a confusion matrix on the test set
c = confusion_matrix(y_test, y_test_pred)
c

visualize_confusion_matrix(score)


In [None]:
from sklearn.ensemble import RandomForestClassifier
np.random.seed(200)
X = new_dataframe.loc[:, ["mean", "std", "rms", "dis"]].values
y = new_dataframe["Events"].to_list()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.30)

N = 30

scores = np.zeros(N)
best_score = -np.inf

for d in range(1, N+1):
    RF3 = RandomForestClassifier(n_estimators=d)
    scores[d-1] = cross_val_score(RF3, X_train, y_train, cv=5).mean()
    if scores[d-1]>best_score:
        best_score=scores[d-1]
        best_N = d
print(best_N, best_score)

clf=RandomForestClassifier(n_estimators=best_N)


clf.fit(X_train,y_train)

y_pred=clf.predict(X_test)
print("Accuracy:",metrics.accuracy_score(y_test, y_pred))

In [None]:
combos = [["mean"], ["mean", "std"], ["mean", "std", "rms"], ["mean", "std", "dis", "ptp", "rms"],
          ["mean", "std", "dis", "ptp", "rms", "ent"], ["mean", "std", "rms", "dis"], ["mean", "std", "rms", "pwr"], ["mean", "std", "rms", "dis", "ent"]]
X = new_dataframe[["EEG Signal", "mean", "std", "ptp", "rms", "ent", "dis", "pwr"]]
y = new_dataframe["Events"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.30)
# not all possible combos, but a good starting point
# Use a function?

# Check the model score when trained on each subset of the predictor data
for combo in combos:
    x = check_column_score(combo)
    print("CV score is " + str(np.round(x,3)))
    
#mean/std/rms or mean/std/rms/dis looking like the highest

In [None]:
#for checking which model is best -- if you have better code that is welcome too
models = []
models.append(('LR', LogisticRegression()))
models.append(('LDA', LinearDiscriminantAnalysis()))
models.append(('KNN', KNeighborsClassifier()))
models.append(('CART', DecisionTreeClassifier()))
models.append(('NB', GaussianNB()))
models.append(('SVM', SVC()))

In [None]:
# evaluate each model in turn
results = []
names = []
scoring = 'accuracy'
#X = new_dataframe.loc[:, ["EEG Signal", "mean", "std", "rms"]].values
X = new_dataframe.loc[:, ["mean", "std", "rms"]].values
y = new_dataframe["Events"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.30)


for name, model in models:
    kfold = model_selection.KFold(n_splits=10, random_state=seed)
    cv_results = model_selection.cross_val_score(model, X_train, y_train, cv=kfold, scoring=scoring)
    results.append(cv_results)
    names.append(name)
    msg = "%s: %f (%f)" % (name, cv_results.mean(), cv_results.std())
    print(msg)
# boxplot algorithm comparison
fig = plt.figure()
fig.suptitle('Comparison between different MLAs')
ax = fig.add_subplot(111)
plt.boxplot(results)
ax.set_xticklabels(names)
plt.show()

In [None]:
import keras
from keras import layers
from tensorflow.keras.layers import Input, Lambda, Dense, Flatten,Conv1D, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.layers import MaxPooling1D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers import BatchNormalization
import tensorflow as tf

#X = new_dataframe.loc[:, ["mean", "std", "rms"]].values
X = new_dataframe["EEG Signal"].values
y = new_dataframe["Events"].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.30)

mylist = []
for val in X_train:
    l1 = []
    for v in val:
        l1.append([v])
    mylist.append(l1)
print(len(mylist), len(mylist[0]), len(mylist[0][0]))

X_train = np.asarray(np.array(mylist)).astype(np.float32)

mylist2 = []
for val in X_test:
    l2 = []
    for v in val:
        l2.append([v])
    mylist2.append(l2)

X_test = np.asarray(np.array(mylist2)).astype(np.float32)

model = keras.Sequential()
model.add(Conv1D(32, kernel_size=3,activation='relu',input_shape=(24,1),padding='same'))
model.add(MaxPooling1D(pool_size=2))
model.add(Dropout(0.3))

model.add(Conv1D(64, kernel_size=3,activation='relu',input_shape=(24,1),padding='same'))
model.add(Conv1D(64, kernel_size=3,activation='relu',input_shape=(24,1),padding='same'))
model.add(BatchNormalization())

model.add(Conv1D(128, kernel_size=3,activation='relu',input_shape=(24,1),padding='same'))
model.add(BatchNormalization())
model.add(Dropout(0.3))
model.add(Conv1D(128, kernel_size=3,activation='relu',input_shape=(24,1),padding='same'))
model.add(MaxPooling1D(pool_size=2))

model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

#model.summary()
model.compile(optimizer="adam", loss='binary_crossentropy', metrics=['accuracy'])
classifier = model.fit(X_train, y_train, epochs=24, batch_size=214)

loss, accuracy = model.evaluate(X_test, y_test, verbose=1)
print("Test: accuracy = %f  ;  loss = %f" % (accuracy, loss))

