---
# Обучение модели для распознавания именованных сущностей

> **🎯 Цель этого ноутбука:** обучить кастомную NER-модель на основе ai-forever/ruBert-large для распознавания именованных сущностей в вопросах ЧГК.
---

*   **`AUTHOR`**: Создатели (Пушкин, Тарантино).
*   **`CHARACTER`**: Персонажи (Онегин, Зевс).
*   **`PERSON`**: Реальные люди, не являющиеся авторами в данном контексте (Наполеон как историческая фигура, Юрий Гагарин).
*   **`WORK_OF_ART`**: Произведения ("Евгений Онегин", "Криминальное чтиво").
*   **`LOCATION`**: Места (Петербург, Лувр).

## Импорт библиотек

In [None]:
import json
import time

import numpy as np
import torch
from datasets import load_dataset
from seqeval.metrics import classification_report
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

from transformers import (
    AutoModelForTokenClassification,
    AutoTokenizer,
    DataCollatorForTokenClassification,
    EarlyStoppingCallback,
    Trainer,
    TrainingArguments,
    pipeline,
    set_seed,
)

## Разбиение выборки на 3 части — тренировочную, валидационную и тестовую

In [None]:
# Read the full annotated corpus exported from the annotation tool.
with open('../data/annotated_data.json', 'r', encoding='utf-8') as source_file:
    all_data = json.load(source_file)

# 80 % goes to training; the remaining 20 % is halved into validation and test.
# Fixed random_state keeps the split reproducible between runs.
train_data, temp_data = train_test_split(all_data, test_size=0.2, random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

# (report title, output file, subset) for every split that gets persisted.
splits = (
    ("Обучающая выборка", "train_data.json", train_data),
    ("Валидационная выборка", "val_data.json", val_data),
    ("Тестовая выборка", "test_data.json", test_data),
)

# Write all files first, then report, so output appears only after every
# split has been saved successfully.
for _, file_name, subset in splits:
    with open(file_name, 'w', encoding='utf-8') as out_file:
        json.dump(subset, out_file, ensure_ascii=False, indent=2)

print("Данные успешно разделены:")
for title, file_name, subset in splits:
    print(f"{title}: {len(subset)} примеров (сохранено в {file_name})")

## Обучение модели

In [None]:
MODEL_NAME = "ai-forever/ruBert-large"
OUTPUT_DIR = "./my-chgk-ner-model-v1"
TRAIN_FILE = "train_data.json"
# NOTE: despite its name this is the *validation* split; it is exposed as the
# 'test' split of the dataset below and used for per-epoch evaluation.
TEST_FILE = "val_data.json"

set_seed(42)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Используемое устройство: {device}")

dataset = load_dataset('json', data_files={'train': TRAIN_FILE, 'test': TEST_FILE})
print("\nСтруктура датасета:")
print(dataset)

# Collect every entity type that occurs in the training annotations.
# The guards mirror the ones in tokenize_and_align_labels: a task may have no
# annotations, an annotation may have no results, and a result may carry no
# 'value' / 'labels'; none of those should abort label discovery.
labels_from_data = set()
for item in dataset['train']['annotations']:
    if not item:
        continue
    for entity in item[0].get('result') or []:
        value = entity.get('value')
        if not value or not value.get('labels'):
            continue
        labels_from_data.add(value['labels'][0])

# Sorting makes the label -> id assignment deterministic across runs.
unique_labels = sorted(labels_from_data)
label2id = {}
id2label = {}

# Ids are assigned as B-X, I-X for every entity type in order, with "O" last.
for label in unique_labels:
    for prefix in ("B", "I"):
        tag = f"{prefix}-{label}"
        next_id = len(label2id)
        label2id[tag] = next_id
        id2label[next_id] = tag

outside_id = len(label2id)
label2id["O"] = outside_id
id2label[outside_id] = "O"

print("\nСгенерированные метки:")
print(id2label)

print("\n--- Загрузка токенизатора ---")
start_time = time.time()
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
end_time = time.time()
print(f"Токенизатор загружен за {end_time - start_time:.2f} сек.")


def _entity_token_span(encoding, batch_index, start_char, end_char):
    """Return ``(first_token, last_token)`` for the span ``[start_char, end_char)``.

    Scans inward from both ends of the character span until a character that
    maps to a token is found. This keeps entities whose boundaries sit on
    whitespace, or whose tail was cut off by truncation, instead of dropping
    them. Returns ``None`` when no character of the span maps to a token.
    """
    first = None
    for pos in range(start_char, end_char):
        first = encoding.char_to_token(batch_index, pos)
        if first is not None:
            break
    if first is None:
        return None
    for pos in range(end_char - 1, start_char - 1, -1):
        last = encoding.char_to_token(batch_index, pos)
        if last is not None:
            return first, last
    return None


def tokenize_and_align_labels(examples):
    """Tokenize a batch of examples and build token-level BIO label ids.

    Args:
        examples: a batch as passed by ``Dataset.map(batched=True)``; reads
            ``examples['data'][i]['text']`` and the character-level entity
            spans in ``examples['annotations'][i][0]['result']``.

    Returns:
        The tokenizer output with an added ``"labels"`` list. Special tokens
        get ``-100``, tokens inside an entity get the ``B-``/``I-`` ids from
        ``label2id``, all remaining tokens get the ``"O"`` id.
    """
    texts = [item['text'] for item in examples['data']]
    tokenized_inputs = tokenizer(texts, truncation=True, is_split_into_words=False)
    outside_id = label2id["O"]
    labels = []
    for i, text in enumerate(texts):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        # Start with "O" on real tokens and -100 on special tokens, then
        # overwrite the tokens covered by entities.
        label_ids = [-100 if word_id is None else outside_id for word_id in word_ids]

        annotations = examples['annotations'][i]
        entities = (annotations[0].get('result') or []) if annotations else []
        for entity in entities:
            value = entity.get('value')
            if not value or not value.get('labels'):
                continue
            label = value['labels'][0]
            start_char, end_char = value.get('start'), value.get('end')
            if start_char is None or end_char is None or f"B-{label}" not in label2id:
                continue
            # Clamp to the text so the scan never probes invalid offsets.
            start_char = max(start_char, 0)
            end_char = min(end_char, len(text))
            span = _entity_token_span(tokenized_inputs, i, start_char, end_char)
            if span is None:
                # Entity lies entirely outside the (possibly truncated) input.
                continue
            token_start_index, token_end_index = span
            label_ids[token_start_index] = label2id[f"B-{label}"]
            for t_idx in range(token_start_index + 1, token_end_index + 1):
                label_ids[t_idx] = label2id[f"I-{label}"]
        labels.append(label_ids)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

print("\n--- Токенизация и выравнивание меток ---")
# Apply the alignment function batch-wise to both splits of the dataset.
tokenized_dataset = dataset.map(
    tokenize_and_align_labels,
    batched=True,
    desc="Running tokenizer",
)
print("Токенизация завершена.")


print("\n--- Загрузка предобученной модели ---")
start_time = time.time()
# A fresh token-classification head sized to the generated label set is put
# on top of the pretrained encoder.
model = AutoModelForTokenClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(label2id),
    id2label=id2label,
    label2id=label2id,
)
model = model.to(device)
end_time = time.time()
print(f"Модель загружена за {end_time - start_time:.2f} сек.")

# Parameter statistics; the size estimate assumes 4 bytes (float32) per weight.
num_params = model.num_parameters()
num_params_trainable = model.num_parameters(only_trainable=True)
model_size_mb = num_params * 4 / 2**20
print("\n--- Информация о модели ---")
print(f"Количество всех параметров: {num_params / 1_000_000:.2f} M")
print(f"Количество обучаемых параметров: {num_params_trainable / 1_000_000:.2f} M")
print(f"Примерный размер модели в памяти (float32): {model_size_mb:.2f} MB")
print("---------------------------\n")


# Collator used by the Trainer to batch the tokenized examples.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

def compute_metrics(p):
    """Compute macro-averaged seqeval precision, recall and F1.

    Args:
        p: a ``(predictions, labels)`` pair; ``predictions`` holds per-token
            class scores (argmax is taken over axis 2) and ``labels`` the
            gold label ids, with ``-100`` marking positions to ignore.

    Returns:
        A dict with ``"precision"``, ``"recall"`` and ``"f1"``.
    """
    # Imported here because the file only imports classification_report from
    # seqeval.metrics; the bare name `seqeval` is never bound.
    from seqeval.metrics import f1_score, precision_score, recall_score

    scores, gold_ids = p
    pred_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for pred_row, gold_row in zip(pred_ids, gold_ids):
        # Drop positions labelled -100 (special tokens / padding).
        kept = [(pred, gold) for pred, gold in zip(pred_row, gold_row) if gold != -100]
        true_predictions.append([id2label[pred] for pred, _ in kept])
        true_labels.append([id2label[gold] for _, gold in kept])

    f1 = f1_score(true_labels, true_predictions, average="macro")
    precision = precision_score(true_labels, true_predictions, average="macro")
    recall = recall_score(true_labels, true_predictions, average="macro")
    return {"precision": precision, "recall": recall, "f1": f1}



# Training configuration. Evaluation and checkpointing both happen once per
# epoch so that the best checkpoint (by F1) can be restored at the end.
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,
    per_device_eval_batch_size=8,
    num_train_epochs=25,
    weight_decay=0.01,
    # Mixed precision is only enabled when CUDA is present: the script
    # explicitly supports a CPU fallback, where fp16=True is rejected.
    fp16=torch.cuda.is_available(),

    eval_strategy="epoch",
    save_strategy="epoch",

    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,

    # Keep at most two checkpoints on disk.
    save_total_limit=2,
    report_to="none",
)


print("\n--- Ручная настройка весов для классов ---")

# Hand-tuned per-tag loss weights: "O" is down-weighted, PERSON tags are
# up-weighted, everything else stays neutral.
manual_weights = {
    "O": 0.5,
    "B-LOCATION": 1.0,
    "I-LOCATION": 1.0,
    "B-WORK_OF_ART": 1.0,
    "I-WORK_OF_ART": 1.0,
    "B-AUTHOR": 1.0,
    "I-AUTHOR": 1.0,
    "B-PERSON": 1.5,
    "I-PERSON": 1.5,
    "B-CHARACTER": 1.0,
    "I-CHARACTER": 1.0,
}

num_labels = len(label2id)

# Weight vector indexed by label id; tags missing from the table default to 1.0.
class_weights = torch.zeros(num_labels)
for label_id in range(num_labels):
    tag = id2label[label_id]
    class_weights[label_id] = manual_weights.get(tag, 1.0)
class_weights = class_weights.to(device)

print("\nИтоговые ручные веса для функции потерь:")
for label_id in range(num_labels):
    weight = class_weights[label_id]
    print(f"  {id2label[label_id]}: {weight:.4f}")
print("----------------------------------\n")


class WeightedLossTrainer(Trainer):
    """Trainer variant whose loss is cross-entropy weighted by ``class_weights``."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model and return the class-weighted token-level loss.

        The labels are removed from ``inputs`` before the forward pass, so the
        model does not compute its own loss. Returns ``(loss, outputs)`` when
        ``return_outputs`` is true, otherwise just the loss.
        """
        targets = inputs.pop("labels")
        outputs = model(**inputs)

        # Flatten to (tokens, classes) vs. (tokens,) for the criterion.
        n_classes = self.model.config.num_labels
        flat_logits = outputs.get("logits").view(-1, n_classes)
        flat_targets = targets.view(-1)

        criterion = torch.nn.CrossEntropyLoss(weight=class_weights)
        loss = criterion(flat_logits, flat_targets)

        if return_outputs:
            return (loss, outputs)
        return loss


# Stop when the tracked metric has not improved for three evaluations in a row.
early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

trainer = WeightedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    callbacks=[early_stopping],
)

print("\n--- Начинаем дообучение модели ---")
trainer.train()

# Persist the final model under the output directory.
trainer.save_model(f"{OUTPUT_DIR}/best_model")
print(f"\nОбучение завершено! Лучшая модель сохранена в {OUTPUT_DIR}/best_model")


## Проверка качества модели на отложенной выборке

In [None]:
# NOTE(review): the training cell in this file saves the model to
# "./my-chgk-ner-model-v1/best_model"; this path points under "../data/".
# Presumably the model directory was moved by hand — confirm before running.
MODEL_PATH = "../data/my-chgk-ner-model-v1/best_model"
TEST_FILE = "test_data.json"

# The pipeline API takes a device index: 0 for the first GPU, -1 for CPU.
device = 0 if torch.cuda.is_available() else -1
print(f"Используемое устройство: {'cuda' if device == 0 else 'cpu'}")

print("Загрузка модели и токенизатора...")
ner_pipeline = pipeline(
    "ner",
    model=MODEL_PATH,
    tokenizer=MODEL_PATH,
    aggregation_strategy="simple",
    device=device
)

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

with open(TEST_FILE, 'r', encoding='utf-8') as f:
    test_data = json.load(f)

print(f"Модель загружена. Загружено {len(test_data)} примеров для теста.")


def _char_spans_to_bio(spans, offsets):
    """Project character-level ``(start, end, label)`` spans onto BIO token tags.

    ``offsets`` is the tokenizer's offset mapping, one ``(start, end)`` pair
    per token. A span is tagged only when both its first and its last
    character fall inside some token; otherwise it is skipped. Returns one tag
    per token, ``'O'`` by default.
    """
    tags = ['O'] * len(offsets)
    for start_char, end_char, label in spans:
        first_token = -1
        last_token = -1
        for idx, (offset_start, offset_end) in enumerate(offsets):
            if offset_start <= start_char < offset_end:
                first_token = idx
            if offset_start < end_char <= offset_end:
                last_token = idx
        if first_token != -1 and last_token != -1:
            tags[first_token] = f"B-{label}"
            for idx in range(first_token + 1, last_token + 1):
                tags[idx] = f"I-{label}"
    return tags


all_true_tags = []
all_pred_tags = []

print("\nНачинаем оценку на тестовой выборке...")

for example in tqdm(test_data, desc="Оценка примеров"):
    text = example['data']['text']
    # Same guards as on the training side: a task may have no annotations,
    # and a result entry may carry no 'value' / 'labels'.
    annotations = example.get('annotations') or []
    true_annotations = (annotations[0].get('result') or []) if annotations else []

    tokenized_inputs = tokenizer(text, return_offsets_mapping=True, truncation=True)
    tokens = tokenizer.convert_ids_to_tokens(tokenized_inputs["input_ids"])
    offsets = tokenized_inputs["offset_mapping"]

    gold_spans = [
        (value['start'], value['end'], value['labels'][0])
        for value in (annotation.get('value') for annotation in true_annotations)
        if value and value.get('labels')
    ]
    true_tags = _char_spans_to_bio(gold_spans, offsets)

    predicted_spans = [
        (entity['start'], entity['end'], entity['entity_group'])
        for entity in ner_pipeline(text)
    ]
    pred_tags = _char_spans_to_bio(predicted_spans, offsets)

    # Special tokens carry no text and are excluded from scoring.
    special_tokens = (tokenizer.cls_token, tokenizer.sep_token, tokenizer.pad_token)
    final_true_tags = []
    final_pred_tags = []
    for token, true_tag, pred_tag in zip(tokens, true_tags, pred_tags):
        if token not in special_tokens:
            final_true_tags.append(true_tag)
            final_pred_tags.append(pred_tag)

    all_true_tags.append(final_true_tags)
    all_pred_tags.append(final_pred_tags)

print("\nОценка завершена. Расчет итоговых метрик...")
report = classification_report(all_true_tags, all_pred_tags, digits=4)

print("\n--- Отчет по качеству NER-модели на тестовой выборке ---")
print(report)
