# Variational Autoencoder (VAE):

In [None]:
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Dense, PReLU

In [None]:
class PredictionCallback(Callback):
    """Callback that records model predictions at a fixed epoch interval.

    Whenever ``(epoch + 1)`` is a multiple of ``prediction_frequency`` the
    model being trained is run on the stored test inputs and the result is
    appended to ``epoch_predictions``.
    """

    def __init__(self, cp_test_indiv, prediction_frequency=2):
        """Keep the test inputs and the interval between snapshots.

        Args:
            cp_test_indiv: Inputs handed to ``model.predict`` for each snapshot.
            prediction_frequency: Number of epochs between two snapshots.
        """
        super().__init__()

        # One entry per snapshot, in the order they were taken.
        self.epoch_predictions = []
        self.prediction_frequency = prediction_frequency
        self.P_test = cp_test_indiv

    def on_epoch_end(self, epoch, logs=None):
        """Take a snapshot if this epoch falls on the configured interval."""
        # The index is shifted by one so that the first snapshot is taken
        # after `prediction_frequency` completed epochs rather than after
        # the very first one.
        if (epoch + 1) % self.prediction_frequency != 0:
            return
        self.epoch_predictions.append(self.model.predict(self.P_test))

def build_vae(input_shape, enc_layers, dec_layers, latent_dim, regularization, learn_rate, loss):
    """Build and compile a fully connected variational autoencoder.

    The encoder is a stack of ``Dense`` + ``PReLU`` layers followed by two
    linear heads (``z_mean`` and ``z_log_var``). A latent sample is drawn with
    the reparameterisation ``z = z_mean + exp(0.5 * z_log_var) * epsilon`` and
    decoded by a second ``Dense`` + ``PReLU`` stack ending in a linear layer
    with ``input_shape[0]`` units.

    Args:
        input_shape: Shape of one sample; only ``input_shape[0]`` (the number
            of features) is used, for both the input and the output layer.
        enc_layers: Widths of the encoder hidden layers, in order. May be
            empty, in which case the latent heads read the input directly.
        dec_layers: Widths of the decoder hidden layers, in order. May be
            empty, in which case the output layer reads ``z`` directly.
        latent_dim: Dimension of the latent space.
        regularization: Weight multiplying the KL term in the total loss.
        learn_rate: Learning rate of the Adam optimizer.
        loss: Accepted for signature compatibility but not used; the
            objective is attached with ``add_loss`` and ``compile`` receives
            no loss.

    Returns:
        A ``(vae, encoder)`` tuple: the compiled end-to-end model and a model
        mapping the inputs to ``z_mean``.

    NOTE(review): the objective defined here compares the decoder output with
    the model *input* only, so any targets passed to ``fit`` do not enter it.
    Confirm that pure reconstruction is the intended training target.
    """
    n_features = input_shape[0]

    # Encoder. The shape is passed as a 1-tuple, which is the documented form
    # of `shape`; a bare int is not accepted by every Keras version.
    encoder_inputs = Input(shape=(n_features,))
    encoded = encoder_inputs
    for units in enc_layers:
        encoded = Dense(units)(encoded)
        encoded = PReLU()(encoded)

    # Mean and log-variance of the approximate posterior.
    z_mean = Dense(latent_dim, name='z_mean')(encoded)
    z_log_var = Dense(latent_dim, name='z_log_var')(encoded)

    def sampling(args):
        """Draw one latent sample per row via the reparameterisation trick."""
        mean, log_var = args
        epsilon = K.random_normal(shape=(K.shape(mean)[0], latent_dim), mean=0., stddev=1.)
        # exp(0.5 * log_var) is the standard deviation.
        return mean + K.exp(0.5 * log_var) * epsilon

    z = Lambda(sampling, name='z')([z_mean, z_log_var])

    # Decoder.
    decoded = z
    for units in dec_layers:
        decoded = Dense(units)(decoded)
        decoded = PReLU()(decoded)
    decoder_outputs = Dense(n_features)(decoded)
    decoder_outputs = Activation('linear')(decoder_outputs)

    vae = Model(encoder_inputs, decoder_outputs, name='vae')

    # Loss: mean squared reconstruction error plus the weighted KL divergence.
    # Both terms are averaged over every element (batch and feature/latent
    # axes), so `regularization` weights a per-element KL value.
    reconstruction_loss = tf.reduce_mean(tf.square(decoder_outputs - encoder_inputs))
    kl_loss = -0.5 * tf.reduce_mean(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
    total_loss = reconstruction_loss + regularization * kl_loss

    # The loss is attached to the model itself, so compile() only needs the
    # optimizer.
    vae.add_loss(total_loss)
    adam = optimizers.Adam(learning_rate=learn_rate)
    vae.compile(optimizer=adam)

    # Deterministic encoder: maps inputs to the posterior mean.
    encoder = Model(encoder_inputs, z_mean, name='encoder')

    return vae, encoder

# Visible confirmation that the definitions in this cell have been executed.
print("Done!")


In [None]:
# --- Hyperparameters ---
input_shape = (15,)            # build_vae reads input_shape[0] as the feature count
enc_layers = [12, 8]           # encoder hidden-layer widths
dec_layers = [12]              # decoder hidden-layer widths
latent_dim = 10
regularization = 1e-3          # weight of the KL term inside build_vae
learn_rate = 2e-5              # alternative value noted by the author: 1e-5
loss = 'mean_squared_error'    # forwarded to build_vae

# --- Model ---
vae, encoder = build_vae(
    input_shape,
    enc_layers,
    dec_layers,
    latent_dim,
    regularization,
    learn_rate,
    loss,
)
vae.summary()

# --- Callback that stores test-set predictions every few epochs ---
prediction_frequency = 5
prediction_callback_3_encoder = PredictionCallback(cp_test_indiv, prediction_frequency)

# --- Training ---
# NOTE(review): build_vae compiles the model without a loss and attaches its
# objective with add_loss, so cp_train_phaseAve does not appear to enter the
# objective -- confirm that this is intended.
vae.fit(
    cp_train_indiv,
    cp_train_phaseAve,
    epochs=5000,
    batch_size=100,
    verbose=2,
    shuffle=False,
    validation_split=0.0,
    callbacks=[prediction_callback_3_encoder],
)

In [None]:
# Test-set MSE between the phase-averaged targets and every stored
# prediction snapshot, keyed by the regularization weight used for the run.
from sklearn.metrics import mean_squared_error as mse

prediction_callback_reg = prediction_callback_3_encoder
mse_pred_per_epoch = {
    regularization: [
        mse(cp_test_phaseAve, predictions)
        for predictions in prediction_callback_reg.epoch_predictions
    ]
}

# Snapshot k (0-based) is stored after epoch (k + 1) * prediction_frequency,
# so the curve is plotted against those epoch numbers; plotting against the
# bare list index would mislabel the "Epochs" axis.
snapshot_every = prediction_callback_reg.prediction_frequency

fig, ax = plt.subplots(figsize=(6, 3))
# Iterate over the keys that were actually computed instead of a hard-coded
# value, so changing `regularization` cannot cause a KeyError here.
# NOTE(review): the plotted value is the weight build_vae applies to the KL
# term; the legend text still says "L2 regularization" -- confirm wording.
for regul, mse_values in mse_pred_per_epoch.items():
    epochs = [snapshot_every * (k + 1) for k in range(len(mse_values))]
    ax.semilogy(epochs, mse_values, label=f'L2 regularization: {regul}')
ax.legend()
# ax.set_ylim([0.04,1])
ax.set_xmargin(0)
ax.set_ylabel('MSE (log10)')
ax.set_xlabel('Epochs')
ax.grid(color='lightgray', which='both')

In [None]:
# Plot the stored prediction snapshot with the lowest test MSE for one
# pressure port, together with the raw and the phase-averaged test signal.
fig, ax = plt.subplots(figsize=(6, 3))

port_show = 0  # column of the port to display

# Look the MSE history up with the weight that was actually used for the run
# rather than a hard-coded 0.001, which breaks once `regularization` changes.
mse_history = mse_pred_per_epoch[regularization]
min_val = min(mse_history)
min_index = mse_history.index(min_val)  # index into epoch_predictions, not an epoch number
print(min_index)

best_prediction = prediction_callback_3_encoder.epoch_predictions[min_index]

ax.plot(x_ticks, cp_test_indiv[0:len_period, port_show], 'lightgray', label='cp_test_indiv')
ax.plot(x_ticks, cp_test_phaseAve[:, port_show], 'b', label='cp_test_phaseAve')
ax.plot(x_ticks, best_prediction[:, port_show], 'r', label='cp_autoencoder')
ax.grid(color='lightgray')
ax.set_xlabel('$t^*$')
ax.set_ylabel('$C_p$')
ax.legend()
ax.set_xmargin(0)
# Same title string as before: "$C_p$" followed by the 1-based port number
# as a subscript.
ax.set_title(f'case A | 2nd cycle | $C_p$$_{{{port_show + 1}}}$')
# ax.set_ylim([-4,0])

In [None]:
# Run the VAE on the training inputs and overlay, for one port, the raw
# signal, the phase-averaged signal and the model output.
aaa = vae.predict(cp_train_indiv)
port_show = 0

fig, ax = plt.subplots(figsize=(6, 3))

# (array, line style, extra plot options) for each curve, in drawing order.
_curves = (
    (cp_train_indiv, 'lightgray', {'label': 'cp_train_indiv'}),
    (cp_train_phaseAve, 'r', {'label': 'cp_train_phaseAve'}),
    (aaa, 'b', {'alpha': 0.5, 'label': 'cp_train_reconstruct'}),
)
for _curve, _style, _kwargs in _curves:
    ax.plot(_curve[:, port_show], _style, **_kwargs)

ax.set_ylabel('$C_p$')
ax.set_ylim([-10, 5])
# ax.set_xlim(0,len_period)
ax.set_xmargin(0)
ax.grid(color='lightgray')
ax.legend()