# 1-2 Variational AutoEncoder

<img src="./img/vae.png" alt="variationalautoencoder" width="500" align="left"/>

### 모듈 임포트

- os (디렉토리 생성)
- tensorflow (학습)
- numpy (랜덤데이터 플로팅)
- matplotlib.pyplot (플로팅)
- matplotlib.gridspec (플로팅)

In [None]:
import os
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt
from matplotlib import gridspec as gridspec

### Logging Directory 설정

In [None]:
CKPT_DIR = "../generated_output/VAE"

### 학습 변수 설정

- learning rate : gradient descent시 적용할 학습률
- training steps : training 종료조건
- batch size : 1 step에 feed forward할 배치 크기

- 1 step에 batch size만큼의 데이터를 feed forward하기 때문에, step * batch size = feed forward된 데이터 샘플수
- 1 epoch은 총 sample수를 사용하여 학습한 것을 의미하므로, feed forward된 sample수 / 데이터 sample수 = epoch수

In [None]:
LEARNING_RATE = 1e-4
TRAINING_STEPS = 60000
BATCH_SIZE = 100

### 네트워크 변수 설정

- image dimension : 이미지 차원, Fashion-MNIST가 28x28이므로 총 784차원의 벡터
- latent dimension : 임베딩할 차원
- encoder hidden dimension : 인코더 파트의 은닉층 차원
- decoder hidden dimension : 디코더 파트의 은닉층 차원

In [None]:
IMAGE_DIM = 784
LATENT_DIM = 10
ENCODER_HIDDEN_DIM = 256
DECODER_HIDDEN_DIM = 256
graph = tf.Graph()

### Encoder 함수 정의

[batch_size, 784]

$\rightarrow$ Dense(784, 256) $\rightarrow$ relu $\rightarrow$ [batch_size, 256]

$\rightarrow$ 2 x Dense(256, 10) $\rightarrow$ 2 x [batch_size, 10]

#### Initialization 구현

shape = [784, 256]일 때, 이에 대한 weight initialization 구현법  
(실용 코드에서는 tensorflow. initializers에 함수로 구현되어, 호출만 하면 됨)

1. Lecun initialization

> tf.random_normal(shape=[shape], stddev=tf.sqrt(1. / (shape[0]))

2. Glorot initialization

> tf.random_normal(shape=[shape], stddev=tf.sqrt(2. / (shape[0] + shape[1])))

3. He initialization

> tf.random_normal(shape=[shape], stddev=tf.sqrt(2. / shape[0]))


In [None]:
def encoder_model(features):
    
    with tf.variable_scope('encoder', reuse=tf.AUTO_REUSE):
        
        net = features
        
        net = tf.layers.dense(
            net, ENCODER_HIDDEN_DIM, 
            activation=tf.nn.relu, 
            kernel_initializer=tf.initializers.he_normal())
        # tf.layers.dense(input, output dim)
        # 반복문을 통해 hidden dimension 리스트 형태로 hidden layer 구성
        # hidden layer의 activation function은 relu를 씀 (gradient vanishing 방지)
        # initializer은 he initialization을 씀 (relu에 최적화, stddev = sqrt(2/fan_in))            
            
        latent_mean = tf.layers.dense(
            net, LATENT_DIM, 
            kernel_initializer=tf.initializers.he_normal())
        # latent의 mean으로의 embedding
        
        latent_log_var = tf.layers.dense(
            net, LATENT_DIM, 
            kernel_initializer=tf.initializers.he_normal())
        # latent의 stddev으로의 embedding (계산 편의상 stddev대신 log variance를 이용)
        
        return latent_mean, latent_log_var
        # latent의 mean과 stddev(log variance) embedding tensor 리턴

### Sampling 함수 정의

- encoding하여 얻은 mean과 stddev(log variance)를 이용하여 random sampling하는 함수 정의 

2 x [batch_size, 10] $\rightarrow$ [batch_size, 10]

In [None]:
def sampler_model(latent_mean, latent_log_var):
    
    with tf.variable_scope('sampler', reuse=tf.AUTO_REUSE):
        
        snd_sample = tf.random_normal(
            tf.shape(latent_log_var), dtype=tf.float32, 
            mean=0., stddev=1.0)
        # latent 차원의 표준정규분포 샘플링
        
        latent_std = tf.exp(latent_log_var / 2)
        # log variance를 통하여 stddev계산
        
        latent = latent_mean + latent_std * snd_sample
        # 표준정규분포를 latent mean, latent stddev에 해당하는 샘플로 변환
        # tf.random_normal이 mean, stddev로서 단일값 혹은 0-D tensor을 받기 때문에 이런 식으로 샘플링
        
        return latent
        # 샘플링 된 gaussian distribution형태의 latent 리턴

### Decoder 함수 정의

[batch_size, 10]

$\rightarrow$ Dense(128, 256) $\rightarrow$ relu $\rightarrow$ [batch_size, 256]

$\rightarrow$ Dense(256, 784) $\rightarrow$ sigmoid $\rightarrow$ [batch_size, 784]

In [None]:
def decoder_model(features):
    
    with tf.variable_scope('decoder', reuse=tf.AUTO_REUSE):
        
        net = features
        
        net = tf.layers.dense(
            net, DECODER_HIDDEN_DIM, 
            activation=tf.nn.relu, 
            kernel_initializer=tf.initializers.he_normal())
        # tf.layers.dense(input, output dim)
        # 반복문을 통해 hidden dimension 리스트 형태로 hidden layer 구성
        # hidden layer의 activation function은 relu를 씀 (gradient vanishing 방지)
        # initializer은 he initialization을 씀 (relu에 최적화, stddev = sqrt(2/fan_in))
            
        recon = tf.layers.dense(
            net, IMAGE_DIM, 
            activation=tf.nn.sigmoid, 
            kernel_initializer=tf.initializers.he_normal())
        # output layer
        # output layer의 activation function은 sigmoid를 씀 (0~1로 mapping)
        
        return recon
        # output layer을 거친 tensor을 리턴

### Input Function 정의

feature

$\rightarrow$ Dataset 생성 $\rightarrow$ 데이터 사이즈로 랜덤 셔플링 $\rightarrow$ 데이터 모두 소모시 처음부터 반복

$\rightarrow$ batch 생성 $\rightarrow$ batch 한 개씩 소모하는 iterator $\rightarrow$ iterator 호출

In [None]:
def train_input_fn(features, batch_size=BATCH_SIZE):
    
    with graph.as_default():
        
        dataset = tf.data.Dataset.from_tensor_slices(features)
        # 인풋값으로부터 Dataset객체 생성
        batch_dataset = dataset.shuffle(features.shape[0]).repeat().batch(batch_size)
        # Dataset 셔플링, 반복, 배치화
        return batch_dataset.make_one_shot_iterator().get_next()
        # 단일 배치 반복자 리턴

<img src="./img/vae_recloss.png" alt="vae_recloss" width="400" align="top"/>
<img src="./img/vae_regloss.png" alt="vae_regloss" width="450" align="top"/>

### Regularization Loss

- 직접 mean을 0으로, stddev를 1로 보내는 loss를 설계할 경우와의 차이
    - MSE혹은 cross-entropy를 이용하여 latent 모수를 직접 특정 값으로 보낼 경우
        - 레이블에 관계 없이 모든 mean은 0으로 mapping되고, stddev은 1로 mapping됨
        - sampling bias를 이용한 random generation 적용 불가
    - Distribution의 KL-Divergence를 이용할 경우
        - 각각의 mean, stddev은 다양한 값들을 가짐 (물론, 중심점으로부터 멀지는 않음)
        - 단, 다양한 mean과 stddev의 데이터가 혼합된 batch embedding은 표준정규분포의 형태가 됨
        - 표준정규분포 내부에 데이터 특성에 따라 biased된 정규분포가 존재하는 형태
        
- 표준정규분포에서의 샘플링시 sampling bias 발생
    - 표본평균과 표본표준편차에 따라 특정 특징을 지닌 이미지로 mapping
    - 이를 통하여 학습 데이터셋에 포함되어있던 다양한 이미지 생성 가능

In [None]:
def vae_loss(features, outputs, latent_mean, latent_log_var):
    
    rec_loss = tf.reduce_sum(
        tf.keras.backend.binary_crossentropy(features, outputs), 1)
    # reconstruction loss (cross-entropy btw input image and output image)
    # reconstructed image를 원본과 유사하게 하는 역할을 하는 loss

    reg_loss = -0.5 * tf.reduce_sum(
        1 + latent_log_var 
        - tf.square(latent_mean) 
        - tf.exp(latent_log_var), 1)
    # regularization loss (KL-Divergence btw latent and N(0,1))
    # latent를 표준정규분포에 가깝게 가져가는 역할을 하는 loss
    
    return tf.reduce_mean(rec_loss + reg_loss)
    # reconstruction loss + regularization loss 리턴

### Variational AutoEncoder 학습 함수 정의

[batch_size, 784]

$\rightarrow$ Dense(784, 256) $\rightarrow$ relu $\rightarrow$ [batch_size, 256]

$\rightarrow$ 2 x Dense(256, 10) $\rightarrow$ 2 x [batch_size, 10]

$\rightarrow$ Gaussian Distribution Sampling $\rightarrow$ [batch_size, 10]

$\rightarrow$ Dense(128, 256) $\rightarrow$ relu $\rightarrow$ [batch_size, 256]

$\rightarrow$ Dense(256, 784) $\rightarrow$ sigmoid $\rightarrow$ [batch_size, 784]

In [None]:
def train(features):
    if not os.path.exists(os.path.dirname(CKPT_DIR)):
        os.makedirs(os.path.dirname(CKPT_DIR))
    # logging directory 생성

    with graph.as_default():
        
        features = train_input_fn(features)
        # feature 받아서 batch로 넘겨주는 그래프 작성
        latent_mean, latent_log_var = encoder_model(features)
        # batch를 encoder에 적용하여 latent의 mean과 stddev(log variance)를 추출하는 그래프 작성
        sample = sampler_model(latent_mean, latent_log_var)
        # mean과 stddev(log variance)이용하여 샘플링하여 latent를 생성하는 그래프 작성
        outputs = decoder_model(sample)
        # latent를 decoder에 적용하여 복원하는 그래프 작성
        loss = vae_loss(features, outputs, latent_mean, latent_log_var)
        # loss는 reconstruction loss + regularization loss
        optimizer = tf.train.AdamOptimizer(LEARNING_RATE).minimize(loss)
        # backpropagation operation 정의
        # optimizer은 ADAM 적용, 설정한 learning rate 적용
        saver = tf.train.Saver()
        # checkpoint 저장할 Saver객체 생성
        
        with tf.Session() as sess:
        # Session객체 생성, 컨텍스트 매니저로 자원 관리
            
            sess.run(tf.global_variables_initializer())
            # weight 초기화

            for step in range(TRAINING_STEPS):
                step += 1             
                loss_now, _ = sess.run([loss, optimizer])
                # feedforward, backpropagation 실행
                # Plotting할 embedding assigning operation도 같이 실행
                # 한 번의 run을 통해 실행해야 batch iterator가 한 번만 실행됨에 유의
                if (step == TRAINING_STEPS):    
                    saver.save(sess, CKPT_DIR + '/vae.ckpt')
                    # 학습된 weights에 대한 checkpoint 저장
                if (step % 1000 == 0):
                    print('steps: {}/{}, loss: {:.4f}'.format(step, TRAINING_STEPS, loss_now))
                    # 학습과정 로깅

### Fashion-MNIST 데이터 로드 및 전처리
- Fashion-MNIST 데이터 로드
- normalize
- flatten

In [None]:
(x_train, _), (x_test, _) = tf.keras.datasets.fashion_mnist.load_data()
# Fashion-MNIST 데이터 로드 (keras.datesets 모듈 이용)

x_train = x_train / 255.
x_test = x_test / 255.
# Fashion-MNIST 데이터 normalize (0~1로 mapping)

x_train = x_train.reshape([-1, IMAGE_DIM]).astype(np.float32)
x_test = x_test.reshape([-1, IMAGE_DIM]).astype(np.float32)
# Fashion-MNIST 데이터 vectorization

### Variational AutoEncoder 학습

In [None]:
train(x_train)

### Reconstruction Plotting 함수 정의

In [None]:
def recon_25_image_plot(features):
    
    with graph.as_default():
        
        dataset = tf.data.Dataset.from_tensor_slices(features)
        batch_dataset = dataset.batch(25)
        features = batch_dataset.make_one_shot_iterator().get_next()
        # plotting batch 생성
        
        latent_mean, latent_log_var = encoder_model(features)
        # vector을 encoder에 적용하여 latent 추출
        latents = sampler_model(latent_mean, latent_log_var)
        recon = decoder_model(latents)
        # latent를 decoder에 적용하여 복원
        
        fig = plt.figure(figsize=(10, 10))
        gs = gridspec.GridSpec(5, 5)
        gs.update(wspace=0.05)
        # 플로팅 사이즈, 배열, 간격 정의
        
        saver = tf.train.Saver()
        # weight restoring 진행할 Saver 객체 생성
        
        with tf.Session() as sess:
            
            saver.restore(sess, tf.train.latest_checkpoint(CKPT_DIR))
            # checkpoint로부터 weight 복원
            
            recon_image = sess.run(recon)
            # reconstruction operation 진행
            
            recon_image = recon_image.reshape([-1, 28, 28])
            # tensorization
            
            for i in range(25):
                plt.subplot(gs[i])
                plt.axis('off')
                plt.imshow(recon_image[i], cmap = 'gray')
            # reconstructed image plotting

### Original Image Plotting 함수 정의

In [None]:
def origin_25_image_plot(features):
    features = features.reshape([-1, 28, 28])
    fig = plt.figure(figsize=(10, 10))
    gs = gridspec.GridSpec(5, 5)
    gs.update(wspace=0.05)
    for i in range(25):
        plt.subplot(gs[i])
        plt.axis('off')
        plt.imshow(features[i], cmap = 'gray')

### Reconstruction Plot

In [None]:
recon_25_image_plot(x_test[:25])
origin_25_image_plot(x_test[:25])

### Random Plotting 함수 정의

In [None]:
def random_25_image_plot(seed=None):
    
    with graph.as_default():
        
        np.random.seed(seed)
        random_noise = np.random.normal(size=[25, LATENT_DIM]).astype(np.float32)
        # latent 형태의 random noise 생성
        
        random_noise_input = train_input_fn(random_noise, batch_size=25)
        # random noise tensor 생성
        
        random_gen = decoder_model(random_noise_input)
        # random noise를 decoder에 적용, Fashion-MNIST image 차원으로 임베딩

        fig = plt.figure(figsize=(10, 10))
        gs = gridspec.GridSpec(5, 5)
        gs.update(wspace=0.05)
        # 플로팅 사이즈, 배열, 간격 정의

        saver = tf.train.Saver()
        # weight restoring 진행할 Saver 객체 생성
        
        with tf.Session() as sess:
            
            saver.restore(sess, tf.train.latest_checkpoint(CKPT_DIR))
            # checkpoint로부터 weight 복원
            
            random_image = sess.run(random_gen)
            # random generation operation 진행
            
            random_image = random_image.reshape([-1, 28, 28])
            # tensorization
            
            for i in range(25):
                plt.subplot(gs[i])
                plt.axis('off')
                plt.imshow(random_image[i], cmap = 'gray')
            # plotting

### Random Plot

In [None]:
random_25_image_plot()