<a href="https://colab.research.google.com/github/imiled/DL_Tools_For_Finance/blob/master/GAN_with_solutions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Generative Adversarial Networks

Content:
 * Intro: why do they exist, general structure, current impact
 * Simple example: 1D GAN to generate a specific function
 * Issue: evaluating the generative model
 * Extension: generating various classes of 1 variable functions
 * Issue: non-convergence and mode collapse
 * Application: GANs for semi-supervised learning (MNIST data)
 * Bonus task: feature matching
 * Bonus task: GANs vs. VAEs
 * Final remarks
 * References

## An incomplete but necessary introduction

GANs are generative models originally introduced in [1] which don't attempt to explicitly model the underlying density of the data $p_{\text{data}}$, but rather learn it implicitly and sample directly from it. VAEs, on the other hand, learn and sample from an explicit approximation of $p_{\text{data}}$.

One of the key characteristics of GANs is their training dynamics, with an interplay between two components: a _generator_ $G$ and a _discriminator_ $D$. These are usually implemented as deep neural networks, but in principle the adversarial dynamic governing GANs' training may be applied to non-neural models.

GANs offer various __advantages__ over previous generative models, such as:
 * they offer parallel sampling
 * the only real restriction on the generator function is that it be differentiable
 * neural networks are universal function approximators (in principle)
 
However, they do also introduce new __disadvantages__:
 * the training is potentially unstable, leading to non-convergence and mode collapse
 * the generator's output must be differentiable, which makes _direct_ application to discrete data, such as text, harder
 * direct manipulation of the latent space of the generator is not easy


### Structure and training of a GAN

In [None]:
import IPython

Let's see how a GAN works (image taken from [2]):

In [None]:
IPython.display.HTML('<img src="https://drive.google.com/uc?export=view&id=10hF29LJBMjC4tCpNe9FTfe_TDKXVjGii" width="600">') 


The training happens in two scenarios:
 1. Real data is sampled from the training set and shown to the discriminator $D$. The objective here is to get $D$'s value as close as possible to $1$.
 2. $G$ generates a fake data sample $x$, which is then shown to the discriminator. Here $G$'s objective is to generate $x=G(z)$ such that $D(x)\approx 1$, while $D$'s objective is to achieve $D(x)\approx 0$.
 
The assumption is that $D$ sees the same number of real and fake examples. Normally, these are drawn as two minibatches and the two learning steps (to optimise $G$ and $D$, respectively) are done at once, i.e. simultaneous SGD.
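
The two scenarios above correspond to the two expectation terms of the minimax objective from [1]. Here $p_z$ denotes the noise distribution from which the generator's input $z$ is drawn (notation assumed here, following [1]):

$$\min_G \max_D V(D, G) = \mathbb{E}_{x \sim p_{\text{data}}}\left[\log D(x)\right] + \mathbb{E}_{z \sim p_z}\left[\log\left(1 - D(G(z))\right)\right]$$

$D$ maximises both terms (pushing $D(x)$ towards $1$ on real data and $D(G(z))$ towards $0$ on fakes), while $G$ can only influence the second one.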

#### **Question :)**  When can we consider the model to be trained (i.e. be at an equilibrium)?

When $D$'s classification accuracy is around $0.5$, i.e. chance level.
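
One way to see why, following the analysis in [1]: for a fixed generator whose samples follow a distribution $p_g$ (notation assumed here), the best possible discriminator is

$$D^*(x) = \frac{p_{\text{data}}(x)}{p_{\text{data}}(x) + p_g(x)}$$

When the generator matches the data distribution, $p_g = p_{\text{data}}$, this gives $D^*(x) = \tfrac{1}{2}$ everywhere: even the optimal discriminator cannot separate the two kinds of samples.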

## Enough with this s**t, show me the code

We'll start off by building a toy example of a GAN which learns to generate pairs of real numbers obeying some sort of functional relationship, for example cubic $y=x^3$.

In other words, we want a model that samples points which, when plotted, look like a cubic curve.

In [None]:
# Our code generates plenty of warnings because of GANs' weird compiling 
# requirements. This is to mute them.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
%tensorflow_version 1.x

In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# define the cubic
def f(x):
    return x**3

In [None]:
# plot a sample of points between, say, -5 and 5
x = np.linspace(-5, 5, 100)
y = f(x)

plt.plot(x, y)

__Why do we need this?!__

Remember: a GAN is trained on both real and fake points. _Real_ training points will be taken from this cubic. 

So we need some code to generate them :)

In [None]:
# Let's stick to generating points between -5 and 5, shall we?
def generate_real_samples(f, n=64):
    x = (np.random.rand(n) - .5) * 10
    y = f(x)
    x = x.reshape(n, 1)
    y = y.reshape(n, 1)
    labels = np.ones((n, 1))
    xy = np.hstack((x,y))
    return (xy, labels)

In [None]:
x = np.linspace(-5, 5, 100)
y = f(x)
gen_xy, _ = generate_real_samples(f, 10) #we don't need the labels just yet
plt.plot(x, y)
plt.plot(gen_xy[:,0], gen_xy[:,1], "ro")

### Let's get serious: the discriminator

We can now move to one of the two fundamental components of a GAN: the discriminator. This is the model which will try to discern whether the points it's seeing are real or fake.

You can think of it as a kind of classifier.

We'll build the discriminator model with the following ingredients:
 * 1 hidden layer of 30 nodes
 * ReLU activations
 * He initialisation
 * binary crossentropy loss

In [None]:
from keras.models import Sequential
from keras.layers import Dense

def build_discriminator(n_inputs=2, nh_1=30, init="he_uniform", act="relu"):
    model = Sequential()
    model.add(Dense(nh_1, activation=act, kernel_initializer=init, input_dim=n_inputs))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

In [None]:
from keras.utils.vis_utils import plot_model

model = build_discriminator()
model.summary()
plot_model(model, show_shapes=True, show_layer_names=True)
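
As a sanity check on what `model.summary()` reports, the number of trainable parameters of a fully connected network can be worked out by hand: each dense layer contributes one weight per input-output pair plus one bias per output. The helper below (`dense_param_count`, a hypothetical name that is not part of Keras) is a minimal sketch of that arithmetic.

```python
def dense_param_count(layer_sizes):
    """Count weights and biases of a stack of fully connected layers.

    layer_sizes lists the width of every layer, input first, e.g. [2, 30, 1].
    """
    total = 0
    for fan_in, fan_out in zip(layer_sizes[:-1], layer_sizes[1:]):
        total += fan_in * fan_out + fan_out  # weight matrix + bias vector
    return total

# Discriminator above: 2 inputs -> 30 hidden units -> 1 output
print(dense_param_count([2, 30, 1]))  # (2*30 + 30) + (30*1 + 1) = 121
```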

### The star of the show: the generator

The generator is the key piece of a GAN, being the model tasked with learning how to generate realistic samples, _realistic_ here meaning: looking as if they had been drawn from the underlying distribution of the training data.

The way the generator is helped to fulfil its role is by _being trained_ with the feedback obtained from $D$'s classification performance. Other than that, it is really nothing more than a (nonlinear) function on top of some random noise.

The ingredients for our generator model:
 * An $n$-dimensional latent space of Gaussian features (random noise fed as input to the next layer)
 * A dense hidden layer of 30 nodes with ReLU activations
 * He initialisation
 * linear activation on the output layer (to scale the generated $(x,y)$ pairs)

In [None]:
def build_generator(latent_dim, nh_1=30, init="he_uniform", act="relu", n_outputs=2):
    model = Sequential()
    model.add(Dense(nh_1, activation=act, kernel_initializer=init, input_dim=latent_dim))
    model.add(Dense(n_outputs, activation='linear'))
    return model
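
To make the "function on top of random noise" idea concrete, here is a minimal NumPy sketch of the forward pass such a generator computes. The helper names (`he_uniform`, `generator_forward`) are illustrative, not Keras API, and the weights are freshly initialised rather than trained.

```python
import numpy as np

def he_uniform(fan_in, fan_out, rng):
    """He-uniform initialisation: U(-limit, limit) with limit = sqrt(6 / fan_in)."""
    limit = np.sqrt(6.0 / fan_in)
    return rng.uniform(-limit, limit, size=(fan_in, fan_out))

def generator_forward(z, w1, b1, w2, b2):
    """Dense + ReLU hidden layer followed by a linear output layer."""
    hidden = np.maximum(0.0, z @ w1 + b1)
    return hidden @ w2 + b2

rng = np.random.default_rng(0)
latent_dim, n_hidden, n_outputs = 10, 30, 2

w1, b1 = he_uniform(latent_dim, n_hidden, rng), np.zeros(n_hidden)
# Assumption: He-uniform is reused for the output layer to keep the sketch short;
# the Keras model above leaves that layer on its default initialiser.
w2, b2 = he_uniform(n_hidden, n_outputs, rng), np.zeros(n_outputs)

z = rng.standard_normal((5, latent_dim))   # 5 points in the latent space
fake_xy = generator_forward(z, w1, b1, w2, b2)
print(fake_xy.shape)                       # one (x, y) pair per latent point
```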

In [None]:
model = build_generator(10)
model.summary()
plot_model(model, show_shapes=True, show_layer_names=True)

#### __Question :)__ Why are we not compiling $G$? Shouldn't there be some SGD details in there??

The generator is _not_ compiled because it is _never directly fit_, unlike the discriminator which does go through a training scenario _on its own_ when it sees real samples.

This is not to say no updates are backpropagated to the generator! As we'll see in a moment, backpropagation needs to happen _through_ the discriminator.
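
A toy illustration of what "through the discriminator" means, under strong simplifying assumptions: both networks are reduced to a single scalar weight (`w_g` for $G$, `w_d` for $D$), and the generator minimises $-\log D(G(z))$. The gradient with respect to `w_g` depends on `w_d` via the chain rule, yet only `w_g` is updated.

```python
import numpy as np

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

def generator_loss(w_g, w_d, z):
    """Mean of -log D(G(z)) with G(z) = w_g * z and D(x) = sigmoid(w_d * x)."""
    return float(np.mean(-np.log(sigmoid(w_d * w_g * z))))

def generator_grad(w_g, w_d, z):
    """d loss / d w_g via the chain rule, passing through the frozen D."""
    d_out = sigmoid(w_d * w_g * z)
    # d(-log sigmoid(a))/da = -(1 - sigmoid(a)), and a = w_d * w_g * z
    return float(np.mean(-(1.0 - d_out) * w_d * z))

z = np.array([-1.0, 0.5, 2.0])   # fixed latent points, for reproducibility
w_g, w_d = 0.5, 2.0

loss_before = generator_loss(w_g, w_d, z)
w_g_new = w_g - 0.1 * generator_grad(w_g, w_d, z)   # w_d is left untouched
loss_after = generator_loss(w_g_new, w_d, z)
print(loss_before, loss_after)
```

In the Keras code the same thing happens automatically once $G$ and $D$ are stacked into one model with $D$'s weights frozen.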

#### A generator dry run

Let's see how the model can be used to generate samples, although at the moment we shouldn't get our hopes too high...

In [None]:
# sampling in the latent space
def sample_latent_points(latent_dim, n):
    # generate points in the latent space
    x_input = np.random.randn(latent_dim * n)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n, latent_dim)
    return x_input

In [None]:
# we already know what we'll use the generator for... :P
def generate_fake_samples(generator, latent_dim, n):
    # sample points in latent space
    x_input = sample_latent_points(latent_dim, n)
    # pass through the generator
    X = generator.predict(x_input)
    labels = np.zeros((n, 1))
    return X, labels

In [None]:
# let's put it to work
latent_dim = 5
model = build_generator(latent_dim, n_outputs=2)
X_fake, _ = generate_fake_samples(model, latent_dim, 100)
plt.scatter(X_fake[:,0], X_fake[:,1])

More cubic or more cubist?

### Building the GAN

Let's write the code to build and train our GAN. Predictably, we'll join the generator and discriminator we implemented thus far and - the most important bit - define how they play out together during training. (This is where we finally train $G$!!) 

Finally, we'll put in place a simple evaluation of the GAN.

To build our joint model, we'll use:
* binary crossentropy loss
* Adam

In [None]:
def build_gan(generator, discriminator):
    # NB: Why are we doing this?
    discriminator.trainable = False
    # stack the models together
    model = Sequential()
    # add generator
    model.add(generator)
    # add the discriminator
    model.add(discriminator)
    # We _do_ compile this model!
    model.compile(loss='binary_crossentropy', optimizer='adam')
    return model

In [None]:
latent_dim = 20
discriminator = build_discriminator()
generator = build_generator(latent_dim)
gan_model = build_gan(generator, discriminator)
gan_model.summary()
plot_model(gan_model, show_shapes=True, show_layer_names=True)

#### __Question :)__ Why are we not training $D$ in the GAN model?

We marked $D$'s weights as non-trainable because $D$'s role within the GAN model is only to provide feedback on $G$'s performance, i.e. on fake samples.

We'll take care of training the discriminator as a standalone model on both real and fake data. Within the GAN we don't want to update its weights: the joint model is trained on fake points labelled as real, so updating $D$ there would teach it to accept them.

### Defining the training

Let's now write the training code. Remember: the discriminator is trained on the same number of real and fake samples and _outside_ the joint model. Within the latter, the discriminator's role is merely to provide feedback to the generator.

In [None]:
# train the generator and discriminator
def train(generator, discriminator, gan, latent_dim, n_epochs=10000, n_batch=128):
    # determine half the size of one batch, for updating the discriminator
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(n_epochs):
        # real samples
        x_real, y_real = generate_real_samples(f, half_batch)
        # fake examples
        x_fake, y_fake = generate_fake_samples(generator, latent_dim, half_batch)
        # train the discriminator
        discriminator.train_on_batch(x_real, y_real)
        discriminator.train_on_batch(x_fake, y_fake)
        # sample the latent space: THIS is the input to the joint model!
        x_gan = sample_latent_points(latent_dim, n_batch)
        # QUESTION: Why are we switching the labels here??
        y_gan = np.ones((n_batch, 1))
        # train the generator using the discriminator's feedback
        gan.train_on_batch(x_gan, y_gan)

#### __Question :)__ Why are we switching labels when training the joint model?

We want $G$ to be rewarded when the discriminator believes its samples are _real_, even though they're not. The resulting loss will _not_ affect $D$'s weights (they're non-trainable!); it only updates $G$. Indeed, a high loss means $G$'s samples are still too unrealistic, as $D$ is marking them as 0.
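
The effect of the flipped labels can be checked numerically. The sketch below re-implements binary cross-entropy in NumPy (the `binary_crossentropy` helper and the discriminator outputs are made up for illustration) and compares the loss when $D$ is fooled with the loss when it is not, both against a target of 1.

```python
import numpy as np

def binary_crossentropy(y_true, y_pred, eps=1e-7):
    """Mean binary cross-entropy; predictions are clipped to avoid log(0)."""
    y_pred = np.clip(y_pred, eps, 1.0 - eps)
    return float(np.mean(-(y_true * np.log(y_pred)
                           + (1.0 - y_true) * np.log(1.0 - y_pred))))

target_real = np.ones(3)                    # the "switched" labels
d_fooled = np.array([0.9, 0.8, 0.95])       # D thinks the fakes are real
d_not_fooled = np.array([0.1, 0.2, 0.05])   # D spots the fakes

loss_fooled = binary_crossentropy(target_real, d_fooled)
loss_not_fooled = binary_crossentropy(target_real, d_not_fooled)
print(loss_fooled, loss_not_fooled)         # the second loss is much larger
```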

### Adding some evaluation

In principle we could just take the plunge and train our GAN, now...but how would we know how well it's working?

In [None]:
from sklearn.metrics import mean_squared_error as mse

In [None]:
def evaluate(discriminator, generator, latent_dim, ground_truth_f, error_function, eval_size=100):
    generated_xy, generated_labs = generate_fake_samples(generator, latent_dim, eval_size)
    real_xy, real_labs = generate_real_samples(ground_truth_f, eval_size)
    _, dar = discriminator.evaluate(real_xy, real_labs, verbose=0)
    _, daf = discriminator.evaluate(generated_xy, generated_labs, verbose=0)
    correct_y = ground_truth_f(generated_xy[:,0])
    return dar, daf, error_function(correct_y, generated_xy[:,1])
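
The third value returned by `evaluate` measures how far the generated points fall from the curve: for each generated $x$ it computes where the cubic would put $y$ and compares that with the generated $y$. A minimal NumPy sketch of the same computation, using a hand-written `curve_mse` helper (hypothetical, standing in for scikit-learn's `mean_squared_error`) on made-up points:

```python
import numpy as np

def curve_mse(xy, curve):
    """Mean squared vertical distance between (x, y) points and y = curve(x)."""
    x, y = xy[:, 0], xy[:, 1]
    return float(np.mean((curve(x) - y) ** 2))

def cubic(x):
    return x ** 3

on_curve = np.array([[-1.0, -1.0], [0.0, 0.0], [2.0, 8.0]])
off_curve = on_curve + np.array([0.0, 1.0])   # every point shifted up by 1

print(curve_mse(on_curve, cubic))
print(curve_mse(off_curve, cubic))
```

Note that this error only checks whether points lie on the curve. A generator that places every point at the origin would score a perfect 0 while covering none of the interval $[-5, 5]$.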

Let's incorporate an evaluation step within the training procedure.

In [None]:
# train the generator and discriminator
def train(generator, discriminator, gan, latent_dim, n_epochs=10000, n_batch=128, eval_interval=200):
    # determine half the size of one batch, for updating the discriminator
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(n_epochs):
        # real samples
        x_real, y_real = generate_real_samples(f, half_batch)
        # fake examples
        x_fake, y_fake = generate_fake_samples(generator, latent_dim, half_batch)
        # train the discriminator
        discriminator.train_on_batch(x_real, y_real)
        discriminator.train_on_batch(x_fake, y_fake)
        # sample the latent space: THIS is the input to the joint model!
        x_gan = sample_latent_points(latent_dim, n_batch)
        # QUESTION: Why are we switching the labels here??
        y_gan = np.ones((n_batch, 1))
        # train the generator using the discriminator's feedback
        gan.train_on_batch(x_gan, y_gan)
        if (i+1) % eval_interval == 0:
            dar, daf, ee = evaluate(discriminator, generator, latent_dim, f, mse)
            print("epoch %s: discr acc real: %s, discr acc fake: %s, eval error: %s" % (i, dar, daf, ee))

For the sake of completeness, let's join all the relevant code in a single cell. It'll be useful, trust me.

In [None]:
def f(x):
    return x**3

def generate_real_samples(f, n=64):
    x = (np.random.rand(n) - .5) * 10
    y = f(x)
    x = x.reshape(n, 1)
    y = y.reshape(n, 1)
    labels = np.ones((n, 1))
    xy = np.hstack((x,y))
    return (xy, labels)

def build_discriminator(n_inputs=2, nh_1=30, init="he_uniform", act="relu"):
    model = Sequential()
    model.add(Dense(nh_1, activation=act, kernel_initializer=init, input_dim=n_inputs))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

def build_generator(latent_dim, nh_1=30, init="he_uniform", act="relu", n_outputs=2):
    model = Sequential()
    model.add(Dense(nh_1, activation=act, kernel_initializer=init, input_dim=latent_dim))
    model.add(Dense(n_outputs, activation='linear'))
    return model

#sampling in the latent space
def sample_latent_points(latent_dim, n):
    # generate points in the latent space
    x_input = np.random.randn(latent_dim * n)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n, latent_dim)
    return x_input

def generate_fake_samples(generator, latent_dim, n):
    # sample points in latent space
    x_input = sample_latent_points(latent_dim, n)
    # pass through the generator
    X = generator.predict(x_input)
    labels = np.zeros((n, 1))
    return X, labels

def build_gan(generator, discriminator):
    discriminator.trainable = False
    # stack the models together
    model = Sequential()
    # add generator
    model.add(generator)
    # add the discriminator
    model.add(discriminator)
    # We _do_ compile this model!
    model.compile(loss='binary_crossentropy', optimizer='adam')
    return model

def evaluate(discriminator, generator, latent_dim, ground_truth_f, error_function, eval_size=100):
    generated_xy, generated_labs = generate_fake_samples(generator, latent_dim, eval_size)
    real_xy, real_labs = generate_real_samples(ground_truth_f, eval_size)
    _, dar = discriminator.evaluate(real_xy, real_labs, verbose=0)
    _, daf = discriminator.evaluate(generated_xy, generated_labs, verbose=0)
    correct_y = ground_truth_f(generated_xy[:,0])
    return dar, daf, error_function(correct_y, generated_xy[:,1]), generated_xy[:,0], generated_xy[:,1]

# train the generator and discriminator
def train(generator, discriminator, gan, latent_dim, n_epochs=10000, n_batch=128, eval_interval=200):
    gen_xy_for_evaluation = []
    # determine half the size of one batch, for updating the discriminator
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(n_epochs):
        # real samples
        x_real, y_real = generate_real_samples(f, half_batch)
        # fake examples
        x_fake, y_fake = generate_fake_samples(generator, latent_dim, half_batch)
        # train the discriminator
        discriminator.train_on_batch(x_real, y_real)
        discriminator.train_on_batch(x_fake, y_fake)
        # sample the latent space: THIS is the input to the joint model!
        x_gan = sample_latent_points(latent_dim, n_batch)
        # QUESTION: Why are we switching the labels here??
        y_gan = np.ones((n_batch, 1))
        # train the generator using the discriminator's feedback
        gan.train_on_batch(x_gan, y_gan)
        if (i+1) % eval_interval == 0:
            dar, daf, ee, gen_x, gen_y = evaluate(discriminator, generator, latent_dim, f, mse)
            gen_xy_for_evaluation.append((gen_x, gen_y))
            print("epoch %s: discr acc real: %s, discr acc fake: %s, eval error: %s" % (i, dar, daf, ee))
    return gen_xy_for_evaluation

In [None]:
latent_dim = 10
discriminator = build_discriminator(nh_1=30)
generator = build_generator(latent_dim, nh_1=50)
gan_model = build_gan(generator, discriminator)
gen_xy_data = train(generator, discriminator, gan_model, latent_dim, n_epochs=20000, eval_interval=5000)

In [None]:
x = np.linspace(-5, 5, 100)
y = f(x)
plt.plot(x, y)
plt.plot(*gen_xy_data[0], "ro")

In [None]:
x = np.linspace(-5, 5, 100)
y = f(x)
plt.plot(x, y)
plt.plot(*gen_xy_data[-2], "ro")

In [None]:
x = np.linspace(-5, 5, 100)
y = f(x)
plt.plot(x, y)
plt.plot(*gen_xy_data[-1], "ro")

What do we observe? Is the cubic nonlinearity being perfectly captured? What if we gave our networks some more depth?

### Adding some layers

Let's see if by adding some layers to $G$ and $D$ we're able to fit that cubic a bit better...

In [None]:
def f(x):
    return x**3

def generate_real_samples(f, n=64):
    x = (np.random.rand(n) - .5) * 10
    y = f(x)
    x = x.reshape(n, 1)
    y = y.reshape(n, 1)
    labels = np.ones((n, 1))
    xy = np.hstack((x,y))
    return (xy, labels)

def build_discriminator(n_inputs=2, nh=[30], init="he_uniform", act="relu"):
    model = Sequential()
    indim = n_inputs
    for nhid in nh:
        model.add(Dense(nhid, activation=act, kernel_initializer=init, input_dim=indim))
        indim=nhid
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

def build_generator(latent_dim, nh=[30], init="he_uniform", act="relu", n_outputs=2):
    model = Sequential()
    indim = latent_dim
    for nhid in nh:
        model.add(Dense(nhid, activation=act, kernel_initializer=init, input_dim=indim))
        indim=nhid
    model.add(Dense(n_outputs, activation='linear'))
    return model

#sampling in the latent space
def sample_latent_points(latent_dim, n):
    # generate points in the latent space
    x_input = np.random.randn(latent_dim * n)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n, latent_dim)
    return x_input

def generate_fake_samples(generator, latent_dim, n):
    # sample points in latent space
    x_input = sample_latent_points(latent_dim, n)
    # pass through the generator
    X = generator.predict(x_input)
    labels = np.zeros((n, 1))
    return X, labels

def build_gan(generator, discriminator):
    discriminator.trainable = False
    # stack the models together
    model = Sequential()
    # add generator
    model.add(generator)
    # add the discriminator
    model.add(discriminator)
    # We _do_ compile this model!
    model.compile(loss='binary_crossentropy', optimizer='adam')
    return model

def evaluate(discriminator, generator, latent_dim, ground_truth_f, error_function, eval_size=100):
    generated_xy, generated_labs = generate_fake_samples(generator, latent_dim, eval_size)
    real_xy, real_labs = generate_real_samples(ground_truth_f, eval_size)
    _, dar = discriminator.evaluate(real_xy, real_labs, verbose=0)
    _, daf = discriminator.evaluate(generated_xy, generated_labs, verbose=0)
    correct_y = ground_truth_f(generated_xy[:,0])
    return dar, daf, error_function(correct_y, generated_xy[:,1]), generated_xy[:,0], generated_xy[:,1]

# train the generator and discriminator
def train(generator, discriminator, gan, latent_dim, n_epochs=10000, n_batch=128, eval_interval=200):
    gen_xy_for_evaluation = []
    # determine half the size of one batch, for updating the discriminator
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(n_epochs):
        # real samples
        x_real, y_real = generate_real_samples(f, half_batch)
        # fake examples
        x_fake, y_fake = generate_fake_samples(generator, latent_dim, half_batch)
        # train the discriminator
        discriminator.train_on_batch(x_real, y_real)
        discriminator.train_on_batch(x_fake, y_fake)
        # sample the latent space: THIS is the input to the joint model!
        x_gan = sample_latent_points(latent_dim, n_batch)
        # QUESTION: Why are we switching the labels here??
        y_gan = np.ones((n_batch, 1))
        # train the generator using the discriminator's feedback
        gan.train_on_batch(x_gan, y_gan)
        if (i+1) % eval_interval == 0:
            dar, daf, ee, gen_x, gen_y = evaluate(discriminator, generator, latent_dim, f, mse)
            gen_xy_for_evaluation.append((gen_x, gen_y))
            print("epoch %s: discr acc real: %s, discr acc fake: %s, eval error: %s" % (i, dar, daf, ee))
    return gen_xy_for_evaluation

In [None]:
latent_dim = 5
discriminator = build_discriminator(nh=[10, 20, 30])
generator = build_generator(latent_dim, nh=[10, 20, 30])
gan_model = build_gan(generator, discriminator)
gen_xy_data = train(generator, discriminator, gan_model, latent_dim, n_epochs=20000, eval_interval=5000)

In [None]:
x = np.linspace(-5, 5, 100)
y = f(x)
plt.plot(x, y)
plt.plot(*gen_xy_data[3], "ro")

## Learning multimodal distributions

Let's consider an extension of the model we've just seen. What would happen if we tried to learn _several_ possible functions, rather than just a single cubic? 

For example, we may want to learn a cubic and two quadrics: a parabola and a circle.

In [None]:
#The circle is not a function, so we'll need to change our (python) functions accordingly...

def fcub(x):
    return x**3

def fpar(x):
    return x**2

def fcirc(x):
    #let's fix the radius at 1, it's easier
    r=1
    #Split the output to have half the points on the upper circle, half on the lower
    y_p = (r**2-x[:len(x)//2]**2)**.5
    y_n = -(r**2-x[(len(x)//2):]**2)**.5
    return  np.concatenate((y_p, y_n))
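
A quick check that `fcirc` does what we expect: every $(x, y)$ pair should satisfy $x^2 + y^2 = 1$, with the first half of the points on the upper arc and the second half on the lower one. The function is restated here (with the radius squared explicitly) so that the snippet runs on its own.

```python
import numpy as np

def fcirc(x):
    r = 1
    half = len(x) // 2
    y_p = (r**2 - x[:half]**2)**.5     # upper half-circle
    y_n = -(r**2 - x[half:]**2)**.5    # lower half-circle
    return np.concatenate((y_p, y_n))

x = np.random.rand(10) * 2 - 1         # 10 abscissas in [-1, 1)
y = fcirc(x)

print(np.allclose(x**2 + y**2, 1.0))   # all points lie on the unit circle
print(np.all(y[:5] >= 0), np.all(y[5:] <= 0))
```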

### Discriminator, again

Now let's adapt our discriminator function. In fact, it shouldn't look much different...

In [None]:
def build_discriminator(n_inputs=100, nh=[200], init="he_uniform", act="relu"): 
    model = Sequential()
    indim = n_inputs
    for nhid in nh:
        model.add(Dense(nhid, activation=act, kernel_initializer=init, input_dim=indim))
        indim=nhid
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

### Generator, too

Now, this is where the situation becomes slightly more complex. Since we're generating various functions, if $G$ produced a _single_ fake point at a time, like before, it'd be difficult to establish how realistic that point is. Think about it: it'd be like evaluating an image generator from a single pixel.

Instead, we'll ask $G$ to output an array of points.

In [None]:
def build_generator(latent_dim, nh=[200], init="he_uniform", act="relu", n_outputs=100):
    model = Sequential()
    indim = latent_dim
    for nhid in nh:
        model.add(Dense(nhid, activation=act, kernel_initializer=init, input_dim=indim))
        indim=nhid
    model.add(Dense(n_outputs, activation='linear'))
    return model

The generation of real samples needs a couple of changes, too. Given that we decided to restrict to radius 1 for the circle, we'll only generate points within [-1, 1] _for all functions_. 

Furthermore, we don't need to reshape the generated points as they're already in matrix form.

In [None]:
def generate_real_samples(f, n=64, points_per_sample=50):
    x = (np.random.rand(n, points_per_sample) - .5)*2
    y = np.apply_along_axis(f, 1, x) #axis concept is actually superfluous here...
    labels = np.ones((n, 1))
    xy = np.hstack((x,y))
    return (xy, labels)
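
Each row of `xy` is therefore one flattened curve: the first `points_per_sample` entries are the $x$ coordinates and the remaining ones are the matching $y$ coordinates, so the discriminator sees `2 * points_per_sample` inputs per sample. A small sketch of that layout with made-up numbers:

```python
import numpy as np

points_per_sample = 4
x = np.array([[-1.0, -0.5, 0.5, 1.0]])   # one sample made of 4 abscissas
y = x ** 2                               # the parabola mode
xy = np.hstack((x, y))                   # one row: [x_1..x_4, y_1..y_4]

# Splitting the row back into coordinates, as needed for plotting
x_back = xy[:, :points_per_sample]
y_back = xy[:, points_per_sample:]
print(xy.shape, x_back, y_back)
```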

### Evaluation for multimodal ground truth

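The only change needed here is to drop the comparison against `ground_truth_f` (the error term and the reference curve in the output plot), because the points sampled for evaluation may belong to any of the modes of the distribution...hopefully. The function is still used to draw the real samples on which the discriminator's accuracy is measured.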

In [None]:
def evaluate(discriminator, generator, ground_truth_f, latent_dim, points_per_sample, eval_size=100):
    generated_xy, generated_labs = generate_fake_samples(generator, latent_dim, eval_size)
    real_xy, real_labs = generate_real_samples(ground_truth_f, eval_size, points_per_sample)
    _, dar = discriminator.evaluate(real_xy, real_labs, verbose=0)
    _, daf = discriminator.evaluate(generated_xy, generated_labs, verbose=0)
    return dar, daf, generated_xy[:,:points_per_sample], generated_xy[:,points_per_sample:]

### Training for multimodal ground truth

In the training function, we need to make sure all modes are represented in the training samples. We'll add a utility function to help with this part.

In [None]:
def generate_for_modes(modes, batch, points_per_sample):
    split_batch = [batch//len(modes) for i in range(len(modes)-1)]
    split_batch.append(batch- (batch//len(modes)) * (len(modes)-1)) #what's left, may or may not be the %
    samples_split = [generate_real_samples(modes[i], sb, points_per_sample) for i,sb in enumerate(split_batch)]
    zipped_samples = list(zip(*samples_split))
    return np.concatenate(zipped_samples[0]), np.concatenate(zipped_samples[1])
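
The batch-splitting arithmetic at the top of `generate_for_modes` is easier to follow in isolation. The sketch below extracts it into a standalone helper (`split_batch`, a hypothetical name) with the same logic: every mode but the last gets `batch // n_modes` samples and the last one takes whatever is left.

```python
def split_batch(batch, n_modes):
    """Split `batch` samples across `n_modes`; the last mode takes the remainder."""
    base = batch // n_modes
    sizes = [base] * (n_modes - 1)
    sizes.append(batch - base * (n_modes - 1))
    return sizes

print(split_batch(64, 3))   # [21, 21, 22]
print(split_batch(6, 3))    # [2, 2, 2]
print(split_batch(2, 3))    # [0, 0, 2]
```

The last call shows a corner case worth keeping in mind: when the batch is smaller than the number of modes, only the last mode is represented.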

In [None]:
def train(generator, discriminator, gan, latent_dim, 
          n_epochs=5000, n_batch=128, points_per_sample=50, 
          eval_interval=200, eval_size=20, modes=[fpar, fcub, fcirc]):
    
    gen_xy_for_evaluation = []
    
    # determine half the size of one batch, for updating the discriminator
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(n_epochs):
        # real samples
        x_real, y_real = generate_for_modes(modes, half_batch, points_per_sample)
        # fake examples
        x_fake, y_fake = generate_fake_samples(generator, latent_dim, half_batch)
        # train the discriminator
        discriminator.train_on_batch(x_real, y_real)
        discriminator.train_on_batch(x_fake, y_fake)
        # sample the latent space
        x_gan = sample_latent_points(latent_dim, n_batch)
        y_gan = np.ones((n_batch, 1))
        # train the generator using the discriminator's feedback
        gan.train_on_batch(x_gan, y_gan)
        if (i+1) % eval_interval == 0:
            # NB: real evaluation samples are drawn from the global cubic `f` only
            dar, daf, gen_x, gen_y = evaluate(discriminator, generator, f, latent_dim, 
                                              points_per_sample=points_per_sample, eval_size=eval_size)
            gen_xy_for_evaluation.append((gen_x, gen_y))
            print("epoch %s: discr acc real: %s, discr acc fake: %s" % (i, dar, daf))
    return gen_xy_for_evaluation

### Putting it all together

Let's put it all together, and in a class, for easier management...

In [None]:
#The circle is not a function, so we'll need to change our (python) functions accordingly...

def fcub(x):
    return x**3

def fpar(x):
    return x**2

def fcirc(x):
    #let's fix the radius at 1, it's easier
    r=1
    #Split the output to have half the points on the upper circle, half on the lower
    y_p = (r**2-x[:len(x)//2]**2)**.5
    y_n = -(r**2-x[(len(x)//2):]**2)**.5
    return  np.concatenate((y_p, y_n))

def f5(x):
    return x**5+x**3-x**2

#I know, it's a pretty shitty class, but it's only to reduce shadowing
class MultimodalGanExperiment:
    
    def __init__(self, points_per_sample=50, latent_dim = 10, eval_size=20, batch_size=128):
        self.points_per_sample = points_per_sample
        self.latent_dim = latent_dim
        self.eval_size = eval_size
        self.n_batch = batch_size
        
    def build_discriminator(self, nh=[200], init="he_uniform", act="relu"): 
        n_inputs= self.points_per_sample * 2 # *2 b/c we're accounting for x and y!
        model = Sequential()
        indim = n_inputs
        for nhid in nh:
            model.add(Dense(nhid, activation=act, kernel_initializer=init, input_dim=indim))
            indim=nhid
        model.add(Dense(1, activation='sigmoid'))
        model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
        return model

    def build_generator(self, nh=[200], init="he_uniform", act="relu"):
        n_outputs = self.points_per_sample * 2
        model = Sequential()
        indim = self.latent_dim
        for nhid in nh:
            model.add(Dense(nhid, activation=act, kernel_initializer=init, input_dim=indim))
            indim=nhid
        model.add(Dense(n_outputs, activation='linear'))
        return model

    def generate_real_samples(self, f, n):
        x = (np.random.rand(n, self.points_per_sample) - .5)*2
        y = np.apply_along_axis(f, 1, x) #axis concept is actually superfluous here...
        labels = np.ones((n, 1))
        xy = np.hstack((x,y))
        return (xy, labels)

    #sampling in the latent space
    def sample_latent_points(self, n):
        # generate points in the latent space
        x_input = np.random.randn(self.latent_dim * n)
        # reshape into a batch of inputs for the network
        x_input = x_input.reshape(n, self.latent_dim)
        return x_input

    def generate_fake_samples(self, generator, n):
        # sample points in latent space
        x_input = self.sample_latent_points(n)
        # pass through the generator
        X = generator.predict(x_input)
        labels = np.zeros((n, 1))
        return X, labels

    def build_gan(self, generator, discriminator):
        discriminator.trainable = False
        # stack the models together
        model = Sequential()
        # add generator
        model.add(generator)
        # add the discriminator
        model.add(discriminator)
        # We _do_ compile this model!
        model.compile(loss='binary_crossentropy', optimizer='adam')
        return model

    def evaluate(self, discriminator, generator, modes):
        generated_xy, generated_labs = self.generate_fake_samples(generator, self.eval_size)
        real_xy, real_labs = self.generate_for_modes(modes, self.eval_size)
        _, dar = discriminator.evaluate(real_xy, real_labs, verbose=0)
        _, daf = discriminator.evaluate(generated_xy, generated_labs, verbose=0)
        return dar, daf, generated_xy[:,:self.points_per_sample], generated_xy[:,self.points_per_sample:]

    def generate_for_modes(self, modes, batch):
        split_batch = [batch//len(modes) for i in range(len(modes)-1)]
        split_batch.append(batch- (batch//len(modes)) * (len(modes)-1)) #what's left, may or may not be the %
        samples_split = [self.generate_real_samples(modes[i], sb) for i,sb in enumerate(split_batch)]
        zipped_samples = list(zip(*samples_split))
        return np.concatenate(zipped_samples[0]), np.concatenate(zipped_samples[1])
#         for i,sb in enumerate(split_batch):
#             yield self.generate_real_samples(modes[i], sb)

    def train(self, generator, discriminator, gan, modes,
              n_epochs=5000, n_batch=128, 
              eval_interval=200, eval_size=20):

        gen_xy_for_evaluation = []

        # determine half the size of one batch, for updating the discriminator
        half_batch = int(self.n_batch / 2)
        # manually enumerate epochs
        for i in range(n_epochs):
            # real samples
            x_real, y_real = self.generate_for_modes(modes, half_batch)
            # fake examples
            x_fake, y_fake = self.generate_fake_samples(generator, half_batch)
            # train the discriminator
            discriminator.train_on_batch(x_real, y_real)
            discriminator.train_on_batch(x_fake, y_fake)
            # sample the latent space
            x_gan = self.sample_latent_points(self.n_batch)
            y_gan = np.ones((self.n_batch, 1))
            # train the generator using the discriminator's feedback
            gan.train_on_batch(x_gan, y_gan)
            if (i+1) % eval_interval == 0:
                dar, daf, gen_x, gen_y = self.evaluate(discriminator, generator, modes)
                gen_xy_for_evaluation.append((gen_x, gen_y))
                print("epoch %s: discr acc real: %s, discr acc fake: %s" % (i, dar, daf))
        return gen_xy_for_evaluation
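
The batch splitting inside `generate_for_modes` is easy to get subtly wrong, so here is a minimal standalone sketch of just that step. The helper name `split_evenly` is hypothetical (it is not part of the class above); it mirrors the logic of giving each mode `batch // len(modes)` samples and handing the remainder to the last one.

```python
def split_evenly(batch, n_modes):
    """Hypothetical helper: sizes of the per-mode chunks for one batch."""
    base = batch // n_modes
    sizes = [base] * (n_modes - 1)
    # the last chunk absorbs the remainder, so the sizes sum to `batch`
    sizes.append(batch - base * (n_modes - 1))
    return sizes

print(split_evenly(64, 3))  # [21, 21, 22]
print(split_evenly(1, 3))   # [0, 0, 1]
```

Note the second case: when the batch is smaller than the number of modes, the leading modes receive no samples at all, so very small batch sizes silently drop modes from the real data.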

In [None]:
mge = MultimodalGanExperiment(points_per_sample=50, latent_dim = 10, eval_size=36, batch_size=3)
modes = [fcirc, fcub, fpar]
# fig, axs = plt.subplots(3, 6)
x, _= mge.generate_for_modes(modes, 6)
# for i,x in enumerate(xg):
#     for j in range(6):
#         axs[i,j].plot(x[0][j,:10], x[0][j,10:], "ro")
fig, axs=plt.subplots(3, 2) #10, 6 even if we've got more
fig.set_size_inches(6, 9)
axsf = axs.flatten()
for i,xc in enumerate(x):
        axsf[i].plot(xc[:50], xc[50:], "ro")

In [None]:
#ld=40, pps=20
mge = MultimodalGanExperiment(points_per_sample=200, latent_dim = 15, eval_size=50, batch_size=128)

#modes = [fcirc, fcub, fpar]
modes = [f5, fcub, fpar]

discriminator = mge.build_discriminator(nh=[80, 40, 20, 10]) #100 100 100
generator = mge.build_generator(nh=[20, 40, 60]) #70 70 70 70
gan_model = mge.build_gan(generator, discriminator)

gen_xy_data = mge.train(generator, discriminator, gan_model, modes, n_epochs=20000, eval_interval=2000) #50000

Let's have a look at generated samples...

In [None]:
from itertools import product
fig, axs=plt.subplots(len(gen_xy_data), 8) 
fig.set_size_inches(15, 18)
for i,j in product(range(len(gen_xy_data)), range(8)):
    axs[i,j].plot(gen_xy_data[i][0][j], gen_xy_data[i][1][j], "ro")

### Adding some steroids

Our GAN, as it stands, doesn't seem particularly effective at learning a multimodal distribution. Why? What makes this case so different from the unimodal, 1D function example?

It's hard to give a precise answer to this question, particularly with the rather weak understanding of GANs' dynamics that most of us currently have. One reasonable guess is that the functional relationships of $(x,y)$ pairs in the multimodal case are more diverse and complex, and as such, we may need to increase the network's complexity accordingly.

Indeed, we may think that learning the shape of a single 1D function is really a 1D problem, while learning the shapes of various 1D functions is closer to a 2D problem...

With that in mind, what could we add to our network to make it more sensitive to the relationship among 2D points?
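
One possible answer is to hand the network the $(x, y)$ pairing explicitly, rather than as one flat vector, so that a convolution can slide over neighbouring points. The sketch below only illustrates the data layout. It assumes each sample is stored as a `(2, points_per_sample, 1)` array (one row of $x$ values, one row of $y$ values, one channel), and the cubic used here is just a stand-in for the notebook's own mode functions.

```python
import numpy as np

n, points_per_sample = 4, 50
x = (np.random.rand(n, points_per_sample) - 0.5) * 2   # x in [-1, 1)
y = x ** 3                                             # stand-in mode function

# flat layout used so far: all x values followed by all y values
flat = np.hstack((x, y))                 # shape (n, 2 * points_per_sample)

# image-like layout: row 0 holds x, row 1 holds y, plus a channel axis
stacked = np.stack((x, y), axis=1)       # shape (n, 2, points_per_sample)
stacked = np.expand_dims(stacked, -1)    # shape (n, 2, points_per_sample, 1)

print(flat.shape, stacked.shape)
```

In the stacked layout the $j$-th $x$ and the $j$-th $y$ sit in the same column, which is what lets a 2D kernel see them together.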



In [None]:
from keras.layers import Conv2D, LeakyReLU, Dropout, Flatten, Reshape, InputLayer, Conv2DTranspose, Permute
from keras.backend import expand_dims

In [None]:
class MultimodalGanExperiment:
    
    def __init__(self, points_per_sample=50, latent_dim = 10, eval_size=20, batch_size=128):
        self.points_per_sample = points_per_sample
        self.latent_dim = latent_dim
        self.eval_size = eval_size
        self.n_batch = batch_size
        
    def build_discriminator(self, init="he_uniform", act="relu"): 
        nh=4
        nchan=16
        model = Sequential()
        in_points = InputLayer(input_shape=(2, self.points_per_sample, 1), 
                               name="discr_input")
        model.add(in_points)
        # YOUR CODE HERE :)
        model.add(Dense(1, activation='sigmoid'))
        model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
        return model

    def build_generator(self, init="he_uniform", act="relu"):
        nh=4
        nchan=16
        model = Sequential()
        # YOUR CODE HERE :)
        return model

    def generate_real_samples(self, f, n):
        # YOUR CODE HERE
        return (xy, labels)

    #sampling in the latent space
    def sample_latent_points(self, n):
        # generate points in the latent space
        x_input = np.random.randn(self.latent_dim * n)
        # reshape into a batch of inputs for the network
        x_input = x_input.reshape(n, self.latent_dim)
        return x_input

    def generate_fake_samples(self, generator, n):
        # sample points in latent space
        x_input = self.sample_latent_points(n)
        # pass through the generator
        X = generator.predict(x_input)
        labels = np.zeros((n, 1))
        return X, labels

    def build_gan(self, generator, discriminator):
        discriminator.trainable = False
        # stack the models together
        model = Sequential()
        # add generator
        model.add(generator)
        # add the discriminator
        model.add(discriminator)
        # We _do_ compile this model!
        model.compile(loss='binary_crossentropy', optimizer='adam')
        return model

    def evaluate(self, discriminator, generator, modes):
        # YOUR CODE HERE
        _, dar = discriminator.evaluate(real_xy, real_labs, verbose=0, steps=1)
        _, daf = discriminator.evaluate(generated_xy, generated_labs, verbose=0, steps=1)
        return dar, daf, generated_xy[:,0], generated_xy[:,1]

    def generate_for_modes(self, modes, batch):
        # YOUR CODE HERE
        return np.concatenate(zipped_samples[0], axis=0), np.concatenate(zipped_samples[1])

    def train(self, generator, discriminator, gan, modes,
              n_epochs=5000, n_batch=128, 
              eval_interval=200, eval_size=20):

        gen_xy_for_evaluation = []

        # determine half the size of one batch, for updating the discriminator
        half_batch = int(self.n_batch / 2)
        # manually enumerate epochs
        for i in range(n_epochs):
            # real samples
            # YOUR CODE HERE

            # fake examples
            x_fake, y_fake = self.generate_fake_samples(generator, half_batch)
            
            # train the discriminator
            discriminator.train_on_batch(x_real, y_real)
            discriminator.train_on_batch(x_fake, y_fake)
            # sample the latent space
            x_gan = self.sample_latent_points(self.n_batch)
            y_gan = np.ones((self.n_batch, 1))
            # train the generator using the discriminator's feedback
            gan.train_on_batch(x_gan, y_gan)
            if (i+1) % eval_interval == 0:
                dar, daf, gen_x, gen_y = self.evaluate(discriminator, generator, modes)
                gen_xy_for_evaluation.append((gen_x, gen_y))
                print("epoch %s: discr acc real: %s, discr acc fake: %s" % (i, dar, daf))
        return gen_xy_for_evaluation

In [None]:
class MultimodalGanExperiment:
    
    def __init__(self, points_per_sample=50, latent_dim = 10, eval_size=20, batch_size=128):
        self.points_per_sample = points_per_sample
        self.latent_dim = latent_dim
        self.eval_size = eval_size
        self.n_batch = batch_size
        
    def build_discriminator(self, init="he_uniform", act="relu"): 
        nh=4
        nchan=16
        model = Sequential()
        in_points = InputLayer(input_shape=(2, self.points_per_sample, 1), name="discr_input")
        model.add(in_points)
        for lyr in range(nh):
            model.add(Conv2D(nchan, (20, 2), strides=2, padding="same"))
            model.add(Dropout(.4))
            model.add(LeakyReLU(alpha=0.2))
        model.add(Flatten())
        model.add(Dropout(.4))
        model.add(Dense(1, activation='sigmoid'))
        model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
        return model

    def build_generator(self, init="he_uniform", act="relu"):
        nh=4
        nchan=16
        model = Sequential()
        #model.add(Input(shape=(self.latent_dim,)))
        starting_conv_dim = max(self.points_per_sample//nh, 1)
        n_nodes = nchan * starting_conv_dim * 2
        model.add(Dense(n_nodes, input_shape=(self.latent_dim,)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Reshape((starting_conv_dim, 2, nchan)))
        for lyr in range(2): # hardcoded: two stride-2 upsamplings give a x4 length, matching nh=4
            model.add(Conv2DTranspose(nchan, (20,2), strides=(2,1), padding='same'))
            model.add(Dropout(.4))
            model.add(LeakyReLU(alpha=0.2))
        model.add(Conv2D(1, (20,2), activation='tanh', padding='same'))
        model.add(Permute((2, 1, 3)))
        # model.add(Flatten())
#        model.add(Reshape((2, self.points_per_sample)))
        # print(model.output)
        return model

    def generate_real_samples(self, f, n):
        x = ((np.random.rand(n, self.points_per_sample) - .5)*2)
        y = np.apply_along_axis(f, 1, x) #axis concept is actually superfluous here...
        labels = np.ones((n, 1))
        xy = np.stack((x.reshape(n, self.points_per_sample),
                       y.reshape(n, self.points_per_sample)), 1)
        return (xy, labels)

    #sampling in the latent space
    def sample_latent_points(self, n):
        # generate points in the latent space
        x_input = np.random.randn(self.latent_dim * n)
        # reshape into a batch of inputs for the network
        x_input = x_input.reshape(n, self.latent_dim)
        return x_input

    def generate_fake_samples(self, generator, n):
        # sample points in latent space
        x_input = self.sample_latent_points(n)
        # pass through the generator
        X = generator.predict(x_input)
        labels = np.zeros((n, 1))
        return X, labels

    def build_gan(self, generator, discriminator):
        discriminator.trainable = False
        # stack the models together
        model = Sequential()
        # add generator
        model.add(generator)
        # add the discriminator
        model.add(discriminator)
        # We _do_ compile this model!
        model.compile(loss='binary_crossentropy', optimizer='adam')
        return model

    def evaluate(self, discriminator, generator, modes):
        generated_xy, generated_labs = self.generate_fake_samples(generator, self.eval_size)
        real_xy, real_labs = self.generate_for_modes(modes, self.eval_size)
        real_xy = expand_dims(real_xy)
        _, dar = discriminator.evaluate(real_xy, real_labs, verbose=0, steps=1)
        _, daf = discriminator.evaluate(generated_xy, generated_labs, verbose=0, steps=1)
        return dar, daf, generated_xy[:,0], generated_xy[:,1]

    def generate_for_modes(self, modes, batch):
        split_batch = [batch//len(modes) for i in range(len(modes)-1)]
        # the last mode takes whatever is left, so the chunk sizes always add up to `batch`
        split_batch.append(batch - (batch//len(modes)) * (len(modes)-1))
        samples_split = [self.generate_real_samples(modes[i], sb) for i,sb in 
                         enumerate(split_batch)]
        zipped_samples = list(zip(*samples_split))
        # print(zipped_samples[0][0].shape)
        # print(zipped_samples[0][1].shape)
        return np.concatenate(zipped_samples[0], axis=0), np.concatenate(zipped_samples[1])
#         for i,sb in enumerate(split_batch):
#             yield self.generate_real_samples(modes[i], sb)

    def train(self, generator, discriminator, gan, modes,
              n_epochs=5000, n_batch=128, 
              eval_interval=200, eval_size=20):

        gen_xy_for_evaluation = []

        # determine half the size of one batch, for updating the discriminator
        half_batch = int(self.n_batch / 2)
        # manually enumerate epochs
        for i in range(n_epochs):
            # real samples
            x_real, y_real = self.generate_for_modes(modes, half_batch)
            x_real = expand_dims(x_real)
            # print("xreal: " + str(x_real.shape))
            # fake examples
            x_fake, y_fake = self.generate_fake_samples(generator, half_batch)
            # print("xfake: " + str(x_fake.shape))
            # print(y_fake.shape)
            # train the discriminator
            discriminator.train_on_batch(x_real, y_real)
            discriminator.train_on_batch(x_fake, y_fake)
            # sample the latent space
            x_gan = self.sample_latent_points(self.n_batch)
            y_gan = np.ones((self.n_batch, 1))
            # train the generator using the discriminator's feedback
            gan.train_on_batch(x_gan, y_gan)
            if (i+1) % eval_interval == 0:
                dar, daf, gen_x, gen_y = self.evaluate(discriminator, generator, modes)
                gen_xy_for_evaluation.append((gen_x, gen_y))
                print("epoch %s: discr acc real: %s, discr acc fake: %s" % (i, dar, daf))
        return gen_xy_for_evaluation

In [None]:
#ld=40, pps=20
mge = MultimodalGanExperiment(points_per_sample=200, latent_dim = 30, 
                              eval_size=50, batch_size=128)

#modes = [fcirc, fcub, fpar]
modes = [f5, fcub, fpar]

discriminator = mge.build_discriminator() #100 100 100
generator = mge.build_generator() #70 70 70 70
gan_model = mge.build_gan(generator, discriminator)

gen_xy_data = mge.train(generator, discriminator, gan_model, modes, n_epochs=500, eval_interval=50) #50000

In [None]:
fig, axs=plt.subplots(len(gen_xy_data), 8) 
fig.set_size_inches(15, 18)
for i,j in product(range(len(gen_xy_data)), range(8)):
    axs[i,j].plot(gen_xy_data[i][0][-j,:,0], gen_xy_data[i][1][-j,:,0], "ro")

It's clear that the network, without further optimisation or heuristics, needs more training time to converge to something plausible and, quite possibly, more architectural complexity to capture the 3 modes of our data. Even so, we can start seeing the first signs of _mode collapse_: most generated graphs show an upward diagonal trend through (0,0), very reminiscent of our cubic, but much less so of a parabola or a circle.

#### A note on mode collapse

Mode collapse is one of the fundamental problems affecting GANs. As the name suggests, it's the behaviour associated with the generator only focussing on one, or some, of the possible modes.

It is an issue that needs to be explicitly accounted for and corrected, since a simple loss function doesn't normally penalise it: if $G$ learnt to generate perfect parabolas and never even attempted to generate circles, the discriminator loss on those samples would still be low.

This said, _mode collapse is not necessarily an evil_. It may be acceptable in scenarios where even generating a small subset of all the possible data modes is a viable solution.

In [2], the authors suggest using _minibatch features_ to correct mode collapse. Intuitively, this allows the discriminator to compare a generated sample with a minibatch of fake data and one of real data, in order to determine whether the sample is _too_ similar to other generated points.
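
To make the idea more concrete, here is a simplified NumPy sketch of a minibatch feature computation in the spirit of [2], as we understand it. Each sample's feature vector is projected through a tensor `T`, and the output for a sample sums its closeness (a negative exponential of an L1 distance) to every sample in the batch. The function name, the dimensions `A`, `B`, `C` and the random `T` are illustrative assumptions; in a real network `T` would be learned.

```python
import numpy as np

def minibatch_features(features, T):
    """Illustrative sketch: features is (n, A), T is (A, B, C); returns (n, B)."""
    M = np.tensordot(features, T, axes=([1], [0]))            # (n, B, C)
    # pairwise L1 distances between samples, one per row b: (n, n, B)
    l1 = np.abs(M[:, None, :, :] - M[None, :, :, :]).sum(axis=-1)
    closeness = np.exp(-l1)                                   # 1 when identical
    return closeness.sum(axis=1)                              # includes the self term

rng = np.random.default_rng(0)
n, A, B, C = 8, 16, 5, 3
T = rng.normal(size=(A, B, C))

diverse = rng.normal(size=(n, A))          # a batch of distinct samples
collapsed = np.tile(diverse[:1], (n, 1))   # a batch made of one repeated sample

out_diverse = minibatch_features(diverse, T)
out_collapsed = minibatch_features(collapsed, T)
print(out_diverse.mean(), out_collapsed.mean())
```

For the collapsed batch every pairwise distance is zero, so each output equals the batch size. This is the kind of signal a discriminator can use to flag a generator that keeps producing the same thing.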

## Semi-supervised Learning

One of the most exciting, _real_ applications of GANs is pushing the envelope of semi-supervised learning. SSL is quite possibly the most common and realistic scenario faced by ML practitioners, where only a small fraction of the available data has been labeled: the challenge is to make the most of it. 

The idea behind GANs for SSL is simple: learn the distribution of the labelled data in the hope of being able to generate realistic enough samples to make up for the shortage of labelled points.

Implementing it is, of course, less straightforward and rife with subtleties, such as:

* What happens if there are classes for which I have no labelled samples?
* How do I distinguish between generated and unlabelled?
* What happens with mode collapse?
* To what extent does the quality of the generated samples impact the classifier I want to train?

Let's get to work and we'll address some of these points (among other things).

__How__:
* We'll work on MNIST data
* We'll discuss how to turn a discriminator into a classifier

### The structure of our SGAN


Let's start by giving the general skeleton of what our SGAN class will look like. It's quite similar to what we've coded so far, so we can save ourselves some work.

In [None]:
class SGAN:
    
    def __init__(self, latent_dim):
        pass

    def build_discriminator(self, in_shape=(28,28,1)):
        pass

    # define the standalone generator model
    def build_generator(self):
        pass

    # define the combined generator and discriminator model, for updating the generator
    def build_gan(self):
        self.discriminator.trainable = False
        # connect image output from generator as input to discriminator
        gan_output = self.discriminator(self.generator.output)
        # define gan model as taking noise and outputting a classification
        model = Model(self.generator.input, gan_output)
        # compile model
        opt = Adam(lr=0.0002, beta_1=0.5)
        model.compile(loss='binary_crossentropy', optimizer=opt)
        return model

    @staticmethod
    def prepare_real_data():
        # load dataset
        (X_train, y_train), (_, _) = mnist.load_data()
        X = np.expand_dims(X_train, axis=-1).astype('float32')
        # rescale from [0, 255] to [-1, 1]
        X = (X - 127.5) / 127.5
        return [X, y_train]

    # select a balanced, supervised subdataset
    def prepare_supervised_samples(self, n_samples=100):
        X, y = self.dataset
        X_list, y_list = list(), list()
        n_per_class = int(n_samples / self.n_classes)
        for i in range(self.n_classes):
            # choose random instances of images of a given class
            Xi_all = X[y == i]
            xi = np.random.randint(0, len(Xi_all), n_per_class)
            # add to list
            for j in xi:
                X_list.append(Xi_all[j])
                y_list.append(i)
        return np.asarray(X_list), np.asarray(y_list)

    #generate real points for the discriminator from a sample of labelled data
    @staticmethod
    def generate_real_samples(labelled_data, n_samples):
        X, y = labelled_data
        xi = np.random.randint(0, X.shape[0], n_samples)
        X_samp, y_samp = X[xi], y[xi]
        # generate class labels for discriminator
        ones = np.ones((n_samples, 1))
        return [X_samp, y_samp], ones

    # sample the latent space for the generator
    def sample_latent_points(self, n_samples):
        lat_pnts = np.random.randn(self.latent_dim * n_samples)
        lat_pnts = lat_pnts.reshape(n_samples, self.latent_dim)
        return lat_pnts

    # generate fake samples for the discriminator
    def generate_fake_samples(self, n_samples):
        latent_points = self.sample_latent_points(n_samples)
        generated_images = self.generator.predict(latent_points)
        y = np.zeros((n_samples, 1))
        return generated_images, y

    # generate samples, save as a plot and save the model
    def summarize_performance(self, step, n_samples=100):
        X, _ = self.generate_fake_samples(n_samples)
        # rescale for simplicity
        X = (X + 1) / 2.0
        # plot
        for i in range(100):
            pyplot.subplot(10, 10, 1 + i)
            pyplot.axis('off')
            pyplot.imshow(X[i, :, :, 0], cmap='gray_r')
        fl = 'generated_images_%04d.png' % (step+1)
        pyplot.savefig(fl)
        pyplot.close()
        # evaluate the classifier
        # WARNING!! DON'T TRY THIS AT HOME!! (why? :)
        X, y = self.dataset
        _, acc = self.classifier.evaluate(X, y, verbose=0)
        print('===> Classifier Accuracy: %.3f%%' % (acc * 100))
        # save the generator
        gen_file = 'generator_%04d.h5' % (step+1)
        self.generator.save(gen_file)
        # save the classifier
        class_file = 'classifier_%04d.h5' % (step+1)
        self.classifier.save(class_file)

    # train the model!
    def train(self, n_epochs=20, batch_size=100):
        #Prepare the training data...
        #Your code here
        
        # Now train
        for i in range(training_steps):
            #Your code here
            
            # summarize losses 
            if (i+1) % 500 == 0:
                print('-> step=%d \n'\
                    '   classifier -> loss = %.3f, accuracy = %.0f \n'\
                    '   discriminator -> loss on real = %.3f, loss on fake = %.3f \n'\
                    '   generator -> loss = %.3f' % 
                    (i+1, c_loss, c_acc*100, d_loss1, d_loss2, g_loss))
            # evaluate model performance...and save!
            if (i+1) % bat_per_epo == 0:
                self.summarize_performance(i)
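
A side note on `prepare_supervised_samples`: it draws indices with `np.random.randint`, that is, with replacement, so the same image may appear more than once in the labelled subset. The following is a sketch of a without-replacement variant, run on synthetic labels rather than MNIST. `balanced_indices` is a hypothetical helper, not part of the class above.

```python
import numpy as np

def balanced_indices(y, n_samples, n_classes, rng):
    """Hypothetical helper: n_samples // n_classes distinct indices per class."""
    n_per_class = n_samples // n_classes
    picked = []
    for c in range(n_classes):
        candidates = np.flatnonzero(y == c)
        # replace=False guarantees that no index is drawn twice
        picked.append(rng.choice(candidates, size=n_per_class, replace=False))
    return np.concatenate(picked)

rng = np.random.default_rng(0)
y = rng.integers(0, 10, size=5000)        # synthetic stand-in for MNIST labels
idx = balanced_indices(y, n_samples=100, n_classes=10, rng=rng)
print(np.bincount(y[idx], minlength=10))  # 10 samples for every class
```

With only 100 labelled points, whether duplicates matter is a judgment call; the sketch just makes the choice explicit.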

### The discriminator of a SGAN

Unsurprisingly, this is the most delicate aspect concerning the architecture of the network. Remember: our aim is to train a classifier with very little labelled data. The natural place for a classifier within a GAN is, of course, the discriminator. But how do we fit it in?

#### First things first: feature extraction

Let's turn those $28\times28$ images into something we can work with. Any ideas? :)

In [None]:
import keras

In [None]:
from keras.layers import LeakyReLU

In [None]:
LeakyReLU()

In [None]:
def build_discriminator(self, in_shape=(28,28,1)):
    # image input
    in_image = Input(shape=in_shape)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(in_image)
    fe = LeakyReLU(alpha=0.2)(fe)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # flatten feature maps
    fe = Flatten()(fe)
    # dropout
    fe = Dropout(0.4)(fe)

#### Making it an unsupervised discriminator

Unsurprisingly: pass the vector through a dense layer with sigmoid activation, optimise against binary crossentropy.
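
As a reminder of what that output and loss compute, here is a small NumPy sketch with made-up scores standing in for the pre-activation output of the dense layer. The function names are ours and only mimic what Keras does internally.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def binary_crossentropy(y_true, p, eps=1e-7):
    p = np.clip(p, eps, 1 - eps)          # avoid log(0)
    return -np.mean(y_true * np.log(p) + (1 - y_true) * np.log(1 - p))

scores = np.array([2.0, -1.0, 0.5, -3.0])   # made-up dense layer outputs
labels = np.array([1.0, 0.0, 1.0, 0.0])     # 1 = real, 0 = generated

p_real = sigmoid(scores)                    # probability of "real" per sample
loss_right = binary_crossentropy(labels, p_real)
loss_wrong = binary_crossentropy(1 - labels, p_real)
print(p_real, loss_right, loss_wrong)
```

The loss is low when the predicted probabilities agree with the real/fake labels and grows when they disagree.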

In [None]:
d_out_layer = Dense(1, activation='sigmoid')(fe)
# define and compile unsupervised discriminator model
d_model = Model(in_image, d_out_layer)
d_model.compile(loss='binary_crossentropy', optimizer=Adam(lr=0.0002, beta_1=0.5))

#### Using the same features to classify

Strategy: collapse the layer into 10 classes, pick one (which activation do you need?), optimise against categorical crossentropy.
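
A minimal NumPy sketch of that strategy, using 3 classes instead of 10 for brevity and made-up scores. The "sparse" variant of the loss takes integer class labels directly, which is the form MNIST labels come in. The function names are ours.

```python
import numpy as np

def softmax(logits):
    shifted = logits - logits.max(axis=-1, keepdims=True)   # numerical stability
    e = np.exp(shifted)
    return e / e.sum(axis=-1, keepdims=True)

def sparse_categorical_crossentropy(y_true, probs, eps=1e-7):
    # pick, for each sample, the probability assigned to its true class
    picked = probs[np.arange(len(y_true)), y_true]
    return -np.mean(np.log(np.clip(picked, eps, 1.0)))

logits = np.array([[4.0, 0.0, 0.0],
                   [0.0, 0.0, 3.0]])        # made-up dense layer outputs
labels = np.array([0, 2])                   # integer class labels

probs = softmax(logits)
loss_right = sparse_categorical_crossentropy(labels, probs)
loss_wrong = sparse_categorical_crossentropy(np.array([1, 0]), probs)
print(probs, loss_right, loss_wrong)
```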

In [None]:
# supervised output
c_out_layer = Dense(n_classes, activation="softmax")(fe)
# define and compile supervised discriminator model
c_model = Model(in_image, c_out_layer)
c_model.compile(loss='sparse_categorical_crossentropy', 
                optimizer=Adam(lr=0.0002, beta_1=0.5), 
                metrics=['accuracy']) #we've got evaluation metrics now!

#### What we have now

With this approach we have two models, an unsupervised discriminator and a classifier, that share the feature extraction weights and have two separate outputs. Let's have a look:

In [None]:
import keras.datasets.mnist #import load_data
from keras.optimizers import Adam
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Dropout
from keras.layers import Lambda
from keras.layers import Activation
from keras.utils.vis_utils import plot_model

In [None]:
def build_discriminator(n_classes, in_shape=(28,28,1)):
    # image input
    in_image = Input(shape=in_shape)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(in_image)
    fe = LeakyReLU(alpha=0.2)(fe)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # flatten feature maps
    fe = Flatten()(fe)
    # dropout
    fe = Dropout(0.4)(fe)
    d_out_layer = Dense(1, activation='sigmoid')(fe)
    # define and compile unsupervised discriminator model
    d_model = Model(in_image, d_out_layer)
    d_model.compile(loss='binary_crossentropy', optimizer=Adam(lr=0.0002, beta_1=0.5))
    # supervised output
    c_out_layer = Dense(n_classes, activation='softmax')(fe)
    # define and compile supervised discriminator model
    c_model = Model(in_image, c_out_layer)
    c_model.compile(loss='sparse_categorical_crossentropy', 
                    optimizer=Adam(lr=0.0002, beta_1=0.5), 
                    metrics=['accuracy']) #we've got evaluation metrics now!
    return d_model, c_model

In [None]:
d_model, c_model = build_discriminator(10)

Here is what the unsupervised discriminator looks like:

In [None]:
plot_model(d_model, show_shapes=True, show_layer_names=True)

...and the classifier:

In [None]:
plot_model(c_model, show_shapes=True, show_layer_names=True)

#### The twist: feeding the classifier into the discriminator

The classifier, in its last layer and before the softmax activation, contains the information about which of the $n$ output classes those features are more likely associated with. In [2] Salimans et al. suggest that if the classifier were passed a generated but unrealistic input, then the outputs of its last layer should be roughly flat, with fairly small values. That is: it wouldn't know how to classify the sample because it doesn't look like any real data point.

__Should we separate the activation out as its own layer, in order to avoid distortion by the final layer's weights?__

If that's the case, one might pass the last classification layer as input to the discriminator, which would then take advantage of the additional discriminative power of the classifier. In order to make this approach work they suggest the following tweak to the discriminator's activation function:

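$$D(\mathbf x) = \frac{L(\mathbf x)}{L(\mathbf x)+1},\ \text{with } L(\mathbf x) = \sum_{i=1}^{n} e^{l_i(\mathbf x)}$$

where $l_i(\mathbf x)$ is the $i$-th output of the classifier's last dense layer, before the softmax.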
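
A quick numerical check of this activation helps build intuition. The NumPy sketch below applies it to made-up pre-softmax outputs (logits) for 10 classes; the function names are ours. It also shows that the same quantity can be written as a sigmoid of the log-sum-exp of the logits, which is the numerically safer form.

```python
import numpy as np

def d_from_logits(logits):
    sum_exp = np.exp(logits).sum(axis=-1, keepdims=True)
    return sum_exp / (sum_exp + 1.0)

def d_from_logits_stable(logits):
    # same quantity, written as sigmoid(logsumexp(logits))
    m = logits.max(axis=-1, keepdims=True)
    lse = m + np.log(np.exp(logits - m).sum(axis=-1, keepdims=True))
    return 1.0 / (1.0 + np.exp(-lse))

flat_small = np.full(10, -5.0)                 # classifier has no idea
confident = np.array([8.0] + [-5.0] * 9)       # classifier is sure of class 0
flat_zero = np.zeros(10)                       # flat, but not small

batch = np.stack([flat_small, confident, flat_zero])
d = d_from_logits(batch)
print(d.ravel())
```

The first row gives a value close to 0 and the second a value close to 1, as intended. The third row is a useful caveat: ten logits at exactly 0 already give $D = 10/11$, so "small" here has to mean strongly negative, not merely close to zero.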

In [None]:
import keras.backend as Kb
def new_activation(output):
    # sum of the exponentials of the classifier's last dense layer outputs
    sum_exp = Kb.sum(Kb.exp(output), axis=-1, keepdims=True)
    result = sum_exp / (sum_exp + 1.0)
    return result


In [None]:
#Using the classifier for discrimination
def build_discriminator(n_classes, in_shape=(28,28,1)):
    # image input
    in_image = Input(shape=in_shape)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(in_image)
    fe = LeakyReLU(alpha=0.2)(fe)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # flatten feature maps
    fe = Flatten()(fe)
    # dropout
    fe = Dropout(0.4)(fe)
    # separate the last dense layer
    fe = Dense(n_classes)(fe)
    # supervised output
    c_out_layer = Activation('softmax')(fe)
    # define and compile supervised discriminator model
    c_model = Model(in_image, c_out_layer)
    c_model.compile(loss='sparse_categorical_crossentropy', 
                    optimizer=Adam(lr=0.0002, beta_1=0.5), 
                    metrics=['accuracy'])
    # unsupervised output: the input is the classifier's last dense layer!
    d_out_layer = Lambda(new_activation)(fe)
    # define and compile unsupervised discriminator model
    d_model = Model(in_image, d_out_layer)
    d_model.compile(loss='binary_crossentropy', 
                    optimizer=Adam(lr=0.0002, beta_1=0.5))
    return d_model, c_model


#### Diversion: is this all we've got?

Are there other ways to train a classifier and a discriminator together? Which? Who dares to try one??

_(Alternatives: single model with [discriminator, classifier] output; two models with shared weights, as before.)_


### Building the generator

Remember: we downsampled the image for the discriminator. Let's do the opposite for the generator, upsampling. We can be perfectly symmetrical, start from an intermediate dimension (7? 14?), from an arbitrary one, use a different stride and window or do something different altogether.

Does anyone want to try some alternatives?
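
Before picking a starting dimension, it helps to do the shape arithmetic. The sketch below assumes `padding='same'` throughout, in which case a strided convolution yields `ceil(size / stride)` and a strided transposed convolution yields `size * stride`. The two helper functions are hypothetical and only reproduce that arithmetic.

```python
import math

def conv_same_out(size, stride):
    """Spatial size after a strided convolution with padding='same'."""
    return math.ceil(size / stride)

def conv_transpose_same_out(size, stride):
    """Spatial size after a strided transposed convolution with padding='same'."""
    return size * stride

# discriminator: three stride-2 convolutions starting from 28x28
size, down = 28, [28]
for _ in range(3):
    size = conv_same_out(size, 2)
    down.append(size)
print(down)  # [28, 14, 7, 4]

# generator: two stride-2 transposed convolutions starting from 7x7
size, up = 7, [7]
for _ in range(2):
    size = conv_transpose_same_out(size, 2)
    up.append(size)
print(up)    # [7, 14, 28]
```

So the discriminator described earlier ends on a 4x4 map, while a generator that starts at 7x7 needs exactly two doublings to reach 28x28. Starting from 4x4 would not be "perfectly symmetrical", since doubling 4 three times gives 32, not 28.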

In [None]:
def build_generator(self):
    # image generator input
    in_lat = Input(shape=(self.latent_dim,))
    # foundation for 7x7 image
    n_nodes = 128 * 7 * 7
    gen = Dense(n_nodes)(in_lat)
    gen = LeakyReLU(alpha=0.2)(gen)
    gen = Reshape((7, 7, 128))(gen)
    # upsample to 14x14
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
    # upsample to 28x28
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
    # output
    out_layer = Conv2D(1, (7,7), activation='tanh', padding='same')(gen)
    # define model
    model = Model(in_lat, out_layer)
    return model

### Writing the training code

We're not done yet! We need to prepare the training data and write the training code :P

In [None]:
#a few guidelines...
def train(self, n_epochs=20, batch_size=100):
    # prepare the supervised data
    X_sup, y_sup = 
    # compute the number of training batches per epoch and the training iterations
    
    # also! don't forget that every batch must be split in half: real and fake
    half_batch = batch_size // 2
    # this will help us keep the same nomenclature :P
    print('n_epochs=%d, batch_size=%d, 1/2 batch=%d, batch per epoch=%d, total steps=%d' % 
        (n_epochs, batch_size, half_batch, batches_per_epoch, training_steps))

    # Now train
    for i in range(training_steps):
        # update the supervised discriminator a.k.a. classifier
        [X_sup_sample, y_sup_sample], _ = 
        # update the standard, unsupervised discriminator
        # compute the loss on real and fake data separately
        [X_real, _], y_real = 
        d_loss1 = 
        X_fake, y_fake = 
        d_loss2 = 
        # update the generator
        X_lat, y_lat = 
        g_loss = 
        # summarize losses 
        if (i+1) % 500 == 0:
            print('-> step=%d \n'\
                '   classifier -> loss = %.3f, accuracy = %.0f \n'\
                '   discriminator -> loss on real = %.3f, loss on fake = %.3f \n'\
                '   generator -> loss = %.3f' % 
                (i+1, c_loss, c_acc*100, d_loss1, d_loss2, g_loss))
        # evaluate model performance...and save!
        if (i+1) % batches_per_epoch == 0:
            self.summarize_performance(i)

In [None]:
def train(self, n_epochs=20, batch_size=100):
    # supervised data
    X_sup, y_sup = self.prepare_supervised_samples()
    # each epoch we'll train a certain number of batches...
    batches_per_epoch = int(self.dataset[0].shape[0] / batch_size)
    # calculate the number of training iterations
    training_steps = batches_per_epoch * n_epochs
    # this'll come in handy...
    half_batch = batch_size // 2
    print('n_epochs=%d, batch_size=%d, 1/2 batch=%d, batch per epoch=%d, total steps=%d' % 
        (n_epochs, batch_size, half_batch, batches_per_epoch, training_steps))

    # Now train
    for i in range(training_steps):
        # update the supervised discriminator a.k.a. classifier
        [X_sup_sample, y_sup_sample], _ = self.generate_real_samples([X_sup, y_sup], half_batch)
        c_loss, c_acc = self.classifier.train_on_batch(X_sup_sample, y_sup_sample)
        # update the standard, unsupervised discriminator
        # compute the loss on real and fake data separately
        [X_real, _], y_real = self.generate_real_samples(self.dataset, half_batch)
        d_loss1 = self.discriminator.train_on_batch(X_real, y_real)
        X_fake, y_fake = self.generate_fake_samples(half_batch)
        d_loss2 = self.discriminator.train_on_batch(X_fake, y_fake)
        # update the generator
        X_lat, y_lat = self.sample_latent_points(batch_size), np.ones((batch_size, 1))
        g_loss = self.gan.train_on_batch(X_lat, y_lat)
        # summarize losses 
        if (i+1) % 500 == 0:
            print('-> step=%d \n'\
                '   classifier -> loss = %.3f, accuracy = %.0f \n'\
                '   discriminator -> loss on real = %.3f, loss on fake = %.3f \n'\
                '   generator -> loss = %.3f' % 
                (i+1, c_loss, c_acc*100, d_loss1, d_loss2, g_loss))
        # evaluate model performance...and save!
        if (i+1) % batches_per_epoch == 0:
            self.summarize_performance(i)

### Putting it all together

In [None]:
# SGAN on MNIST data
import numpy as np
import keras.datasets.mnist as mnist
from keras.optimizers import Adam
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Dropout
from keras.layers import Lambda
from keras.layers import Activation
from matplotlib import pyplot
from keras import backend as Kb

class SGAN:
    
    def __init__(self, latent_dim):
        self.dataset = self.prepare_real_data()
        self.n_classes = 10
        self.latent_dim = latent_dim
        self.generator = self.build_generator()
        self.discriminator, self.classifier = self.build_discriminator()
        self.gan = self.build_gan()

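    # custom activation function for the unsupervised (real/fake) output:
    # D(x) = Z(x) / (Z(x) + 1), with Z(x) = sum_k exp(l_k(x)) over the class logits
    @staticmethod
    def custom_activation(output):
        sumexp = Kb.sum(Kb.exp(output), axis=-1, keepdims=True)
        result = sumexp / (sumexp + 1.0)
        return result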

    # define the standalone supervised and unsupervised discriminator models
    def build_discriminator(self, in_shape=(28,28,1)):
        # image input
        in_image = Input(shape=in_shape)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(in_image)
        fe = LeakyReLU(alpha=0.2)(fe)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
        fe = LeakyReLU(alpha=0.2)(fe)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
        fe = LeakyReLU(alpha=0.2)(fe)
        # flatten feature maps
        fe = Flatten()(fe)
        # dropout
        fe = Dropout(0.4)(fe)
        # output layer nodes
        fe = Dense(self.n_classes)(fe)
        # supervised output
        c_out_layer = Activation('softmax')(fe)
        # define and compile supervised discriminator model
        c_model = Model(in_image, c_out_layer)
        c_model.compile(loss='sparse_categorical_crossentropy', 
                        optimizer=Adam(lr=0.0002, beta_1=0.5), 
                        metrics=['accuracy'])
        # unsupervised output
        d_out_layer = Lambda(self.custom_activation)(fe)
        # define and compile unsupervised discriminator model
        d_model = Model(in_image, d_out_layer)
        d_model.compile(loss='binary_crossentropy', 
                        optimizer=Adam(lr=0.0002, beta_1=0.5))
        return d_model, c_model

    # define the standalone generator model
    def build_generator(self):
        # image generator input
        in_lat = Input(shape=(self.latent_dim,))
        # foundation for 7x7 image
        n_nodes = 128 * 7 * 7
        gen = Dense(n_nodes)(in_lat)
        gen = LeakyReLU(alpha=0.2)(gen)
        gen = Reshape((7, 7, 128))(gen)
        # upsample to 14x14
        gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
        gen = LeakyReLU(alpha=0.2)(gen)
        # upsample to 28x28
        gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
        gen = LeakyReLU(alpha=0.2)(gen)
        # output
        out_layer = Conv2D(1, (7,7), activation='tanh', padding='same')(gen)
        # define model
        model = Model(in_lat, out_layer)
        return model

    # define the combined generator and discriminator model, for updating the generator
    def build_gan(self):
        # make weights in the discriminator not trainable
        self.discriminator.trainable = False
        # connect image output from generator as input to discriminator
        gan_output = self.discriminator(self.generator.output)
        # define gan model as taking noise and outputting a classification
        model = Model(self.generator.input, gan_output)
        # compile model
        opt = Adam(lr=0.0002, beta_1=0.5)
        model.compile(loss='binary_crossentropy', optimizer=opt)
        return model

    @staticmethod
    def prepare_real_data():
        # load dataset
        (X_train, y_train), (_, _) = mnist.load_data()
        X = np.expand_dims(X_train, axis=-1).astype('float32')
        # rescale from [0, 255] to [-1, 1]
        X = (X - 127.5) / 127.5
        return [X, y_train]

    # select a balanced, supervised subdataset
    def prepare_supervised_samples(self, n_samples=100):
        X, y = self.dataset
        X_list, y_list = list(), list()
        n_per_class = int(n_samples / self.n_classes)
        for i in range(self.n_classes):
            # choose random instances of images of a given class
            Xi_all = X[y == i]
            xi = np.random.randint(0, len(Xi_all), n_per_class)
            # add to list
            for j in xi:
                X_list.append(Xi_all[j])
                y_list.append(i)
        return np.asarray(X_list), np.asarray(y_list)

    #generate real points for the discriminator from a sample of labelled data
    @staticmethod
    def generate_real_samples(labelled_data, n_samples):
        X, y = labelled_data
        xi = np.random.randint(0, X.shape[0], n_samples)
        X_samp, y_samp = X[xi], y[xi]
        # generate class labels for discriminator
        ones = np.ones((n_samples, 1))
        return [X_samp, y_samp], ones

    # sample the latent space for the generator
    def sample_latent_points(self, n_samples):
        lat_pnts = np.random.randn(self.latent_dim * n_samples)
        lat_pnts = lat_pnts.reshape(n_samples, self.latent_dim)
        return lat_pnts

    # generate fake samples for the discriminator
    def generate_fake_samples(self, n_samples):
        latent_points = self.sample_latent_points(n_samples)
        generated_images = self.generator.predict(latent_points)
        y = np.zeros((n_samples, 1))
        return generated_images, y

    # generate samples, save as a plot and save the model
    def summarize_performance(self, step, n_samples=100):
        X, _ = self.generate_fake_samples(n_samples)
        # rescale for simplicity
        X = (X + 1) / 2.0
        # plot
        for i in range(100):
            pyplot.subplot(10, 10, 1 + i)
            pyplot.axis('off')
            pyplot.imshow(X[i, :, :, 0], cmap='gray_r')
        fl = 'generated_images_%04d.png' % (step+1)
        pyplot.savefig(fl)
        pyplot.close()
        # evaluate the classifier
        # WARNING!! DON'T TRY THIS AT HOME!! (why? :)
        X, y = self.dataset
        _, acc = self.classifier.evaluate(X, y, verbose=0)
        print('===> Classifier Accuracy: %.3f%%' % (acc * 100))
        # save the generator
        gen_file = 'generator_%04d.h5' % (step+1)
        self.generator.save(gen_file)
        # save the classifier
        class_file = 'classifier_%04d.h5' % (step+1)
        self.classifier.save(class_file)

    # train the model!
    def train(self, n_epochs=20, batch_size=100):
        # supervised data
        X_sup, y_sup = self.prepare_supervised_samples()
        # each epoch we'll train a certain number of batches...
        batches_per_epoch = int(self.dataset[0].shape[0] / batch_size)
        # calculate the number of training iterations
        training_steps = batches_per_epoch * n_epochs
        # this'll come in handy...
        half_batch = batch_size // 2
        print('n_epochs=%d, batch_size=%d, 1/2 batch=%d, batch per epoch=%d, total steps=%d' % 
            (n_epochs, batch_size, half_batch, batches_per_epoch, training_steps))
        
        # Now train
        for i in range(training_steps):
            # update the supervised discriminator a.k.a. classifier
            [X_sup_sample, y_sup_sample], _ = self.generate_real_samples([X_sup, y_sup], half_batch)
            c_loss, c_acc = self.classifier.train_on_batch(X_sup_sample, y_sup_sample)
            # update the standard, unsupervised discriminator
            # compute the loss on real and fake data separately
            [X_real, _], y_real = self.generate_real_samples(self.dataset, half_batch)
            d_loss1 = self.discriminator.train_on_batch(X_real, y_real)
            X_fake, y_fake = self.generate_fake_samples(half_batch)
            d_loss2 = self.discriminator.train_on_batch(X_fake, y_fake)
            # update the generator
            X_lat, y_lat = self.sample_latent_points(batch_size), np.ones((batch_size, 1))
            g_loss = self.gan.train_on_batch(X_lat, y_lat)
            # summarize losses 
            if (i+1) % 500 == 0:
                print('-> step=%d \n'\
                    '   classifier -> loss = %.3f, accuracy = %.0f \n'\
                    '   discriminator -> loss on real = %.3f, loss on fake = %.3f \n'\
                    '   generator -> loss = %.3f' % 
                    (i+1, c_loss, c_acc*100, d_loss1, d_loss2, g_loss))
            # evaluate model performance...and save!
            if (i+1) % batches_per_epoch == 0:
                self.summarize_performance(i)

Let's run it!

In [None]:
import numpy as np
sgan = SGAN(30)

sgan.train(n_epochs=2)
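
A side note on the `custom_activation` used for the unsupervised output above: the expression $Z(x)/(Z(x)+1)$, with $Z(x)=\sum_k \exp(l_k(x))$, is the same as a sigmoid applied to the log-sum-exp of the class logits. The NumPy sketch below checks that identity and shows a form that should be safer for large logits. The function names are illustrative only and are not part of the SGAN class.

```python
import numpy as np

def d_from_logits_naive(logits):
    # same formula as custom_activation: Z / (Z + 1), Z = sum_k exp(l_k)
    z = np.sum(np.exp(logits), axis=-1, keepdims=True)
    return z / (z + 1.0)

def d_from_logits_stable(logits):
    # log-sum-exp computed after subtracting the row maximum, so exp() cannot overflow
    m = np.max(logits, axis=-1, keepdims=True)
    lse = m + np.log(np.sum(np.exp(logits - m), axis=-1, keepdims=True))
    # np.logaddexp(0, -lse) = log(1 + exp(-lse)), hence this is sigmoid(lse)
    return np.exp(-np.logaddexp(0.0, -lse))

logits = np.array([[2.0, -1.0, 0.5], [-3.0, -4.0, -5.0]])
print(np.allclose(d_from_logits_naive(logits), d_from_logits_stable(logits)))
```

For moderate logits the two versions agree; the second one is only meant to show how the same quantity could be computed if overflow ever became an issue.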

### Simple Bonus Task: Feature matching

What we did by piping the classifier into the discriminator moves along the lines of another interesting strategy suggested in [2]: feature matching. It consists of training the generator to produce data whose statistics, as measured by the features on an intermediate layer of the discriminator, match those of the real data.

Think about it: $G$ is not only trying to fool $D$, it is doing so by _explicitly_ using $D$'s criteria against it. Salimans et al. claim that this change greatly helps in reducing the instability of GAN training, in particular the performance oscillation between $G$ and $D$.
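
As a rough illustration of the idea, the feature-matching objective in [2] compares the batch means of the discriminator features $f$ on real and generated data, $\lVert \mathbb{E}_{x\sim p_{\text{data}}} f(x) - \mathbb{E}_{z\sim p_z} f(G(z)) \rVert_2^2$. The sketch below computes that quantity with NumPy on made-up feature arrays. The name `feature_matching_loss` and the random "features" are assumptions for illustration, not the notebook's implementation.

```python
import numpy as np

def feature_matching_loss(real_features, fake_features):
    # batch means of the discriminator features, one value per feature
    mean_real = real_features.mean(axis=0)
    mean_fake = fake_features.mean(axis=0)
    # squared L2 distance between the two mean feature vectors
    return float(np.sum((mean_real - mean_fake) ** 2))

rng = np.random.default_rng(0)
real = rng.normal(loc=0.0, size=(64, 10))   # stand-in for f(x) on real images
close = rng.normal(loc=0.0, size=(64, 10))  # generator whose statistics match
far = rng.normal(loc=2.0, size=(64, 10))    # generator with shifted statistics
print(feature_matching_loss(real, close), feature_matching_loss(real, far))
```

The loss is small when the generated batch has the same mean features as the real batch and grows as the statistics drift apart, which is the signal the generator is trained on.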

In [None]:
class SGAN_FM(SGAN):
    
    def __init__(self, latent_dim):
        self.dataset = self.prepare_real_data()
        self.n_classes = 10
        self.unlab_imgs = Input(shape=(28, 28, 1))
        self.latent_dim = latent_dim
        self.generator = self.build_generator()
        self.discriminator, self.classifier, self.fe_model = self.build_discriminator()
        self.fm_layer = self.fe_model.output
        self.gan = self.build_gan()

    # custom activation function
    @staticmethod
    def custom_activation(output):
        logexpsum = Kb.sum(Kb.exp(output), axis=-1, keepdims=True)
        result = logexpsum / (logexpsum + 1.0)
        return result

    def build_discriminator(self, in_shape=(28,28,1)):
        # image input
        in_image = Input(shape=in_shape)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(in_image)
        fe = LeakyReLU(alpha=0.2)(fe)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
        fe = LeakyReLU(alpha=0.2)(fe)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
        fe = LeakyReLU(alpha=0.2)(fe)
        # flatten feature maps
        fe = Flatten()(fe)
        # dropout
        fe = Dropout(0.4)(fe)
        
        # output layer nodes
        fe = Dense(self.n_classes)(fe)
        
        fe_model = Model(in_image, fe)
        
        # supervised output
        c_out_layer = Activation('softmax')(fe)
        # define and compile supervised discriminator model
        c_model = Model(in_image, c_out_layer)
        c_model.compile(loss='sparse_categorical_crossentropy', 
                        optimizer=Adam(lr=0.0002, beta_1=0.5), 
                        metrics=['accuracy'])
        # unsupervised output
        d_out_layer = Lambda(self.custom_activation)(fe)
        # define and compile unsupervised discriminator model
        d_model = Model(in_image, d_out_layer)
        d_model.compile(loss='binary_crossentropy', 
                        optimizer=Adam(lr=0.0002, beta_1=0.5))
        return d_model, c_model, fe_model

    # define the standalone generator model
    def build_generator(self):
        # image generator input
        in_lat = Input(shape=(self.latent_dim,))
        # foundation for 7x7 image
        n_nodes = 128 * 7 * 7
        gen = Dense(n_nodes)(in_lat)
        gen = LeakyReLU(alpha=0.2)(gen)
        gen = Reshape((7, 7, 128))(gen)
        # upsample to 14x14
        gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
        gen = LeakyReLU(alpha=0.2)(gen)
        # upsample to 28x28
        gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
        gen = LeakyReLU(alpha=0.2)(gen)
        # output
        out_layer = Conv2D(1, (7,7), activation='tanh', padding='same')(gen)
        # define model
        model = Model(in_lat, out_layer)
        return model
    
    def new_gen_loss(self):
        # feature-matching loss: y_true carries the feature-extractor outputs
        # for a batch of real images, y_pred those for the generated images
        def out_loss(y_true, y_pred):
            m1 = Kb.mean(y_true, axis=0)
            m2 = Kb.mean(y_pred, axis=0)
            loss = Kb.mean(Kb.abs(m1-m2))
            return loss
        return out_loss
    
    def build_gan(self):
        # make weights in the discriminator not trainable
        # (the feature extractor shares its layers with the discriminator)
        self.discriminator.trainable = False
        # connect image output from generator as input to the feature extractor
        gan_output = self.fe_model(self.generator.output)
        model = Model(self.generator.input, gan_output)
        # compile model
        opt = Adam(lr=0.0002, beta_1=0.5)
        model.compile(loss=self.new_gen_loss(), optimizer=opt)
        return model
    
    # train the model!
    def train(self, n_epochs=20, batch_size=100):
        # supervised data
        X_sup, y_sup = self.prepare_supervised_samples()
        # each epoch we'll train a certain number of batches...
        batches_per_epoch = int(self.dataset[0].shape[0] / batch_size)
        # calculate the number of training iterations
        training_steps = batches_per_epoch * n_epochs
        # this'll come in handy...
        half_batch = batch_size // 2
        print('n_epochs=%d, batch_size=%d, 1/2 batch=%d, batch per epoch=%d, total steps=%d' % 
            (n_epochs, batch_size, half_batch, batches_per_epoch, training_steps))
        
        # Now train
        for i in range(training_steps):
            # update the supervised discriminator a.k.a. classifier
            [X_sup_sample, y_sup_sample], _ = self.generate_real_samples([X_sup, y_sup], half_batch)
            c_loss, c_acc = self.classifier.train_on_batch(X_sup_sample, y_sup_sample)
            # update the standard, unsupervised discriminator
            # compute the loss on real and fake data separately
            [X_real, _], y_real = self.generate_real_samples(self.dataset, half_batch)
            d_loss1 = self.discriminator.train_on_batch(X_real, y_real)
            X_fake, y_fake = self.generate_fake_samples(half_batch)
            d_loss2 = self.discriminator.train_on_batch(X_fake, y_fake)

            # update the generator: the targets are the features of a fresh
            # batch of real images, which the loss compares with the features
            # of the generated images (see new_gen_loss)
            X_lat = self.sample_latent_points(batch_size)
            [X_feat, _], _ = self.generate_real_samples(self.dataset, batch_size)
            real_features = self.fe_model.predict(X_feat)
            g_loss = self.gan.train_on_batch(X_lat, real_features)
            # summarize losses 
            if (i+1) % 500 == 0:
                print('-> step=%d \n'\
                    '   classifier -> loss = %.3f, accuracy = %.0f \n'\
                    '   discriminator -> loss on real = %.3f, loss on fake = %.3f \n'\
                    '   generator -> loss = %.3f' % 
                    (i+1, c_loss, c_acc*100, d_loss1, d_loss2, g_loss))
            # evaluate model performance...and save!
            if (i+1) % batches_per_epoch == 0:
                self.summarize_performance(i)

## Real Bonus Tasks

If you've reached this point still lucid and unharmed, it means you possess a remarkable mastery of generative models and neural architectures. It's highly commendable.

But it also means it's time to take the wheels off. So let's think about the following problems :)) 

### GANs vs. VAEs

Both VAEs and GANs tackle a similar challenge, namely that of generating realistic-looking data. VAEs explicitly model the underlying distribution, whereas GANs learn it implicitly.

1.  How does a VAE perform on our unimodal and multimodal 1D function learning task?
2.  Could we use a VAE for semi-supervised learning? Would it train a better classifier than a GAN?

### Class-targeted GAN

Starting from the MNIST SGAN code, derive a simple GAN that generates MNIST digits. Whenever we call the generator, it'll produce whatever sample suits its fancy. This is not always ideal.

Modify the GAN so that we can specify the class of the samples to generate.
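
One common way to approach this (the idea behind conditional GANs) is to feed the desired class to the generator together with the noise. A minimal NumPy sketch of building such a conditioned input follows. `conditional_latent_points` is a hypothetical helper, and concatenating a one-hot label is only one of several possible conditioning schemes; the discriminator would need to receive the label as well.

```python
import numpy as np

def conditional_latent_points(latent_dim, labels, n_classes=10, rng=None):
    rng = np.random.default_rng() if rng is None else rng
    labels = np.asarray(labels)
    # standard Gaussian noise, one row per requested sample
    z = rng.standard_normal((labels.shape[0], latent_dim))
    # one-hot encoding of the requested classes
    one_hot = np.eye(n_classes)[labels]
    # generator input: noise and class information side by side
    return np.concatenate([z, one_hot], axis=1)

x = conditional_latent_points(30, labels=[3, 3, 7])
print(x.shape)  # (3, 40)
```

With this layout the generator's input layer would have `latent_dim + n_classes` units instead of `latent_dim`.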

### Latent space exploration

Can the generator's latent space tell us something about the classes? What if we tried to explore it with different forms of clustering? (distance-based, density-based...)

### Parametrisation of the latent space

In VAEs we have two parameters that control the generation of samples in a smooth way. Can we obtain something similar with GANs?

### Make mode-collapse happen

Build a GAN that is supposed to generate samples from a mixture of two-dimensional standard Gaussians whose centres are arranged on a circle. Mode collapse should appear in the form of undersampling/absence of some of the Gaussians.
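
A possible starting point for this task is the target dataset itself plus a crude check for missing modes. In the sketch below, `ring_centres`, `sample_ring`, `mode_counts` and all parameter values (8 modes, radius 10) are illustrative assumptions. The `collapsed` array only mimics a collapsed generator by keeping the real samples that fall near one centre.

```python
import numpy as np

def ring_centres(n_modes=8, radius=10.0):
    # centres equally spaced on a circle of the given radius
    angles = 2.0 * np.pi * np.arange(n_modes) / n_modes
    return radius * np.stack([np.cos(angles), np.sin(angles)], axis=1)

def sample_ring(n_samples, n_modes=8, radius=10.0, rng=None):
    rng = np.random.default_rng() if rng is None else rng
    centres = ring_centres(n_modes, radius)
    # pick a mode uniformly at random, then add unit-variance Gaussian noise
    modes = rng.integers(0, n_modes, size=n_samples)
    return centres[modes] + rng.standard_normal((n_samples, 2))

def mode_counts(samples, n_modes=8, radius=10.0):
    centres = ring_centres(n_modes, radius)
    # assign each sample to its nearest centre and count hits per mode
    dists = np.linalg.norm(samples[:, None, :] - centres[None, :, :], axis=2)
    return np.bincount(dists.argmin(axis=1), minlength=n_modes)

real = sample_ring(4000, rng=np.random.default_rng(0))
print(mode_counts(real))       # every mode is populated in the real data
collapsed = real[np.linalg.norm(real - ring_centres()[0], axis=1) < 3.0]
print(mode_counts(collapsed))  # only the first mode receives samples
```

Running `mode_counts` on the output of a trained generator gives a quick, if rough, way to see which Gaussians are under-sampled or missing.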

### GAAs: Generative Adversarial Anything

Do they _really_ have to be networks? Could we apply the GAN training framework to any kind of generator/discriminator models? How would we optimise the parameters? 

_(This kind of lies outside the scope of a course on DL, but it certainly is a worthy conceptual exercise.)_

### Tabular GAN

How can we adapt a GAN to generate tabular data? I recommend approaching this problem in steps: first, think about purely continuous data, then think about how you would handle categorical variables. This is a non-trivial task and there is published literature on models tackling this issue.

Some details to keep in mind:
 * Features have ranges that make some values plausible, others implausible.
 * Features could be generated by separate channels, but their correlation must be taken into account (e.g. on medical data, you shouldn't generate a record for a male patient with a high risk of cervical cancer)
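
To make the first step more concrete, a common preprocessing recipe scales continuous columns to $[-1, 1]$ (matching a `tanh` output) and one-hot encodes categorical ones, decoding generated rows with an `argmax`. The sketch below uses made-up columns; the helper names and the toy data are assumptions, and the published tabular GANs use more elaborate schemes.

```python
import numpy as np

def scale_continuous(col):
    # min-max scaling to [-1, 1]; also returns the bounds needed to invert it
    lo, hi = col.min(), col.max()
    return 2.0 * (col - lo) / (hi - lo) - 1.0, (lo, hi)

def unscale_continuous(scaled, bounds):
    lo, hi = bounds
    # clipping keeps generated values inside the observed (plausible) range
    return (np.clip(scaled, -1.0, 1.0) + 1.0) / 2.0 * (hi - lo) + lo

def one_hot(codes, n_categories):
    return np.eye(n_categories)[codes]

age = np.array([23.0, 35.0, 61.0, 48.0])
sex = np.array([0, 1, 1, 0])  # integer-coded categorical column

age_scaled, age_bounds = scale_continuous(age)
table = np.column_stack([age_scaled, one_hot(sex, 2)])  # training matrix

# pretend this row came out of the generator: one scaled value, two category scores
fake_row = np.array([0.1, 0.2, 0.8])
fake_age = unscale_continuous(fake_row[0], age_bounds)
fake_sex = int(fake_row[1:].argmax())
print(table.shape, round(float(fake_age), 1), fake_sex)
```

This handles the range constraint per feature; it does nothing about correlations between features, which the generator and discriminator still have to learn jointly.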


## Conclusions

GANs are powerful generative models that implicitly learn the distribution underlying the training data. To do so, they leverage, in an unsupervised context, an adversarial training scheme in which a generator and a discriminator compete against each other.

Since the two models are usually neural networks, GANs tend to be even more (over)parametrised and data-hungry than standard NNs, which in GANs often end up being just one part of a much larger architecture. Furthermore, adversarial training adds its own instabilities and hurdles to convergence on top of those of the usual gradient-descent-based methods, such as the numerous oscillations needed to reach an equilibrium in the generator/discriminator optimisation game, and the possibility of the game getting stuck in a local optimum, as in the case of mode collapse.

The many technical pitfalls and our still incomplete understanding of these models motivate much of the ongoing research on GANs. Despite these drawbacks, GANs keep showing promising results in a vast array of applications. These include the scenario most commonly faced by practitioners, training models in a semi-supervised fashion, where GANs can help generate non-trivial, non-distorting labelled examples.

## References

[1] Goodfellow I., Generative Adversarial Networks, NIPS 2016 Tutorial, https://arxiv.org/pdf/1701.00160.pdf

[2] Salimans T. et al., Improved Techniques for Training GANs, https://arxiv.org/pdf/1606.03498.pdf

[3] Chintala S. et al., How to train a GAN? Tips and tricks to make GANs work, https://github.com/soumith/ganhacks

In [None]:
# %tensorflow_version 2.x

In [None]:
# import tensorflow as tf

In [None]:
# from datetime import datetime
# logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
# file_writer = tf.summary.create_file_writer(logdir + "/metrics")
# file_writer.set_as_default()

In [None]:
# %reload_ext tensorboard

In [None]:
# %tensorboard --inspect --logdir logs/scalars/20200110-124802/metrics

In [None]:
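# SGAN on MNIST data (tf.keras version)
from datetime import datetime
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Conv2D
from tensorflow.keras.layers import Conv2DTranspose, LeakyReLU, Dropout, Lambda, Activation
from tensorflow.keras import backend as Kb
from matplotlib import pyplot

# TensorBoard log directory, one sub-folder per run
logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
# note: the callback is created here but not yet passed to any training call below
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)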

class SGAN:
    
    def __init__(self, latent_dim):
        self.dataset = self.prepare_real_data()
        self.n_classes = 10
        self.latent_dim = latent_dim
        self.generator = self.build_generator()
        self.discriminator, self.classifier = self.build_discriminator()
        self.gan = self.build_gan()

    # custom activation function
    @staticmethod
    def custom_activation(output):
        logexpsum = Kb.sum(Kb.exp(output), axis=-1, keepdims=True)
        result = logexpsum / (logexpsum + 1.0)
        return result

    # define the standalone supervised and unsupervised discriminator models
    def build_discriminator(self, in_shape=(28,28,1)):
        # image input
        in_image = Input(shape=in_shape)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(in_image)
        fe = LeakyReLU(alpha=0.2)(fe)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
        fe = LeakyReLU(alpha=0.2)(fe)
        # downsample
        fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
        fe = LeakyReLU(alpha=0.2)(fe)
        # flatten feature maps
        fe = Flatten()(fe)
        # dropout
        fe = Dropout(0.4)(fe)
        # output layer nodes
        fe = Dense(self.n_classes)(fe)
        # supervised output
        c_out_layer = Activation('softmax')(fe)
        # define and compile supervised discriminator model
        c_model = Model(in_image, c_out_layer)
        c_model.compile(loss='sparse_categorical_crossentropy', 
                        optimizer=Adam(lr=0.0002, beta_1=0.5), 
                        metrics=['accuracy'])
        # unsupervised output
        d_out_layer = Lambda(self.custom_activation)(fe)
        # define and compile unsupervised discriminator model
        d_model = Model(in_image, d_out_layer)
        d_model.compile(loss='binary_crossentropy', 
                        optimizer=Adam(lr=0.0002, beta_1=0.5))
        return d_model, c_model

    # define the standalone generator model
    def build_generator(self):
        # image generator input
        in_lat = Input(shape=(self.latent_dim,))
        # foundation for 7x7 image
        n_nodes = 128 * 7 * 7
        gen = Dense(n_nodes)(in_lat)
        gen = LeakyReLU(alpha=0.2)(gen)
        gen = Reshape((7, 7, 128))(gen)
        # upsample to 14x14
        gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
        gen = LeakyReLU(alpha=0.2)(gen)
        # upsample to 28x28
        gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
        gen = LeakyReLU(alpha=0.2)(gen)
        # output
        out_layer = Conv2D(1, (7,7), activation='tanh', padding='same')(gen)
        # define model
        model = Model(in_lat, out_layer)
        return model

    # define the combined generator and discriminator model, for updating the generator
    def build_gan(self):
        # make weights in the discriminator not trainable
        self.discriminator.trainable = False
        # connect image output from generator as input to discriminator
        gan_output = self.discriminator(self.generator.output)
        # define gan model as taking noise and outputting a classification
        model = Model(self.generator.input, gan_output)
        # compile model
        opt = Adam(lr=0.0002, beta_1=0.5)
        model.compile(loss='binary_crossentropy', optimizer=opt)
        return model

    @staticmethod
    def prepare_real_data():
        # load dataset
        (X_train, y_train), (_, _) = mnist.load_data()
        X = np.expand_dims(X_train, axis=-1).astype('float32')
        # rescale from [0, 255] to [-1, 1]
        X = (X - 127.5) / 127.5
        return [X, y_train]

    # select a balanced, supervised subdataset
    def prepare_supervised_samples(self, n_samples=100):
        X, y = self.dataset
        X_list, y_list = list(), list()
        n_per_class = int(n_samples / self.n_classes)
        for i in range(self.n_classes):
            # choose random instances of images of a given class
            Xi_all = X[y == i]
            xi = np.random.randint(0, len(Xi_all), n_per_class)
            # add to list
            for j in xi:
                X_list.append(Xi_all[j])
                y_list.append(i)
        return np.asarray(X_list), np.asarray(y_list)

    #generate real points for the discriminator from a sample of labelled data
    @staticmethod
    def generate_real_samples(labelled_data, n_samples):
        X, y = labelled_data
        xi = np.random.randint(0, X.shape[0], n_samples)
        X_samp, y_samp = X[xi], y[xi]
        # generate class labels for discriminator
        ones = np.ones((n_samples, 1))
        return [X_samp, y_samp], ones

    # sample the latent space for the generator
    def sample_latent_points(self, n_samples):
        lat_pnts = np.random.randn(self.latent_dim * n_samples)
        lat_pnts = lat_pnts.reshape(n_samples, self.latent_dim)
        return lat_pnts

    # generate fake samples for the discriminator
    def generate_fake_samples(self, n_samples):
        latent_points = self.sample_latent_points(n_samples)
        generated_images = self.generator.predict(latent_points)
        y = np.zeros((n_samples, 1))
        return generated_images, y

    # generate samples, save as a plot and save the model
    def summarize_performance(self, step, n_samples=100):
        X, _ = self.generate_fake_samples(n_samples)
        # rescale for simplicity
        X = (X + 1) / 2.0
        # plot
        for i in range(100):
            pyplot.subplot(10, 10, 1 + i)
            pyplot.axis('off')
            pyplot.imshow(X[i, :, :, 0], cmap='gray_r')
        fl = 'generated_images_%04d.png' % (step+1)
        pyplot.savefig(fl)
        pyplot.close()
        # evaluate the classifier
        # WARNING!! DON'T TRY THIS AT HOME!! (why? :)
        X, y = self.dataset
        _, acc = self.classifier.evaluate(X, y, verbose=0)
        print('===> Classifier Accuracy: %.3f%%' % (acc * 100))
        # save the generator
        gen_file = 'generator_%04d.h5' % (step+1)
        self.generator.save(gen_file)
        # save the classifier
        class_file = 'classifier_%04d.h5' % (step+1)
        self.classifier.save(class_file)

    # train the model!
    def train(self, n_epochs=20, batch_size=100):
        # supervised data
        X_sup, y_sup = self.prepare_supervised_samples()
        # each epoch we'll train a certain number of batches...
        batches_per_epoch = int(self.dataset[0].shape[0] / batch_size)
        # calculate the number of training iterations
        training_steps = batches_per_epoch * n_epochs
        # this'll come in handy...
        half_batch = batch_size // 2
        print('n_epochs=%d, batch_size=%d, 1/2 batch=%d, batch per epoch=%d, total steps=%d' % 
            (n_epochs, batch_size, half_batch, batches_per_epoch, training_steps))
        
        # Now train
        for i in range(training_steps):
            # update the supervised discriminator a.k.a. classifier
            [X_sup_sample, y_sup_sample], _ = self.generate_real_samples([X_sup, y_sup], half_batch)
            c_loss, c_acc = self.classifier.train_on_batch(X_sup_sample, y_sup_sample)
            # update the standard, unsupervised discriminator
            # compute the loss on real and fake data separately
            [X_real, _], y_real = self.generate_real_samples(self.dataset, half_batch)
            d_loss1 = self.discriminator.train_on_batch(X_real, y_real)
            X_fake, y_fake = self.generate_fake_samples(half_batch)
            d_loss2 = self.discriminator.train_on_batch(X_fake, y_fake)
            # update the generator
            X_lat, y_lat = self.sample_latent_points(batch_size), np.ones((batch_size, 1))
            g_loss = self.gan.train_on_batch(X_lat, y_lat)
            # summarize losses 
            if (i+1) % 500 == 0:
                print('-> step=%d \n'\
                    '   classifier -> loss = %.3f, accuracy = %.0f \n'\
                    '   discriminator -> loss on real = %.3f, loss on fake = %.3f \n'\
                    '   generator -> loss = %.3f' % 
                    (i+1, c_loss, c_acc*100, d_loss1, d_loss2, g_loss))
            # evaluate model performance...and save!
            if (i+1) % batches_per_epoch == 0:
                self.summarize_performance(i)