In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Pipeline configuration.
# Every table in this notebook is addressed by a three-part dotted name whose
# first part is CATALOG_DB.
CATALOG_DB = "sales_db"

# Source table: raw orders.
BRONZE_TABLE = ".".join([CATALOG_DB, "bronze", "orders_raw"])

# Target table: cleaned orders.
SILVER_SCHEMA = "silver"
table_name = "orders_clean"
silver_table = ".".join([CATALOG_DB, SILVER_SCHEMA, table_name])

In [0]:
# Bootstrap the namespaces (idempotent thanks to IF NOT EXISTS), then open Bronze.
#
# NOTE(review): CATALOG_DB is used as the first part of three-part table names
# (e.g. BRONZE_TABLE), yet the first statement issues CREATE DATABASE with that
# same name. Presumably a catalog was intended - confirm which namespace level
# this statement is meant to create.
for _ddl in (
    f"CREATE DATABASE IF NOT EXISTS {CATALOG_DB}",
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG_DB}.{SILVER_SCHEMA}",
):
    spark.sql(_ddl)

# Lazy handle on the raw Bronze table; nothing is read until an action runs.
b = spark.read.table(BRONZE_TABLE)


In [0]:
# Bronze -> Silver cleaning: cast types, trim text, derive metrics, hash a
# line-level key, project the Silver columns and de-duplicate on that key.

# Text columns whose surrounding whitespace is stripped.
_trim_cols = [
    "ship_mode", "customer_name", "segment", "country", "city",
    "state", "region", "category", "sub_category", "product_name",
]

# Inputs to the order_line_key hash, in hashing order. Dates are rendered as
# strings so that every part is text before concatenation.
_key_parts = [
    F.col("order_id"),
    F.col("product_id"),
    F.col("customer_id"),
    F.col("order_date").cast("string"),
    F.col("ship_date").cast("string"),
    F.col("ship_mode"),
    F.col("postal_code"),
    F.col("region"),
]

# NULL parts are replaced by "" before joining with "||", so each part keeps
# its position in the hashed string (concat_ws would otherwise skip NULLs).
_line_key = F.sha2(
    F.concat_ws("||", *[F.coalesce(part, F.lit("")) for part in _key_parts]),
    256,
)

# Column order of the Silver table.
_silver_cols = [
    "order_line_key",
    "row_id", "order_id", "order_date", "ship_date", "ship_mode",
    "customer_id", "customer_name", "segment",
    "country", "state", "city", "postal_code", "region",
    "product_id", "product_name", "category", "sub_category",
    "sales", "ship_delay_days",
    "ingest_ts", "source_file",
    "order_year",
]

# Step 1 - type casts.
_typed = (
    b
    .withColumn("order_date", F.to_date("order_date"))
    .withColumn("ship_date", F.to_date("ship_date"))
    .withColumn("sales", F.col("sales").cast("decimal(18,2)"))
    .withColumn("postal_code", F.col("postal_code").cast("string"))
)

# Step 2 - trims, applied in list order.
_trimmed = _typed
for _name in _trim_cols:
    _trimmed = _trimmed.withColumn(_name, F.trim(F.col(_name)))

# Step 3 - derived columns, business key, projection, de-duplication.
# The key is computed after the casts and trims above, so it hashes the
# cleaned values.
# NOTE(review): dropDuplicates keeps one row per order_line_key without an
# explicit ordering; if duplicates can differ in non-key columns (e.g. sales),
# confirm that keeping an arbitrary one is acceptable.
s = (
    _trimmed
    .withColumn("ship_delay_days", F.datediff("ship_date", "order_date"))
    .withColumn("order_year", F.year("order_date"))
    .withColumn("order_line_key", _line_key)
    .select(*_silver_cols)
    .dropDuplicates(["order_line_key"])
)

In [0]:
# CREATE TABLE IF NOT EXISTS
#
# When the Silver table is missing, create it empty from the schema of `s`
# (limit(0) writes no rows), as a Delta table partitioned by order_year, so
# the MERGE below always has a target.
# The existence check uses the public `spark.catalog` API rather than the
# private `spark._jsparkSession` JVM handle, which is not part of the
# supported PySpark surface.

if not spark.catalog.tableExists(silver_table):
    (
        s.limit(0)
         .write
         .format("delta")
         .mode("overwrite")
         .partitionBy("order_year")
         .saveAsTable(silver_table)
    )
    print(f"Created Silver table: {table_name}")


# MERGE
#
# Upsert `s` into Silver on order_line_key: rows whose key already exists in
# the target have all columns updated from the source, rows with a new key are
# inserted.

delta_tgt = DeltaTable.forName(spark, silver_table)

(
    delta_tgt.alias("tgt")
             .merge(
                 s.alias("src"),
                 "tgt.order_line_key = src.order_line_key"
             )
             .whenMatchedUpdateAll()
             .whenNotMatchedInsertAll()
             .execute()
)

# Report completion and the resulting row count of the Silver table.
print(f"Silver MERGE complete: {table_name}")
print("Rows in Silver:", spark.table(silver_table).count())

Silver MERGE complete: orders_clean
Rows in Silver: 9792


In [0]:

# Preview five rows of the Silver table written above. The name comes from
# `silver_table` so the preview always targets the same table as the MERGE,
# instead of repeating the table name as a literal.
query = f"SELECT * FROM {silver_table} limit 5"
spark.sql(query).show()

+--------------------+------+--------------+----------+----------+--------------+-----------+---------------+-----------+-------------+--------+-------------+-----------+-------+---------------+--------------------+---------------+------------+-------+---------------+--------------------+--------------+----------+
|      order_line_key|row_id|      order_id|order_date| ship_date|     ship_mode|customer_id|  customer_name|    segment|      country|   state|         city|postal_code| region|     product_id|        product_name|       category|sub_category|  sales|ship_delay_days|           ingest_ts|   source_file|order_year|
+--------------------+------+--------------+----------+----------+--------------+-----------+---------------+-----------+-------------+--------+-------------+-----------+-------+---------------+--------------------+---------------+------------+-------+---------------+--------------------+--------------+----------+
|008c40574e2e0ebe9...|  2575|CA-2015-129091|2015-11-