In [1]:
import tensorflow as tf
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tqdm.notebook import trange
from collections import OrderedDict

2023-06-24 20:06:20.788729: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
t = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])  # 2x3 tensor of floats
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
# just like ndarrays, tensors have shape and data type
# indexing works just like ndarrays
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [4]:
t[..., 1, tf.newaxis]  # equivalent to t[:, 1:2]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [5]:
# there are many tensor operations
t + 10
# there are many aliases for operations such as tf.add() and tf.math.add()

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [6]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [7]:
# matrix multiplication is done with tf.matmul(); the @ operator used here is equivalent
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

In [8]:
# a tensor can hold a scalar value; in this case the shape is empty
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [9]:
# the Keras low-level API is located in tf.keras.backend; it is usually imported as K for convenience
# now that Keras is part of TensorFlow we should just use the TensorFlow low-level API directly
# tf.reduce_mean is the same as np.mean, tf.reduce_sum is the same as np.sum, and so on
# tensors work well with NumPy: you can create a tensor from a NumPy array and vice versa, and even apply TensorFlow operations to NumPy arrays
a = np.array([2.0, 4.0, 5.0])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [10]:
t.numpy()

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [11]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [12]:
np.square(t)
# keep in mind that numpy uses 64 bit precision by default while tensor flow uses 32 bit by default because it runs faster and uses less ram
# when you create a tensor from a numpy array make sure to set the dtype argument to 32 bit otherwise tensorflow will create a 64 bit tensor by default

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

In [13]:
# TensorFlow does not do any automatic type conversions; it just raises an exception if you try to execute an operation on tensors with incompatible types
# tf.constant(2.) + tf.constant(40)
# both of these will raise errors because they are different types
# tf.constant(2.) + tf. constant(40, dtype=tf.float64)

In [14]:
# you can use tf.cast() to convert types
t2 = tf.constant(40.0, dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

Variables

In [15]:
# the tf.tensor values that we have seen so far are immutable meaning that you cannot modify them so we need to use tf.Variable instead
v = tf.Variable([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [16]:
# a variable acts much like a tensor and you can perform the same operations on it, but you can also modify its value by using the assign() method
# this changes the value of the variable in place without creating a new tensor object; you can also use the assign_add() or assign_sub() methods
v.assign(2 * v)  # v now eauals [[2., 4., 6.], [8., 10., 12.]]
v[0, 1].assign(42)  # v now equals [[2., 42., 6.], [8., 10., 12.]]
v[:, 2].assign([0.0, 1.0])  # v now equals [[2., 42., 0.], [8., 10., 1.]]
v.scatter_nd_update(
    indices=[[0, 0], [1, 2]], updates=[100.0, 200.0]
)  # v now equals [[100., 42., 0.], [8., 10., 200.]]
# direct assignments such as v[1] = [7., 8., 9.] do not work
# in practice you will rarely have to create variable manually since keras provides an add_weight() method that will take care of it for you

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

OTHER DATA STRUCTURES

In [17]:
# sparse tensors are tensors that have a lot of zero values
# tensor arrays are lists of tensors that have the same shape and data type
# ragged tensors are tensors that have one or more ragged dimensions, meaning that along at least one axis some rows have a different number of elements than others
# string tensors are regular tensors of type string; they are usually created by encoding unicode strings
# sets are represented as regular tensors, for example tf.constant([[1, 2], [3, 4]]) represents the sets {1, 2} and {3, 4}; each set is represented by a vector in the tensor's last axis
# queues store tensors across multiple steps; FIFO, Priority, Random Shuffle and Padding queues are available

Customizing models and training algorithms

In [18]:
# custom loss functions
# training set is noisy so we want to use the huber loss function instead of the mse
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Errors whose magnitude is below 1 are penalised quadratically
    (``error**2 / 2``); all other errors are penalised linearly
    (``|error| - 0.5``).

    Args:
        y_true: Ground-truth targets.
        y_pred: Model predictions, broadcast-compatible with ``y_true``.

    Returns:
        A tensor of per-element losses shaped like ``y_true - y_pred``.
    """
    residual = y_true - y_pred
    magnitude = tf.abs(residual)
    quadratic_branch = tf.square(residual) / 2
    linear_branch = magnitude - 0.5
    # Quadratic branch for small residuals, linear branch otherwise.
    return tf.where(magnitude < 1, quadratic_branch, linear_branch)

In [19]:
# Load the California housing data and carve out test and validation sets.
# Targets are reshaped into a column vector so they match the model's
# single-unit output.
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42
)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42
)

# Fit the scaler on the training set only, then reuse its statistics for the
# validation and test sets.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

# Shape of one instance (everything except the batch dimension).
input_shape = X_train.shape[1:]

# Seed TensorFlow's random generator before building the model.
tf.random.set_seed(42)
# One hidden layer of 30 ReLU units followed by a single linear output unit.
model = tf.keras.Sequential(
    [
        tf.keras.layers.Dense(
            30,
            activation="relu",
            kernel_initializer="he_normal",
            input_shape=input_shape,
        ),
        tf.keras.layers.Dense(1),
    ]
)

In [20]:
model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"])

In [21]:
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2


Epoch 2/2


<keras.callbacks.History at 0x13b84c790>

In [22]:
model.save("custom_models/my_model_with_a_custom_loss.h5")

In [23]:
# when we load a model that uses the custom loss function we need to tell keras how to find it by mapping the function's name to the function
model = tf.keras.models.load_model(
    "custom_models/my_model_with_a_custom_loss.h5",
    custom_objects={"huber_fn": huber_fn},
)
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x13be6d3d0>

In [24]:
def create_huber(threshold=1.0):
    """Build a Huber loss function for the given threshold.

    Args:
        threshold: Absolute error below which the loss is quadratic;
            otherwise the loss grows linearly.

    Returns:
        A function ``huber_fn(y_true, y_pred)`` returning per-element losses.
    """

    def huber_fn(y_true, y_pred):
        # The inner name ``huber_fn`` is kept on purpose: it is the key used
        # in ``custom_objects`` when a model compiled with this loss is loaded.
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic_branch = tf.square(residual) / 2
        linear_branch = threshold * magnitude - threshold**2 / 2
        return tf.where(magnitude < threshold, quadratic_branch, linear_branch)

    return huber_fn


model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x13be6da50>

In [25]:
model.save("custom_models/my_model_with_a_custom_loss_threshold_2.h5")

In [26]:
# since the threshold is not saved when we load the model we need to specify it again when we load the model
model = tf.keras.models.load_model(
    "custom_models/my_model_with_a_custom_loss_threshold_2.h5",
    custom_objects={"huber_fn": create_huber(2.0)},
)

In [27]:
# we can solve this by creating a subclass of the keras.losses.Loss class and implementing the get_config() method which keras will call and save the threshold value
class HuberLoss(tf.keras.losses.Loss):
    """Huber loss whose threshold is part of the loss configuration.

    Args:
        threshold: Absolute error below which the loss is quadratic;
            otherwise the loss grows linearly.
        **kwargs: Forwarded unchanged to ``tf.keras.losses.Loss``.
    """

    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the per-element Huber loss between labels and predictions."""
        limit = self.threshold
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic_branch = tf.square(residual) / 2
        linear_branch = limit * magnitude - limit**2 / 2
        return tf.where(magnitude < limit, quadratic_branch, linear_branch)

    def get_config(self):
        """Return the parent configuration extended with ``threshold``."""
        config = dict(super().get_config())
        config["threshold"] = self.threshold
        return config

In [28]:
tf.random.set_seed(42)
model = tf.keras.Sequential(
    [
        tf.keras.layers.Dense(
            30,
            activation="relu",
            kernel_initializer="he_normal",
            input_shape=input_shape,
        ),
        tf.keras.layers.Dense(1),
    ]
)

In [29]:
model.compile(loss=HuberLoss(2.0), optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.save("custom_models/my_model_with_a_custom_loss_class.h5")

Epoch 1/2
Epoch 2/2


In [30]:
# when we load the model we need to tell keras how to find the class by mapping the class to its name
model = tf.keras.models.load_model(
    "custom_models/my_model_with_a_custom_loss_class.h5",
    custom_objects={"HuberLoss": HuberLoss},
)
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.loss.threshold

Epoch 1/2
Epoch 2/2


2.0

Custom Activation Functions, Initializers, Regularizers, and Constraints

In [31]:
# most functionality can be customized in a similar way
def my_softplus(z):
    """Softplus activation, ``log(exp(z) + 1)``, applied element-wise."""
    exponential = tf.exp(z)
    return tf.math.log(exponential + 1.0)


def my_glorot_initializer(shape, dtype=tf.float32):
    """Draw initial weights from a normal distribution.

    The standard deviation is ``sqrt(2 / (shape[0] + shape[1]))``.

    Args:
        shape: Shape of the weight tensor; its first two entries are used to
            compute the standard deviation.
        dtype: Data type of the returned tensor.

    Returns:
        A random tensor of the requested shape and dtype.
    """
    first_dim, second_dim = shape[0], shape[1]
    scale = tf.sqrt(2.0 / (first_dim + second_dim))
    return tf.random.normal(shape, stddev=scale, dtype=dtype)


def my_l1_regularizer(weights):
    """Return the L1 penalty: the sum of ``|0.01 * weights|``."""
    scaled = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled))


def my_positive_weights(weights):
    """Constraint that replaces every negative weight with zero."""
    is_negative = weights < 0.0
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

In [32]:
# these custom functions can be used in the same way as the built-in functions
layer = tf.keras.layers.Dense(
    1,
    activation=my_softplus,
    kernel_initializer=my_glorot_initializer,
    kernel_regularizer=my_l1_regularizer,
    kernel_constraint=my_positive_weights,
)

In [33]:
# if a function has hyperparameters that need to be saved along with the model then we need to subclass the appropriate class
class MyL1Regularizer(tf.keras.regularizers.Regularizer):
    """L1 regularizer whose scaling factor is exposed through its config.

    Args:
        factor: Multiplier applied to the weights before taking the
            absolute value.
    """

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        """Return the sum of ``|factor * weights|``."""
        scaled = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled))

    def get_config(self):
        """Return the hyperparameters needed to re-create this regularizer."""
        return dict(factor=self.factor)


# we must implement the call() method for losses and layers, and the __call__() method for regularizers, initializers, and constraints

Custom metrics

In [34]:
# losses and metrics are not the same thing
# losses are used by gradient descent to train a model, they must be differentiable
# metrics are used to evaluate a model, they can be non-differentiable
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])
# for each batch during training keras will compute this metric and display the mean of the metric since the beginning of the epoch
precision = tf.keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [35]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [36]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [37]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [38]:
precision.reset_states()

In [39]:
# if we need to define our own metric we can subclass the keras.metrics.Metric class
# this keeps track of the Huber loss and the number of instances seen so far
class HuberMetric(tf.keras.metrics.Metric):
    """Streaming metric: mean Huber loss over all elements seen so far.

    The metric keeps two state variables: ``total`` (sum of the per-element
    Huber losses) and ``count`` (number of elements seen).

    Args:
        threshold: Threshold passed to ``create_huber``.
        **kwargs: Forwarded unchanged to ``tf.keras.metrics.Metric``
            (e.g. ``name``, ``dtype``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)  # handles base args (e.g., dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # Pass the variable names by keyword: the first positional parameter
        # of ``add_weight`` is not the same across Keras versions.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the Huber loss for one batch.

        Args:
            y_true: Ground-truth targets for the batch.
            y_pred: Predictions for the batch.
            sample_weight: Accepted for API compatibility but not used;
                every element contributes equally.
        """
        sample_metrics = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(sample_metrics))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the mean Huber loss, or 0 if nothing has been seen yet."""
        # divide_no_nan avoids a NaN when result() is read before any update.
        return tf.math.divide_no_nan(self.total, self.count)

    def get_config(self):
        """Return the parent configuration extended with ``threshold``."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

Custom Layers

In [40]:
# this creates a layer without any weights that will apply the exponential function to the inputs
exponential_layer = tf.keras.layers.Lambda(lambda x: tf.exp(x))
exponential_layer([-1.0, 0.0, 1.0])

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.36787945, 1.        , 2.7182817 ], dtype=float32)>

In [41]:
# to build a custom stateful layer we need to subclass the keras.layers.Layer class
class MyDense(tf.keras.layers.Layer):
    """Simplified dense layer computing ``activation(X @ kernel + bias)``.

    Args:
        units: Number of output units.
        activation: Activation identifier understood by
            ``tf.keras.activations.get`` (name, callable or ``None``).
        **kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias once the input size is known."""
        self.kernel = self.add_weight(
            name="kernel",
            shape=[batch_input_shape[-1], self.units],
            initializer="he_normal",
        )
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros"
        )
        super().build(batch_input_shape)  # must be at the end

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``."""
        # Normalise first so that plain tuples/lists are accepted as well as
        # TensorShape objects (tuples and lists have no as_list() method).
        input_dims = tf.TensorShape(batch_input_shape).as_list()
        return tf.TensorShape(input_dims[:-1] + [self.units])

    def get_config(self):
        """Return the parent configuration extended with this layer's hyperparameters."""
        base_config = super().get_config()
        return {
            **base_config,
            "units": self.units,
            "activation": tf.keras.activations.serialize(self.activation),
        }


# this can now be used like any other layer

In [42]:
# to create a layer with multiple inputs the argument to the call() method must be a tuple containing all the inputs
# this example takes two inputs and returns three outputs
class MyMultiLayer(tf.keras.layers.Layer):
    """Layer taking a pair of inputs and returning their sum, product and quotient."""

    def call(self, X):
        """Unpack the two inputs, print their shapes and combine them.

        Args:
            X: A pair ``(X1, X2)`` of inputs.

        Returns:
            The tuple ``(X1 + X2, X1 * X2, X1 / X2)``.
        """
        X1, X2 = X
        print("X1.shape: ", X1.shape, " X2.shape: ", X2.shape)
        total = X1 + X2
        product = X1 * X2
        quotient = X1 / X2
        return total, product, quotient

    def compute_output_shape(self, batch_input_shape):
        """Report the first input's shape for each of the three outputs."""
        first_shape, _ = batch_input_shape
        return [first_shape] * 3

In [43]:
# if your layer needs to have different behavior during training and testing then you must add a training argument to the call() method and use it to decide what to do
# this adds Gaussian noise during training (for regularization) but not during testing
class MyGaussianNoise(tf.keras.layers.Layer):
    """Layer that adds Gaussian noise to its inputs, during training only.

    Args:
        stddev: Standard deviation of the noise.
        **kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        """Return ``X`` plus noise when ``training`` is truthy, else ``X`` unchanged."""
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        """The layer does not change the shape of its input."""
        return batch_input_shape

    def get_config(self):
        """Return the parent configuration extended with ``stddev``.

        Without this, the constructor argument is absent from the layer's
        config, so the layer cannot be re-created from it.
        """
        base_config = super().get_config()
        return {**base_config, "stddev": self.stddev}

Custom Models

In [44]:
class ResidualBlock(tf.keras.layers.Layer):
    """Stack of dense ReLU layers whose output is added back to the input.

    Args:
        n_layers: Number of dense layers in the block.
        n_neurons: Number of units in each dense layer.
        **kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        dense_stack = []
        for _ in range(n_layers):
            dense_stack.append(
                tf.keras.layers.Dense(
                    n_neurons, activation="relu", kernel_initializer="he_normal"
                )
            )
        self.hidden = dense_stack

    def call(self, inputs):
        """Run the inputs through every dense layer, then add the skip connection."""
        activations = inputs
        for dense in self.hidden:
            activations = dense(activations)
        return inputs + activations


class ResidualRegressor(tf.keras.Model):
    """Regressor built from a dense layer, two residual blocks and an output layer.

    Args:
        output_dim: Number of units in the output layer.
        **kwargs: Forwarded unchanged to ``tf.keras.Model``.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = tf.keras.layers.Dense(
            30, activation="relu", kernel_initializer="he_normal"
        )
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = tf.keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Forward pass: ``hidden1``, four passes through ``block1``, ``block2``, ``out``."""
        features = self.hidden1(inputs)
        # The same block (and therefore the same weights) is applied 1 + 3 times.
        n_passes = 1 + 3
        for _ in range(n_passes):
            features = self.block1(features)
        features = self.block2(features)
        return self.out(features)

losses and metrics based on model internals

In [45]:
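# the custom losses and metrics we created earlier were all based on the labels and predictions; sometimes you want losses or metrics based on other parts of the model, such as the activations of its hidden layers
# this creates a custom MLP model composed of a stack of five hidden layers plus one output layer, with an auxiliary output on top of the upper hidden layer that tries to reconstruct the inputs
# the loss associated with this auxiliary output is the reconstruction loss (the mean squared difference between the reconstruction and the inputs); it is added to the model's losses, and its running mean is reported as the "reconstruction_error" metric during training
# we will encourage the model to preserve as much information as possible through the hidden layers; this sometimes improves generalization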
class ReconstructingRegressor(tf.keras.Model):
    """
    Regression MLP with an auxiliary head that reconstructs its own inputs.

    The inputs go through five hidden dense layers. The output layer produces
    the prediction, while an extra dense layer tries to reconstruct the inputs
    from the same hidden representation. The mean squared reconstruction
    error, scaled by 0.05, is registered as an additional loss.

    Args:
        output_dim (int): The number of units in the output layer.
        **kwargs: Arbitrary keyword arguments forwarded to ``tf.keras.Model``.
    """

    def __init__(
        self, output_dim, **kwargs
    ):
        """
        Creates the five hidden layers, the output layer and a ``Mean``
        streaming metric used to track the reconstruction error.
        """
        super().__init__(**kwargs)
        self.hidden = [
            tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal")
            for _ in range(5)
        ]
        self.out = tf.keras.layers.Dense(output_dim)
        self.reconstruction_mean = tf.keras.metrics.Mean(name="reconstruction_error")

    def build(
        self, batch_input_shape
    ):
        """
        Creates the dense layer that reconstructs the inputs.

        It is created here rather than in ``__init__`` because its number of
        units must equal the number of input features, which is read from the
        last dimension of ``batch_input_shape``.

        Args:
            batch_input_shape (TensorShape or tuple): The shape of the input data.
        """
        n_inputs = batch_input_shape[-1]
        self.reconstruct = tf.keras.layers.Dense(n_inputs)
        self.built = True  # WORKAROUND for super().build(batch_input_shape)

    def call(
        self, inputs, training=None
    ):
        """
        Performs a forward pass through the model.

        The inputs are passed through the five hidden layers. The result is
        fed to the reconstruction layer, and the mean squared error between
        the reconstruction and the inputs is added (scaled by 0.05) to the
        model's losses on every call. When ``training`` is truthy, the
        unscaled reconstruction error also updates the streaming mean, which
        is registered as a metric.

        Args:
            inputs (tf.Tensor): The input data.
            training (bool, optional): Whether the model is in training mode. Defaults to None.

        Returns:
            The output of the output layer applied to the last hidden representation.
        """
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        # The loss is added unconditionally; only the metric update below
        # depends on the training flag.
        self.add_loss(0.05 * recon_loss)
        if training:
            result = self.reconstruction_mean(recon_loss)
            self.add_metric(result)
        return self.out(Z)

In [46]:
tf.random.set_seed(42)
model = ReconstructingRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=5)
y_pred = model.predict(X_test_scaled)

Epoch 1/5


Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


Computing gradients using autodiff

In [47]:
def f(w1, w2):
    """Return ``3 * w1**2 + 2 * w1 * w2``.

    Its partial derivatives are ``6 * w1 + 2 * w2`` with regard to ``w1``
    and ``2 * w1`` with regard to ``w2``.
    """
    quadratic_term = 3 * w1**2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term


w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps

36.000003007075065

In [48]:
(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [49]:
# to compute the gradients automatically you can use the tf.GradientTape class
w1, w2 = tf.Variable(5.0), tf.Variable(3.0)
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])
gradients
# in order to save memory only put the strict minimum inside the tf.GradientTape block or pause recording gradients using tape.stop_recording()

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [50]:
# the tape is automatically erased after you call its gradient() method so you will get an exception if you try to call gradient() twice
with tf.GradientTape() as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
# dz_dw2 = tape.gradient(z, w2) # this would raise an exception

In [51]:
# if you need to call gradient more than once you must make the tape persistent and delete it each time that you are done with it to release resources
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
print(dz_dw1)
dz_dw2 = tape.gradient(z, w2)
print(dz_dw2)
del tape

tf.Tensor(36.0, shape=(), dtype=float32)
tf.Tensor(10.0, shape=(), dtype=float32)


In [52]:
# by default the tape will only track operations involving variables so if you try to compute the gradient of z with regard to anything else it will return None
c1, c2 = tf.constant(5.0), tf.constant(3.0)
with tf.GradientTape() as tape:
    z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])
gradients
# you can however force the tape to watch any tensor by calling tape.watch() on it inside the block; this can be useful if you want to implement a regularization loss that penalizes activations that vary a lot when the inputs vary little

[None, None]

In [53]:
# if you want to compute the individual gradients of a vector (rather than the gradient of the sum of its elements) you have to call the tape's jacobian() method to get a matrix containing all the partial derivatives
# in some cases you may want to stop gradients from backpropagating through some part of your neural network; to do so, use tf.stop_gradient(), which returns its input during the forward pass but blocks gradients during backpropagation
def f(w1, w2):
    """Return ``3 * w1**2 + 2 * w1 * w2`` with the cross term excluded from gradients.

    The value is unchanged, but ``tf.stop_gradient`` makes the cross term
    behave like a constant during backpropagation.
    """
    quadratic_term = 3 * w1**2
    frozen_cross_term = tf.stop_gradient(2 * w1 * w2)
    return quadratic_term + frozen_cross_term


with tf.GradientTape() as tape:
    z = f(w1, w2)  # the forward pass is not affected by the stop_gradient() function

gradients = tape.gradient(z, [w1, w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [54]:
# Demonstrates a numerically unstable gradient.
# NOTE(review): 1e-50 is presumably below what float32 (the variable's dtype)
# can represent, so x is effectively 0 here.
x = tf.Variable(1e-50)
with tf.GradientTape() as tape:
    z = tf.sqrt(x)

# The gradient comes out infinite: the derivative of sqrt(x) is
# 1 / (2 * sqrt(x)), which is infinite when x = 0.
tape.gradient(
    z, [x]
)

[<tf.Tensor: shape=(), dtype=float32, numpy=inf>]

In [55]:
def my_softplus(z):
    """Numerically stable softplus.

    Computed as ``log(1 + exp(-|z|)) + max(0, z)``, so the exponential is
    never taken of a large positive number.
    """
    decayed = tf.exp(-tf.abs(z))
    positive_part = tf.maximum(0.0, z)
    return tf.math.log(1 + decayed) + positive_part


# in some cases a function that is numerically stable may still have numerically unstable gradients, so you have to tell TensorFlow which equation to use for the gradients by using the @tf.custom_gradient decorator
@tf.custom_gradient
def my_better_softplus(z):
    """Stable softplus with a hand-written gradient.

    Returns:
        A pair ``(value, grad_fn)`` as required by ``tf.custom_gradient``:
        the softplus of ``z`` and a function mapping the upstream gradients
        to the gradients with regard to ``z``.
    """
    value = tf.math.log(1 + tf.exp(-tf.abs(z))) + tf.maximum(0.0, z)

    def grad_fn(grads):
        # Upstream gradients scaled by 1 - 1 / (1 + exp(z)).
        return grads * (1 - 1 / (1 + tf.exp(z)))

    return value, grad_fn

Custom Training Loops

In [56]:
# in some cases the fit method is not enough for what we want to do so we can create custom training loops
l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.models.Sequential(
    [
        tf.keras.layers.Dense(
            30,
            activation="relu",
            kernel_initializer="he_normal",
            kernel_regularizer=l2_reg,
        ),
        tf.keras.layers.Dense(1, kernel_regularizer=l2_reg),
    ]
)


# we create a function that will randomly sample a batch of instances from the training set
def random_batch(X, y, batch_size=32):
    """Sample a random batch of instances, with replacement.

    Args:
        X: Array of features, indexable by an integer array.
        y: Array of targets aligned with ``X``.
        batch_size: Number of instances to draw.

    Returns:
        A pair ``(X_batch, y_batch)`` selected with the same random indices.
    """
    n_instances = len(X)
    picks = np.random.randint(n_instances, size=batch_size)
    features, targets = X[picks], y[picks]
    return features, targets


# we next define a function that will display the training status including the number of steps the total number of steps the mean loss since the start of the epoch and other metrics
def print_status_bar(step, total, loss, metrics=None):
    """Print a one-line progress report that overwrites itself.

    Args:
        step: Current step number.
        total: Total number of steps.
        loss: Object exposing ``name`` and ``result()`` for the loss.
        metrics: Optional list of further objects with the same interface.
    """
    tracked = [loss] + (metrics or [])
    parts = []
    for m in tracked:
        parts.append(f"{m.name}: {m.result():.4f}")
    summary = " - ".join(parts)
    # Stay on the same line until the final step, then emit a newline.
    if step < total:
        terminator = ""
    else:
        terminator = "\n"
    print(f"\r{step}/{total} - " + summary, end=terminator)

In [57]:
# we can define some hyperparameters and choose the optimizer, the loss function, and the metrics to monitor
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_squared_error
mean_loss = tf.keras.metrics.Mean(name="mean_loss")
metrics = [tf.keras.metrics.MeanAbsoluteError()]

In [58]:
# below is the custom loop
for epoch in range(1, n_epochs + 1):
    print(f"Epoch {epoch}/{n_epochs}")
    for step in range(1, n_steps + 1):
        # Sample with the configured batch_size so the batches stay consistent
        # with n_steps, instead of silently relying on random_batch's default.
        X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size)
        with tf.GradientTape() as tape:
            # Forward pass for one batch, using the model as a function.
            y_pred = model(X_batch, training=True)
            # Average the loss over the batch.
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Add the losses registered on the model (here, one
            # regularization loss per layer).
            loss = tf.add_n([main_loss] + model.losses)

        # Gradients of the loss with regard to each trainable variable
        # (not all variables).
        gradients = tape.gradient(loss, model.trainable_variables)
        # Perform one Gradient Descent step.
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Apply any constraints, whether they are bias or kernel constraints.
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))

        # Update the running mean loss and the metrics.
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)

        print_status_bar(step, n_steps, mean_loss, metrics)  # display the status bar

    # Start every epoch with fresh statistics. reset_state() replaces the
    # deprecated reset_states().
    for metric in [mean_loss] + metrics:
        metric.reset_state()

Epoch 1/5
362/362 - mean_loss: 2.8571 - mean_absolute_error: 0.6311
Epoch 2/5
362/362 - mean_loss: 1.5686 - mean_absolute_error: 0.5224
Epoch 3/5
362/362 - mean_loss: 1.0540 - mean_absolute_error: 0.4973
Epoch 4/5
362/362 - mean_loss: 0.8184 - mean_absolute_error: 0.4956
Epoch 5/5
362/362 - mean_loss: 0.7071 - mean_absolute_error: 0.4920


In [59]:
# extra code – shows how to use the tqdm package to display nice progress bars

with trange(1, n_epochs + 1, desc="All epochs") as epochs:
    for epoch in epochs:
        with trange(1, n_steps + 1, desc=f"Epoch {epoch}/{n_epochs}") as steps:
            for step in steps:
                # Sample with the configured batch_size so the batches stay
                # consistent with n_steps.
                X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size)
                with tf.GradientTape() as tape:
                    # training=True keeps this loop consistent with the
                    # plain training loop.
                    y_pred = model(X_batch, training=True)
                    main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
                    loss = tf.add_n([main_loss] + model.losses)

                gradients = tape.gradient(loss, model.trainable_variables)
                optimizer.apply_gradients(zip(gradients, model.trainable_variables))

                # Apply any variable constraints after the update.
                for variable in model.variables:
                    if variable.constraint is not None:
                        variable.assign(variable.constraint(variable))

                # Collect the running statistics shown next to the progress bar.
                status = OrderedDict()
                mean_loss(loss)
                status["loss"] = mean_loss.result().numpy()
                for metric in metrics:
                    metric(y_batch, y_pred)
                    status[metric.name] = metric.result().numpy()

                steps.set_postfix(status)

        # Start every epoch with fresh statistics. reset_state() replaces the
        # deprecated reset_states().
        for metric in [mean_loss] + metrics:
            metric.reset_state()

All epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 2/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 3/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 4/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 5/5:   0%|          | 0/362 [00:00<?, ?it/s]

TensorFlow Functions and Graphs

In [61]:
def cube(x):
    """Return ``x`` raised to the third power."""
    return pow(x, 3)


cube(2)

8

In [62]:
cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [63]:
tf_cube = tf.function(cube)
tf_cube

<tensorflow.python.eager.polymorphic_function.polymorphic_function.Function at 0x13e70da50>

In [64]:
tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [65]:
tf_cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [67]:
# this allows us to use the function as if it was a regular python function but it will actually be executed as a TensorFlow graph
# we can also use the tf.function decorator
@tf.function
def tf_cube(x):
    """Return ``x**3``; the ``tf.function`` decorator wraps this Python function."""
    return x**3


# the original python function is still available via the python_function attribute
tf_cube.python_function(2)
# the tf.function decorator will automatically create a new graph and will reuse it for subsequent calls it will also automatically handle the creation of variables and the execution of the function
# it prunes unused nodes after each call and it will automatically reduce the number of operations by optimizing the graph

8

In [None]:
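# if you set jit_compile=True when calling tf.function it will use XLA to compile the function, which often makes it faster (XLA is a domain-specific compiler for linear algebra that optimizes TensorFlow computations)
# TensorFlow generates graphs by analyzing the source code of the function (AutoGraph) and then tracing it; a for loop is only captured in the graph if it iterates over a tensor or a tf.data.Dataset (e.g. use tf.range() rather than range()), and while loops and if statements are only captured if their condition is a tensor; any other Python control flow is simply executed during tracing, so loops get unrolled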