In [1]:
from datasets import load_dataset
from pprint import pprint
from collections import Counter
import pandas as pd
from datasets import Dataset
from unicodedata import normalize, is_normalized
from spacy_alignments.tokenizations import get_alignments
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, Trainer,BatchEncoding, pipeline
import torch
from seqeval.metrics.sequence_labeling import get_entities

In [2]:
dataset = load_dataset('llm-book/ner-wikipedia-dataset')

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


In [3]:
print(dataset)

DatasetDict({
    train: Dataset({
        features: ['curid', 'text', 'entities'],
        num_rows: 4274
    })
    validation: Dataset({
        features: ['curid', 'text', 'entities'],
        num_rows: 534
    })
    test: Dataset({
        features: ['curid', 'text', 'entities'],
        num_rows: 535
    })
})


In [4]:
pprint(list(dataset['train'])[:2])

[{'curid': '3638038',
  'entities': [{'name': 'さくら学院', 'span': [0, 5], 'type': 'その他の組織名'},
               {'name': 'Ciao Smiles', 'span': [6, 17], 'type': 'その他の組織名'}],
  'text': 'さくら学院、Ciao Smilesのメンバー。'},
 {'curid': '1729527',
  'entities': [{'name': 'レクレアティーボ・ウェルバ', 'span': [17, 30], 'type': 'その他の組織名'},
               {'name': 'プリメーラ・ディビシオン', 'span': [32, 44], 'type': 'その他の組織名'}],
  'text': '2008年10月5日、アウェーでのレクレアティーボ・ウェルバ戦でプリメーラ・ディビシオンでの初得点を決めた。'}]


In [5]:
for i in dataset['train']:
    pprint(i)
    break

{'curid': '3638038',
 'entities': [{'name': 'さくら学院', 'span': [0, 5], 'type': 'その他の組織名'},
              {'name': 'Ciao Smiles', 'span': [6, 17], 'type': 'その他の組織名'}],
 'text': 'さくら学院、Ciao Smilesのメンバー。'}


・今回はテキストに含まれる固有表現のスパンとそのタイプを指定する

In [6]:
# データセットの分析

def count_label_occurrences(dataset: Dataset) -> dict[str, int]:

    # 固有表現タイプを抽出したlistを作成する
    entities = [
        e['type'] for data in dataset for e in data['entities']
    ]

    # ラベルの出現回数が多い順に並び変える
    # Counterにはmost_common()メソッドがあり、(要素, 出現回数)という形のタプルを出現回数順に並べたリストを返す。
    label_counts = dict(Counter(entities).most_common())
    return label_counts


label_counts_dict = {}
for split in dataset:
    label_counts_dict[split] = count_label_occurrences(dataset[split])
df = pd.DataFrame(label_counts_dict)
df.loc['合計'] = df.sum()
df

Unnamed: 0,train,validation,test
人名,2394,299,287
法人名,2006,231,248
地名,1769,184,204
政治的組織名,953,121,106
製品名,934,123,158
施設名,868,103,137
その他の組織名,852,99,100
イベント名,831,85,93
合計,10607,1245,1333


In [7]:
def has_overlap(spans):
    sorted_spans = sorted(spans, key=lambda x: x[0])
    for i in range(1, len(sorted_spans)):
        if sorted_spans[i-1][1] > sorted_spans[i][0]:
            return 1
    return 0


overlap_count = 0
for split in dataset:
    for data in dataset[split]:
        if data['entities']:
            spans = [e['span'] for e in data['entities']]
            overlap_count += has_overlap(spans)

    print(f"{split}におけるスパンが重複する事例数：{overlap_count}")

trainにおけるスパンが重複する事例数：0
validationにおけるスパンが重複する事例数：0
testにおけるスパンが重複する事例数：0


In [8]:
spans

[[0, 9], [10, 21], [25, 37]]

In [9]:
data['entities']

[{'name': 'ダーヴラ・カーワン', 'span': [0, 9], 'type': '人名'},
 {'name': 'マーシー・ハーティガン', 'span': [10, 21], 'type': '人名'},
 {'name': 'ラッセル・T・デイヴィス', 'span': [25, 37], 'type': '人名'}]

In [10]:
dataset['test'][-1:]

{'curid': ['4113413'],
 'text': ['ダーヴラ・カーワンはマーシー・ハーティガンを演じ、ラッセル・T・デイヴィスは本作のポッドキャストコメンタリーで彼女について「これまでにないほどダークな悪役」と表現した。'],
 'entities': [[{'name': 'ダーヴラ・カーワン', 'span': [0, 9], 'type': '人名'},
   {'name': 'マーシー・ハーティガン', 'span': [10, 21], 'type': '人名'},
   {'name': 'ラッセル・T・デイヴィス', 'span': [25, 37], 'type': '人名'}]]}

## 前処理

## テキスト正規化

In [11]:
text = "ABCＡＢＣabcABCアイウｱｲｳ①②③123"

nomalized_text = normalize('NFKC', text)
print('正規化前', text)
print('正規化後', nomalized_text)

正規化前 ABCＡＢＣabcABCアイウｱｲｳ①②③123
正規化後 ABCABCabcABCアイウアイウ123123


In [12]:
count = 0
for split in dataset:
    for data in dataset[split]:
        if not is_normalized('NFKC',data['text']): # 正規化されていないとFalseをかえす？
            count += 1
print(f'正規化されていない事例数: {count}')

正規化されていない事例数: 0


In [13]:
text = "ABCＡＢＣabcABCアイウｱｲｳ①②③123"
is_normalized('NFKC', text)

False

In [14]:
# joinはイテレータを引数に受け取る => 文字を1文字ずつ処理
'/'.join(dataset['train'][0]['text'])

'さ/く/ら/学/院/、/C/i/a/o/ /S/m/i/l/e/s/の/メ/ン/バ/ー/。'

## 文字列とトークン列のアライメント

In [15]:
model_name = "cl-tohoku/bert-base-japanese-v3"
tokenizer = AutoTokenizer.from_pretrained(model_name)


text ='さくら学院は'

# 文字列のLISTに変換
characters = list(text)

# 特殊トークンも含めたリストにする
tokens = tokenizer.convert_ids_to_tokens(tokenizer.encode(text))

char_to_token_indices, token_to_char_indices = get_alignments(characters, tokens)
print(characters, tokens)
print('文字に対するトークンの位置',char_to_token_indices)
print('トークンに対する文字の位置',token_to_char_indices)

['さ', 'く', 'ら', '学', '院', 'は'] ['[CLS]', 'さくら', '学院', 'は', '[SEP]']
文字に対するトークンの位置 [[1], [1], [1], [2], [2], [3]]
トークンに対する文字の位置 [[], [0, 1, 2], [3, 4], [5], []]


In [16]:
print(characters, tokens)

['さ', 'く', 'ら', '学', '院', 'は'] ['[CLS]', 'さくら', '学院', 'は', '[SEP]']


In [17]:
text = '大谷翔平は岩手県水沢市出身'
entities = [
    {'name':'大谷正平', 'span':[0, 4], 'type':'人名'},
    {'name':'岩手県水沢市', 'span':[5, 11], 'type':'地方'}
]

In [50]:



def output_tokens_and_labels(text, entities, tokenizer):
    characters = list(text)
    tokens = tokenizer.convert_ids_to_tokens(tokenizer.encode(text))
    char_to_token_indices, _ = get_alignments(characters, tokens)
    
    # 0で初期化したラベルリスト
    labels = ['0'] * len(tokens)
    for entity in entities:
        entity_span, entity_type = entity['span'], entity['type']
        start = char_to_token_indices[entity_span[0]][0]
        end = char_to_token_indices[entity_span[1] -1][0]
        labels[start] = f"B-{entity_type}"
        for idx in range(start + 1, end + 1):
            labels[idx] = f"I-{entity_type}"
            print(labels)
    
        labels[0] = '-'
        labels[-1] = '-'
    return tokens, labels


tokens, labels = output_tokens_and_labels(text, entities, tokenizer)

df = pd.DataFrame({'トークン列':tokens, 'ラベル列':labels})
df.index.name = "位置"
df.T

['0', 'B-人名', 'I-人名', '0', '0', '0', '0', '0', '0', '0', '0']
['0', 'B-人名', 'I-人名', 'I-人名', '0', '0', '0', '0', '0', '0', '0']
['-', 'B-人名', 'I-人名', 'I-人名', '0', 'B-地方', 'I-地方', '0', '0', '0', '-']
['-', 'B-人名', 'I-人名', 'I-人名', '0', 'B-地方', 'I-地方', 'I-地方', '0', '0', '-']
['-', 'B-人名', 'I-人名', 'I-人名', '0', 'B-地方', 'I-地方', 'I-地方', 'I-地方', '0', '-']


位置,0,1,2,3,4,5,6,7,8,9,10
トークン列,[CLS],大谷,翔,##平,は,岩手,県,水沢,市,出身,[SEP]
ラベル列,-,B-人名,I-人名,I-人名,0,B-地方,I-地方,I-地方,I-地方,0,-


In [19]:
from typing import Any
from seqeval.metrics import classification_report

def create_character_labels(
    text: str, entities: list[dict[str, list[int] | str]]
) -> list[str]:
    """文字ベースでラベルのlistを作成"""
    # "O"のラベルで初期化したラベルのlistを作成する
    labels = ["O"] * len(text)
    for entity in entities: # 各固有表現を処理する
        entity_span, entity_type = entity["span"], entity["type"]
        # 固有表現の開始文字の位置に"B-"のラベルを設定する
        labels[entity_span[0]] = f"B-{entity_type}"
        # 固有表現の開始文字以外の位置に"I-"のラベルを設定する
        for i in range(entity_span[0] + 1, entity_span[1]):
            labels[i] = f"I-{entity_type}"
    return labels

def convert_results_to_labels(
    results: list[dict[str, Any]]
) -> tuple[list[list[str]], list[list[str]]]:
    """正解データと予測データのラベルのlistを作成"""
    true_labels, pred_labels = [], []
    for result in results: # 各事例を処理する
        # 文字ベースでラベルのリストを作成してlistに加える
        true_labels.append(
            create_character_labels(result["text"], result["entities"])
        )
        pred_labels.append(
            create_character_labels(result["text"], result["pred_entities"])
        )
    return true_labels, pred_labels

In [20]:
## 評価指標のseqevalの挙動

results = [
    {
        "text": "大谷翔平は岩手県水沢市出身",
        "entities": [
            {"name": "大谷翔平", "span": [0, 4], "type": "人名"},
            {"name": "岩手県水沢市", "span": [5, 11], "type": "地名"},
        ],
        "pred_entities": [
            {"name": "大谷翔平", "span": [0, 4], "type": "人名"},
            {"name": "岩手県", "span": [5, 8], "type": "地名"},
            {"name": "水沢市", "span": [8, 11], "type": "施設名"},
        ],
    }
]

true_labels, pred_labels = convert_results_to_labels(results)
print(classification_report(true_labels, pred_labels))

              precision    recall  f1-score   support

          人名       1.00      1.00      1.00         1
          地名       0.00      0.00      0.00         1
         施設名       0.00      0.00      0.00         0

   micro avg       0.33      0.50      0.40         2
   macro avg       0.33      0.33      0.33         2
weighted avg       0.50      0.50      0.50         2



  _warn_prf(average, modifier, msg_start, len(result))


In [21]:
true_labels

[['B-人名',
  'I-人名',
  'I-人名',
  'I-人名',
  'O',
  'B-地名',
  'I-地名',
  'I-地名',
  'I-地名',
  'I-地名',
  'I-地名',
  'O',
  'O']]

In [22]:
from seqeval.metrics import f1_score, precision_score, recall_score

def compute_scores(true_labels: list[list[str]], pred_labels: list[list[str]], average:str) -> dict[str, float]:
    scores = {
        'precision': precision_score(true_labels, pred_labels, average=average),
        'recall': recall_score(true_labels, pred_labels, average=average),
        'F1-score': f1_score(true_labels, pred_labels, average=average),
    }

    return scores


print(compute_scores(true_labels, pred_labels, 'micro'))

{'precision': 0.3333333333333333, 'recall': 0.5, 'F1-score': 0.4}


## 固有表現認識モデルの実装

In [23]:
# BERTのファインチューニング

# label1id
def create_label2id(
    entities_list: list[list[dict[str, str | str]]]
) -> dict[str, int]:
    label2id = {"O": 0}

    # setなので重複はなし
    entity_type = set([e['type'] for entities in entities_list for e in entities])

    entity_types = sorted(entity_type)

    # 1entityにつき2種類登録
    for i, entity_type in enumerate(entity_types):
        label2id[f"B-{entity_type}"] = i*2 + 1
        label2id[f"I-{entity_type}"] = i*2 + 2
    return label2id


label2id = create_label2id(dataset['train']["entities"])
id2label = {id:v for v, id in label2id.items()}
pprint(id2label)

{0: 'O',
 1: 'B-その他の組織名',
 2: 'I-その他の組織名',
 3: 'B-イベント名',
 4: 'I-イベント名',
 5: 'B-人名',
 6: 'I-人名',
 7: 'B-地名',
 8: 'I-地名',
 9: 'B-政治的組織名',
 10: 'I-政治的組織名',
 11: 'B-施設名',
 12: 'I-施設名',
 13: 'B-法人名',
 14: 'I-法人名',
 15: 'B-製品名',
 16: 'I-製品名'}


In [24]:
# データの前処理


def preprocess_data(data, tokenizer, label2id) -> BatchEncoding:
    # トークナイゼーション
    inputs = tokenizer(data['text'], return_tensors='pt', return_special_tokens_mask=True)
    inputs = { k:v.squeeze(0) for k, v in inputs.items()}

    # textの文字リスト
    characters = list(data['text'])

    # 特殊トークンを含んだサブトークンリスト
    tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'])
    # print(tokens[1:-2])
    # print(characters)
    # print(tokens)

    # 文字に対するトークン位置を取得
    char_to_token_indeces, _ = get_alignments(characters, tokens)

    # トークンリストと同じ要素数の０値リストで初期化
    labels = torch.zeros_like(inputs['input_ids'])
    for entity in data['entities']:
        # print(char_to_token_indeces)
        # print(entity['span'][0])
        # print(entity['span'][1] - 1)

        # char_to_token_indecesのentityの最初と最後の位置を取得
        start_token_indeces = char_to_token_indeces[entity['span'][0]]

        # sliceは0からなので-1を加える
        end_token_indeces = char_to_token_indeces[entity['span'][1] - 1]
        # print(characters)
        # print(char_to_token_indeces)
        # print(entity['span'][0])
        # print(entity['span'][1] - 1)
        # print(start_token_indeces)
        # print(end_token_indeces)

        # 文字に対応するトークンが存在しなければスキップ 
        if(
            len(start_token_indeces) == 0
            or len(end_token_indeces) == 0
        ):
            # print(entity)
            # print(char_to_token_indeces[entity['span'][0]])
            continue

        start, end = start_token_indeces[0], end_token_indeces[0]
        # print(start, end)
        entity_type = entity['type']

        # トークン要素リストでラベル付け
        labels[start] = label2id[f"B-{entity_type}"]
        if start != end:
            labels[start + 1 : end + 1] = label2id[f"I-{entity_type}"]
        # print(labels)


    labels[torch.where(inputs["special_tokens_mask"])] = -100
    inputs['labels'] = labels
    return inputs
        
        

In [26]:
test = dataset['train'][0]
preprocess_data(test, tokenizer, label2id)

{'input_ids': tensor([    2, 16972, 14284,   384,    50, 13634,  7075, 20218, 18124,  7045,
           464, 12913,   385,     3]),
 'token_type_ids': tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 'special_tokens_mask': tensor([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]),
 'attention_mask': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]),
 'labels': tensor([-100,    1,    2,    0,    1,    2,    2,    2,    2,    2,    0,    0,
            0, -100])}

In [27]:
pprint(dataset['train'][0])

{'curid': '3638038',
 'entities': [{'name': 'さくら学院', 'span': [0, 5], 'type': 'その他の組織名'},
              {'name': 'Ciao Smiles', 'span': [6, 17], 'type': 'その他の組織名'}],
 'text': 'さくら学院、Ciao Smilesのメンバー。'}


In [28]:
# 訓練セットに対して前処理を行う
train_dataset = dataset["train"].map(
    preprocess_data,
    fn_kwargs={
        "tokenizer": tokenizer,
        "label2id": label2id,
    },
    remove_columns=dataset["train"].column_names,
)
# 検証セットに対して前処理を行う
validation_dataset = dataset["validation"].map(
    preprocess_data,
    fn_kwargs={
        "tokenizer": tokenizer,
        "label2id": label2id,
    },
    remove_columns=dataset["validation"].column_names,
)

Parameter 'fn_kwargs'={'tokenizer': BertJapaneseTokenizer(name_or_path='cl-tohoku/bert-base-japanese-v3', vocab_size=32768, model_max_length=512, is_fast=False, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	0: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	1: AddedToken("[UNK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	2: AddedToken("[CLS]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	3: AddedToken("[SEP]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	4: AddedToken("[MASK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),


Map:   0%|          | 0/4274 [00:00<?, ? examples/s]

Map:   0%|          | 0/534 [00:00<?, ? examples/s]

In [29]:
from transformers import AutoModelForTokenClassification, DataCollatorForTokenClassification

model = AutoModelForTokenClassification.from_pretrained(model_name, label2id=label2id, id2label=id2label)
data_collator = DataCollatorForTokenClassification(tokenizer)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at cl-tohoku/bert-base-japanese-v3 and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [30]:
data_collator

DataCollatorForTokenClassification(tokenizer=BertJapaneseTokenizer(name_or_path='cl-tohoku/bert-base-japanese-v3', vocab_size=32768, model_max_length=512, is_fast=False, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	0: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	1: AddedToken("[UNK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	2: AddedToken("[CLS]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	3: AddedToken("[SEP]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	4: AddedToken("[MASK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
}, padding=True, max_length=None, pad_to_multiple_of=None, label_pad_toke

In [32]:
from transformers import Trainer, TrainingArguments
from transformers.trainer_utils import set_seed

set_seed(42)

training_args = TrainingArguments(
    output_dir='output_bert_ner',
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    learning_rate=1e-4,
    lr_scheduler_type='linear',
    warmup_ratio=0.1,
    num_train_epochs=5,
    evaluation_strategy='epoch',
    save_strategy='epoch',
    logging_strategy='epoch',
    fp16=True
)

trainer = Trainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    data_collator=data_collator,
    args=training_args
)

trainer.train()

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


Epoch,Training Loss,Validation Loss
1,0.6569,0.102932
2,0.0725,0.090111
3,0.0302,0.08194
4,0.0125,0.094305
5,0.0062,0.101235


TrainOutput(global_step=670, training_loss=0.15564849447848192, metrics={'train_runtime': 85.8186, 'train_samples_per_second': 249.014, 'train_steps_per_second': 7.807, 'total_flos': 1070012411245680.0, 'train_loss': 0.15564849447848192, 'epoch': 5.0})

In [33]:
from transformers import Trainer, TrainingArguments
from transformers.trainer_utils import set_seed

# 乱数シードを42に固定する
set_seed(42)

# Trainerに渡す引数を初期化する
training_args = TrainingArguments(
    output_dir="output_bert_ner", # 結果の保存フォルダ
    per_device_train_batch_size=32, # 訓練時のバッチサイズ
    per_device_eval_batch_size=32, # 評価時のバッチサイズ
    learning_rate=1e-4, # 学習率
    lr_scheduler_type="linear", # 学習率スケジューラ
    warmup_ratio=0.1, # 学習率のウォームアップ
    num_train_epochs=5, # 訓練エポック数
    evaluation_strategy="epoch", # 評価タイミング
    save_strategy="epoch", # チェックポイントの保存タイミング
    logging_strategy="epoch", # ロギングのタイミング
    fp16=True, # 自動混合精度演算の有効化
)

# Trainerを初期化する
trainer = Trainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    data_collator=data_collator,
    args=training_args,
)

# 訓練する
trainer.train()

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


Epoch,Training Loss,Validation Loss
1,0.0138,0.119601
2,0.0143,0.150496
3,0.0087,0.138084
4,0.0043,0.137459
5,0.0017,0.12891


TrainOutput(global_step=670, training_loss=0.008565774915823296, metrics={'train_runtime': 85.9458, 'train_samples_per_second': 248.645, 'train_steps_per_second': 7.796, 'total_flos': 1070012411245680.0, 'train_loss': 0.008565774915823296, 'epoch': 5.0})

In [34]:
def convert_list_dict_to_dict_list(
    list_dict: dict[str, list]
) -> list[dict[str, list]]:
    """ミニバッチのデータを事例単位のlistに変換"""
    dict_list = []
    # dictのキーのlistを作成する
    keys = list(list_dict.keys())
    for idx in range(len(list_dict[keys[0]])): # 各事例で処理する
        # dictの各キーからデータを取り出してlistに追加する
        dict_list.append({key: list_dict[key][idx] for key in keys})
    return dict_list

# ミニバッチのデータを事例単位のlistに変換する
list_dict = {
    "input_ids": [[0, 1], [2, 3]],
    "labels": [[1, 2], [3, 4]],
}
dict_list = convert_list_dict_to_dict_list(list_dict)
print(f"入力: {list_dict}")
print(f"出力: {dict_list}")

入力: {'input_ids': [[0, 1], [2, 3]], 'labels': [[1, 2], [3, 4]]}
出力: [{'input_ids': [0, 1], 'labels': [1, 2]}, {'input_ids': [2, 3], 'labels': [3, 4]}]


In [35]:
 list(list_dict.keys())

['input_ids', 'labels']

In [69]:
from tqdm import tqdm
from torch.utils.data import DataLoader
from transformers import PreTrainedModel

def run_prediction(
    dataloader: DataLoader, model: PreTrainedModel
) -> list[dict[str, Any]]:
    """予測スコアに基づき固有表現ラベルを予測"""
    predictions = []
    for batch in tqdm(dataloader): # 各ミニバッチを処理する
        inputs = {
            k: v.to(model.device)
            for k, v in batch.items()
            if k != "special_tokens_mask"
        }
        # 予測スコアを取得する
        logits = model(**inputs).logits
        # 最もスコアの高いIDを取得する
        batch["pred_label_ids"] = logits.argmax(-1)
        batch = {k: v.cpu().tolist() for k, v in batch.items()}
        # ミニバッチのデータを事例単位のlistに変換する
        predictions += convert_list_dict_to_dict_list(batch)
    return predictions

# ミニバッチの作成にDataLoaderを用いる
validation_dataloader = DataLoader(
    validation_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=data_collator,
)
# 固有表現ラベルを予測する
predictions = run_prediction(validation_dataloader, model)
print(predictions[0]["pred_label_ids"])

100%|██████████| 17/17 [00:00<00:00, 28.28it/s]

[0, 0, 15, 16, 0, 0, 13, 14, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 14, 14, 0, 0, 0, 0, 0, 0, 15, 16, 16, 0, 13, 14, 14, 14, 14, 13, 13, 14, 14, 14, 0, 0, 0, 0, 13, 14, 14, 14, 0, 0, 14, 14, 14, 14, 0, 0, 0, 0, 0, 15, 16, 16, 0, 13, 14, 14, 14, 13, 13, 0, 0, 0, 15, 15, 16, 16, 13, 14, 14]





In [70]:
print(len(predictions))
for i in validation_dataloader:
    print(i['input_ids'].size())

534
torch.Size([32, 82])
torch.Size([32, 69])
torch.Size([32, 90])
torch.Size([32, 86])
torch.Size([32, 112])
torch.Size([32, 73])
torch.Size([32, 78])
torch.Size([32, 85])
torch.Size([32, 81])
torch.Size([32, 69])
torch.Size([32, 81])
torch.Size([32, 88])
torch.Size([32, 69])
torch.Size([32, 81])
torch.Size([32, 148])
torch.Size([32, 82])
torch.Size([22, 66])


In [71]:
for i in validation_dataloader:
    pprint(i['input_ids'].size())
    pprint(i)
    break

torch.Size([32, 82])
{'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]]),
 'input_ids': tensor([[    2,   395, 13887,  ...,     0,     0,     0],
        [    2, 12538,   491,  ...,     0,     0,     0],
        [    2, 18763,  7087,  ...,     0,     0,     0],
        ...,
        [    2, 25912, 12881,  ...,     0,     0,     0],
        [    2, 12538,   500,  ...,     0,     0,     0],
        [    2,  1220, 12582,  ...,     0,     0,     0]]),
 'labels': tensor([[-100,    0,   15,  ..., -100, -100, -100],
        [-100,    0,    0,  ..., -100, -100, -100],
        [-100,    0,    0,  ..., -100, -100, -100],
        ...,
        [-100,   11,   12,  ..., -100, -100, -100],
        [-100,    0,    0,  ..., -100, -100, -100],
        [-100,    0,    0,  ..., -100, -100, -100]]),
 'special_tokens_mask': te

In [72]:
predictions[0]

{'input_ids': [2,
  395,
  13887,
  4436,
  396,
  465,
  14895,
  31227,
  7053,
  12488,
  464,
  12627,
  458,
  12493,
  456,
  12483,
  385,
  3,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0],
 'token_type_ids': [0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0],
 'special_tokens_mask': [1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
 

In [73]:

def extract_entites(predictions, dataset, tokenizer, id2label):
    """
    固有表現の抽出
    """
    results = []
    for prediction, data in zip(predictions, dataset):
    
        # 1文字のリスト化
        characters = list(data['text'])
        print('characters:',characters)
    
        tokens, pred_labels = [], []

        # idからトークンへ変換
        all_tokens = tokenizer.convert_ids_to_tokens(prediction['input_ids'])
    
        for token, label_id in zip(all_tokens, prediction['pred_label_ids']):
            
            # スペシャルトークンを除いたトークンリスト、予想ラベルリストを作成
            if token not in tokenizer.all_special_tokens:
                tokens.append(token)
                pred_labels.append(id2label[label_id])
    
    
            # スペシャルトークンを含まないトークンに対する文字の位置リストを返す
            _, token_to_char_indices = get_alignments(characters, tokens)


            
    
            pred_entities = []
            print('tokens:',tokens)
            print('pred_labels:', pred_labels)

            # token, label_idのイテレーション毎にpred_labelsを０からイテレーションを回す
            for entity in get_entities(pred_labels):
                print('===========get_entities=============')
                print('entity:', entity)
                
                print('token_to_char_indices:',token_to_char_indices)
                entity_type, token_start, token_end = entity
                char_start = token_to_char_indices[token_start][0]
                char_end = token_to_char_indices[token_end][-1] + 1
                pred_entity = {
                    'name': "".join(characters[char_start: char_end]), 
                    'span': [char_start, char_end],
                    'type': entity_type,
                }
                print('pred_entity:',pred_entity)
    
                pred_entities.append(pred_entity)

            # dataのバッチ毎に予想ラベルを追加する
            data['pred_entities'] = pred_entities
            results.append(data)
        return results


results = extract_entites(predictions, dataset['validation'], tokenizer, id2label)

            
            


characters: ['「', '復', '活', '篇', '」', 'は', 'グ', 'リ', 'ー', 'ン', 'バ', 'ニ', 'ー', 'か', 'ら', 'の', '発', '売', 'と', 'な', 'っ', 'て', 'い', 'る', '。']
tokens: []
pred_labels: []
tokens: ['「']
pred_labels: ['O']
tokens: ['「', '復活']
pred_labels: ['O', 'B-製品名']
entity: ('製品名', 1, 1)
token_to_char_indices: [[0], [1, 2]]
pred_entity: {'name': '復活', 'span': [1, 3], 'type': '製品名'}
tokens: ['「', '復活', '篇']
pred_labels: ['O', 'B-製品名', 'I-製品名']
entity: ('製品名', 1, 2)
token_to_char_indices: [[0], [1, 2], [3]]
pred_entity: {'name': '復活篇', 'span': [1, 4], 'type': '製品名'}
tokens: ['「', '復活', '篇', '」']
pred_labels: ['O', 'B-製品名', 'I-製品名', 'O']
entity: ('製品名', 1, 2)
token_to_char_indices: [[0], [1, 2], [3], [4]]
pred_entity: {'name': '復活篇', 'span': [1, 4], 'type': '製品名'}
tokens: ['「', '復活', '篇', '」', 'は']
pred_labels: ['O', 'B-製品名', 'I-製品名', 'O', 'O']
entity: ('製品名', 1, 2)
token_to_char_indices: [[0], [1, 2], [3], [4], [5]]
pred_entity: {'name': '復活篇', 'span': [1, 4], 'type': '製品名'}
tokens: ['「', '復活', '篇', '」', 'は'

In [74]:
# ラストの０ラベルは0が続くのでエンティティとは認められない
get_entities( ['0', 'B-製品名', 'I-製品名', '0', '0', 'B-法人名', 'I-法人名', 'I-法人名', '0', '0', '0', '0', '0', '0', '0', '0'])



[('_', 0, 0), ('製品名', 1, 2), ('_', 3, 4), ('法人名', 5, 7)]

In [75]:
get_entities( ['O', 'B-製品名', 'I-製品名', 'O', 'O', 'B-法人名', 'I-法人名', 'I-法人名', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O'])

[('製品名', 1, 2), ('法人名', 5, 7)]

In [77]:
from seqeval.metrics.sequence_labeling import get_entities
from transformers import PreTrainedTokenizer

def extract_entities(
    predictions: list[dict[str, Any]],
    dataset: list[dict[str, Any]],
    tokenizer: PreTrainedTokenizer,
    id2label: dict[int, str],
) -> list[dict[str, Any]]:
    """固有表現を抽出"""
    results = []
    for prediction, data in zip(predictions, dataset):
        # 文字のlistを取得する
        characters = list(data["text"])

        # 特殊トークンを除いたトークンのlistと予測ラベルのlistを取得する
        tokens, pred_labels = [], []
        all_tokens = tokenizer.convert_ids_to_tokens(
            prediction["input_ids"]
        )
        # print(prediction["pred_label_ids"])

        for token, label_id in zip(
            all_tokens, prediction["pred_label_ids"]
        ):
            # 特殊トークン以外をlistに追加する
            if token not in tokenizer.all_special_tokens:
                tokens.append(token)
                pred_labels.append(id2label[label_id])

        # 文字のlistとトークンのlistのアライメントをとる
        _, token_to_char_indices = get_alignments(characters, tokens)

        # 予測ラベルのlistから固有表現タイプと、
        # トークン単位の開始位置と終了位置を取得して、
        # それらを正解データと同じ形式に変換する
        pred_entities = []
        # print('pred_labels', pred_labels)
        for entity in get_entities(pred_labels):
            # print('entity:',entity)
            entity_type, token_start, token_end = entity
            # 文字単位の開始位置を取得する
            char_start = token_to_char_indices[token_start][0]
            # 文字単位の終了位置を取得する
            char_end = token_to_char_indices[token_end][-1] + 1
            pred_entity = {
                "name": "".join(characters[char_start:char_end]),
                "span": [char_start, char_end],
                "type": entity_type,
            }
            pred_entities.append(pred_entity)
        data["pred_entities"] = pred_entities
        results.append(data)

    return results

# 固有表現を抽出する
results = extract_entities(
    predictions, dataset["validation"], tokenizer, id2label
)
pprint(results[0])

{'curid': '1662110',
 'entities': [{'name': '復活篇', 'span': [1, 4], 'type': '製品名'},
              {'name': 'グリーンバニー', 'span': [6, 13], 'type': '法人名'}],
 'pred_entities': [{'name': '復活篇', 'span': [1, 4], 'type': '製品名'},
                   {'name': 'グリーンバニー', 'span': [6, 13], 'type': '法人名'}],
 'text': '「復活篇」はグリーンバニーからの発売となっている。'}


In [44]:
predictions[0]

{'input_ids': [2,
  395,
  13887,
  4436,
  396,
  465,
  14895,
  31227,
  7053,
  12488,
  464,
  12627,
  458,
  12493,
  456,
  12483,
  385,
  3,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0],
 'token_type_ids': [0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0],
 'special_tokens_mask': [1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
 

In [45]:
# モデルを読み込む
model_name = "llm-book/bert-base-japanese-v3-ner-wikipedia-dataset"
best_model = AutoModelForTokenClassification.from_pretrained(
    model_name
)
best_model = best_model.to("cuda:0")

In [66]:
# テストセットに対して前処理を行う
test_dataset = dataset["test"].map(
    preprocess_data,
    fn_kwargs={
        "tokenizer": tokenizer,
        "label2id": label2id,
    },
    remove_columns=dataset["test"].column_names,
)
# ミニバッチの作成にDataLoaderを用いる
test_dataloader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=data_collator,
)
# 固有表現ラベルを予測する
predictions = run_prediction(test_dataloader, best_model)
# 固有表現を抽出する
results = extract_entities(
    predictions, dataset["test"], tokenizer, id2label
)
# 正解データと予測データのラベルのlistを作成する
true_labels, pred_labels = convert_results_to_labels(results)
# 評価結果を出力する
print(classification_report(true_labels, pred_labels))

Map:   0%|          | 0/535 [00:00<?, ? examples/s]

100%|██████████| 17/17 [00:01<00:00, 16.91it/s]


[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 4, 4, 4, 4, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 3, 3, 0, 9, 0, 0, 0, 0, 0, 0, 3, 3, 4, 3, 3, 9, 9, 9, 0, 0]
pred_labels ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-イベント名', 'I-イベント名', 'I-イベント名', 'I-イベント名', 'I-イベント名', 'O', 'O', 'O', 'B-政治的組織名', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-イベント名', 'I-イベント名', 'I-イベント名', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
entity: ('イベント名', 17, 21)
entity: ('政治的組織名', 25, 25)
entity: ('イベント名', 35, 37)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
pred_labels ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
[0, 13, 14, 14, 0, 0, 0, 0, 15, 16, 16, 16, 0, 0, 0, 0, 0, 0, 0, 13, 15, 15, 16, 16, 16, 16, 16, 0, 0, 0, 0, 13, 13, 13, 13, 13, 14, 16, 1

In [78]:
results

[{'curid': '1662110',
  'text': '「復活篇」はグリーンバニーからの発売となっている。',
  'entities': [{'name': '復活篇', 'span': [1, 4], 'type': '製品名'},
   {'name': 'グリーンバニー', 'span': [6, 13], 'type': '法人名'}],
  'pred_entities': [{'name': '復活篇', 'span': [1, 4], 'type': '製品名'},
   {'name': 'グリーンバニー', 'span': [6, 13], 'type': '法人名'}]},
 {'curid': '3024498',
  'text': 'これらにより実質的な証拠調べが遅れたと日刊ゲンダイは報じている。',
  'entities': [{'name': '日刊ゲンダイ', 'span': [19, 25], 'type': '法人名'}],
  'pred_entities': [{'name': '日刊ゲンダイ', 'span': [19, 25], 'type': '法人名'}]},
 {'curid': '3576262',
  'text': 'プログラマのアンドリュー・スミスによれば、体の動きと頭の動きを独立させてしまうと、「カンニングできて」しまうパズルがあるという。',
  'entities': [{'name': 'アンドリュー・スミス', 'span': [6, 16], 'type': '人名'}],
  'pred_entities': [{'name': 'アンドリュー・スミス', 'span': [6, 16], 'type': '人名'}]},
 {'curid': '519044',
  'text': 'ポリュビオスに従えばピクトルは、第二次ポエニ戦争についてその責任をハミルカル・バルカ、ハンニバルらバルカ家に帰している。',
  'entities': [{'name': 'ポリュビオス', 'span': [0, 6], 'type': '人名'},
   {'name': 'ピクトル', 'span': [10, 14], 'type': '人名'},
   {'name': '第二次ポエニ戦

In [48]:
a = {'entities': [{'name': '義和団の乱', 'span': [27, 32], 'type': 'イベント名'},
   {'name': '清朝', 'span': [37, 39], 'type': '政治的組織名'},
   {'name': '辛亥革命', 'span': [53, 57], 'type': 'イベント名'}]}

b = {'pred_entities': [{'name': '統治機構の近代化により王朝を立て直すことに失敗、加えて',
    'span': [0, 27],
    'type': '_'},
   {'name': '義和団の乱', 'span': [27, 32], 'type': 'イベント名'},
   {'name': '後をめぐる', 'span': [32, 37], 'type': '_'},
   {'name': '清朝', 'span': [37, 39], 'type': '政治的組織名'},
   {'name': 'の醜態も加わり、1911年の', 'span': [39, 53], 'type': '_'},
   {'name': '辛亥革命', 'span': [53, 57], 'type': 'イベント名'}]}

a['entities'] == b['pred_entities']

False

In [79]:
def find_error_results(
    results: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """エラー事例を発見"""
    error_results = []
    for idx, result in enumerate(results): # 各事例を処理する
        result["idx"] = idx
        # 正解データと予測データが異なるならばlistに加える
        if result["entities"] != result["pred_entities"]:
            error_results.append(result)
    return error_results

def output_text_with_label(result: dict[str, Any], entity_column: str) -> str:
    """固有表現ラベル付きテキストを出力"""
    text_with_label = ""
    entity_count = 0
    for i, char in enumerate(result["text"]): # 各文字を処理する
        # 出力に加えていない固有表現の有無を判定する
        if entity_count < len(result[entity_column]):
            entity = result[entity_column][entity_count]
            # 固有表現の先頭の処理を行う
            if i == entity["span"][0]:
                entity_type = entity["type"]
                text_with_label += f" [({entity_type}) "
            text_with_label += char
            # 固有表現の末尾の処理を行う
            if i == entity["span"][1] - 1:
                text_with_label += "] "
                entity_count += 1
        else:
            text_with_label += char
    return text_with_label

# エラー事例を発見する
error_results = find_error_results(results)
# 3件のエラー事例を出力する
for result in error_results[:3]:
    idx = result["idx"]
    true_text = output_text_with_label(result, "entities")
    pred_text = output_text_with_label(result, "pred_entities")
    print(f"事例{idx}の正解: {true_text}")
    print(f"事例{idx}の予測: {pred_text}")
    print()

事例7の正解: しかし、新たなキーテナントの [(施設名) マルエイ] も2009年4月22日に閉店してしまう。
事例7の予測: しかし、新たなキーテナントの [(法人名) マルエイ] も2009年4月22日に閉店してしまう。

事例9の正解: 8月8日、 [(法人名) デトロイト] 製 [(製品名) DT12シーケンシャルマニュアルトランスミッション] の搭載を開始。
事例9の予測: 8月8日、 [(法人名) デトロイト] 製 [(製品名) DT12シー] ケンシャルマニュアルトランスミッションの搭載を開始。

事例10の正解:  [(法人名) イオン]  [(施設名) 上田店] を核に63の専門店が並ぶ。
事例10の予測:  [(施設名) イオン上田店] を核に63の専門店が並ぶ。



In [80]:
def create_transitions(label2id: dict[str, int]) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """遷移スコアを定義"""
    # "B-"のラベルIDのlist
    b_ids = [v for k, v in label2id.items() if k[0] == "B"]
    # I-のラベルIDのlist
    i_ids = [v for k, v in label2id.items() if k[0] == "I"]
    o_id = label2id["O"]  # OのラベルID

    # 開始遷移スコアを定義する
    # すべてのスコアを-100で初期化する
    start_transitions = torch.full([len(label2id)], -100.0)
    # "B-"のラベルへ遷移可能として0を代入する
    start_transitions[b_ids] = 0
    # "O"のラベルへ遷移可能として0を代入する
    start_transitions[o_id] = 0

    # ラベル間の遷移スコアを定義する
    # すべてのスコアを-100で初期化する
    # # 行列（遷移前のラベルのid, 遷移後のラベルid）
    transitions = torch.full([len(label2id), len(label2id)], -100.0)
    # すべてのラベルから"B-"へ遷移可能として0を代入する
    transitions[:, b_ids] = 0
    # すべてのラベルから"O"へ遷移可能として0を代入する
    transitions[:, o_id] = 0
    # "B-"から同じタイプの"I-"へ遷移可能として0を代入する
    transitions[b_ids, i_ids] = 0
    # "I-"から同じタイプの"I-"へ遷移可能として0を代入する
    transitions[i_ids, i_ids] = 0

    # 終了遷移スコアを定義する
    # すべてのラベルから遷移可能としてすべてのスコアを0とする
    end_transitions = torch.zeros(len(label2id))
    return start_transitions, transitions, end_transitions

# 遷移スコアを定義する
start_transitions, transitions, end_transitions = create_transitions(
    label2id
)

In [83]:
label2id

{'O': 0,
 'B-その他の組織名': 1,
 'I-その他の組織名': 2,
 'B-イベント名': 3,
 'I-イベント名': 4,
 'B-人名': 5,
 'I-人名': 6,
 'B-地名': 7,
 'I-地名': 8,
 'B-政治的組織名': 9,
 'I-政治的組織名': 10,
 'B-施設名': 11,
 'I-施設名': 12,
 'B-法人名': 13,
 'I-法人名': 14,
 'B-製品名': 15,
 'I-製品名': 16}

In [85]:
 torch.full([len(label2id), len(label2id)], -100.0).size()

torch.Size([17, 17])

# ピタビアルゴリズムを用いたラベル列の予想

In [88]:
def decode_with_viterbi(
    emissions: torch.Tensor,  # ラベルの予測スコア
    mask: torch.Tensor,  # マスク
    start_transitions: torch.Tensor,  # 開始遷移スコア
    transitions: torch.Tensor,  # ラベル間の遷移スコア (ラベル, ラベル)
    end_transitions: torch.Tensor,  # 終了遷移スコア
) -> torch.Tensor:
    """ビタビアルゴリズムを用いて最適なラベル列を探索"""
    # バッチサイズと系列長を取得する
    batch_size, seq_length = mask.shape
    # 予測スコアとマスクに関して、0次元目と1次元目を入れ替える
    emissions = emissions.transpose(1, 0) # ====>　三次元では？(バッチ, 系列, ラベル) => (系列, バッチ, ラベル)
    mask = mask.transpose(1, 0) # (バッチ, 系列) => (系列, バッチ)

    histories = []  # 最適なラベル系列を保存するための履歴のlist
    # 開始遷移スコアと予測スコアを加算して、累積スコアの初期値とする
    score = start_transitions + emissions[0] # (系列, バッチ,ラベル) => 0系列の(バッチ,ラベル) 
    for i in range(1, seq_length):
        # 累積スコアを3次元に変換する
        broadcast_score = score.unsqueeze(2) # 0系列の(バッチ,ラベル) => (バッチ,ラベル, 1)
        # 現在の予測スコアを3次元に変換する
        broadcast_emission = emissions[i].unsqueeze(1)  # i系列の(バッチ,ラベル) => i系列の(バッチ, 1,ラベル)
        # 累積スコアと遷移スコアと現在の予測スコアを加算して、
        # 現在の累積スコアを取得する
        next_score = (
            # (バッチ,ラベル, 1) + (ラベル, ラベル) => (バッチ,ラベル,ラベル)
            # (バッチ,ラベル,ラベル) + i系列の(バッチ, 1,ラベル) => i系列までのスコアを算出
            broadcast_score + transitions + broadcast_emission
        )
        # 現在の累積スコアの各ラベルの最大値とそのインデックスを取得する
        next_score, indices = next_score.max(dim=1)
        # マスクしない要素の場合、累積スコアを更新する
        score = torch.where(mask[i].unsqueeze(1), next_score, score)
        # スコアの高いインデックスを履歴のlistに追加する
        histories.append(indices)
    # 終了遷移スコアを加算して合計スコアとする
    score += end_transitions

    # 各事例で最適なラベル列を取得する
    best_labels_list = []
    for i in range(batch_size):
        # 合計スコアの中で最大のスコアとなるラベルを取得する
        _, best_last_label = score[i].max(dim=0)
        best_labels = [best_last_label.item()]
        # 最後のラベルの遷移を逆方向に探索し、最適なラベル列を取得する
        for history in reversed(histories):
            best_last_label = history[i][best_labels[-1]]
            best_labels.append(best_last_label.item())
        # 順序を反転する
        best_labels.reverse()
        best_labels_list.append(best_labels)
    return torch.LongTensor(best_labels_list)

In [89]:
def run_prediction_viterbi(
    dataloader: DataLoader,
    model: PreTrainedModel,
) -> list[dict[str, Any]]:
    """ビダビアルゴリズムを用いてラベルを予測"""
    # 遷移スコアを取得する
    start_transitions, transitions, end_transitions = (
        create_transitions(model.config.label2id)
    )

    predictions = []
    for batch in tqdm(dataloader):  # 各ミニバッチを処理する
        inputs = {
            k: v.to(model.device)
            for k, v in batch.items()
            if k != "special_tokens_mask"
        }
        # [CLS]以外の予測スコアを取得する
        logits = model(**inputs).logits.cpu()[:, 1:, :]
        # [CLS]以外の特殊トークンのマスクを取得する
        mask = (batch["special_tokens_mask"].cpu() == 0)[:, 1:]
        # ビタビアルゴリズムを用いて最適なIDの系列を探索する
        pred_label_ids = decode_with_viterbi(
            logits,
            mask,
            start_transitions,
            transitions,
            end_transitions,
        )
        # [CLS]のIDを0とする
        cls_pred_label_id = torch.zeros(pred_label_ids.shape[0], 1)
        # [CLS]のIDと探索したIDの系列を連結して予測ラベルとする
        batch["pred_label_ids"] = torch.concat(
            [cls_pred_label_id, pred_label_ids], dim=1
        )
        batch = {k: v.cpu().tolist() for k, v in batch.items()}
        # ミニバッチのデータを事例単位のlistに変換する
        predictions += convert_list_dict_to_dict_list(batch)
    return predictions

# ビタビアルゴリズムを用いてラベルを予測する
predictions = run_prediction_viterbi(test_dataloader, best_model)
# 固有表現を抽出する
results = extract_entities(
    predictions, dataset["test"], tokenizer, id2label
)
# 正解データと予測データのラベルのlistを作成する
true_labels, pred_labels = convert_results_to_labels(results)
# 評価結果を出力する
print(classification_report(true_labels, pred_labels))

100%|██████████| 17/17 [00:04<00:00,  4.09it/s]


              precision    recall  f1-score   support

     その他の組織名       0.86      0.83      0.85       100
       イベント名       0.84      0.94      0.89        93
          人名       0.96      0.96      0.96       287
          地名       0.87      0.87      0.87       204
      政治的組織名       0.79      0.91      0.85       106
         施設名       0.88      0.86      0.87       137
         法人名       0.90      0.88      0.89       248
         製品名       0.79      0.81      0.80       158

   micro avg       0.88      0.89      0.88      1333
   macro avg       0.86      0.88      0.87      1333
weighted avg       0.88      0.89      0.88      1333



In [150]:
# logits, mask, start_transitions, transitions, end_transitionsの取得

def test(
    dataloader: DataLoader,
    model: PreTrainedModel,
) -> list[dict[str, Any]]:
    """ビダビアルゴリズムを用いてラベルを予測"""
    # 遷移スコアを取得する
    start_transitions, transitions, end_transitions = (
        create_transitions(model.config.label2id)
    )

    predictions = []
    for batch in tqdm(dataloader):  # 各ミニバッチを処理する
        inputs = {
            k: v.to(model.device)
            for k, v in batch.items()
            if k != "special_tokens_mask"
        }
        # [CLS]以外の予測スコアを取得する
        logits = model(**inputs).logits.cpu()[:, 1:, :]
        # [CLS]以外の特殊トークンのマスクを取得する
        mask = (batch["special_tokens_mask"].cpu() == 0)[:, 1:]
        # ビタビアルゴリズムを用いて最適なIDの系列を探索する
        pred_label_ids = decode_with_viterbi(
            logits,
            mask,
            start_transitions,
            transitions,
            end_transitions,
        )

    return logits, mask, start_transitions, transitions, end_transitions

In [151]:
logits, mask, start_transitions, transitions, end_transitions = test(test_dataloader, best_model)

100%|██████████| 17/17 [00:31<00:00,  1.83s/it]


In [93]:
logits.shape

torch.Size([23, 83, 17])

In [96]:
len(test_dataloader)

17

In [111]:
for i in test_dataloader:
    pprint(i['labels'].size())
    pprint(i)
    break

torch.Size([32, 70])
{'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 1, 1, 1],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]]),
 'input_ids': tensor([[    2, 15233, 14227,  ...,     0,     0,     0],
        [    2, 18275,   465,  ...,     0,     0,     0],
        [    2, 12754, 12587,  ...,     0,     0,     0],
        ...,
        [    2, 18678,  7067,  ...,   449,   385,     3],
        [    2, 13888,  2002,  ...,     0,     0,     0],
        [    2, 17967, 12893,  ...,     0,     0,     0]]),
 'labels': tensor([[-100,    0,    0,  ..., -100, -100, -100],
        [-100,    0,    0,  ..., -100, -100, -100],
        [-100,   13,   14,  ..., -100, -100, -100],
        ...,
        [-100,   15,   16,  ...,    0,    0, -100],
        [-100,    0,    0,  ..., -100, -100, -100],
        [-100,    3,    4,  ..., -100, -100, -100]]),
 'special_tokens_mask': te

In [142]:
emissions = logits

batch_size, seq_length = mask.shape

emissions = emissions.transpose(1, 0) # ====>　三次元では？(バッチ, 系列, ラベル) => (系列, バッチ, ラベル)
mask = mask.transpose(1, 0) # (バッチ, 系列) => (系列, バッチ)

histories = []  

score = start_transitions + emissions[0] # (系列, バッチ,ラベル) => 0系列の(バッチ,ラベル) 
for i in range(1, seq_length):
    # 累積スコアを3次元に変換する
    broadcast_score = score.unsqueeze(2) # 0系列の(バッチ,ラベル) => (バッチ,ラベル, 1)
    # 現在の予測スコアを3次元に変換する
    broadcast_emission = emissions[i].unsqueeze(1)  # i系列の(バッチ,ラベル) => i系列の(バッチ, 1,ラベル)
    # 累積スコアと遷移スコアと現在の予測スコアを加算して、
    # 現在の累積スコアを取得する
    next_score = (
        # (バッチ,ラベル, 1) + (ラベル, ラベル) => (バッチ,ラベル,ラベル)
        # (バッチ,ラベル,ラベル) + i系列の(バッチ, 1,ラベル) => i系列までのスコアを算出
        broadcast_score + transitions + broadcast_emission
    )
    # 現在の累積スコアの各ラベルの最大値とそのインデックスを取得する
    next_score, indices = next_score.max(dim=1)
    # マスクしない要素の場合、累積スコアを更新する
    score = torch.where(mask[i].unsqueeze(1), next_score, score)
    # スコアの高いインデックスを履歴のlistに追加する
    histories.append(indices)
# 終了遷移スコアを加算して合計スコアとする
score += end_transitions

# # 各事例で最適なラベル列を取得する
best_labels_list = []
for i in range(batch_size):
    # 合計スコアの中で最大のスコアとなるラベルを取得する
    _, best_last_label = score[i].max(dim=0)
    best_labels = [best_last_label.item()]
    # 最後のラベルの遷移を逆方向に探索し、最適なラベル列を取得する
    for history in reversed(histories):
        best_last_label = history[i][best_labels[-1]]
        best_labels.append(best_last_label.item())
    # 順序を反転する
    best_labels.reverse()
    best_labels_list.append(best_labels)
torch.LongTensor(best_labels_list)

tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        ...,
        [5, 6, 6,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [5, 6, 6,  ..., 0, 0, 0]])

In [132]:
matrix1 = torch.full((3, 5, 1), 10)
matrix2 = torch.full((5, 5), 2)

matrix1 + matrix2


tensor([[[12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12]],

        [[12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12]],

        [[12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12],
         [12, 12, 12, 12, 12]]])

In [148]:
best_last_label

tensor(5)

In [149]:
def run_prediction_viterbi(
    dataloader: DataLoader,
    model: PreTrainedModel,
) -> list[dict[str, Any]]:
    """ビダビアルゴリズムを用いてラベルを予測"""
    # 遷移スコアを取得する
    start_transitions, transitions, end_transitions = (
        create_transitions(model.config.label2id)
    )

    predictions = []
    for batch in tqdm(dataloader):  # 各ミニバッチを処理する
        inputs = {
            k: v.to(model.device)
            for k, v in batch.items()
            if k != "special_tokens_mask"
        }
        # [CLS]以外の予測スコアを取得する
        logits = model(**inputs).logits.cpu()[:, 1:, :]
        # [CLS]以外の特殊トークンのマスクを取得する
        mask = (batch["special_tokens_mask"].cpu() == 0)[:, 1:]
        # ビタビアルゴリズムを用いて最適なIDの系列を探索する
        pred_label_ids = decode_with_viterbi(
            logits,
            mask,
            start_transitions,
            transitions,
            end_transitions,
        )
        # [CLS]のIDを0とする
        cls_pred_label_id = torch.zeros(pred_label_ids.shape[0], 1)
        # [CLS]のIDと探索したIDの系列を連結して予測ラベルとする
        batch["pred_label_ids"] = torch.concat(
            [cls_pred_label_id, pred_label_ids], dim=1
        )
        batch = {k: v.cpu().tolist() for k, v in batch.items()}
        # ミニバッチのデータを事例単位のlistに変換する
        predictions += convert_list_dict_to_dict_list(batch)
    return predictions

# ビタビアルゴリズムを用いてラベルを予測する
predictions = run_prediction_viterbi(test_dataloader, best_model)
# 固有表現を抽出する
results = extract_entities(
    predictions, dataset["test"], tokenizer, id2label
)
# 正解データと予測データのラベルのlistを作成する
true_labels, pred_labels = convert_results_to_labels(results)
# 評価結果を出力する
print(classification_report(true_labels, pred_labels))

100%|██████████| 17/17 [00:27<00:00,  1.60s/it]


              precision    recall  f1-score   support

     その他の組織名       0.86      0.83      0.85       100
       イベント名       0.84      0.94      0.89        93
          人名       0.96      0.96      0.96       287
          地名       0.87      0.87      0.87       204
      政治的組織名       0.79      0.91      0.85       106
         施設名       0.88      0.86      0.87       137
         法人名       0.90      0.88      0.89       248
         製品名       0.79      0.81      0.80       158

   micro avg       0.88      0.89      0.88      1333
   macro avg       0.86      0.88      0.87      1333
weighted avg       0.88      0.89      0.88      1333

