In [None]:
# Activate virtual envoriment:

import os
path='/work/807122' # Remember to change kernel to virt_env!!!
os.chdir(path)
!./activate.sh

In [None]:
### Check that im are in the right environment

import sys
print(sys.executable)
import nilearn

In [4]:
# importing modules
import os
import pip
import numpy as np
import mne
import pickle
import itertools

import matplotlib.pyplot as plt
import importlib
importlib.reload(plt)

import pylab, seaborn as sns
from scipy.stats import ttest_rel, sem

%matplotlib inline 

from sklearn.model_selection import ShuffleSplit, train_test_split, StratifiedKFold, GridSearchCV, cross_val_score, cross_val_predict
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import confusion_matrix, classification_report, make_scorer, accuracy_score, f1_score, precision_recall_fscore_support
from sklearn.pipeline import make_pipeline
from mne.decoding import (SlidingEstimator, GeneralizingEstimator, Scaler,
                          cross_val_multiscore, LinearModel, get_coef,Vectorizer, CSP)
from sklearn.utils import shuffle
from sklearn import svm
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression

from collections import Counter

import imblearn
from imblearn.over_sampling import RandomOverSampler

from scipy.stats import binom_test

from math import floor

# 1. Load data

In [None]:
# Load data in with a hierarchical structure: each participants and respectively, their three sessions

#data_dir = '/work/823001/ds003626-download/derivatives/'
data_dir = '/work/data/ds003626-download/derivatives/'

# Empty folder 
all_data = []

# Listing all folders in data_dir that start with "sub-"
participants = [folder for folder in os.listdir(data_dir) if folder.startswith('sub-')]

# Define a list of modality labels (i.e., trial class)
modality_label = ['Arriba', 'Abajo', 'Derecha', 'Izquierda']

# Create lists, that will be filled up with all participants' data
participants_pronounced = []
participants_inner = []
participants_visualized = []

# Loop through each participant's data
for participant in participants:
    participant_dir = os.path.join(data_dir, participant)
    
    # Create empty lists to store participant and session data (these resets for each participants)
    session_pronounced = []
    session_inner = []
    session_visualized = []
    
    # Loop through all sessions for each participant in part_dir
    for session_num in range(1, 4):  #session "ses-01", "ses-02", and "ses-03"
        session_dir = os.path.join(participant_dir, f'ses-0{session_num}')
        
        # [] = list, {} = dictonary 
        event_dat = []
        event_data = {}
        event_data_session = {}
        
        # Create dictionaries for the three conditions
        pronounced_con = {} # column 2 == 0
        inner_con = {} # column 2 == 1
        visualized_con = {} # column 2 == 2
        
        # .dat file path
        dat_file_path = os.path.join(session_dir, f'{participant}_ses-0{session_num}_events.dat')
    
       # if os.path.exists(dat_file_path):
        ## i have commented this line of code out, as I want to recieve an error if a event.dat file is missing
        with open(dat_file_path, "rb") as file: # "rb" = read in binary format
            event_dat = pickle.load(file)
       
        # loop over the elements of the modality_label list (match labels and events.dat)
        # filter the event_dat based on the values of column 2 (class/modality)
        for l, label in enumerate(modality_label):
            event_data[label] = event_dat[event_dat[:,1]==l,:] 
        
        # now event_data contains 4 arrays (one for each of the modalities)
 
        # EEG .fif file path
        eeg_file_path = os.path.join(session_dir, f'{participant}_ses-0{session_num}_eeg-epo.fif')
        
        #if os.path.exists(eeg_file_path):
        ## same goes for the eeg.fif files
        epochs = mne.read_epochs(eeg_file_path, preload=True)
        
        # Check for epochs .fif and events .dat file correspondence (they will be matched on timestamps)
        # Correspondence = 0
        if not sum(epochs.events[:,0]-event_dat[:,0])==0:
            print("MISMATCH IN EPOCHS/DAT FILE TIMESTAMPS! {}".format(session_dir))
            break # Naturally, we want to stop the loop, if an error / mismatch occurs 
        
        # Specify channel location (EEG data acquired used BioSemi128)
        montage = mne.channels.make_standard_montage('biosemi128')
        epochs.set_montage(montage, verbose=False)
        
        # Loop over each condition (0, 1, 2) in modaility_label in data-file
        for label in modality_label:
            pronounced_con[label] = epochs[label][event_data[label][:, 2] == 0]
            inner_con[label] = epochs[label][event_data[label][:, 2] == 1]
            visualized_con[label] = epochs[label][event_data[label][:, 2] == 2]
        
            # Same sanity-check for the conditions and modalities
            if not (sum(pronounced_con[label].events[:,0]-event_data[label][event_data[label][:, 2] == 0, 0])==0 or 
                    sum(inner_con[label].events[:,0]-event_data[label][event_data[label][:, 2] == 1, 0])==0 or
                    sum(visualized_con[label].events[:,0]-event_data[label][event_data[label][:, 2] == 2, 0])==0):
                print("MISMATCH IN LABEL EPOCHS / LABEL DAT FILE TIMESTAMPS! {}: {}".format(session_dir, label))
                break
        
        session_pronounced.append(pronounced_con)
        session_inner.append(inner_con)
        session_visualized.append(visualized_con)
     
    # Empty dictionaries to store concatenated data for each condition
    concatenated_pronounced = {}
    concatenated_inner = {}
    concatenated_visualized = {}
    
    # Loop through the modality labels
    for label in modality_label:
        # Concatenate the sessions for the pronounced con
        pronounced_sessions = [sessions[label] for sessions in session_pronounced]
        concatenated_pronounced[label] = mne.concatenate_epochs(pronounced_sessions)

        # Concatenate the sessions for the inner con
        inner_sessions = [sessions[label] for sessions in session_inner]
        concatenated_inner[label] = mne.concatenate_epochs(inner_sessions)

        # Concatenate the sessions for the visualized 
        visualized_sessions = [sessions[label] for sessions in session_visualized]
        concatenated_visualized[label] = mne.concatenate_epochs(visualized_sessions)
    
    # Combine back into an epochs object with the four directions as conditions
    # ... now I will be able to analyze all 4 directions/labels over all 3 sessions for inner speech in the same object
    conc_pronounced = mne.concatenate_epochs([concatenated_pronounced[label] for label in modality_label])
    conc_inner = mne.concatenate_epochs([concatenated_inner[label] for label in modality_label])
    conc_visualized = mne.concatenate_epochs([concatenated_visualized[label] for label in modality_label])
    
    # Print length of participant_data to check if it's empty
    print(f"Participant {participant}: Number of sessions with EEG data: {len(concatenated_pronounced)}")
    
    # Append the EEG data to the participant_data list
    participants_pronounced.append(conc_pronounced)  
    participants_inner.append(conc_inner)  
    participants_visualized.append(conc_visualized)  
        
    # Append the epochs to the all_data list
    #all_data.append(epochs)
            
# Print length of all_data to check if it's empty 
print(f"Total number of participants with EEG data: {len(participants_pronounced)}")


### 1.2 Filtering

In [None]:
low_pass = 40  # Lets all frequencies lower than 40 Hz pass through

# Loop through all participants' conditions and apply low-pass filter of 40 Hz
for i in range(0, 10):  # Beacuse I have 10 participants        
    participants_pronounced[i] = participants_pronounced[i].filter(None, low_pass)

for i in range(0, 10):      
    participants_inner[i] = participants_inner[i].filter(None, low_pass)

for i in range(0, 10):      
    participants_visualized[i] = participants_visualized[i].filter(None, low_pass)

### 1.3 Reject criteria + bad channels + Interpolate bad channels

In [7]:
# The following channels will be removed, as they caused too many epochs to be rejected:

participants_pronounced[1].info['bads'] = ['D24', 'D18', 'B4', 'B2', 'D9', 'D10', 'A5','D14', 'A30', 'D20', 'B24'] 
participants_inner[1].info['bads'] = ['D24', 'D18', 'B4', 'B2', 'D9', 'D10', 'A5', 'D14', 'A30', 'D20', 'B24'] 
participants_visualized[1].info['bads'] = ['D24', 'D18', 'B4', 'B2', 'D9', 'D10', 'A5'] 

participants_pronounced[2].info['bads'] = ['A25', 'A27']
participants_inner[2].info['bads'] = ['A25', 'A27']
participants_visualized[2].info['bads'] = ['A25', 'A27']

participants_pronounced[4].info['bads'] =['B23', 'B24']
participants_inner[4].info['bads'] =['B23', 'B24']
participants_visualized[4].info['bads'] =['B26']

participants_pronounced[5].info['bads'] =['D6', 'D8', 'D9', 'D10']
participants_inner[5].info['bads'] =['D6', 'D8', 'D9', 'D10']
participants_visualized[5].info['bads'] =['D6', 'D8', 'D9', 'D10']

participants_pronounced[9].info['bads'] = ['D1', 'C1', 'B20', 'A1', 'A2', 'A3', 'B1', 'C2']
participants_inner[9].info['bads'] = ['D1', 'C1', 'B20', 'A1', 'A2', 'A3', 'B1', 'C2']

In [None]:
# Interpolate bad channels
# Before doing classification, it is important, that my data maintain identical dimensionalities for all subjects
# As the same channels are not bad for all subjects, I will reconstruct bad channels by interpolating its signal
# based on the signals of the good sensors around them

for sub in participants_pronounced:
    sub.copy().interpolate_bads(reset_bads=False)

for sub in participants_inner:
    sub.copy().interpolate_bads(reset_bads=False)

for sub in participants_visualized:
    sub.copy().interpolate_bads(reset_bads=False)

In [None]:
# Rejecting EEG epochs above 200 microvolts.
reject_criteria = dict(eeg=200e-6)

# Loop through the list of EpochsArray objects for each participant in each condtion
for sub in participants_pronounced:
    sub.drop_bad(reject=reject_criteria)
    
for sub in participants_inner:
    sub.drop_bad(reject=reject_criteria)

for sub in participants_visualized:
    sub.drop_bad(reject=reject_criteria)

### 1.5 Time window for epochs

In [10]:
# Pronounced vs inner speech:
## I only want to classify signal from the action interval (where the participant had to imagine or pronounce speech),
## I have to define the temporal epoch, which I will use as input for the classifier.

action_participants_pronounced = []
action_participants_inner = []
action_participants_visualized = []

action_tmin = 1
action_tmax = 3.5

for sub in participants_pronounced:
    time_windowed_data = sub.copy().crop(tmin=action_tmin, tmax=action_tmax)
    action_participants_pronounced.append(time_windowed_data)

for sub in participants_inner:
    time_windowed_data = sub.copy().crop(tmin=action_tmin, tmax=action_tmax)
    action_participants_inner.append(time_windowed_data)

for sub in participants_visualized:
    time_windowed_data = sub.copy().crop(tmin=action_tmin, tmax=action_tmax)
    action_participants_visualized.append(time_windowed_data)

In [None]:
############ For the rest vs inner condition ############
## as the timestamps dont match, I apply numpy.concatenate directly on my numpy arrays ##
# No need to apply RandomOverSampler as this data is balanced

action_participants_inner_2 = []
rest_participants_inner = []

rest_tmin = 3.5
rest_tmax = 4

action_tmin_2 = 1
action_tmax_2 = 1.5

for sub in participants_inner:
    time_windowed_data = sub.copy().crop(tmin=action_tmin_2, tmax=action_tmax_2)
    action_participants_inner_2.append(time_windowed_data.get_data())

for sub in participants_inner:
    rest_time_window_data = sub.copy().crop(tmin=rest_tmin, tmax=rest_tmax)
    rest_participants_inner.append(rest_time_window_data.get_data())

ri_balanced_X = []
ri_balanced_y = []
    
for participant_idx in range(len(participants_inner)):
    X_rest = rest_participants_inner[participant_idx]
    X_action = action_participants_inner_2[participant_idx]
    
    y_rest = np.zeros(X_rest.shape[0])
    y_action = np.ones(X_action.shape[0])
    
    # Concatenate numpy arrays directly
    X_participant = np.concatenate([X_rest, X_action], axis=0)
    y_participant = np.concatenate([y_rest, y_action], axis=0)

    class_balanced = Counter(y_participant)
    print(f'Participant {participant_idx}: {class_balanced}')
    
    # Reshape the resampled data back to 3D format
    #X_3D = X_over.reshape(-1, 128, 641) # (trials, channels, timepoints)
    
    ri_balanced_X.append(X_participant)
    ri_balanced_y.append(y_participant)
    
ri_X_overall = np.vstack(ri_balanced_X) # type = numpy.ndarray
ri_y_overall = np.hstack(ri_balanced_y)

# Model testing

In [None]:
balanced_X_sub = []
balanced_y_sub = []

X_sub = mne.concatenate_epochs([action_participants_pronounced[0], action_participants_inner[0]]).get_data() # (309, 128, 641)
X_sub = X_sub.reshape(-1, 128 * 641) # (309, 82048)
y_sub = np.concatenate([np.zeros(len(action_participants_pronounced[0])), np.ones(len(action_participants_inner[0]))]) # (309,)

class_before = Counter(y_sub)
print(f'Participant 0: {class_before}') # Print y before applying the random oversampler

# Apply RandomOverSampler for the current participant
oversample = RandomOverSampler(sampling_strategy='minority')
X_sub_over, y_sub_over = oversample.fit_resample(X_sub, y_sub) 

class_after = Counter(y_sub_over)
print(f'Participant 0: {class_after}') # Counter({0.0: 204, 1.0: 204})

X_sub_over = X_sub_over.reshape((-1, 128, 641))

balanced_X_sub.append(X_sub_over) # type = list
balanced_y_sub.append(y_sub_over)

X_participant = balanced_X_sub[0] # type = numpy.ndarray
y_participant = balanced_y_sub[0] 

#### Logistic Regression

In [None]:
cv = StratifiedShuffleSplit(n_splits=5,test_size=0.2,random_state=124) #random_data = set.seed() (to ensure reproducibility)

# create a classifier with a scaling function and l2 penalty
clf = make_pipeline(RobustScaler(), 
                    LinearModel(LogisticRegression(penalty='l2',solver='saga',max_iter=1000)))

time_decod = SlidingEstimator(clf, scoring='roc_auc')

scores = cross_val_multiscore(time_decod, X_participant, y_participant, cv=cv)

scores_mean = np.mean(scores, axis=0)

best_timepoint = np.argmax(scores_mean)
print("The most informative timepoint is:",best_timepoint)

time_decod.fit(X_participant, y_participant)

coef = get_coef(time_decod, 'patterns_',inverse_transform=True)

preds = time_decod.predict_proba(X_participant)[:,best_timepoint, :]

y_preds = [np.argmax(trial) for trial in preds]
print(y_preds[:10])

conf_matrix = confusion_matrix(y_participant, y_preds, labels = [0,1])
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=['pronounced speech', 'inner speech']) 
disp.plot()
plt.show()

clf_report = classification_report(y_participant, y_preds, target_names = ['pronounced speech', 'inner speech']) 
print(clf_report)

accuracy = accuracy_score(y_participant, y_preds)
print(f"Accuracy: {accuracy}")
loss = log_loss(y_participant, preds)
print(f"Loss: {loss}")

In [None]:
significance_test = binomtest(k =191+197, n= 476,p=0.5)
significance_test

In [None]:
num_permutations = 1000
permutation_scores = []
num_classes = 2

for _ in range(num_permutations):
    shuffled_labels = shuffle(y_preds) # or all_predicted_labels?
    accuracy = accuracy_score(y_preds, shuffled_labels)
    permutation_scores.append(accuracy)

p_value = (np.sum(np.array(permutation_scores) >= accuracy) + 1) / (num_permutations + 1)
print(p_value)

plt.hist(permutation_scores, 20, label='Permutation scores', edgecolor='black')
ylim = plt.ylim()
plt.plot(2 * [accuracy], ylim, '--g', linewidth=3, label=f'Actual Accuracy (p-value {p_value:.4f})')
plt.plot(2 * [1. / num_classes], ylim, '--k', linewidth=3, label='Chance level')
plt.ylim(ylim)
plt.legend()
plt.xlabel('Accuracy')
plt.title('Permutation Test for Accuracy')
plt.show()

#### SVM 

In [None]:
print(X_participant.shape)
print(y_participant)

In [None]:
classifier_svm = make_pipeline(Vectorizer(), RobustScaler(), SVC(kernel='rbf', C=1))
cv_score_svm = cross_val_score(classifier_svm, X_participant, y_participant, cv=5)

# Print mean prediction score
print('Mean prediction score for SVM:', np.mean(cv_score_svm))

# Classification report for SVM
y_pred_svm = cross_val_predict(classifier_svm, X_participant, y_participant, cv=5)
print('Classification Report for SVM:')
print(classification_report(y_participant, y_pred_svm))

# Confusion matrix for SVM
conf_matrix_svm = confusion_matrix(y_participant, y_pred_svm)
print('Confusion Matrix for SVM:')
print(conf_matrix_svm)

In [None]:
significance_test = binomtest(k =188+201, n= 448,p=0.5)
significance_test

In [None]:
num_permutations = 1000
permutation_scores = []
num_classes = np.unique(y_sub_over).size

for _ in range(num_permutations):
    shuffled_labels = shuffle(y_pred_svm) # or all_predicted_labels?
    accuracy = accuracy_score(y_pred_svm, shuffled_labels)
    permutation_scores.append(accuracy)

p_value = (np.sum(np.array(permutation_scores) >= accuracy) + 1) / (num_permutations + 1)
print(p_value)

plt.hist(permutation_scores, 20, label='Permutation scores', edgecolor='black')
ylim = plt.ylim()
plt.plot(2 * [accuracy], ylim, '--g', linewidth=3, label=f'Actual Accuracy (p-value {p_value:.4f})')
plt.plot(2 * [1. / num_classes], ylim, '--k', linewidth=3, label='Chance level')
plt.ylim(ylim)
plt.legend()
plt.xlabel('Accuracy')
plt.title('Permutation Test for Accuracy')
plt.show()

### GaussainNB

In [None]:
print(X_participant.shape)
print(y_participant)

In [62]:
classifier = GaussianNB()
X = RobustScaler().fit_transform(X_participant)

In [None]:
cv_score = cross_val_score(GaussianNB(), X_participant, y_participant, cv=5)
print('Score of each individual cross-validation fold:', cv_score)
print('Mean prediction score:',np.mean(cv_score))

# Classification report
y_pred = cross_val_predict(GaussianNB(), X_participant, y_participant, cv=5)
print('Classification Report:')
print(classification_report(y_participant, y_pred))

# Confusion matrix
conf_matrix = confusion_matrix(y_participant, y_pred)
print('Confusion Matrix:')
print(conf_matrix)

In [None]:
from sklearn.model_selection import permutation_test_score

score, permutation_scores, pvalue= permutation_test_score(
    GaussianNB(), X_participant, y_participant, cv=5, n_permutations=50, 
    n_jobs=1, random_state=0, verbose=0, scoring=None)
print("Classification Accuracy: %s (pvalue : %s)" % (score, pvalue))

In [None]:
n_classes = np.unique(y_participant).size

plt.hist(permutation_scores, 20, label='Permutation scores',
         edgecolor='black')
ylim = plt.ylim()
plt.plot(2 * [score], ylim, '--g', linewidth=3,
         label='Classification Score'
         ' (pvalue %s)' % pvalue)
plt.plot(2 * [1. / n_classes], ylim, '--k', linewidth=3, label='Chance level')

plt.ylim(ylim)
plt.legend()
plt.xlabel('Score')
plt.show()