In [51]:
import math
import numpy as np
import tensorflow as tf
from tensorflow import keras
from functools import wraps

def timer_measurer(orig_func):
    import time

    @wraps(orig_func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = orig_func(*args, **kwargs)
        end = time.time() - start
        print(f'{orig_func.__name__} ran in: {end} sec')
        return result

    return wrapper

Implement a custom layer that performs Layer Normalization:

In [2]:
class MyLayerNormalizer(keras.layers.Layer):
    def __init__(self,*args ,**kwargs):
        super(MyLayerNormalizer, self).__init__(*args, **kwargs)
        self.eps = 0.001
        
    def build(self, batch_input_shape):
        self.alpha = self.add_weight(
            name="alpha", shape = batch_input_shape[-1:], initializer = "ones", dtype=tf.float32)
        self.beta = self.add_weight(
            name="beta", shape = batch_input_shape[-1:], initializer = "zeros", dtype=tf.float32)
        super(MyLayerNormalizer, self).build(batch_input_shape)
        
    def call(self, X):
        mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
        sd = tf.math.sqrt(variance)
        centralized_mean = X - mean
        denominator = sd + self.eps
        right_side = centralized_mean / denominator
        output = tf.math.multiply(self.alpha, right_side) + self.beta
        return output

Ensure that it produces similar results like keras.layers.LayerNormalization:

In [3]:
(x_boston_train, y_boston_train), _ = keras.datasets.boston_housing.load_data()

mylayer = MyLayerNormalizer()
my_results = mylayer(x_boston_train)

norm_layer = keras.layers.LayerNormalization()
norm_results = norm_layer(x_boston_train, training=True)

tf.less_equal(abs(my_results-norm_results), 0.001)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



<tf.Tensor: id=75, shape=(404, 13), dtype=bool, numpy=
array([[ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       ...,
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True]])>

Checking manually differences between first observations:

In [4]:
my_results[0]

<tf.Tensor: id=79, shape=(13,), dtype=float32, numpy=
array([-0.517459  , -0.52729   , -0.4623597 , -0.52729   , -0.5229985 ,
       -0.47829714,  0.20417313, -0.49556747, -0.4953832 ,  1.9215562 ,
       -0.35977936,  2.6386611 , -0.37796623], dtype=float32)>

In [5]:
norm_results[0]

<tf.Tensor: id=83, shape=(13,), dtype=float32, numpy=
array([-0.517463  , -0.52729416, -0.46236333, -0.52729416, -0.5230027 ,
       -0.4783009 ,  0.2041747 , -0.49557137, -0.4953871 ,  1.9215713 ,
       -0.3597822 ,  2.638682  , -0.3779692 ], dtype=float32)>

Train a model using a custom training loop to tackle the Fashion MNIST dataset:

Display the epoch, iteration, mean training loss, and mean accuracy over each epoch (updated at each iteration), as well as the validation loss and accuracy at the end of each epoch

Try using a different optimizer with a different learning rate for the upper layers and the lower layers.

In [43]:
class_names = ["T-shirt", "Trouser", "Pullover", "Dress", "Coat",
               "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

# Data loading and preparation:
fashion_mnist = keras.datasets.fashion_mnist
(x_train_full, y_train_full), (x_test, y_test) = fashion_mnist.load_data()

x_validation, x_train = x_train_full[:5000] / 255.0, x_train_full[5000:] / 255.0
y_validation, y_train = y_train_full[:5000], y_train_full[5000:]

x_test = x_test / 255.0

# Creating model:
l2_reg = keras.regularizers.l2(0.05)

model = keras.models.Sequential()
model.add(keras.layers.Flatten(input_shape = x_train.shape[1:]))
model.add(keras.layers.Dense(300, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model.add(keras.layers.Dense(100, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model.add(keras.layers.Dense(100, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model.add(keras.layers.Dense(100, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model.add(keras.layers.Dense(10, activation = "softmax"))

First lets compile this in a conventional keras way, and later we will do it ourselves:

In [45]:
model.compile(loss=keras.losses.sparse_categorical_crossentropy,
              optimizer=keras.optimizers.Nadam(),
              metrics=['accuracy'])

model.fit(x_train, y_train,
          batch_size=256,
          epochs=50,
          verbose=1,
          validation_data=(x_validation, y_validation))

Train on 55000 samples, validate on 5000 samples
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
[0.773391241645813, 0.8106]


Now to do this whole training loop myself:

In [63]:
batch_size = 256
n_epochs = 50
n_steps = len(x_train) // batch_size
loss_fn = keras.losses.sparse_categorical_crossentropy
optimizer1 = keras.optimizers.Nadam()
# optimizer2 = keras.optimizers.SGD(nesterov = True)
optimizer2 = optimizer1
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.Accuracy()]
val_metrics = [keras.metrics.Accuracy()]

model2 = keras.models.Sequential()
model2.add(keras.layers.Flatten(input_shape = x_train.shape[1:]))
model2.add(keras.layers.Dense(300, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model2.add(keras.layers.Dense(100, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model2.add(keras.layers.Dense(100, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model2.add(keras.layers.Dense(100, activation = "relu", kernel_initializer = "he_normal", kernel_regularizer = l2_reg))
model2.add(keras.layers.Dense(10, activation = "softmax"))

def random_batch(x, y, batch_size = batch_size):
    """
    Get random batch of data for training
    """
    idx = np.random.randint(len(x), size=batch_size)
    return x[idx], y[idx]

def print_status_bar(iteration, total, loss, metrics=None, valid_loss = None, valid_metrics = None):
    """
    Print the status bar
    """
    metrics_print = "loss: {:.4f} - ".format(loss.result() ) + " - ".join(["{}: {:.4f}".format(m.name, m.result()) for m in (metrics or [])])
    if valid_loss is not None:
        valid_print = " - valid_loss: {:.4f} - ".format(valid_loss.numpy() ) + " - ".join(["valid_{}: {:.4f}".format(m.name, m.result()) for m in (valid_metrics or [])])
        metrics_print = metrics_print + valid_print
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics_print, end=end)

def compute_metrics(metrics, y_true, y_pred):
    """
    Compute the results for each metric in the list.
    """
    # take the max in y_pred
    y_pred = tf.math.argmax(y_pred, axis=-1)
    for metric in metrics:
        metric(y_true, y_pred)

In [64]:
@timer_measurer
def train_neural_network():
    for epoch in range(1, n_epochs + 1):
        print(f"Epoch {epoch}/{n_epochs}")
        for step in range(1, n_steps + 1):
            x_batch, y_batch = random_batch(x_train, y_train)
            with tf.GradientTape() as tape:
                y_pred = model2(x_batch, training=True)
                main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
                loss = tf.add_n([main_loss] + model2.losses)
            gradients = tape.gradient(loss, model2.trainable_variables)
            gradients1, variables1 = gradients[:math.floor(len(model2.layers)/2)], model2.trainable_variables[:math.floor(len(model2.layers)/2)]
            gradients2, variables2 = gradients[math.ceil(len(model2.layers)/2):], model2.trainable_variables[math.ceil(len(model2.layers)/2):]
            optimizer1.apply_gradients(zip(gradients1, variables1))
            optimizer2.apply_gradients(zip(gradients2, variables2))
            mean_loss(loss)
            compute_metrics(metrics, y_batch, y_pred)
            print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
        # Now to get validation results:
        y_pred_valid = model2(x_validation, training = False)
        validation_loss = tf.reduce_mean(loss_fn(y_validation, y_pred_valid))
        compute_metrics(val_metrics, y_batch, y_pred)
        print_status_bar(len(y_train), len(y_train), mean_loss, metrics, validation_loss, val_metrics)
        for metric in [mean_loss] + metrics:
            metric.reset_states()
            
    return model2

In [65]:
model2 = train_neural_network()

Epoch 1/50


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

55000/55000 - loss: 13.6983 - accuracy: 0.6595 - valid_loss: 0.9665 - valid_accuracy: 0.6836
Epoch 2/50
55000/55000 - loss: 1.8391 - accuracy: 0.6757 - valid_loss: 0.8516 - valid_accuracy: 0.6719
Epoch 3/50
55000/55000 - loss: 1.4375 - accuracy: 0.6947 - valid_loss: 0.7803 - valid_accuracy: 0.7057
Epoch 4/50
55000/55000 - loss: 1.3275 - accuracy: 0.7158 - valid_loss: 0.8135 - valid_accuracy: 0.6953
Epoch 5/50
55000/55000 - loss: 1.2728 - accuracy: 0.7278 - valid_loss: 0.7910 - valid_accuracy: 0.7016
Epoch 6/50
55000/55000 - loss: 1.2341 - accuracy: 0.7363 - valid_loss: 0.7050 - valid_accuracy: 0.7116
Epoch 7/50
55000/55000 - loss: 1.1924 - accuracy: 0.7457 - valid_loss: 0.6886 - 

In [62]:
from sklearn.metrics import accuracy_score

y_pred_1 = model.predict(x_test)
y_pred_1 = np.argmax(y_pred_1, axis=-1)

y_pred_2 = model2.predict(x_test)
y_pred_2 = np.argmax(y_pred_2, axis=-1)

print(f"Keras implemented model test accuracy: {accuracy_score(y_test, y_pred_1)}")    
print(f"Manually implemented model test accuracy: {accuracy_score(y_test, y_pred_2)}")

Keras implemented model test accuracy: 0.7793
Manually implemented model test accuracy: 0.8094
