In [0]:
import dlt
from pyspark.sql.functions import col, upper, trim, row_number, coalesce, lit
from pyspark.sql.window import Window

# GOLD: dim_customers
@dlt.table(comment="Dimension table for customers")
def dim_customers():
    """Assemble the customer dimension from the CRM and ERP silver tables.

    ``silver_crm_cust_info`` is the driving side. ``silver_erp_cust_az12``
    and ``silver_erp_loc_a101`` are left-joined on the trimmed, upper-cased
    customer key, so CRM rows without an ERP match are kept with null ERP
    columns.

    Returns:
        DataFrame holding ``customer_key`` (a row number ordered by
        ``cst_id``), the CRM customer columns, ``cntry``, a resolved
        ``gender`` column and ``bdate``.
    """
    crm_df = dlt.read("silver_crm_cust_info").alias("ci")
    erp_cust_df = dlt.read("silver_erp_cust_az12").alias("ca")
    erp_loc_df = dlt.read("silver_erp_loc_a101").alias("la")

    # Both ERP tables are matched against the same normalised CRM key.
    crm_key = trim(upper(col("ci.cst_key")))
    matches_erp_cust = crm_key == trim(upper(col("ca.cid")))
    matches_erp_loc = crm_key == trim(upper(col("la.cid")))

    joined = (
        crm_df
        .join(erp_cust_df, matches_erp_cust, "left")
        .join(erp_loc_df, matches_erp_loc, "left")
    )

    # No partitionBy: the row number runs over the whole joined result, so
    # Spark evaluates this window in a single partition.
    key_order = Window.orderBy("cst_id")
    keyed = joined.withColumn("customer_key", row_number().over(key_order))

    # First non-null of CRM gender, ERP gender, then the literal "n/a".
    # NOTE(review): if the silver layer already stores "n/a" for an unknown
    # CRM gender, the ERP value is never consulted - confirm upstream.
    gender = coalesce(col("ci.cst_gndr"), col("ca.gen"), lit("n/a")).alias("gender")

    output_columns = [
        "customer_key",
        "ci.cst_id",
        "ci.cst_key",
        "ci.cst_firstname",
        "ci.cst_lastname",
        "la.cntry",
        "ci.cst_marital_status",
        gender,
        "ca.bdate",
        "ci.cst_create_date",
    ]
    return keyed.select(*output_columns)

# GOLD: dim_products
@dlt.table(comment="Dimension table for products")
def dim_products():
    """Assemble the product dimension from CRM products and ERP categories.

    ``silver_crm_prd_info`` is left-joined to ``silver_erp_px_cat_g1v2`` on
    the trimmed, upper-cased category id. Only rows whose ``prd_end_dt`` is
    null are kept (presumably the open-ended, current product versions -
    confirm against the silver layer).

    Returns:
        DataFrame holding ``product_key`` (a row number ordered by
        ``prd_start_dt`` then ``prd_key``), the product columns and the
        category attributes ``cat``, ``subcat`` and ``maintenance``.
    """
    product_df = dlt.read("silver_crm_prd_info").alias("pn")
    category_df = dlt.read("silver_erp_px_cat_g1v2").alias("pc")

    same_category = trim(upper(col("pn.cat_id"))) == trim(upper(col("pc.id")))
    enriched = product_df.join(category_df, same_category, "left")

    # Filter before numbering so the generated keys are contiguous over the
    # rows that are actually kept.
    open_ended = enriched.filter(col("prd_end_dt").isNull())

    # No partitionBy: the row number runs over the whole filtered result, so
    # Spark evaluates this window in a single partition.
    key_order = Window.orderBy("prd_start_dt", "prd_key")
    keyed = open_ended.withColumn("product_key", row_number().over(key_order))

    output_columns = [
        "product_key",
        "pn.prd_id",
        "pn.prd_key",
        "pn.prd_nm",
        "pn.cat_id",
        "pc.cat",
        "pc.subcat",
        "pc.maintenance",
        "pn.prd_cost",
        "pn.prd_line",
        "pn.prd_start_dt",
    ]
    return keyed.select(*output_columns)

# GOLD: fact_sales
@dlt.table(comment="Fact table for sales transactions")
def fact_sales():
    """Build the sales fact table keyed by the gold dimension surrogate keys.

    Each row of ``silver_crm_sales_details`` is left-joined to
    ``dim_products`` and ``dim_customers`` to pick up ``product_key`` and
    ``customer_key``. Sales rows without a matching dimension row are kept
    with a null key.

    Returns:
        DataFrame holding the order number, the two surrogate keys, the
        order/ship/due dates and the sales, quantity and price measures.
    """
    sales = dlt.read("silver_crm_sales_details")
    products = dlt.read("dim_products")
    customers = dlt.read("dim_customers")

    # Product lookup: sales carry the product business key.
    product_match = (
        trim(upper(col("sd.sls_prd_key"))) == trim(upper(col("pr.prd_key")))
    )
    # Customer lookup: sales carry the customer *id*, so it is matched with
    # dim_customers.cst_id. The previous comparison against cst_key (the
    # customer business key) paired an id with a key column.
    # NOTE(review): confirm sls_cust_id and cst_id hold the same identifier.
    customer_match = (
        trim(upper(col("sd.sls_cust_id"))) == trim(upper(col("cu.cst_id")))
    )

    return (
        sales.alias("sd")
        .join(products.alias("pr"), product_match, "left")
        .join(customers.alias("cu"), customer_match, "left")
        .select(
            "sd.sls_ord_num",
            "pr.product_key",
            "cu.customer_key",
            "sd.sls_order_dt",
            "sd.sls_ship_dt",
            "sd.sls_due_dt",
            "sd.sls_sales",
            "sd.sls_quantity",
            "sd.sls_price",
        )
    )

