Implement a custom layer that performs layer normalization (we will use this type of layer in Chapter 15):

The build() method should define two trainable weights α and β, both of shape input_shape[-1:] and data type tf.float32. α should be initialized with 1s, and β with 0s.

In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

2023-09-19 18:28:47.277629: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
class myLayer(tf.keras.layers.Layer):
    """Layer normalization over the last axis with a learned scale and shift.

    Each input is standardized along its last axis using the mean and
    variance of that axis, then multiplied by ``alpha`` and offset by
    ``beta``.

    Args:
        eps: Constant added to the variance before the square root is taken.
        **kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, eps=0.001, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps

    def build(self, batch_input_shape):
        """Create ``alpha`` (ones) and ``beta`` (zeros), one value per feature."""
        feature_shape = batch_input_shape[-1:]
        self.alpha = self.add_weight(
            name='alpha', shape=feature_shape, initializer='ones')
        self.beta = self.add_weight(
            name='beta', shape=feature_shape, initializer='zeros')
        super().build(batch_input_shape)

    def call(self, X):
        """Normalize ``X`` along its last axis, then scale and shift it."""
        # keepdims=True keeps the statistics broadcastable against X.
        mu, variance = tf.nn.moments(X, axes=-1, keepdims=True)
        scaled = self.alpha * (X - mu)
        return scaled / tf.sqrt(variance + self.eps) + self.beta

    def compute_output_shape(self, batch_input_shape):
        """The output has exactly the shape of the input."""
        return batch_input_shape

    def get_config(self):
        """Return the base layer config extended with ``eps``."""
        return dict(super().get_config(), eps=self.eps)

In [3]:
# Load Fashion MNIST and hold out the first 5,000 training samples for validation.
(X_train_full, y_train_full), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
# X_test is deliberately left unscaled here: the next cell casts, flattens and
# divides it by 255.0, so dividing here as well would scale it twice.

In [4]:
# Training images: cast to float32 only (no flattening, no division).
X = X_train.astype(np.float32)
# Validation and test images: cast to float32, flatten each image to 28*28
# values and divide by 255.0.
X_valid, X_test = (
    images.astype(np.float32).reshape(-1, 28 * 28) / 255.0
    for images in (X_valid, X_test)
)
# All label arrays are cast to int32.
y, y_valid, y_test = (
    labels.astype(np.int32) for labels in (y_train, y_valid, y_test)
)

# Build the reference Keras layer and the custom layer with their default
# arguments, then measure how far apart their outputs are on X.
keras_layer = tf.keras.layers.LayerNormalization()
custom_layer = myLayer()
tf.reduce_mean(
    tf.keras.losses.mean_squared_error(keras_layer(X), custom_layer(X))
)

<tf.Tensor: shape=(), dtype=float32, numpy=1.3288982e-14>

In [5]:
# Draw one random scale vector and one random shift vector (alpha first, then
# beta), load the same pair into both layers and compare their outputs again.
random_alpha, random_beta = (np.random.rand(X.shape[-1]) for _ in range(2))
keras_layer.set_weights([random_alpha, random_beta])
custom_layer.set_weights([random_alpha, random_beta])
tf.reduce_mean(
    tf.keras.losses.mean_squared_error(keras_layer(X), custom_layer(X))
)

<tf.Tensor: shape=(), dtype=float32, numpy=5.8439543e-15>

<h1>Train on Fashion MNIST</h1>

In [6]:
# Load Fashion MNIST, hold out the first 5,000 training samples for validation
# and divide every image array by 255.0.
(x_train_full, y_train_full), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
x_valid = x_train_full[:5000].astype(np.float32) / 255.0
x_train = x_train_full[5000:].astype(np.float32) / 255.0
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
# Cast the test images to float32 as well, so all three image arrays share one
# dtype (a bare `x_test / 255.0` would produce float64).
x_test = x_test.astype(np.float32) / 255.0

In [7]:
# Clear the Keras backend session first, then seed the TensorFlow and NumPy
# random generators with the same value.
tf.keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [8]:
# Lower part of the network: flatten each 28x28 input, then one hidden layer
# of 100 ReLU units.
lower_layers = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=[28, 28]),
    tf.keras.layers.Dense(100, activation='relu'),
])
# Upper part of the network: the 10-unit softmax output layer.
upper_layers = tf.keras.models.Sequential([
    tf.keras.layers.Dense(10, activation='softmax'),
])

# One optimizer per group of layers. The keyword is spelled `learning_rate`;
# `lr` is a deprecated alias that recent Keras releases warn about or reject.
lower_optimizer = tf.keras.optimizers.SGD(learning_rate=1e-4)
upper_optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-3)
# The full model simply chains the two groups.
model = tf.keras.models.Sequential([lower_layers, upper_layers])



In [9]:
n_epochs = 5
batch_size = 32
# Number of steps run per epoch by the training loop.
n_steps = len(x_train) // batch_size
# Use `learning_rate`; the `lr` keyword is a deprecated alias that recent
# Keras releases warn about or reject.
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-3)
mean_loss = tf.keras.metrics.Mean()
loss_fn = tf.keras.losses.sparse_categorical_crossentropy
# NOTE: the name `matrics` (sic) is kept because the training loop refers to it.
matrics = [tf.keras.metrics.SparseCategoricalAccuracy()]



In [10]:
def random_batch(X, y, batch_size=32):
    """Return a random batch of matching rows from ``X`` and ``y``.

    Row indices are drawn independently with ``np.random.randint``, so the
    same row may appear more than once in a batch.

    Args:
        X: Indexable array of samples.
        y: Indexable array of targets, aligned with ``X``.
        batch_size: Number of rows to draw.

    Returns:
        A ``(X_batch, y_batch)`` tuple selected with the same indices.
    """
    n_samples = len(X)
    picked = np.random.randint(0, n_samples, size=batch_size)
    return X[picked], y[picked]

In [11]:
def print_status_bar(step, total, loss, metrics=None):
    """Print a one-line progress report that overwrites the previous one.

    The line starts with a carriage return and shows ``step/total`` followed
    by ``name: value`` (four decimals) for ``loss`` and then for every entry
    of ``metrics``. A newline is emitted only once ``step`` is no longer
    below ``total``.

    Args:
        step: Current step number.
        total: Total number of steps.
        loss: Object exposing ``name`` and ``result()``.
        metrics: Optional list of objects exposing ``name`` and ``result()``.
    """
    tracked = [loss] + (metrics or [])
    summary = ' - '.join(f'{m.name}: {m.result():.4f}' for m in tracked)
    if step < total:
        line_end = ''
    else:
        line_end = '\n'
    print(f'\r{step}/{total} - ' + summary, end=line_end)

          
   
This custom training loop is designed for training a neural network model using TensorFlow. Here's a detailed explanation of each step:

1. **Gradient Computation (Step 1):** In this step, you use a persistent GradientTape to compute gradients for the model's parameters. It records operations for gradient computation during the forward pass.

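2. **Gradient Descent (Step 2):** After computing the gradients, the loop applies them with a separate optimizer for each group of layers: `lower_optimizer` (SGD) updates the variables of `lower_layers`, and `upper_optimizer` (Nadam) updates the variables of `upper_layers`. Because `tape.gradient()` is called once per group, the tape has to be persistent; it is deleted as soon as both groups have been updated.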

3. **Applying Constraints (Step 3 - Optional):** If your model has any variable constraints (e.g., weight constraints), you apply them to the model's parameters here.

4. **Monitoring Training Progress (Step 4):** You create a dictionary called `status` to store training progress information. This includes the loss value, metrics (e.g., accuracy), and any other relevant information for monitoring.

5. **Validation (Step 5):** After each training batch, you perform validation on a separate validation dataset (x_valid, y_valid) to assess the model's performance on unseen data. The validation loss and accuracy are calculated and displayed.

6. **Reset Metrics (Step 6):** To prepare for the next batch, you reset the states of metrics (e.g., mean loss, accuracy) so that they don't accumulate over multiple batches.

This custom training loop allows for fine-grained control over the training process and monitoring of various metrics. It's a powerful way to train and evaluate neural network models while having full flexibility in handling gradients, optimizers, and constraints.

In [18]:
from tqdm.notebook import trange
from collections import OrderedDict

# Custom training loop: each group of layers is updated by its own optimizer.
# Epochs are numbered from 1 so the progress bar reads "Epoch 1/5" ... "Epoch 5/5".
with trange(1, n_epochs + 1, desc='All epochs') as epochs:
    for epoch in epochs:
        with trange(1, n_steps + 1, desc=f"Epoch {epoch}/{n_epochs}") as steps:
            for step in steps:
                # Pass batch_size explicitly so the batches match the size
                # that n_steps was computed from.
                x_batch, y_batch = random_batch(x_train, y_train, batch_size)

                # The tape is persistent because tape.gradient() is called
                # once per group of layers below.
                with tf.GradientTape(persistent=True) as tape:
                    y_pred = model(x_batch, training=True)
                    main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
                    loss = tf.add_n([main_loss] + model.losses)

                # A dedicated loop name avoids rebinding the global `optimizer`.
                for layers, layers_optimizer in ((lower_layers, lower_optimizer),
                                                 (upper_layers, upper_optimizer)):
                    grads = tape.gradient(loss, layers.trainable_variables)
                    layers_optimizer.apply_gradients(zip(grads, layers.trainable_variables))
                # Release the persistent tape as soon as both updates are done.
                del tape

                # Re-apply any variable constraints after the update.
                for var in model.trainable_variables:
                    if var.constraint is not None:
                        var.assign(var.constraint(var))

                # Training status for the progress bar.
                status = OrderedDict()
                mean_loss(loss)
                status['loss'] = mean_loss.result().numpy()

                for metric in matrics:
                    metric(y_batch, y_pred)
                    status[metric.name] = metric.result().numpy()

                steps.set_postfix(status)

                # NOTE(review): this evaluates the whole validation set on
                # every training step, which is expensive; consider moving it
                # after the step loop if per-epoch validation is enough.
                y_pred = model(x_valid, training=False)
                status['val_loss'] = tf.reduce_mean(loss_fn(y_valid, y_pred)).numpy()
                status['val_acc'] = np.mean(tf.keras.metrics.sparse_categorical_accuracy(tf.constant(y_valid, dtype=np.float32), y_pred))
                steps.set_postfix(status)

                # Metrics are reset after every step, so each reported value
                # covers a single batch.
                for metric in [mean_loss] + matrics:
                    metric.reset_state()



All epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 0/5:   0%|          | 0/1718 [00:00<?, ?it/s]

KeyboardInterrupt: 