In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

def clean_columns(df):
    """Return ``df`` with every column name normalised to ``[A-Za-z0-9_]``.

    Each name is stripped of surrounding whitespace, every character outside
    ``[A-Za-z0-9_]`` becomes ``_``, and runs of ``_`` collapse into one
    (e.g. ``" Inspection ID "`` -> ``"Inspection_ID"``).

    All names are applied in one ``toDF`` projection rather than one
    ``withColumnRenamed`` per column. This keeps the logical plan flat for
    wide CSVs. It also means each new name is tied to its column position:
    ``withColumnRenamed`` matches by name (case-insensitively by default), so
    a later rename could also hit a column produced by an earlier one.

    Args:
        df: A Spark DataFrame (anything exposing ``columns`` and ``toDF``).

    Returns:
        A DataFrame with the same data and column order and cleaned names.
    """
    def _clean(name):
        name = re.sub(r'[^A-Za-z0-9_]', '_', name.strip())
        return re.sub('_+', '_', name)

    return df.toDF(*[_clean(name) for name in df.columns])


In [0]:
@dlt.table(
    name="bronze_chi",
    comment="Chicago raw data with cleaned column names"
)
def bronze_chi():
    """Bronze: the Chicago inspections CSV, using its header row as column names.

    All columns are read as strings (no schema inference). Header names are
    normalised by ``clean_columns``.
    """
    raw_chicago = (
        spark.read
        .option("header", True)
        .csv("/Volumes/workspace/damg7370/datastore/Cleaned_Chicago.csv")
    )
    return clean_columns(raw_chicago)


@dlt.table(
    name="bronze_dal",
    comment="Dallas raw data with cleaned column names"
)
def bronze_dal():
    """Bronze: the Dallas inspections CSV with cleaned column names.

    The Dallas header may contain ``"Description "`` (note the trailing
    space). When present, it is renamed to ``Description_dallas`` *before*
    generic cleaning, so it keeps that explicit name instead of being
    normalised to ``Description``.
    """
    raw_dallas = (
        spark.read
        .option("header", True)
        .csv("/Volumes/workspace/damg7370/datastore/Cleaned_Dallas.csv")
    )

    has_padded_description = "Description " in raw_dallas.columns
    if has_padded_description:
        raw_dallas = raw_dallas.withColumnRenamed("Description ", "Description_dallas")

    return clean_columns(raw_dallas)


In [0]:
# ----------------------------------------------
#              SILVER FINAL (FIXED)
# ----------------------------------------------
def _standardize_chicago(chi):
    """Rename cleaned Chicago bronze columns to the shared silver names."""
    return (
        chi
        .withColumnRenamed("InspectionID", "inspection_id")
        .withColumnRenamed("DBA_Name", "dba_name")
        .withColumnRenamed("AKA_Name", "aka_name")
        .withColumnRenamed("Facility_Type", "facility_type")
        .withColumnRenamed("Inspection_Date", "inspection_date")
        .withColumnRenamed("Inspection_Type", "inspection_type")
        .withColumnRenamed("Results", "inspection_result")
        .withColumnRenamed("Violation_Description", "violation_description_raw_chi")
        .withColumnRenamed("Zip", "zip_code")
        .withColumn("source_city", lit("Chicago"))
        # Prefer the DBA name; fall back to the AKA name when it is null.
        .withColumn("restaurant_name", coalesce(col("dba_name"), col("aka_name")))
        .drop("dba_name", "aka_name")
    )


def _standardize_dallas(dal):
    """Rename cleaned Dallas bronze columns to the shared silver names."""
    return (
        dal
        .withColumnRenamed("RecordID", "inspection_id")
        .withColumnRenamed("Inspection_Type", "inspection_type")
        .withColumnRenamed("Inspection_Date", "inspection_date")
        .withColumnRenamed("Results", "inspection_result")
        .withColumnRenamed("Street_Address", "address")
        .withColumnRenamed("Zip_Code", "zip_code")
        .withColumnRenamed("Facality_Type", "facility_type")
        .withColumnRenamed("Violation_desc", "violation_description_raw_dal")
        .withColumnRenamed("Description_dallas", "violation_description_raw_dal_extra")
        .withColumn("source_city", lit("Dallas"))
        # Re-emits the column under this exact name. col() resolves
        # case-insensitively by default, so this also normalises the source
        # header's casing (presumably "Restaurant_Name" -- TODO confirm).
        .withColumn("restaurant_name", col("restaurant_name"))
    )


def _fill_violation_codes(df):
    """Fill null ``violation_code`` values with deterministic ``DAL_nnnnnn`` codes.

    Rows that already have a code keep it. Null-code rows are numbered with
    ``dense_rank`` over ``violation_description`` within the null-code
    partition, so:

    * each distinct description gets exactly one generated code;
    * the same input always produces the same codes.

    The earlier per-row ``row_number`` ordered by
    ``monotonically_increasing_id`` was non-deterministic, created one code
    per row, and was truncated by the 6-wide ``lpad`` past 999,999 rows.

    NOTE(review): the ``DAL_`` prefix is applied to every null-code row,
    whatever its ``source_city``.
    """
    from pyspark.sql.functions import dense_rank
    from pyspark.sql.window import Window

    null_code = col("violation_code").isNull()
    by_description = Window.partitionBy(null_code).orderBy(col("violation_description"))
    generated = concat(
        lit("DAL_"),
        lpad(dense_rank().over(by_description).cast("string"), 6, "0"),
    )
    return df.withColumn(
        "violation_code",
        when(null_code, generated).otherwise(col("violation_code")),
    )


def _final_clean(df):
    """Normalise zip/date/coordinate types, then drop null-id and duplicate rows."""
    return (
        df.withColumn("zip_code", regexp_extract("zip_code", r'\d{5}', 0))
          .withColumn("inspection_date", to_date("inspection_date"))
          .withColumn("latitude", col("Latitude").cast("double"))
          .withColumn("longitude", col("Longitude").cast("double"))
          # Filter first, so a null-id row can never be the one kept by the dedup.
          .filter(col("inspection_id").isNotNull())
          # Chicago ids (InspectionID) and Dallas ids (RecordID) come from
          # different source columns and may overlap, so dedupe per city.
          .dropDuplicates(["source_city", "inspection_id"])
    )


@dlt.table(
    name="food_silver_final",
    comment="Unified Silver layer Chicago + Dallas"
)
def food_silver_final():
    """Silver: Chicago and Dallas inspections standardised into one schema.

    Steps:

    1. Rename each city's bronze columns to shared names.
    2. Union the two frames by name.
    3. Merge the per-city violation-description columns.
    4. Fill missing violation codes.
    5. Clean types and de-duplicate.
    6. Add a SHA-256 restaurant natural key (``restaurant_nk``).
    """
    chicago = _standardize_chicago(dlt.read("bronze_chi"))
    dallas = _standardize_dallas(dlt.read("bronze_dal"))

    # Columns that exist on only one side are null-filled on the other.
    unioned = chicago.unionByName(dallas, allowMissingColumns=True)

    # Take the first non-null of the per-city description columns.
    described = unioned.withColumn(
        "violation_description",
        coalesce(
            col("violation_description_raw_chi"),
            col("violation_description_raw_dal"),
            col("violation_description_raw_dal_extra"),
        ),
    )

    cleaned = _final_clean(_fill_violation_codes(described))

    # concat_ws skips null parts, so the key itself is never null.
    return cleaned.withColumn(
        "restaurant_nk",
        sha2(
            concat_ws("||", col("restaurant_name"), col("address"), col("zip_code")),
            256,
        ),
    )


In [0]:

import dlt
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# ---------------- DIM_DATE ----------------
@dlt.table(name="dim_date", comment="Date dimension")
def dim_date():
    """Date dimension: one row per distinct, non-null silver inspection date.

    ``date_key`` is the date rendered as a ``yyyyMMdd`` integer. ``year``,
    ``month``, ``day`` and ``quarter`` are its calendar parts.
    """
    full_date = col("full_date")

    distinct_dates = (
        dlt.read("food_silver_final")
        .select(col("inspection_date").alias("full_date"))
        .where(full_date.isNotNull())
        .distinct()
    )

    return (
        distinct_dates
        .withColumn("date_key", date_format(full_date, "yyyyMMdd").cast("int"))
        .withColumn("year", year(full_date))
        .withColumn("month", month(full_date))
        .withColumn("day", dayofmonth(full_date))
        .withColumn("quarter", quarter(full_date))
    )


# ---------------- DIM_CITY ----------------
@dlt.table(name="dim_city", comment="City dimension")
def dim_city():
    """City dimension: the distinct silver ``source_city`` values, as ``city_name``.

    ``city_key`` comes from ``monotonically_increasing_id``. It is unique
    within one computation but not guaranteed to be stable across pipeline
    updates.
    """
    cities = dlt.read("food_silver_final").select(col("source_city").alias("city_name"))
    unique_cities = cities.distinct()
    return unique_cities.withColumn("city_key", monotonically_increasing_id())


# ---------------- DIM_RESTAURANT ----------------
@dlt.table(
    name="dim_restaurant",
    comment="Restaurant dimension with SCD Type-2"
)
def dim_restaurant():
    """Restaurant dimension with SCD Type-2 metadata columns.

    ``restaurant_nk`` is the SHA-256 of
    ``restaurant_name||address||zip_code`` (nulls skipped by ``concat_ws``).
    The table holds exactly ONE row per ``restaurant_nk``.

    Silver has one row per inspection. Emitting a row per silver row (as
    before) duplicated every restaurant, so a fact join on ``restaurant_nk``
    multiplied each restaurant's inspections by its inspection count.
    Attributes that can differ within one key (e.g. ``license_number``,
    ``source_city``) are reduced with ``min`` so the chosen value is
    deterministic and prefers non-null values.

    NOTE(review): this function does not read any earlier version of the
    table. Every row is emitted as current, with ``effective_date`` = today
    and a null ``end_date``.

    ``restaurant_key`` is a ``monotonically_increasing_id`` surrogate,
    assigned after de-duplication.
    """
    restaurant_nk = sha2(
        concat_ws("||", col("restaurant_name"), col("address"), col("zip_code")),
        256,
    )

    restaurants = (
        dlt.read("food_silver_final")
        .withColumn("restaurant_nk", restaurant_nk)
        .groupBy("restaurant_nk")
        .agg(
            F.min("restaurant_name").alias("restaurant_name"),
            F.min("address").alias("address"),
            F.min("zip_code").alias("zip_code"),
            F.min("license").alias("license_number"),
            F.min("source_city").alias("source_city"),
        )
        # Restore the original column order.
        .select(
            "restaurant_name",
            "address",
            "zip_code",
            "license_number",
            "source_city",
            "restaurant_nk",
        )
    )

    return (
        restaurants
        .withColumn("effective_date", current_date())
        .withColumn("end_date", lit(None).cast("date"))
        .withColumn("is_current", lit(True))
        .withColumn("restaurant_key", monotonically_increasing_id())
    )




# ---------------- DIM_VIOLATION ----------------
@dlt.table(name="dim_violation", comment="Violation dimension")
def dim_violation():
    """Violation dimension: the distinct ``(violation_code, violation_description)`` pairs.

    The table is distinct on the *pair*, so one description may appear under
    several codes. Joins should match on both columns to hit a single row.
    ``violation_key`` is a ``monotonically_increasing_id`` surrogate.
    """
    pairs = dlt.read("food_silver_final").select("violation_code", "violation_description")
    return pairs.distinct().withColumn("violation_key", monotonically_increasing_id())


# ---------------- DIM_RESULT ----------------
@dlt.table(name="dim_result", comment="Inspection result dimension")
def dim_result():
    """Inspection-result dimension: the distinct silver ``inspection_result`` values.

    Each ``result_name`` gets a ``monotonically_increasing_id`` surrogate
    (``inspection_result_key``) and a fixed ``derived_score``:

    * Pass = 90
    * Pass w/ Conditions = 80
    * Fail = 70
    * any other value, including null = 60
    """
    result_name = col("result_name")

    results = (
        dlt.read("food_silver_final")
        .select(col("inspection_result").alias("result_name"))
        .distinct()
        .withColumn("inspection_result_key", monotonically_increasing_id())
    )

    score = (
        when(result_name == "Pass", 90)
        .when(result_name == "Pass w/ Conditions", 80)
        .when(result_name == "Fail", 70)
        .otherwise(60)
    )

    return results.withColumn("derived_score", score)


# ---------------- DIM_TYPE ----------------
@dlt.table(name="dim_type", comment="Inspection type dimension")
def dim_type():
    """Inspection-type dimension: the distinct silver ``inspection_type`` values.

    ``inspection_type_key`` is a ``monotonically_increasing_id`` surrogate.
    """
    inspection_types = dlt.read("food_silver_final").select("inspection_type").distinct()
    return inspection_types.withColumn("inspection_type_key", monotonically_increasing_id())


# ---------------- DIM_FACILITY ----------------
@dlt.table(name="dim_facility_type", comment="Facility type dimension")
def dim_facility_type():
    """Facility-type dimension: the distinct silver ``facility_type`` values.

    ``facility_type_key`` is a ``monotonically_increasing_id`` surrogate.
    """
    facility_types = dlt.read("food_silver_final").select("facility_type").distinct()
    return facility_types.withColumn("facility_type_key", monotonically_increasing_id())


In [0]:
@dlt.table(
    name="fact_food_inspection",
    comment="Inspection fact table: silver inspections keyed to every dimension"
)
def fact_food_inspection():
    """Fact: one row per silver inspection, carrying dimension surrogate keys.

    Silver is read with ``dlt.read``, so the pipeline schedules this table
    after ``food_silver_final`` and sees the same update's data as the
    dimensions. The previous ``spark.table("workspace.default.food_silver_final")``
    was invisible to the dependency graph: it could read a stale table, failed
    on a fresh pipeline, and hard-coded a catalog/schema.

    All joins are ``left``, so an inspection is kept even when a dimension
    has no matching member (that key is then null). Each join is written to
    match at most one dimension row, so it cannot multiply fact rows:

    * restaurant: current rows only, de-duplicated on ``restaurant_nk``;
    * violation: null-safe match on *both* ``violation_code`` and
      ``violation_description``. The dimension is distinct on that pair, so
      a description-only join fanned out whenever one description had
      several codes.

    NOTE(review): ``inspection_key`` is the source inspection id. Chicago and
    Dallas ids come from different source columns and may overlap.
    """
    silver = dlt.read("food_silver_final").select(
        "inspection_id",
        "inspection_date",
        "inspection_type",
        "inspection_result",
        "violation_code",
        "violation_description",
        "restaurant_nk",       # natural key for the restaurant join
        "facility_type",
        "source_city",
        "risk",
    )

    d_date      = dlt.read("dim_date")
    d_city      = dlt.read("dim_city")
    d_rest      = (
        dlt.read("dim_restaurant")
        .filter(col("is_current"))
        .dropDuplicates(["restaurant_nk"])
    )
    d_violation = dlt.read("dim_violation")
    d_result    = dlt.read("dim_result")
    d_type      = dlt.read("dim_type")
    d_fac       = dlt.read("dim_facility_type")

    fact = (
        silver.alias("s")

        # DATE DIM
        .join(
            d_date.alias("dd"),
            col("s.inspection_date") == col("dd.full_date"),
            "left"
        )

        # CITY DIM
        .join(
            d_city.alias("dc"),
            col("s.source_city") == col("dc.city_name"),
            "left"
        )

        # RESTAURANT DIM (SCD2 - current row per natural key)
        .join(
            d_rest.alias("dr"),
            col("s.restaurant_nk") == col("dr.restaurant_nk"),
            "left"
        )

        # VIOLATION DIM (match the full (code, description) pair)
        .join(
            d_violation.alias("dv"),
            col("s.violation_code").eqNullSafe(col("dv.violation_code"))
            & col("s.violation_description").eqNullSafe(col("dv.violation_description")),
            "left"
        )

        # RESULT DIM
        .join(
            d_result.alias("res"),
            col("s.inspection_result") == col("res.result_name"),
            "left"
        )

        # TYPE DIM
        .join(
            d_type.alias("dt"),
            col("s.inspection_type") == col("dt.inspection_type"),
            "left"
        )

        # FACILITY TYPE DIM
        .join(
            d_fac.alias("dfac"),
            col("s.facility_type") == col("dfac.facility_type"),
            "left"
        )

        .select(
            col("s.inspection_id").alias("inspection_key"),
            col("dd.date_key"),
            col("dc.city_key"),
            col("dr.restaurant_key"),
            col("dv.violation_key"),
            col("res.inspection_result_key"),
            col("dt.inspection_type_key"),
            col("dfac.facility_type_key"),
            # derived_score is defined by dim_result, not by silver.
            col("res.derived_score"),
            col("s.risk"),
            col("s.source_city").alias("record_source")
        )
    )

    return fact
