In [7]:
#@title Libraries
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, metrics, Model, losses
import pandas as pd

# ***Libraries*** 👆
---
# ***Methods and Classes*** 👇


In [2]:
#@title Samling
class Sampling(layers.Layer):
  '''
  Sampling Layer: Sample z from the Probability Distribution of z_mean and z_log_var
  '''
  def call(self, z_mean, z_log_var):
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = tf.random.normal(shape=(batch, dim))
    z = z_mean + tf.exp(0.5 * z_log_var) * epsilon
    return z

In [6]:
#@title VAE Model
class VAE(keras.Model):

  def __init__(self, input_dim:int, hidden_dim:int, latent_dim:int, **kwargs):
    '''
    Define the model structure and it's properties
    '''
    super().__init__(**kwargs)
    self.input_dim = input_dim
    self.hidden_dim = hidden_dim
    self.latent_dim = latent_dim
    self.encoder = self.Encoder()
    self.decoder = self.Decoder()
    self.total_loss = metrics.Mean(name="total_loss")
    self.reconstruction_loss = metrics.Mean(name="reconstruction_loss")
    self.kl_loss = metrics.Mean(name="kl_loss")


  @property
  def metrics(self):
    '''
    Loss metrics
    '''
    return [self.total_loss, self.reconstruction_loss, self.kl_loss,]


  def Encoder(self)->Model:
    '''
    Encoder model to transform the input to the latent space (compress)
    '''
    encoder_inputs = keras.Input(shape=(self.input_dim,))
    x = layers.Dense(self.hidden_dim, activation="relu")(encoder_inputs)
    z_mean = layers.Dense(self.latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(self.latent_dim, name="z_log_var")(x)
    z = Sampling()(z_mean, z_log_var)
    encoder = Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
    encoder.summary()
    keras.utils.plot_model(encoder, show_shapes=True, to_file='Encoder.png')
    return encoder


  def Decoder(self)->Model:
    '''
    Decoder model to reconstruct the input from the latent vector  (decompress)
    '''
    latent_inputs = keras.Input(shape=(self.latent_dim,))
    x = layers.Dense(self.hidden_dim, activation="relu")(latent_inputs)
    decoder_outputs = layers.Dense(self.input_dim, activation="relu")(x)
    decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
    decoder.summary()
    keras.utils.plot_model(decoder, show_shapes=True, to_file='Decoder.png')
    return decoder


  def Loss(self, input: tf.Tensor, output: tf.Tensor, z_mean: tf.Tensor, z_log_var: tf.Tensor)->list:
    '''
    The Loss fuction to calculate the loss of VEA (Reconstruction_loss+KL_loss)
    '''
    Reconstruction_loss = tf.reduce_mean((input-output)**2)
    KL_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
    KL_loss = tf.reduce_mean(tf.reduce_sum(KL_loss))
    return [Reconstruction_loss + KL_loss, Reconstruction_loss, KL_loss]


  def train_step(self, input: tf.Tensor)->dict:
    '''
    Calculate the output of the model and the errors.
    Update the model's weights by Backpropagating the error
    '''
    with tf.GradientTape() as tape:
      z_mean, z_log_var, z = self.encoder(input)
      output = self.decoder(z)
      Total_loss, Reconstruction_loss, KL_loss = self.Loss(input, output, z_mean, z_log_var)
      grads = tape.gradient(Total_loss, self.trainable_weights)
      self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
      self.total_loss.update_state(Total_loss)
      self.reconstruction_loss.update_state(Reconstruction_loss)
      self.kl_loss.update_state(KL_loss)
      return { "Loss": self.total_loss.result(), "Reconstruction_loss": self.reconstruction_loss.result(), "KL_loss": self.kl_loss.result(),}

# ***Methods and Classes*** 👆
---
# ***Main*** 👇

In [4]:
# Load the dataset and train the model

data = pd.read_pickle("/content/train_embeddings_1.p")
input_dim = data.shape[1]
hidden_dim = 128
latent_dim = 50

vae = VAE(input_dim, hidden_dim, latent_dim)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(data, epochs=10, batch_size=32)

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 384)]        0           []                               
                                                                                                  
 dense (Dense)                  (None, 128)          49280       ['input_1[0][0]']                
                                                                                                  
 z_mean (Dense)                 (None, 50)           6450        ['dense[0][0]']                  
                                                                                                  
 z_log_var (Dense)              (None, 50)           6450        ['dense[0][0]']                  
                                                                                            

<keras.callbacks.History at 0x7fc9d31c1d80>

In [5]:
# Use the Trained encoder to get the compress vector

trained_encoder = vae.encoder
trained_encoder.trainable = False  # freeze the weights
z_mean, z_log_var, z = trained_encoder.predict(data, verbose=0)