In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Obtain the Spark entry points. getOrCreate() reuses an already-running
# session, so this cell can be re-executed in the same kernel; constructing
# SparkContext() directly a second time fails because only one context may
# be active per process.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Example data

In [2]:
import pandas as pd

# Six-row toy dataset: two string features (x1, x2), an integer feature (x3),
# a float feature (x4), and two label-like columns (y1 numeric, y2 string).
pdf = pd.DataFrame(dict(
    x1=['a', 'a', 'b', 'b', 'b', 'c'],
    x2=['apple', 'orange', 'orange', 'orange', 'peach', 'peach'],
    x3=[1, 1, 2, 2, 2, 4],
    x4=[2.4, 2.5, 3.5, 1.4, 2.1, 1.5],
    y1=[1, 0, 1, 0, 0, 1],
    y2=['yes', 'no', 'no', 'yes', 'yes', 'yes'],
))

# Convert the pandas frame into a Spark DataFrame and print it.
df = spark.createDataFrame(pdf)
df.show()

+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+



## Pipeline
A Pipeline is a sequence of stages, each of which is either an **Estimator** or a **Transformer**. An **Estimator** has a `fit` method and a **Transformer** has a `transform` method. Therefore, we can say, **a pipeline is a sequence of fit and transform methods**. When a stage is an Estimator, its `fit` method is applied to the input data and produces a fitted Transformer. That Transformer's **transform** method is then applied to the data and outputs the transformed data. The **transformed data output by the previous stage has to be an acceptable input to the next stage's fit/transform method**.
## Example

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [4]:
# One StringIndexer per column; each writes its result to "idx_<column>".
stringindex_stage = [
    StringIndexer(inputCol=col_name, outputCol="idx_{}".format(col_name))
    for col_name in ("x1", "x2", "y1", "y2")
]
stringindex_stage

[StringIndexer_fa414fa1b34e,
 StringIndexer_3bbd7e96bba5,
 StringIndexer_da182a06eeb1,
 StringIndexer_def9f36fae62]

In [5]:
# One OneHotEncoder per column; each reads "idx_<column>" (the StringIndexer
# output column name) and writes "ohe_<column>".
onehotencoder_stage = [
    OneHotEncoder(
        inputCol="idx_{}".format(col_name),
        outputCol="ohe_{}".format(col_name),
    )
    for col_name in ("x1", "x2", "y1", "y2")
]
onehotencoder_stage

[OneHotEncoder_db01a6c374b5,
 OneHotEncoder_17ae299f2c53,
 OneHotEncoder_eafeefaf5ebd,
 OneHotEncoder_6d500081672c]

# Elements in the stage list

In [6]:
# Full stage list: every StringIndexer first, then every OneHotEncoder.
all_stages = list(stringindex_stage)
all_stages.extend(onehotencoder_stage)

# Show the class of each stage, in order.
list(map(type, all_stages))

[pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder]

# Build and run pipeline

In [7]:
# Fit the stages on df, apply the fitted pipeline to the same DataFrame,
# and print the transformed rows.
pipeline = Pipeline(stages=all_stages)
pipeline_model = pipeline.fit(df)
pipeline_model.transform(df).show()

+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|idx_x2|idx_y1|idx_y2|       ohe_x1|       ohe_x2|       ohe_y1|       ohe_y2|
+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|   2.0|   1.0|   0.0|(2,[1],[1.0])|    (2,[],[])|    (1,[],[])|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|   0.0|   0.0|   1.0|(2,[1],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|   0.0|   1.0|   1.0|(2,[0],[1.0])|(2,[0],[1.0])|    (1,[],[])|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|   1.0|   0.0|   0.0|(2,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|   1.0|   1.0|   0.0|    (2,[],[])|(2,[1],[1.0])|    (1,[],[]

# Reorder pipeline stages
In the example above, our strategy is to StringIndex all four columns and then OneHotEncode them. Since each OneHotEncoder stage depends only on the output of its corresponding StringIndexer stage, the stages can equally well be interleaved, so our stages list could be [stringindexer on x1, onehotencoder on x1, stringindexer on x2, onehotencoder on x2, stringindexer on y1, onehotencoder on y1, stringindexer on y2, onehotencoder on y2].

## Old Stages

In [8]:
# Display the original stage order for comparison: all StringIndexer stages
# followed by all OneHotEncoder stages.
all_stages

[StringIndexer_fa414fa1b34e,
 StringIndexer_3bbd7e96bba5,
 StringIndexer_da182a06eeb1,
 StringIndexer_def9f36fae62,
 OneHotEncoder_db01a6c374b5,
 OneHotEncoder_17ae299f2c53,
 OneHotEncoder_eafeefaf5ebd,
 OneHotEncoder_6d500081672c]

## New Stages

In [9]:
# Interleave the stages so each column's StringIndexer is immediately followed
# by its OneHotEncoder. all_stages holds the indexers in its first half and the
# encoders (same column order) in its second half, so pairing the halves
# position-by-position gives the order [0, 4, 1, 5, 2, 6, 3, 7] for the four
# columns used here, without hard-coding indices that would go stale if the
# column list changed.
n_columns = len(all_stages) // 2
new_all_stages = [
    stage
    for indexer_encoder_pair in zip(all_stages[:n_columns], all_stages[n_columns:])
    for stage in indexer_encoder_pair
]
new_all_stages

[StringIndexer_fa414fa1b34e,
 OneHotEncoder_db01a6c374b5,
 StringIndexer_3bbd7e96bba5,
 OneHotEncoder_17ae299f2c53,
 StringIndexer_da182a06eeb1,
 OneHotEncoder_eafeefaf5ebd,
 StringIndexer_def9f36fae62,
 OneHotEncoder_6d500081672c]

# Build and run pipeline

In [10]:
# Same fit-then-transform run as before, but using the interleaved stage
# order; print the transformed rows.
reordered_pipeline = Pipeline(stages=new_all_stages)
reordered_model = reordered_pipeline.fit(df)
reordered_model.transform(df).show()

+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|       ohe_x1|idx_x2|       ohe_x2|idx_y1|       ohe_y1|idx_y2|       ohe_y2|
+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|(2,[1],[1.0])|   2.0|    (2,[],[])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|(2,[1],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   1.0|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   1.0|    (1,[],[])|   1.0|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|(2,[0],[1.0])|   1.0|(2,[1],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|    (2,[],[])|   1.0|(2,[1],[1.0])|   1.0|    (1,[],[])|   0.