# Custom Models and Training with TensorFlow

In [17]:
import tensorflow as tf
import numpy as np
from tensorflow import keras

## Using TensorFlow like NumPy

TensorFlow's API revolves around *tensors*, which flow from operation to operation, hence the name Tensor*flow*. A tensor is usually a multidimensional array (much like a NumPy `ndarray`), but it can also hold a scalar (a simple value, such as 42).

These tensors will be important when we create custom cost functions, custom metrics, custom layers, and more. Let's see how to create and manipulate them.

### Tensors and Operations

We can create a tensor with `tf.constant()`.

In [18]:
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])  # a matrix with two rows and three columns of floats
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [19]:
tf.constant(42) #scalar

<tf.Tensor: shape=(), dtype=int32, numpy=42>

Just like `ndarray`, a `tf.Tensor` has a shape and a datatype (dtype):

In [20]:
t.shape

TensorShape([2, 3])

In [21]:
t.dtype

tf.float32

Indexing works much like NumPy:

In [22]:
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [23]:
t[..., 1, tf.newaxis]  # ... stands for all leading dimensions; for this 2D tensor it is the same as :

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [24]:
t[:, 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

Most importantly, all sorts of tensor operations are available:

In [25]:
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [26]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [27]:
t @ tf.transpose(t)  # @ is matrix multiplication, equivalent to tf.matmul()

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

Note that writing `t + 10` is equivalent to calling `tf.add(t, 10)`.

### Tensors and NumPy

Tensors play nicely with NumPy: we can create a tensor from a NumPy array and vice versa. We can even apply TensorFlow operations to NumPy arrays and NumPy operations to tensors:

In [28]:
a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [29]:
t.numpy() # or np.array(t)

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [30]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [31]:
np.square(t)

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

**WARNING:**

Notice that NumPy uses 64-bit precision by default, while TensorFlow uses 32-bit. This is because 32-bit precision is generally more than enough for neural networks, plus it runs faster and uses less RAM. So when we create a tensor from a NumPy array, make sure to set `dtype=tf.float32`.
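
As a minimal sketch of this advice (the variable names `t64` and `t32` are only illustrative), a tensor created from a NumPy array inherits `float64` unless the dtype is set explicitly:

```python
import numpy as np
import tensorflow as tf

a = np.array([2., 4., 5.])               # NumPy defaults to float64
t64 = tf.constant(a)                     # the tensor inherits float64
t32 = tf.constant(a, dtype=tf.float32)   # explicitly request 32-bit floats

print(t64.dtype, t32.dtype)
```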

### Type Conversions

Type conversions can significantly hurt performance, and they can easily go unnoticed when they are done automatically. To avoid this, TensorFlow does not perform any type conversion automatically: it just raises an exception if we try to execute an operation on tensors with incompatible types. For example, we cannot add a float tensor and an integer tensor, and we cannot even add a 32-bit float and a 64-bit float:

In [32]:
tf.constant(2.) + tf.constant(40)

InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2] name: 

In [33]:
try:
    tf.constant(2.) + tf.constant(40)
except Exception as e:
    print(e)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2] name: 


In [34]:
try:
    tf.constant(2.) + tf.constant(40., dtype=tf.float64)
except Exception as e:
    print(e)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2] name: 


This may be a bit annoying at first, but remember that it's for a good cause! And of course we can use `tf.cast()` when we really need to convert types:

In [35]:
t2 = tf.constant(40, dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

### Variables

The `tf.Tensor` values we've seen so far are immutable: we cannot modify them. This means that we cannot use regular tensors to implement weights in neural networks, since they need to be tweaked by backpropagation. What we need is a `tf.Variable`:

In [36]:
v = tf.Variable([[1, 2, 3], [4, 5, 6]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6]], dtype=int32)>

A `tf.Variable` acts much like a `tf.Tensor`: we can perform the same operations with it, it plays nicely with NumPy as well, and it is just as picky with types. But it can also be modified in place using the `assign()` method (direct assignment will not work):

In [37]:
v.assign(2*v)
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=int32, numpy=
array([[ 2,  4,  6],
       [ 8, 10, 12]], dtype=int32)>

In [38]:
v[0,1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=int32, numpy=
array([[ 2, 42,  6],
       [ 8, 10, 12]], dtype=int32)>

In [39]:
v[:, 2].assign([0, 1])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=int32, numpy=
array([[ 2, 42,  0],
       [ 8, 10,  1]], dtype=int32)>

In [40]:
v.scatter_nd_update(indices=[[0,0], [1,2]], updates=[100, 200])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=int32, numpy=
array([[100,  42,   0],
       [  8,  10, 200]], dtype=int32)>
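
The remark that direct assignment does not work can be checked with a short sketch. The variable `w` below is a hypothetical stand-in, kept separate from `v` so the cells above are unaffected:

```python
import tensorflow as tf

w = tf.Variable([[1, 2, 3], [4, 5, 6]])  # hypothetical variable for this sketch

raised = False
try:
    w[0, 1] = 42            # item assignment is not supported on variables
except TypeError as e:
    raised = True
    print("TypeError:", e)

w[0, 1].assign(42)          # the supported way: assign() on the slice
print(w.numpy())
```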

**NOTE:**

In practice we will rarely have to create variables manually.

### Other Data Structures

- Sparse tensors (`tf.SparseTensor`) => Tensors containing mostly zeros
- Tensor arrays (`tf.TensorArray`) => Lists of tensors
- Ragged tensors (`tf.RaggedTensor`) => Lists of tensors that share the same rank and data type but may differ in size (for example, rows of different lengths)
- String tensors => Regular tensors of type `tf.string`
- Sets
- Queues
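
As a brief illustration of two of these structures (a sketch with made-up values, not an exhaustive tour), a ragged tensor can hold rows of different lengths, and a sparse tensor stores only its nonzero entries:

```python
import tensorflow as tf

# Ragged tensor: three rows with 3, 1 and 2 elements respectively
r = tf.ragged.constant([[1, 2, 3], [4], [5, 6]])
row_lengths = r.row_lengths().numpy().tolist()

# Sparse tensor: only two nonzero values in a 3x4 matrix
s = tf.SparseTensor(indices=[[0, 1], [2, 3]],
                    values=[1., 2.],
                    dense_shape=[3, 4])
dense = tf.sparse.to_dense(s)   # materialize the zeros for inspection

print(row_lengths)
print(dense.numpy())
```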

## Customizing Models and Training Algorithms

Let's start by creating a custom loss function, which is a simple and common use case.

First, we load and prepare the California housing dataset: we load it, split it into a training set, a validation set, and a test set, and finally scale it:

In [41]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

### Custom Loss Function

Suppose we want to train a linear regression model, but our training set is a bit noisy. Of course, we start by trying to clean up the dataset by removing or fixing the outliers, but that turns out to be insufficient; the dataset is still noisy.

Which loss function should we use? The mean squared error might penalize large errors too much and cause the model to be imprecise. The mean absolute error would not penalize outliers as much, but training might take a while to converge, and the trained model might not be very precise. This is probably a good time to use the Huber loss instead of the good old MSE.

The Huber loss is available in tf.keras (just use an instance of the `keras.losses.Huber` class), but let's pretend it's not there: implementing it is easy as pie!

Just create a function that takes the labels and predictions as arguments, and use TensorFlow operations to compute every instance's loss:

Huber loss, where $a$ is the prediction error and $\delta$ is the threshold:
$$
L_{\delta}(a) =
\begin{cases}
    \frac{1}{2} a^2 & \text{for } |a| \le \delta \\
    \delta \cdot \left(|a| - \frac{1}{2} \delta\right) & \text{otherwise}
\end{cases}
$$

In [42]:
def huber_loss_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1 # here the delta = 1
    squared_loss = tf.square(error) / 2
    linear_loss = 1 * (tf.abs(error) - (0.5 * 1))
    return tf.where(is_small_error, squared_loss, linear_loss) # returns squared loss when error is small, else returns linear loss

It is also preferable to return a tensor containing one loss per instance, rather than returning the mean loss. This way, Keras can apply class weights or sample weights when requested.
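
To make the piecewise formula concrete, here is a small NumPy re-implementation (a sketch for checking the arithmetic by hand, not the function used for training; `huber_np` is a hypothetical helper name). It returns one loss per instance, as recommended above:

```python
import numpy as np

def huber_np(y_true, y_pred, delta=1.0):
    """Per-instance Huber loss, computed with NumPy."""
    error = y_true - y_pred
    abs_error = np.abs(error)
    squared = 0.5 * error ** 2                  # used when |a| <= delta
    linear = delta * (abs_error - 0.5 * delta)  # used otherwise
    return np.where(abs_error <= delta, squared, linear)

y_true = np.array([0.0, 0.0, 0.0])
y_pred = np.array([0.5, 1.0, 3.0])
losses = huber_np(y_true, y_pred)
print(losses)  # [0.125 0.5   2.5  ]
```

Note that the two branches agree at $|a| = \delta$ (both give 0.5 here), which is what makes the loss continuous.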

Now we can use this loss when we compile the Keras model, then train our model. When compiling the model, just pass this function to `loss` argument like this:
```
model.compile(loss=huber_loss_fn, optimizer="nadam")
```

In [43]:
input_shape = X_train.shape[1:]  # (8,): eight input features

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                      input_shape=input_shape),
    keras.layers.Dense(1)
])

model.compile(loss=huber_loss_fn, optimizer="nadam", metrics=["mae"])

model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7f90e4c42090>

And that's it! For each batch during training, Keras will call the `huber_loss_fn()` function to compute the loss and use it to perform a Gradient Descent step. Moreover, it will keep track of the total loss since the beginning of the epoch, and it will display the mean loss.

But what happens to this custom loss when we save the model?

### Saving and Loading Models That Contain Custom Components

In [44]:
model.save("my_model_with_a_custom_loss.keras")

Saving a model containing a custom loss function works fine, as Keras saves the name of the function. Whenever we load it, we'll need to provide a dictionary that maps the function name to the actual function.

More generally, when we load a model containing custom objects, we need to map the names of the objects:

In [45]:
model = keras.models.load_model("my_model_with_a_custom_loss.keras", 
                               custom_objects={"huber_loss_fn": huber_loss_fn})

In [46]:
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7f90e49d1890>

With the current implementation, any error between -1 and 1 is considered "small". But what if we want a different threshold? One solution is to create a function that creates a configured loss function:

In [47]:
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold 
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

In [48]:
model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"])

In [49]:
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7f9104e0efd0>

In [50]:
model.save("my_model_with_custom_loss_threshold_2.keras")

Unfortunately, when we save the model, the `threshold` will not be saved. This means that we will have to specify the `threshold` value when loading the model (note that the name to use is "`huber_fn`", which is the name of the function we gave Keras, not the name of the function that created it):

In [51]:
model = keras.models.load_model("my_model_with_custom_loss_threshold_2.keras",
                               custom_objects={"huber_fn": create_huber(2.0)})

In [52]:
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7f9104ce6fd0>

We can solve this by creating a subclass of the `keras.losses.Loss` class, and then implementing its `get_config()` method:

In [53]:
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold 
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
        

Explanation of above code:

- The constructor accepts `**kwargs` and passes them to the parent constructor, which handles standard hyperparameters: the `name` of the loss and the `reduction` algorithm to use to aggregate the individual instance losses. By default, it is `"sum_over_batch_size"`, which means that the loss will be the sum of the instance losses, weighted by the sample weights, if any, and divided by the batch size (not by the sum of weights, so this is not the weighted mean). Other possible values are `"sum"` and `None`.

- The `call()` method takes the labels and predictions, computes all the instance losses, and returns them.

- The `get_config()` method returns a dictionary mapping each hyperparameter name to its value. It first calls the parent class’s `get_config()` method, then adds the new hyperparameters to this dictionary.
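
The difference between dividing by the batch size and dividing by the sum of the weights can be worked through with a few made-up numbers. This NumPy sketch only mimics the arithmetic described in the first bullet; it does not call Keras:

```python
import numpy as np

instance_losses = np.array([1.0, 2.0, 3.0, 4.0])  # hypothetical per-instance losses
sample_weights = np.array([1.0, 0.0, 2.0, 0.5])   # hypothetical sample weights

weighted = instance_losses * sample_weights        # [1., 0., 6., 2.]

sum_reduction = weighted.sum()                         # "sum": 9.0
sum_over_batch_size = weighted.sum() / len(weighted)   # 9.0 / 4 = 2.25
weighted_mean = weighted.sum() / sample_weights.sum()  # 9.0 / 3.5, for comparison

print(sum_reduction, sum_over_batch_size, weighted_mean)
```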

We can then use any instance of this class when we compile the model:

In [54]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

model.compile(loss=HuberLoss(2.0), optimizer="nadam", metrics=["mae"])

h = model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


When we save the model, the threshold will get saved along with it.

In [55]:
model.save("my_model_with_custom_loss_class.keras")

And when we load the model, we just need to map the class name to the class itself:

In [56]:
model = keras.models.load_model("my_model_with_custom_loss_class.keras",
                               custom_objects={"HuberLoss": HuberLoss})

When we save a model, Keras calls the loss instance's `get_config()` method and saves the config as JSON. When we load the model, it calls the `from_config()` class method on the `HuberLoss` class: this method is implemented by the base class (`Loss`) and creates an instance of the class, passing `**config` to the constructor.
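
The round trip described here can be sketched without Keras at all. The classes below (`ConfigurableLoss`, `ToyHuber`) are hypothetical stand-ins that only imitate the `get_config()` / `from_config()` pattern; they are not the Keras implementation:

```python
import json

class ConfigurableLoss:
    """Hypothetical stand-in for a base class with config support."""
    def __init__(self, name=None):
        self.name = name

    def get_config(self):
        return {"name": self.name}

    @classmethod
    def from_config(cls, config):
        # Rebuild an instance by passing the saved config to the constructor
        return cls(**config)

class ToyHuber(ConfigurableLoss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def get_config(self):
        # Extend the parent's config with the extra hyperparameter
        return {**super().get_config(), "threshold": self.threshold}

original = ToyHuber(2.0, name="toy_huber")
saved = json.dumps(original.get_config())           # what would be written to disk
restored = ToyHuber.from_config(json.loads(saved))  # what loading would do
print(restored.threshold, restored.name)
```

Because `threshold` is part of the config, it survives the round trip, which is exactly what the closure-based `create_huber()` approach could not offer.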

That's it for losses! Custom activation functions, initializers, regularizers, and constraints work in a similar way. Let's look at these now.

### Custom Activation Functions, Initializers, Regularizers and Constraints

Most Keras functionalities, such as losses, regularizers, constraints, initializers, metrics, activation functions, layers, and even full models, can be customized in very much the same way. Most of the time, we just need to write a simple function with the appropriate inputs and outputs.

Here are some examples:

Custom activation function `softplus`:

$$
 \text{softplus}(z) = \log(1 + e^z)
$$

`softplus` is a smooth, continuous approximation of ReLU.

In [57]:
def my_softplus(z):
    return tf.math.log(tf.exp(z) + 1.0) # same as tf.nn.softplus(z)

A custom Glorot Initializer:

$$
\begin{aligned}
 &\text{Normal distribution with mean 0 and variance } \sigma^2 = \frac{1}{\text{fan}_{\text{avg}}}, \\
 &\text{where } \text{fan}_{\text{avg}} = \frac{\text{fan}_{\text{in}} + \text{fan}_{\text{out}}}{2}
\end{aligned}
$$

In [58]:
def my_glorot_initializer(shape, dtype=tf.float32):
    # stddev = sqrt(1 / fan_avg) = sqrt(2 / (fan_in + fan_out))
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    # tf.random.normal draws random values from a normal distribution
    return tf.random.normal(shape=shape, mean=0.0, stddev=stddev, dtype=dtype)  # similar to keras.initializers.glorot_normal()
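
As a rough sanity check of the variance formula, the following NumPy sketch draws weights with the same standard deviation and compares the empirical variance with $1 / \text{fan}_{\text{avg}}$. The layer sizes (300 inputs, 100 outputs) are arbitrary values chosen for illustration:

```python
import numpy as np

rng = np.random.default_rng(42)
fan_in, fan_out = 300, 100              # hypothetical layer dimensions
fan_avg = (fan_in + fan_out) / 2

stddev = np.sqrt(1 / fan_avg)           # same as sqrt(2 / (fan_in + fan_out))
weights = rng.normal(0.0, stddev, size=(fan_in, fan_out))

# The empirical variance should be close to 1 / fan_avg
print(weights.var(), 1 / fan_avg)
```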

A custom $l_1$ regularizer:
$$
    Penalty = \lambda \cdot \sum_{i=1}^N |w_i|
$$

In [59]:
def my_l1_regularizer(weights):
    """
    L1 penalty with lambda = 0.01.

    tf.reduce_sum() computes the sum of all the values in the tensor.
    For example:
    t = [[1,1,1], [1,1,1]]
    tf.reduce_sum(t) ==> 6

    So here, tf.reduce_sum() acts as the sigma (summation) in the formula.
    """
    return tf.reduce_sum(tf.abs(0.01 * weights))  # equivalent to keras.regularizers.l1(0.01)

A custom constraint that ensures weights are all positive:

In [67]:
def my_positive_weights(weights):
    """
    tf.zeros_like creates a tensor with all elements set to zero
    """
    # return value is same as tf.nn.relu()
    return tf.where(weights < 0., tf.zeros_like(weights), weights) # eqv to keras.constraints.nonneg() or tf.nn.relu()

These custom functions can then be used normally:

In [68]:
layer = keras.layers.Dense(1, activation=my_softplus, kernel_initializer=my_glorot_initializer,
                          kernel_regularizer=my_l1_regularizer, 
                          kernel_constraint=my_positive_weights)

The activation function will be applied to the output of this `Dense` layer, and its result will be passed on to the next layer. The layer's weights will be initialized using the value returned by the initializer. At each training step the weights will be passed to the regularization function to compute the regularization loss, which will be added to the main loss to get the final loss used for training. Finally, the constraint function will be called after each training step, and the layer's weights will be replaced by the constrained weights.

In [70]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                      input_shape=input_shape),
    keras.layers.Dense(1, activation=my_softplus, kernel_initializer=my_glorot_initializer,
                          kernel_regularizer=my_l1_regularizer, 
                          kernel_constraint=my_positive_weights)
])

model.compile(loss="mse", optimizer="nadam", metrics=["mae"])

h = model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


In [71]:
model.save("my_model_with_many_custom_parts.keras")

In [72]:
model = keras.models.load_model("my_model_with_many_custom_parts.keras",
                               custom_objects={
                                   "my_l1_regularizer": my_l1_regularizer,
                                   "my_positive_weights": my_positive_weights,
                                   "my_glorot_initializer": my_glorot_initializer,
                                   "my_softplus": my_softplus
                               })

If a function has hyperparameters that need to be saved along with the model, then we will want to subclass the appropriate class, such as `keras.regularizers.Regularizer`, `keras.constraints.Constraint`, `keras.initializers.Initializer`, or `keras.layers.Layer` (for any layer, including activation functions). Much like we did for the custom loss, here's a simple class for $l_1$ regularization that saves its `factor` hyperparameter (this time we do not need to call the parent constructor or the parent's `get_config()` method, as they are not defined by the parent class):

In [73]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(self.factor * tf.abs(weights))
    def get_config(self):
        return {"factor": self.factor}

Note that we must implement the `call()` method for losses, layers (including activation functions), and models, or the `__call__()` method for regularizers, initializers, and constraints. For metrics, things are a bit different, as we will see shortly.

In [74]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                      input_shape=input_shape),
    keras.layers.Dense(1, activation=my_softplus, 
                       kernel_initializer=my_glorot_initializer,
                       kernel_regularizer=MyL1Regularizer(0.01),
                       kernel_constraint=my_positive_weights)
])

model.compile(loss="mse", optimizer="nadam", metrics=["mae"])

h = model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


In [78]:
model.save("my_model_with_many_custom_parts.keras")

In [79]:
model = keras.models.load_model("my_model_with_many_custom_parts.keras",
                               custom_objects={
                                   "MyL1Regularizer": MyL1Regularizer,
                                   "my_positive_weights": my_positive_weights,
                                   "my_glorot_initializer": my_glorot_initializer,
                                   "my_softplus": my_softplus
                               })

### Custom Metrics

In [84]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                      input_shape=input_shape),
    keras.layers.Dense(1)
])

In most cases, defining a custom metric function is exactly the same as defining a custom loss function. In fact, we could even use the Huber loss function we created earlier as a metric, and it would work just fine (although Huber loss is seldom used as a metric; MSE or MAE is usually preferred):

In [85]:
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

In [86]:
model.fit(X_train_scaled, y_train, epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7f90d5e48b10>

For each batch during training, Keras will compute this metric and keep track of its mean since the beginning of the epoch. Most of the time, this is exactly what we want. But not always!

Consider a binary classifier's precision, for example. As we saw in Chapter 3, precision is the number of true positives divided by the number of positive predictions (including both true and false positives). Suppose the model made five positive predictions in the first batch, four of which were correct: that's 80% precision.

Then suppose the model made three positive predictions in the second batch, but they were all incorrect: that's 0% precision for the second batch. If we compute the mean of these two precisions, we get 40%.

But wait a second, that's *not* the model's precision over these two batches! Indeed, there were a total of 4 true positives (4 + 0) out of 8 positive predictions (5 + 3), so the overall precision is 50%, not 40%.
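
The arithmetic of this example can be replayed in a few lines of plain Python, using the same counts (the variable names are only illustrative):

```python
# (true positives, false positives) for each of the two batches
batches = [(4, 1), (0, 3)]

# Precision computed separately for each batch, then averaged
per_batch = [tp / (tp + fp) for tp, fp in batches]   # [0.8, 0.0]
mean_of_batches = sum(per_batch) / len(per_batch)    # 0.4

# Precision computed from the accumulated counts
total_tp = sum(tp for tp, _ in batches)              # 4
total_fp = sum(fp for _, fp in batches)              # 4
overall = total_tp / (total_tp + total_fp)           # 0.5

print(mean_of_batches, overall)
```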

What we need is an object that can keep track of the number of true positives and the number of false positives, and that can compute the precision from them when requested.

This is precisely what the `keras.metrics.Precision` class does:

In [88]:
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]) # labels and predictions for 1st batch

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [89]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0]) # labels and predictions for 2nd batch

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In this example, we created a `Precision` object, then we used it like a function, passing it the labels and predictions for the first batch, then for the second batch (we could also have passed sample weights). We used the same number of true and false positives as in the example we just discussed.

After the first batch, it returns a precision of 80%; then after the second batch, it returns 50% (which is the overall precision so far, not the second batch's precision).

This is called a *streaming metric* (or *stateful metric*), as it is gradually updated, batch after batch.

At any point, we can call the `result()` method to get the current value of the metric.

In [91]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

We can also look at its variables (tracking the number of true and false positives) by using the `variables` attribute.

In [92]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

We can reset these variables using the `reset_states()` method:

In [94]:
precision.reset_states() # both variables will get reset to 0.0

In [96]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>]

If we need to create such a streaming metric, we can create a subclass of the `keras.metrics.Metric` class.

Here's a simple example that keeps track of the total Huber loss and the number of instances seen so far. When asked for the result, it returns the ratio, which is simply the mean Huber loss:

In [114]:
class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # handles base args (e.g dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # add_weight() creates a new variable tracked by the metric
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        """
        This method is called when we use an instance of this class as function (as we did with
        `Precision` object). It updates the variables, given the labels and predictions for one batch
        (and sample weights, but in this case we ignore them). 
        """
        metric = self.huber_fn(y_true, y_pred)
        # assign_add() updates the variable in place, since direct assignment is not allowed
        self.total.assign_add(tf.reduce_sum(metric))
        # tf.size() returns a 0-D tensor holding the number of elements
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        """
        Computes and returns the final result, in this case the mean Huber metric over all instances.
        When we use the metric as a function, the `update_state()` method gets called first, then the
        `result()` method is called, and its output is returned
        """
        return self.total / self.count
    def get_config(self):
        """
        Implemented this method to ensure that `threshold` gets saved along with the model. 
        """
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [102]:
m = HuberMetric(2.0)

"""
error (10 - 2 = 8) > threshold (2) . Therefore it will act linearly
 total = 2 * (|10-2| - 2/2) = 14
 count = 1
 result = 14 / 1 = 14
"""

# y_true, y_pred
m(tf.constant([[2.]]), tf.constant([[10.]]))

<tf.Tensor: shape=(), dtype=float32, numpy=14.0>

In [103]:
# total = total + (|1-0|^2 / 2) + (2 * (|9.25 - 5| - 2/2)) = 14 + 0.5 + 6.5 = 21
# count = count + 2 = 1 + 2 = 3
# result = total / count = 21 / 3 = 7

m(tf.constant([[0.], [5.]]) , tf.constant([[1.], [9.25]]))

<tf.Tensor: shape=(), dtype=float32, numpy=7.0>

In [105]:
m.result()

<tf.Tensor: shape=(), dtype=float32, numpy=7.0>

In [106]:
m.variables

[<tf.Variable 'total:0' shape=() dtype=float32, numpy=21.0>,
 <tf.Variable 'count:0' shape=() dtype=float32, numpy=3.0>]

In [107]:
m.reset_states()

In [108]:
m.variables

[<tf.Variable 'total:0' shape=() dtype=float32, numpy=0.0>,
 <tf.Variable 'count:0' shape=() dtype=float32, numpy=0.0>]

Let's check that the `HuberMetric` class works well:

In [115]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", 
                      input_shape=input_shape),
    keras.layers.Dense(1)
])

model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[HuberMetric(2.0)])

h = model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)

Epoch 1/2
Epoch 2/2


In [116]:
model.save("my_model_with_a_custom_metric.keras")

In [117]:
model = keras.models.load_model("my_model_with_a_custom_metric.keras",
                               custom_objects={
                                   "huber_fn": create_huber(2.0),
                                   "HuberMetric": HuberMetric
                               })

In [118]:
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7f90d450fd90>

In [120]:
model.metrics

[<keras.src.metrics.base_metric.Mean at 0x7f90d6a81dd0>,
 <__main__.HuberMetric at 0x7f90d46a1ed0>]

In [121]:
model.metrics[-1].threshold

2.0

When we define a metric using a simple function, Keras automatically calls it for each batch, and it keeps track of the mean during each epoch, just like we did manually. So the only benefit of our `HuberMetric` class is that the `threshold` will be saved. But of course, some metrics, like precision, cannot simply be averaged over batches: in those cases, there's no other option than to implement a streaming metric.

More simply, we could have created the class like this:

In [127]:
class HuberMetric(keras.metrics.Mean):
    def __init__(self, threshold=1.0, name="HuberMetric", dtype=None):
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        super().__init__(name=name, dtype=dtype)
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        super(HuberMetric, self).update_state(metric, sample_weight)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

This class handles shapes better, and it also supports sample weights.

In [128]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                      input_shape=input_shape),
    keras.layers.Dense(1)
])

model.compile(loss=keras.losses.Huber(2.0), optimizer="nadam", metrics=[HuberMetric(2.0)])

sample_weight = np.random.rand(len(y_train))
h = model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32),
             epochs=2, sample_weight=sample_weight)

Epoch 1/2
Epoch 2/2


In [129]:
h.history["loss"][0] , h.history["HuberMetric"][0] * sample_weight.mean()

(0.37584829330444336, 0.37724936285833965)
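
A plausible reading of why these two values nearly match, assuming the loss divides the weighted sum by the batch size while the `Mean`-based metric divides it by the sum of the weights: within a single batch, the loss then equals the metric times the mean weight. The NumPy sketch below checks that identity on made-up numbers; over a whole epoch the match is only approximate, presumably because `sample_weight.mean()` is taken over the full training set rather than per batch.

```python
import numpy as np

rng = np.random.default_rng(0)
losses = rng.random(32)    # hypothetical per-instance Huber losses for one batch
weights = rng.random(32)   # hypothetical sample weights for that batch

weighted_sum = (losses * weights).sum()
loss_value = weighted_sum / len(losses)      # weighted sum over batch size
metric_value = weighted_sum / weights.sum()  # weighted mean

print(loss_value, metric_value * weights.mean())
```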

In [130]:
model.save("my_model_with_a_custom_metric_v2.keras")

In [131]:
model = keras.models.load_model("my_model_with_a_custom_metric_v2.keras",
                               custom_objects={
                                   "HuberMetric": HuberMetric
                               })

In [132]:
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7f908c6e7890>

In [137]:
model.metrics[-1].threshold

2.0

Now that we have built a streaming metric, building a custom layer will seem like a walk in the park.

### Custom Layers