# Pix2Pix Network, An Image-To-Image Translation Using Conditional GANs (CGANs)

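Download the Cityscapes dataset from the following link and extract it under "./data/" (the code below reads "./data/cityscapes/train/" and "./data/cityscapes/val/"):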
http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/cityscapes.tar.gz

Import the modules.

In [1]:
import os
import sys
import random
import numpy as np
from PIL import Image
import keras.backend as K
from copy import deepcopy
from random import randint
from keras import initializers
import matplotlib.pyplot as plt
from keras.utils import plot_model
from keras.layers.core import Activation
from keras.models import Sequential, Model
from keras.optimizers import Adam, SGD, Nadam,Adamax
from keras.layers.advanced_activations import LeakyReLU
from keras.layers import  Flatten, Dropout, Lambda, Concatenate
from keras.layers import Dense, Reshape, Input, BatchNormalization, Concatenate
from keras.layers.convolutional import UpSampling2D, Convolution2D, MaxPooling2D,Deconvolution2D

Using TensorFlow backend.


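The generator uses the U-Net architecture: a stack of strided convolutions that downsample the image, followed by upsampling stages that are concatenated with the matching downsampling outputs (skip connections).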

In [2]:
class Generator(object):
    """U-Net style image-to-image generator wrapped around a Keras ``Model``.

    Building an instance constructs the network, compiles it, writes a
    diagram of it to ``./data/out/Generator_Model.png`` and prints its
    summary.  The compiled Keras model is exposed as ``self.Generator``.
    """

    def __init__(self, width=28, height=28, channels=1):
        # Image geometry; SHAPE is handed to the Keras Input layer as-is.
        self.W, self.H, self.C = width, height, channels
        self.SHAPE = (self.W, self.H, self.C)

        self.Generator = self.model()
        self.OPTIMIZER = Adam(lr=2e-4, beta_1=0.5, decay=1e-5)
        self.Generator.compile(
            optimizer=self.OPTIMIZER,
            loss='binary_crossentropy',
            metrics=['accuracy'],
        )

        self.save_model()
        self.summary()

    def model(self):
        """Assemble and return the (uncompiled) encoder/decoder network."""
        net_input = Input(shape=self.SHAPE)

        # ---- Encoder: seven stride-2 convolutions, each halving the
        # spatial size.  Every stage but the first is batch-normalised.
        encoder_filters = (64, 64 * 2, 64 * 4, 64 * 8, 64 * 8, 64 * 8, 64 * 8)
        stage_outputs = []
        tensor = net_input
        for stage, n_filters in enumerate(encoder_filters):
            tensor = Convolution2D(
                n_filters,
                kernel_size=4,
                strides=2,
                padding='same',
                activation=LeakyReLU(alpha=0.2),
            )(tensor)
            if stage > 0:
                tensor = BatchNormalization()(tensor)
            stage_outputs.append(tensor)

        # The deepest encoder output seeds the decoder; the remaining
        # outputs are reused, deepest first, as skip connections.
        tensor = stage_outputs.pop()

        # ---- Decoder: upsample, convolve, normalise, then concatenate
        # with the encoder output of matching resolution.
        decoder_filters = (64 * 8, 64 * 8, 64 * 8, 64 * 4, 64 * 2, 64)
        for n_filters, skip in zip(decoder_filters, reversed(stage_outputs)):
            tensor = UpSampling2D(size=2)(tensor)
            tensor = Convolution2D(
                n_filters,
                kernel_size=4,
                strides=1,
                padding='same',
                activation='relu',
            )(tensor)
            tensor = BatchNormalization(momentum=0.8)(tensor)
            tensor = Concatenate()([tensor, skip])

        # ---- Output head: one last upsample and a tanh convolution that
        # produces self.C channels.
        tensor = UpSampling2D(size=2)(tensor)
        net_output = Convolution2D(
            self.C,
            kernel_size=4,
            strides=1,
            padding='same',
            activation='tanh',
        )(tensor)

        return Model(net_input, net_output)

    def summary(self):
        """Print the Keras layer summary of the generator."""
        return self.Generator.summary()

    def save_model(self):
        """Write a diagram of the generator to ./data/out/."""
        plot_model(self.Generator, to_file='./data/out/Generator_Model.png')

Discriminator network.

In [3]:
class Discriminator(object):
    """Two-input convolutional discriminator wrapped around a Keras ``Model``.

    The network takes a pair of images of identical shape, concatenates them
    along the channel axis and reduces them with four stride-2 convolutions
    to a single-channel map.  Building an instance constructs the network,
    compiles it with an MSE loss, writes a diagram of it to
    ``./data/out/Discriminator_Model.png`` and prints its summary.  The
    compiled Keras model is exposed as ``self.Discriminator``.
    """

    def __init__(self, width=28, height=28, channels=1, starting_filters=64):
        # Image geometry; SHAPE is handed to both Keras Input layers as-is.
        self.W, self.H, self.C = width, height, channels
        self.CAPACITY = width * height * channels
        self.SHAPE = (width, height, channels)
        # Filter count of the first convolution; later stages scale it up.
        self.FS = starting_filters

        self.Discriminator = self.model()
        self.OPTIMIZER = Adam(lr=2e-4, beta_1=0.5, decay=1e-5)
        self.Discriminator.compile(
            optimizer=self.OPTIMIZER,
            loss='mse',
            metrics=['accuracy'],
        )

        self.save_model()
        self.summary()

    def model(self):
        """Assemble and return the (uncompiled) discriminator network."""
        input_A = Input(shape=self.SHAPE)
        input_B = Input(shape=self.SHAPE)

        # Stack the two images channel-wise so they are judged as a pair.
        tensor = Concatenate(axis=-1)([input_A, input_B])

        # Four stride-2 stages with FS, 2*FS, 4*FS and 8*FS filters; every
        # stage except the first is followed by batch normalisation.
        for stage, multiplier in enumerate((1, 2, 4, 8)):
            tensor = Convolution2D(
                self.FS * multiplier,
                kernel_size=4,
                strides=2,
                padding='same',
                activation=LeakyReLU(alpha=0.2),
            )(tensor)
            if stage > 0:
                tensor = BatchNormalization(momentum=0.8)(tensor)

        # Single-channel output map with no activation applied.
        patch_scores = Convolution2D(
            1, kernel_size=4, strides=1, padding='same'
        )(tensor)

        return Model([input_A, input_B], patch_scores)

    def summary(self):
        """Print the Keras layer summary of the discriminator."""
        return self.Discriminator.summary()

    def save_model(self):
        """Write a diagram of the discriminator to ./data/out/."""
        plot_model(self.Discriminator, to_file='./data/out/Discriminator_Model.png')

GAN Network

In [4]:
class GAN(object):
    """Combined model that joins already-connected generator/discriminator tensors.

    The caller supplies the Keras input tensors and output tensors; this
    class wraps them in a ``Model``, compiles it, writes a diagram of it to
    ``./data/out/GAN_Model.png`` and prints its summary.  The compiled Keras
    model is exposed as ``self.gan_model``.

    Args:
        model_inputs: list of Keras input tensors (defaults to an empty list).
        model_outputs: list of Keras output tensors (defaults to an empty
            list).  Two outputs are expected, because the model is compiled
            with two losses: ``'mse'`` for the first output and ``'mae'``
            (weighted 100x) for the second.
    """

    def __init__(self, model_inputs=None, model_outputs=None):
        # None sentinels instead of shared mutable default lists, so that
        # self.inputs / self.outputs are never aliased between instances.
        self.inputs = [] if model_inputs is None else model_inputs
        self.outputs = [] if model_outputs is None else model_outputs

        self.gan_model = Model(inputs=self.inputs, outputs=self.outputs)

        # Only this optimizer is ever used; the SGD instance that used to be
        # created first was overwritten before compile() and has been dropped.
        self.OPTIMIZER = Adam(lr=2e-4, beta_1=0.5)
        self.gan_model.compile(loss=['mse', 'mae'],
                               loss_weights=[1, 100],
                               optimizer=self.OPTIMIZER)
        self.save_model()
        self.summary()

    def model(self):
        """Return a bare ``Model()``; not used when building ``gan_model``."""
        model = Model()
        return model

    def summary(self):
        """Print the Keras layer summary of the combined model."""
        return self.gan_model.summary()

    def save_model(self):
        """Write a diagram of the combined model to ./data/out/."""
        plot_model(self.gan_model, to_file='./data/out/GAN_Model.png')

Trainer class for training

In [5]:
class Trainer:
    """Load paired images, wire up the three networks and run training.

    Construction loads the train/test images, builds a ``Generator`` and a
    ``Discriminator``, connects them through Keras ``Input`` tensors and wraps
    the result in a ``GAN`` combined model.  ``train()`` then alternates
    discriminator and generator updates, one image pair per step.

    Args:
        height, width, channels: image geometry.  Arrays are reshaped to
            ``(1, width, height, channels)`` before being fed to the models.
        epochs: number of passes over the training set.
        batch: stored as ``self.BATCH`` but not used; every training step
            uses exactly one image pair.
        checkpoint: a sample plot is saved every ``checkpoint`` steps.
        train_data_path, test_data_path: directories of ``jpg`` files, each
            containing the two paired images side by side.
    """

    def __init__(self, height = 256, width = 256, channels=3, epochs = 50000, batch = 1, checkpoint = 50, train_data_path = '',test_data_path=''):
        self.EPOCHS = epochs
        self.BATCH = batch
        self.H = height
        self.W = width
        self.C = channels
        self.CHECKPOINT = checkpoint

        # load_data returns (first half, second half) of every image; the
        # first half is stored as domain B and the second as domain A.
        self.X_train_B, self.X_train_A = self.load_data(train_data_path)
        self.X_test_B, self.X_test_A = self.load_data(test_data_path)

        self.generator = Generator(height=self.H, width=self.W, channels=self.C)

        self.orig_A = Input(shape=(self.W, self.H, self.C))
        self.orig_B = Input(shape=(self.W, self.H, self.C))

        # The generator translates a B image into a synthetic A image.
        self.fake_A = self.generator.Generator(self.orig_B)

        self.discriminator = Discriminator(height=self.H, width=self.W, channels=self.C)
        # Freeze the Keras model itself (not the Python wrapper object) so
        # that the combined model, compiled below, only updates the
        # generator.  The discriminator was already compiled on its own in
        # Discriminator.__init__, so its direct train_on_batch calls still
        # update its weights.
        self.discriminator.Discriminator.trainable = False
        self.valid = self.discriminator.Discriminator([self.fake_A, self.orig_B])

        model_inputs = [self.orig_A, self.orig_B]
        model_outputs = [self.valid, self.fake_A]
        self.gan = GAN(model_inputs=model_inputs, model_outputs=model_outputs)

    def load_data(self,data_path):
        """Load every ``jpg`` in ``data_path`` and split it into two halves.

        Each image is cut along axis 1 at index ``self.H``; the part before
        the cut and the part after it are collected separately, then scaled
        to [-1, 1] by ``norm_and_expand``.

        Returns:
            ``(first_halves, second_halves)`` as float32 numpy arrays.

        Raises:
            ValueError: if ``data_path`` contains no matching files.
        """
        listOFFiles = self.grabListOfFiles(data_path, extension="jpg")
        if not listOFFiles:
            # Fail early with a readable message instead of an obscure
            # shape/axis error further down the pipeline.
            raise ValueError(
                'No files ending in "jpg" found in "{}"'.format(data_path)
            )

        images = self.grabArrayOfImages(listOFFiles)
        imgs_A = [img[:, :self.H] for img in images]
        imgs_B = [img[:, self.H:] for img in images]

        imgs_A_out = self.norm_and_expand(np.array(imgs_A))
        imgs_B_out = self.norm_and_expand(np.array(imgs_B))

        return imgs_A_out, imgs_B_out

    def norm_and_expand(self,arr):
        """Scale 0..255 pixel values to [-1, 1] and insert a new axis 3.

        The extra axis is inserted regardless of how many dimensions ``arr``
        already has; callers in this class reshape the result before feeding
        it to the models.
        """
        arr = (arr.astype(np.float32) - 127.5) / 127.5
        normed = np.expand_dims(arr, axis=3)
        return normed

    def grabListOfFiles(self,startingDirectory,extension=".webp"):
        """Return the paths of entries in ``startingDirectory`` whose name
        ends with ``extension`` (non-recursive, in ``os.listdir`` order)."""
        return [
            os.path.join(startingDirectory, name)
            for name in os.listdir(startingDirectory)
            if name.endswith(extension)
        ]

    def grabArrayOfImages(self,listOfFiles,gray=False):
        """Read each file into a numpy array.

        Images are converted to mode ``"L"`` when ``gray`` is true and to
        ``"RGB"`` otherwise.  Returns a list with one array per file.
        """
        mode = "L" if gray else "RGB"
        imageArr = []
        for f in listOfFiles:
            # The context manager releases the file handle once the pixel
            # data has been converted.
            with Image.open(f) as im:
                imageArr.append(np.asarray(im.convert(mode)))
        return imageArr

    def train(self):
        """Run ``self.EPOCHS`` passes over the training pairs.

        Every step draws one not-yet-used (A, B) pair, updates the
        discriminator on the real pair and on a generated pair, then updates
        the generator through the combined model.  Losses are printed per
        step and per epoch; every ``self.CHECKPOINT`` steps a sample plot is
        saved via ``plot_checkpoint``.
        """
        # PatchGAN: the discriminator emits one score per patch.  Its four
        # stride-2 convolutions shrink each spatial axis by 2**4.  The labels
        # never change, so they are built once.
        patch_shape = (1, int(self.W / 2**4), int(self.H / 2**4), 1)
        y_valid = np.ones(patch_shape)
        y_fake = np.zeros(patch_shape)

        number_of_batches = len(self.X_train_A)
        full_loss = None
        generator_loss = None

        for e in range(self.EPOCHS):
            # Visit every training pair exactly once per epoch in random
            # order.  Shuffling indices avoids copying the whole dataset on
            # every step.
            visit_order = random.sample(range(number_of_batches), number_of_batches)

            for b, ind in enumerate(visit_order):
                # Grab the real image pair for this step.
                batch_A = self.X_train_A[ind:ind + 1].reshape(1, self.W, self.H, self.C)
                batch_B = self.X_train_B[ind:ind + 1].reshape(1, self.W, self.H, self.C)

                fake_A = self.generator.Generator.predict(batch_B)

                # Train the discriminator on the real pair, then on the
                # generated pair; [0] selects the loss from [loss, accuracy].
                discriminator_loss_real = self.discriminator.Discriminator.train_on_batch([batch_A, batch_B], y_valid)[0]
                discriminator_loss_fake = self.discriminator.Discriminator.train_on_batch([fake_A, batch_B], y_fake)[0]
                full_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)

                # Train the generator through the combined model: it should
                # be scored as valid and should reproduce the real A image.
                generator_loss = self.gan.gan_model.train_on_batch([batch_A, batch_B], [y_valid, batch_A])

                print ('Batch: '+str(int(b))+', [Full Discriminator :: Loss: '+str(full_loss)+'], [ Generator :: Loss: '+str(generator_loss)+']')
                if b % self.CHECKPOINT == 0:
                    label = str(e)+'_'+str(b)
                    self.plot_checkpoint(label)

            print ('Epoch: '+str(int(e))+', [Full Discriminator :: Loss:'+str(full_loss)+'], [ Generator :: Loss: '+str(generator_loss)+']')

    def plot_checkpoint(self,b):
        """Save a 3x3 grid comparing inputs, generated and target images.

        Three random test pairs are drawn.  Row 0 shows the B images fed to
        the generator, row 1 the generator output and row 2 the real A
        images.  The figure is written to
        ``./data/out/batch_check_<b>.png``.
        """
        r, c = 3, 3
        random_inds = random.sample(range(len(self.X_test_A)), 3)
        imgs_A = self.X_test_A[random_inds].reshape(3, self.W, self.H, self.C)
        imgs_B = self.X_test_B[random_inds].reshape(3, self.W, self.H, self.C)
        fake_A = self.generator.Generator.predict(imgs_B)

        gen_imgs = np.concatenate([imgs_B, fake_A, imgs_A])

        # Rescale images from [-1, 1] to [0, 1] for display.
        gen_imgs = 0.5 * gen_imgs + 0.5

        titles = ['Style', 'Generated', 'Original']
        fig, axs = plt.subplots(r, c)
        for i in range(r):
            for j in range(c):
                axs[i, j].imshow(gen_imgs[i * c + j])
                axs[i, j].set_title(titles[i])
                axs[i, j].axis('off')
        fig.savefig("./data/out/batch_check_"+str(b)+".png")
        plt.close('all')

In [7]:
#discriminator = Discriminator(256, 256, 3)
#generator = Generator(256, 256, 3)

Create an instance of Trainer with EPOCHS set to 1 and BATCH set to 1.

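Before running the next cell, create the sub-directory "./data/out": the model diagrams and the checkpoint images are saved there, and the diagrams are written as soon as the Trainer is constructed.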

In [8]:
# Image geometry passed to the Trainer.
HEIGHT, WIDTH, CHANNELS = 256, 256, 3

# Training schedule: a single pass over the data (raise EPOCHS, e.g. to 5,
# for a longer run), with a sample plot saved every CHECKPOINT steps.
EPOCHS = 1
BATCH = 1
CHECKPOINT = 100

# Locations of the extracted Cityscapes train / validation images.
TRAIN_PATH = "./data/cityscapes/train/"
TEST_PATH = "./data/cityscapes/val/"

# Constructing the Trainer loads the data and builds all three models.
trainer = Trainer(
    height=HEIGHT,
    width=WIDTH,
    channels=CHANNELS,
    epochs=EPOCHS,
    batch=BATCH,
    checkpoint=CHECKPOINT,
    train_data_path=TRAIN_PATH,
    test_data_path=TEST_PATH,
)


Model: "model_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            (None, 256, 256, 3)  0                                            
__________________________________________________________________________________________________
conv2d_29 (Conv2D)              (None, 128, 128, 64) 3136        input_3[0][0]                    
__________________________________________________________________________________________________
conv2d_30 (Conv2D)              (None, 64, 64, 128)  131200      conv2d_29[0][0]                  
__________________________________________________________________________________________________
batch_normalization_25 (BatchNo (None, 64, 64, 128)  512         conv2d_30[0][0]                  
____________________________________________________________________________________________

Start training

In [9]:
# Run the training loop; it prints the discriminator and generator losses
# after every step and once more at the end of each epoch.
trainer.train()

869171143], [ Generator :: Loss: [11.590365, 0.33055636, 0.11259809]]
Batch: 2812, [Full Discriminator :: Loss: 0.03965310379862785], [ Generator :: Loss: [16.495077, 0.7892751, 0.15705803]]
Batch: 2813, [Full Discriminator :: Loss: 0.10953105986118317], [ Generator :: Loss: [15.221852, 0.06703171, 0.1515482]]
Batch: 2814, [Full Discriminator :: Loss: 0.11493577808141708], [ Generator :: Loss: [13.0082, 0.11810427, 0.12890096]]
Batch: 2815, [Full Discriminator :: Loss: 0.050147704780101776], [ Generator :: Loss: [15.0349, 0.2057295, 0.1482917]]
Batch: 2816, [Full Discriminator :: Loss: 0.15607288479804993], [ Generator :: Loss: [14.233681, 0.11194132, 0.1412174]]
Batch: 2817, [Full Discriminator :: Loss: 0.17060954868793488], [ Generator :: Loss: [16.78407, 0.19171524, 0.16592354]]
Batch: 2818, [Full Discriminator :: Loss: 0.0814598873257637], [ Generator :: Loss: [14.515349, 0.07273714, 0.14442612]]
Batch: 2819, [Full Discriminator :: Loss: 0.06290876120328903], [ Generator :: Loss: [