In [None]:
from tensorflow.keras.datasets import mnist
import numpy as np

# Load MNIST. The training labels are discarded (bound to `_`); the test
# labels are kept as y_test.
(X_train, _), (X_test, y_test) = mnist.load_data()

def preprocess_data(dataset):
    """Convert an image array to float32, divide by 255 and add a channel axis.

    Args:
        dataset: NumPy array of images (anything exposing ``astype``).

    Returns:
        A float32 array holding ``dataset / 255.0`` with one extra trailing
        axis of length 1, e.g. ``(n, 28, 28)`` becomes ``(n, 28, 28, 1)``.
    """
    scaled = dataset.astype('float32') / 255.0
    # Indexing with np.newaxis appends the singleton channel dimension.
    return scaled[..., np.newaxis]

# Apply the same preprocessing to the training and test images.
X_train = preprocess_data(X_train)
X_test = preprocess_data(X_test)

# Report the resulting shapes and dtypes of both splits.
print(X_train.shape, X_train.dtype, X_test.shape, X_test.dtype)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
(60000, 28, 28, 1) float32 (10000, 28, 28, 1) float32


In [None]:
# VAE Encoder Model
from tensorflow.keras import layers
from tensorflow.keras.models import Model

def vae_encoder(latent_dim=2, shape=(28,28,1)):
    """Build the convolutional encoder of the VAE.

    Args:
        latent_dim: Number of units in each of the two output heads.
        shape: Shape of a single input image, without the batch axis.

    Returns:
        A Keras ``Model`` named ``'encoder'`` whose outputs are the list
        ``[z_mean, z_log_var]``, each produced by a ``Dense(latent_dim)`` head.
    """
    inputs = layers.Input(shape=shape)

    # Two stride-2, 'same'-padded convolutions with 32 and then 64 filters.
    features = inputs
    for n_filters in (32, 64):
        conv = layers.Conv2D(n_filters, 3, strides=2, activation='relu', padding='same')
        features = conv(features)

    flat = layers.Flatten()(features)
    hidden = layers.Dense(16, activation='relu')(flat)

    # Mean and log-variance of the latent variable.
    z_mean = layers.Dense(latent_dim)(hidden)
    z_log_var = layers.Dense(latent_dim)(hidden)

    return Model(inputs=inputs, outputs=[z_mean, z_log_var], name='encoder')

In [None]:
# VAE Decoder Model
from tensorflow.keras import layers
from tensorflow.keras.models import Model

def vae_decoder(latent_dim=2):
    """Build the transposed-convolution decoder of the VAE.

    Args:
        latent_dim: Length of the latent vector fed to the decoder.

    Returns:
        A Keras ``Model`` named ``'decoder'`` mapping a ``(latent_dim,)``
        vector to a single-channel image through a sigmoid output layer.
    """
    latent_inputs = layers.Input(shape=(latent_dim,))

    # Project the latent vector and fold it into a 7x7 map with 64 channels.
    feature_map = layers.Dense(7*7*64, activation='relu')(latent_inputs)
    feature_map = layers.Reshape(target_shape=(7,7,64))(feature_map)

    # Two stride-2 transposed convolutions with 64 and then 32 filters.
    for n_filters in (64, 32):
        upsample = layers.Conv2DTranspose(n_filters, 3, strides=2, activation='relu', padding='same')
        feature_map = upsample(feature_map)

    # Final stride-1 layer collapses to one channel with sigmoid activation.
    outputs = layers.Conv2DTranspose(1, 3, activation='sigmoid', padding='same')(feature_map)

    return Model(inputs=latent_inputs, outputs=outputs, name='decoder')

In [None]:
import tensorflow as tf

def sampling(z_mean, z_log_var):
    """Draw a latent sample as ``z_mean + exp(0.5 * z_log_var) * epsilon``.

    Args:
        z_mean: 2-D tensor of latent means (indexed on axes 0 and 1).
        z_log_var: Tensor of latent log-variances, combined element-wise
            with ``z_mean``.

    Returns:
        A tensor of samples, where ``epsilon`` is standard-normal noise with
        the same (batch, dim) shape as ``z_mean``.
    """
    batch, dim = tf.shape(z_mean)[0], tf.shape(z_mean)[1]
    noise = tf.keras.backend.random_normal(shape=(batch, dim))
    # exp(0.5 * log_var) is the standard deviation.
    std_dev = tf.exp(0.5 * z_log_var)
    return z_mean + std_dev * noise

In [None]:
# VAE model definition: chain encoder -> sampling -> decoder into one model.
latent_dim = 2
encoder = vae_encoder(latent_dim=latent_dim)
decoder = vae_decoder(latent_dim=latent_dim)

# Symbolic input matching the preprocessed image shape (28, 28, 1).
inputs = layers.Input(shape=(28,28,1))
z_mean, z_log_var = encoder(inputs)
# Sample z from the distribution described by (z_mean, z_log_var).
z = sampling(z_mean, z_log_var)
outputs = decoder(z)

# End-to-end model: image in, reconstructed image out.
vae_model = Model(inputs, outputs)
vae_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 28, 28, 1)]          0         []                            
                                                                                                  
 encoder (Functional)        [(None, 2),                  69076     ['input_3[0][0]']             
                              (None, 2)]                                                          
                                                                                                  
 tf.compat.v1.shape (TFOpLa  (2,)                         0         ['encoder[0][0]']             
 mbda)                                                                                            
                                                                                              

In [None]:
# 손실함수 정의
from tensorflow.keras.losses import mse

def vae_loss(inputs, outputs, z_mean, z_log_var):
    """Total VAE loss: reconstruction loss plus KL-divergence loss.

    Args:
        inputs: Original images.
        outputs: Reconstructed images produced by the decoder.
        z_mean: Latent means produced by the encoder.
        z_log_var: Latent log-variances produced by the encoder.

    Returns:
        The mean over the batch of (reconstruction loss + KL loss).
    """
    # Reconstruction loss: MSE over the flattened tensors, multiplied by the
    # pixel count (28 * 28) so that it reflects the original image size.
    flat_inputs = tf.keras.backend.flatten(inputs)
    flat_outputs = tf.keras.backend.flatten(outputs)
    reconstruction_loss = mse(flat_inputs, flat_outputs) * (28 * 28)

    # KL divergence loss: sum the per-dimension terms over the latent axis,
    # then scale by -0.5 to obtain the correct sign and magnitude.
    kl_terms = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
    kl_loss = tf.reduce_sum(kl_terms, axis=-1) * -0.5

    # Total loss, averaged over the batch.
    return tf.reduce_mean(reconstruction_loss + kl_loss)

In [None]:
# Compile and train the model.
# The loss is attached through add_loss(), so compile() is given only an
# optimizer and fit() is given inputs without targets.
# NOTE(review): the recorded output of this cell is
# "TypeError: unhashable type: 'DictWrapper'". The cells below that save,
# reload and use the model presumably need this cell to succeed -- confirm
# against the TF/Keras version in use.
vae_model.add_loss(vae_loss(inputs, outputs, z_mean, z_log_var))
vae_model.compile(optimizer='adam')

vae_model.fit(X_train, epochs=50, batch_size=128, validation_data=(X_test, None))

TypeError: unhashable type: 'DictWrapper'

In [None]:
# Save the model.
# The end-to-end VAE built in this notebook is bound to `vae_model`; no
# notebook-level variable named `model` exists, so saving through that name
# fails with a NameError on a fresh kernel.
vae_model.save("vae.keras")

In [None]:
# Load the weights stored in "vae.keras" back into the VAE model.
vae_model.load_weights("vae.keras")

In [None]:
# Generate an image from a latent vector: decode the origin of the latent space.
# An explicit (1, latent_dim) float32 array is passed instead of a nested
# Python list: predict() may interpret a list as several separate inputs, and
# the array also follows latent_dim rather than hard-coding two dimensions.
digit = decoder.predict(np.zeros((1, latent_dim), dtype='float32'))
print(digit.shape)

In [None]:
# 시각화
import matplotlib.pyplot as plt

# Show the first decoded image in grayscale on a 3x3-inch figure, axes hidden.
fig, ax = plt.subplots(figsize=(3, 3))
ax.imshow(digit[0,...], cmap='gray')
ax.axis('off')
plt.show()

In [None]:
# Test: encode the first 16 test images, sample a latent vector for each,
# and decode the samples back into images.
z_test_mean, z_test_log_var = np.array(encoder.predict(X_test[:16, ...]))
z_test_sample = sampling(z_test_mean, z_test_log_var)
digits = decoder.predict(z_test_sample)

# Lay the 16 decoded images out on a 4x4 grid.
plt.figure(figsize=(2,2))
for position, image in enumerate(digits[:16], start=1):
    plt.subplot(4, 4, position)
    plt.imshow(image, cmap='gray')
    plt.axis('off')
plt.show()

# Labels of the same 16 test images, for comparison with the grid.
print(y_test[:16])

In [None]:
# Encode the whole test set. The encoder returns [z_mean, z_log_var], each of
# shape (n_samples, latent_dim).
x_test_latent1, x_test_latent2 = encoder.predict(X_test)

# The mean is the encoder's first output and the log-variance its second.
# Slicing columns 0 and 1 of the first output would only yield the two
# coordinates of the mean and never touch the log-variance.
x_test_mean, x_test_log_var = x_test_latent1, x_test_latent2

In [None]:
# Inspect the latent vectors: latent means of the test set, coloured by label.
# x_test_latent1 is the encoder's first output (z_mean) for all of X_test, so
# it has one row per entry of y_test. z_test_mean only covers the first 16
# test images, and scatter() rejects a colour array whose length differs from
# the number of points.
x = x_test_latent1[:, 0]
y = x_test_latent1[:, 1]
plt.figure(figsize=(10,10))
scatter = plt.scatter(x, y, c=y_test)
plt.colorbar(scatter)
plt.xlabel('z_mean[0]')
plt.ylabel('z_mean[1]')
plt.title('Latent mean of the test set, coloured by digit label')
plt.show()

In [None]:
# Inspect the latent vectors: latent log-variances of the test set, coloured
# by label.
# x_test_latent2 is the encoder's second output (z_log_var) for all of X_test,
# so it has one row per entry of y_test. z_test_log_var only covers the first
# 16 test images, and scatter() rejects a colour array whose length differs
# from the number of points.
x = x_test_latent2[:, 0]
y = x_test_latent2[:, 1]
plt.figure(figsize=(10,10))
scatter = plt.scatter(x, y, c=y_test)
plt.colorbar(scatter)
plt.xlabel('z_log_var[0]')
plt.ylabel('z_log_var[1]')
plt.title('Latent log-variance of the test set, coloured by digit label')
plt.show()