In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/27 17:06:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/27 17:06:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Example data

In [3]:
import pandas as pd
pdf = pd.DataFrame({
    'x1': ['a','a','b','b', 'b', 'c'],
    'x2': ['apple', 'orange', 'orange','orange', 'peach', 'peach'],
    'x3': [1, 1, 2, 2, 2, 4],
    'x4': [2.4, 2.5, 3.5, 1.4, 2.1,1.5],
    'y1': [1, 0, 1, 0, 0, 1],
    'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']
    })
df = spark.createDataFrame(pdf)
df.show()

                                                                                

+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+



# Pipeline
A Pipeline is a sequence of stages, each of which is an **Estimator** or a **Transformer**.
An **Estimator** has a **fit** method and a **Transformer** has a **transform** method. Loosely speaking,
**a pipeline is a sequence of fit and transform calls.**
When a stage is an Estimator, its **fit** method is applied to the input data and returns a fitted model, which is itself a **Transformer**.
That model's **transform** method is then applied to the same data and outputs the **transformed** data for the next stage.
**The transformed data output by one stage must be an acceptable input to the next stage's fit/transform method.**
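
To make the fit/transform hand-off concrete, here is a minimal plain-Python sketch of the idea. The classes `AddConstant`, `MeanCenter`, `MeanCenterModel` and the helpers `fit_pipeline` and `transform_pipeline` are hypothetical stand-ins invented for illustration; they are not part of pyspark and only approximate what `Pipeline.fit` does.

```python
# Toy stand-ins for the Estimator/Transformer contract; these classes are
# hypothetical and are not part of pyspark.

class AddConstant:
    """Transformer: needs no fitting, only transform()."""
    def __init__(self, value):
        self.value = value

    def transform(self, data):
        return [x + self.value for x in data]


class MeanCenterModel:
    """Transformer returned by MeanCenter.fit(); it stores the learned mean."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class MeanCenter:
    """Estimator: fit() learns the mean and returns a fitted Transformer."""
    def fit(self, data):
        return MeanCenterModel(sum(data) / len(data))


def fit_pipeline(stages, data):
    """Fit stages in order and return the list of fitted Transformers."""
    fitted = []
    for stage in stages:
        # An Estimator is first fitted on the data produced so far.
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        fitted.append(model)
        # The transformed output becomes the input of the next stage.
        data = model.transform(data)
    return fitted


def transform_pipeline(fitted, data):
    """Apply every fitted Transformer in order."""
    for model in fitted:
        data = model.transform(data)
    return data


fitted = fit_pipeline([AddConstant(1.0), MeanCenter()], [1.0, 2.0, 3.0])
result = transform_pipeline(fitted, [1.0, 2.0, 3.0])
print(result)  # [-1.0, 0.0, 1.0]
```

The second stage is fitted on the output of the first one (`[2.0, 3.0, 4.0]`), not on the raw input, which is why the learned mean is 3.0.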

In [4]:
# spark pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Example
We are going to use a pipeline to StringIndex columns x1, x2, y1, and y2.
Then we OneHotEncode the resulting StringIndexed columns.

**StringIndexer** converts categorical string columns into numerical indices (a numeric column such as y1 is cast to string first).
inputCol is the column to be indexed; outputCol is the column that stores the indexed values (here prefixed with idx_).
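
As a rough illustration of how the indices are assigned, the sketch below reproduces in plain Python the default ordering as documented for Spark 3.x (`stringOrderType='frequencyDesc'`, with ties broken alphabetically). The helper `frequency_index` is a hypothetical name used only here; it is not a pyspark function.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct label to an index, most frequent label first.

    Ties are broken alphabetically on the string form of the label.
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

x1_index = frequency_index(['a', 'a', 'b', 'b', 'b', 'c'])
y1_index = frequency_index([1, 0, 1, 0, 0, 1])
print(x1_index)  # {'b': 0.0, 'a': 1.0, 'c': 2.0}
print(y1_index)  # {'0': 0.0, '1': 1.0}
```

For x1, 'b' occurs most often and therefore gets index 0.0. For y1 both labels occur three times, so under this assumption the alphabetical order decides.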

In [5]:
# create a list of StringIndexer stages for each column in the pipeline.
stringindex_stages = [StringIndexer(inputCol=c, outputCol='idx_' + c) for c in ['x1', 'x2', 'y1', 'y2' ]]
stringindex_stages

[StringIndexer_910f073d56e0,
 StringIndexer_506df0d5cf9f,
 StringIndexer_ca05a2ab046c,
 StringIndexer_5797c224d37e]

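**OneHotEncoder** maps each category index in the indexed column to a binary vector with at most one non-zero entry. By default (dropLast=True) the last category is encoded as the all-zero vector.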
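
The encoding rule can be sketched in plain Python. The function `one_hot_sparse` below is a hypothetical helper, not a pyspark API; it assumes the default `dropLast=True` behaviour and returns a `(size, indices, values)` tuple in the same layout in which `show()` prints a sparse vector.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values) for a one-hot encoded category index."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    # The dropped last category is encoded as the all-zero vector.
    return (size, [], [])

print(one_hot_sparse(1.0, 3))  # (2, [1], [1.0])
print(one_hot_sparse(2.0, 3))  # (2, [], [])
```

With three categories the vector has size 2, and the category with the highest index has no non-zero entry.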

In [6]:
# perform one-hot encoding on the indexed columns (generated by StringIndexer)
onehotencode_stages = [OneHotEncoder(inputCol='idx_' + c, outputCol='ohe_' + c) for c in ['x1', 'x2', 'y1', 'y2' ]]
onehotencode_stages

[OneHotEncoder_b010c734ce37,
 OneHotEncoder_b6766cd86ef4,
 OneHotEncoder_f39a10fa09f2,
 OneHotEncoder_85123b739ca9]

Note that the outputCol label in StringIndex stages is the same as the **inputCol label** in the OneHotEncode stages.

# Elements in the stage list

In [13]:
# combine the StringIndexer and OneHotEncoder stages into one list
all_stages = stringindex_stages + onehotencode_stages
[type(x) for x in all_stages]

[pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder]

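In the above list, **pyspark.ml.feature.StringIndexer** is an **Estimator** (has a fit method).
In Spark 3.0 and later, **pyspark.ml.feature.OneHotEncoder** is also an **Estimator**: its fit method returns a OneHotEncoderModel, which is the **Transformer** (has a transform method). In Spark 2.x, OneHotEncoder was a plain Transformer.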

# Build and run pipeline
Build a pipeline from the stages, fit it to the data, and transform the data.

In [8]:
Pipeline(stages=all_stages).fit(df).transform(df).show()

                                                                                

+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|idx_x2|idx_y1|idx_y2|       ohe_x1|       ohe_x2|       ohe_y1|       ohe_y2|
+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|   2.0|   1.0|   0.0|(2,[1],[1.0])|    (2,[],[])|    (1,[],[])|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|   0.0|   0.0|   1.0|(2,[1],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|   0.0|   1.0|   1.0|(2,[0],[1.0])|(2,[0],[1.0])|    (1,[],[])|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|   1.0|   0.0|   0.0|(2,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
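|  c| peach|  4|1.5|  1|yes|   2.0|   1.0|   1.0|   0.0|    (2,[],[])|(2,[1],[1.0])|    (1,[],[])|(1,[0],[1.0])|
+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+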

# Reorder pipeline stages
In the example above, our strategy is to StringIndex all four columns and then OneHotEncode them.
Since each OneHotEncode stage depends only on the output of its corresponding **StringIndex** stage,
our stages list could equally be **[stringindexer on x1, onehotencoder on x1, stringindexer on x2, onehotencoder on x2, stringindexer on y1, onehotencoder on y1, stringindexer on y2, onehotencoder on y2]**.
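
The dependency argument can be checked mechanically. The sketch below models each stage as an `(input_col, output_col)` pair and verifies that every input column exists by the time its stage runs. The function `order_is_valid` and the tuple representation are assumptions made for illustration; pyspark performs its own schema checks when the pipeline is fitted.

```python
def order_is_valid(stages, available):
    """Check that every stage's input column exists when the stage runs.

    stages: list of (input_col, output_col) pairs in pipeline order.
    available: columns present in the input DataFrame.
    """
    seen = set(available)
    for input_col, output_col in stages:
        if input_col not in seen:
            return False
        seen.add(output_col)
    return True

cols = ['x1', 'x2', 'y1', 'y2']
index_stages = [(c, 'idx_' + c) for c in cols]
encode_stages = [('idx_' + c, 'ohe_' + c) for c in cols]
source_cols = ['x1', 'x2', 'x3', 'x4', 'y1', 'y2']

# All indexers first, then all encoders.
grouped = index_stages + encode_stages

# Each column's indexer immediately followed by its encoder.
interleaved = []
for c in cols:
    interleaved += [(c, 'idx_' + c), ('idx_' + c, 'ohe_' + c)]

print(order_is_valid(grouped, source_cols))                       # True
print(order_is_valid(interleaved, source_cols))                   # True
print(order_is_valid(encode_stages + index_stages, source_cols))  # False
```

Both orderings discussed here pass the check, while putting the encoders before the indexers fails because the idx_ columns do not exist yet.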

## Old stages

In [9]:
all_stages

[StringIndexer_910f073d56e0,
 StringIndexer_506df0d5cf9f,
 StringIndexer_ca05a2ab046c,
 StringIndexer_5797c224d37e,
 OneHotEncoder_b010c734ce37,
 OneHotEncoder_b6766cd86ef4,
 OneHotEncoder_f39a10fa09f2,
 OneHotEncoder_85123b739ca9]

## New stages

In [10]:
# interleave the stages so that each column's encoder directly follows its indexer
new_all_stages = [all_stages[x] for x in [0,4,1,5,2,6,3,7]]
new_all_stages

[StringIndexer_910f073d56e0,
 OneHotEncoder_b010c734ce37,
 StringIndexer_506df0d5cf9f,
 OneHotEncoder_b6766cd86ef4,
 StringIndexer_ca05a2ab046c,
 OneHotEncoder_f39a10fa09f2,
 StringIndexer_5797c224d37e,
 OneHotEncoder_85123b739ca9]
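
The hard-coded positions [0,4,1,5,2,6,3,7] work for four columns but have to be rewritten whenever the column list changes. One possible alternative, sketched here with plain strings standing in for the stage objects, is to pair the two lists with `zip` and flatten the pairs.

```python
from itertools import chain

# Stand-in labels; in the notebook these would be the stage objects in
# stringindex_stages and onehotencode_stages.
indexers = ['idx_x1', 'idx_x2', 'idx_y1', 'idx_y2']
encoders = ['ohe_x1', 'ohe_x2', 'ohe_y1', 'ohe_y2']

# Pair each indexer with its encoder, then flatten the pairs.
interleaved = list(chain.from_iterable(zip(indexers, encoders)))

# Same result as picking positions [0,4,1,5,2,6,3,7] from the combined list.
combined = indexers + encoders
by_position = [combined[i] for i in [0, 4, 1, 5, 2, 6, 3, 7]]
print(interleaved == by_position)  # True
```

The same expression applied to the two stage lists would give the interleaved order for any number of columns.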

## Build and run pipeline

In [12]:
# create a new pipeline with the reordered stages and run
# pipeline helps in chaining multiple stages (transformations) for streamlined data processing.
Pipeline(stages=new_all_stages).fit(df).transform(df).show()

                                                                                

+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|       ohe_x1|idx_x2|       ohe_x2|idx_y1|       ohe_y1|idx_y2|       ohe_y2|
+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|(2,[1],[1.0])|   2.0|    (2,[],[])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|(2,[1],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   1.0|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   1.0|    (1,[],[])|   1.0|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|(2,[0],[1.0])|   1.0|(2,[1],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|    (2,[],[])|   1.0|(2,[1],[1.0])|   1.0|    (1,[],[])|   0.