In [None]:
# Training parameters

window = 672       # 1 week
stride = 4         # 1 hour
latent_dim = 10    # Autoencoder latent dimension
epochs = 150       # Number of epochs
batch_size = 8     # Batch size
M = 200            # Montecarlo
f = 5              # Filters' dimensions (conv layers)

In [None]:
# Building the model

from keras import backend as K
from tensorflow.keras import Input

input_shape = X_train.shape[1:]
output_shape = X_train.shape[1:]

###########
# ENCODER #
###########

encoder_input = tf.keras.Input(shape=input_shape)

x = tfkl.Conv1D(16, f, activation="relu", strides=1, padding="same")(encoder_input)
x = tfkl.MaxPool1D(pool_size=2, strides=2)(x)
x = tfkl.Conv1D(32, f, activation="relu", strides=1, padding="same")(x)
x = tfkl.MaxPool1D(pool_size=2, strides=2)(x)
x = tfkl.Conv1D(64, f, activation="relu", strides=1, padding="same")(x)
x = tfkl.MaxPool1D(pool_size=2, strides=2)(x)

x = tfkl.Flatten()(x)
x = tfkl.Dense(latent_dim, activation='linear')(x)

# Latent representation: mean + log of std.dev.
z_mu = tfkl.Dense(latent_dim, name='latent_mu')(x) # Mean
z_sigma = tfkl.Dense(latent_dim, name='latent_sigma')(x) # Std.Dev.

# Reparametrization trick
def sample_z1(args):
    z_mean, z_log_var = args
    eps = tf.keras.backend.random_normal(shape=(K.shape(z_mean)[0], K.int_shape(z_mean)[1]))
    return z_mean + tf.exp(alpha * z_log_var) * eps
    

# Sampling a vector from the latent distribution
z = tfkl.Lambda(sample_z1, output_shape=(latent_dim, ), name='z')([z_mu, z_sigma])

encoder = tfk.Model(encoder_input, [z_mu, z_sigma, z], name='encoder')
print(encoder.summary())

In [None]:
###########
# DECODER #
###########

decoder_input = Input(shape=(latent_dim, ), name='decoder_input')
x = tfkl.Dense(units=42*X_train.shape[2])(decoder_input)
x = tfkl.Reshape((42,X_train.shape[2]))(x)
x = tfkl.Conv1DTranspose(64,f,2, padding='same', activation='relu')(x)
x = tfkl.Conv1DTranspose(32,f,2, padding='same', activation='relu')(x)
x = tfkl.Conv1DTranspose(16,f,2, padding='same', activation='relu')(x)
decoder_output = tfkl.Conv1DTranspose(X_train.shape[2],f,2, padding='same', activation='linear')(x)

# Define and summarize decoder model
decoder = tfk.Model(decoder_input, decoder_output, name='decoder')
decoder.summary()

In [None]:
class VAE(tfk.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = tfk.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tfk.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = tfk.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, y_true):
        with tf.GradientTape() as tape:
            
            encoder_mu, encoder_log_var, z = self.encoder(y_true)
            y_predict = self.decoder(z)
          
            reconstruction_loss = tf.reduce_mean(tf.reduce_sum(tfk.losses.mse(y_true, y_predict), axis=1))
            
            kl_loss = -0.5 * (1 + encoder_log_var - tf.square(encoder_mu) - tf.exp(encoder_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            
            total_loss = reconstruction_loss + kl_loss
            
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result()
        }
    
    def test_step(self, data): #https://github.com/keras-team/keras-io/issues/38

        encoder_mu, encoder_log_var, z = self.encoder(data)
        y_predict = self.decoder(z)
          
        reconstruction_loss = tf.reduce_mean(tf.reduce_sum(tfk.losses.mse(data, y_predict), axis=1))
            
        kl_loss = -0.5 * (1 + encoder_log_var - tf.square(encoder_mu) - tf.exp(encoder_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            
        total_loss = reconstruction_loss + kl_loss
        
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss
        }

In [None]:
vae = VAE(encoder, decoder)
vae.compile(optimizer=tfk.optimizers.Adam())
vae.fit(x = X_train,
        validation_data = (X_val, None),
        epochs=epochs, 
        batch_size=batch_size)
vae.fit(x = X_train,
        validation_data = (X_val, None),
        epochs=epochs, 
        batch_size=batch_size,
        callbacks=[tfk.callbacks.EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)])