# Variational Convolutional Autoencoder (VCAE)
A Variational Convolutional Autoencoder (VCAE) is a type of generative model and a specific kind of autoencoder that incorporates principles from variational inference to model the underlying distribution of data. VAEs are designed to generate new data samples that are similar to the training data by learning a probabilistic mapping between the input data and a latent space.
Key Concepts:
* **Probabilistic Approach:** VCAEs are generative models that assume the data is generated by some latent variables. They learn to model the distribution of the data and generate new samples from this distribution.
* **Latent Space Regularization:** VCAEs impose a prior distribution (usually Gaussian) on the latent space and regularize it using the Kullback-Leibler (KL) divergence, encouraging the latent variables to follow this distribution.
* **Reparameterization Trick:** To make the model differentiable and suitable for gradient-based optimization, the reparameterization trick is used, where the latent variables are expressed as a deterministic function of a parameterized mean and standard deviation, plus some noise.

In [1]:
import time
import numpy as np

import tensorflow as tf
from tensorflow.keras import layers, losses

ae_version = "v07"
ae_layers = 1
# set the dimensionality of the latent space to a plane for visualization later
latent_dim = 3

In [2]:
from platform import python_version
import sys
print('Python: ' + python_version()) # Python: 3.10.9
print('numpy: ' + np.__version__) # numpy: 1.23.5
print ('tensorflow: ' + sys.modules["tensorflow"].__version__) # tensorflow: 2.10.0

Python: 3.10.9
numpy: 1.23.5
tensorflow: 2.10.0


In [2]:

class CNN_AE(tf.keras.Model):
    def __init__(self, latent_dim):
        super(CNN_AE, self).__init__()
        self.latent_dim = latent_dim
        self.encoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(182, 362, 1)),
            tf.keras.layers.Conv2D(
                filters=8, kernel_size=4, strides=2, activation='relu', padding='valid'),
            tf.keras.layers.Conv2D(
                filters=16, kernel_size=3, strides=2, activation='relu', padding='same'),
            tf.keras.layers.Conv2D(
                filters=32, kernel_size=3, strides=2, activation='relu', padding='same'),
            tf.keras.layers.Conv2D(
                filters=64, kernel_size=3, strides=2, activation='relu', padding='same'),
            tf.keras.layers.Conv2D(
                filters=64, kernel_size=3, strides=2, activation='relu', padding='same'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(latent_dim + latent_dim),

        ]
        )

        self.decoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(units=6*12*64, activation=tf.nn.relu),
            tf.keras.layers.Reshape(target_shape=(6, 12, 64)),

            tf.keras.layers.Conv2DTranspose(
                filters=64, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            tf.keras.layers.Cropping2D(cropping=((0, 0), (0, 1))),
            tf.keras.layers.Conv2DTranspose(
                filters=64, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            tf.keras.layers.Cropping2D(cropping=((0, 1), (0, 1))),
            tf.keras.layers.Conv2DTranspose(
                filters=32, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            tf.keras.layers.Cropping2D(cropping=((0, 1), (0, 0))),
            tf.keras.layers.Conv2DTranspose(
                filters=16, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            tf.keras.layers.Conv2DTranspose(
                filters=8, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            # No activation
            tf.keras.layers.Conv2D(
                filters=1, kernel_size=4, strides=1, padding='same'),
        ]
        )
    

    
    @tf.function
    def sample(self, eps=None):
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps)

    def encode(self, x):
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * .5) + mean

    def decode(self, z):
        logits = self.decoder(z)
        return logits


In [3]:
optimizer = tf.keras.optimizers.Adam()

def compute_loss(model, x):
    mean, logvar = model.encode(x)
    z = model.reparameterize(mean, logvar)
    x_logit = model.decode(z)
    #Reconstruction loss
    reconstruction_loss = tf.keras.backend.mean(tf.math.square(x_logit - x[:,1:181,1:361,:]), axis=[1, 2, 3])
    #KL div loss
    kl_loss = - 0.5 * tf.keras.backend.mean(1 + logvar - tf.keras.backend.square(mean) - tf.keras.backend.exp(logvar), axis=-1)
    elbo = tf.keras.backend.mean(reconstruction_loss + kl_loss)
    return elbo


In [4]:
@tf.function
def train_step(model, x, optimizer):
    """Executes one training step and returns the loss.

  This function computes the loss and gradients, and uses the latter to
  update the model's parameters.
  """
    with tf.GradientTape() as tape:
        loss = compute_loss(model, x)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))


In [9]:
model = CNN_AE(latent_dim)

In [18]:
model.encoder.summary()
model.decoder.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 90, 180, 8)        136       
                                                                 
 conv2d_1 (Conv2D)           (None, 45, 90, 16)        1168      
                                                                 
 conv2d_2 (Conv2D)           (None, 23, 45, 32)        4640      
                                                                 
 conv2d_3 (Conv2D)           (None, 12, 23, 64)        18496     
                                                                 
 conv2d_4 (Conv2D)           (None, 6, 12, 64)         36928     
                                                                 
 flatten (Flatten)           (None, 4608)              0         
                                                                 
 dense (Dense)               (None, 6)                 2

In [5]:
datacube_precip = np.load("../data/WaterPrecip_datacube_CAE_single.npy")

In [6]:
# datacube_precip = datacube_precip/datacube_precip.max()

In [7]:
test_size = 1000
train_size = datacube_precip.shape[0] - test_size
batch_size = 32

In [8]:
train_dataset = (tf.data.Dataset.from_tensor_slices(datacube_precip)
                 .shuffle(train_size).batch(batch_size))
test_dataset = (tf.data.Dataset.from_tensor_slices(datacube_precip)
                .shuffle(test_size).batch(batch_size))

In [10]:
epochs = 100

In [11]:
for epoch in range(1, epochs + 1):
    start_time = time.time()
    for train_x in train_dataset:
        train_step(model, train_x, optimizer)
    end_time = time.time()

    loss = tf.keras.metrics.Mean()
    for test_x in test_dataset:
        loss(compute_loss(model, test_x))

    elbo = loss.result()
    print('Epoch: {}, Test set ELBO: {}, time elapse for current epoch: {}'
        .format(epoch, elbo, end_time - start_time))


Epoch: 1, Test set ELBO: 24.562021255493164, time elapse for current epoch: 91.60698962211609
Epoch: 2, Test set ELBO: 24.37751579284668, time elapse for current epoch: 94.14610838890076
Epoch: 3, Test set ELBO: 24.285104751586914, time elapse for current epoch: 95.56970191001892
Epoch: 4, Test set ELBO: 24.25105094909668, time elapse for current epoch: 99.46296954154968
Epoch: 5, Test set ELBO: 24.211315155029297, time elapse for current epoch: 109.63563394546509
Epoch: 6, Test set ELBO: 24.207128524780273, time elapse for current epoch: 105.78567337989807
Epoch: 7, Test set ELBO: 24.164278030395508, time elapse for current epoch: 104.64545631408691
Epoch: 8, Test set ELBO: 24.14128875732422, time elapse for current epoch: 94.29784941673279
Epoch: 9, Test set ELBO: 24.13054847717285, time elapse for current epoch: 105.45315074920654
Epoch: 10, Test set ELBO: 24.140180587768555, time elapse for current epoch: 99.57289528846741
Epoch: 11, Test set ELBO: 24.149242401123047, time elapse f

In [16]:
version = "01"
model.encoder.save_weights('./Weights/cnn_vae_v' + ae_version + '_encoder_weights_' + version)
model.decoder.save_weights('./Weights/cnn_vae_v' + ae_version + '_decoder_weights_' + version)

In [17]:
def predict(model, x):
    mean, logvar = model.encode(x)
    z = model.reparameterize(mean, logvar)
    x_logit = model.decode(z)
    return x_logit

In [18]:
test = predict(model,datacube_precip[:10,:,:,:] )

In [19]:
np.save("./Testing/temp_CVAE_v" +version+ ".npy", test)

In [31]:
def fun_test(model, x):
    mean, logvar = model.encode(x)
    z = model.reparameterize(mean, logvar)
    x_logit = model.decode(z)
    #Reconstruction loss
    reconstruction_loss = tf.keras.backend.mean(tf.math.square(x_logit - x[:,1:181,1:361,:]), axis=[1, 2, 3])
    #KL div loss
    kl_loss = - 0.5 * tf.keras.backend.mean(1 + logvar - tf.keras.backend.square(mean) - tf.keras.backend.exp(logvar), axis=-1)
    elbo = tf.keras.backend.mean(reconstruction_loss + kl_loss)
    test = tf.keras.backend.mean(tf.math.square(x_logit - x[:,1:181,1:361,:]), axis=[1, 2, 3])
    return test


In [34]:
test = fun_test(model,datacube_precip[:10,:,:,:] )
test

<tf.Tensor: shape=(10,), dtype=float32, numpy=
array([30.622057, 27.244976, 26.310926, 24.937326, 27.256582, 26.601446,
       28.952744, 29.623966, 23.67386 , 21.789776], dtype=float32)>