# 自定义训练/评估循环(自行实现 fit)

`fit` 在易用性和灵活性之间取得了非常好的平衡,但是这并不意味着 fit 可以适用于任何训练过程.即使 fit 配合自定义指标 自定义损失 自定义回调等,也无法满足全部的训练场景.

`fit` 的工作流只适用于监督学习,监督学习要求数据已经标注,之后通过训练得出模型.然而监督学习只是机器学习的一个大类,还有无监督学习,半监督学习,生成学习,强化学习等等.

如果 fit 无法满足需要,此时就需要编写自己的训练逻辑了.第 2 3 章我们见过一些简单的低层次训练逻辑.一个典型的训练循环如下

- 计算正向传播(人话就是计算模型输出),获取当前批次的损失值.
- 检索与模型权重有关的损失梯度
- 更新模型权重,以降低当前批次的损失值.

以上其实就是 fit 做的全部事情,这一节会从头重新实现 fit.


## 训练/推理

在 2 3 章的例子中

- 正向传播: `predictions = model(inputs)`
- 检索与权重相关梯度: `gradients = tape.gradient(loss, model.weights`

编写自定义 fit 这里有两个问题

一些 keras 层在训练和预测之间的行为并不相同.在前向传递调用 keras 模型时,一定要将 `training` 设置为 true.

- 例如 dropout 层在其 `call` 方法中有意训练的 bool 参数,调用 `dropout(inputs, training=True)` 将会忽略一些激活单元,但是调用 `dropout(inputs, training=Flase)` 则不会忽略.
- 这个参数在 Sequential 或 Functional 生成的模型中也存在.

检索模型的权重梯度时应该使用 `tape.gradients(loss, model.trainable_weights)` 而不是 `tape.gradients(loss, model.weights)`.实际上层和模型有两种类型的权重

- 可训练权重: 可以通过反向传播更新,以最小化模型损失.例如米基础的 kernel 和 bias.
- 不可训练权重: 前向传递过程中,由层自行维护更新的权重.例如在层增加一个批次计数器.这个信息就会存放在不可训练权重中,每个批次训练时层将计数器 +1.

keras 内置的层中,唯一具有不可训练权重的层是 BatchNormalization ,我们将在第 9 章见到.BatchNormalization 使用不可训练权重跟踪它数据的平均值和标准差,~~进行及时的特征标准化(归一化/规范化).~~(这一句翻译感觉有问题,等到第9章再说).


In [9]:
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers as layers

In [12]:
from tensorflow.keras.datasets import mnist
(images, labels), (test_images, test_labels) = mnist.load_data()
images = images.reshape((60000, 28 * 28)).astype("float32") / 255
test_images = test_images.reshape((10000, 28 * 28)).astype("float32") / 255
train_images, val_images = images[10000:], images[:10000]
train_labels, val_labels = labels[10000:], labels[:10000]

In [4]:
def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)
    gradients = tape.gradients(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(model.trainable_weights, gradients))

### 自定义 fit 使用指标

在自定义 fit 的循环中,我们还是想使用 keras 的指标(内置的或者自定义的),前面已经提到过了.

- 更新保存权重 `update_state(y_true, y_pred)`
- 获取结果 `result()`


In [5]:
metric = keras.metrics.SparseCategoricalAccuracy()
targets = [0, 1, 2]
predictions = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
metric.update_state(targets, predictions)
current_result = metric.result()
print(f"result: {current_result:.2f}")

result: 1.00


一个使用指标的示例.


In [6]:
values = [0, 1, 2, 3, 4]
mean_tracker = keras.metrics.Mean()
for value in values:
    mean_tracker.update_state(value)
print(f"Mean of values: {mean_tracker.result():.2f}")

Mean of values: 2.00


跟踪 标量值的均值 -> `keras.metrics.Mean()`


最后重置结果 -> `metric.reset_state()`


## 一个完成的训练&评估循环



In [7]:
def get_mnist_model():
    inputs = keras.Input(shape=(28 * 28,))
    features = layers.Dense(512, activation="relu")(inputs)
    features = layers.Dropout(0.5)(features)
    outputs = layers.Dense(10, activation="softmax")(features)
    model = keras.Model(inputs, outputs)
    return model

In [10]:
model = get_mnist_model()

loss_fn = keras.losses.SparseCategoricalCrossentropy()  #损失函数
optimizer = keras.optimizers.RMSprop()  #优化器
metrics = [keras.metrics.SparseCategoricalAccuracy()]  # 指标
loss_tracking_metric = keras.metrics.Mean()  #监控均值


def train_step(inputs, targets):
    with tf.GradientTape() as tape:  #正向传播
        predictions = model(inputs, training=True)  #training = true
        loss = loss_fn(targets, predictions)  #损失
    # 反向传播
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients,
                                  model.trainable_weights))  #优化器更新权重

    logs = {}
    for metric in metrics:  #指标
        metric.update_state(targets, predictions)
        logs[metric.name] = metric.result()  #获取指标结果

    loss_tracking_metric.update_state(loss)  #损失均值
    logs["loss"] = loss_tracking_metric.result()
    return logs  #返回结果


一个完成的训练/评估循环.接收数据训练,返回 ift 进度条日志.


In [14]:
def reset_metrics():  #重置指标
    for metric in metrics:
        metric.reset_states()
    loss_tracking_metric.reset_states()

每个 epoch 我们都需要重置指标


In [15]:
training_dataset = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels))  #张量化
training_dataset = training_dataset.batch(32)  #一个批次长度 32
epochs = 3  #3轮 ephoch
for epoch in range(epochs):
    reset_metrics()  #重置指标
    for inputs_batch, targets_batch in training_dataset:  #训练
        logs = train_step(inputs_batch, targets_batch)
    print(f"Results at the end of epoch {epoch}")  #打印结果
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")

Results at the end of epoch 0
...sparse_categorical_accuracy: 0.9138
...loss: 0.2901
Results at the end of epoch 1
...sparse_categorical_accuracy: 0.9528
...loss: 0.1679
Results at the end of epoch 2
...sparse_categorical_accuracy: 0.9627
...loss: 0.1389


自定义训练流程.这里使用了 `tf.data.Dataset` 将传入数据拆分成了 32 尺寸的批次.

In [16]:
def test_step(inputs, targets):  #和训练循环差不多,省略了反向传播和权重更新
    predictions = model(inputs, training=False)  #评估阶段 预测值,training = false
    loss = loss_fn(targets, predictions)  #损失

    logs = {}
    for metric in metrics:  #指标
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()  #预测值结果

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs


val_dataset = tf.data.Dataset.from_tensor_slices(
    (val_images, val_labels))  #张量化
val_dataset = val_dataset.batch(32)  #拆分尺寸 32
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("Evaluation results:")
for key, value in logs.items():  #取均值
    print(f"...{key}: {value:.4f}")

Evaluation results:
...val_sparse_categorical_accuracy: 0.9688
...val_loss: 0.1233


至此我们终于实现了和 fit 与 evaluation 差不多的基础功能.当然 fit 和 evaluation 还支持更多更复杂功能.

别急还没结束,还有性能优化.


## 使用 `tf.function` 注解加速


In [17]:
@tf.function
def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs


val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("Evaluation results:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

Evaluation results:
...val_sparse_categorical_accuracy: 0.9688
...val_loss: 0.1233


直接编写的 test_step 执行时会如同所有 python 代码一样,逐行执行,方便调试.但是这样从性能角度并不划算.

简单添加 `@tf.function` 就能使得 tensorflow 单独编译这个函数,基本加速了一倍差不多(cpu).

但是编码/调试阶段不要乱用,`@tf.function` 会干扰调试找 bug.


## 带有 fit 的自定义训练循环

完全重新函数实现 fit 的功能,耗时耗力,灵活性高,但是无法使用 fit 的各种便利的功能.

实际上在使用 fit 和完全自定义训练循环之外,还有一个中间地带.可以编写自定义的训练步骤,再让 fit 完成其他工作.

覆盖 Model 的 `train_step`方法,这个方法是 fit 为每轮数据调用的训练方法.就像是偷偷替换了汽车引擎,但是 fit 完全不知道,还在跑.

In [19]:
loss_fn = keras.losses.SparseCategoricalCrossentropy()  #损失函数
loss_tracker = keras.metrics.Mean(name="loss")  #监控均值


class CustomModel(keras.Model):
    def train_step(self, data):  #覆盖训练函数
        inputs, targets = data
        with tf.GradientTape() as tape:  #正向传播
            predictions = self(inputs, training=True)
            loss = loss_fn(targets, predictions)
        gradients = tape.gradient(loss, model.trainable_weights)  #反向传播
        optimizer.apply_gradients(zip(gradients,
                                      model.trainable_weights))  #优化器更新权重

        loss_tracker.update_state(loss)  #更新均值
        return {"loss": loss_tracker.result()}  #返回字典

    @property
    def metrics(self):#指标
        return [loss_tracker]

子类 CustomModel

- 覆盖了 train_step 方法,训练循环几乎和上一小节一致.
- 最终返回一个结果字典


In [20]:
inputs = keras.Input(shape=(28 * 28, ))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)  #模型实例化

model.compile(optimizer=keras.optimizers.RMSprop())
model.fit(train_images, train_labels, epochs=3)  #训练


Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x1a557358550>

覆盖过 `train_step` 的类实例化,并不妨碍其与其他模型组合.

覆盖 `train_step` 不需要 `@tf.function` 注解,框架会自动进行优化.

In [None]:
class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.compiled_loss(targets, predictions)  #计算损失
        gradients = tape.gradient(loss, model.trainable_weights)
        optimizer.apply_gradients(zip(gradients,
                                      model.trainable_weights))  #优化器更新权重
        self.compiled_metrics.update_state(targets, predictions)  #更新指标
        return {m.name: m.result() for m in self.metrics}  #实际返回的指标列表


上面的例子有一点不好,优化器/损失函数/指标是在 `train_step` 定义的.实际上在 model 调用 `model.compile` 后,在 `train_step` 是可以访问到传入的优化器/损失函数/指标的

- self.compiled_loss 损失函数
- self.compiled_metrics 指标列表的封装,可以调用 `self.compiled_metrics.update_state` 一次性更新全部指标.
- self.metrics 传递给 compile 的实际指标列表.


In [21]:
inputs = keras.Input(shape=(28 * 28, ))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

model.compile(optimizer=keras.optimizers.RMSprop(),
              loss=keras.losses.SparseCategoricalCrossentropy(),
              metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.fit(train_images, train_labels, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x1a55cac5d90>

while 以上就是这本书需要用到的全部 keras api.现在你应该能做一些之前做不到的事情了.
