In [None]:
# 10 songs for each of 4 trials for each of 31 subjects = total of 1240 playings
# 12 target classes: HAPPY, LOW VALENCE, LOW TENSION, LOW ENERGY, HIGH ENERGY, SURPRISE, FEAR, TENDER, ANGER, SAD, HIGH VALENCE, HIGH TENSION
# ? Unknown how many playings fall under each target

In [29]:
import mne # package for reading edf data
import numpy as np
import pandas as pd
import json
import matplotlib.pyplot as plt
from scipy.signal import butter, sosfiltfilt, sosfreqz  

In [30]:
fs = 240.0                      # Hz; sampling rate
dt = 1000/fs                    # ms; time between samples
sdt = np.round(dt).astype(int); # rounded dt so that we can index samples
hp = 0.1                        # Hz; our low cut for our bandpass
lp = 30.                        # Hz; our high cut for our bandpass
order = 3                       # filter order (functionally doubled)

# Create our filter coefficient as as a second-order section
# Note: by defining 'fs' we don't divide our windows by the Nyquist
sos = butter(order, [hp, lp], analog = False, btype = 'band', output = 'sos', fs = fs)

In [31]:
# Create empty lists to store our data. We'll conver them into np.arrays at the end
train_data = []
train_labels = []
test_data = []

test_set = np.random.choice(124, size=24, replace=False)

# Going through all 124 trials
for subject in range(1,32):
    for trial in range(2,6):
        fileName = f"data/sub-{format(subject, '02x')}/eeg/sub-{format(subject, '02x')}_task-run{trial}_eeg.edf"
        print('Processing: ' + fileName)
        data = mne.io.read_raw_edf(fileName)
        trialRecording = data.to_data_frame().infer_objects()

        labels = [] # Should be an array of 10 labels for each of the songs played in this trial
        recordings = [] # Should be a 3D array of 10 recordings, each with 16000 rows and 20 columns
        # Bens code to get individual playings




        # Going through all 10 playings
        for playing in range(1,11):
            recording = recordings[playing]

            # Epoch and correct DC offset of signal pre-filtering
            recording = recording_[e_s+t0:t0+e_e, :] - np.mean(recording_[e_s+t0:t0+e_e, :], 0)
            recording = sosfiltfilt(sos, recording)
            
            # Now let's baseline correct
            # data = data - np.mean(data[bl_s+np.abs(e_s):np.abs(e_s)+bl_e, :], 0)
            
            # Append data to correct locations
            if s in test_set:
                test_data.append(data)
            else:
                train_data.append(data)
                train_labels.append(labels[playing]) # target or non-target
                    
print('Processing completed!')

# Convert all of our lists into numpy arrays
train_data = np.array(train_data)
train_labels = np.array(train_labels)
test_data = np.array(test_data)

Processing: data/sub-01/eeg/sub-01_task-run2_eeg.edf
Extracting EDF parameters from c:\Users\Jack Sheridan\Documents\COGS189\COGS189-final-project\data\sub-01\eeg\sub-01_task-run2_eeg.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...


IndexError: list index out of range

In [None]:
# Seperate target and non-target for plotting
tar     = train_data[np.where(train_labels == 1)[0], :, :]
non_tar = train_data[np.where(train_labels == 0)[0], :, :]

print('We have %d target trials' % tar.shape[0])
print('We have %d non-target trials' % non_tar.shape[0])

# We'll take the average of all trials to create an averaged ERP
tar_avg     = np.mean(tar, 0)
non_tar_avg = np.mean(non_tar, 0)

# Define channel of interest and create an array of time points
chan = 'Cz' # let's plot Cz
ch = np.where(channels == chan)[0][0]
times = np.linspace(epoch_start, epoch_end, train_data.shape[1])

# Initialize plot and calculate min and max y value
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(20, 6))
min_y = min(np.min(tar_avg), np.min(non_tar_avg))
max_y = max(np.max(tar_avg), np.max(non_tar_avg))

# Plot x and y axes
plt.plot([np.min(times), np.max(times)], [0, 0], color='k');  # x-axis
plt.plot([0, 0], [min_y, max_y], color='k');                  # y-axis

# Plot our averaged ERPs
plt.plot(times, tar_avg[:, ch], 'b', linewidth=4)
plt.plot(times, non_tar_avg[:, ch], 'r', linewidth=4)

# Highlight the baseline window and window of interest of our ERP
baseline = patches.Rectangle([baseline_start, min_y], baseline_end, np.abs(min_y)+max_y, 
                             color='c', alpha=0.2)
erp_win = patches.Rectangle([erp_start, min_y], erp_end-erp_start, np.abs(min_y)+max_y, 
                             color='lime', alpha=0.3)

# Add our baseline and window of interest highlights
ax.add_patch(baseline)
ax.add_patch(erp_win)

# Manually create legends since patches will corrupt default handles
legend_ = [patches.Patch(color='b', label = 'Target (oddball)'),
           patches.Patch(color='r', label = 'Non-target (standard)')]

# Finalize plot and set a high DPI for a crisp, hi-res figure
plt.xlabel('Time (msec)');
plt.ylabel('Amplitude (A/D Units)');
plt.legend(handles=legend_, loc="upper right");
plt.title('Event Related Potentials at channel %s' % chan);
fig.set_dpi(216);
plt.show();

In [None]:
# Let's compute the windowed means within erp_start and erp_end
num_points = 5; # we will divide our window into num_points means

# Define a simple windowed means function
def wm(x, start, end, num_points):
    num_trials = x.shape[0] # assumes first dem is numb observations
    w = np.round((start+end)/num_points).astype(int)
    y = np.zeros((num_points, x.shape[-1], num_trials)) # assumes num chans as last dimension
    for i in range(0, num_points):
        s = start + (w * i)
        e = end   + (w * i)
        y[i, :, :] = np.mean(x[:, s:e, :], 1).T
    return y

# Combine into a single train variable. Also create labels
X_train    = wm(train_data, erp_s, erp_e, num_points)
markers_train = np.vstack((train_labels, train_markers)).T
y = train_labels

# Now let's compute windowed means of our test data
X_test = wm(test_data, erp_s, erp_e, num_points)
markers_test = test_markers

# Let's print out the shapes of our data
print('X_train shape is: ' + str(X_train.shape))
print('y shape is......: ' + str(y.shape))
print('X_test shape is.: ' + str(X_test.shape))

In [None]:
# Since our X is 3D, we must flatten our data. We will then transpose it for sklearn
X_train = X_train.reshape(-1, X_train.shape[-1]).T
X_test = X_test.reshape(-1, X_test.shape[-1]).T

# Let's print out the new shape
print('X_train shape is now: ' + str(X_train.shape))
print('X_test  shape is now: ' + str(X_test.shape))

In [None]:
# Train our classifier (this may take a while via JupyterHub)
clf_lsqrs = LinearDiscriminantAnalysis(solver = 'lsqr',  shrinkage = 'auto').fit(X_train, y)

In [None]:
# Let's do 5-fold cross validation
score_lsqrs = cross_val_score(clf_lsqrs.fit(X_train, y), X_train, y, cv = 5)

# We will print out the mean score
print("solver = lsqr  accuracy: %f" % np.mean(score_lsqrs))