In [0]:
import dlt
from pyspark.sql.functions import col, current_timestamp, concat_ws

@dlt.view
def business_da_src():
    """Stream Dallas restaurants from the silver inspection table as CDC input.

    Rows missing either ``restaurant_name`` or ``zip_code`` are dropped, because
    both parts are needed to build the natural key. Each surviving row carries:

    * ``business_nk`` -- ``restaurant_name`` and ``zip_code`` joined with ``|``
    * ``restaurant_name``, ``street_address``, ``zip_code`` -- passed through
    * ``event_ts`` -- the processing-time timestamp, used downstream as the
      ``sequence_by`` column of the SCD Type 2 flow
    """
    silver_table = "workspace.food_inspection_project.silver_food_inspection_dallas"

    # Both key parts must be present for the natural key to be meaningful.
    key_parts_present = (
        col("restaurant_name").isNotNull() & col("zip_code").isNotNull()
    )
    natural_key = concat_ws("|", col("restaurant_name"), col("zip_code"))

    return (
        spark.readStream.table(silver_table)
        .filter(key_parts_present)
        .select(
            natural_key.alias("business_nk"),
            "restaurant_name",
            "street_address",
            "zip_code",
            current_timestamp().alias("event_ts"),
        )
    )



# Target of the CDC flow below; DLT maintains its SCD Type 2 history.
_BUSINESS_DA_STAGE_TABLE = "business_da_type2_stage"

dlt.create_streaming_table(name=_BUSINESS_DA_STAGE_TABLE)

# Merge the business_da_src view into the stage table as SCD Type 2 history:
# one version chain per business_nk, ordered by event_ts.
# NOTE(review): event_ts is current_timestamp() taken at ingest, so ordering
# reflects processing time rather than when the change happened at the source,
# and all rows in one micro-batch share the same value. Several rows for the
# same business_nk in one batch therefore tie on the sequence column. Confirm
# that is acceptable, or sequence by a source-side timestamp instead.
dlt.apply_changes(
    target=_BUSINESS_DA_STAGE_TABLE,
    source="business_da_src",
    keys=["business_nk"],
    sequence_by="event_ts",
    ignore_null_updates=True,  # a null incoming value keeps the stored value
    stored_as_scd_type=2,
)



@dlt.table(name="dim_business_da_scd2")
def dim_business_da_scd2():
    """Publish the SCD Type 2 business history as a dimension.

    The source is the target of the ``apply_changes`` flow. DLT does not allow
    that table to be consumed as a stream, because the SCD2 flow updates
    existing rows when it closes out superseded versions. It is therefore read
    in batch with ``dlt.read``, which makes this dataset a materialized view.

    Returns:
        A DataFrame with every column of ``business_da_type2_stage``, including
        the DLT-managed ``__START_AT`` / ``__END_AT`` validity columns. It also
        has an ``__IS_CURRENT`` flag that is true for the open (latest) version
        of each ``business_nk``.
    """
    return (
        dlt.read("business_da_type2_stage")
        # DLT's SCD2 output has no current-row flag. The open version of a key
        # is the one whose __END_AT has not been set yet.
        .withColumn("__IS_CURRENT", col("__END_AT").isNull())
    )