<a href="https://www.kaggle.com/code/scr0ll0/modeling-pii-data-detection?scriptVersionId=160750625" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

We will be using Presidio: a Microsoft framework for PII detection. Credit to pjmathematician for both the idea and the baseline code: https://www.kaggle.com/code/pjmathematician/pii-eda-presidio-baseline

# Downloading and Loading Libraries

In [1]:
# Run mode switch used by the cells below:
#   True  -> hold out part of train.json, predict on that hold-out and print the F-beta score.
#   False -> build the lists from all of train.json, predict on test.json and write submission.csv.
VALIDATE = True

In [2]:
%%capture
!pip install presidio_analyzer --no-index --find-links=file:///kaggle/input/presidio-wheels/presidio

In [3]:
import ast
import json
import numpy as np
import pandas as pd
import spacy
from tqdm.auto import tqdm
from dateutil import parser
from presidio_analyzer import AnalyzerEngine, EntityRecognizer, PatternRecognizer, Pattern, RecognizerResult
from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer
from presidio_analyzer.nlp_engine import NlpArtifacts, NlpEngineProvider
from presidio_analyzer.predefined_recognizers import PhoneRecognizer
from presidio_analyzer.recognizer_registry import RecognizerRegistry
from sklearn.model_selection import train_test_split

# Loading Data

Credit to both moth and Valentin Werner for the new training dataset: https://www.kaggle.com/code/valentinwerner/fix-punctuation-tokenization-external-dataset

In [4]:
# Labelled training documents; later cells read their 'tokens' and 'labels' columns.
train = pd.read_json('/kaggle/input/pii-detection-removal-from-educational-data/train.json')
# Documents to predict on when VALIDATE is False.
test = pd.read_json('/kaggle/input/pii-detection-removal-from-educational-data/test.json')

# Functions

Sources:

tokens2index, find_or_next_larger, count_trailing_whitespaces, is_valid_date:

pjmathematician: https://www.kaggle.com/code/pjmathematician/pii-eda-presidio-baseline

leonshangguan: https://www.kaggle.com/code/leonshangguan/modify-of-pii-detect-study

pii_fbeta_score:  

Amed: https://www.kaggle.com/code/amedprof/pii-evaluation-metric/notebook

In [5]:
def tokens2index(row):
    """Locate every token of a document inside its raw text.

    Parameters:
    - row: Mapping-like record (e.g. a DataFrame row) holding a 'tokens'
      sequence of strings and the 'full_text' string they come from.

    Returns:
    - tuple(list, list): Start offsets and exclusive end offsets of each token
      within 'full_text', in token order.

    Raises:
    - ValueError: If a token does not occur at or after the end of the
      previous token.
    """
    full_text = row['full_text']
    start_ind = []
    end_ind = []
    cursor = 0
    for tok in row['tokens']:
        # Search from the cursor in place; slicing the text first would copy
        # the whole remaining tail once per token.
        start = full_text.index(tok, cursor)
        end = start + len(tok)
        start_ind.append(start)
        end_ind.append(end)
        # Tokens are matched left to right, so the next search resumes here.
        cursor = end
    return start_ind, end_ind

In [6]:
def find_or_next_larger(arr, target):
    """Binary-search a sorted sequence for ``target``.

    Returns the index of an element equal to ``target`` when one is hit during
    the search; otherwise the index of the first element greater than
    ``target``, which is ``len(arr)`` when every element is smaller.
    """
    lo = 0
    hi = len(arr) - 1

    while lo <= hi:
        middle = (lo + hi) // 2
        probe = arr[middle]

        if probe == target:
            return middle
        if probe < target:
            # Everything up to and including `middle` is too small.
            lo = middle + 1
        else:
            hi = middle - 1

    # The window collapsed without a hit: `lo` is the insertion point.
    return lo

In [7]:
def count_trailing_whitespaces(word):
    """Return how many whitespace characters sit at the end of ``word``."""
    trimmed = word.rstrip()
    return len(word) - len(trimmed)

In [8]:
def is_valid_date(text):
    """Report whether ``text`` can be parsed as a date by ``dateutil``.

    Parameters:
    - text: Candidate string, handed to ``parser.parse`` unchanged.

    Returns:
    - bool: True when parsing succeeds, False when it raises.
    """
    try:
        parser.parse(text)
    except Exception:
        # Any parsing failure means "not a date". Unlike a bare `except:`,
        # this still lets KeyboardInterrupt and SystemExit propagate.
        return False
    return True

In [9]:
def pii_fbeta_score(pred_df, gt_df,beta=5):
    """
    Compute the micro F-beta score of token-level PII predictions.

    Both frames are outer-joined on ('document', 'token') and every joined row
    is counted once:
    - TP: predicted and ground-truth labels are both present and equal.
    - FN: no prediction for a ground-truth row, or the two labels differ.
    - FP: a prediction with no ground-truth row.

    Parameters:
    - pred_df (DataFrame): Predicted PII labels; needs 'document', 'token' and 'label' columns.
    - gt_df (DataFrame): Ground truth PII labels; needs 'document', 'token' and 'label' columns.
    - beta (float): The beta parameter for the F-beta score, controlling the trade-off between precision and recall.

    Returns:
    - float: Micro F-beta score, or 0.0 when there is nothing to score
      (no true positives, false negatives or false positives).
    """
    merged = pred_df.merge(gt_df, how='outer', on=['document', "token"], suffixes=('_pred', '_gt'))

    pred_missing = merged['label_pred'].isna()
    gt_missing = merged['label_gt'].isna()
    labels_agree = merged['label_gt'] == merged['label_pred']

    is_tp = ~pred_missing & ~gt_missing & labels_agree
    # A missing prediction always counts as a miss, and so does a wrong label.
    is_fn = pred_missing | (~gt_missing & ~labels_agree)
    is_fp = gt_missing & ~pred_missing

    TP = int(is_tp.sum())
    FN = int(is_fn.sum())
    FP = int(is_fp.sum())

    beta_sq = beta ** 2
    denominator = ((1 + beta_sq) * TP) + (beta_sq * FN) + FP
    if denominator == 0:
        # Nothing predicted and nothing to find: avoid 0/0 (NaN plus a warning).
        return 0.0

    return float((1 + beta_sq) * TP / denominator)

# Modeling

In [10]:
if VALIDATE:
    # Hold out a quarter of the labelled documents for scoring; the fixed
    # random_state keeps the split the same from run to run.
    y = train['labels']
    x = pd.DataFrame(train).drop(columns='labels')
    train_x, val_x, train_y, val_y = train_test_split(
        x,
        y,
        test_size=0.25,
        random_state=0,
    )

In [11]:
# Token lists filled by the "Training" cell below from the labelled documents.
# Tokens labelled 'O'; passed to analyzer.analyze(allow_list=...).
ALLOW_LIST = []
# Tokens labelled B-EMAIL; deny list of the custom email recognizer.
DENY_LIST_EMAIL = []
# Tokens labelled B-/I-STREET_ADDRESS; deny list of the custom address recognizer.
DENY_LIST_ADDRESS = []
# Tokens labelled B-/I-URL_PERSONAL; deny list of the custom URL recognizer.
DENY_LIST_URL = []
# Tokens labelled B-/I-NAME_STUDENT, B-/I-PHONE_NUM and B-/I-ID_NUM respectively.
# NOTE(review): these three are filled but not handed to any recognizer in the cells shown here.
DENY_LIST_NAME = []
DENY_LIST_PHONE = []
DENY_LIST_ID = []

In [12]:
#"Training"
# Walk over every labelled token once and file it into the allow list (label 'O')
# or into the deny list of its PII type. Each list ends up with unique tokens in
# first-seen order, so re-running this cell does not grow the lists.

# Which labels feed which list. Labels not mentioned here are ignored.
LABEL_TARGETS = (
    (('O',), ALLOW_LIST),
    (('B-EMAIL',), DENY_LIST_EMAIL),
    (('B-STREET_ADDRESS', 'I-STREET_ADDRESS'), DENY_LIST_ADDRESS),
    (('B-URL_PERSONAL', 'I-URL_PERSONAL'), DENY_LIST_URL),
    (('B-NAME_STUDENT', 'I-NAME_STUDENT'), DENY_LIST_NAME),
    (('B-PHONE_NUM', 'I-PHONE_NUM'), DENY_LIST_PHONE),
    (('B-ID_NUM', 'I-ID_NUM'), DENY_LIST_ID),
)

if VALIDATE:
    token_docs, label_docs = train_x['tokens'], train_y
else:
    token_docs, label_docs = train['tokens'], train['labels']

# Flatten the per-document lists directly; going through a wide padded
# DataFrame just to stack it again is slow and memory hungry.
tokens = [tok for doc_tokens in token_docs for tok in doc_tokens]
labels = [lab for doc_labels in label_docs for lab in doc_labels]
assert len(tokens) == len(labels), "tokens and labels are misaligned"

# Single pass: label -> tokens carrying that label, in document order.
tokens_by_label = {}
for tok, lab in zip(tokens, labels):
    tokens_by_label.setdefault(lab, []).append(tok)

for label_group, target in LABEL_TARGETS:
    # Only membership in these lists matters downstream, so keep one copy of
    # each token; `seen` also covers anything already present in the list.
    seen = set(target)
    for lab in label_group:
        for tok in tokens_by_label.get(lab, ()):
            if tok not in seen:
                seen.add(tok)
                target.append(tok)

Credit to leonshangguan for the Pattern Recognizers: https://www.kaggle.com/code/leonshangguan/modify-of-pii-detect-study/notebook?scriptVersionId=159973328 


In [13]:
# Street addresses: a house number, one or more words, then a street-type abbreviation.
# Tokens seen with a STREET_ADDRESS label in training are additionally matched via the deny list.
address_regex = r'\b\d+\s+\w+(\s+\w+)*\s+((st(\.)?)|(ave(\.)?)|(rd(\.)?)|(blvd(\.)?)|(ln(\.)?)|(ct(\.)?)|(dr(\.)?))\b'
address_pattern = Pattern(name="address", regex=address_regex, score=0.5)
address_recognizer = PatternRecognizer(supported_entity="ADDRESS_CUSTOM", patterns = [address_pattern], context=["st", "Apt"], deny_list=DENY_LIST_ADDRESS)

# E-mail addresses. The top-level domain class is [A-Za-z]; the previous
# [A-Z|a-z] also accepted a literal '|' because '|' is not an alternation
# operator inside a character class.
email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
email_pattern = Pattern(name="email address", regex=email_regex, score=0.5)
email_recognizer = PatternRecognizer(supported_entity="EMAIL_CUSTOM", patterns = [email_pattern], deny_list=DENY_LIST_EMAIL)

# URLs: anything starting with http://, https:// or www. up to the next whitespace.
url_regex = r'https?://\S+|www\.\S+'
url_pattern = Pattern(name="url", regex=url_regex, score=0.5)
url_recognizer = PatternRecognizer(supported_entity="URL_CUSTOM", patterns = [url_pattern], deny_list=DENY_LIST_URL)

# Presidio's phone recognizer with a custom list of context words.
phone_recognizer = PhoneRecognizer(context=['phone', 'number', 'telephone', 'cell',
                                            'cellphone', 'mobile', 'call', 'ph',
                                            'tel', 'Email'])

Credit to Microsoft for the Numbers Recognizer, which flags number-like tokens, including numbers written out as words such as "Fifty": https://microsoft.github.io/presidio/tutorial/03_rule_based/

In [14]:
class NumbersRecognizer(EntityRecognizer):
    """Recognizer that reports every spaCy token flagged as number-like."""

    # Score attached to every result produced by this recognizer.
    expected_confidence_level = 0.7

    def load(self) -> None:
        """Nothing has to be loaded for this recognizer."""

    def analyze(self, text: str, entities: list[str], nlp_artifacts: NlpArtifacts) -> list[RecognizerResult]:
        """
        Scan the text's tokens for numbers (either 123 or One Two Three).

        Returns one "NUMBER" result per token whose `like_num` flag is set,
        spanning exactly that token.
        """
        return [
            RecognizerResult(
                entity_type="NUMBER",
                start=token.idx,
                end=token.idx + len(token),
                score=self.expected_confidence_level,
            )
            for token in nlp_artifacts.tokens
            if token.like_num
        ]

new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])

More credit to Microsoft for the engine configs: https://microsoft.github.io/presidio/analyzer/customizing_nlp_models/


In [15]:
# NLP backend for Presidio: spaCy with the large English model.
configuration = {
    "nlp_engine_name": "spacy",
    "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
}
# The provider turns the configuration into the engine object handed to AnalyzerEngine below.
provider = NlpEngineProvider(nlp_configuration=configuration)
nlp_engine = provider.create_engine()

In [16]:
# Start from Presidio's predefined recognizers, then register the custom ones
# in the same order as before.
registry = RecognizerRegistry()
registry.load_predefined_recognizers()
for custom_recognizer in (
    address_recognizer,
    email_recognizer,
    url_recognizer,
    new_numbers_recognizer,
    phone_recognizer,
):
    registry.add_recognizer(custom_recognizer)

In [17]:
# Context enhancer configured separately so its two tuning knobs are easy to spot.
context_enhancer = LemmaContextAwareEnhancer(
    context_similarity_factor=0.6,
    min_score_with_context_similarity=0.4,
)
# English-only analyzer wired to the custom registry and the spaCy engine.
analyzer = AnalyzerEngine(
    supported_languages=['en'],
    registry=registry,
    nlp_engine=nlp_engine,
    context_aware_enhancer=context_enhancer,
)

## Training/Testing

In [18]:
# Accumulates one dict per predicted token (document, token, label, rlabel) across all documents.
preds = []

In [19]:
if VALIDATE:
    # Predict on the hold-out split. Work on a copy: the next cell adds
    # columns to `test`, which must not write into the `val_x` slice
    # (that would mutate it and can trigger SettingWithCopyWarning).
    test = val_x.copy()

In [20]:
# Character offsets of every token, one (starts, ends) pair per document.
temp = test.apply(tokens2index, axis=1)
test['start'] = temp.apply(lambda offsets: offsets[0])
test['end'] = temp.apply(lambda offsets: offsets[1])

In [21]:
# Presidio entity type -> competition label stem (the B-/I- prefix is added below).
# The key order is also the order in which the entities are requested from the analyzer.
ENTITY_TO_LABEL = {
    'PHONE_NUMBER': 'PHONE_NUM',
    'PERSON': 'NAME_STUDENT',
    'URL_CUSTOM': 'URL_PERSONAL',
    'EMAIL_ADDRESS': 'EMAIL',
    'EMAIL_CUSTOM': 'EMAIL',
    'ADDRESS_CUSTOM': 'STREET_ADDRESS',
    'US_SSN': 'ID_NUM',
    'US_ITIN': 'ID_NUM',
    'US_PASSPORT': 'ID_NUM',
    'US_BANK_NUMBER': 'ID_NUM',
    'USERNAME': 'USERNAME',
}

# Start from a clean slate so re-running this cell does not append the same predictions twice.
preds = []

for _, doc in tqdm(test.iterrows(), total=len(test)):
    text = doc['full_text']
    starts, ends = doc['start'], doc['end']
    results = analyzer.analyze(text=text,
                               entities=list(ENTITY_TO_LABEL),
                               allow_list=ALLOW_LIST,
                               language='en',
                               score_threshold=0.005)
    pre_preds = []
    for r in results:
        label = ENTITY_TO_LABEL.get(r.entity_type)
        if label is None:
            # Unmapped entity type: skip it instead of reusing a stale label.
            continue

        word = text[r.start:r.end]
        if r.entity_type == 'PHONE_NUMBER' and is_valid_date(word):
            # Date-like strings are frequently picked up as phone numbers.
            continue

        # First token starting at (or after) the detected span.
        first = find_or_next_larger(starts, r.start)
        if first >= len(starts):
            # The span begins after the last token start; there is no valid token index for it.
            continue

        # Extend over every following token that ends inside the span
        # (ignoring whitespace at the end of the span).
        span_end = r.end - count_trailing_whitespaces(word)
        last = first
        while last + 1 < len(ends) and ends[last + 1] <= span_end:
            last += 1

        # NOTE(review): overlapping results can still yield several rows for the
        # same (document, token); no de-duplication policy is applied here.
        for p in range(first, last + 1):
            # "I-" only when this token directly follows the previously emitted
            # token and that one came from the same entity type.
            continues_previous = (
                len(pre_preds) > 0
                and pre_preds[-1]['rlabel'] == r.entity_type
                and (p - pre_preds[-1]['token']) == 1
            )
            label_f = ("I-" if continues_previous else "B-") + label
            pre_preds.append({
                "document": doc['document'],
                "token": p,
                "label": label_f,
                "rlabel": r.entity_type,
            })
    preds.extend(pre_preds)

0it [00:00, ?it/s]

# Submission

In [22]:
# One row per predicted token. Declaring the columns up front keeps the frame
# well-formed even when `preds` is empty; the helper column 'rlabel' is dropped
# and the running row number becomes 'row_id'.
submission = (
    pd.DataFrame(preds, columns=['document', 'token', 'label', 'rlabel'])
    .drop(columns='rlabel')
    .rename_axis('row_id')
    .reset_index()
)
submission

Unnamed: 0,row_id,document,token,label
0,0,15180,113,B-STREET_ADDRESS
1,1,15180,208,B-STREET_ADDRESS
2,2,15180,345,B-STREET_ADDRESS
3,3,15180,447,B-STREET_ADDRESS
4,4,15180,517,B-STREET_ADDRESS
...,...,...,...,...
18149,18149,19834,528,I-STREET_ADDRESS
18150,18150,12604,65,B-STREET_ADDRESS
18151,18151,12604,127,B-STREET_ADDRESS
18152,18152,12604,150,B-STREET_ADDRESS


In [23]:
if VALIDATE:
    # One row per validation document: its id and its list of gold labels.
    temp = val_x[['document']].join(val_y)

    # Pair every label with its position in the document, then unnest both
    # lists together so 'document' is repeated for each token automatically.
    ground_truth = (
        temp.assign(token=temp['labels'].apply(lambda doc_labels: list(range(len(doc_labels)))))
        .explode(['token', 'labels'])
        .rename(columns={'labels': 'label'})
        .reset_index(drop=True)
    )

    # Only PII tokens take part in the metric; 'O' rows are dropped.
    ground_truth = ground_truth.loc[ground_truth['label'] != 'O', ['token', 'label', 'document']]
    ground_truth = ground_truth.reset_index(names=['row_id'])
    print(pii_fbeta_score(submission, ground_truth, 5))
else:
    submission.to_csv('submission.csv', index = False)

0.44260599793174765
