Objective: 

Build a GAN that takes in real photos and recreates them as Monet-style paintings.

Modeling Description: 

Generative adversarial networks (GANs) are a deep learning technique that pairs two neural networks: a generator and a discriminator.

The generator creates fake data while the discriminator tries to identify that data as fake. If the discriminator is successful, the generator is penalized, it adjusts the parameters used to generate the fake data, and it re-generates the fake data for the discriminator to assess again. 

If the generator is successful, the discriminator is penalized and it learns how to identify the new fake data. 

Both networks compete with each other, repeating this cycle until the discriminator is unable to tell the real data from the fake data.

Data Description: 

The dataset includes 300 Monet paintings to train the model, and 7000 photos to convert into Monet-style paintings.

In [None]:
import numpy as np
import pandas as pd 

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import os
from PIL import Image
import shutil

In [None]:
# Locate every regular file in the Monet JPEG folder and decode it to an array
monets = '/kaggle/input/gan-getting-started/monet_jpg/'
monet_files = list(
    filter(
        os.path.isfile,
        (os.path.join(monets, name) for name in os.listdir(monets)),
    )
)
monet_files_cnv = list(map(np.array, map(Image.open, monet_files)))
# The count printed is the raw directory listing, not the filtered file list
print('Monet paintings: ', len(os.listdir(monets)))


In [None]:
# Locate every regular file in the photo JPEG folder and decode it to an array
photos = '/kaggle/input/gan-getting-started/photo_jpg/'
photo_files = list(
    filter(
        os.path.isfile,
        (os.path.join(photos, name) for name in os.listdir(photos)),
    )
)
photo_files_cnv = list(map(np.array, map(Image.open, photo_files)))
# The count printed is the raw directory listing, not the filtered file list
print('Photos: ', len(os.listdir(photos)))

In [None]:
# Side-by-side preview of one painting and one photo (index 17 of each list)
monet_sample = mpimg.imread(monet_files[17])
photo_sample = mpimg.imread(photo_files[17])

# Left panel of a 1x2 grid
plt.subplot(1, 2, 1)
plt.title('Monet: 17')
plt.imshow(monet_sample)

# Right panel of a 1x2 grid
plt.subplot(1, 2, 2)
plt.title('Photo: 17')
plt.imshow(photo_sample)


In [None]:
# Glob the TFRecord shards for each image domain
tf_monet = tf.io.gfile.glob('/kaggle/input/gan-getting-started/monet_tfrec/*.tfrec')
tf_photo = tf.io.gfile.glob('/kaggle/input/gan-getting-started/photo_tfrec/*.tfrec')

In [None]:
#helper functions to decode and read tfrec
# Height and width every decoded image is reshaped to
sizing = [256, 256]


def decode(file):
    """Turn a JPEG-encoded string tensor into a float32 image tensor.

    The bytes are decoded as a 3-channel image, cast to float32, rescaled
    with ``value / 127.5 - 1`` and reshaped to ``[*sizing, 3]``.

    Args:
        file: Scalar string tensor holding the JPEG bytes.

    Returns:
        A float32 tensor of shape ``[sizing[0], sizing[1], 3]``.
    """
    pixels = tf.image.decode_jpeg(file, channels=3)
    scaled = (tf.cast(pixels, tf.float32) / 127.5) - 1
    return tf.reshape(scaled, [*sizing, 3])

def read_tfrec(record):
    """Parse one serialized TFRecord example and return its decoded image.

    Args:
        record: Scalar string tensor holding a serialized ``tf.train.Example``.

    Returns:
        The image tensor produced by ``decode`` for the example's JPEG bytes.
    """
    # The feature names have to match what is stored in the competition's
    # TFRecord shards ("image_name", "image", "target"); asking for a
    # FixedLenFeature that is absent makes parse_single_example fail.
    tfrec_form = {
        "image_name": tf.io.FixedLenFeature([], tf.string),
        "image": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.string),
    }
    parsed = tf.io.parse_single_example(record, tfrec_form)
    # Only the image bytes are needed downstream; name and target are ignored.
    return decode(parsed["image"])

In [None]:
#pulling in tfrec files
def load_tfrec(files, label=True, order=False):
    """Build a dataset of decoded images from TFRecord files.

    Args:
        files: TFRecord file path(s) handed to ``tf.data.TFRecordDataset``.
        label: Accepted but not used by this function.
        order: Accepted but not used by this function.

    Returns:
        A ``tf.data.Dataset`` where each element is ``read_tfrec`` applied to
        one record.
    """
    return tf.data.TFRecordDataset(files).map(read_tfrec)

In [None]:
# One image per batch for both domains (label=True is load_tfrec's default)
photo_data = load_tfrec(tf_photo).batch(1)
monet_data = load_tfrec(tf_monet).batch(1)

Model Building:

We need to build the generator and discriminator for this architecture. Both require CNN models and need multiple layers to manipulate the images into the right format.

In [None]:
#building the generator
# Number of channels in the generated image
output_ch = 3

## helpers for image size layer
def down_sample(x, y):
    """Build a strided-convolution block followed by LeakyReLU.

    Args:
        x: Number of convolution filters.
        y: Convolution kernel size.

    Returns:
        A ``keras.Sequential`` holding a bias-free ``Conv2D`` with stride 2 and
        ``'same'`` padding, then a ``LeakyReLU`` activation.
    """
    init = tf.random_normal_initializer(0, .05)

    res = keras.Sequential()
    res.add(layers.Conv2D(x, y, strides=2, padding='same',
                          kernel_initializer=init, use_bias=False))
    res.add(layers.LeakyReLU())

    return res

def up_sample(x,y):
    """Build a strided transposed-convolution block followed by ReLU.

    Args:
        x: Number of transposed-convolution filters.
        y: Kernel size.

    Returns:
        A ``keras.Sequential`` holding a bias-free ``Conv2DTranspose`` with
        stride 2 and ``'same'`` padding, then a ``ReLU`` activation.
    """
    init = tf.random_normal_initializer(0, .05)

    res = keras.Sequential()
    res.add(layers.Conv2DTranspose(x, y, strides=2, padding='same',
                                   kernel_initializer=init, use_bias=False))
    res.add(layers.ReLU())

    return res

In [None]:
#building the generator
def Gen():
    """Assemble the encoder/decoder generator with skip connections.

    Eight ``down_sample`` stages are applied to a 256x256x3 input, then seven
    ``up_sample`` stages. After each up-sampling stage the result is
    concatenated with the matching encoder output (the deepest encoder output
    is not reused). A final stride-2 transposed convolution with ``tanh``
    produces ``output_ch`` channels.

    Returns:
        A ``keras.Model`` mapping the input image tensor to the generated one.
    """
    source = layers.Input(shape=[256, 256, 3])

    encoder_widths = (64, 128, 256, 512, 512, 512, 512, 512)
    decoder_widths = (512, 512, 512, 512, 256, 128, 64)
    encoder = [down_sample(width, 4) for width in encoder_widths]
    decoder = [up_sample(width, 4) for width in decoder_widths]

    final_layer = layers.Conv2DTranspose(
        output_ch,
        4,
        strides=2,
        padding='same',
        kernel_initializer=tf.random_normal_initializer(0., 0.05),
        activation='tanh',
    )

    # Encoder pass: remember every intermediate output for the skip links
    tensor = source
    skips = []
    for stage in encoder:
        tensor = stage(tensor)
        skips.append(tensor)

    # Decoder pass: walk the saved outputs deepest-first, skipping the last one
    for stage, skip in zip(decoder, skips[-2::-1]):
        tensor = stage(tensor)
        tensor = layers.Concatenate()([tensor, skip])

    return keras.Model(inputs=source, outputs=final_layer(tensor))

In [None]:
#building the discriminator
def Dis():
    """Assemble the patch-based discriminator.

    A 256x256x3 input goes through three ``down_sample`` stages (64, 128 and
    256 filters), a zero-padded 512-filter convolution with LeakyReLU, and a
    final zero-padded single-filter convolution. The output is a map of raw
    (un-activated) scores, one per image patch.

    Returns:
        A ``tf.keras.Model`` mapping an image tensor to the patch score map.
    """
    init = tf.random_normal_initializer(0, .05)

    inputs = layers.Input(shape=[256, 256, 3])

    downs_a = down_sample(64, 4)(inputs)
    downs_b = down_sample(128, 4)(downs_a)
    downs_c = down_sample(256, 4)(downs_b)

    pads = layers.ZeroPadding2D()(downs_c)
    cons = layers.Conv2D(512, 4, strides=1, kernel_initializer=init,
                         use_bias=False)(pads)
    # The 512-filter convolution must feed the output layer; previously its
    # result was discarded and the last convolution read `pads` directly.
    acts = layers.LeakyReLU()(cons)
    pads_out = layers.ZeroPadding2D()(acts)

    last = layers.Conv2D(1, 4, strides=1, kernel_initializer=init)(pads_out)

    return tf.keras.Model(inputs=inputs, outputs=last)

In [None]:
#Generator and Discriminator Creation
# Photo-side networks first, then Monet-side networks (same build order)
ph_gen, ph_dis = Gen(), Dis()
mn_gen, mn_dis = Gen(), Dis()

In [None]:
#building the full GAN
class CycleGan(keras.Model):
    """Keras model that trains two generators and two discriminators jointly.

    One generator maps photos to Monet-style images and the other maps Monet
    images to photos; each domain has its own discriminator. Training is
    customised through ``train_step`` so that ``fit()`` can drive it.
    """

    def __init__(
        self,
        mn_gen,
        ph_gen,
        mn_dis,
        ph_dis,
        lambdas=5,
    ):
        """Store the four sub-models and the cycle/identity loss weight.

        Args:
            mn_gen: Generator producing Monet-style images.
            ph_gen: Generator producing photo-style images.
            mn_dis: Discriminator scoring Monet-domain images.
            ph_dis: Discriminator scoring photo-domain images.
            lambdas: Weight handed to the cycle and identity loss callables.
        """
        super().__init__()
        self.mngen = mn_gen
        self.phgen = ph_gen
        self.mndis = mn_dis
        self.phdis = ph_dis
        self.lambda_num = lambdas

    def compile(
        self,
        opt_mgen,
        opt_pgen,
        opt_mdis,
        opt_pdis,
        loss_gn,
        loss_dis,
        loss_cycle,
        loss_id
    ):
        """Attach one optimizer per sub-model and the four loss callables.

        Args:
            opt_mgen: Optimizer for the Monet generator.
            opt_pgen: Optimizer for the photo generator.
            opt_mdis: Optimizer for the Monet discriminator.
            opt_pdis: Optimizer for the photo discriminator.
            loss_gn: Callable ``(disc_fake) -> loss`` for a generator.
            loss_dis: Callable ``(disc_real, disc_fake) -> loss``.
            loss_cycle: Callable ``(real, cycled, weight) -> loss``.
            loss_id: Callable ``(real, same, weight) -> loss``.
        """
        super().compile()
        self.opt_mgen = opt_mgen
        self.opt_pgen = opt_pgen
        self.opt_mdis = opt_mdis
        self.opt_pdis = opt_pdis
        self.loss_gn = loss_gn
        self.loss_dis = loss_dis
        self.loss_cycle = loss_cycle
        self.loss_id = loss_id

    def call(self, inputs):
        """Run the Monet generator on ``inputs`` and return its output."""
        return self.mngen(inputs)

    def train_step(self, data):
        """Run one optimisation step on a ``(monet_batch, photo_batch)`` pair.

        Keras invokes this method from ``fit()``.

        Args:
            data: Tuple ``(mn_rl, ph_rl)`` of real Monet and real photo batches.

        Returns:
            Dict with the total loss of each generator and the loss of each
            discriminator.
        """
        mn_rl, ph_rl = data

        # persistent=True: the same tape is queried four times below.
        with tf.GradientTape(persistent=True) as tp:
            # photo -> fake Monet -> reconstructed photo
            mn_fk = self.mngen(ph_rl, training=True)
            cnv_ph = self.phgen(mn_fk, training=True)

            # Monet -> fake photo -> reconstructed Monet
            ph_fk = self.phgen(mn_rl, training=True)
            cnv_mn = self.mngen(ph_fk, training=True)

            # Each generator applied to an image already in its target domain
            mnsm = self.mngen(mn_rl, training=True)
            phsm = self.phgen(ph_rl, training=True)

            dis_mn_rl = self.mndis(mn_rl, training=True)
            dis_ph_rl = self.phdis(ph_rl, training=True)

            dis_mn_fk = self.mndis(mn_fk, training=True)
            dis_ph_fk = self.phdis(ph_fk, training=True)

            mn_gen_loss = self.loss_gn(dis_mn_fk)
            ph_gen_loss = self.loss_gn(dis_ph_fk)

            # Cycle-consistency term shared by both generators
            total_loss = (
                self.loss_cycle(mn_rl, cnv_mn, self.lambda_num)
                + self.loss_cycle(ph_rl, cnv_ph, self.lambda_num)
            )

            total_mn_gen_loss = (
                mn_gen_loss + total_loss
                + self.loss_id(mn_rl, mnsm, self.lambda_num)
            )
            total_ph_gen_loss = (
                ph_gen_loss + total_loss
                + self.loss_id(ph_rl, phsm, self.lambda_num)
            )

            mn_dis_loss = self.loss_dis(dis_mn_rl, dis_mn_fk)
            ph_dis_loss = self.loss_dis(dis_ph_rl, dis_ph_fk)

        mn_gen_vars = self.mngen.trainable_variables
        ph_gen_vars = self.phgen.trainable_variables
        mn_dis_vars = self.mndis.trainable_variables
        ph_dis_vars = self.phdis.trainable_variables

        mn_gen_grads = tp.gradient(total_mn_gen_loss, mn_gen_vars)
        ph_gen_grads = tp.gradient(total_ph_gen_loss, ph_gen_vars)
        mn_dis_grads = tp.gradient(mn_dis_loss, mn_dis_vars)
        ph_dis_grads = tp.gradient(ph_dis_loss, ph_dis_vars)

        self.opt_mgen.apply_gradients(zip(mn_gen_grads, mn_gen_vars))
        self.opt_pgen.apply_gradients(zip(ph_gen_grads, ph_gen_vars))
        self.opt_mdis.apply_gradients(zip(mn_dis_grads, mn_dis_vars))
        self.opt_pdis.apply_gradients(zip(ph_dis_grads, ph_dis_vars))

        return {
            "mn_gen_loss": total_mn_gen_loss,
            "ph_gen_loss": total_ph_gen_loss,
            "mn_dis_loss": mn_dis_loss,
            "ph_dis_loss": ph_dis_loss
        }

    def training(self, data):
        """Backward-compatible alias for ``train_step``."""
        return self.train_step(data)

In [None]:
#loss functions for the model

def loss_id_in(x, y, z):
    """Identity loss: ``z * 0.5`` times the mean absolute difference of x and y.

    Args:
        x: First tensor (the real image at the call site).
        y: Second tensor (the generator's output for that image).
        z: Scalar weight.

    Returns:
        The weighted mean absolute difference.
    """
    mean_abs_diff = tf.reduce_mean(tf.abs(x - y))
    weighted = z * 0.5 * mean_abs_diff
    return weighted

def loss_dis_in(x,y):
    """Discriminator loss from its scores on real and generated images.

    Scores are treated as logits. Real scores are compared against ones and
    generated scores against zeros with un-reduced binary cross-entropy, and
    the two results are added.

    Args:
        x: Discriminator output for real images.
        y: Discriminator output for generated images.

    Returns:
        The element-wise sum of the real and generated cross-entropy losses.
    """
    bce = tf.keras.losses.BinaryCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE
    )
    x_loss = bce(tf.ones_like(x), x)
    y_loss = bce(tf.zeros_like(y), y)
    # The total must be returned; without it the caller receives None and the
    # gradient computation for the discriminators cannot run.
    return x_loss + y_loss

def loss_cycle_in(x, y, z):
    """Cycle loss: ``z`` times the mean absolute difference of x and y.

    Args:
        x: First tensor (the real image at the call site).
        y: Second tensor (the image after a round trip through both generators).
        z: Scalar weight.

    Returns:
        The weighted mean absolute difference.
    """
    mean_abs_diff = tf.reduce_mean(tf.abs(x - y))
    weighted = z * mean_abs_diff
    return weighted

def loss_gen_in(x):
    """Generator loss: un-reduced binary cross-entropy of ``x`` against ones.

    Args:
        x: Discriminator output (logits) for generated images.

    Returns:
        The un-reduced cross-entropy between an all-ones target and ``x``.
    """
    bce = tf.keras.losses.BinaryCrossentropy(
        from_logits=True,
        reduction=tf.keras.losses.Reduction.NONE,
    )
    all_real = tf.ones_like(x)
    return bce(all_real, x)

In [None]:
# optimizer for model creation
# Four independent Adam optimizers with identical settings, created in the
# order: Monet generator, photo generator, Monet discriminator, photo discriminator
opt_mgen_in, opt_pgen_in, opt_mdis_in, opt_pdis_in = (
    tf.keras.optimizers.Adam(learning_rate=1e-4, beta_1=0.1) for _ in range(4)
)

In [None]:
# Init the GAN Model
# Wire the four sub-models together, then attach optimizers and losses
gan_model = CycleGan(
    mn_gen=mn_gen,
    ph_gen=ph_gen,
    mn_dis=mn_dis,
    ph_dis=ph_dis,
)
gan_model.compile(
    opt_mgen=opt_mgen_in,
    opt_pgen=opt_pgen_in,
    opt_mdis=opt_mdis_in,
    opt_pdis=opt_pdis_in,
    loss_gn=loss_gen_in,
    loss_dis=loss_dis_in,
    loss_cycle=loss_cycle_in,
    loss_id=loss_id_in,
)

In [None]:
#training the model on the data
# Each training element is a (monet_batch, photo_batch) pair
paired_batches = tf.data.Dataset.zip((monet_data, photo_data))
gan_model.fit(paired_batches, epochs=10)

Submission Generation

In [None]:
# Run every photo through the Monet generator and save the result as a JPEG.
# The output folder has to exist before PIL can write into it.
os.makedirs("images", exist_ok=True)
for i, each in enumerate(photo_data, start=1):
    # Batches hold a single image, so take element 0 of the prediction
    pred = mn_gen(each, training=False)[0].numpy()
    # Undo the [-1, 1] scaling applied when the images were decoded
    pred = (pred * 127.5 + 127.5).astype(np.uint8)
    Image.fromarray(pred).save("images/" + str(i) + ".jpg")

Building, training and compiling a generatide adversial model is quite complicated. It requires the careful handling of multiple layers, specific configurations on the generator and discriminator and it requires a lot of computational power. However, while complicated and time-intensive, it proved to be a powerful application of deep learning because it yielded images that were fairly acceptable. 

In other contexts, I would experiment more with adding layers or tuning specific layers that are earlier in the process. I would include additional epochs which would increase computation time but might yield better results. I might use more distinct or defined images to see how the model handles images with lots of detail. 