# Simple Variational Autoencoders (VAEs)

https://en.wikipedia.org/wiki/Variational_autoencoder

Note It only works with numpy<2 then tensorflow has to be <2.18 
!pip install tensorflow==2.17.0

In [13]:
%reset

Once deleted, variables cannot be recovered. Proceed (y/[n])?  y


In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
from sklearn.model_selection import train_test_split
from scipy.stats import truncnorm

## The Model

In [2]:
class VAE(keras.Model):
    def __init__(self, input_dim, latent_dim, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = self.encoder(input_dim)
        self.decoder = self.decoder(input_dim, latent_dim)
    
    def encoder(self, input_dim):
        encoder_inputs = tf.keras.Input(shape=(input_dim,))
        x = layers.Dense(8, activation='relu')(encoder_inputs)
        x = layers.Dense(12, activation='relu')(x)
        z_mean = layers.Dense(latent_dim, name='z_mean')(x)
        z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
        encoder = keras.Model(encoder_inputs, [z_mean, z_log_var], name="encoder")
        return encoder

    # Define the decoder
    def decoder(self, input_dim, latent_dim):
        latent_inputs = tf.keras.Input(shape=(latent_dim,))
        x = layers.Dense(12, activation='relu')(latent_inputs)
        x = layers.Dense(8, activation='relu')(x)
        decoder_outputs = layers.Dense(input_dim, activation='sigmoid')(x)
        decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
        return decoder
        
    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var = self.encoder(data)
            z = self.sampling((z_mean, z_log_var))
            reconstruction = self.decoder(z)
            
            reconstruction_loss = tf.reduce_mean(
                keras.losses.binary_crossentropy(data, reconstruction)
            )
            reconstruction_loss *= data.shape[1]
            
            kl_loss = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
            kl_loss = -0.5 * tf.reduce_mean(kl_loss)
            
            # Add penalty for deviation from sum-to-1 on topics
            normalization_loss = tf.reduce_mean(
                tf.square(tf.reduce_sum(reconstruction[2:4], axis=1) - 1.0)
            )

            total_loss = reconstruction_loss + kl_loss + 1 * normalization_loss
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

    def call(self, data):
        z_mean, z_log_var = self.encoder(data)
        z = self.sampling((z_mean, z_log_var))
        return self.decoder(z)

    def sampling(self, args):
        z_mean, z_log_var = args
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(1 * z_log_var) * epsilon

## The implementation

### Train with fake generated data

Note that the data are vectors. Each agent will be described by a single vector such as:
- [frequency_of_posting, probability of engagement, topic_preference_music, topic_preference_sports]
- [0.5, 0.2, 0.7, 0.3]

In [3]:
input_dim = 4
latent_dim = 2

data = np.random.rand(1000, input_dim)
#generate biases
data[:, 3] += 0.5
data[:, 2] += 1.0
data[:, 1] *= 1
data[:, 0] *= 0.8

# Normalize only columns 3 and 4 so their sum equals 1
sum_cols_34 = np.sum(data[:, 2:4], axis=1, keepdims=True)
data[:, 2:4] = data[:, 2:4] / sum_cols_34
rounded_data = np.round(data, 2)

X_train, X_test = train_test_split(rounded_data, test_size=0.2, random_state=42)
print("Training data shape:", X_train.shape)
print("Test data shape:", X_test.shape)
print(X_train)

Training data shape: (800, 4)
Test data shape: (200, 4)
[[0.62 0.46 0.7  0.3 ]
 [0.22 0.66 0.58 0.42]
 [0.68 0.64 0.73 0.27]
 ...
 [0.32 0.66 0.73 0.27]
 [0.55 0.38 0.52 0.48]
 [0.42 0.91 0.58 0.42]]


In [4]:
vae = VAE(input_dim, latent_dim)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(data, epochs=20, batch_size=128)

Epoch 1/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - kl_loss: 0.2409 - loss: 3.2450 - reconstruction_loss: 3.0040
Epoch 2/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - kl_loss: 0.1631 - loss: 3.1104 - reconstruction_loss: 2.9474
Epoch 3/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - kl_loss: 0.1072 - loss: 2.9826 - reconstruction_loss: 2.8753
Epoch 4/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - kl_loss: 0.0720 - loss: 2.9196 - reconstruction_loss: 2.8476
Epoch 5/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - kl_loss: 0.0489 - loss: 2.8734 - reconstruction_loss: 2.8245
Epoch 6/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - kl_loss: 0.0335 - loss: 2.8357 - reconstruction_loss: 2.8022
Epoch 7/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - kl_loss: 0.0243 - loss: 2.8098 - reconstructio

<keras.src.callbacks.history.History at 0x7f881365b7f0>

### Generate New Agents

In [5]:
# Sample 10 random vectors
z_sample = np.random.normal(scale = 2, size=(10, latent_dim))
# Decode the vectors
x_decoded = vae.decoder.predict(z_sample)
print(np.round(x_decoded, 2))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[[0.45 0.5  0.51 0.59]
 [0.29 0.46 0.78 0.46]
 [0.39 0.52 0.59 0.47]
 [0.38 0.51 0.6  0.5 ]
 [0.4  0.5  0.59 0.5 ]
 [0.37 0.5  0.61 0.55]
 [0.28 0.58 0.61 0.58]
 [0.29 0.5  0.71 0.48]
 [0.42 0.51 0.57 0.46]
 [0.42 0.49 0.57 0.55]]
