# Chapter6-2

## CRFを使ってラベル間の遷移可能性を学習

今回は直鎖CRFを使用。

$\rightarrow$隣接した2つのラベル間の遷移のみを考慮する

### ライブラリのインストール

In [None]:
!pip install datasets transformers[ja,torch] spacy-alignments seqeval pytorch-crf

Collecting datasets
  Downloading datasets-2.19.1-py3-none-any.whl (542 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/542.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m532.5/542.0 kB[0m [31m19.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m542.0/542.0 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
Collecting spacy-alignments
  Downloading spacy_alignments-0.9.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (313 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m314.0/314.0 kB[0m [31m41.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting seqeval
  Downloading seqeval-1.2.2.tar.gz (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pytorch-crf
  Downloading pytorch_crf-0.

### データセットの読み込み

In [None]:
from datasets import load_dataset

# データセットを読み込む
dataset = load_dataset("llm-book/ner-wikipedia-dataset")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


Downloading builder script:   0%|          | 0.00/3.98k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/1.01k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/641k [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

### トークナイザの取得



In [None]:
from transformers import AutoTokenizer

# トークナイザを読み込む
model_name = "cl-tohoku/bert-base-japanese-v3"
tokenizer = AutoTokenizer.from_pretrained(model_name)

tokenizer_config.json:   0%|          | 0.00/251 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/231k [00:00<?, ?B/s]

### ラベルとIDを対応付けるdictを作成

In [None]:
from pprint import pprint
import torch

def create_label2id(
    entities_list: list[list[dict[str, str | int]]]
) -> dict[str, int]:

  """ ラベルとIDを紐づけるdictを作成 """

  label2id = {"0": 0}

  # 固有表現タイプのsetを獲得して並び替える
  entity_types = set(
      [e["type"] for entities in entities_list for e in entities]
  )
  entity_types = sorted(entity_types)

  print(f"entity_types: {entity_types}")

  for i, entity_type in enumerate(entity_types):
    label2id[f"B-{entity_type}"] = i * 2 + 1
    label2id[f"I-{entity_type}"] = i * 2 + 2

  return label2id

# ラベルIDを紐づけるdictを作成
label2id = create_label2id(dataset["train"]["entities"])
id2label = {v: k for k, v in label2id.items()}
pprint(label2id)
pprint(id2label)

entity_types: ['その他の組織名', 'イベント名', '人名', '地名', '政治的組織名', '施設名', '法人名', '製品名']
{'0': 0,
 'B-その他の組織名': 1,
 'B-イベント名': 3,
 'B-人名': 5,
 'B-地名': 7,
 'B-政治的組織名': 9,
 'B-施設名': 11,
 'B-法人名': 13,
 'B-製品名': 15,
 'I-その他の組織名': 2,
 'I-イベント名': 4,
 'I-人名': 6,
 'I-地名': 8,
 'I-政治的組織名': 10,
 'I-施設名': 12,
 'I-法人名': 14,
 'I-製品名': 16}
{0: '0',
 1: 'B-その他の組織名',
 2: 'I-その他の組織名',
 3: 'B-イベント名',
 4: 'I-イベント名',
 5: 'B-人名',
 6: 'I-人名',
 7: 'B-地名',
 8: 'I-地名',
 9: 'B-政治的組織名',
 10: 'I-政治的組織名',
 11: 'B-施設名',
 12: 'I-施設名',
 13: 'B-法人名',
 14: 'I-法人名',
 15: 'B-製品名',
 16: 'I-製品名'}


### データの前処理

In [None]:
from transformers.tokenization_utils_base import BatchEncoding
from transformers import PreTrainedTokenizer
from spacy_alignments.tokenizations import get_alignments

def preprocess_data(
    data: dict[str, any],
    tokenizer: PreTrainedTokenizer,
    label2id: dict[int, str]
) -> BatchEncoding:
  """ データの前処理 """

  # テキストのトークナイゼーション
  inputs = tokenizer(
      data["text"], return_tensors="pt", return_special_tokens_mask=True,
  )
  inputs = {k: v.squeeze(0) for k, v in inputs.items()}

  # 文字のlistとトークンのlistのアライメントをとる
  characters = list(data["text"])
  tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"])
  char_to_token_indices, _ = get_alignments(characters, tokens)

  # "0"のIDのlistを作成する
  labels = torch.zeros_like(inputs["input_ids"])
  for entity in data["entities"]:
    start_token_indices = char_to_token_indices[entity["span"][0]]
    end_token_indices = char_to_token_indices[entity["span"][1] - 1]

    # 文字に対応するトークンが存在しないときスキップ
    if len(start_token_indices) == 0 or len(end_token_indices) == 0:
      continue

    start, end = start_token_indices[0], end_token_indices[0]
    entity_type = entity["type"]

    # 固有表現開始位置に"B-"のIDを設定する
    labels[start] = label2id[f"B-{entity_type}"]

    # 固有表現開始トークン以外の位置に"I-"のIDを設定
    if start != end:
      labels[start + 1 : end + 1] = label2id[f"I-{entity_type}"]

  # 特殊トークンの位置のIDは-100とする
  labels[torch.where(inputs["special_tokens_mask"])] = -100
  inputs["labels"] = labels
  return inputs

# トレーニングセットに対して前処理
train_dataset = dataset["train"].map(
    preprocess_data,
    fn_kwargs={
        "tokenizer": tokenizer,
        "label2id" : label2id,
    },
    remove_columns=dataset["train"].column_names,
)

# 検証セットに対し
val_dataset = dataset["validation"].map(
    preprocess_data,
    fn_kwargs={
        "tokenizer": tokenizer,
        "label2id" : label2id
    },
    remove_columns=dataset["validation"].column_names
)

Parameter 'fn_kwargs'={'tokenizer': BertJapaneseTokenizer(name_or_path='cl-tohoku/bert-base-japanese-v3', vocab_size=32768, model_max_length=512, is_fast=False, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	0: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	1: AddedToken("[UNK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	2: AddedToken("[CLS]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	3: AddedToken("[SEP]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	4: AddedToken("[MASK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	0: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=

Map:   0%|          | 0/4274 [00:00<?, ? examples/s]

Map:   0%|          | 0/534 [00:00<?, ? examples/s]

### 遷移スコアを定義

In [None]:

def create_transitions(
    label2id: dict[str, int]
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:

  """ 遷移スコアを定義 """
  # B-のラベルIDのリスト
  b_ids = [v for k, v in label2id.items() if k[0] == "B"]

  # I-のラベルIDのリスト
  i_ids = [v for k, v in label2id.items() if k[0] == "I"]

  # OのラベルID
  o_id = label2id["0"]

  # 開始遷移スコアを定義
  # すべてのスコアを-100で初期化
  start_transitions = torch.full([len(label2id)], -100.0)
  # すべてのラベルからB-へ遷移可能としてOを代入
  start_transitions[b_ids] = 0

  # すべてのラベルからOへ遷移可能としてOを代入
  start_transitions[o_id] = 0


  # ラベル間の遷移スコアを定義
  # 全てのスコアを-100で初期化
  transitions = torch.full([len(label2id), len(label2id)], -100.0)

  # すべてのラベルからB-へ遷移可能としてOを代入
  transitions[:, b_ids] = 0

  # すべてのラベルからOへ遷移可能としてOを代入
  transitions[:, o_id] = 0

  # B-から同じタイプのI-へ遷移可能としてOを代入
  transitions[b_ids, i_ids] = 0

  #I-から同じタイプのI-への遷移可能ととして0を代入
  transitions[i_ids, i_ids] = 0

  # 終了遷移スコアを定義
  # すべてのラベルから遷移可能としてすべてのスコアを0とする
  end_transitions = torch.zeros(len(label2id))
  return start_transitions, transitions, end_transitions

# 遷移スコアを定義
start_transitions, transitions, end_transitions = create_transitions(label2id)


### BertWithCrfForTokenClassificationを定義

In [None]:
from torchcrf import CRF
from transformers import BertForTokenClassification, PretrainedConfig
from transformers.modeling_outputs import TokenClassifierOutput
from transformers import DataCollatorForTokenClassification

class BertWithCrfForTokenClassification(BertForTokenClassification):
  """ BertForTokenClassificationにCRF層を加えたクラス """

  def __init__(self, config: PretrainedConfig):
    """ クラスを初期化 """
    super().__init__(config)
    # CRF層を定義
    self.crf = CRF(len(config.label2id), batch_first=True)

  def _init_weights(self, module: torch.nn.Module) -> None:
    """ 定義した遷移スコアでパラメータを初期化 """
    super()._init_weights(module)

    if isinstance(module, CRF):
      st, t, et = create_transitions(self.config.label2id)
      module.start_transitions.data = st
      module.transitions.data = t
      module.end_transitions.data = et

  def forward(
      self,
      input_ids: torch.Tensor,
      attention_mask: torch.Tensor = None,
      token_type_ids: torch.Tensor = None,
      labels: torch.Tensor = None,
  ) -> TokenClassifierOutput:
    """ モデルの前向き計算を定義 """

    # BertForTokenClassificationのforwardメソッドを適用して予測スコアを取得
    output = super().forward(
        input_ids=input_ids,
        attention_mask=attention_mask,
        token_type_ids=token_type_ids
    )

    if labels is not None:
      logits = output.logits
      mask = labels != -100
      labels *= mask

      # CRFによる損失を計算
      output["loss"] = -self.crf(
          logits[:, 1:, :],
          labels[:, 1:],
          mask=mask[:, 1:],
          reduction="mean"
      )

    return output

# BertForTokenClassificationにCRF層を加えたクラスを定義
model_crf = BertWithCrfForTokenClassification.from_pretrained(
    model_name, label2id=label2id, id2label=id2label
)

# collate関数にDataCollaorForTokenClassificationを使用
data_collator = DataCollatorForTokenClassification(tokenizer)

config.json:   0%|          | 0.00/472 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/447M [00:00<?, ?B/s]

Some weights of BertWithCrfForTokenClassification were not initialized from the model checkpoint at cl-tohoku/bert-base-japanese-v3 and are newly initialized: ['classifier.bias', 'classifier.weight', 'crf.end_transitions', 'crf.start_transitions', 'crf.transitions']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
model_crf

BertWithCrfForTokenClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(32768, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), ep

### モデルの学習

In [None]:
from transformers import Trainer, TrainingArguments
from transformers.trainer_utils import set_seed

# 乱数のシード値を再設定
set_seed(42)

# Trainerに渡す引数を初期化
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/Learning_LLM/chapter6/bert_crf",
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    learning_rate=1e-4,
    lr_scheduler_type="linear",
    warmup_ratio=0.1,
    num_train_epochs=5,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    fp16=True
)

# Trainerを初期化
trainer = Trainer(
    model=model_crf,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=data_collator,
    args=training_args
)

# 学習開始
trainer.train()



Epoch,Training Loss,Validation Loss
1,21.5472,1.497271
2,1.2737,1.664914
3,0.5646,1.469021
4,0.3072,2.023351
5,0.1919,2.336819


TrainOutput(global_step=670, training_loss=4.77690212477499, metrics={'train_runtime': 474.6688, 'train_samples_per_second': 45.021, 'train_steps_per_second': 1.412, 'total_flos': 1070016473991360.0, 'train_loss': 4.77690212477499, 'epoch': 5.0})

### 固有表現ラベルの予測

In [None]:
def convert_list_dict_to_dict_list(
    list_dict: dict[str, list]
) -> list[dict[str, list]]:
  """ ミニバッチのデータを事例単位のlistに変換する """

  dict_list = []

  #キーのリストの作成
  keys = list(list_dict.keys())
  for idx in range(len(list_dict[keys[0]])):
    # dictの各キーからデータを取り出してリストに追加
    dict_list.append({key: list_dict[key][idx] for key in keys})

  return dict_list

# ミニバッチのデータを事例単位のリストに変換
list_dict = {
    "input_ids": [[0, 1], [2, 3]],
    "labels": [[1, 2], [3, 4]]
}

dict_list = convert_list_dict_to_dict_list(list_dict)
print(f"input: {list_dict}")
print(f"output: {dict_list}")

input: {'input_ids': [[0, 1], [2, 3]], 'labels': [[1, 2], [3, 4]]}
output: [{'input_ids': [0, 1], 'labels': [1, 2]}, {'input_ids': [2, 3], 'labels': [3, 4]}]


In [None]:
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import PreTrainedModel

def run_prediction(
    dataloader: DataLoader, model: PreTrainedModel
) -> list[dict[str, any]]:
  """ 予測スコアに基づき固有表現ラベルを予測 """

  predictions = []

  # ミニバッチを処理
  for batch in tqdm(dataloader):
    inputs = {
        k: v.to(model.device)
        for k, v in batch.items()
        if k != "special_tokens_mask"
    }

    # 予測スコアを取得
    logits = model(**inputs).logits

    # 最もスコアの高いIDを取得
    batch["pred_label_ids"] = logits.argmax(-1)
    batch = {k: v.cpu().tolist() for k, v in batch.items()}

    # ミニバッチのデータを事例単位のlistに変換
    predictions += convert_list_dict_to_dict_list(batch)

  return predictions

# DataLoaderでミニバッチを作成
validation_dataloader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=data_collator
)

# 固有表現ラベルを予測する
predictions = run_prediction(validation_dataloader, model_crf)
print(predictions[0]["pred_label_ids"]) # 最もスコアの高い予測IDを表示


100%|██████████| 17/17 [00:01<00:00,  8.97it/s]

[0, 0, 16, 16, 0, 0, 14, 14, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 14, 14, 14, 14, 14, 0, 0, 0, 0, 0, 15, 16, 16, 0, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 0, 0, 0, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 0, 0, 0, 0, 16, 16, 16, 16, 0, 14, 14, 14, 14, 14, 0, 0, 0, 0, 16, 15, 16, 0, 0, 14, 14]





ビタビアルゴリズムを用いたラベル列の予測

In [None]:
def decode_with_viterbi(
    emissions: torch.Tensor, # ラベルの予測スコア
    mask: torch.Tensor, # マスク
    start_transitions: torch.Tensor, # 開始遷移スコア
    transitions: torch.Tensor, # ラベル間の遷移スコア
    end_transitions: torch.Tensor # 終了遷移スコア
) -> torch.Tensor:
  """ ビタビアルゴリズムを用いて最適なラベル列を探索 """

  # バッチサイズ、系列長を取得
  batch_size, seq_length = mask.shape

  # 予測スコア、マスクに0次元目と1次元目を入れ替え
  emissions = emissions.transpose(1, 0)
  mask = mask.transpose(1, 0)

  histories = []

  # 開始遷移スコア、予測スコアを加算。累積スコアの初期値とする
  score = start_transitions + emissions[0]

  for i in range(1, seq_length):
    # 累積スコアを3次元に変換
    broad_cast_score = score.unsqueeze(2)

    # 現在の予測スコアを3次元に変換
    broadcast_emission = emissions[i].unsqueeze(1)

    # 現在の累積スコアを取得
    next_score = (broad_cast_score + transitions + broadcast_emission)

    # 現在の累積スコアの各ラベルの最大値、インデックスを取得
    next_score, indices = next_score.max(dim=1)

    # マスクしない要素なら累積スコアを更新
    score = torch.where(mask[i].unsqueeze(1), next_score, score)

    # スコアの高いインデックスを履歴のリストに追加
    histories.append(indices)

  # 終了遷移スコアを加算、合計スコアにする
  score += end_transitions

  # 各事例で最適なラベル列を取得
  best_labels_list = []
  for i in range(batch_size):

    # 合計スコアの中で最大のスコアとなるラベルを取得
    _, best_last_label = score[i].max(dim=0)
    best_labels = [best_last_label.item()]

    # 最後のラベルの遷移を逆方向に探索、最適なラベル列を取得
    for history in reversed(histories):
      best_last_label = history[i][best_labels[-1]]
      best_labels.append(best_last_label.item())

    # 順序を反転
    best_labels.reverse()
    best_labels_list.append(best_labels)

  return torch.LongTensor(best_labels_list)

### 固有表現を抽出

In [None]:
from seqeval.metrics.sequence_labeling import get_entities

def extract_entities(
    predictions: list[dict[str, any]],
    dataset: list[dict[str, any]],
    tokenizer: PreTrainedTokenizer,
    id2label: dict[int, str]
) -> list[dict[str, any]]:
  """ 固有表現を抽出 """

  results = []
  for prediction, data in zip(predictions, dataset):
    # 文字のリストを取得
    characters = list(data["text"])

    # 特殊トークンを除いたトークンのリスト、予測ラベルのリストを取得
    tokens, pred_labels = [], []

    all_tokens = tokenizer.convert_ids_to_tokens(
        prediction["input_ids"]
    )

    for token, label_id in zip(all_tokens, prediction["pred_label_ids"]):

      # 特殊トークン以外をリストに追加
      if token not in tokenizer.all_special_tokens:
        tokens.append(token)
        pred_labels.append(id2label[label_id])

    # 文字のリスト、トークンのリストのアライメントを作成
    _, token_to_char_indices = get_alignments(characters, tokens)

    # 予測ラベルのリストから固有表現タイプ、トークンの開始、終了位置を取得
    # 上記を正解データと同じ形式に変換する
    pred_entities = []
    for entity in get_entities(pred_labels):
      entity_type, token_start, token_end = entity

      # 文字単位の開始位置を取得
      char_start = token_to_char_indices[token_start][0]

      # 文字単位の終了位置を取得
      char_end = token_to_char_indices[token_end][-1] + 1
      pred_entity = {
          "name": "".join(characters[char_start:char_end]),
          "span": [char_start, char_end],
          "type": entity_type
      }

      pred_entities.append(pred_entity)
    data["pred_entities"] = pred_entities
    results.append(data)
  return results

# 固有表現を抽出
results = extract_entities(
    predictions, dataset["validation"], tokenizer, id2label
)

pprint(results[0])



{'curid': '1662110',
 'entities': [{'name': '復活篇', 'span': [1, 4], 'type': '製品名'},
              {'name': 'グリーンバニー', 'span': [6, 13], 'type': '法人名'}],
 'pred_entities': [{'name': '「', 'span': [0, 1], 'type': '_'},
                   {'name': '復活篇', 'span': [1, 4], 'type': '製品名'},
                   {'name': '」は', 'span': [4, 6], 'type': '_'},
                   {'name': 'グリーンバニー', 'span': [6, 13], 'type': '法人名'}],
 'text': '「復活篇」はグリーンバニーからの発売となっている。'}


In [None]:
from seqeval.metrics import classification_report

def create_character_labels(
    text: str, entities: list[dict[str, list[int] | str]]
) -> list[str]:
  """ 文字ベースでラベルのlistを作成 """

  # "0"のラベルで初期化したラベルのlistを作成
  labels = ["0"] * len(text)

  for entity in entities:
    entity_span, entity_type = entity["span"], entity["type"]

    # 固有表現の開始文字の位置に"B-"のラベルを設定
    labels[entity_span[0]] = f"B-{entity_type}"

    # 固有表現の開始文字の位置に"I-"のラベルを設定
    for i in range(entity_span[0] + 1, entity_span[1]):
      labels[i] = f"I-{entity_type}"

  return labels

def convert_results_to_labels(
    results: list[dict[str, any]]
) -> tuple[list[list[str]], list[list[str]]]:
  """ 正解データ、予測データのラベルのリスト作成 """
  true_labels, pred_labels = [], []

  for result in results:
    true_labels.append(
        create_character_labels(
            result["text"], result["entities"]
        )
    )

    pred_labels.append(
        create_character_labels(
            result["text"], result["pred_entities"]
        )
    )

  return true_labels, pred_labels

from seqeval.metrics import f1_score, precision_score, recall_score

def compute_scores(
    true_labels: list[list[str]],
    pred_labels: list[list[str]],
    average: str,
) -> dict[str, float]:
  """ precision, recall, f1_scoreを算出 """

  scores = {
      "precision": precision_score(
          true_labels, pred_labels, average=average
      ),
      "recall": recall_score(
          true_labels, pred_labels, average=average
      ),
      "f1-score": f1_score(
          true_labels, pred_labels, average=average
      )
  }

  return scores

### BERT-CRFを使ってラベルを予測する関数を定義

In [None]:
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import PreTrainedModel

def run_prediction_crf(
    dataloader: DataLoader,
    model: PreTrainedModel,
) -> list[dict[str, any]]:
  """ BERT-CRFモデルを使ってラベルを予測 """

  predictions = []
  for batch in tqdm(dataloader):
    inputs = {
        k: v.to(model.device)
        for k, v in batch.items()
        if k != "special_tokens_mask"
    }

    # [CLS]以外の予測スコアを取得
    logits = model(**inputs).logits.cpu()[:, 1:, :]

    # [CLS]以外の特殊トークンのマスクを取得
    mask = (batch["special_tokens_mask"] == 0).cpu()[:, 1:]

    # 学習した遷移スコアを取得
    start_transitions = model.crf.start_transitions.cpu()
    transitions = model.crf.transitions.cpu()
    end_transitions = model.crf.end_transitions.cpu()

    # ビタビアルゴリズムを用いて最適なIDの系列を探索
    pred_label_ids = decode_with_viterbi(
        logits,
        mask,
        start_transitions,
        transitions,
        end_transitions,
    )

    # [CLS]のIDを0とする
    cls_pred_label_id = torch.zeros(pred_label_ids.shape[0], 1)

    # [CLS]のID、探索したIDの系列を連結して予測ラベルとする
    batch["pred_label_ids"] = torch.concat(
        [cls_pred_label_id, pred_label_ids], dim=1
    )

    batch = {k: v.cpu().tolist() for k, v in batch.items()}

    # ミニバッチのデータを事例単位のlistに変換
    predictions += convert_list_dict_to_dict_list(batch)
  return predictions

### F値のマイクロ平均が高いモデルを選択


In [None]:
from glob import glob

# DataLoaderでミニバッチを作成
validation_dataloader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=data_collator
)

best_score = 0

for checkpoint in sorted(glob("/content/drive/MyDrive/Learning_LLM/chapter6/bert_crf/checkpoint-*")):
  # モデルの読み込み
  model_crf = BertWithCrfForTokenClassification.from_pretrained(
      checkpoint
  )

  model_crf = model_crf.to("cuda")

  # 固有表現ラベルを予測
  predictions = run_prediction_crf(validation_dataloader, model_crf)

  # 固有表現を抽出
  results = extract_entities(
      predictions, dataset["validation"], tokenizer, id2label
  )

  # 正解データ、予測データのラベルのリストを作成
  true_labels, pred_labels = convert_results_to_labels(results)

  # 評価スコアを算出
  scores = compute_scores(true_labels, pred_labels, "micro")
  if best_score < scores["f1-score"]:
    best_model_crf = model_crf

100%|██████████| 17/17 [00:03<00:00,  4.89it/s]
100%|██████████| 17/17 [00:03<00:00,  4.29it/s]
100%|██████████| 17/17 [00:03<00:00,  4.89it/s]
100%|██████████| 17/17 [00:04<00:00,  4.13it/s]
100%|██████████| 17/17 [00:03<00:00,  5.04it/s]


### 性能を評価

In [None]:
# テストセットに対して前処理
test_dataset = dataset["test"].map(
    preprocess_data,
    fn_kwargs={
        "tokenizer": tokenizer,
        "label2id": label2id,
    },
    remove_columns = dataset["test"].column_names,
)

# DataLoaderでミニバッチを作成
test_dataloader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=data_collator
)

# 固有表現ラベルの予測
predictions = run_prediction_crf(test_dataloader, best_model_crf)

print(predictions)

# 固有表現の抽出
results = extract_entities(
    predictions, dataset["test"], tokenizer, id2label
)

# 正解データ予測データのラベルのリストを作成
true_labels, pred_labels = convert_results_to_labels(results)

print(pred_labels)

# 評価結果を出力
print(classification_report(true_labels, pred_labels))

Map:   0%|          | 0/535 [00:00<?, ? examples/s]

100%|██████████| 17/17 [00:03<00:00,  4.50it/s]


[{'input_ids': [2, 15233, 14227, 464, 14693, 1112, 461, 12505, 17803, 500, 14550, 24390, 12489, 461, 14010, 384, 13218, 456, 4751, 7445, 1425, 464, 647, 8545, 500, 21443, 27910, 464, 6154, 8684, 484, 20422, 384, 17695, 2002, 464, 5961, 7584, 14197, 474, 464, 3107, 8043, 430, 28424, 385, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'special_tokens_mask': [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0,

In [None]:
# 最終的な処理

# 固有表現ラベルを予測
predictions = run_prediction_crf(test_dataloader, best_model_crf)

# 固有表現を抽出
results = extract_entities(
    predictions, dataset["test"], tokenizer, id2label
)

# 正解データ、予測データのラベルのリストを作成
true_labels, pred_labels = convert_results_to_labels(results)

# 評価結果を出力
print(classification_report(true_labels, pred_labels))

100%|██████████| 17/17 [00:03<00:00,  4.42it/s]


              precision    recall  f1-score   support

           _       0.66      0.66      0.66      1132
     その他の組織名       0.85      0.80      0.82       100
       イベント名       0.87      0.94      0.90        93
          人名       0.97      0.97      0.97       287
          地名       0.86      0.88      0.87       204
      政治的組織名       0.82      0.90      0.86       106
         施設名       0.93      0.89      0.91       137
         法人名       0.89      0.89      0.89       248
         製品名       0.83      0.82      0.83       158

   micro avg       0.79      0.79      0.79      2465
   macro avg       0.85      0.86      0.86      2465
weighted avg       0.79      0.79      0.79      2465



### 同じラベルが付与されている別のデータセットを用いてモデルを検証

In [None]:
# データセットの読み込み
dataset_wikinews = load_dataset("llm-book/ner-wikinews-dataset")
print(dataset_wikinews)

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


Downloading builder script:   0%|          | 0.00/3.15k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/1.02k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/276k [00:00<?, ?B/s]

Generating test split: 0 examples [00:00, ? examples/s]

DatasetDict({
    test: Dataset({
        features: ['curid', 'text', 'entities'],
        num_rows: 178
    })
})


In [None]:
# データセットの前処理
test_dataset = dataset_wikinews["test"].map(
    preprocess_data,
    fn_kwargs={"tokenizer": tokenizer, "label2id": label2id},
    remove_columns=dataset_wikinews["test"].column_names,
)

# ミニバッチを作成
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=data_collator
)

Map:   0%|          | 0/178 [00:00<?, ? examples/s]

In [None]:
# 固有表現ラベルの予測
predictions = run_prediction_crf(
    test_loader, best_model_crf
)

# 固有表現の抽出
results = extract_entities(
    predictions, dataset_wikinews["test"], tokenizer, id2label
)

# 正解データ、予測データのラベルのリストを作成
true_labels, pred_labels = convert_results_to_labels(results)

print(classification_report(true_labels, pred_labels))

100%|██████████| 6/6 [00:01<00:00,  5.74it/s]


[{'curid': '45869486', 'text': '- 岸田総理夫人 単独訪米', 'entities': [{'name': '岸田', 'span': [2, 4], 'type': '人名'}], 'pred_entities': [{'name': '-', 'span': [0, 1], 'type': '_'}, {'name': '岸田', 'span': [2, 4], 'type': '人名'}, {'name': '総理夫人 単独訪', 'span': [4, 12], 'type': '_'}, {'name': '米', 'span': [12, 13], 'type': '地名'}]}, {'curid': '45869487', 'text': '2023年4月17日、岸田裕子首相夫人はアメリカのバイデン大統領夫人のジル・バイデン夫人の招待を受けて、ホワイトハウスで会談した。', 'entities': [{'name': '岸田裕子', 'span': [11, 15], 'type': '人名'}, {'name': 'アメリカ', 'span': [20, 24], 'type': '地名'}, {'name': 'バイデン', 'span': [25, 29], 'type': '人名'}, {'name': 'ジル・バイデン', 'span': [35, 42], 'type': '人名'}, {'name': 'ホワイトハウス', 'span': [52, 59], 'type': '施設名'}], 'pred_entities': [{'name': '2023年4月17日、', 'span': [0, 11], 'type': '_'}, {'name': '岸田裕子', 'span': [11, 15], 'type': '人名'}, {'name': '首相夫人は', 'span': [15, 20], 'type': '_'}, {'name': 'アメリカ', 'span': [20, 24], 'type': '地名'}, {'name': 'の', 'span': [24, 25], 'type': '_'}, {'name': 'バイデン', 'span': [25, 29], 'type': '

In [None]:
def find_error_results(
    results: list[dict[str, any]]
) -> list[dict[str, any]]:
  """ エラー事例を見つける """

  error_results = []
  for idx, result in enumerate(results):

    result["idx"] = idx

    # 正解データ、予測データが異なればリストに追加
    if result["entities"] != result["pred_entities"]:
      error_results.append(result)

  return error_results

In [None]:
def output_text_with_label(
    result: dict[str, any], entity_column: str
) -> str:
  """ 固有表現ラベル付きテキストを出力 """

  text_with_label = ""
  entity_count = 0

  for i, char in enumerate(result["text"]):
    # 出力に加えていない固有表現の有無を判定
    if entity_count < len(result[entity_column]):
      entity = result[entity_column][entity_count]

      # 固有表現の先頭の処理を行う
      if i == entity["span"][0]:
        entity_type = entity["type"]
        text_with_label += f"[({entity_type})"

      text_with_label += char

      #  固有表現の末尾の処理を行う
      if i == entity["span"][1] - 1:
        text_with_label += "]"
        entity_count += 1
    else:
      text_with_label += char

  return text_with_label

In [None]:
# 予測時に間違えた事例を見つける
error_results = find_error_results(results)

for result in error_results[:3]:
  idx = result["idx"]
  true_text = output_text_with_label(result, "entities")
  pred_text = output_text_with_label(result, "pred_entities")

  print(f"事例{idx}の正解: {true_text}")
  print(f"事例{idx}の予測: {pred_text}")
  print()

事例0の正解: - [(人名)岸田]総理夫人 単独訪米
事例0の予測: [(_)-] [(人名)岸田][(_)総理夫人 単独訪][(地名)米]

事例1の正解: 2023年4月17日、[(人名)岸田裕子]首相夫人は[(地名)アメリカ]の[(人名)バイデン]大統領夫人の[(人名)ジル・バイデン]夫人の招待を受けて、[(施設名)ホワイトハウス]で会談した。
事例1の予測: [(_)2023年4月17日、][(人名)岸田裕子][(_)首相夫人は][(地名)アメリカ][(_)の][(人名)バイデン][(_)大統領夫人の][(人名)ジル・バイデン][(_)夫人の招待を受けて、][(施設名)ホワイトハウス]で会談した。

事例2の正解: この会談では、[(人名)バイデン][(地名)アメリカ]大統領とも挨拶をし、両夫人は、[(施設名)ホワイトハウス]の庭で桜の木の植樹も行った。
事例2の予測: [(_)この会談では、][(人名)バイデン][(地名)アメリカ][(_)大統領とも挨拶をし、両夫人は、][(施設名)ホワイトハウス]の庭で桜の木の植樹も行った。

