In [59]:
from huggingface_hub import login
from dotenv import load_dotenv
import os

# Load variables from a local .env file so that os.getenv below can see them.
load_dotenv()

hf_token = os.getenv("HUGGINGFACE_TOKEN")

# Handle the "no token" case first; authentication only happens when a
# non-empty token value is present in the environment.
if not hf_token:
    print("ВНИМАНИЕ: Токен Hugging Face не найден в .env файле.")
    print("Пожалуйста, создайте файл .env в корне проекта и добавьте в него HUGGINGFACE_TOKEN.")
else:
    print("Токен Hugging Face успешно загружен из .env файла.")
    login(token=hf_token)

Токен Hugging Face успешно загружен из .env файла.


In [60]:
import pandas as pd
import ast
from transformers import AutoTokenizer

# Pretrained checkpoint whose tokenizer defines the token boundaries used for BIO tagging.
TOKENIZER_CHECKPOINT = "xlm-roberta-base"

# Module-level tokenizer: the functions defined later in this notebook read this global
# directly (and raise RuntimeError if it is None).
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_CHECKPOINT)

In [61]:
def _tokenize_and_filter(text: str) -> tuple[list[str], list[tuple[int, int]]]:
    """Tokenize ``text`` with the global tokenizer and drop zero-span entries.

    Args:
        text: Raw string to tokenize.

    Returns:
        A pair ``(tokens, offsets)`` of equal length: the token strings and their
        ``(start, end)`` character offsets, excluding every entry whose offset is
        exactly ``(0, 0)`` (presumably the special tokens added by the tokenizer).

    Raises:
        RuntimeError: If the module-level ``tokenizer`` is ``None``.
    """
    if tokenizer is None:
        raise RuntimeError("Токенизатор не был инициализирован.")

    enc = tokenizer(text, return_offsets_mapping=True)
    pieces = tokenizer.convert_ids_to_tokens(enc["input_ids"])

    # Keep token/offset pairs together while filtering so the two outputs stay aligned.
    kept = [
        (piece, span)
        for piece, span in zip(pieces, enc["offset_mapping"])
        if span != (0, 0)
    ]
    return [piece for piece, _ in kept], [span for _, span in kept]

In [62]:
def indices_to_bio(text: str, annotations: list) -> tuple[list[str], list[str]]:
    """Project character-level annotations onto tokenizer tokens as BIO tags.

    The text is tokenized with the module-level ``tokenizer``; entries whose offset
    is ``(0, 0)`` are discarded. Every remaining token starts as ``"O"``. Annotations
    are then visited in order of their start offset and each one tags all tokens
    whose character range overlaps it.

    Args:
        text: Raw string the annotations refer to.
        annotations: Iterable of ``(start, end, label)`` triples. Labels equal to
            ``"O"`` are ignored; for any other label the part after the first ``-``
            is used as the entity type.

    Returns:
        ``(tokens, tags)`` - the kept token strings and one BIO tag per token.

    Raises:
        RuntimeError: If the module-level ``tokenizer`` is ``None``.
    """
    if tokenizer is None:
        raise RuntimeError("Токенизатор не был инициализирован.")

    enc = tokenizer(text, return_offsets_mapping=True)
    pieces = tokenizer.convert_ids_to_tokens(enc["input_ids"])
    kept = [
        (piece, span)
        for piece, span in zip(pieces, enc["offset_mapping"])
        if span != (0, 0)
    ]
    kept_tokens = [piece for piece, _ in kept]
    kept_spans = [span for _, span in kept]

    tags = ["O"] * len(kept_tokens)

    # Position and entity type of the most recently tagged token; used to decide
    # whether an "I-" annotation really continues the previous one.
    prev_idx, prev_core = -1, None

    for start, end, label in sorted(annotations, key=lambda ann: ann[0]):
        if label == "O":
            continue

        core = label.split("-", 1)[-1]
        wants_inside = label.startswith("I-")

        # A token belongs to the annotation when the two character ranges intersect.
        hits = [
            idx
            for idx, (tok_start, tok_end) in enumerate(kept_spans)
            if max(tok_start, start) < min(tok_end, end)
        ]
        if not hits:
            continue

        head, *tail = hits

        # The first token stays "I-" only when the annotation itself is "I-" and it
        # directly follows a token of the same entity type; otherwise it opens with "B-".
        continues_previous = wants_inside and prev_idx == head - 1 and prev_core == core
        tags[head] = f"I-{core}" if continues_previous else f"B-{core}"

        for idx in tail:
            tags[idx] = f"I-{core}"

        prev_idx, prev_core = hits[-1], core

    return kept_tokens, tags

In [63]:
def bio_to_indices(text: str, bio_tags: list) -> list:
    """Convert per-token BIO tags back into character-level annotations.

    The text is re-tokenized with the module-level ``tokenizer`` and entries whose
    offset is ``(0, 0)`` are discarded, so ``bio_tags`` must contain exactly one tag
    per remaining token. Consecutive tokens are grouped into one span as follows:

    * a ``B-`` tag always starts a new span;
    * an ``I-`` tag extends the open span when the previous token has the same
      entity type, otherwise it starts a new span;
    * any other tag closes the open span.

    Args:
        text: Raw string that was tagged.
        bio_tags: Sequence of tag strings aligned with the kept tokens.

    Returns:
        List of ``(start_char, end_char, tag)`` tuples in token order, where ``tag``
        is the tag of the first token of the span (so a span opened by an orphan
        ``I-`` tag keeps its ``I-`` prefix).

    Raises:
        RuntimeError: If the module-level ``tokenizer`` is ``None``.
        ValueError: If the number of tags differs from the number of kept tokens.
    """
    if tokenizer is None:
        raise RuntimeError("Токенизатор не был инициализирован.")

    encoding = tokenizer(text, return_offsets_mapping=True)
    filtered_offsets = [offset for offset in encoding["offset_mapping"] if offset != (0, 0)]

    if len(bio_tags) != len(filtered_offsets):
        raise ValueError("Количество BIO-тегов не совпадает с количеством отфильтрованных токенов")

    annotations = []

    def entity_type(tag: str) -> str:
        # Split only on the first "-" so hyphenated types ("B-A-X" -> "A-X") are kept
        # whole, matching how indices_to_bio derives the type from a label.
        return tag.split("-", 1)[1]

    def flush(token_run: list) -> None:
        # Emit one annotation covering the whole run of token indices, if any.
        if token_run:
            first, last = token_run[0], token_run[-1]
            annotations.append(
                (filtered_offsets[first][0], filtered_offsets[last][1], bio_tags[first])
            )

    open_run = []  # token indices of the span currently being built
    for i, tag in enumerate(bio_tags):
        if tag.startswith("B-"):
            flush(open_run)
            open_run = [i]
        elif tag.startswith("I-"):
            # A non-empty run implies bio_tags[i - 1] is a B-/I- tag, because any
            # other tag would have emptied the run on the previous iteration.
            if open_run and entity_type(bio_tags[i - 1]) == entity_type(tag):
                open_run.append(i)
            else:
                flush(open_run)
                open_run = [i]
        else:
            flush(open_run)
            open_run = []

    # Close a span that runs up to the last token.
    flush(open_run)

    return annotations

In [64]:
def sanitize_annotations(text: str, annotations: list) -> tuple[str, list]:
    """Shrink every annotation so it no longer starts or ends on whitespace.

    Args:
        text: String the annotation offsets refer to.
        annotations: Iterable of ``(start, end, label)`` triples.

    Returns:
        ``(text, cleaned)`` where ``text`` is returned unchanged and ``cleaned`` holds
        the trimmed ``(start, end, label)`` tuples in the original order. Annotations
        that cover only whitespace (or nothing) are dropped.
    """
    cleaned = []
    for begin, finish, tag in annotations:
        tail_trimmed = text[begin:finish].rstrip()
        body = tail_trimmed.lstrip()
        if not body:
            # Nothing but whitespace inside the span - discard it.
            continue
        # The end moves left by the trailing whitespace; the start is then derived
        # from the length of the fully trimmed fragment.
        stop = begin + len(tail_trimmed)
        cleaned.append((stop - len(body), stop, tag))
    return text, cleaned

In [65]:
import traceback  # hoisted out of the per-row except handler

# --- Load the labelled data used for the round-trip check -------------------------
try:
    df_train = pd.read_csv("../../data/raw/train.csv", sep=";")
    # The "annotation" column holds Python-literal strings; parse them into objects.
    df_train["annotation"] = df_train["annotation"].apply(ast.literal_eval)
    print("Данные для проведения теста успешно загружены.")
except FileNotFoundError:
    print("Критическая ошибка: файл train.csv не найден. Проверьте путь '../../data/raw/train.csv'.")
    # Stop here: everything below needs df_train. Continuing would either fail with a
    # confusing NameError or silently reuse a stale df_train left in the kernel.
    raise

# --- Round-trip check: annotations -> BIO tags -> annotations ---------------------
sample_df = df_train.sample(n=min(1000, len(df_train)), random_state=42)
errors_found = 0
total_checked = 0

print(f"Запуск кругового теста на {len(sample_df)} случайных примерах с предварительной очисткой...")

# NOTE(review): the output saved with this cell reports mismatches for rows whose
# reference has adjacent per-word spans (e.g. B-TYPE followed by I-TYPE), because
# bio_to_indices returns them as one merged span. Whether the reference should be
# merged before comparing or the reconstruction should stay per-word - TODO confirm.
for _, row in sample_df.iterrows():
    total_checked += 1
    original_text = row["sample"]
    original_annotations = row["annotation"]

    try:
        sanitized_text, sanitized_annotations = sanitize_annotations(original_text, original_annotations)

        # "O" spans never survive the BIO round trip, so exclude them from the reference.
        sanitized_annotations_no_O = [ann for ann in sanitized_annotations if ann[2] != "O"]

        tokens, bio_tags = indices_to_bio(sanitized_text, sanitized_annotations)
        reconstructed_annotations = bio_to_indices(sanitized_text, bio_tags)

        # Sort both sides once by start offset and reuse them for comparison and reporting.
        expected_sorted = sorted(sanitized_annotations_no_O, key=lambda x: x[0])
        reconstructed_sorted = sorted(reconstructed_annotations, key=lambda x: x[0])

        if expected_sorted != reconstructed_sorted:
            errors_found += 1
            print("-" * 80)
            print(f"ОБНАРУЖЕНО РАСХОЖДЕНИЕ В ПРИМЕРЕ: '{original_text}'")
            print(f"  Эталон (без 'O'): {expected_sorted}")
            print(f"  Восстановлено:    {reconstructed_sorted}")
            print(f"  (Промежуточные BIO-теги: {bio_tags})")
            print("-" * 80)

    except Exception as e:
        # Deliberately broad: a crash on one row is counted as a failure and reported
        # with its traceback, and the remaining rows are still checked.
        errors_found += 1
        print(f"КРИТИЧЕСКАЯ ОШИБКА ИСПОЛНЕНИЯ в примере '{original_text}': {e}")
        traceback.print_exc()

# --- Summary ----------------------------------------------------------------------
print("\n" + "=" * 40)
if errors_found == 0:
    print(f"РЕЗУЛЬТАТ: УСПЕХ. Круговой тест пройден на {total_checked} примерах без расхождений.")
else:
    print(f"РЕЗУЛЬТАТ: ПРОВАЛ. Обнаружено {errors_found} ошибок на {total_checked} примерах.")
print("=" * 40)

Данные для проведения теста успешно загружены.
Запуск кругового теста на 1000 случайных примерах с предварительной очисткой...
--------------------------------------------------------------------------------
ОБНАРУЖЕНО РАСХОЖДЕНИЕ В ПРИМЕРЕ: 'кабачковая икра'
  Эталон (без 'O'): [(0, 10, 'B-TYPE'), (11, 15, 'I-TYPE')]
  Восстановлено:    [(0, 15, 'B-TYPE')]
  (Промежуточные BIO-теги: ['B-TYPE', 'I-TYPE', 'I-TYPE', 'I-TYPE', 'I-TYPE', 'I-TYPE'])
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
ОБНАРУЖЕНО РАСХОЖДЕНИЕ В ПРИМЕРЕ: 'ядра подсолнечника'
  Эталон (без 'O'): [(0, 4, 'B-TYPE'), (5, 18, 'I-TYPE')]
  Восстановлено:    [(0, 18, 'B-TYPE')]
  (Промежуточные BIO-теги: ['B-TYPE', 'I-TYPE', 'I-TYPE', 'I-TYPE', 'I-TYPE', 'I-TYPE', 'I-TYPE'])
--------------------------------------------------------------------------------
--------------------------------------------------------