# Chapter 12: Custom Models and Training with TensorFlow

This notebook contains the code reproductions and theoretical explanations for Chapter 12 of *Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow*.

## Chapter Summary

This chapter transitions from the high-level `tf.keras` API to TensorFlow's lower-level Python API. The main objective is to provide the tools and knowledge necessary for when you need extra control over your models and training loops.

Key topics covered include:

* **TensorFlow Basics:** Using TensorFlow like NumPy for tensor manipulation, understanding the difference between `tf.Tensor` (immutable) and `tf.Variable` (mutable), and exploring other data structures like ragged and sparse tensors.
* **Customizing Keras Components:** We learn how to create:
    * Custom Loss Functions
    * Custom Activation Functions, Initializers, Regularizers, and Constraints
    * Custom Metrics (including streaming/stateful metrics)
    * Custom Layers (stateless and stateful)
    * Custom Models (by subclassing `keras.Model`)
    * How to save and load models containing these custom components.
* **Autodiff and Custom Training Loops:** The chapter explains how to compute gradients automatically using `tf.GradientTape`. This is the foundation for building custom training loops, which gives you full control over the training process (e.g., using multiple optimizers for different parts of a model).
* **TF Functions and Graphs:** Finally, it covers `tf.function`, a powerful tool that converts Python functions into high-performance TensorFlow computation graphs. It explains how AutoGraph and Tracing work to make this conversion, and the rules you must follow to ensure your functions are convertible.

## A Quick Tour of TensorFlow

### Theoretical Explanation

TensorFlow is an open-source library for numerical computation, fine-tuned for large-scale Machine Learning.

Its main features include:
* **NumPy-like Core:** Its core API is similar to NumPy's but with GPU support.
* **Distributed Computing:** It can run on multiple devices and servers.
* **JIT Compiler:** It includes a just-in-time (JIT) compiler that optimizes computation graphs for speed and memory.
* **Portability:** You can train a model in one environment (like Python on Linux) and run it in another (like Java on an Android device).
* **Autodiff:** It implements automatic differentiation (which we'll explore) and provides excellent optimizers.

While `tf.keras` is the high-level API, TensorFlow also provides a powerful lower-level API. We'll start by exploring how to use TF like NumPy.

## Using TensorFlow like NumPy

### Tensors and Operations

A `tf.Tensor` is the primary data structure in TensorFlow. It is very similar to a NumPy `ndarray`: usually a multidimensional array, though it can also hold a scalar. Tensors are **immutable**.

In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

# Create tensors
tf.constant([[1., 2., 3.], [4., 5., 6.]]) # matrix

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [2]:
tf.constant(42) # scalar

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [3]:
# Check shape and dtype
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t.shape

TensorShape([2, 3])

In [4]:
t.dtype

tf.float32

In [5]:
# Indexing (works like NumPy)
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [6]:
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [7]:
# Operations
t + 10 # equivalent to tf.add(t, 10)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [8]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [9]:
# Matrix multiplication
t @ tf.transpose(t) # equivalent to tf.matmul(t, tf.transpose(t))

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

### Keras's Low-Level API

`tf.keras` also has its own low-level API in `keras.backend`. These functions generally just call the corresponding TensorFlow operations.

In [10]:
K = keras.backend
K.square(K.transpose(t)) + 10

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
       [14., 35.],
       [19., 46.]], dtype=float32)>

### Tensors and NumPy

Tensors and NumPy arrays work together nicely. You can create a tensor from a NumPy array and vice-versa. You can even apply TF ops to NumPy arrays and NumPy ops to tensors.

**Note:** NumPy uses 64-bit precision by default, while TensorFlow uses 32-bit precision. 32-bit precision is generally more than enough for neural networks, and it runs faster and uses less RAM. When you create a tensor from a NumPy array, you should generally set `dtype=tf.float32`.

In [11]:
a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [12]:
t.numpy() # Convert a tensor to a NumPy array

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [13]:
tf.square(a) # Apply TF op to NumPy array

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [14]:
np.square(t) # Apply NumPy op to a tensor

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)
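
The precision note above can be made concrete with a short sketch. It assumes the same array `a` as in the cells above; the names `t64`, `t32`, and `t32_cast` are illustrative only.

```python
import numpy as np
import tensorflow as tf

a = np.array([2., 4., 5.])              # NumPy defaults to float64

t64 = tf.constant(a)                    # dtype is inherited from NumPy: float64
t32 = tf.constant(a, dtype=tf.float32)  # explicit 32-bit conversion at creation
t32_cast = tf.cast(t64, tf.float32)     # or cast an existing tensor

print(t64.dtype, t32.dtype, t32_cast.dtype)
```

Either route gives a `float32` tensor that can be combined with other TensorFlow values without raising a type error.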

### Type Conversions

TensorFlow is very strict about types. It **does not perform automatic type conversions** as this can hurt performance. Operations between tensors of incompatible types will raise an exception.

In [15]:
try:
    tf.constant(2.) + tf.constant(40)
except tf.errors.InvalidArgumentError as e:
    print(e)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2] name: 


In [16]:
try:
    tf.constant(2.) + tf.constant(40., dtype=tf.float64)
except tf.errors.InvalidArgumentError as e:
    print(e)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2] name: 


In [17]:
# We must cast explicitly
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

### Variables

`tf.Tensor` values are immutable. If you need a value that can be modified (like the weights of a neural network), you must use a `tf.Variable`.

In [18]:
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [19]:
# Modify a variable in-place
v.assign(2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [20]:
v[0, 1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [21]:
v[:, 2].assign([0., 1.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [22]:
# This will fail, direct item assignment is not supported
# try:
#     v[1, 2] = 100
# except TypeError as e:
#     print(e)

In [23]:
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

### Other Data Structures

TensorFlow supports several other data structures:
* **Sparse Tensors (`tf.SparseTensor`):** Efficiently represent tensors with mostly zeros.
* **Tensor Arrays (`tf.TensorArray`):** Lists of tensors. Fixed size by default, but can be dynamic.
* **Ragged Tensors (`tf.RaggedTensor`):** Represent lists of tensors that share the same rank and data type but may differ in size along one or more *ragged* dimensions (e.g., sentences of different lengths).
* **String Tensors (`tf.string`):** Tensors of byte strings. `tf.strings` provides ops for them.
* **Sets (`tf.sets`):** Represented as regular tensors (or sparse tensors).
* **Queues (`tf.queue`):** e.g., `FIFOQueue`.
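
As a rough illustration of three of the structures listed above, the following sketch builds a sparse tensor, a ragged tensor, and a string tensor. The values are made up for the example.

```python
import tensorflow as tf

# Sparse tensor: only the non-zero entries are stored.
s = tf.SparseTensor(indices=[[0, 1], [1, 0], [2, 3]],
                    values=[1., 2., 3.],
                    dense_shape=[3, 4])
dense = tf.sparse.to_dense(s)

# Ragged tensor: rows may have different lengths.
r = tf.ragged.constant([[1, 2, 3], [4], [5, 6]])
row_lengths = r.row_lengths()
padded = r.to_tensor()  # pads the shorter rows with zeros

# String tensor: byte strings, with ops in tf.strings.
words = tf.constant(["café", "tensor"])
n_chars = tf.strings.length(words, unit="UTF8_CHAR")

print(dense.numpy())
print(row_lengths.numpy(), padded.shape)
print(n_chars.numpy())
```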

## Customizing Models and Training Algorithms

This section covers how to create custom losses, metrics, layers, and models.

### Custom Loss Functions

Let's say we want to use the Huber loss. We can just define it as a Python function.

In [24]:
# Setup: Load and prepare the California Housing dataset (from Ch 10)
# This is needed to have data to compile and fit a model.
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target, random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

# Define a simple model
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
    keras.layers.Dense(1)
])


In [25]:
# Code reproduction from Chapter 12 starts here

def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss  = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

# Now we can compile the model with this custom loss
model.compile(loss=huber_fn, optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
363/363 ━━━━━━━━━━━━━━━━━━━━ 5s 7ms/step - loss: 0.9529 - val_loss: 0.2605
Epoch 2/2
363/363 ━━━━━━━━━━━━━━━━━━━━ 1s 3ms/step - loss: 0.2554 - val_loss: 0.1988


<keras.src.callbacks.history.History at 0x79ad7ae33e90>

### Saving and Loading Models That Contain Custom Components

When you save a model, Keras saves the *name* of the loss function. When you load it, you must provide a dictionary mapping the name to the actual function.

In [26]:
model.save("my_model_with_custom_loss.h5")



In [27]:
model = keras.models.load_model(
    "my_model_with_custom_loss.h5",
    custom_objects={"huber_fn": huber_fn}
)



What if your custom function has hyperparameters (like the `threshold` in Huber loss)? The threshold will not be saved.

One way is to create a function that returns a configured loss function:

In [28]:
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam")
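
To check what the thresholded Huber loss computes, here is a NumPy counterpart of the same piecewise formula. `huber_np` is a hypothetical helper written for this check, not part of Keras or the chapter's code.

```python
import numpy as np

def huber_np(error, threshold=1.0):
    """NumPy counterpart of the piecewise Huber formula (illustrative helper)."""
    abs_error = np.abs(error)
    squared = error ** 2 / 2                             # used when |error| < threshold
    linear = threshold * abs_error - threshold ** 2 / 2  # used otherwise
    return np.where(abs_error < threshold, squared, linear)

errors = np.array([0.5, 2.0, 3.0])
losses = huber_np(errors, threshold=2.0)
print(losses)
```

At `|error| == threshold` both branches give `threshold**2 / 2`, so the loss is continuous where it switches from quadratic to linear.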

In [29]:
model.save("my_model_with_custom_loss_threshold_2.h5")



In [30]:
# You must specify the threshold value when loading
model = keras.models.load_model(
    "my_model_with_custom_loss_threshold_2.h5",
    custom_objects={"huber_fn": create_huber(2.0)}
)



A better way is to subclass `keras.losses.Loss` and implement the `get_config()` method. This allows Keras to save and load the hyperparameter.

In [31]:
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [32]:
model.compile(loss=HuberLoss(2.), optimizer="nadam")

In [33]:
model.save("my_model_with_custom_loss_class.h5")



In [34]:
# When loading, Keras will call the from_config() method,
# which creates an instance and passes the config to the constructor.
model = keras.models.load_model(
    "my_model_with_custom_loss_class.h5",
    custom_objects={"HuberLoss": HuberLoss}
)



### Custom Activation Functions, Initializers, Regularizers, and Constraints

Most of these can be implemented as simple functions. Keras will handle them correctly.

In [35]:
def my_softplus(z): # equivalent to tf.nn.softplus(z) or keras.activations.softplus
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype=tf.float32): # equivalent to keras.initializers.glorot_normal
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights): # equivalent to keras.regularizers.l1(0.01)
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights): # equivalent to keras.constraints.nonneg() or tf.nn.relu
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

In [36]:
# These custom functions can be used normally:
layer = keras.layers.Dense(30, activation=my_softplus,
                           kernel_initializer=my_glorot_initializer,
                           kernel_regularizer=my_l1_regularizer,
                           kernel_constraint=my_positive_weights)

If your custom component needs to save its state (like hyperparameters), you should subclass the appropriate Keras class, such as `keras.regularizers.Regularizer`, `keras.constraints.Constraint`, `keras.initializers.Initializer`, or `keras.layers.Layer`.

In [37]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor": self.factor}
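
The sketch below shows the round trip that `get_config()` enables. It repeats the class so the block is self-contained and assumes the base `Regularizer.from_config()` simply calls the constructor with the saved dictionary; the names `reg`, `clone`, and `penalty` are illustrative.

```python
import tensorflow as tf
from tensorflow import keras

class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor": self.factor}

reg = MyL1Regularizer(0.01)
config = reg.get_config()                    # plain dict holding the hyperparameter
clone = MyL1Regularizer.from_config(config)  # rebuilds an equivalent regularizer

weights = tf.constant([[1., -2.], [3., -4.]])
penalty = float(clone(weights))              # 0.01 * (1 + 2 + 3 + 4)
print(config, penalty)
```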

### Custom Metrics

**Theoretical Explanation:**

Losses and metrics are not the same:
* **Losses:** Used by Gradient Descent to train the model. Must be differentiable (mostly) and should not be 0 everywhere. Being easily interpretable is not required.
* **Metrics:** Used to evaluate a model. Must be interpretable. Can be non-differentiable or have 0 gradients.

In simple cases, you can define a metric just like a loss function. For example, our `huber_fn` could be used as a metric.

In [38]:
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

#### Streaming Metrics (Stateful Metrics)

Some metrics, like precision, cannot be simply averaged across batches. For example, if Batch 1 has 4/5 precision (80%) and Batch 2 has 0/3 precision (0%), the average is 40%. But the *overall* precision is (4+0)/(5+3) = 4/8 (50%).

For this, we need a **streaming metric** (or stateful metric) that can keep track of its state over multiple batches. Keras's built-in metrics, like `keras.metrics.Precision`, do this.
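
The arithmetic in the example above can be reproduced in a few lines of plain Python. This is only a sketch of the idea behind a streaming metric (accumulate counts, divide once), not how Keras implements it; the variable names are illustrative.

```python
# (true positives, predicted positives) for each batch, as in the example above
batches = [(4, 5), (0, 3)]

# Naive approach: average the per-batch precisions.
per_batch = [tp / pos for tp, pos in batches]
mean_of_batches = sum(per_batch) / len(per_batch)

# Streaming approach: accumulate the counts, then divide once at the end.
total_tp = sum(tp for tp, _ in batches)
total_pos = sum(pos for _, pos in batches)
overall = total_tp / total_pos

print(mean_of_batches, overall)
```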

In [39]:
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.800000011920929>

In [40]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])
# Note: the output is the overall precision *so far* (across both calls),
# not the mean of the per-batch precisions

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [41]:
precision.result() # This is the 50% we calculated

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [42]:
precision.variables # Shows the tracked true_positives and false_positives

[<Variable path=precision/true_positives, shape=(1,), dtype=float32, value=[4.]>,
 <Variable path=precision/false_positives, shape=(1,), dtype=float32, value=[4.]>]

To create your own streaming metric, you subclass `keras.metrics.Metric`.

In [45]:
class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # handles base args (e.g., dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold) # use the function we defined earlier
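        # Pass the name by keyword: in recent Keras releases the first
        # positional argument of add_weight() is the shape, not the name.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")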
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
    def reset_state(self):
        # Keras resets the metric at the start of each epoch.
        # (Older releases named this method `reset_states`.)
        self.total.assign(0.0)
        self.count.assign(0.0)

### Custom Layers

If your model needs an exotic layer, you can create a custom one.

**Stateless Layers:** For simple, stateless custom layers, you can use `keras.layers.Lambda`. For example, this layer applies the `tf.exp()` function.

In [46]:
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

**Stateful Layers:** For layers with weights (state), you must subclass `keras.layers.Layer`.
You need to implement:
* `__init__()`: Define hyperparameters.
* `build()`: Create the layer's variables (weights). This is called the first time the layer is used, when Keras knows the input shape.
* `call()`: Perform the layer's operations.
* `get_config()`: To allow the layer to be saved and loaded.
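
The lazy `build()` step described above can be observed with the built-in `Dense` layer, as a minimal sketch. The input shape `(2, 5)` is arbitrary.

```python
import tensorflow as tf
from tensorflow import keras

layer = keras.layers.Dense(3)
built_before = layer.built          # no weights yet: the input shape is unknown

outputs = layer(tf.ones((2, 5)))    # the first call triggers build()
built_after = layer.built
kernel_shape = tuple(layer.kernel.shape)  # (input features, units)
bias_shape = tuple(layer.bias.shape)

print(built_before, built_after, kernel_shape, bias_shape)
```

Deferring weight creation to `build()` is what lets you construct a layer without specifying the number of input features up front.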

In [47]:
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros")
        super().build(batch_input_shape) # must be at the end

    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

If a layer needs to have a different behavior during training and testing (e.g., `Dropout`, `BatchNormalization`), it must have a `training` argument in its `call()` method.

In [48]:
class MyGaussianNoise(keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape
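
To see the effect of the `training` argument without defining a new class, this sketch calls the built-in `Dropout` layer in both modes. The rate and input values are arbitrary.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras

dropout = keras.layers.Dropout(rate=0.5)
x = tf.ones((4, 5))

out_inference = dropout(x, training=False)  # identity: inputs pass through unchanged
out_training = dropout(x, training=True)    # some units zeroed, the rest scaled by 1 / (1 - rate)

print(np.unique(out_training.numpy()))
```

A custom layer such as `MyGaussianNoise` follows the same pattern: it only perturbs its inputs when `training` is true.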

### Custom Models

To create a custom model, you subclass `keras.Model`. This is very similar to creating a custom layer. This is useful for building complex architectures, such as models with skip connections (like a residual block).

First, let's define a residual block layer.

In [49]:
class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
                                          kernel_initializer="he_normal")
                      for _ in range(n_layers)]

    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

Now we can use this custom layer to build a model with the Subclassing API. The `Model` class is a subclass of `Layer`, but it provides extra features like `compile()`, `fit()`, `evaluate()`, etc.

In [50]:
class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu",
                                          kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3): # apply block1 four times, reusing the same weights
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

### Losses and Metrics Based on Model Internals

Sometimes you want a loss or metric to be based on the internal parts of a model (e.g., hidden layer activations or weights), not just on the predictions and labels. This is often used for regularization.

You can do this by computing the loss in the `call()` method of a custom model and adding it to the model's main loss by calling the `add_loss()` method.

In [51]:
class ReconstructingRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu",
                                          kernel_initializer="lecun_normal")
                      for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        # The reconstruction layer is created in build()

    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(batch_input_shape)

    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05 * recon_loss) # This is the custom internal loss
        return self.out(Z)

## Computing Gradients Using Autodiff

**Theoretical Explanation:**

To write a custom training loop, you need to be able to compute gradients manually. TensorFlow's autodiff feature, `tf.GradientTape`, makes this easy.

You create a `tf.GradientTape()` context, and it will automatically record every operation that involves a variable. Then, you can call its `gradient()` method to compute the gradients of a result (like the loss) with regard to the variables (like the model weights).

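This is far less tedious and error-prone than deriving the gradients by hand, and both more accurate and more efficient than a finite-difference approximation (numerical differentiation), which is only approximate and needs at least one extra call to the function per parameter.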
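
For comparison, here is a minimal finite-difference sketch in plain Python on the same two-variable function `f(w1, w2)` used in this section. The helper `finite_difference_gradient` and the step size `eps` are assumptions made for illustration.

```python
def f(w1, w2):
    return 3 * w1 ** 2 + 2 * w1 * w2

def finite_difference_gradient(func, w1, w2, eps=1e-6):
    """Approximate both partial derivatives; needs one extra call per parameter."""
    base = func(w1, w2)
    d_w1 = (func(w1 + eps, w2) - base) / eps
    d_w2 = (func(w1, w2 + eps) - base) / eps
    return d_w1, d_w2

approx_w1, approx_w2 = finite_difference_gradient(f, 5.0, 3.0)
exact_w1, exact_w2 = 6 * 5.0 + 2 * 3.0, 2 * 5.0  # analytic partials: 36 and 10
print(approx_w1, approx_w2)
```

The results are close to 36 and 10 but not exact, and the number of function calls grows with the number of parameters, which is impractical for a large neural network.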

In [52]:
def f(w1, w2):
    return 3 * w1 ** 2 + 2 * w1 * w2

In [53]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])
print(gradients)

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>, <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]


By default, the tape is automatically erased immediately after `gradient()` is called. If you need to call `gradient()` more than once, you must make the tape **persistent**.

In [54]:
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2) # This works now
del tape
print(dz_dw1, dz_dw2)

tf.Tensor(36.0, shape=(), dtype=float32) tf.Tensor(10.0, shape=(), dtype=float32)


The tape only tracks operations involving variables. If you want to compute gradients with respect to non-variable tensors (like constants), you must call `tape.watch()` on them.

In [55]:
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
print(gradients)

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>, <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]


You can stop gradients from backpropagating through some part of your network by using `tf.stop_gradient()`.

In [56]:
def f(w1, w2):
    return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)

with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])
print(gradients) # The gradient for w2 will be None

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]


If you run into numerical issues (like `NaN` gradients), you can provide a custom, numerically stable gradient function using the `@tf.custom_gradient` decorator.

In [57]:
@tf.custom_gradient
def my_better_softplus(z):
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        return grad / (1 + 1 / exp)
    return tf.math.log(exp + 1), my_softplus_gradients
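
The following sketch shows why the custom gradient helps. It compares a naive softplus with a custom-gradient version at a large input, where `exp(z)` overflows in `float32`. The function names `naive_softplus` and `stable_grad_softplus` are illustrative.

```python
import math
import tensorflow as tf

def naive_softplus(z):
    return tf.math.log(tf.exp(z) + 1.0)

@tf.custom_gradient
def stable_grad_softplus(z):
    exp = tf.exp(z)
    def grad_fn(grad):
        return grad / (1 + 1 / exp)   # stays finite even when exp overflows
    return tf.math.log(exp + 1), grad_fn

x = tf.constant(100.0)  # exp(100) overflows to inf in float32

with tf.GradientTape() as tape:
    tape.watch(x)
    y = naive_softplus(x)
naive_grad = tape.gradient(y, x)      # autodiff multiplies 0 by inf

with tf.GradientTape() as tape:
    tape.watch(x)
    y = stable_grad_softplus(x)
custom_grad = tape.gradient(y, x)

print(naive_grad.numpy(), custom_grad.numpy())
```

Note that only the gradient is fixed here: the function value itself still overflows for large inputs, so a fully robust version would also need to handle that case.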

## Custom Training Loops

**Theoretical Explanation:**

Sometimes, the `fit()` method is not flexible enough. For example, the Wide & Deep paper used two different optimizers, but `fit()` only uses one. In such rare cases, you can write your own training loop.

This process is as follows:
1.  Create two nested loops: one for epochs, one for batches.
2.  Get the batch of inputs and labels.
3.  Open a `tf.GradientTape()` context.
4.  Inside the context, make a prediction (the forward pass) and compute the loss.
5.  Outside the context, use the tape to compute the gradients of the loss with regard to the model's trainable variables.
6.  Pass these gradients to the optimizer, which performs a Gradient Descent step.
7.  Update your metrics and display the status.
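
The steps above can be sketched on a toy problem before applying them to a Keras model. This example fits a single weight to made-up data for `y = 3x` and, as a simplifying assumption, replaces the optimizer with a manual `assign_sub()` update.

```python
import tensorflow as tf

# Toy data for y = 3x; the "model" is a single trainable weight.
X = tf.constant([1., 2., 3., 4.])
y = tf.constant([3., 6., 9., 12.])
w = tf.Variable(0.0)
learning_rate = 0.05

for epoch in range(5):                      # loop over epochs
    for step in range(10):                  # loop over batches (here: the full toy set)
        with tf.GradientTape() as tape:
            y_pred = w * X                  # forward pass
            loss = tf.reduce_mean(tf.square(y - y_pred))
        grad = tape.gradient(loss, w)       # computed outside the tape context
        w.assign_sub(learning_rate * grad)  # plain Gradient Descent step
    print("epoch", epoch, "loss", float(loss))
```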

In [58]:
# Set up a simple model with L2 regularization on both layers
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal",
                       kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [59]:
# Helper function to get a random batch of data
def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

# Helper function to print the status
def print_status_bar(iteration, total, loss, metrics=None):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                         for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics,
          end=end)

In [65]:
# Define hyperparameters and components
n_epochs = 5
batch_size = 32
n_steps = len(X_train_scaled) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = tf.keras.losses.MeanSquaredError()
mean_loss = tf.keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

In [67]:
# The custom training loop
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        # 1. Get the batch
        X_batch, y_batch = random_batch(X_train_scaled, y_train)

        # 2. Open the tape
        with tf.GradientTape() as tape:
            # 3. Make prediction and compute loss
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)

        # 4. Compute gradients
        gradients = tape.gradient(loss, model.trainable_variables)

        # 5. Apply gradients
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # Handle weight constraints (if any)
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))

        # 6. Update metrics and display status
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)

    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        # Reset the metric state at the end of each epoch. Recent Keras
        # releases name this method reset_state() (formerly reset_states()).
        metric.reset_state()

Epoch 1/5
11610/11610 - mean: 0.6610 - mean_absolute_error: 0.5152
Epoch 2/5
11610/11610 - mean: 0.6268 - mean_absolute_error: 0.5126
Epoch 3/5
11610/11610 - mean: 0.6128 - mean_absolute_error: 0.5113
Epoch 4/5
11610/11610 - mean: 0.6405 - mean_absolute_error: 0.5179
Epoch 5/5
11610/11610 - mean: 0.6281 - mean_absolute_error: 0.5130


## TensorFlow Functions and Graphs

**Theoretical Explanation:**

While TensorFlow 2.x runs in *eager mode* by default (like regular Python), its real power comes from its graph features. When you decorate a Python function with `@tf.function`, TensorFlow "converts" it into a **TF Function**.

1.  **AutoGraph:** The first time you call the TF Function with a given input signature, TensorFlow analyzes the Python source code to capture control flow statements (`if`, `for`, `while`) and generates an upgraded version of the function in which they are replaced by TensorFlow graph operations (e.g., `tf.cond()`, `tf.while_loop()`).
2.  **Tracing:** TensorFlow then calls this upgraded function in *graph mode*, meaning each TF operation adds a node to a computation graph. The arguments are replaced by symbolic tensors (placeholders with a shape and dtype, but no value).
3.  **Optimization:** TensorFlow then optimizes this graph (e.g., pruning unused nodes).
4.  **Execution:** On subsequent calls *with the same input signature* (i.e., same argument types and shapes), TensorFlow uses the optimized graph to run the computations, which is much faster than running the Python code.

Keras automatically converts your model's `call()` method and all custom components (losses, metrics, etc.) into TF Functions for you.

In [68]:
def cube(x):
    return x ** 3

# Calling it with a Python number
print(cube(2))

8


In [69]:
# Calling it with a tensor
print(cube(tf.constant(2.0)))

tf.Tensor(8.0, shape=(), dtype=float32)


In [70]:
# Creating a TF Function
tf_cube = tf.function(cube)
print(tf_cube)

<tensorflow.python.eager.polymorphic_function.polymorphic_function.Function object at 0x79ad7a452c30>


In [71]:
print(tf_cube(2)) # Returns a tensor

tf.Tensor(8, shape=(), dtype=int32)


In [72]:
# You can also use it as a decorator
@tf.function
def tf_cube(x):
    return x ** 3

# Check the original Python function
tf_cube.python_function(2)

8

### AutoGraph and Tracing

If you add a `print()` statement to a TF function, it will only run during tracing, not during execution. This is because `print()` is a Python side effect, not a TF operation.

In [73]:
@tf.function
def tf_cube(x):
    print("x =", x)
    return x ** 3

result = tf_cube(tf.constant(2.0))
print(result)

x = Tensor("x:0", shape=(), dtype=float32)
tf.Tensor(8.0, shape=(), dtype=float32)


In [74]:
result = tf_cube(tf.constant(3.0)) # No print, reuses the graph

In [75]:
result = tf_cube(tf.constant(4.0)) # No print, reuses the graph

In [76]:
# Each distinct Python value counts as a new signature, so both calls below trace again
result = tf_cube(2)
result = tf_cube(3)

x = 2
x = 3


In [77]:
# A new shape also creates a new signature
result = tf_cube(tf.constant([[1., 2.]]))

x = Tensor("x:0", shape=(1, 2), dtype=float32)


In [78]:
# Another new shape triggers another trace, as the printed shape=(2, 2) shows
result = tf_cube(tf.constant([[3., 4.], [5., 6.]]))



x = Tensor("x:0", shape=(2, 2), dtype=float32)
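
If retracing for every new shape is undesirable, one option is to give the TF Function an explicit `input_signature` with a `None` dimension. The sketch below counts traces with a Python list; `tf_cube_batch` and `trace_log` are names chosen for this illustration.

```python
import tensorflow as tf

trace_log = []  # Python list: appended to only while tracing

@tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
def tf_cube_batch(x):
    trace_log.append(x.shape)   # Python side effect, runs during tracing only
    return x ** 3

out1 = tf_cube_batch(tf.constant([[1., 2.]]))
out2 = tf_cube_batch(tf.constant([[3., 4.], [5., 6.]]))
n_traces = len(trace_log)
print(n_traces, trace_log)
```

Both calls reuse the same graph because their shapes match `[None, 2]`. The trade-off is that inputs with any other shape or dtype are rejected instead of being traced.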


### TF Function Rules

To ensure your function can be converted, you must follow some rules:

1.  **Use TF ops:** Only TensorFlow constructs (tensors, ops, variables) can be part of the graph. If you use external libraries (like NumPy) or Python functions (like `print()`), they will **only run during tracing**, not during graph execution.
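2.  **`tf.Variable` creation:** If a TF Function creates a variable, it must do so on the very first call, and only then, or an exception is raised. It is usually preferable to create variables *outside* the TF Function (e.g., in the `__init__()` or `build()` method of a custom layer). To modify a variable, use its `.assign()` method, not the `=` operator.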
3.  **Source Code:** TensorFlow needs access to the function's source code to use AutoGraph.
4.  **Loops:** Use `tf.range()` instead of Python's `range()` if you want the loop to be *dynamic* (part of the graph). If you use `range()`, the loop will be *static* (unrolled during tracing).
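
A small sketch can tie several of these rules together: the variable is created outside the TF Function, updated with `assign_add()`, and the loop uses `tf.range()` so that it becomes part of the graph. The names `counter`, `trace_log`, and `add_n` are illustrative.

```python
import tensorflow as tf

counter = tf.Variable(0)   # created once, outside the TF Function
trace_log = []             # Python-side record of tracing

@tf.function
def add_n(n):
    trace_log.append("traced")   # Python code: runs during tracing only
    for _ in tf.range(n):        # dynamic loop: captured as a graph loop
        counter.assign_add(1)    # assign_add(), not `counter = counter + 1`
    return counter.read_value()

first = add_n(tf.constant(5))
second = add_n(tf.constant(3))
print(int(first), int(second), len(trace_log))
```

Because `n` is a tensor and the loop is dynamic, the second call reuses the graph from the first one even though the number of iterations differs.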