#### 本篇博客希望展示如何基于transformers提供的功能进行模型的开发，减少代码量，提高开发速度。

# 定义全局变量

In [1]:
CLUENER_DATASET_DIR = "../input/business-privacy-identify1/Business_Privacy_Identify/data/clue"

# 数据集路径
DATASET_DIR = CLUENER_DATASET_DIR

# BERT 模型   "bert-base-chinese"
BERT_MODEL_NAME = "hfl/chinese-roberta-wwm-ext"


In [2]:
import torch
import warnings
import torch.nn as nn
import numpy as np
import json

from torch import Tensor
from typing import List, Dict
from dataclasses import dataclass, field
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from transformers.file_utils import logger, logging
from transformers.trainer_utils import EvalPrediction
from transformers.modeling_outputs import TokenClassifierOutput
from sklearn.metrics import f1_score, precision_score, recall_score
from transformers import TrainingArguments, Trainer, BertTokenizer, BertModel, BertPreTrainedModel

warnings.filterwarnings("ignore")

### 一、定义参数

In [3]:
@dataclass
class ModelArguments:
    use_lstm: bool = field(default=True, metadata={"help": "是否使用LSTM"})
    lstm_hidden_size: int = field(default=500, metadata={"help": "LSTM隐藏层输出的维度"})
    lstm_layers: int = field(default=1, metadata={"help": "堆叠LSTM的层数"})
    lstm_dropout: float = field(default=0.5, metadata={"help": "LSTM的dropout"})
    hidden_dropout: float = field(default=0.5, metadata={"help": "预训练模型输出向量表示的dropout"})
    ner_num_labels: int = field(default=34, metadata={"help": "需要预测的标签数量"})


@dataclass
class OurTrainingArguments:
    checkpoint_dir: str = field(default="./models/checkpoints", metadata={"help": "训练过程中的checkpoints的保存路径"})
    best_dir: str = field(default="./models/best", metadata={"help": "最优模型的保存路径"})
    do_eval: bool = field(default=True, metadata={"help": "是否在训练时进行评估"})
    do_predict: bool = field(default=True, metadata={"help": "是否在训练时进行预测"})
    epoch: int = field(default=5, metadata={"help": "训练的epoch"})
    train_batch_size: int = field(default=8, metadata={"help": "训练时的batch size"})
    eval_batch_size: int = field(default=8, metadata={"help": "评估时的batch size"})
    bert_model_name: str = field(default=BERT_MODEL_NAME, metadata={"help": "BERT模型名称"})


@dataclass
class DataArguments:
    train_file: str = field(default=DATASET_DIR +"/train.json", metadata={"help": "训练数据的路径"})
    dev_file: str = field(default=DATASET_DIR +"/dev.json", metadata={"help": "测试数据的路径"})
    test_file: str = field(default=DATASET_DIR +"/test.json", metadata={"help": "测试数据的路径"})

### 二、读取数据

这里定义了一个用于保存数据的数据结构，这样的方法能够提高代码的可阅读性。

In [4]:
@dataclass
class Example:
    text: List[str] # ner的文本
    label: List[str] = None # ner的标签

    def __post_init__(self):
        if self.label:
            assert len(self.text) == len(self.label)

定义将文件中的ner数据保存为Example列表的函数

In [5]:
# 读取数据集:json 格式
def read_json(input_file):
    """read dataset """
    lines = []
    with open(input_file, 'r') as f:
        for line in f:
            line = json.loads(line.strip())
            text = line['text']
            label_entities = line.get('label', None)
            words = list(text)
            labels = ['O'] * len(words)
            if label_entities is not None:
                for key, value in label_entities.items():
                    for sub_name, sub_index in value.items():
                        for start_index, end_index in sub_index:
                            assert ''.join(words[start_index:end_index + 1]) == sub_name
                            if start_index == end_index:
                                labels[start_index] = 'S-' + key
                            else:
                                labels[start_index] = 'B-' + key
                                labels[start_index + 1:end_index + 1] = ['I-' + key] * (len(sub_name) - 1)
            lines.append({"words": words, "labels": labels})
    return lines

def read_dataset_json(input_file):
    """ 读取数据集:json 格式  """
    examples = []
    lines = read_json(input_file)
    for line in lines:
        examples.append(Example(line["words"], line["labels"]))

    return examples

# 读取数据集:json 格式
def read_dataset_txt(input_file):
    """read dataset """
    examples = []
    with open(input_file, "r", encoding="utf-8") as file:
        text = []
        label = []
        for line in file:
            line = line.strip()
            # 一条文本结束
            if len(line) == 0:
                examples.append(Example(text, label))
                text = []
                label = []
                continue
            text.append(line.split()[0])
            label.append(line.split()[1])
    return examples

In [6]:


def read_data(path, data_type="json"):
    examples = None
    if data_type == 'txt':
        examples = read_dataset_txt(path)
    elif data_type == "json":
        examples = read_dataset_json(path)

    return examples

train_data = read_data(DATASET_DIR +"/train.json")
eval_data = read_data(DATASET_DIR +"/dev.json")
print(train_data[0])


for i in range(10):
  print(train_data[i])

Example(text=['浙', '商', '银', '行', '企', '业', '信', '贷', '部', '叶', '老', '桂', '博', '士', '则', '从', '另', '一', '个', '角', '度', '对', '五', '道', '门', '槛', '进', '行', '了', '解', '读', '。', '叶', '老', '桂', '认', '为', '，', '对', '目', '前', '国', '内', '商', '业', '银', '行', '而', '言', '，'], label=['B-company', 'I-company', 'I-company', 'I-company', 'O', 'O', 'O', 'O', 'O', 'B-name', 'I-name', 'I-name', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O'])
Example(text=['浙', '商', '银', '行', '企', '业', '信', '贷', '部', '叶', '老', '桂', '博', '士', '则', '从', '另', '一', '个', '角', '度', '对', '五', '道', '门', '槛', '进', '行', '了', '解', '读', '。', '叶', '老', '桂', '认', '为', '，', '对', '目', '前', '国', '内', '商', '业', '银', '行', '而', '言', '，'], label=['B-company', 'I-company', 'I-company', 'I-company', 'O', 'O', 'O', 'O', 'O', 'B-name', 'I-name', 'I-name', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', '

加载标签数据并分配对于的id

In [7]:
def get_labels_from_list():
    "CLUENER TAGS"
    return ["<pad>", "B-address", "B-book", "B-company", 'B-game', 'B-government', 'B-movie', 'B-name',
            'B-organization', 'B-position', 'B-scene', "I-address",
            "I-book", "I-company", 'I-game', 'I-government', 'I-movie', 'I-name',
            'I-organization', 'I-position', 'I-scene',
            "S-address", "S-book", "S-company", 'S-game', 'S-government', 'S-movie',
            'S-name', 'S-organization', 'S-position',
            'S-scene', 'O', "<start>", "<eos>"]


def load_tag_from_file(path):
    with open(path, "r", encoding="utf-8") as file:
        lines = file.readlines()
        tag2id = {tag.strip(): idx for idx, tag in enumerate(lines)}
        id2tag = {idx: tag for tag, idx in tag2id.items()}
    return tag2id, id2tag


def load_tag(path=None):
    if path is not None:
        tag2id, id2tag = load_tag_from_file(path)
    else:
        tags = get_labels_from_list()

        id2tag = {i: label for i, label in enumerate(tags)}
        tag2id = {label: i for i, label in enumerate(tags)}

    return tag2id, id2tag


tag2id, id2tag = load_tag()
print(tag2id)
print(id2tag)

{'<pad>': 0, 'B-address': 1, 'B-book': 2, 'B-company': 3, 'B-game': 4, 'B-government': 5, 'B-movie': 6, 'B-name': 7, 'B-organization': 8, 'B-position': 9, 'B-scene': 10, 'I-address': 11, 'I-book': 12, 'I-company': 13, 'I-game': 14, 'I-government': 15, 'I-movie': 16, 'I-name': 17, 'I-organization': 18, 'I-position': 19, 'I-scene': 20, 'S-address': 21, 'S-book': 22, 'S-company': 23, 'S-game': 24, 'S-government': 25, 'S-movie': 26, 'S-name': 27, 'S-organization': 28, 'S-position': 29, 'S-scene': 30, 'O': 31, '<start>': 32, '<eos>': 33}
{0: '<pad>', 1: 'B-address', 2: 'B-book', 3: 'B-company', 4: 'B-game', 5: 'B-government', 6: 'B-movie', 7: 'B-name', 8: 'B-organization', 9: 'B-position', 10: 'B-scene', 11: 'I-address', 12: 'I-book', 13: 'I-company', 14: 'I-game', 15: 'I-government', 16: 'I-movie', 17: 'I-name', 18: 'I-organization', 19: 'I-position', 20: 'I-scene', 21: 'S-address', 22: 'S-book', 23: 'S-company', 24: 'S-game', 25: 'S-government', 26: 'S-movie', 27: 'S-name', 28: 'S-organiz

读取tokenizer

In [8]:
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME)

Downloading:   0%|          | 0.00/110k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/112 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/19.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/269k [00:00<?, ?B/s]

### 三、构建Dataset和collate_fn

构建Dataset

In [9]:
class NERDataset(Dataset):
    def __init__(self, examples: List[Example], max_length=128,tokenizer=BertTokenizer.from_pretrained(BERT_MODEL_NAME)):
        self.max_length = 512 if max_length > 512 else max_length
        """
        1. 将文本的长度控制在max_length - 2，减2的原因是为[CLS]和[SEP]空出位置； 
        2. 将文本转换为id序列；
        3. 将id序列转换为Tensor；
        """
        self.texts = [torch.LongTensor(tokenizer.encode(example.text[: self.max_length - 2])) for example in examples]
        self.labels = []
        for example in examples:
            label = example.label
            """
            1. 将字符的label转换为对于的id；
            2. 控制label的最长长度；
            3. 添加开始位置和结束位置对应的标签，这里<start>对应输入中的[CLS],<eos>对于[SEP]；
            4. 转换为Tensor；
            """
            label = [tag2id["<start>"]] + [tag2id[l] for l in label][: self.max_length - 2] + [tag2id["<eos>"]]
            self.labels.append(torch.LongTensor(label))
        assert len(self.texts) == len(self.labels)
        for text, label in zip(self.texts, self.labels):
            assert len(text) == len(label)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, item):
        return {
            "input_ids": self.texts[item],
            "labels": self.labels[item]
        }

train_dataset = NERDataset(train_data)
eval_dataset = NERDataset(eval_data)
print(train_dataset[0])

{'input_ids': tensor([ 101, 3851, 1555, 7213, 6121,  821,  689,  928, 6587, 6956, 1383, 5439,
        3424, 1300, 1894, 1156,  794, 1369,  671,  702, 6235, 2428, 2190,  758,
        6887, 7305, 3546, 6822, 6121,  749, 6237, 6438,  511, 1383, 5439, 3424,
        6371,  711, 8024, 2190, 4680, 1184, 1744, 1079, 1555,  689, 7213, 6121,
        5445, 6241, 8024,  102]), 'labels': tensor([32,  3, 13, 13, 13, 31, 31, 31, 31, 31,  7, 17, 17, 31, 31, 31, 31, 31,
        31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31,
        31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 33])}


定义collate_fn，collate_fn的作用在Dataloader生成batch数据时会被调用。
这里的作用是对每个batch进行padding

In [10]:
def collate_fn(features) -> Dict[str, Tensor]:
    batch_input_ids = [feature["input_ids"] for feature in features]
    batch_labels = [feature["labels"] for feature in features]
    batch_attentiton_mask = [torch.ones_like(feature["input_ids"]) for feature in features]
    # padding
    batch_input_ids = pad_sequence(batch_input_ids, batch_first=True, padding_value=tokenizer.pad_token_id)
    batch_labels = pad_sequence(batch_labels, batch_first=True, padding_value=tag2id["<pad>"])
    batch_attentiton_mask = pad_sequence(batch_attentiton_mask, batch_first=True, padding_value=0)
    assert batch_input_ids.shape == batch_labels.shape
    return {"input_ids": batch_input_ids, "labels": batch_labels, "attention_mask": batch_attentiton_mask}

测试一下collate_fn

In [11]:
dataloader = DataLoader(train_dataset, shuffle=True, batch_size=2, collate_fn=collate_fn)
batch = next(iter(dataloader))
print(batch.keys())
print(type(batch["input_ids"]))
print(batch["input_ids"].shape)
print(type(batch["labels"]))
print(batch["labels"].shape)
print(type(batch["attention_mask"]))
print(batch["attention_mask"].shape)

dict_keys(['input_ids', 'labels', 'attention_mask'])
<class 'torch.Tensor'>
torch.Size([2, 45])
<class 'torch.Tensor'>
torch.Size([2, 45])
<class 'torch.Tensor'>
torch.Size([2, 45])


### 四、定义一个评估函数

In [12]:

def ner_metrics(eval_output: EvalPrediction) -> Dict[str, float]:
    """
    该函数是回调函数，Trainer会在进行评估时调用该函数。
    (如果使用Pycharm等IDE进行调试，可以使用断点的方法来调试该函数，该函数在进行评估时被调用)
    """
    preds = eval_output.predictions
    preds = np.argmax(preds, axis=-1).flatten()
    labels = eval_output.label_ids.flatten()
    # labels为0表示为<pad>，因此计算时需要去掉该部分
    mask = labels != 0
    preds = preds[mask]
    labels = labels[mask]
    metrics = dict()
    metrics["f1"] = f1_score(labels, preds, average="macro")
    metrics["precision"] = precision_score(labels, preds, average="macro")
    metrics["recall"] = recall_score(labels, preds, average="macro")
    # 必须以字典的形式返回，后面会用到字典的key
    return metrics

### 五、构建模型
+ 自定义的模型需要继承BertPreTrainedModel

In [13]:
class BertForNER(BertPreTrainedModel):
    def __init__(self, config, *model_args, **model_kargs):
        super().__init__(config) # 初始化父类(必要的步骤)
        if "model_args" in model_kargs:
            model_args = model_kargs["model_args"]
            """
            必须将额外的参数更新至self.config中，这样在调用save_model保存模型时才会将这些参数保存；
            这种在使用from_pretrained方法加载模型时才不会出错；
            """
            self.config.__dict__.update(model_args.__dict__)
        self.num_labels = self.config.ner_num_labels
        self.bert = BertModel(config, add_pooling_layer=False)
        self.dropout = nn.Dropout(self.config.hidden_dropout)
        self.lstm = nn.LSTM(self.config.hidden_size, # 输入的维度
                            self.config.lstm_hidden_size, # 输出维度
                            num_layers=self.config.lstm_layers, # 堆叠lstm的层数
                            dropout=self.config.lstm_dropout,
                            bidirectional=True, # 是否双向
                            batch_first=True)
        if self.config.use_lstm:
            self.classifier = nn.Linear(self.config.lstm_hidden_size * 2, self.num_labels)
        else:
            self.classifier = nn.Linear(self.config.hidden_size, self.num_labels)
        self.init_weights()

    def forward(
            self,
            input_ids=None,
            attention_mask=None,
            token_type_ids=None,
            position_ids=None,
            head_mask=None,
            inputs_embeds=None,
            labels=None,
            pos=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = self.dropout(outputs[0])
        if self.config.use_lstm:
            sequence_output, _ = self.lstm(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            # 如果attention_mask不为空，则只计算attention_mask中为1部分的Loss
            if attention_mask is not None:
                active_loss = attention_mask.view(-1) == 1
                active_logits = logits.view(-1, self.num_labels)
                active_labels = torch.where(
                    active_loss, labels.view(-1), torch.tensor(loss_fct.ignore_index).type_as(labels)
                )
                loss = loss_fct(active_logits, active_labels)
            else:
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierOutput(
            loss=loss,
            logits=logits, # 该部分在评估时，会作为EvalPrediction对象的predictions进行返回
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

测试一下模型是否符合预期

In [14]:
model_args = ModelArguments(use_lstm=True)
model = BertForNER.from_pretrained(BERT_MODEL_NAME, model_args=model_args)
output = model(**batch)
print(type(output))
print(output.loss)
print(output.logits.shape)

Downloading:   0%|          | 0.00/689 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/412M [00:00<?, ?B/s]

Some weights of the model checkpoint at hfl/chinese-roberta-wwm-ext were not used when initializing BertForNER: ['cls.predictions.transform.LayerNorm.weight', 'bert.pooler.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.bias', 'bert.pooler.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertForNER from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForNER from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForNER were not initialized from the model checkpoint at hfl/chinese

<class 'transformers.modeling_outputs.TokenClassifierOutput'>
tensor(3.4761, grad_fn=<NllLossBackward>)
torch.Size([2, 45, 34])


### 六、模型训练

In [15]:
def run(model_args: ModelArguments, data_args: DataArguments, args: OurTrainingArguments):
    # 设定训练参数
    training_args = TrainingArguments(output_dir=args.checkpoint_dir,  # 训练中的checkpoint保存的位置
                                      num_train_epochs=args.epoch,
                                      do_eval=args.do_eval,  # 是否进行评估
                                      evaluation_strategy="epoch",  # 每个epoch结束后进行评估
                                      per_device_train_batch_size=args.train_batch_size,
                                      per_device_eval_batch_size=args.eval_batch_size,
                                      load_best_model_at_end=True,  # 训练完成后加载最优模型
                                      metric_for_best_model="f1"  # 评估最优模型的指标，该指标是ner_metrics返回评估指标中的key
                                      )
     # 构建分词器
    tokenizer = BertTokenizer.from_pretrained(args.bert_model_name)
    
    # 构建dataset
    train_dataset = NERDataset(read_data(data_args.train_file), tokenizer=tokenizer)
    eval_dataset = NERDataset(read_data(data_args.dev_file), tokenizer=tokenizer)
    test_dataset = NERDataset(read_data(data_args.test_file), tokenizer=tokenizer)
    
    # 加载预训练模型
    model = BertForNER.from_pretrained(args.bert_model_name, model_args=model_args)
    # 初始化Trainer
    trainer = Trainer(model=model,
                      args=training_args,
                      train_dataset=train_dataset,
                      eval_dataset=eval_dataset,
                      tokenizer=tokenizer,
                      data_collator=collate_fn,
                      compute_metrics=ner_metrics)
    # 模型训练
    trainer.train()
    # 训练完成后，加载最优模型并进行评估
    logger.info(trainer.evaluate(eval_dataset))
    # 保存训练好的模型
    trainer.save_model(args.best_dir)
    
    # 进行预测
    logger.info(trainer.predict(test_dataset))
    



In [18]:
def main():
    # 定义各类参数并训练模型
    model_args = ModelArguments(use_lstm=True)
    data_args = DataArguments()
    training_args = OurTrainingArguments(bert_model_name="hfl/chinese-roberta-wwm-ext",epoch=5,
                                         train_batch_size=128, eval_batch_size=128)
    run(model_args, data_args, training_args)

In [19]:
main()

Some weights of the model checkpoint at hfl/chinese-roberta-wwm-ext were not used when initializing BertForNER: ['cls.predictions.transform.LayerNorm.weight', 'bert.pooler.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.bias', 'bert.pooler.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertForNER from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForNER from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForNER were not initialized from the model checkpoint at hfl/chinese

Epoch,Training Loss,Validation Loss,F1,Precision,Recall
1,No log,0.452647,0.364199,0.436083,0.390126
2,No log,0.256539,0.713568,0.768139,0.701887
3,No log,0.230775,0.774396,0.766954,0.786128
4,No log,0.211439,0.792071,0.784034,0.80217
5,No log,0.220745,0.786049,0.781213,0.793653
6,0.342100,0.238769,0.781475,0.780459,0.784691
7,0.342100,0.248653,0.783276,0.774141,0.793234
8,0.342100,0.254722,0.785502,0.777876,0.795465
9,0.342100,0.264056,0.78102,0.774723,0.789593
10,0.342100,0.263541,0.783215,0.774956,0.792627
