<a href="https://colab.research.google.com/github/chottokun/ModernBERT_NER_ja/blob/main/modernBERT_JA_NER_Sudachi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ModernBERT NER の整理



In [None]:
# ModernBERT ベースモデル・HFレポジトリ名設定

# ベースモデルとなるモデル　hugging
model_name = "cl-nagoya/ruri-v3-130m"

# Hugging Faceユーザー名または組織名 (例: "your-username" または "your-org")
# ログインしているユーザーのリポジトリに作成する場合、ユーザー名または組織名は不要です。
# repo_id = "your-username/japanese-ner-wikipedia-sudachi" # ユーザー名を指定する場合
repo_id = "Chottokun/ruri-v3-pt-130m_ner_wikipedia" # ログインユーザーのリポジトリに作成する場合

# 学習条件
EARLY_STOP = True # False に設定すると Early Stopping は無効になります
NUM_TRAIN_EPOCHS = 50 # Early Stoppingを使用する場合、num_train_epochsは最大エポック数となる


## Hugging Face Hub へのログイン

### Subtask:
Hugging Face Hub に HF-TOKEN を使用してログインします。

**Reasoning**:
Log in to the Hugging Face Hub programmatically using the HF-TOKEN stored as a Colab secret for authentication.

In [None]:
from huggingface_hub import notebook_login
from google.colab import userdata

# HF-TOKEN を Colab の Secrets から取得してログイン
try:
    hf_token = userdata.get('HF_TOKEN')
    if hf_token:
        # notebook_login() # これによりトークン入力プロンプトが表示されるか、既にログイン済みならスキップされる
        # Alternatively, you can directly login with the token if notebook_login is not interactive enough
        from huggingface_hub import login
        login(token=hf_token)
    else:
        print("Colab Secrets に 'HF_TOKEN' が設定されていません。設定してください。")
except Exception as e:
    print(f"Hugging Face Hub ログイン中にエラーが発生しました: {e}")

## ライブラリのインストール

### Subtask:
必要なライブラリ（`transformers`, `datasets`, `SudachiPy`, `SudachiDict_core`, `spacy-alignments`, `seqeval`, `evaluate`）をインストールします。

In [None]:
!pip install -U -q transformers datasets SudachiPy SudachiDict_core spacy-alignments seqeval evaluate

## データセットとトークナイザーの準備

### Subtask:
データセットをロードし、モデルのトークナイザー、Sudachiトークナイザー、およびラベルのマッピングを準備します。

In [None]:
from transformers import AutoTokenizer
from datasets import load_dataset
from sudachipy import Dictionary, SplitMode

# モデルとトークナイザーの準備
# model_name = "cl-nagoya/ruri-v3-130m"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Sudachi辞書の準備
dict_obj = Dictionary()
tokenizer_obj = dict_obj.create()

# カスタムSudachiトークナイザー関数の定義
def sudachi_tokenizer(text):
    # A mode for short units
    tokens = [m.surface() for m in tokenizer_obj.tokenize(text, SplitMode.A)]
    return tokens

# データセットの準備
dataset = load_dataset("stockmark/ner-wikipedia-dataset")

# ラベルのマッピング
label_list = ["O", "B-人名", "I-人名", "B-法人名", "I-法人名", "B-政治的組織名", "I-政治的組織名",
              "B-その他の組織名", "I-その他の組織名", "B-地名", "I-地名", "B-施設名", "I-施設名",
              "B-製品名", "I-製品名", "B-イベント名", "I-イベント名"]
label2id = {l: i for i, l in enumerate(label_list)}
id2label = {i: l for l, i in label2id.items()}

print("データセット、トークナイザー、ラベルマッピングの準備が完了しました。")

## データ前処理とラベルアライメント

### Subtask:
Sudachiで分かち書きを行い、Hugging Faceトークナイザーでサブワード化し、`spacy-alignments`を用いてラベルをアライメントします。

In [None]:
import spacy_alignments as tokenizations # Import spacy_alignments

def tokenize_and_align_labels_sudachi(examples):
    all_input_ids = []
    all_attention_mask = []
    all_labels = []

    for i, text in enumerate(examples["text"]):
        entities = examples["entities"][i]
        # 1. Sudachiで分かち書きを行う
        words = sudachi_tokenizer(text)

        # 2. 分かち書きした単語列をHugging Faceトークナイザーに入力
        #    is_split_into_words=True を使用し、return_offsets_mapping=True でオフセット情報を取得
        tokenized_inputs = tokenizer(
            words,
            is_split_into_words=True,
            truncation=True,
            return_offsets_mapping=True,
            return_attention_mask=True,
            add_special_tokens=True # Special tokens are added here
        )

        input_ids = tokenized_inputs["input_ids"]
        attention_mask = tokenized_inputs["attention_mask"]
        offset_mapping = tokenized_inputs["offset_mapping"]

        # 3. spacy_alignments を使用して単語とサブワードのアライメントを取得
        #    Hugging Face tokenizer converts words to subwords, so we need to convert token_ids back to tokens for alignment
        #    Exclude special tokens from subwords for alignment when creating subwords_for_alignment
        subwords_for_alignment = tokenizer.convert_ids_to_tokens(tokenizer(words, is_split_into_words=True, add_special_tokens=False).input_ids)


        words2subwords, subwords2words = tokenizations.get_alignments(words, subwords_for_alignment)


        # 4. エンティティラベルをサブワードにアライメント
        aligned_labels = [-100] * len(input_ids)

        # Create word-level labels first based on original text and entities
        word_labels = ["O"] * len(words)
        # Calculate character spans for each word in the original text
        word_char_spans = []
        current_char_index = 0
        for word_text in words:
            start_index = text.find(word_text, current_char_index)
            if start_index != -1:
                end_index = start_index + len(word_text)
                word_char_spans.append((start_index, end_index))
                current_char_index = end_index
            else:
                 word_char_spans.append((-1, -1)) # Should not happen with correct tokenization

        # Assign B- or I- labels at the word level
        for ent in entities:
             ent_start_char, ent_end_char = ent["span"]
             ent_type = ent["type"]

             for word_index, (word_char_start, word_char_end) in enumerate(word_char_spans):
                  if word_char_start != -1 and max(word_char_start, ent_start_char) < min(word_char_end, ent_end_char):
                       # Word overlaps with an entity
                       if word_char_start >= ent_start_char: # If the word starts at or after the entity start
                            is_entity_start = False
                            # Check if the previous word is part of the same entity
                            if word_index == 0:
                                is_entity_start = True
                            else:
                                prev_word_char_start, prev_word_char_end = word_char_spans[word_index - 1]
                                prev_word_in_same_entity = False
                                if prev_word_char_start != -1:
                                     for prev_ent in entities:
                                          prev_ent_start, prev_ent_end = prev_ent["span"]
                                          prev_ent_type = prev_ent["type"]
                                          # Check if the previous word's character span overlaps with the same entity type
                                          if max(prev_word_char_start, prev_ent_start) < min(prev_word_char_end, prev_ent_end) and prev_ent_type == ent_type:
                                                prev_word_in_same_entity = True
                                                break
                                if not prev_word_in_same_entity:
                                     is_entity_start = True

                            word_labels[word_index] = "B-" + ent_type if is_entity_start else "I-" + ent_type


        # Transfer word-level labels to subword-level labels using alignments
        for subword_index in range(len(input_ids)):
            if tokenizer.convert_ids_to_tokens(input_ids[subword_index]) in tokenizer.all_special_tokens:
                 aligned_labels[subword_index] = -100
            else:
                 # Get the word index(es) aligned to this subword (adjusting for special tokens)
                 # The alignment from spacy_alignments is based on subwords_for_alignment (without special tokens)
                 # We need to map the current subword_index (from input_ids with special tokens)
                 # to the corresponding index in subwords_for_alignment.
                 # Assuming special tokens are only at the beginning ([CLS]) and end ([SEP]),
                 # the subword at index `k` in input_ids (where k > 0 and k < len(input_ids) - 1)
                 # corresponds to the subword at index `k-1` in subwords_for_alignment.
                 subword_alignment_index = subword_index - 1 # Adjust for [CLS] token at the beginning

                 if subword_alignment_index >= 0 and subword_alignment_index < len(subwords2words):
                      aligned_word_indices = subwords2words[subword_alignment_index]

                      if aligned_word_indices:
                           # Take the label of the first aligned word
                           first_aligned_word_index = aligned_word_indices[0]
                           if first_aligned_word_index < len(word_labels):
                                word_level_label = word_labels[first_aligned_word_index]

                                if word_level_label != "O":
                                     # If this subword is the first subword of the first aligned word, assign B-
                                     # Otherwise, assign I-
                                     is_first_subword_of_word = (first_aligned_word_index < len(words2subwords)) and (subword_alignment_index in words2subwords[first_aligned_word_index]) and (words2subwords[first_aligned_word_index][0] == subword_alignment_index)


                                     if is_first_subword_of_word:
                                          aligned_labels[subword_index] = label2id[word_level_label]
                                     else:
                                          # Ensure I- label matches the B- label type
                                          if word_level_label.startswith("B-"):
                                               aligned_labels[subword_index] = label2id["I-" + word_level_label.split("-")[1]]
                                          else:
                                               aligned_labels[subword_index] = label2id[word_level_label] # It's already an I- label at word level
                                else:
                                     aligned_labels[subword_index] = label2id["O"]
                      else:
                           # Subword aligned to no word (should not happen with is_split_into_words=True and proper alignment)
                           aligned_labels[subword_index] = label2id["O"] # Default to O
                 else:
                      # This case handles the last special token ([SEP]) and any potential padding tokens
                      aligned_labels[subword_index] = -100


        all_input_ids.append(input_ids)
        all_attention_mask.append(attention_mask)
        all_labels.append(aligned_labels)


    return {
        "input_ids": all_input_ids,
        "attention_mask": all_attention_mask,
        "labels": all_labels
    }


# データセットに前処理を適用 (分割前にmapを適用)
tokenized_datasets_sudachi = dataset.map(
    tokenize_and_align_labels_sudachi,
    batched=True,
    remove_columns=dataset["train"].column_names,
)

# "train" 分割から 10% を検証用として作成 (map処理後に分割)
split_datasets_sudachi = tokenized_datasets_sudachi["train"].train_test_split(test_size=0.1)

print("データ前処理とラベルアライメントが完了しました。")

# 確認のために最初の数件を表示
df_sudachi = split_datasets_sudachi["train"].select(range(5)).to_pandas()
display(df_sudachi)

## トレーニングの実行

### Subtask:
更新したデータセットでモデルのトレーニングを実行します。

In [None]:
import torch
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer, get_linear_schedule_with_warmup, DataCollatorForTokenClassification
import numpy as np
import evaluate
from transformers.trainer_callback import EarlyStoppingCallback

# Early Stopping を有効にするかのフラグ
EARLY_STOP = True # False に設定すると Early Stopping は無効になります

# モデルの準備
# model_name = "sbintuitions/modernbert-ja-130m"
model = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=len(label_list))

# 評価指標の準備
metric = evaluate.load("seqeval")

# 評価指標の計算関数
def compute_metrics(eval_preds):
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)

    # Remove ignored index (special tokens) and convert to labels
    true_labels = [[label_list[l] for l in label if l != -100] for label in labels]
    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    all_metrics = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": all_metrics["overall_precision"],
        "recall": all_metrics["overall_recall"],
        "f1": all_metrics["overall_f1"],
        "accuracy": all_metrics["overall_accuracy"],
    }

# トレーニングの設定
training_args = TrainingArguments(
    output_dir="./results_sudachi",
    eval_strategy="epoch", # Early Stoppingにはevaluation_strategyが"epoch"または"steps"である必要がある
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    logging_dir="./logs_sudachi",
    num_train_epochs=NUM_TRAIN_EPOCHS, # Early Stoppingを使用する場合、num_train_epochsは最大エポック数となる
    weight_decay=0.01,
    load_best_model_at_end=True, # Early Stoppingにはload_best_model_at_endがTrueである必要がある
    save_strategy="epoch",
    report_to="none",
    metric_for_best_model="f1", # 監視する評価指標 (compute_metricsのキーと一致させる)
    greater_is_better=True, # 監視する評価指標が大きい方が良いか (f1は大きい方が良い)
)

# オプティマイザと学習率スケジューラの設定
optimizer = torch.optim.AdamW(model.parameters(), lr=training_args.learning_rate) # Use learning_rate from training_args
num_training_steps = len(split_datasets_sudachi["train"]) // training_args.per_device_train_batch_size * training_args.num_train_epochs
num_warmup_steps = int(training_args.warmup_ratio * num_training_steps) if training_args.warmup_ratio is not None else training_args.warmup_steps

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=num_warmup_steps,
    num_training_steps=num_training_steps
)

# データコレーターの作成
data_collator = DataCollatorForTokenClassification(tokenizer)

# Early Stopping コールバックの設定 (条件付きで追加)
callbacks = []
if EARLY_STOP:
    early_stopping_callback = EarlyStoppingCallback(
        early_stopping_patience=3, # 監視指標がこのエポック/ステップ数改善しない場合に停止
        early_stopping_threshold=0.0, # 改善とみなす最小の変化量 (0.0でわずかな変化でも改善とみなす)
    )
    callbacks.append(early_stopping_callback)


# Trainerの初期化と学習の実行
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=split_datasets_sudachi["train"],
    eval_dataset=split_datasets_sudachi["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
    optimizers=(optimizer, scheduler),
    callbacks=callbacks, # Callbacks リストを渡す
)

# トレーニングの開始
trainer.train()

# モデルの保存
trainer.save_model("./japanese_ner_wikipedia_sudachi")

## モデルの評価 (詳細)

### Subtask:
トレーニング済みのモデルをテストデータセットで評価し、詳細な評価指標を表示します。

In [None]:
# トレーニング済みモデルでの評価を実行
evaluation_results = trainer.evaluate(split_datasets_sudachi["test"])

# 評価結果を表示
print("--- 詳細な評価結果 ---")
import pprint # Add import statement
pprint.pprint(evaluation_results)

## サンプル予測結果の確認

### Subtask:
テストデータセットからいくつかのサンプルを選び、モデルの予測結果を元の文章、正解ラベルと比較して表示します。

In [None]:
# 実際の予測結果を確認する
predicted_labels = []

# テストデータセットから10個をサンプリング
for i in range(0, 10):
    # テストデータセットからサンプルを1つ選択
    sample = split_datasets_sudachi["test"][i] # Use split_datasets_sudachi

    # Trainerのpredictメソッドはデータセットまたはデータセットのリストを受け取る
    # 単一のサンプルに対して予測を行う場合は、データセット形式に合わせる必要がある
    # または、pipelineを使用する方が単一テキストの予測には適している

    # trainer.predict([sample]) の代わりに pipeline を使用して手軽に予測を行う
    # ただし、pipelineはモデルのロードが必要。ここではtrainerオブジェクトが使えるのでtrainer.predictを使う方向で調整
    # trainer.predictはBatchConverterを内部で使用するため、単一サンプルをリストで渡すのが適切
    predictions = trainer.predict([sample])


    # 予測結果を解釈
    predicted_label_ids = predictions.predictions.argmax(-1)[0]

    # special tokenを除外してラベル名に変換
    # sample["labels"] も特殊トークンは-100になっている前提
    true_predictions = [
        label_list[p_id] for p_id, l_id in zip(predicted_label_ids, sample["labels"]) if l_id != -100
    ]
    true_labels = [
        label_list[l_id] for l_id in sample["labels"] if l_id != -100
    ]

    # トークンを取得 (特殊トークンを除外)
    tokens = tokenizer.convert_ids_to_tokens(sample['input_ids'])
    # アライメントされたラベルに対応するトークンのみを取得するため、-100でないラベルに対応するトークンを選ぶ
    aligned_tokens = [token for token, l_id in zip(tokens, sample["labels"]) if l_id != -100]


    # 元の文章と予測結果を出力
    print("元の文章トークン:", ' '.join(aligned_tokens))
    print("予測ラベル:", true_predictions)  # special tokenを除外した予測結果を出力
    print("正解ラベル:", true_labels) # special tokenを除外した正解ラベルを出力
    # 正誤判定はラベル名で行う
    print("正誤:", ["○" if p == l else "×" for p, l in zip(true_predictions, true_labels)])
    print("---")

## Hugging Face Hub にモデルをプッシュ

### Subtask:
Hugging Face Hub に新しいリポジトリを作成し、トレーニング済みのモデルファイルをアップロードします。

In [None]:
from huggingface_hub import create_repo, HfApi
import os

# Hugging Faceユーザー名または組織名 (例: "your-username" または "your-org")
# ログインしているユーザーのリポジトリに作成する場合、ユーザー名または組織名は不要です。
# repo_id = "your-username/japanese-ner-wikipedia-sudachi" # ユーザー名を指定する場合
# repo_id = "Chottokun/ruri-v3-pt-130m_ner_wikipedia" # ログインユーザーのリポジトリに作成する場合

# リポジトリを作成
try:
    create_repo(repo_id, repo_type="model", exist_ok=True)
    print(f"リポジトリ '{repo_id}' が作成または既に存在します。")
except Exception as e:
    print(f"リポジトリの作成中にエラーが発生しました: {e}")


# トレーニング済みのモデルディレクトリ
local_model_path = "./japanese_ner_wikipedia"

# モデルファイルをHugging Face Hubにプッシュ
# Trainer オブジェクトが push_to_hub メソッドを持っている場合
try:
    if 'trainer' in locals() and hasattr(trainer, 'push_to_hub'):
        print(f"'{local_model_path}' のモデルファイルを '{repo_id}' にプッシュします...")
        trainer.push_to_hub(repo_id)
        print("モデルのプッシュが完了しました。")
    else:
        print("Trainer オブジェクトが見つからないか、push_to_hub メソッドがありません。")
        print("代わりに HfApi を使用してファイルをアップロードします。")

        # HfApi を使用してファイルをアップロードする場合
        api = HfApi()
        # アップロードするファイルリスト (必要に応じて調整)
        files_to_upload = [
            os.path.join(local_model_path, "config.json"),
            os.path.join(local_model_path, "pytorch_model.bin"),
            os.path.join(local_model_path, "training_args.bin"),
            # Add other necessary files like tokenizer files if they were saved to local_model_path
            os.path.join(local_model_path, "tokenizer.json"),
            os.path.join(local_model_path, "special_tokens_map.json"),
            os.path.join(local_model_path, "tokenizer_config.json"),
            os.path.join(local_model_path, "vocab.txt"), # If using a vocab.txt based tokenizer
            os.path.join(local_model_path, "spm.model"), # If using a SentencePiece model
        ]

        for file_path in files_to_upload:
            if os.path.exists(file_path):
                try:
                    api.upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=os.path.basename(file_path),
                        repo_id=repo_id,
                        repo_type="model",
                    )
                    print(f"'{os.path.basename(file_path)}' をアップロードしました。")
                except Exception as e:
                    print(f"ファイルのアップロード中にエラーが発生しました ({os.path.basename(file_path)}): {e}")
            else:
                print(f"ファイル '{file_path}' が見つかりませんでした。スキップします。")


except Exception as e:
    print(f"モデルのプッシュ中にエラーが発生しました: {e}")