# **Modelo MLP con `PySpark`**

In [1]:
!pip install pyspark



In [2]:
# INSTALACIÓN Y CONFIGURACIÓN
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark


El sistema no puede encontrar la ruta especificada.


"wget" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.


tar: Error opening archive: Failed to open 'spark-3.4.0-bin-hadoop3.tgz'


In [3]:
# Librerias
import time
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when
import numpy as np
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

import findspark
findspark.init()

In [4]:
try:
    from pyspark.ml.functions import vector_to_array
except Exception:
    from pyspark.sql.functions import vector_to_array  # fallback

In [5]:
# Iniciar sesión Spark
spark = SparkSession.builder \
    .appName("MyMini") \
    .getOrCreate()

FileNotFoundError: [WinError 2] El sistema no puede encontrar el archivo especificado

In [None]:
# MONITOREO DE RECURSOS
def check_colab_resources():
    """Verificar recursos disponibles en Colab"""
    import psutil
    memory = psutil.virtual_memory()
    print(f"Recursos Colab:")
    print(f"   • RAM total: {memory.total / (1024**3):.1f} GB")
    print(f"   • RAM disponible: {memory.available / (1024**3):.1f} GB")
    print(f"   • Cores CPU: {os.cpu_count()}")
    print(f"   • Spark cores: {spark.sparkContext.defaultParallelism}")

check_colab_resources()

Recursos Colab:
   • RAM total: 12.7 GB
   • RAM disponible: 11.0 GB
   • Cores CPU: 2
   • Spark cores: 2


### Funciones de evaluación

In [None]:
def evaluate_model_comprehensive(model, train_df, test_df, labelCol="label"):
    """
    Evaluación completa del modelo con múltiples métricas
    en ambos conjuntos de entrenamiento y prueba
    """

    # Hacer predicciones
    train_predictions = model.transform(train_df)
    test_predictions = model.transform(test_df)

    # Evaluadores para métricas binarias
    evaluator_auc_roc = BinaryClassificationEvaluator(
        labelCol=labelCol,
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    evaluator_auc_pr = BinaryClassificationEvaluator(
        labelCol=labelCol,
        rawPredictionCol="rawPrediction",
        metricName="areaUnderPR"
    )
    # Evaluadores para métricas multiclase
    evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol=labelCol,
        predictionCol="prediction",
        metricName="f1"
    )
    evaluator_precision = MulticlassClassificationEvaluator(
        labelCol=labelCol,
        predictionCol="prediction",
        metricName="weightedPrecision"
    )
    evaluator_recall = MulticlassClassificationEvaluator(
        labelCol=labelCol,
        predictionCol="prediction",
        metricName="weightedRecall"
    )
    evaluator_accuracy = MulticlassClassificationEvaluator(
        labelCol=labelCol,
        predictionCol="prediction",
        metricName="accuracy"
    )

    # Calcular métricas para TRAIN
    train_auc_roc = evaluator_auc_roc.evaluate(train_predictions)
    train_auc_pr = evaluator_auc_pr.evaluate(train_predictions)
    train_f1 = evaluator_f1.evaluate(train_predictions)
    train_precision = evaluator_precision.evaluate(train_predictions)
    train_recall = evaluator_recall.evaluate(train_predictions)
    train_accuracy = evaluator_accuracy.evaluate(train_predictions)

    # Calcular métricas para TEST
    test_auc_roc = evaluator_auc_roc.evaluate(test_predictions)
    test_auc_pr = evaluator_auc_pr.evaluate(test_predictions)
    test_f1 = evaluator_f1.evaluate(test_predictions)
    test_precision = evaluator_precision.evaluate(test_predictions)
    test_recall = evaluator_recall.evaluate(test_predictions)
    test_accuracy = evaluator_accuracy.evaluate(test_predictions)

    # Calcular matriz de confusión para test
    confusion_matrix = test_predictions.groupBy(
        F.col(labelCol).alias("Actual"),
        F.col("prediction").alias("Predicted")
    ).count().orderBy("Actual", "Predicted")

    return {
        'train_metrics': {
            'AUC_ROC': train_auc_roc,
            'AUC_PR': train_auc_pr,
            'F1': train_f1,
            'Precision': train_precision,
            'Recall': train_recall,
            'Accuracy': train_accuracy
        },
        'test_metrics': {
            'AUC_ROC': test_auc_roc,
            'AUC_PR': test_auc_pr,
            'F1': test_f1,
            'Precision': test_precision,
            'Recall': test_recall,
            'Accuracy': test_accuracy
        },
        'confusion_matrix': confusion_matrix
    }

In [None]:
# Calcular métricas específicas por clase desde matriz de confusión
def calculate_detailed_metrics(confusion_df):
    """Calcular métricas detalladas por clase desde matriz de confusión"""
    # Asumiendo que las clases son 0.0 y 1.0
    try:
        tn = confusion_df[(confusion_df['Actual'] == 0.0) & (confusion_df['Predicted'] == 0.0)]['count'].values[0]
        fp = confusion_df[(confusion_df['Actual'] == 0.0) & (confusion_df['Predicted'] == 1.0)]['count'].values[0]
        fn = confusion_df[(confusion_df['Actual'] == 1.0) & (confusion_df['Predicted'] == 0.0)]['count'].values[0]
        tp = confusion_df[(confusion_df['Actual'] == 1.0) & (confusion_df['Predicted'] == 1.0)]['count'].values[0]

        precision_1 = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall_1 = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_1 = 2 * (precision_1 * recall_1) / (precision_1 + recall_1) if (precision_1 + recall_1) > 0 else 0

        precision_0 = tn / (tn + fn) if (tn + fn) > 0 else 0
        recall_0 = tn / (tn + fp) if (tn + fp) > 0 else 0
        f1_0 = 2 * (precision_0 * recall_0) / (precision_0 + recall_0) if (precision_0 + recall_0) > 0 else 0

        return {
            'class_0': {'precision': precision_0, 'recall': recall_0, 'f1': f1_0},
            'class_1': {'precision': precision_1, 'recall': recall_1, 'f1': f1_1}
        }
    except:
        return "No se pudo calcular métricas detalladas"

## Encontrar los mejores hiperparámetros


Uso de CrossValidation de 3 folds.
Se utilizo el 10% de la data dispobible.

La metrica de evaluación fue `areaUnderPR`: 'area bajo la curva presición recall.

In [None]:
# 🔹 Lectura de datos [10% de los datos]
df = spark.read.csv("data_to_model.csv", header=True, inferSchema=True)
df = df.sample(fraction=0.1, seed=42)

* Configuración inicial

In [None]:
# 🔹 Configuración inicial
label_col = "default"

# Detección automática de columnas
categorical_cols = [
    col for col in df.columns
    if df.schema[col].dataType.typeName() == 'string' and col != label_col]
numeric_cols = [
    col for col in df.columns
    if df.schema[col].dataType.typeName() in ['integer', 'double', 'float'] and col != label_col]

print(f"Variables categóricas: {categorical_cols}")
print(f"Variables numéricas: {numeric_cols}")

Variables categóricas: ['motivo_prestamo', 'tipo_vivienda']
Variables numéricas: ['monto_aprobado', 'tasa_interes', 'plazo_meses', 'ingreso_anual', 'antiguedad_laboral', 'estado_verif_ingreso', 'promedio_fico', 'anio_apertura_credito', 'cuentas_hipotecarias', 'total_cuentas_credito', 'cuentas_tarjeta_credito', 'saldo_revolvente', 'uso_credito_revolvente', 'limite_credito_total', 'dti', 'meses_ultima_consulta', 'meses_tarjeta_nueva', 'lineas_credito_12m', 'porcentaje_sin_moras', 'moras_2_year', 'monto_total_cobranzas', 'tuvo_acuerdo_pago', 'subcategoria_credito', 'anio_emision_prestamo', 'mes_emision_prestamo']


* Preprocesamiento

In [None]:
# 🔹 Preprocesamiento

# Convertir label a DoubleType y filtrar nulos
df = df.withColumn("label", F.col(label_col).cast(DoubleType()))\
       .filter(F.col("label").isNotNull())

# Imputación de valores nulos en variables numéricas
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=[f"{c}_imp" for c in numeric_cols],
    strategy="median"
)
num_imp_cols = [f"{c}_imp" for c in numeric_cols]

# Indexado y encoding de variables categóricas
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
            for c in categorical_cols]

encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in categorical_cols],
    outputCols=[f"{c}_oh" for c in categorical_cols]
)
ohe_cols = [f"{c}_oh" for c in categorical_cols]

# Assembler y scaler
assembler = VectorAssembler(
    inputCols=num_imp_cols + ohe_cols,
    outputCol="features_raw",
    handleInvalid="keep"
)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

In [None]:
# 🔹 Pipeline de preprocesamiento
preprocessing_stages = [imputer] + indexers + [encoder, assembler, scaler]
preprocessing_pipeline = Pipeline(stages=preprocessing_stages)

for i, stage in enumerate(preprocessing_pipeline.getStages()):
    print(f"Paso {i+1}: {stage.__class__.__name__}")
    print(stage.explainParams())
    print("-" * 50)

# Ajustar el pipeline de preprocesamiento
print("Ajustando pipeline de preprocesamiento...")
preprocessing_model = preprocessing_pipeline.fit(df)

# Transformar todos los datos
df_transformed = preprocessing_model.transform(df)

# Obtener dimensión de entrada
sample_vec = (df_transformed.limit(1)
              .select(F.col("features").alias("sample_features"))
              .head()["sample_features"])
input_dim = sample_vec.size
print(f"[INFO] Dimensión de entrada = {input_dim}")


Paso 1: Imputer
inputCol: input column name. (undefined)
inputCols: input column names. (current: ['monto_aprobado', 'tasa_interes', 'plazo_meses', 'ingreso_anual', 'antiguedad_laboral', 'estado_verif_ingreso', 'promedio_fico', 'anio_apertura_credito', 'cuentas_hipotecarias', 'total_cuentas_credito', 'cuentas_tarjeta_credito', 'saldo_revolvente', 'uso_credito_revolvente', 'limite_credito_total', 'dti', 'meses_ultima_consulta', 'meses_tarjeta_nueva', 'lineas_credito_12m', 'porcentaje_sin_moras', 'moras_2_year', 'monto_total_cobranzas', 'tuvo_acuerdo_pago', 'subcategoria_credito', 'anio_emision_prestamo', 'mes_emision_prestamo'])
missingValue: The placeholder for the missing values. All occurrences of missingValue will be imputed. (default: nan)
outputCol: output column name. (default: Imputer_78adc9146dfc__output)
outputCols: output column names. (current: ['monto_aprobado_imp', 'tasa_interes_imp', 'plazo_meses_imp', 'ingreso_anual_imp', 'antiguedad_laboral_imp', 'estado_verif_ingreso_i

In [None]:
# 🔹 Split train-test
train_df, test_df = df_transformed.randomSplit([0.8, 0.2], seed=42)

print(f"Train size: {train_df.count()}")
print(f"Test size: {test_df.count()}")
print("Proporción de clases en train:")
train_df.groupBy('label').count().show()
print("Proporción de clases en test:")
test_df.groupBy('label').count().show()

Train size: 107284
Test size: 26948
Proporción de clases en train:
+-----+-----+
|label|count|
+-----+-----+
|  0.0|85757|
|  1.0|21527|
+-----+-----+

Proporción de clases en test:
+-----+-----+
|label|count|
+-----+-----+
|  0.0|21573|
|  1.0| 5375|
+-----+-----+



* Definición del modelo e hiperparámetros

In [None]:
# 🔹 Configurar MLP con Grid Search
mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    rawPredictionCol="rawPrediction",
    maxIter=100,
    seed=42
)

# Definir arquitecturas de red
layers_options = [
    [input_dim, 10, 2],           # Simple
    [input_dim, 50, 2],           # Media
    [input_dim, 100, 2],          # Grande
]

# Definir la malla de parámetros
paramGrid = ParamGridBuilder() \
    .addGrid(mlp.layers, layers_options) \
    .addGrid(mlp.stepSize, [0.1, 0.01, 0.001])\
    .build()

# Configurar evaluador
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR"  # Usamos PR por desbalance de datos
)

# Configurar CrossValidator
crossval = CrossValidator(
    estimator=mlp,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
    collectSubModels=False
)


In [None]:
# 🔹 Entrenar el modelo
print("Iniciando grid search...")
start_time = time.time()

cv_model = crossval.fit(train_df)

fit_time = time.time() - start_time
print(f"Tiempo de entrenamiento: {fit_time:.2f} segundos")


Iniciando grid search...
Tiempo de entrenamiento: 3738.77 segundos


In [None]:
# Visualizar los parámetros
best_params = cv_model.bestModel.extractParamMap()

print("═" * 50)
print("MEJORES PARÁMETROS ENCONTRADOS")
print("═" * 50)

# Parámetros más importantes primero
important_params = ['layers', 'stepSize', 'maxIter', 'blockSize', 'solver']
for param_name in important_params:
    for param_obj, value in best_params.items():
        if param_obj.name == param_name:
            print(f"{param_name:15}: {value}")
            break

print("\n" + "═" * 50)
print("OTROS PARÁMETROS")
print("═" * 50)

# El resto de parámetros
for param_obj, value in best_params.items():
    if param_obj.name not in important_params:
        print(f"{param_obj.name:15}: {value}")

══════════════════════════════════════════════════
MEJORES PARÁMETROS ENCONTRADOS
══════════════════════════════════════════════════
layers         : [45, 10, 2]
stepSize       : 0.1
maxIter        : 100
blockSize      : 128
solver         : l-bfgs

══════════════════════════════════════════════════
OTROS PARÁMETROS
══════════════════════════════════════════════════
featuresCol    : features
labelCol       : label
predictionCol  : prediction
probabilityCol : probability
rawPredictionCol: rawPrediction
seed           : 42
tol            : 1e-06


In [None]:
# 7. Evaluar el mejor modelo COMPLETAMENTE
print("Evaluando modelo con métricas")
best_model = cv_model.bestModel

# Evaluación comprehensiva
results = evaluate_model_comprehensive(best_model, train_df, test_df)

# Mostrar resultados en formato tabular
print("\n" + "═" * 80)
print("RENDIMIENTO DEL MEJOR MODELO - MÉTRICAS COMPLETAS")
print("═" * 80)
print(f"{'Métrica':<15} {'Train':<10} {'Test':<10} {'Diferencia':<12}")
print("─" * 80)

metrics_to_display = ['AUC_ROC', 'AUC_PR', 'F1', 'Precision', 'Recall', 'Accuracy']

for metric in metrics_to_display:
    train_val = results['train_metrics'][metric]
    test_val = results['test_metrics'][metric]
    difference = abs(train_val - test_val)

    print(f"{metric:<15} {train_val:.4f}    {test_val:.4f}    {difference:.4f}")

print("═" * 80)


Evaluando modelo con métricas

════════════════════════════════════════════════════════════════════════════════
RENDIMIENTO DEL MEJOR MODELO - MÉTRICAS COMPLETAS
════════════════════════════════════════════════════════════════════════════════
Métrica         Train      Test       Diferencia  
────────────────────────────────────────────────────────────────────────────────
AUC_ROC         0.7504    0.7389    0.0115
AUC_PR          0.5129    0.4935    0.0195
F1              0.7834    0.7804    0.0030
Precision       0.8194    0.8123    0.0071
Recall          0.8274    0.8251    0.0023
Accuracy        0.8274    0.8251    0.0023
════════════════════════════════════════════════════════════════════════════════


In [None]:
test_predictions = best_model.transform(test_df)
def get_confusion_matrix_robust(predictions, labelCol="label", predictionCol="prediction"):
    """Versión robusta para obtener matriz de confusión"""

    # Calcular conteos
    confusion_counts = predictions.groupBy(
        F.col(labelCol).alias("actual"),
        F.col(predictionCol).alias("predicted")
    ).count()

    # Colectar datos de forma segura
    confusion_data = []
    for row in confusion_counts.collect():
        confusion_data.append({
            'actual': row['actual'],
            'predicted': row['predicted'],
            'count': row['count']
        })

    # Crear matriz 2x2
    matrix = {
        (0.0, 0.0): 0, (0.0, 1.0): 0,
        (1.0, 0.0): 0, (1.0, 1.0): 0
    }

    for item in confusion_data:
        matrix[(item['actual'], item['predicted'])] = item['count']

    return matrix

# Usar la función robusta
confusion_matrix = get_confusion_matrix_robust(test_predictions)

print("MATRIZ DE CONFUSIÓN")
print("═" * 40)
print("Actual \\ Predicted |  0  |  1  |")
print("-------------------|-----|-----|")
print(f"        0          |{confusion_matrix[(0.0, 0.0)]:4d}|{confusion_matrix[(0.0, 1.0)]:4d} |")
print("-------------------|-----|-----|")
print(f"        1          |{confusion_matrix[(1.0, 0.0)]:4d} |{confusion_matrix[(1.0, 1.0)]:4d} |")
print("-------------------|-----|-----|")

MATRIZ DE CONFUSIÓN
════════════════════════════════════════
Actual \ Predicted |  0  |  1  |
-------------------|-----|-----|
        0          |21224| 349 |
-------------------|-----|-----|
        1          |4364 |1011 |
-------------------|-----|-----|


In [None]:
def calculate_metrics_from_spark(test_predictions, labelCol="label", predictionCol="prediction"):
    """Calcular métricas directamente desde DataFrame de Spark"""

    # Calcular matriz de confusión manualmente
    confusion_data = test_predictions.groupBy(
        F.col(labelCol).alias("Actual"),
        F.col(predictionCol).alias("Predicted")
    ).count().collect()

    # Inicializar contadores
    tn, fp, fn, tp = 0, 0, 0, 0

    for row in confusion_data:
        if row['Actual'] == 0.0 and row['Predicted'] == 0.0:
            tn = row['count']
        elif row['Actual'] == 0.0 and row['Predicted'] == 1.0:
            fp = row['count']
        elif row['Actual'] == 1.0 and row['Predicted'] == 0.0:
            fn = row['count']
        elif row['Actual'] == 1.0 and row['Predicted'] == 1.0:
            tp = row['count']

    # Calcular métricas
    precision_0 = tn / (tn + fn) if (tn + fn) > 0 else 0
    recall_0 = tn / (tn + fp) if (tn + fp) > 0 else 0
    f1_0 = 2 * (precision_0 * recall_0) / (precision_0 + recall_0) if (precision_0 + recall_0) > 0 else 0

    precision_1 = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall_1 = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_1 = 2 * (precision_1 * recall_1) / (precision_1 + recall_1) if (precision_1 + recall_1) > 0 else 0

    return {
        'class_0': {'precision': precision_0, 'recall': recall_0, 'f1': f1_0},
        'class_1': {'precision': precision_1, 'recall': recall_1, 'f1': f1_1},
        'confusion_matrix': {'TN': tn, 'FP': fp, 'FN': fn, 'TP': tp}
    }

# Usar esta versión alternativa
spark_metrics = calculate_metrics_from_spark(test_predictions)

print("MÉTRICAS CALCULADAS DIRECTAMENTE DESDE SPARK:")
for class_name, metrics in spark_metrics.items():
    if class_name != 'confusion_matrix':
        print(f" {class_name}: Precision={metrics['precision']:.4f}, Recall={metrics['recall']:.4f}, F1={metrics['f1']:.4f}")

# Mostrar matriz de confusión también
print(f"\n Matriz de Confusión:")
print(f"True Negatives (TN):  {spark_metrics['confusion_matrix']['TN']}")
print(f"False Positives (FP): {spark_metrics['confusion_matrix']['FP']}")
print(f"False Negatives (FN): {spark_metrics['confusion_matrix']['FN']}")
print(f"True Positives (TP):  {spark_metrics['confusion_matrix']['TP']}")

MÉTRICAS CALCULADAS DIRECTAMENTE DESDE SPARK:
 class_0: Precision=0.8295, Recall=0.9838, F1=0.9001
 class_1: Precision=0.7434, Recall=0.1881, F1=0.3002

 Matriz de Confusión:
True Negatives (TN):  21224
False Positives (FP): 349
False Negatives (FN): 4364
True Positives (TP):  1011


In [None]:
# Análisis de overfitting
print("\n ANÁLISIS DE OVERFITTING:")
overfitting_threshold = 0.05  # Diferencia máxima aceptable

for metric in metrics_to_display:
    train_val = results['train_metrics'][metric]
    test_val = results['test_metrics'][metric]
    difference = abs(train_val - test_val)

    if difference > overfitting_threshold:
        print(f" Posible overfitting en {metric}: Diff = {difference:.4f}")
    else:
        print(f"✅ {metric}: Buen generalización (Diff = {difference:.4f})")


 ANÁLISIS DE OVERFITTING:
✅ AUC_ROC: Buen generalización (Diff = 0.0115)
✅ AUC_PR: Buen generalización (Diff = 0.0195)
✅ F1: Buen generalización (Diff = 0.0030)
✅ Precision: Buen generalización (Diff = 0.0071)
✅ Recall: Buen generalización (Diff = 0.0023)
✅ Accuracy: Buen generalización (Diff = 0.0023)


Los resultados del grid search indican que se ha encontrado un modelo con buen rendimiento y excelente capacidad de generalización. La estructura de red neuronal [45, 10, 2] con el optimizador L-BFGS y parámetros específicos (stepSize: 0.1, maxIter: 100) logra un equilibrio óptimo entre aprendizaje y generalización, como evidencia la mínima diferencia entre las métricas de entrenamiento y prueba (todas por debajo del 2%). 

El AUC-ROC de 0.7389 en test demuestra una capacidad aceptable de discriminación entre clases, mientras que el accuracy del 82.51% refleja una precisión general sólida del modelo.

Sin embargo, el análisis detallado revela un desbalance significativo en el rendimiento por clases. Mientras la clase 0 muestra excelentes métricas (Precision: 0.8295, Recall: 0.9838), la clase 1 presenta un recall muy bajo (0.1881), indicando que el modelo identifica correctamente solo el 18.81% de los casos positivos reales. 

Esta disparidad, evidente en la matriz de confusión con 4364 falsos negativos frente a solo 1011 verdaderos positivos, sugiere que el modelo tiene tendencia a predecir la clase mayoritaria.

__________________

## MLP con los mejores hiperparámetros

* Uso de todos los registros

In [None]:
# 🔹 Lectura de datos [100% de los registros]
full_data = spark.read.csv("data_to_model.csv", header=True, inferSchema=True)

In [None]:
# 🔹 Configuración inicial

# Variable objetivo
label_col = "default"

# Detección automática de columnas
categorical_cols = [
    col for col in full_data.columns
    if full_data.schema[col].dataType.typeName() == 'string' and col != label_col]
numeric_cols = [
    col for col in full_data.columns
    if full_data.schema[col].dataType.typeName() in ['integer', 'double', 'float'] and col != label_col]

print(f"Variables categóricas: {categorical_cols}")
print(f"Variables numéricas: {numeric_cols}")

Variables categóricas: ['motivo_prestamo', 'tipo_vivienda']
Variables numéricas: ['monto_aprobado', 'tasa_interes', 'plazo_meses', 'ingreso_anual', 'antiguedad_laboral', 'estado_verif_ingreso', 'promedio_fico', 'anio_apertura_credito', 'cuentas_hipotecarias', 'total_cuentas_credito', 'cuentas_tarjeta_credito', 'saldo_revolvente', 'uso_credito_revolvente', 'limite_credito_total', 'dti', 'meses_ultima_consulta', 'meses_tarjeta_nueva', 'lineas_credito_12m', 'porcentaje_sin_moras', 'moras_2_year', 'monto_total_cobranzas', 'tuvo_acuerdo_pago', 'subcategoria_credito', 'anio_emision_prestamo', 'mes_emision_prestamo']


In [None]:
# 🔹 Preprocesamiento

# Convertir label a DoubleType y filtrar nulos
full_data = full_data.withColumn("label", F.col(label_col).cast(DoubleType()))\
       .filter(F.col("label").isNotNull())

# Imputación de valores nulos en variables numéricas
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=[f"{c}_imp" for c in numeric_cols],
    strategy="median"
)
num_imp_cols = [f"{c}_imp" for c in numeric_cols]

# Indexado y encoding de variables categóricas
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
            for c in categorical_cols]

encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in categorical_cols],
    outputCols=[f"{c}_oh" for c in categorical_cols]
)
ohe_cols = [f"{c}_oh" for c in categorical_cols]

# Assembler y scaler
assembler = VectorAssembler(
    inputCols=num_imp_cols + ohe_cols,
    outputCol="features_raw",
    handleInvalid="keep"
)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

In [None]:
# 🔹 Pipeline de preprocesamiento
preprocessing_stages = [imputer] + indexers + [encoder, assembler, scaler]
preprocessing_pipeline = Pipeline(stages=preprocessing_stages)

for i, stage in enumerate(preprocessing_pipeline.getStages()):
    print(f"Paso {i+1}: {stage.__class__.__name__}")
    print(stage.explainParams())
    print("-" * 50)

# Ajustar el pipeline de preprocesamiento
print("Ajustando pipeline de preprocesamiento...")
preprocessing_model = preprocessing_pipeline.fit(full_data)

# Transformar todos los datos
full_data_transformed = preprocessing_model.transform(full_data)

# Obtener dimensión de entrada
sample_vec = (df_transformed.limit(1)
              .select(F.col("features").alias("sample_features"))
              .head()["sample_features"])
input_dim = sample_vec.size
print(f"Dimensión de entrada = {input_dim}")


Paso 1: Imputer
inputCol: input column name. (undefined)
inputCols: input column names. (current: ['monto_aprobado', 'tasa_interes', 'plazo_meses', 'ingreso_anual', 'antiguedad_laboral', 'estado_verif_ingreso', 'promedio_fico', 'anio_apertura_credito', 'cuentas_hipotecarias', 'total_cuentas_credito', 'cuentas_tarjeta_credito', 'saldo_revolvente', 'uso_credito_revolvente', 'limite_credito_total', 'dti', 'meses_ultima_consulta', 'meses_tarjeta_nueva', 'lineas_credito_12m', 'porcentaje_sin_moras', 'moras_2_year', 'monto_total_cobranzas', 'tuvo_acuerdo_pago', 'subcategoria_credito', 'anio_emision_prestamo', 'mes_emision_prestamo'])
missingValue: The placeholder for the missing values. All occurrences of missingValue will be imputed. (default: nan)
outputCol: output column name. (default: Imputer_b4c2aa4cccf0__output)
outputCols: output column names. (current: ['monto_aprobado_imp', 'tasa_interes_imp', 'plazo_meses_imp', 'ingreso_anual_imp', 'antiguedad_laboral_imp', 'estado_verif_ingreso_i

In [None]:
# 🔹 Split train-test
train_data, test_data = full_data_transformed.randomSplit([0.8, 0.2], seed=42)

print(f"Train size: {train_data.count()}")
print(f"Test size: {test_data.count()}")
print("Proporción de clases en train:")
train_data.groupBy('label').count().show()
print("Proporción de clases en test:")
test_data.groupBy('label').count().show()

Train size: 1076425
Test size: 268885
Proporción de clases en train:
+-----+------+
|label| count|
+-----+------+
|  0.0|861332|
|  1.0|215093|
+-----+------+

Proporción de clases en test:
+-----+------+
|label| count|
+-----+------+
|  0.0|215419|
|  1.0| 53466|
+-----+------+



* **Modelo MLP**

In [39]:
# 🔹 Configurar modelo con los mejores hiperparámetros
final_mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    rawPredictionCol="rawPrediction",
    layers=[45, 10, 2],      # Mejores parámetros
    stepSize=0.01,           # Learning rate óptimo
    maxIter=100,
    blockSize=128,           # Block size para más datos
    solver="l-bfgs",         # Algoritmo eficiente
    tol=1e-06,
    seed=42
)

In [40]:
# 🔹 Entrenar modelo final
print("Entrenando modelo final-----")
start_time = time.time()

final_model = final_mlp.fit(train_data)

training_time = (time.time() - start_time) / 60
print(f"Entrenamiento completado en {training_time:.2f} minutos")


Entrenando modelo final-----
Entrenamiento completado en 5.36 minutos


In [44]:
# 🔹 Evaluar el modelo

print("Evaluando modelo final con métricas")
start_time = time.time()
test_predictions = final_model.transform(test_data)
fit_time = (time.time() - start_time)/60
print(f"Tiempo de evaluación: {fit_time:.2f} minutos")

# Evaluación comprehensiva
results = evaluate_model_comprehensive(final_model, train_data, test_data)

# Mostrar resultados en formato tabular
print("\n" + "═" * 80)
print("RENDIMIENTO DEL MEJOR MODELO - MÉTRICAS COMPLETAS")
print("═" * 80)
print(f"{'Métrica':<15} {'Train':<10} {'Test':<10} {'Diferencia':<12}")
print("─" * 80)

metrics_to_display = ['AUC_ROC', 'AUC_PR', 'F1', 'Precision', 'Recall', 'Accuracy']

for metric in metrics_to_display:
    train_val = results['train_metrics'][metric]
    test_val = results['test_metrics'][metric]
    difference = abs(train_val - test_val)

    print(f"{metric:<15} {train_val:.4f}    {test_val:.4f}    {difference:.4f}")

print("═" * 80)

Evaluando modelo final con métricas
Tiempo de evaluación: 0.00 minutos

════════════════════════════════════════════════════════════════════════════════
RENDIMIENTO DEL MEJOR MODELO - MÉTRICAS COMPLETAS
════════════════════════════════════════════════════════════════════════════════
Métrica         Train      Test       Diferencia  
────────────────────────────────────────────────────────────────────────────────
AUC_ROC         0.7426    0.7440    0.0014
AUC_PR          0.5004    0.5026    0.0022
F1              0.7808    0.7829    0.0020
Precision       0.8232    0.8243    0.0011
Recall          0.8275    0.8288    0.0013
Accuracy        0.8275    0.8288    0.0013
════════════════════════════════════════════════════════════════════════════════


In [46]:
def get_confusion_matrix_robust(predictions, labelCol="label", predictionCol="prediction"):
    """Versión robusta para obtener matriz de confusión"""

    # Calcular conteos
    confusion_counts = predictions.groupBy(
        F.col(labelCol).alias("actual"),
        F.col(predictionCol).alias("predicted")
    ).count()

    # Colectar datos de forma segura
    confusion_data = []
    for row in confusion_counts.collect():
        confusion_data.append({
            'actual': row['actual'],
            'predicted': row['predicted'],
            'count': row['count']
        })

    # Crear matriz 2x2
    matrix = {
        (0.0, 0.0): 0, (0.0, 1.0): 0,
        (1.0, 0.0): 0, (1.0, 1.0): 0
    }

    for item in confusion_data:
        matrix[(item['actual'], item['predicted'])] = item['count']

    return matrix

# Usar la función robusta
confusion_matrix = get_confusion_matrix_robust(test_predictions)

print("MATRIZ DE CONFUSIÓN")
print("═" * 40)
print("Actual \\ Predicted |  0  |  1  |")
print("-------------------|------|-----|")
print(f"        0          |{confusion_matrix[(0.0, 0.0)]:4d}|{confusion_matrix[(0.0, 1.0)]:4d} |")
print("-------------------|------|-----|")
print(f"        1          |{confusion_matrix[(1.0, 0.0)]:4d} |{confusion_matrix[(1.0, 1.0)]:4d} |")
print("-------------------|------|-----|")

MATRIZ DE CONFUSIÓN
════════════════════════════════════════
Actual \ Predicted |  0  |  1  |
-------------------|------|-----|
        0          |212951|2468 |
-------------------|------|-----|
        1          |43557 |9909 |
-------------------|------|-----|


In [47]:
def calculate_metrics_from_spark(test_predictions, labelCol="label", predictionCol="prediction"):
    """Calcular métricas directamente desde DataFrame de Spark"""

    # Calcular matriz de confusión manualmente
    confusion_data = test_predictions.groupBy(
        F.col(labelCol).alias("Actual"),
        F.col(predictionCol).alias("Predicted")
    ).count().collect()

    # Inicializar contadores
    tn, fp, fn, tp = 0, 0, 0, 0

    for row in confusion_data:
        if row['Actual'] == 0.0 and row['Predicted'] == 0.0:
            tn = row['count']
        elif row['Actual'] == 0.0 and row['Predicted'] == 1.0:
            fp = row['count']
        elif row['Actual'] == 1.0 and row['Predicted'] == 0.0:
            fn = row['count']
        elif row['Actual'] == 1.0 and row['Predicted'] == 1.0:
            tp = row['count']

    # Calcular métricas
    precision_0 = tn / (tn + fn) if (tn + fn) > 0 else 0
    recall_0 = tn / (tn + fp) if (tn + fp) > 0 else 0
    f1_0 = 2 * (precision_0 * recall_0) / (precision_0 + recall_0) if (precision_0 + recall_0) > 0 else 0

    precision_1 = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall_1 = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_1 = 2 * (precision_1 * recall_1) / (precision_1 + recall_1) if (precision_1 + recall_1) > 0 else 0

    return {
        'class_0': {'precision': precision_0, 'recall': recall_0, 'f1': f1_0},
        'class_1': {'precision': precision_1, 'recall': recall_1, 'f1': f1_1},
        'confusion_matrix': {'TN': tn, 'FP': fp, 'FN': fn, 'TP': tp}
    }

# Usar esta versión alternativa
spark_metrics = calculate_metrics_from_spark(test_predictions)

print("MÉTRICAS CALCULADAS DIRECTAMENTE DESDE SPARK:")
for class_name, metrics in spark_metrics.items():
    if class_name != 'confusion_matrix':
        print(f" {class_name}: Precision={metrics['precision']:.4f}, Recall={metrics['recall']:.4f}, F1={metrics['f1']:.4f}")

# Mostrar matriz de confusión también
print(f"\n Matriz de Confusión:")
print(f"True Negatives (TN):  {spark_metrics['confusion_matrix']['TN']}")
print(f"False Positives (FP): {spark_metrics['confusion_matrix']['FP']}")
print(f"False Negatives (FN): {spark_metrics['confusion_matrix']['FN']}")
print(f"True Positives (TP):  {spark_metrics['confusion_matrix']['TP']}")

MÉTRICAS CALCULADAS DIRECTAMENTE DESDE SPARK:
 class_0: Precision=0.8302, Recall=0.9885, F1=0.9025
 class_1: Precision=0.8006, Recall=0.1853, F1=0.3010

 Matriz de Confusión:
True Negatives (TN):  212951
False Positives (FP): 2468
False Negatives (FN): 43557
True Positives (TP):  9909


In [48]:
# Análisis de overfitting
print("\n ANÁLISIS DE OVERFITTING:")
overfitting_threshold = 0.05  # Diferencia máxima aceptable

for metric in metrics_to_display:
    train_val = results['train_metrics'][metric]
    test_val = results['test_metrics'][metric]
    difference = abs(train_val - test_val)

    if difference > overfitting_threshold:
        print(f" Posible overfitting en {metric}: Diff = {difference:.4f}")
    else:
        print(f"✅ {metric}: Buen generalización (Diff = {difference:.4f})")


 ANÁLISIS DE OVERFITTING:
✅ AUC_ROC: Buen generalización (Diff = 0.0014)
✅ AUC_PR: Buen generalización (Diff = 0.0022)
✅ F1: Buen generalización (Diff = 0.0020)
✅ Precision: Buen generalización (Diff = 0.0011)
✅ Recall: Buen generalización (Diff = 0.0013)
✅ Accuracy: Buen generalización (Diff = 0.0013)


El modelo final demuestra un excelente rendimiento en términos de generalización, con diferencias mínimas entre las métricas de entrenamiento y prueba (todas inferiores al 0,23 %), lo que indica una efectiva prevención del sobreajuste. Un AUC-ROC de 0,7440 y una precisión del 82,88 % reflejan una capacidad predictiva sólida y consistente. 

Sin embargo, persiste un desafío crítico en el desequilibrio de clases: mientras que la clase mayoritaria (0) muestra un rendimiento excepcional con un recall del 98,85 % —capturando casi todos sus casos verdaderos—, la clase minoritaria (1) tiene un recall extremadamente bajo del 18,53 %, lo que significa que el modelo solo identifica correctamente menos de una quinta parte de los casos positivos reales.

Esta disparidad significativa, que se evidencia en los 43 557 falsos negativos frente a los 9909 verdaderos positivos, indica que el modelo tiende a predecir la clase mayoritaria. Aunque las métricas agregadas son sólidas, el bajo índice de recuperación para la clase 1 limita considerablemente la utilidad del modelo en aplicaciones en las que la detección de casos positivos es crucial, como en el diagnóstico médico o la detección de fraudes, ya que los falsos negativos pueden tener consecuencias muy graves. Se recomienda implementar estrategias adicionales para abordar este desequilibrio, como ajustes en los umbrales de clasificación, técnicas de muestreo o ponderación de clases.