![](./images/itam_logo.png)

M. Sc. Liliana Millán Núñez liliana.millan@itam.mx

Marzo 2021


## SparkML

### Agenda 

+ SparkML
    + Pipelines
    + Feature engineering
    + Clasificación y regresión
    + Agrupación
    + Tuneo de hiperparámetros
+ Ejemplo

### Spark ML

`spark.ml` es el módulo de *machine learning* de Spark, diseñado para realizar *machine learning* dentro de spark de manera escalable, sencilla y aprovechando el procesamiento en paralelo.

**Características** 

+ Tiene algoritmos de ML ya implementados con la modificaciones necesarias para aprovechar el ambiente distribuido donde vive Spark: clasificación, regresión, agrupación, filtros colaborativos, etc. 
+ Tiene implementaciones de funciones que ocupamos para hacer *feature engineering*: *feature extraction*, *feature selection*, transformaciones, reducción de dimensionalidad.
+ Permite generar *pipelines* en Spark al estilo de los pipelines de `scikitlearn`.
+ Tiene una libería de utilerías con álgebra lineal, estadística, manejo de datos, etc.

¿Por qué hay un `spark.mllib` y un `spark.ml`? 

En la primer versión de Spark no existía la abstracción de *DataFrame* -el *wrapper* de los RDD- y todos los algoritmos de ML desarrollados en Spark interactuaban directamente con el RDD, todas estas implementaciones se encuentran en el paquete `spark.mllib` -que ya está descontinuada-. Una vez que salió la versión 2 de Spark y con ella los nuevos objetos *SparkSession* y *DataFrame* los algoritmos de ML fueron modificados -algunos- para que solo tengan interacción con la abstracción *DataFrame* y con ello surgió la librería `spark.ml` que es la que utilizaremos nosotros. Aún no están todos los algoritmos de `spark.mllib` implementados en *DataFrame*, en la versión 2.4.5 de Spark, la librería de `spark.mllib` ya está en estatus de solo mantenimiento para que a partir de Spark 3.0 la librería será removida completamente de Spark y solo ocupar la interacción con los *DataFrames*. 

*Anyway* Para confundir más a la banda, el nombre oficial de la herramienta que ocupa Spark para ML se conoce como **MLlib** (╯°□°)╯︵ ┻━┻ aunque realmente se refieren a la librería `spark.ml`.

[Spark ML API](https://spark.apache.org/docs/2.4.7/ml-guide.html)

#### Pipelines 

El diseño de los *pipelines* de Spark está inspirado en los *pipelines* de `scikit-learn`. Un *pipeline* en Spark está formado por los siguientes elementos: 

+ **DataFrame:** API que ocupa los *DataFrame* de SparkSQL para poder agregar otros tipos de datos que pueden ser útiles para ML -*vector*-

+ **Transformer:** Algoritmo que transforma un *DataFrame* en otro DataFrame, recuerda que los *DataFrame* en Spark envuelven a un RDD y un RDD no puede ser modificado!. Para hacer una transformación se ocupa el método `transform()`. Los casos en los que ocuparemos un `transform` pueden ser agregar una nueva columna -por ejemplo *feature engineering*-, o por ejemplo una vez que se ha pasado un modelo de aprendizaje poner la respuesta final del modelo como parte del *DataFrame* original -etiqueta, score-. 

+ **Estimator:** Procesos o algoritmos que se aplican a un *DataFrame* para producir un *Transformer*. Los estimadores son los que ocupan el método `fit()` para poder realizar un entrenamiento. El método `fit` recibe como parámetro un *DataFrame* y devuelve un modelo -que es un *transformer*-. Por ejemplo: Un algoritmo de regresión lineal es un *estimator* que tiene su método `fit` a través del cual entrena el algoritmo. 


$\rightarrow$ Es importante conocer que por cada instancia de un *transformer* o *estimator* se genera un ID a través del cuál es reconocido durante todo el *pipeline* y por lo tanto podemos llamarlo más adelante en el pipeline.

+ **Pipeline:** Es una secuencia de procesos/etapas generado por *transformers* y *estimators* para hacer un *workflow* de ML. Cada *transformer*/*estimator* es una etapa dentro de la secuencia del *pipeline*, cada paso se corre en el orden establecido y el *DataFrame* de entrada es transformado por cada paso, si el paso es un *transformer* entonces se le aplica el método `transform` y si el paso es un `estimator` se le aplica el método `fit`. 


![](./images/spark_estimator_transformer.png)
<br>

Por ejemplo: Si tuviéramos un texto al cuál quisieramos aplicarle un análisis de sentimiento, el *pipeline* podría consistir en los siguientes pasos: 

Suposiciones: 
+ Tenemos un corpus.
+ Tenemos las palabras asociadas a un sentimiento.

+ Separar cada documento en palabras.
+ Convertir cada palabra de cada documento en un vector numérico.
+ Utilizando el vector numérico y las etiquetas asociadas -del sentimiento- ocupar un modelo de clasificación 

![](./images/spark_pipelines.png)
<br>
* Fuente: [Spark ML Guide](https://spark.apache.org/docs/2.4.7/ml-guide.html)


![](./images/pointer.png) En Spark, un pipeline **es** un *estimator* (al igual que en `sklearn`, por lo que puede hacer llamada al método `fit`, al hacer esto se genera un *PipelineModel* -que es un *transformer*-. Cuando querramos ocupar modelos entrenados para producción deberemos ocupar el *PipelineModel* generado en el momento de entranamiento al hacer una llamada a su método *transform*, de esta manera todos los *estimators* del *pipeline* original son convertidos a *transformer* asegurándonos de que en pruebas tendremos los mismos pasos/trasnformaciones ocupados para el entrenamiento del modelo. ╭(◔ ◡ ◔)/

![](./images/spark_pipeline_model.png)
<br>
* Fuente: [Spark ML Guide](https://spark.apache.org/docs/2.4.7/api/python/pyspark.ml.html#module-pyspark.ml) 

Un *Pipeline* en Spark está representado como un DAG, el ejemplo anterior es un DAG lineal, pero no necesariamente deben ser lineales, basta con que cumplan las características de ser un DAG -grafo **acíclico** dirigido-. Es por esta razón que cada instanciación de un *transformer* o *estimator* tiene asociado un ID y debe ser único, si necesitáramos un mismo *transformer* en el *pipeline* requerimos de generar otro *transformer* -aunque tenga el mismo código- :( (ya sé! esto medio que le da en la ma al principio de *reuse* pero ... por el momento así se resuelve en Spark en pro de tener un *pipeline*), Spark revisa en tiempo de ejecución que no se rompa "algo" antes de correr el *pipeline* -*lazy*-.

+ **Parameter:** API con la que se pueden compartir parámetros entre *Estimators* y *Transformers*. Ocupamos el objeto `Param` que es un parámetro nombrado con documentación auto contenida en un `ParamMap` -diccionario de parámetro, valor-.

En Spark hay dos maneras de pasar parámetros a los algoritmos de ML:

1. Configurar los parámetros fijos de los algoritmos a ocupar (*setters*)
2. Pasar un `ParamMap` con los parámetros y sus valores a través de `fit` o `transform`, si se envían parámetros de esta manera se hace *override* a los específicados vía *setters*

Lo lindo de estos objetos es que cada definición dentro del `ParamMap` es "atado" a un *estimator* o *transformer* en específico -a través del ID antes mencionado-. Por ejemplo: si tuvieramos en un *pipeline* dos regresiones logísticas -`lr1` y `lr2`- podríamos ocupar un `ParamMap` que establezca el valor de las iteraciones máximas de cada regresión: 

#### I/O de *pipeline* o modelos

Desde Spark 1.6 se agregó la posibilidad de guardar modelos y *pipelines* implementados en Spark para poder ocuparlos después, pero no todos los algoritmos de `spark.ml` tienen esta posibilidad -tiene que ver con la paridad que mencionamos al principio de la libería `spark.mllib` y la `spark.ml`-, por lo que se requiere revisar la documentación específica de cada algoritmo y ver si se puede y cómo. ¯\\_(ツ)_/¯

El paquete `spark.ml.util` tiene los objetos `MLWriter` y `MLReader` que permiten guardar y cargar modelos sin importar el lenguaje en el que se hayan implementado -Scala, Java, Python o R (implementaciones **específicas** para Spark!)-

**Pipeline:** [Spark ML API](https://spark.apache.org/docs/2.4.7/api/python/pyspark.ml.html#module-pyspark.ml)

+ En Spark 2.4.7 existe el método `save(path)` para poder guardar un *pipeline* en el *path* indicado, es un *shortcut* a `write.save(path)` 
+ En Spark 2.4.7 existe el método `load(path)` para poder cargar un *pipeline*, es un *shortcut* a `read.load(path)`

**Modelos:**

Los modelos que tengan posibilidad de ser guardados tendrán los métodos de `save(path)` y `load(path)` (verificar documentación del API para el modelo ocupado)

#### Ejemplos:

1. Veremos un pequeño ejemplo de cómo ocupar los *estimators* y *transformers* (viene en la documentación de Spark).


+ [Crear un DataFrame](https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html#pyspark.sql.SparkSession)

El código que se muestra en cada uno de los ejemplos no corre aquí directamente (necesita de spark!)

El siguiente ejemplo se encuentra en la carpeta `scripts/spark/sparkml_logisit_regression.json` para que puedas cargarlo en Zeppelin y echarlo a andar. 


In [None]:
#pyspark 
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# En Spark se puede crear un DataFrame de un RDD, de una lista o de un DataFrame de Pandas, 
# aquí lo estamos creando con una lista que contiene tuplas de (label, features)
# tal cual lo hacíamos en sklearn, y le estamos agregando los nombres de cada columna.
# Vectors.dense recibe una lista como parámetro
# Este DataFrame lo estamos ocupando como nuestro set de entrenamiento mock!
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Como lo hacíamos en sklearn, primero configuramos los hiperparámetros del modelo que queremos
# ocupar.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Veamos la documentación del modelo y qué parametros le pusimos a nuestra
# configuración
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Ocupemos el modelo que configuramos para entrenar con lo datos que
# creamos en el DataFrame training
model_1 = lr.fit(training)

# model_1 es un transfomer creado a traves de un estimador (LogisticRegression)
print("Model 1 was fit using parameters: ")
# aquí estamos obteniendo la configuración con la que se entrenó
# la regresión logística que ocupamos
print(lr.extractParamMap())


# Tambien podemos especificar los parametros con los que queremos que 
# corra el modelo utilizando el diccionario de ParamMap
# Creamos un diccionario -se puede llamar como quieras!- que tenga
# como llave el nombre del parametro que quieres modificar, con el valor
# correspondiente.
param_map = {lr.maxIter: 20}
# Si el valor ya existe en el diccionario puedes actualizarlo
param_map[lr.maxIter] = 30  
# Tambien puedes actualizar varios parametros del diccionario al mismo tiempo
param_map.update({lr.regParam: 0.1, lr.threshold: 0.55}) 

# Se pueden combinar diferentes diccionarios...realmente puedes tener
# un solo diccionario con los parámetros de diversos modelos que ocupes en el
# pipeline sin ningun problema, pues el valor asociado es por objeto (ID)
# Aquí estamos cambiando el nombre de la columna que guarda la salida del
# modelo, por default se llama 'probability' -> verificar documentacion del 
# metodo
param_map_2 = {lr.probabilityCol: "my_probability"}  
param_map_combined = param_map.copy()
param_map_combined.update(param_map_2)
#puedes ver el contenido del diccionario con param_map_combined.items() -> python 3.5.2

# Entrenemos una segunda regresión logística con los nuevos parámetros que 
# establecimos a traves del paramMap
# En este fit estamos enviando tanto los datos como los parámetros a ocupar en el
# modelo de regresión logística
model_2 = lr.fit(training, param_map_combined)
print("Model 2 was fit using parameters: ")
# aqui queremos ver con qué parámetros se quedó configurado el modelo
# que ocupamos para entrenar
print(lr.extractParamMap())

# Creemos el data frame que tendrá los datos de prueba mock!
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Ahora sí, hagamos predicciones sobre el *set* de pruebas utilizando 
# el método `transform` (aquí no hay `predict`!)
prediction = model_2.transform(test)
# la respuesta es un DataFrame (la salida de un transform en su DataFrame)
# por lo que podemos aplicarle los metodos de SparkSQL :)
# verificamos que si es un DataFrame...
type(prediction)
# veamos que columnas tiene este DataFrame (como el names de R)
prediction.columns
# Aqui estamos seleccionando las columnas features, label, 
# my_probability -> que es el nombre que nosotros especificamos anteriormente en
# ParamMap, y la columna prediction que es el nombre por default que regresa
# el modelo al parametro 'predictionCol' -> ver documentacion
# el collect hara que se regresen los resultados al drive!!! 
result = prediction.select("features", "label", "my_probability", "prediction") \
    .collect()


for row in result:
    print("features={}, label={} -> prob={}, prediction={}".format( \
    row.features, row.label, row.my_probability, row.prediction))
    
type(model_1)

¿Qué pasó en este ejemplo?

![](./images/spark_ex1.png)

2. Ahora hagamos el ejemplo de texto para ver cómo se hacen los *pipeline* en Spark (viene en la documentación de Spark).


+ [Tokenizer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.Tokenizer)
+ [HashingTF (Hashing with Term Frequency -> MurmurHash3)](https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#pyspark.ml.feature.HashingTF)

El siguiente ejemplo se encuentra en la carpeta `scripts/spark/sparkml_pipeline.json` para que puedas cargarlo en Zeppelin y echarlo a andar. 


In [None]:
#pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
    
# al igual que en el ejemplo anterior, creamos un dataframe a través
# de una lista con los datos de entrenamiento, la lista esta formada
# por tuplas (id, texto, label). Esta forma no nos servirá para poder meterla
# en los objetos de ML, pero más adelante arreglaremos esto
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])


# Definimos los transformers: Tokenizer y HashingTF, y los 
# estimators: LogisticRegression que ocuparemos. Nota que aquí no hemos hecho
# ningun fit todavia... la magia vendrá más adelante ;)
# Tokenizer convierte el string de entrada (inputCol) a minúsculas y separa en
# palabras utilizando como separador el espacio
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# HashingTF permite hashear cada palabra utilizando MurmurHash3 convirtiendo
# el hash generado en el índice a poner en el "TDM". Este método optimiza el
# tiempo para generar el TDM de TF-IDF "normal". Para evitar colisiones en
# la conversión a hash se aumenta el número de buckets -se recomienda ocupar
# potencias de 2 para balancear las cubetas-
# Nota que en este transformer estamos ocupando como entrada la salida del
# transformer Tokenizer
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
# Ocuparemos una regresión logistica de nuex
lr = LogisticRegression(maxIter=10, regParam=0.001)
# Aqui viene lo bonito... definimos un pipeline que tiene como etapas/pasos
# primero el tokenizer, luego el hashing y luego la regresión logística. Aquí
# estamos definiendo el flujo de procesamiento, el DAG! 
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


# Voila, solo se requiere de hacer fit al pipeline para que esto funcione
# como un pipeline, siguiendo el orden de los pasos establecidos en la 
# definicion del pipeline :) ... recuerda que el fit hace 
# el entrenamiento una vez que ya definimos las configuraciones de 
# los objetos que ocuparemos (transformers y estimators)
model = pipeline.fit(training)

# Creamos el dataframe de pruebas mock! -> Nota que aqui no hay 
# label!!!! (asi funcionaría en producción cierto!)
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])


# Lixto, "ejecutamos" el pipeline haciendo un transform al pipeline para 
# obtener las predicciones del set de pruebas
prediction = model.transform(test)

# De nuevo, prediction es un DataFrame generado con un transformer generado
# a través de estimadores y transformers :) 
# Seleccionamos las columnas que queremos ver 
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("({}, {}) --> prob={}, prediction={}".format( \
    rid, text, str(prob), prediction))

![](./images/spark_ex2.png)
<br>

#### Feature engineering

En este módulo de la librería de ML (`pyspark.ml.feature`) se encuentran las funciones asociadas a las siguientes acciones: 

+ **Feature extraction:** Extraer características -*features*- de datos crudos -*raw*-
+ **Feature transformation:** Escalar, convertir o modificar características -*features*-
+ **Feature selection:** Seleccionar un subconjunto de variables/características -*feataures*- de un conjunto más amplio
+ **Locality Sensitive Hashing (LSH):** Algoritmos que se ocupan para obtener *feature transformation* 

En el ejemplo del texto ocupamos métodos de *feature transformantion*: `Tokenizer`, y de LSH: `HashingTF`. En esta parte, veremos ejemplos de los métodos más utilizados en cada una de las categorías mencionadas, esta parte  no es en absoluto exhaustiva pues Spark cuenta con muchos métodos implementados, solo es para que se den una idea de cómo se ocupan en Spark. ([Spark ML feature API](https://spark.apache.org/docs/latest/ml-features.html))

![](./images/spark_feature_module.png)
<br>
\*Fuente: [Spark ML Guide](https://spark.apache.org/docs/latest/ml-guide.html)

3. **Feature extraction**

a. **TF-IDF**

[Spark TF-IDF](https://spark.apache.org/docs/latest/ml-features.html#tf-idf)

Solo para recordar, TF-IDF es un algoritmo de minería de texto ocupado normalmente en problemas de IR a través del cual, contando la frecuencia de aparición de una palabra en todo la colección de documentos y en la frecuencia dentro de cada documento, se establece la relevancia de un documento dado un query de búsqueda. 

$$tf\_idf=tf \cdot log_{10}\frac{N}{df}$$ 

En este ejemplo, TF-IDF se ocupa como una transformación a una variable *raw* -las palabras- para ser ocupadas como un *feature* en otro algoritmo de aprendizaje de máquina.

El siguiente ejemplo se encuentra en la carpeta `scripts/spark/sparkml_idf.json` para que puedas cargarlo en Zeppelin y echarlo a andar. 


In [None]:
#pyspark
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer

# Creamos nuestro set de entrada para formar la TDM
sentence_data = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

# Ocupamos el transformer Tokenizer para separar por palabras
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# Aqui no hay train! porque no estamos entrenando nanda... estamos en un problema
# de IR. Tokenizer no tiene un metodo fit -no hay entrenamiento-
words_data = tokenizer.transform(sentence_data)

# Ocupamos el estimador CountVectorizer para generar una matriz de 
# terminos y sus frecuencias 
count_vectorizer = CountVectorizer(inputCol="words", outputCol="raw_features")
featurized_model = count_vectorizer.fit(words_data)
featurized_data = featurized_model.transform(words_data)
featurized_data.show(truncate=False)

# Ocupamos IDF para obtener el IDF de la coleccion de documentos mock que 
# generamos. IDF si tiene un metodo fit a traves del cual le enviamos el set 
# de tokens al que queremos obtener el IDF
idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=1)
# Aqui obtenemos el modelo a ocupar (transformer) a ocupar 
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

rescaled_data.select("label", "features").show(truncate=False)

![](./images/spark_tfidf.png)
<br>


4. **Feature transformation** 

a. **OneHotEncoder** 

Solo para recordar, *one hot encoding* transforma una variable categórica de $n$ categorías a $n$ variables binarias, normalmente ocupamos esta transformación para ocupar variables categóricas en algoritmos que solo ocupan representaciones numéricas -normalmente aquellos algoritmos que ocupand distancias-


+ [Spark OneHotEncoder](https://spark.apache.org/docs/2.1.0/ml-features.html#onehotencoder)
+ [Spark StringIndexer](https://spark.apache.org/docs/2.1.0/ml-features.html#stringindexer)

El siguiente ejemplo se encuentra en la carpeta `scripts/sparkml_one_hot_encoder.json` para que puedas cargarlo en Zeppelin y echarlo a andar. 


In [None]:
#pyspark
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# creamos nuestro set de datos de entrada categorico
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# Esta funcion agrega un id numerico a cada valor diferente de un valor categorico 
# es como establecer los niveles en R de una factor pero los niveles son numericos,
# sus id. El indice se establece por orden de frecuencia (descendente), por lo que 
# el indice 0 corresponde a la variable que aparece con mas frecuencia
string_indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = string_indexer.fit(df)
indexed = model.transform(df)
indexed.show()

# OneHotEncoder no tiene un fit ya que solo es un transformador
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()

![](./images/spark_onehot.png)
<br>

b. **MinMaxScaler** 

Es la normalización de minería de datos $\frac{x-min}{max-min}$

El siguiente ejemplo se encuentra en la carpeta `scripts/sparkml_minmax_scaler.json` para que puedas cargarlo en Zeppelin y echarlo a andar. 


In [None]:
#pyspark
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

data_frame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
data_frame.show()

# Configuramos el estimator MinMaxScaler como lo necesitamos
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Creamos el modelo MinMaxScaler (transformer)
scaler_model = scaler.fit(data_frame)

# Transformamos los datos reescalando 
scaled_data = scaler_model.transform(data_frame)
# Nota que cuando pedimos getMin y getMax lo hacemos al estimator, no al modelo
print("Features scaled to range: [{}, {}]".format(scaler.getMin(), scaler.getMax()))
scaled_data.select("features", "scaled_features").show(truncate=False)

![](./images/spark_mimaxscaler.png)
<br>

c. **StandardScaler**

Corresponde a la estandarización en minería de datos $\frac{x-\mu}{\sigma}$

El siguiente ejemplo se encuentra en la carpeta `scripts/sparkml_standard_scaler.json` para que puedas cargarlo en Zeppelin y echarlo a andar. 


In [None]:
#pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler

# Creamos el data frame que queremos estandarizar
data_frame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
# Configuramos el estimator StandarScaler como lo necesitamos (por default
# withMean esta en False porque hace que se regrese un vector dense...
# hay que tener cuidado con eso cuando estemos manejandoo vectores sparse
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=True)
# Creamos el modelo StandardScaler para los datos de entrada
scaler_model = scaler.fit(data_frame)

# Transformamos los datos 
scaled_data = scaler_model.transform(data_frame)
scaled_data.show(truncate=False)


![](./images/spark_standardscaler.png)
<br>

### Grid Search 

El procesamiento en paralelo ha habilitado la posibilidad de probar un algoritmo con diferentes configuraciones de hiperparámetros y también muchos posibles algoritmos al mismo problema para poder encontrar la mejor configuración asociada al problema y el algoritmo con mejor desempeño. 

El objeto `GridSearchCV` (estimador) de scklearn nos permite cubrir la primera posibilidad -probar diferentes configuraciones de hiperparámetros de un mismo algoritmo- ocupando el un diccionario `param_grid` en donde por cada parámetro a probar establecemos los diferentes valores que queremos explorar con la ventaja de poderlo probar con un *pipeline* asociado. 

Por otro lado, para cubrir la segunda posibilidad -probar con diferentes algoritmos- usualmente ocumapos un par de algoritmos diferentes de forma "manual" pasando de uno simple a otro más complejo. Sin embargo, cuando queremos probar diferentes algoritmos a un mismo problema para ver sus diferencias en desempeño ocupamos un **magic loop** en donde especificamos diferentes algoritmos y además aprovechamos para poner diferentes configuraciones de hiperparámetros a cada algoritmo. 

Por ejemplo: Si quisieramos probar con un Árbol, un RandomForest y una Regresión Logística ($\leftarrow$ ¿qué queremos hacer?), probando con diferentes hiperparámetros para cada algoritmo podríamos generar el siguiente script de *magic loop*. 

El siguiente ejemplo se encuentra en la carpeta `scripts/sparkml_grid_search.json` para que puedas cargarlo en Zeppelin y echarlo a andar. 


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(), #solo auc o areaunderPR:(
                          numFolds=5)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)