# Using PySpark

Build a supervised classification model using an MLP (multilayer perceptron) to predict whether a loan issued on the Lending Club platform will end in default (1) or be fully paid (0). The performance of the models built with scikit-learn and PySpark will be compared. In addition, LIME will be applied to interpret predictions.

## Dataset

Name: Lending Club Loan Data (2007–2020)

Source: Kaggle Dataset - Lending Club

Size: more than 1.3 million records

Features: socioeconomic, financial, credit, employment, and loan-purpose variables

**Target:**

Binary variable: loan_status

0 = Fully Paid

1 = Charged Off (default)

Generate the default variable from the loan_status column:

In [1]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
import seaborn as sns       
import warnings
warnings.filterwarnings('ignore')

import re
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression

In [2]:
import time
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from pyspark.sql import SparkSession
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
import os, sys
# These two lines help PySpark use the same interpreter as this kernel
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("mlp-classifier")
    .master("local[6]")                          # 6 of 8 cores
    .config("spark.driver.memory","8g")          # 8 GB of 16
    .config("spark.sql.shuffle.partitions","64") # fewer partitions in local mode
    .config("spark.sql.adaptive.enabled","true")
    .config("spark.sql.execution.arrow.pyspark.enabled","true")
    .config("spark.ui.enabled","true")                     # web UI
    .config("spark.ui.showConsoleProgress","true")         # progress bar in the console
    .getOrCreate())


In [4]:
print("Spark:", spark.version)
print("Parallelism:", spark.sparkContext.defaultParallelism)
print("Memory:", spark.sparkContext._conf.get("spark.driver.memory"))

Spark: 3.5.5
Parallelism: 6
Memory: 8g


In [5]:
# 0) Check dependencies in THIS kernel/env
import sys, pandas as pd
print("Python:", sys.executable)
print("pandas:", pd.__version__)
try:
    import pyarrow as pa
    print("pyarrow:", pa.__version__)
except Exception as e:
    print("pyarrow NOT available:", repr(e))


Python: C:\Users\elias\anaconda3\envs\ml_subject\python.exe
pandas: 2.3.1
pyarrow: 16.1.0


In [6]:
# Load the dataset
df = pd.read_csv('C:/Users/elias/OneDrive/Desktop/MachineLearning/JBOOKS/Project1DL/accepted_2007_to_2018Q4.csv')

In [7]:
# Note: any status other than "Charged Off" (not only "Fully Paid") is labeled 0 here
df["default"] = (df["loan_status"] == "Charged Off").astype(int)
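
The labeling above sends every status other than "Charged Off" to class 0. A minimal sketch of a stricter variant, which keeps only the two terminal statuses named in the target definition, is shown below. This is not what the notebook ran: the toy frame, the helper name `label_terminal_loans`, and the "Current" status are placeholders for illustration.

```python
import pandas as pd

def label_terminal_loans(frame: pd.DataFrame) -> pd.DataFrame:
    """Keep only finished loans and add a 0/1 `default` column."""
    terminal = frame["loan_status"].isin(["Fully Paid", "Charged Off"])
    out = frame.loc[terminal].copy()
    out["default"] = (out["loan_status"] == "Charged Off").astype(int)
    return out

# Hypothetical toy data: two finished loans, one in progress, one missing status
toy = pd.DataFrame({
    "loan_amnt": [1000, 2000, 3000, 4000],
    "loan_status": ["Fully Paid", "Charged Off", "Current", None],
})
labeled = label_terminal_loans(toy)
print(labeled[["loan_status", "default"]])
```

Filtering this way would change the row count and class balance, so results would not be directly comparable with the ones reported in this notebook.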

In [8]:
df_copy = df.copy()

In [9]:
df_copy = df_copy.dropna(subset=[
    'loan_amnt', 'int_rate', 'fico_range_low', 'open_rv_24m',
    'grade', 'sub_grade', 'disbursement_method', 'application_type',
    'initial_list_status', 'home_ownership', 'verification_status',
    'term', 'addr_state', 'default',
])

In [10]:
df_copy = df_copy[[
    'loan_amnt', 'int_rate', 'fico_range_low', 'open_rv_24m',
    'grade', 'sub_grade', 'disbursement_method', 'application_type',
    'initial_list_status', 'home_ownership', 'verification_status',
    'term', 'addr_state', 'default',
]]

## Categorical encoding

In [11]:
cat_features = [col for col in df_copy.columns if df_copy[col].dtype == 'object']

encoder = OneHotEncoder(drop="first", handle_unknown="ignore", sparse_output=False)

encoded = encoder.fit_transform(df_copy[cat_features])

# Convert to a DataFrame with column names
encoded_df = pd.DataFrame(encoded, columns=encoder.get_feature_names_out(cat_features), index=df_copy.index)

# Join back to the original dataset (dropping the original categorical columns)
df_encoded = df_copy.drop(columns=cat_features).join(encoded_df)

In [12]:
df_encoded

Unnamed: 0,loan_amnt,int_rate,fico_range_low,open_rv_24m,default,grade_B,grade_C,grade_D,grade_E,grade_F,...,addr_state_SD,addr_state_TN,addr_state_TX,addr_state_UT,addr_state_VA,addr_state_VT,addr_state_WA,addr_state_WI,addr_state_WV,addr_state_WY
0,3600.0,13.99,675.0,3.0,0,0.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,24700.0,11.99,715.0,3.0,0,0.0,1.0,0.0,0.0,0.0,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,20000.0,10.78,695.0,2.0,0,1.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,35000.0,14.85,785.0,1.0,0,0.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,10400.0,22.45,695.0,7.0,0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2260694,24000.0,12.79,665.0,2.0,0,0.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2260695,24000.0,10.49,695.0,2.0,0,1.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
2260696,40000.0,10.49,705.0,0.0,0,1.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2260697,24000.0,14.49,660.0,5.0,1,0.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [13]:
df_encoded.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1394539 entries, 0 to 2260698
Columns: 104 entries, loan_amnt to addr_state_WY
dtypes: float64(103), int64(1)
memory usage: 1.1 GB
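
The reported footprint can be checked by hand: 104 numeric columns of 8 bytes each over 1,394,539 rows, plus an 8-byte index entry per row. A quick sketch of that arithmetic, assuming pandas reports sizes in powers of 1024:

```python
rows, cols = 1_394_539, 104      # from df_encoded.info()
bytes_per_value = 8              # float64 and int64 are both 8 bytes

data_bytes = rows * cols * bytes_per_value
index_bytes = rows * 8           # integer index kept after dropna (no longer a compact range)
total_gib = (data_bytes + index_bytes) / 1024**3

print(f"data:  {data_bytes / 1024**3:.2f} GiB")
print(f"total: {total_gib:.2f} GiB")
```

Later cells create further copies of this matrix (`X`, `df_pd`, the Spark DataFrame), so peak memory use is likely a multiple of this figure.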


In [14]:
X = df_encoded.drop(columns=['default']).values
y = df_encoded['default'].values

In [15]:
# Convert to a Spark DataFrame
df_pd = pd.DataFrame(X, columns=[f"f{i}" for i in range(X.shape[1])])
df_pd["label"] = y

In [16]:
df_spark = spark.createDataFrame(df_pd)

In [17]:
# Alternative (not run): hand the data to Spark through Parquet instead of createDataFrame
# 1. Save the pandas DataFrame as Parquet
# df_pd.to_parquet("temp_spark_data.parquet", index=False)
#
# 2. Read it from Spark (returns almost immediately because the read is lazy)
# df_spark = spark.read.parquet("temp_spark_data.parquet")
#
# 3. Check that it worked
# print(f"Rows: {df_spark.count()}")
# print(f"Columns: {len(df_spark.columns)}")

In [18]:
# VectorAssembler
assembler = VectorAssembler(inputCols=[f"f{i}" for i in range(X.shape[1])], outputCol="features")
df_spark = assembler.transform(df_spark).select("features", "label")

In [19]:
from pyspark.storagelevel import StorageLevel

# 1) Force physical partitions and CACHE before the split (key step)
BASE_PARTS = 64
base = (df_spark
        .repartition(BASE_PARTS)                       # ← guarantees ~64 tasks per stage
        .persist(StorageLevel.MEMORY_AND_DISK))
base.count()                                           # materializes the cache

print("actual partitions:", base.rdd.getNumPartitions())

# 2) Split from the cached base
train_df, test_df = base.randomSplit([0.8, 0.2], seed=42)
#train_df, test_df = df_spark.randomSplit([0.8, 0.2], seed=42)

actual partitions: 64
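
As a rough guide to what 64 partitions mean on this setup, the sketch below works out rows per partition and how many task waves 6 local cores need per stage. It assumes evenly sized partitions and the row count reported by `df_encoded.info()`; real partition sizes will vary slightly.

```python
import math

rows = 1_394_539        # rows after dropna
partitions = 64         # BASE_PARTS
cores = 6               # local[6]

rows_per_partition = rows / partitions
waves = math.ceil(partitions / cores)   # at most `cores` tasks run at the same time

print(f"~{rows_per_partition:,.0f} rows per partition, {waves} waves per stage")
```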


In [20]:


# In another cell, check the adaptive-execution setting and the parallelism in use
print("AQE coalescePartitions enabled:", spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))
print("Parallelism:", spark.sparkContext.defaultParallelism)

AQE coalescePartitions enabled: true
Parallelism: 6


In [21]:
# Check memory status
import psutil
print(f"Available RAM: {psutil.virtual_memory().available / (1024**3):.1f} GB")
print(f"Total RAM: {psutil.virtual_memory().total / (1024**3):.1f} GB")
print(f"CPU usage: {psutil.cpu_percent()}%")

Available RAM: 0.4 GB
Total RAM: 15.7 GB
CPU usage: 26.5%


In [22]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
# Note: these two names shadow the scikit-learn StandardScaler and Pipeline imported earlier
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
# Basic MLP
# layers = [num_features, hidden1, hidden2, ..., num_classes]
num_classes = 2  # binary classification

# 3) Preprocessing + MLP
# df_spark now holds only "features" and "label", so the input size is read from the vector itself
feature_dim = len(train_df.select("features").first()[0])
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=True)

mlp = MultilayerPerceptronClassifier(
    featuresCol="scaled_features", labelCol="label",
    layers=[feature_dim, 100, num_classes], maxIter=200, seed=42
)

pipeline = Pipeline(stages=[scaler, mlp])
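
The `layers` list fixes the size of the network. Below is a small sketch of the parameter count for a fully connected network with one bias per unit, for hidden widths 10, 50 and 100. The helper name `mlp_param_count` is hypothetical, and 103 is the number of feature columns (104 columns in `df_encoded` minus the label).

```python
def mlp_param_count(layers):
    """Weights plus biases of a dense feed-forward net with these layer sizes."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

counts = {}
for hidden in (10, 50, 100):
    layers = [103, hidden, 2]
    counts[hidden] = mlp_param_count(layers)
    print(layers, counts[hidden])
```

Even the widest option is a small model, so training time here is driven mostly by the number of rows and optimizer iterations rather than by network size.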

In [23]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid_mlp = (ParamGridBuilder()
                 .addGrid(mlp.layers, [
                     [feature_dim, 10, 2],
                     [feature_dim, 50, 2],
                     [feature_dim, 100, 2],
                 ])
                 .addGrid(mlp.maxIter, [100, 200])
                 .build())

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
# Total: 3 architectures × 2 maxIter values × 3 folds = 18 model fits
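
The count in the comment can be reproduced by enumerating the grid. This is a plain-Python sketch of the same Cartesian product, not a call into Spark. Note that `CrossValidator` also refits the best combination once on the full training set, which the figure of 18 does not include.

```python
from itertools import product

hidden_sizes = [10, 50, 100]   # hidden-layer widths in the grid
max_iters = [100, 200]         # maxIter values in the grid
num_folds = 3

combos = list(product(hidden_sizes, max_iters))
cv_fits = len(combos) * num_folds

print(len(combos), "combinations,", cv_fits, "cross-validation fits")
```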

In [24]:
cv_mlp = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid_mlp,
    evaluator=evaluator,
    numFolds=3,
    parallelism=6  # fit up to 6 models in parallel (one per local core)
)

In [25]:
import time
t0 = time.perf_counter()
mlp_model = cv_mlp.fit(train_df)
elapsed = time.perf_counter() - t0
h, rem = divmod(int(elapsed), 3600); m, s = divmod(rem, 60)
print(f"⏱️ CV completed in {h}h {m:02d}m {s:02d}s")


⏱️ CV completed in 0h 52m 10s


In [None]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Predictions
pred = mlp_model.transform(test_df)

# Accuracy
acc = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
).evaluate(pred)

# AUC-ROC
auc = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
).evaluate(pred)

# TP, FP, TN, FN (assuming positive class = 1)
agg = pred.agg(
    F.sum(F.when((F.col("label") == 1) & (F.col("prediction") == 1), 1).otherwise(0)).alias("tp"),
    F.sum(F.when((F.col("label") == 0) & (F.col("prediction") == 1), 1).otherwise(0)).alias("fp"),
    F.sum(F.when((F.col("label") == 0) & (F.col("prediction") == 0), 1).otherwise(0)).alias("tn"),
    F.sum(F.when((F.col("label") == 1) & (F.col("prediction") == 0), 1).otherwise(0)).alias("fn"),
).collect()[0]

tp, fp, tn, fn = int(agg["tp"]), int(agg["fp"]), int(agg["tn"]), int(agg["fn"])

# Precision / Recall / F1 (binary, positive class = 1)
prec = tp / (tp + fp) if (tp + fp) else 0.0
rec  = tp / (tp + fn) if (tp + fn) else 0.0
f1   = (2 * prec * rec / (prec + rec)) if (prec + rec) else 0.0

# Print the metrics
print(f"Accuracy : {acc:.4f}")
print(f"Precision: {prec:.4f}")
print(f"Recall   : {rec:.4f}")
print(f"F1-score : {f1:.4f}")
print(f"ROC AUC  : {auc:.4f}")


Accuracy : 0.9136
Precision: 0.4444
Recall   : 0.0101
F1-score : 0.0198
ROC AUC  : 0.7545
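
The gap between an accuracy of 0.91 and a recall of 0.01 is typical of an imbalanced target when almost nothing is predicted positive. The sketch below recomputes the four metrics from confusion counts; the counts are invented for illustration and are not the notebook's.

```python
def binary_metrics(tp, fp, tn, fn):
    """Accuracy, precision, recall and F1 for the positive class."""
    total = tp + fp + tn + fn
    acc = (tp + tn) / total if total else 0.0
    prec = tp / (tp + fp) if (tp + fp) else 0.0
    rec = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * prec * rec / (prec + rec) if (prec + rec) else 0.0
    return acc, prec, rec, f1

# Hypothetical counts: 1,000 loans, 90 defaults, and a model that flags only 2 loans
acc, prec, rec, f1 = binary_metrics(tp=1, fp=1, tn=909, fn=89)
print(f"acc={acc:.3f} prec={prec:.3f} rec={rec:.3f} f1={f1:.3f}")
```

Accuracy stays high because the negatives dominate, which is why recall and F1 are the more informative figures for this problem.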


In [34]:
import pyspark.sql.functions as F
from pyspark.ml.functions import vector_to_array
import numpy as np
from sklearn.metrics import precision_recall_curve

# 1) Validation set: the test set is reused here, so the tuned thresholds are optimistic
#    (use a separate hold-out set if one is available)
valid_df = test_df

# 2) Probabilities P(y=1) from the best pipeline (scaler + MLP)
best_pipe = mlp_model.bestModel  # PipelineModel
pred_val = (best_pipe.transform(valid_df)
            .select(
                F.col("label").cast("int").alias("label"),
                vector_to_array("probability").getItem(1).alias("score")
            ))

# 3) Lightweight collect to the driver (only two numeric columns)
pairs = pred_val.select("score", "label").rdd.map(lambda r: (float(r[0]), int(r[1]))).collect()
scores = np.fromiter((p[0] for p in pairs), dtype=np.float64)
labels = np.fromiter((p[1] for p in pairs), dtype=np.int8)

# 4) PR curve and per-threshold metrics (same approach as the scikit-learn snippet)
prec, rec, thr = precision_recall_curve(labels, scores)  # len(thr)=len(prec)-1
prec_ = prec[:-1]
rec_  = rec[:-1]
thr_  = thr

P  = (labels == 1).sum()
Nn = (labels == 0).sum()
tp = rec_ * P
fp = tp * (1.0/np.clip(prec_, 1e-12, None) - 1.0)
fp = np.clip(fp, 0, Nn)
tn = Nn - fp
acc_s = (tp + tn) / (P + Nn)
f1_s  = 2*prec_*rec_ / (prec_ + rec_ + 1e-12)

t_accuracy  = float(thr_[np.nanargmax(acc_s)])
t_precision = float(thr_[np.nanargmax(prec_)])
t_recall    = float(thr_[np.nanargmax(rec_)])
t_f1        = float(thr_[np.nanargmax(f1_s)])

print({
    "t_accuracy":  t_accuracy,
    "t_precision": t_precision,
    "t_recall":    t_recall,
    "t_f1":        t_f1
})


{'t_accuracy': 0.8528860861439481, 't_precision': 0.5458427533802419, 't_recall': 9.598345924034816e-05, 't_f1': 0.15191662591242627}
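
A toy run of the same idea, choosing the threshold that maximizes F1 along the precision-recall curve. The four labels and scores are made up; only the use of `precision_recall_curve` mirrors the cell above.

```python
import numpy as np
from sklearn.metrics import precision_recall_curve

# Hypothetical labels and predicted probabilities of the positive class
labels = np.array([0, 0, 1, 1])
scores = np.array([0.1, 0.4, 0.35, 0.8])

prec, rec, thr = precision_recall_curve(labels, scores)
prec_, rec_ = prec[:-1], rec[:-1]   # drop the final (1, 0) point, which has no threshold
f1 = 2 * prec_ * rec_ / (prec_ + rec_ + 1e-12)

best_t = float(thr[np.argmax(f1)])
print("best threshold:", best_t, "F1:", round(float(f1.max()), 3))
```

Predictions are then taken as `score >= best_t`, as the next cell does with `t_f1`.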


In [35]:
t = t_f1  # threshold that maximizes F1
pred_t = pred_val.withColumn("pred_t", (F.col("score") >= F.lit(t)).cast("int"))

tp = pred_t.filter((F.col("label")==1) & (F.col("pred_t")==1)).count()
tn = pred_t.filter((F.col("label")==0) & (F.col("pred_t")==0)).count()
fp = pred_t.filter((F.col("label")==0) & (F.col("pred_t")==1)).count()
fn = pred_t.filter((F.col("label")==1) & (F.col("pred_t")==0)).count()

precision = tp / (tp + fp + 1e-12)
recall    = tp / (tp + fn + 1e-12)
accuracy  = (tp + tn) / (tp + tn + fp + fn + 1e-12)
f1        = 2*precision*recall / (precision + recall + 1e-12)
print(f"Acc: {accuracy:.4f}  Prec: {precision:.4f}  Rec: {recall:.4f}  F1: {f1:.4f}")


Acc: 0.8199  Prec: 0.2331  Rec: 0.4758  F1: 0.3129


In [30]:
import numpy as np

# 1) Index of the best combination by mean metric (AUC)
i_best = int(np.argmax(mlp_model.avgMetrics))
print("Mean AUC (best combination):", float(mlp_model.avgMetrics[i_best]))

# 2) Winning ParamMap
best_pm = mlp_model.getEstimatorParamMaps()[i_best]

def simplify_parammap(pm):
    # ParamMap keys are Param objects; keep only their short names
    return {getattr(k, "name", str(k)): v for k, v in pm.items()}

best_params = simplify_parammap(best_pm)
print("Winning hyperparameters:")
for k, v in best_params.items():
    print(f"  - {k}: {v}")

# 3) Handy shortcuts
print("layers:", best_params.get("layers"))
print("maxIter:", best_params.get("maxIter"))

# 4) (optional) confirm from the trained model
best_pipe = mlp_model.bestModel                  # PipelineModel
mlp_best  = best_pipe.stages[-1]                 # MultilayerPerceptronClassificationModel
try:
    layers_from_model = list(mlp_best._java_obj.getLayers())
    print("layers (from the model):", layers_from_model)
except Exception:
    pass
print("numFeatures:", mlp_best.numFeatures, "numClasses:", mlp_best.numClasses)


Mean AUC (best combination): 0.7491192328832751
Winning hyperparameters:
  - layers: [103, 50, 2]
  - maxIter: 200
layers: [103, 50, 2]
maxIter: 200
layers (from the model): [103, 50, 2]
numFeatures: 103 numClasses: 2


## Summary

### Metrics for scikit-learn

AUC      : 0.7638652669990518 <br>
Accuracy : 0.8661996070388802 <br>
Precision: 0.26704505640810033 <br>
Recall   : 0.3173189009159034 <br>
F1-score : 0.290019405654275

Grid search time: 13.48 minutes,
using 6 parallel jobs (n_jobs = 6) and a 3x2x3 parameter grid with 3 folds.

Predictions on the test set took 1 min.

### Metrics for PySpark

AUC: 0.7545 <br> Accuracy: 0.8199 <br> Precision: 0.2331 <br>
Recall: 0.4758 <br> F1-score: 0.3129

Grid search time: 0h 52m 10s,
using a parallelism of 6 and a 3x2 parameter grid with 3 folds.

Predictions on the test set took 13 s.

## Conclusions

For both methods, the decision threshold on the predicted probabilities had to be adjusted, because the default binary predictions gave very poor recall and F1-score. After that adjustment the metrics were quite similar: scikit-learn was ahead on AUC, accuracy, and precision, while the PySpark MLP had higher recall and F1-score.
On timing, PySpark took far longer on the hyperparameter search than scikit-learn (0h 52m 10s versus 13.48 minutes), even though the PySpark MLP ran fewer parameter combinations than the scikit-learn one.
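
Putting the two search times on a per-fit basis makes the gap clearer. The sketch below uses only the figures from the summary and assumes the scikit-learn grid had 3×2×3 = 18 combinations and the PySpark grid 3×2 = 6, each with 3 folds; the final refit on the full training set is ignored for both.

```python
spark_seconds = 52 * 60 + 10          # 0h 52m 10s
sklearn_seconds = 13.48 * 60          # 13.48 minutes

spark_fits = 3 * 2 * 3                # 6 combinations x 3 folds
sklearn_fits = 3 * 2 * 3 * 3          # 18 combinations x 3 folds

wall_ratio = spark_seconds / sklearn_seconds
per_fit_ratio = (spark_seconds / spark_fits) / (sklearn_seconds / sklearn_fits)

print(f"wall-clock ratio: {wall_ratio:.1f}x, per-fit ratio: {per_fit_ratio:.1f}x")
```

Under these assumptions the per-fit gap is about three times larger than the wall-clock gap, since PySpark ran a third as many fits.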


For predictions on the test set, PySpark was faster than scikit-learn (13 s versus 1 min).
The comparison is fair in that both runs use the same rows and columns, and an effort was made to match the parallelism (6 cores versus 6 cores).

Although PySpark is one of the standard tools in the Big Data field, it was not useful enough in this case.
Possible causes include the configuration set at the start, the way the data was prepared, or the fact that it ran on a Windows machine without WSL2.