<a href="https://colab.research.google.com/github/AnkurMali/IST597_Spring_2022/blob/main/IST597_customlayers_vae_spring2022.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IST597 :- Custom layers and VAEs
In this assignment we will learn, how to define your own custom layers using keras and integrate it with keras API. We will be developing simple Variational autoencoder and test it on MNIST dataset.

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import tensorflow as tf
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

The full list of pre-existing layers can be seen in the documentation ([Keras API](https://https://www.tensorflow.org/api_docs/python/tf/keras/layers)). It includes Dense (a fully-connected layer), Conv2D (1D, 3D), RNN (GRU, LSTM ,etc), BatchNormalization, Dropout, and many others.)).

Let's look at different way of defining layers using keras and how we can create custom layers if needed.

In [2]:
layer = tf.keras.layers.Dense(256) # Provide number of hidden units, input shape is inferred from data
layer = tf.keras.layers.Dense(128, input_shape=(None, 10)) # Provide input shape, if model is complex

In [3]:
layer(tf.zeros([120, 10]))

<tf.Tensor: shape=(120, 128), dtype=float32, numpy=
array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]], dtype=float32)>

In [4]:
layer(tf.zeros([300, 10]))

<tf.Tensor: shape=(300, 128), dtype=float32, numpy=
array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]], dtype=float32)>

In [5]:
layer.variables # List all trainable variables

[<tf.Variable 'dense_1/kernel:0' shape=(10, 128) dtype=float32, numpy=
 array([[-0.00174594,  0.07600002,  0.20278783, ..., -0.04090717,
          0.0626701 ,  0.16592263],
        [ 0.02887262,  0.10382028, -0.16075982, ...,  0.15742756,
          0.06267197,  0.17785825],
        [ 0.0235547 , -0.02224658, -0.19540118, ..., -0.16289224,
          0.10448419,  0.11331768],
        ...,
        [-0.10184544, -0.06701   ,  0.12802236, ...,  0.09268863,
         -0.16879267, -0.09703728],
        [-0.19247532, -0.13628536,  0.10741906, ...,  0.18835728,
          0.03636011, -0.0418143 ],
        [ 0.07045038, -0.07457916, -0.20245764, ..., -0.14473625,
          0.0724908 , -0.08168623]], dtype=float32)>,
 <tf.Variable 'dense_1/bias:0' shape=(128,) dtype=float32, numpy=
 array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 

In [6]:
layer.kernel, layer.bias #Check weight and biases of your model

(<tf.Variable 'dense_1/kernel:0' shape=(10, 128) dtype=float32, numpy=
 array([[-0.00174594,  0.07600002,  0.20278783, ..., -0.04090717,
          0.0626701 ,  0.16592263],
        [ 0.02887262,  0.10382028, -0.16075982, ...,  0.15742756,
          0.06267197,  0.17785825],
        [ 0.0235547 , -0.02224658, -0.19540118, ..., -0.16289224,
          0.10448419,  0.11331768],
        ...,
        [-0.10184544, -0.06701   ,  0.12802236, ...,  0.09268863,
         -0.16879267, -0.09703728],
        [-0.19247532, -0.13628536,  0.10741906, ...,  0.18835728,
          0.03636011, -0.0418143 ],
        [ 0.07045038, -0.07457916, -0.20245764, ..., -0.14473625,
          0.0724908 , -0.08168623]], dtype=float32)>,
 <tf.Variable 'dense_1/bias:0' shape=(128,) dtype=float32, numpy=
 array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 

# Create custom layers
The overall optimization process is similar to keras layers, but provides user additional control over module.
You can create custom layers with or without trainable objects, gradient tape will skip or neglect all non-trainable parameters while calculating derivatives.

In [16]:
class IST597DenseLayer(tf.keras.layers.Layer):
    def __init__(self, num_outputs):
        super(IST597DenseLayer, self).__init__()
        self.num_outputs = num_outputs

    def build(self, input_shape):
        w_init = tf.random_normal_initializer()
        self.kernel = self.add_variable("kernel",
                                        shape=[int(input_shape[-1]),
                                               self.num_outputs])

    def call(self, input):
        return tf.matmul(input, self.kernel)

layer = IST597DenseLayer(128)
print(layer(tf.zeros([128, 10])))
print(layer.trainable_variables)


tf.Tensor(
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]], shape=(128, 128), dtype=float32)
[<tf.Variable 'ist597_dense_layer_4/kernel:0' shape=(10, 128) dtype=float32, numpy=
array([[ 0.15198474,  0.06689079, -0.00597599, ..., -0.13580577,
        -0.14869042,  0.04165433],
       [ 0.04421245,  0.07853071, -0.20387553, ...,  0.12703057,
         0.17095082,  0.04433741],
       [-0.05372605,  0.11879389, -0.16131532, ...,  0.15864514,
        -0.09661411,  0.1667379 ],
       ...,
       [ 0.14704143, -0.12887022, -0.20500785, ..., -0.04495606,
         0.02551149, -0.13958366],
       [-0.20395795, -0.18829784, -0.14605734, ...,  0.10494737,
         0.11574872, -0.03081967],
       [ 0.07394899,  0.06932463,  0.15476967, ..., -0.16231357,
         0.15635408,  0.1387965 ]], dtype=float32)>]


  # Remove the CWD from sys.path while we load stuff.


# Defining your Resnet Blocks
Here we will see how one can implement Post-activation and pre-activation resnet blocks.
BN + Conv + relu = one set of trainable parameters (weight and biases for Conv and mean and variance for BN)

In [8]:
class ResnetBlock(tf.keras.Model):
    def __init__(self, kernel_size, filters):
        super(ResnetBlock, self).__init__(name='')
        filters1, filters2, filters3 = filters

        self.conv2a = tf.keras.layers.Conv2D(filters1, (1, 1))
        self.bn2a = tf.keras.layers.BatchNormalization()

        self.conv2b = tf.keras.layers.Conv2D(filters2, kernel_size, padding='same')
        self.bn2b = tf.keras.layers.BatchNormalization()

        self.conv2c = tf.keras.layers.Conv2D(filters3, (1, 1))
        self.bn2c = tf.keras.layers.BatchNormalization()

    def call(self, input_tensor, training=False):
        x = self.conv2a(input_tensor)
        x = self.bn2a(x, training=training)
        x = tf.nn.relu(x)

        x = self.conv2b(x)
        x = self.bn2b(x, training=training)
        x = tf.nn.relu(x)

        x = self.conv2c(x)
        x = self.bn2c(x, training=training)

        x += input_tensor
        return tf.nn.relu(x)

block = ResnetBlock(4, [64, 128, 256])
print(block(tf.zeros([4, 64, 128, 256])))
print([x.name for x in block.trainable_variables])

tf.Tensor(
[[[[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  ...

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0.

In [9]:
class ResnetBlockPre(tf.keras.Model):
    def __init__(self, kernel_size, filters):
        super(ResnetBlockPre, self).__init__(name='')
        filters1, filters2, filters3 = filters
        self.bn2a = tf.keras.layers.BatchNormalization()
        self.conv2a = tf.keras.layers.Conv2D(filters1, (1, 1))
        
        self.bn2b = tf.keras.layers.BatchNormalization()
        self.conv2b = tf.keras.layers.Conv2D(filters2, kernel_size, padding='same')
        
        self.bn2c = tf.keras.layers.BatchNormalization()
        self.conv2c = tf.keras.layers.Conv2D(filters3, (1, 1))
        

    def call(self, input_tensor, training=False):
        
        x = self.bn2a(input_tensor, training=training)
        x = tf.nn.relu(x)
        x = self.conv2a(x)

        
        x = self.bn2b(x, training=training)
        x = tf.nn.relu(x)
        x = self.conv2b(x)

       
        x = self.bn2c(x, training=training)
        x = self.conv2c(x)
        
        
        x += input_tensor
        return tf.nn.relu(x)

block = ResnetBlockPre(4, [64, 128, 256])
print(block(tf.zeros([4, 64, 128, 256])))
print([x.name for x in block.trainable_variables])

tf.Tensor(
[[[[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  ...

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0.

In [10]:
from __future__ import absolute_import, division, print_function, unicode_literals
from tensorflow.keras import layers

try:
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow as tf

tf.keras.backend.clear_session()  # For easy reset of notebook state.


class Sampling(layers.Layer):
  """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

  def call(self, inputs):
    z_mean, z_log_var = inputs
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
  """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

  def __init__(self,
               latent_dim=32,
               intermediate_dim=64,
               name='encoder',
               **kwargs):
    super(Encoder, self).__init__(name=name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
    self.dense_mean = layers.Dense(latent_dim)
    self.dense_log_var = layers.Dense(latent_dim)
    self.sampling = Sampling()

  def call(self, inputs):
    x = self.dense_proj(inputs)
    z_mean = self.dense_mean(x)
    z_log_var = self.dense_log_var(x)
    z = self.sampling((z_mean, z_log_var))
    return z_mean, z_log_var, z


class Decoder(layers.Layer):
  """Converts z, the encoded digit vector, back into a readable digit."""

  def __init__(self,
               original_dim,
               intermediate_dim=64,
               name='decoder',
               **kwargs):
    super(Decoder, self).__init__(name=name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
    self.dense_output = layers.Dense(original_dim, activation='sigmoid')

  def call(self, inputs):
    x = self.dense_proj(inputs)
    return self.dense_output(x)


class VariationalAutoEncoder(tf.keras.Model):
  """Combines the encoder and decoder into an end-to-end model for training."""

  def __init__(self,
               original_dim,
               intermediate_dim=64,
               latent_dim=32,
               name='autoencoder',
               **kwargs):
    super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
    self.original_dim = original_dim
    self.encoder = Encoder(latent_dim=latent_dim,
                           intermediate_dim=intermediate_dim)
    self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

  def call(self, inputs):
    z_mean, z_log_var, z = self.encoder(inputs)
    reconstructed = self.decoder(z)
    # Add KL divergence regularization loss.
    kl_loss = - 0.5 * tf.reduce_mean(
        z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
    self.add_loss(kl_loss)
    return reconstructed


original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

# Iterate over epochs.
for epoch in range(10):
  print('Start of epoch %d' % (epoch,))

  # Iterate over the batches of the dataset.
  for step, x_batch_train in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      reconstructed = vae(x_batch_train)
      # Compute reconstruction loss
      loss = mse_loss_fn(x_batch_train, reconstructed)
      loss += sum(vae.losses)  # Add KLD regularization loss

    grads = tape.gradient(loss, vae.trainable_weights)
    optimizer.apply_gradients(zip(grads, vae.trainable_weights))

    loss_metric(loss)

    if step % 100 == 0:
      print('step %s: mean loss = %s' % (step, loss_metric.result()))

vae.save('vae')

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Start of epoch 0
step 0: mean loss = tf.Tensor(0.33800745, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.1249058, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.09881862, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.08892352, shape=(), dtype=float32)
step 400: mean loss = tf.Tensor(0.084020466, shape=(), dtype=float32)
step 500: mean loss = tf.Tensor(0.08074162, shape=(), dtype=float32)
step 600: mean loss = tf.Tensor(0.078637496, shape=(), dtype=float32)
step 700: mean loss = tf.Tensor(0.07705015, shape=(), dtype=float32)
step 800: mean loss = tf.Tensor(0.075907744, shape=(), dtype=float32)
step 900: mean loss = tf.Tensor(0.07489431, shape=(), dtype=float32)
Start of epoch 1
step 0: mean loss = tf.Tensor(0.074596465, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.07395722, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.0734612, shap



INFO:tensorflow:Assets written to: vae/assets


INFO:tensorflow:Assets written to: vae/assets
