#Imports

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

#Widgets

In [0]:
# Clear any widgets left over from a previous run, then declare the three
# text widgets of this notebook together with their default values.
dbutils.widgets.removeAll()
for widget_name, default_value in (
    ("catalog", "catalog_ecommerce"),
    ("schema_source", "bronze"),
    ("schema_sink", "silver"),
):
    dbutils.widgets.text(widget_name, default_value)

#Constantes

In [0]:
# Read the notebook parameters from the widgets.
catalog = dbutils.widgets.get("catalog")
schema_source = dbutils.widgets.get("schema_source")
schema_sink = dbutils.widgets.get("schema_sink")

# Orders the payments of each order from the highest payment_value down.
# Sorting on payment_value alone leaves the winner of row_number() undefined
# when two payments of the same order share the same value, so the result
# could change between runs. The extra keys make the ranking deterministic
# with respect to the columns that are later read from the top-ranked row.
window_spec = Window.partitionBy("order_id").orderBy(
    col("payment_value").desc(),
    col("payment_type").asc(),
    col("payment_installments").desc(),
)

#Preparacion

In [0]:
# Load the payments table from the source schema of the configured catalog.
payments_source_table = f"{catalog}.{schema_source}.payments"
df_payments_bronze = spark.table(payments_source_table)

#Limpieza

In [0]:
# Column expressions used to normalise the bronze data:
#   - payment_value        -> DECIMAL(10, 2)
#   - payment_installments -> INTEGER
#   - payment_type         -> upper-cased, then trimmed
payment_value_decimal = col("payment_value").cast(DecimalType(10, 2))
payment_installments_int = col("payment_installments").cast(IntegerType())
payment_type_normalised = trim(upper(col("payment_type")))

# Each withColumn overwrites the column of the same name in place.
df_payments_cleaned = (
    df_payments_bronze
    .withColumn("payment_value", payment_value_decimal)
    .withColumn("payment_installments", payment_installments_int)
    .withColumn("payment_type", payment_type_normalised)
)


#Conversion

In [0]:
# Sum all payments per order and count how many payment rows each order has.
# NOTE: `sum` and `count` are the Spark SQL functions pulled in by the star
# import of pyspark.sql.functions, not the Python builtins.
total_value_expr = sum("payment_value").alias("total_payment_value")
payment_count_expr = count("*").alias("payment_count")

df_payments_agg = df_payments_cleaned.groupBy("order_id").agg(
    total_value_expr,
    payment_count_expr,
)

In [0]:
# For every order keep only the payment ranked first by window_spec and
# expose its type and number of installments as the order's "primary"
# payment attributes.
df_payments_ranked = df_payments_cleaned.withColumn(
    "rank", row_number().over(window_spec)
)

df_payment_method = (
    df_payments_ranked
    .where(col("rank") == 1)
    .select(
        "order_id",
        col("payment_type").alias("primary_payment_type"),
        col("payment_installments").alias("primary_payment_installments"),
    )
)

In [0]:
# Derived order-level columns:
#   - has_multiple_payments: True when the order has more than one payment row
#   - avg_payment_value: total paid divided by the number of payment rows
has_multiple_payments_expr = when(col("payment_count") > 1, True).otherwise(False)
avg_payment_value_expr = col("total_payment_value") / col("payment_count")

# Attach the primary payment method to the per-order aggregates, keeping
# every aggregated order (left join on order_id).
df_payments_with_method = df_payments_agg.join(
    df_payment_method, on="order_id", how="left"
)

df_payments_silver = (
    df_payments_with_method
    .withColumn("has_multiple_payments", has_multiple_payments_expr)
    .withColumn("avg_payment_value", avg_payment_value_expr)
)

#Guardamos tabla

In [0]:
# Persist df_payments_silver, the order-level result (aggregates plus primary
# payment method), rather than the intermediate cleaned rows. Writing
# df_payments_cleaned here would discard the whole aggregation step.
# NOTE(review): overwriteSchema assumes the sink is a Delta table that may
# already exist with the previous (row-level) schema; confirm before deploying.
(
    df_payments_silver.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog}.{schema_sink}.payments")
)