In [0]:
import dlt
from pyspark.sql.functions import col, current_timestamp

# ------------------------------------
# 1) Source view from Silver Business
# ------------------------------------
@dlt.view
def business_chi_src():
    """Stream Silver business rows shaped as input for the SCD2 change feed.

    Rows whose ``license_number`` is null are dropped. The remaining rows are
    projected to ``business_nk`` (the license number cast to STRING),
    ``dba_name``, ``aka_name`` and ``event_ts``.

    NOTE(review): ``event_ts`` comes from ``current_timestamp()``, i.e. it is
    processing time rather than a column of the source table. Confirm this is
    an acceptable ordering key for the downstream ``sequence_by``.
    """
    # `spark` is not defined in the visible part of this file; presumably it
    # is the session global supplied by the notebook/pipeline runtime.
    silver_rows = spark.readStream.table(
        "workspace.food_inspection_project.silver_business_chi"
    )

    # A row without a license number cannot provide the natural key.
    keyed_rows = silver_rows.filter(col("license_number").isNotNull())

    projection = [
        "CAST(license_number AS STRING) AS business_nk",
        "dba_name",
        "aka_name",
        "current_timestamp() AS event_ts",
    ]
    return keyed_rows.selectExpr(*projection)

# ------------------------------------
# 2) SCD2 stage table
# ------------------------------------
# Declare the target first so the change flow below has a table to write to.
dlt.create_streaming_table(name="business_chi_type2_stage")

# Feed the source view into the stage table, keeping history as SCD type 2.
dlt.apply_changes(
    source="business_chi_src",
    target="business_chi_type2_stage",
    # One history chain per business natural key.
    keys=["business_nk"],
    # Changes for a key are ordered by the view's event_ts column.
    sequence_by="event_ts",
    ignore_null_updates=True,
    stored_as_scd_type=2,
)

# ------------------------------------
# 3) Final curated SCD2 dimension
#    (★ IMPORTANT: use dlt.read, NOT dlt.read_stream ★)
# ------------------------------------
@dlt.table(name="dim_business_chi_scd2")
def dim_business_chi_scd2():
    """Publish the SCD2 stage table with curated column names.

    Reads ``business_chi_type2_stage`` with a batch ``dlt.read`` (not
    ``dlt.read_stream``) and renames the validity columns:

    * ``__START_AT`` -> ``effective_start_dt``
    * ``__END_AT``   -> ``effective_end_dt``

    An ``is_current`` boolean is always present in the result. It is true for
    rows whose ``effective_end_dt`` is null, i.e. versions that have not been
    closed by a later change.

    Returns:
        DataFrame with the stage table's columns, the renamed validity
        columns, and ``is_current``.
    """
    stage = dlt.read("business_chi_type2_stage")

    renamed = (
        stage
        .withColumnRenamed("__START_AT", "effective_start_dt")
        .withColumnRenamed("__END_AT", "effective_end_dt")
        # Kept for compatibility: this is a silent no-op when the stage table
        # has no __IS_CURRENT column.
        .withColumnRenamed("__IS_CURRENT", "is_current")
    )

    # Derive the flag from the open-ended validity interval so the column
    # exists even when the rename above matched nothing. If the rename did
    # match, withColumn replaces that column with this derived value.
    return renamed.withColumn("is_current", col("effective_end_dt").isNull())

