In [None]:
#!pip install keras-tuner --upgrade
#!pip install tensorflow

In [None]:
from tensorflow import keras
from keras import backend as K
import keras_tuner as kt
from keras.layers import Layer
from keras.utils.vis_utils import plot_model
from tensorflow.keras import layers
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.optimizers import SGD, Adam
from tensorboard.plugins.hparams import api as hp
import tensorflow as tf

from sklearn.model_selection import train_test_split
from keras.callbacks import CSVLogger

import os
import pandas as pd
import pickle
import datetime
import numpy as np

from matplotlib import pyplot as plt
import random
import logging

## Prepare the data

In [None]:
# Model / data parameters
dataset = "MNIST"
num_classes = 10

# Load only the selected dataset and split it between train and test sets.
# (Previously MNIST was always downloaded first, even when CIFAR10 was chosen.)
if dataset == "CIFAR10":
    input_shape = (32, 32, 3)
    (x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
else:
    input_shape = (28, 28, 1)
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Scale images to the [0, 1] range
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255

# Add a trailing channel axis only when the images do not have one yet
# (grayscale arrays of rank 3). Expanding images that already carry a channel
# axis would produce a rank-5 array that no longer matches `input_shape`.
if x_train.ndim == 3:
    x_train = np.expand_dims(x_train, -1)
    x_test = np.expand_dims(x_test, -1)

assert x_train.shape[1:] == input_shape, "train images do not match input_shape"
assert x_test.shape[1:] == input_shape, "test images do not match input_shape"

# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

# Split the data: hold out 20% of the training set for validation
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train, y_train, test_size=0.20, shuffle=True
)

print(dataset)
print("x_train shape:", x_train.shape)
print(x_train.shape[0], "train samples")
print(x_valid.shape[0], "validation samples")
print(x_test.shape[0], "test samples")

## Building the MLP model

##### Building a custom layer for our random parameters

In [None]:
# Constraint to fix/freeze all layer weights but some (two params per layer unit)
# NB: A constraint-based solution is used because Keras does not support "picking" weights to freeze atm.
class FreezeWeights(tf.keras.constraints.Constraint):
    """Kernel constraint that keeps all but a random subset of weights frozen.

    On construction a He-normal matrix of the given shape is drawn and
    ``2 * shape[1]`` distinct positions are sampled uniformly from the whole
    matrix. Every time the constraint is applied, the sampled positions keep
    the value found in the incoming weight tensor while every other position
    is reset to the value drawn at construction.

    Args:
        shape: Two-element ``(rows, cols)`` shape of the constrained kernel.

    Raises:
        ValueError: Propagated from ``random.sample`` when the matrix holds
            fewer than ``2 * shape[1]`` entries.
    """

    def __init__(self, shape):
        self.shape = shape
        self.init_weights = tf.keras.initializers.HeNormal(seed=None)(
            shape=self.shape
        ).numpy()

        n_rows, n_cols = self.shape[0], self.shape[1]
        rand_ints = random.sample(range(0, n_rows * n_cols), 2 * n_cols)

        # Map the flat indices to (row, col) coordinates in one vectorised step.
        # Sorting keeps the row-major ordering of the coordinate lists.
        flat_idx = np.array(sorted(rand_ints), dtype=np.int64)
        rows, cols = np.divmod(flat_idx, n_cols)
        self.trainable_weights_0 = rows.tolist()
        self.trainable_weights_1 = cols.tolist()

        # Boolean mask: True where the weight may be updated by training.
        self.trainable_mask = np.zeros((n_rows, n_cols), dtype=bool)
        self.trainable_mask[rows, cols] = True

    def __call__(self, w):
        # Take the current value at unfrozen positions and the stored initial
        # value everywhere else. Pure tensor ops, so no `.numpy()` is needed.
        return tf.where(
            self.trainable_mask, w, tf.cast(self.init_weights, w.dtype)
        )

In [None]:
# When input and all hidden layers are the same width, this makes computation much faster than weight constraint.
class CustomDenseRandomLayer(Layer):
    """Bias-free dense layer of `width` units in which only two units are trainable.

    The layer is made of two He-normal initialised `Dense` sub-layers applied
    to the same input: a trainable one with 2 units and a frozen one with
    `width - 2` units. Their outputs are concatenated, trainable units first.

    Args:
        width: Total number of output units (default 784).
    """

    def __init__(self, width=784):
        super(CustomDenseRandomLayer, self).__init__()
        # Must be stored before it is read below; it was previously never assigned.
        self.width = width
        self.Y = layers.Dense(
            2, name="trainable_layer", use_bias=False, kernel_initializer="he_normal"
        )
        self.Z = layers.Dense(
            self.width - 2,
            name="non_trainable_layer",
            use_bias=False,
            kernel_initializer="he_normal",
        )
        self.Z.trainable = False
        # Created once here instead of instantiating a new layer on every call.
        self.concat = layers.Concatenate()

    def call(self, x):
        """Return the trainable and frozen projections of `x`, concatenated."""
        return self.concat([self.Y(x), self.Z(x)])

In [None]:
# Not used at this time. Kept for future work.
class CustomRandomLayer(Layer):
    """Element-wise affine layer computing ``x * w1 + w2``.

    Both weights are vectors whose length equals the last input dimension and
    are initialised with He-normal.

    Args:
        trainable: Whether `w1` and `w2` are trainable (default True).
    """

    def __init__(self, trainable=True):
        # Forward the flag to the base constructor. Assigning it before
        # `super().__init__()` is not reliable because the base constructor
        # sets the flag from its own `trainable` argument.
        super(CustomRandomLayer, self).__init__(trainable=trainable)

    def build(self, input_shape):
        # Adding two weights with shape as number of channels to make two params per channel
        self.w1 = self.add_weight(
            name="w1",
            shape=(input_shape[-1],),
            initializer="he_normal",
            trainable=self.trainable,
        )
        # Distinct name; both weights used to be registered as "w1".
        self.w2 = self.add_weight(
            name="w2",
            shape=(input_shape[-1],),
            initializer="he_normal",
            trainable=self.trainable,
        )

    def call(self, x):
        """Scale `x` by `w1` and shift it by `w2`."""
        return (x * self.w1) + self.w2

In [None]:
def fc_model_builder(
    units_dense_layer=10,
    dense_layers=1,
    dense=True,
    bn=False,
    random=False,
    dense_out=False,
):
    """
    Wrapper for constructing models with different trainable layers and units in dense layers.

    The model flattens an input of the module-level `input_shape`, stacks
    `dense_layers` hidden blocks (dense -> optional batch norm -> ReLU) and
    ends with a softmax layer of the module-level `num_classes` units.

    Args:
        units_dense_layer: Number of units in every hidden dense layer.
        dense_layers: Number of hidden blocks.
        dense: Whether the hidden `Dense` layers are trainable. Ignored by the
            `CustomDenseRandomLayer` path, which has its own fixed split.
        bn: Add a trainable batch-normalisation layer after each hidden dense
            layer. Only used when `random` is False.
        random: Train only two randomly placed weights per unit in each hidden
            layer, followed by a non-trainable batch-normalisation layer.
        dense_out: Whether the output layer is trainable.

    Returns:
        The uncompiled `keras.Sequential` model.
    """
    model = keras.Sequential()
    model.add(keras.Input(shape=input_shape))
    model.add(layers.Flatten())

    # Adding variable amount of hidden dense layers
    for _ in range(dense_layers):
        if random:
            in_features = model.layers[-1].output_shape[1]
            # Concatenating layers in the custom layer works when the hidden layers have same dim as input layer
            if units_dense_layer == in_features:
                model.add(CustomDenseRandomLayer(width=in_features))
            else:
                # Create constraint to fix all weights except some.
                # The constraint class defined in this notebook is `FreezeWeights`.
                model.add(
                    layers.Dense(
                        units_dense_layer,
                        activation=None,
                        kernel_initializer="he_normal",
                        use_bias=False,
                        trainable=dense,
                        kernel_constraint=FreezeWeights(
                            shape=(in_features, units_dense_layer)
                        ),
                    )
                )
            # We keep the normalizing layer, but without it having any params affecting the model
            model.add(
                layers.BatchNormalization(
                    beta_initializer="zeros",
                    gamma_initializer="ones",
                    trainable=False,
                )
            )

        else:
            # Plain dense layer; no kernel constraint applies on this path.
            model.add(
                layers.Dense(
                    units_dense_layer,
                    activation=None,
                    kernel_initializer="he_normal",
                    use_bias=False,
                    trainable=dense,
                )
            )
            if bn:
                model.add(
                    layers.BatchNormalization(
                        beta_initializer="zeros",
                        gamma_initializer=RandomNormal(mean=0.0, stddev=1.0),
                        trainable=True,
                    )
                )

        model.add(layers.Activation("relu"))

    # Output layer
    model.add(
        layers.Dense(
            num_classes,
            activation="softmax",
            kernel_initializer="he_normal",
            trainable=dense_out,
        )
    )

    return model

In [None]:
def fc_model_builder_dim_wrapper(
    layer_width=784, min_hdl=2, max_hdl=4, num_hdl_interval=1
):
    """
    Helper function for creating the models of interest with width and depth as variables.

    Args:
        layer_width: Number of units in every hidden dense layer.
        min_hdl: Smallest number of hidden dense layers to build.
        max_hdl: Largest number of hidden dense layers to build (inclusive).
        num_hdl_interval: Step between consecutive depths.

    Returns:
        Nested dict ``{depth: {variant_name: model}}`` with one freshly built,
        uncompiled model per depth and variant.
    """
    # Trainability flags of each model variant that is returned.
    variants = {
        "model_fc_all_layers": dict(dense=True, bn=True, random=False, dense_out=True),
        "model_fc_bn": dict(dense=False, bn=True, random=False, dense_out=False),
        "model_fc_random": dict(dense=True, bn=False, random=True, dense_out=False),
        "model_fc_out": dict(dense=False, bn=False, random=False, dense_out=True),
        "model_fc_none": dict(dense=False, bn=False, random=False, dense_out=False),
        # Disabled variants. They used to be built and then discarded:
        # "vanilla_fc_model": dict(dense=True, bn=False, random=False, dense_out=True),
        # "model_fc_bn_out": dict(dense=False, bn=True, random=False, dense_out=True),
    }

    # Build nested dict with variable number of layered models
    fc_models = {}
    for depth in range(min_hdl, max_hdl + 1, num_hdl_interval):
        fc_models[depth] = {
            name: fc_model_builder(
                units_dense_layer=layer_width, dense_layers=depth, **flags
            )
            for name, flags in variants.items()
        }

    return fc_models

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: multiply the rate by 0.1 at epochs 70 and 100.

    Args:
        epoch: Index of the epoch about to start.
        lr: Current learning rate.

    Returns:
        `lr * 0.1` at a decay epoch, otherwise `lr` unchanged.
    """
    decay_epochs = (70, 100)
    return lr * 0.1 if epoch in decay_epochs else lr


# Callbacks used during training, logging into a time-stamped directory
log_dir = os.path.join("logs/", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

callbacks = [
    # [0] TensorBoard summaries
    keras.callbacks.TensorBoard(
        log_dir=log_dir,
        histogram_freq=1,
        write_graph=True,
        write_images=True,
    ),
    # [1] Per-epoch metrics as CSV
    CSVLogger(log_dir + "/" + "latest.csv"),
    # [2] Stop after 3 epochs without val_loss improvement and restore the best weights
    keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=3, restore_best_weights=True
    ),
    # [3] Step-wise learning-rate decay defined by `scheduler`
    keras.callbacks.LearningRateScheduler(scheduler, verbose=1),
]

## Train models

In [None]:
# Build every model variant at hidden-layer depths 2, 4, ..., 14 with 784 units per hidden layer
fc_models = fc_model_builder_dim_wrapper(layer_width=784, min_hdl=2, max_hdl=14, num_hdl_interval=2)

In [None]:
# Init training params
epochs = 100
batch_size = 128
model_histories = {}

# Create new time-based log dir
log_dir = os.path.join(
    "logs/dense",
    datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
)

# Train all models in dict
for depth, nested in fc_models.items():
    model_histories[depth] = {}
    log_dir_depth = log_dir + "/" + str(depth)

    for key, model in nested.items():
        # Reinitialize the logging callbacks with a directory of their own
        log_dir_depth_key = log_dir_depth + "/" + key
        os.makedirs(log_dir_depth_key, exist_ok=True)

        callbacks[0] = keras.callbacks.TensorBoard(
            log_dir=log_dir_depth_key,
            histogram_freq=1,
            write_graph=True,
            write_images=True,
        )
        # One CSV per model inside its own log directory. The previous path
        # (log_dir + "fit_csv_logger.csv") had no separator and was the same
        # for every model, so each run overwrote the log of the one before.
        callbacks[1] = CSVLogger(log_dir_depth_key + "/fit_csv_logger.csv")

        # Compile and fit models
        model.compile(
            loss=keras.losses.categorical_crossentropy,
            optimizer=SGD(learning_rate=0.01, momentum=0.9),
            metrics=["accuracy"],
            run_eagerly=True,
        )
        model_histories[depth][key] = model.fit(
            x_train,
            y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=(x_valid, y_valid),
            callbacks=callbacks,
        )

In [None]:
# (Re)load the TensorBoard notebook extension and open the dashboard on the
# logs written under ./logs/dense by the training cell.
%reload_ext tensorboard

%tensorboard --logdir './logs/dense'

## Evaluate model

In [None]:
# Score every trained model on the held-out test set
loss_test_dict = {}
acc_test_dict = {}

for depth, models_at_depth in fc_models.items():
    # One evaluate() call per model; entry 0 of the result is stored as the
    # loss and entry 1 as the accuracy.
    scores_at_depth = {
        variant: fc_model.evaluate(x_test, y_test, verbose=0)
        for variant, fc_model in models_at_depth.items()
    }
    acc_test_dict[depth] = {v: s[1] for v, s in scores_at_depth.items()}
    loss_test_dict[depth] = {v: s[0] for v, s in scores_at_depth.items()}

In [None]:
# Tabulate the test metrics: one row per depth, one column per model variant
acc_test_frame, loss_test_frame = (
    pd.DataFrame.from_dict(per_depth, orient="index")
    for per_depth in (acc_test_dict, loss_test_dict)
)

In [None]:
# Display the test-accuracy table (rows: number of hidden layers, columns: model variant).
acc_test_frame

In [None]:
# Export the test accuracies to a time-stamped CSV; the index column holds the depth
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
acc_test_frame.to_csv(
    "logs/dense_test_stats_" + timestamp + ".csv",
    index=True,
    index_label="layers",
)

## Save models

In [None]:
# Persist every trained model under models/saved764/<depth><variant>
for depth, models_at_depth in fc_models.items():
    depth_prefix = "models/saved764/" + str(depth)
    for variant, fc_model in models_at_depth.items():
        fc_model.save(depth_prefix + variant)

In [None]:
# model = keras.models.load_model('path/to/location')

## Tune model

In [None]:
# model = keras.models.load_model('path/to/location')

In [None]:
class MyFcHyperModel(kt.HyperModel):
    """Hypermodel that searches over which parts of the fully connected model are trainable."""

    def build(self, hp):
        """Build and compile a model from the boolean hyperparameters.

        Hyperparameters: "dense" (hidden dense layers trainable), "bn"
        (trainable batch normalisation) and "output" (output layer trainable).
        The random-weights variant is used exactly when "bn" is False.
        """
        dense = hp.Boolean("dense")
        use_bn = hp.Boolean("bn")
        dense_out = hp.Boolean("output")
        use_random = not use_bn

        model = fc_model_builder(
            dense=dense,
            bn=use_bn,
            random=use_random,
            dense_out=dense_out,
        )
        model.compile(
            loss=keras.losses.categorical_crossentropy,
            optimizer=SGD(learning_rate=0.01, momentum=0.9),
            metrics=["accuracy"],
            # The random variant may attach a weight constraint; run it eagerly,
            # as the training cell does for all models.
            run_eagerly=use_random,
        )

        return model

    def fit(self, hp, model, *args, **kwargs):
        """Fit `model`, using a batch size of 128 unless the caller supplies one."""
        # setdefault avoids a "multiple values for keyword argument" TypeError
        # when the caller passes batch_size explicitly.
        kwargs.setdefault(
            "batch_size", 128
        )  # hp.Choice("batch_size", [16, 32, 64, 128])
        return model.fit(*args, **kwargs)

In [None]:
# Random-search tuner over the hypermodel's switches, scored on validation accuracy
fc_tuner = kt.RandomSearch(
    MyFcHyperModel(),
    objective="val_accuracy",
    max_trials=20,
    # Where the intermediate results are stored.
    directory="/tmp/tb",
    project_name="fc_batchnorm",
    overwrite=True,
)

In [None]:
# fc_tuner.search(x_train, y_train, epochs=2, validation_data=(x_test, y_test), callbacks=[callbacks],)

In [None]:
# (Re)load the TensorBoard notebook extension and open the dashboard on ./logs/fc.
# NOTE(review): no cell visible in this notebook writes to ./logs/fc (training
# logs go to ./logs/dense, the tuner to /tmp/tb) — confirm the intended directory.
%reload_ext tensorboard

%tensorboard --logdir ./logs/fc