In [1]:
import sys

import numpy as np

sys.path.insert(1, "./src")

In [2]:
import pickle
from pathlib import Path
from collections import Counter

import yaml
import pytorch_lightning as pl
from torch.utils.data import DataLoader
from transformers import set_seed

from anonymization.gpt2_generation import GPT2GenerationAnonymization
from anonymization.ref_book import ReferenceBookAnonymization
from anonymization.donated_dataset import DonatedDatasetAnonymization
from mask.util import convert_masked_docs_to_segments_set
from models.gpt2_model import PretrainedGPT2TextInfilling
from utils.infill_metrics import Statistics
from utils.log_reader import TensorBoardReader
from datasets.ner_dataset import get_ner_dataset
from mask.personal_entity import MaskEntityType
from models.bert_model import PretrainedBertNER
from utils.ner_metrics import Statistics as NerStatistics

In [3]:
set_seed(42)

In [4]:
mask_config = yaml.load(open("configs/ngram_mask_config.yaml", 'r'), Loader=yaml.Loader)
roc_stories_data_config = yaml.load(open("configs/roc_stories_data_config.yaml", 'r'), Loader=yaml.Loader)
i2b2_2006_data_config = yaml.load(open("configs/i2b2-2006_data_config.yaml", 'r'), Loader=yaml.Loader)
i2b2_2014_data_config = yaml.load(open("configs/i2b2-2014_data_config.yaml", 'r'), Loader=yaml.Loader)

In [5]:
# NER Model config initialisation
bert_base_model_config = yaml.load(open("configs/bert-base_model_config.yaml", 'r'), Loader=yaml.Loader)
bert_large_model_config = yaml.load(open("configs/bert-large_model_config.yaml", 'r'), Loader=yaml.Loader)
bert_large_model_config["model_version"] = 3

In [6]:
# Anonymization config initialisation
anon_config = yaml.load(open("configs/ref_book_anonymization_config.yaml", 'r'), Loader=yaml.Loader)
donor_data_config = yaml.load(open("configs/i2b2-2014_data_config.yaml", 'r'), Loader=yaml.Loader)
anon_gpt2_config = yaml.load(open("configs/gpt2_anonymization_config.yaml", 'r'), Loader=yaml.Loader)
anon_gpt2_config["model_version"] = 23

In [7]:
# # Инициализация алгоритма генерации искусственных сущностей с помощью GPT2
# model_reader = TensorBoardReader(Path(anon_gpt2_config["log_dir"]) / Path("lightning_logs"))
# path_to_checkpoint = model_reader.get_ckpt_path(anon_gpt2_config["model_version"])
# text_infill_model = PretrainedGPT2TextInfilling.load_from_checkpoint(path_to_checkpoint, strict=False).to("cuda:0")
# text_infill_model.eval()

# anonymization = GPT2GenerationAnonymization(text_infill_model,
#                                             label2type=lambda x: MaskEntityType[x.upper()],
#                                             mask_types=list(MaskEntityType), **anon_gpt2_config)

# model_reader.plot_text_infill_tensorboard_graphics(anon_gpt2_config["model_version"])

In [8]:
anonymization = ReferenceBookAnonymization(**anon_config, other_label=i2b2_2014_data_config['other_label'])

# path_to_donor = Path(donor_data_config["train_data_path"]).with_suffix(".pkl")
# anonymization = DonatedDatasetAnonymization.use_saved_dataset_as_donor(str(path_to_donor),
#                                                                        other_label=i2b2_2014_data_config['other_label'])

In [9]:
def print_dict(dictionary: dict):
    for k, v in dictionary.items():
        print(f"{k}:\t{v}")

In [10]:
def print_helpfull_statistics(infill_stats: Statistics, donated_entities_tf=None):
    masks_num = {k: sum(c for c in v.values()) for k, v in infill_stats.sub_entity_term_freq.items()}
    print(f"Распределение всех заполняемых масок ({sum(c for c in masks_num.values())}):")
    print_dict(masks_num)
    print()
    
    min_cer = min([min(x) for x in infill_stats.error_rates])
    mean_cer = sum([sum(x) for x in infill_stats.error_rates]) / sum([len(x) for x in infill_stats.error_rates])
    print(f"Средний показатель CER между изначальными текстами и ответами в примерах(macro_avg): {infill_stats.avg_cer:.3f}")
    print(f"Средний CER между изначальными текстами и ответами (micro_avg): {mean_cer:.3f}")
    print(f"Минимальный CER между изначальным текстом и ответом: {min_cer:.3f}")
    print()

    print("Число сэмплов с идеальными угадываниями:", len([x for x in infill_stats.error_rates if any(np.array(x) == 0.)]), 
          "/", len(infill_stats.error_rates))
    print("Число идеальных угадываний:", sum([np.sum(np.array(x) == 0.) for x in infill_stats.error_rates]), 
      "/", sum([len(x) for x in infill_stats.error_rates]))
    print("Статистика по угаданным типам данных:")
    guessed_categories = [np.array([cat for cat in categories if cat != infill_stats.other_label]) for categories in infill_stats.general_category_list]
    guessed_text = [np.array([infill_stats.source_text_list[i][j] for j, cat in enumerate(categories) if cat != infill_stats.other_label]) 
                    for i, categories in enumerate(infill_stats.general_category_list)]
    guessed_categories = sum([c[np.array(x) == 0.].tolist() for x, c in zip(infill_stats.error_rates, guessed_categories)], [])
    guessed_text = sum([t[np.array(x) == 0.].tolist() for x, t in zip(infill_stats.error_rates, guessed_text)], [])
    guessed_categories = Counter(guessed_categories)
    guessed_text = Counter(guessed_text)
    print("Категории:")
    print_dict(guessed_categories)
    print("Тексты:")
    print(dict(guessed_text))
    print()

    repeated_entities = {k: v.keys() & infill_stats.orig_entity_term_freq[k].keys() for k, v in infill_stats.sub_entity_term_freq.items()}
    print(f"Количество полностью повторённых сущностей без учёта контекста ({sum(len(v) for v in repeated_entities.values())}):")
    print_dict({k: len(v) for k, v in repeated_entities.items()})
    print("10 самых часто повторяемых сущностей каждого типа: текст сущности / количество появлений в заменённом тексте / количество появлений в изначальном тексте")
    print_dict({k: sorted([(t, infill_stats.sub_entity_term_freq[k][t], infill_stats.orig_entity_term_freq[k][t]) for t in v],
                     key=lambda x: x[1])[-10:] for k, v in repeated_entities.items()})
    print()
    
    if donated_entities_tf is not None:
        repeated_entities = {k: v.keys() & donated_entities_tf[k].keys() for k, v in infill_stats.sub_entity_term_freq.items()}
        print(f"Количество полностью повторённых сущностей из донорского датасета ({sum(len(v) for v in repeated_entities.values())}):")
        print_dict({k: len(v) for k, v in repeated_entities.items()})
        print("10 самых часто повторяемых сущностей каждого типа: текст сущности / количество появлений в заменённом тексте / количество появлений в изначальном тексте")
        print_dict({k: sorted([(t, infill_stats.sub_entity_term_freq[k][t], donated_entities_tf[k][t]) for t in v],
                         key=lambda x: x[1])[-10:] for k, v in repeated_entities.items()})
        print()

    print("Количество используемых лемм при анонимизации:", {k: len(v) for k, v in infill_stats.sub_label_lemmas.items()})
    print("Количество используемых лемм в изначальном наборе данных:", {k: len(v) for k, v in infill_stats.orig_label_lemmas.items()})
    print("Доля лемм, используемых из изначального набора данных:")
    print_dict({k: len(v & infill_stats.orig_label_lemmas[k]) / (len(v) or 1) for k, v in infill_stats.sub_label_lemmas.items()})

### Соответствие сгенерированных данных их типам
Правдоподобность искусственных примеров будет оцениваться по метрикам их определения лучшей NER моделью --- BERT-large-uncased, дообученной на i2b2 2014

In [11]:
# Инициализация обезличенного тестового датасета
test_dataset = get_ner_dataset(path_to_folder=i2b2_2014_data_config["train_data_path"], 
                               anonymization=anonymization, device='cpu',
                               **i2b2_2014_data_config)
test_dataloader = DataLoader(test_dataset, shuffle=False,
                             batch_size=i2b2_2014_data_config["batch_size"],
                             collate_fn=test_dataset.get_collate_fn(),
                             num_workers=10,
                             pin_memory=False,
                             persistent_workers=True)

Графики обучения модели для заполнения пропусков

In [12]:
t_reader = TensorBoardReader(Path(bert_large_model_config["log_dir"]) / Path("lightning_logs"))
ner_model = PretrainedBertNER.load_from_checkpoint(t_reader.get_ckpt_path(bert_large_model_config["model_version"]))

In [13]:
# Тестирование
trainer_args = {
    "accelerator": "gpu",
    "logger": False
}
trainer = pl.Trainer(**trainer_args, enable_progress_bar=True)
trainer.test(ner_model, test_dataloader)

In [14]:
# Метрики
stats = NerStatistics(ner_model, test_dataloader)
print(stats.get_classification_report())
stats.plot_confusion_matrix()
stats.print_random_failed_predictions()

In [15]:
stats.get_specific_failed_predictions('LOCATION')

In [16]:
stats.get_specific_failed_predictions('PROFESSION')

### Проверка заполнения пропусков на случайным образом замаскированных текстах (roc stories)

In [17]:
path_to_data = roc_stories_data_config["validate_data_path"]
split = "valid"
if Path(path_to_data).suffix != '.pkl':
    path_to_data = str(Path(path_to_data).parent / Path(f'{Path(path_to_data).stem}_{split}.pkl'))
    
# категории сущностей в формате [список категорий отрезков в документе, ...]; исходный текст в формате [список отрезков в документе, ...]
with open(path_to_data, 'rb') as f:
    # [(текст документа, список наборов масок для него: [[(тип, сдвиг, длина), ...], ...]), ...]
    infill_dataset = pickle.load(f)
    _, categories_list, source_texts = convert_masked_docs_to_segments_set(infill_dataset)

In [18]:
infill_stats = Statistics(anonymization, categories_list[:3000], categories_list[:3000], source_texts[:3000], is_uncased=True)

In [19]:
indexes = infill_stats.random_examples_indexes(30)
infill_stats.print_examples_by_indexes(indexes[-5:], max_example_len=500, start_other_len=100)

### Проверка заполнения пропусков в примерах с личной информацией (i2b2_2014)

#### Тренировочная выборка

In [20]:
# Для кэширования данных
get_ner_dataset(path_to_folder=i2b2_2014_data_config["train_data_path"], device='cpu', **i2b2_2014_data_config)

In [21]:
path = str(Path(i2b2_2014_data_config["train_data_path"]).with_suffix(".pkl"))
with open(path, 'rb') as f:
    (_, source_texts, specific_category_list, general_category_list, _) = pickle.load(f)

In [22]:
infill_stats = Statistics(anonymization, general_category_list, specific_category_list, source_texts, is_uncased=True)
donated_entities_tf = infill_stats.orig_entity_term_freq

In [23]:
indexes = infill_stats.random_examples_indexes(5)
infill_stats.print_examples_by_indexes(indexes[-5:], max_example_len=500, start_other_len=100)

In [24]:
(record_ids, col_j), cer = infill_stats.find_closest_substitutions(5)
infill_stats.print_examples_by_indexes(record_ids.tolist(), max_example_len=500, start_other_len=100)

In [25]:
print_helpfull_statistics(infill_stats)

#### Валидационная выборка

In [None]:
# Для кэширования данных
get_ner_dataset(path_to_folder=i2b2_2014_data_config["validate_data_path"], device='cpu', **i2b2_2014_data_config)

In [None]:
path = str(Path(i2b2_2014_data_config["validate_data_path"]).with_suffix(".pkl"))
with open(path, 'rb') as f:
    (_, source_texts, specific_category_list, general_category_list, _) = pickle.load(f)

In [None]:
infill_stats = Statistics(anonymization, general_category_list, specific_category_list, source_texts, is_uncased=True)

In [None]:
indexes = infill_stats.random_examples_indexes(5)
infill_stats.print_examples_by_indexes(indexes[-5:], max_example_len=500, start_other_len=100)

In [None]:
(record_ids, col_j), cer = infill_stats.find_closest_substitutions(5)
infill_stats.print_examples_by_indexes(record_ids.tolist(), max_example_len=500, start_other_len=100)

In [None]:
print_helpfull_statistics(infill_stats, donated_entities_tf)

### Проверка заполнения пропусков в примерах с личной информацией (i2b2_2006)

#### Тренировочная выборка

In [None]:
# Для кэширования данных
get_ner_dataset(path_to_folder=i2b2_2006_data_config["train_data_path"], device='cpu', **i2b2_2006_data_config)

In [None]:
path = str(Path(i2b2_2006_data_config["train_data_path"]).with_suffix(".pkl"))
with open(path, 'rb') as f:
    (_, source_texts, specific_category_list, general_category_list, _) = pickle.load(f)

In [None]:
infill_stats = Statistics(anonymization, general_category_list, specific_category_list, source_texts, is_uncased=True)

In [None]:
indexes = infill_stats.random_examples_indexes(5)
infill_stats.print_examples_by_indexes(indexes[-5:], max_example_len=500, start_other_len=100)

In [None]:
(record_ids, col_j), cer = infill_stats.find_closest_substitutions(5)
infill_stats.print_examples_by_indexes(record_ids.tolist(), max_example_len=500, start_other_len=100)

In [None]:
print_helpfull_statistics(infill_stats, donated_entities_tf)

#### Валидационная выборка

In [None]:
# Для кэширования данных
get_ner_dataset(path_to_folder=i2b2_2006_data_config["validate_data_path"], device='cpu', **i2b2_2006_data_config)

In [None]:
path = str(Path(i2b2_2014_data_config["validate_data_path"]).with_suffix(".pkl"))
with open(path, 'rb') as f:
    (_, source_texts, specific_category_list, general_category_list, _) = pickle.load(f)

In [None]:
infill_stats = Statistics(anonymization, general_category_list, specific_category_list, source_texts, is_uncased=True)

In [None]:
indexes = infill_stats.random_examples_indexes(5)
infill_stats.print_examples_by_indexes(indexes[-5:], max_example_len=500, start_other_len=100)

In [None]:
(record_ids, col_j), cer = infill_stats.find_closest_substitutions(5)
infill_stats.print_examples_by_indexes(record_ids.tolist(), max_example_len=500, start_other_len=100)

In [None]:
print_helpfull_statistics(infill_stats, donated_entities_tf)