# Convolutional 2D VAE

Import necessary packages

In [None]:
import numpy as np
from scipy.spatial.distance import pdist, squareform
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from scipy import linalg as la
from keras import regularizers
from keras import backend as K
from keras.layers import (
    Conv2D,
    Conv2DTranspose,
    Input,
    Flatten,
    Dense,
    Lambda,
    Reshape,
)

## Data Input and Pre Processing

Define core features of the dataset

In [None]:
# Layout of one configuration: the raw table is later reshaped to
# (-1, numpart, dim).
numpart = 30
dim = 2
# Length scale used, together with sqrt(dim), to normalise the coordinates.
box_size = 10

# Width of the z_mean / z_log_var heads of the encoder.
latent_dim = 50


Import and reshape data

In [None]:
def _read_table(path):
    """Parse a text file of space-separated floats into a DataFrame.

    Every non-empty line becomes one row; empty tokens produced by repeated
    spaces are dropped.
    """
    with open(path) as handle:
        raw_lines = handle.read().split("\n")

    rows = []
    for raw in raw_lines:
        if raw == "":
            continue
        rows.append(
            [float(tok.strip()) for tok in raw.split(" ") if tok != ""]
        )
    return pd.DataFrame(rows)


fname = r"\\wsl$\Ubuntu\home\alepitte\ale\uni\variational-autoencoders\mc-sampling\dump\test_30_10_1_0.75_0.2_10000_2500_5_x.txt"
# fname = '/Users/lorenzobarbiero/Documents/GitHub/variational-autoencoders/mc-sampling/good-runs/test_30_10_1_0.75_0.2_10000_2500_5_x.txt'
fname1 = r"\\wsl$\Ubuntu\home\alepitte\ale\uni\variational-autoencoders\mc-sampling\dump\test_30_10_0.1_0.75_0.2_10000_2500_0_x.txt"

df0 = _read_table(fname)
df1 = _read_table(fname1)

# Stack both runs; rows keep the order "all of df0, then all of df1".
data = pd.concat([df0, df1])

# Label 0 marks rows read from `fname`, label 1 rows read from `fname1`.
labels = np.array([0] * len(df0) + [1] * len(df1))

In [None]:
# Display cell: show the concatenated table as a NumPy array.
np.array(data)


In [None]:
# Group every row into (numpart, dim) particle coordinates and divide all
# coordinates by box_size * sqrt(dim).
norm_factor = box_size * np.sqrt(dim)
vcs = np.array(data).reshape((-1, numpart, dim)) / norm_factor
print(vcs.shape, vcs[1])

Sort by distance from origin

In [None]:
# sortmode == 1: within every configuration, order the particles by
# increasing Euclidean distance from the origin.
# Any other value keeps the particle order found in the input, so that
# `sorted_vcs` is always defined for the cells below.
sortmode = 1
if sortmode == 1:
    # Distance of every particle from the origin, valid for any `dim`.
    distances = np.sqrt((vcs**2).sum(axis=-1))
    idx = np.argsort(distances, axis=1)
    # Apply each configuration's own ordering along the particle axis;
    # the extra axis lets the index broadcast over the coordinates.
    sorted_vcs = np.take_along_axis(vcs, idx[:, :, np.newaxis], axis=1)
else:
    sorted_vcs = vcs.copy()

print(sorted_vcs.shape, "\n", sorted_vcs[1])

### Compute distance matrices

In [None]:
# One square (numpart, numpart) matrix of pairwise Euclidean distances per
# configuration, computed on the sorted particle coordinates.
dm = np.zeros((len(vcs), numpart, numpart))

for k in range(dm.shape[0]):
    condensed = pdist(sorted_vcs[k], metric="euclidean")
    dm[k] = squareform(condensed, force="no", checks=True)

print(dm.shape, "\n", dm[1])

Split into training and test sets

In [None]:
# Display cell: show the label array (0 = rows of df0, 1 = rows of df1).
labels


In [None]:
train_perc = 0.8
# Minimum number of samples of each label required in the training set.
min_per_class = 1000

m = sorted_vcs.shape[0]
print(m)

# m_training is the number of samples in the training set,
# m_test the number of samples in the test set.
m_training = int(m * train_perc)
m_test = m - m_training

# Fail fast instead of reshuffling forever when no permutation can put
# `min_per_class` samples of each label into the training set.
n0_total = np.count_nonzero(labels == 0)
n1_total = np.count_nonzero(labels == 1)
if min(n0_total, n1_total) < min_per_class or m_training < 2 * min_per_class:
    raise ValueError(
        "cannot place at least "
        + str(min_per_class)
        + " samples of each label in a training set of "
        + str(m_training)
        + " samples (label 0: "
        + str(n0_total)
        + ", label 1: "
        + str(n1_total)
        + ")"
    )

# Shuffle configurations, labels and distance matrices with the same random
# permutation, and reshuffle until both labels are sufficiently represented
# in the first m_training samples.
while True:
    permutation = np.random.permutation(m)  # random permutation

    sorted_vcs = sorted_vcs[permutation]
    labels = labels[permutation]
    dm = dm[permutation]

    c1 = np.count_nonzero(labels[:m_training] == 1)
    c0 = np.count_nonzero(labels[:m_training] == 0)
    if c1 >= min_per_class and c0 >= min_per_class:
        break

# Counts are printed after the loop so they describe the split actually used.
print(
    "number of 1 in training set:",
    c1,
    "\n",
    "number of 0 in training set:",
    c0,
)

trainset_conf = sorted_vcs[:m_training]
testset_conf = sorted_vcs[m_training:]

trainset_mat = dm[:m_training]
testset_mat = dm[m_training:]

print("Shape of training set: " + str(trainset_conf.shape))
print("Shape of test set: " + str(testset_mat.shape))

## Variational Auto Encoder (Model 1)

### Sampling class

In [None]:
class Sampling(layers.Layer):
    """Sample a latent vector z from (z_mean, z_log_var).

    Uses the reparameterisation
    ``z = z_mean + exp(0.5 * z_log_var) * epsilon`` with ``epsilon`` drawn
    from a standard normal distribution, so the sample stays differentiable
    with respect to ``z_mean`` and ``z_log_var``.
    """

    def call(self, inputs):
        """Return one sample of z for every row of the inputs.

        Args:
            inputs: pair ``(z_mean, z_log_var)`` of tensors with the same
                shape.

        Returns:
            Tensor with the shape of ``z_mean`` holding the sampled z.
        """
        z_mean, z_log_var = inputs
        # Draw the noise with the dynamic shape of z_mean; this avoids a
        # local named `dim` that would shadow the notebook-level constant.
        epsilon = tf.random.normal(
            shape=tf.shape(z_mean), dtype=z_mean.dtype
        )
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


### Encoder

In [None]:
# Encoder: two 3x3 "same"-padded ReLU convolutions (32, then 64 filters) on
# the (numpart, numpart, 1) input, flattened and fed to two dense heads.
# (A 128-filter convolution between the two layers was tried and disabled.)
encoder_inputs = keras.Input(shape=(numpart, numpart, 1))
x = encoder_inputs
for n_filters in (32, 64):
    x = Conv2D(n_filters, 3, padding="same", activation="relu")(x)

# Static shape of the last convolution output; the decoder uses it to size
# its first dense layer and to undo the flatten.
conv_shape = K.int_shape(x)
x = Flatten()(x)

z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])

encoder = keras.Model(
    inputs=encoder_inputs, outputs=[z_mean, z_log_var, z], name="encoder"
)
encoder.summary()

### Decoder

In [None]:
# Decoder: dense layer back to the encoder's last feature-map shape,
# two 3x3 transposed convolutions (64, then 32 filters), and a final
# single-channel transposed convolution.
# (A 128-filter transposed convolution between the two was tried and
# disabled.)
decoder_input = Input(shape=(latent_dim,), name="decoder_input")

feature_shape = (conv_shape[1], conv_shape[2], conv_shape[3])
x = Dense(
    feature_shape[0] * feature_shape[1] * feature_shape[2],
    activation="relu",
)(decoder_input)
x = Reshape(feature_shape)(x)

for n_filters in (64, 32):
    x = Conv2DTranspose(n_filters, 3, padding="same", activation="relu")(x)

# The sigmoid keeps every output value in (0, 1).
decoder_outputs = Conv2DTranspose(
    1, 3, padding="same", activation="sigmoid", name="decoder_output"
)(x)

decoder = keras.Model(
    inputs=decoder_input, outputs=decoder_outputs, name="decoder"
)
decoder.summary()

### VAE Class

In [None]:
class VAE(keras.Model):
    """Variational autoencoder built from an encoder and a decoder model.

    Training minimises ``reconstruction_loss + reg_lambda * kl_loss``.
    ``reg_lambda`` is NOT an attribute of this class: it is read from the
    enclosing (notebook-level) scope when the training step is built, so it
    must be defined before ``fit`` is called.

    Args:
        encoder: model returning ``(z_mean, z_log_var, z)`` for a batch.
        decoder: model mapping a batch of ``z`` to reconstructions.
        **kwargs: forwarded to ``keras.Model``.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported by Keras during training.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers listed here are reset by Keras at the start of each epoch."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        """Run one optimisation step on a batch and return the running losses.

        Args:
            data: batch of inputs; a channel axis is appended for the loss
                when the batch has one axis fewer than the decoder output.

        Returns:
            Dict with the running means of ``loss``, ``reconstruction_loss``
            and ``kl_loss``.
        """
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)

            # Draw the noise with a TensorFlow op: a NumPy draw would be
            # evaluated once when the step is traced into a graph and then
            # reused as a constant for every sample and every step.
            noise = tf.random.normal(
                tf.shape(reconstruction),
                mean=0.0,
                stddev=0.1,
                dtype=reconstruction.dtype,
            )
            reconstruction = reconstruction + noise

            # Append the channel axis so that data matches the decoder
            # output; skipped when the batch already carries it.
            if len(data.shape) == len(reconstruction.shape) - 1:
                data = tf.expand_dims(data, axis=-1)

            reconstruction_loss = tf.reduce_mean(
                keras.losses.mean_squared_error(data, reconstruction)
            )
            # KL divergence term per latent coordinate, averaged over the
            # batch and over the latent coordinates.
            kl_loss = -0.5 * (
                1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
            )
            kl_loss = tf.reduce_mean(kl_loss)
            # reg_lambda is looked up in the enclosing scope (see class doc).
            total_loss = reconstruction_loss + reg_lambda * kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

### Train VAE

In [None]:
# Weight of the KL term in the VAE loss; VAE.train_step reads this name
# from the notebook scope, so it must be set before fitting.
reg_lambda = 0.001

vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001))

# Unsupervised training on the distance matrices of the training split.
fit = vae.fit(x=trainset_mat, batch_size=128, epochs=10, verbose=2)

In [None]:
plt.rcParams["font.size"] = 12
fig, AX = plt.subplots(1, 2, figsize=(14, 6.0))

# One panel per loss term. The left panel shows the reconstruction (MSE)
# term itself; history["loss"] is the total loss, i.e. MSE plus the weighted
# KL term, and would not match the "MSE loss" label.
panels = (
    ("reconstruction_loss", "MSE loss", "b"),
    ("kl_loss", "KL loss", "r"),
)
for ax, (history_key, title, colour) in zip(AX, panels):
    ax.plot(fit.history[history_key], label=title, c=colour)
    ax.set_xlabel("epoch")
    ax.set_ylabel(title)
    ax.legend()

## Evaluate performance
We'll now use the test set to explore the latent space distribution of data and the reconstruction accuracy

In [None]:
# The encoder returns [z_mean, z_log_var, z]; np.array stacks the three
# outputs along a new leading axis. The test split is encoded first.
encoded_test, encoded_train = (
    np.array(vae.encoder.predict(split))
    for split in (testset_mat, trainset_mat)
)


In [None]:
# Leading axis indexes the three encoder outputs (z_mean, z_log_var, z).
print(encoded_test.shape)


Along the first axis, index 0 holds z_mean, index 1 holds z_log_var (both used in training), and index 2 holds the sampled z, which is what we're interested in

In [None]:
# Index 2 along the leading axis selects the sampled z of the test split.
dim1 = encoded_test[2]
print(dim1.shape)

Sampling in the latent space is reasonably Gaussian, as expected

In [None]:
# Disabled: the two column names only fit a 2-dimensional latent space,
# while dim1 has latent_dim columns.
# df = pd.DataFrame(dim1, columns=["x", "y"])
# sns.jointplot(x="x", y="y", data=df);

We can now use the data to decode

In [None]:
# Decode the sampled z (index 2) of each split and drop the channel axis so
# that every sample is a (numpart, numpart) matrix again.
decoded_test = np.array(decoder.predict(encoded_test[2])).reshape(
    (-1, numpart, numpart)
)
decoded_train = np.array(decoder.predict(encoded_train[2])).reshape(
    (-1, numpart, numpart)
)
print(decoded_test.shape)

### Check reconstruction

In [None]:
# Heatmap of the decoded distance matrix of one test sample.
ind = 20
df = pd.DataFrame(decoded_test[ind])
sns.heatmap(df)

In [None]:
# Heatmap of the original distance matrix of the same test sample (`ind`).
df2 = pd.DataFrame(testset_mat[ind])
sns.heatmap(df2)

## Coordinates Reconstructor (Model 2)
The reconstructor is trained to map the original distance matrices to the corresponding (sorted) particle coordinates; it is then applied to both the original and the VAE-decoded matrices

In [None]:
# Fully connected network from a (numpart, numpart) matrix to (numpart, dim)
# coordinates: four ReLU layers shrinking from 4/5 to 1/5 of numpart**2
# units, then a linear layer with numpart * dim outputs.
rec_inputs = layers.Input(shape=(numpart, numpart))
x = Flatten()(rec_inputs)
for fifths in (4, 3, 2, 1):
    x = layers.Dense(int(numpart**2 * fifths / 5), activation="relu")(x)
x = layers.Dense(int(numpart * dim))(x)
rec_outputs = Reshape((numpart, dim))(x)

reconstruction = keras.Model(inputs=rec_inputs, outputs=rec_outputs)
reconstruction.summary()

In [None]:
# Adam with a learning rate ten times lower than the one used for the VAE.
reconstruction.compile(
    loss=tf.keras.losses.MeanSquaredError(),
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
)

# Supervised fit: distance matrices in, sorted coordinates out.
# NOTE: this rebinds `fit`, replacing the VAE training history.
fit = reconstruction.fit(
    x=trainset_mat,
    y=trainset_conf,
    batch_size=128,
    epochs=30,
    verbose=2,
)

In [None]:
# Coordinates predicted from the original test matrices (rec_test) and from
# the VAE-decoded test matrices (rec_test_dec), in that order.
rec_test, rec_test_dec = (
    np.array(reconstruction.predict(matrices))
    for matrices in (testset_mat, decoded_test)
)

### Evaluate performance
Original data is in blue, reconstructed original configurations in gold

In [None]:
ind = 20
fig = plt.figure(figsize=(5, 5))
ax = fig.add_subplot()
# Multiplying by sqrt(2) cancels the sqrt(dim) factor of the normalisation
# (dim = 2), leaving coordinates in units of box_size.
l = np.sqrt(2)
# Reconstruction first (gold), original configuration on top (blue).
for points, colour in ((rec_test[ind], "gold"), (testset_conf[ind], "#023e8a")):
    ax.scatter(points[:, 0] * l, points[:, 1] * l, s=30, c=colour)
ax.set_xlim(-0.15, 1.15)
ax.set_ylim(-0.15, 1.15)

Original data is in blue, configurations reconstructed from the VAE-decoded matrices in red

In [None]:
ind = 20
fig = plt.figure(figsize=(5, 5))
ax = fig.add_subplot()
# Multiplying by sqrt(2) cancels the sqrt(dim) factor of the normalisation
# (dim = 2), leaving coordinates in units of box_size.
l = np.sqrt(2)
# Reconstruction from the decoded matrix first (red), original on top (blue).
for points, colour in (
    (rec_test_dec[ind], "#e63946"),
    (testset_conf[ind], "#023e8a"),
):
    ax.scatter(points[:, 0] * l, points[:, 1] * l, s=30, c=colour)
ax.set_xlim(-0.15, 1.15)
ax.set_ylim(-0.15, 1.15)

In [None]:
from matplotlib.animation import FuncAnimation

# Create the figure and axis objects
fig, ax = plt.subplots()

# At most 100 frames, and never more than there are test samples, so that
# `update` cannot index past the end of the arrays.
nframes = min(100, len(rec_test))


# Define the animation function
def update(ind):
    """Redraw the axes for test sample `ind`.

    Reconstructed coordinates are drawn in gold and the original
    configuration in blue, the same colours as in the static scatter plot.
    The scale factor `l` is read from the notebook scope.
    """
    ax.clear()
    # Draw on `ax` explicitly rather than on whatever pyplot considers the
    # current axes.
    ax.scatter(
        rec_test[ind, :, 0] * l, rec_test[ind, :, 1] * l, s=20, c="gold"
    )
    ax.scatter(
        testset_conf[ind, :, 0] * l,
        testset_conf[ind, :, 1] * l,
        s=20,
        c="#023e8a",
    )
    ax.set_title(f"Scatter plot ({ind}/{nframes})")
    ax.set_xlim(-0.1, 1.1)
    ax.set_ylim(-0.1, 1.1)


# Create the animation
animation = FuncAnimation(fig, update, frames=nframes, interval=400)

# Save the animation as a GIF
animation.save("conv2dist.gif", writer="imagemagick")

## Class labels in the latent space

In [None]:
import sklearn
from sklearn.decomposition import PCA


def label_vis(vae, data, labels):
    """Scatter the first two principal components of z_mean, coloured by label.

    Args:
        vae: model exposing an ``encoder`` whose ``predict`` returns
            ``(z_mean, z_log_var, z)``.
        data: inputs passed to ``vae.encoder.predict``.
        labels: one value per sample, used as the point colour.

    Prints the explained variance ratio of the two components and shows the
    figure; returns nothing.
    """
    # prediction: only the mean of the latent distribution is visualised
    z_mean, _, _ = vae.encoder.predict(data)

    pca = PCA(n_components=2)
    transformed_data = pca.fit_transform(z_mean)
    variance_ratio = pca.explained_variance_ratio_
    print(variance_ratio)

    # plot: the axes are principal components, not raw latent coordinates
    plt.figure(figsize=(5, 5))
    plt.scatter(transformed_data[:, 0], transformed_data[:, 1], c=labels)
    plt.colorbar()
    plt.xlabel("PC 1")
    plt.ylabel("PC 2")
    plt.show()

In [None]:
# Uses all samples (training and test); dm and labels were shuffled with the
# same permutation, so they are still aligned.
label_vis(vae, dm, labels)
