In [1]:
# Install necessary packages: faker for generating fake PII data, transformers for text generation
!pip install faker
!pip install transformers

# Import necessary libraries
import random   # Used for random selections and shuffling
import re       # Regular expressions to detect patterns for PII redaction
from faker import Faker  # Faker is used to generate synthetic data
from transformers import pipeline  # Transformers provide pre-trained models for NLP tasks like text generation
from faker.providers import internet, bank, person, phone_number, misc  # Specialized providers for Faker
import string   # Useful for character manipulations, such as creating random strings

# Initialize the Faker object, which will help generate various types of fake data
fake = Faker()

# Adding specific providers to generate specialized fake data (internet, bank, personal details, etc.)
fake.add_provider(internet)
fake.add_provider(bank)
fake.add_provider(person)
fake.add_provider(phone_number)
fake.add_provider(misc)

# At this point, we are ready to generate realistic fake PII data for further redaction in the program.



  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


# Define various PII Data Templates for synthetic Data Generation

In [2]:
# Define custom templates for fake PII data generation. Generated Synthetic dataset will be used for training the model.
custom_templates = [

    # Template 1-5
    "On [DATE_TIME], [PERSON] of nationality [NRP], residing at [LOCATION], updated their payment method to a card ending in [CREDIT_CARD]. A confirmation was sent to [EMAIL_ADDRESS] from IP [IP_ADDRESS].",
    "A transfer to [IBAN_CODE] from [BANK_NUMBER] was initiated on [DATE_TIME] by [PERSON], holder of passport [PASSPORT]. Any queries should be directed to [PHONE_NUMBER] or [EMAIL_ADDRESS].",
    "[PERSON], holding a driver's license [DRIVER_LICENSE] from [LOCATION], reported a misplaced wallet containing a card [CREDIT_CARD] and SSN [SSN] on [DATE_TIME].",
    "User [PERSON], with ITIN [ITIN], accessed [URL] on [DATE_TIME] from IP [IP_ADDRESS]. For account support related to [BANK_NUMBER], contact [PHONE_NUMBER].",
    "Customer [PERSON] requested an account statement for [BANK_NUMBER] to be mailed to [LOCATION]. For further assistance, reach out to [EMAIL_ADDRESS] or [PHONE_NUMBER].",

    # Template 6-10
    "On [DATE_TIME], [PERSON] from [LOCATION] called customer support from [PHONE_NUMBER] regarding an unrecognized charge on their bank statement [BANK_NUMBER].",
    "[PERSON] visited the branch at [LOCATION] on [DATE_TIME] to open a new account with IBAN [IBAN_CODE]. Contact details: [PHONE_NUMBER] and [EMAIL_ADDRESS].",
    "Email alert for [PERSON]: Your new card [CREDIT_CARD] is now linked to the account [BANK_NUMBER]. For security, this change was recorded from IP [IP_ADDRESS].",
    "A parcel for [PERSON] with ITIN [ITIN] is awaiting pickup at [LOCATION]. Please present your driver's license [DRIVER_LICENSE] and SSN [SSN] for identification.",
    "Our records show [PERSON], nationality [NRP], booked a flight using passport number [PASSPORT]. Confirmation was sent to [EMAIL_ADDRESS] and can be tracked at [URL].",

    # Template 11-15
    "Security notice for [PERSON]: An attempt to access [URL] with your credentials was made on [DATE_TIME] from IP [IP_ADDRESS]. If this wasn't you, contact us at [PHONE_NUMBER].",
    "[PERSON]'s appointment for passport renewal [PASSPORT] is set for [DATE_TIME] at [LOCATION]. The confirmation email is sent to [EMAIL_ADDRESS].",
    "During our regular audit on [DATE_TIME], we verified the account [BANK_NUMBER] and ITIN [ITIN] for [PERSON] against the national database [NRP].",
    "The car rental for [PERSON] with driver's license [DRIVER_LICENSE] was processed on [DATE_TIME]. The transaction ID [IBAN_CODE] was sent to [EMAIL_ADDRESS].",
    "[PERSON], residing at [LOCATION], updated their contact information to [PHONE_NUMBER] and [EMAIL_ADDRESS]. Their account [BANK_NUMBER] and SSN [SSN] were noted for the update.",

    # Template 16-20
    "Our new client [PERSON] with ITIN [ITIN] has registered for online banking. Account [BANK_NUMBER], IP [IP_ADDRESS], email [EMAIL_ADDRESS] are now linked.",
    "Confirmation for [PERSON]: Your transaction to [IBAN_CODE] from account [BANK_NUMBER] is complete. Details were sent to [EMAIL_ADDRESS] and can be reviewed at [URL].",
    "Notice for [PERSON]: Your recent application using SSN [SSN] and driver's license [DRIVER_LICENSE] has been approved. Please confirm your email [EMAIL_ADDRESS].",
    "[PERSON] reported losing their passport [PASSPORT] at [LOCATION] on [DATE_TIME]. A temporary hold has been placed on their bank account [BANK_NUMBER].",
    "A new connection from IP [IP_ADDRESS] was detected on [DATE_TIME] for [PERSON]'s account. Please verify recent activities with your bank [BANK_NUMBER].",

    # Template 21-25
    "Subscription for [PERSON] with email [EMAIL_ADDRESS] was renewed on [DATE_TIME]. For billing inquiries about bank number [BANK_NUMBER] or card [CREDIT_CARD], call [PHONE_NUMBER].",
    "[PERSON], with nationality [NRP], scheduled a service at [LOCATION] for [DATE_TIME]. Confirmation sent to [EMAIL_ADDRESS]. Remember to bring ID [DRIVER_LICENSE] and SSN [SSN].",
    "Account [BANK_NUMBER] owned by [PERSON] was accessed on [DATE_TIME] from new IP [IP_ADDRESS]. Verify the activity through [URL] or contact support at [PHONE_NUMBER].",
    "New login to [URL] by [PERSON] using ITIN [ITIN] from [IP_ADDRESS] on [DATE_TIME]. If this wasn't you, please notify us immediately at [EMAIL_ADDRESS].",
    "[PERSON] requested a copy of the transaction with IBAN [IBAN_CODE] to be sent to [EMAIL_ADDRESS]. For more details, visit [URL] or dial [PHONE_NUMBER].",

    # Template 26-30
    "[PERSON] with SSN [SSN] reported an error with the payment portal [URL] on [DATE_TIME]. Tech support is investigating the IP [IP_ADDRESS] and bank number [BANK_NUMBER].",
    "Customer notice for [PERSON]: Your driver's license [DRIVER_LICENSE] and passport [PASSPORT] need renewal before [DATE_TIME]. Visit [LOCATION] or contact [PHONE_NUMBER].",
    "[PERSON]'s profile updated. New address [LOCATION], phone [PHONE_NUMBER], email [EMAIL_ADDRESS], and bank details [BANK_NUMBER] confirmed on [DATE_TIME].",
    "Registration completed: [PERSON], nationality [NRP], with ITIN [ITIN]. Credentials sent to [EMAIL_ADDRESS]. Log in from [IP_ADDRESS] to confirm and complete setup.",
    "Security update: [PERSON], your password for account [BANK_NUMBER] was reset on [DATE_TIME]. A notification was sent to [EMAIL_ADDRESS] from [IP_ADDRESS].",

    # Template 31-35
    "Order confirmation for [PERSON]: Shipment to [LOCATION] will arrive on [DATE_TIME]. Tracking available at [URL]. Contact us at [PHONE_NUMBER] for issues.",
    "[PERSON]'s credit card [CREDIT_CARD] was charged on [DATE_TIME] for the purchase. Receipt sent to [EMAIL_ADDRESS]. Call [PHONE_NUMBER] for disputes.",
    "Job application received from [PERSON] on [DATE_TIME] for position in [LOCATION]. Applicant SSN [SSN] and email [EMAIL_ADDRESS] recorded for follow-up.",
    "Insurance claim filed by [PERSON] with policy number [BANK_NUMBER]. Accident report from [DATE_TIME] at [LOCATION] logged. Contact [PHONE_NUMBER] for status.",
    "[PERSON] reported a missing passport [PASSPORT] to authorities at [LOCATION] on [DATE_TIME]. A temporary document was issued and sent to [EMAIL_ADDRESS].",

    # Template 36-40
    "[PERSON] updated their emergency contact details to [PHONE_NUMBER]. The security question linked to SSN [SSN] and bank account [BANK_NUMBER] was also updated.",
    "Alert: Unauthorized login attempt on [PERSON]'s account [BANK_NUMBER] detected from IP [IP_ADDRESS] on [DATE_TIME]. Verify identity with ITIN [ITIN] at [URL].",
    "[PERSON], residing at [LOCATION], updated contact information. New phone [PHONE_NUMBER], email [EMAIL_ADDRESS], and driver's license [DRIVER_LICENSE] number recorded.",
    "Welcome [PERSON] to our service! Your account [BANK_NUMBER] is now active. Log in from [IP_ADDRESS], or call us at [PHONE_NUMBER] for support.",
    "Final notice for [PERSON]: Credit card [CREDIT_CARD] payment overdue as of [DATE_TIME]. Access to account [BANK_NUMBER] may be restricted. Contact [EMAIL_ADDRESS].",

    # Template 41-45
    "Email to [PERSON]: Your recent transaction from [IBAN_CODE] has been flagged. Please verify recent activity using your SSN [SSN] at [URL].",
    "[PERSON] with driver's license [DRIVER_LICENSE] requested roadside assistance at [LOCATION] on [DATE_TIME]. For updates, contact [PHONE_NUMBER].",
    "New device setup by [PERSON] completed. IP [IP_ADDRESS] and device ID linked to email [EMAIL_ADDRESS] and bank account [BANK_NUMBER].",
    "Customer [PERSON] made a purchase on [DATE_TIME] using card [CREDIT_CARD]. A loyalty discount was applied. Email [EMAIL_ADDRESS] for membership details.",
    "[PERSON] with nationality [NRP] applied for a travel visa. Application ID [PASSPORT] and contact [EMAIL_ADDRESS] have been logged for processing.",

    # Template 46-50
    "Alarm raised for [PERSON] at [DATE_TIME]: An attempt to access [URL] using [IP_ADDRESS] with the email [EMAIL_ADDRESS]. Bank account [BANK_NUMBER] has been put on hold.",
    "[PERSON] confirmed their attendance for the webinar on [DATE_TIME] at [LOCATION]. Registration details: SSN [SSN], contact [PHONE_NUMBER], and nationality [NRP].",
    "Update received from [PERSON]: Change of address to [LOCATION] and phone to [PHONE_NUMBER] was noted. New driver's license [DRIVER_LICENSE] was issued and sent to [EMAIL_ADDRESS].",
    "Guest [PERSON] with passport number [PASSPORT] checked in at [LOCATION] on [DATE_TIME]. Billing info: credit card [CREDIT_CARD], contact email [EMAIL_ADDRESS].",
    "[PERSON] with ITIN [ITIN] booked a consultation on [DATE_TIME]. IP address [IP_ADDRESS] and bank details [BANK_NUMBER] were verified for the appointment.",

    # Template 51-55
    "[PERSON]'s transaction to [IBAN_CODE] from account [BANK_NUMBER] was completed on [DATE_TIME]. For security, SSN [SSN] was verified and email confirmation sent to [EMAIL_ADDRESS].",
    "Car loan approved for [PERSON] on [DATE_TIME]. Vehicle to be picked up at [LOCATION]. Loan details: account [BANK_NUMBER], phone [PHONE_NUMBER].",
    "Passport application for [PERSON] with nationality [NRP] processed. New passport [PASSPORT] will be mailed to [LOCATION]. Confirmation to [EMAIL_ADDRESS].",
    "Profile update for [PERSON] successful: new email [EMAIL_ADDRESS], phone [PHONE_NUMBER], and SSN [SSN]. Visit [URL] to review changes or call support.",
    "[PERSON], residing at [LOCATION], reported a lost credit card [CREDIT_CARD] to the hotline [PHONE_NUMBER] on [DATE_TIME]. A freeze was placed on their account [BANK_NUMBER].",

    # Template 56-60
    "Welcome message sent to [PERSON] at [EMAIL_ADDRESS]. Instructions to set up their new account [BANK_NUMBER] are included. IP [IP_ADDRESS] was used for signup.",
    "[PERSON] requested a credit increase for card [CREDIT_CARD] on [DATE_TIME]. New terms sent to [EMAIL_ADDRESS]. For questions, contact [PHONE_NUMBER].",
    "[PERSON] updated their billing address for bank account [BANK_NUMBER] to [LOCATION]. New card [CREDIT_CARD] will be dispatched to this address.",
    "On [DATE_TIME], [PERSON] with ITIN [ITIN] and SSN [SSN] requested a payment deferral. The call was logged at [PHONE_NUMBER], and notes were sent to [EMAIL_ADDRESS].",
    "Alert for [PERSON]: Your IP [IP_ADDRESS] was used to schedule a payment to [IBAN_CODE] on [DATE_TIME]. If this was not authorized, please contact [PHONE_NUMBER].",

    # Template 61-65
    "[PERSON] with nationality [NRP] reviewed their transaction history for account [BANK_NUMBER] on [URL] and flagged an issue to [EMAIL_ADDRESS].",
    "Notification: [PERSON]'s driver's license [DRIVER_LICENSE] is set to expire on [DATE_TIME]. Renewal details have been sent to [EMAIL_ADDRESS].",
    "[PERSON] with passport [PASSPORT] booked a rental at [LOCATION]. Contact number [PHONE_NUMBER] provided for any changes to the booking.",
    "New IP address [IP_ADDRESS] logged for [PERSON]'s account on [DATE_TIME]. Confirm changes via [EMAIL_ADDRESS] or call [PHONE_NUMBER].",
    "Password reset request for [PERSON] was received from IP [IP_ADDRESS]. To confirm, use SSN [SSN] and bank number [BANK_NUMBER], or contact [PHONE_NUMBER].",

    # Template 66-70
    "[PERSON] with ITIN [ITIN] made a purchase on [URL] on [DATE_TIME]. Shipping to [LOCATION]. Payment was made using bank account [BANK_NUMBER].",
    "Confirmation for [PERSON]: IT support has reset your password for account [BANK_NUMBER] as requested. For any issues, email [EMAIL_ADDRESS] or call [PHONE_NUMBER].",
    "[PERSON]'s new credit card [CREDIT_CARD] was activated on [DATE_TIME] after identity verification with SSN [SSN] was completed. Confirmation email sent to [EMAIL_ADDRESS].",

   # Template 71-75
    "[PERSON] signed up for a subscription service using credit card [CREDIT_CARD] and SSN [SSN] on [DATE_TIME]. Contact support at [PHONE_NUMBER] for changes.",
    "[PERSON] with ITIN [ITIN] registered for an event at [LOCATION] on [DATE_TIME]. A confirmation email was sent to [EMAIL_ADDRESS].",
    "[PERSON] reported a fraudulent charge on their card [CREDIT_CARD] on [DATE_TIME]. Contact details: [PHONE_NUMBER], [EMAIL_ADDRESS].",
    "[PERSON] filed a dispute over the transaction made to [IBAN_CODE] on [DATE_TIME]. Bank account [BANK_NUMBER] and IP [IP_ADDRESS] were flagged.",
    "[PERSON] updated their payment details for account [BANK_NUMBER] with IBAN [IBAN_CODE]. New email [EMAIL_ADDRESS] and phone [PHONE_NUMBER] were confirmed.",

    # Template 76-80
    "A loan application by [PERSON] using SSN [SSN] and bank account [BANK_NUMBER] was submitted on [DATE_TIME]. Confirmation sent to [EMAIL_ADDRESS].",
    "[PERSON] changed their billing address for their credit card [CREDIT_CARD] to [LOCATION] on [DATE_TIME].",
    "[PERSON] logged in from IP [IP_ADDRESS] on [DATE_TIME] to review their transaction history for account [BANK_NUMBER].",
    "Account update: [PERSON]'s driver's license [DRIVER_LICENSE] and SSN [SSN] were verified on [DATE_TIME] for transaction ID [IBAN_CODE].",
    "New card [CREDIT_CARD] linked to [PERSON]'s account [BANK_NUMBER] was activated on [DATE_TIME]. Confirmation email sent to [EMAIL_ADDRESS].",

    # Template 81-85
    "[PERSON]'s request to update email [EMAIL_ADDRESS] for account [BANK_NUMBER] was processed on [DATE_TIME].",
    "A change in address for [PERSON]'s account [BANK_NUMBER] was logged at [LOCATION] on [DATE_TIME].",
    "[PERSON] logged into [URL] from IP [IP_ADDRESS] to review transactions for [BANK_NUMBER]. Contact [PHONE_NUMBER] for assistance.",
    "Notification: [PERSON]'s request to transfer funds from [BANK_NUMBER] to IBAN [IBAN_CODE] was completed on [DATE_TIME].",
    "[PERSON]'s SSN [SSN] and passport [PASSPORT] were verified for an application submitted on [DATE_TIME].",

    # Template 86-90
    "[PERSON] confirmed a payment using card [CREDIT_CARD] on [DATE_TIME].",
    "A travel booking for [PERSON] with passport [PASSPORT] was completed on [DATE_TIME] and confirmed at [EMAIL_ADDRESS].",
    "[PERSON]'s ITIN [ITIN] and driver's license [DRIVER_LICENSE] were used to process a transaction on [DATE_TIME].",
    "Profile update: [PERSON]'s new email [EMAIL_ADDRESS] and phone number [PHONE_NUMBER] were confirmed on [DATE_TIME].",
    "[PERSON]'s application for a loan using SSN [SSN] was approved on [DATE_TIME].",

    # Template 91-95
    "[PERSON] flagged a suspicious transaction made to [IBAN_CODE] from account [BANK_NUMBER] on [DATE_TIME].",
    "A request to update [PERSON]'s email [EMAIL_ADDRESS] was submitted on [DATE_TIME]. Contact [PHONE_NUMBER] for any issues.",
    "[PERSON]'s account [BANK_NUMBER] was accessed from IP [IP_ADDRESS] on [DATE_TIME]. Verification code sent to [EMAIL_ADDRESS].",
    "New application for [PERSON] was submitted on [DATE_TIME] using SSN [SSN] and passport [PASSPORT].",
    "[PERSON] confirmed their identity with ITIN [ITIN] and email [EMAIL_ADDRESS] on [DATE_TIME].",

    # Template 96-100
    "[PERSON]'s card [CREDIT_CARD] was used to make a purchase on [DATE_TIME]. Confirmation sent to [EMAIL_ADDRESS].",
    "A parcel for [PERSON] is awaiting pickup at [LOCATION]. Driver's license [DRIVER_LICENSE] and SSN [SSN] will be needed for verification.",
    "[PERSON] requested assistance with a payment made to IBAN [IBAN_CODE] on [DATE_TIME]. Contact support at [PHONE_NUMBER] or [EMAIL_ADDRESS].",
    "[PERSON] updated their SSN [SSN] and phone [PHONE_NUMBER] for their account [BANK_NUMBER] on [DATE_TIME].",
    "[PERSON]'s new driver's license [DRIVER_LICENSE] was issued on [DATE_TIME] and sent to their registered email [EMAIL_ADDRESS]."
]

# Define a list of common religions for the templates.
religions = [
    'Christianity', 'Islam', 'Hinduism', 'Buddhism', 'Sikhism',
    'Judaism', 'Baha\'i', 'Jainism', 'Shinto', 'Cao Dai',
    'Zoroastrianism', 'Tenrikyo', 'Animism', 'Neo-Paganism',
    'Unitarian Universalism', 'Rastafarianism'
]

# Define political affiliations to fill in relevant placeholders in templates.
political_groups = [
    'Democrat', 'Republican', 'Independent', 'Libertarian', 'Green',
    'Conservative', 'Liberal', 'Socialist', 'Communist', 'Centrist',
    'Progressive', 'Anarchist', 'Monarchist', 'Fascist', 'Nationalist',
    'Populist'
]

# Generate a list of 150 unique countries using the Faker library's 'country()' method.
nationalities = [fake.country() for _ in range(150)]

# Ensure the nationalities list contains unique values by converting it to a set.
nationalities = list(set(nationalities))

# print (nationalities)

# Synthetic PII dataset generation

In [3]:
# Define functions to generate different types of PII data

from faker.providers import date_time

# Initialize the GPT-2 based text generator using the distilgpt2 model
text_generator = pipeline("text-generation", model="distilgpt2")

# Function to generate random email addresses using Faker and predefined domain types
def create_email():
    user_name = fake.user_name()
    domain_types = ['free', 'isp', 'custom']
    domain_type = random.choice(domain_types)

    if domain_type == 'free':
        domains = ['gmail.com', 'yahoo.com', 'hotmail.com']
        domain = random.choice(domains)
        email = f"{user_name}@{domain}"
    elif domain_type == 'isp':
        domains = ['comcast.net', 'verizon.net', 'att.net']
        domain = random.choice(domains)
        email = f"{user_name}@{domain}"
    else:  # custom_domain
        domain = fake.domain_name()
        email = f"{user_name}@{domain}"
    return email

# Function to generate a random SSN (Social Security Number) in a standard format
def create_ssn():
    # Generate SSN with blocks of random numbers
    part1 = random.randint(100, 999)
    part2 = random.randint(10, 99)
    part3 = random.randint(1000, 9999)
    return f"{part1}-{part2}-{part3}"

# Function to generate a passport number with random alphanumeric characters
def create_passport():
    # Generate Passport Number with alphanumeric characters and diverse lengths
    length = random.choice([8, 9])  # Common lengths
    letters = string.ascii_uppercase
    digits = string.digits
    return ''.join(random.choice(letters + digits) for _ in range(length))

# Function to generate a realistic driver's license number based on state abbreviations
def create_driver_license():
    state_abbr = fake.state_abbr()
    numeric_part = ''.join(random.choices(string.digits, k=random.randint(5, 8)))
    format_choice = random.choice(['letters', 'numeric', 'mixed'])
    if format_choice == 'letters':
        letter_part = ''.join(random.choices(string.ascii_uppercase, k=2))
        license_number = f"{state_abbr}-{letter_part}{numeric_part}"
    elif format_choice == 'numeric':
        license_number = f"{state_abbr}-{numeric_part}"
    else:  # mixed
        mixed_part = ''.join(random.choices(string.ascii_uppercase + string.digits, k=2))
        license_number = f"{state_abbr}-{mixed_part}{numeric_part}"
    return license_number


# Function to generate a credit card number with optional formatting (spaces, dashes, none)
def create_card_number():
    card_number = fake.credit_card_number(card_type=None)  # Generate a basic card number
    format_choice = random.choice(['spaces', 'dashes', 'none'])

    if format_choice == 'spaces':
        # Format with spaces every 4 digits
        formatted_card_number = ' '.join(card_number[i:i+4] for i in range(0, len(card_number), 4))
    elif format_choice == 'dashes':
        # Format with dashes every 4 digits
        formatted_card_number = '-'.join(card_number[i:i+4] for i in range(0, len(card_number), 4))
    else:
        # No formatting applied
        formatted_card_number = card_number

    return formatted_card_number

# Function to generate a datetime in a random, common format
def create_date_time():
    # Define a list of common date-time formats
    formats = [
        "%Y-%m-%d %H:%M:%S",  # 2024-03-27 14:20:12
        "%d/%m/%Y %I:%M %p",  # 27/03/2024 02:20 PM
        "%A, %B %d, %Y",      # Saturday, March 27, 2024
        "%d-%b-%Y %H:%M",     # 27-Mar-2024 14:20
    ]
    # Choose a random format
    chosen_format = random.choice(formats)
    # Generate a datetime string in the chosen format
    return fake.date_time().strftime(chosen_format)

# Function to generate a phone number in various formats (international, dashed, dotted)
def create_phone():
    format_choice = random.choice(['international', 'dashes', 'dots'])
    if format_choice == 'international':
        phone_number = f"+{fake.country_calling_code()} {fake.phone_number()}"
    elif format_choice == 'dashes':
        phone_number = fake.phone_number().replace(' ', '-')
    elif format_choice == 'dots':
        phone_number = fake.phone_number().replace(' ', '.')
    return phone_number

# Function to generates a random International Bank Account Number (IBAN)
def create_iban_code():
    iban_number = fake.iban()
    format_choice = random.choice(['spaces', 'none'])
    if format_choice == 'spaces':
        formatted_iban_number = ' '.join(iban_number[i:i+4] for i in range(0, len(iban_number), 4))
    else:
        formatted_iban_number = iban_number
    return formatted_iban_number

# Function to generates a random IPv4 or an IPv6 address
def create_ip_address():
    format_choice = random.choice(['ipv4', 'ipv6'])
    if format_choice == 'ipv4':
        ip_address = fake.ipv4()
    else:
        ip_address = fake.ipv6()
    return ip_address

# Function to randomly selects a value from either nationality, religion, or political group lists
def generate_nrp():
    # Randomly choose between nationality, religion, or political group
    choice = random.choice(['nationality', 'religion', 'political_group'])
    if choice == 'nationality':
        return random.choice(nationalities)
    elif choice == 'religion':
        return random.choice(religions)
    else:
        return random.choice(political_groups)

# Function to create a random location
def create_location():
    format_choice = random.choice(['city_only', 'city_country', 'full_address'])
    if format_choice == 'city_only':
        location = fake.city()
    elif format_choice == 'city_country':
        location = f"{fake.city()}, {fake.country()}"
    else:  # full_address for more detail
        location = fake.address().replace("\n", ", ")
    return location

# Function to create random person names
def create_person_name():
    format_choice = random.choice(['simple', 'with_middle_initial', 'with_title'])
    if format_choice == 'simple':
        person_name = fake.name()
    elif format_choice == 'with_middle_initial':
        first_name, last_name = fake.first_name(), fake.last_name()
        middle_initial = random.choice(string.ascii_uppercase)
        person_name = f"{first_name} {middle_initial}. {last_name}"
    else:  # with_title
        title = random.choice(['Mr.', 'Ms.', 'Dr.', 'Prof.'])
        person_name = f"{title} {fake.name()}"
    return person_name

# Function to create random urls
def create_url():
    # Protocols
    protocol = random.choice(['http', 'https'])

    # Subdomains and domains
    subdomain = random.choice(['www', 'app', 'blog', '', 'store', 'secure', 'mail'])
    domain = fake.domain_name()

    # Top-Level Domains (TLDs)
    tld = random.choice(['com', 'org', 'net', 'io', 'co.uk', 'info', 'biz', 'edu'])

    # Paths
    paths = [
        '', '/home', '/contact-us', '/products', '/products/item', '/search',
        '/user/profile', '/login', '/signup', '/about', '/help', '/settings'
    ]
    path = random.choice(paths)

    # Query Parameters
    query_parameters = [
        '', '?ref=homepage', '?utm_source=google', '?q=searchTerm', '?page=2',
        '?sort=asc&order=price', '?id=12345', '?filter=active', '?lang=en'
    ]
    query = random.choice(query_parameters)

    # Fragments
    fragments = ['', '#section', '#comments', '#top', '#details', '#contact']
    fragment = random.choice(fragments)

    # Assemble the URL
    if subdomain:
        url = f"{protocol}://{subdomain}.{domain}.{tld}{path}{query}{fragment}"
    else:
        url = f"{protocol}://{domain}.{tld}{path}{query}{fragment}"

    return url

# Function to create random bank account numbers
def create_bank_number():
    account_number = ''.join(random.choices(string.digits, k=random.randint(8, 12)))
    format_choice = random.choice(['leading_zeroes', 'dashes', 'plain'])
    if format_choice == 'leading_zeroes':
        account_number = f"0{account_number}"
    elif format_choice == 'dashes':
        parts = [account_number[i:i+4] for i in range(0, len(account_number), 4)]
        account_number = '-'.join(parts)
    # 'plain' needs no modification
    return account_number

# Function to create random ITIN
def create_itin():
    # Generate ITIN with more diverse formats
    return f"9{random.randint(70,99)}-{random.choice(['7','8'])}{random.randint(0,9)}-{random.randint(1000,9999)}"

# Function to generate one record of PII data
def create_pii_record():
    return {
        'CREDIT_CARD': create_card_number(),
        'DATE_TIME': create_date_time(),
        'EMAIL_ADDRESS': create_email(),
        'IBAN_CODE': create_iban_code(),
        'IP_ADDRESS': create_ip_address(),
        'NRP': generate_nrp(),
        'LOCATION': create_location(),
        'PERSON': create_person_name(),
        'PHONE_NUMBER': create_phone(),
        'URL': create_url(),
        'BANK_NUMBER': create_bank_number(),
        'DRIVER_LICENSE': create_driver_license(),
        'ITIN': create_itin(),
        'PASSPORT': create_passport(),
        'SSN': create_ssn(),
    }


def remove_immediate_repetition(text):
    """Remove immediate repeated phrases in the generated text."""
    def replace(match):
        return match.group(1)  # Only keep one instance of the repetition

    # This regex looks for words that are followed by the same sequence
    # It captures phrases where the repetition is immediate and exact
    regex_pattern = r'(\b.+\b)(?: \1\b)+'
    processed_text = re.sub(regex_pattern, replace, text)
    return processed_text


# Generate PII text dataset by inserting fake PII into randomly chosen custom templates
def generate_pii_text_dataset(num_records, custom_templates):
    pii_text_dataset = []

    for _ in range(num_records):
        text_context = random.choice(custom_templates)
        pii_record = create_pii_record()

        generated_texts = text_generator(
            text_context,
            max_length=150,  # Consider reducing max_length if repetition occurs
            num_return_sequences=1,
            temperature=0.8,  # Lower temperature helps reduce randomness
            top_k=40,  # A lower top_k encourages the model to focus on more likely words
            top_p=0.85,  # Top-p sampling can also help control randomness
            truncation=True
        )
        generated_text = generated_texts[0]['generated_text']
        generated_text = remove_immediate_repetition(generated_text)
        pii_text = replace_pii(generated_text, pii_record)
        pii_text_dataset.append(pii_text)
        print("-"*50)

    return pii_text_dataset


# Function to replaces placeholders in the generated template (like [CREDIT_CARD]) with actual PII data (e.g., "1234-5678-9876-5432").
def replace_pii(text, pii_record):
    placeholders = {
        '[CREDIT_CARD]': pii_record['CREDIT_CARD'],
        '[DATE_TIME]': pii_record['DATE_TIME'],
        '[EMAIL_ADDRESS]': pii_record['EMAIL_ADDRESS'],
        '[IBAN_CODE]': pii_record['IBAN_CODE'],
        '[IP_ADDRESS]': pii_record['IP_ADDRESS'],
        '[NRP]': pii_record['NRP'],
        '[LOCATION]': pii_record['LOCATION'],
        '[PERSON]': pii_record['PERSON'],
        '[PHONE_NUMBER]': pii_record['PHONE_NUMBER'],
        '[URL]': pii_record['URL'],
        '[BANK_NUMBER]': pii_record['BANK_NUMBER'],
        '[DRIVER_LICENSE]': pii_record['DRIVER_LICENSE'],
        '[ITIN]': pii_record['ITIN'],
        '[PASSPORT]': pii_record['PASSPORT'],
        '[SSN]': pii_record['SSN'],
    }

    for placeholder, pii in placeholders.items():
        # Directly replace the placeholder with the PII data, ensuring no leading/trailing spaces
        text = text.replace(placeholder, pii.strip())

    # Remove any leftover placeholders that were not in the pii_record
    text = re.sub(r'\[[^\]]+\]', '', text)

    # Additional step to remove extra spaces and newlines
    text = re.sub(r'\s+', ' ', text).strip()  # Replace multiple spaces with one
    text = re.sub(r'\n+', '\n', text).strip()  # Replace multiple newlines with one

    return text


def find_placeholder_order(template, pii_data):
    placeholder_positions = []
    for placeholder in pii_data.keys():
        placeholder_tag = f"[{placeholder}]"
        position = template.find(placeholder_tag)
        if position != -1:
            placeholder_positions.append((position, placeholder))
    # Sort by position
    placeholder_positions.sort()
    return [placeholder for _, placeholder in placeholder_positions]

# Function to update the generated text with actual PII and records the positions of PII for redaction purposes
def generate_data_and_annotations_sequential(template, pii_data):
    annotations = []
    updated_text = template
    current_offset = 0

    # Determine the order in which placeholders appear in the template
    placeholders_order = find_placeholder_order(template, pii_data)

    for placeholder in placeholders_order:
        actual_value = pii_data[placeholder]
        placeholder_tag = f"[{placeholder}]"
        start_index = updated_text.find(placeholder_tag, current_offset)

        if start_index != -1:
            end_index = start_index + len(actual_value)
            updated_text = updated_text[:start_index] + actual_value + updated_text[start_index + len(placeholder_tag):]

            annotations.append((start_index, end_index, placeholder))

            # Update current_offset to the end of the last replacement to ensure correct indexing
            current_offset = end_index

    annotations.sort(key=lambda x: x[0])
    return updated_text, annotations

def generate_annotated_dataset_sequential(num_records):
    annotated_dataset = []

    for _ in range(num_records):
        template = random.choice(custom_templates)  # Or however you choose your template
        pii_data = create_pii_record()  # Assuming this generates your data

        text, annotations = generate_data_and_annotations_sequential(template, pii_data)
        annotated_dataset.append({"text": text, "annotations": {"entities": annotations}})

    return annotated_dataset

# Generate the dataset
num_records = 10000  # Adjust as necessary
annotated_dataset_sequential = generate_annotated_dataset_sequential(num_records)

# Example output
for record in annotated_dataset_sequential[:10]:
    print(record)

def convert_to_spacy_format(annotated_dataset_final):
    spacy_training_data = []
    for record in annotated_dataset_final:
        entities_list = record["annotations"]["entities"]
        spacy_entities = [(start, end, label) for start, end, label in entities_list]
        spacy_training_data.append((record["text"], {"entities": spacy_entities}))
    return spacy_training_data

spacy_training_data = convert_to_spacy_format(annotated_dataset_sequential)
print(len(spacy_training_data))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


{'text': 'User Katie Cook MD, with ITIN 999-77-1709, accessed https://mail.dean-richards.org.edu on 1990-03-13 02:10:13 from IP d6a4:f22f:5b78:1cc1:7ec5:5780:c840:1a16. For account support related to 025553533725, contact 2756939473.', 'annotations': {'entities': [(5, 18, 'PERSON'), (30, 41, 'ITIN'), (52, 86, 'URL'), (90, 109, 'DATE_TIME'), (118, 157, 'IP_ADDRESS'), (190, 202, 'BANK_NUMBER'), (212, 222, 'PHONE_NUMBER')]}}
{'text': 'Nicole B. Garcia confirmed their attendance for the webinar on 19/11/2007 08:26 AM at Lake Julieville, Croatia. Registration details: SSN 115-76-4581, contact ++855 5366764295, and nationality Colombia.', 'annotations': {'entities': [(0, 16, 'PERSON'), (63, 82, 'DATE_TIME'), (86, 110, 'LOCATION'), (138, 149, 'SSN'), (159, 175, 'PHONE_NUMBER'), (193, 201, 'NRP')]}}
{'text': 'A transfer to GB22 GOHM 0444 6516 0413 50 from 01388143692 was initiated on Friday, October 21, 1994 by Heather Cummings, holder of passport 00S5KMHJ. Any queries should be directed to ++

# Check for any overlap in annotations

In [4]:
def verify_annotations_in_dataset(annotated_dataset_sequential):
    # Iterate through each record in the annotated dataset
    for record in annotated_dataset_sequential:
        # Extract the entities (start, end, label) from each record's annotations
        entities = record["annotations"]["entities"]
        # Sort the entities by their start index to compare consecutive entities
        entities.sort(key=lambda x: x[0])

        # Check for overlap between consecutive entities
        for i in range(len(entities) - 1):
            current_entity = entities[i]
            next_entity = entities[i + 1]
            # Assert that there is no overlap: the current entity's end index must be
            # less than or equal to the next entity's start index.
            assert current_entity[1] <= next_entity[0], f"Overlap found between {current_entity} and {next_entity}"

# Execute the verification function to check for overlapping annotations
try:
    verify_annotations_in_dataset(annotated_dataset_sequential)
    print("No overlaps found in annotations.")
except AssertionError as e:
    # If an overlap is found, print the details
    print(f"Validation error: {e}")

No overlaps found in annotations.


# Download a public dataset like AG_News

In [5]:
!pip install datasets
from datasets import load_dataset

# Load ag_news dataset
ag_news_dataset = load_dataset('ag_news', split='train')

# Sample a subset of the ag_news dataset for integration
ag_news_sample = ag_news_dataset.select(range(10000))

# Convert ag_news samples to the same format as the synthetic dataset
ag_news_texts = [{"text": sample['text'], "annotations": {"entities": []}} for sample in ag_news_sample]

# for record in ag_news_texts[:10]:
#    print(record)

# Merge the synthetic dataset with the ag_news samples
combined_dataset = annotated_dataset_sequential + ag_news_texts

# Convert the combined dataset to spaCy format
spacy_combined_training_data = convert_to_spacy_format(combined_dataset)
print(len(spacy_combined_training_data))

# Print the first few lines of the combined dataset
for record in spacy_combined_training_data[:10]:
    print(record)

20000
('User Katie Cook MD, with ITIN 999-77-1709, accessed https://mail.dean-richards.org.edu on 1990-03-13 02:10:13 from IP d6a4:f22f:5b78:1cc1:7ec5:5780:c840:1a16. For account support related to 025553533725, contact 2756939473.', {'entities': [(5, 18, 'PERSON'), (30, 41, 'ITIN'), (52, 86, 'URL'), (90, 109, 'DATE_TIME'), (118, 157, 'IP_ADDRESS'), (190, 202, 'BANK_NUMBER'), (212, 222, 'PHONE_NUMBER')]})
('Nicole B. Garcia confirmed their attendance for the webinar on 19/11/2007 08:26 AM at Lake Julieville, Croatia. Registration details: SSN 115-76-4581, contact ++855 5366764295, and nationality Colombia.', {'entities': [(0, 16, 'PERSON'), (63, 82, 'DATE_TIME'), (86, 110, 'LOCATION'), (138, 149, 'SSN'), (159, 175, 'PHONE_NUMBER'), (193, 201, 'NRP')]})
('A transfer to GB22 GOHM 0444 6516 0413 50 from 01388143692 was initiated on Friday, October 21, 1994 by Heather Cummings, holder of passport 00S5KMHJ. Any queries should be directed to ++218 434-595-1288 or elizabethholmes@holt.com.', 

# Preprocess & clean the combined dataset

In [6]:
# Preprocessing function to clean text
def preprocess_text(text):
    text = re.sub(r'\s+', ' ', text)  # Remove extra whitespace
    #text = re.sub(r'http\S+', '', text)  # Remove URLs
    #text = re.sub(r'\@\w+', '', text)  # Remove mentions
    #text = re.sub(r'\#\w+', '', text)  # Remove hashtags
    #text = re.sub(r'\d+', '', text)  # Remove digits
    text = text.strip()
    return text

# Apply preprocessing to the combined dataset
cleaned_spacy_combined_training_data = [
    (preprocess_text(record[0]), record[1]) for record in spacy_combined_training_data
]

# Print the first few lines of the cleaned combined dataset
for record in cleaned_spacy_combined_training_data[:10]:
    print(record)

('User Katie Cook MD, with ITIN 999-77-1709, accessed https://mail.dean-richards.org.edu on 1990-03-13 02:10:13 from IP d6a4:f22f:5b78:1cc1:7ec5:5780:c840:1a16. For account support related to 025553533725, contact 2756939473.', {'entities': [(5, 18, 'PERSON'), (30, 41, 'ITIN'), (52, 86, 'URL'), (90, 109, 'DATE_TIME'), (118, 157, 'IP_ADDRESS'), (190, 202, 'BANK_NUMBER'), (212, 222, 'PHONE_NUMBER')]})
('Nicole B. Garcia confirmed their attendance for the webinar on 19/11/2007 08:26 AM at Lake Julieville, Croatia. Registration details: SSN 115-76-4581, contact ++855 5366764295, and nationality Colombia.', {'entities': [(0, 16, 'PERSON'), (63, 82, 'DATE_TIME'), (86, 110, 'LOCATION'), (138, 149, 'SSN'), (159, 175, 'PHONE_NUMBER'), (193, 201, 'NRP')]})
('A transfer to GB22 GOHM 0444 6516 0413 50 from 01388143692 was initiated on Friday, October 21, 1994 by Heather Cummings, holder of passport 00S5KMHJ. Any queries should be directed to ++218 434-595-1288 or elizabethholmes@holt.com.', {'enti

#PII Detection Using Traditional Methods (Regex) and spaCy's NER

In [7]:
# PII Detection using traditional methods like Regex or open source NER

# Installing Required Libraries
!pip install DLNValidation
!pip install spacy-langdetect
!pip install py3DNS
!pip install validate-email-address
!pip install email-validator
!pip install python-stdnum
!pip install spacy-transformers
!pip install torch
!python -m spacy download en_core_web_trf
!pip install tldextract

import re
import spacy
from spacy.language import Language
from spacy_langdetect import LanguageDetector
from validate_email_address import validate_email
from dlnvalidation import is_valid
from datetime import datetime
from stdnum import luhn
from stdnum import iban
from stdnum.iso7064 import mod_97_10
import calendar
from dateutil.parser import parse

# Loading spaCy's Transformer-based Model and Language Detector
nlp = spacy.load("en_core_web_trf")

# Define regex patterns for different PII types
regex_patterns = {
    'CREDIT_CARD': r'\b(?:\d[ -]*?){13,19}\b',
    'DATE_TIME': r'\b(?:\d{1,4}[\/\-]\d{1,2}[\/\-]\d{1,4}|\d{1,2}:\d{2}(?::\d{2})?(?:\s?[AP]M)?)\b',
    'EMAIL_ADDRESS': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
    'IBAN_CODE': r'\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}([A-Z0-9]?){0,16}\b',
    'IP_ADDRESS': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
    'PHONE_NUMBER': r'\b(?:\+?[\d\s]{3,}\)?[\d\s-]{6,}\d)\b',
    'URL': r'\b(?:https?://)?[^\s/$.?#].[^\s]*\.\b(?:com|org|net|edu|gov|mil|co|io|ai|[a-z]{2})\b[^\s]*',
    'BANK_NUMBER': r'\b\d{8,17}\b',
    'DRIVER_LICENSE': r'\b[A-Z]{1,2}-?\d{5,9}\b',
    'ITIN': r'\b9\d{2}-?(?:7|8)\d-?\d{4}\b',
    'PASSPORT': r'\b\d{9}\b',
    'SSN': r'\b\d{3}-?\d{2}-?\d{4}\b',
}

# Define checksum validators
def validate_credit_card(number):
    return luhn.is_valid(number)

def validate_iban(number):
    return iban.is_valid(number)

def validate_ip_address(ip):
    # This will only check the format, not the actual validity of the IP
    try:
        if ':' in ip:  # This is a basic check for IPv6 format
            return True  # We assume IPv6 is valid as we don't have a checksum method
        else:
            parts = ip.split('.')
            return len(parts) == 4 and all(0 <= int(part) < 256 for part in parts)
    except ValueError:
        return False

def get_lang_detector(nlp, name):
    return LanguageDetector()

def validate_dln(number) :
    # implement is valid for every state
    # List of US state codes to check the DLN against
    state_codes = [
        'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
        'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
        'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
        'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
        'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'
    ]

    for state_code in state_codes:
        if is_valid(number, state_code):
            return True
    return False

def detect_and_redact(text):
    # Check for IP addresses, IBAN codes, credit card numbers, and US ITIN with checksum
    for entity, pattern in regex_patterns.items():
        for match in re.finditer(pattern, text):
            if entity == 'CREDIT_CARD' and validate_credit_card(match.group()):
                text = text.replace(match.group(), '[REDACTED_CRED]')
            elif entity == 'IBAN_CODE' and validate_iban(match.group()):
                text = text.replace(match.group(), '[REDACTED_IBAN]')
            elif entity == 'EMAIL_ADDRESS' and validate_email(match.group(), verify=True):
                text = text.replace(match.group(), '[REDACTED]')
            elif entity in ['IP_ADDRESS', 'ITIN', 'PASSPORT', 'SSN', 'URL']:
                text = text.replace(match.group(), '[REDACTED]')
            elif entity == 'DRIVER_LICENSE' and validate_dln(match.group()):
                text = text.replace(match.group(), '[REDACTED]')
            elif entity == 'DATE_TIME':
                try:
                    # This will raise an exception if it's not a real date/time
                    if parse(match.group(), fuzzy=False):
                        text = text.replace(match.group(), '[REDACTED]')
                except ValueError:
                    continue
            elif entity == 'URL':
                extracted = tldextract.extract(matched_text)
                if not extracted.suffix:  # If there's no recognizable TLD
                    continue  # Consider adding a specific action here, like redacting or logging
                else:
                    text = text.replace(matched_text, '[REDACTED]')
            else:
                text = text.replace(match .group(), '[REDACTED_CUSTOM]')

    # Detect and redact named entities using spaCy
    doc = nlp(text)
    for ent in doc.ents:
        if ent.label_ in ['PERSON', 'GPE', 'ORG', 'NORP','LOC', 'DATE', 'TIME']:
            text = text.replace(ent.text, '[REDACTED_NER]')

    return text

# Example usage
example_text = """
    Sushil's email is 2022cs04045@wilp.bits-pilani.ac.in and his credit card number is 4874940012002902.
    His IP is 192.168.1.1 and his IBAN is GB33BUKB20201555555555. He lives in Bangalore in India. It is the 22nd Jan and 4pm. His religion is Hindu and he believes in Hinduism.
    He is from India. His email website is yahoo.co.in and https://in.search.yahoo.com/?fr2=inr His driving license is G454666"""

redacted_example = detect_and_redact(example_text)
print(redacted_example)

  _torch_pytree._register_pytree_node(
Collecting en-core-web-trf==3.7.3
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.7.3/en_core_web_trf-3.7.3-py3-none-any.whl (457.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m457.4/457.4 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_trf')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


  model.load_state_dict(torch.load(filelike, map_location=device))
  with torch.cuda.amp.autocast(self._mixed_precision):



    [REDACTED_NER]'s email is [REDACTED] and his credit card number is [REDACTED_CUSTOM].
    His IP is [REDACTED] and his IBAN is [REDACTED_IBAN]. He lives in [REDACTED_NER] in [REDACTED_NER]. It is [REDACTED_NER]. His religion is [REDACTED_NER] and he believes in [REDACTED_NER]ism.
    He is from [REDACTED_NER]. His email website is [REDACTED] and [REDACTED] His driving license is [REDACTED]


# Apply redaction to synthetic dataset

In [8]:
# Apply redaction to each item in the dataset
redacted_texts = []
sample_set=spacy_training_data[0:100]
for text, annot_dict in sample_set:
    annotations = annot_dict['entities']  # Extracting the 'entities' from the annotations dictionary
  #  print ("annotations:",annotations)
  #  print ("text",text)
    redacted_text = detect_and_redact(text)
    redacted_texts.append(redacted_text)
print(len(spacy_training_data))

# Example output
for original, redacted in zip(sample_set, redacted_texts[:10]):
    print("Original:", original[0]) # Print the original, unredacted text
    print("Redacted:", redacted)    # Print the redacted version of the text
    print("-------")                # Separator between outputs

len(redacted_texts)

10000
Original: User Katie Cook MD, with ITIN 999-77-1709, accessed https://mail.dean-richards.org.edu on 1990-03-13 02:10:13 from IP d6a4:f22f:5b78:1cc1:7ec5:5780:c840:1a16. For account support related to 025553533725, contact 2756939473.
Redacted: User [REDACTED_NER], with ITIN[REDACTED_CUSTOM], accessed [REDACTED] on [REDACTED] [REDACTED] from IP d6a4:f22f:5b78:1cc1:7ec5:5780:c840:1a16. For account support related to[REDACTED_CUSTOM], contact[REDACTED_CUSTOM].
-------
Original: Nicole B. Garcia confirmed their attendance for the webinar on 19/11/2007 08:26 AM at Lake Julieville, Croatia. Registration details: SSN 115-76-4581, contact ++855 5366764295, and nationality Colombia.
Redacted: [REDACTED_NER] confirmed their attendance for the webinar on [REDACTED] [REDACTED] at [REDACTED_NER], [REDACTED_NER]. Registration details: SSN[REDACTED_CUSTOM], contact ++[REDACTED_CUSTOM], and nationality [REDACTED_NER].
-------
Original: A transfer to GB22 GOHM 0444 6516 0413 50 from 01388143692 w

100

# Evaluate the performance of the PII redaction process


In [9]:
# Evaluate the performance of the PII redaction process by calculating key metrics: precision, recall, and the F1 score.
print(len(sample_set))

def count_unredacted_entities(original_text, redacted_text, entities):

    """
    This function counts the number of entities that were not redacted in the redacted text.

    Parameters:
    - original_text: The unredacted, original text.
    - redacted_text: The text that has been processed and redacted.
    - entities: List of entity annotations with start and end positions.

    Returns:
    - unredacted_count: Number of entities that were not redacted and still present in the redacted text.
    """

    unredacted_count = 0
    for start, end, _ in entities:
        entity_text = original_text[start:end]
        # Check if the entity text is still present in the redacted text
        if entity_text in redacted_text:
            unredacted_count += 1
    return unredacted_count

def calculate_performance_metrics(annotated_data, redacted_texts):
    """
    This function calculates the key performance metrics (precision, recall, F1 score)
    for the PII redaction process based on the original and redacted texts.

    Parameters:
    - annotated_data: The original dataset with annotations (list of (text, annotations) pairs).
    - redacted_texts: The list of texts after redaction (processed texts).

    Returns:
    - precision: The precision score, which measures the percentage of correctly redacted entities out of all redactions.
    - recall: The recall score, which measures the percentage of entities that should have been redacted but were missed.
    - f1_score: The harmonic mean of precision and recall, providing a balanced performance measure.
    """
    TP = 0
    FP = 0
    FN = 0

    for ((text, annotation), redacted_text) in zip(annotated_data, redacted_texts):
        entities = annotation['entities']
        redacted_count = redacted_text.count('[REDACTED]')

        TP += len(entities) - count_unredacted_entities(text, redacted_text, entities)  # Adjusted TP calculation
        FN += count_unredacted_entities(text, redacted_text, entities)  # Now actually calculates FNs
        FP += max(0, redacted_count - len(entities))  # Extra redactions are considered FPs

    precision = TP / (TP + FP) if TP + FP > 0 else 0
    recall = TP / (TP + FN) if TP + FN > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0
    # Print results and the list of unredacted entities
    print(f"\nTrue Positives (Correct Redactions): {TP}")
    print(f"False Positives (Over-redactions): {FP}")
    print(f"False Negatives (Missed Redactions): {FN}\n")
    return precision, recall, f1_score

# Example usage
precision, recall, f1_score = calculate_performance_metrics(sample_set[0:1023], redacted_texts)
print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1_score:.2f}")

# print(len(spacy_training_data))

100

True Positives (Correct Redactions): 451
False Positives (Over-redactions): 0
False Negatives (Missed Redactions): 44

Precision: 1.00, Recall: 0.91, F1 Score: 0.95


# Finetuning en_core_web_trf Model using synthetic PII dataset


In [None]:
# Finetuning Custom Model using Spacy2.0
import os
import spacy
from spacy.training import Example
import random
sample_set=spacy_training_data[0:10000]
#nlp = spacy.load("en")
nlp = spacy.load("en_core_web_trf")
existing_entities = nlp.get_pipe("ner").labels
print(existing_entities)
new_entities = ["CREDIT_CARD", "SSN","EMAIL_ADDRESS", "IP_ADDRESS",'PHONE_NUMBER','URL','BANK_NUMBER','DRIVER_LICENSE','ITIN','PASSPORT']
ner = nlp.get_pipe("ner")
for entity in new_entities:
    ner.add_label(entity)

TRAIN_DATA = sample_set[0:100]
print (len(TRAIN_DATA))
#print (TRAIN_DATA)
# Assuming nlp is your loaded or created spaCy model with the NER component
# Make sure NUM_EPOCHS is defined, e.g., NUM_EPOCHS = 10
# Shuffle the training data
random.shuffle(TRAIN_DATA)

# Begin training
optimizer = nlp.resume_training()

# Number of epochs to train the model
NUM_EPOCHS = 2
for epoch in range(NUM_EPOCHS):
    random.shuffle(TRAIN_DATA)
    losses = {}

    # Train the model on each example
    for text, annotations in TRAIN_DATA:
        doc = nlp.make_doc(text)
        example = Example.from_dict(doc, annotations)
        nlp.update([example], drop=0.5, losses=losses)

    print(f"Losses at epoch {epoch}: {losses}")

# Save the trained model to a directory
output_dir = "./trained_model"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
nlp.to_disk(output_dir)
print(f"Model saved to {output_dir}")

# Test the model immediately after training
#test_text = "Updated their emergency contact details to +919451514358. The security question linked to SSN 559-33-5905 and bank account 4854980002002907 was also updated."
test_text = "Customer Mr. Sushil Kumar requested an account statement for 873815548953 to be mailed to Delhi. For further assistance, reach out to 2022cs04045@wilp.bits-pilani.ac.in or 988.654.4600."

doc = nlp(test_text)
if not doc.ents:
    print("No entities detected immediately after training.")
else:
    for ent in doc.ents:
        print(f"Immediately after training: {ent.text} - {ent.label_}")

# Load the saved model from disk
nlp_updated = spacy.load(output_dir)

# Test the model after loading it
doc_updated = nlp_updated(test_text)
if not doc_updated.ents:
    print("No entities detected after loading the model.")
else:
    for ent in doc_updated.ents:
        print(f"After loading the model: {ent.text} - {ent.label_}")

('CARDINAL', 'DATE', 'EVENT', 'FAC', 'GPE', 'LANGUAGE', 'LAW', 'LOC', 'MONEY', 'NORP', 'ORDINAL', 'ORG', 'PERCENT', 'PERSON', 'PRODUCT', 'QUANTITY', 'TIME', 'WORK_OF_ART')
500




Losses at epoch 0: {'transformer': 4320.009489893913, 'tagger': 0.0, 'parser': 0.0, 'ner': 5157.2391159887065}
Losses at epoch 1: {'transformer': 4372.224582195282, 'tagger': 0.0, 'parser': 0.0, 'ner': 5154.860774141052}
Model saved to ./trained_model
Immediately after training: Kristen Murphy - PERSON
Immediately after training: Sanchezton - ORG
After loading the model: Kristen Murphy - PERSON
After loading the model: Sanchezton - ORG


In [11]:
import os
import spacy
from spacy.training import Example
import random
from transformers import AutoModel

sample_set=spacy_training_data[0:10000]

# Load the spaCy model. "en_core_web_trf" is a transformer-based model.
nlp = spacy.load("en_core_web_trf")

# Print the existing entities in the model to see what entities it recognizes (like PERSON, ORG, etc.).
existing_entities = nlp.get_pipe("ner").labels
print(existing_entities)  # Outputs: ['PERSON', 'ORG', 'GPE', 'LOC', etc.]

# We define new entities that we want to add to the existing model (related to PII like CREDIT_CARD, SSN, etc.).
new_entities = ["CREDIT_CARD", "SSN", "EMAIL_ADDRESS", "IP_ADDRESS", 'PHONE_NUMBER', 'URL', 'BANK_NUMBER', 'DRIVER_LICENSE', 'ITIN', 'PASSPORT']

# Get the Named Entity Recognizer (NER) pipe from the loaded model.
ner = nlp.get_pipe("ner")

# Add the new entities to the model.
for entity in new_entities:
    ner.add_label(entity)

# Use a subset of the training data for finetuning the model.sample_set comes from the generated synthetic dataset.
TRAIN_DATA = sample_set[0:100]  # Use the first 100 records for training.

# print(len(TRAIN_DATA))  # Output: 100 (length of the training data)

# Shuffle the training data to ensure randomness during training.
random.shuffle(TRAIN_DATA)

# Begin training the model.
optimizer = nlp.resume_training()

# Define the number of training epochs (how many times we want the model to go over the training data).
NUM_EPOCHS = 2

# Perform training for the defined number of epochs.
for epoch in range(NUM_EPOCHS):
    random.shuffle(TRAIN_DATA)  # Shuffle the data for each epoch.
    losses = {}  # Dictionary to store the loss value after each update (indicates how well the model is doing).

    # Iterate through the training examples and update the model.
    for text, annotations in TRAIN_DATA:
        # Create a spaCy document (doc) object from the input text.
        doc = nlp.make_doc(text)

        # Create an Example object from the text and its corresponding annotations (entities).
        example = Example.from_dict(doc, annotations)

        # Update the model with the training example, and adjust the loss based on the performance.
        nlp.update([example], drop=0.5, losses=losses)  # drop=0.5 randomly drops data to prevent overfitting.

    # Print the loss after each epoch to monitor progress.
    print(f"Losses at epoch {epoch}: {losses}")

# Save the trained model to a specified directory.
output_dir = "./trained_model"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)  # Create the directory if it doesn't exist.

nlp.to_disk(output_dir)  # Save the model to disk.
print(f"Model saved to {output_dir}")

# Test the model on a new text sample immediately after training.
#test_text = "Customer Dr. Kristen Murphy requested an account statement for 873815548953 to be mailed to Sanchezton. For further assistance, reach out to christina79@aguirre-hall.net or 673.556.4431."
test_text = "Customer Mr. Sushil Kumar requested an account statement for 873815548953 to be mailed to Delhi. For further assistance, reach out to 2022cs04045@wilp.bits-pilani.ac.in or 988.654.4600."

doc = nlp(test_text)  # Create a spaCy doc object from the test text.

# Check if any named entities are detected in the test text immediately after training.
if not doc.ents:
    print("No entities detected immediately after training.")
else:
    # Print out all the detected entities and their corresponding labels.
    for ent in doc.ents:
        print(f"Immediately after training: {ent.text} - {ent.label_}")

# Load the saved model from disk to ensure that the trained model can be correctly reloaded.
# nlp_updated = spacy.load(output_dir)

# Test the reloaded model on the same test text.
# doc_updated = nlp_updated(test_text)
# if not doc_updated.ents:
#    print("No entities detected after loading the model.")
#else:
#    # Print out all the detected entities from the reloaded model.
#    for ent in doc_updated.ents:
#        print(f"After loading the model: {ent.text} - {ent.label_}")

# Count the number of parameters in the underlying transformer model (likely RoBERTa-base).
model = AutoModel.from_pretrained("roberta-base")

# Calculate and print the total number of parameters in the model.
total_params = sum(p.numel() for p in model.parameters())
print(f"\nTotal parameters in the underlying transformer model: {total_params}")

('CARDINAL', 'DATE', 'EVENT', 'FAC', 'GPE', 'LANGUAGE', 'LAW', 'LOC', 'MONEY', 'NORP', 'ORDINAL', 'ORG', 'PERCENT', 'PERSON', 'PRODUCT', 'QUANTITY', 'TIME', 'WORK_OF_ART')
Losses at epoch 0: {'transformer': 814.881168782711, 'tagger': 0.0, 'parser': 0.0, 'ner': 971.4823097121027}
Losses at epoch 1: {'transformer': 810.3824676275253, 'tagger': 0.0, 'parser': 0.0, 'ner': 974.5425711042709}
Model saved to ./trained_model
Immediately after training: Sushil Kumar - PERSON
Immediately after training: Delhi - GPE


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



Total parameters in the underlying transformer model: 124645632


#Saving and Loading Redacted Text Data and Sample Set in JSON Format

In [12]:
import json
# Write the list to a file
with open('my_list_redacted.txt', 'w') as file:
    json.dump(redacted_texts, file)

# Read the list back from the file
with open('my_list_redacted.txt', 'r') as file:
    read_list = json.load(file)

print(read_list)

import json
# Write the list to a file
with open('my_list_sample.txt', 'w') as file:
    json.dump(sample_set[0:1023], file)

# Read the list back from the file
with open('my_list_sample.txt', 'r') as file:
    read_list = json.load(file)

['User [REDACTED_NER], with ITIN[REDACTED_CUSTOM], accessed [REDACTED] on [REDACTED] [REDACTED] from IP d6a4:f22f:5b78:1cc1:7ec5:5780:c840:1a16. For account support related to[REDACTED_CUSTOM], contact[REDACTED_CUSTOM].', '[REDACTED_NER] confirmed their attendance for the webinar on [REDACTED] [REDACTED] at [REDACTED_NER], [REDACTED_NER]. Registration details: SSN[REDACTED_CUSTOM], contact ++[REDACTED_CUSTOM], and nationality [REDACTED_NER].', 'A transfer to GB22 GOHM [REDACTED_CUSTOM] from[REDACTED_CUSTOM] was initiated on [REDACTED_NER] by [REDACTED_NER], holder of passport 00S5KMHJ. Any queries should be directed to ++[REDACTED_CUSTOM] or [REDACTED_CUSTOM].', '[REDACTED_NER] confirmed a payment using card [REDACTED_CUSTOM] on [REDACTED] [REDACTED].', "Update received from Ms. [REDACTED_NER]: Change of address to 0586 Gates Spring Suite 938, [REDACTED_NER], [REDACTED_NER] 89879 and phone to[REDACTED_CUSTOM] was noted. New driver's license OH-[REDACTED] was issued and sent to [REDACTE

# Finetuning custom model using spacy3.0

In [23]:
# Finetuning custom model using spacy3.0

import pandas as pd
import os
from tqdm import tqdm
import spacy
from spacy.tokens import DocBin

# Use a subset of the training data for finetuning the model.sample_set comes from the generated synthetic dataset.
TRAIN_DATA = sample_set[0:1000]  # Use the first 100 records for training.

#nlp = spacy.blank("en") # load a new spacy model
nlp = spacy.load("en_core_web_trf") # load other spacy model

db = DocBin() # create a DocBin object
for text, annot in tqdm(TRAIN_DATA): # data in previous format
    doc = nlp.make_doc(text) # create doc object from text
    ents = []
    for start, end, label in annot["entities"]: # add character indexes
        span = doc.char_span(start, end, label=label, alignment_mode="contract")
        if span is None:
            print("Skipping entity")
        else:
            ents.append(span)
    doc.ents = ents # label the text with the ents
    db.add(doc)

# os.chdir(r'XXXX\XXXXX')
db.to_disk("./train.spacy") # save the docbin object

 22%|██▏       | 216/1000 [00:00<00:00, 1086.60it/s]

Skipping entity
Skipping entity
Skipping entity
Skipping entity


 72%|███████▏  | 715/1000 [00:00<00:00, 1189.97it/s]

Skipping entity
Skipping entity
Skipping entity


100%|██████████| 1000/1000 [00:00<00:00, 1156.28it/s]

Skipping entity
Skipping entity





In [24]:
!python3 -m spacy init fill-config base_config.cfg config.cfg

  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
[38;5;2m✔ Auto-filled config with all values[0m
[38;5;2m✔ Saved config[0m
config.cfg
You can now add your data and train your pipeline:
python -m spacy train config.cfg --paths.train ./train.spacy --paths.dev ./dev.spacy


In [None]:
!python3 -m spacy train config.cfg --output ./output --paths.train ./train.spacy --paths.dev ./train.spacy

  _torch_pytree._register_pytree_node(
[38;5;4mℹ Saving to output directory: output[0m
[38;5;4mℹ Using CPU[0m
[1m
  _torch_pytree._register_pytree_node(
[38;5;2m✔ Initialized pipeline[0m
[1m
[38;5;4mℹ Pipeline: ['tok2vec', 'ner'][0m
[38;5;4mℹ Initial learn rate: 0.001[0m
E    #       LOSS TOK2VEC  LOSS NER  ENTS_F  ENTS_P  ENTS_R  SCORE 
---  ------  ------------  --------  ------  ------  ------  ------
  0       0          0.00     39.90    0.00    0.00    0.00    0.00
  0     200       1096.23   5040.27   71.91   73.34   70.54    0.72
  1     400        512.10   1216.68   96.74   96.54   96.94    0.97
  2     600        497.81    488.10   99.23   99.18   99.29    0.99
  3     800       1031.12    155.58   99.56   99.59   99.53    1.00
  4    1000         88.04     70.80   99.84   99.84   99.84    1.00


In [17]:
import spacy
from spacy import displacy

# Load the model (adjust the path if needed)
nlp1 = spacy.load("/content/output/model-best")  # Ensures the model is uploaded in Colab's environment

# Sample input text
doc = nlp1("Kyle Williams, residing at Wilcoxland, updated their contact information to ++44 7781 (891)733-0984x46212 and jeremy48@gray.com. Their account 0525617716 and SSN 522-89-8848 were noted for the update.")

# Render the named entities in the text (for Colab, use jupyter=False or render HTML)
html = displacy.render(doc, style="ent", jupyter=False)

# To display in Colab, render the HTML in an output cell
from IPython.display import display, HTML
display(HTML(html))

In [18]:
import spacy

def redact_pii_content(text, nlp_model):
    doc = nlp_model(text)
    redacted_text = text
    for ent in doc.ents:
        redacted_text = redacted_text.replace(ent.text, '[REDACTED]')
    return redacted_text

# Loaded NER model as 'nlp1'
nlp1 = spacy.load("./output/model-best")

sample_set_new = redacted_texts   # Assuming you have your sample set defined here

redacted_texts_new = []

for text in sample_set_new:
    redacted_text = redact_pii_content(text, nlp1)
    redacted_texts_new.append(redacted_text)

print(len(sample_set_new))

# Example output
for original, redacted in zip(sample_set_new, redacted_texts_new[:2]):
    print("Original:", original[0])
    print("Redacted:", redacted)
    print("-------")

100
Original: U
Redacted: User [[[REDACTED]]]], with ITIN[[REDACTED]]_CUSTOM], accessed [[REDACTED]]] on [[REDACTED]]] [[REDACTED]]] from IP [REDACTED]. For account support related to[[REDACTED]]_CUSTOM], contact[[REDACTED]]_CUSTOM].
-------
Original: [
Redacted: [[[REDACTED]]] confirmed their attendance for the webinar on [[REDACTED]] [[REDACTED]] at [[[REDACTED]]], [[[REDACTED]]]. Registration details: SSN[[REDACTED]_CUSTOM], contact ++[[REDACTED]_CUSTOM], and nationality [[[REDACTED]]].
-------


In [19]:
def count_unredacted_entities(original_text, redacted_text, entities):
    unredacted_count = 0
    for start, end, _ in entities:
        entity_text = original_text[start:end]
        # Check if the entity text is still present in the redacted text
        if entity_text in redacted_text:
            unredacted_count += 1
    return unredacted_count

def calculate_performance_metrics(annotated_data, redacted_texts):
    TP = 0
    FP = 0
    FN = 0

    for ((text, annotation), redacted_text) in zip(annotated_data, redacted_texts):
        entities = annotation['entities']
        redacted_count = redacted_text.count('[REDACTED]')

        TP += len(entities) - count_unredacted_entities(text, redacted_text, entities)  # Adjusted TP calculation
        FN += count_unredacted_entities(text, redacted_text, entities)  # Now actually calculates FNs
        FP += max(0, redacted_count - len(entities))  # Extra redactions are considered FPs

    precision = TP / (TP + FP) if TP + FP > 0 else 0
    recall = TP / (TP + FN) if TP + FN > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0
    # Print results and the list of unredacted entities
    print(f"\nTrue Positives (Correct Redactions): {TP}")
    print(f"False Positives (Over-redactions): {FP}")
    print(f"False Negatives (Missed Redactions): {FN}\n")
    return precision, recall, f1_score

# Example usage
precision, recall, f1_score = calculate_performance_metrics(sample_set[0:1023], redacted_texts_new)
print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1_score:.2f}")

print(len(spacy_training_data))

493 56 2
Precision: 0.90, Recall: 1.00, F1 Score: 0.94
10000


In [None]:
import re
import spacy

# Assuming you've already loaded your trained NER model
nlp = spacy.load("./output/model-best")  # Path to your fine-tuned model

# Function to read a text file
def read_file_content(file_path):
    with open(file_path, 'r') as file:
        return file.read()

# Function to save the redacted text to a new file
def save_to_file(redacted_text, output_file):
    with open(output_file, 'w') as file:
        file.write(redacted_text)

# Function to detect and redact PII using the NER model
def redact_pii_content(text, nlp_model):
    doc = nlp_model(text)
    redacted_text = text
    for ent in doc.ents:
        redacted_text = redacted_text.replace(ent.text, '[REDACTED]')
    return redacted_text

# Main function to process the input file
def redact_pii_in_file(input_file, output_file):
    # Step 1: Read the text from the input file
    original_text = read_file_content(input_file)

    # Step 2: Detect and redact PII in the text using the NER model
    redacted_text = redact_pii_content(original_text, nlp)

    # Step 3: Save the redacted text to the output file
    save_to_file(redacted_text, output_file)

    print(f"Redacted text saved to {output_file}")

# Example usage:
input_file_path = "input_file.txt"  # Replace with your input file path
output_file_path = "redacted_output_file.txt"  # Replace with your desired output file path

# Call the function to redact PII in the input file
redact_pii_in_file(input_file_path, output_file_path)

Redacted text saved to redacted_output_file.txt


In [None]:
import time
import re
import spacy

# Load your trained NER model
nlp = spacy.load("./output/model-best")  # Path to your fine-tuned model

# Function to read a text file
def read_file_content(file_path):
    with open(file_path, 'r') as file:
        return file.read()

# Function to save the redacted text to a new file
def save_to_file(redacted_text, output_file):
    with open(output_file, 'w') as file:
        file.write(redacted_text)

# Function to detect and redact PII using the NER model
def redact_pii_content(text, nlp_model):
    doc = nlp_model(text)
    redacted_text = text
    for ent in doc.ents:
        redacted_text = redacted_text.replace(ent.text, '[REDACTED]')
    return redacted_text

# Function to measure and print the detect and redact time
def redact_pii_in_file(input_file, output_file):
    # Step 1: Read the text from the input file
    original_text = read_file_content(input_file)

    # Step 2: Measure detection and redaction time
    start_time = time.time()

    # Detect and redact PII
    redacted_text = redact_pii_content(original_text, nlp)

    end_time = time.time()
    total_time = end_time - start_time

    # Step 3: Save the redacted text to the output file
    save_to_file(redacted_text, output_file)

    # Step 4: Print the total detection and redaction time
    print(f"Redacted text saved to {output_file}")
    print(f"Detection and Redaction Time: {total_time:.2f} seconds")

# Example usage:
input_file_path = "dataset_1000.txt"  # Replace with your input file path
output_file_path = "redacted_output_file.txt"  # Replace with your desired output file path

# Call the function to redact PII in the input file and measure time
redact_pii_in_file(input_file_path, output_file_path)

Redacted text saved to redacted_output_file.txt
Detection and Redaction Time: 17.98 seconds
