### "Layer" class. A Layer encapsulates a state (weights) and some computation (defined in the `call` method).

In [1]:
%config IPCompleter.greedy=True # for autotab in Jupyter

In [22]:
import tensorflow as tf
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import Dense

In [23]:
print(tf.__version__)

2.0.0


In [4]:
class Linear(Layer):
    """y = w*x + b"""
    
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        
        w_init = tf.random_normal_initializer() # random initialization
        self.w = tf.Variable(initial_value=w_init(shape=(input_dim, units), dtype="float32"), trainable=True)
        b_init = tf.zeros_initializer()
        self.b = tf.Variable(initial_value=b_init(shape=(units,), dtype="float32"), trainable=True)
        
    # call method
    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b # w*x + b

# Instantiate Layer
linear_layer = Linear(4,2)

# Layer can be treated as a function
# Input some data 
y = linear_layer(tf.ones((2,2)))
assert y.shape == (2,4)

# Weights are automatically tracked under the 'weights' property
assert linear_layer.weights == [linear_layer.w, linear_layer.b]

In [19]:
print(y)

tf.Tensor(
[[ 0.06665818 -0.09086242 -0.01717797  0.02770316]
 [ 0.06665818 -0.09086242 -0.01717797  0.02770316]], shape=(2, 4), dtype=float32)


### It's good practice to create weights in a separate `build` method

In [4]:
class Linear(Layer):
    """y = w*x + b"""
    
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.units = units
    
    # add_weight method shortcut for creating weights
    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units), 
                                 initializer="random_normal",
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer="random_normal",
                                 trainable=True)
    
    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b
    
# Instantiate layer
linear_layer = Linear(4) # this means 4 units (weights) for the layer

# This also calls "build(input_shape)" and creates the weights
y = linear_layer(tf.ones((2,2)))

In [14]:
print(tf.ones((2,2)))

tf.Tensor(
[[1. 1.]
 [1. 1.]], shape=(2, 2), dtype=float32)


In [18]:
print(y)

tf.Tensor(
[[ 0.06665818 -0.09086242 -0.01717797  0.02770316]
 [ 0.06665818 -0.09086242 -0.01717797  0.02770316]], shape=(2, 4), dtype=float32)


### Automatically retreive the gradients of the weights of a layer by calling it inside a GradientTape. With these gradients one can update the weights of a layer, manually or using an optimizer object


In [21]:
# Load MNIST from keras API
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices((x_train.reshape(60000,784).astype("float32")/255, y_train))
dataset = dataset.shuffle(buffer_size=1024).batch(64)

In [27]:
# Instantiate linear Layer (above) with 10 units
linear_layer = Layer(10)

# Instantiate a logistic loss function that expects integer targets
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Instantiate an optimizer
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)

# Iterate over the batches of the dataset
for step, (x,y) in enumerate(dataset):
    
    # open a GradientTape (Tf 2 new feature)
    with tf.GradientTape() as tape:
        
        # do forward pass
        logits = linear_layer(x) # input data to layer this -> direction
        
        # compute loss for this batch
        loss = loss_fn(y, logits) # target vs computed
        
        # get gradients of weights with respect of loss
        gradients = tape.gradient(loss, linear_layer.trainable_weights)
        
    # Update weights of linear layer
    optimizer.apply_gradients(zip(gradients, linear_layer.trainable_weights))
    
    # logging
    if step % 100 == 0:
        print(step, float(loss))

0 6.8388214111328125
100 6.844223976135254
200 6.849213600158691
300 6.866031169891357
400 6.840302467346191
500 6.853946685791016
600 6.849768161773682
700 6.848651885986328
800 6.848796367645264
900 6.839184761047363


### Weights created by layers can be either trainable or non-trainable. They're exposed in the layer properties `trainable_weights` and `non_trainable_weights. This layer has non-trainable weights

In [33]:
class ComputeSum(Layer):
    """Returns sum of the inputs"""
    
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        
        # Create non-trainable weight
        self.total = tf.Variable(initial_value=tf.zeros((input_dim,)), trainable=False)
        
    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0)) # add inputs
        return self.total
    
my_sum = ComputeSum(2)
x = tf.ones((2,2))

y = my_sum(x)
print(y.numpy()) # [2, 2]

y = my_sum(x)
print(y.numpy())

assert my_sum.trainable_weights == []

[2. 2.]
[4. 4.]


###  Layers can be recursively nested to create bigger computation blocks. Each layer will track the weights of its sublayers (both trainable and non-trainable)

In [38]:
# This is a multilayer perceptron
# Re-use Linear class
class MLP(Layer):
    """Simple stack of linear layers"""
    
    def __init__(self):
        super(MLP, self).__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(10)
        
    def call(self, inputs):
        # connect layers
        x = self.linear_1(inputs)
        x = tf.nn.relu(x) # activation
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)
    
mlp = MLP()

# first call will create the weights
y = mlp(tf.ones(shape=(3, 64)))

# weights are recursively tracked
assert len(mlp.weights) == 6

### Layers can create losses during the forward pass. This is especially useful for regularization losses. The losses created by sublayers are recursively tracked by the parent layers.

In [5]:
class ActivityRegularization(Layer):
    """Layer creates an activity sparsity regularization loss"""
    
    def __init__(self, rate=1e-2):
        super(ActivityRegularization, self).__init__()
        self.rate = rate
        
    def call(self, inputs):
        # use add_loss to create a regularization loss
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs
    
# Use loss layer in MLP block
class SparseMLP(Layer):
    """Stack of Linear Layers with a sparsity """
    
    def __init__(self):
        super(SparseMLP, self).__init__()
        self.linear_1 = Linear(32)
        self.regularization = ActivityRegularization(1e-2)
        self.linear_3 = Linear(10)
        
    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.regularization(x)
        return self.linear_3(x)
    
mlp = SparseMLP()
y = mlp(tf.ones((10, 10)))
print(mlp.losses)

[<tf.Tensor: id=67, shape=(), dtype=float32, numpy=0.29380092>]


In [10]:
# The loss here corresponds to the last forward pass
mlp = SparseMLP()
mlp(tf.ones((10,10)))

# Use losses in a training loop

# Prepare dataset
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices((x_train.reshape(60000, 784).astype("float32")/255, y_train))
dataset = dataset.shuffle(buffer_size=1024).batch(64)

# Define a MLP
mlp = SparseMLP()

# Loss and optimizer
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)

# Train
for step, (x,y) in enumerate(dataset):
    with tf.GradientTape() as tape:
        
        # forward pass
        logits = mlp(x)
        
        # external loss value for this batch
        loss = loss_fn(y, logits)
        
        # add losses created during the forward pass
        loss += sum(mlp.losses)
        
        # get gradients of weights wrt the loss
        gradients = tape.gradient(loss, mlp.trainable_weights)
        
    # update weights of linear layer
    optimizer.apply_gradients(zip(gradients, mlp.trainable_weights))
    
    # logging 
    if step % 100 == 0:
        print(step, float(loss))

0 5.42427921295166
100 2.5380611419677734
200 2.4305036067962646
300 2.3752405643463135
400 2.352739095687866
500 2.325277328491211
600 2.3527536392211914
700 2.3397092819213867
800 2.333073377609253
900 2.324150323867798


### TF 2.0 is eager by default. Running eagerly is great for debugging, but you will get better performance by compiling your computation into static graphs. You can compile any function by wrapping it in a tf.function:

In [11]:
# Define a MLP
mlp = SparseMLP()

# Loss and optimizer
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)

# Create training step function
@tf.function # wrap it to make it fast
def train_on_batch(x, y):
    with tf.GradientTape() as tape:
        logits = mlp(x)
        loss = loss_fn(y, logits)
        gradients = tape.gradient(loss, mlp.trainable_weights)
    optimizer.apply_gradients(zip(gradients, mlp.trainable_weights))
    return loss

# Prepare dataset
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices((x_train.reshape(60000, 784).astype("float32")/255, y_train))
dataset = dataset.shuffle(buffer_size=1024).batch(64)

# Run training using step function
for step, (x,y) in enumerate(dataset):
    loss = train_on_batch(x,y)
    if step % 100 == 0:
        print(step, float(loss))

0 2.3349523544311523
100 2.3174633979797363
200 2.2839698791503906
300 2.2750468254089355
400 2.271489143371582
500 2.2708191871643066
600 2.2397618293762207
700 2.2710442543029785
800 2.2650415897369385
900 2.2367138862609863


### Implementing VAEs in subclassing style


In [26]:
class Sampling(Layer):
    
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim)) # gaussian noise
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
    
class Encoder(Layer):
    
    def __init__(self, 
                 latent_dim=32,
                 intermediate_dim=64,
                 name="encoder",
                 **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = Dense(intermediate_dim, activation="relu") # using Dense layers
        self.dense_mean = Dense(latent_dim)
        self.dense_log_var = Dense(latent_dim)
        self.sampling = Sampling()
        
    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z
    
class Decoder(Layer):
    
    def __init__(self, 
                 original_dim,
                 intermediate_dim=64,
                 name="decoder",
                 **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = Dense(intermediate_dim, activation="relu") # using Dense layers
        self.dense_output = Dense(original_dim, activation="sigmoid")
        
    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)
    
class VariationalAutoEncoder(tf.keras.Model):
    
    def __init__(self,
                 original_dim,
                 intermediate_dim=64,
                 latent_dim=32,
                 name="autoencoder",
                 **kwargs):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim,
                           intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)
    
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        kl_loss = -0.5* tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        self.add_loss(kl_loss)
        return reconstructed

original_dim = 784
vae = VariationalAutoEncoder(original_dim, 128, 64)

In [28]:
# Next is to define a training procedure for VAE
print(type(vae))

<class '__main__.VariationalAutoEncoder'>
