In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType,IntegerType,LongType,DateType
from pyspark.sql.functions import length, col, expr, split, regexp_extract,substring_index,to_date
from datetime import datetime
from pyspark.sql.functions import current_timestamp
# Build the Spark session used by the cells below (getOrCreate returns an
# already-running session instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("Anonymization using pyspark")
    .getOrCreate()
)

In [0]:
# Anonymization rules: one row per table, naming the columns that hold
# person names, dates, e-mail addresses and phone numbers. Several columns of
# one kind are comma-separated; None means the table has no column of that kind.
get_data = [
    ('s1', 'table1', 'PERS_FIRST_NAME,PERS_LAST_NAME', 'DOB', 'EMAIL', 'HOME_PHONE_NBR'),
    ('s1', 'table2', 'User Name', None, 'Email', None),
]

# Every lookup column is a nullable string, so the fields are generated
# from their names instead of being spelled out one by one.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "SchemaName",
        "TableName",
        "ColumnNames",
        "ColumnDates",
        "ColumnEmail",
        "ColumnPhone",
    )
])

get_lookupdf = spark.createDataFrame(get_data, schema=schema)
get_lookupdf.show(truncate=False)

+----------+---------+------------------------------+-----------+-----------+--------------+
|SchemaName|TableName|ColumnNames                   |ColumnDates|ColumnEmail|ColumnPhone   |
+----------+---------+------------------------------+-----------+-----------+--------------+
|s1        |table1   |PERS_FIRST_NAME,PERS_LAST_NAME|DOB        |EMAIL      |HOME_PHONE_NBR|
|s1        |table2   |User Name                     |null       |Email      |null          |
+----------+---------+------------------------------+-----------+-----------+--------------+



In [0]:
# Sample person records used to demonstrate the anonymization function.
get_data = [("Rich", "Bob","1999-09-15","rbob@gmail.com", "1234567894"),
        ("June", "Dan","1959-09-05", "jdan@gmail.com", "9987654321"),
        ("Charlie","Dawn","1969-09-26","cdawn@gmail.com", "6789054321"),
        ("Bob","Alice","1949-05-25","balice@gmail.com","5432167890")
        ]

# Only column names are supplied, so Spark infers each column type from the
# rows above. Every value is a Python str, which means DOB is a *string*
# column here, not a DateType one. (A StructType declaring DOB as DateType
# used to be assigned to `schema` just before this line, but it was
# overwritten before use and did not match the string data, so it was removed.)
schema = ["PERS_FIRST_NAME", "PERS_LAST_NAME", "DOB","EMAIL","HOME_PHONE_NBR"]
get_df = spark.createDataFrame(get_data, schema=schema)

get_df.show(truncate=False)

+---------------+--------------+----------+----------------+--------------+
|PERS_FIRST_NAME|PERS_LAST_NAME|DOB       |EMAIL           |HOME_PHONE_NBR|
+---------------+--------------+----------+----------------+--------------+
|Rich           |Bob           |1999-09-15|rbob@gmail.com  |1234567894    |
|June           |Dan           |1959-09-05|jdan@gmail.com  |9987654321    |
|Charlie        |Dawn          |1969-09-26|cdawn@gmail.com |6789054321    |
|Bob            |Alice         |1949-05-25|balice@gmail.com|5432167890    |
+---------------+--------------+----------+----------------+--------------+



In [0]:
def _split_column_list(raw_value):
    """Turn a comma-separated lookup cell into a list of column names.

    Whitespace around each entry is stripped and empty entries are dropped.
    Returns None when the cell is null/empty or contains no usable name.
    """
    if not raw_value:
        return None
    stripped = [part.strip() for part in raw_value.split(',')]
    return [name for name in stripped if name] or None


def _quote_identifier(column_name):
    """Backtick-quote a column name so it is safe inside a Spark SQL expression."""
    return "`{0}`".format(column_name.replace("`", "``"))


def _mask_all_but_first(sql_value, filler):
    """Build SQL keeping the first character of sql_value and replacing the rest with filler."""
    return "concat(substring({0}, 1, 1), repeat('{1}', length({0}) - 1))".format(sql_value, filler)


def _resolved_type(df, column_name):
    """Return the data type of column_name, resolved by Spark exactly like the mask expressions."""
    return df.select(expr(_quote_identifier(column_name))).schema.fields[0].dataType


def anonymize_raw_file(get_df, get_table_name, get_lookupdf):
    """Mask the sensitive columns of get_df according to the lookup rules for one table.

    The rule row is the first row of get_lookupdf whose TableName equals
    get_table_name (SchemaName is not consulted). Its ColumnNames, ColumnPhone,
    ColumnEmail and ColumnDates cells each hold a comma-separated list of
    column names, processed in that order:

    * names  - first character kept, every other character becomes 'x'
    * phones - first character kept, every other character becomes '0';
               IntegerType/LongType columns come back as bigint, all other
               types come back as strings
    * emails - the part before the first '@' is masked like a name and the
               domain is replaced by '@xxxx.com'
    * dates  - the first five characters of the value's string form are kept
               and '01-01' is appended (a 'yyyy-MM-dd' value becomes 1 January
               of the same year); DateType columns are converted back to
               dates, any other type yields a string

    Args:
        get_df: Spark DataFrame to anonymize.
        get_table_name: value to match against the lookup's TableName column.
        get_lookupdf: Spark DataFrame with the columns TableName, ColumnNames,
            ColumnEmail, ColumnDates and ColumnPhone.

    Returns:
        A DataFrame with the listed columns replaced by their masked values.
        No helper columns are added to or removed from the input.

    Raises:
        ValueError: if get_lookupdf has no row for get_table_name.
    """
    lookup_row = get_lookupdf.filter(get_lookupdf["TableName"] == get_table_name).first()
    if lookup_row is None:
        # first() yields None on an empty result; fail with a clear message
        # instead of an opaque "'NoneType' object is not subscriptable".
        raise ValueError(
            "No anonymization rules found for table '{0}' in the lookup DataFrame".format(get_table_name)
        )

    get_col_names = _split_column_list(lookup_row["ColumnNames"])
    get_col_emails = _split_column_list(lookup_row["ColumnEmail"])
    get_col_dates = _split_column_list(lookup_row["ColumnDates"])
    get_col_phones = _split_column_list(lookup_row["ColumnPhone"])

    # Every masked value is computed in one expression per column. Earlier
    # scratch columns ("length", "first_char", ...) would have overwritten and
    # then dropped any real input column that happened to share those names.
    if get_col_names is not None:
        for name_col in get_col_names:
            masked_name = _mask_all_but_first(_quote_identifier(name_col), 'x')
            get_df = get_df.withColumn(name_col, expr(masked_name))
    else:
        print("Inside column names else condition")

    if get_col_phones is not None:
        for phone_col in get_col_phones:
            phone_text = "cast({0} as string)".format(_quote_identifier(phone_col))
            masked_phone = expr(_mask_all_but_first(phone_text, '0'))
            if isinstance(_resolved_type(get_df, phone_col), (LongType, IntegerType)):
                # Numeric phone columns stay numeric (bigint, as before).
                masked_phone = masked_phone.cast("bigint")
            get_df = get_df.withColumn(phone_col, masked_phone)
    else:
        print("Inside phone else condition")

    if get_col_emails is not None:
        for email_col in get_col_emails:
            # Everything before the first '@'; the whole value if there is no '@'.
            local_part = "substring_index({0}, '@', 1)".format(_quote_identifier(email_col))
            masked_email = "concat({0}, '@xxxx.com')".format(_mask_all_but_first(local_part, 'x'))
            get_df = get_df.withColumn(email_col, expr(masked_email))
    else:
        print("Inside email else condition ")

    if get_col_dates is not None:
        for date_col in get_col_dates:
            # Quoting the identifier keeps names with spaces or special
            # characters from breaking the SQL expression.
            date_text = "cast({0} as string)".format(_quote_identifier(date_col))
            first_of_year = expr("concat(substring({0}, 1, 5), '01-01')".format(date_text))
            if isinstance(_resolved_type(get_df, date_col), DateType):
                first_of_year = to_date(first_of_year, "yyyy-MM-dd")
            get_df = get_df.withColumn(date_col, first_of_year)
    else:
        print("Inside date else condition")
    return get_df

