# **Art historical GAN**<br>

This notebook contains the code to implement and train the networks described in the thesis "Latent Space Analysis in an Art Creating GAN". <br>

There is no setup required. Simply clone this repo to Google Drive and run this notebook with Google Colaboratory.<br> 

The 128x128 pixel data set can be downloaded from Drive, as it is too large to be uploaded to GitHub. Alternatively, the code can be run with the 64x64 pixel data set from the data folder.

In [None]:
import random
import time
import numpy as np
%tensorflow_version 2.x
import tensorflow as tf
import os
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt

Mount Drive and unzip the data archive on the virtual machine. 

In [None]:
# Mount Google Drive at /content/drive; the later cells read the data archive
# and the utility module from below this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
zip_path = '/content/drive/My\ Drive/ArtHistoricalGAN/data/64.zip' # adopt this path if using the 128x128 data set
!cp {zip_path} .
!unzip -q 64.zip -d Images
!rm 64.zip

In [None]:
# Directory (with trailing slash) to which models, logs and output images are saved.
# Absolute, like the other Drive paths in this notebook, so that it does not
# depend on the current working directory of the session.
RES_PATH = '/content/drive/My Drive/ArtHistoricalGAN/'

Make the wikiart_genre file available in the Colab session.

In [None]:
import sys

# Folder on the mounted Drive that contains wikiart_genre.py.
utility_dir = '/content/drive/My Drive/ArtHistoricalGAN/utility'
# Only extend the module search path once, so re-running this cell does not
# pile up duplicate entries.
if utility_dir not in sys.path:
    sys.path.append(utility_dir)
import wikiart_genre

Check if files were properly unzipped or just look through some images from the data set.

In [None]:
%%time
img_path = Path("Images/Expressionism/4.jpg")
image = Image.open(img_path).convert(mode="RGB")
image = np.asarray(image)
plt.imshow(image);

**Build the WGAN-GP**

In [None]:
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, multiply
from tensorflow.keras.layers import BatchNormalization, Activation, Embedding, ZeroPadding2D, LeakyReLU
from tensorflow.keras.layers import UpSampling2D, Conv2D, Conv2DTranspose, LayerNormalization
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam, RMSprop

from functools import partial



# Define auxiliary class for gradient penalty
class RandomWeightedAverage(tf.keras.layers.Layer):
    """Provides a (random) weighted average between real and generated image samples.

    For every sample in the batch one weight ``alpha`` is drawn uniformly from
    [0, 1) and the layer returns ``alpha * inputs[0] + (1 - alpha) * inputs[1]``.

    Args:
        batch_size: Optional fixed number of samples per batch. When ``None``
            (the default) the batch size is read from the first input at call
            time, so the layer does not depend on a module-level constant.
    """
    def __init__(self, batch_size=None):
        super().__init__()
        self.batch_size = batch_size

    def call(self, inputs):
        """Blend ``inputs[0]`` and ``inputs[1]`` with one random weight per sample."""
        if self.batch_size is None:
            # Take the batch dimension from the data itself.
            n_samples = tf.shape(inputs[0])[0]
        else:
            n_samples = self.batch_size
        # Shape (n, 1, 1, 1): the weight broadcasts over the remaining three axes.
        alpha = tf.random.uniform(shape=tf.stack([n_samples, 1, 1, 1]), maxval=1)
        return (alpha * inputs[0]) + ((1 - alpha) * inputs[1])
 
class WGAN_GP():
    """Wasserstein GAN with gradient penalty.

    Holds a generator, a critic and two compiled training models:
    ``critic_model`` updates the critic on real, generated and interpolated
    images while the generator is frozen, and ``combined`` updates the
    generator through the frozen critic.
    """

    def __init__(self):
        """Set the hyper-parameters, build both networks and compile the training models."""
        # Input shape
        self.img_rows = 128
        self.img_cols = 128
        self.channels = 3
        self.img_shape = (self.img_rows, self.img_cols, self.channels)
        # critic iterations for each gen update
        self.n_critic = 10
        self.num_classes = 5
        self.latent_dim = 128
        # gradient penalty weight
        self.LAMBDA = 10
        # one [d_loss, g_loss] entry per training iteration
        self.losslog = []
        # number of losslog entries that were already appended to loss.log
        self._n_logged = 0
        self.batch_size = 90

        # alternative optimizer: Adam(0.0001, beta_1=0.5, beta_2=0.9)
        optimizer = RMSprop(0.00005)

        # Build the generator and critic
        self.generator = self.build_generator()
        self.critic = self.build_critic()

        #-------------------------------
        # Construct Computational Graph
        #       for the Critic
        #-------------------------------

        # Freeze generator's layers while training critic
        self.generator.trainable = False

        # Image input (real sample)
        real_img = Input(shape=self.img_shape)

        # Noise input
        z_disc = Input(shape=(self.latent_dim,))
        # Generate image based of noise (fake sample)
        fake_img = self.generator(z_disc)

        # Critic scores the real and fake images
        fake = self.critic(fake_img)
        valid = self.critic(real_img)

        # Construct weighted average between real and fake images
        interpolated_img = RandomWeightedAverage()([real_img, fake_img])
        # Determine validity of weighted sample
        validity_interpolated = self.critic(interpolated_img)

        # Use Python partial to provide loss function with additional
        # 'averaged_samples' argument
        partial_gp_loss = partial(self.gradient_penalty_loss,
                                  averaged_samples=interpolated_img)
        partial_gp_loss.__name__ = 'gradient_penalty' # Keras requires function names

        self.critic_model = Model(inputs=[real_img, z_disc],
                                  outputs=[valid, fake, validity_interpolated])
        # Total critic loss = loss(real) + loss(fake) + LAMBDA * gradient penalty
        self.critic_model.compile(loss=[self.wasserstein_loss,
                                        self.wasserstein_loss,
                                        partial_gp_loss],
                                  optimizer=optimizer,
                                  loss_weights=[1, 1, self.LAMBDA])
        #-------------------------------
        # Construct Computational Graph
        #         for Generator
        #-------------------------------

        # For the generator we freeze the critic's layers
        self.critic.trainable = False
        self.generator.trainable = True

        # Noise input
        z_gen = Input(shape=(self.latent_dim,))
        # generate image
        img = self.generator(z_gen)
        # critic determines validity
        valid = self.critic(img)
        # The combined model (stacked generator and critic)
        # trains the generator to fool the critic
        self.combined = Model(z_gen, valid)
        # TODO (open question kept from the original code): loss weight to
        # force different imgs for different labels?
        self.combined.compile(loss=self.wasserstein_loss, optimizer=optimizer)

    def _compute_gradients(self, tensor, var_list):
        """Return d(tensor)/d(var) for every var, using zeros where no gradient exists."""
        # NOTE(review): per the TensorFlow docs tf.gradients is only valid in a
        # graph context -- confirm this holds for the TF version in use.
        grads = tf.gradients(tensor, var_list)
        return [grad if grad is not None else tf.zeros_like(var)
                for var, grad in zip(var_list, grads)]

    def gradient_penalty_loss(self, y_true, y_pred, averaged_samples):
        """
        Computes gradient penalty based on prediction and weighted real / fake samples.

        Args:
            y_true: Unused; only present because Keras losses take (y_true, y_pred).
            y_pred: Critic output for the interpolated samples.
            averaged_samples: The interpolated samples y_pred was computed from.

        Returns:
            Mean over the batch of (1 - ||grad||_2)^2. The weight LAMBDA is
            applied through ``loss_weights`` at compile time, not here.
        """
        gradients = self._compute_gradients(y_pred, [averaged_samples])[0]
        # compute the euclidean norm by squaring ...
        gradients_sqr = tf.math.square(gradients)
        #   ... summing over all axes except the batch axis ...
        gradients_sqr_sum = tf.math.reduce_sum(gradients_sqr,
                                  axis=np.arange(1, len(gradients_sqr.shape)))
        #   ... and sqrt
        gradient_l2_norm = tf.math.sqrt(gradients_sqr_sum)
        # (1 - ||grad||)^2 for each single sample
        gradient_penalty = tf.math.square(1 - gradient_l2_norm)
        # return the mean as loss over all the batch samples
        return tf.math.reduce_mean(gradient_penalty)

    def wasserstein_loss(self, y_true, y_pred):
        """Return mean(y_true * y_pred); train() passes y_true = -1 for real and +1 for fake."""
        return tf.math.reduce_mean(y_true * y_pred)

    def build_generator(self):
        """Build the generator mapping a latent vector to an image with tanh output.

        Returns:
            A Keras ``Model`` taking noise of shape (latent_dim,).
        """
        model = Sequential()

        # 128 * img_rows * 2 units; this equals the 8*8*512 values required by
        # the Reshape below only for img_rows == 128.
        model.add(Dense(128 * self.img_rows * 2, activation="relu", input_dim=self.latent_dim))
        model.add(Reshape((8, 8, 512)))
        model.add(UpSampling2D())
        #16X16
        model.add(Conv2D(512, kernel_size=4, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(UpSampling2D())
        #32x32
        model.add(Conv2D(256, kernel_size=4, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(UpSampling2D())
        #64x64
        model.add(Conv2D(128, kernel_size=4, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(UpSampling2D())
        #128x128
        model.add(Conv2D(64, kernel_size=4, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(self.channels, kernel_size=4, padding="same"))
        # tanh keeps the output in [-1, 1], the range train() scales real images to
        model.add(Activation("tanh"))

        model.summary()

        noise = Input(shape=(self.latent_dim,))

        img = model(noise)

        return Model(noise, img)

    def build_critic(self):
        """Build the critic mapping an image to one unbounded score.

        Returns:
            A Keras ``Model`` taking images of shape ``self.img_shape``.
        """
        model = Sequential()

        model.add(Conv2D(16, kernel_size=3, strides=2, padding="same", input_shape=self.img_shape))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(32, kernel_size=3, strides=2, padding="same"))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(64, kernel_size=3, strides=2, padding="same"))
        model.add(LayerNormalization(epsilon=1e-5))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(128, kernel_size=3, strides=2, padding="same"))
        model.add(LayerNormalization(epsilon=1e-5))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(256, kernel_size=3, strides=1, padding="same"))
        model.add(LayerNormalization(epsilon=1e-5))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(512, kernel_size=3, strides=1, padding="same"))
        model.add(LayerNormalization(epsilon=1e-5))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Flatten())
        # single linear output: a score, not a probability
        model.add(Dense(1))

        model.summary()

        img = Input(shape=self.img_shape)

        # Determine validity of the image
        validity = model(img)

        return Model(img, validity)

    def train(self, epochs, sample_interval=100, start_epoch=0, preiterations=0, batch_size=None):
        """Train critic and generator alternately.

        Each "epoch" is one iteration: ``self.n_critic`` critic updates
        followed by one generator update.

        Args:
            epochs: Iteration index at which training stops (exclusive).
            sample_interval: Every this many iterations the models and a grid
                of samples are saved, one test batch is evaluated and the new
                loss entries are appended to ``loss.log``.
            start_epoch: First iteration index; if greater than 0 the saved
                generator and critic weights are loaded before training.
            preiterations: Number of critic-only updates run before the main loop.
            batch_size: Samples per batch; ``None`` (default) uses ``self.batch_size``.
        """
        # A default of `self.batch_size` cannot be written in the signature
        # (self does not exist when the class body is evaluated), so resolve it here.
        if batch_size is None:
            batch_size = self.batch_size
        if start_epoch > 0:
            self.generator.load_weights(RES_PATH + "saved_model/generator_weights.hdf5")
            self.critic.load_weights(RES_PATH + "saved_model/discriminator_weights.hdf5")
        # Dataset iterator
        train_gen, dev_gen = wikiart_genre.load(batch_size)
        train_gen = train_gen()
        test_gen = dev_gen()
        # Adversarial ground truths
        valid = -np.ones((batch_size, 1))
        fake = np.ones((batch_size, 1))
        dummy = np.zeros((batch_size, 1)) # Dummy gt for gradient penalty

        # pretrain the critic if desired
        for iterp in range(preiterations):
            # Get the next batch of images.
            imgs, labels = next(train_gen)
            # scale between -1 and 1 to match gen output
            imgs = (imgs.astype(np.float32) - 127.5) / 127.5
            # Sample noise as generator input
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

            # Train the critic
            d_loss = self.critic_model.train_on_batch([imgs, noise],
                                                      [valid, fake, dummy])
            if (iterp+1) % 100 == 0:
                print ("%d [D loss: %f] [D loss label: %f]" % (iterp, d_loss[0], d_loss[1]))

        for epoch in range(start_epoch, epochs):

            # ---------------------
            #  Train Critic
            # ---------------------
            for _ in range(self.n_critic):

                imgs, labels = next(train_gen)
                # scale between -1 and 1 to match gen output
                imgs = (imgs.astype(np.float32) - 127.5) / 127.5
                # Sample noise as generator input
                noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

                # Train the critic
                d_loss = self.critic_model.train_on_batch([imgs, noise],
                                                          [valid, fake, dummy])
            # ---------------------
            #  Train Generator
            # ---------------------

            # reuses the noise batch of the last critic update
            g_loss = self.combined.train_on_batch(noise, valid)

            # Print the progress
            print ("%d [D loss: %f] [G loss: %f]" % (epoch, d_loss[0], g_loss))
            self.losslog.append([d_loss, g_loss])

            # If at save interval => save generated image samples and models
            if (epoch+1) % sample_interval == 0:
                # save model and images
                self.save_model()
                self.sample_images(epoch)
                # evaluate on test set
                imgs, labels = next(test_gen)
                # scale between -1 and 1 to match gen output
                imgs = (imgs.astype(np.float32) - 127.5) / 127.5
                # Sample noise as generator input
                noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

                # Test the critic
                d_loss_test = self.critic_model.test_on_batch([imgs, noise],
                                                              [valid, fake, dummy])
                # Test the generator
                g_loss_test = self.combined.test_on_batch(noise, valid)

                # Print the progress
                print ("%d [D loss test: %f] [G loss test: %f]" % (epoch, d_loss_test[0], g_loss_test))
                # The file is opened in append mode, so only write the entries
                # added since the last flush; rewriting the whole list would
                # duplicate earlier lines.
                with open(RES_PATH + 'loss.log', 'a') as f:
                    for each in self.losslog[self._n_logged:]:
                        f.write('%s, %s\n' % (each[0], each[1]))
                self._n_logged = len(self.losslog)

    def sample_images(self, epoch):
        """Save a 10x10 grid of generated images as ``images_<epoch>.jpg`` under RES_PATH."""
        r, c = 10, 10
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_imgs = self.generator.predict(noise)
        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5
        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i,j].imshow(gen_imgs[cnt,:,:,:])
                axs[i,j].axis('off')
                cnt += 1
        fig.savefig(RES_PATH + "images_%d.jpg" % epoch)
        # close the figure so figures do not accumulate over a training run
        plt.close()

    def generate_image(self):
        """Load the saved generator weights and show one generated image."""
        self.generator.load_weights(RES_PATH + "saved_model/generator_weights.hdf5")
        noise = np.random.normal(0, 1, (1, self.latent_dim))
        gen_img = self.generator.predict(noise)

        # Rescale images 0 - 1
        gen_img = 0.5 * gen_img + 0.5
        plt.imshow(gen_img[0,:,:,:])
        plt.axis('off')

    def plot_images(self):
        """Load the saved generator weights and save a 5x5 grid as ``generated_images.jpg``."""
        r, c = 5, 5
        self.generator.load_weights(RES_PATH + "saved_model/generator_weights.hdf5")
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_imgs = self.generator.predict(noise)
        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5
        fig, axs = plt.subplots(r, c, figsize=(6,6))
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i,j].imshow(gen_imgs[cnt,:,:,:])
                axs[i,j].axis('off')
                cnt += 1
        fig.savefig(RES_PATH + "generated_images.jpg")

    def save_model(self):
        """Write architecture (JSON) and weights (HDF5) of generator and critic to ``saved_model/``."""

        def save(model, model_name):
            model_path = RES_PATH + "saved_model/%s.json" % model_name
            weights_path = RES_PATH + "saved_model/%s_weights.hdf5" % model_name
            # make sure the target folder exists before the first save
            os.makedirs(os.path.dirname(model_path), exist_ok=True)
            # the context manager closes the JSON file deterministically
            with open(model_path, 'w') as arch_file:
                arch_file.write(model.to_json())
            model.save_weights(weights_path, overwrite=True)

        save(self.generator, "generator")
        # the critic is stored under the name "discriminator", which is the
        # file name train() loads from
        save(self.critic, "discriminator")

Initialize and train the model

In [None]:
# Instantiating the class builds and compiles the generator, the critic and
# both training models (and prints the two model summaries).
wgan = WGAN_GP()

In [None]:
# 10000 training iterations; models, sample grids and the loss log are saved
# every 500 iterations.
wgan.train(epochs=10000, sample_interval=500)

In [None]:
# plot and save 25 sample generated images
# (plot_images reloads the generator weights saved during training)
wgan.plot_images()

**Build and train the Classifier**

In [None]:
from tensorflow import keras
from tensorflow.keras import layers

Construct and balance the data set

In [None]:
# Configuration for the classifier data pipeline.
BATCHSIZE = 128
# Edge length (pixels) the images are resized to when the datasets are built.
# DIM is used by the following cells but was not defined anywhere in the
# notebook; 64 matches the 64.zip archive unpacked above -- set it to 128
# when working with the 128x128 data set.
DIM = 64

# Root folder with one sub-folder per class.
data_dir = Path("Images/")
image_count = len(list(data_dir.glob('*/*.jpg')))
print(image_count)

In [None]:
# Training split: the "training" part of a 0.2 validation split with a fixed
# seed, one-hot labels, images resized to DIM x DIM.
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=data_dir,
    subset="training",
    validation_split=0.2,
    seed=123,
    label_mode='categorical',
    batch_size=BATCHSIZE,
    image_size=(DIM, DIM),
)

In [None]:
# Validation split: same split fraction and seed as the training split, so
# the two subsets come from the same partition of the files.
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=data_dir,
    subset="validation",
    validation_split=0.2,
    seed=123,
    label_mode='categorical',
    batch_size=BATCHSIZE,
    image_size=(DIM, DIM),
)

In [None]:
# One endlessly repeating stream of single training samples per class.
# Each stream keeps only the samples whose one-hot label is set at the given index.
# NOTE(review): which class each index stands for depends on train_ds.class_names;
# verify that the variable names match that order.
imp_train_ds = train_ds.unbatch().filter(lambda image, one_hot: one_hot[0] == 1).repeat()
exp_train_ds = train_ds.unbatch().filter(lambda image, one_hot: one_hot[1] == 1).repeat()
gogh_train_ds = train_ds.unbatch().filter(lambda image, one_hot: one_hot[2] == 1).repeat()
cez_train_ds = train_ds.unbatch().filter(lambda image, one_hot: one_hot[3] == 1).repeat()
gaug_train_ds = train_ds.unbatch().filter(lambda image, one_hot: one_hot[4] == 1).repeat()

In [None]:
# One endlessly repeating stream of single validation samples per class.
# Each stream keeps only the samples whose one-hot label is set at the given index.
# NOTE(review): which class each index stands for depends on the dataset's
# class_names; verify that the variable names match that order.
imp_val_ds = val_ds.unbatch().filter(lambda image, one_hot: one_hot[0] == 1).repeat()
exp_val_ds = val_ds.unbatch().filter(lambda image, one_hot: one_hot[1] == 1).repeat()
gogh_val_ds = val_ds.unbatch().filter(lambda image, one_hot: one_hot[2] == 1).repeat()
cez_val_ds = val_ds.unbatch().filter(lambda image, one_hot: one_hot[3] == 1).repeat()
gaug_val_ds = val_ds.unbatch().filter(lambda image, one_hot: one_hot[4] == 1).repeat()

In [None]:
# Balance the classes: draw from the five per-class streams with equal
# probability and re-batch. The streams repeat forever, so the resulting
# datasets are infinite.
resampled_train_ds = tf.data.experimental.sample_from_datasets(
    [imp_train_ds, exp_train_ds, gogh_train_ds, cez_train_ds, gaug_train_ds],
    weights=[0.2, 0.2, 0.2, 0.2, 0.2],
).batch(BATCHSIZE)

resampled_val_ds = tf.data.experimental.sample_from_datasets(
    [imp_val_ds, exp_val_ds, gogh_val_ds, cez_val_ds, gaug_val_ds],
    weights=[0.2, 0.2, 0.2, 0.2, 0.2],
).batch(BATCHSIZE)

Print sample images

In [None]:
# Show the first 25 images of one balanced batch, titled with their class name.
class_names = train_ds.class_names
print(class_names)
plt.figure(figsize=(10, 10))
for images, labels in resampled_train_ds.take(1):
    for i in range(25):
        # position of the 1 in the one-hot label = index into class_names
        class_index = np.where(labels[i] == 1)[0][0]
        ax = plt.subplot(5, 5, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(class_names[class_index])
        plt.axis("off")
plt.savefig(RES_PATH + 'example_images.png')

Prepare for training - this uses the unbalanced data set for now

In [None]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Cache the batches, shuffle them and prefetch.
# NOTE: train_ds / val_ds are rebound here, so re-running this cell stacks
# another cache/shuffle/prefetch on top of the previous one.
train_ds = train_ds.cache()
train_ds = train_ds.shuffle(10000)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

val_ds = val_ds.cache()
val_ds = val_ds.prefetch(buffer_size=AUTOTUNE)

In [None]:
# Random augmentation applied in front of the classifier:
# horizontal flip, rotation (factor 0.1) and zoom (factor 0.1).
augmentation_layers = [
    layers.experimental.preprocessing.RandomFlip("horizontal", input_shape=(DIM, DIM, 3)),
    layers.experimental.preprocessing.RandomRotation(0.1),
    layers.experimental.preprocessing.RandomZoom(0.1),
]
data_augmentation = keras.Sequential(augmentation_layers)

In [None]:
num_classes = 5

# Small CNN classifier: augmentation and rescaling to [0, 1], three
# conv + max-pool stages, dropout, then a dense head with softmax output.
classifier_layers = [
    data_augmentation,
    layers.experimental.preprocessing.Rescaling(1./255),
    layers.Conv2D(16, 3, padding='same', activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(32, 3, padding='same', activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, padding='same', activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(0.2),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dense(num_classes, activation='softmax'),
]
cls = tf.keras.models.Sequential(classifier_layers)

In [None]:
# Categorical cross-entropy fits the one-hot labels and the softmax output.
cls.compile(
    optimizer='adam',
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy'],
)

cls.summary()

In [None]:
# Train the classifier; `history` keeps the per-epoch metrics for the plots below.
epochs = 50
history = cls.fit(train_ds, epochs=epochs, validation_data=val_ds)

Plot training results

In [None]:
# Per-epoch metrics recorded by cls.fit (accuracy values are fractions in [0, 1]).
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

# One x position per recorded epoch; taking the length from the history keeps
# the plot valid even if fewer than `epochs` epochs were recorded.
epochs_range = range(len(acc))

fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(8, 8))

# The axis is labelled in percent, so convert the fractions before plotting.
ax_acc.plot(epochs_range, [100 * a for a in acc], label='Training Accuracy')
ax_acc.plot(epochs_range, [100 * a for a in val_acc], label='Validation Accuracy')
# No legend on this panel (it was disabled in the original); the curves are
# drawn in the same order as in the loss panel, whose legend applies.
ax_acc.set_ylabel('accuracy [%]')
ax_acc.set_xlabel('epochs')
ax_acc.set_title('Accuracy')

ax_loss.plot(epochs_range, loss, label='Training')
ax_loss.plot(epochs_range, val_loss, label='Validation')
ax_loss.legend(loc='upper right')
ax_loss.set_title('Loss')
ax_loss.set_ylabel('loss')
ax_loss.set_xlabel('epochs')

fig.savefig(RES_PATH + 'training_cls.jpg')

**Latent Space Experiments** <br>

Work in progress