In [0]:
from pyspark.sql.functions import col, when, round, sum as _sum, row_number
from pyspark.sql.types import *
from pyspark.sql.window import Window

# 🧩 Crear widget para tipo de cambio (PEN/USD)
dbutils.widgets.text("tipo_cambio", "3.63", "Ingrese el tipo de cambio PEN/USD")
tipo_cambio = float(dbutils.widgets.get("tipo_cambio"))

# 🔄 Eliminar tablas si ya existen
spark.sql("DROP TABLE IF EXISTS default.silver_pedidos_raw")
spark.sql("DROP TABLE IF EXISTS default.silver_pedidos_pago")

# 1️⃣ Leer tabla intermedia bfstable
df_bf = spark.read.table("default.bfstable")

# 2️⃣ Casteo apropiado (sin renombrar columnas)
df_casted = df_bf.select(
    col("codigo_solicitud").cast(StringType()),
    col("pedido_compra").cast(LongType()),
    col("posicion").cast(IntegerType()),
    col("codigo_proveedor").cast(IntegerType()),
    col("nombre_proveedor").cast(StringType()),
    col("texto_breve").cast(StringType()),
    col("moneda").cast(StringType()),
    col("cuenta_contable").cast(LongType()),
    col("grupo_articulo").cast(IntegerType()),
    col("fecha_entrega").cast(DateType()),
    col("cantidad").cast(DoubleType()),
    col("unidad").cast(StringType()),
    col("valor_neto").cast(DoubleType()),
    col("indicador_impuesto").cast(StringType()),
    col("centro_costo").cast(DoubleType()),
    col("orden").cast(StringType()),
    col("area").cast(StringType()),
    col("area_solicitante").cast(StringType()),
    col("fecha_recepcion").cast(TimestampType()),
    col("fecha_generacion").cast(DateType()),
    col("fecha_entrega_servicio").cast(DateType()),
    col("fecha_documento").cast(DateType()),
    col("fecha_aceptacion").cast(DateType()),
    col("fecha_envio_proveedor").cast(DateType()),
    col("fecha_mesa_partes").cast(DateType()),
    col("fecha_contabilidad").cast(DateType()),
    col("numero_comprobante").cast(StringType()),
    col("fecha_pago").cast(DateType()),
    col("estado_pedido").cast(StringType()),
    col("fecha_registro_conta").cast(StringType())
)

# 3️⃣ Enriquecer con cálculo de total_pago
df_enriquecido = df_casted.withColumn(
    "total_pago",
    when(col("indicador_impuesto") == "K8", col("valor_neto") * col("cantidad") * 1.18)
    .otherwise(col("valor_neto") * col("cantidad"))
).withColumn(
    "total_pago",
    when(col("moneda") == "USD", col("total_pago") * tipo_cambio).otherwise(col("total_pago"))
)

# 4️⃣ Evitar duplicado de cantidad en el cálculo del total por pedido
df_total_pago = df_enriquecido.select(
    "pedido_compra", "texto_breve", "valor_neto", "cantidad", "moneda", "indicador_impuesto", "total_pago"
).dropDuplicates(["pedido_compra", "texto_breve", "valor_neto", "cantidad"])

# 5️⃣ Total por pedido_compra (evitando duplicidad de cantidad)
df_total = df_total_pago.groupBy("pedido_compra").agg(
    round(_sum("total_pago"), 2).alias("total_pago_soles")
)

# 6️⃣ Fila representativa por pedido_compra (posición menor)
window_spec = Window.partitionBy("pedido_compra").orderBy("posicion")
df_rep = df_enriquecido.withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1).drop("row_num", "total_pago")

# 7️⃣ Combinar total con fila representativa
df_silver_final = df_total.join(df_rep, on="pedido_compra", how="left")

# 8️⃣ Guardar tablas Delta
df_enriquecido.write.format("delta").mode("overwrite").saveAsTable("default.silver_pedidos_raw")
df_silver_final.write.format("delta").mode("overwrite").saveAsTable("default.silver_pedidos_pago")

print(f"✅ Tablas Silver generadas correctamente con tipo de cambio aplicado: {tipo_cambio}")

In [0]:
%sql
select spr.pedido_compra,spr.moneda,spr.cantidad,spr.posicion,spr.valor_neto,spr.indicador_impuesto,spr.total_pago from default.silver_pedidos_raw spr 
where spr.pedido_compra = 4800784684

In [0]:
%sql
select spp.total_pago_soles from default.silver_pedidos_pago spp
where spp.pedido_compra = 4800784684

In [0]:
%sql
WITH CONTEOS AS (
  select spr.pedido_compra,count(*) as conteo from default.silver_pedidos_raw spr 
  group by spr.pedido_compra 
)

SELECT SUM(cs.conteo) FROM CONTEOS CS

In [0]:
%sql
WITH CONTEOS AS (
  select spp.pedido_compra,count(*) as conteo from default.silver_pedidos_pago spp 
  group by spp.pedido_compra 
)

SELECT SUM(cs.conteo) FROM CONTEOS CS

In [0]:
%sql
select * from default.silver_pedidos_raw spr
where spr.pedido_compra = 4800816700

In [0]:
%sql
select spp.nombre_proveedor,
spp.pedido_compra, 
sum(spp.total_pago_soles) as ganancia_proveedor 
from default.silver_pedidos_pago spp
group by spp.nombre_proveedor,spp.pedido_compra 
ORDER BY ganancia_proveedor desc 
LIMIT 10

In [0]:
%sql
select spp.nombre_proveedor,spp.pedido_compra, sum(spp.total_pago_soles) as ganancia_proveedor from default.silver_pedidos_pago spp
group by spp.nombre_proveedor,spp.pedido_compra
HAVING spp.nombre_proveedor = 'SAN FERNANDO S.A.'
ORDER BY ganancia_proveedor desc

In [0]:
%sql
select spp.nombre_proveedor,spp.estado_pedido,spp.pedido_compra,spp.total_pago_soles from default.silver_pedidos_pago spp
where spp.nombre_proveedor = 'IMAGEN LIMA SAC'