In [None]:
#@title Importing Necessary packages and mounting drive
#Installing the necessary package for analysis and visualization of EEG data
#!pip install mne
#Mount drive for accessing the dataset
from google.colab import drive
drive.mount('/content/drive')
#Importing the necessary packages
!pip install mne
import mne

In [None]:
#@title Load Pickle Data
import pickle
with open('/content/drive/My Drive/sub_data.pkl', 'rb') as file:
    sub_data = pickle.load(file)

In [None]:
#@title Loading The Data for Each Subject - Method1
#Subjects^

s=['23','24','25','26','28','30','31','33','34','35','36','37','41','42']
#Channels of interest
ch = ['O1','Oz','O2','PO7','PO3','POz','PO4','PO8','Pz','P3','P4','P2','P1']
import numpy as np
sub_trials = {}
sub_data = {}
# Defining conditions
conditions = ["Stimulus/Neutral/Left/Kick", "Stimulus/Neutral/Left/Walk", "Stimulus/Neutral/Right/Kick", "Stimulus/Neutral/Right/Walk",
                  "Stimulus/Congruent/Left/Kick", "Stimulus/Congruent/Left/Walk", "Stimulus/Congruent/Right/Kick", "Stimulus/Congruent/Right/Walk",
                  "Stimulus/Incongruent/Left/Kick", "Stimulus/Incongruent/Left/Walk", "Stimulus/Incongruent/Right/Kick", "Stimulus/Incongruent/Right/Walk"]
#Defining the trial type id_s
neutral_walk_ids = [31,33]
neutral_kick_ids = [32,34]
cong_walk_ids = [11,13]
cong_kick_ids = [12,14]
incong_walk_ids = [21,23]
incong_kick_ids = [22,24]
kick_ids = [12,14,22,24,32,34]
walk_ids = [11,13,21,23,31,33]
for sub_id in s:
    fname = f"/content/drive/MyDrive/EE585_Data/s{sub_id}_epochs_autoreject-epo.fif.gz"
    raw=mne.read_epochs(fname, preload=True)
    # Extracting epochs for each condition
    raw_epochs = raw[conditions].pick_channels(ch)
    neutral_epochs = raw[conditions[:4]].pick_channels(ch)
    cong_epochs = raw[conditions[4:8]].pick_channels(ch)
    incong_epochs = raw[conditions[8:]].pick_channels(ch)
    neutral_epoch_events = neutral_epochs.events[:, -1]
    cong_epoch_events = cong_epochs.events[:,-1]
    incong_epoch_events = incong_epochs.events[:,-1]
    raw_epoch_events = raw_epochs.events[:,-1]
    #labels: 0 for kick, 1 for walk
    neutral_labels = np.array([0 if event_id in neutral_kick_ids else 1 for event_id in neutral_epoch_events])
    cong_labels = np.array([0 if event_id in cong_kick_ids else 1 for event_id in cong_epoch_events])
    incong_labels = np.array([0 if event_id in incong_kick_ids else 1 for event_id in incong_epoch_events])
    raw_labels = np.array([0 if event_id in kick_ids else 1 for event_id in raw_epoch_events])
    sub_data[sub_id] = {
        'raw': raw,
        'raw_epochs': raw_epochs,
        'neutral_epochs': neutral_epochs,
        'cong_epochs': cong_epochs,
        'incong_epochs': incong_epochs,
        'raw_epoch_events':raw_epoch_events,
        'neutral_epoch_events':neutral_epoch_events,
        'cong_epoch_events':cong_epoch_events,
        'incong_epoch_events':incong_epoch_events,
        'neutral_labels': neutral_labels,
        'cong_labels': cong_labels,
        'incong_labels':incong_labels,
        'raw_labels':raw_labels
    }
    #_1 and _2:left & _3 and _4: right but irrelevant to our task now...
    # Store trial information in sub_trials
    sub_trials[sub_id] = [
        len(neutral_epochs), np.sum(neutral_labels == 0), np.sum(neutral_labels == 1),
        len(cong_epochs), np.sum(cong_labels == 0), np.sum(cong_labels == 1),
        len(incong_epochs), np.sum(incong_labels == 0), np.sum(incong_labels == 1)
    ]

In [None]:
#@title Visualizations of Time-Series Data
import matplotlib.pyplot as plt
#for sub_id in s:
times = np.linspace(0.1, 0.9, 8)
#["aud/left"].plot_topomap(ch_type="mag", times=times, colorbar=True)
raw.plot(picks=ch)
raw_epochs.plot_image(picks='POz', combine="mean")

In [None]:
#@title Number of Trial Types for Each Subject
for i in sub_trials.keys():
  print("Subj,Neut,NK, NW ,Cong, CK ,CW,Incon,IK,IW")
  print(i,sub_trials[i])

Subj,Neut,NK, NW ,Cong, CK ,CW,Incon,IK,IW
31 [76, 37, 39, 265, 127, 138, 81, 37, 44]


In [None]:
#@title Resampling and Visualizing Time Series Data
neutrals = 35
cong = 125
incong = 35
s=['23','24','25','26','28','30','31','33','34','35','36','37','41','42']
#Channels of interest
ch = ['O1','Oz','O2','PO7','PO3','POz','PO4','PO8','Pz','P3','P4','P2','P1']
#Make sampling rate 500
for sub_id in s:
    sub_data[sub_id]['raw'].resample(500)
    #Further inspection of the epoched data with 65 channels and time points ranging from -2.4 to .999
    sub_data[sub_id]['raw'].average().plot_image()
    #0 point corresponds to target stimulus onset

In [None]:
#@title Computing The Time Frequency using Morlet
import mne
from mne.time_frequency import tfr_array_morlet
import numpy as np
def time_freq(X,decim=3,n_cycles = 0.5):
  # Define frequencies of interest (log-spaced)
  # Define frequencies of interest for specific bands
  alpha_frequencies = np.linspace(10, 13, num=1)  # Alpha band
  beta_frequencies = np.linspace(13, 30, num=3)  # Beta band
  frequencies = np.concatenate([alpha_frequencies,beta_frequencies])
  # Compute time-frequency representation with Morlet wavelets
  power = tfr_array_morlet(X, sfreq=500,freqs=frequencies, n_cycles=n_cycles, use_fft=True, decim=decim,output='power')
  return power


In [None]:
#@title Reshaping & Get Trial Indices
import numpy as np

#Assuming time-freq data is 4D
def convert_2d(X):
  n_trials, n_chans, n_freqs, n_timesteps = X.shape
  X_2d = X.reshape(n_trials,n_chans*n_freqs*n_timesteps)
  return X_2d
#Assuming time-series data is 3D
def convert_2d_ts(X):
  n_trials, n_chans, n_timesteps = X.shape
  X_2d = X.reshape(n_trials,n_chans*n_timesteps)
  return X_2d

def get_trial_idx(kick_list,walk_list,n_trials):
  np.random.shuffle(kick_list)
  kick = kick_list[(len(kick_list)-n_trials):]
  np.random.shuffle(walk_list)
  walk = walk_list[(len(walk_list)-n_trials):]
  return kick, walk

In [None]:
#@title Normalization
import numpy as np
# Calculate the mean and standard deviation along the first axis (axis=0)
# Subtract mean (zero-centered), divide by std
def normalize(X):
  means = np.mean(X, axis=0)
  std_devs = np.std(X, axis=0)

  # Avoid division by zero
  std_devs[std_devs == 0] = 1

  # Normalize the data
  normalized_power_train = (X - means) / std_devs
  return normalized_power_train

In [None]:
#@title Preparing training and test datasets for each subject
from mne.time_frequency import tfr_array_morlet

n_trials = 35
c_trials = 125
i_trials = 35
fold = 4
sub_val_data = {}
sub_data_processed = {}
for sub_id in s:
  # Discard extra kick/walk trials in neutral ones
  neutral_kick_indices, neutral_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['neutral_labels'] == 0)[0],np.where(sub_data[sub_id]['neutral_labels'] == 1)[0],35)
  #Discard extra kick/walk trials in incongruent ones
  incong_kick_indices, incong_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['incong_labels'] == 0)[0],np.where(sub_data[sub_id]['incong_labels'] == 1)[0],35)
  #Discard extra kick/walk trials in congruent ones
  cong_kick_indices, cong_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['cong_labels'] == 0)[0],np.where(sub_data[sub_id]['cong_labels'] == 1)[0],125)

  #Find the trial indices to split into training and test sets (80%-20%)
  train_indices = []
  test_indices = []
  train_indices = np.concatenate([neutral_kick_indices[:28],neutral_walk_indices[:28],incong_kick_indices[:28],incong_walk_indices[:28],cong_kick_indices[:100],cong_walk_indices[:100]])
  #val_indices = np.concatenate([neutral_kick_indices[:7],neutral_walk_indices[:7],incong_kick_indices[:7],incong_walk_indices[:7],cong_kick_indices[:25],cong_walk_indices[:25]])
  #train_val_indices = np.concatenate([neutral_kick_indices[7:28],neutral_walk_indices[7:28],incong_kick_indices[7:28],incong_walk_indices[7:28],cong_kick_indices[25:100],cong_walk_indices[15:100]])
  test_indices = np.concatenate([neutral_kick_indices[28:],neutral_walk_indices[28:],incong_kick_indices[28:],incong_walk_indices[28:],cong_kick_indices[100:],cong_walk_indices[100:]])
  sub_val_data[sub_id] = {
        'x_val0': [],
        'x_val1': [],
        'x_val2': [],
        'x_val3': [],
        'y_val0': [],
        'y_val1': [],
        'y_val2': [],
        'y_val3': [],
        'x_norm_val_power2d0': [],
        'x_norm_val_power2d1': [],
        'x_norm_val_power2d2': [],
        'x_norm_val_power2d3': [],
        'x_norm_train_power2d0': [],
        'x_norm_train_power2d1': [],
        'x_norm_train_power2d2': [],
        'x_norm_train_power2d3': [],
        'y_train0': [],
        'y_train1': [],
        'y_train2': [],
        'y_train3': [],
        'x_train0':[],
        'x_train1':[],
        'x_train2':[],
        'x_train3':[]
    }

  for f in range(fold):
    np.random.shuffle(train_indices)
    #Extract training and validation data (time-series), from 100ms - 700ms
    sub_val_data[sub_id][f'x_val{str(f)}'] = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[train_indices[:78]]
    sub_val_data[sub_id][f'y_val{str(f)}'] = sub_data[sub_id]['raw_labels'][train_indices[:78]]
    sub_val_data[sub_id][f'x_norm_val_power2d{str(f)}'] = convert_2d(normalize(time_freq(sub_val_data[sub_id][f'x_val{str(f)}'])))
    sub_val_data[sub_id][f'x_train{str(f)}'] = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[train_indices[78:]]
    sub_val_data[sub_id][f'y_train{str(f)}'] = sub_data[sub_id]['raw_labels'][train_indices[78:]]
    sub_val_data[sub_id][f'x_norm_train_power2d{str(f)}'] = convert_2d(normalize(time_freq(sub_val_data[sub_id][f'x_train{str(f)}'])))

  np.random.shuffle(train_indices)
  np.random.shuffle(test_indices)
  X_test = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[test_indices]
  y_test = sub_data[sub_id]['raw_labels'][test_indices]
  X_train = sub_data[sub_id]['raw_epochs'][train_indices]
  y_train = sub_data[sub_id]['raw_labels'][train_indices]
  x_train_power = time_freq(X_train)
  x_test_power = time_freq(X_test)
  sub_data_processed[sub_id] = {
      'x_train_power': time_freq(X_train),
      'x_test_power': time_freq(X_test),
      'x_train_2d': convert_2d_ts(normalize(X_train)),
      'y_train': y_train,
      'x_test_2d': convert_2d_ts(normalize(X_test)),
      'y_test':  y_test,
      'x_norm_train_power2d': convert_2d(normalize(x_train_power)),
      'x_norm_test_power2d': convert_2d(normalize(x_test_power))
  }
  #Saving time-series, time-freq data and labels for test & training
  #sub_data_processed[sub_id]['x_train_power'] = time_freq(X_train)
  #sub_data_processed[sub_id]['x_test_power'] = time_freq(X_test)
  #sub_data_processed[sub_id]['x_train_2d'] = convert_2d_ts(normalize(X_train))
  #sub_data_processed[sub_id]['y_train'] = y_train
  #sub_data_processed[sub_id]['x_test_2d'] = convert_2d_ts(normalize(X_test))
  #sub_data_processed[sub_id]['y_test'] = y_test
  #Normalizing training & test data and reshaping from 4D to 2D
  #sub_data_processed[sub_id]['x_norm_train_power2d'] = convert_2d(normalize(sub_data[sub_id]['x_train_power']))
  #sub_data_processed[sub_id]['x_norm_test_power2d'] = convert_2d(normalize(sub_data[sub_id]['x_test_power']))

In [None]:
#@title Preparing training and test datasets for each subject
from mne.time_frequency import tfr_array_morlet
s = ['23','24','25','26','28','30','31','33','34','35','36','37','41','42']
n_trials = 35
c_trials = 125
i_trials = 35
fold = 4
sub_val_data = {}
sub_data_processed = {}
for sub_id in s:
  # Discard extra kick/walk trials in neutral ones
  neutral_kick_indices, neutral_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['neutral_labels'] == 0)[0],np.where(sub_data[sub_id]['neutral_labels'] == 1)[0],35)
  #Discard extra kick/walk trials in incongruent ones
  incong_kick_indices, incong_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['incong_labels'] == 0)[0],np.where(sub_data[sub_id]['incong_labels'] == 1)[0],35)
  #Discard extra kick/walk trials in congruent ones
  cong_kick_indices, cong_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['cong_labels'] == 0)[0],np.where(sub_data[sub_id]['cong_labels'] == 1)[0],125)

  #Find the trial indices to split into training and test sets (80%-20%)
  train_indices = []
  test_indices = []
  np.random.shuffle(neutral_kick_indices)
  np.random.shuffle(neutral_walk_indices)
  np.random.shuffle(incong_kick_indices)
  np.random.shuffle(incong_walk_indices)
  np.random.shuffle(cong_kick_indices)
  np.random.shuffle(cong_walk_indices)
  train_indices = np.concatenate([neutral_kick_indices[:28],neutral_walk_indices[:28],incong_kick_indices[:28],incong_walk_indices[:28],cong_kick_indices[:100],cong_walk_indices[:100]])
  test_indices = np.concatenate([neutral_kick_indices[28:],neutral_walk_indices[28:],incong_kick_indices[28:],incong_walk_indices[28:],cong_kick_indices[100:],cong_walk_indices[100:]])
  sub_val_data[sub_id] = {
        'x_val0': [],
        'x_val1': [],
        'x_val2': [],
        'x_val3': [],
        'y_val0': [],
        'y_val1': [],
        'y_val2': [],
        'y_val3': [],
        'x_norm_val_power2d0': [],
        'x_norm_val_power2d1': [],
        'x_norm_val_power2d2': [],
        'x_norm_val_power2d3': [],
        'x_norm_train_power2d0': [],
        'x_norm_train_power2d1': [],
        'x_norm_train_power2d2': [],
        'x_norm_train_power2d3': [],
        'y_train0': [],
        'y_train1': [],
        'y_train2': [],
        'y_train3': [],
        'x_train0':[],
        'x_train1':[],
        'x_train2':[],
        'x_train3':[]
    }
  for f in range(fold):
    np.random.shuffle(neutral_kick_indices)
    np.random.shuffle(neutral_walk_indices)
    np.random.shuffle(incong_kick_indices)
    np.random.shuffle(incong_walk_indices)
    np.random.shuffle(cong_kick_indices)
    np.random.shuffle(cong_walk_indices)
    #Extract training and validation data (time-series), from 100ms - 700ms
    val_indices = np.concatenate([neutral_kick_indices[:7],neutral_walk_indices[:7],incong_kick_indices[:7],incong_walk_indices[:7],cong_kick_indices[:25],cong_walk_indices[:25]])
    train_val_indices = np.concatenate([neutral_kick_indices[7:28],neutral_walk_indices[7:28],incong_kick_indices[7:28],incong_walk_indices[7:28],cong_kick_indices[25:100],cong_walk_indices[25:100]])
    sub_val_data[sub_id][f'x_val{str(f)}'] = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[val_indices]
    sub_val_data[sub_id][f'y_val{str(f)}'] = sub_data[sub_id]['raw_labels'][val_indices]
    sub_val_data[sub_id][f'x_norm_val_power2d{str(f)}'] = convert_2d(normalize(time_freq(sub_val_data[sub_id][f'x_val{str(f)}'])))
    sub_val_data[sub_id][f'x_train{str(f)}'] = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[train_val_indices]
    sub_val_data[sub_id][f'y_train{str(f)}'] = sub_data[sub_id]['raw_labels'][train_val_indices]
    sub_val_data[sub_id][f'x_norm_train_power2d{str(f)}'] = convert_2d(normalize(time_freq(sub_val_data[sub_id][f'x_train{str(f)}'])))

  X_test = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[test_indices]
  y_test = sub_data[sub_id]['raw_labels'][test_indices]
  X_train = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[train_indices]
  y_train = sub_data[sub_id]['raw_labels'][train_indices]
  x_train_power = time_freq(X_train)
  x_test_power = time_freq(X_test)
  sub_data_processed[sub_id] = {
      'x_train_power': time_freq(X_train),
      'x_test_power': time_freq(X_test),
      'x_train_2d': convert_2d_ts(normalize(X_train)),
      'y_train': y_train,
      'x_test_2d': convert_2d_ts(normalize(X_test)),
      'y_test':  y_test,
      'x_norm_train_power2d': convert_2d(normalize(x_train_power)),
      'x_norm_test_power2d': convert_2d(normalize(x_test_power))
  }
  #Saving time-series, time-freq data and labels for test & training
  #sub_data_processed[sub_id]['x_train_power'] = time_freq(X_train)
  #sub_data_processed[sub_id]['x_test_power'] = time_freq(X_test)
  #sub_data_processed[sub_id]['x_train_2d'] = convert_2d_ts(normalize(X_train))
  #sub_data_processed[sub_id]['y_train'] = y_train
  #sub_data_processed[sub_id]['x_test_2d'] = convert_2d_ts(normalize(X_test))
  #sub_data_processed[sub_id]['y_test'] = y_test
  #Normalizing training & test data and reshaping from 4D to 2D
  #sub_data_processed[sub_id]['x_norm_train_power2d'] = convert_2d(normalize(sub_data[sub_id]['x_train_power']))
  #sub_data_processed[sub_id]['x_norm_test_power2d'] = convert_2d(normalize(sub_data[sub_id]['x_test_power']))

In [None]:
with open('/content/drive/My Drive/sub_val_data.pkl', 'wb') as file:
    pickle.dump(sub_val_data, file)

In [None]:
with open('/content/drive/My Drive/sub_data_processed.pkl', 'wb') as file:
    pickle.dump(sub_data_processed, file)

In [None]:
print(frequencies)

[10.  13.  21.5 30. ]


In [None]:
#@title Preparing training and test datasets for each subject
from mne.time_frequency import tfr_array_morlet

n_trials = 35
c_trials = 125
i_trials = 35

for sub_id in s:
  # Discard extra kick/walk trials in neutral ones
  neutral_kick_indices, neutral_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['neutral_labels'] == 0)[0],np.where(sub_data[sub_id]['neutral_labels'] == 1)[0],35)
  #Discard extra kick/walk trials in incongruent ones
  incong_kick_indices, incong_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['incong_labels'] == 0)[0],np.where(sub_data[sub_id]['incong_labels'] == 1)[0],35)
  #Discard extra kick/walk trials in congruent ones
  cong_kick_indices, cong_walk_indices = get_trial_idx(np.where(sub_data[sub_id]['cong_labels'] == 0)[0],np.where(sub_data[sub_id]['cong_labels'] == 1)[0],125)

  #Find the trial indices to split into training and test sets (80%-20%)
  train_indices = []
  test_indices = []
  train_indices = np.concatenate([neutral_kick_indices[:28],neutral_walk_indices[:28],incong_kick_indices[:28],incong_walk_indices[:28],cong_kick_indices[:100],cong_walk_indices[:100]])
  test_indices = np.concatenate([neutral_kick_indices[28:],neutral_walk_indices[28:],incong_kick_indices[28:],incong_walk_indices[28:],cong_kick_indices[100:],cong_walk_indices[100:]])
  np.random.shuffle(train_indices)
  np.random.shuffle(test_indices)

  #Extract training and test data (time-series), from 100ms - 700ms
  X_train = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[train_indices]
  y_train = sub_data[sub_id]['raw_labels'][train_indices]
  X_test = sub_data[sub_id]['raw_epochs'].crop(tmin=0.1, tmax=0.7).get_data(copy=False)[test_indices]
  y_test = sub_data[sub_id]['raw_labels'][test_indices]

  #Saving time-series, time-freq data and labels for test & training
  sub_data[sub_id]['x_train_power'] = time_freq(X_train)
  sub_data[sub_id]['x_test_power'] = time_freq(X_test)
  sub_data[sub_id]['x_train'] = X_train
  sub_data[sub_id]['y_train'] = y_train
  sub_data[sub_id]['x_test'] = X_test
  sub_data[sub_id]['y_test'] = y_test

  #Normalizing training & test data and reshaping from 4D to 2D
  sub_data[sub_id]['x_norm_train2d'] = convert_2d(normalize(sub_data[sub_id]['x_train_power']))
  sub_data[sub_id]['x_norm_test2d'] = convert_2d(normalize(sub_data[sub_id]['x_test_power']))

In [None]:
#@title Prepare dataset for Z2
#s=['23','24','25','26','28','30','31','33','34','35','36','37','41','42']
s=['23','24','25','26','28','30','31','33','34','35','37','41','42']
all = {}
for sub_id in s:
  if sub_id == '23':
    all['train_2d'] = np.concatenate((sub_data[sub_id]['x_norm_train2d'],sub_data[sub_id]['x_norm_test2d']),axis=0)
    all['train_label'] = np.concatenate((sub_data[sub_id]['y_train'],sub_data[sub_id]['y_test']),axis=0)
  else:
    a = np.concatenate((sub_data[sub_id]['x_norm_train2d'],sub_data[sub_id]['x_norm_test2d']),axis=0)
    all['train_2d'] = np.concatenate((all['train_2d'],a),axis=0)
    b = np.concatenate((sub_data[sub_id]['y_train'],sub_data[sub_id]['y_test']),axis=0)
    all['train_label'] = np.concatenate((all['train_label'],b),axis=0)
all['test_2d'] = np.concatenate((sub_data['36']['x_norm_train2d'],sub_data['36']['x_norm_test2d']),axis=0)
all['test_2d'].shape
all['test_label'] = np.concatenate((sub_data['36']['y_train'],sub_data['36']['y_test']),axis=0)
print(all['test_label'].shape)

In [None]:
#@title Prepare dataset for Z3
s=['23','24','25','26','28','30','31','33','34','35','37','41','42']
all_z3 = {}
for sub_id in s:
  if sub_id == '23':
    all_z3['train_2d'] = np.concatenate((convert_2d(sub_data[sub_id]['x_train_power']),convert_2d(sub_data[sub_id]['x_test_power'])),axis=0)
    all_z3['train_label'] = np.concatenate((sub_data[sub_id]['y_train'],sub_data[sub_id]['y_test']),axis=0)
  else:
    a = np.concatenate((convert_2d(sub_data[sub_id]['x_train_power']),convert_2d(sub_data[sub_id]['x_test_power'])),axis=0)
    all_z3['train_2d'] = np.concatenate((all_z3['train_2d'],a),axis=0)
    b = np.concatenate((sub_data[sub_id]['y_train'],sub_data[sub_id]['y_test']),axis=0)
    all_z3['train_label'] = np.concatenate((all_z3['train_label'],b),axis=0)


all_z3['test_2d'] = np.concatenate((convert_2d(sub_data['36']['x_train_power']),convert_2d(sub_data['36']['x_test_power'])),axis=0)
all_z3['test_2d'].shape
all_z3['test_label'] = np.concatenate((sub_data['36']['y_train'],sub_data['36']['y_test']),axis=0)
print(all_z3['test_label'].shape)

(390,)


In [None]:
#@title Accuracy, Precision, Sensitivity, Specificity
import numpy as np

def accuracy_score(y_true, y_pred):
    correct_predictions = np.sum(y_true == y_pred)
    total_predictions = len(y_true)
    accuracy = correct_predictions / total_predictions
    return accuracy

def precision(tp, fp):
    return tp / float(tp + fp)

def sensitivity(tp, fn):
    return tp / float(tp + fn)

def specificity(tn,fn):
    return tn / float(fn + tn)

def acc_report(y_true, y_pred):
    labels = np.unique(np.concatenate((y_true, y_pred)))
    a = accuracy_score(y_true,y_pred)
    for label in labels:
        tp = np.sum((y_true == label) & (y_pred == label))
        fp = np.sum((y_true != label) & (y_pred == label))
        fn = np.sum((y_true == label) & (y_pred != label))
        tn = np.sum((y_true !=label) & (y_pred != label))
        p = precision(tp, fp)
        r = sensitivity(tp, fn)
        spec = specificity(tn,fn)

    return a, p, r, spec

In [None]:
#@title KNN Algorithm
import numpy as np

#Find the euclidean distance between the two vectors
def euclidean_distance(v1, v2):
    return np.sqrt(np.sum((v1 - v2) ** 2))

#Find the manhattan distance between the two vectors
def manhattan_distance(v1,v2):
    #print(np.sum(np.abs(v1-v2)))
    return np.sum(np.abs(v1-v2))
#Algorithm that takes the training dataset, y_train for the training dataset, test dataset, hyperparameter of #neighboring clusters
def knn(train, y_train, test, k):

    distances = []

    # Compute distance from test sample to all training samples
    for i in range(len(y_train)):
        dist = manhattan_distance(test, train[i])
        distances.append((y_train[i], dist))
    #print(len(distances))
    #print(len(y_train))

    # Sort by distance not label (x[0]), and get top k which is the hyperparameter
    k_neighbors = sorted(distances, key=lambda x: x[1])[:k]
    #print(k_neighbors)
    # Extract y_train of the k nearest neighbors
    k_y_train = [label for label, _ in k_neighbors]

    # Majority vote
    prediction = max(set(k_y_train), key=k_y_train.count)
    #print(prediction)
    #print(distances)
    return prediction
def predict(train, y_train, test, k):
    predictions = []

    for test in test:
        pred_class = knn(train, y_train, test, k)
        predictions.append(pred_class)

    return np.array(predictions)
#k = range(1,20,2) # Number of neighbors
acc = {}
a = 0
#s = [22,23,24,25,26,28,30,31,33,34,35,36,37,41,42]
s=['25']
for sub_id in s:
  sub_str = str(sub_id)
  test_predictions = predict(sub_data[sub_str]['x_norm_train2d'], sub_data[sub_str]['y_train'], sub_data[sub_str]['x_norm_test2d'] , k=1)
  # Evaluate accuracy
  accuracy = np.mean(test_predictions == sub_data[sub_str]['y_test'])
  print("%s - Accuracy:"%sub_str,accuracy)
  acc[a] = accuracy
  a+=1
  sub_data[sub_id]['knn_tf'] = acc_report(test_predictions, sub_data[sub_id]['y_test'])



In [None]:
#@title KNN Cross-Validation
def split_dataset(X, y, num_folds):
    fold_size = len(X) // num_folds
    X_folds = [X[i * fold_size:(i + 1) * fold_size] for i in range(num_folds)]
    y_folds = [y[i * fold_size:(i + 1) * fold_size] for i in range(num_folds)]
    return X_folds, y_folds

def cross_validate_knn(X, y, k_values, num_folds=6):
    X_folds, y_folds = split_dataset(X, y, num_folds)
    k_scores = {}

    for k in k_values:
        accuracies = []

        for i in range(num_folds):
            # Prepare training and testing sets
            X_train = np.concatenate([X_folds[j] for j in range(num_folds) if j != i])
            y_train = np.concatenate([y_folds[j] for j in range(num_folds) if j != i])
            X_test = X_folds[i]
            y_test = y_folds[i]

            # Reshape the data for kNN
            #nsamples, nx, ny = X_train.shape
            #X_train_2d = X_train.reshape((nsamples, nx*ny))
            #nsamples, nx, ny = X_test.shape
            #X_test_2d = X_test.reshape((nsamples, nx*ny))

            # Train and predict with kNN
            predictions = predict(X_train, y_train, X_test, k)
            accuracy = np.mean(predictions == y_test)
            accuracies.append(accuracy)
        k_scores[k] = np.mean(accuracies)
        print(k_scores[k])
    return k_scores


# Define the range of k values to test
k_values = range(1,19,2)
k_scores = cross_validate_knn(sub_data[sub_str]['x_norm_train2d'], sub_data[sub_str]['y_train'], k_values)

# Find the best k value
best_k = max(k_scores, key=k_scores.get)
print("Best k:",best_k, "with accuracy:", k_scores[best_k])


In [None]:
#@title Visualizing KNN Validation and Training for different K values
import matplotlib.pyplot as plt
plt.plot(acc.keys(),acc.values())
plt.plot(k_scores.keys(),k_scores.values())

In [None]:
#@title Principal Component Analysis (PCA)
import numpy as np
class PCA:
  def __init__(self,var_exp):
    self.var_exp = var_exp
    self.principal_components = None
    self.mean = None
    self.eigenvalues = None
    self.eigenvectors = None
  def fit(self, X):
    self.mean = np.mean(X, axis=0)
    X = X - self.mean
    cov = np.cov(X.T)
    eigenvalues, eigenvectors = np.linalg.eig(cov)
    eigenvectors = eigenvectors.T
    indices = np.argsort(abs(eigenvalues))[::-1]
    eigenvalues = eigenvalues[indices]
    eigenvectors = eigenvectors[indices]
    tot, pve = 0, 0
    for j in range(len(X[0,:])):
      for i in range(234):
        tot+= X[i][j]**2
    for m in range(234):
      for i in range(234):
        pve+=(np.dot(X[i].T,eigenvectors[m]))**2
      if np.real(pve/tot) > self.var_exp:
        print(m)
        break
    self.principal_components = eigenvectors[0:m]
    self.eigenvalues = eigenvalues[0:m]
    self.eigenvectors = eigenvectors
  def transform(self,X):
    X = X - self.mean
    return np.dot(X,self.principal_components.T)



In [None]:
#@title Dimensionality Reduction with PCA for each subject followed by LDA
s1 = ['23']
for sub_id in s1:
  pca = PCA(0.90)
  pca.fit(sub_data_processed[sub_id]['x_norm_train_power2d'])
  sub_data_processed[sub_id]['train_pca'] = np.real(pca.transform(sub_data_processed[sub_id]['x_norm_train_power2d']))
  sub_data_processed[sub_id]['test_pca'] = np.real(pca.transform(sub_data_processed[sub_id]['x_norm_test_power2d']))
  num_pc = len(pca.principal_components)
  sub_data_processed[sub_id]['num_pc'] = num_pc
#LDA for each subject
for sub_id in s1:
  lda = LDA(2,0)
  lda.fit(sub_data_processed[sub_id]['train_pca'],sub_data_processed[sub_id]['y_train'])
  sub_data_processed[sub_id]['train_lda'] = lda.transform(sub_data_processed[sub_id]['train_pca'])
  sub_data_processed[sub_id]['test_lda'] = lda.transform(sub_data_processed[sub_id]['test_pca'])
  sub_data_processed[sub_id]['conf_matrix'],sub_data_processed[sub_id]['lda_acc'],sub_data_processed[sub_id]['report']= nearest_centroid_classifier(sub_data_processed[sub_id]['train_lda'],sub_data_processed[sub_id]['y_train'],sub_data_processed[sub_id]['y_test'],sub_data_processed[sub_id]['test_lda'])

In [None]:
#@title Linear Support Vector Machine (SVM) Algorithm
#The idea is to have a linear model, such that:
#For c1: w*xi-b >= 1; For c2: w*xi-b <= -1 => For class label yi: yi(w*xi-b) >= 1
#Come up with w (weights) and b (bias) by cost function and gradient descent
#Hinge Loss: l = max(0,1-yi(w*xi-b))
#Maximize the margin between two classes: 2/||w|| --> minimize the magnitude
#Regularization: J = lambda*||w||^2 + 1/n(sum(hinge_loss)) --> from i to n
#Gradients for w: if yi*f(x) >= 1: dJ_i/dw_k = 2*lambda*w_k / else: dJ_i/dw_k = 2*lambda*w_k - yi*xj
#Gradients for b: if yi*f(x) >= 1: dJ_i/db = 0 else: dJ_i/db = yi
#Update rule: w = w - alpha*dw / b = b - alpha*db; where alpha is the learning rate
import numpy as np
class SVM:
  def __init__(self,lr = 0.001,lambda_param=0.01,n_iters=1000):
    self.lr = lr
    self.lambda_param = lambda_param
    self.n_iters = n_iters
    self.w = None
    self.b = None
  def fit(self,X,y):
    #since y = 0,1 we should convert to -1,1
    y_ = np.where(y <= 0, -1, 1)
    n_samples, n_features = X.shape
    self.w = np.zeros(n_features)
    self.b = 0
    for _ in range(self.n_iters):
      for idx, x_i in enumerate(X):
        if y_[idx] * (np.dot(x_i,self.w)-self.b) >= 1:
          self.w -= self.lr*(2*self.lambda_param*self.w)
        else:
          self.w -= self.lr*(2*self.lambda_param*self.w - np.dot(x_i,y_[idx]))
          self.b -= self.lr*y_[idx]
  def predict(self,X):
    linear_output = np.dot(X,self.w) - self.b
    return np.sign(linear_output)

In [None]:
for sub_id in s:
  svm = SVM()
  svm.fit(sub_data_processed_pca[sub_id]['train_pca'],sub_data_processed_pca[sub_id]['y_train'])
  labels = svm.predict(sub_data_processed_pca[sub_id]['test_pca'])
  labels[np.where(labels==-1)] = 0
  sub_data_processed_pca[sub_id]['svm_acc'] = acc_report(labels,sub_data_processed_pca[sub_id]['y_test'])

In [None]:
#For Test-Training
for sub_id in s:
  pca = PCA(0.90)
  pca.fit(sub_data_processed[sub_id]['x_norm_train2d'])
  sub_data_processed[sub_id]['train_pca']=np.real(pca.transform(sub_data_processed[sub_id]['x_norm_train_power2d']))
  sub_data_processed[sub_id]['test_pca']=np.real(pca.transform(sub_data_processed[sub_id]['x_norm_test_power2d']))

In [None]:
for sub_id in s1:
  lda = LDA(2,0)
  lda.fit(sub_data_processed[sub_id]['train_pca'],sub_data_processed[sub_id]['y_train'])
  sub_data_processed[sub_id]['train_lda'] = lda.transform(sub_data_processed[sub_id]['train_pca'])
  sub_data_processed[sub_id]['test_lda'] = lda.transform(sub_data_processed[sub_id]['test_pca'])
  sub_data_processed[sub_id]['conf_matrix'],sub_data_processed[sub_id]['lda_acc'],sub_data_processed[sub_id]['report']= nearest_centroid_classifier(sub_data_processed[sub_id]['train_lda'],sub_data_processed[sub_id]['y_train'],sub_data_processed[sub_id]['y_test'],sub_data_processed[sub_id]['test_lda'])

In [None]:
#@title Linear Discriminant Analysis (LDA) for Each Subject
import numpy as np
class LDA:
  def __init__(self,n_components,shrinkage=0):
    self.n_components = n_components
    self.linear_discriminants = None
    self.shrinkage = shrinkage
  def fit(self, X, y):
    n_features = X.shape[1]
    class_labels = np.unique(y)
    mean_overall = np.mean(X,axis=0)
    print(self.shrinkage)
    S_W = self.shrinkage*((X - mean_overall).T.dot(X - mean_overall))
    S_B = np.zeros((n_features,n_features))
    for c in class_labels:
      X_c = X[y==c]
      mean_c = np.mean(X_c, axis = 0)
      S_W += (1-self.shrinkage)*((X_c - mean_c).T.dot(X_c - mean_c))
      n_c = X_c.shape[0]
      mean_diff = (mean_c - mean_overall).reshape(n_features,1)
      S_B += n_c * (mean_diff).dot(mean_diff.T)
    A = np.linalg.inv(S_W).dot(S_B)
    eigenvalues, eigenvectors = np.linalg.eig(A)
    eigenvectors = eigenvectors.T
    indices = np.argsort(abs(eigenvalues))[::-1]
    eigenvalues = eigenvalues[indices]
    eigenvectors = eigenvectors[indices]
    self.linear_discriminants = eigenvectors[0:self.n_components]
    sub_data[sub_id]['lda_weight'] = self.linear_discriminants[0]
  def transform(self,X):
    return np.dot(X,self.linear_discriminants.T)
#x1 = sub_data[sub_id]['train_lda'][:,0]
#x2 = sub_data[sub_id]['train_lda'][:,1]
#plt.scatter(x1,x2)


In [None]:
#@title LDA Accuracy Check - Centroid - Each Subject
def nearest_centroid_classifier(X_train, y_train, y_test, X_test):
    """
    Nearest Centroid Classifier
    :param X_train: numpy.ndarray, Training data projected into LDA space
    :param y_train: numpy.ndarray, Training labels
    :param X_test: numpy.ndarray, Test data projected into LDA space
    :return: Predicted labels for the test data
    """
    class_labels = np.unique(y_train)
    centroids = np.array([np.mean(X_train[y_train == cl], axis=0) for cl in class_labels])

    # Find the nearest centroid for each test sample
    distances = np.sqrt(((X_test - centroids[:, np.newaxis])**2).sum(axis=2))
    nearest = np.argmin(distances, axis=0)

    predictions = class_labels[nearest]
    lda_accuracy = accuracy_score(y_test, predictions)
    print(sub_id,f"LDA Accuracy: {lda_accuracy}")
    accuracy = acc_report(y_test, predictions)
    #print("Accuracy:", accuracy)
    print("Confusion Matrix:\n", confusion_mat)
    print("Classification Report:\n", classification_rep)
    return accuracy

In [None]:
#@title LDA Accuracy Check - Probability Based
def lda_predict_proba(X,w):
    # X is the input data (features) for which you want to predict probabilities
    # w is the weight vector obtained from LDA
    # w0 is the intercept term obtained from LDA

    # Calculate the discriminant scores for each class
    #class_means = [np.mean(np.dot(X[y == 0],w)) for c in np.unique(y)]
    #direction = 1 if class_means[1] > class_means[0] else -1
    scores = np.dot(X, w)
    #print(scores)
    # Calculate class probabilities using the sigmoid function
    #if direction == -1:
      #scores=-scores
    probabilities = np.exp(scores) / (1 + np.exp(scores))
    return probabilities

def lda_predict(X,w,threshold=0.5):
    # X is the input data (features) for which you want to make predictions
    # w is the weight vector obtained from LDA
    # w0 is the intercept term obtained from LDA
    # threshold is the decision threshold for converting probabilities to class labels
    # Get class probabilities
    probabilities = lda_predict_proba(X,w)

    # Make predictions based on the decision threshold
    predictions = (probabilities > threshold).astype(int)
    from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
    confusion_mat = confusion_matrix(sub_data[sub_id]['y_test'], predictions)
    print(sub_id,accuracy_score(sub_data[sub_id]['y_test'], predictions))
    print(confusion_mat)
    return predictions
for sub_id in s1:
  predictions = lda_predict(sub_data[sub_id]['test_pca'],sub_data[sub_id]['lda_weight'])

