In [None]:
pip install mne

In [None]:
pip install pyriemann

In [None]:
import os
import argparse
import sys
import mne
import math
import time
import json
import numpy as np
from scipy.signal import butter, filtfilt
from pyriemann.estimation import XdawnCovariances
from pyriemann.tangentspace import TangentSpace
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, matthews_corrcoef

start = time.time()


def is_notebook():
    try:
        shell = get_ipython().__class__.__name__
        if shell == 'ZMQInteractiveShell':
            return True
        elif shell == 'TerminalInteractiveShell':
            return False
        else:
            return False
    except NameError:
        return False

if is_notebook():
    args = argparse.Namespace(s=None, c=None)
else:
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', default=None)
    parser.add_argument('-c', default=None, type=int)

    args = parser.parse_args(args=[])

print(args.s)
print(args.c)
print(__doc__)

subject = 'sub-B'
if args.s is not None:
    subject = args.s
test_class =1
if args.c is not None:
    test_class = args.c

import numpy as np
fnum = np.array([[1,4],
                 [2,5],
                 [3,6]])

trig_id = [2,8,32]
tasks = ['low', 'low', 'mid', 'mid', 'high', 'high']
reject={'eeg':100e-6,'eog':500e-6}

import os
import json
repository_base = os.path.dirname(os.path.dirname(os.path.abspath('file_path')))
base = os.path.join(repository_base, "eeg")
save_base = os.path.join(repository_base, "results")
if not os.path.exists(save_base):
    os.makedirs(save_base)

Fs = 1000
fc = [1, 40]
resample = None

from scipy.signal import butter, filtfilt
def apply_filter(data, b, a):
    r = filtfilt(b=b, a=a, x=data)
    return r
b,a = butter(N = 2, Wn = np.array(fc)/(Fs/2), btype = 'bandpass', output = 'ba')

t_file = []
nt_file = []
target_file = []
non_target_file = []

for i in range(len(fnum.ravel())):
    fname = os.path.join(base, subject, "eeg", "%s_task-%s_run-%d_eeg.vhdr" % (subject, tasks[i], fnum.ravel()[i]))
    print(fname)
    if np.any(fnum[test_class-1] == fnum.ravel()[i]):
        if isinstance(target_file, list):
            target_file = mne.io.read_raw_brainvision(fname, preload=True, eog=('hEOG', 'vEOG'))
            target_file = target_file.apply_function(apply_filter, channel_wise=True, b=b, a=a)
            t_file.append(fnum.ravel()[i])
        else:
            tmp = mne.io.read_raw_brainvision(fname, preload=True, eog=('hEOG', 'vEOG'))
            tmp = tmp.apply_function(apply_filter, channel_wise=True, b=b, a=a)
            target_file = mne.concatenate_raws([target_file, tmp])
            t_file.append(fnum.ravel()[i])
    else:
        if isinstance(non_target_file, list):
            non_target_file = mne.io.read_raw_brainvision(fname, preload=True, eog=('hEOG', 'vEOG'))
            non_target_file = non_target_file.apply_function(apply_filter, channel_wise=True, b=b, a=a)
            nt_file.append(fnum.ravel()[i])
        else:
            tmp = mne.io.read_raw_brainvision(fname, preload=True, eog=('hEOG', 'vEOG'))
            tmp = tmp.apply_function(apply_filter, channel_wise=True, b=b, a=a)
            non_target_file = mne.concatenate_raws([non_target_file, tmp])
            nt_file.append(fnum.ravel()[i])
if resample is not None:
    target_file.resample(resample)
    non_target_file.resample(resample)

if resample != None:
    target_file.resample(resample)
    non_target_file.resample(resample)

event_id={'target_stimulus_id':-100,'non_target_stimulus_id':-500}
target_eve = mne.events_from_annotations(target_file)
target_eve = mne.merge_events(target_eve[0],[trig_id[test_class-1]],event_id['target_stimulus_id'],replace_events=True)

non_target_eve = mne.events_from_annotations(non_target_file)
non_target_eve = mne.merge_events(non_target_eve[0],[trig_id[test_class-1]],event_id['non_target_stimulus_id'],replace_events=True)

tmin,tmax= -0.02, 1.4
baseline=(0.0,0.01)
target_epochs = mne.Epochs(target_file,events=target_eve, event_id=event_id['target_stimulus_id'], tmin=tmin,tmax=tmax, baseline=baseline, reject=reject,preload = True)

# Sliding and Overlapping Window Epoching

In [None]:

import numpy as np
def find_max_peak_window(data, Fs, window_size_ms=500, overlap_size_ms=100, epoch_index=0):
    """
    Function to find the window with the maximum peak value within a specific epoch and return its start and end indices.
    """
    window_size_samples = int(window_size_ms * Fs / 1000)
    overlap_size_samples = int(overlap_size_ms * Fs / 1000)
    epoch_data = data[epoch_index]
    n_channels, n_times = epoch_data.shape
    max_peak_value = -np.inf
    max_peak_start_index = None
    max_peak_end_index = None
    start = 0
    while start + window_size_samples <= n_times:
        window_data = epoch_data[:, start:start + window_size_samples]
        current_peak_value = np.max(window_data)
        if current_peak_value > max_peak_value:
            max_peak_value = current_peak_value
            max_peak_start_index = start
            max_peak_end_index = start + window_size_samples - 1
        start += (window_size_samples - overlap_size_samples)
    print(f"Epoch {epoch_index + 1} - Max Peak Value: {max_peak_value}")
    print(f"Epoch {epoch_index + 1} - Window Start Index: {max_peak_start_index}, End Index: {max_peak_end_index}")
    return max_peak_start_index, max_peak_end_index

def analyze_all_epochs_max_peaks(data, Fs, window_size_ms=500, overlap_size_ms=100):
    """
    Function to find and print the maximum peak values and corresponding window indices across all epochs.
    """
    n_epochs = data.shape[0]
    peak_window_indices = []
    for epoch_index in range(n_epochs):
        print(f"\nAnalyzing Epoch {epoch_index + 1}")
        start_index, end_index = find_max_peak_window(data, Fs, window_size_ms, overlap_size_ms, epoch_index)
        peak_window_indices.append((start_index, end_index))
    return peak_window_indices

def re_epoch_data(data, peak_window_indices):
    """
    Function to re-epoch the data based on the peak window indices.
    """
    re_epoched_data = []
    for epoch_index, (start, end) in enumerate(peak_window_indices):
        re_epoched_epoch_data = data[epoch_index][:, start:end + 1]
        re_epoched_data.append(re_epoched_epoch_data)
    re_epoched_data = np.array(re_epoched_data)
    print("\nRe-epoching complete.")
    print("Re-epoched data shape:", re_epoched_data.shape)
    return re_epoched_data

Fs = 1000
data = target_epochs.get_data()
peak_window_indices = analyze_all_epochs_max_peaks(data, Fs, window_size_ms=500, overlap_size_ms=100)
re_epoched_data = re_epoch_data(data, peak_window_indices)

from mne import EpochsArray
from mne import create_info
n_channels = re_epoched_data.shape[1]
info = create_info(ch_names=[f'chan{i}' for i in range(n_channels)], sfreq=Fs, ch_types='eeg')
re_epoched_epochs = EpochsArray(re_epoched_data, info)
print("Re-epoched data converted to MNE Epochs:", re_epoched_epochs)



from mne import EpochsArray, create_info
n_channels = re_epoched_data.shape[1]
info = create_info(ch_names=[f'chan{i}' for i in range(n_channels)], sfreq=Fs, ch_types='eeg')
re_epoched_epochs = EpochsArray(re_epoched_data, info)
baseline=(0.0,0.01)
re_epoched_epochs.apply_baseline(baseline)
re_epoched_epochs.filter(l_freq=0.50, h_freq=100.00)
print("Re-epoched data converted to MNE Epochs and processed with baseline correction and filtering.")

tmin,tmax= -0.0, 0.499
baseline=(0.0,0.01)
non_target_epochs = mne.Epochs(non_target_file, events=non_target_eve, event_id=event_id['non_target_stimulus_id'], tmin=tmin,tmax=tmax, baseline=baseline, reject=reject,preload = True)


import mne
tmin, tmax = -0.0, 0.499
baseline=(0.0,0.01)
non_target_epochs = mne.Epochs(
    non_target_file,
    events=non_target_eve,
    event_id=event_id['non_target_stimulus_id'],
    tmin=tmin,
    tmax=tmax,
    baseline=baseline,
    reject=reject,
    preload=True
)

new_channel_names = [f'chan{i}' for i in range(len(non_target_epochs.ch_names))]
rename_dict = dict(zip(non_target_epochs.ch_names, new_channel_names))
non_target_epochs.rename_channels(rename_dict)
print("Updated channel names:", non_target_epochs.ch_names)



In [None]:
epochs = mne.concatenate_epochs([re_epoched_epochs, non_target_epochs])
epochs = epochs.copy().pick_types(eeg=True, eog=False)

# Logistic Regression

In [None]:

clf = make_pipeline(XdawnCovariances(8),
                    TangentSpace(metric='logeuclid'),
                    LogisticRegression( penalty='l1', solver='liblinear', multi_class='ovr'))

epochs_data = epochs.get_data()
labels = epochs.events[:, -1]
preds = np.zeros(len(labels))

cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

preds = np.empty(len(labels))
for train_idx, test_idx in cv.split(epochs_data, labels):
    clf.fit(epochs_data[train_idx], labels[train_idx])
    preds[test_idx] = clf.predict(epochs_data[test_idx])

report = classification_report(labels, preds, target_names=['non-target', 'target'], output_dict=True)
print(report)


#Support Vector Machine (SVM)

In [None]:
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns

clf = make_pipeline(XdawnCovariances(8),
                    TangentSpace(metric='logeuclid'),
                    SVC(kernel='linear', C=1, decision_function_shape='ovr'))


epochs_data = epochs.get_data()
labels = epochs.events[:, -1]
preds = np.zeros(len(labels))

cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

train_accuracies = []
test_accuracies = []
train_losses = []
test_losses = []
preds = np.empty(len(labels))
for train_idx, test_idx in cv.split(epochs_data, labels):
    clf.fit(epochs_data[train_idx], labels[train_idx])
    train_preds = clf.predict(epochs_data[train_idx])
    test_preds = clf.predict(epochs_data[test_idx])
    train_accuracy = accuracy_score(labels[train_idx], train_preds)
    test_accuracy = accuracy_score(labels[test_idx], test_preds)
    train_accuracies.append(train_accuracy)
    test_accuracies.append(test_accuracy)
    train_loss = np.mean(train_preds != labels[train_idx])
    test_loss = np.mean(test_preds != labels[test_idx])
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    preds[test_idx] = test_preds
report = classification_report(labels, preds, target_names=['non-target', 'target'], output_dict=True)
print(report)


epochs_range = range(1, len(train_accuracies) + 1)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
ax1.plot(epochs_range, train_accuracies, label='Train Accuracy', marker='o')
ax1.plot(epochs_range, test_accuracies, label='Test Accuracy', marker='x')
ax1.set_title('Train and Test Accuracy')
ax1.set_xlabel('Fold')
ax1.set_ylabel('Accuracy')
ax1.legend()


for i in range(len(train_accuracies)):
    ax1.annotate(f'{train_accuracies[i]:.2f}', (epochs_range[i], train_accuracies[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
    ax1.annotate(f'{test_accuracies[i]:.2f}', (epochs_range[i], test_accuracies[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)

ax2.plot(epochs_range, train_losses, label='Train Loss', marker='o')
ax2.plot(epochs_range, test_losses, label='Test Loss', marker='x')
ax2.set_title('Train and Test Loss')
ax2.set_xlabel('Fold')
ax2.set_ylabel('Loss')
ax2.legend()

for i in range(len(train_losses)):
    ax2.annotate(f'{train_losses[i]:.2f}', (epochs_range[i], train_losses[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
    ax2.annotate(f'{test_losses[i]:.2f}', (epochs_range[i], test_losses[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)

plt.tight_layout()
plt.show()
cm = confusion_matrix(labels, preds)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=['non-target', 'target'], yticklabels=['non-target', 'target'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()


# Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier

clf = make_pipeline(XdawnCovariances(8),
                    TangentSpace(metric='logeuclid'),
                    RandomForestClassifier(n_estimators=100, random_state=42))

epochs_data = epochs.get_data()
labels = epochs.events[:, -1]
preds = np.zeros(len(labels))

cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

train_accuracies = []
test_accuracies = []
train_losses = []
test_losses = []

preds = np.empty(len(labels))
for train_idx, test_idx in cv.split(epochs_data, labels):

    clf.fit(epochs_data[train_idx], labels[train_idx])
    train_preds = clf.predict(epochs_data[train_idx])
    test_preds = clf.predict(epochs_data[test_idx])
    train_accuracy = accuracy_score(labels[train_idx], train_preds)
    test_accuracy = accuracy_score(labels[test_idx], test_preds)
    train_accuracies.append(train_accuracy)
    test_accuracies.append(test_accuracy)
    train_loss = np.mean(train_preds != labels[train_idx])
    test_loss = np.mean(test_preds != labels[test_idx])

    train_losses.append(train_loss)
    test_losses.append(test_loss)
    preds[test_idx] = test_preds

report = classification_report(labels, preds, target_names=['non-target', 'target'], output_dict=True)
print(report)


epochs_range = range(1, len(train_accuracies) + 1)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

ax1.plot(epochs_range, train_accuracies, label='Train Accuracy', marker='o')
ax1.plot(epochs_range, test_accuracies, label='Test Accuracy', marker='x')
ax1.set_title('Train and Test Accuracy')
ax1.set_xlabel('Fold')
ax1.set_ylabel('Accuracy')
ax1.legend()


for i in range(len(train_accuracies)):
    ax1.annotate(f'{train_accuracies[i]:.2f}', (epochs_range[i], train_accuracies[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
    ax1.annotate(f'{test_accuracies[i]:.2f}', (epochs_range[i], test_accuracies[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)

ax2.plot(epochs_range, train_losses, label='Train Loss', marker='o')
ax2.plot(epochs_range, test_losses, label='Test Loss', marker='x')
ax2.set_title('Train and Test Loss')
ax2.set_xlabel('Fold')
ax2.set_ylabel('Loss')
ax2.legend()

for i in range(len(train_losses)):
    ax2.annotate(f'{train_losses[i]:.2f}', (epochs_range[i], train_losses[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
    ax2.annotate(f'{test_losses[i]:.2f}', (epochs_range[i], test_losses[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)

plt.tight_layout()
plt.show()

cm = confusion_matrix(labels, preds)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=['non-target', 'target'], yticklabels=['non-target', 'target'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()


# Polynomial Logistic Regression

In [None]:
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LogisticRegression

clf = make_pipeline(XdawnCovariances(8),
                    TangentSpace(metric='logeuclid'),
                    PolynomialFeatures(degree=2, include_bias=False),
                    LogisticRegression(penalty='l2', solver='liblinear', multi_class='ovr'))

epochs_data = epochs.get_data()
labels = epochs.events[:, -1]
preds = np.zeros(len(labels))

cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
train_accuracies = []
test_accuracies = []
train_losses = []
test_losses = []

preds = np.empty(len(labels))
for train_idx, test_idx in cv.split(epochs_data, labels):
    clf.fit(epochs_data[train_idx], labels[train_idx])
    train_preds = clf.predict(epochs_data[train_idx])
    test_preds = clf.predict(epochs_data[test_idx])
    train_accuracy = accuracy_score(labels[train_idx], train_preds)
    test_accuracy = accuracy_score(labels[test_idx], test_preds)
    train_accuracies.append(train_accuracy)
    test_accuracies.append(test_accuracy)
    train_loss = np.mean(train_preds != labels[train_idx])
    test_loss = np.mean(test_preds != labels[test_idx])
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    preds[test_idx] = test_preds


report = classification_report(labels, preds, target_names=['non-target', 'target'], output_dict=True)
print(report)

epochs_range = range(1, len(train_accuracies) + 1)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

ax1.plot(epochs_range, train_accuracies, label='Train Accuracy', marker='o')
ax1.plot(epochs_range, test_accuracies, label='Test Accuracy', marker='x')
ax1.set_title('Train and Test Accuracy')
ax1.set_xlabel('Fold')
ax1.set_ylabel('Accuracy')
ax1.legend()

for i in range(len(train_accuracies)):
    ax1.annotate(f'{train_accuracies[i]:.2f}', (epochs_range[i], train_accuracies[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
    ax1.annotate(f'{test_accuracies[i]:.2f}', (epochs_range[i], test_accuracies[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
ax2.plot(epochs_range, train_losses, label='Train Loss', marker='o')
ax2.plot(epochs_range, test_losses, label='Test Loss', marker='x')
ax2.set_title('Train and Test Loss')
ax2.set_xlabel('Fold')
ax2.set_ylabel('Loss')
ax2.legend()


for i in range(len(train_losses)):
    ax2.annotate(f'{train_losses[i]:.2f}', (epochs_range[i], train_losses[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
    ax2.annotate(f'{test_losses[i]:.2f}', (epochs_range[i], test_losses[i]),
                 textcoords="offset points", xytext=(0, 10), ha='center', fontsize=8)
plt.tight_layout()
plt.show()


cm = confusion_matrix(labels, preds)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=['non-target', 'target'], yticklabels=['non-target', 'target'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()
