<a href="https://colab.research.google.com/github/RhysComissiong/Python-Machine-Learning-Tutorial/blob/master/GANs_ch_17.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install -q tensorflow-gpu==2.1.0

[K     |██████████████████████████▋     | 350.2 MB 40.6 MB/s eta 0:00:02
[31mERROR: Operation cancelled by user[0m
[?25hTraceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/cli/base_command.py", line 180, in _main
    status = self.run(options, args)
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/cli/req_command.py", line 199, in wrapper
    return func(self, options, args)
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/commands/install.py", line 319, in run
    reqs, check_supported_wheels=not options.target_dir
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/resolution/resolvelib/resolver.py", line 128, in resolve
    requirements, max_rounds=try_to_avoid_resolution_too_deep
  File "/usr/local/lib/python3.7/dist-packages/pip/_vendor/resolvelib/resolvers.py", line 473, in resolve
    state = resolution.resolve(requirements, max_rounds=max_rounds)
  File "/usr/local/lib/python3.7/dist-packages/pip/_

In [2]:
import tensorflow as tf
# Show which TensorFlow build the kernel actually imported. The recorded
# output of this cell is 2.7.0, not the 2.1.0 requested by the install cell
# above (whose run was cancelled).
print(tf.__version__)

2.7.0


In [3]:
# Report whether TensorFlow can see a GPU. `tf.test.is_gpu_available()` is
# deprecated; `tf.config.list_physical_devices('GPU')` is the documented
# replacement and returns an empty list when no GPU is present.
print("GPU Available:", bool(tf.config.list_physical_devices('GPU')))

Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.
GPU Available: True


In [4]:
# Choose the device string used by the later `with tf.device(device_name):`
# blocks. The GPU check uses `tf.config.list_physical_devices('GPU')` because
# `tf.test.is_gpu_available()` is deprecated.
if tf.config.list_physical_devices('GPU'):
    device_name = tf.test.gpu_device_name()
else:
    device_name = '/CPU:0'

In [5]:
# Confirm which device string the model-building cells below will use.
print(device_name)

/device:GPU:0


In [6]:
from google.colab import drive

# Mount Google Drive at /content/drive (Colab-only API).
# NOTE(review): no other cell visible in this notebook reads from or writes to
# /content/drive, and the recorded run of this cell ended in a MessageError --
# confirm the mount is still needed.
drive.mount('/content/drive')

MessageError: ignored

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt

In [None]:
## define a function for the generator:
def make_generator_network(
      num_hidden_layers=1,
      num_hidden_units=100,
      num_output_units=784):
  
  model = tf.keras.Sequential()
  for i in range(num_hidden_layers):
    model.add(
        tf.keras.layers.Dense(units=num_hidden_units, use_bias=False))
    model.add(tf.keras.layers.LeakyReLU())
    model.add(tf.keras.layers.Dense(units=num_output_units, activation='tanh'))
  return model

In [None]:
## define a function for the discriminator:
def make_discriminator_network(
      num_hidden_layers=1,
      num_hidden_units=100,
      num_output_units=1):
  
  model = tf.keras.Sequential()
  for i in range(num_hidden_layers):
    model.add(tf.keras.layers.Dense(units=num_hidden_units))
    model.add(tf.keras.layers.LeakyReLU())
    model.add(tf.keras.layers.Dropout(rate=0.5))

  model.add(
      tf.keras.layers.Dense(
          units=num_output_units, activation=None))
  return model

In [None]:
# Settings shared by the cells that build and probe the first GAN.
image_size = (28, 28)  # np.prod(image_size) is used as the generator's output width
z_size = 20            # length of the latent vector sampled in preprocess()
mode_z = 'uniform'     # 'uniform' vs 'normal'
gen_hidden_layers, gen_hidden_size = 1, 100
disc_hidden_layers, disc_hidden_size = 1, 100

In [None]:
# Seed TensorFlow's global random generator before the models are created.
tf.random.set_seed(1)

In [None]:
# Instantiate the generator from the settings above; its output width is the
# number of pixels in one image (the product of image_size).
gen_model = make_generator_network(
    num_hidden_layers=gen_hidden_layers,
    num_hidden_units=gen_hidden_size,
    num_output_units=np.prod(image_size))

In [None]:
# Build the generator for latent inputs of width z_size (batch dimension left
# open), then print its layer summary.
gen_model.build(input_shape=(None, z_size))
gen_model.summary()

In [None]:
# Instantiate the discriminator; num_output_units keeps its default of 1.
disc_model = make_discriminator_network(
    num_hidden_layers=disc_hidden_layers,
    num_hidden_units=disc_hidden_size)

In [None]:
# Build the discriminator for flattened images (np.prod(image_size) values per
# example, batch dimension left open), then print its layer summary.
disc_model.build(input_shape=(None, np.prod(image_size)))
disc_model.summary()

In [None]:
# Fetch and prepare MNIST through TensorFlow Datasets, then expose it as
# `mnist`, which later cells index by split name (mnist['train']).
mnist_bldr = tfds.builder('mnist')
mnist_bldr.download_and_prepare()
mnist = mnist_bldr.as_dataset(shuffle_files=False)

In [None]:
def preprocess(ex, mode='uniform'):
  """Turn one dataset example into a (latent vector, flattened image) pair.

  Args:
    ex: Example mapping with an 'image' entry.
    mode: Distribution of the latent vector: 'uniform' samples from
      [-1, 1), 'normal' samples from a standard normal.

  Returns:
    A tuple (input_z, image). input_z has shape (z_size,), where z_size is
    read from the notebook's global scope. image is the example's image
    converted to float32, flattened to 1-D and rescaled with image*2 - 1.

  Raises:
    ValueError: If mode is neither 'uniform' nor 'normal'.
  """
  image = ex['image']
  # NOTE(review): assumes integer pixel data, so the conversion yields values
  # in [0, 1] and the rescale below lands in [-1, 1] (the generator's tanh range).
  image = tf.image.convert_image_dtype(image, tf.float32)
  image = tf.reshape(image, [-1])
  image = image*2 - 1.0
  if mode == 'uniform':
    input_z = tf.random.uniform(
        shape=(z_size,), minval=-1.0, maxval=1.0)
  elif mode == 'normal':
    input_z = tf.random.normal(shape=(z_size,))
  else:
    # Fail with a clear message instead of an unbound `input_z` further down.
    raise ValueError(
        "mode must be 'uniform' or 'normal', got {!r}".format(mode))
  return input_z, image

In [None]:
# Take the training split and pair every image with a freshly sampled latent
# vector (preprocess uses its default mode here).
mnist_trainset = mnist['train'].map(preprocess)

In [None]:
# Rebuild the pipeline from the raw training split instead of re-assigning
# `mnist_trainset = mnist_trainset.batch(...)`: that form batches an
# already-batched dataset every time the cell is re-run.
mnist_trainset = mnist['train'].map(preprocess).batch(32, drop_remainder=True)

# Pull a single batch to inspect the shapes fed to the two networks.
input_z, input_real = next(iter(mnist_trainset))
print('input-z -- shape:    ', input_z.shape)
print('input-real -- shape:', input_real.shape)

In [None]:
# Push the sample batch of latent vectors through the (untrained) generator
# and check the shape of what comes out.
g_output = gen_model(input_z)
print('Output of G -- shape:', g_output.shape)

In [None]:
# Score both the real batch and the generated batch with the discriminator;
# the outputs are logits (the model's last layer has no activation).
d_logits_real = disc_model(input_real)
d_logits_fake = disc_model(g_output)
print('Disc. (real) -- shape:', d_logits_real.shape)
print('Disc. (fake) -- shape:', d_logits_fake.shape)

**Training the GAN model**

In [None]:
# Binary cross-entropy on raw logits (from_logits=True), matching the
# discriminator's activation-free output layer.
loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=True)

## Loss for the Generator
# The generator's targets are all ones: its loss is low when the discriminator
# scores the generated batch as real.
g_labels_real = tf.ones_like(d_logits_fake)
g_loss = loss_fn(y_true=g_labels_real, y_pred=d_logits_fake)
print('Generator Loss: {:.4f}'.format(g_loss))

In [None]:
## Loss for the Discriminator
# Real images are scored against a target of 1 ...
d_labels_real = tf.ones_like(d_logits_real)
d_loss_real = loss_fn(y_true=d_labels_real, y_pred=d_logits_real)

# ... and generated images against a target of 0.
d_labels_fake = tf.zeros_like(d_logits_fake)
d_loss_fake = loss_fn(y_true=d_labels_fake, y_pred=d_logits_fake)

print('Discriminator Losses: Real {:.4f} Fake {:.4f}'.format(
    d_loss_real.numpy(), d_loss_fake.numpy()))

In [None]:
import time
# Hyper-parameters for the full training run of the fully connected GAN.
num_epochs, batch_size = 100, 64
image_size = (28, 28)
z_size = 20
mode_z = 'uniform'
gen_hidden_layers, gen_hidden_size = 1, 100
disc_hidden_layers, disc_hidden_size = 1, 100

# Seed both TensorFlow and NumPy.
tf.random.set_seed(1)
np.random.seed(1)

# One fixed batch of latent vectors, reused after every epoch so the generated
# samples can be compared across epochs.
if mode_z == 'normal':
  fixed_z = tf.random.normal(
      shape=(batch_size, z_size))
elif mode_z == 'uniform':
  fixed_z = tf.random.uniform(
      shape=(batch_size, z_size),
      minval=-1, maxval=1)
  
def create_samples(gmodel, input_z):
  """Generate images from latent vectors and map them to the [0, 1] range.

  Args:
    gmodel: Generator model, called in inference mode (training=False).
    input_z: Batch of latent vectors; it must contain `batch_size` rows
      because the output is reshaped using the global `batch_size`.

  Returns:
    Tensor of shape (batch_size, *image_size) holding (output + 1) / 2.
  """
  # Use the `gmodel` parameter: the body previously called `g_model`, a name
  # that is not defined in this scope.
  g_output = gmodel(input_z, training=False)
  images = tf.reshape(g_output, (batch_size, *image_size))
  return (images+1)/2.0

## Set-up the dataset
# raw split -> (z, image) pairs -> shuffle buffer of 10000 -> full batches only
mnist_trainset = (
    mnist['train']
    .map(lambda ex: preprocess(ex, mode=mode_z))
    .shuffle(10000)
    .batch(batch_size, drop_remainder=True)
)

## Set-up the model
with tf.device(device_name):
  # Generator first, then discriminator.
  gen_model = make_generator_network(
      num_hidden_layers=gen_hidden_layers,
      num_hidden_units=gen_hidden_size,
      num_output_units=np.prod(image_size))
  gen_model.build(input_shape=(None, z_size))

  disc_model = make_discriminator_network(
      num_hidden_layers=disc_hidden_layers,
      num_hidden_units=disc_hidden_size)
  disc_model.build(input_shape=(None, np.prod(image_size)))

## Loss function and optimizers:
loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=True)
g_optimizer = tf.keras.optimizers.Adam()
d_optimizer = tf.keras.optimizers.Adam()

# Accumulators filled by the training loop.
all_losses, all_d_vals, epoch_samples = [], [], []

# Alternating GAN training: for every batch the generator takes one optimizer
# step, then the discriminator takes one. Per-batch numbers are collected in
# epoch_losses / epoch_d_vals; once per epoch they are appended to all_losses /
# all_d_vals, a summary line is printed, and samples for the fixed latent batch
# are stored. That epoch-level bookkeeping used to be indented inside the batch
# loop, so it ran once per batch instead of once per epoch.
start_time = time.time()
for epoch in range(1, num_epochs+1):

  epoch_losses, epoch_d_vals = [], []

  for input_z, input_real in mnist_trainset:

    ## Compute generator's loss
    with tf.GradientTape() as g_tape:
      g_output = gen_model(input_z)
      d_logits_fake = disc_model(g_output,
                                 training=True)
      # Targets are all ones: the generator's loss is low when the
      # discriminator scores its output as real.
      labels_real = tf.ones_like(d_logits_fake)
      g_loss = loss_fn(y_true=labels_real,
                       y_pred=d_logits_fake)

    ## Compute the gradients of g_loss
    g_grads = g_tape.gradient(g_loss,
                              gen_model.trainable_variables)

    ## Optimization: Apply the gradients
    g_optimizer.apply_gradients(
        grads_and_vars=zip(g_grads, gen_model.trainable_variables)
    )

    ## Compute discriminator's loss
    with tf.GradientTape() as d_tape:
      d_logits_real = disc_model(input_real, training=True)

      d_labels_real = tf.ones_like(d_logits_real)

      d_loss_real = loss_fn(y_true=d_labels_real, y_pred=d_logits_real)

      # Re-score the generated batch (g_output from the generator step above)
      # inside this tape so the discriminator's gradients are recorded.
      d_logits_fake = disc_model(g_output, training=True)
      d_labels_fake = tf.zeros_like(d_logits_fake)

      d_loss_fake = loss_fn(y_true=d_labels_fake, y_pred=d_logits_fake)
      d_loss = d_loss_real + d_loss_fake

    ## Compute the gradients of d_loss
    d_grads = d_tape.gradient(d_loss, disc_model.trainable_variables)

    ## Optimization: Apply the gradients
    d_optimizer.apply_gradients(
        grads_and_vars=zip(d_grads, disc_model.trainable_variables)
    )

    epoch_losses.append(
        (g_loss.numpy(), d_loss.numpy(),
         d_loss_real.numpy(), d_loss_fake.numpy())
    )

    # Mean discriminator probability on the real and on the generated batch.
    d_probs_real = tf.reduce_mean(tf.sigmoid(d_logits_real))
    d_probs_fake = tf.reduce_mean(tf.sigmoid(d_logits_fake))
    epoch_d_vals.append((d_probs_real.numpy(), d_probs_fake.numpy()))

  ## End of epoch: one entry per epoch in each accumulator
  all_losses.append(epoch_losses)
  all_d_vals.append(epoch_d_vals)
  print(
      'Epoch {:03d} | ET {:.2f} min | Avg Losses >>'
      ' G/D {:.4f}/{:.4f} [D-Real: {:.4f} D-Fake: {:.4f}]'
      .format(
          epoch, (time.time() - start_time)/60,
          *list(np.mean(all_losses[-1], axis=0))))
  epoch_samples.append(
      create_samples(gen_model, fixed_z).numpy())

In [None]:
import itertools

def _add_epoch_axis(iter_ax):
  """Attach a second x-axis, labelled in epochs, underneath `iter_ax`.

  Tick positions come from the module-level `newpos` / `epoch_ticks`, read
  when the helper is called. Returns the new twin axis.
  """
  epoch_ax = iter_ax.twiny()
  epoch_ax.set_xticks(newpos)
  epoch_ax.set_xticklabels(epoch_ticks)
  epoch_ax.xaxis.set_ticks_position('bottom')
  epoch_ax.xaxis.set_label_position('bottom')
  epoch_ax.spines['bottom'].set_position(('outward', 60))
  epoch_ax.set_xlabel('Epoch', size=15)
  epoch_ax.set_xlim(iter_ax.get_xlim())
  iter_ax.tick_params(axis='both', which='major', labelsize=15)
  epoch_ax.tick_params(axis='both', which='major', labelsize=15)
  return epoch_ax


fig = plt.figure(figsize=(16, 6))

## Left panel: generator and (halved) discriminator loss per iteration
ax = fig.add_subplot(1, 2, 1)
g_losses = [item[0] for item in itertools.chain.from_iterable(all_losses)]
d_losses = [item[1]/2.0 for item in itertools.chain.from_iterable(all_losses)]
ax.plot(g_losses, label='Generator loss', alpha=0.95)
ax.plot(d_losses, label='Discriminator loss', alpha=0.95)
ax.legend(fontsize=20)

ax.set_xlabel('Iteration', size=15)
ax.set_ylabel('Loss', size=15)

# Convert epoch numbers to iteration indices using the number of batches
# recorded for the last epoch.
epochs = np.arange(1, 101)
epoch2iter = lambda e: e*len(all_losses[-1])
epoch_ticks = [1, 20, 40, 60, 80, 100]
newpos = [epoch2iter(e) for e in epoch_ticks]
ax2 = _add_epoch_axis(ax)

## Right panel: mean discriminator output on real vs. generated batches
ax = fig.add_subplot(1, 2, 2)
d_vals_real = [item[0] for item in itertools.chain.from_iterable(all_d_vals)]
d_vals_fake = [item[1] for item in itertools.chain.from_iterable(all_d_vals)]
ax.plot(d_vals_real, alpha=0.75, label=r'Real: $D(\mathbf{x})$')
ax.plot(d_vals_fake, alpha=0.75, label=r'Fake: $D(G(\mathbf{z}))$')
ax.legend(fontsize=20)
ax.set_xlabel('Iteration', size=15)
ax.set_ylabel('Discriminator output', size=15)

ax2 = _add_epoch_axis(ax)
plt.show()

In [None]:
selected_epochs = [1, 2, 4, 10, 50, 100]
fig = plt.figure(figsize=(10, 14))
# Grid of 6 rows x 5 columns: one row per selected epoch, five samples per row.
for row, epoch_no in enumerate(selected_epochs):
  for col in range(5):
    ax = fig.add_subplot(6, 5, row*5 + col + 1)
    ax.set_xticks([])
    ax.set_yticks([])

    # epoch_samples is indexed from 0 while epochs are counted from 1.
    image = epoch_samples[epoch_no-1][col]
    ax.imshow(image, cmap='gray_r')

    if col != 0:
      continue
    # Label each row once, to the left of its first image.
    ax.text(
        -0.06, 0.5, 'Epoch {}'.format(epoch_no),
        rotation=90, size=18, color='red',
        horizontalalignment='right',
        verticalalignment='center',
        transform=ax.transAxes
    )

In [None]:
# Render the grid of generated samples assembled in the previous cell.
plt.show()

In [None]:
def make_dcgan_generator(
    z_size=20,
    output_size=(28, 28, 1),
    n_filters=128,
    n_blocks=2):
  """Build the convolutional (DCGAN-style) generator.

  Args:
    z_size: Length of the latent input vector.
    output_size: (height, width, channels) of the generated image.
    n_filters: Channel count of the first feature map; each up-sampling
      block halves it (integer division).
    n_blocks: Number of stride-2 transposed-convolution blocks.

  Returns:
    A `tf.keras.Sequential` model ending in a tanh transposed convolution
    with `output_size[2]` filters.
  """
  # The initial feature map is output_size shrunk by 2 per up-sampling block.
  shrink = 2**n_blocks
  hidden_size = (output_size[0]//shrink, output_size[1]//shrink)

  stack = [
      tf.keras.layers.Input(shape=(z_size,)),

      # Project the latent vector and fold it into a small feature map.
      tf.keras.layers.Dense(
          units=n_filters*np.prod(hidden_size),
          use_bias=False),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.LeakyReLU(),
      tf.keras.layers.Reshape(
          (hidden_size[0], hidden_size[1], n_filters)),

      # Stride-1 block.
      tf.keras.layers.Conv2DTranspose(
          filters=n_filters, kernel_size=(5, 5),
          strides=(1, 1), padding='same', use_bias=False),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.LeakyReLU(),
  ]

  # Up-sampling blocks: stride 2, with half as many filters each time.
  channels = n_filters
  for _ in range(n_blocks):
    channels = channels // 2
    stack.append(
        tf.keras.layers.Conv2DTranspose(
            filters=channels, kernel_size=(5, 5),
            strides=(2, 2), padding='same',
            use_bias=False))
    stack.append(tf.keras.layers.BatchNormalization())
    stack.append(tf.keras.layers.LeakyReLU())

  # Output layer: one filter per image channel, tanh activation.
  stack.append(
      tf.keras.layers.Conv2DTranspose(
          filters=output_size[2], kernel_size=(5, 5),
          strides=(1, 1), padding='same', use_bias=False,
          activation='tanh'))

  return tf.keras.Sequential(stack)


In [None]:
def make_dcgan_discriminator(
    input_size=(28, 28, 1),
    n_filters=64,
    n_blocks=2):
  """Build the convolutional (DCGAN-style) discriminator / critic.

  Args:
    input_size: (height, width, channels) of the input image.
    n_filters: Filter count of the first, stride-1 convolution; every
      following block doubles it.
    n_blocks: Number of stride-2 convolution blocks.

  Returns:
    A `tf.keras.Sequential` model producing one unactivated value per example.
  """
  model = tf.keras.Sequential([
      tf.keras.layers.Input(shape=input_size),
      tf.keras.layers.Conv2D(filters=n_filters, kernel_size=5, strides=(1, 1), padding='same'),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.LeakyReLU()
  ])

  nf = n_filters
  for _ in range(n_blocks):
    # Double the filters in every down-sampling block. The previous `nf*.2`
    # multiplied by 0.2 and produced a shrinking, non-integer filter count.
    nf = nf*2
    model.add(
        tf.keras.layers.Conv2D(
            filters=nf, kernel_size=(5,5),
            strides=(2,2), padding='same'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.LeakyReLU())
    model.add(tf.keras.layers.Dropout(0.3))

  # A 7x7 'valid' convolution followed by Reshape((1,)) requires the feature
  # map to be exactly 7x7 here, as it is for the default 28x28 input with
  # n_blocks=2.
  model.add(
      tf.keras.layers.Conv2D(
          filters=1, kernel_size=(7,7),
          padding='valid'))

  model.add(tf.keras.layers.Reshape((1,)))

  return model

In [None]:
# Prepare MNIST through TensorFlow Datasets again for the DCGAN section (the
# same three calls appear earlier in the notebook) and expose it as `mnist`,
# indexed by split name further down.
mnist_bldr = tfds.builder('mnist')
mnist_bldr.download_and_prepare()
mnist = mnist_bldr.as_dataset(shuffle_files=False)

In [None]:
def preprocess(ex, mode='uniform'):
  """Turn one dataset example into a (latent vector, image) pair.

  This definition replaces the earlier `preprocess` in the notebook: it does
  not flatten the image, so the image keeps the shape it has in the dataset.

  Args:
    ex: Example mapping with an 'image' entry.
    mode: Distribution of the latent vector: 'uniform' samples from
      [-1, 1), 'normal' samples from a standard normal.

  Returns:
    A tuple (input_z, image). input_z has shape (z_size,), where z_size is
    read from the notebook's global scope. image is the example's image
    converted to float32 and rescaled with image*2 - 1.

  Raises:
    ValueError: If mode is neither 'uniform' nor 'normal'.
  """
  image = ex['image']
  # NOTE(review): assumes integer pixel data, so the conversion yields values
  # in [0, 1] and the rescale below lands in [-1, 1] (the generator's tanh range).
  image = tf.image.convert_image_dtype(image, tf.float32)

  image = image*2 - 1.0
  if mode == 'uniform':
    input_z = tf.random.uniform(
        shape=(z_size,), minval=-1.0, maxval=1.0
    )
  elif mode == 'normal':
    input_z = tf.random.normal(shape=(z_size,))
  else:
    # Fail with a clear message instead of an unbound `input_z` further down.
    raise ValueError(
        "mode must be 'uniform' or 'normal', got {!r}".format(mode))
  return input_z, image

In [None]:
# Build the DCGAN generator with its default settings. The function is named
# `make_dcgan_generator`; the previous call misspelled it as `make_dcan_generator`.
gen_model = make_dcgan_generator()

In [None]:
# Print the layer-by-layer summary of the DCGAN generator.
gen_model.summary()

In [None]:
# Build the DCGAN discriminator with its default settings and print its
# layer-by-layer summary.
disc_model = make_dcgan_discriminator()
disc_model.summary()

**Implementing WGAN-GP to train the DCGAN model**

In [None]:
# Hyper-parameters for the WGAN-GP training run.
num_epochs = 100
batch_size = 128
image_size = (28, 28)
z_size = 20
# Was spelled `mode_x`; the name read by preprocess calls and by the fixed_z
# cell below is `mode_z`.
mode_z = 'uniform'
lambda_gp = 10.0  # weight of the gradient-penalty term in the critic loss

tf.random.set_seed(1)
# NumPy's seeding function is `np.random.seed`; `np.random.set_seed` does not exist.
np.random.seed(1)

## Set-up the dataset
mnist_trainset = mnist['train']
# Pass the configured latent distribution explicitly instead of relying on
# preprocess's default.
mnist_trainset = mnist_trainset.map(
    lambda ex: preprocess(ex, mode=mode_z)
)

mnist_trainset = mnist_trainset.shuffle(10000)
mnist_trainset = mnist_trainset.batch(batch_size, drop_remainder=True)

## Set-up the model
with tf.device(device_name):
  # Keep the generator's latent width in sync with the z_size set above.
  gen_model = make_dcgan_generator(z_size=z_size)
  gen_model.build(input_shape=(None, z_size))

  disc_model = make_dcgan_discriminator()
  # The convolutional discriminator takes image-shaped input (H, W, 1), not
  # the flattened vector used by the fully connected model earlier.
  disc_model.build(input_shape=(None, *image_size, 1))

In [None]:
import time

## Optimizers:
# Generator and critic each get their own Adam instance with the same rate.
g_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)
d_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)

# One fixed batch of latent vectors, reused after every epoch so the generated
# samples can be compared across epochs.
if mode_z == 'normal':
  fixed_z = tf.random.normal(shape=(batch_size, z_size))
elif mode_z == 'uniform':
  fixed_z = tf.random.uniform(
      shape=(batch_size, z_size), minval=-1, maxval=1
  )

In [None]:
def create_samples(g_model, input_z):
  """Generate images from latent vectors and map them to the [0, 1] range.

  Args:
    g_model: Generator model, called in inference mode (training=False).
    input_z: Batch of latent vectors; it must contain `batch_size` rows
      because the output is reshaped using the global `batch_size`.

  Returns:
    Tensor of shape (batch_size, *image_size) holding (output + 1) / 2.
  """
  generated = g_model(input_z, training=False)
  target_shape = (batch_size,) + tuple(image_size)
  reshaped = tf.reshape(generated, target_shape)
  return (reshaped+1)/2.0

In [None]:
# WGAN-GP training. For every batch, the critic loss (including the gradient
# penalty) and the generator loss are computed under two tapes, then each
# network takes one optimizer step. Per-batch losses are collected in
# epoch_losses; once per epoch they are appended to all_losses, a summary line
# is printed, and samples for the fixed latent batch are stored. That
# epoch-level bookkeeping used to be indented inside the batch loop, so it ran
# once per batch instead of once per epoch.
all_losses = []
epoch_samples = []
start_time = time.time()

for epoch in range(1, num_epochs+1):

  epoch_losses = []

  for input_z, input_real in mnist_trainset:

    with tf.GradientTape() as d_tape, tf.GradientTape() as g_tape:

      g_output = gen_model(input_z, training=True)

      d_critics_real = disc_model(input_real, training=True)
      d_critics_fake = disc_model(g_output, training=True)

      ## Compute generator's loss:
      g_loss = -tf.math.reduce_mean(d_critics_fake)

      ## compute discriminator's losses:
      d_loss_real = -tf.math.reduce_mean(d_critics_real)
      d_loss_fake = tf.math.reduce_mean(d_critics_fake)
      d_loss = d_loss_real + d_loss_fake

      ## Gradient-penalty:
      with tf.GradientTape() as gp_tape:
        # One random mixing weight per example, broadcast over H, W, C.
        alpha = tf.random.uniform(
            shape=[d_critics_real.shape[0], 1, 1, 1],
            minval=0.0, maxval=1.0
        )
        interpolated = (alpha*input_real + (1-alpha)*g_output)
        # `interpolated` is not a variable, so it has to be watched explicitly.
        gp_tape.watch(interpolated)
        d_critics_intp = disc_model(interpolated)

      # These lines stay inside the outer `with` block so d_tape records the
      # penalty as part of d_loss.
      grads_intp = gp_tape.gradient(d_critics_intp, [interpolated,])[0]
      grads_intp_l2 = tf.sqrt(tf.reduce_sum(tf.square(grads_intp), axis=[1,2,3]))
      # Penalize the squared deviation of the per-example gradient norm from 1.
      grad_penalty = tf.reduce_mean(tf.square(grads_intp_l2 - 1.0))

      d_loss = d_loss + lambda_gp*grad_penalty

    ## Optimization: Compute the gradients apply them
    d_grads = d_tape.gradient(d_loss, disc_model.trainable_variables)
    d_optimizer.apply_gradients(grads_and_vars=zip(d_grads, disc_model.trainable_variables))

    g_grads = g_tape.gradient(g_loss, gen_model.trainable_variables)
    g_optimizer.apply_gradients(grads_and_vars=zip(g_grads, gen_model.trainable_variables))

    epoch_losses.append(
        (g_loss.numpy(), d_loss.numpy(),
         d_loss_real.numpy(), d_loss_fake.numpy()))

  ## End of epoch: one entry per epoch in each accumulator
  all_losses.append(epoch_losses)
  print(
      'Epoch {:03d} | ET {:.2f} min | Avg Losses >>'
      ' G/D {:6.2f}/{:6.2f} [D-Real: {:6.2f} D-Fake: {:6.2f}]'
      .format(
          epoch, (time.time() - start_time)/60,
          *list(np.mean(all_losses[-1], axis=0))))
  epoch_samples.append(
      create_samples(gen_model, fixed_z).numpy())

In [None]:
selected_epochs = [1, 2, 4, 10, 50, 100]
fig = plt.figure(figsize=(10,14))
# Grid of 6 rows x 5 columns: one row per selected epoch, five samples per row.
for i,e in enumerate(selected_epochs):
  for j in range(5):
    ax = fig.add_subplot(6, 5, i*5+j+1)
    ax.set_xticks([])
    ax.set_yticks([])
    if j == 0:
      # Label each row once, to the left of its first image.
      ax.text(-0.06, 0.5, 'Epoch {}'.format(e),
              rotation=90, size=18, color='red',
              horizontalalignment='right',
              verticalalignment='center',
              transform=ax.transAxes)

    # Draw every cell of the grid. These two lines used to be nested under
    # `if j == 0`, so only the first column received an image.
    # epoch_samples is indexed from 0 while epochs are counted from 1.
    image = epoch_samples[e-1][j]
    ax.imshow(image, cmap='gray_r')

In [None]:
# Render the grid of generated samples assembled in the previous cell.
plt.show()