In [None]:
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive/My Drive/lorenzo_hype_tuning

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
/gdrive/My Drive/lorenzo_hype_tuning


# Import libraries

In [None]:
# Fix randomness and hide warnings
seed = 42

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

import numpy as np
np.random.seed(seed)

import logging

import random
random.seed(seed)
import datetime

In [None]:
# Import tensorflow
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
!pip install -q -U keras-tuner
import keras_tuner as kt

tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)
print(tf.__version__)

2.14.0


In [None]:
# Import other libraries
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix, roc_curve, auc
import seaborn as sns

# Splitting

In [None]:
processed_data = np.load('../homework1/processed_data.npz')
X = processed_data['X']
y = np.argmax(processed_data['y'], axis = -1)
labels = {0:'healthy', 1:'unhealthy'}
y.shape

(5004,)

In [None]:
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    stratify=y,
    random_state=seed
)
# Further split the combined training and validation set into a training set and a validation set
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val,
    y_train_val,
    test_size = len(X_test), # Ensure validation set size matches test set size
    stratify=y_train_val,
    random_state=seed
)

# Print the shapes of the resulting datasets
print("Training_Validation Data Shape:", X_train_val.shape)
print("Training_Validation Label Shape:", y_train_val.shape)
print("Train Data Shape:", X_train.shape)
print("Train Label Shape:", y_train.shape)
print("Validation Data Shape:", X_val.shape)
print("Validation Label Shape:", y_val.shape)
print("Test Data Shape:", X_test.shape)
print("Test Label Shape:", y_test.shape)

Training_Validation Data Shape: (4503, 96, 96, 3)
Training_Validation Label Shape: (4503,)
Train Data Shape: (4002, 96, 96, 3)
Train Label Shape: (4002,)
Validation Data Shape: (501, 96, 96, 3)
Validation Label Shape: (501,)
Test Data Shape: (501, 96, 96, 3)
Test Label Shape: (501,)


In [None]:
# Display the count of occurrences of target classes in the training-validation dataset
print('Counting occurrences of y_train classes:')
print(pd.DataFrame(y_train, columns=['class']).value_counts())

print('Counting occurrences of y_val classes:')
print(pd.DataFrame(y_val, columns=['class']).value_counts())

print('Counting occurrences of y_test classes:')
print(pd.DataFrame(y_test, columns=['class']).value_counts())


Counting occurrences of y_train classes:
class
0        2480
1        1522
dtype: int64
Counting occurrences of y_val classes:
class
0        311
1        190
dtype: int64
Counting occurrences of y_test classes:
class
0        310
1        191
dtype: int64


In [None]:
class_weight_dict = dict(zip(np.unique(y_train), compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)))
alpha = 1 - class_weight_dict[0]
print(class_weight_dict, alpha)

{0: 0.8068548387096774, 1: 1.3147174770039423} 0.19314516129032255


# ConvNeXtBase

In [None]:
# Define batch size, number of epochs, learning rate, input shape, and output shape
batch_size = 16
epochs = 200
es_patience = 10
rp_patience = 20
rp_min_lr = 1e-5
rp_factor = 0.1


input_shape = X_train.shape[1:]
output_shape = 1

# Define two callback functions for early stopping and learning rate reduction
callbacks=[
    tfk.callbacks.EarlyStopping(monitor='val_accuracy', patience=es_patience, restore_best_weights=True, mode='max'),
    tfk.callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=rp_factor, patience=rp_patience, min_lr=rp_min_lr, mode='max')
]

In [None]:
def build_model(
    hp,
    input_shape=input_shape,
    output_shape=output_shape,
):
    hp_dropout_1 = hp.Float(name='dropout_1', min_value = 0.1, max_value = 0.5, default = 0.2, step=0.05)
    hp_is_dropout_2 = hp.Boolean('is_dropout_2', default=True)
    hp_dropout_2 = hp.Float(name='dropout_2', min_value = 0.1, max_value = 0.5, default = 0.3, step=0.05)
    hp_to_freeze = hp.Choice(name='to_freeze', values=[161, 181, 221, 237], default=221)
    hp_gauss_noise = hp.Float(name='gauss_noise', min_value = 0.05, max_value = 0.3, default = 0.15, step=0.05)
    hp_init =  hp.Choice(name='init', values=['he_uniform', 'glorot_uniform'])
    hp_units_1 = hp.Choice(name='units_1', values=[256, 512], default=512)
    hp_units_2 = hp.Choice(name='units_2', values=[32, 64, 128], default=128)
    hp_learning_rate = hp.Float("learning_rate", min_value=1e-4, max_value=1e-2, sampling='log')
    hp_weight_decay = hp.Float('weight_decay', min_value=1e-4, max_value=1e-2, sampling='log')
    hp_alpha= hp.Float('alpha', min_value=0.1, max_value=0.3, default=alpha, step=0.05)
    hp_gamma= hp.Float('gamma', min_value=1.8, max_value=2.2, default=2.0, step=0.05)


    preprocessing = tfk.Sequential([
        tfkl.GaussianNoise(hp_gauss_noise),
        tfkl.RandomFlip('horizontal'),
        tfkl.RandomTranslation(height_factor=(-0.2, 0.3), width_factor=(-0.2, 0.3))
    ], name='Preprocessing')

    input_layer = tfkl.Input(shape=input_shape, name='Input_Layer')

    baseline_model = tfk.applications.ConvNeXtBase(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape
    )
    # Freeze first N layers
    N = hp_to_freeze
    baseline_model.trainable = True
    for i, layer in enumerate(baseline_model.layers[:N]):
        layer.trainable=False

    x = preprocessing(input_layer)

    x = baseline_model(x)

    x = tfkl.GlobalAveragePooling2D(name='GlobalAveragePooling')(x)

    x = tfkl.Dense(hp_units_1, kernel_initializer=hp_init)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.Activation(activation='relu')(x)

    x = tfkl.Dropout(hp_dropout_1)(x)

    x = tfkl.Dense(hp_units_2, kernel_initializer=hp_init)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.Activation(activation='relu')(x)

    if (hp_is_dropout_2):
      x = tfkl.Dropout(hp_dropout_2)(x)


    output_layer = tfkl.Dense(output_shape, name='output_layer', activation='sigmoid')(x)

    # Create the model
    model = tfk.Model(inputs=input_layer, outputs=output_layer)

    loss = tfk.losses.BinaryFocalCrossentropy(
        apply_class_balancing=True,
        alpha=hp_alpha,
        gamma=hp_gamma
    )
    model.compile(loss=loss, optimizer=tfk.optimizers.AdamW(hp_learning_rate, weight_decay=hp_weight_decay), metrics=['accuracy'])
    return model

# Hypesearch

In [None]:
hp = kt.HyperParameters()

fixed_hs = {
#     'dropout_1': 0.3,
#     'dropout_2': 0.3,
#     'is_dropout_2': True,
#     'to_freeze': 181,
#     'gauss_noise': 0.1,
#     'init': 'he_uniform',
#     'units_1': 512,
#     'units_1': 256,
#     'learning_rate': 1e-4,
#     'weight_decay': 5e-4,
#     'alpha': alpha,
#     'gamma': 2.0,
}

for key, value in fixed_hs.items():
  hp.Fixed(name=key, value=value)

tuner = kt.Hyperband(
    hypermodel=build_model,
    hyperparameters=hp,
    max_epochs=40,
    objective='val_accuracy',
    directory='hyperband_search_dir_2',
    project_name='convnext_base',
    overwrite = False
)
tuner.search_space_summary()

Reloading Tuner from hyperband_search_dir_2/convnext_base/tuner0.json
Search space summary
Default search space size: 12
dropout_1 (Float)
{'default': 0.2, 'conditions': [], 'min_value': 0.1, 'max_value': 0.5, 'step': 0.05, 'sampling': 'linear'}
is_dropout_2 (Boolean)
{'default': True, 'conditions': []}
dropout_2 (Float)
{'default': 0.3, 'conditions': [], 'min_value': 0.1, 'max_value': 0.5, 'step': 0.05, 'sampling': 'linear'}
to_freeze (Choice)
{'default': 221, 'conditions': [], 'values': [161, 181, 221, 237], 'ordered': True}
gauss_noise (Float)
{'default': 0.15, 'conditions': [], 'min_value': 0.05, 'max_value': 0.3, 'step': 0.05, 'sampling': 'linear'}
init (Choice)
{'default': 'he_uniform', 'conditions': [], 'values': ['he_uniform', 'glorot_uniform'], 'ordered': False}
units_1 (Choice)
{'default': 512, 'conditions': [], 'values': [256, 512], 'ordered': True}
units_2 (Choice)
{'default': 128, 'conditions': [], 'values': [32, 64, 128], 'ordered': True}
learning_rate (Float)
{'default':

In [None]:
tuner.search(
    X_train * 255,
    y_train,
    validation_data=(X_val * 255,y_val),
    epochs=epochs,
    batch_size=batch_size,
    callbacks=callbacks
)

Trial 30 Complete [00h 04m 11s]
val_accuracy: 0.8463073968887329

Best val_accuracy So Far: 0.8702594637870789
Total elapsed time: 02h 31m 46s

Search: Running Trial #31

Value             |Best Value So Far |Hyperparameter
0.5               |0.45              |dropout_1
False             |True              |is_dropout_2
0.15              |0.2               |dropout_2
237               |237               |to_freeze
0.05              |0.05              |gauss_noise
glorot_uniform    |glorot_uniform    |init
256               |512               |units_1
128               |128               |units_2
0.0089263         |0.00041236        |learning_rate
0.00058644        |0.00075142        |weight_decay
0.2               |0.2               |alpha
1.8               |2.15              |gamma
2                 |2                 |tuner/epochs
0                 |0                 |tuner/initial_epoch
3                 |3                 |tuner/bracket
0                 |0                 |tuner/

# Build final model

In [None]:
model = build_model()
model.summary()

In [None]:
for i, layer in enumerate(model.get_layer('convnext_base').layers):
  print(i, layer.trainable, layer.name)

In [None]:
history = model.fit(
    X_train * 255,
    y_train,
    validation_data=(X_val * 255,y_val),
    epochs=epochs,
    batch_size=batch_size,
    verbose=1,
    callbacks=callbacks,
    class_weight=class_weight_dict
).history

In [None]:
model.save('convnext_base')

In [None]:
# Plot the transfer learning xception training histories
plt.figure(figsize=(15,5))
plt.plot(history['loss'], alpha=.3, label = 'loss', color='#ff7f0e', linestyle='--')
plt.plot(history['val_loss'], label='val_loss', alpha=.8, color='#ff7f0e')
plt.legend(loc='upper left')
plt.title('Binary Crossentropy')
plt.grid(alpha=.3)

plt.figure(figsize=(15,5))
plt.plot(history['accuracy'], alpha=.3, label='accuracy', color='#ff7f0e', linestyle='--')
plt.plot(history['val_accuracy'], label='val_accuracy', alpha=.8, color='#ff7f0e')
plt.legend(loc='upper left')
plt.title('Accuracy')
plt.grid(alpha=.3)

plt.show()

# Make Inference



In [None]:
def compute_metrics(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred).round(4)
    precision = precision_score(y_true, y_pred, average='macro').round(4)
    recall = recall_score(y_true, y_pred, average='macro').round(4)
    f1 = f1_score(y_true, y_pred, average='macro').round(4)

    return {
      'accuracy': accuracy,
      'precision': precision,
      'recall' : recall,
      'f1' : f1
    }
def compute_cm(y_true, y_pred):
  # Compute the confusion matrix
  cm = confusion_matrix(y_true, y_pred)
  # Plot the confusion matrix
  plt.figure(figsize=(10, 8))
  sns.heatmap(cm.T, xticklabels=list(labels.values()), yticklabels=list(labels.values()), cmap='Blues')
  plt.xlabel('True labels')
  plt.ylabel('Predicted labels')
  plt.show()

def compute_roc_curve(y_true, y_pred):
  # Compute ROC curve and AUC
  fpr, tpr, thresholds = roc_curve(y_true, y_pred)
  roc_auc = auc(fpr, tpr)

  # Plot the ROC curve
  plt.figure(figsize=(8, 6))
  plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (AUC = {:.2f})'.format(roc_auc))
  plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('Receiver Operating Characteristic (ROC) Curve')
  plt.legend(loc="lower right")
  plt.show()

def compute_optimal_threshold(y_true, y_pred):
  fpr, tpr, thresholds = roc_curve(y_true, y_pred)
  roc_auc = auc(fpr, tpr)

  # Find the optimal threshold based on the Youden's J statistic
  optimal_idx = np.argmax(tpr - fpr)
  optimal_threshold = thresholds[optimal_idx]
  return optimal_threshold

def make_inference(y_true, y_pred, use_threshold=True):
  optimal_threshold = 0.5
  if (use_threshold):
      optimal_threshold = compute_optimal_threshold(y_true, y_pred)
      print("Optimal Threshold:", optimal_threshold)


  y_pred_t = (y_pred > optimal_threshold).astype(int)
  metrics = compute_metrics(y_true, y_pred_t)
  # Display the computed metrics
  print(datetime.datetime.now(), '-', optimal_threshold ,' - ', metrics, ' - ', hs)

In [None]:
y_test_true = y_test
y_test_pred = np.squeeze(model.predict(X_test * 255, verbose=1), axis=-1)
make_inference(y_test_true, y_test_pred, use_threshold=True)