Notebook: 02_silver_customers_products  
Layer   : Silver  
Purpose : Apply data quality checks, deduplication, and SCD logic  
Design Decisions:  
- Customers are modeled as SCD Type 2 to preserve historical changes  
- Products are modeled as SCD Type 1 because historical tracking is not required  

In [0]:
from pyspark.sql.functions import (
    col, trim, coalesce, lit, current_timestamp, to_date, row_number, sha2, concat_ws
)

from pyspark.sql.window import Window
from delta.tables import DeltaTable

In [0]:
# Raw customer records from the bronze layer; every silver customer step starts here.
customers_bronze = spark.read.table("bronze.customers")

In [0]:
# Basic data quality checks
# customer_id and customer_name are mandatory business identifiers.
# A name that is only whitespace counts as missing: it would otherwise pass
# isNotNull() and end up as "" once the cleaning step trims it.
customers_dq = (
    customers_bronze
    .filter(col("customer_id").isNotNull())
    .filter(col("customer_name").isNotNull())
    .filter(trim(col("customer_name")) != "")
)

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-8781167362100327>, line 3
      1 # Basic data quality checks
      2 customers_dq = (
----> 3     customers_bronze
      4     .filter(col("customer_id").isNotNull())
      5     .filter(col("customer_name").isNotNull())
      6 )

NameError: name 'customers_bronze' is not defined

In [0]:
# Standardize customer attributes: trim free-text fields, default a missing
# country to "UNKNOWN", cast postal_code to long and convert ingestion_date
# to a date.
# NOTE(review): if bronze stores postal_code as text, the cast to long drops
# leading zeros ("02134" -> 2134) and turns non-numeric codes into null (or an
# error under ANSI mode). trim() also leaves blank values as "", so a blank
# country is NOT defaulted to "UNKNOWN". Confirm both are intended; changing
# the postal_code type would require migrating silver.customers_enriched.
trimmed_customer_cols = ["customer_name", "email", "phone", "address", "segment"]

customers_cleaneddf = customers_dq.select(
    col("customer_id"),
    *[trim(col(c)).alias(c) for c in trimmed_customer_cols],
    coalesce(trim(col("country")), lit("UNKNOWN")).alias("country"),
    trim(col("city")).alias("city"),
    trim(col("state")).alias("state"),
    col("postal_code").cast("long").alias("postal_code"),
    trim(col("region")).alias("region"),
    to_date(col("ingestion_date")).alias("ingestion_date"),
)

In [0]:
# Deterministic deduplication
# Retain the latest record per customer based on ingestion_date.
# ingestion_date alone can tie (several bronze rows for one customer on the
# same date). row_number() would then pick an arbitrary row, and reruns over
# the same input could keep different versions. The remaining attribute columns
# are added as tie-breakers so the surviving row is always the same.
dedup_tiebreak_cols = [
    c for c in customers_cleaneddf.columns
    if c not in ("customer_id", "ingestion_date")
]

window_spec = (
    Window
    .partitionBy("customer_id")
    .orderBy(
        # Same ordering as the original desc(), with null placement spelled out.
        col("ingestion_date").desc_nulls_last(),
        *[col(c).asc_nulls_last() for c in dedup_tiebreak_cols],
    )
)

customers_dedupdf = (
    customers_cleaneddf
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

In [0]:
# Check that no duplicate customer_id remains after deduplication.
# Fail loudly instead of only printing: a duplicate here would make the SCD2
# MERGE match one target row from several source rows.
duplicate_customer_ids_df = (
    customers_dedupdf
    .groupBy("customer_id")
    .count()
    .filter("count > 1")
)
duplicate_customer_id_count = duplicate_customer_ids_df.count()
assert duplicate_customer_id_count == 0, (
    f"{duplicate_customer_id_count} customer_id value(s) still duplicated after deduplication"
)

In [0]:
# Hash-based change detection
# record_hash is a SHA-256 fingerprint of the attributes listed in
# customer_hash_cols (nulls mapped to "", joined with "||"). The SCD2 merge
# compares it with the current silver row to decide whether a customer changed.
# NOTE(review): postal_code and region are NOT hashed, so a change to only
# those columns is not detected and never reaches silver. Adding them changes
# record_hash for every existing customer, which opens one new version per
# customer on the next merge. Only do this once the SCD2 merge inserts a new
# current row whenever it expires a changed one.
customer_hash_cols = [
    "customer_name", "email", "phone", "address",
    "segment", "country", "city", "state",
]

hash_input = concat_ws("||", *[coalesce(col(c), lit("")) for c in customer_hash_cols])

customers_scd_ready_df = (
    customers_dedupdf
    .withColumn("record_hash", sha2(hash_input, 256))
    # SCD2 bookkeeping for a freshly loaded, currently valid version
    .withColumn("silver_start_ts", current_timestamp())
    .withColumn("silver_end_ts", lit(None).cast("timestamp"))
    .withColumn("is_current", lit(True))
)
        

In [0]:
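# Limitation:
# Late-arriving historical customer updates are treated as the current version,
# because silver_start_ts / silver_end_ts record load time, not business effective time.
# Backdated SCD handling can be added if required.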

In [0]:
# Initial load: on the first run, create the SCD2 table directly from the
# prepared snapshot. Later runs skip this and rely on the MERGE below.
customers_table_exists = spark.catalog.tableExists("silver.customers_enriched")
if not customers_table_exists:
    (
        customers_scd_ready_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("silver.customers_enriched")
    )

In [0]:
# Quick inspection of the silver customers table: schema, then row count.
customers_silver_table = spark.table("silver.customers_enriched")
customers_silver_table.printSchema()
customers_silver_table.count()

In [0]:
# SCD Type 2 merge into silver.customers_enriched.
#
# A single MERGE cannot both expire a changed customer's current row and insert
# its new version from the same source row. Once a source row matches, only
# the WHEN MATCHED branch runs for it, so WHEN NOT MATCHED never fires. Each
# changed customer is therefore staged twice:
#   * keyed by customer_id   -> matches the current row and expires it
#   * with a NULL merge_key  -> never matches, so it is inserted as the new version
# New customers (and customers left without a current row) appear only keyed.
# They match nothing and are inserted.
silver_customers_dt = DeltaTable.forName(spark, "silver.customers_enriched")

current_customers_df = silver_customers_dt.toDF().filter(col("is_current"))

# Incoming customers whose attributes differ from their current silver version
changed_customers_df = (
    customers_scd_ready_df.alias("s")
    .join(
        current_customers_df.alias("t"),
        col("s.customer_id") == col("t.customer_id"),
    )
    .filter(col("s.record_hash") != col("t.record_hash"))
    .select("s.*")
)

# The NULL key must have the same type as customer_id for unionByName
customer_id_type = customers_scd_ready_df.schema["customer_id"].dataType

staged_customers_df = (
    changed_customers_df
    .withColumn("merge_key", lit(None).cast(customer_id_type))
    .unionByName(
        customers_scd_ready_df.withColumn("merge_key", col("customer_id"))
    )
)

(
    silver_customers_dt.alias("t")
    .merge(
        staged_customers_df.alias("s"),
        "t.customer_id = s.merge_key AND t.is_current = true"
    )
    .whenMatchedUpdate(
        condition="t.record_hash <> s.record_hash",
        set={
            "is_current": "false",
            "silver_end_ts": "current_timestamp()"
        }
    )
    # Insert only the real table columns. merge_key is a staging helper, and
    # insertAll could add it to the target table if schema evolution is enabled.
    .whenNotMatchedInsert(
        values={c: f"s.{c}" for c in customers_scd_ready_df.columns}
    )
    .execute()
)


In [0]:
# SCD2 integrity check. Several rows per customer_id are expected (that is the
# history), but each customer must have at most ONE current version.
customers_multiple_current_df = (
    spark.table("silver.customers_enriched")
    .filter(col("is_current"))
    .groupBy("customer_id")
    .count()
    .filter("count > 1")
)
multiple_current_count = customers_multiple_current_df.count()
assert multiple_current_count == 0, (
    f"{multiple_current_count} customer_id value(s) have more than one current SCD2 row"
)

Products Silver  
SCD Type 1 dimension: product attributes are overwritten on change, and no history is kept  

In [0]:
# Raw product records from the bronze layer.
products_bronze = spark.read.table("bronze.products")

In [0]:
# Rules followed:
# - Standardize text: trim product_name, category and sub_category
# - Cast numeric fields: price_per_product -> double
# - Keep exactly one row per product_id, chosen deterministically
# - Type-1 overwrite behaviour: the write step replaces the whole table
products_standardizeddf = (
    products_bronze
    .filter(col("product_id").isNotNull())
    .select(
        col("product_id"),
        trim(col("product_name")).alias("product_name"),
        trim(col("category")).alias("category"),
        trim(col("sub_category")).alias("sub_category"),
        col("price_per_product").cast("double").alias("price_per_product"),
    )
)

# dropDuplicates(["product_id"]) keeps whichever row Spark encounters first, so
# reruns over the same bronze data could publish different attributes for a
# product. Duplicates are ranked by their attribute values instead, so the
# surviving row is reproducible.
# TODO: if bronze.products has an ingestion timestamp, order by it (descending)
# first so the most recent version wins.
products_dedup_window = (
    Window
    .partitionBy("product_id")
    .orderBy(
        *[
            col(c).asc_nulls_last()
            for c in ("product_name", "category", "sub_category", "price_per_product")
        ]
    )
)

products_silverdf = (
    products_standardizeddf
    .withColumn("row_num", row_number().over(products_dedup_window))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

In [0]:
# Type 1: replace the silver products table with the latest cleaned snapshot.
(
    products_silverdf.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.products_enriched")
)

In [0]:
# Quick inspection of the silver products table: schema, then row count.
products_silver_table = spark.table("silver.products_enriched")
products_silver_table.printSchema()
products_silver_table.count()