In [1]:
import torch
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModel, DataCollatorForTokenClassification, AutoModelForTokenClassification, TrainingArguments, Trainer
from corus import load_factru
from datasets import Dataset, DatasetDict
from tqdm import tqdm


model_name = "DeepPavlov/rubert-base-cased"
max_tok_len = 512
tokenizer = AutoTokenizer.from_pretrained(model_name)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# [ids, type_ids, tokens, offsets, attention_mask, special_tokens_mask, overflowing]
records = list(load_factru("factRuEval-2016-master"))

In [3]:
types = []
for rec in records:
    for obj in rec.objects:
        for span in obj.spans:
            print(span)
            types.append(span.type.upper())
types = sorted(set(types))
label_names = ["O"]

types = ["LOC_NAME", #"ORG_DESCR",
         "ORG_NAME", "SURNAME", "NAME", "PATRONYMIC"]  # оставляем только нормальные теги
for t in types:
    label_names.append("B-" + t)
    label_names.append("I-" + t)

id2label = {i: label for i, label in enumerate(label_names)}
label2id = {v: k for k, v in id2label.items()}
label2id

FactruSpan(id='22763', type='loc_name', start=37, stop=43)
FactruSpan(id='22764', type='org_descr', start=31, stop=36)
FactruSpan(id='22763', type='loc_name', start=37, stop=43)
FactruSpan(id='22765', type='loc_name', start=47, stop=55)
FactruSpan(id='22766', type='loc_descr', start=56, stop=63)
FactruSpan(id='22767', type='name', start=313, stop=317)
FactruSpan(id='22768', type='surname', start=318, stop=325)
FactruSpan(id='22769', type='loc_name', start=306, stop=312)
FactruSpan(id='22770', type='loc_name', start=477, stop=483)
FactruSpan(id='22771', type='loc_name', start=531, stop=537)
FactruSpan(id='22772', type='org_name', start=562, stop=572)
FactruSpan(id='22773', type='name', start=610, stop=616)
FactruSpan(id='22774', type='surname', start=617, stop=624)
FactruSpan(id='22775', type='name', start=756, stop=760)
FactruSpan(id='22776', type='surname', start=761, stop=768)
FactruSpan(id='68733', type='geo_adj', start=584, stop=597)
FactruSpan(id='22782', type='name', start=968, s

{'O': 0,
 'B-LOC_NAME': 1,
 'I-LOC_NAME': 2,
 'B-ORG_NAME': 3,
 'I-ORG_NAME': 4,
 'B-SURNAME': 5,
 'I-SURNAME': 6,
 'B-NAME': 7,
 'I-NAME': 8,
 'B-PATRONYMIC': 9,
 'I-PATRONYMIC': 10}

In [4]:
result = {
    'input_ids': [],
    'token_type_ids': [],
    'attention_mask': [],
    'labels': []
}
for rec in tqdm(records):
    labels = []
    tokenized_inputs = tokenizer(rec.text, truncation=True, max_length=max_tok_len, padding=True)
    enc = tokenized_inputs.encodings[0]
    offsets = enc.offsets
    word_ids = enc.word_ids
    stm = enc.special_tokens_mask
    temp_tags = []
    for i in range(len(offsets)):
        if word_ids[i] is None:
            labels.append(-100)
            temp_tags.append("O")
        elif i > 0 and word_ids[i-1] == word_ids[i]:
            labels.append(-100)
            offset = offsets[i]
            flg = False
            for obj in rec.objects:
                if flg:
                    break
                for span in obj.spans:
                    if span.start <= offset[0] < span.stop or span.start < offset[1] <= span.stop:
                        temp_tags.append(span.type)
                        flg = True
                        break
            if not flg:
                temp_tags.append("O")
        else:
            offset = offsets[i]
            flg = False
            for obj in rec.objects:
                if flg:
                    break
                for span in obj.spans:

                    if span.start <= offset[0] < span.stop or span.start < offset[1] <= span.stop:
                        if span.type == temp_tags[-1]:
                            prefix = "I-"
                        else:
                            prefix = "B-"
                        # labels.append(prefix + span.type.upper())
                        labels.append(label2id.get(prefix + span.type.upper(), 0))
                        temp_tags.append(span.type)
                        flg = True
                        break
            if not flg:
                labels.append(0)
                temp_tags.append("O")
    result['input_ids'].append(tokenized_inputs['input_ids'])
    result['token_type_ids'].append(tokenized_inputs['token_type_ids'])
    result['attention_mask'].append(tokenized_inputs['attention_mask'])
    result['labels'].append(labels)
    if len(labels) != len(offsets) or len(temp_tags) != len(offsets):
        break

100%|██████████| 254/254 [00:00<00:00, 360.76it/s]


In [5]:
ds = Dataset.from_dict(result)
ds

Dataset({
    features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
    num_rows: 254
})

In [6]:
import numpy as np
gen = np.random.default_rng(42)

dsd = ds.train_test_split(test_size=0.2, generator=gen)
dsd_temp = dsd["test"].train_test_split(test_size=0.5, generator=gen)
dsd = DatasetDict({"train": dsd["train"], "test": dsd_temp["test"], "valid": dsd_temp["train"]})
dsd

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 203
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 26
    })
    valid: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 25
    })
})

In [7]:
from seqeval.metrics import classification_report
from seqeval.scheme import IOB2


def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=-1)

    y_true = [[id2label[l] for l, p in zip(label, pred) if l != -100] for label, pred in zip(labels, predictions)]
    y_pred = [[id2label[p] for l, p in zip(label, pred) if l != -100] for label, pred in zip(labels, predictions)]

    res = classification_report(y_true=y_true, y_pred=y_pred, scheme=IOB2, output_dict=True)["weighted avg"]
    res.pop("support", None)
    return res

In [8]:
import torch
from transformers import AutoModelForTokenClassification

model_name = "./res_model_base"
model = AutoModelForTokenClassification.from_pretrained(model_name)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

labels = []
predictions = []
for b in dsd["test"]:
    labels.append(b.pop("labels"))
    batch = {k: torch.as_tensor([v]) for k, v in b.items()}
    with torch.no_grad():
        outputs = model(**batch)
    predictions.append(np.argmax(outputs.logits, axis=-1).numpy())

In [9]:
preds = []  # объединение предиктов из разных батчей
for i, ps in enumerate(predictions):
    for j, p in enumerate(ps):
        preds.append(p)

#labels = tokenized_dsd["test"]["labels"]  # подгрузка размеченных/истинных тегов
len(preds), len(labels)

(26, 26)

In [11]:
import re
def rm_bi(x):
    return re.sub(r"^[BI]-", "", x).upper()

y_true = []
y_pred = []
for label, pred in zip(labels, predictions):
    for l, p in zip(label, pred[0]):
        if l != -100:
            y_true.append(rm_bi(id2label[l]))
            y_pred.append(rm_bi(id2label[p]))
lbls = list(set(y_true))
lbls.sort()
len(y_true), len(y_pred)
print()




In [12]:
from sklearn.metrics import classification_report

rep = classification_report(y_true=y_true, y_pred=y_pred, labels=lbls, output_dict=False, digits=4, zero_division=0)
print(rep)

              precision    recall  f1-score   support

    LOC_NAME     0.8889    0.9130    0.9008       184
        NAME     0.9820    0.9478    0.9646       115
           O     0.9954    0.9933    0.9944      7631
    ORG_NAME     0.8586    0.8827    0.8704       392
  PATRONYMIC     0.0000    0.0000    0.0000         2
     SURNAME     0.9511    0.9831    0.9669       178

    accuracy                         0.9854      8502
   macro avg     0.7793    0.7867    0.7828      8502
weighted avg     0.9854    0.9854    0.9854      8502



In [1]:
import torch
import re
import numpy as np
import os
import json
from sklearn.metrics import classification_report, precision_recall_fscore_support
from transformers import AutoModelForTokenClassification, AutoTokenizer
from typing import List, Dict, Any

model_name = "./res_model_base"
model = AutoModelForTokenClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained("DeepPavlov/rubert-base-cased")
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

id2label = {int(k): v for k, v in model.config.id2label.items()}

def is_valid_inn(inn_str: str) -> bool:
    if not inn_str.isdigit():
        return False
    if len(inn_str) == 10:
        weights = [2, 4, 10, 3, 5, 9, 4, 6, 8]
        digits = [int(d) for d in inn_str]
        checksum = sum(w * d for w, d in zip(weights, digits[:9]))
        return ((checksum % 11) % 10) == digits[9]
    elif len(inn_str) == 12:
        weights1 = [7, 2, 4, 10, 3, 5, 9, 4, 6, 8]
        weights2 = [3, 7, 2, 4, 10, 3, 5, 9, 4, 6, 8]
        digits = [int(d) for d in inn_str]
        checksum1 = sum(w * d for w, d in zip(weights1, digits[:10]))
        if ((checksum1 % 11) % 10) != digits[10]:
            return False
        checksum2 = sum(w * d for w, d in zip(weights2, digits[:11]))
        return ((checksum2 % 11) % 10) == digits[11]
    return False

def is_valid_card_number(card_str: str) -> bool:
    card_str = re.sub(r'\s+', '', card_str)
    if not card_str.isdigit() or not (13 <= len(card_str) <= 19):
        return False
    digits = [int(d) for d in card_str]
    checksum = 0
    for i, digit in enumerate(reversed(digits)):
        if i % 2 == 1:
            doubled_digit = digit * 2
            checksum += doubled_digit - 9 if doubled_digit > 9 else doubled_digit
        else:
            checksum += digit
    return checksum % 10 == 0


def find_rule_based_entities(text: str) -> List[Dict[str, Any]]:
    """
    Находит в тексте сущности, основанные на правилах, с использованием
    контекстного анализа для разрешения неоднозначностей.
    """
    entities = []
    used_positions = set()

    regex_map = {
        "PASSPORT": r'\b\d{4}\s\d{6}\b|\b\d{2}\s\d{2}\s\d{6}\b',
        "INN": r'\b\d{12}\b',
        "NUM_10": r'\b\d{10}\b',
        "PHONE": r'(?:\+7|8)[\s\-]?\(?\d{3}\)?[\s\-]?\d{3}[\s\-]?\d{2}[\s\-]?\d{2}',
        "EMAIL": r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b',
        "DATE": r'\b(?:(?:0[1-9]|[12][0-9]|3[01])[\.\-/](?:0[1-9]|1[0-2])[\.\-/](?:\d{4}|\d{2}))|\b(?:\d{4}[\.\-/](?:0[1-9]|1[0-2])[\.\-/](?:0[1-9]|[12][0-9]|3[01]))',
        "PROJECT_ID": r'\b[A-Z]+-\d+\b',
        "CARD": r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
    }

    def add_entity(match, entity_type):
        start, end = match.start(), match.end()
        # Проверяем, не пересекается ли найденная сущность с уже добавленными
        if not any(pos in used_positions for pos in range(start, end)):
            entities.append({
                "start": start,
                "end": end,
                "type": entity_type,
                "text": match.group(0)
            })
            used_positions.update(range(start, end))
            return True
        return False

    priority_order = [
        "EMAIL", "PROJECT_ID",
        "PASSPORT", "INN", "CARD",
        "DATE", "PHONE"
    ]

    for entity_type in priority_order:
        pattern = regex_map[entity_type]
        flags = re.IGNORECASE if entity_type == "EMAIL" else 0

        for match in re.finditer(pattern, text, flags):
            if entity_type == "INN" and not is_valid_inn(match.group(0)):
                continue
            if entity_type == "CARD" and not is_valid_card_number(match.group(0)):
                continue

            add_entity(match, entity_type)

    for match in re.finditer(regex_map["NUM_10"], text):
        start, end = match.start(), match.end()

        if any(pos in used_positions for pos in range(start, end)):
            continue

        context_window = text[max(0, start - 25):start].lower()

        if re.search(r'\b(паспорт|серия|номер)\b', context_window):
            add_entity(match, "PASSPORT")
        elif is_valid_inn(match.group(0)):
            add_entity(match, "INN")

    return entities

def neural_predict(text, model, tokenizer, id2label, max_length=512):
    target_labels = {
        "B-NAME", "I-NAME",
        "B-SURNAME", "I-SURNAME",
        "B-ORG_NAME", "I-ORG_NAME",
        "B-LOC_NAME", "I-LOC_NAME",
    }

    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=max_length,
        return_offsets_mapping=True
    )
    offset_mapping = inputs.pop("offset_mapping").cpu().numpy()[0]
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    predictions = torch.argmax(outputs.logits, dim=-1).cpu().numpy()[0]
    neural_entities = []
    current_entity = None

    for pred, offset in zip(predictions, offset_mapping):
        if offset[0] == offset[1]:  # Пропуск спец. токенов
            continue

        label = id2label[pred]

        if label in target_labels:
            clean_label = re.sub(r"^[BI]-", "", label)

            if label.startswith("B-") or current_entity is None or current_entity["type"] != clean_label:
                if current_entity:
                    neural_entities.append(current_entity)
                current_entity = {
                    "type": clean_label,
                    "start": int(offset[0]),
                    "end": int(offset[1])
                }
            else:
                current_entity["end"] = int(offset[1])
        else:
            if current_entity:
                neural_entities.append(current_entity)
                current_entity = None

    if current_entity:
        neural_entities.append(current_entity)

    for entity in neural_entities:
        entity["text"] = text[entity["start"]:entity["end"]]

    return neural_entities


def hybrid_predict_entities(text, model, tokenizer, id2label):
    regex_entities = find_rule_based_entities(text)

    used_positions = set()
    for entity in regex_entities:
        used_positions.update(range(entity["start"], entity["end"]))

    neural_entities = neural_predict(text, model, tokenizer, id2label)

    filtered_neural_entities = []
    for entity in neural_entities:
        position_range = range(entity["start"], entity["end"])
        if not any(pos in used_positions for pos in position_range):
            filtered_neural_entities.append(entity)
            used_positions.update(position_range)

    # Объединяем сущности (regex имеют приоритет)
    all_entities = regex_entities + filtered_neural_entities

    # Сортируем сущности по их позиции в тексте
    all_entities.sort(key=lambda x: x['start'])
    return all_entities


def load_data_from_directory(texts_dir, labels_dir=None):
    texts, filenames, true_entities_list = [], [], []
    if not os.path.exists(texts_dir):
        print(f"Каталог {texts_dir} не найден!")
        return texts, filenames, true_entities_list
    for filename in sorted(os.listdir(texts_dir)):
        if filename.endswith('.txt'):
            file_path = os.path.join(texts_dir, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    text = file.read().strip()
                    if text:
                        texts.append(text)
                        filenames.append(filename)
            except Exception as e:
                print(f"Ошибка при чтении файла {filename}: {e}")
    if labels_dir and os.path.exists(labels_dir):
        for filename in filenames:
            label_filename = filename.replace('.txt', '.json')
            label_path = os.path.join(labels_dir, label_filename)
            if os.path.exists(label_path):
                try:
                    with open(label_path, 'r', encoding='utf-8') as file:
                        true_entities_list.append(json.load(file))
                except Exception as e:
                    print(f"Ошибка при чтении разметки {label_filename}: {e}")
                    true_entities_list.append([])
            else:
                true_entities_list.append([])
    else:
        true_entities_list = [[] for _ in texts]

    return texts, filenames, true_entities_list

def prepare_for_classification_report(true_entities_list, pred_entities_list):
    y_true, y_pred = [], []
    for true_entities, pred_entities in zip(true_entities_list, pred_entities_list):
        if not true_entities: continue
        true_set = set((ent['type'], ent['text']) for ent in true_entities)
        pred_set = set((ent['type'], ent['text']) for ent in pred_entities)
        all_entities_in_example = true_set | pred_set
        for entity_type, entity_text in all_entities_in_example:
            y_true.append(entity_type if (entity_type, entity_text) in true_set else 'O')
            y_pred.append(entity_type if (entity_type, entity_text) in pred_set else 'O')
    return y_true, y_pred


texts_directory = "texts"
labels_directory = "labels"
test_texts, filenames, true_entities_list = load_data_from_directory(texts_directory, labels_directory)

if not test_texts:
    print("Не найдено текстовых файлов для обработки.")
else:
    pred_entities_list = []
    for i, (text, filename) in enumerate(zip(test_texts, filenames), 1):
        print(f"\nФайл {i}: {filename}")
        pred_entities = hybrid_predict_entities(text, model, tokenizer, id2label)
        pred_entities_list.append(pred_entities)
        print("Распознанные атрибуты ПДн:")
        if not pred_entities:
            print("  Не найдено.")
        for entity in pred_entities:
            attribute = [entity['type'], entity['start'], entity['end']]
            print(f"  {attribute} -> '{entity['text']}'")

    has_labels = any(true_entities_list)
    if has_labels:
        y_true, y_pred = prepare_for_classification_report(true_entities_list, pred_entities_list)
        if y_true and y_pred:
            labels = sorted(set(y_true + y_pred) - {'O'})
            report = classification_report(y_true, y_pred, labels=labels, digits=4, zero_division=0)
            print()
            print(report)
            precision, recall, f1, support = precision_recall_fscore_support(
                y_true, y_pred, labels=labels, average='weighted', zero_division=0
            )
            print()
            print(f"Precision: {precision:.4f}")
            print(f"Recall:    {recall:.4f}")
            print(f"F1-score:  {f1:.4f}")

  from .autonotebook import tqdm as notebook_tqdm



Файл 1: 1.txt
Распознанные атрибуты ПДн:
  ['SURNAME', 14, 20] -> 'Петров'
  ['NAME', 21, 25] -> 'Инна'
  ['INN', 35, 45] -> '7707083893'
  ['PASSPORT', 98, 109] -> '4506 123456'
  ['PASSPORT', 125, 135] -> '4505123123'
  ['CARD', 155, 171] -> '4242424242424242'
  ['DATE', 187, 197] -> '25.12.1990'
  ['EMAIL', 237, 253] -> 'test@example.com'
  ['PROJECT_ID', 262, 269] -> 'ABC-123'
  ['LOC_NAME', 271, 281] -> 'Узбекистан'

Файл 2: 10.txt
Распознанные атрибуты ПДн:
  ['SURNAME', 14, 23] -> 'Васильева'
  ['NAME', 24, 33] -> 'Екатерина'
  ['INN', 44, 54] -> '7233490713'
  ['PASSPORT', 104, 115] -> '4529 798027'
  ['PASSPORT', 131, 141] -> '4529798027'
  ['CARD', 161, 180] -> '5903 5408 5428 1864'
  ['DATE', 197, 207] -> '28.07.1985'
  ['PHONE', 218, 234] -> '+7 202 129 26 26'
  ['EMAIL', 243, 268] -> 'konovalovlavr@example.com'
  ['PROJECT_ID', 278, 285] -> 'ABC-707'
  ['ORG_NAME', 300, 303] -> 'НПО'
  ['ORG_NAME', 305, 311] -> 'Восход'
  ['LOC_NAME', 323, 329] -> 'Кизляр'

Файл 3: 2.txt
