In [1]:
from pyspark.sql.functions import col

# 0. Se cargan las tablas de Bronze (managed tables)
df_items = spark.read.table("lh_bronze.bronze_order_items")
df_ordenes = spark.read.table("lh_bronze.bronze_ordenes")
df_productos = spark.read.table("lh_bronze.bronze_productos")

# 1. Creación de la dimensión de direcciones
dim_direccion = df_ordenes.select(
    col("cp_destino").alias("codigo_postal_id"),
    col("direccion_des").alias("direccion_completa")
).distinct()

# 2. Cruce de ventas (Fact Table)
fact_ventas = df_items.join(df_ordenes, df_items["order_id"] == df_ordenes["id"], "inner") \
    .join(df_productos, df_items["product_id"] == df_productos["id"], "inner") \
    .select(
        df_ordenes["id"].alias("orden_id"),
        df_items["product_id"].alias("producto_id"),
        df_ordenes["proveedor_asignado"], 
        df_ordenes["estado_logistica"],
        df_ordenes["cp_destino"].alias("codigo_postal_id"),
        col("cantidad"), 
        df_items["precio_unitario"].alias("precio_venta_real"),
        (col("cantidad") * df_items["precio_unitario"]).alias("monto_linea_total"),
        col("fecha_venta").cast("date").alias("fecha_id"), 
        col("costo_envio").alias("gasto_logistico_total")
    )

# 3. Guardado en Silver
fact_ventas.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("lh_silver.fact_ventas")

dim_direccion.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("lh_silver.dim_direccion")

print("Silver listo: fact_ventas y dim_direccion actualizadas.")

StatementMeta(, c01adc82-daac-4218-a703-86478ea9034f, 3, Finished, Available, Finished)

Silver listo: fact_ventas y dim_direccion actualizadas.
