# ResNet-9: CIFAR-10

In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import math

# import tensorflow_model_optimization as tfmot
# from tensorflow_model_optimization.sparsity import keras as sparsity
# from tensorflow.keras import datasets, layers, models

import matplotlib.pyplot as plt

from tensorflow import keras
from tensorflow.keras.layers import AveragePooling2D, Conv2D, MaxPooling2D, ReLU, BatchNormalization
from tensorflow.keras import models, layers, datasets
from tensorflow.keras.layers import Dense, Flatten, Reshape, Input, InputLayer, Dropout
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.initializers import RandomNormal
# import math
from sklearn.metrics import accuracy_score, precision_score, recall_score

from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [2]:
tf.__version__

'2.4.1'

In [None]:
# %env CUDA_DEVICE_ORDER=PCI_BUS_ID
# %env CUDA_VISIBLE_DEVICES=2

In [3]:
# Check GPU availibility-
'''
This API allows querying the physical hardware resources prior to runtime
initialization. Thus, giving an opportunity to call any additional
configuration APIs.
The following code lists the number of visible GPUs on the host-
'''
physical_gpus = tf.config.list_physical_devices('GPU')
print(f"Number of physical GPUs available: {len(physical_gpus)}")

Number of physical GPUs available: 1


In [4]:
batch_size = 128
num_classes = 10
num_epochs = 200

In [5]:
# Data preprocessing and cleaning:
# input image dimensions
img_rows, img_cols = 32, 32

# Load CIFAR-10 dataset-
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()

# Load CIFAR-100 dataset-
# (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar100.load_data()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


In [6]:
if tf.keras.backend.image_data_format() == 'channels_first':
    X_train = X_train.reshape(X_train.shape[0], 3, img_rows, img_cols)
    X_test = X_test.reshape(X_test.shape[0], 3, img_rows, img_cols)
    input_shape = (3, img_rows, img_cols)
else:
    X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 3)
    X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 3)
    input_shape = (img_rows, img_cols, 3)

print("\n'input_shape' which will be used = {0}\n".format(input_shape))


'input_shape' which will be used = (32, 32, 3)



In [7]:
# convert class vectors/target to binary class matrices or one-hot encoded values-
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)

In [8]:
print("\nDimensions of training and testing sets are:")
print("X_train.shape = {0}, y_train.shape = {1}".format(X_train.shape, y_train.shape))
print("X_test.shape = {0}, y_test.shape = {1}".format(X_test.shape, y_test.shape))


Dimensions of training and testing sets are:
X_train.shape = (50000, 32, 32, 3), y_train.shape = (50000, 10)
X_test.shape = (10000, 32, 32, 3), y_test.shape = (10000, 10)


In [9]:
# Instantiate TensorFlow's Dataset class for creating training and testing datasets-
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))

### Data Augmentation:

Augment CIFAR-10 dataset by performing the following steps on every image:

- Pad the image with a black, four-pixel border.
- Randomly crop a 32 x 32 region from the padded image.
- Flip a coin to determine if the image should be horizontally flipped.

In [10]:
HEIGHT = 32
WIDTH = 32
NUM_CHANNELS = 3

In [11]:
def augmentation(x, y):
    x = tf.image.resize_with_crop_or_pad(x, HEIGHT + 8, WIDTH + 8)
    x = tf.image.random_crop(x, [HEIGHT, WIDTH, NUM_CHANNELS])
    x = tf.image.random_flip_left_right(x)

    return x, y

In [12]:
def normalize(x, y):
    x = tf.image.per_image_standardization(x)
    return x, y

In [13]:
# Augment training dataset
train_dataset = (train_dataset
        .map(augmentation)
        .shuffle(buffer_size = 50000)
        .map(normalize)
        .batch(batch_size = batch_size, drop_remainder = True))


# NOTE: Do NOT augment the testing dataset!

In [14]:
test_dataset = (test_dataset
                .map(normalize)
                .batch(batch_size = batch_size, drop_remainder = True))

In [15]:
# Obtain a batch size set of training images and labels-
images, labels = next(iter(train_dataset))

In [16]:
images.shape, images.shape

(TensorShape([128, 32, 32, 3]), TensorShape([128, 32, 32, 3]))

### Prepare CIFAR10 dataset for GradientTape training:

In [17]:
# Choose an optimizer and loss function for training-
loss_fn = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.SGD(learning_rate = 0.01, momentum = 0.9)

In [18]:
# Select metrics to measure the error & accuracy of model.
# These metrics accumulate the values over epochs and then
# print the overall result-
train_loss = tf.keras.metrics.Mean(name = 'train_loss')
train_accuracy = tf.keras.metrics.CategoricalAccuracy(name = 'train_accuracy')

test_loss = tf.keras.metrics.Mean(name = 'test_loss')
test_accuracy = tf.keras.metrics.CategoricalAccuracy(name = 'test_accuracy')

### ResNet-9 model experimentation:

In [None]:
inputs = keras.Input(shape = (32, 32, 3))

x = Conv2D(
    filters = 32, kernel_size = (3, 3),
    activation = 'relu', kernel_initializer = tf.initializers.he_normal(),
    strides = (1, 1), padding = 'same')(inputs)

'''
x = Conv2D(
    filters = 64, kernel_size = (3, 3),
    activation = 'relu', kernel_initializer = tf.initializers.he_normal(),
    strides = (1, 1), padding = 'same')(x)

x = MaxPooling2D(pool_size = (3, 3), strides = (1, 1))(x)
'''

"\nx = Conv2D(\n    filters = 64, kernel_size = (3, 3),\n    activation = 'relu', kernel_initializer = tf.initializers.he_normal(),\n    strides = (1, 1), padding = 'same')(x)\n\nx = MaxPooling2D(pool_size = (3, 3), strides = (1, 1))(x)\n"

In [None]:
x.shape

TensorShape([None, 32, 32, 32])

In [19]:
def resnet_block(input_data, num_filters, stride_length, conv_1x1 = False):
    '''
    Function to implement ResNet block as defined in the research paper.
    '''
    
    img = Conv2D(
        filters = num_filters, kernel_size = (3, 3),
        activation = None, kernel_initializer = tf.initializers.he_normal(),
        strides = stride_length, padding = 'same')(input_data)
    img = BatchNormalization()(img)
    img = layers.Activation('relu')(img)
    
    img = Conv2D(
        filters = num_filters, kernel_size = (3, 3),
        activation = None, kernel_initializer = tf.initializers.he_normal(),
        strides = stride_length, padding = 'same')(img)
    img = BatchNormalization()(img)
    
    
    # Adjust number of filters/channels using 1x1 conv layer-
    if conv_1x1:
        input_data = Conv2D(
            filters = num_filters, kernel_size = (1, 1),
            activation = 'relu', kernel_initializer = tf.initializers.he_normal(),
            strides = (1, 1), padding = 'same'
        )(input_data)
        
    
    print(f"ResNet block (before adding): img.shape (output of conv layers) = {img.shape} & input_data.shape = {input_data.shape}")
    img = layers.Add()([img, input_data])
    
    img = layers.Activation('relu')(img)
    
    return img
    


In [20]:
inputs = keras.Input(shape = (32, 32, 3))

x = Conv2D(
    filters = 32, kernel_size = (3, 3),
    activation = 'relu', kernel_initializer = tf.initializers.he_normal(),
    strides = (1, 1), padding = 'same'
    # input_shape = (32, 32, 3)
    )(inputs)

'''
x = Conv2D(
    filters = 64, kernel_size = (3, 3),
    activation = 'relu', kernel_initializer = tf.initializers.he_normal(),
    strides = (1, 1), padding = 'same')(x)

x = MaxPooling2D(pool_size = (3, 3), strides = (1, 1))(x)

x.shape
# TensorShape([None, 30, 30, 64])
'''

x.shape
# TensorShape([None, 32, 32, 32])

x = resnet_block(input_data = x, num_filters = 64, stride_length = 1, conv_1x1 = True)
x = resnet_block(input_data = x, num_filters = 64, stride_length = 1, conv_1x1 = False)

x = resnet_block(input_data = x, num_filters = 128, stride_length = 1, conv_1x1 = True)
x = resnet_block(input_data = x, num_filters = 128, stride_length = 1, conv_1x1 = False)

x = resnet_block(input_data = x, num_filters = 256, stride_length = 1, conv_1x1 = True)
x = resnet_block(input_data = x, num_filters = 256, stride_length = 1, conv_1x1 = False)

x = resnet_block(input_data = x, num_filters = 512, stride_length = 1, conv_1x1 = True)
x = resnet_block(input_data = x, num_filters = 512, stride_length = 1, conv_1x1 = False)

# x.shape
# TensorShape([None, 32, 32, 512])

x = layers.GlobalAveragePooling2D()(x)

x = layers.Dense(
    units = 256, activation='relu',
    kernel_initializer = tf.initializers.he_normal()
)(x)

# x = layers.Dropout(0.5)(x)

outputs = layers.Dense(units = 10, activation='softmax')(x)

ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 64) & input_data.shape = (None, 32, 32, 64)
ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 64) & input_data.shape = (None, 32, 32, 64)
ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 128) & input_data.shape = (None, 32, 32, 128)
ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 128) & input_data.shape = (None, 32, 32, 128)
ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 256) & input_data.shape = (None, 32, 32, 256)
ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 256) & input_data.shape = (None, 32, 32, 256)
ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 512) & input_data.shape = (None, 32, 32, 512)
ResNet block (before adding): img.shape (output of conv layers) = (None, 32, 32, 512) & input_data.shape = (None, 3

In [21]:
# Define ResNet model-
resnet_model = keras.Model(inputs, outputs)

In [None]:
# model = Model(inputs, x)

In [22]:
resnet_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 32, 32, 3)]  0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 32, 32, 32)   896         input_1[0][0]                    
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 32, 32, 64)   18496       conv2d[0][0]                     
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 32, 32, 64)   256         conv2d_1[0][0]                   
______________________________________________________________________________________________

In [23]:
# Sanity check to see whether the defined model works-
resnet_model.predict(images).shape

(128, 10)

In [24]:
# Define callback-
callback = [
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience = 3,
        min_delta = 0.001
    )
]

In [25]:
# Compile defined model-
resnet_model.compile(
    optimizer = optimizer,
    loss = loss_fn,
    metrics = ['acc']
)

In [26]:
# Train model-
resnet_history = resnet_model.fit(
    train_dataset, epochs = num_epochs,
    validation_data = test_dataset,
    callbacks = callback
)

Epoch 1/200
Epoch 2/200
Epoch 3/200
