In [None]:
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, BatchAnalyzerEngine, RecognizerResult, DictAnalyzerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig,EngineResult
from pyspark.sql.functions import col, pandas_udf
import pandas as pd
from pyspark.sql.types import StringType

# Load the customer CSV from the data lake into a Spark DataFrame.
# The 'header' option tells the reader that the first row holds column names.
df1 = (
    spark.read
    .format('csv')
    .option('header', True)
    .load('abfss://sensitive@pvdemoar4sisynapsedl.dfs.core.windows.net/customers/customers.csv')
)
# Optional preview while developing:
#display(df1.limit(10))

# Only a small sample is pulled to the driver for PII detection.
detectionsample = 10
dfsample = df1.limit(detectionsample).toPandas()

# Column-oriented dict ({column name: [values, ...]}), the input shape
# handed to BatchAnalyzerEngine.analyze_dict below.
df_dict = dfsample.to_dict(orient="list")

# Build the Presidio analyzer and run it over every sampled column.
analyzer = AnalyzerEngine()
batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)
# Materialize the results into a list so they can be indexed below.
analyzer_results = list(batch_analyzer.analyze_dict(df_dict, language="en"))

# Debugging aid: inspect what was detected for a given column.
#print(analyzer_results[3].key+':'+str(analyzer_results[3].recognizer_results[3][0]))

# NOTE(review): the index 3 is hard-coded, so the fourth analyzed column is
# always the one anonymized, regardless of what the analyzer actually found
# (and an IndexError is raised if the sample has fewer than four columns).
# Confirm this matches the intended column of customers.csv.
columntoanonymize = analyzer_results[3].key

anonymizer = AnonymizerEngine()

# Broadcast both engines so the UDF running on the cluster nodes can read
# them through `.value` rather than constructing its own.
broadcasted_analyzer = sc.broadcast(analyzer)
broadcasted_anonymizer = sc.broadcast(anonymizer)

# define the per-string anonymization function and a pandas Series wrapper over it.
def anonymize_text(text: str) -> str:
    """Remove every PII span that Presidio detects in a single string.

    Args:
        text: The cell value to scrub. Values that are not non-empty strings
            (for example ``None`` or NaN coming from null cells) are
            returned unchanged.

    Returns:
        ``text`` with each detected entity replaced by an empty string, or
        the original value when there is nothing to analyze.
    """
    # Null cells of a nullable column can arrive here as None/NaN, while the
    # analyzer is called with `text=` and expects a str. Pass such values
    # (and empty strings, which have nothing to scan) straight through
    # instead of failing the whole Spark task.
    if not isinstance(text, str) or not text:
        return text

    # Read the engines from the broadcast variables on the executing node.
    analyzer = broadcasted_analyzer.value
    anonymizer = broadcasted_anonymizer.value

    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={
            # One "DEFAULT" operator is configured for all findings:
            # replace the detected span with an empty string.
            "DEFAULT": OperatorConfig("replace", {"new_value": ""})
        },
    )
    return anonymized_results.text


def anonymize_series(s: pd.Series) -> pd.Series:
    """Run ``anonymize_text`` over every element of a pandas Series.

    Args:
        s: The values of one column batch.

    Returns:
        A Series holding the result of ``anonymize_text`` for each element
        of ``s``.
    """
    scrubbed = s.map(anonymize_text)
    return scrubbed


# Wrap the Series-level function as a pandas UDF that returns string values.
anonymize = pandas_udf(anonymize_series, returnType=StringType())

# Build the anonymized column expression, then write it back under the same
# column name so the result keeps the original column layout.
anonymized_column = anonymize(col(columntoanonymize))
anonymized_df = df1.withColumn(columntoanonymize, anonymized_column)

display(anonymized_df)
