[Reference](https://towardsdatascience.com/vae-for-time-series-1dc0fef4bffa)

In [3]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models

class Sampling(layers.Layer):
    """Sample z from (z_mean, z_log_var) with the reparameterization trick.

    Computes ``z = z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon``
    is standard-normal noise with the same shape as ``z_mean``.
    """

    def call(self, inputs):
        """Draw one sample per element of ``z_mean``.

        Args:
            inputs: A pair ``(z_mean, z_log_var)`` of tensors. ``z_log_var``
                must be broadcast-compatible with ``z_mean``.

        Returns:
            A tensor shaped like ``z_mean`` holding the sampled values.
        """
        z_mean, z_log_var = inputs
        # The noise shape is simply the dynamic shape of z_mean, so there is
        # no need to special-case the rank; matching the dtype keeps the
        # multiplication below valid when z_mean is not the default float type.
        epsilon = tf.random.normal(shape=tf.shape(z_mean), dtype=z_mean.dtype)
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


def kl_divergence_sum(mu1=0.0, log_var1=0.0, mu2=0.0, log_var2=0.0):
    """Return KL(N(mu1, exp(log_var1)) || N(mu2, exp(log_var2))) as a scalar.

    The element-wise divergence between the two diagonal Gaussians is averaged
    over axis 0 and the remaining entries are then summed.

    Args:
        mu1: Mean of the first distribution.
        log_var1: Log-variance of the first distribution.
        mu2: Mean of the second distribution.
        log_var2: Log-variance of the second distribution.

    Returns:
        A scalar tensor.
    """
    variance_p = tf.exp(log_var1)
    variance_q = tf.exp(log_var2)
    squared_mean_gap = (mu1 - mu2) ** 2
    elementwise = log_var2 - log_var1 + (variance_p + squared_mean_gap) / variance_q - 1
    # Average over axis 0 (presumably the batch axis) before summing the rest.
    batch_averaged = 0.5 * tf.reduce_mean(elementwise, axis=0)
    return tf.reduce_sum(batch_averaged)



def log_lik_normal_sum(x, mu=0.0, log_var=0.0):
    """Return the Gaussian log-likelihood of ``x`` as a scalar.

    Evaluates ``-0.5 * (log(2*pi) + log_var + (x - mu)**2 / exp(log_var))``
    element-wise, with the data-dependent part averaged over axis 0, and sums
    the remaining entries.

    Args:
        x: Observed values.
        mu: Mean of the normal distribution.
        log_var: Log-variance of the normal distribution.

    Returns:
        A scalar tensor.
    """
    squared_error = (x - mu) ** 2
    data_term = log_var + squared_error * tf.exp(-log_var)
    # Average over axis 0 (presumably the batch axis) before summing the rest.
    batch_averaged = tf.reduce_mean(data_term, axis=0)
    log_two_pi = tf.math.log(2 * np.pi)
    return tf.reduce_sum(-0.5 * (log_two_pi + batch_averaged))

In [4]:
# Encoder: a stack of strided 1-D convolutions whose last layer emits 20
# channels, split into 10 for the latent mean and 10 for the log-variance.
# Shape comments assume the input length is a multiple of 96 (written 96*k).
inputs = layers.Input(shape=(None,))  # (N, 96*k)
x = layers.Reshape(target_shape=(-1, 1))(inputs)  # (N, 96*k, 1)

# Stride-3 stage with the wider kernel.
x = layers.Conv1D(filters=40, kernel_size=5, strides=3,
                  padding='same', activation='relu')(x)  # (N, 32*k, 40)

# Four identical stride-2 stages, each halving the number of time steps.
x = layers.Conv1D(filters=40, kernel_size=3, strides=2,
                  padding='same', activation='relu')(x)  # (N, 16*k, 40)
x = layers.Conv1D(filters=40, kernel_size=3, strides=2,
                  padding='same', activation='relu')(x)  # (N, 8*k, 40)
x = layers.Conv1D(filters=40, kernel_size=3, strides=2,
                  padding='same', activation='relu')(x)  # (N, 4*k, 40)
x = layers.Conv1D(filters=40, kernel_size=3, strides=2,
                  padding='same', activation='relu')(x)  # (N, 2*k, 40)

# Final stage has no activation so mean and log-variance are unconstrained.
x = layers.Conv1D(filters=20, kernel_size=3, strides=2,
                  padding='same')(x)  # (N, k, 20)

# First 10 channels are the mean, the remaining 10 the log-variance.
z_mean, z_log_var = x[:, :, :10], x[:, :, 10:]  # each (N, k, 10)
z = Sampling()([z_mean, z_log_var])  # reparameterized Gaussian sample

encoder = models.Model(inputs=inputs, outputs=[z_mean, z_log_var, z],
                       name='encoder')

In [5]:
# Decoder: a stack of strided transposed convolutions mapping a latent
# sequence of 10 features per step back to a flat series.
# Shape comments write the number of latent steps as k.
inputs = layers.Input(shape=(None, 10))  # (N, k, 10)

# Five identical stride-2 stages, each doubling the number of time steps.
x = layers.Conv1DTranspose(filters=40, kernel_size=3, strides=2,
                           padding='same', activation='relu')(inputs)  # (N, 2*k, 40)
x = layers.Conv1DTranspose(filters=40, kernel_size=3, strides=2,
                           padding='same', activation='relu')(x)  # (N, 4*k, 40)
x = layers.Conv1DTranspose(filters=40, kernel_size=3, strides=2,
                           padding='same', activation='relu')(x)  # (N, 8*k, 40)
x = layers.Conv1DTranspose(filters=40, kernel_size=3, strides=2,
                           padding='same', activation='relu')(x)  # (N, 16*k, 40)
x = layers.Conv1DTranspose(filters=40, kernel_size=3, strides=2,
                           padding='same', activation='relu')(x)  # (N, 32*k, 40)

# Final stride-3 stage with a single output channel and no activation.
x = layers.Conv1DTranspose(filters=1, kernel_size=5, strides=3,
                           padding='same')(x)  # (N, 96*k, 1)

# Drop the trailing channel axis.
outputs = layers.Reshape(target_shape=(-1,))(x)  # (N, 96*k)

decoder = models.Model(inputs=inputs, outputs=outputs, name='decoder')

In [6]:
class VAE(models.Model):
    """Variational autoencoder whose latent prior is produced by a network.

    The objective is the negative Gaussian log-likelihood of the
    reconstruction plus the KL divergence between the encoder's distribution
    and the one returned by ``prior``; both terms are divided by the
    module-level ``INPUT_SIZE``.

    Args:
        encoder: Callable returning ``(z_mean, z_log_var, z)`` for a batch of
            values.
        decoder: Callable mapping ``z`` to a reconstruction of the values.
        prior: Callable returning ``(z_mean, z_log_var, _)`` for the second
            element of each batch.
        **kwargs: Forwarded to ``models.Model``.
    """

    def __init__(self, encoder, decoder, prior, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.prior = prior
        # Learned log-variance of the observation noise, shared by all outputs.
        self.noise_log_var = self.add_weight(
            name='var', shape=(1,), initializer='zeros', trainable=True)
        # Running means so the reported values are averaged over the batches
        # seen so far instead of being whatever a single batch produced.
        self.total_loss_tracker = tf.keras.metrics.Mean(name='total_loss')
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(
            name='reconstruction_loss')
        self.kl_loss_tracker = tf.keras.metrics.Mean(name='kl_loss')

    @property
    def metrics(self):
        """Trackers exposed to Keras so it can reset them between runs."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    @tf.function
    def vae_loss(self, data):
        """Compute the two loss terms for one batch.

        Args:
            data: A pair ``(values, seasonal)``; ``values`` is fed to the
                encoder and ``seasonal`` to the prior network.

        Returns:
            ``(reconstruction_loss, kl_loss_z)``, each divided by
            ``INPUT_SIZE``.
        """
        values, seasonal = data
        z_mean, z_log_var, z = self.encoder(values)
        reconstructed = self.decoder(z)
        # NOTE(review): INPUT_SIZE is not defined in this block; it must exist
        # at module level before this method is first called.
        reconstruction_loss = -log_lik_normal_sum(
            values, reconstructed, self.noise_log_var) / INPUT_SIZE
        seasonal_z_mean, seasonal_z_log_var, _ = self.prior(seasonal)
        kl_loss_z = kl_divergence_sum(
            z_mean, z_log_var, seasonal_z_mean, seasonal_z_log_var) / INPUT_SIZE
        return reconstruction_loss, kl_loss_z

    def _update_trackers(self, total_loss, reconstruction_loss, kl_loss_z):
        """Fold one batch's losses into the trackers and return the log dict."""
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss_z)
        return {
            'loss': self.total_loss_tracker.result(),
            'reconstruction_loss': self.reconstruction_loss_tracker.result(),
            'kl_loss': self.kl_loss_tracker.result(),
        }

    def train_step(self, data):
        """Run one optimization step and return the running-mean losses.

        Args:
            data: A pair ``(values, seasonal)`` as accepted by ``vae_loss``.

        Returns:
            Dict with keys ``'loss'``, ``'reconstruction_loss'`` and
            ``'kl_loss'``.
        """
        with tf.GradientTape() as tape:
            reconstruction_loss, kl_loss_z = self.vae_loss(data)
            total_loss = reconstruction_loss + kl_loss_z

        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        return self._update_trackers(total_loss, reconstruction_loss, kl_loss_z)

    def test_step(self, data):
        """Evaluate the losses on one batch without updating any weights.

        Defined because the class has no ``call`` method for a default
        evaluation step to invoke.

        Args:
            data: A pair ``(values, seasonal)`` as accepted by ``vae_loss``.

        Returns:
            Dict with the same keys as ``train_step``.
        """
        reconstruction_loss, kl_loss_z = self.vae_loss(data)
        total_loss = reconstruction_loss + kl_loss_z
        return self._update_trackers(total_loss, reconstruction_loss, kl_loss_z)