# Pip installs


In [None]:
!pip install mne

In [None]:
import tensorflow as tf
import keras
from keras import layers
from tensorflow.keras.losses import BinaryCrossentropy, CategoricalCrossentropy, SparseCategoricalCrossentropy, MeanSquaredError
import mne
from mne.preprocessing import ICA, corrmap, create_ecg_epochs, create_eog_epochs
import numpy as np
import csv
import pickle

# Getting the raw data
Each element of data referes to one subject, each element of data has 12 elements, each corrosponds to one trial, label has the same format

If you have a different directory to get the data from, feel free to change

In [None]:
# sample data for all subjects
data = []
labels = []
num_of_subjects = 109

for j in range(1, num_of_subjects + 1):
  j_subject_data = []
  j_subject_label = []
  for i in range(1, 15):
    if j < 10:
      if i < 10:
        file = "./files/S00" + str(j) + "/S00" + str(j) + "R0" + str(i) + ".edf"
      else:
        file = "./files/S00" + str(j) + "/S00" + str(j) + "R" + str(i) + ".edf"
    elif j < 100:
      if i < 10:
        file = "./files/S0" + str(j) + "/S0" + str(j) + "R0" + str(i) + ".edf"
      else:
        file = "./files/S0"+ str(j) + "/S0" + str(j) + "R" + str(i) + ".edf"
    else:
      if i < 10:
        file = "./files/S" + str(j) + "/S" + str(j) + "R0" + str(i) + ".edf"
      else:
        file = "./files/S"+ str(j) + "/S" + str(j) + "R" + str(i) + ".edf"

    j_subject_data.append(mne.io.read_raw_edf(file, verbose = False))
    j_subject_label.append(mne.read_annotations(file))
  data.append(j_subject_data)
  labels.append(j_subject_label)


# Global variables

These are needed to define which trials for a subject relate to indivbidual fists and which relate to both fist/feet clenching

In [None]:
# we define different trials with different results
individual_fist_trials = [3, 4, 7, 8, 11, 12]
# if a trial is not one of these, then it is either both feet closing or both fists closing

# EEG Power Analysis/Time frequency analysis
Code that allows us to visually inspect the EEG power for a fixed subject's trial

In [None]:
# Choose some data to observe
filt_raw = data[0][2].copy()

events = mne.events_from_annotations(filt_raw)

max_time_point = 4
min_time_point = -1
channel_epochs = mne.Epochs(filt_raw, events[0], tmax=max_time_point, tmin=min_time_point)
t0_epoch = channel_epochs['1']
t1_epoch = channel_epochs['2']
t2_epoch = channel_epochs['3']

frequencies = np.arange(1, 4)
power = mne.time_frequency.tfr_morlet(
    t2_epoch, n_cycles=2, return_itc=False, freqs=frequencies, decim=3
)
power.apply_baseline((min_time_point, max_time_point))
power.plot(["C3..", "C4..", "Cz.."])

frequencies = np.arange(13, 17)
power = mne.time_frequency.tfr_morlet(
    t2_epoch, n_cycles=2, return_itc=False, freqs=frequencies, decim=3
)
power.apply_baseline((min_time_point, max_time_point))
power.plot(["C3..", "C4..", "Cz.."])

# Helper functions
These are functions to get labels and to pre-process and retrieve the EEG data

NOTE that this assumes we are preprocessing the data with a 13-17 bandpass filter and baseline correction, to do the 1-4 Hz processing change the paramaters of the filter function and remove the apply_baseline function call

In [None]:
def pre_proccess_data(raw_data: mne.io.Raw):
  """
  Pre-proccess the data
  """

  # right now we filter for 13-17 frequency
  filt_raw = raw_data.load_data(verbose=False).filter(l_freq=13, h_freq=17, verbose=False)
  # get events to create power object
  events = mne.events_from_annotations(raw_data, verbose=False)

  # define time points to sample from and epochs
  max_time_point = 2
  min_time_point = -0.5
  epochs = mne.Epochs(filt_raw, events[0], tmax=max_time_point, tmin=min_time_point, verbose=False, preload=True)
  epochs = epochs.pick(picks=channels, verbose=False).apply_baseline((None, None), verbose=False)

  return epochs

In [None]:
def get_evoked_data(data, subject_index: int):
  """
  Gets the evoked response for an epoch for a subject at different trials
  """

  epoch = pre_proccess_data(data[subject_index])
  t0_evoked = epoch["1"].average()
  t1_evoked = epoch["2"].average()
  t2_evoked = epoch["3"].average()

  return (t0_evoked, t1_evoked, t2_evoked)

In [None]:
def get_true_label(t_label: str, trial_number: int) -> int:
  """
  Converts the labels given by the dataset (T0, T1, T2) to binary label
  """
  final_vector = [0, 0, 0, 0, 0]

  if t_label == "T0":
    final_vector[0] = 1
  elif t_label == "T1":
    if trial_number in individual_fist_trials:
      final_vector[2] = 1
    # other wise it is both fists clenching
    else:
      final_vector[3] = 1
  else: # t_label == "T2"
    if trial_number in individual_fist_trials:
      final_vector[1] = 1
    # other wise it is both feet clenching
    else:
      final_vector[4] = 1
  return final_vector

# Loading data

Below is the load_training_data function

In [None]:
def load_training_data(x_train: list[list[int]], y_train: list, data: list[list[int]], labels: list) -> None:
  """
  Loads data into x_train and y_train
  data is one subjects list of trials
  labels are the appropriate labels for each trial
  """
  # i is the trial number, e.g. i = 1 corrosponds to R01
  for i in range(1, len(labels) + 1):

    if i != 1 and i != 2:
      # get evoked data for each trial
      evoked_data = get_evoked_data(data, i - 1)
      t_labels = ["T0", "T1", "T2"]

      for j in range(0, 3):
        x_train.append(evoked_data[j].data.tolist())
        y_train.append(get_true_label(t_labels[j], i))

# Load data into x_train and y_train

In [None]:
channels = ['C4..', 'C3..', 'Cz..']
# CHANGE PATH_TO_SAVE TO SAVE TO FOLDER
PATH_TO_SAVE = "./"
for index_of_subject in range(0, len(data)):
  x_train, y_train = [], []
  load_training_data(x_train, y_train, data[index_of_subject], labels[index_of_subject])
  x_train = np.array(x_train)
  y_train = np.array(y_train)
  with open(PATH_TO_SAVE + '/S' + str(index_of_subject + 95) + '.pickle', 'wb') as file:
    pickle.dump((x_train, y_train), file)