In [21]:
import os
import sys
from pathlib import Path

# current_path = os.getcwd()
# repo_dir = Path(current_path).parents[0]
# data_dir = repo_dir / 'data'
# if repo_dir not in sys.path:
#     sys.path.append(repo_dir)
    
repo_dir = os.path.abspath(os.path.join('..'))
if repo_dir not in sys.path:
    sys.path.append(repo_dir)

import argparse
from functools import partial
import json
from keras import optimizers


from toolbox.data import load_set
from toolbox.models import get_model
from toolbox.experiment import Experiment
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline

from keras.models import Sequential
from keras.engine.topology import Layer
import tensorflow as tf
from keras.layers import Conv2D
from keras.layers import InputLayer
from keras import backend as K

import pandas as pd

import time

In [22]:
def generate_sub_images(image, size, stride):
    for i in range(0, image.size[0] - size + 1, stride):
        for j in range(0, image.size[1] - size + 1, stride):
            yield image.crop([i, j, i + size, j + size])
            
def array_to_img(x, mode='YCbCr'):
    if mode == 'gray':
        return Image.fromarray(x.astype('uint8')).convert('RGB')
    else:
        return Image.fromarray(x.astype('uint8'), mode=mode).convert('RGB')



def bicubic_rescale(image, scale):
    if isinstance(scale, (float, int)):
        size = (np.array(image.size) * scale).astype(int)
    return image.resize(size, resample=Image.BICUBIC)

def img_downscale(image, scale = 4):
    if len(image.shape)==3: # color image
        down_size = np.int64(np.array(image.shape[0:2])/scale)
        img_downscale = np.zeros(np.append(down_size, image.shape[2]))
        for x in range(down_size[0]): #row
            for y in range(down_size[1]): # column
                X = x * scale
                Y = y * scale
                img_downscale[x, y] = image[X,Y]
        return img_downscale
    else: # gray_scale
        down_size = np.uint8(np.array(image.shape)/scale)
        img_downscale = np.zeros(down_size)
        for x in range(down_size[0]): #row
            for y in range(down_size[1]): # column
                X = x * scale
                Y = y * scale
                img_downscale[x, y] = image[X,Y]
        return img_downscale


def modcrop(image, scale):
    size = np.array(image.size)
    size -= size % scale
    return image.crop([0, 0, *size])

def load_set(name, lr_sub_size=11, lr_sub_stride=5, scale=3):
    hr_sub_size = lr_sub_size * scale
    hr_sub_stride = lr_sub_stride * scale
    lr_sub_arrays = []
    hr_sub_arrays = []
    for path in (data_dir / name).glob('*'):        
        image = load_img(path)
        image = image.convert('YCbCr')
        hr_image = modcrop(image, scale)
        lr_image = bicubic_rescale(hr_image, 1 / scale)
        lr_sub_arrays += [img_to_array(img) for img in generate_sub_images(lr_image, size=lr_sub_size, stride=lr_sub_stride)]
        hr_sub_arrays += [img_to_array(img) for img in generate_sub_images(hr_image, size=hr_sub_size, stride=hr_sub_stride)]
    x = np.stack(lr_sub_arrays)
    y = np.stack(hr_sub_arrays)
    return x, y

def ensure_dimension(array, dim):
    while len(array.shape) < dim:
        array = array[np.newaxis, ...]
    return array

def ensure_channel(array, c):
    return array[..., c:c+1]

def pre_process(array):
    array = ensure_dimension(array, 4)
    array = ensure_channel(array, 0)
    return array

def post_process(array, auxiliary_array):
    array = np.concatenate([array, auxiliary_array[..., 1:]], axis=-1)
    array = np.clip(array, 0, 255)
    return array

def inverse_post_process(array):
    array = ensure_dimension(array, 4)
    array = ensure_channel(array, 0)
    return array

In [23]:


class ImageRescale(Layer):
    def __init__(self, scale, method=tf.image.ResizeMethod.BICUBIC,
                 trainable=False, **kwargs):
        self.scale = scale
        self.method = method
        super().__init__(trainable=trainable, **kwargs)

    def compute_size(self, shape):
        size = np.array(shape)[[1, 2]] * self.scale
        return tuple(size.astype(int))

    def call(self, x):
        size = self.compute_size(x.shape.as_list())
        return tf.image.resize_images(x, size, method=self.method)

    def compute_output_shape(self, input_shape):
        size = self.compute_size(input_shape)
        return (input_shape[0], *size, input_shape[3])

    def get_config(self):
        config = super().get_config()
        config['scale'] = self.scale
        config['method'] = self.method
        return config

def bicubic(x, scale=3):
    model = Sequential()
    model.add(InputLayer(input_shape=x.shape[-3:]))
    model.add(ImageRescale(scale, method=tf.image.ResizeMethod.BICUBIC))
    return model

def srcnn(x, f=[9, 1, 5], n=[64, 32], scale=3):
    assert len(f) == len(n) + 1
    model = bicubic(x, scale=scale)
    c = x.shape[-1]
    for ni, fi in zip(n, f):
        model.add(Conv2D(ni, fi, padding='same',
                         kernel_initializer='he_normal', activation='relu'))
    model.add(Conv2D(c, f[-1], padding='same',
                     kernel_initializer='he_normal'))
    return model

def psnr(y_true, y_pred):
    """Peak signal-to-noise ratio averaged over samples and channels."""
    mse = K.mean(K.square(y_true - y_pred), axis=(-3, -2))
    return K.mean(20 * K.log(255 / K.sqrt(mse)) / np.log(10))

def print_statistic(gx):
    print('Image shape is :' + str(gx.shape))
    print('Image range from %f to %f' % (np.amin(gx), np.amax(gx)))
    print('The length of the range is %f' % np.ptp(gx))
    print('data type is ' + str(gx.dtype))


In [27]:
def psnr(y_true, y_pred):
    """Peak signal-to-noise ratio averaged over samples and channels."""
    mse = K.mean(K.square(y_true - y_pred), axis=(-3, -2))
    return K.mean(20 * K.log(255 / K.sqrt(mse)) / np.log(10))

def hevc_gray_upscale(img_gray):
    img_gray = np.int64(img_gray) -127
    
    taps14 = np.array([-1,4,-10,58,17,-5,1])
    taps12 = np.array([-1,4,-11,40,40,-11,4,-1])
    taps34 = np.flip(taps14)
    scale = 4
    datatype = np.int64
    # pad zeros to the side
    pad_zero_left = np.zeros((img_gray.shape[0], 3), dtype=datatype)
    pad_zero_right = np.zeros((img_gray.shape[0], 4), dtype=datatype)
    img_gray_pad = np.c_[pad_zero_left, img_gray, pad_zero_right]

    pad_zero_up = np.zeros((3, img_gray_pad.shape[1]), dtype=datatype)
    pad_zero_down = np.zeros((4, img_gray_pad.shape[1]), dtype=datatype)
    img_gray_pad = np.r_[pad_zero_up, img_gray_pad, pad_zero_down]

    img_upscale = np.zeros(np.array(img_gray_pad.shape) * scale, dtype=datatype)

    # pixel interpolated using original image
    for x in range(3, img_gray_pad.shape[0] - 4): #row
        for y in range(3, img_gray_pad.shape[1] - 4): # column
            X = x * scale
            Y = y * scale

            img_upscale[X, Y] = img_gray_pad[x,y] * 64
            img_upscale[X, Y + 1] = np.sum(np.multiply(taps14, img_gray_pad[x, y-3:y+4]))
            img_upscale[X, Y + 2] = np.sum(np.multiply(taps12, img_gray_pad[x, y-3:y+5]))
            img_upscale[X, Y + 3] = np.sum(np.multiply(taps34, img_gray_pad[x, y-2:y+5]))
            img_upscale[X + 1, Y] = np.sum(np.multiply(taps14, img_gray_pad[x-3:x+4, y]))
            img_upscale[X + 2, Y] = np.sum(np.multiply(taps12, img_gray_pad[x-3:x+5, y]))
            img_upscale[X + 3, Y] = np.sum(np.multiply(taps34, img_gray_pad[x-2:x+5, y]))

    # pixel interpolated in the middle
    for x in range(3, img_gray_pad.shape[0] - 4): #row
        for y in range(3, img_gray_pad.shape[1] - 4): # column    
            X = x * scale
            Y = y * scale
            img_upscale[X + 1, Y + 1] = np.right_shift(np.sum(np.multiply(taps14, img_upscale[X-3*scale : X+4*scale : scale, Y + 1])), 6)
            img_upscale[X + 1, Y + 2] = np.right_shift(np.sum(np.multiply(taps14, img_upscale[X-3*scale : X+4*scale : scale, Y + 2])), 6)
            img_upscale[X + 1, Y + 3] = np.right_shift(np.sum(np.multiply(taps14, img_upscale[X-3*scale : X+4*scale : scale, Y + 3])), 6)
            img_upscale[X + 2, Y + 1] = np.right_shift(np.sum(np.multiply(taps12, img_upscale[X-3*scale : X+5*scale : scale, Y + 1])), 6)
            img_upscale[X + 2, Y + 2] = np.right_shift(np.sum(np.multiply(taps12, img_upscale[X-3*scale : X+5*scale : scale, Y + 2])), 6)
            img_upscale[X + 2, Y + 3] = np.right_shift(np.sum(np.multiply(taps12, img_upscale[X-3*scale : X+5*scale : scale, Y + 3])), 6)
            img_upscale[X + 3, Y + 1] = np.right_shift(np.sum(np.multiply(taps34, img_upscale[X-2*scale : X+5*scale : scale, Y + 1])), 6)
            img_upscale[X + 3, Y + 2] = np.right_shift(np.sum(np.multiply(taps34, img_upscale[X-2*scale : X+5*scale : scale, Y + 2])), 6)
            img_upscale[X + 3, Y + 3] = np.right_shift(np.sum(np.multiply(taps34, img_upscale[X-2*scale : X+5*scale : scale, Y + 3])), 6)
    
    # normalization
    img_upscale = (img_upscale - np.min(img_upscale))
    img_upscale = img_upscale / np.max(img_upscale)
    img_upscale = np.uint8(img_upscale*255)
    # crop back to original size
    img_upscale = img_upscale[3*scale:-4*scale, 3*scale:-4*scale]
    
    return img_upscale


def test_one_image_hevc(image_dir, prefix, suffix='png', metrics=[psnr], scale=4):
    image = load_img(image_dir)
    image = image.convert('YCbCr')
    hr_image = modcrop(image, scale)
    lr_image = bicubic_rescale(hr_image, 1 / scale)
    x = img_to_array(lr_image)[np.newaxis, ...]
    bicubic_model = bicubic(x, scale=scale)
    y = bicubic_model.predict_on_batch(x)
    bicubic_array = np.clip(y[0], 0, 255)
    
    x_gray = pre_process(x)
    start = time.perf_counter()
    img_sr_hevc = hevc_gray_upscale(x_gray[0,:,:,0])
    end = time.perf_counter()
    img_output_hevc_chrom_bicubic = np.zeros((img_sr_hevc.shape[0],img_sr_hevc.shape[1],3), dtype=np.uint8)
    img_output_hevc_chrom_bicubic[:,:,0] = img_sr_hevc
    img_output_hevc_chrom_bicubic[:,:,1] = bicubic_array[..., 1]
    img_output_hevc_chrom_bicubic[:,:,2] = bicubic_array[..., 2]
    img_output_hevc_chrom_bicubic = Image.fromarray(img_output_hevc_chrom_bicubic, mode='YCbCr')
#     img_output_hevc_chrom_bicubic.convert('RGB').save(str(save_dir / ('hevc_'+file_name)))

    row_hevc = pd.Series()
    row_hevc['name'] = Path(image_dir).stem
    row_hevc['time'] = end - start
    y_true = inverse_post_process(img_to_array(hr_image))
    metrics=[psnr]
    for metric in metrics:
        row_hevc[metric.__name__] = K.eval(metric(y_true, img_sr_hevc[np.newaxis, ..., np.newaxis]))
    # Save images
    images_to_save = []
    images_to_save += [(hr_image, 'original')]
    images_to_save += [(img_output_hevc_chrom_bicubic, 'output')]
    images_to_save += [(lr_image, 'input')]
    for img, label in images_to_save:
        img.convert(mode='RGB').save('.'.join([prefix, label, suffix]))

    return row_hevc

In [28]:
save_dir = Path(repo_dir) / 'experiments' / 'hevc-sc4'
save_dir.mkdir(parents=True, exist_ok=True)
test_dir = save_dir / 'test'
test_dir.mkdir(exist_ok=True)
data_dir = Path(repo_dir) / 'data' / 'mytest'

In [34]:
test_set='Set14'
metrics=[psnr]
print('Test on', test_set)
image_dir = test_dir / test_set
image_dir.mkdir(exist_ok=True)
data_dir = Path(repo_dir) / 'data'
# Evaluate metrics on each image
rows = []
for image_path in (data_dir / test_set).glob('*'):
    rows += [test_one_image_hevc(str(image_path),str(image_dir / image_path.stem),metrics=metrics)]

df = pd.DataFrame(rows)

# Compute average metrics
row = pd.Series()
row['name'] = 'average'
for col in df:
    if col != 'name':
        row[col] = df[col].mean()
df = df.append(row, ignore_index=True)

df.to_csv(str(test_dir / f'{test_set}/metrics.csv'))


Test on Set14


In [35]:
df

Unnamed: 0,name,time,psnr
0,foreman,0.655279,20.399267
1,man,1.70942,18.013014
2,pepper,1.711894,20.816298
3,comic,0.573778,17.448311
4,coastguard,0.693888,21.656029
5,baboon,1.554434,15.08835
6,monarch,2.684909,22.513008
7,flowers,1.163277,21.038273
8,face,0.518917,23.048426
9,ppt3,2.279411,16.38686
