# Procesamiento de Datos a Gran Escala

In [21]:
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext, functions as F

In [22]:
# Configuración local
configuraDierickBrochero = (
    SparkConf()
        .set("spark.scheduler.mode", "FAIR")
        .set("spark.executor.memory", "10G")
        .set("spark.executor.cores", "1")
        .set("spark.cores.max", "9")
        .set("spark.ui.port", "4040")
        .setMaster("local[*]")  # <---- Spark local usando todos los cores
        .setAppName("hpcsparkDierickBrochero")
)

# Crear sesión Spark local
sparkDierickBrochero = (
    SparkSession
        .builder
        .config(conf=configuraDierickBrochero)
        .getOrCreate()
)

sqlContext = SQLContext(sparkDierickBrochero.sparkContext)

sparkDierickBrochero

<p><strong>Objetivo: </strong> El objetivo de este cuaderno es crear un flujo sencillo de machine learning</p>

## Cargar de datos en un Dataframe

Este conjunto de datos consta de una etiqueta categórica con dos valores (buenos o malos), una variable categórica (color) y dos variables numéricas.

Si bien los datos son sintéticos, imaginemos que este conjunto de datos representa la salud del cliente de una empresa. La columna "color" representa una calificación de salud categórica hecha por un representante de servicio al cliente. La columna "laboratorio" representa la verdadera salud del cliente. Los otros dos valores son algunas medidas numéricas de actividad dentro de una aplicación (por ejemplo, minutos de permanencia en el sitio y compras).

Supongamos que queremos entrenar un modelo de clasificación en el que esperamos predecir una variable binaria, la etiqueta, a partir de los otros valores.

In [23]:
df = sparkDierickBrochero.read.csv("predictive_maintenance.csv", header=True, inferSchema=True)
df.orderBy("Type").show()

+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+------+------------+
|UDI|Product ID|Type|Air temperature [K]|Process temperature [K]|Rotational speed [rpm]|Torque [Nm]|Tool wear [min]|Target|Failure Type|
+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+------+------------+
| 53|    H29466|   H|              298.8|                  309.0|                  1497|       43.8|            147|     0|  No Failure|
|267|    H29680|   H|              298.0|                  308.1|                  1440|       43.7|             37|     0|  No Failure|
| 68|    H29481|   H|              298.8|                  308.9|                  1466|       44.1|            184|     0|  No Failure|
| 12|    H29425|   H|              298.6|                  309.1|                  1423|       44.3|             29|     0|  No Failure|
| 81|    H29494|   H|              298.8|

In [24]:
df = (
    df
    .withColumnRenamed("Product ID", "product_id")
    .withColumnRenamed("Air temperature [K]", "air_temp")
    .withColumnRenamed("Process temperature [K]", "process_temp")
    .withColumnRenamed("Rotational speed [rpm]", "rotational_speed")
    .withColumnRenamed("Torque [Nm]", "torque")
    .withColumnRenamed("Tool wear [min]", "tool_wear")
    .withColumnRenamed("Failure Type", "failure_Type")
)

df.show(5)

+---+----------+----+--------+------------+----------------+------+---------+------+------------+
|UDI|product_id|Type|air_temp|process_temp|rotational_speed|torque|tool_wear|Target|failure_Type|
+---+----------+----+--------+------------+----------------+------+---------+------+------------+
|  1|    M14860|   M|   298.1|       308.6|            1551|  42.8|        0|     0|  No Failure|
|  2|    L47181|   L|   298.2|       308.7|            1408|  46.3|        3|     0|  No Failure|
|  3|    L47182|   L|   298.1|       308.5|            1498|  49.4|        5|     0|  No Failure|
|  4|    L47183|   L|   298.2|       308.6|            1433|  39.5|        7|     0|  No Failure|
|  5|    L47184|   L|   298.2|       308.7|            1408|  40.0|        9|     0|  No Failure|
+---+----------+----+--------+------------+----------------+------+---------+------+------------+
only showing top 5 rows


In [25]:
minority = df.filter(df["Target"] == 1)
majority = df.filter(df["Target"] == 0)

# Muestreo aleatorio de la clase mayoritaria
majority_downsampled = majority.sample(
    withReplacement=False, 
    fraction=minority.count()/majority.count(), 
    seed=42
)

balancedDF = majority_downsampled.union(minority)
balancedDF.groupBy("Target").count().show()

+------+-----+
|Target|count|
+------+-----+
|     0|  366|
|     1|  339|
+------+-----+



## Transformaciones

El conjunto de datos actual no cumple con el requisito de estar en formato de Vector y, por lo tanto, debemos transformarlo al formato adecuado.

Para lograr esto en nuestro ejemplo, vamos a especificar una RFormula. Este es un lenguaje declarativo para especificar transformaciones de aprendizaje automático y es fácil de usar una vez que comprende la sintaxis.

Los operadores básicos de RFormula son:
<p>
<p>"~" Destino y términos separados</p>
<p>"+" Términos de Concat; "+ 0" significa eliminar la intersección (esto significa que la intersección y de la línea que ajustaremos será 0)</p>
<p>"-" Eliminar un término; "- 1" significa eliminar la intersección (esto significa que la intersección y de la línea que vamos a ajustar será 0; sí, esto hace lo mismo que "+ 0"</p>
<p>":" Interacción (multiplicación de valores numéricos o valores categóricos binarizados)</p>
<p>"." Todas las columnas excepto la variable objetivo / dependiente</p>
</p>

Para especificar transformaciones con esta sintaxis, necesitamos importar la clase RFormula. Luego pasamos por el proceso de definir nuestra fórmula. En este caso, queremos usar todas las variables disponibles (el ".") Y también agregar las interacciones entre valor1 y color y valor2 y color, tratándolas como características nuevas:

In [26]:
from pyspark.ml.feature import RFormula
supervised = RFormula(formula="Target ~ . + Type:air_temp + Type:process_temp + Type:rotational_speed + Type:torque + Type:tool_wear + Type:failure_Type",)

El siguiente paso es ajustar el transformador RFormula a los datos para que descubra los posibles valores de cada columna.

No todos los transformadores tienen este requisito, pero debido a que RFormula manejará automáticamente las variables categóricas por nosotros, necesita determinar qué columnas son categóricas y cuáles no, así como cuáles son los valores distintos de las columnas categóricas.

Por esta razón, tenemos que llamar al método fit. Una vez que llamamos a fit, devuelve una versión "entrenada" de nuestro transformador que luego podemos usar para transformar nuestros datos.

Luego llamamos a transform en ese objeto para transformar nuestros datos de entrada en los datos de salida esperados.

In [27]:
fittedRF = supervised.fit(balancedDF) # Ajusta
preparedDF = fittedRF.transform(balancedDF) # Transforma
preparedDF.select("features", "Target").show(5)

+--------------------+------+
|            features|Target|
+--------------------+------+
|(750,[0,503,706,7...|     0|
|(750,[0,1,707,708...|     0|
|(750,[0,504,706,7...|     0|
|(750,[0,60,705,70...|     0|
|(750,[0,64,705,70...|     0|
+--------------------+------+
only showing top 5 rows


En la salida podemos ver el resultado de nuestra transformación: una columna llamada características que tiene nuestros datos sin procesar.

Lo que sucede detrás de escena es bastante simple: RFormula inspecciona nuestros datos durante la llamada de ajuste y genera un objeto que transformará nuestros datos de acuerdo con la fórmula especificada.

Cuando usamos este transformador, Spark convierte automáticamente nuestra variable categórica en Dobles para que podamos ingresarla en un modelo de aprendizaje automático.

En particular, asigna un valor numérico a cada color posible.
categoría, crea características adicionales para las variables de interacción entre colores y valor1 / valor2, y las coloca todas en un solo vector.

Creemos ahora un conjunto de prueba simple basado en una división aleatoria de los datos:

In [28]:
train, test = preparedDF.randomSplit([0.7, 0.3])

## Estimators

En este caso usaremos un algoritmo de clasificación llamado regresión logística.

Para crear nuestro clasificador, creamos una instancia de LogisticRegression, usando la configuración predeterminada o los hiperparámetros.

Luego configuramos las columnas de etiquetas y las columnas de características.

In [29]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Target",featuresCol="features")

Antes de comenzar a entrenar este modelo, inspeccionemos los parámetros.

Este método muestra una explicación de todos los parámetros para la implementación de Spark de la regresión logística.

El método "explainParams" existe en todos los algoritmos disponibles en MLlib.

In [30]:
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: Target)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. Th

Al crear una instancia de un algoritmo no entrenado, llega el momento de ajustarlo a los datos (entrenarlo). En este caso, esto devuelve un LogisticRegressionModel.

Este código iniciará un trabajo de Spark para entrenar el modelo. A diferencia de las transformaciones, el ajuste de un modelo de aprendizaje automático es ansioso y se realiza de inmediato.

In [31]:
fittedLR = lr.fit(train)

Una vez completado, puede usar el modelo para hacer predicciones. Lógicamente, esto significa transformar características en etiquetas.

Hacemos predicciones con el método transform. Por ejemplo, podemos transformar nuestro conjunto de datos de entrenamiento para ver qué etiquetas asignó nuestro modelo a los datos de entrenamiento y cómo se comparan con los resultados reales.

Realicemos esa predicción con el siguiente fragmento de código:

In [32]:
fittedLR.transform(train).select("Target", "features").show(50)

+------+--------------------+
|Target|            features|
+------+--------------------+
|     0|(750,[0,503,706,7...|
|     0|(750,[0,504,706,7...|
|     0|(750,[0,60,705,70...|
|     0|(750,[0,64,705,70...|
|     0|(750,[0,65,705,70...|
|     0|(750,[0,507,706,7...|
|     0|(750,[0,508,706,7...|
|     0|(750,[0,72,705,70...|
|     0|(750,[0,77,705,70...|
|     0|(750,[0,80,705,70...|
|     0|(750,[0,2,707,708...|
|     0|(750,[0,510,706,7...|
|     0|(750,[0,81,705,70...|
|     0|(750,[0,513,706,7...|
|     0|(750,[0,84,705,70...|
|     0|(750,[0,88,705,70...|
|     0|(750,[0,3,707,708...|
|     0|(750,[0,90,705,70...|
|     0|(750,[0,93,705,70...|
|     0|(750,[0,95,705,70...|
|     0|(750,[0,520,706,7...|
|     0|(750,[0,99,705,70...|
|     0|(750,[0,101,705,7...|
|     0|(750,[0,102,705,7...|
|     0|(750,[0,104,705,7...|
|     0|(750,[0,107,705,7...|
|     0|(750,[0,111,705,7...|
|     0|(750,[0,112,705,7...|
|     0|(750,[0,115,705,7...|
|     0|(750,[0,527,706,7...|
|     0|(7

In [33]:
from pyspark.sql.functions import col
fittedLR.transform(train).select("Target", "features").filter(col("Target") == 1).show(5)

+------+--------------------+
|Target|            features|
+------+--------------------+
|     1|(750,[0,57,705,70...|
|     1|(750,[0,58,705,70...|
|     1|(750,[0,59,705,70...|
|     1|(750,[0,61,705,70...|
|     1|(750,[0,62,705,70...|
+------+--------------------+
only showing top 5 rows


Nuestro siguiente paso sería evaluar manualmente este modelo y calcular métricas de rendimiento como la tasa de verdaderos positivos, la tasa de falsos negativos, etc.

## Pipelines

Como probablemente haya notado, si está realizando muchas transformaciones, escribir todos los pasos y realizar un seguimiento de DataFrames termina siendo bastante tedioso.

Por eso Spark incluye el concepto Pipeline.

Tenga en cuenta que es esencial que las instancias de transformadores o modelos no se reutilicen en diferentes Pipeline. Cree siempre una nueva instancia de un modelo antes de crear otra Pipeline.

Para asegurarnos de no sobreajustarnos, crearemos un conjunto de pruebas de holdout(un método de validación) y ajustaremos nuestros hiperparámetros en función de un conjunto de validación (tenga en cuenta que creamos este conjunto de validación basado en el conjunto de datos original, no en el preparedDF):

In [34]:
train_pip, test_pip = balancedDF.randomSplit([0.7, 0.3])

Ahora que tiene un conjunto de entrenamiento y prueba, creemos las stages base en nuestra Pipeline.

Una stage simplemente representa un transformador o un estimador. En nuestro caso, tendremos dos estimadores. La RFomula y el LogisticRegresión:

In [35]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

rForm = RFormula().setFeaturesCol("features").setLabelCol("label")
lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")

type_indexer = StringIndexer(inputCol="Type", outputCol="Type_indexed")

assembler = VectorAssembler(
    inputCols=["air_temp", "process_temp", "rotational_speed", "torque", "tool_wear", "Type_indexed"],
    outputCol="features"
)

Ahora, en lugar de usar manualmente nuestras transformaciones y luego ajustar nuestro modelo, simplemente las hacemos stages en la Pipeline general, como en el siguiente fragmento de código:

In [36]:
from pyspark.ml import Pipeline
stages = [type_indexer, assembler, lr]
pipeline = Pipeline().setStages(stages)

## Entrenamiento y Evaluación

Ahora que organizó la Pipeline, el siguiente paso es el Entrenamiento.

En este caso, no entrenaremos solo un modelo. Entrenaremos varias variaciones del modelo especificando diferentes combinaciones de hiperparámetros que nos gustaría que Spark probara.

Luego, seleccionaremos el mejor modelo usando un evaluador que compara sus predicciones con nuestros datos de validación.

Podemos probar diferentes hiperparámetros en toda la Pipeline, incluso en la fórmula de RF que usamos para manipular los datos sin procesar.

En nuestro ParamGridBuilder, hay tres hiperparámetros que varían de los valores predeterminados:
<li>Dos versiones diferentes de RFormula</li>
<li>Tres opciones diferentes para el parámetro ElasticNet</li>
<li>Dos opciones diferentes para el parámetro de regularización</li>
Esto nos da un total de 12 combinaciones diferentes de estos parámetros, lo que significa que entrenaremos 12 versiones diferentes de regresión logística.

In [37]:
from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder()\
    .addGrid(rForm.formula, [
        "Target ~ . + Type:air_temp",
        "Target ~ . + Type:air_temp + Type:process_temp",
        "Target ~ . + Type:air_temp + Type:process_temp + Type:rotational_speed",
        "Target ~ . + Type:air_temp + Type:process_temp + Type:rotational_speed + Type:torque",
    ])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .addGrid(lr.regParam, [0.1, 2.0])\
    .build()

Ahora que la cuadrícula está construida, es hora de especificar nuestro proceso de evaluación. El evaluador nos permite comparar de forma automática y objetiva varios modelos con la misma métrica de evaluación.

En este caso usaremos el BinaryClassificationEvaluator, que tiene una serie de métricas de evaluación potenciales.

En este caso usaremos areaUnderROC, que es el área total bajo la característica operativa del receptor, una medida común de desempeño de clasificación.

In [38]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Configurar evaluador
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

Ahora que tenemos una canalización que especifica cómo se deben transformar nuestros datos, realizaremos la selección del modelo para probar diferentes hiperparámetros en nuestro modelo de regresión logística y medir el éxito comparando su desempeño usando la métrica areaUnderROC.

In [39]:
from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit()\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(params)\
    .setEstimator(pipeline)\
    .setEvaluator(evaluator)


In [43]:
from pyspark.ml.tuning import TrainValidationSplit

# Configurar TrainValidationSplit
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
    trainRatio=0.8,  # Mayor proporción para entrenamiento
    seed=42
)

Ejecutemos toda la Pipeline que construimos. Para revisar, la ejecución de esta canalización probará todas las versiones del modelo con el conjunto de validación.

Tambien se evalua cómo funciona el algoritmo con el conjunto de prueba:

In [44]:
auc = evaluator.evaluate(tvs.transform(test_pip))
print(f"Area under ROC: {auc}")

AttributeError: 'TrainValidationSplit' object has no attribute 'transform'

25/11/20 20:55:45 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 366813 ms exceeds timeout 120000 ms
25/11/20 20:55:45 WARN SparkContext: Killing executors is not supported by current scheduler.
25/11/20 20:55:46 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

Tambien se evalua cómo funciona el algoritmo con el conjunto de prueba:

In [None]:
evaluator.evaluate(tvsFitted.transform(test_pip))