## Notes

[ You can use both the raw EEG data and EEG smooth feature data and plot both to understand them and see what to classify. ]

First, start with one or two participants (possibly someone who has x and someone who doesn't have x) to figure out some kind of pattern. In the end you would take all the information and clump the participants into categories (if they are all in clumps you can take the average and see what the predominant feature is in them)

~Possible helpful tool to split data into training and testing set is sk.learn, which is a training test split. It splits true or false values so you can epoch classifiers. 


Understanding the feature smooth data: 
- it says each field is in the shape of channel_number (62) x sample number (W; W indicates the number of time windows in that trial (different trials have different W because the film clips are not of the same length and each time window is 4 seconds) x frequency_bands (5): 1) delta: 1~4 Hz; 2) theta: 4~8 Hz; 3) alpha: 8~14 Hz; 4) beta: 14~31 Hz; and 5) gamma: 31~50 Hz). 
- this means the data is 3 dimensional 
    - the first would be one channel (ex like one electrode)
    - the second is a time point (ex at 2000 ms)
    - the third dimension is a frequency band (0 = delta, 1 = theta, 2 = alpha, 3 = beta, and 4 = gamma)
    
    
~possible classifiers:
- might be best to use SVM for feature smooth data because it works well with high dimensional data 
- LDA might be good to use for raw data and a lot of articles say it is the most popular classifier (maybe if you find problems with the results it could be an interesting point)

In [None]:
# imports

import sys
import os
from scipy.io import loadmat
import pandas as pd
import numpy as np
from tqdm import tqdm
from typing import Tuple
from math import sqrt
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, StratifiedKFold, train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import SCORERS

print(sys.executable)

In [None]:
# vars
NUM_SUBJECTS = 15
SESSION1_LABELS = [1,2,3,0,2,0,0,1,0,1,2,1,1,1,2,3,2,2,3,3,0,3,0,3]
ESI_Neuroscan_System_channels = ['FP1', 'FPZ', 'FP2', 'AF3', 'AF4', 'F7', 'F5', 'F3', 'F1', 'FZ', 
                                 'F2', 'F4', 'F6', 'F8', 'FT7', 'FC5', 'FC3', 'FC1', 'FCZ', 'FC2', 
                                 'FC4', 'FC6', 'FT8', 'T7', 'C5', 'C3', 'C1','CZ', 'C2', 'C4', 'C6', 
                                 'T8', 'TP7', 'CP5', 'CP3', 'CP1', 'CPZ', 'CP2', 'CP4', 'CP6', 'TP8', 
                                 'P7', 'P5', 'P3', 'P1', 'PZ', 'P2', 'P4', 'P6', 'P8', 'PO7', 'PO5', 
                                 'PO3', 'POZ', 'PO4', 'PO6', 'PO8', 'CB1', 'O1', 'OZ', 'O2', 'CB2']
ten_twenty_international_system_channels = ['FP1', 'FP2', 'F7', 'F3', 'FZ', 'F4', 'F8', 'T3', 'C3', 
                                            'CZ', 'C4', 'T4', 'T5', 'P3', 'PZ', 'P4', 'T6', 'O1', 'O2']
frequency_bands = ['delta', 'theta', 'alpha', 'beta', 'gamma']
DATA_COL_NAMES = [f"{channel}_{freq_band}" for channel in ESI_Neuroscan_System_channels for freq_band in frequency_bands] + ["emotionLabel"]
CHANNELS_INDEX = 0; WINDOWS_INDEX = 1; FREQUENCY_BANDS_INDEX = 2


In [None]:
session1_dir = "SEED_IV/eeg_feature_smooth/1"
session1_subjects_data = [None] * NUM_SUBJECTS
print("Loading data from session 1:")
for filename in os.listdir(session1_dir):
    f = os.path.join(session1_dir, filename)
    subject, date = filename.split("_")
    print(f"\tLoading data from subject {subject}...")
    session1_subjects_data[int(subject)-1] = loadmat(f)

### Now to parse the data into 4 dataframes: <br>
- **session1_de_movingAve**
- **session1_de_lds**
- **session1_psd_movingAve**
- **session1_psd_lds**





In [None]:
def parse_data_title(data_title: str) -> Tuple[str, str, str]:
    '''
    Parse the key in the data dictionary into feature, smoothing method, and trial number.
    '''
    if data_title in ["__globals__", "__header__", "__version__"]:
        return "N/A", "N/A", "N/A"

    # 2-digit trial number
    if data_title[-2].isdigit():
        feature, smoothing_method_and_trial_num = data_title.split("_")
        return feature, smoothing_method_and_trial_num[:-2], smoothing_method_and_trial_num[-2:]

    # 1-digit trial number
    else:
        feature, smoothing_method_and_trial_num = data_title.split("_")
        return feature, smoothing_method_and_trial_num[:-1], smoothing_method_and_trial_num[-1:]


In [None]:
def parse_trial(data: np.ndarray, trial_num: int):
       '''
       Parse data from one trial into a dataframe and return it.
       '''
       trial_df = pd.DataFrame(columns=DATA_COL_NAMES)
       emotionLabel = SESSION1_LABELS[trial_num-1]
       # add windows in the trial as samples to the trial df
       for window in range(data.shape[WINDOWS_INDEX]):
              window_mat = np.array(data[:, window, :])
              row = np.append(window_mat.flatten(), emotionLabel).reshape(1, len(DATA_COL_NAMES))
              row_df = pd.DataFrame(row, columns=DATA_COL_NAMES)
              trial_df = pd.concat([trial_df, row_df])

       return trial_df

In [None]:
# create the dataframes
session1_de_movingAve = pd.DataFrame(columns=DATA_COL_NAMES)
session1_de_lds = pd.DataFrame(columns=DATA_COL_NAMES)
session1_psd_movingAve = pd.DataFrame(columns=DATA_COL_NAMES)
session1_psd_lds = pd.DataFrame(columns=DATA_COL_NAMES)

# loop over all 15 subjects
for subject_num, subject_data in enumerate(session1_subjects_data):
    print(f"Adding data from subject {subject_num+1}...")
    # loop over all the different features, smoothing methods, and trials
    for data_title in subject_data:
        # parse the data into row to add to the dataframe
        feature, smoothing_method, trial = parse_data_title(data_title)
        #print(f"parsing feature: {feature} | smoothing method: {smoothing_method} | trial: {trial}")
        
        # a field that doesn't need to be parsed
        if feature == "N/A":
            continue

        #which dataframe does it go into?
        if feature == "de" and smoothing_method == 'movingAve':
            # add to session1_de_movingAve dataframe
            session1_de_movingAve =  pd.concat([session1_de_movingAve, parse_trial(subject_data[data_title], int(trial))])

        if feature == "de" and smoothing_method == 'LDS':
            # add to session1_de_movingAve dataframe
            session1_de_lds =  pd.concat([session1_de_lds, parse_trial(subject_data[data_title], int(trial))])

        if feature == "psd" and smoothing_method == 'movingAve':
            # add to session1_de_movingAve dataframe
            session1_psd_movingAve =  pd.concat([session1_psd_movingAve, parse_trial(subject_data[data_title], int(trial))])

        if feature == "psd" and smoothing_method == 'LDS':
            # add to session1_de_movingAve dataframe
            session1_psd_lds =  pd.concat([session1_psd_lds, parse_trial(subject_data[data_title], int(trial))])

session1_de_movingAve = session1_de_movingAve.reset_index(drop=True)
session1_de_lds = session1_de_lds.reset_index(drop=True)
session1_psd_movingAve = session1_de_movingAve.reset_index(drop=True)
session1_psd_lds = session1_psd_lds.reset_index(drop=True)

In [None]:
session1_de_movingAve

In [None]:
session1_de_lds

In [None]:
session1_psd_movingAve

In [None]:
session1_psd_lds

### Reduce channels to only 10/20 international system channels.

In [None]:
def is_ten_twenty_international_system_channel(col_name: str) -> bool:
    '''
    Return True if a column corresponds to a 10/20 international system channel (and not just an ESI channel), 
    False otherwise.
    '''

    if col_name == 'emotionLabel':
        return True

    esi_channel, freq_band = col_name.split("_")
    #print(f"{esi_channel} matched these 10/20 is channels: {np.array(ten_twenty_international_system_channels)[arr]}")
    return any(is_channel == esi_channel for is_channel in ten_twenty_international_system_channels)

assert is_ten_twenty_international_system_channel("FP1_gamma"), "FP1_gamma unit test failed"
assert is_ten_twenty_international_system_channel("O2_beta"), "O2_beta unittest failed"
assert not is_ten_twenty_international_system_channel("CP4_alpha"), "CP4_alpha unit test failed"
assert not is_ten_twenty_international_system_channel("TP7_theta"), "TP7_theta unit test failed"
assert is_ten_twenty_international_system_channel("emotionLabel"), "emotionLabel unit test failed"

In [None]:
reduced_channels_cols = [col for col in DATA_COL_NAMES if is_ten_twenty_international_system_channel(col)]

Oh no! Some 10/20 interational system channels are not ESI channels as well!

In [None]:
reduced_channels = set(map(lambda s: s.split("_")[0], reduced_channels_cols))
print(f"These 10/20 international system channels are missing: {set(ten_twenty_international_system_channels) - reduced_channels}") 

Well, T7, T8, P7, and P8 are pretty close to those channels so we can just include them as well to recover that information.

In [None]:
def is_extra_channel(col_name: str) -> bool:
    '''
    Return True if a column corresponds to one of the extra channels, False otherwise.
    '''
    esi_channel, freq_band = col_name.split("_")
    return any(esi_channel == extra_channel for extra_channel in ["T7", "T8", "P7", "P8"])

In [None]:
cols_to_keep = [col for col in DATA_COL_NAMES if is_ten_twenty_international_system_channel(col) or is_extra_channel(col)]

session1_de_movingAve = session1_de_movingAve[cols_to_keep]
session1_de_lds = session1_de_lds[cols_to_keep]
session1_psd_movingAve = session1_psd_movingAve[cols_to_keep]
session1_psd_lds = session1_psd_lds[cols_to_keep]

In [None]:
session1_de_movingAve

In [None]:
session1_de_lds

In [None]:
session1_psd_movingAve

In [None]:
session1_psd_lds

# Next Steps
Now we have the 4 dataframes (session1_de_movingAve, session1_de_lds, session1_psd_movingAve, session1_psd_lds) representing the 4 different possible feature/smoothing methods matchups.

Now, the meat of our project. We want to see which combination of feature, smoothing method, and classifier is the best at predicting what type of film clip the participant was watching: (DE (differential entropy), PSD (power spectral density)) x (movingAve, LDS (linear dynamic system)) x (SVC(rbf kernel), random forest, k-nearest neighbors). Let's just assume we use classification accuracy as our metric to measure which is the best.


In [None]:
# create sample datasets for testing

SAMPLE_FRAC = 0.01
sample_session1_de_movingAve = session1_de_movingAve.sample(frac=SAMPLE_FRAC)
sample_session1_de_lds = session1_de_lds.sample(frac=SAMPLE_FRAC)
sample_session1_psd_movingAve = session1_psd_movingAve.sample(frac=SAMPLE_FRAC)
sample_session1_de_lds = session1_de_lds.sample(frac=SAMPLE_FRAC)

sample_datasets = [(sample_session1_de_movingAve, "de", "movingAve"), (sample_session1_de_lds, "de", "LDS"), 
                   (sample_session1_psd_movingAve, "psd", "movingAve"), (sample_session1_de_lds, "de", "LDS")]

In [None]:
svc_param_search_space = {'C': np.logspace(-3, 3, 7), 
                    'gamma': ['scale', 'auto'], 
                    'kernel': ['rbf']}

rf_param_search_space = {'max_features': ['auto', 'sqrt', 'log2'],
                         'criterion': ['gini', 'entropy']}

knn_param_search_space = {'n_neighbors': list(map(int, np.linspace(5, sqrt(len(session1_de_movingAve)), num=5))), 
                          'weights': ['uniform', 'distance']}

sample_knn_param_search_space = {'n_neighbors': list(map(int, np.linspace(5, sqrt(len(sample_session1_de_movingAve)), num=5))), 
                          'weights': ['uniform', 'distance']}
NUM_TRIALS = 10

MAX_ITER = 10000000
classifiers = [(SVC(max_iter=MAX_ITER), svc_param_search_space, "SVC"), 
               (RandomForestClassifier(), rf_param_search_space, "Random Forest"), 
               (KNeighborsClassifier(n_neighbors=5), knn_param_search_space, "K-Nearest Neighbors")]

sample_classifiers = [(SVC(max_iter=MAX_ITER), svc_param_search_space, "SVC"), 
               (RandomForestClassifier(), rf_param_search_space, "Random Forest"), 
               (KNeighborsClassifier(n_neighbors=5), sample_knn_param_search_space, "K-Nearest Neighbors")]

datasets = [(session1_de_movingAve, "de", "movingAve"), (session1_de_lds, "de", "LDS"), 
            (session1_psd_movingAve, "psd", "movingAve"), (session1_psd_lds, "psd", "LDS")]

ANALYSIS_RESULTS_DF_COL_NAMES = ['feature', 'smoothing method', 'classifier', 'accuracy']

In [None]:
%%time

# run with sample datasets to first make sure the code works
analysis_results_df = pd.DataFrame(columns=ANALYSIS_RESULTS_DF_COL_NAMES)
print(f"Running Analysis...\n")
print(f"FEATURE\tSMOOTHING METHOD\tCLASSIFIER")
for trial in range(NUM_TRIALS):
    for clf, clf_param_search_space, clf_name in sample_classifiers:
        for dataset, feature, smoothing_method in sample_datasets:
            print(f"{feature}\t{smoothing_method}\t\t{clf_name}")

            # train model
            X = dataset.drop('emotionLabel', axis=1); y = dataset.emotionLabel
            X_train, X_test, y_train, y_test = train_test_split(X, y)
            grid = GridSearchCV(clf, clf_param_search_space, cv=StratifiedKFold(n_splits=5), scoring="accuracy", 
                                refit=True).fit(X_train, y_train)
            
            # record results in dataframe
            test_set_accuracy_score = grid.score(X_test, y_test)
            row = [[feature, smoothing_method, clf_name, test_set_accuracy_score]]
            analysis_results_df = pd.concat([analysis_results_df, pd.DataFrame(row, columns=ANALYSIS_RESULTS_DF_COL_NAMES)])

analysis_results_df

In [None]:
%%time

# Now let's run the real analysis
analysis_results_df = pd.DataFrame(columns=ANALYSIS_RESULTS_DF_COL_NAMES)
print(f"Running Analysis...\n")
print(f"FEATURE\tSMOOTHING METHOD\tCLASSIFIER")
for trial in range(NUM_TRIALS):
    for clf, clf_param_search_space, clf_name in classifiers:
        for dataset, feature, smoothing_method in datasets:
            print(f"{feature}\t{smoothing_method}\t\t{clf_name}")

            # train model
            X = dataset.drop('emotionLabel', axis=1); y = dataset.emotionLabel
            X_train, X_test, y_train, y_test = train_test_split(X, y)
            grid = GridSearchCV(clf, clf_param_search_space, cv=StratifiedKFold(n_splits=5), scoring="accuracy", 
                                refit=True).fit(X_train, y_train)
            
            # record results in dataframe
            test_set_accuracy_score = grid.score(X_test, y_test)
            row = [[feature, smoothing_method, clf_name, test_set_accuracy_score]]
            analysis_results_df = pd.concat([analysis_results_df, pd.DataFrame(row, columns=ANALYSIS_RESULTS_DF_COL_NAMES)])

analysis_results_df = analysis_results_df.reset_index(drop=True)
analysis_results_df

In [None]:
# save the data
analysis_results_df.to_csv("seed_iv_analysis_results.csv")