In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.losses import mean_squared_error, huber_loss, MeanSquaredError
from keras.metrics import Accuracy

In [2]:
# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

In [3]:
t = tf.constant(10)
t

<tf.Tensor: shape=(), dtype=int32, numpy=10>

In [62]:
shape = tf.constant([[[1,2,3],[1,2,3],[1,2,3]]])
shape.shape

TensorShape([1, 3, 3])

In [4]:
t = t + 5
t = tf.add(t, 5)  # tf.math.add(t,5)
# t.__add__(10)
t

<tf.Tensor: shape=(), dtype=int32, numpy=20>

In [5]:
a = tf.constant([[1,2,3]])
# a_n = a @ tf.transpose(a)
a_n = tf.matmul(a, tf.transpose(a))
a_n

<tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[14]])>

In [6]:
t = tf.constant([[1,2,3],[4,5,6],[7,8,9]])

In [7]:
print(np.mean(t, axis=1))
print(tf.reduce_mean(t, axis=1))

[2. 5. 8.]
tf.Tensor([2 5 8], shape=(3,), dtype=int32)


In [8]:
t = tf.constant([4., 5., 6.])
a = np.array([4., 5., 6.])

In [9]:
np.square(t)

array([16., 25., 36.], dtype=float32)

In [10]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([16., 25., 36.])>

Tensorflow does not perform automatic type conversion to not hurt perfomance.
If type conversion is needed you can use tf.cast()

In [11]:
t2 = tf.constant(4., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, dtype=tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=6.0>

Because the tf.Tensor values so far are immutable we cannot change them which is problematic when we are working with weights that need to be adjusted by backpropagation.
We can use tf.Variable

In [12]:
v = tf.Variable([[1., 2., 3.],[4., 5., 6.]])

In [13]:
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [14]:
v.assign(v * 2)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [15]:
v[0, 0].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[42.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [16]:
v[:, 2].assign([0, 1])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[42.,  4.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [17]:
arr = tf.constant([1,2,3,4,5,6,7,8,9])
print(tf.reduce_sum(arr))

tf.Tensor(45, shape=(), dtype=int32)


# Custom Loss Function

In [18]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [19]:
dataset = fetch_california_housing()
X, y = dataset.data, dataset.target
X_train_full, X_test, y_train_full, y_test = train_test_split(X, y, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train_full, y_train_full, random_state=42)

std_scaler = StandardScaler()
X_train_scaled = std_scaler.fit_transform(X_train)
X_test_scaled = std_scaler.transform(X_test)
X_val_scaled = std_scaler.transform(X_val)

In [20]:
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    mse = tf.square(error) / 2
    mae = tf.abs(error) - 0.5
    is_small_error = tf.abs(error) < 1
    return tf.where(is_small_error, mse, mae)

In [21]:
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

In [22]:
# model.compile(loss=huber_fn, optimizer="Nadam")
# model.fit(X_train, y_train, [...])

In [23]:
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        huber_mse = 0.5 * tf.square(error)
        huber_mae = self.threshold * (tf.abs(error) - 0.5 * self.threshold)
        is_smaller_error = tf.abs(error) < self.threshold
        return tf.where(is_smaller_error, huber_mse, huber_mae)
    def get_config(self):
        """
        This method used to store the parameters used in the training
        """
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [24]:
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                            input_shape=[8,]))
model.add(keras.layers.Dense(1))

In [25]:
model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["accuracy"])

In [26]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_val_scaled, y_val))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x1e7b70176a0>

In [27]:
model.save("my_model_with_a_custom_loss_class.h5")
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5", 
                                custom_objects={"HuberLoss": HuberLoss})
history = model.fit(X_train_scaled, y_train, epochs=2, 
          validation_data=(X_val_scaled, y_val))

Epoch 1/2
Epoch 2/2


In [28]:
model.loss.threshold

2.0

In [29]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [30]:
def my_softplus(z):
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev = stddev, dtype=dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(weights * 0.1))

def my_positive_weights(weights):
    return tf.nn.relu(weights)

In [31]:
layer = keras.layers.Dense(1, activation=my_softplus, 
                           kernel_initializer=my_glorot_initializer, 
                           kernel_regularizer=my_l1_regularizer, 
                           kernel_constraint=my_positive_weights)

In [32]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [33]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(weights * self.factor))
    def get_config(self):
        return {"factor": self.factor}

In [34]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                      input_shape=[8,]),
    keras.layers.Dense(1, activation=my_softplus, 
                       kernel_initializer=my_glorot_initializer,
                       kernel_regularizer=MyL1Regularizer(0.01),
                       kernel_constraint=my_positive_weights)
])

In [35]:
model.compile(loss="mse", optimizer="nadam", metrics=["mae"])

In [36]:
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_val_scaled, y_val))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x1e7ba45e280>

In [37]:
model.save("my_model_with_many_custom_parts.h5")

In [38]:
model = keras.models.load_model(
    "my_model_with_many_custom_parts.h5",
    custom_objects={
       "MyL1Regularizer": MyL1Regularizer,
       "my_positive_weights": my_positive_weights,
       "my_glorot_initializer": my_glorot_initializer,
       "my_softplus": my_softplus,
    }
)

# Custom Metrics

In [39]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [40]:
class CustomAccuracy(keras.metrics.Metric):
    def __init__(self,  delta = 1.0, **kwargs):
        super().__init__(**kwargs)
        self.delta = delta
        self.huber_fn = create_huber(delta)
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
        
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
        
    def result(self):
        return self.total / self.count
    
    def reset_states(self):
        self.total.assign(0.)
        self.count.assign(0.)

In [41]:
y_pred = model.predict(X_train_scaled)



# Custom Layers

To create custom layers without any weight such as Flatten, Relu we can wrap it in Lambda layer

In [42]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [43]:
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [44]:
exponential_layer([-1., 0., 1.])

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.36787945, 1.        , 2.7182817 ], dtype=float32)>

In [45]:
model = keras.models.Sequential([
        keras.layers.Dense(30, activation="relu", input_shape=[8,]),
        keras.layers.Dense(1),
        exponential_layer
])

In [46]:
model.compile(loss="mse", optimizer="sgd")

model.fit(X_train_scaled, y_train, epochs=5,
         validation_data=(X_val_scaled, y_val))
model.evaluate(X_test_scaled, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


0.39079830050468445

To create a custom layer with weights we have to build a new subclass of keras.layers.Layer

In [47]:
class CustomLayerDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
        
    def build(self, input_shape):
        self.kernel = self.add_weight(
            name="kernel", 
            shape=[input_shape[-1], self.units],
            initializer="glorot_normal"
        )
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units], 
            initializer="zeros"
        )
        super().build(input_shape)
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    
    def compute_output_shape(self, input_shape):
        return tf.TensorShape(input_shape.as_list()[:-1] + [self.units])
    
    def get_config(self):
        """
        Method used to store the parameters
        """
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

In [48]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [49]:
model = keras.models.Sequential([
    CustomLayerDense(30, activation="relu", input_shape=[8,]),
    CustomLayerDense(1)
])

In [50]:
model.compile(loss="mse", optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=10, 
         validation_data=(X_val_scaled, y_val))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1e7b70a9400>

In [51]:
class MyMultiLayer(keras.layers.Layer):
    def call(self, X):
        x1, x2 = X
        print("x1.shape: ", x1.shape ," x2.shape: ", x2.shape) # Debugging of custom layer
        return x1 + x2, x1 * x2
    def compute_output_shape(self, input_shape):
        input_shape1, input_shape2 = input_shape
        return [input_shape1, input_shape2]

In [52]:
inputs1 = keras.layers.Input(shape=[2])
inputs2 = keras.layers.Input(shape=[2])
outputs1, outputs2 = MyMultiLayer()((inputs1, inputs2))

x1.shape:  (None, 2)  x2.shape:  (None, 2)


# Custom Model

In [57]:
class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation="relu", 
                                          kernel_initializer="he_normal") for _ in range(n_layers)]
    def call(self, inputs):
        Z = inputs
        print(Z.shape, inputs.shape)
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

In [58]:
class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden_1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block_1 = ResidualBlock(2, 30)
        self.block_2 = ResidualBlock(2, 30)
        
        self.out = keras.layers.Dense(output_dim)
    
    def call(self, inputs):
        Z = self.hidden_1(inputs)
        for _ in range(1 + 3):
            Z = self.block_1(Z)
        Z = self.block_2(Z)
        return self.out(Z)
        

In [59]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [60]:
model = ResidualRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=5)
score = model.evaluate(X_val_scaled, y_val)
y_pred = model.predict(X_test_scaled)

Epoch 1/5
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)
(None, 30) (None, 30)


In [None]:
model.summary()