In [0]:
from pyspark.sql.functions import col, sum as _sum, count as _count, avg as _avg

# Load the silver-layer star schema: the purchases fact table followed by its
# five dimensions. The names on the left pair positionally with the table
# names listed below, so keep both sequences in the same order.
fact, dim_date, dim_product, dim_supplier, dim_payment, dim_status = (
    spark.table(silver_table)
    for silver_table in (
        "default.silver_fact_purchases",
        "default.silver_dim_date",
        "default.silver_dim_product",
        "default.silver_dim_supplier",
        "default.silver_dim_payment",
        "default.silver_dim_status",
    )
)


In [0]:
# Denormalize the purchases fact by attaching each dimension on its key.
# The pairs are applied in order, which also fixes the column order of `base`.
dimension_joins = (
    (dim_date, "date_key"),
    (dim_product, "product_key"),
    (dim_supplier, "supplier_key"),
    (dim_payment, "payment_key"),
    (dim_status, "status_key"),
)

base = fact
for dimension, key_column in dimension_joins:
    # Left join: a fact row is kept even when its dimension lookup misses.
    base = base.join(dimension, on=key_column, how="left")

# Quick visual check of the joined schema and a few sample rows.
base.printSchema()
base.show(5, truncate=False)


root
 |-- status_key: long (nullable = true)
 |-- payment_key: long (nullable = true)
 |-- supplier_key: long (nullable = true)
 |-- product_key: long (nullable = true)
 |-- date_key: long (nullable = true)
 |-- purchase_id: long (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: decimal(18,2) (nullable = true)
 |-- total_amount: decimal(18,2) (nullable = true)
 |-- notes: string (nullable = true)
 |-- data: date (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- mes_nome: string (nullable = true)
 |-- trimestre: integer (nullable = true)
 |-- dia: integer (nullable = true)
 |-- dia_semana: integer (nullable = true)
 |-- dia_semana_nome: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- supplier_code: string (nullable = true)
 |-- supplier_name: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- status_name: string (nullable = true)

+----------+-----------+------------+-

In [0]:
# Daily purchase metrics: one output row per date_key.
gold_purchases_daily = (
    base.groupBy("date_key")
    .agg(
        # "rows" counts every joined row, while "num_purchases" counts only
        # rows whose purchase_id is non-null; comparing the two is a cheap
        # sanity check on the joined data.
        _count("*").alias("rows"),
        _count("purchase_id").alias("num_purchases"),
        _sum("quantity").alias("total_quantity"),
        _sum("total_amount").alias("total_amount"),
        _avg("total_amount").alias("avg_purchase_amount"),
    )
)

# Replace the gold table on every run. overwriteSchema allows the stored
# Delta schema to be replaced together with the data when the set of
# aggregated columns changes.
(
    gold_purchases_daily.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable("default.gold_purchases_daily")
)


In [0]:
# Per-product purchase metrics: one output row per product_key.
product_metrics = [
    _count("purchase_id").alias("num_purchases"),
    _sum("quantity").alias("total_quantity"),
    _sum("total_amount").alias("total_amount"),
    _avg("unit_price").alias("avg_unit_price"),
]
gold_purchases_by_product = base.groupBy("product_key").agg(*product_metrics)

# Persist as a Delta table, replacing both data and schema from earlier runs.
product_writer = (
    gold_purchases_by_product.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
product_writer.saveAsTable("default.gold_purchases_by_product")


In [0]:
# Per-supplier purchase metrics: one output row per supplier_key.
supplier_metrics = [
    _count("purchase_id").alias("num_purchases"),
    _sum("quantity").alias("total_quantity"),
    _sum("total_amount").alias("total_amount"),
]
gold_purchases_by_supplier = base.groupBy("supplier_key").agg(*supplier_metrics)

# Persist as a Delta table, replacing both data and schema from earlier runs.
supplier_writer = (
    gold_purchases_by_supplier.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
supplier_writer.saveAsTable("default.gold_purchases_by_supplier")


In [0]:
# Per-status purchase metrics: one output row per status_key.
# The metric order here defines the column order of the stored table.
status_metrics = [
    _count("purchase_id").alias("num_purchases"),
    _sum("total_amount").alias("total_amount"),
    _sum("quantity").alias("total_quantity"),
]
gold_purchases_by_status = base.groupBy("status_key").agg(*status_metrics)

# Persist as a Delta table, replacing both data and schema from earlier runs.
status_writer = (
    gold_purchases_by_status.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
status_writer.saveAsTable("default.gold_purchases_by_status")


In [0]:
# Store the fully joined, row-level frame as the detail gold table,
# replacing both data and schema from earlier runs.
detail_writer = (
    base.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
detail_writer.saveAsTable("default.gold_purchases_detail")


In [0]:
# List the tables registered in the `default` schema without truncating
# long table names in the printed output.
default_schema_listing = spark.sql("SHOW TABLES IN default")
default_schema_listing.show(truncate=False)


+--------+--------------------------+-----------+
|database|tableName                 |isTemporary|
+--------+--------------------------+-----------+
|default |bronze_diamonds           |false      |
|default |dim_clarity               |false      |
|default |dim_color                 |false      |
|default |dim_cut                   |false      |
|default |dim_date                  |false      |
|default |dim_payment               |false      |
|default |dim_product               |false      |
|default |dim_status                |false      |
|default |dim_supplier              |false      |
|default |fact_diamonds             |false      |
|default |fact_purchases            |false      |
|default |gold_purchases_by_product |false      |
|default |gold_purchases_by_status  |false      |
|default |gold_purchases_by_supplier|false      |
|default |gold_purchases_daily      |false      |
|default |gold_purchases_detail     |false      |
|default |silver_diamonds           |false      |


In [0]:
# Gold tables to spot-check after the writes above.
tables = [
    "default.gold_purchases_daily",
    "default.gold_purchases_by_product",
    "default.gold_purchases_by_supplier",
    "default.gold_purchases_by_status",
    "default.gold_purchases_detail",
]

# For each table, print its name, its row count and a five-row sample.
for table_name in tables:
    gold_table = spark.table(table_name)
    print("\n==>", table_name)
    print("rows:", gold_table.count())
    gold_table.show(5, truncate=False)



==> default.gold_purchases_daily
rows: 365
+--------+----+-------------+--------------+------------+-------------------+
|date_key|rows|num_purchases|total_quantity|total_amount|avg_purchase_amount|
+--------+----+-------------+--------------+------------+-------------------+
|20260523|1   |1            |10            |25000.00    |25000.000000       |
|20260317|1   |1            |10            |25000.00    |25000.000000       |
|20261006|2   |2            |22            |13945.00    |6972.500000        |
|20260730|2   |2            |22            |18000.00    |9000.000000        |
|20260604|1   |1            |1             |7500.00     |7500.000000        |
+--------+----+-------------+--------------+------------+-------------------+
only showing top 5 rows

==> default.gold_purchases_by_product
rows: 10
+-----------+-------------+--------------+------------+--------------+
|product_key|num_purchases|total_quantity|total_amount|avg_unit_price|
+-----------+-------------+-------------