# Modelado con PySpark (Versión Optimizada)

## Objetivo

Este notebook implementa el pipeline de machine learning para la clasificación de préstamos utilizando PySpark. Esta versión se centra en optimizar el rendimiento y la eficiencia en un entorno local.

1.  **Iniciar una SparkSession** optimizada para el uso de todos los recursos locales.
2.  **Cargar** el conjunto de datos y aplicar técnicas de **cache y repartitioning**.
3.  **Construir un `Pipeline` de PySpark** para la transformación de características.
4.  **Entrenar un `RandomForestClassifier`** y optimizar sus hiperparámetros usando `TrainValidationSplit` para mayor velocidad.
5.  **Evaluar** el rendimiento del modelo en el conjunto de prueba.
6.  **Guardar** el modelo entrenado y sus métricas.

-   **Entrada:** `data/processed/loans_cleaned.parquet`
-   **Salidas:**
    -   `models/pyspark_random_forest/` (Carpeta con el modelo de Spark)
    -   `results/pyspark_metrics.json` (Métricas de evaluación y tiempo de entrenamiento)

### 1. Importación de Librerías e Inicio de SparkSession

Iniciamos una `SparkSession` con configuraciones específicas para mejorar el rendimiento en una máquina local.

In [1]:
import os
os.environ['HADOOP_HOME'] = 'C:\\hadoop'
os.environ['PATH'] += ';C:\\hadoop\\bin'

In [2]:
import time
import json
import os

from pyspark.sql import SparkSession, types as T
from pyspark.ml import Pipeline as SparkPipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml.classification import RandomForestClassifier as SparkRandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Ajustes para optimización local:
# - master("local[*]"): Usa todos los cores disponibles en la máquina.
# - config("spark.driver.memory", "10g"): Aumenta la memoria asignada al driver.
# - setLogLevel("WARN"): Reduce la cantidad de logs para una salida más limpia.
spark = SparkSession.builder \
    .appName("PySparkLoanClassification") \
    .master("local[*]") \
    .config("spark.driver.memory", "10g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Ajustar particiones de shuffle: un parámetro clave para el rendimiento.
# Evita tener demasiadas particiones pequeñas, lo que genera overhead.
num_cores = max(1, spark.sparkContext.defaultParallelism)
spark.conf.set("spark.sql.shuffle.partitions", str(max(8, num_cores * 4)))

print(f"Spark iniciado. Usando {num_cores} cores.")

Spark iniciado. Usando 12 cores.


### 2. Definición de Rutas y Carga de Datos

In [3]:
PROCESSED_DATA_PATH = 'data/processed/loans_cleaned.parquet'
MODEL_PATH = 'models/pyspark_random_forest'
RESULTS_PATH = 'results/pyspark_metrics.json'
os.makedirs('models', exist_ok=True)
os.makedirs('results', exist_ok=True)

df_spark = spark.read.parquet(PROCESSED_DATA_PATH)
# Evitamos llamar a .count() repetidamente, ya que es una acción costosa.
print(f"Registros totales: {df_spark.count()}")

Registros totales: 1345310


### 3. Preparación de Datos: División, Repartición y Cache

Este es un paso crucial para el rendimiento. Dividimos los datos y luego los `repartition` y `cache`.
-   **`repartition`**: Asegura que los datos se distribuyan uniformemente entre los cores, maximizando el paralelismo.
-   **`cache`**: Almacena el DataFrame en memoria después de la primera acción. Esto evita que Spark tenga que volver a calcularlo desde el principio en cada paso posterior.

In [4]:
# Dividir datos una sola vez y cachear; repartir para buen paralelismo
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)

# Reparticionamos para que cada core tenga una cantidad de trabajo razonable
train_data = train_data.repartition(num_cores * 2).cache()
test_data = test_data.repartition(num_cores * 2).cache()

# Spark es "lazy", por lo que las transformaciones no se ejecutan hasta que se llama a una acción.
# Usamos .count() para "materializar" la caché y forzar la ejecución de los pasos anteriores.
print(f"Registros de entrenamiento (cacheados): {train_data.count()}")
print(f"Registros de prueba (cacheados): {test_data.count()}")

Registros de entrenamiento (cacheados): 1076235
Registros de prueba (cacheados): 269075


### 4. Construcción del Pipeline de PySpark ML

El pipeline define las etapas de preprocesamiento y modelado. Usamos una forma más robusta de identificar los tipos de columnas.

In [5]:
label_col = 'default'
features = [c for c in df_spark.columns if c != label_col]

categorical_cols = []
numerical_cols = []

# Identificar columnas por su tipo de dato real en el schema
for field in df_spark.schema.fields:
    if field.name in features:
        if isinstance(field.dataType, T.StringType):
            categorical_cols.append(field.name)
        elif isinstance(field.dataType, (T.IntegerType, T.DoubleType, T.FloatType, T.LongType, T.ShortType)):
            numerical_cols.append(field.name)

print(f"Columnas Categóricas: {len(categorical_cols)}, Columnas Numéricas: {len(numerical_cols)}")

# --- Etapas del Pipeline ---
imputer = Imputer(strategy='median',
                  inputCols=numerical_cols,
                  outputCols=[f"{c}_imputed" for c in numerical_cols])

indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid='keep') for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_vec") for c in categorical_cols]

assembler_inputs = [f"{c}_imputed" for c in numerical_cols] + [f"{c}_vec" for c in categorical_cols]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

rf = SparkRandomForestClassifier(labelCol=label_col, featuresCol="features", seed=42)

# Unir todas las etapas en un único pipeline
all_stages = [imputer] + indexers + encoders + [assembler, rf]
pipeline = SparkPipeline(stages=all_stages)

Columnas Categóricas: 3, Columnas Numéricas: 6


### 5. Entrenamiento y Ajuste de Hiperparámetros con TrainValidationSplit

Para acelerar la búsqueda de hiperparámetros, usamos `TrainValidationSplit` en lugar de `CrossValidator`.
-   `CrossValidator` entrena `k` modelos por cada combinación de hiperparámetros (ej: 3 pliegues = 3 entrenamientos).
-   `TrainValidationSplit` divide los datos de entrenamiento una sola vez (ej: 80/20) y entrena solo un modelo por combinación. Es mucho más rápido y es ideal para desarrollos iniciales o datasets muy grandes.

In [6]:
# Definir el grid de parámetros
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

evaluator = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderROC")

# Configurar TrainValidationSplit
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8,   # 80% de los datos de entrenamiento se usan para entrenar, 20% para validar
    parallelism=max(1, num_cores // 2)  # Controla cuántos modelos se entrenan en paralelo
)

print("Iniciando TrainValidationSplit...")
start_time = time.time()
model = tvs.fit(train_data)
training_time = time.time() - start_time
print(f"Entrenamiento completado en {training_time:.1f} segundos.")

Iniciando TrainValidationSplit...
Entrenamiento completado en 484.3 segundos.


### 6. Evaluación y Guardado del Modelo

Finalmente, evaluamos el mejor modelo en el conjunto de prueba y guardamos tanto el modelo como las métricas.

In [7]:
# Realizar predicciones en el conjunto de prueba
predictions = model.transform(test_data)

# Calcular ROC AUC
auc = evaluator.evaluate(predictions)

# Función auxiliar para calcular otras métricas
def get_multiclass_metric(metric_name, preds):
    ev = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName=metric_name)
    return ev.evaluate(preds)

metrics = {
    "Accuracy": get_multiclass_metric("accuracy", predictions),
    "Precision": get_multiclass_metric("weightedPrecision", predictions),
    "Recall": get_multiclass_metric("weightedRecall", predictions),
    "F1-score": get_multiclass_metric("f1", predictions),
    "ROC AUC": auc
}

print("Métricas de evaluación en el conjunto de prueba:")
for metric, value in metrics.items():
    print(f"- {metric}: {value:.4f}")

# Guardar el mejor modelo (pipeline completo)
best_model_spark = model.bestModel
best_model_spark.write().overwrite().save(MODEL_PATH)

# Guardar los resultados en formato JSON
results = {"metrics": metrics, "training_time_seconds": training_time}
with open(RESULTS_PATH, "w") as f:
    json.dump(results, f, indent=2)

print(f"\nModelo guardado en: '{MODEL_PATH}'")
print(f"Resultados guardados en: '{RESULTS_PATH}'")

Métricas de evaluación en el conjunto de prueba:
- Accuracy: 0.8011
- Precision: 0.6418
- Recall: 0.8011
- F1-score: 0.7126
- ROC AUC: 0.6947

Modelo guardado en: 'models/pyspark_random_forest'
Resultados guardados en: 'results/pyspark_metrics.json'


### 7. Detener la Sesión de Spark

Es una buena práctica detener explícitamente la `SparkSession` para liberar todos los recursos del clúster.

In [8]:
spark.stop()
print("SparkSession detenida.")

SparkSession detenida.


---
### Fin del Notebook 3

Hemos implementado un pipeline de modelado optimizado con PySpark, listo para ser comparado con el enfoque de Scikit-learn.
---