In [1]:
import sys
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
from matplotlib import pyplot as plt
import h5py
from keras.models import Sequential
from keras.layers import Conv2D, Input, BatchNormalization
from keras.callbacks import ModelCheckpoint
from keras.optimizers import SGD, Adam
import math
import os

%matplotlib inline

Using TensorFlow backend.


In [2]:
def psnr(target, ref):
         
    target_data = target.astype(float)
    ref_data = ref.astype(float)

    diff = ref_data - target_data
    diff = diff.flatten('C')

    rmse = math.sqrt(np.mean(diff ** 2.))

    return 20 * math.log10(255. / (rmse + 0.0000001)) # Избегаем деления на 0

# mean squared error (MSE)
def mse(target, ref):
    err = np.sum((target.astype('float') - ref.astype('float')) ** 2)
    err /= float(target.shape[0] * target.shape[1])
    
    return err

# Объединяем все метрики в одну функцию
def compare_images(target, ref):
    scores = []
    scores.append(psnr(target, ref))
    scores.append(mse(target, ref))
    scores.append(ssim(target, ref, multichannel =True))
    
    return scores

In [3]:
# Подготавливаем испорченные изображения
'''
def prepare_images(path, factor):
    
    for file in os.listdir(path):
        
        img = cv2.imread(path + file)
        
        h, w, _ = img.shape
        new_height = h // factor
        new_width = w // factor
        
        # уменьшаем картинку
        img = cv2.resize(img, (new_width, new_height), interpolation = cv2.INTER_LINEAR)
        
        # растягиваем картинку обратно (обычной интерполяцией), чтобы ухудшить качество
        img = cv2.resize(img, (w, h), interpolation = cv2.INTER_LINEAR)
        
        print('Saving {}'.format(file))
        cv2.imwrite('Images/{}'.format(file), img)
prepare_images('Source/', 2)
'''

"\ndef prepare_images(path, factor):\n    \n    for file in os.listdir(path):\n        \n        img = cv2.imread(path + file)\n        \n        h, w, _ = img.shape\n        new_height = h // factor\n        new_width = w // factor\n        \n        # уменьшаем картинку\n        img = cv2.resize(img, (new_width, new_height), interpolation = cv2.INTER_LINEAR)\n        \n        # растягиваем картинку обратно (обычной интерполяцией), чтобы ухудшить качество\n        img = cv2.resize(img, (w, h), interpolation = cv2.INTER_LINEAR)\n        \n        print('Saving {}'.format(file))\n        cv2.imwrite('Images/{}'.format(file), img)\nprepare_images('Source/', 2)\n"

In [4]:
'''
for file in os.listdir('Images/'):
    
    # open target and reference images
    target = cv2.imread('Images/{}'.format(file))
    ref = cv2.imread('Source/{}'.format(file))
    
    # calculate score
    scores = compare_images(target, ref)

    # print all three scores with new line characters (\n) 
    print('{}\nPSNR: {}\nMSE: {}\nSSIM: {}\n'.format(file, scores[0], scores[1], scores[2]))
'''

"\nfor file in os.listdir('Images/'):\n    \n    # open target and reference images\n    target = cv2.imread('Images/{}'.format(file))\n    ref = cv2.imread('Source/{}'.format(file))\n    \n    # calculate score\n    scores = compare_images(target, ref)\n\n    # print all three scores with new line characters (\n) \n    print('{}\nPSNR: {}\nMSE: {}\nSSIM: {}\n'.format(file, scores[0], scores[1], scores[2]))\n"

In [5]:
DATA_PATH = "Train/"
VAL_PATH = "Test/Set14/"
TEST_PATH = "Test/Set5/"
Random_Crop = 30
Patch_size = 32
label_size = 20
conv_side = 6
scale = 2

# подготавливаем данные

def prepare_data(_path):
    names = os.listdir(_path)
    names = sorted(names)
    nums = names.__len__()

    data = np.zeros((nums * Random_Crop, 1, Patch_size, Patch_size), dtype=np.double)
    label = np.zeros((nums * Random_Crop, 1, label_size, label_size), dtype=np.double)

    for i in range(nums):
        name = _path + names[i]
        hr_img = cv2.imread(name, cv2.IMREAD_COLOR)
        shape = hr_img.shape

        hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2YCrCb)
        hr_img = hr_img[:, :, 0]

        # растягиваем картинку туда-сюда, чтобы ухудшить качество
        lr_img = cv2.resize(hr_img, (shape[1] // scale, shape[0] // scale))
        lr_img = cv2.resize(lr_img, (shape[1], shape[0]))

        # Набираем случайно patches для обучения
        Points_x = np.random.randint(0, min(shape[0], shape[1]) - Patch_size, Random_Crop)
        Points_y = np.random.randint(0, min(shape[0], shape[1]) - Patch_size, Random_Crop)

        for j in range(Random_Crop):
            lr_patch = lr_img[Points_x[j]: Points_x[j] + Patch_size, Points_y[j]: Points_y[j] + Patch_size]
            hr_patch = hr_img[Points_x[j]: Points_x[j] + Patch_size, Points_y[j]: Points_y[j] + Patch_size]

            lr_patch = lr_patch.astype(float) / 255.
            hr_patch = hr_patch.astype(float) / 255.

            data[i * Random_Crop + j, 0, :, :] = lr_patch
            label[i * Random_Crop + j, 0, :, :] = hr_patch[conv_side: -conv_side, conv_side: -conv_side]
            # cv2.imshow("lr", lr_patch)
            # cv2.imshow("hr", hr_patch)
            # cv2.waitKey(0)
    return data, label

# BORDER_CUT = 8
BLOCK_STEP = 16
BLOCK_SIZE = 32


def prepare_crop_data(_path):
    names = os.listdir(_path)
    names = sorted(names)
    nums = names.__len__()

    data = []
    label = []

    for i in range(nums):
        name = _path + names[i]
        hr_img = cv2.imread(name, cv2.IMREAD_COLOR)
        hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2YCrCb)
        hr_img = hr_img[:, :, 0]
        shape = hr_img.shape

        # растягиваем картинку туда-сюда, чтобы ухудшить качество
        lr_img = cv2.resize(hr_img, (shape[1] // scale, shape[0] // scale))
        lr_img = cv2.resize(lr_img, (shape[1], shape[0]))

        width_num = (shape[0] - (BLOCK_SIZE - BLOCK_STEP) * 2) // BLOCK_STEP
        height_num = (shape[1] - (BLOCK_SIZE - BLOCK_STEP) * 2) // BLOCK_STEP
        for k in range(width_num):
            for j in range(height_num):
                x = k * BLOCK_STEP
                y = j * BLOCK_STEP
                hr_patch = hr_img[x: x + BLOCK_SIZE, y: y + BLOCK_SIZE]
                lr_patch = lr_img[x: x + BLOCK_SIZE, y: y + BLOCK_SIZE]

                lr_patch = lr_patch.astype(float) / 255.
                hr_patch = hr_patch.astype(float) / 255.

                lr = np.zeros((1, Patch_size, Patch_size), dtype=np.double)
                hr = np.zeros((1, label_size, label_size), dtype=np.double)

                lr[0, :, :] = lr_patch
                hr[0, :, :] = hr_patch[conv_side: -conv_side, conv_side: -conv_side]

                data.append(lr)
                label.append(hr)

    data = np.array(data, dtype=float)
    label = np.array(label, dtype=float)
    return data, label


def write_hdf5(data, labels, output_filename):

    x = data.astype(np.float32)
    y = labels.astype(np.float32)

    with h5py.File(output_filename, 'w') as h:
        h.create_dataset('data', data=x, shape=x.shape)
        h.create_dataset('label', data=y, shape=y.shape)
        # h.create_dataset()


def read_training_data(file):
    with h5py.File(file, 'r') as hf:
        data = np.array(hf.get('data'))
        label = np.array(hf.get('label'))
        train_data = np.transpose(data, (0, 2, 3, 1))
        train_label = np.transpose(label, (0, 2, 3, 1))
        return train_data, train_label
    
data, label = prepare_crop_data(DATA_PATH)
write_hdf5(data, label, "train.h5")
data, label = prepare_data(VAL_PATH)
write_hdf5(data, label, "val.h5")

In [6]:
def model():
    
    SRCNN = Sequential()
    
    # Делаем три слоя, как в статье
    SRCNN.add(Conv2D(filters=128, kernel_size = (9, 9), kernel_initializer='glorot_uniform',
                     activation='relu', padding='valid', use_bias=True, input_shape=(None, None, 1)))
    SRCNN.add(Conv2D(filters=64, kernel_size = (3, 3), kernel_initializer='glorot_uniform',
                     activation='relu', padding='same', use_bias=True))
    SRCNN.add(Conv2D(filters=1, kernel_size = (5, 5), kernel_initializer='glorot_uniform',
                     activation='linear', padding='valid', use_bias=True))
    
    # Оптимизатор будет один на все слои
    # И learning rate тоже будет одинаковый
    adam = Adam(lr=0.0003)
    
    SRCNN.compile(optimizer=adam, loss='mean_squared_error', metrics=['mean_squared_error'])
    
    return SRCNN

In [8]:
'''
def modcrop(img, scale):
    tmpsz = img.shape
    sz = tmpsz[0:2]
    sz = sz - np.mod(sz, scale)
    img = img[0:sz[0], 0:sz[1]]
    return img
'''

def degradation(img, scale):
    h, w, _ = img.shape
    new_height = h // scale
    new_width = w // scale
        
    # сжимаем и растягиваем картинку, чтобы ухудшить качество
    img = cv2.resize(img, (new_width, new_height), interpolation = cv2.INTER_LINEAR)
        
    img = cv2.resize(img, (w, h), interpolation = cv2.INTER_LINEAR)
    
    return img


# Обрезаем края
def shave(image, border):
    img = image[border: -border, border: -border]
    return img

# Тренировочная функция
def train():
    print(srcnn_model.summary())
    data, label = read_training_data("train.h5")
    val_data, val_label = read_training_data("val.h5")

    checkpoint = ModelCheckpoint("SRCNN_check.h5", monitor='val_loss', verbose=1, save_best_only=True,
                                 save_weights_only=False, mode='min')
    callbacks_list = [checkpoint]

    srcnn_model.fit(data, label, batch_size=128, validation_data=(val_data, val_label),
                    callbacks=callbacks_list, shuffle=True, epochs=50, verbose=0)

# Предсказывающая функция
def predict(img):
    # img = cv2.imread(image_path)
    
    
    # конвертируем в YCrCb
    temp = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb)
    
    # нормализуем 
    Y = np.zeros((1, temp.shape[0], temp.shape[1], 1), dtype=float)
    Y[0, :, :, 0] = temp[:, :, 0].astype(float) / 255
    
    # применяем модель
    pre = srcnn_model.predict(Y, batch_size=1)
    
    # пост-обработка
    pre *= 255
    pre[pre[:] > 255] = 255
    pre[pre[:] < 0] = 0
    pre = pre.astype(np.uint8)
    
    # конвертируем в BGR
    temp = shave(temp, 6)
    temp[:, :, 0] = pre[0, :, :, 0]
    output = cv2.cvtColor(temp, cv2.COLOR_YCrCb2BGR)
    
    return output


In [9]:
def results(image_path, scale):

    ref = cv2.imread(image_path) # оригинальные данные
    degraded = degradation(ref, scale) # испорченные данные
    
    #ref = modcrop(ref, 3)
    #degraded = modcrop(degraded, 3)
    
    output = predict(degraded) # восстановленные данные

    # убираем границу
    ref = shave(ref.astype(np.uint8), 6)
    degraded = shave(degraded.astype(np.uint8), 6)
    
    # считам метрики
    scores = []
    scores.append(compare_images(degraded, ref))
    scores.append(compare_images(output, ref))
    
    return ref, degraded, output, scores

In [10]:
srcnn_model = model()
train()

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_4 (Conv2D)            (None, None, None, 128)   10496     
_________________________________________________________________
conv2d_5 (Conv2D)            (None, None, None, 64)    73792     
_________________________________________________________________
conv2d_6 (Conv2D)            (None, None, None, 1)     1601      
Total params: 85,889
Trainable params: 85,889
Non-trainable params: 0
_________________________________________________________________
None

Epoch 00001: val_loss improved from inf to 0.00211, saving model to SRCNN_check.h5

Epoch 00002: val_loss improved from 0.00211 to 0.00167, saving model to SRCNN_check.h5

Epoch 00003: val_loss improved from 0.00167 to 0.00154, saving model to SRCNN_check.h5

Epoch 00004: val_loss did not improve from 0.00154

Epoch 00005: val_loss improved from 0.00154 to 0.00145, saving

In [11]:
for file in os.listdir('Test/Set5'):
    
    # Проходим по тестовой выборке и сохраняем результаты сравнения оригинальной, испорченной и восстановленной 
    # картинок между собой
    ref, degraded, output, scores = results('Test/Set5/{}'.format(file), 2)
    
    fig, axs = plt.subplots(1, 3, figsize=(20, 8))
    axs[0].imshow(cv2.cvtColor(ref, cv2.COLOR_BGR2RGB))
    axs[0].set_title('Original')
    axs[1].imshow(cv2.cvtColor(degraded, cv2.COLOR_BGR2RGB))
    axs[1].set_title('Degraded')
    axs[1].set(xlabel = 'PSNR: {}\nMSE: {} \nSSIM: {}'.format(scores[0][0], scores[0][1], scores[0][2]))
    axs[2].imshow(cv2.cvtColor(output, cv2.COLOR_BGR2RGB))
    axs[2].set_title('SRCNN')
    axs[2].set(xlabel = 'PSNR: {} \nMSE: {} \nSSIM: {}'.format(scores[1][0], scores[1][1], scores[1][2]))

    for ax in axs:
        ax.set_xticks([])
        ax.set_yticks([])
      
    print('Saving {}'.format(file))
    fig.savefig('Output/{}.png'.format(os.path.splitext(file)[0])) 
    plt.close()

Saving woman_GT.bmp
Saving butterfly_GT.bmp
Saving bird_GT.bmp
Saving head_GT.bmp
Saving baby_GT.bmp


In [12]:
# Теперь в Output можно посмотреть результаты