In [None]:
@inproceedings{CycleGAN2017,
  title={Unpaired Image-to-Image Translation using Cycle-Consistent Adversarial Networks},
  author={Zhu, Jun-Yan and Park, Taesung and Isola, Phillip and Efros, Alexei A},
  booktitle={Computer Vision (ICCV), 2017 IEEE International Conference on},
  year={2017}
}

In [None]:
import glob
import json
import os
import random
import time

import matplotlib.pyplot as plt
import numpy
import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_datasets as tfds
from IPython.display import clear_output
from numpy import asarray
from PIL import Image
from tensorflow.keras import Input
from tensorflow.keras import backend
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.layers import Conv2D, Flatten, Dense
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow_addons.layers import InstanceNormalization

AUTOTUNE = tf.data.AUTOTUNE

In [None]:
# Notebook-wide settings.
# NOTE(review): BUFFER_SIZE and BATCH_SIZE are not referenced by any cell
# visible in this notebook — confirm whether they are still needed.
BUFFER_SIZE = 1000
BATCH_SIZE = 1
# Image size in pixels; used further down to build `image_shape`.
IMG_WIDTH = 256
IMG_HEIGHT = 256

In [None]:
# Accumulator lists for the loaded images; they are filled in place by
# turnimagetonumpyA / turnimagetonumpyB in the cells below.
train_horses, train_zebras = [], []
test_horses, test_zebras = [], []

In [None]:
def turnimagetonumpyA(d, array, limit=2000):
    """Load the ``*_A.jpg`` images in directory ``d`` and append them to ``array``.

    Files are visited in sorted filename order. Each image is converted to a
    NumPy array and rescaled with ``x / 127.5 - 1`` (so 0 maps to -1 and 255
    maps to 1) before being appended.

    Args:
        d: Directory searched (non-recursively) for files matching ``*_A.jpg``.
        array: List that receives one array per image; mutated in place.
        limit: Maximum number of files to load. Defaults to 2000, the value
            that used to be hard-coded; ``None`` loads every match.

    Returns:
        None. Results are delivered through ``array``.
    """
    filenames = sorted(glob.glob(d + '/*_A.jpg'))
    for name in filenames[:limit]:
        # Open inside a context manager so the file handle is released right
        # after the pixels are read, instead of leaking one handle per image.
        with Image.open(name) as img:
            array.append((asarray(img) / 127.5) - 1)

In [None]:
def turnimagetonumpyB(d, array, limit=2000):
    """Load the ``*_B.jpg`` images in directory ``d`` and append them to ``array``.

    Files are visited in sorted filename order. Each image is converted to a
    NumPy array and rescaled with ``x / 127.5 - 1`` (so 0 maps to -1 and 255
    maps to 1) before being appended.

    Args:
        d: Directory searched (non-recursively) for files matching ``*_B.jpg``.
        array: List that receives one array per image; mutated in place.
        limit: Maximum number of files to load. Defaults to 2000, the value
            that used to be hard-coded; ``None`` loads every match.

    Returns:
        None. Results are delivered through ``array``.
    """
    filenames = sorted(glob.glob(d + '/*_B.jpg'))
    for name in filenames[:limit]:
        # Open inside a context manager so the file handle is released right
        # after the pixels are read, instead of leaking one handle per image.
        with Image.open(name) as img:
            array.append((asarray(img) / 127.5) - 1)

In [None]:
# Load the training images of both domains from the Cityscapes folders.
# NOTE(review): the lists keep the names `*_horses` / `*_zebras`, but the
# paths point at Cityscapes trainA / trainB data.
turnimagetonumpyA("../input/cityscapedata/dataset/datasets/cityscapes/trainA",train_horses)      
turnimagetonumpyB("../input/cityscapedata/dataset/datasets/cityscapes/trainB",train_zebras)

In [None]:
# Load the held-out images of both domains (testA / testB folders).
turnimagetonumpyA("../input/cityscapedata/dataset/datasets/cityscapes/testA",test_horses)
turnimagetonumpyB("../input/cityscapedata/dataset/datasets/cityscapes/testB",test_zebras)

In [None]:
def reshapearray(a):
    """Stack images into one array of shape ``(N, 1, 256, 256, 3)``.

    The extra axis of length 1 means that indexing the first axis yields a
    ready-made batch holding a single 256x256x3 image. The resulting shape is
    printed as a quick sanity check.

    Args:
        a: Sequence (or array) of images whose total size is a multiple of
            256 * 256 * 3.

    Returns:
        The reshaped ``numpy.ndarray``.
    """
    stacked = numpy.array(a).reshape([-1, 1, 256, 256, 3])
    print(stacked.shape)
    return stacked

In [None]:
# Replace the Python lists with (N, 1, 256, 256, 3) arrays.
train_horses = reshapearray(train_horses)
train_zebras = reshapearray(train_zebras)

In [None]:
# Same conversion for the held-out images.
test_horses = reshapearray(test_horses)
test_zebras = reshapearray(test_zebras)

In [None]:
# Preview the first domain-A training image; `* 0.5 + 0.5` maps the
# [-1, 1] pixel values back to [0, 1] for imshow.
plt.subplot(121)
plt.title('Horse')
plt.imshow(train_horses[0][0] * 0.5 + 0.5)

In [None]:
# Preview the first domain-B training image; `* 0.5 + 0.5` maps the
# [-1, 1] pixel values back to [0, 1] for imshow.
plt.subplot(121)
plt.title('Zebra')
plt.imshow(train_zebras[0][0] * 0.5 + 0.5)

In [None]:
# Critic (discriminator) loss: mean score on fakes minus mean score on reals.
# The gradient penalty is not part of this function; the training loop adds it.
def discriminator_loss(real, fake):
    """Return ``mean(fake) - mean(real)`` for the critic outputs.

    Args:
        real: Critic outputs for real images.
        fake: Critic outputs for generated images.

    Returns:
        A scalar tensor.
    """
    return tf.math.reduce_mean(fake) - tf.math.reduce_mean(real)

In [None]:
# Generator loss: adversarial term plus weighted cycle-reconstruction term.
def generator_loss(fake,X_BackB,X_realB,X_BackA,X_realA):
    """Return ``-mean(fake) + 10 * (L1(X_BackB, X_realB) + L1(X_BackA, X_realA))``.

    Args:
        fake: Critic outputs for the generated images.
        X_BackB: Reconstruction that is compared against ``X_realB``.
        X_realB: Reference images for ``X_BackB``.
        X_BackA: Reconstruction that is compared against ``X_realA``.
        X_realA: Reference images for ``X_BackA``.

    Returns:
        A scalar tensor.
    """
    adversarial = -tf.math.reduce_mean(fake)
    cycle_b = tf.math.reduce_mean(tf.math.abs(X_BackB - X_realB))
    cycle_a = tf.math.reduce_mean(tf.math.abs(X_BackA - X_realA))
    return adversarial + (10 * (cycle_b + cycle_a))

In [None]:
# Build the discriminator (critic) network
def define_discriminator(image_shape):
    """Create the critic: four stride-2 conv stages followed by a scalar head.

    Layout: C64 (no normalisation) -> C128 -> C256 -> C512, each a 4x4
    stride-2 convolution with LeakyReLU(0.2); the last three stages also apply
    instance normalisation. A final 4x4 convolution with one filter is
    flattened and reduced to a single value by a Dense(1) layer.

    Args:
        image_shape: Shape of one input image, e.g. ``(256, 256, 3)``.

    Returns:
        An uncompiled Keras ``Model`` mapping an image to one score.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)

    # C64: first stage has no normalisation layer
    features = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(in_image)
    features = LeakyReLU(alpha=0.2)(features)

    # C128, C256, C512
    for n_filters in (128, 256, 512):
        features = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(features)
        features = InstanceNormalization(axis=-1)(features)
        features = LeakyReLU(alpha=0.2)(features)

    # single-channel map, then collapsed to one scalar per image
    score = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(features)
    score = Flatten()(score)
    score = Dense(1)(score)

    return Model(in_image, score)


In [None]:
# define image shape
image_shape = (IMG_HEIGHT,IMG_WIDTH,3)
# create the model (throwaway instance, built only to inspect its layers)
model = define_discriminator(image_shape)
# summarize the model
model.summary()

In [None]:
# Residual block used inside the generator
def resnet_block(n_features, input_layer):
    """Apply two 3x3 convolutions and add the block input back to the result.

    The first convolution is followed by instance normalisation and ReLU, the
    second by instance normalisation only. The skip connection is an
    element-wise sum (not a concatenation), so ``input_layer`` must be
    shape-compatible with an ``n_features``-channel output.

    Args:
        n_features: Number of filters in both convolutions.
        input_layer: Keras tensor entering the block.

    Returns:
        The Keras tensor ``conv_branch + input_layer``.
    """
    init = RandomNormal(stddev=0.02)

    branch = Conv2D(n_features, (3,3), padding='same', kernel_initializer=init)(input_layer)
    branch = InstanceNormalization(axis=-1)(branch)
    branch = Activation('relu')(branch)

    branch = Conv2D(n_features, (3,3), padding='same', kernel_initializer=init)(branch)
    branch = InstanceNormalization(axis=-1)(branch)

    # residual (additive) skip connection
    return branch + input_layer

In [None]:
# Build one standalone generator network
def define_generator(image_shape=(256,256,3), n_resnet=6):
    """Create the encoder / residual / decoder generator.

    Layout: c7s1-64 -> d128 -> d256 -> ``n_resnet`` residual blocks of 256
    filters -> u128 -> u64 -> c7s1-3. Every convolution is followed by
    instance normalisation; hidden stages use ReLU and the output uses tanh.

    Args:
        image_shape: Shape of one input image.
        n_resnet: Number of residual blocks in the middle of the network.

    Returns:
        An uncompiled Keras ``Model`` mapping an image to an image.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)

    # c7s1-64
    x = Conv2D(64, (7,7), padding='same', kernel_initializer=init)(in_image)
    x = InstanceNormalization(axis=-1)(x)
    x = Activation('relu')(x)

    # d128, d256: stride-2 downsampling
    for n_filters in (128, 256):
        x = Conv2D(n_filters, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(x)
        x = InstanceNormalization(axis=-1)(x)
        x = Activation('relu')(x)

    # R256
    for _ in range(n_resnet):
        x = resnet_block(256, x)

    # u128, u64: stride-2 transposed-convolution upsampling
    for n_filters in (128, 64):
        x = Conv2DTranspose(n_filters, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(x)
        x = InstanceNormalization(axis=-1)(x)
        x = Activation('relu')(x)

    # c7s1-3
    x = Conv2D(3, (7,7), padding='same', kernel_initializer=init)(x)
    x = InstanceNormalization(axis=-1)(x)
    out_image = Activation('tanh')(x)

    return Model(in_image, out_image)

In [None]:
# Throwaway generator instance, built only to inspect its layers.
model = define_generator()
# summarize the model
model.summary()

In [None]:
# Instantiate the four networks that are trained below.
# input shape
image_shape = (256,256,3)
# generator: A -> B
g_model_AtoB = define_generator(image_shape)
# generator: B -> A
g_model_BtoA = define_generator(image_shape)
# discriminator: A -> [real/fake]
d_model_A = define_discriminator(image_shape)
# discriminator: B -> [real/fake]
d_model_B = define_discriminator(image_shape)
# NOTE(review): the two composite-model lines below are disabled and
# `define_composite_model` is not defined in the visible notebook.
# composite: A -> B -> [real/fake, A]
# c_model_AtoB = define_composite_model(g_model_AtoB, d_model_B, g_model_BtoA, image_shape)
# composite: B -> A -> [real/fake, B]
# c_model_BtoA = define_composite_model(g_model_BtoA, d_model_A, g_model_AtoB, image_shape)

In [None]:
class WGAN(Model):
    """Two generators (A->B, B->A) trained against two gradient-penalised critics.

    The class stores the four networks together with their optimizers and loss
    functions. Optimisation is the hand-written loop in :meth:`train`; it does
    not go through ``Model.fit``.
    """

    def __init__(
        self,
        d_model_A,
        d_model_B,
        g_model_AtoB,
        g_model_BtoA,
        discriminator_extra_steps=3,
        gp_weight=10.0,
    ):
        """Store the networks and the critic-related hyper-parameters.

        Args:
            d_model_A: Critic scoring domain-A images.
            d_model_B: Critic scoring domain-B images.
            g_model_AtoB: Generator translating A to B.
            g_model_BtoA: Generator translating B to A.
            discriminator_extra_steps: Critic updates performed before each
                generator update.
            gp_weight: Multiplier applied to the gradient penalty.
        """
        super(WGAN, self).__init__()
        self.d_model_A = d_model_A
        self.d_model_B = d_model_B
        self.g_model_AtoB = g_model_AtoB
        self.g_model_BtoA = g_model_BtoA
        self.d_steps = discriminator_extra_steps
        self.gp_weight = gp_weight

    def compile(self, da_optimizer, db_optimizer, ga_optimizer, gb_optimizer, d_loss_fn, g_loss_fn):
        """Attach the optimizers and loss functions used by :meth:`train`.

        Args:
            da_optimizer: Optimizer for ``d_model_A``.
            db_optimizer: Optimizer for ``d_model_B``.
            ga_optimizer: Optimizer for ``g_model_AtoB``.
            gb_optimizer: Optimizer for ``g_model_BtoA``.
            d_loss_fn: Called as ``d_loss_fn(real=..., fake=...)`` with critic outputs.
            g_loss_fn: Called as ``g_loss_fn(fake, X_BackB, X_realB, X_BackA, X_realA)``.
        """
        super(WGAN, self).compile()
        self.da_optimizer = da_optimizer
        self.db_optimizer = db_optimizer
        self.ga_optimizer = ga_optimizer
        self.gb_optimizer = gb_optimizer
        self.d_loss_fn = d_loss_fn
        self.g_loss_fn = g_loss_fn

    def gradient_penalty_A(self, batch_size, real, fake, critic=None):
        """Return the gradient penalty of ``critic`` between ``real`` and ``fake``.

        The penalty is ``mean((||grad||_2 - 1) ** 2)``, where the gradient of
        the critic output is taken with respect to random interpolations of
        the real and fake images.

        Args:
            batch_size: Leading dimension of the random interpolation weights.
            real: Batch of real images.
            fake: Batch of generated images with the same shape as ``real``.
            critic: Critic to penalise. Defaults to ``self.d_model_A`` so
                existing three-argument calls keep working; pass
                ``self.d_model_B`` when training critic B, otherwise the
                penalty does not depend on critic B's weights at all.

        Returns:
            A scalar tensor.
        """
        if critic is None:
            critic = self.d_model_A
        # Bring the real batch to the dtype of the generated batch so the
        # arithmetic below never mixes dtypes.
        real = tf.cast(real, fake.dtype)
        # Interpolation weights are drawn uniformly from [0, 1] so every
        # sample lies on the segment between a real and a fake image.
        alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
        interpolated = real + alpha * (fake - real)
        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            pred = critic(interpolated, training=True)
        grads = gp_tape.gradient(pred, [interpolated])[0]
        norm = tf.math.sqrt(tf.math.reduce_sum(tf.math.square(grads), axis=[1, 2, 3]))
        return tf.math.reduce_mean((norm - 1.0) ** 2)

    def _critic_step(self, critic, generator, optimizer, real_target, real_source, n_batch):
        """Run one optimizer step on ``critic`` and return its penalised loss."""
        with tf.GradientTape() as tape:
            fake_target = generator(real_source, training=True)
            real_logits = critic(real_target, training=True)
            fake_logits = critic(fake_target, training=True)
            d_cost = self.d_loss_fn(real=real_logits, fake=fake_logits)
            # Penalise the critic that is being trained in this step.
            gp = self.gradient_penalty_A(n_batch, real_target, fake_target, critic=critic)
            d_loss = d_cost + gp * self.gp_weight
        d_gradient = tape.gradient(d_loss, critic.trainable_variables)
        optimizer.apply_gradients(zip(d_gradient, critic.trainable_variables))
        return d_loss

    def _generator_step(self, real_a, real_b, train_a_to_b):
        """Run one optimizer step on one generator and return its loss."""
        if train_a_to_b:
            generator, other = self.g_model_AtoB, self.g_model_BtoA
            critic, optimizer = self.d_model_B, self.ga_optimizer
            real_source, real_target = real_a, real_b
        else:
            generator, other = self.g_model_BtoA, self.g_model_AtoB
            critic, optimizer = self.d_model_A, self.gb_optimizer
            real_source, real_target = real_b, real_a

        with tf.GradientTape() as tape:
            fake_target = generator(real_source, training=True)
            fake_logits = critic(fake_target, training=True)
            # The other generator is only used in inference mode here.
            back_source = other(fake_target, training=False)
            fake_source = other(real_target, training=False)
            back_target = generator(fake_source, training=True)
            if train_a_to_b:
                back_a, back_b = back_source, back_target
            else:
                back_b, back_a = back_source, back_target
            g_loss = self.g_loss_fn(fake_logits, back_b, real_b, back_a, real_a)

        gen_gradient = tape.gradient(g_loss, generator.trainable_variables)
        optimizer.apply_gradients(zip(gen_gradient, generator.trainable_variables))
        return g_loss

    def train(self, b_size=1, n_epoch=20, total=1000, n_batch=1, n_patch=1):
        """Run ``total * n_epoch`` training steps.

        Each step performs, in order: ``d_steps`` updates of critic A, one
        update of generator B->A, ``d_steps`` updates of critic B, and one
        update of generator A->B. Fresh samples are drawn before every update.

        After each step the most recent critic-B loss and generator-A->B loss
        are appended to ``d_hist`` / ``g_hist``. Every 500 steps a checkpoint
        is saved, both losses are printed and ``save_error()`` is called.

        The method reads these module-level names, which must exist before it
        is called: ``train_horses``, ``train_zebras``,
        ``generate_real_samples``, ``d_hist``, ``g_hist``, ``ckpt_manager``
        and ``save_error``.

        Args:
            b_size: Unused; kept for backward compatibility.
            n_epoch: Multiplied by ``total`` to obtain the number of steps.
            total: Number of steps per epoch.
            n_batch: Forwarded to ``generate_real_samples`` and used as the
                batch dimension of the gradient-penalty interpolation weights.
            n_patch: Forwarded to ``generate_real_samples``.
        """
        n_steps = total * n_epoch
        trainA, trainB = train_horses, train_zebras

        for j in range(n_steps):
            # Critic A versus generator B->A
            for _ in range(self.d_steps):
                X_realA = generate_real_samples(trainA, n_batch, n_patch)[0]
                X_realB = generate_real_samples(trainB, n_batch, n_patch)[0]
                self._critic_step(
                    self.d_model_A, self.g_model_BtoA, self.da_optimizer,
                    X_realA, X_realB, n_batch,
                )

            # Generator B->A
            X_realA = generate_real_samples(trainA, n_batch, n_patch)[0]
            X_realB = generate_real_samples(trainB, n_batch, n_patch)[0]
            self._generator_step(X_realA, X_realB, train_a_to_b=False)

            # Critic B versus generator A->B
            for _ in range(self.d_steps):
                X_realA = generate_real_samples(trainA, n_batch, n_patch)[0]
                X_realB = generate_real_samples(trainB, n_batch, n_patch)[0]
                d_loss = self._critic_step(
                    self.d_model_B, self.g_model_AtoB, self.db_optimizer,
                    X_realB, X_realA, n_batch,
                )

            # Generator A->B
            X_realA = generate_real_samples(trainA, n_batch, n_patch)[0]
            X_realB = generate_real_samples(trainB, n_batch, n_patch)[0]
            g_loss = self._generator_step(X_realA, X_realB, train_a_to_b=True)

            # Only the losses of the last critic/generator pair are logged.
            d_hist.append(float(d_loss.numpy()))
            g_hist.append(float(g_loss.numpy()))
            if (j+1) % 500 == 0:
                ckpt_manager.save()
                print("d_loss: %f , g_loss: %f" %(d_loss, g_loss))
                save_error()

In [None]:
# Loss histories; the training loop appends one critic and one generator
# loss per step.
d_hist, g_hist = list(), list()

In [None]:
# Resume the loss histories written by save_error() when the files exist.
# On a fresh run they are absent, and the current d_hist / g_hist are kept
# instead of the cell failing with FileNotFoundError.
if os.path.exists("derror.txt"):
    with open("derror.txt", "r") as fp:
        d_hist = json.load(fp)
if os.path.exists("gerror.txt"):
    with open("gerror.txt", "r") as fp:
        g_hist = json.load(fp)

In [None]:
def save_error(plot_path='/content/gdrive/My Drive/WGAN/plot_line_plot_loss.png'):
    """Persist the loss histories and save a line plot of them.

    Writes the module-level ``d_hist`` and ``g_hist`` lists as JSON to
    ``derror.txt`` and ``gerror.txt`` in the current working directory, then
    draws both curves on a new figure and saves it to ``plot_path``.

    Args:
        plot_path: Destination of the PNG. Defaults to the location that used
            to be hard-coded. The parent directory is created if missing.

    Returns:
        None.
    """
    with open("derror.txt", "w") as fp:
        json.dump(d_hist, fp)
    with open("gerror.txt", "w") as fp:
        json.dump(g_hist, fp)

    plot_dir = os.path.dirname(plot_path)
    if plot_dir:
        os.makedirs(plot_dir, exist_ok=True)

    # Use a dedicated figure and close it afterwards: this function is called
    # repeatedly from the training loop, and plotting on the implicit current
    # figure would stack a new pair of curves on top of the old ones each time.
    fig, ax = plt.subplots()
    ax.plot(d_hist, label='crit')
    ax.plot(g_hist, label='gen')
    ax.legend()
    fig.savefig(plot_path)
    plt.close(fig)
In [None]:
def generate_real_samples(dataset, n_samples, patch_shape):
    """Pick one random entry of ``dataset`` and build a block of -1 labels.

    Note that ``n_samples`` only sizes the label array: exactly one index
    along the first axis of ``dataset`` is drawn, whatever its value.

    Args:
        dataset: Array indexed along its first axis.
        n_samples: First dimension of the returned label array.
        patch_shape: Second and third dimension of the returned label array.

    Returns:
        Tuple ``(X, y)`` where ``X`` is ``dataset[i]`` for a uniformly random
        ``i`` and ``y`` has shape ``(n_samples, patch_shape, patch_shape, 1)``
        filled with -1.0.
    """
    last_index = dataset.shape[0] - 1
    chosen = random.randint(0, last_index)
    labels = numpy.full((n_samples, patch_shape, patch_shape, 1), -1.0)
    return dataset[chosen], labels

In [None]:
# Run a generator over a batch and pair the output with +1 labels
def generate_fake_samples(g_model, dataset, patch_shape):
    """Translate ``dataset`` with ``g_model`` and build a block of +1 labels.

    Args:
        g_model: Object exposing ``predict(dataset)``.
        dataset: Input batch handed to ``g_model.predict``.
        patch_shape: Second and third dimension of the returned label array.

    Returns:
        Tuple ``(X, y)`` where ``X`` is the prediction and ``y`` has shape
        ``(len(X), patch_shape, patch_shape, 1)`` filled with 1.0.
    """
    fake_images = g_model.predict(dataset)
    n_fake = len(fake_images)
    labels = numpy.ones((n_fake, patch_shape, patch_shape, 1))
    return fake_images, labels

In [None]:
# Switch the working directory to the uploaded results folder.
# NOTE(review): `os` is already imported in the import cell, and the next
# cell immediately changes directory again — confirm this cell is still needed.
import os
os.chdir(r'/kaggle/input/cityscapedata/results (3)')

In [None]:
# Work from /kaggle/working; the relative paths used by the checkpoint
# manager and by save_error() resolve against this directory.
os.chdir(r'/kaggle/working')

In [None]:
# List the current working directory (IPython shell escape).
!ls

In [None]:
# One Adam optimizer per network, all with identical hyper-parameters.
# ga/gb are passed to WGAN.compile for the A->B / B->A generators,
# da/db for the A / B critics.
ga_optimizer = Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)
gb_optimizer = Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)
da_optimizer = Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)
db_optimizer = Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)


In [None]:
# Bundle the four networks; discriminator_extra_steps and gp_weight keep
# their defaults (3 and 10.0).
wgan = WGAN(
        d_model_A = d_model_A,
        d_model_B = d_model_B,
        g_model_AtoB = g_model_AtoB,
        g_model_BtoA = g_model_BtoA
)

In [None]:
# Attach the optimizers and the two loss functions defined earlier.
wgan.compile(
        da_optimizer = da_optimizer,
        db_optimizer = db_optimizer,
        ga_optimizer = ga_optimizer,
        gb_optimizer = gb_optimizer,
        d_loss_fn = discriminator_loss,
        g_loss_fn = generator_loss
)

In [None]:
# Checkpointing: track the whole WGAN object and keep the 5 most recent
# checkpoints under `checkpoint_path` (relative to the working directory).
# NOTE(review): "pretained" looks like a typo for "pretrained"; it is left
# as is because it is the on-disk directory name.
checkpoint_path = "pretained"

ckpt = tf.train.Checkpoint(wgan)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

In [None]:
# if a checkpoint exists, restore the latest checkpoint;
# otherwise training starts from the freshly initialised weights.
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print ('Latest checkpoint restored!!')

In [None]:
# Write a checkpoint of the current state before training starts.
ckpt_save_path = ckpt_manager.save()

In [None]:
# Run the training loop with its defaults: total=1000 steps x n_epoch=20.
wgan.train()

In [None]:
# Dump the complete loss histories (one value per training step).
print(d_hist)
print(g_hist)

In [None]:
def generate_images(model, test_input):
    """Show the first input image next to the model's prediction for it.

    Both images are rescaled with ``x * 0.5 + 0.5`` (mapping [-1, 1] to
    [0, 1]) before being displayed side by side without axes.

    Args:
        model: Object exposing ``predict(test_input)``.
        test_input: Batch of images; only element 0 is displayed.

    Returns:
        None.
    """
    prediction = model.predict(test_input)

    plt.figure(figsize=(12, 12))

    panels = (
        ('Input Image', test_input[0]),
        ('Predicted Image', prediction[0]),
    )
    for position, (caption, image) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.title(caption)
        # bring pixel values into [0, 1] for plotting
        plt.imshow(image * 0.5 + 0.5)
        plt.axis('off')
    plt.show()

In [None]:
# Show 5 random domain-B training images translated by the B->A generator.
# (`X_realA` actually holds a domain-B sample here.)
for i in range(5):
    X_realA, _ = generate_real_samples(train_zebras, 1, 16)
    generate_images(g_model_BtoA, X_realA)

In [None]:
# Show 5 random domain-A training images translated by the A->B generator.
for i in range(5):
    X_realA, _ = generate_real_samples(train_horses, 1, 16)
    generate_images(g_model_AtoB, X_realA)

In [None]:
# Show 5 random domain-B test images translated by the B->A generator.
# (`X_realA` actually holds a domain-B sample here.)
for i in range(5):
    X_realA, _ = generate_real_samples(test_zebras, 1, 16)
    generate_images(g_model_BtoA, X_realA)

In [None]:
# Show 5 random domain-A test images translated by the A->B generator.
for i in range(5):
    X_realA, _ = generate_real_samples(test_horses, 1, 16)
    generate_images(g_model_AtoB, X_realA)

In [None]:
# Drop the per-image batch axis so the whole test set is one
# (N, 256, 256, 3) batch, then translate it A->B in a single predict call.
temp = test_horses
temp = temp.reshape([-1,256,256,3])
ans = g_model_AtoB.predict(temp)

In [None]:
# Domain-B test images in the same (N, 256, 256, 3) layout, used as the
# reference for the predictions.
realans = test_zebras.reshape([-1,256,256,3])

In [None]:
# Undo the `x / 127.5 - 1` scaling applied when the images were loaded.
tans = (ans + 1) * 127.5
trealans = (realans + 1) * 127.5
# print(ans)
# print(realans[0])

In [None]:
# Round to the nearest integer and store as 8-bit values.
tans = numpy.around(tans).astype(numpy.uint8)
trealans = numpy.around(trealans).astype(numpy.uint8)

In [None]:
colormap = numpy.zeros((19, 3), dtype=numpy.uint8)
colormap[0] = [128, 64, 128]
colormap[1] = [244, 35, 232]
colormap[2] = [70, 70, 70]
colormap[3] = [102, 102, 156]
colormap[4] = [190, 153, 153]
colormap[5] = [153, 153, 153]
colormap[6] = [250, 170, 30]
colormap[7] = [220, 220, 0]
colormap[8] = [107, 142, 35]
colormap[9] = [152, 251, 152]
colormap[10] = [70, 130, 180]
colormap[11] = [220, 20, 60]
colormap[12] = [255, 0, 0]
colormap[13] = [0, 0, 142]
colormap[14] = [0, 0, 70]
colormap[15] = [0, 60, 100]
colormap[16] = [0, 80, 100]
colormap[17] = [0, 0, 230]
colormap[18] = [119, 11, 32]

In [None]:
def findclosestindex(colormap,temp):
    """Return the row index of the ``colormap`` colour closest to ``temp``.

    Closeness is the sum of absolute per-channel differences (L1 distance).
    When several rows are equally close, the lowest index wins.

    Args:
        colormap: Array of shape ``(n_colours, 3)``.
        temp: One colour, array-like of length 3.

    Returns:
        Integer index into ``colormap``.
    """
    palette = numpy.asarray(colormap)
    pixel = numpy.asarray(temp)
    # Subtract in a signed type: with uint8 operands `palette - pixel` wraps
    # around (e.g. 10 - 20 becomes 246) and yields wrong distances.
    work_dtype = numpy.result_type(palette.dtype, pixel.dtype, numpy.int32)
    distances = numpy.abs(palette.astype(work_dtype) - pixel.astype(work_dtype)).sum(axis=1)
    return distances.argmin()

In [None]:
def colortomap(t,colormap,chunk_size=65536):
    """Map every RGB pixel of ``t`` to the index of its closest palette colour.

    Closeness is the sum of absolute per-channel differences (L1 distance);
    ties go to the lowest palette index. The work is done in vectorised
    chunks rather than one Python call per pixel.

    Args:
        t: Array whose last axis holds the 3 colour channels; it is flattened
            to ``(-1, 3)``.
        colormap: Array of shape ``(n_colours, 3)``.
        chunk_size: Number of pixels processed per vectorised step; bounds
            the size of the temporary distance array.

    Returns:
        1-D integer array with one palette index per pixel, in the flattened
        pixel order of ``t``.
    """
    pixels = numpy.asarray(t).reshape([-1, 3])
    palette = numpy.asarray(colormap)
    # Signed working type: uint8 - uint8 would wrap around instead of going
    # negative and produce wrong distances.
    work_dtype = numpy.result_type(pixels.dtype, palette.dtype, numpy.int32)
    palette = palette.astype(work_dtype)

    labels = numpy.empty(pixels.shape[0], dtype=numpy.int64)
    for start in range(0, pixels.shape[0], chunk_size):
        stop = start + chunk_size
        chunk = pixels[start:stop].astype(work_dtype)
        # (chunk, 1, 3) - (1, n_colours, 3) -> (chunk, n_colours) distances
        distances = numpy.abs(chunk[:, None, :] - palette[None, :, :]).sum(axis=2)
        labels[start:stop] = distances.argmin(axis=1)
    return labels

In [None]:
# Convert the RGB predictions and references to per-pixel class indices.
# `-1` lets NumPy infer the number of images instead of assuming exactly 500.
tans = colortomap(tans,colormap)
tans = tans.reshape([-1,256,256])
print(tans.shape)
trealans = colortomap(trealans,colormap)
trealans = trealans.reshape([-1,256,256])
print(trealans.shape)

In [None]:
def evaluate(test,answer,n_classes=19):
    """Compute overall pixel accuracy and per-class accuracy.

    Per-class accuracy for class ``c`` is the number of pixels where both
    ``test`` and ``answer`` equal ``c`` divided by the number of pixels where
    ``answer`` equals ``c``.

    Args:
        test: Integer array of predicted class indices.
        answer: Integer array of reference class indices, same shape as ``test``.
        n_classes: Number of classes to report (indices ``0 .. n_classes-1``).

    Returns:
        Tuple ``(pixel_accuracy, per_class)`` where ``per_class`` is a list of
        length ``n_classes``. Classes that never occur in ``answer`` are
        reported as NaN.
    """
    test = numpy.asarray(test)
    answer = numpy.asarray(answer)
    total = answer.size
    matches = test == answer
    print(matches.shape)
    print(matches.sum(),total)

    mean_class = []
    for label in range(n_classes):
        in_class = answer == label
        # Denominator is the number of reference pixels of THIS class, not
        # the total number of correct pixels over all classes.
        class_sum = in_class.sum()
        hits = numpy.logical_and(test == label, in_class).sum()
        mean_class.append(hits / class_sum if class_sum else float('nan'))
    return matches.sum() / total , mean_class

In [None]:
# Score the predicted label maps against the reference label maps.
pixel_acc, class_acc = evaluate(tans,trealans)
print(pixel_acc)
print(class_acc)