<a href="https://colab.research.google.com/github/KaiSan2001/DeepLearning_StudyLibrary/blob/main/SRGAN/SRGANModi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#Path Initialization#
import os, sys
from google.colab import drive
drive.mount('/content/drive')
sys.path.append('/content/drive/My Drive/Deeplearning/SRGAN')
os.chdir('/content/drive/MyDrive/Deeplearning/SRGAN/Keras-SRGAN')
os.getcwd()

#Library Initialization#
import tensorflow as tf
from tensorflow.keras.layers import Lambda
from skimage import data, io, filters
import numpy as np
from numpy import array
from numpy.random import randint
from PIL import Image
import cv2
import matplotlib.pyplot as plt
plt.switch_backend('agg')
from tensorflow.keras.applications.vgg19 import VGG19
import tensorflow.keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import (
    BatchNormalization, SeparableConv2D, MaxPooling2D, Activation, Flatten, Dropout, Dense, UpSampling2D, Input, Conv2D, Conv2DTranspose, LeakyReLU, PReLU, add
)
from tqdm import tqdm
import numpy as np
import argparse

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
#Modules Declaration#

# Residual block
def res_block_gen(model, kernal_size, filters, strides):
    gen = model
    model = Conv2D(filters = filters, kernel_size = kernal_size, strides = strides, padding = "same")(model)
    model = BatchNormalization(momentum = 0.5)(model)
    # Using Parametric ReLU
    model = PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1,2])(model)
    model = Conv2D(filters = filters, kernel_size = kernal_size, strides = strides, padding = "same")(model)
    model = BatchNormalization(momentum = 0.5)(model)
    model = add([gen, model])
    return model    
    
def up_sampling_block(model, kernal_size, filters, strides):   
    # In place of Conv2D and UpSampling2D we can also use Conv2DTranspose (Both are used for Deconvolution)
    # Even we can have our own function for deconvolution (i.e one made in Utils.py)
    #model = Conv2DTranspose(filters = filters, kernel_size = kernal_size, strides = strides, padding = "same")(model)
    model = Conv2D(filters = filters, kernel_size = kernal_size, strides = strides, padding = "same")(model)
    model = UpSampling2D(size = 2)(model)
    model = LeakyReLU(alpha = 0.2)(model)   
    return model

def discriminator_block(model, filters, kernel_size, strides):    
    model = Conv2D(filters = filters, kernel_size = kernel_size, strides = strides, padding = "same")(model)
    model = BatchNormalization(momentum = 0.5)(model)
    model = LeakyReLU(alpha = 0.2)(model)   
    return model

# Network Architecture is same as given in Paper https://arxiv.org/pdf/1609.04802.pdf
class Generator(object):

    def __init__(self, noise_shape):
        
        self.noise_shape = noise_shape

    def generator(self):
        
	    gen_input = Input(shape = self.noise_shape)
	    
	    model = Conv2D(filters = 64, kernel_size = 9, strides = 1, padding = "same")(gen_input)
	    model = PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1,2])(model)
	    
	    gen_model = model
        
        # Using 16 Residual Blocks
	    for index in range(16):
	        model = res_block_gen(model, 3, 64, 1)
	    
	    model = Conv2D(filters = 64, kernel_size = 3, strides = 1, padding = "same")(model)
	    model = BatchNormalization(momentum = 0.5)(model)
	    model = add([gen_model, model])
	    
	    # Using 2 UpSampling Blocks
	    for index in range(2):
	        model = up_sampling_block(model, 3, 256, 1)
	    
	    model = Conv2D(filters = 3, kernel_size = 9, strides = 1, padding = "same")(model)
	    model = Activation('tanh')(model)
	   
	    generator_model = Model(inputs = gen_input, outputs = model)
        
	    return generator_model

# Network Architecture is same as given in Paper https://arxiv.org/pdf/1609.04802.pdf
class Discriminator(object):

    def __init__(self, image_shape):
        
        self.image_shape = image_shape
    
    def discriminator(self):
        
        dis_input = Input(shape = self.image_shape)
        
        model = Conv2D(filters = 64, kernel_size = 3, strides = 1, padding = "same")(dis_input)
        model = LeakyReLU(alpha = 0.2)(model)
        
        model = discriminator_block(model, 64, 3, 2)
        model = discriminator_block(model, 128, 3, 1)
        model = discriminator_block(model, 128, 3, 2)
        model = discriminator_block(model, 256, 3, 1)
        model = discriminator_block(model, 256, 3, 2)
        model = discriminator_block(model, 512, 3, 1)
        model = discriminator_block(model, 512, 3, 2)
        
        model = Flatten()(model)
        model = Dense(1024)(model)
        model = LeakyReLU(alpha = 0.2)(model)
       
        model = Dense(1)(model)
        model = Activation('sigmoid')(model) 
        
        discriminator_model = Model(inputs = dis_input, outputs = model)
        
        return discriminator_model

class VGG_LOSS(object):

    def __init__(self, image_shape):
        
        self.image_shape = image_shape

    # computes VGG loss or content loss
    def vgg_loss(self, y_true, y_pred):
    
        vgg19 = VGG19(include_top=False, weights='imagenet', input_shape=self.image_shape)
        vgg19.trainable = False
        # Make trainable as False
        for l in vgg19.layers:
            l.trainable = False
        model = Model(inputs=vgg19.input, outputs=vgg19.get_layer('block5_conv4').output)
        model.trainable = False
    
        return K.mean(K.square(model(y_true) - model(y_pred)))

def get_optimizer():
 
    adam = Adam(lr=1E-4, beta_1=0.9, beta_2=0.999, epsilon=1e-08)
    return adam

# Subpixel Conv will upsample from (h, w, c) to (h/r, w/r, c/r^2)
def SubpixelConv2D(input_shape, scale=4):
    def subpixel_shape(input_shape):
        dims = [input_shape[0],input_shape[1] * scale,input_shape[2] * scale,int(input_shape[3] / (scale ** 2))]
        output_shape = tuple(dims)
        return output_shape
    
    def subpixel(x):
        return tf.depth_to_space(x, scale)
        
    return Lambda(subpixel, output_shape=subpixel_shape)
    
# Takes list of images and provide HR images in form of numpy array
def hr_images(images):
    images_hr = array(images)
    return images_hr

# Takes list of images and provide LR images in form of numpy array
def lr_images(images_real , downscale):
    
    images= []
    for img in  range(len(images_real)):
        #images.append(cv2.resize(images_real[img], [images_real[img].shape[0]//downscale,images_real[img].shape[1]//downscale], interpolation='bicubic', mode=None))
        im123=cv2.resize(images_real[img], (int(images_real[img].shape[0]//downscale),int(images_real[img].shape[1]//downscale)))
        images.append(im123)
    images_lr = array(images)
    return images_lr
    
def normalize(input_data):

    return (input_data.astype(np.float32) - 127.5)/127.5 
    
def denormalize(input_data):
    input_data = (input_data + 1) * 127.5
    return input_data.astype(np.uint8)
'''
def load_training_data(x_train,y_train,x_test,y_test):

    x_train_hr = hr_images(x_train)
    x_train_hr = normalize(x_train_hr)
    
    x_train_lr = lr_images(x_train, 4)
    x_train_lr = normalize(x_train_lr)
    
    x_test_hr = hr_images(x_test)
    x_test_hr = normalize(x_test_hr)
    
    x_test_lr = lr_images(x_test, 4)
    x_test_lr = normalize(x_test_lr)
    
    return x_train_lr, x_train_hr, x_test_lr, x_test_hr
    '''
def load_training_data(x_train,y_train,x_test,y_test):

    x_train_hr = y_train
    x_train_hr = normalize(x_train_hr)
    
    x_train_lr = x_train
    x_train_lr = normalize(x_train_lr)
    
    x_test_hr = y_test
    x_test_hr = normalize(x_test_hr)
    
    x_test_lr = x_test
    x_test_lr = normalize(x_test_lr)
    
    return x_train_lr, x_train_hr, x_test_lr, x_test_hr



# While training save generated image(in form LR, SR, HR)
# Save only one image as sample  
def plot_generated_images(output_dir, epoch, generator, x_test_hr, x_test_lr , dim=(1, 3), figsize=(15, 5)):
    print("In plot test generated images")
    examples = x_test_hr.shape[0]
    print(examples)
    value = randint(0, examples)
    image_batch_hr = denormalize(x_test_hr)
    image_batch_lr = x_test_lr
    gen_img = generator.predict(image_batch_lr)
    generated_image = denormalize(gen_img)
    image_batch_lr = denormalize(image_batch_lr)
    
    plt.figure(figsize=figsize)
    
    plt.subplot(dim[0], dim[1], 1)
    plt.imshow(image_batch_lr[value], interpolation='nearest')
    plt.axis('off')
        
    plt.subplot(dim[0], dim[1], 2)
    plt.imshow(generated_image[value], interpolation='nearest')
    plt.axis('off')
    
    plt.subplot(dim[0], dim[1], 3)
    plt.imshow(image_batch_hr[value], interpolation='nearest')
    plt.axis('off')
    
    plt.tight_layout()
    plt.savefig(output_dir + 'generated_image_%d.png' % epoch)

In [3]:
np.random.seed(10)
# Better to use downscale factor as 4
downscale_factor = 1
# Remember to change image shape if you are having different size of images
image_shape = (512,511,3) #(32,32,3)

# Combined network
def get_gan_network(discriminator, shape, generator, optimizer, vgg_loss):
    discriminator.trainable = False
    gan_input = Input(shape=shape)
    x = generator(gan_input)
    gan_output = discriminator(x) #Here I think there should be two inputs to discriminator x and another hr image
    # and like this only the discriminator model should also be modified
    gan = Model(inputs=gan_input, outputs=[x,gan_output])    #And why is there two outputs of the gan_model overall
    gan.compile(loss=[vgg_loss, "binary_crossentropy"],
                loss_weights=[1., 1e-3],
                optimizer=optimizer)
    gan.run_eagerly = True
    return gan

# default values for all parameters are given, if want defferent values you can give via commandline
# for more info use $python train.py -h
def train(epochs, batch_size, x_train,y_train,x_test,y_test, model_save_dir):
    
    x_train_lr, x_train_hr, x_test_lr, x_test_hr = load_training_data(x_train,y_train,x_test,y_test) 
    loss = VGG_LOSS(image_shape)  
    
    batch_count = int(x_train_hr.shape[0] / batch_size)
    shape = (image_shape[0]//downscale_factor, image_shape[1]//downscale_factor, image_shape[2])
    
    generator = Generator(shape).generator()
    discriminator = Discriminator(image_shape).discriminator()

    optimizer = get_optimizer()
    generator.compile(loss=loss.vgg_loss, optimizer=optimizer)
    discriminator.compile(loss="binary_crossentropy", optimizer=optimizer)
    
    gan = get_gan_network(discriminator, shape, generator, optimizer, loss.vgg_loss)
    #gan.build(shape)
    loss_file = open(model_save_dir + 'losses.txt' , 'w+')
    loss_file.close()

    for e in range(1, epochs+1):
        print ('-'*15, 'Epoch %d' % e, '-'*15)
        for _ in tqdm(range(batch_count)):
            
            rand_nums = np.random.randint(0, x_train_hr.shape[0], size=batch_size)
            
            image_batch_hr = x_train_hr[rand_nums]
           
            image_batch_lr = x_train_lr[rand_nums]
            print(image_batch_lr.shape)
            generated_images_sr = generator.predict(image_batch_lr)

            real_data_Y = np.ones(batch_size) - np.random.random_sample(batch_size)*0.2
            fake_data_Y = np.random.random_sample(batch_size)*0.2
            
            discriminator.trainable = True
            
            d_loss_real = discriminator.train_on_batch(image_batch_hr, real_data_Y)
            d_loss_fake = discriminator.train_on_batch(generated_images_sr, fake_data_Y)
            discriminator_loss = 0.5 * np.add(d_loss_fake, d_loss_real)
            
            rand_nums = np.random.randint(0, x_train_hr.shape[0], size=batch_size)
            image_batch_hr = x_train_hr[rand_nums]
            image_batch_lr = x_train_lr[rand_nums]

            gan_Y = np.ones(batch_size) - np.random.random_sample(batch_size)*0.2
            discriminator.trainable = False
            gan_loss = gan.train_on_batch(image_batch_lr, [image_batch_hr,gan_Y])
            
            
        print("discriminator_loss : %f" % discriminator_loss)
        print("gan_loss :", gan_loss)
        gan_loss = str(gan_loss)
        
        loss_file = open(model_save_dir + 'losses.txt' , 'a')
        loss_file.write('epoch%d : gan_loss = %s ; discriminator_loss = %f\n' %(e, gan_loss, discriminator_loss) )
        loss_file.close()

        if e == 1 or e % 5 == 0:
            plot_generated_images(model_save_dir, e, generator,  x_test_hr, x_test_lr)
        if e % 2 == 0:
            
            generator.save(model_save_dir + 'gen_model%d.h5' % e)       
            discriminator.save(model_save_dir + 'dis_model%d.h5' % e)
            print("Model saved")



In [7]:
train(1, 64, x_train,y_train,x_test,y_test, '/content/drive/My Drive/Deeplearning/SRGAN/Modify/')    

  super(Adam, self).__init__(name, **kwargs)




ValueError: ignored

In [None]:
from keras.datasets import cifar10                     #dataset importing 

(x_train, y_train), (x_test, y_test) = cifar10.load_data()
x_train=x_train[0:5000]
x_test=x_test[0:800]
print(x_test.shape)
print(y_train.shape)

(800, 32, 32, 3)
(50000, 1)


In [None]:
#Data Pre-cropping#
Path = '/content/drive/MyDrive/Deeplearning/SRGAN/Modify'
#GroundTruth Cropping#
os.chdir(Path+'/Cropped/GT')
GTs = sorted(os.listdir(Path+'/ImgLib/GT'))
nums1 = len(GTs)
for i, name1 in enumerate(GTs):
    fpath1 = Path+'/ImgLib/GT/' + name1
    image = Image.open(fpath1)
    image = np.array(image)
    image = image[100:612,:]
    image = Image.fromarray(image)

    image.save('%s'%name1)

#Input Cropping#
os.chdir(Path+'/Cropped/Input')
Inputs = sorted(os.listdir(Path+'/ImgLib/Input'))
nums1 = len(Inputs)
for i, name2 in enumerate(GTs):
    fpath2 = Path+'/ImgLib/Input/' + name2
    image = Image.open(fpath2)
    image = np.array(image)
    image = image[100:612,:]
    image = Image.fromarray(image)

    image.save('%s'%name2)

In [5]:
#Train Set Preparation#
TrainPath = '/content/drive/MyDrive/Deeplearning/SRGAN/Modify/Cropped'
Input_set = []
GT_set =[]

#Random Selection for Cross-Validation#
GTs = sorted(os.listdir(TrainPath+'/GT'))
for gt in GTs:
    #find the GT image
    fpath1 = TrainPath + '/GT/' + gt #Read in the file one by one
    gt_img = cv2.imread(fpath1, cv2.IMREAD_COLOR)
    #print(gt_img.shape)

    #find the corresponding Input image
    fpath2 = TrainPath +'/Input/' + gt
    Input_img = cv2.imread(fpath2, cv2.IMREAD_COLOR)

    #Storage
    GT_set.append(gt_img)
    Input_set.append(Input_img)
    
GT_set = np.array(GT_set, dtype=np.float32)
Input_set = np.array(Input_set, dtype=np.float32)

print(GT_set.shape)
print(Input_set.shape)

(100, 512, 511, 3)
(100, 512, 511, 3)


In [6]:
x_train=np.array(Input_set[0:80], dtype=np.float32)
x_test=np.array(Input_set[80:], dtype=np.float32)
y_train=np.array(GT_set[0:80], dtype=np.float32)
y_test=np.array(GT_set[80:], dtype=np.float32)
print(x_train.shape);print(x_test.shape)
print(y_train.shape);print(y_test.shape)

(80, 512, 511, 3)
(20, 512, 511, 3)
(80, 512, 511, 3)
(20, 512, 511, 3)


In [29]:
x_train_lr1, x_train_hr1, x_test_lr1, x_test_hr1 = load_training_data(x_train,y_train,x_test,y_test)
print(x_train_lr1.shape)
print(x_train_hr1.shape)

(80, 512, 511, 3)
(80, 512, 511, 3)
