Dim_Date

In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from functools import reduce
from operator import add
 
# =====================================================
# DIMENSION TABLES - ALL STREAMING WITH NATURAL KEYS
# =====================================================
 
@dlt.table(
    name="gold.dim_date",
    comment="Date dimension table with calendar attributes - Streaming with Natural Keys",
    table_properties={
        "quality": "gold",
        "delta.enableChangeDataFeed": "true"
    }
)
def dim_date():
    """Create date dimension as a streaming table using date as natural key"""
   
    # Read from silver streaming tables
    chicago_dates = dlt.read_stream("silver.chicago_inspections_silver").select("inspection_date")
    dallas_dates = dlt.read_stream("silver.dallas_inspections_silver").select("inspection_date")
   
    # Union all dates
    all_dates = chicago_dates.union(dallas_dates).select(
        to_date(col("inspection_date")).alias("date_col")
    ).dropDuplicates(["date_col"])
   
    # Create date dimension with natural key
    date_dim = all_dates.select(
        date_format(col("date_col"), "yyyyMMdd").cast("int").alias("date_id_nk_pk"),  # Natural key
        col("date_col").alias("date_num"),
        date_format(col("date_col"), "EEEE").alias("day_name"),
        dayofmonth(col("date_col")).alias("day_num"),
        date_format(col("date_col"), "E").alias("day_abbr"),
        when(dayofweek(col("date_col")).isin(1, 7), True).otherwise(False).alias("is_weekend"),
        date_format(col("date_col"), "MMMM").alias("month_name"),
        date_format(col("date_col"), "MMM").alias("month_abbr"),
        month(col("date_col")).alias("month_num"),
        dayofweek(col("date_col")).alias("dt"),
        year(col("date_col")).alias("year_num"),
        date_format(col("date_col"), "yyyyMMdd").cast("varchar(50)").alias("data_source_id"),
        date_format(col("date_col"), "yyyy-MM-dd").alias("data_workflow_name"),
        date_format(col("date_col"), "yyyyMMdd").cast("varchar(50)").alias("dw_job_id"),
        current_timestamp().alias("dw_load_dt")
    )
   
    return date_dim

Dim_Location 

In [0]:
@dlt.table(
    name="gold.dim_location",
    comment="Location dimension with address and geographic details - Streaming with Natural Keys",
    table_properties={
        "quality": "gold",
        "delta.enableChangeDataFeed": "true"
    }
)
def dim_location():
    """Create location dimension as a streaming table using location_id as natural key"""
   
    # Chicago locations
    chicago_locations = dlt.read_stream("silver.chicago_inspections_silver").select(
        concat_ws("_", lit("CHI"), col("address"), col("zip")).alias("location_id_nk_pk"),  # Natural key
        col("address").alias("street_address"),
        col("city"),
        col("state"),
        col("zip"),
        col("latitude").cast("double"),
        col("longitude").cast("double"),
        lit("CHI").alias("source_city")
    ).dropDuplicates(["location_id_nk_pk"])
   
    # Dallas locations
    dallas_locations = dlt.read_stream("silver.dallas_inspections_silver").select(
        concat_ws("_", lit("DAL"), col("street_address"), col("zip_code")).alias("location_id_nk_pk"),  # Natural key
        col("street_address"),
        lit("Dallas").alias("city"),
        lit("TX").alias("state"),
        col("zip_code").alias("zip"),
        col("lat").cast("double").alias("latitude"),
        col("long").cast("double").alias("longitude"),
        lit("DAL").alias("source_city")
    ).dropDuplicates(["location_id_nk_pk"])
   
    # Union locations
    all_locations = chicago_locations.unionByName(dallas_locations)
   
    # Format columns without surrogate keys
    dim_location = all_locations.select(
        col("location_id_nk_pk").cast("varchar(200)"),  # Natural key as primary key
        col("street_address").cast("varchar(150)"),
        col("city").cast("varchar(50)"),
        col("state").cast("varchar(50)"),
        col("zip").cast("varchar(50)"),
        col("latitude").cast("float"),
        col("longitude").cast("float"),
        col("location_id_nk_pk").cast("varchar(200)").alias("location"),
        col("location_id_nk_pk").cast("varchar(200)").alias("data_source_id"),
        col("source_city").cast("varchar(50)").alias("data_workflow_name"),
        col("location_id_nk_pk").cast("varchar(200)").alias("dw_job_id"),
        current_timestamp().alias("dw_load_dt")
    )
   
    return dim_location

# Restaurant dimension with SCD Type 2 - using natural key


In [0]:
@dlt.view(
    name="restaurant_updates"
)
def restaurant_updates():
    """Prepare restaurant updates for SCD2 processing from streaming silver tables"""
   
    # Chicago restaurants
    chicago_restaurants = dlt.read_stream("silver.chicago_inspections_silver").select(
        concat(lit("CHI_"), col("license"), lit("_"), col("latitude"), lit("_"), col("longitude")).alias("restaurant_id_nk"),  # Natural key
        col("dba_name").alias("restaurant_name"),
        col("aka_name").alias("aka_known_as_name"),
        col("license").cast("double").cast("bigint").alias("license_number"),
        col("facility_type"),
        col("risk").alias("risk_level"),
        lit("CHI").alias("source_city"),
        col("_ingestion_timestamp").alias("source_timestamp")
    ).filter(
        col("dba_name").isNotNull() &
        col("license").isNotNull() &
        col("latitude").isNotNull() &
        col("longitude").isNotNull()
    )
   
    # Dallas restaurants
    dallas_restaurants = dlt.read_stream("silver.dallas_inspections_silver").select(
        concat(lit("DAL_"), col("restaurant_name"), lit("_"), col("lat"), lit("_"), col("long")).alias("restaurant_id_nk"),  # Natural key
        col("restaurant_name"),
        lit(None).cast("varchar(100)").alias("aka_known_as_name"),
        lit(None).cast("bigint").alias("license_number"),
        lit("Restaurant").alias("facility_type"),
        when(col("inspection_score") < 80, "Risk 1 (High)")
        .when(col("inspection_score") < 90, "Risk 2 (Medium)")
        .otherwise("Risk 3 (Low)").alias("risk_level"),
        lit("DAL").alias("source_city"),
        col("_ingestion_timestamp").alias("source_timestamp")
    ).filter(
        col("restaurant_name").isNotNull() &
        col("lat").isNotNull() &
        col("long").isNotNull()
    )
   
    # Union and deduplicate
    combined_restaurants = chicago_restaurants.unionByName(dallas_restaurants) \
        .withWatermark("source_timestamp", "1 hour") \
        .dropDuplicates(["restaurant_id_nk", "source_timestamp"])
   
    # Drop any reserved SCD2 columns if they exist to ensure clean processing
    scd2_reserved_columns = ["_change_type", "_commit_version", "_commit_timestamp"]
    for col_name in scd2_reserved_columns:
        if col_name in combined_restaurants.columns:
            combined_restaurants = combined_restaurants.drop(col_name)
   
    return combined_restaurants
 
# Apply SCD Type 2 using DLT's built-in functionality
dlt.create_streaming_table(
    name="gold.dim_restaurant_scd2",
    comment="Restaurant dimension with SCD Type 2 - tracks historical changes with natural keys",
    table_properties={
        "quality": "gold"
        # CDF removed - SCD2 already tracks changes internally
    }
)
 
dlt.apply_changes(
    target="gold.dim_restaurant_scd2",
    source="restaurant_updates",
    keys=["restaurant_id_nk"],  # Natural key
    sequence_by="source_timestamp",
    apply_as_deletes=None,
    except_column_list=["source_timestamp"],
    stored_as_scd_type=2,
    track_history_column_list=[
        "restaurant_name",
        "aka_known_as_name",
        "risk_level",
        "facility_type",
        "license_number"
    ]
)
 
@dlt.table(
    name="gold.dim_violation",
    comment="Violation codes and descriptions dimension - Streaming with Natural Keys",
    table_properties={
        "quality": "gold",
        "delta.enableChangeDataFeed": "true"
    }
)
def dim_violation():
    """Create violation dimension as a streaming table using composite natural key"""

 # Parse Chicago violations

In [0]:
chicago_violations = dlt.read_stream("silver.chicago_inspections_silver") \
        .select(explode(split(col("violations"), r"\|")).alias("violation_text")) \
        .filter(col("violation_text") != "") \
        .select(
            regexp_extract(col("violation_text"), r"^(\d+)\.", 1).alias("violation_code"),
            trim(regexp_extract(col("violation_text"), r"^\d+\.\s*(.+?)\s*-\s*Comments:", 1)).alias("violation_title"),
            lit("CHI").alias("source_city")
        ).filter(col("violation_code") != "") \
        .dropDuplicates(["violation_code", "source_city"])
   
    # Parse Dallas violations
    dallas_violations_list = []
    for i in range(1, 26):
        dallas_viol = dlt.read_stream("silver.dallas_inspections_silver") \
            .select(
                regexp_extract(col(f"violation_description_{i}"), r"^\*?(\d+)", 1).alias("violation_code"),
                trim(regexp_replace(col(f"violation_description_{i}"), r"^\*?\d+\s*", "")).alias("violation_title"),
                lit("DAL").alias("source_city")
            ).filter(col(f"violation_description_{i}").isNotNull())
        dallas_violations_list.append(dallas_viol)
   
    # Union all Dallas violations
    dallas_violations = reduce(lambda df1, df2: df1.unionByName(df2), dallas_violations_list) \
        .dropDuplicates(["violation_code", "source_city"])
   
    # Union all violations
    all_violations = chicago_violations.unionByName(dallas_violations)
   
    # Create dimension with composite natural key
    dim_violation = all_violations.select(
        concat(col("source_city"), lit("_"), col("violation_code")).cast("varchar(100)").alias("violation_id_nk_pk"),  # Natural key
        col("violation_code").cast("varchar(50)"),
        col("violation_title").cast("varchar(1000)").alias("violation_description"),
        concat(col("source_city"), lit("_"), col("violation_code")).cast("varchar(100)").alias("data_source_id"),
        col("source_city").cast("varchar(50)").alias("data_workflow_name"),
        col("violation_code").cast("varchar(50)").alias("dw_job_id"),
        current_timestamp().alias("dw_load_dt")
    )
   
    return dim_violation