# Introduction

This notebook provides helpers for loading a dataset (e.g. DIV2K) by its key and for preprocessing the images:


1.   Load the dataset with key.
2.   Preprocessing image.
3.   Separate the dataset.



In [None]:
print('import libary...')
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
  

import libary...


Load dataset by key and Tensorflow API

In [None]:
def load_dataset(key='div2k',include_validate=True):
  """Load a dataset through ``tfds.load`` and print its metadata.

  Args:
    key: Dataset name passed to ``tfds.load``.
    include_validate: When true, the 'validation' split is loaded in
      addition to the 'train' split.

  Returns:
    ``((train_ds, val_ds), metadata)``. ``val_ds`` is ``None`` when
    ``include_validate`` is false.
  """
  print('using tensorflow_datasets to load the dataset','\nloading dataset...')
  splits = ['train','validation'] if include_validate else ['train']
  # `split` is always passed as a list, so a list of datasets comes back even
  # when a single split is requested; index into it instead of binding the
  # whole list to `train_ds`.
  datasets, metadata = tfds.load(
    key,
    split=splits,
    with_info=True,
    as_supervised=True,
  )
  train_ds = datasets[0]
  # Always bind val_ds so the return statement cannot raise NameError.
  val_ds = datasets[1] if include_validate else None
  print(metadata)
  return (train_ds,val_ds),metadata

Load dataset from Folder on Drive directory

In [None]:
def Load_dataset_from_directory(path,scale=4):
  """Build a dataset of (low-res, high-res) image pairs from a folder.

  Every file matching ``path + '/*'`` is decoded as a 3-channel image and
  used as the high-resolution sample. The low-resolution sample is a
  bicubic, antialiased downscale of it by ``scale``, cast to ``tf.int32``.

  Args:
    path: Directory containing the image files.
    scale: Downscaling factor applied to height and width.

  Returns:
    A ``tf.data.Dataset`` yielding ``(lr, hr)`` tuples, with its cardinality
    asserted to be the number of files found.
  """
  def process_path(file_path):
    # Load the raw data from the file as a string, then decode to 3 channels.
    img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(img, channels=3)
    return img

  list_ds = tf.data.Dataset.list_files(path+'/*', shuffle=False)
  data_length = len(list_ds)
  hr_data = list_ds.map(process_path, num_parallel_calls=tf.data.AUTOTUNE)

  def generate_lr_images():
    # Uses the caller's `scale` (previously an inner default of 4 was always
    # used) and yields lazily instead of materialising every image first.
    for img in hr_data:
      lr_size = [int(img.shape[0]/scale),int(img.shape[1]/scale)]
      lr_img = tf.image.resize(img, lr_size, antialias=True,
                               method=tf.image.ResizeMethod.BICUBIC)
      yield tf.cast(lr_img,dtype=tf.int32)

  lr_data = tf.data.Dataset.from_generator(generate_lr_images,
                              output_types=( tf.int32),
                              output_shapes=(tf.TensorShape([ None, None,
                                         3])))
  data = tf.data.Dataset.zip((lr_data,hr_data))
  # from_generator has unknown cardinality; restore the known length.
  data=data.apply(tf.data.experimental.assert_cardinality(data_length))
  return data

Normalize pixel value to [0,1]

In [None]:
def normalize_img(image):
  """Cast `image` to float32 and divide by 255, printing a notice first."""
  print('Normalizes images: `uint8` -> `float32`')
  as_float = tf.cast(image, tf.float32)
  return as_float / 255

In [None]:
def configure_performan(ds,BUFFER_SIZE=1000,BATCH_SIZE=16):
  """Return `ds` repeated, shuffled, batched and prefetched (in that order).

  Args:
    ds: Dataset to configure.
    BUFFER_SIZE: Shuffle buffer size.
    BATCH_SIZE: Number of elements per batch.
  """
  return (
    ds.repeat()
      .shuffle(buffer_size=BUFFER_SIZE)
      .batch(BATCH_SIZE)
      .prefetch(buffer_size=tf.data.AUTOTUNE)
  )

In [None]:
def show(image, label,size):
  """Display `image` in a new figure with `label` as the title.

  Args:
    image: Image as a tensor or array with at least two dimensions
      (rows, columns, ...).
    label: Value shown as the figure title.
    size: Divisor converting pixel counts to figure inches.
  """
  # np.asarray accepts both eager tensors and plain arrays.
  height, width = np.asarray(image).shape[:2]
  # Matplotlib expects figsize as (width, height); the row count is the height.
  plt.figure(figsize=(width/size,height/size))
  plt.imshow(image)
  plt.title(np.array(label))
  plt.axis('on')

In [None]:
def load_image_from_path(path):
  """Load the image at `path` and return it as an int32 tensor."""
  loaded = tf.keras.preprocessing.image.load_img(path)
  pixels = tf.keras.preprocessing.image.img_to_array(loaded)
  return tf.cast(tf.constant(pixels),tf.int32)

In [None]:
def save_image_path(path,image):
  """Write `image` to `path` via `tf.keras.utils.save_img`."""
  tf.keras.utils.save_img(path, image)

In [None]:
def visualize_augmented(original, augmented,type='lr'):
  """Plot `original` and `augmented` side by side in one figure.

  Args:
    original: Reference image (tensor or array, rows x columns x ...); its
      size determines the figure size.
    augmented: Transformed image shown in the right panel.
    type: Suffix appended verbatim to both panel titles.
  """
  pixels_per_inch = 80
  # np.asarray accepts both eager tensors and plain arrays.
  height, width = np.asarray(original).shape[:2]
  # Matplotlib expects figsize as (width, height); the row count is the height.
  plt.figure(figsize=(width/pixels_per_inch,height/pixels_per_inch))
  plt.subplot(1,2,1)
  plt.title('Original image'+type)
  plt.imshow(original)
  plt.subplot(1,2,2)
  plt.title('Variant image'+type)
  plt.imshow(augmented)

Preprocess data before feeding it to the model

In [None]:
def augment(lr,hr,with_adjust=False):
  """Apply the same random augmentation to a low-res / high-res pair.

  One seed drawn from the global generator is shared by every stateless
  random op, so `lr` and `hr` receive identical transformations.

  Args:
    lr: Low-resolution image.
    hr: High-resolution image.
    with_adjust: When exactly ``True``, also jitter contrast, brightness
      and saturation.

  Returns:
    The augmented ``(lr, hr)`` pair.
  """
  rng = tf.random.get_global_generator()
  shared_seed = rng.make_seeds(2)[0]

  def color_jitter(img):
    # Pixel-value changes: contrast, then brightness, then saturation.
    img = tf.image.stateless_random_contrast(img,0.8,1.2,shared_seed)
    img = tf.image.stateless_random_brightness(img,0.10,shared_seed)
    return tf.image.stateless_random_saturation(img,0.8,1.3,shared_seed)

  if with_adjust==True:
    lr = color_jitter(lr)
    hr = color_jitter(hr)

  # Number of 90-degree turns, drawn from [1, 5).
  quarter_turns = tf.squeeze(rng.uniform([1],minval=1,maxval=5,dtype=tf.dtypes.int32), axis=0)

  def rotate_and_flip(img):
    rotated = tf.image.rot90(img,quarter_turns)
    return tf.image.stateless_random_flip_left_right(rotated,shared_seed)

  lr = rotate_and_flip(lr)
  hr = rotate_and_flip(hr)
  return lr,hr

Random Crop image

In [None]:
def random_crop(lr_img, hr_img, hr_crop_size, scale):
    """Cut aligned random square crops out of a low-res / high-res pair.

    The crop origin is drawn on the low-res grid and multiplied by `scale`
    to locate the matching high-res window.

    Args:
        lr_img: Low-resolution image (rows, columns, ...).
        hr_img: High-resolution image covering the same scene.
        hr_crop_size: Side length of the high-res crop.
        scale: Resolution ratio between `hr_img` and `lr_img`.

    Returns:
        ``(lr_crop, hr_crop)``.
    """
    lr_size = hr_crop_size // scale
    dims = tf.shape(lr_img)[:2]

    # maxval is exclusive, hence the +1 to allow a crop touching the border.
    top = tf.random.uniform(shape=(), maxval=dims[0] - lr_size + 1, dtype=tf.int32)
    left = tf.random.uniform(shape=(), maxval=dims[1] - lr_size + 1, dtype=tf.int32)

    lr_patch = lr_img[top:top + lr_size, left:left + lr_size]

    hr_row, hr_column = top * scale, left * scale
    hr_patch = hr_img[hr_row:hr_row + hr_crop_size, hr_column:hr_column + hr_crop_size]
    return lr_patch, hr_patch

In [None]:
def normalize_m11(x):
    """Map RGB values from [0, 255] to [-1, 1]."""
    scaled = x / 127.5
    return scaled - 1
def normalize_01(x):
    """Map RGB values from [0, 255] to [0, 1]."""
    full_scale = 255.0
    return x / full_scale
def denormalize_m11(x):
    """Map values from [-1, 1] back to the [0, 255] RGB range."""
    shifted = x + 1
    return shifted * 127.5

Preprocessing Image with Large size (OOM)

In [None]:
import numpy as np
from PIL import Image
def get_sr_image(model, lr, max_image_dimension=512, patch_size=96, batch_size=16):
    """Run `model` on `lr`, in one pass or patch by patch depending on size.

    A leading batch axis is added to `lr`. If no dimension of the batched
    input exceeds `max_image_dimension`, the whole image is processed at
    once; otherwise it is processed in `patch_size` patches, `batch_size`
    patches at a time.
    """
    batched = lr[np.newaxis]
    too_large = max(batched.shape) > max_image_dimension

    if too_large:
        return process_in_patches(model, batched, patch_size, batch_size)

    return process_full_image(model, batched)

def unpad_patched_scaled_image(lr_image, patches_per_row, patches_per_col, patch_size, padded_scaled_image):
    """Crop the padding off an upscaled image that was assembled from patches.

    Args:
        lr_image: Batched low-res input of shape (batch, rows, cols, channels).
        patches_per_row: Number of patches along the row axis.
        patches_per_col: Number of patches along the column axis.
        patch_size: Side length of each low-res patch.
        padded_scaled_image: Upscaled image (rows, cols, channels) covering
            the padded low-res area.

    Returns:
        The region of `padded_scaled_image` that corresponds to the original
        unpadded `lr_image`, i.e. (rows * factor, cols * factor, channels).
    """
    _, lr_rows, lr_cols, _ = lr_image.shape

    padded_rows = patches_per_row * patch_size
    padded_cols = patches_per_col * patch_size

    scaling_factor = padded_scaled_image.shape[0] // padded_rows

    # The patches come from extract_patches(padding='SAME'), which puts the
    # odd extra pixel of padding at the end. The leading pad is therefore
    # floor(padding / 2) low-res pixels.
    top = ((padded_rows - lr_rows) // 2) * scaling_factor
    left = ((padded_cols - lr_cols) // 2) * scaling_factor

    # Slice by explicit output size. This keeps the last row/column when
    # there is no padding and never collapses to an empty `[0:-0]` slice.
    bottom = top + lr_rows * scaling_factor
    right = left + lr_cols * scaling_factor

    return padded_scaled_image[top:bottom, left:right]

def process_in_patches(model, lr, patch_size, batch_size):
    """Upscale a large image by running `model` on fixed-size patches.

    Args:
        model: Object exposing ``predict(batch)``.
        lr: Batched low-res image of shape (1, rows, cols, channels).
        patch_size: Side length of the square patches.
        batch_size: Number of patches passed to ``model.predict`` per call.

    Returns:
        The stitched prediction, clipped to [0, 1], as float32, with the
        padding introduced by patch extraction removed.
    """
    # Use the input's own channel count instead of assuming RGB.
    channels = lr.shape[-1]

    extracted_patches = tf.image.extract_patches(images=lr,
                             sizes=[1, patch_size, patch_size, 1],
                             strides=[1, patch_size, patch_size, 1],
                             rates=[1, 1, 1, 1],
                             padding='SAME')

    patches_per_row, patches_per_col = extracted_patches.shape[1], extracted_patches.shape[2]

    # Each extracted patch arrives flattened along the last axis; restore it.
    extracted_patches = extracted_patches.numpy().reshape(-1, patch_size, patch_size, channels)

    # Predict in chunks of `batch_size` to bound memory use.
    patch_results = [
        model.predict(extracted_patches[start_index: start_index + batch_size])
        for start_index in range(0, extracted_patches.shape[0], batch_size)
    ]

    patch_results = np.concatenate(patch_results, axis=0)
    patch_results = np.clip(patch_results, 0, 1)

    joined = join_patches(patch_results, patches_per_row, patches_per_col).astype(np.float32)

    return unpad_patched_scaled_image(lr, patches_per_row, patches_per_col, patch_size, joined)

def join_patches(patches, patches_per_row, patches_per_col):
    """Stitch equally sized patches into one image, filling row by row.

    Args:
        patches: Sequence/array of patches, each (patch_rows, patch_cols,
            channels), ordered left-to-right then top-to-bottom.
        patches_per_row: Number of patches along the row axis.
        patches_per_col: Number of patches along the column axis.

    Returns:
        A float64 array of shape (patch_rows * patches_per_row,
        patch_cols * patches_per_col, channels). Grid cells without a patch
        stay zero.

    Raises:
        ValueError: If more patches are given than the grid can hold.
    """
    patch_rows, patch_cols, channels = patches[0].shape

    if len(patches) > patches_per_row * patches_per_col:
        raise ValueError(
            'got %d patches for a %d x %d grid'
            % (len(patches), patches_per_row, patches_per_col))

    # Size the canvas from the patches' own channel count rather than
    # assuming three channels.
    joined = np.zeros((patch_rows * patches_per_row,
                       patch_cols * patches_per_col,
                       channels))

    for index, patch in enumerate(patches):
        row, col = divmod(index, patches_per_col)
        joined[row * patch_rows: (row + 1) * patch_rows,
               col * patch_cols: (col + 1) * patch_cols] = patch

    return joined
def process_full_image(model, lr):
    """Predict on the whole batched image and return the first result.

    The prediction is clipped to [0, 1] and cast to float32.
    """
    prediction = model.predict(lr)
    first_result = prediction[0]
    bounded = tf.clip_by_value(first_result, 0, 1)
    return tf.cast(bounded, tf.float32)


In [None]:
def specific_crop(x,y, hr_img, hr_crop_size, scale):
    """Crop a square of side `hr_crop_size` from `hr_img`.

    `x` and `y` are low-res row and column offsets; they are multiplied by
    `scale` to obtain the top-left corner in the high-res image.
    """
    top, left = x * scale, y * scale
    row_span = slice(top, top + hr_crop_size)
    col_span = slice(left, left + hr_crop_size)
    return hr_img[row_span, col_span]

In [None]:
def gaussian_noise_layer(input_layer, std):
    """Add zero-mean float32 Gaussian noise with standard deviation `std`."""
    noise_shape = tf.shape(input_layer)
    return input_layer + tf.random.normal(shape=noise_shape, mean=0.0, stddev=std, dtype=tf.float32)

In [None]:
def gaussian_kernel(size: int,
                    mean: float,
                    std: float,
                   ):
    """Makes 2D gaussian Kernel for convolution.

    Args:
        size: Kernel radius; the result is (2 * size + 1) x (2 * size + 1).
        mean: Mean of the 1D Gaussian sampled at integer offsets
            -size .. size.
        std: Standard deviation of the 1D Gaussian.

    Returns:
        A float32 2D kernel (outer product of the 1D samples) normalised to
        sum to 1.
    """
    offsets = tf.range(start = -size, limit = size + 1, dtype = tf.float32)

    # Evaluate the Gaussian directly: `tf.distributions` is not available in
    # TensorFlow 2.x. The 1 / (std * sqrt(2 * pi)) factor is omitted because
    # it cancels in the normalisation below.
    vals = tf.exp(-0.5 * tf.square((offsets - mean) / std))

    gauss_kernel = tf.einsum('i,j->ij',
                                  vals,
                                  vals)

    return gauss_kernel / tf.reduce_sum(gauss_kernel)

# Visualize Sample

*   Set5
*   Set14




# Install .zip dataset from URL (do not enable this code)

In [None]:
# !wget --no-check-certificate \
#     https://drive.google.com/u/0/uc?export=download&confirm=y0q_&id=1h3H0Vm4yvMVVQgpJk3s0a304FTpOZGKg \
#     -O /content/drive/MyDrive/Dataset/Evaluate/ds.zip
  

/bin/bash: -O: command not found
--2022-02-06 03:51:11--  https://drive.google.com/u/0/uc?export=download
Resolving drive.google.com (drive.google.com)... 108.177.127.101, 108.177.127.139, 108.177.127.138, ...
Connecting to drive.google.com (drive.google.com)|108.177.127.101|:443... connected.
HTTP request sent, awaiting response... 400 Bad Request
2022-02-06 03:51:12 ERROR 400: Bad Request.



In [None]:
# import requests
# file_url = "https://drive.google.com/u/0/uc?export=download&confirm=y0q_&id=1h3H0Vm4yvMVVQgpJk3s0a304FTpOZGKg"
# r = requests.get(file_url, stream = True)
# with open("/content/drive/MyDrive/Dataset/Evaluate/ds.zip","wb") as file:
#   for block in r.iter_content(chunk_size = 1024):
#     if block:
#       file.write(block)

In [None]:
# import os
# import zipfile
# local_zip = '/content/drive/MyDrive/Dataset/Evaluate/Flickr2K/Flickr2K.zip'
# zip_ref = zipfile.ZipFile(local_zip, 'r')
# zip_ref.extractall('/content/drive/MyDrive/Dataset/Flick/')
# zip_ref.close()
# # local_zip = '/content/drive/MyDrive/Dataset/Evaludate/set5/SR_testing_datasets.zip'
# # zip_ref = zipfile.ZipFile(local_zip, 'r')
# # zip_ref.extractall('/content/drive/MyDrive/Dataset/Evaluate_dataset')
# # zip_ref.close()