In [None]:
##### Step 1: import functions #####
from keras.layers import Dense, Flatten, BatchNormalization, Activation, Conv2D, Conv1D, AveragePooling2D, AveragePooling1D, Input, MaxPooling1D, Dropout
from keras.models import load_model, Model
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, EarlyStopping, LearningRateScheduler
from keras import backend as K, initializers, regularizers
import pickle
import pandas as pd
import numpy as np
# from numpy.random import seed; seed(473)
import random
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
# from tensorflow import set_random_seed; set_random_seed(763)
from keras.losses import binary_crossentropy
from sklearn.model_selection import KFold
from sklearn.metrics import roc_curve, auc
# from sklearn.externals.six import StringIO  
from sklearn.tree import export_graphviz, DecisionTreeRegressor
from sklearn.calibration import calibration_curve
from sklearn.utils import class_weight
from scipy.stats import ttest_ind
from IPython.display import Image  
import pydotplus
import math

In [None]:
##### Step 2: load data #####
# `x` holds the model inputs (first axis = samples); `metaData` is the
# per-sample label table and its 'D4 pass' column is the binary target `y`.
x = np.load('supermatrix_unfiltered_all_log_clipped_0-1_norm1.npy')
metaData = pd.read_csv('data/supermatrix_labels.csv', delimiter=';')
y = metaData['D4 pass'].values

# Later cells select rows of `x` with boolean masks built from `metaData`,
# so both must describe the same samples in the same order. Only the row
# count can be verified here; fail early instead of deep inside training.
if x.shape[0] != y.shape[0]:
    raise ValueError(
        f"x has {x.shape[0]} samples but the label table has {y.shape[0]} rows"
    )

print('Total number of samples: ', x.shape[0])
print('Number of positive samples (D4 pass): ', np.sum(y))
print('Number of negative samples (D4 fail): ', y.shape[0]-np.sum(y))

In [3]:
# load test and train folds
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python
# objects, which can execute code - only load fold files from a trusted source.
# NOTE(review): the next cell assigns `test_folds` and `train_folds` again from
# a fresh random split, so running all cells in order discards the folds loaded
# here - confirm which of the two sources is the intended one.
# test_folds = np.load('data/test_folds.npy', allow_pickle=True)
test_folds = np.load('data/test_folds_super.npy', allow_pickle=True)
train_folds = np.load('data/train_folds_super.npy', allow_pickle=True)

In [4]:
##### Define test and train folds #####
# NOTE(review): this cell re-assigns `test_folds` and `train_folds`. If the
# folds loaded from disk in the previous cell are the ones to use, skip it.
samples = metaData['sample'].values

# Fold layout. The generator is seeded so that "Restart & Run All" rebuilds
# exactly the same split (473 is the value of the commented-out numpy seed in
# the import cell).
n_test_folds = 7
n_val_folds = 5
fold_rng = np.random.RandomState(473)

# Sample names of each class in random order. `permutation` returns a shuffled
# copy, so arrays that may be backed by `metaData` are never modified in place.
d4_pass = fold_rng.permutation(metaData[metaData['D4 pass']==1]['sample'].values)
d4_fail = fold_rng.permutation(metaData[metaData['D4 pass']==0]['sample'].values)

# split every class into `n_test_folds` chunks (seven, not eight)
test_folds_pass = np.array_split(d4_pass, n_test_folds)
test_folds_fail = np.array_split(d4_fail, n_test_folds)

# Build each test fold from one pass-chunk and one fail-chunk. np.array_split
# puts the larger chunks first, so pairing chunk i with chunk (last - i) of the
# other class evens out the total fold sizes.
test_folds = [
    np.concatenate((test_folds_pass[i], test_folds_fail[n_test_folds - 1 - i]))
    for i in range(n_test_folds)
]

# For each test fold, shuffle the remaining samples and split them into
# `n_val_folds` validation folds (these splits are not stratified by class).
train_folds = []
for test_fold in test_folds:
    idx_train = np.invert(np.isin(samples, test_fold))
    samples_train = fold_rng.permutation(samples[idx_train])
    train_folds.append(np.array_split(samples_train, n_val_folds))

In [4]:
##### Step 4: define model #####

def create_model(x, l2=0.001, dropout=1/3):
    """Assemble the (uncompiled) convolutional binary classifier.

    Architecture, in order:
      1. two identical stages of Conv1D(4 filters, kernel_size=1) ->
         BatchNormalization -> ReLU,
      2. average pooling with a window of ``x.shape[1]`` followed by Flatten,
      3. Dense(3) -> BatchNormalization -> ReLU,
      4. Dense(1) -> BatchNormalization -> sigmoid.

    Parameters
    ----------
    x : array-like with a ``shape`` attribute
        Only used for its shape: ``x[0].shape`` is the per-sample input shape
        and ``x.shape[1]`` the pooling window.
        (assumes x is (samples, positions, channels) - TODO confirm)
    l2 : float
        L2 penalty on the kernels of both convolutions and of the hidden Dense
        layer. The output Dense layer is not regularised.
    dropout : float
        Accepted for API compatibility but currently unused: the model
        contains no Dropout layer.

    Returns
    -------
    keras Model mapping the input tensor to a single sigmoid output.
    """
    inputs = Input(shape=x[0].shape)

    # two stacked pointwise-convolution stages
    features = inputs
    for _ in range(2):
        features = Conv1D(4,
                          kernel_size=1,
                          kernel_initializer=initializers.RandomUniform(),
                          kernel_regularizer=regularizers.l2(l2),
                          activation=None)(features)
        features = BatchNormalization()(features)
        features = Activation("relu")(features)

    # pool with a window as long as the second axis of x, then flatten
    pooled = AveragePooling1D(pool_size=x.shape[1])(features)
    pooled = Flatten()(pooled)

    # small hidden dense stage
    hidden = Dense(3,
                   kernel_initializer=initializers.RandomUniform(),
                   kernel_regularizer=regularizers.l2(l2),
                   activation=None)(pooled)
    hidden = BatchNormalization()(hidden)
    hidden = Activation("relu")(hidden)

    # single-unit output, batch-normalised before the sigmoid
    score = Dense(1,
                  kernel_initializer=initializers.RandomUniform(),
                  activation=None)(hidden)
    score = BatchNormalization()(score)
    probability = Activation("sigmoid")(score)

    return Model(inputs=inputs, outputs=probability)

# define function for plotting train and validation loss and accuracy
def plot_loss_acc(history):
    """Plot train vs. validation loss and accuracy side by side.

    Parameters
    ----------
    history : object with a ``history`` dict (e.g. what ``model.fit`` returns)
        Must hold per-epoch sequences under 'loss' and 'val_loss', plus the
        accuracy under either 'acc'/'val_acc' or 'accuracy'/'val_accuracy'.
        Which pair exists depends on the Keras release, so both are accepted.

    Side effects
    ------------
    Sets the global matplotlib rcParam 'font.size' to 14 and shows the figure.
    """
    logs = history.history
    # older Keras logs 'acc', newer releases log 'accuracy'
    acc_key = 'acc' if 'acc' in logs else 'accuracy'

    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    # increase font size (global setting, persists after this call)
    plt.rcParams.update({'font.size': 14})

    # (history key, label shown on the panel)
    panels = (('loss', 'loss'), (acc_key, 'accuracy'))
    for ax, (key, label) in zip(axes, panels):
        ax.tick_params(axis='both', which='major', labelsize=14)
        ax.plot(logs[key])
        ax.plot(logs['val_' + key])
        ax.set_title('train vs validation ' + label)
        ax.set_ylabel(label)
        ax.set_xlabel('epoch')
        ax.legend(['train', 'validation'], loc='best')
    plt.show()

def plot_prec_rec_f1(history):
    """Plot train vs. validation precision, recall and f1 in three panels.

    ``history.history`` must contain per-epoch sequences under 'precision',
    'recall' and 'f1' together with their 'val_'-prefixed counterparts.
    Sets the global matplotlib rcParam 'font.size' to 14 and shows the figure.
    """
    fig, axes = plt.subplots(1, 3, figsize=(22, 6))
    # increase font size
    plt.rcParams.update({'font.size': 14})

    logs = history.history
    # one panel per metric, left to right
    for panel, metric in zip(axes, ('precision', 'recall', 'f1')):
        # increase axes font size
        panel.tick_params(axis='both', which='major', labelsize=14)
        panel.plot(logs[metric])
        panel.plot(logs[f'val_{metric}'])
        panel.set_title(f'train vs validation {metric}')
        panel.set_ylabel(metric)
        panel.set_xlabel('epoch')
        panel.legend(['train', 'validation'], loc='best')
    plt.show()

def weighted_binary_crossentropy(y_true, y_pred, class_weights):
    """Binary cross-entropy in which every sample is scaled by its class weight.

    Parameters
    ----------
    y_true : tensor of 0/1 labels
        (assumes shape (batch, 1), matching a single-unit output - TODO confirm)
    y_pred : tensor of predicted probabilities, same shape as ``y_true``.
    class_weights : mapping with keys 0 and 1
        Weight applied to samples of class 0 and class 1 respectively.

    Returns
    -------
    Tensor with one weighted loss value per sample.
    """
    # Weighted loss for class 0 and class 1
    w0 = class_weights[0]
    w1 = class_weights[1]

    # binary_crossentropy averages over the last axis, so it returns one value
    # per sample: shape (batch,) for (batch, 1) inputs.
    bce = binary_crossentropy(y_true, y_pred)

    # Reduce the weights over the same axis so they line up with `bce`.
    # Multiplying the (batch,) losses by un-reduced (batch, 1) weights would
    # broadcast to a (batch, batch) outer product, pairing every sample's
    # weight with every other sample's loss.
    weights = K.mean((1 - y_true) * w0 + y_true * w1, axis=-1)

    return bce * weights

# define custom metrics: precision, recall, f1
def precision(y_true, y_pred):
    """Batch-wise precision: rounded true positives / rounded predicted positives.

    Both counts are obtained by clipping to [0, 1] and rounding, so a
    prediction counts as positive once it rounds to 1. The backend epsilon in
    the denominator avoids a division by zero when nothing is predicted
    positive. The value is computed over the tensors passed in (one batch),
    not accumulated over an epoch.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    flagged = K.round(K.clip(y_pred, 0, 1))
    return K.sum(hits) / (K.sum(flagged) + K.epsilon())

def recall(y_true, y_pred):
    """Batch-wise recall: rounded true positives / rounded actual positives.

    Both counts are obtained by clipping to [0, 1] and rounding. The backend
    epsilon in the denominator avoids a division by zero when the batch holds
    no positive label. The value is computed over the tensors passed in (one
    batch), not accumulated over an epoch.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    actual = K.round(K.clip(y_true, 0, 1))
    return K.sum(hits) / (K.sum(actual) + K.epsilon())

def f1(y_true, y_pred):
    """Batch-wise F1 score: harmonic mean of `precision` and `recall`.

    Computed as 2 * p * r / (p + r + epsilon), where the backend epsilon keeps
    the denominator non-zero when both precision and recall are 0. Like the
    two metrics it builds on, the value is per batch.
    """
    p = precision(y_true, y_pred)
    r = recall(y_true, y_pred)
    ratio = (p * r) / (p + r + K.epsilon())
    return 2 * ratio

def compute_class_weights(y):
    """Return class weights that up-weight class 0 only.

    Class 0 gets ``len(y) / (len(y) - sum(y))`` and class 1 keeps weight 1.
    (assumes y holds 0/1 labels, so that sum(y) counts the positives)
    """
    n_total = len(y)
    n_negative = n_total - np.sum(y)
    return {0: n_total / n_negative, 1: 1}

def compute_inverse_frequency_weights_binary(y):
    """Return inverse-frequency class weights for labels 0 and 1.

    Each class c gets ``len(y) / (2 * n_c)``, where n_c is the number of
    entries of ``y`` equal to c; two equally frequent classes therefore both
    get weight 1. ``y`` must support element-wise comparison (numpy array).
    """
    n_samples = len(y)
    return {
        label: n_samples / (np.sum(y == label) * 2)
        for label in (0, 1)
    }

def compute_sqrt_inverse_frequency_weights_binary(y):
    """Return square-root-damped inverse-frequency weights for labels 0 and 1.

    Each class c gets ``sqrt(len(y) / (2 * n_c))``, where n_c is the number of
    entries of ``y`` equal to c. ``y`` must support element-wise comparison
    (numpy array).
    """
    n_samples = len(y)
    return {
        label: np.sqrt(n_samples / (np.sum(y == label) * 2))
        for label in (0, 1)
    }

def compute_logarithmic_weights_binary(y):
    """Return log-damped inverse-frequency weights for labels 0 and 1.

    Each class c gets ``log(len(y) / (2 * n_c) + 1)``, where n_c is the number
    of entries of ``y`` equal to c. The ``+ 1`` sits inside the logarithm, so
    the weight stays positive for any positive ratio; it does not protect
    against a class that is absent from ``y``. ``y`` must support element-wise
    comparison (numpy array).
    """
    n_samples = len(y)
    return {
        label: math.log(n_samples / (np.sum(y == label) * 2) + 1)
        for label in (0, 1)
    }

def calc_mean_std(history):
    """Return the (mean, std) of `history` taken along its first axis.

    `history` is converted to a numpy array first; for a list holding one
    per-epoch curve per fold this gives the per-epoch mean and standard
    deviation across folds.
    """
    stacked = np.array(history)
    return stacked.mean(axis=0), stacked.std(axis=0)

def plot_error_bar(train_mean, train_std, val_mean, val_std, ylabel, ax, leg_loc='best'):
    """Draw train and validation curves with error bars on `ax`.

    Each curve is plotted against epochs numbered from 1, using the matching
    std sequence as the error-bar half-height. The x axis is labelled 'epoch',
    the y axis `ylabel`, and the legend is placed at `leg_loc`.
    """
    ax.tick_params(axis='both', which='major', labelsize=16)

    curves = (
        ('train', train_mean, train_std),
        ('validation', val_mean, val_std),
    )
    for name, center, spread in curves:
        epochs = range(1, len(center) + 1)
        ax.errorbar(epochs, center, yerr=spread, label=name)

    ax.set_xlabel('epoch')
    ax.set_ylabel(ylabel)
    ax.legend(loc=leg_loc)

In [None]:
##### Step 5: train and test model #####
# For every held-out test fold:
#   1. train one model per train/validation split, checkpointing each model at
#      its lowest validation loss,
#   2. plot the per-split and the averaged learning curves,
#   3. reload the checkpoints, score the test fold with each of them and
#      average the scores into an ensemble prediction that is saved to CSV.
samples = metaData['sample'].values
for test_round, test_fold in enumerate(test_folds):
    print(f"Test fold {test_round+1}/{len(test_folds)}")

    idx_test = np.isin(samples, test_fold)
    idx_train = np.invert(idx_test)

    # choose channels that are to be included in the analysis
    # channels = [0,3,6,9,12,15,18,21,24,27,30,33,36,39,42,45]
    # channels = [0,3,24]

    # x_train = x[idx_train,:,:]; x_train = x_train[:,:,channels]; y_train = y[idx_train]
    # x_test = x[idx_test,:,:]; x_test = x_test[:,:,channels]; y_test = y[idx_test]

    x_train = x[idx_train,:,:]; y_train = y[idx_train]; samples_train = samples[idx_train]
    x_test = x[idx_test,:,:]; y_test = y[idx_test]; samples_test = samples[idx_test]

    # Number of train/validation splits of this test fold = number of models in
    # the ensemble. Taken from the fold definition rather than hard-coded, so
    # the testing code below always loads exactly the models trained here.
    n_models = len(train_folds[test_round])

    # # k-fold cross-validation
    # # Specify the number of folds for cross-validation
    # num_folds = 5

    # # Create a KFold instance
    # # generate random number between 0 and 1000
    # seed = random.randint(0, 1000)
    # kf = KFold(n_splits=num_folds, shuffle=True, random_state=seed)

    # Lists to store evaluation results
    val_loss_history = []; val_accuracy_history = []; val_recall_history = []; val_precision_history = []; val_f1_history = []
    train_loss_history = []; train_accuracy_history = []; train_recall_history = []; train_precision_history = []; train_f1_history = []

    class_weights_folds = []
    acc_best_epoch = []; loss_best_epoch = []

    # for fold, (train_indices, valid_indices) in enumerate(kf.split(x_train)):
    for fold, valid_samples in enumerate(train_folds[test_round]):
        print(f"Training fold {fold+1}/{n_models}")

        valid_indices = np.isin(samples_train, valid_samples)
        train_indices = np.invert(valid_indices)

        x_train_fold = x_train[train_indices]
        y_train_fold = y_train[train_indices]
        x_valid_fold = x_train[valid_indices]
        y_valid_fold = y_train[valid_indices]

        class_weights = compute_class_weights(y_train_fold)
        class_weights_folds.append(class_weights)
        print(f"Class weights: {class_weights}")

        # Build and compile a fresh model for this split
        model = create_model(x_train, l2=0.01)
        model.compile(loss=lambda y_true, y_pred: weighted_binary_crossentropy(y_true, y_pred, class_weights),
                    optimizer=Adam(lr=0.0001),
                    metrics=['accuracy', precision, recall, f1])

        # keep only the weights of the epoch with the lowest validation loss
        checkpointer = ModelCheckpoint(filepath=f'saved_weights_unfiltered_super_testfold{test_round+1}_fold{fold+1}.hdf5',
                                    monitor='val_loss', verbose=0,
                                    save_best_only=True)

        # Train the model
        # NOTE(review): the class weights enter twice - inside the loss passed
        # to compile() above and again through `class_weight=` here. Confirm
        # that this double weighting is intended.
        history = model.fit(x_train_fold, y_train_fold,
                            batch_size=5,
                            epochs=300,
                            verbose=0,
                            callbacks=[checkpointer],
                            validation_data=([x_valid_fold], y_valid_fold),
                            class_weight=class_weights)

        plot_loss_acc(model.history)
        plot_prec_rec_f1(model.history)

        # The accuracy is logged as 'acc' by older Keras releases and as
        # 'accuracy' by newer ones; use whichever key is present.
        acc_key = 'acc' if 'acc' in history.history else 'accuracy'
        val_acc_key = 'val_' + acc_key

        # Store validation metrics for each fold
        val_loss_history.append(history.history['val_loss'])
        val_accuracy_history.append(history.history[val_acc_key])
        val_recall_history.append(history.history['val_recall'])
        val_precision_history.append(history.history['val_precision'])
        val_f1_history.append(history.history['val_f1'])
        train_loss_history.append(history.history['loss'])
        train_accuracy_history.append(history.history[acc_key])
        train_recall_history.append(history.history['recall'])
        train_precision_history.append(history.history['precision'])
        train_f1_history.append(history.history['f1'])

        # Find the epoch with the lowest validation loss
        best_epoch = np.argmin(history.history['val_loss'])
        best_val_acc = history.history[val_acc_key][best_epoch]
        best_val_loss = history.history['val_loss'][best_epoch]
        acc_best_epoch.append(best_val_acc)
        loss_best_epoch.append(best_val_loss)

        # print validation accuracy and loss at best epoch
        print(f"Validation accuracy at best epoch: {best_val_acc:.2f}")
        print(f"Validation loss at best epoch: {best_val_loss:.2f}")

    # print model with lowest validation loss and highest validation accuracy
    print(f"Model with lowest validation loss at best epoch: {np.argmin(loss_best_epoch)+1}")
    print(f"Model with highest validation accuracy at best epoch: {np.argmax(acc_best_epoch)+1}")

    # Calculate average (over epochs) validation loss, accuracy and f1 for each model
    avg_val_loss = np.array(val_loss_history).mean(axis=1)
    avg_val_accuracy = np.array(val_accuracy_history).mean(axis=1)
    avg_val_f1 = np.array(val_f1_history).mean(axis=1)

    # Report the best model under each of these criteria
    print(f"Model with lowest average validation loss: {np.argmin(avg_val_loss)+1}")
    print(f"Model with highest average validation accuracy: {np.argmax(avg_val_accuracy)+1}")
    print(f"Model with highest average validation f1: {np.argmax(avg_val_f1)+1}")

    # plot average loss (train, validation) and accuracy (train, validation) vs. epoch with error bars
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    plt.rcParams.update({'font.size': 16})

    # loss (the printed numbers are the fold mean/std at the last epoch)
    [train_mean, train_std] = calc_mean_std(train_loss_history)
    [val_mean, val_std] = calc_mean_std(val_loss_history)
    print(f"Average train loss: {train_mean[-1]:.2f} +/- {train_std[-1]:.2f}")
    print(f"Average validation loss: {val_mean[-1]:.2f} +/- {val_std[-1]:.2f}")

    plot_error_bar(train_mean, train_std, val_mean, val_std, 'loss', axes[0], leg_loc='upper right')

    # accuracy
    [train_mean, train_std] = calc_mean_std(train_accuracy_history)
    [val_mean, val_std] = calc_mean_std(val_accuracy_history)
    print(f"Average train accuracy: {train_mean[-1]:.2f} +/- {train_std[-1]:.2f}")
    print(f"Average validation accuracy: {val_mean[-1]:.2f} +/- {val_std[-1]:.2f}")

    plot_error_bar(train_mean, train_std, val_mean, val_std, 'accuracy', axes[1], leg_loc='lower right')

    plt.suptitle(f'{n_models}-fold cross-validation')
    plt.show()

    # plot average precision (train, validation), recall (train, validation) and f1 (train, validation) vs. epoch with error bars
    fig, axes = plt.subplots(1, 3, figsize=(22, 6))
    plt.rcParams.update({'font.size': 16})

    # precision
    [train_mean, train_std] = calc_mean_std(train_precision_history)
    [val_mean, val_std] = calc_mean_std(val_precision_history)
    print(f"Average train precision: {train_mean[-1]:.2f} +/- {train_std[-1]:.2f}")
    print(f"Average validation precision: {val_mean[-1]:.2f} +/- {val_std[-1]:.2f}")

    plot_error_bar(train_mean, train_std, val_mean, val_std, 'precision', axes[0], leg_loc='lower right')

    # recall
    [train_mean, train_std] = calc_mean_std(train_recall_history)
    [val_mean, val_std] = calc_mean_std(val_recall_history)
    print(f"Average train recall: {train_mean[-1]:.2f} +/- {train_std[-1]:.2f}")
    print(f"Average validation recall: {val_mean[-1]:.2f} +/- {val_std[-1]:.2f}")

    plot_error_bar(train_mean, train_std, val_mean, val_std, 'recall', axes[1], leg_loc='lower right')

    # f1
    [train_mean, train_std] = calc_mean_std(train_f1_history)
    [val_mean, val_std] = calc_mean_std(val_f1_history)
    print(f"Average train f1: {train_mean[-1]:.2f} +/- {train_std[-1]:.2f}")
    print(f"Average validation f1: {val_mean[-1]:.2f} +/- {val_std[-1]:.2f}")

    plot_error_bar(train_mean, train_std, val_mean, val_std, 'f1', axes[2], leg_loc='lower right')

    plt.suptitle(f'{n_models}-fold cross validation')
    plt.show()

    # test models on test fold: one column of scores per checkpointed model
    y_5fold_scores = np.zeros((y_test.shape[0], n_models))

    for i in range(n_models):
        # compile=False: the checkpoint was saved with a lambda loss
        final_model = load_model(f'saved_weights_unfiltered_super_testfold{test_round+1}_fold{i+1}.hdf5', compile=False)

        # define loss function and optimizer
        final_model.compile(loss=lambda y_true, y_pred: weighted_binary_crossentropy(y_true, y_pred, class_weights_folds[i]),
                            optimizer=Adam(lr=0.0001),
                            metrics=['accuracy', precision, recall, f1])

        # predicted scores of this model on the test fold
        y_scores = final_model.predict([x_test])
        y_5fold_scores[:,i] = y_scores.flatten()

        # test accuracy (scores rounded to the nearest label)
        y_pred = np.rint(y_scores)
        accuracy = np.sum(1-np.abs(y_pred.reshape(-1)-y_test))/len(y_test)
        print(f"Accuracy on test fold {test_round+1} and train fold {i+1}: {accuracy:.2f}")

    # ensemble prediction: mean score over all models of this test fold
    y_mean_scores = np.mean(y_5fold_scores, axis=1)

    # generate ROC and AUC
    fpr, tpr, _ = roc_curve(y_test, y_mean_scores)
    roc_auc = auc(fpr, tpr)

    # test accuracy
    y_pred = np.rint(y_mean_scores)
    accuracy = np.sum(1-np.abs(y_pred.reshape(-1)-y_test))/len(y_test)
    # report the ensemble metrics, which are otherwise computed but never shown
    print(f"Ensemble on test fold {test_round+1}: accuracy = {accuracy:.2f}, AUC = {roc_auc:.2f}")

    # print prediction and true label (one 'fold<k>_pred' column per model)
    pred_columns = {'sample': samples[idx_test]}
    for i in range(n_models):
        pred_columns[f'fold{i+1}_pred'] = y_5fold_scores[:,i]
    pred_columns['mean_pred'] = y_mean_scores.flatten()
    pred_columns['true'] = y_test
    df = pd.DataFrame(pred_columns)
    print(df)

    # save predictions
    df.to_csv(f'predictions_unfiltered_testfold{test_round+1}.csv', sep=';')

    # fig, axes = plt.subplots(1, 3, figsize=(21, 6))

    # # plot ROC curve
    # axes[0].plot(fpr, tpr)
    # axes[0].plot([0, 1], [0, 1], 'k--')
    # axes[0].set_xlabel('False positive rate')
    # axes[0].set_ylabel('True positive rate')
    # axes[0].set_title('AUC = {0:.2f}'.format(roc_auc))

    # # Calculate reliability curve
    # prob_true, prob_pred = calibration_curve(y_test, y_mean_scores, n_bins=10, strategy='uniform')

    # axes[1].plot(prob_pred, prob_true, marker='o', label='Model Calibration')
    # axes[1].plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfectly Calibrated')
    # axes[1].set_xlabel('Mean predicted probability')
    # axes[1].set_ylabel('Fraction of positives')
    # axes[1].set_title('Probability calibration plot')
    # axes[1].legend()

    # # Create the scatter plot
    # hb = plt.hexbin(y_mean_scores.flatten(), y_test, gridsize=15, cmap='coolwarm', alpha=0.5, extent=[0,1,0,1])
    # plt.colorbar(hb, label='Density')
    # axes[2].set_xlabel('Predicted score')
    # axes[2].set_ylabel('True label')
    # axes[2].set_title('Predicted scores vs true labels')
    # axes[2].set_xlim(-0.1, 1.1)  # Set x-axis limit from 0 to 1 (probability range)
    # axes[2].set_ylim(-0.1, 1.1)  # Set y-axis limit to accommodate binary labels
    # axes[2].grid(True)

    # plt.suptitle(f'Average accuracy = {round(accuracy*100,1)}%')
    # plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    # plt.show()