# Install dependencies

# Data Import

In [1]:
# Add the sibling ../src directory to the module search path before importing the
# project helper module (presumably helper_functions_presidio lives there — confirm).
import sys
sys.path.append('../src')
import helper_functions_presidio as hfp

In [2]:
# Load the large German spaCy pipeline. The returned object is not assigned to a
# name, so this cell only shows that the model can be loaded in this environment.
import spacy
spacy.load('de_core_news_lg')

<spacy.lang.de.German at 0x26cf318aa10>

In [3]:
import os
import json
import pytz
from datetime import datetime

In [4]:
# Load .txt files from local folder and convert to dictionary
# (the conversion is done by the project helper hfp.convert_txt_to_dict; the dict's
# values are later passed as `text` to the analyzer, its keys identify the texts)
directory_path = '../data/original_texts_renamed/'
original_texts_dict = hfp.convert_txt_to_dict(directory_path)

In [5]:
# Sanity check: how many texts ended up in the dictionary (one entry per key)
len(original_texts_dict)

1000

# Import Presidio libraries

In [21]:
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, EntityRecognizer, Pattern, RecognizerResult
from presidio_analyzer.recognizer_registry import RecognizerRegistry
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from presidio_anonymizer.entities import ConflictResolutionStrategy

## Define environment variables for Azure AI Language package

In [25]:
# Read the Azure AI Language endpoint and key from a local JSON file (keys
# 'azure_endpoint' and 'azure_key') so that no secret is hard-coded in the notebook.
with open('../credentials/azure_credentials.json') as config_file:
    config = json.load(config_file)

# Expose the credentials as environment variables for this process.
# NOTE(review): presumably AzureAILanguageRecognizer reads these two variables — confirm against the Presidio docs.
os.environ['AZURE_AI_ENDPOINT'] = config['azure_endpoint']
os.environ['AZURE_AI_KEY'] = config['azure_key']

## Initialize Presidio Analyzer with SpaCy

In [23]:
# Load configuration file that specifies NLP engine and model for PresidioAnalyzer
conf_file = "../data/config_spacy_de.yml"

# Create NLP engine based on configuration
provider = NlpEngineProvider(conf_file = conf_file)
nlp_engine = provider.create_engine()

# Initialize Recognizer registry
# (no predefined recognizers are loaded here; recognizers are added one by one in the following cells)
registry = RecognizerRegistry()

# Add Recognizers to registry
### There are dozens of different Recognizers that can be called and added to the Presidio Analyzer, depending on the use-case. The Recognizers added here were specifically tailored to detect the entities that occurred in the 1000 synthetic e-mails from E.ON. For texts with more or different entities, other Recognizers might need to be added. (For example, we didn't add a Recognizer for German IBAN numbers because they didn't occur in the texts and thus would potentially only add false positives.)

In [26]:
# Add AzureAILanguageRecognizer to registry
from presidio_analyzer.predefined_recognizers import AzureAILanguageRecognizer

# Restrict the Azure recognizer to the five entity types listed below, for German texts.
# Note that its phone label is 'PHONENUMBER' (no underscore), which differs from the
# 'PHONE_NUMBER' label that appears elsewhere in this notebook; both labels are handled downstream.
azure_ai_language = AzureAILanguageRecognizer(
                                              supported_entities=["ADDRESS", "PERSON", "EMAIL", "LOCATION", "PHONENUMBER"],
                                              supported_language="de"
                                              )

registry.add_recognizer(azure_ai_language)
# Last expression of the cell: displays the entity list as the cell output
azure_ai_language.get_supported_entities()

['ADDRESS', 'PERSON', 'EMAIL', 'LOCATION', 'PHONENUMBER']

In [None]:
# Add SpacyRecognizer to registry
# NOTE(review): this cell carries no execution count and the analyzer's recorded entity
# list contains no DATE_TIME / NRP / ORGANIZATION, so it appears not to have been run
# in the saved session — confirm whether the spaCy recognizer is meant to be active.
from presidio_analyzer.predefined_recognizers import SpacyRecognizer

spacy_recognizer_de = SpacyRecognizer(
                                      supported_language="de",
                                      )

registry.add_recognizer(spacy_recognizer_de)
# Last expression of the cell: displays the entity list as the cell output
spacy_recognizer_de.get_supported_entities()

['DATE_TIME', 'NRP', 'LOCATION', 'PERSON', 'ORGANIZATION']

In [27]:
# Add PhoneRecognizer to registry
from presidio_analyzer.predefined_recognizers import PhoneRecognizer

# Context words that support a phone-number match when they appear near it.
# Every word must be its own list element: without a separating comma, Python joins
# two adjacent string literals into one ("phone" "tel" -> "phonetel"), which silently
# drops both "phone" and "tel" from the context list.
phone_recognizer_de = PhoneRecognizer(
    supported_language="de",
    context=["telefon", "handy", "phone", "tel", "mobil"],
)

registry.add_recognizer(phone_recognizer_de)
# Last expression of the cell: displays the entity list as the cell output
phone_recognizer_de.get_supported_entities()

['PHONE_NUMBER']

In [28]:
# Build and add custom PatternRecognizers to registry

# Define patterns for PatternRecognizers based on regex.
# The regexes are raw strings so that \d, \s and \- reach the regex engine unchanged;
# in a normal string literal they are invalid escape sequences, which newer Python
# versions warn about. The resulting pattern text is identical to before.
#
# All digit patterns use lookarounds so that a match is a complete run of digits and
# never a slice of a longer number. The contract-, business-partner- and invoice-number
# patterns share the same regex; they differ only in the context words of their recognizers.
zaehlernr_pattern = Pattern(name="zaehlernr_pattern", regex=r"(?<!\d)\d{7,12}(?!\d)", score=0.5)
vertragsnr_pattern = Pattern(name="vertragsnr_pattern", regex=r"(?<!\d)\d{9,12}(?!\d)", score=0.5)
geschaftsnr_pattern = Pattern(name="geschaftsnr_pattern", regex=r"(?<!\d)\d{9,12}(?!\d)", score=0.5)
rechnungsnr_pattern = Pattern(name="rechnungsnr_pattern", regex=r"(?<!\d)\d{9,12}(?!\d)", score=0.5)
# Exactly five digits (German postal code format)
postlz_pattern = Pattern(name="postlz_pattern", regex=r"(?<!\d)\d{5}(?!\d)", score=0.45)
# Capitalised word(s) followed by a 1-5 digit number, delimited by whitespace or string boundaries
strasse_pattern = Pattern(
    name="strasse_pattern",
    regex=r"(^|\s)[A-ZÄÖÜ][a-zäöüß\- ]+ \d{1,5}($|\s)",
    score=0.4,
)

# Build one PatternRecognizer per custom entity. Each recognizer pairs a single regex
# pattern with German context words for that entity type.

# Meter number
zaehlernr_recognizer = PatternRecognizer(
    supported_entity="ZAEHLERNR.",
    supported_language="de",
    patterns=[zaehlernr_pattern],
    context=["zählern", "zähler", "zahler", "hlernummer", "zaehler"],
)

# Contract number
vertragsnr_recognizer = PatternRecognizer(
    supported_entity="VERTRAGSNR.",
    supported_language="de",
    patterns=[vertragsnr_pattern],
    context=["vertragsnummer", "vertrag"],
)

# Business partner / customer number
gpartnernr_recognizer = PatternRecognizer(
    supported_entity="GESCHAEFTSPARTNERNR.",
    supported_language="de",
    patterns=[geschaftsnr_pattern],
    context=["geschäftspartnernummer", "geschäftspartner", "geschäft", "partner", "geschaefts", "kunden"],
)

# Invoice number (the variable name keeps its original spelling because later code refers to it)
rechungsnr_recognizer = PatternRecognizer(
    supported_entity="RECHNUNGSNR.",
    supported_language="de",
    patterns=[rechnungsnr_pattern],
    context=["rechnungsnummer", "rechnung", "echnung"],
)

# Postal code
postlz_recognizer = PatternRecognizer(
    supported_entity="PLZ",
    supported_language="de",
    patterns=[postlz_pattern],
    context=["postleitzahl", "strasse", "weg", "adresse", "plz", "platz", "gasse", "straße"],
)

# Street name with house number
strasse_recognizer = PatternRecognizer(
    supported_entity="STRASSE",
    supported_language="de",
    patterns=[strasse_pattern],
    context=["wohne", "adresse", "weg", "plz"],
)

# Add the custom ID-number PatternRecognizers to the registry, in this order
for custom_recognizer in (
    zaehlernr_recognizer,
    vertragsnr_recognizer,
    gpartnernr_recognizer,
    rechungsnr_recognizer,
):
    registry.add_recognizer(custom_recognizer)

# The postal-code and street recognizers are built but currently not registered
#registry.add_recognizer(postlz_recognizer)
#registry.add_recognizer(strasse_recognizer)

In [31]:
# Add Deny List Recognizer with German City Names

# Load txt file with German city names (one name per line, UTF-8).
# Surrounding whitespace is stripped and blank lines are skipped, so that no empty
# string ends up in the deny list (an empty entry would be a meaningless alternative
# in the deny-list match).
with open('../data/german_cities_list.txt', 'r', encoding='utf-8') as file:
    german_city_names = [name for name in (line.strip() for line in file) if name]

# Deny-list recognizer: reports every listed city name as entity "ORT"
cities_recognizer = PatternRecognizer(
    supported_entity="ORT",
    supported_language="de",
    deny_list=german_city_names,
)

# This Recognizer produced many false positives and therefore was not added
#registry.add_recognizer(cities_recognizer)

In [32]:
# Initialize Presidio AnalyzerEngine with registry (containing Recognizers) and context_aware_enhancer

# Context handling: a prefix count of 3 and a suffix count of 0 mean that only text
# before a match is considered for context words, not text after it.
# NOTE(review): exact semantics of the similarity factor / minimum score are defined
# by Presidio's LemmaContextAwareEnhancer — see its documentation.
context_aware_enhancer = LemmaContextAwareEnhancer(
    context_similarity_factor=0.45,
    min_score_with_context_similarity=0.5,
    context_prefix_count=3,
    context_suffix_count=0,
)

# German-only analyzer built from the custom registry and the spaCy-based NLP engine
analyzer_eon = AnalyzerEngine(
    registry=registry,
    nlp_engine=nlp_engine,
    supported_languages=["de"],
    context_aware_enhancer=context_aware_enhancer,
)

In [33]:
# Print all supported entities for the PresidioAnalyzer named 'analyzer_eon'
# (useful to verify which recognizer cells were actually executed before building the analyzer)
print(analyzer_eon.get_supported_entities())

['GESCHAEFTSPARTNERNR.', 'PHONE_NUMBER', 'EMAIL', 'ADDRESS', 'PERSON', 'PHONENUMBER', 'VERTRAGSNR.', 'ZAEHLERNR.', 'RECHNUNGSNR.', 'LOCATION']


# Print all Recognizers for analyzer_eon

In [44]:
# Show the recognizer objects currently held by the analyzer
print(analyzer_eon.get_recognizers())

[<presidio_analyzer.predefined_recognizers.phone_recognizer.PhoneRecognizer object at 0x0000021976727050>, <presidio_analyzer.pattern_recognizer.PatternRecognizer object at 0x000002194090B6D0>, <presidio_analyzer.predefined_recognizers.azure_ai_language.AzureAILanguageRecognizer object at 0x000002197671BCD0>, <presidio_analyzer.pattern_recognizer.PatternRecognizer object at 0x0000021976726F10>, <presidio_analyzer.pattern_recognizer.PatternRecognizer object at 0x000002193DFAE910>, <presidio_analyzer.pattern_recognizer.PatternRecognizer object at 0x0000021940952190>]


In [45]:
# Entity labels the analyzer is asked to report; this list is passed as `entities`
# to every analyze() call. Both phone labels ('PHONENUMBER' and 'PHONE_NUMBER') are listed.
entities_to_detect = [
    'ADDRESS',
    'PHONENUMBER',
    'PHONE_NUMBER',
    'EMAIL',
    'RECHNUNGSNR.',
    'ZAEHLERNR.',
    'PERSON',
    'VERTRAGSNR.',
    'GESCHAEFTSPARTNERNR.',
    'LOCATION',
    'ORT',
    # 'POSTLEITZAHL'
]

# Run Presidio Analyzer on all texts

In [46]:
%%time
# Run Presidio Analyzer on texts, perform conflict resolution and address merging and return dict of results (predictions_dict)
# that has the correct format for later calculating the performance scores
#
# Outputs of this cell:
#   predictions_dict_recognizer_format: key -> merged analyzer results (input for the anonymizer)
#   predictions_dict:                   key -> the same results converted by hfp.results_recognizer_to_list

anonymizer = AnonymizerEngine()

predictions_dict = {}
predictions_dict_recognizer_format = {}

for key, value in original_texts_dict.items():

    # Detect the selected entities; results scoring below 0.6 are discarded
    pred = analyzer_eon.analyze(
        text=value,
        language="de",
        entities=entities_to_detect,
        score_threshold=0.6,
    )

    # Remove conflicts when overlapping entities were detected.
    # The strategy must be an enum *member*; passing the ConflictResolutionStrategy
    # class itself never equals any member, so the intended strategy was implicit.
    # MERGE_SIMILAR_OR_CONTAINED is the anonymizer's default strategy.
    # NOTE(review): this calls a private AnonymizerEngine method; re-check it after
    # upgrading presidio-anonymizer.
    pred_after_conflicts = anonymizer._remove_conflicts_and_get_text_manipulation_data(
        analyzer_results=pred,
        conflict_resolution=ConflictResolutionStrategy.MERGE_SIMILAR_OR_CONTAINED,
    )

    # Order results by start offset before merging
    pred_after_conflicts_sorted = sorted(pred_after_conflicts, key=lambda x: x.start)

    # Merge address entities when adjacent (project helper; it takes the engine as `self`)
    pred_merged = hfp.merge_to_address(
        self=anonymizer,
        text=value,
        analyzer_results=pred_after_conflicts_sorted,
    )

    pred_merged_list = hfp.results_recognizer_to_list(pred_merged)

    predictions_dict_recognizer_format[key] = pred_merged
    predictions_dict[key] = pred_merged_list

HttpResponseError: (403) Out of call volume quota for TextAnalytics F0 pricing tier. Please retry after 12 days. To increase your call volume switch to a paid tier.
Code: 403
Message: Out of call volume quota for TextAnalytics F0 pricing tier. Please retry after 12 days. To increase your call volume switch to a paid tier.

# Run Presidio Anonymizer on all texts

In [47]:
# Run PresidioAnonymizer
# The PresidioAnonymizer takes the results of the PresidioAnalyzer and the original texts as an input and
# returns the anonymized text as an output. The operators define how the detected entities should be anonymized.

# Placeholder per entity type. The mapping is the same for every text, so it is built
# once here instead of being rebuilt on every loop iteration. Entity types without
# their own entry fall back to "DEFAULT".
anonymization_operators = {
    "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"}),
    "PERSON": OperatorConfig("replace", {"new_value": "<PERSON>"}),
    "LOCATION": OperatorConfig("replace", {"new_value": "<ORT>"}),
    "ZAEHLERNR.": OperatorConfig("replace", {"new_value": "<ZÄHLERNR.>"}),
    "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "<TELEFONNR.>"}),
    "VERTRAGSNR.": OperatorConfig("replace", {"new_value": "<VERTRAGSNR.>"}),
    "GESCHAEFTSPARTNERNR.": OperatorConfig("replace", {"new_value": "<GESCHÄFTSPARTNERNR.>"}),
    "RECHNUNGSNR.": OperatorConfig("replace", {"new_value": "<RECHNUNGSNR.>"}),
    "PLZ": OperatorConfig("replace", {"new_value": "<PLZ>"}),
    "STRASSE": OperatorConfig("replace", {"new_value": "<STRASSE>"}),
    "PHONENUMBER": OperatorConfig("replace", {"new_value": "<TELEFONNR.>"}),
    "EMAIL": OperatorConfig("replace", {"new_value": "<EMAIL>"}),
    "ADDRESS": OperatorConfig("replace", {"new_value": "<ADRESSE>"}),
    "ADRESSE": OperatorConfig("replace", {"new_value": "<ADRESSE>"}),
}

anonymized_texts_dict = {}

for key, value in predictions_dict_recognizer_format.items():

    # `value` holds the analyzer results for the text stored under the same key
    anonymized_result = anonymizer.anonymize(
        text=original_texts_dict[key],
        analyzer_results=value,
        operators=anonymization_operators,
    )

    anonymized_texts_dict[key] = anonymized_result.text

# Save anonymized texts and predictions

In [50]:
# Timestamp (Europe/Berlin local time, formatted without ':' so it can be used in
# folder and file names) and a suffix identifying this model's results.
current_dt = datetime.now(pytz.timezone('Europe/Berlin')).strftime("%Y-%m-%d_%H-%M-%S") # get current datetime for folder and filenames
file_ext = "_presidio" # set string for filenames

## Save predictions and anonymized texts as .json

In [None]:
# Create the result folder ../model_results/<timestamp><suffix>
new_folder_name = os.path.join("..", "model_results", f"{current_dt}{file_ext}")
os.makedirs(new_folder_name, exist_ok=True)

# Set filenames of json files
predictions_dict_filename = os.path.join(
    new_folder_name, f"predictions_dict_{current_dt}{file_ext}.json"
)
anonymized_texts_dict_filename = os.path.join(
    new_folder_name, f"anonymized_texts_dict_{current_dt}{file_ext}.json"
)

# Save predictions and anonymized_texts dictionaries as json files (predictions first)
for target_path, payload in (
    (predictions_dict_filename, predictions_dict),
    (anonymized_texts_dict_filename, anonymized_texts_dict),
):
    with open(target_path, 'w') as json_file:
        json.dump(payload, json_file)

## Save all anonymized texts as .txt files in a local folder

In [None]:
# Set location of the local folder where you want to save the files
local_folder_path = "../data/anonymized_texts"
os.makedirs(local_folder_path, exist_ok=True)  # Ensure the folder exists, create it if it doesn't

# Set foldername and filenames for saving data.
# The suffix identifies the model that produced the texts; this notebook is the
# Presidio pipeline, so it uses "_presidio" (the former "_GPT" value was a leftover
# that also overwrote the notebook-wide `file_ext`).
file_ext = "_presidio"  # set string for filenames
current_dt = datetime.now(pytz.timezone('Europe/Berlin')).strftime("%Y-%m-%d_%H-%M-%S")  # get current datetime for filenames
new_folder_name_txt = os.path.join(local_folder_path, current_dt + file_ext)
os.makedirs(new_folder_name_txt, exist_ok=True)  # Create a new folder with datetime

# Save each anonymized text as <original name without extension>_an.txt
for key, value in anonymized_texts_dict.items():
    # splitext drops the extension whatever its length; for "*.txt" keys the result
    # is the same as cutting the last four characters
    base_name = os.path.splitext(key)[0]
    filename = os.path.join(new_folder_name_txt, f"{base_name}_an.txt")

    # Write explicitly as UTF-8: the texts and placeholders contain German umlauts,
    # and the platform default encoding is not UTF-8 on every system
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(value)