In [3]:
import scipy.io as sio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.decomposition import PCA
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
# Apply seaborn's default theme to all subsequent matplotlib figures.
sns.set()

### Read in Data: ###

In [4]:
# Load the subject-1 data cube and the channel-label file from MATLAB .mat files.
# loadmat returns a dict keyed by the variable names stored in each file.
data = sio.loadmat('../Data/data_cube_subject1.mat')
channel_labels = sio.loadmat('../Data/channel_label.mat')

In [5]:
# Recording cube; Format/class_split treat it as (channels, samples, trials).
data_matrix = data['data_cube']
# Flatten the labels to 1-D without hard-coding the trial count, so recordings
# with a different number of trials load through the same cell.
data_labels = data['event_label'].reshape(-1)
# Every trial needs exactly one label; fail here rather than deep inside model fitting.
assert data_labels.shape[0] == data_matrix.shape[2], (
    "number of labels ({}) does not match number of trials ({})".format(
        data_labels.shape[0], data_matrix.shape[2]))

### Dimensionality Reduction And Data Formatting Methods: ###

In [6]:
def Format(X:np.ndarray,
           y:np.ndarray,
           trials_per_condition:int=100,
           reduction_method:str= 'Ave_over_samples'):
    """
    Functionality: Prepares data for usage in a ML algorithm in three steps:
        1. X and y are split into experiment conditions 'A' and 'B' by class_split; the split
           position is trials_per_condition.
        2. The reduction registered under reduction_method is applied to each condition's block
           of X, removing one dimension.
        3. Each reduced block is transposed so that its first index is the trial number.
    Param:
        X:: ndarray of unlabeled EEG data with dimensions (channels, samples, trials).
        y:: ndarray of data labels for X with dimensions (trials).
        trials_per_condition:: number of trials done for each condition. Method assumes equal
            number of trials per condition.
        reduction_method:: key into the reduction_methods dictionary. The selected callable maps a
            (channels x samples x trials) ndarray to an ndarray with one fewer dimension.
    Returns:
        X_by_classes_reduced:: Dictionary with keys A,B whose values are the reduced, transposed
            blocks of X for condition A or B. First index is the trial number; the second index
            depends on the reduction method.
        y_by_classes:: Dictionary with keys A,B and values of y separated by condition type A or B.
    """
    split_X, split_y = class_split(X, y, trials_per_condition)
    reducer = reduction_methods[reduction_method]
    reduced_X = {}
    for condition, block in split_X.items():
        # reducer output is (dim x trials); transpose to (trials x dim)
        reduced_X[condition] = reducer(block).T
    return reduced_X, split_y

def class_split(X:np.ndarray, y:np.ndarray, trials_per_condition:int):
    """
    Helper for Format: cuts X and y into the two condition types.

    The first trials_per_condition trials (third axis of X, first axis of y) are
    assigned to condition 'A'; every later trial is assigned to condition 'B'.
    Returns a pair of dictionaries (X_dict, y_dict), each keyed by 'A' and 'B'.
    """
    first = slice(None, trials_per_condition)
    rest = slice(trials_per_condition, None)
    X_dict = {'A': X[:, :, first], 'B': X[:, :, rest]}
    y_dict = {'A': y[first], 'B': y[rest]}
    return X_dict, y_dict

### Reduction Methods ###

In [7]:
#Registry of reduction methods, looked up by name
#Contract for every entry:
#   input:  np.ndarray of shape (channels x samples x trials)
#   output: np.ndarray of shape (dim x trials), where dim depends on the reduction method
def _ave_over_channels(X):
    """Mean across axis 0 (channels)."""
    return np.mean(X, axis=0)

def _ave_over_samples(X):
    """Mean across axis 1 (samples)."""
    return np.mean(X, axis=1)

reduction_methods = {
    'Ave_over_channels': _ave_over_channels,
    'Ave_over_samples': _ave_over_samples,
}

### Data Partitioning ###

In [8]:

def train_test_partition(X_dict, y_dict, test_size=0.3, random_state=None):
    """
    Split every entry of X_dict and y_dict into train and test parts using one
    shared set of trial indices.

    Trial positions 0..n-1, where n is the first-axis length of X_dict['A'], are
    divided by sklearn's train_test_split. The resulting index lists are then used
    to index the first axis of every array in both dictionaries.
    Param:
        X_dict:: dictionary of arrays whose first index is the trial number.
        y_dict:: dictionary of label arrays indexed by trial number.
        test_size:: forwarded to train_test_split.
        random_state:: forwarded to train_test_split; fix it to reproduce a split.
    Returns:
        (X_dict_train, X_dict_test, y_dict_train, y_dict_test, train_idx, test_idx)
    """
    n_trials = X_dict['A'].shape[0]
    train_idx, test_idx = train_test_split(list(range(n_trials)),
                                           test_size=test_size,
                                           random_state=random_state)

    def take(source, idx):
        # select the same trial positions from every array in the dictionary
        return {key: source[key][idx] for key in source}

    X_dict_train = take(X_dict, train_idx)
    X_dict_test = take(X_dict, test_idx)
    y_dict_train = take(y_dict, train_idx)
    y_dict_test = take(y_dict, test_idx)
    return X_dict_train, X_dict_test, y_dict_train, y_dict_test, train_idx, test_idx

### Hyperparameter Tuning and Training ###

#### Hyperparameter Selection and Model Training Constants: ####

In [9]:
#To ensure the same splits across the different reduction methods, we introduce a random state
# variable here to feed train_test_partition:
RS = 1

#Hyperparameter dictionaries for use in grid search
ada_params = {
    'n_estimators': [50, 100, 200, 500],
    'learning_rate': [x/20 for x in range(1,21)],  # 0.05, 0.10, ..., 1.00
    # NOTE(review): 'SAMME.R' is deprecated in recent scikit-learn releases - confirm
    # against the installed version before re-running the search.
    'algorithm': ['SAMME', 'SAMME.R'],
    'random_state': [RS]
}

rf_params = {
    'n_estimators': [50, 100, 200, 500],
    'criterion': ['gini', 'entropy'],
    'max_depth': (None, 10, 50, 100, 500),
    'min_samples_split': [2, 4, 5],
    'min_samples_leaf': [1, 2, 4],
    # 'auto' is omitted: for RandomForestClassifier it is an alias of 'sqrt', so it only
    # duplicated a quarter of the grid (and newer scikit-learn releases reject it).
    'max_features': ('sqrt', 'log2', None),
    'random_state': [RS]
}
# min_impurity_split was deprecated and later removed from scikit-learn; search over it
# only when the installed RandomForestClassifier still exposes the parameter, so the
# grid stays valid on both old and new releases.
if 'min_impurity_split' in RandomForestClassifier().get_params():
    rf_params['min_impurity_split'] = (None, 0.5)

#Dictionary of classifiers and hyperparameter selections
clfs = {
    'RandomForestClassifier': RandomForestClassifier(),
    'AdaBoostClassifier': AdaBoostClassifier()
}

params = {
    'RandomForestClassifier': rf_params,
    'AdaBoostClassifier': ada_params
}

#### Hyperparameter Selection and Model Training Methods: ####

In [10]:

def tune_multiple_models(data_matrix:np.ndarray,
                         data_labels:np.ndarray,
                         trials_per_condition:int=100,
                         clf_list:list=None,
                         reduction_method_list:list=None):
    """
    Tune one model per (classifier, reduction method) combination.

    Param:
        data_matrix:: ndarray of EEG data, passed unchanged to tune_model.
        data_labels:: ndarray of labels, passed unchanged to tune_model.
        trials_per_condition:: number of trials per condition, passed to tune_model.
        clf_list:: keys of the clfs dictionary to tune. None means every key in clfs.
        reduction_method_list:: keys of the reduction_methods dictionary to apply.
            None means every key in reduction_methods.
    Returns:
        List of the dictionaries returned by tune_model, ordered classifier-major
        (all reduction methods for the first classifier, then the next classifier, ...).
    """
    # Identity test rather than `== None`: equality is elementwise (and therefore
    # ambiguous inside `if`) when an array-like is passed.
    if clf_list is None:
        clf_list = list(clfs)
    if reduction_method_list is None:
        reduction_method_list = list(reduction_methods)
    return [tune_model(data_matrix,
                       data_labels,
                       trials_per_condition,
                       clf_ident,
                       reduction_method_ident)
            for clf_ident in clf_list
            for reduction_method_ident in reduction_method_list]

def tune_model(data_matrix:np.ndarray,
               data_labels:np.ndarray,
               trials_per_condition:int,
               clf_ident:str,
               reduction_method:str,
               n_jobs:int=7):
    """
    Grid-search the hyperparameters of one classifier on one reduced view of the data.

    Param:
        data_matrix:: ndarray of EEG data, forwarded to Format.
        data_labels:: ndarray of labels, forwarded to Format.
        trials_per_condition:: number of trials per condition, forwarded to Format.
        clf_ident:: key selecting the estimator from clfs and its grid from params.
        reduction_method:: key of the reduction method, forwarded to Format.
        n_jobs:: number of parallel jobs handed to GridSearchCV (default 7, the
            previously hard-coded value).
    Returns:
        Dictionary with keys
            'clf_type':: clf_ident as given
            'reduction_method':: reduction_method as given
            'train_idx', 'test_idx':: trial indices produced by train_test_partition
            'clf':: the GridSearchCV object after fitting on the training trials
        The held-out test trials are not used here; their indices are returned for later evaluation.
    """
    #obtain classifier, hyperparameters from corresponding dictionaries
    clf = clfs[clf_ident]
    hyperparam_dict = params[clf_ident]

    #apply dimensionality reduction to data and split into different experiment classes
    X_dict, y_dict = Format(data_matrix,
                            data_labels,
                            trials_per_condition,
                            reduction_method)
    #apply train/test split; RS keeps the split identical across reduction methods
    X_dict_train, X_dict_test, y_dict_train, y_dict_test, train_idx, test_idx = \
    train_test_partition(X_dict, y_dict, random_state=RS)

    #tune model
    #For model training, train on both A and B
    #TODO: When we have more data, it might be better to train separate models for A and B classes
        #At the moment there is not enough data to train each class independently
    X = np.vstack([X_dict_train['A'], X_dict_train['B']])
    y = np.hstack([y_dict_train['A'], y_dict_train['B']])
    clf_mod = GridSearchCV(clf, hyperparam_dict, n_jobs=n_jobs)
    clf_mod.fit(X,y)
    return {
        'clf_type': clf_ident,
        'reduction_method': reduction_method,
        'train_idx': train_idx,
        'test_idx': test_idx,
        'clf': clf_mod
    }

In [9]:
# Grid-search every classifier in clfs against every entry of reduction_methods
# (the defaults when no lists are passed); each list element holds a fitted GridSearchCV.
trained_clfs = tune_multiple_models(data_matrix, data_labels)

In [21]:
# Persist the tuned models. The context manager closes the file handle as soon as the
# dump finishes instead of leaving it open until garbage collection.
with open('clfs.p', 'wb') as clf_file:
    pickle.dump(trained_clfs, clf_file)

In [11]:
# Rebuild both reduced views of the data (100 trials per condition).
X_dict_ave_over_channels, y_dict_ave_over_channels = Format(data_matrix,
                            data_labels,
                            100,
                            'Ave_over_channels')

X_dict_ave_over_samples, y_dict_ave_over_samples = Format(data_matrix,
                            data_labels,
                            100,
                            'Ave_over_samples')
# train_test_partition returns (X_train, X_test, y_train, y_test, train_idx, test_idx) and
# needs both dictionaries. RS reproduces the split that tune_model trained on.
X_tr, x_ts, y_tr, y_ts, _, _ = train_test_partition(X_dict_ave_over_channels,
                                                    y_dict_ave_over_channels,
                                                    random_state=RS)