This code produces an EEG dataset according to the methodology described by Haufe et al. in "EEG potentials predict upcoming emergency brakings during simulated driving." dx.doi.org/10.1088/1741-2560/8/5/056001. Instead of the original modeling approach, we train and test our own model here. 

The original EEG data are part of a dataset from Haufe et al. called "Emergency braking during simulated driving." The dataset is available at http://bnci-horizon-2020.eu/database/data-sets

In [1]:
import numpy as np
import h5py
f = h5py.File('../../EEG_Dataset_Haufe/VPae.mat','r')

Read experimental data.

In [2]:
cnt = f.get('cnt')

We then sort the data and assign to variables for: 

signal channel names: cnt.clab
sampling frequency: cnt.fs
time-series data: cnt.x

In [3]:
cnt.clab = np.array(cnt['clab'])
cnt.fs = np.array(cnt['fs'])
cnt.x = np.array(cnt['x']) 

samplingRate = cnt.fs[0][0] #Down-/upsample rate for all data = 200Hz.

Let's check the names of the signal channels, along with the index of one of the channels. The index indicates to the order of the corresponding data array in cnt.x.

In [4]:
channelNames = ''
channelOfInterest = 'Fz'

for i in range(0,len(cnt.clab)):
    ref = cnt.clab[i][0]
    res = cnt[ref]
    #print(bytes(np.array(res).ravel().tolist()).decode('UTF-8'))
    channelName = bytes(np.array(res).ravel().tolist()).decode('UTF-8')
    if channelName == channelOfInterest : #'EMGf':
        print(channelOfInterest,' index =',i)
    channelNames = channelNames + channelName
    if i < len(cnt.clab)-1:
        channelNames = channelNames + ','
print(channelNames)    


Fz  index = 10
EOGv,Fp1,Fp2,AF3,AF4,EOGh,F7,F5,F3,F1,Fz,F2,F4,F6,F8,FT7,FC5,FC3,FC1,FCz,FC2,FC4,FC6,FT8,T7,C5,C3,C1,Cz,C2,C4,C6,T8,TP7,CP5,CP3,CP1,CPz,CP2,CP4,CP6,TP8,P9,P7,P5,P3,P1,Pz,P2,P4,P6,P8,P10,PO7,PO3,POz,PO4,PO8,O1,Oz,O2,EMGf,lead_gas,lead_brake,dist_to_lead,wheel_X,wheel_Y,gas,brake


Let's also check the data for the channel of interest. Note that the data are a concatenation of three 45-minute blocks collected from given test subject.

In [5]:
import matplotlib.pyplot as plt

#data, channelName, units = cnt.x[0], 'EOGv', 'unknown'
#data, channelName, units = cnt.x[61], 'EMGf', 'unknown' 
#data, channelName, units = cnt.x[68], 'Brake', 'unknown' 
#data, channelName, units = cnt.x[66], 'Y-axis Wheel', 'unknown' 
#data, channelName, units = cnt.x[65], 'X-axis Wheel', 'unknown'  
#data, channelName, units = cnt.x[64], 'Inter-vehicle Distance',  'm'  
data, channelName, units = cnt.x[10], 'Fz', 'unknown'

numDataPoints = len(data) 
t = np.arange(0, numDataPoints/samplingRate, 1/samplingRate)
#print('Number of data points = ' + str(numDataPoints))
fig, ax = plt.subplots(1,1)
ax.plot(t, data)
ax.set_ylabel(units)
ax.set_xlabel('Seconds')
ax.set_title('Channel ' + channelName)
plt.show()

<Figure size 640x480 with 1 Axes>

Next we'll read the event data corresponding to the experimental data.

In [6]:
mrk = f.get('mrk')

As shown below, the event data consists of: 

"class names": Categories of events
"event": events contains a key called "react," which contains reaction times between between car_brake and react_emg.
"time": timestamps for each event.
"y": indicates which class an event belongs to for a given timestamp.

In [7]:
mrk.keys()

<KeysViewHDF5 ['className', 'event', 'time', 'y']>

We'll read the different event data into specified variables.

In [8]:
mrk.classNames = mrk['className']
mrk.time = mrk['time']
mrk.y = mrk['y']
mrk.events = mrk['event']

In [9]:
'''
#Print brake reaction times of driver.
mrk.events['react'][0]
'''

"\n#Print brake reaction times of driver.\nmrk.events['react'][0]\n"

In [10]:
mrk.time

<HDF5 dataset "time": shape (914, 1), type "<f8">

In [11]:
classNames = ''
ref = mrk.classNames[4][0]
res = mrk[ref]

for i in range(0, len(mrk.classNames)):
    ref = mrk.classNames[i][0]
    res = mrk[ref]
    className = bytes(np.array(res).ravel().tolist()).decode('UTF-8')
    classNames = classNames + className
    if i < len(mrk.classNames)-1:
        classNames = classNames + ','
print(classNames) 

car_normal,car_brake,car_hold,car_collision,react_emg


In [12]:
'''
#Print event times
for i in range(0, len(mrk.time)):
    print(mrk.time[i][0])
'''

'\n#Print event times\nfor i in range(0, len(mrk.time)):\n    print(mrk.time[i][0])\n'

In [13]:
'''
#Print event category values: 0 = no event, 1 = event
mrk.y
for i in range(0, len(mrk.y)):
    print(mrk.y[i][1])
'''

'\n#Print event category values: 0 = no event, 1 = event\nmrk.y\nfor i in range(0, len(mrk.y)):\n    print(mrk.y[i][1])\n'

In [14]:
#Find all car braking events and store corresponding timestamps.
carBrakeTime = []

for i in range(0, len(mrk.y)):
    #print( mrk.y[i][1])
    if mrk.y[i][1] == 1: #Check if car is braking, i.e. y[i] = 1
        carBrakeTime.append(mrk.time[i][0]) #Store timestamp 

In [15]:
from scipy.signal import spectrogram

#Create training data from braking event EEG via these steps:
#Get segments of braking event EEG.
#Covert to PSD.
#Store first 4 PSD components of each segment in variable for training.
def createDatasetFromEEGEvents(timestamps, data, samplingRate, numberOfPSDComponents = 4):
    dt = 1/samplingRate #Time increment in seconds
    
    dt1_index = 0
    dt2_index = int(100/1000/dt) #Covert timestamps to seconds and divde by time increment to get index of datapoint at 100 ms.
    baselineCorrection_eeg = mean(data[dt1_index:dt2_index+1])
    
    #Define variables to split data in first 1/2 for training and second 1/2 for validation
    brakingEvent_eeg_PSD_train = []
    brakingEvent_eeg_PSD_val = []

    dt = 1/samplingRate #Time increment in seconds

    for time in carBrakeTime: #Iterate through event timestamps in milliseconds
        #index = int(time/1000/dt) #Covert timestamps to seconds and divde by time increment to get index of datapoint
        dt1_index = int((time-300)/1000/dt) #Index of datapoint 300 ms before event datapoint.
        dt2_index = int((time+1200)/1000/dt) #Index of datapoint 1200 ms before event datapoint.

        brakingEvent_eeg = data[dt1_index:dt2_index+1]-baselineCorrection_eeg
        #Normalize signal data WRT to max and find generate power spectral density 
        freq_data, time_data, pwr_spectral_density_data = spectrogram(
                                                            np.array([brakingEvent_eeg/max(brakingEvent_eeg)]), 
                                                            samplingRate
                                                            )
        if time < len(data)*1000*dt/2:
            brakingEvent_eeg_PSD_train.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
            continue
        brakingEvent_eeg_PSD_val.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
        
    return brakingEvent_eeg_PSD_train, brakingEvent_eeg_PSD_val

In [16]:
from statistics import mean

#Create baseline training EEG data containing no braking event EEG via these steps:
#Get 100 ms EEG segment at beginning of data to use for baseline correction.
#Get segments of EEG without braking events and subract 100 ms EEG segment.
#Covert to PSD.
#Store first 4 PSD components of each segment in variable for training.

def createDatasetFromEEGWithoutEvents(timestamps, data, samplingRate, numberOfPSDComponents=4):
    dt = 1/samplingRate #Time increment in seconds
    
    dt1_index = 0
    dt2_index = int(100/1000/dt) #Covert timestamps to seconds and divde by time increment to get index of datapoint at 100 ms.
    baselineCorrection_eeg = mean(data[dt1_index:dt2_index+1])

    noEvent_eeg_PSD_train = []
    noEvent_eeg_PSD_val = []

    for i in range(0, len(mrk.time)): #Iterate through all event timestamps in milliseconds
        if mrk.time[i][0] < 4500: #Skip iteration if there is not enough time to get an EEG segment between time of first datapoint and time of first event. 
            continue

        if i > 0:
            if mrk.time[i][0]-mrk.time[i-1][0] < 7500: #Skip iteration if there is not enough time to get EEG segment between current and previous timestamps.
                continue

        numberOfSegments = int((mrk.time[i][0]-mrk.time[i-1][0]-6000)/2000) #Calculate how many user-specified EEG segments can fit between two events.

        for segmentNum in range(0, numberOfSegments):
            #Add 500 ms between each EEG segment, except for segment closest in time to event
            dt1_index = int((mrk.time[i][0]-5000+(2000*segmentNum)-500)/1000/dt) #500 represents 500 ms offset between each EEG segment.
            dt2_index = int((mrk.time[i][0]-3000+(2000*segmentNum))/1000/dt)

            #dt1_index = int((mrk.time[i][0]-4500)/1000/dt)
            #dt2_index = int((mrk.time[i][0]-3000)/1000/dt)
            noEvent_eeg = data[dt1_index:dt2_index+1]-baselineCorrection_eeg #Get EEG segment immediately prior to current event.
            #Normalize signal data WRT to max and find generate power spectral density 
            freq_data, time_data, pwr_spectral_density_data = spectrogram( 
                                                                np.array([noEvent_eeg/max(noEvent_eeg)]), 
                                                                samplingRate
                                                                )
            if mrk.time[i][0] < len(data)*1000*dt/2:
                noEvent_eeg_PSD_train.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
                continue
            noEvent_eeg_PSD_val.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
    
        if i == len(mrk.time): #If iteration reaches last event timestamp, set indices to get any possible EEG segment beyond timestamp.
            numberOfSegments = int((len(data)*1000*dt/2-mrk.time[i][0])/2000) #Calculate how many user-specified EEG segments can fit between two events.

            for segmentNum in range(0, numberOfSegments):
                #Add 500 ms between each EEG segment, except for segment closest in time to event
                dt1_index = int((mrk.time[i][0]+3000+(2000*segmentNum))/1000/dt)
                dt2_index = int((mrk.time[i][0]+5000+(2000*segmentNum)-500)/1000/dt) #500 represents 500 ms offset between each EEG segment.


                #dt1_index = int((mrk.time[i][0]+3000)/1000/dt)
                #dt2_index = int((mrk.time[i][0]+4500)/1000/dt)
                #if dt2_index > len(data)-1: #If the second index is greater than index of last EEG datapoint, skip iteration.
                #    continue
                noEvent_eeg = data[dt1_index:dt2_index+1]  #Get EEG segment 
                #Normalize signal data WRT to max and find generate power spectral density 
                freq_data, time_data, pwr_spectral_density_data = spectrogram( 
                                                                    np.array([noEvent_eeg/max(noEvent_eeg)]), 
                                                                    samplingRate
                                                                    )
                if mrk.time[i][0] < len(data)*1000*dt/2:
                    noEvent_eeg_PSD_train.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
                    continue
                noEvent_eeg_PSD_val.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
    
    return noEvent_eeg_PSD_train, noEvent_eeg_PSD_val

from statistics import mean
#Create baseline training EEG data containing no braking event EEG via these steps:
#Get 100 ms EEG segment at beginning of data to use for baseline correction.
#Get segments of EEG without braking events and subract 100 ms EEG segment.
#Covert to PSD.
#Store first 4 PSD components of each segment in variable for training.
numberOfPSDComponents = 4

dt = 1/samplingRate #Time increment in seconds

dt1_index = 0
dt2_index = int(100/1000/dt) #Covert timestamps to seconds and divde by time increment to get index of datapoint at 100 ms.
baselineCorrection_eeg = mean(data[dt1_index:dt2_index+1])

noEvent_eeg_PSD_train = []
noEvent_eeg_PSD_val = []
    
for i in range(0, len(mrk.time)): #Iterate through all event timestamps in milliseconds
    if mrk.time[i][0] < 4500: #Skip iteration if there is not enough time to get an EEG segment between time of first datapoint and time of first event. 
        continue
    
    if i > 0:
        if mrk.time[i][0]-mrk.time[i-1][0] < 7500: #Skip iteration if there is not enough time to get EEG segment between current and previous timestamps.
            continue
    dt1_index = int((mrk.time[i][0]-4500)/1000/dt)
    dt2_index = int((mrk.time[i][0]-3000)/1000/dt)
    noEvent_eeg = data[dt1_index:dt2_index+1]-baselineCorrection_eeg  #Get EEG segment immediately prior to current event.
    #Normalize signal data WRT to max and find generate power spectral density 
    freq_data, time_data, pwr_spectral_density_data = spectrogram( 
                                                        np.array([noEvent_eeg/max(noEvent_eeg)]), 
                                                        samplingRate
                                                        )
    if mrk.time[i][0] < len(data)*1000*dt/2:
        noEvent_eeg_PSD_train.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
        continue
    noEvent_eeg_PSD_val.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
    if i == len(mrk.time): #If iteration reaches last event timestamp, set indices to get any possible EEG segment beyond timestamp.
        dt1_index = int((mrk.time[i][0]+3000)/1000/dt)
        dt2_index = int((mrk.time[i][0]+4500)/1000/dt)
        if dt2_index > len(data)-1: #If the second index is greater than index of last EEG datapoint, skip iteration.
            continue
        noEvent_eeg = data[dt1_index:dt2_index+1]  #Get EEG segment immediately prior to current event.
        #Normalize signal data WRT to max and find generate power spectral density 
        freq_data, time_data, pwr_spectral_density_data = spectrogram( 
                                                            np.array([noEvent_eeg/max(noEvent_eeg)]), 
                                                            samplingRate
                                                            )
        if mrk.time[i][0] < len(data)*1000*dt/2:
            noEvent_eeg_PSD_train.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())
            continue
        noEvent_eeg_PSD_val.append(np.sort(np.sum(pwr_spectral_density_data[0],1))[-numberOfPSDComponents:None].tolist())


from tqdm import tqdm
#Create train and validation datasets

numberOfPSDComponents = 4

brakingEvent_eeg_PSD_train = []
brakingEvent_eeg_PSD_val = []

'''
noEvent_eeg_PSD_train = []
noEvent_eeg_PSD_val = []
'''

#Iterate through all EEG channels.
for channelNum in tqdm(range(0, 61)):
    
    #P9, P10, CPz, FCz
    channelsOfInterest = ['P9', 'P10', 'CPz', 'FCz'] <-- 20% accuracy
    #['P9'] <-- 48% accuracy
    #['POz','FCz'] <-- 34%
    #['FCz'] <-- 42% accuracy
    #['F3', 'F4', 'C3', 'C4'] <-- 19% accuracy
    #['POz'] <-- 44% accuracy
    #['O1', 'O2', 'Oz'] <-- 25% accuracy
    #['POz']#['FC3', 'FC4', 'Fz']#['Fz', 'FCz']
    ref = cnt.clab[channelNum][0]
    #print(bytes(np.array(res).ravel().tolist()).decode('UTF-8'))
    channelName = bytes(np.array(cnt[ref]).ravel().tolist()).decode('UTF-8')
    
    if channelName in channelsOfInterest:
    #if channelNum not in [0,3,4,5]:
        data = cnt.x[channelNum]
        event_eeg_PSD_train, event_eeg_PSD_val = createDatasetFromEEGEvents(mrk.time, data, samplingRate, numberOfPSDComponents)
        
        #_noEvent_eeg_PSD_train, _noEvent_eeg_PSD_val = createDatasetFromEEGWithoutEvents(mrk.time, data, samplingRate, numberOfPSDComponents)
        
        for array in event_eeg_PSD_train:
            #brakingEvent_eeg_PSD_train = np.concatenate((brakingEvent_eeg_PSD_train, array))
            brakingEvent_eeg_PSD_train.append(array)
        for array in event_eeg_PSD_val:
            #brakingEvent_eeg_PSD_val = np.concatenate((brakingEvent_eeg_PSD_val, array))
            brakingEvent_eeg_PSD_val.append(array)
            
        '''
        for array in _noEvent_eeg_PSD_train:
            #noEvent_eeg_PSD_train = np.concatenate((noEvent_eeg_PSD_train, array))
            noEvent_eeg_PSD_train.append(array)
        for array in _noEvent_eeg_PSD_val:
            #noEvent_eeg_PSD_val = np.concatenate((noEvent_eeg_PSD_val, array))
            noEvent_eeg_PSD_val.append(array)
        '''
    
#Label = 0 indicates no event; label = 1 indicates EEG braking event
trainData = np.concatenate((brakingEvent_eeg_PSD_train, noEvent_eeg_PSD_train))
trainLabels_event = np.ones(len(brakingEvent_eeg_PSD_train),dtype=int) 
trainLabels_noEvent = np.zeros(len(noEvent_eeg_PSD_train),dtype=int) 
trainLabels = np.concatenate((trainLabels_event, trainLabels_noEvent))

valData = np.concatenate((brakingEvent_eeg_PSD_val, noEvent_eeg_PSD_val))
valLabels_event = np.ones(len(brakingEvent_eeg_PSD_val),dtype=int) 
valLabels_noEvent = np.zeros(len(noEvent_eeg_PSD_val),dtype=int) 
valLabels = np.concatenate((valLabels_event, valLabels_noEvent))

In [65]:
from tqdm import tqdm
#Create train and validation datasets

numberOfPSDComponents = 1000 #Increasing components from 0 to 100 AUC accuracy.

brakingEvent_eeg_PSD_train = []
brakingEvent_eeg_PSD_val = []

noEvent_eeg_PSD_train = []
noEvent_eeg_PSD_val = []

#Iterate through all EEG channels.
for channelNum in tqdm(range(0, 61)):
    
    channelsOfInterest = ['P9', 'P10', 'CPz', 'FCz']#['O1', 'O2', 'Oz']#['POz']   <-- 46% accuracy
    #['POz']#['FC3', 'FC4', 'Fz']#['Fz', 'FCz']#['F3', 'F4', 'C3', 'C4']
    ref = cnt.clab[channelNum][0]
    #print(bytes(np.array(res).ravel().tolist()).decode('UTF-8'))
    channelName = bytes(np.array(cnt[ref]).ravel().tolist()).decode('UTF-8')
    '''
    if channelName in channelsOfInterest:
    #if channelNum not in [0,3,4,5]:
        data = cnt.x[channelNum]
        event_eeg_PSD_train, event_eeg_PSD_val = createDatasetFromEEGEvents(mrk.time, data, samplingRate, numberOfPSDComponents)
        _noEvent_eeg_PSD_train, _noEvent_eeg_PSD_val = createDatasetFromEEGWithoutEvents(mrk.time, data, samplingRate, numberOfPSDComponents)
        
        for array in event_eeg_PSD_train:
            #brakingEvent_eeg_PSD_train = np.concatenate((brakingEvent_eeg_PSD_train, array))
            brakingEvent_eeg_PSD_train.append(array)
        for array in event_eeg_PSD_val:
            #brakingEvent_eeg_PSD_val = np.concatenate((brakingEvent_eeg_PSD_val, array))
            brakingEvent_eeg_PSD_val.append(array)
        for array in _noEvent_eeg_PSD_train:
            #noEvent_eeg_PSD_train = np.concatenate((noEvent_eeg_PSD_train, array))
            noEvent_eeg_PSD_train.append(array)
        for array in _noEvent_eeg_PSD_val:
            #noEvent_eeg_PSD_val = np.concatenate((noEvent_eeg_PSD_val, array))
            noEvent_eeg_PSD_val.append(array)
    '''
    data = cnt.x[channelNum]
    event_eeg_PSD_train, event_eeg_PSD_val = createDatasetFromEEGEvents(mrk.time, data, samplingRate, numberOfPSDComponents)
    _noEvent_eeg_PSD_train, _noEvent_eeg_PSD_val = createDatasetFromEEGWithoutEvents(mrk.time, data, samplingRate, numberOfPSDComponents)

    for array in event_eeg_PSD_train:
        #brakingEvent_eeg_PSD_train = np.concatenate((brakingEvent_eeg_PSD_train, array))
        brakingEvent_eeg_PSD_train.append(array)
    for array in event_eeg_PSD_val:
        #brakingEvent_eeg_PSD_val = np.concatenate((brakingEvent_eeg_PSD_val, array))
        brakingEvent_eeg_PSD_val.append(array)
    for array in _noEvent_eeg_PSD_train:
        #noEvent_eeg_PSD_train = np.concatenate((noEvent_eeg_PSD_train, array))
        noEvent_eeg_PSD_train.append(array)
    for array in _noEvent_eeg_PSD_val:
        #noEvent_eeg_PSD_val = np.concatenate((noEvent_eeg_PSD_val, array))
        noEvent_eeg_PSD_val.append(array)
    
#Label = 0 indicates no event; label = 1 indicates EEG braking event
trainData = np.concatenate((brakingEvent_eeg_PSD_train, noEvent_eeg_PSD_train))
trainLabels_event = np.ones(len(brakingEvent_eeg_PSD_train),dtype=int) 
trainLabels_noEvent = np.zeros(len(noEvent_eeg_PSD_train),dtype=int) 
trainLabels = np.concatenate((trainLabels_event, trainLabels_noEvent))

valData = np.concatenate((brakingEvent_eeg_PSD_val, noEvent_eeg_PSD_val))
valLabels_event = np.ones(len(brakingEvent_eeg_PSD_val),dtype=int) 
valLabels_noEvent = np.zeros(len(noEvent_eeg_PSD_val),dtype=int) 
valLabels = np.concatenate((valLabels_event, valLabels_noEvent))

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 61/61 [02:32<00:00,  2.51s/it]


In [66]:
#Train LDA model
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

clf = LinearDiscriminantAnalysis()
clf.fit(np.array(trainData), trainLabels)

LinearDiscriminantAnalysis()

In [67]:
#Validate LDA model
eventPredictions = clf.predict(valData).tolist()

In [68]:
accuracy = abs(sum(eventPredictions-valLabels))/len(eventPredictions)
print("LDA model accuracy: ",round(accuracy,2))

LDA model accuracy:  0.07


In [69]:
#Area under the curve score
from sklearn.metrics import roc_auc_score

accuracyAUC = roc_auc_score(valLabels, clf.predict_proba(np.array(valData))[:, 1])
print("LDA model AUC accuracy: ",round(accuracyAUC,2))

LDA model AUC accuracy:  0.8


In [22]:
#ToDo:
#Try prediction with mutliple test subjects
#