<a href="https://colab.research.google.com/github/hansong0219/Advanced-Deep-learning-Notebooks/blob/master/Cross-Domain/CycleGAN_ResNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ResNet 생성자를 이용한 Cycle GAN 모델 

ResNet 구조는 이전 층의 전보를 네트워 앞쪽에 한개 이상의 층으로 스킵한다는 부분에서 U-Net 과 비슷하나, U-Net 은 다운샘플링 층을 이에 상응하는 업샘플링 층으로 연결하여 U 모양을 구성하는 대신 ResNet은 Residual block 을 차례대로 쌓아 구성하게된다. 

각 블록은 다음의 층으로 출력을 전달하기 전에 입력과 출력을 합하는 스킵 연결층을 가지고 있다.

ResNet  구조는 수백 또는 수천개의 층도 훈련할 수 있는데 앞쪽에 층에 도달하는 그레디언트가 매우 작아져 매우 느리게 훈련되는 vanishing gradient 문제가 없고,Error gradient 가 Residual Block 의 스킵 연결을 통해 네트워크에 그대로 역전파 되기 때문이다. 또, 층을 추가해도 모델의 정확도를 떨어뜨리지 않는데 추가적인 특성이 추출되지 않는다면, 스킵연결로 인해 언제든지 이전 층의 특성이 identify mapping 을 통과하기 때문이다.

본 Notebook 에서는 Residual block 을 사용한 생성자를 구성하여 Image Style Transfer 를 수행할 예정이다.

In [None]:
import numpy as np
import os
import sys
from tensorflow.keras.layers import Input, Dropout, concatenate, add, Layer
from tensorflow.keras.layers import Conv2DTranspose, Conv2D 
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LeakyReLU, Activation
from tensorflow.keras.layers import BatchNormalization
from tensorflow_addons.layers import InstanceNormalization
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.utils import plot_model
from tensorflow.keras.losses import BinaryCrossentropy
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras

import cv2
import math
import datetime
import imageio
from glob import glob

# GPU 할당

In [None]:
import tensorflow as tf 
physical_devices =tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(physical_devices[0],True)

# 유틸 함수 정의


### 이미지 및 데이터 재구성 유틸

In [None]:
def display_images(imgs, filename, title='', imgs_dir=None, show=False):
  
    #이미지를 nxn 으로 나타냄
    rows = imgs.shape[1]
    cols = imgs.shape[2]
    channels = imgs.shape[3]
    side = int(math.sqrt(imgs.shape[0]))
    assert int(side * side) == imgs.shape[0]

    # 이미지 저장을 위한 폴더를 만듦
    if imgs_dir is None:
        imgs_dir = 'saved_images'
    save_dir = os.path.join(os.getcwd(), imgs_dir)
    if not os.path.isdir(save_dir):
        os.makedirs(save_dir)
    filename = os.path.join(imgs_dir, filename)
    # 이미지의 shape 을 지정
    if channels==1:
        imgs = imgs.reshape((side, side, rows, cols))
    else:
        imgs = imgs.reshape((side, side, rows, cols, channels))
    imgs = np.vstack([np.hstack(i) for i in imgs])
    
    if np.min(imgs)< 0:
      imgs = imgs * 0.5 + 0.5
    
    plt.figure(figsize = (8,8))
    plt.axis('off')
    plt.title(title)
    if channels==1:
        plt.imshow(imgs, interpolation='none', cmap='gray')
    else:
        plt.imshow(imgs, interpolation='none')
    plt.savefig(filename)
    if show:
        plt.show()
    
    plt.close('all')


def test_generator(generators, test_data, step, titles, dirs, todisplay=4, show=False):
    # generator 모델을 테스트함
    # 입력 인수
    """
    generators (tuple): 소스와 타겟 생성기
    test_data (tuple): 소스와 타겟 데이터
    step (int): 진행 단계
    titles (tuple): 표시 이미지의 타이틀
    dirs (tuple): 이미지 저장 폴더
    todisplay (int): 저장이미지의 수 (정사각형 형태로 생성되어야 한다.)
    show (bool): 이미지 표시 여부
    """

    # test data 로 부터 output 예측
    g_source, g_target = generators
    test_source_data, test_target_data = test_data
    t1, t2, t3, t4 = titles
    title_pred_source = t1
    title_pred_target = t2
    title_reco_source = t3
    title_reco_target = t4
    dir_pred_source, dir_pred_target = dirs

    pred_target_data = g_target.predict(test_source_data)
    pred_source_data = g_source.predict(test_target_data)
    reco_source_data = g_source.predict(pred_target_data)
    reco_target_data = g_target.predict(pred_source_data)

    # 정사각형 형태의 하나의 이미지로 나타냄
    imgs = pred_target_data[:todisplay]
    filename = '%06d.png' % step
    step = " Step: {:,}".format(step)
    title = title_pred_target + step
    display_images(imgs,
                   filename=filename,
                   imgs_dir=dir_pred_target,
                   title=title,
                   show=show)

    imgs = pred_source_data[:todisplay]
    title = title_pred_source
    display_images(imgs,
                   filename=filename,
                   imgs_dir=dir_pred_source,
                   title=title,
                   show=show)

    imgs = reco_source_data[:todisplay]
    title = title_reco_source
    filename = "reconstructed_source.png"
    display_images(imgs,
                   filename=filename,
                   imgs_dir=dir_pred_source,
                   title=title,
                   show=show)

    imgs = reco_target_data[:todisplay]
    title = title_reco_target
    filename = "reconstructed_target.png"
    display_images(imgs,
                   filename=filename,
                   imgs_dir=dir_pred_target,
                   title=title,
                   show=show)


def process_data(data, titles, filenames, todisplay=4):
    source_data, target_data, test_source_data, test_target_data = data
    test_source_filename, test_target_filename = filenames
    test_source_title, test_target_title = titles

    # 테스트 타겟 이미지 표시
    imgs = test_target_data[:todisplay]
    display_images(imgs,
                   filename=test_target_filename,
                   title=test_target_title)

    # 테스트 소스이미지 표시
    imgs = test_source_data[:todisplay]
    display_images(imgs,
                   filename=test_source_filename,
                   title=test_source_title)

    # 이미지 표시 정리
    target_data = target_data.astype('float32')  / 127.5 - 1
    test_target_data = test_target_data.astype('float32') / 127.5 - 1

    source_data = source_data.astype('float32')  / 127.5 - 1
    test_source_data = test_source_data.astype('float32') / 127.5 - 1

    # 소스, 타겟, 테스트 데이터
    data = (source_data, target_data, test_source_data, test_target_data)

    rows = source_data.shape[1]
    cols = source_data.shape[2]
    channels = source_data.shape[3]
    source_shape = (rows, cols, channels)

    rows = target_data.shape[1]
    cols = target_data.shape[2]
    channels = target_data.shape[3]
    target_shape = (rows, cols, channels)

    shapes = (source_shape, target_shape)
    
    return data, shapes

### 데이터 불러오기 유틸

In [None]:
def imread(path):
    return imageio.imread(path,as_gray=False,pilmode='RGB').astype(np.float)

def load_data(dataset_path, is_test=False):
  data_type = "train" if not is_test else "test"
  path_source = glob(dataset_path + '/%sA/*' %(data_type))
  path_target = glob(dataset_path + '/%sB/*' %(data_type))
    
  source_data, target_data = [], []
  for source, target in zip(path_source, path_target):
    img_source = imread(source)
    img_target = imread(target)

    img_source = np.array(img_source)
    img_target = np.array(img_target)

    if is_test and np.random.random()>0.5:
      img_source = np.fliplr(img_source)
      img_target = np.fliplr(img_target)
    
    source_data.append(img_source)
    target_data.append(img_target)
    
  return np.array(source_data), np.array(target_data)

In [None]:
# 데이터 불러오기 및 확인
dataset_path = "D:/data/vangogh2photo"
source_data, target_data = load_data(dataset_path)

plt.figure()
plt.imshow(source_data[0]/255.0)
plt.figure()
plt.imshow(target_data[0]/255.0)

# 모델 구성

### ReflectionPadding2D

CycleGAN 논문 및 Source 를 통해 ResNet 생성기의 Layer 구조에서 Reflection Padding을 사용하는 것이 ZerosPadding을 사용하는 Conv2D의 padding=same 조건에 비해 권장 되었다. 따라서 ResNet 생성기에는 ReflectionPadding2D의 Class를 지정하여 사용한다.

In [None]:
class ReflectionPadding2D(Layer):
    # ReflectionPadding Layer 를 실행할 수 있도록 구성된 class
    
    # 입력인수
    """
    padding(tuple): padding을 위한 특정 차원의 크기로 지정된다. 
    """
    # 출력 : padding 이 실행된 텐서를 출력한다.

    def __init__(self, padding=(1, 1), **kwargs):
        self.padding = tuple(padding)
        super(ReflectionPadding2D, self).__init__(**kwargs)

    def call(self, input_tensor, mask=None):
        padding_width, padding_height = self.padding
        padding_tensor = [
            [0, 0],
            [padding_height, padding_height],
            [padding_width, padding_width],
            [0, 0],
        ]
        return tf.pad(input_tensor, padding_tensor, mode="REFLECT")

# 모델 Build 함수 정의

### 모델 구성층 구성 함수
ResNet 을 생성자로 하는 Cycle GAN 의 구성요소는 크게 3가지로 분류할 수 있다. 

1. 생성자와 판별자의 encoder(downsampling) layer
2. 생성자의 Residual Block Unit
3. 생성자의 decoder(upsampling) layer

In [None]:
def encoder_layer(inputs, filters=16, kernel_size=3, strides = 2,  activation='relu', instance_norm=True):
  #Conv2D -IN - ReLU(LeakyReLU) 의 인코더 층을 구성한다.
  kernel_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

  x = inputs
  x = Conv2D(filters, kernel_size=kernel_size, strides=strides,kernel_initializer=kernel_init, padding = 'same')(x)
  if instance_norm:
    x = InstanceNormalization()(x)

  if activation=='relu':
    x = Activation(activation)(x)
  else:
    x = LeakyReLU(alpha=0.2)(x)

  return x

def decoder_layer(inputs, filters=16, kernel_size=3, strides=2, activation='relu', instance_norm = True):
  #Conv2DTranspose-IN-LeakyReLU로 구성된 디코더 계층구성, 활성화 함수는 ReLU 로 교체될 수 있음
  kernel_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

  x = inputs
  x = Conv2DTranspose(filters, kernel_size=kernel_size, strides=strides, kernel_initializer = kernel_init, padding='same')(x)
  if instance_norm:
    x = InstanceNormalization()(x)
  
  if activation=='relu':
    x = Activation(activation)(x)
  else:
    x = LeakyReLU(alpha=0.2)(x)

  return x

def residual_block(inputs, filters=64, kernel_size=3, resblock=2):
  # shorcut 연결 구현한 Residual Block
  kernel_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
  shortcut = inputs

  for num_unit in range(resblock):
    if num_unit == 0:
      x = ReflectionPadding2D(padding=(1,1))(inputs)
    else:
      x = ReflectionPadding2D(padding=(1,1))(x)
    
    x = Conv2D(filters, kernel_size=kernel_size, strides=1, padding='valid', kernel_initializer=kernel_init)(x)
    x = InstanceNormalization()(x)

    if num_unit != resblock-1:
      x = Activation('relu')(x)
 
  return add([shortcut, x])

### 생성기 Build 함수 구현

In [None]:
# 생성기 Build 함수
def build_generator(input_shape, output_shape=None, filters = 64, num_encoders = 2, num_residual_blocks= 9, num_decoders = 2, name=None):
  inputs = Input(shape=input_shape)
  channels = int(output_shape[-1])
  kernel_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

  x = ReflectionPadding2D(padding=(3, 3))(inputs)
  x = Conv2D(filters, kernel_size=7, strides=1, padding='valid', kernel_initializer = kernel_init, use_bias=False)(x)
  x = InstanceNormalization()(x)
  x = Activation('relu')(x)

  #DownSampling
  for _ in range(num_encoders):
    filters *=2
    x = encoder_layer(x, filters=filters)
  
  #Residual Block 층 구성
  for _ in range(num_residual_blocks):
    x = residual_block(x, filters=filters)

  #UpSampling
  for _ in range(num_decoders):
    filters//=2
    x = decoder_layer(x, filters=filters)

  #Final Block
  x = ReflectionPadding2D(padding=(3, 3))(x)
  x = Conv2D(channels, (7, 7), strides=1, padding="valid")(x)
  x = Activation("tanh")(x)

  return Model(inputs, x, name=name)

### 판별자 Build 함수 (Patch GAN)

In [None]:
def build_discriminator(input_shape, filters=64, kernel_size=4, num_encoder=4, name=None):
  kernel_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
  
  inputs = Input(shape=input_shape)
  x = encoder_layer(inputs, filters = filters, kernel_size = kernel_size, strides=2, activation='leaky_relu', instance_norm=False) 

  num_filters=filters
  for num_block in range(num_encoder):
    num_filters *= 2
    if num_block < num_encoder-1:
      x = encoder_layer(x, filters=num_filters, kernel_size = kernel_size, strides = 2, activation='leaky_relu') 
    else:
      x = encoder_layer(x, filters=num_filters, kernel_size = kernel_size, strides = 1, activation='leaky_relu')

  outputs = Conv2D(1, (4,4), strides=1, padding='same', kernel_initializer = kernel_init)(x)

  return Model(inputs, outputs, name=name)

### Cycle GAN Build 함수 지정

In [None]:
def build_cyclegan(shape, source_name='source', target_name='target', kernel_size=3, patchgan=False, identify=False):
  #Cycle GAN 의 구성
  """
  1) 타깃과 소스의 판별기
  2) 타깃과 소스의 생성기
  3) 적대적 네트워크 구성
  """

  #입력 인수
  """
  shape(tuple): 소스와 타깃 형상
  source_name (string): 판별기/생성기 모델 이름 뒤에 붙는 소스이름 문자열
  target_name (string): 판별기/생성기 모델 이름 뒤에 붙는 타깃이름 문자열
  target_size (int): 인코더/디코더 혹은 판별기/생성기 모델에 사용될 커널 크기
  patchgan(bool): 판별기에 patchgan 사용 여부
  identify(bool): 동질성 사용 여부
  """
  #출력 결과:
  #list : 2개의 생성기, 2개의 판별기, 1개의 적대적 모델 

  source_shape, target_shape = shapes
  lr = 2e-4
  decay = 6e-8
  gt_name = "gen_" + target_name
  gs_name = "gen_" + source_name
  dt_name = "dis_" + target_name
  ds_name = "dis_" + source_name

  #타깃과 소스 생성기 구성
  g_target = build_generator(source_shape, target_shape, name=gt_name)
  g_source = build_generator(target_shape, source_shape, name=gs_name)

  print('-----Target Generator-----')
  g_target.summary()
  print('-----Source Generator-----')
  g_source.summary()

  #타깃과 소스 판별기 구성
  d_target = build_discriminator(target_shape, name=dt_name)
  d_source = build_discriminator(source_shape, name=ds_name)

  print('-----Targent Discriminator-----')
  d_target.summary()
  print('-----Source Discriminator-----')
  d_source.summary()

  optimizer = RMSprop(lr=lr, decay=decay)
  d_target.compile(loss='mse',optimizer=optimizer,metrics=['accuracy'])
  d_source.compile(loss='mse',optimizer=optimizer,metrics=['accuracy'])

  #적대적 모델에서 판별기 가중치 고정
  d_target.trainable=False
  d_source.trainable=False

  #적대적 모델의 계산 그래프 구성

  #전방순환 네트워크와 타깃 판별기 
  source_input = Input(shape=source_shape)
  fake_target = g_target(source_input)
  preal_target = d_target(fake_target)
  reco_source = g_source(fake_target)

  #후방순환 네트워크와 타깃 판별기
  target_input = Input(shape=target_shape)
  fake_source = g_source(target_input)
  preal_source = d_source(fake_source)
  reco_target = g_target(fake_source)

  #동질성 손실 사용 시, 두개의 손실항과 출력을 추가한다.
  if identify:
    iden_source = g_source(source_input)
    iden_target = g_target(target_input)
    loss = ['mse', 'mse', 'mae', 'mae', 'mae', 'mae']
    loss_weights = [1., 1., 10., 10., 0.5, 0.5]
    inputs = [source_input, target_input]
    outputs = [preal_source, preal_target, reco_source, reco_target, iden_source, iden_target]

  else:
    loss = ['mse', 'mse', 'mae', 'mae']
    loss_weights = [1., 1., 10., 10.]
    inputs = [source_input, target_input]
    outputs = [preal_source, preal_target, reco_source, reco_target]

  #적대적 모델 구성
  adv = Model(inputs, outputs, name='adversarial')
  optimizer = RMSprop(lr=lr*0.5, decay=decay*0.5)
  adv.compile(loss=loss, loss_weights=loss_weights, optimizer=optimizer)
  print('-----Adversarial Network-----')
  adv.summary()

  return g_source, g_target, d_source, d_target, adv

In [None]:
def train_cyclegan(models, data, params, test_params, test_generator, identify=False):
  #Cycle GAN 훈련
  """
  1) 타깃 판별기 훈련
  2) 소스 판별기 훈련
  3) 적대적 네트워크의 전방/ 후방 순환 훈련
  """
  # 입력 인수:
  """
  models (list): 소스/타깃에 대한 판별기/생성기, 적대적 모델
  data (tuple): 소스와 타깃 훈련데이터
  params (tuple): 네트워크 매개변수
  test_params (tuple): 테스트 매개변수
  test_generator (function): 예측 타깃/소스 이미지생성에 사용됨.
  """
  #모델
  g_source, g_target, d_source, d_target, adv = models
  #네트워크 매개변수 지정
  batch_size, train_steps, patch, model_name = params
  #훈련 데이터 세트 
  source_data, target_data, test_source_data, test_target_data = data
  titles, dirs = test_params

  #생성기 이미지는 2000단계마다 저장됨
  save_interval = 2000
  target_size = target_data.shape[0]
  source_size = source_data.shape[0]

  #patchgan 사용 여부
  if patch > 1:
    d_patch = (patch, patch, 1)
    valid = np.ones((batch_size,) + d_patch)
    fake = np.zeros((batch_size,) + d_patch)

  else:
    valid = np.ones([batch_size, 1])
    fake = np.zeros([batch_size, 1])

  valid_fake = np.concatenate((valid,fake))
  start_time = datetime.datetime.now()

  for step in range(train_steps):
    #실제 타깃 데이터 배치 샘플링
    rand_indices = np.random.randint(0, target_size, size=batch_size)
    real_target = target_data[rand_indices]

    #실제 소스 데이터 배치 샘플링
    rand_indices = np.random.randint(0, source_size, size=batch_size)
    real_source = source_data[rand_indices]

    #실제 소스 데이터에서 가짜 타깃 데이터 배치를 생성
    fake_target = g_target.predict(real_source)
    
    #실제 타겟 데이터와 가짜 타겟데이터를 하나의 배치로 결합
    x = np.concatenate((real_target, fake_target))
    #타겟 판별자를 훈련시킴
    metrics = d_target.train_on_batch(x, valid_fake)
    log = "%d: [d_target loss: %f]" %(step, metrics[0])

    #실제 타깃 데이터에서 가짜 소스 데이터 배치 생성
    fake_source = g_source.predict(real_target)
    x = np.concatenate((real_source, fake_source))
    #가짜/실제 데이터를 사용해 소스 판별기 훈련
    metrics = d_source.train_on_batch(x, valid_fake)
    log = "%s [d_source loss: %f]" %(log, metrics[0])
    
    #전방/후방 순환을 사용해 적대적 네트워크 훈련
    #생성된 가짜 소스/타깃 데이터는 판별기를 속이려고 시도함
    
    if identify:
        x = [real_source, real_target]
        y = [valid, valid, real_source, real_target, real_source, real_target]
    
    else:
        x = [real_source, real_target]
        y = [valid, valid, real_source, real_target]

    metrics = adv.train_on_batch(x, y)

    elapsed_time = datetime.datetime.now()-start_time
    fmt = "%s [adv loss: %f] [time: %s]"
    log = fmt %(log, metrics[0], elapsed_time)
    print(log)

    if (step+1) % save_interval == 0:
      if (step+1) == train_steps:
        show = True
      else:
        show = False

      test_generator((g_source, g_target), (test_source_data, test_target_data), step = step+1, titles=titles, dirs=dirs, show=show)

  g_source.save(model_name + "-g_source.h5")
  g_target.save(model_name + "-g_target.h5")

# 데이터 전처리

In [None]:
test_source_data, test_target_data = load_data(dataset_path, is_test=True)

filenames = ('vangogh_test_source.png', 'picture_test_target.png')
titles = ('Van Gogh test source images', 'picture test target images')
data = (source_data, target_data, test_source_data, test_target_data)

In [None]:
# 이미지 저장 및 shape 재지정
data, shapes = loaded_data(data, titles, filenames)

In [None]:
#model_fine tuning 변수 지정
model_name = 'cyclegan_vangogh'
batch_size = 1
train_steps = 100000
patchgan = True
kernel_size = 3
postfix = ('%dp' % kernel_size) if patchgan else ('%d' % kernel_size)

titles = ('vangogh2pic predicted source images.',
          'vangogh2pic predicted target images.',
          'vangogh2pic reconstructed source images.',
          'vangogh2pic reconstructed target images.')
dirs = ('vangogh2pic_source-%s' % postfix, 'vangogh2pic_target-%s' % postfix)

In [None]:
models = build_cyclegan(shapes, "vangogh-%s" % postfix, "picture-%s" % postfix, kernel_size=kernel_size, patchgan=patchgan)

In [None]:
#판별기의 입력을 2^n 만큼 척도를 줄임 -> patch 크기를 2^n 으로 나눔(즉 strides=2를 n회 사용함)
patch = int(source_data.shape[1] / 2**4) if patchgan else 1
params = (batch_size, train_steps, patch, model_name)
test_params = (titles, dirs)

#cyclegan 훈련
train_cyclegan(models, data, params, test_params, test_generator)