In [None]:
import tensorflow as tf

In [None]:
# Huber loss function

def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.0.

    Residuals whose magnitude is below 1.0 are penalised quadratically
    (``residual**2 / 2``); all other residuals are penalised linearly
    (``|residual| - 0.5``).
    """
    residual = y_true - y_pred
    abs_residual = tf.abs(residual)
    quadratic = tf.square(residual) / 2
    linear = abs_residual - 0.5
    return tf.where(abs_residual < 1.0, quadratic, linear)

# Example usage: pass the plain function as the loss.
# NOTE(review): `model` is not defined anywhere above this point in the visible
# notebook, so this cell needs a model to have been built beforehand.
model.compile(
    optimizer='adam',
    loss=huber_fn,
    metrics=['mae']
)

# Create a custom loss class

In [None]:
class HuberLoss(tf.keras.losses.Loss):
    """Huber loss with a configurable threshold.

    Errors whose magnitude is below ``delta`` contribute ``error**2 / 2``;
    larger errors contribute ``delta * (|error| - delta / 2)``.

    Args:
        delta: Threshold separating the two regimes.
        name: Name forwarded to the Keras ``Loss`` base class.
        **kwargs: Extra keyword arguments forwarded to the base class.
    """

    def __init__(self, delta=1.0, name="huber_loss", **kwargs):
        # kwargs must be collected as a mapping (``**kwargs``): it is unpacked
        # with ``**`` below, and ``get_config`` round-trips through keyword
        # arguments when the loss is re-created.
        super().__init__(name=name, **kwargs)
        self.delta = delta

    def call(self, y_true, y_pred):
        """Compute the element-wise loss; called by Keras during training."""
        error = y_true - y_pred
        abs_error = tf.abs(error)
        squared_loss = tf.square(error) / 2
        linear_loss = self.delta * (abs_error - self.delta / 2)
        return tf.where(abs_error < self.delta, squared_loss, linear_loss)

    def get_config(self):
        """Return the base config plus ``delta`` so the loss can be saved and reloaded."""
        base_config = super().get_config()
        return {**base_config, "delta": self.delta}

# Example usage: compile with the class-based loss, save the model, then load it back.
# NOTE(review): `model` is not defined anywhere above this point in the visible
# notebook, so this cell needs a model to have been built beforehand.
model.compile(
    optimizer='adam',
    loss=HuberLoss(delta=1.0),
    metrics=['mae']
)

model.save("my_model_with_huber_loss.keras")

loaded_model = tf.keras.models.load_model(
    "my_model_with_huber_loss.keras",
    custom_objects={"HuberLoss": HuberLoss} # <- required so the custom loss class can be resolved on load
)

# Custom initializers, regularizers, and constraints follow a similar pattern

In [None]:
def my_softplus(z):
    """Softplus activation, ``log(exp(z) + 1)``.

    Delegates to ``tf.math.softplus`` instead of spelling the formula out:
    the literal form overflows to ``inf`` once ``exp(z)`` exceeds the float
    range, whereas the built-in op stays finite for large ``z``.
    """
    return tf.math.softplus(z)

def my_glorot_initializer(shape, dtype=tf.float32):
    """Draw normally distributed weights with Glorot-style scaling.

    The standard deviation is ``sqrt(2 / (shape[0] + shape[1]))``, so
    ``shape`` must have at least two entries.
    """
    fan_in, fan_out = shape[0], shape[1]
    stddev = tf.sqrt(2.0 / (fan_in + fan_out))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    """L1 penalty: 0.01 times the sum of the absolute weight values."""
    l1_norm = tf.reduce_sum(tf.abs(weights))
    return 0.01 * l1_norm

def my_positive_weights_constraint(weights):
    """Replace every negative weight with zero and leave the others unchanged."""
    is_negative = weights < 0.0
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

# Example usage: plug the custom activation, initializer, regularizer and
# constraint functions defined above into a standard Dense layer.
layer = tf.keras.layers.Dense(
    units=64,
    activation=my_softplus,
    kernel_initializer=my_glorot_initializer,
    kernel_regularizer=my_l1_regularizer,
    kernel_constraint=my_positive_weights_constraint
)

# class for Regularizer

class MyL1Regularizer(tf.keras.regularizers.Regularizer):
    """L1 regularizer whose strength is set by ``factor``."""

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        """Return the sum over all weights of ``factor * |w|``."""
        scaled_magnitudes = self.factor * tf.abs(weights)
        return tf.reduce_sum(scaled_magnitudes)

    def get_config(self):
        """Return the constructor argument needed to rebuild this regularizer."""
        config = {"factor": self.factor}
        return config

# Custom Metric

In [None]:
def create_huber(threshold=1.0):
    """Build a Huber loss function bound to ``threshold``.

    The returned callable takes ``(y_true, y_pred)`` and yields the
    element-wise loss: quadratic below the threshold, linear at or above it.
    """
    def huber_fn(y_true, y_pred):
        residual = y_true - y_pred
        abs_residual = tf.abs(residual)
        quadratic = tf.square(residual) / 2
        linear = threshold * abs_residual - threshold**2 / 2
        return tf.where(abs_residual < threshold, quadratic, linear)
    return huber_fn

class HuberMetric(tf.keras.metrics.Metric):
    """Streaming mean of the Huber loss over all elements seen so far.

    Args:
        threshold: Huber threshold passed to ``create_huber``.
        **kwargs: Forwarded to the Keras ``Metric`` base class.
    """

    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # handles base args (e.g., dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # The variable name is passed by keyword because the first positional
        # parameter of add_weight is not the same across Keras versions
        # (name in Keras 2, shape in Keras 3).
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the summed loss and the element count for one batch.

        ``sample_weight`` is accepted for signature compatibility but is not
        used: every element contributes with equal weight.
        """
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the accumulated loss divided by the accumulated element count."""
        return self.total / self.count

    def get_config(self):
        """Return the base config plus ``threshold``."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

# Special Layers

In [None]:
class MyDense(tf.keras.layers.Layer):
    """Minimal fully connected layer: ``activation(X @ kernel + bias)``.

    Args:
        units: Size of the last output dimension.
        activation: Anything accepted by ``tf.keras.activations.get``.
        **kwargs: Forwarded to the Keras ``Layer`` base class.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias; the kernel's first axis is the last input dimension."""
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros")
        super().build(batch_input_shape) # must be at the end

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``."""
        # Normalise first: the shape may arrive as a tuple or list rather than
        # a TensorShape, and those have no ``as_list`` method.
        input_shape = tf.TensorShape(batch_input_shape)
        return tf.TensorShape(input_shape.as_list()[:-1] + [self.units])

    def get_config(self):
        """Return the base config plus ``units`` and the serialized activation."""
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": tf.keras.activations.serialize(self.activation)}

# Example usage

model = tf.keras.models.Sequential([
    # input_shape must be a sequence of dimensions (batch axis excluded),
    # not a bare integer.
    MyDense(30, activation="relu", input_shape=[32]),
    MyDense(1)
])

In [None]:
class ResidualBlock(tf.keras.layers.Layer):
    """Stack of Dense layers whose output is added back to the block's input."""

    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [
            tf.keras.layers.Dense(
                n_neurons, activation="elu", kernel_initializer="he_normal"
            )
            for _ in range(n_layers)
        ]

    def call(self, inputs):
        """Run the inputs through every hidden layer, then add the skip connection."""
        activations = inputs
        for dense in self.hidden:
            activations = dense(activations)
        # The addition requires the stack's output to be shape-compatible with the inputs.
        return inputs + activations


 class ResidualRegressor(tf.keras.models.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = tf.keras.layers.Dense(30, activation="elu",
                                          kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = tf.keras.layers.Dense(output_dim)

    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)