## Importing Required Components

In [None]:
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
import keras.layers as layers
from tensorflow import keras
from keras.models import Sequential, Model
from keras.layers import Dense, Input, GlobalAveragePooling2D, Flatten, Dropout
from keras.applications.resnet import ResNet50, ResNet152
from keras.applications.vgg16 import VGG16
from keras.applications.vgg19 import VGG19
from keras.applications.densenet import DenseNet121
from keras.applications.efficientnet_v2 import EfficientNetV2S
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.optimizers import adam_v2
from keras.utils import np_utils

## Some Preparations

In [None]:
%matplotlib inline # Jupyter Magic Function for displaying plotting command under the Jupyter cell
np.random.seed(2017) # Setting seed for numpy random generator in order make the model more reproducible

## Preparing Data

In [None]:
# This function prepares train, validation and evaluation data.
# Evaluation data is part of the train set that is not passed to model.fit() so that two models
# can be tested with different evaluation data which they have not seen during training at all.

def get_data(random_state=30, eval_set=True):
    """Load the image data and split it into train / validation / (optional) evaluation sets.

    The raw JPEGs and the label CSV are converted to ``.npy`` arrays under ``data/`` the first
    time they are needed; later calls reuse those cached arrays instead of decoding every image
    again. Delete the ``.npy`` files to force a rebuild after the raw data changes.

    Reads the module-level ``num_classes`` for one-hot encoding, so it must be defined before
    this function is called.

    Args:
        random_state: Seed passed to both ``train_test_split`` calls.
        eval_set: When true, a further 15% of the training split is held out as an
            evaluation set.

    Returns:
        ``(x_train, x_val, x_eval, y_train, y_val, y_eval)``. Labels are one-hot encoded.
        ``x_eval`` and ``y_eval`` are ``None`` when ``eval_set`` is false.
    """
    import os  # local import: only needed for the cache-existence check below

    # Reading Train and Test Images along with Train Labels and Saving them to Disk as Numpy
    # Arrays for future use
    def prepare_data(train_len=50000,
                     test_len=10000,
                     x_train_dir="data/train",
                     x_test_dir="data/test",
                     y_trains="data/train_labels.csv"):
        tf_image = tf.keras.preprocessing.image

        def load_images(directory, count):
            # Images are expected to be named 0.jpg .. (count - 1).jpg inside `directory`.
            return np.array([
                tf_image.img_to_array(tf_image.load_img(f"{directory}/{i}.jpg"))
                for i in range(count)
            ])

        np.save("data/x_test.npy", load_images(x_test_dir, test_len))
        np.save("data/x_train.npy", load_images(x_train_dir, train_len))

        # Store labels as a column vector. reshape() does not mutate the array pandas hands
        # back, unlike the in-place ndarray.resize().
        labels = pd.read_csv(y_trains)["label"].to_numpy().reshape(-1, 1)
        np.save("data/y_train.npy", labels)

    # Only pay the cost of decoding 60,000 images when a cached array is missing.
    cache_files = ("data/x_test.npy", "data/x_train.npy", "data/y_train.npy")
    if not all(os.path.exists(path) for path in cache_files):
        prepare_data()

    x_train_orig = np.load("data/x_train.npy").astype('float32')
    y_train_orig = np.load("data/y_train.npy")

    x_eval, y_eval = None, None

    # 80/20 train/validation split.
    x_train, x_val, y_train, y_val = train_test_split(
        x_train_orig, y_train_orig, test_size=0.2, random_state=random_state)
    if eval_set:
        # Hold out 15% of the remaining training data; it is never passed to model.fit().
        x_train, x_eval, y_train, y_eval = train_test_split(
            x_train, y_train, test_size=0.15, random_state=random_state)
        y_eval = np_utils.to_categorical(y_eval, num_classes)

    # convert y_train and y_val to one hot encoding (y_eval is handled above)
    y_train = np_utils.to_categorical(y_train, num_classes)
    y_val = np_utils.to_categorical(y_val, num_classes)

    return x_train, x_val, x_eval, y_train, y_val, y_eval

## Getting Train, Validation, and Evaluation Datasets

In [None]:
# Number of target classes; get_data() reads this global for one-hot encoding.
num_classes = 10

# eval_set=False: no evaluation split is held out, so x_eval and y_eval come back as None.
(x_train, x_val, x_eval,
 y_train, y_val, y_eval) = get_data(eval_set=False)

# Quick sanity check of the resulting split sizes.
print('Train Data Shape:', x_train.shape)
print(len(x_train), 'Train Samples')
print(len(x_val), 'Validation Samples')

## Configuring Preprocessing Layers For The Model

In [None]:
# Normalization with fixed statistics (mean 0.5, variance 0.25). It runs after the augmentation
# stack below, whose last step rescales the inputs by 1/255.
normalizer_layer = layers.Normalization(mean=0.5, variance=0.25)

# Data augmentation stack, applied in the order the layers are added. Resizing and Rescaling are
# deterministic; the Random* layers are Keras' random augmentation layers.
# The values used here may change for different models.
# NOTE(review): fill_value=1 is applied before Rescaling(1./255), i.e. on the unscaled pixel
# range. Confirm this is the intended fill colour.
augmentation_layers = Sequential()
augmentation_layers.add(layers.Resizing(36, 36))
augmentation_layers.add(layers.RandomFlip("horizontal"))
augmentation_layers.add(layers.RandomCrop(32, 32))
augmentation_layers.add(
    layers.RandomZoom(height_factor=(-0.2, 0.2), fill_mode="constant", fill_value=1))
augmentation_layers.add(layers.RandomRotation(0.015, fill_mode="constant", fill_value=1))
augmentation_layers.add(layers.RandomContrast(0.3))
augmentation_layers.add(layers.RandomTranslation(0.4, 0.4))
augmentation_layers.add(layers.Rescaling(1./255))

# Single preprocessing block for the model: augmentation first, then normalization.
preprocessing_layers = Sequential()
preprocessing_layers.add(augmentation_layers)
preprocessing_layers.add(normalizer_layer)

## Defining A Class For Managing and Configuring Keras Applications

In [None]:
# This class configures and returns the desired Keras Application, without loading any pretrained
# weights, based on the name of the application.
class ModelsRepo:
    """Factory for randomly initialised Keras application backbones plus their feature heads.

    Every builder takes the tensor produced by the previous layer and returns the tensor that
    the final classification layer should be attached to.

    All builders read the module-level ``input_shape``, which must be defined before any of
    them is called.
    """

    @staticmethod
    def _backbone(application, prev_layer):
        """Instantiate `application` without weights or top layers and apply it to `prev_layer`."""
        return application(weights=None, input_shape=input_shape, include_top=False)(prev_layer)

    @staticmethod
    def _pooled(application, prev_layer):
        """Backbone followed by global average pooling."""
        return GlobalAveragePooling2D()(ModelsRepo._backbone(application, prev_layer))

    @staticmethod
    def _vgg_head(application, prev_layer):
        """Backbone followed by Flatten and two 4096-unit ReLU dense layers."""
        flat = Flatten()(ModelsRepo._backbone(application, prev_layer))
        fc1 = Dense(4096, activation="relu")(flat)
        return Dense(4096, activation="relu")(fc1)

    @staticmethod
    def getResnet50(prev_layer):
        """ResNet50 backbone with global average pooling."""
        return ModelsRepo._pooled(ResNet50, prev_layer)

    @staticmethod
    def getResnet152(prev_layer):
        """ResNet152 backbone with global average pooling."""
        return ModelsRepo._pooled(ResNet152, prev_layer)

    @staticmethod
    def getDenseNet121(prev_layer):
        """DenseNet121 backbone with global average pooling."""
        return ModelsRepo._pooled(DenseNet121, prev_layer)

    @staticmethod
    def getVGG16(prev_layer):
        """VGG16 backbone with two fully connected layers on top."""
        return ModelsRepo._vgg_head(VGG16, prev_layer)

    @staticmethod
    def getVGG19(prev_layer):
        """VGG19 backbone with two fully connected layers on top."""
        return ModelsRepo._vgg_head(VGG19, prev_layer)

    @staticmethod
    def getEfficientNetV2S(prev_layer):
        """EfficientNetV2S backbone with global average pooling and 20% dropout."""
        return Dropout(0.2)(ModelsRepo._pooled(EfficientNetV2S, prev_layer))

    @staticmethod
    def getModel(model_name, prev_layer):
        """Build the application selected by `model_name` on top of `prev_layer`.

        Args:
            model_name: One of "resnet50", "resnet152", "densenet121", "vgg16", "vgg19",
                "efficientnetv2s".
            prev_layer: Tensor to feed into the backbone.

        Returns:
            The output tensor of the selected builder.

        Raises:
            ValueError: If `model_name` is not a supported name. Previously this returned
                None, which only failed later when the output layer was attached.
        """
        builders = {
            "resnet50": ModelsRepo.getResnet50,
            "resnet152": ModelsRepo.getResnet152,
            "densenet121": ModelsRepo.getDenseNet121,
            "vgg16": ModelsRepo.getVGG16,
            "vgg19": ModelsRepo.getVGG19,
            "efficientnetv2s": ModelsRepo.getEfficientNetV2S,
        }
        if not isinstance(model_name, str) or model_name not in builders:
            raise ValueError(
                f"Unknown model name {model_name!r}; expected one of {sorted(builders)}"
            )
        return builders[model_name](prev_layer)



## Configuring Model's Name and Directory For Saving and Loading The Model

In [None]:
# Names accepted by ModelsRepo.getModel().
model_names = [
    "resnet50",
    "resnet152",
    "densenet121",
    "vgg16",
    "vgg19",
    "efficientnetv2s",
]

# Application to build and train in this run.
model_name = "densenet121"

# Base file address (without extension) used for saving and loading h5 models.
fileaddr = f"{model_name}"

## Configuring Callback Functions For The Model

In [None]:
# Early stopping guards against over-fitting: training halts once the monitored quantity
# (EarlyStopping's default, since none is given here) has gone 10 epochs without decreasing.
es = EarlyStopping(patience=10, mode="min")

# Checkpoint that keeps on disk only the model with the lowest val_loss seen during training.
modelCheckpoint = ModelCheckpoint(
    filepath=f"{fileaddr}_best.h5",
    monitor="val_loss",
    save_best_only=True,
    mode="min",
)

## Constructing The Model

In [None]:
# Per-sample shape of the training data. ModelsRepo's builders also read this global.
input_shape = x_train.shape[1:]

# Wire the network: input -> preprocessing -> selected backbone -> softmax classifier.
inp = Input(shape=input_shape)
preprocess = preprocessing_layers(inp)
base = ModelsRepo.getModel(model_name, preprocess)

# Our own output layer, sized for this classification task.
out = Dense(units=num_classes, activation='softmax')(base)

model = Model(inputs=inp, outputs=out)
model.summary()

## Configuring Model With Proper Parameters

In [None]:
# Training hyper-parameters. All three are tuned by hand: the learning rate differs between
# models and is adjusted while training the same model; epochs and batch size are changed
# iteratively during training.
learning_rate = 1e-3
epochs = 10
batch_size = 128

# Adam optimizer with categorical cross-entropy (labels are one-hot encoded), tracking accuracy.
model.compile(
    optimizer=adam_v2.Adam(learning_rate=learning_rate),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

## Training The Model

In [None]:
tick = time.time()

# Train on the training split, validating on the validation split after each epoch. The returned
# history is kept so the run can be plotted and monitored.
history = model.fit(
    x_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(x_val, y_val),
    callbacks=[es, modelCheckpoint],
)


def _plot_history(train_key, val_key, title):
    """Plot one training curve against its validation counterpart, per epoch."""
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(title)
    plt.ylabel(train_key)
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper left')
    plt.show()


# Accuracy and loss curves. Used for choosing the next batch size and learning rate, and for
# spotting over-fitting.
_plot_history('accuracy', 'val_accuracy', 'model accuracy')
_plot_history('loss', 'val_loss', 'model loss')

# Wall-clock time for the whole cell (training plus plotting).
print(f"Elapsed Time: {time.time() - tick}")

## Saving and Loading The Model

In [None]:
# Persist the model as it stands at the end of training. Note that this isn't necessarily the
# best model.
model.save(filepath=f'{fileaddr}.h5')

# Reload the checkpoint written by the ModelCheckpoint callback (lowest val_loss) for the
# current Keras application.
best_model = keras.models.load_model(filepath=f"{fileaddr}_best.h5")

## Predicting Test Data

In [None]:
# Load the cached test images written by get_data()'s preparation step.
x_test = np.load("data/x_test.npy")

# The model outputs one score per class for every image; argmax over the class axis turns
# those scores into an array of predicted labels.
prediction = np.argmax(best_model.predict(x_test), axis=1)

# Write the predictions as a two-column CSV: image id and predicted label.
pred_df = pd.DataFrame({
    "id": np.arange(len(x_test)),
    "label": prediction,
})
pred_df.to_csv("predictions.csv", index=None)