In [None]:
import time
from pathlib import Path
import itertools
from functools import reduce
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve

from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC, SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis

from tabulate import tabulate

from tqdm.notebook import tqdm

import seaborn as sns


User parameters

In [None]:
bandpass_filename = 'SH_Round_Data.xlsx'
test_ratio = 0.3  # Proportion of test data
random_state = 42  # Specify to fix splitted training and test data. Otherwise, set to None.
labels = {
    0: 'NON',
    1: 'TAR'
}
n_splits = 5
max_input_dim = 100
rounds = [f"{i}round" for i in range(1, 2)]

The code

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    accuracy = np.trace(cm) / np.sum(cm)
    print(f'Accuracy: {accuracy}')
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

def visualize_training_result(model):
    y_pred = model.predict(x_train)
    plot_confusion_matrix(confusion_matrix(y_train, y_pred), labels, title='Training result')

def visualize_test_result(model):
    y_pred = model.predict(x_test)
    plot_confusion_matrix(confusion_matrix(y_test, y_pred), labels, title='Test result')

def confusion_matrix_summary(model):
    y_pred = model.predict(x_test)
    (tn, fp, fn, tp) = confusion_matrix(y_test, y_pred).ravel()
    n_total = tn + fp + fn + tp
    return {
        'accuracy': (tn + tp) / (tn + fp + fn + tp),
        'sensitivity': tp / (tp + fn),
        'specificity': tn / (tn + fp)
    }

def auc_summary(model, x_test, y_test):
    y_pred = model.predict_proba(x_test)[:, 1]
    auc = roc_auc_score(y_test, y_pred)
    return auc

def acc_summary(model, x_test, y_test):
    y_pred = model.predict(x_test)
    return np.mean(y_test == y_pred)

In [None]:
import warnings
warnings.filterwarnings(action='ignore')

In [None]:
# Pre-process inputs
# raw_data = dict()
# for round in rounds:
#     raw_data[round] = pd.read_excel(p300_path / bandpass_filename, sheet_name=round, header=None, index_col=None)
x_data = dict()
y_data = dict()
for i in tqdm(range(len(rounds)), desc="Loading excel data"):
    round = rounds[i]
    raw_data = pd.read_excel(p300_path / bandpass_filename, sheet_name=round, header=None, index_col=None)
    x_data[round] = raw_data.iloc[:, 1:]
    assert x_data[round].shape[1] == max_input_dim
    y_data[round] = raw_data.iloc[:, 0]

In [None]:
!pip install xlsxwriter

In [None]:
def auc_summary2(model, x_test, y_test, fn):
    y_score = fn(model, x_test)
    auc = roc_auc_score(y_test, y_score)
    return auc

In [None]:
from sklearn.metrics import roc_curve, auc
from scipy import interp

# def plot_roc_curve(model_builder, X, y, title=None, save=False, dir=p300_path):
#     tprs = []
#     aucs = []
#     mean_fpr = np.linspace(0, 1, 101)
#     for i, (train_index, test_index) in enumerate(skf.split(X, y)):
#         x_train = X.iloc[train_index, :]
#         y_train = y.iloc[train_index]
#         x_test = X.iloc[test_index, :]
#         y_test = y.iloc[test_index]
#         if isinstance(model_builder, tuple):
#             model = model_builder[0]()
#             model.fit(x_train, y_train)
#             y_score = model_builder[1](model, x_test)
#         else:
#             model = model_builder()
#             model.fit(x_train, y_train)
#             y_score = model.predict_proba(x_test)[:, 1]
#         fpr, tpr, _ = roc_curve(y_test, y_score)
#         tprs.append(interp(mean_fpr, fpr, tpr))
#         roc_auc = auc(fpr, tpr)
#         aucs.append(roc_auc)
#         plt.plot(fpr, tpr, lw=2, alpha=0.3, label=f'ROC fold {i+1} (AUC = {roc_auc:.2f})')
#     mean_tpr = np.mean(tprs, axis=0)
#     mean_auc = auc(mean_fpr, mean_tpr)
#     plt.plot(mean_fpr, mean_tpr, color='blue', label=f'Mean ROC (AUC = {mean_auc:.2f})', lw=2, alpha=1)
#     plt.xlabel('False Positive Rate')
#     plt.ylabel('True Positive Rate')
#     plt.title(title)
#     plt.legend(loc='lower right')
#     plt.show()
#     if save:
#         df = pd.DataFrame({'mean_fpr': mean_fpr, 'mean_tpr': mean_tpr})
#         with pd.ExcelWriter(dir / f'{title}.xlsx', engine='xlsxwriter') as writer:
#             df.to_excel(writer)
#             writer.save()
#     return mean_tpr, mean_auc

def plot_roc_curve(model_builder, X, y, input_dim, title=None, save=False, dir=p300_path):
    tprs = []
    aucs = []
    mean_fpr = np.linspace(0, 1, 101)
    for i, (train_index, test_index) in enumerate(skf.split(X, y)):
        x_train = X.iloc[train_index, :]
        y_train = y.iloc[train_index]
        x_test = X.iloc[test_index, :]
        y_test = y.iloc[test_index]
        if isinstance(model_builder, tuple):
            #model = model_builder[0]()
            model = model_builder[0](input_dim)
            model.fit(x_train, y_train)
            y_score = model_builder[1](model, x_test)
        else:
            #model = model_builder()
            model = model_builder(input_dim)
            model.fit(x_train, y_train)
            y_score = model.predict_proba(x_test)[:, 1]
        fpr, tpr, _ = roc_curve(y_test, y_score)
        tprs.append(interp(mean_fpr, fpr, tpr))
        roc_auc = auc(fpr, tpr)
        aucs.append(roc_auc)
        plt.plot(fpr, tpr, lw=2, alpha=0.3, label=f'ROC fold {i+1} (AUC = {roc_auc:.2f})')
    mean_tpr = np.mean(tprs, axis=0)
    mean_auc = auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, color='blue', label=f'Mean ROC (AUC = {mean_auc:.2f})', lw=2, alpha=1)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc='lower right')
    plt.show()
    if save:
        if title is None:
            raise RuntimeError("Title should be given.")
        df = pd.DataFrame({'mean_fpr': mean_fpr, 'mean_tpr': mean_tpr})
        # writer = pd.ExcelWriter(dir / f'{title}.xlsx', engine='xlsxwriter')
        # df.to_excel(writer)
        # writer.save()
        with pd.ExcelWriter(dir / f'{title}.xlsx', engine='xlsxwriter') as writer:
            df.to_excel(writer)
            writer.save()
    return mean_tpr, mean_auc

In [None]:
import shutil

In [None]:
# Test
input_dims = [25]
# builders = {
#     'LR': lambda: LogisticRegression(),
#     'SVM': (lambda: LinearSVC(), lambda m, x: m.decision_function(x)),
#     'RF': lambda: RandomForestClassifier(),
#     'kNN': lambda: KNeighborsClassifier(n_neighbors=5),
#     'LDA': lambda: LinearDiscriminantAnalysis(),
#     'QDA': lambda: QuadraticDiscriminantAnalysis()
# }
builders = {
    'LR': lambda dim: LogisticRegression(),
    'SVM': (lambda dim: SVC(kernel='linear'), lambda m, x: m.decision_function(x)),
    'RF': lambda dim: RandomForestClassifier(n_estimators=128, max_features=input_dims[0], bootstrap=False, random_state=100),
    'kNN': lambda dim: KNeighborsClassifier(n_neighbors=5),
    'LDA': lambda dim: LinearDiscriminantAnalysis(),
    'QDA': lambda dim: QuadraticDiscriminantAnalysis()
}
metrics = ['AUC']
subject = 'SH'
rounds = ['1round']
lr_best_idx = {}
for metric in metrics:
    lr_best_idx[metric] = {}
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
for round in rounds:
    for metric in metrics:
        for input_dim in input_dims:
            tprs = {}
            mean_aucs = {}
            for model_name, model_builder in builders.items():
                save_dir = p300_path / f'{subject}_{metric}_INPUT_DIM_{input_dim}'
                # if save_dir.exists():
                #     shutil.rmtree(save_dir)
                save_dir.mkdir(exist_ok=True)
                accs = []
                aucs = []
                x_starts = [i for i in range(0, max_input_dim + 1 - input_dim)]
                for j in tqdm(range(len(x_starts)), desc=f"{subject}_{input_dim}_{model_name}_{round}_{metric}"):
                    accs.append([])
                    aucs.append([])
                    X = x_data[round].iloc[:, x_starts[j]:x_starts[j] + input_dim]
                    y = y_data[round]
                    for train_index, test_index in skf.split(X, y):
                        x_train = X.iloc[train_index, :]
                        y_train = y.iloc[train_index]
                        x_test = X.iloc[test_index, :]
                        y_test = y.iloc[test_index]
                        if isinstance(model_builder, tuple):
                            #model = model_builder[0]()
                            model = model_builder[0](input_dim)
                            model.fit(x_train, y_train)
                            aucs[j].append(auc_summary2(model, x_test, y_test, model_builder[1]))
                        else:
                            #model = model_builder()
                            model = model_builder(input_dim)
                            model.fit(x_train, y_train)
                            aucs[j].append(auc_summary(model, x_test, y_test))
                        accs[j].append(acc_summary(model, x_test, y_test))
                    accs[j] = np.mean(accs[j])
                    aucs[j] = np.mean(aucs[j])
                if metric == 'ACC':
                    best_idx = np.argmax(accs)
                    metric_score = accs[best_idx]
                elif metric == 'AUC':
                    best_idx = np.argmax(aucs)
                    metric_score = aucs[best_idx]
                else:
                    raise RuntimeError(f"Unknown metric {metric}")
                if model_name == 'LR':
                    lr_best_idx[metric][input_dim] = best_idx
                print(f'[Input dim {input_dim}][{model_name}] best {metric}: {metric_score} at index: {best_idx}')
                X = x_data[round].iloc[:, x_starts[best_idx]:x_starts[best_idx] + input_dim]
                y = y_data[round]
                mean_tpr, mean_auc = plot_roc_curve(model_builder, X, y, input_dim, title=f'{subject}_{round}_{model_name}_BEST_{metric}_ROC_CURVE_INPUT_DIM_{input_dim}_INDEX_{best_idx}', save=True, dir=save_dir)
                tprs[model_name] = mean_tpr
                mean_aucs[model_name] = mean_auc
            mean_fpr = np.linspace(0, 1, 101)
            for model_name, mean_tpr in tprs.items():
                mean_auc = mean_aucs[model_name]
                plt.plot(mean_fpr, mean_tpr, label=f'{model_name} (AUC = {mean_auc:.2f})', lw=2, alpha=1)
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title(f"Input dim: {input_dim}")
            plt.legend(loc='lower right')
            plt.show()

In [None]:
for round in rounds:
    for metric in metrics:
        for input_dim in input_dims:
            curr_index = lr_best_idx[metric][input_dim]
            save_dir = p300_path / f'{subject}_{metric}_INPUT_DIM_{input_dim}_INDEX_{curr_index}'
            # if save_dir.exists():
            #     shutil.rmtree(save_dir)
            save_dir.mkdir(exist_ok=True)
            tprs = {}
            mean_aucs = {}
            for model_name, model_builder in builders.items():
                accs = []
                aucs = []
                x_starts = [curr_index]
                for j in tqdm(range(len(x_starts)), desc=f"{subject}_{input_dim}_{model_name}_{round}_{metric}"):
                    accs.append([])
                    aucs.append([])
                    X = x_data[round].iloc[:, x_starts[j]:x_starts[j] + input_dim]
                    y = y_data[round]
                    for train_index, test_index in skf.split(X, y):
                        x_train = X.iloc[train_index, :]
                        y_train = y.iloc[train_index]
                        x_test = X.iloc[test_index, :]
                        y_test = y.iloc[test_index]
                        if isinstance(model_builder, tuple):
                            #model = model_builder[0]()
                            model = model_builder[0](input_dim)
                            model.fit(x_train, y_train)
                            aucs[j].append(auc_summary2(model, x_test, y_test, model_builder[1]))
                        else:
                            #model = model_builder()
                            model = model_builder(input_dim)
                            model.fit(x_train, y_train)
                            aucs[j].append(auc_summary(model, x_test, y_test))
                        accs[j].append(acc_summary(model, x_test, y_test))
                    accs[j] = np.mean(accs[j])
                    aucs[j] = np.mean(aucs[j])
                if metric == 'ACC':
                    best_idx = np.argmax(accs)
                    metric_score = accs[best_idx]
                elif metric == 'AUC':
                    best_idx = np.argmax(aucs)
                    metric_score = aucs[best_idx]
                else:
                    raise RuntimeError(f"Unknown metric {metric}")
                #print(f'[Input dim {input_dim}][{model_name}] best {metric}: {metric_score} at index: {best_idx}')
                X = x_data[round].iloc[:, x_starts[best_idx]:x_starts[best_idx] + input_dim]
                y = y_data[round]
                mean_tpr, mean_auc = plot_roc_curve(model_builder, X, y, input_dim, title=f'{subject}_{round}_{model_name}_BEST_{metric}_ROC_CURVE_INPUT_DIM_{input_dim}_INDEX_{x_starts[best_idx]}', save=True, dir=save_dir)
                tprs[model_name] = mean_tpr
                mean_aucs[model_name] = mean_auc
            mean_fpr = np.linspace(0, 1, 101)
            for model_name, mean_tpr in tprs.items():
                mean_auc = mean_aucs[model_name]
                plt.plot(mean_fpr, mean_tpr, label=f'{model_name} (AUC = {mean_auc:.2f})', lw=2, alpha=1)
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title(f"Input dim: {input_dim}")
            plt.legend(loc='lower right')
            plt.show()