In [6]:
import tensorflow as tf
import os
import pandas as pd
import numpy as np
import warnings
import datetime
warnings.filterwarnings('ignore')

In [15]:
# Banner listing the helper functions defined in this notebook.
# A single print with a newline separator emits exactly one line per entry.
print(
    "Imported functions:",
    "- activate_gpu()",
    "- generate_directories()",
    "- build_binary_classification_transfer_learning_model()",
    "- build_train_image_data_generators_flow_from_directory()",
    "- build_callbacks()",
    "- optimize_model()",
    "- visualize_loss_accuracy_binary_classification()",
    "- build_evaluate_image_data_generators_flow_from_directory()",
    "- evaluate_binary_classification_model()",
    sep = "\n",
)

# Trailing blank line after the listing.
print("")

Imported functions:
- activate_gpu()
- generate_directories()
- build_binary_classification_transfer_learning_model()
- build_train_image_data_generators_flow_from_directory()
- build_callbacks()



In [None]:
def activate_gpu():
    """
    Clear the Keras session and configure the GPU(s) TensorFlow may use.

    The first physical GPU is made the only visible GPU and memory growth is
    requested for every physical GPU that was listed. A summary line with the
    number of physical and logical GPUs is printed on success.

    Returns:
        The last physical GPU device listed by TensorFlow, or None when no GPU
        is available.
    """

    tf.keras.backend.clear_session()
    gpus = tf.config.experimental.list_physical_devices('GPU')

    if not gpus:
        # Without this guard the final return referenced a loop variable that
        # was never bound and raised UnboundLocalError on CPU-only machines.
        return None

    try:
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Best-effort configuration: the error is reported, not re-raised.
        print(e)

    # Same device the successful path used to return (the last one iterated),
    # but well defined even if configuration failed before the loop started.
    return gpus[-1]

In [None]:
def generate_directories(model_name,
                         model_type,
                         working_directory,
                         dataset_directory,
                         print_directories = False):
    """
    Create the model output folders and return every path used by the workflow.

    The folders <working_directory>/<model_name> and
    <working_directory>/<model_name>/<model_type> are created when missing
    (including any missing parent folders); a message is printed for each one
    that already exists. Dataset and callback paths are only computed, not
    created.

    model_name (str): e.g. ResNet50
    model_type (str): e.g. Binary
    working_directory (str): path under which the model folders are created
    dataset_directory (str): path expected to hold train/validation/test
    print_directories (bool): print every computed path when True

    Returns:
        dict mapping a descriptive key to each path (see the keys below).
    """

    # 1. Create model folder and 2. model type folder:
    model_directory = os.path.join(working_directory, model_name)
    model_type_directory = os.path.join(model_directory, model_type)
    for folder in (model_directory, model_type_directory):
        if os.path.exists(folder):
            print("Folder", folder, "already exist")
        else:
            # makedirs also creates missing parents, so a working directory
            # that does not exist yet no longer raises FileNotFoundError.
            os.makedirs(folder, exist_ok = True)

    # 3. Dataset directories:
    train_directory = os.path.join(dataset_directory, "train")
    validation_directory = os.path.join(dataset_directory, "validation")
    test_directory = os.path.join(dataset_directory, "test")

    # 4. Model store and checkpoint directories:
    # The checkpoint name keeps its {placeholders}; they are not filled in here.
    callback_model_checkpoint_directory = os.path.join(
        model_type_directory,
        "keras_model.weights.{epoch:02d}-{val_binary_accuracy:.4f}-{val_loss:.4f}.hdf5")
    callback_tensorboard_directory = os.path.join(model_type_directory, "logs")
    # The CSV log name is prefixed with the current timestamp.
    timestamp = datetime.datetime.now().strftime("%d-%m-%Y %H-%M-%S")
    callback_csv_logger_directory = os.path.join(
        model_type_directory,
        "_".join([timestamp, model_name, "model_optimization_logger.csv"]))

    directories = dict(working_directory = working_directory,
                       model_directory = model_directory,
                       model_type_directory = model_type_directory,
                       dataset_directory = dataset_directory,
                       train_directory = train_directory,
                       validation_directory = validation_directory,
                       test_directory = test_directory,
                       callback_model_checkpoint_directory = callback_model_checkpoint_directory,
                       callback_tensorboard_directory = callback_tensorboard_directory,
                       callback_csv_logger_directory = callback_csv_logger_directory)

    # 5. Print directories:
    if print_directories:
        print("Working directory:", working_directory)
        print("Model directory:", model_directory)
        print("Model type directory:", model_type_directory)
        print("Dataset directory:", dataset_directory)
        print("Train dataset directory:", train_directory)
        print("Validation dataset directory:", validation_directory)
        print("Test dataset directory:", test_directory)
        print("Model callbacks checkpoint directory:", callback_model_checkpoint_directory)
        print("Model callbacks tensorboard directory:", callback_tensorboard_directory)
        print("Model callbacks csv logger directory:", callback_csv_logger_directory)

    return directories

In [8]:
def build_binary_classification_transfer_learning_model(model_name,
                                                        weights = "imagenet",
                                                        include_top = False,
                                                        print_model_summary = False):
    """
    Build a binary classifier on top of a tf.keras.applications base model.

    The base model is followed by a GlobalAveragePooling2D layer and a single
    sigmoid unit.

    model_name (str): one of
                      "EfficientNetB0", "EfficientNetB1", "EfficientNetB2",
                      "EfficientNetB3", "EfficientNetB4", "EfficientNetB5",
                      "EfficientNetB6", "EfficientNetB7",
                      "Xception",
                      "VGG16", "VGG19",
                      "ResNet50", "ResNet50V2", "ResNet101", "ResNet101V2",
                      "ResNet152", "ResNet152V2",
                      "MobileNet", "MobileNetV2",
                      "InceptionV3", "InceptionResNetV2",
                      "DenseNet121", "DenseNet169", "DenseNet201",
                      "NASNetLarge", "NASNetMobile"
    weights (str): weights passed to the base model, "imagenet" by default
    include_top (bool): forwarded to the base model, False by default
    print_model_summary (bool): print the model name and summary when True

    Returns:
        tf.keras.models.Model taking the base model inputs and producing the
        sigmoid output.

    Raises:
        ValueError: if model_name is not one of the supported names.
    """

    # Square input side (pixels) used for each supported architecture.
    input_sides = {
        "EfficientNetB0": 224,
        "EfficientNetB1": 240,
        "EfficientNetB2": 260,
        "EfficientNetB3": 300,
        "EfficientNetB4": 380,
        "EfficientNetB5": 456,
        "EfficientNetB6": 528,
        "EfficientNetB7": 600,
        "Xception": 299,
        "VGG16": 224,
        "VGG19": 224,
        "ResNet50": 224,
        "ResNet50V2": 224,
        "ResNet101": 224,
        "ResNet101V2": 224,
        "ResNet152": 224,
        "ResNet152V2": 224,
        "MobileNet": 224,
        "MobileNetV2": 224,
        "InceptionV3": 299,
        "InceptionResNetV2": 299,
        "DenseNet121": 224,
        "DenseNet169": 224,
        "DenseNet201": 224,
        "NASNetLarge": 331,
        "NASNetMobile": 224,
    }

    if model_name not in input_sides:
        # An unknown name used to surface later as an unbound-variable error.
        raise ValueError("Unknown model_name " + repr(model_name)
                         + "; expected one of: " + ", ".join(input_sides))

    side = input_sides[model_name]
    input_shape = (side, side, 3)

    # The constructor in tf.keras.applications has the same name as the model.
    model_builder = getattr(tf.keras.applications, model_name)
    # `weights` is forwarded; it used to be ignored in favour of "imagenet".
    model_base = model_builder(include_top = include_top, weights = weights, input_shape = input_shape)

    # NOTE(review): the pooling layer presumably needs the 4D feature map that
    # include_top = False yields - confirm before using include_top = True.
    output_tensor = tf.keras.layers.GlobalAveragePooling2D()(model_base.layers[-1].output)
    output_tensor = tf.keras.layers.Dense(units = 1, activation = tf.keras.activations.sigmoid)(output_tensor)
    model = tf.keras.models.Model(inputs = model_base.inputs, outputs = output_tensor)

    if print_model_summary:
        print("Model name:", model_name)
        # summary() prints by itself; wrapping it in print() also printed "None".
        model.summary()

    return model

In [11]:
def build_train_image_data_generators_flow_from_directory(model,
                                                          directories,
                                                          parameters,
                                                          augmentation_parameters = None):
    """
    Build the training and validation directory iterators for model fitting.

    model (obj): model; its input_shape provides the image target size
    directories (dict) : needs "train_directory" and "validation_directory"
    parameters (dict) : needs "batch_size", "class_mode" and "shuffle"
    augmentation_parameters (dict) : optional ImageDataGenerator arguments
                                     applied to the training data only

    Returns:
        dict with "train_data_generator" and "validation_data_generator".
    """

    # rescale = 1/255 is always applied and overrides any caller-supplied value.
    augmentation_parameters = {**(augmentation_parameters or {}), "rescale": 1/255}

    print("Augmentation parameters:")
    for k, v in augmentation_parameters.items():
        print("->", k, ":", v)
    print("")

    # (height, width) from the model input; previously the height was used for
    # both dimensions, which is wrong for non-square inputs.
    target_size = tuple(model.input_shape[1:3])

    # Validation images are only rescaled, never augmented.
    train_data_generator = tf.keras.preprocessing.image.ImageDataGenerator(**augmentation_parameters)
    validation_data_generator = tf.keras.preprocessing.image.ImageDataGenerator(rescale = 1/255)

    train_data_generator = train_data_generator.flow_from_directory(directory = directories["train_directory"],
                                                                    target_size = target_size,
                                                                    batch_size = parameters["batch_size"],
                                                                    class_mode = parameters["class_mode"],
                                                                    shuffle = parameters["shuffle"])

    validation_data_generator = validation_data_generator.flow_from_directory(directory = directories["validation_directory"],
                                                                              target_size = target_size,
                                                                              batch_size = parameters["batch_size"],
                                                                              class_mode = parameters["class_mode"],
                                                                              shuffle = parameters["shuffle"])

    train_data_generators = dict(train_data_generator = train_data_generator,
                                 validation_data_generator = validation_data_generator)

    return train_data_generators

In [12]:
def build_callbacks(directories,
                    parameters):
    """
    Assemble the list of Keras callbacks used while fitting the model.

    directories (dict) : provides the checkpoint, tensorboard and csv logger paths
    parameters (dict) : provides "monitor", "verbose", "save_best_only",
                        "early_stopping_patience" and "restore_best_weights"

    Returns:
        list with, in order: ModelCheckpoint, EarlyStopping, ReduceLROnPlateau,
        TensorBoard and CSVLogger callbacks.
    """

    # Saves the model to the checkpoint path.
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath = directories["callback_model_checkpoint_directory"],
        monitor = parameters["monitor"],
        verbose = parameters["verbose"],
        save_best_only = parameters["save_best_only"])

    # Stops training after `early_stopping_patience` epochs.
    early_stopping_callback = tf.keras.callbacks.EarlyStopping(
        monitor = parameters["monitor"],
        verbose = parameters["verbose"],
        patience = parameters["early_stopping_patience"],
        restore_best_weights = parameters["restore_best_weights"])

    # Learning-rate reduction with the library defaults.
    reduce_lr_callback = tf.keras.callbacks.ReduceLROnPlateau(
        monitor = parameters["monitor"],
        verbose = parameters["verbose"])

    # TensorBoard event logs.
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir = directories["callback_tensorboard_directory"])

    # Per-epoch metrics appended to a ";"-separated file.
    csv_logger_callback = tf.keras.callbacks.CSVLogger(
        filename = directories["callback_csv_logger_directory"],
        separator = ";",
        append = True)

    return [checkpoint_callback,
            early_stopping_callback,
            reduce_lr_callback,
            tensorboard_callback,
            csv_logger_callback]

<tensorflow.python.keras.engine.functional.Functional at 0x1cd87feb048>

In [None]:
def optimize_model(model,
                   train_data_generators,
                   parameters,
                   callbacks):
    """
    Fit the model on the training generator and return the per-epoch history.

    model (obj) : model to fit
    train_data_generators (dict) : needs "train_data_generator" and
                                   "validation_data_generator"; each must
                                   expose `n` and `batch_size`
    parameters (dict) : needs "epochs"
    callbacks (list) : callbacks passed to the fit call

    Returns:
        pandas.DataFrame built from history.history, plus a 1-based "epoch"
        column.
    """

    train_generator = train_data_generators["train_data_generator"]
    validation_generator = train_data_generators["validation_data_generator"]

    # Enough steps to see every sample once; the last batch may be partial.
    # Cast to int: np.ceil returns a float, while step counts are integers.
    steps_per_epoch = int(np.ceil(train_generator.n / train_generator.batch_size))
    validation_steps = int(np.ceil(validation_generator.n / validation_generator.batch_size))

    # Model.fit accepts generators directly; fit_generator is deprecated.
    history = model.fit(
        x = train_generator,
        steps_per_epoch = steps_per_epoch,
        epochs = parameters["epochs"],
        validation_data = validation_generator,
        validation_steps = validation_steps,
        callbacks = callbacks
    )

    results = pd.DataFrame(history.history)
    results["epoch"] = results.index + 1

    return results

In [None]:
def visualize_loss_accuracy_binary_classification(history):
    """
    Plot loss and binary accuracy per epoch in two side-by-side panels.

    The left panel shows "loss" and "val_loss" and annotates the lowest
    validation loss; the right panel shows "binary_accuracy" and
    "val_binary_accuracy" and annotates the highest validation accuracy.
    When several epochs tie, the first one is annotated.

    history (dataframe) : needs the columns "epoch", "loss", "val_loss",
                          "binary_accuracy" and "val_binary_accuracy"

    Raises:
        ValueError: if history has no rows.
    """

    # NOTE(review): `plt` is not bound anywhere in the visible notebook; it is
    # presumably matplotlib.pyplot imported in another cell - confirm.

    if len(history) == 0:
        raise ValueError("history is empty: nothing to plot")

    val_loss = history["val_loss"]
    val_accuracy = history["val_binary_accuracy"]

    # Rows holding the best validation values. Scalars are extracted with
    # .iloc[0] so annotate() gets plain numbers even when epochs tie.
    best_loss_rows = history[val_loss == val_loss.min()]
    best_accuracy_rows = history[val_accuracy == val_accuracy.max()]
    best_loss_epoch = best_loss_rows["epoch"].iloc[0]
    best_loss = best_loss_rows["val_loss"].iloc[0]
    best_accuracy_epoch = best_accuracy_rows["epoch"].iloc[0]
    best_accuracy = best_accuracy_rows["val_binary_accuracy"].iloc[0]

    # Annotation text is placed mid-way along both axes.
    epoch = history["epoch"].max()
    loss_label_y = 0.5 * (best_loss + val_loss.max())
    accuracy_label_y = 0.5 * (val_accuracy.min() + best_accuracy)

    # Global setting: it also affects figures created after this call.
    plt.rcParams['figure.figsize'] = [25, 5]

    fig, ax = plt.subplots(1, 2)
    ax[0].plot(history["epoch"],
               history["loss"],
               color = "blue",
               marker = "o",
               linestyle = "--",
               label = 'Loss')
    ax[0].plot(history["epoch"],
               history["val_loss"],
               color = "red",
               marker = "o",
               linestyle = "--",
               label = 'Val Loss')
    ax[0].legend(loc = "upper right")
    ax[0].set_xlabel("Epoch")
    ax[0].set_ylabel("Loss")
    ax[0].set_title("Loss plot")
    ax[0].annotate(text = "Val Loss = " + str(round(best_loss, 2)),
                   xy = (best_loss_epoch, best_loss),
                   xytext = (epoch/2, loss_label_y),
                   arrowprops = {"arrowstyle": "->", "color": "black"})
    ax[0].grid()

    ax[1].plot(history["epoch"],
               history["binary_accuracy"],
               color = "blue",
               marker = "o",
               linestyle = "--",
               label = 'Accuracy')
    ax[1].plot(history["epoch"],
               history["val_binary_accuracy"],
               color = "red",
               marker = "o",
               linestyle = "--",
               label = 'Val Accuracy')
    ax[1].legend(loc = "upper left")
    ax[1].set_xlabel("Epoch")
    ax[1].set_ylabel("Accuracy")
    ax[1].set_title("Accuracy plot")
    ax[1].annotate(text = "Val Accuracy = " + str(round(best_accuracy, 2)),
                   xy = (best_accuracy_epoch, best_accuracy),
                   xytext = (epoch/2, accuracy_label_y),
                   arrowprops = {"arrowstyle": "->", "color": "black"})
    ax[1].grid()

    plt.show()

In [None]:
def build_evaluate_image_data_generators_flow_from_directory(model,
                                                             directories,
                                                             parameters):
    """
    Build unshuffled train, validation and test directory iterators.

    Every split is only rescaled by 1/255 and is read with shuffle = False.

    model (obj): model; its input_shape provides the image target size
    directories (dict) : needs "train_directory", "validation_directory" and
                         "test_directory"
    parameters (dict) : needs "batch_size" and "class_mode"

    Returns:
        dict with "train_data_generator", "validation_data_generator" and
        "test_data_generator".
    """

    # (height, width) from the model input; previously the height was used for
    # both dimensions, which is wrong for non-square inputs.
    target_size = tuple(model.input_shape[1:3])

    evaluate_data_generators = {}
    for split in ("train", "validation", "test"):
        # One independent generator per split, all with identical settings.
        data_generator = tf.keras.preprocessing.image.ImageDataGenerator(rescale = 1/255)
        evaluate_data_generators[split + "_data_generator"] = data_generator.flow_from_directory(
            directory = directories[split + "_directory"],
            target_size = target_size,
            batch_size = parameters["batch_size"],
            class_mode = parameters["class_mode"],
            shuffle = False)

    return evaluate_data_generators

In [None]:
def evaluate_binary_classification_model(model,
                                         evaluate_data_generators):
    """
    Evaluate the model on the train, validation and test generators.

    model (obj) : model to evaluate; the result table assumes it reports
                  exactly two values per evaluation (labelled Loss, Accuracy)
    evaluate_data_generators (dict) : needs "train_data_generator",
                                      "validation_data_generator" and
                                      "test_data_generator"; each must expose
                                      `n` and `batch_size`

    Returns:
        pandas.DataFrame with columns "Train", "Validation", "Test" and rows
        "Loss", "Accuracy".
    """

    splits = (("Train", "train_data_generator"),
              ("Validation", "validation_data_generator"),
              ("Test", "test_data_generator"))

    evaluations = {}
    for label, key in splits:
        generator = evaluate_data_generators[key]
        print(label, "generator evaluation")
        # Model.evaluate accepts generators directly; evaluate_generator is
        # deprecated. Steps are cast to int because np.ceil returns a float.
        # verbose = 0 keeps the call silent (Model.evaluate would otherwise
        # show a progress bar by default).
        evaluations[label] = model.evaluate(
            x = generator,
            steps = int(np.ceil(generator.n / generator.batch_size)),
            verbose = 0
        )

    evaluation = pd.DataFrame(evaluations,
                              index = ["Loss", "Accuracy"])

    return evaluation