In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
print(os.listdir())

['.config', 'drive', 'sample_data']


In [3]:
import tensorflow as tf
import os,sys
import numpy as np
import random

random.seed(1)
np.random.seed(1)
tf.random.set_seed(2)

import numpy as np
np.random.seed(10)

import os, keras, numpy,tensorflow
import matplotlib.pyplot as plt
from numpy import *
from numpy.random import *
from tensorflow.keras.optimizers import Adam
from keras.models import Model
from keras.layers import *
from tensorflow.keras import initializers

from keras.layers import *
from keras.layers import Dense
from keras.layers.core import Activation
from keras.initializers import RandomNormal
from keras.layers.convolutional import UpSampling2D
from keras.layers.core import Flatten
from keras.layers import Input
from keras.layers.convolutional import Conv2D, Conv2DTranspose, UpSampling2D
from keras.models import Model
from keras.layers import add
from keras.layers import Conv2D, Conv2DTranspose, BatchNormalization

In [4]:
import tensorflow as tf
device_name = tf.test.gpu_device_name()
with tf.device(device_name):
    print(device_name.split(":")[1]," running . . . ")

GPU  running . . . 


In [5]:
# strategy = tf.distribute.MirroredStrategy()
# print("Number of Accelerator: ",strategy.num_replicas_in_sync)

# # batch_size = 256
# # epoch = 1

# # BATCH_SIZE =  batch_size*strategy.num_replicas_in_sync
# # print("Global Batch Size: ",BATCH_SIZE)

# with strategy.scope():
#   # model = create_model()
#   print("Ready To Go !")

In [6]:
def Discriminator(input_shape_in = (128, 128, 3), target_shape_in = (128, 128, 3)):
    
    # input
    input_shape = Input(shape = input_shape_in)
    input_target = Input(shape = target_shape_in)
    
    concate_one = Concatenate()([input_shape, input_target])
    
    # C64
    model = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(concate_one)
    model = LeakyReLU(alpha=0.2)(model)
    
    # C128
    model = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(model)
    model = BatchNormalization()(model)
    model = LeakyReLU(alpha=0.2)(model)
    
    # C256
    model = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(model)
    model = BatchNormalization()(model)
    model = LeakyReLU(alpha=0.2)(model)
    
    # C512
    model = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(model)
    model = BatchNormalization()(model)
    model = LeakyReLU(alpha=0.2)(model)

    # C512
    model = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(model)
    model = BatchNormalization()(model)
    model = LeakyReLU(alpha=0.2)(model)
    
    # second last output layer
    model = Conv2D(512, (4,4), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(model)
    model = BatchNormalization()(model)
    model = LeakyReLU(alpha=0.2)(model)
    
    # patch output
    model = Conv2D(1, (4,4), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(model)
    model = Activation('tanh')(model)
    
    # define model
    discriminator_model = Model(inputs = [input_shape, input_target], outputs = model)
    
    # compile model
    opt = Adam(learning_rate=0.0004, beta_1=0.5)
    discriminator_model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])
        
    return discriminator_model

In [7]:
a = Discriminator()
print(a.summary())

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 input_2 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 concatenate (Concatenate)      (None, 128, 128, 6)  0           ['input_1[0][0]',                
                                                                  'input_2[0][0]']            

In [8]:
# plot the discriminator model
# tf.keras.utils.plot_model(a,show_shapes=True,
#                           show_dtype=True,
#                           show_layer_names=True,
#                           show_layer_activations=True)

In [9]:
# define an encoder block
def define_encoder_block(layer_in, n_filters, batchnorm=True):
    g = Conv2D(n_filters, (3,3), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(layer_in)
    if batchnorm:
        g = BatchNormalization()(g, training=True)
    g = LeakyReLU(alpha=0.2)(g)
    
    return g


In [10]:
# define a decoder block
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
    # g = Conv2DTranspose(n_filters, (3,3), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(layer_in)
    g = UpSampling2D(size=(2,2))(layer_in)
    g = Conv2D(n_filters, (1,1), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(g)
    g = BatchNormalization()(g, training=True)
    if dropout:
        g = Dropout(0.5)(g, training=True)
    g = Concatenate()([g, skip_in])
    g = Activation('relu')(g)
    
    return g

In [11]:
# define the standalone generator model
def Generator(image_shape=(128,128,3)):
   
    # image input
    in_image = Input(shape=image_shape)
    
    # encoder model
    e1 = define_encoder_block(in_image, 64, batchnorm=False)
    e2 = define_encoder_block(e1, 128)
    e3 = define_encoder_block(e2, 256)
    e4 = define_encoder_block(e3, 512)
    e5 = define_encoder_block(e4, 512)
    e6 = define_encoder_block(e5, 512)
    
    # bottleneck, no batch norm and relu
    b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(e6)
    b = Activation('relu')(b)
    
    # decoder model
    d2 = decoder_block(b, e6, 512)
    d3 = decoder_block(d2, e5, 512)
    d4 = decoder_block(d3, e4, 512, dropout=False)
    d5 = decoder_block(d4, e3, 256, dropout=False)
    d6 = decoder_block(d5, e2, 128, dropout=False)
    d7 = decoder_block(d6, e1, 64, dropout=False)
    
    # output
    # g = Conv2DTranspose(3, (3,3), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(d7)
    g = UpSampling2D(size=(2,2))(d7)
    g = Conv2D(64, (1,1), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(g)

    g = Conv2D(3, (4,4), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(g)
    out_image = Activation('tanh')(g)
    
    # model
    model = Model(in_image, out_image)
    
    return model

In [12]:
b = Generator()
print(b.summary())

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_7 (Conv2D)              (None, 64, 64, 64)   1792        ['input_3[0][0]']                
                                                                                                  
 leaky_re_lu_6 (LeakyReLU)      (None, 64, 64, 64)   0           ['conv2d_7[0][0]']               
                                                                                                  
 conv2d_8 (Conv2D)              (None, 32, 32, 128)  73856       ['leaky_re_lu_6[0][0]']    

In [13]:
# plot the discriminator model
# tf.keras.utils.plot_model(b,show_shapes=True,
#                           show_dtype=True,
#                           show_layer_names=True,
#                           show_layer_activations=True)

In [14]:
# define the combined generator and discriminator model, for updating the generator
def GAN(g_model, d_model, image_shape = (128, 128, 3)):
    
    # make weights in the discriminator not trainable
    for layer in d_model.layers:
        if not isinstance(layer, BatchNormalization):
            layer.trainable = False
    
   
    in_src = Input(shape=image_shape)
   
    gen_out = g_model(in_src)
    
    dis_out = d_model([in_src, gen_out])
    
    model = Model(in_src, [dis_out, gen_out])
    
    # compile model
    opt = Adam(learning_rate=0.0004, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', 'mae'], optimizer=opt, loss_weights=[1,100])
    
    return model

In [15]:
# c = GAN(b, a)
# print(c.summary())

In [16]:
# plot the discriminator model
# tf.keras.utils.plot_model(c,show_shapes=True,
#                           show_dtype=True,
#                           show_layer_names=True,
#                           show_layer_activations=True)

In [17]:
def show_prediction(bw_images, rgb_images, predicted_rgb, epoch, n, save=False):
    
  plt.rcParams["figure.figsize"] = (8,8)
  for i in range(n):
    plt.suptitle("Iteration "+str(epoch),size=8.5)
      
    plt.subplot(331)
    plt.imshow(bw_images[i], cmap='gray')
    plt.title("RGB Image",size=8.5)
    plt.axis('off')

    plt.subplot(332)
    plt.imshow(rgb_images[i])
    plt.title("Original Segmented Image",size=8.5)
    plt.axis('off')

    plt.subplot(333)
    plt.imshow(predicted_rgb[i])
    plt.title("Predicted Segmented Image",size=8.5)
    plt.axis('off')

    if(save==True):
      plt.savefig("/content/drive/MyDrive/Image Segmentation/Stage 2/Image_Outputs/Iteration "+str(epoch))
    
    # plt.show()
    
# plot data
# save_plot(k[:16])

In [18]:
def load_data():
    data = np.load("/content/drive/MyDrive/Image Segmentation/Datasets/street_data_128.npz")
    rgb, bw = data['a'], data['b']
    rgb, bw = np.array(rgb), np.array(bw)
    rgb, bw = rgb.astype('float32'), bw.astype('float32')
    rgb = rgb.reshape((rgb.shape[0], rgb.shape[1], rgb.shape[2], 3))
    bw = bw.reshape((bw.shape[0], bw.shape[1], bw.shape[2], 3))
    # scale from [0,255] to [-1,1]
    rgb = (rgb - 127.5) / 127.5
    bw = (bw - 127.5) / 127.5
    return bw, rgb
    
# rgb, bw = load_data()
# print(rgb.shape, bw.shape)
# show_prediction(rgb, bw, bw, epoch=0, n=1, save=False)

In [19]:
def generate_real_samples(rgb_images, bw_images, n_samples, patch_size):
    ix = randint(0, rgb_images.shape[0], n_samples)
    rgb_images = rgb_images[ix]
    bw_images = bw_images[ix]
    y = ones((n_samples, patch_size, patch_size, 1))
    return [bw_images, rgb_images], y

# d = generate_real_samples(rgb, bw, 32, 8)
# print(np.array(d[0][0]).shape, np.array(d[0][1]).shape, np.array(d[1]).shape)
# d[1][0]

In [20]:
def generate_fake_samples(generator, bw_images, n_samples, patch_size):
    pred_rgb_images = generator.predict(bw_images,verbose=0)
    y = zeros((n_samples, patch_size, patch_size, 1))
    return pred_rgb_images, y

# with tf.device(device_name):
# kh = generate_fake_samples(b, 512, 3)
# print("shape of the generated images: ",kh[0][0].shape)

In [21]:
def summarize_the_model(generator, rgb_images, bw_images, epoch, n_samples=1, save=False):
    kk = 781
    bw_images = bw_images[kk]
    bw_images = bw_images.reshape((1, bw_images.shape[0], bw_images.shape[1], 3))
    rgb_images = rgb_images[kk]
    rgb_images = rgb_images.reshape((1, rgb_images.shape[0], rgb_images.shape[1], 3))
    predicted_rgb  = generator.predict(bw_images,verbose=0)
    # scale from [-1,1] to [0,1]
    # X = (X + 1) / 2.0
    bw_images = (bw_images + 1.0) / 2.0
    rgb_images = (rgb_images + 1.0) / 2.0
    predicted_rgb = (predicted_rgb + 1.0) / 2.0
    show_prediction(bw_images, rgb_images, predicted_rgb, epoch, n=n_samples, save=save)

In [22]:
def train(g_model, d_model, gan_model, dataset, n_epochs=3, n_batch=128):
    
    rgb_images, bw_images = dataset
    
    print("Data Size: ",rgb_images.shape)
    print("No. Of Epoch: ",n_epochs)
    bat_per_epo = int(rgb_images.shape[0] / n_batch)
    print("Batch Per Epoch: ", bat_per_epo)
    print("Full Batch: ",n_batch)
    print("*"*50,'\n\n')
    
    patch_size = d_model.output_shape[1]

    d_loss_real_array,d_loss_fake_array =[],[]
    g_loss_array = []
    for i in range(n_epochs):
        d_loss_r,d_loss_f = 0.0,0.0
        g_loss = 0.0

        for j in range(bat_per_epo):

            [bw_real, rgb_real], y_real = generate_real_samples(rgb_images, bw_images, n_batch, patch_size)
            d_loss1  = d_model.train_on_batch([bw_real, rgb_real], y_real)
            d_loss_r += (d_loss1/n_batch)
            # print("real_loss")

            pred_rgb, y_fake = generate_fake_samples(g_model, bw_real, n_batch, patch_size) # Trick 1 is Here
            d_loss2  = d_model.train_on_batch([bw_real, pred_rgb], y_fake)
            d_loss_f += (d_loss2/n_batch)
            # print("fake_loss")

            g_loss_1, _ , _ = gan_model.train_on_batch(bw_real, [y_real, rgb_real]) # Trick 2 is Here
            g_loss += (g_loss_1/n_batch)
            # print("gan_loss")

            d_loss_real_array.append(d_loss_r)
            d_loss_fake_array.append(d_loss_f)
            g_loss_array.append(g_loss)

            # print("\n")

        print('epoch -> [%d/%d], discriminator_loss_for_real_data = %f, discriminator_loss_for_fake_data = %f, generator_loss = %f\n' %(i+1, n_epochs, d_loss_r, d_loss_f, g_loss))
        epoch_k = i + 1
        if(epoch_k%100==0):
          summarize_the_model(g_model, rgb_images, bw_images, epoch_k, n_samples = 1, save=True)

        d_model_json = d_model.to_json()
        with open("/content/drive/MyDrive/Image Segmentation/Stage 2/PixToPixGAN_Discriminator_Model.json", "w") as json_file:
          json_file.write(d_model_json)

        g_model_json = g_model.to_json()
        with open("/content/drive/MyDrive/Image Segmentation/Stage 2/PixToPixGAN_Generator_Model.json", "w") as json_file:
          json_file.write(g_model_json)

        # d_model.save("/content/drive/MyDrive/Image Segmentation/Stage 2/PixToPixGAN_Discriminator_Model.h5")
        # g_model.save("/content/drive/MyDrive/Image Segmentation/Stage 2/PixToPixGAN_Generator_Model.h5")
        np.savez_compressed("/content/drive/MyDrive/Image Segmentation/Stage 2/PixToPixGAN_Loss_Record.npz", a=d_loss_real_array, b=d_loss_fake_array, c=g_loss_array)

        # print("\n")

    return d_loss_real_array, d_loss_fake_array, g_loss_array

In [None]:
with tf.device(device_name):

    n_epochs = 1500
    n_batch = 64
    
    d_model = Discriminator()
    g_model = Generator()
    gan_model = GAN(g_model, d_model)
    
    dataset = load_data()
    print('\nREADY TO GO !!!\n')

    d_loss_real_array, d_loss_fake_array, g_loss_array = train(g_model, d_model, gan_model, dataset, n_epochs, n_batch)


READY TO GO !!!

Data Size:  (2975, 128, 128, 3)
No. Of Epoch:  1500
Batch Per Epoch:  46
Full Batch:  64
************************************************** 


epoch -> [1/1500], discriminator_loss_for_real_data = 1.527608, discriminator_loss_for_fake_data = 0.998118, generator_loss = 23.480594

epoch -> [2/1500], discriminator_loss_for_real_data = 1.408846, discriminator_loss_for_fake_data = 0.899095, generator_loss = 23.731913

epoch -> [3/1500], discriminator_loss_for_real_data = 1.800645, discriminator_loss_for_fake_data = 1.567196, generator_loss = 19.358227

epoch -> [4/1500], discriminator_loss_for_real_data = 1.443317, discriminator_loss_for_fake_data = 1.024429, generator_loss = 19.143002

epoch -> [5/1500], discriminator_loss_for_real_data = 1.090233, discriminator_loss_for_fake_data = 0.989465, generator_loss = 19.481443

epoch -> [6/1500], discriminator_loss_for_real_data = 0.665266, discriminator_loss_for_fake_data = 1.126401, generator_loss = 17.234163

epoch -> [7/1500]

# Misc.