In [0]:
import dlt
from pyspark import pipelines as dp
from pyspark.sql.functions import *
from pyspark.sql.types import *

@dlt.table(
    name="bronze_patients_cdf",
    table_properties={"quality": "bronze"},
)
def bronze_patients_cdf():
    """Stream ``hospital_cg.raw.patients`` into the bronze layer unchanged.

    The resulting table is tagged with the ``quality=bronze`` property.
    """
    raw_patients = dlt.read_stream("hospital_cg.raw.patients")
    return raw_patients

@dlt.table(
    name="bronze_providers_cdf",
    table_properties={"quality": "bronze"},
)
def bronze_providers_cdf():
    """Stream ``hospital_cg.raw.providers`` into the bronze layer unchanged.

    The resulting table is tagged with the ``quality=bronze`` property.
    """
    raw_providers = dlt.read_stream("hospital_cg.raw.providers")
    return raw_providers

@dlt.table(
    name="bronze_encounters_cdf",
    table_properties={"quality": "bronze"},
)
def bronze_encounters_cdf():
    """Stream ``hospital_cg.raw.encounters`` into the bronze layer unchanged.

    The resulting table is tagged with the ``quality=bronze`` property.
    """
    raw_encounters = dlt.read_stream("hospital_cg.raw.encounters")
    return raw_encounters

@dlt.table(
    name="bronze_conditions_cdf",
    table_properties={"quality": "bronze"},
)
def bronze_conditions_cdf():
    """Stream ``hospital_cg.raw.conditions`` into the bronze layer unchanged.

    The resulting table is tagged with the ``quality=bronze`` property.
    """
    raw_conditions = dlt.read_stream("hospital_cg.raw.conditions")
    return raw_conditions

In [0]:
from pyspark.sql.window import Window

def clean_string(col_name):
    """Return a column expression that upper-cases and then trims ``col_name``.

    Args:
        col_name: Name of the source column to normalise.

    Returns:
        A Spark ``Column`` equal to ``trim(upper(col(col_name)))``.
    """
    source_col = col(col_name)
    uppercased = upper(source_col)
    return trim(uppercased)

def clean_date(col_name):
    """Return a column expression parsing ``col_name`` as a ``yyyy-MM-dd`` date.

    Args:
        col_name: Name of the source column holding the date value.

    Returns:
        A Spark ``Column`` produced by ``to_date`` with the ``yyyy-MM-dd`` pattern.
    """
    source_col = col(col_name)
    return to_date(source_col, "yyyy-MM-dd")

@dlt.view
def silver_patients_cdf():
    """Silver view over the change feed of ``LIVE.bronze_patients_cdf``.

    Keeps every incoming column and adds the cleaned columns ``patient_id``,
    ``FirstName``, ``LastName`` (upper-cased and trimmed) and ``date_of_birth``
    (parsed from ``BIRTHDATE``).
    """
    bronze_changes = (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table("LIVE.bronze_patients_cdf")
    )
    return (
        bronze_changes
        .withColumn("patient_id", clean_string("Id"))
        .withColumn("FirstName", clean_string("FIRST"))
        .withColumn("LastName", clean_string("LAST"))
        .withColumn("date_of_birth", clean_date("BIRTHDATE"))
    )

@dlt.view
def silver_conditions_cdf():
    """Silver view over the change feed of ``LIVE.bronze_conditions_cdf``.

    Keeps every incoming column and adds ``visit_id``, ``DiagnosisCode`` and
    ``DiagnosisDescription`` (upper-cased and trimmed), ``dx_start_ts`` (parsed
    from ``START``), plus two all-null placeholder columns:
    ``diagnosis_sequence`` (int) and ``is_primary`` (boolean).
    """
    bronze_changes = (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table("LIVE.bronze_conditions_cdf")
    )
    return (
        bronze_changes
        .withColumn("visit_id", clean_string("ENCOUNTER"))
        .withColumn("DiagnosisCode", clean_string("CODE"))
        .withColumn("DiagnosisDescription", clean_string("DESCRIPTION"))
        .withColumn("dx_start_ts", to_timestamp("START"))
        # Placeholders only: both columns are typed nulls at this stage.
        .withColumn("diagnosis_sequence", lit(None).cast("int"))
        .withColumn("is_primary", lit(None).cast("boolean"))
    )

@dlt.view
def silver_encounters_cdf():
    """Silver view over the change feed of ``LIVE.bronze_encounters_cdf``.

    Keeps every incoming column and adds ``visit_id``, ``patient_id``,
    ``doctor_id``, ``EncounterCode`` and ``EncounterDescription`` (upper-cased
    and trimmed), ``visit_ts`` (parsed from ``START``) and ``visit_date_key``
    (``visit_ts`` formatted as ``yyyyMMdd`` and cast to int).
    """
    bronze_changes = (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table("LIVE.bronze_encounters_cdf")
    )
    return (
        bronze_changes
        .withColumn("visit_id", clean_string("Id"))
        .withColumn("patient_id", clean_string("PATIENT"))
        .withColumn("doctor_id", clean_string("PROVIDER"))
        .withColumn("visit_ts", to_timestamp("START"))
        .withColumn("EncounterCode", clean_string("CODE"))
        .withColumn("EncounterDescription", clean_string("DESCRIPTION"))
        # Must come after visit_ts, which it is derived from.
        .withColumn("visit_date_key", date_format("visit_ts", "yyyyMMdd").cast("int"))
    )

@dlt.view
def silver_providers_cdf():
    """Silver view over the change feed of ``LIVE.bronze_providers_cdf``.

    Keeps every incoming column and adds the upper-cased, trimmed columns
    ``doctor_id`` (from ``Id``), ``DoctorName`` (from ``NAME``) and
    ``Specialty`` (from ``SPECIALITY``).
    """
    bronze_changes = (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table("LIVE.bronze_providers_cdf")
    )
    return (
        bronze_changes
        .withColumn("doctor_id", clean_string("Id"))
        .withColumn("DoctorName", clean_string("NAME"))
        .withColumn("Specialty", clean_string("SPECIALITY"))
    )

In [0]:
# Gold patients dimension (SCD type 1): one current row per patient_id,
# maintained from the change rows exposed by silver_patients_cdf.
dlt.create_streaming_table("gold_patients_type1")

dlt.apply_changes(
    target="gold_patients_type1",
    source="silver_patients_cdf",
    keys=["patient_id"],
    # Order changes by the change feed's commit version. The previous value,
    # col("patient_id"), is the merge key itself: every change for a patient
    # carried the same sequence value, so the latest one could not be told apart.
    # NOTE(review): rows for the same key within one commit still tie — confirm
    # that is acceptable for this source.
    sequence_by=col("_commit_version"),
    ignore_null_updates=True,
    apply_as_deletes=expr("_change_type = 'delete'"),
    stored_as_scd_type=1
)

# Gold conditions table (SCD type 1): one current row per
# (visit_id, DiagnosisCode), fed from silver_conditions_cdf.
dlt.create_streaming_table("gold_conditions_type1")

dlt.apply_changes(
    source="silver_conditions_cdf",
    target="gold_conditions_type1",
    stored_as_scd_type=1,
    keys=["visit_id", "DiagnosisCode"],
    # NOTE(review): the ordering is built from a business timestamp plus a key
    # column rather than a change-ordering column — confirm this is intended.
    sequence_by=struct(col("dx_start_ts"), col("DiagnosisCode")),
    ignore_null_updates=True,
    # Constant-false predicate: no source row is ever applied as a delete.
    apply_as_deletes=expr("false"),
)

# Gold encounters table (SCD type 1): one current row per visit_id,
# fed from silver_encounters_cdf.
dlt.create_streaming_table("gold_encounters_type1")

dlt.apply_changes(
    source="silver_encounters_cdf",
    target="gold_encounters_type1",
    stored_as_scd_type=1,
    keys=["visit_id"],
    # NOTE(review): visit_ts is the encounter start time, not a change-ordering
    # column — confirm it is a suitable sequence for updates to the same visit.
    sequence_by=col("visit_ts"),
    ignore_null_updates=True,
    # Change rows flagged as deletes remove the matching visit from the target.
    apply_as_deletes=expr("_change_type = 'delete'"),
)

# Gold providers dimension (SCD type 1): one current row per doctor_id,
# maintained from the change rows exposed by silver_providers_cdf.
dlt.create_streaming_table("gold_providers_type1")
dlt.apply_changes(
    target="gold_providers_type1",
    source="silver_providers_cdf",
    keys=["doctor_id"],
    # Order changes by the change feed's commit version. The previous value,
    # col("doctor_id"), is the merge key itself: every change for a provider
    # carried the same sequence value, so the latest one could not be told apart.
    # NOTE(review): rows for the same key within one commit still tie — confirm
    # that is acceptable for this source.
    sequence_by=col("_commit_version"),
    # Constant-false predicate: no source row is ever applied as a delete.
    apply_as_deletes=expr("false"),
    ignore_null_updates=True,
    stored_as_scd_type=1
)
