### Proceso de Machine Learning
1. ETL **(Raw RDD Processing, Structured Data Transformation)** con SPARK 
2. Exploration **(DataFrame, Dataset)** con SPARK SQL 
3. ML **(Feature Engineering -> Parameter Tuning)** con SPARK MLLIB
4. Model Training 
5. Data Product

### Feature Engineering
- **Vector Asembler :** Se utiliza básicamente para concatenar todas las características en un solo vector que se puede pasar al estimador o al algoritmo ML.
- **Agrupamiento :** Es el método más sencillo para convertir las variables continuas en variables categóricas.
- **Escalado y normalización :** Tarea común en variables continuas. Permite que los datos tengan una distribución normal.
- **MinMaxScaler y StandardScaler :** Estandarizan las características con una media 0 y una desviación estándard de 1.
- **StringIndexer :** Para convertir características categóricas en numéricas.

# Predicción: ¿Una persona va a gastar más de $50 000 en gastos médicos?

## Paso 1. Cargar el Dataset

Incluidos los nombres y tipo de dato a las columnas, el dataset no lo incluye

In [0]:
# Definimos el esquema del dataset
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""

# Importamos los datos
data = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)
display(data)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
39.0,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K


#### Dividir aleatoriamente los datos en conjuntos de entrenamiento(train) y prueba(test).

Es mejor dividir los datos antes de realizar cualquier preprocesamiento. Esto permite que el conjunto de datos de **prueba(test)** simule más de cerca los nuevos datos cuando evaluamos el modelo.

In [0]:
# La mayor parte se utilizan para el conjunto de datos de entrenamiento, por ejemplo: 80%
# La menor parte se utilizan para el conjunto de datos de prueba, por ejemplo 20%

# seed : para umentar la reproductibilidad del modelo
trainDF, testDF = data.randomSplit([0.8,0.2], seed=42) # Definimos que el 80% de los datos serán para entrenamiento y el 20% para test

# Almacenamos la data de entrenamiento en caché, para que su uso se más rápido
print(trainDF.cache().count()) # cantidad de datos definidos para entrenamiento (train)
print(testDF.count()) # cantidad de datos definidos para prueba (test)

26076
6485


Revisamos la data

In [0]:
display(trainDF)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
17.0,?,34019.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,20.0,United-States,<=50K
17.0,?,34088.0,12th,8.0,Never-married,?,Own-child,White,Female,0.0,0.0,25.0,United-States,<=50K
17.0,?,47407.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,10.0,United-States,<=50K
17.0,?,48703.0,11th,7.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K
17.0,?,48751.0,11th,7.0,Never-married,?,Own-child,Black,Female,0.0,0.0,40.0,United-States,<=50K
17.0,?,67808.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K
17.0,?,86786.0,10th,6.0,Never-married,?,Own-child,White,Female,0.0,0.0,40.0,United-States,<=50K
17.0,?,89870.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K
17.0,?,94366.0,10th,6.0,Never-married,?,Other-relative,White,Male,0.0,0.0,6.0,United-States,<=50K
17.0,?,103810.0,12th,8.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K


#### Comenzamos analizando la distribución del N° de horas que trabaja por semana

In [0]:
display(trainDF.select("hours_per_week").summary())

summary,hours_per_week
count,26076.0
mean,40.4284782942169
stddev,12.404569739132008
min,1.0
25%,40.0
50%,40.0
75%,45.0
max,99.0


#### Ahora analizamos el estado de la educación

In [0]:
# Agrupamos en base al tipo de educación y realizamos un conteo
display(trainDF.groupBy("education").count().sort("count",ascending=False))

education,count
HS-grad,8408
Some-college,5860
Bachelors,4255
Masters,1388
Assoc-voc,1102
11th,958
Assoc-acdm,845
10th,748
7th-8th,510
Prof-school,465


Databricks visualization. Run in Databricks to view.

#### Transformadores, estimadores y pipelines (tuberías)
- **Transformer :** Toma un DataFrame como entrada y devuelve un nuevo DataFrame. Los **transformadores** no aprenden ningún parámetro de los datos y simplemente aplican transformaciones basadas en reglas para preparar los datos para el entrenamiento del modelo o generar predicciones usando un modelo MLlib entrenado. Usa el método **.transform()**.
- **Estimador :** Aprende los parámetros del DataFrame a través de un método **.fit()** y devuleve un modelo, que es un transformador.
- **Pipeline :** Combina varios pasos en un solo flujo de trabajo. Ayudan a automatizar el proceso.

## Paso 2. Preprocesamiento de características

El objetivo es constrir un modelo que prediga el nivel de "ingresos" a partir de características incluidas en el dataset **(nivel de educación,estado civil,etc)**. <br>
Lo primero es manipular o procesar las características que estén en el formato que requiere **MLlib**.

#### Convertir variables categóricas en numéricas <br>
El dataset incluye características categóricas como **educación, ocupación y estado civil**. <br>

`StringIndexer` y` OneHotEncoder` para convertir variables categóricas en un conjunto de variables numéricas que solo toman los valores 0 y 1.

- **StringIndexer** convierte una columna de valores de cadena en una columna de índices de etiquetas. Por ejemplo, podría convertir los valores "rojo", "azul" y "verde" en 0, 1 y 2.
- **OneHotEncoder** mapea una columna de índices de categoría a una columna de vectores binarios, con como máximo un "1" en cada fila que indica el índice de categoría para esa fila.

In [0]:
# Libería que utilizaremos para la categorización
from pyspark.ml.feature import StringIndexer, OneHotEncoder

#Definimos las columnas que categorizaremos (DE CADENA A NÚMEROS)
catcol = ["workclass","education","marital_status","occupation","relationship","race","sex"]

#Las siguientes dos líneas son estimadores. Devuelven funciones que luego aplicaremos para transformar el conjunto de datos.

# Para categorizar, definimos las variables categóricas de entrada, en la salida recorremos que será cada categoría
stringIndexer = StringIndexer(inputCols=catcol, outputCols=[x + "Index" for x in catcol]) 
# Definimos el OneHotEncode, tomando el StringIndexer para la entrada, en la salida recorremos las categorías
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in catcol]) 

# Varible objetivo : "income"

# La columna label ("income") también es un valor de cadena: tiene dos valores posibles, "<50K" y ">50K".
# Convertimos a valor numérico usando StringIndexer
labelToIndex = StringIndexer(inputCol="income", outputCol="label") # definimos "label" para dar a entender que es la variable que queremos predecir ("income")

In [0]:
# Entrenamos con el dataset de train
StringIndexerModel = stringIndexer.fit(trainDF)
# Transformamos el dataset
display(StringIndexerModel.transform(trainDF))

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income,workclassIndex,educationIndex,marital_statusIndex,occupationIndex,relationshipIndex,raceIndex,sexIndex
17.0,?,34019.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,20.0,United-States,<=50K,3.0,7.0,1.0,7.0,2.0,0.0,0.0
17.0,?,34088.0,12th,8.0,Never-married,?,Own-child,White,Female,0.0,0.0,25.0,United-States,<=50K,3.0,11.0,1.0,7.0,2.0,0.0,1.0
17.0,?,47407.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,10.0,United-States,<=50K,3.0,5.0,1.0,7.0,2.0,0.0,0.0
17.0,?,48703.0,11th,7.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K,3.0,5.0,1.0,7.0,2.0,0.0,1.0
17.0,?,48751.0,11th,7.0,Never-married,?,Own-child,Black,Female,0.0,0.0,40.0,United-States,<=50K,3.0,5.0,1.0,7.0,2.0,1.0,1.0
17.0,?,67808.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,3.0,7.0,1.0,7.0,2.0,0.0,0.0
17.0,?,86786.0,10th,6.0,Never-married,?,Own-child,White,Female,0.0,0.0,40.0,United-States,<=50K,3.0,7.0,1.0,7.0,2.0,0.0,1.0
17.0,?,89870.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,3.0,7.0,1.0,7.0,2.0,0.0,0.0
17.0,?,94366.0,10th,6.0,Never-married,?,Other-relative,White,Male,0.0,0.0,6.0,United-States,<=50K,3.0,7.0,1.0,7.0,5.0,0.0,0.0
17.0,?,103810.0,12th,8.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,3.0,11.0,1.0,7.0,2.0,0.0,0.0


#### Combinamos las columnas de características en un solo vector
La mayoría de los algoritmos MLlib requieren una sola columna de características como entrada.

MLlib proporciona el transformador `VectorAssembler` para crear una única columna vectorial a partir de una lista de columnas.

In [0]:
# Importamos la libería que utilizaremos
from pyspark.ml.feature import VectorAssembler

# Definimos las columnas numéricas
numCols=["age","fnlwgt","education_num","capital_gain","capital_loss","hours_per_week"]

# Definimos el conjunto de variables que vienen de las variables categóricas transformadas a números (OHE), más las numéricas
assemblerInputs= [c + "OHE" for c in catcol] + numCols

# Definimos una sola columna de características (entrada)
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

## Paso 3. Definir el modelo

Utilizaremos un **modelo de regresión logística**, ya que se trata de un caso de clasificación binaria

In [0]:
# Importamos la librería necesaria
from pyspark.ml.classification import LogisticRegression

# Definimos el modelo
# features : variables predictoras
# label : varibale objetivo
lr=LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)

## Paso 4. Construir la canalización

In [0]:
# Importamos la libería necesaria
from pyspark.ml import Pipeline

# Definimos la canalización en función de las etapas creadas en los pasos anteriores
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])

# Definimos el modelo de canalización y entrenamos el dataset de entrenamiento
pipelineModel = pipeline.fit(trainDF)

# Aplicamos el modelo de canalización al dataset de prueba
predDF=pipelineModel.transform(testDF) # realizamos una predicción con la función transforma con el dataset de test

Mostrar los resultados de la predicción.

In [0]:
# features : conjunto de variables que unificamos
# label : variable objetivo, que convertimos a 0 y 1
# prediction: resultado que devuleve el modelo
# probability : certeza del modelo y la probabilidad que asigna a una clase
display(predDF.select("features","label","prediction","probability"))

features,label,prediction,probability
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 41643.0, 7.0, 15.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9062475766406436, 0.09375242335935641))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 64785.0, 6.0, 30.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8927692098013015, 0.10723079019869852))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 80077.0, 7.0, 20.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9041098028218706, 0.09589019717812941))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 104025.0, 7.0, 18.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8952739426670617, 0.10472605733293827))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 139183.0, 6.0, 15.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.908769684685861, 0.09123031531413905))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 8, 24, 36, 45, 49, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 148769.0, 9.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8861378465891755, 0.11386215341082451))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 170320.0, 7.0, 8.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9089517680637973, 0.0910482319362027))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 198797.0, 7.0, 20.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8942282062675614, 0.10577179373243861))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 19, 24, 36, 45, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 241021.0, 8.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8945664757777927, 0.10543352422220731))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 49, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 250541.0, 7.0, 8.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9099365894880657, 0.09006341051193434))"


**Explicación :** El resultado de label es 0, quiere decir que gana menos de %50K , la prediction es 0 porque no cumple y la probabilidad para la clase 0 de 0.906.. y clase 1 de 0.093.., por eso se le asigna de manera automática la clase 0.

## Paso 5. Evaluar el modelo
- https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

Para evaluar el modelo, usaremos **BinaryClassificationEvaluator** para evaluar el área bajo la curva ROC y el **MulticlassClassificationEvaluator** para evular la precisión.

In [0]:
# Importamos las librerías necesarias
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Obtenemos el ROC
bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}") # evaluamos el ROC de la predicción

# Obtenemos la precisión
mcEvaluator=MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}") # evaluamos la precisión de la predicción

# MIENTRAS MÁS SE ACERQUEN A 1 MEJOR SERÁ EL MODELO

Area under ROC curve: 0.8832451799218509
Accuracy: 0.7680801850424056


## Paso 6. Ajuste de hiperparámetros <br>
- Para el ajuste de hiperparámetros, **ParamGridBuilder** le permite definir una búsqueda sobre un conjunto de hiperparámetros del modelo.
- Para la validación cruzada, **CrossValidator** le permite especificar un estimador (la canalización que se aplicará al conjunto de datos de entrada), un evaluador, un espacio de hiperparámetros y el número de iteraciones o folds que se utilizarán para la validación cruzada.


Utilice `ParamGridBuilder` y` CrossValidator` para ajustar el modelo. Este ejemplo usa tres valores para `regParam` y tres para` elasticNetParam`, para un total de 3 x 3 = 9 combinaciones de hiperparámetros para que `CrossValidator` las examine.

In [0]:
# Para mejorar el desempeño de un modelo, se realiza un ajuste

# Importamos la libería necesaria
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid= (ParamGridBuilder()
            .addGrid(lr.regParam, [0.01,0.5,2.0]) # definimos 3 parámetros para regParam
            .addGrid(lr.elasticNetParam, [0.0,0.5,1.0]) # definimos 3 parámetros para elasticNetParam
            .build() # construimos la matriz de hiperparámetros (3x3=9 combinaciones de hiperparámetros)
           )

Siempre que llamemos a **CrossValidator** en MLlib, Databricks rastrea automáticamente todas las ejecuciones usando MLflow. Podemos utilizar la interfaz de MLflow para comparar el rendimiento de cada modelo.

In [0]:
# Aplicamos el CrossValidator, a través de 3 combinaciones

# Utilizamos el "pipeline" que definimos en la canalización
# El paramGrid que contraimos la matriz de hiperparámetros
# Y el bcEvaluator definido para evaluar el ROC de la predicción
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism = 4)

# Ejecutamos la validación cruzada con el dataset de entrenamiento
cvModel = cv.fit(trainDF)

## Paso 7. Realizar predicciones y evaluar el rendimiento del modelo <br>
Utilizamos el mejor modelo identificado por la validación cruzada para hacer predicciones en el conjunto de datos de prueba y luego evaluamos el rendimiento del modelo utilizando el área bajo la curva ROC.

In [0]:
# Realizamos una predicción con el modelo con los hiperparámetros ajustados
#cvModel : es la mejor combinación de hiperparámetros definidos anteriormente
cvPredDF = cvModel.transform(testDF) # con el dataset de test

# Evalumos la predicción con hiperparámetros ajustado
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}") # evaluamos el ROC de la predicción ajustada
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}") # evaluamos la precisión de la predicción ajustada

# EL DESEMPEÑO DEL MODELO MEJORÓ

Area under ROC curve: 0.9023398315950719
Accuracy: 0.8450269853508096


Con los comandos SQL, también podemos mostrar predicciones agrupadas por edad y ocupación. Esto requiere crear una vista temporal del conjunto de datos de predicciones.

In [0]:
# Creamos la vista temporal, para el uso de SQL
cvPredDF.createOrReplaceTempView("Final_predictions")

In [0]:
%sql
-- CONTEO AGRUPADO POR OCUPACIÓN Y PREDICCIÓN
-- 0 : Ganan menos $50K
-- 1 : Ganan más de $50K
SELECT occupation, prediction, count(*) as Cantidad  
FROM Final_predictions
GROUP BY occupation,prediction
ORDER BY occupation

occupation,prediction,Cantidad
?,0.0,349
?,1.0,14
Adm-clerical,1.0,54
Adm-clerical,0.0,731
Armed-Forces,0.0,3
Craft-repair,1.0,72
Craft-repair,0.0,771
Exec-managerial,0.0,381
Exec-managerial,1.0,415
Farming-fishing,1.0,8


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- CONTEO AGRUPADO POR EDAD Y PREDICCIÓN
-- 0 : Ganan menos $50K
-- 1 : Ganan más de $50K
SELECT age, prediction, count(*) as Cantidad  
FROM Final_predictions
GROUP BY age,prediction
ORDER BY age

age,prediction,Cantidad
17.0,0.0,57
18.0,1.0,1
18.0,0.0,95
19.0,0.0,134
20.0,0.0,139
21.0,0.0,147
21.0,1.0,1
22.0,0.0,148
23.0,1.0,1
23.0,0.0,191


Databricks visualization. Run in Databricks to view.