In [1]:
# Elle McFarlane
import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import numpy as np
import shutil
import time

%matplotlib inline

# Query the visible GPUs once. `tf.test.is_gpu_available()` is deprecated and
# its own warning says to use `tf.config.list_physical_devices('GPU')` instead.
gpu_devices = tf.config.list_physical_devices('GPU')
print("GPU Available:", bool(gpu_devices))

# Device string used later with `tf.device(...)`; fall back to the CPU when no
# GPU is visible.
device_name = tf.test.gpu_device_name() if gpu_devices else 'CPU:0'

Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.
GPU Available: True


In [2]:
# Extract the drawings archive; with no extract_dir given, shutil unpacks it
# into the current working directory.
drawings_archive_path = '/content/avantgarde_drawings.zip'
shutil.unpack_archive(filename=drawings_archive_path)

In [5]:
# Config class needed for DataManager and GAN classes
class Config:
    """Namespace for loading JSON configuration files."""

    @staticmethod
    def from_json(config_path):
        """Read a JSON file and return its parsed content.

        Declared as a static method so it can be called both as
        ``Config.from_json(path)`` and on an instance.

        Args:
            config_path: path of the JSON file to read.

        Returns:
            The decoded JSON document (whatever ``json.load`` produces).
        """
        # Imported locally so this cell does not depend on a later cell
        # having imported `json` first.
        import json

        with open(config_path, "r") as config_file:
            return json.load(config_file)

In [6]:
# DataManager class for Image, Z-vec Data
from keras.preprocessing.image import ImageDataGenerator
import PIL
import json

class DataManager:
    """Loads training images and pairs each image batch with noise vectors.

    Settings are read from a JSON config file with the keys ``image_dims``,
    ``n_pnts``, ``batch_size``, ``z_size``, ``z_vec_mode`` and ``data_dir``.
    After ``prepare_data()`` two in-memory lists of
    ``(z_batch, (image_batch, label_batch))`` tuples are available: one
    iterated by the main training loop and one sampled for extra
    discriminator steps.
    """

    def __init__(self, config_file):
        """Read the config and draw the fixed noise batch.

        Args:
            config_file: path to the JSON data configuration.

        Raises:
            ValueError: if ``z_vec_mode`` is neither 'uniform' nor 'normal'.
        """
        config = Config.from_json(config_file)
        self.image_dims = config['image_dims']
        self.n_pnts = config['n_pnts']
        self.batch_size = config['batch_size']
        self.n_batches = config['n_pnts']//self.batch_size
        self.z_size = config['z_size']
        self.vec_mode = config['z_vec_mode']
        self.data_dir = config['data_dir']

        # One noise batch drawn once and kept, returned by get_fixed_z().
        self.fixed_z = self._sample_z((self.batch_size, self.z_size))

    def _sample_z(self, shape):
        """Draw a noise tensor of `shape` from the configured distribution."""
        if self.vec_mode == 'uniform':
            return tf.random.uniform(shape=shape, minval=-1.0, maxval=1.0)
        if self.vec_mode == 'normal':
            return tf.random.normal(shape=shape)
        # Fail with a clear message instead of leaving the result undefined.
        raise ValueError(
            "z_vec_mode must be 'uniform' or 'normal', got {!r}".format(
                self.vec_mode))

    def _load_paired_batches(self, input_zs):
        """Read every image batch from `data_dir` and zip it with `input_zs`.

        Returns:
            A list of ``(z_batch, (image_batch, label_batch))`` numpy tuples.
            Its length is bounded by the number of batches in `input_zs`
            because the two datasets are zipped.
        """
        dim0, dim1 = self.image_dims[0], self.image_dims[1]

        datagen = ImageDataGenerator(preprocessing_function=DataManager.rescale)
        img_batches = datagen.flow_from_directory(
            self.data_dir,
            target_size=(dim0, dim1),
            batch_size=self.batch_size,
            shuffle=False)

        # The spatial dims must mirror target_size; the original repeated the
        # first dim for both axes, which is only right for square images.
        # NOTE(review): 3 channels and a label width of 1 are hard-coded --
        # confirm against the image directory layout.
        img_dataset = tf.data.Dataset.from_generator(
            lambda: img_batches,
            output_types=(tf.float32, tf.float32),
            output_shapes=([None, dim0, dim1, 3],
                           [None, 1]))

        paired = tf.data.Dataset.zip((input_zs, img_dataset))
        return list(paired.as_numpy_iterator())

    def prepare_data(self):
        """Materialise and shuffle the shared and discriminator-only batches."""
        input_zs = self.get_noise_vecs()
        shared_data_iter = self._load_paired_batches(input_zs)
        disc_data_iter = self._load_paired_batches(input_zs)

        # shuffle data (batch order only; batch contents stay together)
        np.random.shuffle(shared_data_iter)
        np.random.shuffle(disc_data_iter)

        self.shared_data_iter = shared_data_iter
        self.disc_data_iter = disc_data_iter

    @staticmethod
    def rescale(image):
        "puts color values in [-1,1] range"
        return (image/255.)*2.-1

    def get_noise_vector(self):
        'noise vector for the generator input'
        return self._sample_z((self.z_size,))

    def get_noise_vecs(self):
        """Return a batched dataset holding `n_pnts` noise vectors."""
        # The integer values are only placeholders: each element is replaced
        # by a noise vector in the map below.
        X = list(range(self.n_pnts))
        input_zs = tf.data.Dataset.from_tensor_slices(X)
        input_zs = input_zs.map(lambda x: self.get_noise_vector())
        input_zs = input_zs.batch(self.batch_size, drop_remainder=False)
        return input_zs

    @staticmethod
    def is_aligned(data):
        "makes sure each image has a z_vector in each batch"
        return all(len(z_vecs) == len(imgs) for z_vecs, (imgs, _) in data)

    def shuffle_shared_data(self):
        """Shuffle the order of the shared batches in place."""
        np.random.shuffle(self.shared_data_iter)

    def get_shared_data_iter(self):
        """Return the list of shared ``(z, (images, labels))`` batches."""
        return self.shared_data_iter

    def get_disc_data(self, n_pnt_to_retrieve):
        """Yield the first `n_pnt_to_retrieve` batches after a reshuffle.

        This is a generator: the reshuffle happens when iteration starts.
        An IndexError is raised if more batches are requested than exist.
        """
        np.random.shuffle(self.disc_data_iter)
        for i in range(n_pnt_to_retrieve):
            yield self.disc_data_iter[i]

    def get_fixed_z(self):
        """Return the noise batch drawn once in ``__init__``."""
        return self.fixed_z

    def get_batch_size(self):
        """Return the configured batch size."""
        return self.batch_size

    def is_data_valid(self):
        """True when every batch in both lists has one z-vector per image."""
        return DataManager.is_aligned(self.disc_data_iter) and \
              DataManager.is_aligned(self.shared_data_iter)

In [11]:
# Classes for both GAN networks
from abc import ABC, abstractmethod
import json

class BaseModel(ABC):
    """Common base for the GAN networks.

    Loads a JSON config and derives two values from it: ``size_factor``
    (``2 ** n_blocks``) and ``hidden.size`` (the first two entries of
    ``img_size`` integer-divided by that factor). Subclasses must implement
    every abstract method below.
    """

    def __init__(self, config_file):
        """Load `config_file` and store the augmented config on the instance."""
        self.config = cfg = Config.from_json(config_file)

        factor = 2**cfg['n_blocks']
        cfg['size_factor'] = factor

        img_size = cfg['img_size']
        cfg['hidden']['size'] = (img_size[0]//factor, img_size[1]//factor)

    @abstractmethod
    def build(self):
        """Assemble the complete network."""
        pass

    @abstractmethod
    def add_output_layer(self):
        """Append the final layer(s) to the network."""
        pass

    @abstractmethod
    def build_model_with_base_layers(self):
        """Create the network with its first layers."""
        pass

    @abstractmethod
    def train(self):
        """Run one optimisation step."""
        pass

class GeneratorDCGAN(BaseModel):
    """Generator network: maps noise vectors to images.

    The architecture is read from the JSON config: a dense projection that is
    reshaped to ``hidden.size``, then ``n_blocks`` transposed-convolution
    blocks that halve the filter count each time, then an output transposed
    convolution with ``img_size[2]`` filters.
    """

    def __init__(self, config):
        """Load the config (a JSON file path) and create the Adam optimizer."""
        super().__init__(config)
        self.optimizer = tf.keras.optimizers.Adam(
            learning_rate=self.config['optimizer']['learning_rate'],
            beta_1=self.config['optimizer']['beta_1'],
            beta_2=self.config['optimizer']['beta_2'])

    def set_discriminator(self, disc_model):
        """Register the discriminator whose score drives the generator loss."""
        self.disc_model = disc_model

    def get_discriminator(self):
        """Return the discriminator set via ``set_discriminator``."""
        return self.disc_model

    def add_output_layer(self):
        """Append the transposed convolution that emits the image channels."""
        self.model.add(
            tf.keras.layers.Conv2DTranspose(
                filters=self.config['img_size'][2],
                kernel_size=self.config['output']['kernel_size'],
                strides=self.config['output']['strides'],
                padding=self.config['output']['padding'],
                use_bias=self.config['output']['use_bias'],
                activation=self.config['output']['activation']))

    def build_model_with_base_layers(self):
        """Create ``self.model`` with the dense projection and first tconv."""
        hidden_size = self.config['hidden']['size']
        # np.prod returns a numpy integer; cast so Keras gets a plain int.
        dense_units = int(self.config['n_filters'] * np.prod(hidden_size))

        partial_model = tf.keras.Sequential([
                  tf.keras.layers.Input(shape=(self.config['z_size'])),
                  tf.keras.layers.Dense(
                      units=dense_units,
                      use_bias=self.config['base']['dense']['use_bias']),
                  tf.keras.layers.BatchNormalization(),
                  tf.keras.layers.LeakyReLU(),
                  tf.keras.layers.Reshape(
                      (hidden_size[0],
                      hidden_size[1],
                      self.config['n_filters'])),
                  tf.keras.layers.Conv2DTranspose(
                      filters=self.config['n_filters'],
                      kernel_size=self.config['base']['tconv']['kernel_size'],
                      strides=self.config['base']['tconv']['strides'],
                      padding=self.config['base']['tconv']['padding'],
                      use_bias=self.config['base']['tconv']['use_bias']),
                  tf.keras.layers.BatchNormalization(),
                  tf.keras.layers.LeakyReLU()
              ])
        self.model = partial_model

    def add_hidden_blocks(self):
        """Append ``n_blocks`` tconv/batch-norm/LeakyReLU blocks.

        The filter count is halved before each block.
        """
        curr_n_filters = self.config['n_filters']
        for _ in range(self.config['n_blocks']):
            curr_n_filters = curr_n_filters // 2
            self.model.add(
              tf.keras.layers.Conv2DTranspose(
              filters=curr_n_filters,
              kernel_size=self.config['hidden']['kernel_size'],
              strides=self.config['hidden']['strides'],
              padding=self.config['hidden']['padding'],
              use_bias=self.config['hidden']['use_bias']))
            self.model.add(tf.keras.layers.BatchNormalization())
            self.model.add(tf.keras.layers.LeakyReLU())

    def build(self):
        """Assemble the full generator: base layers, hidden blocks, output."""
        self.build_model_with_base_layers()
        self.add_hidden_blocks()
        self.add_output_layer()

    def train(self, input_vec):
        """Run one generator update and return its loss.

        The loss is the negated mean discriminator score on images generated
        from `input_vec`.

        Args:
            input_vec: batch of noise vectors fed to the generator.

        Returns:
            The scalar loss tensor for this step.
        """
        with tf.GradientTape() as tape:
            fake_img = self.__train(input_vec)
            disc_pred = self.get_discriminator().predict(fake_img)
            loss = -tf.math.reduce_mean(disc_pred)

        # Use this instance's own variables and optimizer; the original
        # reached for the notebook-level global `gen_model` here.
        grads = tape.gradient(loss, self.get_trainable_variables())
        self.apply_gradients(grads)
        return loss

    def __train(self, input_vec):
        """Forward pass in training mode."""
        return self.model(input_vec, training=True)

    def apply_gradients(self, grads):
        """Apply `grads` to the model's trainable variables."""
        self.get_optimizer().apply_gradients(
          grads_and_vars=zip(grads, self.model.trainable_variables))

    def generate(self, input_vec):
        """Forward pass in inference mode; returns the generated images."""
        img = self.model(input_vec, training=False)
        return img

    def get_trainable_variables(self):
        """Return the trainable variables of the underlying Keras model."""
        return self.model.trainable_variables

    def get_optimizer(self):
        """Return the optimizer created in ``__init__``."""
        return self.optimizer

    def create_samples(self, input_z, batch_size):
        """Generate `batch_size` images from `input_z`, scaled by (x+1)/2."""
        fake_imgs = self.generate(input_z)
        images = tf.reshape(fake_imgs, (batch_size, *self.config['img_size']))
        # convert scale from [-1,1] back to [0,1]
        return (images+1)/2.0

class DiscriminatorDCGAN(BaseModel):
    """Discriminator (critic) network: maps images to one score each.

    The architecture is read from the JSON config: a base convolution, then
    ``n_blocks`` convolution blocks that double the filter count each time,
    then an output convolution reshaped to a single value per image.
    Training uses a Wasserstein-style loss with a gradient penalty weighted
    by ``lambda_gp``.
    """

    def __init__(self, config):
        """Load the config (a JSON file path) and create the Adam optimizer."""
        super().__init__(config)
        self.optimizer = tf.keras.optimizers.Adam(
            learning_rate=self.config['optimizer']['learning_rate'],
            beta_1=self.config['optimizer']['beta_1'],
            beta_2=self.config['optimizer']['beta_2'])

    def set_generator(self, gen_model):
        """Register the generator used to produce fake images in ``train``."""
        self.gen_model = gen_model

    def get_generator(self):
        """Return the generator set via ``set_generator``."""
        return self.gen_model

    def build(self):
        """Assemble the full network: base layers, hidden blocks, output."""
        self.build_model_with_base_layers()
        self.add_hidden_blocks()
        self.add_output_layer()

    def build_model_with_base_layers(self):
        """Create ``self.model`` with the input and first convolution block."""
        partial_model = tf.keras.Sequential([
                  tf.keras.layers.Input(shape=(self.config['img_size'])),
                  tf.keras.layers.Conv2D(
                      filters=self.config['base']['conv']['n_filters'],
                      kernel_size=self.config['base']['conv']['kernel_size'],
                      strides=self.config['base']['conv']['strides'],
                      padding=self.config['base']['conv']['padding']),
                  tf.keras.layers.BatchNormalization(),
                  tf.keras.layers.LeakyReLU()
              ])
        self.model = partial_model

    def add_hidden_blocks(self):
        """Append ``n_blocks`` conv/batch-norm/LeakyReLU/dropout blocks.

        The filter count is doubled before each block.
        """
        curr_n_filters = self.config['n_filters']
        for _ in range(self.config['n_blocks']):
            curr_n_filters = curr_n_filters * 2
            self.model.add(
                tf.keras.layers.Conv2D(
                    filters=curr_n_filters,
                    kernel_size=self.config['hidden']['kernel_size'],
                    strides=self.config['hidden']['strides'],
                    padding=self.config['hidden']['padding']))
            self.model.add(tf.keras.layers.BatchNormalization())
            self.model.add(tf.keras.layers.LeakyReLU())
            self.model.add(tf.keras.layers.
                           Dropout(self.config['hidden']
                                              ['dropout_rate']))

    def add_output_layer(self):
        """Append the output convolution and flatten it to one value."""
        # The kernel covers the whole `hidden.size` feature map.
        # NOTE(review): Reshape((1,)) requires `output.n_filters` and
        # `output.padding` to yield exactly one value per image -- confirm
        # against the config.
        self.model.add(tf.keras.layers.Conv2D(
          filters=self.config['output']['n_filters'],
          kernel_size=self.config['hidden']['size'],
          padding=self.config['output']['padding']))
        self.model.add(tf.keras.layers.Reshape((1,)))

    def train(self, real_img, input_z):
        """Run one discriminator update.

        Args:
            real_img: batch of real images.
            input_z: batch of noise vectors used to generate fake images.

        Returns:
            ``(tot_loss, loss_real, loss_fake)`` where ``tot_loss`` is
            ``loss_real + loss_fake + lambda_gp * gradient_penalty``.
        """
        # All model access goes through `self`; the original referenced the
        # notebook-level global `disc_model` throughout this method.
        with tf.GradientTape() as tape:
            fake_img = self.get_generator().generate(input_z)
            pred_real = self.__train(real_img)
            pred_fake = self.__train(fake_img)

            # Compute losses
            loss_real = -tf.math.reduce_mean(pred_real)
            loss_fake =  tf.math.reduce_mean(pred_fake)
            tot_loss = loss_real + loss_fake

            # Calculate gradient penalty on random interpolations between
            # real and fake images.
            with tf.GradientTape() as gp_tape:
                alpha = tf.random.uniform(
                    shape=[pred_real.shape[0], 1, 1, 1],
                    minval=0, maxval=1.0)
                interpolated = (alpha*real_img + (1-alpha)*fake_img)
                gp_tape.watch(interpolated)
                # NOTE(review): scored in inference mode (training=False),
                # unlike pred_real/pred_fake above -- confirm this is intended.
                critic_intp = self.predict(interpolated)

            grads_intp = gp_tape.gradient(
                critic_intp, [interpolated,])[0]
            grads_intp_l2 = tf.sqrt(
                tf.reduce_sum(tf.square(grads_intp), axis=[1, 2, 3]))
            grad_penalty = tf.reduce_mean(tf.square(grads_intp_l2 - 1.0))

            # Apply gradient penalty. The original `tot_loss += tot_loss + ...`
            # doubled the Wasserstein term before adding the penalty.
            tot_loss = tot_loss + self.config['lambda_gp']*grad_penalty

        # Compute and apply the gradients
        grads = tape.gradient(tot_loss, self.get_trainable_variables())
        self.apply_gradients(grads)

        return tot_loss, loss_real, loss_fake

    def __train(self, input_img):
        """Forward pass in training mode."""
        return self.model(input_img, training=True)

    def apply_gradients(self, grads):
        """Apply `grads` to the model's trainable variables."""
        self.get_optimizer().apply_gradients(
          grads_and_vars=zip(grads, self.model.trainable_variables))

    def predict(self, input_img):
        """Forward pass in inference mode; returns the scores."""
        verdict = self.model(input_img, training=False)
        return verdict

    def get_trainable_variables(self):
        """Return the trainable variables of the underlying Keras model."""
        return self.model.trainable_variables

    def get_optimizer(self):
        """Return the optimizer created in ``__init__``."""
        return self.optimizer

In [18]:
if __name__ == "__main__":
    # Read the data config, then load and shuffle every image/noise batch
    data_manager = DataManager(config_file='data_config.json')
    data_manager.prepare_data()

Found 9930 images belonging to 1 classes.
Found 9930 images belonging to 1 classes.


In [19]:
    # Create and build both networks on the selected device, then give each
    # one a reference to the other (each `train` method calls its counterpart).
    with tf.device(device_name):
        gen_model = GeneratorDCGAN(config='generator_config.json')
        gen_model.build()

        disc_model = DiscriminatorDCGAN(config='discriminator_config.json')
        disc_model.build()

        disc_model.set_generator(gen_model)
        gen_model.set_discriminator(disc_model)

In [20]:
    all_losses = []      # one list of per-batch loss tuples per epoch
    epoch_samples = []   # images generated from the fixed noise after each epoch
    critic_rounds_per_generator = 5
    n_epochs = 200

    start_time = time.time()
    for epoch in range(1, n_epochs+1):
        epoch_losses = []
        data_manager.shuffle_shared_data()
        for batch_i, (z_vec, (real_img, _)) in enumerate(data_manager.get_shared_data_iter()):
            if (batch_i % 50) == 0:
                print("batch", batch_i)
            # only train the discriminator
            for bonus_z_vec, (bonus_real_img, _) in data_manager.get_disc_data(critic_rounds_per_generator):
                disc_model.train(bonus_real_img, bonus_z_vec)
            # train both discriminator and generator
            d_loss, d_loss_real, d_loss_fake = disc_model.train(real_img, z_vec)
            g_loss = gen_model.train(z_vec)

            epoch_losses.append(
                (g_loss.numpy(), d_loss.numpy(),
                  d_loss_real.numpy(), d_loss_fake.numpy()))

        # Record the epoch once. The original appended inside the batch loop,
        # which stored the same list once per batch.
        all_losses.append(epoch_losses)

        print('Epoch {:-3d} | Est. Time {:.2f} min | Avg Losses >>'
              ' G/D {:6.2f}/{:6.2f} [D-Real: {:6.2f} D-Fake: {:6.2f}]'
              .format(epoch, (time.time() - start_time)/60,
                      *list(np.mean(all_losses[-1], axis=0)))
        )

        epoch_samples.append(
            gen_model.create_samples(data_manager.get_fixed_z(),
                                    data_manager.get_batch_size()).numpy()
        )

batch 0


KeyboardInterrupt: ignored