## Experiment CNN models
Use this to test different CNN models, augmentations, and hyperparameters.

In [None]:
import tensorflow as tf
import keras
import keras_cv
import keras_cv.layers.preprocessing

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
from sklearn.utils.class_weight import compute_class_weight
from matplotlib.colors import LinearSegmentedColormap

import os

In [None]:
# Fix the NumPy and TensorFlow random seeds (both set to 0)
np.random.seed(0)

tf.random.set_seed(0)

In [None]:
# Load the train / validation / test image folders as batched datasets

TRAIN_DATADIR = "../data/train_directory"
VAL_DATADIR = "../data/val_directory"
TEST_DATADIR = "../data/test_directory"
BATCH_SIZE = 128

# Options that are identical for all three splits: labels are inferred from
# the sub-directory names and one-hot encoded, images are RGB at 224x224.
_shared_loader_options = dict(
    labels="inferred",
    label_mode="categorical",
    class_names=None,
    color_mode="rgb",
    image_size=(224, 224),
)

# Training split: shuffled with a fixed seed.
train_ds = keras.utils.image_dataset_from_directory(
    TRAIN_DATADIR,
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=0,
    **_shared_loader_options,
)

# Validation split: same batch size, kept in directory order.
val_ds = keras.utils.image_dataset_from_directory(
    VAL_DATADIR,
    batch_size=BATCH_SIZE,
    shuffle=False,
    **_shared_loader_options,
)

# Test split: one image per batch, kept in directory order.
test_ds = keras.utils.image_dataset_from_directory(
    TEST_DATADIR,
    batch_size=1,
    shuffle=False,
    **_shared_loader_options,
)

In [None]:
# Retrieve class names
# Read from the loader result here, before train_ds is re-bound by .map() further down.
class_names = train_ds.class_names

In [None]:
# Get class weights

def compute_class_weights(dataset_dir):
    """Compute 'balanced' class weights for a ``<dataset_dir>/<class>/<file>`` tree.

    Classes are the sub-directories of ``dataset_dir`` taken in sorted order,
    so that ``weights[i]`` lines up with the i-th entry of the class names that
    ``image_dataset_from_directory`` infers (it also sorts the directory
    names). The previous implementation used the arbitrary ``os.listdir``
    order, which could attach weights to the wrong classes.

    Args:
        dataset_dir: Path to a directory containing one sub-directory per class.

    Returns:
        A NumPy array with one weight per class, ordered by sorted class name.

    Raises:
        ValueError: If ``dataset_dir`` contains no class sub-directories, or
            (raised by scikit-learn) if a class directory contains no files.
    """
    # Only sub-directories are classes; stray files (e.g. ".DS_Store") are ignored
    # instead of crashing the inner os.listdir call.
    class_dirs = sorted(
        entry for entry in os.listdir(dataset_dir)
        if os.path.isdir(os.path.join(dataset_dir, entry))
    )
    if not class_dirs:
        raise ValueError(f"No class sub-directories found in {dataset_dir!r}")

    # One label per file found in each class directory.
    labels = []
    for class_dir in class_dirs:
        n_files = len(os.listdir(os.path.join(dataset_dir, class_dir)))
        labels.extend([class_dir] * n_files)

    weights = compute_class_weight(
        class_weight='balanced',
        classes=np.array(class_dirs),
        y=np.array(labels),
    )
    return weights

# Per-class weights from the training split; passed to the focal loss further down.
weights = compute_class_weights(TRAIN_DATADIR)

In [None]:
# Let's take a look at the data
# Pull one (images, labels) batch from the training set and keep only the images.
batch = next(iter(train_ds.take(1)))
image_batch = batch[0]

# Show a 3x3 grid. The images have not been rescaled yet (the /255 happens in
# the augmenter functions below), hence value_range=(0, 255).
keras_cv.visualization.plot_image_gallery(
    image_batch,
    rows=3,
    cols=3,
    value_range=(0, 255),
    show=True,
)

### Augmentations
- RandomFlip (horizontal and vertical)
- Random Crop And Resize
- Normalize pixel values to [0, 1]

The other augmentations are more experimental and can be skipped or added to test their effect.

In [None]:
# Randomly flip the image horizontally and vertically
random_flip = keras_cv.layers.RandomFlip(mode="horizontal_and_vertical")

# Randomly crop and resize the image
# (target_size matches the 224x224 image_size used by the dataset loaders)
crop_and_resize = keras_cv.layers.RandomCropAndResize(
    target_size=(224, 224),
    crop_area_factor=(0.8, 1.0),
    aspect_ratio_factor=(0.9, 1.1)
)

# Apply some random augmentations
# value_range=(0, 1) matches the [0, 1] scaling done at the top of the augmenters.
# NOTE: only referenced from a commented-out line in augmenter_train.
rand_augment = keras_cv.layers.RandAugment(
    augmentations_per_image=3,
    value_range=(0, 1),
    magnitude=0.5,
    magnitude_stddev=0.2,
    rate=1.0
)

# Merge multiple augmentations into a single augmentation
# Stays more true to the original image than cutmix or mixup
# The first positional argument [0,1] is presumably the value range — confirm against the keras_cv docs.
# NOTE: only referenced from a commented-out line in augmenter_train.
aug_mix = keras_cv.layers.AugMix(
    [0,1],
    severity=0.3,
    num_chains=3,
    chain_depth=[1, 3],
    alpha=1.0,
    seed=0
)

# Cut parts of the image and paste them on other images
cut_mix = keras_cv.layers.preprocessing.CutMix()

# Mix two images together
mix_up = keras_cv.layers.preprocessing.MixUp()

# Randomly choose between CutMix and MixUp
# NOTE: only referenced from a commented-out line in augmenter_train.
cut_mix_or_mix_up = keras_cv.layers.RandomChoice([cut_mix, mix_up], batchwise=True)

# Define the augmentation function
def augmenter_train(images, labels):
    """Scale a training batch to [0, 1], then apply random flip and crop/resize.

    Args:
        images: Batch of images as yielded by the training dataset.
        labels: Matching labels; returned untouched.

    Returns:
        Tuple ``(augmented_images, labels)``.
    """
    scaled = tf.cast(images, tf.float32) / 255.0
    flipped = random_flip(scaled, training=True)
    augmented = crop_and_resize(flipped, training=True)

    # Experimental stages, currently disabled:
    #   augmented = rand_augment(augmented, training=True)
    #   augmented = aug_mix(augmented, training=True)
    #   cut_mix_or_mix_up(...)  -- presumably needs images AND labels; check the keras_cv docs

    return augmented, labels

def augmenter_val(images, labels):
    """Scale an evaluation batch to [0, 1]; no random augmentation is applied.

    Args:
        images: Batch of images as yielded by the validation/test dataset.
        labels: Matching labels; returned untouched.

    Returns:
        Tuple ``(scaled_images, labels)``.
    """
    scaled = tf.cast(images, tf.float32) / 255.0
    return scaled, labels

Apply the augmentations and prefetch batches so that input preprocessing can overlap with training.

In [None]:
# NOTE: this cell re-binds train_ds / val_ds / test_ds to their mapped versions.
# Both augmenters divide by 255, so running this cell a second time without
# re-creating the datasets would scale the pixels twice.
train_ds = train_ds.map(augmenter_train, num_parallel_calls=tf.data.AUTOTUNE).prefetch(buffer_size=tf.data.AUTOTUNE)

val_ds = val_ds.map(augmenter_val, num_parallel_calls=tf.data.AUTOTUNE).prefetch(buffer_size=tf.data.AUTOTUNE)

# The test split is mapped but, unlike the other two, not prefetched.
test_ds = test_ds.map(augmenter_val, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# Visualize the augmented images
# Take the image half of one augmented training batch.
image_batch = next(iter(train_ds.take(1)))[0]

# augmenter_train has already scaled the pixels to [0, 1], hence value_range=(0, 1).
keras_cv.visualization.plot_image_gallery(
    image_batch,
    rows=3,
    cols=3,
    value_range=(0, 1),
    show=True,
)

Super-Convergence: Very Fast Training of Neural Networks Using Large Learning Rates: https://arxiv.org/abs/1708.07120 <br>
The code below is based on the implementation in https://github.com/ageron/handson-ml3/blob/main/11_training_deep_neural_networks.ipynb <br>


In [None]:
import math

# To find the optimal learning rate, we use an exponential learning rate schedule
# We can plot the learning rate against the loss to find the optimal learning rate

class ExponentialLearningRate(tf.keras.callbacks.Callback):
    """Callback that multiplies the learning rate by a constant after every batch.

    While training it records, per batch, the learning rate that was used and
    the loss of that batch, so the two can be plotted against each other.

    Attributes:
        factor: Multiplier applied to the learning rate after each batch.
        rates: Learning rate used for each recorded batch.
        losses: Loss of each recorded batch.
    """

    def __init__(self, factor):
        # Initialise the base Callback state before adding our own attributes.
        super().__init__()
        self.factor = factor
        self.rates = []
        self.losses = []
        # Defined here as well so on_batch_end never sees a missing attribute.
        self.sum_of_epoch_losses = 0

    def on_epoch_begin(self, epoch, logs=None):
        # logs["loss"] is a running mean that restarts every epoch.
        self.sum_of_epoch_losses = 0

    def on_batch_end(self, batch, logs=None):
        # logs["loss"] is the mean loss over the epoch so far; recover this
        # batch's own loss from the difference of the running sums.
        mean_epoch_loss = logs["loss"]
        new_sum_of_epoch_losses = mean_epoch_loss * (batch + 1)
        batch_loss = new_sum_of_epoch_losses - self.sum_of_epoch_losses
        self.sum_of_epoch_losses = new_sum_of_epoch_losses

        # Read the learning rate once, record it, then raise it for the next batch.
        current_lr = self.model.optimizer.learning_rate.numpy()
        self.rates.append(current_lr)
        self.losses.append(batch_loss)
        self.model.optimizer.learning_rate.assign(current_lr * self.factor)
        
def find_learning_rate(model, dataset, epochs=1, min_rate=1e-4, max_rate=1):
    """Run a learning-rate range test and return the (rate, loss) trace.

    The learning rate starts at ``min_rate`` and is multiplied by a constant
    factor after every batch so that it reaches ``max_rate`` after the last
    batch of the last epoch. The model's weights and learning rate are put
    back to their original values afterwards, even if training raises.

    Args:
        model: A compiled Keras model.
        dataset: A batched ``tf.data.Dataset`` with a known, finite size.
        epochs: Number of passes over ``dataset`` used for the sweep.
        min_rate: Learning rate used for the first batch.
        max_rate: Learning rate reached at the end of the sweep.

    Returns:
        Tuple ``(rates, losses)``: per-batch learning rates and losses.

    Raises:
        ValueError: If the dataset is empty or its cardinality is unknown/infinite.
    """
    # The dataset is batched, so its cardinality is the number of batches per epoch.
    batches_per_epoch = int(tf.data.experimental.cardinality(dataset).numpy())
    if batches_per_epoch <= 0:
        # 0 = empty; negative values are the INFINITE / UNKNOWN sentinels.
        raise ValueError(
            "find_learning_rate needs a non-empty dataset with a known, finite "
            f"number of batches (got cardinality {batches_per_epoch})"
        )

    # Total optimizer steps = batches per epoch * epochs. (This used to divide
    # by epochs, which made the rate overshoot max_rate whenever epochs > 1.)
    iterations = batches_per_epoch * epochs
    factor = (max_rate / min_rate) ** (1 / iterations)

    init_weights = model.get_weights()
    init_lr = model.optimizer.learning_rate.numpy()
    exp_lr = ExponentialLearningRate(factor)

    model.optimizer.learning_rate.assign(min_rate)
    try:
        model.fit(dataset, epochs=epochs, callbacks=[exp_lr])
    finally:
        # Only the learning rate and the model weights are restored here.
        model.optimizer.learning_rate.assign(init_lr)
        model.set_weights(init_weights)
    return exp_lr.rates, exp_lr.losses

def plot_lr_vs_loss(rates, losses):
    """Plot per-batch loss against learning rate on a logarithmic x-axis.

    A horizontal line marks the lowest loss seen. The y-axis runs from 0 to
    (first loss + lowest loss).

    Args:
        rates: Sequence of learning rates, one per batch.
        losses: Sequence of losses, one per batch (same length as ``rates``).
    """
    lowest_rate, highest_rate = min(rates), max(rates)
    lowest_loss = min(losses)
    y_top = losses[0] + lowest_loss

    plt.plot(rates, losses, "b")
    plt.gca().set_xscale('log')
    plt.hlines(lowest_loss, lowest_rate, highest_rate, color="k")
    plt.axis([lowest_rate, highest_rate, 0, y_top])
    plt.xlabel("Learning rate")
    plt.ylabel("Loss")
    plt.grid()

The one-cycle scheduler implemented below changes the learning rate in three linear phases:
1. The learning rate increases linearly from the start learning rate to the maximum learning rate
2. The learning rate decreases linearly from the maximum learning rate back to the start learning rate
3. The learning rate decreases linearly from the start learning rate to the (much smaller) final learning rate


In [None]:
class OneCycleScheduler(tf.keras.callbacks.Callback):
    """One-cycle learning-rate schedule applied at the start of every batch.

    The rate moves linearly start_lr -> max_lr -> start_lr over the first
    ``iterations - last_iterations`` steps (split in two halves), then linearly
    from start_lr down to last_lr over the remaining steps.

    Args:
        iterations: Total number of training batches the schedule spans.
        max_lr: Peak learning rate.
        start_lr: Initial learning rate; defaults to ``max_lr / 10``.
        last_iterations: Length of the final phase; defaults to
            ``iterations // 10 + 1``.
        last_lr: Final learning rate; defaults to ``start_lr / 1000``.
    """

    def __init__(self, iterations, max_lr=1e-3, start_lr=None,
                 last_iterations=None, last_lr=None):
        # Initialise the base Callback state before adding our own attributes.
        super().__init__()
        self.iterations = iterations
        self.max_lr = max_lr
        self.start_lr = start_lr or max_lr / 10
        self.last_iterations = last_iterations or iterations // 10 + 1
        self.half_iteration = (iterations - self.last_iterations) // 2
        self.last_lr = last_lr or self.start_lr / 1000
        # Counts batches across epochs; never reset.
        self.iteration = 0

    def _interpolate(self, iter1, iter2, lr1, lr2):
        """Linearly interpolate between (iter1, lr1) and (iter2, lr2) at self.iteration."""
        return (lr2 - lr1) * (self.iteration - iter1) / (iter2 - iter1) + lr1

    def on_batch_begin(self, batch, logs=None):
        if self.iteration >= self.iterations:
            # Past the planned schedule: hold the final rate instead of
            # extrapolating the last phase below last_lr (and eventually below 0).
            lr = self.last_lr
        elif self.iteration < self.half_iteration:
            # Phase 1: ramp up.
            lr = self._interpolate(0, self.half_iteration, self.start_lr,
                                   self.max_lr)
        elif self.iteration < 2 * self.half_iteration:
            # Phase 2: ramp back down to the start rate.
            lr = self._interpolate(self.half_iteration, 2 * self.half_iteration,
                                   self.max_lr, self.start_lr)
        else:
            # Phase 3: decay from the start rate to the final rate.
            lr = self._interpolate(2 * self.half_iteration, self.iterations,
                                   self.start_lr, self.last_lr)
        self.iteration += 1
        self.model.optimizer.learning_rate.assign(lr)

## ResNet-18

We use the Residual blocks as defined in https://github.com/ageron/handson-ml3/blob/main/14_deep_computer_vision_with_cnns.ipynb <br>

In [None]:
from functools import partial
from keras.saving import register_keras_serializable

# Define the standard convolutional layer: 3x3 kernel, stride 1, "same" padding,
# He-normal initialisation and no bias. Callers override individual settings by keyword.
DefaultConv2D = partial(keras.layers.Conv2D, kernel_size=3, strides=1,
                        padding="same", kernel_initializer="he_normal",
                        use_bias=False)


@register_keras_serializable(package='Custom', name='ResidualUnit')
class ResidualUnit(keras.layers.Layer):
    """Residual unit: conv -> BN -> activation -> conv -> BN, plus a skip path.

    The skip path is the identity unless ``strides > 1``, in which case it is a
    1x1 convolution with the same stride followed by batch normalisation. The
    two paths are summed and passed through the activation.

    Registered as 'Custom>ResidualUnit' for Keras serialization.

    Args:
        filters: Number of filters for every convolution in the unit.
        strides: Stride of the first main-path convolution (and of the skip
            convolution when one is created).
        activation: Activation identifier resolved with ``keras.activations.get``.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, filters, strides=1, activation="relu", **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can reproduce the constructor arguments.
        self.strides = strides
        self.filters = filters
        self.activation = keras.activations.get(activation)
        # Main path; only the first convolution uses the requested stride.
        self.main_layers = [
            DefaultConv2D(filters, strides=strides),
            keras.layers.BatchNormalization(),
            self.activation,
            DefaultConv2D(filters),
            keras.layers.BatchNormalization()
        ]
        # Skip path: empty (identity) unless the main path downsamples.
        # Assumes the input already has `filters` channels when strides == 1,
        # since call() adds the two paths element-wise — TODO confirm for new uses.
        self.skip_layers = []
        if strides > 1:
            self.skip_layers = [
                DefaultConv2D(filters, kernel_size=1, strides=strides),
                keras.layers.BatchNormalization()
            ]
    def build(self, input_shape):
        """Defer to the base-class build; no extra weights are created here."""
        super().build(input_shape)

    def call(self, inputs):
        """Run the main and skip paths on ``inputs`` and return activation(main + skip)."""
        Z = inputs
        for layer in self.main_layers:
            Z = layer(Z)
        skip_Z = inputs
        for layer in self.skip_layers:
            skip_Z = layer(skip_Z)
        return self.activation(Z + skip_Z)
    
    def get_config(self):
        """Return the base config extended with filters, strides and the serialized activation."""
        config = super().get_config()
        config.update({
                "filters": self.filters,
                "strides": self.strides,
                "activation": keras.activations.serialize(self.activation)
            })
        return config
    
    @classmethod
    def from_config(cls, config):
        """Rebuild the layer by passing the config dict as constructor keyword arguments."""
        return cls(**config)

In [None]:
# Stem: 7x7 stride-2 convolution, batch norm, ReLU, then 3x3 stride-2 max pooling.
model = keras.Sequential([
    keras.layers.Input(shape=(224, 224, 3)),
    DefaultConv2D(64, kernel_size=7, strides=2),
    keras.layers.BatchNormalization(),
    keras.layers.Activation("relu"),
    keras.layers.MaxPool2D(pool_size=3, strides=2, padding="same"),
])
prev_filters = 64

# Each ResidualUnit consists of two convolutional layers with the same number of filters
# This gives us the ResNet-18 architecture
for filters in [64] * 2 + [128] * 2 + [256] * 2 + [512] * 2:
    # Downsample (stride 2) whenever the filter count changes.
    strides = 1 if filters == prev_filters else 2
    model.add(ResidualUnit(filters, strides=strides))
    prev_filters = filters

model.add(keras.layers.GlobalAvgPool2D())
# One output unit per class found in the training directory (previously hard-coded to 11).
model.add(keras.layers.Dense(len(class_names), activation="softmax"))

# The per-class weights are used as the focal-loss alpha (passed by keyword for clarity).
loss = keras.losses.CategoricalFocalCrossentropy(alpha=weights)

model.compile(
    loss=loss,
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.001, momentum = 0.9, weight_decay=1e-4),
    metrics=["accuracy"],
)

model.summary()

In [None]:
# Find the optimal learning rate
# One sweep epoch over the training set; find_learning_rate restores the model
# weights and learning rate before returning.
rates, losses = find_learning_rate(model, train_ds, epochs=1)
plot_lr_vs_loss(rates, losses)

In [None]:
# Compile again with a new optimizer instance before the real training run.
# The learning_rate given here is only the initial value: OneCycleScheduler
# assigns the optimizer's learning rate at the start of every batch.
model.compile(
    loss=keras.losses.CategoricalFocalCrossentropy(weights),
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.001, momentum = 0.9, weight_decay=1e-4),
    metrics=["accuracy"],
)

EPOCHS = 35
# Set up callbacks
from keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint
import datetime

# One TensorBoard log directory per run, named after the start time.
log_dir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

# Schedule length = batches per epoch * EPOCHS (len(train_ds) is the batch count).
onecycle = OneCycleScheduler(math.ceil(len(train_ds)) * EPOCHS, max_lr=1e-2)


# Callbacks definition
# EarlyStopping watches val_loss, while ModelCheckpoint keeps the epoch with the best val_accuracy.
callbacks = [
    EarlyStopping(monitor='val_loss', patience=20, verbose=1),
    ModelCheckpoint('../models/best_model_18.keras', monitor='val_accuracy', save_best_only=True, verbose=1),
    TensorBoard(log_dir=log_dir, histogram_freq=1),
    onecycle
]


history = model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS, callbacks=callbacks)

## Evaluation

In [None]:


def plot_learning_curves(history, start_epoch=1):
    """
    Plot training and validation loss and accuracy curves.

    The x-axis shows 1-based epoch numbers, so it agrees with ``start_epoch``.

    Args:
        history: A History object generated from training a model. Its
            ``history`` dict must contain 'loss', 'val_loss', 'accuracy' and
            'val_accuracy'.
        start_epoch: First (1-based) epoch to include in the plot. Values
            below 1 are treated as 1.
    """

    # Convert the history.history dict to a pandas DataFrame
    df = pd.DataFrame(history.history)

    # Label rows with 1-based epoch numbers instead of the default 0-based index
    df.index = range(1, len(df) + 1)

    # Plot the curves from the specified epoch onwards; clamping avoids a
    # negative slice start (which would select rows from the end instead).
    first_row = max(start_epoch, 1) - 1
    df = df.iloc[first_row:]

    # Set the style of seaborn for better visualization
    sns.set(rc={'axes.facecolor': '#f0f0fc'}, style='darkgrid')

    # One figure, two side-by-side panels
    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(15, 6))

    # Plotting the training and validation loss
    sns.lineplot(x=df.index, y=df['loss'], color='royalblue', label='Train Loss', ax=ax_loss)
    sns.lineplot(x=df.index, y=df['val_loss'], color='orangered', linestyle='--',
                 label='Validation Loss', ax=ax_loss)
    ax_loss.set(title='Loss Evolution', xlabel='Epoch', ylabel='Loss')

    # Plotting the training and validation accuracy
    sns.lineplot(x=df.index, y=df['accuracy'], color='royalblue', label='Train Accuracy', ax=ax_acc)
    sns.lineplot(x=df.index, y=df['val_accuracy'], color='orangered', linestyle='--',
                 label='Validation Accuracy', ax=ax_acc)
    ax_acc.set(title='Accuracy Evolution', xlabel='Epoch', ylabel='Accuracy')

    plt.show()

In [None]:
# Plot the full training run, starting from the first epoch.
plot_learning_curves(history, start_epoch=1)

In [None]:

def evaluate_model_performance(model, test_ds, class_labels):
    """
    Evaluate the model on the test set: print a classification report and plot
    the confusion matrix.

    The dataset is iterated twice (once for the labels, once by
    ``model.predict``), so it must yield its batches in the same order each
    time, i.e. it must not be shuffled.

    Args:
        model: A trained Keras model.
        test_ds: A tf.data.Dataset yielding (images, one-hot labels) batches
            of any batch size.
        class_labels: A list of class labels, indexed by class id.
    """

    # Collect the true class index of every sample. axis=1 takes the argmax
    # per row, so this is correct for any batch size (without it, argmax runs
    # over the flattened batch and is only right when batch_size == 1).
    true_labels = []
    for _, labels in test_ds:
        true_labels.extend(np.argmax(labels.numpy(), axis=1))

    # To get the predicted labels, we predict using the model
    predictions = model.predict(test_ds)

    # Take the argmax to get the predicted class indices.
    predicted_labels = np.argmax(predictions, axis=1)

    # Pass the full list of class ids explicitly so the report and the matrix
    # always have one row per class, even if a class never occurs in the
    # test labels or predictions.
    class_ids = list(range(len(class_labels)))

    # Classification report
    report = classification_report(true_labels, predicted_labels,
                                   labels=class_ids, target_names=class_labels)
    print(report)
    print('\n')

    # Define a custom colormap
    colors = ["white", "royalblue"]
    cmap_cm = LinearSegmentedColormap.from_list("cmap_cm", colors)

    # Confusion Matrix
    cm = confusion_matrix(true_labels, predicted_labels, labels=class_ids)

    # Plotting confusion matrix using seaborn
    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, cmap=cmap_cm, fmt='d', xticklabels=class_labels, yticklabels=class_labels)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Confusion Matrix')
    plt.show()


In [None]:
# Load the model from the same path the ModelCheckpoint callback writes to.
# This re-binds `model`, replacing the in-memory model from the training cell.
model = keras.models.load_model('../models/best_model_18.keras')

In [None]:
# Classification report and confusion matrix on the test split.
evaluate_model_performance(model, test_ds, class_names)

We can additionally look at top-2 accuracy: with 11 classes it is useful to see how often the correct class is among the model's two most likely predictions. <br>

In [None]:
def top_2_accuracy(y_true, y_pred):
    """Top-k categorical accuracy with k fixed to 2.

    Thin wrapper around ``keras.metrics.top_k_categorical_accuracy`` so it can
    be passed in a ``metrics=[...]`` list.

    Args:
        y_true: Ground-truth labels, forwarded unchanged.
        y_pred: Model predictions, forwarded unchanged.
    """
    return keras.metrics.top_k_categorical_accuracy(y_true, y_pred, k=2)

# Re-compile the loaded model so that top_2_accuracy is reported next to accuracy.
# No training step follows in this cell; the model is only evaluated.
model.compile(
    loss=keras.losses.CategoricalFocalCrossentropy(weights),
    optimizer=keras.optimizers.SGD(learning_rate=0.001, momentum = 0.9, weight_decay=1e-4),
    metrics=["accuracy", top_2_accuracy],
)


# Evaluate the model on the test set
# The three values are unpacked as loss, accuracy and top-2 accuracy.
test_loss, test_accuracy, test_top_2_accuracy = model.evaluate(test_ds)

print(f"Test Accuracy: {test_accuracy}")
print(f"Test Top-2 Accuracy: {test_top_2_accuracy}")

Visualize images that have been misclassified, together with the probabilities of the model's top 5 predictions for each. <br>


In [None]:
# Walk the test set once, keeping every image and its true class index
true_labels, images_list = [], []
for images, labels in test_ds:
    images_list.extend(images.numpy())
    true_labels.extend(np.argmax(labels.numpy(), axis=1))

# Second pass over the same dataset: class probabilities for every sample
predictions = model.predict(test_ds, steps=len(test_ds))

# Most probable class per sample
predicted_labels = predictions.argmax(axis=1)

# Positions where the prediction disagrees with the ground truth
wrong_indices = [
    position
    for position, actual in enumerate(true_labels)
    if actual != predicted_labels[position]
]

# Visualize wrongly classified images with their predicted probabilities
def plot_wrong_classifications(images, true_labels, predicted_labels, predictions,
                               wrong_indices, num_images=10, class_names=None):
    """Show misclassified images next to a bar chart of their top predictions.

    One row is drawn per misclassified sample: the image on the left and its
    highest predicted probabilities (up to 5) on the right.

    Args:
        images: Indexable collection of images accepted by ``plt.imshow``.
        true_labels: True class index per sample.
        predicted_labels: Predicted class index per sample.
        predictions: Per-sample class probabilities (one row per sample).
        wrong_indices: Indices of the misclassified samples.
        num_images: Maximum number of misclassified samples to show.
        class_names: Optional sequence mapping class index -> name. When
            omitted, numeric class indices are shown (the original behaviour).
    """
    shown = list(wrong_indices[:num_images])
    if not shown:
        # Nothing to draw; avoid creating an empty figure.
        print("No misclassified images to display.")
        return

    def label_of(class_index):
        # Human-readable name when available, otherwise the raw index.
        return class_names[class_index] if class_names is not None else class_index

    # Size the grid by the number of rows actually drawn (4 inches per row,
    # the same ratio as the former fixed 20x40 figure for 10 rows).
    n_rows = len(shown)
    plt.figure(figsize=(20, 4 * n_rows))

    for row, idx in enumerate(shown):
        # Plot the image
        plt.subplot(n_rows, 2, 2 * row + 1)
        plt.imshow(images[idx])
        plt.title(f"True: {label_of(true_labels[idx])}, Pred: {label_of(predicted_labels[idx])}")
        plt.axis("off")

        # Plot the bar chart of probabilities; never ask for more bars than
        # there are classes.
        ax_bar = plt.subplot(n_rows, 2, 2 * row + 2)
        top_k = min(5, len(predictions[idx]))
        top_indices = np.argsort(predictions[idx])[-top_k:][::-1]
        top_probs = predictions[idx][top_indices]

        ax_bar.barh(range(top_k), top_probs, color='blue')
        ax_bar.set_yticks(range(top_k))
        ax_bar.set_yticklabels([label_of(i) for i in top_indices])
        ax_bar.invert_yaxis()  # Invert y-axis to have the highest probability on top
        ax_bar.set_xlabel('Probability')
        ax_bar.set_title(f'Top {top_k} Predicted Probabilities')

    plt.tight_layout()
    plt.show()

# Call the function to plot
# num_images is left at its default of 10.
plot_wrong_classifications(images_list, true_labels, predicted_labels, predictions, wrong_indices)