# 使用 fastNLP 完成翻译任务

&emsp;&emsp;本篇教程将为您详细展示如何使用 `fastNLP` 使用 `de-en` 数据集进行机器翻译任务。您可以使用 `fastNLP` 的各个组件快捷/方便地完成翻译任务，达到出色的效果。 在阅读这篇教程前，希望您已经熟悉了 `fastNLP` 的基础使用，尤其是数据的载入以及模型的构建。通过这个小任务，能让您进一步熟悉 `fastNLP` 的使用。

&emsp;&emsp;**本教程推荐使用 GPU 进行实验**

   

## 1.载入数据

&emsp;&emsp;在本篇教程中，我们使用 `de-en` 数据集来完成翻译任务，其中包含六个文件，其中，训练集有两个文件，分别为 `train.de` 以及 `train.en`，分别存储着英语和德语数据。每一行都是一句英语或者德语语料，并且互相映射。总数达16万。
   
&emsp;&emsp;英语训练集的格式如下

&emsp;&emsp;德语训练集的格式如下

### 1.1 load

&emsp;&emsp;继承自 `Loader` 基类，我们自定义 `TranslationLoader` 数据加载类，通过 `load` 函数，将文件夹中的数据集取出，并按照英文和德文一一映射的关系，每一对映射都生成一个 `Instance` 存入对应的 `DataSet` 中，并最终返回 `data_bundle`。

In [1]:
from fastNLP.io import Loader
import fastNLP

class TranslationLoader(Loader):
    """
    加载的Translation数据都是已经通过BPE tokenize好的
    """
    def __init__(self):
        super().__init__()

    def load(self, paths=None):
        """
        返回的DataBundle中包含train, dev, test三个DataSet，每个DataSet中的内容
            lg1             lg2
            'xxx xxx...'    'xxx xxx ...'

        :param paths:
        :return:
        """
        print(paths)
        assert os.path.exists(paths) and os.path.isdir(paths)
        data_bundle = DataBundle()
        data = defaultdict(dict)

        def read_list(fn):
            l = []
            with open(os.path.join(paths, fn), 'r', encoding='utf-8') as f:
                for line in f:
                    l.append(line.strip())
            return l

        for fn in os.listdir(paths):
            lg = fn.split('.')[-1]  # language
            if fn.startswith('train'):
                l = read_list(fn)
                data['train'][lg] = l
            elif fn.startswith('valid'):
                l = read_list(fn)
                data['dev'][lg] = l
            elif fn.startswith('test'):
                l = read_list(fn)
                data['test'][lg] = l

        for split, lgs in data.items():
            names = list(lgs.keys())
            ls = list(lgs.values())
            ds = DataSet()
            assert len(set(map(len, ls)))==1
            for sent1, sent2 in zip(*ls):
                ins = Instance(**{f'{names[0]}':sent1, f'{names[1]}':sent2})
                ds.append(ins)
            data_bundle.set_dataset(ds, name=split)

        return data_bundle

&emsp;&emsp;我们无法直接运算文本数据，所以通过 `TranslationPipe` 类对于 `data_bundle` 进行进一步处理。

&emsp;&emsp;其中，使用了 `fastNLP` 提供的 `Vocabulary` 模块来构建词汇表，通过 `from_dataset` 方法从 `dataset` 的指定字段中获取字段中的所有元素，然后编号。

&emsp;&emsp;之后，**通过 vocabulary 的 index_dataset 方法**，**调整 dataset 中指定字段的元素**，**使用编号将之代替**。


In [2]:
from fastNLP.io import Pipe, DataBundle
from fastNLP import DataSet, Instance, Vocabulary
import os
from collections import defaultdict
class TranslationPipe(Pipe):
    def __init__(self, merge_vocab=False, target_lg='de'):
        """

        :param bool merge_vocab: 是否把两个语言的vocab合在一起
        :param str target_lg: 哪个语言作为目标语言
        """
        self.target_lg = target_lg
        self.merge_vocab = merge_vocab

    def process(self, data_bundle: DataBundle) -> DataBundle:
        for name, ds in data_bundle.iter_datasets():
            assert ds.has_field(self.target_lg)
            for field_name in ds.get_field_names():
                ds.apply_field(lambda x:x.split(), field_name=field_name, new_field_name='tgt_tokens'
                               if field_name==self.target_lg else 'src_tokens')

        # 创建, index vocab
        tgt_vocab = Vocabulary(unknown='UNKNOWN')
        tgt_vocab.add_word_lst(['<SOS>', '<EOS>'])
        tgt_vocab.from_dataset(data_bundle.get_dataset('train'),
                               field_name='tgt_tokens')
        if self.merge_vocab:
            src_vocab = tgt_vocab
        else:
            src_vocab = Vocabulary()
        src_vocab.from_dataset(data_bundle.get_dataset('train'),
                               field_name='src_tokens')
        tgt_vocab.index_dataset(*data_bundle.datasets.values(), field_name='tgt_tokens')
        src_vocab.index_dataset(*data_bundle.datasets.values(), field_name='src_tokens')

        data_bundle.apply_field(lambda x:[tgt_vocab.to_index('<SOS>')] + x + [tgt_vocab.to_index('<EOS>')],
                                field_name='tgt_tokens', new_field_name='tgt_tokens')

        data_bundle.set_vocab(tgt_vocab, 'tgt_tokens')
        data_bundle.set_vocab(src_vocab, 'src_tokens')

        data_bundle.apply_field(lambda x:len(x), field_name='tgt_tokens', new_field_name='tgt_seq_len')
        data_bundle.apply_field(lambda x:len(x), field_name='src_tokens', new_field_name='src_seq_len')
        return data_bundle

    def process_from_file(self, paths) -> DataBundle:
        data_bundle = TranslationLoader().load(paths)
        return self.process(data_bundle)

&emsp;&emsp;加载 `data_bundle` 和 `vocabulary`。

&emsp;&emsp;**cache_results** 函数是 **fastNLP** 中用于缓存数据的装饰器，通过该函数您可以省去调试代码过程中一些耗时过长程序带来的时间开销。

In [3]:
from fastNLP import cache_results
@cache_results('caches/data2.pkl')
def get_data():
    data_bundle = TranslationPipe().process_from_file('./data/de-en2')
    return data_bundle


data_bundle = get_data()

#
tgt_vocab = data_bundle.get_vocab('tgt_tokens')

&emsp;&emsp;这里我们使用了自定义的 `Metric`。对于**自定义的 metric 类型**也**需要继承自 Metric 类**，同时**内部自定义好 init 、 update 和 get_metric 函数**。

&emsp;&emsp;在这里我们自定义了 `BLEUMetric`，并在 `get_metric` 中返回 `bleu`。

In [4]:
from fastNLP import Metric
import sacrebleu

class BLEUMetric(Metric):
    def __init__(self, vocab, eos_index, bpe_indicator='@@'):
        super().__init__()
        self.vocab = vocab
        self.eos_index = eos_index
        self.bpe_indicator= bpe_indicator
        self.goldens = []
        self.preds = []
        self.get_golden = True

    def update(self, tgt_tokens, tgt_seq_len, pred):
        """
        :param tgt_tokens: bsz x max_len (构成为[<SOS>] + [tokens] + [<EOS>])
        :param tgt_seq_len: bsz
        :param pred: bsz x max_len' (构成为[<SOS>] + [tokens] + [<EOS>])
        :return:
        """
        for i in range(tgt_tokens.size(0)):
            self.goldens.append(' '.join(map(self.vocab.to_word, tgt_tokens[i, 1:tgt_seq_len[i]-1].tolist())).replace(f'{self.bpe_indicator} ', ''))

        for i in range(pred.size(0)):
            words = []
            for idx in pred[i, 1:].tolist():
                if idx==self.eos_index:
                    break
                words.append(self.vocab.to_word(idx))
            self.preds.append(' '.join(words).replace(f'{self.bpe_indicator} ', ''))

    def get_metric(self, reset=True):
        bleu = sacrebleu.corpus_bleu(self.preds, [self.goldens], force=True)
        if reset:
            self.preds = []
            self.goldens = []
        return {'bleu': bleu.score}


## 2.模型创建与运行

&emsp;&emsp;首先使用 `StaticEmbedding` 类对与原始的 token 进行词嵌入，`embedding_size` 大小为512。

&emsp;&emsp;之后，通过 `TransformerSeq2SeqModel` 生成 model，该 model 包括一个编码器和一个解码器。

&emsp;&emsp;第三步，通过 `SequenceGeneratorModel` 来封装之前生成的 seq2seq model，这使得其既可以用于训练也可以用于预测。

&emsp;&emsp;训练的时候，本模型的 `forward` 函数会被调用，生成的时候本模型的 `predict` 函数会被调用。

In [5]:
from fastNLP.models.torch import TransformerSeq2SeqModel, SequenceGeneratorModel
from fastNLP.embeddings.torch import StaticEmbedding
dropout = 0.3
num_beams = 5
do_sample = True
src_embed = StaticEmbedding(data_bundle.get_vocab('src_tokens'), embedding_dim=512, model_dir_or_name=None)
tgt_embed = StaticEmbedding(data_bundle.get_vocab('tgt_tokens'), embedding_dim=512, model_dir_or_name=None)
seq2seq_model = TransformerSeq2SeqModel.build_model(src_embed, tgt_embed=tgt_embed,
                                                    pos_embed='sin', max_position=1024, num_layers=6, d_model=512,
                                                    n_head=4, dim_ff=1024, dropout=dropout,
                                                    bind_encoder_decoder_embed=False,
                                                    bind_decoder_input_output_embed=True)
# #
model = SequenceGeneratorModel(seq2seq_model, bos_token_id=tgt_vocab.to_index('<SOS>'),
                               eos_token_id=tgt_vocab.to_index('<EOS>'), max_length=10, max_len_a=1.2,
                               num_beams=num_beams,
                               do_sample=do_sample, temperature=1.0, top_k=50, top_p=1.0,
                               repetition_penalty=1, length_penalty=1.0, pad_token_id=0)


&emsp;&emsp;准备 `train` 和 `val` 的 `DataSet`，并构建 `DataLoader`，其中，我们指定 `batch_size` 大小为 32，`evaluate_dataloader` 只取前两百条 `sentence` 进行验证。

In [6]:
from fastNLP import TorchDataLoader

tr_data = data_bundle.get_dataset('train')
val_data = data_bundle.get_dataset("dev")

train_dataloader = TorchDataLoader(tr_data, batch_size=32, sampler=None, num_workers=0, pin_memory=False,
                                   drop_last=False, timeout=0, worker_init_fn=None,)

evaluate_dataloader = TorchDataLoader(val_data[:200], batch_size=32, sampler=None, num_workers=0, pin_memory=False,
                                      drop_last=False, timeout=0, worker_init_fn=None,)



&emsp;&emsp;定义优化器，定义 `Callback` 函数，定义训练模块 `Trainer`，以每 1000 个批次为一轮验证一次模型。一个 `epoch` 为 5000 个批次。

&emsp;&emsp;在 `Callback` 上我们选用  `TorchWarmupCallback` 。其能够对学习率进行预热，对于复杂模型很有效。

In [9]:
from fastNLP import Trainer, TorchWarmupCallback
from torch import optim
lr = 5e-4
optimizer = optim.AdamW(model.parameters(), lr=lr, betas=(0.9, 0.98), weight_decay=1e-4)

callbacks = [TorchWarmupCallback(warmup=4000, schedule='linear')]

trainer = Trainer(
    model=model,
    driver="torch",
    device=6,
    n_epochs=5,
    optimizers=optimizer,
    train_dataloader=train_dataloader,
    evaluate_dataloaders=evaluate_dataloader,
    evaluate_every=1000,
    callbacks=callbacks,
    validate_every=-1,
    save_path=None,
    use_tqdm=True,
    metrics={"bleu": BLEUMetric(vocab=tgt_vocab, eos_index=tgt_vocab.to_index('<EOS>'))},
)

&emsp;&emsp;使用 `trainer.run` 方法，训练模型，`n_epochs` 参数中已经指定需要迭代 50 轮。

In [None]:
trainer.run()

Output()

Output()

In [11]:
trainer.evaluator.run()

Output()

{'bleu#bleu': 22.085335376378072}