In [None]:
!pip install keras_cv

In [None]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_datasets as tfds
import os
import keras
import keras_cv
from tensorflow.keras import layers, models, Model
from keras.optimizers import Adam,SGD
from keras.datasets import cifar100
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Input,AveragePooling2D
from keras.regularizers import l2
# Seed NumPy's and TensorFlow's global random generators with a fixed value.
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Load CIFAR-100, then carve a validation set out of the official training set.
(x_train, y_train), (x_test, y_test) = cifar100.load_data()
n = len(x_train)

# Draw 80% of the indices without replacement for training; whatever is left
# becomes the validation set.
train_idx = np.random.choice(range(n), size=int(0.8 * n), replace=False)
val_idx = np.setdiff1d(range(n), train_idx)

# Slice out the validation split first, while x_train / y_train still refer to
# the complete training set, then narrow the training arrays themselves.
x_val, y_val = x_train[val_idx], y_train[val_idx]
x_train, y_train = x_train[train_idx], y_train[train_idx]

# Quick visual sanity check of the first training image.
plt.imshow(x_train[0])
plt.show()

In [None]:
# Create a MirroredStrategy and report how many replicas it keeps in sync.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

def scale(image):
    """Cast ``image`` to float32 and divide every value by 255.

    Args:
        image: Array or tensor of pixel values.

    Returns:
        A float32 tensor holding the input values divided by 255.
    """
    return tf.cast(image, tf.float32) / 255

In [None]:
def conv2d_bn(x, filters, kernel_size, weight_decay=.0, strides=(1, 1)):
    """Apply a 'same'-padded 2D convolution followed by batch normalization.

    Args:
        x: Input tensor.
        filters: Number of convolution filters.
        kernel_size: Size of the convolution kernel.
        weight_decay: L2 regularization factor for the convolution kernel.
        strides: Convolution strides.

    Returns:
        The batch-normalized convolution output (no activation applied).
    """
    convolution = Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding='same',
        use_bias=True,
        kernel_regularizer=l2(weight_decay),
    )
    features = convolution(x)
    return BatchNormalization()(features)


def conv2d_bn_relu(x, filters, kernel_size, weight_decay=.0, strides=(1, 1)):
    """Apply ``conv2d_bn`` and then a ReLU activation.

    Takes the same arguments as ``conv2d_bn`` and returns the activated
    tensor.
    """
    normalized = conv2d_bn(x, filters, kernel_size, weight_decay, strides)
    return Activation('relu')(normalized)


def ResidualBlock(x, filters, kernel_size, weight_decay, downsample=True):
    """Build a two-convolution residual block with an additive shortcut.

    The main path is conv-BN-ReLU followed by conv-BN. With
    ``downsample=True`` the first main-path convolution uses stride 2 and the
    shortcut becomes a stride-2 1x1 conv-BN projection; otherwise the shortcut
    is the unmodified input, so ``x`` must already have a shape compatible
    with the main-path output for the addition to succeed.

    Args:
        x: Input tensor of the block.
        filters: Number of filters for every convolution in the block.
        kernel_size: Kernel size of the two main-path convolutions.
        weight_decay: L2 regularization factor applied to every convolution
            kernel in the block, including the shortcut projection.
        downsample: Whether the block uses stride 2 and a projected shortcut.

    Returns:
        The output tensor after the shortcut addition and a final ReLU.
    """
    if downsample:
        # Project the shortcut with a strided 1x1 convolution so it matches
        # the main path. It receives the same weight decay as the other
        # convolutions; previously it silently fell back to the default of 0.
        residual_x = conv2d_bn(x, filters, kernel_size=1,
                               weight_decay=weight_decay, strides=2)
        stride = 2
    else:
        residual_x = x
        stride = 1
    residual = conv2d_bn_relu(x,
                              filters=filters,
                              kernel_size=kernel_size,
                              weight_decay=weight_decay,
                              strides=stride,
                              )
    # No activation here: the ReLU is applied after the shortcut is added.
    residual = conv2d_bn(residual,
                         filters=filters,
                         kernel_size=kernel_size,
                         weight_decay=weight_decay,
                         strides=1,
                         )
    out = layers.add([residual_x, residual])
    out = Activation('relu')(out)
    return out


def ResNet18(classes, input_shape, weight_decay=1e-4):
    """Assemble the residual classification network used in this notebook.

    Layout: a 7x7 stride-2 conv-BN-ReLU stem with 16 filters, a 3x3 stride-2
    max pool, three stages of two residual blocks each (16 filters with 5x5
    kernels, 32 filters with 5x5 kernels, 64 filters with 3x3 kernels), global
    average pooling and a softmax dense layer.

    Args:
        classes: Number of output classes.
        input_shape: Shape of a single input image.
        weight_decay: L2 regularization factor forwarded to the convolutions.

    Returns:
        An uncompiled Keras ``Model`` named 'ResNet18'.
    """
    inputs = Input(shape=input_shape)

    # Stem.
    x = conv2d_bn_relu(inputs, filters=16, kernel_size=(7, 7),
                       weight_decay=weight_decay, strides=(2, 2))
    x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(x)

    # (filters, kernel_size, downsample in the first block) for each stage;
    # the second block of a stage never downsamples.
    stage_configs = (
        (16, (5, 5), False),
        (32, (5, 5), True),
        (64, (3, 3), True),
    )
    for stage_filters, stage_kernel, first_downsample in stage_configs:
        for block_downsample in (first_downsample, False):
            x = ResidualBlock(x, filters=stage_filters, kernel_size=stage_kernel,
                              weight_decay=weight_decay,
                              downsample=block_downsample)

    # Classification head.
    x = layers.GlobalAvgPool2D()(x)
    x = Dense(classes, activation='softmax')(x)
    return Model(inputs, x, name='ResNet18')


In [None]:
# Build the network for 32x32x3 inputs and 100 classes, then print its layer summary.
model = ResNet18(input_shape=(32, 32, 3), classes=100, weight_decay=1e-4)
model.summary()

In [None]:
# Callbacks for the baseline run. The list order matters: later cells replace
# entries 0 (TensorBoard) and 1 (ModelCheckpoint) by index.
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs_baseline'),
    # Keep only the weights of the epoch with the lowest validation loss.
    tf.keras.callbacks.ModelCheckpoint(
        filepath='./best.h5',
        monitor='val_loss',
        mode='min',
        save_best_only=True,
        save_weights_only=True,
    ),
    # Stop once the validation loss has not improved for 10 epochs.
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        mode="auto",
        patience=10,
        verbose=5,
    ),
]

# Build and compile the model inside the distribution strategy scope.
with strategy.scope():
    model = ResNet18(input_shape=(32, 32, 3), classes=100, weight_decay=1e-4)
    model.compile(
        optimizer=Adam(),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

# Baseline: train on the un-augmented images, validate on the held-out split.
model.fit(
    scale(x_train),
    y_train,
    epochs=50,
    batch_size=64,
    validation_data=(scale(x_val), y_val),
    callbacks=callbacks,
)

In [None]:
# Restore the best baseline weights and measure top-1 accuracy on the test set.
# The path is the one ModelCheckpoint writes to ('./best.h5'); the previous
# '../working/best.h5' only resolved when the working directory happened to be
# named 'working'.
model.load_weights('./best.h5')
preds = model.predict(scale(x_test))
baseline_acc = np.mean(keras.metrics.sparse_categorical_accuracy(y_test, preds))
baseline_acc

# cutout

In [None]:
# Cutout is applied once to the whole training set before training; the
# resulting images are what model.fit receives below.
random_cutout = keras_cv.layers.preprocessing.RandomCutout(0.3, 0.3, seed=42)
cutout_x_train = random_cutout(x_train)

# Point logging and checkpointing at cutout-specific locations; the
# EarlyStopping callback at index 2 is reused unchanged.
callbacks[0] = tf.keras.callbacks.TensorBoard(log_dir='./logs_cutout')
callbacks[1] = tf.keras.callbacks.ModelCheckpoint(filepath='./cutout_best.h5',
                                       save_weights_only=True,
                                       save_best_only=True,
                                       monitor='val_loss',mode='min')

# Fresh model so the run does not start from the previous experiment's weights.
with strategy.scope():
    model = ResNet18(input_shape=(32, 32, 3), classes=100, weight_decay=1e-4)
    model.compile(optimizer=Adam(), loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
model.fit(scale(cutout_x_train), y_train, epochs=50, batch_size=64,
          validation_data=(scale(x_val), y_val), callbacks=callbacks)

# Load from the same path the checkpoint callback writes to. The previous
# '../working/cutout_best.h5' only resolved when the working directory was
# named 'working'.
model.load_weights('./cutout_best.h5')
preds = model.predict(scale(x_test))
cutout_acc = np.mean(keras.metrics.sparse_categorical_accuracy(y_test, preds))
cutout_acc

# mixup

In [None]:
# The labels are passed to MixUp one-hot encoded, and the loss below is
# 'categorical_crossentropy' to match the label format the layer returns.
oh_y_train = tf.cast(tf.one_hot(y_train.squeeze(), 100), tf.float32)
mixup = keras_cv.layers.preprocessing.MixUp(seed=42,alpha=0.5)
output = mixup(
    {'images': x_train[:], 'labels': oh_y_train[:]}
)
mixup_x_train, mixup_y_train = output['images'],output['labels']

# Point logging and checkpointing at mixup-specific locations.
# No trailing comma after the TensorBoard call: with one, callbacks[0] would
# hold a 1-tuple rather than a callback instance.
callbacks[0] = tf.keras.callbacks.TensorBoard(log_dir='./logs_mixup')
callbacks[1] = tf.keras.callbacks.ModelCheckpoint(filepath='./mixup_best.h5',
                                       save_weights_only=True,
                                       save_best_only=True,
                                       monitor='val_loss',mode='min')

# Fresh model so the run does not start from the previous experiment's weights.
with strategy.scope():
    model = ResNet18(input_shape=(32, 32, 3), classes=100, weight_decay=1e-4)
    model.compile(optimizer=Adam(), loss='categorical_crossentropy',
                  metrics=['accuracy'])
# Validation labels are one-hot encoded too, to match the categorical loss.
model.fit(scale(mixup_x_train), mixup_y_train, epochs=50, batch_size=64,
          validation_data=(scale(x_val), tf.one_hot(y_val.squeeze(), 100)),
          callbacks=callbacks)

# Load from the same path the checkpoint callback writes to. The previous
# '../working/mixup_best.h5' only resolved when the working directory was
# named 'working'.
model.load_weights('./mixup_best.h5')
preds = model.predict(scale(x_test))
# Test labels are still integer class ids, hence the sparse accuracy metric.
mixup_acc = np.mean(keras.metrics.sparse_categorical_accuracy(y_test, preds))
mixup_acc

# cutmix

In [None]:
# The labels are passed to CutMix one-hot encoded, and the loss below is
# 'categorical_crossentropy' to match the label format the layer returns.
oh_y_train = tf.cast(tf.one_hot(y_train.squeeze(), 100), tf.float32)
cutmix = keras_cv.layers.preprocessing.cut_mix.CutMix(seed=42,alpha=0.5)
output = cutmix(
    {'images': x_train[:], 'labels': oh_y_train[:]}
)
cutmix_x_train, cutmix_y_train = output['images'],output['labels']

# Point logging and checkpointing at cutmix-specific locations.
# No trailing comma after the TensorBoard call: with one, callbacks[0] would
# hold a 1-tuple rather than a callback instance.
callbacks[0] = tf.keras.callbacks.TensorBoard(log_dir='./logs_cutmix')
callbacks[1] = tf.keras.callbacks.ModelCheckpoint(filepath='./cutmix_best.h5',
                                       save_weights_only=True,
                                       save_best_only=True,
                                       monitor='val_loss',mode='min')

# Fresh model so the run does not start from the previous experiment's weights.
with strategy.scope():
    model = ResNet18(input_shape=(32, 32, 3), classes=100, weight_decay=1e-4)
    model.compile(optimizer=Adam(), loss='categorical_crossentropy',
                  metrics=['accuracy'])
# Validation labels are one-hot encoded too, to match the categorical loss.
model.fit(scale(cutmix_x_train), cutmix_y_train, epochs=50, batch_size=64,
          validation_data=(scale(x_val), tf.one_hot(y_val.squeeze(), 100)),
          callbacks=callbacks)

# Load from the same path the checkpoint callback writes to. The previous
# '../working/cutmix_best.h5' only resolved when the working directory was
# named 'working'.
model.load_weights('./cutmix_best.h5')
preds = model.predict(scale(x_test))
# Test labels are still integer class ids, hence the sparse accuracy metric.
cutmix_acc = np.mean(keras.metrics.sparse_categorical_accuracy(y_test, preds))
# Display the result, as the other experiment cells do.
cutmix_acc

In [None]:
# Report the test accuracy of each experiment, one per print call.
for summary_line in (
    f'baseline acc: {baseline_acc }\n ',
    f'cutout acc: {cutout_acc }\n ',
    f'mixup acc: {mixup_acc }\n ',
    f'cutmix acc: {cutmix_acc }\n ',
):
    print(summary_line)

In [None]:
# 3x3 grid: one row per training sample (indices 0-2), one column per
# augmentation (cutout, mixup, cutmix). Panels are filled row by row.
plt.figure(figsize=(12, 12))
for sample_idx in range(3):
    for col_idx, augmented in enumerate((cutout_x_train, mixup_x_train, cutmix_x_train)):
        plt.subplot(3, 3, 3 * sample_idx + col_idx + 1)
        # The augmented images are tensors; convert to uint8 arrays for display.
        plt.imshow(augmented[sample_idx].numpy().astype('uint8'))
plt.savefig('q1.png')