In [0]:
import dlt
from pyspark.sql import functions as F
import re
@dlt.table(
    name="bronze_chicago_raw",
    comment="Raw Chicago food inspections from multiple TSV files."
)
def bronze_chicago_raw():
    """Ingest the Chicago inspection TSV extracts as one bronze table.

    All extracts are read together with a header row, tab delimiter and
    multi-line records enabled. Column names are made reference-friendly by
    replacing every character outside ``[A-Za-z0-9_]`` with an underscore,
    and a constant ``source_city`` column tags each row as ``"Chicago"``.
    """
    volume = "/Volumes/food_inspections_catalog/bronze/raw_data/"
    periods = ("2021-2022", "2022-2023", "2023-2024", "2024-2025", "2025-partyear")
    paths = [f"{volume}Chicago_{period}.tsv" for period in periods]

    reader = spark.read.options(header=True, delimiter="\t", multiLine=True)
    raw = reader.csv(paths)

    def _sanitize(column_name):
        # e.g. "License #" -> "License__", so later layers can use plain names.
        return re.sub(r'[^A-Za-z0-9_]', '_', column_name.strip().replace(' ', '_'))

    renamed = raw.toDF(*map(_sanitize, raw.columns))
    return renamed.withColumn("source_city", F.lit("Chicago"))


@dlt.table(
    name="bronze_dallas_raw",
    comment="Raw Dallas food inspections from multiple TSV files."
)
def bronze_dallas_raw():
    """Ingest the Dallas inspection TSV extracts as one bronze table.

    Reads every Dallas extract (header row, tab-delimited, multi-line
    records), rewrites column names so any character outside
    ``[A-Za-z0-9_]`` becomes ``_``, and appends ``source_city = "Dallas"``.
    """
    volume = "/Volumes/food_inspections_catalog/bronze/raw_data/"
    paths = []
    for period in ("2021-2022", "2022-2023", "2023-2024", "2024-2025", "2025-partyear"):
        paths.append(f"{volume}Dallas_{period}.tsv")

    raw = spark.read.options(header=True, delimiter="\t", multiLine=True).csv(paths)

    cleaned_names = []
    for name in raw.columns:
        # Spaces first, then any remaining punctuation, become underscores.
        cleaned_names.append(re.sub(r'[^A-Za-z0-9_]', '_', name.strip().replace(' ', '_')))

    return raw.toDF(*cleaned_names).withColumn("source_city", F.lit("Dallas"))


In [0]:
import dlt
from pyspark.sql import functions as F
import re

@dlt.table(
    name="silver_chicago_clean",
    comment="Cleansed Chicago food inspection data with derived violation score and validation rules."
)
@dlt.expect_all_or_drop({
    "non_null_fields": "DBA_Name IS NOT NULL AND Inspection_Date IS NOT NULL AND Inspection_Type IS NOT NULL AND Zip IS NOT NULL AND Results IS NOT NULL",
    "valid_zip": "Zip RLIKE '^[0-9]{5}$'"
})
def silver_chicago_clean():
    """Cleanse the Chicago bronze rows into the silver layer.

    Steps:
      * ``Zip`` is trimmed and reduced to its first run of five digits
        (``regexp_extract`` yields "" when there is none, which the
        ``valid_zip`` expectation then drops).
      * ``Inspection_Date`` is parsed with ``F.to_date`` (no explicit format).
      * ``Results`` and ``Facility_Type`` are trimmed.
      * ``Violation_Score`` is derived from the ``Results`` text.
      * One row is kept per ``Inspection_ID``.

    Rows failing either expectation in the decorator are dropped by DLT.
    """
    df = dlt.read("bronze_chicago_raw")

    df = (
        df.withColumn("Zip", F.trim("Zip"))
          .withColumn("Zip", F.regexp_extract(F.col("Zip"), r"(\d{5})", 1))
          # NOTE(review): to_date without a format only handles the default
          # (yyyy-MM-dd style) layout; confirm the TSV's date format. Other
          # layouts may yield NULL (then dropped by non_null_fields) or raise
          # under ANSI mode.
          .withColumn("Inspection_Date", F.to_date("Inspection_Date"))
          .withColumn("Results", F.trim("Results"))
          .withColumn("Facility_Type", F.trim("Facility_Type"))
    )

    # Ordinal proxy derived from the outcome text, not a score from the
    # source data; any Results value not listed here maps to NULL.
    df = df.withColumn(
        "Violation_Score",
        F.when(F.col("Results") == "Pass", 90)
         .when(F.col("Results") == "Pass w/ Conditions", 80)
         .when(F.col("Results") == "Fail", 70)
         .when(F.col("Results") == "No Entry", 0)
         .otherwise(None)
    )

    # Inspection_ID identifies the inspection itself, so deduplicating on it
    # removes only repeated records (e.g. a row present in more than one
    # extract). Keying on (Inspection_Date, License__) would merge distinct
    # same-day inspections of one facility and, since dropDuplicates groups
    # NULL keys together, every row lacking a license on a given date.
    return df.dropDuplicates(["Inspection_ID"])


In [0]:
@dlt.table(
    name="silver_dallas_clean",
    comment="Cleansed Dallas food inspection data standardized to match Chicago schema."
)
@dlt.expect_all_or_drop({
    "non_null_fields": "Inspection_Date IS NOT NULL"
})
def silver_dallas_clean():
    """Standardize the Dallas bronze rows to the Chicago-style silver schema.

    * Known Dallas column aliases are renamed to the shared names
      (``DBA_Name``, ``Violation_Score``, ``Inspection_Date``, ``Zip``).
    * ``Address`` is assembled from whichever street parts exist.
    * Any missing required column is added as a NULL string.
    * Constant ``Facility_Type``/``City``/``State``/``source_city`` are set,
      ``Inspection_Date`` is parsed to a date, ``Zip`` is reduced to its first
      five-digit run (NULL when absent), and ``Violation_Score`` is cast to int.
    * One row is kept per (DBA_Name, Address, Inspection_Date).

    Rows with a NULL ``Inspection_Date`` are dropped by the DLT expectation.
    """
    df = dlt.read("bronze_dallas_raw")

    # Candidate source names per target, in priority order. A rename happens
    # only if the target name is not already present, so the first alias
    # found (or a column already carrying the target name) wins and two
    # source columns are never renamed onto the same name, which would make
    # later F.col(...) references ambiguous.
    col_map = {
        "Restaurant_Name": "DBA_Name",
        "Business_Name": "DBA_Name",
        "Inspection_Score": "Violation_Score",
        "Score": "Violation_Score",
        "Date": "Inspection_Date",
        "Zip_Code": "Zip",
        "ZipCode": "Zip",
    }
    for old, new in col_map.items():
        if old in df.columns and new not in df.columns:
            df = df.withColumnRenamed(old, new)

    address_parts = [F.col(part) for part in ["Street_Number", "Street_Name", "Street_Type"]
                     if part in df.columns]
    if address_parts:
        df = df.withColumn("Address", F.concat_ws(" ", *address_parts))
    else:
        df = df.withColumn("Address", F.lit(None).cast("string"))

    required_cols = ["DBA_Name", "Address", "Zip", "Inspection_Date", "Inspection_Type", "Violation_Score"]
    for c in required_cols:
        if c not in df.columns:
            df = df.withColumn(c, F.lit(None).cast("string"))

    # regexp_extract returns "" (not NULL) when no five-digit run is found;
    # map that to NULL so a missing ZIP is represented as NULL downstream.
    zip5 = F.regexp_extract(F.col("Zip"), r"(\d{5})", 1)

    df = (
        df.withColumn("Facility_Type", F.lit("Restaurant"))
          .withColumn("City", F.lit("Dallas"))
          .withColumn("State", F.lit("TX"))
          .withColumn("source_city", F.lit("Dallas"))
          # NOTE(review): no explicit date format; confirm the Dallas layout.
          .withColumn("Inspection_Date", F.to_date("Inspection_Date"))
          .withColumn("Zip", F.when(zip5 != "", zip5))
          .withColumn("Violation_Score", F.col("Violation_Score").cast("int"))
    )

    return df.dropDuplicates(["DBA_Name", "Address", "Inspection_Date"])


In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

@dlt.table(
    name="dim_facility",
    comment="Facility dimension combining Chicago and Dallas data with SCD Type 2 tracking."
)
def dim_facility():
    """Build the facility dimension with one version per observed inspection date.

    Each distinct (facility_id, facility_name, facility_type, city,
    effective_date) becomes a version. ``start_date`` is its effective_date,
    ``end_date`` is the next version's effective_date for the same facility
    (NULL for the latest), and ``is_current`` is true when ``end_date`` is NULL.

    Facility keys:
      * Chicago: ``License__`` (rows without one are dropped).
      * Dallas: ``md5(DBA_Name|Address)``, the same key ``fact_inspection``
        uses for Dallas ``facility_id``, so the fact rows join to this table.
    """
    chi = (
        dlt.read("silver_chicago_clean")
        .select(
            F.col("License__").alias("facility_id"),
            F.col("DBA_Name").alias("facility_name"),
            F.col("Facility_Type").alias("facility_type"),
            F.lit("Chicago").alias("city"),
            F.col("Inspection_Date").alias("effective_date")
        )
        # A NULL key would lump unrelated facilities into one window partition.
        .dropna(subset=["facility_id", "facility_name"])
    )

    dal = (
        dlt.read("silver_dallas_clean")
        .select(
            # Deterministic surrogate key; monotonically_increasing_id() would
            # give every inspection row its own "facility" and change per run.
            F.md5(F.concat_ws("|", F.col("DBA_Name"), F.col("Address"))).alias("facility_id"),
            F.col("DBA_Name").alias("facility_name"),
            F.col("Facility_Type").alias("facility_type"),
            F.lit("Dallas").alias("city"),
            F.col("Inspection_Date").alias("effective_date")
        ).dropna(subset=["facility_name"])
    )

    # Deduplicate BEFORE computing lead(): otherwise a duplicate row yields
    # end_date == its own effective_date, and dropping the wrong copy can
    # leave a facility with no is_current row.
    versions = (
        chi.unionByName(dal, allowMissingColumns=True)
        .dropDuplicates(["facility_id", "facility_name", "facility_type", "city", "effective_date"])
    )

    # Extra sort keys make the ordering of same-date versions deterministic.
    w = (
        Window.partitionBy("city", "facility_id")
        .orderBy("effective_date", "facility_name", "facility_type")
    )

    return (
        versions.withColumn("start_date", F.col("effective_date"))
                .withColumn("end_date", F.lead("effective_date").over(w))
                .withColumn("is_current", F.col("end_date").isNull())
    )


In [0]:
@dlt.table(
    name="dim_location",
    comment="Location dimension combining ZIP, city, and state across Chicago and Dallas."
)
def dim_location():
    """Build the location dimension: one row per ZIP code with its city/state.

    ZIP, City and State are taken from both silver tables. Rows whose ZIP is
    NULL or an empty string are excluded, and then one row is kept per
    zip_code.
    """
    chi = (
        dlt.read("silver_chicago_clean")
        .select(F.col("Zip").alias("zip_code"), F.col("City").alias("city"), F.col("State").alias("state"))
    )
    dal = (
        dlt.read("silver_dallas_clean")
        .select(F.col("Zip").alias("zip_code"), F.col("City").alias("city"), F.col("State").alias("state"))
    )

    return (
        chi.unionByName(dal, allowMissingColumns=True)
           # regexp_extract upstream produces "" (not NULL) when no ZIP
           # matches, so both must be filtered to avoid a blank-ZIP location.
           .filter(F.col("zip_code").isNotNull() & (F.col("zip_code") != ""))
           # NOTE(review): dropDuplicates keeps an arbitrary row per ZIP, so
           # city/state spelling variants for the same ZIP are not resolved.
           .dropDuplicates(["zip_code"])
    )


In [0]:
@dlt.table(
    name="dim_date",
    comment="Date dimension for all inspection records."
)
def dim_date():
    """Build the calendar dimension from every distinct inspection date.

    Inspection dates from both silver tables are stacked, NULLs removed and
    duplicates collapsed. Each remaining date is broken out into year,
    month, day-of-month and the full weekday name (pattern ``EEEE``).
    """
    def _dates_from(table_name):
        return dlt.read(table_name).select(F.col("Inspection_Date").alias("date"))

    all_dates = _dates_from("silver_chicago_clean").unionByName(
        _dates_from("silver_dallas_clean"), allowMissingColumns=True
    )
    distinct_dates = all_dates.where(F.col("date").isNotNull()).dropDuplicates(["date"])

    return distinct_dates.select(
        "date",
        F.year("date").alias("year"),
        F.month("date").alias("month"),
        F.dayofmonth("date").alias("day"),
        F.date_format("date", "EEEE").alias("weekday"),
    )


In [0]:
@dlt.table(
    name="fact_inspection",
    comment="Fact table combining all inspection events from both cities."
)
def fact_inspection():
    """Stack Chicago and Dallas inspections into a single fact table.

    Output columns: inspection_id, facility_id, inspection_date,
    inspection_type, result, violation_score, risk_level, zip_code, city.

    Chicago rows use the source ``Inspection_ID`` and ``License__`` keys.
    Dallas rows have neither, so keys are MD5 hashes:
    ``facility_id = md5(DBA_Name|Address)`` and
    ``inspection_id = md5(DBA_Name|Address|Inspection_Date)``. Dallas
    ``risk_level`` is always NULL, and ``Results``/``Inspection_Type`` are
    backfilled with NULL strings when the silver table lacks them.
    """
    chicago_mapping = [
        ("Inspection_ID", "inspection_id"),
        ("License__", "facility_id"),
        ("Inspection_Date", "inspection_date"),
        ("Inspection_Type", "inspection_type"),
        ("Results", "result"),
        ("Violation_Score", "violation_score"),
        ("Risk", "risk_level"),
        ("Zip", "zip_code"),
        ("source_city", "city"),
    ]
    chi = dlt.read("silver_chicago_clean").select(
        *[F.col(source).alias(target) for source, target in chicago_mapping]
    )

    dallas = dlt.read("silver_dallas_clean")
    for optional in ("Results", "Inspection_Type"):
        if optional not in dallas.columns:
            dallas = dallas.withColumn(optional, F.lit(None).cast("string"))

    name_and_address = [F.col("DBA_Name"), F.col("Address")]
    dal = dallas.select(
        F.md5(
            F.concat_ws("|", *name_and_address, F.col("Inspection_Date").cast("string"))
        ).alias("inspection_id"),
        F.md5(F.concat_ws("|", *name_and_address)).alias("facility_id"),
        F.col("Inspection_Date").alias("inspection_date"),
        F.col("Inspection_Type").alias("inspection_type"),
        F.col("Results").alias("result"),
        F.col("Violation_Score").alias("violation_score"),
        F.lit(None).alias("risk_level"),
        F.col("Zip").alias("zip_code"),
        F.col("source_city").alias("city"),
    )

    return chi.unionByName(dal, allowMissingColumns=True)
