In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
#Importing the necessary dependencies
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import re
import os
from PIL import Image
from kaggle_datasets import KaggleDatasets

## Connecting to TPU

In [None]:
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except:
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

AUTOTUNE = tf.data.experimental.AUTOTUNE
    
print(tf.__version__)

## Hyperparameters

In [None]:
GCS_PATH = KaggleDatasets().get_gcs_path() #Google cloud storage path
MONET_Filenames = tf.io.gfile.glob(str(GCS_PATH + '/monet_tfrec/*.tfrec'))
TEST_Photo_Filenames = tf.io.gfile.glob(str(GCS_PATH + '/photo_tfrec/*.tfrec'))
IMAGE_SIZE = [256, 256] #original size of image
BATCH_SIZE = 1
OUTPUT_CHANNELS = 3 #RGB 
EPOCHS = 20 #No of iterations for the model
#Note:Higher Epochs give you a better result as the loss value keeps decreasing.
# I have gone for 20 as I have very low computational power

##  Loading Datasets

**Displaying pictures**

In [None]:
_, ax = plt.subplots(3,3, figsize=(8,8))
plt.suptitle('Some images with monet style', fontsize=19, fontweight='bold')

ind = 0 
for i in range(3):
    for j in range(3):
        ax[i][j].imshow(Image.open('../input/gan-getting-started/monet_jpg/'+os.listdir('../input/gan-getting-started/monet_jpg')[ind]))
        ind += 1

In [None]:
_, ax = plt.subplots(3,3, figsize=(8,8))
plt.suptitle('Some images with no monet style', fontsize=19, fontweight='bold')

ind = 0 
for i in range(3):
    for j in range(3):
        ax[i][j].imshow(Image.open('../input/gan-getting-started/photo_jpg/'+os.listdir('../input/gan-getting-started/photo_jpg')[ind]))
        ind += 1

In [None]:
MONET_filenames = tf.io.gfile.glob(str(GCS_PATH + '/monet_tfrec/*.tfrec'))
print(f'{len(MONET_filenames)} Monet TFRecord Files')
Test_Photo_filenames=tf.io.gfile.glob(str(GCS_PATH + '/photo_tfrec/*.tfrec'))
print(f'{len(Test_Photo_filenames)} Test_Photo TFRecord Files')

**Data augmentation**: it a technique to increase the diversity of the training set by applying random (but realistic) transformations, such as image rotation, and it can be done very easily using the API tf.image. <br> To learn more about it check out the official decantation:
https://www.tensorflow.org/tutorials/images/data_augmentation.

In [None]:
def data_augment(image):
    p_spatial = tf.random.uniform([], 0, 1.0, dtype=tf.float32) #generates random uniform distribution
    p_rotate = tf.random.uniform([], 0, 1.0, dtype=tf.float32)

    
    #rotating images
    if p_rotate > .8:
        image = tf.image.rot90(image, k=3)  
    elif p_rotate > .6:
        image = tf.image.rot90(image, k=2) 
    elif p_rotate > .4:
        image = tf.image.rot90(image, k=1)
        
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    #transposing images
    if p_spatial > .75:
        image = tf.image.transpose(image)
    image = tf.image.random_hue(image, 0.01) #hue settings
    image = tf.image.random_saturation(image, 0.70, 1.30) #saturation settings
    image = tf.image.random_contrast(image, 0.80, 1.20) #contrast settings
    image = tf.image.random_brightness(image, 0.10) #brightness settings
    return image

**Displaying images after applying data augmentation**

In [None]:
_, ax = plt.subplots(3,3, figsize=(8,8))
plt.suptitle('Some augmented images with monet style', fontsize=19, fontweight='bold')

ind = 0 
for i in range(3):
    for j in range(3):
        ax[i][j].imshow(data_augment(np.array(Image.open('../input/gan-getting-started/monet_jpg/'+os.listdir('../input/gan-getting-started/monet_jpg')[ind]))))
        ind += 1

In [None]:
_, ax = plt.subplots(3,3, figsize=(8,8))
plt.suptitle('Some images with no monet style', fontsize=19, fontweight='bold')

ind = 0 
for i in range(3):
    for j in range(3):
        ax[i][j].imshow(Image.open('../input/gan-getting-started/photo_jpg/'+os.listdir('../input/gan-getting-started/photo_jpg')[ind]))
        ind += 1

Now let's define a function decode_image that decode a JPEG-encoded image to a uint8 tensor, casts it to a float32, divide it by 127.5 and subtract it by 1, to make the values in the tensor between -1 and 1, and finally, reshape it to (IMAGE_SIZE,IMAGE_SIZE, NUM_CHANNEL)

In [None]:
def decode_image(image):
    image = tf.image.decode_jpeg(image, channels=3)
    image = (tf.cast(image, tf.float32) / 127.5) - 1
    image = tf.reshape(image, [*IMAGE_SIZE, 3])
    return image


Now let's define a function read_tfrecord to Parse a single Example photo.

In [None]:
def read_tfrecord(example):
    tfrecord_format = {
        "image": tf.io.FixedLenFeature([], tf.string),
    }
    example = tf.io.parse_single_example(example, tfrecord_format)
    image = decode_image(example['image'])
    
    return image

Now let's define a function load_dataset to load our dataset.

In [None]:
def load_dataset(filenames):
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTOTUNE)
    return dataset

Ddefining a function **get_gan_dataset** that load datasets from monet_files and photo_files, then augments the data with the function that we defined earlier **data_augment** and combines consecutive elements of this dataset into batches, then let's use prefetch to allow later elements to be prepared while the current element is being processed. This improves the latency and throughput, at the cost of using additional memory to store prefetched elements. Finally , let's create our final Dataset by zipping together the given datasets (monet_ds and photo_ds).

In [None]:
monet_ds = load_dataset(MONET_Filenames).batch(1)
photo_ds = load_dataset(TEST_Photo_Filenames).batch(1)
def get_gan_dataset(monet_files, photo_files, batch_size=BATCH_SIZE):

    monet_ds = load_dataset(monet_files)
    photo_ds = load_dataset(photo_files)
    
    monet_ds = monet_ds.map(data_augment, num_parallel_calls=AUTOTUNE)
    photo_ds = photo_ds.map(data_augment, num_parallel_calls=AUTOTUNE)
        
    monet_ds = monet_ds.batch(batch_size)
    photo_ds = photo_ds.batch(batch_size)
    
    monet_ds = monet_ds.prefetch(AUTOTUNE)
    photo_ds = photo_ds.prefetch(AUTOTUNE)
    
    gan_ds = tf.data.Dataset.zip((monet_ds, photo_ds))
    
    return gan_ds

final_dataset = get_gan_dataset(MONET_Filenames, TEST_Photo_Filenames, batch_size=BATCH_SIZE)

## Defining Models and Losses

**Constructing the Generator**

In [None]:
#Defining the downsample layer 
def down_sample(filters, size, apply_instancenorm=True):
    # In the paper the weights are initialized from a Gaussian distribution N (0, 0.02).
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    layer = keras.Sequential()
    layer.add(layers.Conv2D(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))

    if apply_instancenorm:
        layer.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))

    layer.add(layers.LeakyReLU())

    return layer

In [None]:
#defining the upsampling layer
def up_sample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    layer = keras.Sequential()
    layer.add(layers.Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=initializer,use_bias=False))
    layer.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))

    if apply_dropout:
        layer.add(layers.Dropout(0.5))

    layer.add(layers.ReLU())

    return layer

In [None]:
#compiling the generator
def Generator():
    inputs = layers.Input(shape=[256,256,3])
    down_stack = [
        down_sample(64, 4, apply_instancenorm=False),
        down_sample(128, 4),                        
        down_sample(256, 4),                        
        down_sample(512, 4),                        
        down_sample(512, 4),                      
        down_sample(512, 4),                      
        down_sample(512, 4),                      
        down_sample(512, 4),                      
    ]

    up_stack = [
        up_sample(512, 4, apply_dropout=True),    
        up_sample(512, 4, apply_dropout=True),    
        up_sample(512, 4, apply_dropout=True),    
        up_sample(512, 4),                          
        up_sample(256, 4),                         
        up_sample(128, 4),                           
        up_sample(64, 4),                           
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    # The last activaltion function is tanh because we want to force the model to generate pixels between 1 and -1, to be the same as the input pixels after preprocessing.
    last = layers.Conv2DTranspose(3, 4, strides=2, padding='same', kernel_initializer=initializer, activation='tanh') 
   

    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])

    x = last(x)

    return keras.Model(inputs=inputs, outputs=x)

**Constructing the Discriminator**

In [None]:
def Discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
    
    inp = layers.Input(shape=[256, 256, 3], name='input_image')
    x = inp
    
    down1 = down_sample(64, 4, False)(x)       
    down2 = down_sample(128, 4)(down1)        
    down3 = down_sample(256, 4)(down2)        

    zero_pad1 = layers.ZeroPadding2D()(down3)
    conv = layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(zero_pad1)

    norm1 = tfa.layers.InstanceNormalization(gamma_initializer=gamma_init)(conv)
    leaky_relu = layers.LeakyReLU()(norm1)
    zero_pad2 = layers.ZeroPadding2D()(leaky_relu)
    last = layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(zero_pad2)

    return tf.keras.Model(inputs=inp, outputs=last)

Now let's create the models in strategy.scope() for TPU

In [None]:
with strategy.scope():
    monet_generator = Generator() # transforms photos to Monet style 
    photo_generator = Generator() # transforms Monet style to be more like photos

    monet_discriminator = Discriminator() # differentiates real images with Monet style andi mages with generated Monet style
    photo_discriminator = Discriminator() # differentiates real photos and generated photos

**Building CycleGAN class**

In [None]:
class CycleGan(keras.Model):
    def __init__(
        self,
        monet_generator,
        photo_generator,
        monet_discriminator,
        photo_discriminator, 
        lambda_cycle=10, 
    ):
        super(CycleGan, self).__init__()
        self.m_gen = monet_generator
        self.p_gen = photo_generator
        self.m_disc = monet_discriminator
        self.p_disc = photo_discriminator
        self.lambda_cycle = lambda_cycle
        
    def compile(
        self,
        m_gen_optimizer,
        p_gen_optimizer,
        m_disc_optimizer,
        p_disc_optimizer,
        gen_loss_fn,
        disc_loss_fn,
        cycle_loss_fn,
        identity_loss_fn,
    ):
        super(CycleGan, self).compile()
        self.m_gen_optimizer = m_gen_optimizer
        self.p_gen_optimizer = p_gen_optimizer
        self.m_disc_optimizer = m_disc_optimizer
        self.p_disc_optimizer = p_disc_optimizer
        self.gen_loss_fn = gen_loss_fn
        self.disc_loss_fn = disc_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn

        
    def train_step(self, batch_data):
        real_monet, real_photo = batch_data
        batch_size = tf.shape(real_monet)[0]
        with tf.GradientTape(persistent=True) as tape:
        
            # photo to monet back to photo
            fake_monet = self.m_gen(real_photo, training=True)
            cycled_photo = self.p_gen(fake_monet, training=True)

            # monet to photo back to monet
            fake_photo = self.p_gen(real_monet, training=True)
            cycled_monet = self.m_gen(fake_photo, training=True)

            # generating itself
            same_monet = self.m_gen(real_monet, training=True)
            same_photo = self.p_gen(real_photo, training=True)

            
            
            # discriminator used to check, inputing real images
            disc_real_monet = self.m_disc(real_monet, training=True)
            disc_real_photo = self.p_disc(real_photo, training=True)

            # discriminator used to check, inputing fake images
            disc_fake_monet = self.m_disc(fake_monet, training=True)
            disc_fake_photo = self.p_disc(fake_photo, training=True)

            # evaluates generator loss
            monet_gen_loss = self.gen_loss_fn(disc_fake_monet)
            photo_gen_loss = self.gen_loss_fn(disc_fake_photo)

            # evaluates total cycle consistency loss
            total_cycle_loss = self.cycle_loss_fn(real_monet, cycled_monet, self.lambda_cycle) + self.cycle_loss_fn(real_photo, cycled_photo, self.lambda_cycle)

            # evaluates total generator loss
            total_monet_gen_loss = monet_gen_loss + total_cycle_loss + self.identity_loss_fn(real_monet, same_monet, self.lambda_cycle)
            total_photo_gen_loss = photo_gen_loss + total_cycle_loss + self.identity_loss_fn(real_photo, same_photo, self.lambda_cycle)

            # evaluates discriminator loss
            monet_disc_loss = self.disc_loss_fn(disc_real_monet, disc_fake_monet)
            photo_disc_loss = self.disc_loss_fn(disc_real_photo, disc_fake_photo)

        # Calculate the gradients for generator and discriminator
        monet_generator_gradients = tape.gradient(total_monet_gen_loss,
                                                  self.m_gen.trainable_variables)
        photo_generator_gradients = tape.gradient(total_photo_gen_loss,
                                                  self.p_gen.trainable_variables)

        monet_discriminator_gradients = tape.gradient(monet_disc_loss,
                                                      self.m_disc.trainable_variables)
        photo_discriminator_gradients = tape.gradient(photo_disc_loss,
                                                      self.p_disc.trainable_variables)

        # Apply the gradients to the optimizer
        self.m_gen_optimizer.apply_gradients(zip(monet_generator_gradients,
                                                 self.m_gen.trainable_variables))

        self.p_gen_optimizer.apply_gradients(zip(photo_generator_gradients,
                                                 self.p_gen.trainable_variables))

        self.m_disc_optimizer.apply_gradients(zip(monet_discriminator_gradients,
                                                  self.m_disc.trainable_variables))

        self.p_disc_optimizer.apply_gradients(zip(photo_discriminator_gradients,
                                                  self.p_disc.trainable_variables))
        
        return {
            "monet_gen_loss": total_monet_gen_loss,
            "photo_gen_loss": total_photo_gen_loss,
            "monet_disc_loss": monet_disc_loss,
            "photo_disc_loss": photo_disc_loss
        }

- Defining the loss function for the discriminator that labels the original images as 1, and the false ones as 0. The ideal discriminator will output only  1 for the true images and  zeros for the false.

In [None]:
with strategy.scope():
    def discriminator_loss(real, generated):
        real_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(real), real)

        generated_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.zeros_like(generated), generated)

        total_disc_loss = real_loss + generated_loss

        return total_disc_loss * 0.5

- Defining the loss function for the generator that tries to trick the discriminator by generating an image that the discriminator considers as original. An ideal generator will cause the discriminator on the output to return  1.

In [None]:
with strategy.scope():
    def generator_loss(generated):
        return tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(generated), generated)

- Defining the Cycle consistency loss, wich is the arithmetic mean of the differences between the original photo and the transformed photo denoted as l1

In [None]:
with strategy.scope():
    def calc_cycle_loss(real_image, cycled_image, LAMBDA):
        l1 = tf.reduce_mean(tf.abs(real_image - cycled_image))

        return LAMBDA * l1

Finally defining thr Identity loss that is used to compare the image x and that image produced by generator F. We expect F (x) ~ x, i.e. if the Monet style image generator is a Monet image, the output should get the same image.

In [None]:
with strategy.scope():
    def identity_loss(real_image, same_image, LAMBDA):
        loss = tf.reduce_mean(tf.abs(real_image - same_image))
        return LAMBDA * 0.5 * loss

## Training 
Let's initialize the optimizers for the models in strategy.scope() because we are using TPU

In [None]:
with strategy.scope():
    monet_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    photo_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

    monet_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    photo_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

**Compiling and fitting the model**

In [None]:
with strategy.scope():
    cycle_gan_model = CycleGan(
        monet_generator, photo_generator, monet_discriminator, photo_discriminator
    )

    cycle_gan_model.compile(
        m_gen_optimizer = monet_generator_optimizer,
        p_gen_optimizer = photo_generator_optimizer,
        m_disc_optimizer = monet_discriminator_optimizer,
        p_disc_optimizer = photo_discriminator_optimizer,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calc_cycle_loss,
        identity_loss_fn = identity_loss
    )

In [None]:
history = cycle_gan_model.fit(final_dataset, 
                        epochs=EPOCHS, 
                        ).history

## Visualizing loss function 

In [None]:
loss_results_df = pd.DataFrame(history)
loss_results_df = loss_results_df.applymap(np.mean)
plt.plot(loss_results_df.index, loss_results_df['monet_gen_loss'], color='g', label='Loss Monet Generator')
plt.plot(loss_results_df.index, loss_results_df['photo_gen_loss'], color='r', label='Loss Photo Generator')
plt.plot(loss_results_df.index, loss_results_df['monet_disc_loss'], color='b', label='Loss Monet Discriminator')
plt.plot(loss_results_df.index, loss_results_df['photo_disc_loss'], color='m', label='Loss Photo Discriminator')
plt.legend(loc='best')
plt.show()


## Results

In [None]:
_, ax = plt.subplots(5, 2, figsize=(10, 10))
for i, img in enumerate(photo_ds.take(5)):
    prediction = monet_generator(img, training=False)[0].numpy()
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    img = (img[0] * 127.5 + 127.5).numpy().astype(np.uint8)

    ax[i, 0].imshow(img)
    ax[i, 1].imshow(prediction)
    ax[i, 0].set_title("Input Photo")
    ax[i, 1].set_title("Monet-esque")
    ax[i, 0].axis("off")
    ax[i, 1].axis("off")
plt.show()

**As you can see the results are not that great. Please increase the number of epochs to about 100 for a better result. The dicriminator loss tends to be around 0.55 which is much better than what we have achieved here.**

## Saving and Submission

In [None]:
import PIL
! mkdir ../images

i = 1
for img in photo_ds:
    prediction = monet_generator(img, training=False)[0].numpy()
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    im = PIL.Image.fromarray(prediction)
    im.save("../images/" + str(i) + ".jpg")
    i += 1
    

import shutil
shutil.make_archive("/kaggle/working/images", 'zip', "/kaggle/images")