In [0]:
# 1. Imports
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# 2. Widgets
dbutils.widgets.removeAll()

dbutils.widgets.text("catalogName", "catalog_supermarket")
dbutils.widgets.text("schemaSilver", "silver")
dbutils.widgets.text("schemaGold", "gold")

print("catalogName :", dbutils.widgets.get("catalogName"))
print("schemaSilver:", dbutils.widgets.get("schemaSilver"))
print("schemaGold  :", dbutils.widgets.get("schemaGold"))


In [0]:
# 3. Constants
catalog_name  = dbutils.widgets.get("catalogName")
schema_silver = dbutils.widgets.get("schemaSilver")
schema_gold   = dbutils.widgets.get("schemaGold")

order_products_table    = f"{catalog_name}.{schema_silver}.order_products"
product_hierarchy_table = f"{catalog_name}.{schema_silver}.product_hierarchy"

print("Tabla order_products    :", order_products_table)
print("Tabla product_hierarchy :", product_hierarchy_table)


In [0]:
# 4. Read sources
df_order_products = spark.table(order_products_table)
df_product_hierarchy = spark.table(product_hierarchy_table)

print("Preview order_products (silver):")
display(df_order_products.limit(5))

print("Preview product_hierarchy (silver):")
display(df_product_hierarchy.limit(5))


In [0]:
# 5. Transform - Ventas por departamento

df_orders_enriched = (
    df_order_products.alias("op")
    .join(
        df_product_hierarchy.alias("ph"),
        on="product_id",
        how="inner"
    )
)


In [0]:
df_sales_by_department = (
    df_orders_enriched
    .groupBy("department_id", "department_name")
    .agg(
        count("*").alias("total_items"),
        countDistinct("order_id").alias("num_orders")
    )
)

# Puedes agregar un ratio, por ejemplo participación sobre el total:
window_all = sum("total_items").over(Window.partitionBy())
df_sales_by_department = df_sales_by_department.withColumn(
    "pct_items",
    col("total_items") / window_all
)

print("Preview df_sales_by_department:")
display(df_sales_by_department.orderBy(col("total_items").desc()).limit(20))


In [0]:
# 6. Save

target_table = f"{catalog_name}.{schema_gold}.sales_by_department"

(
    df_sales_by_department
    .write
    .mode("overwrite")
    .saveAsTable(target_table)
)

print(f"✅ Tabla GOLD creada/actualizada: {target_table}")
display(spark.table(target_table).limit(20))