In [30]:
from pyspark.sql import SparkSession
from delta.pip_utils import configure_spark_with_delta_pip 
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col,when

In [2]:
builder = SparkSession.builder \
    .appName("EntrenarModeloFraude") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/17 15:13:10 WARN Utils: Your hostname, Zidnz, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/17 15:13:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/zidnz/mi_proyecto_env/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/zidnz/.ivy2.5.2/cache
The jars for the packages stored in: /home/zidnz/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-061a92b2-98c7-46d2-9d57-f43bdef29ad1;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 221ms :: artifacts dl 7ms
	:: modules in use:
	io.d

In [3]:

DELTA_PATH = "/home/zidnz/DanaPP/proyecto_fraude/delta_lake_features"
print(f"Leyendo Delta Lake (con features) desde: {DELTA_PATH}")

df_modelo_input = spark.read.format("delta").load(DELTA_PATH)

print("¡Tabla de features lista para el modelo!")
df_modelo_input.printSchema()


Leyendo Delta Lake (con features) desde: /home/zidnz/DanaPP/proyecto_fraude/delta_lake_features
¡Tabla de features lista para el modelo!
root
 |-- tipo_transaccion: string (nullable = true)
 |-- id_transaccion: integer (nullable = true)
 |-- id_cliente: integer (nullable = true)
 |-- nombre_cliente: string (nullable = true)
 |-- cuenta_origen: double (nullable = true)
 |-- fecha: string (nullable = true)
 |-- monto: double (nullable = true)
 |-- divisa: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- cuenta_destino: double (nullable = true)
 |-- categoria: string (nullable = true)
 |-- hora_movimiento: timestamp (nullable = true)
 |-- concepto_movimiento: string (nullable = true)
 |-- saldo_previo: double (nullable = true)
 |-- saldo_posterior: double (nullable = true)
 |-- canal_transaccion: string (nullable = true)
 |-- medio_pago: string (nullable = true)
 |-- estatus: string (nullable = true)
 |-- fraude_probable: string (nullable = true)
 |-- es_fraude: integer

In [4]:
# === PASO 1: Preparar el DataFrame para el Pipeline ===
from pyspark.sql.functions import col

# El pipeline de ML de Spark necesita que la columna objetivo se llame "label"
df_modelo_input = df_modelo_input.withColumnRenamed("es_fraude", "label")

In [5]:
# === PASO 2: Definir las Columnas ===

# Tus features numéricas (las que ya creaste)
columnas_numericas = [
    "feat_horario_riesgo",
    "feat_tipo_riesgo",
    "feat_canal_riesgo",
    "feat_perfil_riesgo_completo",
    "feat_log_monto",
    "feat_ratio_monto_vs_tipo",
    "monto" # Incluir el monto original también es útil
]

# Tus features categóricas (las originales que el modelo también usará)
columnas_categoricas = [
    "tipo_transaccion", 
    "canal_transaccion", 
    "categoria", 
    "divisa", 
    "estatus", 
    "medio_pago", 
    "ciudad"
]

In [19]:
# === PASO 3: Crear los "Stages" del Pipeline ===

# Stage 1: Convertir strings categóricos a índices numéricos
# (Ej: "App movil" -> 0.0, "Web" -> 1.0)
indexer = StringIndexer(
    inputCols=columnas_categoricas, 
    outputCols=[c + "_idx" for c in columnas_categoricas],
    handleInvalid="keep" # Si aparece una categoría nueva, la agrupa
)

# Stage 2: Aplicar One-Hot Encoding a esos índices
# (Convierte "0.0" en un vector [1, 0, 0])
encoder = OneHotEncoder(
    inputCols=indexer.getOutputCols(),
    outputCols=[c + "_ohe" for c in columnas_categoricas]
)

# Stage 3: Ensamblar TODO en un solo vector de features
# (Junta tus features numéricas + las nuevas categóricas codificadas)
features_list = columnas_numericas + encoder.getOutputCols()
assembler = VectorAssembler(
    inputCols=features_list,
    outputCol="features",
    handleInvalid="keep"
)

# Stage 4: El Modelo (Usaremos un Random Forest)
# Es robusto, bueno para empezar y maneja bien el desbalance.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    seed=42
)

# 97.46% (194921) No-Fraude vs 2.54% (5079) Fraude
# Peso = Total No-Fraude / Total Fraude = 194921 / 5079 = 38.38
ratio_peso = 194921 / 5079

df_modelo_ponderado = df_modelo_input.withColumn("classWeight",
    when(col("label") == 1, ratio_peso).otherwise(1.0)
)

In [21]:
# === PASO 4: Crear el Pipeline ===
# (Definir indexer, encoder, assembler... igual que antes)
# ...

# MEJORA: Decirle al modelo que use la columna de peso
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    weightCol="classWeight",  # Usamos la ponderación
    numTrees=100,
    maxDepth=10,
    seed=42
)

pipeline = Pipeline(stages=[indexer, encoder, assembler, rf])


In [24]:
# === PASO 5: Dividir Datos ===
# Usamos el DataFrame ponderado
(train_data, test_data) = df_modelo_ponderado.randomSplit([0.8, 0.2], seed=42)

print(f"Datos de entrenamiento (para CV): {train_data.count()}")
print(f"Datos de prueba (finales): {test_data.count()}")

                                                                                

Datos de entrenamiento (para CV): 160255
Datos de prueba (finales): 39745


In [33]:
# === PASO 6: Definir el Evaluador y el CrossValidator ===
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderPR"
)

# --- ESTA ES LA CORRECCIÓN ---
# 1. Crear una parrilla de parámetros (aunque esté vacía)
paramGrid = ParamGridBuilder().build() 

# 2. Definir el CrossValidator CON la parrilla
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,  # <-- ESTA LÍNEA ES LA QUE FALTA Y ARREGLA EL ERROR
    evaluator=evaluator,
    numFolds=5,
    seed=42
)
# --- FIN DE LA CORRECCIÓN ---

In [34]:

# === PASO 7: Entrenar el Modelo (AHORA SÍ FUNCIONARÁ) ===
print("\nEntrenando con Cross-Validation (esto tardará más)...")
cv_model = cv.fit(train_data)

print("¡Modelo entrenado y validado!")


Entrenando con Cross-Validation (esto tardará más)...


25/11/17 15:30:26 WARN DAGScheduler: Broadcasting large task binary with size 1129.9 KiB
25/11/17 15:30:27 WARN DAGScheduler: Broadcasting large task binary with size 1387.5 KiB
25/11/17 15:30:29 WARN DAGScheduler: Broadcasting large task binary with size 1674.2 KiB
25/11/17 15:30:32 WARN DAGScheduler: Broadcasting large task binary with size 1300.1 KiB
25/11/17 15:30:47 WARN DAGScheduler: Broadcasting large task binary with size 1191.7 KiB
25/11/17 15:30:48 WARN DAGScheduler: Broadcasting large task binary with size 1466.1 KiB
25/11/17 15:30:50 WARN DAGScheduler: Broadcasting large task binary with size 1757.5 KiB
25/11/17 15:30:53 WARN DAGScheduler: Broadcasting large task binary with size 1305.1 KiB
25/11/17 15:31:10 WARN DAGScheduler: Broadcasting large task binary with size 1265.0 KiB
25/11/17 15:31:11 WARN DAGScheduler: Broadcasting large task binary with size 1598.2 KiB
25/11/17 15:31:13 WARN DAGScheduler: Broadcasting large task binary with size 1960.0 KiB
25/11/17 15:31:17 WAR

¡Modelo entrenado y validado!


In [35]:
# === PASO 8: Hacer Predicciones en Datos de PRUEBA ===
# Usamos 'cv_model' (que contiene el mejor modelo de los 5)
# para predecir en los datos de 'test_data' que NUNCA VIO.
print("Realizando predicciones en el conjunto de prueba...")
predictions = cv_model.transform(test_data)

Realizando predicciones en el conjunto de prueba...


In [36]:
# === PASO 9: Evaluar el Modelo (Resultados Finales) ===
# Calculamos la métrica en el conjunto de prueba
area_under_pr_final = evaluator.evaluate(predictions)

# Calcular la Matriz de Confusión
print("Calculando Matriz de Confusión Final...")
tp = predictions.filter("label == 1 AND prediction == 1").count()
tn = predictions.filter("label == 0 AND prediction == 0").count()
fp = predictions.filter("label == 0 AND prediction == 1").count()
fn = predictions.filter("label == 1 AND prediction == 0").count()

precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0

print(f"\n--- Resultados de Evaluación Finales (Post-CV) ---")
print(f"Área bajo la curva PR (en test_data): {area_under_pr_final:.4f}")
print("---")
print(f"Verdaderos Positivos (Fraudes Atrapados): {tp}")
print(f"Verdaderos Negativos (Inocentes Ignorados): {tn}")
print("---")
print(f"Falsos Positivos (Inocentes Molestados): {fp}")
print(f"Falsos Negativos (Fraudes Perdidos): {fn}")
print("---")
print(f"Precisión Final: {precision:.2%}")
print(f"Recall Final: {recall:.2%}")

25/11/17 15:32:27 WARN DAGScheduler: Broadcasting large task binary with size 1513.3 KiB
                                                                                

Calculando Matriz de Confusión Final...


25/11/17 15:32:29 WARN DAGScheduler: Broadcasting large task binary with size 1515.3 KiB
25/11/17 15:32:30 WARN DAGScheduler: Broadcasting large task binary with size 1515.3 KiB
25/11/17 15:32:32 WARN DAGScheduler: Broadcasting large task binary with size 1515.3 KiB
25/11/17 15:32:34 WARN DAGScheduler: Broadcasting large task binary with size 1515.3 KiB


--- Resultados de Evaluación Finales (Post-CV) ---
Área bajo la curva PR (en test_data): 0.9952
---
Verdaderos Positivos (Fraudes Atrapados): 976
Verdaderos Negativos (Inocentes Ignorados): 38403
---
Falsos Positivos (Inocentes Molestados): 366
Falsos Negativos (Fraudes Perdidos): 0
---
Precisión Final: 72.73%
Recall Final: 100.00%


                                                                                

In [38]:
# %%
# === PASO 10: GUARDAR EL MODELO FINAL ===

# Define una ruta REAL donde guardarás tu modelo
MODEL_SAVE_PATH = "/home/zidnz/DanaPP/proyecto_fraude/modelos/random_forest_fraude_cv" 

print(f"Guardando el modelo entrenado en: {MODEL_SAVE_PATH}")

# 'cv_model' es la variable que contiene tu modelo entrenado del CrossValidator
cv_model.write().overwrite().save(MODEL_SAVE_PATH)

print(f"¡Modelo guardado con éxito en {MODEL_SAVE_PATH}!")

Guardando el modelo entrenado en: /home/zidnz/DanaPP/proyecto_fraude/modelos/random_forest_fraude_cv
¡Modelo guardado con éxito en /home/zidnz/DanaPP/proyecto_fraude/modelos/random_forest_fraude_cv!
