In [3]:
import tensorflow as tf
from tensorflow.keras import layers

In [8]:
class Linear(layers.Layer):
    '''
    Dense layer
    '''
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        # 初始化权重参数
        w_init = tf.random_normal_initializer()
        # 权重w占位符
        self.w = tf.Variable(initial_value=w_init(shape=(input_dim, units),
                                                 dtype='float32'),
                            trainable=True)
        # 初始化偏置参数
        b_init = tf.zeros_initializer()
        # 偏置b占位符
        self.b = tf.Variable(initial_value=b_init(shape=(units,),
                                                 dtype='float32'),
                            trainable=True)
    # 通过回调函数计算    
    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b # matmul 矩阵乘法
        

x = tf.ones((2,2))
linear_layer = Linear(4,2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[ 0.06485754 -0.11709414  0.10522978  0.04428321]
 [ 0.06485754 -0.11709414  0.10522978  0.04428321]], shape=(2, 4), dtype=float32)


In [9]:
# assert（断言）用于判断一个表达式，在表达式条件为 false 的时候触发异常。
assert linear_layer.weights == [linear_layer.w, linear_layer.b]

In [10]:
class Linear(layers.Layer):

    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(shape=(input_dim, units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(units,),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[-0.04406109  0.09093019 -0.03715542  0.04563867]
 [-0.04406109  0.09093019 -0.03715542  0.04563867]], shape=(2, 4), dtype=float32)


In [11]:
class ComputeSum(layers.Layer):

    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        self.total = tf.Variable(initial_value=tf.zeros((input_dim,)),
                                 trainable=False)

    def call(self, inputs):
        # .assign_add(delta, use_locking=False, name=None, read_value=True) 属于 tf.Variable 的一个方法；
        # 实现向变量delta(tensor)添加值。
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total

x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())

[2. 2.]
[4. 4.]


In [12]:
print('weights:', len(my_sum.weights))
print('non-trainable weights:', len(my_sum.non_trainable_weights))

print('trainable_weights:', my_sum.trainable_weights)

weights: 1
non-trainable weights: 1
trainable_weights: []


In [13]:
class Linear(layers.Layer):

    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(shape=(input_dim, units),
                                        initializer='random_normal',
                                        trainable=True)
        self.b = self.add_weight(shape=(units,),
                                 initializer='zeros',
                                 trainable=True)
        

In [14]:
class Linear(layers.Layer):

    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

In [15]:
# 在实例化时，不知道该调用什么输入
linear_layer = Linear(32)
# 首次调用该层时将动态创建该层的权重
y = linear_layer(x)

In [19]:
y

<tf.Tensor: shape=(2, 32), dtype=float32, numpy=
array([[ 0.16962165, -0.10605951, -0.0124007 ,  0.05497382, -0.04780749,
        -0.00888127, -0.0770494 ,  0.21947056,  0.09034248, -0.05703586,
        -0.0060977 , -0.07365507,  0.07659768, -0.01544504, -0.04743483,
        -0.04726882,  0.10706575,  0.07933822,  0.00366602,  0.0726752 ,
         0.09143378,  0.02786283, -0.01465205,  0.10379285,  0.17563567,
         0.02788435, -0.05976434,  0.02714855,  0.08858202,  0.02146289,
        -0.02282026,  0.05422783],
       [ 0.16962165, -0.10605951, -0.0124007 ,  0.05497382, -0.04780749,
        -0.00888127, -0.0770494 ,  0.21947056,  0.09034248, -0.05703586,
        -0.0060977 , -0.07365507,  0.07659768, -0.01544504, -0.04743483,
        -0.04726882,  0.10706575,  0.07933822,  0.00366602,  0.0726752 ,
         0.09143378,  0.02786283, -0.01465205,  0.10379285,  0.17563567,
         0.02788435, -0.05976434,  0.02714855,  0.08858202,  0.02146289,
        -0.02282026,  0.05422783]], dtyp

In [None]:
class MLPBlock(layers.Layer):

    def __init__(self):
        super(MLPBlock, self).__init__()
        # 实例化子类层
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
# 首次调用mlp将创建权重
y = mlp(tf.ones(shape=(3, 64))) 
print('weights:', len(mlp.weights))
print('trainable weights:', len(mlp.trainable_weights))

In [20]:
# 动态变化正则化层
class ActivityRegularizationLayer(layers.Layer):

    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs

In [21]:
class OuterLayer(layers.Layer):

    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)

layer = OuterLayer()
assert len(layer.losses) == 0  # 由于从未调用过该层，因此尚未计算任何损失

_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # 通过实例化，产生了损失

# 在每次__call__的开始时都会重置`layer.losses`
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # 这是调用时产生的损失

In [22]:
class OuterLayer(layers.Layer):

    def __init__(self):
        super(OuterLayer, self).__init__()
        self.dense = layers.Dense(32, kernel_regularizer=tf.keras.regularizers.l2(1e-3))

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayer()
_ = layer(tf.zeros((1, 1)))

# 这是由上面的`kernel_regularizer`创建的损失`1e-3 * sum（layer.dense.kernel ** 2）`，。
print(layer.losses)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.0020331577>]


In [24]:
# I实例化SGD优化器
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
# 定义损失
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# 批处理遍历数据集
for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() as tape:
        logits = layer(x_batch_train)
        # 批数据的损失
        loss_value = loss_fn(y_batch_train, logits)
        # 添加在此向前传递过程中创建的额外损失：
        loss_value += sum(model.losses)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

NameError: name 'train_dataset' is not defined

In [25]:
class Linear(layers.Layer):

    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {'units': self.units}


# 修改默认参数值，重新创建该层：
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'units': 64}


In [26]:
class CustomDropout(layers.Layer):

    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

In [None]:
class ResNet(tf.keras.Model):

    def __init__(self):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save_weights(filepath)

In [27]:
class Sampling(layers.Layer):
    '''
    将z重采样为（z_mean，z_log_var）向量，并用该向量编码一个数字。
    '''
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
    '''
    将MNIST数字映射到三元组（z_mean，z_log_var，z）
    '''

    def __init__(self,
                 latent_dim=32,
                 intermediate_dim=64,
                 name='encoder',
                 **kwargs):
        
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(layers.Layer):
    '''
    将编码的数字矢量z转换回可读的数字。
    '''

    def __init__(self,
                 original_dim,
                 intermediate_dim=64,
                 name='decoder',
                 **kwargs):
        
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_output = layers.Dense(original_dim, activation='sigmoid')

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


class VariationalAutoEncoder(tf.keras.Model):
    '''
    将编码器和解码器组合为端到端模型以进行训练。
    '''

    def __init__(self,
                 original_dim,
                 intermediate_dim=64,
                 latent_dim=32,
                 name='autoencoder',
                 **kwargs):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim,
                               intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # 添加KL散度正则化损失
        kl_loss = - 0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
        self.add_loss(kl_loss)
        return reconstructed

In [28]:
original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 3

for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # 计算重构损失
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # 添加KL散度正则化损失

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print('step %s: mean loss = %s' % (step, loss_metric.result()))


Start of epoch 0
step 0: mean loss = tf.Tensor(0.2939159, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.12394193, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.09826896, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.088605046, shape=(), dtype=float32)
step 400: mean loss = tf.Tensor(0.08379695, shape=(), dtype=float32)
step 500: mean loss = tf.Tensor(0.080534376, shape=(), dtype=float32)
step 600: mean loss = tf.Tensor(0.07844055, shape=(), dtype=float32)
step 700: mean loss = tf.Tensor(0.07689294, shape=(), dtype=float32)
step 800: mean loss = tf.Tensor(0.07575489, shape=(), dtype=float32)
step 900: mean loss = tf.Tensor(0.07475837, shape=(), dtype=float32)
Start of epoch 1
step 0: mean loss = tf.Tensor(0.07446379, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.073832676, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.07333698, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.07288531, shape=(), dtype=float32)
st

In [29]:
vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)

Train on 60000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x1e26e7e6888>

In [30]:
original_dim = 784
intermediate_dim = 64
latent_dim = 32

# 定义编码器模型
original_inputs = tf.keras.Input(shape=(original_dim,), name='encoder_input')
x = layers.Dense(intermediate_dim, activation='relu')(original_inputs)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name='encoder')

# 定义解码器模型
latent_inputs = tf.keras.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = layers.Dense(original_dim, activation='sigmoid')(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name='decoder')

# 定义 VAE 模型
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name='vae')

# 添加KL散度正则化损失
kl_loss = - 0.5 * tf.reduce_mean(
    z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

# 训练
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)

Train on 60000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x1e266d010c8>