Exercise 5: ResNet with subclassing & sequential API

In [None]:
# Check your device for learning
%tensorflow_version 2.x
import tensorflow as tf
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras import *
from tensorflow.keras.datasets import * #cifar10
from tensorflow.keras.utils import * #to_categorical
from tensorflow.keras.preprocessing.image import * # ImageDataGenerator
from tensorflow.keras.layers import *#Layer, Conv2D, BatchNormalization, ReLU


In [None]:
class ResidualUnit(Layer):
    """Basic residual unit: two 3x3 convolutions plus a shortcut connection.

    The main path is Conv2D -> BatchNormalization -> activation -> Conv2D ->
    BatchNormalization. The shortcut is the identity unless the unit changes
    the spatial resolution (``strides > 1``) or the channel count, in which
    case it is a 1x1 Conv2D followed by BatchNormalization. The two paths are
    added and the activation is applied to the sum.

    Args:
        filters: Number of output channels of both 3x3 convolutions.
        strides: Stride of the first convolution (and of the projection
            shortcut, when one is used).
        activation: Name or callable resolved with ``activations.get``; used
            inside the main path and on the summed output.
        use_bias: Passed to every Conv2D created by this unit.
        **kwargs: Forwarded to the ``Layer`` constructor.
    """

    def __init__(self, filters, strides=1, activation="relu", use_bias=False, **kwargs):
        super(ResidualUnit, self).__init__(**kwargs)
        # Keep the constructor arguments so build() and get_config() can use them.
        self.filters = filters
        self.strides = strides
        self.use_bias = use_bias
        # Resolve the activation once so the `activation` argument is actually honoured.
        self.activation = activations.get(activation)
        self.main_layers = [
            Conv2D(filters, 3, strides=strides, padding="same", use_bias=use_bias),
            BatchNormalization(),
            Activation(self.activation),
            Conv2D(filters, 3, strides=1, padding="same", use_bias=use_bias),
            BatchNormalization()
        ]
        self.skip_layers = []
        if strides > 1:
            self.skip_layers = self._projection_layers()

    def _projection_layers(self):
        """Return the 1x1 Conv2D + BatchNormalization used as projection shortcut."""
        return [
            Conv2D(self.filters, 1, strides=self.strides, padding="same", use_bias=self.use_bias),
            BatchNormalization()
        ]

    def build(self, input_shape):
        """Add a projection shortcut when the input channel count differs from `filters`."""
        in_channels = input_shape[-1]
        # Without this an identity shortcut with a different channel count
        # would fail at the addition in call().
        if not self.skip_layers and in_channels is not None and in_channels != self.filters:
            self.skip_layers = self._projection_layers()
        super(ResidualUnit, self).build(input_shape)

    def call(self, inputs):
        """Run the main and shortcut paths and return activation(main + shortcut)."""
        Z = inputs
        for layer in self.main_layers:
            Z = layer(Z)
        skip_Z = inputs
        for layer in self.skip_layers:
            skip_Z = layer(skip_Z)
        return self.activation(Z + skip_Z)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created from config."""
        config = super(ResidualUnit, self).get_config()
        config.update({
            "filters": self.filters,
            "strides": self.strides,
            "activation": activations.serialize(self.activation),
            "use_bias": self.use_bias,
        })
        return config


In [None]:

# Build a ResNet-style classifier for 28x28 single-channel images.
# Uses the ResidualUnit layer defined in an earlier cell.

model = models.Sequential()

# Stem: strided 7x7 convolution -> batch norm -> ReLU -> strided 3x3 max pooling.
model.add(Conv2D(64, 7, strides=2, input_shape=[28, 28, 1], padding="same", use_bias=False))
model.add(BatchNormalization())
model.add(ReLU())
model.add(MaxPool2D(pool_size=3, strides=2, padding="same"))
prev_filters = 64

# Residual stages: block_sizes[k] units with filters_list[k] filters each.
block_sizes = [3, 4, 6, 3]
filters_list = [64, 128, 256, 512]

for block_size, filters in zip(block_sizes, filters_list):
    for _ in range(block_size):
        # Only the first unit of a stage whose filter count changes uses strides=2.
        strides = 1 if filters == prev_filters else 2
        model.add(ResidualUnit(filters, strides=strides))
        prev_filters = filters

# Head: global average pooling already yields a (batch, channels) tensor,
# so no Flatten layer is needed before the softmax classifier.
model.add(GlobalAveragePooling2D())
model.add(Dense(10, activation="softmax"))



In [None]:
# Print the layer-by-layer summary of the model built in the previous cell.
model.summary()

Exercise 6: Training ResNet for fashion MNIST

In [None]:
# Fetch Fashion MNIST and hold out the last 5000 training samples for validation.
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist.load_data()
X_train, y_train = X_train_full[:-5000], y_train_full[:-5000]
X_valid, y_valid = X_train_full[-5000:], y_train_full[-5000:]

# Per-pixel statistics come from the training split only; the small epsilon
# keeps the division safe for pixels whose standard deviation is zero.
X_mean = X_train.mean(axis=0, keepdims=True)
X_std = X_train.std(axis=0, keepdims=True) + 1e-7

# Standardize every split with those statistics and append a channel axis.
X_train, X_valid, X_test = (
    ((split - X_mean) / X_std)[..., np.newaxis]
    for split in (X_train, X_valid, X_test)
)

In [None]:
# Integer class labels are used here, hence the sparse cross-entropy loss.
model.compile(
    optimizer="nadam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_valid, y_valid),
    epochs=10,
)
score = model.evaluate(X_test, y_test)

# Treat the first ten test images as if they were new, unseen images.
X_new = X_test[:10]
y_pred = model.predict(X_new)

Exercise 7: Training ResNet for CIFAR10

In [None]:
def visualize_data(images, categories, class_names, rows=3, cols=7):
    """Show the first images in a grid, each labelled with its class name.

    Args:
        images: Indexable collection of images accepted by ``imshow``.
        categories: Per-image label vectors; the class shown is the argmax of
            each vector (one-hot labels in this notebook).
        class_names: Sequence mapping a class index to its display name.
        rows: Number of grid rows (default 3).
        cols: Number of grid columns (default 7).

    At most ``rows * cols`` images are drawn. When fewer images or labels are
    supplied only those are drawn, instead of failing with an IndexError.
    """
    count = min(rows * cols, len(images), len(categories))
    # 2x2 inches per cell reproduces the original 14x6 figure for the 3x7 default.
    fig = plt.figure(figsize=(2 * cols, 2 * rows))
    fig.patch.set_facecolor('white')
    for i in range(count):
        ax = fig.add_subplot(rows, cols, i + 1)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(images[i])
        class_index = categories[i].argmax()
        ax.set_xlabel(class_names[class_index])
    plt.show()

In [None]:
# Display names for the ten CIFAR-10 classes, indexed by class id.
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
num_classes = len(class_names)

(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Scale the pixel values by 1/255.
x_train, x_test = x_train / 255.0, x_test / 255.0

# One-hot encode the labels of both splits.
y_train, y_test = (
    to_categorical(labels, num_classes) for labels in (y_train, y_test)
)

In [None]:
# Report the training array shapes, then preview the first images with their labels.
print(x_train.shape, y_train.shape)
visualize_data(images=x_train, categories=y_train, class_names=class_names)

In [None]:
# Build a ResNet-style classifier for 32x32 three-channel images.
# Uses the ResidualUnit layer defined in an earlier cell.

model = models.Sequential()

# Stem: strided 7x7 convolution -> batch norm -> ReLU -> strided 3x3 max pooling.
model.add(Conv2D(64, 7, strides=2, input_shape=[32, 32, 3], padding="same", use_bias=False))
model.add(BatchNormalization())
model.add(ReLU())
model.add(MaxPool2D(pool_size=3, strides=2, padding="same"))
prev_filters = 64

# Residual stages: block_sizes[k] units with filters_list[k] filters each.
block_sizes = [3, 4, 6, 3]
filters_list = [64, 128, 256, 512]

for block_size, filters in zip(block_sizes, filters_list):
    for _ in range(block_size):
        # Only the first unit of a stage whose filter count changes uses strides=2.
        strides = 1 if filters == prev_filters else 2
        model.add(ResidualUnit(filters, strides=strides))
        prev_filters = filters

# Head: global average pooling already yields a (batch, channels) tensor,
# so no Flatten layer is needed before the softmax classifier.
model.add(GlobalAveragePooling2D())
model.add(Dense(10, activation="softmax"))


In [None]:
# Print the layer-by-layer summary of the model built in the previous cell.
model.summary()

In [None]:
# Labels are one-hot encoded here, hence the (non-sparse) categorical cross-entropy.
model.compile(
    optimizer="nadam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)
# The test split is used both for per-epoch validation and for the final evaluation.
history = model.fit(
    x_train,
    y_train,
    validation_data=(x_test, y_test),
    epochs=10,
)
score = model.evaluate(x_test, y_test)

Exercise 8: CIFAR10 with data augmentation

In [None]:
# Reload CIFAR-10 so this exercise starts from the raw arrays again.
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Scale the pixel values by 1/255.
x_train, x_test = x_train / 255.0, x_test / 255.0

# One-hot encode the labels (num_classes and class_names come from an earlier cell).
y_train, y_test = (
    to_categorical(labels, num_classes) for labels in (y_train, y_test)
)

visualize_data(images=x_train, categories=y_train, class_names=class_names)

In [None]:
# Augmentation settings. A shift range below 1 is interpreted by
# ImageDataGenerator as a fraction of the image size.
width_shift = 3/32
height_shift = 3/32
flip = True

datagen = ImageDataGenerator(
    horizontal_flip=flip,
    width_shift_range=width_shift,
    height_shift_range=height_shift,
    )
# datagen.fit(x_train) is deliberately not called: it is only required when
# featurewise_center, featurewise_std_normalization or zca_whitening is
# enabled, and none of them is used here.

# Preview one augmented batch; shuffle=False keeps the images in dataset order.
it = datagen.flow(x_train, y_train, shuffle=False)
batch_images, batch_labels = next(it)
visualize_data(batch_images, batch_labels, class_names)

In [None]:
# NOTE(review): `model` is the network already trained in Exercise 7 and
# re-compiling does not reset its weights. Rebuild the model first if a
# from-scratch comparison with and without augmentation is wanted.
model.compile(loss="categorical_crossentropy", optimizer="nadam",metrics=["accuracy"])

# Train on batches produced by the augmentation generator. Fitting on the raw
# arrays, as before, would leave `datagen` unused.
history = model.fit(
    datagen.flow(x_train, y_train, batch_size=32),
    epochs=10,
    validation_data=(x_test, y_test),
)
# Validation and the final evaluation use the un-augmented test images.
score = model.evaluate(x_test, y_test)

Exercise 9: Do it yourself

In [None]:
class ResidualUnit50(Layer):
    """Bottleneck residual unit: 1x1 -> 3x3 -> 1x1 convolutions plus a shortcut.

    The main path reduces to ``filters`` channels with a 1x1 convolution,
    applies a 3x3 convolution, then expands to ``4 * filters`` channels with a
    second 1x1 convolution. Each convolution is followed by
    BatchNormalization; the activation follows the first two only. The
    shortcut is the identity unless the unit changes the spatial resolution
    (``strides > 1``) or the channel count, in which case it is a 1x1 Conv2D
    followed by BatchNormalization. The activation is applied once more, to
    the sum of both paths.

    Args:
        filters: Channel count of the first two convolutions; the unit
            outputs ``4 * filters`` channels.
        strides: Stride of the first 1x1 convolution (and of the projection
            shortcut, when one is used).
        activation: Name or callable resolved with ``activations.get``.
        **kwargs: Forwarded to the ``Layer`` constructor.
    """

    def __init__(self, filters, strides=1, activation="relu", **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so build() and get_config() can use them.
        self.filters = filters
        self.strides = strides
        self.activation = activations.get(activation)  # ReLU activation in this example
        self.main_layers = [
            Conv2D(filters, 1, strides=strides, padding="same", use_bias=False),
            BatchNormalization(),
            Activation(self.activation),

            Conv2D(filters, 3, strides=1, padding="same", use_bias=False),
            BatchNormalization(),
            Activation(self.activation),

            Conv2D(filters * 4, 1, strides=1, padding="same", use_bias=False),
            # No activation after this normalization: the residual branch is
            # added to the shortcut first and activated once, in call().
            BatchNormalization(),
        ]

        self.skip_layers = []  # To make skip connection
        if strides > 1:
            self.skip_layers = self._projection_layers()

    def _projection_layers(self):
        """Return the 1x1 Conv2D + BatchNormalization used as projection shortcut."""
        return [
            Conv2D(self.filters * 4, 1, strides=self.strides, padding="same", use_bias=False),
            BatchNormalization()
        ]

    def build(self, input_shape):
        """Add a projection shortcut when the input does not already have 4 * filters channels."""
        in_channels = input_shape[-1]
        # The output always has 4 * filters channels, so an identity shortcut
        # is only valid when the input has that many channels too.
        if not self.skip_layers and in_channels is not None and in_channels != self.filters * 4:
            self.skip_layers = self._projection_layers()
        super().build(input_shape)

    def call(self, inputs):
        """Run the main and shortcut paths and return activation(main + shortcut)."""
        Z = inputs
        for layer in self.main_layers:
            Z = layer(Z)
        skip_Z = inputs
        for layer in self.skip_layers:
            skip_Z = layer(skip_Z)
        return self.activation(Z + skip_Z)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created from config."""
        config = super().get_config()
        config.update({
            "filters": self.filters,
            "strides": self.strides,
            "activation": activations.serialize(self.activation),
        })
        return config


In [None]:
# Build a bottleneck ResNet-style classifier for 32x32 three-channel images
# from the ResidualUnit50 layer defined in an earlier cell.
model = models.Sequential()

# Stem: strided 7x7 convolution -> batch norm -> ReLU -> strided 3x3 max pooling.
model.add(Conv2D(64, 7, strides=2, input_shape=[32, 32, 3], padding="same", use_bias=False))
model.add(BatchNormalization())
model.add(Activation("relu"))
model.add(MaxPool2D(pool_size=3, strides=2, padding="same"))
# Starting from 0 makes the first stage open with a strides=2 unit as well;
# the shortcut of that unit has to project the 64-channel stem output to
# 256 channels.
prev_filters = 0

# Residual stages: block_sizes[k] units with filters_list[k] bottleneck filters each.
block_sizes = [3, 4, 6, 3]
filters_list = [64, 128, 256, 512]

for block_size, filters in zip(block_sizes, filters_list):
    for _ in range(block_size):
        # The first unit of every stage uses strides=2, the remaining ones strides=1.
        strides = 1 if prev_filters == filters else 2
        model.add(ResidualUnit50(filters, strides=strides))
        prev_filters = filters

# Head: global average pooling already yields a (batch, channels) tensor,
# so no Flatten layer is needed before the softmax classifier.
model.add(GlobalAvgPool2D())
model.add(Dense(10, activation="softmax"))

model.summary()

In [None]:
# Labels are one-hot encoded here, hence the (non-sparse) categorical cross-entropy.
model.compile(
    optimizer="nadam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)
# The test split is used both for per-epoch validation and for the final evaluation.
history = model.fit(
    x_train,
    y_train,
    validation_data=(x_test, y_test),
    epochs=10,
)
score = model.evaluate(x_test, y_test)

# Treat the first ten test images as if they were new, unseen images.
x_new = x_test[:10]
y_pred = model.predict(x_new)