# Banana health classification

An ML model for image classification of healthy and diseased banana leaves. It uses binary classification to determine the health state and combines two pretrained base models via transfer learning.

This notebook is part of the "Neuronale Netze & Deep Learning" (Neural Networks & Deep Learning) course at [Digital Business University of Applied Sciences (DBU)](https://dbuas.de/). See the attached report "Analyse der Gesundheit von Bananenpflanzen durch Bild-Klassifizierung mittels maschineller Lernverfahren" (in German; English: "Analysis of the health of banana plants through image classification using machine learning methods") for more insights.

**⚠️ Please read the additional setup instructions in the attached project report (section "A. Systemanforderungen", i.e. system requirements) before running this notebook.**

Author: [Elias Häußler](https://haeussler.dev) &middot; Developed in July 2024.

## 1.) Preparation

In the following steps, the **Python environment** is prepared and all required modules are imported. This includes configuration of **Kaggle authentication** which requires you to type in your username and personal Kaggle API key. You can find both values in a `kaggle.json` file which can be downloaded from your Kaggle [account settings](https://www.kaggle.com/settings).

### 1.1.) Import modules and install libraries

This section installs and imports all relevant Python modules and external libraries.

In [None]:
!pip install \
  "kaggle==1.6.17" \
  "keras-tuner==1.4.7" \
  "matplotlib==3.7.1" \
  "numpy==1.26.4" \
  "pandas==2.1.4" \
  "scikit-learn==1.3.2" \
  "seaborn==0.13.1" \
  "tensorflow==2.15.0" \
  "tensorflow-hub==0.16.1"

In [None]:
import datetime
import math
import os
from collections import defaultdict
from platform import python_version

import tensorflow as tf
import tensorflow_hub as hub
import keras_tuner as kt

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from sklearn.utils import class_weight
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

In [None]:
print("Python version:", python_version())
print("TF version:", tf.__version__)
print("Hub version:", hub.__version__)
print("GPU is", "available 🏎️" if tf.config.list_physical_devices('GPU') else "NOT AVAILABLE 🚜")

### 1.2.) Authentication at Kaggle

Please provide your authentication data for Kaggle. You can look up your username and API key in a `kaggle.json` file which can be downloaded from your Kaggle account settings. Read more at the [official documentation](https://github.com/Kaggle/kaggle-api/blob/main/docs/README.md#api-credentials).
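If the downloaded `kaggle.json` is at hand, both values can also be read from the file instead of being typed in. The cell below hands the credentials to the Kaggle CLI through the `KAGGLE_USERNAME` and `KAGGLE_KEY` environment variables; the following minimal sketch does the same from the file. It assumes the usual `{"username": ..., "key": ...}` layout of `kaggle.json`, and both helper names are hypothetical (they are not part of the Kaggle CLI):

```python
import json
import os


def load_kaggle_credentials(path):
    """Read username and API key from a kaggle.json file (hypothetical helper)."""
    with open(path, encoding="utf-8") as f:
        credentials = json.load(f)
    # Assumes the standard kaggle.json keys "username" and "key"
    return credentials["username"], credentials["key"]


def export_kaggle_credentials(path):
    """Expose the credentials as the environment variables read by the Kaggle CLI."""
    username, key = load_kaggle_credentials(path)
    os.environ["KAGGLE_USERNAME"] = username
    os.environ["KAGGLE_KEY"] = key
    return username
```

For example, `export_kaggle_credentials('kaggle.json')` would replace the two prompts, assuming the file was uploaded next to the notebook.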

In [None]:
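from getpass import getpass

kaggle_user = input('Please enter your Kaggle username: ')
# getpass hides the key while typing; os.environ avoids %env echoing it in the output
kaggle_key = getpass('Please enter your Kaggle API key: ')

os.environ['KAGGLE_USERNAME'] = kaggle_user
os.environ['KAGGLE_KEY'] = kaggle_key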

## 2.) Data collection

In this section, we **prepare the dataset** used to train and validate our model. For this project, the [`BananaLSD`](https://www.kaggle.com/datasets/shifatearman/bananalsd) dataset from Kaggle is used. The dataset provides an original set as well as an augmented set containing images generated via data augmentation. However, we use the **original set** and perform data augmentation on our own.

### 2.1.) Download and initialize dataset

This section downloads the dataset from Kaggle. Once downloaded, the dataset is split into training and validation datasets, with 30% of the images held out for validation. The images are loaded in batches of 32 and resized to 224x224 pixels.
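To get a feel for the resulting dataset sizes, the split and batch counts can be computed by hand. This sketch assumes that Keras reserves `int(validation_split * n)` files for validation and assigns the rest to training; the image count used below is a hypothetical example, not the actual size of BananaLSD:

```python
import math


def split_and_batch_counts(num_images, validation_split=0.3, batch_size=32):
    """Return (train images, val images, train batches, val batches)."""
    # Assumption: the validation subset gets int(validation_split * num_images) files
    num_val = int(validation_split * num_images)
    num_train = num_images - num_val
    # The last batch may be smaller than batch_size, hence the ceiling
    return (
        num_train,
        num_val,
        math.ceil(num_train / batch_size),
        math.ceil(num_val / batch_size),
    )


print(split_and_batch_counts(1000))  # hypothetical total; prints (700, 300, 22, 10)
```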

In [None]:
training_dir = os.path.join(os.getcwd(), 'BananaHealthClassification')
dataset_dir = os.path.join(training_dir, 'dataset')
data_dir = os.path.join(dataset_dir, 'BananaLSD', 'OriginalSet')

In [None]:
if not os.path.isdir(data_dir):
  !kaggle datasets download -d shifatearman/bananalsd -p $dataset_dir --unzip

In [None]:
seed = 42
batch_size = 32
image_height = 224
image_width = 224
image_shape = (image_height, image_width, 3)
validation_split = 0.3

(train_ds, val_ds) = tf.keras.utils.image_dataset_from_directory(
  data_dir,
  seed = seed,
  validation_split = validation_split,
  subset = 'both',
  image_size = (image_height, image_width),
  batch_size = batch_size,
)

### 2.2.) Convert multiclass to binary class data

Since we're only interested in whether the leaves are healthy or diseased, we convert the label of each data point to its corresponding health state:

* `0` = leaves are healthy
* `1` = leaves have a disease
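The conversion boils down to the question "is this the healthy class or not?". The following NumPy sketch shows the same mapping in vectorized form. The class names are illustrative placeholders for the folder names that `image_dataset_from_directory` discovers (it assigns indices in alphanumeric order):

```python
import numpy as np

# Illustrative class folders (assumption); indices follow alphanumeric order
initial_class_names = np.array(sorted(["sigatoka", "healthy", "cordana", "pestalotiopsis"]))
initial_healthy_class_index = int(np.where(initial_class_names == "healthy")[0][0])


def to_binary_labels(labels, healthy_index):
    """Map the healthy class to 0.0 and every other (disease) class to 1.0."""
    labels = np.asarray(labels)
    # float32 mirrors the cast in convert_labels (needed for the F1 score)
    return (labels != healthy_index).astype(np.float32)


print(initial_class_names)  # ['cordana' 'healthy' 'pestalotiopsis' 'sigatoka']
print(to_binary_labels([0, 1, 2, 3, 1], initial_healthy_class_index))  # [1. 0. 1. 1. 0.]
```

A single comparison like this is equivalent to the chain of `tf.where` calls in `convert_labels`, because every class except the healthy one maps to the same value.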

In [None]:
initial_class_names = np.array(train_ds.class_names)
initial_healthy_class_index = np.where(initial_class_names == 'healthy')[0][0]
initial_disease_class_indexes = np.where(initial_class_names != 'healthy')[0]

print(f'Initial class names: {initial_class_names}')
print(f'Initial "healthy" class has index {initial_healthy_class_index}.')
print(f'Initial "disease" classes have indexes {initial_disease_class_indexes}.')

In [None]:
healthy_class_index = 0
disease_class_index = 1
class_names = np.array([healthy_class_index, disease_class_index])
class_labels = {
  healthy_class_index: 'Healthy',
  disease_class_index: 'Disease',
}

print(f'Modified class names: {class_names}')
print(f'Modified "healthy" class has index {healthy_class_index}.')
print(f'Modified "disease" class has index {disease_class_index}.')

In [None]:
def convert_labels(images, labels):
  """
  Convert image labels from multiclass to binary class state (healthy/disease).
  """

  # Convert healthy class
  converted_labels = tf.where(
    labels == initial_healthy_class_index,
    tf.cast(healthy_class_index, tf.int32),
    labels,
  )

  # Convert disease classes
  for initial_disease_class_index in initial_disease_class_indexes:
    converted_labels = tf.where(
      labels == initial_disease_class_index,
      tf.cast(disease_class_index, tf.int32),
      converted_labels,
    )

  # Convert to float values for the F1 score to work properly
  converted_labels = tf.cast(converted_labels, tf.float32)

  return images, converted_labels

In [None]:
train_ds = train_ds.map(convert_labels)
val_ds = val_ds.map(convert_labels)

### 2.3.) Look up data samples

Once initialized, we want to take a quick look at our image dataset. For this, we render some samples from the first image batch.

In [None]:
def normalize_label(label):
  """
  Normalize label value from float to integer.
  """

  return label.numpy().astype('uint8')

In [None]:
def label_to_class(label):
  """
  Convert given label value to a human-readable class name.
  """

  return class_labels[normalize_label(label)]

In [None]:
def plot_sample_images_from_dataset(image_batch, rows = None, title = 'Samples from image batch'):
  """
  Render sample images from dataset using subplots.
  """

  cols = 3

  for images, labels in image_batch:
    images_len = len(images)

    if rows is None:
      rows = math.ceil(images_len / cols)

    plt.figure(figsize = (cols * 4, rows * 4))
    plt.suptitle(title)

    # Render at most rows * cols images of the current batch
    for i in range(min(images_len, rows * cols)):
      plt.subplot(rows, cols, i + 1)
      plt.imshow(images[i].numpy().astype('uint8'))
      plt.title(f'{normalize_label(labels[i])} ({label_to_class(labels[i])})')
      plt.axis('off')

In [None]:
plot_sample_images_from_dataset(
  train_ds.take(1),
  title = 'Training images (sample batch)',
  rows = 3,
)

## 3.) Modelling

In order to build our model, we first need to make sure that our data is properly **balanced**. This is an essential step to ensure that our model performs well on unseen data. In subsequent steps, we define the model and all its components, for example two base models using **Transfer Learning** and several combined measures for **regularization**.

### 3.1.) Improve training performance

Before starting our modelling process, we cache and prefetch our training and validation datasets so that data loading does not slow down the training process.

In [None]:
def cache_datasets(train_ds, val_ds):
  """
  Cache and prefetch training and validation datasets.
  """

  train_ds = train_ds.cache().prefetch(buffer_size = tf.data.AUTOTUNE)
  val_ds = val_ds.cache().prefetch(buffer_size = tf.data.AUTOTUNE)

  return train_ds, val_ds

In [None]:
train_ds, val_ds = cache_datasets(train_ds, val_ds)

### 3.2.) Check class imbalance

We need to make sure that our data is properly balanced. Otherwise, the model may be biased towards the overrepresented class. That's why we evaluate our training data in terms of class imbalance.
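In essence, checking the imbalance means counting the labels per class and comparing their shares. A minimal pure-Python sketch of what `count_datapoints` computes, using made-up labels instead of the real dataset:

```python
from collections import Counter

class_labels = {0: "Healthy", 1: "Disease"}


def class_shares(labels):
    """Return {class name: (count, share in percent)} for binary float labels."""
    counts = Counter(class_labels[int(label)] for label in labels)
    total = sum(counts.values())
    return {name: (count, round(100 * count / total, 2)) for name, count in sorted(counts.items())}


# Hypothetical labels: 1 healthy leaf for every 4 diseased ones
print(class_shares([0.0] * 20 + [1.0] * 80))  # {'Disease': (80, 80.0), 'Healthy': (20, 20.0)}
```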

In [None]:
def count_datapoints(dataset):
  """
  Count datapoints of each class in the dataset.

  Returns a data frame with classes and number of datapoints (samples) per class.
  """

  dict_counts = defaultdict(int)

  for _, labels_batch in dataset:
    for label in labels_batch:
      class_name = label_to_class(label)
      dict_counts[class_name] += 1

  dict_counts_df = pd.DataFrame(
    dict_counts.items(),
    columns = ['class', 'samples'],
  )

  return dict_counts_df.sort_values(by = 'class', ignore_index = True)

In [None]:
def plot_datapoints(data_frame, column, title):
  """
  Render bar chart and pie chart of given data and selected column.
  """

  healthy_count = data_frame[column][data_frame['class'] == 'Healthy'].sum()
  disease_count = data_frame[column][data_frame['class'] == 'Disease'].sum()

  data = [healthy_count, disease_count]
  labels = ['Healthy', 'Disease']
  colors = ['lightgreen', 'coral']

  # Create subplot
  fig, ax = plt.subplots(
    nrows = 1,
    ncols = 2,
    width_ratios = [0.3, 0.7],
    figsize = (12, 6),
  )
  fig.suptitle(title)

  # Render bar chart
  ax[0].bar(
    labels,
    data,
    color = colors,
  )
  ax[0].spines[['top', 'right']].set_visible(False)

  for i in range(len(data)):
    ax[0].text(i, data[i] / 2, str(round(data[i], 2)), ha = 'center')

  # Render pie chart
  wedges, texts, autotexts = ax[1].pie(
    data,
    colors = colors,
    autopct = lambda x: str(round(x, 2)) + '%',
    explode = (0.1, 0),
    textprops = dict(color = 'white'),
  )
  ax[1].legend(
    wedges,
    labels,
    loc = 'upper right',
    bbox_to_anchor = (1, 0, 0.5, 1),
  )

  plt.show()

In [None]:
datapoints_count = count_datapoints(train_ds)

In [None]:
plot_datapoints(datapoints_count, 'samples', 'Samples per class')

### 3.3.) Handle class imbalance

Our dataset contains an imbalanced number of samples per class. That's why we use several techniques to handle class imbalance. The following sections describe each technique and visualize how it contributes to resolving the class imbalance.

#### 3.3.1.) Class Weights

This step calculates the weight of each class and visualizes the calculated class weights. Underrepresented classes receive higher weights, so the weights reveal the class imbalance in our dataset once more.
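With the `'balanced'` option, scikit-learn assigns each class the weight `n_samples / (n_classes * n_c)`, where `n_c` is the number of samples of class `c`. Rare classes thus get weights above 1 and frequent ones below 1. The same computation in plain Python, with hypothetical class counts:

```python
from collections import Counter


def balanced_class_weights(labels):
    """Reproduce scikit-learn's 'balanced' heuristic: n_samples / (n_classes * n_c)."""
    counts = Counter(int(label) for label in labels)
    n_samples, n_classes = len(labels), len(counts)
    return {c: n_samples / (n_classes * counts[c]) for c in sorted(counts)}


# Hypothetical counts: 100 healthy (0) and 300 diseased (1) samples
weights = balanced_class_weights([0] * 100 + [1] * 300)
print(weights)  # {0: 2.0, 1: 0.6666666666666666}
```

Passed to `model.fit` via `class_weight`, weights like these would make each healthy sample contribute three times as much to the loss as a diseased one.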

In [None]:
def calculate_class_weights():
  """
  Calculate class weights in training dataset.
  """

  labels = np.concatenate([y for x, y in train_ds], axis = 0)
  class_weights = class_weight.compute_class_weight(
    'balanced',
    classes = np.unique(labels),
    y = labels,
  )

  return dict(enumerate(class_weights))

In [None]:
class_weights = calculate_class_weights()

In [None]:
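# Match weights by class name: rows are sorted alphabetically, weights by class index
datapoints_count.insert(0, 'weight', datapoints_count['class'].map({class_labels[i]: w for i, w in class_weights.items()}))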

In [None]:
plot_datapoints(datapoints_count, 'weight', 'Weight per class')

#### 3.3.2.) Data Augmentation

In this step, we define data augmentation for our dataset. This helps us to train a more robust model and, in addition, to handle class imbalance. An example of data augmentation is visualized for a sample batch.

After that, we use data augmentation to perform oversampling: each class is filled up to 1000 images by adding augmented copies of its existing images, while the original images are kept. As a result, we should see two balanced classes in our dataset.
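The oversampling loop in `copy_and_augment_images` keeps every original image and cycles through the originals, appending one augmented copy each, until the target count is reached. The sketch below reproduces only this bookkeeping in plain Python; `augment` is a stand-in (an assumption) for the Keras augmentation layer. One observation about the cells below: since oversampling runs on the combined training and validation data before the new split, augmented variants of the same original image can end up in both splits, which may make validation scores look more optimistic.

```python
def oversample(items, augment, target_count=1000):
    """Keep all originals and append augmented copies until target_count is reached."""
    result = list(items)
    if len(items) == 0:
        return result  # nothing to augment; avoids an endless loop
    while len(result) < target_count:
        for item in items:
            result.append(augment(item))
            if len(result) >= target_count:
                break
    return result


# Toy example: strings instead of images, a suffix instead of real augmentation
print(oversample(["a", "b", "c"], lambda x: x + "'", target_count=7))
# ['a', 'b', 'c', "a'", "b'", "c'", "a'"]
```

If a class already contains more than `target_count` images, nothing is added and nothing is removed.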

In [None]:
data_augmentation = tf.keras.Sequential([
  tf.keras.layers.RandomFlip(
    'horizontal_and_vertical',
    input_shape = image_shape,
  ),
  tf.keras.layers.RandomRotation(0.5),
  tf.keras.layers.RandomZoom(0.5),
  tf.keras.layers.RandomBrightness(0.2),
  tf.keras.layers.RandomContrast(0.2),
  tf.keras.layers.RandomWidth(0.2),
  tf.keras.layers.RandomHeight(0.2),
  tf.keras.layers.GaussianNoise(0.1),
])

In [None]:
plot_sample_images_from_dataset(
  train_ds.take(1).map(lambda x, y: (data_augmentation(x, training = True), y)),
  title = 'Augmented training images (sample batch)',
  rows = 3,
)

In [None]:
def split_images_by_class(dataset):
  """
  Split images of dataset into healthy and disease images.
  """

  healthy_images = []
  disease_images = []

  for images, labels in dataset:
    for i in range(len(labels)):
      if labels[i] == healthy_class_index:
        healthy_images.append(images[i])
      else:
        disease_images.append(images[i])

  return np.array(healthy_images), np.array(disease_images)

In [None]:
def copy_and_augment_images(images, target_class_count = 1000):
  """
  Perform data augmentation on given images until target class count is reached.
  """

  augmented_images = list(images.copy())

  while len(augmented_images) < target_class_count:
    for x in images:
      # Normalize image shape for expected input in augmentation layer
      augmented_image = tf.squeeze(data_augmentation(tf.expand_dims(x, 0)), 0)
      augmented_image = tf.image.resize(augmented_image, [image_height, image_width])
      augmented_images.append(augmented_image)

      if len(augmented_images) >= target_class_count:
        break

  return np.array(augmented_images)

In [None]:
# Perform data augmentation on full dataset
full_dataset = train_ds.concatenate(val_ds)
healthy_images, disease_images = split_images_by_class(full_dataset)
healthy_augmented_images = copy_and_augment_images(healthy_images)
disease_augmented_images = copy_and_augment_images(disease_images)

In [None]:
# Prepare augmented images and labels
augmented_images = np.concatenate((healthy_augmented_images, disease_augmented_images))
augmented_labels = np.array(
  # Cast to float is relevant for the F1 score to work properly
  [tf.cast(healthy_class_index, tf.float32)] * len(healthy_augmented_images) +
  [tf.cast(disease_class_index, tf.float32)] * len(disease_augmented_images),
)

In [None]:
# Split augmented training and validation data
train_images, val_images, train_labels, val_labels = train_test_split(
  augmented_images,
  augmented_labels,
  test_size = validation_split,
  random_state = seed,
)

In [None]:
# Convert augmented data back to TF datasets
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).batch(batch_size)
val_ds = tf.data.Dataset.from_tensor_slices((val_images, val_labels)).batch(batch_size)

In [None]:
train_ds, val_ds = cache_datasets(train_ds, val_ds)

### 3.4.) Validate class balance

It's time to validate whether we properly handled class imbalance. In this step, we visualize the samples and weights per class again. Both classes should now be approximately balanced (the split into training and validation data is random, so the counts are not exactly equal), which allows us to continue our modeling process effectively.
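Because `train_test_split` is called without `stratify`, the two classes are only approximately balanced in each subset. Passing the labels as `stratify` would preserve the 50/50 class ratio in both subsets. A small sketch with placeholder data instead of images (an alternative, not what the notebook does):

```python
from sklearn.model_selection import train_test_split

# Placeholder data: 1000 samples per class, as produced by the oversampling step
samples = list(range(2000))
labels = [0.0] * 1000 + [1.0] * 1000

train_x, val_x, train_y, val_y = train_test_split(
    samples,
    labels,
    test_size=0.3,
    random_state=42,
    stratify=labels,  # keep the class ratio identical in both subsets
)

print(train_y.count(0.0), train_y.count(1.0))  # 700 700
print(val_y.count(0.0), val_y.count(1.0))  # 300 300
```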

In [None]:
datapoints_count = count_datapoints(train_ds)

In [None]:
plot_datapoints(datapoints_count, 'samples', 'Samples per class')

In [None]:
class_weights = calculate_class_weights()

In [None]:
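# Match weights by class name: rows are sorted alphabetically, weights by class index
datapoints_count.insert(0, 'weight', datapoints_count['class'].map({class_labels[i]: w for i, w in class_weights.items()}))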

In [None]:
plot_datapoints(datapoints_count, 'weight', 'Weight per class')

### 3.5.) Build model

Now that our dataset is properly initialized and balanced, we can start building our model. We use several techniques to improve the training process and make the model perform as well as possible.

#### 3.5.1.) Select base models (Transfer Learning)

In the first step, we select two pretrained models as our base models. This technique is called **Transfer Learning**: our model reuses features that the base models have learned from large training datasets, which allows it to make better predictions than training from scratch on our comparatively small dataset.

We use the following two base models:

1. The [**EfficientNet V2**](https://www.kaggle.com/models/google/efficientnet-v2) model in the variation *Efficientnetv2 B0 (21K)* is pretrained on the large *ImageNet-21K* dataset and fine-tuned on *ImageNet-1K* (`ft1k` in the variation name), which covers a broad variety of image content. We use the model's *feature vector* variation, which outputs image embeddings instead of class predictions, so that our own classification layers can build on the features learned by the CNN behind it.
2. The more specific [**rishitdagli/plant-disease**](https://www.kaggle.com/models/rishitdagli/plant-disease) model is trained on images of various plant diseases. Its learned features are therefore closely related to our task of detecting banana leaf diseases.

In [None]:
base_model_1 = hub.KerasLayer(
  'https://www.kaggle.com/models/google/efficientnet-v2/frameworks/TensorFlow2/variations/imagenet21k-ft1k-b0-feature-vector/versions/1',
  trainable = False,
)

In [None]:
base_model_2 = hub.KerasLayer(
  'https://www.kaggle.com/models/rishitdagli/plant-disease/TensorFlow2/plant-disease/1',
  trainable = False,
)

#### 3.5.2.) Define layers

Both base models process the same preprocessed input in parallel, and their outputs are concatenated. Around them, we add further layers to define our model:

1. We include the previously defined **data augmentation** layer.
2. We add a **rescaling layer** that scales pixel values from [0, 255] to [0, 1], the input range commonly expected by TF Hub image models.
3. As a regularization measure, we also add a **dropout layer**.
4. We then feed the result into both **base models** and concatenate their outputs.
5. The final layers are two **dense layers**. The first one is a hidden layer with ReLU activation that learns task-specific patterns from the concatenated features. The second (and last) one is the output layer: a single neuron with sigmoid activation that predicts the probability of the leaf being diseased. In addition, we apply an L2 regularizer to the output layer's weights as an additional regularization measure.

In [None]:
def define_layers(dense_units = 480, dropout_rate = 0.5, l2_factor = 0.001):
  """
  Define all relevant layers for our resulting model using the given parameters.
  """

  # Input layers
  input_layer = tf.keras.Input(shape = image_shape)
  inputs = data_augmentation(input_layer)
  inputs = tf.keras.layers.Rescaling(1./255)(inputs)
  inputs = tf.keras.layers.Dropout(dropout_rate)(inputs)

  # Output layers
  model_1_output = base_model_1(inputs)
  model_2_output = base_model_2(inputs)
  outputs = tf.keras.layers.concatenate([
    model_1_output,
    model_2_output,
  ])
  outputs = tf.keras.layers.Dense(
    units = dense_units,
    activation = 'relu',
  )(outputs)
  outputs = tf.keras.layers.Dense(
    # We use an explicit dense layer of 1 for our binary classification problem
    units = 1,
    activation = 'sigmoid',
    kernel_regularizer = tf.keras.regularizers.l2(l2_factor),
  )(outputs)

  return input_layer, outputs

#### 3.5.3.) Compile model

Once the model is defined by its layers, we can define its compilation process. By default, we use *Adam* as **optimizer**, since it is a robust general-purpose choice; *RMSprop* and *SGD* are available as alternatives. As **loss function**, we use *binary crossentropy* between labels and predicted probabilities. The relevant **metrics** during fitting of our model are the following:

* We use the **accuracy** metric to measure the share of correct predictions during fitting.
* We use the **F1 score**, the harmonic mean of precision and recall, to measure the overall classification performance of our model.
* We use the **precision** metric to validate a low rate of false positives (healthy leaves classified as diseased).
* Last but not least, we use the **recall** metric to validate a low rate of false negatives. This is an essential metric for our model, because we must avoid classifying leaves as healthy while they actually have a disease.

In [None]:
def create_metrics():
  """
  Define all evaluation metrics to train our model.
  """

  return [
    tf.keras.metrics.BinaryAccuracy(name = 'accuracy'),
    tf.keras.metrics.F1Score(threshold = 0.5, name = 'f1_score'),
    tf.keras.metrics.Precision(name = 'precision'),
    tf.keras.metrics.Recall(name = 'recall'),
  ]

In [None]:
def compile_model(model, optimizer = 'adam', learning_rate = 0.001):
  """
  Compile model with given learning rate for given optimizer.

  Creates several metrics and uses them together with the given optimizer and
  a binary crossentropy loss to compile the given model.
  """

  if optimizer == 'rmsprop':
    optimizer = tf.keras.optimizers.RMSprop(learning_rate = learning_rate)
  elif optimizer == 'sgd':
    optimizer = tf.keras.optimizers.SGD(learning_rate = learning_rate)
  else:
    optimizer = tf.keras.optimizers.Adam(learning_rate = learning_rate)

  model.compile(
    optimizer = optimizer,
    loss = tf.keras.losses.BinaryCrossentropy(),
    metrics = create_metrics(),
  )
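For a single sigmoid output, binary crossentropy averages `-(y * log(p) + (1 - y) * log(1 - p))` over the batch, so confident wrong predictions are penalized heavily. A minimal sketch; clipping the probabilities with a small epsilon is similar to what Keras does internally to avoid `log(0)`, though the exact Keras implementation differs in detail:

```python
import math


def binary_crossentropy(y_true, y_prob, epsilon=1e-7):
    """Mean binary crossentropy between 0/1 labels and predicted probabilities."""
    total = 0.0
    for y, p in zip(y_true, y_prob):
        p = min(max(p, epsilon), 1 - epsilon)  # keep log() finite
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(y_true)


print(round(binary_crossentropy([1, 0], [0.9, 0.1]), 4))  # 0.1054 (confident and correct)
print(round(binary_crossentropy([1, 0], [0.1, 0.9]), 4))  # 2.3026 (confident and wrong)
```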

#### 3.5.4.) Define callbacks

We use **Early Stopping** as the only callback in the fitting process of our model. It monitors the validation loss and serves as an additional regularization measure to avoid overfitting of our model. Training stops once the validation loss has not improved for 10 consecutive epochs (`patience` argument), and the weights of the best epoch are restored (`restore_best_weights` argument).
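The patience logic can be simulated without TensorFlow. This simplified sketch treats any decrease of the validation loss as an improvement (matching the Keras default `min_delta = 0`) and reports how many epochs would run and which epoch's weights would be restored; the loss values are invented:

```python
import math


def simulate_early_stopping(val_losses, patience=10):
    """Return (epochs run, 1-based best epoch) for patience-based early stopping."""
    best_loss, best_epoch, wait = math.inf, 0, 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    return len(val_losses), best_epoch


# Invented curve: improves for 3 epochs, then plateaus
losses = [1.0, 0.8, 0.7] + [0.75] * 12
print(simulate_early_stopping(losses))  # (13, 3)
```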

In [None]:
def create_early_stopping_callback():
  """
  Create Early Stopping callback based on the validation loss.
  """

  return tf.keras.callbacks.EarlyStopping(
    monitor = 'val_loss',
    patience = 10,
    restore_best_weights = True,
  )

#### 3.5.5.) Free up RAM

For the following resource-intensive operations, we remove some variables to free up RAM. This stabilizes further operations and helps to avoid session crashes when the RAM is exhausted.
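A quick estimate shows why this matters. The images are held as `float32` arrays (4 bytes per value), so each 224x224x3 image takes about 0.6 MB. The hypothetical helper below computes the footprint of a single array of augmented images:

```python
def array_size_bytes(num_images, height=224, width=224, channels=3, bytes_per_value=4):
    """Memory footprint of a dense image array (float32 = 4 bytes per value)."""
    return num_images * height * width * channels * bytes_per_value


per_image = array_size_bytes(1)
all_augmented = array_size_bytes(2000)  # 1000 healthy + 1000 disease images
print(per_image)  # 602112
print(round(all_augmented / 1024**3, 2))  # 1.12 (GiB)
```

Several such arrays are alive at the same time (the per-class arrays, the concatenated array and the training/validation splits), so together they hold several times this amount.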

In [None]:
%reset_selective -f ^initial_class_names$
%reset_selective -f ^initial_healthy_class_index$
%reset_selective -f ^initial_disease_class_indexes$
%reset_selective -f ^full_dataset$
%reset_selective -f ^healthy_images$
%reset_selective -f ^disease_images$
%reset_selective -f ^healthy_augmented_images$
%reset_selective -f ^disease_augmented_images$
%reset_selective -f ^augmented_images$
%reset_selective -f ^augmented_labels$
%reset_selective -f ^train_images$
%reset_selective -f ^val_images$
%reset_selective -f ^train_labels$
%reset_selective -f ^val_labels$

## 4.) Training

Now that our model is properly defined, we can start training (fitting) it. We use our training dataset as input data and our validation dataset to evaluate loss and other metrics after each epoch. We use a maximum of **30 epochs** for fitting our model. Since _Early Stopping_ is enabled, the actual number of performed epochs may be lower.

The training is split into two steps:

1. **Manual training**: We train our model in four iterations, each with a different set of hyperparameters.
2. **Automatic training**: We use *Keras Tuner* to automatically find the best hyperparameters for our model.

In [None]:
def fit_model(model, epochs = 30):
  """
  Fit given model using our training and validation datasets.

  Performs fitting for given number of epochs. Includes previously calculated
  class weights and Early Stopping callback to avoid overfitting.
  """

  return model.fit(
    train_ds,
    epochs = epochs,
    validation_data = val_ds,
    class_weight = class_weights,
    callbacks = [create_early_stopping_callback()],
  )

In [None]:
def build_and_train(dense_units, dropout_rate, l2_factor, optimizer, learning_rate):
  """
  Build and train our model using the given hyperparameters.

  Returns the trained model and the training history for further evaluation.
  """

  input_layer, outputs = define_layers(
    dense_units = dense_units,
    dropout_rate = dropout_rate,
    l2_factor = l2_factor,
  )
  model = tf.keras.Model(inputs = input_layer, outputs = outputs)
  compile_model(
    model = model,
    optimizer = optimizer,
    learning_rate = learning_rate,
  )
  history = fit_model(model)

  return model, history.history

In [None]:
def read_metric(metric: str, history: dict):
  """
  Return dict with training and validation score of given metric.
  """

  return {
    'train': history[metric],
    'val': history[f'val_{metric}'],
  }

In [None]:
def read_all_metrics(history: dict):
  """
  Read and return dict with all metrics of given training history.
  """

  accuracy = read_metric('accuracy', history)
  f1_score = read_metric('f1_score', history)
  precision = read_metric('precision', history)
  recall = read_metric('recall', history)
  loss = read_metric('loss', history)

  # Flatten list values in f1_score
  f1_score['train'] = [x[0] for x in f1_score['train']]
  f1_score['val'] = [x[0] for x in f1_score['val']]

  return (accuracy, f1_score, precision, recall, loss)

In [None]:
def plot_metrics(metrics: dict, show_title = True):
  """
  Plot given metrics.
  """

  label_mapping = {
    'train': 'Training',
    'val': 'Validation',
  }
  linestyle_mapping = {
    'train': '-',
    'val': '--',
  }

  metrics_len = len(metrics)
  ncols = 2
  nrows = math.ceil(metrics_len / ncols)

  fig, ax = plt.subplots(
    nrows = nrows,
    ncols = ncols,
    figsize = (12, nrows * 4),
    squeeze = False,
  )

  # Plot each metric
  for i, label in enumerate(metrics):
    metric = metrics[label]
    row = math.floor(i / ncols)
    col = i % ncols

    # Add plots
    for y in metric:
      values = metric[y]
      y_label = f'{label_mapping.get(y, y)} {label}'
      line_style = linestyle_mapping.get(y, '-')
      epochs = range(1, len(values) + 1)

      ax[row, col].plot(epochs, values, label = y_label, linestyle = line_style)

    # Configure axes
    ax[row, col].set_title(label)
    ax[row, col].set_xlim(left = 1)
    ax[row, col].legend()
    ax[row, col].grid(axis = 'y')

  # Remove remaining axes
  if i + 1 < nrows * ncols:
    for j in range(i + 1, nrows * ncols):
      row = math.floor(j / ncols)
      col = j % ncols

      fig.delaxes(ax[row, col])

  # Set labels and title
  if show_title:
    if metrics_len > 1:
      fig.suptitle('Training and Validation metrics')
    else:
      fig.suptitle(f'Training and Validation {label}')

  # Render figure
  plt.show()

In [None]:
def plot_metric(metric: dict, label: str):
  """
  Plot a single metric.
  """

  plot_metrics({label: metric})

In [None]:
def plot_all_metrics(accuracy, f1_score, precision, recall, loss):
  """
  Plot all given metrics.
  """

  plot_metrics({
    'Accuracy': accuracy,
    'F1 score': f1_score,
    'Precision': precision,
    'Recall': recall,
    'Loss': loss,
  })

In [None]:
def evaluate_metrics(history: dict):
  """
  Read metrics from training history and plot accuracy, F1 score, precision and recall.
  """

  accuracy, f1_score, precision, recall, loss = read_all_metrics(history)
  plot_metrics(
    {
      'Accuracy': accuracy,
      'F1 score': f1_score,
      'Precision': precision,
      'Recall': recall,
    },
    show_title = False,
  )

In [None]:
def perform_predictions(model):
  """
  Run predictions on given model for a random batch of the validation dataset.
  """

  test_images, test_labels = next(iter(val_ds.shuffle(batch_size)))

  predictions = model.predict(test_images)
  predicted_labels = np.round(predictions).astype(int).flatten()

  false_images = test_images[test_labels != predicted_labels]
  false_labels = predicted_labels[test_labels != predicted_labels]
  false_predictions_len = len(false_images)

  if false_predictions_len == 0:
    print('There were no false predictions.')
  elif false_predictions_len == 1:
    print('There was one false prediction.')
  else:
    print(f'There were {false_predictions_len} false predictions.')

  return (test_labels, predicted_labels, false_images, false_labels)

In [None]:
def show_confusion_matrix(cm):
  """
  Render given confusion matrix.
  """

  labels = class_labels.values()

  plt.figure(figsize = (10, 8))
  plt.suptitle('Confusion matrix')

  sns.heatmap(cm, xticklabels = labels, yticklabels = labels, annot = True, fmt = 'g')
  plt.xlabel('Prediction')
  plt.ylabel('Label')

  plt.show()

In [None]:
def plot_false_predictions(false_images, false_labels):
  """
  Plot false model predictions.
  """

  false_images_ds = tf.data.Dataset.from_tensor_slices(false_images).batch(batch_size)
  false_labels_ds = tf.data.Dataset.from_tensor_slices(false_labels).batch(batch_size)
  false_predictions_ds = tf.data.Dataset.zip((false_images_ds, false_labels_ds))

  plot_sample_images_from_dataset(
    false_predictions_ds,
    title = 'False predictions',
  )

In [None]:
def predict_and_evaluate(model):
  """
  Perform predictions on given model and visualize results.
  """

  test_labels, predicted_labels, false_images, false_labels = perform_predictions(model)
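  # Fix the label order so the matrix stays 2x2 even if a batch lacks one class
  cm = confusion_matrix(test_labels, predicted_labels, labels = class_names)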
  show_confusion_matrix(cm)
  plot_false_predictions(false_images, false_labels)

### 4.1.) Manual training

The manual training is split into four iterations. In each iteration, our model is trained with different hyperparameters. After fitting, the metrics are visualized and the trained model is used to make predictions for a random batch of the validation dataset.

#### 4.1.1.) First training iteration

The first manual training iteration is done with these hyperparameters:

| Parameter | Value |
|---|---|
| Dense units | 480 |
| Dropout rate | 0.5 |
| L2 factor | 0.001 |
| Optimizer | Adam |
| Learning rate | 0.001 |

In [None]:
model, history = build_and_train(
  dense_units = 480,
  dropout_rate = 0.5,
  l2_factor = 1e-3,
  optimizer = 'adam',
  learning_rate = 1e-3,
)

In [None]:
evaluate_metrics(history)

In [None]:
predict_and_evaluate(model)

In [None]:
# Free up some RAM for following trainings
%reset_selective -f ^history$
%reset_selective -f ^model$

#### 4.1.2.) Second training iteration

The second manual training iteration is done with these hyperparameters:

| Parameter | Value |
|---|---|
| Dense units | 192 |
| Dropout rate | 0.4 |
| L2 factor | 0.0005 |
| Optimizer | SGD |
| Learning rate | 0.0001 |

In [None]:
model, history = build_and_train(
  dense_units = 192,
  dropout_rate = 0.4,
  l2_factor = 5e-4,
  optimizer = 'sgd',
  learning_rate = 1e-4,
)

In [None]:
evaluate_metrics(history)

In [None]:
predict_and_evaluate(model)

In [None]:
# Free up some RAM for following trainings
%reset_selective -f ^history$
%reset_selective -f ^model$

#### 4.1.3.) Third training iteration

The third manual training iteration is done with these hyperparameters:

| Parameter | Value |
|---|---|
| Dense units | 96 |
| Dropout rate | 0.2 |
| L2 factor | 0.0002 |
| Optimizer | Adam |
| Learning rate | 0.0004 |

In [None]:
model, history = build_and_train(
  dense_units = 96,
  dropout_rate = 0.2,
  l2_factor = 2e-4,
  optimizer = 'adam',
  learning_rate = 4e-4,
)

In [None]:
evaluate_metrics(history)

In [None]:
predict_and_evaluate(model)

In [None]:
# Free up some RAM for following trainings
%reset_selective -f ^history$
%reset_selective -f ^model$

#### 4.1.4.) Fourth training iteration

The fourth manual training iteration is done with these hyperparameters:

| Parameter | Value |
|---|---|
| Dense units | 32 |
| Dropout rate | 0.1 |
| L2 factor | 0.00002 |
| Optimizer | RMSprop |
| Learning rate | 0.0005 |

In [None]:
model, history = build_and_train(
  dense_units = 32,
  dropout_rate = 0.1,
  l2_factor = 2e-5,
  optimizer = 'rmsprop',
  learning_rate = 5e-4,
)

In [None]:
evaluate_metrics(history)

In [None]:
predict_and_evaluate(model)

In [None]:
# Free up some RAM for following trainings
%reset_selective -f ^history$
%reset_selective -f ^model$

### 4.2.) Automatic training

In order to find the best performing model, we use *Keras Tuner* to perform a search based on a custom hypermodel.

#### 4.2.1.) Build hypermodel

Our hypermodel is defined to build an ML model with the following range of hyperparameters:

| Parameter | Range | Sampling |
|---|---|---|
| Dense units | 32 - 512 | Step 32 |
| Dropout rate | 0.1 - 0.5 | Step 0.1 |
| L2 factor | 0.00001 - 0.01 | Logarithmic |
| Optimizer | Adam, RMSprop, SGD | Choice |
| Learning rate | 0.0001 - 0.01 | Logarithmic |
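The ranges are not all sampled uniformly: dense units and dropout rate use fixed steps, while the L2 factor and learning rate use `sampling = 'log'` in `BananaLeafDiseasesHyperModel.build`, so that each order of magnitude is about equally likely. A stdlib sketch of the resulting candidate grids and a log-uniform draw (an illustration of the idea, not Keras Tuner's internal code):

```python
import math
import random

# Discrete grids defined by min_value, max_value and step in the hypermodel
dense_units_values = list(range(32, 512 + 1, 32))
dropout_values = [round(0.1 * k, 1) for k in range(1, 6)]


def sample_log_uniform(low, high, rng):
    """Draw a value whose logarithm is uniformly distributed in [log(low), log(high)]."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


rng = random.Random(42)
learning_rates = [sample_log_uniform(1e-4, 1e-2, rng) for _ in range(1000)]
share_below_1e3 = sum(lr < 1e-3 for lr in learning_rates) / len(learning_rates)

print(len(dense_units_values), dense_units_values[:3], dense_units_values[-1])  # 16 [32, 64, 96] 512
print(dropout_values)  # [0.1, 0.2, 0.3, 0.4, 0.5]
print(share_below_1e3)  # roughly half of the draws fall into [1e-4, 1e-3)
```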

We use the **Bayesian optimization** tuner and perform a maximum of 10 trials. Both **validation accuracy** and **validation loss** serve as objectives. Keras Tuner combines them into a single score to minimize: the validation loss minus the validation accuracy.
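To illustrate how such a combined objective ranks trials, the sketch below scores made-up trial results. It assumes the rule described in the Keras Tuner documentation (sum of the objectives to minimize minus sum of the objectives to maximize) and is not Keras Tuner code:

```python
def combined_score(metrics, objectives):
    """Single score to minimize: add 'min' objectives, subtract 'max' objectives."""
    score = 0.0
    for name, direction in objectives:
        score += metrics[name] if direction == "min" else -metrics[name]
    return score


objectives = [("val_accuracy", "max"), ("val_loss", "min")]
trials = {  # invented results for illustration
    "trial_a": {"val_accuracy": 0.95, "val_loss": 0.20},
    "trial_b": {"val_accuracy": 0.97, "val_loss": 0.35},
    "trial_c": {"val_accuracy": 0.93, "val_loss": 0.10},
}

ranking = sorted(trials, key=lambda t: combined_score(trials[t], objectives))
print(ranking)  # ['trial_c', 'trial_a', 'trial_b']
```

In this example, `trial_c` wins despite the lowest accuracy, because its loss advantage outweighs the accuracy gap: both metrics enter the score with equal weight, although they live on different scales.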

In [None]:
class BananaLeafDiseasesHyperModel(kt.HyperModel):
  def __init__(self, name = None, tunable = True):
    self.history = []

    super().__init__(name, tunable)

  def build(self, hp):
    # Define layers
    input_layer, outputs = define_layers(
      dense_units = hp.Int('dense_units', min_value = 32, max_value = 512, step = 32),
      dropout_rate = hp.Float('dropout_rate', min_value = 0.1, max_value = 0.5, step = 0.1),
      l2_factor = hp.Float('l2_factor', min_value = 1e-5, max_value = 1e-2, sampling = 'log'),
    )
    model = tf.keras.Model(inputs = input_layer, outputs = outputs)

    # Compile model
    compile_model(
      model = model,
      optimizer = hp.Choice('optimizer', values = ['adam', 'rmsprop', 'sgd']),
      learning_rate = hp.Float('learning_rate', min_value = 1e-4, max_value = 1e-2, sampling = 'log'),
    )

    return model

  def fit(self, hp, model, *args, **kwargs):
    # Start training
    history = super().fit(hp, model, *args, **kwargs)

    # Track training results
    self.history.append(history)

    return history

In [None]:
tuner = kt.BayesianOptimization(
  hypermodel = BananaLeafDiseasesHyperModel(),
  objective = [
    kt.Objective('val_accuracy', 'max'),
    kt.Objective('val_loss', 'min'),
  ],
  max_trials = 10,
  executions_per_trial = 1,
  directory = os.path.join(training_dir, 'tuner'),
  project_name = str(datetime.datetime.now().timestamp()),
)

#### 4.2.2.) Search for best hyperparameters

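Similar to the manual training iterations, we now use the configured tuner to search for the best model. We perform a maximum of 30 epochs per trial and use *Early Stopping* as defined earlier. Unlike `fit_model`, no class weights are passed here; since the classes are balanced at this point, they would be close to 1 anyway.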

In [None]:
tuner.search(
  train_ds,
  epochs = 30,
  validation_data = val_ds,
  callbacks = [create_early_stopping_callback()],
)

In [None]:
# Fetch history from best trial
best_trial = int(tuner.oracle.get_best_trials()[0].trial_id)
best_history = tuner.hypermodel.history[best_trial].history

# Fetch best performing model
best_model = tuner.get_best_models()[0]

#### 4.2.3.) Evaluate best performing model

All metrics of the best performing model are now visualized. In addition, we use the model to make predictions for a random batch of the validation dataset.

In [None]:
evaluate_metrics(best_history)

In [None]:
predict_and_evaluate(best_model)

## 5.) Save best performing model

After we've found and trained our best performing model, we finally export it for later use.

In [None]:
output_path = os.path.join(training_dir, 'model')
best_model.save(output_path)

That's it. I hope you enjoyed it.

🤖👋