In [0]:
from pyspark.sql.functions import col, to_timestamp, lit, sha2, concat_ws, coalesce, current_timestamp

# ------------------------------
# Table Configurations
# ------------------------------
# Every table name below is a three-part dotted identifier; the first two
# parts are shared per layer, so they are spelled out once here.
_bronze_namespace = "bronze_dev.bronze_dev"
_silver_namespace = "silver_dev.silver_dev"

# Source table read by this notebook.
bronze_table = f"{_bronze_namespace}.stg_bronze_superstore"

# Target tables written by this notebook.
silver_fact_table = f"{_silver_namespace}.silver_superstore"
silver_customers_table = f"{_silver_namespace}.dim_customers"
silver_products_table = f"{_silver_namespace}.dim_products"
silver_date_table = f"{_silver_namespace}.dim_date"

# ------------------------------
# 1. Read Bronze Table
# ------------------------------
df_bronze = spark.table(bronze_table)

# ------------------------------
# 2. Data Quality: Keep positive quantities, standardize dates, fill gaps, cast types
# ------------------------------
# The chain is wrapped in parentheses (instead of backslash continuations) so
# that every withColumn call is visibly closed on its own line. Previously the
# postal_code call was left open and the following withColumn calls were
# chained onto a Column rather than onto the DataFrame.
#
# The filter keeps only rows whose quantity is greater than zero, so zero
# quantities are dropped as well as negative ones.
#
# NOTE(review): postal_code is cast to int, so the "00000" placeholder is
# stored as 0 and any leading zeros in real postal codes are lost - confirm
# this is intended.
df_silver = (
    df_bronze.filter(col("quantity") > 0)
    .withColumn("order_date", to_timestamp("order_date", "MM/dd/yyyy"))
    .withColumn("ship_date", to_timestamp("ship_date", "MM/dd/yyyy"))
    .withColumn("city", coalesce(col("city"), lit("UNKNOWN")))
    .withColumn("postal_code", coalesce(col("postal_code"), lit("00000")).cast("int"))
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("sales", col("sales").cast("double"))
    .withColumn("discount", col("discount").cast("double"))
    .withColumn("profit", col("profit").cast("double"))
)

# ------------------------------
# 3. Rename Columns to Business-Friendly Names
# ------------------------------
# Maps each incoming column name to the name it carries from here on.
column_renames = {
    "category": "product_category",
    "sub_category": "product_sub_category",
}
for source_name, silver_name in column_renames.items():
    df_silver = df_silver.withColumnRenamed(source_name, silver_name)

# ------------------------------
# 4. Create Record Hash for Idempotency
# ------------------------------
# "_metadata" and "_ingest_ts" are left out of the hash; every other column
# currently on df_silver contributes to it.
excluded_from_hash = ("_metadata", "_ingest_ts")
business_cols = [name for name in df_silver.columns if name not in excluded_from_hash]

# All hashed columns are rendered as strings and joined with "||" before
# being digested with SHA-256.
# NOTE(review): concat_ws skips null inputs, so two rows whose nulls sit in
# different columns can produce the same joined string - confirm whether that
# matters for this data before relying on the hash as a unique row identity.
hash_input = concat_ws("||", *(col(name).cast("string") for name in business_cols))

df_silver = df_silver.withColumn("_record_hash", sha2(hash_input, 256))
# Stamp the rows with the time of this run (added after the hash is defined).
df_silver = df_silver.withColumn("_ingest_ts", current_timestamp())

# ------------------------------
# 5. Maintain Dimension Tables
# ------------------------------


def _append_new_dimension_rows(dim_df, table_name, key_col):
    """Create the dimension table, or append only rows whose key is not in it yet.

    Args:
        dim_df: DataFrame holding the candidate dimension rows, including
            ``key_col``.
        table_name: Name of the Delta table to create or append to.
        key_col: Column used to decide whether a row is already present.

    When the table does not exist, all of ``dim_df`` is written with
    mode "overwrite". Otherwise rows whose ``key_col`` value is already in the
    table are dropped with a left-anti join and the remainder is appended;
    nothing is written when no unseen rows remain.
    """
    if not spark.catalog.tableExists(table_name):
        dim_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
        return

    known_keys = spark.table(table_name).select(key_col)
    unseen_rows = dim_df.join(known_keys, on=key_col, how="left_anti")
    # take(1) answers "is there anything to write?" without counting every row.
    if unseen_rows.take(1):
        unseen_rows.write.format("delta").mode("append").saveAsTable(table_name)


# Customers Dimension
# NOTE(review): customer_id is derived from customer_name alone, while the
# distinct is taken over (customer_name, city, postal_code). The first load can
# therefore store several rows per customer_id, whereas later runs skip every
# row whose customer_id already exists - confirm the intended grain.
dim_customers = (
    df_silver.select("customer_name", "city", "postal_code")
    .distinct()
    .withColumn("customer_id", sha2(col("customer_name"), 256))
)
_append_new_dimension_rows(dim_customers, silver_customers_table, "customer_id")

# Products Dimension
# NOTE(review): same grain question as above - product_key is derived from
# product_id alone, but distinct also covers the category columns.
dim_products = (
    df_silver.select("product_id", "product_category", "product_sub_category")
    .distinct()
    .withColumn("product_key", sha2(col("product_id"), 256))
)
_append_new_dimension_rows(dim_products, silver_products_table, "product_key")

# Date Dimension
dim_date = (
    df_silver.select("order_date")
    .distinct()
    .withColumn("order_date_key", sha2(col("order_date").cast("string"), 256))
)
_append_new_dimension_rows(dim_date, silver_date_table, "order_date_key")

# ------------------------------
# 6. Idempotent Load for Silver Fact Table
# ------------------------------
# When the fact table already exists, drop every incoming row whose
# _record_hash is already stored there; otherwise all rows are new.
if spark.catalog.tableExists(silver_fact_table):
    df_existing = spark.table(silver_fact_table).select("_record_hash")
    df_new = df_silver.join(df_existing, on="_record_hash", how="left_anti")
else:
    df_new = df_silver

# ------------------------------
# 7. Write New Rows Only
# ------------------------------
# Count once, before the write. df_new is lazy, so calling count() again after
# the append would re-run the anti-join (an extra job) and is not guaranteed to
# report the number of rows that were actually written.
new_row_count = df_new.count()
if new_row_count > 0:
    df_new.write.format("delta").mode("append").saveAsTable(silver_fact_table)
    print(f"Inserted {new_row_count} new rows into Silver fact table.")
else:
    print("No new rows to insert.")

print("Silver Layer processing completed successfully!")



In [0]:
# Ad-hoc check: show the distinct quantity values present in the bronze table.
# The string has no placeholders, so it is a plain literal rather than an f-string.
spark.sql(' select distinct quantity from bronze_dev.bronze_dev.stg_bronze_superstore').display()

