In [1]:
import sys
sys.path.append("..")
from VAE.vae import *

In [25]:
config_space = ConfigurationSpace(
                {'input_dropout': 0.1, 'intermediate_activation': "relu", 'intermediate_dimension': 10,
                'intermediate_layers': 2, 'latent_dimension': 1, 'learning_rate': 0.001,
                'original_dim': 200, 'solver': 'nadam'}
            )
config = config_space.get_default_configuration()

In [65]:
@keras.saving.register_keras_serializable(package="FIA_VAE")
class Sampling(layers.Layer):
        """
        Uses (z_mean, z_log_var) to sample z, the vector encoding a digit.
        """
        def call(self, inputs):
            z_mean, z_log_var = inputs
            z_mean_shape = ops.shape(z_mean)
            batch   = z_mean_shape[0]
            dim     = z_mean_shape[1]
            epsilon = keras.random.normal(shape=(batch,dim))
            return ops.multiply(ops.add(z_mean, ops.exp(0.5 * z_log_var)), epsilon)


@keras.saving.register_keras_serializable(package="FIA_VAE")
def kl_reconstruction_loss(y_true, y_pred, sigma, mu):
    """
    Loss function for Kullback-Leibler + Reconstruction loss

    Args:
        true: True values
        pred: Predicted values
    Returns:
        Loss = Kullback-Leibler + Reconstruction loss
    """
    reconstruction_loss = losses.mean_absolute_error(y_true, y_pred)
    kl_loss = -0.5 * ops.sum( 1.0 + sigma - ops.square(mu) - ops.exp(sigma) )
    loss = reconstruction_loss + kl_loss
    
    return {"reconstruction_loss": reconstruction_loss, "kl_loss": kl_loss, "loss": loss}

@keras.saving.register_keras_serializable(package="FIA_VAE")
class FIA_VAE(Model):
    """
    A variational autoencoder for flow injection analysis
    """
    def __init__(self, config:Union[Configuration, dict]):
        super().__init__()
        self.config             = config
        intermediate_dims       = [i for i in range(config["intermediate_layers"]) 
                                    if config["intermediate_dimension"] // 2**i > config["latent_dimension"]]
        activation_function     = get_activation_function( config["intermediate_activation"] )

        # Encoder (with sucessive halfing of intermediate dimension)
        self.dropout            = Dropout( config["input_dropout"] , name="dropout")        
        self.intermediate_enc   = Sequential ( [ Input(shape=(config["original_dim"],), name='encoder_input') ] +
                                               [ Dense( config["intermediate_dimension"] // 2**i,
                                                        activation=activation_function ) 
                                                for i in intermediate_dims] , name="encoder_intermediate")

        self.mu_encoder         = Dense( config["latent_dimension"], name='latent_mu' )
        self.sigma_encoder      = Dense( config["latent_dimension"], name='latent_sigma' )
        self.z_encoder          = Sampling(name="latent_reparametrization") 

        # Decoder
        self.decoder            = Sequential( [ Input(shape=(config["latent_dimension"], ), name='decoder_input') ] +
                                              [ Dense( config["intermediate_dimension"] // 2**i,
                                                       activation=activation_function )
                                               for i in reversed(intermediate_dims) ] +
                                              [ Dense(config["original_dim"], activation="relu") ] , name="Decoder")

        # Loss trackers
        self.reconstruction_loss    = metrics.Mean(name="reconstruction_loss")
        self.kl_loss                = metrics.Mean(name="kl_loss")
        self.loss_tracker           = metrics.Mean(name="loss")

        # Define optimizer
        self.optimizer = get_solver( config["solver"] )( config["learning_rate"] )

        # Compile VAE
        self.compile(optimizer=self.optimizer, loss=kl_reconstruction_loss)

    @property
    def metrics(self):
        return [self.loss_tracker, self.reconstruction_loss, self.kl_loss]
    
    def get_config(self):
        return {"config": dict(self.config)}

    def call(self, data, training=False):
        x = self.dropout(data, training=training)
        return self.decode(self.encode(x))

    def encode(self, data):
        x = self.intermediate_enc(data)
        self.mu = self.mu_encoder(x)
        self.sigma = self.sigma_encoder(x)
        self.z = self.z_encoder( [self.mu, self.sigma] )
        return self.z
    
    def encode_mu(self, data):
        x = self.intermediate_enc(data)
        return self.mu(x)
    
    def decode(self, x):
        return self.decoder(x)
    
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = kl_reconstruction_loss(y, y_pred, self.sigma, self.mu)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        self.reconstruction_loss.update_state(loss["reconstruction_loss"])
        self.kl_loss.update_state(loss["kl_loss"])
        self.loss_tracker.update_state( loss["loss"] )
        return loss

In [66]:
model = FIA_VAE(config)

In [67]:
model.summary()

## Saving

In [68]:
keras.saving.save_model(model, "../../runs/VAE/training/test.keras")

In [69]:
model.save_weights("../../runs/VAE/training/test.weights.h5")

## Loading

In [70]:
model2 = keras.saving.load_model("../../runs/VAE/training/test.keras")

In [71]:
model2.load_weights("../../runs/VAE/training/test.weights.h5")

## Testing

In [73]:
X = np.random.normal(0.0, 1.0, size=(5, 200))

In [74]:
model2.fit(X, X)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step - kl_loss: 3.9384 - loss: 4.7367 - reconstruction_loss: 0.7983


<keras.src.callbacks.history.History at 0x7ff29028dc10>