%tensorflow_version 1.x
Import all the required modules


In [1]:
# %tensorflow_version 1.x

# %matplotlib inline
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
import PIL
import os
import cv2
from PIL import Image
from keras import layers
from keras.layers import Conv2D, BatchNormalization, Dense, Flatten, Input, UpSampling2D
from keras.layers import Activation
from keras.layers.advanced_activations import LeakyReLU, PReLU
from keras.models import Model
from keras.layers import add
from keras.losses import BinaryCrossentropy
from keras.applications.vgg19 import VGG19
from keras.optimizers import Adam
from keras.models import Model
import keras.backend as K
from tqdm import tqdm

Using TensorFlow backend.


 Overall Network Layout


In [2]:
#Residual block
def res_block_gen(model, kernel_size, filters, strides):
    res_gen = model
    model = Conv2D(filters=filters,
                   kernel_size=kernel_size,
                   strides=strides,
                   padding='same')(model)
    model = BatchNormalization(momentum=0.5)(model)

    model = PReLU(alpha_initializer="zeros",
                  alpha_regularizer=None,
                  alpha_constraint=None,
                  shared_axes=[1, 2])(model)

    model = Conv2D(filters=filters,
                   kernel_size=kernel_size,
                   strides=strides,
                   padding='same')(model)
    model = BatchNormalization()(model)

    model = add([res_gen, model])
    return model

In [3]:
#up_scaling_block
def up_scale(model, kernel_size, filters, strides):

    model = Conv2D(filters=filters,
                   kernel_size=kernel_size,
                   strides=strides,
                   padding='same')(model)
    model = UpSampling2D(size=2)(model)
    model = PReLU()(model)

    return model

In [4]:
#Discriminator block
def disc(model, filters, kernel_size, strides):
    model = Conv2D(filters=filters,
                   kernel_size=kernel_size,
                   strides=strides,
                   padding='same')(model)
    model = BatchNormalization()(model)
    model = LeakyReLU(alpha=0.2)(model)

    return model

In [5]:
# Discriminator Network
class dis():
    def __init__(self, image_shape):
        self.image_shape = image_shape
        print('self.image_shape')

    def discriminator(self):
        dis_input = Input(self.image_shape)
        print(dis_input)

        model = Conv2D(filters=64, kernel_size=3, strides=1,
                       padding='same')(dis_input)
        model = Activation('relu')(model)

        model = disc(model, 64, 3, 2)
        model = disc(model, 128, 3, 1)


class gen(object):
    def __init__(self, noise_shape):
        self.noise_shape = noise_shape

    def generator(self):

        gen_input = Input(self.noise_shape)

        model = Conv2D(filters=64, kernel_size=9, strides=1,
                       padding='same')(gen_input)
        model = PReLU(alpha_initializer='zeros',
                      alpha_regularizer=None,
                      alpha_constraint=None,
                      shared_axes=[1, 2])(model)
        gen_model = model

        for index in range(16):
            model = res_block_gen(model, 3, 64, 1)

        model = Conv2D(filters=64, kernel_size=3, strides=1,
                       padding="same")(model)
        model = BatchNormalization(momentum=0.5)(model)
        model = add([gen_model, model])

        # Using 2 UpSampling
        for index in range(2):
            model = up_scale(model, 3, 256, 1)
        # print('model',model)

        model = Conv2D(filters=3, kernel_size=9, strides=1,
                       padding='same')(model)
        model = Activation('tanh')(model)  #modification

        # print('model',model)

        generator = Model(inputs=gen_input, outputs=model)
        # print('generator',generator)
        return generator

        model = disc(model, 128, 3, 2)
        model = disc(model, 256, 3, 1)
        model = disc(model, 256, 3, 2)
        model = disc(model, 512, 3, 1)
        model = disc(model, 512, 3, 2)

        model = Flatten()(model)
        model = Dense(1024)(model)
        model = LeakyReLU(alpha=0.2)(model)

        model = Dense(1)(model)
        model = Activation('sigmoid')(model)

        discriminator = Model(input=dis_input, output=model)
        return discriminator

In [8]:
# Discriminator Network
class dis():
    def __init__(self, image_shape):
        self.image_shape = image_shape
        print('self.image_shape')

    def discriminator(self):
        dis_input = Input(self.image_shape)
        print(dis_input)

        model = Conv2D(filters=64, kernel_size=3, strides=1,
                       padding='same')(dis_input)
        model = Activation('relu')(model)

        model = disc(model, 64, 3, 2)
        model = disc(model, 128, 3, 1)
        model = disc(model, 128, 3, 2)
        model = disc(model, 256, 3, 1)
        model = disc(model, 256, 3, 2)
        model = disc(model, 512, 3, 1)
        model = disc(model, 512, 3, 2)

        model = Flatten()(model)
        model = Dense(1024)(model)
        model = LeakyReLU(alpha=0.2)(model)

        model = Dense(1)(model)
        model = Activation('sigmoid')(model)

        discriminator = Model(input=dis_input, output=model)
        return discriminator

Data Preprocessing

In [13]:
# #Mount the drive
# from google.colab import drive
# drive.mount('/content/drive')

In [8]:
train_path = "./data/train/"
test_path = "./data/test/"

In [9]:
trainfile_name = os.listdir(train_path)
print(trainfile_name)
num = len(trainfile_name)
print("total train images:", num)
testfile_name = os.listdir(test_path)
num = len(testfile_name)
print("total test images:", num)

['dt_train_1.jpg', 'dt_train_10.jpg', 'dt_train_11.jpg', 'dt_train_12.jpg', 'dt_train_13.jpg', 'dt_train_14.jpg', 'dt_train_15.jpg', 'dt_train_16.jpg', 'dt_train_17.jpg', 'dt_train_18.jpg', 'dt_train_19.jpg', 'dt_train_2.jpg', 'dt_train_20.jpg', 'dt_train_21.jpg', 'dt_train_22.jpg', 'dt_train_23.jpg', 'dt_train_24.jpg', 'dt_train_25.jpg', 'dt_train_26.jpg', 'dt_train_27.jpg', 'dt_train_28.jpg', 'dt_train_29.jpg', 'dt_train_3.jpg', 'dt_train_30.jpg', 'dt_train_31.jpg', 'dt_train_32.jpg', 'dt_train_33.jpg', 'dt_train_34.jpg', 'dt_train_35.jpg', 'dt_train_36.jpg', 'dt_train_37.jpg', 'dt_train_38.jpg', 'dt_train_39.jpg', 'dt_train_4.jpg', 'dt_train_40.jpg', 'dt_train_41.jpg', 'dt_train_42.jpg', 'dt_train_43.jpg', 'dt_train_44.jpg', 'dt_train_45.jpg', 'dt_train_46.jpg', 'dt_train_47.jpg', 'dt_train_48.jpg', 'dt_train_49.jpg', 'dt_train_5.jpg', 'dt_train_50.jpg', 'dt_train_6.jpg', 'dt_train_7.jpg', 'dt_train_8.jpg', 'dt_train_9.jpg']
total train images: 50
total test images: 10


In [11]:
#load the train_data from drive
def load_train_data ():
    train_images = []
    test_images = []
    
    train_path ="./data/train/"
    test_path ="./data/test/" 
    train_data = os.listdir(train_path)
    test_data = os.listdir(test_path)
    
    for sample in train_data:
        img_path = train_path + sample
        train_hr_images = cv2.imread(img_path)
        train_images.append(train_hr_images)
  #  print(train_hr_images)   
   return train_images
  



IndentationError: unindent does not match any outer indentation level (<tokenize>, line 16)

In [None]:

def hr_images(images):
  # images_hr = tf.keras.preprocessing.image.img_to_array(images)  
  images_hr = np.array(images)
  return images_hr


In [None]:
def lr_images(real_images, downscale):
   images =[]
   for img in range(len(real_images)):
        images.append(np.array(PIL.Image.fromarray(real_images[img]).resize([real_images[img].shape[0]//downscale, 
        real_images[img].shape[1]//downscale],resample=PIL.Image.BICUBIC)))
   images_lr = np.array(images)
    
    
   return images_lr 

In [None]:
def normalize(input_data):
    input_data = np.divide(input_data.astype(np.float32),127.5) - np.ones_like(input_data, dtype=np.float32)
    # print("ii",input_data)
    return input_data

In [None]:
def denormalize(input_data):
    input_data = input_data*127.5+127.5
    return input_data.astype(np.uint8)

In [None]:
def resize_images(img):

  img_size = 192
  resized_images =[]
  for i in range(len(img)):
    resized_images.append(np.array(PIL.Image.fromarray(img[i]).resize([img_size, img_size],resample=PIL.Image.BICUBIC)))
        
  resized_images = np.array(resized_images)
  return resized_images


In [None]:
def train_data_load():
  x_train = load_train_data()

  # conversion of image into the numpy array
  x_train_hr = hr_images(x_train)
  x_train_hr_resized =resize_images(x_train_hr)
  x_train_hr = normalize(x_train_hr_resized) #normailze the array
  
  #conversion of hr images into lr_images and into the numpy array
  x_train_lr = lr_images(x_train_hr_resized, 4)
  # x_train_lr_resized =resize_images(x_train_lr)

  x_train_lr = normalize(x_train_lr) #array normalization


  return x_train_hr , x_train_lr 
  
  


In [None]:

def load_test_data ():
   test_images = []
    
   test_path ="/content/drive/My Drive/Colab Notebooks/project/dataset-upload-to-kaggle/finished/valid/dataraw/hires/"  
   test_data = os.listdir(test_path)
    
   for sample in test_data:
        img_path = test_path + sample
        test_hr_images = cv2.imread(img_path)
        test_images.append(test_hr_images)
      
   return test_images
  



In [None]:
def load_testing_data():
  x_test = load_test_data()

  # conversion of image into the numpy array
  x_test_hr = hr_images(x_test)
  x_test_hr_resized =resize_images(x_test_hr)

  x_test_hr = normalize(x_test_hr_resized) #normailze the array
  
  #conversion of hr images into lr_images and into the numpy array
  x_test_lr = lr_images(x_test_hr_resized, 4)
  # x_test_lr_resized =(x_test_lr)

  x_test_lr = normalize(x_test_lr) #array normalization


  return x_test_hr , x_test_lr
  


In [None]:
def plot_generated_images(epoch,generator,x_test_hr,x_test_lr ,examples=3 , dim=(1, 3), figsize=(15, 5)):
    
    rand_nums = np.random.randint(0, x_test_hr.shape[0], size=examples)
    image_batch_hr = denormalize(x_test_hr[rand_nums])
    image_batch_lr = x_test_lr[rand_nums]
    gen_img = generator.predict(image_batch_lr)
    generated_image = denormalize(gen_img)
    image_batch_lr = denormalize(image_batch_lr)
    
    #generated_image = deprocess_HR(generator.predict(image_batch_lr))
    
    plt.figure(figsize=figsize)
    
    plt.subplot(dim[0], dim[1], 1)
    plt.imshow(image_batch_lr[1], interpolation='nearest')
    plt.axis('off')
        
    plt.subplot(dim[0], dim[1], 2)
    plt.imshow(generated_image[1], interpolation='nearest')
    plt.axis('off')
    
    plt.subplot(dim[0], dim[1], 3)
    plt.imshow(image_batch_hr[1], interpolation='nearest')
    plt.axis('off')
    
    plt.tight_layout()
    plt.savefig('output/gan_generated_image_epoch_%d.png' % epoch)
    


Loss Function


In [None]:
#loss calculation ------- 
class VGG_LOSS(object):

    # image_shape =(64,64,3)
    def __init__(self,image_shape):
        self.image_shape = image_shape
    
    def vgg_loss(self,y_true,y_pred):
        loss = VGG19(include_top = False,weights = 'imagenet',input_shape = self.image_shape)
                    
        loss.trainable = False
        for l in loss.layers:
          l.trainable = False
        model = Model(inputs= loss.input,outputs= loss.get_layer('block5_conv4').output)
        model.trainable= False

        return K.mean(K.square(model(y_true)-model(y_pred)))
        
    

In [None]:
def optimizer():
    adam = Adam(learning_rate = 0.0001, beta_1 = 0.9, beta_2 = 0.999,epsilon = 1e-7)
    return adam

Training 


In [None]:
# downscale_factor = 4
# image_shape =(192,192,3) #need to be modified 

In [None]:
# Overall gan network
def gan_network(discriminator,shape, generator, optimizer,vgg_loss):
    gan_input= Input(shape=shape)
    discriminator.trainable = False

    image_sr = generator(gan_input)
    
    
    gan_output = discriminator(image_sr)
    gan = Model(inputs = [gan_input],outputs = [gan_output])
    gan.compile(loss =[vgg_loss, "binary_crossentropy"],loss_weights =[1., 1e-3] , optimizer = optimizer)
    print(gan.summary())
    

In [None]:
np.random.seed(10)

downscale_factor = 4

image_shape =(192,192,3) #need to be modified 

def train(epochs, batch_size):
   x_train_hr, x_train_lr = train_data_load()             #load data 
   loss = VGG_LOSS(image_shape)

   batch_count = int(x_train_hr.shape[0]/batch_size)
   shape = (image_shape[0] // downscale_factor, image_shape[1] // downscale_factor, image_shape[2]//downscale_factor)
   generator = gen(shape).generator()
   discriminator = dis(image_shape).discriminator()
  
   optimizer = optimizer()

   generator.compile(loss=loss.vgg_loss,optimizer = optimizer )
   discriminator.compile(loss ='binary_crossentropy',optimizer= optimizer)

   gan =gan_network(discriminator,shape,generator, optimizer,loss.vgg_loss)


   for e in range(1,epochs+1):
     print('Epoch{}/{}'.format(e+1,epochs+1))
     for _ in tqdm(range(batch_count)):
    #  generates random integer between 0 and x_train_hr.shape[0] of the size batch_size
       rand_num = np.random.randint(0,x_train_hr.shape[0],size=batch_size)

       image_batch_hr = x_train_hr[rand_num]
       image_batch_lr = x_train_lr[rand_num]

       generated_images_sr = generator.predict(image_batch_lr)

       real_data_Y = np.ones(batch_size) - np.random.random_sample(batch_size)*0.2
       fake_data_y = np.random.random_sample(batch_size)*0.2

       discriminator.trainable = True 

      # calculate discriminator loss

       d_loss_real = discriminator.train_on_batch(image_batch_hr, real_data_Y)
       d_loss_fake = discriminator.train_on_batch(generated_images_sr, fake_data_y)
       dis_loss = 0.5*np.add(d_loss_real,d_loss_fake)

    #  calculate gan loss
       rand_num = np.random.randint(0,x_train_hr.shape[0],size=batch_size)

     
       image_batch_hr = x_train_hr[rand_num]
       image_batch_lr = x_train_lr[rand_num]

       gan_Y = np.random.random_sample(batch_size)*0.2
       discriminator.trainable = False

       gan_loss = gan.train_on_batch(image_batch_lr,[])

     print("discriminator_loss: %f", dis_loss)
     print("gan_loss", gan_loss)
    # gan_loss = str(gan_loss)

     if e ==1 or e % 5 ==0:
       preprocessing.plot_generated_images(e,generator,x_test_hr,x_test_lr)
     if e % 300 == 0:
       generator.save("/content/drive/My Drive/Colab Notebooks/project/Output/gen_model%d.h5" %e)
       discriminator.save("/content/drive/My Drive/Colab Notebooks/project/Output/dis_model%d.h5" %e)
       gan.save("/content/drive/My Drive/Colab Notebooks/project/Output/gan_model%d.h5" %e)


In [None]:
train(200,4)