### Imports

In [None]:
from sklearn.preprocessing import LabelBinarizer
from keras.callbacks import LearningRateScheduler
from keras.optimizers import SGD
from keras.datasets import cifar10
import numpy as np
import argparse
import os

### Functions

#### MiniGoogleNet Model

In [None]:
from keras.layers import BatchNormalization, Conv2D, AveragePooling2D, MaxPooling2D, Activation, Dropout, Dense, \
    Flatten, Input, concatenate
from keras.models import Model
from keras import backend as K


class MiniGoogleNet:
    """
    Builder for a small GoogLeNet-style network assembled from three reusable modules: a conv module, a two-branch
    inception module and a two-branch down-sampling module
    """

    @staticmethod
    def conv_module(x, k, k_x, k_y, stride, chan_dim, padding="same"):
        """
        Stack a 2D convolution, a ReLU activation and a batch normalization on top of the input
        :param x: input layer
        :param k: number of filters the conv layer learns
        :param k_x: first component of the kernel size tuple
        :param k_y: second component of the kernel size tuple
        :param stride: stride of the conv layer
        :param chan_dim: axis handed to batch normalization (the channel axis)
        :param padding: padding mode of the conv layer
        :return: Keras layer
        """
        # CONV => RELU => BN
        convolved = Conv2D(k, (k_x, k_y), strides=stride, padding=padding)(x)
        activated = Activation("relu")(convolved)
        normalized = BatchNormalization(axis=chan_dim)(activated)

        return normalized

    @staticmethod
    def inception_module(x, num_k_1x1, num_k_3x3, chan_dim):
        """
        Feed the input through a 1x1 conv_module and a 3x3 conv_module side by side (both with 1x1 stride) and join
        the two outputs along the channel axis
        :param x: input layer
        :param num_k_1x1: number of filters of the 1x1 branch
        :param num_k_3x3: number of filters of the 3x3 branch
        :param chan_dim: channel axis used for batch normalization and for the concatenation
        :return: Keras layer
        """
        # the 1x1 branch is created first, then the 3x3 branch
        branches = [
            MiniGoogleNet.conv_module(x, num_k_1x1, 1, 1, (1, 1), chan_dim),
            MiniGoogleNet.conv_module(x, num_k_3x3, 3, 3, (1, 1), chan_dim),
        ]

        return concatenate(branches, axis=chan_dim)

    @staticmethod
    def downsample_module(x, k, chan_dim):
        """
        Shrink the spatial size of the volume through two parallel paths whose outputs are concatenated along the
        channel axis: a 3x3 conv_module with 2x2 stride and "valid" padding, and a 3x3 max pooling with 2x2 stride
        :param x: input layer
        :param k: number of filters of the strided conv branch
        :param chan_dim: channel axis used for batch normalization and for the concatenation
        :return: Keras layer
        """
        strided_conv = MiniGoogleNet.conv_module(x, k, 3, 3, (2, 2), chan_dim, padding="valid")
        pooled = MaxPooling2D((3, 3), strides=(2, 2))(x)

        return concatenate([strided_conv, pooled], axis=chan_dim)

    @staticmethod
    def build(width, height, depth, classes):
        """
        Assemble the complete network: one conv_module, three stages of inception modules (the first two each
        followed by a downsample module), then average pooling, dropout and a softmax classifier
        :param width: width of the input images
        :param height: height of the input images
        :param depth: number of channels of the input images
        :param classes: number of units of the final dense (softmax) layer
        :return: Keras Model named "minigooglenet"
        """
        # the backend's data format decides where the channel axis lives
        channels_first = K.image_data_format() == "channels_first"
        input_shape = (depth, height, width) if channels_first else (height, width, depth)
        chan_dim = 1 if channels_first else -1

        inputs = Input(shape=input_shape)
        x = MiniGoogleNet.conv_module(inputs, 96, 3, 3, (1, 1), chan_dim)

        # stage 1: two inception modules, then down-sample
        for num_1x1, num_3x3 in ((32, 32), (32, 48)):
            x = MiniGoogleNet.inception_module(x, num_1x1, num_3x3, chan_dim)
        x = MiniGoogleNet.downsample_module(x, 80, chan_dim)

        # stage 2: four inception modules, then down-sample
        for num_1x1, num_3x3 in ((112, 48), (96, 64), (80, 80), (48, 96)):
            x = MiniGoogleNet.inception_module(x, num_1x1, num_3x3, chan_dim)
        x = MiniGoogleNet.downsample_module(x, 96, chan_dim)

        # stage 3: two inception modules, then pooling and dropout
        for num_1x1, num_3x3 in ((176, 160), (176, 160)):
            x = MiniGoogleNet.inception_module(x, num_1x1, num_3x3, chan_dim)
        # NOTE(review): the fixed 7x7 window presumably matches the feature map left by 32x32 inputs - confirm
        # before building with other input sizes
        x = AveragePooling2D((7, 7))(x)
        x = Dropout(0.5)(x)

        # classifier head
        x = Flatten()(x)
        x = Dense(classes)(x)
        x = Activation("softmax")(x)

        return Model(inputs, x, name="minigooglenet")


### Training Monitor

In [None]:
from keras.callbacks import BaseLogger
import matplotlib.pyplot as plt
import numpy as np
import os
import json


class TrainingMonitor(BaseLogger):
    """
    Callback that accumulates the per-epoch logs in a history dict, optionally persists that history as JSON and
    plots the loss / accuracy curves after every epoch
    """

    # (history key, legend label) pairs drawn on the figure, in plotting order
    _PLOT_SERIES = (
        ("loss", "train_loss"),
        ("val_loss", "val_loss"),
        ("accuracy", "train_acc"),
        ("val_accuracy", "val_acc"),
    )

    def __init__(self, fig_path=None, json_path=None, start_at=0, start_graph_at=0):
        """
        :param fig_path: file the figure is saved to; when falsy the figure is shown instead
        :param json_path: file the history is read from / written to; when falsy nothing is persisted
        :param start_at: when > 0, a history loaded from json_path is truncated to this many entries per key
        :param start_graph_at: number of leading history entries left out of the plot
        """
        super(TrainingMonitor, self).__init__()
        self.fig_path = fig_path
        self.json_path = json_path
        self.start_at = start_at
        self.start_graph_at = start_graph_at
        # history: log key -> list with one value per epoch
        self.H = {}

    def on_train_begin(self, logs=None):
        """
        Load a previously saved history from json_path (if that file exists) and trim it to start_at entries
        :param logs: unused
        """
        if not self.json_path or not os.path.exists(self.json_path):
            return

        # context manager so the file handle is closed deterministically
        with open(self.json_path) as file:
            self.H = json.load(file)

        # trim any entries that are past the starting epoch
        if self.start_at > 0:
            self.H = {k: v[:self.start_at] for (k, v) in self.H.items()}

    def on_epoch_end(self, epoch, logs=None):
        """
        Append the logs of the finished epoch to the history, write the history to json_path and redraw the figure
        :param epoch: unused
        :param logs: dict of values reported for the epoch; None is treated as empty
        """
        for (k, v) in (logs or {}).items():
            # plain floats keep the history JSON-serializable even when a value arrives as a NumPy scalar
            self.H.setdefault(k, []).append(float(v))

        if self.json_path:
            with open(self.json_path, "w") as file:
                json.dump(self.H, file)

        # plot only once more than one point is available past start_graph_at
        if len(self.H.get("loss", [])) <= self.start_graph_at + 1:
            return

        plt.style.use("ggplot")
        plt.figure()
        for (key, label) in self._PLOT_SERIES:
            # e.g. the val_* series are missing when training runs without validation data
            if key not in self.H:
                continue
            series = self.H[key][self.start_graph_at:]
            plt.plot(np.arange(0, len(series)), series, label=label)
        plt.title(f"Training Loss and Accuracy [Epoch {len(self.H['loss'])}]")
        plt.xlabel("Epoch #")
        plt.ylabel("Loss / Accuracy")
        plt.legend()

        if self.fig_path:
            plt.savefig(self.fig_path)
        else:
            plt.show()
        # release the figure on both paths so figures do not pile up epoch after epoch
        plt.close()


### Data Augmentor

In [None]:
!pip install keras_core
!pip install keras_cv



In [None]:
from keras import Sequential
from keras_cv.layers import RandomFlip, RandomRotation, RandomTranslation, RandomZoom, RandomShear


class DataAugmentor:
    """
    Helper that puts a stack of random image augmentation layers in front of an existing model
    """

    @staticmethod
    def build(model):
        """
        Wrap the given model in a Sequential whose first stage applies random rotation, translation, shear, zoom
        and horizontal flip, in that order
        :param model: Keras model that receives the augmented input
        :return: Sequential model made of the augmentation stack followed by the given model
        """
        augmentation_layers = [
            RandomRotation(0.1),
            RandomTranslation(height_factor=0.1, width_factor=0.1),
            RandomShear(0.2),
            RandomZoom(0.2),
            RandomFlip("horizontal"),
        ]
        augmenter = Sequential(augmentation_layers)

        return Sequential([augmenter, model])


### Polynomial Decay

In [None]:
def poly_decay(epoch):
    """
    Polynomial learning rate schedule: scales the module-level INIT_lR by (1 - epoch / NUM_EPOCHS) ** power.
    Both globals are read at call time
    :param epoch: epoch number the learning rate is computed for
    :return: learning rate for that epoch
    """
    # an exponent of 1 yields a linear decay; a larger exponent decays faster at the start and slower at the end
    exponent = 1.0
    remaining_fraction = 1 - (epoch / float(NUM_EPOCHS))

    return INIT_lR * remaining_fraction ** exponent

### Main

In [None]:
# training schedule constants, also read by poly_decay
NUM_EPOCHS = 70
INIT_lR = 1e-2

args = {}
args["model"] = "/content/minigooglenet_cifar_10.keras"
args["output"] = "/content/output"

# TrainingMonitor writes its figure and JSON history into this directory after every epoch, so it has to exist
# before training starts
os.makedirs(args["output"], exist_ok=True)

print("[INFO] loading CIFAR-10 data...")
((trainX, trainY), (testX, testY)) = cifar10.load_data()
trainX = trainX.astype("float")
testX = testX.astype("float")

# apply mean subtraction (the mean is computed on the training split only and reused for the test split)
mean = np.mean(trainX, axis=0)
trainX -= mean
testX -= mean

# one-hot encode the labels; the encoder is fitted on the training labels only
lb = LabelBinarizer()
trainY = lb.fit_transform(trainY)
testY = lb.transform(testY)

# construct callbacks; the process id keeps the output files of separate runs apart
fig_path = os.path.sep.join([args["output"], "{}.png".format(os.getpid())])
json_path = os.path.sep.join([args["output"], "{}.json".format(os.getpid())])
callbacks = [TrainingMonitor(fig_path, json_path=json_path), LearningRateScheduler(poly_decay)]

print("[INFO] compiling model...")
opt = SGD(learning_rate=INIT_lR, momentum=0.9)
model = MiniGoogleNet.build(width=32, height=32, depth=3, classes=10)
# prepend the random augmentation layers to the network
model = DataAugmentor.build(model)
model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])

print("[INFO] training network...")
model.fit(trainX, trainY, batch_size=64, validation_data=(testX, testY),
          epochs=NUM_EPOCHS, callbacks=callbacks, verbose=1)
print("[INFO] serializing network...")
model.save(args["model"])

[INFO loading CIFAR-10 data...
[INFO] compiling model...
[INFO] training network...
Epoch 1/70
