# T7. 使用 Callback 和 Trainer.on 实现个性化的训练过程

## 1. fastNLP 中的回调模块 Callback

&emsp;&emsp;`Callback n. 回拨的电话；回叫；召回，`在程序中一般被称作被称作**回调函数**。如果把函数的指针（地址）作为参数传递给另一个函数，当这个指针被用来调用它所指向的函数时，我们就说这是回调函数。回调函数并不会由该函数的实现方直接调用，而是在特定的事件或条件发生时由另外的一方调用的，用于对该事件或条件进行响应。众所周知，机器学习的过程中往往会包含 `初始化`、`训练开始`、`前向传播开始`、`前向传播结束`、`反向传播开始`、`反向传播结束`、`训练结束` 等等数个阶段。`fastNLP` 为了给予用户更广泛的定制空间，为训练增加灵活性也包含了回调机制，并实现成了一个单独的类 `fastNLP.core.Callback`。接下来我们为向您介绍如何使用 `Callback` 来自由地定制您的训练过程。

### 1.1 TrainerState 和 State

&emsp;&emsp;首先需要了解的是我们可以从 `Trainer` 中获取哪些状态。`Trainer` 中实例化了一个类 `TrainerState` 专门用来记录训练中的特定状态，它包括：

名称|简要介绍|
----|----|
 `n_epochs` | 训练 `epoch` 的总数 |
 `cur_epoch_idx` | 当前正处在第几个 `epoch` 中，从 0 开始 |
 `global_forward_batches` | 从训练开始到目前共 `forward` 了几个 `batch` |
 `batch_idx_in_epoch` | 当前正处在该次 `epoch` 的第几个 `batch` 中 |
 `num_batches_per_epoch` | 每次迭代总共包含多少个 `batch` |
 `n_batches` | 整个训练过程中的 `batch` 数目，满足 `n_batches = num_batches_per_epoch * n_epochs` |

&emsp;&emsp;我们可以在训练过程中通过 `Trainer` 直接访问这些状态来获取训练的信息，如 `trainer.cur_epoch_idx`、`trainer.batch_idx_in_epoch` 等。

&emsp;&emsp;除此之外，`fastNLP` 还在 `Trainer` 中内置了一个类 `State`，您可以通过 `trainer.state` 来访问该类。`TrainerState` 只能在训练中被 `Trainer` 自动更新，而 `State` 则可以让用户随时记录自己需要的信息。其具体的使用方式将会在下一部分为您介绍。

### 1.2 Callback 的使用

&emsp;&emsp;回调模块 `Callback` 可以从 `fastNLP.core` 导入，任何具体的 `Callback` 都应当继承该类。`Callback` 包含许多函数，分别会在训练的不同时机调用。并且 `fastNLP` 会传入 `trainer`、`driver` 等参数来帮助处理信息。它们包括：

函数 | 传入参数 | 调用时机
----|----|----|
`on_after_trainer_initialized` | trainer, driver | `Trainer` 初始化完成后 |
`on_sanity_check_begin` | trainer | **预跑** 阶段开始前 |
`on_sanity_check_end` | trainer, sanity_check_res | **预跑** 阶段结束后 |
`on_train_begin` | trainer | 训练开始前 |
`on_train_epoch_begin` | trainer | 一次 `epoch` 开始前 |
`on_fetch_data_begin` | trainer | 从 `dataloader` 中取数据前 |
`on_fetch_data_end` | trainer | 从 `dataloader` 中取数据后 |
`on_train_batch_begin` | trainer, batch, indices | 前向传播一个 `batch` 前 |
`on_before_backward` | trainer, outputs | 反向传播前 |
`on_after_backward` | trainer | 反向传播后 |
`on_before_zero_grad` | trainer, optimizers | 梯度清零前 |
`on_after_zero_grad` | trainer, optimizers | 梯度清零后 |
`on_before_optimizers_step` | trainer, optimizers | 执行 `optimizer.step()` 前 |
`on_after_optimizers_step` | trainer, optimizers | 执行 `optimizer.step()` 后 |
`on_train_batch_end` | trainer | 前向传播一个 `batch` 完成，`batch_idx_in_epoch` 更新后 |
`on_train_epoch_end` | trainer | 一次 `epoch` 结束，`cur_epoch_idx` 更新后 |
`on_evaluate_begin` | trainer | 执行验证 `evaluate` 前 |
`on_evaluate_end` | trainer, results | 执行验证 `evaluate` 后 |
`on_train_end` | trainer | 整个训练结束后 |
`on_exception` | trainer, exception | 发生异常时 |
`on_save_model` | trainer | 模型保存前 |
`on_load_model` | trainer | 模型加载后 |
`on_save_checkpoint` | trainer | 断点保存前 |
`on_load_checkpoint` | trainer | 断点保存后 |

其调用时机顺序大致如下：

```python
Trainer.__init__():
    on_after_trainer_initialized(trainer, driver)
Trainer.run():
    if num_eval_sanity_batch>0:
        on_sanity_check_begin(trainer)  # 如果设置了num_eval_sanity_batch
        on_sanity_check_end(trainer, sanity_check_res)
    try:
        on_train_begin(trainer)
        while cur_epoch_idx < n_epochs:
            on_train_epoch_begin(trainer)
            while batch_idx_in_epoch<=num_batches_per_epoch:
                on_fetch_data_begin(trainer)
                batch = next(dataloader)
                on_fetch_data_end(trainer)
                on_train_batch_begin(trainer, batch, indices)
                on_before_backward(trainer, outputs)  # 其中 outputs 是经过 output_mapping（如果设置了） 后的，否则即为 model 的输出。
                driver.backward()
                on_after_backward(trainer)
                on_before_optimizers_step(trainer, optimizers)  # 实际调用受到 accumulation_steps 影响
                driver.step()
                on_after_optimizers_step(trainer, optimizers)  # 实际调用受到 accumulation_steps 影响
                on_before_zero_grad(trainer, optimizers)  # 实际调用受到 accumulation_steps 影响
                driver.zero_grad()
                on_after_zero_grad(trainer, optimizers)  # 实际调用受到 accumulation_steps 影响
                batch_idx_in_epoch += 1
                on_train_batch_end(trainer)
            cur_epoch_idx += 1
            on_train_epoch_end(trainer)
    except BaseException:
        self.on_exception(trainer, exception)
    finally:
        on_train_end(trainer)
```

&emsp;&emsp;其它的函数例如 `on_evaluate_begin(trainer)`、`on_evaluate_end(trainer, results)`、`on_save_model(trainer)`、`on_load_model(trainer)`、`on_save_checkpoint(trainer)`、`on_load_checkpoint(trainer)` 将根据需要在 `Trainer.run` 的不同时机被调用。

&emsp;&emsp;接下来我们将通过实例程序来演示 `Callback` 的用法。首先加载演示用的 `sst-2` 数据集和模型：

In [1]:
from datasets import load_dataset

# sst-2 数据集
sst2data = load_dataset('glue', 'sst2')

Reusing dataset glue (/remote-home/shxing/.cache/huggingface/datasets/glue/sst2/1.0.0/dacbe3125aa31d7f70367a07a8a9e72a5a0bfeb5fc42e75c9db75b96da6053ad)


  0%|          | 0/3 [00:00<?, ?it/s]

In [2]:
from fastNLP import DataSet, Vocabulary, prepare_dataloader

# 仅取 20 条数据
dataset = DataSet.from_datasets(sst2data['train'])[:100]
dataset.apply_more(lambda ins:{'words': ins['sentence'].lower().split(), 'target': ins['label']}, 
                   progress_bar="tqdm")
dataset.add_seq_len('words')
dataset.delete_field('sentence')
dataset.delete_field('label')
dataset.delete_field('idx')

vocab = Vocabulary()
vocab.from_dataset(dataset, field_name='words')
vocab.index_dataset(dataset, field_name='words')

# 训练集和验证集 0.8:0.2
train_dataset, evaluate_dataset = dataset.split(ratio=0.8)

print(train_dataset)

train_dataloader = prepare_dataloader(
    train_dataset, batch_size=16, shuffle=True
)
val_dataloader = prepare_dataloader(
    evaluate_dataset, batch_size=4, shuffle=False
)

Processing:   0%|          | 0/100 [00:00<?, ?it/s]

Output()

+------------------------------+--------+---------+
| words                        | target | seq_len |
+------------------------------+--------+---------+
| [466, 18, 467, 468, 39]      | 1      | 5       |
| [454, 455, 3, 16, 456, 45... | 1      | 22      |
| [2, 304]                     | 0      | 2       |
| [68, 7, 161, 162]            | 0      | 4       |
| [293, 294, 295, 296, 25, ... | 1      | 21      |
| [38, 104, 337, 4]            | 0      | 4       |
| [237, 66, 3]                 | 0      | 3       |
| [30, 163, 164, 69, 165, 9... | 0      | 17      |
| [47, 218, 6, 219, 11, 220... | 0      | 21      |
| [5, 283, 284, 285, 286, 6... | 0      | 8       |
| [3, 31, 111, 3, 11, 22, 1... | 1      | 13      |
| [335, 16, 5, 336]            | 1      | 4       |
| ...                          | ...    | ...     |
+------------------------------+--------+---------+


In [3]:
from fastNLP.models.torch import CNNText
model = CNNText(embed=(len(vocab), 224), num_classes=2, dropout=0.2)

&emsp;&emsp;接着我们导入 `Callback` 模块，创建一个自定义的 `MyCallback` 类，并且在每个 `epoch`、每个 `batch` 前后输出训练的进度和当前使用的数据。通过 `trainer.cur_epoch_idx` 可以获取当前的 `epoch`，`trainer.batch_idx_in_epoch` 可以获取已经处理到第几个 `batch`；而 `on_train_batch_begin` 函数中的 `batch` 和 `indices` 则分别代表**当前批次的数据**和**所用数据的索引**；`on_before_backward` 函数中的 `outputs` 则是从模型中返回的数据。

&emsp;&emsp;同时，我们还用到了上文提到的 `state` 属性，在这里我们用它来记录训练中的平均 `loss`。大体上它可以被看作一个字典，您可以使用 `state.loss` 或者 `state['loss']` 来打印其中 `'loss'` 对应的内容，但是赋值操作只能通过 `state['loss'] = 1` 这样的语句进行。

&emsp;&emsp;有一点需要注意：如果您需要进行断点重训，那么请确保您记录在 `state` 中的内容是**可序列化的**。

In [4]:
from fastNLP import Callback

class MyCallback(Callback):
    def __init__(self):
        pass

    def on_train_begin(self, trainer):
        print("Now start training...")

    def on_train_epoch_begin(self, trainer):
        print(f"Epoch {trainer.cur_epoch_idx + 1}/{trainer.n_epochs}")
        trainer.state['loss'] = 0

    def on_train_batch_begin(self, trainer, batch, indices):
        print(f"Training Batch {trainer.batch_idx_in_epoch + 1}/{trainer.num_batches_per_epoch}...")
        print("Batch Words:", batch['words'].tolist(), "Target:", batch['target'].tolist(), "Indices:", indices)
        

    def on_before_backward(self, trainer, outputs):
        print("Loss:", outputs["loss"].tolist())
        trainer.state['loss'] += outputs['loss'].item()

    def on_train_epoch_end(self, trainer):
        ave_loss = trainer.state['loss'] / trainer.num_batches_per_epoch
        print(f"End Epoch {trainer.cur_epoch_idx}, Average Loss {ave_loss}")

    def on_train_end(self, trainer):
        print("Quit training process.")

&emsp;&emsp;最后开始训练，观察控制台的输出。`Trainer` 的 `callbacks` 参数要求传入一个列表，然后在其内部通过 `CallbackManager` 统一进行调用，因此您可以多种 `Callback` 同时应用在 `Trainer` 上，这也是 `fastNLP` 灵活性的体现。

In [5]:
from torch.optim import Adam

from fastNLP import Trainer

adam = Adam(model.parameters(), 1e-3)
trainer = Trainer(
    model=model, train_dataloader=train_dataloader, optimizers=adam,
    device='cpu', n_epochs=1, callbacks=[MyCallback()],
)
trainer.run()

Output()

Quit training process.


&emsp;&emsp;删除变量并释放内存，为下一次训练做准备：

In [6]:
import gc

del trainer
del model
del adam

gc.collect()

46

### 1.3 使用 monitor 监控训练结果

&emsp;&emsp;之前我们在教程中提到了 `Trainer` 的 `monitor` 参数，在这里我们将介绍如何将它和 `Callback` 结合起来。`fastNLP` 定义了一种特殊的 `HasMonitorCallback`，如果您想在训练中对 `Metric` 输出的结果进行监控，那么该 `Callback` 将是一个很好的工具。它包含三个参数：

- `monitor`：需要监视的参数，可以为 `None`、`str` 或一个函数。
    - 如果为 `None`，则将在 `Trainer` 初始化后在 `Trainer` 中寻找 `monitor`
    - 如果为 `str`，那么会在 `evaluation` 的结果中查找；如果没有找到，则会按照最长字符串匹配算法查找最匹配的哪个
    - 如果为函数，那么这个函数的参数为 `evaluation` 的结果（字典），并且返回一个 `float` 作为结果
- `larger_better`：如何判定一个参数正在变好；如果为 `True` 则认为越大越好。
- `must_have_monitor`：是否要求该 `Callback` 一定要指定 `monitor`。

&emsp;&emsp;`HasMonitorCallback` 也包含数个工具函数，其中较为常用的有：

- `is_better_results`：它可以判定传入的结果是否变得更好，并且在结果变好之后会记住最新的结果作为下一次的评判标准。
- `get_monitor_value`：它可以从结果中获取 `monitor` 对应的值（返回应用了 `.item()` 函数后的标量）

如果您想详细地查看 `HasMonitorCallback` 使用的函数，请查看 [HasMonitorCallback 的文档](../../fastNLP.core.callbacks.has_monitor_callback.rst)。

&emsp;&emsp;那么如果我们想要在上述程序的训练过程中查看正确率的变化，可以按照下面的代码定义一个 `WatchAccCallback`：

In [7]:
from fastNLP import HasMonitorCallback

class WatchAccCallback(HasMonitorCallback):
    def __init__(self):
        super(WatchAccCallback, self).__init__(
            monitor='acc#acc',
            larger_better=True
        )

    def on_evaluate_end(self, trainer, results):
        if self.is_better_results(results):
            print(f"Epoch: {trainer.cur_epoch_idx + 1}/{trainer.n_epochs}, "
                  f"'acc' is better: {self.get_monitor_value(results)}")
        else:
            print(f"Epoch: {trainer.cur_epoch_idx + 1}/{trainer.n_epochs}, "
                  f"'acc' is not better: {self.get_monitor_value(results)}")

&emsp;&emsp;在每次评测结束后，`WatchAccCallback` 会根据 `acc#acc` 变大与否输出响应的信息。接着让我们定义好评测方法 `Accuracy` 再运行一次代码（因为选取的数据量过少，因此正确率低是正常的）：

In [8]:
from fastNLP import Accuracy

model = CNNText(embed=(len(vocab), 224), num_classes=2, dropout=0)
metrics = {'acc': Accuracy()}
sgd = Adam(model.parameters(), 1e-3)
trainer = Trainer(
    model=model, train_dataloader=train_dataloader, optimizers=sgd,
    device='cpu', n_epochs=40, evaluate_dataloaders=val_dataloader,
    metrics=metrics, callbacks=[WatchAccCallback()], evaluate_every=-10
)
trainer.run()

Output()

Output()

&emsp;&emsp;可以看到在结果中如实输出了 `acc#acc` 的变化情况。利用这一点，`fastNLP` 可以实现许多根据训练结果动态执行的 `Callback`：[LoadBestModelCallback](../../fastNLP.core.callbacks.load_best_model_callback.rst)、[EarlyStopCallback](../../fastNLP.core.callbacks.early_stop_callback.rst)、[FitlogCallback](../../fastNLP.core.callbacks.fitlog_callback.rst)、[MoreEvaluateCallback](../../fastNLP.core.callbacks.more_evaluate_callback.rst)。

### 1.4 fastNLP 预定义的 Callback

&emsp;&emsp;`fastNLP` 已经内置了许多 `Callback` 来帮助用户实现训练中的一些额外功能，它们包括：

名称|简要介绍|备注|
----|----|----|
 [CheckpointCallback](../../fastNLP.core.callbacks.checkpoint_callback.rst) | 在训练过程中根据不同的条件保存模型或者断点 | |
 [RichCallback](../../fastNLP.core.callbacks.progress_callback.rst#fastNLP.core.callbacks.progress_callback.RichCallback) | 使用 `rich` 包显示进度条 | 详见 `Trainer` 的 `progress_bar` 参数 |
 [TqdmCallback](../../fastNLP.core.callbacks.progress_callback.rst#fastNLP.core.callbacks.progress_callback.TqdmCallback) | 使用 `tqdm` 包显示进度条 | 详见 `Trainer` 的 `progress_bar` 参数 |
 [RawTextCallback](../../fastNLP.core.callbacks.progress_callback.rst#fastNLP.core.callbacks.progress_callback.RawTextCallback) | 在控制台输出训练进度 | 详见 `Trainer` 的 `progress_bar` 参数 |
 [LRSchedCallback](../../fastNLP.core.callbacks.lr_scheduler_callback.rst) | 在训练中调用 `Scheduler` 优化训练 | |
 [LoadBestModelCallback](../../fastNLP.core.callbacks.load_best_model_callback.rst) | 自动保存并在最后加载效果最好的模型 | |
 [EarlyStopCallback](../../fastNLP.core.callbacks.early_stop_callback.rst) | 多次 `evaluate` 后结果没有提升时提前中止训练 | |
 [MoreEvaluateCallback](../../fastNLP.core.callbacks.more_evaluate_callback.rst) | 在训练中使用不同的 `evaluate_fn` | |
 [FitlogCallback](../../fastNLP.core.callbacks.fitlog_callback.rst) | 将训练中的信息记录到 `fitlog` 中 | 需要安装 `fitlog` |
 [TimerCallback](../../fastNLP.core.callbacks.timer_callback.rst) | 为训练的各个阶段计时 | |
 [TorchWarmupCallback](../../fastNLP.core.callbacks.torch_callbacks.torch_lr_sched_callback.rst) | 在 `pytorch` 中预热学习率 | 仅限 `pytorch` 框架 |
 [TorchGradClipCallback](../../fastNLP.core.callbacks.torch_callbacks.torch_grad_clip_callback.rst) | 在 `pytorch` 中进行梯度截断 | 仅限 `pytorch` 框架 |

&emsp;&emsp;删除变量并释放内存，为下一次训练做准备：

In [9]:
import gc

del trainer
del metrics
del sgd
del model

gc.collect()

394

## 2. fastNLP 中的 on 函数

&emsp;&emsp;

### 2.1 Trainer.on 与事件 Event

&emsp;&emsp;很多时候我们可能只想在训练的一个或两个阶段进行定制，比如只想在每个 `epoch` 结束时查看一下 `loss` 或评测的结果，此时如果再单独实现一个 `Callback` 未免有些繁琐。为了优化这方面的体验，`fastNLP` 提供了另一种回调机制，即 `Trainer` 的 `on` 函数和事件 `Event`。比如在一次迭代后要输出 `loss`，可以按以下方式编写代码：

In [10]:
from fastNLP.core import Event

@Trainer.on(Event.on_before_backward())
def print_loss(trainer, outputs):
    print("Total Batches {}/{} Loss {}".format(
        trainer.global_forward_batches + 1, trainer.n_batches,
        outputs['loss'].item()))

model = CNNText(embed=(len(vocab), 100), num_classes=2, dropout=0.1)
adam = Adam(model.parameters(), 1e-3)
trainer = Trainer(
    model=model, train_dataloader=train_dataloader, optimizers=adam,
    device='cpu', n_epochs=2
)
trainer.run()

Output()

&emsp;&emsp;这样，我们便可以通过一个十分简便的方式实现回调机制了。`Event` 所包含的调用时机和 `Callback` 相同，也具有相当高的自由度。除此之外，`Event` 的每个 `on_xxx` 系列函数都有三个参数：`every`、`once` 和 `filter_fn`。

&emsp;&emsp;`every`，顾名思义，就是每触发多少次就真正执行一次：

In [11]:
@Trainer.on(Event.on_before_backward(every=2))
def print_loss_every(trainer, outputs):
    print("Total Batches {}/{} Loss {}".format(
        trainer.global_forward_batches + 1, trainer.n_batches,
        outputs['loss'].item()))

trainer_on_every = Trainer(
    model=model, train_dataloader=train_dataloader, optimizers=adam,
    device='cpu', n_epochs=2
)
trainer_on_every.run()

Output()

&emsp;&emsp;`once` 参数则是在触发到第 `once` 次才执行一次，且仅执行这一次。比如我们令 `once=2`，可以发现只有在第二次迭代时才会进行输出。

In [12]:
@Trainer.on(Event.on_before_backward(once=2))
def print_loss_once(trainer, outputs):
    print("Total Batches {}/{} Loss {}".format(
        trainer.global_forward_batches + 1, trainer.n_batches,
        outputs['loss'].item()))

trainer_on_once = Trainer(
    model=model, train_dataloader=train_dataloader, optimizers=adam,
    device='cpu', n_epochs=2
)
trainer_on_once.run()

Output()

&emsp;&emsp;`filter_fn` 参数则更加复杂。它接受两个参数 `filter` 和 `trainer`，前者是一个 `Filter` 对象，包含了 `num_called` 和 `num_executed` 两个成员，分别代表 **触发次数** 和 **实际执行次数**，这也是 `fastNLP` 实现这种回调机制的方法。使用 `filter_fn` 函数您可以更加自由地控制回调机制发生的时机。下面代码等效于 `every=2`：

In [13]:
def filter_fn(filter, trainer):
    return filter.num_called % 2 == 0

@Trainer.on(Event.on_before_backward(filter_fn=filter_fn))
def print_loss_filter(trainer, outputs):
    print("Total Batches {}/{} Loss {}".format(
        trainer.global_forward_batches + 1, trainer.n_batches,
        outputs['loss'].item()))

trainer_on_filter = Trainer(
    model=model, train_dataloader=train_dataloader, optimizers=adam,
    device='cpu', n_epochs=2
)
trainer_on_filter.run()

Output()

&emsp;&emsp;需要提醒您的是，以上三个参数是互斥的，如果同时设置了多个参数，那么 `fastNLP` 会按照 `every`、`once`、`_filter_fn` 的优先顺序进行设置。

&emsp;&emsp;还有一点需要注意，当代码内有多个 `Trainer` 存在时，`Trainer.on` 修饰的函数会绑定到下方距离它最近的 `Trainer` 实例上。如果您想要调整绑定的对象可以通过设置 `Trainer.on` 的 `marker` 参数来实现，这里就不赘述了，详情可以查看文档。

&emsp;&emsp;删除变量并释放内存。

In [14]:
import gc

del trainer
del trainer_on_every
del trainer_on_once
del trainer_on_filter
del model
del adam

gc.collect()

55