In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
from kaggle_datasets import KaggleDatasets
import matplotlib.pyplot as plt
import numpy as np
import re
import PIL
import os
import shutil
import cv2
import math
import random

try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except:
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

AUTO = tf.data.experimental.AUTOTUNE
print(tf.__version__)

In [None]:
BASE_PATH = '../input/gan-getting-started'
MONET_PATH = os.path.join(BASE_PATH, 'monet_jpg')
PHOTO_PATH = os.path.join(BASE_PATH, 'photo_jpg')

In [None]:
x=10
def load_images(paths):
    images = []
    for img in paths:
        try:
            img = cv2.imread(img)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        except:
            print("Could not load {}".format(img))
        images.append(img)
    return images

In [None]:
MONET_IMAGES = [os.path.join(MONET_PATH, file) for file in os.listdir(MONET_PATH)]
monet_images = load_images(MONET_IMAGES)

PHOTO_IMAGES = [os.path.join(PHOTO_PATH, file) for file in os.listdir(PHOTO_PATH)]
photo_images = load_images(PHOTO_IMAGES)

In [None]:
GCS_PATH = KaggleDatasets().get_gcs_path("gan-getting-started")

In [None]:
# Size of input data
MONET_FILES = tf.io.gfile.glob(str(GCS_PATH + '/monet_tfrec/*.tfrec'))
PHOTO_FILES = tf.io.gfile.glob(str(GCS_PATH + '/photo_tfrec/*.tfrec'))

n = [int(re.compile(r"-([0-9]*)\.").search(filename).group(1)) for filename in MONET_FILES]
n_monet = np.sum(n)
n = [int(re.compile(r"-([0-9]*)\.").search(filename).group(1)) for filename in PHOTO_FILES]
n_photo = np.sum(n)

print(f'Monet image files: {n_monet}')
print(f'Photo image files: {n_photo}')

In [None]:
def read_tfrecord(example):
    IMAGE_SIZE = [256, 256]
    tfrecord_format = {
        "image_name": tf.io.FixedLenFeature([], tf.string),
        "image": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.string)
    }
    example = tf.io.parse_single_example(example, tfrecord_format)
    im = example['image']
    
    image = tf.image.decode_jpeg(im, channels=3)
    image = (tf.cast(image, tf.float32) / 127.5) - 1
    image = tf.reshape(image, [*IMAGE_SIZE, 3])
    
    return image
def augment_image(image): # input data augmentation
    x = tf.random.uniform([], 0, 1.0, dtype=tf.float32)
    y = tf.random.uniform([], 0, 1.0, dtype=tf.float32)
    
    if y > .5: # random crop image
        image = tf.image.resize(image, [286, 286])
        image = tf.image.random_crop(image, size=[256, 256, 3])
            
    if x > .6: # random flip image
        image = tf.image.random_flip_left_right(image)
    
    return image

In [None]:
def load_data(filenames):
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTO)
    return dataset

def gan_dataset(monet_files, photo_files, augment=None, repeat=True, shuffle=True, batch_size=1):
    monet_ds = load_data(monet_files)
    photo_ds = load_data(photo_files)
    
    if augment:
        monet_ds = monet_ds.map(augment, num_parallel_calls=AUTO)
        photo_ds = photo_ds.map(augment, num_parallel_calls=AUTO)
    if repeat:
        monet_ds = monet_ds.repeat()
        photo_ds = photo_ds.repeat()

    monet_ds = monet_ds.batch(batch_size, drop_remainder=True)
    photo_ds = photo_ds.batch(batch_size, drop_remainder=True)

    monet_ds = monet_ds.prefetch(AUTO)
    photo_ds = photo_ds.prefetch(AUTO)
    
    gan_ds = tf.data.Dataset.zip((monet_ds, photo_ds))
    
    return gan_ds
data = gan_dataset(MONET_FILES, PHOTO_FILES, augment=augment_image, repeat=True, shuffle=True, batch_size=4)

In [None]:
def augment_image(image): # input data augmentation
    x = tf.random.uniform([], 0, 1.0, dtype=tf.float32)
    y = tf.random.uniform([], 0, 1.0, dtype=tf.float32)
    
    if y > .5: # random crop image
        image = tf.image.resize(image, [286, 286])
        image = tf.image.random_crop(image, size=[256, 256, 3])
            
    if x > .6: # random flip image
        image = tf.image.random_flip_left_right(image)
    
    return image

In [None]:
sample_monet , sample_photo = next(iter(data))

# Display Photo images
plt.subplot(121)
plt.title('Photo')
plt.imshow(sample_photo[0] * 0.5 + 0.5)

# Display Monet images
plt.subplot(122)
plt.title('Monet')
plt.imshow(sample_monet[0] * 0.5 + 0.5)

In [None]:
def down_sample(filters, size, apply_instancenorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    layer = keras.Sequential()
    layer.add(layers.Conv2D(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))

    if apply_instancenorm:
        layer.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))

    layer.add(layers.LeakyReLU())

    return layer

def up_sample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    layer = keras.Sequential()
    layer.add(layers.Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=initializer,use_bias=False))
    layer.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))

    if apply_dropout:
        layer.add(layers.Dropout(0.5))

    layer.add(layers.ReLU())

    return layer

In [None]:
def generator():
    inputs = layers.Input(shape=[256,256,3])
    down_stack = [
        down_sample(64, 4, apply_instancenorm=False),# (size, 128, 128, 64)
        down_sample(128, 4),                         # (size, 64, 64, 128)
        down_sample(256, 4),                         # (size, 32, 32, 256)
        down_sample(512, 4),                         # (size, 16, 16, 512)
        down_sample(512, 4),                         # (size, 8, 8, 512)
        down_sample(512, 4),                         # (size, 4, 4, 512)
        down_sample(512, 4),                         # (size, 2, 2, 512)
        down_sample(512, 4),                         # (size, 1, 1, 512)
    ]

    up_stack = [
        up_sample(512, 4, apply_dropout=True),       # (size, 2, 2, 1024)
        up_sample(512, 4, apply_dropout=True),       # (size, 4, 4, 1024)
        up_sample(512, 4, apply_dropout=True),       # (size, 8, 8, 1024)
        up_sample(512, 4),                           # (size, 16, 16, 1024)
        up_sample(256, 4),                           # (size, 32, 32, 512)
        up_sample(128, 4),                           # (size, 64, 64, 256)
        up_sample(64, 4),                            # (size, 128, 128, 128)
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    last = layers.Conv2DTranspose(3, 4, strides=2, padding='same', kernel_initializer=initializer, activation='tanh') 
    # (size, 256, 256, 3)

    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])

    x = last(x)

    return keras.Model(inputs=inputs, outputs=x)

In [None]:
def discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
    
    inp = layers.Input(shape=[256, 256, 3], name='input_image')
    x = inp
    
    down1 = down_sample(64, 4, False)(x)       # (size, 128, 128, 64)
    down2 = down_sample(128, 4)(down1)         # (size, 64, 64, 128)
    down3 = down_sample(256, 4)(down2)         # (size, 32, 32, 256)

    zero_pad1 = layers.ZeroPadding2D()(down3) # (size, 34, 34, 256)
    conv = layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(zero_pad1) # (size, 31, 31, 512)

    norm1 = tfa.layers.InstanceNormalization(gamma_initializer=gamma_init)(conv)
    leaky_relu = layers.LeakyReLU()(norm1)
    zero_pad2 = layers.ZeroPadding2D()(leaky_relu) # (size, 33, 33, 512)
    last = layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(zero_pad2) # (size, 30, 30, 1)

    return tf.keras.Model(inputs=inp, outputs=last)

In [None]:
with strategy.scope(): # for TPU
    monet_generator = generator() # transforms photos to Monet paintings
    photo_generator = generator() # transforms Monet paintings to be more like photos

    monet_discriminator = discriminator() # differentiates real Monet paintings and generated Monet paintings
    photo_discriminator = discriminator()

In [None]:
class Gan(keras.Model):
    def __init__(
        self,
        monet_generator,
        photo_generator,
        monet_discriminator,
        photo_discriminator,
        lambda_cycle=150,
    ):
        super(Gan, self).__init__()
        self.m_gen = monet_generator
        self.p_gen = photo_generator
        self.m_disc = monet_discriminator
        self.p_disc = photo_discriminator
        self.lambda_cycle = lambda_cycle
        
    def compile(
        self,
        m_gen_optimizer,
        p_gen_optimizer,
        m_disc_optimizer,
        p_disc_optimizer,
        gen_loss_fn,
        disc_loss_fn,
        cycle_loss_fn,
        identity_loss_fn
    ):
        super(Gan, self).compile()
        self.m_gen_optimizer = m_gen_optimizer
        self.p_gen_optimizer = p_gen_optimizer
        self.m_disc_optimizer = m_disc_optimizer
        self.p_disc_optimizer = p_disc_optimizer
        self.gen_loss_fn = gen_loss_fn
        self.disc_loss_fn = disc_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn
        
    def train_step(self, batch_data):
        real_monet, real_photo = batch_data
        
        with tf.GradientTape(persistent=True) as tape:
            fake_monet = self.m_gen(real_photo, training=True)
            same_monet = self.m_gen(real_monet, training=True)
            disc_real_monet = self.m_disc(real_monet, training=True)
            disc_fake_monet = self.m_disc(fake_monet, training=True)
            monet_gen_loss = self.gen_loss_fn(disc_fake_monet)
            total_monet_gen_loss = monet_gen_loss + self.identity_loss_fn(real_monet, same_monet, self.lambda_cycle)
            monet_disc_loss = self.disc_loss_fn(disc_real_monet, disc_fake_monet)      
            
        monet_generator_gradients = tape.gradient(total_monet_gen_loss, self.m_gen.trainable_variables)

        monet_discriminator_gradients = tape.gradient(monet_disc_loss, self.m_disc.trainable_variables)
        self.m_gen_optimizer.apply_gradients(zip(monet_generator_gradients, self.m_gen.trainable_variables))
        self.m_disc_optimizer.apply_gradients(zip(monet_discriminator_gradients, self.m_disc.trainable_variables))
        
        return {
            "monet_gen_loss": total_monet_gen_loss,
            "monet_disc_loss": monet_disc_loss,
        }

In [None]:
with strategy.scope(): # for TPU
    def discriminator_loss(real, generated):
        real_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(real), real)
        generated_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.zeros_like(generated), generated)
        total_disc_loss = real_loss + generated_loss

        return total_disc_loss * 0.5

with strategy.scope(): # for TPU
    def generator_loss(generated):
        return tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(generated), generated)

with strategy.scope(): # for TPU
    def calculate_cycle_loss(real_image, cycled_image, LAMBDA):
        loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))
        return LAMBDA * loss1

with strategy.scope(): # for TPU
    def identity_loss(real_image, same_image, LAMBDA):
        loss = tf.reduce_mean(tf.abs(real_image - same_image))
        return LAMBDA * 0.5 * loss

In [None]:
with strategy.scope(): # for TPU
    monet_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    photo_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

    monet_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    photo_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

In [None]:
with strategy.scope(): # for TPU
    gan_model = Gan(monet_generator, photo_generator, monet_discriminator, photo_discriminator)

    gan_model.compile(
        m_gen_optimizer = monet_generator_optimizer,
        p_gen_optimizer = photo_generator_optimizer,
        m_disc_optimizer = monet_discriminator_optimizer,
        p_disc_optimizer = photo_discriminator_optimizer,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calculate_cycle_loss,
        identity_loss_fn = identity_loss
    )

In [None]:
gan_model.fit(data, epochs=25, steps_per_epoch=(max(n_monet, n_photo)//5))

In [None]:
def predict_and_save(input_ds, generator_model, output_path):
    i = 1
    for img in input_ds:
        prediction = generator_model(img, training=False)[0].numpy() # make predition
        prediction = (prediction * 127.5 + 127.5).astype(np.uint8)   # re-scale
        im = PIL.Image.fromarray(prediction)
        im.save(f'{output_path}{str(i)}.jpg')
        i += 1

In [None]:
os.makedirs('../images/')
predict_and_save(load_data(PHOTO_FILES).batch(1), monet_generator, '../images/')
shutil.make_archive('/kaggle/working/images/', 'zip', '../images')
print(f"Generated samples: {len([name for name in os.listdir('../images/') if os.path.isfile(os.path.join('../images/', name))])}")

In [None]:
monet_histograms = [[cv2.calcHist([image],[i],None,[256],[0,256]) for i in range(3)] for image in monet_images]
photo_histograms = [[cv2.calcHist([image],[i],None,[256],[0,256]) for i in range(3)] for image in photo_images]

In [None]:
def plot_histograms(img, title):
    plt.figure(figsize=(16,16))
    
    plt.title(title)
    color = ('b','g','r')
    
    
    w = 4
    h = int(len(img)/2)
    
    idxs = [i for i in range(len(img)*2)[::2]]
    
    for idx, image in zip(idxs,img):
        
        plt.subplot(h, w, idx + 1)
        plt.imshow(image)
        plt.axis('off')
        
        plt.subplot(h, w, idx + 2)
        for i,col in enumerate(color):   
            histr = cv2.calcHist([image],[i],None,[256],[0,256])
            plt.plot(histr,color = col)
            plt.xlim([0,256])

In [None]:
plot_histograms(random.sample(monet_images,8), "Histogram for Monet Image")

In [None]:
plot_histograms(random.sample(photo_images,8), "Histogram for Photo")

In [None]:
color = ('b','g','r')
monet_means = {c: [np.mean(x[:,:,i]) for x in monet_images] for i,c in enumerate(color)}
photo_means = {c: [np.mean(x[:,:,i]) for x in photo_images] for i,c in enumerate(color)}

monet_std = {c: [np.std(x[:,:,i]) for x in monet_images] for i,c in enumerate(color)}
photo_std = {c: [np.std(x[:,:,i]) for x in photo_images] for i,c in enumerate(color)}

In [None]:
def boxplots_for_comparison(monet, photo, title):
    for i,c in enumerate(color):
        fig, ax = plt.subplots()
        ax.set_title('{} Channel: {}'.format(c.upper(), title))
        ax.boxplot([monet[c], photo[c]])
        ax.set_xticklabels(["Monet","Photo"])
        fig.show()

In [None]:
boxplots_for_comparison(monet_means, photo_means, "Pixel Value Mean comparison - Monet and Photo")

In [None]:
boxplots_for_comparison(monet_std, photo_std, "Pixel Value standard deviation comparison - Monet and Photo")

In [None]:
zero_values_monet = {c: [np.count_nonzero(hist[i].ravel()==0) for hist in monet_histograms] for i,c in enumerate(color)}
zero_values_photo = {c: [np.count_nonzero(hist[i].ravel()==0) for hist in photo_histograms] for i,c in enumerate(color)}

In [None]:
boxplots_for_comparison(zero_values_monet,zero_values_photo, "Number of not used intensity values Monet and Photo" )