In [1]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import sklearn
# Compare (major, minor) numerically: a plain string comparison is lexicographic,
# so e.g. "0.9" would wrongly be accepted as >= "0.20".
assert tuple(int(part) for part in sklearn.__version__.split(".")[:2]) >= (0, 20)

# Best-effort Colab setup: the magic below is only defined in Colab, so any
# error raised when running it elsewhere is deliberately ignored.
try:
    # %tensorflow_version only exists in Colab.
    %tensorflow_version 2.x
except Exception:
    pass

# TensorFlow ≥2.4 is required in this notebook
# Earlier 2.x versions will mostly work the same, but with a few bugs
import tensorflow as tf
from tensorflow import keras
# Compare (major, minor) numerically: as strings, "2.10.0" sorts before "2.4",
# which would make this check fail on TensorFlow 2.10 and later.
assert tuple(int(part) for part in tf.__version__.split(".")[:2]) >= (2, 4)

# Common imports
import numpy as np
import os

# To make this notebook's output stable across runs
# (seeds both NumPy's and TensorFlow's global random generators)
np.random.seed(42)
tf.random.set_seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
# Default font sizes for the axis labels and the x/y tick labels
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Where to save the figures: <PROJECT_ROOT_DIR>/images/<CHAPTER_ID>
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deep"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
# Create the folder up front; exist_ok=True makes re-running this cell harmless
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    """Save the current Matplotlib figure into IMAGES_PATH.

    Args:
        fig_id: base name of the file (without extension); also printed.
        tight_layout: when true, call ``plt.tight_layout()`` before saving.
        fig_extension: file extension, also passed to ``savefig`` as the format.
        resolution: value passed to ``savefig`` as ``dpi``.
    """
    filename = fig_id + "." + fig_extension
    target = os.path.join(IMAGES_PATH, filename)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(target, format=fig_extension, dpi=resolution)

Colab only includes TensorFlow 2.x; %tensorflow_version has no effect.


In [2]:
# 12. Implement a custom layer that performs `Layer Normalization`

In [8]:
class LayerNormalization(keras.layers.Layer):
    """Custom layer normalization computed over the last axis of the input.

    The input is centred and scaled with the mean and variance taken along
    its last axis, then multiplied by the ``alpha`` weight and shifted by the
    ``beta`` weight (both shaped like the last input dimension).

    Args:
        eps: constant added to the variance before taking the square root.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, eps=0.001, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps

    def build(self, batch_input_shape):
        """Create ``alpha`` (initialised to ones) and ``beta`` (zeros)."""
        feature_shape = batch_input_shape[-1:]
        self.alpha = self.add_weight(name="alpha", shape=feature_shape, initializer="ones")
        self.beta = self.add_weight(name="beta", shape=feature_shape, initializer="zeros")
        super().build(batch_input_shape)  # must be at the end

    def call(self, X):
        """Return ``alpha * (X - mean) / sqrt(variance + eps) + beta``."""
        mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
        scaled = self.alpha * (X - mean)
        stddev = tf.sqrt(variance + self.eps)
        return scaled / stddev + self.beta

    def compute_output_shape(self, batch_input_shape):
        """The output has the same shape as the input."""
        return batch_input_shape

    def get_config(self):
        """Return the base layer config extended with ``eps``."""
        return dict(super().get_config(), eps=self.eps)

In [20]:
# Load Fashion MNIST so we have some data to test our custom layer with
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Convert the images to float32 and divide by 255
X_train_full, X_test = (
    images.astype(np.float32) / 255. for images in (X_train_full, X_test)
)

# The first 5000 training samples become the validation set, the rest the training set
X_valid, y_valid = X_train_full[:5000], y_train_full[:5000]
X_train, y_train = X_train_full[5000:], y_train_full[5000:]

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz


In [21]:
# Ensure that our custom layer produces the same output as the `keras.layers.LayerNormalization` layer
# (X_train was already cast to float32 when it was loaded)
X = X_train.astype(np.float32)

custom_layer_norm = LayerNormalization()
keras_layer_norm  = keras.layers.LayerNormalization()

# Mean absolute difference between the two layers' outputs on the same input;
# a value close to zero means the two implementations agree.
tf.reduce_mean(keras.losses.mean_absolute_error(keras_layer_norm(X),
                                               custom_layer_norm(X)))

<tf.Tensor: shape=(), dtype=float32, numpy=9.576266e-08>

In [22]:
# 13. Train a model using a custom training loop to tackle the Fashion MNIST dataset
# Use different optimizers for the upper layers and the lower layers

In [26]:
# Start the exercise from a clean slate: clear the Keras session and
# re-seed NumPy and TensorFlow with the same seed used at the top of the notebook.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [33]:
# Lower part: flatten the 28x28 inputs, then one hidden ReLU layer
lower_layers = keras.models.Sequential()
lower_layers.add(keras.layers.Flatten(input_shape=[28, 28]))
lower_layers.add(keras.layers.Dense(100, activation="relu"))

# Upper part: the 10-unit softmax output layer
upper_layers = keras.models.Sequential()
upper_layers.add(keras.layers.Dense(10, activation="softmax"))

# Full model: the lower part followed by the upper part
model = keras.models.Sequential()
model.add(lower_layers)
model.add(upper_layers)

In [34]:
# One optimizer per part of the model: plain SGD for the lower layers,
# Nadam for the upper layers.
lower_optimizer = keras.optimizers.SGD(learning_rate=1e-4)
upper_optimizer = keras.optimizers.Nadam(learning_rate=1e-3)

In [35]:
# Settings for the custom training loop
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size  # number of full batches per epoch
loss_fn = keras.losses.sparse_categorical_crossentropy
mean_loss = keras.metrics.Mean()  # running mean of the training loss
metrics = [keras.metrics.SparseCategoricalAccuracy()]

In [30]:
from tqdm.notebook import trange
from collections import OrderedDict

In [31]:
def random_batch(X, y, batch_size=32):
    """Draw a random batch of matching rows from ``X`` and ``y``.

    ``batch_size`` indices are sampled uniformly from ``[0, len(X))`` with
    NumPy's global random generator (so the same index may appear more than
    once), and the same indices are used to select from both arrays.

    Returns:
        A tuple ``(X[indices], y[indices])``.
    """
    picks = np.random.randint(low=0, high=len(X), size=batch_size)
    return X[picks], y[picks]

In [36]:
# Custom training loop: the lower and upper parts of the model are updated by
# different optimizers, using gradients of the same loss.
with trange(1, n_epochs + 1, desc="All epochs") as epochs:
    # For each epoch
    for epoch in epochs:
        with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps:
            # Defined up front so the validation step below still works if n_steps is 0
            status = OrderedDict()

            # For each step (batch) within an epoch
            for step in steps:
                # Select a random batch from the training set. batch_size is passed
                # explicitly so the batches have the size n_steps was derived from.
                X_batch, y_batch = random_batch(X_train, y_train, batch_size)

                # Make prediction for one batch and compute the loss.
                # The tape is persistent because gradient() is called once per optimizer.
                with tf.GradientTape(persistent=True) as tape:
                    y_pred    = model(X_batch, training=True)
                    main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
                    loss      = tf.add_n([main_loss] + model.losses)

                # Compute the gradients of the loss with regard to each trainable variable
                # and apply them (the gradients) to the lower & upper optimizers
                for layers, optimizer in ((lower_layers, lower_optimizer), (upper_layers, upper_optimizer)):
                    gradients = tape.gradient(loss, layers.trainable_variables)
                    optimizer.apply_gradients(zip(gradients, layers.trainable_variables))
                # Drop the persistent tape explicitly once both updates are done
                del tape

                # Apply constraints to variables (if needed)
                for variable in model.variables:
                    if variable.constraint is not None:
                        variable.assign(variable.constraint(variable))
                status = OrderedDict()

                # Update the mean loss (over the current epoch)
                mean_loss(loss)
                status["loss"] = mean_loss.result().numpy()

                # Update the metrics   (over the current epoch)
                for metric in metrics:
                    metric(y_batch, y_pred)
                    status[metric.name] = metric.result().numpy()

                # tqdm progress bar
                steps.set_postfix(status)

            # Predict on the validation set and compute the loss and accuracy
            y_pred = model(X_valid)
            status["val_loss"]     = np.mean(loss_fn(y_valid, y_pred))
            status["val_accuracy"] = np.mean(keras.metrics.sparse_categorical_accuracy(tf.constant(y_valid, dtype=np.float32), y_pred))
            steps.set_postfix(status)

        # Reset the states of the mean loss and the metrics
        for metric in [mean_loss] + metrics:
            metric.reset_states()

All epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 2/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 3/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 4/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 5/5:   0%|          | 0/1718 [00:00<?, ?it/s]