# Variational AutoEncoder

**Author:** [fchollet](https://twitter.com/fchollet)<br>
**Date created:** 2020/05/03<br>
**Last modified:** 2020/05/03<br>
**Description:** Convolutional Variational AutoEncoder (VAE) trained on MNIST digits.<br>
**Edited for IR:** yang shi han <br>

## Setup

In [20]:
import numpy as np
import tensorflow as tf
from tensorflow import keras as kr
from tensorflow import keras as keras
import os , scipy.io
#minors
import math
import time
import scipy
import random
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D ,Conv1D
from tensorflow.keras import regularizers
from tensorflow.keras import layers
from keras.callbacks import Callback

# plot related
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.figure import Figure

# get dataset from drive
from google.colab import drive
drive.mount('/gdrive')

# linux 操作-轉入雲端硬碟根目錄
%cd /gdrive/My Drive/
%cd ML_Data/
%cd MM/INCHI/

# load data
X_data = np.genfromtxt('label.dat', delimiter=',', skip_header = 1,)
name_list = np.recfromcsv('index.dat')

X_data = 1-X_data
X_data = X_data.reshape(len(X_data),len(X_data[1]),1)
IR_resolution = len(X_data[0])


Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
/gdrive/My Drive
/gdrive/My Drive/ML_Data
/gdrive/My Drive/ML_Data/MM/INCHI


  output = genfromtxt(fname, **kwargs)


## Create a sampling layer

In [4]:

class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


## Build the en/decoder

In [5]:
# pre-set value
latent_dim = 8
kl_scale_factor = 10
VAE_epoch_step = -1
#encoder

# model construction

In [None]:
# Encoder
encoder_inputs = keras.Input(shape=(IR_resolution, 1))
x = keras.layers.Conv1D(filters=2, kernel_size=2, strides=2, padding="same", activation=keras.layers.LeakyReLU(alpha=0.2),)(encoder_inputs)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Conv1D(filters=4, kernel_size=2, strides=2, padding="same", activation=keras.layers.LeakyReLU(alpha=0.2),)(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Conv1D(filters=8, kernel_size=2, strides=2, padding="same", activation=keras.layers.LeakyReLU(alpha=0.2),kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-4))(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Conv1D(filters=16, kernel_size=2, strides=2, padding="same", activation=keras.layers.LeakyReLU(alpha=0.2),kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-4))(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Conv1D(filters=32, kernel_size=2, strides=2, padding="same", activation=keras.layers.LeakyReLU(alpha=0.2),kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-4))(x)
x = keras.layers.BatchNormalization()(x)
x = layers.Flatten()(x)

z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

# Decoder
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(latent_dim, activation="relu")(latent_inputs)
x = layers.Dense(8 * latent_dim, activation="relu")(x)
x = layers.Dense(8 * latent_dim, activation="relu")(x)
x = layers.Reshape((8, latent_dim))(x)
x = layers.Conv1DTranspose(16, 2, activation=keras.layers.LeakyReLU(alpha=0.2), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = layers.Conv1DTranspose(8, 2, activation=keras.layers.LeakyReLU(alpha=0.2), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = layers.Conv1DTranspose(4, 2, activation=keras.layers.LeakyReLU(alpha=0.2), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = layers.Conv1DTranspose(2, 2, activation=keras.layers.LeakyReLU(alpha=0.2), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = layers.Conv1DTranspose(1, 1, activation=keras.layers.LeakyReLU(alpha=0.2), padding="same")(x)
x = layers.Flatten()(x)

x = layers.Dense(IR_resolution, activation=keras.layers.LeakyReLU(alpha=0.2))(x)
decoder_outputs = layers.Reshape((IR_resolution, 1))(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()


## Define the VAE as a `Model` with a custom `train_step`

In [7]:
def clamp(n, smallest, largest):
    return max(smallest, min(n, largest))

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
      global kl_scale_factor,  VAE_epoch_step
      with tf.GradientTape() as tape:
          z_mean, z_log_var, z = self.encoder(data)
          reconstruction = self.decoder(z)
          #print(reconstruction)
          reconstruction_loss = tf.reduce_mean(
              tf.reduce_sum(
                  keras.losses.binary_crossentropy(data, reconstruction), axis=(1)
              )
          )
          kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)) # *  clamp((1-1/(VAE_epoch_step *kl_scale_factor)),0,1)
          kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
          total_loss = reconstruction_loss + kl_loss
      grads = tape.gradient(total_loss, self.trainable_weights)
      self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
      self.total_loss_tracker.update_state(total_loss)
      self.reconstruction_loss_tracker.update_state(reconstruction_loss)
      self.kl_loss_tracker.update_state(kl_loss)
      return {
          "loss": self.total_loss_tracker.result(),
          "reconstruction_loss": self.reconstruction_loss_tracker.result(),
          "kl_loss": self.kl_loss_tracker.result(),
      }

class VAE_Callback(Callback):
  def __init__(self):
    self.epoch_step = 1
    pass
  def on_epoch_begin(self, epoch, logs=None):
    self.epoch_step = 1
    global VAE_epoch_step
    VAE_epoch_step = self.epoch_step

  def on_epoch_end(self, epoch, logs=None):
    global VAE_epoch_step
    self.epoch_step += 1
    VAE_epoch_step = self.epoch_step
    #print(VAE_epoch_step)

    #progress = self.epoch_step / self.params["steps"]


## Train the VAE

In [None]:
early_stopping = keras.callbacks.EarlyStopping(monitor='reconstruction_loss',patience=150, min_delta=0.0005, restore_best_weights=True)

vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam(), loss='mean_absolute_error')
history = vae.fit(X_data, epochs=2500, batch_size=32,callbacks=[VAE_Callback(),early_stopping])

In [None]:
# export with CSV format
train_dat = history.history
loss_dat = train_dat['loss']
val_dat = train_dat['reconstruction_loss']
val_dat2 = train_dat['kl_loss']
offset = 0

for i in range(len(loss_dat)):
  print(str(i+1+offset)+','+str(loss_dat[i])+','+str(val_dat[i])+','+str(val_dat2[i]))


# plot by mean/std

In [None]:
# preset
low_end_wavenumber = 400
high_end_wavenumber= 4000
scale_factor = 2
means_list = [-0.008141835,-0.00361425,-0.012794748,-0.005634071,-0.010974623,0.0000211559,-0.015696594,-0.022589697]
std_list = [0.026210615,0.973954139,0.986732415,1.001472623,0.956764939,0.929925118,0.991434809,0.942747022]

n = scale_factor*2+1
plt.rcParams["figure.figsize"] = (15,2.2*n)
for i in range(n):
  fig =plt.subplot(n,1,i+1)
  code = np.zeros((1,len(std_list)))
  for k in range(len(std_list)):
    code[0][k] = means_list[k] + std_list[k] * (-1 + ((i)/((n-1)/2))) * 3
  data = vae.decoder.predict(code)
  data = 1-data
  #print(data.shape)
  data= data.reshape(IR_resolution,)
  x_axis = list()
  for j in range(len(data)):
    x_axis.append(low_end_wavenumber + (high_end_wavenumber-low_end_wavenumber)/len(data)*j)
  fig.plot(x_axis, data,  c='b', label='DB' , linewidth=1.0)

  if i <n-1:
  # stop showing x axis tick (x axis numbers)
    plt.tick_params(
    axis='x',
    which='both',
    bottom=False,
    top=False,
    labelbottom=False)

  fig.invert_xaxis()
  fig.grid(True)
  fig.set_title(""+str([round(k,ndigits=4) for k in code[0]]))
  fig.set_ylim([0, 1.2])

In [None]:
def reconstruction_test(vae,test_array):
  n = len(test_array)
  plt.rcParams["figure.figsize"] = (15,2*n)
  for i in range(n):
    fig =plt.subplot(n,1,i+1)
    data = vae.decoder.predict( vae.encoder.predict( test_array[i].reshape(1,IR_resolution,1))[2] )
    data = 1-data
    data= data.reshape(IR_resolution,)
    fig.plot(range(len(data)), data,  c='r', label='DB' , linewidth=1.0)
    fig.plot(range(len(data)), 1 - test_array[i].reshape(IR_resolution,),  c='g', label='DB' , linewidth=1.0)
    fig.invert_xaxis()
    fig.grid(True)
    #fig.set_title("                           ")
    fig.set_ylim([0, 1.2])
reconstruction_test(vae,[X_data[k] for k in np.random.choice(5699, 4)])

## Evulation with spearsman

In [16]:
from scipy.stats import spearmanr
# final evaluation
#VAE.summary()
def spearsman_eval(vae):
  spearsman_list_NN = []
  samples = 100

  for i in range(samples):#range(len(X_data)):
    #VAE.decoder.predict( VAE.encoder.predict( X_data[i].reshape(1,IR_resolution,1)[2]) )
    ans = vae.decoder.predict( vae.encoder.predict( X_data[i].reshape(1,IR_resolution,1) , verbose=0 )[2] , verbose=0  )
    ans = ans.reshape(len(X_data[1]))
    correlation, p = spearmanr(ans,X_data[i])
    spearsman_list_NN.append(correlation)

  print('Neuron: ',end='')
  print(sum(spearsman_list_NN)/samples)

spearsman_eval(vae)
#print(format(end-start))

Neuron: 0.6689953770421287


# Save Model


In [17]:
vae.encoder.save('VAE_encoder8.h5')
vae.decoder.save('VAE_decoder8.h5')

# Load Model

In [22]:
enc = tf.keras.models.load_model('VAE_encoder8.h5',custom_objects={"Sampling":Sampling})
dec = tf.keras.models.load_model('VAE_decoder8.h5',custom_objects={"Sampling":Sampling})
vae = VAE(enc,dec)
vae.compile(optimizer=keras.optimizers.Adam())

