#### This notebook is modified from <a href="https://www.kaggle.com/code/pjmathematician/pii-eda-presidio-baseline">PII EDA Presidio Baseline</a> and <a href="https://www.kaggle.com/code/yunsuxiaozi/pii-detect-study-notebook">PII detect study notebook</a>. 

## Modifications 

#### I added my own address, email, and URL recognizers (`address_recognizer`, `email_recognizer`, `url_recognizer`), a blacklist to filter out URLs that are likely public, and a date check to filter out noisy phone-number matches. I also added Chinese notes explaining my modifications.

### Install presidio

In [4]:
# Install presidio_analyzer from the local directory given by --find-links instead of
# the package index (--no-index), upgrading any installed version (-U) with reduced output (-q).
# NOTE(review): --find-links is an absolute path on a local machine, while the dataset
# cells below read from /kaggle/input — confirm the path for the target environment.
!pip install -U -q presidio_analyzer --no-index --find-links=/Users/0ne/Programming/Kaggle/PIIDetect/data/presidio

### Import  necessary libraries

In [2]:
import json
import pandas as pd

# Presidio
from presidio_analyzer import AnalyzerEngine
from presidio_analyzer.nlp_engine import NlpEngineProvider
from tqdm import tqdm
from typing import List
import pprint
import re

from presidio_analyzer import (
    AnalyzerEngine,
    PatternRecognizer,
    EntityRecognizer,
    Pattern,
    RecognizerResult,
)
from presidio_analyzer.recognizer_registry import RecognizerRegistry
from presidio_analyzer.nlp_engine import NlpEngine, SpacyNlpEngine, NlpArtifacts
from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer

from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer
from presidio_analyzer.predefined_recognizers import PhoneRecognizer
from dateutil import parser

In [None]:
# Print the integers 1 through 4 as a list.
# The previous form, `print(for i in range(1, 5))`, is not valid Python: a bare
# `for` clause needs a leading expression to form a comprehension.
print([i for i in range(1, 5)])

### Import dataset

In [3]:
# Directory holding the competition JSON files; change this when running outside Kaggle.
DATA_DIR = "/kaggle/input/pii-detection-removal-from-educational-data"

# Load the training split. The context manager closes the file handle as soon as
# parsing finishes, and the explicit encoding avoids depending on the platform default.
with open(f"{DATA_DIR}/train.json", encoding="utf-8") as train_file:
    train_df = json.load(train_file)
print(f"len(train_df):{len(train_df)},train_df[0].keys():{train_df[0].keys()}")
print("-" * 50)

# Collect every distinct label value that appears across the training records.
labels = set()
for record in train_df:
    labels.update(record["labels"])
print(f"labels:{labels}")

# Load the test split the same way.
with open(f"{DATA_DIR}/test.json", encoding="utf-8") as test_file:
    test_df = json.load(test_file)

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/pii-detection-removal-from-educational-data/train.json'

### create Analyzer

In [None]:
# Default text analyzer, kept for reference:
# analyzer = AnalyzerEngine()
configuration = {
    "nlp_engine_name": "spacy",
    "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
}

# Create NLP engine based on configuration
provider = NlpEngineProvider(nlp_configuration=configuration)
nlp_engine = provider.create_engine()

# Create the street-address recognizer: a number, one or more words, then a street suffix.
# NOTE(review): Presidio's context enhancer may compare against lower-cased lemmas, in
# which case capitalised context words such as "Apt" would never match — confirm.
address_regex = r"\b\d+\s+\w+(\s+\w+)*\s+((st(\.)?)|(ave(\.)?)|(rd(\.)?)|(blvd(\.)?)|(ln(\.)?)|(ct(\.)?)|(dr(\.)?))\b"
address_pattern = Pattern(name="address", regex=address_regex, score=0.5)
address_recognizer = PatternRecognizer(
    supported_entity="ADDRESS_CUSTOM", patterns=[address_pattern], context=["st", "Apt"]
)

# Create the email recognizer.
# The top-level-domain class is [A-Za-z]; the previous [A-Z|a-z] also accepted a
# literal "|" character inside the domain suffix.
email_regex = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"
email_pattern = Pattern(name="email address", regex=email_regex, score=0.5)
email_recognizer = PatternRecognizer(
    supported_entity="EMAIL_CUSTOM", patterns=[email_pattern]
)

# Create the URL recognizer: anything starting with http://, https:// or www.
url_regex = r"https?://\S+|www\.\S+"
url_pattern = Pattern(name="url", regex=url_regex, score=0.5)
url_recognizer = PatternRecognizer(
    supported_entity="URL_CUSTOM", patterns=[url_pattern]
)

# Create the phone recognizer with a custom list of context words
# (the duplicate "mobile" entry was removed; the set of words is unchanged).
phone_recognizer = PhoneRecognizer(
    context=[
        "phone",
        "number",
        "telephone",
        "cell",
        "cellphone",
        "mobile",
        "call",
        "ph",
        "tel",
        "Email",
    ]
)


# Start from Presidio's predefined recognizers and add the custom ones on top.
registry = RecognizerRegistry()
registry.load_predefined_recognizers()
registry.add_recognizer(address_recognizer)
registry.add_recognizer(email_recognizer)
registry.add_recognizer(url_recognizer)
registry.add_recognizer(phone_recognizer)


# Pass the created NLP engine and supported_languages to the AnalyzerEngine
analyzer = AnalyzerEngine(
    nlp_engine=nlp_engine,
    supported_languages=["en"],
    registry=registry,
    context_aware_enhancer=LemmaContextAwareEnhancer(
        context_similarity_factor=0.8, min_score_with_context_similarity=0.4
    ),
)


# Date check used to discard phone-number matches that are really dates.
def is_valid_date(text):
    """Return True if ``dateutil`` can parse ``text`` as a date, otherwise False.

    Args:
        text: The candidate string (e.g. the text of a phone-number match).

    Returns:
        bool: True when ``parser.parse(text)`` succeeds, False when it raises.
    """
    try:
        # Only success or failure matters; the parsed value itself is not needed.
        parser.parse(text)
    except Exception:
        # Deliberately best-effort: any parsing failure means "not a date".
        # Unlike a bare ``except:``, this does not swallow KeyboardInterrupt or
        # SystemExit, so a long-running loop can still be interrupted.
        return False
    return True

### Function

In [None]:
# Map every token to its character span (start offset, end offset) in the full text.
def tokens2index(row):
    """Locate each token of ``row`` inside ``row["full_text"]``.

    Tokens are searched for left to right; each search starts where the
    previous token ended, so repeated tokens map to successive occurrences.

    Args:
        row: Mapping with a ``"tokens"`` sequence of strings and the
            ``"full_text"`` string they were taken from.

    Returns:
        tuple[list[int], list[int]]: ``(start_ind, end_ind)`` where
        ``start_ind[k]`` is the offset of token ``k`` in the text and
        ``end_ind[k]`` is that offset plus the token length (exclusive end).

    Raises:
        ValueError: If a token does not occur at or after the end of the
            previous token.
    """
    full_text = row["full_text"]
    start_ind = []
    end_ind = []
    prev_ind = 0
    for tok in row["tokens"]:
        # Search from prev_ind directly instead of slicing the text first:
        # same result, but no copy of the remaining text for every token.
        start = full_text.index(tok, prev_ind)
        end = start + len(tok)
        start_ind.append(start)
        end_ind.append(end)
        prev_ind = end
    return start_ind, end_ind


# Binary search for the position of target in arr.
def find_or_next_larger(arr, target):
    """Return an index of ``target`` in ``arr``, or the insertion point if absent.

    Args:
        arr: Sequence of comparable values, expected to be sorted ascending
            (here: the start offset of every token).
        target: Value to look for (here: the start offset of an entity).

    Returns:
        int: An index ``k`` with ``arr[k] == target`` when one is found;
        otherwise the index of the first element larger than ``target``,
        which is ``len(arr)`` when every element is smaller.
    """
    lo = 0
    hi = len(arr) - 1

    while lo <= hi:
        middle = (lo + hi) // 2
        probe = arr[middle]

        if probe < target:
            # Everything up to and including middle is too small.
            lo = middle + 1
        elif probe > target:
            # Everything from middle onwards is too large.
            hi = middle - 1
        else:
            return middle
    return lo


def count_trailing_whitespaces(word):
    """Return how many whitespace characters ``word`` ends with."""
    trailing = 0
    # Walk backwards from the end until the first non-whitespace character.
    for ch in reversed(word):
        if not ch.isspace():
            break
        trailing += 1
    return trailing

### Prediction

In [None]:
# URL blacklist: a URL match containing any of these substrings is dropped
# by the prediction loop.
black_list = [
    "wikipedia", "coursera", ".pdf", ".PDF", "article",
    ".png", ".gov", ".work", ".ai", ".firm",
    ".arts", ".store", ".rec", ".biz", ".travel",
]
# Phone whitelist: the prediction loop keeps a phone-number match only when one of
# these keywords occurs in the text surrounding it.
white_list = [
    "phone", "number", "telephone", "cell", "cellphone", "mobile",
    "call", "ph", "tel", "mobile", "Email",
]

In [None]:
# Records to run inference on; point this at train_df instead to inspect training data.
df_ = test_df  # test_df #train_df

# One collector per label type, filled with the matched text during analysis.
PHONE_NUM, NAME_STUDENT, URL_PERSONAL, EMAIL, STREET_ADDRESS, ID_NUM, USERNAME = (
    [] for _ in range(7)
)

preds = []
# Compute the character span of every token and store both offset lists on the
# record itself under the "start" and "end" keys.
for i in tqdm(range(len(df_)), desc="Processing tokens2index"):
    row = df_[i]
    row["start"], row["end"] = tokens2index(row)

for i, d in tqdm(
    enumerate(df_), total=len(df_), desc="Analyzing entities"
):  # d is df_[i]
    # Analyze the record's full_text as English and ask only for the entity types
    # listed below. Each result carries an entity type, a start offset, an end
    # offset and a score, e.g. [type: PERSON, start: 22, end: 37, score: 0.85].
    results = analyzer.analyze(
        text=d["full_text"],
        entities=[
            "PHONE_NUMBER",
            "PERSON",
            "URL_CUSTOM",  # "IP_ADDRESS", #"URL",
            "EMAIL_ADDRESS",
            "EMAIL_CUSTOM",
            "ADDRESS_CUSTOM",
            "US_SSN",
            "US_ITIN",
            "US_PASSPORT",
            "US_BANK_NUMBER",
            "USERNAME",
        ],
        language="en",
        #                            score_threshold=0.04,
    )
    n_tokens = len(d["start"])
    pre_preds = []
    for r in results:
        # Index of the token where the entity begins: the token whose start offset
        # equals r.start, or the next token when no token starts exactly there.
        s = find_or_next_larger(d["start"], r.start)
        end = r.end  # entity end offset
        word = d["full_text"][r.start : r.end]  # matched text
        # Ignore trailing whitespace so the end offset sits on the last real character.
        end = end - count_trailing_whitespaces(word)
        temp_preds = [s]  # token indices covered by this entity
        # An entity may span several tokens: keep adding the following token while it
        # still ends inside the entity. The explicit bounds check replaces the former
        # bare try/except that silently absorbed the IndexError at the last token.
        while s + 1 < n_tokens and d["end"][s + 1] <= end:
            temp_preds.append(s + 1)
            s += 1

        # Map the detected entity type to the competition label.
        # tmp is set to True when the match must be discarded.
        tmp = False

        if r.entity_type == "USERNAME":
            label = "USERNAME"
            USERNAME.append(d["full_text"][r.start : r.end])

        elif r.entity_type == "PHONE_NUMBER":
            # Discard matches that parse as a date.
            if is_valid_date(word):
                continue
            # Text from 50 characters before to 50 characters after the match.
            context_window = d["full_text"][
                max(r.start - 50, 0) : min(r.end + 50, len(d["full_text"]))
            ]
            # Keep the match only if a whitelisted keyword occurs in that window.
            for w in white_list:
                if w in context_window:
                    tmp = False
                    break
                tmp = True

            label = "PHONE_NUM"
            PHONE_NUM.append(d["full_text"][r.start : r.end])

        elif r.entity_type == "PERSON":
            label = "NAME_STUDENT"
            NAME_STUDENT.append(d["full_text"][r.start : r.end])

        elif r.entity_type == "ADDRESS_CUSTOM":
            label = "STREET_ADDRESS"
            STREET_ADDRESS.append(d["full_text"][r.start : r.end])

        elif r.entity_type in ("US_SSN", "US_ITIN", "US_PASSPORT", "US_BANK_NUMBER"):
            label = "ID_NUM"
            ID_NUM.append(d["full_text"][r.start : r.end])

        elif r.entity_type == "EMAIL_ADDRESS" or r.entity_type == "EMAIL_CUSTOM":
            label = "EMAIL"
            EMAIL.append(d["full_text"][r.start : r.end])

        elif (
            r.entity_type == "URL_CUSTOM"
        ):  # or r.entity_type == 'IP_ADDRESS' or "http" in word:
            # Discard URLs that contain a blacklisted substring.
            tmp = any(w in word for w in black_list)

            label = "URL_PERSONAL"
            URL_PERSONAL.append(d["full_text"][r.start : r.end])

        # Skip discarded matches, and matches that begin after the start of the last
        # token: for those the lookup returns len(tokens), which is not a valid
        # token index and must not reach the submission.
        if tmp or temp_preds[0] >= n_tokens:
            continue

        # Emit one prediction per token covered by the entity.
        for p in temp_preds:
            if len(pre_preds) > 0:  # not the first prediction of this record
                # pre_preds[-1]["rlabel"] is the entity type of the previously emitted
                # token. The token continues an entity ("I-") only when that type is
                # the same as the current one and the token index directly follows
                # the previous one; otherwise it starts a new entity ("B-").
                if pre_preds[-1]["rlabel"] == r.entity_type and (
                    p - pre_preds[-1]["token"] == 1
                ):
                    label_f = "I-" + label
                else:
                    label_f = "B-" + label
            else:  # the first prediction of a record always starts an entity
                label_f = "B-" + label
            # Record the document id, the token index and its label.
            pre_preds.append(
                (
                    {
                        "document": d["document"],
                        "token": p,
                        "label": label_f,
                        "rlabel": r.entity_type,  # raw entity type
                    }
                )
            )
    preds.extend(pre_preds)  # add this record's predictions to the overall list

### Submission

In [None]:
# Build the submission table from the collected predictions.
# The three output columns are selected by name, which drops the helper "rlabel"
# field without relying on it being the last key, and still yields a frame with
# the right columns when preds is empty (the positional iloc/columns assignment
# raised a length-mismatch ValueError in that case).
submission = pd.DataFrame(preds, columns=["document", "token", "label"])
# Turn the running row index into the leading "row_id" column.
submission = submission.rename_axis("row_id").reset_index()
# Save the CSV file.
submission.to_csv("submission.csv", index=False)
submission.head()