In [None]:
from data_processing_helper import * 

## Specific helper functions

In [None]:
def getOutputLabelsAndEpochTimes(event_df):
    """Pair start/end event markers into trial labels and epoch time windows.

    Every row's 'EventStart' string is split on "_". A first token of
    'start' opens a trial and the second token is read as the integer trial
    type (e.g. "start_2"); any other first token is treated as the end
    marker of the currently open trial.

    Input:
        event_df -- the event dataframe from the csv file, with columns
                    'time' and 'EventStart'
    Output:
        output_labels -- np.array like [1, 2, 4, 3, ...] where the integers
                         correspond to the trial type encoded
        epoch_times -- np.array like
                       [[<timestamp of start>, <timestamp of end>], ...]

    Only complete start/end pairs are returned, so output_labels[i] always
    belongs to epoch_times[i]. Unmatched markers are reported with a printed
    warning and skipped instead of silently misaligning the two outputs.
    """
    output_labels = []
    epoch_times = []
    # (label, start_time) of a start marker that is still waiting for its end
    open_trial = None
    for _, row in event_df.iterrows():
        event_info = row['EventStart'].split("_")
        if event_info[0] == 'start':
            if open_trial is not None:
                print("Warning: start marker at", open_trial[1],
                      "has no matching end marker; trial skipped")
            open_trial = (int(event_info[1]), row['time'])
        elif open_trial is None:
            print("Warning: end marker at", row['time'],
                  "has no matching start marker; marker ignored")
        else:
            label, start_time = open_trial
            output_labels.append(label)
            epoch_times.append([start_time, row['time']])
            open_trial = None
    if open_trial is not None:
        # e.g. the recording was stopped in the middle of a trial
        print("Warning: start marker at", open_trial[1],
              "has no matching end marker; trial skipped")
    return np.array(output_labels), np.array(epoch_times)

In [None]:
def getEEGEpochs(epoch_times, eeg_df, target_num_trials=1000):
    """Slice eeg_df into fixed-length epochs, one per [start, end] window.

    Input:
        epoch_times: [[<timestamp of start>, <timestamp of end>], ...]
        eeg_df: dataframe from csv file; must have a 'time' column, every
                other column is treated as a channel
        target_num_trials: number of samples (timepoints) kept per epoch.
                The name is historical; it counts samples, not trials.
    Output:
        a numpy array containing eeg_epochs (#epoch, #chans, #timepoints)

    Samples strictly between start and end are selected. If a window holds
    more than target_num_trials samples, the surplus is trimmed evenly from
    both ends (one extra sample from the end when the surplus is odd), so
    every returned epoch has exactly target_num_trials timepoints.

    Windows with fewer samples are skipped with a printed warning. The
    result can therefore have fewer epochs than epoch_times has rows, and
    callers that pair epochs with labels must account for that.
    """
    eeg_epochs = []
    for epoch_time in epoch_times:
        in_window = (eeg_df['time'] > epoch_time[0]) & (eeg_df['time'] < epoch_time[1])
        samples = eeg_df[in_window].drop(columns=['time']).values
        num_above = len(samples) - target_num_trials
        if num_above < 0:
            print("Warning: Epoch with less than", target_num_trials, "eeg samples")
            continue
        # Centre the kept window. Slicing by an explicit length (rather than
        # trimming num_above // 2 from both ends) keeps the result exact
        # when num_above is odd.
        first = num_above // 2
        epoch = samples[first:first + target_num_trials]
        eeg_epochs.append(epoch.T)
    return np.array(eeg_epochs)


### Main

In [None]:
# Locations of the self-recorded EEG samples and of the matching event markers
eeg_filename = "./data/self_recorded/eeg_data 15_motorvis.csv"
event_filename = "./data/self_recorded/event_data 15_motorvis.csv"

# Channel names, assigned by position to the csv columns that follow 'time'
chans = ['C4', 'C2', 'C1', 'C3']
# Integer trial code -> readable trial name
event_types = {0:"eye_close", 1:"left", 2:"right", 3:"foot", 4:"idle"}

# Load both recordings and overwrite their headers with fixed column names
eeg_df = pd.read_csv(eeg_filename)
eeg_df.columns = ['time'] + chans

event_df = pd.read_csv(event_filename)
event_df.columns = ['time', 'EventStart']

# Filter the full data: work on a copy so eeg_df keeps the raw signal,
# and run every channel through filterEEG over the whole recording
filtered_df = eeg_df.copy()
for chan in chans:
    filtered_df[chan] = filterEEG(filtered_df[chan].values)

In [None]:
# Turn the event markers into per-trial labels and [start, end] time windows
output_labels, epoch_times = getOutputLabelsAndEpochTimes(event_df)

# Cut the same windows out of the raw and out of the filtered recording
# NOTE(review): getEEGEpochs skips windows with too few samples, which would
# leave fewer epochs than labels -- confirm getDF copes with that.
raw_eeg_epochs = getEEGEpochs(epoch_times=epoch_times, eeg_df=eeg_df)
filtered_epochs = getEEGEpochs(epoch_times=epoch_times, eeg_df=filtered_df)

# Create DataFrames, one for the raw and one for the filtered epochs
raw_eeg_epoch_df = getDF(raw_eeg_epochs, output_labels, epoch_times, chans)
filtered_epoch_df = getDF(filtered_epochs, output_labels, epoch_times, chans)
filtered_epoch_df.head(2)

In [None]:
# Visualize EEG and PSD for one trial
trial_num = 0
# Pick the trial by position once, so both figures are guaranteed to show the
# same row even if the DataFrame index is not 0..n-1 (the original mixed
# label-based df[ch][trial_num] with positional .iloc[trial_num][ch]).
trial = filtered_epoch_df.iloc[trial_num]

# Time-domain signal, one line per channel
plt.figure(figsize=(15,5))
for ch in chans:
    plt.plot(trial[ch], label=ch)
plt.ylabel("Voltage (uV)")
plt.xlabel("timepoints @ 250Hz")
plt.title("EEG of one motor imagery trial")
plt.legend()
plt.show()

# Power spectral density of the same trial, one line per channel
plt.figure(figsize=(15,5))
for ch in chans:
    plotPSD_fromEEG(trial[ch], pre_cut_off_freq=2, post_cut_off_freq=30, label=ch)
plt.title("PSD of one motor imagery trial")
plt.legend()
plt.show()


In [None]:
# Get PSD averages for each channel for each event type 
# (0 = eye close, 1 = left, 2 = right, 3 = foot, 4 = idle)
psd_averages_by_type = {}

for event_type in range(0, 5): 
    psds_only_one_type={}
    freqs_only_one_type={}
    for i, row in filtered_epoch_df[filtered_epoch_df["event_type"] == event_type].iterrows(): 
        for ch in chans: 
            if ch not in psds_only_one_type: 
                psds_only_one_type[ch] = list()
                freqs_only_one_type[ch] = list()
            f, p = getMeanFreqPSD(row[ch])
            psds_only_one_type[ch].append(p)
            freqs_only_one_type[ch].append(f)
    avg_psds_one_type = {}
    for ch in chans:
        psds_only_one_type[ch] = np.array(psds_only_one_type[ch])
        avg_psds_one_type[ch] = np.mean(psds_only_one_type[ch], axis=0)
    psd_averages_by_type[event_type] = dict(avg_psds_one_type)

In [None]:
# View Average PSDs
# Frequency axis: taken from the first stored trial of the first channel.
# freqs_only_one_type is left over from the previous cell's last loop pass.
# NOTE(review): assumes every trial shares the same frequency bins -- confirm.
psd_freqs = freqs_only_one_type[chans[0]][0]

for event_type in range(5):
    # One figure per event type, comparing the two outermost channels
    for ch in ('C4', 'C3'):
        plotPSD(
            psd_freqs,
            psd_averages_by_type[event_type][ch],
            pre_cut_off_freq=2,
            post_cut_off_freq=30,
            label=ch,
        )

    plt.legend()
    plt.title("event type " + event_types[event_type])
    plt.show()

# For testing with ML Models to make sure output is fine

In [None]:
# # for models:
from pyriemann.estimation import XdawnCovariances
from pyriemann.tangentspace import TangentSpace
from sklearn.metrics import classification_report
from sklearn.linear_model import LogisticRegression


In [None]:
# Model inputs: filtered epochs and their labels, without the first three trials
# NOTE(review): the reason for skipping three trials is not documented -- confirm
X, Y = filtered_epochs[3:], output_labels[3:]

In [None]:
# Default 4:1 train/test split
# Shuffle: apply one seeded permutation to both arrays so epochs and labels
# stay paired and the split is reproducible across kernel restarts. Uses
# NumPy only, so the cell no longer depends on an implicitly imported `random`.
SPLIT_SEED = 42
shuffled_order = np.random.default_rng(SPLIT_SEED).permutation(len(X))
X = np.asarray(X)[shuffled_order]
Y = np.asarray(Y)[shuffled_order]

# Split train/test: the first 4/5 of the shuffled trials train, the rest test
num_train = int(len(X) * 4 / (4 + 1))
X_train, X_test = X[:num_train], X[num_train:]
Y_train, Y_test = Y[:num_train], Y[num_train:]

class myModel:
    """Minimal fit/predict interface for the classifiers in this notebook.

    The base class does nothing by itself: fit and predict are placeholders
    that return None and are meant to be overridden by subclasses.
    """

    def __init__(self):
        pass

    def fit(self, X, Y):
        """Train on samples X with labels Y (placeholder, returns None)."""
        pass

    def predict(self, X):
        """Predict labels for samples X (placeholder, returns None)."""
        pass
    
class XDawnLRModel(myModel): # XDAWN Covariance Preprocessing + Logistic Regression Classifier
    """XDAWN covariances -> Riemannian tangent space -> logistic regression.

    All three stages are stored on the instance and fitted only in fit();
    predict() applies the already fitted stages. In particular the tangent
    space is no longer refitted on the data passed to predict(), which would
    project test data around a different reference point than the one the
    classifier was trained on.
    """

    def __init__(self):
        super().__init__()
        self.XC = XdawnCovariances(nfilter = 1) # the number of filters can be changed
        self.TS = TangentSpace(metric='riemann')
        self.logreg = LogisticRegression()

    def fit(self, X, Y):
        """Fit every stage on training epochs X and their labels Y.

        X is what getEEGEpochs produces in this notebook, i.e.
        (#epoch, #chans, #timepoints); Y holds one label per epoch.
        """
        X_transformed = self.XC.fit_transform(X, Y)
        X_transformed = self.TS.fit_transform(X_transformed)
        self.logreg.fit(X_transformed, Y)

    def predict(self, X):
        """Return predicted labels for epochs X using the fitted stages.

        Must be called after fit(); nothing is refitted here.
        """
        X_transformed = self.XC.transform(X)
        X_transformed = self.TS.transform(X_transformed)
        return self.logreg.predict(X_transformed)

# Train the XDAWN + logistic-regression pipeline on the training split
model = XDawnLRModel()
model.fit(X_train, Y_train)

In [None]:
# Predict labels for the held-out test epochs and print sklearn's
# classification report comparing them with the true labels
Y_pred = model.predict(X_test)
print(classification_report(Y_test, Y_pred))

In [None]:
# Overall accuracy: fraction of test trials whose predicted label equals the true label
sum(Y_test == Y_pred)/len(Y_test)