# X5

In [None]:
import ast
import math
import random
import warnings

import numpy as np
import pandas as pd

import torch

# Release GPU memory cached by PyTorch's CUDA allocator (helps when the notebook is re-run in the same kernel).
torch.cuda.empty_cache()

def seed_all(seed: int) -> None:
    """Seed every random number generator used in this notebook.

    Seeds NumPy, PyTorch (CPU generator, current CUDA device and all CUDA
    devices) and Python's ``random`` module, then switches cuDNN to
    deterministic mode with benchmark autotuning turned off.

    Args:
        seed: Value handed to each generator.
    """
    # Same call order as before: NumPy, torch CPU, torch CUDA (current, then all).
    for apply_seed in (
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    ):
        apply_seed(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    random.seed(seed)

In [None]:
# Single seed for the whole notebook; seed_all applies it to NumPy, PyTorch and `random`.
SEED = 42
seed_all(SEED)

In [None]:
from pathlib import Path

# Root data directory (relative to the notebook's working directory) and its sub-folders.
DATA_PATH = Path('../../../data/')
DATA_PATH_DOWNLOAD = DATA_PATH / 'download'      # raw downloads
DATA_PATH_DATASET = DATA_PATH / 'datasets'       # prepared datasets (train.csv is read from here)
DATA_CACHE = DATA_PATH / 'cache_dir'             # tokenizer / config cache
DATA_PATH_SAVE_MODELS = DATA_PATH / 'models'     # checkpoints, saved models, label maps
DATA_IMGS = DATA_PATH / 'imgs'                   # images

# Create every directory up front; existing directories are left untouched.
for _data_dir in (
    DATA_PATH,
    DATA_PATH_DOWNLOAD,
    DATA_PATH_DATASET,
    DATA_CACHE,
    DATA_PATH_SAVE_MODELS,
    DATA_IMGS,
):
    _data_dir.mkdir(parents=True, exist_ok=True)

import pandas as pd

# Show up to 500 characters per cell so long texts/annotations are not truncated in DataFrame output.
pd.set_option('display.max_colwidth', 500) 

In [None]:
import sys
import os

# Absolute path two levels above the working directory, added to sys.path so that
# project modules can be imported from the notebook.
project_path = os.path.abspath(os.path.join(os.getcwd(), "../../"))

# Guarded so that re-running the cell does not keep appending duplicate entries.
if project_path not in sys.path:
    sys.path.append(project_path)

In [None]:
# Pretrained checkpoint used for both the tokenizer and the model weights.
BASE_MODEL_NAME = 'cointegrated/rubert-tiny2'
# Folder name (under DATA_PATH_SAVE_MODELS) for the final saved model; also used in the log directory name.
MODEL_NAME_SAVE = "ner_x5"
# Folder name (under DATA_PATH_SAVE_MODELS) for intermediate Trainer checkpoints.
MODEL_CHECKPOINT_PATH = "ner_x5_checkpoint"

In [None]:
from datetime import datetime

# Day-month-year stamp used to keep the logs of different days apart.
current_date = f"{datetime.now():%d-%m-%Y}"

# Log directory sits next to the data directory: <DATA_PATH>/../log/<model>_<date>.
DATA_LOG = DATA_PATH / '..' / 'log' / f'{MODEL_NAME_SAVE}_{current_date}'
DATA_LOG.mkdir(parents=True, exist_ok=True)

In [None]:
# Token length every sample is padded/truncated to by NerDataSet.
MAX_LENGTH = 10
# Per-device batch size used for both training and evaluation.
BATCH_SIZE = 16

# Данные

In [None]:
# Load the training data; the CSV is semicolon-separated.
df_x5 = pd.read_csv(DATA_PATH_DATASET / "train.csv", sep=";")

# Five random rows as a quick sanity check.
df_x5.sample(5)

In [None]:
def normalize_annotations(ann_list):
    """Parse an annotation cell and normalise its "outside" tag.

    Args:
        ann_list: Either a list of ``(start, end, label)`` triples or the
            string form of such a list, as read from the CSV.

    Returns:
        A new list of ``(start, end, label)`` tuples in which the label
        ``"0"`` (digit zero) is replaced by ``"O"`` (letter O).

    Raises:
        ValueError, SyntaxError: If a string input is not a plain Python literal.
    """
    if isinstance(ann_list, str):
        # literal_eval only accepts Python literals, so text coming from the
        # data file cannot execute arbitrary code the way eval() would.
        ann_list = ast.literal_eval(ann_list)
    return [
        (start, end, "O" if label == "0" else label)
        for start, end, label in ann_list
    ]

# Replace the annotation column with parsed, normalised lists of (start, end, label) tuples.
df_x5["annotation"] = df_x5["annotation"].apply(normalize_annotations)

In [None]:
# Collect every distinct tag string that occurs in the annotations.
all_labels = set()

for ann_list in df_x5["annotation"]:
    if isinstance(ann_list, str):
        # Still-unparsed cell: parse it as a literal only (never eval() data-file text).
        ann_list = ast.literal_eval(ann_list)
    all_labels.update(label for _, _, label in ann_list)

# Sorted for a stable, readable display.
unique_labels = sorted(all_labels)
unique_labels

In [None]:
# Random rows after normalisation, to eyeball the parsed annotations.
df_x5.sample(5)

In [None]:
# Training frame built from the list of source frames (currently only df_x5); rows are renumbered from 0.
df_train = pd.concat([df_x5], ignore_index=True)

# Словари

In [None]:
# Every non-"O" tag that occurs in the training annotations.
unique_labels = {label for anns in df_train["annotation"] for _, _, label in anns if label != "O"}

# Entity types are derived from both B- and I- tags. NerDataSet turns the
# continuation sub-tokens of a "B-XXX" span into "I-XXX", so every entity type
# needs both tags in the index even when only one of them occurs in the data;
# otherwise the missing tag would silently fall back to "O".
entity_types = sorted({lbl[2:] for lbl in unique_labels if lbl.startswith(("B-", "I-"))})

b_labels = [f"B-{entity}" for entity in entity_types]
i_labels = {entity: f"I-{entity}" for entity in entity_types}

# Index layout: "O" first, then B-/I- pairs in alphabetical order of entity type.
all_labels = ["O"]
for entity in entity_types:
    all_labels.extend((f"B-{entity}", f"I-{entity}"))

label2idx = {label: idx for idx, label in enumerate(all_labels)}
idx2label = {idx: label for label, idx in label2idx.items()}

print("label2idx:", label2idx)
print("idx2label:", idx2label)


In [None]:
import json

# Destinations of the two label mappings, stored next to the saved models.
label2idx_path = DATA_PATH_SAVE_MODELS / "label2idx.json"
idx2label_path = DATA_PATH_SAVE_MODELS / "idx2label.json"

# Write each mapping as indented UTF-8 JSON with non-ASCII characters kept as-is.
# JSON object keys are always strings, so the integer ids of idx2label are stored as strings.
for _mapping, _target_path in ((label2idx, label2idx_path), (idx2label, idx2label_path)):
    _target_path.write_text(
        json.dumps(_mapping, ensure_ascii=False, indent=4),
        encoding="utf-8",
    )

print(f"Словарь label2idx сохранён в {label2idx_path}")
print(f"Словарь idx2label сохранён в {idx2label_path}")

# Датасет

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Two-stage split: 10% of all rows become the test set, then 10% of the remainder
# becomes the validation set. Both splits use the notebook-wide SEED instead of a
# separately hard-coded value, so changing SEED changes every source of randomness.
train_val_data: pd.DataFrame
test_data: pd.DataFrame
train_val_data, test_data = train_test_split(
    df_train,
    test_size=0.1,
    shuffle=True,
    random_state=SEED,
)

train_data: pd.DataFrame
val_data: pd.DataFrame
train_data, val_data = train_test_split(
    train_val_data,
    test_size=0.1,
    shuffle=True,
    random_state=SEED,
)


In [None]:
import torch
import pandas as pd

from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from transformers import AutoTokenizer, DebertaV2Tokenizer
from typing import Tuple, Dict, Optional


class NerDataSet(Dataset):
    """Dataset that tokenizes texts once and aligns character-span NER labels to tokens.

    Each row of ``df`` supplies a text (``text_label`` column) and a list of
    ``(start, end, label)`` character spans (``target_label`` column, either a
    list or its string form). All rows are tokenized in ``__init__`` and kept
    as stacked tensors, so ``__getitem__`` is a plain index lookup.

    Args:
        df: Source frame; it is copied and re-indexed, the caller's frame is
            not modified.
        max_length: Length every sequence is padded or truncated to.
        tokenizer_path: Name or path handed to ``AutoTokenizer.from_pretrained``.
        label2idx: Mapping from label string to class index; must contain ``"O"``.
        cache_dir: Optional cache directory forwarded to the tokenizer loader.
        text_label: Name of the column holding the raw text.
        target_label: Name of the column holding the span annotations.
        dtype_input_ids: Tensor dtype for ``input_ids``.
        dtype_token_type_ids: Tensor dtype for ``token_type_ids``.
        dtype_attention_mask: Tensor dtype for ``attention_mask``.
        dtype_labels: Tensor dtype for ``labels``.
        debug: When True, token ids, token strings and string labels are stored
            as extra columns on ``self.df``.
    """

    def __init__(
        self, df: pd.DataFrame, 
        max_length: int, 
        tokenizer_path: str, 
        label2idx: Dict[str, int],
        cache_dir: Optional[str] = None, 
        text_label: str = 'sample',
        target_label: str = 'annotation',        
        dtype_input_ids: torch.dtype = torch.long,
        dtype_token_type_ids: torch.dtype = torch.long,
        dtype_attention_mask: torch.dtype = torch.long,
        dtype_labels : torch.dtype = torch.long,
        debug: bool = False,
    ):
        self.df = df.copy().reset_index(drop=True)
        self.max_length = max_length
        self.text_label = text_label
        self.target_label = target_label
        self.debug = debug

        self.label2idx = label2idx

        # TODO: add a class for typing the tokenizer
        # A fast tokenizer is required because tokenize_data relies on offset mappings.
        self.tokenizer = AutoTokenizer.from_pretrained(
            tokenizer_path,
            cache_dir=cache_dir,
            use_fast=True,
        )

        self.dtype_input_ids = dtype_input_ids
        self.dtype_token_type_ids = dtype_token_type_ids
        self.dtype_attention_mask = dtype_attention_mask
        self.dtype_labels = dtype_labels

        self.input_ids, self.token_type_ids, self.attention_mask, self.labels = self.tokenize_data()

    @staticmethod
    def _align_labels(offsets, ann_list) -> list:
        """Project character-level spans onto tokens.

        Args:
            offsets: Per-token ``(char_start, char_end)`` pairs from the tokenizer.
            ann_list: Iterable of ``(start, end, label)`` character spans.

        Returns:
            One label string per token. The first token overlapping a span gets
            the span's label; further overlapping tokens get the ``I-`` variant
            when the span label starts with ``B-``, otherwise the label itself.
            Tokens not covered by any span are ``"O"``.
        """
        seq_labels = ["O"] * len(offsets)

        for start, end, ent_label in ann_list:
            inside = False
            for i, (tok_start, tok_end) in enumerate(offsets):
                if tok_start >= end:
                    # Token starts at or after the span end: stop scanning this span.
                    break
                if tok_end <= start:
                    # Token ends before the span starts; zero-width (0, 0) offsets also land here.
                    continue

                if not inside:
                    seq_labels[i] = ent_label
                    inside = True
                elif ent_label.startswith("B-"):
                    # Continuation piece of a B- span: "B-XXX" -> "I-XXX".
                    seq_labels[i] = "I-" + ent_label.split("-", 1)[1]
                else:
                    seq_labels[i] = ent_label

        return seq_labels

    def _encode_labels(self, seq_labels, attention_mask):
        """Convert label strings to class indices.

        Args:
            seq_labels: One label string per token.
            attention_mask: One 0/1 flag per token.

        Returns:
            ``(label_ids, unknown)``: ``label_ids`` holds -100 where the mask is
            0, otherwise the index from ``label2idx`` (labels missing from the
            mapping fall back to the index of ``"O"``); ``unknown`` is the set
            of labels that needed that fallback.
        """
        outside_id = self.label2idx["O"]
        label_ids, unknown = [], set()

        for label, mask_value in zip(seq_labels, attention_mask):
            if mask_value == 0:
                label_ids.append(-100)
            elif label in self.label2idx:
                label_ids.append(self.label2idx[label])
            else:
                unknown.add(label)
                label_ids.append(outside_id)

        return label_ids, unknown

    def tokenize_data(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Tokenize every row and build the label tensors.

        Returns:
            ``(input_ids, token_type_ids, attention_mask, labels)``, each of
            shape ``[num_rows, max_length]``. For an empty frame the tensors
            have shape ``[0, max_length]``.
        """
        input_ids, token_type_ids, attention_mask, labels = [], [], [], []
        tokens_ids_debug, tokens_text_debug, labels_debug = [], [], []
        unknown_labels = set()

        # Iterate the two needed columns directly instead of building a Series per row.
        rows = zip(self.df[self.text_label], self.df[self.target_label])
        for text, ann_list in tqdm(
            rows,
            total=len(self.df),
            desc="Tokenizing data",
            ncols=100
        ):
            if isinstance(ann_list, str):
                # Parse as a literal only; annotation text must never be eval()'d.
                ann_list = ast.literal_eval(ann_list)

            encoded = self.tokenizer(
                text,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_offsets_mapping=True,
                return_token_type_ids=True,
            )

            seq_labels = self._align_labels(encoded["offset_mapping"], ann_list)
            label_ids, unseen = self._encode_labels(seq_labels, encoded["attention_mask"])
            unknown_labels |= unseen

            input_ids.append(torch.tensor(encoded["input_ids"], dtype=self.dtype_input_ids))
            token_type_ids.append(torch.tensor(encoded.get("token_type_ids", [0]*len(label_ids)), dtype=self.dtype_token_type_ids))
            attention_mask.append(torch.tensor(encoded["attention_mask"], dtype=self.dtype_attention_mask))
            labels.append(torch.tensor(label_ids, dtype=self.dtype_labels))

            if self.debug:
                tokens_ids_debug.append(encoded["input_ids"])
                tokens_text_debug.append(self.tokenizer.convert_ids_to_tokens(encoded["input_ids"]))
                labels_debug.append(seq_labels)

        if unknown_labels:
            # Make the fallback visible instead of silently training on "O".
            warnings.warn(
                f"Labels missing from label2idx were mapped to 'O': {sorted(unknown_labels)}",
                stacklevel=2,
            )

        if input_ids:
            input_ids = torch.stack(input_ids)
            token_type_ids = torch.stack(token_type_ids)
            attention_mask = torch.stack(attention_mask)
            labels = torch.stack(labels)
        else:
            # torch.stack rejects an empty list; return correctly shaped empty tensors.
            empty_shape = (0, self.max_length)
            input_ids = torch.empty(empty_shape, dtype=self.dtype_input_ids)
            token_type_ids = torch.empty(empty_shape, dtype=self.dtype_token_type_ids)
            attention_mask = torch.empty(empty_shape, dtype=self.dtype_attention_mask)
            labels = torch.empty(empty_shape, dtype=self.dtype_labels)

        if self.debug:
            self.df["tokens_ids_debug"] = tokens_ids_debug
            self.df["tokens_text_debug"] = tokens_text_debug
            self.df["labels_debug"] = labels_debug

        return input_ids, token_type_ids, attention_mask, labels

    def __len__(self):
        """Number of rows in the dataset."""
        return len(self.df)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """Return the pre-computed tensors of row ``idx`` keyed by model argument name."""
        return {
            "input_ids": self.input_ids[idx],
            "attention_mask": self.attention_mask[idx],
            "token_type_ids": self.token_type_ids[idx],
            "labels": self.labels[idx],
        }

    def plot_token_length_distribution(self):
        """
        Plot a histogram of per-text token counts, excluding the tokenizer's special tokens.
        Only available when the dataset was created with debug=True.

        Raises:
            ValueError: If the dataset was created with debug=False.
        """
        if not self.debug:
            raise ValueError("Для построения графика необходимо включить debug=True при инициализации.")

        special_ids = set(self.tokenizer.all_special_ids)
        token_lengths = [
            sum(1 for tid in token_ids if tid not in special_ids)
            for token_ids in self.df["tokens_ids_debug"]
        ]

        plt.figure(figsize=(10, 6))
        plt.hist(token_lengths, bins=30, alpha=0.7, edgecolor="black")
        plt.xlabel("Длина текста (количество токенов)")
        plt.ylabel("Частота")
        plt.title("Распределение длин текстов в токенах")
        plt.grid(axis="y", linestyle="--", alpha=0.7)
        plt.show()



In [None]:
# Preview the first rows of the training split.
train_data.head()

In [None]:
dtype_input = torch.long
dtype_labels = torch.long

# Settings shared by all three splits, defined once so the splits cannot drift apart.
dataset_kwargs = dict(
    max_length=MAX_LENGTH,
    tokenizer_path=BASE_MODEL_NAME,
    cache_dir=DATA_CACHE,
    label2idx=label2idx,
    text_label='sample',
    target_label='annotation',
    dtype_input_ids=dtype_input,
    dtype_token_type_ids=dtype_input,
    dtype_attention_mask=dtype_input,
    dtype_labels=dtype_labels,
    debug=True,  # keep token/label debug columns for inspection and the length histogram
)

train_dataset = NerDataSet(df=train_data, **dataset_kwargs)
val_dataset = NerDataSet(df=val_data, **dataset_kwargs)
test_dataset = NerDataSet(df=test_data, **dataset_kwargs)


In [None]:
# Inspect one encoded sample: input_ids, attention_mask, token_type_ids and labels tensors.
val_dataset[0]

In [None]:
# Random rows including the debug columns (token ids, token strings, string labels) added by debug=True.
val_dataset.df.sample(3)

In [None]:
# Histogram of token counts per text (special tokens excluded) for the validation split.
val_dataset.plot_token_length_distribution()

# Модель

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple, List

import torch
import torch.nn as nn
from transformers import BertForTokenClassification
from transformers.utils import ModelOutput
from torchcrf import CRF


@dataclass
class TokenClassifierCRFOutput(ModelOutput):
    """
    Output container of the NER model with a CRF layer.
    """
    # Negated CRF log-likelihood; None when the model is called without labels.
    loss: Optional[torch.FloatTensor] = None
    # Emission scores produced by the linear classifier.
    logits: Optional[torch.FloatTensor] = None  # [batch, seq_len, num_labels]
    # CRF-decoded tag ids; positions past each decoded sequence are filled with -100.
    predictions: Optional[torch.LongTensor] = None  # [batch, seq_len], padded with -100
    # Passed through from the BERT encoder output.
    hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
    attentions: Optional[Tuple[torch.FloatTensor, ...]] = None


class BertForTokenClassificationCRF(BertForTokenClassification):
    """BERT token classifier whose per-token scores are trained and decoded with a CRF.

    The encoder output goes through dropout and a linear layer to produce
    emission scores; the CRF layer turns them into a sequence-level loss
    (when labels are given) and a decoded tag sequence.
    """

    def __init__(self, config):
        super().__init__(config)
        # Emission layer: hidden state -> one score per label.
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        self.crf = CRF(config.num_labels, batch_first=True)

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> TokenClassifierCRFOutput:
        """Run the encoder, compute the CRF loss (if labels are given) and decode tags.

        Args:
            attention_mask: 1 for real tokens, 0 for padding. When omitted,
                every position is treated as a real token.
            labels: Tag ids, with -100 at positions to ignore. Ignored positions
                are expected to coincide with ``attention_mask == 0``.
            (remaining arguments are forwarded to the BERT encoder unchanged)

        Returns:
            ``TokenClassifierCRFOutput`` when ``return_dict`` is true; otherwise a
            tuple ``(loss?, emissions, *extra encoder outputs)`` without the
            decoded predictions.
        """

        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = self.dropout(outputs[0])    # [batch, seq_len, hidden]
        emissions = self.classifier(sequence_output)  # [batch, seq_len, num_labels]

        # The CRF needs a boolean mask; fall back to "all tokens are real" when the
        # caller did not pass an attention mask (previously this raised AttributeError).
        if attention_mask is None:
            crf_mask = torch.ones(emissions.shape[:2], dtype=torch.bool, device=emissions.device)
        else:
            crf_mask = attention_mask.bool()

        loss = None
        if labels is not None:
            # -100 is not a valid tag id for the CRF; those positions are replaced
            # with 0 and are excluded from the likelihood wherever the mask is off.
            labels_for_crf = labels.clone()
            labels_for_crf[labels_for_crf == -100] = 0

            # The CRF returns a log-likelihood; negate it to get a loss.
            loss = -self.crf(
                emissions,
                labels_for_crf,
                mask=crf_mask,
                reduction="mean"
            )

        # Decoding yields plain Python lists, so no autograd graph is needed for it.
        with torch.no_grad():
            decoded = self.crf.decode(emissions, mask=crf_mask)

        # Decoded sequences have different lengths; right-pad them with -100
        # into a single [batch, seq_len] tensor.
        max_len = emissions.size(1)
        predictions_padded = torch.full(
            (len(decoded), max_len),
            fill_value=-100,
            dtype=torch.long,
            device=emissions.device,
        )
        for i, seq in enumerate(decoded):
            predictions_padded[i, :len(seq)] = torch.tensor(seq, dtype=torch.long, device=emissions.device)

        if not return_dict:
            output = (emissions,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierCRFOutput(
            loss=loss,
            logits=emissions,
            predictions=predictions_padded,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [None]:
from transformers import AutoConfig

num_labels = len(label2idx)

# Model configuration with the label maps attached, so the saved model carries its own tag names.
config = AutoConfig.from_pretrained(
    BASE_MODEL_NAME,
    num_labels=num_labels,
    id2label=idx2label,     # {int: str}
    label2id=label2idx,     # {str: int}
    cache_dir=DATA_CACHE,
)

# Load the pretrained encoder into the CRF model. The weights now use the same
# cache directory as the config and tokenizer instead of the default cache.
model = BertForTokenClassificationCRF.from_pretrained(
    BASE_MODEL_NAME,
    config=config,
    cache_dir=DATA_CACHE,
    ignore_mismatched_sizes=True,
)

# Trade extra compute for lower activation memory during training.
model.gradient_checkpointing_enable()

# Обучение

In [None]:
from seqeval.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report

def compute_metrics(eval_pred):
    """
    Entity-level NER metrics (BIO tagging) computed with seqeval.

    Positions whose gold label is -100 are dropped; the remaining ids are
    mapped to tag strings through the global ``idx2label``.

    Args:
        eval_pred: Object exposing ``predictions`` (predicted tag ids) and
            ``label_ids`` (gold tag ids) as aligned 2-D sequences.

    Returns:
        Dict with ``precision``, ``recall``, ``f1_micro``, ``f1_macro``,
        ``accuracy`` and the text ``report`` from ``classification_report``.
    """
    predictions = eval_pred.predictions
    labels = eval_pred.label_ids

    true_labels, true_predictions = [], []
    for pred_seq, label_seq in zip(predictions, labels):
        # Keep only (gold, predicted) pairs at positions that carry a real label.
        kept = [(gold, pred) for pred, gold in zip(pred_seq, label_seq) if gold != -100]
        true_labels.append([idx2label[gold] for gold, _ in kept])
        true_predictions.append([idx2label[pred] for _, pred in kept])

    return {
        "precision": precision_score(true_labels, true_predictions),
        "recall": recall_score(true_labels, true_predictions),
        "f1_micro": f1_score(true_labels, true_predictions, average="micro"),
        "f1_macro": f1_score(true_labels, true_predictions, average="macro"),
        "accuracy": accuracy_score(true_labels, true_predictions),
        "report": classification_report(true_labels, true_predictions, digits=4),
    }


In [None]:
import torch
import matplotlib.pyplot as plt
import pandas as pd
from transformers import Trainer

class CustomBaseTrainer(Trainer):
    """
    Custom Trainer derived from transformers.Trainer that adds plotting of the logged history.
    https://hf.qhduan.com/docs/transformers/trainer
    """

    def __init__(self, model, args, **kwargs):
        super().__init__(model, args, **kwargs)

    def plot_results(self):
        """
        Plot losses and metrics from `trainer.state.log_history`.

        One subplot is drawn for `loss` and for every numeric `eval_*` column
        that was logged. Non-numeric entries (for example a text report
        returned by `compute_metrics`) are skipped because they cannot be
        drawn as a curve. Prints a message and returns when there is nothing
        to plot.
        """
        no_data_message = "Нет данных для построения графиков. Проверьте, выполнялось ли обучение."

        if not self.state.log_history:
            print(no_data_message)
            return

        log_data = pd.DataFrame(self.state.log_history)

        # Keep only records that carry an epoch, then collapse to one row per epoch value
        # (last non-null entry of every column).
        log_data = log_data.dropna(subset=["epoch"])
        log_data = log_data.groupby("epoch").last().reset_index()

        available_metrics = [
            col
            for col in log_data.columns
            if (col.startswith("eval_") or col == "loss")
            and pd.api.types.is_numeric_dtype(log_data[col])
        ]
        if not available_metrics:
            print(no_data_message)
            return

        num_plots = len(available_metrics)
        fig, axes = plt.subplots(num_plots, 1, figsize=(8, 4 * num_plots), squeeze=False)

        for ax, metric in zip(axes[:, 0], available_metrics):
            # Drop epochs where this metric was not logged so the line has no gaps.
            points = log_data[["epoch", metric]].dropna()
            ax.plot(points["epoch"], points[metric], marker="o", label=metric)

            ax.set_xlabel("Эпохи")
            ax.set_ylabel(metric)
            ax.set_title(f"График {metric}")
            ax.legend()
            ax.grid(True)

        fig.tight_layout()
        plt.show()

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    # 🟢 General training parameters
    output_dir=DATA_PATH_SAVE_MODELS / MODEL_CHECKPOINT_PATH,  # Directory for checkpoints
    # learning_rate=1e-4,  # Learning rate (unused: the optimizer is built explicitly in a later cell)
    num_train_epochs=10,  # Number of epochs
    # weight_decay=1e-2,  # L2 regularisation
    # optim="adamw_torch",  # AdamW optimizer
    # lr_scheduler_type="cosine",
    # warmup_ratio=0.1,

    # 🔵 Evaluation and logging
    eval_strategy="steps",  # Evaluate every `eval_steps` optimizer steps (not per epoch)
    eval_steps=100,  # Evaluate every 100 steps
    logging_strategy="steps",  # Log every `logging_steps` steps
    logging_steps=100,  # Logging interval in steps
    disable_tqdm=False,  # Keep tqdm progress bars enabled
    report_to="tensorboard",  # Log to TensorBoard
    logging_dir=DATA_LOG,  # Directory for logs

    # 🟠 Checkpointing
    save_strategy="steps",  # Save every `save_steps` steps (not per epoch)
    save_steps=100, # Save interval in steps; matches eval_steps so every checkpoint has metrics
    save_total_limit=5,  # Keep at most 5 checkpoints on disk
    load_best_model_at_end=True,  # Reload the best checkpoint when training ends
    metric_for_best_model="eval_f1_macro",  # Best checkpoint is chosen by eval_f1_macro
    greater_is_better=True,  # Higher eval_f1_macro is better

    # 🔴 Resuming training
    # save_steps=500,  # Save every 500 steps (for long epochs)
    # resume_from_checkpoint=True,  # Continue from the last checkpoint
    # trainer.train(resume_from_checkpoint="./saved_model/checkpoint-1500")

    # 🟡 Batch size, precision, gradient accumulation
    per_device_train_batch_size=BATCH_SIZE,  # Training batch size per device (GPU/CPU)
    per_device_eval_batch_size=BATCH_SIZE,  # Evaluation batch size per device
    # gradient_accumulation_steps=4,  # Gradient accumulation (emulates a 4x larger batch)
    # fp16=True,  # Mixed precision (faster training on GPU)

    # # 🔵 Data-loading speed
    # group_by_length=True,  # Batch together samples of similar length
    # dataloader_num_workers=4,  # Number of data-loading worker processes
)


In [None]:
from transformers import get_cosine_schedule_with_warmup

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5, weight_decay=1e-2)

# Number of optimizer updates the Trainer will perform. The last, partially filled
# batch of an epoch is still a step, hence ceil rather than floor division; the
# effective batch size also accounts for multiple devices and gradient accumulation.
# NOTE(review): how the Trainer rounds updates per epoch when gradient accumulation
# is greater than 1 depends on the transformers version; verify if it is enabled.
batches_per_epoch = math.ceil(len(train_dataset) / training_args.train_batch_size)
steps_per_epoch = max(1, math.ceil(batches_per_epoch / training_args.gradient_accumulation_steps))
total_steps = math.ceil(steps_per_epoch * training_args.num_train_epochs)

scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=int(0.2 * total_steps),    # 20% of the steps are warm-up
    num_training_steps=total_steps              # one full cosine decay over the whole run
)


In [None]:
def preprocess_logits_for_metrics(logits, labels):
    """Reduce model outputs to per-token tag ids before metric computation.

    When the model returns several outputs, they arrive as a tuple
    ``(emissions, predictions, ...)``. If the second element is an integer
    tensor shaped like the emissions without the label axis, it is the
    CRF-decoded path and is returned as-is, so the metrics describe what the
    CRF actually predicts. Otherwise the argmax over the emission scores is
    used.

    Args:
        logits: Emission tensor ``[batch, seq_len, num_labels]`` or a
            tuple/list whose first element is that tensor.
        labels: Gold labels (unused; part of the Trainer callback signature).

    Returns:
        Integer tensor ``[batch, seq_len]`` of predicted tag ids.
    """
    if isinstance(logits, (tuple, list)):
        emissions = logits[0]
        if len(logits) > 1:
            candidate = logits[1]
            if (
                torch.is_tensor(candidate)
                and not torch.is_floating_point(candidate)
                and candidate.shape == emissions.shape[:-1]
            ):
                return candidate
        logits = emissions
    return torch.argmax(logits, dim=-1)

In [None]:
# Assemble the trainer. The explicitly built optimizer/scheduler pair replaces the Trainer defaults,
# and preprocess_logits_for_metrics reduces each evaluation batch's model outputs before they are
# accumulated and handed to compute_metrics.
trainer = CustomBaseTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    optimizers=(optimizer, scheduler),
    compute_metrics=compute_metrics,
    processing_class=train_dataset.tokenizer,
    preprocess_logits_for_metrics=preprocess_logits_for_metrics
)

In [None]:
# Run training; evaluation, logging and checkpointing follow the intervals set in training_args.
trainer.train()

In [None]:
# Plot the logged training loss and evaluation metrics.
trainer.plot_results()

In [None]:
# Evaluate on the held-out test split and print the resulting metrics.
test_results = trainer.predict(test_dataset)
print(test_results.metrics)

In [None]:
# Save the trainer's current model under DATA_PATH_SAVE_MODELS / MODEL_NAME_SAVE.
trainer.save_model(DATA_PATH_SAVE_MODELS / MODEL_NAME_SAVE)

In [None]:
from transformers import AutoModelForSequenceClassification

# Reload the saved model as a sanity check. It must be loaded with the same class it
# was trained with: a sequence-classification head has a different shape and would
# discard the trained token classifier and CRF weights.
BertForTokenClassificationCRF.from_pretrained(DATA_PATH_SAVE_MODELS / MODEL_NAME_SAVE)
