In [1]:
import tensorflow as tf

In [4]:
class Sampling(tf.keras.layers.Layer):
    '''
    Reparameterization layer: draws a latent vector z from the normal
    distribution described by the encoder outputs.

    @inputs a pair (z_mean, z_log_var): the mean and the log-variance of z
    produced by the network. The variance is carried in log form because a
    dense layer can emit any real number while a variance must be positive.
    @return z = z_mean + exp(0.5 * z_log_var) * epsilon, where epsilon is
    random normal noise of shape (batch, dim)
    '''
    def call(self, inputs):
        z_mean, z_log_var = inputs
        mean_shape = tf.shape(z_mean)
        n_rows, n_cols = mean_shape[0], mean_shape[1]
        # One noise value per latent coordinate of every sample in the batch.
        noise = tf.keras.backend.random_normal(shape=(n_rows, n_cols))
        # exp(0.5 * log_var) is the standard deviation; shifting and scaling
        # the noise this way gives a sample distributed as N(z_mean, var).
        std_dev = tf.exp(0.5 * z_log_var)
        return z_mean + std_dev * noise

In [14]:
class Encoder(tf.keras.layers.Layer):
    '''
    The encoder of the VAE: maps an input vector to the parameters of the
    latent distribution and to one random sample drawn from it.

    @param latent_dim size of the latent vector z
    @param intermediate_dim size of the hidden (middle) dense layer

    @return z_mean, z_log_var, z: the mean of z, the log-variance of z, and a
    random sample of z produced by the Sampling layer
    '''
    def __init__(self, latent_dim, intermediate_dim, **kwargs):
        super().__init__(**kwargs)
        dense = tf.keras.layers.Dense
        # Shared hidden representation feeding both output heads.
        self.intermediate = dense(intermediate_dim, activation='relu')
        # Linear head producing the mean of z.
        self.mean = dense(latent_dim)
        # Linear head producing the log-variance of z.
        self.log_var = dense(latent_dim)
        # Draws z from the distribution described by the two heads.
        self.sample = Sampling()

    def call(self, inputs):
        hidden = self.intermediate(inputs)
        z_mean, z_log_var = self.mean(hidden), self.log_var(hidden)
        return z_mean, z_log_var, self.sample([z_mean, z_log_var])

In [15]:
class Decoder(tf.keras.layers.Layer):
    '''
    The decoder of the VAE: maps a latent vector back to a vector of the
    original size.

    @param original_dim size of the original (reconstructed) vector
    @param intermediate_dim size of the hidden (middle) dense layer

    @return the reconstructed vector of size original_dim; the final layer is
    a sigmoid, so every component lies in (0, 1)
    '''
    def __init__(self, original_dim, intermediate_dim, **kwargs):
        super().__init__(**kwargs)
        dense = tf.keras.layers.Dense
        self.intermediate = dense(intermediate_dim, activation='relu')
        self.outputs = dense(original_dim, activation='sigmoid')

    def call(self, inputs):
        # latent vector -> hidden layer -> reconstruction
        return self.outputs(self.intermediate(inputs))

In [16]:
# Size of each input vector fed to, and reconstructed by, the VAE.
ORIGINAL_DIM = 3294
# Width of the hidden dense layer used in both the encoder and the decoder.
INTERMEDIATE_DIM = 1024
# Size of the latent vector z. The spelling "LATTENT" is kept on purpose:
# build_model() refers to the constant by this exact name.
LATTENT_DIM = 512

In [17]:
def build_model():
    '''
    Build and compile the VAE: Input -> Encoder -> Decoder, with the training
    objective attached to the model through add_loss().

    The objective is the negative ELBO averaged over the batch:
    per-sample reconstruction error + per-sample KL divergence between the
    encoder distribution and a standard normal prior.

    @return a compiled tf.keras Model mapping an ORIGINAL_DIM input vector to
    its reconstruction (Adam optimizer, loss supplied via add_loss, so no
    targets are needed when fitting)
    '''
    vae_input = tf.keras.layers.Input((ORIGINAL_DIM,))
    z_mean, z_log_var, z = Encoder(LATTENT_DIM, INTERMEDIATE_DIM)(vae_input)
    reconstruct_x = Decoder(ORIGINAL_DIM, INTERMEDIATE_DIM)(z)

    # Reconstruction error between the original and the generated data. Mean
    # squared error is used because the generated values are continuous.
    # mean_squared_error AVERAGES over the feature axis while the KL term below
    # is SUMMED over the latent axis, so the average is scaled back up by
    # ORIGINAL_DIM to put both terms on a per-sample "sum over dimensions"
    # scale. Without this the reconstruction term is under-weighted by a
    # factor of ORIGINAL_DIM and the KL term dominates training.
    reconstruction_loss = tf.keras.losses.mean_squared_error(vae_input, reconstruct_x)
    reconstruction_loss = reconstruction_loss * ORIGINAL_DIM

    # KL divergence between N(z_mean, exp(z_log_var)) and N(0, 1), summed over
    # the latent dimensions.
    kl_loss = -0.5 * tf.keras.backend.sum(
        1 + z_log_var - tf.keras.backend.square(z_mean) - tf.keras.backend.exp(z_log_var),
        axis=-1)

    # Negative ELBO averaged over the batch; this is the quantity minimized.
    vae_loss = tf.keras.backend.mean(reconstruction_loss + kl_loss)

    model = tf.keras.models.Model(vae_input, reconstruct_x)
    model.add_loss(vae_loss)
    model.compile(optimizer=tf.keras.optimizers.Adam())
    return model

In [18]:
# Build the compiled VAE (Input -> Encoder -> Decoder) with build_model().
vae = build_model()



In [19]:
# Print the layer-by-layer structure and parameter counts of the VAE.
vae.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 3294)]       0                                            
__________________________________________________________________________________________________
encoder_1 (Encoder)             ((None, 512), (None, 4423680     input_2[0][0]                    
__________________________________________________________________________________________________
decoder (Decoder)               (None, 3294)         3901662     encoder_1[0][2]                  
__________________________________________________________________________________________________
tf_op_layer_add (TensorFlowOpLa [(None, 512)]        0           encoder_1[0][1]                  
______________________________________________________________________________________________

In [21]:
# Take the second layer of the model. In the summary printed above, index 0 is
# the InputLayer and index 1 is the Encoder, so despite its name this variable
# holds the Encoder layer object itself, not the tensors it outputs.
encoder_output = vae.layers[1]