## Imports

In [None]:
!pip install tensorflow_addons
!pip install numpy
!pip install matplotlib
!pip install opencv-python
!pip install scipy
!pip install tensorflow

In [None]:
# basics
import numpy as np
import PIL.Image as Image
import os
import pathlib
import matplotlib.pylab as plt
import random
import cv2
import tensorflow
import tensorflow as tf
from tensorflow import keras
import scipy

# data generation
from keras.preprocessing.image import ImageDataGenerator
import tensorflow.keras.applications as applications

# for new top layers
from tensorflow.keras.layers import Dense, Flatten, GlobalAveragePooling2D, Dropout, Input
from tensorflow.keras.models import Model

# hyperparameters
from tensorflow.keras.optimizers import Adam
from keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import TopKCategoricalAccuracy
from tensorflow_addons.metrics import F1Score

# Callbacks
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.callbacks import EarlyStopping

# models
from tensorflow.keras.applications.nasnet import NASNetMobile
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.applications import EfficientNetB0, EfficientNetB1, EfficientNetB2, EfficientNetB3, EfficientNetB4, EfficientNetB5, EfficientNetB6, EfficientNetB7
from tensorflow.keras.applications.resnet_v2 import ResNet50V2, ResNet101V2, ResNet152V2
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.applications.densenet import DenseNet121, DenseNet169

## Set Up

In [None]:
# Number of epochs used to train only the newly added top layers (frozen base model)
WARM_UP_EPOCHS     = 10
# Number of additional epochs used to fine-tune the partially un-frozen model
FINE_TUNE_EPOCHS   = 25
# Adam learning rate for the warm-up phase (the fine-tuning phase uses 0.1 * this value)
BASE_LEARNING_RATE = 0.001
# When True, the `early_stop` callback is passed to both training phases
USE_EARLY_STOP     = False
# Path prefix of the dataset; the 'train', 'val' and 'test' folder names are appended
# to it, so a trailing path separator is the safe choice ('' means the working directory)
DATA_DIRECTORY     = ''

### DataLoaders

In [None]:
def get_dataloaders(data_dir:str, batch_size:int, width:int, height:int, model_name:str):
    """
    Creates and returns the train, validation and test dataloaders for images using
    ImageDataGenerator from keras. Only the training loader applies augmentation.

    Parameters:
    :param data_dir:   A string representing the path to the data directory where the data is already
                       divided into 3 folders: train, val, and test.
    :param batch_size: An integer representing the batch size for the dataloaders.
    :param width:      An integer representing the width of the images, according to the pretrained
                       model's specifications.
    :param height:     An integer representing the height of the images, according to the pretrained
                       model's specifications.
    :param model_name: A string representing the name of the model to be used. Any name containing
                       'efficientnet' (case-insensitive) disables the [0, 1] pixel rescaling.

    Returns:
    :return: train_loader, val_loader, test_loader
    """

    # Keras EfficientNets take raw [0, 255] pixels, every other model here is fed [0, 1] values
    rescale_pixels = 'efficientnet' not in model_name.lower()

    def _image_preprocess(image:np.ndarray):
        """
        Rescales the pixel values to the range [0, 1] (skipped for EfficientNet models,
        which do not need normalization) and resizes the image to the requested size.

        Parameters:
        :param image: The image to be preprocessed.

        Returns:
        :return: The preprocessed image.
        """

        if rescale_pixels:
            image = image/255.0 # rescale

        # cv2 expects the size as (width, height). The iterators below already load the
        # images with target_size=(height, width), so this only acts as a safeguard.
        return cv2.resize(image, (width, height))

    def _flow(datagen, split:str):
        """Builds the directory iterator of one split ('train', 'val' or 'test')."""
        # os.path.join works with or without a trailing separator in `data_dir`
        return datagen.flow_from_directory(directory=os.path.join(data_dir, split),
                                           target_size=(height, width),
                                           class_mode='categorical',
                                           batch_size=batch_size,
                                           save_format='jpg'
                                           )

    # Create the training data generator (with augmentation methods)
    train_datagen = ImageDataGenerator(# Augmentation methods
                                       rotation_range=10,          # rotate randomly by up to 10 degrees
                                       horizontal_flip=True,       # flip horizontally at random
                                       vertical_flip=True,         # flip vertically at random
                                       width_shift_range=0.05,     # shift horizontally by up to 5% of the width
                                       height_shift_range=0.05,    # shift vertically by up to 5% of the height
                                       brightness_range=[0.3,0.9], # brightness factor range (values below 1 darken)
                                       shear_range=0.1,            # shear intensity
                                       zoom_range=0.2,             # zoom randomly within [0.8, 1.2]
                                       # Rescaling / resizing
                                       preprocessing_function=_image_preprocess
                                       )

    # Create the validation and test data generator (no augmentation)
    test_datagen = ImageDataGenerator(preprocessing_function=_image_preprocess)

    # Each label is printed without a newline so that the summary line emitted by
    # flow_from_directory is displayed right next to it.
    print('Train:\t\t', end='')
    train_it = _flow(train_datagen, 'train')

    print('Validation:\t', end='')
    val_it = _flow(test_datagen, 'val')

    print('Test:\t\t', end='')
    test_it = _flow(test_datagen, 'test')

    return train_it, val_it, test_it

### Weights (For Imbalanced Data)

In [None]:
def get_weights(data_dir:str, n_classes:int):
    """
    Computes class weights for imbalanced data. The function counts the number of samples
    in each class folder of the training data and computes the weight of each class as
    total_samples / (n_classes * samples_in_class).

    Class folders are visited in sorted order and plain files inside the train folder are
    ignored, so that weight index i refers to the same class as label index i of keras'
    flow_from_directory (which orders the class sub-directories alphanumerically).

    Parameters:
    :param data_dir:  The directory containing the 'train' folder.
    :param n_classes: The number of classes in the dataset.

    Returns:
    :return: A dictionary containing the class weights, where the keys are the class indices
             and the values are the computed weights.

    Raises:
    :raises IndexError:        If n_classes is larger than the number of class folders.
    :raises ZeroDivisionError: If one of the first n_classes class folders is empty.
    """

    # os.path.join works with or without a trailing separator in `data_dir`
    dataset_dir = os.path.join(data_dir, 'train')

    # Only sub-directories are classes; sorting makes the order deterministic
    class_names = sorted(entry for entry in os.listdir(dataset_dir)
                         if os.path.isdir(os.path.join(dataset_dir, entry)))

    # Count each class population and the total sum
    # NOTE: every entry of a class folder is counted, whatever its file type
    samples_per_class = [len(os.listdir(os.path.join(dataset_dir, name)))
                         for name in class_names]
    total_samples = sum(samples_per_class)

    # Compute the weight of each class (rare classes get larger weights)
    return {class_idx: total_samples / (n_classes * samples_per_class[class_idx])
            for class_idx in range(n_classes)}

### Early Stopping

In [None]:
# Callback that stops a training run once the validation loss ('min' mode: lower is
# better) has not improved for 5 consecutive epochs. It is only handed to `fit`
# when USE_EARLY_STOP is True.
early_stop = EarlyStopping(monitor='val_loss',
                           mode='min',
                           verbose=1,
                           patience=5
                           )

## Training

In [None]:
# Define the models to compare: display name -> Keras application constructor
models = {'NASNetMobile'  : NASNetMobile,
          'VGG16'         : VGG16,
          'VGG19'         : VGG19,
          'ResNet50V2'    : ResNet50V2,
          'ResNet101V2'   : ResNet101V2,
          'ResNet152V2'   : ResNet152V2,
          'MobileNetV2'   : MobileNetV2,
          'InceptionV3'   : InceptionV3,
          'Xception'      : Xception,
          'DenseNet121'   : DenseNet121,
          'DenseNet169'   : DenseNet169,
          'EfficientNetB0': EfficientNetB0,
          'EfficientNetB1': EfficientNetB1,
          'EfficientNetB2': EfficientNetB2,
          'EfficientNetB3': EfficientNetB3,
          'EfficientNetB4': EfficientNetB4,
          'EfficientNetB5': EfficientNetB5,
          'EfficientNetB6': EfficientNetB6,
          'EfficientNetB7': EfficientNetB7
          }

# Define the parameters of each model (same keys as `models`):
#   'input_sizes': (height, width) of the input images
#   'batch_size' : batch size of the dataloaders
# Note that the batch sizes were chosen for my machine; in your case
# there may be a need to make some adjustments.
parameters =  {'NASNetMobile'  : {'input_sizes':(160, 120), 'batch_size':64},
               'VGG16'         : {'input_sizes':(160, 120), 'batch_size':64},
               'VGG19'         : {'input_sizes':(160, 120), 'batch_size':64},
               'ResNet50V2'    : {'input_sizes':(160, 120), 'batch_size':64},
               'ResNet101V2'   : {'input_sizes':(160, 120), 'batch_size':64},
               'ResNet152V2'   : {'input_sizes':(160, 120), 'batch_size':64},
               'MobileNetV2'   : {'input_sizes':(160, 120), 'batch_size':64},
               'InceptionV3'   : {'input_sizes':(160, 120), 'batch_size':64},
               'Xception'      : {'input_sizes':(160, 120), 'batch_size':64},
               'DenseNet121'   : {'input_sizes':(160, 120), 'batch_size':64},
               'DenseNet169'   : {'input_sizes':(160, 120), 'batch_size':64},
               'EfficientNetB0': {'input_sizes':(224, 224), 'batch_size':32},
               'EfficientNetB1': {'input_sizes':(240, 240), 'batch_size':32},
               'EfficientNetB2': {'input_sizes':(260, 260), 'batch_size':32},
               'EfficientNetB3': {'input_sizes':(300, 300), 'batch_size':16},
               'EfficientNetB4': {'input_sizes':(380, 380), 'batch_size':16},
               'EfficientNetB5': {'input_sizes':(456, 456), 'batch_size':16},
               'EfficientNetB6': {'input_sizes':(528, 528), 'batch_size':16},
               'EfficientNetB7': {'input_sizes':(600, 600), 'batch_size':16},
               } 

In [None]:
# Check that every model has parameters assigned. The key sets are compared (not
# just their sizes) so that a misspelled or missing model name is detected too.
set(models) == set(parameters)

In [None]:
# Report how many GPUs TensorFlow can see (0 means training will run on the CPU)
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

In [None]:
# Results of every model, keyed by model name:
#   history_warm_up / history_fine_tuning: History objects returned by `fit`
#   history_evaluation: [loss, accuracy] returned by `evaluate` on the test set
history_warm_up     = {}
history_fine_tuning = {}
history_evaluation  = {}

# Epoch at which the fine-tuning phase ends (warm-up epochs included)
total_epochs = WARM_UP_EPOCHS+FINE_TUNE_EPOCHS

# Iterate through the models dictionary
for model_name, model_class in models.items():

    print(f'{model_name} {"-"*100}')

    # Drop the Keras state left over from the previously trained model, otherwise
    # memory usage keeps growing while many models are built in the same session
    tf.keras.backend.clear_session()

    # Get data in the expected shape ('input_sizes' is stored as (height, width))
    in_height, in_width = parameters[model_name]['input_sizes']
    channels = 3
    batch_size = parameters[model_name]['batch_size']

    print(f'Input tensor shape: ({in_height}, {in_width}, {channels})')
    train_it, val_it, test_it = get_dataloaders(DATA_DIRECTORY, batch_size, in_width, in_height, model_name)
    n_classes = train_it.num_classes
    print(f'Number of Classes: {n_classes}')

    # Get weights
    class_weights = get_weights(DATA_DIRECTORY, n_classes)

    # Build the pretrained base model
    base_model = model_class(include_top=False,  # Drop the original classification head so that a custom
                                                 # input size and a new head can be used
                             weights='imagenet', # Pretrained weights from ImageNet
                             input_tensor=Input(shape=(in_height, in_width, channels))
                             )

    # Freeze the base model: only the new head is trained during the warm-up
    for layer in base_model.layers:
        layer.trainable = False

    # Add the new layers at the end. GlobalAveragePooling2D needs the 4D feature map
    # of the base model, so the output must NOT be flattened before it.
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dropout(rate=0.2)(x)
    prediction = Dense(n_classes, activation='softmax')(x)

    # Define the new model
    model = Model(inputs=base_model.input, outputs=prediction)

    # Compile. The head already applies softmax, so the loss receives
    # probabilities and not logits.
    model.compile(optimizer = Adam(learning_rate=BASE_LEARNING_RATE),
                  loss = CategoricalCrossentropy(from_logits=False),
                  metrics = ['accuracy']
                 )

    # Prepare Callbacks
    callbacks = []
    if USE_EARLY_STOP:
        callbacks.append(early_stop)

    # Let's warm up the new layers!
    # NOTE(review): `workers=-1` is not a documented value of `fit`/`evaluate`; it
    # presumably does not enable any parallel data loading - confirm and consider
    # passing a positive worker count instead.
    print('Training (Warm_Up):')
    history_warm_up[model_name] = model.fit(train_it,
                                            epochs=WARM_UP_EPOCHS,
                                            validation_data=(val_it),
                                            class_weight=class_weights,
                                            callbacks=callbacks,
                                            workers=-1,
                                            use_multiprocessing=True
                                           )

    # Un-freeze only the last layers, which compute the more high-level features
    model.trainable = True
    print("Number of layers in the base model: ", len(model.layers))
    # Fine-tune from this layer onwards
    fine_tune_at = int(0.75 * len(model.layers)) # Note that the 0.75 isn't fixed
    print(f"Fine-Tune from layer {fine_tune_at}")

    # Freeze all the layers before the `fine_tune_at` layer
    for layer in model.layers[:fine_tune_at]:
        layer.trainable = False

    # Re-compile so that the new trainable flags take effect; a 10x smaller
    # learning rate is used for the pretrained layers
    model.compile(optimizer = Adam(learning_rate=BASE_LEARNING_RATE*0.1),
                  loss = CategoricalCrossentropy(from_logits=False),
                  metrics = ['accuracy']
                  )

    # Let's fine tune it!
    print('Training (Fine_tune):')
    history_fine_tuning[model_name] = model.fit(train_it,
                                                epochs=total_epochs, # End epoch (including the previous training epochs)
                                                initial_epoch=WARM_UP_EPOCHS, # Continue from previous epoch
                                                validation_data=(val_it),
                                                class_weight=class_weights,
                                                callbacks=callbacks,
                                                workers=-1,
                                                use_multiprocessing=True
                                                )

    # Evaluation time!
    print('Evaluation:')
    history_evaluation[model_name] = model.evaluate(test_it,
                                                    workers=-1,
                                                    use_multiprocessing=True
                                                    )

    print()
    print() # NEXT!

## Results

### Training & Validation History

In [None]:
def metric_history(model_name:str, metric:str):
    """
    Returns the complete history of one metric for one model.
    The values recorded during the warm-up phase come first, directly followed by
    the values recorded during the fine-tuning phase.

    Parameters:
    :param model_name: The name of the model.
    :param metric: The name of the metric to retrieve the history of.

    Returns:
    :return: A list containing the history of the specified metric for the specified model.
    """
    # Look up the recorded values of both phases, in chronological order
    warm_up_values, fine_tune_values = [phase[model_name].history[metric]
                                        for phase in (history_warm_up, history_fine_tuning)]

    return warm_up_values + fine_tune_values
    

# Metrics to plot: each training metric is paired with its validation counterpart
train_metrics = ['loss',     'accuracy'    ]
val_metrics   = ['val_loss', 'val_accuracy']

# One subplot per metric pair
plot_columns = int(len(train_metrics))

# Draw one figure per model
for model_name in models:
    print(model_name.upper(), ':')

    # Plot progression graphs
    plt.figure(figsize=(12,3))
    for column, (train_metric, val_metric) in enumerate(zip(train_metrics, val_metrics), start=1):
        plt.subplot(1, plot_columns, column)
        plt.title(train_metric.replace('_', ' ').title())

        # Set axes (only the non-loss metrics get a fixed [0, 1] y-range)
        if 'loss' not in train_metric:
            plt.ylim(0.0, 1.0)
        plt.xlim(0, total_epochs)
        plt.grid(axis='y', linestyle='-')

        # Training curve first, validation curve second
        plt.plot(metric_history(model_name, train_metric))
        plt.plot(metric_history(model_name, val_metric))

        # Dashed vertical line at the epoch where fine-tuning starts
        warm_up_length = len(history_warm_up[model_name].history[train_metric])
        plt.axvline(x=warm_up_length, color='green', ls='--')

        # Legend (same order as the lines drawn above)
        plt.legend(['train', 'val', 'fine tune'], loc='best')

    plt.show()

    print() # NEXT!

### Compare Evaluation metrics

In [None]:
# Model names and test metrics are both taken from the evaluation history, so the
# labels always line up with the plotted values (even if only some of the models
# were trained and evaluated)
labels    = list(history_evaluation.keys())
eval_loss = []
eval_acc  = []

# Iterate through the evaluation history dictionary and
# extract the loss and accuracy values
for loss, acc in history_evaluation.values():
    eval_loss.append(loss)
    eval_acc.append(acc)

x = np.arange(len(labels))  # The label locations
width = 0.30  # The width of the bars

fig, ax = plt.subplots(figsize=(13, 7))
# The bars are centered on their tick so that each bar sits right above its model name
rects2 = ax.bar(x, eval_acc, width, label='Accuracy')

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_ylabel('Scores')
ax.set_title('Scores by Model', fontsize=22)
ax.set_xticks(x, labels, rotation = 90, fontsize=14)
ax.legend(loc="best", fontsize=12)

# Add the label value on each bar
ax.bar_label(rects2, fmt='%.2f', fontsize=12)

# NOTE(review): the layout rectangle extends beyond the figure (values above 1) -
# presumably to enlarge the plot in the notebook output; confirm this is intended
fig.tight_layout(rect=(0,0,1.2,1.1))

plt.show()