# WASSERSTEIN GENERATIVE ADVERSARIAL NETWORK: SOCOFing DATA

*Authored by Iulia-Maia Muresan and Marc González i Planellas*

## Objective

This notebook is devoted to the implementation of a WGAN to the SOCOFing dataset available on Kaggle:

https://www.kaggle.com/ruizgara/socofing

Which contains 6000 fingerprint scans from 600 african subjects, which we encode in greyscale. The pertinent exploration of how the SOCOFing (along with that of Fashion Mnist) is structured and how to retrieve a workable array to input in the WGAN is explored in the *image_processing.ipyinb* complementary file.

In [1]:
import os
import numpy as np
from numpy import expand_dims
from numpy import mean
from numpy import ones
from numpy.random import randn
from numpy.random import randint
from keras import backend
from keras.optimizers import RMSprop
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import BatchNormalization
from keras.initializers import RandomNormal
from keras.constraints import Constraint
from matplotlib import pyplot
from PIL import Image, ImageOps # maybe you need to -pip install pillow

One of the main characteristics of the Wasserstein GAN is that the **Critic** (the equivalent of the Discriminator for GAN in the original paper of Goodfellow et al.) is allowed to have a range of real values instead of assigning a binary class (fake or real) to the sampled image. However, to facilitate convergence, this indicator must be bound to a small value. The authors of WGAN define it at the $w \in \{-0.01,0.01\}$ range, but given the difference in complexity due to the fingerprint patterns, we decide for a more strict constraint (see Kernel [4]). In any case, we prepare the wasserstein loss function that the neural networks are going to use as the metric for minimizing their error. Given that `y_true` will be binary with value -1 for an image coming from the real sample and 1 for a value coming from the generator, this will set up a proper basis for the critic to minimize unto.

In [2]:
# luckily for us, the wasserstein loss is very easy to define in practise
def wasserstein_loss(y_true, y_pred):
    return backend.mean(y_true * y_pred)

In [3]:
class ClipConstraint(Constraint):
    # set clip value when initialized
    def __init__(self, clip_value):
        self.clip_value = clip_value

    # clip model weights to hypercube
    def __call__(self, weights):
        return backend.clip(weights, -self.clip_value, self.clip_value)

Defining the critic model. To disect a bit the different components within it:

* `in_shape(x,y,z)` is referencing the **input shape** that the critic will accept. The $(x,y)$ pair refers to the pixels of a $x\times y$ image, while $z$ is left to the channel, which in our case is 1 as we simply use greyscaled images but it could be 3 in case that all three primary chromatic sources are deployed.
* The weight initialization parameter `init` corresponds to the **prior** $p(z)$ and is needed to generate the stochastic component of the model. WGAN authors recommend using either a uniform or a gaussian distribution, which is what we decide for in here.
* We define the **constraints for the weights** as discussed in kernel [3] above.
* `LeakyReLU` allows for a small gradient when a specific link between layers should not be active. The slope is controlled by the hyperparameter `alpha`, and should in general be kept small.
* The `Sequential` class allows the pipeline between layers. Later on we will also use it to merge the Critic and the Generator parts of the GAN. We use two hidden layers before encoding the outcome into a single value, the critic "score" defining how real an image looks.
* For details on `Conv2D`, visit the keras documentation: https://www.tensorflow.org/api_docs/python/tf/keras/layers/Conv2D. The 4x4 convolutional window has been chosen based on standard practise. 
* **Batch normalization** is applied at every layer to re-standarize the outcomes from the previous layer and ensure proper performance. Spectral normalization (see the report) could also be considered.
* The activation step (from the last hidden layer to the output) requires of a **linear function** to assign the critic score. This is the default for `Dense(1)`. However, we need to pass an array to the activation layer, so `Flatten()` is used to convert the previous output tensor to the correct shape.
* The WGAN paper mentions that the model is unstable under momentum based optimizers, and instead decides to deploy `RMSProp`, so we follow their lead on this decision. The learning rate is set to a small value to ensure convergence, the other main potential issue that the authors discuss on that regard.

With this, we define the `define_critic` function for the model.

In [4]:
def define_critic(in_shape=(80,80,1)):
    # weight initialization and constraint
    init = RandomNormal(stddev=0.02)
    const = ClipConstraint(0.005)
    # define model
    model = Sequential()
    # downsize to 40x40
    model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init, 
                     kernel_constraint=const, input_shape=in_shape))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # downsize to 20x20
    model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init, 
                     kernel_constraint=const, input_shape=in_shape))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # downsize to 10x10
    model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init, 
                     kernel_constraint=const))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # downsize to 5x5
    model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init, 
                     kernel_constraint=const))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # preparing and applying linear activation
    model.add(Flatten()) # Convert the tensor into an array of values to pass to the activation layer
    model.add(Dense(1)) # Activation
    # compile model
    opt = RMSprop(lr=0.00005) # set the optimization method
    model.compile(loss=wasserstein_loss, optimizer=opt) # apply the loss metric that we specified before based on the 
    # approximation to Wasserstein-1 loss proposed.
    return model

Similarly for the generator:

* We again need a prior set by a gaussian distribution
* `n_nodes = 128x5x5`, as we will generate 128 low resolution samples of a $5\times5$ image given the input. These images are then **upscaled** through successive convolutional layers, doubling the perimeter and squaring the area.
* A final **hyperbolic tangent** is used for the activation layer, which will output a $80\times80$ greyscaled image


In [5]:
def define_generator(latent_dim):
    # weight initialization
    init = RandomNormal(stddev=0.02)
    # define model
    model = Sequential()
    # prepare 5x5 image
    n_nodes = 128 * 5 * 5
    model.add(Dense(n_nodes, kernel_initializer=init, input_dim=latent_dim))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Reshape((5, 5, 128)))
    # upsize to 10x10
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # upsize to 20x20
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # upsize to 40x40
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # upsize to 80x80
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))
    # output 80x80x1
    model.add(Conv2D(1, (5,5), activation='tanh', padding='same', kernel_initializer=init))
    
    return model

That was a mouthful! The tough work is done, now we "*only*" need to define the rest of the environment where these models will operate. Starting simple, the WGAN as a whole is defined using the `Sequential()` class to assemble the critic and the generator parts together. Given that the critic is trained in its own level of iterations, we impose fixed weights for it on the larger model. This does not affect the weight assigning process of the critic model per se.

In [6]:
# define the combined generator and critic model, for updating the generator
def define_wgan(generator, critic):
    # make weights in the critic not trainable when the generator updates
    for layer in critic.layers:
        if not isinstance(layer, BatchNormalization):
            layer.trainable = False
    # create the environment to connect both
    model = Sequential()
    # add generator
    model.add(generator)
    # add the critic
    model.add(critic)
    # set the optimization algorithm and ultimate the model's structure
    opt = RMSprop(lr=0.00005)
    model.compile(loss=wasserstein_loss, optimizer=opt)
    return model

We use the `load_real_samples()` function. The process by which we exactly found out how to define it is detailed in *image_processing.ipynb*.

In [7]:
def load_real_samples():
    X = np.empty([6000,80,80,1])
    #X = np.empty([10,103,96,1])
    path_to_image = os.listdir("Fingerprint images/Real")
    for i in range(6000):
        X_i = Image.open("Fingerprint images/Real/"+path_to_image[i]).resize((84,84))
        X_i = ImageOps.grayscale(X_i)
        X_i = np.asarray(X_i)[1:81,1:81]
        X_i = expand_dims(X_i, axis=-1) # expand to 3d, e.g. add channels
        X_i = X_i.astype('float32') # convert from ints to floats
        X_i[0,:,0] = np.zeros(len(X_i[0,:,0])) + 255
        X_i[:,0,0] = np.zeros(len(X_i[:,0,0])) + 255
        X_i = -(X_i - 127.5) / 127.5 # scale from [0,255] to [-1,1]
        X[i] = X_i
    return X

Define a function to take real samples from all of the image data and label them with $y = -1$ (real image)

In [8]:
def generate_real_samples(dataset, n_samples):
    # obtain random indexes to sample the real images
    ix = randint(0, dataset.shape[0], n_samples)
    # sample the real images
    X = dataset[ix]
    # assign the -1 label to the values coming from the real data
    y = -ones((n_samples, 1))
    return X, y

Generate the necessary inputs for the generator as a function of the number of samples and the outputs of the critic. Reshape them in an appropiate form to treat later within the generator model.

In [9]:
def generate_latent_points(latent_dim, n_samples):
    # get the random initialization values for the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape. We need to distribute the random coordinates generated before across all the samples
    x_input = x_input.reshape(n_samples, latent_dim)
    return x_input

Use the previously defined function to generate the priors. Use them as input to create fake images which will be stored in the pertinent parameters of $X$. Label them as $y=1$ (fake data) for the next generation of the discriminator to work unto.

In [10]:
def generate_fake_samples(generator, latent_dim, n_samples):
    # use the function in kernel[9] to generate the random latent points 
    x_input = generate_latent_points(latent_dim, n_samples)
    # predict outputs
    X = generator.predict(x_input)
    # assign the 1 label to the fake images that come from the discriminator.
    y = ones((n_samples, 1))
    return X, y

Here we create a summary function. What we want is to generate fake samples with our latest WGAN iteration, plot some of them to see how they look like, and store the plot and the model as files for later inspection and use.

In [11]:
def summarize_performance(step, g_model, latent_dim, n_samples=100):
    # use the generator to create fake data
    X, _ = generate_fake_samples(g_model, latent_dim, n_samples)
    # scale from [-1,1] to [0,1]
    X = (X + 1) / 2.0
    # plot images
    for i in range(4 * 6):
        pyplot.subplot(4, 6, 1 + i)
        pyplot.axis('off')
        pyplot.imshow(X[i, :, :, 0], cmap='gray_r')
    # save plot
    filename1 = 'generated_plot_%04d.png' % (step+1)
    pyplot.savefig(filename1)
    pyplot.close()
    # save the model
    filename2 = 'model_%04d.h5' % (step+1)
    g_model.save(filename2)
    print('>Saved: %s and %s' % (filename1, filename2))

Finally we define the parameters to plot the evolution of the losses for the critic with real and fake data (c1,c2) and for the generator(g), as the model self-optimizes with subsequent iterations.

In [12]:
def plot_history(d1_hist, d2_hist, g_hist):
    pyplot.plot(d1_hist, label='crit_real')
    pyplot.plot(d2_hist, label='crit_fake')
    pyplot.plot(g_hist, label='gen')
    pyplot.legend()
    pyplot.savefig('plot_line_plot_loss.png')
    pyplot.close()

Finally, we put everything together to train the model. Some commentary: 

* Half batches as $\frac{1}{2}$ corresponds to real images and the other half to the generator's output.
* GANs in general are about updating the Discriminator / Critic several times for every round of Generator updating, as in theory there is no risk of making the process fail due to vanishing gradients. 
* The number of epochs and batch size is arbitrary. Depending on the complexity of image patterns and pixel size, it can take more or less time for the WGAN to converge.

In [13]:
def train(g_model, c_model, gan_model, dataset, latent_dim, n_epochs=30, n_batch=64, n_critic=10):
    # batches per training epoch and training iterations
    bat_per_epo = int(dataset.shape[0] / n_batch)
    n_steps = bat_per_epo * n_epochs
    half_batch = int(n_batch / 2)
    # lists for keeping track of loss
    c1_hist, c2_hist, g_hist = list(), list(), list()
    # manually enumerate epochs
    for i in range(n_steps):
        # update the critic more than the generator
        c1_tmp, c2_tmp = list(), list()
        for _ in range(n_critic):
            # sample from the data
            X_real, y_real = generate_real_samples(dataset, half_batch)
            # update critic based on real images
            c_loss1 = c_model.train_on_batch(X_real, y_real)
            c1_tmp.append(c_loss1)
            # use the latest iteration of the generator to create fake images
            X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
            # update critic based on fake images
            c_loss2 = c_model.train_on_batch(X_fake, y_fake)
            c2_tmp.append(c_loss2)
        # store critic loss
        c1_hist.append(mean(c1_tmp))
        c2_hist.append(mean(c2_tmp))
        # prepare the latent points that we will introduce in the generator and label the fake images
        X_gan = generate_latent_points(latent_dim, n_batch)
        y_gan = -ones((n_batch, 1))
        # generator actualization based on the discrimination made by the critic
        g_loss = gan_model.train_on_batch(X_gan, y_gan)
        g_hist.append(g_loss)
        # summarize loss for current iteration
        print('>%d, c1=%.3f, c2=%.3f g=%.3f' % (i+1, c1_hist[-1], c2_hist[-1], g_loss))
        # assess the evolution of the WGAN for the current iteration
        if (i+1) % bat_per_epo == 0:
            summarize_performance(i, g_model, latent_dim)
    # once the WGAN has finished the training, we plot the evolution of the Wasserstein-1 measurements
    plot_history(c1_hist, c2_hist, g_hist)

We choose the number of latent dimensions that we want to model. Then, we define the critic and generator objects as specified above, and merge them using the `define_wgan(generator, critic)` function. Finally we load the data and make a quick check that it is stored correctly.

In [14]:
# size of the latent space
latent_dim = 200
# define critic and generator, assign latent dimension to the generator.
critic = define_critic()
generator = define_generator(latent_dim)
# define the WGAN object
wgan_model = define_wgan(generator, critic)
# load image data
dataset = load_real_samples()
print(dataset.shape)

(6000, 80, 80, 1)


The only thing left to do is to press the red button and enjoy. This launches the training of the model.

In [None]:
train(generator, critic, wgan_model, dataset, latent_dim)