## Connect Colab to Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Check file structure

In [None]:
print('> File Structure in Drive root:')
!ls drive/MyDrive
print('> File Structure in "notebooks":')
!ls drive/MyDrive/notebooks/
print('> File Structure in "pickles":')
!ls drive/MyDrive/training_data/
print('> Current Working Directory:')
!pwd

> File Structure in Drive root:
ls: cannot access 'drive/MyDrive': No such file or directory
> File Structure in "notebooks":
ls: cannot access 'drive/MyDrive/notebooks/': No such file or directory
> File Structure in "pickles":
ls: cannot access 'drive/MyDrive/training_data/': No such file or directory
> Current Working Directory:
/content
> Libs before:
Package                       Version       
----------------------------- --------------
absl-py                       0.10.0        
alabaster                     0.7.12        
albumentations                0.1.12        
altair                        4.1.0         
appdirs                       1.4.4         
argon2-cffi                   20.1.0        
asgiref                       3.3.1         
astor                         0.8.1         
astropy                       4.2           
astunparse                    1.6.3         
async-generator               1.10          
atari-py                      0.2.6         
atomicwrites

## Install Necessary Dependencies

In [None]:
!pip install numpy==1.18.5
!pip install matplotlib==3.3.3
!pip install scipy==1.5.4
!pip install tensorflow==2.3.1

## Check GPU and CUDA availability

In [None]:
from tensorflow.config import list_physical_devices
list_physical_devices()

[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'),
 PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [None]:
from tensorflow.test import is_built_with_cuda
is_built_with_cuda()

True

## Imports

In [None]:
import json
import matplotlib.pyplot as plt
import os
import numpy as np
import tensorflow as tf
import tensorflow.keras.applications as apps
from tensorflow.keras import layers, losses, metrics, models, optimizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator

## Constants

In [None]:
DRIVE_ROOT_PATH = '/content/drive/MyDrive'
FIGURES_PATH = os.path.join(DRIVE_ROOT_PATH, 'figures')
TEXTS_PATH = os.path.join(DRIVE_ROOT_PATH, 'texts')
RESOURCES_PATH = os.path.join(DRIVE_ROOT_PATH, 'resources')
TRAINING_DATA_PATH = os.path.join(DRIVE_ROOT_PATH, 'training_data')
TRAIN_SET_PATH = os.path.join(TRAINING_DATA_PATH, 'train')
TEST_SET_PATH = os.path.join(TRAINING_DATA_PATH, 'test')
VALIDATION_SET_PATH = os.path.join(TRAINING_DATA_PATH, 'validation')

SUBSAMPLE_ARRAY_NAME = 'subsample.npy'
TOP10_BRANDS_COUNTS_NAME = 'top_10_brands_samples_counts.txt'

RANDOM_STATE = 64
RESIZE_HEIGHT = 128
RESIZE_WIDTH = 128
BATCH_SIZE = 32

## Utility Functions

In [None]:
def read_dictionary(file_name):
    ''' Reads the dictionary specified in JSON format from the specified file '''
    with open(os.path.join(RESOURCES_PATH, file_name), 'r') as f:
        dictionary = json.load(f)
    
    return dictionary

In [None]:
def write_dictionary(dictionary, file_name):
    ''' Writes a dictionary to the specified file, in indented JSON format '''
    if os.path.isdir(TEXTS_PATH) is False:
        os.mkdir(TEXTS_PATH)
    with open(os.path.join(TEXTS_PATH, file_name), 'w') as f:
        f.write(json.dumps(dictionary, indent=4))

In [None]:
def load_numpy_array(file_name):
    ''' Reads the NumPy array from the specified file '''
    with open(os.path.join(RESOURCES_PATH, file_name), 'rb') as f:
        array = np.load(f, fix_imports=False)

    return array

In [None]:
def load_pretrained_network(network_name):
    ''' Loads the specified pretrained network from Keras applications, with
    frozen weights '''
    image_shape = (RESIZE_HEIGHT, RESIZE_WIDTH, 3)
    base_model = getattr(apps, network_name)(include_top=False, weights='imagenet', input_shape=image_shape)
    base_model.trainable = False

    return base_model

In [None]:
def load_input_preprocessing_function(module_name):
    ''' Loads the input preprocessing function for the specified pretrained
    network from Keras applications '''
    network_module = getattr(apps, module_name)
    preprocess_input_function = getattr(network_module, 'preprocess_input')

    return preprocess_input_function

## Creating Keras data generators and iterators

In [None]:
# Load the necessary data into memory
X_sample = load_numpy_array(SUBSAMPLE_ARRAY_NAME)
samples_counts = read_dictionary(TOP10_BRANDS_COUNTS_NAME)

In [None]:
# Defining and fitting the data generator
# The augmentation is the same for all data sets, so a single generator is used
data_generator = ImageDataGenerator(
    featurewise_center=True,
    featurewise_std_normalization=True
)

data_generator.fit(X_sample)
del X_sample # This is no longer needed, delete to free space

In [None]:
# Defining the data iterators
train_iterator = data_generator.flow_from_directory(
    directory=TRAIN_SET_PATH,
    target_size=(RESIZE_HEIGHT, RESIZE_WIDTH), # Size of MobileNet inputs is (224, 224)
    color_mode='rgb',
    classes=list(samples_counts.keys()),
    class_mode='categorical',
    batch_size=BATCH_SIZE,
    shuffle=False,
    # seed=RANDOM_STATE,
    interpolation='bilinear'
)

validation_iterator = data_generator.flow_from_directory(
    directory=VALIDATION_SET_PATH,
    target_size=(RESIZE_HEIGHT, RESIZE_WIDTH),
    color_mode='rgb',
    classes=list(samples_counts.keys()),
    class_mode='categorical',
    batch_size=BATCH_SIZE,
    shuffle=False,
    # seed=RANDOM_STATE,
    interpolation='bilinear'
)

test_iterator = data_generator.flow_from_directory(
    directory=TEST_SET_PATH,
    target_size=(RESIZE_HEIGHT, RESIZE_WIDTH), # Size of MobileNet inputs is (224, 224)
    color_mode='rgb',
    classes=list(samples_counts.keys()),
    class_mode='categorical',
    batch_size=BATCH_SIZE,
    shuffle=False,
    # seed=RANDOM_STATE,
    interpolation='bilinear'
)

Found 154550 images belonging to 10 classes.
Found 40572 images belonging to 10 classes.


## Training the model (Simple version)

In [None]:
# Define test layers
preprocess_input = apps.vgg16.preprocess_input
base_model = load_pretrained_network('VGG16')
flatten_layer = layers.Flatten(name='flatten')
specialisation_layer = layers.Dense(1024, activation='relu', name='specialisation_layer')
avg_pooling_layer = layers.GlobalAveragePooling2D(name='avg_pooling_layer')
max_pooling_layer = layers.GlobalMaxPooling2D(name='max_pooling_layer')
dropout_layer = layers.Dropout(0.5, name='dropout_layer')
classification_layer = layers.Dense(10, activation='softmax', name='classification_layer')

# Define test model
inputs = tf.keras.Input(shape=(RESIZE_HEIGHT, RESIZE_WIDTH, 3))
x = preprocess_input(inputs)
x = base_model(x, training=False)
x = avg_pooling_layer(x)
# x = flatten_layer(x)
# x = specialisation_layer(x)
x = dropout_layer(x)
outputs = classification_layer(x)
model = tf.keras.Model(inputs, outputs)

model.summary()

# Define train parameters
steps_per_epoch = len(train_iterator)
validation_steps = len(validation_iterator)
base_learning_rate = 0.001
optimizer = optimizers.Adam(learning_rate=base_learning_rate)
loss_function = losses.CategoricalCrossentropy()
train_metrics = [metrics.Accuracy(), metrics.AUC(), metrics.Precision(), metrics.Recall()]

# Compile the model
model.compile(optimizer=optimizer,
              loss=loss_function,
              metrics=train_metrics)

# Train the model
training_history = model.fit(train_iterator, epochs=20, verbose=1,
                             validation_data=validation_iterator,
                             callbacks=[],
                             steps_per_epoch=steps_per_epoch,
                             validation_steps=validation_steps)
history = training_history.history

# Evaluate the model
final_results = model.evaluate(test_iterator,
                               return_dict=True)
write_dictionary(final_results, 'VGG16 Training Metrics.txt')

# Plot training and validation accuracy and loss
if os.path.isdir(FIGURES_PATH) is False:
    os.mkdir(FIGURES_PATH)

training_accuracy = history['accuracy']
validation_accuracy = history['val_accuracy']
training_loss = history['loss']
validation_loss = history['val_loss']

plt.figure(figsize=(18, 10))
plt.subplot(2, 1, 1)
plt.plot(training_accuracy, label='Training Accuracy')
plt.plot(validation_accuracy, label='Validation Accuracy')
plt.legend()
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()), 1])
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(training_loss, label='Training Loss')
plt.plot(validation_loss, label='Validation Loss')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('Categorical Cross Entropy')
plt.ylim([0, max(plt.ylim())])
plt.title('Training and Validation Loss')

figure_path = os.path.join(FIGURES_PATH, 'VGG16 Training Results.png')
plt.savefig(figure_path, quality=100)
plt.close()

## Training the model (Complex version)

In [None]:
# Define the models and train parameters to be tested
model_names = {
    'efficientnet': ['EfficientNetB0', 'EfficientNetB3', 'EfficientNetB7'],
    'mobilenet': ['MobileNet'],
    'mobilenet_v2': ['MobileNetV2'],
    'nasnet': ['NASNetMobile'],
    'resnet50': ['ResNet50'],
    'resnet_v2': ['ResNet50V2'],
    'vgg16': ['VGG16']
}

steps_per_epoch = len(train_iterator)
validation_steps = len(validation_iterator)
base_learning_rate = 0.001
loss_function = losses.CategoricalCrossentropy()
train_metrics = [metrics.Accuracy(), metrics.AUC(), metrics.Precision(), metrics.Recall()]
train_optimizers = [optimizers.Adam(learning_rate=base_learning_rate),
                    optimizers.RMSprop(learning_rate=base_learning_rate),
                    optimizers.Adadelta(learning_rate=base_learning_rate)]

In [None]:
# Define test layers
flatten_layer = layers.Flatten(name='flatten')
specialisation_layer = layers.Dense(1024, activation='relu', name='specialisation_layer')
avg_pooling_layer = layers.GlobalAveragePooling2D(name='avg_pooling_layer')
max_pooling_layer = layers.GlobalMaxPooling2D(name='max_pooling_layer')
dropout_layer = layers.Dropout(0.5, name='dropout_layer')
classification_layer = layers.Dense(10, activation='softmax', name='classification_layer')

In [None]:
# Perform training for each model and optimizer
inputs = tf.keras.Input(shape=(RESIZE_HEIGHT, RESIZE_WIDTH, 3))

for module in model_names.keys():
    for network in model_names[module]:
        base_model = load_pretrained_network(network)
        preprocess_input = load_input_preprocessing_function(module)

        # Build the model
        x = preprocess_input(inputs)
        x = base_model(x, training=False)
        x = avg_pooling_layer(x)
        x = dropout_layer(x)
        outputs = classification_layer(x)
        model = tf.keras.Model(inputs, outputs)

        model.summary()

        for optimizer in train_optimizers:
            # Compile the model
            model.compile(optimizer=optimizer,
                          loss=loss_function,
                          metrics=train_metrics)
            
            # Fit the model
            training_history = model.fit(train_iterator, epochs=20, verbose=1,
                                        validation_data=validation_iterator,
                                        callbacks=[],
                                        steps_per_epoch=steps_per_epoch,
                                        validation_steps=validation_steps)
            history = training_history.history

            # Evaluate the model
            final_results = model.evaluate(test_iterator,
                                           return_dict=True)
            optimizer_name = type(optimizer).__name__
            dictionary_name = module + '_' + network + '_' + optimizer_name + '_results.txt'
            write_dictionary(final_results, dictionary_name)

            # Plot training and validation accuracy and loss
            training_accuracy = history['accuracy']
            validation_accuracy = history['val_accuracy']
            training_loss = history['loss']
            validation_loss = history['val_loss']

            plt.figure(figsize=(18, 10))
            plt.subplot(2, 1, 1)
            plt.plot(training_accuracy, label='Training Accuracy')
            plt.plot(validation_accuracy, label='Validation Accuracy')
            plt.legend()
            plt.ylabel('Accuracy')
            plt.ylim([min(plt.ylim()), 1])
            plt.title('Training and Validation Accuracy')

            plt.subplot(2, 1, 2)
            plt.plot(training_loss, label='Training Loss')
            plt.plot(validation_loss, label='Validation Loss')
            plt.legend()
            plt.xlabel('Epoch')
            plt.ylabel('Categorical Cross Entropy')
            plt.ylim([0, max(plt.ylim())])
            plt.title('Training and Validation Loss')

            figure_name = module + '_' + network + '_' + optimizer_name + '_results.png'
            figure_path = os.path.join(FIGURES_PATH, figure_name)
            plt.savefig(figure_path, quality=100)
            plt.close()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=4830.0), HTML(value='')))

KeyboardInterrupt: ignored