In [0]:
# =========================
# Write Silver tables
# =========================

(
    silver_sales_df
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.silver_sales")
)

(
    silver_customers_df
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.silver_customers")
)

(
    silver_products_df
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.silver_products")
)

(
    silver_returns_df
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.silver_returns")
)

(
    silver_sales_enriched_df
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.silver_sales_enriched")
)

print("All Silver tables written successfully")


All Silver tables written successfully


In [0]:
silver_sales_enriched_df = (
    silver_sales_df.alias("s")
    
    .join(
        silver_customers_df.alias("c"),
        col("s.customer_id") == col("c.customer_id"),
        "left"
    )
    .join(
        silver_products_df.alias("p"),
        col("s.product_id") == col("p.product_id"),
        "left"
    )
    .join(
        silver_returns_df.alias("r"),
        col("s.order_id") == col("r.order_id"),
        "left"
    )
    .select(
        # --- sales (fact) ---
        col("s.order_id"),
        col("s.order_date"),
        col("s.customer_id"),
        col("s.product_id"),
        col("s.quantity"),
        col("s.unit_price"),
        col("s._ingestion_timestamp").alias("sales_ingestion_ts"),
        col("s._source_file").alias("sales_source_file"),
        
        # --- customers ---
        col("c.customer_name"),
        col("c.country"),
        col("c.signup_date"),
        col("c.source_system").alias("customer_source_system"),
        
        # --- products ---
        col("p.product_name"),
        col("p.category"),
        col("p.base_price"),
        
        # --- returns ---
        col("r.return_date"),
        col("r.reason").alias("return_reason")
    )
)

display(silver_sales_enriched_df.limit(10))

order_id,order_date,customer_id,product_id,quantity,unit_price,sales_ingestion_ts,sales_source_file,customer_name,country,signup_date,customer_source_system,product_name,category,base_price,return_date,return_reason
100011,2024-01-04,C001741,P01179,5,1273.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_1741,Germany,2021-12-19,SRC_249,,,,,Defect
100012,,C006462,P99999,4,852.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_6462,France,,SRC_97,,,,,
100023,,C010950,P99999,3,896.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_10950,United Kingdom,,SRC_101,,,,,
100058,,C004066,P01114,3,1481.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_4066,United Kingdom,2020-01-17,SRC_37,,,,,
100091,2023-01-26,C007443,P00125,4,1228.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_7443,United Kingdom,,SRC_220,Product_125,manicure,25.0,,
100102,,C007484,P99999,3,934.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_7484,United Kingdom,,SRC_5,,,,,Wrong item
100109,,C002205,P01388,2,1174.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_2205,France,,SRC_64,Product_1388,manicure,129.0,,
100145,2022-04-25,C000446,P00390,4,349.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_446,Unknown,,SRC_65,Product_390,tools,28.0,,
100194,,C010189,P00738,5,624.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_10189,United Kingdom,,SRC_53,,,,,Defect
100196,,C006536,P01190,5,451.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_6536,Poland,2022-08-16,SRC_9,,,,,


In [0]:
from pyspark.sql.functions import (
    col,
    trim,
    expr
)

# =========================
# Silver - Returns cleaning
# =========================

silver_returns_df = (
    bronze_returns_df
    # remove rows without business key
    .dropna(subset=["order_id"])
    
    # clean string columns
    .withColumn("order_id", trim(col("order_id")))
    .withColumn("reason", trim(col("reason")))
    
    # safe date casting
    .withColumn("return_date", expr("try_cast(return_date as date)"))
    
    # remove duplicates
    .dropDuplicates(["order_id"])
)

display(silver_returns_df.limit(10))


return_id,order_id,return_date,reason,source_system,_ingestion_timestamp,_source_file
R0000000,99999999,,Defect,SRC_132,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000002,125323,,Wrong item,SRC_17,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000003,106877,,,SRC_27,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000004,117995,,Wrong item,SRC_172,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000010,121637,,Complaint,SRC_7,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000011,107592,,,SRC_153,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000013,104184,,,SRC_173,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000014,128379,,,SRC_18,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000015,105123,2021-03-09,,SRC_138,2026-02-03T14:06:29.140Z,returns_raw.csv
R0000017,115689,,,SRC_200,2026-02-03T14:06:29.140Z,returns_raw.csv


In [0]:
from pyspark.sql.functions import (
    col,
    trim,
    lower,
    expr
)

# =========================
# Silver - Products cleaning
# =========================

silver_products_df = (
    bronze_products_df
    # remove rows without business key
    .dropna(subset=["product_id"])
    
    # standardize strings
    .withColumn("product_id", trim(col("product_id")))
    .withColumn("product_name", trim(col("product_name")))
    .withColumn("category", lower(trim(col("category"))))
    
    # safe price casting
    .withColumn("base_price", expr("try_cast(base_price as double)"))
    
    # business rules
    .filter(col("base_price") > 0)
    .filter(col("category").isNotNull())
    
    # deduplication
    .dropDuplicates(["product_id"])
)

display(silver_products_df.limit(10))


product_id,product_name,category,base_price,source_system,_ingestion_timestamp,_source_file
P00012,Product_12,tools,184.0,SRC_109,2026-02-03T14:06:24.709Z,products_raw.csv
P00014,Product_14,manicure,267.0,SRC_137,2026-02-03T14:06:24.709Z,products_raw.csv
P00019,Product_19,manicure,160.0,SRC_32,2026-02-03T14:06:24.709Z,products_raw.csv
P00023,Product_23,pedicure,264.0,SRC_63,2026-02-03T14:06:24.709Z,products_raw.csv
P00027,Product_27,pedicure,16.0,SRC_76,2026-02-03T14:06:24.709Z,products_raw.csv
P00043,Product_43,pedicure,72.0,SRC_239,2026-02-03T14:06:24.709Z,products_raw.csv
P00044,Product_44,tools,292.0,SRC_31,2026-02-03T14:06:24.709Z,products_raw.csv
P00045,Product_45,pedicure,49.0,SRC_131,2026-02-03T14:06:24.709Z,products_raw.csv
P00049,Product_49,pedicure,281.0,SRC_82,2026-02-03T14:06:24.709Z,products_raw.csv
P00054,Product_54,pedicure,184.0,SRC_17,2026-02-03T14:06:24.709Z,products_raw.csv


In [0]:
from pyspark.sql.functions import (
    col,
    trim,
    expr,
    lower,
    when
)

# =========================
# Silver - Customers cleaning
# =========================

bronze_customers_df = spark.table("default.bronze_customers")

silver_customers_df = (
    bronze_customers_df
    # remove rows without business key
    .dropna(subset=["customer_id"])
    
    # clean business columns
    .withColumn("customer_id", trim(col("customer_id")))
    .withColumn("customer_name", trim(col("customer_name")))
    .withColumn("country_raw", lower(trim(col("country"))))
    
    # -------------------------
    # country normalization
    # -------------------------
    .withColumn(
        "country",
        when(col("country_raw").isin("pl", "poland", "polska"), "Poland")
        .when(col("country_raw").isin("de", "germany", "deutschland"), "Germany")
        .when(col("country_raw").isin("us", "usa", "united states", "united states of america"), "United States")
        .when(col("country_raw").isin("uk", "united kingdom", "great britain", "england"), "United Kingdom")
        .when(col("country_raw").isin("fr", "france"), "France")
        .otherwise("Unknown")
    )
    
    # safe date casting
    .withColumn("signup_date", expr("try_cast(signup_date as date)"))
    
    # keep technical metadata
    .withColumn("source_system", trim(col("source_system")))
    
    # deduplication on business key
    .dropDuplicates(["customer_id"])
    
    # drop helper column
    .drop("country_raw")
)

display(silver_customers_df.limit(10))


customer_id,customer_name,country,signup_date,source_system,_ingestion_timestamp,_source_file
C000000,Customer_0,Poland,,SRC_71,2026-02-03T14:06:20.707Z,customers_raw.csv
C000001,Customer_1,United Kingdom,,SRC_189,2026-02-03T14:06:20.707Z,customers_raw.csv
C000002,Customer_2,Poland,,SRC_109,2026-02-03T14:06:20.707Z,customers_raw.csv
C000003,Customer_3,Poland,2018-05-03,SRC_56,2026-02-03T14:06:20.707Z,customers_raw.csv
C000004,Customer_4,United Kingdom,,SRC_7,2026-02-03T14:06:20.707Z,customers_raw.csv
C000005,Customer_5,Unknown,,SRC_167,2026-02-03T14:06:20.707Z,customers_raw.csv
C000006,Customer_6,Unknown,,SRC_115,2026-02-03T14:06:20.707Z,customers_raw.csv
C000007,Customer_7,Unknown,2021-02-13,SRC_195,2026-02-03T14:06:20.707Z,customers_raw.csv
C000008,Customer_8,United Kingdom,,SRC_72,2026-02-03T14:06:20.707Z,customers_raw.csv
C000009,Customer_9,United Kingdom,,SRC_27,2026-02-03T14:06:20.707Z,customers_raw.csv


In [0]:
from pyspark.sql.functions import (
    col,
    trim,
    to_date,
    expr
)

# =========================
# Silver - Sales cleaning
# =========================

silver_sales_df = (
    bronze_sales_df
    # remove rows without business keys
    .dropna(subset=["order_id", "product_id", "customer_id"])
    
    # clean string columns
    .withColumn("order_id", trim(col("order_id")))
    .withColumn("product_id", trim(col("product_id")))
    .withColumn("customer_id", trim(col("customer_id")))
    
    # safe casting (dirty data aware)
    .withColumn("order_date", expr("try_cast(order_date as date)"))
    .withColumn("quantity", expr("try_cast(quantity as int)"))
    .withColumn("unit_price", expr("try_cast(unit_price as double)"))
    
    # business rules
    .filter(col("quantity") > 0)
    .filter(col("unit_price") > 0)
)

display(silver_sales_df.limit(10))


order_id,order_date,customer_id,product_id,quantity,unit_price,currency,source_system,_ingestion_timestamp,_source_file
100011,2024-01-04,C001741,P01179,5,1273.0,USD,SRC_198,2026-02-03T14:06:16.602Z,sales_raw.csv
100012,,C006462,P99999,4,852.0,USD,SRC_113,2026-02-03T14:06:16.602Z,sales_raw.csv
100023,,C010950,P99999,3,896.0,USD,SRC_229,2026-02-03T14:06:16.602Z,sales_raw.csv
100058,,C004066,P01114,3,1481.0,USD,SRC_21,2026-02-03T14:06:16.602Z,sales_raw.csv
100091,2023-01-26,C007443,P00125,4,1228.0,EUR,SRC_242,2026-02-03T14:06:16.602Z,sales_raw.csv
100102,,C007484,P99999,3,934.0,USD,SRC_38,2026-02-03T14:06:16.602Z,sales_raw.csv
100109,,C002205,P01388,2,1174.0,EUR,SRC_117,2026-02-03T14:06:16.602Z,sales_raw.csv
100145,2022-04-25,C000446,P00390,4,349.0,USD,SRC_61,2026-02-03T14:06:16.602Z,sales_raw.csv
100194,,C010189,P00738,5,624.0,PLN,SRC_150,2026-02-03T14:06:16.602Z,sales_raw.csv
100196,,C006536,P01190,5,451.0,EUR,SRC_134,2026-02-03T14:06:16.602Z,sales_raw.csv


In [0]:
# =========================
# Load Bronze tables
# =========================

bronze_sales_df = spark.table("default.bronze_sales")
bronze_customers_df = spark.table("default.bronze_customers")
bronze_products_df = spark.table("default.bronze_products")
bronze_returns_df = spark.table("default.bronze_returns")

print("Bronze tables loaded:")
print(f"sales: {bronze_sales_df.count()}")
print(f"customers: {bronze_customers_df.count()}")
print(f"products: {bronze_products_df.count()}")
print(f"returns: {bronze_returns_df.count()}")


Bronze tables loaded:
sales: 50000
customers: 12000
products: 1500
returns: 50000


In [0]:
# =========================
# Project configuration
# =========================

PROJECT_NAME = "enterprise_sales_analytics_platform"

CATALOG = "workspace"
SCHEMA = "default"

BRONZE_DB = "default"
SILVER_DB = "default"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {SCHEMA}")

print("Catalog and schema set successfully")


Catalog and schema set successfully
