In [1]:
import os
import numpy as np
import scipy
import tensorflow
from tensorflow.keras import Model
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Add, GlobalAveragePooling2D,\
	Dense, Flatten, Conv2D, Lambda,	Input, BatchNormalization, Activation
from tensorflow.keras.optimizers import schedules, SGD
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint

# Set up Configuration 

In [2]:
# Model Configuration
def model_configuration():
    """
    Build the configuration dictionary shared by the rest of the notebook.

    Every call reloads the dataset (only to count the training samples) and
    creates fresh loss, learning-rate schedule, optimiser, initialiser and
    callback objects.

    Returns:
        dict: Settings keyed by name. ``steps_per_epoch``,
        ``val_steps_per_epoch`` and ``num_epochs`` are plain Python integers so
        they can be handed directly to ``Model.fit``.
    """

    # Load dataset for computing dataset size
    (input_train, _), (_, _) = load_dataset()

    # Generic config
    width, height, channels = 32, 32, 3
    batch_size = 128
    num_classes = 10
    validation_split = 0.1
    verbose = 1
    n = 3
    init_fm_dim = 16
    shortcut_type = "identity"

    # Dataset size (kept as floats, exactly as computed from the split fraction)
    train_size = (1 - validation_split) * len(input_train)
    val_size = validation_split * len(input_train)

    # Number of steps per epoch is dependent on batch size.
    # Plain integer arithmetic is used instead of TensorFlow ops: Keras expects
    # integer step/epoch counts, not scalar tensors.
    maximum_number_iterations = 64000
    steps_per_epoch = int(train_size // batch_size)
    val_steps_per_epoch = int(val_size // batch_size)
    # Train for as many whole epochs as fit into the iteration budget.
    epochs = maximum_number_iterations // steps_per_epoch

    # Define loss function (the model outputs logits, hence from_logits=True)
    loss = tensorflow.keras.losses.CategoricalCrossentropy(from_logits=True)

    # Learning rate config: 0.1 until step 32000, 0.01 until step 48000, then 0.001
    boundaries = [32000, 48000]
    values = [0.1, 0.01, 0.001]
    lr_schedule = schedules.PiecewiseConstantDecay(boundaries, values)

    # Set layer init
    initialiser = tensorflow.keras.initializers.HeNormal()

    # Define optimiser
    optimiser_momentum = 0.9
    optimiser_additional_metrics = ["accuracy"]
    optimiser = SGD(learning_rate=lr_schedule, momentum=optimiser_momentum)

    # Load Tensorboard callback (logs go to ./logs under the working directory)
    tensorboard = TensorBoard(
        log_dir=os.path.join(os.getcwd(), "logs"),
        histogram_freq=1,
        write_images=True
    )

    # Save a model checkpoint after every epoch
    checkpoint = ModelCheckpoint(
        os.path.join(os.getcwd(), "model_checkpoint"),
        save_freq="epoch"
    )

    # Add callbacks to list
    callbacks = [
        tensorboard,
        checkpoint
    ]

    # Create config dictionary
    config = {
        "width": width,
        "height": height,
        "dim": channels,
        "batch_size": batch_size,
        "num_classes": num_classes,
        "validation_split": validation_split,
        "verbose": verbose,
        "stack_n": n,
        "initial_num_feature_maps": init_fm_dim,
        "training_ds_size": train_size,
        "steps_per_epoch": steps_per_epoch,
        "val_steps_per_epoch": val_steps_per_epoch,
        "num_epochs": epochs,
        "loss": loss,
        "optim": optimiser,
        "optim_learning_rate_schedule": lr_schedule,
        "optim_momentum": optimiser_momentum,
        "optim_additional_metrics": optimiser_additional_metrics,
        "initialiser": initialiser,
        "callbacks": callbacks,
        "shortcut_type": shortcut_type,
        "name": "ResNet20"
    }
    return config

# Loading Dataset

In [3]:
def load_dataset():
    """
    Fetch CIFAR-10 through the Keras dataset helper.

    Returns whatever ``cifar10.load_data()`` returns; callers in this notebook
    unpack it as ``(train_inputs, train_targets), (test_inputs, test_targets)``.
    """
    dataset = cifar10.load_data()
    return dataset

# Preprocessing
## Cropping image

In [4]:
def random_crop(img, random_crop_size):
    """
    Cut a randomly positioned window out of a single channels-last image.

    Args:
        img: Array indexed as (row, column, channel); must have exactly 3 channels.
        random_crop_size: Pair ``(crop_height, crop_width)``.

    Returns:
        The ``crop_height x crop_width`` slice of ``img`` (all channels kept).

    SOURCE: https://jkjung-avt.github.io/keras-image-cropping/
    """
    assert img.shape[2] == 3

    img_height, img_width = img.shape[:2]
    crop_height, crop_width = random_crop_size

    # Draw the horizontal offset first, then the vertical one, so results under
    # a fixed NumPy seed are reproducible.
    left = np.random.randint(0, img_width - crop_width + 1)
    top = np.random.randint(0, img_height - crop_height + 1)

    bottom = top + crop_height
    right = left + crop_width
    return img[top:bottom, left:right, :]

def crop_generator(batches, crop_length):
    """
    Wrap a batch iterator and yield randomly cropped versions of its image batches.

    Args:
        batches: Iterator yielding ``(batch_x, batch_y)`` pairs, e.g. a Keras
            image data iterator. ``batch_x`` is indexed per sample along axis 0.
        crop_length: Side length of the square crop taken from every image.

    Yields:
        tuple: ``(batch_crops, batch_y)`` where ``batch_crops`` is a new array of
        shape ``(len(batch_x), crop_length, crop_length, 3)`` and ``batch_y`` is
        passed through unchanged.

    SOURCE: https://jkjung-avt.github.io/keras-image-cropping/
    """

    while True:
        try:
            batch_x, batch_y = next(batches)
        except StopIteration:
            # A StopIteration escaping a generator body is converted into a
            # RuntimeError (PEP 479); finish cleanly when the source runs dry.
            return

        batch_crops = np.zeros((batch_x.shape[0], crop_length, crop_length, 3))
        for i in range(batch_x.shape[0]):
            batch_crops[i] = random_crop(batch_x[i], (crop_length, crop_length))
        yield (batch_crops, batch_y)

In [5]:
def preprocessed_dataset():
    """
    Load CIFAR-10 and build the training, validation and testing batch sources.

    The training images are zero-padded by 4 pixels on each side of both
    spatial axes; training and validation batches are then drawn from the same
    flipping generator and randomly cropped back to the configured height.
    Test images are neither padded nor cropped.

    Returns:
        tuple: ``(train_batches, validation_batches, test_batches)``. The first
        two are ``crop_generator`` generators; the third is the iterator
        returned by ``ImageDataGenerator.flow``.
    """
    (input_train, target_train), (input_test, target_test) = load_dataset()

    # Look up every needed setting once
    config = model_configuration()
    num_classes = config.get("num_classes")
    batch_size = config.get("batch_size")
    crop_length = config.get("height")

    # Data augmentation: perform zero padding on datasets
    paddings = tensorflow.constant([[0, 0], [4, 4], [4, 4], [0, 0]])
    input_train = tensorflow.pad(input_train, paddings, mode="CONSTANT")

    # Convert scalar targets to categorical ones
    target_train = tensorflow.keras.utils.to_categorical(target_train, num_classes)
    target_test = tensorflow.keras.utils.to_categorical(target_test, num_classes)

    # Pixel preprocessing shared by the training and testing generators
    shared_preprocessing = {
        "rescale": 1./255,
        "preprocessing_function": tensorflow.keras.applications.resnet50.preprocess_input,
    }

    # Data generator for training data
    train_generator = tensorflow.keras.preprocessing.image.ImageDataGenerator(
        validation_split=config.get("validation_split"),
        horizontal_flip=True,
        **shared_preprocessing
    )

    # Generate training and validation batches
    train_batches = train_generator.flow(input_train, target_train, batch_size=batch_size, subset="training")
    validation_batches = train_generator.flow(input_train, target_train, batch_size=batch_size, subset="validation")
    # Crop the padded images back down to the network's input size
    train_batches = crop_generator(train_batches, crop_length)
    validation_batches = crop_generator(validation_batches, crop_length)

    # Data generator for testing data
    test_generator = tensorflow.keras.preprocessing.image.ImageDataGenerator(**shared_preprocessing)

    # Generate testing batches
    test_batches = test_generator.flow(input_test, target_test, batch_size=batch_size)

    return train_batches, validation_batches, test_batches

# Creating the Residual Block

In [6]:
def residual_block(x, number_of_filters, match_filter_size=False):
    """
    Append one residual block (two 3x3 convolutions plus a skip connection) to ``x``.

    Args:
        x: Input tensor, channels-last (batch normalisation is applied on axis 3).
        number_of_filters: Number of filters of both convolutions in the block.
        match_filter_size: When True the first convolution uses stride 2 and the
            skip connection is adapted to the new shape, using the strategy named
            by the ``"shortcut_type"`` config entry (``"identity"`` or
            ``"projection"``).

    Returns:
        The output tensor of the block after the final ReLU.
    """
    # Retrieve initialiser and shortcut strategy
    config = model_configuration()
    initialiser = config.get("initialiser")
    shortcut_type = config.get("shortcut_type")

    # Create skip connection
    x_skip = x

    # Perform the original mapping; the first convolution halves the spatial
    # resolution when the block has to change the feature-map size.
    first_strides = (2, 2) if match_filter_size else (1, 1)
    x = Conv2D(number_of_filters, kernel_size=(3, 3), strides=first_strides, kernel_initializer=initialiser, padding="same")(x_skip)
    x = BatchNormalization(axis=3)(x)
    x = Activation("relu")(x)
    x = Conv2D(number_of_filters, kernel_size=(3, 3), kernel_initializer=initialiser, padding="same")(x)
    x = BatchNormalization(axis=3)(x)

    # Perform matching of filter numbers if necessary
    if match_filter_size and shortcut_type == "identity":
        # Keep every second row/column and zero-pad the channel axis with
        # number_of_filters // 4 channels on each side.
        x_skip = Lambda(lambda x: tensorflow.pad(x[:, ::2, ::2, :], tensorflow.constant([[0, 0,], [0, 0], [0, 0], [number_of_filters//4, number_of_filters//4]]), mode="CONSTANT"))(x_skip)
        x_skip = BatchNormalization(axis=3)(x_skip)
    elif match_filter_size and shortcut_type == "projection":
        # 1x1 strided convolution on the shortcut. The original referenced the
        # undefined name `initializer` here, which raised NameError.
        x_skip = Conv2D(number_of_filters, kernel_size=(1, 1), kernel_initializer=initialiser, strides=(2, 2))(x_skip)

    # Add the skip connection to the regular mapping
    x = Add()([x, x_skip])

    # Nonlinearly activate the result
    x = Activation("relu")(x)

    # Return the result
    return x

In [7]:
def ResidualBlocks(x):
    """
    Stack all residual blocks of the network on top of ``x``.

    Three groups of ``stack_n`` blocks are created. The first block of the
    second and third group doubles the number of filters and is built with
    ``match_filter_size=True``.
    """

    # Read the stacking parameters from the configuration
    config = model_configuration()
    current_filters = config.get("initial_num_feature_maps")
    number_of_groups = 3

    for group_index in range(number_of_groups):

        # Each group holds `stack_n` blocks; every block contains two convolutional layers
        for block_index in range(config.get("stack_n")):

            # Only the opening block of the later groups changes the filter count
            changes_size = group_index > 0 and block_index == 0
            if changes_size:
                current_filters *= 2

            x = residual_block(x, current_filters, match_filter_size=changes_size)

    return x

In [8]:
def model_base(shp):
    """
    Wire up the full network graph for an input of shape ``shp``.

    Returns:
        tuple: ``(inputs, outputs)`` - the Keras input tensor and the logits
        tensor produced by the final Dense layer.
    """

    # Pull the required settings out of the model configuration
    config = model_configuration()
    kernel_init = config.get("initialiser")
    stem_filters = config.get("initial_num_feature_maps")
    class_count = config.get("num_classes")

    # Stem: 3x3 convolution -> batch normalisation -> ReLU
    inputs = Input(shape=shp)
    features = Conv2D(stem_filters, kernel_size=(3, 3), strides=(1, 1), kernel_initializer=kernel_init, padding="same")(inputs)
    features = BatchNormalization()(features)
    features = Activation("relu")(features)

    # Body: the stacked residual blocks
    features = ResidualBlocks(features)

    # Head: pooling, flattening and the classification layer.
    # Logits are returned because Softmax is pushed to the loss function.
    features = GlobalAveragePooling2D()(features)
    features = Flatten()(features)
    outputs = Dense(class_count, kernel_initializer=kernel_init)(features)

    return inputs, outputs

# Model Initialisation

In [9]:
def init_model():
    """
    Initialise a compiled ResNet model.

    Builds the graph via ``model_base``, compiles it with the loss, optimiser
    and metrics from the configuration, prints the summary and returns the model.
    """

    # Read settings
    config = model_configuration()
    input_shape = (config.get("width"), config.get("height"), config.get("dim"))

    # Build the graph
    inputs, outputs = model_base(input_shape)

    # Assemble and compile the model
    model = Model(inputs, outputs, name=config.get("name"))
    compile_arguments = {
        "loss": config.get("loss"),
        "optimizer": config.get("optim"),
        "metrics": config.get("optim_additional_metrics"),
    }
    model.compile(**compile_arguments)

    # Show the architecture
    model.summary()

    return model

# Training

In [10]:
def train_model(model, train_batches, validation_batches):
    """
    Train an initialised model on batch generators.

    Args:
        model: Compiled Keras model.
        train_batches: Generator yielding training ``(inputs, targets)`` batches.
        validation_batches: Generator yielding validation batches.

    Returns:
        The same ``model`` object after fitting.
    """

    # Get model configuration
    config = model_configuration()

    # Fit data to model.
    # `batch_size` is intentionally not passed: the generators already produce
    # batches, and Keras documents that it must not be specified for them.
    # Epoch and step counts are coerced to plain ints in case the configuration
    # supplies them as scalar tensors.
    model.fit(
        train_batches,
        epochs=int(config.get("num_epochs")),
        verbose=config.get("verbose"),
        callbacks=config.get("callbacks"),
        steps_per_epoch=int(config.get("steps_per_epoch")),
        validation_data=validation_batches,
        validation_steps=int(config.get("val_steps_per_epoch"))
    )
    return model


# Evaluation

In [11]:
def evaluate_model(model, test_batches):
    """
    Run the model on the test batches and print the first two reported values.

    ``model.evaluate`` is called silently (``verbose=0``); element 0 of its
    result is printed as the test loss and element 1 as the test accuracy.
    """

    metrics = model.evaluate(test_batches, verbose=0)
    test_loss = metrics[0]
    test_accuracy = metrics[1]
    print(f'Test loss: {test_loss} / Test accuracy: {test_accuracy}')

# Overall Process

In [12]:
def training_process():
    """
    End-to-end pipeline: prepare data, build the ResNet, train it, evaluate it.
    """

    # Prepare the three batch sources
    batches = preprocessed_dataset()
    train_batches, validation_batches, test_batches = batches

    # Build the compiled network
    model = init_model()

    # Fit on training data, validating after each epoch
    model = train_model(model, train_batches, validation_batches)

    # Report loss and accuracy on the test set
    evaluate_model(model, test_batches)

In [13]:
# Entry point: run the full training process when this code is executed as the main module.
if __name__ == "__main__": 
    training_process()

2024-03-30 22:18:20.384301: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M1 Pro
2024-03-30 22:18:20.384327: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2024-03-30 22:18:20.384333: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2024-03-30 22:18:20.384366: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-03-30 22:18:20.384384: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Model: "ResNet20"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 32, 32, 3)]          0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 32, 32, 16)           448       ['input_1[0][0]']             
                                                                                                  
 batch_normalization (Batch  (None, 32, 32, 16)           64        ['conv2d[0][0]']              
 Normalization)                                                                                   
                                                                                                  
 activation (Activation)     (None, 32, 32, 16)           0         ['batch_normalization[0



NameError: name 'batch_s' is not defined