In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [0]:
fact_sales    = spark.table("retailer_sales.silver.fact_sales")
fact_returns  = spark.table("retailer_sales.silver.fact_returns")
fact_payments = spark.table("retailer_sales.silver.fact_payments")

dim_product   = spark.table("retailer_sales.silver.dim_product")
dim_customer  = spark.table("retailer_sales.silver.dim_customer")
dim_date      = spark.table("retailer_sales.silver.dim_date")


In [0]:
df_sales_qtr = (
    fact_sales
    .join(dim_date, fact_sales.order_date == dim_date.date, "inner")
    .groupBy("year", "quarter")
    .agg(sum("total_amount").alias("total_sales"))
)

window_spec = Window.orderBy("year", "quarter")

df_kpi_sales_growth = (
    df_sales_qtr
    .withColumn("previous_quarter_sales", lag("total_sales").over(window_spec))
    .withColumn(
        "qoq_growth_percent",
        round(
            ((col("total_sales") - col("previous_quarter_sales")) /
             col("previous_quarter_sales")) * 100,
            2
        )
    )
)
df_kpi_sales_growth.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("retailer_sales.gold.kpi_sales_growth_qoq")


In [0]:
df_kpi_product_margin = (
    fact_sales
    .join(dim_product, "product_id", "inner")
    .groupBy("product_id", "product_name")
    .agg(
        sum("total_amount").alias("total_sales"),
        sum(col("quantity") * col("profit_per_unit")).alias("total_profit")
    )
    .withColumn(
        "profit_margin_percent",
        round((col("total_profit") / col("total_sales")) * 100, 2)
    )
)
df_kpi_product_margin.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("retailer_sales.gold.kpi_product_sales_margin")


In [0]:
fs = fact_sales.alias("fs")
dc = dim_customer.alias("dc")
dd = dim_date.alias("dd")
df_customer_qtr = (
    fs
    .join(dc, fs.customer_id == dc.customer_id, "inner")
    .join(dd, fs.order_date == dd.date, "inner")
    .groupBy(
        col("dc.region").alias("region"),
        col("dd.year").alias("year"),
        col("dd.quarter").alias("quarter")
    )
    .agg(countDistinct("fs.customer_id").alias("customer_count"))
)
window_spec = Window.partitionBy("region").orderBy("year", "quarter")

df_kpi_region_growth = (
    df_customer_qtr
    .withColumn(
        "previous_quarter_count",
        lag("customer_count").over(window_spec)
    )
    .withColumn(
        "growth_percent",
        round(
            ((col("customer_count") - col("previous_quarter_count")) /
             col("previous_quarter_count")) * 100,
            2
        )
    )
)
df_kpi_region_growth.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("retailer_sales.gold.kpi_region_customer_growth")


In [0]:
df_orders = (
    fact_sales
    .groupBy("customer_id", "product_id")
    .agg(count("order_id").alias("total_orders"))
)

df_returns = (
    fact_returns
    .groupBy("customer_id", "product_id")
    .agg(count("order_id").alias("total_returns"))
)

df_kpi_orders_returns = (
    df_orders
    .join(df_returns, ["customer_id", "product_id"], "left")
    .fillna(0)
    .orderBy(desc("total_returns"))
    .limit(5)
)
df_kpi_orders_returns.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("retailer_sales.gold.kpi_top_orders_returns")


In [0]:
df_payment_qtr = (
    fact_payments
    .filter(col("transaction_type") == "PAYMENT")
    .join(dim_date, fact_payments.transaction_date == dim_date.date, "inner")
    .groupBy("year", "quarter")
    .agg(
        sum(when(col("payment_mode").isin("UPI", "Card", "NetBanking"), col("amount"))
            .otherwise(0)).alias("digital_payment_amount"),
        sum("amount").alias("total_payment_amount")
    )
    .withColumn(
        "digital_payment_percent",
        round((col("digital_payment_amount") / col("total_payment_amount")) * 100, 2)
    )
)
df_payment_qtr.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("retailer_sales.gold.kpi_digital_payment_percentage")


In [0]:
spark.sql("SELECT * FROM retailer_sales.gold.kpi_sales_growth_qoq").show()
spark.sql("SELECT * FROM retailer_sales.gold.kpi_product_sales_margin").show()
spark.sql("SELECT * FROM retailer_sales.gold.kpi_region_customer_growth").show()
spark.sql("SELECT * FROM retailer_sales.gold.kpi_top_orders_returns").show()
spark.sql("SELECT * FROM retailer_sales.gold.kpi_digital_payment_percentage").show()
