In [None]:
import keras.layers
import tensorflow as tf
import numpy as np
import os
import random
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from datetime import datetime

In [None]:
tfk = tf.keras
tfkl = tf.keras.layers
tfk.__version__

# Random seed for reproducibility

seed = 42

random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

In [None]:
#Mount drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Method for checkpoints save and callbacks 

def create_folders_and_callbacks(model_name):

    exps_dir = './Challenge_CheckPoints'
    if not os.path.exists(exps_dir):
        os.makedirs(exps_dir)

    now = datetime.now().strftime('%b%d_%H-%M-%S')

    exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
    if not os.path.exists(exp_dir):
        os.makedirs(exp_dir)

    callbacks = []

    # Model checkpoint

    ckpt_dir = os.path.join(exp_dir, 'ckpts')
    if not os.path.exists(ckpt_dir):
        os.makedirs(ckpt_dir)

    ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp'),
                                                       save_weights_only=False,      
                                                       save_best_only=True)         
    callbacks.append(ckpt_callback)

    # Visualize Learning on Tensorboard

    tb_dir = os.path.join(exp_dir, 'tb_logs')
    if not os.path.exists(tb_dir):
        os.makedirs(tb_dir)

    tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                                 profile_batch=0,
                                                 histogram_freq=1) 
    callbacks.append(tb_callback)

    # Early Stopping

    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    callbacks.append(es_callback)

    return callbacks


In [None]:
# Dataset folders
training_dir = "/content/training"

split_seed = 123  # Splitting seed needed to avoid overlap in training and validation set
batch_size = 32

# Custom function for gaussian noise 

def add_noise(img):
    # Add random noise to an image
    VARIABILITY = 25
    deviation = VARIABILITY*random.random()
    noise = np.random.normal(0, deviation, img.shape)
    img += noise
    np.clip(img, 0., 255.)
    return img

# Data augmentation for training set

train_data_gen = ImageDataGenerator(
    validation_split=0.1,
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.3,
    horizontal_flip=True,
    vertical_flip=True,
    brightness_range=(0.3, 1.7),
    rotation_range=180,
    height_shift_range=0.3,
    width_shift_range=0.3,
    fill_mode='reflect',
    channel_shift_range=0.5,
    preprocessing_function=add_noise,
    )

valid_data_gen = ImageDataGenerator(
    rescale=1./255,
    validation_split=0.1,
)

# Data load from directory and split in validation and training

training_set = train_data_gen.flow_from_directory(
    directory=training_dir,
    target_size=(256, 256),
    color_mode='rgb',
    classes=None,
    batch_size=batch_size,
    shuffle=True,
    subset='training',
    seed=seed)

validation_set = valid_data_gen.flow_from_directory(
    directory=training_dir,
    target_size=(256, 256),
    color_mode='rgb',
    classes=None,
    batch_size=batch_size,
    shuffle=True,
    subset='validation',
    seed=seed)

Found 15963 images belonging to 14 classes.
Found 1765 images belonging to 14 classes.


In [None]:
# Custom model made with only maxpoolings and convolutions, also add some regularization in order to acheive better performances
# There is a batch normalization after each convolution and before the relu
# There is a dropout layer after every maxpooling with 0.2

epochs = 150
input_shape = (256, 256, 3)

def build_model(input_shape):

    #input layer of the CNN
    input_layer = tfkl.Input(shape=input_shape, name='Input')


    # Convolutional part, increase the number of filter (32, 64, 64, 128, 128)
    conv1 = tfkl.Conv2D(
        filters=32,
        kernel_size=(3, 3),
        strides=(1, 1),
        padding='same',
        kernel_initializer='he_uniform'
    )(input_layer)

    bn1 = tfkl.BatchNormalization()(conv1)

    relu1 = tfkl.Activation('relu')(bn1)

    pool1 = tfkl.MaxPooling2D(
        pool_size=2,
        strides=3
    )(relu1)

    dp1 = tfkl.Dropout(0.2)(pool1)

    conv2 = tfkl.Conv2D(
        filters=64,
        kernel_size=(3, 3),
        strides=(1, 1),
        padding='same',
        kernel_initializer='he_uniform'
    )(dp1)

    bn2 = tfkl.BatchNormalization()(conv2)

    relu2 = tfkl.Activation('relu')(bn2)

    conv3 = tfkl.Conv2D(
        filters=64,
        kernel_size=(3, 3),
        strides=(1, 1),
        padding='same',
        activation='relu',
        kernel_initializer='he_uniform'
    )(relu2)

    bn3 = tfkl.BatchNormalization()(conv3)

    relu3 = tfkl.Activation('relu')(bn3)

    pool2 = tfkl.MaxPooling2D(
        pool_size=(2, 2),
        strides=2
    )(relu3)

    dp2 = tfkl.Dropout(0.2)(pool2)

    conv4 = tfkl.Conv2D(
        filters=128,
        kernel_size=(3, 3),
        strides=(1, 1),
        activation='relu',
        kernel_initializer='he_uniform'
    )(dp2)

    bn4 = tfkl.BatchNormalization()(conv4)

    relu4 = tfkl.Activation('relu')(bn4)

    conv5 = tfkl.Conv2D(
        filters=128,
        kernel_size=(3, 3),
        strides=(1, 1),
        padding='same',
        activation='relu',
        kernel_initializer='he_uniform'
    )(relu4)

    bn5 = tfkl.BatchNormalization()(conv5)

    relu5 = tfkl.Activation('relu')(bn5)

    pool3 = tfkl.MaxPooling2D(
        pool_size=(2, 2),
        strides=2
    )(relu5)

    dp3 = tfkl.Dropout(0.2)(pool3)


    # Global average pooling in order to prepare data to feed the FC part
    global_average = tfkl.GlobalAveragePooling2D()(dp3)


    # FC part 
    dense1 = tfkl.Dense(units=512, activation='relu', kernel_initializer='he_uniform')(global_average)

    bn6 = tfkl.BatchNormalization()(dense1)

    dp4 = tfkl.Dropout(0.5)(bn6)

    output_layer = tfkl.Dense(units=14, activation='softmax', kernel_initializer='he_uniform',
                              name='Output')(dp4)

    # Connect input and output through the Model class
    model = tfk.Model(inputs=input_layer, outputs=output_layer, name='model')

    # Compile the model
    model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(1e-3), metrics='accuracy')

    # Return the model
    return model


model = build_model(input_shape)
model.summary()

# Create folders and callbacks
locals_callbacks = create_folders_and_callbacks(model_name='myCustomModel')


# Train the model
history = model.fit(
    training_set,
    epochs=epochs,
    validation_data=validation_set,
    callbacks=locals_callbacks
).history

# Save best epoch model
model.save("ModelSave/myCustomModelSave")


In [None]:
# Transfer learning with Xception

epochs = 150
input_shape = (256, 256, 3)

# Import Xception pretrained application without the FC part

base_model = tf.keras.applications.Xception(
    include_top=False,
    weights="imagenet",
    input_tensor=None,
    input_shape=input_shape,
    pooling=None,
)

# Freeze every layer in the net
base_model.trainable = False

def build_model(input_shape):

    # Use the xception feature extraction part
    x = base_model.output
    
    #Build a custom FC part
    global_average = tfkl.GlobalAveragePooling2D()(x)
    
    dense1 = tfkl.Dense(units=1024, activation='relu', kernel_initializer='he_uniform')(global_average)

    bn6 = tfkl.BatchNormalization()(dense1)

    dp4 = tfkl.Dropout(0.5)(bn6)

    output_layer = tfkl.Dense(units=14, activation='softmax', kernel_initializer='he_uniform',
                              name='Output')(dp4)

    # Connect input and output through the Model class
    model = tfk.Model(inputs=base_model.input, outputs=output_layer, name='model')

    # Compile the model
    model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(1e-3), metrics='accuracy')

    # Return the model
    return model


model = build_model(input_shape)
model.summary() 
    
    
# Create folders and callbacks and fit
locals_callbacks = create_folders_and_callbacks(model_name='transferXception')


# Train the model
history = model.fit(
	training_set,
	epochs=epochs,
	validation_data=validation_set,
	callbacks=locals_callbacks
).history

# Fine tuning part

# Unfreez s number of layer in the Xception
for layer in model.layers[:15]:
    layer.trainable = False
for layer in model.layers[15:]:
    layer.trainable = True

# Recompile the model with very low learning rate
model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(1e-5), metrics='accuracy')

# Callbacks
locals_callbacks1 = create_folders_and_callbacks(model_name='transferXception1')

# Retrain the model
history = model.fit(
	training_set,
	epochs=epochs,
	validation_data=validation_set,
	callbacks=locals_callbacks1
).history

# Save model
model.save("ModelSave/transferXceptionSave")

In [None]:
#Fine tuning of EfficientNetB2. Among all the experiments on EfficientNet's fine tuning, this one was the most promising,
#returning a 92.2% test accuracy in the development phase. Other worse experiments are not included here.

#Import EfficientNetB2 architecture
from tensorflow.keras.applications import EfficientNetB2

#Model's hyperparameters
input_shape = (256,256,3)
epochs=100
learning_rate=0.001
classes_count = 14
batch_size=64
training_directory='/content/drive/MyDrive/ANN_Chall/training'

#Function passed to ImageDataGenerator to add random noise to training images
def add_noise(img):
    '''Add random noise to an image'''
    VARIABILITY = 25
    deviation = VARIABILITY*random.random()
    noise = np.random.normal(0, deviation, img.shape)
    img += noise
    np.clip(img, 0., 255.)
    return img

#Use ImageDataGenerator to perform data augmentation and split into training and validation set.
#The rescale attribute in ImageDataGenerator is willingly commented because EfficientNetB2 already has 
#a recaling layer.

split_seed =123
augmenter = tfk.preprocessing.image.ImageDataGenerator(
    validation_split=0.1, 
    featurewise_std_normalization=True,
    rotation_range=90,
    width_shift_range=0.6,
    height_shift_range=0.6,
    horizontal_flip=True,
    #rescale=1/255.,
    brightness_range=[0.4,0.8],
    zoom_range=[0.8,1.2],
    shear_range=0.3,
    preprocessing_function=add_noise
)

training_set = augmenter.flow_from_directory(
    directory=training_directory,
    class_mode="categorical",
    color_mode="rgb",
    subset="training",
    seed=split_seed,
    batch_size=batch_size,
    target_size=(256,256)
    )

validation_set = augmenter.flow_from_directory(
    directory=training_directory,
    class_mode="categorical",
    color_mode="rgb",
    subset="validation",
    seed=split_seed,
    batch_size=batch_size,
    target_size=(256,256)
    )

#Initiate EfficientNetB2, but exclude the top layers. We will later replace the top layers with
#a 1024 units Dense layer
effnet = EfficientNetB2(weights='imagenet',input_shape=input_shape, include_top=False)

#"Freeze" every layer of EfficientNetB2 except for the last 30
for layer in effnet.layers[:-30]:
    layer.trainable=False
for i, layer in enumerate(effnet.layers):
    print(i, layer.name, layer.trainable)

#Build the entire model by adding GlobalAveragePooling, a Dense layer with l2 regularization, a Dropout layer and an output layer 
pooling = tfkl.GlobalAveragePooling2D()(effnet.output)
dense = tfkl.Dense(units=1024, activation='relu', kernel_regularizer=tfk.regularizers.l2(0.001))(pooling)
drop = tfkl.Dropout(0.4)(dense) # Dropout layer to reduce overfitting
output = tfkl.Dense(classes_count, activation='softmax')(drop) # Softmax for multiclass
transfer_model = tfk.models.Model(effnet.input, outputs=output)

transfer_model.summary()

#Create two callbacks

from keras.callbacks import ReduceLROnPlateau

#This one redices the learning rate when the validation accuracy is at a plateau
lr_reduce = ReduceLROnPlateau(monitor='val_accuracy', factor=0.6, patience=8, verbose=1, mode='max', min_lr=5e-5)

#"Classical" early stop with restoring of best weights
early_stop = tfk.callbacks.EarlyStopping(monitor='val_loss', mode='min',patience=5, restore_best_weights=True)

#Compile the model. We used Adam as optimizer.
transfer_model.compile(optimizer=tfk.optimizers.Adam(learning_rate), loss='categorical_crossentropy', metrics=['accuracy'])

#Train for 100 epochs
transfer_model.fit(training_set, validation_data=validation_set , callbacks = [early_stop,lr_reduce], epochs=100, batch_size=batch_size)

tfk.models.save_model(transfer_model, './')
