# Silver Layer – Data Cleaning & Standardization (PySpark)

This notebook transforms Bronze CRM and ERP tables into clean,
standardized Silver tables by applying data quality rules.


In [0]:
# Echo the active SparkSession so the cell output confirms a session is attached.
spark


<pyspark.sql.connect.session.SparkSession at 0xffccbb3e4ad0>

In [0]:
# Select the project catalog for the statements that follow in this session.
spark.sql("USE CATALOG pyspark_dataware_house_project")


DataFrame[]

In [0]:
# Select the `default` schema inside the current catalog.
spark.sql("USE SCHEMA default")


DataFrame[]

In [0]:
# Sanity check: print the catalog and schema that are currently in effect.
spark.sql("SELECT current_catalog(), current_schema()").show()


+--------------------+----------------+
|   current_catalog()|current_schema()|
+--------------------+----------------+
|pyspark_dataware_...|         default|
+--------------------+----------------+



In [0]:
# CRM source tables, looked up by name in the current catalog/schema.
sales_df, product_df, customer_df = (
    spark.table(table_name)
    for table_name in ("sales_details", "prd_info", "cust_info")
)

# ERP source tables.
erp_customer_df, erp_location_df, erp_product_cat_df = (
    spark.table(table_name)
    for table_name in ("cust_az12", "loc_a101", "px_cat_g1v2")
)


# Silver Layer – Data Cleaning & Standardization (PySpark)

This notebook transforms Bronze tables into Silver tables by applying
data cleansing, standardization, deduplication, and business rules.


In [0]:
# Echo the active SparkSession so the cell output confirms a session is attached.
spark


<pyspark.sql.connect.session.SparkSession at 0xffccbb3e4ad0>

In [0]:
# Point the session at the project catalog and its `default` schema so the
# unqualified table names used below resolve there.
spark.sql("USE CATALOG pyspark_dataware_house_project")
spark.sql("USE SCHEMA default")


DataFrame[]

In [0]:
from pyspark.sql.functions import (
    abs, col, current_timestamp, lead, length, regexp_replace,
    row_number, substring, to_date, trim, upper, when,
)
from pyspark.sql.window import Window


In [0]:
# CRM source tables: customers, products and sales lines.
crm_cust_df, crm_prd_df, crm_sales_df = (
    spark.table(table_name)
    for table_name in ("cust_info", "prd_info", "sales_details")
)

# ERP source tables: customer attributes, locations and product categories.
erp_cust_df, erp_loc_df, erp_cat_df = (
    spark.table(table_name)
    for table_name in ("cust_az12", "loc_a101", "px_cat_g1v2")
)


In [0]:
# Orders each customer's rows newest-first by creation date, so that
# row number 1 is the most recent record for that `cst_id`.
cust_window = (
    Window
    .partitionBy("cst_id")
    .orderBy(col("cst_create_date").desc())
)


In [0]:
# Trimmed, upper-cased source codes shared by the label mappings below.
marital_code = upper(trim(col("cst_marital_status")))
gender_code = upper(trim(col("cst_gndr")))

# Single-letter codes are expanded to readable labels; anything else becomes "n/a".
marital_label = (
    when(marital_code == "S", "single")
    .when(marital_code == "M", "married")
    .otherwise("n/a")
)
gender_label = (
    when(gender_code == "F", "female")
    .when(gender_code == "M", "male")
    .otherwise("n/a")
)

# Step 1: drop rows without a customer id and keep only the newest row per id.
latest_crm_cust_df = (
    crm_cust_df
    .where(col("cst_id").isNotNull())
    .withColumn("rn", row_number().over(cust_window))
    .where(col("rn") == 1)
    .drop("rn")
)

# Step 2: tidy the name fields, expand the coded attributes and stamp the load time.
# NOTE(review): the labels here are lower-case ("female"/"male") while the ERP
# customer table in this notebook emits "Female"/"Male" — confirm which casing
# downstream consumers expect before the two sources are combined.
silver_crm_cust_df = (
    latest_crm_cust_df
    .withColumn("cst_firstname", trim(col("cst_firstname")))
    .withColumn("cst_lastname", trim(col("cst_lastname")))
    .withColumn("cst_marital_status", marital_label)
    .withColumn("cst_gndr", gender_label)
    .withColumn("dwh_create_date", current_timestamp())
)


In [0]:
# Persist the cleaned CRM customers, replacing any existing contents of the table.
silver_crm_cust_df.write.saveAsTable("silver_crm_cust_info", mode="overwrite")


In [0]:
# Orders each product key's rows by ascending start date, so `lead` can look
# ahead to the next version of the same product.
# The column names are resolved where the window is applied, i.e. against
# whatever "prd_key" holds at that point in the transformation chain.
prd_window = (
    Window
    .partitionBy("prd_key")
    .orderBy(col("prd_start_dt").asc())
)


In [0]:
# Trimmed, upper-cased product-line code and its readable label ("n/a" when unknown).
product_line_code = upper(trim(col("prd_line")))
product_line_label = (
    when(product_line_code == "M", "Mountain")
    .when(product_line_code == "R", "Road")
    .when(product_line_code == "S", "Other Sales")
    .when(product_line_code == "T", "Touring")
    .otherwise("n/a")
)

silver_crm_prd_df = (
    crm_prd_df
    # The first five characters of the raw key become the category id ("-" -> "_").
    .withColumn(
        "cat_id",
        regexp_replace(substring(col("prd_key"), 1, 5), "-", "_"),
    )
    # Everything from the seventh character onward becomes the product key.
    .withColumn(
        "prd_key",
        substring(col("prd_key"), 7, length(col("prd_key"))),
    )
    # A missing cost is stored as 0.
    .withColumn(
        "prd_cost",
        when(col("prd_cost").isNotNull(), col("prd_cost")).otherwise(0),
    )
    .withColumn("prd_line", product_line_label)
    .withColumn("prd_start_dt", col("prd_start_dt").cast("date"))
    # The end date is the start date of the next row in the window; the last
    # row of each partition has no successor and therefore gets null.
    # NOTE(review): the end date equals the next start date exactly (no day is
    # subtracted), so consecutive versions share a boundary date — confirm that
    # consumers treat the interval as half-open.
    .withColumn("prd_end_dt", lead(col("prd_start_dt")).over(prd_window))
    .withColumn("dwh_create_date", current_timestamp())
)


In [0]:
# Persist the cleaned CRM products, replacing any existing contents of the table.
silver_crm_prd_df.write.saveAsTable("silver_crm_prd_info", mode="overwrite")


In [0]:
def _yyyymmdd_to_date(column_name):
    """Build a date Column from a ``yyyyMMdd``-encoded column.

    Args:
        column_name: Name of the column holding the encoded date.

    Returns:
        A Column that is null when the raw value equals 0 or is not exactly
        8 characters long, and otherwise the value cast to string and parsed
        with the ``yyyyMMdd`` pattern.
    """
    raw_value = col(column_name)
    is_unusable = (raw_value == 0) | (length(raw_value) != 8)
    return when(is_unusable, None).otherwise(
        to_date(raw_value.cast("string"), "yyyyMMdd")
    )


# Sales amount implied by quantity and price; the sign of the price is ignored.
expected_sales = col("sls_quantity") * abs(col("sls_price"))

silver_sales_df = (
    crm_sales_df
    .withColumn("sls_order_dt", _yyyymmdd_to_date("sls_order_dt"))
    .withColumn("sls_ship_dt", _yyyymmdd_to_date("sls_ship_dt"))
    .withColumn("sls_due_dt", _yyyymmdd_to_date("sls_due_dt"))
    # Recompute the sales amount when it is missing, non-positive, or does not
    # agree with quantity * |price|.
    .withColumn(
        "sls_sales",
        when(
            col("sls_sales").isNull()
            | (col("sls_sales") <= 0)
            | (col("sls_sales") != expected_sales),
            expected_sales,
        ).otherwise(col("sls_sales")),
    )
    # Derive a missing or non-positive price from the (already repaired)
    # sales amount. The division is guarded so a zero quantity yields null
    # instead of raising a divide-by-zero error when ANSI mode is enabled.
    .withColumn(
        "sls_price",
        when(
            col("sls_price").isNull() | (col("sls_price") <= 0),
            when(
                col("sls_quantity") != 0,
                col("sls_sales") / col("sls_quantity"),
            ),
        ).otherwise(col("sls_price")),
    )
    .withColumn("dwh_create_date", current_timestamp())
)


In [0]:
# Persist the cleaned CRM sales lines, replacing any existing contents of the table.
# NOTE(review): the target is "sale_details" while the source table is
# "sales_details" and the other Silver tables keep their source name — confirm
# whether the missing "s" is intentional before any consumer depends on it.
silver_sales_df.write.mode("overwrite").saveAsTable(
    "silver_crm_sale_details"
)


In [0]:
# Trimmed, upper-cased gender value; both one-letter and spelled-out forms are accepted.
erp_gender_code = upper(trim(col("gen")))
erp_gender_label = (
    when(erp_gender_code.isin("F", "FEMALE"), "Female")
    .when(erp_gender_code.isin("M", "MALE"), "Male")
    .otherwise("n/a")
)

silver_erp_cust_df = (
    erp_cust_df
    # Strip a leading "NAS" from the customer id; ids without it are unchanged.
    .withColumn("cid", regexp_replace(col("cid"), "^NAS", ""))
    # Birth dates later than the current timestamp are replaced with null.
    .withColumn(
        "bdate",
        when(col("bdate") > current_timestamp(), None).otherwise(col("bdate")),
    )
    .withColumn("gen", erp_gender_label)
    .withColumn("dwh_create_date", current_timestamp())
)

# Persist the result, replacing any existing contents of the table.
silver_erp_cust_df.write.saveAsTable("silver_erp_cust_az12", mode="overwrite")


In [0]:
# Trimmed country value, plus an upper-cased copy used only for matching so
# that "de", "DE", "us", "USA", ... are all recognised regardless of case.
country_trimmed = trim(col("cntry"))
country_code = upper(country_trimmed)

silver_erp_loc_df = (
    erp_loc_df
    # Remove dashes from the customer id.
    .withColumn("cid", regexp_replace(col("cid"), "-", ""))
    # Expand known country codes to full names, map null/blank to "n/a" and
    # pass every other value through trimmed.
    .withColumn(
        "cntry",
        when(country_code == "DE", "Germany")
        .when(country_code.isin("US", "USA"), "United States")
        .when(col("cntry").isNull() | (country_trimmed == ""), "n/a")
        .otherwise(country_trimmed)
    )
    .withColumn("dwh_create_date", current_timestamp())
)

# Persist the result, replacing any existing contents of the table.
silver_erp_loc_df.write.mode("overwrite").saveAsTable("silver_erp_loc_a101")


In [0]:
# The ERP category table is carried over as-is; only the load timestamp is added.
silver_erp_cat_df = erp_cat_df.withColumn("dwh_create_date", current_timestamp())

# Persist the result, replacing any existing contents of the table.
silver_erp_cat_df.write.saveAsTable("silver_erp_px_cat_g1v2", mode="overwrite")


## Silver Layer Completion

- Applied data cleansing and standardization rules
- Implemented deduplication and business logic
- Created curated Silver tables for CRM and ERP
- Data ready for Gold layer dimensional modeling
