In [0]:
%run ./encryption_utils

In [0]:
from pyspark.sql.functions import col, trim, to_timestamp, to_date, when
from pyspark.sql.types import IntegerType, LongType
from pyspark.sql import functions as F

# Shared PIIEncryptor instance; decrypt_clean_encrypt calls its decrypt_dataframe /
# encrypt_dataframe methods through this global.
# NOTE(review): PIIEncryptor is not imported in this notebook; presumably it is defined by
# ./encryption_utils, which the %run cell executes into this namespace — confirm.
encryptor = PIIEncryptor() 


In [0]:
def _clean_column_expr(name, dtype):
    """Return the normalising Column expression for a single input column.

    Args:
        name: the column name as it appears in the DataFrame.
        dtype: the column's Spark simple type string, as reported by ``DataFrame.dtypes``.

    Returns:
        A Column aliased to ``name.lower()``.
    """
    lowered = name.lower()
    expr = col(name)
    if dtype == "string":
        # Only genuine strings are trimmed: trim() implicitly casts its argument to string,
        # so applying it to numeric/timestamp columns would silently change their type.
        trimmed = trim(expr)
        # blank -> NULL so the critical-field null filter also catches empty strings
        expr = when(trimmed == "", None).otherwise(trimmed)
    # Name-based heuristics use substring matching, so "id" also matches e.g. "hotel_id"
    # (intended) but also "valid" or "width" — review if the schema grows.
    if "id" in lowered:
        expr = expr.cast(LongType())
    # "time" is tested first and the two conversions are exclusive: a name such as
    # "update_time" contains both substrings, and running to_date first would drop the
    # time of day before to_timestamp ever saw the value.
    if "time" in lowered:
        expr = to_timestamp(expr)
    elif "date" in lowered:
        expr = to_date(expr)
    return expr.alias(lowered)


def _deduplicate(df, dedup_cols, watermark_col, watermark_delay):
    """Drop rows duplicated on ``dedup_cols``, behind a watermark on ``watermark_col``."""
    watermarked = df.withWatermark(watermark_col, watermark_delay)
    if watermarked.isStreaming and hasattr(watermarked, "dropDuplicatesWithinWatermark"):
        # Spark >= 3.5: per-key dedup state is evicted once the watermark passes it, so
        # state stays bounded even though watermark_col is not one of the keys.
        return watermarked.dropDuplicatesWithinWatermark(dedup_cols)
    # Batch input (withWatermark is ignored) or an older Spark runtime. On a streaming
    # DataFrame this path never evicts state, because watermark_col is not among the keys.
    return watermarked.dropDuplicates(dedup_cols)


def decrypt_clean_encrypt(df, pii_fields, critical_fields, dedup_cols,
                          watermark_col="ingest_time", watermark_delay="2 hours"):
    """Decrypt PII, normalise every column, drop bad/duplicate rows, then re-encrypt PII.

    Steps, in order:
      1. Decrypt ``pii_fields`` with the notebook-level ``encryptor``.
      2. Normalise all columns in one projection: string columns are trimmed and empty
         strings become NULL; names containing "id" are cast to LongType; names containing
         "time" go through to_timestamp, otherwise names containing "date" through to_date;
         every column is renamed to lower case.
      3. Drop rows with a NULL in any of ``critical_fields`` (skipped when falsy).
      4. Deduplicate on ``dedup_cols`` behind a watermark on ``watermark_col``
         (skipped when falsy).
      5. Re-encrypt ``pii_fields``.

    Args:
        df: input DataFrame (batch or streaming) whose ``pii_fields`` are encrypted.
        pii_fields: column names passed to ``encryptor.decrypt_dataframe`` /
            ``encryptor.encrypt_dataframe``.
        critical_fields: column names (post lower-casing) that must be non-NULL.
        dedup_cols: column names (post lower-casing) that identify a duplicate row.
        watermark_col: timestamp column used for the dedup watermark; must exist in ``df``
            when ``dedup_cols`` is given. Defaults to "ingest_time".
        watermark_delay: watermark delay threshold. Defaults to "2 hours".

    Returns:
        The cleaned DataFrame with ``pii_fields`` re-encrypted.

    Notes:
        - On a streaming DataFrame with Spark >= 3.5, dedup uses
          dropDuplicatesWithinWatermark, so duplicates are dropped while they arrive within
          ``watermark_delay`` of each other and state then expires.
        - Changing the dedup operator or the output column types is not compatible with an
          existing streaming checkpoint / target table written by a different version of this
          function; restart from a fresh checkpoint and check the target schema.
    """
    # decrypt so the cleaning steps operate on plaintext values
    decrypted = encryptor.decrypt_dataframe(df, pii_fields)

    # One select for all columns rather than several withColumn calls per column:
    # repeated withColumn in a loop builds a deep plan (slow analysis, possible
    # StackOverflowError), as warned in the PySpark docs.
    cleaned = decrypted.select(
        *[_clean_column_expr(name, dtype) for name, dtype in decrypted.dtypes]
    )

    # remove records with nulls in critical fields
    if critical_fields:
        cleaned = cleaned.dropna(subset=critical_fields)

    # remove duplicates with bounded streaming state (see _deduplicate)
    if dedup_cols:
        cleaned = _deduplicate(cleaned, dedup_cols, watermark_col, watermark_delay)

    # encrypt PII columns again before the data leaves this function
    return encryptor.encrypt_dataframe(cleaned, pii_fields)

In [0]:
# Hotel-weather silver configuration. Names are the lower-cased column names, because
# decrypt_clean_encrypt lower-cases every column before applying the null/dedup filters.
hotel_weather_pii_columns = [
    "address",
    "name",
]
hotel_weather_critical_fields = ["id", "wthr_date"]
hotel_weather_dedupe_cols = ["id", "wthr_date"]

# A streaming checkpoint cannot be reused after the query's stateful operators change
# (e.g. the dedup step); point this at a new location when that happens.
silver_hotel_weather_checkpoint = "/mnt/checkpoints/silver_hotel_weather_n"

# read streaming bronze; ingest_time is processing time from F.current_timestamp() (not an
# event time from the source) and is the column decrypt_clean_encrypt watermarks on by default
hotel_bronze = (
    spark.readStream
        .format("delta")
        .table("bronze.hotel_weather_raw")
        .withColumn("ingest_time", F.current_timestamp())
)

hotel_clean = decrypt_clean_encrypt(
    hotel_bronze,
    pii_fields=hotel_weather_pii_columns,
    critical_fields=hotel_weather_critical_fields,
    dedup_cols=hotel_weather_dedupe_cols,
)

# write streaming silver; keep the StreamingQuery handle so the stream can be monitored,
# awaited (hotel_silver_query.awaitTermination()) or stopped (hotel_silver_query.stop())
hotel_silver_query = (
    hotel_clean.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", silver_hotel_weather_checkpoint)
        .table("silver.hotel_weather_processed")
)
hotel_silver_query


<pyspark.sql.streaming.query.StreamingQuery at 0x7f30360322a0>