In [1]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from keras.layers import (
    BatchNormalization,
    Conv2D,
    Dense,
    Flatten,
    Input,
    LeakyReLU,
    PReLU,
    UpSampling2D,
    add,
)
from keras.models import Model


# Load the `tf_flowers` dataset through TensorFlow Datasets.
data = tfds.load('tf_flowers')
# Last expression of the cell: list the physical devices TensorFlow can see.
tf.config.list_physical_devices()


2023-10-15 18:25:30.027977: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-10-15 18:25:30.028030: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-10-15 18:25:30.028086: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-10-15 18:25:30.039094: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  from .autonotebook import tqdm as notebook_tqdm


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]

## SRCNN and FSRCNN

In [None]:
# Hold out the first 600 examples of the 'train' split for evaluation and
# train on everything that comes after them.
train_data = data['train'].skip(600)
test_data = data['train'].take(600)

@tf.function
def build_data(data):
  """Turn one dataset example into a (low-res, high-res) training pair.

  Args:
    data: mapping with an 'image' entry; the image is divided by 255 before use.

  Returns:
    Tuple (lr, hr). `hr` is a random 128x128x3 crop cast to float32; `lr` is that
    crop resized to 64x64 and then back to 128x128 with bicubic interpolation.
  """
  scaled = data['image'] / 255
  hr_patch = tf.dtypes.cast(tf.image.random_crop(scaled, (128, 128, 3)), tf.float32)
  # Shrink first, then enlarge again, so the input has the target size but less detail.
  shrunk = tf.image.resize(hr_patch, (64, 64))
  lr_patch = tf.image.resize(shrunk, (128, 128), method = tf.image.ResizeMethod.BICUBIC)
  return (lr_patch, hr_patch)

def downsample_image(image,scale):
  """Degrade `image` by a factor of `scale` while keeping its original size.

  The image is divided by 255, resized to (height // scale, width // scale), and
  then resized back to (height, width) with bicubic interpolation.

  Args:
    image: array-like whose first two `shape` entries are height and width.
    scale: integer divisor applied to both spatial dimensions.

  Returns:
    The blurred image as returned by `tf.image.resize`.
  """
  height, width = image.shape[0], image.shape[1]
  shrunk = tf.image.resize(image / 255, (height//scale, width//scale))
  return tf.image.resize(shrunk, (height, width), method = tf.image.ResizeMethod.BICUBIC)

def upsample_image(image,scale):
  """Bicubically enlarge `image` by `scale`, then resize the result to 128x128.

  Args:
    image: array-like whose first two `shape` entries are height and width;
      it is divided by 255 before resizing.
    scale: multiplier applied to both spatial dimensions in the first resize.

  Returns:
    The 128x128 result of the second bicubic resize (the final size is fixed,
    independent of `scale`).
  """
  bicubic = tf.image.ResizeMethod.BICUBIC
  enlarged_size = (image.shape[0] * scale, image.shape[1] * scale)
  enlarged = tf.image.resize(image / 255, enlarged_size, method=bicubic)
  return tf.image.resize(enlarged, (128, 128), method=bicubic)

In [None]:
# Unbatched (lr, hr) pairs, used for quick visual inspection.
train_dataset_mapped = train_data.map(build_data, num_parallel_calls = tf.data.AUTOTUNE)
def show_image():
    """Plot the first element of `train_dataset_mapped`: low-res input, then target.

    Reads the module-level `train_dataset_mapped` at call time and expects each
    element to be an unbatched (lr, hr) pair.
    """
    for pair in train_dataset_mapped.take(1):
        for picture in (pair[0], pair[1]):
            plt.imshow(picture.numpy())
            plt.show()

In [None]:
# Three 'same'-padded, ReLU-activated convolutions with 9x9, 1x1 and 5x5 kernels
# (presumably what the "915" suffix refers to); the last one maps back to 3 channels.
# NOTE(review): the output layer is ReLU-activated too, so predictions are never
# negative — confirm this is intended.
SRCNN_915 = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(n_filters, kernel, padding='same', activation='relu')
    for n_filters, kernel in ((64, 9), (64, 1), (3, 5))
])
def pixel_mse_loss(y_true,y_pred):
  """Mean squared error over every element of `y_true - y_pred` (a scalar)."""
  squared_error = (y_true - y_pred) ** 2
  return tf.reduce_mean(squared_error)

def PSNR(y_true,y_pred):
  """Peak signal-to-noise ratio in dB, with the peak signal value taken as 1.

  Computed as 20 * log10(1 / sqrt(MSE)), where the MSE is averaged over every
  element of `y_true - y_pred`.
  """
  mse = tf.reduce_mean( (y_true - y_pred) ** 2 )
  rmse = mse ** 0.5
  return 20 * log10(1 / rmse)

def log10(x):
  """Base-10 logarithm of tensor `x`, computed as ln(x) / ln(10).

  Uses `tf.math.log`; the top-level `tf.log` alias does not exist in
  TensorFlow 2.x and raises AttributeError there.

  Args:
    x: floating-point tensor (or value convertible to one).

  Returns:
    Tensor with the same dtype as `tf.math.log(x)`.
  """
  numerator = tf.math.log(x)
  # Build the constant in the same dtype so the division does not mix dtypes.
  denominator = tf.math.log(tf.constant(10, dtype=numerator.dtype))
  return numerator / denominator

# Optimise SRCNN_915 with Adam(0.001), using pixel_mse_loss as the training loss.
SRCNN_915.compile(optimizer = tf.keras.optimizers.Adam(0.001), loss = pixel_mse_loss)

In [None]:
total_iterations = 20

# Build the input pipelines once: they do not change between iterations. The mapped
# function is re-executed every time the dataset is iterated, so build_data's random
# crop is still drawn afresh on each pass.
# NOTE: this rebinds train_dataset_mapped to a *batched* dataset.
train_dataset_mapped = train_data.map(build_data, num_parallel_calls = tf.data.AUTOTUNE).batch(128)
val_dataset_mapped = test_data.map(build_data, num_parallel_calls = tf.data.AUTOTUNE).batch(128)

# One fit() call per iteration so that each pass is announced separately.
for iteration in range(total_iterations):
    print(f"Iteration: {iteration + 1} / {total_iterations}")
    SRCNN_915.fit(train_dataset_mapped, epochs = 1,validation_data = val_dataset_mapped)

In [None]:
# Rebind train_dataset_mapped to the unbatched (lr, hr) pipeline.
train_dataset_mapped = train_data.map(build_data,num_parallel_calls=tf.data.AUTOTUNE)

# For two training images show: original | 4x-degraded input | SRCNN_915 output.
for sample in train_data.take(2):
  fig = plt.figure(figsize=(12,4))

  original = sample['image'].numpy()
  lr = downsample_image(original,4)
  # The model expects a batch dimension, hence the one-element list.
  pred = SRCNN_915(np.array([lr]))

  panels = (original, lr.numpy(), pred[0].numpy())
  for position, panel in enumerate(panels, start=1):
    plt.subplot(1,3,position)
    plt.imshow(panel)
    plt.axis('off')
  plt.show()

In [None]:
# Same three ReLU convolutions as the 9-1-5 stack (9x9, 1x1, 5x5 kernels), followed
# by a 3x3 transposed convolution with 3 output channels.
# NOTE(review): no `strides` are passed to Conv2DTranspose, so it presumably does not
# change the spatial size — confirm against the intended FSRCNN design.
FSRCNN = tf.keras.models.Sequential(
    [
        tf.keras.layers.Conv2D(n_filters, kernel, padding = 'same', activation = 'relu')
        for n_filters, kernel in ((64, 9), (64, 1), (3, 5))
    ]
    + [tf.keras.layers.Conv2DTranspose(3, 3, padding = 'same', activation = 'relu')]
)

# Optimise FSRCNN with Adam(0.001), using pixel_mse_loss as the training loss.
FSRCNN.compile(optimizer=tf.keras.optimizers.Adam(0.001),loss=pixel_mse_loss)

In [None]:
total_iterations = 20

# Re-derive the split here so this cell does not depend on earlier rebinding:
# the first 600 examples are held out, the rest are used for training.
train_data = data['train'].skip(600)
test_data = data['train'].take(600)

# Build the input pipelines once: they do not change between iterations. The mapped
# function is re-executed every time the dataset is iterated, so build_data's random
# crop is still drawn afresh on each pass.
# NOTE: this rebinds train_dataset_mapped to a *batched* dataset.
train_dataset_mapped = train_data.map(build_data, num_parallel_calls = tf.data.AUTOTUNE).batch(128)
val_dataset_mapped = test_data.map(build_data, num_parallel_calls = tf.data.AUTOTUNE).batch(128)

# One fit() call per iteration so that each pass is announced separately.
for iteration in range(total_iterations):
    print(f"Iteration: {iteration + 1} / {total_iterations}")
    FSRCNN.fit(train_dataset_mapped, epochs = 1,validation_data = val_dataset_mapped)

In [None]:
# Rebind train_dataset_mapped to the unbatched (lr, hr) pipeline.
train_dataset_mapped = train_data.map(build_data,num_parallel_calls=tf.data.AUTOTUNE)

# For two training images show: original | 4x-degraded input | FSRCNN output.
for sample in train_data.take(2):
  fig = plt.figure(figsize=(12,4))

  original = sample['image'].numpy()
  lr = downsample_image(original,4)
  # The model expects a batch dimension, hence the one-element list.
  pred = FSRCNN(np.array([lr]))

  panels = (original, lr.numpy(), pred[0].numpy())
  for position, panel in enumerate(panels, start=1):
    plt.subplot(1,3,position)
    plt.imshow(panel)
    plt.axis('off')
  plt.show()

## SRGAN

# Extracting images from the 'tf_flowers' dataset; 600 images are extracted in total


In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os
from PIL import Image
import random

# Load tf_flowers together with its metadata; `info` provides the
# label-id -> class-name mapping used in the file names below.
data, info = tfds.load("tf_flowers", with_info=True)

target_dir = "/content/gdrive/MyDrive/DL_Project/fl_dt"
os.makedirs(target_dir, exist_ok=True)

# Materialise the whole 'train' split as a list so it can be shuffled in place.
dataset_list = list(data["train"].as_numpy_iterator())
random.shuffle(dataset_list)

num_images_to_save = 600
images_extracted = 0

# Slicing caps the export at num_images_to_save (including a limit of 0) and simply
# yields fewer items when the dataset is smaller than the limit.
for example in dataset_list[:num_images_to_save]:
    image = example["image"]
    label = example["label"]
    class_name = info.features["label"].int2str(label)

    # NOTE(review): hash() may be negative (giving a '-' in the name) and, for bytes,
    # is salted per interpreter unless PYTHONHASHSEED is fixed, so names can differ
    # between runs. The scheme is kept because a later cell opens a file by a name
    # of this form.
    image_filename = f"{class_name}_{label}_{hash(image.tobytes()):016x}.jpg"
    target_path = os.path.join(target_dir, image_filename)

    image_pil = Image.fromarray(image)
    image_pil.save(target_path)

    images_extracted += 1

# Report the number actually written rather than the number requested.
print(f"Saved {images_extracted} random images from the TF Flowers dataset to {target_dir}")


In [None]:
train_dir = "/content/gdrive/MyDrive/DL_Project/fl_dt"

# The HR (128x128) and LR (32x32) copies are written to sub-folders of train_dir.
output_hr_dir = os.path.join(train_dir, "hr_images_1")
output_lr_dir = os.path.join(train_dir, "lr_images_1")
os.makedirs(output_hr_dir, exist_ok=True)
os.makedirs(output_lr_dir, exist_ok=True)

for img_filename in os.listdir(train_dir):
    img_path = os.path.join(train_dir, img_filename)

    # train_dir also contains the output folders created above; skip them (and any
    # other non-file entry) instead of handing a directory to cv2.imread.
    if not os.path.isfile(img_path):
        continue

    img_array = cv2.imread(img_path)

    # cv2.imread returns None when the file cannot be decoded as an image.
    if img_array is None or img_array.shape[0] == 0 or img_array.shape[1] == 0:
        print(f"Error processing image: {img_filename}")
        continue

    print(f"Processing: {img_filename} - Original Shape: {img_array.shape}")
    img_array_hr = cv2.resize(img_array, (128, 128))
    img_array_lr = cv2.resize(img_array, (32, 32))
    cv2.imwrite(os.path.join(output_hr_dir, img_filename), img_array_hr)
    cv2.imwrite(os.path.join(output_lr_dir, img_filename), img_array_lr)
    print(f"Processed: {img_filename} - HR Shape: {img_array_hr.shape}, LR Shape: {img_array_lr.shape}")


In [None]:

def res_block(ip):
    """Residual block: Conv-BN-PReLU-Conv-BN branch added back onto its input.

    Args:
        ip: input tensor; it is added to the branch output, so the two must have
            compatible shapes (the branch produces 64 channels).

    Returns:
        The tensor `add([ip, branch])`.
    """
    branch_layers = (
        Conv2D(64, (3,3), padding = "same"),
        BatchNormalization(momentum = 0.5),
        PReLU(shared_axes = [1,2]),
        Conv2D(64, (3,3), padding = "same"),
        BatchNormalization(momentum = 0.5),
    )

    branch = ip
    for layer in branch_layers:
        branch = layer(branch)

    # Skip connection around the whole branch.
    return add([ip, branch])

def upscale_block(ip):
    """Upscaling stage: 3x3 convolution (256 filters), UpSampling2D(size=2), PReLU.

    Args:
        ip: input tensor.

    Returns:
        The output tensor of the final PReLU.
    """
    widened = Conv2D(256, (3,3), padding="same")(ip)
    enlarged = UpSampling2D( size = 2 )(widened)
    return PReLU(shared_axes=[1,2])(enlarged)


def create_gen(gen_ip, num_res_block):
    """Build the generator model on top of the input tensor `gen_ip`.

    Structure: 9x9 conv + PReLU, `num_res_block` residual blocks, 3x3 conv + BN with
    a skip connection back to the first PReLU output, two upscale blocks, and a final
    9x9 convolution producing 3 channels.

    Args:
        gen_ip: Keras input tensor for the low-resolution image.
        num_res_block: number of `res_block` stages to stack.

    Returns:
        A `Model` mapping `gen_ip` to the 3-channel output tensor.
    """
    stem = Conv2D(64, (9,9), padding="same")(gen_ip)
    stem = PReLU(shared_axes=[1,2])(stem)

    # Residual trunk.
    trunk = stem
    for _ in range(num_res_block):
        trunk = res_block(trunk)

    trunk = Conv2D(64, (3,3), padding="same")(trunk)
    trunk = BatchNormalization(momentum=0.5)(trunk)
    # Long skip connection from the stem around the whole trunk.
    merged = add([trunk, stem])

    # Two upscale blocks in a row.
    upscaled = merged
    for _ in range(2):
        upscaled = upscale_block(upscaled)

    op = Conv2D(3, (9,9), padding="same")(upscaled)

    return Model(inputs=gen_ip, outputs=op)


def discriminator_block(ip, filters, strides=1, bn=True):
    """One discriminator stage: 3x3 conv, optional batch norm, LeakyReLU(0.2).

    Args:
        ip: input tensor.
        filters: number of convolution filters.
        strides: convolution stride.
        bn: when true, insert BatchNormalization(momentum=0.8) after the convolution.

    Returns:
        The output tensor of the LeakyReLU.
    """
    stage_layers = [Conv2D(filters, (3,3), strides = strides, padding="same")]
    if bn:
        stage_layers.append(BatchNormalization( momentum=0.8 ))
    stage_layers.append(LeakyReLU( alpha=0.2 ))

    out = ip
    for layer in stage_layers:
        out = layer(out)
    return out



def create_disc(disc_ip):
    """Build the discriminator model on top of the input tensor `disc_ip`.

    Eight `discriminator_block` stages (filters 64 -> 512, every second stage with
    stride 2, the first one without batch norm), then Flatten, Dense(1024),
    LeakyReLU(0.2) and a single sigmoid output unit.

    Args:
        disc_ip: Keras input tensor for the image to classify.

    Returns:
        A `Model` mapping `disc_ip` to the sigmoid `validity` score.
    """
    df = 64

    # First stage is the only one without batch normalisation.
    features = discriminator_block(disc_ip, df, bn=False)

    # (filters, strides) for the remaining seven stages.
    remaining_stages = (
        (df, 2),
        (df*2, 1), (df*2, 2),
        (df*4, 1), (df*4, 2),
        (df*8, 1), (df*8, 2),
    )
    for n_filters, stride in remaining_stages:
        features = discriminator_block(features, n_filters, strides=stride)

    flat = Flatten()(features)
    hidden = Dense(df*16)(flat)
    hidden = LeakyReLU(alpha=0.2)(hidden)
    validity = Dense(1, activation='sigmoid')(hidden)

    return Model(disc_ip, validity)


In [None]:
from keras.applications import VGG19

def build_vgg(hr_shape):
    """Feature extractor: ImageNet VGG19 (no top) truncated at `layers[10]`.

    Args:
        hr_shape: input shape passed to VGG19 as `input_shape`.

    Returns:
        A `Model` from the VGG19 inputs to the output of its layer at index 10.
    """
    backbone = VGG19(weights="imagenet",include_top=False, input_shape=hr_shape)
    feature_layer = backbone.layers[10]

    return Model(inputs=backbone.inputs, outputs=feature_layer.output)


def create_comb(gen_model, disc_model, vgg, lr_ip, hr_ip):
    """Chain generator, discriminator and VGG features into one combined model.

    Args:
        gen_model: generator applied to `lr_ip`.
        disc_model: discriminator; its `trainable` flag is set to False here
            (a side effect on the object that is passed in).
        vgg: feature extractor applied to the generated image.
        lr_ip: low-resolution input tensor.
        hr_ip: high-resolution input tensor; listed as a model input but not used
            to compute either output.

    Returns:
        A `Model` with inputs [lr_ip, hr_ip] and outputs [validity, gen_features].
    """
    generated = gen_model(lr_ip)
    features_of_generated = vgg(generated)

    # Freeze the discriminator before wiring it into the combined model.
    disc_model.trainable = False
    validity = disc_model(generated)

    return Model(inputs=[lr_ip, hr_ip], outputs=[validity, features_of_generated])

In [None]:
lr_dir = "/content/gdrive/MyDrive/DL_Project/fl_dt/lr_images_1"
hr_dir = "/content/gdrive/MyDrive/DL_Project/fl_dt/hr_images_1"

# os.listdir returns entries in arbitrary order, so sort for a reproducible order.
lr_list = sorted(os.listdir(lr_dir))
hr_list = sorted(os.listdir(hr_dir))
hr_names = set(hr_list)

# Load LR and HR images strictly in pairs, keyed by the shared file name, so that
# lr_images[i] and hr_images[i] always show the same picture. A file that is missing
# or unreadable on either side drops the whole pair instead of shifting one list.
lr_images = []
hr_images = []
for img in lr_list:
    if img not in hr_names:
        continue

    img_lr = cv2.imread(os.path.join(lr_dir, img))
    img_hr = cv2.imread(os.path.join(hr_dir, img))
    if img_lr is None or img_hr is None:
        continue

    # OpenCV loads images as BGR; convert to RGB.
    lr_images.append(cv2.cvtColor(img_lr, cv2.COLOR_BGR2RGB))
    hr_images.append(cv2.cvtColor(img_hr, cv2.COLOR_BGR2RGB))

lr_images = np.array(lr_images)
hr_images = np.array(hr_images)



In [None]:
# Scale pixel values by 1/255. The result goes into new names so that re-running
# this cell does not divide the same arrays by 255 a second time.
lr_scaled = lr_images / 255.
hr_scaled = hr_images / 255.

# NOTE(review): train_test_split is not imported in any visible cell — presumably
# sklearn.model_selection.train_test_split; confirm and add the import.
lr_train, lr_test, hr_train, hr_test = train_test_split(lr_scaled, hr_scaled,
                                                      test_size=0.33, random_state=42)

# Per-image shapes (height, width, channels), taken from the training arrays.
hr_shape = (hr_train.shape[1], hr_train.shape[2], hr_train.shape[3])
lr_shape = (lr_train.shape[1], lr_train.shape[2], lr_train.shape[3])

lr_ip = Input(shape=lr_shape)
hr_ip = Input(shape=hr_shape)

generator = create_gen(lr_ip, num_res_block = 16)
generator.summary()

# The discriminator is compiled on its own so it can be trained directly.
discriminator = create_disc(hr_ip)
discriminator.compile(loss="binary_crossentropy", optimizer="adam", metrics=['accuracy'])
discriminator.summary()

# Derive the VGG input shape from the data instead of hard-coding it, and freeze the
# network before printing the summary. summary() prints by itself and returns None,
# so it is not wrapped in print().
vgg = build_vgg(hr_shape)
vgg.trainable = False
vgg.summary()

gan_model = create_comb(generator, discriminator, vgg, lr_ip, hr_ip)

In [None]:
# Combined model: adversarial loss (binary cross-entropy on the discriminator output,
# weight 1e-3) plus MSE between VGG features (weight 1).
gan_model.compile(loss=["binary_crossentropy", "mse"], loss_weights=[1e-3, 1], optimizer="adam")
gan_model.summary()

# Pre-slice the training arrays into batches; a trailing partial batch is dropped.
batch_size = 1
train_lr_batches = []
train_hr_batches = []
for it in range(hr_train.shape[0] // batch_size):
    start_idx = it * batch_size
    end_idx = start_idx + batch_size
    train_hr_batches.append(hr_train[start_idx:end_idx])
    train_lr_batches.append(lr_train[start_idx:end_idx])

# Discriminator targets: 0 for generated images, 1 for real ones. They depend only
# on batch_size, so they are created once instead of once per epoch.
fake_label = np.zeros((batch_size, 1))
real_label = np.ones((batch_size,1))

epochs = 5
for e in range(epochs):

    g_losses = []
    d_losses = []

    # NOTE(review): tqdm is not imported in any visible cell — presumably
    # `from tqdm import tqdm`; confirm and add the import.
    for b in tqdm(range(len(train_hr_batches))):
        lr_imgs = train_lr_batches[b]
        hr_imgs = train_hr_batches[b]

        fake_imgs = generator.predict_on_batch(lr_imgs)

        # Unfreeze the discriminator for its own two updates (generated, then real),
        # then freeze it again before the generator update through gan_model.
        discriminator.trainable = True
        d_loss_gen = discriminator.train_on_batch(fake_imgs, fake_label)
        d_loss_real = discriminator.train_on_batch(hr_imgs, real_label)
        discriminator.trainable = False

        # Element-wise average of the two discriminator results.
        d_loss = 0.5 * np.add(d_loss_gen, d_loss_real)

        # VGG features of the real HR images are the target for the feature loss.
        # predict_on_batch (as already used for the generator) avoids the per-call
        # overhead and progress output of predict() on a single small batch.
        image_features = vgg.predict_on_batch(hr_imgs)

        # train_on_batch returns [total, adversarial, feature] losses; keep the total.
        g_loss, _, _ = gan_model.train_on_batch([lr_imgs, hr_imgs], [real_label, image_features])

        d_losses.append(d_loss)
        g_losses.append(g_loss)

    g_losses = np.array(g_losses)
    d_losses = np.array(d_losses)

    # Per-epoch averages over all batches.
    g_loss = np.sum(g_losses, axis=0) / len(g_losses)
    d_loss = np.sum(d_losses, axis=0) / len(d_losses)

    print("epoch:", e+1 ,"g_loss:", g_loss, "d_loss:", d_loss)

    # Overwrites the same file after every epoch; a later cell loads it by this path.
    generator.save("/content/gdrive/MyDrive/DL_Project/gen_e.h5")

In [None]:
from keras.models import load_model
from numpy.random import randint

# Reload the saved generator for inference only (no optimizer/loss state).
generator = load_model('/content/gdrive/MyDrive/DL_Project/gen_e.h5', compile=False)

# Pick one random test pair; indexing with the length-1 index array keeps the
# leading batch dimension.
X1, X2 = lr_test, hr_test
ix = randint(0, len(X1), 1)
src_image = X1[ix]
tar_image = X2[ix]

gen_image = generator.predict(src_image)

# Left to right: low-res input, generator output, original high-res image.
plt.figure(figsize=(16, 8))
panels = (
    ('LR Image', src_image),
    ('Superresolution', gen_image),
    ('Orig. HR image', tar_image),
)
for slot, (title, batch) in enumerate(panels, start=231):
    plt.subplot(slot)
    plt.title(title)
    plt.imshow(batch[0,:,:,:])

plt.show()

In [None]:
sreeni_lr_path = "/content/gdrive/MyDrive/DL_Project/fl_dt/lr_images_1/daisy_1_-10790155265df092.jpg"
sreeni_hr_path = "/content/gdrive/MyDrive/DL_Project/fl_dt/hr_images_1/daisy_1_-10790155265df092.jpg"

sreeni_lr = cv2.imread(sreeni_lr_path)
sreeni_hr = cv2.imread(sreeni_hr_path)

# cv2.imread returns None instead of raising when a file is missing or unreadable;
# fail here with the offending path rather than with an opaque cvtColor error.
for checked_path, checked_img in ((sreeni_lr_path, sreeni_lr), (sreeni_hr_path, sreeni_hr)):
    if checked_img is None:
        raise FileNotFoundError(f"Could not read image: {checked_path}")

# OpenCV loads images as BGR; convert to RGB.
sreeni_lr = cv2.cvtColor(sreeni_lr, cv2.COLOR_BGR2RGB)
sreeni_hr = cv2.cvtColor(sreeni_hr, cv2.COLOR_BGR2RGB)

# Scale pixel values by 1/255.
sreeni_lr = sreeni_lr / 255.
sreeni_hr = sreeni_hr / 255.

# Add the leading batch dimension expected by the generator.
sreeni_lr = np.expand_dims(sreeni_lr, axis=0)
sreeni_hr = np.expand_dims(sreeni_hr, axis=0)

generated_sreeni_hr = generator.predict(sreeni_lr)

# Left to right: low-res input, generator output, original high-res image.
plt.figure(figsize=(16, 8))
plt.subplot(231)
plt.title('LR Image')
plt.imshow(sreeni_lr[0,:,:,:])
plt.subplot(232)
plt.title('Superresolution')
plt.imshow(generated_sreeni_hr[0,:,:,:])
plt.subplot(233)
plt.title('Orig. HR image')
plt.imshow(sreeni_hr[0,:,:,:])

plt.show()