In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

from tensorflow.keras.layers import Input, Dense, Lambda, Layer
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras import metrics
from tensorflow.keras.datasets import mnist

In [None]:
# Load the MNIST dataset through Keras as (images, labels) pairs for the
# training split and the test split.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

In [None]:
# Model / training hyper-parameters.
latent_dim = 2          # size of the latent vector z
intermediate_dim = 256  # width of the hidden Dense layers
epochs = 50
epsilon_std = 1.0       # std-dev of the noise drawn in the sampling step

# Pre-process the image arrays: flatten every image into one vector of
# image_size * image_size values and divide by 255.
# x_train / x_test are overwritten, so the work is only done while the arrays
# are still un-flattened (ndim == 3). Without this guard, re-running the cell
# would derive original_dim from the already-flattened width and divide the
# pixel values by 255 a second time.
if x_train.ndim == 3:
    image_size = x_train.shape[1]
    original_dim = image_size * image_size
    x_train = x_train.reshape(-1, original_dim).astype('float32') / 255.
    x_test = x_test.reshape(-1, original_dim).astype('float32') / 255.

In [None]:
# Sanity check: show the shapes of the flattened train/test arrays and the
# container type of x_train.
x_train.shape,  x_test.shape, type(x_train)

In [None]:
# Encoder module: one hidden ReLU layer feeding two parallel Dense heads that
# produce the mean and the log-variance used for the latent sample.
input_layer = Input(shape=(original_dim,))
intermediate_layer = Dense(
    units=intermediate_dim, activation='relu', name='Intermediate_layer'
)(input_layer)
z_mean = Dense(units=latent_dim, name='z_mean')(intermediate_layer)
z_log_var = Dense(units=latent_dim, name='z_log_var')(intermediate_layer)


In [None]:
# Stand-alone encoder model: maps an input vector to the pair
# (z_mean, z_log_var).
encoder_module = Model(inputs=input_layer, outputs=(z_mean, z_log_var))
encoder_module.summary()

In [None]:
# Sampling function (reparameterization step)
def sampling(args):
    """Draw a latent sample from the encoder's mean / log-variance outputs.

    Args:
        args: pair ``(z_mean, z_log_var)`` of tensors.

    Returns:
        ``z_mean + exp(z_log_var / 2) * epsilon``, where ``epsilon`` is
        Gaussian noise with mean 0 and standard deviation ``epsilon_std``,
        drawn with shape ``(batch, latent_dim)``.
    """
    mu, log_var = args
    batch = K.shape(mu)[0]  # dynamic batch size of the incoming tensor
    noise = K.random_normal(shape=(batch, latent_dim),
                            mean=0.,
                            stddev=epsilon_std)
    sigma = K.exp(log_var / 2)  # log-variance -> standard deviation
    return mu + sigma * noise


In [None]:
# Wrap the sampling function in a Lambda layer so that the latent draw z is
# produced from the encoder outputs inside the model graph.
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])

In [None]:
# Decoder module. The two layers are kept as named objects (rather than being
# applied inline) so the same layers can be applied again to another input.
decoder_h = Dense(units=intermediate_dim, activation='relu')
decoder_mean = Dense(units=original_dim, activation='sigmoid')

h_decoded = decoder_h(z)
x_decoded_mean = decoder_mean(h_decoded)

In [None]:
# Custom variational loss layer
class CustomVariationalLayer(Layer):
    """Pass-through layer that registers the VAE loss on the model.

    The layer returns its first input unchanged; its job is to call
    ``add_loss`` with the reconstruction + KL loss.

    Call it on ``[x, x_decoded_mean]`` or, preferably, on
    ``[x, x_decoded_mean, z_mean, z_log_var]``. With the two-element form the
    KL term falls back to the module-level ``z_mean`` / ``z_log_var`` tensors,
    which couples the layer to the notebook's global state.
    """

    def __init__(self, **kwargs):
        # Initialise the base Layer first, then set attributes on the instance.
        super(CustomVariationalLayer, self).__init__(**kwargs)
        self.is_placeholder = True

    def vae_loss(self, x, x_decoded_mean, latent_mean=None, latent_log_var=None):
        """Return the batch-mean of (reconstruction loss + KL loss).

        Args:
            x: original input tensor.
            x_decoded_mean: decoder output for ``x``.
            latent_mean: encoder mean output; defaults to the module-level
                ``z_mean`` when omitted.
            latent_log_var: encoder log-variance output; defaults to the
                module-level ``z_log_var`` when omitted.
        """
        if latent_mean is None:
            latent_mean = z_mean
        if latent_log_var is None:
            latent_log_var = z_log_var

        # binary_crossentropy averages over the last axis; multiplying by
        # original_dim turns that mean back into a per-sample sum.
        xent_loss = original_dim * \
                    metrics.binary_crossentropy(x, x_decoded_mean)
        kl_loss = -0.5 * K.sum(1 + latent_log_var
                               - K.square(latent_mean)
                               - K.exp(latent_log_var),
                               axis=-1)
        return K.mean(xent_loss + kl_loss)

    def call(self, inputs):
        """Register the VAE loss and return the first input unchanged."""
        x = inputs[0]
        x_decoded_mean = inputs[1]
        if len(inputs) >= 4:
            # Latent statistics supplied explicitly as layer inputs.
            latent_mean, latent_log_var = inputs[2], inputs[3]
        else:
            latent_mean = latent_log_var = None
        loss = self.vae_loss(x, x_decoded_mean, latent_mean, latent_log_var)
        # The legacy ``inputs=`` keyword is intentionally not passed: it is
        # only kept for backwards compatibility in tf.keras 2.x and is not
        # part of the newer ``add_loss(loss)`` signature.
        self.add_loss(loss)
        return x

In [None]:
# Attach the loss layer. Its call() returns the first input unchanged and
# registers the VAE loss through add_loss, so y is only the model's output.
y = CustomVariationalLayer()([input_layer, x_decoded_mean])

In [None]:
# End-to-end VAE model. compile() gets loss=None because the loss is added
# inside CustomVariationalLayer via add_loss.
vae = Model(inputs=input_layer, outputs=y)
vae.compile(loss=None, optimizer='rmsprop')


In [None]:
# Print the layer-by-layer summary of the full VAE model.
vae.summary()

In [None]:
# Train the model.
# No target array is passed (and the validation target is None): the model
# was compiled with loss=None and carries its own loss.
# NOTE: only one epoch is run here; the `epochs` setting defined earlier is
# not used by this call.
vae.fit(x_train,
        validation_data=(x_test, None),
        batch_size=100,
        epochs=1,
        shuffle=True)

In [None]:
# Encode the test set. encoder_module outputs the pair (z_mean, z_log_var).
# x_test_encoded has to be assigned in this cell: the next assignment only
# happens in the plotting cell further down, so on a fresh kernel a bare
# lookup here would raise NameError.
x_test_encoded = encoder_module.predict(x_test, batch_size=256)

# Peek at the second encoder output (z_log_var).
x_test_encoded[1]

In [None]:
# 2-D visualisation of the encoded test set, coloured by digit label.
x_test_encoded = encoder_module.predict(x_test, batch_size=256)

# encoder_module outputs (z_mean, z_log_var). Index 1 -- the log-variance
# head -- is what gets plotted here; index 0 would plot the latent means.
encoded_log_var = x_test_encoded[1]

fig, ax = plt.subplots(figsize=(8, 8))
points = ax.scatter(encoded_log_var[:, 0], encoded_log_var[:, 1],
                    c=y_test, cmap='Paired')
fig.colorbar(points, ax=ax)
plt.show()


In [None]:
# Generator network: applies the already-built decoder layers to a fresh
# latent-space input, so samples can be decoded directly from latent points.
decoder_input = Input(shape=(latent_dim,))

_h_decoded = decoder_h(decoder_input)
_x_decoded_mean = decoder_mean(_h_decoded)

generator = Model(inputs=decoder_input, outputs=_x_decoded_mean)

In [None]:
# Display a 2-D manifold of the digits.
n = 15  # the figure is an n x n grid of digits
digit_size = 28
figure = np.zeros((digit_size * n, digit_size * n))

# Linearly spaced coordinates on the unit square are transformed through the
# inverse Gaussian CDF (ppf) to produce values of the latent variable z,
# because the prior over the latent space is Gaussian.
grid_x = norm.ppf(np.linspace(0.05, 0.95, n))
grid_y = norm.ppf(np.linspace(0.05, 0.95, n))

# Tile (i, j) shows the decoding of z = [grid_y[j], grid_x[i]].
# Build all n * n latent points in row-major (i, j) order and decode them with
# a single predict() call instead of one call per tile.
z_first, z_second = np.meshgrid(grid_y, grid_x)  # [i, j] -> grid_y[j], grid_x[i]
z_grid = np.column_stack([z_first.ravel(), z_second.ravel()])
x_decoded = generator.predict(z_grid)

for i in range(n):
    for j in range(n):
        digit = x_decoded[i * n + j].reshape(digit_size, digit_size)
        figure[i * digit_size: (i + 1) * digit_size,
               j * digit_size: (j + 1) * digit_size] = digit

plt.figure(figsize=(10, 10))
plt.imshow(figure, cmap='binary')
plt.show()