# **PIIranha for PI redaction**

The following notebook contains the initial usage of the PIIranha model to evaluate whether or not it is a good performer for the project.

Step 1: imports

In [8]:
!pip install trafilatura
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification
from trafilatura.core import *
from trafilatura import fetch_url
import re
import os



Step 2: load model

In [9]:
# Load PIIranha model
tokenizer = AutoTokenizer.from_pretrained("iiiorg/piiranha-v1-detect-personal-information")
model = AutoModelForTokenClassification.from_pretrained("iiiorg/piiranha-v1-detect-personal-information")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

DebertaV2ForTokenClassification(
  (deberta): DebertaV2Model(
    (embeddings): DebertaV2Embeddings(
      (word_embeddings): Embedding(251000, 768, padding_idx=0)
      (LayerNorm): LayerNorm((768,), eps=1e-07, elementwise_affine=True)
      (dropout): StableDropout()
    )
    (encoder): DebertaV2Encoder(
      (layer): ModuleList(
        (0-11): 12 x DebertaV2Layer(
          (attention): DebertaV2Attention(
            (self): DisentangledSelfAttention(
              (query_proj): Linear(in_features=768, out_features=768, bias=True)
              (key_proj): Linear(in_features=768, out_features=768, bias=True)
              (value_proj): Linear(in_features=768, out_features=768, bias=True)
              (pos_dropout): StableDropout()
              (dropout): StableDropout()
            )
            (output): DebertaV2SelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-07, elementwise_affine=Tr

Step 3: Define function for PII detection and redaction

In [10]:
# use model for PII detection and removal
def mask_pii(text, aggregate_redaction=True):
    # Tokenize input text
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Get the model predictions
    with torch.no_grad():
        outputs = model(**inputs)

    # Get the predicted labels
    predictions = torch.argmax(outputs.logits, dim=-1)

    # Convert token predictions to word predictions
    encoded_inputs = tokenizer.encode_plus(text, return_offsets_mapping=True, add_special_tokens=True)
    offset_mapping = encoded_inputs['offset_mapping']

    masked_text = list(text)
    is_redacting = False
    redaction_start = 0
    current_pii_type = ''
    count = 0

    for i, (start, end) in enumerate(offset_mapping):
        if start == end:  # Special token
            continue

        label = predictions[0][i].item()
        if label != model.config.label2id['O']:  # Non-O label
            pii_type = model.config.id2label[label]
            if not is_redacting:
                is_redacting = True
                redaction_start = start
                current_pii_type = pii_type
            elif not aggregate_redaction and pii_type != current_pii_type:
                # End current redaction and start a new one
                apply_redaction(masked_text, redaction_start, start, current_pii_type, aggregate_redaction)
                redaction_start = start
                current_pii_type = pii_type
                count += 1
        else:
            if is_redacting:
                apply_redaction(masked_text, redaction_start, end, current_pii_type, aggregate_redaction)
                is_redacting = False
                count += 1

    # Handle case where PII is at the end of the text
    if is_redacting:
        apply_redaction(masked_text, redaction_start, len(masked_text), current_pii_type, aggregate_redaction)
        count += 1

    return ''.join(masked_text), count

def apply_redaction(masked_text, start, end, pii_type, aggregate_redaction):
    for j in range(start, end):
        masked_text[j] = ''
    if aggregate_redaction:
        masked_text[start] = '[redacted]'
    else:
        masked_text[start] = f'[{pii_type}]'

Step 4: perform a Trafilatura extraction and PII removal on the ECE404 webpage as an example

In [11]:
# test a specific instance: the ECE 404 homepage
ece404html_content = fetch_url("https://engineering.purdue.edu/ece404/")
text = extract(ece404html_content, favor_recall=True) # favor_precision=True will cut out noise, favor_recall=True will keep more in
redacted_text, count = mask_pii(str(text), aggregate_redaction=False)
print("Total instances of PI detected in ECE404 homepage: " + str(count))
print(str(redacted_text))

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Total instances of PI detected in ECE404 homepage: 13
Instructor:[I-GIVENNAME][I-SURNAME]
Professor, ECE
E-mail:[I-EMAIL]You must place the string 'ece404' in the subject line to get past your instructor's notorious spam filter)
Graduate TAs:[I-GIVENNAME][I-SURNAME]-
E-mail:[I-USERNAME][I-EMAIL][I-USERNAME]
Joseph Wang
-
E-mail:[I-USERNAME][I-EMAIL]
Adrien Dubois
-
E-mail:[I-USERNAME][I-EMAIL][I-USERNAME]
Lecture Location and Time
-
TuTh: 6:00 PM - 7:15 PM, PHYS 112
Course Description
-
Beyond question, computer and network security has emerged as one of
the most important subjects of study in modern times. Even the minutest
details of our lives now depend on our computers and networks working
with our trust that the information that is private to us will not fall
in the hands of those with ill intent. The two major components of
computer and network security are cryptography and what is known as
systems-oriented security. For a good education in computer and network
security, you have

Step 5: mount subset of GovDocs1 dataset from Google Drive for testing (this section of code is not reproducible unless you download GovDocs data yourself at this link https://corp.digitalcorpora.org/corpora/files/govdocs1/threads/ )

In [12]:
from google.colab import drive
drive.mount('/content/drive')
govdocs_dir ='/content/drive/My Drive/ECE570/govdocs_testingdata'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Step 6: create a list of the html files from the dataset (again not reproducible without govdocs data downloaded locally, but there was no way to access these files without local download)

In [13]:
# Function to identify and read HTML files from the dataset
def find_html_files(directory):
    html_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            # Check if the file has an HTML extension
            if file.endswith(".html") or file.endswith(".htm"):
                html_files.append(os.path.join(root, file))
    return html_files

# Find HTML files in the dataset
html_files = find_html_files(govdocs_dir)

Step 7: for each html file from GovDocs1 dataset, extract content with Trafilatura and use the model to remove PII (again not reproducible if you have not downloaded a GovDocs1 thread and saved it to your Drive)

In [14]:
# helping with RAM memory errors
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
torch.cuda.empty_cache()

# Process the HTML content with Trafilatura's extract() function
def process_html(html_content, chunk_size=512):
    text = extract(html_content, favor_recall=True)
    if text:
        redacted_text = ""
        instances = 0
        for i in range(0, len(text), chunk_size):
            chunk = text[i:i + chunk_size]
            redacted_chunk, chunk_instances = mask_pii(chunk, aggregate_redaction=False)
            redacted_text += redacted_chunk
            instances += chunk_instances
        return redacted_text, instances
    else:
        return None

# Function to read the content of an HTML file
def read_html_file(file_path):
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        return f.read()

# for each HTML file, extract content and detect and remove PII
if html_files:
    i = 0
    total_instances = 0
    chunk_size = 512
    i = 0
    for file in html_files:
        html_content = read_html_file(file)
        redacted_content, instances = process_html(html_content)
        total_instances += instances
        if i == 13: # this was selected because it is an example of reasonably small enough length to display
            ex_for_print = redacted_content
        # if redacted_content:
        #     try:
        #         with open(f'./PIIranha_filtered_content/test{i}.txt', 'w', encoding='utf-8') as fp:
        #             fp.write(redacted_content)
        #     except Exception as e:
        #         print(f"Error writing to file: {e}")
        # else:
        #     print(f'Failed to extract text from {file}.')
        i += 1
    print("Total instances of PI detected in GovDocs subset: " + str(total_instances))
    print("Example: " + str(ex_for_print))
else:
    print("No HTML files found in the directory.")

Total instances of PI detected in GovDocs subset: 6574
Example: Statement of[I-USERNAME][I-SURNAME] nominated by president[I-USERNAME] be a member of the board of trustees of the[I-USERNAME]scholarship and excellence in national environmental policy foundation
To the united states senate committee on environment and public works
Submitted march 25, 2003
Mr. Chairman and Members of the Committee, thank you for the opportunity to provide this statement in support of my nomination to be a member of the Board of Trustees of the Morris K. Udall Foundation. I am honored and grateful that President Bush saw fit to nominate me to this position and, if confirmed, look forward to continuing my public service by helping to advance the mission of the Udall Foundation.
My professional career and personal background has provided me with valuable experience and perspective to bring to the Udall Foundation. I grew up in northern rural[I-CITY] to the shores of[I-CITY] Following my undergraduate educati

Step 8: Individual example testing of SSNs and phone numbers

In [15]:
text = "My name is John Deere. My SSN is 123-45-6789 and my phone number is (123) 456-7890."
redacted_text, count = mask_pii(text, aggregate_redaction=False)
print(redacted_text)
print("Total instances of PI detected: " + str(count))

My name is[I-GIVENNAME][I-SURNAME] My SSN is[I-SOCIALNUM] my phone number is[I-TELEPHONENUM]
Total instances of PI detected: 4


Step 9: download labeled dataset for accuracy testing

The dataset can be found here: https://huggingface.co/datasets/ai4privacy/pii-masking-400k

In [16]:
!pip install datasets
from datasets import load_dataset
ds = load_dataset("ai4privacy/pii-masking-400k")

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl (

README.md:   0%|          | 0.00/15.6k [00:00<?, ?B/s]

1en.jsonl:   0%|          | 0.00/84.8M [00:00<?, ?B/s]

de.jsonl:   0%|          | 0.00/82.7M [00:00<?, ?B/s]

es.jsonl:   0%|          | 0.00/42.5M [00:00<?, ?B/s]

fr.jsonl:   0%|          | 0.00/84.1M [00:00<?, ?B/s]

it.jsonl:   0%|          | 0.00/79.2M [00:00<?, ?B/s]

nl.jsonl:   0%|          | 0.00/38.4M [00:00<?, ?B/s]

1en.jsonl:   0%|          | 0.00/21.3M [00:00<?, ?B/s]

de.jsonl:   0%|          | 0.00/20.7M [00:00<?, ?B/s]

es.jsonl:   0%|          | 0.00/10.7M [00:00<?, ?B/s]

fr.jsonl:   0%|          | 0.00/21.1M [00:00<?, ?B/s]

it.jsonl:   0%|          | 0.00/19.8M [00:00<?, ?B/s]

data/validation/nl.jsonl:   0%|          | 0.00/9.67M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/325517 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/81379 [00:00<?, ? examples/s]

Step 10: Split the data, run through model, and compute accuracy based on location of redactions in the strings

In [17]:
# select 1,000 rows from the validation set as no test set is available
data = ds['validation']
data = data[0:1000]
y_true = data['masked_text']
X = data['source_text']
y_pred = []
for x in X:
  y = mask_pii(x, aggregate_redaction=False)
  y_pred.append(y)

def normalize_text(text):
    text = text.replace(" ", "")
    text = text.replace("\n", "")
    return text.lower()

# finds each redaction and adds the redaction and the characters to its left and right into a tuple
# returns list of the tuples
def extract_redactions_with_context(text):
    redactions_with_context = []

    # Use regular expression to find all redactions and their positions
    for match in re.finditer(r'\[.*?\]', text):
        start = match.start()
        end = match.end()
        left_char = text[start - 1] if start > 0 else ''  # Get the left character
        right_char = text[end] if end < len(text) else ''  # Get the right character
        redactions_with_context.append((match.group(0), left_char, right_char))  # Store redaction and its context

    return redactions_with_context

# wrote own accuracy score function to compute accuracy of redactions based on what characters are to
# the left and right of each [redacted] segment
def compute_accuracy(y_true, y_pred):
    # normalize both true and predicted texts by removing all spaces, newlines, and converting to lowercase
    y_true_normalized = [normalize_text(str(t)) for t in y_true]
    y_pred_normalized = [normalize_text(str(p[0])) for p in y_pred]
    total_redactions = 0
    correct_matches = 0

    for true_text, pred_text in zip(y_true_normalized, y_pred_normalized):
      # find each redaction and the characters to its left and right, returns list of tuples
      true_redactions = extract_redactions_with_context(true_text)
      pred_redactions = extract_redactions_with_context(pred_text)

      # count total redactions in the true text
      total_redactions += len(pred_redactions)

      # check surrounding characters of predicted redactions against true redactions
      i = 0
      for pred_redaction, pred_left, pred_right in pred_redactions:
        if i < len(true_redactions) and true_redactions[i][1] == pred_left: # correct character on the left
          # The start of the redaction is considered more important than the end
          correct_matches += 1
        elif pred_left == ']': # the case when there are two back to back redactions, so need to check the previous
            if i-1 >= 0 and i < len(true_redactions) and pred_redactions[i-1][1] == true_redactions[i][1]:
              correct_matches += .5
        elif i-1 >= 0 and i-1 < len(true_redactions) and pred_left == true_redactions[i-1][1]: # the case where two back to back redactions in true but pred only made one
          correct_matches += .5

        if i < len(true_redactions) and true_redactions[i][2] == pred_right: # correct character on the right
          correct_matches += .5
        elif pred_right == '[': # the case when there are two back to back redactions in pred, so need to check the next one
            if i+1 < len(pred_redactions) and i < len(true_redactions) and pred_redactions[i+1][2] == true_redactions[i][2]:
              correct_matches += .5
        elif i-1 >= 0 and i-1 < len(true_redactions) and pred_right == true_redactions[i-1][2]: # the case where two back to back redactions in true but pred only made one
          correct_matches += .5

        i += 1

    # compute accuracy as the number of correct matches over the total redactions
    accuracy = correct_matches / total_redactions if total_redactions > 0 else 0
    return accuracy

# Calculate accuracy
accuracy = compute_accuracy(y_true, y_pred)
print(f'Accuracy: {accuracy * 100:.2f}%')

Accuracy: 93.47%


Step 11: Quantify the predicted redactions of the character-based PII categories

In [24]:
# finds each redaction and adds the redaction to a list
def extract_redactions(text):
    redactions = []

    # Use regular expression to find all redactions and their positions
    for match in re.finditer(r'\[.*?\]', text):
        start = match.start()
        end = match.end()
        redactions.append(match.group(0))

    return redactions

def compute_redaction_counts(y_pred):
  ssns = 0
  phone_numbers = 0
  names = 0
  emails = 0
  credit_cards = 0
  bank_accounts = 0
  total_redactions = 0

  for pred_text in y_pred:
    redactions = extract_redactions(pred_text[0])
    total_redactions += len(redactions)
    for redaction in redactions:
      if 'NAME' in redaction:
        names += 1
      elif 'SOCIALNUM' in redaction:
        ssns += 1
      elif 'PHONE' in redaction:
        phone_numbers += 1
      elif 'EMAIL' in redaction:
        emails += 1
      elif 'CREDITCARD' in redaction:
        credit_cards += 1
      elif 'ACCOUNTNUM' in redaction:
        bank_accounts += 1

  return total_redactions, ssns, phone_numbers, names, emails, credit_cards, bank_accounts

total_redactions, ssns, phone_numbers, names, emails, credit_cards, bank_accounts = compute_redaction_counts(y_pred)
print("Total redactions: " + str(total_redactions))
print("SSNs: " + str(ssns))
print("Phone numbers: " + str(phone_numbers))
print("Names: " + str(names))
print("Emails: " + str(emails))
print("Credit cards: " + str(credit_cards))
print("Bank accounts: " + str(bank_accounts))

Total redactions: 3753
SSNs: 123
Phone numbers: 230
Names: 1106
Emails: 256
Credit cards: 88
Bank accounts: 119


In [19]:
!pip freeze > requirements.txt