In [0]:
import dlt
from pyspark.sql import functions as F

In [0]:

@dlt.table(
    name="fact_crime",
    table_properties={"delta.columnMapping.mode": "name"}
)
def fact_crime():
    """Build the crime fact table from ``crime_silver``.

    Every silver row is left-joined to ``dim_victim`` on (age, gender,
    descent code) to pick up ``victim_key``; date/time/location keys, the
    remaining dimension keys and the measures are then derived from the
    silver columns.

    Returns:
        DataFrame with the columns crime_key, date_occured_key,
        date_reported_key, time_occured_key, location_key, premise_key,
        weapon_key, status_key, victim_key, primary_crime_code_key,
        crime_code_count, days_to_report, is_location_valid, Source_System.
        Silver rows without a matching victim row keep a null ``victim_key``
        because the join is a left join.
    """
    src = dlt.read("crime_silver")

    # Project the dimension down to the join attributes plus its key so that
    # any other column it carries cannot collide by name with a silver column
    # or with a column derived below.
    # NOTE(review): assumes is_location_valid comes from crime_silver — confirm.
    dim_v = dlt.read("dim_victim").select(
        "victim_key", "age", "gender", "descent_code"
    )

    # Null-safe equality: with plain `==` a null attribute never matches, so a
    # victim with unknown age/sex/descent could never resolve to a dimension
    # row even when the dimension holds a member with those nulls.
    victim_match = (
        src["VICT_AGE_CLEAN"].eqNullSafe(dim_v["age"])
        & src["VICT_SEX"].eqNullSafe(dim_v["gender"])
        & src["VICT_DESCENT"].eqNullSafe(dim_v["descent_code"])
    )
    joined = src.join(dim_v, victim_match, "left")

    df = (
        joined
        # natural PK
        .withColumn("crime_key", F.col("DR_NO").cast("long"))

        # date keys as yyyyMMdd integers
        .withColumn("date_occured_key", F.date_format("DATE_OCC", "yyyyMMdd").cast("int"))
        .withColumn("date_reported_key", F.date_format("DATE_RPTD", "yyyyMMdd").cast("int"))

        # time key (presumably HHMM-encoded upstream; only cast to int here)
        .withColumn("time_occured_key", F.col("TIME_OCC").cast("int"))

        # location key: 0 when either coordinate is missing, otherwise a
        # 64-bit hash of the location attributes (original comment states this
        # matches the dim_location logic)
        .withColumn(
            "location_key",
            F.when(
                F.col("LAT").isNull() | F.col("LON").isNull(),
                F.lit(0)
            ).otherwise(
                F.xxhash64(
                    "LOCATION", "CROSS_STREET", "AREA", "AREA_NAME", "LAT", "LON"
                ).cast("long")
            )
        )

        # dimension FK keys are the source codes themselves
        .withColumn("premise_key", F.col("PREMIS_CD"))
        .withColumn("weapon_key", F.col("WEAPON_USED_CD"))
        .withColumn("status_key", F.col("STATUS"))
        .withColumn("primary_crime_code_key", F.col("CRM_CD"))

        # number of populated crime-code columns in the record
        # NOTE(review): if CRM_CD_1 repeats CRM_CD upstream, the primary code
        # is counted twice — confirm against crime_silver.
        .withColumn(
            "crime_code_count",
            (F.col("CRM_CD").isNotNull()).cast("int") +
            (F.col("CRM_CD_1").isNotNull()).cast("int") +
            (F.col("CRM_CD_2").isNotNull()).cast("int") +
            (F.col("CRM_CD_3").isNotNull()).cast("int") +
            (F.col("CRM_CD_4").isNotNull()).cast("int")
        )

        # reporting lag in days: DATE_RPTD minus DATE_OCC
        .withColumn("days_to_report", F.datediff("DATE_RPTD", "DATE_OCC"))

        .withColumn("Source_System", F.lit("LA_CRIME"))
    )

    # victim_key (from the dimension) and is_location_valid pass through the
    # join unchanged, so they are simply selected here.
    return df.select(
        "crime_key",
        "date_occured_key",
        "date_reported_key",
        "time_occured_key",
        "location_key",
        "premise_key",
        "weapon_key",
        "status_key",
        "victim_key",
        "primary_crime_code_key",
        "crime_code_count",
        "days_to_report",
        "is_location_valid",
        "Source_System"
    )


In [0]:
@dlt.table(
    name="bridge_crime_code",
    table_properties={"delta.columnMapping.mode": "name"}
)
def bridge_crime_code():
    """Build the crime-to-crime-code bridge table from ``crime_silver``.

    The five crime-code columns of each silver row are packed into an array
    and exploded with their position, giving one output row per populated
    code column.

    Returns:
        DataFrame with the columns crime_key, crime_code_key (int),
        code_sequence (1-based position of the code column), is_primary_code
        (true for position 1), weighting_factor (1.0 for the primary code,
        0.5 otherwise) and Source_System.
    """
    code_columns = ["CRM_CD", "CRM_CD_1", "CRM_CD_2", "CRM_CD_3", "CRM_CD_4"]

    # One row per crime, with all code columns collected into a single array.
    per_crime = dlt.read("crime_silver").select(
        F.col("DR_NO").cast("long").alias("crime_key"),
        F.array(*code_columns).alias("crime_codes"),
    )

    # One row per (crime, array slot); empty slots are dropped afterwards, so
    # the positions of the surviving codes are not renumbered.
    per_code = per_crime.select(
        "crime_key",
        F.posexplode_outer("crime_codes").alias("seq_idx", "crime_code_key"),
    ).where(F.col("crime_code_key").isNotNull())

    # Derived attributes, expressed once and attached in the final projection.
    code_sequence = F.col("seq_idx") + 1
    is_primary_code = code_sequence == 1
    weighting_factor = F.when(is_primary_code, F.lit(1.0)).otherwise(F.lit(0.5))

    return per_code.select(
        "crime_key",
        F.col("crime_code_key").cast("int").alias("crime_code_key"),
        code_sequence.alias("code_sequence"),
        is_primary_code.alias("is_primary_code"),
        weighting_factor.alias("weighting_factor"),
        F.lit("LA_CRIME").alias("Source_System"),
    )