## Setup

In [None]:
# Mount Google Drive at /content/drive (Colab only). The project paths defined
# later in this notebook start with 'drive/MyDrive', i.e. they are relative to /content.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install datasets transformers evaluate bert-score rouge-score accelerate --quiet
!pip install intervaltree stanza textstat --quiet

# Install SpaCy model(s) (downstream task)
!pip install spacy scispacy --quiet
!pip install https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.4/en_ner_bc5cdr_md-0.5.4.tar.gz --quiet

  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m527.3/527.3 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.1/61.1 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m39.9/39.9 MB[0m [31m55.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 kB[0m [31m17.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for rouge-score (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver 

In [None]:
# Base imports
import math, datetime, os, shutil, itertools, json, random, re, csv
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
# Transformer model imports
from transformers import AutoTokenizer, AutoModelForMaskedLM, pipeline
import datasets, transformers, torch, evaluate
from bert_score import score
# NER imports
import nltk
import intervaltree, stanza, textstat
from intervaltree import Interval, IntervalTree
# Downstream imports
import spacy, scispacy, en_ner_bc5cdr_md
from spacy.scorer import Scorer
from spacy.training import Example

In [None]:
# --- Run parameters ---

# Name of the training run; its results folder holds the tokenizer and the checkpoints.
RUN_NAME = 'Run - 09-08-18-17'

# Fraction of the annotations to mask, per annotation type.
# PHI is always masked (ratio 1.0) and MED_* never is (ratio 0.0);
# every type that is not listed here is treated as 0.0.
TEST_MASK_RATIOS = dict(NOUN=0.90, VERB=0.90, ADJ=0.90)

# Letter generation technique: 'iterative' or 'default'.
GEN_TECHNIQUE = 'iterative'

In [None]:
# Project root on the mounted Drive (relative to the notebook's working directory).
REPO_PATH = 'drive/MyDrive/LiboMsc'

# Results folder of the selected run, and the sub-folder name that encodes the
# generation technique and the mask ratios used for this evaluation.
TRAINED_MODEL_PATH = f'{REPO_PATH}/Bio_ClinicalBERT/Results/{RUN_NAME}'
HP_FOLDER = f"GT={GEN_TECHNIQUE}_{'_'.join(f'{key}={value}' for key, value in TEST_MASK_RATIOS.items())}"

# Folder holding the XML letters of the test set.
TEST_DATASET_PATH = f'{REPO_PATH}/data/i2b2_2024_T1_test'

# Size (in tokens) of the text window given to the fill-mask model.
CHUNK_SIZE = 256

# Run on GPU when one is available.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

In [None]:
# Create the output folder of this configuration inside the run's results folder,
# unless it already exists. os.mkdir does not create missing parent folders, so
# TRAINED_MODEL_PATH itself must already exist.
if not os.path.exists(f'{TRAINED_MODEL_PATH}/{HP_FOLDER}'):
    os.mkdir(f'{TRAINED_MODEL_PATH}/{HP_FOLDER}')
    print('Folder created.')

Folder created.


## Load testing dataset

In [None]:
def parse_xml_file(file_path):
  """Read one XML letter and return its identifier and text.

  The identifier is the last '/'-separated component of `file_path` with
  every '.xml' occurrence removed; the text is the content of the first
  TEXT child of the XML root element.

  Returns:
    A dict with the keys 'note_id' and 'text'.
  """
  root = ET.parse(file_path).getroot()
  file_name = file_path.rsplit('/', 1)[-1]
  return {'note_id': file_name.replace('.xml', ''), 'text': root.find('TEXT').text}


def load_xml_folder(folder_path):
  """Load every '.xml' file found directly inside `folder_path`.

  Files are visited in sorted file-name order. os.listdir returns entries in
  arbitrary order, which would otherwise make the order of the dataset (and
  of anything derived from positions in it, such as the train/valid/test
  split of the downstream evaluation) vary from one run to the next.

  Returns:
    A list with one parse_xml_file() result per XML file.
  """
  return [
    parse_xml_file(os.path.join(folder_path, filename))
    for filename in sorted(os.listdir(folder_path))
    if filename.endswith('.xml')
  ]

In [None]:
# Load test dataset: one record ('note_id', 'text') per XML file found in TEST_DATASET_PATH
test_dataset = datasets.Dataset.from_list(load_xml_folder(TEST_DATASET_PATH))

## Get annotations

In [None]:
# Download Philter and dependencies (release tag v1.2024.1) into ./philter/src
# NOTE: `git clone` fails when ./philter/src already exists, so this cell cannot be re-run as-is.
!git clone https://github.com/BCHSI/philter-deidstable1_mirror.git ./philter/src
%cd ./philter/src
!git checkout v1.2024.1
!pip install -r requirements.txt
%cd ../..

# NLTK POS-tagger resource (presumably needed by Philter — TODO confirm)
nltk.download('averaged_perceptron_tagger', quiet=True)

# Format all letters from testing dataset: start from empty data/results folders,
# then write every letter to ./philter/data/<note_id>.txt
%rm -rf ./philter/results ./philter/data
os.makedirs('./philter/results')
os.makedirs('./philter/data')

for letter in test_dataset:
  with open(f"./philter/data/{letter['note_id']}.txt", 'w+') as f:
    f.write(letter['text'])

# Extract PHIs from each letter: Philter is run from its source folder, reads ../data/
# and writes to ../results/ (get_phi reads ./philter/results/log/phi_marked.json afterwards)
%cd philter/src
!python3 deidpipe.py -i ../data/ -o ../results/ -f configs/philter_one2024.json -d False
%cd ../..

Cloning into './philter/src'...
remote: Enumerating objects: 17723, done.[K
remote: Counting objects: 100% (2062/2062), done.[K
remote: Compressing objects: 100% (681/681), done.[K
remote: Total 17723 (delta 1142), reused 2060 (delta 1140), pack-reused 15661 (from 1)[K
Receiving objects: 100% (17723/17723), 159.95 MiB | 34.86 MiB/s, done.
Resolving deltas: 100% (11611/11611), done.
Updating files: 100% (1934/1934), done.
/content/philter/src
Note: switching to 'v1.2024.1'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

H

In [None]:
# Read the PHIs that Philter logged for one sample of the dataset
def get_phi(sample):
  """Return the PHI annotations found by Philter for `sample`.

  The entry '../data/[note_id].txt' is read from
  ./philter/results/log/phi_marked.json and each of its annotations is
  rewritten in place: 'type' becomes 'label' (prefixed with 'PHI_'), 'word'
  becomes 'text' and 'context' is removed. Other keys are left as they are.

  Args:
    sample: dict with a 'note_id' key.

  Returns:
    The list of rewritten annotation dicts.
  """
  with open('./philter/results/log/phi_marked.json', 'r') as log_file:
    marked = json.load(log_file)

  note_phis = marked[f'../data/{sample["note_id"]}.txt']
  for entry in note_phis:
    entry['label'] = f'PHI_{entry.pop("type")}'
    entry['text'] = entry.pop('word')
    del entry['context']
  return note_phis

In [None]:
# Load model to annotate clinical text: Stanza English pipeline using the 'mimic' package
# with the 'i2b2' NER model. The models are downloaded first, then the pipeline is built.
stanza.download('en', package='mimic', processors={'ner': 'i2b2'})
stza_detector = stanza.Pipeline('en', package='mimic', processors={'ner': 'i2b2'})

def get_all_annotations(text, phi):
  """Annotate `text` with PHI, medical and part-of-speech labels, one word per annotation.

  Args:
    text: the letter to annotate; it is processed with `stza_detector`.
    phi: PHI annotations of the letter (dicts with 'start', 'end', 'text' and
      'label'), e.g. the output of get_phi().

  Returns:
    A list of dicts with the keys 'start', 'end', 'text' and 'label', sorted
    by 'start'. Labels are the PHI labels, 'MED_' + entity type for medical
    entities, and the `upos` tag for every other word. Except for NUM and
    PUNCT annotations, which are kept whole, every annotation is split into
    one annotation per run of alphanumeric/apostrophe characters.
  """
  doc = stza_detector(text)

  def hits(tree, begin, end):
    # True when at least one interval stored in `tree` overlaps the given range
    return bool(tree.overlap(begin, end))

  def split_words(anno):
    # Yield one annotation per run of consecutive alphanumeric or apostrophe
    # characters of anno['text']; any other character ends the current run.
    run_start = None
    for pos, char in enumerate(anno['text']):
      if char.isalnum() or char == '\'':
        if run_start is None:
          run_start = pos
      elif run_start is not None:
        yield word_piece(anno, run_start, pos)
        run_start = None
    if run_start is not None:
      yield word_piece(anno, run_start, len(anno['text']))

  def word_piece(anno, begin, end):
    # Sub-annotation covering anno['text'][begin:end], with the label of `anno`
    return {
      'start': anno['start']+begin, 'end': anno['start']+end,
      'text': anno['text'][begin:end], 'label': anno['label']
    }

  # PHIs (to remove) are all kept; medical entities (to keep) are added only
  # when they do not overlap a PHI
  found = phi.copy()
  phi_tree = IntervalTree.from_tuples([(item['start'], item['end']) for item in phi])
  found.extend(
    dict(start=ent.start_char, end=ent.end_char, text=ent.text, label=f'MED_{ent.type}')
    for ent in doc.entities
    if not hits(phi_tree, ent.start_char, ent.end_char)
  )

  # Every remaining word (NOUN, ADJ, VERB...) is labelled with its `upos` tag,
  # unless it overlaps a PHI or a medical entity
  taken_tree = IntervalTree.from_tuples([(item['start'], item['end']) for item in found])
  for sentence in doc.sentences:
    for word in sentence.words:
      if hits(taken_tree, word.start_char, word.end_char) or hits(phi_tree, word.start_char, word.end_char):
        continue
      found.append(dict(start=word.start_char, end=word.end_char, text=word.text, label=word.upos))

  # Order annotations by position in the text
  found.sort(key=lambda item: item['start'])

  # Break multi-word annotations into single-word ones (NUM and PUNCT stay whole)
  single_word = []
  for anno in found:
    if anno['label'] in ('NUM', 'PUNCT'):
      single_word.append(anno)
    else:
      single_word.extend(split_words(anno))

  # Order the single-word annotations by position in the text
  single_word.sort(key=lambda item: item['start'])
  return single_word

Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.8.0.json:   0%|   …

INFO:stanza:Downloaded file to /root/stanza_resources/resources.json
INFO:stanza:Downloading these customized packages for language: en (English)...
| Processor       | Package        |
------------------------------------
| tokenize        | mimic          |
| pos             | mimic_charlm   |
| lemma           | mimic_nocharlm |
| depparse        | mimic_charlm   |
| ner             | i2b2           |
| forward_charlm  | mimic          |
| pretrain        | mimic          |
| backward_charlm | mimic          |



Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/tokenize/mimic.pt:   0%|       …

INFO:stanza:Downloaded file to /root/stanza_resources/en/tokenize/mimic.pt


Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/pos/mimic_charlm.pt:   0%|     …

INFO:stanza:Downloaded file to /root/stanza_resources/en/pos/mimic_charlm.pt


Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/lemma/mimic_nocharlm.pt:   0%| …

INFO:stanza:Downloaded file to /root/stanza_resources/en/lemma/mimic_nocharlm.pt


Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/depparse/mimic_charlm.pt:   0%|…

INFO:stanza:Downloaded file to /root/stanza_resources/en/depparse/mimic_charlm.pt


Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/ner/i2b2.pt:   0%|          | 0…

INFO:stanza:Downloaded file to /root/stanza_resources/en/ner/i2b2.pt


Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/forward_charlm/mimic.pt:   0%| …

INFO:stanza:Downloaded file to /root/stanza_resources/en/forward_charlm/mimic.pt


Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/pretrain/mimic.pt:   0%|       …

INFO:stanza:Downloaded file to /root/stanza_resources/en/pretrain/mimic.pt


Downloading https://huggingface.co/stanfordnlp/stanza-en/resolve/v1.8.0/models/backward_charlm/mimic.pt:   0%|…

INFO:stanza:Downloaded file to /root/stanza_resources/en/backward_charlm/mimic.pt
INFO:stanza:Finished downloading models and saved to /root/stanza_resources
INFO:stanza:Checking for updates to resources.json in case models have been updated.  Note: this behavior can be turned off with download_method=None or download_method=DownloadMethod.REUSE_RESOURCES


Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.8.0.json:   0%|   …

INFO:stanza:Downloaded file to /root/stanza_resources/resources.json
INFO:stanza:Loading these models for language: en (English):
| Processor | Package        |
------------------------------
| tokenize  | mimic          |
| pos       | mimic_charlm   |
| lemma     | mimic_nocharlm |
| depparse  | mimic_charlm   |
| ner       | i2b2           |

INFO:stanza:Using device: cuda
INFO:stanza:Loading: tokenize
  checkpoint = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: pos
  checkpoint = torch.load(filename, lambda storage, loc: storage)
  data = torch.load(self.filename, lambda storage, loc: storage)
  state = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: lemma
  checkpoint = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: depparse
  checkpoint = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: ner
  checkpoint = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Done loading pro

In [None]:
def extract_annotations_to_mask_only(sample, mask_ratios, seed=55):
    """Attach to `sample` the annotations that have to be masked.

    The annotations returned by get_phi() and get_all_annotations() are
    grouped by type: labels starting with 'PHI' go to 'PHI', labels starting
    with 'MED' go to 'MED', and every other label forms its own group.

    Args:
        sample: dict with at least 'note_id' and 'text'; modified in place.
        mask_ratios: maps an annotation type (e.g. 'NOUN') to the fraction of
            its annotations to mask. PHI annotations are always all masked;
            MED annotations and types missing from `mask_ratios` never are.
        seed: seed of the sampler used for each annotation type (default 55).

    Returns:
        `sample`, with 'annotations' set to the selected annotations sorted
        by 'start'.
    """
    # Get all annotations
    phis = get_phi(sample)
    all_annotations = get_all_annotations(sample['text'], phis)

    # Group annotations by type
    grouped = {'PHI': [], 'MED': []}
    for anno in all_annotations:
        label = anno['label']
        if label.startswith('PHI'):
            group = 'PHI'
        elif label.startswith('MED'):
            group = 'MED'
        else:
            group = label
        grouped.setdefault(group, []).append(anno)

    # Select the annotations to mask based on the given ratios
    selected = []
    for group, annos in grouped.items():
        if group == 'PHI':
            # Mask ratio for PHI is 1.0
            selected.extend(annos)
        elif group == 'MED' or group not in mask_ratios:
            # Mask ratio for MED or absent types is 0.0
            continue
        else:
            # A private generator seeded per type yields the same picks as
            # random.seed(seed) followed by random.sample(), without resetting
            # the global `random` state on every call.
            sampler = random.Random(seed)
            selected.extend(sampler.sample(annos, int(len(annos) * mask_ratios[group])))

    # Merge all types and order the annotations by position in the text
    sample['annotations'] = sorted(selected, key=lambda anno: anno['start'])
    return sample

In [None]:
# Load in tokenizer of trained model (saved in the 'tokenizer' sub-folder of the run's results)
tokenizer = AutoTokenizer.from_pretrained(f'{TRAINED_MODEL_PATH}/tokenizer')

In [None]:
# Extract annotations: adds an 'annotations' column holding, for every letter,
# the annotations to mask according to TEST_MASK_RATIOS
test_dataset = test_dataset.map(extract_annotations_to_mask_only, fn_kwargs={'mask_ratios': TEST_MASK_RATIOS})

Map:   0%|          | 0/514 [00:00<?, ? examples/s]

## Letter generations

In [None]:
# Load in trained model from the run's 'checkpoints' folder
# NOTE(review): this takes the first 'checkpoint-*' entry returned by os.listdir, whose order
# is arbitrary — if the folder can hold several checkpoints, confirm which one is meant.
checkpoint_name = [p for p in os.listdir(f'{TRAINED_MODEL_PATH}/checkpoints') if p.startswith('checkpoint-')][0]
FINAL_MODEL = AutoModelForMaskedLM.from_pretrained(f'{TRAINED_MODEL_PATH}/checkpoints/{checkpoint_name}')

In [None]:
# Save original letters (the 'text' column, in dataset order) as the reference for evaluation
original_texts = test_dataset['text']

In [None]:
# Get fully masked letters
def get_masked_letter(sample):
    """Return sample['text'] with every annotated span replaced by '[MASK]'.

    Every character covered by at least one annotation is flagged, and each
    maximal run of flagged characters becomes a single '[MASK]'; overlapping
    or directly adjacent annotations therefore collapse into one mask.

    The flags live in a separate array instead of being written into the text
    as a sentinel character, so a letter that already contains that character
    is left intact, and the work is linear in the length of the text.
    Annotation bounds are clipped to the text.

    Args:
        sample: dict with 'text' and 'annotations' (dicts with 'start' and 'end').
    """
    text = sample['text']
    length = len(text)

    # covered[i] is 1 when character i belongs to an annotation
    covered = bytearray(length)
    for anno in sample['annotations']:
        begin, end = max(anno['start'], 0), min(anno['end'], length)
        if begin < end:
            covered[begin:end] = b'\x01' * (end - begin)

    # Emit the text run by run: covered runs become '[MASK]', the others are copied
    pieces, pos = [], 0
    while pos < length:
        run_end = pos + 1
        while run_end < length and covered[run_end] == covered[pos]:
            run_end += 1
        pieces.append('[MASK]' if covered[pos] else text[pos:run_end])
        pos = run_end
    return ''.join(pieces)

# Masked version of every test letter, in dataset order
masked_texts = [get_masked_letter(sample) for sample in test_dataset]

In [None]:
# Prepare mask-fill pipeline: the trained model and its tokenizer, run on DEVICE
fill_mask_pipeline = pipeline('fill-mask', model=FINAL_MODEL, tokenizer=tokenizer, device=DEVICE)

### Different generation technique functions

In [None]:
def generate_letter_iterative(sample):
  """Rewrite a letter by replacing its annotations one at a time, left to right.

  For each annotation, the current text (earlier replacements included) is
  masked at that annotation, a window of about CHUNK_SIZE tokens centred on
  the mask is given to `fill_mask_pipeline`, and one of the top candidates is
  drawn with a probability proportional to its score.

  Args:
    sample: dict with 'text' and 'annotations' (dicts with 'start' and 'end').

  Returns:
    The generated letter.

  Note:
    Candidates are drawn with np.random, so the result depends on NumPy's
    global random state.
  """
  top_k = 6  # number of candidates the replacement is drawn from

  def remove_overlaps(annotations):
    ''' Keep only annotations that do not overlap an annotation kept before them.
    Annotations are visited by 'start', longest first on ties, so the longest
    annotation wins among those sharing a 'start'. An annotation starting inside
    a kept one is dropped too: replacing both would corrupt the text offsets.
    '''
    ordered = sorted(annotations, key=lambda x: (x['start'], -x['end']))

    kept = []
    for anno in ordered:
      if kept and (anno['start'] == kept[-1]['start'] or anno['start'] < kept[-1]['end']):
        continue
      kept.append(anno)

    return kept

  annotations = remove_overlaps(sample['annotations'])
  text = sample['text']
  offset = 0  # shift between positions in the original text and in the text rewritten so far
  for anno in annotations:

    # Get entire sequence with the next annotation masked
    anno_start_idx, anno_end_idx = anno['start']+offset, anno['end']+offset
    masked_sequence = text[:anno_start_idx] + '[MASK]' + text[anno_end_idx:]

    # Get the limited-size chunk with the masked token at its middle index
    tok_masked_sequence = tokenizer(masked_sequence, return_offsets_mapping=True)
    token_offsets = tok_masked_sequence['offset_mapping']
    mask_tok_idx = tok_masked_sequence['input_ids'].index(tokenizer.mask_token_id)

    chunk_start_idx = token_offsets[max(0, mask_tok_idx-(CHUNK_SIZE//2))][0]
    # len-2 is the last token before the final one (a special token when the tokenizer adds them)
    chunk_end_idx = token_offsets[min(len(token_offsets)-2, mask_tok_idx+(CHUNK_SIZE//2))][-1]

    masked_sequence = masked_sequence[chunk_start_idx:chunk_end_idx]

    # Draw the replacement among the top candidates, proportionally to their scores
    mask_answers = fill_mask_pipeline(masked_sequence, top_k=top_k)

    scores = [ans['score'] for ans in mask_answers]
    probabilities = np.array(scores) / sum(scores)

    picked_ans = np.random.choice(len(mask_answers), p=probabilities)
    mask_answer = mask_answers[picked_ans]['token_str']

    # Inject answer in the text and re-calculate offset
    if mask_answer.startswith('##'):
      # A word-piece continuation is glued to the preceding word by dropping the
      # character in front of it. At the very start of the text there is no such
      # character (and start-1 would wrap around to the end of the text).
      replacement = mask_answer[2:]
      cut_idx = anno_start_idx - 1 if anno_start_idx > 0 else anno_start_idx
    else:
      replacement, cut_idx = mask_answer, anno_start_idx

    text = text[:cut_idx] + replacement + text[anno_end_idx:]
    offset += len(replacement) - (anno_end_idx - cut_idx)

  return text

In [None]:
def fill_sample(masked_text):
  """Fill every '[MASK]' of `masked_text` with the top prediction of `fill_mask_pipeline`.

  The text is cut at token boundaries into chunks of about CHUNK_SIZE tokens.
  Chunks containing at least one '[MASK]' are sent to the pipeline, and the
  predictions are written back, in order, onto the masks of the full text.
  A prediction starting with '##' (word-piece continuation) is glued to the
  preceding word by dropping the single space in front of the mask, if any.

  Args:
    masked_text: text in which the spans to generate are replaced by '[MASK]'.

  Returns:
    The text with its masks filled.
  """
  mask = '[MASK]'
  token_offsets = tokenizer(masked_text, return_offsets_mapping=True)['offset_mapping']

  # Get chunks: one every CHUNK_SIZE tokens, plus whatever follows the last cut
  pieces, end_idx = [], 0
  for i in range(0, len(token_offsets)-CHUNK_SIZE-1, CHUNK_SIZE):
    start_idx = token_offsets[i][0]
    end_idx = token_offsets[min(i+CHUNK_SIZE, len(token_offsets)-1)][0]
    pieces.append(masked_text[start_idx:end_idx])
  pieces.append(masked_text[end_idx:])

  # Only chunks that really hold a mask token can be given to the pipeline
  chunks = [piece for piece in pieces if mask in piece]

  # Predict and fill chunks. Predictions come in text order, so each one goes to
  # the first mask located after the previous replacement.
  cursor = 0
  for chunk in chunks:
    predictions = fill_mask_pipeline(chunk, top_k=1)
    # With a single mask the pipeline returns one list of candidates instead of one list per mask
    if not isinstance(predictions[0], list):
      predictions = [predictions]
    for candidates in predictions:
      token = candidates[0]['token_str']
      mask_idx = masked_text.find(mask, cursor)
      if mask_idx == -1:
        break
      cut_idx = mask_idx
      if token.startswith('##'):
        token = token[2:]
        if cut_idx > 0 and masked_text[cut_idx-1] == ' ':
          cut_idx -= 1
      masked_text = masked_text[:cut_idx] + token + masked_text[mask_idx+len(mask):]
      cursor = cut_idx + len(token)

  return masked_text

### Generation

In [None]:
# Pick generation technique to use:
#   'iterative' replaces the annotations of each letter one at a time,
#   'default' fills the masks of the pre-masked letters chunk by chunk.
if GEN_TECHNIQUE == 'iterative':
  generated_texts = [generate_letter_iterative(sample) for sample in test_dataset]
elif GEN_TECHNIQUE == 'default':
  generated_texts = [fill_sample(text) for text in masked_texts]
else:
  # An explicit exception (rather than `assert False`) is never stripped by
  # `python -O` and reports the offending value.
  raise ValueError(f'Invalid generation technique: {GEN_TECHNIQUE!r}')

You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


In [None]:
# Save original/masked/generated letters: one column per key, one entry per letter
data = {
    'ID': test_dataset['note_id'],
    'Original letter': original_texts,
    'Masked letter': masked_texts,
    'Generated letter': generated_texts,
}

# Write one CSV row per letter into the output folder of this configuration
with open(f'{TRAINED_MODEL_PATH}/{HP_FOLDER}/generated_letters.csv', 'w+', newline='', encoding='utf-8') as csvfile:
    fieldnames = list(data)  # column order = key order of `data`
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    for i in range(len(data['ID'])):
        writer.writerow({column: values[i] for column, values in data.items()})

## Evaluation

### Automatic Evaluation

In [None]:
# --- Lexical Similarity metrics (generated letters against original letters) ---

# Get ROUGE metric
rouge_evaluator = evaluate.load('rouge', quiet=True)
rouge_eval = rouge_evaluator.compute(references=original_texts, predictions=generated_texts)

# Get BERTScore metric (the mean of the F values is the reported score)
P_mul, R_mul, F_mul = score(generated_texts, original_texts, lang="en", rescale_with_baseline=True)
bertscore = float(F_mul.mean())


# --- Readability metrics (mean over the letters) ---
# https://www.kaggle.com/code/yhirakawa/textstat-how-to-evaluate-readability

# Get SMOG score
gen_smog = np.mean(list(map(textstat.smog_index, generated_texts)))
ori_smog = np.mean(list(map(textstat.smog_index, original_texts)))

# Get Flesch Reading Ease
gen_fre = np.mean(list(map(textstat.flesch_reading_ease, generated_texts)))
ori_fre = np.mean(list(map(textstat.flesch_reading_ease, original_texts)))

# Get Flesch Kincaid Grade
gen_fkg = np.mean(list(map(textstat.flesch_kincaid_grade, generated_texts)))
ori_fkg = np.mean(list(map(textstat.flesch_kincaid_grade, original_texts)))

Error while fetching `HF_TOKEN` secret value from your vault: 'Requesting secret HF_TOKEN timed out. Secrets can only be fetched when running from the Colab UI.'.
You are not authenticated with the Hugging Face Hub in this notebook.
If the error persists, please let us know by opening an issue on GitHub (https://github.com/huggingface/huggingface_hub/issues/new).


Downloading builder script:   0%|          | 0.00/6.27k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/482 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]



model.safetensors:   0%|          | 0.00/1.42G [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Gather all metric results, rounded to 3 decimals
# deidentification_scores = {
#     'Deid-Precision': round(deid_precision, 3),
#     'Deid-Recall': round(deid_recall, 3),
#     'Deid-F1': round(deid_f1, 3),
# }

test_results_lexical_similarity = {
    name: round(value, 3)
    for name, value in (
        ('ROUGE-1', rouge_eval['rouge1']),
        ('ROUGE-2', rouge_eval['rouge2']),
        ('ROUGE-L', rouge_eval['rougeL']),
        ('BERTScore', bertscore),
    )
}

test_results_readability = {
    name: round(value, 3)
    for name, value in (
        ('SMOG-original', ori_smog),
        ('SMOG-generated', gen_smog),
        ('FRE-original', ori_fre),
        ('FRE-generated', gen_fre),
        ('FKG-original', ori_fkg),
        ('FKG-generated', gen_fkg),
    )
}

# print(deidentification_scores)
print(test_results_lexical_similarity)
print(test_results_readability)

{'ROUGE-1': 0.826, 'ROUGE-2': 0.686, 'ROUGE-L': 0.811, 'BERTScore': 0.663}
{'SMOG-original': 11.067, 'SMOG-generated': 10.832, 'FRE-original': 61.597, 'FRE-generated': 64.294, 'FKG-original': 8.06, 'FKG-generated': 7.636}


In [None]:
# Save logs for trained model evaluation (on testing set)
# The file is opened in append mode: each run of this cell adds a new block of
# 'key=value' lines (one per metric) to the results file of this configuration.
with open(f'{TRAINED_MODEL_PATH}/{HP_FOLDER}/best_model_test_results.txt', 'a') as f:
  # f.write('Deidentification scores (Philter):\n' + ',\n'.join([f'\t{key}={value}' for key, value in deidentification_scores.items()]) + '.\n\n')
  f.write('Automatic metrics (Lexical Similarity):\n' + ',\n'.join([f'\t{key}={value}' for key, value in test_results_lexical_similarity.items()]) + '.\n')
  f.write('Automatic metrics (Readability):\n' + ',\n'.join([f'\t{key}={value}' for key, value in test_results_readability.items()]) + '.\n\n')

In [None]:
# 70% (iterative)
# {'ROUGE-1': 0.852, 'ROUGE-2': 0.733, 'ROUGE-L': 0.841, 'BERTScore': 0.707}
# {'SMOG-original': 11.067, 'SMOG-generated': 10.905, 'FRE-original': 61.597, 'FRE-generated': 63.51, 'FKG-original': 8.06, 'FKG-generated': 7.754}

In [None]:
# 100% (official)
# {'ROUGE-1': 0.762, 'ROUGE-2': 0.595, 'ROUGE-L': 0.738, 'BERTScore': 0.563}
# {'SMOG-original': 11.067, 'SMOG-generated': 10.31, 'FRE-original': 61.597, 'FRE-generated': 68.374, 'FKG-original': 8.06, 'FKG-generated': 6.924}

# 50% (official)
# {'ROUGE-1': 0.842, 'ROUGE-2': 0.75, 'ROUGE-L': 0.836, 'BERTScore': 0.673}
# {'SMOG-original': 11.067, 'SMOG-generated': 11.214, 'FRE-original': 61.597, 'FRE-generated': 57.65, 'FKG-original': 8.06, 'FKG-generated': 8.439}

# 30% (official)
# {'ROUGE-1': 0.892, 'ROUGE-2': 0.819, 'ROUGE-L': 0.887, 'BERTScore': 0.782}
# {'SMOG-original': 11.067, 'SMOG-generated': 10.927, 'FRE-original': 61.597, 'FRE-generated': 63.069, 'FKG-original': 8.06, 'FKG-generated': 7.807}

### Downstream Evaluation

#### Entities extraction with SciSpacy

In [None]:
# SciSpacy NER model (en_ner_bc5cdr_md) used to annotate the letters for the downstream task
nlp_scispacy = spacy.load("en_ner_bc5cdr_md")

  deserializers["tokenizer"] = lambda p: self.tokenizer.from_disk(  # type: ignore[union-attr]


In [None]:
# Annotate each text with the entities (and their labels) found by SciSpacy
def generate_annotation(texts):
    """Run `nlp_scispacy` on every text and collect the entities it finds.

    Args:
        texts: iterable of strings.

    Returns:
        One dict per text, in input order, with the keys 'text' (the input
        text) and 'entities' (a list of
        (start_char, end_char, label, entity_text) tuples).
    """
    def entities_of(text):
        return [
            (ent.start_char, ent.end_char, ent.label_, ent.text)
            for ent in nlp_scispacy(text).ents
        ]

    return [{'text': text, 'entities': entities_of(text)} for text in texts]

In [None]:
# Generate annotations for original and synthetic letters
# (SciSpacy's entities serve as reference annotations for the downstream NER task)
original_annotations = generate_annotation(original_texts)
synthetic_annotations = generate_annotation(generated_texts)

In [None]:
# Split in Train, Validation and Testing sets (70/10/20), following the dataset order

# Original annotation splits
train_idx = int(0.70*len(original_annotations))
valid_idx = int(0.80*len(original_annotations))
original_annotations_train = original_annotations[:train_idx]
original_annotations_valid = original_annotations[train_idx:valid_idx]
original_annotations_test = original_annotations[valid_idx:]

# Synthetic annotation splits
train_idx = int(0.70*len(synthetic_annotations))
valid_idx = int(0.80*len(synthetic_annotations))
synthetic_annotations_train = synthetic_annotations[:train_idx]
synthetic_annotations_valid = synthetic_annotations[train_idx:valid_idx]
synthetic_annotations_test = synthetic_annotations[valid_idx:]

#### Fine-tune Spacy on Original vs Synthetic letters

In [None]:
from spacy.tokens import DocBin
from tqdm import tqdm
from spacy.util import filter_spans

# Blank English pipeline, only used to tokenize texts when building DocBins
nlp_blank = spacy.blank('en')

def training_data_to_docBin(training_data):
  """Convert annotated texts into a spaCy DocBin.

  Args:
    training_data: iterable of dicts with 'text' and 'entities', the latter
      holding (start, end, label, entity_text) tuples.

  Returns:
    A DocBin with one Doc per item. The Doc is built with `nlp_blank`, and its
    entities are the spans that could be created from the character offsets,
    after they went through filter_spans().
  """
  doc_bin = DocBin()
  for item in tqdm(training_data):
    doc = nlp_blank.make_doc(item['text'])
    # char_span gives None when no span can be built from the offsets: those entities are skipped
    candidate_spans = (
      doc.char_span(start, end, label=label, alignment_mode="contract")
      for start, end, label, _ in item['entities']
    )
    doc.ents = filter_spans([span for span in candidate_spans if span is not None])
    doc_bin.add(doc)
  return doc_bin



##### On original letters

In [None]:
# Convert annotations data to SpaCy Docbin format (train and validation splits of the original letters)
original_annotations_training_docBin = training_data_to_docBin(original_annotations_train)
original_annotations_valid_docBin = training_data_to_docBin(original_annotations_valid)

# Save DocBin to disk for training (paths are relative to the working directory;
# they are the ones passed to `spacy train` below)
original_annotations_training_docBin.to_disk("original_annotations_train.spacy")
original_annotations_valid_docBin.to_disk("original_annotations_valid.spacy")

100%|██████████| 359/359 [00:03<00:00, 102.92it/s]
100%|██████████| 52/52 [00:00<00:00, 122.60it/s]


In [None]:
# Base spaCy training config stored in the project folder; it is completed by `spacy init fill-config`
BASE_CONFIG_PATH = f'{REPO_PATH}/SpacyConfig/base_config.cfg'

In [None]:
# Initialize SpaCy: complete the base config with default values and save it as ./config.cfg
!python -m spacy init fill-config {BASE_CONFIG_PATH} config.cfg

[38;5;2m✔ Auto-filled config with all values[0m
[38;5;2m✔ Saved config[0m
config.cfg
You can now add your data and train your pipeline:
python -m spacy train config.cfg --paths.train ./train.spacy --paths.dev ./dev.spacy


In [None]:
# Train SpaCy on the original letters (train/validation DocBins saved above);
# the trained pipelines are written to ./spacy_ft/on_original
!python -m spacy train config.cfg --output ./spacy_ft/on_original --paths.train ./original_annotations_train.spacy --paths.dev ./original_annotations_valid.spacy

[38;5;2m✔ Created output directory: spacy_ft/on_original[0m
[38;5;4mℹ Saving to output directory: spacy_ft/on_original[0m
[38;5;4mℹ Using CPU[0m
[38;5;4mℹ To switch to GPU 0, use the option: --gpu-id 0[0m
[1m
[38;5;2m✔ Initialized pipeline[0m
[1m
[38;5;4mℹ Pipeline: ['tok2vec', 'ner'][0m
[38;5;4mℹ Initial learn rate: 0.001[0m
E    #       LOSS TOK2VEC  LOSS NER  ENTS_F  ENTS_P  ENTS_R  SCORE 
---  ------  ------------  --------  ------  ------  ------  ------
  0       0          0.00    672.90    3.35    1.99   10.59    0.03
  0     200       5998.53  19164.98   71.88   80.13   65.18    0.72
  1     400        989.40   7488.35   77.86   81.02   74.95    0.78
  1     600        791.12   5445.82   80.59   87.23   74.88    0.81
  2     800        494.73   4455.62   80.88   82.09   79.71    0.81
  2    1000        606.48   4034.47   83.16   87.08   79.58    0.83
  3    1200        599.33   3612.88   81.66   83.37   80.02    0.82
  3    1400        635.50   3586.86   82.21 

In [None]:
# Load the 'model-best' pipeline written by the training on original letters
nlp_spacy_on_original = spacy.load("./spacy_ft/on_original/model-best")

# Get annotation predictions from trained Spacy on the test split of the original letters
original_examples = []
for sample in original_annotations_test:
  # ent[:3] keeps (start, end, label) and drops the entity text
  text, annotations = sample['text'],  [ent[:3] for ent in sample['entities']]
  prediction = nlp_spacy_on_original(text)
  # Pair the model's prediction with the reference entities for scoring
  example = Example.from_dict(prediction, {'entities': annotations})
  original_examples.append(example)

In [None]:
# Evaluate trained Spacy model: entity-level F-score, precision and recall, rounded to 3 decimals
scorer = Scorer()
original_scores = scorer.score(original_examples)

original_scores = {
    name: round(original_scores[key], 3)
    for name, key in (('f-score', 'ents_f'), ('precision', 'ents_p'), ('recall', 'ents_r'))
}
print('Original scores:', original_scores)

Original scores: {'f-score': 0.843, 'precision': 0.862, 'recall': 0.824}


In [None]:
# Save logs for downstream on Original data
# (appended to the results file of this configuration, one 'key=value' line per score)
with open(f'{TRAINED_MODEL_PATH}/{HP_FOLDER}/best_model_test_results.txt', 'a') as f:
  f.write('Downstream NER (Spacy trained on Original letters):\n' + ',\n'.join([f'\t{key}={value}' for key, value in original_scores.items()]) + '.\n')

##### On synthetic letters

In [None]:
# Convert annotations data to SpaCy Docbin format (train and validation splits of the synthetic letters)
synthetic_annotations_training_docBin = training_data_to_docBin(synthetic_annotations_train)
synthetic_annotations_valid_docBin = training_data_to_docBin(synthetic_annotations_valid)

# Save DocBin to disk for training (paths are relative to the working directory;
# they are the ones passed to `spacy train` below)
synthetic_annotations_training_docBin.to_disk("synthetic_annotations_train.spacy")
synthetic_annotations_valid_docBin.to_disk("synthetic_annotations_valid.spacy")

100%|██████████| 359/359 [00:02<00:00, 149.69it/s]
100%|██████████| 52/52 [00:00<00:00, 140.62it/s]


In [None]:
# Initialize SpaCy: complete the base config with default values and save it as ./config.cfg
# (same command as for the original letters; it overwrites the existing config.cfg)
!python -m spacy init fill-config {BASE_CONFIG_PATH} config.cfg

[38;5;2m✔ Auto-filled config with all values[0m
[38;5;2m✔ Saved config[0m
config.cfg
You can now add your data and train your pipeline:
python -m spacy train config.cfg --paths.train ./train.spacy --paths.dev ./dev.spacy


In [None]:
# Train SpaCy on the synthetic letters (train/validation DocBins saved above);
# the trained pipelines are written to ./spacy_ft/on_synthetic
!python -m spacy train config.cfg --output ./spacy_ft/on_synthetic --paths.train ./synthetic_annotations_train.spacy --paths.dev ./synthetic_annotations_valid.spacy

[38;5;2m✔ Created output directory: spacy_ft/on_synthetic[0m
[38;5;4mℹ Saving to output directory: spacy_ft/on_synthetic[0m
[38;5;4mℹ Using CPU[0m
[38;5;4mℹ To switch to GPU 0, use the option: --gpu-id 0[0m
[1m
[38;5;2m✔ Initialized pipeline[0m
[1m
[38;5;4mℹ Pipeline: ['tok2vec', 'ner'][0m
[38;5;4mℹ Initial learn rate: 0.001[0m
E    #       LOSS TOK2VEC  LOSS NER  ENTS_F  ENTS_P  ENTS_R  SCORE 
---  ------  ------------  --------  ------  ------  ------  ------
  0       0          0.00    655.00    0.00    0.00    0.00    0.00
  0     200       1555.93  16983.06   70.54   79.76   63.22    0.71
  1     400        529.64   6100.80   79.62   81.15   78.14    0.80
  1     600        532.02   4715.21   80.37   84.91   76.30    0.80
  2     800        487.64   3967.02   81.12   82.49   79.79    0.81
  2    1000        556.11   3469.69   82.86   85.53   80.35    0.83
  3    1200        532.65   3164.40   82.41   81.59   83.25    0.82
  3    1400        567.29   3211.69   83.9

In [None]:
# Load the 'model-best' pipeline written by the training on synthetic letters
nlp_spacy_on_synthetic = spacy.load("./spacy_ft/on_synthetic/model-best")

# Get annotation predictions from trained Spacy
# The loop runs over original_annotations_test: the model trained on synthetic letters is
# scored on the same original test split as the model trained on original letters.
synthetic_examples = []
for sample in original_annotations_test:
  # ent[:3] keeps (start, end, label) and drops the entity text
  text, annotations = sample['text'],  [ent[:3] for ent in sample['entities']]
  prediction = nlp_spacy_on_synthetic(text)
  # Pair the model's prediction with the reference entities for scoring
  example = Example.from_dict(prediction, {'entities': annotations})
  synthetic_examples.append(example)

In [None]:
# Evaluate trained Spacy model: entity-level F-score, precision and recall, rounded to 3 decimals
scorer = Scorer()
synthetic_scores = scorer.score(synthetic_examples)

synthetic_scores = {
    name: round(synthetic_scores[key], 3)
    for name, key in (('f-score', 'ents_f'), ('precision', 'ents_p'), ('recall', 'ents_r'))
}
print('Synthetic scores:', synthetic_scores)

Synthetic scores: {'f-score': 0.844, 'precision': 0.863, 'recall': 0.827}


In [None]:
# Save logs for downstream on Synthetic data
# (appended to the results file of this configuration, one 'key=value' line per score)
with open(f'{TRAINED_MODEL_PATH}/{HP_FOLDER}/best_model_test_results.txt', 'a') as f:
  f.write('Downstream NER (Spacy trained on Synthetic letters):\n' + ',\n'.join([f'\t{key}={value}' for key, value in synthetic_scores.items()]) + '.\n')