In [41]:
import numpy as np
import tensorflow as tf


DIV2K_RGB_MEAN = np.array([0.4488, 0.4371, 0.4040]) * 255


def resolve_single(model, lr):
    return resolve(model, tf.expand_dims(lr, axis=0))[0]


def resolve(model, lr_batch):
    lr_batch = tf.cast(lr_batch, tf.float32)
    sr_batch = model(lr_batch)
    sr_batch = tf.clip_by_value(sr_batch, 0, 255)
    sr_batch = tf.round(sr_batch)
    sr_batch = tf.cast(sr_batch, tf.uint8)
    return sr_batch

def normalize(x, rgb_mean=DIV2K_RGB_MEAN):
    return (x - rgb_mean) / 127.5


def denormalize(x, rgb_mean=DIV2K_RGB_MEAN):
    return x * 127.5 + rgb_mean


def normalize_01(x):
    return x / 255.0


def normalize_m11(x):
    return x / 127.5 - 1


def denormalize_m11(x):
    return (x + 1) * 127.5

def pixel_shuffle(scale):
    return lambda x: tf.nn.depth_to_space(x, scale)




In [42]:
from tensorflow.python.keras.layers import Add, BatchNormalization, Conv2D, Dense, Flatten, Input, LeakyReLU, PReLU, Lambda, Activation, Concatenate, Multiply, Dropout
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.applications.vgg19 import VGG19

LR_SIZE = 24
HR_SIZE = 96
upscaling_factor = 4
channels = 3
filters = 64

def SubpixelConv2D(name, scale=2):
        
        def subpixel_shape(input_shape):
            dims = [input_shape[0],
                    None if input_shape[1] is None else input_shape[1] * scale,
                    None if input_shape[2] is None else input_shape[2] * scale,
                    int(input_shape[3] / (scale ** 2))]
            output_shape = tuple(dims)
            return output_shape

        def subpixel(x):
            return tf.nn.depth_to_space(x, scale)

        return Lambda(subpixel, output_shape=subpixel_shape, name=name)

def upsample(x, number):
    x = Conv2D(256, kernel_size=3, strides=1, padding='same', name='upSampleConv2d_' + str(number))(x)
    x = SubpixelConv2D(name = str('upSampleSubPixel_') + str(number) , scale = 2)(x)
    x = PReLU(shared_axes=[1, 2], name='upSamplePReLU_' + str(number))(x)
    return x

def dense_block(input):
            
  x1 = Conv2D(64, kernel_size=3, strides=1, padding='same')(input)
  x1 = LeakyReLU(0.2)(x1)
  x1 = Concatenate()([input, x1])

  x2 = Conv2D(64, kernel_size=3, strides=1, padding='same')(x1)
  x2 = LeakyReLU(0.2)(x2)
  x2 = Concatenate()([input, x1, x2])

  x3 = Conv2D(64, kernel_size=3, strides=1, padding='same')(x2)
  x3 = LeakyReLU(0.2)(x3)
  x3 = Concatenate()([input, x1, x2, x3])

  x4 = Conv2D(64, kernel_size=3, strides=1, padding='same')(x3)
  x4 = LeakyReLU(0.2)(x4)
  x4 = Concatenate()([input, x1, x2, x3, x4])  

  x5 = Conv2D(64, kernel_size=3, strides=1, padding='same')(x4)
  x5 = Lambda(lambda x: x * 0.2)(x5)
  x = Add()([x5, input])
  return x

def RRDB(input):
    x = dense_block(input)
    x = dense_block(x)
    x = dense_block(x)
    x = Lambda(lambda x: x * 0.2)(x)
    out = Add()([x, input])
    return out


def sr_resnet(num_filters=64, num_res_blocks=16):
    lr_input = Input(shape=(24, 24, 3))

    x_start = Conv2D(64, kernel_size=3, strides=1, padding='same')(lr_input)
    x_start = LeakyReLU(0.2)(x_start)

    x = RRDB(x_start)

    x = Conv2D(64, kernel_size=3, strides=1, padding='same')(x)
    x = Lambda(lambda x: x * 0.2)(x)
    x = Add()([x, x_start])

    x = upsample(x, 1)
    if upscaling_factor > 2:
            x = upsample(x, 2)
    if upscaling_factor > 4:
            x = upsample(x, 3)

    x = Conv2D(64, kernel_size=3, strides=1, padding='same')(x)
    x = LeakyReLU(0.2)(x)
    hr_output = Conv2D(channels, kernel_size=3, strides=1, padding='same', activation='tanh')(x)
    
    
    model = Model(inputs=lr_input, outputs=hr_output)
  
    return model


generator = sr_resnet


def conv2d_block(input, filters, strides=1, bn=True):
      
      d = Conv2D(filters, kernel_size=3, strides=strides, padding='same')(input)
      d = LeakyReLU(alpha=0.2)(d)
      if bn:
                d = BatchNormalization(momentum=0.8)(d)
      return d



def discriminator(num_filters=64):
        img = Input(shape=(HR_SIZE,HR_SIZE,3))
        x = conv2d_block(img, filters, bn=False)
        x = conv2d_block(x, filters, strides=2)
        x = conv2d_block(x, filters * 2)
        x = conv2d_block(x, filters * 2, strides=2)
        x = conv2d_block(x, filters * 4)
        x = conv2d_block(x, filters * 4, strides=2)
        x = conv2d_block(x, filters * 8)
        x = conv2d_block(x, filters * 8, strides=2)
        x = Dense(filters * 16)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.4)(x)
        x = Dense(1)(x)

        # Create model and compile
        model = Model(inputs=img, outputs=x)
        return model

In [None]:
import os

tf.keras.backend.clear_session()

resolver = tf.distribute.cluster_resolver.TPUClusterResolver('grpc://' + os.environ['COLAB_TPU_ADDR'])
tf.config.experimental_connect_to_cluster(resolver)

# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))


In [None]:
strategy = tf.distribute.experimental.TPUStrategy(resolver)

with strategy.scope():
  generator().summary()
  discriminator().summary()
  model = tf.keras.models.Sequential()
  model.add(generator())
  model.add(discriminator())
  model.summary()
  discriminator().compile(loss="binary_crossentropy", optimizer="rmsprop")
  discriminator().trainable = False
  model.compile(loss="binary_crossentropy", optimizer="rmsprop")

In [None]:
from tqdm import tqdm
import cv2

batch_size = 32

def train_dcgan(model,epochs=5):
   

    print("done")
    generator, discriminator = model.layers
    discriminator.compile(loss="binary_crossentropy", optimizer="rmsprop")
    discriminator.trainable = False
    model.compile(loss="binary_crossentropy", optimizer="rmsprop")
    path = '/content/drive/MyDrive/train/'
    for epoch in tqdm(range(epochs)):
      print("Epoch {}/{}".format(epoch + 1, epochs))
      for root, dirnames, filenames in os.walk(path):
        i = 0
        j = 0
        x_train_x = np.zeros((32,24,24,3))
        x_train_y = np.zeros((32,96,96,3))
        for filename in filenames:
          img_path = os.path.join(path,filename)
          x_train = cv2.imread(img_path)
          x_trainx = cv2.resize(x_train,(24,24))
          x_trainy = cv2.resize(x_train,(96,96))
          x_train_x[i] = x_trainx 
          x_train_y[i] = x_trainy
          i = i+1
          if i == 32:
            j = j + 1
            print("batch {}/254".format(j))
            X_batch, Y_batch = x_train_x, x_train_y
            X_batch = tf.cast(X_batch, tf.float32)
            Y_batch = tf.cast(Y_batch, tf.float32)
            generated_images = generator(X_batch)
            X = tf.cast(generated_images, tf.float32) 
            X_fake_and_real = tf.concat([X, Y_batch], axis=0)
            y1 = tf.constant([[0.]] * batch_size + [[1.]] * batch_size)
            discriminator.trainable = True
            history_disc = discriminator.train_on_batch(X_fake_and_real, y1)
            y2 = tf.constant([[1.]] * batch_size)
            discriminator.trainable = False
            history_gen = model.train_on_batch(X_batch, y2)
            i = 0
            x_train_x = np.zeros((32,24,24,3))
            x_train_y = np.zeros((32,96,96,3))
    
      if (epoch+1)%10 == 0:

        generator.save_weights("Generator{}.h5".format(epoch))
        discriminator.save_weights("Discriminator_weights{}.h5".format(epoch))
        model.save_weights("Model{}.h5".format(epoch))
        from google.colab.patches import cv2_imshow

        path = "/content/drive/MyDrive/train/07336.jpg"

        X = cv2.imread(path)
        X = cv2.resize(X,(24,24))
        X = np.reshape(X, (1,24,24,3))
        X_batch = tf.cast(X, tf.float32)

        generator.load_weights("Generator{}.h5".format(epoch))
        discriminator.load_weights("Discriminator_weights{}.h5".format(epoch))
        Y = generator(X_batch)
        cv2_imshow(X[0])
        cv2_imshow( Y[0].numpy())



In [None]:
train_dcgan(model,epochs=4000)

In [None]:
import cv2
import numpy as np
import tensorflow as tf
from google.colab.patches import cv2_imshow

path = "/content/drive/MyDrive/train/04336.jpg"

X2 = cv2.imread(path)
X1 = cv2.resize(X2,(24,24), interpolation = cv2.INTER_AREA)
X = np.reshape(X1, (1,24,24,3))
X_batch = tf.cast(X, tf.float32)

generator, discriminator = model.layers
#generator.load_weights("DIV2K_gan.h5")
generator.load_weights("/content/generator_4X_epoch65000.h5")
Y = generator(X_batch/255)
cv2_imshow(X[0])
output = (Y[0].numpy() + 1)*127.5
cv2_imshow( output)

In [None]:
import matplotlib.pyplot as plt
%inline
loss_disc = history_disc.history['loss']
loss_gen = history_gen.history['loss']

epochs_range = range(epochs)

plt.figure(figsize=(8, 8))
plt.subplot(1, 1, 1)

plt.plot(epochs_range, loss_gen, label='Generator Loss')
plt.plot(epochs_range, loss_disc, label='Discriminator Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()