In [0]:
%pip install cryptography


In [0]:
%restart_python

In [0]:
from cryptography.fernet import Fernet

# Generate a new Fernet key. A different key is produced every time this cell runs.
key = Fernet.generate_key()
# NOTE(review): this writes the raw encryption key into the cell output, where it
# can be saved or exported with the notebook. Nothing visible in this notebook
# stores the key elsewhere, so this output may be the only copy — confirm the key
# is kept in a secret store and consider clearing this output before sharing.
print(key)

In [0]:
# Fernet instance built from `key`; `encrypt_val` calls `cipher.encrypt` on each value.
cipher = Fernet(key)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def encrypt_val(v):
    """Encrypt a single column value with the notebook-level ``cipher``.

    Args:
        v: The value to encrypt. ``None`` is passed through unchanged so that
            SQL NULLs stay NULL. ``bytes``/``bytearray`` are encrypted as-is;
            any other value is converted with ``str()`` and UTF-8 encoded, so
            non-string columns (for example integers) no longer raise
            ``AttributeError``.

    Returns:
        The token returned by ``cipher.encrypt`` decoded to ``str``, or
        ``None`` when ``v`` is ``None``.
    """
    if v is None:
        return None
    if isinstance(v, (bytes, bytearray)):
        plaintext = bytes(v)
    else:
        # str() is a no-op for strings and makes numeric/date values encodable.
        plaintext = str(v).encode("utf-8")
    return cipher.encrypt(plaintext).decode("ascii")

# Spark UDF wrapper around `encrypt_val` returning a string column.
# Fernet tokens embed a random IV and a timestamp, so the same input yields a
# different output on every call. Spark treats UDFs as deterministic by default
# and may eliminate or duplicate invocations; marking the UDF non-deterministic
# tells the optimizer not to rely on repeatable results.
encrypt_udf = udf(encrypt_val, StringType()).asNondeterministic()

In [0]:
# Source table holding the unencrypted patient records.
df_bronze_patients = spark.read.table("default.raw_patients")

In [0]:

# Count the source rows (triggers a Spark job) and report the total.
row_count = df_bronze_patients.count()
print("Total rows: " + str(row_count))




In [0]:
# Plain-text column -> name of the encrypted column that replaces it.
# Dict order is the order in which the encrypted columns are appended.
_pii_to_encrypted = {
    "SSN": "ssn_enc",
    "DRIVERS": "Drivers_enc",
    "PASSPORT": "Passport_enc",
    "FIRST": "First_enc",
    "LAST": "Last_enc",
    "ADDRESS": "Address_enc",
    "MAIDEN": "Maiden_enc",
}

# Append every encrypted column first, then remove all plain-text originals.
df_bronze_patients_enc = df_bronze_patients
for _plain_col, _enc_col in _pii_to_encrypted.items():
    df_bronze_patients_enc = df_bronze_patients_enc.withColumn(
        _enc_col, encrypt_udf(_plain_col)
    )
df_bronze_patients_enc = df_bronze_patients_enc.drop(*_pii_to_encrypted)

In [0]:
# Persist the encrypted frame as a Delta table. No save mode is set, so the
# writer's default applies (the write fails if the table already exists).
(
    df_bronze_patients_enc.write
    .format("delta")
    .saveAsTable("bronze_ehr.patients")
)

In [0]:
%sql
-- Row count of the written table.
SELECT COUNT(*)
FROM bronze_ehr.patients;

In [0]:
%sql
-- Preview the first rows of the table after the initial write.
SELECT *
FROM bronze_ehr.patients
LIMIT 10;

In [0]:
%sql
-- Preview the coordinate masking: round to 2 decimals, then append 'XX'.
SELECT
  CONCAT(CAST(ROUND(LAT, 2) AS STRING), 'XX') AS LAT_masked,
  CONCAT(CAST(ROUND(LON, 2) AS STRING), 'XX') AS LON_masked
FROM bronze_ehr.patients
LIMIT 10;

In [0]:
%sql
-- Add string columns that will hold the masked coordinates.
ALTER TABLE bronze_ehr.patients
ADD COLUMNS (LAT_masked STRING, LON_masked STRING);

-- Fill them for every row: coordinate rounded to 2 decimals plus an 'XX' suffix.
UPDATE bronze_ehr.patients
SET
  LAT_masked = CONCAT(CAST(ROUND(LAT, 2) AS STRING), 'XX'),
  LON_masked = CONCAT(CAST(ROUND(LON, 2) AS STRING), 'XX');



In [0]:
%sql
-- Inspect up to 10 distinct RACE values.
SELECT DISTINCT RACE
FROM bronze_ehr.patients
LIMIT 10;

In [0]:
%sql
-- Add columns for the generalized dates and the masked ZIP code.
ALTER TABLE bronze_ehr.patients
ADD COLUMNS (BIRTH_YEAR INT, DEATH_YEAR INT, ZIP_masked STRING);

-- Keep only the year of each date and the first 3 characters of ZIP (+ 'XX').
-- NOTE(review): if ZIP is stored as a number, leading zeros are already lost
-- before LEFT() is applied — confirm the column type.
UPDATE bronze_ehr.patients
SET
  BIRTH_YEAR = YEAR(BIRTHDATE),
  DEATH_YEAR = YEAR(DEATHDATE),
  ZIP_masked = CONCAT(LEFT(ZIP, 3), 'XX');

    


In [0]:
%sql
-- Preview the table after adding the year and ZIP columns.
SELECT *
FROM bronze_ehr.patients
LIMIT 10;

In [0]:
%sql
-- Preview the birthplace masking: split on two spaces, skip the first part,
-- and re-join up to 10 of the remaining parts with two spaces.
SELECT array_join(slice(split(BIRTHPLACE, '  '), 2, 10), '  ') AS BIRTHPLACE_masked
FROM bronze_ehr.patients
LIMIT 10;


In [0]:
%sql
-- Add the column that will hold the masked birthplace.
ALTER TABLE bronze_ehr.patients
ADD COLUMNS (BIRTHPLACE_masked STRING);

-- Fill it for every row: BIRTHPLACE without its first two-space-delimited part.
UPDATE bronze_ehr.patients
SET
  BIRTHPLACE_masked = array_join(slice(split(BIRTHPLACE, '  '), 2, 10), '  ');


In [0]:
%sql
-- Preview the table after adding BIRTHPLACE_masked.
SELECT *
FROM bronze_ehr.patients
LIMIT 10;
    


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Raw columns to remove now that their masked/derived counterparts exist
# (BIRTH_YEAR, DEATH_YEAR, ZIP_masked, BIRTHPLACE_masked, LAT_masked, LON_masked).
_raw_columns = ["BIRTHDATE", "DEATHDATE", "ZIP", "BIRTHPLACE", "LAT", "LON"]

df = spark.read.format("delta").table("bronze_ehr.patients")
df_clean = df.drop(*_raw_columns)

# Overwrite the same table in place; overwriteSchema lets the narrower schema
# replace the existing one.
(
    df_clean.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bronze_ehr.patients")
)

In [0]:
%sql
-- Preview the table after the raw columns were dropped.
SELECT *
FROM bronze_ehr.patients
LIMIT 10;

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Final column order for the table. Columns not listed here are not carried
# over into the rewritten table.
_column_order = [
    "Id",
    "BIRTH_YEAR",
    "DEATH_YEAR",
    "ssn_enc",
    "Drivers_enc",
    "Passport_enc",
    "PREFIX",
    "First_enc",
    "Last_enc",
    "SUFFIX",
    "Maiden_enc",
    "MARITAL",
    "RACE",
    "ETHNICITY",
    "GENDER",
    "BIRTHPLACE_masked",
    "Address_enc",
    "CITY",
    "STATE",
    "COUNTY",
    "ZIP_masked",
    "LAT_masked",
    "LON_masked",
    "HEALTHCARE_COVERAGE",
    "HEALTHCARE_EXPENSES",
]

df = spark.read.format("delta").table("bronze_ehr.patients")
df_reordered = df.select(*_column_order)

# Overwrite bronze_ehr.patients in place with the reordered columns.
# NOTE(review): a silver-schema target was suggested for this write, but the
# code targets bronze_ehr.patients — confirm the intended destination.
(
    df_reordered.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bronze_ehr.patients")
)


In [0]:
%sql
-- Preview the table after the columns were reordered.
SELECT *
FROM bronze_ehr.patients
LIMIT 10;

In [0]:
%sql
-- Remove silver_ehr.patients if it exists (no error when it is absent).
DROP TABLE IF EXISTS silver_ehr.patients;
    