<a href="https://colab.research.google.com/github/c-quilo/premiereDroplets/blob/main/ganDropletExperiments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Generative adversarial Networks (GANs) for droplet experiments

In [36]:
from __future__ import print_function, division

import tensorflow.keras as tf
import tensorflow
from sklearn.model_selection import train_test_split
import tensorflow.keras.backend as backend
import matplotlib.pyplot as plt
import numpy as np
from keras import backend
from keras.layers import Lambda
from keras.constraints import Constraint
from keras.initializers import RandomNormal

from keras import optimizers
from keras.utils import np_utils
import tensorflow.keras as tf

The new class GANexperiments is defined

In [40]:
class ClipConstraint(Constraint):
    # set clip value when initialized
    def __init__(self, clip_value):
        self.clip_value = clip_value

    # clip model weights to hypercube
    def __call__(self, weights):
        return backend.clip(weights, -self.clip_value, self.clip_value)

    # get the config
    def get_config(self):
        return {'clip_value': self.clip_value}

# clip model weights to a given hypercube
class dropletGAN():

    def __init__(self, trainingData, latent_dim, GANorWGAN):

        # Wasserstein loss
        def wasserstein_loss(y_true, y_pred):
            return backend.mean(y_true * y_pred)

        self.trainingData = trainingData
        self.nFeatures = self.trainingData.shape[1]
        # Dimension of the latent space (noise)
        self.latent_dim = latent_dim
        self.constraint = 0.01
        self.dropoutNumber = 0.5
        self.alpha = 0.3
        self.GANorWGAN = GANorWGAN
        self.nameExperiment = '_dropletExperiments_'

        self.c1_hist = []
        self.c2_hist = []
        self.g_hist = []

        self.optimizer = tf.optimizers.Nadam()

        if self.GANorWGAN == 'WGAN':
            self.loss = wasserstein_loss
        elif self.GANorWGAN == 'GAN':
            self.loss = 'binary_crossentropy'

        self.loss_gen = 'mse'

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()

        # Build the encoder and decoder
        self.generator = self.build_generator()

        # Only the generator is trained through the combined model, thus:
        self.discriminator.trainable = False

        # Connecting models
        noise_input = tf.Input(shape=self.latent_dim)
        generator_output = self.generator(noise_input)
        discriminator_output = self.discriminator(generator_output)

        # The combined model stacks the autoencoder and discriminator
        # The stacked model has one input and two outputs: the decoded input and the discriminator output
        self.combined = tf.Model(noise_input, discriminator_output, name = 'dropletGAN')
        self.combined.compile(loss=self.loss, loss_weights=[0.999, 0.001], optimizer=self.optimizer)

    def build_discriminator(self):
        init = RandomNormal(stddev=0.02)
        const = ClipConstraint(0.01)

        in_disc = tf.Input(shape=(self.nFeatures))
        disc = tf.layers.LeakyReLU(self.alpha)(in_disc)
        disc = tf.layers.BatchNormalization()(disc)
        disc_output = tf.layers.Dense(1, activation='sigmoid')(disc)
        discriminator = tf.Model(in_disc, disc_output, name='Discriminator')
        discriminator.compile(loss=self.loss, optimizer=self.optimizer)

        return discriminator

    def build_generator(self):
        #init = RandomNormal(stddev=0.02)
        #init = tf.initializers.RandomNormal(stddev=0.02)

        input_gen = tf.Input(shape=self.latent_dim)
        gen = tf.layers.Dense(8)(input_gen)
        gen = tf.layers.LeakyReLU(self.alpha)(gen)
        gen = tf.layers.BatchNormalization()(gen)
        gen = tf.layers.Dense(8)(gen)
        gen = tf.layers.LeakyReLU(self.alpha)(gen)
        gen = tf.layers.BatchNormalization()(gen)
        gen_output = tf.layers.Dense(self.nFeatures)(gen)

        generator = tf.Model(input_gen, gen_output, name='Generator')
        generator.summary()
        return generator

    def train(self, epochs, batch_size=128, sample_interval=50, n_critic=5):

        # Load and pre process the data

        X_all = self.trainingData

        if self.GANorWGAN == 'WGAN':
            real = -np.ones(batch_size)
            fake = np.ones(batch_size)

        if self.GANorWGAN == 'GAN':
            real = np.ones(batch_size)
            fake = np.zeros(batch_size)

        # Training the model
        for epoch in range(epochs):
            c1_tmp, c2_tmp = list(), list()

            # Training the discriminator more often than the generator
            for _ in range(n_critic):
                # Randomly selected samples and noise
                randomIndex = np.random.randint(0, X_all.shape[0], size=batch_size)
                noise = np.random.normal(0, 1, size=(batch_size, self.latent_dim))
                # Select a random batch of input
                real_seqs = X_all[randomIndex]

                # Generate a batch of new outputs (in the latent space) predicted by the encoder
                gen_seqs = self.generator.predict(noise)

                # Train the discriminator
                # The arbitrary noise is considered to be a "real" sample
                d_loss_real = self.discriminator.train_on_batch(real_seqs, real)
                c1_tmp.append(d_loss_real)
                # The latent space generated by the encoder is considered a "fake" sample
                d_loss_fake = self.discriminator.train_on_batch(gen_seqs, fake)
                c2_tmp.append(d_loss_fake)

            self.c1_hist.append(np.mean(c1_tmp))
            self.c2_hist.append(np.mean(c2_tmp))

            # Training the stacked model
            g_loss = self.combined.train_on_batch(noise, [gen_seqs, real])
            self.g_hist.append(g_loss)
            print("%d [C1 real: %f, C2 fake: %f], [G loss: %f]" % (epoch, self.c1_hist[epoch], self.c2_hist[epoch], g_loss[0]))

            # Checkpoint progress: Plot losses and predicted data
            if epoch % sample_interval == 0:

                self.plot_loss(epoch)
                self.plot_values(epoch)
                self.generator.save(self.directory_data + '/' + self.nameExperiment + GANorWGAN +
                                            '_' + self.field_name + '_' + str(self.latent_dim) + '_' + str(epoch),
                                            save_format='tf')

                self.discriminator.save(self.directory_data + '/' + self.nameExperiment + GANorWGAN +
                                        '_' + self.field_name + '_' + str(self.latent_dim) + '_' + str(epoch),
                                        save_format='tf')

    # Plots the (W)GAN related losses at every sample interval

    def plot_loss(self, epoch):
        fig = plt.figure()
        plt.plot(self.c1_hist, c='red')
        plt.plot(self.c2_hist, c='blue')
        plt.plot(self.g_hist, c='green')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title("GAN Loss per Epoch")
        plt.legend(['C real', 'C fake', 'Generator'])

        plt.savefig(self.directory_data + '/' + self.nameExperiment + GANorWGAN + '_' + self.field_name + '_' + '_' + str(epoch) +
                    '_' + str(self.latent_dim) + '.png')
        plt.close()

    # Plots predicted in the first 8 latent dimension at every sample interval

    def plot_values(self, epoch):
        noise = np.random.normal(0, 1, size=(X_all.shape[0], self.latent_dim))
        prediction = self.generator(noise)
        plt.subplot(1, 2, 1)
        plt.imshow(X_all[:, i])
        plt.title('Ground truth')
        plt.imshow(prediction)
        plt.title('Generated experiments')

        plt.tight_layout()
        plt.savefig(self.directory_data + '/' + self.nameExperiment + GANorWGAN + '_' + self.field_name + '_' + '_' + str(epoch) +
                    '_' + str(self.latent_dim) + '.png')
        plt.close()

Execute the class and train the GAN

In [41]:
if __name__ == '__main__':
    #Load data
    directory_data = '/content/'
    filename = 'normalisedDropletsExperiments.npy'
    trainingData = np.load(directory_data + filename)
    epochs = 10000
    batch_size = 32
    n_critic = 5
    sample_interval = 1000
    latent_dim = 4

    #Training method
    GANorWGAN = 'GAN'

    dropGAN = dropletGAN(trainingData=trainingData,
              latent_dim=latent_dim,
              GANorWGAN=GANorWGAN)
    advAE.train(epochs=epochs,
              batch_size=batch_size,
              sample_interval=sample_interval,
              n_critic = n_critic)

Model: "Generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_43 (InputLayer)        [(None, 4)]               0         
_________________________________________________________________
dense_67 (Dense)             (None, 8)                 40        
_________________________________________________________________
leaky_re_lu_52 (LeakyReLU)   (None, 8)                 0         
_________________________________________________________________
batch_normalization_52 (Batc (None, 8)                 32        
_________________________________________________________________
dense_68 (Dense)             (None, 8)                 72        
_________________________________________________________________
leaky_re_lu_53 (LeakyReLU)   (None, 8)                 0         
_________________________________________________________________
batch_normalization_53 (Batc (None, 8)                 32

FileNotFoundError: ignored

In [None]:
directory_data
filename