## Libraries

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf 
import keras
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras import layers
from keras.layers import Conv2D
from keras.layers import AveragePooling2D
from keras.layers import Flatten
from keras.layers import MaxPool2D
from keras.layers import Input
from keras.layers import BatchNormalization
from keras.models import Model
from keras.regularizers import l2
from keras import losses
from keras import optimizers
from keras.preprocessing.image import ImageDataGenerator

import keras.backend as K
from keras.callbacks import LearningRateScheduler
from keras.callbacks import ModelCheckpoint

In [None]:
!pip install keras-cv
import tensorflow_datasets as tfds
import keras_cv
from keras_cv.layers import AugMix, CutMix, MixUp, GridMask
from tensorflow.keras.optimizers import Adam, SGD
from keras.callbacks import ModelCheckpoint, EarlyStopping

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting keras-cv
  Downloading keras_cv-0.4.1-py3-none-any.whl (615 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m615.8/615.8 KB[0m [31m37.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: keras-cv
Successfully installed keras-cv-0.4.1
You do not have Waymo Open Dataset installed, so KerasCV Waymo metrics are not available.


In [None]:
# Mount Google Drive at /content/drive; later cells read the dataset archive
# from it and write the model checkpoints to it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Data loading

In [None]:
# Change the path according to your specific GDrive
!unzip -u -q "/content/drive/MyDrive/Machine Learning II/cifar10_1000.zip" 

In [None]:
# Experiment configuration. These three values select the dataset folder,
# how much augmented data is added, and which augmentation layer is used.
size = '1000'           # Sizes: 1000  2000  3000  4000  original
aug_factor = 4          # Factors: 0.25  0.67  1  1.5  2  3  4
technique = "augmix"    # Techniques: cutmix  augmix  mixup  gridmask  baseline

In [None]:
# Setting constant variables and seeds

ds_name = "cifar10_" + size
img_height = img_width = 32
batch_size = 128
AUTOTUNE = tf.data.AUTOTUNE

# Seed Python's `random`, NumPy and TensorFlow in one call. Seeding NumPy alone
# leaves the TensorFlow-side randomness (augmentation layers, weight
# initialisation) different on every run.
SEED = 42
tf.keras.utils.set_random_seed(SEED)

In [None]:
# Loading the train / validation / test splits from the dataset directory

# Arguments shared by all three loaders.
common_loader_args = dict(
    seed=30,
    image_size=(img_height, img_width),
    batch_size=batch_size,
    label_mode="categorical",
)

# Train and validation both come from the "train" folder (90% / 10%); the two
# calls use the same seed and the same validation_split.
train_ds = tf.keras.utils.image_dataset_from_directory(
    f"/content/{ds_name}/train",
    validation_split=0.1,
    subset="training",
    **common_loader_args)

val_ds = tf.keras.utils.image_dataset_from_directory(
    f"/content/{ds_name}/train",
    validation_split=0.1,
    subset="validation",
    **common_loader_args)

# The test split lives in its own folder and is loaded in full.
test_ds = tf.keras.utils.image_dataset_from_directory(
    f"/content/{ds_name}/test",
    **common_loader_args)

Found 10000 files belonging to 10 classes.
Using 9000 files for training.
Found 10000 files belonging to 10 classes.
Using 1000 files for validation.
Found 10000 files belonging to 10 classes.


In [None]:
# FUNCTIONS

def ds_to_np(dataset):
    '''
    Convert a dataset of (image, label) pairs into two shuffled NumPy arrays.

    The dataset is unbatched and iterated once; images and labels are collected
    separately, stacked, and then shuffled with one shared permutation so that
    every image stays aligned with its label.

    :param dataset: object exposing ``unbatch().as_numpy_iterator()`` that
        yields ``(image, label)`` pairs (e.g. a ``tf.data.Dataset``)
    :return: tuple ``(X, Y)`` of float32 arrays holding images and labels
    :raises ValueError: if the dataset yields no samples
    '''
    images, labels = [], []
    for image, label in dataset.unbatch().as_numpy_iterator():
        images.append(image)
        labels.append(label)

    # An empty dataset is almost certainly a wrong path or split; fail clearly.
    if not images:
        raise ValueError("ds_to_np received a dataset with no samples")

    X = np.asarray(images, dtype='float32')
    Y = np.asarray(labels, dtype='float32')

    # Uses the global NumPy RNG, so the order follows np.random.seed(...).
    order = np.random.permutation(len(X))
    return X[order], Y[order]
    
def visualize_dataset(dataset, title):
    '''
    Show a 3x3 grid of images taken from a dataset.

    One image is drawn per dataset element: the first image of each of the
    first 9 elements (batches, when the dataset is batched). Pixel values are
    cast to uint8 for display.

    :param dataset: dataset whose elements start with an indexable image tensor
    :param title: figure title
    '''
    fig = plt.figure(figsize=(12, 12))
    fig.suptitle(title, fontsize=18)
    for position, element in enumerate(dataset.take(9), start=1):
        first_image = element[0][0]
        plt.subplot(3, 3, position)
        plt.imshow(first_image.numpy().astype("uint8"))
        plt.axis("off")
    plt.show()

# Preprocessing for data augmentation
def to_dict(image, label):
    '''
    Pack an (image, label) pair into a dict keyed "images" / "labels".
    '''
    return dict(images=image, labels=label)

def preprocess_for_model(inputs):
    '''
    Unpack an {"images", "labels"} dict back into an (images, labels) tuple,
    casting the images to float32.
    '''
    batch_images = inputs["images"]
    batch_labels = inputs["labels"]
    return tf.cast(batch_images, tf.float32), batch_labels

def data_augmentation(inputs, technique):
    '''
    Apply an augmentation callable to every element of a dataset.

    :param inputs: dataset of (image, label) elements
    :param technique: callable applied to the {"images", "labels"} dict form
        of each element
    :return: dataset of (float32 images, labels) tuples
    '''
    as_dicts = inputs.map(to_dict, num_parallel_calls=AUTOTUNE)
    augmented = as_dicts.map(lambda sample: technique(sample))
    return augmented.map(preprocess_for_model)

# Combining source dataset with the augmented part
def prepare_combined_dataset(train_ds, augmentation_size, technique):
    '''
    Augment a dataset and concatenate the augmented part to the original.

    The augmented part consists of ``int(augmentation_size)`` augmented passes
    over the full dataset, followed by one augmented pass over the first
    ``round(fraction * cardinality)`` elements for the fractional remainder.
    Sizes are counted in dataset elements (batches, if the dataset is batched).

    :param train_ds: Starting dataset
    :param augmentation_size: Positive augmentation factor, e.g. one of
        [0.25, 0.67, 1, 1.5, 2, 3, 4]. 0.25 means an augmented copy of 25% of
        the dataset is added, so the combined dataset is 125% of the original.
    :param technique: Augmentation callable applied to {"images", "labels"}
        dicts (e.g. one of the layers stored in ``techniques``)
    :return: the augmented part and the full combined dataset
    :raises ValueError: if the factor is not positive, if it yields no
        augmented data, or if a fractional factor is requested for a dataset
        of unknown/infinite cardinality
    '''
    if augmentation_size <= 0:
        raise ValueError(f"augmentation_size must be positive, got {augmentation_size!r}")

    whole_passes = int(augmentation_size)
    fraction = augmentation_size - whole_passes

    # Each call builds its own pipeline over the full dataset.
    parts = [data_augmentation(train_ds, technique) for _ in range(whole_passes)]

    if fraction > 0:
        train_size = int(train_ds.cardinality().numpy())
        if train_size < 0:
            # Negative cardinalities are tf.data's UNKNOWN / INFINITE markers.
            raise ValueError("A fractional augmentation_size needs a dataset with known, finite cardinality")
        # Dataset.take needs an integer count; the raw product is a float.
        n_elements = int(round(fraction * train_size))
        if n_elements > 0:
            parts.append(data_augmentation(train_ds.take(n_elements), technique))

    if not parts:
        raise ValueError(f"augmentation_size={augmentation_size!r} selects no data to augment")

    train_ds_aug = parts[0]
    for part in parts[1:]:
        train_ds_aug = train_ds_aug.concatenate(part)

    train_ds_combined = train_ds.concatenate(train_ds_aug)
    return train_ds_aug, train_ds_combined
    
    

## Setting Data Augmentation

In [None]:
# Maps each name accepted by the `technique` setting to its KerasCV layer.
# AugMix is given the pixel value range (0, 255).
techniques = dict(
    cutmix=CutMix(),
    augmix=AugMix(value_range=(0, 255)),
    mixup=MixUp(),
    gridmask=GridMask(),
)

In [None]:
# The 'baseline' run trains on the untouched training set; every other
# technique builds the augmented part and the combined dataset.
if technique == 'baseline':
    print('Batches in train set:', train_ds.cardinality().numpy())
else:
    sub_aug, comb_ds = prepare_combined_dataset(
        train_ds, aug_factor, techniques[technique]
    )
    print('Batches augmented:', sub_aug.cardinality().numpy())
    print('Batches not augmented:', train_ds.cardinality().numpy())
    print('Batches combined:', comb_ds.cardinality().numpy())

Batches augmented: 284
Batches not augmented: 71
Batches combined: 355


In [None]:
# Converting the TensorFlow datasets into NumPy arrays.
# The baseline trains on the plain training set, everything else on the
# combined (original + augmented) one.
X_train, Y_train = ds_to_np(train_ds if technique == 'baseline' else comb_ds)
X_val, Y_val = ds_to_np(val_ds)
X_test, Y_test = ds_to_np(test_ds)

## ResNet20 and CIFAR-10

In [None]:
# Defining a class for handling cifar10 data
class CIFAR10Data(object):
    """
    Holds the train / validation / test arrays and produces the float16,
    optionally mean-centred copies used for training and evaluation.

    After a ``get_data(subtract_mean=True)`` call, ``mean_image`` holds the
    per-pixel training mean so other data can be normalised the same way.
    It is ``None`` until then.
    """

    def __init__(self, x_train, y_train, x_val, y_val, x_test, y_test):
        self.x_train = x_train
        self.y_train = y_train
        self.x_val = x_val
        self.y_val = y_val
        self.x_test = x_test
        self.y_test = y_test

        # Set by get_data(subtract_mean=True).
        self.mean_image = None

        self.classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

        print('CIFAR10 Training data shape:', self.x_train.shape)
        print('CIFAR10 Training label shape', self.y_train.shape)
        print('CIFAR10 Validation data shape', self.x_val.shape)
        print('CIFAR10 Validation label shape', self.y_val.shape)
        print('CIFAR10 Test data shape', self.x_test.shape)
        print('CIFAR10 Test label shape', self.y_test.shape)

    def get_data(self, subtract_mean=True, output_shape=None):
        """
        Return float16 copies of the image arrays together with the labels.

        :param subtract_mean: when True, subtract the mean over the training
            samples (axis 0) from the train, validation and test images
        :param output_shape: currently unused; kept for interface compatibility
        :return: (x_train, y_train, x_val, y_val, x_test, y_test)
        """
        # astype returns new arrays, so the stored originals are not modified.
        x_train = self.x_train.astype('float16')
        x_val = self.x_val.astype('float16')
        x_test = self.x_test.astype('float16')

        # normalization: subtract the mean value
        if subtract_mean:
            mean_image = np.mean(x_train, axis=0)
            # Keep the mean so other data can be normalised identically.
            self.mean_image = mean_image
            x_train -= mean_image
            x_val -= mean_image
            x_test -= mean_image

        return x_train, self.y_train, x_val, self.y_val, x_test, self.y_test

In [None]:
def conv2d_bn(x, filters, kernel_size, weight_decay=.0, strides=(1, 1)):
    """
    Bias-free, 'same'-padded 2D convolution followed by batch normalisation.

    :param x: input tensor
    :param filters: number of convolution filters
    :param kernel_size: convolution kernel size
    :param weight_decay: L2 regularisation factor for the convolution kernel
    :param strides: convolution strides
    :return: output tensor of the batch-normalisation layer
    """
    conv = Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding='same',
        use_bias=False,
        kernel_regularizer=l2(weight_decay),
    )
    conv_out = conv(x)
    return BatchNormalization()(conv_out)


def conv2d_bn_relu(x, filters, kernel_size, weight_decay=.0, strides=(1, 1)):
    """
    Convolution + batch normalisation (see conv2d_bn) followed by a ReLU.
    """
    normalized = conv2d_bn(x, filters, kernel_size, weight_decay=weight_decay, strides=strides)
    return Activation('relu')(normalized)


def ResidualBlock(x, filters, kernel_size, weight_decay, downsample=True):
    """
    Two-convolution residual block with an additive shortcut.

    :param x: input tensor
    :param filters: number of filters of both convolutions
    :param kernel_size: kernel size of both convolutions
    :param weight_decay: L2 regularisation factor applied to every convolution
        in the block, including the shortcut projection
    :param downsample: when True the first convolution uses stride 2 and the
        shortcut is a 1x1 stride-2 convolution + batch norm; otherwise the
        shortcut is the unchanged input
    :return: output tensor after the final ReLU
    """
    if downsample:
        # Pass weight_decay explicitly; otherwise the shortcut convolution
        # falls back to conv2d_bn's default of 0 and is left unregularised.
        residual_x = conv2d_bn(x, filters, kernel_size=1, weight_decay=weight_decay, strides=2)
        stride = 2
    else:
        residual_x = x
        stride = 1
    residual = conv2d_bn_relu(x,
                              filters=filters,
                              kernel_size=kernel_size,
                              weight_decay=weight_decay,
                              strides=stride,
                              )
    residual = conv2d_bn(residual,
                         filters=filters,
                         kernel_size=kernel_size,
                         weight_decay=weight_decay,
                         strides=1,
                         )
    out = layers.add([residual_x, residual])
    out = Activation('relu')(out)
    return out


def ResNet18(classes, input_shape, weight_decay=1e-4):
    """
    Build a ResNet18-style classifier: a 64-filter stem, four stages of two
    residual blocks (64, 128, 256, 512 filters), 4x4 average pooling and a
    softmax layer.

    :param classes: number of output classes
    :param input_shape: shape of one input sample
    :param weight_decay: L2 factor forwarded to the stem and residual blocks
    :return: uncompiled Keras model named 'ResNet18'
    """
    inputs = Input(shape=input_shape)
    x = conv2d_bn_relu(inputs, filters=64, kernel_size=(3, 3), weight_decay=weight_decay, strides=(1, 1))

    # (filters, downsample) for each residual block, in order: conv 2 .. conv 5
    block_plan = (
        (64, False), (64, False),
        (128, True), (128, False),
        (256, True), (256, False),
        (512, True), (512, False),
    )
    for n_filters, shrink in block_plan:
        x = ResidualBlock(x, filters=n_filters, kernel_size=(3, 3), weight_decay=weight_decay, downsample=shrink)

    x = AveragePooling2D(pool_size=(4, 4), padding='valid')(x)
    x = Flatten()(x)
    x = Dense(classes, activation='softmax')(x)
    return Model(inputs, x, name='ResNet18')


def ResNetForCIFAR10(classes, name, input_shape, block_layers_num, weight_decay):
    """
    Build a three-stage residual network with 16, 32 and 64 filters.

    :param classes: number of output classes
    :param name: name given to the Keras model
    :param input_shape: shape of one input sample
    :param block_layers_num: number of residual blocks in the first stage;
        each later stage has one downsampling block plus
        ``block_layers_num - 1`` regular blocks
    :param weight_decay: L2 factor forwarded to the stem and residual blocks
    :return: uncompiled Keras model
    """
    inputs = Input(shape=input_shape)
    x = conv2d_bn_relu(inputs, filters=16, kernel_size=(3, 3), weight_decay=weight_decay, strides=(1, 1))

    # conv 2: full resolution
    for _ in range(block_layers_num):
        x = ResidualBlock(x, filters=16, kernel_size=(3, 3), weight_decay=weight_decay, downsample=False)

    # conv 3 and conv 4: each starts with a downsampling block
    for stage_filters in (32, 64):
        x = ResidualBlock(x, filters=stage_filters, kernel_size=(3, 3), weight_decay=weight_decay, downsample=True)
        for _ in range(block_layers_num - 1):
            x = ResidualBlock(x, filters=stage_filters, kernel_size=(3, 3), weight_decay=weight_decay, downsample=False)

    x = AveragePooling2D(pool_size=(8, 8), padding='valid')(x)
    x = Flatten()(x)
    x = Dense(classes, activation='softmax')(x)
    return Model(inputs, x, name=name)


def ResNet20ForCIFAR10(classes, input_shape, weight_decay):
    """
    ResNetForCIFAR10 with 3 blocks per stage, named 'resnet20'.
    """
    return ResNetForCIFAR10(
        classes=classes,
        name='resnet20',
        input_shape=input_shape,
        block_layers_num=3,
        weight_decay=weight_decay,
    )

In [None]:
# Training hyper-parameters, model construction and compilation
weight_decay = 1e-4
lr = 1e-1
num_classes = 10
resnet20 = ResNet20ForCIFAR10(input_shape=(32, 32, 3), classes=num_classes, weight_decay=weight_decay)
# `learning_rate` is the supported argument name; the legacy `lr` alias emits
# a deprecation warning and is not accepted by newer Keras releases.
opt = optimizers.SGD(learning_rate=lr, momentum=0.9, nesterov=False)
resnet20.compile(optimizer=opt,
                 loss=losses.CategoricalCrossentropy(), # label_smoothing = 0.1 for cutmix and augmix
                 metrics=['accuracy'])
#resnet20.summary()

  super(SGD, self).__init__(name, **kwargs)


In [None]:
def plot_history(history):
    """
    Plot the per-epoch loss and accuracy curves of a training run.

    The accuracy metric is looked up as 'accuracy' (the name produced by
    ``metrics=['accuracy']``) and falls back to 'acc'. Validation curves are
    drawn only when the corresponding 'val_*' entry was recorded.

    :param history: train history object returned by CIFAR10Solver.train()
    """
    logs = history.history
    acc_key = 'accuracy' if 'accuracy' in logs else 'acc'

    for metric_key, ylabel in (('loss', 'Loss value'), (acc_key, 'acc value')):
        legend = ['train']
        plt.plot(logs[metric_key])
        val_key = 'val_' + metric_key
        if val_key in logs:
            plt.plot(logs[val_key])
            # These curves come from the validation data, not the test set.
            legend.append('validation')
        plt.xlabel('epoch')
        plt.ylabel(ylabel)
        plt.legend(legend, loc='upper left')
        plt.show()


class CIFAR10Solver(object):
    """
    A CIFAR10Solver encapsulates the logic necessary for training CIFAR-10
    classifiers. The model is defined and compiled outside and passed to the
    constructor together with the data.
    The solver trains the model and evaluates it on the test data.
    Example usage might look something like this.
    model = MyAwesomeModel(...)
    model.compile(optimizer=..., loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    solver = CIFAR10Solver(model, data)
    history = solver.train()
    plot_history(history)
    solver.test()
    """

    def __init__(self, model, data):
        """
        :param model: A compiled model exposing fit() and evaluate()
        :param data:  A tuple of training, validation and test data from CIFAR10Data
        """
        self.model = model
        self.X_train, self.Y_train, self.X_val, self.Y_val, self.X_test, self.Y_test = data

    def __on_epoch_end(self, epoch, logs=None):
        # Debug helper: print the optimizer's current learning rate.
        print(K.eval(self.model.optimizer.learning_rate))

    def train(self, epochs=200, batch_size=128, data_augmentation=True, callbacks=None):
        """
        Fit the model on the training data, validating on the validation data.

        :param epochs: number of training epochs
        :param batch_size: mini-batch size
        :param data_augmentation: when True, train on batches produced by an
            ImageDataGenerator (random shifts and horizontal flips)
        :param callbacks: optional list of Keras callbacks
        :return: the history object returned by model.fit
        """
        if data_augmentation:
            datagen = ImageDataGenerator(
                featurewise_center=False,  # set input mean to 0 over the dataset
                samplewise_center=False,  # set each sample mean to 0
                featurewise_std_normalization=False,  # divide inputs by std of the dataset
                samplewise_std_normalization=False,  # divide each input by its std
                zca_whitening=False,  # apply ZCA whitening
                width_shift_range=4,  # random horizontal shift; integer value, i.e. pixels rather than a fraction
                height_shift_range=4,  # random vertical shift; integer value, i.e. pixels rather than a fraction
                horizontal_flip=True,  # randomly flip images horizontally
                vertical_flip=False,  # no vertical flips
            )
            print('Training model with (light) data augmentation...')
            train_gen = datagen.flow(self.X_train, self.Y_train, batch_size=batch_size)
            # Model.fit accepts generators directly; fit_generator is deprecated.
            history = self.model.fit(train_gen,
                                     epochs=epochs,
                                     callbacks=callbacks,
                                     validation_data=(self.X_val, self.Y_val),
                                     )
        else:
            print('Training model without data augmentation...')
            history = self.model.fit(self.X_train, self.Y_train,
                                     batch_size=batch_size, epochs=epochs,
                                     callbacks=callbacks,
                                     validation_data=(self.X_val, self.Y_val),
                                     )
        return history

    def test(self):
        """Evaluate the current model on the test data and print loss and accuracy."""
        loss, acc = self.model.evaluate(self.X_test, self.Y_test)
        print('test data loss:%.2f acc:%.4f' % (loss, acc))

## Training and Testing

In [None]:
# Wrap the NumPy splits and get the float16, mean-subtracted arrays for training
cifar10_data = CIFAR10Data(
    x_train=X_train, y_train=Y_train,
    x_val=X_val, y_val=Y_val,
    x_test=X_test, y_test=Y_test,
)
data = cifar10_data.get_data(subtract_mean=True)

CIFAR10 Training data shape: (18000, 32, 32, 3)
CIFAR10 Training label shape (18000, 10)
CIFAR10 Validation data shape (1000, 32, 32, 3)
CIFAR10 Validation label shape (1000, 10)
CIFAR10 Test data shape (10000, 32, 32, 3)
CIFAR10 Test label shape (10000, 10)


In [None]:
def lr_scheduler(epoch):
    """
    Step schedule based on the global base rate ``lr``: the base rate up to
    epoch index 91, a tenth of it up to index 137, a hundredth afterwards.
    Prints the selected rate on every call.

    :param epoch: epoch index passed in by the LearningRateScheduler callback
    :return: learning rate to use for that epoch
    """
    if epoch <= 91:
        new_lr = lr
    elif epoch <= 137:
        new_lr = lr * 0.1
    else:
        new_lr = lr * 0.01
    print('new lr:%.2e' % new_lr)
    return new_lr

# Callbacks: step learning-rate schedule, plus a checkpoint that keeps only
# the weights with the highest validation accuracy seen so far.
reduce_lr = LearningRateScheduler(lr_scheduler)
save_best = ModelCheckpoint(
    f"/content/drive/MyDrive/Machine Learning II/best_models/{ds_name}_{aug_factor}_{technique}.h5",
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    mode='max',
)

# Train on the prepared arrays; no ImageDataGenerator augmentation is used here.
solver = CIFAR10Solver(resnet20, data)
history = solver.train(
    epochs=182,
    batch_size=128,
    data_augmentation=False,
    callbacks=[reduce_lr, save_best],
)

Training model without data augmentation...
new lr:1.00e-01
Epoch 1/182
Epoch 1: val_accuracy improved from -inf to 0.21800, saving model to /content/drive/MyDrive/Machine Learning II/best_models/cifar10_1000_1_augmix.h5
new lr:1.00e-01
Epoch 2/182
Epoch 2: val_accuracy improved from 0.21800 to 0.28800, saving model to /content/drive/MyDrive/Machine Learning II/best_models/cifar10_1000_1_augmix.h5
new lr:1.00e-01
Epoch 3/182
Epoch 3: val_accuracy improved from 0.28800 to 0.46700, saving model to /content/drive/MyDrive/Machine Learning II/best_models/cifar10_1000_1_augmix.h5
new lr:1.00e-01
Epoch 4/182
Epoch 4: val_accuracy improved from 0.46700 to 0.50900, saving model to /content/drive/MyDrive/Machine Learning II/best_models/cifar10_1000_1_augmix.h5
new lr:1.00e-01
Epoch 5/182
Epoch 5: val_accuracy did not improve from 0.50900
new lr:1.00e-01
Epoch 6/182
Epoch 6: val_accuracy improved from 0.50900 to 0.54600, saving model to /content/drive/MyDrive/Machine Learning II/best_models/cifar

KeyboardInterrupt: ignored

In [None]:
# TEST IN DISTRIBUTION
# Swap the best checkpoint saved during training into the solver, then evaluate on the test split.
best_model = keras.models.load_model(
    f"/content/drive/MyDrive/Machine Learning II/best_models/{ds_name}_{aug_factor}_{technique}.h5"
)
solver.model = best_model
solver.test()

test data loss:1.22 acc:0.8072


### Out-of-distribution testing

In [None]:
# LOAD MODEL
# Evaluate the best checkpoint saved during training, not whatever weights the
# in-memory `resnet20` holds after the last (possibly interrupted) epoch.
solver = CIFAR10Solver(
    keras.models.load_model(
        f"/content/drive/MyDrive/Machine Learning II/best_models/{ds_name}_{aug_factor}_{technique}.h5"
    ),
    data,
)


#### CIFAR-10C - Mean Robust Accuracy

In [None]:
import os

ood_root = "/content/drive/MyDrive/Machine Learning II"
corruption_dir = os.path.join(ood_root, "CIFAR-10-C")

y_test_ood = np.load(os.path.join(ood_root, "labels.npy"))
y_test_ood = keras.utils.to_categorical(y_test_ood, num_classes)

# The model was trained on float16, mean-subtracted inputs (get_data with
# subtract_mean=True), so the corrupted images need the same shift. The mean is
# recomputed from the training images exactly as get_data does.
# NOTE(review): drop the subtraction if training is switched to subtract_mean=False.
mean_image = np.mean(X_train.astype('float16'), axis=0)

# List the directory once, in a stable order, keeping only corruption arrays.
corruption_files = sorted(
    name for name in os.listdir(corruption_dir)
    if name.endswith(".npy") and name != "labels.npy"
)

acc_ood_tot = 0
for corruption in corruption_files:
    print(f"------{corruption}------\n")
    x_test_ood = np.load(os.path.join(corruption_dir, corruption)).astype('float16') - mean_image
    loss, acc_ood = solver.model.evaluate(x_test_ood, y_test_ood)
    acc_ood_tot += acc_ood

if corruption_files:
    print("Mean Robust Accuracy: ", acc_ood_tot/len(corruption_files))
else:
    print("No corruption files found in", corruption_dir)

    

Mean Robust Accuracy:  0.4974631588709982
