# PII Masking

In [36]:
import pandas as pd
from pprint import pprint
import textwrap
from spacy.matcher import Matcher
import pandas as pd
import spacy
from spacy.training.example import Example
import random
import re
import spacy
from spacy.training.example import Example
import pandas as pd
from spacy.util import minibatch


# Load the JSON dataset
df = pd.read_json('datasets/6_dataset_full.json')

# Display the first few rows of the dataframe
df.head()

Unnamed: 0,id,text,to_mask
0,1,Hej! Jag heter Anna Svensson och vill flytta m...,"[{'label': '[NAMN]', 'value': 'Anna Svensson'}..."
1,2,"Jag har nyligen flyttat till Storgatan 12, 123...","[{'label': '[ADRESS]', 'value': 'Storgatan 12,..."
2,3,Jag vill ändra mina kontaktuppgifter. Mitt nya...,"[{'label': '[TELEFONNUMMER]', 'value': '070-12..."
3,4,Jag försökte logga in med mitt passnummer AB12...,"[{'label': '[PASSNUMMER]', 'value': 'AB1234567..."
4,5,Jag behöver uppdatera mitt bankkontonummer. De...,"[{'label': '[BANKKONTONUMMER]', 'value': 'SE98..."


In [37]:
# Display the first example with text and to_mask columns
first_example = df[['text', 'to_mask']].iloc[0]

# Split the text into multiple lines for better readability
first_example['text'] = '\n'.join(textwrap.wrap(first_example['text'], width=80))
print("Text:")
print(first_example['text'])
print("\nTo Mask:")
pprint(first_example['to_mask'])

Text:
Hej! Jag heter Anna Svensson och vill flytta min pension till er. Behöver jag
uppge mitt personnummer 19921212-5678 redan nu eller räcker det med
organisationsnumret 556677-8899 som jag fått från min arbetsgivare?

To Mask:
[{'label': '[NAMN]', 'value': 'Anna Svensson'},
 {'label': '[PERSONNUMMER]', 'value': '19921212-5678'},
 {'label': '[ORG-NUMMER]', 'value': '556677-8899'}]


In [None]:


# ------------------------------
# Step 0: Load Data and Preprocessing
# ------------------------------
df = pd.read_json('datasets/6_dataset_full.json')

def clean_text(text):
    """Clean and normalize text."""
    return str(text).strip()

def find_entity_spans(text, value):
    """
    Use regex to robustly find entity spans in text.
    This finds the first occurrence of the entity 'value'
    in a case-insensitive manner.
    """
    pattern = re.escape(value)
    match = re.search(pattern, text, flags=re.IGNORECASE)
    if match:
        return match.start(), match.end()
    return None

TRAIN_DATA = []
for _, row in df.iterrows():
    text = clean_text(row['text'])
    to_mask = row['to_mask']
    entities = []
    for item in to_mask:
        label = item['label']
        value = str(item['value']).strip()
        span = find_entity_spans(text, value)
        if span:
            start, end = span
            entities.append((start, end, label))
    if entities:
        TRAIN_DATA.append((text, {"entities": entities}))

# ------------------------------
# Step 1: Split Data
# ------------------------------
train_size = int(0.8 * len(TRAIN_DATA))
train_data = TRAIN_DATA[:train_size]
valid_data = TRAIN_DATA[train_size:]

# ------------------------------
# Step 2: Create and Configure the Model
# ------------------------------
nlp = spacy.blank("sv")  # Create a blank Swedish model

# Add NER pipeline if not already present
if "ner" not in nlp.pipe_names:
    ner = nlp.add_pipe("ner", last=True)
else:
    ner = nlp.get_pipe("ner")

# Add labels from the training data to the NER component
for _, annotations in train_data:
    for start, end, label in annotations["entities"]:
        ner.add_label(label)

# ------------------------------
# Step 3: Train the Model Using Batches with Dropout
# ------------------------------
optimizer = nlp.begin_training()
n_iter = 20  # Number of iterations (epochs)
batch_size = 16

for itn in range(n_iter):
    random.shuffle(train_data)
    batches = minibatch(train_data, size=batch_size)
    losses = {}
    for batch in batches:
        examples = []
        for text, annotations in batch:
            doc = nlp.make_doc(text)
            examples.append(Example.from_dict(doc, annotations))
        nlp.update(examples, sgd=optimizer, drop=0.3, losses=losses)
    print(f"Iteration {itn + 1}/{n_iter} - Losses: {losses}")

# ------------------------------
# Step 4: Define Improved Masking Function
# ------------------------------
def mask_pii(text, model):
    """
    Mask entities in the text with their label names.
    Replaces entities starting from the end to prevent offset issues.
    """
    doc = model(text)
    # Create spans (start, end, label) from detected entities
    spans = [(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents]
    # Sort spans in reverse order by start index
    spans = sorted(spans, key=lambda x: x[0], reverse=True)
    masked_text = text
    for start, end, label in spans:
        masked_text = masked_text[:start] + label + masked_text[end:]
    return masked_text

# ------------------------------
# Step 5: Evaluate and Print Combined Results
# ------------------------------
correct_texts = 0
correct_labels = 0
total_texts = 0
total_labels = 0
failed_labels = []

print("\n=== Evaluation on Validation Data ===\n")
for text, annotations in valid_data:
    masked_text = mask_pii(text, nlp)
    
    # Print the original and masked text for inspection
    print("Original Text:")
    print(text)
    print("Masked Text:")
    print(masked_text)
    print("-" * 40)
    
    text_correct = True
    # Evaluate each entity label in the annotation
    for start, end, label in annotations["entities"]:
        total_labels += 1
        # Check if the label appears in the masked text.
        if label in masked_text:
            correct_labels += 1
        else:
            text_correct = False
            failed_labels.append((label, text[start:end]))
    
    if text_correct:
        correct_texts += 1
    total_texts += 1

# Print out details of failed maskings
if failed_labels:
    print("\nFAILED MASKINGS:")
    for label, value in failed_labels:
        print(f"Label: {label}, Expected Value: {value}")
else:
    print("\nAll entities were successfully masked in every text!")

# Calculate and print overall accuracies
text_accuracy = correct_texts / total_texts if total_texts > 0 else 0
label_accuracy = correct_labels / total_labels if total_labels > 0 else 0

print(f"\nText Accuracy (all entities in a text must be masked correctly): {text_accuracy:.2%}")
print(f"Label Accuracy (individual entity masking): {label_accuracy:.2%}")




Iteration 1/20 - Losses: {'ner': np.float32(6193.1616)}
Iteration 2/20 - Losses: {'ner': np.float32(1474.3865)}
Iteration 3/20 - Losses: {'ner': np.float32(1422.373)}
Iteration 4/20 - Losses: {'ner': np.float32(1176.7241)}
Iteration 5/20 - Losses: {'ner': np.float32(1007.9409)}
