# Library

## Google Drive

In [None]:
# Mount Google Drive at /content/drive; the dataset and result paths used
# later in this notebook are located under that mount point.
from google.colab import drive
drive.mount('/content/drive')

## Data

In [None]:
import numpy as np
import pandas as pd
import json
from sklearn.preprocessing import MinMaxScaler

## Model

In [None]:
import pickle
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC as SVM
from sklearn.neighbors import KNeighborsClassifier

In [None]:
!pip install neupy
from neupy.algorithms import PNN

## K * L Fold CV

In [None]:
from sklearn.base import clone as clone_model
from sklearn.model_selection import GridSearchCV, cross_validate, StratifiedKFold
from sklearn.metrics import confusion_matrix

# Dataset

## Data Collection

In [None]:
# Load one CSV per duration listed in time_durations and discard its
# 'File' column before collecting it in `datasets`.
time_durations = [2, 3, 4, 5]
path = "/content/drive/My Drive/Signal/"
folder = "Dataset/"
datasets = []
for time_duration in time_durations:
  dataset = pd.read_csv(f"{path}{folder}Dataset_{time_duration}.csv").drop(columns='File')
  datasets.append(dataset)

## Feature Scaling

In [None]:
def feature_scaling(dataset):
  """Min-max scale every column of a DataFrame except the last one.

  Args:
    dataset: pandas DataFrame. Its last column is left untouched (the
      evaluation code in this notebook uses the last column as the label).

  Returns:
    A new DataFrame with the same columns in which all but the last column
    were transformed by a freshly fitted MinMaxScaler. The DataFrame passed
    in is not modified.
  """
  # NOTE(review): the scaler is fitted on the whole dataset, so folds created
  # later share scaling statistics -- confirm this is intended.
  scaled = dataset.copy()  # never write into the caller's DataFrame
  feature_columns = scaled.columns[:-1]
  scaled[feature_columns] = MinMaxScaler().fit_transform(scaled[feature_columns])
  return scaled

In [None]:
# Scale a copy of every loaded dataset; the frames in `datasets` are passed
# as copies, so the originals stay available unscaled.
scaled_datasets = [feature_scaling(dataset.copy()) for dataset in datasets]

# Model

## Save Result

### CSV

In [None]:
def save_csv(df, folder, filename, base_dir="/content/drive/My Drive/Signal/Single Model/"):
  """Write a DataFrame as CSV (without its index) to <base_dir>/<folder>/<filename>.

  Args:
    df: object exposing to_csv(path, index=False), e.g. a pandas DataFrame.
    folder: sub-directory of base_dir to write into.
    filename: name of the CSV file.
    base_dir: root output directory. Defaults to the location this notebook
      has always written to, so existing calls behave as before.
  """
  import os  # local import: the notebook has no top-level `import os`

  target_dir = os.path.join(base_dir, folder)
  # Create the folder on first use instead of failing on a missing directory.
  os.makedirs(target_dir, exist_ok=True)
  df.to_csv(os.path.join(target_dir, filename), index=False)
  print("{} saved!".format(filename))

### Model

In [None]:
def save_model(model, folder, filename, base_dir="/content/drive/My Drive/Signal/Single Model/"):
  """Pickle an object to <base_dir>/<folder>/<filename>.

  Args:
    model: any picklable object (here: a trained estimator).
    folder: sub-directory of base_dir to write into.
    filename: name of the pickle file.
    base_dir: root output directory. Defaults to the location this notebook
      has always written to, so existing calls behave as before.
  """
  import os  # local import: the notebook has no top-level `import os`

  target_dir = os.path.join(base_dir, folder)
  # Create the folder on first use instead of failing on a missing directory.
  os.makedirs(target_dir, exist_ok=True)
  # NOTE: pickle files must only ever be loaded back from trusted sources.
  with open(os.path.join(target_dir, filename), 'wb') as f:
    pickle.dump(model, f)
  print("Model {} saved!".format(filename))

## Metrics

In [None]:
def get_metrics(true_n, false_p, false_n, true_p):
  """Compute accuracy, precision, recall and F1 from confusion-matrix counts.

  The argument order (tn, fp, fn, tp) matches how this notebook unpacks
  confusion_matrix(...).ravel().

  Args:
    true_n: number of true negatives.
    false_p: number of false positives.
    false_n: number of false negatives.
    true_p: number of true positives.

  Returns:
    Dict with keys 'accuracy', 'precision', 'recall' and 'f1_score'. A ratio
    whose denominator is zero (e.g. precision when nothing was predicted
    positive) is reported as 0.0 instead of raising or producing NaN.
  """
  def _ratio(numerator, denominator):
    # Guard against empty denominators so one degenerate fold cannot crash
    # the evaluation or poison the aggregated mean/std with NaN.
    return numerator / denominator if denominator else 0.0

  acc = _ratio(true_p + true_n, true_p + true_n + false_p + false_n)
  prec = _ratio(true_p, true_p + false_p)
  rec = _ratio(true_p, true_p + false_n)
  f1 = _ratio(2 * (prec * rec), prec + rec)

  return {
      'accuracy': acc,
      'precision': prec,
      'recall': rec,
      'f1_score': f1,
  }

## Cross Validation

### Inner Cross Validation
Returns the mean validation score (accuracy) of every parameter combination, plus the best parameter set.

In [None]:
def inner_cross_validation(n_splits, model, params, X_train, y_train, X_test, y_test):
  """Grid-search `params` for `model` on the training data of one outer fold.

  Args:
    n_splits: number of stratified folds used by the grid search.
    model: estimator handed to GridSearchCV.
    params: parameter grid handed to GridSearchCV.
    X_train, y_train: data the search is fitted on.
    X_test, y_test: accepted for signature compatibility; not used here.

  Returns:
    (score_table, best_parameter) where score_table is a DataFrame with the
    columns 'params' (each parameter dict rendered as a string) and
    'mean_test_score', and best_parameter is the search's best_params_.
  """
  # Unshuffled stratified folds.
  folds = StratifiedKFold(n_splits=n_splits, shuffle=False)

  # Accuracy-scored exhaustive search; refit=False, so no final estimator
  # is retrained by the search itself.
  grid_search = GridSearchCV(
      estimator=model,
      param_grid=params,
      scoring='accuracy',
      cv=folds,
      n_jobs=-1,
      refit=False,
  )
  fitted_search = grid_search.fit(X_train, y_train)

  # Keep only the parameter combination and its mean validation score.
  score_table = pd.DataFrame(fitted_search.cv_results_)
  score_table['params'] = score_table['params'].map(str)  # dict -> str
  score_table = score_table[['params', 'mean_test_score']]

  return score_table, fitted_search.best_params_

### K * L Fold CV


In [None]:
def k_l_fold_cv(X, y, k_splits=3, l_splits=5, model=None, params=None):
  """Run nested (K outer x L inner) stratified cross-validation.

  For every outer fold the parameter grid is searched on the outer-training
  part (inner CV), a clone of `model` with the best parameters is fitted on
  that training part and evaluated on the outer-test part.

  Args:
    X: pandas DataFrame of features.
    y: pandas Series of labels aligned with X.
    k_splits: number of outer folds.
    l_splits: number of inner folds.
    model: estimator to tune; must support clone, set_params, fit and
      predict_proba.
    params: parameter grid for the inner grid search.

  Returns:
    None (after printing a message) when `model` or `params` is missing.
    Otherwise a tuple (outer_confusion_matrix, joined_inner_cv_df,
    trained_best_models): a dict with per-fold 'tn'/'fp'/'fn'/'tp' lists, a
    DataFrame with one 'mean_val_score_<fold>' column per outer fold keyed
    by 'params', and the list of fitted best models (one per outer fold).
  """
  if model is None:
    print("Model is undefined.")
    return
  if params is None:
    print("Parameters is undefined.")
    return

  # Unshuffled stratified outer folds.
  cv_outer = StratifiedKFold(n_splits=k_splits, shuffle=False)
  joined_inner_cv_df = pd.DataFrame()

  # Outer CV results, one entry per fold.
  outer_confusion_matrix = {'tn': [], 'fp': [], 'fn': [], 'tp': []}
  trained_best_models = []

  for fold_number, (train_ix, test_ix) in enumerate(cv_outer.split(X, y), start=1):
    # split() yields positional row numbers, so select by position. Matching
    # them against index labels picks wrong rows on any non-default index.
    X_train, y_train = X.iloc[train_ix], y.iloc[train_ix]
    X_test, y_test = X.iloc[test_ix], y.iloc[test_ix]

    # Inner CV: score every parameter combination on the training part.
    inner_cv_df, best_parameter = inner_cross_validation(l_splits, model, params, X_train, y_train, X_test, y_test)

    # Collect the inner scores side by side, one column per outer fold.
    inner_cv_df.columns = ['params', f'mean_val_score_{fold_number}']
    if joined_inner_cv_df.empty:
      joined_inner_cv_df = inner_cv_df
    else:
      joined_inner_cv_df = joined_inner_cv_df.join(inner_cv_df.set_index('params'), on='params')

    print(f"Inner CV Fold {fold_number} done!")

    # Fit a fresh copy of the model with the best parameters of this fold.
    best_model = clone_model(model, safe=True)
    best_model.set_params(**best_parameter)
    best_model.fit(X_train, y_train)
    trained_best_models.append(best_model)

    # Sanity-check the first probability column of the test predictions.
    prediction_prob = best_model.predict_proba(X_test)
    for p1, _ in prediction_prob:
      if p1 < 0 or p1 > 1:
        print("ERROR PREDICTION")

    # NOTE(review): the argmax column index is used directly as the predicted
    # label, which presumes the labels are 0 and 1 -- confirm.
    y_pred = np.argmax(prediction_prob, axis=1)

    # Confusion matrix of this outer fold.
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
    outer_confusion_matrix['tn'].append(tn)
    outer_confusion_matrix['fp'].append(fp)
    outer_confusion_matrix['fn'].append(fn)
    outer_confusion_matrix['tp'].append(tp)

  return outer_confusion_matrix, joined_inner_cv_df, trained_best_models

## Single Model Evaluation
Iterates over the dataset of each duration.

### Execution

In [None]:
def single_model_evaluation(folder_name, model_name, model, params, features, datasets, durations=None):
  """Evaluate one model on every dataset with nested CV and save all results.

  For each dataset this runs k_l_fold_cv (3 outer x 5 inner folds), saves the
  inner-CV score table and the fitted model of every outer fold, and records
  the per-fold confusion matrix and metrics together with their mean and
  standard deviation. The summary table is saved as
  "<model_name>_final_result.csv" and returned.

  Args:
    folder_name: output sub-folder passed to save_csv / save_model.
    model_name: prefix used in all output file names.
    model: estimator to tune and evaluate.
    params: parameter grid for the inner grid search.
    features: column names selected from each dataset as model input.
    datasets: iterable of DataFrames whose last column is the label.
    durations: optional sequence with the duration label of each dataset.
      When omitted, the i-th dataset is labelled i + 2, as before.

  Returns:
    DataFrame with one row per dataset.

  Raises:
    ValueError: if `durations` is given but its length differs from
      `datasets`.
  """
  if durations is not None and len(durations) != len(datasets):
    raise ValueError("durations must contain exactly one entry per dataset.")

  # Configure CV
  k_splits = 3
  l_splits = 5

  confusion_matrix_cells = ['tn', 'fp', 'fn', 'tp']
  metric_names = ['accuracy', 'precision', 'recall', 'f1_score']

  # Result table (Outer CV): one list per output column, filled row by row.
  single_model_evaluation_dict = {'duration': []}
  for confusion_matrix_cell in confusion_matrix_cells:
    for fold_id in range(k_splits):
      single_model_evaluation_dict[f"{confusion_matrix_cell}_{fold_id+1}"] = []
  for metric_name in metric_names:
    for fold_id in range(k_splits):
      single_model_evaluation_dict[f"{metric_name}_{fold_id+1}"] = []
    single_model_evaluation_dict[f"mean_{metric_name}"] = []
    single_model_evaluation_dict[f"std_{metric_name}"] = []

  for position, dataset in enumerate(datasets):
    time_duration = position + 2 if durations is None else durations[position]
    print("{} minute duration...".format(time_duration))

    # Feature Selection: last column is the label, `features` the inputs.
    columns = dataset.columns
    X = dataset[columns[:-1]]
    y = dataset[columns[-1]]
    X = X[features]

    # Train Test
    outer_confusion_matrix, joined_inner_cv_df, trained_best_models = k_l_fold_cv(X=X, y=y, k_splits=k_splits, l_splits=l_splits, model=model, params=params)

    # Save Inner Cross Validation Result (Parameter Combinations)
    cv_result_filename = f"{model_name}_duration_{time_duration}_inner_cv.csv"
    save_csv(df=joined_inner_cv_df, folder=folder_name, filename=cv_result_filename)

    # Save Trained Models. The loop variable must not be called `model`:
    # that would overwrite the estimator passed in and make every later
    # duration start from an already fitted model.
    for fold_id, trained_model in enumerate(trained_best_models):
      model_filename = f"{model_name}_duration_{time_duration}_fold_{fold_id+1}.pickle"
      save_model(model=trained_model, folder=folder_name, filename=model_filename)

    # Outer CV Confusion Matrix
    for confusion_matrix_cell in confusion_matrix_cells:
      for fold_id, cell_count in enumerate(outer_confusion_matrix[confusion_matrix_cell]):
        single_model_evaluation_dict[f"{confusion_matrix_cell}_{fold_id+1}"].append(cell_count)

    # Metrics Per Fold
    metric_outer_cv_results = {metric_name: [] for metric_name in metric_names}
    for fold_id in range(k_splits):
      metric_results = get_metrics(
          outer_confusion_matrix['tn'][fold_id],
          outer_confusion_matrix['fp'][fold_id],
          outer_confusion_matrix['fn'][fold_id],
          outer_confusion_matrix['tp'][fold_id],
      )
      for metric_name in metric_names:
        metric_result = metric_results[metric_name]
        single_model_evaluation_dict[f"{metric_name}_{fold_id+1}"].append(metric_result)
        metric_outer_cv_results[metric_name].append(metric_result)

    # Aggregate Metrics over the outer folds
    for metric_name in metric_names:
      fold_values = metric_outer_cv_results[metric_name]
      single_model_evaluation_dict[f'mean_{metric_name}'].append(np.mean(fold_values))
      single_model_evaluation_dict[f'std_{metric_name}'].append(np.std(fold_values))

    # Add Duration
    single_model_evaluation_dict['duration'].append(time_duration)

  # Save Single Model Evaluation DataFrame (Outer Cross Validation)
  single_model_df = pd.DataFrame(single_model_evaluation_dict)
  save_csv(df=single_model_df, folder=folder_name, filename=f"{model_name}_final_result.csv")

  return single_model_df

## Joo (2010)

In [None]:
# MLP with two hidden layers (25 and 30 units), evaluated on 11 features.
model_1 = MLPClassifier(
    hidden_layer_sizes=(25, 30),
    shuffle=False,
    verbose=False,
    random_state=6,
)
model_1_features = [
    'MeanNN', 'SDNN', 'RMSSD', 'pNN50',
    'VLF', 'LF', 'HF', 'LF/HF',
    'SD1', 'SD2', 'SD1/SD2',
]

# Grid searched by the inner cross-validation.
model_1_params = {
    'solver': ['adam', 'sgd'],
    'learning_rate_init': [0.001, 0.01],
    'max_iter': list(range(250, 2501, 250)),  # 250, 500, ..., 2500
    'activation': ['logistic', 'tanh', 'relu'],
}

cv_result = single_model_evaluation(
    folder_name="Joo",
    model_name="Neural Network",
    model=model_1,
    params=model_1_params,
    features=model_1_features,
    datasets=scaled_datasets,
)
cv_result

## Lee (2016)

In [None]:
# MLP with a single hidden layer of 5 units, evaluated on 11 features.
# `(5)` is just the integer 5 -- a one-element tuple needs the trailing
# comma, which states the intended "one hidden layer" explicitly.
model_6 = MLPClassifier(hidden_layer_sizes=(5,), shuffle=False, verbose=False, random_state=2)
model_6_features = ['MeanNN','SDNN','RMSSD','pNN50','VLF','LF','HF','LF/HF','SD1','SD2','SD1/SD2']

# Grid searched by the inner cross-validation.
model_6_params = dict()
model_6_params['solver'] = ['adam', 'sgd']
model_6_params['learning_rate_init'] = [0.001, 0.01]
model_6_params['max_iter'] = [250, 500, 750, 1000, 1250, 1500, 1750, 2000, 2250, 2500]
model_6_params['activation'] = ['logistic','tanh','relu']

cv_result_6 = single_model_evaluation("Lee", "Neural Network", model_6, model_6_params, model_6_features, scaled_datasets)
cv_result_6

## Murukesan (2014)
4. SVM
5. PNN

### SVM

In [None]:
# RBF-kernel SVM with probability estimates enabled (the evaluation calls
# predict_proba), evaluated on 7 features.
model_4 = SVM(
    kernel='rbf',
    gamma=1.0,
    probability=True,
    random_state=8,
)
model_4_features = [
    'Outlier', 'sdHR', 'aTotal', 'pVLF', 'pLF', 'SD1', 'Alpha',
]

# Grid searched by the inner cross-validation: only C is tuned.
model_4_params = {
    'C': [0.001, 0.01, 0.1, 1, 10, 100, 1000],
}

cv_result_4 = single_model_evaluation(
    folder_name="Murukesan 1",
    model_name="SVM",
    model=model_4,
    params=model_4_params,
    features=model_4_features,
    datasets=scaled_datasets,
)
cv_result_4

### PNN

In [None]:
# PNN with std=0.4, evaluated on 7 features.
model_5 = PNN(
    std=0.4,
    verbose=False,
)
model_5_features = [
    'Outlier', 'sdHR', 'aTotal', 'pVLF', 'pLF', 'SD1', 'Alpha',
]

# The grid holds a single value, so the inner search only scores std=0.4.
model_5_params = {
    'std': [0.4],
}

cv_result_5 = single_model_evaluation(
    folder_name="Murukesan 2",
    model_name="PNN",
    model=model_5,
    params=model_5_params,
    features=model_5_features,
    datasets=scaled_datasets,
)
cv_result_5