In [None]:
## Import libraries
import numpy as np
import pandas as pd
import time
import json
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Activation, Flatten, concatenate, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import SGD
import tensorflow.keras.backend as K
import gc
from hyperopt import fmin, tpe, hp, space_eval, Trials, STATUS_OK #, rand
from tensorflow.keras.utils import plot_model

from sklearn.model_selection import StratifiedShuffleSplit, StratifiedKFold

In [None]:
## Set the display option for wide dataframes
pd.set_option('display.width', 1000)

In [None]:
## Set variables
seed = 42  # seed for reproducibility for training

k = 4  # k for k-fold cross-validation in hyperparameter tuning
max_evals = 30  # Maxomum number of evaluations for hyperparameter tuning 
seed_tuning = 13  # Seed to split the dataset for hyperparameter tuning

N = 10 # experiment repetitions
epochs = 30  # number of epochs for training
batch_size = 8  # batch size for training

# Load data

In [None]:
## Read list of selected patients
f = open('Data/selected_patients.txt', 'r')
patient_IDs = []
for x in f:
  patient_IDs.append(int(x.strip()))

In [None]:
## Load clinical data
data = pd.read_csv('Data/clinical_data.csv', header=0, index_col=0, delimiter=';')
data = data.loc[patient_IDs]  # Sort the clinical data DataFrame

In [None]:
## Load images
from PIL import Image

front = np.asarray([np.array(Image.open(os.path.join('Data/Images/Frontal/',str(ID).zfill(4) + '_front.jpg'))) for ID in patient_IDs])
L90 = np.asarray([np.array(Image.open(os.path.join('Data/Images/L90/',str(ID).zfill(4) + '_L90.jpg'))) for ID in patient_IDs])
R90 = np.asarray([np.array(Image.open(os.path.join('Data/Images/R90/',str(ID).zfill(4) + '_R90.jpg'))) for ID in patient_IDs])

In [None]:
## Generate binary labels
diagnosis = data['vx-diagnosis']
labels = np.zeros(len(patient_IDs))
labels[diagnosis == 'Sick'] = 1

# Pre-process

In [None]:
## Min-max normalization
M = np.concatenate((front, L90, R90)).max()
m = np.concatenate((front, L90, R90)).min()

front = ((front - m) / (M - m)).astype('float32')
L90 = ((L90 - m) / (M - m)).astype('float32')
R90 = ((R90 - m) / (M - m)).astype('float32')

# Function definitions

In [None]:
## CUSTOM METRICS
from tensorflow.python.keras.utils import metrics_utils
from tensorflow.python.keras.utils.generic_utils import to_list
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops

def weighted_error(y_true, y_pred):
    WE = 20
    
    # Convert probabilities/logits to binary predictions
    y_pred_binary = tf.round(y_pred)

    # False Negatives: predicted 0, actual 1
    fn = tf.reduce_sum(tf.cast(tf.logical_and(tf.equal(y_true, 1), tf.equal(y_pred_binary, 0)), tf.float32))

    # False Positives: predicted 1, actual 0
    fp = tf.reduce_sum(tf.cast(tf.logical_and(tf.equal(y_true, 0), tf.equal(y_pred_binary, 1)), tf.float32))

    # Weighted error
    return fn * WE + fp

In [None]:
def evaluate_model(preds_raw, ground_truth):

    from sklearn.metrics import confusion_matrix, log_loss, roc_auc_score #, accuracy_score, precision_score, recall_score, f1_score, make_scorer
    
    # BCE loss
    bce = log_loss(ground_truth, preds_raw)

    # Confusion matrix
    TN, FP, FN, TP = confusion_matrix(ground_truth, np.round(preds_raw)).ravel()
    accuracy = (TP + TN) / (TP + TN + FP + FN)  # accuracy = accuracy_score(ground_truth, np.round(preds_raw))
    sensitivity = TP / (TP + FN)
    specificity = TN / (TN + FP)
    #gmean = np.sqrt((TP/(TP+FN))*(TN/(TN+FP)))
    precision = TP / (TP + FP)  # precision = precision_score(ground_truth, np.round(preds_raw))
    F1 = 2 * TP / (2*TP + FP + FN)  # F1 = f1_score(ground_truth, np.round(preds_raw))
    
    # ROC AUC
    auc = roc_auc_score(ground_truth, preds_raw)
    
    # Weighted error    
    we = weighted_error(ground_truth, np.round(preds_raw))
    
    results = {
        'BCELoss':bce,
        'Accuracy':accuracy,
        'TP':TP,
        'FP':FP,
        'TN':TN,
        'FN':FN,
        'Sensitivity':sensitivity,
        'Specificity':specificity,
        #'G-mean':gmean,
        'Precision':precision,
        'Recall':sensitivity,
        'F1':F1,
        'ROC_AUC':auc,
        'WE':we
    }
    return results

# Custom CNN

## Single-input

In [None]:
## Function to check tensor size to see if another downsampling layer can be applied
def can_apply_layer(x, min_height, min_width):
    """Check if the current layer can be applied based on tensor dimensions."""
    shape = K.int_shape(x)
    if shape[1] is None or shape[2] is None:
        return False  # Dynamic shape, cannot evaluate
    return shape[1] >= min_height and shape[2] >= min_width

In [None]:
## Model definition
def create_model_singleInput(params, a=480, b=640):
            
    # Input
    stacked_input = Input(shape=(a, b, 3))
    
    x = Conv2D(32, (3, 3), activation='relu')(stacked_input)
    x = Conv2D(params['conv1_filters'], kernel_size=params['conv1_kernel'], strides=params['conv1_strides'], activation='relu')(x)
    x = MaxPooling2D(pool_size=params['pool1_size'])(x)
    
    x = Conv2D(64, kernel_size=params['conv2_kernel'], strides=params['conv2_strides'], activation='relu')(x)
    if params['add_conv_3']:
        x = Conv2D(64, kernel_size=params['conv3_kernel'], activation='relu')(x)
    if params['add_conv_4']:
        x = Conv2D(64, kernel_size=params['conv4_kernel'], activation='relu')(x)
        x = MaxPooling2D((2, 2))(x)
        
    if can_apply_layer(x, 5, 5):
        if params['add_conv_5']:
            x = Conv2D(params['conv5_filters'], kernel_size=params['conv5_kernel'], activation='relu')(x)
    if can_apply_layer(x, 5, 5):
        if params['add_conv_6']:
            x = Conv2D(params['conv6_filters'], kernel_size=params['conv6_kernel'], activation='relu')(x)
    if can_apply_layer(x, 5, 5):
        if params['add_conv_7']:
            x = Conv2D(params['conv7_filters'], kernel_size=params['conv7_kernel'], activation='relu')(x)
            if can_apply_layer(x, 2, 2):
                x = MaxPooling2D((2, 2))(x)
                
    if can_apply_layer(x, 5, 5):
        if params['add_conv_8']:
            x = Conv2D(128, kernel_size=params['conv8_kernel'], activation='relu')(x)
    if can_apply_layer(x, 5, 5):
        if params['add_conv_9']:
            x = Conv2D(128, kernel_size=params['conv9_kernel'], activation='relu')(x)
    if can_apply_layer(x, 5, 5):
        if params['add_conv_10']:
            x = Conv2D(128, kernel_size=params['conv10_kernel'], activation='relu')(x)
            if can_apply_layer(x, 2, 2):
                x = MaxPooling2D((2, 2))(x)
                
    if can_apply_layer(x, 5, 5):
        if params['add_conv_11']:
            x = Conv2D(128, kernel_size=params['conv11_kernel'], activation='relu')(x)
    if can_apply_layer(x, 5, 5):
        if params['add_conv_12']:
            x = Conv2D(128, kernel_size=params['conv12_kernel'], activation='relu')(x)
            if can_apply_layer(x, 2, 2):
                x = MaxPooling2D((2, 2))(x)
        
    if can_apply_layer(x, 10, 10):
        if params['add_conv_13']:
            x = Conv2D(128, kernel_size=params['conv13_kernel'], activation='relu')(x)
            x = MaxPooling2D((2, 2))(x)
        
    x = Flatten()(x)
    
    x = Dense(params['dense1_units'], activation='relu')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Dropout(0.5)(x) 
    
    if params['add_dense_2']:
        x = Dense(params['dense2_units'], use_bias=False)(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        x = Dropout(0.5)(x)   #new

    x = Dense(params['dense3_units'], activation='relu')(x)
    if params['add_dense_4']:
        x = Dense(params['dense4_units'], use_bias=False)(x)
    X = Dense(1, activation='sigmoid')(x)   
    
    ## create the model
    return Model(stacked_input, X)

### Tune hyperparameters

In [None]:
def data_singleInput():
    ## Read list of selected patients
    f = open('Data/selected_patients.txt', 'r')
    patient_IDs = []
    for x in f:
      patient_IDs.append(int(x.strip()))

    ## Load clinical data to get the diagnosis
    data = pd.read_csv('Data/clinical_data.csv', header=0, index_col=0, delimiter=';')
    data = data.loc[patient_IDs]  # Sort the clinical data DataFrame
    
    ## Generate binary labels
    diagnosis = data['vx-diagnosis']
    labels = np.zeros(len(patient_IDs))
    labels[diagnosis == 'Sick'] = 1

    ## Load images
    from PIL import Image
    front = np.asarray([np.array(Image.open(os.path.join('Data/Images/Frontal/',str(ID).zfill(4) + '_front.jpg'))) for ID in patient_IDs])
    L90 = np.asarray([np.array(Image.open(os.path.join('Data/Images/L90/',str(ID).zfill(4) + '_L90.jpg'))) for ID in patient_IDs])
    R90 = np.asarray([np.array(Image.open(os.path.join('Data/Images/R90/',str(ID).zfill(4) + '_R90.jpg'))) for ID in patient_IDs])

    ## Normalize (min-max)
    M = np.concatenate((front, L90, R90)).max()
    m = np.concatenate((front, L90, R90)).min()
    front = ((front - m) / (M - m)).astype('float32')
    L90 = ((L90 - m) / (M - m)).astype('float32')
    R90 = ((R90 - m) / (M - m)).astype('float32')

    ## Stack views along the channel dimension
    images = np.stack((front[:,:,:,0], L90[:,:,:,0], R90[:,:,:,0]), axis=-1)
    
    return images, labels

In [None]:
def objective_singleInput(params):
    ## Load data
    images, labels = data_singleInput()

    ## Initialize cross-validation
    epochs = 30
    batch_size = 8
    lr = params['lr']
    folds = 5
    seed = 13
    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=seed)

    ## Initialize the model
    model = create_model_singleInput(params, images.shape[1], images.shape[2])

    ## Save the initial weights of the model
    initial_weights = model.get_weights()

    ## Perform hyperparameter tuning
    fold_metrics = []
    for i, (train_idx, val_idx) in enumerate(skf.split(images, labels)):
        
        print(f"Fold {i+1}/{folds}")
        
        # Split the data into training and validation sets
        train_images, val_images = images[train_idx], images[val_idx]
        train_labels, val_labels = labels[train_idx], labels[val_idx]

        rate_train = sum(train_labels==0)/sum(train_labels)

        # Reset the model to initial weights
        model.set_weights(initial_weights)

        # Train the model
        optimizer = SGD(learning_rate=lr)
        model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['AUC'])

        history = model.fit(
            train_images, train_labels,
            class_weight={0: 1, 1: rate_train},
            validation_data=(val_images, val_labels),
            epochs=epochs,
            batch_size=batch_size,
            verbose=0  # Suppress training output
        )

        # Record the best validation AUC for this fold
        best_auc = max(history.history['val_AUC'])
        fold_metrics.append(best_auc)

        print()

    # Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

    # Return the negative mean AUC across all folds as the loss
    print(f'Max test ROC AUC in each fold: {fold_metrics}')
    mean_auc = np.mean(fold_metrics)
    return -mean_auc  # Maximize AUC by minimizing its negative

In [None]:
## Search space
space_singleInput = {
    'lr': hp.loguniform('lr', np.log(1e-5), np.log(1e-2)),

    'conv1_filters': hp.choice('conv1_filters', [32, 64]),
    'conv1_kernel': hp.choice('conv1_kernel', [(3, 3), (5, 5)]),
    'conv1_strides': hp.choice('conv1_strides', [1, 2]),
    'pool1_size': hp.choice('pool1_size', [2, 3]),
    'conv2_kernel': hp.choice('conv2_kernel', [(3, 3), (5, 5)]),
    'conv2_strides': hp.choice('conv2_strides', [1, 2]),
    'add_conv_3' : hp.choice('add_conv_3', [True, False]),
    'conv3_kernel': hp.choice('conv3_kernel', [(3, 3), (5, 5)]),
    'add_conv_4' : hp.choice('add_conv_4', [True, False]),
    'conv4_kernel': hp.choice('conv4_kernel', [(3, 3), (5, 5)]),
    'add_conv_5' : hp.choice('add_conv_5', [True, False]),
    'conv5_filters': hp.choice('conv5_filters', [64, 128]),
    'conv5_kernel': hp.choice('conv5_kernel', [(3, 3), (5, 5)]),
    'add_conv_6' : hp.choice('add_conv_6', [True, False]),
    'conv6_filters': hp.choice('conv6_filters', [64, 128]),
    'conv6_kernel': hp.choice('conv6_kernel', [(3, 3), (5, 5)]),
    'add_conv_7' : hp.choice('add_conv_7', [True, False]),
    'conv7_filters': hp.choice('conv7_filters', [64, 128]),
    'conv7_kernel': hp.choice('conv7_kernel', [(3, 3), (5, 5)]),
    'add_conv_8' : hp.choice('add_conv_8', [True, False]),
    'conv8_kernel': hp.choice('conv8_kernel', [(3, 3), (5, 5)]),
    'add_conv_9' : hp.choice('add_conv_9', [True, False]),
    'conv9_kernel': hp.choice('conv9_kernel', [(3, 3), (5, 5)]),
    'add_conv_10' : hp.choice('add_conv_10', [True, False]),
    'conv10_kernel': hp.choice('conv10_kernel', [(3, 3), (5, 5)]),
    'add_conv_11' : hp.choice('add_conv_11', [True, False]),
    'conv11_kernel': hp.choice('conv11_kernel', [(3, 3), (5, 5)]),
    'add_conv_12' : hp.choice('add_conv_12', [True, False]),
    'conv12_kernel': hp.choice('conv12_kernel', [(3, 3), (5, 5)]),
    'add_conv_13' : hp.choice('add_conv_13', [True, False]),
    'conv13_kernel': hp.choice('conv13_kernel', [(3, 3), (5, 5)]),
    
    'dense1_units': hp.choice('dense1_units', [256, 512]),
    'add_dense_2' : hp.choice('add_dense_2', [True, False]),
    'dense2_units': hp.choice('dense2_units', [64, 128]),
    'dense3_units': hp.choice('dense3_units', [32, 64]),
    'add_dense_4' : hp.choice('add_dense_4', [True, False]),
    'dense4_units': hp.choice('dense4_units', [32, 64])
}

In [None]:
## Run hyperparameter tuning
# Create trials object to store optimization results
trials = Trials()

# Run optimization
ti = time.time()
best_singleInput = fmin(
    fn=objective_singleInput,
    space=space_singleInput,
    algo=tpe.suggest,
    max_evals=max_evals,
    trials=trials
)
tuningTime = time.time() - ti

hours, remainder = divmod(tuningTime, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Hyperparameter tuning took {hours} hours, {minutes} minutes, and {seconds} seconds.')

print("Best hyperparameters:", space_eval(space, best_singleInput))

# Save the results
json_compatible_params = {
    key: (list(value) if isinstance(value, tuple) else value)
    for key, value in space_eval(space_singleInput, best_singleInput).items()
}
with open('Parameters_singleInputCNN.json', "w") as f:
    json.dump(json_compatible_params, f)

### Train N times

In [None]:
## Read tuned hyperparameters
with open("Parameters_singleInputCNN.json", "r") as f:
    params = json.load(f)
    
lr = params['lr']
params.pop('lr')

# Convert lists back to tuples if needed
params = {
    key: (tuple(value) if isinstance(value, list) else value)
    for key, value in params.items()
}

In [None]:
trials_results = []

In [None]:
# Initialize the splitter to split the dataset into 85% train and 15% tests sets N times 
splitter = StratifiedShuffleSplit(n_splits=N, test_size=int(round(0.15*len(labels))), random_state = seed)

In [None]:
for trial, (train_index, test_index) in enumerate(splitter.split(front, labels)):
    
    print(f'Trial {trial + 1}'), print()

    ## Split the dataset into train (85%) and test (15%) sets
    train_front = front[train_index]
    train_L90 = L90[train_index]
    train_R90 = R90[train_index]
    train_labels = labels[train_index]
    
    test_front = front[test_index]
    test_L90 = L90[test_index]
    test_R90 = R90[test_index]
    test_labels = labels[test_index]

    ## Stack views along the channel dimension
    images_train = np.stack((train_front[:,:,:,0], train_L90[:,:,:,0], train_R90[:,:,:,0]), axis=-1)
    images_test = np.stack((test_front[:,:,:,0], test_L90[:,:,:,0], test_R90[:,:,:,0]), axis=-1)

    ## Calculate the weight for training to address class imbalance
    rate_train = sum(train_labels == 0) / sum(train_labels)

    ## Initialize and compile the model
    model = create_model_singleInput(params, front.shape[1], front.shape[2])
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],
                  optimizer=SGD(learning_rate=lr, clipvalue=1.0))
    
    ## Train the model
    ti = time.time()
    history = model.fit(images_train, train_labels, class_weight = {0: 1, 1: rate_train}, 
                        validation_data = (images_test, test_labels),
                        batch_size=batch_size, 
                        epochs=epochs, 
                        #callbacks=[NaNCheckCallback()],
                        verbose=0
                       ) 
    train_time = time.time() - ti

    hours, remainder = divmod(train_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training took {hours} hours, {minutes} minutes, and {seconds} seconds.')

    ## Save learning curves
    plt.figure(figsize=(30, 10))
    plt.subplot(1, 3, 1)
    plt.plot(range(epochs), history.history['loss'], label='Train Loss')
    plt.plot(range(epochs), history.history['val_loss'], label='Test Loss')
    plt.legend(loc='upper right',fontsize=10)
    plt.title('Train and Test Loss',fontsize=12)
    
    plt.subplot(1, 3, 2)
    plt.plot(range(epochs), history.history['accuracy'], label='Training Accuracy')
    plt.plot(range(epochs), history.history['val_accuracy'], label='Validation Accuracy')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test Accuracy',fontsize=12)
    
    plt.subplot(1, 3, 3)
    plt.plot(range(epochs), history.history['AUC'], label='Train AUC')
    plt.plot(range(epochs), history.history['val_AUC'], label='Test AUC')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test ROC AUC',fontsize=12)
    
    plt.savefig('Learning_curves/singleInputCNN_'+str(trial+1)+'.png'), plt.show()
    plt.show()

    ## Save the model
    model.save('Models/singleInputCNN_'+str(trial+1)+'.h5')

    ## Print the number of parameters in the model
    total_params = model.count_params()  # total_params = np.sum([np.prod(v.shape.as_list()) for v in model.variables])
    trainable_params = np.sum([np.prod(v.shape.as_list()) for v in model.trainable_variables])
    print(f'Classifier has {total_params} total parameters, {trainable_params} of which are trainable.'), print()
    
    ## Predict
    train_preds = model.predict(images_train)
    np.save('Predictions/singleInputCNN_train_'+str(trial+1)+'.npy',train_preds)
    test_preds = model.predict(images_test)
    np.save('Predictions/singleInputCNN_test_'+str(trial+1)+'.npy',test_preds)

    ## Evaluate the model
    results_train = evaluate_model(train_preds, train_labels)
    print('TRAIN results:')
    for metric, value in results_train.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    results_test = evaluate_model(test_preds, test_labels)
    print('TEST results:')
    for metric, value in results_test.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    ## Store results
    trials_results.append(
        {'classifier':'singleInputCNN',
         'trial':trial+1,
         'parameters':trainable_params,
         'trainTime':train_time,
         **{'train_'+k:v for k,v in results_train.items()},
         **{'test_'+k:v for k,v in results_test.items()}
        }
    )    

    ## Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

    print(), print(100*'#'), print()

pd.DataFrame(trials_results).to_csv('CNN_Results_'+str(N)+'trials.csv')

## Multi-input

In [None]:
## Model definition
def create_model_multiInput(params, a=480, b=640):

    ## Front
    front_input = Input(shape=(a, b, 1))
    front = Conv2D(32, (3, 3), activation='relu')(front_input)
    front = Conv2D(64, kernel_size=params['front_conv1_kernel'], strides=params['front_conv1_strides'], activation='relu')(front)  # strides=2
    front = MaxPooling2D((2, 2))(front)
    front = Conv2D(params['front_conv2_filters'], kernel_size=params['front_conv2_kernel'], activation='relu')(front)  #256, 128
    front = Conv2D(params['front_conv3_filters'], kernel_size=params['front_conv3_kernel'], activation='relu')(front)  #64
    front = MaxPooling2D((2, 2))(front)
    front = Flatten()(front) 
    front = Dense(params['front_dense1_units'], activation='relu')(front)  #256, 512
    if params['front_add_dense2']:
        front = Dropout(0.5)(front)
        front = Dense(params['front_dense2_units'], activation='relu')(front)  #128, 256 
    
    ## L90
    L90_input = Input(shape=(a, b, 1))
    L90 = Conv2D(32, (3, 3), activation='relu')(L90_input)
    L90 = Conv2D(params['L90_conv1_filters'], kernel_size=params['L90_conv1_kernel'], activation='relu')(L90)  #64
    L90 = MaxPooling2D((2, 2))(L90)
    L90 = Conv2D(params['L90_conv2_filters'], kernel_size=params['L90_conv2_kernel'], strides=2, activation='relu')(L90)  #128
    #L90 = Conv2D(64, (5, 5), activation='relu')(L90) 
    if params['L90_add_conv3']:
        L90 = Conv2D(params['L90_conv3_filters'], kernel_size=params['L90_conv3_kernel'], activation='relu')(L90) 
    L90 = Conv2D(params['L90_conv4_filters'], kernel_size=params['L90_conv4_kernel'], activation='relu')(L90) # 64, (5, 5)
    L90 = MaxPooling2D((2, 2))(L90)
    L90 = Flatten()(L90)
    L90 = Dense(256, activation='relu')(L90)  
    #L90 = Dropout(0.5)(L90)
    #L90 = Dense(128, activation='relu')(L90)
    
    ## R90
    R90_input = Input(shape=(a, b, 1))
    R90 = Conv2D(32, (3, 3), activation='relu')(R90_input)
    R90 = Conv2D(params['R90_conv1_filters'], kernel_size=params['R90_conv1_kernel'], activation='relu')(R90)   
    R90 = MaxPooling2D((2, 2))(R90)
    R90 = Conv2D(params['R90_conv2_filters'], kernel_size=params['R90_conv2_kernel'], strides=2, activation='relu')(R90)  #128
    #R90 = Conv2D(64, (5, 5), activation='relu')(R90)  
    if params['R90_add_conv3']:
        R90 = Conv2D(params['R90_conv3_filters'], kernel_size=params['R90_conv3_kernel'], activation='relu')(R90) 
    R90 = Conv2D(params['R90_conv4_filters'], kernel_size=params['R90_conv4_kernel'], activation='relu')(R90)  # 64, (5, 5)
    R90 = MaxPooling2D((2, 2))(R90)
    R90 = Flatten()(R90)
    R90 = Dense(256, activation='relu')(R90)  
    #R90 = Dropout(0.5)(R90)
    #R90 = Dense(128, activation='relu')(R90)

    ## concatenate all the outputs of each stalk of the net.
    concatenated = concatenate([front, L90, R90], axis=-1)
    x = Dense(params['dense1_units'], activation='relu')(concatenated)  ## antes 512, 256
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Dropout(0.5)(x) 

    if params['add_dense2']:
        x = Dense(params['dense2_units'], use_bias=False)(x)  ## 64
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        #x = Dense(64, activation='relu')(x)  ## 64, 128
        #x = Dense(32, activation='relu')(x)
        x = Dropout(0.5)(x)   #new

    x = Dense(params['dense3_units'], activation='relu')(x)   # <-- batchnorm en esta capa funciona mal
    X = Dense(1, activation='sigmoid')(x)   
            
    ## create the model
    return Model([front_input, L90_input, R90_input], X)

### Tune hyperparameters

In [None]:
def data_multieInput():
    ## Read list of selected patients
    f = open('Data/selected_patients.txt', 'r')
    patient_IDs = []
    for x in f:
      patient_IDs.append(int(x.strip()))

    ## Load clinical data to get the diagnosis
    data = pd.read_csv('Data/clinical_data.csv', header=0, index_col=0, delimiter=';')
    data = data.loc[patient_IDs]  # Sort the clinical data DataFrame
    
    ## Generate binary labels
    diagnosis = data['vx-diagnosis']
    labels = np.zeros(len(patient_IDs))
    labels[diagnosis == 'Sick'] = 1

    ## Load images
    from PIL import Image
    front = np.asarray([np.array(Image.open(os.path.join('Data/Images/Frontal/',str(ID).zfill(4) + '_front.jpg'))) for ID in patient_IDs])
    L90 = np.asarray([np.array(Image.open(os.path.join('Data/Images/L90/',str(ID).zfill(4) + '_L90.jpg'))) for ID in patient_IDs])
    R90 = np.asarray([np.array(Image.open(os.path.join('Data/Images/R90/',str(ID).zfill(4) + '_R90.jpg'))) for ID in patient_IDs])

    ## Normalize (min-max)
    M = np.concatenate((front, L90, R90)).max()
    m = np.concatenate((front, L90, R90)).min()
    front = ((front - m) / (M - m)).astype('float32')
    L90 = ((L90 - m) / (M - m)).astype('float32')
    R90 = ((R90 - m) / (M - m)).astype('float32')

    ## Convert 3-channel to 1-channel images
    front = np.expand_dims(front[:,:,:,0], -1)
    L90 = np.expand_dims(L90[:,:,:,0], -1)
    R90 = np.expand_dims(R90[:,:,:,0], -1)
    
    return [front, L90, R90], labels

In [None]:
def objective_multiInput(params):
    ## Load data
    images, labels = data_multiInput()
    front, L90, R90 = images

    ## Initialize cross-validation
    epochs = 30
    batch_size = 8
    lr = params['lr']
    folds = 5
    seed = 13
    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=seed)

    ## Initialize the model
    model = create_model_multiInput(params, front.shape[1], front.shape[2])

    ## Save the initial weights of the model
    initial_weights = model.get_weights()

    ## Perform hyperparameter tuning
    fold_metrics = []
    for i, (train_idx, val_idx) in enumerate(skf.split(front, labels)):
        
        print(f"Fold {i+1}/{folds}")
        
        # Split the data into training and validation sets
        train_front, val_front = front[train_idx], front[val_idx]
        train_L90, val_L90 = L90[train_idx], L90[val_idx]
        train_R90, val_R90 = R90[train_idx], R90[val_idx]
        train_labels, val_labels = labels[train_idx], labels[val_idx]

        rate_train = sum(train_labels==0)/sum(train_labels)

        # Reset the model to initial weights
        model.set_weights(initial_weights)

        # Train the model
        optimizer = SGD(learning_rate=lr)
        model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['AUC'])

        history = model.fit(
            [train_front, train_L90, train_R90], train_labels,
            class_weight={0: 1, 1: rate_train},
            validation_data=([val_front, val_L90, val_R90], val_labels),
            epochs=epochs,
            batch_size=batch_size,
            #callbacks=[NaNCheckCallback()],
            verbose=0  # Suppress training output
        )

        # Record the best validation AUC for this fold
        best_auc = max(history.history['val_AUC'])
        fold_metrics.append(best_auc)

        print()

    # Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

    # Return the negative mean AUC across all folds as the loss
    print(f'Max test ROC AUC in each fold: {fold_metrics}')
    mean_auc = np.mean(fold_metrics)
    return -mean_auc  # Maximize AUC by minimizing its negative

In [None]:
## Search space
space_multiInput = {
    'lr': hp.loguniform('lr', np.log(1e-5), np.log(1e-2)),
    
    'front_conv1_kernel': hp.choice('front_conv1_kernel', [(3, 3), (5, 5)]),
    'front_conv1_strides': hp.choice('front_conv1_strides', [1, 2]),
    'front_conv2_filters': hp.choice('front_conv2_filters', [64, 128, 256]),
    'front_conv2_kernel': hp.choice('front_conv2_kernel', [(3, 3), (5, 5)]),
    'front_conv3_filters': hp.choice('front_conv3_filters', [64, 128]),
    'front_conv3_kernel': hp.choice('front_conv3_kernel', [(3, 3), (5, 5)]),
    'front_dense1_units': hp.choice('front_dense1_units', [256, 512]),
    'front_add_dense2' : hp.choice('front_add_dense2', [True, False]),
    'front_dense2_units': hp.choice('front_dense2_units', [128, 256]),

    'L90_conv1_filters': hp.choice('L90_conv1_filters', [32, 64]),
    'L90_conv1_kernel': hp.choice('L90_conv1_kernel', [(3, 3), (5, 5)]),
    'L90_conv2_filters': hp.choice('L90_conv2_filters', [64, 128]),
    'L90_conv2_kernel': hp.choice('L90_conv2_kernel', [(3, 3), (5, 5)]),
    'L90_add_conv3' : hp.choice('L90_add_conv3', [True, False]),
    'L90_conv3_filters': hp.choice('L90_conv3_filters', [64, 128]),
    'L90_conv3_kernel': hp.choice('L90_conv3_kernel', [(3, 3), (5, 5)]),
    'L90_conv4_filters': hp.choice('L90_conv4_filters', [64, 128]),
    'L90_conv4_kernel': hp.choice('L90_conv4_kernel', [(3, 3), (5, 5)]),

    'R90_conv1_filters': hp.choice('R90_conv1_filters', [32, 64]),
    'R90_conv1_kernel': hp.choice('R90_conv1_kernel', [(3, 3), (5, 5)]),
    'R90_conv2_filters': hp.choice('R90_conv2_filters', [64, 128]),
    'R90_conv2_kernel': hp.choice('R90_conv2_kernel', [(3, 3), (5, 5)]),
    'R90_add_conv3' : hp.choice('R90_add_conv3', [True, False]),
    'R90_conv3_filters': hp.choice('R90_conv3_filters', [64, 128]),
    'R90_conv3_kernel': hp.choice('R90_conv3_kernel', [(3, 3), (5, 5)]),
    'R90_conv4_filters': hp.choice('R90_conv4_filters', [64, 128]),
    'R90_conv4_kernel': hp.choice('R90_conv4_kernel', [(3, 3), (5, 5)]),

    'dense1_units': hp.choice('dense1_units', [256, 512]),
    'add_dense2' : hp.choice('add_dense2', [True, False]),
    'dense2_units': hp.choice('dense2_units', [64, 128]),
    'dense3_units': hp.choice('dense3_units', [32, 64])
}

In [None]:
## Run hyperparameter tuning
# Create trials object to store optimization results
trials = Trials()

# Run optimization
ti = time.time()
best_multiInput = fmin(
    fn=objective_multiInput,
    space=space_multiInput,
    algo=tpe.suggest,
    max_evals=max_evals,
    trials=trials
)
tuningTime = time.time() - ti

hours, remainder = divmod(tuningTime, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Hyperparameter tuning took {hours} hours, {minutes} minutes, and {seconds} seconds.')

print("Best hyperparameters:", space_eval(space, best_multiInput))

# Save the results
json_compatible_params = {
    key: (list(value) if isinstance(value, tuple) else value)
    for key, value in space_eval(space_multiInput, best_multiInput).items()
}
with open('Parameters_multiInputCNN.json', "w") as f:
    json.dump(json_compatible_params, f)

### Train N times

In [None]:
## Read tuned hyperparameters
with open("Parameters_multiInputCNN.json", "r") as f:
    params = json.load(f)
    
lr = params['lr']
params.pop('lr')

# Convert lists back to tuples if needed
params = {
    key: (tuple(value) if isinstance(value, list) else value)
    for key, value in params.items()
}

In [None]:
trials_results = pd.read_csv('CNN_Results_'+str(N)+'trials.csv', index_col = 0).to_dict(orient='records')

In [None]:
# Initialize the splitter to split the dataset into 85% train and 15% tests sets N times 
splitter = StratifiedShuffleSplit(n_splits=N, test_size=int(round(0.15*len(labels))), random_state = seed)

In [None]:
for trial, (train_index, test_index) in enumerate(splitter.split(front, labels)):
    
    print(f'Trial {trial + 1}'), print()

    ## Split the dataset into train (85%) and test (15%) sets
    train_front = front[train_index]
    train_L90 = L90[train_index]
    train_R90 = R90[train_index]
    train_labels = labels[train_index]
    
    test_front = front[test_index]
    test_L90 = L90[test_index]
    test_R90 = R90[test_index]
    test_labels = labels[test_index]

    ## Convert 3-channel to 1-channel images
    train_front = np.expand_dims(train_front[:,:,:,0], -1)
    train_L90 = np.expand_dims(train_L90[:,:,:,0], -1)
    train_R90 = np.expand_dims(train_R90[:,:,:,0], -1)
    
    test_front = np.expand_dims(test_front[:,:,:,0], -1)
    test_L90 = np.expand_dims(test_L90[:,:,:,0], -1)
    test_R90 = np.expand_dims(test_R90[:,:,:,0], -1)

    ## Calculate the weight for training to address class imbalance
    rate_train = sum(train_labels == 0) / sum(train_labels)

    ## Initialize and compile the model
    model = create_model_multiInput(params, front.shape[1], front.shape[2])
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],  
                  optimizer=SGD(learning_rate=lr, clipvalue=1.0))
    
    ## Train the model
    ti = time.time()
    history = model.fit([train_front, train_L90, train_R90], train_labels, class_weight = {0: 1, 1: rate_train}, 
                        validation_data = ([test_front, test_L90, test_R90], test_labels),
                        batch_size=batch_size, 
                        epochs=epochs, 
                        #callbacks=[NaNCheckCallback()],
                        verbose=0
                       ) 
    train_time = time.time() - ti

    hours, remainder = divmod(train_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training took {hours} hours, {minutes} minutes, and {seconds} seconds.')

    ## Save learning curves
    plt.figure(figsize=(30, 10))
    plt.subplot(1, 3, 1)
    plt.plot(range(epochs), history.history['loss'], label='Train Loss')
    plt.plot(range(epochs), history.history['val_loss'], label='Test Loss')
    plt.legend(loc='upper right',fontsize=10)
    plt.title('Train and Test Loss',fontsize=12)
    
    plt.subplot(1, 3, 2)
    plt.plot(range(epochs), history.history['accuracy'], label='Training Accuracy')
    plt.plot(range(epochs), history.history['val_accuracy'], label='Validation Accuracy')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test Accuracy',fontsize=12)
    
    plt.subplot(1, 3, 3)
    plt.plot(range(epochs), history.history['AUC'], label='Train AUC')
    plt.plot(range(epochs), history.history['val_AUC'], label='Test AUC')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test ROC AUC',fontsize=12)
    
    plt.savefig('Learning_curves/multiInputCNN_'+str(trial+1)+'.png'), plt.show()
    plt.show()

    ## Save the model
    model.save('Models/multiInputCNN_'+str(trial+1)+'.h5')

    ## Print the number of parameters in the model
    total_params = model.count_params()  # total_params = np.sum([np.prod(v.shape.as_list()) for v in model.variables])
    trainable_params = np.sum([np.prod(v.shape.as_list()) for v in model.trainable_variables])
    print(f'Classifier has {total_params} total parameters, {trainable_params} of which are trainable.'), print()
    
    ## Predict
    train_preds = model.predict([train_front, train_L90, train_R90])
    np.save('Predictions/multiInputCNN_train_'+str(trial+1)+'.npy',train_preds)
    test_preds = model.predict([test_front, test_L90, test_R90])
    np.save('Predictions/multiInputCNN_test_'+str(trial+1)+'.npy',test_preds)

    ## Evaluate the model
    results_train = evaluate_model(train_preds, train_labels)
    print('TRAIN results:')
    for metric, value in results_train.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    results_test = evaluate_model(test_preds, test_labels)
    print('TEST results:')
    for metric, value in results_test.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    ## Store results
    trials_results.append(
        {'classifier':'multiInputCNN',
         'trial':trial+1,
         'parameters':trainable_params,
         'trainTime':train_time,
         **{'train_'+k:v for k,v in results_train.items()},
         **{'test_'+k:v for k,v in results_test.items()}
        }
    )    

    ## Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

    print(), print(100*'#'), print()

pd.DataFrame(trials_results).to_csv('CNN_Results_'+str(N)+'trials.csv')

# Pre-trained CNNs

In [None]:
# Percentage of top layers to unfreeze during second step of fine-tuning
perc_unfreeze = 0.2
epochs_freeze = epochs//2

## Functions for hyperparameter tunning

In [None]:
## Hyperopt objective function
def objective_preTrainedCNN(params, baseModel_fn):

    ## Load data
    images, labels = data_singleInput()

    ## Initialize cross-validation
    epochs = 30
    epochs_freeze = epochs//2
    perc_unfreeze = 0.2
    lr_freeze = params['lr_freeze']
    lr_unfreeze = params['lr_unfreeze']
    batch_size = 8
    folds = 5
    seed = 13
    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=seed)

    ## Initialize the model
    # Load the base model
    baseModel = baseModel_fn(weights='imagenet', include_top=False, input_shape=(images.shape[1], images.shape[2], 3))

    # Initially freeze all layers of the base model
    for layer in baseModel.layers:
        layer.trainable = False

    # Build the full model
    model = Sequential()
    model.add(baseModel)
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))

    ## Save the initial weights of the model
    initial_weights = model.get_weights()

    ## Perform hyperparameter tuning
    fold_metrics = []
    for i, (train_idx, val_idx) in enumerate(skf.split(images, labels)):
        
        print(f"Fold {i+1}/{folds}")
        
        ## Split the data into training and validation sets
        train_images, val_images = images[train_idx], images[val_idx]
        train_labels, val_labels = labels[train_idx], labels[val_idx]

        rate_train = sum(train_labels==0)/sum(train_labels)

        ## Reset the model to initial weights
        model.set_weights(initial_weights)
        

        ## Step 1: Train only the fully connected layers
        optimizer = SGD(learning_rate=lr_freeze)
        model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['AUC'])

        # Train the model
        history = model.fit(
            train_images, train_labels,
            class_weight={0: 1, 1: rate_train},
            validation_data=(val_images, val_labels),
            epochs=epochs_freeze,
            batch_size=batch_size,
            #callbacks=[NaNCheckCallback()],
            verbose=0  # Suppress training output
        )

        # Print the best validation AUC for in this step
        best_auc_1 = max(history.history['val_AUC'])
        print(f'Max test ROC AUC after step 1 of fine-tuning: {best_auc_1}')
        print()
        

        ## Step 2: Unfreeze part of the base model
        total_layers = len(baseModel.layers)
        unfreeze_layers = int(total_layers * perc_unfreeze)

        for layer in baseModel.layers[total_layers - unfreeze_layers:]:
            layer.trainable = True

        # Recompile the model with a reduced learning rate
        optimizer = SGD(learning_rate=lr_unfreeze)
        model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['AUC'])

        # Train the model for fine-tuning
        history = model.fit(
            train_images, train_labels,
            class_weight={0: 1, 1: rate_train},
            validation_data=(val_images, val_labels),
            epochs=epochs-epochs_freeze,
            batch_size=batch_size,
            #callbacks=[NaNCheckCallback()],
            verbose=0
        )

        # Print the best validation AUC for in this step
        best_auc = max(history.history['val_AUC'])
        print(f'Max test ROC AUC after step 2 of fine-tuning: {best_auc}')
        fold_metrics.append(best_auc)

        print()        

    # Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

    # Return the negative mean AUC across all folds as the loss
    print(f'Max test ROC AUC in each fold: {fold_metrics}')
    mean_auc = np.mean(fold_metrics)
    return -mean_auc  # Maximize AUC by minimizing its negative


In [None]:
## Hyperopt search space
space_preTrainedCNN = {
    'lr_freeze': hp.loguniform('lr_freeze', np.log(1e-5), np.log(1e-2)),
    'lr_unfreeze': hp.loguniform('lr_unfreeze', np.log(1e-6), np.log(1e-3))
}

In [None]:
learning_rates_pt = {}

## VGG 16

In [None]:
from tensorflow.keras.applications import VGG16

### Tune hyperparameters

In [None]:
## Run hyperparameter tuning
# Objective function for the specific pre-trained CNN
objective_pt = lambda params: objective_preTrainedCNN(params, VGG16)

# Create trials object to store optimization results
trials = Trials()

# Run optimization
ti = time.time()
best = fmin(
    fn=objective_pt,
    space=space_preTrainedCNN,
    algo=tpe.suggest,
    max_evals=max_evals,
    trials = trials
)
tuningTime = time.time() - ti

hours, remainder = divmod(tuningTime, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Hyperparameter tuning took {hours} hours, {minutes} minutes, and {seconds} seconds.')

print("Best hyperparameters:", space_eval(space_preTrainedCNN, best))

learning_rates_pt['VGG16'] = {'lr_freeze' : best['lr_freeze'], 'lr_unfreeze' : best['lr_unfreeze']}

# Plot the parameter tuning process
lrs_freeze = np.asarray([trial['misc']['vals']['lr_freeze'][0] for trial in trials.trials])
lrs_unfreeze = np.asarray([trial['misc']['vals']['lr_unfreeze'][0] for trial in trials.trials])
losses = np.asarray([trial['result']['loss'] for trial in trials.trials])
    
# Plot the results
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(projection='3d')
ax.scatter(lrs_freeze, lrs_unfreeze, -losses, c=losses, cmap='viridis', label='Loss vs. Learning Rates')
ax.set_xlabel('Learning Rate for Step 1')
ax.set_ylabel('Learning Rate for Step 2')
ax.set_zlabel('Mean ROC AUC')
ax.set_title('Hyperparameter Tuning: ROC AUC vs. Learning Rate')
plt.grid(True)
plt.savefig('Parameter_tuning/LearningRate_tuning/VGG16.png'), plt.show()
plt.show()
    
# Save the results
pd.DataFrame(learning_rates_pt).to_csv('Parameters_preTrainedCNNs.csv')

### Train N times

In [None]:
learning_rates_pt = pd.read_csv('Parameters_preTrainedCNNs.csv', index_col = 0)
trials_results = pd.read_csv('CNN_Results_'+str(N)+'trials.csv', index_col = 0).to_dict(orient='records')

In [None]:
# Initialize the splitter to split the dataset into 85% train and 15% tests sets N times 
splitter = StratifiedShuffleSplit(n_splits=N, test_size=int(round(0.15*len(labels))), random_state = seed)

# Get the tuned learning rates
lr_freeze = learning_rates_pt['VGG16']['lr_freeze']
lr_unfreeze = learning_rates_pt['VGG16']['lr_unfreeze']
    
for trial, (train_index, test_index) in enumerate(splitter.split(front, labels)):
        
    print(f'Trial {trial + 1}'), print()

    ## Split the dataset into train (85%) and test (15%) sets
    train_front = front[train_index]
    train_L90 = L90[train_index]
    train_R90 = R90[train_index]
    train_labels = labels[train_index]
        
    test_front = front[test_index]
    test_L90 = L90[test_index]
    test_R90 = R90[test_index]
    test_labels = labels[test_index]
    
    ## Stack views along the channel dimension
    images_train = np.stack((train_front[:,:,:,0], train_L90[:,:,:,0], train_R90[:,:,:,0]), axis=-1)
    images_test = np.stack((test_front[:,:,:,0], test_L90[:,:,:,0], test_R90[:,:,:,0]), axis=-1)
    
    ## Calculate the weight for training to address class imbalance
    rate_train = sum(train_labels == 0) / sum(train_labels)
    
    ## Build the model
    # Load the base model
    baseModel = VGG16(weights='imagenet', include_top=False, input_shape=(images.shape[1], images.shape[2], 3))
    
    # Initially freeze all layers of the base model
    for layer in baseModel.layers:
        layer.trainable = False
    
    # Build the full model
    model = Sequential()
    model.add(baseModel)
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))


    ## Step 1: Train only the fully connected layers
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],   
                  optimizer=SGD(learning_rate=lr_freeze))

    # Train the model
    ti = time.time()
    history1 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0  # Suppress training output
    )

    # Print the best validation AUC for in this step
    best_auc = max(history1.history['val_AUC'])
    print(f'Max test ROC AUC after step 1 of fine-tuning: {best_auc}')
    print()
        

    ## Step 2: Unfreeze part of the base model
    total_layers = len(baseModel.layers)
    unfreeze_layers = int(total_layers * perc_unfreeze)

    for layer in baseModel.layers[total_layers - unfreeze_layers:]:
        layer.trainable = True

    # Recompile the model with a reduced learning rate
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],  
                  optimizer=SGD(learning_rate=lr_unfreeze))

    # Train the model for fine-tuning
    history2 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs-epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0
    )
        
    # Print the best validation AUC for in this step
    best_auc = max(history2.history['val_AUC'])
    print(f'Max test ROC AUC after step 2 of fine-tuning: {best_auc}')

    train_time = time.time() - ti
    hours, remainder = divmod(train_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training took {hours} hours, {minutes} minutes, and {seconds} seconds.')

    ## Save learning curves
    plt.figure(figsize=(30, 10))
    plt.subplot(1, 3, 1)
    plt.plot(range(epochs), history1.history['loss']+history2.history['loss'], label='Train Loss')
    plt.plot(range(epochs), history1.history['val_loss']+history2.history['val_loss'], label='Test Loss')
    plt.legend(loc='upper right',fontsize=10)
    plt.title('Train and Test Loss',fontsize=12)
    
    plt.subplot(1, 3, 2)
    plt.plot(range(epochs), history1.history['accuracy']+history2.history['accuracy'], label='Training Accuracy')
    plt.plot(range(epochs), history1.history['val_accuracy']+history2.history['val_accuracy'], label='Validation Accuracy')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test Accuracy',fontsize=12)
    
    plt.subplot(1, 3, 3)
    plt.plot(range(epochs), history1.history['AUC']+history2.history['AUC'], label='Train AUC')
    plt.plot(range(epochs), history1.history['val_AUC']+history2.history['val_AUC'], label='Test AUC')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test ROC AUC',fontsize=12)
    
    plt.savefig('Learning_curves/VGG16_'+str(trial+1)+'.png'), plt.show()
    plt.show()

    ## Save the model
    model.save('Models/VGG16_'+str(trial+1)+'.h5')

    ## Print the number of parameters in the model
    total_params = model.count_params()  # total_params = np.sum([np.prod(v.shape.as_list()) for v in model.variables])
    trainable_params = np.sum([np.prod(v.shape.as_list()) for v in model.trainable_variables])
    print(f'Classifier has {total_params} total parameters, {trainable_params} of which are trainable.'), print()

    
    ## Predict
    train_preds = model.predict(images_train)
    np.save('Predictions/VGG16_train_'+str(trial+1)+'.npy',train_preds)
    test_preds = model.predict(images_test)
    np.save('Predictions/VGG16_test_'+str(trial+1)+'.npy',test_preds)

    ## Evaluate the model
    results_train = evaluate_model(train_preds, train_labels)
    print('TRAIN results:')
    for metric, value in results_train.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    results_test = evaluate_model(test_preds, test_labels)
    print('TEST results:')
    for metric, value in results_test.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    ## Store results
    trials_results.append(
        {'classifier':'VGG16',
         'trial':trial+1,
         'parameters':trainable_params,
         'trainTime':train_time,
         **{'train_'+k:v for k,v in results_train.items()},
         **{'test_'+k:v for k,v in results_test.items()}
        }
    )    

    ## Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

pd.DataFrame(trials_results).to_csv('CNN_Results_'+str(N)+'trials.csv')

## DenseNet 121

In [None]:
from tensorflow.keras.applications import DenseNet121

### Tune hyperparameters

In [None]:
## Run hyperparameter tuning
# Objective function for the specific pre-trained CNN
objective_pt = lambda params: objective_preTrainedCNN(params, DenseNet121)

# Create trials object to store optimization results
trials = Trials()

# Run optimization
ti = time.time()
best = fmin(
    fn=objective_pt,
    space=space_preTrainedCNN,
    algo=tpe.suggest,
    max_evals=max_evals,
    trials = trials
)
tuningTime = time.time() - ti

hours, remainder = divmod(tuningTime, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Hyperparameter tuning took {hours} hours, {minutes} minutes, and {seconds} seconds.')

print("Best hyperparameters:", space_eval(space_preTrainedCNN, best))

learning_rates_pt['DenseNet121'] = {'lr_freeze' : best['lr_freeze'], 'lr_unfreeze' : best['lr_unfreeze']}

# Plot the parameter tuning process
lrs_freeze = np.asarray([trial['misc']['vals']['lr_freeze'][0] for trial in trials.trials])
lrs_unfreeze = np.asarray([trial['misc']['vals']['lr_unfreeze'][0] for trial in trials.trials])
losses = np.asarray([trial['result']['loss'] for trial in trials.trials])
    
# Plot the results
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(projection='3d')
ax.scatter(lrs_freeze, lrs_unfreeze, -losses, c=losses, cmap='viridis', label='Loss vs. Learning Rates')
ax.set_xlabel('Learning Rate for Step 1')
ax.set_ylabel('Learning Rate for Step 2')
ax.set_zlabel('Mean ROC AUC')
ax.set_title('Hyperparameter Tuning: ROC AUC vs. Learning Rate')
plt.grid(True)
plt.savefig('Parameter_tuning/LearningRate_tuning/DenseNet121.png'), plt.show()
plt.show()
    
# Save the results
pd.DataFrame(learning_rates_pt).to_csv('Parameters_preTrainedCNNs.csv')

### Train N times

In [None]:
learning_rates_pt = pd.read_csv('Parameters_preTrainedCNNs.csv', index_col = 0)
trials_results = pd.read_csv('CNN_Results_'+str(N)+'trials.csv', index_col = 0).to_dict(orient='records')

In [None]:
# Initialize the splitter to split the dataset into 85% train and 15% tests sets N times 
splitter = StratifiedShuffleSplit(n_splits=N, test_size=int(round(0.15*len(labels))), random_state = seed)

# Get the tuned learning rates
lr_freeze = learning_rates_pt['DenseNet121']['lr_freeze']
lr_unfreeze = learning_rates_pt['DenseNet121']['lr_unfreeze']
    
for trial, (train_index, test_index) in enumerate(splitter.split(front, labels)):
        
    print(f'Trial {trial + 1}'), print()

    ## Split the dataset into train (85%) and test (15%) sets
    train_front = front[train_index]
    train_L90 = L90[train_index]
    train_R90 = R90[train_index]
    train_labels = labels[train_index]
        
    test_front = front[test_index]
    test_L90 = L90[test_index]
    test_R90 = R90[test_index]
    test_labels = labels[test_index]
    
    ## Stack views along the channel dimension
    images_train = np.stack((train_front[:,:,:,0], train_L90[:,:,:,0], train_R90[:,:,:,0]), axis=-1)
    images_test = np.stack((test_front[:,:,:,0], test_L90[:,:,:,0], test_R90[:,:,:,0]), axis=-1)
    
    ## Calculate the weight for training to address class imbalance
    rate_train = sum(train_labels == 0) / sum(train_labels)
    
    ## Build the model
    # Load the base model
    baseModel = DenseNet121(weights='imagenet', include_top=False, input_shape=(images.shape[1], images.shape[2], 3))
    
    # Initially freeze all layers of the base model
    for layer in baseModel.layers:
        layer.trainable = False
    
    # Build the full model
    model = Sequential()
    model.add(baseModel)
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))


    ## Step 1: Train only the fully connected layers
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],
                  optimizer=SGD(learning_rate=lr_freeze))

    # Train the model
    ti = time.time()
    history1 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0  # Suppress training output
    )

    # Print the best validation AUC for in this step
    best_auc = max(history1.history['val_AUC'])
    print(f'Max test ROC AUC after step 1 of fine-tuning: {best_auc}')
    print()
        

    ## Step 2: Unfreeze part of the base model
    total_layers = len(baseModel.layers)
    unfreeze_layers = int(total_layers * perc_unfreeze)

    for layer in baseModel.layers[total_layers - unfreeze_layers:]:
        layer.trainable = True

    # Recompile the model with a reduced learning rate
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],   
                  optimizer=SGD(learning_rate=lr_unfreeze))

    # Train the model for fine-tuning
    history2 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs-epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0
    )
        
    # Print the best validation AUC for in this step
    best_auc = max(history2.history['val_AUC'])
    print(f'Max test ROC AUC after step 2 of fine-tuning: {best_auc}')

    train_time = time.time() - ti
    hours, remainder = divmod(train_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training took {hours} hours, {minutes} minutes, and {seconds} seconds.')

    ## Save learning curves
    plt.figure(figsize=(30, 10))
    plt.subplot(1, 3, 1)
    plt.plot(range(epochs), history1.history['loss']+history2.history['loss'], label='Train Loss')
    plt.plot(range(epochs), history1.history['val_loss']+history2.history['val_loss'], label='Test Loss')
    plt.legend(loc='upper right',fontsize=10)
    plt.title('Train and Test Loss',fontsize=12)
    
    plt.subplot(1, 3, 2)
    plt.plot(range(epochs), history1.history['accuracy']+history2.history['accuracy'], label='Training Accuracy')
    plt.plot(range(epochs), history1.history['val_accuracy']+history2.history['val_accuracy'], label='Validation Accuracy')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test Accuracy',fontsize=12)
    
    plt.subplot(1, 3, 3)
    plt.plot(range(epochs), history1.history['AUC']+history2.history['AUC'], label='Train AUC')
    plt.plot(range(epochs), history1.history['val_AUC']+history2.history['val_AUC'], label='Test AUC')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test ROC AUC',fontsize=12)
    
    plt.savefig('Learning_curves/DenseNet121_'+str(trial+1)+'.png'), plt.show()
    plt.show()

    ## Save the model
    model.save('Models/DenseNet121_'+str(trial+1)+'.h5')

    ## Print the number of parameters in the model
    total_params = model.count_params()  # total_params = np.sum([np.prod(v.shape.as_list()) for v in model.variables])
    trainable_params = np.sum([np.prod(v.shape.as_list()) for v in model.trainable_variables])
    print(f'Classifier has {total_params} total parameters, {trainable_params} of which are trainable.'), print()

    
    ## Predict
    train_preds = model.predict(images_train)
    np.save('Predictions/DenseNet121_train_'+str(trial+1)+'.npy',train_preds)
    test_preds = model.predict(images_test)
    np.save('Predictions/DenseNet121_test_'+str(trial+1)+'.npy',test_preds)

    ## Evaluate the model
    results_train = evaluate_model(train_preds, train_labels)
    print('TRAIN results:')
    for metric, value in results_train.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    results_test = evaluate_model(test_preds, test_labels)
    print('TEST results:')
    for metric, value in results_test.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    ## Store results
    trials_results.append(
        {'classifier':'DenseNet121',
         'trial':trial+1,
         'parameters':trainable_params,
         'trainTime':train_time,
         **{'train_'+k:v for k,v in results_train.items()},
         **{'test_'+k:v for k,v in results_test.items()}
        }
    )    

    ## Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

pd.DataFrame(trials_results).to_csv('CNN_Results_'+str(N)+'trials.csv')

## ResNet 50

In [None]:
from tensorflow.keras.applications import ResNet50

### Tune hyperparameters

In [None]:
## Run hyperparameter tuning
# Objective function for the specific pre-trained CNN
objective_pt = lambda params: objective_preTrainedCNN(params, ResNet50)

# Create trials object to store optimization results
trials = Trials()

# Run optimization
ti = time.time()
best = fmin(
    fn=objective_pt,
    space=space_preTrainedCNN,
    algo=tpe.suggest,
    max_evals=max_evals,
    trials = trials
)
tuningTime = time.time() - ti

hours, remainder = divmod(tuningTime, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Hyperparameter tuning took {hours} hours, {minutes} minutes, and {seconds} seconds.')

print("Best hyperparameters:", space_eval(space_preTrainedCNN, best))

learning_rates_pt['ResNet50'] = {'lr_freeze' : best['lr_freeze'], 'lr_unfreeze' : best['lr_unfreeze']}

# Plot the parameter tuning process
lrs_freeze = np.asarray([trial['misc']['vals']['lr_freeze'][0] for trial in trials.trials])
lrs_unfreeze = np.asarray([trial['misc']['vals']['lr_unfreeze'][0] for trial in trials.trials])
losses = np.asarray([trial['result']['loss'] for trial in trials.trials])
    
# Plot the results
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(projection='3d')
ax.scatter(lrs_freeze, lrs_unfreeze, -losses, c=losses, cmap='viridis', label='Loss vs. Learning Rates')
ax.set_xlabel('Learning Rate for Step 1')
ax.set_ylabel('Learning Rate for Step 2')
ax.set_zlabel('Mean ROC AUC')
ax.set_title('Hyperparameter Tuning: ROC AUC vs. Learning Rate')
plt.grid(True)
plt.savefig('Parameter_tuning/LearningRate_tuning/ResNet50.png'), plt.show()
plt.show()
    
# Save the results
pd.DataFrame(learning_rates_pt).to_csv('Parameters_preTrainedCNNs.csv')

### Train N times

In [None]:
learning_rates_pt = pd.read_csv('Parameters_preTrainedCNNs.csv', index_col = 0)
trials_results = pd.read_csv('CNN_Results_'+str(N)+'trials.csv', index_col = 0).to_dict(orient='records')

In [None]:
# Initialize the splitter to split the dataset into 85% train and 15% tests sets N times 
splitter = StratifiedShuffleSplit(n_splits=N, test_size=int(round(0.15*len(labels))), random_state = seed)

# Get the tuned learning rates
lr_freeze = learning_rates_pt['ResNet50']['lr_freeze']
lr_unfreeze = learning_rates_pt['ResNet50']['lr_unfreeze']
    
for trial, (train_index, test_index) in enumerate(splitter.split(front, labels)):
        
    print(f'Trial {trial + 1}'), print()

    ## Split the dataset into train (85%) and test (15%) sets
    train_front = front[train_index]
    train_L90 = L90[train_index]
    train_R90 = R90[train_index]
    train_labels = labels[train_index]
        
    test_front = front[test_index]
    test_L90 = L90[test_index]
    test_R90 = R90[test_index]
    test_labels = labels[test_index]
    
    ## Stack views along the channel dimension
    images_train = np.stack((train_front[:,:,:,0], train_L90[:,:,:,0], train_R90[:,:,:,0]), axis=-1)
    images_test = np.stack((test_front[:,:,:,0], test_L90[:,:,:,0], test_R90[:,:,:,0]), axis=-1)
    
    ## Calculate the weight for training to address class imbalance
    rate_train = sum(train_labels == 0) / sum(train_labels)
    
    ## Build the model
    # Load the base model
    baseModel = ResNet50(weights='imagenet', include_top=False, input_shape=(images.shape[1], images.shape[2], 3))
    
    # Initially freeze all layers of the base model
    for layer in baseModel.layers:
        layer.trainable = False
    
    # Build the full model
    model = Sequential()
    model.add(baseModel)
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))


    ## Step 1: Train only the fully connected layers
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],
                  optimizer=SGD(learning_rate=lr_freeze))

    # Train the model
    ti = time.time()
    history1 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0  # Suppress training output
    )

    # Print the best validation AUC for in this step
    best_auc = max(history1.history['val_AUC'])
    print(f'Max test ROC AUC after step 1 of fine-tuning: {best_auc}')
    print()
        

    ## Step 2: Unfreeze part of the base model
    total_layers = len(baseModel.layers)
    unfreeze_layers = int(total_layers * perc_unfreeze)

    for layer in baseModel.layers[total_layers - unfreeze_layers:]:
        layer.trainable = True

    # Recompile the model with a reduced learning rate
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],
                  optimizer=SGD(learning_rate=lr_unfreeze))

    # Train the model for fine-tuning
    history2 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs-epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0
    )
        
    # Print the best validation AUC for in this step
    best_auc = max(history2.history['val_AUC'])
    print(f'Max test ROC AUC after step 2 of fine-tuning: {best_auc}')

    train_time = time.time() - ti
    hours, remainder = divmod(train_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training took {hours} hours, {minutes} minutes, and {seconds} seconds.')

    ## Save learning curves
    plt.figure(figsize=(30, 10))
    plt.subplot(1, 3, 1)
    plt.plot(range(epochs), history1.history['loss']+history2.history['loss'], label='Train Loss')
    plt.plot(range(epochs), history1.history['val_loss']+history2.history['val_loss'], label='Test Loss')
    plt.legend(loc='upper right',fontsize=10)
    plt.title('Train and Test Loss',fontsize=12)
    
    plt.subplot(1, 3, 2)
    plt.plot(range(epochs), history1.history['accuracy']+history2.history['accuracy'], label='Training Accuracy')
    plt.plot(range(epochs), history1.history['val_accuracy']+history2.history['val_accuracy'], label='Validation Accuracy')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test Accuracy',fontsize=12)
    
    plt.subplot(1, 3, 3)
    plt.plot(range(epochs), history1.history['AUC']+history2.history['AUC'], label='Train AUC')
    plt.plot(range(epochs), history1.history['val_AUC']+history2.history['val_AUC'], label='Test AUC')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test ROC AUC',fontsize=12)
    
    plt.savefig('Learning_curves/ResNet50_'+str(trial+1)+'.png'), plt.show()
    plt.show()

    ## Save the model
    model.save('Models/ResNet50_'+str(trial+1)+'.h5')

    ## Print the number of parameters in the model
    total_params = model.count_params()  # total_params = np.sum([np.prod(v.shape.as_list()) for v in model.variables])
    trainable_params = np.sum([np.prod(v.shape.as_list()) for v in model.trainable_variables])
    print(f'Classifier has {total_params} total parameters, {trainable_params} of which are trainable.'), print()

    
    ## Predict
    train_preds = model.predict(images_train)
    np.save('Predictions/ResNet50_train_'+str(trial+1)+'.npy',train_preds)
    test_preds = model.predict(images_test)
    np.save('Predictions/ResNet50_test_'+str(trial+1)+'.npy',test_preds)

    ## Evaluate the model
    results_train = evaluate_model(train_preds, train_labels)
    print('TRAIN results:')
    for metric, value in results_train.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    results_test = evaluate_model(test_preds, test_labels)
    print('TEST results:')
    for metric, value in results_test.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    ## Store results
    trials_results.append(
        {'classifier':'ResNet50',
         'trial':trial+1,
         'parameters':trainable_params,
         'trainTime':train_time,
         **{'train_'+k:v for k,v in results_train.items()},
         **{'test_'+k:v for k,v in results_test.items()}
        }
    )    

    ## Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

pd.DataFrame(trials_results).to_csv('CNN_Results_'+str(N)+'trials.csv')

## MobileNet V3

In [None]:
from tensorflow.keras.applications import MobileNet

### Tune hyperparameters

In [None]:
## Run hyperparameter tuning
# Objective function for the specific pre-trained CNN
objective_pt = lambda params: objective_preTrainedCNN(params, MobileNet)

# Create trials object to store optimization results
trials = Trials()

# Run optimization
ti = time.time()
best = fmin(
    fn=objective_pt,
    space=space_preTrainedCNN,
    algo=tpe.suggest,
    max_evals=max_evals,
    trials = trials
)
tuningTime = time.time() - ti

hours, remainder = divmod(tuningTime, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Hyperparameter tuning took {hours} hours, {minutes} minutes, and {seconds} seconds.')

print("Best hyperparameters:", space_eval(space_preTrainedCNN, best))

learning_rates_pt['MobileNet'] = {'lr_freeze' : best['lr_freeze'], 'lr_unfreeze' : best['lr_unfreeze']}

# Plot the parameter tuning process
lrs_freeze = np.asarray([trial['misc']['vals']['lr_freeze'][0] for trial in trials.trials])
lrs_unfreeze = np.asarray([trial['misc']['vals']['lr_unfreeze'][0] for trial in trials.trials])
losses = np.asarray([trial['result']['loss'] for trial in trials.trials])
    
# Plot the results
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(projection='3d')
ax.scatter(lrs_freeze, lrs_unfreeze, -losses, c=losses, cmap='viridis', label='Loss vs. Learning Rates')
ax.set_xlabel('Learning Rate for Step 1')
ax.set_ylabel('Learning Rate for Step 2')
ax.set_zlabel('Mean ROC AUC')
ax.set_title('Hyperparameter Tuning: ROC AUC vs. Learning Rate')
plt.grid(True)
plt.savefig('Parameter_tuning/LearningRate_tuning/MobileNet.png'), plt.show()
plt.show()
    
# Save the results
pd.DataFrame(learning_rates_pt).to_csv('Parameters_preTrainedCNNs.csv')

### Train N times

In [None]:
learning_rates_pt = pd.read_csv('Parameters_preTrainedCNNs.csv', index_col = 0)
trials_results = pd.read_csv('CNN_Results_'+str(N)+'trials.csv', index_col = 0).to_dict(orient='records')

In [None]:
# Initialize the splitter to split the dataset into 85% train and 15% tests sets N times 
splitter = StratifiedShuffleSplit(n_splits=N, test_size=int(round(0.15*len(labels))), random_state = seed)

# Get the tuned learning rates
lr_freeze = learning_rates_pt['MobileNet']['lr_freeze']
lr_unfreeze = learning_rates_pt['MobileNet']['lr_unfreeze']
    
for trial, (train_index, test_index) in enumerate(splitter.split(front, labels)):
        
    print(f'Trial {trial + 1}'), print()

    ## Split the dataset into train (85%) and test (15%) sets
    train_front = front[train_index]
    train_L90 = L90[train_index]
    train_R90 = R90[train_index]
    train_labels = labels[train_index]
        
    test_front = front[test_index]
    test_L90 = L90[test_index]
    test_R90 = R90[test_index]
    test_labels = labels[test_index]
    
    ## Stack views along the channel dimension
    images_train = np.stack((train_front[:,:,:,0], train_L90[:,:,:,0], train_R90[:,:,:,0]), axis=-1)
    images_test = np.stack((test_front[:,:,:,0], test_L90[:,:,:,0], test_R90[:,:,:,0]), axis=-1)
    
    ## Calculate the weight for training to address class imbalance
    rate_train = sum(train_labels == 0) / sum(train_labels)
    
    ## Build the model
    # Load the base model
    baseModel = MobileNet(weights='imagenet', include_top=False, input_shape=(images.shape[1], images.shape[2], 3))
    
    # Initially freeze all layers of the base model
    for layer in baseModel.layers:
        layer.trainable = False
    
    # Build the full model
    model = Sequential()
    model.add(baseModel)
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))


    ## Step 1: Train only the fully connected layers
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error], 
                  optimizer=SGD(learning_rate=lr_freeze))

    # Train the model
    ti = time.time()
    history1 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0  # Suppress training output
    )

    # Print the best validation AUC for in this step
    best_auc = max(history1.history['val_AUC'])
    print(f'Max test ROC AUC after step 1 of fine-tuning: {best_auc}')
    print()
        

    ## Step 2: Unfreeze part of the base model
    total_layers = len(baseModel.layers)
    unfreeze_layers = int(total_layers * perc_unfreeze)

    for layer in baseModel.layers[total_layers - unfreeze_layers:]:
        layer.trainable = True

    # Recompile the model with a reduced learning rate
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],  
                  optimizer=SGD(learning_rate=lr_unfreeze))

    # Train the model for fine-tuning
    history2 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs-epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0
    )
        
    # Print the best validation AUC for in this step
    best_auc = max(history2.history['val_AUC'])
    print(f'Max test ROC AUC after step 2 of fine-tuning: {best_auc}')

    train_time = time.time() - ti
    hours, remainder = divmod(train_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training took {hours} hours, {minutes} minutes, and {seconds} seconds.')

    ## Save learning curves
    plt.figure(figsize=(30, 10))
    plt.subplot(1, 3, 1)
    plt.plot(range(epochs), history1.history['loss']+history2.history['loss'], label='Train Loss')
    plt.plot(range(epochs), history1.history['val_loss']+history2.history['val_loss'], label='Test Loss')
    plt.legend(loc='upper right',fontsize=10)
    plt.title('Train and Test Loss',fontsize=12)
    
    plt.subplot(1, 3, 2)
    plt.plot(range(epochs), history1.history['accuracy']+history2.history['accuracy'], label='Training Accuracy')
    plt.plot(range(epochs), history1.history['val_accuracy']+history2.history['val_accuracy'], label='Validation Accuracy')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test Accuracy',fontsize=12)
    
    plt.subplot(1, 3, 3)
    plt.plot(range(epochs), history1.history['AUC']+history2.history['AUC'], label='Train AUC')
    plt.plot(range(epochs), history1.history['val_AUC']+history2.history['val_AUC'], label='Test AUC')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test ROC AUC',fontsize=12)
    
    plt.savefig('Learning_curves/MobileNet_'+str(trial+1)+'.png'), plt.show()
    plt.show()

    ## Save the model
    model.save('Models/MobileNet_'+str(trial+1)+'.h5')

    ## Print the number of parameters in the model
    total_params = model.count_params()  # total_params = np.sum([np.prod(v.shape.as_list()) for v in model.variables])
    trainable_params = np.sum([np.prod(v.shape.as_list()) for v in model.trainable_variables])
    print(f'Classifier has {total_params} total parameters, {trainable_params} of which are trainable.'), print()

    
    ## Predict
    train_preds = model.predict(images_train)
    np.save('Predictions/MobileNet_train_'+str(trial+1)+'.npy',train_preds)
    test_preds = model.predict(images_test)
    np.save('Predictions/MobileNet_test_'+str(trial+1)+'.npy',test_preds)

    ## Evaluate the model
    results_train = evaluate_model(train_preds, train_labels)
    print('TRAIN results:')
    for metric, value in results_train.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    results_test = evaluate_model(test_preds, test_labels)
    print('TEST results:')
    for metric, value in results_test.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    ## Store results
    trials_results.append(
        {'classifier':'MobileNet',
         'trial':trial+1,
         'parameters':trainable_params,
         'trainTime':train_time,
         **{'train_'+k:v for k,v in results_train.items()},
         **{'test_'+k:v for k,v in results_test.items()}
        }
    )    

    ## Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

pd.DataFrame(trials_results).to_csv('CNN_Results_'+str(N)+'trials.csv')

## Inception V3

In [None]:
from tensorflow.keras.applications import InceptionV3

### Tune hyperparameters

In [None]:
## Run hyperparameter tuning
# Objective function for the specific pre-trained CNN
objective_pt = lambda params: objective_preTrainedCNN(params, InceptionV3)

# Create trials object to store optimization results
trials = Trials()

# Run optimization
ti = time.time()
best = fmin(
    fn=objective_pt,
    space=space_preTrainedCNN,
    algo=tpe.suggest,
    max_evals=max_evals,
    trials = trials
)
tuningTime = time.time() - ti

hours, remainder = divmod(tuningTime, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Hyperparameter tuning took {hours} hours, {minutes} minutes, and {seconds} seconds.')

print("Best hyperparameters:", space_eval(space_preTrainedCNN, best))

learning_rates_pt['InceptionV3'] = {'lr_freeze' : best['lr_freeze'], 'lr_unfreeze' : best['lr_unfreeze']}

# Plot the parameter tuning process
lrs_freeze = np.asarray([trial['misc']['vals']['lr_freeze'][0] for trial in trials.trials])
lrs_unfreeze = np.asarray([trial['misc']['vals']['lr_unfreeze'][0] for trial in trials.trials])
losses = np.asarray([trial['result']['loss'] for trial in trials.trials])
    
# Plot the results
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(projection='3d')
ax.scatter(lrs_freeze, lrs_unfreeze, -losses, c=losses, cmap='viridis', label='Loss vs. Learning Rates')
ax.set_xlabel('Learning Rate for Step 1')
ax.set_ylabel('Learning Rate for Step 2')
ax.set_zlabel('Mean ROC AUC')
ax.set_title('Hyperparameter Tuning: ROC AUC vs. Learning Rate')
plt.grid(True)
plt.savefig('Parameter_tuning/LearningRate_tuning/InceptionV3.png'), plt.show()
plt.show()
    
# Save the results
pd.DataFrame(learning_rates_pt).to_csv('Parameters_preTrainedCNNs.csv')

### Train N times

In [None]:
learning_rates_pt = pd.read_csv('Parameters_preTrainedCNNs.csv', index_col = 0)
trials_results = pd.read_csv('CNN_Results_'+str(N)+'trials.csv', index_col = 0).to_dict(orient='records')

In [None]:
# Initialize the splitter to split the dataset into 85% train and 15% tests sets N times 
splitter = StratifiedShuffleSplit(n_splits=N, test_size=int(round(0.15*len(labels))), random_state = seed)

# Get the tuned learning rates
lr_freeze = learning_rates_pt['InceptionV3']['lr_freeze']
lr_unfreeze = learning_rates_pt['InceptionV3']['lr_unfreeze']
    
for trial, (train_index, test_index) in enumerate(splitter.split(front, labels)):
        
    print(f'Trial {trial + 1}'), print()

    ## Split the dataset into train (85%) and test (15%) sets
    train_front = front[train_index]
    train_L90 = L90[train_index]
    train_R90 = R90[train_index]
    train_labels = labels[train_index]
        
    test_front = front[test_index]
    test_L90 = L90[test_index]
    test_R90 = R90[test_index]
    test_labels = labels[test_index]
    
    ## Stack views along the channel dimension
    images_train = np.stack((train_front[:,:,:,0], train_L90[:,:,:,0], train_R90[:,:,:,0]), axis=-1)
    images_test = np.stack((test_front[:,:,:,0], test_L90[:,:,:,0], test_R90[:,:,:,0]), axis=-1)
    
    ## Calculate the weight for training to address class imbalance
    rate_train = sum(train_labels == 0) / sum(train_labels)
    
    ## Build the model
    # Load the base model
    baseModel = InceptionV3(weights='imagenet', include_top=False, input_shape=(images.shape[1], images.shape[2], 3))
    
    # Initially freeze all layers of the base model
    for layer in baseModel.layers:
        layer.trainable = False
    
    # Build the full model
    model = Sequential()
    model.add(baseModel)
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))


    ## Step 1: Train only the fully connected layers
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],
                  optimizer=SGD(learning_rate=lr_freeze))

    # Train the model
    ti = time.time()
    history1 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0  # Suppress training output
    )

    # Print the best validation AUC for in this step
    best_auc = max(history1.history['val_AUC'])
    print(f'Max test ROC AUC after step 1 of fine-tuning: {best_auc}')
    print()
        

    ## Step 2: Unfreeze part of the base model
    total_layers = len(baseModel.layers)
    unfreeze_layers = int(total_layers * perc_unfreeze)

    for layer in baseModel.layers[total_layers - unfreeze_layers:]:
        layer.trainable = True

    # Recompile the model with a reduced learning rate
    model.compile(loss='binary_crossentropy',
                  metrics=['accuracy', 'AUC', 'TruePositives', 'FalseNegatives', 'TrueNegatives', 'FalsePositives', 'Precision', 'Recall', weighted_error],  
                  optimizer=SGD(learning_rate=lr_unfreeze))

    # Train the model for fine-tuning
    history2 = model.fit(
        train_images, train_labels,
        class_weight={0: 1, 1: rate_train},
        validation_data=(val_images, val_labels),
        epochs=epochs-epochs_freeze,
        batch_size=batch_size,
        #callbacks=[NaNCheckCallback()],
        verbose=0
    )
        
    # Print the best validation AUC for in this step
    best_auc = max(history2.history['val_AUC'])
    print(f'Max test ROC AUC after step 2 of fine-tuning: {best_auc}')

    train_time = time.time() - ti
    hours, remainder = divmod(train_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training took {hours} hours, {minutes} minutes, and {seconds} seconds.')

    ## Save learning curves
    plt.figure(figsize=(30, 10))
    plt.subplot(1, 3, 1)
    plt.plot(range(epochs), history1.history['loss']+history2.history['loss'], label='Train Loss')
    plt.plot(range(epochs), history1.history['val_loss']+history2.history['val_loss'], label='Test Loss')
    plt.legend(loc='upper right',fontsize=10)
    plt.title('Train and Test Loss',fontsize=12)
    
    plt.subplot(1, 3, 2)
    plt.plot(range(epochs), history1.history['accuracy']+history2.history['accuracy'], label='Training Accuracy')
    plt.plot(range(epochs), history1.history['val_accuracy']+history2.history['val_accuracy'], label='Validation Accuracy')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test Accuracy',fontsize=12)
    
    plt.subplot(1, 3, 3)
    plt.plot(range(epochs), history1.history['AUC']+history2.history['AUC'], label='Train AUC')
    plt.plot(range(epochs), history1.history['val_AUC']+history2.history['val_AUC'], label='Test AUC')
    plt.legend(loc='lower right',fontsize=10)
    plt.title('Train and Test ROC AUC',fontsize=12)
    
    plt.savefig('Learning_curves/InceptionV3_'+str(trial+1)+'.png'), plt.show()
    plt.show()

    ## Save the model
    model.save('Models/InceptionV3_'+str(trial+1)+'.h5')

    ## Print the number of parameters in the model
    total_params = model.count_params()  # total_params = np.sum([np.prod(v.shape.as_list()) for v in model.variables])
    trainable_params = np.sum([np.prod(v.shape.as_list()) for v in model.trainable_variables])
    print(f'Classifier has {total_params} total parameters, {trainable_params} of which are trainable.'), print()

    
    ## Predict
    train_preds = model.predict(images_train)
    np.save('Predictions/InceptionV3_train_'+str(trial+1)+'.npy',train_preds)
    test_preds = model.predict(images_test)
    np.save('Predictions/InceptionV3_test_'+str(trial+1)+'.npy',test_preds)

    ## Evaluate the model
    results_train = evaluate_model(train_preds, train_labels)
    print('TRAIN results:')
    for metric, value in results_train.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    results_test = evaluate_model(test_preds, test_labels)
    print('TEST results:')
    for metric, value in results_test.items():
        print(f'{metric}: {value:.4f}' if isinstance(value, (float, int)) else f'{metric}: {value}')
    print()

    ## Store results
    trials_results.append(
        {'classifier':'InceptionV3',
         'trial':trial+1,
         'parameters':trainable_params,
         'trainTime':train_time,
         **{'train_'+k:v for k,v in results_train.items()},
         **{'test_'+k:v for k,v in results_test.items()}
        }
    )    

    ## Clear TensorFlow session to release GPU memory
    tf.keras.backend.clear_session()
    del model
    gc.collect()

pd.DataFrame(trials_results).to_csv('CNN_Results_'+str(N)+'trials.csv')

# Compare models

In [None]:
trials_results = pd.read_csv('CNN_Results_'+str(N)+'trials.csv', index_col = 0)
trials_results.fillna(1e-10, inplace=True)

In [None]:
## Print statistics
models = trials_results.classifier.unique()
metrics = [c for c in trials_results.columns if 'test_' in c and c not in ['test_TP','test_FP','test_TN','test_FN']]
    
statistics = pd.DataFrame(index=models, columns=[item for sublist in [[metric+'_mean', metric+'_std'] for metric in metrics] for item in sublist])
    
for metric in metrics:
    mn, st = metric+'_mean', metric+'_std'
    for model in models:
        results = trials_results[trials_results['classifier']==model][metric].values
        statistics.at[model,mn] = results.mean()
        statistics.at[model,st] = results.std()

statistics

In [None]:
for metric in [m for m in metrics if 'test' in m]:
    mn, st = metric+'_mean', metric+'_std'
    if 'Loss' in metric or 'WE' in metric:
        model_best = pd.to_numeric(statistics[metric+'_mean']).idxmin()
        print(f'Model with lowest {metric} is {model_best} with value {statistics.loc[model_best,mn]} and standard deviation {statistics.loc[model_best,st]}')
    else:
        model_best = pd.to_numeric(statistics[metric+'_mean']).idxmax()
        print(f'Model with highest {metric} is {model_best} with value {statistics.loc[model_best,mn]} and standard deviation {statistics.loc[model_best,st]}')

In [None]:
## Print mean and std metrics for each model
for classifier_type in trials_results.classifier.unique():
    print(f'Classifier: {classifier_type}')
    results = trials_results[trials_results['classifier'] == classifier_type]

    # Number of parameters
    parameters = results['parameters'].values
    print(f'Mean number of parameters: {parameters.mean()} [{parameters.min()}, {parameters.max()}], std {parameters.std()}')

    # training time
    trainTime = results['trainTime'].values
    hours, remainder = divmod(trainTime.mean(), 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Mean training time: {hours} hours, {minutes} minutes, and {seconds} seconds, (std {trainTime.std()} sec)')
    print()
    
    # TRAIN results
    metrics = ['BCELoss','Accuracy','Sensitivity','Specificity','ROC_AUC','Precision','F1','WE']
    for metric in metrics:
        values = results['train_' + metric].values
        print(f'Mean train {metric}: {values.mean()}, std {values.std()}')
    print()

    # TEST results
    for metric in metrics:
        values = results['test_' + metric].values
        print(f'Mean test {metric}: {values.mean()}, std {values.std()}')
    print()

    print('-'*120), print()

In [None]:
## Show boxplots
compared_metrics = ['test_BCELoss','test_Accuracy','test_F1','test_ROC_AUC','test_WE'] 
M = len(compared_metrics)
R = M//2 + int(M % 2 > 0)
plt.figure(figsize=(5*R, 20))
for i,metric in enumerate(compared_metrics):
    plt.subplot(R,2,i+1)
    sns.boxplot(x='classifier', y=metric, data=trials_results, palette='Set2')
    plt.xticks(rotation=25, ha='right')  # Rotate labels 25 degrees
    plt.grid(axis='y')
    plt.xlabel('')
    if 'Loss' in metric or 'WE' in metric:
        plt.ylim([0,trials_results[metric].values.max()+0.05*trials_results[metric].values.max()])
    else:
        plt.ylim([0,1])
    plt.title(metric, fontsize=16)
    #plt.tight_layout()
    
plt.savefig('CNN_Boxplots_allModels.png')
plt.subplots_adjust(hspace=0.3)  # Increase hspace for more vertical spacing
plt.show()

In [None]:
## Statistical model comparison
from scipy.stats import shapiro
from scipy.stats import probplot
from statsmodels.multivariate.manova import MANOVA
from statsmodels.stats.multicomp import pairwise_tukeyhsd
    
models = trials_results.classifier.unique()
compared_metrics = ['test_BCELoss','test_Accuracy','test_F1','test_ROC_AUC','test_WE']
    
## 1. Select independent metrics
#These should be independent. Use the correlation to find dependent variables to remove, if necessary 
print('Check that the metrics are independent:')
print(trials_results[compared_metrics].corr()), print()  # Check for multicollinearity using the correlation matrix

var = trials_results[compared_metrics].var().to_numpy()
compared_metrics = [metric for metric,v in zip(compared_metrics,var) if not v == 0]  # Remove metrics with zero variance across repetitions

## 2. Check for Normality
normality = np.ones(len(compared_metrics))
L = len(models)//5 + (len(models) % 5 > 0)
for i,metric in enumerate(compared_metrics): 
    print(metric)
    ## Check for normality
    plt.figure(figsize=(20,2*L))
    for j,model in enumerate(models):
        # Shapiro-Wilk test
        stat, p_value = shapiro(trials_results[trials_results['classifier']==model][metric])
        print(f'{model}: Shapiro-Wilk p-value={p_value}')
        # If p-value < 0.05, reject normality (non-normal distribution)
        normality[i] *= int(p_value > 0.05)

        # Generate Q-Q plots to visually inspect normality 
        plt.subplot(L,5,j+1)
        probplot(trials_results[trials_results['classifier']==model][metric], dist='norm', plot=plt)
        plt.title(model)
    plt.show()
    print(f'\nNormality test for {metric}: {bool(normality[i])}')
    print(100*'-')
print(), print()
    
    
## 3. Statistical test: MANOVA
dependent_variables = ' + '.join(compared_metrics)
formula = f'{dependent_variables} ~ classifier'
manova = MANOVA.from_formula(formula, data=trials_results)
manova_test = manova.mv_test()
print(manova_test)

## 4. Post-hoc test: Tukey's HSD
if manova_test.results['classifier']['stat']['Pr > F']['Pillai\'s trace'] < 0.05:
    for metric in compared_metrics:
        print(metric)
        tukey = pairwise_tukeyhsd(trials_results[metric],   # Metric data
                                          trials_results['classifier'],   # Grouping variable (model)
                                          alpha=0.05)   # Significance level
        print(tukey)
else:
    print(f"MANOVA is not significant, no need to apply Tukey's test.")

# Selected model

In [None]:
selected_model = 'MultiInputCNN'
results_selected = trials_results[trials_results['classifier']==selected_model]

In [None]:
## Show boxplots
df_long = pd.melt(results_selected[compared_metrics], value_vars=compared_metrics, var_name='metric', value_name='value')
plt.figure(figsize=(8, 5))
sns.boxplot(y='value',x='metric',data=df_long, palette='Set2', fliersize=5)
new_labels = [l.replace('test_','') for l in compared_metrics]
plt.xticks(ticks=range(len(new_labels)), labels=new_labels, fontsize=18, rotation=25, ha='right')
plt.yticks(fontsize=18)
plt.grid(axis='y')
plt.xlabel('', fontsize=18)
plt.ylabel('', fontsize=18)
plt.ylim([-0.05,1.05])
plt.savefig('CNN_Boxplot_'+selected_model+'.png')
plt.show()