In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

# Datos de ejemplo

In [3]:
import pandas as pd
pdf = pd.DataFrame({
        'x1': ['a','a','b','b', 'b', 'c'],
        'x2': ['apple', 'orange', 'orange','orange', 'peach', 'peach'],
        'x3': [1, 1, 2, 2, 2, 4],
        'x4': [2.4, 2.5, 3.5, 1.4, 2.1,1.5],
        'y1': [1, 0, 1, 0, 0, 1],
        'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']
    })
df = spark.createDataFrame(pdf)
df.show()

+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+



# Pipeline

Pipeline es una secuencia de etapas que consiste en **Estimadores** y/o **Transformadores**. El **Estimator** tiene el método **`fit`** y el **Transformer** tiene el método **`transformar`**. Por lo tanto, podemos decir, **una tubería es una secuencia de métodos de `adaptación` y `transformación`**. Cuando es un método **`fit`**, se aplica a los datos de entrada y se convierte en un método **`transform`**. Entonces el método **`transformar`** se aplica a los datos *ajustados** y a los datos de salida **transformados**. **La salida de datos transformados de la etapa anterior debe ser una entrada aceptable para el método de ajuste/transformación de la siguiente etapa**.

![spark pipeline](images/spark-pipeline.png)

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

## Ejemplo

Vamos a usar pipeline para las columnas `StringIndex` x1, x2, y1, y1, e y2. Entonces nosotros `OneHotEncode` las columnas resultantes `StringIdexed`.

In [21]:
stringindex_stages = [StringIndexer(inputCol=c, outputCol='idx_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
stringindex_stages

[StringIndexer_4822a1cd65667da67acd,
 StringIndexer_458889022f08259328b3,
 StringIndexer_4cf4afdb4c495ecc227f,
 StringIndexer_4460a02263a04635288b]

In [22]:
onehotencode_stages = [OneHotEncoder(inputCol='idx_' + c, outputCol='ohe_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
onehotencode_stages

[OneHotEncoder_4435bac1095e2bed0952,
 OneHotEncoder_480db42beb32e2d6cb83,
 OneHotEncoder_498cbd54623097da2b6c,
 OneHotEncoder_460c95da4c6185ac4fe2]

Nota Tenga en cuenta que la etiqueta **outputCol label** en las etapas de StringIndex es la misma que la etiqueta **inputCol** en las etapas de OneHotEncode.

## Elementos de la lista de etapas

In [23]:
all_stages = stringindex_stages + onehotencode_stages
[type(x) for x in all_stages]

[pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder]

En la lista anterior, **`pyspark.ml.feature.StringIndexer`** es un **Estimator** (tiene un método de ajuste) y **`pyspark.ml.feature.OneHotEncoder`** es un **transformador** (tiene un método de transformación).

## Construir y ejecutar un pipeline

In [24]:
Pipeline(stages=all_stages).fit(df).transform(df).show()

+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|idx_x2|idx_y1|idx_y2|       ohe_x1|       ohe_x2|       ohe_y1|       ohe_y2|
+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|   2.0|   1.0|   0.0|(2,[1],[1.0])|    (2,[],[])|    (1,[],[])|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|   0.0|   0.0|   1.0|(2,[1],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|   0.0|   1.0|   1.0|(2,[0],[1.0])|(2,[0],[1.0])|    (1,[],[])|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|   1.0|   0.0|   0.0|(2,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|   1.0|   1.0|   0.0|    (2,[],[])|(2,[1],[1.0])|    (1,[],[]

## Reordenar etapas pipeline

En el ejemplo anterior, nuestra estrategia es **StringIndex** las cuatro columnas y luego **OneHotEncode**. Dado que cada etapa **OneHotEncode** sólo depende de la salida de su correspondiente etapa **StringIndex**, nuestra lista de etapas podría ser **`[stringindexer en x1, onehotencoder en x1, stringindexer en x2, onehotencoder en x2, stringindexer en y1, onehotencoder en y1, stringindexer en y2, onehotencoder en y2]`**.

### Etapas antiguas

In [33]:
all_stages

[StringIndexer_4822a1cd65667da67acd,
 StringIndexer_458889022f08259328b3,
 StringIndexer_4cf4afdb4c495ecc227f,
 StringIndexer_4460a02263a04635288b,
 OneHotEncoder_4435bac1095e2bed0952,
 OneHotEncoder_480db42beb32e2d6cb83,
 OneHotEncoder_498cbd54623097da2b6c,
 OneHotEncoder_460c95da4c6185ac4fe2]

### Nuevas etapas

In [35]:
new_all_stages = [all_stages[x] for x in [0,4,1,5,2,6,3,7]]
new_all_stages

[StringIndexer_4822a1cd65667da67acd,
 OneHotEncoder_4435bac1095e2bed0952,
 StringIndexer_458889022f08259328b3,
 OneHotEncoder_480db42beb32e2d6cb83,
 StringIndexer_4cf4afdb4c495ecc227f,
 OneHotEncoder_498cbd54623097da2b6c,
 StringIndexer_4460a02263a04635288b,
 OneHotEncoder_460c95da4c6185ac4fe2]

## Construir y ejecutar pipeline

In [36]:
Pipeline(stages=new_all_stages).fit(df).transform(df).show()

+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|       ohe_x1|idx_x2|       ohe_x2|idx_y1|       ohe_y1|idx_y2|       ohe_y2|
+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|(2,[1],[1.0])|   2.0|    (2,[],[])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|(2,[1],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   1.0|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   1.0|    (1,[],[])|   1.0|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|(2,[0],[1.0])|   1.0|(2,[1],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|    (2,[],[])|   1.0|(2,[1],[1.0])|   1.0|    (1,[],[])|   0.