In [None]:
# Mount Google Drive at /content/drive (Colab-only API); the dataset path and the
# model/figure output paths used later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
# Print the entries of the current working directory (os.listdir() defaults to '.').
print(os.listdir())

In [None]:
import tensorflow as tf

# tf.test.gpu_device_name() returns an empty string when no GPU is visible.
# Fall back to the CPU device so the split() below does not raise IndexError and
# the later `with tf.device(device_name)` cell still has a valid device string.
device_name = tf.test.gpu_device_name() or "/device:CPU:0"
with tf.device(device_name):
    # device_name has the form "/device:<TYPE>:<index>"; field 1 is the device type.
    print(device_name.split(":")[1]," running . . . ")

In [None]:
import tensorflow as tf
import os,sys
import numpy as np
import random

# Seed the Python `random`, NumPy and TensorFlow random number generators.
random.seed(1)
np.random.seed(1)
tf.random.set_seed(2)

import numpy as np
# NOTE(review): this re-seeds NumPy's global RNG, so the effective NumPy seed is 10
# and the np.random.seed(1) call above has no lasting effect.
np.random.seed(10)

import os, keras, numpy,tensorflow
import matplotlib.pyplot as plt
from numpy import *
from numpy.random import *
from tensorflow.keras.optimizers import Adam
from keras.models import Model
from keras.layers import *
from tensorflow.keras import initializers

from keras.layers import *
from keras.layers import Dense
from keras.layers.core import Activation
from keras.initializers import RandomNormal
from keras.layers.convolutional import UpSampling2D
from keras.layers.core import Flatten
from keras.layers import Input
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.models import Model
from keras.layers import add
from keras.layers import Conv2D, Conv2DTranspose, BatchNormalization

In [None]:
def Discriminator(input_shape_in = (16, 16, 3), target_shape_in = (128, 128, 3)):
    """Build and compile the conditional patch discriminator.

    The high-resolution image is reduced by three stride-2 convolutions, the
    low-resolution image is concatenated onto the resulting feature map, and
    the stack continues down to a one-channel sigmoid patch map.

    Args:
        input_shape_in: shape of the low-resolution conditioning image. Its
            spatial size must equal the target's spatial size divided by 8,
            because it is concatenated after three stride-2 convolutions.
        target_shape_in: shape of the high-resolution image being judged.

    Returns:
        A compiled Keras ``Model`` taking ``[low_res, high_res]`` and returning
        per-patch probabilities (4x4x1 with the default shapes). It is compiled
        with binary cross-entropy weighted by 0.5 and Adam(0.0005, beta_1=0.5).
    """

    def conv_stage(tensor, n_filters, strides=(2, 2), batchnorm=True):
        # 4x4 convolution -> optional batch norm -> LeakyReLU(0.2)
        tensor = Conv2D(n_filters, (4,4), strides=strides, padding='same',
                        kernel_initializer=RandomNormal(stddev=0.02))(tensor)
        if batchnorm:
            tensor = BatchNormalization()(tensor)
        return LeakyReLU(alpha=0.2)(tensor)

    low_res_in = Input(shape = input_shape_in)
    high_res_in = Input(shape = target_shape_in)

    # Spatial sizes in the comments below assume the default shapes.
    features = conv_stage(high_res_in, 64, batchnorm=False)  # 64 filters,  128 -> 64
    features = conv_stage(features, 64)                      # 64 filters,  64 -> 32
    features = conv_stage(features, 128)                     # 128 filters, 32 -> 16

    # Inject the low-resolution image where the feature map matches its size.
    features = Concatenate()([features, low_res_in])

    features = conv_stage(features, 128)                     # 128 filters, 16 -> 8
    features = conv_stage(features, 128)                     # 128 filters, 8 -> 4
    features = conv_stage(features, 128, strides=(1, 1))     # 128 filters, stays 4x4

    # One sigmoid score per patch.
    patch_map = Conv2D(1, (4,4), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(features)
    patch_map = Activation('sigmoid')(patch_map)

    discriminator_model = Model(inputs = [low_res_in, high_res_in], outputs = patch_map)
    discriminator_model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.0005, beta_1=0.5),
        loss_weights=[0.5],
    )

    return discriminator_model

In [None]:
a = Discriminator()
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
a.summary()

In [None]:
# Plot the discriminator graph `a`, annotating each layer with its shapes,
# dtype, name and activation.
tf.keras.utils.plot_model(a,show_shapes=True,
                          show_dtype=True,
                          show_layer_names=True,
                          show_layer_activations=True)

In [None]:
# define an encoder block
def encoder_block(layer_in, n_filters, batchnorm=True):
    """Downsampling stage: 4x4 stride-2 conv, optional batch norm, LeakyReLU(0.2).

    Args:
        layer_in: input tensor.
        n_filters: number of convolution filters.
        batchnorm: when True, insert a BatchNormalization layer that is always
            called with ``training=True``.

    Returns:
        The activated feature tensor at half the input's spatial size.
    """
    weight_init = RandomNormal(stddev=0.02)
    down_conv = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=weight_init)
    features = down_conv(layer_in)
    if batchnorm:
        features = BatchNormalization()(features, training=True)
    return LeakyReLU(alpha=0.2)(features)


In [None]:
# define a decoder block
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
    """Upsampling stage with a skip connection.

    Applies a 4x4 stride-2 transposed convolution and batch norm, optionally
    Dropout(0.5), then concatenates ``skip_in`` and applies ReLU. Batch norm
    and dropout are both called with ``training=True``.

    Args:
        layer_in: tensor to upsample.
        skip_in: tensor concatenated onto the upsampled result; it must have
            the same spatial size as that result.
        n_filters: number of transposed-convolution filters.
        dropout: whether to apply Dropout(0.5).

    Returns:
        The ReLU-activated concatenation of the upsampled tensor and ``skip_in``.
    """
    weight_init = RandomNormal(stddev=0.02)
    upsampled = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=weight_init)(layer_in)
    upsampled = BatchNormalization()(upsampled, training=True)
    if dropout:
        upsampled = Dropout(0.5)(upsampled, training=True)
    merged = Concatenate()([upsampled, skip_in])
    return Activation('relu')(merged)

In [None]:
# define the standalone generator model
def Generator(image_shape=(16,16,3), latent_dim = 512):
    """Build the uncompiled generator: a U-Net over a latent-derived 128x128 map.

    The latent vector is projected with a Dense layer and reshaped to
    (128, 128, 3). That map passes through six stride-2 encoder blocks, a
    stride-2 bottleneck convolution, six decoder blocks with skip connections
    and a final transposed convolution back to 128x128. The low-resolution
    image is concatenated at the 16x16 level on both the encoder and the
    decoder side, so ``image_shape`` must be 16x16 spatially.

    Args:
        image_shape: shape of the low-resolution conditioning image.
        latent_dim: length of the noise vector.

    Returns:
        A Keras ``Model`` taking ``[low_res_image, latent]`` and returning a
        (128, 128, 3) image passed through tanh, i.e. in the open range (-1, 1).
    """
    # image input
    in_image = Input(shape=image_shape)  # --> (16, 16)

    # project the noise vector onto a 128x128x3 map that the encoder consumes
    input_latent = Input(shape=(latent_dim,))
    input_img_1 = Dense(128*128*3)(input_latent)
    input_img_1 = Reshape((128, 128, 3))(input_img_1)  # --> (128, 128)

    # encoder model
    e1 = encoder_block(input_img_1, 64, batchnorm=False)  # --> (64, 64)
    e2 = encoder_block(e1, 64)   # --> (32, 32)
    e3 = encoder_block(e2, 64)   # --> (16, 16)
    e3 = Concatenate()([e3, in_image])  # --> (16, 16), low-res image injected
    e4 = encoder_block(e3, 128)   # --> (8, 8)
    e5 = encoder_block(e4, 128)   # --> (4, 4)
    e6 = encoder_block(e5, 512)   # --> (2, 2)

    # bottleneck, no batch norm and relu
    b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(e6)
    b = Activation('relu')(b)   # --> (1, 1)

    # decoder model
    d2 = decoder_block(b, e6, 512)    # --> (2, 2)
    d3 = decoder_block(d2, e5, 128)   # --> (4, 4)
    d4 = decoder_block(d3, e4, 128)   # --> (8, 8)
    d5 = decoder_block(d4, e3, 64, dropout=False)   # --> (16, 16)
    d5 = Concatenate()([d5, in_image])  # low-res image injected again
    d6 = decoder_block(d5, e2, 64, dropout=False)   # --> (32, 32)
    d7 = decoder_block(d6, e1, 64, dropout=False)    # --> (64, 64)

    # output
    g = Conv2DTranspose(3, (4,4), strides=(2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(d7)
    n_refine = 2
    for step in range(n_refine):
        g = Conv2D(3, (2,2), padding='same', kernel_initializer=RandomNormal(stddev=0.02))(g)
        g = BatchNormalization()(g)
        # ReLU only between refinement steps. A ReLU directly before tanh would
        # restrict the output to [0, 1), while the training targets are scaled
        # to [-1, 1], so the darker half of the pixel range could never be produced.
        if step < n_refine - 1:
            g = Activation("relu")(g)
    out_image = Activation('tanh')(g)  # --> (128, 128)

    # model
    model = Model([in_image, input_latent], out_image)

    return model

In [None]:
b = Generator()
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
b.summary()

In [None]:
# Plot the generator graph `b` (built by Generator()), annotating each layer
# with its shapes, dtype, name and activation.
tf.keras.utils.plot_model(b,show_shapes=True,
                          show_dtype=True,
                          show_layer_names=True,
                          show_layer_activations=True)

In [None]:
# define the combined generator and discriminator model, for updating the generator
def GAN(g_model, d_model, image_shape = (16, 16, 3), latent_dim = 512):
    """Chain generator and discriminator into the model used to update the generator.

    Every discriminator layer except BatchNormalization is marked
    non-trainable (this mutates ``d_model``'s layers in place). The combined
    model feeds the low-resolution image and a noise vector to the generator,
    then the low-resolution image and the generated image to the discriminator.

    Args:
        g_model: generator taking ``[low_res_image, latent]``.
        d_model: discriminator taking ``[low_res_image, high_res_image]``.
        image_shape: shape of the low-resolution input.
        latent_dim: length of the noise vector.

    Returns:
        A compiled Keras ``Model`` mapping ``[low_res_image, latent]`` to
        ``[discriminator_output, generated_image]``, trained with binary
        cross-entropy (weight 1) plus MAE (weight 100) under Adam(0.0002, beta_1=0.5).
    """
    # Freeze everything in the discriminator except its BatchNormalization layers.
    for d_layer in d_model.layers:
        if isinstance(d_layer, BatchNormalization):
            continue
        d_layer.trainable = False

    low_res_in = Input(shape=image_shape)
    noise_in = Input(shape=(latent_dim,))

    generated = g_model([low_res_in, noise_in])
    verdict = d_model([low_res_in, generated])

    combined = Model([low_res_in, noise_in], [verdict, generated])
    combined.compile(
        loss=['binary_crossentropy', 'mae'],
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
        loss_weights=[1,100],
    )

    return combined

In [None]:
c = GAN(b, a)
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
c.summary()

In [None]:
# Plot the combined GAN graph `c` (built by GAN()), annotating each layer
# with its shapes, dtype, name and activation.
tf.keras.utils.plot_model(c,show_shapes=True,
                          show_dtype=True,
                          show_layer_names=True,
                          show_layer_activations=True)

In [None]:
def show_prediction(bw_images, rgb_images, predicted_rgb, epoch, n, save=False):
  """Draw low-res input, ground truth and generator output side by side.

  The three panels occupy the first row of a 3x3 grid on the current pyplot
  figure. Every sample is drawn on the same figure and, when saving, written
  to the same file name, so with ``n > 1`` only the last sample survives.

  Args:
    bw_images: low-resolution inputs, indexable by sample, in a range imshow accepts.
    rgb_images: ground-truth high-resolution images, same indexing.
    predicted_rgb: generator outputs, same indexing.
    epoch: epoch number used in the figure title and the file name.
    n: number of leading samples to draw.
    save: when truthy, save the figure to the hard-coded Drive folder.

  Side effects:
    Sets the global ``figure.figsize`` rcParam to (9, 9) and leaves the last
    figure open.
  """
  plt.rcParams["figure.figsize"] = (9,9)
  for i in range(n):
    # Start from an empty figure. Without this, repeated calls inside one
    # long-running cell keep adding images to the same axes, so memory use
    # and savefig time grow with every epoch.
    plt.clf()
    plt.suptitle("Epoch_"+str(epoch),size=16)

    plt.subplot(331)
    plt.imshow(bw_images[i], cmap='gray')
    plt.title("LR RGB Image")
    plt.axis('off')

    plt.subplot(332)
    plt.imshow(rgb_images[i])
    plt.title("Original RGB Image")
    plt.axis('off')

    plt.subplot(333)
    plt.imshow(predicted_rgb[i])
    plt.title("SR RGB Image")
    plt.axis('off')

    if save:
      plt.savefig("/content/drive/MyDrive/SRGAN/Model 16 Two/Predicted Outputs/Epoch_"+str(epoch))
    
# plot data
# save_plot(k[:16])

In [None]:
def load_data(path="/content/drive/MyDrive/SRGAN/Dataset/Anime_rgb_resized_16_128_20000.npz"):
    """Load the paired image arrays from an ``.npz`` archive and rescale them.

    Args:
        path: archive holding the two image stacks under the keys ``'a'`` and
            ``'b'``. Defaults to the dataset location on the mounted Drive.

    Returns:
        ``(rgb, bw)``: the arrays stored under ``'a'`` and ``'b'`` as float32,
        reshaped to (N, H, W, 3) and mapped through ``(x - 127.5) / 127.5``.
        This gives [-1, 1] assuming the stored pixels are in [0, 255].

    Raises:
        ValueError: from ``reshape`` if an array does not hold three channels.
    """
    # Use the archive as a context manager so the file handle is released
    # as soon as both arrays have been read into memory.
    with np.load(path) as data:
        rgb = np.asarray(data['a']).astype('float32')
        bw = np.asarray(data['b']).astype('float32')

    # Enforce an explicit (N, H, W, 3) layout.
    rgb = rgb.reshape((rgb.shape[0], rgb.shape[1], rgb.shape[2], 3))
    bw = bw.reshape((bw.shape[0], bw.shape[1], bw.shape[2], 3))

    # scale from [0,255] to [-1,1]
    rgb = (rgb - 127.5) / 127.5
    bw = (bw - 127.5) / 127.5
    return rgb, bw
    
# rgb, bw = load_data()
# print(rgb.shape, bw.shape)

In [None]:
def generate_real_samples(rgb_images, bw_images, n_samples, patch_size):
    """Draw a random batch of real image pairs with all-ones patch labels.

    Indices are sampled with replacement from NumPy's global RNG.

    Args:
        rgb_images: high-resolution images, indexed along axis 0.
        bw_images: low-resolution images aligned with ``rgb_images``.
        n_samples: batch size.
        patch_size: side length of the discriminator's patch output.

    Returns:
        ``([bw_batch, rgb_batch], y)`` where ``y`` is an array of ones with
        shape (n_samples, patch_size, patch_size, 1).
    """
    picks = np.random.randint(0, rgb_images.shape[0], n_samples)
    real_labels = np.ones((n_samples, patch_size, patch_size, 1))
    return [bw_images[picks], rgb_images[picks]], real_labels

# d = generate_real_samples(rgb, bw, 32, 8)
# print(np.array(d[0][0]).shape, np.array(d[0][1]).shape, np.array(d[1]).shape)
# d[1][0]

In [None]:
def generate_fake_samples(generator, bw_images, n_samples, patch_size, latent_dim=512):
    """Run the generator on a batch and pair its output with all-zeros patch labels.

    Args:
        generator: object whose ``predict([bw_images, z])`` returns the generated batch.
        bw_images: low-resolution conditioning batch.
        n_samples: number of noise vectors to draw; it should match the batch
            size of ``bw_images``.
        patch_size: side length of the discriminator's patch output.
        latent_dim: length of each noise vector.

    Returns:
        ``(generated, z_input, y)``: the generator output, the standard-normal
        noise of shape (n_samples, latent_dim) that produced it, and zeros of
        shape (n_samples, patch_size, patch_size, 1).
    """
    noise = np.random.randn(latent_dim * n_samples).reshape(n_samples, latent_dim)
    generated = generator.predict([bw_images, noise])
    fake_labels = np.zeros((n_samples, patch_size, patch_size, 1))
    return generated, noise, fake_labels

# with tf.device(device_name):
# kh = generate_fake_samples(b, 512, 3)
# print("shape of the generated images: ",kh[0][0].shape)

In [None]:
def summarize_the_model(generator, rgb_images, bw_images, epoch, latent_dim = 512, n_samples=1, save=False, sample_index=12310):
    """Render a preview of the generator on one fixed dataset sample.

    Args:
        generator: model whose ``predict([low_res, z])`` returns images in [-1, 1].
        rgb_images: high-resolution dataset scaled to [-1, 1].
        bw_images: low-resolution dataset aligned with ``rgb_images``.
        epoch: epoch number forwarded to ``show_prediction``.
        latent_dim: length of each noise vector.
        n_samples: number of noise vectors to draw for the same image pair.
        save: forwarded to ``show_prediction``.
        sample_index: dataset index of the image pair to preview. It defaults
            to the previously hard-coded 12310, so existing calls are unchanged.

    Raises:
        IndexError: if ``sample_index`` is outside the dataset.
    """
    low_res = bw_images[sample_index]
    low_res = low_res.reshape((1, low_res.shape[0], low_res.shape[1], 3))
    high_res = rgb_images[sample_index]
    high_res = high_res.reshape((1, high_res.shape[0], high_res.shape[1], 3))

    # One noise vector per requested sample. The image pair is repeated so the
    # batch sizes of both generator inputs agree (a no-op for n_samples=1);
    # previously any n_samples > 1 fed mismatched batches to predict().
    low_res = np.repeat(low_res, n_samples, axis=0)
    high_res = np.repeat(high_res, n_samples, axis=0)
    z_input = np.random.randn(latent_dim * n_samples).reshape(n_samples, latent_dim)
    predicted_rgb = generator.predict([low_res, z_input])

    # scale from [-1,1] to [0,1] for display
    low_res = (low_res + 1.0) / 2.0
    high_res = (high_res + 1.0) / 2.0
    predicted_rgb = (predicted_rgb + 1.0) / 2.0
    show_prediction(low_res, high_res, predicted_rgb, epoch, n=n_samples, save=save)

In [None]:
def train(g_model, d_model, gan_model, dataset, n_epochs=3, n_batch=128):
    """Alternate discriminator and generator updates over random mini-batches.

    Each step trains the discriminator on a real (low-res, high-res) batch,
    then on generator output for the same low-res inputs, then updates the
    generator through ``gan_model`` using the same noise vectors. After every
    epoch a preview figure, the three models and the loss history are written
    to the hard-coded Drive folder.

    Args:
        g_model: generator model.
        d_model: compiled discriminator; its ``output_shape[1]`` sets the label patch size.
        gan_model: compiled combined model whose ``train_on_batch`` returns three values.
        dataset: ``(rgb_images, bw_images)`` tuple.
        n_epochs: number of epochs.
        n_batch: batch size; batches per epoch is ``int(N / n_batch)``.

    Returns:
        Three lists with one entry per epoch: discriminator loss on real
        batches, discriminator loss on generated batches, and the first value
        returned by the combined model, each summed over that epoch's batches.
    """
    rgb_images, bw_images = dataset

    print("Data Size: ",rgb_images.shape)
    print("No. Of Epoch: ",n_epochs)
    bat_per_epo = int(rgb_images.shape[0] / n_batch)
    print("Batch Per Epoch: ", bat_per_epo)
    print("Full Batch: ",n_batch)
    print("*"*50,'\n\n')

    # Side length of the discriminator's patch output; the labels must match it.
    patch_size = d_model.output_shape[1]

    d_loss_real_array = []
    d_loss_fake_array = []
    g_loss_array = []

    for epoch_k in range(1, n_epochs + 1):
        real_total = 0.0
        fake_total = 0.0
        gan_total = 0.0

        for _batch in range(bat_per_epo):
            # 1) discriminator step on real pairs
            real_inputs, y_real = generate_real_samples(rgb_images, bw_images, n_batch, patch_size)
            bw_real, rgb_real = real_inputs
            real_total += d_model.train_on_batch([bw_real, rgb_real], y_real)

            # 2) discriminator step on generated images for the same low-res inputs
            pred_rgb, z_input, y_fake = generate_fake_samples(g_model, bw_real, n_batch, patch_size)
            fake_total += d_model.train_on_batch([bw_real, pred_rgb], y_fake)

            # 3) generator step: reuse the same noise, label the output as real
            #    and regress towards the true high-res images
            gan_batch_loss, _, _ = gan_model.train_on_batch([bw_real, z_input], [y_real, rgb_real])
            gan_total += gan_batch_loss

        d_loss_real_array.append(real_total)
        d_loss_fake_array.append(fake_total)
        g_loss_array.append(gan_total)

        print("\n")

        print('epoch -> [%d/%d], discriminator_loss_for_real_data = %.2f, discriminator_loss_for_fake_data = %.2f, generator_loss = %.2f\n' %(epoch_k, n_epochs, real_total, fake_total, gan_total))

        # Per-epoch checkpoint: preview image, all three models, loss history.
        summarize_the_model(g_model, rgb_images, bw_images, epoch_k, n_samples = 1, save=True)
        g_model.save("/content/drive/MyDrive/SRGAN/Model 16 Two/SRGAN_Generator_Model.h5")
        d_model.save("/content/drive/MyDrive/SRGAN/Model 16 Two/SRGAN_Discriminator_Model.h5")
        gan_model.save("/content/drive/MyDrive/SRGAN/Model 16 Two/SRGAN_GAN_Model.h5")
        np.savez_compressed("/content/drive/MyDrive/SRGAN/Model 16 Two/SRGAN_Loss_Record.npz", a=d_loss_real_array, b=d_loss_fake_array, c=g_loss_array)

        print("\n")

    return d_loss_real_array, d_loss_fake_array, g_loss_array

In [None]:
# Build fresh models, load the dataset and run the full training loop on the
# device stored in `device_name` by an earlier cell.
with tf.device(device_name):

    # Training schedule passed to train().
    n_epochs = 1500
    n_batch = 128
    
    # Discriminator() returns an already-compiled model; GAN() then wraps the
    # generator and that discriminator into the combined model.
    d_model = Discriminator()
    g_model = Generator()
    gan_model = GAN(g_model, d_model)
    
    # load_data() returns (rgb, bw), the order train() unpacks.
    dataset = load_data()
    print('\nREADY TO GO !!!\n')

    # Per-epoch loss histories returned by train().
    d_loss_real_array, d_loss_fake_array, g_loss_array = train(g_model, d_model, gan_model, dataset, n_epochs, n_batch)