In [12]:
import os
import numpy as np
import tensorflow as tf
from  tensorflow import keras
from matplotlib import pyplot as plt
from tensorflow.keras import Sequential, layers,optimizers
from    PIL import Image

In [2]:
# 设置随机种子以及提示信息
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')

In [3]:
def save_images(imgs,name):
    new_img = Image.new('L',(280,280))

    index = 0
    for i in range(0,280,28):
        for j in range(0,280,28):
            im = imgs[index]
            im = Image.fromarray(im,mode ='L')
            new_img.paste(im,(i,j))
            index +=1

    new_img.save(name)

In [18]:
h_dim = 20
batchsz = 512
lr = 1e-3

In [19]:
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
# we do not need label
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batchsz * 5).batch(batchsz)
test_db = tf.data.Dataset.from_tensor_slices(x_test)
test_db = test_db.batch(batchsz)

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)


(60000, 28, 28) (60000,)
(10000, 28, 28) (10000,)


In [6]:
# 超参数设置
z_dim = 20
batch_size = 512
lr = 1e-3

In [10]:
class VAE(keras.Model):

    def __init__(self):
        super(VAE,self).__init__()

        # Encoder 网络
        self.fc1 = layers.Dense(128)
        self.fc2 = layers.Dense(z_dim)  #均值
        self.fc3 = layers.Dense(z_dim)  #方差

        # Decoder 网络
        self.fc4 = layers.Dense(128)
        self.fc5 = layers.Dense(784)

    def encoder(self,x):

        h = tf.nn.relu(self.fc1(x))

        mu = self.fc2(h)

        log_var = self.fc3(h)

        return mu,log_var

    def decoder(self,z):

        out = tf.nn.relu(self.fc4(z))
        out = self.fc5(out)

        return out

    def reparameterize(self,mu,log_var):

        eps = tf.random.normal(log_var.shape)

        std = tf.exp(log_var)**0.5

        z = mu + std * eps
        return z

    def call(self,inputs,training=None):
        mu,log_var = self.encoder(inputs)

        z = self.reparameterize(mu,log_var)

        x_hat = self.decoder(z)

        return x_hat,mu,log_var


In [14]:
model = VAE()

model.build(input_shape=(4,784))

model.summary()

optimizer = optimizers.Adam(lr)

Model: "vae_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_21 (Dense)             multiple                  100480    
_________________________________________________________________
dense_22 (Dense)             multiple                  2580      
_________________________________________________________________
dense_23 (Dense)             multiple                  2580      
_________________________________________________________________
dense_24 (Dense)             multiple                  2688      
_________________________________________________________________
dense_25 (Dense)             multiple                  101136    
Total params: 209,464
Trainable params: 209,464
Non-trainable params: 0
_________________________________________________________________


In [20]:
for epoch in range(100):
    for step,x in enumerate(train_db):

        x = tf.reshape(x,[-1,784])

        with tf.GradientTape() as tape:

            x_rec_logits,mu,log_var = model(x)

            rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=x,logits=x_rec_logits)

            rec_loss = tf.reduce_mean(rec_loss)/x.shape[0]

            kl_div = -0.5 * (log_var + 1- mu**2 - tf.exp(log_var))

            kl_div = tf.reduce_sum(kl_div)/x.shape[0]

            loss = rec_loss + 1. * kl_div


        grads = tape.gradient(loss,model.trainable_variables)

        optimizer.apply_gradients(zip(grads,model.trainable_variables))

        if step % 100 == 0:
            print(epoch, step, 'kl div:', float(kl_div), 'rec loss:', float(rec_loss))

    # evaluation
    z = tf.random.normal((batchsz, z_dim))
    logits = model.decoder(z)
    x_hat = tf.sigmoid(logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() *255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, 'vae_images/sampled_epoch%d.png'%epoch)

    x = next(iter(test_db))
    x = tf.reshape(x, [-1, 784])
    x_hat_logits, _, _ = model(x)
    x_hat = tf.sigmoid(x_hat_logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() *255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, 'vae_images/rec_epoch%d.png'%epoch)

        




0 0 kl div: 4.994808197021484 rec loss: 0.0013679481344297528
0 100 kl div: 0.0012910384684801102 rec loss: 0.0010831286199390888
1 0 kl div: 0.0007409519166685641 rec loss: 0.0010349482763558626
1 100 kl div: 0.0006935124401934445 rec loss: 0.0009704981930553913
2 0 kl div: 0.00042013899656012654 rec loss: 0.000977409421466291
2 100 kl div: 0.00024227809626609087 rec loss: 0.000960362667683512
3 0 kl div: 0.0004030410200357437 rec loss: 0.0009635186870582402
3 100 kl div: 0.0002925911103375256 rec loss: 0.0009743685368448496
4 0 kl div: 0.00012343772687017918 rec loss: 0.0009570939000695944
4 100 kl div: 0.00015748717123642564 rec loss: 0.0009406720055267215
5 0 kl div: 0.00018369726603850722 rec loss: 0.0009612427093088627
5 100 kl div: 0.00013153679901733994 rec loss: 0.0009624602389521897
6 0 kl div: 0.0001381991314701736 rec loss: 0.0009623814839869738
6 100 kl div: 9.958632290363312e-05 rec loss: 0.0009573789429850876
7 0 kl div: 8.360372157767415e-05 rec loss: 0.0009592307615093

KeyboardInterrupt: 