In [2]:
# Mount Google Drive at /content/drive inside the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import pandas as pd
import codecs, json
from tqdm import tqdm
import numpy as np
from datetime import datetime
import time
import math
import joblib

import sys
sys.path.append('drive/MyDrive/BmiResearch')
from constants import constants
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import tracemalloc
from sklearn.model_selection import train_test_split

In [None]:
# Class names listed in class-index order; both lookup tables are derived
# from this single tuple so they cannot drift apart.
_CLASS_NAMES = ('RightTO', 'RightHS', 'LeftTO', 'LeftHS')

# name -> integer class index
class_names_dict = {name: index for index, name in enumerate(_CLASS_NAMES)}
# integer class index -> name
class_names_dict_1 = dict(enumerate(_CLASS_NAMES))

def calculate_weighted_metrics(y_test, y_pred):
    """Return accuracy and the support-weighted precision, recall and F1.

    With ``average='weighted'`` each metric is computed per label and then
    averaged using the number of true instances of every label as weight.
    This accounts for label imbalance; as a consequence the resulting F-score
    is not guaranteed to lie between precision and recall.

    Parameters
    ----------
    y_test : ground-truth labels.
    y_pred : predicted labels, same length as ``y_test``.

    Returns
    -------
    tuple
        ``(accuracy, precision, recall, f1)``.
    """
    accuracy = accuracy_score(y_test, y_pred)
    weighted_scores = tuple(
        scorer(y_test, y_pred, average='weighted')
        for scorer in (precision_score, recall_score, f1_score)
    )
    return (accuracy, *weighted_scores)


def calculate_weighted_metrics_per_class(y_test, y_pred):
    """Return accuracy and the per-class precision, recall and F1 scores.

    Despite the function name, the per-class scores are requested with
    ``average=None``, i.e. one unweighted value per label. Each per-class
    value is rounded to three decimals; the accuracy is returned unrounded.

    Parameters
    ----------
    y_test : ground-truth labels.
    y_pred : predicted labels, same length as ``y_test``.

    Returns
    -------
    tuple
        ``(accuracy, precision_list, recall_list, f1_list)``.
    """
    def _rounded(per_class_scores):
        return [round(score, 3) for score in per_class_scores]

    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average=None)
    recall = recall_score(y_test, y_pred, average=None)
    f1 = f1_score(y_test, y_pred, average=None)
    return accuracy, _rounded(precision), _rounded(recall), _rounded(f1)

def apply_standard_scaling(data_chanks_list_train, one_scaler):
    """Scale every chunk with an already fitted scaler.

    Each chunk is transposed before being passed to ``one_scaler.transform``
    and transposed back afterwards, so the scaler sees the chunk's second axis
    as rows (presumably samples x channels -- confirm against how the scaler
    was fitted).

    Parameters
    ----------
    data_chanks_list_train : sequence of 2-D arrays; must be non-empty because
        the shape of the first chunk is printed.
    one_scaler : fitted object exposing ``transform``.

    Returns
    -------
    numpy.ndarray
        The scaled chunks stacked along a new first axis.
    """
    print('[apply_standard_scaling]')
    first_chunk_shape = data_chanks_list_train[0].shape
    print("data_chanks_list_train shape = ", first_chunk_shape)
    scaled_chunks = [
        one_scaler.transform(chunk.T).T
        for chunk in tqdm(data_chanks_list_train)
    ]
    return np.array(scaled_chunks)

def flat_aray(chanks):
    """Flatten every chunk to 1-D and stack the results into one array.

    ``ndarray.flatten`` always returns a fresh copy, so no additional
    ``.copy()`` is required for the result to be independent of the input.
    Chunks are passed through ``np.asarray`` first, which lets plain nested
    lists be used as chunks as well as arrays.

    Parameters
    ----------
    chanks : iterable of array-likes (e.g. a 3-D array or a list of 2-D arrays).

    Returns
    -------
    numpy.ndarray
        One row per chunk, each row being the flattened chunk.
    """
    return np.array([np.asarray(chunk).flatten() for chunk in chanks])

In [None]:
# Train and evaluate one LDA classifier per subject for every
# (signal processing, feature extraction) combination listed in `constants`.
# For each subject the fitted model is stored as a .pkl file and the settings,
# timings and metrics are written to `experiment_results.json`.

def _load_json_array(path):
  """Read a UTF-8 encoded JSON file and return its content as a NumPy array."""
  # `with` closes the handle deterministically instead of leaking it.
  with codecs.open(path, 'r', encoding='utf-8') as json_file:
    return np.array(json.load(json_file))


for sp in constants.SIGNAL_PROCESSING:
  for fe in constants.FEATURE_EXTRACTION:
    output_path = f'{constants.MODELS}/lda/{sp}_{fe}'
    print('output_path: ', output_path)

    # No processing at all -> base dataset; signal processing only -> `<sp>`;
    # every other combination -> `<sp>_<fe>`.
    if sp == 'NOSP' and fe == 'NOFE':
      dataset_path = constants.BASE_DATASET_PATH
    elif fe == 'NOFE':
      dataset_path = f'{constants.PREPROCESSED_DATASET_PATH}/{sp}'
    else:
      dataset_path = f'{constants.PREPROCESSED_DATASET_PATH}/{sp}_{fe}'

    print('dataset_path: ', dataset_path)

    # Only directories can hold a subject's files; stray files in the dataset
    # folder are skipped instead of failing when their JSON files are opened.
    subjects = sorted(entry for entry in os.listdir(dataset_path)
                      if os.path.isdir(os.path.join(dataset_path, entry)))

    for subject in subjects:
      print(subject)
      output_path_subject = f'{output_path}/{subject}'
      experiment_settings = {
          'general_params': {'low_filter': constants.low_filter,
                             'high_filter': constants.high_filter,
                             'frequency': constants.freq,
                             'minutes_for_test': constants.minutes_for_test,
                             'window_size': constants.window_size,
                             'overlap': constants.overlap,
                             'EEG_CHANNELS': constants.EEG_CHANNELS},
          'subject': subject,
          'signal_processing': sp,
          'feature_extraction': fe,
          'classification': 'LDA',
          'DateTime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
          'dataset_path': dataset_path,
          'OUTPUT_PATH': output_path_subject,
      }
      # exist_ok=True keeps the cell re-runnable (Restart & Run All).
      os.makedirs(output_path_subject, exist_ok=True)

      # Features come from the (possibly preprocessed) dataset, whereas the
      # labels are always read from the base dataset.
      features_dir = f'{dataset_path}/{subject}'
      labels_dir = f'{constants.BASE_DATASET_PATH}/{subject}'

      # fit: the scaler statistics are estimated on X_fit only
      fit_df = _load_json_array(f'{features_dir}/X_fit.json')
      one_scaler = StandardScaler()
      one_scaler.fit(fit_df)

      chanks_train = _load_json_array(f'{features_dir}/X_train_chunks.json')
      chanks_test = _load_json_array(f'{features_dir}/X_test_chunks.json')
      final_y_train_list = _load_json_array(f'{labels_dir}/y_train_chunks.json')
      final_y_test_list = _load_json_array(f'{labels_dir}/y_test_chunks.json')

      lda_train_set = apply_standard_scaling(chanks_train, one_scaler)

      # Only the scaling of the test chunks is timed (seconds per chunk).
      starttime = time.perf_counter()
      lda_test_set = apply_standard_scaling(chanks_test, one_scaler)
      duration_standard_scaling_s = time.perf_counter() - starttime
      scale_1ch_s = round(duration_standard_scaling_s / chanks_test.shape[0], 10)
      experiment_settings['scale_1ch_s'] = scale_1ch_s

      lda_train_set = flat_aray(lda_train_set)
      lda_test_set = flat_aray(lda_test_set)
      X_train1, X_val, y_train1, y_val = train_test_split(lda_train_set, final_y_train_list,
                                                          test_size=0.2,
                                                          random_state=42)

      # ---- training: hyper-parameter search + final fit (timed and memory-traced)
      starttime = time.perf_counter()
      tracemalloc.start()
      param_grid = {
          'solver': ['lsqr'],
          'shrinkage': ['auto', 0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6,
                        0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1],
      }
      grid_search = GridSearchCV(LinearDiscriminantAnalysis(), param_grid,
                                 scoring='accuracy', cv=5, verbose=3)
      # NOTE(review): the search runs on the 20% hold-out split (X_val) while
      # X_train1 / y_train1 stay unused -- confirm that tuning on the smaller
      # split is intended; behaviour is kept unchanged here.
      results = grid_search.fit(X_val, y_val)
      bp = results.best_params_
      experiment_settings['best_params'] = bp
      print('Mean Accuracy: %.3f' % results.best_score_)
      print('Config: %s' % results.best_params_)

      # The final model is refitted on the complete training set.
      model = LinearDiscriminantAnalysis(solver=bp['solver'], shrinkage=bp['shrinkage'])
      model.fit(lda_train_set, final_y_train_list)
      joblib.dump(model, f'{output_path_subject}/model_lda_{sp}_{fe}.pkl')
      current, peak_train_MB = tracemalloc.get_traced_memory()  # both values are in bytes
      duration_train_min = round((time.perf_counter() - starttime) / 60, 3)
      print(
          f"Final current memory usage, MB [{current / (1024 * 1024):0.2f}]~peak memory usage, MB [{peak_train_MB / (1024 * 1024):0.2f}]~time [{duration_train_min}] minutes, ")
      # stop() also discards the collected traces, so the next start() begins
      # from a clean state.
      tracemalloc.stop()

      # ---- prediction (timed and memory-traced)
      # NOTE(review): the measured interval also contains the metric
      # computations below, not only model.predict; left as is so the recorded
      # numbers stay comparable with earlier runs.
      starttime = time.perf_counter()
      tracemalloc.start()

      predictions = model.predict(lda_test_set)
      labels = unique_labels(final_y_test_list, predictions)
      labels = [class_names_dict_1[el] for el in labels]
      ACC_w, PPV_w, TPR_w, F1_w = calculate_weighted_metrics(final_y_test_list, predictions)
      ACC, PPV, TPR, F1 = calculate_weighted_metrics_per_class(final_y_test_list, predictions)

      current, peak_predict = tracemalloc.get_traced_memory()
      sec_predict = round((time.perf_counter() - starttime), 3)
      tracemalloc.stop()

      proc_1ch_s = round(sec_predict / chanks_test.shape[0], 5)
      experiment_settings['pred_1_ch_s'] = proc_1ch_s
      experiment_settings['len_test'] = chanks_test.shape[0]

      # NOTE(review): if the labels are NumPy integers they are not
      # JSON-native and get written through `default=str` below, i.e. as
      # strings -- verify against whatever reads the results file.
      experiment_settings['y_test'] = list(final_y_test_list)
      experiment_settings['prediction'] = list(predictions)
      experiment_settings['labels'] = labels
      # Row-normalised confusion matrix (each true-label row sums to 1).
      cm = confusion_matrix(final_y_test_list, predictions, normalize='true')
      experiment_settings['confusion_matrix'] = cm.tolist()
      print(cm)
      print("ACC_w, PPV_w, TPR_w, F1_w = ", ACC_w, PPV_w, TPR_w, F1_w)
      print("ACC, PPV, TPR, F1 = ", ACC, PPV, TPR, F1)

      experiment_settings['peak_predict_MB'] = round(peak_predict / (1024 * 1024), 2)
      experiment_settings['sec_predict'] = sec_predict
      experiment_settings['accuracy_score'] = round(ACC_w, 3)
      experiment_settings['precision_score'] = round(PPV_w, 3)
      experiment_settings['recall_score'] = round(TPR_w, 3)
      experiment_settings['f1_score'] = round(F1_w, 3)
      print("round(F1_w, 3) = ", round(F1_w, 3))

      # Per-class scores are stored as strings, keyed by the class name.
      for class_name, ppv, tpr, f1 in zip(labels, PPV, TPR, F1):
        experiment_settings[f'{class_name}_precision_score'] = f'{round(ppv, 3)}'
        experiment_settings[f'{class_name}_recall_score'] = f'{round(tpr, 3)}'
        experiment_settings[f'{class_name}_f1_score'] = f'{round(f1, 3)}'

      # `with` guarantees the report is flushed and closed before the next subject.
      with codecs.open(f'{output_path_subject}/experiment_results.json', 'w', encoding='utf-8') as results_file:
        json.dump(experiment_settings, results_file,
                  separators=(',', ':'),
                  sort_keys=True,
                  indent=4, default=str)

