# Fine-tuning an Inception V4 CNN

In [None]:
import os
import sys
import glob
import argparse
from matplotlib import pyplot as plt
import multiprocessing
import time

from keras import __version__
from inception_v4 import InceptionV4, preprocess_input
from keras.models import Model
from keras.layers import Dense, GlobalAveragePooling2D
from keras.preprocessing.image import ImageDataGenerator
from keras.optimizers import SGD
from keras.callbacks import EarlyStopping, ModelCheckpoint
from types import SimpleNamespace

IM_WIDTH, IM_HEIGHT = 299, 299 # fixed size for Inception V4
EPOCHS = 200
BATCH_SIZE = 16
FC_SIZE = 1024
TRAIN_DIR = "C:\\AID\\AID-split\\train"
TEST_DIR = "C:\\AID\\AID-split\\test"
WEIGHTS_PATH = str(os.path.join('output_weights', 'weights_checkpoint.h5'))

In [None]:
def get_callbacks(weights_path=WEIGHTS_PATH, patience=40, monitor='val_acc', min_delta=0):
    early_stopping = EarlyStopping(verbose=1, patience=patience, monitor=monitor, min_delta=min_delta)
    model_checkpoint = ModelCheckpoint(weights_path,
                                       save_best_only=True,
                                       save_weights_only=True,
                                       monitor=monitor)
    return [early_stopping, model_checkpoint]


def get_nb_files(directory):
    """Get number of files by searching directory recursively"""
    if not os.path.exists(directory):
        return 0
    cnt = 0
    for r, dirs, files in os.walk(directory):
        for dr in dirs:
            cnt += len(glob.glob(os.path.join(r, dr + "/*")))
    return cnt


def setup_to_transfer_learn(model, base_model):
    """Freeze all layers and compile the model"""
    for layer in base_model.layers:
        layer.trainable = False
    model.compile(optimizer = 'rmsprop', loss = 'categorical_crossentropy', metrics = ['accuracy'])


def add_new_last_layer(base_model, nb_classes):
    """Add last layer to the convnet"""
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(FC_SIZE, activation = 'relu')(x) # new FC layer, random init
    predictions = Dense(nb_classes, activation = 'softmax')(x) # new softmax layer
    model = Model(inputs = base_model.input, outputs = predictions)
    return model


def setup_to_finetune(model):
    """Unfreeze all layers"""
    for layer in model.layers:
         layer.trainable = True
    model.compile(optimizer = SGD(lr = 0.0001, momentum = 0.9), loss = 'categorical_crossentropy', metrics = ['accuracy'])


def plot_training(history):
    acc = history.history['acc']
    val_acc = history.history['val_acc']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    epochs = range(len(acc))
    
    if not os.path.exists('output_graphs'):
        os.makedirs('output_graphs')

    plt.plot(epochs, acc, 'r.')
    plt.plot(epochs, val_acc, 'r')
    plt.title('Training and validation accuracy')
    
    plt.savefig(os.path.join('output_graphs', 'training_plot1_' + str(time.time()).split('.')[0] + '.svg'))

    plt.figure()
    plt.plot(epochs, loss, 'r.')
    plt.plot(epochs, val_loss, 'r-')
    plt.title('Training and validation loss')
    plt.tight_layout()
    plt.savefig(os.path.join('output_graphs', 'training_plot2_' + str(time.time()).split('.')[0] + '.svg'))
    plt.show()
    

### Use transfer learning and fine-tuning to train a network on a new dataset

In [None]:
# Train and test directories must exist
assert(os.path.exists(TRAIN_DIR) and os.path.exists(TEST_DIR))

nb_train_samples = get_nb_files(TRAIN_DIR)
nb_classes = len(glob.glob(TRAIN_DIR + "/*"))
nb_val_samples = get_nb_files(TEST_DIR)

if not os.path.exists('output_weights'):
    os.makedirs('output_weights')

# data prep
train_datagen = ImageDataGenerator(
    preprocessing_function = preprocess_input,
    rotation_range = 45,
    width_shift_range = 0.2,
    height_shift_range = 0.2,
    shear_range = 0.2,
    zoom_range = 0.2,
    horizontal_flip = True
)
test_datagen = ImageDataGenerator(
    preprocessing_function = preprocess_input
)

train_generator = train_datagen.flow_from_directory(
    TRAIN_DIR,
    target_size = (IM_WIDTH, IM_HEIGHT),
    batch_size = BATCH_SIZE,
)

validation_generator = test_datagen.flow_from_directory(
    TEST_DIR,
    target_size = (IM_WIDTH, IM_HEIGHT),
    batch_size = BATCH_SIZE,
)

# setup model
base_model = InceptionV4(weights = 'imagenet', include_top = False)
model = add_new_last_layer(base_model, nb_classes)

# transfer learning
setup_to_transfer_learn(model, base_model)

history_tl = model.fit_generator(
    train_generator,
    epochs = EPOCHS,
    steps_per_epoch = nb_train_samples / BATCH_SIZE,
    validation_data = validation_generator,
    validation_steps = nb_val_samples / BATCH_SIZE,
    class_weight = 'auto',
    callbacks = get_callbacks(monitor='val_loss', min_delta=0.075)
)

# Load the best weights from the previous stage
model.load_weights(WEIGHTS_PATH)

# fine-tuning
setup_to_finetune(model)

history_ft = model.fit_generator(
    train_generator,
    epochs = EPOCHS,
    steps_per_epoch = nb_train_samples / BATCH_SIZE,
    validation_data = validation_generator,
    validation_steps = nb_val_samples / BATCH_SIZE,
    class_weight = 'auto',
    callbacks = get_callbacks())

model.save_weights(os.path.join('output_weights', 'weights_' + str(time.time()).split('.')[0] + '.h5'))

if args.plot:
    plot_training(history_ft)