# Notebook de Silver a Gold con medolo de datos para el dataset

In [ ]:
from pyspark.sql.functions import col, trim, upper, when, lit, current_timestamp, \
                                  substring, regexp_replace, unix_timestamp, \
                                  year, month, dayofmonth, dayofweek, avg, min, max, \
                                  floor, hour, date_format, desc, lag, abs, \
                                  lead, datediff, when, window, mean, stddev, skewness, kurtosis, \
                                  sum, to_date, count, concat, countDistinct, \
                                  asc, filter

from pyspark.sql.window import Window
from pyspark.ml.feature import QuantileDiscretizer

from pyspark.sql.types import StringType, IntegerType, DoubleType, LongType, TimestampType

import plotly.express as px
import numpy as np

bronze_path = "Files/bronze/transacciones_servicios/"
silver_path = "Files/silver/transacciones_servicios/"

dataset_recurrencia = "Files/gold/dataset_recurrencia_servicios/"


notebook_version = "1.0"

print(f"Ruta Bronze: {bronze_path}")
print(f"Ruta Silver: {silver_path}")

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 106, Finished, Available, Finished)

Ruta Bronze: Files/bronze/transacciones_servicios/
Ruta Silver: Files/silver/transacciones_servicios/


# Carga de datos desde Silver

In [ ]:
#cargamos los datos de transacciones a terceros 
df_transacciones_servicios = spark.read.format('delta')\
                                      .load(silver_path)

print(f"Lectura completada. Se leyeron {df_transacciones_servicios.count()} registros.")

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 107, Finished, Available, Finished)

Lectura completada. Se leyeron 6537744 registros.


In [ ]:
# calculamos la cantidad de dias de diferencia entre las transaccioes
df_transacciones_servicios = df_transacciones_servicios.withColumn("dias_diferencia_anterior_transaccion", datediff(col("posting_date"), col("fecha_anterior"))) \
                         .withColumn("diferencia_monto_anterior", abs(col("amount") - col("monto_anterior")))\
                         .withColumn("dias_diferencia_siguiente_transaccion",datediff(col("fecha_siguiente_transaccion"), col("posting_date")))\
                         .withColumn("diferencia_monto_siguiente", abs(col("amount") - col("monto_siguiente_transaccion")))

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 108, Finished, Available, Finished)

In [ ]:
"""
vamos a agrupar los dias de diferencia de manera descriptiva para poder realizar un proceso de interpretación 
"""

df_transacciones_servicios = df_transacciones_servicios.withColumn("grupo_dias_diferencia_anterior",
                                    when(col("dias_diferencia_anterior_transaccion").isNull(), "Primera Transacción")
                                    .when(col("dias_diferencia_anterior_transaccion") == 0, "Mismo Día")
                                    .when(col("dias_diferencia_anterior_transaccion").between(1, 7), "Semanal (1-7 días)")
                                    .when(col("dias_diferencia_anterior_transaccion").between(8, 30), "Mensual (8-30 días)")
                                    .when(col("dias_diferencia_anterior_transaccion").between(31, 90), "Trimestral (31-90 días)")
                                    .otherwise("Largo Plazo (>90 días)"))

df_transacciones_servicios = df_transacciones_servicios.withColumn("grupo_dias_diferencia_siguiente",
                                    when(col("dias_diferencia_siguiente_transaccion").isNull(), "Primera Transacción")
                                    .when(col("dias_diferencia_siguiente_transaccion") == 0, "Mismo Día")
                                    .when(col("dias_diferencia_siguiente_transaccion").between(1, 7), "Semanal (1-7 días)")
                                    .when(col("dias_diferencia_siguiente_transaccion").between(8, 30), "Mensual (8-30 días)")
                                    .when(col("dias_diferencia_siguiente_transaccion").between(31, 90), "Trimestral (31-90 días)")
                                    .otherwise("Largo Plazo (>90 días)"))

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 109, Finished, Available, Finished)

In [ ]:
"""
Vamos a crear una variable que nos permita identificar la diferencia de dias entre transacciones realizadas
por un cliente a un mismo beneficiario.

En este caso vamos a usar una funcion de ventana usando la particion de cuenta origen y cuenta destino ordenando
por la fecha de la transacccion y contanto la cantidad de dias entre cada ventana
"""

window_spec = Window.partitionBy("account_nbr", "to_account_nbr").orderBy("posting_date")

# 2. Calcular diferencia en días y en monto con la transacción anterior y la siguiente
df_transacciones_servicios = df_transacciones_servicios.withColumn("monto_anterior", lag("amount", 1).over(window_spec)) \
                   .withColumn("fecha_anterior", lag("posting_date", 1).over(window_spec))\
                   .withColumn("fecha_siguiente_transaccion",lead("posting_date", 1).over(window_spec))\
                   .withColumn("monto_siguiente_transaccion",lead("amount", 1).over(window_spec)
)

# calculamos la cantidad de dias de diferencia entre las transaccioes
df_transacciones_servicios = df_transacciones_servicios.withColumn("dias_diferencia_anterior_transaccion", datediff(col("posting_date"), col("posting_date"))) \
                   .withColumn("diferencia_monto_anterior", abs(col("amount") - col("monto_anterior")))\
                   .withColumn("dias_diferencia_siguiente_transaccion",datediff(col("fecha_siguiente_transaccion"), col("posting_date")))\
                   .withColumn("diferencia_monto_siguiente", abs(col("amount") - col("monto_siguiente_transaccion")))

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 110, Finished, Available, Finished)

In [ ]:
"""
Vamos a crear una variable que nos permita identificar la diferencia de dias entre transacciones realizadas
por un cliente a un mismo beneficiario.

En este caso vamos a usar una funcion de ventana usando la particion de cuenta origen y cuenta destino ordenando
por la fecha de la transacccion y contanto la cantidad de dias entre cada ventana
"""

window_spec = Window.partitionBy("account_nbr", "to_account_nbr").orderBy("posting_date")


# 2. Calcular diferencia en días y en monto con la transacción anterior y la siguiente
df_transacciones_servicios = df_transacciones_servicios.withColumn("monto_anterior_transaccion", lag("amount", 1).over(window_spec))\
                                                    .withColumn("monto_siguiente_transaccion",lead("amount", 1).over(window_spec))\
                                                    .withColumn("fecha_anterior_transaccion", lag("posting_date", 1).over(window_spec))\
                                                    .withColumn("fecha_siguiente_transaccion",lead("posting_date", 1).over(window_spec))



# calculamos la cantidad de dias de diferencia entre las transaccioes
df_transacciones_servicios = df_transacciones_servicios.withColumn("dias_diferencia_anterior_transaccion", datediff(col("posting_date"), col("fecha_anterior_transaccion"))) \
                   .withColumn("dias_diferencia_siguiente_transaccion",datediff(col("fecha_siguiente_transaccion"), col("posting_date")))\
                   .withColumn("diferencia_monto_anterior", abs(col("amount") - col("monto_anterior_transaccion")))\
                   .withColumn("diferencia_monto_siguiente", abs(col("amount") - col("monto_siguiente_transaccion")))

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 111, Finished, Available, Finished)

In [ ]:
df_transacciones_servicios = df_transacciones_servicios.select(
    col("account_nbr").alias('numero_cuenta_remitente'),
    col("ident").alias('cedula_remitente'),
    col("posting_date").alias('fecha_transaccion'),
    "product_type_id",
    "description1",
    "description2",
    "description3",
    col("amount").alias('monto'),
    "cashier_nbr",
    col("to_account_nbr").alias('numero_cuenta_servicio'),
    col("idsegmento").alias('segmento'),
    "anio_transaccion",
    "mes_transaccion",
    "dia_transaccion",
    "dia_semana",
    "nombre_dia_semana",
    "monto_anterior",
    "fecha_anterior",
    "fecha_siguiente_transaccion",
    "fecha_anterior_transaccion",
    "monto_siguiente_transaccion",
    "monto_anterior_transaccion",
    "dias_diferencia_anterior_transaccion",
    "grupo_dias_diferencia_anterior",
    "diferencia_monto_anterior",
    "dias_diferencia_siguiente_transaccion",
    "grupo_dias_diferencia_siguiente", 
    "diferencia_monto_siguiente",
    "fecha_proceso_silver",
    "version_notebook"
)


StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 112, Finished, Available, Finished)

In [ ]:
display(df_transacciones_servicios)

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 113, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 53d8c9af-c6c3-4480-9ab8-650e5550e035)

In [ ]:
df_agg = df_transacciones_servicios.groupBy(
    "cedula_remitente",
    "numero_cuenta_servicio",
    "description1",
).agg(
  count("cedula_remitente").alias("total_transacciones"),
    avg("dias_diferencia_anterior_transaccion").alias("promedio_dias_diferencia_anterior"),
    sum("monto").alias("total_monto")
).fillna(0).orderBy(desc("total_transacciones"))

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 114, Finished, Available, Finished)

In [ ]:
display(df_transacciones_servicios)

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 115, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1131781b-20ac-4514-95da-a911037a97e4)

# Reglas de Reduccion de Datos

- Minimo de 12 transacciones en Total
- Menor a un promedio de 45 dias de diferencia entre transacciones
- Monto Total minimo es de 81 (para evitar las transacciones automaticas de cobro de 80 pesos de la DGII)
- (En la notebook de silver estan eliminadas todas la tablas que no sean PAGOS concretos a SERVICIOS)



In [ ]:
df_transacciones_servicios.printSchema()

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 116, Finished, Available, Finished)

root
 |-- numero_cuenta_remitente: long (nullable = true)
 |-- cedula_remitente: string (nullable = true)
 |-- fecha_transaccion: string (nullable = true)
 |-- product_type_id: integer (nullable = true)
 |-- description1: string (nullable = true)
 |-- description2: string (nullable = true)
 |-- description3: string (nullable = true)
 |-- monto: double (nullable = true)
 |-- cashier_nbr: integer (nullable = true)
 |-- numero_cuenta_servicio: long (nullable = true)
 |-- segmento: integer (nullable = true)
 |-- anio_transaccion: integer (nullable = true)
 |-- mes_transaccion: integer (nullable = true)
 |-- dia_transaccion: integer (nullable = true)
 |-- dia_semana: integer (nullable = true)
 |-- nombre_dia_semana: string (nullable = true)
 |-- monto_anterior: double (nullable = true)
 |-- fecha_anterior: string (nullable = true)
 |-- fecha_siguiente_transaccion: string (nullable = true)
 |-- fecha_anterior_transaccion: string (nullable = true)
 |-- monto_siguiente_transaccion: double (nul

In [ ]:
df_joined = df_transacciones_servicios.join(
    df_agg,
    on=["cedula_remitente", "numero_cuenta_servicio", "description1"],
    how="inner"
)

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 117, Finished, Available, Finished)

In [ ]:
# Aplicar la cadena de filtros corregida
df_joined = df_joined.filter(col("total_transacciones") >= 12)\
                     .filter(col("promedio_dias_diferencia_anterior") <= 45)

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 118, Finished, Available, Finished)

In [ ]:
columnas = [
    "cedula_remitente",
    "numero_cuenta_servicio",
    "description1",
    "fecha_transaccion",
    "anio_transaccion",
    "mes_transaccion",
    "dia_transaccion",
    "dia_semana",
    "fecha_anterior_transaccion",
    "dias_diferencia_anterior_transaccion",
    "fecha_siguiente_transaccion",
    "dias_diferencia_siguiente_transaccion",
    "monto",
    "monto_anterior_transaccion",
    "diferencia_monto_anterior",
    "monto_siguiente_transaccion",
    "diferencia_monto_siguiente",
    "total_transacciones",
    "promedio_dias_diferencia_anterior"
]

df_transacciones_servicios_modelo = df_joined.select(columnas)


StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 121, Finished, Available, Finished)

In [ ]:
display(df_transacciones_servicios_modelo)

StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 122, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cc17b6b0-6d76-47e9-a246-84f50c8b4076)

In [ ]:
# La estrategia de partición se mantiene igual que en Bronze para consistencia (usando la nueva columna renombrada)
df_transacciones_servicios_modelo.write \
                                .format("delta") \
                                .mode("overwrite") \
                                .option("overwriteSchema", "true") \
                                .save(dataset_recurrencia)


StatementMeta(, 7555818f-b483-4bf6-9bc5-ed8d2af5fc8b, 123, Finished, Available, Finished)