# Introduction

This notebook will explore using GAN to to turn regular images into those with a Monet style of painting. The data consists of 300 monet paintings to learn from and 7000 images to generate into fake money style images.

In [21]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import numpy as np
import os
import re

try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except:
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

AUTOTUNE = tf.data.experimental.AUTOTUNE

Number of replicas: 1


In [22]:
monet_filenames = os.listdir(r'C:\Users\Vedhik\Documents\Master of Data Science\Intro to deep learning\gan\monet_tfrec')
photo_filenames = os.listdir(r'C:\Users\Vedhik\Documents\Master of Data Science\Intro to deep learning\gan\photo_tfrec')

def count_data_items(filenames):
    n = [int(re.compile(r"-([0-9]*)\.").search(filename).group(1)) for filename in filenames]
    return np.sum(n)

n_monet_samples = count_data_items(monet_filenames)
n_photo_samples = count_data_items(photo_filenames)

BATCH_SIZE =  4
EPOCHS_NUM = 30
AUTO = tf.data.experimental.AUTOTUNE

In [23]:
IMAGE_SIZE = [256, 256]
def decode_image(image):
    image = tf.image.decode_jpeg(image, channels=3)
    image = (tf.cast(image, tf.float32) / 127.5) - 1
    image = tf.reshape(image, [*IMAGE_SIZE, 3])
    return image

def read_tfrecord(example):
    tfrecord_format = {
        "image_name": tf.io.FixedLenFeature([], tf.string),
        "image": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.string)
    }
    example = tf.io.parse_single_example(example, tfrecord_format)
    image = decode_image(example['image'])
    return image

def load_dataset(filenames, labeled=True, ordered=False):
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTOTUNE)
    return dataset

In [24]:
monet_ds = load_dataset(monet_filenames, labeled=True).batch(1)
photo_ds = load_dataset(photo_filenames, labeled=True).batch(1)

In [25]:
OUTPUT_CHANNELS = 3


def downsample(filters, size, normalize=True):
    intializer = tf.random_normal_initializer(0., .02)
    model = tf.keras.Sequential()
    model.add(keras.layers.Conv2D(filters, size, strides=2, padding='same', kernel_initializer=intializer, use_bias=False))
    if normalize:
        model.add(tfa.layers.InstanceNormalization(gamma_initializer=intializer))
    
    model.add(keras.layers.LeakyReLU(alpha=0.2))
    return model

In [26]:
def upsampling(filters, size, dropout=False):
    initializer = tf.random_normal_initializer(0., .02)
    model = tf.keras.Sequential()
    model.add(
    keras.layers.Conv2DTranspose(filters, size, strides=2,padding='same', kernel_initializer=initializer,use_bias=False))

    model.add(tfa.layers.InstanceNormalization(gamma_initializer=initializer))
    if dropout:
        model.add(keras.layers.Dropout(.5))

    model.add(keras.layers.ReLU())
    return model

# Generator

In [27]:
def Generator():
    inputs = keras.layers.Input(shape=[256, 256, 3])
    down_sampling = [
        downsample(64, 4, normalize=False),
        downsample(128, 4,),
        downsample(256, 4,),
        downsample(512, 4,),
        downsample(512, 4,),
    ]
    up_sampling = [
        upsampling(512, 4, dropout=True),
        upsampling(512, 4, dropout=True),
        upsampling(256, 4, dropout=False),
        upsampling(128, 4, dropout=False),
        upsampling(64, 4, dropout=False),
    ]
    
    initializer = tf.random_normal_initializer(0, .02)
    last_layer = keras.layers.Conv2DTranspose(filters=3, kernel_size=4, strides=2, padding="same")
    output_image = keras.layers.Activation("tanh")
    
    x = inputs
    skips = []
    for down in down_sampling:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])
    for up, skip in zip(up_sampling, skips):
        x = up(x)
        x = keras.layers.Concatenate()([x, skip])

    x = output_image(last_layer(x))

    return keras.Model(inputs=inputs, outputs=x)

# Discriminator

In [28]:
def Discriminator():
    initializer = tf.random_normal_initializer(0., .02)
    gamma_init = keras.initializers.RandomNormal(mean=.0, stddev=.02)

    inp = keras.layers.Input(shape=[256, 256, 3], name='input_image')

    x = inp
    down1 = downsample(64, 4, False)(x) 
    down2 = downsample(128, 4)(down1) 
    down3 = downsample(256, 4)(down2)

    zero_pad1 = keras.layers.ZeroPadding2D()(down3)
    conv = keras.layers.Conv2D(512, 4, strides=1,
                         kernel_initializer=initializer,
                         use_bias=False)(zero_pad1) 

    norm1 = tfa.layers.InstanceNormalization(gamma_initializer=gamma_init)(conv)

    leaky_relu = keras.layers.LeakyReLU()(norm1)

    zero_pad2 = keras.layers.ZeroPadding2D()(leaky_relu) 

    last = keras.layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(zero_pad2)

    return keras.Model(inputs=inp, outputs=last)

In [29]:

monet_generator = Generator() 
photo_generator = Generator() 

monet_discriminator = Discriminator() 
photo_discriminator = Discriminator() 

# Model

In [30]:
class CycleGAN(keras.Model):
    def __init__(self, monet_generator, photo_generator,
                 monet_discriminator, photo_discriminator,
                 lambda_cycle=10):
        super(CycleGAN, self).__init__()
        self.monet_generator = monet_generator
        self.photo_generator = photo_generator
        self.monet_discriminator = monet_discriminator
        self.photo_discriminator = photo_discriminator
        self.lambda_cycle = lambda_cycle
        
    def compile(self, monet_gen_optimizer, photo_gen_optimizer,
                monet_disc_optimizer, photo_disc_optimizer,
                generator_loss_fn, discriminator_loss_fn,
                cycle_loss_fn, identity_loss_fn):
        super(CycleGAN, self).compile()
        self.monet_gen_optimizer = monet_gen_optimizer
        self.photo_gen_optimizer = photo_gen_optimizer
        self.monet_disc_optimizer = monet_disc_optimizer
        self.photo_disc_optimizer = photo_disc_optimizer
        self.generator_loss_fn = generator_loss_fn
        self.discriminator_loss_fn = discriminator_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn
        
    def train_step(self, batch_data):
        real_monet, real_photo = batch_data
        
        with tf.GradientTape(persistent=True) as tape:
            # Photo to Monet back to Photo
            fake_monet = self.monet_generator(real_photo, training=True)
            cycled_photo = self.photo_generator(fake_monet, training=True)

            # Monet to Photo back to Monet
            fake_photo = self.photo_generator(real_monet, training=True)
            cycled_monet = self.monet_generator(fake_photo, training=True)

            # Generating itself
            same_monet = self.monet_generator(real_monet, training=True)
            same_photo = self.photo_generator(real_photo, training=True)

            # Discriminator for real images
            disc_real_monet = self.monet_discriminator(real_monet, training=True)
            disc_real_photo = self.photo_discriminator(real_photo, training=True)

            # Discriminator for fake images
            disc_fake_monet = self.monet_discriminator(fake_monet, training=True)
            disc_fake_photo = self.photo_discriminator(fake_photo, training=True)

            # Generator loss
            monet_gen_loss = self.generator_loss_fn(disc_fake_monet)
            photo_gen_loss = self.generator_loss_fn(disc_fake_photo)

            # Total cycle consistency loss
            total_cycle_loss = self.cycle_loss_fn(real_monet, cycled_monet, self.lambda_cycle) + self.cycle_loss_fn(real_photo, cycled_photo, self.lambda_cycle)

            # Total generator loss
            total_monet_gen_loss = monet_gen_loss + total_cycle_loss + self.identity_loss_fn(real_monet, same_monet, self.lambda_cycle)
            total_photo_gen_loss = photo_gen_loss + total_cycle_loss + self.identity_loss_fn(real_photo, same_photo, self.lambda_cycle)

            # Discriminator loss
            monet_disc_loss = self.discriminator_loss_fn(disc_real_monet, disc_fake_monet)
            photo_disc_loss = self.discriminator_loss_fn(disc_real_photo, disc_fake_photo)

        # Calculate gradients
        monet_gen_gradients = tape.gradient(total_monet_gen_loss, self.monet_generator.trainable_variables)
        photo_gen_gradients = tape.gradient(total_photo_gen_loss, self.photo_generator.trainable_variables)
        monet_disc_gradients = tape.gradient(monet_disc_loss, self.monet_discriminator.trainable_variables)
        photo_disc_gradients = tape.gradient(photo_disc_loss, self.photo_discriminator.trainable_variables)

        # Apply gradients
        self.monet_gen_optimizer.apply_gradients(zip(monet_gen_gradients, self.monet_generator.trainable_variables))
        self.photo_gen_optimizer.apply_gradients(zip(photo_gen_gradients, self.photo_generator.trainable_variables))
        self.monet_disc_optimizer.apply_gradients(zip(monet_disc_gradients, self.monet_discriminator.trainable_variables))
        self.photo_disc_optimizer.apply_gradients(zip(photo_disc_gradients, self.photo_discriminator.trainable_variables))
        
        return {
            "monet_gen_loss": total_monet_gen_loss,
            "photo_gen_loss": total_photo_gen_loss,
            "monet_disc_loss": monet_disc_loss,
            "photo_disc_loss": photo_disc_loss
        }

# Loss functions

In [31]:

bce_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)

def discriminator_loss(real, generated):
    real_loss = bce_loss(tf.ones_like(real), real)
    generated_loss = bce_loss(tf.zeros_like(generated), generated)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss * 0.5

def generator_loss(generated):
    return bce_loss(tf.ones_like(generated), generated)

def calc_cycle_loss(real_image, cycled_image, LAMBDA):
    loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))
    return LAMBDA * loss1

def identity_loss(real_image, same_image, LAMBDA):
    loss = tf.reduce_mean(tf.abs(real_image - same_image))
    return LAMBDA * 0.5 * loss

# Apply strategy.scope() to the defined loss functions
with strategy.scope():
    discriminator_loss = tf.function(discriminator_loss)
    generator_loss = tf.function(generator_loss)
    calc_cycle_loss = tf.function(calc_cycle_loss)
    identity_loss = tf.function(identity_loss)

In [36]:
with strategy.scope():
    monet_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    photo_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

    monet_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    photo_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

with strategy.scope():
    cycle_gan_model = CycleGan(
        monet_generator, photo_generator, monet_discriminator, photo_discriminator
    )

    cycle_gan_model.compile(
        m_gen_optimizer = monet_generator_optimizer,
        p_gen_optimizer = photo_generator_optimizer,
        m_disc_optimizer = monet_discriminator_optimizer,
        p_disc_optimizer = photo_discriminator_optimizer,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calc_cycle_loss,
        identity_loss_fn = identity_loss
    )


    
    
cycle_gan_model.fit(
    tf.data.Dataset.zip((monet_ds, photo_ds)),
    epochs=25
)

Epoch 1/25


NotFoundError: Graph execution error:

Detected at node 'IteratorGetNext' defined at (most recent call last):
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main
      return _run_code(code, main_globals, None,
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code
      exec(code, run_globals)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel_launcher.py", line 16, in <module>
      app.launch_new_instance()
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\traitlets\config\application.py", line 1043, in launch_instance
      app.start()
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel\kernelapp.py", line 677, in start
      self.io_loop.start()
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\tornado\platform\asyncio.py", line 215, in start
      self.asyncio_loop.run_forever()
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 595, in run_forever
      self._run_once()
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 1881, in _run_once
      handle._run()
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\asyncio\events.py", line 80, in _run
      self._context.run(self._callback, *self._args)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel\kernelbase.py", line 457, in dispatch_queue
      await self.process_one()
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel\kernelbase.py", line 446, in process_one
      await dispatch(*args)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel\kernelbase.py", line 353, in dispatch_shell
      await result
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel\kernelbase.py", line 648, in execute_request
      reply_content = await reply_content
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel\ipkernel.py", line 353, in do_execute
      res = shell.run_cell(code, store_history=store_history, silent=silent)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\ipykernel\zmqshell.py", line 533, in run_cell
      return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\IPython\core\interactiveshell.py", line 2914, in run_cell
      result = self._run_cell(
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\IPython\core\interactiveshell.py", line 2960, in _run_cell
      return runner(coro)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\IPython\core\async_helpers.py", line 68, in _pseudo_sync_runner
      coro.send(None)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\IPython\core\interactiveshell.py", line 3185, in run_cell_async
      has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\IPython\core\interactiveshell.py", line 3377, in run_ast_nodes
      if (await self.run_code(code, result,  async_=asy)):
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\IPython\core\interactiveshell.py", line 3457, in run_code
      exec(code_obj, self.user_global_ns, self.user_ns)
    File "C:\Users\Vedhik\AppData\Local\Temp/ipykernel_10328/919252902.py", line 27, in <module>
      cycle_gan_model.fit(
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\utils\traceback_utils.py", line 64, in error_handler
      return fn(*args, **kwargs)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\training.py", line 1384, in fit
      tmp_logs = self.train_function(iterator)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\training.py", line 1021, in train_function
      return step_function(self, iterator)
    File "C:\Users\Vedhik\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\training.py", line 1009, in step_function
      data = next(iterator)
Node: 'IteratorGetNext'
NewRandomAccessFile failed to Create/Open: monet00-60.tfrec : The system cannot find the file specified.
; No such file or directory
	 [[{{node IteratorGetNext}}]] [Op:__inference_train_function_132122]

# Discussion/Results

I've built the generator, discriminator, and cyclic gan model with the loss functions; howvewr there is an error when fitting the model. Due to this, I'm unable to have a proper discussion and results. In future iterations, The epochs can be experimented with to see how it affects the results.