In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

### Transform `crm_cust_info` table and save to silver

In [0]:
# Rebuild etl.silver.crm_cust_info from bronze: keep one (the most recent) row per
# customer id and standardise names, marital status, gender and the create date.
# The table is dropped first so the overwrite below recreates it from scratch.
spark.sql("DROP TABLE IF EXISTS etl.silver.crm_cust_info")

# Most recent record first within each customer id; row 1 is the one kept.
window_spec = Window.partitionBy("cst_id").orderBy(desc("cst_create_date"))

df = (
    spark.read.table("etl.bronze.crm_cust_info")
    # Cast BEFORE the null filter so ids that fail the int cast are dropped too,
    # instead of all landing in a single NULL partition of the window (which
    # would let one of them survive the dedup).
    .withColumn("cst_id", col("cst_id").cast("int"))
    .filter(col("cst_id").isNotNull())
    # Parse the date before deduplicating so "most recent" compares real dates
    # rather than however the raw column happens to sort.
    .withColumn("cst_create_date", to_date(col("cst_create_date"), "yyyy-MM-dd"))
    .withColumn("flag", row_number().over(window_spec))
    .filter(col("flag") == 1)
    .drop("flag")
    .withColumn("cst_firstname", trim(col("cst_firstname")))
    .withColumn("cst_lastname", trim(col("cst_lastname")))
    # Codes are matched after trimming and upper-casing; NULL or unknown codes
    # fall through to "n/a".
    .withColumn(
        "cst_marital_status",
        when(upper(trim(col("cst_marital_status"))) == "M", "Married")
        .when(upper(trim(col("cst_marital_status"))) == "S", "Single")
        .otherwise("n/a"),
    )
    .withColumn(
        "cst_gndr",
        when(upper(trim(col("cst_gndr"))) == "M", "Male")
        .when(upper(trim(col("cst_gndr"))) == "F", "Female")
        .otherwise("n/a"),
    )
).orderBy(asc("cst_id"))

# Save to silver table
df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.crm_cust_info")

### Transform `crm_prd_info` table and save to silver

In [0]:
# Rebuild etl.silver.crm_prd_info from bronze: split the raw product key, clean
# cost / product line, and repair end dates that precede their start date.
# The table is dropped first so the overwrite below recreates it from scratch.
spark.sql("DROP TABLE IF EXISTS etl.silver.crm_prd_info")

# Versions of the same (derived) prd_key, ordered by start date.
prd_history_window = Window.partitionBy("prd_key").orderBy(asc("prd_start_dt"))

df = (
    spark.read.table("etl.bronze.crm_prd_info")
    # cat_id = first 5 characters of the RAW key with '-' replaced by '_'; this
    # must run before prd_key is overwritten on the next line.
    .withColumn("cat_id", regexp_replace(substring(col("prd_key"), 1, 5), "-", "_"))
    # prd_key = the raw key from character 7 onward (character 6 is presumably a
    # separator — confirm). Column.substr takes Column arguments on every PySpark
    # version, whereas functions.substring only accepts int pos/len on older ones.
    .withColumn("prd_key", col("prd_key").substr(lit(7), length(col("prd_key"))))
    .withColumn("prd_nm", trim(col("prd_nm")))
    .withColumn("prd_cost", col("prd_cost").cast("double"))
    .fillna(0, subset=["prd_cost"])
    # Codes are matched after trimming and upper-casing; NULL or unknown codes
    # map to "n/a" (without .otherwise they would silently become NULL).
    .withColumn(
        "prd_line",
        when(upper(trim(col("prd_line"))) == "M", "Mountain")
        .when(upper(trim(col("prd_line"))) == "R", "Road")
        .when(upper(trim(col("prd_line"))) == "S", "Other sales")
        .when(upper(trim(col("prd_line"))) == "T", "Touring")
        .otherwise("n/a"),
    )
    .withColumn("prd_start_dt", to_date(col("prd_start_dt"), "yyyy-MM-dd"))
    .withColumn("prd_end_dt", to_date(col("prd_end_dt"), "yyyy-MM-dd"))
    # An end date earlier than the start date is invalid: replace it with the day
    # before the next version's start (NULL when there is no later version).
    .withColumn(
        "prd_end_dt",
        when(
            col("prd_end_dt") < col("prd_start_dt"),
            date_sub(lead("prd_start_dt").over(prd_history_window), 1),
        ).otherwise(col("prd_end_dt")),
    )
).orderBy(asc("prd_id"))

# Save to silver table with a fixed column order
df.select(
    "prd_id",
    "prd_key",
    "cat_id",
    "prd_nm",
    "prd_cost",
    "prd_line",
    "prd_start_dt",
    "prd_end_dt",
).write.mode("overwrite").format("delta").saveAsTable("etl.silver.crm_prd_info")

### Transform `crm_sales_details` table and save to silver

In [0]:
# Rebuild etl.silver.crm_sales_details from bronze: parse yyyyMMdd dates and
# repair missing / inconsistent sales and price values.
# The table is dropped first so the overwrite below recreates it from scratch.
spark.sql("DROP TABLE IF EXISTS etl.silver.crm_sales_details")


def _yyyymmdd_to_date(name):
    """Return column `name` parsed as a yyyyMMdd date.

    Values equal to 0 or whose length is not 8 are treated as missing and become
    NULL; everything else goes through to_date with the "yyyyMMdd" pattern.
    """
    raw = col(name)
    return when((raw == 0) | (length(raw) != 8), None).otherwise(
        to_date(raw, "yyyyMMdd")
    )


# Raw measures as doubles. `abs` here is pyspark.sql.functions.abs (brought in by
# the star import at the top), so a negative price counts as its magnitude.
raw_sales = col("sls_sales").cast("double")
raw_quantity = col("sls_quantity").cast("double")
raw_price = col("sls_price").cast("double")
expected_sales = raw_quantity * abs(raw_price)

df = (
    spark.read.table("etl.bronze.crm_sales_details")
    .withColumn("sls_cust_id", col("sls_cust_id").cast("int"))
    .withColumn("sls_order_dt", _yyyymmdd_to_date("sls_order_dt"))
    .withColumn("sls_ship_dt", _yyyymmdd_to_date("sls_ship_dt"))
    .withColumn("sls_due_dt", _yyyymmdd_to_date("sls_due_dt"))
    # Recompute sales when missing, non-positive, or inconsistent with
    # quantity * |price|. The explicit isNull is required: NULL <= 0 evaluates
    # to NULL, which `when` treats as false.
    .withColumn(
        "sls_sales",
        when(
            raw_sales.isNull() | (raw_sales <= 0) | (raw_sales != expected_sales),
            expected_sales,
        ).otherwise(raw_sales),
    )
    .withColumn("sls_quantity", raw_quantity)
    # Derive price from the (already repaired) sales when it is missing or
    # non-positive. Dividing by NULL instead of 0 yields NULL rather than a
    # divide-by-zero error when ANSI mode is enabled.
    .withColumn(
        "sls_price",
        when(
            raw_price.isNull() | (raw_price <= 0),
            col("sls_sales") / when(col("sls_quantity") != 0, col("sls_quantity")),
        ).otherwise(raw_price),
    )
)

# Save to silver table
df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.crm_sales_details")

### Transform `erp_cust_az12` table and save to silver

In [0]:
# Rebuild etl.silver.erp_cust_az12 from bronze: null out future birth dates,
# normalise gender, and strip the "NAS" prefix from customer ids.
# The table is dropped first so the overwrite below recreates it from scratch.
spark.sql("DROP TABLE IF EXISTS etl.silver.erp_cust_az12")

birth_date = to_date(col("BDATE"))
gender_code = upper(trim(col("GEN")))

df = (
    spark.read.table("etl.bronze.erp_cust_az12")
    # Birth dates in the future are treated as invalid.
    .withColumn(
        "BDATE",
        when(birth_date > current_date(), None).otherwise(birth_date),
    )
    # Matched after trimming and upper-casing; NULL or unknown values -> "n/a".
    .withColumn(
        "GEN",
        when(gender_code.isin("M", "MALE"), "Male")
        .when(gender_code.isin("F", "FEMALE"), "Female")
        .otherwise("n/a"),
    )
    # Drop a leading "NAS" (keep characters from position 4 on), presumably so the
    # ids line up with CRM customer keys — confirm. Column.substr accepts a Column
    # length on every PySpark version, unlike functions.substring on older ones.
    .withColumn(
        "CID",
        when(
            col("CID").like("NAS%"),
            col("CID").substr(lit(4), length(col("CID"))),
        ).otherwise(col("CID")),
    )
)

# Save to silver table
df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.erp_cust_az12")

### Transform `erp_loc_a101` table and save to silver

In [0]:
# Rebuild etl.silver.erp_loc_a101: harmonise country names and remove dashes from
# customer ids. Dropping first lets the overwrite recreate the table from scratch.
spark.sql("DROP TABLE IF EXISTS etl.silver.erp_loc_a101")

# Country code/name mapping, evaluated on the trimmed value:
#   US / USA      -> United States
#   DE            -> Germany
#   NULL or blank -> n/a
#   anything else -> the trimmed value itself
country_trimmed = trim(col("CNTRY"))
country_standardised = (
    when(country_trimmed.isin("US", "USA"), "United States")
    .when(country_trimmed == "DE", "Germany")
    .when(col("CNTRY").isNull() | (country_trimmed == ""), "n/a")
    .otherwise(country_trimmed)
)

bronze_locations = spark.read.table("etl.bronze.erp_loc_a101")
df = bronze_locations.withColumn("CID", regexp_replace(col("CID"), "-", "")).withColumn(
    "CNTRY", country_standardised
)

# Persist as a Delta table in the silver schema
df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.erp_loc_a101")

### Transform `erp_px_cat_g1v2` table and save to silver

In [0]:
# This table needs no cleansing: bronze is copied to silver unchanged.
# Dropping first lets the overwrite recreate the table from scratch.
px_cat_silver_table = "etl.silver.erp_px_cat_g1v2"
spark.sql(f"DROP TABLE IF EXISTS {px_cat_silver_table}")

df = spark.read.table("etl.bronze.erp_px_cat_g1v2")

# Persist as a Delta table in the silver schema
(
    df.write.mode("overwrite")
    .format("delta")
    .saveAsTable(px_cat_silver_table)
)