# Making New Layers and Models via Subclassing

Learning Objectives:
- Use Layer class as the combination of state (weights) and computation
- Defer weight creation until the shape of the inputs is known
- Build recursively composable layers
- Compute loss using add_loss() method
- Compute average using add_metric() method
- Enable serialization on layers.



In [5]:
import tensorflow as tf
from tensorflow import keras

#### The Layer class: the combination of state (weights) and some computation

Densely connected layer. It has variables w and b.

In [2]:
class Linear(keras.layers.Layer):
    """Affine layer computing ``inputs @ w + b``.

    Both variables are created eagerly in the constructor, so the input
    dimensionality has to be supplied up front.

    Args:
        units: Number of output features (columns of ``w``, length of ``b``).
        input_dim: Number of input features (rows of ``w``).
    """

    def __init__(self, units=32, input_dim=32):
        super().__init__()
        # Kernel values come from the random-normal initializer.
        kernel_values = tf.random_normal_initializer()(
            shape=(input_dim, units), dtype="float32"
        )
        self.w = tf.Variable(initial_value=kernel_values, trainable=True)
        # Bias values come from the zeros initializer.
        bias_values = tf.zeros_initializer()(shape=(units,), dtype="float32")
        self.b = tf.Variable(initial_value=bias_values, trainable=True)

    def call(self, inputs):
        """Return the matrix product of ``inputs`` and ``w``, shifted by ``b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

In [3]:
# Run a (2, 2) batch of ones through a Linear layer with 2 inputs and 4 units.
x = tf.ones(shape=(2, 2))
linear_layer = Linear(units=4, input_dim=2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[-0.06749453 -0.05709337  0.01190891  0.03545174]
 [-0.06749453 -0.05709337  0.01190891  0.03545174]], shape=(2, 4), dtype=float32)


In [4]:
# Check that the variables stored as attributes `w` and `b` are exactly what
# the layer reports through `weights`, in that order.
assert [linear_layer.w, linear_layer.b] == linear_layer.weights

In [7]:
class Linear(keras.layers.Layer):
    """Affine layer (``inputs @ w + b``) whose weights come from ``add_weight``.

    Args:
        units: Number of output features.
        input_dim: Number of input features.
    """

    def __init__(self, units=32, input_dim=32):
        super().__init__()
        kernel_shape = (input_dim, units)
        bias_shape = (units,)
        self.w = self.add_weight(
            shape=kernel_shape, initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(
            shape=bias_shape, initializer="zeros", trainable=True
        )

    def call(self, inputs):
        """Return the matrix product of ``inputs`` and ``w``, shifted by ``b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

# Same demo as before, now with the add_weight-based Linear layer.
x = tf.ones(shape=(2, 2))
linear_layer = Linear(units=4, input_dim=2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[ 0.1557091  -0.06040075  0.01923282 -0.01216238]
 [ 0.1557091  -0.06040075  0.01923282 -0.01216238]], shape=(2, 4), dtype=float32)


### Layers can have non-trainable weights

In [8]:
class ComputeSum(keras.layers.Layer):
    """Accumulate the axis-0 sum of every batch passed through the layer.

    The running total is stored in a non-trainable variable, so it persists
    across calls.

    Args:
        input_dim: Length of the accumulator vector. It has to match the
            shape of ``tf.reduce_sum(inputs, axis=0)`` for the inputs the
            layer is called with.
    """

    def __init__(self, input_dim):
        super().__init__()
        # Explicit 1-tuple shape: (input_dim,). Writing `(input_dim),` puts
        # the comma outside the parentheses and passes a bare int instead.
        self.total = tf.Variable(
            initial_value=tf.zeros((input_dim,)), trainable=False
        )

    def call(self, inputs):
        """Add the axis-0 sum of ``inputs`` to the total and return the total."""
        batch_sum = tf.reduce_sum(inputs, axis=0)
        self.total.assign_add(batch_sum)
        return self.total

# Feed the same (2, 2) batch of ones twice and print the total after each call.
x = tf.ones(shape=(2, 2))
my_sum = ComputeSum(input_dim=2)

y = my_sum(x)
print(y.numpy())

y = my_sum(x)
print(y.numpy())

[2. 2.]
[4. 4.]


In [9]:
# Report how many variables the layer tracks in total, how many of those are
# non-trainable, and the list of trainable ones.
print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))
print("trainable_weights:", my_sum.trainable_weights)

weights: 1
non-trainable weights: 1
trainable_weights: []


### Best practice: deferring weight creation until the shape of the inputs is known

Above, the input_dim was used to compute the shape of the weights w and b in __init__()

In many cases, we do not know the size of the inputs in advance. We would want to create the weights lazily, once that size becomes known, some time after instantiating the layer.

To do so, create the layer weights in the build(self, input_shape) method of the layer.

In [11]:
class Linear(keras.layers.Layer):
    """Affine layer that creates its weights lazily in ``build``.

    Each lifecycle method prints a marker so the order in which the methods
    run is visible in the cell output.

    Args:
        units: Number of output features.
    """

    def __init__(self, units=32):
        print("init_method")
        super().__init__()
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``; ``w`` is sized from ``input_shape[-1]``."""
        print("build method")
        in_features = input_shape[-1]
        self.w = self.add_weight(
            shape=(in_features, self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer="random_normal",
            trainable=True,
        )

    def call(self, inputs):
        """Return the matrix product of ``inputs`` and ``w``, shifted by ``b``."""
        print("call method")
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

In [12]:
# Instantiating the layer only runs __init__; no weights exist yet.
linear_Layer = Linear(units=32)

init_method


In [13]:
# First call on the layer: `build` runs before `call` (see the printed markers).
# `x` is the tensor defined in an earlier cell.
y = linear_Layer(x)

build method
call method


### Layers are recursively composable

If you assign a layer instance as an attribute of another Layer, the outer layer will start tracking the weights of the inner layer.

We recommend creating such sublayers in the __init__() method (since the sublayers will typically have a build method, they will be built when the outer layer gets built).

In [14]:
class MLPBlock(keras.layers.Layer):
    """Three stacked ``Linear`` layers (32, 32 and 1 units) with ReLU in between."""

    def __init__(self):
        super().__init__()
        # Sublayers are stored as attributes of the block.
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        """Apply linear_1 -> ReLU -> linear_2 -> ReLU -> linear_3."""
        hidden = tf.nn.relu(self.linear_1(inputs))
        hidden = tf.nn.relu(self.linear_2(hidden))
        return self.linear_3(hidden)

In [15]:
mlp = MLPBlock()
# The first call on a (3, 64) batch builds all three sublayers.
y = mlp(tf.ones(shape=(3, 64)))
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))

init_method
init_method
init_method
build method
call method
build method
call method
build method
call method
weights: 6
trainable weights: 6


## The add_loss() method

When writing the call() method of a layer, you can create loss tensors that you will want to use later, when writing your training loop.

This is doable by calling self.add_loss(value)

In [16]:
class ActivityRegularizationLayer(keras.layers.Layer):
    """Pass inputs through unchanged while registering an activity penalty.

    Args:
        rate: Multiplier applied to the sum of the inputs to form the loss.
    """

    def __init__(self, rate=1e-2):
        super().__init__()
        self.rate = rate

    def call(self, inputs):
        """Register ``rate * sum(inputs)`` via ``add_loss`` and return ``inputs``."""
        penalty = self.rate * tf.reduce_sum(inputs)
        self.add_loss(penalty)
        return inputs

These losses (including those created by any inner layer) can be retrieved via layer.losses. This property is reset at the start of every __call__() to the top-level layer, so that layer.losses always contains the loss values created during the last forward pass.

In [20]:
class OuterLayer(keras.layers.Layer):
    """Wrapper layer that delegates to an ``ActivityRegularizationLayer``."""

    def __init__(self):
        super().__init__()
        self.activity_reg = ActivityRegularizationLayer(rate=1e-2)

    def call(self, inputs):
        """Return the output of the inner activity-regularization layer."""
        regularized = self.activity_reg(inputs)
        return regularized

In [21]:
layer = OuterLayer()
# No losses yet: the layer has never been called.
assert len(layer.losses) == 0

# Pass the shape as a tuple. In `tf.zeros(1, 1)` the second positional
# argument is the dtype, not a second dimension.
_ = layer(tf.zeros((1, 1)))
# One forward pass registered exactly one loss value.
assert len(layer.losses) == 1

# A second call does not accumulate: the list still holds a single loss,
# the one created during this latest call.
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1

In [22]:
class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    """Wrap a 32-unit ``Dense`` layer whose kernel carries an L2 regularizer."""

    def __init__(self):
        super().__init__()
        l2_penalty = tf.keras.regularizers.l2(1e-3)
        self.dense = keras.layers.Dense(32, kernel_regularizer=l2_penalty)

    def call(self, inputs):
        """Return the output of the inner dense layer."""
        projected = self.dense(inputs)
        return projected


layer = OuterLayerWithKernelRegularizer()
_ = layer(tf.zeros(shape=(1, 1)))

# After one call, `layer.losses` holds the loss contributed by the inner
# Dense layer's kernel regularizer.
print(layer.losses)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.0021020933>]


These losses work seamlessly with fit() (they get automatically summed and added to the main loss):

In [23]:
import numpy as np

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# First run: compile with an explicit `mse` loss alongside the loss that the
# layer registers through add_loss.
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# Second run: compile without a `loss` argument, so only the loss that the
# layer registers through add_loss is left to minimize.
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))



<keras.callbacks.History at 0x25d49c37f70>

### The add_metric() method

Layers have an add_metric() method for tracking the moving average of a quantity during training.

In [24]:
class LogisticEndpoint(keras.layers.Layer):
    """Endpoint layer that records a loss and an accuracy metric on each call.

    Args:
        name: Optional layer name forwarded to the base class.
    """

    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
        self.accuracy_fn = keras.metrics.BinaryAccuracy()

    def call(self, targets, logits, sample_weights=None):
        """Register loss and accuracy, then return ``softmax(logits)``.

        Args:
            targets: Ground-truth values handed to the loss and the metric.
            logits: Raw predictions handed to the loss and the metric.
            sample_weights: Optional weights forwarded to both.
        """
        # The binary cross-entropy becomes an additional layer loss.
        bce = self.loss_fn(targets, logits, sample_weights)
        self.add_loss(bce)

        # The accuracy is tracked as a layer metric named "accuracy".
        accuracy = self.accuracy_fn(targets, logits, sample_weights)
        self.add_metric(accuracy, name='accuracy')

        return tf.nn.softmax(logits)

In [26]:
layer = LogisticEndpoint()

# Call the endpoint once with all-ones targets and logits of shape (2, 2).
targets = tf.ones(shape=(2, 2))
logits = tf.ones(shape=(2, 2))
y = layer(targets, logits)

# The metric added inside call() is available through `layer.metrics`.
print("layer.metrics", layer.metrics)
print("current accuracy value", float(layer.metrics[0].result()))

layer.metrics [<keras.metrics.accuracy_metrics.BinaryAccuracy object at 0x0000025D48782760>]
current accuracy value 1.0


In [27]:
inputs = keras.Input(shape=(3,), name="inputs")
targets = keras.Input(shape=(10,), name="targets")
logits = keras.layers.Dense(10)(inputs)
# LogisticEndpoint.call is declared as call(targets, logits, ...), so the
# targets go first. Passing (logits, targets) would swap labels and
# predictions in both the loss and the accuracy metric.
predictions = LogisticEndpoint(name="predictions")(targets, logits)

model = keras.Model(inputs=[inputs, targets], outputs=predictions)
# No `loss` argument: the endpoint layer registers the loss via add_loss.
model.compile(optimizer="adam")

# The dict keys match the names given to the two Input layers.
data = {
    "inputs": np.random.random((3, 3)),
    "targets": np.random.random((3, 10)),
}
model.fit(data)



<keras.callbacks.History at 0x25d4acefb80>

### You can optionally enable serialization on your layers

For layers to be serializable as a part of a functional model, implement get_config()

In [28]:
class Linear(keras.layers.Layer):
    """Lazily built affine layer exposing a minimal ``get_config``.

    Args:
        units: Number of output features.
    """

    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``; ``w`` is sized from ``input_shape[-1]``."""
        in_features = input_shape[-1]
        self.w = self.add_weight(
            shape=(in_features, self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer="random_normal",
            trainable=True,
        )

    def call(self, inputs):
        """Return the matrix product of ``inputs`` and ``w``, shifted by ``b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

    def get_config(self):
        """Return only the constructor argument of this class."""
        config = {"units": self.units}
        return config

# Serialize the layer to a config dict and recreate a layer from it.
layer = Linear(units=64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'units': 64}


In [29]:

class Linear(keras.layers.Layer):
    """Lazily built affine layer whose config includes the base-layer config.

    Args:
        units: Number of output features.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``; ``w`` is sized from ``input_shape[-1]``."""
        in_features = input_shape[-1]
        self.w = self.add_weight(
            shape=(in_features, self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer="random_normal",
            trainable=True,
        )

    def call(self, inputs):
        """Return the matrix product of ``inputs`` and ``w``, shifted by ``b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

    def get_config(self):
        """Return the base-layer config extended with ``units``."""
        config = super().get_config()
        config["units"] = self.units
        return config


# The config now also carries the base-layer entries, so from_config can
# pass them back to the constructor through **kwargs.
layer = Linear(units=64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'name': 'linear_9', 'trainable': True, 'dtype': 'float32', 'units': 64}


### Privileged training argument in the call() method

Some layers, such as BatchNormalization and Dropout, behave differently during training and inference.

For such layers, it is standard practice to expose a training (boolean) argument in the call() method.

In [30]:
class CustomDropout(keras.layers.Layer):
    """Dropout that is only active when ``training`` is truthy.

    Args:
        rate: Value passed as ``rate`` to ``tf.nn.dropout``.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        """Apply dropout when ``training`` is truthy, else return ``inputs`` as is."""
        if training:
            dropped = tf.nn.dropout(inputs, rate=self.rate)
            return dropped
        # Falsy `training` (including the default None): identity.
        return inputs

### Privileged mask argument in the call() method

In RNN layers, a mask is a boolean tensor (one boolean value per timestep in the input) used to skip certain input timesteps when processing timeseries data.

Keras will automatically pass the correct mask argument to __call__() for layers that support it, when a mask is generated by a prior layer. Mask-generating layers are the Embedding layer configured with mask_zero=True, and the Masking layer.

### Putting it all together: an end-to-end example

- A layer encapsulates a state (created in __init__() or build()) and some computation (defined in call()).
- Layers can be recursively nested to create new, bigger computation blocks.
- Layers can create and track losses as well as metrics (via add_loss() and add_metric()).
- The outer container, the thing you want to train, is a Model. A Model is just like a Layer, but with added training and serialization utilities.

In [15]:
from tensorflow.keras import layers

class Sampling(layers.Layer):
    """Sample ``z = z_mean + exp(0.5 * z_log_var) * epsilon`` with random-normal noise."""

    def call(self, inputs):
        """Return a sample for the ``(z_mean, z_log_var)`` pair in ``inputs``."""
        z_mean, z_log_var = inputs
        # The noise tensor takes its two dimensions from the shape of z_mean.
        mean_shape = tf.shape(z_mean)
        noise = tf.keras.backend.random_normal(
            shape=(mean_shape[0], mean_shape[1])
        )
        scale = tf.exp(0.5 * z_log_var)
        return z_mean + scale * noise
    

class Encoder(layers.Layer):
    """Map inputs to the triplet ``(z_mean, z_log_var, z)``.

    Args:
        latent_dim: Units of the ``z_mean`` and ``z_log_var`` dense layers.
        intermidiate_dim: Units of the ReLU projection layer. The spelling
            is part of the public signature and is kept as is.
        name: Layer name forwarded to the base class.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, latent_dim=32, intermidiate_dim=64, name='encoder', **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermidiate_dim, activation='relu')
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        """Project ``inputs``, derive mean and log-variance, and sample ``z``."""
        hidden = self.dense_proj(inputs)
        z_mean = self.dense_mean(hidden)
        z_log_var = self.dense_log_var(hidden)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z
    

class Decoder(layers.Layer):
    """Map an encoded vector back to ``original_dim`` sigmoid outputs.

    Args:
        original_dim: Units of the sigmoid output layer.
        intermidiate_dim: Units of the ReLU projection layer. The spelling
            is part of the public signature and is kept as is.
        name: Layer name forwarded to the base class.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, original_dim, intermidiate_dim=64, name='decoder', **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermidiate_dim, activation='relu')
        self.dense_output = layers.Dense(original_dim, activation='sigmoid')

    def call(self, inputs):
        """Apply the ReLU projection followed by the sigmoid output layer."""
        hidden = self.dense_proj(inputs)
        return self.dense_output(hidden)
    
class VariationalAutoEncoder(keras.Model):
    """End-to-end model chaining ``Encoder`` and ``Decoder`` for training.

    Args:
        original_dim: Output size of the decoder; also stored on the model.
        intermediate_dim: Hidden size passed to both encoder and decoder.
        latent_dim: Latent size passed to the encoder.
        name: Model name forwarded to the base class.
        **kwargs: Forwarded unchanged to the base ``Model`` constructor.
    """

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super().__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(
            latent_dim=latent_dim, intermidiate_dim=intermediate_dim
        )
        self.decoder = Decoder(original_dim, intermidiate_dim=intermediate_dim)

    def call(self, inputs):
        """Return the reconstruction and register the KL term via ``add_loss``."""
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # KL regularization term: -0.5 * mean(log_var - mean^2 - exp(log_var) + 1).
        kl_terms = z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        kl_loss = -0.5 * tf.reduce_mean(kl_terms)
        self.add_loss(kl_loss)
        return reconstructed

In [19]:
original_dim = 784
vae = VariationalAutoEncoder(original_dim, intermediate_dim=64, latent_dim=32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

# Running mean of the per-step loss, reported every 100 steps.
loss_metric = tf.keras.metrics.Mean()

# Only the MNIST training images are used: flattened and scaled by 1/255.
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024)
train_dataset = train_dataset.batch(64)

epochs = 2

for epoch in range(epochs):
    print("Start of epoch %d" % (epoch,))

    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Reconstruction error plus the losses registered via add_loss
            # during this forward pass.
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print("step %d: mean loss = %.4f" % (step, loss_metric.result()))


Start of epoch 0
step 0: mean loss = 0.3556
step 100: mean loss = 0.1250
step 200: mean loss = 0.0989
step 300: mean loss = 0.0890
step 400: mean loss = 0.0841
step 500: mean loss = 0.0808
step 600: mean loss = 0.0787
step 700: mean loss = 0.0771
step 800: mean loss = 0.0759
step 900: mean loss = 0.0749
Start of epoch 1
step 0: mean loss = 0.0746
step 100: mean loss = 0.0740
step 200: mean loss = 0.0735
step 300: mean loss = 0.0730
step 400: mean loss = 0.0727
step 500: mean loss = 0.0723
step 600: mean loss = 0.0720
step 700: mean loss = 0.0717
step 800: mean loss = 0.0714
step 900: mean loss = 0.0712


In [20]:
# Same model trained with the built-in loop instead of a hand-written one.
vae = VariationalAutoEncoder(
    original_dim=784, intermediate_dim=64, latent_dim=32
)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

# The images serve as both inputs and targets.
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x=x_train, y=x_train, epochs=2, batch_size=64)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x2973d7c9fa0>