### Install dependencies

Forces to install `keras` version `2.4.3` and `backports.cached-property` for old versions of python

In [None]:
!pip install -qU --force-reinstall --no-deps keras==2.4.3 backports.cached-property

## Initialization

In [None]:
import json
import os
import pathlib
import csv

from datetime import datetime
from enum import Enum

import numpy as np
import sklearn as sk
import sklearn.model_selection
import tensorflow as tf
import tensorflow.keras as kr

# for retrocompatibility if python version is less than 3.8
try:
    from functools import cached_property
except ImportError:
    from backports.cached_property import cached_property

from typing import Tuple, List, Dict, Any

In [None]:
print("tensorflow version: {}".format(tf.__version__))
print("keras version: {}".format(kr.__version__))
print("available gpu: {}".format(tf.test.gpu_device_name()))

Sets the seed to guarantee the experiment reproducibility

In [None]:
SEED = abs(hash("cookies")) // 2**32
tf.random.set_seed(SEED)
np.random.seed(SEED)

Sets some experiment's parameters

In [None]:
BS = 64
IMAGE_SIZE = (256, 256)
EPOCHS = 30
EPOCHS_FINE = 30

Defines the data paths

In [None]:
PATH_DATA = pathlib.Path("./") / "MaskDataset"

PATH_WORKING = pathlib.Path("./") / "working"

PATH_TRAINING = PATH_DATA / "training"
PATH_TRAINING_OUTPUT = PATH_DATA / "train_gt.json"

PATH_TEST = PATH_DATA / "test"

## Experiment helper

The code below provides some functions to manage dataset and output files

### category

Helping to convert categories to current encoding

In [None]:
class Category(Enum):
    NO_MASK = 0
    ALL_MASK = 1
    SOMEONE_MASK = 2

    @staticmethod
    def to_value(cat: int) -> int:
        if cat == 2:
            return (1,0)
        if cat == 1:
            return (1,1)
        if cat == 0:
            return (0,0)

    @staticmethod
    def from_value(value: int) -> int:
        if value == (1,0):
            return 2
        if value == (1,1):
            return 1
        if value == (0,0):
            return 0

### Dataset

`Dataset` takes care of loading in memory the images of the datasets

In [None]:
class Dataset:
    training: Tuple[np.array, np.array] = None
    validation: Tuple[np.array, np.array] = None
    test: Tuple[np.array, str] = None

    @classmethod
    def load_data(cls):
        images = []
        labels = []

        with open(PATH_TRAINING_OUTPUT) as output_file:
            output = json.load(output_file)
            for filename, category in output.items():
                labels.append(category)

                path_image = PATH_TRAINING / filename

                image = kr.preprocessing.image.load_img(path_image,
                                                        target_size=IMAGE_SIZE)
                image = kr.preprocessing.image.img_to_array(image)

                images.append(image)

        images = np.array(images, dtype=np.float32)
        labels = np.array(list(map(Category.to_value, labels)), dtype="float")

        dataset = sk.model_selection.train_test_split(images,
                                                      labels,
                                                      test_size=0.20,
                                                      stratify=labels,
                                                      random_state=SEED)

        cls.training = (dataset[0], dataset[2])
        cls.validation = (dataset[1], dataset[3])

        test_images = []
        test_filename = []
        for path_file in PATH_TEST.glob("*.jpg"):
            test_filename.append(path_file.name)

            image = kr.preprocessing.image.load_img(path_file,
                                                    target_size=IMAGE_SIZE)
            image = kr.preprocessing.image.img_to_array(image)

            test_images.append(image)

        cls.test = (np.array(test_images, dtype=np.float32), test_filename)

`DatasetGenerator` provides the three preprocessed datasets and it takes care of the process of data augmentation

In [None]:
class DatasetGenerator:
    def __init__(self, preprocessing_function=None):
        if Dataset.training is None:
            Dataset.load_data()

        self._preprocessing_function = preprocessing_function

    @cached_property
    def training(self) -> kr.preprocessing.image.NumpyArrayIterator:
        return kr.preprocessing.image.ImageDataGenerator(
            preprocessing_function=self._preprocessing_function,
            rotation_range=20,
            zoom_range=0.15,
            width_shift_range=0.2,
            height_shift_range=0.2,
            shear_range=0.15,
            horizontal_flip=True,
            fill_mode="nearest").flow(*Dataset.training, batch_size=BS)

    @cached_property
    def validation(self) -> kr.preprocessing.image.NumpyArrayIterator:
        return kr.preprocessing.image.ImageDataGenerator(
            preprocessing_function=self._preprocessing_function).flow(
                *Dataset.validation, batch_size=BS)

    @cached_property
    def test(self) -> Tuple[kr.preprocessing.image.NumpyArrayIterator, str]:
        return (kr.preprocessing.image.ImageDataGenerator(
            preprocessing_function=self._preprocessing_function).flow(
                Dataset.test[0], batch_size=1, shuffle=False), Dataset.test[1])

### Experiment

`Experiment` simplifies the process of experiment realization providing callbacks and some methods to save the model and to compute the prediction

In [None]:
class Experiment:
    def __init__(self, exp_name: str = "exp", exp_id: str = None, preprocessing_function=None, **kwargs):
        self.dataset = DatasetGenerator(preprocessing_function)

        self._id = "{}_{}".format(
            datetime.now().strftime("%m-%d_%H-%M"),
            exp_name
        ) if exp_id is None else exp_id

        self._path = PATH_WORKING / self._id
        
        self._path_checkpoints = self._path / "checkpoints"
        self._path_tensorboard_log = self._path / "tb_log"
        self._path_models = self._path / "models"
        self._path_results = self._path / "results.csv"

    def get_callbacks(self,
                      checkpoints: bool = False,
                      tensorboard: bool = True,
                      early_stopping: bool = True
                      ):
        # returns a list of desidered callbacks for the fit process
        callbacks = []

        # if they are required, checkpoints will be saved in a specific experiment subdirecotry
        if checkpoints:
            os.makedirs(self._path_checkpoints, exist_ok=True)

            callbacks.append(kr.callbacks.ModelCheckpoint(
                self._path_checkpoints / "cp-{epoch:04d}.ckpt"
            ))

        # if they are required, tensorboard files will be generated
        if tensorboard:
            os.makedirs(self._path_tensorboard_log, exist_ok=True)

            callbacks.append(kr.callbacks.TensorBoard(
                self._path_tensorboard_log,
                histogram_freq=1,
                profile_batch=0
            ))
        
        # if it is required, a callback for early stopping will be enabled
        if early_stopping:
            callbacks.append(kr.callbacks.EarlyStopping(
                patience=5,
                restore_best_weights=True
            ))

        return callbacks

    def save_model(self, model: tf.keras.Model, model_id: str = None):
        # saves the provided model in the experiment models path, if model_id is provided saves the model in a subdirectory
        os.makedirs(self._path_models, exist_ok=True)
        
        model.save(self._path_models if model_id is None else self._path_models / model_id)

    def load_model(self, model_id: str = None) -> kr.Model:
        # loads the model from path of experiment, if a model_id is provided loads the specific model
        return kr.models.load_model(self._path_models if model_id is None else self._path_models / model_id)

    def get_test_prediction(self, model: tf.keras.Model):
        # computes the prediction for the test set and returns a dict of name file -> predicted category 
        dataset_test = self.dataset.test

        preditcions_list = model.predict(dataset_test[0], batch_size=BS)

        output = {}
        
        for i in range(len(preditcions_list)):
            code = (
                0 if preditcions_list[i][0] < 0.5 else 1,
                0 if preditcions_list[i][1] < 0.5 else 1
            )

            output[dataset_test[1][i]] = Category.from_value(code)
        
        return output

    def mk_report_test_prediction(self, model: tf.keras.Model):
        # creates a results.csv for the submission on kaggle
        os.makedirs(self._path, exist_ok=True)

        predictions = self.get_test_prediction(model)

        with open(self._path_results, mode='w') as file:
            csv_file = csv.writer(file)
            csv_file.writerow(["Id", "Category"])

            for f, c in predictions.items():
                csv_file.writerow([f, c])

## Model

Prepares a new experiment helper

In [None]:
exp = Experiment(
    "tl_ordered_vgg19",
    preprocessing_function=kr.applications.vgg19.preprocess_input
)

Creates the custom bias layer

In [None]:
class BiasLayer(kr.layers.Layer):
    def __init__(self, units, *args, **kwargs):
        super(BiasLayer, self).__init__(*args, **kwargs)
        self.bias = self.add_weight(
            "bias",
            shape=[units],
            initializer="zeros",
            trainable=True
        )

    def call(self, x):
        return x + self.bias

Loads the pretrained model `VGG19` trained with `imagenet` dataset, without classification layers

In [None]:
input_shape = Dataset.training[0].shape[1:]
output_shape = Dataset.training[1].shape[1]

pretrained_model = kr.applications.VGG19(
    weights="imagenet",
    include_top=False,
    input_shape=input_shape
)

Attaches a new classification model to the pre-trained model and creates a new model from this net with bias layer

In [None]:
x = pretrained_model.output
x = kr.layers.AveragePooling2D(pool_size=(7, 7))(x)
x = kr.layers.Flatten()(x)
x = kr.layers.Dense(256, activation=kr.activations.relu)(x)
x = kr.layers.Dropout(0.5)(x)
# added layers for bias
x = kr.layers.Dense(2, use_bias=False)(x)
x = BiasLayer(2)(x)
x = kr.layers.Activation("sigmoid")(x)

model = kr.Model(inputs=pretrained_model.input, outputs=x)

Freezes all the layers of the pretrained model

In [None]:
for layer in pretrained_model.layers:
    layer.trainable = False

Compiles the new model

In [None]:
model.compile(
    optimizer=kr.optimizers.Adam(lr=1e-4, decay=1e-4 / EPOCHS),
    loss='binary_crossentropy',
    metrics=[
        tf.keras.metrics.BinaryAccuracy(threshold=0.5)
    ]
)

model.summary()

### Classifier training

Starts the training of classifier part

In [None]:
model.fit(
    exp.dataset.training,
    validation_data=exp.dataset.validation,
    epochs=EPOCHS,
    steps_per_epoch=len(Dataset.training[0]) // BS,
    validation_steps=len(Dataset.validation[0]) // BS,
    callbacks=exp.get_callbacks()
)

Saves the model trained only for its classifier part

In [None]:
exp.save_model(model,"rough")

### Fine tunning

Unfreezes the whole model

In [None]:
model.trainable = True

And recompiles it with a smaller learning rate

In [None]:
model.compile(
    optimizer=kr.optimizers.Adam(lr=1e-5, decay=1e-5 / EPOCHS_FINE),
    loss='binary_crossentropy',
    metrics=[
        tf.keras.metrics.BinaryAccuracy(threshold=0.5)
    ]
)

model.summary()

Starts the fine tuning

In [None]:
model.fit(
    exp.dataset.training,
    validation_data=exp.dataset.validation,
    epochs=EPOCHS_FINE,
    steps_per_epoch=len(Dataset.training[0]) // BS,
    validation_steps=len(Dataset.validation[0]) // BS,
    callbacks=exp.get_callbacks()
)

Saves the tuned model

In [None]:
exp.save_model(model,"fine")

## Prediction

Creates the results file for the kaggle submission, exploiting the tuned model

In [None]:
exp.mk_report_test_prediction(model)