<a href="https://colab.research.google.com/github/Timure228/Hands-on-ML/blob/main/Chapter_12.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# You can create a tensor with tf.constant()
import tensorflow as tf
t = tf.constant([[1., 2., 3.], [4., 5., 6.]]) # matrix
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [None]:
# Check the shape and datatype of a tensor
t.shape, t.dtype

(TensorShape([2, 3]), tf.float32)

In [None]:
# Indexing
t[:, 1:] # [row, column]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [None]:
t[..., 1, tf.newaxis] # tf.newaxis is simmilar to tf.expand_dims()

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [None]:
# Expand dimensions
tf.expand_dims(t, axis=1)

<tf.Tensor: shape=(2, 1, 3), dtype=float32, numpy=
array([[[1., 2., 3.]],

       [[4., 5., 6.]]], dtype=float32)>

In [None]:
tf.transpose(t)

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[1., 4.],
       [2., 5.],
       [3., 6.]], dtype=float32)>

In [None]:
# Tensor operations
t + 10, tf.square(t), t @ tf.transpose(t)

(<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
 array([[11., 12., 13.],
        [14., 15., 16.]], dtype=float32)>,
 <tf.Tensor: shape=(2, 3), dtype=float32, numpy=
 array([[ 1.,  4.,  9.],
        [16., 25., 36.]], dtype=float32)>,
 <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
 array([[14., 32.],
        [32., 77.]], dtype=float32)>)

## Tensors and NumPy

In [None]:
# NumPy into tf
import numpy as np
a = np.array([[1., 2., 3.], [4., 5., 6.]], dtype=np.float32)
tf.constant(a)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [None]:
# tf into NumPy
t.numpy(), np.array(t)

(array([[1., 2., 3.],
        [4., 5., 6.]], dtype=float32),
 array([[1., 2., 3.],
        [4., 5., 6.]], dtype=float32))

In [None]:
# Use tf operations on NumPy arrays
tf.square(a)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [None]:
# Use NumPy operations on tf tensors
np.square(t)

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

In [None]:
# Sum different data type tensors is not possible!
tf.constant(2.) + tf.constant(40)

InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2] name: 

In [None]:
tf.constant(2.) + tf.constant(40., dtype=tf.float64)

InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2] name: 

In [None]:
# But if you really need to convert types use tf.cast()
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

## Variables

In [None]:
# tf.Tensor values are immutable. Mutable are tf.Variable
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [None]:
# Use assign to update the variable
v.assign(2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [None]:
v[0, 1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [None]:
v[:, 2].assign([0, 1.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [None]:
# To change specific cells use scatter_nd_update
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

In [None]:
# Direct assignment won't work
v[1] = [7., 8., 9.]

TypeError: 'ResourceVariable' object does not support item assignment

In [None]:
# Ragged Tensor
tf.RaggedTensor.from_row_splits(
      values=[3, 1, 4, 1, 5, 9, 2, 6],
      row_splits=[0, 4, 4, 7, 8, 8])

<tf.RaggedTensor [[3, 1, 4, 1], [], [5, 9, 2], [6], []]>

# Custom Functions

In [None]:
# Recreate Huber loss
def huber_fn(y_true, y_pred):
  error = y_true - y_pred
  is_small_error = tf.abs(error) < 1
  squared_loss = tf.square(error) / 2
  linear_loss = tf.abs(error) - 0.5
  return tf.where(is_small_error, squared_loss, linear_loss) # If small return squared_loss, else linear_loss

In [None]:
tf.where(True, 1, 0), tf.where(False, 1, 0)

(<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=int32, numpy=0>)

In [None]:
# When loading the model with a custom loss, mention it in custom_objects=
model = tf.keras.models.load_model("my_model_with_a_custom_loss", custom_objects={"huber_fn": huber_fn})

ValueError: File format not supported: filepath=my_model_with_a_custom_loss. Keras 3 only supports V3 `.keras` files and legacy H5 format files (`.h5` extension). Note that the legacy SavedModel format is not supported by `load_model()` in Keras 3. In order to reload a TensorFlow SavedModel as an inference-only layer in Keras 3, use `keras.layers.TFSMLayer(my_model_with_a_custom_loss, call_endpoint='serving_default')` (note that your `call_endpoint` might have a different name).

In [None]:
# If you don't want to load model with custom_objects=, decorate your custom loss function with @tf.keras.utils.register_keras_serializable()
@tf.keras.utils.register_keras_serializable()
def huber_fn(y_true, y_pred):
  error = y_true - y_pred
  is_small_error = tf.abs(error) < 1
  squared_loss = tf.square(error) / 2
  linear_loss = tf.abs(error) - 0.5
  return tf.where(is_small_error, squared_loss, linear_loss) # If small return squared_loss, else linear_loss

In [None]:
model = tf.keras.Sequential([
    tf.keras.layers.Dense(10)
])

In [None]:
# To change threshold of the loss function, create creator of it.
def create_huber(threshold=1.0):
  def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < threshold
    squared_loss = tf.square(error) / 2
    linear_loss = threshold * tf.abs(error) - threshold**2 / 2
    return tf.where(is_small_error, squared_loss, linear_loss) # If small return squared_loss, else linear_loss
  return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam")

In [None]:
# When you save the model, threshold won't be saved, so mention it in custom_objects=
model = tf.keras.models.load_model(
    "my_model_with_loss_threshold",
    custom_objects={"huber_fn": create_huber(2.0)}
)

ValueError: File format not supported: filepath=my_model_with_loss_threshold. Keras 3 only supports V3 `.keras` files and legacy H5 format files (`.h5` extension). Note that the legacy SavedModel format is not supported by `load_model()` in Keras 3. In order to reload a TensorFlow SavedModel as an inference-only layer in Keras 3, use `keras.layers.TFSMLayer(my_model_with_loss_threshold, call_endpoint='serving_default')` (note that your `call_endpoint` might have a different name).

In [None]:
# To solve this, create a subclass of tf.keras.losses.Loss class, then implement its get_config() method
class HuberLoss(tf.keras.losses.Loss):
  def __init__(self, threshold=1.0, **kwargs):
    self.threshold = threshold
    super().__init__(**kwargs)

  def call(self, y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < threshold
    squared_loss = tf.square(error) / 2
    linear_loss = threshold * tf.abs(error) - threshold**2 / 2
    return tf.where(is_small_error, squared_loss, linear_loss)

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

In [None]:
# Merge two dictionaries
a = {"example1": "example2", "example3": "example4"}
print({**a, "example5": "example6"})
# OR
print(a | {"example5": "example6"})

In [None]:
# Use any instance of this custom loss class
model.compile(loss=HuberLoss(2.), optimizer="nadam")

In [None]:
# Now you don't need to write threshold each time you load the model
model = tf.keras.models.load_model("my_model_with_loss_class",
                                   custom_objects={"HuberLoss": HuberLoss})

In [None]:
# Custom Activation, Initializer, Regularizer and Constraint functions
def my_softplus(z): # Activation
  return tf.math.log(1.0 + tf.exp(z))

def my_glorot_initializer(shape, dtype=tf.float32): # Initializer
  stddev = tf.sqrt(2. / (shape[0] + shape[1]))
  return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights): # Regularizer
  return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights): # Constraint
  return tf.where(weights < 0., tf.zeros_like(weights), weights)

In [None]:
layer = tf.keras.layers.Dense(1, activation=my_softplus,
                              kernel_initializer=my_glorot_initializer,
                              kernel_regularizer=my_l1_regularizer,
                              kernel_constraint=my_positive_weights)

* Activation function will be applied to the output.

* Weights will be initialized using value returned by the initializer.

* Each train step weights will be passed to the regularization function.

* Regularization loss will be added to the main loss.

* Constrain function replaces layers's weights by the constrained weights each train step.

In [None]:
# To save hyperparameters by saving, create a subclass of the appropriate class
class MyL1Regularizer(tf.keras.regularizers.Regularizer):
  def __init__(self, factor):
    self.factor = factor

  def __call__(self, weights):
    return tf.reduce_sum(tf.abs(self.factor * weights))

  def get_config(self):
    return {"factor": self.factor} # No configs from parent. Not defined by parent.

# As you can see, we implement call() method for losses, layers, activation functions and __call__() for
# regularizers, initializers and constraints.

# Custom metrics

In [None]:
# We can use our custom huber_fn() loss as a metric
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

In [None]:
# Check out precision metric
precision = tf.keras.metrics.Precision()
print(precision([1, 1, 0, 1], [1, 0, 1, 0])) # Predicted 1 True Positives / All 2 True Positives
print(precision([0, 0, 0, 1], [1, 0, 1, 0]))
print(precision([0, 0, 0, 1], [1, 0, 1, 0])) # Streaming metric

tf.Tensor(0.5, shape=(), dtype=float32)
tf.Tensor(0.25, shape=(), dtype=float32)
tf.Tensor(0.16666667, shape=(), dtype=float32)


In [None]:
# Check the precision result and variables (number of true and false positives)
precision.result(), precision.variables

(<tf.Tensor: shape=(), dtype=float32, numpy=0.1666666716337204>,
 [<Variable path=precision/true_positives, shape=(1,), dtype=float32, value=[1.]>,
  <Variable path=precision/false_positives, shape=(1,), dtype=float32, value=[5.]>])

In [None]:
# To reset variables use .reset_state()
precision.reset_state()

In [None]:
# Let's make HuberMetric class
class HuberMetric(tf.keras.metrics.Metric):
  def __init__(self, threshold=1.0, **kwargs):
    super().__init__(**kwargs)
    self.threshold = threshold
    self.huber_fn = create_huber(threshold)
    self.total = self.add_weight(name="total", initializer="zero")
    self.count = self.add_weight(name="count", initializer="zero")

  def update_state(self, y_true, y_pred, sample_weight=None):
    sample_metrics = self.huber_fn(y_true, y_pred)
    self.total.assign_add(tf.reduce_sum(sample_metrics))
    self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

  def result(self):
    return self.total / self.count

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

# Custom Layers

In [None]:
# Create layer without any weights (like Flatten or ReLU layers) with tf.keras.layers.Lambda()
exponential_layer = tf.keras.layers.Lambda(lambda x: tf.exp(x)) # Applies exp function to inputs

In [None]:
# Recreate the Dense layer (simplified)
class MyDense(tf.keras.layers.Layer):
  def __init__(self, n_units, activation=None, **kwargs):
    super().__init__(**kwargs)
    self.n_units = n_units
    self.activation = tf.keras.activations.get(activation)

  def build(self, batch_input_shape):
    self.kernel = self.add_weight(
        name="kernel", shape=[batch_input_shape[-1], self.n_units],
        initializer="gloro_normal")
    self.bias = self.add_weight(name="bias", shape=[self.n_units], initializer="zeros")

  def call(self, X):
    return self.activation(X @ self.kernel + self.bias)

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "units": self.n_units, "activation": tf.keras.activations.serialize(self.activation)}

In [None]:
dense = MyDense(54)
dense.get_config()

{'name': 'my_dense',
 'trainable': True,
 'dtype': {'module': 'keras',
  'class_name': 'DTypePolicy',
  'config': {'name': 'float32'},
  'registered_name': None},
 'units': 54,
 'activation': 'linear'}

In [None]:
# Create multiple input layer
class MyMultiLayer(tf.keras.layers.Layer):
  def call(self, X):
    X1, X2 = X # Takes 2 inputs
    return X1 + X2, X1 * X2, X1 / X2 # Returns 3 outputs

In [None]:
# Create layer that adds Gaussian noise (for regularization) during training, but does nothing during training
class MyGaussianNoise(tf.keras.layers.Layer):
  def __init__(self, stddev, **kwargs):
    super().__init__(**kwargs)
    self.stddev = stddev

  def call(self, X, training=False):
    if training:
      noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
      return X + MyGaussianNoise
    else:
      return X

In [None]:
# Recreate model from Figure 12-3
class ResidualBlock(tf.keras.layers.Layer): # First create ResidualBlock layer
  def __init__(self, n_layers, n_neurons, **kwargs):
    super().__init__(**kwargs)
    self.hidden = [tf.keras.layers.Dense(n_neurons, activation="relu",
                                         kernel_initializer="he_normal")
    for _ in range(n_layers)]

    def call(self, inputs):
      Z = inputs
      for layer in self.hidden:
        Z = layer(Z)
      return inputs + Z

    def get_config():
      base_config = super().get_config()
      return {**base_config, "n_layers": n_layers, "n_neurons": n_neurons}

# Now create model itself
class ResidualRegressor(tf.keras.Model):
  def __init__(self, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.hidden1 = tf.keras.layers.Dense(30, activation="relu",
                                         kernel_initializer="he_normal")
    self.block1 = ResidualBlock(2, 30)
    self.block2 = ResidualBlock(2, 30)
    self.out = tf.keras.layers.Dense(output_dim)

  def call(self, inputs):
    Z = self.hidden1(inputs)
    for _ in range(1 + 3):
      Z = self.block1(Z)
    Z = self.block2(Z)
    return self.out(Z)

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "output_dim": self.out}

In [None]:
residual_reg = ResidualRegressor(2)
residual_reg.compile(loss=HuberLoss(1.5), optimizer="Nadam")

NameError: name 'HuberLoss' is not defined

In [None]:
residual_reg.get_config()

{'name': 'residual_regressor',
 'trainable': True,
 'dtype': {'module': 'keras',
  'class_name': 'DTypePolicy',
  'config': {'name': 'float32'},
  'registered_name': None},
 'output_dim': <Dense name=dense_6, built=False>}

In [None]:
residual_reg.summary()

In [None]:
# Let's build a model with a custom reconstruction loss and metrics
class ReconstructingRegressor(tf.keras.Model):
  def __init__(self, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.hidden = [tf.keras.layers.Dense(30, activation="relu",
                                         kernel_initializer="he_normal") for _ in range(5)]
    self.out = tf.keras.layers.Dense(output_dim)
    self.reconstruction_mean = tf.keras.metrics.Mean(
        name="reconstruction_error")

  def build(self, batch_input_shape):
    n_inputs = batch_input_shape[-1]
    self.reconstruct = tf.keras.layers.Dense(n_inputs)

  def call(self, inputs, training=False):
    Z = inputs
    for layer in self.hidden:
      Z = layer(Z)
    reconstruction = self.reconstruct(Z)
    recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
    self.add_loss(0.05 * recon_loss)
    if training:
      result = self.reconstruction_mean(recon_loss)
      self.add_metric(result)
    return self.out(Z)

In [None]:
rec_reg_model = ReconstructingRegressor(1)

In [None]:
rec_reg_model.summary()

### Computing Gradients using Autodiff

Evaluating derivatives (gradients) numerically

In [None]:
def f(w1, w2):
  return 3 * w1 ** 2 + 2 * w1 * w2

# Reverse-mode autodiff with TensorFlow
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape: # tf.GradientTape() automatically records every operation that involves a variable
  z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2]) # don't call it twice

In [None]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

## Custom Training Loops

In [165]:
X_train = np.random.randint(5, size=(1000, 4, 4, 3))
y_train = np.random.randint(4, size=(1000, 1))

X_train_scaled = X_train / 5

In [None]:
X_test = np.random.randint(5, size=(100, 4, 4, 3))
y_test = np.random.randint(4, size=(100, 1))

In [106]:
# Let's build a simple model.
# We don't need to compile it, because we do the training loop manually
l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    tf.keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

# Generate random batch
def random_batch(X, y, batch_size=32):
  idx = np.random.randint(len(X), size=batch_size) # generate random indecies
  return X[idx], y[idx]

# Display training status
def print_status_bar(step, total, loss, metrics=None):
  metrics = " - ".join([f"{m.name}: {m.result():.4f}" for m in [loss] + (metrics or [])])
  end = "" if step < total else "\n"
  print(f"\r{step}/{total} - {metrics}", end=end)

# Define hyperparameters, optimizer, loss and metrics
n_epochs = 5
batch_size = 128
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.MeanSquaredError()
mean_loss = tf.keras.metrics.Mean(name="mean_loss")
metrics = [tf.keras.metrics.MeanAbsoluteError()]

In [None]:
# Now write training loop itself
for epoch in range(1, n_epochs + 1):
  print("Epoch {}/{}".format(epoch, n_epochs))
  for step in range(1, n_steps + 1):
    X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size=batch_size)
    with tf.GradientTape() as tape:
      y_pred = model(X_batch, training=True) # Do the forward pass... (set training=True)
      main_loss = tf.reduce_mean(loss_fn(y_batch.squeeze(), y_pred)) # Calculate the loss (then compute mean of all losses with .reduce_mean())
      loss = tf.add_n([main_loss] + model.losses) # Sum the losses

    gradients = tape.gradient(loss, model.trainable_variables) # Compute gradients of the loss with regard to each trainable variable.
    optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Optimizer step. Apply gradients to optimizer

    for variable in model.variables: # Add weight constraints (optional)
      if variable.constraint is not None:
        variable.assign(variable.constraint(variable))

    mean_loss(loss) # Update mean loss
    for metric in metrics: # Update metrics
      metric(y_batch.squeeze(), y_pred) # With .squeeze(), the dimensions of size 1 are removed.

    print_status_bar(step, n_steps, mean_loss, metrics) # Display the train status

  for metric in [mean_loss] + metrics: # Reset mean loss and the metrics
    metric.reset_state()

Epoch 1/5
31/31 - mean_loss: 2.0220 - mean_absolute_error: 0.9896
Epoch 2/5
31/31 - mean_loss: 1.9456 - mean_absolute_error: 0.9748
Epoch 3/5
31/31 - mean_loss: 1.9280 - mean_absolute_error: 0.9891
Epoch 4/5
31/31 - mean_loss: 1.8657 - mean_absolute_error: 0.9785
Epoch 5/5
31/31 - mean_loss: 1.8206 - mean_absolute_error: 0.9758


In [None]:
np.random.randint(5, size=3)

array([0, 3, 2])

## TensoFlow Functions and Graphs

In [2]:
# Create a random function
def square(x):
  return x ** 2

square(tf.constant(2.))

<tf.Tensor: shape=(), dtype=float32, numpy=4.0>

In [3]:
# Convert the random function to a TensorFlow function with tf.function(). It will convert it to equivalent computation graph
# TF functions are faster than default python functions!
tf_square = tf.function(square)
print(tf_square)

# OR use a decorator @tf.function
@tf.function
def tf_square_dec(x):
  return x ** 2

<tensorflow.python.eager.polymorphic_function.polymorphic_function.Function object at 0x7babe0b06bd0>


##### Use `jit_compile=True` when calling `tf.function()` or `.compile()`, if you want to use XLA (reduces RAM usage and fuses multiple computations in a single kernel)

In [None]:
# Now it will return only tensors
tf_square(2.), tf_square_dec(2.)

(<tf.Tensor: shape=(), dtype=float32, numpy=4.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=4.0>)

In [None]:
# You still have access to your origin function
tf_square.python_function(2)

4

In [7]:
# Check the source code of the TensorFlow Function with tf.autograph.to_code()
tf.autograph.to_code(tf_square.python_function)

"def tf__square(x):\n    with ag__.FunctionScope('square', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:\n        do_return = False\n        retval_ = ag__.UndefinedReturnValue()\n        try:\n            do_return = True\n            retval_ = ag__.ld(x) ** 2\n        except:\n            do_return = False\n            raise\n        return fscope.ret(retval_, do_return)\n"

# Exercises

In [13]:
# 3.
import numpy as np

tf.range(10), tf.constant(np.arange(10)) # Difference is dtype.

(<tf.Tensor: shape=(10,), dtype=int32, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)>,
 <tf.Tensor: shape=(10,), dtype=int64, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])>)

### 12.

In [56]:
# 12.
class LayerNorm(tf.keras.layers.Layer):
  def __init__(self, eps=0.001, **kwargs):
    super().__init__(**kwargs),
    self.eps = eps

  # 12. (a)
  def build(self, input_shape):
    self.alpha = self.add_weight(name="alpha", shape=input_shape[-1:], initializer="ones",
            trainable=True, dtype=tf.float32)
    self.beta = self.add_weight(name="beta", shape=input_shape[-1:], initializer="zeros",
                                trainable=True, dtype=tf.float32)

  # 12. (b)
  def call(self, X):
    mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
    return self.alpha * (X - mean) / (tf.sqrt(variance + self.eps)) + self.beta

  def get_config(self):
    base_config = super().get_config()
    return base_config | {"eps": self.eps}

In [73]:
X_train = np.random.randn(10, 4, 4, 3)

In [90]:
# 12. (c)
input_data = np.array([5., 3., 6.], dtype='float32')

original_norm_layer = tf.keras.layers.LayerNormalization()
original_out = original_norm_layer(X_train)

custom_norm_layer = LayerNorm(0.001)
custom_out = custom_norm_layer(X_train)

# Compare
tf.sqrt(tf.keras.losses.MeanAbsoluteError()(original_out, custom_out))

<tf.Tensor: shape=(), dtype=float32, numpy=0.00022276003437582403>

### 13. (a)

In [187]:
# Load Fashion MNIST dataset
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

In [188]:
# Define validation set
X_val, y_val = X_train[-11000:], y_train[-11000:]
X_train, y_train = X_train[:-11000], y_train[:-11000]

In [189]:
X_train_scaled, X_val_scaled = X_train / 255., X_val / 255.

In [193]:
model = tf.keras.Sequential([
    tf.keras.layers.Input((28, 28)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(15, activation="relu"),
    tf.keras.layers.Dense(25, activation="relu"),
    tf.keras.layers.Dense(10, activation="softmax")
])

In [217]:
def display_status(step, n_steps, loss, mean_accuracy):
  print(f"\r {step} / {n_steps} | mean_loss: {loss:.4f} | mean_acc: {mean_accuracy:.2f}", end='')

# Generate random batch
def random_batch(X, y, batch_size=32):
  idx = np.random.randint(len(X), size=batch_size) # generate random indecies
  return X[idx], y[idx]


n_epochs = 5
batch_size = 64
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.Nadam(learning_rate=0.001)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
mean_loss = tf.keras.metrics.Mean()
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]

In [219]:
# Training Loop (a)
for epoch in range(1, n_epochs + 1):
  print(f"{epoch} / {n_epochs} epochs")
  for step in range(1, n_steps + 1):
    X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size=batch_size)
    with tf.GradientTape() as tape:
      y_preds = model(X_batch, training=True)
      main_loss = tf.reduce_mean(loss_fn(y_batch, y_preds))
      loss = tf.add_n([main_loss] + model.losses)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    mean_loss(loss)
    for metric in metrics:
      acc = metric(y_batch, y_preds)

    display_status(step=step, n_steps=n_steps, loss=loss, mean_accuracy=acc)


  y_val_preds = model(X_val)
  val_loss = tf.reduce_mean(loss_fn(y_val, y_val_preds))
  val_accuracy = tf.reduce_mean(metrics[0](y_val, y_val_preds))
  print(f" val_loss: {val_loss:.4f} | val_accuracy: {val_accuracy:.2f}")

  for metric in [mean_loss] + metrics:
    metric.reset_state()

1 / 5 epochs
 765 / 765 | mean_loss: 0.4094 | mean_acc: 0.86 val_loss: 80.8426742553711 | val_accuracy: 0.8492841720581055
2 / 5 epochs
 101 / 765 | mean_loss: 0.3423 | mean_acc: 0.86

KeyboardInterrupt: 

### 13. (b)

In [223]:
# Define lower & upper layers
lower_layers = tf.keras.Sequential([
    tf.keras.layers.Input((28, 28)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(15, activation="relu", kernel_initializer="he_normal"),
])

upper_layer = tf.keras.layers.Dense(10, activation="softmax")

model_b = lower_layers
model_b.add(upper_layer)

In [225]:
# Define optimizers for lower & upper layers
lower_optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.0001)
upper_optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

In [228]:
# Other stuff
loss = tf.keras.losses.SparseCategoricalCrossentropy()
mean_loss = tf.keras.metrics.Mean()
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]

In [231]:
# Training loop (b)
for epoch in range(1, n_epochs + 1):
  print(f"{epoch} / {n_epochs} epochs")
  for step in range(1, n_steps + 1):
    X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size=batch_size)
    with tf.GradientTape(persistent=True) as tape: # set persistent=True
      y_preds = model_b(X_batch, training=True)
      main_loss = tf.reduce_mean(loss_fn(y_batch, y_preds))
      loss = tf.add_n([main_loss] + model_b.losses)

    # Applying optimizers
    for layers, optimizer in ((lower_layers, lower_optimizer), (upper_layer, upper_optimizer)):
      gradients = tape.gradient(loss, layers.trainable_variables)
      optimizer.apply_gradients(zip(gradients, layers.trainable_variables))

    mean_loss(loss)
    for metric in metrics:
      acc = metric(y_batch, y_preds)

    display_status(step=step, n_steps=n_steps, loss=loss, mean_accuracy=acc)


  y_val_preds = model_b(X_val)
  val_loss = tf.reduce_mean(loss_fn(y_val, y_val_preds))
  val_accuracy = tf.reduce_mean(metrics[0](y_val, y_val_preds))
  print(f" val_loss: {val_loss:.4f} | val_accuracy: {val_accuracy:.2f}")

  for metric in [mean_loss] + metrics:
    metric.reset_state()

1 / 5 epochs
 765 / 765 | mean_loss: 0.4711 | mean_acc: 0.72 val_loss: 103.3705 | val_accuracy: 0.73
2 / 5 epochs
 92 / 765 | mean_loss: 0.6150 | mean_acc: 0.80

KeyboardInterrupt: 