In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Import and modify dataset

In [None]:
ds = pd.read_csv('../Dataset/data_abs.csv', header=None)
ds = ds[ds[4] == 1] # Select data for water media
ds = ds[ds[1] != 0] # Select only core-shell particles
ds = ds.sample(frac=1).reset_index(drop=True) # Randomize dataset

In [None]:
num_mats = 11 # Total number of materials used (including "empty")
spectra = np.array(ds.iloc[:, 5:])/255 # Normalized spectra

m1 = np.array(ds[[0]]) # Core material ids
m2 = np.array(ds[[1]]) # Shell material ids
t1 = np.array(ds[[2]])/100 # Normalized core thickness
t2 = np.array(ds[[3]])/100 # Normalized shell thickness

geo = np.hstack((m1, m2, t1, t2)) # Geometry parameters

test_split = int(0.95*ds.shape[0]) # Number of samples separated for testing after training

# Training output
y_train = spectra[:test_split]
y_train = tf.expand_dims(y_train, axis=-1)

# Training features
x_train_m1 = keras.utils.to_categorical(m1[:test_split])
x_train_m1 = tf.expand_dims(x_train_m1, axis=1)
x_train_m2 = keras.utils.to_categorical(m2[:test_split])
x_train_m2 = tf.expand_dims(x_train_m2, axis=1)
x_train_t1 = t1[:test_split]
x_train_t1 = tf.expand_dims(x_train_t1, axis=-1)
x_train_t2 = t2[:test_split]
x_train_t2 = tf.expand_dims(x_train_t2, axis=-1)
x_train = geo[:test_split]
x_train = tf.expand_dims(x_train, axis=-1)

# Testing output
y_test = spectra[test_split:]
y_test = tf.expand_dims(y_test, axis=-1)

# Testing features
x_test_m1 = keras.utils.to_categorical(m1[test_split:])
x_test_m1 = tf.expand_dims(x_test_m1, axis=1)
x_test_m2 = keras.utils.to_categorical(m2[test_split:])
x_test_m2 = tf.expand_dims(x_test_m2, axis=1)
x_test_t1 = t1[test_split:]
x_test_t1 = tf.expand_dims(x_test_t1, axis=-1)
x_test_t2 = t2[test_split:]
x_test_t2 = tf.expand_dims(x_test_t2, axis=-1)
x_test = geo[test_split:]
x_test = tf.expand_dims(x_test, axis=-1)

#  cVAE resnet model

In [None]:
def residual_block(x_in, N_filter, kernel_size=3, strides=1,
                   conv_layer=keras.layers.Conv1D, alpha=0.3,
                   with_BN=False):

    # Residual connection
    if x_in.shape[-1] != N_filter or strides != 1:
        # If input != output dimension, add BN/ReLU/conv. into shortcut
        conv_shortcut = conv_layer(
            filters=N_filter, kernel_size=1, strides=strides, padding='same')(x_in)
    else:
        # Else use bare input as shortcut
        conv_shortcut = x_in

    # Convolutional path
    x = x_in

    x = conv_layer(filters=N_filter, kernel_size=1, strides=1,
                   padding='same', use_bias=not with_BN)(x)
    if with_BN:
        x = keras.layers.BatchNormalization()(x)
    x = keras.layers.LeakyReLU(alpha)(x)

    x = conv_layer(filters=N_filter, kernel_size=kernel_size,
                   strides=strides, padding='same', use_bias=not with_BN)(x)
    if with_BN:
        x = keras.layers.BatchNormalization()(x)
    x = keras.layers.LeakyReLU(alpha)(x)

    x = conv_layer(filters=N_filter, kernel_size=1, strides=1,
                   padding='same', use_bias=not with_BN)(x)
    if with_BN:
        x = keras.layers.BatchNormalization()(x)

    # Add residual and main and apply a further activation
    x = keras.layers.Add()([x, conv_shortcut])
    x = keras.layers.LeakyReLU(alpha)(x)

    return x


In [None]:
def resblock_sequence_down(x_in, N_filter, N_blocks):
    x = x_in
    for i in range(N_blocks):
        x = residual_block(x, N_filter, kernel_size=3, strides=1, with_BN=True)
    # Use strides=2 for downsampling (more flexible, since trainable).
    x = residual_block(x, N_filter, kernel_size=3, strides=2)
    return x


def resblock_sequence_up(x_in, N_filter, N_blocks):
    x = x_in
    for i in range(N_blocks):
        x = residual_block(x, N_filter, kernel_size=3, strides=1, with_BN=True)
    # Use upsampling (more robust than transpose convolutions with stride 2)
    x = keras.layers.UpSampling1D()(x)
    return x


def resblock_sequence_id(x_in, N_filter, N_blocks):
    # Identity block sequence: input shape = output shape
    x = x_in
    for i in range(N_blocks):
        x = residual_block(x, N_filter, kernel_size=3, strides=1, with_BN=True)
    return x

#  RNG sampling layer for regularization

The latent space of a VAE is regularized via random sampling from a normal distribution during training and ragularization of these distributions using a KL loss. The KL loss will be introduced below.
Here we define a sampling layer that generates randomly perturbed latent vectors:

In [None]:
class SamplingRNG(keras.layers.Layer):
    # Layer to sample normally distributed random numbers z with mean `z_mean` and logarithmic variance `z_log_var`
    def call(self, inputs):
        z_mean, z_var_log = inputs
        epsilon = keras.backend.random_normal(shape=tf.shape(z_mean)[1:])
        z = z_mean + keras.backend.exp(0.5 * z_var_log) * epsilon
        return z

# Define the encoder

The cVAE consists of first an encoder. This compresses the condition (target spectrum) and the geometry (design) into a latent space.

In [None]:
latent_dim = 2 # for interpretability
N_blocks = 3 # Number of resblocks between upsamplings

# Input layers
condition_target_spec_input = keras.layers.Input(
    shape=y_train.shape[1:], name='condition_in')
design_in_m1 = keras.Input(shape=x_train_m1.shape[1:], name='design_in_m1')
design_in_m2 = keras.Input(shape=x_train_m2.shape[1:], name='design_in_m2')
design_in_t1 = keras.Input(shape=x_train_t1.shape[1:], name='design_in_t1')
design_in_t2 = keras.Input(shape=x_train_t2.shape[1:], name='design_in_t2')

# Spectrum path
x = condition_target_spec_input
x = resblock_sequence_down(x, N_filter=16, N_blocks=N_blocks) # 64 --> 32
x = resblock_sequence_down(x, N_filter=32, N_blocks=N_blocks) # 32 --> 16
x = resblock_sequence_down(x, N_filter=64, N_blocks=N_blocks) # 16 --> 8
x = resblock_sequence_down(x, N_filter=128, N_blocks=N_blocks) # 8 --> 4

x_spec = keras.layers.Flatten()(x)

# Geometry path
x_m1 = design_in_m1
x_m2 = design_in_m2
x_t1 = design_in_t1
x_t2 = design_in_t2

x_1 = keras.layers.Concatenate(axis=-1)([x_m1, x_t1])
x_2 = keras.layers.Concatenate(axis=-1)([x_m2, x_t2])
x = keras.layers.Concatenate(axis=1)([x_1, x_2])

x = resblock_sequence_id(x, N_filter=64, N_blocks=N_blocks) 

x_design = keras.layers.Flatten()(x)

# Merge both paths
x = keras.layers.Concatenate()([x_spec, x_design])
x = keras.layers.Dense(256)(x)
x = keras.layers.LeakyReLU(alpha=1e-1)(x)

# Output: latent sampling
z_mean = keras.layers.Dense(latent_dim)(x)
z_var_log = keras.layers.Dense(latent_dim)(x)
z = SamplingRNG()([z_mean, z_var_log])

encoder = keras.Model(
    [design_in_m1, design_in_m2, design_in_t1, design_in_t2, condition_target_spec_input],
    [z_mean, z_var_log, z],
    name="encoder")

# Define the decoder

The second stage of the cVAE, and the actual inverse model after training, is the decoder.  It takes as input the condition (target spectrum) and a latent vector z. During training multiple design solutions for a given target spectrum will be mapped on different latent vectors z so that the latent space can be used to distinguish different solutions.

In [None]:
# Latent vector input
latent_in = keras.Input(shape=(latent_dim,), name='latent_in')

# Spectrum path
x = condition_target_spec_input
x = resblock_sequence_down(x, N_filter=16, N_blocks=N_blocks) # 64 --> 32
x = resblock_sequence_down(x, N_filter=32, N_blocks=N_blocks) # 32 --> 16
x = resblock_sequence_down(x, N_filter=64, N_blocks=N_blocks) # 16 --> 8
x = resblock_sequence_down(x, N_filter=128, N_blocks=N_blocks) # 8 --> 4

x_spec = keras.layers.Flatten()(x)

# Merge with latent
x = keras.layers.Concatenate()([x_spec, latent_in])

# Geometry generator path
x = keras.layers.Dense(1*128)(x)
x = keras.layers.Reshape((1, 128))(x)

x = resblock_sequence_id(x, N_filter=128, N_blocks=N_blocks)

x_m1 = keras.layers.Dense(512, activation='relu')(x)
x_m1 = keras.layers.Dense(512, activation='relu')(x_m1)
out_design_m1 = keras.layers.Dense(num_mats, activation='softmax', name='design_out_m1')(x_m1)

x_m2 = keras.layers.Dense(512, activation='relu')(x)
x_m2 = keras.layers.Dense(512, activation='relu')(x_m2)
out_design_m2 = keras.layers.Dense(num_mats, activation='softmax', name='design_out_m2')(x_m2)

x_t1 = keras.layers.Dense(512, activation='relu')(x)
x_t1 = keras.layers.Dense(512, activation='relu')(x_t1)
out_design_t1 = keras.layers.Dense(1, activation='linear', name='design_out_t1')(x_t1)

x_t2 = keras.layers.Dense(512, activation='relu')(x)
x_t2 = keras.layers.Dense(512, activation='relu')(x_t2)
out_design_t2 = keras.layers.Dense(1, activation='linear', name='design_out_t2')(x_t2)

decoder = keras.Model(
    [latent_in, condition_target_spec_input],
    [out_design_m1, out_design_m2, out_design_t1, out_design_t2], name="decoder")

# Define the full cVAE

In [None]:
# Using the encoder to generate a randomized latent vector
z_mean_out, z_var_log_out, z_rnd_out = encoder([design_in_m1, design_in_m2, design_in_t1, design_in_t2, condition_target_spec_input])

# Using the decoder to generate the geometry parameters
design_pred_m1, design_pred_m2, design_pred_t1, design_pred_t2 = decoder([z_rnd_out, condition_target_spec_input])

cVAE = keras.Model(
    [design_in_m1, design_in_m2, design_in_t1, design_in_t2, condition_target_spec_input],
    [design_pred_m1, design_pred_m2, design_pred_t1, design_pred_t2],
    name="cVAE")

# cVAE losses: reconstruction and KL

In order to regularize the latent space, we need to add a Kullback-Leibler (KL) divergence loss in addition to the MSE reconstruction loss. Adding custom loss functions in keras can be done via model.add_loss(). We also add loss metrics to show status messages during training.

In [None]:
input_dim = np.product(x_train.shape[1:])
latent_dim = np.product(z.shape[1:])
beta = 0.001 # Weight of KL loss. If too high, reconstruction will suffer.

# Material reconstruction loss
def categorical_ce(y_true, y_pred):
    cce = keras.losses.CategoricalCrossentropy(name='reconstr_loss_m1')
    return cce(y_true, y_pred)

reconstruction_loss_m1 = categorical_ce(design_in_m1, design_pred_m1)

cVAE.add_loss(reconstruction_loss_m1)
cVAE.add_metric(reconstruction_loss_m1, name='reconstr_loss_m1')

m1_acc = keras.metrics.categorical_accuracy(design_in_m1, design_pred_m1)
cVAE.add_metric(m1_acc, name="m1_accuracy")

def categorical_ce1(y_true, y_pred):
    cce1 = keras.losses.CategoricalCrossentropy(name='reconstr_loss_m2')
    return cce1(y_true, y_pred)

reconstruction_loss_m2 = categorical_ce1(design_in_m2, design_pred_m2)

cVAE.add_loss(reconstruction_loss_m2)
cVAE.add_metric(reconstruction_loss_m2, name='reconstr_loss_m2')

m2_acc = keras.metrics.categorical_accuracy(design_in_m2, design_pred_m2)
cVAE.add_metric(m2_acc, name="m2_accuracy")


# Thickness reconstruction loss
reconstruction_loss_t1 = keras.losses.mse(
    keras.backend.flatten(design_in_t1),
    keras.backend.flatten(design_pred_t1))
cVAE.add_loss(reconstruction_loss_t1)
cVAE.add_metric(reconstruction_loss_t1, name='reconstr_loss_t1', aggregation='mean')

reconstruction_loss_t2 = keras.losses.mse(
    keras.backend.flatten(design_in_t2),
    keras.backend.flatten(design_pred_t2))
cVAE.add_loss(reconstruction_loss_t2)
cVAE.add_metric(reconstruction_loss_t2, name='reconstr_loss_t2', aggregation='mean')

# KL loss
kl_loss = keras.backend.sum(
    1 + z_var_log_out - keras.backend.square(z_mean_out) - keras.backend.exp(z_var_log_out), axis=-1)
kl_loss = beta * keras.backend.mean(-0.5 / latent_dim * kl_loss)
cVAE.add_loss(kl_loss)
cVAE.add_metric(kl_loss, name='kl_loss', aggregation='mean')

# Train the cVAE end-to-end

In contrast to the 2-stage training of the Tandem, the cVAE is trained end-to-end in a single run. Note that we have now two inputs for the full cVAE model: designs and spectra, the model output are the reconstructed designs, which are compared to the input designs via MSE loss. After training, for inverse design we will use only the decoder part.

In [None]:
# Compile the model using the Adam optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)
cVAE.compile(optimizer=optimizer)

# Automatic learning rate reduction on loss plateau
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=1/3, patience=4, verbose=1)
# Callback for early stopping
early_stop = EarlyStopping(monitor='val_loss', patience=10, mode='min', restore_best_weights=True)
callbacks = [reduce_lr, early_stop]

# Fit the model with BN --> increasing batchsize schdedule
hist = None # Global history after BS increase
for i in range(3): # 3x16 epochs, doubling batchsize
    _h = cVAE.fit(x=[x_train_m1, x_train_m2, x_train_t1, x_train_t2, y_train],
                  y=[x_train_m1, x_train_m2, x_train_t1, x_train_t2],
                  validation_split=0.2,
                  batch_size=32 * 2**i, epochs=16,
                  callbacks = callbacks)
    if hist is None:
        hist = _h
    else:
        # Update history
        for k in hist.history:
            hist.history[k] = np.concatenate([hist.history[k], _h.history[k]])

# Plot losses

In [None]:
plt.figure()
plt.plot(hist.history['loss'], label='loss')
plt.plot(hist.history['val_loss'], label='val_loss')
plt.yscale('log')
plt.legend()
plt.show()

# Save Models

In [None]:
cVAE.save('../Models/model_cVAE.h5')
encoder.save('../Models/model_cVAE_encoder.h5')
decoder.save('../Models/model_cVAE_decoder.h5')