In [None]:
import random
import re
from datetime import date
from typing import List, Optional

import pandas as pd
import torch
from faker import Faker
from transformers import pipeline, Pipeline
from transformers import PegasusForConditionalGeneration, PegasusTokenizer
from transformers.utils import logging
logging.set_verbosity_error() 

# Paraphrasing model: load the Pegasus checkpoint and its tokenizer once, and
# place the model on the GPU when one is available (CPU otherwise).
model_name = 'tuner007/pegasus_paraphrase'
torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'
tokenizer = PegasusTokenizer.from_pretrained(model_name)
# A stray "CONS" token in front of this assignment used to make the whole cell
# a SyntaxError; it has been removed.
model = PegasusForConditionalGeneration.from_pretrained(model_name).to(torch_device)

fake = Faker()

# Maps a mask label (the text that appears between square brackets in masked
# sentences, e.g. "[CITY]") to a two-item list:
#   [0] a human-readable description of the entity type
#   [1] a zero-argument callable returning a synthetic replacement string
# Note: DRIVERLICENSE, IDCARD and PASSPORT all draw from fake.passport_number().
supported_masks = {
    'DOB': ["date of birth", lambda: fake.date(end_datetime=date(2010, 1, 1))],
    'BUILDING': ["a building name", lambda: fake.company()],
    'DATE': ["a date", lambda: fake.date()],
    'DRIVERLICENSE': ["a driver license number", lambda: fake.passport_number()],
    'EMAIL': ["an email address", lambda: fake.simple_profile()['mail']],
    # Draw the (lat, lng) pair once instead of calling fake.latlng() twice and
    # taking one component from each call.
    'GEOCOORD': ["a geographic coordinate", lambda: "({}, {})".format(*fake.latlng())],
    'GIVENNAME': ["a full name", lambda: fake.name_nonbinary()],
    'FIRSTNAME': ["a first name", lambda: fake.first_name()],
    'IDCARD': ["a passport ID number", lambda: fake.passport_number()],
    'IPV4': ["an IP address", lambda: ".".join(str(random.randint(0, 255)) for _ in range(4))],
    'LASTNAME': ["a last name", lambda: fake.last_name()],
    'PASSPORT': ["a passport related information", lambda: fake.passport_number()],
    'SEX': ["gender information", lambda: random.choice(["female", "male", "non-binary"])],
    'SOCIALNUMBER': ["a 10-digit social security number", lambda: fake.ssn()],
    'TEL': ["a telephone number", lambda: fake.phone_number()],
    'PHONENUMBER': ["a telephone number", lambda: fake.phone_number()],
    'USERNAME': ["a username", lambda: fake.simple_profile()['username']],
    # random.randint is inclusive on both ends, so the bounds are 23 and 59;
    # the previous (0, 24) / (0, 60) bounds could produce "24:60". Both fields
    # are zero-padded so the result is always a well-formed HH:MM string.
    'TIME': ['a time of the day', lambda: f"{random.randint(0, 23):02d}:{random.randint(0, 59):02d}"],
    'AGE': ['an age number', lambda: str(random.randint(10, 80))],
    'CITY': ['a city name', lambda: fake.city()],
    "MIDDLENAME": ['a middle name', lambda: fake.first_name()],
    "PREFIX": ['a name prefix', lambda: fake.prefix()]
}
# PII tagger used to produce the masked ("layer 1") text.
# It is built directly with `pipeline` because `load_model` is only defined in
# a later cell: calling it from here raises NameError on a fresh kernel
# (Restart Kernel -> Run All). A loading failure now surfaces immediately
# instead of leaving `anonymizer_model` set to None.
ANONYMIZER_MODEL_TAG = "taro-pudding/privacy-200k-masking"
anonymizer_model = pipeline(
    "token-classification",
    model=ANONYMIZER_MODEL_TAG,
    tokenizer=ANONYMIZER_MODEL_TAG,
    device=-1,  # CPU, the same device load_model picks by default (use_gpu=False)
)

In [16]:
def load_model(model_tag: str, use_gpu: bool = False) -> Optional[Pipeline]:
    """Build a token-classification pipeline for ``model_tag``.

    Args:
        model_tag: Identifier passed as both the model and the tokenizer.
        use_gpu: When true the pipeline is created on device 0, otherwise on
            device -1.

    Returns:
        The pipeline, or ``None`` when construction raised; in that case the
        error is printed rather than propagated.
    """
    device = -1 if not use_gpu else 0
    try:
        return pipeline("token-classification", model=model_tag, tokenizer=model_tag, device=device)
    except Exception as error:
        # Best-effort loader: report the failure and fall through to None.
        print(f"Error loading Model: \n\n{error}")
    return None


def create_entity_map(model_output: List[dict], text: str) -> dict:
    """Map each detected span's surface text to its entity group.

    Args:
        model_output: Detections, each a dict with ``"start"``, ``"end"`` and
            ``"entity_group"`` keys; ``start``/``end`` are used to slice ``text``.
        text: The sentence the detections refer to.

    Returns:
        A dict ``{text[start:end]: entity_group}``. When the same surface text
        is detected more than once, the last detection's group is kept.
    """
    return {
        text[span["start"]:span["end"]]: span["entity_group"]
        for span in model_output
    }


def replace_entities(text: str, entity_map: dict) -> str:
    """Replace every occurrence of each mapped entity in ``text`` with ``[LABEL]``.

    Args:
        text: The sentence to mask.
        entity_map: Maps an entity's surface text to its label, e.g.
            ``{"Brooklyn": "CITY"}``.

    Returns:
        ``text`` with each entity occurrence replaced by its bracketed label.
        ``text`` is returned unchanged when there is nothing to replace.
    """
    # An empty key would match between every character, so it is ignored.
    # Longest entities come first in the alternation so that a full match
    # ("Emily Wastson") wins over an entity it contains ("Emily").
    entities = sorted((word for word in entity_map if word), key=len, reverse=True)
    if not entities:
        return text
    pattern = re.compile("|".join(re.escape(word) for word in entities))
    # One left-to-right pass: inserted placeholders are never re-scanned, so an
    # entity such as "IT" cannot rewrite the inside of an earlier "[CITY]".
    return pattern.sub(lambda match: f"[{entity_map[match.group(0)]}]", text)


def mask_pii(input_sentence: str, anonymizer: Pipeline) -> Optional[str]:
    """Mask the entities that ``anonymizer`` detects in ``input_sentence``.

    Args:
        input_sentence: The sentence to mask.
        anonymizer: Callable invoked as
            ``anonymizer(input_sentence, aggregation_strategy="simple")``.

    Returns:
        The sentence with detected entities replaced by ``[LABEL]``
        placeholders, or ``None`` (after printing a message) when the
        anonymizer's output is not a list.
    """
    detections = anonymizer(input_sentence, aggregation_strategy="simple")
    if not isinstance(detections, list):
        print("Output is not in the expected format")
        return None
    return replace_entities(input_sentence, create_entity_map(detections, input_sentence))


def generate_layer1_text(sentence):
    """Return ``sentence`` masked by the module-level ``anonymizer_model``.

    The result is whatever ``mask_pii`` returns, which may be ``None``.
    """
    return mask_pii(sentence, anonymizer_model)

def generate_layer2_text(layer1_text):
    """Fill the ``[MASK]`` placeholders in ``layer1_text`` with synthetic values.

    For every label in ``supported_masks`` that occurs in the text, one value
    is generated and substituted for all occurrences of that placeholder.
    Placeholders whose label is not in ``supported_masks`` are left as they are.

    Args:
        layer1_text: Masked sentence containing placeholders such as ``[CITY]``.

    Returns:
        The sentence with supported placeholders replaced.
    """
    layer2_text = layer1_text
    for mask, spec in supported_masks.items():
        placeholder = "[" + mask + "]"
        # Only invoke the generator when its placeholder is actually present;
        # previously every generator ran on every call regardless.
        if placeholder in layer2_text:
            layer2_text = layer2_text.replace(placeholder, spec[1]())
    return layer2_text

def generate_layer3_text(layer2_text, num_return_sequences=1, num_beams=10, max_length=60):
    """Paraphrase ``layer2_text`` with the module-level Pegasus model.

    Args:
        layer2_text: The sentence to paraphrase.
        num_return_sequences: Number of sequences requested from ``generate``.
        num_beams: Beam width passed to ``generate``.
        max_length: Token limit used both to truncate the tokenized input and
            to cap the generated output.

    Returns:
        The result of ``tokenizer.batch_decode`` on the generated sequences
        (a list of strings, as shown by the notebook's printed output).
    """
    batch = tokenizer(
        [layer2_text],
        truncation=True,
        padding='longest',
        max_length=max_length,
        return_tensors="pt",
    ).to(torch_device)
    # NOTE(review): no `do_sample` is passed here, so `temperature` may have no
    # effect on this beam-search call - confirm against the installed
    # transformers version before relying on it.
    translated = model.generate(
        **batch,
        max_length=max_length,
        num_beams=num_beams,
        num_return_sequences=num_return_sequences,
        temperature=1.5,
    )
    return tokenizer.batch_decode(translated, skip_special_tokens=True)

def main(sentence):
    """Run ``sentence`` through all three layers and print the layer-3 result.

    The intermediate texts are collected in a one-row DataFrame with the
    columns ``original_text``, ``layer1_text``, ``layer2_text`` and
    ``layer3_text``; only the layer-3 value is printed and nothing is returned.
    """
    masked = generate_layer1_text(sentence)
    df = pd.DataFrame({'original_text': [sentence], 'layer1_text': [masked]})
    # Each layer is derived column-wise from the previous one.
    df['layer2_text'] = df['layer1_text'].apply(generate_layer2_text)
    df['layer3_text'] = df['layer2_text'].apply(generate_layer3_text)
    print(df['layer3_text'].iloc[0])


In [17]:
# Demo: run the full pipeline on one synthetic sentence.
sample_sentence = (
    "Emily Wastson lives in 6 MetroTech Center, Brooklyn, NY 11201 "
    "and sings her social security number 100-100-2222 out loud"
)
main(sample_sentence)

['Emily Wastson sings her social security number out loud at 11 MetroTech Center in Owenview, NY.']
