### Importing key libraries

In [None]:
import os, json, yaml, warnings, random
import numpy as np, matplotlib.pyplot as plt, tensorflow as tf
from sklearn import metrics
import optuna

warnings.filterwarnings('ignore')

# Set seeds for reproducibility
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Uncommon packages
# !pip install -q split-folders optuna-integration

### TF configuration

In [None]:
physical_devices = tf.config.list_physical_devices('GPU')
for gpu in physical_devices:
    tf.config.experimental.set_memory_growth(gpu, True)

### Non-optimized hyperparameters

In [None]:
IMG_SIZE = 224
NUM_EPOCHS = 100 # doesn't matter much because of EarlyStopping
# EarlyStopping
PATIENCE = 5 # no. of epochs with no improvement after which training will be stopped
# ReduceLROnPlateau
LR_PATIENCE = 5 # no. of epochs with no improvement after which learning rate will be reduced
LR_FACTOR = 0.1 # factor by which the learning rate will be reduced
MIN_LR = 1e-6 # lower bound on the learning rate
# Optuna
OPTUNA_TRIALS = 50

### Dataset splitting (one-time)

In [None]:
output_dir = r'splitted_data' # Intentionally declared in separate cell

In [None]:
# import splitfolders

# dataset_dir = r'dataset'

# # Make output dir if doesn't exist, else skip
# os.makedirs(output_dir, exist_ok=True)

# splitfolders.ratio(dataset_dir, output=output_dir, seed=1337, ratio=(0.75, 0.15, 0.1))
# print(f"Dataset successfully splitted into: {os.listdir(output_dir)}")

### Dataset config

In [None]:
def helper_ds(partition, shuffle_status=True):
    return tf.keras.utils.image_dataset_from_directory(
        directory=os.path.join(output_dir, partition),
        image_size=(IMG_SIZE, IMG_SIZE),
        batch_size=32,
        label_mode='binary',
        shuffle=shuffle_status
    )

train_ds = helper_ds('train')
val_ds = helper_ds('val')
test_ds = helper_ds('test', False)


# Rescale layer
rescale_layer = tf.keras.layers.Rescaling(1./255)

# Data augmentation for 'train' set
data_augmentation = tf.keras.Sequential([
        tf.keras.layers.RandomFlip('horizontal'),
        tf.keras.layers.RandomRotation(0.2),
        tf.keras.layers.RandomContrast(0.2),
        tf.keras.layers.RandomBrightness(0.15),
        ])

AUTOTUNE = tf.data.AUTOTUNE

# Augmentation and rescaling for 'train' set
train_rsc = train_ds.map(lambda x, y: (rescale_layer(data_augmentation(x), training=True), y), num_parallel_calls=AUTOTUNE)
train_rsc = train_rsc.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)

# Only rescaling for 'test' and 'val' set
val_rsc = val_ds.map(lambda x, y: (rescale_layer(x), y), num_parallel_calls=AUTOTUNE).prefetch(buffer_size=AUTOTUNE)
test_rsc = test_ds.map(lambda x, y: (rescale_layer(x), y), num_parallel_calls=AUTOTUNE).prefetch(buffer_size=AUTOTUNE)

In [None]:
class_names = train_ds.class_names
print(f"Class names: {class_names}")

# Saving the class names
with open('class_names.json', 'w') as f:
    json.dump(class_names, f)

# Sanity check
print(f"Class names successfully saved.") if os.path.exists('class_names.json') else print(f"Failed to save the file.")

### Data visualization

In [None]:
plt.figure(figsize=(10, 8))
for images, labels in train_rsc.take(1):
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy())
        state = "active" if int(labels[i]) else "drowsy"
        plt.title(state)
        plt.axis("off")
        plt.tight_layout()

### Stacking models

In [None]:
def stacked_model(input_shape=[IMG_SIZE, IMG_SIZE, 3], LEARNING_RATE=0.0001, DROPOUT_RATE=0.2):

    d_net = tf.keras.applications.DenseNet121(include_top=False, weights='imagenet', input_shape=input_shape)
    e_net = tf.keras.applications.EfficientNetV2B2(include_top=False, weights='imagenet', input_shape=input_shape)
    m_net = tf.keras.applications.MobileNetV2(include_top=False, weights='imagenet', input_shape=input_shape)

    # Freeze the models
    d_net.trainable=False; e_net.trainable=False; m_net.trainable=False

    # Input layer
    inputs = tf.keras.Input(shape=input_shape)

    # Averaging the outputs from each model
    d_net_gap = tf.keras.layers.GlobalAveragePooling2D()(d_net(inputs))
    e_net_gap = tf.keras.layers.GlobalAveragePooling2D()(e_net(inputs))
    m_net_gap = tf.keras.layers.GlobalAveragePooling2D()(m_net(inputs))

    # Combine outputs
    combined_outputs = tf.keras.layers.concatenate([d_net_gap, e_net_gap, m_net_gap])

    # Custom layers
    stacked_1 = tf.keras.layers.Dense(1024, activation='relu')(combined_outputs)
    stacked_1 = tf.keras.layers.BatchNormalization()(stacked_1)
    stacked_1 = tf.keras.layers.Dropout(DROPOUT_RATE)(stacked_1)

    stacked_2 = tf.keras.layers.Dense(512, activation='relu')(stacked_1)
    stacked_2 = tf.keras.layers.BatchNormalization()(stacked_2)
    stacked_2 = tf.keras.layers.Dropout(DROPOUT_RATE)(stacked_2)

    stacked_3 = tf.keras.layers.Dense(256, activation='relu')(stacked_2)
    stacked_3 = tf.keras.layers.BatchNormalization()(stacked_3)
    stacked_3 = tf.keras.layers.Dropout(DROPOUT_RATE)(stacked_3)

    out = tf.keras.layers.Dense(1, activation='sigmoid')(stacked_3)

    model = tf.keras.Model(inputs=inputs, outputs=out)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
                 loss='binary_crossentropy',
                 metrics=['accuracy', tf.keras.metrics.Precision(name='precision'), tf.keras.metrics.Recall(name='recall')])
    return model

### Defining callbacks

In [None]:
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=PATIENCE, restore_best_weights=True)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=LR_FACTOR, patience=LR_PATIENCE, min_lr=MIN_LR)
model_checkpoint = tf.keras.callbacks.ModelCheckpoint("checkpoints/best_model.keras", monitor='val_loss', save_best_only=True, verbose=1)

# Custom training curve callback
from IPython.display import clear_output

train_losses=[]; val_losses=[]; precision_scores=[]; recall_scores=[]

class TrainingCurveCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        train_losses.append(logs['loss'])
        val_losses.append(logs['val_loss'])
        precision_scores.append(logs['precision'])
        recall_scores.append(logs['recall'])

        clear_output(wait=True) # clear output before plotting

        fig, ax1 = plt.subplots(figsize=(10, 5)) # create figure; will contain loss/accuracy curves
        fig.suptitle('Training Curves')

        # loss curve (ax1 - left y axis)
        ax1.plot(train_losses, label='Train Loss')
        ax1.plot(val_losses, label='Validation Loss')
        ax1.set_xlabel('Epoch')
        ax1.set_xlim(left=0)
        ax1.set_ylabel('Loss function')

        # determine left y axis range from min/max loss values
        y_ax1_min = min(min(train_losses), min(val_losses)) # find lowest loss value across both curves
        y_ax1_min = max(0, y_ax1_min - 0.1) # add some padding to the bottom of the plot. lower bound can't be less than 0
        y_ax1_max = max(max(train_losses), max(val_losses)) + 0.1 # find highest loss value across both curves, add some padding to the top of the plot
        ax1.set_ylim(y_ax1_min, y_ax1_max) # set y axis limits

        # Best epoch (smallest loss)
        best_epoch = int(np.argmin(val_losses))
        best_loss = val_losses[best_epoch]

        # add veritcal line for best epoch
        ax1.vlines(best_epoch, ymin=0, ymax=y_ax1_max, linestyles='dashed', colors='black',
                   label=f'best epoch={best_epoch}\nval loss={best_loss:.3f}')
        ax1.legend(loc='upper left')

        # create right y axis for precision/recall curves
        ax2 = ax1.twinx()

        # Plot precision/recall (right y-axis)
        ax2.plot(precision_scores, label=f'Precision\n{precision_scores[best_epoch]:.3f} @ {best_epoch}', color='red')
        ax2.plot(recall_scores, label=f'Recall\n{recall_scores[best_epoch]:.3f} @ {best_epoch}', color='green')
        ax2.set_ylabel('Precision / Recall')
        ax2.set_ylim(0, 1)
        ax2.legend(loc='upper right')
        plt.tight_layout()
        plt.show()

### Model summary (optional)

In [None]:
# ensemble_model = stacked_model()
# ensemble_model.summary()

### Running Optuna (skippable)

In [None]:
def objective(trial):
    # Define the hyperparameter search space
    params = {
        'LEARNING_RATE': trial.suggest_loguniform('LEARNING_RATE', 1e-5, 1e-2),
        'DROPOUT_RATE': trial.suggest_float('DROPOUT_RATE', 0.1, 0.5, step=0.1),
    }
    ensemble_model = stacked_model(LEARNING_RATE=params['LEARNING_RATE'], DROPOUT_RATE=params['DROPOUT_RATE'])

    optuna_pruner = optuna.integration.TFKerasPruningCallback(trial, monitor='val_loss')
    history = ensemble_model.fit(train_rsc, validation_data=val_rsc, epochs=NUM_EPOCHS, callbacks=[early_stopping, reduce_lr, optuna_pruner])
    return min(history.history['val_loss'])
    
study = optuna.create_study(direction='minimize') # minimize validation loss
study.optimize(objective, n_trials=OPTUNA_TRIALS, show_progress_bar=True)

In [None]:
# Get the best trial's value and hyperparameters
best_val = study.best_value; best_params = study.best_params
print(f'Best value: {best_val}\nBest hyperparameters found: {best_params}')

In [None]:
# Save the best hyperparameters to a file
with open('best_hyperparameters.yaml', 'w') as f:
    yaml.dump(best_params, f)

### Training the final model

In [None]:
# Load the ensemble model again
ensemble_model = stacked_model(LEARNING_RATE=best_params['LEARNING_RATE'], DROPOUT_RATE=best_params['DROPOUT_RATE'])

# If you want to skip Optuna, simply comment out the line above and uncomment the line below
# ensemble_model = stacked_model(LEARNING_RATE=0.0001, DROPOUT_RATE=0.2)

history = ensemble_model.fit(train_rsc, validation_data=val_rsc, epochs=NUM_EPOCHS, callbacks=[early_stopping, reduce_lr, TrainingCurveCallback()])

### Saving the model

In [None]:
model_filename = 'best_model.keras' # Intentionally declared in separate cell

In [None]:
ensemble_model.save(model_filename)

# Sanity check
print(f"Model successfully saved as {model_filename}") if os.path.exists(model_filename) else print(f"Failed to save the model.")

### Loading the model

In [None]:
optimized_ensemble = tf.keras.models.load_model(model_filename) # Intentionally declared in separate cell

### Testing w/ val dataset

In [None]:
final_val_loss, final_val_accuracy, *rest = optimized_ensemble.evaluate(val_rsc)
print(f"Ensemble Model - Validation Loss: {final_val_loss:.4f}, Validation Accuracy: {final_val_accuracy:.4f}")

### Testing w/ test dataset

In [None]:
# Gather true test labels
y_true = []
for _, labels in test_rsc:
    y_true.extend(labels.numpy())

# Sanity check
assert len(y_true) > 1, "True label list is empty!"

# Predict on test dataset
y_pred = optimized_ensemble.predict(test_rsc)

In [None]:
# Need to convert prediction labels to binary for confusion matrix
y_pred_conf = (y_pred >= 0.5).astype(int)

cm = metrics.confusion_matrix(y_true, y_pred_conf)
disp = metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)

plt.figure(figsize=(8,8))
disp.plot(cmap=plt.cm.Blues, values_format='d')
plt.xticks(rotation=45, ha='right') # Rotating labels for better readability
plt.yticks(rotation=45, ha='right')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.show()

In [None]:
# Predict probabilities on the test set
fpr, tpr, _ = metrics.roc_curve(y_true, y_pred)
roc_auc = metrics.auc(fpr, tpr)

plt.figure(figsize=(8, 5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--', label='No Skill')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Driver Drowsiness Detection')
plt.legend()
plt.tight_layout()
plt.show()