In [59]:
# define a function for peak signal-to-noise ratio (PSNR)
import cv2
import math
import numpy


def psnr(target, ref):
    # assume RGB image
    target_data = numpy.array(target, dtype=float)
    ref_data = numpy.array(ref, dtype=float)

    diff = ref_data - target_data
    diff = diff.flatten('C')   #flatten in row major

    rmse = math.sqrt(numpy.mean(diff ** 2.))

    return 20 * math.log10(255.) - 10 * math.log10(rmse) 

# Data Processing

In [10]:
# Load package for data processig
import os
import cv2
import h5py
import numpy
from matplotlib import pyplot as plt

HR_PATH = "HR/"
LR_PATH = "LR/"
Random_Crop = 30    #every img will be cropped into 30 sub imgs
Patch_size = 32     #sub imgs size
label_size = 20     #after three filters, the output of img will be smaller than original
conv_side = 6       #32-2*6

In [16]:
#data processing
def prepare_data(HR_path, LR_path):
    names_HR = os.listdir(HR_path)
    names_HR = sorted(names_HR)
    
    names_LR = os.listdir(LR_path)
    names_LR = sorted(names_LR)  
    
    nums = names_HR.__len__()
    
    
    data = numpy.zeros((nums * Random_Crop, 1, Patch_size, Patch_size), dtype=numpy.double)
    label = numpy.zeros((nums * Random_Crop, 1, label_size, label_size), dtype=numpy.double)

    for i in range(nums):
        name_HR = HR_path + names_HR[i]
        name_LR = LR_path + names_LR[i]
        hr_img = cv2.imread(name_HR)
        shape = hr_img.shape

        hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2YCrCb)
        hr_img = hr_img[:, :, 0]

        # two resize operation to produce training data and labels
        lr_img = cv2.imread(name_LR)
        lr_img = cv2.resize(lr_img, (shape[1], shape[0]))
        lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2YCrCb)
        lr_img = lr_img[:, :, 0]
        

        # produce Random_Crop random coordinate to crop training img
        Points_x = numpy.random.randint(0, min(shape[0], shape[1]) - Patch_size, Random_Crop)
        Points_y = numpy.random.randint(0, min(shape[0], shape[1]) - Patch_size, Random_Crop)

        for j in range(Random_Crop):
            lr_patch = lr_img[Points_x[j]: Points_x[j] + Patch_size, Points_y[j]: Points_y[j] + Patch_size]
            hr_patch = hr_img[Points_x[j]: Points_x[j] + Patch_size, Points_y[j]: Points_y[j] + Patch_size]

            lr_patch = lr_patch.astype(float) / 255.
            hr_patch = hr_patch.astype(float) / 255.

            data[i * Random_Crop + j, 0, :, :] = lr_patch
            label[i * Random_Crop + j, 0, :, :] = hr_patch[conv_side: -conv_side, conv_side: -conv_side]
            # cv2.imshow("lr", lr_patch)
            # cv2.imshow("hr", hr_patch)
            # cv2.waitKey(0)
    return data, label

In [17]:
#store data and label
def write_hdf5(data, labels, output_filename):
    """
    This function is used to save image data and its label(s) to hdf5 file.
    output_file.h5,contain data and label
    """

    x = data.astype(numpy.float32)
    y = labels.astype(numpy.float32)

    with h5py.File(output_filename, 'w') as h:
        h.create_dataset('data', data=x, shape=x.shape)
        h.create_dataset('label', data=y, shape=y.shape)
        # h.create_dataset()

In [19]:
# read data and label
def read_training_data(file):
    with h5py.File(file, 'r') as hf:
        data = numpy.array(hf.get('data'))
        label = numpy.array(hf.get('label'))
        train_data = numpy.transpose(data, (0, 2, 3, 1))
        train_label = numpy.transpose(label, (0, 2, 3, 1))
        return train_data, train_label

In [27]:
# get the img data and model label


#data, label = prepare_data(HR_path=HR_path, LR_path=LR_path)
#write_hdf5(data, label, "train.h5")

# Train SRCNN Model

In [38]:
# Load package for model train

from keras.models import Sequential
from keras.layers import Conv2D, Input, BatchNormalization
# from keras.layers.advanced_activations import LeakyReLU
from keras.callbacks import ModelCheckpoint
from keras.optimizers import SGD, Adam
#import prepare_data as pd
import numpy
import math



In [39]:
def model():
    # lrelu = LeakyReLU(alpha=0.1)
    SRCNN = Sequential()
    SRCNN.add(Conv2D(nb_filter=128, nb_row=9, nb_col=9, init='glorot_uniform',
                     activation='relu', border_mode='valid', bias=True, input_shape=(32, 32, 1)))
    SRCNN.add(Conv2D(nb_filter=64, nb_row=3, nb_col=3, init='glorot_uniform',
                     activation='relu', border_mode='same', bias=True))
    # SRCNN.add(BatchNormalization())
    SRCNN.add(Conv2D(nb_filter=1, nb_row=5, nb_col=5, init='glorot_uniform',
                     activation='linear', border_mode='valid', bias=True))
    adam = Adam(lr=0.0003)
    SRCNN.compile(optimizer=adam, loss='mean_squared_error', metrics=['mean_squared_error'])
    return SRCNN

In [40]:
def predict_model():
    # lrelu = LeakyReLU(alpha=0.1)
    SRCNN = Sequential()
    SRCNN.add(Conv2D(nb_filter=128, nb_row=9, nb_col=9, init='glorot_uniform',
                     activation='relu', border_mode='valid', bias=True, input_shape=(None, None, 1)))
    SRCNN.add(Conv2D(nb_filter=64, nb_row=3, nb_col=3, init='glorot_uniform',
                     activation='relu', border_mode='same', bias=True))
    # SRCNN.add(BatchNormalization())
    SRCNN.add(Conv2D(nb_filter=1, nb_row=5, nb_col=5, init='glorot_uniform',
                     activation='linear', border_mode='valid', bias=True))
    adam = Adam(lr=0.0003)
    SRCNN.compile(optimizer=adam, loss='mean_squared_error', metrics=['mean_squared_error'])
    return SRCNN

In [41]:
def train():
    srcnn_model = model()
    print(srcnn_model.summary())
    data, label = read_training_data("./crop_train.h5")
    #val_data, val_label = read_training_data("./test.h5")

    checkpoint = ModelCheckpoint("SRCNN_check.h5", monitor='val_loss', verbose=1, save_best_only=True,
                                 save_weights_only=False, mode='min')
    callbacks_list = [checkpoint]
                                                #validation_data=(val_data, val_label),
    srcnn_model.fit(data, label, batch_size=128, callbacks=callbacks_list, 
                    shuffle=True, nb_epoch=200, verbose=0)
    srcnn_model.save_weights('srcnn.h5')
    return(srcnn_model)
    # srcnn_model.load_weights("m_model_adam.h5")

In [None]:
# define main prediction function最后要用的

def predict_score(LR_PATH, HR_PATH):
    
    # load the srcnn model with weights
    srcnn_model = predict_model()
    srcnn_model.load_weights('srcnn.h5')
    
    # load the degraded and reference images
    #_, hrfile = os.path.split(HR_PATH)
    #_, lrfile = os.path.split(LR_PATH)
    #degraded = cv2.imread(image_path)
    #ref = cv2.imread('source/{}'.format(file))
    
    # preprocess the image with modcrop
    hr_img = cv2.imread(HR_PATH)
    shape = hr_img.shape

    img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2YCrCb)

    # two resize operation to produce training data and labels
    lr_img = cv2.imread(LR_PATH)
    lr_img = cv2.resize(lr_img, (shape[1], shape[0]))
    lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2YCrCb)
    lr_img = lr_img[:, :, 0]
    
    img[:, :, 0] = lr_img
    img = cv2.cvtColor(img, cv2.COLOR_YCrCb2BGR)
    
    Y = numpy.zeros((1, shape[0], shape[1], 1), dtype=float)
    Y[0, :, :, 0] = lr_img.astype(float) / 255.
    
    pre = srcnn_model.predict(Y, batch_size=1) * 255.
    pre[pre[:] > 255] = 255
    pre[pre[:] < 0] = 0
    pre = pre.astype(numpy.uint8)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb)
    img[6: -6, 6: -6, 0] = pre[0, :, :, 0]
    img = cv2.cvtColor(img, cv2.COLOR_YCrCb2BGR)    
    score = psnr(hr_img,img)
    
    return score

In [45]:
# define main prediction function最后要用的

def predict(LR_PATH, HR_PATH):
    
    # load the srcnn model with weights
    srcnn_model = predict_model()
    srcnn_model.load_weights('srcnn.h5')
    
    # load the degraded and reference images
    #_, hrfile = os.path.split(HR_PATH)
    #_, lrfile = os.path.split(LR_PATH)
    #degraded = cv2.imread(image_path)
    #ref = cv2.imread('source/{}'.format(file))
    
    # preprocess the image with modcrop
    hr_img = cv2.imread(HR_PATH)
    shape = hr_img.shape

    img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2YCrCb)

    # two resize operation to produce training data and labels
    lr_img = cv2.imread(LR_PATH)
    lr_img = cv2.resize(lr_img, (shape[1], shape[0]))
    lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2YCrCb)
    lr_img = lr_img[:, :, 0]
    
    img[:, :, 0] = lr_img
    img = cv2.cvtColor(img, cv2.COLOR_YCrCb2BGR)
    
    Y = numpy.zeros((1, shape[0], shape[1], 1), dtype=float)
    Y[0, :, :, 0] = lr_img.astype(float) / 255.
    
    pre = srcnn_model.predict(Y, batch_size=1) * 255.
    pre[pre[:] > 255] = 255
    pre[pre[:] < 0] = 0
    pre = pre.astype(numpy.uint8)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb)
    img[6: -6, 6: -6, 0] = pre[0, :, :, 0]
    img = cv2.cvtColor(img, cv2.COLOR_YCrCb2BGR)    
    score = psnr(hr_img,img)
    
    return score, hr_img, img

In [70]:
f = [x for x in os.listdir('LR/') if x != '.DS_Store' ]
for file in f:
    
    # perform super-resolution
    score, hr_img, sr_img = predict(LR_PATH='LR/{}'.format(file), HR_PATH='HR/{}'.format(file))
    
    # display images as subplots
    fig, axs = plt.subplots(1, 2, figsize=(10, 10))
    axs[0].imshow(cv2.cvtColor(hr_img, cv2.COLOR_BGR2RGB))
    axs[0].set_title('Original')
    axs[1].imshow(cv2.cvtColor(sr_img, cv2.COLOR_BGR2RGB))
    axs[1].set_title('Super Resolution')
    axs[1].set(xlabel = 'PSNR: {}\n'.format(score))


    # remove the x and y ticks
    for ax in axs:
        ax.set_xticks([])
        ax.set_yticks([])
      
    print('Saving {}'.format(file))
    fig.savefig('/Users/Janice/Documents/Jupyter/output/{}.png'.format(os.path.splitext(file)[0])) 
    plt.close()

  """
  import sys
  # Remove the CWD from sys.path while we load stuff.


Saving img_1327.jpg
Saving img_1326.jpg
Saving img_0326.jpg
Saving img_0327.jpg
Saving img_0127.jpg
Saving img_0126.jpg
