#  Custom Models and Training with TensorFlow

### Tensorflow Playground

In [1]:
import tensorflow as tf

# t1 = tf.constant([
#     [1.,2.,3.],
#     [4.,5.,6.]
# ])

# print(t1.shape)
# print(t1.dtype)

# print(tf.math.log(t1))
# print(tf.square(t1))

# t2 = tf.constant([
#     [7,8,9],
#     [10,11,12]
# ])

# t1 + tf.cast(t2, dtype=tf.float32)

# v = tf.Variable([
#     [1.,2.,3.],
#     [4.,5.,6.]
# ])

# v.assign([
#     [7,8,9],
#     [10,11,12]
# ])

# v.assign(v * 2)

str = "Sample String"

tf_string = tf.convert_to_tensor(str)

tf_string

unicode_form = tf.strings.unicode_decode(tf_string, "UTF-8")

tf.strings.unicode_encode(unicode_form, "UTF-8")

2025-03-19 07:01:51.577928: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-19 07:01:51.607534: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-03-19 07:01:51.607663: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-03-19 07:01:51.608705: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-19 07:01:51.613984: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-19 07:01:51.614913: I tensorflow/core/platform/cpu_feature_guard.cc:1

<tf.Tensor: shape=(), dtype=string, numpy=b'Sample String'>

### Custom Cost Functions

In [2]:
import tensorflow as tf
from tensorflow import keras

# def huber_fn(y_true, y_pred):
#     error = y_true - y_pred
#     is_small_error = tf.abs(error) < 1
#     squared_loss = tf.square(error) / 2
#     linear_loss = tf.abs(error) - 0.5
#     return tf.where(is_small_error, squared_loss, linear_loss)

class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = tf.abs(error) - 0.5
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

### Custom Metrics

In [3]:
import tensorflow as tf

def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

class HuberMetric(tf.keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer = "zeros")
        self.count = self.add_weight("count", initializer = "zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        sample_metrics = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(sample_metrics))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        return self.total / self.count

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

### Custom Layers

In [4]:
import tensorflow as tf

class MyDense(tf.keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)
    
    def build(self, batch_input_shape):
        self.kernel = slef.add_weight(
            name="kernel",
            shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal"
        )
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units],
            initializer="zeros"
        )
    
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    
    def get_config(self):
        base_config = super().get_config()
        return {
            **base_config, 
            "units": self.units, 
            "activation": tf.keras.activations.serialize(self.activation)
        }


### Custon Model

In [5]:
import tensorflow as tf

class ResidualBlock(tf.keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [
            tf.keras.layers.Dense(n_neurons, activation="relu", kernel_initializer="he_normal"
            )
            for _ in range(n_layers)
        ]
    
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

class ResidualRegressor(tf.keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = tf.keras.layers.Dense(output_dim)
    
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

ResidualRegressor(5).layers

[<keras.src.layers.core.dense.Dense at 0x7f512ef61420>,
 <__main__.ResidualBlock at 0x7f512ef62920>,
 <__main__.ResidualBlock at 0x7f512ef61120>,
 <keras.src.layers.core.dense.Dense at 0x7f512ef63a00>]

### Custom Losses and Metrics Based on Model Internals

In [6]:
import tensorflow as tf

class RecontructingRegressor(tf.keras.Model):
    def __init__(self, output_dim, **kwargs):
        super.__init__(**kwargs)
        self.hidden = [
            tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal")
            for _ in range(5)
        ]
        self.out = tf.keras.layers.Dense(output_dim)
        self.reconstruction_mean = tf.keras.metrics.Mean(
            name="reconstruction_error"
        )
    
    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstrucy = tf.keras.layers.Dense(n_inputs)
    
    def call(self, inputs, training=False):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05 * recon_loss)
        if training:
            result = self.reconstruction_mean(recon_loss)
            self.add_metric(result)
        return self.out(Z)


### Custom Training Loop

In [7]:
import tensorflow as tf
import numpy as np
import pandas as pd

def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

def print_status_bar(step, total, loss, metrics=None):
    metrics = " - ".join(
        [f"{m.name}: {m.result():.4f}" 
            for m in [loss] + (metrics or [])
        ]
    )
    end = "" if step < total else "\n"
    print(f"\r{step}/{total} - " + metrics, end=end)

# Just for demonstration purposes
X_train = np.random.rand(6400, 3)
y_train = X_train[:, 0] + 2 * X_train[:, 1] + 3 * X_train[:, 2] + np.random.normal(0, 1, size=6400)

l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(
        30, 
        activation="relu", 
        kernel_initializer="he_normal",
        kernel_regularizer=l2_reg
    ),
    tf.keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

n_epochs = 15
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = tf.keras.losses.mean_squared_error
mean_loss = tf.keras.metrics.Mean()
metrics = [tf.keras.metrics.MeanAbsoluteError()]

for epoch in range(1, n_epochs + 1):
    print(f"Epoch {epoch}/{n_epochs}")
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step, n_steps, mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Epoch 1/15
200/200 - mean: 7.4803 - mean_absolute_error: 1.5324
Epoch 2/15
200/200 - mean: 5.2902 - mean_absolute_error: 0.9413
Epoch 3/15
200/200 - mean: 5.1353 - mean_absolute_error: 0.9583
Epoch 4/15
200/200 - mean: 5.0838 - mean_absolute_error: 0.9807
Epoch 5/15
200/200 - mean: 4.8872 - mean_absolute_error: 0.9815
Epoch 6/15
200/200 - mean: 4.8255 - mean_absolute_error: 0.9999
Epoch 7/15
200/200 - mean: 4.6561 - mean_absolute_error: 0.9957
Epoch 8/15
200/200 - mean: 4.5005 - mean_absolute_error: 0.9929
Epoch 9/15
200/200 - mean: 4.3998 - mean_absolute_error: 1.0009
Epoch 10/15
200/200 - mean: 4.3759 - mean_absolute_error: 1.0266
Epoch 11/15
200/200 - mean: 4.2764 - mean_absolute_error: 1.0336
Epoch 12/15
200/200 - mean: 4.1726 - mean_absolute_error: 1.0291
Epoch 13/15
200/200 - mean: 4.0593 - mean_absolute_error: 1.0259
Epoch 14/15
200/200 - mean: 3.9736 - mean_absolute_error: 1.0356
Epoch 15/15
200/200 - mean: 3.9738 - mean_absolute_error: 1.0679


### TensorFlow Functions and Graphs

In [8]:
import tensorflow as tf

@tf.function
def tf_cube(x):
    print(100+100)
    print(x)
    return x ** 3

print(tf_cube.python_function)

concrete_function = tf_cube.get_concrete_function(tf.constant(2.0))

ops = concrete_function.graph.get_operations()

result = tf_cube(tf.constant(2.0))
result = tf_cube(tf.constant(3.0))

result = tf_cube(tf.constant([2.0, 3.0]))

result = tf_cube(2)
result = tf_cube(3)

@tf.function(input_signature=[tf.TensorSpec([None, 28, 28], tf.float32)])
def shrink(images):
    return images[:, ::2, ::2]

img_batch_1 = tf.random.uniform(shape=[100,28,28])
img_batch_2 = tf.random.uniform(shape=[50,28,28])

preprocessed_images = shrink(img_batch_1)
preprocessed_images = shrink(img_batch_2)

# TypeError: Binding inputs to tf.function failed
# img_batch_3 = tf.random.uniform(shape=[2,2,2])
# preprocessed_images = shrink(img_batch_3)

print(preprocessed_images.shape)

# @tf.function
# def add_10(x):
#     for i in range(10):
#         x += 1
#     return x

# add_10(tf.constant(0))

# add_10.get_concrete_function(tf.constant(0)).graph.get_operations()

@tf.function
def tf_add_10(x):
    for i in tf.range(10):
        x += 1
    return x

tf_add_10(tf.constant(0))

tf_add_10.get_concrete_function(tf.constant(0)).graph.get_operations()


<function tf_cube at 0x7f512c4b7880>
200
Tensor("x:0", shape=(), dtype=float32)
200
Tensor("x:0", shape=(2,), dtype=float32)
200
2
200
3
(50, 14, 14)


[<tf.Operation 'x' type=Placeholder>,
 <tf.Operation 'range/start' type=Const>,
 <tf.Operation 'range/limit' type=Const>,
 <tf.Operation 'range/delta' type=Const>,
 <tf.Operation 'range' type=Range>,
 <tf.Operation 'sub' type=Sub>,
 <tf.Operation 'floordiv' type=FloorDiv>,
 <tf.Operation 'mod' type=FloorMod>,
 <tf.Operation 'zeros_like' type=Const>,
 <tf.Operation 'NotEqual' type=NotEqual>,
 <tf.Operation 'Cast' type=Cast>,
 <tf.Operation 'add' type=AddV2>,
 <tf.Operation 'zeros_like_1' type=Const>,
 <tf.Operation 'Maximum' type=Maximum>,
 <tf.Operation 'while/maximum_iterations' type=Const>,
 <tf.Operation 'while/loop_counter' type=Const>,
 <tf.Operation 'while' type=StatelessWhile>,
 <tf.Operation 'Identity' type=Identity>]

### Implement a custom layer that performs layer normalization

In [36]:
import tensorflow as tf
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

class ElliotsLayer(tf.keras.layers.Layer):
    def __init__(self, epsilon=0.001, **kwargs):
        super().__init__(**kwargs)
        self.epsilon = epsilon
    
    def build(self, batch_input_shape):
        self.alpha = self.add_weight(
            name="alpha",
            shape=batch_input_shape[-1:],
            initializer="ones",
            dtype="float32",
        )
        self.beta = self.add_weight(
            name="beta",
            shape=batch_input_shape[-1:],
            initializer="zeros",
            dtype="float32",
        )
    
    def call(self, X):
        mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
        return self.alpha * (X - mean) / tf.math.sqrt(variance + self.epsilon) + self.beta
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "epsilon": self.epsilon}

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

X = X_train.astype(np.float32)
custom_layer_norm = ElliotsLayer()
keras_layer_norm = tf.keras.layers.LayerNormalization()

# error = tf.reduce_mean(
#     tf.keras.losses.MeanAbsoluteError()(
#         keras_layer_norm(X),
#         custom_layer_norm(X)
#     )
# )
# print(error)

# Apply the custom layer once to initialize weights
custom_layer_norm.build(X.shape)
keras_layer_norm.build(X.shape)

tf.keras.utils.set_random_seed(42)
random_alpha = np.random.rand(X.shape[-1])
random_beta = np.random.rand(X.shape[-1])

keras_layer_norm.set_weights([random_alpha, random_beta])
custom_layer_norm.set_weights([random_alpha, random_beta])

random_error = tf.reduce_mean(
    tf.keras.losses.MeanAbsoluteError()(
        keras_layer_norm(X),
        custom_layer_norm(X)
    )
)

print(random_error)

tf.Tensor(0.66280156, shape=(), dtype=float32)


### Train a model using a custom training loop to tackle the Fashion MNIST dataset

In [1]:
import tensorflow as tf
import numpy as np
from collections import OrderedDict

def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

def print_status_bar(step, total, loss, metrics=None):
    metrics = " - ".join(
        [f"{m.name}: {m.result():.4f}" 
            for m in [loss] + (metrics or [])
        ]
    )
    end = "" if step < total else "\n"
    print(f"\r{step}/{total} - " + metrics, end=end)

fashion_mnist = tf.keras.datasets.fashion_mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist
X_train, y_train = X_train_full[:-5000], y_train_full[:-5000]
X_valid, y_valid = X_train_full[-5000:], y_train_full[-5000:]

# Preprocessing of data
X_train, X_valid, X_test = X_train / 255., X_valid / 255., X_test / 255.
class_names = ["T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
 "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

# model = tf.keras.Sequential([
#     tf.keras.layers.Flatten(input_shape=[28, 28]),
#     tf.keras.layers.Dense(300, activation="relu"),
#     tf.keras.layers.Dense(100, activation="relu"),
#     tf.keras.layers.Dense(10, activation="softmax")
# ])

# n_epoch = 5
# batch_size = 32
# n_steps = len(X_train) // 32
# optimizer = tf.keras.optimizers.Nadam(learning_rate=0.01)
# loss_fn = tf.keras.losses.sparse_categorical_crossentropy
# mean_loss = tf.keras.metrics.Mean(name="mean_loss")
# metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]

# for epoch in range(1, n_epoch + 1):
#     print(f"Epoch {epoch}/{n_epoch}")
#     for step in range(1, n_steps + 1):
#         X_batch, y_batch = random_batch(X_train, y_train)
#         with tf.GradientTape() as tape:
#             y_pred = model(X_batch)
#             main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
#             loss = tf.add_n([main_loss] + model.losses)
#         gradients = tape.gradient(loss, model.trainable_variables)
#         optimizer.apply_gradients(zip(gradients, model.trainable_variables))
#         status = OrderedDict()
#         mean_loss(loss)
#         status["loss"] = mean_loss.result().numpy()
#         for metric in metrics:
#             metric(y_batch, y_pred)
#             status[metric.name] = metric.result().numpy()
#         print_status_bar(step, n_steps, mean_loss, metrics)
    
#     y_pred = model(X_valid)
#     status["val_loss"] = tf.reduce_mean(loss_fn(y_valid, y_pred))
#     status["val_accuracy"] = np.mean(tf.keras.metrics.sparse_categorical_accuracy(
#         tf.constant(y_valid, dtype=np.float32), y_pred
#     ))

#     for metric in [mean_loss] + metrics:
#         metric.reset_states()

# Split the model into two parts and apply a different optimizer to each part
lower_layer = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=[28, 28]),
    tf.keras.layers.Dense(300, activation="relu"),
])
upper_layer = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(10, activation="softmax")
])
model = tf.keras.Sequential([
    lower_layer,
    upper_layer
])
lower_optimizer = tf.keras.optimizers.SGD(learning_rate=1e-4)
upper_optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-3)

n_epoch = 5
batch_size = 32
n_steps = len(X_train) // 32
loss_fn = tf.keras.losses.sparse_categorical_crossentropy
mean_loss = tf.keras.metrics.Mean(name="mean_loss")
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]

for epoch in range(1, n_epoch + 1):
    print(f"Epoch {epoch}/{n_epoch}")
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape(persistent=True) as tape:
            lower_output = lower_layer(X_batch)
            y_pred = upper_layer(lower_output)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        # TODO
        lower_grads = tape.gradient(loss, lower_layer.trainable_variables)
        upper_grads = tape.gradient(loss, upper_layer.trainable_variables)
        lower_optimizer.apply_gradients(zip(lower_grads, lower_layer.trainable_variables))
        upper_optimizer.apply_gradients(zip(upper_grads, upper_layer.trainable_variables))
        del tape
        status = OrderedDict()
        mean_loss(loss)
        status["loss"] = mean_loss.result().numpy()
        for metric in metrics:
            metric(y_batch, y_pred)
            status[metric.name] = metric.result().numpy()
        print_status_bar(step, n_steps, mean_loss, metrics)
    
    y_pred = model(X_valid)
    status["val_loss"] = tf.reduce_mean(loss_fn(y_valid, y_pred))
    status["val_accuracy"] = np.mean(tf.keras.metrics.sparse_categorical_accuracy(
        tf.constant(y_valid, dtype=np.float32), y_pred
    ))

    for metric in [mean_loss] + metrics:
        metric.reset_states()
    

2025-03-18 05:35:46.275260: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-18 05:35:46.277924: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-18 05:35:46.309909: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-03-18 05:35:46.310010: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-03-18 05:35:46.310884: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=968d3c27-50e7-4d42-bdd9-442f6904c1c2' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>