<a href="https://colab.research.google.com/github/ailunguo/Test/blob/main/%E9%9D%A2%E5%90%91%E7%A0%94%E7%A9%B6%E4%BA%BA%E5%91%98Keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import keras

## 张量

In [None]:
# Constant tensor: built from a nested Python list.
# The bare `x` on the last line displays the tensor as the cell output.
x = tf.constant([[5, 2], [1, 3]])
x

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[5, 2],
       [1, 3]], dtype=int32)>

In [None]:
# Get the tensor's value as a NumPy array (displayed as the cell output).
x.numpy()

array([[5, 2],
       [1, 3]], dtype=int32)

In [None]:
# Inspect the element type and the shape of the tensor.
print("dtype:", x.dtype)
print("shape:", x.shape)

dtype: <dtype: 'int32'>
shape: (2, 2)


In [None]:
# Build an all-ones and an all-zeros tensor of shape (2, 1) and print them.
print(tf.ones(shape=(2, 1)))
print(tf.zeros(shape=(2, 1)))

tf.Tensor(
[[1.]
 [1.]], shape=(2, 1), dtype=float32)
tf.Tensor(
[[0.]
 [0.]], shape=(2, 1), dtype=float32)


In [None]:
# Create random constant tensors.
# Values drawn from a normal distribution with the given mean and stddev.
x = tf.random.normal(shape=(2, 2), mean=0.0, stddev=1.0)

# int32 values drawn uniformly between minval and maxval.
# Note: this rebinds `x`, so the normal sample above is discarded.
x = tf.random.uniform(shape=(2, 2), minval=0, maxval=10, dtype='int32')

## 变量

In [None]:
# Variables are special tensors used to store mutable state
# (for example the weights of a neural network).
# A Variable is created from an initial value.
initial_value = tf.random.normal(shape=(2, 2))
a = tf.Variable(initial_value)
print(a)

<tf.Variable 'Variable:0' shape=(2, 2) dtype=float32, numpy=
array([[ 0.8556467 , -0.9760387 ],
       [ 0.11634047,  0.917724  ]], dtype=float32)>


In [None]:
# A Variable is updated with .assign(value), .assign_add(increment)
# or .assign_sub(decrement).
new_value = tf.random.normal(shape=(2, 2))
a.assign(new_value)
# After assign(), every element of the variable equals the assigned tensor.
assert tf.reduce_all(tf.equal(a, new_value))

added_value = tf.random.normal(shape=(2, 2))
a.assign_add(added_value)
# After assign_add(), every element equals the element-wise sum.
assert tf.reduce_all(tf.equal(a, new_value + added_value))

## 在TensorFlow中的数学

In [None]:
# Two random (2, 2) tensors to operate on.
a = tf.random.normal(shape=(2, 2))
b = tf.random.normal(shape=(2, 2))

# Python operators and tf.* math functions can be chained on tensors.
c = a + b
d = tf.square(c)
e = tf.exp(d)

In [None]:
# Differentiation with GradientTape.
a = tf.random.normal(shape=(2, 2))
b = tf.random.normal(shape=(2, 2))

with tf.GradientTape() as tape:
  # `a` is a plain tensor here, so the tape is told explicitly to watch it.
  tape.watch(a)
  c = tf.sqrt(tf.square(a) + tf.square(b))

# Ask for the gradient after leaving the context: only the forward pass
# needs to be recorded, not the gradient computation itself.
dc_da = tape.gradient(c, a)
print(dc_da)

tf.Tensor(
[[ 0.6820381   0.9994569 ]
 [-0.67598087 -0.59693843]], shape=(2, 2), dtype=float32)


In [None]:
# Wrap `a` in a Variable; unlike the previous cell, no tape.watch() call is
# made before differentiating with respect to it.
a = tf.Variable(a)

with tf.GradientTape() as tape:
  c = tf.sqrt(tf.square(a) + tf.square(b))

# Differentiate outside the context so the backward pass is not recorded.
dc_da = tape.gradient(c, a)
print(dc_da)

tf.Tensor(
[[ 0.6820381   0.9994569 ]
 [-0.67598087 -0.59693843]], shape=(2, 2), dtype=float32)


In [None]:
# Higher-order derivatives are computed by nesting tapes.
with tf.GradientTape() as outer_tape:
  with tf.GradientTape() as tape:
    c = tf.sqrt(tf.square(a) + tf.square(b))
  # The first-order gradient is taken outside the inner context but inside
  # the outer one, so that outer_tape records it and can differentiate it.
  dc_da = tape.gradient(c, a)

# Second-order gradient, taken once the outer context has been left.
d2c_d2a = outer_tape.gradient(dc_da, a)
print(d2c_d2a)

tf.Tensor(
[[0.3676372  0.00111532]
 [1.05185    0.55841327]], shape=(2, 2), dtype=float32)


## Keras层

In [None]:
class Linear(keras.layers.Layer):
  """Affine layer computing ``y = w.x + b``.

  Both weights are created in the constructor, so the number of input
  features has to be passed in.

  Args:
    units: Number of output features.
    input_dim: Number of input features.
  """

  def __init__(self, units=32, input_dim=32):
    super().__init__()
    kernel_shape = (input_dim, units)
    bias_shape = (units,)
    # Kernel starts from random normal values, bias from zeros; both trainable.
    self.w = self.add_weight(
        shape=kernel_shape, initializer='random_normal', trainable=True)
    self.b = self.add_weight(
        shape=bias_shape, initializer='zeros', trainable=True)

  def call(self, inputs):
    """Return ``matmul(inputs, w) + b``."""
    projected = tf.matmul(inputs, self.w)
    return projected + self.b

In [None]:
# Instantiate the layer with 2 input features and 4 output units.
linear_layer = Linear(units=4, input_dim=2)

# Calling the layer on a (2, 2) input yields a (2, 4) output.
y = linear_layer(tf.ones((2, 2)))
assert y.shape == (2, 4)

In [None]:
# Check that the layer's `weights` list is exactly [w, b].
assert linear_layer.weights == [linear_layer.w, linear_layer.b]

## 层权重创建build(input_shape)

In [None]:
class Linear(keras.layers.Layer):
  """Affine layer ``y = w.x + b`` whose weights are created in ``build``.

  Args:
    units: Number of output features.
  """

  def __init__(self, units=32):
    super().__init__()
    self.units = units

  def build(self, input_shape):
    """Create kernel and bias; the input size is read from ``input_shape[-1]``."""
    in_features = input_shape[-1]
    self.w = self.add_weight(
        shape=(in_features, self.units),
        initializer='random_normal',
        trainable=True)
    self.b = self.add_weight(
        shape=(self.units,),
        initializer='random_normal',
        trainable=True)

  def call(self, inputs):
    """Return ``matmul(inputs, w) + b``."""
    projected = tf.matmul(inputs, self.w)
    return projected + self.b

linear_layer = Linear(4)
y = linear_layer(tf.ones((2, 2)))
# build() of the Linear class above does not run at construction time;
# it only runs when the layer is called.

## 网络层的梯度

In [None]:
# Prepare a dataset: MNIST images flattened to 784 values and divided by 255,
# paired with their labels, then shuffled and batched.
(x_train, y_train), _ = keras.datasets.mnist.load_data()
flat_images = x_train.reshape(60000, 784).astype('float32') / 255
dataset = tf.data.Dataset.from_tensor_slices((flat_images, y_train))
dataset = dataset.shuffle(buffer_size=1024).batch(64)

# A single Linear layer with 10 units produces the logits.
linear_layer = Linear(10)

loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

optimizer = keras.optimizers.SGD(learning_rate=1e-3)

for step, batch in enumerate(dataset):
  x, y = batch

  # Only the forward pass and the loss are recorded on the tape.
  with tf.GradientTape() as tape:
    logits = linear_layer(x)
    loss = loss_fn(y, logits)

  weights = linear_layer.trainable_weights
  gradients = tape.gradient(loss, weights)
  optimizer.apply_gradients(zip(gradients, weights))

  # Log every 100th step.
  if not step % 100:
    print('Step:', step, 'loss:', float(loss))

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Step: 0 loss: 2.345146656036377
Step: 100 loss: 2.2982373237609863
Step: 200 loss: 2.1140570640563965
Step: 300 loss: 2.0613176822662354
Step: 400 loss: 1.939256191253662
Step: 500 loss: 1.8842382431030273
Step: 600 loss: 1.7714017629623413
Step: 700 loss: 1.7904040813446045
Step: 800 loss: 1.6409804821014404
Step: 900 loss: 1.6009747982025146


## 可训练和不可训练的权重

In [None]:
# Trainable and non-trainable weights are tracked separately
# (trainable_weights / non_trainable_weights); this layer owns a non-trainable one.
class ComputeSum(keras.layers.Layer):
  """Returns the sum of the inputs."""

  def __init__(self, input_dim):
    super().__init__()
    # Running total, created as a non-trainable weight starting at zero.
    self.total = self.add_weight(
        shape=(input_dim,), initializer='zeros', trainable=False)

  def call(self, inputs):
    """Add the sum of `inputs` over axis 0 to the running total and return it."""
    increment = tf.reduce_sum(inputs, axis=0)
    self.total.assign_add(increment)
    return self.total

my_sum = ComputeSum(2)
x = tf.ones((2, 2))

# The call adds the sum of x over axis 0 to the layer's running total and returns that total.
y = my_sum(x)
print(y.numpy())

[2. 2.]


## 嵌套层

In [None]:
# Reuse the Linear class several times.

class MLP(keras.layers.Layer):
  """Simple stack of Linear layers with ReLU between them."""

  def __init__(self):
    super().__init__()
    self.linear_1 = Linear(32)
    self.linear_2 = Linear(32)
    self.linear_3 = Linear(10)

  def call(self, inputs):
    """Apply linear_1 -> relu -> linear_2 -> relu -> linear_3."""
    hidden = tf.nn.relu(self.linear_1(inputs))
    hidden = tf.nn.relu(self.linear_2(hidden))
    return self.linear_3(hidden)

mlp = MLP()
y = mlp(tf.ones(shape=(3, 64)))
# Three Linear sub-layers, each with a kernel and a bias.
assert len(mlp.weights) == 6

In [None]:
# Display the number of weights tracked by the MLP.
len(mlp.weights)

6

In [None]:
# The MLP class above corresponds to the following Sequential model.
mlp = keras.Sequential(
    [keras.layers.Dense(32, activation=tf.nn.relu),
     keras.layers.Dense(32, activation=tf.nn.relu),
     keras.layers.Dense(10),]
)

In [None]:
# Checking len(mlp.weights) right after construction would not show the weights,
# because build() has not run yet; the model is called once first.
y = mlp(tf.ones(shape=(3, 64)))
len(mlp.weights)

6

## 追踪各层造成的损失

In [None]:
# A layer that creates a regularization loss.
class ActivityRegularization(keras.layers.Layer):
  """Layer that creates an activity sparsity regularization loss.

  Args:
    rate: Factor applied to the sum of the inputs.
  """

  def __init__(self, rate=1e-2):
    super().__init__()
    self.rate = rate

  def call(self, inputs):
    """Register ``rate * sum(inputs)`` as a loss and pass the inputs through unchanged."""
    penalty = self.rate * tf.reduce_sum(inputs)
    self.add_loss(penalty)
    return inputs

In [None]:
# Any model that contains this layer will track its regularization loss.
class SparseMLP(keras.layers.Layer):
  """Two Linear layers with ActivityRegularization applied to the hidden activations."""

  def __init__(self):
    super().__init__()
    self.linear_1 = Linear(32)
    self.regularization = ActivityRegularization(1e-2)
    self.linear_3 = Linear(10)

  def call(self, inputs):
    """Apply linear_1 -> relu -> regularization -> linear_3."""
    hidden = tf.nn.relu(self.linear_1(inputs))
    hidden = self.regularization(hidden)
    return self.linear_3(hidden)

mlp = SparseMLP()
y = mlp(tf.ones((10, 10)))
# Print the losses collected on the layer during the call above.
print(mlp.losses)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.20146622>]


In [None]:
# These losses are cleared by the top-level layer at the start of every forward
# pass, so they do not accumulate: layer.losses only ever contains the losses
# created during the last forward pass.

mlp = SparseMLP()
mlp(tf.ones((10, 10)))
assert len(mlp.losses) == 1
mlp(tf.ones((10, 10)))
assert len(mlp.losses) == 1  # still one entry after the second call

# Next, use these losses during training.

(x_train, y_train), _ = keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices(
    (x_train.reshape(60000, 784).astype('float32') / 255, y_train)
)
dataset = dataset.shuffle(buffer_size=1024).batch(64)

mlp = SparseMLP()

loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.SGD(learning_rate=1e-3)

for step, (x, y) in enumerate(dataset):
  with tf.GradientTape() as tape:
    logits = mlp(x)
    loss = loss_fn(y, logits)
    loss += sum(mlp.losses)  # add the regularization terms to the loss

  # Differentiate after leaving the tape context so that only the forward
  # pass is recorded, not the gradient computation itself.
  gradients = tape.gradient(loss, mlp.trainable_weights)
  optimizer.apply_gradients(zip(gradients, mlp.trainable_weights))

  if step % 100 == 0:
    print("step:", step, "Loss:", float(loss))

step: 0 Loss: 5.145801067352295
step: 100 Loss: 2.561513900756836
step: 200 Loss: 2.4049787521362305
step: 300 Loss: 2.3597991466522217
step: 400 Loss: 2.3327791690826416
step: 500 Loss: 2.3470520973205566
step: 600 Loss: 2.3159615993499756
step: 700 Loss: 2.33622407913208
step: 800 Loss: 2.3366894721984863
step: 900 Loss: 2.334726572036743


## 跟踪训练指标

In [None]:
# Keras provides a wide range of built-in metrics, such as keras.metrics.AUC
# and keras.metrics.PrecisionAtRecall. Below is a simple example.
# NOTE(review): `dataset` is not defined in this cell; presumably it is the
# batched MNIST dataset built in an earlier cell.

accuracy = keras.metrics.SparseCategoricalAccuracy()

model = keras.Sequential([
    keras.layers.Dense(32, activation='relu'),
    keras.layers.Dense(32, activation='relu'),
    keras.layers.Dense(10),
])
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)

for epoch in range(2):
  for step, (x, y) in enumerate(dataset):
    with tf.GradientTape() as tape:
      logits = model(x)
      loss_value = loss_fn(y, logits)
    # Fold this batch into the running accuracy.
    accuracy.update_state(y, logits)

    weights = model.trainable_weights
    gradients = tape.gradient(loss_value, weights)
    optimizer.apply_gradients(zip(gradients, weights))

    if not step % 200:
      running_accuracy = float(accuracy.result())
      print("Epoch:", epoch, "Step:", step)
      print("Total running accuracy so far: %.3f" % running_accuracy)

  # Start the next epoch with a cleared metric.
  accuracy.reset_state()

Epoch: 0 Step: 0
Total running accuracy so far: 0.094
Epoch: 0 Step: 200
Total running accuracy so far: 0.755
Epoch: 0 Step: 400
Total running accuracy so far: 0.830
Epoch: 0 Step: 600
Total running accuracy so far: 0.861
Epoch: 0 Step: 800
Total running accuracy so far: 0.878
Epoch: 1 Step: 0
Total running accuracy so far: 0.953
Epoch: 1 Step: 200
Total running accuracy so far: 0.943
Epoch: 1 Step: 400
Total running accuracy so far: 0.945
Epoch: 1 Step: 600
Total running accuracy so far: 0.945
Epoch: 1 Step: 800
Total running accuracy so far: 0.945


In [None]:
# Custom metrics can also be defined by subclassing keras.metrics.Metric.
# This one implements the F1 score, with support for sample weighting.
class F1Score(keras.metrics.Metric):
  """F1 score accumulated over successive ``update_state`` calls.

  Args:
    name: Name of the metric instance.
    dtype: Dtype of the accumulators.
    threshold: Predictions greater than or equal to this value count as
      positive.
    **kwargs: Forwarded to ``keras.metrics.Metric``.
  """

  def __init__(self, name='f1_score', dtype='float32', threshold=0.5, **kwargs):
    super().__init__(name=name, dtype=dtype, **kwargs)
    # Honour the caller's threshold (it used to be hard-coded to 0.5).
    self.threshold = threshold
    self.true_positives = self.add_weight(
        name='tp', dtype=dtype, initializer='zeros'
    )
    self.false_positives = self.add_weight(
        name='fp', dtype=dtype, initializer='zeros'
    )
    self.false_negatives = self.add_weight(
        name='fn', dtype=dtype, initializer='zeros'
    )

  def update_state(self, y_true, y_pred, sample_weight=None):
    """Accumulate (optionally weighted) TP / FP / FN counts for one batch.

    Args:
      y_true: Ground-truth labels; cast to bool.
      y_pred: Scores compared against ``self.threshold``.
      sample_weight: Optional weights multiplied into each count.
    """
    # greater_equal already yields a boolean tensor.
    y_pred = tf.math.greater_equal(y_pred, self.threshold)
    y_true = tf.cast(y_true, tf.bool)

    true_positives = tf.cast(y_true & y_pred, self.dtype)
    false_positives = tf.cast(~y_true & y_pred, self.dtype)
    false_negatives = tf.cast(y_true & ~y_pred, self.dtype)

    if sample_weight is not None:
      sample_weight = tf.cast(sample_weight, self.dtype)
      true_positives *= sample_weight
      false_positives *= sample_weight
      false_negatives *= sample_weight

    self.true_positives.assign_add(tf.reduce_sum(true_positives))
    self.false_positives.assign_add(tf.reduce_sum(false_positives))
    self.false_negatives.assign_add(tf.reduce_sum(false_negatives))

  def result(self):
    """Return the F1 score from the accumulated counts.

    Zero denominators (for example before any update) give 0 instead of NaN.
    """
    precision = tf.math.divide_no_nan(
        self.true_positives, self.true_positives + self.false_positives)
    recall = tf.math.divide_no_nan(
        self.true_positives, self.true_positives + self.false_negatives)
    return tf.math.divide_no_nan(precision * recall * 2.0, precision + recall)

  def reset_state(self):
    """Reset all accumulators to zero."""
    self.true_positives.assign(0)
    self.false_positives.assign(0)
    self.false_negatives.assign(0)

In [None]:
m = F1Score()
# With the 0.5 threshold the predictions become [0, 1, 1, 1]: tp=1, fp=2, fn=0.
# NOTE(review): that gives precision 1/3, recall 1 and F1 = 0.5, but the stored
# output shows 1.0 -- it may come from an earlier run; re-run to confirm.
m.update_state([0, 1, 0, 0], [0.3, 0.5, 0.8, 0.9])
print("Intermediate result:", float(m.result()))

# Counts accumulate across calls: this batch adds tp=2 and fn=2.
m.update_state([1,1,1,1], [0.1,0.7,0.6,0.0])
print("Final result:", float(m.result()))

Intermediate result: 1.0
Final result: 0.6000000238418579


In [None]:
# Illustrates the building block used by F1Score: element-wise AND of two
# boolean tensors, cast to float so the matches can be summed.
pred = tf.cast([1,1,0,0], tf.bool)
true = tf.cast([1,0,0,0], tf.bool)
tf.cast(pred & true, 'float32')

<tf.Tensor: shape=(4,), dtype=float32, numpy=array([1., 0., 0., 0.], dtype=float32)>

In [None]:
# Sum all elements of the list into a scalar tensor.
tf.reduce_sum([1,1,0,0])

<tf.Tensor: shape=(), dtype=int32, numpy=2>

## 编译函数

In [None]:
# Eager execution is very useful for debugging, but compiling the computation
# into a static graph gives better performance.
# Static graphs are a researcher's best friend; any function can be compiled by
# wrapping it in the tf.function decorator.

model = keras.Sequential(
    [
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(10),
    ]
)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)

@tf.function # Make it fast
def train_on_batch(x, y):
  """Run one training step on a batch and return its loss.

  Uses the module-level `model`, `loss_fn` and `optimizer`.

  Args:
    x: Batch of inputs passed to `model`.
    y: Targets passed to `loss_fn` together with the logits.

  Returns:
    The loss computed for this batch.
  """
  with tf.GradientTape() as tape:
    logits = model(x)
    loss = loss_fn(y, logits)
  # Differentiate after leaving the tape context so that the gradient
  # computation itself is not recorded on the tape.
  gradients = tape.gradient(loss, model.trainable_weights)
  optimizer.apply_gradients(zip(gradients, model.trainable_weights))
  return loss

# Prepare a dataset: flattened MNIST images divided by 255, paired with labels.
(x_train, y_train), _ = keras.datasets.mnist.load_data()
flat_images = x_train.reshape(60000, 784).astype('float32') / 255
dataset = tf.data.Dataset.from_tensor_slices((flat_images, y_train))
dataset = dataset.shuffle(buffer_size=1024).batch(64)

for step, (x, y) in enumerate(dataset):
  # Each call runs the compiled training step.
  loss = train_on_batch(x, y)
  if not step % 100:
    print("Step:", step, "Loss:", float(loss))

Step: 0 Loss: 2.294524908065796
Step: 100 Loss: 0.5501132011413574
Step: 200 Loss: 0.5192018747329712
Step: 300 Loss: 0.3526468575000763
Step: 400 Loss: 0.1700807809829712
Step: 500 Loss: 0.29131579399108887
Step: 600 Loss: 0.37472790479660034
Step: 700 Loss: 0.36151182651519775
Step: 800 Loss: 0.2379852533340454
Step: 900 Loss: 0.18078218400478363


## 训练模式和推理模式

In [None]:
# Some layers, notably BatchNormalization and Dropout, behave differently
# during training and inference.
# For such layers the standard practice is to expose a boolean `training`
# argument in the call method.
# Exposing this argument lets the built-in training and evaluation loops
# (e.g. fit) use the layer correctly in both modes.

class Dropout(keras.layers.Layer):
  """Dropout layer that is only active when ``training`` is truthy.

  Args:
    rate: Drop rate forwarded to ``tf.nn.dropout``.
  """

  def __init__(self, rate):
    super().__init__()
    self.rate = rate

  def call(self, inputs, training=None):
    """Return `inputs` unchanged unless `training` is truthy, then apply dropout."""
    if not training:
      return inputs
    return tf.nn.dropout(inputs, rate=self.rate)

class MLPWithDropout(keras.layers.Layer):
  """Two Linear layers with ReLU and Dropout in between."""

  def __init__(self):
    super().__init__()
    self.linear_1 = Linear(32)
    self.dropout = Dropout(0.5)
    self.linear_3 = Linear(10)

  def call(self, inputs, training=None):
    """Apply linear_1 -> relu -> dropout -> linear_3, forwarding `training` to dropout."""
    hidden = tf.nn.relu(self.linear_1(inputs))
    hidden = self.dropout(hidden, training=training)
    return self.linear_3(hidden)

mlp = MLPWithDropout()
# The same layer is called once in training mode and once in inference mode.
# The results get their own names so that the MNIST labels held in `y_train`
# are not overwritten by a layer output.
out_training = mlp(tf.ones((2, 2)), training=True)
out_inference = mlp(tf.ones((2, 2)), training=False)

## 用于模型构建的函数式API

In [None]:
# To build deep learning models you do not have to use object-oriented
# programming all the time; all the layers seen so far can also be composed
# functionally.

# We use an "Input" object to describe the shape and dtype of the inputs.
# This is the deep learning equivalent of *declaring a type*.
# The shape argument is per-sample; it does not include the batch size.
# The functional API focuses on defining per-sample transformations.
# The model we create will automatically batch the per-sample transformations
# so that it can be called on batches of data.
inputs = keras.Input(shape=(16,), dtype='float32')

# We call layers on these 'type' objects
# and they return updated types (new shapes/dtypes)
x = Linear(32)(inputs)
x = Dropout(0.5)(x)
outputs = Linear(10)(x)

# A functional 'Model' can be defined by specifying inputs and outputs
# A model is itself a layer like any other
model = keras.Model(inputs, outputs)
# Two Linear layers, each with a kernel and a bias.
assert len(model.weights) == 4

# Call the model on a batch of 2 samples.
y = model(tf.ones((2, 16)))
assert y.shape == (2, 10)

# The `training` argument can be passed when calling the model.
y = model(tf.ones((2, 16)), training=True)

## 内置的训练和评估循环

In [None]:
# Functional model over 784-dimensional inputs with a 10-unit output layer.
inputs = keras.Input(shape=(784,), dtype='float32')
x = keras.layers.Dense(32, activation='relu')(inputs)
x = keras.layers.Dense(32, activation='relu')(x)
outputs = keras.layers.Dense(10)(x)
model = keras.Model(inputs, outputs)

# Specify the loss, optimizer, and metrics with 'compile()'
model.compile(
    loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer = keras.optimizers.Adam(learning_rate=1e-3),
    metrics = [keras.metrics.SparseCategoricalAccuracy()],
)

# NOTE(review): `dataset` is not defined in this cell; presumably it is the
# batched (image, label) MNIST dataset built in an earlier cell.
# Train the model with the dataset for 2 epochs
model.fit(dataset, epochs=2)
# Run prediction over the dataset; the returned value is not kept.
model.predict(dataset)
# Evaluate on the same dataset; as the last expression, its result is the cell output.
model.evaluate(dataset)

Epoch 1/2
Epoch 2/2


[0.1586298793554306, 0.9526166915893555]

In [None]:
# To use the built-in training loop with an object-oriented model, subclass the
# Model class; as with Layer, you only need to override the relevant methods.
class CustomModel(keras.Model):
  """Model with a hand-written ``train_step`` that owns its loss, optimizer and metrics."""

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.loss_tracker = keras.metrics.Mean(name='loss')
    self.accuracy = keras.metrics.SparseCategoricalAccuracy()
    self.loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    self.optimizer = keras.optimizers.Adam(learning_rate=1e-3)

  def train_step(self, data):
    """Run one optimization step on ``data = (x, y)``.

    Returns:
      Dict with the current values of the 'loss' and 'accuracy' metrics.
    """
    features, targets = data
    with tf.GradientTape() as tape:
      predictions = self(features, training=True)
      loss = self.loss_fn(targets, predictions)

    weights = self.trainable_weights
    gradients = tape.gradient(loss, weights)
    self.optimizer.apply_gradients(zip(gradients, weights))

    # Update the tracked metrics with this batch.
    self.loss_tracker.update_state(loss)
    self.accuracy.update_state(targets, predictions)
    return {'loss': self.loss_tracker.result(), 'accuracy': self.accuracy.result()}

  @property
  def metrics(self):
    """The metric objects owned by this model: the loss tracker and the accuracy."""
    return [self.loss_tracker, self.accuracy]

# Build the same functional graph as before, but wrap it in CustomModel so
# that fit() runs the custom train_step.
inputs = keras.Input(shape=(784,), dtype='float32')
x = keras.layers.Dense(32, activation='relu')(inputs)
x = keras.layers.Dense(32, activation='relu')(x)
outputs = keras.layers.Dense(10)(x)
model = CustomModel(inputs, outputs)
# NOTE(review): compile() is called with no arguments; confirm that its default
# optimizer does not replace the Adam instance assigned in CustomModel.__init__.
model.compile()
# NOTE(review): `dataset` comes from an earlier cell.
model.fit(dataset, epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7bbcfb06d7e0>

In [None]:
# The @property decorator makes `metrics` accessible as an attribute.
model.metrics

[<keras.src.metrics.base_metric.Mean at 0x7bbd02c33040>,
 <keras.src.metrics.accuracy_metrics.SparseCategoricalAccuracy at 0x7bbd02c33d90>]

## 端到端实验示例1: 变分自动编码器

In [3]:
from tensorflow.keras import layers

class Sampling(layers.Layer):
  """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

  def call(self, inputs):
    """Return ``z_mean + exp(0.5 * z_log_var) * epsilon`` with random normal epsilon."""
    mean, log_var = inputs
    mean_shape = tf.shape(mean)
    # Random normal noise with the same (batch, dim) shape as the mean.
    noise = keras.backend.random_normal(shape=(mean_shape[0], mean_shape[1]))
    scale = tf.exp(0.5 * log_var)
    return mean + scale * noise

class Encoder(layers.Layer):
  """Maps MNIST digits to a triplet (z_mean, z_log_var, z).

  Args:
    latent_dim: Size of the z_mean / z_log_var projections.
    intermediate_dim: Size of the hidden projection.
    **kwargs: Forwarded to ``layers.Layer``.
  """

  def __init__(self, latent_dim=32, intermediate_dim=64, **kwargs):
    super().__init__(**kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation=tf.nn.relu)
    self.dense_mean = layers.Dense(latent_dim)
    self.dense_log_var = layers.Dense(latent_dim)
    self.sampling = Sampling()

  def call(self, inputs):
    """Return ``(z_mean, z_log_var, z)`` for `inputs`."""
    hidden = self.dense_proj(inputs)
    z_mean = self.dense_mean(hidden)
    z_log_var = self.dense_log_var(hidden)
    # z is sampled from the (z_mean, z_log_var) pair.
    sampled = self.sampling((z_mean, z_log_var))
    return z_mean, z_log_var, sampled

In [4]:
# Next comes the Decoder class, which maps latent-space coordinates back to
# MNIST digits.
class Decoder(layers.Layer):
  """Converts z, the encoded digit vector, back into a readable digit.

  Args:
    original_dim: Size of the output layer.
    intermediate_dim: Size of the hidden projection.
    **kwargs: Forwarded to ``layers.Layer``.
  """

  def __init__(self, original_dim, intermediate_dim=64, **kwargs):
    super().__init__(**kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation=tf.nn.relu)
    self.dense_output = layers.Dense(original_dim, activation=tf.nn.sigmoid)

  def call(self, inputs):
    """Apply the hidden projection followed by the sigmoid output layer."""
    hidden = self.dense_proj(inputs)
    return self.dense_output(hidden)

In [5]:
# Combine the encoder and the decoder.
class VariationalAutoEncoder(layers.Layer):
  """Combines the encoder and decoder into an end-to-end model for training.

  Args:
    original_dim: Size of the decoder output.
    intermediate_dim: Hidden size used by both encoder and decoder.
    latent_dim: Size of the latent vector.
    **kwargs: Forwarded to ``layers.Layer``.
  """

  def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, **kwargs):
    super().__init__(**kwargs)
    self.original_dim = original_dim
    self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
    self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

  def call(self, inputs):
    """Encode and decode `inputs`, registering the KL term as a layer loss."""
    z_mean, z_log_var, z = self.encoder(inputs)
    reconstructed = self.decoder(z)

    # Add KL divergence regularization loss
    kl_terms = z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
    kl_loss = -0.5 * tf.reduce_mean(kl_terms)
    self.add_loss(kl_loss)
    return reconstructed

In [6]:
# Write a training loop; the training step is decorated with @tf.function so
# that it is compiled into a fast graph function.
# Our model
vae = VariationalAutoEncoder(original_dim=784, intermediate_dim=64, latent_dim=32)

# Loss and optimizer
loss_fn = keras.losses.MeanSquaredError()
optimizer = keras.optimizers.Adam(learning_rate=1e-3)

# Prepare a dataset
# Only the images are used; the labels are discarded.
(x_train, _), _ = keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices(
    x_train.reshape(60000, 784).astype('float32') / 255
)
dataset = dataset.shuffle(buffer_size=1024).batch(32)

@tf.function
def training_step(x):
  """Run one optimization step of the VAE on batch `x` and return the loss.

  Uses the module-level `vae`, `loss_fn` and `optimizer`.
  """
  with tf.GradientTape() as tape:
    reconstructed = vae(x)
    # Reconstruction loss plus the losses registered on the VAE during the call.
    loss = loss_fn(x, reconstructed)
    loss = loss + sum(vae.losses)
  # Update the weights of the VAE
  weights = vae.trainable_weights
  grads = tape.gradient(loss, weights)
  optimizer.apply_gradients(zip(grads, weights))
  return loss

losses = []
for step, x in enumerate(dataset):
  loss = training_step(x)
  # Logging: keep every step's loss and report the running mean.
  losses.append(float(loss))
  if not step % 100:
    mean_loss = sum(losses) / len(losses)
    print("Step:", step, "Loss:", mean_loss)

  # Stop after 1000 steps
  # Training the model to convergence is left
  # as an exercise to the reader
  if step >= 1000:
    break

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Step: 0 Loss: 0.3274182379245758
Step: 100 Loss: 0.12662266644805964
Step: 200 Loss: 0.10012252639923523
Step: 300 Loss: 0.08984106987021691
Step: 400 Loss: 0.08494694289423878
Step: 500 Loss: 0.08168212032157504
Step: 600 Loss: 0.0792424521298952
Step: 700 Loss: 0.07787219502467571
Step: 800 Loss: 0.07664873064978144
Step: 900 Loss: 0.07570356083051875
Step: 1000 Loss: 0.07475208267048522
