# VGG16 USING GA

# Dataset download
In this section the dataset is downloaded from *Kaggle*, unzipped and well formatted.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import os
from tqdm import tqdm, tqdm_notebook
import random

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.applications import *
from tensorflow.keras.callbacks import *
from tensorflow.keras.initializers import *
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from numpy.random import seed
seed(42)

In [None]:
! pip install -q kaggle

from google.colab import files
_ = files.upload()

! mkdir -p ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json


In [None]:
! kaggle datasets download -d ikarus777/best-artworks-of-all-time
! unzip best-artworks-of-all-time.zip

[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m
  inflating: resized/resized/Henri_Rousseau_61.jpg  
  inflating: resized/resized/Henri_Rousseau_62.jpg  
  inflating: resized/resized/Henri_Rousseau_63.jpg  
  inflating: resized/resized/Henri_Rousseau_64.jpg  
  inflating: resized/resized/Henri_Rousseau_65.jpg  
  inflating: resized/resized/Henri_Rousseau_66.jpg  
  inflating: resized/resized/Henri_Rousseau_67.jpg  
  inflating: resized/resized/Henri_Rousseau_68.jpg  
  inflating: resized/resized/Henri_Rousseau_69.jpg  
  inflating: resized/resized/Henri_Rousseau_7.jpg  
  inflating: resized/resized/Henri_Rousseau_70.jpg  
  inflating: resized/resized/Henri_Rousseau_8.jpg  
  inflating: resized/resized/Henri_Rousseau_9.jpg  
  inflating: resized/resized/Henri_de_Toulouse-Lautrec_1.jpg  
  inflating: resized/resized/Henri_de_Toulouse-Lautrec_10.jpg  
  inflating: resized/resized/Henri_de_Toulouse-Lautrec_11.jpg  
  inflating: resized/resized/Henri_de_Toulouse-Lautrec_12.j

#Data preprocessing

In [None]:
artists = pd.read_csv('artists.csv')
# Sort artists by number of paintings
artists = artists.sort_values(by=['paintings'], ascending=False)

# Create a dataframe with artists having more than 200 paintings
artists_top = artists[artists['paintings'] >= 200].reset_index()
artists_top = artists_top[['name', 'paintings']]
artists_top['class_weight'] = artists_top.paintings.sum() / (artists_top.shape[0] * artists_top.paintings)
#artists_top = artists_top.loc[artists_top['paintings'] >200]
artists_top

Unnamed: 0,name,paintings,class_weight
0,Vincent van Gogh,877,0.445631
1,Edgar Degas,702,0.556721
2,Pablo Picasso,439,0.890246
3,Pierre-Auguste Renoir,336,1.163149
4,Albrecht Dürer,328,1.191519
5,Paul Gauguin,311,1.25665
6,Francisco Goya,291,1.343018
7,Rembrandt,262,1.491672
8,Alfred Sisley,259,1.508951
9,Titian,255,1.53262


In [None]:
# Set class weights - assign higher weights to underrepresented classes
class_weights = artists_top['class_weight'].to_dict()
class_weights

{0: 0.44563076604125634,
 1: 0.5567210567210568,
 2: 0.8902464278318493,
 3: 1.1631493506493507,
 4: 1.1915188470066518,
 5: 1.2566501023092662,
 6: 1.3430178069353327,
 7: 1.491672449687717,
 8: 1.5089505089505089,
 9: 1.532620320855615,
 10: 1.6352225180677062}

Next step is to solve a problem: the downloaded dataset present two directory containing the same paintings but they have different name. What we do in the next cells is to create a single directory called 'Albrecht_Durer' containing one copy of each painting, and then we delete all unuseful directories.

In [None]:
updated_name = "Albrecht_Dürer".replace("_", " ")
artists_top.iloc[4, 0] = updated_name
images_dir = 'images/images'
artists_dirs = os.listdir(images_dir)
artists_top_name = artists_top['name'].str.replace(' ', '_').values

In [None]:
! mv '/content/images/images/Albrecht_Du╠Иrer' /content/images/images/Albrecht_Dürer

In [None]:
! rm -R '/content/images/images/Albrecht_DuтХа├кrer' 
! rm -R '/content/resized'

#Data fetch

In [None]:
import pathlib
import os

IMAGE_DIR = '/content/images/'
TRAIN_DIR = pathlib.Path(os.path.join(IMAGE_DIR, 'train'))
TEST_DIR = pathlib.Path(os.path.join(IMAGE_DIR, 'test'))

IMAGE_HEIGHT = 256
IMAGE_WIDTH = 256
BATCH_SIZE = 128
RANDOM_SEED = 42
VALIDATION_SPLIT = 0.10

In [None]:
import os
import numpy as np
import shutil

rootdir= '/content/images/images' #path of the original folder
classes = os.listdir(rootdir)

for i, c in enumerate(classes, start=1):
  if c not in artists_top_name.tolist():
    shutil.rmtree(rootdir + '/' + c)
    continue
  if not os.path.exists(rootdir + '/train/' + c):
    os.makedirs(rootdir + '/train/' + c)
  if not os.path.exists(rootdir + '/test/' + c):  
    os.makedirs(rootdir + '/test/' + c)

  source = os.path.join(rootdir, c)
  allFileNames = os.listdir(source)

  np.random.shuffle(allFileNames)

  test_ratio = 0.10
  train_FileNames, test_FileNames = np.split(np.array(allFileNames),
                                                        [int(len(allFileNames)* (1 - test_ratio))])

  train_FileNames = [source+'/'+ name for name in train_FileNames.tolist()]
  test_FileNames = [source+'/' + name for name in test_FileNames.tolist()]

  for name in train_FileNames:
    shutil.copy(name, rootdir +'/train/' + c)

  for name in test_FileNames:
    shutil.copy(name, rootdir +'/test/' + c)

In [None]:
! mv /content/images/images/train /content/images
! mv /content/images/images/test /content/images
! rm -r /content/images/images

In [None]:
import tensorflow as tf

training_images = tf.keras.preprocessing.image_dataset_from_directory(
    TRAIN_DIR, labels='inferred', label_mode='categorical',
    class_names=None, color_mode='rgb', batch_size=BATCH_SIZE, image_size=(IMAGE_HEIGHT,
    IMAGE_WIDTH), shuffle=True, seed=RANDOM_SEED, validation_split=VALIDATION_SPLIT, subset='training',
    interpolation='bilinear', follow_links=False
)

val_images = tf.keras.preprocessing.image_dataset_from_directory(
    TRAIN_DIR, labels='inferred', label_mode='categorical',
    class_names=None, color_mode='rgb', batch_size=BATCH_SIZE, image_size=(IMAGE_HEIGHT,
    IMAGE_WIDTH), shuffle=True, seed=RANDOM_SEED, validation_split=VALIDATION_SPLIT, subset='validation',
    interpolation='bilinear', follow_links=False
)

test_images = tf.keras.preprocessing.image_dataset_from_directory(
    TEST_DIR, labels='inferred', label_mode='categorical',
    class_names=None, color_mode='rgb', batch_size=BATCH_SIZE, image_size=(IMAGE_HEIGHT,
    IMAGE_WIDTH), shuffle=True, seed=RANDOM_SEED, interpolation='bilinear', follow_links=False
)


Found 3864 files belonging to 11 classes.
Using 3478 files for training.
Found 3864 files belonging to 11 classes.
Using 386 files for validation.
Found 435 files belonging to 11 classes.


In [None]:
NUM_CLASSES = len(training_images.class_names)

Install deap

In [None]:
! pip install deap



# VGG16 Class

In [None]:
from tensorflow import keras as ks
from tensorflow.keras import layers
from tensorflow.keras.applications import VGG16
from tensorflow.keras import regularizers
import matplotlib.pyplot as plt
import numpy as np

Callback and data augmentation objects:

In [None]:
callbacks_list = [
    ks.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=5,
    ),
    ks.callbacks.ModelCheckpoint(
        filepath='checkpoint_vgg16.keras',
        monitor='val_loss',
        save_best_only=True,
    )
]
data_augmentation = ks.Sequential(
    [
        layers.RandomFlip('horizontal'),
        layers.RandomRotation(0.1),
        layers.RandomZoom(0.2),
        layers.RandomHeight(0.1),
        layers.RandomWidth(0.1)
    ]
)

In [None]:
class Vgg16:

    LOSS = 'categorical_crossentropy'

    def __init__(self, train=None, test=None, val=None, classes=11, epochs=10,
                 callbacks_list=None, data_augmentation=None, class_weights=None):
        # set datasets
        self.training_set = train
        self.test_set = test
        self.val_set = val

        # model
        self.model = None

        # utils
        self.history = None
        self.classes = classes
        self.epochs = epochs
        self.callbacks_list = callbacks_list
        self.data_augmentation= data_augmentation
        self.vgg16dict = {}
        self.class_weights=class_weights

    def vgg16_finetuned_dropout_reg(self, activation='relu', num_of_blocks=1):
        inputs = self.input()
        x = self.fine_tuning(num_of_blocks=num_of_blocks)(inputs)
        x = layers.Flatten(name='my_flatten')(x)
        x = layers.Dense(256,
                         activation=activation,
                         name='my_dense1',
                         kernel_regularizer=regularizers.l1_l2(0.002, 0.002))(x)
        x = layers.Dense(256,
                         activation=activation,
                         name='my_dense2',
                         kernel_regularizer=regularizers.l1_l2(0.002, 0.002))(x)
        x = layers.Dropout(0.5)(x)
        outputs = layers.Dense(self.classes, activation='softmax', name='predictions')(x)
        self.model = ks.Model(inputs=inputs, outputs=outputs)

    """
        NETWORK PIECE BY PIECE STRUCTURE BLOCK
    """

    def input(self):
        inputs = ks.Input(shape=(256, 256, 3))
        x = ks.applications.vgg16.preprocess_input(inputs)
        x = self.data_augmentation(x)
        x = layers.Rescaling(1. / 255)(x)
        return x

    def base_vgg16(self):
        res = VGG16(
            weights='imagenet',
            include_top=False,
            input_shape=((256, 256, 3))
        )
        res.trainable = False
        return res

    def fine_tuning(self, num_of_blocks=1):
        if num_of_blocks < 1:
            num_of_blocks = 1

        res = self.base_vgg16()
        res.trainable = True
        set_trainable = False

        if num_of_blocks == 1:
            block_name = 'block5_conv3'
        else:
            block_name = 'block5_conv2'

        for layer in res.layers:
            if layer.name == block_name:
                set_trainable = True
            if set_trainable:
                layer.trainable = True
            else:
                layer.trainable = False

        return res

    """
        UTILITIES
    """

    def plot_model(self, model_name):
        ks.utils.plot_model(self.model, model_name, show_shapes=True)

    def plot_accuracy(self):
        acc = self.history.history['accuracy']
        val_acc = self.history.history['val_accuracy']
        plt.plot(range(1, len(acc) + 1), acc, 'r', label='Training Accuracy')
        plt.plot(range(1, len(acc) + 1), val_acc, 'g', label='Validation Accuracy')
        plt.title('Training and Validation Accuracy')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.plot()

    def plot_loss(self):
        loss = self.history.history['loss']
        val_loss = self.history.history['val_loss']
        plt.plot(range(1, len(loss) + 1), loss, 'r', label='Training Loss')
        plt.plot(range(1, len(loss) + 1), val_loss, 'g', label='Validation Loss')
        plt.title('Training and Validation Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.plot()

    def summary(self):
        self.model.summary()

    """
        GA UTILITIES FUNCTIONS
    """
    def convertParams(self, params):
        activation = int(params[0])
        optimizer = int(params[1])

        if activation == 1:
            activation = 'elu'
        else:
            activation = 'relu'

        if optimizer == 0:
            optimizer = ks.optimizers.RMSprop(learning_rate=params[2])
        else:
            optimizer = ks.optimizers.Adam(learning_rate=params[2])

        return activation, optimizer

    def getValAccuracy(self, params):
        key = str(int(params[0])) + '_' + str(int(params[1])) + '_' + str(params[2])
        if key in self.vgg16dict:
            return self.vgg16dict[key]
        activation, optimizer = self.convertParams(params)
        self.compile_and_fit(activation=activation, optimizer=optimizer, ga=True)

        # save value for avoiding to compute it again
        val_accuracy = np.max(self.history.history['val_accuracy'])
        self.vgg16dict[key] = val_accuracy

        return val_accuracy


    """
        COMPILE AND FIT
    """
    def compile_and_fit(self, optimizer, activation=None,  ga=False):
        if ga:
            self.vgg16_finetuned_dropout_reg(activation=activation, num_of_blocks=2)

        self.model.compile(
            optimizer=optimizer,
            loss=self.LOSS,
            metrics=['accuracy']
        )

        if self.callbacks_list is None:
            self.history = self.model.fit(
                self.training_set,
                epochs=self.epochs,
                validation_data=self.val_set
            )
        else:
            self.history = self.model.fit(
                self.training_set,
                epochs=self.epochs,
                validation_data=self.val_set,
                callbacks=self.callbacks_list,
                class_weight=self.class_weights
            )

    def evaluate(self):
        test_loss, test_acc = self.model.evaluate(self.test_set)
        print(f"Test accuracy: {test_acc:.3f}")

## Define elitism function

In [None]:
from deap import tools
from deap import algorithms

def eaSimpleWithElitism(population, toolbox, cxpb, mutpb, ngen, stats=None,
             halloffame=None, verbose=__debug__):
    """This algorithm is similar to DEAP eaSimple() algorithm, with the modification that
    halloffame is used to implement an elitism mechanism. The individuals contained in the
    halloffame are directly injected into the next generation and are not subject to the
    genetic operators of selection, crossover and mutation.
    """
    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    if halloffame is None:
        raise ValueError("halloffame parameter must not be empty!")

    halloffame.update(population)
    hof_size = len(halloffame.items) if halloffame.items else 0

    record = stats.compile(population) if stats else {}
    logbook.record(gen=0, nevals=len(invalid_ind), **record)
    if verbose:
        print(logbook.stream)

    # Begin the generational process
    for gen in range(1, ngen + 1):

        # Select the next generation individuals
        offspring = toolbox.select(population, len(population) - hof_size)

        # Vary the pool of individuals
        offspring = algorithms.varAnd(offspring, toolbox, cxpb, mutpb)

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        # add the best back to population:
        offspring.extend(halloffame.items)

        # Update the hall of fame with the generated individuals
        halloffame.update(offspring)

        # Replace the current population by the offspring
        population[:] = offspring

        # Append the current generation statistics to the logbook
        record = stats.compile(population) if stats else {}
        logbook.record(gen=gen, nevals=len(invalid_ind), **record)
        if verbose:
            print(logbook.stream)

    return population, logbook


## GA workflow

imports:

In [None]:
from deap import creator
from deap import tools
from deap import base

import random
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# boundaries for VGG16 parameters
# activation_function: 0, 1 (relu, elu)
# optimizer: 0, 1 (rmsprop, adam)
# learning rate: 0.001..0.01
BOUNDS_LOW = [0, 0, 0.001]
BOUNDS_HIGH = [1.999, 1.999, 0.01]

NUM_OF_PARAMS = len(BOUNDS_HIGH)

In [None]:
# Genetic Algorithm constants:
POPULATION_SIZE = 5
P_CROSSOVER = 0.9       # probability for crossover
P_MUTATION= 0.5         # probability for mutating an individual
MAX_GENERATIONS = 5
HALL_OF_FAME = 1
CROWDING_FACTOR = 20.0 # crowding factor for crossover and mutation

In [None]:
# set the random seed:
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

In [None]:
# create the classifier accuracy test class:
test = Vgg16(train=training_images, val=val_images, test=test_images, 
              classes=NUM_CLASSES, callbacks_list=callbacks_list, 
              data_augmentation=data_augmentation, class_weights=class_weights,
              epochs=25)

GA class and functions:

In [None]:
toolbox = base.Toolbox()

# define a single objective, maximizing fitness strategy:
creator.create('FitnessMax', base.Fitness, weights=(1.0,))

# create the Individual class based on list:
creator.create('Individual', list, fitness=creator.FitnessMax)

# define the hyperparameter atributes individually:
for i in range(NUM_OF_PARAMS):
    # "hyperparameter_0", "hyperparameter_1", ...
    toolbox.register("hyperparameter_" + str(i),
                     random.uniform,
                     BOUNDS_LOW[i],
                     BOUNDS_HIGH[i])

# create a tuple containing an attribute generator for each param searched:
hyperparameters = ()
for i in range(NUM_OF_PARAMS):
    hyperparameters = hyperparameters + \
                      (toolbox.__getattribute__("hyperparameter_" + str(i)),)

# create the individual operator to fill up an Individual instance:
toolbox.register("individualCreator",
                 tools.initCycle,
                 creator.Individual,
                 hyperparameters,
                 n=1)

# create the population operator to generate a list of individuals:
toolbox.register('populationCreator', tools.initRepeat, list, toolbox.individualCreator)

# fitness calculation
def classificationAccuracy(individual):
    return test.getValAccuracy(individual), # return a tuple

toolbox.register('evaluate', classificationAccuracy)

# genetic operators:
toolbox.register('select', tools.selTournament, tournsize=2)
toolbox.register('mate',
                 tools.cxSimulatedBinaryBounded,
                 low=BOUNDS_LOW,
                 up=BOUNDS_HIGH,
                 eta=CROWDING_FACTOR)

toolbox.register('mutate',
                 tools.mutPolynomialBounded,
                 low=BOUNDS_LOW,
                 up=BOUNDS_HIGH,
                 eta=CROWDING_FACTOR,
                 indpb=1.0 / NUM_OF_PARAMS)



GA flow:

In [None]:
population = toolbox.populationCreator(n=POPULATION_SIZE)

stats = tools.Statistics(lambda ind: ind.fitness.value)
stats.register('max', np.max)
stats.register('avg', np.mean)

# define the hall-of-fame object:
hof = tools.HallOfFame(HALL_OF_FAME)

# perform the GA fow with hof feature added:
population, logbook = eaSimpleWithElitism(population,
                                          toolbox,
                                          cxpb=P_CROSSOVER,
                                          mutpb=P_MUTATION,
                                          ngen=MAX_GENERATIONS,
                                          stats=stats,
                                          halloffame=hof,
                                          verbose=True)

# print best solution found:
print("- Best solution is: ")
print("params = ", test.formatParams(hof.items[0]))
print("Accuracy = %1.5f" % hof.items[0].fitness.values[0])

# extract statistics:
maxFitnessValues, meanFitnessValues = logbook.select("max", "avg")

# plot statistics:
sns.set_style("whitegrid")
plt.plot(maxFitnessValues, color='red')
plt.plot(meanFitnessValues, color='green')
plt.xlabel('Generation')
plt.ylabel('Max / Average Fitness')
plt.title('Max and Average fitness over Generations')
plt.show()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
Epoch 1/25

KeyboardInterrupt: ignored