# Setting

In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
import sys
from IPython.display import Image
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
import os
import warnings
import tensorflow as tf
import tensorflow_datasets as tfds
warnings.filterwarnings("ignore")

# 17.3 합성곱 GAN과 바서슈타인 GAN으로 합성 이미지 품질 높이기

GAN 예제의 성능을 높이기 위해 DCGAN을 만들어 보겠다.

몇 가지 중요한 기술을 도입하고 바서슈타인 GAN을 구현해 보겠다.

전치 합성곱, 배치 정규화, WGAN, 그레이디언트 페널티

## 17.3.1 전치 합성곱

합성곱 연산은 보통 특성 맵을 다운샘플링하지만 전치 합성곱 연산은 특성 맵을 업샘플링하는 데 사용한다.

합성곱 연산을 통해 출력 특성 맵을 만들고 원래 크기의 특성 맵을 얻는 합성곱을 적용하여도 행렬의 크기는 복원되지만 실제 행렬 값은 복원되지 않는다.

In [3]:
Image(url='https://git.io/JLjn7', width=700)

전치 합성곱은 입력 특성 맵의 원소 사이에 0을 끼워 넣어 합성곱을 수행하는 식으로 특성 맵을 업샘플링한다.

In [4]:
Image(url='https://git.io/JLjnb', width=700)

## 17.3.2 배치 정규화

배치 정규화의 주요 아이디어 중 하나는 층의 입력을 정규화하고 훈련하는 동안 입력 분포의 변화를 막는 것이다.

이는 모델을 빠르고 안정적으로 수렴하게 만든다.

배치 정규화는 계산된 통계 값을 기반으로 미니 배치의 특성을 변환한다.

배치 정규화 단계를 정리해보면

1. 미니 배치 입력의 평균과 표준편차를 계산한다.

2. 배치에 있는 모든 샘플의 입력을 표준화한다.

3. 정규화된 입력을 학습하는 두 개의 파라미터 백터로 스케일을 조정하고 이동시킨다.

In [5]:
Image(url='https://git.io/JLjnA', width=700)

표준화된 입력 값을 가지면 경사 하강법 기반의 최적화 방식에 바람직한 성질이다.

반면 각기 다른 미니 배치에서 동일한 성질을 같도록 항상 입력을 정규화하면 신경망의 표현 능력에 큰 영향을 미칠 수 있다.

표준정규분포를 따르는 변수가 시그모이드 함수를 통과할 때 0에 가까운 값은 선형적인 영역이기 때문이다.

따라서 단계 3에서 크기가 c인 학습 가능한 파라미터를 이용하여 정규화된 특성을 이동시키고 분산시킨다.

훈련한느 동안 이동 평균과 이동 분산을 계산하는데 이 값은 튜닝 파라미터와 함께 사용하여 평가 시에 테스트 샘플을 정규화한다.

배치 정규화가 최적화에 도움이 되는 이유는 초기에 배치 정규화는 내부 공변량 변화를 감소하기 위해 개발되었는데 내부 공변량 변화는 신경망이 훈련하는 동안 모델 파라미터가 업데이트되기 때문에 층 활성화의 분산에 변화가 생기는 현상이다.

## 17.3.3 생성자와 판별자 구현

In [6]:
#생성자

Image(url='https://git.io/JLjnx', width=700)

In [7]:
#판별자

Image(url='https://git.io/JLjnj', width=700)

In [8]:
import tensorflow as tf


print(tf.__version__)

print("GPU 여부:", len(tf.config.list_physical_devices('GPU')) > 0)

if tf.config.list_physical_devices('GPU'):
    device_name = tf.test.gpu_device_name()
else:
    device_name = 'CPU:0'
    
print(device_name)

2.7.0
GPU 여부: True
/device:GPU:0


In [9]:
def make_dcgan_generator(
        z_size=20, 
        output_size=(28, 28, 1),
        n_filters=128, 
        n_blocks=2):
    size_factor = 2**n_blocks
    hidden_size = (
        output_size[0]//size_factor, 
        output_size[1]//size_factor
    )
    
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(z_size,)),
        
        tf.keras.layers.Dense(
            units=n_filters*np.prod(hidden_size), 
            use_bias=False),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Reshape(
            (hidden_size[0], hidden_size[1], n_filters)),
    
        tf.keras.layers.Conv2DTranspose(
            filters=n_filters, kernel_size=(5, 5), strides=(1, 1),
            padding='same', use_bias=False),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU()
    ])
        
    nf = n_filters
    for i in range(n_blocks):
        nf = nf // 2
        model.add(
            tf.keras.layers.Conv2DTranspose(
                filters=nf, kernel_size=(5, 5), strides=(2, 2),
                padding='same', use_bias=False))
        model.add(tf.keras.layers.BatchNormalization())
        model.add(tf.keras.layers.LeakyReLU())
                
    model.add(
        tf.keras.layers.Conv2DTranspose(
            filters=output_size[2], kernel_size=(5, 5), 
            strides=(1, 1), padding='same', use_bias=False, 
            activation='tanh'))
        
    return model

def make_dcgan_discriminator(
        input_size=(28, 28, 1),
        n_filters=64, 
        n_blocks=2):
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=input_size),
        tf.keras.layers.Conv2D(
            filters=n_filters, kernel_size=5, 
            strides=(1, 1), padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU()
    ])
    
    nf = n_filters
    for i in range(n_blocks):
        nf = nf*2
        model.add(
            tf.keras.layers.Conv2D(
                filters=nf, kernel_size=(5, 5), 
                strides=(2, 2),padding='same'))
        model.add(tf.keras.layers.BatchNormalization())
        model.add(tf.keras.layers.LeakyReLU())
        model.add(tf.keras.layers.Dropout(0.3))
        
    model.add(tf.keras.layers.Conv2D(
            filters=1, kernel_size=(7, 7), padding='valid'))
    
    model.add(tf.keras.layers.Reshape((1,)))
    
    return model

In [10]:
gen_model = make_dcgan_generator()
gen_model.summary()

disc_model = make_dcgan_discriminator()
disc_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 6272)              125440    
                                                                 
 batch_normalization (BatchN  (None, 6272)             25088     
 ormalization)                                                   
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 6272)              0         
                                                                 
 reshape (Reshape)           (None, 7, 7, 128)         0         
                                                                 
 conv2d_transpose (Conv2DTra  (None, 7, 7, 128)        409600    
 nspose)                                                         
                                                                 
 batch_normalization_1 (Batc  (None, 7, 7, 128)        5

## 17.3.4 두 분포 사이의 거리 측정

생성자 모델은 훈련 데이터셋과 같은 분포를 가진 새로운 샘플을 합성하는 법을 배우는 것이 목적이다.



In [11]:
Image(url='https://git.io/JLjcf', width=700)

- TV 거리 : 각 포인트에서 두 분산 사이의 가장 큰 차이

- EM 거리 : 한 분포에서 다른 분포로 변환할 때 필요한 최소의 작업량이다.

- KL,JS 


In [12]:
Image(url='https://git.io/JLjcJ', width=800)

원본 GAN에 있는 손실 함수는 진짜와 가짜 샘플 사이의 JS 발산을 최소화하는 것임을 수학적으로 증명할 수 있다.

하지만 JS 발산은 GAN 모델 훈련을 할 때 문제가 있다.

따라서 훈련 성능을 높이기 위해 연구자들은 EM 거리를 진짜와 가짜 샘플 분포 사이의 거리를 측정하는 데 사용했다.

EM 거리를 계산하는 것은 그 자체가 최적화 문제이다. 

따라서 계산하기가 매우 어렵다.

다행히 EM 거리 계산을 칸트로비치-루빈스타인 쌍대성 이론을 사용해 단순화할 수 있다.

## 17.3.5 GAN에 EM 거리 사용

문제는 진짜와 가짜 샘플 사이의 바서슈타인 거리를 계산하기 위한 1-립시츠 함수를 어떻게 찾느냐는 것이다.

심층 신경망이 어떤 함수도 근사할 수 있다.

기본 GAN은 판별자를 분류기 형태로 사용한다.

WGAN에서는 판별자를 바꾸어 확률 점수 대신에 스칼라 점수를 반환하는 비평자로 바꿀 수 있다.

이 점수를 입력 이미지가 얼마나 진짜 같은지 나타내는 정도로 해석할 수 있다.



## 17.3.6 그레이디언트 페널티

1. 한 배치에서 진짜와 가짜 샘플의 각 쌍에 대해 균등 분포에서 랜덤한 수를 샘플링한다.

2. 진짜와 가짜 샘플 사이를 보간한다. 결국 보간된 샘플의 배치가 만들어진다.

3. 보간된 전체 샘플에 대해 판별자 출력을 계산한다.

4. 각 보간된 샘플에 대해 비평자 출력의 그레이디언트를 계산한다.

5. GP를 계산.

In [13]:
mnist_bldr = tfds.builder('mnist')
mnist_bldr.download_and_prepare()
mnist = mnist_bldr.as_dataset(shuffle_files=False)

def preprocess(ex, mode='uniform'):
    image = ex['image']
    image = tf.image.convert_image_dtype(image, tf.float32)

    image = image*2 - 1.0
    if mode == 'uniform':
        input_z = tf.random.uniform(
            shape=(z_size,), minval=-1.0, maxval=1.0)
    elif mode == 'normal':
        input_z = tf.random.normal(shape=(z_size,))
    return input_z, image

[1mDownloading and preparing dataset mnist/3.0.1 (download: 11.06 MiB, generated: 21.00 MiB, total: 32.06 MiB) to /root/tensorflow_datasets/mnist/3.0.1...[0m


local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead pass
`try_gcs=True` to `tfds.load` or set `data_dir=gs://tfds-data/datasets`.



Dl Completed...:   0%|          | 0/4 [00:00<?, ? file/s]


[1mDataset mnist downloaded and prepared to /root/tensorflow_datasets/mnist/3.0.1. Subsequent calls will reuse this data.[0m


In [14]:
num_epochs = 100
batch_size = 128
image_size = (28, 28)
z_size = 20
mode_z = 'uniform'
lambda_gp = 10.0

tf.random.set_seed(1)
np.random.seed(1)

## 데이터셋 준비
mnist_trainset = mnist['train']
mnist_trainset = mnist_trainset.map(preprocess)

mnist_trainset = mnist_trainset.shuffle(10000)
mnist_trainset = mnist_trainset.batch(
    batch_size, drop_remainder=True)

## 모델 생성
with tf.device(device_name):
    gen_model = make_dcgan_generator()
    gen_model.build(input_shape=(None, z_size))
    gen_model.summary()

    disc_model = make_dcgan_discriminator()
    disc_model.build(input_shape=(None, np.prod(image_size)))
    disc_model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_1 (Dense)             (None, 6272)              125440    
                                                                 
 batch_normalization_7 (Batc  (None, 6272)             25088     
 hNormalization)                                                 
                                                                 
 leaky_re_lu_7 (LeakyReLU)   (None, 6272)              0         
                                                                 
 reshape_2 (Reshape)         (None, 7, 7, 128)         0         
                                                                 
 conv2d_transpose_4 (Conv2DT  (None, 7, 7, 128)        409600    
 ranspose)                                                       
                                                                 
 batch_normalization_8 (Batc  (None, 7, 7, 128)       

In [15]:
import time


## 옵티마이저:
g_optimizer = tf.keras.optimizers.Adam(0.0002)
d_optimizer = tf.keras.optimizers.Adam(0.0002)

if mode_z == 'uniform':
    fixed_z = tf.random.uniform(
        shape=(batch_size, z_size),
        minval=-1, maxval=1)
elif mode_z == 'normal':
    fixed_z = tf.random.normal(
        shape=(batch_size, z_size))

def create_samples(g_model, input_z):
    g_output = g_model(input_z, training=False)
    images = tf.reshape(g_output, (batch_size, *image_size))    
    return (images+1)/2.0

all_losses = []
epoch_samples = []

start_time = time.time()

for epoch in range(1, num_epochs+1):
    epoch_losses = []
    for i,(input_z,input_real) in enumerate(mnist_trainset):
        
        with tf.GradientTape() as d_tape, tf.GradientTape() as g_tape:
            g_output = gen_model(input_z, training=True)
            
            d_critics_real = disc_model(input_real, training=True)
            d_critics_fake = disc_model(g_output, training=True)

            ## 생성자 손실을 계산합니다:
            g_loss = -tf.math.reduce_mean(d_critics_fake)

            ## 판별자 손실을 계산합니다:
            d_loss_real = -tf.math.reduce_mean(d_critics_real)
            d_loss_fake =  tf.math.reduce_mean(d_critics_fake)
            d_loss = d_loss_real + d_loss_fake

            ## 그래디언트 페널티:
            with tf.GradientTape() as gp_tape:
                alpha = tf.random.uniform(
                    shape=[d_critics_real.shape[0], 1, 1, 1], 
                    minval=0.0, maxval=1.0)
                interpolated = (
                    alpha*input_real + (1-alpha)*g_output)
                gp_tape.watch(interpolated)
                d_critics_intp = disc_model(interpolated)
            
            grads_intp = gp_tape.gradient(
                d_critics_intp, [interpolated,])[0]
            grads_intp_l2 = tf.sqrt(
                tf.reduce_sum(tf.square(grads_intp), axis=[1, 2, 3]))
            grad_penalty = tf.reduce_mean(tf.square(grads_intp_l2 - 1.0))
        
            d_loss = d_loss + lambda_gp*grad_penalty
        
        ## 최적화: 그래디언트를 계산하고 적용합니다
        d_grads = d_tape.gradient(d_loss, disc_model.trainable_variables)
        d_optimizer.apply_gradients(
            grads_and_vars=zip(d_grads, disc_model.trainable_variables))
        
        g_grads = g_tape.gradient(g_loss, gen_model.trainable_variables)
        g_optimizer.apply_gradients(
            grads_and_vars=zip(g_grads, gen_model.trainable_variables))

        epoch_losses.append(
            (g_loss.numpy(), d_loss.numpy(), 
             d_loss_real.numpy(), d_loss_fake.numpy()))
                    
    all_losses.append(epoch_losses)
    
    print('에포크 {:-3d} | 시간 {:.2f} min | 평균 손실 >>'
          ' 생성자/판별자 {:6.2f}/{:6.2f} [판별자-진짜: {:6.2f} 판별자-가짜: {:6.2f}]'
          .format(epoch, (time.time() - start_time)/60, 
                  *list(np.mean(all_losses[-1], axis=0)))
    )
    
    epoch_samples.append(
        create_samples(gen_model, fixed_z).numpy()
    )

에포크   1 | 시간 3.66 min | 평균 손실 >> 생성자/판별자 264.35/-429.40 [판별자-진짜: -267.40 판별자-가짜: -264.35]
에포크   2 | 시간 7.07 min | 평균 손실 >> 생성자/판별자 196.05/-171.74 [판별자-진짜: -117.04 판별자-가짜: -196.05]
에포크   3 | 시간 10.47 min | 평균 손실 >> 생성자/판별자 177.99/-95.76 [판별자-진짜: -71.55 판별자-가짜: -177.99]
에포크   4 | 시간 13.89 min | 평균 손실 >> 생성자/판별자 153.67/  7.65 [판별자-진짜:   8.30 판별자-가짜: -153.67]
에포크   5 | 시간 17.28 min | 평균 손실 >> 생성자/판별자  87.47/ 12.47 [판별자-진짜:   9.40 판별자-가짜: -87.47]
에포크   6 | 시간 20.70 min | 평균 손실 >> 생성자/판별자  77.16/-22.17 [판별자-진짜:   7.99 판별자-가짜: -77.16]
에포크   7 | 시간 24.09 min | 평균 손실 >> 생성자/판별자  69.26/ -9.35 [판별자-진짜:  13.11 판별자-가짜: -69.26]
에포크   8 | 시간 27.51 min | 평균 손실 >> 생성자/판별자  62.45/-19.28 [판별자-진짜:  18.93 판별자-가짜: -62.45]
에포크   9 | 시간 30.90 min | 평균 손실 >> 생성자/판별자  87.78/-39.11 [판별자-진짜:  37.20 판별자-가짜: -87.78]
에포크  10 | 시간 34.33 min | 평균 손실 >> 생성자/판별자  64.18/-33.13 [판별자-진짜:  19.21 판별자-가짜: -64.18]
에포크  11 | 시간 37.75 min | 평균 손실 >> 생성자/판별자  82.42/-38.13 [판별자-진짜:  34.49 판별자-가짜: -82.42]
에포크  12 | 시간 41.18 min | 평

KeyboardInterrupt: ignored

In [None]:
#합성 샘플의 품질이 어떻게 바뀌는지 보자
import itertools


fig = plt.figure(figsize=(8, 6))

## 손실 그래프
ax = fig.add_subplot(1, 1, 1)
g_losses = [item[0] for item in itertools.chain(*all_losses)]
d_losses = [item[1] for item in itertools.chain(*all_losses)]
plt.plot(g_losses, label='Generator loss', alpha=0.95)
plt.plot(d_losses, label='Discriminator loss', alpha=0.95)
plt.legend(fontsize=20)
ax.set_xlabel('Iteration', size=15)
ax.set_ylabel('Loss', size=15)

epochs = np.arange(1, 101)
epoch2iter = lambda e: e*len(all_losses[-1])
epoch_ticks = [1, 20, 40, 60, 80, 100]
newpos   = [epoch2iter(e) for e in epoch_ticks]
ax2 = ax.twiny()
ax2.set_xticks(newpos)
ax2.set_xticklabels(epoch_ticks)
ax2.xaxis.set_ticks_position('bottom')
ax2.xaxis.set_label_position('bottom')
ax2.spines['bottom'].set_position(('outward', 60))
ax2.set_xlabel('Epoch', size=15)
ax2.set_xlim(ax.get_xlim())
ax.tick_params(axis='both', which='major', labelsize=15)
ax2.tick_params(axis='both', which='major', labelsize=15)

plt.show()

In [None]:
selected_epochs = [1, 2, 4, 10, 50, 100]
fig = plt.figure(figsize=(10, 14))
for i,e in enumerate(selected_epochs):
    for j in range(5):
        ax = fig.add_subplot(6, 5, i*5+j+1)
        ax.set_xticks([])
        ax.set_yticks([])
        if j == 0:
            ax.text(
                -0.06, 0.5, 'Epoch {}'.format(e),
                rotation=90, size=18, color='red',
                horizontalalignment='right',
                verticalalignment='center', 
                transform=ax.transAxes)
        
        image = epoch_samples[e-1][j]
        ax.imshow(image, cmap='gray_r')

plt.show()

## 17.3.8 모드 붕괴

GAN은 적대적인 특징 때문에 훈련하기 어렵기로 악명이 높다.

실패하는 흔한 이유 중 하나는 생성자가 작은 부분 공간에 갇혀 단순한 샘플만 생성하는 것을 학습할 때이다.

이를 모드 붕괴라고 부른다.

그레이디언트 폭주와 소멸 문제 외에도 GAN 훈련을 어렵게 만들 수 있는 또 다른 면이 있다.

전문가들이 권장하는 몇 가지 기법이 있다.

하나는 미니 배치 판별이다.

가짜나 진짜 샘플로만 이루어진 배치를 따로 판별자에게 주입한다.

미니 배치 판별에서는 판별자가 배치가 진짜인지 가짜인지 판단하기 위해 배치 안의 샘플을 비교한다.

만약 모델이 모드 붕괴 문제를 겪고 있다면 진짜 샘플로만 구성된 배치가 가짜 배치보다 다양성이 높을 것이다.

안정적인 GAN 훈련을 위해 널리 사용하는 또 다른 기법은 특성 매칭이다.

특성 매칭에서는 생성자의 목적 함수를 조금 수정한다.

판별자의 중간 표현을 기반으로 원본 이미지와 합성 이미지 간의 차이를 최소화하는 추가적인 항을 더한다.

In [None]:
Image(url='https://git.io/JLjcT', width=600)