In [0]:
from pyspark.sql.functions import countDistinct, col, round

# =========================
# Gold - Product Return Rate
# =========================

silver_sales_df = spark.table("default.silver_sales")
silver_returns_df = spark.table("default.silver_returns")
silver_products_df = spark.table("default.silver_products")

# ---- valid returns (sales âˆ© returns)
valid_returns_df = (
    silver_returns_df
    .join(
        silver_sales_df.select("order_id", "product_id"),
        on="order_id",
        how="inner"
    )
)

# ---- total orders per product
product_sales_df = (
    silver_sales_df
    .groupBy("product_id")
    .agg(
        countDistinct("order_id").alias("total_orders")
    )
)

# ---- returned orders per product
product_returns_df = (
    valid_returns_df
    .groupBy("product_id")
    .agg(
        countDistinct("order_id").alias("returned_orders")
    )
)

# ---- final product return rate
gold_product_return_rate_df = (
    product_sales_df.alias("s")
    .join(product_returns_df.alias("r"), on="product_id", how="left")
    .join(silver_products_df.alias("p"), on="product_id", how="left")
    .withColumn("returned_orders", col("r.returned_orders"))
    .fillna({"returned_orders": 0})
    .withColumn(
        "return_rate",
        round(col("returned_orders") / col("total_orders"), 4)
    )
    .select(
        "product_id",
        "product_name",
        "category",
        "total_orders",
        "returned_orders",
        "return_rate"
    )
    .orderBy(col("return_rate").desc())
)

gold_product_return_rate_df.write.mode("overwrite").saveAsTable(
    "default.gold_product_return_rate"
)

display(gold_product_return_rate_df)


product_id,product_name,category,total_orders,returned_orders,return_rate
P01237,,,1,1,1.0
P00184,,,1,1,1.0
P00338,,,1,1,1.0
P00072,,,1,1,1.0
P01460,,,1,1,1.0
P00664,Product_664,manicure,1,1,1.0
P00446,,,3,3,1.0
P01131,,,1,1,1.0
P01425,Product_1425,manicure,1,1,1.0
P01348,,,1,1,1.0


In [0]:
from pyspark.sql.functions import count, col, round

# =========================
# Gold - Returns KPI
# =========================

# 1. Unique orders from sales (truth source)
sales_orders_df = (
    spark.table("default.silver_sales")
    .select("order_id")
    .distinct()
)

# 2. Unique orders from returns
returns_orders_df = (
    spark.table("default.silver_returns")
    .select("order_id")
    .distinct()
)

# 3. Only valid returns (intersection: sales âˆ© returns)
valid_returns_df = (
    returns_orders_df
    .join(sales_orders_df, on="order_id", how="inner")
)

# 4. KPI calculation
gold_returns_kpi_df = (
    sales_orders_df
    .agg(count("order_id").alias("total_orders"))
    .crossJoin(
        valid_returns_df
        .agg(count("order_id").alias("returned_orders"))
    )
    .withColumn(
        "return_rate",
        round(col("returned_orders") / col("total_orders"), 4)
    )
)

# 5. Save Gold table
gold_returns_kpi_df.write.mode("overwrite").saveAsTable(
    "default.gold_returns_kpi"
)

display(gold_returns_kpi_df)


total_orders,returned_orders,return_rate
2782,1067,0.3835


In [0]:
from pyspark.sql.functions import (
    col,
    sum as _sum,
    countDistinct,
    round,
    when
)

# =========================
# Gold - Customer Business Metrics
# =========================

silver_sales_df = spark.table("default.silver_sales")
silver_customers_df = spark.table("default.silver_customers")
silver_returns_df = spark.table("default.silver_returns")

# ---- sales aggregated per customer
customer_sales_df = (
    silver_sales_df
    .groupBy("customer_id")
    .agg(
        _sum(col("quantity") * col("unit_price")).alias("total_revenue"),
        countDistinct("order_id").alias("orders_count")
    )
)

# ---- returns aggregated per customer
customer_returns_df = (
    silver_returns_df
    .join(
        silver_sales_df.select("order_id", "customer_id"),
        on="order_id",
        how="left"
    )
    .groupBy("customer_id")
    .agg(
        countDistinct("order_id").alias("returns_count")
    )
)

# ---- final customer performance
gold_customer_performance_df = (
    customer_sales_df.alias("s")
    .join(
        customer_returns_df.alias("r"),
        on="customer_id",
        how="left"
    )
    .join(
        silver_customers_df.alias("c"),
        on="customer_id",
        how="left"
    )
    .withColumn("returns_count", col("r.returns_count"))
    .withColumn("returns_count", when(col("returns_count").isNull(), 0).otherwise(col("returns_count")))
    .withColumn(
        "aov",
        round(col("total_revenue") / col("orders_count"), 2)
    )
    .withColumn(
        "return_rate",
        round(col("returns_count") / col("orders_count"), 3)
    )
    .select(
        "customer_id",
        "customer_name",
        "country",
        "total_revenue",
        "orders_count",
        "returns_count",
        "aov",
        "return_rate"
    )
    .orderBy(col("total_revenue").desc())
)

display(gold_customer_performance_df)


customer_id,customer_name,country,total_revenue,orders_count,returns_count,aov,return_rate
C000380,Customer_380,France,12163.0,3,1,4054.33,0.333
C007257,Customer_7257,France,11590.0,2,2,5795.0,1.0
C003971,Customer_3971,Unknown,10872.0,2,0,5436.0,0.0
C002545,Customer_2545,United Kingdom,10825.0,3,1,3608.33,0.333
C006292,Customer_6292,Unknown,10410.0,2,1,5205.0,0.5
C003140,Customer_3140,France,10396.0,2,0,5198.0,0.0
C010747,Customer_10747,Unknown,10232.0,2,2,5116.0,1.0
C008295,Customer_8295,United Kingdom,10150.0,2,0,5075.0,0.0
C007633,Customer_7633,Germany,9983.0,2,1,4991.5,0.5
C004265,Customer_4265,Poland,9916.0,2,2,4958.0,1.0


In [0]:
from pyspark.sql.functions import sum as _sum, countDistinct, round, desc
from pyspark.sql.window import Window

customer_window = Window.orderBy(desc("customer_revenue"))

# =========================
# Gold - Customer Performance (Pareto)
# =========================

gold_customer_performance_df = (
    spark.table("default.silver_sales")
    .groupBy("customer_id")
    .agg(
        _sum(col("quantity") * col("unit_price")).alias("customer_revenue"),
        countDistinct("order_id").alias("orders_count")
    )
    .withColumn("customer_revenue", round(col("customer_revenue"), 2))
    .withColumn(
        "cumulative_revenue",
        _sum("customer_revenue").over(customer_window)
    )
    .withColumn(
        "total_revenue",
        _sum("customer_revenue").over(
            Window.rowsBetween(
                Window.unboundedPreceding,
                Window.unboundedFollowing
            )
        )
    )
    .withColumn(
        "revenue_share",
        round(col("cumulative_revenue") / col("total_revenue"), 4)
    )
    .orderBy(desc("customer_revenue"))
)

gold_customer_performance_df.write.mode("overwrite").saveAsTable(
    "default.gold_customer_performance"
)

display(gold_customer_performance_df)


In [0]:
from pyspark.sql.functions import (
    col,
    sum as _sum,
    countDistinct,
    round,
    desc,
    when
)
from pyspark.sql.window import Window

# =========================
# Gold - Category Performance
# =========================

silver_sales_df = spark.table("default.silver_sales")
silver_products_df = spark.table("default.silver_products")

sales_with_categories_df = (
    silver_sales_df.alias("s")
    .join(
        silver_products_df.alias("p"),
        on="product_id",
        how="left"
    )
    .withColumn(
        "category",
        when(col("p.category").isNull(), "UNKNOWN_CATEGORY")
        .otherwise(col("p.category"))
    )
)

category_window = Window.orderBy(desc("category_revenue"))

gold_category_performance_df = (
    sales_with_categories_df
    .groupBy("category")
    .agg(
        _sum(col("quantity") * col("unit_price")).alias("category_revenue"),
        _sum("quantity").alias("units_sold"),
        countDistinct("order_id").alias("orders_count")
    )
    .withColumn("category_revenue", round(col("category_revenue"), 2))
    .withColumn(
        "cumulative_revenue",
        _sum("category_revenue").over(category_window)
    )
    .withColumn(
        "total_revenue",
        _sum("category_revenue").over(
            Window.rowsBetween(
                Window.unboundedPreceding,
                Window.unboundedFollowing
            )
        )
    )
    .withColumn(
        "revenue_share",
        round(col("cumulative_revenue") / col("total_revenue"), 4)
    )
    .withColumn(
        "pareto_flag",
        when(col("revenue_share") <= 0.8, "TOP_80_PERCENT")
        .otherwise("BOTTOM_20_PERCENT")
    )
    .orderBy(desc("category_revenue"))
)

display(gold_category_performance_df)




category,category_revenue,units_sold,orders_count,cumulative_revenue,total_revenue,revenue_share,pareto_flag
UNKNOWN_CATEGORY,5747255.0,7482,2477,5747255.0,6482067.0,0.8866,BOTTOM_20_PERCENT
manicure,281672.0,356,117,6028927.0,6482067.0,0.9301,BOTTOM_20_PERCENT
pedicure,238519.0,280,91,6267446.0,6482067.0,0.9669,BOTTOM_20_PERCENT
tools,214621.0,279,97,6482067.0,6482067.0,1.0,BOTTOM_20_PERCENT


In [0]:
from pyspark.sql.functions import (
    col,
    sum as _sum,
    countDistinct,
    round,
    desc,
    when
)
from pyspark.sql.window import Window

# =========================
# Gold - Product Performance (fixed)
# =========================

silver_sales_df = spark.table("default.silver_sales")
silver_products_df = spark.table("default.silver_products")

sales_with_products_df = (
    silver_sales_df.alias("s")
    .join(
        silver_products_df.alias("p"),
        on="product_id",
        how="left"
    )
    .withColumn(
        "product_name",
        when(col("p.product_name").isNull(), "UNKNOWN_PRODUCT")
        .otherwise(col("p.product_name"))
    )
    .withColumn(
        "category",
        when(col("p.category").isNull(), "UNKNOWN_CATEGORY")
        .otherwise(col("p.category"))
    )
)

product_window = Window.orderBy(desc("product_revenue"))

gold_product_performance_df = (
    sales_with_products_df
    .groupBy(
        "product_id",
        "product_name",
        "category"
    )
    .agg(
        _sum(col("quantity") * col("unit_price")).alias("product_revenue"),
        _sum("quantity").alias("units_sold"),
        countDistinct("order_id").alias("orders_count")
    )
    .withColumn("product_revenue", round(col("product_revenue"), 2))
    .withColumn(
        "cumulative_revenue",
        _sum("product_revenue").over(product_window)
    )
    .withColumn(
        "total_revenue",
        _sum("product_revenue").over(
            Window.rowsBetween(
                Window.unboundedPreceding,
                Window.unboundedFollowing
            )
        )
    )
    .withColumn(
        "revenue_share",
        round(col("cumulative_revenue") / col("total_revenue"), 4)
    )
    .withColumn(
        "pareto_flag",
        when(col("revenue_share") <= 0.8, "TOP_80_PERCENT")
        .otherwise("BOTTOM_20_PERCENT")
    )
    .orderBy(desc("product_revenue"))
)

display(gold_product_performance_df)




product_id,product_name,category,product_revenue,units_sold,orders_count,cumulative_revenue,total_revenue,revenue_share,pareto_flag
P99999,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,3291962.0,4250,1401,3291962.0,6482067.0,0.5079,TOP_80_PERCENT
P00990,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,19029.0,16,4,3310991.0,6482067.0,0.5108,TOP_80_PERCENT
P01170,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,17733.0,13,4,3328724.0,6482067.0,0.5135,TOP_80_PERCENT
P00702,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,16359.0,19,5,3345083.0,6482067.0,0.5161,TOP_80_PERCENT
P00105,Product_105,manicure,16090.0,17,5,3361173.0,6482067.0,0.5185,TOP_80_PERCENT
P00329,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,15558.0,13,4,3376731.0,6482067.0,0.5209,TOP_80_PERCENT
P00458,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,14978.0,13,5,3391709.0,6482067.0,0.5232,TOP_80_PERCENT
P00809,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,14595.0,13,3,3406304.0,6482067.0,0.5255,TOP_80_PERCENT
P01107,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,13930.0,15,3,3420234.0,6482067.0,0.5276,TOP_80_PERCENT
P01463,UNKNOWN_PRODUCT,UNKNOWN_CATEGORY,13012.0,13,4,3433246.0,6482067.0,0.5297,TOP_80_PERCENT


In [0]:
from pyspark.sql.functions import (
    col,
    sum as _sum,
    countDistinct,
    round
)

gold_sales_summary_df = (
    spark.table("default.silver_sales")
    .groupBy("order_date")
    .agg(
        _sum(col("quantity") * col("unit_price")).alias("revenue"),
        _sum("quantity").alias("units_sold"),
        countDistinct("order_id").alias("orders_count")
    )
    .withColumn("revenue", round(col("revenue"), 2))
    .orderBy("order_date")
)

gold_sales_summary_df.write.mode("overwrite").saveAsTable(
    "default.gold_sales_summary"
)

display(gold_sales_summary_df)


order_date,revenue,units_sold,orders_count
,5339304.0,6946,2305
2022-01-03,5640.0,4,1
2022-01-05,1167.0,1,1
2022-01-06,728.0,4,1
2022-01-07,972.0,3,2
2022-01-08,6280.0,5,1
2022-01-12,3565.0,5,1
2022-01-13,3720.0,5,1
2022-01-20,213.0,1,1
2022-01-22,3800.0,5,1


In [0]:
from pyspark.sql.functions import (
    col,
    sum as _sum,
    countDistinct,
    when,
    round
)

# =========================
# Gold - Daily Sales KPIs (with return rate)
# =========================

gold_daily_sales_df = (
    silver_df
    .groupBy("order_date")
    .agg(
        # revenue
        _sum(col("quantity") * col("unit_price")).alias("daily_revenue"),
        
        # orders
        countDistinct("order_id").alias("orders_count"),
        
        # units sold
        _sum("quantity").alias("units_sold"),
        
        # returned orders
        countDistinct(
            when(col("return_date").isNotNull(), col("order_id"))
        ).alias("returned_orders")
    )
    .withColumn(
        "aov",
        round(col("daily_revenue") / col("orders_count"), 2)
    )
    .withColumn(
        "return_rate",
        round(col("returned_orders") / col("orders_count"), 4)
    )
    .orderBy("order_date")
)

display(gold_daily_sales_df.limit(10))


order_date,daily_revenue,orders_count,units_sold,returned_orders,aov,return_rate
,5339304.0,2305,6946,155,2316.4,0.0672
2022-01-03,5640.0,1,4,1,5640.0,1.0
2022-01-05,1167.0,1,1,0,1167.0,0.0
2022-01-06,728.0,1,4,0,728.0,0.0
2022-01-07,972.0,2,3,0,486.0,0.0
2022-01-08,6280.0,1,5,0,6280.0,0.0
2022-01-12,3565.0,1,5,0,3565.0,0.0
2022-01-13,3720.0,1,5,0,3720.0,0.0
2022-01-20,213.0,1,1,0,213.0,0.0
2022-01-22,3800.0,1,5,1,3800.0,1.0


In [0]:
# =========================
# Gold layer - Load source
# =========================

silver_df = spark.table("default.silver_sales_enriched")

print(f"Silver rows: {silver_df.count()}")
display(silver_df.limit(10))


Silver rows: 2782


order_id,order_date,customer_id,product_id,quantity,unit_price,sales_ingestion_ts,sales_source_file,customer_name,country,signup_date,customer_source_system,product_name,category,base_price,return_date,return_reason
100011,2024-01-04,C001741,P01179,5,1273.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_1741,Germany,2021-12-19,SRC_249,,,,,Defect
100012,,C006462,P99999,4,852.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_6462,France,,SRC_97,,,,,
100023,,C010950,P99999,3,896.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_10950,United Kingdom,,SRC_101,,,,,
100058,,C004066,P01114,3,1481.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_4066,United Kingdom,2020-01-17,SRC_37,,,,,
100091,2023-01-26,C007443,P00125,4,1228.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_7443,United Kingdom,,SRC_220,Product_125,manicure,25.0,,
100102,,C007484,P99999,3,934.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_7484,United Kingdom,,SRC_5,,,,,Wrong item
100109,,C002205,P01388,2,1174.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_2205,France,,SRC_64,Product_1388,manicure,129.0,,
100145,2022-04-25,C000446,P00390,4,349.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_446,Unknown,,SRC_65,Product_390,tools,28.0,,
100194,,C010189,P00738,5,624.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_10189,United Kingdom,,SRC_53,,,,,Defect
100196,,C006536,P01190,5,451.0,2026-02-03T14:06:16.602Z,sales_raw.csv,Customer_6536,Poland,2022-08-16,SRC_9,,,,,
