# Transfer Learning

### Import Libraries

In [22]:

import pathlib
import os
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import KFold, train_test_split


## Train with MonkeyPox Dataset

### Hyperparameters

In [23]:
# Configuration cell: every tunable value used by the later cells lives here.

# Root folder of the images used for training; handed to
# tf.keras.utils.image_dataset_from_directory in the setup cell.
data_root = pathlib.Path("../data/Augmented_Images")

# hyperparameters
batch_size = 32         # number of images per batch (used when loading and when building per-fold datasets)
img_height = 224        # images are resized to this height; also the model's input height
img_width = 224         # images are resized to this width; also the model's input width
NUM_MODELS = 1          # NOTE(review): not read anywhere in the visible notebook; one model is trained per entry of `configs`

# k-fold cross-validation parameters
FOLDS = 5               # number of splits passed to KFold

# Fine-tuning parameters
FINE_TUNE_EPOCHS = 10   # maximum number of epochs for the fine-tuning phase (it is passed as `epochs=` to the second fit call)
FINE_TUNE_AT = 150      # index into the base model's layer list; layers from this index onward are unfrozen

# Configurations that will be used in training. Each dict triggers a full k-fold run.
# Keys read by the visible code: "learning_rate", "optimizer" ("adam" or "sgd") and
# "epochs" (budget for the frozen-base phase). The fine-tuning phase uses
# learning_rate * 0.01.
# NOTE(review): "save_metrics" is not read anywhere in the visible notebook.
configs = [
    {"learning_rate": 0.001, "optimizer": "adam", "epochs": 14, "save_metrics": False},
    # {"learning_rate": 0.0001, "optimizer": "adam", "epochs": 50, "save_metrics": False},
    # {"learning_rate": 0.001, "optimizer": "sgd", "epochs": 50, "save_metrics": False},
    # {"learning_rate": 0.0001, "optimizer": "sgd", "epochs": 50, "save_metrics": False},
]

# Base path for saved models and metric files; created up front so later saves cannot
# fail on a missing directory.
save_dir = "../saved_models"
os.makedirs(save_dir, exist_ok=True)

## Training 

### Setup

In [24]:
# Load the whole dataset as one pipeline; the train/validation/test split is done
# further down on in-memory NumPy arrays.
dataset = tf.keras.utils.image_dataset_from_directory(
    data_root,                                  # loads images from the data_root directory
    image_size=(img_height, img_width),         # resizes all images to (img_height, img_width) pixels
    batch_size=batch_size,                      # set the batch size
    shuffle=True,                               # shuffle data when loaded
    seed=42                                     # fixed seed: without it the random_state=42 splits below
                                                # would act on a different ordering after every restart
)

class_names = np.array(dataset.class_names)     # get the class names for the data
num_classes = len(class_names)                  # get the number of classes in the dataset

# Materialise the dataset as NumPy arrays so it can be indexed by the cross-validation
# splitter. Both arrays are filled in a single pass, so images and labels stay aligned.
image_batches, label_batches = [], []
for image_batch, label_batch in dataset:
    image_batches.append(image_batch.numpy())
    label_batches.append(label_batch.numpy())

# NOTE: despite its name, `image_paths` holds the decoded image arrays, not file paths.
# The name is kept because other cells may refer to it.
image_paths = np.concatenate(image_batches, axis=0)
labels = np.concatenate(label_batches, axis=0)
del image_batches, label_batches                # drop the per-batch copies to free memory

# EarlyStopping callback configuration
early_stopping = EarlyStopping(
    monitor='val_loss',        # Monitor validation loss
    patience=3,                # Number of epochs with no improvement to stop training
    restore_best_weights=True  # Restore model weights from the epoch with the best value of the monitored metric
)

# Split the dataset into training/validation (90%) and held-out test (10%) sets,
# keeping the class proportions identical in both parts
train_val_images, test_images, train_val_labels, test_labels = train_test_split(
    image_paths, labels, test_size=0.1, random_state=42, stratify=labels
)

# K-fold Cross Validation
kfold = KFold(n_splits=FOLDS, shuffle=True, random_state=42)
best_val_f1score = -float('inf')  # Initialize best F1 score with a very low value

Found 3192 files belonging to 2 classes.


### Metrics

In [25]:
from sklearn.metrics import precision_score, classification_report, roc_auc_score, recall_score, f1_score, confusion_matrix, ConfusionMatrixDisplay

# plot and save confusion matrix
def save_confusion_matrix(true_labels, predicted_labels, class_names, save_path):
    """Render the confusion matrix of the given predictions and write it to an image file.

    Args:
        true_labels: ground-truth class ids.
        predicted_labels: predicted class ids, aligned with `true_labels`.
        class_names: tick labels shown on both axes of the plot.
        save_path: destination file for the rendered figure.
    """
    matrix = confusion_matrix(true_labels, predicted_labels)
    matrix_display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=class_names)
    matrix_display.plot(cmap=plt.cm.Blues)

    # work on the figure/axes created by the display object instead of pyplot's
    # implicit "current figure"
    matrix_display.ax_.set_title("Confusion Matrix")
    matrix_display.figure_.savefig(save_path)
    plt.close(matrix_display.figure_)       # release the figure so repeated calls do not pile up

# plot and save loss curves
def save_loss_curve(history, save_path):
    """Plot training and validation loss per epoch and write the figure to disk.

    Args:
        history: mapping with 'loss' and 'val_loss' sequences (one value per epoch).
        save_path: destination file for the rendered figure.
    """
    curves = (
        ('loss', 'Training Loss', 'blue'),
        ('val_loss', 'Validation Loss', 'orange'),
    )

    fig, ax = plt.subplots(figsize=(10, 6))
    for history_key, legend_label, line_color in curves:
        ax.plot(history[history_key], label=legend_label, color=line_color)

    ax.set_title("Training and Validation Loss Over Epochs")
    ax.set_xlabel("Epochs")
    ax.set_ylabel("Loss")
    ax.legend()
    ax.grid(True)

    fig.savefig(save_path)
    plt.close(fig)                          # release the figure once it is on disk

# compute and plot evaluation metrics (accuracy, sensitivity, specificity, F1 score)
def save_evaluation_metrics(true_labels, predicted_labels, history, cm, save_path):
    """Compute summary metrics, save them as a bar chart and return them.

    Args:
        true_labels: ground-truth class ids.
        predicted_labels: predicted class ids, aligned with `true_labels`.
        history: mapping with a 'val_accuracy' sequence; its last entry is reported as accuracy.
        cm: square confusion matrix with true classes on rows and predicted classes on
            columns (the layout produced by sklearn's `confusion_matrix`).
        save_path: destination file for the bar chart.

    Returns:
        dict with the keys "Accuracy", "Sensitivity (Recall)", "Specificity" and "F1-Score".
    """
    accuracy = history['val_accuracy'][-1]
    sensitivity = recall_score(true_labels, predicted_labels, average='macro')

    # Per-class specificity = TN / (TN + FP), macro-averaged over the classes.
    # Dividing the diagonal by the column sums instead would give TP / (TP + FP),
    # which is precision, not specificity.
    cm = np.asarray(cm, dtype=float)
    true_pos = np.diag(cm)
    false_pos = cm.sum(axis=0) - true_pos       # predicted as the class but belonging to another
    false_neg = cm.sum(axis=1) - true_pos       # belonging to the class but predicted as another
    true_neg = cm.sum() - true_pos - false_pos - false_neg
    negatives = true_neg + false_pos            # every sample that does NOT belong to the class
    per_class_specificity = np.divide(
        true_neg, negatives,
        out=np.zeros_like(true_neg),            # a class with no negative samples scores 0 instead of NaN
        where=negatives > 0
    )
    specificity = np.mean(per_class_specificity)

    f1 = f1_score(true_labels, predicted_labels, average='macro')

    metrics = {
        "Accuracy": accuracy,
        "Sensitivity (Recall)": sensitivity,
        "Specificity": specificity,
        "F1-Score": f1
    }

    plt.figure(figsize=(10, 6))
    plt.bar(list(metrics.keys()), list(metrics.values()), color=['darkturquoise', 'sandybrown', 'hotpink', 'limegreen'])
    plt.title("Model Evaluation Metrics")
    plt.ylim([0, 1])
    plt.yticks(np.arange(0, 1.1, 0.1))
    plt.ylabel("Score")
    plt.savefig(save_path)
    plt.close()
    return metrics

# save classification report
def save_classification_report(true_labels, predicted_labels, class_names, save_path):
    """Write the per-class precision/recall/F1 report (4 decimal digits) to a text file.

    Args:
        true_labels: ground-truth class ids.
        predicted_labels: predicted class ids, aligned with `true_labels`.
        class_names: display names used for the report rows.
        save_path: destination text file; it is overwritten if it exists.
    """
    report_text = classification_report(
        true_labels,
        predicted_labels,
        target_names=class_names,
        digits=4,
    )
    pathlib.Path(save_path).write_text(report_text)

# Function to calculate metrics
def calculate_metrics(true_labels, predictions):
    """Compute accuracy, macro precision/recall/F1 and ROC AUC for a set of predictions.

    Args:
        true_labels: ground-truth class ids.
        predictions: 2-D array of per-class scores, one row per sample; the predicted
            class is the arg-max along axis 1.

    Returns:
        tuple (accuracy, precision, recall, f1, auc).
    """
    predicted_ids = np.argmax(predictions, axis=1)      # hard class decisions, computed once

    accuracy = np.mean(predicted_ids == true_labels)
    precision, recall, f1 = (
        scorer(true_labels, predicted_ids, average='macro')
        for scorer in (precision_score, recall_score, f1_score)
    )

    # AUC is computed on the raw scores against the one-hot encoded ground truth
    one_hot_truth = tf.keras.utils.to_categorical(true_labels)
    auc = roc_auc_score(one_hot_truth, predictions, multi_class='ovr')

    return accuracy, precision, recall, f1, auc

# Function to save metrics, loss curve, and confusion matrix for the best model
def save_best_model_visuals(history, model, val_ds, class_names, weights_path, fold):
    """Save the confusion matrix, loss curve, metric bar chart and classification report of a model.

    Args:
        history: object whose `.history` mapping holds the per-epoch training curves.
        model: model used to predict on `val_ds`.
        val_ds: dataset of (inputs, labels) batches; it is iterated twice (once for the
            predictions, once for the labels), so it must yield the same order both times.
        class_names: display names of the classes.
        weights_path: directory that receives the generated files.
        fold: fold number, embedded in every file name.
    """
    def in_output_dir(file_name):
        # all artefacts of one fold end up next to each other in `weights_path`
        return os.path.join(weights_path, file_name)

    # predicted and ground-truth class ids for the validation set
    predicted_ids = np.argmax(model.predict(val_ds), axis=-1)
    true_labels = np.concatenate([batch_labels for _, batch_labels in val_ds], axis=0)
    curves = history.history

    # confusion matrix image
    save_confusion_matrix(
        true_labels, predicted_ids, class_names,
        in_output_dir(f"confusion_matrix_fold_{fold}.png"),
    )

    # loss curve image
    save_loss_curve(curves, in_output_dir(f"loss_curve_fold_{fold}.png"))

    # bar chart of accuracy, sensitivity, specificity and F1 score
    save_evaluation_metrics(
        true_labels, predicted_ids, curves,
        confusion_matrix(true_labels, predicted_ids),
        in_output_dir(f"evaluation_metrics_fold_{fold}.png"),
    )

    # plain-text classification report
    save_classification_report(
        true_labels, predicted_ids, class_names,
        in_output_dir(f"classification_report_fold_{fold}.txt"),
    )

### Model creation and fine tuning

In [26]:
# Function to create and compile the model
def create_model(num_classes, config, fine_tune=None):
    """Build and compile a fresh MobileNetV2 transfer-learning classifier.

    The ImageNet-pretrained backbone is frozen and topped with global average pooling
    and a dense layer that outputs one logit per class (the loss is configured with
    `from_logits=True`).

    Args:
        num_classes: number of output logits.
        config: dict with "optimizer" ("adam" or "sgd") and "learning_rate".
        fine_tune: leave as None/False to build a new model. This function has no
            existing model to continue from, so a truthy value is rejected.

    Returns:
        The compiled Sequential model.

    Raises:
        ValueError: if `fine_tune` is truthy or the optimizer name is not supported.
    """
    # Previously only `fine_tune == False` built a model, so the default (None) reached
    # `model.compile` with `model` unassigned and crashed with UnboundLocalError.
    if fine_tune:
        raise ValueError(
            "create_model() only builds new models; to fine-tune, call "
            "fine_tune_model() on an existing model and re-compile it."
        )

    # select optimizer and learning rate based on configuration; done before the
    # backbone is instantiated so a bad config fails fast
    if config["optimizer"] == "adam":
        optimizer = tf.keras.optimizers.Adam(learning_rate=config["learning_rate"])
    elif config["optimizer"] == "sgd":
        optimizer = tf.keras.optimizers.SGD(learning_rate=config["learning_rate"])
    else:
        raise ValueError(f"Unsupported optimizer: {config['optimizer']}")

    # instantiate the MobileNetV2 backbone
    base_model = tf.keras.applications.MobileNetV2(
        input_shape=(img_height, img_width, 3),     # set the input it will receive
        include_top=False,                          # drop the ImageNet classifier head to perform transfer learning
        weights='imagenet'                          # load weights from imagenet dataset
    )
    base_model.trainable = False                    # Freeze the base model

    # add a head in order to perform classification on our dataset
    model = Sequential([
        base_model,                                 # use base_model as the start of the model
        layers.GlobalAveragePooling2D(),            # collapse the spatial feature map to one vector per image
        layers.Dense(num_classes)                   # one logit per class in the dataset
    ])

    # compile the model
    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

    return model

# fine tune model by unfreezing the layers after the first fine_tune_at layers
def fine_tune_model(base_model, fine_tune_at):
    """Unfreeze `base_model` from layer index `fine_tune_at` onward, keeping earlier layers frozen.

    Args:
        base_model: model exposing a `layers` list and a `trainable` flag.
        fine_tune_at: index of the first layer to unfreeze.

    The caller must re-compile the enclosing model afterwards for the change to take effect.
    """
    # The backbone as a whole was frozen when the model was created. Depending on the
    # Keras version, a parent whose own `trainable` flag is False can report no
    # trainable weights regardless of its children's flags, so re-enable the parent
    # first and then freeze the early layers individually.
    base_model.trainable = True

    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False
    for layer in base_model.layers[fine_tune_at:]:
        layer.trainable = True


### Training loop

In [28]:
train_metrics = []      # one row per fold: [config index, fold, train loss, train accuracy]
val_metrics = []        # one row per fold: [config index, fold, val loss, accuracy, precision, recall, F1, AUC]

# Reset the running best here so that re-running this cell on its own starts the
# "best model" search from scratch instead of comparing against a previous run.
best_val_f1score = -float('inf')

# Shared by every fold: rescales pixel values from [0, 255] to [0, 1]
AUTOTUNE = tf.data.AUTOTUNE
normalization_layer = layers.Rescaling(1./255)

for i, config in enumerate(configs):
    print(f"Training model {i + 1}/{len(configs)} with config: {config}")

    # Training and validation loop for each fold
    for fold, (train_idx, val_idx) in enumerate(kfold.split(train_val_images), start=1):
        print(f"\nFold {fold}/{FOLDS}...")

        # Create subset datasets for training and validation
        train_images, train_labels = train_val_images[train_idx], train_val_labels[train_idx]
        val_images, val_labels = train_val_images[val_idx], train_val_labels[val_idx]

        # Convert NumPy arrays back to TensorFlow datasets
        train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
        val_ds = tf.data.Dataset.from_tensor_slices((val_images, val_labels))

        # Normalize datasets and batch
        train_ds = train_ds.map(lambda x, y: (normalization_layer(x), y)).batch(batch_size)
        val_ds = val_ds.map(lambda x, y: (normalization_layer(x), y)).batch(batch_size)

        # cache the preprocessed batches in memory and prefetch so that data preparation
        # overlaps with model execution
        train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
        val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

        # Step 1: Train model with frozen layers
        print(f"Training with frozen base layers for {config['epochs']} epochs...")

        # Create and compile a fresh model for each fold
        model = create_model(num_classes, config, fine_tune=False)

        # train the model on the training set until the epochs specified
        history_frozen = model.fit(
            train_ds,                           # dataset used for training
            validation_data=val_ds,             # dataset used for validation
            epochs=config['epochs'],            # epochs used for training
            callbacks=[early_stopping],         # set early stopping to avoid overfitting
            verbose=1
        )

        # Step 2: Unfreeze layers and fine-tune
        print(f"Unfreezing layers starting from layer {FINE_TUNE_AT} for fine-tuning...")
        fine_tune_model(model.layers[0], FINE_TUNE_AT)      # model.layers[0] is the MobileNetV2 backbone

        # re-compile the model with a lower learning rate for fine-tuning
        # NOTE(review): fine-tuning always uses Adam, whatever config["optimizer"] says
        fine_tune_lr = config['learning_rate'] * 0.01
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=fine_tune_lr),
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=['accuracy']
        )

        print(f"Fine-tuning for {FINE_TUNE_EPOCHS} epochs...")
        history_fine_tune = model.fit(
            train_ds,                           # dataset used for training
            validation_data=val_ds,             # dataset used for validation
            epochs=FINE_TUNE_EPOCHS,            # epochs used for training
            callbacks=[early_stopping],         # set early stopping to avoid overfitting
            verbose=1
        )

        # evaluate on validation set after training; val_ds is never shuffled, so its
        # labels are exactly `val_labels` in the same order as the predictions
        val_predictions = model.predict(val_ds)
        avg_val_loss = model.evaluate(val_ds, verbose=0)[0]
        avg_val_accuracy, avg_val_precision, avg_val_recall, avg_val_f1, avg_val_auc = calculate_metrics(
            val_labels, val_predictions
        )

        print(f"\nValidation: \tFold {fold} - Loss: {avg_val_loss:.4f}, Accuracy: {avg_val_accuracy:.4f}, Precision: {avg_val_precision:.4f}, Recall: {avg_val_recall:.4f}, F1 Score: {avg_val_f1:.4f}, AUC Score: {avg_val_auc:.4f}")

        # Record this fold's metrics. These lists used to stay empty, so the .npy files
        # written at the end of the cell contained no data. Training metrics are measured
        # with the final (restored) weights rather than read from the last history entry.
        train_loss, train_accuracy = model.evaluate(train_ds, verbose=0)[:2]
        train_metrics.append([i, fold, train_loss, train_accuracy])
        val_metrics.append([
            i, fold, avg_val_loss, avg_val_accuracy,
            avg_val_precision, avg_val_recall, avg_val_f1, avg_val_auc
        ])

        # save the best model based on validation F1 score
        if avg_val_f1 > best_val_f1score:
            best_val_f1score = avg_val_f1
            model.save(os.path.join(save_dir, f'mobilenetv2_best_f1score_fold_{fold}.h5'))
            print(f"Model with best F1 score during Validation saved at Fold {fold} with F1 Score of {best_val_f1score:.4f}")

            # save confusion matrix, loss curve, evaluation metrics for the best model
            # save_best_model_visuals(history_fine_tune, model, val_ds, class_names, save_dir, fold)

# save metrics after training
np.save(os.path.join(save_dir, 'train_metrics.npy'), train_metrics)
np.save(os.path.join(save_dir, 'val_metrics.npy'), val_metrics)

Training model 1/1 with config: {'learning_rate': 0.001, 'optimizer': 'adam', 'epochs': 14, 'save_metrics': False}

Fold 1/5...
Training with frozen base layers for 14 epochs...
Epoch 1/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 344ms/step - accuracy: 0.5856 - loss: 0.7706 - val_accuracy: 0.8278 - val_loss: 0.3903
Epoch 2/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 244ms/step - accuracy: 0.8334 - loss: 0.3941 - val_accuracy: 0.8678 - val_loss: 0.3252
Epoch 3/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 249ms/step - accuracy: 0.8754 - loss: 0.3156 - val_accuracy: 0.8835 - val_loss: 0.2933
Epoch 4/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 244ms/step - accuracy: 0.9024 - loss: 0.2711 - val_accuracy: 0.8957 - val_loss: 0.2739
Epoch 5/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 261ms/step - accuracy: 0.9168 - loss: 0.2419 - val_accuracy: 0.8974 - val_loss: 0.2614
Epoch 6




Validation: 	Fold 1 - Loss: 0.2801, Accuracy: 0.9009, Precision: 0.9161, Recall: 0.8899, F1 Score: 0.8968, AUC Score: 0.9505
Model with best F1 score during Validation saved at Fold 1 with F1 Score of 0.8968

Fold 2/5...
Training with frozen base layers for 14 epochs...
Epoch 1/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m177s[0m 2s/step - accuracy: 0.6159 - loss: 0.7157 - val_accuracy: 0.8191 - val_loss: 0.4327
Epoch 2/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 300ms/step - accuracy: 0.8469 - loss: 0.3797 - val_accuracy: 0.8539 - val_loss: 0.3561
Epoch 3/14
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 276ms/step - accuracy: 0.8787 - loss: 0.3099 - val_accuracy: 0.8783 - val_loss: 0.3169
Unfreezing layers starting from layer 150 for fine-tuning...
Fine-tuning for 10 epochs...
Epoch 1/10
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 313ms/step - accuracy: 0.7925 - loss: 0.4596 - val_accuracy: 0.8052 - val_l

## Testing

In [28]:
# once training is complete, evaluate on the held-out test set
print("Evaluating the best model on the held-out test set...")

# Build the test pipeline with its own rescaling layer so this cell does not depend on
# a variable that only exists as a side effect of the training loop.
test_rescaling = layers.Rescaling(1./255)
test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_ds = test_ds.map(lambda x, y: (test_rescaling(x), y)).batch(batch_size)

# After the training loop `model` is the network of the LAST fold, not the best one.
# A checkpoint is only written when the validation F1 improves, so the most recently
# written checkpoint is the best model of the latest run.
# NOTE(review): assumes save_dir has not been written to by another process since training.
best_checkpoints = sorted(
    pathlib.Path(save_dir).glob("mobilenetv2_best_f1score_fold_*.h5"),
    key=lambda checkpoint: checkpoint.stat().st_mtime
)
if best_checkpoints:
    print(f"Loading best checkpoint: {best_checkpoints[-1]}")
    best_model = tf.keras.models.load_model(str(best_checkpoints[-1]))
else:
    # nothing on disk: fall back to the model that is still in memory
    best_model = model

test_predictions = best_model.predict(test_ds)
avg_test_loss = best_model.evaluate(test_ds, verbose=0)[0]
# test_ds is not shuffled, so `test_labels` is already aligned with the predictions
avg_test_accuracy, avg_test_precision, avg_test_recall, avg_test_f1, avg_test_auc = calculate_metrics(
    test_labels, test_predictions
)

print(f"\nTest Set Evaluation - Loss: {avg_test_loss:.4f}, Accuracy: {avg_test_accuracy:.4f}, Precision: {avg_test_precision:.4f}, Recall: {avg_test_recall:.4f}, F1 Score: {avg_test_f1:.4f}, AUC Score: {avg_test_auc:.4f}")
