In [7]:
# disable compiler warnings
import os

# imports 
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.python.keras.layers import Dense
from typing import List
import datetime
from tqdm.notebook import tqdm

from tensorflow.python.client import device_lib
#os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'  # FATAL
print("Num GPUs Available: ", tf.config.list_physical_devices('GPU'))

Num GPUs Available:  []


In [8]:
import urllib
categories = [line.rstrip(b'\n') for line in urllib.request.urlopen('https://raw.githubusercontent.com/googlecreativelab/quickdraw-dataset/master/categories.txt')]
print(categories[:10])
category = 'candle'

[b'aircraft carrier', b'airplane', b'alarm clock', b'ambulance', b'angel', b'animal migration', b'ant', b'anvil', b'apple', b'arm']


In [9]:
# Creates a folder to download the original drawings into.
# We chose to use the numpy format : 1x784 pixel vectors, with values going from 0 (white) to 255 (black). We reshape them later to 28x28 grids and normalize the pixel intensity to [-1, 1]

if not os.path.isdir('npy_files'):
    os.mkdir('npy_files')
    
url = f'https://storage.googleapis.com/quickdraw_dataset/full/numpy_bitmap/{category}.npy'  
urllib.request.urlretrieve(url, f'npy_files/{category}.npy')

images = np.load(f'npy_files/{category}.npy')
print(f'{len(images)} images to train on')

# You can limit the amount of images you use for training by setting :
train_images = images[:10000]
# You should also define a samller subset of the images for testing..
val_images = images[10000:20000]

train_ds = tf.data.Dataset.from_tensors(train_images)
val_ds = tf.data.Dataset.from_tensors(val_images)

141545 images to train on


In [29]:
def prepare_data(dataset, batch_size):
  dataset = dataset.map(lambda img: tf.reshape(img, [28, 28, 1]))

  dataset = dataset.map(lambda img: tf.cast(img, tf.float32))
  
  dataset = dataset.map(lambda img: (img/128.)-1.)

  dataset = dataset.cache()
  dataset = dataset.shuffle(4096)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(tf.data.AUTOTUNE)

train_ds = prepare_data(train_ds, 32)
val_ds = prepare_data(val_ds, 32)

ValueError: ignored

In [23]:
class Discriminator(tf.keras.Model):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.slayers = [
                                           tf.keras.layers.Conv2D(filters=32,
                                                                  kernel_size=3,
                                                                  strides=(2,2),
                                                                  input_shape=(None,28,28,1)),
                                           tf.keras.layers.BatchNormalization(),
                                           tf.keras.layers.Activation('relu'),
                                           tf.keras.layers.Conv2D(filters=64,
                                                                  kernel_size=3,
                                                                  strides=(2,2)),
                                           tf.keras.layers.BatchNormalization(),
                                           tf.keras.layers.Activation('relu'),
                                           tf.keras.layers.Flatten(),
                                           # binary decision of fake/real data
                                           tf.keras.layers.Dense(1, activation='sigmoid')]

    def call(self, x, training=False):
        for layer in self.slayers:
            try:  # training argument only for BN layer
                x = layer(x, training) 
            except:
                x = layer(x)
        return x

In [21]:
class Generator(tf.keras.Model):
    def __init__(self, latent_dim=100, restore_shape=(7,7,64)):
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.slayers = [tf.keras.layers.Dense(units=int(tf.math.reduce_prod(restore_shape)),
                                             input_shape=(latent_dim,)),
                       tf.keras.layers.BatchNormalization(),
                       tf.keras.layers.Activation('relu'),
                       # reshape to 3 dim with depth dim again
                       tf.keras.layers.Reshape(target_shape=restore_shape),
                       # (2,2) strided transposed conv to upsample        
                       tf.keras.layers.Conv2DTranspose(filters=32,
                                                       kernel_size=(3,3),
                                                       strides=(2,2),
                                                       padding='same'),
                       tf.keras.layers.BatchNormalization(),
                       tf.keras.layers.Activation('relu'),
                       # restore image by convolution with image size
                       tf.keras.layers.Conv2DTranspose(filters=1,
                                                       kernel_size=(3,3),
                                                       strides=(2,2),
                                                       padding='same'),
                       tf.keras.layers.BatchNormalization(),
                       # use sigmoid to get values between 0 and 1
                       tf.keras.layers.Activation('sigmoid')]

    def call(self, x, training=False):
        for layer in self.slayers:
            try:  # training argument only for BN layer
                x = layer(x, training) 
            except:
                x = layer(x)
        return x


In [13]:
@tf.function
def training_step_GAN(data, model_gen, model_disc, disc_optimizer, gen_optimizer, metric_gen, metric_disc):
    bce = tf.keras.losses.BinaryCrossentropy()
    
    for batch_true in data:
        # Generate fake images
        noise_in_z = tf.random.normal([tf.shape(batch_true)[0], model_gen.latent_dim])

        
        # Compute loss/gradient for discriminator
        with tf.GradientTape() as tape_disc, tf.GradientTape() as tape_gen:          
            batch_gen = model_gen(noise_in_z, training=True)

            fake_pred = model_disc(batch_gen, training=True)
            true_pred = model_disc(batch_true, training=True)

            loss_disc = bce(tf.zeros_like(fake_pred), fake_pred) + bce(tf.ones_like(true_pred), true_pred) 
            loss_gen = bce(tf.ones_like(fake_pred), fake_pred) 

        # Gradient descent
        g_disc = tape_disc.gradient(loss_disc, model_disc.trainable_variables)
        g_gen = tape_gen.gradient(loss_gen, model_gen.trainable_variables)

        disc_optimizer.apply_gradients(zip(g_disc, model_disc.trainable_variables))
        gen_optimizer.apply_gradients(zip(g_gen, model_gen.trainable_variables))
        
        # Save mean loss values
        metric_gen.update_state(loss_gen)
        metric_disc.update_state(loss_disc)

def evaluation_step_GAN(data, model_gen, model_disc, latent_vectors, num_image=1):
    '''
    Plot generated image next to real image side by side
    '''
     
    for batch_img_true in data:
        if tf.shape(batch_img_true)[0] < num_image:
            print('NUM_IMAGE SHOULD BE UNDER BATCH SIZE')
            pass

        # iterate over num_image img
        for i,img_true in enumerate(batch_img_true[:num_image]): 
            noise_in_z = latent_vectors
            img_gen = model_gen(noise_in_z)
            return img_gen[0,:,:,:]

In [14]:
import time

class Timer():
    def __init__(self):
        self._start_time = None

    def start(self):
        if self._start_time is not None:
            print(f"Timer is running. Use .stop() to stop it")
            return None

        self._start_time = time.perf_counter()

    def stop(self):
        if self._start_time is None:
            print(f"Timer is not running. Use .start() to start it")
            return 0
    
        elapsed_time = time.perf_counter() - self._start_time
        self._start_time = None
        return elapsed_time

In [24]:
# Hyperparameters
epochs = 25
learning_rate_disc = 0.0001
learning_rate_gen = 0.0001
latent_dim = 100

tf.keras.backend.clear_session() #clear session from previous models
timer = Timer() # Instantiate the timer

# Instiante models
generator = Generator()
discriminator = Discriminator()
generator.build((None,latent_dim))
discriminator.build((None,28,28,1))
generator.summary()
discriminator.summary()

# Instantiate optimizer
adam_disc = tf.keras.optimizers.Adam(learning_rate_disc)
adam_gen = tf.keras.optimizers.Adam(learning_rate_gen)

Model: "generator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               multiple                  316736    
                                                                 
 batch_normalization (BatchN  multiple                 12544     
 ormalization)                                                   
                                                                 
 activation (Activation)     multiple                  0         
                                                                 
 reshape (Reshape)           multiple                  0         
                                                                 
 conv2d_transpose (Conv2DTra  multiple                 18464     
 nspose)                                                         
                                                                 
 batch_normalization_1 (Batc  multiple                 12

In [25]:
# take mean over different data points in the training loop
train_loss_gen = tf.keras.metrics.Mean('generator')
train_loss_disc = tf.keras.metrics.Mean('discriminator')

# initialize the logger for Tensorboard visualization
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/gradient_tape/' + current_time + '/train_GAN'      # defining the log dir
train_summary_writer = tf.summary.create_file_writer(train_log_dir)  # training logger

# Initialize lists for later visualization.
train_losses_disc = []
train_losses_gen = []
times = []

In [26]:
latent_vectors = tf.expand_dims(tf.random.normal([generator.latent_dim]),0)

for epoch in range(epochs):
    print(f'\n[EPOCH] ____________________{epoch}____________________')
    
    # Training
    timer.start()
    training_step_GAN(train_ds, generator, discriminator, adam_disc, adam_gen,  train_loss_disc, train_loss_gen)
    
    # logging our metrics to a file which is used by tensorboard
    with train_summary_writer.as_default():     
        tf.summary.scalar('discriminator', train_loss_disc.result(), step=epoch)
        tf.summary.scalar('generator', train_loss_gen.result(), step=epoch)
    
    # log losses and reset logging metrics
    ld = train_loss_disc.result()    
    lg = train_loss_gen.result()

    train_loss_disc.reset_states()
    train_loss_gen.reset_states()
    
    print(ld, lg)
    
    # time and print progress
    elapsed_time = timer.stop()
    times.append(elapsed_time)
    print(f'[{epoch}] - Finished Epoch in {elapsed_time:0.2f} seconds - train_loss_disc: {ld:0.4f}, train_loss_gen: {lg:0.4f}')
    
    # Test (visualize)
    timer.start()


    generated_image = evaluation_step_GAN(test_ds, generator, discriminator, latent_vectors)
    plt.imshow(generated_image.numpy().reshape(28,28),cmap= "gray")
    elapsed_time = timer.stop()
    times.append(elapsed_time)
    
    plt.show()
  
    # Print progress
    if epoch%3 == 0:
        print(f'\n[INFO] - Total time elapsed: {np.sum(times)/60:0.4f} min. Total time remaining: {(np.sum(times)/(epoch+1))*(epochs-epoch-1)/60:0.4f} min.')

print(f'[INFO] - Total run time: {np.sum(times)/60:0.4f} min.')


[EPOCH] ____________________0____________________


ValueError: ignored

In [None]:
%reload_ext tensorboard
%tensorboard --logdir logs/gradient_tape