### Librerias y Funciones

In [0]:
spark.conf.set("spark.databricks.io.cache.enabled", True)
spark.conf.set('spark.sql.shuffle.partitions', 'auto')

In [0]:
%run ../../../../../04_utils/commons_functions_de

### Librerías

### Funciones Ingenieria de datos

### Funciones de ingesta en RDS

### Funciones de control de flujo de ingesta

In [0]:
%run ../../../../../04_utils/commons_functions_ds

In [0]:
%run ../../../../../spigot/initial/global_parameter_py

In [0]:
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import regexp_extract
from pyspark import StorageLevel

#### Carga de Fuente

In [0]:
int_pedidos_clientes = (spark.read.parquet("/Volumes/dbw_prod_aavanzada/db_tmp/files/pburbano/data/")
                                  .withColumn("fecha_pedido_dt", F.to_date(F.col("fecha_pedido_dt")))
                        )

#### Creacion de MDT 

In [0]:
def definir_ventanas():
    # Definir ventanas
    window_cliente = Window.partitionBy("cliente_id").orderBy("fecha_pedido_dt")
    return window_cliente

def enriquecer_df_base(df_pedidos, window_cliente):
    df = df_pedidos.withColumn("fecha_ultimo_pedido", F.max("fecha_pedido_dt").over(Window.partitionBy("cliente_id")))
    df = df.withColumn("canal_previo", F.lag("canal_pedido_cd").over(window_cliente))
    df = df.withColumn("es_ultimo", F.when(F.col("fecha_pedido_dt") == F.col("fecha_ultimo_pedido"), 1).otherwise(0))
    return df

def crear_target_multiclase(df):
    # Crear target multiclase
    df = df.withColumn(
        "target",
        F.when((F.col("canal_previo") != "DIGITAL") & (F.col("canal_pedido_cd") != "DIGITAL"), 0)
         .when((F.col("canal_previo") == "DIGITAL") & (F.col("canal_pedido_cd") == "DIGITAL"), 1)
         .when((F.col("canal_previo") != "DIGITAL") & (F.col("canal_pedido_cd") == "DIGITAL"), 2)
         .when((F.col("canal_previo") == "DIGITAL") & (F.col("canal_pedido_cd") != "DIGITAL"), 3)
    )

    # Filtrar último pedido con target válido
    df_target = (
        df.filter("es_ultimo = 1")
          .filter(F.col("target").isNotNull())
          .select("cliente_id", "target", F.col("fecha_ultimo_pedido").alias("fecha_ultimo_pedido_target"))
    )
    
    return df, df_target

def crear_historico(df, df_target):
    # Crear histórico previo al último pedido
    df_historico = (
        df.filter(F.col("es_ultimo") == 0)
          .join(df_target.select("cliente_id", "fecha_ultimo_pedido_target"), "cliente_id", "inner")
          .withColumn("dias_antes_ultimo", F.datediff("fecha_ultimo_pedido_target", "fecha_pedido_dt"))
          .withColumn("canal_pedido_cd", F.when(F.col("canal_pedido_cd") == "DIGITAL", "DIGITAL").otherwise("NO_DIGITAL"))
    )

    df_historico = df_historico.repartition("cliente_id").persist(StorageLevel.MEMORY_AND_DISK)
    return df_historico

def agregar_variables_pedidos(df_historico):
    # Variables agregadas del histórico
    f_pedidos = df_historico.groupBy("cliente_id").agg(
        F.count("*").alias("n_pedidos_previos"),
        F.countDistinct("canal_pedido_cd").alias("n_canales_utilizados")
    )
    return f_pedidos

def agregar_variables_frecuencia(df_historico):
    # calcular días entre pedidos
    w_orden = Window.partitionBy("cliente_id").orderBy("fecha_pedido_dt")
    df_historico = df_historico.withColumn(
        "dias_entre_pedidos", F.datediff("fecha_pedido_dt", F.lag("fecha_pedido_dt").over(w_orden))
    )

    # agregar canal previo y cambio de canal
    df_historico = df_historico.withColumn("canal_previo", F.lag("canal_pedido_cd").over(w_orden))
    df_historico = df_historico.withColumn(
        "cambio_canal", F.when(F.col("canal_previo") != F.col("canal_pedido_cd"), 1).otherwise(0)
    )

    # cambios hacia y resde digital
    df_historico = df_historico.withColumn(
        "cambio_a_digital",
        F.when((F.col("canal_previo") != "DIGITAL") & (F.col("canal_pedido_cd") == "DIGITAL"), 1).otherwise(0)
    )
    df_historico = df_historico.withColumn(
        "cambio_desde_digital",
        F.when((F.col("canal_previo") == "DIGITAL") & (F.col("canal_pedido_cd") != "DIGITAL"), 1).otherwise(0)
    )

    # frecuncia de pedidos
    f_frecuencia = df_historico.groupBy("cliente_id").agg(
        F.mean("dias_entre_pedidos").alias("dias_entre_pedidos_mean"),
        F.expr("percentile(dias_entre_pedidos, 0.5)").alias("dias_entre_pedidos_median"),
        F.min("dias_entre_pedidos").alias("dias_entre_pedidos_min"),
        F.max("dias_entre_pedidos").alias("dias_entre_pedidos_max")
    )
    return df_historico, f_frecuencia

def agregar_variables_canales(df_historico):
    # Conteo de cada canal
    f_canales = df_historico.groupBy("cliente_id").pivot("canal_pedido_cd").count().fillna(0)
    if "DIGITAL" not in f_canales.columns:
        f_canales = f_canales.withColumn("DIGITAL", F.lit(0))
    if "NO_DIGITAL" not in f_canales.columns:
        f_canales = f_canales.withColumn("NO_DIGITAL", F.lit(0))
    f_canales = (
        f_canales.withColumnRenamed("DIGITAL", "n_digital")
                 .withColumnRenamed("NO_DIGITAL", "n_no_digital")
                 .withColumn(
                     "prop_digital",
                     F.when(
                         (F.col("n_digital") + F.col("n_no_digital")) > 0,
                         F.col("n_digital") / (F.col("n_digital") + F.col("n_no_digital"))
                     ).otherwise(0)
                 )
    )
    return f_canales

def agregar_variables_valores(df_historico):
    # stadisticas variables numericas
    f_valores = (
        df_historico.groupBy("cliente_id").agg(
            F.sum("facturacion_usd_val").alias("facturacion_total"),
            F.avg("facturacion_usd_val").alias("facturacion_prom"),
            F.stddev("facturacion_usd_val").alias("facturacion_std"),
            F.sum("materiales_distintos_val").alias("materiales_distintos_total"),
            F.avg("materiales_distintos_val").alias("materiales_prom"),
            F.stddev("materiales_distintos_val").alias("materiales_std"),
            F.sum("cajas_fisicas").alias("cajas_fisicas_total"),
            F.avg("cajas_fisicas").alias("cajas_fisicas_prom"),
            F.stddev("cajas_fisicas").alias("cajas_fisicas_std"),
        )
        .fillna(0, subset=["facturacion_std", "materiales_std", "cajas_fisicas_std"])
    )
    return f_valores

def agregar_variables_fijas(df_historico):
    # variables que no cambian en el tiempo
    f_variables_fijas = (
        df_historico.groupBy("cliente_id").agg(
            F.first("pais_cd").alias("pais_cd"),
            F.first("region_comercial_txt").alias("region_comercial_txt"),
            F.first("tipo_cliente_cd").alias("tipo_cliente_cd"),
            F.first("madurez_digital_cd").alias("madurez_digital_val"),
            F.first("estrellas_txt").cast("int").alias("estrellas_val"),
            F.length(F.first("frecuencia_visitas_cd")).alias("frecuencia_visitas_val"),
            F.first("fecha_ultimo_pedido_target").cast("timestamp").alias("fecha_ultimo_pedido"),
        )
    )
    return f_variables_fijas

def agregar_variables_ruta_agencia(df_historico):
    # informacion de agencia y ruta
    df_historico = (
        df_historico.withColumn("agencia_num", regexp_extract(F.col("agencia_id"), r"A(\d+)$", 1).cast("int"))
                     .withColumn("ruta_num", regexp_extract(F.col("ruta_id"), r"R(\d+)$", 1).cast("int"))
    )

    f_ruta_agencia = (
        df_historico.groupBy("cliente_id").agg(
            F.first("agencia_num").alias("agencia_num"),
            F.first("ruta_num").alias("ruta_num")
        )
    )
    return df_historico, f_ruta_agencia

def construir_mdt_final(f_pedidos, f_canales, f_valores, f_frecuencia, f_ruta_agencia, f_variables_fijas, df_target):
    df_mdt = (
        f_pedidos
        .join(f_canales, "cliente_id", "left")
        .join(f_valores, "cliente_id", "left")
        .join(f_frecuencia, "cliente_id", "left")
        .join(f_ruta_agencia, "cliente_id", "left")
        .join(f_variables_fijas, "cliente_id", "left")
        .join(df_target.select("cliente_id", "target"), "cliente_id", "inner")
    )
    return df_mdt

def pipeline_completo(int_pedidos_clientes):
    # 1. Definir ventanas
    window_cliente = definir_ventanas()
    
    # 2. Enriquecer DF base
    df = enriquecer_df_base(int_pedidos_clientes, window_cliente)
    
    # 3. Crear target multiclase
    df, df_target = crear_target_multiclase(df)
    
    # 4. Crear histórico
    df_historico = crear_historico(df, df_target)
    
    # 5. Agregar variables
    f_pedidos = agregar_variables_pedidos(df_historico)
    df_historico, f_frecuencia = agregar_variables_frecuencia(df_historico)
    f_canales = agregar_variables_canales(df_historico)
    f_valores = agregar_variables_valores(df_historico)
    f_variables_fijas = agregar_variables_fijas(df_historico)
    df_historico, f_ruta_agencia = agregar_variables_ruta_agencia(df_historico)
    
    # 6. Construir MDT final
    df_mdt = construir_mdt_final(f_pedidos, f_canales, f_valores, f_frecuencia, f_ruta_agencia, f_variables_fijas, df_target)
    
    return df_mdt

df_mdt = pipeline_completo(int_pedidos_clientes)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio