# Setup and Imports

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
import datetime
print("TensorFlow version:", tf.__version__)

TensorFlow version: 2.18.0


# Custom Learning Rate Scheduler (OneCycle)

In [None]:
class OneCycleScheduler(keras.callbacks.Callback):
    """Per-batch learning-rate schedule: linear warm-up, plateau, linear decay.

    With ``pct = step / total_steps`` the learning rate is:

    * ``pct < 0.3``  -- linear ramp from ``start_lr`` up to ``max_lr``;
    * ``pct < 0.7``  -- held at ``max_lr``;
    * otherwise      -- linear ramp from ``max_lr`` down to ``last_lr``.

    Args:
        total_steps: Number of training batches the schedule spans.
        max_lr: Peak learning rate.
        start_lr: Initial learning rate; defaults to ``max_lr / 10``.
        last_lr: Final learning rate; defaults to ``start_lr / 100``.

    Attributes:
        history: Learning rates set by the callback, one per finished batch.

    Raises:
        ValueError: If ``total_steps`` is not positive.
    """

    def __init__(self, total_steps, max_lr, start_lr=None, last_lr=None):
        super().__init__()
        if total_steps <= 0:
            raise ValueError(f"total_steps must be positive, got {total_steps!r}")
        self.total_steps = total_steps
        self.max_lr = max_lr
        # Explicit None checks: `x or default` would discard a deliberate 0.0.
        self.start_lr = max_lr / 10 if start_lr is None else start_lr
        self.last_lr = self.start_lr / 100 if last_lr is None else last_lr
        self.history = []
        self.step = 0

    def _set_lr(self, lr):
        """Write ``lr`` to the optimizer attached to the model."""
        # Assigning through the `learning_rate` property avoids
        # `keras.backend.set_value` and the `optimizer.lr` alias.
        self.model.optimizer.learning_rate = lr

    def _lr_at(self, pct):
        """Return the scheduled learning rate at progress ``pct`` in [0, 1]."""
        if pct < 0.3:
            return self.start_lr + (self.max_lr - self.start_lr) * pct / 0.3
        if pct < 0.7:
            return self.max_lr
        return self.max_lr - (self.max_lr - self.last_lr) * (pct - 0.7) / 0.3

    def on_train_begin(self, logs=None):
        self.step = 0
        # Start at `start_lr` so the first batch does not run at whatever
        # rate the optimizer happened to be constructed with.
        self._set_lr(self.start_lr)

    def on_batch_end(self, batch, logs=None):
        self.step += 1
        # Clamp so that training past `total_steps` stays at `last_lr`
        # instead of extrapolating the decay ramp below it.
        pct = min(self.step / self.total_steps, 1.0)
        lr = self._lr_at(pct)
        self._set_lr(lr)
        self.history.append(lr)

# Custom Dropout (MCAlphaDropout)

In [None]:
class MCAlphaDropout(layers.AlphaDropout):
    """AlphaDropout that stays active at inference time (Monte Carlo dropout)."""

    def call(self, inputs, training=None):
        # The caller's `training` flag is deliberately ignored: the parent
        # layer is always run in training mode so dropout is always applied.
        force_training = True
        return super().call(inputs, training=force_training)

# Custom Dense Layer with a Max-Norm Weight Constraint (MaxNormDense)

In [None]:
class MaxNormDense(layers.Layer):
    """Linear dense layer whose kernel carries a max-norm constraint.

    Computes ``inputs @ kernel + bias`` with no activation.

    Args:
        units: Output dimensionality.
        max_norm: Maximum norm passed to the kernel's ``MaxNorm`` constraint.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``, ``dtype``).
    """

    def __init__(self, units, max_norm=1.0, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.max_norm = max_norm

    def build(self, input_shape):
        self.kernel = self.add_weight(
            name="kernel",
            shape=(input_shape[-1], self.units),
            initializer="glorot_uniform",
            # Reached through `keras`, which this file imports; the bare
            # name `constraints` is never imported here.
            constraint=keras.constraints.MaxNorm(self.max_norm),
        )
        self.bias = self.add_weight(
            name="bias",
            shape=(self.units,),
            initializer="zeros",
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.kernel) + self.bias

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"units": self.units, "max_norm": self.max_norm})
        return config

# TensorBoard Setup

In [None]:
# Each run logs into its own timestamped sub-directory of logs/fit/.
log_dir = f"logs/fit/{datetime.datetime.now():%Y%m%d-%H%M%S}"
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
)

# Custom Loss Function (Huber Loss)

In [None]:
class HuberLoss(keras.losses.Loss):
    """Huber loss: quadratic for small errors, linear for large ones.

    For ``error = y_true - y_pred`` the per-element loss is
    ``error**2 / 2`` when ``|error| <= delta`` and
    ``delta * (|error| - delta / 2)`` otherwise.

    Args:
        delta: Threshold between the quadratic and linear regimes.
        **kwargs: Forwarded to ``keras.losses.Loss`` (e.g. ``name``,
            ``reduction``).
    """

    def __init__(self, delta=1.0, **kwargs):
        super().__init__(**kwargs)
        self.delta = delta

    def call(self, y_true, y_pred):
        y_pred = tf.convert_to_tensor(y_pred)
        # Labels may arrive as integers; align them with the predictions so
        # the subtraction below does not fail on a dtype mismatch.
        y_true = tf.cast(y_true, y_pred.dtype)
        error = y_true - y_pred
        abs_error = tf.abs(error)
        squared_loss = tf.square(error) / 2
        linear_loss = self.delta * (abs_error - self.delta / 2)
        return tf.where(abs_error <= self.delta, squared_loss, linear_loss)

    def get_config(self):
        """Include ``delta`` so a saved loss is restored with its threshold."""
        config = super().get_config()
        config.update({"delta": self.delta})
        return config

# Custom Activation, Initializer, Regularizer, and Constraint

In [None]:
def my_leaky_relu(z):
    """Leaky ReLU: element-wise maximum of ``z`` and ``0.01 * z``."""
    leaked = 0.01 * z
    return tf.maximum(leaked, z)

def my_glorot_initializer(shape, dtype=None):
    """Sample normal weights with ``stddev = sqrt(2 / sum(shape))``.

    For a 2-D kernel ``sum(shape)`` is ``fan_in + fan_out``.

    Args:
        shape: Shape of the tensor to create.
        dtype: Floating-point dtype of the result; ``None`` means float32.

    Returns:
        A tensor of the requested shape and dtype.
    """
    # None is not a valid TensorFlow dtype, so fall back to float32.
    dtype = tf.as_dtype(tf.float32 if dtype is None else dtype)
    # `tf.reduce_sum(shape)` is an integer tensor; cast it before dividing so
    # a float is never combined with an int32 tensor.
    fan_sum = tf.cast(tf.reduce_sum(shape), dtype)
    stddev = tf.sqrt(2.0 / fan_sum)
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

class MyL1Regularizer(keras.regularizers.Regularizer):
    """L1 penalty: ``factor * sum(|x|)``.

    Args:
        factor: Multiplier applied to the sum of absolute values.
    """

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, x):
        return self.factor * tf.reduce_sum(tf.abs(x))

    def get_config(self):
        """Return the constructor arguments so the regularizer can be saved."""
        return {"factor": self.factor}

def my_positive_weights(shape, dtype=None):
    """Sample weights uniformly from ``[0, 1)``.

    Args:
        shape: Shape of the tensor to create.
        dtype: Dtype of the result; ``None`` means float32.

    Returns:
        A tensor of the requested shape with non-negative entries.
    """
    # None is not a valid TensorFlow dtype, so fall back to float32.
    if dtype is None:
        dtype = tf.float32
    return tf.random.uniform(shape, minval=0., maxval=1., dtype=dtype)

# Custom Metric (Huber Metric)

In [None]:
class HuberMetric(keras.metrics.Metric):
    """Streaming mean of the element-wise Huber loss.

    Accumulates the summed Huber loss and the number of elements seen, and
    reports their ratio.

    Args:
        delta: Threshold between the quadratic and linear regimes.
        name: Metric name.
        **kwargs: Forwarded to ``keras.metrics.Metric``.
    """

    def __init__(self, delta=1.0, name="huber_metric", **kwargs):
        super().__init__(name=name, **kwargs)
        self.delta = delta
        # `name` is passed by keyword: a positional string would be read as
        # the `shape` argument by Keras 3's `add_weight`.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate one batch; ``sample_weight`` is accepted but ignored."""
        y_pred = tf.convert_to_tensor(y_pred)
        # Align label dtype with the predictions (labels may be integers).
        error = tf.cast(y_true, y_pred.dtype) - y_pred
        abs_error = tf.abs(error)
        loss = tf.where(
            abs_error <= self.delta,
            0.5 * tf.square(error),
            self.delta * (abs_error - 0.5 * self.delta),
        )
        # Cast to the accumulator dtypes so non-float32 inputs still add up.
        self.total.assign_add(tf.cast(tf.reduce_sum(loss), self.total.dtype))
        self.count.assign_add(tf.cast(tf.size(error), self.count.dtype))

    def result(self):
        # Yields 0 instead of NaN when no batch has been seen yet.
        return tf.math.divide_no_nan(self.total, self.count)

    def get_config(self):
        """Include ``delta`` so a saved metric is restored with its threshold."""
        config = super().get_config()
        config.update({"delta": self.delta})
        return config

# Custom Layer and Custom Model

In [None]:
class AddGaussianNoise(layers.Layer):
    """Add zero-mean Gaussian noise to the inputs during training only.

    Args:
        stddev: Standard deviation of the noise.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, inputs, training=False):
        if not training:
            return inputs
        # Sample in the inputs' own dtype; the float32 default would not add
        # to float16 or float64 inputs.
        noise = tf.random.normal(
            tf.shape(inputs), stddev=self.stddev, dtype=inputs.dtype
        )
        return inputs + noise

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"stddev": self.stddev})
        return config

class ResidualBlock(layers.Layer):
    """Two dense layers with a skip connection: ``relu(inputs + f(inputs))``.

    The addition requires the last dimension of ``inputs`` to equal
    ``units``; no projection is applied to the skip path.

    Args:
        units: Width of both dense layers.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self.dense1 = layers.Dense(units, activation='relu')
        self.dense2 = layers.Dense(units)
        # Built once here rather than instantiating a new layer object on
        # every forward pass.
        self.activation = layers.ReLU()

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        return self.activation(inputs + x)

class ResidualRegressor(keras.Model):
    """Dense stem, one residual block, then a single-unit linear output.

    Args:
        units: Width of the stem and of the residual block.
    """

    def __init__(self, units=64):
        super().__init__()
        self.input_layer = layers.Dense(units, activation='relu')
        self.res_block = ResidualBlock(units)
        self.out_layer = layers.Dense(1)

    def call(self, inputs):
        # stem -> residual block -> scalar head
        hidden = self.res_block(self.input_layer(inputs))
        return self.out_layer(hidden)

# Custom Optimizer

In [None]:
class MyMomentumOptimizer(keras.optimizers.Optimizer):
    """Plain momentum SGD.

    Per variable: ``v <- momentum * v - lr * grad`` then ``var <- var + v``.

    Written against the Keras 3 optimizer API (``build`` / ``update_step``),
    which is what TensorFlow 2.18 ships. The legacy ``_set_hyper`` /
    ``_get_hyper`` / ``_resource_apply_dense`` hooks are not part of that API.

    Args:
        learning_rate: Step size.
        momentum: Decay factor applied to the velocity.
        name: Optimizer name.
        **kwargs: Forwarded to ``keras.optimizers.Optimizer``.
    """

    def __init__(self, learning_rate=0.01, momentum=0.9, name="MyMomentum", **kwargs):
        # Keyword arguments on purpose: the base class takes `learning_rate`
        # first, so a positional `name` would land in the wrong parameter.
        super().__init__(learning_rate=learning_rate, name=name, **kwargs)
        self.momentum = momentum

    def build(self, var_list):
        """Create one velocity slot per trainable variable."""
        if self.built:
            return
        super().build(var_list)
        # Slots are owned by the optimizer rather than attached to the model
        # variables as ad-hoc attributes inside a tf.function.
        self._velocities = [
            self.add_variable_from_reference(
                reference_variable=var, name="velocity"
            )
            for var in var_list
        ]

    def update_step(self, gradient, variable, learning_rate):
        """Apply one momentum update to ``variable``."""
        lr = keras.ops.cast(learning_rate, variable.dtype)
        grad = keras.ops.cast(gradient, variable.dtype)
        momentum = keras.ops.cast(self.momentum, variable.dtype)
        velocity = self._velocities[self._get_variable_index(variable)]
        self.assign(
            velocity,
            keras.ops.subtract(
                keras.ops.multiply(velocity, momentum),
                keras.ops.multiply(grad, lr),
            ),
        )
        self.assign_add(variable, velocity)

    def get_config(self):
        """Include ``momentum`` so the optimizer can be re-created."""
        config = super().get_config()
        config.update({"momentum": self.momentum})
        return config

# Custom Training Loop (Fashion MNIST Example)

In [None]:
# Load Fashion MNIST, flatten each 28x28 image to a 784-vector and scale
# pixel values to [0, 1].
(train_x, train_y), (test_x, test_y) = keras.datasets.fashion_mnist.load_data()
train_x = train_x.reshape(-1, 28*28).astype("float32") / 255.0
test_x = test_x.reshape(-1, 28*28).astype("float32") / 255.0

model = ResidualRegressor()
loss_fn = HuberLoss()
optimizer = keras.optimizers.Adam()

batch_size = 64
epochs = 3
train_ds = tf.data.Dataset.from_tensor_slices((train_x, train_y)).shuffle(1024).batch(batch_size)

# Custom training loop: the integer class labels are regressed directly with
# the Huber loss.
for epoch in range(epochs):
    print(f"\nEpoch {epoch+1}")
    for step, (x_batch, y_batch) in enumerate(train_ds):
        with tf.GradientTape() as tape:
            # training=True so any training-only layers behave as intended.
            preds = model(x_batch, training=True)
            # Squeeze only the trailing unit axis; a bare tf.squeeze would
            # also drop the batch axis for a batch of size 1.
            loss = loss_fn(tf.cast(y_batch, tf.float32), tf.squeeze(preds, axis=-1))
        grads = tape.gradient(loss, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        if step % 100 == 0:
            print(f"Step {step}, Loss: {float(loss):.4f}")

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
[1m29515/29515[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
[1m26421880/26421880[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
[1m5148/5148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
[1m4422102/4422102[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step

Epoch 1
Step 0, Loss: 4.2102
Step 100, Loss: 0.9054
Step 200, Loss: 0.7561
Step 300, Loss: 0.4586
Step 400, Loss: 0.6641
Step 500, Loss: 0.4912
Step 600, Loss: 0.3817
Step 700, Loss: 0.6367
Step 800, Loss: 0.3300
Step 900, Loss: 0.5257

Epoc