In [None]:
# faker package for fake names
%pip install faker

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
import dlt

from faker import Faker
import random

In [None]:
# Column layout of the patients store: (name, Spark type) pairs, every
# column nullable.
patients_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("patient_id", IntegerType()),
        ("provider_id", IntegerType()),
        ("patient_key", StringType()),
        ("patient_name", StringType()),
        ("address", StringType()),
        ("ethnicity", StringType()),
        ("date_of_birth", DateType()),
    )
])

# Location of the Delta table that stores the patient data
delta_table_path = "dbfs:/delta/patients_db"

# Bootstrap: when nothing at the path is a Delta table yet, write an empty
# DataFrame carrying the schema so the path can later be opened as one.
if not DeltaTable.isDeltaTable(spark, delta_table_path):
    empty_df = spark.createDataFrame([], patients_schema)
    empty_writer = empty_df.write.format("delta")
    empty_writer.save(delta_table_path)

In [None]:
# Shared Faker instance used by the generator functions in the following
# cells to produce synthetic names, addresses and dates of birth.
fake = Faker()

In [None]:
def generate_fake_name():
    """Return a randomly generated person name from the shared Faker instance."""
    return fake.name()
# Register the UDF. Spark treats UDFs as deterministic by default and may
# eliminate or repeat invocations while optimising; this function returns a
# different value on every call, so it is flagged as non-deterministic.
fake_name_udf = udf(generate_fake_name, StringType()).asNondeterministic()

In [None]:
# generate fake address
def generate_fake_address():
    """Return a randomly generated postal address from the shared Faker instance."""
    return fake.address()
# Register the UDF. Spark treats UDFs as deterministic by default and may
# eliminate or repeat invocations while optimising; this function returns a
# different value on every call, so it is flagged as non-deterministic.
fake_address_udf = udf(generate_fake_address, StringType()).asNondeterministic()

In [None]:
# Define UDF for generating fake dates of birth
def generate_fake_dob():
    """Return a random date of birth, requesting ages between 0 and 90 from Faker."""
    return fake.date_of_birth(minimum_age=0, maximum_age=90)
# Register the UDF. Spark treats UDFs as deterministic by default and may
# eliminate or repeat invocations while optimising; this function returns a
# different value on every call, so it is flagged as non-deterministic.
fake_dob_udf = udf(generate_fake_dob, DateType()).asNondeterministic()


In [None]:
# generate fake ethnicities
ethnicities = [
    "Asian", "Black or African American", "Hispanic or Latino",
    "White", "Native American", "Pacific Islander", "Other"
]

def generate_fake_ethnicity():
    """Return one entry of `ethnicities`, picked uniformly at random."""
    return random.choice(ethnicities)

# Spark treats UDFs as deterministic by default and may eliminate or repeat
# invocations while optimising; this function is random, so it is flagged
# as non-deterministic.
fake_ethnicity_udf = udf(generate_fake_ethnicity, StringType()).asNondeterministic()

In [None]:
# # Disabled alternative: bronze table batch-read from a metastore table (the active Auto Loader definition is in the next cell)
# @dlt.table(
#     comment="Ingest raw data",
#     table_properties={"quality": "bronze"}
# )
# def bronze_urgent_care():
#     return (
#         spark.read.table("hive_metastore.default.ae_dummy")
#     )

In [None]:
# Bronze layer: raw files picked up incrementally from the mounted directory
@dlt.table(
    table_properties={"quality": "bronze"},
    comment="Ingest raw data using Autoloader"
)
def bronze_urgent_care():
    """Stream the Parquet files under /mnt/aesim/ through the cloudFiles source."""
    autoloader_options = {
        # Source files are Parquet
        "cloudFiles.format": "parquet",
        # Ask Autoloader to infer the column types
        "cloudFiles.inferColumnTypes": "true",
    }
    raw_stream = spark.readStream.format("cloudFiles").options(**autoloader_options)
    # Path to the mounted directory
    return raw_stream.load("/mnt/aesim/")

In [None]:
@dlt.view(
    comment="Place holder for transformed patient data"
)
def transformed_bronze_patients():
    """Build one synthetic patient record per (patient_id, provider_id) pair.

    Reads the bronze_urgent_care dataset, reduces it to the distinct
    (patient_id, provider_id) pairs, derives a surrogate ``patient_key``
    from them and attaches generated name, address, ethnicity and date of
    birth columns.

    Returns:
        DataFrame with columns patient_id, provider_id, concat_id,
        patient_key, patient_name, address, ethnicity, date_of_birth.
    """
    # Read the source data from the bronze table
    source_data = dlt.read("bronze_urgent_care")

    # Keep a single row per key pair. This view feeds a MERGE keyed on
    # (patient_id, provider_id); repeated pairs in the source would make
    # several source rows match one target row (or insert duplicates).
    patient_pairs = source_data.select("patient_id", "provider_id").dropDuplicates()

    # Generate surrogate key for each patient. The "|" separator keeps
    # distinct pairs distinct: without it (1, 23) and (12, 3) would both
    # hash the string "123". concat() still yields NULL if either id is NULL.
    return (
        patient_pairs
        .withColumn(
            "concat_id",
            concat(
                col("patient_id").cast("string"),
                lit("|"),
                col("provider_id").cast("string"),
            ),
        )
        # First 16 hex characters of the SHA-256 digest of concat_id
        .withColumn("patient_key", expr("substring(sha2(concat_id, 256), 1, 16)"))
        .withColumn("patient_name", fake_name_udf())
        .withColumn("address", fake_address_udf())
        .withColumn("ethnicity", fake_ethnicity_udf())
        .withColumn("date_of_birth", fake_dob_udf())
    )

In [None]:
# Patients are kept in a Delta table maintained by MERGE; per the original
# author this allows retrospective removals and updates to patient data,
# which is needed for regulatory compliance.
@dlt.table(
    table_properties={"quality": "silver"},
    comment="Merged patient table"
)
def patients_db():
    """Merge the transformed patient view into the Delta store and return it.

    Rows matching on (patient_id, provider_id) get their name, address,
    ethnicity and date of birth overwritten from the source; unmatched
    source rows are inserted; target rows with no matching source row are
    deleted. The function then returns the Delta table read back from
    ``delta_table_path``.
    """
    incoming_patients = dlt.read("transformed_bronze_patients")

    # Columns refreshed on a match vs. the full column set written on insert
    refreshed_columns = ["patient_name", "address", "ethnicity", "date_of_birth"]
    inserted_columns = ["patient_id", "provider_id", "patient_key"] + refreshed_columns
    update_assignments = {name: "source." + name for name in refreshed_columns}
    insert_assignments = {name: "source." + name for name in inserted_columns}

    key_match = "target.patient_id = source.patient_id AND target.provider_id = source.provider_id"

    # Open the existing Delta table and assemble the merge step by step
    patients_store = DeltaTable.forPath(spark, delta_table_path)
    merge_plan = patients_store.alias("target").merge(
        incoming_patients.alias("source"),
        key_match
    )
    merge_plan = merge_plan.whenMatchedUpdate(set=update_assignments)
    merge_plan = merge_plan.whenNotMatchedInsert(values=insert_assignments)
    merge_plan = merge_plan.whenNotMatchedBySourceDelete()
    merge_plan.execute()

    # Hand back the post-merge contents of the store
    return spark.read.format("delta").load(delta_table_path)
    

In [None]:
# The identifier checks must see the bronze rows *before* the join: the
# silver table inner-joins on patient_id/provider_id equality, which already
# drops rows whose ids are NULL, so the same checks on the joined output
# can never fail.
@dlt.view(
    comment="Bronze urgent care rows with mandatory identifiers checked before joining"
)
@dlt.expect_or_fail("patient id check", "patient_id IS NOT NULL") # pipeline fails
@dlt.expect_or_fail("provider id check", "provider_id IS NOT NULL")
def validated_bronze_urgent_care():
    """Pass bronze_urgent_care through unchanged, failing the update on NULL ids."""
    return dlt.read("bronze_urgent_care")


@dlt.table(
    comment="Refine/check bronze data to produce silver tables",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_fail("patient id check", "patient_id IS NOT NULL") # pipeline fails
@dlt.expect_or_fail("provider id check", "provider_id IS NOT NULL")
@dlt.expect("doctor id missing count", "doctor_id_seen IS NOT NULL")
def silver_urgent_care():
    """Attach each patient's surrogate key to the validated bronze rows.

    Inner-joins the bronze rows with patients_db on
    (patient_id, provider_id) and returns every bronze column plus
    ``patient_key``. Bronze rows without a matching patient are dropped by
    the inner join.
    """
    # Read the bronze data through the validation view so that NULL ids
    # fail the pipeline instead of being silently filtered by the join
    bronze_df = dlt.read("validated_bronze_urgent_care")

    # Read the patients_db table
    patients_df = dlt.read("patients_db")

    # Join the bronze data with the patients data on the natural key
    # (patient_id, provider_id)
    joined_df = bronze_df.join(
        patients_df,
        (bronze_df["patient_id"] == patients_df["patient_id"]) &
        (bronze_df["provider_id"] == patients_df["provider_id"]),
        "inner"
    )

    # Keep every bronze column and add just the patient key from the
    # patients data
    refined_df = joined_df.select(
        bronze_df["*"],
        patients_df["patient_key"]
    )

    return refined_df

In [None]:
# Gold layer: mean time_in_system per provider_id
@dlt.table(
    table_properties={"quality": "gold"},
    comment="Gold table with average time_in_system by provider_id"
)
def gold_average_time_in_system():
    """Aggregate silver_urgent_care to one row per provider_id.

    Returns:
        DataFrame with provider_id and avg_time_in_system, the average of
        the time_in_system column within each provider_id group.
    """
    mean_time = avg("time_in_system").alias("avg_time_in_system")
    per_provider = dlt.read("silver_urgent_care").groupBy("provider_id")
    return per_provider.agg(mean_time)