In [2]:
!git clone https://github.com/Charliebond125/MEGNet.git

Cloning into 'MEGNet'...
remote: Enumerating objects: 48, done.[K
remote: Counting objects: 100% (48/48), done.[K
remote: Compressing objects: 100% (46/46), done.[K
remote: Total 48 (delta 19), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (48/48), 24.32 MiB | 7.69 MiB/s, done.
Resolving deltas: 100% (19/19), done.


In [1]:
!pip install mne



In [2]:
import numpy as np
import pandas as pd
import mne
import random
from mne import read_epochs
from mne import pick_types, Epochs
from mne.channels import read_layout
from tensorflow.keras.utils import to_categorical
from sklearn.utils import shuffle
from sklearn.metrics import roc_auc_score, accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras import utils as np_utils
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras import backend as K
K.set_image_data_format('channels_last')
import sys, os

from MEGNet.MEGModels import MEGNet, ShallowConvNet, DeepConvNet

In [3]:
def preprocess_data(event_pair_function, epochs_data):
    """Select the epochs of one event pair and keep only gradiometer channels.

    Parameters
    ----------
    event_pair_function : callable
        Called with ``epochs_data``; must return the subset of epochs to keep
        (e.g. one of the ``*_vs_*`` selection functions in this notebook).
    epochs_data : object
        Epochs container handed unchanged to ``event_pair_function``
        (the notebook passes the result of ``mne.read_epochs``).

    Returns
    -------
    The value returned by ``pick_channels`` on the selected epochs, i.e. the
    selection restricted to the channels that ``mne.pick_types(..., meg='grad')``
    reports.
    """
    pair_epochs = event_pair_function(epochs_data)

    # pick_types yields channel indices; pick_channels wants names, so translate.
    grad_names = []
    for ch_idx in mne.pick_types(pair_epochs.info, meg='grad'):
        grad_names.append(pair_epochs.ch_names[ch_idx])

    pair_epochs = pair_epochs.pick_channels(grad_names)
    return pair_epochs

# Selection functions: each one returns the epochs belonging to one pair of imagery events
def hands_vs_feet(epochs_data):
    """Index ``epochs_data`` with the hand-imagery and feet-imagery event names.

    Returns whatever ``epochs_data[[...]]`` yields for that two-name list.
    """
    conditions = ["hand_imagery", "feet_imagery"]
    selection = epochs_data[conditions]
    return selection


def hands_vs_word(epochs_data):
    """Index ``epochs_data`` with the hand-imagery and word-imagery event names.

    Returns whatever ``epochs_data[[...]]`` yields for that two-name list.
    """
    conditions = ["hand_imagery", "word_imagery"]
    selection = epochs_data[conditions]
    return selection

def hands_vs_sub(epochs_data):
    """Index ``epochs_data`` with the hand-imagery and subtraction-imagery event names.

    Returns whatever ``epochs_data[[...]]`` yields for that two-name list.
    """
    conditions = ["hand_imagery", "subtraction_imagery"]
    selection = epochs_data[conditions]
    return selection

def feet_vs_word(epochs_data):
    """Index ``epochs_data`` with the feet-imagery and word-imagery event names.

    Returns whatever ``epochs_data[[...]]`` yields for that two-name list.
    """
    conditions = ["feet_imagery", "word_imagery"]
    selection = epochs_data[conditions]
    return selection

def feet_vs_sub(epochs_data):
    """Index ``epochs_data`` with the feet-imagery and subtraction-imagery event names.

    Returns whatever ``epochs_data[[...]]`` yields for that two-name list.
    """
    conditions = ["feet_imagery", "subtraction_imagery"]
    selection = epochs_data[conditions]
    return selection

def word_vs_sub(epochs_data):
    """Index ``epochs_data`` with the word-imagery and subtraction-imagery event names.

    Returns whatever ``epochs_data[[...]]`` yields for that two-name list.
    """
    conditions = ["word_imagery", "subtraction_imagery"]
    selection = epochs_data[conditions]
    return selection

def min_max_scaler(epoch_data):
    """Scale an array into [-1, 1] by dividing by its largest absolute value.

    Despite the name this is max-abs scaling: a single global factor is used
    for the whole array and zero stays at zero (no offset is removed).

    Parameters
    ----------
    epoch_data : array-like
        Numeric data of any shape.

    Returns
    -------
    numpy.ndarray
        ``epoch_data / max(|epoch_data|)``. If the input is empty or contains
        only zeros it is returned unscaled (as a new array) instead of raising
        or producing NaN from a division by zero.
    """
    data = np.asarray(epoch_data)

    # np.max raises on an empty array, so treat "no data" like "all zeros".
    max_abs = np.max(np.abs(data)) if data.size else 0

    # Only an exact zero is replaced; a NaN maximum still propagates as before.
    scale = 1.0 if max_abs == 0 else max_abs
    return data / scale

def mne_standard_scaler(epoch_data):
    """Standardise data with MNE's ``Scaler`` configured with ``scalings="mean"``.

    Unlike scikit-learn's ``StandardScaler`` this scaler works channel by
    channel. The scaler is fitted on ``epoch_data`` and immediately applied
    to it.

    Parameters
    ----------
    epoch_data : array-like
        Data to scale  # presumably (n_epochs, n_channels, n_times) - TODO confirm

    Returns
    -------
    The output of ``Scaler.fit_transform(epoch_data)``.
    """
    from mne.decoding import Scaler

    return Scaler(scalings="mean").fit_transform(epoch_data)

def z_score_scaler(epoch_data):
    """Standardise an array to zero mean and unit standard deviation.

    The mean and standard deviation are computed over the whole array
    (all axes at once), so one global shift and scale is applied.

    Parameters
    ----------
    epoch_data : array-like
        Numeric data of any shape.

    Returns
    -------
    numpy.ndarray
        ``(epoch_data - mean) / std``. When the standard deviation is exactly
        zero (constant input) the centred data is returned as is, which avoids
        the NaN/inf values a division by zero would produce.
    """
    data = np.asarray(epoch_data)
    centred = data - np.mean(data)
    std = np.std(data)

    # Only an exact zero is replaced; a NaN std still propagates as before.
    return centred / (1.0 if std == 0 else std)

Load the combined epochs

In [None]:
# Number of training epochs passed to every model.fit call in this notebook
n_epochs = 20

# Output file name for the accuracy results. The original concatenation
# ('/content' + 'acc_EEGNet' ...) had no separator after '/content' and
# therefore pointed at '/contentacc_EEGNet_...'; the slash is now included.
out_f_name = f'/content/acc_EEGNet_epochs_{n_epochs}_filtered_sub.npy'
print(out_f_name)

# Load the combined-run epochs file; preload=True is passed so the data is
# read into memory up front.
print("Loading epoched data...")
epoched_data = mne.read_epochs('/content/drive/MyDrive/MEG/Maxwell Filtered Data/Combined Run data/Sub_11_combined_runs_epoched_data-epo.fif', preload=True)
print(f"Epoched data successfully loaded {epoched_data}")

# Train MEGNet model

In [None]:
# Hyperparameter grid for MEGNet: kernel lengths and dropout rates to try
kl_values = [8, 16, 32, 64, 128]
d_values = [0.1, 0.2, 0.5]

# Columns of the summary table (one row per event pair)
result_columns = ['Event Pair', 'Best kl', 'Best d', 'Best Accuracy', 'Best AUC Score', 'Best F1 Score']

# The best result of each event pair is collected here and converted into a
# DataFrame once, after the loop. DataFrame.append was removed in pandas 2.0,
# and the list also keeps partial results available if the run is interrupted.
result_rows = []

# List of event pairs and their corresponding selection functions
event_pairs = [
    ('hands_vs_word', hands_vs_word),
    ('hands_vs_sub', hands_vs_sub),
    ('feet_vs_word', feet_vs_word),
    ('feet_vs_sub', feet_vs_sub),
    ('hands_vs_feet', hands_vs_feet),
    ('word_vs_sub', word_vs_sub)
]

# Iterate over the event pairs
for event_pair_name, event_pair_function in event_pairs:
    print(f"Training for event pair: {event_pair_name}")

    # Select the two event types and keep only gradiometer channels
    epochs_data_selected = preprocess_data(event_pair_function, epoched_data)

    # Extract data and labels (the last column of `events` holds the event code)
    X = epochs_data_selected.get_data()
    y = epochs_data_selected.events[:, -1]

    # Map the event codes to integer labels starting from 0
    unique_events = np.unique(y)
    event_to_label = {event: idx for idx, event in enumerate(unique_events)}
    y_labels = np.array([event_to_label[event] for event in y])

    # One-hot encode the labels
    y_one_hot = to_categorical(y_labels, num_classes=2)

    # Split the data into training and testing sets (95% for training, 5% for testing)
    X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.05, random_state=42, stratify=y_one_hot)

    # Print class distribution for training set
    train_class_distribution = pd.Series(y_train.argmax(axis=1)).value_counts(normalize=True)
    print("Class Distribution in Training Set:")
    print(train_class_distribution)

    # Print class distribution for testing set
    test_class_distribution = pd.Series(y_test.argmax(axis=1)).value_counts(normalize=True)
    print("Class Distribution in Testing Set:")
    print(test_class_distribution)

    # Max-abs scaling. The factor is taken from the training split only and
    # applied to both splits, so the test data is presented to the network on
    # the same scale it was trained on (scaling each split by its own maximum
    # would give train and test different scales). Test values may therefore
    # fall slightly outside [-1, 1].
    train_max_abs = np.max(np.abs(X_train))
    X_train = X_train / train_max_abs
    X_test = X_test / train_max_abs

    # Add a trailing singleton axis: (trials, channels, samples) -> (trials, channels, samples, 1)
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], X_train.shape[2], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], X_test.shape[2], 1)

    # Number of output classes, taken from the one-hot label width
    nb_classes = y_train.shape[1]

    # Scores and hyperparameters of the most accurate run for the current event pair
    highest_auc_score = 0
    highest_acc = 0
    f1 = 0
    best_kl = None
    best_d = None

    # Iterate over the kl and d values
    for kl in kl_values:
        for d in d_values:
            print()
            print(f"Current kl: {kl}")
            print(f"Current d: {d}")
            print()

            # Build a fresh MEGNet for this (kl, d) combination
            model = MEGNet(nb_classes=nb_classes, Chans=X_train.shape[1], Samples=X_train.shape[2], dropoutRate=d, kernLength=kl)

            # Compile the model
            model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

            # Save the best model seen during training to a checkpoint file
            checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.h5', verbose=1, save_best_only=True)

            # Class weights (both classes weighted equally)
            class_weights = {0: 1, 1: 1}

            # Fit the model to the training data
            history = model.fit(X_train, y_train, batch_size=16, epochs=n_epochs, verbose=2,
                                validation_split=0.2, callbacks=[checkpointer, EarlyStopping(patience=5)],
                                class_weight=class_weights)

            # Evaluate the model on the test data
            # NOTE(review): the model as left by fit() is evaluated; the saved
            # checkpoint is not reloaded here.
            y_pred = model.predict(X_test)
            auc_score = roc_auc_score(y_test, y_pred)
            acc = accuracy_score(y_test.argmax(axis=1), y_pred.argmax(axis=1))
            f1_score_val = f1_score(y_test.argmax(axis=1), y_pred.argmax(axis=1), average='macro')

            # Print the Accuracy, AUC and F1 scores
            print('Accuracy Score:', acc)
            print('AUC Accuracy Score:', auc_score)
            print('F1 Score:', f1_score_val)

            # Keep the run with the highest test ACCURACY (AUC and F1 are only
            # recorded alongside it). The first run is always recorded so the
            # best parameters are never left as None.
            # NOTE(review): selecting hyperparameters on the test set makes the
            # reported scores optimistic.
            if best_kl is None or acc > highest_acc:
                highest_auc_score = auc_score
                best_kl = kl
                best_d = d
                f1 = f1_score_val
                highest_acc = acc

            # Reset the Keras session to clear the model
            K.clear_session()

            print()
            print('=' * 100)
            print('=' * 100)
            print()

    # Record the best parameters and scores for the current event pair
    result_rows.append({'Event Pair': event_pair_name, 'Best kl': best_kl, 'Best d': best_d,
                        'Best Accuracy': highest_acc, 'Best AUC Score': highest_auc_score,
                        'Best F1 Score': f1})

# Build and print the results DataFrame
results_df = pd.DataFrame(result_rows, columns=result_columns)
print(results_df)

# Train ShallowConvNet or DeepConvNet

In [None]:
# Dropout rates to try for ShallowConvNet/DeepConvNet
d_values = [0.5, 0.6, 0.8]

# Columns of the summary table (one row per event pair)
result_columns = ['Event Pair', 'Best d', 'Best Accuracy', 'Best AUC Score', 'Best F1 Score']

# The best result of each event pair is collected here and converted into a
# DataFrame once, after the loop. DataFrame.append was removed in pandas 2.0,
# and the list also keeps partial results available if the run is interrupted.
result_rows = []

# List of event pairs and their corresponding selection functions
event_pairs = [
    ('hands_vs_word', hands_vs_word),
    ('hands_vs_sub', hands_vs_sub),
    ('feet_vs_word', feet_vs_word),
    ('feet_vs_sub', feet_vs_sub),
    ('hands_vs_feet', hands_vs_feet),
    ('word_vs_sub', word_vs_sub)
]

# Iterate over the event pairs
for event_pair_name, event_pair_function in event_pairs:
    print(f"Training for event pair: {event_pair_name}")

    # Select the two event types and keep only gradiometer channels
    epochs_data_selected = preprocess_data(event_pair_function, epoched_data)

    # Extract data and labels (the last column of `events` holds the event code)
    X = epochs_data_selected.get_data()
    y = epochs_data_selected.events[:, -1]

    # Map the event codes to integer labels starting from 0
    unique_events = np.unique(y)
    event_to_label = {event: idx for idx, event in enumerate(unique_events)}
    y_labels = np.array([event_to_label[event] for event in y])

    # One-hot encode the labels
    y_one_hot = to_categorical(y_labels, num_classes=2)

    # Split the data into training and testing sets (95% for training, 5% for testing)
    X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.05, random_state=42, stratify=y_one_hot)

    # Print class distribution for training set
    train_class_distribution = pd.Series(y_train.argmax(axis=1)).value_counts(normalize=True)
    print("Class Distribution in Training Set:")
    print(train_class_distribution)

    # Print class distribution for testing set
    test_class_distribution = pd.Series(y_test.argmax(axis=1)).value_counts(normalize=True)
    print("Class Distribution in Testing Set:")
    print(test_class_distribution)

    # Max-abs scaling. The factor is taken from the training split only and
    # applied to both splits, so the test data is presented to the network on
    # the same scale it was trained on (scaling each split by its own maximum
    # would give train and test different scales). Test values may therefore
    # fall slightly outside [-1, 1].
    train_max_abs = np.max(np.abs(X_train))
    X_train = X_train / train_max_abs
    X_test = X_test / train_max_abs

    # Add a trailing singleton axis: (trials, channels, samples) -> (trials, channels, samples, 1)
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], X_train.shape[2], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], X_test.shape[2], 1)

    # Number of output classes, taken from the one-hot label width
    nb_classes = y_train.shape[1]

    # Scores and dropout rate of the most accurate run for the current event pair
    highest_auc_score = 0
    highest_acc = 0
    f1 = 0
    best_d = None

    # Iterate over the dropout values (no kernel-length loop: that parameter
    # is only passed to MEGNet in this notebook)
    for d in d_values:
        print()
        print(f"Current d: {d}")
        print()

        # Configure the ShallowConvNet/DeepConvNet model
        model = ShallowConvNet(nb_classes=nb_classes, Chans=X_train.shape[1], Samples=X_train.shape[2], dropoutRate=d)

        # Compile the model
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

        # Save the best model seen during training to a checkpoint file
        checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.h5', verbose=1, save_best_only=True)

        # Class weights (both classes weighted equally)
        class_weights = {0: 1, 1: 1}

        # Fit the model to the training data
        history = model.fit(X_train, y_train, batch_size=16, epochs=n_epochs, verbose=2,
                            validation_split=0.2, callbacks=[checkpointer, EarlyStopping(patience=5)],
                            class_weight=class_weights)

        # Evaluate the model on the test data
        # NOTE(review): the model as left by fit() is evaluated; the saved
        # checkpoint is not reloaded here.
        y_pred = model.predict(X_test)
        auc_score = roc_auc_score(y_test, y_pred)
        acc = accuracy_score(y_test.argmax(axis=1), y_pred.argmax(axis=1))
        f1_score_val = f1_score(y_test.argmax(axis=1), y_pred.argmax(axis=1), average='macro')

        # Print the Accuracy, AUC and F1 scores
        print('Accuracy Score:', acc)
        print('AUC Accuracy Score:', auc_score)
        print('F1 Score:', f1_score_val)

        # Keep the run with the highest test ACCURACY (AUC and F1 are only
        # recorded alongside it). The first run is always recorded so the
        # best dropout rate is never left as None.
        # NOTE(review): selecting hyperparameters on the test set makes the
        # reported scores optimistic.
        if best_d is None or acc > highest_acc:
            highest_auc_score = auc_score
            best_d = d
            f1 = f1_score_val
            highest_acc = acc

        # Reset the Keras session to clear the model
        K.clear_session()

        print()
        print('=' * 100)
        print('=' * 100)
        print()

    # Record the best parameters and scores for the current event pair
    result_rows.append({'Event Pair': event_pair_name, 'Best d': best_d,
                        'Best Accuracy': highest_acc, 'Best AUC Score': highest_auc_score,
                        'Best F1 Score': f1})

# Build and print the results DataFrame
results_df = pd.DataFrame(result_rows, columns=result_columns)
print(results_df)