# Experiment tests

This notebook shows how to perform an experiment. To do an experiment we need:

- The dataset with the IXI-T1 slices
- The DataGenerator class that helps the Experiment with data acquisition and data augmentation
- The autoencoder model
- The Experiment() class. This class is not compulsory but is useful to help us get the results and pass the hyperparemeters. It is also useful to obtain the results of the experiment in a homogenous way, so that each test has a uniform output structure, which can then be used as the input in the evaluation stages of the project.

In this project, the experiments were run inside Kaggle kernels. Even though Kaggle offers the possibility to use "Utility scripts", we declined this option, so we need to include the models to use inline, as well as the Experiment() class itself and other dependencies such as a custom data generator.

In [None]:
# Library dependencies
import numpy as np
import glob
import traceback
import tensorflow as tf
import tensorflow_addons as tfa
import random
import keras
from keras import layers
import imageio
import pickle
from data_generator import DataGenerator

# The Experiment class
The experiment class works as a wrapper of common keras operations. Despite losing most of the keras utilities, we focus on the problem we are facing and thus we can only pass a limited list of hyperparameters that we think that could be important for our task. Some of these params. are the model, the number of epochs or the batch_size.

The instances of Experiment() write a homogenous output which consists of:

- A logs folder that can be interpreted by tensorboard
- A .history file, containing the summary of losses and metrics for each epoch
- A .h5 containing the architecture and the weights calculated during the training stage

In [None]:
class Experiment:
    _history = None

    def __init__(self, name='Exp', model: keras.Model = None, da_enabled=True, aug_policy='batch', train_gen=None, val_gen=None,
                 skull_stripped=True, path_to_dataset='', path_to_results='', epochs=25, es_patience=5,
                 batch_size=128, optimizer="Adam", loss="mse", metrics="mse", reduce_lr_on_plateau=True):
        self.name = name
        self.model = model
        self.da_enabled = da_enabled
        self.aug_policy = aug_policy
        self.train_gen = train_gen
        self.val_gen = val_gen
        self.skull_stripped = skull_stripped
        self.path_to_dataset = path_to_dataset
        self.path_to_results = path_to_results
        self.epochs = epochs
        self.es_patience = es_patience
        self.batch_size = batch_size
        self.optimizer = optimizer
        self.loss = loss
        self.metrics = metrics
        self.reduce_lr_on_plateau = reduce_lr_on_plateau

        if self.path_to_dataset == '':
            self.path_to_dataset = '../input/ixit1slices/IXI-T1-slices'
        if self.path_to_results == '':
            self.path_to_results = './'

        path_to_slices = '/skull-stripped' if self.skull_stripped else '/full'
        path_to_slices = self.path_to_dataset + path_to_slices
        path_to_train_slices = path_to_slices + '/train'
        path_to_val_slices = path_to_slices + '/val'

        self.train_gen = DataGenerator(base_dir=path_to_train_slices, batch_size=self.batch_size,
                                       Augment=self.da_enabled, AugmentationPolicy=self.aug_policy)

        self.val_gen = DataGenerator(base_dir=path_to_val_slices, batch_size=self.batch_size,
                                     Augment=self.da_enabled, AugmentationPolicy=self.aug_policy)

    def get_name(self):
        """
        The name of this experiment, which depends on the model's name and some of the experiment parameters
        :return:
        """
        da_config: str = 'da-yes' if self.da_enabled else 'da-no'
        skull_mode: str = 'skull-stripped' if self.skull_stripped else 'full-skull'
        rlronplateru = 'rlrop-Y' if self.reduce_lr_on_plateau else 'rlrop-N'
        return f'{self.name}_model-{self.model.name}_{skull_mode}_ep-{self.epochs}_bs-{self.batch_size}_{da_config}_loss-{self.loss}_{rlronplateru}'

    def start(self):
        """
        starts the experirment with the given params
        :return:
        """
        try:
            print('Experiment started')
            if self.model is None:
                raise Exception("No Model was provided")

            print("Model is about to start training")
            self.fit_model()

            print("Training has finished. Saving results.")
            self.save_results()
        except Exception as ex:
            print("General error executing the experiment: " + str(ex))
            print(traceback.format_exc())

    def fit_model(self):
        self.compile_model()

        # Callbacks
        callbacks = []
        callbacks.append(tf.keras.callbacks.TensorBoard(f"logs/{self.get_name()}"))
        callbacks.append(keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', patience=self.es_patience,
                                                       verbose=1, restore_best_weights=True))
        if self.reduce_lr_on_plateau:
            callbacks.append(tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.2,
                                                                  patience=2, verbose=1, cooldown=1))

        self._history = self.model.fit(self.train_gen,
                                       epochs=self.epochs,
                                       batch_size=self.batch_size,
                                       validation_data=self.val_gen,
                                       callbacks=callbacks)

    def compile_model(self):
        loss_func = self.get_loss_function()
        self.model.compile(loss=loss_func, optimizer=self.optimizer, metrics=self.metrics)

    def get_loss_function(self):
        if self.loss == 'ssim':
            return self._loss_ssim
        elif self.loss == 'ms-ssim':
            return self._loss_ms_ssim
        elif self.loss == 'combined':
            return self._loss_mae_ssim
        else:
            return 'mse'

    def _loss_ssim(self, img, pred_img):
        return 1 - tf.reduce_mean(tf.image.ssim(pred_img, img, 1.0))

    def _loss_ms_ssim(self, img, pred_img):
        return 1 - tf.reduce_mean(tf.image.ssim_multiscale(pred_img, img, 1.0))

    def _loss_mae_ssim(self, img, pred_img):
        # https://arxiv.org/pdf/1511.08861.pdf
        # TODO
        return None


    def save_results(self):
        filename = self.path_to_results + self.get_name() + '.h5'
        self.model.save(filename)

        filename = self.path_to_results + self.get_name() + '.history'
        with open(filename, 'wb') as file_pi:
            pickle.dump(self._history.history, file_pi)


# The model
We need to include the model architecture or architectures we want to use in our experiments. In this example, we include the Convolutional Autoencoder based on residual blocks fith pre activation and skip connections.



In [None]:
class resnet_model():
    def get_model(self, activation='sigmoid', type='pre'):
        input_img = layers.Input(shape=(128, 128, 1))

        res_block = self._pre_activation_residual_block if type == 'pre' else residual_block
        conv2d = layers.Conv2D(32, (3, 3), strides=2, padding="same", name='Conv1')(input_img)
        resblock = res_block(conv2d, 64)
        resblock = res_block(resblock, 64, strides=1)
        encoder = res_block(resblock, 128, strides=2)

        decoder = self._transpose2d(encoder, 64)
        decoder = layers.Concatenate()([decoder, resblock])
        decoder = self._transpose2d(decoder, 32)
        decoder = layers.Concatenate()([decoder, conv2d])
        decoder = self._transpose2d(decoder, 16)
        decoder = layers.Conv2D(1, (3, 3), activation=activation, padding='same')(decoder)

        return Model(input_img, decoder, name="resnet_" + type)

    def _transpose2d(self, inputs, filters):
        block = layers.Conv2DTranspose(filters, (3, 3), strides=(2, 2), padding='same')(inputs)
        block = layers.BatchNormalization()(block)
        block = layers.ReLU()(block)
        return block

    def _residual_block(self, inputs, filters, strides=2):
        """
        See the block diagram:
        https://www.researchgate.net/figure/Architecture-of-normal-residual-block-a-and-pre-activation-residual-block-b_fig2_337691625
        """
        block = layers.Conv2D(filters=filters, kernel_size=(3, 3), strides=strides, padding="same")(inputs)
        block = layers.BatchNormalization()(block)
        block = layers.ReLU()(block)

        block = layers.Conv2D(filters=filters, kernel_size=(3, 3), strides=1, padding="same")(block)
        block = layers.BatchNormalization()(block)

        inputs = layers.Conv2D(filters=filters, kernel_size=(1, 1), strides=strides, padding="same")(inputs)

        block = layers.Add()([inputs, block])
        block = layers.ReLU()(block)
        return block

    def _pre_activation_residual_block(self, inputs, filters, strides=2):
        """
        See the block diagram:
        https://www.researchgate.net/figure/Architecture-of-normal-residual-block-a-and-pre-activation-residual-block-b_fig2_337691625
        "the pre-activation architecture is implemented by moving BN and ReLU activation function before convolution operation"
        """
        block = layers.BatchNormalization()(inputs)
        block = layers.ReLU()(block)

        block = layers.Conv2D(filters=filters, kernel_size=(3, 3), strides=strides, padding="same")(block)

        block = layers.BatchNormalization()(block)
        block = layers.ReLU()(block)

        block = layers.Conv2D(filters=filters, kernel_size=(3, 3), strides=1, padding="same")(block)

        if strides != 1:
            inputs = layers.Conv2D(filters=filters, kernel_size=(1, 1), strides=strides, padding="same")(inputs)

        block = layers.Add()([inputs, block])
        return block

The Experiment() class relies on the DataGenerator() class, a custom keras data generator specific for this project:

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """
    Custom class to provide data to keras models
    AugmentationPolicy: can be => 'instance', 'batch'
    """
    def __init__(self, base_dir='.', batch_size=128, Shuffle=True, Augment=True, AugmentationPolicy='instance',
                 brain_amount=15):
        self.batch_size = batch_size
        self.base_dir = base_dir
        self.shuffle = Shuffle
        self.Augment = Augment
        self.AugmentationPolicy = AugmentationPolicy
        self.brain_amount = brain_amount

        self.files = glob.glob(f'{base_dir}/*.png')
        if self.brain_amount is not None:
            self._filter_according_to_bamount()

        if self.shuffle:
            random.shuffle(self.files)
        print(f'Data generator on {base_dir}, found {len(self.files)} PNG files')


    def _filter_according_to_bamount(self):
        filtered = []
        for file in self.files:
            ta = int(file.split('.')[-2].replace('ta', ''))
            # ta is the number of pixels belonging to brain tissue in the original 256x256 slice
            percentage = ta/(256*256)*100
            if percentage > self.brain_amount:
                filtered.append(file)
        self.files = filtered

    def __len__(self):
        return len(self.files) // self.batch_size
        # return math.ceil(len(self.files) / self.batch_size)

    def __getitem__(self, index):
        Y = []
        for i in range(index * self.batch_size, index * self.batch_size + self.batch_size):
            image_path = self.files[i]
            im = imageio.imread(image_path)
            im = im.astype(np.float32)
            im = im / 255
            im = im.reshape(256, 256, 1)
            im = tf.image.resize(im, [128, 128])
            Y.append(im)

        # Convert Y to numpy before performing augmentation
        Y = np.array(Y)

        # Augmentation according to policy
        if self.Augment:
            if self.AugmentationPolicy == 'instance':
                X = []
                for i in range(0, len(Y)):
                    augmented_y = self._augment(np.array([Y[i]]))
                    X.append(augmented_y[0])
                X = np.array(X)
            else:
                # All the images in batch will have the same augmentations
                X = Y
                X = self._augment(X)

        return X, Y

    #     def on_epoch_end(self):
    #         self.indexes = np.arange(len(self.list_IDs))
    #         if self.shuffle == True:
    #             np.random.shuffle(self.indexes)

    def _augment(self, images):
        # Each filter is applied with a probability of 25%, except cutout which is applied half of the times
        if random.randint(1, 4) == 1:
            images = self.add_noise(images)
        if random.randint(1, 4) == 1:
            images = self.dropout(images)
        if random.randint(1, 4) == 1:
            images = self.gaussian_blur(images)
        if random.randint(1, 2) == 1:
            images = self.cutout(images)
        return images

    def add_noise(self, images):
        sdev = 0 + (random.random() * (0.05 - 0))
        images = layers.GaussianNoise(stddev=sdev)(images, training=True)
        return images

    def dropout(self, images):
        rnds_noise = tf.random.uniform((1, 2), minval=0, maxval=0.04)
        images = tf.nn.dropout(images, rnds_noise[0][0])
        return images

    # https://www.tensorflow.org/addons/api_docs/python/tfa/image/gaussian_filter2d
    def gaussian_blur(self, images):
        images = tfa.image.gaussian_filter2d(images,
                                             filter_shape=[4, 4],
                                             sigma=0.8,
                                             constant_values=0,
                                             )
        return images

    def cutout(self, images):
        w = tf.random.uniform((), minval=10, maxval=20, dtype=tf.dtypes.int32)
        h = tf.random.uniform((), minval=10, maxval=20, dtype=tf.dtypes.int32)
        x = tf.random.uniform((), minval=20, maxval=105, dtype=tf.dtypes.int32)
        y = tf.random.uniform((), minval=40, maxval=105, dtype=tf.dtypes.int32)

        if w % 2 != 0:
            w += 1 if bool(random.getrandbits(1)) else -1
        if h % 2 != 0:
            h += 1 if bool(random.getrandbits(1)) else -1

        # image = tfa.image.random_cutout(image, mask_size=(w,h), constant_values=0)
        images = tfa.image.cutout(images,
                                  mask_size=(w, h),
                                  offset=(x, y),
                                  constant_values=0
                                  )
        return images


Up to this point, the above code is not new, since it was already introduced in other notebooks. The actual code that gets everything working consists of getting an instance of Experiment() and passing the appropriate parameters to then running the process:

In [None]:
exp = Experiment(bath_size=64, epochs = 20, da_enabled = True, skull_stripped = True, es_patience = 5)
exp.model = resnet_model().get_model()
exp.start()