In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt

### 12. Implement a custom layer that performs Layer Normalization

#### a): The build() method should define two trainable weights α and β, both of shape input_shape[-1:] and data type tf.float32. α should be initialized with 1s, and β with 0s.

#### b): The call() method should compute the mean μ and standard deviation σ of each instance's features. For this, you can use tf.nn.moments(inputs, axes=-1, keepdims=True), which returns the mean μ and the variance σ2 of all instances (compute the square root of the variance to get the standard deviation). Then the function should compute and return α⊗(X - μ)/(σ + ε) + β, where ⊗ represents itemwise multiplication (*) and ε is a smoothing term (small constant to avoid division by zero, e.g., 0.001).

#### c) Ensure that your custom layer produces the same (or very nearly the same) output as the keras.layers.LayerNormalization layer.


In [19]:
class LayerNormalization(keras.layers.Layer):
    def __init__(self, epsilon = 0.001, **kwargs):
        super().__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, batch_input_shape):
        self.alfa = self.add_weight(
            name = "alfa",
            shape = batch_input_shape[-1:], 
            initializer = "ones"
        )
        self.beta = self.add_weight(
            name = "beta",
            shape = batch_input_shape[-1:], 
            initializer = "zeros"
        )
        super().build(batch_input_shape)

    def call(self, X):
        mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
        return self.alfa * (X-mean)/(tf.sqrt(variance + self.epsilon)) + self.beta

    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "epsilon": self.epsilon}


# c) - Check how it works

(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
check_normalization = X_train_full[5000:] / 255.

my_normalization = LayerNormalization()
keras_normalization = keras.layers.LayerNormalization()

print(tf.reduce_sum(my_normalization(check_normalization) - keras_normalization(check_normalization)))

# There is a diffrence between my calculated values and values calculated by keras, which is equal to 0.72437537   
    

tf.Tensor(0.72437537, shape=(), dtype=float32)


### 13. Train a model using a custom training loop to tackle the Fashion MNIST dataset




#### a) Display the epoch, iteration, mean training loss, and mean accuracy over each epoch (updated at each iteration), as well as the validation loss and accuracy at the end of each epoch.

In [32]:
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()
X_train_full = X_train_full.astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_test = X_test.astype(np.float32) / 255.

keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

fashion_network = keras.models.Sequential([
    keras.layers.Flatten(input_shape = [28, 28]),
    keras.layers.Dense(50, activation = "relu"),
    keras.layers.Dense(10, activation = "softmax")
])

def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

def print_status_bar(iteration, total, loss, metrics=None):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                            for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics, end=end)

n_epochs = 3
batch_size = 32
n_steps = len(X_train) // batch_size
loss_fn = keras.losses.sparse_categorical_crossentropy
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.SparseCategoricalAccuracy()]

for epoch in range(1, n_epochs + 1):
    print(f'Epoch number {epoch}')
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape() as tape:
            y_pred = fashion_network(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + fashion_network.losses)
        gradients = tape.gradient(loss, fashion_network.trainable_variables)
        mean_loss(loss)

        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step*batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Epoch number 1
55000/55000 - mean: 2.3547 - sparse_categorical_accuracy: 0.1150
Epoch number 2
55000/55000 - mean: 2.3544 - sparse_categorical_accuracy: 0.1171
Epoch number 3
55000/55000 - mean: 2.3577 - sparse_categorical_accuracy: 0.1164
