In [301]:
import mne
from mne.decoding import Scaler, Vectorizer, CSP, cross_val_multiscore
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import glob
from scipy import io, stats

In [302]:
from sklearn.model_selection import train_test_split, cross_val_score, ShuffleSplit
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.feature_selection import SelectKBest
from sklearn.base import BaseEstimator, TransformerMixin

In [304]:
def channel_selection(epochs):
    X = epochs.get_data()
    y = epochs.events[:, -1]
    sfreq = epochs.info["sfreq"]

    n_trials, n_channels, n_samples = X.shape
    
    # Calculate corrcoef of each channel pair for each trials
    corrcoefs = np.empty((n_trials, n_channels, n_channels))
    
    for trial in range(n_trials):
        corrcoefs[trial] = np.corrcoef(X[trial])
    
    corr_1 = np.mean(corrcoefs[y==1], 0)
    corr_2 = np.mean(corrcoefs[y==2], 0)
    
    t_statistic, p_values = stats.ttest_ind(corrcoefs[y==1], corrcoefs[y==2])
    
    mi_scores = np.sum(p_values < 0.05, axis=1)
    mi_scores_mean = np.mean(mi_scores)

    distinct_channels = np.where(mi_scores > mi_scores_mean)[0]
    corr_channels = np.logical_and(np.abs(corr_1) >= 0.5, np.abs(corr_2) >= 0.5)

    psds, freqs = mne.time_frequency.psd_array_welch(X, sfreq, verbose=False)
    
    # Function to calculate Fisher score for a channel group
    def calculate_fisher_score(channel_group):
        left_psds = psds[y==1][:, channel_group]
        right_psds = psds[y==2][:, channel_group]

        # Compute mean and variance for each class and channel
        mean_left = np.mean(left_psds, axis=0)
        var_left = np.var(left_psds, axis=0)

        mean_right = np.mean(right_psds, axis=0)
        var_right = np.var(right_psds, axis=0)

        # Calculate Fischer score for each channel
        return np.mean((mean_left - mean_right) ** 2 / (var_left + var_right))
        
    max_fisher_score = 0.0
    selected_channels = np.where(epochs.info["ch_names"])[0]
    
    for ch in distinct_channels:
        channel_group = np.where(corr_channels[ch])[0]
        
        fisher_score = calculate_fisher_score(channel_group)
        if fisher_score > max_fisher_score:
            max_fisher_score = fisher_score
            selected_channels = channel_group

    return selected_channels

In [305]:
np.concatenate(([1], [2,3]))

array([1, 2, 3])

In [306]:
def feature_extraction(X):
    X_transformed = np.concatenate((
        np.mean(X, axis=-1),
        np.amax(X, axis=-1),
        np.amin(X, axis=-1),
        np.std(X, axis=-1),
        stats.skew(X, axis=-1),
        stats.kurtosis(X, axis=-1)
    ), axis=-1)  
    
    return X_transformed 

In [380]:
from skrebate import ReliefF

class ReliefF_Selector():
    def __init__(self):
        self.reliefF = ReliefF(n_neighbors=10)
        pass

    def fit(self, X, y=None):
        self.reliefF.fit(X, y)
        return self

    def transform(self, X, y=None):
        return X[:, self.reliefF.top_features_[:10]]

In [308]:
from mrmr import mrmr_classif

class MRMR_Selector():
    def __init__(self):
        pass

    def fit(self, X, y=None):
        data = pd.DataFrame(X, copy=True)
        labels = pd.Series(y, copy=True)
        self.selected_features = mrmr_classif(X=data, y=labels, K=60, show_progress=False)
        
        return self

    def transform(self, X, y=None):
        return X[:, self.selected_features]

In [309]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import chi2 

class Chi2_Selector():
    def __init__(self):
        self.chi2_selector = SelectKBest(chi2, k=60) 
        self.scaler = MinMaxScaler()
        pass

    def fit(self, X, y=None):
        self.chi2_selector.fit(self.scaler.fit_transform(X), y)
        return self

    def transform(self, X, y=None):
        return self.chi2_selector.transform(self.scaler.fit_transform(X))

In [310]:
from sklearn.feature_selection import f_classif

class ANOVA_Selector():
    def __init__(self):
        self.anova_selector = SelectKBest(f_classif, k=60) 
        pass

    def fit(self, X, y=None):
        self.anova_selector.fit(X, y)

        return self

    def transform(self, X, y=None):
        return self.anova_selector.transform(X)

In [311]:
class KruskalWallis_Selector():
    def __init__(self):
        self.p_values = []
        pass

    def fit(self, X, y=None):
        for idx in range(X.shape[1]):
            _, p_value = stats.kruskal(X[:,idx], y)
            self.p_values.append(p_value)

        self.selected_features = np.argsort(self.p_values)[:60]
        return self

    def transform(self, X, y=None):
        return X[:, self.selected_features]

In [312]:
dataset = 'A'

subjects = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10',
            '11', '12', '13', '14', '15', '16', '17', '18', '19', '20',
            '21', '22', '23', '24', '25', '26', '27', '28', '29']

good_subjects = ['01', '02', '03', '04', '06', '07', '08', '09', '10',
            '11', '12', '13', '14', '15', '16', '17', '18', '19', '20',
            '21', '22', '23', '24', '25', '26', '27', '28', '29']

bad_subjects = ['05']

In [402]:
scores = []

for subject in good_subjects:
    epochs_eeg = mne.read_epochs(f"epochs_eeg/Dataset_{dataset}/subject_{subject}_epo.fif", preload=True, verbose=False)

    epochs_eeg.crop(tmin=0, tmax=10).pick(picks="eeg")
    
    X = epochs_eeg.get_data()
    y = epochs_eeg.events[:, -1]

    cv = ShuffleSplit(5, test_size=0.2, random_state=42)
    cv_split = cv.split(y)

    scaler = StandardScaler()
    selector = ReliefF_Selector()
    classifier = LinearDiscriminantAnalysis()

    cv_scores = []

    # selected_eeg_channels = channel_selection(epochs_eeg)
    
    X_transformed = feature_extraction(X[:, :])
    
    X_transformed = scaler.fit_transform(X_transformed)
    
    selector.fit(X_transformed, y)
    X_transformed = selector.transform(X_transformed)

    for train_idx, test_idx in cv_split:    
        classifier.fit(X_transformed[train_idx], y[train_idx])
        y_pred = classifier.predict(X_transformed[test_idx])

        score = accuracy_score(y[test_idx], y_pred)   

        cv_scores.append(score)
    
    print(f"Subject_{subject}: {(np.mean(cv_scores)*100):.2f} ± {(np.std(cv_scores)*100):.2f} %")

    scores.append(np.mean(cv_scores))

print(f"Score: {(np.mean(scores)*100):.2f} ± {(np.std(scores)*100):.2f} %")

Subject_01: 81.67 ± 9.72 %
Subject_02: 75.00 ± 0.00 %
Subject_03: 71.67 ± 8.50 %
Subject_04: 60.00 ± 14.34 %
Subject_06: 73.33 ± 6.24 %
Subject_07: 60.00 ± 18.56 %
Subject_08: 70.00 ± 12.47 %
Subject_09: 91.67 ± 7.45 %
Subject_10: 60.00 ± 9.72 %
Subject_11: 66.67 ± 19.72 %
Subject_12: 61.67 ± 8.50 %
Subject_13: 55.00 ± 4.08 %
Subject_14: 50.00 ± 19.00 %
Subject_15: 66.67 ± 7.45 %
Subject_16: 88.33 ± 8.50 %
Subject_17: 76.67 ± 11.06 %
Subject_18: 65.00 ± 8.16 %
Subject_19: 96.67 ± 4.08 %
Subject_20: 58.33 ± 11.79 %
Subject_21: 61.67 ± 13.54 %
Subject_22: 53.33 ± 13.54 %
Subject_23: 68.33 ± 3.33 %
Subject_24: 70.00 ± 8.50 %
Subject_25: 96.67 ± 4.08 %
Subject_26: 75.00 ± 17.48 %
Subject_27: 95.00 ± 6.67 %
Subject_28: 66.67 ± 9.13 %
Subject_29: 66.67 ± 5.27 %
Score: 70.77 ± 12.86 %


In [407]:
np.save(f"scores/Dataset_{dataset}/EEG_WCS_ReliefF_10_LDA.npy", scores)