In [31]:
# Importing libraries
import numpy as np
import os
import scipy.io as sio
from matplotlib  import pyplot as plt
import pyntbci
import os
import yaml
import pickle

In [32]:
# defining relevant functions
def accuracy_across_folds(X: np.ndarray, y: np.ndarray, codes: np.ndarray, fs: int, transient_size:float, trial_time:int, n_folds:int=4, plot:bool = False):
    """
    Computes classification accuracy for n = n_folds

    Args:
        X (np.ndarray): EEG data (trials x channels x samples)
        y (np.ndarray): labels of trials
        codes (np.ndarray): codes used in the experiment
        fs (int): downsampling frequency
        transient_size (float): duration of the transient response in seconds
        trial_time (int): duration for which codes were flashing on the screen
        n_folds (int, optional): number of folds for cross-validation. Defaults to 4.
        plot (bool, optional): plot the accuracy per fold. Defaults to False.

    Returns:
        np.array: row vector containing accuracies of n_folds
    """
 
    n_samples = int(trial_time * fs)
    n_trials = X.shape[0]
    n_classes = codes.shape[0]
    
    # Chronological cross-validation
    folds = np.repeat(np.arange(n_folds), int(n_trials / n_folds))
    
    # Loop folds over different transient sizes
    accuracy = np.zeros(n_folds)

    for i_fold in range(n_folds):
        
        # Split data to train and test set
        X_trn, y_trn = X[folds != i_fold, :, :n_samples], y[folds != i_fold]
        X_tst, y_tst = X[folds == i_fold, :, :n_samples], y[folds == i_fold]

        # Train template-matching classifier
        rcca = pyntbci.classifiers.rCCA(codes= codes, fs = fs, event = "duration", transient_size = transient_size, onset_event = True)
        rcca.fit(X_trn, y_trn)

        # Apply template-matching classifier
        yh_tst = rcca.predict(X_tst)

        # Compute accuracy
        accuracy[i_fold] = np.mean(yh_tst == y_tst)

    if plot:
        plt.figure(figsize=(15, 3))
        plt.bar(np.arange(n_folds), accuracy)
        plt.axhline(accuracy.mean(), linestyle='--', alpha=0.5, label="average")
        plt.axhline(1 / n_classes, color="k", linestyle="--", alpha=0.5, label="chance")
        plt.xlabel("(test) fold")
        plt.ylabel("accuracy")
        plt.legend()
        plt.title("Chronological cross-validation")
        plt.tight_layout()


    return accuracy

In [33]:
project_path = r'C:\Users\s1081686\Desktop\RA_Project\Scripts\pynt_codes\version_2'

# load relevant parameters
with open(os.path.join(project_path,'config.yml'), "r") as yaml_file:
    config_data = yaml.safe_load(yaml_file)
    
analysis_params = config_data['analysis_params']
experimental_params = config_data['experimental_params']

n_trials = experimental_params['N_TRIALS']
trial_time = experimental_params['TRIAL_TIME']

n_channels = 63 #analysis_params['N_CHANNELS']
fs = analysis_params['Fs'] # downsampled frequency
n_subjects = analysis_params['N_SUBJECTS']
n_runs_ov = analysis_params['N_RUNS_OVERT'] # number of runs in the overt condition
n_runs_cov = analysis_params['N_RUNS_COVERT']

# loading paths
data_path = analysis_params['DATA_PATH']

# preprocessed eeg data filenames of all subjects
subjects_overt = analysis_params['subjects_overt']
subjects_covert = analysis_params['subjects_covert']

# path to store results
results_path = os.path.join(project_path,'analysis_version2','result_variables')

print(n_subjects)

6


In [34]:
# load preprocessed data, labels and codes for covert and overt conditions

# data
X_overt = np.zeros((n_subjects, n_runs_ov* n_trials, n_channels, trial_time*fs)) # n_subjects x (n_runs*trials x channels x samples)
X_covert = np.zeros((n_subjects, n_runs_cov* n_trials, n_channels, trial_time*fs)) # n_subjects x (n_runs*trials x channels x samples)

# labels
y_overt = np.zeros((n_subjects, n_runs_ov* n_trials))
y_covert = np.zeros((n_subjects, n_runs_cov* n_trials))

for i_subject, (sub_cov, sub_ov) in enumerate(zip(subjects_covert, subjects_overt)):
    
    print(f"loading data for subject {i_subject+1}")
    sub_num = sub_cov.split('_')[0]
    
    # file names
    fn_overt = os.path.join(data_path, sub_num, sub_ov)
    fn_covert = os.path.join(data_path, sub_num, sub_cov)
    
    
    #loading overt data
    tmp_overt = np.load(fn_overt)
    X_overt[i_subject] = tmp_overt["X"] # eeg data
    y_overt[i_subject] = tmp_overt["y"] # trial labels
    V = tmp_overt['V'] # codes: note, these codes will be now used throughout the notebook!
    
    #loading covert data
    tmp_covert = np.load(fn_covert)
    X_covert[i_subject] = tmp_covert["X"] # eeg data
    y_covert[i_subject] = tmp_covert["y"] # trial labels
    
print("Loading data finished")
print(f"shape of overt data for a praticipant {X_overt[i_subject].shape}, shape of labels {y_overt[i_subject].shape}")
print(f"shape of covert data for a praticipant {X_covert[i_subject].shape}, shape of labels {y_covert[i_subject].shape}")

loading data for subject 1
loading data for subject 2
loading data for subject 3
loading data for subject 4
loading data for subject 5
loading data for subject 6
Loading data finished
shape of overt data for a praticipant (20, 63, 2400), shape of labels (20,)
shape of covert data for a praticipant (80, 63, 2400), shape of labels (80,)


In [35]:
print("overt data shape", X_overt.shape)
print("covert data shape", X_covert.shape)
print("codes shape", V.shape)



overt data shape (6, 20, 63, 2400)
covert data shape (6, 80, 63, 2400)
codes shape (2, 252)


In [36]:
#1. Compare classification accuracies across modeled transient response lengths
transient_size_vec = np.arange(0.1, 1, 0.1) # varying transient response lengths from 0.1s to .9s
n_transient_sizes = len(transient_size_vec)

# initializing variables for collecting accuracy
n_folds = 4 # 4 fold cross validation
accuracy_across_ts_covert = np.zeros((n_subjects, n_transient_sizes, n_folds)) 
accuracy_across_ts_overt = np.zeros((n_subjects, n_transient_sizes, n_folds))

for i_subject in range(n_subjects):
    
    # overt data and labels
    X_ov = X_overt[i_subject]
    y_ov = y_overt[i_subject]
    
    # covert data and labels
    X_cov = X_covert[i_subject]
    y_cov = y_covert[i_subject]
    
    for i_ts in range(transient_size_vec.shape[0]):
        transient_size = transient_size_vec[i_ts]
        accuracy_across_ts_overt[i_subject, i_ts, :] = accuracy_across_folds(X = X_ov, y = y_ov, codes = V, fs = fs, trial_time= trial_time, transient_size= transient_size)        
        accuracy_across_ts_covert[i_subject, i_ts, :] = accuracy_across_folds(X = X_cov, y = y_cov, codes = V, fs = fs, trial_time= trial_time, transient_size= transient_size)
        
    print(f"finished computing accuracy results for subject{i_subject + 1}")
    
# # saving files
accuracy_across_ts = {'overt_condition': accuracy_across_ts_overt, 'covert_condition': accuracy_across_ts_covert}
fname = os.path.join(results_path, 'accuracy_across_transient_responses.pickle')
with open(fname, 'wb') as handle:
    pickle.dump(accuracy_across_ts, handle, protocol=pickle.HIGHEST_PROTOCOL)
print('files stored successfully')  

finished computing accuracy results for subject1
finished computing accuracy results for subject2
finished computing accuracy results for subject3
finished computing accuracy results for subject4
finished computing accuracy results for subject5
finished computing accuracy results for subject6
files stored successfully


In [37]:
#2. Finding classification accuracies at the optimum transient response length
mean_accuracy_across_folds_overt = accuracy_across_ts_overt.mean(axis = 2) # averaging in the fold dimension
mean_accuracy_across_folds_covert = accuracy_across_ts_covert.mean(axis = 2)    

# range of accuracies
range_acc_covert = [np.min(mean_accuracy_across_folds_covert), np.max(mean_accuracy_across_folds_covert)]
range_acc_overt = [np.min(mean_accuracy_across_folds_overt[:]), np.max(mean_accuracy_across_folds_overt[:])]
acc_overt_across_subjects = mean_accuracy_across_folds_overt

print(f"overt condition: mean accuracy of all subjects for all transient response lengths {acc_overt_across_subjects}")
print(f"covert condition: min and max mean accuracy observed in subjects across all transient response lengths {range_acc_covert}")

# identifying the optimum transient response length and finding corresponding accuracies
optimum_resp_len = transient_size_vec[np.argmax(mean_accuracy_across_folds_covert.mean(axis = 0))]
mean_acc_opt_resp_len_overt = mean_accuracy_across_folds_overt[:, transient_size_vec == optimum_resp_len]
mean_acc_opt_resp_len_covert = mean_accuracy_across_folds_covert[:, transient_size_vec == optimum_resp_len]

# # saving files
mean_accuracy_optimum_resp_len ={'overt_condition': mean_acc_opt_resp_len_overt, 'covert_condition': mean_acc_opt_resp_len_covert}
fname = os.path.join(results_path, 'mean_accuracy_at_optimum_response_length.pickle')
with open(fname, 'wb') as handle:
    pickle.dump(mean_accuracy_optimum_resp_len, handle, protocol=pickle.HIGHEST_PROTOCOL)
print("files saved successfully")


overt condition: mean accuracy of all subjects for all transient response lengths [[1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1.]]
covert condition: min and max mean accuracy observed in subjects across all transient response lengths [0.4625, 0.725]
files saved successfully


In [38]:
#3. computing spatial activity and transient response curves at the optimum response length
num_events = analysis_params['NUM_EVENTS'] # number of events (short, long and onset)
len_transient_response = int(optimum_resp_len * fs * num_events)

# transient response curve
r_overt = np.zeros((len(subjects_covert),len_transient_response)) 
r_covert = np.zeros((len(subjects_covert),len_transient_response))

# activity pattern
activity_pat_overt = np.zeros((len(subjects_covert), n_channels))
activity_pat_covert= np.zeros((len(subjects_covert), n_channels))

for i_subject in range(n_subjects):
    
    print(f'calculating results for subject {i_subject+1}')
    
    # overt data and labels
    X_ov = X_overt[i_subject]
    y_ov = y_overt[i_subject]
    
    # covert data and labels
    X_cov = X_covert[i_subject]
    y_cov = y_covert[i_subject]
    
    # computing spatial filters and transient responses: overt
    rcca_ov = pyntbci.classifiers.rCCA(codes=V, fs=fs, event="duration", transient_size=optimum_resp_len, onset_event=True)
    rcca_ov.fit(X = X_ov, y = y_ov)
    w_ov, r_overt[i_subject] = rcca_ov.w_.flatten(), rcca_ov.r_.flatten()
    
    # computing spatial filters and transient responses: covert
    rcca_cov = pyntbci.classifiers.rCCA(codes=V, fs=fs, event="duration", transient_size=optimum_resp_len, onset_event=True)
    rcca_cov.fit(X = X_cov, y = y_cov)
    w_cov, r_covert[i_subject] = rcca_cov.w_.flatten(), rcca_cov.r_.flatten()
    
    # compute covariance matrices activity pattern np.dot(w.T,np.cov(X))
    X_ov = np.transpose(X_ov, axes = [1,0,2] )
    X_cov = np.transpose(X_cov, axes = [1,0,2])

    # covariance matrices
    X_ov_covar = np.cov(X_ov.reshape(X_ov.shape[0],X_ov.shape[1]*X_ov.shape[2])) # channels x (trials x samples)
    X_cov_covar = np.cov(X_cov.reshape(X_cov.shape[0],X_cov.shape[1]*X_cov.shape[2]))# channels x (trials x samples)

    # compute activity pattern np.dot(w.T,np.cov(X))
    activity_pat_overt[i_subject] = np.dot(w_ov.T,X_ov_covar)
    activity_pat_covert[i_subject] = np.dot(w_cov.T,X_cov_covar)
    
    print(r_overt.shape)

    # # saving files
    spatial_activity_and_tr_responses = {'activity_pattern_overt':activity_pat_overt, 
                                         'transient_response_overt':r_overt, 
                                         'activity_pattern_covert': activity_pat_covert, 
                                         'transient_response_covert':r_covert,
                                         'transient_size':optimum_resp_len}
    
    fname = os.path.join(results_path,f'spatial_activity_and_transient_responses_S{i_subject}.pickle')
    with open(fname, 'wb') as handle:
        pickle.dump(spatial_activity_and_tr_responses, handle, protocol=pickle.HIGHEST_PROTOCOL)
    print("files saved successfully")


calculating results for subject 1
(6, 216)
files saved successfully
calculating results for subject 2
(6, 216)
files saved successfully
calculating results for subject 3
(6, 216)
files saved successfully
calculating results for subject 4
(6, 216)
files saved successfully
calculating results for subject 5
(6, 216)
files saved successfully
calculating results for subject 6
(6, 216)
files saved successfully


In [39]:
# 4. permutation testing (run time: approx 6 hours)

# initializing parameters and constants
transient_size = optimum_resp_len # optimum transient response duration
num_iter = 10 # number of iterations in the permutation test

# vectors to store accuracy results
perm_test_acc_ov = np.zeros((num_iter))
perm_test_acc_cov = np.zeros((num_iter))

# important variables to save
covert_dict_permutation_testing = {}
overt_dict_permutation_testing = {}

for i_subject in range(n_subjects):

    print(f'calculating results for subject {i_subject+1}')
    
    # overt data and labels
    X_ov = X_overt[i_subject]
    y_ov = y_overt[i_subject]
    
    # covert data and labels
    X_cov = X_covert[i_subject]
    y_cov = y_covert[i_subject]
    
    # observed accuracy (averaged across folds)
    observed_acc_ov = accuracy_across_folds(X = X_ov, y = y_ov, codes = V, fs = fs, trial_time= trial_time, transient_size= transient_size).mean()    
    observed_acc_cov = accuracy_across_folds(X = X_cov, y = y_cov, codes = V, fs = fs, trial_time= trial_time, transient_size= transient_size).mean()
       
    for iter in range(num_iter):
        
        perm_test_acc_ov[iter] = accuracy_across_folds(X = X_ov, y = np.random.permutation(y_ov), codes = V, fs =fs, transient_size = transient_size, trial_time= 20, n_folds=4).mean()
        perm_test_acc_cov[iter] = accuracy_across_folds(X = X_cov, y = np.random.permutation(y_cov), codes = V, fs = fs, transient_size = transient_size, trial_time= 20, n_folds=4).mean() 

    # p values
    pvalue_ov = np.mean(perm_test_acc_ov>= observed_acc_ov)
    pvalue_cov = np.mean(perm_test_acc_cov >= observed_acc_cov)
    
    
    #  collecting info        
    overt_dict_permutation_testing[f"S{i_subject}_rand_acc_vec"] = perm_test_acc_ov
    overt_dict_permutation_testing[f"S{i_subject}_observed_acc"] = observed_acc_ov
    overt_dict_permutation_testing[f"S{i_subject}_pvalue"] = pvalue_ov
    
    covert_dict_permutation_testing[f"S{i_subject}_rand_acc_vec"] = perm_test_acc_cov
    covert_dict_permutation_testing[f"S{i_subject}_observed_acc"] = observed_acc_cov
    covert_dict_permutation_testing[f"S{i_subject}_pvalue"] = pvalue_cov

    
# saving files
with open(os.path.join(results_path,"overt_permutation_test_var.pkl"), 'wb') as pickle_file:
    pickle.dump(overt_dict_permutation_testing, pickle_file)

with open(os.path.join(results_path,"covert_permutation_test_var.pkl"), 'wb') as pickle_file:
    pickle.dump(overt_dict_permutation_testing, pickle_file)
    
print("files saved successfully")

calculating results for subject 1
calculating results for subject 2
calculating results for subject 3
calculating results for subject 4
calculating results for subject 5


KeyboardInterrupt: 

: 