# **自定义fit()中发生的操作**

### **介绍**

当你使用监督学习时，你可以使用`fit()`，一切都可以顺利进行。

当你需要从头开始编写自己的训练循环时，可以使用`GradientTape`并控制每个小细节。

但是，如果你需要自定义训练算法，并且仍然想要使用fit()相关的便利特性，例如回调，内置分发支持或分步融合，该怎么办？

Keras的核心原则是**逐步揭示复杂性**。你应该始终能够逐步进入较底层的工作流程。如果高级功能与你的案例不完全匹配，那么你不应该弃用高级功能转向底层功能。你应该在更好地控制小细节的同时，保留相当数量的高级功能带来的便利。

当需要自定义`fit()`时，你应该覆盖`Model`类的训练步骤方法。这些方法是`fit()`在每个批量数据中调用的函数。接着，你将保证`fit()`能够照常调用--它将运行你自己的学习算法。

请注意，上述模式同样适用于函数式API构建模型。无论是构建`Sequential`模型，函数式API模型还是子类化模型，都可以执行此操作。

接下来，让我们看看上述模式是如何工作的。

### **引入**
要求TensorFlow 2.2或更高版本。

In [1]:
import tensorflow as tf
from tensorflow import keras

### **第一个简单的示例**

让我们从一个简单的示例开始：

+ 我们创建一个新的[`keras.Model`](https://www.tensorflow.org/api_docs/python/tf/keras/Model)子类
+ 我们只是重写`train_step(self, data)`方法
+ 我们返回一个将指标名称（包括损失）映射到其当前值的字典

输入参数`data`是传递用于fit训练数据的参数：

+ 如果你通过调用`fit(x, y, ...)`传递numpy数组，那么`data`将是元组`(x, y)`
+ 如果你通过调用`fit(dataset, ...)`传递的是`tf.data.Dataset`，那么`data`将会是dataset生成的批量数据

类似于你已经熟悉的内容，在该`train_step`方法的主体中，我们也将进行定期的训练更新。重要的是，我们通过`self.compiled_loss`计算损失，该损失包含了传递给`compile()`的损失函数。

同样，我们调用`self.compiled_metrics.update_state(y, y_pred)`来更新传入`compile()`的指标的状态，并从通过`self.metrics`查询最终结果以检索其当前值。

In [2]:
class CustomModel(keras.Model):
    def train_step(self, data):
        # 分离数据，它的结构取决于你的模型以及传递给`fit（）`的内容。
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # 前向传递
            # 计算损失值（在compile()中配置的损失函数）
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # 计算梯度
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # 更新权重
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # 更新指标(包括跟踪损失的指标)
        self.compiled_metrics.update_state(y, y_pred)
        # 返回一个将指标名称映射到当前值的字典
        return {m.name: m.result() for m in self.metrics}

让我们尝试调用一下：

In [3]:
import numpy as np

# 构造并编译CustomModel的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# 照常使用`fit`
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7fb6ef196828>

### **更底层的操作**

当然，你可以直接跳过`compile()`中传递的损失函数，然后尽可能地在`train_step`中手动完成。指标也是如此。这是一个较低级的示例，仅仅使用了`compile()`配置优化器：

In [4]:
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
loss_tracker = keras.metrics.Mean(name="loss")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # 前向传递
            # 计算损失
            loss = keras.losses.mean_squared_error(y, y_pred)

        # 计算梯度
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # 更新权重
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # 计算指标
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}


# 构造并编译CustomModel的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# 我们在这里不传递损失和指标
model.compile(optimizer="adam")

# 照常使用`fit` -- 你依旧可以使用回调等等
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=1)



<tensorflow.python.keras.callbacks.History at 0x7fb6ecac25c0>

请注意，使用此设置后，你需要在每个epoch之后，或者训练和评估间，手动调用`reset_states()`

### **支持sample_weight与class_weight**

你可能已经注意到，我们的第一个基本示例中没有提及样本权重。如果要支持`fit()`的参数`sample_weight`和`class_weight`，则只需执行以下操作：

+ 将`sample_weight`从`data`参数中分离出来
+ 将其传递给`compiled_loss`＆`compiled_metrics`（当然，如果你不依赖`compile()`传递损失和指标，也可以手动应用它）
+ 仅此而已，下面进行具体示例

In [5]:
class CustomModel(keras.Model):
    def train_step(self, data):
        # 分离数据，它的结构取决于你的模型以及传递给`fit（）`的内容。
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # 计算损失
            # 这里的损失函数是由`compile()`配置的
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # 计算梯度
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # 更新权重
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # 更新指标
        # 这里的指标是由`compile()`配置的
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # 返回一个将指标名称映射到当前值的字典
        # 请注意，它将包括在self.metrics中跟踪的损失
        return {m.name: m.result() for m in self.metrics}


# 构造并编译CustomModel的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# 现在你可以使用sample_weight参数
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7fb6ecac25f8>

### **定义你自己的评估步骤**

如果要进行与调用model.evaluate()相同的操作，该怎么做？其实很简单，你只要以完全相同的方式重写`test_step`就可以了，具体步骤如下：

In [6]:
class CustomModel(keras.Model):
    def test_step(self, data):
        # 分离数据
        x, y = data
        # 计算预测
        y_pred = self(x, training=False)
        # 更新跟踪损失的指标
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # 更新指标
        self.compiled_metrics.update_state(y, y_pred)
        # 返回一个将指标名称映射到当前值的字典
        # 请注意，它将包括在self.metrics中跟踪的损失
        return {m.name: m.result() for m in self.metrics}


# 构造并编译CustomModel的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# 通过自定义的test_step进行评估
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)



[0.2646936774253845, 0.4124218821525574]

### **总结：端到端GAN(生成式对抗网络)示例**

让我们来看一个利用你刚刚学到的知识编写的端到端示例。

让我们思考一下：

+ 一个生成器网络用于生成`28x28x1`图像
+ 一个判别器网络用于将`28x28x1`图像分为两类（“假”和“真”）
+ 两个网络各一个优化器
+ 一个损失函数，用于训练判别器。

In [7]:
from tensorflow.keras import layers

# 创建判别器
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# 创建生成器
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # 我们想生成128个系数以reshape为7x7x128的图
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

下面演示的是一个完整的GAN（生成式对抗网络）类，可以重写`compile()`使用其自己的签名，并在`train_step`中的17行实现了整个GAN算法：

In [8]:
class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # 在潜在空间中采样随机点
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # 将它们解码为假图像
        generated_images = self.generator(random_latent_vectors)

        # 将它们与真实图像结合
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # 串联区分真假图像的标签
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # 在标签上添加随机噪音--非常重要的技巧！
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # 训练判别器
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # 在潜在空间中采样随机点
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # 组装标有“所有真实图像”的标签
        misleading_labels = tf.zeros((batch_size, 1))

        # 训练生成器（请注意，我们*不*更新判别器的权重）！
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

让我们试驾一下：

In [9]:
# 准备数据集 我们将MNIST数字同时用于训练和测试
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# 为了控制执行时间，我们只训练100个批量，你也可以训练整个数据
# 集，这样的话将需要大约20个epoch才能获得不错的结果。
gan.fit(dataset.take(100), epochs=1)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


<tensorflow.python.keras.callbacks.History at 0x7fb6eaf347f0>

深度学习背后的想法很简单，那么为什么实现它们会很困难？