## Modelo de deserción para clientes 

#### Nos enfocaremos en construir un modelo de aprendizaje supervisado, que prediga que clientes van a dejar de comprar un producto. Particularmente en productos de Azar.

#### A tener en cuenta:

1. Se usará un cluster de Spark que procesa el gran volumen de información transaccional que el las ventas de chance tienen. 
2. La base de datos es semiestructurada: MongoDB, por lo cual las consultas usarán la sintaxis documental.


##### Se importan las librerias de Spark

In [1]:
## Importa las librerías y módulos de PySpark
import pyspark
import pyspark.sql.functions as F

from pyspark import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import *
from pyspark.storagelevel import StorageLevel
from pyspark.sql.window import Window
from pyspark.sql import Window
from pyspark.sql.functions import sum as spark_sum, first, lit, datediff, count as spark_count, lag, when, col, udf, col, count, row_number, monotonically_increasing_id

from pyspark.sql.functions import *
from pyspark.sql.functions import sum

from datetime import datetime, date

import pandas as pd

import warnings
warnings.filterwarnings("ignore")

##### Configuración cluster Spark

In [17]:
def spark_config(local=True):
    # Configuracion de spark
    
    if local:
       conf = SparkConf().set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,com.oracle.database.jdbc:ojdbc8:19.18.0.0")\
        .setMaster("local[*]")\
        .setAppName("pruebas")
        
    else:
        conf = SparkConf().set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,com.oracle.database.jdbc:ojdbc8:19.18.0.0")\
         .set("spark.executor.memory", "4g")\
         .set("spark.driver.memory", "4g")\
         .set("spark.executor.cores", "2")\
         .set("spark.sql.codegen.wholeStage","false") \
         .set("spark.dynamicAllocation.enabled", "true") \
         .set("spark.sql.autoBroadcastJoinThreshold","-1") \
         .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
         .set("spark.dynamicAllocation.initialExecutors", "2") \
         .set("spark.dynamicAllocation.minExecutors", "2") \
         .set("spark.dynamicAllocation.maxExecutors", "4") \
         .set("spark.driver.port", "18100")\
         .set("spark.driver.blockManager.port", "19100") \
         .set("spark.driver.bindAddress","0.0.0.0") \
         .set("spark.driver.host", "10.192.85.54") \
         .setMaster("spark://10.10.1.15:7077")\
         .setAppName("desercion_extraccion")
        
              #.set("spark.sql.autoBroadcastJoinThreshold","-1")\
               #.set("spark.speculation","false") \
    # Se crea la sesion de spark
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    print('Se configuro Spark')
    return spark


In [18]:
spark = spark_config(local=False)

24/03/24 16:25:04 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
Se configuro Spark


In [7]:
spark

IP: 10.192.85.54 \
Puerto: 8083 - Master de Spark - Se muestran todas las corridas del cluster \
Puerto: 8090 - Jupyter - Jupyter \
Puerto: 4040-4050 - Puertos para ver los monitores 

Cuando se está en modo no local


#### Se crea la conexión a la base de datos

##### A tener en cuenta: La bd mongo tiene 2 esquemas, uno transaccional y uno de clientes. El de clientes se divide en clientes con errores de calidad y sin errores de calidad... Paso siguiente vamos a realizar las consultas a la bd mongo para extraer la información correspondiende a las transaccionales por clientes.


#### Esquema de transacciones


Los principales componentes de esta consulta son:

$match: Esta etapa se utiliza para filtrar documentos que cumplan con ciertos criterios. En este caso, se están buscando documentos que coincidan con las siguientes condiciones:

La "empresa_origen" debe ser "SUSUERTE S.A." \
La "fecha_transaccion" debe estar en el rango entre XXXXXXXXXXXXXXXXX.\

Otro $match:$ Esta es otra etapa $match que se utiliza para aplicar múltiples condiciones de filtrado. Las condiciones incluyen:

El campo "linea" debe ser "APUESTAS". Pues Chance es un producto de apuestas\
El campo "cliente_origen_identificacion.documento" no debe ser nulo.\
El campo "estado" debe ser "E". De Exitoso\
El campo "naturaleza" debe ser "D". De debito\
El campo "datos_venta_apuesta.cantidad_premios" no debe ser nulo. Información de los premios\
El campo "datos_venta_apuesta.vlr_premio" no debe ser nulo.

**$project:** Esta etapa se utiliza para proyectar o seleccionar las columnas específicas que se incluirán en el resultado final.\ 


En este caso, se seleccionan las siguientes columnas y se les da nombres más simples:


"ID_TRANSACCION"   Id de la transaccion \
"CLIENTE_DOCUMENTO" ": Numero documento del cliente \
"FECHA_TRX": Fecha de la transaccion \
"VALOR_TRANSACCION": Valor de la transaccion \
"CANTIDAD_PREMIOS": Si el cliente gana, se muestra la cantidad de premios \
"VALOR_PREMIO":  Valor de los premios \
"LINEA": Linea de la apuesta
"PRODUCTO": Codigo de producto de la apuesta \
"PRODUCTO_NOMB_PRODUCTO": Nombre de producto de la apuesta \
"SUBPRODUCTO": Codigo subproducto \
"PRODUCTO_NOMB_SUB_PRODUCTO": Nombre subproducto \

In [8]:
consulta_trx =  '''[
        {"$match":{ 
                "fecha_transaccion":{"$gte": ISODate("2023-06-01"), 
                "$lte": ISODate("2023-11-30")},
                "empresa_origen": "SUSUERTE S.A.",
                "datos_venta_apuesta":{"$exists": true}, 
                "cliente_origen_identificacion.documento":{"$ne":null},
                "estado":"E",
                "naturaleza":"D",
                "linea":"APUESTAS"

        }},

        {"$project":{
                _id: 0,
                "ID_TRANSACCION" : "$id_transaccion",
                "CLIENTE_DOCUMENTO":"$cliente_origen_identificacion.documento",
                "FECHA_TRX":"$fecha_transaccion",
                "VALOR_TRANSACCION": "$datos_venta_apuesta.vlr_transaccion",
                "CANTIDAD_PREMIOS":"$datos_venta_apuesta.cantidad_premios",
                "VALOR_PREMIO": "$datos_venta_apuesta.vlr_premio",  
                "LINEA":"$linea",
                "PRODUCTO":"$codigo_producto",
                "PRODUCTO_NOMB_PRODUCTO":"$tipo_producto",
                "SUBPRODUCTO":"$codigo_sub_producto",
                "PRODUCTO_NOMB_SUB_PRODUCTO":"$tipo_sub_producto"
        }}
        ] '''


##### Lectura de query 


##### Definicion del Schema de la bd mongo

In [9]:
# nombre de la base de datos
DB= 'TransaccionesMD'
# nombre de la coleccion
Cll= 'transactions'
# URL para conectarse a la base de datos
URI= f'mongodb://CodAdmin:C0d3sa*202Z@10.192.85.46:27017/?authSource=admin&authMechanism=SCRAM-SHA-1&readPreference=secondary'


# Lectura de query 
def read_query(consulta, DB, Cll, URI):
    # Lectura de datos
    df_origen = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.input.uri",URI) \
    .option("spark.mongodb.input.database",DB) \
    .option("spark.mongodb.input.collection",Cll)\
    .option("spark.mongodb.input.partitioner", "MongoSinglePartitioner") \
    .option("spark.mongodb.input.partitionerOptions.partitionKey","_id")\
    .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 10)\
    .option("spark.mongodb.input.partitioner", "MongoShardedPartitioner") \
    .option("pipeline", consulta)\
    .load()

    print(df_origen.printSchema())
    return df_origen

schema = StructType([StructField('ID_TRANSACCION', StringType(), True),
                     StructField('FECHA_TRX', TimestampType(), True),
                     StructField('VALOR_TRANSACCION', IntegerType(), True),
                     StructField('CANTIDAD_PREMIOS', IntegerType(), True),
                     StructField('VALOR_PREMIO', IntegerType(), True),
                     #StructField('LINEA', StringType(), True),
                     StructField('CLIENTE_DOCUMENTO', StringType(), True),
                     StructField('PRODUCTO_NOMB_PRODUCTO', StringType(), True),  
                     StructField('SUBPRODUCTO', StringType(), True),
                     StructField('PRODUCTO_NOMB_SUB_PRODUCTO', StringType(), True),
                     StructField('PRODUCTO', StringType(), True),
                     ])

#### Se extraen los datos y se ponen en memoria

In [10]:
%%time
df_trx = read_query(consulta_trx, DB, Cll, URI)
df_trx = df_trx.repartition(200)
df_trx = df_trx.persist(StorageLevel.DISK_ONLY_2)

                                                                                

root
 |-- CANTIDAD_PREMIOS: integer (nullable = true)
 |-- CLIENTE_DOCUMENTO: string (nullable = true)
 |-- FECHA_TRX: timestamp (nullable = true)
 |-- ID_TRANSACCION: string (nullable = true)
 |-- LINEA: string (nullable = true)
 |-- PRODUCTO: string (nullable = true)
 |-- PRODUCTO_NOMB_PRODUCTO: string (nullable = true)
 |-- PRODUCTO_NOMB_SUB_PRODUCTO: string (nullable = true)
 |-- SUBPRODUCTO: string (nullable = true)
 |-- VALOR_PREMIO: integer (nullable = true)
 |-- VALOR_TRANSACCION: double (nullable = true)

None
CPU times: user 12.3 ms, sys: 1.68 ms, total: 14 ms
Wall time: 9.48 s


In [11]:
df_trx = df_trx.orderBy(['FECHA_TRX', 'ID_TRANSACCION', 'CLIENTE_DOCUMENTO'])

df_trx = df_trx.withColumn("CONTADOR", monotonically_increasing_id()) #Creamos un contador para no duplicar la información despues de cualquier join

In [12]:
%%time
df_trx.select(['FECHA_TRX', 'ID_TRANSACCION', 'CLIENTE_DOCUMENTO', 'CONTADOR', 'PRODUCTO_NOMB_SUB_PRODUCTO']).show()

24/03/23 19:44:58 WARN MongoShardedPartitioner: Collection 'TransaccionesMD.transactions' does not appear to be sharded, continuing with a single partition. To split the collections into multiple partitions connect to the MongoDB node directly


[Stage 7:>                                                          (0 + 1) / 1]

+-------------------+--------------+-----------------+--------+--------------------------+
|          FECHA_TRX|ID_TRANSACCION|CLIENTE_DOCUMENTO|CONTADOR|PRODUCTO_NOMB_SUB_PRODUCTO|
+-------------------+--------------+-----------------+--------+--------------------------+
|2023-06-01 00:00:00|    5989247495|       1054991483|       0|          ASTRO MILLONARIO|
|2023-06-01 00:00:00|    5989247495|       1054991483|       1|          ASTRO MILLONARIO|
|2023-06-01 00:00:00|    5989247495|       1054991483|       2|          ASTRO MILLONARIO|
|2023-06-01 00:00:00|    5989247495|       1054991483|       3|          ASTRO MILLONARIO|
|2023-06-01 00:00:00|    5989247505|       1054991483|       4|          ASTRO MILLONARIO|
|2023-06-01 00:00:00|    5989247505|       1054991483|       5|          ASTRO MILLONARIO|
|2023-06-01 00:00:00|    5989247505|       1054991483|       6|          ASTRO MILLONARIO|
|2023-06-01 00:00:00|    5989247505|       1054991483|       7|          ASTRO MILLONARIO|

                                                                                

In [13]:
df_trx.count()

                                                                                

49655861

##### Documento que homologa los productos 

In [14]:
df_productos = pd.read_csv("./Homologadores/productos_de_seta.csv", sep=',')
df_productos = df_productos[['PRODUCTO_NOMB_PRODUCTO','PRODUCTO_NOMB_SUB_PRODUCTO', 'PRODUCTO_UNIFICADO', 'TIPO_SERVICIO']]
df_productos = spark.createDataFrame(df_productos)

#### Se cruza con la base de transacciones

In [15]:
df_trx_1 = df_trx.join(df_productos, on=["PRODUCTO_NOMB_PRODUCTO", "PRODUCTO_NOMB_SUB_PRODUCTO"], how="left").dropDuplicates(["CONTADOR"])

#### Filtro solo Chance

In [16]:
df_trx_1_sel = df_trx_1.filter(col("TIPO_SERVICIO") == "CHANCE")
df_trx_1_sel.count()

                                                                                

22742878

In [17]:
# Miramos si hay algun producto unificado con valores nulos o vacios
# df_trx_1_sel.select(when(col("PRODUCTO_UNIFICADO").isNull() | (col("PRODUCTO_UNIFICADO") == ""), 1).otherwise(0)).collect()[0][0]

In [18]:
agregacion_df = df_trx_1_sel.groupBy("PRODUCTO_NOMB_SUB_PRODUCTO").agg(
                       count("ID_TRANSACCION").alias("num_transacciones")).sort('num_transacciones').show()

                                                                                

+--------------------------+-----------------+
|PRODUCTO_NOMB_SUB_PRODUCTO|num_transacciones|
+--------------------------+-----------------+
|                       UÑA|             1973|
|                      PATA|           335525|
|                 DIRECTO 4|         10528264|
|                 DIRECTO 3|         11877116|
+--------------------------+-----------------+



In [19]:
df_trx_1_select = df_trx_1_sel.select(['CLIENTE_DOCUMENTO', 'FECHA_TRX', 'ID_TRANSACCION', 'VALOR_TRANSACCION', 'VALOR_PREMIO', 'CANTIDAD_PREMIOS'])\
                              .orderBy(['FECHA_TRX', 'ID_TRANSACCION', 'CLIENTE_DOCUMENTO'])

### Creación de las variables sinteticas:

##Ver la frecuencia de compra de los clientes


##Frecuencia\
numero_trx_realizadas\
numero_trx_ultima_semana\
numero_trx_ultimo_mes\
numero_veces_ganadas


##Moneda\
valor_total_transado\
valor_ganancias\
valor_promedio_transaccion\
valor_mediana_transaccion\
valor_maximo_transado\
valor_minimo_transado\

##Recencia\
dias_transcurridos_ultima_apuesta\
dias_transcurridos_primera_apuesta\
dias_transcurridos_apuesta_maxima\
dias_transcurridos_apuesta_minima\
dias_transcurridos_desde_ultima_ganancia\
dias_promedio_entre_apuestas\
dias_mediana_entre_apuestas\
días_maximo_entre_apuestas\


##Sociodemografico
sexo\
rango_edad\

##Variable_respuesta
churn

#### Iniciamos calculando las variables respecto a las frecuencias:

#### Numero de transacciones realizadas

In [20]:
#numero_trx_realizadas:
# Realizar groupBy y contar ocurrencias por cliente y transacción
gb_1 = df_trx_1_select.groupBy("CLIENTE_DOCUMENTO", "ID_TRANSACCION").agg(count("*").cast("int").alias("CONTEO"))\
                      .orderBy("CLIENTE_DOCUMENTO")


num_trx_total = gb_1.groupBy("CLIENTE_DOCUMENTO").agg(count('CONTEO').alias("SUMA_CONTEO"))

#### Numero de transacciones ultima semana

In [21]:
#Numero_trx_ultima_semana
gb_2 = df_trx_1_select.withColumn("SEMANA_ANIO", weekofyear("FECHA_TRX"))

# Encontrar el valor máximo de 'semana_del_anio'
max_semana = gb_2.select(max("SEMANA_ANIO")).collect()[0][0]

# Filtrar el DataFrame para obtener solo las filas de la semana máxima
df_semana_maxima = gb_2.filter(col("SEMANA_ANIO") == max_semana)

#trx_ultima_semana
trx_ultima_semana = df_semana_maxima.groupBy('CLIENTE_DOCUMENTO')\
                                    .agg(count('SEMANA_ANIO').alias('TRX_ULTIMA_SEMANA'))

                                                                                

#### Numero de transacciones ultimo mes

In [22]:
#numero_trx_ultimo_mes
gb_3 = df_trx_1_select.withColumn("MES_ANIO", format_string("%02d-%d", month("FECHA_TRX"), year("FECHA_TRX")))

# Encontrar el valor máximo de 'semana_del_anio'
max_mes = gb_3.select(max("MES_ANIO")).collect()[0][0]

# Filtrar el DataFrame para obtener solo las filas de la semana máxima
df_mes_max = gb_3.filter(col("MES_ANIO") == max_mes)

#trx_ultima_semana
trx_ultimo_mes = df_mes_max.groupBy('CLIENTE_DOCUMENTO')\
                           .agg(count('MES_ANIO').alias('TRX_ULTIMO_MES'))

                                                                                

#### Numero de veces ganadas

In [23]:
#numero_veces_ganadas
gb_4 = df_trx_1_select.filter(col("VALOR_PREMIO") != 0)

num_veces_ganadas = gb_4.groupBy('CLIENTE_DOCUMENTO')\
                           .agg(count('ID_TRANSACCION').alias('NUM_VECES_GANADAS'))

#### Ahora unimos las BD

In [24]:
df_final_1 = num_trx_total.join(trx_ultima_semana, ['CLIENTE_DOCUMENTO'], "left")
df_final_2 = df_final_1.join(trx_ultimo_mes, ['CLIENTE_DOCUMENTO'], "left")
df_final_3 = df_final_2.join(num_veces_ganadas, ['CLIENTE_DOCUMENTO'], "left")

#### Calculamos las variables respecto a la moneda

In [25]:
# agrupando por tipo y numero de documento

#valor_total_transado\
#valor_ganancias\
#valor_promedio_transaccion\
#valor_mediana_transaccion\
#valor_maximo_transado\
#valor_minimo_transado\

df_agg_clientes = df_trx_1_select.groupBy('CLIENTE_DOCUMENTO')\
                                 .agg(
                                        sum('VALOR_TRANSACCION').alias('VALOR_TOTAL'),
                                        mean('VALOR_TRANSACCION').alias('VALOR_PROM'),
                                        expr("approx_percentile(VALOR_TRANSACCION, 0.5)").alias("VALOR_MEDIANO"),
                                        min('VALOR_TRANSACCION').alias('VALOR_MIN'),
                                        max('VALOR_TRANSACCION').alias('VALOR_MAX'),
                                        sum('VALOR_PREMIO').alias('VALOR_PREMIOS'),
                                        min("FECHA_TRX").alias('FECHA_PRIMERA_APUESTA'),
                                        max("FECHA_TRX").alias('FECHA_ULTIMA_APUESTA')
                                  )

#### Recencia

In [26]:
fecha_def_recencia = date(2023, 11, 30)

In [27]:
# recencia
df_agg_clientes = df_agg_clientes.withColumn("DIAS_TRANSCURRIDOS_ULTIMA_APUESTA", datediff(lit(fecha_def_recencia), col("FECHA_ULTIMA_APUESTA"))) # current_date()
# primera apuesta dentro del periodo de tiempo
df_agg_clientes = df_agg_clientes.withColumn("DIAS_TRANSCURRIDOS_PRIMERA_APUESTA", datediff(lit(fecha_def_recencia), col("FECHA_PRIMERA_APUESTA"))) # current_date()


In [28]:
df_final_4 = df_final_3.join(df_agg_clientes, ['CLIENTE_DOCUMENTO'], "left")

In [29]:
#df_final_4.count() == df_final_3.count()

#### Ahora calculamos las variables respecto a los periodos entre apuestas

* dias_transcurridos_apuesta_maxima
* dias_transcurridos_apuesta_minima
* dias_transcurridos_desde_ultima_ganancia
* dias_promedio_entre_apuestas
* dias_mediana_entre_apuestas
* días_maximo_entre_apuestas

In [30]:
# Definir una ventana particionada por cliente y producto, y ordenada por fecha_compra
windowSpec = Window.partitionBy("CLIENTE_DOCUMENTO").orderBy("FECHA_TRX")

# Agregar una columna con la fecha de la compra anterior
df_1 = df_trx_1_select.withColumn("FECHA_COMPRA_ANTERIOR", lag("FECHA_TRX").over(windowSpec))

# Calcular los días entre la compra actual y la compra anterior
df_2 = df_1.withColumn("DIAS_ENTRE_COMPRA", datediff("FECHA_TRX", "FECHA_COMPRA_ANTERIOR"))

# Filtrar para remover la primera compra de cada cliente y producto, ya que no tiene compra anterior
df_2 = df_2.filter(col("DIAS_ENTRE_COMPRA").isNotNull())

In [31]:
# agrupando por tipo y numero de documento
df_dias_entre_apuestas= df_2.groupBy('CLIENTE_DOCUMENTO')\
                            .agg(mean('DIAS_ENTRE_COMPRA').alias('DIAS_PROM_ENTRE_APUESTAS'),
                                 expr("approx_percentile(DIAS_ENTRE_COMPRA, 0.5)").alias("DIAS_MEDIANOS_ENTRE_APUESTAS"),
                                 min('DIAS_ENTRE_COMPRA').alias('DIAS_MIN_ENTRE_APUESTAS'),
                                 max('DIAS_ENTRE_COMPRA').alias('DIAS_MAX_ENTRE_APUESTAS')
                                )

In [32]:
# Paso 1: Encontrar la apuesta máxima por cliente

# Definir una ventana particionada por cliente
windowSpec = Window.partitionBy("CLIENTE_DOCUMENTO")

# Encontrar el valor máximo de apuesta para cada cliente y la fecha correspondiente
df_max_apuesta = df_trx_1_select.withColumn("max_valor_apuesta", max("VALOR_TRANSACCION").over(windowSpec))\
                   .where(col("VALOR_TRANSACCION") == col("max_valor_apuesta"))

# Eliminar duplicados en caso de que un cliente tenga más de una apuesta máxima con el mismo valor
df_max_apuesta = df_max_apuesta.dropDuplicates(["CLIENTE_DOCUMENTO","max_valor_apuesta"])

# Paso 2: Calcular los días transcurridos desde la apuesta máxima hasta la fecha actual
df_max_apuesta = df_max_apuesta.withColumn("dias_transcurridos_apuesta_max", datediff(lit(fecha_def_recencia), col("FECHA_TRX")))

# Seleccionar columnas relevantes para mostrar
df_max_apuesta = df_max_apuesta.select("CLIENTE_DOCUMENTO", "max_valor_apuesta", "dias_transcurridos_apuesta_max")

In [33]:
# Paso 1: Encontrar la apuesta minima por cliente
# Definir una ventana particionada por cliente
windowSpec = Window.partitionBy("CLIENTE_DOCUMENTO")

# Encontrar el valor máximo de apuesta para cada cliente y la fecha correspondiente
df_min_apuesta = df_trx_1_select.withColumn("min_valor_apuesta", min("VALOR_TRANSACCION").over(windowSpec))\
                   .where(col("VALOR_TRANSACCION") == col("min_valor_apuesta"))

# Eliminar duplicados en caso de que un cliente tenga más de una apuesta máxima con el mismo valor
df_min_apuesta = df_min_apuesta.dropDuplicates(["CLIENTE_DOCUMENTO","min_valor_apuesta"])

# Paso 2: Calcular los días transcurridos desde la apuesta máxima hasta la fecha actual
df_min_apuesta = df_min_apuesta.withColumn("dias_transcurridos_apuesta_min", datediff(lit(fecha_def_recencia), col("FECHA_TRX")))

# Seleccionar columnas relevantes para mostrar
df_min_apuesta = df_min_apuesta.select("CLIENTE_DOCUMENTO", "min_valor_apuesta", "dias_transcurridos_apuesta_min")

In [34]:
# Definir una ventana particionada por cliente
windowSpec = Window.partitionBy("CLIENTE_DOCUMENTO")

# Encontrar la fecha de la última ganancia para cada cliente
ultima_ganancia = df_trx_1_select.filter(col('CANTIDAD_PREMIOS')> 0).withColumn("fecha_ultima_ganancia", max("FECHA_TRX").over(windowSpec))



# Calcular los días transcurridos desde la última ganancia hasta la fecha actual
ultima_ganancia = ultima_ganancia.withColumn("dias_transcurridos_desde_ultima_ganancia", datediff(lit(fecha_def_recencia), col("fecha_ultima_ganancia")))
ultima_ganancia = ultima_ganancia.dropDuplicates(["CLIENTE_DOCUMENTO","dias_transcurridos_desde_ultima_ganancia"])

                                  
# Seleccionar columnas relevantes para mostrar (eliminando duplicados si es necesario)
ultima_ganancia = ultima_ganancia.select("CLIENTE_DOCUMENTO","fecha_ultima_ganancia", "dias_transcurridos_desde_ultima_ganancia").dropDuplicates(["CLIENTE_DOCUMENTO"])

In [35]:
df_final_5 = df_final_4.join(df_dias_entre_apuestas, ['CLIENTE_DOCUMENTO'], "left")
df_final_5 = df_final_5.join(df_max_apuesta, ['CLIENTE_DOCUMENTO'], "left")
df_final_5 = df_final_5.join(df_min_apuesta, ['CLIENTE_DOCUMENTO'], "left")
df_final_5 = df_final_5.join(ultima_ganancia, ['CLIENTE_DOCUMENTO'], "left")

In [36]:
df_final_4.count() == df_final_5.count()

                                                                                

True

In [37]:
df_final_5.show(2,vertical = True)

[Stage 685:>                                                        (0 + 1) / 1]

-RECORD 0-------------------------------------------------------
 CLIENTE_DOCUMENTO                        | 1327088             
 SUMA_CONTEO                              | 162                 
 TRX_ULTIMA_SEMANA                        | 6                   
 TRX_ULTIMO_MES                           | 48                  
 NUM_VECES_GANADAS                        | null                
 VALOR_TOTAL                              | 363800.0            
 VALOR_PROM                               | 528.7790697674419   
 VALOR_MEDIANO                            | 400.0               
 VALOR_MIN                                | 125.0               
 VALOR_MAX                                | 6000.0              
 VALOR_PREMIOS                            | 0                   
 FECHA_PRIMERA_APUESTA                    | 2023-06-03 00:00:00 
 FECHA_ULTIMA_APUESTA                     | 2023-11-30 00:00:00 
 DIAS_TRANSCURRIDOS_ULTIMA_APUESTA        | 0                   
 DIAS_TRANSCURRIDOS_PRIME

                                                                                

In [38]:
df_final_5.count()

                                                                                

300871

In [39]:
df_final = df_final_5.toPandas()

                                                                                

In [40]:
df_final.to_parquet('/home/notebooks/santiago_castro_/df_trx_6m.parquet', index=False)

In [88]:
df_final.head(3)

Unnamed: 0,CLIENTE_DOCUMENTO,SUMA_CONTEO,TRX_ULTIMA_SEMANA,TRX_ULTIMO_MES,NUM_VECES_GANADAS,VALOR_TOTAL,VALOR_PROM,VALOR_MEDIANO,VALOR_MIN,VALOR_MAX,...,DIAS_PROM_ENTRE_APUESTAS,DIAS_MEDIANOS_ENTRE_APUESTAS,DIAS_MIN_ENTRE_APUESTAS,DIAS_MAX_ENTRE_APUESTAS,max_valor_apuesta,dias_transcurridos_apuesta_max,min_valor_apuesta,dias_transcurridos_apuesta_min,fecha_ultima_ganancia,dias_transcurridos_desde_ultima_ganancia
0,12345678,1125,62.0,523.0,1.0,3630000.07,1192.509878,500.0,36.0,20000.0,...,0.059809,0.0,0.0,52.0,20000.0,5,36.0,157,2023-07-12,141.0
1,10000591,6,,2.0,,29000.0,4142.857143,5000.0,2000.0,6000.0,...,28.166667,0.0,0.0,105.0,6000.0,7,2000.0,176,NaT,
2,1000064446,2,,,,20000.0,10000.0,10000.0,10000.0,10000.0,...,0.0,0.0,0.0,0.0,10000.0,70,10000.0,70,NaT,


### Ahora vamos a extraer la información de clientes

In [2]:
start_date = '2023-06-01'
end_date = '2023-11-30'
companies_clientes = 'SETA_SUSUERTE SUSUERTE'

In [3]:
consulta_clientes_optimizada = """
[
    {
        "$match": {
            "$text": {
                "$search": "companies_clientes"
            }
        }
    },
    {
        "$project": {
            "_id": 1,
            "CLIENTE_DOCUMENTO": "$identificaciones.documento",
            "CLIENTE_TIPO_DOCUMENTO": "$identificaciones.tipo_documento",
            "datos_enrolados_filtrados": {
                "$filter": {
                    "input": "$datos_enrolados",
                    "as": "datos_enrolado",
                    "cond": {
                        "$and": [
                            { "$ne": ["$$datos_enrolado.fecha_enrolamiento", null] },
                            { "$ne": ["$$datos_enrolado.fecha_enrolamiento", ""] }
                        ]
                    }
                }
            },
            "ANIO_NACIMIENTO": "$ano_nacimiento",
            "MES_NACIMIENTO": "$mes_nacimiento",
            "SEXO": "$sexo",
            "COD_ACTIVIDAD_ECONOMICA": "$actividad_economica.codigo",
            "ACTIVIDAD_ECONOMICA": "$actividad_economica.descripcion"
        }
    },
    {
        "$project": {
            "_id": 1,
            "CLIENTE_DOCUMENTO": {$first: "$CLIENTE_DOCUMENTO"},
            "CLIENTE_TIPO_DOCUMENTO": {$first: "$CLIENTE_TIPO_DOCUMENTO"},
            "FECHA_ENROLAMIENTO": {$first:"$datos_enrolados_filtrados.fecha_enrolamiento"},
            "EMPRESA": {
                "$arrayElemAt": ["$datos_enrolados_filtrados.empresa", 0]
            },
            "ANIO_NACIMIENTO": 1,
            "MES_NACIMIENTO": 1,
            "SEXO": 1,
            "COD_ACTIVIDAD_ECONOMICA": {$first: "$COD_ACTIVIDAD_ECONOMICA"},
            "ACTIVIDAD_ECONOMICA": {$first: "$ACTIVIDAD_ECONOMICA"}
        }
    }
]

"""

shcli = StructType([StructField('_id', StringType(), True),
                    StructField('CLIENTE_DOCUMENTO', StringType(), True), 
                    StructField('CLIENTE_TIPO_DOCUMENTO',StringType(), True),
                    StructField('FECHA_ENROLAMIENTO',TimestampType(), True),
                    StructField('EMPRESA',StringType(), True),
                    StructField('ANIO_NACIMIENTO',StringType(), True),
                    StructField('MES_NACIMIENTO',StringType(), True),
                    StructField('SEXO',StringType(), True),
                    StructField('COD_ACTIVIDAD_ECONOMICA',StringType(), True),
                    StructField('ACTIVIDAD_ECONOMICA',StringType(), True)
                    
                   ])

consulta_clientes_optimizada = consulta_clientes_optimizada.replace("companies_clientes", companies_clientes)


In [4]:
consulta_clientes_error_optimizada = """
[
    {
        "$match": {
            "$text": {
                "$search": "companies_clientes"
            }
        }
    },
    {
        "$project": {
            "_id": 1,
            "CLIENTE_DOCUMENTO": "$client.identificaciones.documento",
            "CLIENTE_TIPO_DOCUMENTO": "$client.identificaciones.tipo_documento",
            "datos_enrolados_filtrados": {
                "$filter": {
                    "input": "$client.datos_enrolados",
                    "as": "datos_enrolado",
                    "cond": {
                        "$and": [
                            { "$ne": ["$$datos_enrolado.fecha_enrolamiento", null] },
                            { "$ne": ["$$datos_enrolado.fecha_enrolamiento", ""] }
                        ]
                    }
                }
            },
            "ANIO_NACIMIENTO": "$client.ano_nacimiento",
            "MES_NACIMIENTO": "$client.mes_nacimiento",
            "SEXO": "$client.sexo",
            "COD_ACTIVIDAD_ECONOMICA": "$client.actividad_economica.codigo",
            "ACTIVIDAD_ECONOMICA": "$client.actividad_economica.descripcion"
        }
    },
    {
        "$project": {
            "_id": 1,
            "CLIENTE_DOCUMENTO": {$first: "$CLIENTE_DOCUMENTO"},
            "CLIENTE_TIPO_DOCUMENTO": {$first: "$CLIENTE_TIPO_DOCUMENTO"},
            "FECHA_ENROLAMIENTO": {$first:"$datos_enrolados_filtrados.fecha_enrolamiento"},
            "EMPRESA": {
                "$arrayElemAt": ["$datos_enrolados_filtrados.empresa", 0]
            },
            "ANIO_NACIMIENTO": 1,
            "MES_NACIMIENTO": 1,
            "SEXO": 1,
            "COD_ACTIVIDAD_ECONOMICA": {$first: "$COD_ACTIVIDAD_ECONOMICA"},
            "ACTIVIDAD_ECONOMICA": {$first: "$ACTIVIDAD_ECONOMICA"}
        }
    }
]

"""

# ,
#      {
#         "$limit": 10
#     }
    

sherr = StructType([StructField('_id', StringType(), True),
                    StructField('CLIENTE_DOCUMENTO', StringType(), True), 
                    StructField('CLIENTE_TIPO_DOCUMENTO',StringType(), True),
                    StructField('FECHA_ENROLAMIENTO',TimestampType(), True),
                    StructField('EMPRESA',StringType(), True),
                    StructField('ANIO_NACIMIENTO',StringType(), True),
                    StructField('MES_NACIMIENTO',StringType(), True),
                    StructField('SEXO',StringType(), True),
                    StructField('COD_ACTIVIDAD_ECONOMICA',StringType(), True),
                    StructField('ACTIVIDAD_ECONOMICA',StringType(), True)
                    
                   ])

consulta_clientes_error_optimizada = consulta_clientes_error_optimizada.replace("companies_clientes", companies_clientes)

In [5]:
import config

2024-03-24 2023-12-25


In [6]:
from spark_util import SparkHandler

In [7]:
spark_session = SparkHandler(f'desercion', local = False)

:: loading settings :: url = jar:file:/opt/miniconda/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/santiagocastro/.ivy2/cache
The jars for the packages stored in: /home/santiagocastro/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
com.oracle.database.jdbc#ojdbc8 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-52a67417-bddd-48aa-8c74-a255169ac2f7;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found com.oracle.database.jdbc#ojdbc8;19.18.0.0 in central
:: resolution report :: resolve 350ms :: artifacts dl 12ms
	:: modules in use:
	com.oracle.database.jdbc#ojdbc8;19.18.0.0 from central in [default]
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spa

24/03/24 16:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/24 16:19:13 WARN Utils: Service 'sparkDriver' could not bind on port 18100. Attempting port 18101.
24/03/24 16:19:13 WARN Utils: Service 'sparkDriver' could not bind on port 18101. Attempting port 18102.
24/03/24 16:19:15 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 19100. Attempting port 19101.
24/03/24 16:19:15 WARN Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
desercion


In [8]:
dict_connection_mongo = config.dict_connection_mongo
clientes_sup = dict_connection_mongo['CLIENTES_SUP']
clientes_error = dict_connection_mongo['CLIENTES_ERROR']

In [9]:
df_clientes = spark_session.read_query(
        consulta_clientes_optimizada, 
        clientes_sup['DB'], 
        clientes_sup['CLL'], 
        clientes_sup['URI'],
        shcli
    ) 


df_clientes_error = spark_session.read_query(
    consulta_clientes_error_optimizada, 
    clientes_error['DB'], 
    clientes_error['CLL'], 
    clientes_error['URI'], 
    sherr
)

root
 |-- _id: string (nullable = true)
 |-- CLIENTE_DOCUMENTO: string (nullable = true)
 |-- CLIENTE_TIPO_DOCUMENTO: string (nullable = true)
 |-- FECHA_ENROLAMIENTO: timestamp (nullable = true)
 |-- EMPRESA: string (nullable = true)
 |-- ANIO_NACIMIENTO: string (nullable = true)
 |-- MES_NACIMIENTO: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- COD_ACTIVIDAD_ECONOMICA: string (nullable = true)
 |-- ACTIVIDAD_ECONOMICA: string (nullable = true)

root
 |-- _id: string (nullable = true)
 |-- CLIENTE_DOCUMENTO: string (nullable = true)
 |-- CLIENTE_TIPO_DOCUMENTO: string (nullable = true)
 |-- FECHA_ENROLAMIENTO: timestamp (nullable = true)
 |-- EMPRESA: string (nullable = true)
 |-- ANIO_NACIMIENTO: string (nullable = true)
 |-- MES_NACIMIENTO: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- COD_ACTIVIDAD_ECONOMICA: string (nullable = true)
 |-- ACTIVIDAD_ECONOMICA: string (nullable = true)



In [10]:
df_clientes_persist =  df_clientes.persist(StorageLevel.DISK_ONLY_2)
df_clientes_error_p = df_clientes_error.persist(StorageLevel.DISK_ONLY_2)

In [58]:
#df_clientes_persist.unionByName(df_clientes_error_p, allowMissingColumns=True)

DataFrame[ACTIVIDAD_ECONOMICA: string, ANIO_NACIMIENTO: string, CLIENTE_DOCUMENTO: string, CLIENTE_TIPO_DOCUMENTO: string, COD_ACTIVIDAD_ECONOMICA: int, EMPRESA: string, FECHA_ENROLAMIENTO: timestamp, MES_NACIMIENTO: string, SEXO: string, _id: struct<oid:string>]

In [36]:
df_clientes_mod = df_clientes_persist.withColumn("valid", F.lit(1))\
.withColumn("CLIENTE_DOCUMENTO", F.regexp_replace("CLIENTE_DOCUMENTO", "^0+", ""))\
.withColumn("CLIENTE_DOCUMENTO", regexp_replace("CLIENTE_DOCUMENTO", "[^a-zA-Z0-9]", ""))\
.dropDuplicates(['CLIENTE_DOCUMENTO'])


In [37]:
df_clientes_error_mod = df_clientes_error_p.withColumn("valid", F.lit(0))\
.withColumn("CLIENTE_DOCUMENTO", F.regexp_replace("CLIENTE_DOCUMENTO", "^0+", ""))\
.withColumn("CLIENTE_DOCUMENTO", F.regexp_replace("CLIENTE_DOCUMENTO", "^\.", ""))\
.withColumn("CLIENTE_DOCUMENTO", regexp_replace("CLIENTE_DOCUMENTO", "[^a-zA-Z0-9]", ""))\
.dropDuplicates(['CLIENTE_DOCUMENTO'])



In [38]:
df_union_cls = df_clientes_mod.union(df_clientes_error_mod)

# Filtrar las filas con documento menor a 4 digitos
df_union_cls = df_union_cls.filter(length("CLIENTE_DOCUMENTO") >= 5)
df_union_cls = df_union_cls.filter(~col('CLIENTE_DOCUMENTO').isin(
    ['012345', '0000000', '012345678', '0000000', '12345', '99999', 
     '9999999', '50000', '1234567', '20000', '10000', "1234567890", "0987654321"]))


df_union_cls = df_union_cls.dropDuplicates(['CLIENTE_DOCUMENTO'])

df_union_cls = df_union_cls.filter(col('FECHA_ENROLAMIENTO').isNotNull())

In [21]:
df_final = pd.read_parquet('/home/notebooks/santiago_castro_/df_trx_6m.parquet')

spark_df = spark.createDataFrame(df_final)

In [43]:
spark_df.count()

24/03/24 17:34:59 WARN TaskSetManager: Stage 5 contains a task of very large size (13186 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

300871

In [40]:
df_union_select = df_union_cls.select('CLIENTE_DOCUMENTO', 'FECHA_ENROLAMIENTO', 'ANIO_NACIMIENTO',
                                       'MES_NACIMIENTO', 'SEXO', 'ACTIVIDAD_ECONOMICA')

df_total = spark_df.join(df_union_select,['CLIENTE_DOCUMENTO'], how='left')
df_total_p= df_total.persist(StorageLevel.DISK_ONLY_2)

In [42]:
df_total_p.show(2,vertical = True)

24/03/24 16:45:49 WARN TaskSetManager: Stage 0 contains a task of very large size (13186 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

-RECORD 0--------------------------------------------------------
 CLIENTE_DOCUMENTO                        | 10000591             
 SUMA_CONTEO                              | 6                    
 TRX_ULTIMA_SEMANA                        | NaN                  
 TRX_ULTIMO_MES                           | 2.0                  
 NUM_VECES_GANADAS                        | NaN                  
 VALOR_TOTAL                              | 29000.0              
 VALOR_PROM                               | 4142.857142857143    
 VALOR_MEDIANO                            | 5000.0               
 VALOR_MIN                                | 2000.0               
 VALOR_MAX                                | 6000.0               
 VALOR_PREMIOS                            | 0                    
 FECHA_PRIMERA_APUESTA                    | 2023-06-07 00:00:00  
 FECHA_ULTIMA_APUESTA                     | 2023-11-23 00:00:00  
 DIAS_TRANSCURRIDOS_ULTIMA_APUESTA        | 7                    
 DIAS_TRAN

In [44]:
df_total_p.count()

                                                                                

300871

In [45]:
df_total_final = df_total_p.withColumn("CLIENTE_DOCUMENTO", F.regexp_replace("CLIENTE_DOCUMENTO", "^0+", ""))\
.withColumn("CLIENTE_DOCUMENTO", F.regexp_replace("CLIENTE_DOCUMENTO", "^\.", ""))

In [48]:
df_export = df_total_final.toPandas()
df_export.to_parquet('/home/notebooks/santiago_castro_/df_total_6m.parquet', index=False)

                                                                                