# Generative Deep Learning

Up to now, we have mostly focused on classification and regression tasks in the context of deep learning.

However, an important avenue of research is the *generation* of patterns with deep learning models.

Two of the most famous generative models are the Generative Adversarial Network (GAN) and the Variational Autoencoder (VAE).  
We will look at [an example](https://keras.io/examples/generative/dcgan_overriding_train_step/) of how to generate faces with a GAN.

**Warning**: these beasts are hard to train!

In [None]:
import tensorflow as tf
import tensorflow.keras as K
import numpy as np
import matplotlib.pyplot as plt
import os
import gdown
from zipfile import ZipFile

Generative models need a dataset from which to learn what to generate!  
The [CelebA dataset](http://mmlab.ie.cuhk.edu.hk/projects/CelebA.html) provides a large number of examples of... faces!

In [None]:
os.makedirs("celeba_gan", exist_ok=True) # do not fail if the cell is re-run

url = "https://drive.google.com/uc?id=1O7m1010EJjLE5QxLZiM9Fpjs7Oj6e684"
output = "celeba_gan/data.zip"
gdown.download(url, output, quiet=True)

with ZipFile("celeba_gan/data.zip", "r") as zipobj:
    zipobj.extractall("celeba_gan")

Let's look at what we've got. We can exploit Keras to easily build a dataset out of it.

Remember also to normalize data!

In [None]:
dataset = K.preprocessing.image_dataset_from_directory(
    "celeba_gan", label_mode=None, image_size=(64, 64), batch_size=32
)
dataset = dataset.map(lambda x: x / 255.0)

We don't need labels... Why?

In [None]:
for x in dataset: # no y!
  print(x.shape)
  print(np.max(x), np.min(x))
  break

In [None]:
plt.imshow(x[0])
plt.axis('off')

## Let's build the model!

**DISCRIMINATOR**: the one that has to decide if an image is real or fake

Binary classification from images! Easy!

In [None]:
discriminator = K.Sequential(name='discriminator')
discriminator.add(K.Input(shape=(64, 64, 3)))
discriminator.add(K.layers.Conv2D(64, kernel_size=4, strides=2, padding="same"))
discriminator.add(K.layers.LeakyReLU(alpha=0.2))
discriminator.add(K.layers.Conv2D(128, kernel_size=4, strides=2, padding="same"))
discriminator.add(K.layers.LeakyReLU(alpha=0.2))
discriminator.add(K.layers.Conv2D(128, kernel_size=4, strides=2, padding="same"))
discriminator.add(K.layers.LeakyReLU(alpha=0.2))
     
discriminator.add(K.layers.Flatten())
discriminator.add(K.layers.Dropout(0.2))
discriminator.add(K.layers.Dense(1, activation="sigmoid")) # sigmoid is enough

discriminator.summary()
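
To see where the size of the flattened vector in the summary comes from, the spatial arithmetic can be sketched in plain Python. With `padding="same"`, a strided convolution produces `ceil(input_size / stride)` outputs per spatial dimension, so each of the three convolutions halves the image. The helper `same_conv_output_size` is a hypothetical name introduced only for this illustration.

```python
import math


def same_conv_output_size(input_size, stride):
    """Spatial output size of a convolution with padding="same"."""
    return math.ceil(input_size / stride)


size = 64  # input images are 64x64
sizes = []
for _ in range(3):  # three Conv2D layers, each with strides=2
    size = same_conv_output_size(size, stride=2)
    sizes.append(size)

last_filters = 128  # number of filters of the last Conv2D layer
flattened_units = size * size * last_filters

print(sizes)            # spatial size after each convolution
print(flattened_units)  # number of inputs of the final Dense layer
```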

**GENERATOR**: generate a face from random noise

In [None]:
latent_dim = 128 # size of the random noise vector given as input to the generator

generator = K.Sequential(name='generator')
generator.add(K.Input(shape=(latent_dim,)))
generator.add(K.layers.Dense(8 * 8 * 128))
generator.add(K.layers.Reshape((8, 8, 128))) # set this to match the discriminator final feature map
generator.add(K.layers.Conv2DTranspose(128, kernel_size=4, strides=2, padding="same"))
generator.add(K.layers.LeakyReLU(alpha=0.2))
generator.add(K.layers.Conv2DTranspose(256, kernel_size=4, strides=2, padding="same"))
generator.add(K.layers.LeakyReLU(alpha=0.2))
generator.add(K.layers.Conv2DTranspose(512, kernel_size=4, strides=2, padding="same"))
generator.add(K.layers.LeakyReLU(alpha=0.2))
generator.add(K.layers.Conv2D(3, kernel_size=5, padding="same", activation="sigmoid"))

generator.summary()
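
The generator mirrors the discriminator: with `padding="same"`, each `Conv2DTranspose` with stride 2 doubles the spatial size, so the 8x8 feature map grows back to a 64x64 image. A minimal sketch of that arithmetic, where the helper name is hypothetical and used only for illustration:

```python
def same_transposed_conv_output_size(input_size, stride):
    """Spatial output size of a transposed convolution with padding="same"."""
    return input_size * stride


size = 8  # the Dense output is reshaped to 8x8x128
generator_sizes = []
for _ in range(3):  # three Conv2DTranspose layers, each with strides=2
    size = same_transposed_conv_output_size(size, stride=2)
    generator_sizes.append(size)

# The final Conv2D has stride 1 and padding="same": it keeps the spatial
# size and only maps the channels to 3 (RGB).
output_shape = (size, size, 3)
discriminator_input_shape = (64, 64, 3)

print(generator_sizes)
print(output_shape == discriminator_input_shape)
```

The final sigmoid keeps the pixel values in [0, 1], the same range as the normalized training images.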

**The trickier part**: how to train the entire GAN?

We have to build a custom Keras Model which encapsulates both the generator and the discriminator and trains them accordingly.

In [None]:
class GAN(K.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        
        # use standalone metrics to maintain a mean loss
        # for both the models
        self.d_loss_metric = K.metrics.Mean(name="d_loss")
        self.g_loss_metric = K.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        # GENERATE RANDOM INITIAL VECTOR FOR THE GENERATOR
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # GENERATE FAKE IMAGES
        generated_images = self.generator(random_latent_vectors)

        # MIX THE DATA
        combined_images = tf.concat([generated_images, real_images], axis=0)
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        ) # 0=real, 1=fake
        # Add random noise to the labels: a trick that eases convergence
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {
            "d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result(),
        }

If you want to know more about the label trick, you can look [here](https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/).
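
The label convention used in `train_step` (1 = fake, 0 = real) and the effect of the misleading labels can be checked with a few lines of plain Python. This is only a sketch: `binary_crossentropy` below is a hand-written single-sample version, not the Keras loss, and the discriminator outputs 0.9 and 0.1 are made-up values.

```python
import math
import random


def binary_crossentropy(label, prediction, eps=1e-7):
    """Binary cross-entropy for a single (label, prediction) pair."""
    p = min(max(prediction, eps), 1.0 - eps)  # avoid log(0)
    return -(label * math.log(p) + (1.0 - label) * math.log(1.0 - p))


rng = random.Random(0)
batch_size = 4

# Discriminator targets: 1 = fake, 0 = real, plus a small uniform noise.
fake_labels = [1.0 + 0.05 * rng.random() for _ in range(batch_size)]
real_labels = [0.0 + 0.05 * rng.random() for _ in range(batch_size)]

# Generator target: pretend that the generated images are real (label 0).
misleading_label = 0.0

# The discriminator says "probably fake" (0.9): high generator loss.
loss_when_detected = binary_crossentropy(misleading_label, 0.9)
# The discriminator says "probably real" (0.1): low generator loss.
loss_when_fooled = binary_crossentropy(misleading_label, 0.1)

print(loss_when_detected, loss_when_fooled)
```

The generator loss decreases only when the discriminator is fooled, which is what pushes the generator towards realistic images.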

## Training

**Warning**: this will take quite a lot of time on the entire dataset. Consider reducing the dataset size with `take`.

In [None]:
epochs = 2
MAX_DATASET_SIZE = 10000 # number of batches (32 images each) to keep, -1 to take entire dataset

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(d_optimizer=K.optimizers.Adam(learning_rate=0.0001), 
            g_optimizer=K.optimizers.Adam(learning_rate=0.0001), 
            loss_fn=K.losses.BinaryCrossentropy())

dataset = dataset.take(MAX_DATASET_SIZE)
gan.fit(dataset, epochs=epochs)
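
Keep in mind that `take` counts the elements of the dataset, and here each element is a batch of 32 images rather than a single image. A minimal sketch of how to turn an image budget into the argument for `take`; `batches_for` and `target_images` are hypothetical names used only for this illustration.

```python
import math


def batches_for(num_images, batch_size):
    """Number of batches needed to cover num_images images."""
    return math.ceil(num_images / batch_size)


batch_size = 32        # batch size used when the dataset was built
target_images = 10000  # hypothetical number of images to train on

num_batches = batches_for(target_images, batch_size)
images_seen = num_batches * batch_size  # the last batch is taken in full

print(num_batches, images_seen)
```

With these numbers, `dataset.take(num_batches)` would keep roughly 10,000 images per epoch.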

## Results!

In [None]:
def generate_faces(generator, latent_dim, num_faces=6):
  random_latent_vectors = tf.random.normal(shape=(num_faces, latent_dim))
  generated_images = generator(random_latent_vectors) # these are in [0, 1]
  return generated_images.numpy() # convert the tensor to a NumPy array for plotting

In [None]:
num_faces = 6
faces = generate_faces(generator, latent_dim, num_faces)

In [None]:
fig, ax = plt.subplots(1, num_faces)
for i, face in enumerate(faces):
  ax[i].imshow(face)
  ax[i].axis('off')

**Mode collapse**: the generator produces very similar outputs! There is not enough diversity in the restricted dataset.

**Exercise**: build a character-level RNN generator. Generate sentences character by character from a fixed-size alphabet.

E.g. take the English alphabet plus some punctuation symbols (+ start and end of sequence). Train your RNN on a corpus of text, character by character. Train it to predict the next character until the END is encountered. Then, try to generate text starting from START until END is produced.
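
As a possible starting point for the data preparation (not for the RNN itself), here is a sketch of how sentences could be turned into next-character training pairs. The alphabet, the `<START>`/`<END>` symbols and the helper names are all assumptions made for this illustration.

```python
import string

START, END = "<START>", "<END>"

# Fixed-size alphabet: lowercase letters, space, some punctuation,
# plus the two special symbols.
alphabet = [START, END] + list(string.ascii_lowercase + " .,!?")
char_to_index = {symbol: index for index, symbol in enumerate(alphabet)}


def encode(sentence):
    """Map a sentence to a list of indices, wrapped in START and END."""
    chars = [c for c in sentence.lower() if c in char_to_index]
    return [char_to_index[s] for s in [START] + chars + [END]]


def next_char_pairs(indices):
    """Build (input, target) pairs: each symbol has to predict the next one."""
    return list(zip(indices[:-1], indices[1:]))


encoded = encode("Hi!")
pairs = next_char_pairs(encoded)
print(encoded)
print(pairs)
```

Characters outside the alphabet are simply dropped here; the last pair always has END as its target, which is what lets the trained model decide when to stop generating.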

# Deep Graph Networks (aka Graph Neural Networks)

[Spektral](https://graphneural.network/) is a high-level library based on Keras for graph applications.

We don't have time to dive into many details. Here, I just wanted to show you how easy it can be to build a complex model leveraging Keras and Spektral functionalities.

[The task](https://arxiv.org/pdf/1609.02907.pdf) we will consider is a node classification task. Each node in the graph has to be assigned to a specific class.

Let's first install Spektral!

In [None]:
!pip install spektral

In [None]:
from spektral.data.loaders import SingleLoader
from spektral.datasets.citation import Citation
from spektral.layers import GCNConv
from spektral.models.gcn import GCN
from spektral.transforms import AdjToSpTensor, LayerPreprocess

We will work with Citation networks, specifically the [Cora dataset](https://graphsandnetworks.com/the-cora-dataset/), check it out!  
"*The Cora dataset consists of 2708 scientific publications classified into one of seven classes. The citation network consists of 5429 links. Each publication in the dataset is described by a 0/1-valued word vector indicating the absence/presence of the corresponding word from the dictionary. The dictionary consists of 1433 unique words.*"

So, nodes are documents and links are citations.  
The node classification task requires predicting the label of a paper among 7 possible classes (paper subject area).

In [None]:
dataset = Citation("cora", normalize_x=True, 
                   transforms=[LayerPreprocess(GCNConv),  # the model we will use needs to preprocess the adj matrix
                                                          # GCNConv implements a preprocess method which is called in the transform
                               AdjToSpTensor()])

In [None]:
dataset

This is not the usual dataset you are accustomed to. This is what is called `Single Data Mode` in Spektral (only one graph, usually for node classification tasks).

In [None]:
dataset[0] # take the graph

We have some nodes and some links, with associated features and labels.

In [None]:
print(dataset[0].a.shape) # adj matrix
print(dataset[0].x.shape) # node features (1433 features per node indicating presence/absence of a word)
# print(dataset[0].e.shape) # edge features, we don't have them


In [None]:
print(dataset[0].a)

**Create the Graph Convolutional Network model**

In [None]:
model = GCN(n_labels=dataset.n_labels, n_input_channels=dataset.n_node_features)

model.compile(optimizer=K.optimizers.Adam(learning_rate=1e-2), 
              loss=K.losses.CategoricalCrossentropy(reduction="sum"), # take the sum, not the mean across nodes
              metrics=["accuracy"])

Split the dataset into training, validation and test sets using Spektral

In [None]:
loader_tr = SingleLoader(dataset, sample_weights=dataset.mask_tr)
loader_va = SingleLoader(dataset, sample_weights=dataset.mask_va)
loader_te = SingleLoader(dataset, sample_weights=dataset.mask_te)
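
All three loaders yield the same graph: what changes is the mask passed as `sample_weights`, which zeroes out the loss of the nodes outside the split. Together with `reduction="sum"`, the loss becomes a sum over the selected nodes only. A minimal sketch in plain Python, with made-up loss values and masks for a graph with 5 nodes:

```python
# Hypothetical per-node losses for a graph with 5 nodes.
node_losses = [0.2, 1.5, 0.7, 0.1, 0.9]

# Boolean masks: each node belongs to exactly one split.
mask_tr = [True, True, False, False, False]
mask_va = [False, False, True, False, False]
mask_te = [False, False, False, True, True]


def masked_loss_sum(losses, mask):
    """Sum of the losses over the nodes selected by the mask."""
    return sum(loss * float(selected) for loss, selected in zip(losses, mask))


train_loss = masked_loss_sum(node_losses, mask_tr)
val_loss = masked_loss_sum(node_losses, mask_va)
test_loss = masked_loss_sum(node_losses, mask_te)
print(train_loss, val_loss, test_loss)
```

The model still sees every node and edge during the forward pass; only the contribution to the loss is restricted to the nodes of the split.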

## Training the model

In [None]:
model.fit(loader_tr.load(),
          steps_per_epoch=loader_tr.steps_per_epoch,
          validation_data=loader_va.load(),
          validation_steps=loader_va.steps_per_epoch,
          epochs=200,
          callbacks=[K.callbacks.EarlyStopping(patience=10, restore_best_weights=True)])

In [None]:
eval_results = model.evaluate(loader_te.load(), steps=loader_te.steps_per_epoch)