In [2]:
import glob
import os
import time
import cv2

import matplotlib.pyplot as plt
import numpy as np
from tensorflow.keras.layers import *
from tensorflow.keras import Input
from tensorflow.keras.applications import VGG19
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

2023-04-08 16:45:52.103773: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.1


In [3]:
import tensorflow as tf

# List the GPUs visible to TensorFlow; the notebook stops here if there are none.
physical_devices = tf.config.experimental.list_physical_devices(device_type='GPU')
assert physical_devices, "Not enough GPU hardware devices available"

# Enable memory growth on the first GPU only (index 0).
config = tf.config.experimental.set_memory_growth(
    device=physical_devices[0],
    enable=True,
)

2023-04-08 16:45:55.977915: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2023-04-08 16:45:55.978909: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2023-04-08 16:45:55.997516: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-08 16:45:55.997751: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1720] Found device 0 with properties: 
pciBusID: 0000:01:00.0 name: NVIDIA GeForce GTX 1650 Ti computeCapability: 7.5
coreClock: 1.485GHz coreCount: 16 deviceMemorySize: 3.81GiB deviceMemoryBandwidth: 178.84GiB/s
2023-04-08 16:45:55.997782: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.1
2023-04-08 16:45:56.017346: I tensorflow/stream_executor/platform

In [None]:
#DATA PREPROCESSING
# For every file in data/original, write a 128x128 copy to data/hr and a 32x32
# copy (downscaled from the 128x128 version) to data/lr under the same filename.

train_dir = "data"
hr_size = (128, 128)
lr_size = (32, 32)

original_dir = os.path.join(train_dir, "original")
hr_dir = os.path.join(train_dir, "hr")
lr_dir = os.path.join(train_dir, "lr")

# cv2.imwrite reports failure through its return value instead of raising, so
# make sure the output folders exist before writing into them.
os.makedirs(hr_dir, exist_ok=True)
os.makedirs(lr_dir, exist_ok=True)

for img in sorted(os.listdir(original_dir)):
    img_arr = cv2.imread(os.path.join(original_dir, img))
    if img_arr is None:
        # cv2.imread returns None for files it cannot decode (non-images,
        # corrupt files); resizing None would abort the whole run.
        print("Skipping unreadable file:", img)
        continue

    img_arr = cv2.resize(img_arr, hr_size)
    lr_img_arr = cv2.resize(img_arr, lr_size)

    hr_ok = cv2.imwrite(os.path.join(hr_dir, img), img_arr)
    lr_ok = cv2.imwrite(os.path.join(lr_dir, img), lr_img_arr)
    if not (hr_ok and lr_ok):
        print("Failed to write resized copies of:", img)


In [None]:
def residual_block(x):
    """
    Build one residual unit on top of ``x``.

    Layer order: Conv2D -> ReLU -> BatchNormalization -> Conv2D ->
    BatchNormalization, followed by an element-wise Add with the block input.
    Both convolutions use 64 filters, a 3x3 kernel, stride 1 and "same"
    padding; both batch-norm layers use momentum 0.8.

    :param x: input tensor; it is added to the 64-channel branch output, so it
        has to be shape-compatible with it
    :return: output tensor of the Add layer
    """
    conv_kwargs = dict(kernel_size=3, strides=1, padding="same")
    bn_momentum = 0.8

    # First conv stage (with activation)
    branch = Conv2D(filters=64, **conv_kwargs)(x)
    branch = Activation(activation="relu")(branch)
    branch = BatchNormalization(momentum=bn_momentum)(branch)

    # Second conv stage (no activation)
    branch = Conv2D(filters=64, **conv_kwargs)(branch)
    branch = BatchNormalization(momentum=bn_momentum)(branch)

    # Skip connection: branch + input
    return Add()([branch, x])

In [None]:
#Generator Network
def build_generator(gen_input):
    """
    Assemble the generator network on top of ``gen_input``.

    Structure: 9x9 conv (ReLU) -> 16 residual blocks -> 3x3 conv + batch norm
    -> skip-add with the first conv output -> two (UpSampling2D x2, 3x3 conv
    with 256 filters, ReLU) stages -> 9x9 conv to 3 channels -> tanh.
    All convolutions use stride 1 and 'same' padding, so the two upsampling
    stages make the output 4x the input in height and width.

    :param gen_input: Keras input tensor for the low-resolution image
    :return: Keras Model named 'generator' mapping ``gen_input`` to the tanh output
    """
    n_residual_blocks = 16
    bn_momentum = 0.8

    # Pre-residual block
    head = Conv2D(filters=64, kernel_size=9, strides=1, padding='same',
                  activation='relu')(gen_input)

    # Stack of residual blocks, chained one after another
    trunk = head
    for _ in range(n_residual_blocks):
        trunk = residual_block(trunk)

    # Post-residual block
    tail = Conv2D(filters=64, kernel_size=3, strides=1, padding='same')(trunk)
    tail = BatchNormalization(momentum=bn_momentum)(tail)

    # Long skip connection: post-residual output + pre-residual output
    upsampled = Add()([tail, head])

    # Two identical upsampling blocks, each doubling height and width
    for _ in range(2):
        upsampled = UpSampling2D(size=2)(upsampled)
        upsampled = Conv2D(filters=256, kernel_size=3, strides=1,
                           padding='same')(upsampled)
        upsampled = Activation('relu')(upsampled)

    # Output convolution: back to 3 channels, squashed by tanh
    rgb = Conv2D(filters=3, kernel_size=9, strides=1, padding='same')(upsampled)
    rgb = Activation('tanh')(rgb)

    return Model(inputs=[gen_input], outputs=[rgb], name='generator')

In [None]:
def _discriminator_conv_block(x, filters, strides, alpha, momentum=None):
    """Apply Conv2D(3x3, 'same') -> LeakyReLU -> optional BatchNormalization to ``x``."""
    x = Conv2D(filters=filters, kernel_size=3, strides=strides, padding='same')(x)
    x = LeakyReLU(alpha=alpha)(x)
    if momentum is not None:
        x = BatchNormalization(momentum=momentum)(x)
    return x


def build_discriminator(dis_input):
    """
    Create the discriminator network on top of ``dis_input``.

    Eight 3x3 convolution blocks (64, 64, 128, 128, 256, 256, 512, 512 filters,
    alternating stride 1 and 2, LeakyReLU(0.2), batch norm on all but the
    first), then Flatten -> Dense(1024) -> LeakyReLU -> Dense(1, sigmoid).

    The Flatten layer is required for a per-image prediction: Dense only acts
    on the last axis, so without it the model would emit one probability per
    spatial position instead of the (batch, 1) output the training labels use.
    Flatten needs fully defined spatial dimensions on ``dis_input``.

    :param dis_input: Keras input tensor for the high-resolution image
    :return: Keras Model named 'discriminator' with output shape (batch, 1)
    """
    leakyrelu_alpha = 0.2
    momentum = 0.8

    input_layer = dis_input

    # First convolution block: no batch normalization
    features = _discriminator_conv_block(input_layer, filters=64, strides=1,
                                         alpha=leakyrelu_alpha)

    # Blocks two to eight: (filters, strides), each followed by batch norm
    remaining_blocks = [
        (64, 2),
        (128, 1), (128, 2),
        (256, 1), (256, 2),
        (512, 1), (512, 2),
    ]
    for filters, strides in remaining_blocks:
        features = _discriminator_conv_block(features, filters=filters,
                                             strides=strides,
                                             alpha=leakyrelu_alpha,
                                             momentum=momentum)

    # Collapse the feature map to one vector per image before the dense head
    flat = Flatten()(features)

    # Dense layer
    dense = Dense(units=1024)(flat)
    dense = LeakyReLU(alpha=leakyrelu_alpha)(dense)

    # Last dense layer - for classification
    output = Dense(units=1, activation='sigmoid')(dense)

    model = Model(inputs=[input_layer], outputs=[output], name='discriminator')
    return model

In [None]:
def build_adversarial_model(generator, discriminator, vgg, low_resolution_shape=None):
    """
    Chain generator -> (VGG features, discriminator) into one trainable model.

    A fresh low-resolution Input is created, passed through ``generator``, and
    the generated image is fed to both ``vgg`` and ``discriminator``.
    Side effect: ``discriminator.trainable`` is set to False.
    The name and trainable flag of every layer of the combined model are printed.

    :param generator: generator model
    :param discriminator: discriminator model (frozen by this call)
    :param vgg: feature-extractor model applied to the generated image
    :param low_resolution_shape: per-image input shape (height, width, channels).
        Defaults to the generator's own input shape, so the combined model
        always matches the generator it wraps instead of a fixed size.
    :return: Keras Model with one input and outputs
        [discriminator output, VGG features of the generated image]
    """
    if low_resolution_shape is None:
        # Drop the batch axis of the generator's first (only) input.
        low_resolution_shape = tuple(generator.inputs[0].shape.as_list()[1:])

    input_low_resolution = Input(shape=low_resolution_shape)

    fake_hr_images = generator(input_low_resolution)
    fake_features = vgg(fake_hr_images)

    # Only the generator should be updated through this combined model.
    discriminator.trainable = False

    output = discriminator(fake_hr_images)

    model = Model(inputs=[input_low_resolution],
                  outputs=[output, fake_features])

    for layer in model.layers:
        print(layer.name, layer.trainable)

    return model

In [None]:
# def sample_images(data_dir, batch_size, high_resolution_shape, low_resolution_shape):
#     # Make a list of all images inside the data directory
#     all_images = glob.glob(data_dir)

#     # Choose a random batch of images
#     images_batch = np.random.choice(all_images, size=batch_size)

#     low_resolution_images = []
#     high_resolution_images = []

#     for img in images_batch:
#         # Get an ndarray of the current image
#         img1 = load_img(img, mode='RGB')
#         img1 = img1.astype(np.float32)

#         # Resize the image
#         img1_high_resolution = smart_resize(img1, high_resolution_shape)
#         img1_low_resolution = smart_resize(img1, low_resolution_shape)

#         # Do a random flip
#         if np.random.random() < 0.5:
#             img1_high_resolution = np.fliplr(img1_high_resolution)
#             img1_low_resolution = np.fliplr(img1_low_resolution)

#         high_resolution_images.append(img1_high_resolution)
#         low_resolution_images.append(img1_low_resolution)

#     return np.array(high_resolution_images), np.array(low_resolution_images)

In [None]:
def build_vgg(high_resolution_shape):
    """
    Build a truncated VGG19 feature extractor.

    Loads VGG19 with ImageNet weights and without the classification head,
    then returns a model that stops at the layer with index 10 of that network.

    :param high_resolution_shape: input shape (height, width, channels) for VGG19
    :return: Keras Model from the VGG19 inputs to the output of layer index 10
    """
    backbone = VGG19(include_top=False, weights='imagenet',
                     input_shape=high_resolution_shape)
    # NOTE(review): index 10 is presumably 'block3_conv4' in the stock Keras
    # VGG19 layout -- confirm against vgg.summary().
    feature_layer = backbone.layers[10]
    return Model(inputs=backbone.inputs, outputs=feature_layer.output)

def create_combination(generator,discriminator,vgg,input_low_resolution,input_high_resolution):
    """
    Build the combined model used to train the generator.

    The low-resolution input goes through ``generator``; the generated image is
    then passed to ``vgg`` and to ``discriminator``.
    Side effect: ``discriminator.trainable`` is set to False.

    ``input_high_resolution`` is registered as a model input, but none of the
    outputs built here is computed from it.

    :param generator: generator model
    :param discriminator: discriminator model (frozen by this call)
    :param vgg: feature-extractor model
    :param input_low_resolution: Keras input tensor for low-resolution images
    :param input_high_resolution: Keras input tensor for high-resolution images
    :return: Keras Model with inputs [low-res, high-res] and outputs
        [discriminator output, VGG features of the generated image]
    """
    super_resolved = generator(input_low_resolution)

    # Feature branch first, then the (frozen) discriminator branch.
    perceptual_features = vgg(super_resolved)

    discriminator.trainable = False
    validity = discriminator(super_resolved)

    return Model(
        inputs=[input_low_resolution, input_high_resolution],
        outputs=[validity, perceptual_features],
    )

In [None]:
#Loading images
# Read up to n LR/HR pairs as RGB arrays. Pairs are matched by filename: the
# same sorted name list drives both reads, so lr_images[i] and hr_images[i]
# always come from the same source picture. (os.listdir returns names in
# arbitrary order, so two independent listings are not guaranteed to line up.)
n = 5000

paired_names = sorted(set(os.listdir("data/lr")) & set(os.listdir("data/hr")))[:n]
lr_list = paired_names
hr_list = paired_names

lr_images = []
hr_images = []
for img in paired_names:
    img_lr = cv2.imread("data/lr/" + img)
    img_hr = cv2.imread("data/hr/" + img)
    if img_lr is None or img_hr is None:
        # cv2.imread returns None for files it cannot decode; drop the whole
        # pair so both arrays stay aligned.
        print("Skipping unreadable pair:", img)
        continue
    # OpenCV loads images as BGR; convert to RGB for matplotlib and the model.
    lr_images.append(cv2.cvtColor(img_lr, cv2.COLOR_BGR2RGB))
    hr_images.append(cv2.cvtColor(img_hr, cv2.COLOR_BGR2RGB))

lr_images = np.array(lr_images)
hr_images = np.array(hr_images)
assert len(lr_images) == len(hr_images), "LR/HR image counts differ"

In [None]:
#Viewing some images
# Show one randomly chosen LR/HR pair side by side. The images are plotted as
# stored (no reshape), so this cell keeps working if the preprocessing sizes change.
import random
image_number = random.randint(0,len(lr_images)-1)

fig, (ax_lr, ax_hr) = plt.subplots(1, 2, figsize=(12, 6))
ax_lr.imshow(lr_images[image_number])
ax_lr.set_title("LR image #%d %s" % (image_number, lr_images[image_number].shape))
ax_hr.imshow(hr_images[image_number])
ax_hr.set_title("HR image #%d %s" % (image_number, hr_images[image_number].shape))
plt.show()

In [None]:
#Scale values
# Map 8-bit pixel values [0, 255] to floats in [0, 1].
# The dtype guard makes the cell idempotent: re-running it does not divide
# already-scaled data by 255 a second time. float32 is used instead of the
# float64 that a plain `/255` would produce, halving the memory footprint.

if lr_images.dtype == np.uint8:
    lr_images = lr_images.astype(np.float32) / 255
if hr_images.dtype == np.uint8:
    hr_images = hr_images.astype(np.float32) / 255

In [None]:
#Train test Split
from sklearn.model_selection import train_test_split

# 70% train / 30% test with a fixed seed; both arrays are split in one call.
lr_train, lr_test, hr_train, hr_test = train_test_split(
    lr_images,
    hr_images,
    test_size=0.3,
    random_state=100,
)

# Per-image shapes (axes 1..3 of the stacked arrays, i.e. without the sample axis)
low_resolution_shape = tuple(lr_train.shape[1:4])
high_resolution_shape = tuple(hr_train.shape[1:4])

In [None]:

# Symbolic Keras inputs: one sized like a low-resolution image,
# one sized like a high-resolution image
input_low_resolution = Input(low_resolution_shape)
input_high_resolution = Input(high_resolution_shape)

In [None]:
# Build the generator on the low-resolution input and print its layer table
generator = build_generator(gen_input=input_low_resolution)
generator.summary()

In [None]:
# Common optimizer for all networks: Adam with learning rate 0.0002 and beta_1 0.5
common_optimizer = Adam(learning_rate=0.0002, beta_1=0.5)

# Build the discriminator on the high-resolution input and compile it on its own
discriminator = build_discriminator(dis_input=input_high_resolution)
discriminator.compile(
    optimizer=common_optimizer,
    loss='mse',
    metrics=['accuracy'],
)
discriminator.summary()

In [None]:
# VGG feature extractor for the perceptual loss, sized to the high-resolution
# images actually loaded (rather than a hard-coded shape).
vgg = build_vgg(high_resolution_shape)
# Freeze it before printing so the summary reports its weights as non-trainable.
vgg.trainable = False
# summary() prints the table itself and returns None; wrapping it in print()
# would add a stray "None" line.
vgg.summary()

In [None]:
# Combined model for the generator update.
# Output 0 (discriminator output) is scored with binary cross-entropy, weight 1e-3;
# output 1 (VGG features) is scored with MSE, weight 1.
gan_model = create_combination(
    generator,
    discriminator,
    vgg,
    input_low_resolution,
    input_high_resolution,
)
gan_model.compile(
    optimizer=common_optimizer,
    loss=["binary_crossentropy", "mse"],
    loss_weights=[1e-3, 1],
)
gan_model.summary()

In [None]:
# Second combined model, assembled by build_adversarial_model and compiled with
# binary cross-entropy (weight 1e-3) on output 0 and MSE (weight 1) on output 1.
adv_model = build_adversarial_model(generator, discriminator, vgg)
adv_model.compile(
    optimizer=common_optimizer,
    loss=["binary_crossentropy", "mse"],
    loss_weights=[1e-3, 1],
)
adv_model.summary()

In [None]:
from tqdm import tqdm
epochs = 15
batch_size = 1

# Pre-slice the training arrays into full batches; a trailing partial batch
# (when the sample count is not a multiple of batch_size) is dropped.
n_batches = hr_train.shape[0] // batch_size
train_hr_batches = [hr_train[i*batch_size:(i+1)*batch_size] for i in range(n_batches)]
train_lr_batches = [lr_train[i*batch_size:(i+1)*batch_size] for i in range(n_batches)]

# Labels are shaped like the discriminator output (batch axis replaced by
# batch_size), so they match whatever the discriminator emits instead of
# relying on implicit broadcasting. They never change, so build them once.
label_shape = (batch_size,) + tuple(discriminator.outputs[0].shape.as_list()[1:])
fake_label = np.zeros(label_shape) #Assign a label of 0 to all fake generated images
real_label = np.ones(label_shape) #Assign a label of 1 to all real images

for e in range(epochs):
    #Per-epoch lists of generator and discriminator losses
    gen_losses = []
    dis_losses = []

    for b in tqdm(range(n_batches)):
        lr_imgs = train_lr_batches[b]
        hr_imgs = train_hr_batches[b]

        #Generate fake high-resolution images for the discriminator update
        fake_imgs = generator.predict_on_batch(lr_imgs)

        #Discriminator step: one update on fakes, one on real images
        discriminator.trainable = True
        d_loss_gen = discriminator.train_on_batch(fake_imgs,fake_label)
        d_loss_real = discriminator.train_on_batch(hr_imgs,real_label)
        discriminator.trainable = False

        #Average of the two discriminator results (loss and accuracy)
        d_loss = 0.5 * np.add(d_loss_gen, d_loss_real)

        #Target features for the perceptual loss. predict_on_batch is used
        #because this is a single small batch inside a tight loop; it avoids
        #the per-call overhead of Model.predict.
        image_features = vgg.predict_on_batch(hr_imgs)

        #Generator step through the combined model: try to make the
        #discriminator say "real" and match the VGG features of the real image
        g_loss, _, _ = gan_model.train_on_batch([lr_imgs, hr_imgs], [real_label, image_features])

        #Save losses to a list so we can average and report.
        dis_losses.append(d_loss)
        gen_losses.append(g_loss)

    #Calculate the average losses for generator and discriminator
    g_loss = np.mean(gen_losses, axis=0)
    d_loss = np.mean(dis_losses, axis=0)

    #Report the progress during training.
    print("epoch:", e+1 , "g_loss: ", g_loss, "d_loss:", d_loss)

    #Save the generator every 10 epochs, and always after the last epoch so the
    #final weights are kept when `epochs` is not a multiple of 10.
    if (e+1) % 10 == 0 or (e+1) == epochs:
        generator.save("gen_e_"+ str(e+1) +".h5")


In [None]:
###################################################################################
#Test - perform super resolution using saved generator model
from tensorflow.keras.models import load_model
from numpy.random import randint


def plot_sr_comparison(lr_image, sr_image, hr_image):
    """
    Plot a low-resolution image, its super-resolved version and the original
    high-resolution image side by side (top row of a 2x3 grid).

    Images are clipped to [0, 1] for display only: the generator ends in tanh,
    so its output can contain values outside the range imshow accepts for floats.

    :param lr_image: low-resolution image, shape (H, W, 3)
    :param sr_image: generator output for ``lr_image``, shape (H', W', 3)
    :param hr_image: reference high-resolution image, shape (H'', W'', 3)
    """
    panels = [
        ('LR Image', lr_image),
        ('Superresolution', sr_image),
        ('Orig. HR image', hr_image),
    ]
    plt.figure(figsize=(16, 8))
    for position, (title, image) in enumerate(panels, start=1):
        plt.subplot(2, 3, position)
        plt.title(title)
        plt.imshow(np.clip(image, 0, 1))
    plt.show()


# Replaces the in-memory generator with the checkpoint saved after epoch 10.
generator = load_model('gen_e_10.h5', compile=False)


[X1, X2] = [lr_test, hr_test]
# select random example (ix is a length-1 index array, so the batch axis is kept)
ix = randint(0, len(X1), 1)
src_image, tar_image = X1[ix], X2[ix]

# generate image from source
gen_image = generator.predict(src_image)

# plot all three images
plot_sr_comparison(src_image[0], gen_image[0], tar_image[0])


################################################
sreeni_lr = cv2.imread("data/sreeni_32.jpg")
sreeni_hr = cv2.imread("data/sreeni_256.jpg")
if sreeni_lr is None or sreeni_hr is None:
    # cv2.imread returns None instead of raising when a file is missing or
    # unreadable; fail here with a clear message rather than inside cvtColor.
    raise FileNotFoundError(
        "Could not read data/sreeni_32.jpg and/or data/sreeni_256.jpg")

#Change images from BGR to RGB for plotting.
#Remember that we used cv2 to load images which loads as BGR.
sreeni_lr = cv2.cvtColor(sreeni_lr, cv2.COLOR_BGR2RGB)
sreeni_hr = cv2.cvtColor(sreeni_hr, cv2.COLOR_BGR2RGB)

#Same [0, 1] scaling as the training data
sreeni_lr = sreeni_lr / 255.
sreeni_hr = sreeni_hr / 255.

#Add the batch axis expected by the model
sreeni_lr = np.expand_dims(sreeni_lr, axis=0)
sreeni_hr = np.expand_dims(sreeni_hr, axis=0)

generated_sreeni_hr = generator.predict(sreeni_lr)

# plot all three images
plot_sr_comparison(sreeni_lr[0], generated_sreeni_hr[0], sreeni_hr[0])