### Objetivo

* El objetivo de la prueba es idear una solución para identificar transacciones que evidencian un 
comportamiento de Mala Práctica Transaccional, empleando un producto de datos. Adicional, 
describir la solución y detallar cómo incorporar el producto de datos en un marco operativo.

### Datos relevantes 

* Se entiende como una Mala Práctica Transaccional, un comportamiento donde se evidencia un 
uso de los canales mal intencionado; como Fraccionamiento transacional.
* Fraccionamiento transacional: esta es una mala practica que consiste en fraccionar una transacion grande en varias pequeñas, estas transacciones se caracterizan por estar en una misma ventana de tiempo que suele ser 24 horas y tienen como origen o destino la misma cuenta o cliente. 

### MOdelo identificacion de Transacciones Fracionadas: 
*  Genaracion de Clientes identificados como practicantes de fracionamiento transacional

##########################################################################

* Intalacion de Bibliotecas
* Lectura de bibliotecas
* Descarga de datos y/o creacion de Pipeline
* Lectura de datos
* identificacion de usuarios que tienen mas de 2 transaciones en un solo dia

## Intalacion librerias

In [None]:
import importlib
import subprocess
import sys

## Funcion para revisar si la libreria ya esta instalada, si no esta la instala.
def ensure_library_installed(library_name):
    try:
        # Intenta importar la librería
        importlib.import_module(library_name)
        print(f"La librería '{library_name}' ya está instalada.")
    except ImportError:
        print(f"La librería '{library_name}' no está instalada. Instalando ahora...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", library_name])
        print(f"Librería '{library_name}' instalada correctamente.")

# Ejemplo de uso
librerias =["gdown","matplotlib","pandas","pyspark","seaborn","scikit-learn","virtualenv"]
for i in librerias:
    ensure_library_installed(i)

### Lectura de librerias

In [1]:
import pandas as pd
import os
import re
import gdown
import seaborn as sns
import matplotlib.pyplot as plt
import requests
from pyspark.sql.functions import to_timestamp, date_format
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, last, unix_timestamp, lag, to_date, round, sum as _sum
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, DecimalType
)
import gc

#### Iniciar secion en spark y mejorar el rendimiento del cuaderno

In [None]:
# Initialize the SparkSession
spark = SparkSession.builder \
    .appName("Example App") \
    .getOrCreate()

# Set the configuration
spark.conf.set("spark.sql.repl.eagerEval.truncate", 0)  # Use 0 to disable truncation

In [None]:
# Iniciar sesión de Spark
spark = SparkSession.builder \
    .appName("Descargar y cargar archivo") \
    .getOrCreate()

In [None]:
spark = (
    SparkSession.builder
    .appName("Test")
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

In [None]:
from pyspark.sql import SparkSession

# Iniciar sesión de Spark
spark = SparkSession.builder \
    .appName("Descargar y cargar archivo") \
    .getOrCreate()

# URL del archivo
url = "https://nequi-data.s3.us-east-1.amazonaws.com/sandbox_co/mscarmon/prueba_seleccion_ds/sample_data_0007_part_00.parquet"

# Ruta y nombre donde se guardará el archivo descargado
output_path = "C:/Users/jhonf/Descargas/Prueba_tecnica_Nequi/Data/sample_data_0007_part_00.parquet"

# 1. Verificar si el archivo existe y eliminarlo si es necesario
if os.path.exists(output_path):
    os.remove(output_path)
    print(f"Archivo existente eliminado: {output_path}")

# 2. Descargar el archivo desde la URL
response = requests.get(url, stream=True)
if response.status_code == 200:  # porque si el codigo esta en estado 200 es porque el servidor respondio de manera exitosa
    with open(output_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            file.write(chunk)
    print(f"Archivo descargado exitosamente como: {output_path}")

  # Leemos el archivo descargado
    df7 = spark.read.parquet(output_path)
    
    # Mostrar algunas filas como verificación
    df7.show(truncate=False)
else:
    print(f"Error al descargar el archivo: {response.status_code}")

print(df7.count())

In [None]:
from pyspark.sql import SparkSession

# Iniciar sesión de Spark
spark = SparkSession.builder \
    .appName("Descargar y cargar archivo") \
    .getOrCreate()

# URL del archivo
url = "https://nequi-data.s3.us-east-1.amazonaws.com/sandbox_co/mscarmon/prueba_seleccion_ds/sample_data_0006_part_00.parquet"

# Ruta y nombre donde se guardará el archivo descargado
output_path = "C:/Users/jhonf/Descargas/Prueba_tecnica_Nequi/Data/sample_data_0006_part_00_modelo.parquet"

# 1. Verificar si el archivo existe y eliminarlo si es necesario
if os.path.exists(output_path):
    os.remove(output_path)
    print(f"Archivo existente eliminado: {output_path}")

# 2. Descargar el archivo desde la URL
response = requests.get(url, stream=True)
if response.status_code == 200:  # porque si el codigo esta en estado 200 es porque el servidor respondio de manera exitosa
    with open(output_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            file.write(chunk)
    print(f"Archivo descargado exitosamente como: {output_path}")

  # Leemos el archivo descargado
    df6 = spark.read.parquet(output_path)
    
    # Mostrar algunas filas como verificación
    df6.show(truncate=False)
else:
    print(f"Error al descargar el archivo: {response.status_code}")

print(df6.count())
print(df6.select("user_id").distinct().count())
print(df6.dtypes)

In [None]:
# Función para corregir el nombre de la columna
def corregir_columna(columna):
    # Eliminar espacios al principio y al final
    columna = columna.strip()
    columna = columna.replace("%", "pct")
    columna = columna.replace(" ", "_").replace(".", "_")
    columna = columna.replace("'", "").replace('"', '')
    columna = columna.replace('//', '')
    columna = columna.replace("#", "")
    return columna

for columna in df6.columns:
    # Renombrar cada columna usando la función corregir_columna
    df6 = df6.withColumnRenamed(columna, corregir_columna(columna))
    df7 = df7.withColumnRenamed(columna, corregir_columna(columna))

In [None]:
## Unimos toda la informacion
df6=df6.union(df7)

In [None]:
print(df6.groupBy("transaction_type").count().show())

In [None]:
# Eliminar duplicados basándose en la columna "_id"
dfu = df6.dropDuplicates(["_id"]) # Se deben quitar para ver cuantas transaciones eran iguales y encontrar posibles fraudes
dfu = dfu.dropDuplicates(["user_id", "transaction_date"])
print(dfu.count())

In [None]:
## Liberar espacio en memoria
del df6
del duplicates
del df6_users
gc.collect()

In [None]:
# Asegúrate de que la columna transaction_date esté en formato timestamp
dfu = dfu.withColumn("transaction_time", to_timestamp(col("transaction_date"), "yyyy-MM-dd HH:mm:ss"))
dfu = dfu.orderBy("user_id", "transaction_time")

#### Calculo de las ventanas de tiempo y diferencia en horas entre transaciones

In [None]:
# === 1) Definir la columna 'day' ===
dfu = dfu.withColumn("day", to_date(col("transaction_time")))

# === 2) Detectar cambio de día con lag(day) ===
# Ventana principal: particionamos por user_id y ordenamos por transaction_time
w_user_order = Window.partitionBy("user_id").orderBy("transaction_time")

dfu = dfu.withColumn(
    "prev_day",
    lag("day").over(w_user_order)
)

# Creamos una bandera day_change que es 1 cuando cambia el día, 0 si es el mismo día
dfu = dfu.withColumn(
    "day_change_flag",
    when(col("prev_day").isNull(), 0)  # primera transacción => no cambia día
    .when(col("day") != col("prev_day"), 1)
    .otherwise(0)
)

# === 3) day_group: identificador acumulado de cada día, por usuario ===
# Sumar en forma acumulada la bandera day_change_flag
dfu = dfu.withColumn(
    "day_group",
    _sum("day_change_flag").over(w_user_order)
)
# Así, cada vez que day_change_flag = 1, se incrementa day_group en 1

# --- Limpieza opcional ---
dfu = dfu.drop("prev_day", "day_change_flag")

# === 4) Calcular diff_hours dentro de cada día_group ===
#  (1) Definir ventana que particiona por user_id y day_group, ordena por fecha/hora
w_user_day = Window.partitionBy("user_id", "day_group").orderBy("transaction_time")

# (2) Calcular la transacción anterior dentro del mismo day_group
dfu = dfu.withColumn(
    "prev_tr_time_tmp",
    lag("transaction_time").over(w_user_day)
)

# (3) Si es la primera transacción del day_group, prev_tr_time_tmp estará en null
#     Asignamos la transaction_time actual para que diff_hours = 0 en esa fila
dfu = dfu.withColumn(
    "prev_tr_time",
    when(col("prev_tr_time_tmp").isNull(), col("transaction_time"))
    .otherwise(col("prev_tr_time_tmp"))
)
dfu = dfu.drop("prev_tr_time_tmp")

# (4) Calcular diff_hours
dfu = dfu.withColumn(
    "diff_hours",
    (unix_timestamp("transaction_time") - unix_timestamp("prev_tr_time")) / 3600
)

dfu = dfu.withColumn(
    "diff_minutes",
    (unix_timestamp("transaction_time") - unix_timestamp("prev_tr_time")) / 60
)

# === 5) new_window_flag: si diff_hours > 24, se abre una nueva ventana ===
dfu = dfu.withColumn(
    "new_window_flag",
    when(col("diff_hours") > 24, 1).otherwise(0)
)

# === 6) Calcular windows_time dentro de cada day_group ===
#     - sumamos en forma acumulada new_window_flag y le sumamos 1 para iniciar en 1
dfu = dfu.withColumn(
    "windows_time",
    _sum("new_window_flag").over(w_user_day) + 1
)

# (Opcional) limpiar columnas
dfu = dfu.drop("new_window_flag","windows_time")
dfu = dfu.withColumnRenamed("day_group", "windows_time")
# Ajustar la columna "day_group" para que inicie en 1
dfu = dfu.withColumn("windows_time", col("windows_time") + 1)
# dfu.show(truncate=False)
dfu = dfu.withColumn("diff_hours", round(col("diff_hours"), 2))
dfu = dfu.withColumn("diff_minutes", round(col("diff_minutes"), 2))

### Identificacion de usuarios que tienen  mas de dos transaciones en un solo dia

In [None]:
# Agrupar por 'user_id' y 'windows_time', contar los 'windows_time' y sumar 'transaction_amount'
df_count = dfu.groupBy("user_id", "windows_time") \
    .agg(
        F.count("windows_time").alias("windows_time_count"),
        F.sum("transaction_amount").alias("total_transaction_amount"),
        F.avg("diff_hours").alias("avg_diff_hours")  # Promedio de diff_hours
    )
# Filtrar aquellos 'user_id' donde el conteo de 'windows_time' sea mayor a 2
df_filtered = df_count.filter(F.col("windows_time_count") > 2)
# Ordenar por total_transaction_amount de mayor a menor
df_filtered = df_filtered.orderBy(F.col("total_transaction_amount").desc())
# Mostrar el resultado
print('# Usuarios unicos que estan haciendo Fraccionamiento transaccional',df_filtered.select("user_id").distinct().count())
df_filtered.show(truncate=False)

In [None]:
# Seleccionar las columnas relevantes y convertirlas a pandas
columns_to_plot = ['avg_diff_hours','avg_diff_minutes', 'windows_time_count', 'total_transaction_amount']
df_pandas = df_filtered.select(columns_to_plot).toPandas()

# Configurar el tamaño del gráfico
plt.figure(figsize=(12, 6))

# Crear el boxplot para todas las columnas seleccionadas
sns.boxplot(data=df_pandas, orient="h", palette="Set2")

# Personalización
plt.title('Boxplot de columnas para detección de outliers', fontsize=14)
plt.xlabel('Valores', fontsize=12)
plt.yticks(range(len(columns_to_plot)), columns_to_plot, fontsize=10)
plt.grid(axis="x", linestyle="--", alpha=0.7)

# Mostrar el gráfico
plt.show()

In [None]:
# Seleccionar solo las columnas necesarias y convertirlas a pandas
df_pandas2 = df_filtered.select('windows_time', 'total_transaction_amount').toPandas()

# Crear el scatter plot
plt.figure(figsize=(10, 6))
plt.scatter(df_pandas2['windows_time'], df_pandas2['total_transaction_amount'], alpha=0.7, color='skyblue', edgecolor='black')

# Personalización de la gráfica
plt.title('Scatter Plot: windows_time vs total_transaction_amount', fontsize=14)
plt.xlabel('Total Amount Sum', fontsize=12)
plt.ylabel('Transaction Count', fontsize=12)
plt.grid(alpha=0.5)

# Mostrar la gráfica
plt.show()

In [None]:
# Ruta de salida donde deseas guardar el archivo Parquet
save_path = "C:/Users/jhonf/Descargas/Prueba_tecnica_Nequi/Data/Clientes_con_mas_de_tres_tranferencias.parquet"

# Guardar el DataFrame como archivo Parquet
df_filtered.write.mode("overwrite").parquet(save_path)

print(f"Archivo guardado correctamente en {save_path}")