In [None]:
import glob
import os
import numpy as np

from PIL import Image
import matplotlib.pyplot as plt

import tensorflow as tf

In [None]:
%%capture
!wget https://people.eecs.berkeley.edu/~tinghuiz/projects/pix2pix/datasets/facades.tar.gz -O facades.tar.gz
!tar -zxvf facades.tar.gz -C ./

In [None]:
print("학습 데이터셋 A와 B의 개수:", len(next(os.walk('./facades/train/'))[2]))
print("평가 데이터셋 A와 B의 개수:", len(next(os.walk('./facades/val/'))[2]))
print("테스트 데이터셋 A와 B의 개수:", len(next(os.walk('./facades/test/'))[2]))

In [None]:
# 한 쌍의 이미지 출력(왼쪽은 정답 이미지, 오른쪽은 조건 이미지)
image = Image.open('./facades/train/1.jpg')
print("이미지 크기:", image.size)

plt.imshow(image)
plt.show()

In [None]:
def load(file) :
    image = tf.io.read_file(file)
    image = tf.io.decode_jpeg(image)

    w = tf.shape(image)[1]
    img_A = image[:, w//2 :, :]
    img_B = image[:, : w//2, :]

    img_A = tf.cast(img_A, tf.float32)
    img_B = tf.cast(img_B, tf.float32)

    return img_A, img_B

def resize(img_A, img_B, height, width) :
    img_A = tf.image.resize(img_A, [height, width],
                            method=tf.image.ResizeMethod.BICUBIC)
    img_B = tf.image.resize(img_B, [height, width],
                        method=tf.image.ResizeMethod.BICUBIC)
    return img_A, img_B

def normalize(img_A, img_B) :
    img_A /= 255.
    img_B /= 255.
    return img_A, img_B

def standardlize(img_A, img_B) :
    img_A = ( img_A - 0.5 ) / 0.5
    img_B = ( img_B - 0.5 ) / 0.5
    return img_A, img_B

@tf.function()
def transform(img_A, img_B) :
    if np.random.random() < 0.5:
        img_A = tf.image.flip_left_right(img_A)
        img_B = tf.image.flip_left_right(img_B)

    img_A, img_B = resize(img_A, img_B, 256, 256)
    img_A, img_B = normalize(img_A, img_B)
    img_A, img_B = standardlize(img_A, img_B)
    return img_A, img_B

In [None]:
train_files = sorted(glob.glob(os.path.join("facades", "train") + "/*.jpg"))
# 데이터의 개수가 적기 때문에 테스트 데이터를 학습 시기에 사용
train_files.extend(sorted(glob.glob(os.path.join("facades", "test") + "/*.jpg")))
test_files = sorted(glob.glob(os.path.join("facades", "val") + "/*.jpg"))

In [None]:
BUFFER_SIZE = 500
BATCH_SIZE = 10

def load_image_file(image_file):
  img_A, img_B = load(image_file)
  img_A, img_B = transform(img_A, img_B)

  return img_A, img_B

train_dataset = tf.data.Dataset.from_tensor_slices(train_files)
train_dataset = train_dataset.map(load_image_file,
                                  num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE)

test_dataset = tf.data.Dataset.from_tensor_slices(test_files)
test_dataset = train_dataset.map(load_image_file,
                                  num_parallel_calls=tf.data.AUTOTUNE)
test_dataset = train_dataset.shuffle(BUFFER_SIZE)
test_dataset = train_dataset.batch(BATCH_SIZE)

In [None]:
def UNetDown(in_channels, out_channels, normalize=True, dropout=0.0) :
    initializer = tf.random_normal_initializer(0.0, 0.02)

    layers = [ tf.keras.layers.Conv2D(filters = out_channels, kernel_size = 4, strides=2, padding='same',
                             kernel_initializer=initializer, use_bias=False) ]
    if normalize :
        layers.append( tf.keras.layers.BatchNormalization(axis=[0,1]) )
    if dropout :
        layers.append( tf.keras.layers.Dropout(dropout) )
    layers.append(tf.keras.layers.LeakyReLU(0.2))
    model = tf.keras.Sequential( layers )
    return model

def UNetUp(in_channels, out_channels, dropout=0.0) :
    initializer = tf.random_normal_initializer(0.0, 0.02)
    layers = [ tf.keras.layers.Conv2DTranspose(filters = out_channels, kernel_size = 4, strides=2,
                                    padding='same',
                                    kernel_initializer=initializer,
                                    use_bias=False) ]
    layers.append( tf.keras.layers.BatchNormalization(axis=[0,1]) )
    layers.append( tf.keras.layers.ReLU() )
    if dropout :
        layers.append( tf.keras.layers.Dropout(dropout) )
    model = tf.keras.Sequential(layers)
    return model

def GeneratorUNet(in_channels = 3, out_channels = 3):
    inputs = tf.keras.layers.Input(shape=[256, 256, in_channels])
    downs = [
        UNetDown(3, 64, normalize=False),
        UNetDown(64, 128),
        UNetDown(128, 256),
        UNetDown(256, 512, dropout = 0.5),
        UNetDown(512, 512, dropout = 0.5),
        UNetDown(512, 512, dropout = 0.5),
        UNetDown(512, 512, dropout = 0.5),
        UNetDown(512, 512, normalize = False, dropout = 0.5),
    ]
    ups = [
        UNetUp(512, 512, dropout=0.5),
        UNetUp(1024, 512, dropout=0.5),
        UNetUp(1024, 512, dropout=0.5),  
        UNetUp(1024, 512, dropout=0.5),  
        UNetUp(1024, 256),  
        UNetUp(512, 128),  
        UNetUp(256, 64)  
    ]

    final_init = tf.random_normal_initializer(0.0, 0.02)
    final = tf.keras.layers.Conv2DTranspose(out_channels, kennel_size= 4, strides=2,
                                         padding='same',
                                         kernel_initializer=final_init,
                                         activation='tanh')  # (batch_size, 256, 256, 3)


    ### forward   
    x = inputs
    skip_connections = list()
    for i in range(len(downs)) :
        x = downs[i](x)
        if i < len(downs)-1 :
            skip_connections.append(x)

    skip_connections.reverse()

    for i in range(len(ups)) :
        x = ups[i](x)
        x = tf.keras.layers.Concatenate()([x, skip_connections[i]])
    x = final(x)

    return tf.keras.Model(inputs = inputs, outputs = x)

def Discriminator(in_channels = 3) :

    def discriminator_block(in_channels, out_channels, normalization=True):
        initializer = tf.random_normal_initializer(0., 0.02)
        layers = [ tf.keras.layers.Conv2D(filters = out_channels, kernel_size = 4, strides=2, padding='same',
                                    kernel_initializer=initializer, use_bias=False) ]
        if normalization :
            layers.append( tf.keras.layers.BatchNormalization(axis=[0,1]) )
        layers.append(tf.keras.layers.LeakyReLU(0.2))
        return layers
    
    initializer = tf.random_normal_initializer(0., 0.02)
    layers = list()
    layers.extend( discriminator_block(in_channels*2, 64, normalization = False))
    layers.extend( discriminator_block(64, 128) )
    layers.extend( discriminator_block(128, 256) )
    layers.extend( discriminator_block(256, 512) )

    layers.append(tf.keras.layers.ZeroPadding2D(padding = ((1, 0), (1, 0))))
    layers.append(tf.keras.layers.Conv2D(filters = 1, kernel_size = 4, strides=1, padding='same',
                                    kernel_initializer=initializer, use_bias=False))
    model = tf.keras.Sequential(layers)

    ### forward
    img_A = tf.keras.layers.Input(shape=[256, 256, 3], name='input')
    img_B = tf.keras.layers.Input(shape=[256, 256, 3], name='target')
    x = tf.keras.layers.Concatenate()([img_A, img_B])
    x = model(x)

    return tf.keras.Model(inputs = [img_A, img_B], outputs = x)

In [None]:
lambda_pixel = 100

criterion_GAN =  tf.keras.losses.MeanSquaredError()
def criterion_pixelwise(target, gen_output) : # MAE
    return tf.reduce_mean(tf.abs(target - gen_output))

def g_loss(disc_generated_output, gen_output, target):
  loss_GAN = criterion_GAN(tf.ones_like(disc_generated_output), disc_generated_output)
  loss_pixel = criterion_pixelwise(target, gen_output)
  loss_G = loss_GAN + lambda_pixel * loss_pixel

  return loss_G

def d_loss(disc_real_output, disc_generated_output):
  loss_real = criterion_GAN(tf.ones_like(disc_real_output), disc_real_output)
  loss_fake = criterion_GAN(tf.zeros_like(disc_generated_output), disc_generated_output)

  loss_D = loss_real + loss_fake

  return loss_D

In [None]:
generator = GeneratorUNet()
discriminator = Discriminator()

lr = 0.0002

optimizer_G = tf.keras.optimizers.Adam(lr, beta_1=0.5)
optimizer_D = tf.keras.optimizers.Adam(lr, beta_1=0.5)

In [None]:
@tf.function
def train_step(input_image, target):
  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
    gen_output = generator(input_image, training=True)

    disc_real_output = discriminator([input_image, target], training=True)
    disc_generated_output = discriminator([input_image, gen_output], training=True)

    gen_total_loss = g_loss(disc_generated_output, gen_output, target)
    disc_loss = d_loss(disc_real_output, disc_generated_output)

  generator_gradients = gen_tape.gradient(gen_total_loss,
                                          generator.trainable_variables)
  discriminator_gradients = disc_tape.gradient(disc_loss,
                                               discriminator.trainable_variables)

  optimizer_G.apply_gradients(zip(generator_gradients,
                                          generator.trainable_variables))
  optimizer_D.apply_gradients(zip(discriminator_gradients,
                                          discriminator.trainable_variables))
  
  