# Project of Advanced Machine Learning

## Import Libraries

In [None]:
# Install (or upgrade) the Keras Tuner package; the tuner class used later is imported from it.
pip install -U keras-tuner

In [None]:
import warnings
warnings.filterwarnings('ignore')

import os
import csv
import logging
import numpy as np
import pandas as pd
import scipy.io
import matplotlib.pyplot as plt
import tarfile
import PIL
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import tensorflow.keras.backend
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.applications.resnet_v2 import ResNet50V2
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Convolution2D, MaxPooling2D, Dropout, Flatten, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.regularizers import l1, l2
from tensorflow.keras.metrics import AUC
from kerastuner.tuners import RandomSearch

# Raise TensorFlow's logger threshold to ERROR so lower-severity records are not emitted.
tensorflow.get_logger().setLevel(logging.ERROR)

## Utility functions

In [None]:
def get_image_names(tgz):
    """Return the names of all regular-file members of a tar archive.

    Args:
        tgz: Path of the archive, handed unchanged to ``tarfile.open``.

    Returns:
        A list with the member names in archive order. Directories and any
        other non-file members are left out.
    """
    names = []
    with tarfile.open(tgz) as archive:
        for member in archive.getmembers():
            if member.isfile():
                names.append(member.name)
    return names

def show_random_images(n=4, size=10, label=None):
    """Display an ``n`` x ``n`` grid of randomly sampled dataset images.

    Reads the module-level ``df`` dataframe (columns ``id`` and ``label``)
    and opens every sampled image from ``DATA_PATH + 'images/'``.

    Args:
        n: Side of the grid; ``n * n`` rows are sampled.
        size: Width and height passed to ``plt.figure(figsize=...)``.
        label: When given, only rows whose ``label`` equals ``str(label)``
            are sampled. ``None`` samples from the whole dataframe.

    Raises:
        ValueError: Propagated from ``DataFrame.sample`` when fewer than
            ``n * n`` rows are available.
    """
    # A bare ``import PIL`` does not guarantee that the ``PIL.Image``
    # submodule is loaded, so import it explicitly where it is used.
    from PIL import Image

    if label is not None:
        images = df[df['label'] == str(label)].sample(n=(n*n))
    else:
        images = df.sample(n=(n*n))

    plt.figure(figsize=(size, size))
    pairs = zip(images['id'].values, images['label'].values)
    for position, (path, image_label) in enumerate(pairs, start=1):
        plt.subplot(n, n, position)
        plt.imshow(Image.open(DATA_PATH + 'images/' + path))
        plt.title('Label: ' + str(image_label))
        plt.axis('off')
    plt.show()

def show_random_images_aug(aug, n=2, size=10):
    """Display up to ``n * n`` images from the next batch of a generator.

    Args:
        aug: Batch iterator whose ``next`` item is an ``(images, labels)``
            pair. Labels are presumably one-hot rows, as produced with
            ``class_mode='categorical'`` — confirm if another mode is used.
        n: Side of the grid.
        size: Width and height passed to ``plt.figure(figsize=...)``.
    """
    # The builtin ``next`` works on every iterator, whereas the ``.next()``
    # method is not available on all Keras iterator versions.
    x, y = next(aug)

    # The position of the hot entry is the generator's internal class index,
    # which need not equal the dataframe label (string labels are sorted
    # lexicographically). Translate it back when the mapping is exposed.
    class_indices = getattr(aug, 'class_indices', None) or {}
    index_to_label = {index: name for name, index in class_indices.items()}

    plt.figure(figsize=(size, size))
    # Never index past the end of a batch shorter than the grid.
    for i in range(min(n*n, len(x))):
        class_index = int(np.argmax(y[i]))
        plt.subplot(n, n, i+1)
        plt.imshow(x[i])
        plt.title('Label: ' + str(index_to_label.get(class_index, class_index)))
        plt.axis('off')
    plt.show()
    
def plot_history(history):
    """Plot training versus validation curves for every tracked quantity.

    One figure is drawn per quantity, in this order: accuracy, precision,
    recall, F1-score and loss. All figures are shown together at the end.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch value lists. For each key used here the matching
            ``val_``-prefixed key must also be present.

    Raises:
        KeyError: If one of the expected keys is missing.
    """
    curves = (
        ('accuracy', 'Accuracy'),
        ('precision', 'Precision'),
        ('recall', 'Recall'),
        ('f1_score', 'F1-Score'),
        ('loss', 'Loss'),
    )
    records = history.history

    for key, caption in curves:
        plt.figure(figsize=(10,5))
        plt.plot(records[key], 'orange', label='Training ' + caption)
        plt.plot(records['val_' + key], 'royalblue', label='Validation ' + caption)
        plt.title('Training and Validation ' + caption)
        plt.xlabel('Epoch')
        plt.ylabel(caption)
        plt.legend()

    plt.show()

def true_positives(y_true, y_pred):
    """Sum the rounded, clipped element-wise product of targets and predictions.

    The product ``y_true * y_pred`` is clipped to ``[0, 1]``, rounded and
    summed over all elements with the Keras backend.
    """
    backend = tensorflow.keras.backend
    overlap = backend.clip(y_true * y_pred, 0, 1)
    return backend.sum(backend.round(overlap))

def precision(y_true, y_pred):
    """Return true positives divided by the number of predicted positives.

    Predicted positives are the sum of ``y_pred`` after clipping to
    ``[0, 1]`` and rounding. The backend epsilon is added to the
    denominator so that it is never exactly zero.
    """
    backend = tensorflow.keras.backend
    rounded_predictions = backend.round(backend.clip(y_pred, 0, 1))
    denominator = backend.sum(rounded_predictions) + backend.epsilon()

    return true_positives(y_true, y_pred) / denominator

def recall(y_true, y_pred):
    """Return true positives divided by the number of actual positives.

    Actual positives are the sum of ``y_true`` after clipping to ``[0, 1]``
    and rounding. The backend epsilon is added to the denominator so that
    it is never exactly zero.
    """
    backend = tensorflow.keras.backend
    rounded_targets = backend.round(backend.clip(y_true, 0, 1))
    denominator = backend.sum(rounded_targets) + backend.epsilon()

    return true_positives(y_true, y_pred) / denominator

def f1_score(y_true, y_pred):
    """Return the harmonic mean of ``precision`` and ``recall``.

    Computed as ``2 * p * r / (p + r + epsilon)``, where ``p`` and ``r`` come
    from the sibling ``precision`` and ``recall`` functions. The backend
    epsilon keeps the denominator away from zero.
    """
    # Evaluate each component once; the result is unchanged, but the
    # expression is shorter and the graph holds fewer duplicated ops.
    p = precision(y_true, y_pred)
    r = recall(y_true, y_pred)

    return 2*(p*r)/(p+r+tensorflow.keras.backend.epsilon())

## Configuration parameters

In [None]:
# Mount Google Drive at /content/drive; DATA_PATH defined further down points inside this mount.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Root folder for the inputs (images.tgz, labels.mat) and for everything the
# notebook writes (extracted images, results.csv, saved models).
DATA_PATH = '/content/drive/My Drive/data/' #  './data/'

plt.style.use('dark_background')

# Fractions of the WHOLE dataset reserved for the test and validation sets.
TEST_RATIO = 0.3
VALIDATION_RATIO = 0.2
NUM_CLASSES = 102
IMG_SIZE = 250
IMG_CHANNELS = 3
INPUT_SHAPE = (IMG_SIZE, IMG_SIZE, IMG_CHANNELS)

# Single seed reused for NumPy, the dataset split and the image generators.
SEED = 42
np.random.seed(SEED)

VERBOSE = 1
BATCH_SIZE = 32
PATIENCE = 5

# `monitor` must be a metric NAME (a string); the original passed a
# one-element list, which is not a valid key into the training logs.
# NOTE(review): `save_best_only` is left at its default, so `monitor`/`mode`
# presumably only take effect once it is enabled — confirm the intent.
CHECKPOINT = ModelCheckpoint((DATA_PATH + 'model.hdf5'), monitor='val_accuracy', verbose=VERBOSE, mode='max')
EARLYSTOP = EarlyStopping(monitor='val_accuracy', patience=PATIENCE, restore_best_weights=True)

## Base models

In [None]:
# Constructor arguments shared by every backbone: ImageNet weights, no
# classification head, and the project-wide input shape.
_backbone_options = dict(weights='imagenet', include_top=False, input_shape=INPUT_SHAPE)

# VGG16 backbone, frozen by default.
model_vgg = VGG16(**_backbone_options)
model_vgg.trainable = False
model_vgg.summary()

# Xception backbone, frozen by default.
model_xception = Xception(**_backbone_options)
model_xception.trainable = False
model_xception.summary()

# ResNet50V2 backbone, frozen by default.
model_resnet = ResNet50V2(**_backbone_options)
model_resnet.trainable = False
model_resnet.summary()

## Import data

In [None]:
ARCHIVE_PATH = DATA_PATH + 'images.tgz'

# One row per image: `id` is the member path inside the archive, `label` the class.
df = pd.DataFrame()
# Names are sorted so that row i lines up with entry i of the label vector
# (assumes labels.mat is ordered by image file name — TODO confirm).
df['id'] = sorted(get_image_names(ARCHIVE_PATH))
# Shift the stored labels down by one (presumably 1-based in the .mat file).
df['label'] = scipy.io.loadmat(DATA_PATH + 'labels.mat')['labels'][0] - 1
# Labels are kept as strings; the generators below read them with class_mode='categorical'.
df['label'] = df['label'].astype('str')

# Open the archive in a context manager so the file handle is always closed.
# NOTE(review): extractall() trusts the member paths inside the archive; only
# run it on archives from a trusted source.
with tarfile.open(ARCHIVE_PATH) as archive:
    archive.extractall(DATA_PATH + 'images/')

In [None]:
# Textual overview of the full dataset; every heading goes on its own line,
# directly followed by the value it introduces.
print('Size of dataset:', df.count(), sep='\n')

# First ten files found in the extracted image folder.
print('\nList of images:', os.listdir(DATA_PATH + 'images/jpg')[:10], sep='\n')

print('\nDataframe:', df.head(10), sep='\n')

print('\nNumber of classes:', df['label'].nunique(), sep='\n')

print('\nImages for each class:', df['label'].value_counts(), sep='\n')

# Finish with a grid of random images using the helper's defaults.
print('\nSample images:')
show_random_images()

## Dataset split

In [None]:
# Step 1: carve out the test set, stratified on the label.
train_X, test_X, train_y, test_y = train_test_split(
    df['id'],
    df['label'],
    stratify=df['label'],
    test_size=TEST_RATIO,
    random_state=SEED,
)

# Step 2: split the remainder into train and validation. VALIDATION_RATIO is
# expressed relative to the whole dataset, so it is rescaled by the share
# that is left after removing the test set.
train_X, validation_X, train_y, validation_y = train_test_split(
    train_X,
    train_y,
    stratify=train_y,
    test_size=(VALIDATION_RATIO/(1-TEST_RATIO)),
    random_state=SEED,
)

# Re-assemble (id, label) dataframes; rows are matched on the shared index.
train = train_X.to_frame().assign(label=train_y)

validation = validation_X.to_frame().assign(label=validation_y)

test = test_X.to_frame().assign(label=test_y)

In [None]:
# Print the same three summaries (shape, first rows, per-class counts) for each split.
_splits = (('train', train), ('validation', validation), ('test', test))

for _position, (_split_name, _split_frame) in enumerate(_splits):
    # Only the very first heading is printed without a leading blank line.
    _lead = '' if _position == 0 else '\n'

    print(_lead + 'Size of ' + _split_name + ' set:')
    print(_split_frame.shape)

    print('\nDataframe of the ' + _split_name + ' set:')
    print(_split_frame.head(10))

    print('\nImages for each class of the ' + _split_name + ' set:')
    print(_split_frame['label'].value_counts())

## Data preprocessing

In [None]:
# Horizontal bar chart of the number of images per class in the whole dataset.
df['label'].value_counts(sort=True).to_frame().plot(kind='barh', figsize=(10, 20), color='royalblue')

In [None]:
# Horizontal bar chart of the number of images per class in the train set.
train['label'].value_counts(sort=True).to_frame().plot(kind='barh', figsize=(10, 20), color='royalblue')

In [None]:
# Horizontal bar chart of the number of images per class in the validation set.
validation['label'].value_counts(sort=True).to_frame().plot(kind='barh', figsize=(10, 20), color='royalblue')

In [None]:
# Horizontal bar chart of the number of images per class in the test set.
test['label'].value_counts(sort=True).to_frame().plot(kind='barh', figsize=(10, 20), color='royalblue')

### Data augmentation

In [None]:
# Settings shared by the three dataframe-backed iterators: where the images
# live, which columns hold path and label, and how batches are formed.
# NOTE(review): validation and test batches are shuffled as well — confirm
# this is intended before relying on sample order (e.g. with predict()).
_flow_options = dict(
    directory=(DATA_PATH + 'images/'),
    x_col='id',
    y_col='label',
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=True,
    seed=SEED,
)

# Train images are rescaled to [0, 1] and randomly perturbed (rotation,
# shifts, brightness, shear, zoom, horizontal flip).
train_aug_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=45,
    width_shift_range=0.1,
    height_shift_range=0.1,
    brightness_range=[0.5, 1.5],
    shear_range=0.15,
    zoom_range=[0.75, 1.25],
    fill_mode="nearest",
    horizontal_flip=True,
)
train_aug = train_aug_datagen.flow_from_dataframe(dataframe=train, **_flow_options)

# Validation images are only rescaled.
validation_aug_datagen = ImageDataGenerator(rescale=1./255)
validation_aug = validation_aug_datagen.flow_from_dataframe(dataframe=validation, **_flow_options)

# Test images are only rescaled.
test_aug_datagen = ImageDataGenerator(rescale=1./255)
test_aug = test_aug_datagen.flow_from_dataframe(dataframe=test, **_flow_options)

In [None]:
# Preview one batch from each generator, in train / validation / test order.
for _split_name, _generator in (
    ('train', train_aug),
    ('validation', validation_aug),
    ('test', test_aug),
):
    print('Sample augmented images of the ' + _split_name + ' set:')
    show_random_images_aug(aug=_generator)

## Model building

In [None]:
# Reset the global state kept by the Keras backend before the models below are assembled.
tensorflow.keras.backend.clear_session()

### Model creation for manual tuning of hyperparameters

In [None]:
def create_model(BASE_MODEL, LR, DROPOUT_RATE, NUM_UNITS, ACTIVATION, REGULARIZER=None):
    """Build and compile a classifier on top of a convolutional backbone.

    The network is: backbone -> 2x2 max pooling -> flatten -> dense hidden
    layer -> dropout -> softmax over ``NUM_CLASSES``.

    Args:
        BASE_MODEL: Backbone model placed first in the stack. It is used by
            reference, NOT copied: it is switched to trainable here, and any
            weight updates made while training the returned model stay
            visible to other users of the same backbone object.
        LR: Learning rate of the Adam optimizer.
        DROPOUT_RATE: Rate of the dropout layer after the hidden dense layer.
        NUM_UNITS: Number of units of the hidden dense layer.
        ACTIVATION: Activation of the hidden dense layer.
        REGULARIZER: Optional kernel regularizer of the hidden dense layer.

    Returns:
        The compiled ``Sequential`` model, using categorical cross-entropy
        and tracking precision, recall, F1-score, AUC and accuracy.
    """
    # The whole backbone is fine-tuned together with the new head.
    BASE_MODEL.trainable = True

    model = Sequential()
    model.add(BASE_MODEL)
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(NUM_UNITS, activation=ACTIVATION, kernel_regularizer=REGULARIZER))
    model.add(Dropout(DROPOUT_RATE))
    model.add(Dense(NUM_CLASSES, activation='softmax'))

    model.compile(
        loss='categorical_crossentropy',
        # `learning_rate` is the supported argument name; `lr` is a
        # deprecated alias that newer Keras releases no longer accept.
        optimizer=Adam(learning_rate=LR),
        metrics=[precision, recall, f1_score, AUC(), 'accuracy']
    )

    return model

### Model creation for automatic tuning of hyperparameters

In [None]:
def build_model(hp):
    """Build and compile a ResNet50V2-based classifier for the tuner.

    Same layer stack as ``create_model``, with the hidden-layer width, its
    activation, the dropout rate and the learning rate drawn from ``hp``.
    The kernel regularizer is fixed to ``l2(3e-1)``.

    Args:
        hp: Hyperparameter container supplied by the tuner; queried with
            ``Int``, ``Choice`` and ``Float``.

    Returns:
        The compiled ``Sequential`` model.
    """
    # The module-level ResNet50V2 backbone is used by reference (not copied),
    # so every trial shares — and keeps updating — the same backbone object.
    model_resnet.trainable = True

    num_units = hp.Int('NUM_UNITS', 128, 1024, step=128, default=256)
    activation = hp.Choice('ACTIVATION', ['elu', 'relu'])
    dropout_rate = hp.Float('DROPOUT_RATE', 0.1, 0.7, step=0.1, default=0.5)

    model = Sequential()
    model.add(model_resnet)
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(num_units, activation=activation, kernel_regularizer=l2(3e-1)))
    model.add(Dropout(dropout_rate))
    model.add(Dense(NUM_CLASSES, activation='softmax'))

    model.compile(
        loss='categorical_crossentropy',
        # `learning_rate` is the supported argument name; `lr` is a
        # deprecated alias that newer Keras releases no longer accept.
        optimizer=Adam(learning_rate=hp.Choice('LR', [1e-5, 2e-5, 5e-5, 5e-6])),
        metrics=[precision, recall, f1_score, AUC(), 'accuracy']
    )

    return model

## Model training

### Naive approach

In [None]:
import itertools

# Search grid: every key holds the list of values to try. The trailing
# comments keep the wider value sets explored earlier.
PARAMS = {
    'BASE_MODEL': [{'MODEL': model_resnet, 'NAME': 'RESNET50V2'}], # [{'MODEL': model_vgg, 'NAME': 'VGG16'}, {'MODEL':  model_xception, 'NAME': 'XCEPTION'}, {'MODEL': model_resnet, 'NAME': 'RESNET50V2'}],
    'LR': [2e-5], # [1e-3, 1e-4],
    'EPOCHS': [50],
    'DROPOUT_RATE': [0.45], # [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
    'NUM_UNITS': [256], # [128, 256, 512],
    'ACTIVATION': ['elu'], # ['elu', 'relu'],
    'REGULARIZER': [{'REGULARIZER': l2(3e-1), 'NAME': 'L2(0.3)'}] # [{'REGULARIZER': l1(5e-3), 'NAME': 'L1(0.005)'}, {'REGULARIZER': l2(1e-2), 'NAME': 'L2(0.01)'}, {'REGULARIZER': l1(5e-4), 'NAME': 'L1(0.0005)'}, {'REGULARIZER': l2(1e-3), 'NAME': 'L2(0.001)'}]
}

RESULTS_CSV = DATA_PATH + 'results.csv'

# Start a fresh results file containing only the header row.
# newline='' lets the csv module control line endings itself; without it
# extra blank rows appear on platforms that translate '\n'.
with open(RESULTS_CSV, mode='w', newline='') as results:
    results_writer = csv.writer(results, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    results_writer.writerow(['EPOCHS', 'BATCH SIZE', 'BASE MODEL', 'DROPOUT RATE', 'LR', 'NUM UNITS', 'ACTIVATION', 'REGULARIZER', 'LOSS', 'ACCURACY', 'PRECISION', 'RECALL', 'F1-SCORE', 'VALIDATION LOSS', 'VALIDATION ACCURACY', 'VALIDATION PRECISION', 'VALIDATION RECALL', 'VALIDATION F1-SCORE'])

# itertools.product visits the combinations in the same order as the
# equivalent nest of seven `for` loops (last argument varies fastest).
for EPOCHS, BASE_MODEL, DROPOUT_RATE, REGULARIZER, LR, NUM_UNITS, ACTIVATION in itertools.product(
    PARAMS['EPOCHS'],
    PARAMS['BASE_MODEL'],
    PARAMS['DROPOUT_RATE'],
    PARAMS['REGULARIZER'],
    PARAMS['LR'],
    PARAMS['NUM_UNITS'],
    PARAMS['ACTIVATION'],
):
    model = create_model(BASE_MODEL['MODEL'], LR, DROPOUT_RATE, NUM_UNITS, ACTIVATION, REGULARIZER['REGULARIZER'])
    model.summary()

    history = model.fit(
        train_aug,
        validation_data=validation_aug,
        epochs=EPOCHS,
        callbacks=[CHECKPOINT, EARLYSTOP],
        verbose=VERBOSE
    )

    # Value of every tracked quantity at the LAST epoch that ran.
    # NOTE(review): EARLYSTOP restores the best weights, so these numbers may
    # not describe the weights that end up being saved — confirm.
    last = {key: values[-1] for key, values in history.history.items()}

    # In the Sequential stack built by create_model, index 3 is the hidden
    # dense layer and index 5 the output dense layer.
    hidden_kernel, hidden_bias = model.layers[3].get_weights()[:2]
    output_kernel, output_bias = model.layers[5].get_weights()[:2]

    print('[INFO] Model:\n\tBase model: ', BASE_MODEL['NAME'], '\n\tEpochs: ', EPOCHS, '\n\tLearning rate: ', LR, '\n\tBatch size: ', BATCH_SIZE, '\n\tNum units: ', NUM_UNITS, '\n\tActivation: ', ACTIVATION, '\n\tDropout rate: ', DROPOUT_RATE, '\n\tRegularizer: ', REGULARIZER['NAME'])
    print('[WEIGHTS] Model:\n\tSum of the weights of first dense layer: ', np.abs(hidden_kernel).sum(), '\n\tSum of the biases of first dense layer: ', np.abs(hidden_bias).sum(), '\n\tSum of the weights of second dense layer: ', np.abs(output_kernel).sum(), '\n\tSum of the biases of second dense layer: ', np.abs(output_bias).sum())
    print('[TRAINING] Model:\n\tTrain loss: ', last['loss'], '\n\tTrain accuracy: ', last['accuracy'], '\n\tTrain precision: ', last['precision'], '\n\tTrain recall: ', last['recall'], '\n\tTrain f1-score: ', last['f1_score'])
    print('[VALIDATING] Model:\n\tValidation loss: ', last['val_loss'], '\n\tValidation accuracy: ', last['val_accuracy'], '\n\tValidation precision: ', last['val_precision'], '\n\tValidation recall: ', last['val_recall'], '\n\tValidation f1-score: ', last['val_f1_score'])
    plot_history(history)

    # Open the results file only for the moment of writing, so it is not
    # held open during the whole training run.
    with open(RESULTS_CSV, mode='a', newline='') as results:
        results_writer = csv.writer(results, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        results_writer.writerow([EPOCHS, BATCH_SIZE, BASE_MODEL['NAME'], DROPOUT_RATE, LR, NUM_UNITS, ACTIVATION, REGULARIZER['NAME'], last['loss'], last['accuracy'], last['precision'], last['recall'], last['f1_score'], last['val_loss'], last['val_accuracy'], last['val_precision'], last['val_recall'], last['val_f1_score']])

    model.save(DATA_PATH + 'model_' + str(EPOCHS) + '_' + str(BATCH_SIZE) + '_' + BASE_MODEL['NAME'] + '_' + str(DROPOUT_RATE) + '_' + str(LR) + '_' + str(NUM_UNITS) + '_' + ACTIVATION + '_' + REGULARIZER['NAME'] + '.hdf5')

    tensorflow.keras.backend.clear_session()

### Structured approach

In [None]:
EPOCHS = 10
MAX_TRIALS = 32

# Random search over the space declared in build_model, keeping the trial
# with the highest validation accuracy. Trial state is stored under
# DATA_PATH/FlowersRecognition.
tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=MAX_TRIALS,
    directory=DATA_PATH,
    project_name='FlowersRecognition'
)

# The search arguments are forwarded to model.fit(). `batch_size` is
# deliberately NOT passed: train_aug already yields batches of BATCH_SIZE,
# and fit() must not be given a batch size for generator inputs.
tuner.search(
    train_aug,
    validation_data=validation_aug,
    epochs=EPOCHS,
    callbacks=[EarlyStopping(monitor='val_accuracy', patience=PATIENCE, restore_best_weights=True)],
    verbose=VERBOSE
)

tuner.results_summary()

In [None]:
# Persist the top-ranked model of the search, then load it back from disk.
_best_model_path = DATA_PATH + 'model.hdf5'
# The hand-written metrics must be supplied by name for deserialization.
_custom_metrics = {'precision': precision, 'recall': recall, 'f1_score': f1_score}

model = tuner.get_best_models()[0]
model.save(_best_model_path)

model = tensorflow.keras.models.load_model(_best_model_path, custom_objects=_custom_metrics)
model.summary()

## Model testing

In [None]:
# evaluate() returns the loss followed by the metrics in compile order:
# precision, recall, f1_score, AUC, accuracy.
score = model.evaluate(test_aug, verbose=VERBOSE)

# Pair each caption with its position in `score`, in the order it is reported.
_report = ['[TESTING] Model:\n\tTest loss: ', score[0]]
for _caption, _position in (
    ('accuracy', 5),
    ('precision', 1),
    ('recall', 2),
    ('f1-score', 3),
    ('auc', 4),
):
    _report.extend(['\n\tTest ' + _caption + ': ', score[_position]])

print(*_report)