In [25]:
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler , LabelEncoder
from lightgbm import LGBMClassifier, LGBMRegressor
from sklearn.metrics import precision_score, recall_score, confusion_matrix, f1_score, accuracy_score, balanced_accuracy_score
from sklearn.ensemble import RandomForestClassifier
from collections import namedtuple
from sklearn.svm import LinearSVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB
import pandas as pd
from sklearn.model_selection import cross_val_predict
import matplotlib.pyplot as plt
import numpy as np
from ipynb.fs.full.evaluation import *
from ipynb.fs.full.Data_Processing import *
from sklearn.model_selection import KFold
import time

### Baseline classification performance on EEG Dataset

Get the 5-fold cross-validation performance, i.e. the performance on the held-out test set of each fold:
- Load the data of each user for all tests
- Get performance per user for each label - attention, interest, effort
- Get performance across users (cross-user)

### 1) Load the data

In [26]:
def average(lst):
    """Return the arithmetic mean of the values in ``lst``.

    Raises ZeroDivisionError when ``lst`` is empty.
    """
    total = sum(lst)
    count = len(lst)
    return total / count

In [27]:
def choose_base_model(model_type, objective, model_name):
    """Build a fresh, unfitted baseline estimator.

    Parameters
    ----------
    model_type : str
        'clf' selects a classifier by ``model_name``; any other value
        returns the LightGBM regressor.
    objective : str
        Forwarded to ``LGBMClassifier`` only; the other models ignore it.
    model_name : str
        Key of the classifier to return: 'RF', 'NB', 'SVM', 'LGBM' or 'LDA'.
        Not used on the regression path.

    Returns
    -------
    The estimator instance.

    Raises
    ------
    KeyError
        If ``model_type`` is 'clf' and ``model_name`` is not a known key.
    """
    # Regression path: a single fixed baseline, independent of model_name.
    if model_type != 'clf':
        return LGBMRegressor(num_leaves=31, learning_rate=0.1, random_state=5)

    # Classification path: look the requested baseline up by its short name.
    classifiers = {
        'RF': RandomForestClassifier(random_state=0, n_jobs=-1),
        'NB': GaussianNB(),
        'SVM': LinearSVC(random_state=0, tol=1e-05),
        'LGBM': LGBMClassifier(objective=objective, num_leaves=31, learning_rate=0.1, random_state=5),
        'LDA': LinearDiscriminantAnalysis(),
    }
    return classifiers[model_name]
    

def choose_obj(num_classes):
    """Return the LightGBM objective name for a class count.

    Exactly two classes map to 'binary'; every other count maps to
    'multiclass'.
    """
    return 'binary' if num_classes == 2 else 'multiclass'

In [28]:
def kfold_predict(X,y, model_type, model_name):
    """
    Run 5-fold cross-validation over a collection of tests.

    The folds are drawn over the entries of ``X`` / ``y`` (one entry per
    test or per user, depending on the caller), not over individual samples:
    the entries selected for a fold are concatenated along axis 0 to form the
    train and test matrices.

    Parameters
    ----------
    X : indexable array, one entry of input rows per test
    y : indexable array, one entry of labels per test (same order as ``X``)
    model_type : 'clf' for classification; anything else is scored as regression
    model_name : short name of the baseline, passed to ``choose_base_model``

    Returns
    -------
    results : dict mapping each metric name to its mean over the 5 folds
    y_pred : predictions of all folds, concatenated in fold order
    y_true : true labels of all folds, concatenated in the same order
    num_classes : largest number of distinct training labels seen in any fold
    size : number of rows in train + test of the last fold
    """
    kf = KFold(n_splits=5, shuffle=True, random_state=1)

    # The metric names must match the keys returned by get_results().
    if model_type == 'clf':
        results = {"Accuracy":[], "Precision":[], "Recall":[], "F1 Score Macro":[],
              "F1 Score Micro":[],"Balanced Accuracy":[]}
    else:
        results = {'RMSE':[], 'R2':[]}

    total_predictions = []
    total_true = []
    num_classes = 0
    for train_index, test_index in kf.split(X):

        # Train/test split: merge the selected tests into flat sample arrays.
        X_train, X_test = np.concatenate(X[train_index]), np.concatenate(X[test_index])
        # NOTE(review): labels are cast to int for regression as well - confirm
        # the regression targets are integral.
        y_train, y_test = np.concatenate(y[train_index]).astype('int'), np.concatenate(y[test_index]).astype('int')
        total_true.append(y_test)
        size = len(X_train) + len(X_test)

        # Pick the objective from THIS fold's training labels. Previously the
        # running maximum over earlier folds was used, so a two-class fold that
        # followed a three-class fold was trained with 'multiclass'.
        fold_classes = len(set(y_train))
        num_classes = max(num_classes, fold_classes)  # still report the maximum
        objective = choose_obj(fold_classes)

        model = choose_base_model(model_type, objective, model_name).fit(X_train, y_train)
        y_pred = model.predict(X_test)
        total_predictions.append(y_pred)
        fold_scores = get_results(y_test, y_pred, model_type)  # dictionary of all metrics for this fold

        for key in fold_scores:  # collect the per-fold scores under each metric
            results[key].append(fold_scores[key])

    for key in results:  # average each metric over the folds
        results[key] = average(results[key])

    return results, np.concatenate(total_predictions), np.concatenate(total_true) , num_classes , size

In [29]:
def get_results_per_user(all_users, users, labels , bandpass, model_type):
    """
    Method for getting results per user by applying 5 fold cross validation.

    For every baseline model, user and label, the user's tests are passed to
    ``kfold_predict`` (folds are drawn over tests), the scores are collated
    into one record, and a diagnostic plot is saved.

    Parameters
    ----------
    all_users : mapping from user to the list of that user's tests; each test
        is indexed with ``.iloc[:, :8]`` for the inputs and ``[label]`` for
        the targets
    users : iterable of keys of ``all_users`` to evaluate
    labels : iterable of label column names
    bandpass : if truthy, the inputs are passed through ``filter_df`` first
    model_type : 'clf' or 'reg'

    Returns
    -------
    list with one ``collate_results`` record per (model, user, label)
    """
    def _object_array(arrays):
        # One slot per test. Built explicitly because tests may differ in
        # length, and np.array() on such a ragged list raises ValueError on
        # NumPy >= 1.24 instead of producing an object array.
        out = np.empty(len(arrays), dtype=object)
        for i, arr in enumerate(arrays):
            out[i] = arr
        return out

    results = []
    model_names = ['RF', 'NB', 'SVM', 'LGBM', 'LDA']
    n_epochs = None              # not applicable to the baselines; forwarded to collate_results
    window_size_samples = None   # not applicable to the baselines; forwarded to collate_results
    for model_name in model_names:
        print("Starting with Baseline Model: {0}...".format(model_name))
        for user in users:

            for label in labels:
                print("Running - Model: {0}, User: {1}, label: {2}".format(model_name, user,label))
                time_start = time.time()
                test_list = all_users[user] # list of all the tests
                if bandpass:
                    # filtered inputs (first 8 columns) of each test
                    X = _object_array([np.array(filter_df(test.iloc[:,:8])) for test in test_list])
                else:
                    # raw inputs (first 8 columns) of each test
                    X = _object_array([np.array(test.iloc[:,:8]) for test in test_list])
                # given label of each test
                y = _object_array([np.array(test[label]) for test in test_list])

                # get results
                r, y_pred, y_true, num_classes, size = kfold_predict(X,y, model_type, model_name)
                duration = time.time() - time_start
                results.append(collate_results(r, user, label, duration, num_classes, size, model_type, n_epochs, window_size_samples, model_name))

                if model_type == 'clf':
                    # plot confusion matrix
                    cm = confusion_matrix(y_true, y_pred)
                    saved_file = "results/baseline/clf/confusion matrices/k_fold_performance/{2}_User_{0}_{1}.png".format(user,label, model_name)
                    plot_confusion_matrix(cm, set(y_true), normalize=True , file = saved_file)
                elif model_type == 'reg':
                    # plot true vs predicted values
                    saved_file = "results/baseline/reg/y vs y_pred/per user/{2}_User_{0}_{1}.png".format(user,label, model_name)
                    plot_model(y_true, y_pred, user, label,file=saved_file)
    return results

In [30]:
def get_results_cross_users(labels, all_users_agg, bandpass, model_type):
    """
    Method for getting results cross-user using 5-Fold cross-validation

    For every baseline model and label, the aggregated data of all users is
    passed to ``kfold_predict`` with one entry per user, so the folds are
    drawn over users. The scores are collated into one record per
    (model, label) under the user name 'cross', and a diagnostic plot is saved.

    Parameters
    ----------
    labels : iterable of label column names
    all_users_agg : mapping from user to that user's aggregated data, indexed
        with ``.iloc[:, :8]`` for the inputs and ``[label]`` for the targets
    bandpass : if truthy, the inputs are passed through ``filter_df`` first
    model_type : 'clf' or 'reg'

    Returns
    -------
    list with one ``collate_results`` record per (model, label)
    """
    def _object_array(arrays):
        # One slot per user. Built explicitly because users may have different
        # numbers of rows, and np.array() on such a ragged list raises
        # ValueError on NumPy >= 1.24 instead of producing an object array.
        out = np.empty(len(arrays), dtype=object)
        for i, arr in enumerate(arrays):
            out[i] = arr
        return out

    results = []
    user = 'cross'
    model_names = ['RF', 'NB', 'SVM', 'LGBM', 'LDA']
    n_epochs = None              # not applicable to the baselines; forwarded to collate_results
    window_size_samples = None   # not applicable to the baselines; forwarded to collate_results
    for model_name in model_names:
        print("Starting with Baseline Model: {0}...".format(model_name))

        # get the results cross-user
        for label in labels: # put the inputs and labels in an array for K-fold
            print("Running - Model: {0}, User: {1}, label: {2}".format(model_name, user,label))
            time_start = time.time()
            if bandpass: # filter the channels using bandpass filtering
                # aggregated, filtered inputs (first 8 columns) per user
                X = _object_array([np.array(filter_df(all_users_agg[key].iloc[:, :8])) for key in all_users_agg])
            else:
                # aggregated raw inputs (first 8 columns) per user
                X = _object_array([np.array(all_users_agg[key].iloc[:, :8]) for key in all_users_agg])
            # aggregated labels per user
            y = _object_array([np.array(all_users_agg[key][label]) for key in all_users_agg])

            # get results
            r, y_pred, y_true, num_classes , size = kfold_predict(X,y, model_type, model_name)
            duration = time.time() - time_start
            results.append(collate_results(r, user, label, duration, num_classes, size, model_type, n_epochs, window_size_samples, model_name))

            if model_type == 'clf':
                # plot confusion matrix
                cm = confusion_matrix(y_true, y_pred)
                saved_file = "results/baseline/clf/confusion matrices/k_fold_performance/{2}_User_{0}_{1}.png".format(user,label, model_name)
                plot_confusion_matrix(cm, set(y_true), normalize=True , file = saved_file)
            elif model_type == 'reg':
                # plot true vs predicted values
                saved_file = "results/baseline/reg/y vs y_pred/cross user/{2}_User_{0}_{1}.png".format(user,label, model_name)
                plot_model(y_true, y_pred, user, label,file=saved_file)

    return results

In [31]:
def get_all_results(model_type, bandpass, eval_type,
                    per_user_path="/cs/home/ybk1/Dissertation/data/saved user and test data/all_tests_EEG_no_agg.pickle",
                    all_user_path="/cs/home/ybk1/Dissertation/data/saved user and test data/all_tests_EEG.pickle"):
    """
    Method for getting all results per user and across users by applying 5 fold cross validation.

    Runs the requested evaluation(s), writes the tabulated scores to a CSV
    under ``results/baseline/<model_type>/tabulated/k fold/`` and returns them.

    Parameters
    ----------
    model_type : 'clf' or 'reg'; forwarded to the evaluation functions and
        used in the output path
    bandpass : whether the inputs are band-pass filtered; also part of the
        output file name
    eval_type : 'per user', 'cross user' or 'both'
    per_user_path : file with the tests split out by user without aggregation
        (used for the per-user analysis)
    all_user_path : file with the tests aggregated by user (used for the
        cross-user analysis)

    Returns
    -------
    pandas.DataFrame with one row per result record (cross-user rows first
    when ``eval_type`` is 'both'); the same table that is written to the CSV.

    Raises
    ------
    ValueError
        If ``eval_type`` is not one of the three supported values.
    """
    import os  # local import: only needed to prepare the output directory

    # Fail fast on a typo instead of after loading the data.
    valid_eval_types = ('per user', 'cross user', 'both')
    if eval_type not in valid_eval_types:
        raise ValueError("eval_type must be one of {0}, got {1!r}".format(valid_eval_types, eval_type))
    run_cross_user = eval_type in ('cross user', 'both')
    run_per_user = eval_type in ('per user', 'both')

    # Make sure the CSV can be written before starting the long computation.
    file = "results/baseline/{0}/tabulated/k fold/baselines_5_fold_CV_performance_{1}_bandpass_{2}.csv".format(model_type, eval_type, bandpass)
    os.makedirs(os.path.dirname(file), exist_ok=True)

    # Load only the data that the requested evaluation needs.
    # NOTE(review): the inputs are .pickle files read via load_file - presumably
    # unpickled, so only point these paths at trusted files.
    all_users_agg = load_file(all_user_path) if run_cross_user else None
    all_users_no_agg = load_file(per_user_path) if run_per_user else None

    labels = ['attention','interest','effort']
    time_start = time.time()

    frames = []
    if run_cross_user:
        # get results cross user
        frames.append(pd.DataFrame(get_results_cross_users(labels, all_users_agg, bandpass, model_type)))
    if run_per_user:
        # get the results per user
        users = all_users_no_agg.keys()
        frames.append(pd.DataFrame(get_results_per_user(all_users_no_agg, users, labels, bandpass, model_type)))

    # Keep the DataFrame: to_csv() returns None when given a path, so the
    # previous `results = ....to_csv(file)` made this function return None.
    results = pd.concat(frames, ignore_index=True)
    results.to_csv(file, index=False)

    print("Time elapsed! {0}".format(time.time() - time_start))
    return results

In [32]:
# Run all classification baselines on the unfiltered signals, both per user and cross-user.
r = get_all_results(model_type='clf', bandpass=False, eval_type='both')

Starting with Baseline Model: RF...


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


KeyboardInterrupt: 