In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Table:
    """Catálogo de tablas"""
    PROMOS = 'damm_silver_des.pricing_unho.ca_crm_promos_test'
    DESCUENTOS = 'damm_silver_des.pricing_unho.ca_crm_descuentos_test'
    MATERIALES = 'damm_silver_des.dm.dm_material_1'
    VENTAS = 'damm_silver_des.gest_com_horeca.ca_crm_ventas_posicion'
    ESTABLECIMIENTOS = 'damm_silver_des.dm.dm_sf_establecimiento'
    DETALLISTAS = 'damm_silver_des.dm.dm_sf_detallista'
    RVL = 'damm_silver_des.gest_com_horeca.ca_crm_rentabilidad'

class ClasesCondiciones:
    """Clasificación de condiciones comerciales"""
    Tarifa = ["050"]
    Obsequios = ["100"]
    Promociones = ["300"]
    Descuentos = ["210"]
    Contratos = ["400"]
    AmortizacionesCDT = ["420"]
    Rappels = ["740"]
    CostesDistribuidor = ["900", "901", "920", "902", "930"]
    Costes = ["858", "859", "861", "890", "954", "956"]

def create_aggregator_column(col_name: str):
    """
    Crea columna AgregadorCondicion
    """
    condition = None
    for categoria, codigos in ClasesCondiciones.__dict__.items():
        if not categoria.startswith("__"):
            if condition is None:
                condition = F.when(F.col(col_name).isin(codigos), F.lit(categoria))
            else:
                condition = condition.when(F.col(col_name).isin(codigos), F.lit(categoria))
    return condition.otherwise(F.lit(None))


def get_materiales_filtrados(spark: SparkSession) -> 'DataFrame':
    """
    Obtiene y cachea materiales filtrados
    """
    logger.info("Cargando y filtrando materiales...")
    
    df = (
        spark
        .table(Table.MATERIALES)
        .filter(F.col("Sector") == '1')
        .filter(~F.col("Material").isin(["TB8", "TB10", "TB12"]))
        .select("Material")
        .distinct()
        .cache()  # Cache porque se usa en múltiples joins
    )
    
    count = df.count()
    logger.info(f"Materiales filtrados: {count} registros")
    
    return df


def get_rentabilidad_agregada(spark: SparkSession, df_materiales: 'DataFrame', 
                               fecha_inicio: str, fecha_fin: str) -> 'DataFrame':
    """
    Obtiene rentabilidad agregada por dimensiones clave
    """
    logger.info(f"Cargando rentabilidad entre {fecha_inicio} y {fecha_fin}...")
    
    df = (
        spark
        .table(Table.RVL)
        .filter(F.col("Fecha") >= fecha_inicio)
        .filter(F.col("Fecha") <= fecha_fin)
        .join(F.broadcast(df_materiales), on="Material", how="inner")
        .withColumn("AgregadorCondicion", create_aggregator_column("CtaResNiv2"))
        .groupBy("Detallista", "Material", "Fecha", "AgregadorCondicion", "claseOperacion")
        .agg(F.sum("MargenDetallado").alias("MargenTotal"))
        .withColumnRenamed("claseOperacion", "ClaseOperacionID")
    )
    
    logger.info(f"Rentabilidad cargada: {df.count()} registros agregados")
    
    return df


def ajustar_costes_distribuidor(df_rvl: 'DataFrame') -> 'DataFrame':
    """
    Ajusta márgenes de CostesDistribuidor restando MargenTarifa
    """
    logger.info("Ajustando costes de distribuidor...")
    
    # Paso 1: Extraer márgenes de Tarifa en una tabla separada
    df_margen_tarifa = (
        df_rvl
        .filter(F.col("AgregadorCondicion") == "Tarifa")
        .groupBy("Detallista", "Material", "Fecha", "ClaseOperacionID")
        .agg(F.sum("MargenTotal").alias("MargenTarifa"))
    )
    
    df_ajustado = (
        df_rvl
        .join(
            df_margen_tarifa,
            on=["Detallista", "Material", "Fecha", "ClaseOperacionID"],
            how="left"  # Left join para mantener todos los registros
        )
        .withColumn(
            "MargenTotal",
            F.when(
                F.col("AgregadorCondicion") == "CostesDistribuidor",
                F.col("MargenTotal") - F.coalesce(F.col("MargenTarifa"), F.lit(0))
            ).otherwise(F.col("MargenTotal"))
        )
        .drop("MargenTarifa")
    )
    
    return df_ajustado


def get_ventas_agregadas(spark: SparkSession, df_materiales: 'DataFrame') -> 'DataFrame':
    """
    Obtiene ventas agregadas por dimensiones clave
    """
    logger.info("Cargando ventas...")
    
    df = (
        spark
        .table(Table.VENTAS)
        .join(F.broadcast(df_materiales), on="Material", how="inner")
        .groupBy("Detallista", "Material", "Fecha", "ClaseOperacionID")
        .agg(
            F.sum("VentaNeta").alias("VentaNeta"),
            F.sum("CantidadLitros").alias("CantidadLitros")
        )
        .select("Detallista", "Material", "Fecha", "ClaseOperacionID", "CantidadLitros", "VentaNeta")
    )
    
    logger.info(f"Ventas cargadas: {df.count()} registros agregados")
    
    return df


def pivotar_y_unir(df_rvl_ajustado: 'DataFrame', df_ventas: 'DataFrame',
                   num_partitions: int = 200) -> 'DataFrame':
    """
    Pivotea rentabilidad y une con ventas
    """
    logger.info("Pivotando rentabilidad...")
    
    df_reparticionado = df_rvl_ajustado.repartition(
        num_partitions,
        "Detallista", "Material", "Fecha", "ClaseOperacionID"
    )
    
    df_pivotado = (
        df_reparticionado
        .groupBy("Detallista", "Material", "Fecha", "ClaseOperacionID")
        .pivot("AgregadorCondicion")
        .agg(F.sum("MargenTotal"))
        .fillna(0)
    )
    
    logger.info("Uniendo con ventas...")
    
    df_final = df_pivotado.join(
        df_ventas,
        on=["Detallista", "Material", "Fecha", "ClaseOperacionID"],
        how="left"
    )
    
    return df_final


def main(spark: SparkSession, fecha_inicio: str = "2024-01-01", 
         fecha_fin: str = "2025-12-31", num_partitions: int = 200):
    """
    Función principal del pipeline
    """
    logger.info("=" * 80)
    logger.info("Iniciando pipeline de rentabilidad optimizado")
    logger.info("=" * 80)
    
    df_materiales = get_materiales_filtrados(spark)
    df_rvl = get_rentabilidad_agregada(spark, df_materiales, fecha_inicio, fecha_fin)
    df_rvl_ajustado = ajustar_costes_distribuidor(df_rvl)
    df_ventas = get_ventas_agregadas(spark, df_materiales)
    df_final = pivotar_y_unir(df_rvl_ajustado, df_ventas, num_partitions)
    
    logger.info("=" * 80)
    logger.info("Pipeline completado exitosamente")
    logger.info("=" * 80)
    
    df_materiales.unpersist()
    
    return df_final


def pivotar_selectivo(df_rvl_ajustado: 'DataFrame', df_ventas: 'DataFrame') -> 'DataFrame':
    """
    Alternativa al pivot: crear solo las columnas necesarias con agregaciones
    Útil cuando tienes muchas categorías pero solo necesitas algunas
    """
    logger.info("Creando columnas selectivas (sin pivot)...")
    
    df_agregado = (
        df_rvl_ajustado
        .groupBy("Detallista", "Material", "Fecha", "ClaseOperacionID")
        .agg(
            # Crear columna para cada categoría relevante
            F.sum(F.when(F.col("AgregadorCondicion") == "Tarifa", F.col("MargenTotal")).otherwise(0)).alias("Tarifa"),
            F.sum(F.when(F.col("AgregadorCondicion") == "Obsequios", F.col("MargenTotal")).otherwise(0)).alias("Obsequios"),
            F.sum(F.when(F.col("AgregadorCondicion") == "Promociones", F.col("MargenTotal")).otherwise(0)).alias("Promociones"),
            F.sum(F.when(F.col("AgregadorCondicion") == "Descuentos", F.col("MargenTotal")).otherwise(0)).alias("Descuentos"),
            F.sum(F.when(F.col("AgregadorCondicion") == "Contratos", F.col("MargenTotal")).otherwise(0)).alias("Contratos"),
            F.sum(F.when(F.col("AgregadorCondicion") == "AmortizacionesCDT", F.col("MargenTotal")).otherwise(0)).alias("AmortizacionesCDT"),
            F.sum(F.when(F.col("AgregadorCondicion") == "Rappels", F.col("MargenTotal")).otherwise(0)).alias("Rappels"),
            F.sum(F.when(F.col("AgregadorCondicion") == "CostesDistribuidor", F.col("MargenTotal")).otherwise(0)).alias("CostesDistribuidor"),
            F.sum(F.when(F.col("AgregadorCondicion") == "Costes", F.col("MargenTotal")).otherwise(0)).alias("Costes")
        )
        .join(df_ventas, on=["Detallista", "Material", "Fecha", "ClaseOperacionID"], how="left")
    )
    
    return df_agregado


if __name__ == "__main__":
    spark = (SparkSession.builder
        .appName("Rentabilidad_Optimizado")
        .config("spark.sql.adaptive.enabled", "true") 
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.autoBroadcastJoinThreshold", "10485760")  
        .config("spark.sql.shuffle.partitions", "200")
        .getOrCreate()
    )
    
    # Ejecutar pipeline
    df_resultado = main(spark, fecha_inicio="2024-01-01", fecha_fin="2025-12-31")
    
    # Mostrar resultado
    df_resultado.show(10)
    
    # Opcionalmente guardar
    # df_resultado.write.mode("overwrite").saveAsTable("damm_silver_des.pricing_unho.ca_ph_pricing")


In [0]:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType
import logging

# Configuración de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Table:
    """Catálogo de tablas"""
    MATERIALES = 'damm_silver_des.dm.dm_material_1'
    VENTAS = 'damm_silver_des.gest_com_horeca.ca_crm_ventas_posicion'
    RVL = 'damm_silver_des.gest_com_horeca.ca_crm_rentabilidad'
    OUTPUT = 'damm_silver_des.pricing_unho.ca_ph_pricing' 


class ClasesCondiciones:
    """Clasificación de condiciones comerciales"""
    Tarifa = ["050"]
    Obsequios = ["100"]
    Promociones = ["300"]
    Descuentos = ["210"]
    Contratos = ["400"]
    AmortizacionesCDT = ["420"]
    Rappels = ["740"]
    CostesDistribuidor = ["900", "901", "920", "902", "930"]
    Costes = ["858", "859", "861", "890", "954", "956"]


class SparkConfig:
    """Configuración recomendada para 188M registros"""
    
    SHUFFLE_PARTITIONS = 400
    AUTO_BROADCAST_THRESHOLD = 10485760
    ADAPTIVE_ENABLED = True
    COALESCE_PARTITIONS = True
    SORT_MERGE_JOIN_ENABLED = True


def create_aggregator_column(col_name: str):
    """
    Crea columna AgregadorCondicion usando F.when()
    """
    condition = None
    for categoria, codigos in ClasesCondiciones.__dict__.items():
        if not categoria.startswith("__") and not categoria.startswith("get_"):
            if condition is None:
                condition = F.when(F.col(col_name).isin(codigos), F.lit(categoria))
            else:
                condition = condition.when(F.col(col_name).isin(codigos), F.lit(categoria))
    return condition.otherwise(F.lit(None))


def configure_spark_session(spark: SparkSession):
    """
    Configura Spark para manejar 188M registros eficientemente
    """
    logger.info("Configurando Spark para grandes volúmenes...")
    
    spark.conf.set("spark.sql.shuffle.partitions", SparkConfig.SHUFFLE_PARTITIONS)
    spark.conf.set("spark.sql.adaptive.enabled", SparkConfig.ADAPTIVE_ENABLED)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", SparkConfig.COALESCE_PARTITIONS)
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", SparkConfig.AUTO_BROADCAST_THRESHOLD)
    spark.conf.set("spark.sql.join.preferSortMergeJoin", SparkConfig.SORT_MERGE_JOIN_ENABLED)
    
    spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
    
    logger.info(f"✅ Spark configurado: {SparkConfig.SHUFFLE_PARTITIONS} shuffle partitions")


def get_materiales_filtrados(spark: SparkSession) -> 'DataFrame':
    """
    Obtiene materiales filtrados
    """
    logger.info("Cargando materiales (225K registros)...")
    
    df = (
        spark
        .table(Table.MATERIALES)
        .filter(F.col("Sector") == '1')
        .filter(~F.col("Material").isin(["TB8", "TB10", "TB12"]))
        .select("Material")
        .distinct()
    )
    
    count = df.count()
    logger.info(f"✅ Materiales filtrados: {count:,} registros")
    
    return df


def get_rentabilidad_base(spark: SparkSession, fecha_inicio: str, fecha_fin: str) -> 'DataFrame':
    """
    Carga RVL con filtros optimizados
    """
    logger.info(f"Cargando RVL (188M registros) entre {fecha_inicio} y {fecha_fin}...")
    
    df = (
        spark
        .table(Table.RVL)
        .filter(F.col("Fecha") >= fecha_inicio)
        .filter(F.col("Fecha") <= fecha_fin)
        .select(
            "Detallista",
            "Material", 
            "Fecha",
            "claseOperacion",
            "CtaResNiv2",
            "MargenDetallado"
        )
    )
    
    count = df.count()
    logger.info(f"✅ RVL cargado: {count:,} registros después de filtro de fechas")
    
    return df


def procesar_rvl_con_materiales(df_rvl: 'DataFrame', df_materiales: 'DataFrame') -> 'DataFrame':
    """
    Join RVL con Materiales
    """
    logger.info("Ejecutando join RVL + Materiales (sort-merge join)...")

    df_rvl_repart = df_rvl.repartition(SparkConfig.SHUFFLE_PARTITIONS, "Material")
    df_materiales_repart = df_materiales.repartition("Material")
    
    df_joined = (
        df_rvl_repart
        .join(df_materiales_repart, on="Material", how="inner")
        .withColumn("AgregadorCondicion", create_aggregator_column("CtaResNiv2"))
    )
    
    count = df_joined.count()
    logger.info(f"✅ Join completado: {count:,} registros")
    
    return df_joined


def agregar_rentabilidad(df: 'DataFrame') -> 'DataFrame':
    """
    Agrega rentabilidad por dimensiones clave
    """
    logger.info("Agregando rentabilidad...")
    
    df_repart = df.repartition(
        SparkConfig.SHUFFLE_PARTITIONS,
        "Detallista", "Material", "Fecha", "AgregadorCondicion", "claseOperacion"
    )
    
    df_agregado = (
        df_repart
        .groupBy("Detallista", "Material", "Fecha", "AgregadorCondicion", "claseOperacion")
        .agg(F.sum("MargenDetallado").alias("MargenTotal"))
        .withColumnRenamed("claseOperacion", "ClaseOperacionID")
    )
    
    count = df_agregado.count()
    logger.info(f"✅ Rentabilidad agregada: {count:,} registros")
    
    return df_agregado


def ajustar_costes_distribuidor(df_rvl: 'DataFrame') -> 'DataFrame':
    """
    Ajusta márgenes de CostesDistribuidor restando MargenTarifa
    """
    logger.info("Ajustando costes de distribuidor...")
    
    df_margen_tarifa = (
        df_rvl
        .filter(F.col("AgregadorCondicion") == "Tarifa")
        .groupBy("Detallista", "Material", "Fecha", "ClaseOperacionID")
        .agg(F.sum("MargenTotal").alias("MargenTarifa"))
    )

    if df_margen_tarifa.count() < 1_000_000:
        df_margen_tarifa = df_margen_tarifa.cache()
        logger.info("✅ MargenTarifa cacheado (< 1M registros)")
    
    df_ajustado = (
        df_rvl
        .join(
            df_margen_tarifa,
            on=["Detallista", "Material", "Fecha", "ClaseOperacionID"],
            how="left"
        )
        .withColumn(
            "MargenTotal",
            F.when(
                F.col("AgregadorCondicion") == "CostesDistribuidor",
                F.col("MargenTotal") - F.coalesce(F.col("MargenTarifa"), F.lit(0))
            ).otherwise(F.col("MargenTotal"))
        )
        .drop("MargenTarifa")
    )
    
    if df_margen_tarifa.is_cached:
        df_margen_tarifa.unpersist()
    
    logger.info("✅ Ajuste de costes completado")
    
    return df_ajustado


def get_ventas_agregadas(spark: SparkSession, df_materiales: 'DataFrame') -> 'DataFrame':
    """
    Obtiene ventas agregadas (22M registros)
    """
    logger.info("Cargando y agregando ventas (22M registros)...")
    
    df_ventas = spark.table(Table.VENTAS)
    
    df_ventas_repart = df_ventas.repartition(SparkConfig.SHUFFLE_PARTITIONS, "Material")
    df_materiales_repart = df_materiales.repartition("Material")
    
    df = (
        df_ventas_repart
        .join(df_materiales_repart, on="Material", how="inner")
        .groupBy("Detallista", "Material", "Fecha", "ClaseOperacionID")
        .agg(
            F.sum("VentaNeta").alias("VentaNeta"),
            F.sum("CantidadLitros").alias("CantidadLitros")
        )
    )
    
    count = df.count()
    logger.info(f"✅ Ventas agregadas: {count:,} registros")
    
    return df


def pivotar_rentabilidad(df_rvl_ajustado: 'DataFrame') -> 'DataFrame':
    """
    Pivota rentabilidad por AgregadorCondicion
    """
    logger.info("Pivotando rentabilidad (9 categorías)...")
    
    df_repart = df_rvl_ajustado.repartition(
        SparkConfig.SHUFFLE_PARTITIONS,
        "Detallista", "Material", "Fecha", "ClaseOperacionID"
    )
    
    df_pivotado = (
        df_repart
        .groupBy("Detallista", "Material", "Fecha", "ClaseOperacionID")
        .pivot("AgregadorCondicion", [
            "Tarifa", "Obsequios", "Promociones", "Descuentos", 
            "Contratos", "AmortizacionesCDT", "Rappels", 
            "CostesDistribuidor", "Costes"
        ])
        .agg(F.sum("MargenTotal"))
        .fillna(0)
    )
    
    count = df_pivotado.count()
    logger.info(f"✅ Pivot completado: {count:,} registros")
    
    return df_pivotado


def unir_con_ventas(df_pivotado: 'DataFrame', df_ventas: 'DataFrame') -> 'DataFrame':
    """
    Une datos pivotados con ventas
    NOTA: Left join porque puede haber registros sin ventas (rappels)
    """
    logger.info("Uniendo con ventas...")
    
    df_final = df_pivotado.join(
        df_ventas,
        on=["Detallista", "Material", "Fecha", "ClaseOperacionID"],
        how="left"
    )
    
    count = df_final.count()
    logger.info(f"✅ Join final completado: {count:,} registros")
    
    return df_final


def guardar_resultado(df: 'DataFrame', tabla_output: str):
    """
    Guarda resultado final en tabla Delta optimizada
    """
    logger.info(f"Guardando resultado en {tabla_output}...")

    df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("Fecha") \
        .option("overwriteSchema", "true") \
        .saveAsTable(tabla_output)
    
    logger.info(f"✅ Resultado guardado en {tabla_output}")
    
    logger.info("Optimizando tabla Delta...")
    spark.sql(f"OPTIMIZE {tabla_output}")
    
    logger.info("Aplicando Z-Order...")
    spark.sql(f"OPTIMIZE {tabla_output} ZORDER BY (Detallista, Material)")
    
    logger.info("✅ Optimización Delta completada")


def main(spark: SparkSession, 
         fecha_inicio: str = "2024-01-01", 
         fecha_fin: str = "2025-12-31",
         tabla_output: str = None):
    """
    Pipeline principal optimizado para 188M registros
    """
    
    logger.info("=" * 80)
    logger.info("PIPELINE RENTABILIDAD - Optimizado para 188M registros")
    logger.info("=" * 80)
    
    configure_spark_session(spark)
    df_materiales = get_materiales_filtrados(spark)
    df_rvl = get_rentabilidad_base(spark, fecha_inicio, fecha_fin)
    df_rvl_con_materiales = procesar_rvl_con_materiales(df_rvl, df_materiales)
    df_rvl_agregado = agregar_rentabilidad(df_rvl_con_materiales)
    df_rvl_ajustado = ajustar_costes_distribuidor(df_rvl_agregado)
    df_ventas = get_ventas_agregadas(spark, df_materiales)
    df_pivotado = pivotar_rentabilidad(df_rvl_ajustado)
    df_final = unir_con_ventas(df_pivotado, df_ventas)
    tabla_output = tabla_output or Table.OUTPUT
    
    guardar_resultado(df_final, tabla_output)
    
    logger.info("=" * 80)
    logger.info("✅ PIPELINE COMPLETADO EXITOSAMENTE")
    logger.info("=" * 80)
    
    return df_final

if __name__ == "__main__":
    
    spark = SparkSession.builder \
        .appName("Rentabilidad_188M_Optimizado") \
        .getOrCreate()
    
    df_resultado = main(
        spark,
        fecha_inicio="2024-01-01",
        fecha_fin="2025-12-31",
        tabla_output="damm_silver_des.pricing_unho.rentabilidad_consolidada"
    )
    
    logger.info("\n📊 Muestra del resultado final:")
    df_resultado.show(10, truncate=False)

    logger.info("\n📊 Estadísticas del resultado:")
    logger.info(f"Total registros: {df_resultado.count():,}")
    logger.info(f"Particiones: {df_resultado.rdd.getNumPartitions()}")

