In [59]:
# install presidio evaluator via pip if not yet installed

!pip install presidio-evaluator azure-ai-textanalytics  cryptography==43.0 kaleido==0.2.1



In [60]:
from pathlib import Path
from pprint import pprint
from collections import Counter
from typing import Dict, List
import json

from presidio_evaluator import InputSample
from presidio_evaluator.evaluation import SpanEvaluator, ModelError, Plotter
from presidio_evaluator.experiment_tracking import get_experiment_tracker

import pandas as pd

pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)


## 1. Load dataset from file

In [61]:
dataset_name = "synth_dataset_v2.json"
dataset = InputSample.read_dataset_json(dataset_name)
print(len(dataset))

tokenizing input: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 6000/6000 [01:16<00:00, 78.86it/s]

6000





This dataset was auto generated. See more info here [Synthetic data generation](1_Generate_data.ipynb).

In [62]:
def get_entity_counts(dataset: List[InputSample]) -> Dict:
    """Return a dictionary with counter per entity type."""
    entity_counter = Counter()
    for sample in dataset:
        for tag in sample.tags:
            entity_counter[tag] += 1
    return entity_counter


## 2. Simple dataset statistics

In [63]:
entity_counts = get_entity_counts(dataset)
print("Count per entity:")
pprint(entity_counts.most_common(), compact=True)

print("\nMin and max number of tokens in dataset: "\
f"Min: {min([len(sample.tokens) for sample in dataset])}, "\
f"Max: {max([len(sample.tokens) for sample in dataset])}")

print(f"Min and max sentence length in dataset: " \
f"Min: {min([len(sample.full_text) for sample in dataset])}, "\
f"Max: {max([len(sample.full_text) for sample in dataset])}")

print("\nExample InputSample:")
print(dataset[0])

Count per entity:
[('O', 162466), ('STREET_ADDRESS', 28228), ('PERSON', 14779), ('GPE', 5791),
 ('ORGANIZATION', 5575), ('CREDIT_CARD', 4878), ('PHONE_NUMBER', 3135),
 ('DATE_TIME', 2397), ('EMAIL_ADDRESS', 1854), ('DOMAIN_NAME', 1708),
 ('TITLE', 1425), ('IBAN_CODE', 974), ('NRP', 661), ('US_SSN', 560),
 ('IP_ADDRESS', 512), ('ZIP_CODE', 487), ('AGE', 474),
 ('US_DRIVER_LICENSE', 137)]

Min and max number of tokens in dataset: Min: 3, Max: 407
Min and max sentence length in dataset: Min: 9, Max: 813

Example InputSample:
Full text: The address of Persint is 6750 Koskikatu 25 Apt. 864
Artilleros
, CO
 Uruguay 64677
Spans: [Span(type: STREET_ADDRESS, value: 6750 Koskikatu 25 Apt. 864
Artilleros
, CO
 Uruguay 64677, char_span: [26: 83]), Span(type: ORGANIZATION, value: Persint, char_span: [15: 22])]



In [64]:
print("A few examples sentences containing each entity:\n")
for entity in entity_counts.keys():
    samples = [sample for sample in dataset if entity in set(sample.tags)]
    if len(samples) > 1 and entity != "O":
        print(f"Entity: <{entity}> two example sentences:\n"
              f"\n1) {samples[0].full_text}"
              f"\n2) {samples[1].full_text}"
              f"\n------------------------------------\n")

A few examples sentences containing each entity:

Entity: <ORGANIZATION> two example sentences:

1) The address of Persint is 6750 Koskikatu 25 Apt. 864
Artilleros
, CO
 Uruguay 64677
2) T h e   a d d r e s s   o f   P e r s i n t   i s   6 7 5 0   K o s k i k a t u   2 5   A p t .   8 6 4 
 A r t i l l e r o s 
 ,   C O 
   U r u g u a y   6 4 6 7 7
------------------------------------

Entity: <STREET_ADDRESS> two example sentences:

1) The address of Persint is 6750 Koskikatu 25 Apt. 864
Artilleros
, CO
 Uruguay 64677
2) T h e   a d d r e s s   o f   P e r s i n t   i s   6 7 5 0   K o s k i k a t u   2 5   A p t .   8 6 4 
 A r t i l l e r o s 
 ,   C O 
   U r u g u a y   6 4 6 7 7
------------------------------------

Entity: <PERSON> two example sentences:

1) KrisztiÃ¡n SzÃ¶llÃ¶sy listed his top 20 songs for Entertainment Weekly and had the balls to list this song at #15. (What did he put at #1 you ask? Answer:"Tube Snake Boogie" by Szabina J GelencsÃ©r ×’â‚¬â€œ go figure)
2) K

## 3. Define the AnalyzerEngine object
Using Presidio with default parameters (not recommended, it's used here for simplicity). For an example on customization, see [notebook 5](5_Evaluate_Custom_Presidio_Analyzer.ipynb)

In [65]:
from typing import List, Dict, Optional
from transformers import pipeline
from presidio_evaluator import InputSample, span_to_tag
from presidio_evaluator.models import BaseModel

class CustomBertNERWrapper(BaseModel):
    def __init__(
        self,
        model_path_or_name: str,
        entity_mapping: Optional[Dict[str, str]] = None
    ):
        super().__init__(entity_mapping=entity_mapping)
        self.name = "Custom BERT NER Model"
        self.entity_mapping = entity_mapping or {}

        print(f"Loading BERT model: {model_path_or_name} ...")
        # ðŸ’¡ Core trick: aggregation_strategy="simple"
        # BERT splits words into subwords. This parameter automatically merges B- and I- tags,
        # directly outputting the start and end character indices of the full entity, saving a lot of manual stitching.
        self.pipeline = pipeline("ner", model=model_path_or_name, aggregation_strategy="simple")

    def predict(self, sample: InputSample, **kwargs) -> List[str]:
        # 1. Let the BERT model infer on the text
        predictions = self.pipeline(sample.full_text)

        if predictions:
            raw_tags = set([p["entity_group"] for p in predictions])
        starts = []
        ends = []
        scores = []
        tags = []

        # 2. Parse the output of the BERT pipeline
        # Output format is typically: [{'entity_group': 'PER', 'score': 0.99, 'start': 0, 'end': 5}, ...]
        for pred in predictions:
            starts.append(pred["start"])
            ends.append(pred["end"])
            scores.append(float(pred["score"]))

            # 3. Label mapping (if BERT outputs 'PER' and your dataset calls it 'PERSON', this converts it)
            raw_label = pred["entity_group"]
            mapped_label = self.entity_mapping.get(raw_label, raw_label)
            tags.append(mapped_label)

        # 4. Convert spans to Token-level tags required by the Evaluator
        return span_to_tag(
            scheme="IO",
            text=sample.full_text,
            starts=starts,
            ends=ends,
            tokens=sample.tokens,
            scores=scores,
            tags=tags,
        )

    # Implement batch_predict method to satisfy BaseModel abstract class requirements
    def batch_predict(self, dataset: List[InputSample], **kwargs) -> List[List[str]]:

        return [self.predict(sample, **kwargs) for sample in dataset]


# ==========================================
# Instantiation and Configuration
# ==========================================

# 1. Define label mapping (Very Important!)
# Assuming your BERT model was trained on datasets like CoNLL-2003, it might output PER, LOC, ORG
bert_entity_mapping = {
    "PER": "PERSON",
    "LOC": "LOCATION",
    "ORG": "ORGANIZATION",
    "MISC": "O" # Ignore irrelevant entities
}

# 2. Instantiate model wrapper
# If using a locally finetuned model, fill in the folder path, e.g., "./my_finetuned_bert"
# If using an open-source model from Hugging Face, fill in the ID, e.g., "dslim/bert-base-NER"
bert_model = CustomBertNERWrapper(
    model_path_or_name="dslim/bert-base-NER",
    entity_mapping=bert_entity_mapping
)

Loading BERT model: dslim/bert-base-NER ...


Loading weights:   0%|          | 0/199 [00:00<?, ?it/s]

BertForTokenClassification LOAD REPORT from: dslim/bert-base-NER
Key                      | Status     |  | 
-------------------------+------------+--+-
bert.pooler.dense.weight | UNEXPECTED |  | 
bert.pooler.dense.bias   | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


In [66]:
import pandas as pd
import numpy as np

summary_data = []

# Use optimized results variable v2
current_evaluator = evaluator_v2
current_results = evaluation_results_v2

# 1. Extract all obfuscation types
obs_types = set([
    sample.metadata.get("obfuscation_type")
    for sample in dataset
    if sample.metadata and "obfuscation_type" in sample.metadata
])

if any(s.metadata is None or "obfuscation_type" not in s.metadata for s in dataset):
    obs_types.add("Original/None")

# 2. Group calculation by obfuscation type
for obs_type in obs_types:

    filtered_results = []
    for res, sample in zip(current_results, dataset):
        current_type = sample.metadata.get("obfuscation_type") if sample.metadata else "Original/None"
        if current_type == obs_type:
            filtered_results.append(res)

    if not filtered_results:
        continue

    score = current_evaluator.calculate_score(filtered_results)

    p = score.pii_precision
    r = score.pii_recall
    f1 = score.pii_f

    # Calculate F2 Score (Beta=2) manually
    beta = 2
    if (beta**2 * p + r) > 0:
        f2 = (1 + beta**2) * (p * r) / ((beta**2 * p) + r)
    else:
        f2 = 0.0

    summary_data.append({
        "Obfuscation Type": obs_type,
        "Sample Count": len(filtered_results),
        "Precision": p,
        "Recall": r,
        "F1 Score": f1,
        "F2 Score": f2
    })

# 5. Convert to pretty DataFrame and print
df_summary = pd.DataFrame(summary_data)

# Sort by F2 Score descending
df_summary = df_summary.sort_values(by="F2 Score", ascending=False)

# Format percentage display
cols_to_format = ["Precision", "Recall", "F1 Score", "F2 Score"]
for col in cols_to_format:
    df_summary[col] = df_summary[col].apply(lambda x: f"{x:.2%}")

print("\n====== Obfuscation Types comparison (Piiranha-v1 Optimized) - with F2 Score \n")
display(df_summary)





Unnamed: 0,Obfuscation Type,Sample Count,Precision,Recall,F1 Score,F2 Score
3,,1500,59.04%,42.36%,44.90%,44.90%
1,5-space,1500,53.49%,41.93%,43.83%,43.83%
0,textualization,1500,36.48%,23.38%,25.19%,25.19%
2,1-space,1500,17.35%,3.96%,4.68%,4.68%


In [67]:
bert_entity_mapping = {
    "PER": "PERSON",      # BERT's PER corresponds to our PERSON
    "LOC": "LOCATION",    # BERT's LOC corresponds to our LOCATION
    "ORG": "ORGANIZATION",
    "MISC": "O"           # We don't care about MISC (e.g. events, works), label as O
}

bert_model = CustomBertNERWrapper(
    model_path_or_name="dslim/bert-base-NER",
    entity_mapping=bert_entity_mapping
)


print("Starting model evaluation...")

# Initialize Evaluator
evaluator = SpanEvaluator(model=bert_model)

# Pass the test set and evaluate (using the dataset list here)
evaluation_results = evaluator.evaluate_all(dataset=dataset)
results = evaluator.calculate_score(evaluation_results)

# Calculate and get confusion matrix
entities, confmatrix = results.to_confusion_matrix()

# Print formatted output as in the original notebook
print("\nConfusion matrix:")
print(pd.DataFrame(confmatrix, columns=entities, index=entities))

print("\nPrecision and recall")
pprint(results)

Loading BERT model: dslim/bert-base-NER ...


Loading weights:   0%|          | 0/199 [00:00<?, ?it/s]

BertForTokenClassification LOAD REPORT from: dslim/bert-base-NER
Key                      | Status     |  | 
-------------------------+------------+--+-
bert.pooler.dense.weight | UNEXPECTED |  | 
bert.pooler.dense.bias   | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


Starting model evaluation...
Mapping entity values using this dictionary: {'PER': 'PERSON', 'LOC': 'LOCATION', 'ORG': 'ORGANIZATION', 'MISC': 'O'}
Running model CustomBertNERWrapper on dataset...



skip words not provided, using default skip words. If you want the evaluation to not use skip words, pass skip_words=[]



Finished running model on dataset

Confusion matrix:
                   AGE  CREDIT_CARD  DATE_TIME  DOMAIN_NAME  EMAIL_ADDRESS  \
AGE                  0            0          0            0              0   
CREDIT_CARD          0            0          0            0              0   
DATE_TIME            0            0          0            0              0   
DOMAIN_NAME          0            0          0            0              0   
EMAIL_ADDRESS        0            0          0            0              0   
GPE                  0            0          0            0              0   
IBAN_CODE            0            0          0            0              0   
IP_ADDRESS           0            0          0            0              0   
LOCATION             0            0          0            0              0   
NRP                  0            0          0            0              0   
ORGANIZATION         0            0          0            0              0   
PERSON     

In [68]:
# Calculate and get confusion matrix
entities, confmatrix = results.to_confusion_matrix()

# Print formatted output
print("\nConfusion matrix:")
print(pd.DataFrame(confmatrix, columns=entities, index=entities))

print("\nPrecision and recall")
pprint(results)


Confusion matrix:
                   AGE  CREDIT_CARD  DATE_TIME  DOMAIN_NAME  EMAIL_ADDRESS  \
AGE                  0            0          0            0              0   
CREDIT_CARD          0            0          0            0              0   
DATE_TIME            0            0          0            0              0   
DOMAIN_NAME          0            0          0            0              0   
EMAIL_ADDRESS        0            0          0            0              0   
GPE                  0            0          0            0              0   
IBAN_CODE            0            0          0            0              0   
IP_ADDRESS           0            0          0            0              0   
LOCATION             0            0          0            0              0   
NRP                  0            0          0            0              0   
ORGANIZATION         0            0          0            0              0   
PERSON               0            0          

In [69]:
import os
from pathlib import Path
from presidio_evaluator.evaluation import Plotter

# 1. Set output folder path for charts
# Recommended to create in current working directory (cwd) for easy viewing and downloading in Colab
output_folder = Path(Path.cwd(), "plotter_output")

# Ensure folder exists, create if not to prevent FileNotFoundError
output_folder.mkdir(parents=True, exist_ok=True)

print(f"Preparing to generate plots, model name: {evaluator.model.name}")

# 2. Instantiate Plotter
# beta = 2 means calculating F2-Score (In PII detection, we usually care more about recall, so beta=2 is appropriate)
plotter = Plotter(
    results=results, # Pass results generated by evaluator.calculate_score()
    model_name=evaluator.model.name,
    save_as="svg",   # Save as high-res vector image
    beta=2
)

# 3. Generate and save plots
plotter.plot_scores(output_folder=output_folder)

print(f"âœ… Plots generated! Check folder in file browser: {output_folder.absolute()}")

Preparing to generate plots, model name: Custom BERT NER Model


âœ… Plots generated! Check folder in file browser: /content/plotter_output


The default bert model is not enough of supported enetity types. We need a better one.

In [70]:
# 1. Advanced label mapping for Piiranha-v1 model
# Corrected keys based on previous Confusion Matrix to match actual model output
piiranha_entity_mapping = {
    "GIVENNAME": "PERSON",
    "SURNAME": "PERSON",
    "CITY": "LOCATION",
    "GPE": "LOCATION", # If model occasionally outputs GPE
    "STREET": "STREET_ADDRESS",
    "BUILDINGNUM": "STREET_ADDRESS", # Building number is part of address
    "CREDITCARDNUMBER": "CREDIT_CARD",
    "EMAIL": "EMAIL_ADDRESS",
    "TELEPHONENUM": "PHONE_NUMBER",
    "SOCIALNUM": "US_SSN",
    "ZIPCODE": "ZIP_CODE",
    "DATEOFBIRTH": "DATE_TIME",
    "DRIVERLICENSENUM": "US_DRIVER_LICENSE",
    "ACCOUNTNUM": "O",
    "USERNAME": "O",
    "PASSWORD": "O",
    "IDCARDNUM": "O", # Or map to other ID type
    "TAXNUM": "O"
}

# 2. Instantiate this advanced PII-specific model
# Note: Downloading this model might take some time
advanced_pii_model = CustomBertNERWrapper(
    model_path_or_name="iiiorg/piiranha-v1-detect-personal-information",
    entity_mapping=piiranha_entity_mapping
)

evaluator = SpanEvaluator(model=advanced_pii_model)

evaluation_results = evaluator.evaluate_all(dataset=dataset)
results = evaluator.calculate_score(evaluation_results)

Loading BERT model: iiiorg/piiranha-v1-detect-personal-information ...


Loading weights:   0%|          | 0/200 [00:00<?, ?it/s]


skip words not provided, using default skip words. If you want the evaluation to not use skip words, pass skip_words=[]



Mapping entity values using this dictionary: {'GIVENNAME': 'PERSON', 'SURNAME': 'PERSON', 'CITY': 'LOCATION', 'GPE': 'LOCATION', 'STREET': 'STREET_ADDRESS', 'BUILDINGNUM': 'STREET_ADDRESS', 'CREDITCARDNUMBER': 'CREDIT_CARD', 'EMAIL': 'EMAIL_ADDRESS', 'TELEPHONENUM': 'PHONE_NUMBER', 'SOCIALNUM': 'US_SSN', 'ZIPCODE': 'ZIP_CODE', 'DATEOFBIRTH': 'DATE_TIME', 'DRIVERLICENSENUM': 'US_DRIVER_LICENSE', 'ACCOUNTNUM': 'O', 'USERNAME': 'O', 'PASSWORD': 'O', 'IDCARDNUM': 'O', 'TAXNUM': 'O'}
Running model CustomBertNERWrapper on dataset...
Finished running model on dataset


In [71]:
print("====== Piiranha-v1 Evaluation Results ======")

# Calculate and get confusion matrix
entities, confmatrix = results.to_confusion_matrix()

# Print formatted output
print("\nConfusion matrix:")
print(pd.DataFrame(confmatrix, columns=entities, index=entities))

print("\nPrecision and recall")
pprint(results)


Confusion matrix:
                   AGE  CREDIT_CARD  DATE_TIME  DOMAIN_NAME  EMAIL_ADDRESS  \
AGE                  0            0          0            0              0   
CREDIT_CARD          0          182          0            0              0   
DATE_TIME            0            0         37            0              0   
DOMAIN_NAME          0            0          0            0              0   
EMAIL_ADDRESS        0            0          0            0              6   
GPE                  0            0          0            0              0   
IBAN_CODE            0            0          0            0              0   
IP_ADDRESS           0            0          0            0              0   
LOCATION             0            0          0            0              0   
NRP                  0            0          0            0              0   
ORGANIZATION         0            0          0            0              0   
PERSON               0            0          

In [72]:
import os
from pathlib import Path
from presidio_evaluator.evaluation import Plotter

# 1. Set output folder path for charts
# Recommended to create in current working directory (cwd) for easy viewing and downloading in Colab
output_folder = Path(Path.cwd(), "plotter_output")

# Ensure folder exists, create if not to prevent FileNotFoundError
output_folder.mkdir(parents=True, exist_ok=True)

print(f"Preparing to generate plots, model name: {evaluator.model.name}")

# 2. Instantiate Plotter
# beta = 2 means calculating F2-Score (In PII detection, we usually care more about recall, so beta=2 is appropriate)
plotter = Plotter(
    results=results, # Pass results generated by evaluator.calculate_score()
    model_name=evaluator.model.name,
    save_as="svg",   # Save as high-res vector image
    beta=2
)

# 3. Generate and save plots
plotter.plot_scores(output_folder=output_folder)

print(f"âœ… Plots generated! Check folder in file browser: {output_folder.absolute()}")

Preparing to generate plots, model name: Custom BERT NER Model


âœ… Plots generated! Check folder in file browser: /content/plotter_output


In [73]:
import pandas as pd
import numpy as np

summary_data = []

# 1. Extract all obfuscation types
# Iterate dataset to extract obfuscation_type from metadata
obs_types = set([
    sample.metadata.get("obfuscation_type")
    for sample in dataset
    if sample.metadata and "obfuscation_type" in sample.metadata
])

if any(s.metadata is None or "obfuscation_type" not in s.metadata for s in dataset):
    obs_types.add("Original/None")

# 2. Group calculation by obfuscation type
for obs_type in obs_types:

    filtered_results = []
    for res, sample in zip(evaluation_results, dataset):
        current_type = sample.metadata.get("obfuscation_type") if sample.metadata else "Original/None"
        if current_type == obs_type:
            filtered_results.append(res)

    if not filtered_results:
        continue

    score = evaluator.calculate_score(filtered_results)

    p = score.pii_precision
    r = score.pii_recall
    f1 = score.pii_f

    # Calculate F2 Score (Beta=2) manually
    beta = 2
    if (beta**2 * p + r) > 0:
        f2 = (1 + beta**2) * (p * r) / ((beta**2 * p) + r)
    else:
        f2 = 0.0

    summary_data.append({
        "Obfuscation Type": obs_type,
        "Sample Count": len(filtered_results),
        "Precision": p,
        "Recall": r,
        "F1 Score": f1,
        "F2 Score": f2
    })

# 5. Convert to pretty DataFrame and print
df_summary = pd.DataFrame(summary_data)

# Sort by F2 Score descending
df_summary = df_summary.sort_values(by="F2 Score", ascending=False)

# Format percentage display
cols_to_format = ["Precision", "Recall", "F1 Score", "F2 Score"]
for col in cols_to_format:
    df_summary[col] = df_summary[col].apply(lambda x: f"{x:.2%}")

print("\n====== Obfuscation Types comparison (Piiranha-v1) - with F2 Score \n")
display(df_summary)





Unnamed: 0,Obfuscation Type,Sample Count,Precision,Recall,F1 Score,F2 Score
3,,1500,59.04%,42.36%,44.90%,44.90%
1,5-space,1500,53.58%,42.00%,43.90%,43.90%
0,textualization,1500,36.36%,23.30%,25.10%,25.10%
2,1-space,1500,17.35%,3.96%,4.68%,4.68%


modified mapping ones:

In [74]:
from transformers import pipeline
from typing import List, Dict, Optional
from presidio_evaluator import InputSample, span_to_tag
from presidio_evaluator.models import BaseModel
from presidio_evaluator.evaluation import SpanEvaluator
import pandas as pd
from pprint import pprint

# 2. Redefine robust prediction class with logic to merge adjacent same-type entities
class AdvancedBertNERWrapper(BaseModel):
    def __init__(
        self,
        model_path_or_name: str,
        entity_mapping: Optional[Dict[str, str]] = None
    ):
        super().__init__(entity_mapping=entity_mapping)
        self.name = "Advanced Piiranha Model"
        self.entity_mapping = entity_mapping or {}

        print(f"Loading model: {model_path_or_name} ...")
        self.pipeline = pipeline("ner", model=model_path_or_name, aggregation_strategy="simple")

    def predict(self, sample: InputSample, **kwargs) -> List[str]:
        predictions = self.pipeline(sample.full_text)

        # --- New Logic: Merge adjacent same-type entities (e.g. First Name + Last Name -> Full Name) ---
        merged_predictions = []
        if predictions:
            # Sort by start position
            predictions = sorted(predictions, key=lambda x: x['start'])

            current_pred = None

            for pred in predictions:
                # Get mapped label
                raw_label = pred["entity_group"]
                mapped_label = self.entity_mapping.get(raw_label, raw_label)

                # Create a new processed object with mapped label
                processed_pred = pred.copy()
                processed_pred['mapped_label'] = mapped_label

                if current_pred is None:
                    current_pred = processed_pred
                else:
                    # Check if adjacent and same label (allow small gap like 1-2 chars)
                    is_same_label = (current_pred['mapped_label'] == mapped_label)
                    # Tolerance of 2 allows space in "John Smith"
                    is_adjacent = (processed_pred['start'] - current_pred['end']) <= 2

                    if is_same_label and is_adjacent and mapped_label != "O":
                        # Merge! Update end position and score (average)
                        current_pred['end'] = processed_pred['end']
                        current_pred['score'] = (current_pred['score'] + processed_pred['score']) / 2
                        # Concatenate words for debugging
                        current_pred['word'] += " " + processed_pred['word']
                    else:
                        # Do not merge, save current, start next
                        merged_predictions.append(current_pred)
                        current_pred = processed_pred

            # Add the last one
            if current_pred:
                merged_predictions.append(current_pred)

        # ----------------------------------------------------------------------

        starts = []
        ends = []
        scores = []
        tags = []

        for pred in merged_predictions:
            starts.append(pred["start"])
            ends.append(pred["end"])
            scores.append(float(pred["score"]))
            tags.append(pred['mapped_label'])

        return span_to_tag(
            scheme="IO",
            text=sample.full_text,
            starts=starts,
            ends=ends,
            tokens=sample.tokens,
            scores=scores,
            tags=tags,
        )

    def batch_predict(self, dataset: List[InputSample], **kwargs) -> List[List[str]]:
        return [self.predict(sample, **kwargs) for sample in dataset]

# 3. Re-instantiate and run evaluation
# Fine-tune mapping based on id2label if necessary
final_mapping = {
    "GIVENNAME": "PERSON",
    "SURNAME": "PERSON",
    "CITY": "LOCATION",
    "GPE": "LOCATION",
    "STREET": "STREET_ADDRESS",
    "BUILDINGNUM": "STREET_ADDRESS",
    "CREDITCARDNUMBER": "CREDIT_CARD",
    "EMAIL": "EMAIL_ADDRESS",
    "TELEPHONENUM": "PHONE_NUMBER",
    "SOCIALNUM": "US_SSN",
    "ZIPCODE": "ZIP_CODE",
    "DATEOFBIRTH": "DATE_TIME",
    "DRIVERLICENSENUM": "US_DRIVER_LICENSE",
    "ACCOUNTNUM": "O",
    "USERNAME": "O",
    "PASSWORD": "O",
    "IDCARDNUM": "O",
    "TAXNUM": "O"
}

advanced_model_v2 = AdvancedBertNERWrapper(
    model_path_or_name="iiiorg/piiranha-v1-detect-personal-information",
    entity_mapping=final_mapping
)

evaluator_v2 = SpanEvaluator(model=advanced_model_v2)
evaluation_results_v2 = evaluator_v2.evaluate_all(dataset=dataset)
results_v2 = evaluator_v2.calculate_score(evaluation_results_v2)

print("\n====== Optimized Piiranha-v1 Results ======")
entities, confmatrix = results_v2.to_confusion_matrix()
print(pd.DataFrame(confmatrix, columns=entities, index=entities))
pprint(results_v2)

Loading model: iiiorg/piiranha-v1-detect-personal-information ...


Loading weights:   0%|          | 0/200 [00:00<?, ?it/s]


skip words not provided, using default skip words. If you want the evaluation to not use skip words, pass skip_words=[]



Mapping entity values using this dictionary: {'GIVENNAME': 'PERSON', 'SURNAME': 'PERSON', 'CITY': 'LOCATION', 'GPE': 'LOCATION', 'STREET': 'STREET_ADDRESS', 'BUILDINGNUM': 'STREET_ADDRESS', 'CREDITCARDNUMBER': 'CREDIT_CARD', 'EMAIL': 'EMAIL_ADDRESS', 'TELEPHONENUM': 'PHONE_NUMBER', 'SOCIALNUM': 'US_SSN', 'ZIPCODE': 'ZIP_CODE', 'DATEOFBIRTH': 'DATE_TIME', 'DRIVERLICENSENUM': 'US_DRIVER_LICENSE', 'ACCOUNTNUM': 'O', 'USERNAME': 'O', 'PASSWORD': 'O', 'IDCARDNUM': 'O', 'TAXNUM': 'O'}
Running model AdvancedBertNERWrapper on dataset...
Finished running model on dataset

                   AGE  CREDIT_CARD  DATE_TIME  DOMAIN_NAME  EMAIL_ADDRESS  \
AGE                  0            0          0            0              0   
CREDIT_CARD          0          182          0            0              0   
DATE_TIME            0            0         37            0              0   
DOMAIN_NAME          0            0          0            0              0   
EMAIL_ADDRESS        0            0     

In [75]:
# Generate plots for optimized results (results_v2)
plotter_v2 = Plotter(
    results=results_v2,
    model_name="Advanced Piiranha Model (Merged)",
    save_as="svg",
    beta=2
)

output_folder_v2 = Path(Path.cwd(), "plotter_output_v2")
output_folder_v2.mkdir(parents=True, exist_ok=True)

plotter_v2.plot_scores(output_folder=output_folder_v2)

print(f"âœ… Optimized model plots generated: {output_folder_v2.absolute()}")

âœ… Optimized model plots generated: /content/plotter_output_v2


In [76]:
import pandas as pd
import numpy as np

print("====== Optimized Piiranha-v1 Detailed Metrics (with F2-Score) ======")

# Get Precision and Recall dictionaries
precision_dict = getattr(results_v2, 'entity_precision_dict', {})
recall_dict = getattr(results_v2, 'entity_recall_dict', {})

# Get all entity labels
all_entities = sorted(set(precision_dict.keys()) | set(recall_dict.keys()))

data = []

for entity in all_entities:
    p = precision_dict.get(entity, 0.0)
    r = recall_dict.get(entity, 0.0)

    # Handle NaN (some entities never predicted, Precision is NaN)
    if pd.isna(p):
        p = 0.0
    if pd.isna(r):
        r = 0.0

    # Calculate F1 (Beta=1)
    if (p + r) > 0:
        f1 = 2 * (p * r) / (p + r)
    else:
        f1 = 0.0

    # Calculate F2 (Beta=2, prioritize Recall)
    # Formula: (1 + beta^2) * (p * r) / ((beta^2 * p) + r)
    beta = 2
    if (beta**2 * p + r) > 0:
        f2 = (1 + beta**2) * (p * r) / ((beta**2 * p) + r)
    else:
        f2 = 0.0

    data.append({
        "Entity": entity,
        "Precision": p,
        "Recall": r,
        "F1 Score": f1,
        "F2 Score": f2
    })

# Create DataFrame
df_metrics = pd.DataFrame(data).set_index("Entity")

# Format display
format_mapping = {
    "Precision": "{:.2%}",
    "Recall": "{:.2%}",
    "F1 Score": "{:.2%}",
    "F2 Score": "{:.2%}"
}

print("\n--- Per-Entity Detailed Scores ---")
display(df_metrics.style.format(format_mapping))

# 2. Print confusion matrix
print("\n--- Confusion Matrix ---")
entities, confmatrix = results_v2.to_confusion_matrix()
print(pd.DataFrame(confmatrix, columns=entities, index=entities))


--- Per-Entity Detailed Scores ---


Unnamed: 0_level_0,Precision,Recall,F1 Score,F2 Score
Entity,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
AGE,0.00%,0.00%,0.00%,0.00%
CREDIT_CARD,98.91%,33.83%,50.42%,38.96%
DATE_TIME,92.50%,7.91%,14.57%,9.68%
DOMAIN_NAME,0.00%,0.00%,0.00%,0.00%
EMAIL_ADDRESS,23.08%,3.06%,5.41%,3.70%
GPE,0.00%,0.00%,0.00%,0.00%
IBAN_CODE,0.00%,0.00%,0.00%,0.00%
IP_ADDRESS,0.00%,0.00%,0.00%,0.00%
LOCATION,0.00%,0.00%,0.00%,0.00%
NRP,0.00%,0.00%,0.00%,0.00%



--- Confusion Matrix ---
                   AGE  CREDIT_CARD  DATE_TIME  DOMAIN_NAME  EMAIL_ADDRESS  \
AGE                  0            0          0            0              0   
CREDIT_CARD          0          182          0            0              0   
DATE_TIME            0            0         37            0              0   
DOMAIN_NAME          0            0          0            0              0   
EMAIL_ADDRESS        0            0          0            0              6   
GPE                  0            0          0            0              0   
IBAN_CODE            0            0          0            0              0   
IP_ADDRESS           0            0          0            0              0   
LOCATION             0            0          0            0              0   
NRP                  0            0          0            0              0   
ORGANIZATION         0            0          0            0              0   
PERSON               0            0   

In [77]:
import pandas as pd
import numpy as np

summary_data = []

# 1. Extract all obfuscation types
# Iterate dataset to extract obfuscation_type from metadata
obs_types = set([
    sample.metadata.get("obfuscation_type")
    for sample in dataset
    if sample.metadata and "obfuscation_type" in sample.metadata
])

# Include None for original data
if any(s.metadata is None or "obfuscation_type" not in s.metadata for s in dataset):
    obs_types.add("Original/None")

# 2. Group calculation by obfuscation type
for obs_type in obs_types:

    # Zip results and samples to filter by type
    filtered_results = []
    for res, sample in zip(evaluation_results_v2, dataset):
        current_type = sample.metadata.get("obfuscation_type") if sample.metadata else "Original/None"
        if current_type == obs_type:
            filtered_results.append(res)

    # Skip empty groups
    if not filtered_results:
        continue

    # 3. Calculate score for this subset
    score = evaluator.calculate_score(filtered_results)

    # 4. Save results
    summary_data.append({
        "Obfuscation Type": obs_type,
        "Sample Count": len(filtered_results),
        "Precision": score.pii_precision,
        "Recall": score.pii_recall,
        "F1 Score": score.pii_f
    })

# 5. Convert to DataFrame and print
df_summary = pd.DataFrame(summary_data)

# Sort by F1 Score descending
df_summary = df_summary.sort_values(by="F1 Score", ascending=False)

# Format as percentage
df_summary["Precision"] = df_summary["Precision"].apply(lambda x: f"{x:.2%}")
df_summary["Recall"] = df_summary["Recall"].apply(lambda x: f"{x:.2%}")
df_summary["F1 Score"] = df_summary["F1 Score"].apply(lambda x: f"{x:.2%}")

print("\n====== Obfuscation Types comparison\n")
# Display pretty HTML table in Jupyter
display(df_summary)





Unnamed: 0,Obfuscation Type,Sample Count,Precision,Recall,F1 Score
3,,1500,59.04%,42.36%,44.90%
1,5-space,1500,53.49%,41.93%,43.83%
0,textualization,1500,36.48%,23.38%,25.19%
2,1-space,1500,17.35%,3.96%,4.68%
