1. Data Preparation

In [None]:
!rm -fr *.png
!rm -fr *.gif
!rm -fr *.mp4
import shutil
shutil.rmtree("./training_checkpoints")

In [None]:
# Import necessary libraries

from os import listdir
from matplotlib import image
import cv2
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers
import os
import time
import glob
import imageio
import PIL
from IPython import display
from keras.preprocessing.image import img_to_array
from random import randint
import numpy as np

try:
    tpu = tf.distribute.cluster_resolver.GPUClusterResolver()
    print('Device:', gpu.master())
    tf.config.experimental_connect_to_cluster(gpu)
    tf.gpu.experimental.initialize_gpu_system(gpu)
    strategy = tf.distribute.experimental.GPUStrategy(gpu)
except:
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

AUTOTUNE = tf.data.experimental.AUTOTUNE
    
print(tf.__version__)

In [None]:
# Function for loading data

def load_dataset(image_dir):
    '''
    loads all images from directory
    Input: image_dir = file path for training images
    Output: dataset = images stored as array
    '''
    dataset = []
    for filename in listdir(image_dir):
        img_data = cv2.imread(image_dir + filename) # load image
        dataset.append(img_data) # store loaded image
    
    return dataset

In [None]:
# Load data

image_dir = '/kaggle/input/gan-getting-started/monet_jpg/'
data = load_dataset(image_dir)
print(len(data))

In [None]:
data = np.array(data)
data = data.astype('float32')
data = (data / 127.5) - 1 #normalise values to [-1, 1]

In [None]:
# Take a look at an example

example = data[randint(0, 300)]
# summarize shape of the pixel array
print(example.dtype)
print(example.shape)
# display the array of pixels as an image
plt.imshow(example)
plt.show()

example_corr = (example + 1) * 127.5
example_corr = example_corr.astype(int)
plt.imshow(example_corr)
plt.show()
print('example', example.min(), example.max(), example.mean(), example.std())
print('example_corrected', example_corr.min(), example_corr.max(), example_corr.mean(), example_corr.std())

In [None]:
mean = np.mean(data, 0)
plt.imshow(mean)
plt.show()

std_dev = np.std(data, 0)
plt.imshow(std_dev)
plt.show()

In [None]:
BUFFER_SIZE = 8000
BATCH_SIZE = 32

# Batch and shuffle the data
train_dataset = tf.data.Dataset.from_tensor_slices(data).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

2. Model Definitions

In [None]:
# Make the generator
# Code adapted from Tensorflow dcgan tutorial

def make_generator():
    model = tf.keras.Sequential()
    model.add(layers.Dense(8*8*128, use_bias=False, input_shape=(256,)))
    model.add(layers.Dropout(0.5))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Reshape((8, 8, 128)))
    assert model.output_shape == (None, 8, 8, 128) # Note: None is the batch size

    model.add(layers.Conv2DTranspose(1024, (4, 4), strides=(1, 1), padding='same', use_bias=False))
    assert model.output_shape == (None, 8, 8, 1024)
    model.add(layers.Dropout(0.5))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Conv2DTranspose(512, (4, 4), strides=(2, 2), padding='same', use_bias=False))
    assert model.output_shape == (None, 16, 16, 512)
    model.add(layers.Dropout(0.5))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    
    model.add(layers.Conv2DTranspose(512, (4, 4), strides=(2, 2), padding='same', use_bias=False))
    assert model.output_shape == (None, 32, 32, 512)
    model.add(layers.Dropout(0.3))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    
    model.add(layers.Conv2DTranspose(512, (4, 4), strides=(2, 2), padding='same', use_bias=False))
    assert model.output_shape == (None, 64, 64, 512)
    model.add(layers.Dropout(0.3))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    
    model.add(layers.Conv2DTranspose(256, (4, 4), strides=(2, 2), padding='same', use_bias=False))
    assert model.output_shape == (None, 128, 128, 256)
    model.add(layers.Dropout(0.3))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    
    model.add(layers.Conv2DTranspose(3, (4, 4), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
    assert model.output_shape == (None, 256, 256, 3)

    return model

In [None]:
# Generator sanity check

generator = make_generator()

noise = tf.random.normal([1, 256])
generated_image = generator(noise, training=False)

plt.imshow(generated_image[0, :, :, 0])

In [None]:
# Make the discriminator
# Code adapted from Tensorflow dcgan tutorial

def make_discriminator_model():
    model = tf.keras.Sequential()
    model.add(layers.Conv2D(64, (4, 4), strides=(2, 2), padding='same',
                                     input_shape=[256, 256, 3]))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Conv2D(128, (4, 4), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Conv2D(256, (4, 4), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Conv2D(512, (4, 4), strides=(1, 1), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Flatten())
    model.add(layers.Dense(1))

    return model

In [None]:
# Discriminator sanity check

discriminator = make_discriminator_model()
decision = discriminator(generated_image)
print (decision)

In [None]:
# Loss functions

# This method returns a helper function to compute cross entropy loss
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

In [None]:
# Optimisers

generator_optimizer = tf.keras.optimizers.Adam(lr = 1e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(lr = 1e-5, beta_1=0.5)

In [None]:
# Checkpoints

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

3. Model Training

In [None]:
EPOCHS = 50
noise_dim = 256
num_examples_to_generate = 1

# We will reuse this seed overtime (so it's easier)
# to visualize progress in the animated GIF)
seed = tf.random.normal([num_examples_to_generate, noise_dim])

In [None]:
# Notice the use of `tf.function`
# This annotation causes the function to be "compiled".
@tf.function
def train_step(images):
    noise = tf.random.normal([BATCH_SIZE, noise_dim])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
      generated_images = generator(noise, training=True)

      real_output = discriminator(images, training=True)
      fake_output = discriminator(generated_images, training=True)

      gen_loss = generator_loss(fake_output)
      disc_loss = discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))
    
    return gen_loss, disc_loss

In [None]:
def train(dataset, epochs):
  for epoch in range(epochs):
    start = time.time()

    for image_batch in dataset:
      gen_loss, disc_loss = train_step(image_batch)

    # Produce images for the GIF as we go
    display.clear_output(wait=True)
    generate_and_save_images(generator,
                             epoch + 1,
                             seed)

    # Save the model every 15 epochs
    if (epoch + 1) % 15 == 0:
      checkpoint.save(file_prefix = checkpoint_prefix)

    print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))
    print ('gen loss = {}, disc loss = {}'.format(gen_loss, disc_loss))

  # Generate after the final epoch
  display.clear_output(wait=True)
  generate_and_save_images(generator,
                           epochs,
                           seed)

In [None]:
def generate_and_save_images(model, epoch, test_input):
  # Notice `training` is set to False.
  # This is so all layers run in inference mode (batchnorm).
    predictions = model(test_input, training=False)
    predictions = (predictions + 1) * 127.5
    predictions = tf.dtypes.cast(predictions, tf.uint8)
    fig = plt.figure(figsize=(8, 8))

    for i in range(predictions.shape[0]):
     plt.subplot(1, 1, i+1)
     plt.imshow(predictions[i, :, :, :])
     plt.axis('off')
     #predictions = (predictions + 1) * 127.5
     #predictions = tf.dtypes.cast(predictions, tf.uint8)
     #plt.imshow((predictions[0]))
     #plt.axis('off')
    plt.savefig('image_at_epoch_{:04d}.png'.format(epoch), bbox_inches='tight')
    plt.show()

In [None]:
train(train_dataset, EPOCHS)

4. Model Output

In [None]:
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

# Display a single image using the epoch number
def display_image(epoch_no):
  return PIL.Image.open('image_at_epoch_{:04d}.png'.format(epoch_no))

display_image(EPOCHS)

In [None]:
anim_file = 'dcgan.gif'

with imageio.get_writer(anim_file, mode='I') as writer:
  filenames = glob.glob('image*.png')
  filenames = sorted(filenames)
  for filename in filenames:
    image = imageio.imread(filename)
    writer.append_data(image)
  image = imageio.imread(filename)
  writer.append_data(image)

In [None]:
# To generate GIFs
!pip install git+https://github.com/tensorflow/docs
    

In [None]:
import tensorflow_docs.vis.embed as embed
embed.embed_file(anim_file)

In [None]:
!pip install imageio-ffmpeg
import imageio

In [None]:
from IPython import display as ipythondisplay
import io
import os
import base64
from IPython.display import HTML

def show_video(vid):
  #mp4list = [video
  #if len(mp4list) > 0:
  ext = os.path.splitext(vid)[-1][1:]
  video = io.open(vid, 'r+b').read()
  #encoded = base64.b64encode(video)
  ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
              loop controls style="height: 400px;">
              <source src="data:video/{1}';base64,{0}" type="video/{1}" />
              </video>'''.format(base64.b64encode(video).decode('ascii'), ext)))

In [None]:
z1 = tf.random.normal([1, noise_dim], stddev=2.0)
z2 = tf.random.normal([1, noise_dim], stddev=2.0)

image = generator(z1, training=False)
image = (image + 1) * 127.5
image = tf.dtypes.cast(image, tf.uint8)
plt.imshow(image[0, :, :, :])
plt.axis('off')

In [None]:
z1 = tf.random.normal([1, noise_dim], stddev=4)
z2 = tf.random.normal([1, noise_dim], stddev=4)


number_of_steps = 100

def generate_images(zs):       
    imgs = []
    count = 0
    for z in zs:
        count +=1
        images = generator(z)
        images = (images + 1) * 127.5
        images = tf.dtypes.cast(images, tf.uint8)
        plt.imshow((images[0]))
        plt.axis('off')
        plt.savefig('image_at_z_{:04d}.png'.format(count), bbox_inches='tight')
        imgs.append(images)
    return imgs

def interpolate(zs, steps):
   out = []
   for i in range(len(zs)-1):
    for index in range(steps):
     fraction = index/float(steps) 
     out.append(zs[i+1]*fraction + zs[i]*(1-fraction))
   return out

imgs = generate_images(interpolate([z1,z2],number_of_steps))

# Example of reading a generated set of images, and storing as MP4.
movieName = 'mov.mp4'

with imageio.get_writer(movieName, mode='I') as writer:
  filenames = glob.glob('image_at_z*.png')
  filenames = sorted(filenames)
  for filename in filenames:
    image = imageio.imread(filename)
    writer.append_data(image)
  image = imageio.imread(filename)
  writer.append_data(image)
show_video(movieName)

In [None]:
print(z1, z2)