In [1]:
import glob as gb
import numpy as np
import glob as gb
import os
import h5py
import numpy as np
from tqdm import tqdm
from PIL import Image
from keras.layers import Input, concatenate
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import Convolution2D
from keras.layers.core import Dropout, Dense, Flatten, Lambda
from keras.layers.merge import Average
from keras.layers.normalization import BatchNormalization
from keras.models import Model
from keras.utils.vis_utils import plot_model
import keras.backend as K
import numpy as np
from keras.applications.vgg16 import VGG16
from keras.models import Model

Using TensorFlow backend.


In [2]:
image_shape = (256, 256, 3)
K_1 = 145
K_2 = 170
channel_rate = 64
patch_shape = (channel_rate, channel_rate, 3)

In [3]:
def normalization(x):
    return x / 127.5 - 1

def formatImage(path, size):
    inImage = Image.open(path)
    fImage = inImage.crop((0, 0, inImage.size[0] / 2, inImage.size[1]))
    bImage = inImage.crop((inImage.size[0] / 2, 0, inImage.size[0], inImage.size[1]))

    fImage = fImage.resize((size, size), Image.ANTIALIAS)
    bImage = bImage.resize((size, size), Image.ANTIALIAS)

    return np.array(fImage), np.array(b_Image)

def buildHDF5(jpeg_dir, size=256):
    hdf5_file = os.path.join('data', 'data.h5')
    with h5py.File(hdf5_file, 'w') as f:

        for data_type in tqdm(['train', 'test'], desc='create HDF5 dataset from images'):
            dataPath = jpeg_dir + '/%s/*.jpg' % data_type
            imagesPath = gb.glob(dataPath)
            fData = []
            bData = []
            for imagePath in imagesPath:
                fImage, bImage = formatImage(imagePath, size)
                fData.append(fImage)
                bData.append(bImage)

            f.create_dataset('%s_data_full' % data_type, data=fData)
            f.create_dataset('%s_data_blur' % data_type, data=bData)

def loadData(data_type):
    with h5py.File('data/data.h5', 'r') as f:
        fData = f['%s_data_full' % data_type][:].astype(np.float32)
        fData = normalization(fData)

        bData = f['%s_data_blur' % data_type][:].astype(np.float32)
        bData = normalization(bData)

        return fData, bData


def generateImage(full, blur, generated, path, epoch=None, index=None):
    full = full * 127.5 + 127.5
    blur = blur * 127.5 + 127.5
    generated = generated * 127.5 + 127.5
    for i in range(generated.shape[0]):
        fImage = full[i, :, :, :]
        bImage = blur[i, :, :, :]
        gImage = generated[i, :, :, :]
        image = np.concatenate((fImage, bImage, gImage), axis=1)
        if (epoch is not None) and (index is not None):
            Image.fromarray(image.astype(np.uint8)).save(path + str(epoch + 1) + '_' + str(index + 1) + '.png')
        else:
            Image.fromarray(image.astype(np.uint8)).save(path + str(i) + '.png')
    
def lossL1(yTrue, yPred):
    return K.mean(K.abs(yPred - yTrue))

def lossPerceptual(yTrue, yPred):
    vgg = VGG16(include_top=False, weights='imagenet', input_shape=image_shape)
    lossModel = Model(inputs=vgg.input, outputs=vgg.get_layer('block3_conv3').output)
    lossModel.trainable = False
    return K.mean(K.square(lossModel(yTrue) - lossModel(yPred)))

def lossGenerator(yTrue, yPred):
    return K_1 * lossPerceptual(yTrue, yPred) + K_2 * lossL1(yTrue, yPred)

def lossAdversarial(yTrue, yPred):
    return -K.log(yPred)

def denseBlock(inputs, dilationFactor=None):
    x = LeakyReLU(alpha=0.2)(inputs)
    x = Convolution2D(filters=4 * channel_rate, kernel_size=(1, 1), padding='same')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    
    if dilationFactor is not None:
        x = Convolution2D(filters=channel_rate, kernel_size=(3, 3), padding='same',
                          dilation_rate=dilationFactor)(x)
    else:
        x = Convolution2D(filters=channel_rate, kernel_size=(3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Dropout(rate=0.5)(x)
    return x


def generatorModel():
    in_gen = Input(shape=(None, None, 3))
    head = Convolution2D(filters=4 * channel_rate, kernel_size=(3, 3), padding='same')(in_gen)
    dense1 = denseBlock(inputs=head)
    x = concatenate([head, dense1])
    dense2 = denseBlock(inputs=x, dilationFactor=(1, 1))
    x = concatenate([x, dense2])
    dense3 = denseBlock(inputs=x)
    x = concatenate([x, dense3])
    dense4 = denseBlock(inputs=x, dilationFactor=(2, 2))
    x = concatenate([x, dense4])
    dense5 = denseBlock(inputs=x)
    x = concatenate([x, dense5])
    dense6 = denseBlock(inputs=x, dilationFactor=(3, 3))
    x = concatenate([x, dense6])
    dense7 = denseBlock(inputs=x)
    x = concatenate([x, dense7])
    dense8 = denseBlock(inputs=x, dilationFactor=(2, 2))
    x = concatenate([x, dense8])
    dense9 = denseBlock(inputs=x)
    x = concatenate([x, dense9])
    dense10 = denseBlock(inputs=x, dilationFactor=(1, 1))
    x = LeakyReLU(alpha=0.2)(dense10)
    x = Convolution2D(filters=4 * channel_rate, kernel_size=(1, 1), padding='same')(x)
    x = BatchNormalization()(x)
    x = concatenate([head, x])
    x = Convolution2D(filters=channel_rate, kernel_size=(3, 3), padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    out_gen = Convolution2D(filters=3, kernel_size=(3, 3), padding='same', activation='tanh')(x)
    
    model = Model(inputs=in_gen, outputs=out_gen, name='Generator')
    return model


def discriminatorModel():
    in_dis = Input(shape=patch_shape)
    x = Convolution2D(filters=channel_rate, kernel_size=(3, 3), strides=(2, 2), padding="same")(in_dis)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Convolution2D(filters=2 * channel_rate, kernel_size=(3, 3), strides=(2, 2), padding="same")(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Convolution2D(filters=4 * channel_rate, kernel_size=(3, 3), strides=(2, 2), padding="same")(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Convolution2D(filters=4 * channel_rate, kernel_size=(3, 3), strides=(2, 2), padding="same")(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Flatten()(x)
    out_dis = Dense(units=1, activation='sigmoid')(x)
    model = Model(inputs=in_dis, outputs=out_dis, name='Discriminator')

    inputs = Input(shape=image_shape)

    list_row_idx = [(i * channel_rate, (i + 1) * channel_rate) for i in
                    range(int(image_shape[0] / patch_shape[0]))]
    list_col_idx = [(i * channel_rate, (i + 1) * channel_rate) for i in
                    range(int(image_shape[1] / patch_shape[1]))]

    list_patch = []
    for row_idx in list_row_idx:
        for col_idx in list_col_idx:
            x_patch = Lambda(lambda z: z[:, row_idx[0]:row_idx[1], col_idx[0]:col_idx[1], :])(inputs)
            list_patch.append(x_patch)

    x = [model(patch) for patch in list_patch]
    outputs = Average()(x)
    model = Model(inputs=inputs, outputs=outputs, name='Discriminator')
    return model

def generatorWithDiscriminator(generator, discriminator):
    inputs = Input(shape=image_shape)
    generated_image = generator(inputs)
    outputs = discriminator(generated_image)
    model = Model(inputs=inputs, outputs=outputs)
    return model

def train(batch_size, epoch_num):
    yTrain, xTrain = loadData(data_type='train')

    g = generatorModel()
    d = discriminatorModel()
    d_on_g = generatorWithDiscriminator(g, d)

    g.compile(optimizer='adam', loss=lossGenerator)
    d.compile(optimizer='adam', loss='binary_crossentropy')
    d_on_g.compile(optimizer='adam', loss=lossAdversarial)

    for epoch in range(epoch_num):
        print('epoch: ', epoch + 1, '/', epoch_num)
        print('batches: ', int(xTrain.shape[0] / batch_size))

        for index in range(int(xTrain.shape[0] / batch_size)):
            bImageBatch = xTrain[index * batch_size:(index + 1) * batch_size]
            fImageBatch = yTrain[index * batch_size:(index + 1) * batch_size]
            gImages = g.predict(x=bImageBatch, batch_size=batch_size)

            if (index % 30 == 0) and (index != 0):
                generateImage(fImageBatch, bImageBatch, gImages,
                                          'result/interim/', epoch, index)

            x = np.concatenate((fImageBatch, gImages))

            y = [1] * batch_size + [0] * batch_size

            dLoss = d.train_on_batch(x, y)
            print('batch %d d_loss : %f' % (index + 1, dLoss))

            d.trainable = False

            d_on_gLoss = d_on_g.train_on_batch(bImageBatch, [1] * batch_size)
            print('batch %d d_on_g_loss : %f' % (index + 1, d_on_gLoss))

            gLoss = g.train_on_batch(bImageBatch, fImageBatch)
            print('batch %d g_loss : %f' % (index + 1, gLoss))

            d.trainable = True

            if (index % 30 == 0) and (index != 0):
                g.save_weights('weight/generator_weights.h5', True)
                d.save_weights('weight/discriminator_weights.h5', True)


def test(batch_size):
    yTest, xTest = loadData(data_type='test')
    g = generatorModel()
    g.load_weights('weight/generator_weights.h5')
    gImages = g.predict(x=xTest, batch_size=batch_size)
    generateImage(yTest, xTest, gImages, 'result/final/')


def testPictures(batch_size):
    dataPath = 'data/test/*.jpeg'
    imagesPath = gb.glob(dataPath)
    bData = []
    for imagePath in imagesPath:
        bImage = Image.open(imagePath)
        bData.append(np.array(bImage))

    bData = np.array(bData).astype(np.float32)
    bData = normalization(bData)

    g = generatorModel()
    g.load_weights('weight/generator_weights.h5')
    gImages = g.predict(x=bData, batch_size=batch_size)
    generated = gImages * 127.5 + 127.5
    for i in range(generated.shape[0]):
        image_generated = generated[i, :, :, :]
        Image.fromarray(image_generated.astype(np.uint8)).save('result/test/' + str(i) + '.png')

In [None]:
train(batch_size=2, epoch_num=10)
test(4)
testPictures(2)

epoch:  1 / 10
batches:  302
