In [53]:
import numpy as np
from keras.models import Model
from keras.layers import Input, BatchNormalization, Dense, Add, Reshape, Permute
from keras.layers.convolutional import Conv2D
from keras.layers.advanced_activations import PReLU, LeakyReLU
from keras.optimizers import Adam

import cv2
import matplotlib.pyplot as plt


ratio = 4
LR_shape = np.array([120, 160, 3])
g_weight_name = "g_weight_01.h5"
d_weight_name = "d_weight_01.h5"

L_h, L_w, c = LR_shape
H_h = L_h * ratio
H_w = L_w * ratio
HR_shape = np.array([H_h, H_w, c])

optimizer = Adam()


        
def pixel_shuffle(in_map, h, w, c = channels):
    x = Reshape((h, w, 2, 2, c))(in_map)
    x = Permute((3, 1, 4, 2, 5))(x)
    out_map = Reshape((2 * h, 2 * w, c))(x)
    return Model(in_map, out_map)

def upsampling(in_map, h, w, c):
    x = Conv2D(filters = 4 * c, 
                     kernel_size = 3,
                     strides = 1,
                     padding = "same")(in_map)
    x = pixel_shuffle(x, h, w, c)
    out_map = PReLU()(x)
    return Model(in_map, out_map)

def residual_block(in_map):
    x = Conv2D(filters = 64, kernel_size = 3, strides = 1, padding = "same")(in_map)
    x = BatchNormalization()(x)
    x = PReLU()(x)
    x = Conv2D(filters = 64, kernel_size = 3, strides = 1, padding = "same")(x)
    x = BatchNormalization()(x)
    out_map = Add()([x, in_map])
    return out_map

def d_block(in_map, filters, kernel_size, strides, padding):
    d = Conv2D(filters = filters, kernel_size = kernel_size, strides = strides, padding = padding)(in_map)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha = 0.2)(d)
    return d

def denormalize(img):
    img = (img + 1) * 127.5
    return img.astype(np.uint8)


def save_images(epoch, hr_img, lr_img, sr_img):
    hr_img = denormalize(hr_img)
    lr_img = denormalize(lr_img)
    sr_img = denormalize(sr_img)
    
    cv2.imwrite("../../images/output/" + str(epoch+1) + "_HRimage.png", hr_img)
    cv2.imwrite("../../images/output/" + str(epoch+1) + "_LRimage.png", lr_img)
    cv2.imwrite("../../images/output/" + str(epoch+1) + "_SRimage.png", sr_img)

    
def psnr_calc(img1: np.ndarray, img2: np.ndarray):
    def convert(img):
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    def extract_y(image: np.ndarray) -> np.ndarray:
        if image.ndim == 2:
            return image
        image = image.astype(np.int32)
        return ((image[:, :, 2] * 65.481 / 255.
                  + image[:, :, 1] * 128.553 / 255.
                  + image[:, :, 0] * 24.966 / 255.) + 16).astype(np.int32)


    def psnr(img1, img2):
        mse = np.mean((img1 - img2) ** 2)
        if mse == 0:
            return 100
        PIXEL_MAX = 255.0
        return 10 * math.log10(PIXEL_MAX * PIXEL_MAX / mse)

    img1_conv=convert(img1)
    img2_conv=convert(img2)

    # BGR -> YCrCb
    # 画像はcv2.imreadで読まれている前提 [0, 255]
    y1 = extract_y(img1_conv)
    y2 = extract_y(img2_conv)
    # 周囲のcropping
    # assert y1.shape == y2.shape
    h, w = y1.shape
    cr = ratio
    cropped_y1 = y1[cr:h - cr, cr:w - cr].astype(np.float64)
    cropped_y2 = y2[cr:h - cr, cr:w - cr].astype(np.float64)

    # psnr
    psnr_val = psnr(cropped_y1, cropped_y2)
    return psnr_val

    
        
        
        

In [54]:
def build_generator():
    input_img = Input(shape = LR_shape)
    middle = Conv2D(filters = 64, kernel_size = 9, strides = 1, padding = "same")(input_img)
    middle = PReLU()(middle)
    
    g = residual_block(middle)
    for _ in range(4):
        g = residual_block(g)

    g = Conv2D(filters = 64, kernel_size = 3, strides = 1, padding = "same")(g)
    g = BatchNormalization()(g)
    g = Add()([g, middle])

    n = ratio
    i = 1
    while(n % 2 == 0):
        g = upsampling(g, L_h * i, L_w * i, c)
        i = i * 2
        n = n // 2

    output_img = Conv2D(filters = 3, kernel_size = 9, strides = 1, padding = "same")(g)

    return Model(input_img, output_img)


#------------------------------------------------------------------------#


def build_discriminator():
    input_img = Input(shape = HR_shape)
    
    d = Conv2D(filters = 64, kernel_size = 3, strides = 1, padding = "same")(input_img)
    d = d_block(d, filters = 64, kernel_size = 3, strides = 2, padding = "same")
    d = d_block(d, filters = 128, kernel_size = 3, strides = 1, padding = "same")
    d = d_block(d, filters = 128, kernel_size = 3, strides = 2, padding = "same")
    d = d_block(d, filters = 256, kernel_size = 3, strides = 1, padding = "same")
    d = d_block(d, filters = 256, kernel_size = 3, strides = 2, padding = "same")
    d = d_block(d, filters = 512, kernel_size = 3, strides = 1, padding = "same")
    d = d_block(d, filters = 512, kernel_size = 3, strides = 2, padding = "same")
    d = Dense(1024)(d)
    d = LeakyReLU(alpha = 0.2)(d)
    output_img = Dense(1, activation = "sigmoid")(d)

    return Model(input_img, output_img)

#------------------------------------------------------------------------#

def build_vgg():
    vgg = VGG19(include_top = False)
    return Model(vgg.input, vgg.layers[9].output)
    
#------------------------------------------------------------------------#

def combined(generator, discriminator, vgg):
    input_img = Input(shape = LR_shape)
    fake_img = generator(input_img)
    
    validity = discriminator(fake_img)
    features = vgg(fake_img)
    
    return Model(input_img, [validity, features])




In [55]:
losses = []
epochs_checkpoint = []
psnr = []

def train(epochs, batch_size, interval):
    
    start_time = datetime.datetime.now()
    
    real = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))
    
    for epoch in range(epochs):
        
        real_imgs, lr_imgs = load_data(batch_size)
        fake_imgs = generator.predict(lr_imgs)
        
        #Dの訓練
        d_loss_real = self.discriminator.train_on_batch(real_imgs, valid)
        d_loss_fake = self.discriminator.train_on_batch(fake_imgs, fake)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
        
        #Gの訓練
        vgg_features = vgg.predict(real_imgs)
        g_loss = srgan.train_on_batch(lr_imgs, [real, vgg_features])
        
        time = datetime.datetime.now() - start_time
        print("%d time: %s" % (epoch, time))
        
        if epoch+1 % interval == 0:
            losses.append((d_loss, g_loss))
            epochs_checkpoint.append(epoch+1)
            save_images(epoch, real_imgs[0], lr_imgs[0], fake_imgs[0])
            psnr.append(calc_psnr(real_imgs[0], fake_imgs[0]))
            
    generator.save_weights(g_weight_name)
    discriminator.save_weights(d_weight_name)
        

In [56]:
#------------------------------------
# メインプログラム
#------------------------------------

#Discriminator
discriminator = build_discriminator()
discriminator.compile(loss = "mse",
                      optimizer = optimizer,
                      metrics = ["accuracy"])
#Generator
generator = build_generator()
discriminator.trainable = False
srgan = combined(generator, discriminator)
srgan.compile(loss=['binary_crossentropy', 'mse'],
                              loss_weights=[1e-3, 1],
                              optimizer=optimizer)



Note that input tensors are instantiated via `tensor = keras.layers.Input(shape)`.
The tensor that caused the issue was: conv2d_280/BiasAdd:0
  str(x.name))


ValueError: Graph disconnected: cannot obtain value for tensor Tensor("input_28:0", shape=(None, 120, 160, 3), dtype=float32) at layer "input_28". The following previous layers were accessed without issue: []

In [None]:
train(epochs = 1000, batch_size = 1, interval = 100)

In [20]:
from PIL import Image
D = Discriminator(480, 640, 3)
img = Image.open("../../images/train/image_1.png")
img = np.array(img)
valid = D.discriminate(img)
print(valid)

Tensor("dense_2/Sigmoid:0", shape=(?, 30, 40, 1), dtype=float32)


In [None]:
import keras