In [0]:
silver_df = spark.read.table("sales_db.silver_sales_orders")
print("✅ Silver loaded")

In [0]:
from pyspark.sql.functions import sum as spark_sum, count, round as spark_round

gold_revenue_by_customer = (silver_df
    .groupBy("customer_id", "customer_name")
    .agg(
        spark_round(spark_sum("line_total"), 2).alias("total_revenue"),
        count("order_number").alias("total_orders")
    )
    .orderBy("total_revenue", ascending=False)
)
display(gold_revenue_by_customer)

In [0]:
gold_revenue_by_product = (silver_df
    .groupBy("product_id", "product_name")
    .agg(
        spark_round(spark_sum("line_total"), 2).alias("total_revenue"),
        spark_sum("quantity").alias("total_units_sold"
    ))
    .orderBy("total_revenue", ascending=False)
)
display(gold_revenue_by_product)

In [0]:
from pyspark.sql.functions import date_format, col, sum as spark_sum, count, round as spark_round

gold_revenue_by_month = (silver_df
    .filter(col("order_datetime").isNotNull())
    .withColumn("month", date_format(col("order_datetime"), "yyyy-MM"))
    .groupBy("month")
    .agg(
        spark_round(spark_sum("line_total"), 2).alias("total_revenue"),
        count("order_number").alias("total_orders")
    )
    .orderBy("month")
)
display(gold_revenue_by_month)

In [0]:
gold_revenue_by_customer.write.format("delta").mode("overwrite").saveAsTable("sales_db.gold_revenue_by_customer")
gold_revenue_by_product.write.format("delta").mode("overwrite").saveAsTable("sales_db.gold_revenue_by_product")
gold_revenue_by_month.write.format("delta").mode("overwrite").saveAsTable("sales_db.gold_revenue_by_month")
print("✅ All Gold tables saved!")

In [0]:
%sql
SHOW TABLES IN sales_db