## Set up the env

In [None]:
# Seed environment: fix every source of randomness used by the notebook.
seed_value = 1  # seed value

# Set `PYTHONHASHSEED` environment variable at a fixed value.
# NOTE: the running interpreter read this variable at start-up, so the new
# value only applies to processes launched from here on.
import os
os.environ['PYTHONHASHSEED'] = str(seed_value)

# Set the `python` built-in pseudo-random generator at a fixed value.
# `random.seed` is a function: it has to be called, not overwritten.
import random
random.seed(seed_value)

# Set the `numpy` pseudo-random generator at a fixed value.
import numpy as np
np.random.seed(seed_value)

# Set the `tensorflow` pseudo-random generator at a fixed value.
import tensorflow as tf
tf.random.set_seed(seed_value)

## Set data configurations and get the classes

In [None]:
# set configs
base_path = './natural_images'
target_size = (224, 224, 3)  # define shape for all images

# get classes: every sub-directory of `base_path` is one class.
# Sorted so the order matches the alphanumeric class indices that
# `flow_from_directory` assigns; the reports and plots below rely on that.
classes = sorted(
    entry for entry in os.listdir(base_path)
    if os.path.isdir(os.path.join(base_path, entry))
)
print(classes)

## Plot sample images 

In [None]:


import cv2
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.metrics import confusion_matrix
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, Input, MaxPool2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD

# Show one randomly chosen image per class on a 2x4 grid.
f, axes = plt.subplots(2, 4, sharex=True, sharey=True, figsize=(16, 7))

for ax, label in zip(axes.ravel(), classes):
    class_dir = os.path.join(base_path, label)
    img_path = os.path.join(class_dir, np.random.choice(os.listdir(class_dir)))

    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable file by returning None, not by raising
        raise ValueError(f'Could not read image file: {img_path}')

    img = cv2.resize(img, target_size[:2])
    # imread yields a 3-channel BGR image, so convert with BGR2RGB
    # (BGRA2RGB expects a 4-channel input)
    ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    ax.set_title(label)
    ax.axis('off')

## Load the images

In [None]:
from keras.preprocessing.image import ImageDataGenerator

# Number of images per batch produced by the directory iterators.
batch_size = 32

# Pixel rescaling, random augmentation settings and the share of images
# (25%) that is reserved for the 'validation' subset.
augmentation_options = dict(
    rescale=1./255,
    rotation_range=20,
    shear_range=0.2,
    zoom_range=0.2,
    width_shift_range=0.2,
    height_shift_range=0.2,
    vertical_flip=True,
    validation_split=0.25,
)

datagen = ImageDataGenerator(**augmentation_options)

In [None]:
# Arguments shared by the training and the validation iterator.
flow_options = dict(target_size=target_size[:2],
                    batch_size=batch_size,
                    class_mode='categorical')

# Training images: read through the augmenting generator.
train_gen = datagen.flow_from_directory(base_path,
                                        subset='training',
                                        **flow_options)

# Validation images: only rescaled, never augmented, so validation metrics are
# measured on the real images. `validation_split` must stay equal to the value
# used by `datagen` so that both generators split the files identically.
val_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.25)

# shuffle=False keeps the batches in file order, which lets predictions be
# compared position-by-position with the true labels later on.
val_gen = val_datagen.flow_from_directory(base_path,
                                          subset='validation',
                                          shuffle=False,
                                          **flow_options)

## Build and train a custom model

In [None]:
# Build model
# (named `custom_input` so the builtin `input` is not shadowed)
custom_input = Input(shape=target_size)

# Four conv/pool stages; the number of filters doubles at every stage.
x = custom_input
for n_filters in (32, 64, 128, 256):
    x = Conv2D(filters=n_filters, kernel_size=(3, 3), activation='relu')(x)
    x = MaxPool2D(2, 2)(x)

# Classification head.
x = Dropout(0.25)(x)
x = Flatten()(x)
x = Dense(units=128, activation='relu')(x)
x = Dense(units=64, activation='relu')(x)
output = Dense(units=8, activation='softmax')(x)  # one unit per class

custom_model = Model(custom_input, output, name='Custom_Model')

In [None]:
# compile model
custom_model.compile(
    optimizer='rmsprop',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# initialize callbacks (all of them watch the validation loss)

# multiply the learning rate by 0.2 after 3 epochs without improvement
reduceLR = ReduceLROnPlateau(
    monitor='val_loss', mode='min',
    factor=0.2, patience=3, min_lr=1e-6,
    verbose=1,
)

# stop after 5 epochs without improvement and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss', mode='min',
    patience=5, restore_best_weights=True,
    verbose=1,
)

# write a checkpoint whenever the validation loss improves
checkpoint = ModelCheckpoint(
    'CustomModel.weights.hdf5',
    monitor='val_loss', mode='min',
    save_best_only=True,
    verbose=1,
)

callbacks = [reduceLR, early_stopping, checkpoint]

In [None]:
# define training config
# Number of full batches per epoch, derived from the generators instead of
# hard-coded sample counts so it stays correct if the dataset changes.
TRAIN_STEPS = train_gen.samples // batch_size
VAL_STEPS = val_gen.samples // batch_size
epochs = 80  # upper bound; early stopping usually ends training sooner

# train model
custom_model.fit(train_gen,
                 steps_per_epoch=TRAIN_STEPS,
                 validation_data=val_gen,
                 validation_steps=VAL_STEPS,
                 epochs=epochs,
                 callbacks=callbacks)

## Evaluate the model

In [None]:
# get the model predictions (index of the most probable class per image);
# the classification report and confusion matrix cells read `predicted_labels`
predicted_labels = np.argmax(custom_model.predict(val_gen), axis=1)

# Evaluate the model
custom_model.evaluate(val_gen)

In [None]:
# get validation labels
# `val_gen.classes` holds the integer class index of every validation image in
# file order; `val_gen` is created with shuffle=False, so this is also the
# order in which predictions are produced. Reading it avoids re-decoding every
# image and avoids indexing past the last batch.
val_labels = np.asarray(val_gen.classes)

In [None]:
# show classification report (per-class metrics, rows labelled with `classes`)

from sklearn.metrics import classification_report

report = classification_report(
    val_labels,
    predicted_labels,
    target_names=classes,
)
print(report)


In [None]:
# function to plot confusion matrix

import itertools

def plot_confusion_matrix(actual, predicted, class_names=None):
    """Plot a row-normalised confusion matrix.

    Args:
        actual: true integer class labels.
        predicted: predicted integer class labels, same length as `actual`.
        class_names: names used for the axis ticks, ordered by class index.
            Defaults to the notebook-level `classes` list.

    Each cell shows the fraction of samples of the true class (row) that was
    assigned to the predicted class (column).
    """
    if class_names is None:
        class_names = classes
    label_ids = np.arange(len(class_names))

    # Passing `labels` keeps the matrix aligned with the tick labels even when
    # a class never occurs in `actual` or `predicted`.
    cm = confusion_matrix(actual, predicted, labels=label_ids).astype('float')

    # Normalise per true class; rows without samples stay 0 instead of NaN.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm = np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals > 0)

    plt.figure(figsize=(7, 7))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion matrix', fontsize=25)

    plt.xticks(label_ids, class_names, rotation=90, fontsize=15)
    plt.yticks(label_ids, class_names, fontsize=15)

    # White text on dark cells, black text on light cells.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], '.2f'),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black",
                 fontsize=14)

    plt.ylabel('True label', fontsize=20)
    plt.xlabel('Predicted label', fontsize=20)
    plt.show()

In [None]:
# plot confusion matrix of the true vs. predicted validation labels
plot_confusion_matrix(actual=val_labels, predicted=predicted_labels)

## Initializing and fine-tuning the VGG16 model.

In [None]:
# Import the VGG16 pretrained model
from tensorflow.keras.applications import VGG16

# initialize the model (ImageNet weights, without the original classifier head)
vgg16 = VGG16(input_shape=(224, 224, 3), weights='imagenet', include_top=False)

# Freeze all but the last 3 layers
for layer in vgg16.layers[:-3]:
    layer.trainable = False

# build model
# the new head starts from the last output of vgg16
# (named `vgg16_features` so the builtin `input` is not shadowed)
vgg16_features = vgg16.layers[-1].output

x = Dropout(0.25)(vgg16_features)
x = Flatten()(x)
output = Dense(8, activation='softmax')(x)

# create the model
vgg16_model = Model(vgg16.input, output, name='VGG16_Model')

In [None]:
# compile the model
vgg16_model.compile(
    optimizer=SGD(learning_rate=1e-3),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# reinitialize callbacks: only the checkpoint is new, it writes to its own
# file; the LR scheduler and early stopping objects are reused
checkpoint = ModelCheckpoint(
    'VggModel.weights.hdf5',
    monitor='val_loss', mode='min',
    save_best_only=True,
    verbose=1,
)

callbacks = [reduceLR, early_stopping, checkpoint]

# Train model
vgg16_model.fit(
    train_gen,
    steps_per_epoch=TRAIN_STEPS,
    validation_data=val_gen,
    validation_steps=VAL_STEPS,
    epochs=epochs,
    callbacks=callbacks,
)

## Evaluate VGG16

In [None]:
# Evaluate the model
vgg16_model.evaluate(val_gen)

# get the model predictions: index of the highest score per image
vgg16_scores = vgg16_model.predict(val_gen)
predicted_labels = np.argmax(vgg16_scores, axis=1)

# show classification report
vgg16_report = classification_report(val_labels, predicted_labels,
                                     target_names=classes)
print(vgg16_report)

# plot confusion matrix
plot_confusion_matrix(actual=val_labels, predicted=predicted_labels)

## Using a pretrained MobileNet model

In [None]:
# Import the MobileNet pretrained model
from tensorflow.keras.applications import MobileNet

# initializing the mobilenet model (ImageNet weights, without the classifier head)
mobilenet = MobileNet(input_shape=(224, 224, 3), weights='imagenet', include_top=False)

# freezing all but the last 5 layers
for layer in mobilenet.layers[:-5]:
    layer.trainable = False

# add a few more layers on top of the last MobileNet output
x = mobilenet.layers[-1].output
x = Dropout(0.5)(x)
x = Flatten()(x)
x = Dense(32, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(16, activation='relu')(x)
output = Dense(8, activation='softmax')(x)

# Create the model
mobilenet_model = Model(mobilenet.input, output, name="Mobilenet_Model")

In [None]:
# compile the model
mobilenet_model.compile(
    optimizer=SGD(learning_rate=1e-3),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# reinitialize callbacks: only the checkpoint is new, it writes to its own
# file; the LR scheduler and early stopping objects are reused
checkpoint = ModelCheckpoint(
    'MobilenetModel.weights.hdf5',
    monitor='val_loss', mode='min',
    save_best_only=True,
    verbose=1,
)

callbacks = [reduceLR, early_stopping, checkpoint]

# model training
mobilenet_model.fit(
    train_gen,
    steps_per_epoch=TRAIN_STEPS,
    validation_data=val_gen,
    validation_steps=VAL_STEPS,
    epochs=epochs,
    callbacks=callbacks,
)

In [None]:
# Evaluate the model
mobilenet_model.evaluate(val_gen)

# get the model's predictions: index of the highest score per image
mobilenet_scores = mobilenet_model.predict(val_gen)
predicted_labels = np.argmax(mobilenet_scores, axis=1)

# show the classification report
mobilenet_report = classification_report(val_labels, predicted_labels,
                                         target_names=classes)
print(mobilenet_report)

# plot the confusion matrix
plot_confusion_matrix(actual=val_labels, predicted=predicted_labels)

## Concatenation Ensemble

In [None]:
# concatenate the models

# import concatenate layer
from tensorflow.keras.layers import Concatenate

# get list of models (reused by the other ensemble cells)
models = [custom_model, vgg16_model, mobilenet_model]

# input layer shared by all member models
# (named `ensemble_input` so the builtin `input` is not shadowed)
ensemble_input = Input(shape=(224, 224, 3), name='input')

# get the output of each model for the shared input
outputs = [model(ensemble_input) for model in models]

# concatenate the outputs
x = Concatenate()(outputs)

# add further layers
x = Dropout(0.5)(x)
output = Dense(8, activation='softmax', name='output')(x)  # output layer

# create concatenated model
conc_model = Model(ensemble_input, output, name='Concatenated_Model')

In [None]:
# show model structure: render the layer graph of the concatenation ensemble
from tensorflow.keras.utils import plot_model
plot_model(model=conc_model)

## Average Ensemble

In [None]:
# average ensemble model

# import Average layer
from tensorflow.keras.layers import Average

# input layer shared by all member models
# (named `ensemble_input` so the builtin `input` is not shadowed)
ensemble_input = Input(shape=(224, 224, 3), name='input')

# get the output of each model for the shared input
outputs = [model(ensemble_input) for model in models]

# take the element-wise average of the outputs
x = Average()(outputs)

x = Dense(16, activation='relu')(x)
x = Dropout(0.3)(x)
output = Dense(8, activation='softmax', name='output')(x)  # output layer

# create average ensembled model (named like the other ensembles)
avg_model = Model(ensemble_input, output, name='Average_Model')

In [None]:
# show model structure: render the layer graph of the average ensemble
plot_model(model=avg_model)

## Weighted Average Ensemble

In [None]:
# function for setting weights

import numpy as np

def weight_init(shape=(1, 1, 3), weights=(1, 2, 3), dtype=None):
    """Return a constant tensor holding `weights` reshaped to `shape`.

    Args:
        shape: shape of the returned tensor; it must hold exactly as many
            elements as `weights`.
        weights: initial values, one per ensemble member.
        dtype: dtype of the returned tensor; `None` means `tf.float32`.

    Returns:
        A `tf.constant` with the requested shape and dtype.
    """
    # Imported locally and resolved at call time so that defining this function
    # does not depend on the order in which the notebook cells were run.
    import tensorflow as tf

    if dtype is None:
        dtype = tf.float32
    return tf.constant(np.array(weights).reshape(shape), dtype=dtype)

In [None]:
# implement custom weighted average layer

import tensorflow as tf
from tensorflow.keras.layers import Layer, Concatenate

class WeightedAverage(Layer):
    """Combine a list of same-shaped tensors with learnable weights.

    One trainable scalar is kept per input. The scalars are passed through a
    softmax, so the effective weights are positive and sum to 1, and the layer
    returns the weighted sum of the inputs, i.e. their weighted average.
    """

    def __init__(self, **kwargs):
        # Forward the standard layer arguments (e.g. `name`) to the base class.
        super(WeightedAverage, self).__init__(**kwargs)

    def build(self, input_shape):
        # The layer is called on a list of tensors, so `input_shape` has one
        # entry per input and its length is the number of weights needed.
        self.W = self.add_weight(
                    name='ensemble_weights',
                    shape=(1, 1, len(input_shape)),
                    initializer=weight_init,
                    dtype=tf.float32,
                    trainable=True)
        super(WeightedAverage, self).build(input_shape)

    def call(self, inputs):
        # Stack the inputs along a new last axis so that the weights, stored
        # along that same axis, broadcast against them.
        stacked = tf.stack(inputs, axis=-1)
        weights = tf.nn.softmax(self.W, axis=-1)

        # The softmax weights already sum to 1, so the weighted average is the
        # plain sum; taking the mean would shrink it by the number of inputs.
        return tf.reduce_sum(weights * stacked, axis=-1)
    

In [None]:
# input layer shared by all member models
# (named `ensemble_input` so the builtin `input` is not shadowed)
ensemble_input = Input(shape=(224, 224, 3), name='input')

# get the output of each model for the shared input
outputs = [model(ensemble_input) for model in models]

# get weighted average of outputs
x = WeightedAverage()(outputs)

output = Dense(8, activation='softmax')(x)  # output layer

weighted_avg_model = Model(ensemble_input, output, name='Weighted_AVerage_Model')

In [None]:
# plot model: render the layer graph of the weighted-average ensemble
plot_model(model=weighted_avg_model)