### Import libraries

In [1]:
# pyspark.sql.functions is imported under the alias F rather than with a wildcard
# import, which would shadow Python built-ins such as min, max, sum and round
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr
from pyspark.sql.types import LongType

Starting Spark application


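| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 0 | application_1687906217705_0001 | pyspark | idle | Link | Link | | ✔ |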


SparkSession available as 'spark'.


The PySpark session (`spark`) is created automatically by AWS EMR, so it does not need to be created manually. It is configured according to the resources allocated to the cluster.

### Helper functions

In [None]:
def shape(data):
    num_rows = data.count()
    num_columns = len(data.columns)
    print("Shape of the Dataframe: ({}, {})".format(num_rows, num_columns))

### Load the data

In [None]:
data_path = "s3://proyecto-big-data/amex-partitioned-data/"

- The data is loaded from the S3 bucket in Parquet format. Parquet is a columnar format that Spark reads natively and uses as its default data source.
- The customer IDs are cast to string and the date column `S_2` to timestamp.
- Finally, null values are imputed with a sentinel value.

In [4]:
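NAN_VALUE = -127  # Sentinel for missing values, assumed to lie outside the features' range
# Parquet files store their own schema, so the inferSchema/header options are not needed
data = spark.read.parquet(data_path + "train.parquet")
data = data.withColumn("customer_ID", data.customer_ID.cast('string'))
data = data.withColumn('S_2', data.S_2.cast('timestamp'))
# fillna with a number only fills numeric columns; string and timestamp nulls are kept
data = data.fillna(NAN_VALUE)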
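The imputation can be illustrated with a plain-Python sketch of how `fillna` behaves when given a number. Nulls in numeric columns are replaced by the sentinel, while columns of other types (strings, timestamps) keep their nulls. The schema and values below are made up, and `fill_numeric_nulls` is a hypothetical helper, not a PySpark function.

```python
NAN_VALUE = -127

# Illustrative schema (column -> Spark type name); an assumption for this sketch
SCHEMA = {"customer_ID": "string", "S_2": "timestamp", "P_2": "double", "D_39": "double"}
NUMERIC_TYPES = {"int", "bigint", "float", "double"}


def fill_numeric_nulls(rows, schema, value):
    """Hypothetical helper: replace None with `value` only in numeric columns,
    mimicking df.fillna(<number>). Returns new dicts; inputs are not modified."""
    numeric_cols = {name for name, dtype in schema.items() if dtype in NUMERIC_TYPES}
    return [
        {name: value if v is None and name in numeric_cols else v for name, v in row.items()}
        for row in rows
    ]


rows = [
    {"customer_ID": "a1", "S_2": None, "P_2": None, "D_39": 0.5},
    {"customer_ID": None, "S_2": "2017-03-09", "P_2": 0.9, "D_39": None},
]
print(fill_numeric_nulls(rows, SCHEMA, NAN_VALUE))
```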

### Preprocessing

Each customer has multiple records, so they must be collapsed into a single row per customer. To do this, the data is grouped by customer and aggregation functions are computed with the following logic:

For the numeric variables:
- mean, std, min, max, last

For the categorical variables:
- count, last, count distinct
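The aggregation logic can be sketched in plain Python for one numeric and one categorical column. Two details are worth noting:

- "last" is taken as the value at the latest `S_2` date. This is the intended meaning, but Spark's `F.last` without an explicit ordering does not guarantee it.
- The standard deviation is the sample standard deviation. It is left as `None` for a customer with a single record, matching Spark 3.x, whose `stddev` returns null in that case.

`aggregate_customers`, the column choice and the values are hypothetical.

```python
import statistics
from collections import defaultdict


def aggregate_customers(rows, num_col, cat_col):
    """Hypothetical helper: one dict of aggregates per customer_ID.

    Each row is a dict with 'customer_ID', 'S_2' (ISO date string),
    a numeric column `num_col` and a categorical column `cat_col`.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row["customer_ID"]].append(row)

    result = {}
    for cid, recs in groups.items():
        recs = sorted(recs, key=lambda r: r["S_2"])  # oldest -> newest record
        nums = [r[num_col] for r in recs]
        ordered = sorted(nums)  # smallest first, largest last
        cats = [r[cat_col] for r in recs if r[cat_col] is not None]
        result[cid] = {
            f"{num_col}_mean": statistics.mean(nums),
            # Sample std: undefined (None) for a single record
            f"{num_col}_std": statistics.stdev(nums) if len(nums) > 1 else None,
            f"{num_col}_min": ordered[0],
            f"{num_col}_max": ordered[-1],
            f"{num_col}_last": nums[-1],  # value at the latest S_2
            f"{cat_col}_count": len(cats),  # nulls are not counted
            f"{cat_col}_last": recs[-1][cat_col],
            f"{cat_col}_distinct": len(set(cats)),  # distinct non-null values
        }
    return result


rows = [
    {"customer_ID": "c1", "S_2": "2018-01-15", "P_2": 0.5, "D_63": "CO"},
    {"customer_ID": "c1", "S_2": "2018-03-15", "P_2": 0.7, "D_63": "CR"},
    {"customer_ID": "c1", "S_2": "2018-02-15", "P_2": 0.6, "D_63": "CO"},
    {"customer_ID": "c2", "S_2": "2018-03-20", "P_2": 0.2, "D_63": None},
]
agg = aggregate_customers(rows, num_col="P_2", cat_col="D_63")
for cid, feats in agg.items():
    print(cid, feats)
```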

In [5]:
def preprocess_data(data):
    all_cols = [c for c in list(data.columns) if c not in ['customer_ID','S_2']]
    cat_features = ["B_30","B_38","D_114","D_116","D_117","D_120","D_126","D_63","D_64","D_66","D_68"]
    # Use `c` as the loop variable so the imported `col` function is not shadowed
    num_features = [c for c in all_cols if c not in cat_features]

    # Numeric features: mean, std, min, max and last value per customer
    num_aggregation_exprs = []
    for col_name in num_features:
        num_aggregation_exprs.extend([
            F.mean(col_name).alias(f"{col_name}_mean"),
            F.stddev(col_name).alias(f"{col_name}_std"),  # null for a single record (Spark 3.x)
            F.min(col_name).alias(f"{col_name}_min"),
            F.max(col_name).alias(f"{col_name}_max"),
            # F.last depends on the row order within each group, which Spark does not
            # guarantee after a shuffle, so it is not necessarily the latest S_2
            F.last(col_name).alias(f"{col_name}_last"),
        ])

    # Apply the numeric aggregations: one row per customer
    num_aggregated_data = data.groupBy('customer_ID').agg(*num_aggregation_exprs)

    # Categorical features: non-null count, last value and number of distinct values
    cat_aggregation_exprs = []
    for col_name in cat_features:
        cat_aggregation_exprs.extend([
            F.count(col_name).alias(f"{col_name}_count"),
            F.last(col_name).alias(f"{col_name}_last"),
            F.countDistinct(col_name).alias(f"{col_name}_distinct"),
        ])

    # Apply the categorical aggregations: one row per customer
    cat_aggregated_data = data.groupBy('customer_ID').agg(*cat_aggregation_exprs)
    # Both results have exactly one row per customer, so the inner join aligns them
    return num_aggregated_data.join(cat_aggregated_data, on="customer_ID", how='inner')

The preprocessing is applied to the data, and the dimensions of the dataset are shown before and after it.

In [6]:
shape(data)
data = preprocess_data(data)
shape(data)

Shape of the Dataframe: (5531451, 190)
Shape of the Dataframe: (458913, 919)

The first 5 rows of the preprocessed dataset are shown as an example.

In [7]:
data.show(5)

The file with each customer's label is loaded.

In [8]:
labels = spark.read.option("inferSchema", "true").option("header", "true").csv(data_path+"train_labels.csv")
labels = labels.withColumn("customer_ID", labels.customer_ID.cast('string'))
labels.show()

+--------------------+------+
|         customer_ID|target|
+--------------------+------+
|0000099d6bd597052...|     0|
|00000fd6641609c6e...|     0|
|00001b22f846c82c5...|     0|
|000041bdba6ecadd8...|     0|
|00007889e4fcd2614...|     0|
|000084e5023181993...|     0|
|000098081fde4fd64...|     0|
|0000d17a1447b25a0...|     0|
|0000f99513770170a...|     1|
|00013181a0c5fc8f1...|     1|
|0001337ded4e1c253...|     1|
|00013c6e1cec7c21b...|     1|
|0001812036f155833...|     1|
|00018dd4932409baf...|     0|
|000198b3dc70edd65...|     0|
|000201146e53cacdd...|     0|
|0002d381bdd8048d7...|     0|
|0002e335892f7998f...|     1|
|00031e8be98bc3411...|     0|
|000333075fb8ec6d5...|     1|
+--------------------+------+
only showing top 20 rows

The aggregated features are combined with the customer labels through an inner join on `customer_ID`.

In [9]:
data = data.join(labels, on="customer_ID", how='inner')
shape(data)

Shape of the Dataframe: (458913, 920)

### Import PySpark's machine learning libraries

In [69]:
import time
import builtins
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel

The dataset is split into train and test sets (70% and 30%, respectively) with a random seed of 42.

In [11]:
SEED = 42
(train_data, test_data) = data.randomSplit([0.7, 0.3], seed = SEED)

The dimensions of the train and test datasets are as follows:

In [12]:
shape(train_data)
shape(test_data)

Shape of the Dataframe: (321423, 920)
Shape of the Dataframe: (137490, 920)
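`randomSplit` does not cut the data at exactly 70%. It normalizes the weights and assigns every row independently using a seeded random draw, so the split sizes are only approximately proportional. Here 321,423 / 458,913 ≈ 0.7004, and the two parts add up to the full 458,913 rows.

The sketch below illustrates that idea in plain Python. `random_split` is a hypothetical, simplified model of the sampling, not Spark's implementation, so it will not reproduce Spark's row assignment.

```python
import math
import random


def random_split(rows, weights, seed):
    """Hypothetical, simplified randomSplit: each row goes to the split whose
    cumulative-weight interval contains a seeded uniform draw."""
    total = math.fsum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point round-off

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits


n = 100_000
train, test = random_split(range(n), [0.7, 0.3], seed=42)
print(len(train), len(test), "{:.4f}".format(len(train) / n))
```

The same seed gives the same assignment for the same input, which is what makes the split reproducible.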

### Vector Assembler
`VectorAssembler` is used to combine the model's input variables into a single feature vector.

In [13]:
feature_cols = [c for c in list(train_data.columns) if c not in ['customer_ID','S_2','target']]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = assembler.setHandleInvalid("skip").transform(train_data)
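`VectorAssembler` concatenates the listed columns of each row into a single vector. With `handleInvalid="skip"`, it drops every row in which any of those columns is null or NaN.

This matters here. In Spark 3.x, `stddev` is null for a customer with a single record, so any such customer would be silently dropped from the training data, and later from the test data. Imputing the aggregated columns as well would be one way to keep them.

The plain-Python sketch below mimics the assembler under those rules. `assemble` is a hypothetical helper, and lists stand in for Spark vectors.

```python
import math


def assemble(rows, input_cols, output_col="features"):
    """Hypothetical VectorAssembler-like step with handleInvalid="skip":
    rows whose inputs contain None or NaN are dropped."""
    out = []
    for row in rows:
        values = [row[c] for c in input_cols]
        invalid = [v for v in values if v is None or (isinstance(v, float) and math.isnan(v))]
        if invalid:
            continue  # "skip": drop the whole row
        new_row = dict(row)
        new_row[output_col] = [float(v) for v in values]  # list stands in for a Spark vector
        out.append(new_row)
    return out


rows = [
    {"customer_ID": "c1", "P_2_mean": 0.6, "P_2_std": 0.1, "target": 0},
    # Customer with a single record: its std aggregate is null
    {"customer_ID": "c2", "P_2_mean": 0.2, "P_2_std": None, "target": 1},
]
assembled = assemble(rows, ["P_2_mean", "P_2_std"])
print([(r["customer_ID"], r["features"]) for r in assembled])
```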

# Model training
A Random Forest model is trained with a grid search over its hyperparameters, using 3-fold cross-validation to choose the best combination.

In [15]:
NUM_FOLDS = 3

rf = RandomForestClassifier(labelCol="target", featuresCol="features")

param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5, 10]) \
    .addGrid(rf.numTrees, [20, 50]) \
    .build()

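# precisionByLabel scores the class given by metricLabel (default 0.0), so the
# cross-validation selects the candidate with the best precision for class 0
evaluator = MulticlassClassificationEvaluator(labelCol='target', metricName="precisionByLabel")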

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=NUM_FOLDS)

start_time = time.time()

cv_model = crossval.fit(train_data)

end_time = time.time()
fit_time = end_time - start_time
print("Fit Time: {:.2f} seconds".format(fit_time))

Fit Time: 4138.47 seconds
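The grid has two values of `maxDepth` and two of `numTrees`, giving 4 candidates. `CrossValidator` trains one model per candidate and fold, 4 × 3 = 12 fits, and then refits the best candidate on the whole training set, for 13 fits in total. That explains the long fit time.

The plain-Python sketch below reproduces that bookkeeping. `expand_grid` and `k_fold_indices` are hypothetical helpers, and the fold assignment is a simplified random assignment, not Spark's exact procedure. The expansion order matches the candidate listing that follows: `maxDepth` 5 with 20 and then 50 trees come first.

```python
import itertools
import random


def expand_grid(grid):
    """Hypothetical ParamGridBuilder-like expansion: {param: [values]} -> list of {param: value}."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in itertools.product(*(grid[n] for n in names))]


def k_fold_indices(n_rows, num_folds, seed):
    """Randomly assign each row index to one of num_folds folds (simplified)."""
    rng = random.Random(seed)
    folds = [[] for _ in range(num_folds)]
    for i in range(n_rows):
        folds[rng.randrange(num_folds)].append(i)
    return folds


grid = expand_grid({"maxDepth": [5, 10], "numTrees": [20, 50]})
folds = k_fold_indices(n_rows=30, num_folds=3, seed=42)
# One fit per (candidate, fold) pair, plus a final refit of the best candidate
total_fits = len(grid) * len(folds) + 1
print(len(grid), "candidates,", total_fits, "fits")
```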

Results of the candidate models:

In [17]:
avg_metrics = cv_model.avgMetrics
for i, params in enumerate(param_grid):
    print("Candidate {}: Avg. Metric = {}".format(i+1, avg_metrics[i]))
    print("Params: {}".format(params))
    print("-" * 50)

Candidate 1: Avg. Metric = 0.9040087483757906
Params: {Param(parent='RandomForestClassifier_82d69759480c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='RandomForestClassifier_82d69759480c', name='numTrees', doc='Number of trees to train (>= 1).'): 20}
--------------------------------------------------
Candidate 2: Avg. Metric = 0.9005120456389684
Params: {Param(parent='RandomForestClassifier_82d69759480c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='RandomForestClassifier_82d69759480c', name='numTrees', doc='Number of trees to train (>= 1).'): 50}
--------------------------------------------------
Candidate 3: Avg. Metric = 0.9209274065264259
Params: {Param(parent='RandomForestClassifier_82d69759480c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth

The best model among the candidates is selected.
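`CrossValidator` picks the candidate with the best average metric. This is an argmax, since the evaluator's `isLargerBetter()` is true for precision. `bestModel` is that candidate refit on the full training set.

The sketch below, with hypothetical helpers, shows that selection step on the three averages visible in the listing above. The fourth average is cut off in this export, so the result is only illustrative. The sketch also includes a compact formatter for the parameter maps, whose printed form above is hard to read. The formatter assumes only that each key has a `.name` attribute, as `pyspark.ml.param.Param` does.

```python
from collections import namedtuple


def best_candidate(avg_metrics, larger_is_better=True):
    """Index of the best average metric (argmax, or argmin if smaller is better).
    sorted() is stable, so ties resolve to the first candidate."""
    order = sorted(range(len(avg_metrics)), key=avg_metrics.__getitem__,
                   reverse=larger_is_better)
    return order[0]


def format_params(param_map):
    """Render a {Param: value} map compactly, e.g. 'maxDepth=5, numTrees=20'."""
    return ", ".join(f"{p.name}={v}" for p, v in param_map.items())


# Averages of candidates 1-3 as printed above; candidate 4 is cut off in this export
shown = [0.9040087483757906, 0.9005120456389684, 0.9209274065264259]
print("Best of the shown candidates: Candidate", best_candidate(shown) + 1)

# Stand-in for pyspark.ml.param.Param, which also exposes a .name attribute
FakeParam = namedtuple("FakeParam", "name")
print(format_params({FakeParam("maxDepth"): 5, FakeParam("numTrees"): 20}))
```

In the candidate loop, `print("Params: {}".format(format_params(params)))` would print short lines such as `maxDepth=5, numTrees=20`.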

In [18]:
best_model = cv_model.bestModel

### Model validation
Predictions are made on the test dataset, after applying the same `VectorAssembler` to it.

In [19]:
# Transform the test data
test_data = assembler.setHandleInvalid("skip").transform(test_data)

# Make predictions on the test data
predictions = best_model.transform(test_data)

The area under the ROC curve, the precision and the accuracy are computed.
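The AUC-ROC has a useful interpretation. It is the probability that a randomly chosen positive example (target 1) receives a higher score than a randomly chosen negative one, counting ties as one half. `BinaryClassificationEvaluator` computes it from the model's `rawPrediction` column by default.

The sketch below computes the AUC directly from that pairwise definition, using a hypothetical helper and made-up scores. It is quadratic in the number of rows and only meant to show what the metric measures, not how Spark computes it.

```python
def auc_pairwise(y_true, y_score):
    """Hypothetical helper: AUC-ROC as P(score of a positive > score of a negative),
    counting ties as 1/2. O(n_pos * n_neg), for illustration only."""
    pos = [s for y, s in zip(y_true, y_score) if y == 1]
    neg = [s for y, s in zip(y_true, y_score) if y == 0]
    wins = 0.0
    for p in pos:
        for q in neg:
            if p > q:
                wins += 1.0
            elif p == q:
                wins += 0.5
    return wins / (len(pos) * len(neg))


y_true = [0, 0, 1, 1, 0, 1]
y_score = [0.1, 0.4, 0.35, 0.8, 0.2, 0.9]
# 8 of the 9 (positive, negative) pairs are ordered correctly
print(auc_pairwise(y_true, y_score))
```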

In [72]:
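# Evaluate precision (precisionByLabel scores the class set by metricLabel,
# which defaults to 0.0, so this is the precision of class 0)
evaluator_precision = MulticlassClassificationEvaluator(labelCol='target', metricName="precisionByLabel")
precision = evaluator_precision.evaluate(predictions)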

# Evaluate AUC-ROC
evaluator_auc = BinaryClassificationEvaluator(labelCol='target', metricName="areaUnderROC")
auc_roc = evaluator_auc.evaluate(predictions)

# Evaluate accuracy
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='target', metricName="accuracy")
accuracy = evaluator_accuracy.evaluate(predictions)

print("Precision:", builtins.round(precision,2))
print("AUC-ROC:", builtins.round(float(auc_roc),2))
print("Accuracy:", builtins.round(float(accuracy),2))

Precision: 0.92
AUC-ROC: 0.95
Accuracy: 0.89
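`precisionByLabel` scores one class, chosen by the evaluator's `metricLabel` parameter (default 0.0, not changed in this notebook). The two per-class precisions can differ a lot, while accuracy summarizes both classes.

The plain-Python sketch below, with hypothetical helpers and made-up labels in which class 0 is the majority, shows how far the per-class precisions can diverge.

```python
def precision_by_label(y_true, y_pred, label=0):
    """Share of predictions equal to `label` that are correct.
    label=0 mirrors the evaluator's default metricLabel; returns 0.0 if never predicted."""
    predicted_as_label = [t for t, p in zip(y_true, y_pred) if p == label]
    if not predicted_as_label:
        return 0.0
    return len([t for t in predicted_as_label if t == label]) / len(predicted_as_label)


def accuracy_of(y_true, y_pred):
    """Share of all predictions that match the true label."""
    return len([1 for t, p in zip(y_true, y_pred) if t == p]) / len(y_true)


# Made-up labels: class 0 is the majority class
y_true = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
y_pred = [0, 0, 0, 0, 0, 0, 1, 1, 1, 0]
print(precision_by_label(y_true, y_pred, label=0))  # 6 of 7 predicted 0s are correct
print(precision_by_label(y_true, y_pred, label=1))  # 1 of 3 predicted 1s is correct
print(accuracy_of(y_true, y_pred))                  # 7 of 10 predictions are correct
```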

The model is saved in PySpark format to the S3 bucket so that it can be reused later without retraining.

In [73]:
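model_path = "s3://proyecto-big-data/amex-model/"

# write().overwrite() replaces a model already stored at this path,
# whereas save() fails if the path exists (e.g. when re-running the notebook)
best_model.write().overwrite().save(model_path)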

To check that the model was saved correctly, it is loaded back and used to predict on the test dataset. The results are expected to match those obtained above.

In [74]:
# Load the saved model
loaded_model = RandomForestClassificationModel.load(model_path)

# Use the loaded model for predictions
predictions = loaded_model.transform(test_data)

In [76]:
# Reuse the evaluators defined above on the reloaded model's predictions
precision = evaluator_precision.evaluate(predictions)
auc_roc = evaluator_auc.evaluate(predictions)
accuracy = evaluator_accuracy.evaluate(predictions)

print("Precision:", builtins.round(precision,2))
print("AUC-ROC:", builtins.round(float(auc_roc),2))
print("Accuracy:", builtins.round(float(accuracy),2))

Precision: 0.92
AUC-ROC: 0.95
Accuracy: 0.89

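The model was indeed saved correctly, since the reloaded model reproduces the same metrics. The results are very satisfactory: on the test set the model reaches an AUC-ROC of 0.95 and an accuracy of 0.89. The 0.92 precision refers to class 0, the evaluator's default `metricLabel`. It means that about 92% of the customers the model predicts as non-delinquent are in fact non-delinquent. It is not the precision with which the model flags delinquent customers.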