# Build format VAE network
Test the VAE build before including it in the larger training framework.

### Imports
Install tensorflow:
``%pip install tensorflow``

In [2]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
tf.random.set_seed(2) 

### Create sampling layer

In [49]:
class Sampling(layers.Layer):
    """Reparameterisation layer: draws z from (z_mean, z_log_var).

    The layer expects a pair ``(z_mean, z_log_var)`` and returns
    ``z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon`` is
    standard-normal noise with one value per (batch, latent) entry.
    """

    def call(self, inputs):
        mean, log_var = inputs
        # Dynamic shape so the layer works with an unknown batch size.
        mean_shape = tf.shape(mean)
        n_rows = mean_shape[0]
        n_cols = mean_shape[1]
        noise = tf.keras.backend.random_normal(shape=(n_rows, n_cols))
        # exp(0.5 * log variance) is the standard deviation.
        std_dev = tf.exp(0.5 * log_var)
        return mean + std_dev * noise


### Build encoder


In [66]:
# def make_encoder():    
#     #filter_1 = 3 #32
#     #filter_2 = 2 #64
#     #kernel_size = 5 #3
#     dense_size = 16; 
#     encoder_inputs = keras.Input(shape=(20, 20,3)) # enter cut-out shape (20,20,3)
#     x = layers.Conv2D(filter_1, kernel_size, activation="relu", strides=2, padding="same")(encoder_inputs)
#     x = layers.Conv2D(filter_2, kernel_size, activation="relu", strides=2, padding="same")(x)
#     x = layers.Flatten()(x) # to vector
#     x = layers.Dense(dense_size, activation="relu")(x) # linked layer
#     z_mean = layers.Dense(latent_dim, name="z_mean")(x)
#     z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
#     z = Sampling()([z_mean, z_log_var])
#     encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
#     encoder.summary()
#     return encoder_inputs, encoder, z , z_mean, z_log_var


def make_encoder(cutout_size,n_bands,
                 filter_1,filter_2,
                 kernel_size_1,kernel_size_2,
                 dense_size,latent_dim):
    """Build the convolutional encoder of the VAE and print its summary.

    Parameters
    ----------
    cutout_size : int
        Height and width of the (square) input cut-out.
    n_bands : int
        Number of channels of the input cut-out.
    filter_1, filter_2 : int
        Number of filters of the first conv layer and of the two following
        conv layers, respectively.
    kernel_size_1, kernel_size_2 : int
        Kernel size of the first conv layer and of the two following conv
        layers, respectively.
    dense_size : int
        Width of the dense layer that follows the flattened conv features.
    latent_dim : int
        Size of the latent vector (z_mean, z_log_var and z).

    Returns
    -------
    tuple
        ``(encoder_inputs, encoder, z, z_mean, z_log_var)`` where ``encoder``
        is the Keras model mapping the input to ``[z_mean, z_log_var, z]``.
    """
    encoder_inputs = keras.Input(shape=(cutout_size, cutout_size, n_bands))

    # (filters, kernel size, stride) for each conv layer, in order:
    # two stride-2 layers followed by one stride-1 layer.
    conv_specs = (
        (filter_1, kernel_size_1, 2),
        (filter_2, kernel_size_2, 2),
        (filter_2, kernel_size_2, 1),
    )
    features = encoder_inputs
    for n_filters, kernel, stride in conv_specs:
        features = layers.Conv2D(
            n_filters, kernel, activation="relu", strides=stride, padding="same"
        )(features)

    # Flatten the feature map to a vector and pass it through a dense layer.
    flattened = layers.Flatten()(features)
    hidden = layers.Dense(dense_size, activation="relu")(flattened)

    # Two parallel heads describe the latent distribution; z is sampled from it.
    z_mean = layers.Dense(latent_dim, name="z_mean")(hidden)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(hidden)
    z = Sampling()([z_mean, z_log_var])

    encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
    encoder.summary()

    return encoder_inputs, encoder, z, z_mean, z_log_var



### Build decoder

In [67]:
# def make_decoder(): 
#     latent_inputs = keras.Input(shape=(latent_dim,))
#     x = layers.Dense(5 * 5 * filter_2, activation="relu")(latent_inputs) # -- shape corresponding to encoder
#     x = layers.Reshape((5, 5, filter_2))(x)
#     x = layers.Conv2DTranspose(filter_2, kernel_size, activation="relu", strides=2, padding="same")(x)
#     x = layers.Conv2DTranspose(filter_1, kernel_size, activation="relu", strides=2, padding="same")(x)
#     decoder_outputs = layers.Conv2DTranspose(3, 3, activation="sigmoid", padding="same")(x) # (1,3) or (3,3)
#     decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
#     decoder.summary()
#     return decoder

def make_decoder(latent_dim,encoder,
                 filter_1,filter_2,
                 kernel_size_1,kernel_size_2):
    """Build the decoder that mirrors the conv stack of ``encoder``.

    Parameters
    ----------
    latent_dim : int
        Size of the latent vector fed to the decoder.
    encoder : keras.Model
        Encoder to mirror. The input shape of its last layer whose name
        contains 'flatten' gives the spatial size of the first decoder
        feature map, and the last dimension of ``encoder.input_shape`` gives
        the number of output channels (assumes a single image input).
    filter_1, filter_2 : int
        Filter counts; ``filter_2`` is used for the dense/reshape stage and
        the first two transposed convs, ``filter_1`` for the third.
    kernel_size_1, kernel_size_2 : int
        Kernel sizes paired with ``filter_1`` and ``filter_2`` respectively.

    Returns
    -------
    keras.Model
        The model named "decoder", mapping a latent vector to an image with
        sigmoid-activated values. Its summary is printed as a side effect.

    Raises
    ------
    ValueError
        If ``encoder`` has no layer whose name contains 'flatten'.
    """
    latent_inputs = keras.Input(shape=(latent_dim,))

    # Shape of the feature map right before flattening in the encoder,
    # e.g. (None, 5, 5, 16); the decoder starts from the same spatial size.
    flat_layer = [layer for layer in encoder.layers if 'flatten' in layer.name]
    if not flat_layer:
        raise ValueError(
            "encoder has no layer with 'flatten' in its name; "
            "cannot infer the decoder's starting feature-map shape"
        )
    flat_input = flat_layer[-1].input_shape
    height, width = flat_input[1], flat_input[2]

    # Reconstruct as many channels as the encoder consumes instead of a
    # hard-coded 3, so the output matches the input for any number of bands.
    n_bands = encoder.input_shape[-1]

    x = layers.Dense(height * width * filter_2, activation="relu")(latent_inputs)
    x = layers.Reshape((height, width, filter_2))(x)
    # Mirror of the encoder: one stride-1 layer, then two stride-2 upsamplings.
    x = layers.Conv2DTranspose(filter_2, kernel_size_2, activation="relu", strides=1, padding="same")(x)
    x = layers.Conv2DTranspose(filter_2, kernel_size_2, activation="relu", strides=2, padding="same")(x)
    x = layers.Conv2DTranspose(filter_1, kernel_size_1, activation="relu", strides=2, padding="same")(x)
    decoder_outputs = layers.Conv2DTranspose(n_bands, 3, activation="sigmoid", padding="same")(x)

    decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
    decoder.summary()
    return decoder


## Define VAE as model
Originally written with a custom `train_step`.

Update: instead of defining the VAE as a class, it is now built function-wise (`make_vae`), with the loss attached to the model via `add_loss`.

In [68]:
# Define VAE model.
def make_vae(encoder_inputs, z, z_mean, z_log_var, decoder,alpha=5):
    """Assemble the VAE model and attach its loss via ``add_loss``.

    Parameters
    ----------
    encoder_inputs
        Symbolic input tensor of the encoder; also the reconstruction target.
    z, z_mean, z_log_var
        Symbolic encoder outputs: the sampled latent vector and the mean and
        log-variance it was sampled from.
    decoder : keras.Model
        Decoder applied to ``z`` to produce the reconstruction.
    alpha : float, optional
        Weight of the KL term in the total loss (default 5).

    Returns
    -------
    keras.Model
        The model named "vae" mapping ``encoder_inputs`` to the
        reconstruction. The loss is already attached, so the model can be
        compiled with only an optimizer.
    """
    # Run the decoder once and reuse the result for both the model output and
    # the loss; a second decoder(z) call would only duplicate the forward pass.
    reconstruction = decoder(z)
    vae = tf.keras.Model(inputs=encoder_inputs, outputs=reconstruction, name="vae")

    # Reconstruction term: per-pixel binary cross-entropy summed over the two
    # spatial axes, then averaged over the batch.
    reconstruction_loss = tf.reduce_mean(
        tf.reduce_sum(
            keras.losses.binary_crossentropy(encoder_inputs, reconstruction), axis=(1, 2)
        )
    )

    # KL divergence regularization term: summed over the latent axis, then
    # averaged over the batch.
    kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
    kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

    # alpha is a custom weight on the KL term.
    # Play with different alpha: -2, 0, 1, 2; 0.2; -0.5; 10; 50
    total_loss = reconstruction_loss + alpha * kl_loss
    vae.add_loss(total_loss)
    return vae
    

## Test build

In [70]:
# Hyperparameters for a trial build on 300 x 300 cut-outs.
filter1 = 32
filter2 = 16
kernelSize1 = 5
kernelSize2 = kernelSize1  # same kernel size for every conv stage
denseSize = 16

sizeCutOut = 300
bands = (3, 2, 1)  # only the number of bands is used below

latentDim = 4

encoder_inputs, encoder, z, z_mean, z_log_var = make_encoder(
    cutout_size=sizeCutOut,
    n_bands=len(bands),
    filter_1=filter1,
    filter_2=filter2,
    kernel_size_1=kernelSize1,
    kernel_size_2=kernelSize2,
    dense_size=denseSize,
    latent_dim=latentDim,
)

# To inspect every layer, loop over encoder.layers and print
# layer.name / layer.input_shape.

# The input shape of the flatten layer is what the decoder reuses; print it
# next to the input shape of the last layer whose name contains 'dense'.
flat_layer = [lyr for lyr in encoder.layers if 'flatten' in lyr.name]
dense_layer = [lyr for lyr in encoder.layers if 'dense' in lyr.name]
print(flat_layer[-1].input_shape, dense_layer[-1].input_shape)

decoder = make_decoder(
    latent_dim=latentDim,
    encoder=encoder,
    filter_1=filter1,
    filter_2=filter2,
    kernel_size_1=kernelSize1,
    kernel_size_2=kernelSize2,
)
# Full VAE assembly is disabled for this build test:
# vae = make_vae(encoder_inputs, z, z_mean, z_log_var, decoder)
# vae.compile(optimizer=keras.optimizers.Adam())



Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_59 (InputLayer)          [(None, 300, 300, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_91 (Conv2D)             (None, 150, 150, 32  2432        ['input_59[0][0]']               
                                )                                                                 
                                                                                                  
 conv2d_92 (Conv2D)             (None, 75, 75, 16)   12816       ['conv2d_91[0][0]']              
                                                                                            

In [56]:
# test 0 works:
# filter1 = 64 
# filter2 = 16 
# kernelSize = 5
# denseSize = 16


# # this did not work (out of memory)
# filter1 = 128 
# filter2 = 32 
# kernelSize1 = 8
# kernelSize2= 5
# denseSize = 16

# Configuration for a full VAE build on 20 x 20 cut-outs.
filter1 = 64 
filter2 = 32 
kernelSize1 = 5
kernelSize2= 5
denseSize = 16

sizeCutOut=20
bands = 3, 2, 1 

latentDim = 4

encoder_inputs, encoder, z, z_mean, z_log_var = make_encoder(
                            sizeCutOut,len(bands),
                            filter1,filter2,
                            kernelSize1,kernelSize2,
                            denseSize,latentDim)

# make_decoder takes the encoder as its second argument (it reads the shape
# before the flatten layer from it); omitting it raises a TypeError.
decoder = make_decoder(latentDim,encoder,filter1,filter2,kernelSize1,kernelSize2)
vae = make_vae(encoder_inputs, z, z_mean, z_log_var, decoder)
# No loss is passed to compile: make_vae already attached it with add_loss.
vae.compile(optimizer=keras.optimizers.Adam())

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_42 (InputLayer)          [(None, 20, 20, 3)]  0           []                               
                                                                                                  
 conv2d_47 (Conv2D)             (None, 10, 10, 64)   4864        ['input_42[0][0]']               
                                                                                                  
 conv2d_48 (Conv2D)             (None, 5, 5, 32)     51232       ['conv2d_47[0][0]']              
                                                                                                  
 conv2d_49 (Conv2D)             (None, 5, 5, 32)     25632       ['conv2d_48[0][0]']              
                                                                                            

# number of training steps in 1 epoch 
`model.fit(steps_per_epoch=None)`: When training with input tensors such as TensorFlow data tensors, the default `None` is equal to the number of samples in your dataset divided by the batch size, or 1 if that cannot be determined.

So I have 204,259 samples (window size = 100).
batch_size = 32 (the default; do not set it yourself when using `tf.data` datasets).
Number of steps: 2766

In [73]:
# Steps per epoch expected for 204259 samples at a batch size of 32.
# This value is not displayed: a cell only echoes its last expression.
204259 / 32
# Samples per step implied by the 2766 steps noted above.
204259 / 2766

73.84634851771511