<a href="https://colab.research.google.com/github/whkaikai/AI/blob/main/mnist_keras_GRU_VAEmodel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import math
import os
import numpy as np
from keras.layers import Input, InputLayer, Dense, RepeatVector, Lambda, TimeDistributed
# from keras.layers import GRU
from keras.layers import CuDNNGRU as GRU  # GPU用
from keras.models import Model, Sequential
from keras.callbacks import TensorBoard, EarlyStopping
# from keras.optimizers import adam
from keras import backend as K

In [None]:
def prepare_data():
    """Load the MNIST images, rescale them, and record the sequence geometry.

    Labels are discarded. Pixel values are divided by 255. Each image is
    treated as a sequence: axis 1 is the timestep axis and axis 2 is the
    per-timestep feature axis.

    Side effects:
        Sets the module-level globals ``NUM_TIMESTEPS`` and ``NUM_INPUT_DIM``
        from the training array's shape.

    Returns:
        Tuple ``(x_train, x_test)`` of rescaled image arrays. The original
        author's note gives their shapes as (60000, 28, 28) and
        (10000, 28, 28).
    """
    global NUM_TIMESTEPS, NUM_INPUT_DIM
    from keras.datasets import mnist

    train_split, test_split = mnist.load_data()
    # Keep only the images (index 0 of each split); the labels are unused.
    x_train = train_split[0] / 255
    x_test = test_split[0] / 255

    # Arrays are (samples, NUM_TIMESTEPS, NUM_INPUT_DIM).
    NUM_TIMESTEPS, NUM_INPUT_DIM = x_train.shape[1:]
    return x_train, x_test

def seq_vae(latent_dim=40, coding_dim=3, kl_weight=0.01,
            num_timesteps=None, num_input_dim=None):
    """Build a GRU sequence variational autoencoder with its loss attached.

    Data flow::

        input -> GRU (encoder) -> z_mean, z_log_var -> sampled z
              -> RepeatVector -> GRU (decoder) -> per-timestep Dense -> output

    Args:
        latent_dim: Number of units in the encoder and decoder GRU layers.
        coding_dim: Size of the latent code ``z``.
        kl_weight: Weight given to the KL term; the reconstruction term is
            weighted ``1 - kl_weight``.
        num_timesteps: Sequence length. When ``None``, the module-level
            ``NUM_TIMESTEPS`` is used (it is set by ``prepare_data()``).
        num_input_dim: Features per timestep. When ``None``, the module-level
            ``NUM_INPUT_DIM`` is used (it is set by ``prepare_data()``).

    Returns:
        The uncompiled Keras ``Model`` named ``"seq_vae"``. Its loss is
        registered through ``add_loss``, so it is compiled without a
        ``loss`` argument. The layers named ``"z_mean"``, ``"z_log_var"``
        and ``"z"`` can be retrieved with ``get_layer``.

    Raises:
        NameError: If a dimension is left as ``None`` and the corresponding
            module-level global has not been defined yet.
    """
    from keras.losses import binary_crossentropy

    if num_timesteps is None:
        num_timesteps = NUM_TIMESTEPS
    if num_input_dim is None:
        num_input_dim = NUM_INPUT_DIM

    def sampling(args):
        """Draw ``z`` from the Gaussian described by ``(z_mean, z_log_var)``.

        Args:
            args: Pair ``(z_mean, z_log_var)`` of tensors.

        Returns:
            Tensor holding the sampled latent variable.
        """
        z_mean, z_log_var = args
        batch = K.shape(z_mean)[0]
        dim = K.int_shape(z_mean)[1]
        # By default, random_normal has mean=0 and std=1.0.
        epsilon = K.random_normal(shape=(batch, dim))
        # exp(0.5 * log_var) turns the log-variance into a standard
        # deviation. Working in log space lets the Dense layer emit any real
        # value, including negative ones.
        return z_mean + K.exp(0.5 * z_log_var) * epsilon

    # ----- encoder -----
    inputs = Input(shape=(num_timesteps, num_input_dim))
    # (None, num_timesteps, num_input_dim)
    x = GRU(latent_dim)(inputs)
    # (None, latent_dim)
    z_mean = Dense(coding_dim, name='z_mean')(x)
    # (None, coding_dim)
    z_log_var = Dense(coding_dim, name='z_log_var')(x)
    # (None, coding_dim)

    # Reparameterization trick: sampling happens inside a Lambda layer that
    # takes the two statistics as inputs.
    # Note that "output_shape" isn't necessary with the TensorFlow backend.
    z = Lambda(sampling, output_shape=(coding_dim,), name='z')(
        [z_mean, z_log_var])
    # (None, coding_dim)

    # ----- decoder -----
    # Feed the same latent code to the decoder GRU at every timestep.
    latent_inputs = RepeatVector(num_timesteps)(z)
    # (None, num_timesteps, coding_dim)
    x = GRU(latent_dim, return_sequences=True)(latent_inputs)
    # (None, num_timesteps, latent_dim)
    outputs = TimeDistributed(
        Dense(num_input_dim, activation='sigmoid'))(x)
    # (None, num_timesteps, num_input_dim)

    # End-to-end model: the decoder consumes the encoder's sampled z.
    vae = Model(inputs, outputs, name='seq_vae')

    # ----- loss -----
    # Reuse the z_mean / z_log_var tensors built above instead of calling a
    # separate encoder model on `inputs` again, which would add a second,
    # redundant copy of the encoder to the graph.
    reconstruction_loss = binary_crossentropy(
        K.flatten(inputs), K.flatten(outputs))
    # KL divergence between N(z_mean, exp(z_log_var)) and N(0, I), summed
    # over the latent dimensions.
    kl_loss = -0.5 * K.sum(
        1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
    print("reconstruction",reconstruction_loss,"kl",kl_loss)
    vae.add_loss(
        K.mean((1 - kl_weight) * reconstruction_loss + kl_weight * kl_loss))

    print("seq_vae构成")
    vae.summary()
    return vae


if __name__ == "__main__":
    # Script entry point: load MNIST, build and train the sequence VAE, save
    # it, then plot a few test images next to their reconstructions.
    from keras.optimizers import Adam

    print("="*20+"preparating the data..."+"="*20)

    x_train, x_test = prepare_data()
    print("="*20+"summary of this model"+"="*20)
    # Bind the model to its own name. Rebinding `seq_vae` would shadow the
    # factory function, so running this cell a second time would fail.
    vae_model = seq_vae()

    # "amsgrad" is not an optimizer identifier that Keras resolves from a
    # string; AMSGrad is the Adam variant enabled with `amsgrad=True`.
    vae_model.compile(optimizer=Adam(amsgrad=True))

    # Training. No targets are passed: the loss was attached inside
    # seq_vae() through add_loss.
    vae_model.fit(x_train,
                  epochs=100,
                  batch_size=64,
                  shuffle=True,
                  validation_split=0.1,
                  callbacks=[])
    # Optional callbacks:
    # [TensorBoard(log_dir="./MNIST_gru_vae/"), EarlyStopping(patience=3)]

    # Make sure the target directory exists before writing the HDF5 file.
    os.makedirs('./MNIST_gru_vae', exist_ok=True)
    vae_model.save('./MNIST_gru_vae/MNIST_gru_vae.h5')

    # Backend functions mapping input -> z_mean and z -> reconstruction.
    # `vae_model.output` is used rather than `layers[-1].output` because the
    # last entry of `layers` is not guaranteed to be the reconstruction layer
    # once add_loss has been applied.
    encoder = K.function([vae_model.input],
                         [vae_model.get_layer("z_mean").output])
    decoder = K.function([vae_model.get_layer("z").output],
                         [vae_model.output])

    # Inference
    x_pred = vae_model.predict(x_test)

    import matplotlib.pyplot as plt
    # Show every 500th test image followed by its reconstruction.
    for true, pred in zip(x_test[::500], x_pred[::500]):
        plt.matshow(true)

        plt.matshow(pred)
        plt.show()



Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
reconstruction KerasTensor(type_spec=TensorSpec(shape=(), dtype=tf.float32, name=None), name='tf.math.reduce_mean/Mean:0', description="created by layer 'tf.math.reduce_mean'") kl KerasTensor(type_spec=TensorSpec(shape=(None,), dtype=tf.float32, name=None), name='tf.math.multiply/Mul:0', description="created by layer 'tf.math.multiply'")
seq_vaeの構成
Model: "seq_vae"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 28, 28)]     0           []                               
                                                                                                  
 cu_dnngru (CuDNNGRU)           (None, 40)           8400        ['input_1[0][0]']                
                                             

ValueError: ignored