In [None]:
pip install transformers datasets

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl 

In [None]:
pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Downloading evaluate-0.4.3-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: evaluate
Successfully installed evaluate-0.4.3


In [None]:
pip install seqeval

Collecting seqeval
  Downloading seqeval-1.2.2.tar.gz (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: seqeval
  Building wheel for seqeval (setup.py) ... [?25l[?25hdone
  Created wheel for seqeval: filename=seqeval-1.2.2-py3-none-any.whl size=16161 sha256=aa72d4a667299692988f597a6bc0f798f7dd62bcd72519972ea3d4d4ddea0aa5
  Stored in directory: /root/.cache/pip/wheels/1a/67/4a/ad4082dd7dfc30f2abfe4d80a2ed5926a506eb8a972b4767fa
Successfully built seqeval
Installing collected packages: seqeval
Successfully installed seqeval-1.2.2


In [None]:
import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from transformers import (
    RobertaTokenizerFast,
    RobertaForTokenClassification,
    DataCollatorForTokenClassification,
    Trainer,
    TrainingArguments
)
from datasets import Dataset

### Data Preparation

In [None]:
import evaluate

# Load the "seqeval" metric once at module level; compute_metrics() below
# calls metric.compute(...) on the decoded tag sequences.
metric = evaluate.load("seqeval")

def compute_metrics(pred):
    """
    Turn raw model outputs into entity-level scores.

    Args:
        pred: A pair that unpacks into (predictions, labels). The class ids are
            obtained with an argmax over axis 2 of predictions.

    Returns:
        dict: "precision", "recall", "f1" and "accuracy", taken from the
            "overall_*" entries returned by the module-level `metric`
    """
    scores_per_class, gold_ids = pred
    predicted_ids = np.argmax(scores_per_class, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        kept_predictions = []
        kept_labels = []
        for predicted_id, gold_id in zip(predicted_row, gold_row):
            # Positions labelled -100 are excluded from scoring.
            if gold_id == -100:
                continue
            kept_predictions.append(id2label[predicted_id])
            kept_labels.append(id2label[gold_id])
        true_predictions.append(kept_predictions)
        true_labels.append(kept_labels)

    results = metric.compute(predictions=true_predictions, references=true_labels)

    summary = {}
    summary["precision"] = results["overall_precision"]
    summary["recall"] = results["overall_recall"]
    summary["f1"] = results["overall_f1"]
    summary["accuracy"] = results["overall_accuracy"]
    return summary


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading builder script:   0%|          | 0.00/6.34k [00:00<?, ?B/s]

In [None]:
def prepare_ner_data(data, entity_labels=None):
    """
    Convert a table of labelled identifiers into BIO-tagged NER examples.

    Each row's 'Identifier' value is lower-cased and split on whitespace. When
    the row's 'Tag' is one of the entity labels, the first word is tagged
    'B-<Tag>' and the remaining words 'I-<Tag>'; otherwise every word is
    tagged 'O'.

    Rows whose identifier is missing (NaN/None) or contains no words are
    skipped. Previously a missing value raised on `.lower()`, and an empty
    identifier with an entity tag produced one tag for zero tokens.
    Non-string identifiers (e.g. numeric columns) are converted with str().

    Args:
        data (pandas.DataFrame): Table with 'Identifier' and 'Tag' columns
        entity_labels (iterable of str, optional): Tags treated as entities.
            Defaults to the nine PHI tags used by this notebook.

    Returns:
        list: One dict per kept row with 'tokens' (list of words) and
            'ner_tags' (list of tags, same length as 'tokens')
    """
    if entity_labels is None:
        entity_labels = ('PHONE', 'SSN', 'EMAIL', 'ADDRESS', 'DATE', 'MRN', 'MBI', 'ACCOUNT', 'PERSON')
    entity_labels = tuple(entity_labels)

    annotated_data = []

    # Iterating the two columns directly avoids building a Series per row.
    for identifier, label in zip(data['Identifier'], data['Tag']):
        if pd.isna(identifier):
            continue

        words = str(identifier).lower().split()
        if not words:
            # Nothing to tag; emitting tags here would desynchronise the lists.
            continue

        if label in entity_labels:
            ner_tags = ['B-' + label] + ['I-' + label] * (len(words) - 1)
        else:
            ner_tags = ['O'] * len(words)

        annotated_data.append({'tokens': words, 'ner_tags': ner_tags})

    return annotated_data


In [None]:
def load_and_preprocess_data(filepath):
    """
    Read a CSV of labelled identifiers and build NER examples plus tag lookups.

    Args:
        filepath (str): Path handed to pandas.read_csv

    Returns:
        tuple: (annotated_data, label2id, id2label). annotated_data is the
            output of prepare_ner_data; label2id maps every tag found in it to
            its position in sorted order; id2label is the inverse mapping.
    """
    annotated_data = prepare_ner_data(pd.read_csv(filepath))

    # Distinct tags in sorted order, so ids are deterministic for a given tag set.
    tag_vocabulary = sorted(
        {tag for example in annotated_data for tag in example['ner_tags']}
    )

    label2id = {tag: position for position, tag in enumerate(tag_vocabulary)}
    id2label = dict(enumerate(tag_vocabulary))

    return annotated_data, label2id, id2label

In [None]:
def split_data(annotated_data, test_size=0.3, val_size=0.5, random_state=42):
    """
    Split data into disjoint train, validation, and test sets

    The test set is carved off first; the validation set is then taken from
    what remains. With the defaults this yields roughly 35% train, 35%
    validation and 30% test.

    Args:
        annotated_data (list): Preprocessed NER data
        test_size (float): Proportion of data for test set
        val_size (float): Proportion of remaining data for validation
        random_state (int): Random seed for reproducibility

    Returns:
        tuple: Train, validation, and test datasets
    """
    # First split: separate test set
    remaining_data, test_data = train_test_split(
        annotated_data,
        test_size=test_size,
        random_state=random_state
    )

    # Second split: separate validation from the remaining (non-test) data.
    # This must not split test_data: doing so makes train and validation
    # subsets of the test set and throws away the rest of the data.
    train_data, val_data = train_test_split(
        remaining_data,
        test_size=val_size,
        random_state=random_state
    )

    return train_data, val_data, test_data

In [None]:
def tokenize_and_align_labels(examples, tokenizer, label2id, max_length=128):
    """
    Tokenize input and align labels

    Args:
        examples (dict): Batch of dataset examples with "tokens" (lists of
            words) and "ner_tags" (lists of tag strings)
        tokenizer (PreTrainedTokenizer): Tokenizer to use; its output must
            provide word_ids(batch_index=...)
        label2id (dict): Mapping of labels to ids
        max_length (int): Length every sequence is truncated/padded to.
            Defaults to 128, the value previously hard-coded here.

    Returns:
        dict: Tokenized inputs with aligned labels stored under "labels"
    """
    tokenized_inputs = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True,
        padding="max_length",
        max_length=max_length,
    )

    labels = []
    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)  # Map subwords to original word indices
        label_ids = []
        previous_word_idx = None

        for word_idx in word_ids:
            if word_idx is None:
                # Position with no source word (padding; presumably special
                # tokens as well) -> excluded from the loss via -100.
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                # First subword of a word carries the word's tag.
                # NOTE: a tag missing from label2id silently falls back to id 0.
                label_ids.append(label2id.get(label[word_idx], 0))
            else:
                # Later subwords of the same word are ignored for NER tagging.
                label_ids.append(-100)
            previous_word_idx = word_idx

        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
 # Load and preprocess data: read the labelled CSV and build the examples
 # plus the tag <-> id lookup tables (label2id / id2label) used further down.
annotated_data, label2id, id2label = load_and_preprocess_data("PHI_synthetic_data_2.csv")

In [None]:
  # Split data into train / validation / test lists using split_data's defaults
train_data, val_data, test_data = split_data(annotated_data)

In [None]:
# Create datasets: one column-oriented Dataset ('tokens', 'ner_tags') per split,
# built in train / validation / test order.
train_dataset, val_dataset, test_dataset = (
    Dataset.from_dict({
        'tokens': [example['tokens'] for example in split_items],
        'ner_tags': [example['ner_tags'] for example in split_items],
    })
    for split_items in (train_data, val_data, test_data)
)


In [None]:
# Load tokenizer
tokenizer = RobertaTokenizerFast.from_pretrained("roberta-base", add_prefix_space=True)

# Tokenize datasets: every split goes through the same batched mapping, and the
# original columns are dropped so only the tokenizer outputs and labels remain.
tokenized_train_dataset, tokenized_val_dataset, tokenized_test_dataset = (
    split_dataset.map(
        lambda batch: tokenize_and_align_labels(batch, tokenizer, label2id),
        batched=True,
        remove_columns=split_dataset.column_names,
    )
    for split_dataset in (train_dataset, val_dataset, test_dataset)
)


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

Map:   0%|          | 0/15450 [00:00<?, ? examples/s]

Map:   0%|          | 0/15450 [00:00<?, ? examples/s]

Map:   0%|          | 0/30900 [00:00<?, ? examples/s]

### Model Training and Evaluation

In [None]:
# Configure model: start from the "roberta-base" checkpoint with one output
# label per entry in label2id, and pass both tag mappings alongside the count.
model = RobertaForTokenClassification.from_pretrained(
    "roberta-base",
    num_labels=len(label2id),
    id2label=id2label,
    label2id=label2id
)

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForTokenClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Prepare data collator: batches tokenized examples for the Trainer using the
# same tokenizer that produced them.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)


In [None]:
# Training arguments with more detailed logging.
# Evaluation and checkpointing both happen once per epoch, and the checkpoint
# with the best eval_loss is reloaded when training finishes.
# NOTE(review): newer transformers releases appear to rename
# `evaluation_strategy` to `eval_strategy` — confirm against the installed version.
training_args = TrainingArguments(
    output_dir="./model/results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    # Was ".model/logs" (a hidden directory); keep logs next to output_dir.
    logging_dir="./model/logs",
    logging_steps=50,
    seed=42,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    report_to="none",
)




In [None]:
# Initialize Trainer with custom metrics: trains on the tokenized training
# split, evaluates on the tokenized validation split, and uses compute_metrics
# for precision / recall / F1 / accuracy.

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# Train the model; as the cell's last expression, the returned TrainOutput is displayed.
trainer.train()

  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,0.0049,0.0028,0.999555,0.999555,0.999555,0.999631
2,0.0003,0.004801,0.999406,0.999406,0.999406,0.999499
3,0.0002,0.004437,0.999406,0.999406,0.999406,0.999499


TrainOutput(global_step=2898, training_loss=0.04101341146991181, metrics={'train_runtime': 682.0093, 'train_samples_per_second': 67.961, 'train_steps_per_second': 4.249, 'total_flos': 3028241525644800.0, 'train_loss': 0.04101341146991181, 'epoch': 3.0})

In [None]:
# Persist the trained model to ./PHI_NER_RoBERTa (relative to the working directory).
# NOTE(review): the inference section loads from a Google Drive path instead —
# presumably this folder is copied there manually; confirm.
trainer.save_model("./PHI_NER_RoBERTa")

In [None]:
# Test dataset evaluation: run the trainer on the tokenized test split.
test_results = trainer.evaluate(eval_dataset=tokenized_test_dataset)

# Report the loss plus the entity-level scores produced by compute_metrics
# (read back here under an "eval_" prefix).
print("Test Results:")
print(f"Loss: {test_results['eval_loss']}")
print(f"F1 Score: {test_results['eval_f1']}")
print(f"Precision: {test_results['eval_precision']}")
print(f"Recall: {test_results['eval_recall']}")

Test Results:
Loss: 0.0014725603396072984
F1 Score: 0.999777753083676
Precision: 0.999777753083676
Recall: 0.999777753083676


### PHI NER on Documents

In [None]:
# Colab-only: mount Google Drive at /content/drive so the saved model and the
# input document referenced below can be read from it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import pipeline

# Load your trained NER model: weights and tokenizer live in the same directory.
MODEL_DIR = "/content/drive/MyDrive/NEU/NLP/Project/model/PHI_NER_RoBERTa"
model = AutoModelForTokenClassification.from_pretrained(MODEL_DIR)
tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR)

# Initialize NER pipeline
nlp_ner = pipeline("ner", model=model, tokenizer=tokenizer)

In [None]:
def read_file_to_list(filepath):
    """Load a text file as a list of whitespace-stripped lines.
    Args:
        filepath: The path to the file.
    Returns:
        A list with one string per line of the file (leading and trailing
        whitespace removed), or None when the file does not exist; in that
        case an error message is printed.
    """
    try:
        with open(filepath, 'r') as handle:
            # strip() also drops the trailing newline of each line
            return [raw_line.strip() for raw_line in handle]
    except FileNotFoundError:
        print(f"Error: File not found at {filepath}")
        return None

# Example usage: read the test document from the mounted Drive folder.
file_content = read_file_to_list("/content/drive/MyDrive/NEU/NLP/Project/test_phi_document.txt")

# read_file_to_list returns None for a missing file, and an empty list is falsy
# too, so the count is only printed when at least one line was read.
if file_content:
    # Now 'file_content' contains a list of strings, where each string is a line
    print(f"Number of lines read: {len(file_content)}")

Number of lines read: 63


In [None]:
# Display the whole list of lines that was read (bare expression -> cell output).
file_content

['Patient Medical Report',
 'Patient Information:',
 '',
 'Name: Johnathan Doe',
 '',
 'DOB: 1982-04-15',
 '',
 'Social Security Number:798-77-0047',
 '',
 'Contact Number: (270) 720-7109',
 '',
 'Email Address: johndoe@example.com',
 '',
 'Medical Record Number: MRN-7894-5671',
 '',
 'Medicare Beneficiary Identifier: 1EG4-TE5-MK73',
 '',
 'Patient Account Number: BANK-2262-20691',
 '',
 'Address: 23 Elm Street, Springfield, IL 62704',
 '',
 '',
 'Medical History:',
 '',
 'Known Allergies: Penicillin, Pollen',
 'Chronic Conditions: Type 2 Diabetes, Hypertension',
 '',
 'Past Surgeries:',
 'Appendectomy: 2010-05-12',
 'Knee Replacement: 2018-11-20',
 '',
 '',
 'Current Visit Details:',
 'Visit Date: 2023-12-07',
 'Chief Complaint: Persistent chest pain and shortness of breath',
 'Preliminary Diagnosis: Angina',
 '',
 'Physician:',
 'Dr. Emily Hernandez, MD',
 'EMAIL: emilyhernandez@hospitalcare.com',
 '',
 'Prescriptions:',
 'Medication Name: Metformin',
 'Dosage: 500 mg',
 'Frequency: 

In [None]:
# Show each document line followed by the raw token-level NER predictions for it.
for line in file_content:
    print(line)
    print(nlp_ner(line))
    print("\n")

Patient Medical Report
[]


Patient Information:
[]



[]


Name: Johnathan Doe
[{'entity': 'B-PERSON', 'score': 0.9989774, 'index': 1, 'word': 'ĠName', 'start': 0, 'end': 4}, {'entity': 'I-PERSON', 'score': 0.9601431, 'index': 2, 'word': ':', 'start': 4, 'end': 5}, {'entity': 'I-PERSON', 'score': 0.9992855, 'index': 3, 'word': 'ĠJohn', 'start': 6, 'end': 10}, {'entity': 'I-PERSON', 'score': 0.9992447, 'index': 4, 'word': 'athan', 'start': 10, 'end': 15}, {'entity': 'I-PERSON', 'score': 0.9993339, 'index': 5, 'word': 'ĠDoe', 'start': 16, 'end': 19}]



[]


DOB: 1982-04-15
[{'entity': 'B-DATE', 'score': 0.9998252, 'index': 1, 'word': 'ĠDO', 'start': 0, 'end': 2}, {'entity': 'I-DATE', 'score': 0.99969447, 'index': 2, 'word': 'B', 'start': 2, 'end': 3}, {'entity': 'I-DATE', 'score': 0.9996916, 'index': 3, 'word': ':', 'start': 3, 'end': 4}, {'entity': 'I-DATE', 'score': 0.9997398, 'index': 4, 'word': 'Ġ1982', 'start': 5, 'end': 9}, {'entity': 'I-DATE', 'score': 0.999721, 'index': 5, 'wor

### Anonymizing Documents

In [None]:
# Entity types (without the B-/I- prefix) that get replaced in the output text.
ENTITIES_TO_MASK = {"PERSON", "EMAIL", "ADDRESS", "DATE", "PHONE", "SSN", "MRN", "MBI", "ACCOUNT"}


def _merge_entity_spans(line, tokens):
    """Collapse token-level predictions into one character span per entity."""
    spans = []
    current = None

    for token in tokens:
        # Pipelines running with an aggregation strategy report 'entity_group'
        # instead of 'entity'; accept either.
        raw_label = token.get('entity') or token.get('entity_group') or ''
        # Extract the main label after the B-/I- prefix if present
        label = raw_label.split('-', 1)[1] if '-' in raw_label else raw_label

        if label not in ENTITIES_TO_MASK:
            # A token that is not maskable closes any open span.
            if current is not None:
                spans.append(current)
                current = None
            continue

        start, end = token['start'], token['end']

        # Extend the open span only when the label matches AND nothing but
        # whitespace lies between it and this token. The pipeline output shown
        # in this notebook contains no entries for non-entity text, so a
        # non-whitespace gap means ordinary text sits between two entities;
        # merging across it would mask that text as well.
        extends_current = (
            current is not None
            and label == current['label']
            and not line[current['end']:start].strip()
        )
        if extends_current:
            current['end'] = end
        else:
            if current is not None:
                spans.append(current)
            current = {'label': label, 'start': start, 'end': end}

    # Close off the last entity if still open
    if current is not None:
        spans.append(current)

    spans.sort(key=lambda span: span['start'])
    return spans


def mask_entities_in_line(line, ner_pipeline):
    """
    Replace every detected PHI entity in a line with "**********[LABEL]".

    Args:
        line (str): Text to anonymize
        ner_pipeline (callable): Called as ner_pipeline(line); must return a
            list of dicts with 'start', 'end' and 'entity' (or 'entity_group')

    Returns:
        str: The line with each maskable entity span replaced. Blank lines and
            lines with no predictions are returned unchanged.
    """
    if not line.strip():
        return line

    outputs = ner_pipeline(line)
    if not outputs:
        return line

    # Stitch together the untouched text between spans and the mask markers.
    pieces = []
    cursor = 0
    for span in _merge_entity_spans(line, outputs):
        pieces.append(line[cursor:span['start']])
        pieces.append("**********[" + span['label'] + "]")
        cursor = span['end']
    pieces.append(line[cursor:])

    return "".join(pieces)

def mask_document(input_filepath, output_filepath, ner_pipeline):
    """
    Write an anonymized copy of a text document.

    Args:
        input_filepath: Path of the document to read (via read_file_to_list)
        output_filepath: Path the masked document is written to
        ner_pipeline: NER callable forwarded to mask_entities_in_line

    Returns:
        None. Nothing is written when the input file could not be read.
    """
    source_lines = read_file_to_list(input_filepath)
    if source_lines is not None:
        with open(output_filepath, 'w') as sink:
            # One masked line per input line, each terminated by a newline.
            sink.writelines(
                mask_entities_in_line(text, ner_pipeline) + "\n"
                for text in source_lines
            )




In [None]:
# Anonymize the test document from Drive; the masked copy is written to
# output_document.txt in the current working directory.
mask_document("/content/drive/MyDrive/NEU/NLP/Project/test_phi_document.txt", "output_document.txt", nlp_ner)