In [1]:
#  Last amended: 15/08/2019
#  Myfolder: /home/ashok/Documents/spark
# Ref: https://mingchen0919.github.io/learning-apache-spark/categorical-data.html
#      https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/


# Objectives:
#            1. Dealing with categorical columns
#            2. Using StringIndexer, OneHotEncoderEstimator, VectorAssembler

In [2]:
## A. Create some data

# The frame built below is the running example for
#                  a) StringIndexer,
#                  b) OneHotEncoderEstimator,
#                  c) VectorAssembler
#
# Column roles:
#   x1, x2 : categorical, stored as strings
#   x3     : categorical, stored as integers
#   x4     : numerical
#   y1     : categorical, stored as integers
#   y2     : string column

# 1.0
import pandas as pd

# 1.1 Eight rows; the keyword names become the column labels, in this order
pdf = pd.DataFrame(dict(
    x1=['a', 'a', 'b', 'b', 'b', 'c', 'd', 'd'],
    x2=['apple', 'orange', 'orange', 'orange', 'peach', 'peach', 'apple', 'orange'],
    x3=[1, 1, 2, 2, 2, 4, 1, 2],
    x4=[2.4, 2.5, 3.5, 1.4, 2.1, 1.5, 3.0, 2.0],
    y1=[1, 0, 1, 0, 0, 1, 1, 0],
    y2=['yes', 'no', 'no', 'yes', 'yes', 'yes', 'no', 'yes'],
))

In [3]:
# 1.2 Convert the pandas frame into a Spark DataFrame.
#     NOTE(review): `spark` is not defined in any cell shown here; presumably it is
#     the SparkSession injected by the pyspark kernel/shell -- confirm before
#     running this notebook in a plain Python kernel.
df = spark.createDataFrame(data=pdf)
type(df)           # pyspark.sql.dataframe.DataFrame

pyspark.sql.dataframe.DataFrame

In [4]:
# B. About DataFrame
# Ref: https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf
# 2.0
# Only the two show() calls print anything below: a bare expression in the
# middle of a cell is evaluated, but the notebook does not echo its value.
df.show(n=3)              # Print the first three rows as a table
df.head(n=3)              # First three rows (value not echoed)
df.take(num=2)            # First two rows (value not echoed)
type(df.take(num=2))      # List of objects: pyspark.sql.types.Row
r = df.take(num=2)        # Keep the two rows for inspection
r[0]                      # First row
type(r[0])                # pyspark.sql.types.Row
df.describe().show()      # Print summary statistics

+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
+---+------+---+---+---+---+
only showing top 3 rows

+-------+----+-----+------------------+------------------+------------------+----+
|summary|  x1|   x2|                x3|                x4|                y1|  y2|
+-------+----+-----+------------------+------------------+------------------+----+
|  count|   8|    8|                 8|                 8|                 8|   8|
|   mean|null| null|             1.875|               2.3|               0.5|null|
| stddev|null| null|0.9910312089651149|0.7131419413913535|0.5345224838248488|null|
|    min|   a|apple|                 1|               1.4|                 0|  no|
|    max|   d|peach|                 4|               3.5|                 1| yes|
+-------+----+-----+------------------+------------------+------------------+----+



### C. StringIndexer

StringIndexer maps a string column to an index column that will be treated as a categorical column by Spark. <br>
The indices start with 0 and are ordered by label frequencies. If it is a numerical column, the column will first<br>
be cast to a string column and then indexed by StringIndexer.<br>
There are three steps to implement the StringIndexer<br>
-      Build the StringIndexer model: specify the input column and output column names.
-      Learn the StringIndexer model: fit the model with your data.
-      Execute the indexing: call the transform function to execute the indexing process.

In [5]:
# 3.0
from pyspark.ml.feature import StringIndexer

In [6]:
# 3.1
# Build the indexer. It is configured with column names only (plain strings,
# not lists); the DataFrame itself is supplied later, when fitting.

string_indexer = StringIndexer(
    inputCol='x1',
    outputCol='indexed_x1',
)

In [7]:
# 3.2 Fit the indexer on the DataFrame to obtain the fitted model:

si_model = string_indexer.fit(dataset=df)

In [8]:
# 3.3 Apply the fitted model to the data; the result is a new DataFrame:

df_si = si_model.transform(dataset=df)

In [9]:
# 3.4 Resulting df
#     From the result it can be seen that (a, b, c, d) in column x1 are converted to
#     (2.0, 0.0, 3.0, 1.0). They are ordered by their frequencies in column x1:
#     the most frequent value (b, three rows) is coded as 0.0 and the least
#     frequent value (c, one row) gets the highest index, 3.0.
#     a and d both occur twice; in this run d received 1.0 and a received 2.0.

df_si.show(20)

+---+------+---+---+---+---+----------+
| x1|    x2| x3| x4| y1| y2|indexed_x1|
+---+------+---+---+---+---+----------+
|  a| apple|  1|2.4|  1|yes|       2.0|
|  a|orange|  1|2.5|  0| no|       2.0|
|  b|orange|  2|3.5|  1| no|       0.0|
|  b|orange|  2|1.4|  0|yes|       0.0|
|  b| peach|  2|2.1|  0|yes|       0.0|
|  c| peach|  4|1.5|  1|yes|       3.0|
|  d| apple|  1|3.0|  1| no|       1.0|
|  d|orange|  2|2.0|  0|yes|       1.0|
+---+------+---+---+---+---+----------+



### D. OneHotEncoderEstimator

One-hot encoding maps a categorical feature, represented as a label index,<br>
to a binary vector with at most a single one-value indicating the presence of<br>
a specific feature value from among the set of all feature values. This encoding<br>
allows algorithms which expect continuous features, such as Logistic Regression,<br>
to use categorical features. For string type input data, it is common to encode<br>
categorical features using StringIndexer first.<br>
OneHotEncoderEstimator can transform multiple columns, returning a <br>
one-hot-encoded output vector column for each input column. It is common to<br>
merge these vectors into a single feature vector using VectorAssembler.<br>


Each index is converted to a vector. However, in spark the vector is represented by a<br>
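sparse vector, because sparse vector can save a lot of memory.<br>
The process of using OneHotEncoderEstimator follows the same pattern as StringIndexer. <br>
Because it is an estimator, there are three steps.<br>
-    i) Build the encoder: specify the input and output column names
-    ii) Fit the encoder on the indexed DataFrame
-    iii) Execute the encoding by calling transform on the fitted model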



In [10]:
# 4.0
from pyspark.ml.feature import OneHotEncoderEstimator

In [11]:
# 4.1 Build the one-hot encoder. Only input/output column names are given here.
#     Both arguments are lists, so several columns can be encoded at once:

onehotencoder = OneHotEncoderEstimator(
    inputCols=['indexed_x1'],
    outputCols=['onehotencoded_x1'],
)

In [12]:
# 4.2 Fit the encoder on df_si, then transform df_si into df_dummy

model = onehotencoder.fit(dataset=df_si)
df_dummy = model.transform(dataset=df_si)

In [13]:
# 4.3 Resulting df
#     Each vector is printed in sparse form: (size, [active indices], [values]).
#     x1 has four categories but the vector size is 3: the category with the
#     last index (3.0, i.e. 'c') is represented by the all-zero vector.
# (3,[2],[1.0])  => Vector length: 3, value 1 at index 2 (third position)   =  0 0 1
# (3,[0],[1.0])  => Vector length: 3, value 1 at index 0 (first position)   =  1 0 0
# (3,[1],[1.0])  => Vector length: 3, value 1 at index 1 (second position)  =  0 1 0
# (3,[],[])      => Vector length: 3, no active position (all zeros)        =  0 0 0

df_dummy.show()

+---+------+---+---+---+---+----------+----------------+
| x1|    x2| x3| x4| y1| y2|indexed_x1|onehotencoded_x1|
+---+------+---+---+---+---+----------+----------------+
|  a| apple|  1|2.4|  1|yes|       2.0|   (3,[2],[1.0])|
|  a|orange|  1|2.5|  0| no|       2.0|   (3,[2],[1.0])|
|  b|orange|  2|3.5|  1| no|       0.0|   (3,[0],[1.0])|
|  b|orange|  2|1.4|  0|yes|       0.0|   (3,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|       0.0|   (3,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|       3.0|       (3,[],[])|
|  d| apple|  1|3.0|  1| no|       1.0|   (3,[1],[1.0])|
|  d|orange|  2|2.0|  0|yes|       1.0|   (3,[1],[1.0])|
+---+------+---+---+---+---+----------+----------------+



In [24]:
## E. Process all categorical columns with Pipeline
#     A Pipeline is an ordered sequence of stages, where a stage is an object
#     that offers fit() or transform(). Fitting the Pipeline executes the stages
#     in order. The cells below use one pipeline for every categorical column.

# 5. Names of all categorical columns
categorical_columns = [
    'x1',
    'x2',
    'x3',
]

In [25]:
##=== build stages ======
# 5.1 One StringIndexer per categorical column: <col> -> stringindexed_<col>
stringindexer_stages = [
    StringIndexer(inputCol=col_name, outputCol='stringindexed_{}'.format(col_name))
    for col_name in categorical_columns
]

In [29]:
# Inspect the list of StringIndexer stages built in 5.1
stringindexer_stages

[StringIndexer_7afc5a94c664,
 StringIndexer_23ec52d81953,
 StringIndexer_df2f7af6bd99]

In [26]:
# 5.2 A single OneHotEncoderEstimator encodes all indexed columns together:
#     stringindexed_<col> -> onehotencoded_<col>
in_cols = ['stringindexed_{}'.format(col_name) for col_name in categorical_columns]
out_cols = ['onehotencoded_{}'.format(col_name) for col_name in categorical_columns]
onehotencoder_stages = [
    OneHotEncoderEstimator(inputCols=in_cols, outputCols=out_cols),
]

In [27]:
# 5.3 Indexer stages first, then the encoder stage that consumes their output columns
all_stages = [*stringindexer_stages, *onehotencoder_stages]

In [28]:
# Inspect the combined stage list: the StringIndexer stages followed by the encoder
all_stages

[StringIndexer_7afc5a94c664,
 StringIndexer_23ec52d81953,
 StringIndexer_df2f7af6bd99,
 OneHotEncoderEstimator_2128f5d440d2]

In [18]:
## 5.4 Build pipeline model
# 5.4.1

from pyspark.ml import Pipeline

In [19]:
# 5.4.2 Wrap the ordered stage list in a Pipeline (nothing is fitted yet)

pipeline = Pipeline(
    stages=all_stages,
)

In [20]:
## 5.5 Fit the pipeline on df; the result is the fitted pipeline model

pipeline_mode = pipeline.fit(dataset=df)

In [21]:
## 5.6 Run df through the fitted pipeline and list the resulting columns

df_coded = pipeline_mode.transform(dataset=df)
df_coded.columns

['x1',
 'x2',
 'x3',
 'x4',
 'y1',
 'y2',
 'stringindexed_x1',
 'stringindexed_x2',
 'stringindexed_x3',
 'onehotencoded_x1',
 'onehotencoded_x2',
 'onehotencoded_x3']

In [22]:
## 6. Keep only the one-hot encoded vectors plus the x4, y1 and y2 columns

selected_columns = ['onehotencoded_{}'.format(col_name) for col_name in categorical_columns]
selected_columns += ['x4', 'y1', 'y2']
df_coded = df_coded.select(*selected_columns)
df_coded.show()

+----------------+----------------+----------------+---+---+---+
|onehotencoded_x1|onehotencoded_x2|onehotencoded_x3| x4| y1| y2|
+----------------+----------------+----------------+---+---+---+
|   (3,[2],[1.0])|       (2,[],[])|   (2,[1],[1.0])|2.4|  1|yes|
|   (3,[2],[1.0])|   (2,[0],[1.0])|   (2,[1],[1.0])|2.5|  0| no|
|   (3,[0],[1.0])|   (2,[0],[1.0])|   (2,[0],[1.0])|3.5|  1| no|
|   (3,[0],[1.0])|   (2,[0],[1.0])|   (2,[0],[1.0])|1.4|  0|yes|
|   (3,[0],[1.0])|   (2,[1],[1.0])|   (2,[0],[1.0])|2.1|  0|yes|
|       (3,[],[])|   (2,[1],[1.0])|       (2,[],[])|1.5|  1|yes|
|   (3,[1],[1.0])|       (2,[],[])|   (2,[1],[1.0])|3.0|  1| no|
|   (3,[1],[1.0])|   (2,[0],[1.0])|   (2,[0],[1.0])|2.0|  0|yes|
+----------------+----------------+----------------+---+---+---+



In [23]:
###################################################
# 7 The pipeline code above can equivalently be written as follows
#   (same stages, shorter "<col>index" / "<col>trans" column names).
#   NOTE: this cell rebinds in_cols, out_cols, pipeline, pipeline_mode and
#   selected_columns, which sections 5 and 6 also assign.

cat_cols = ['x1', 'x2', 'x3']

# Names of the indexed columns (encoder input) and vector columns (encoder output)
in_cols = [name + "index" for name in cat_cols]
out_cols = [name + "trans" for name in cat_cols]

# One StringIndexer stage per categorical column
stages = []
for i, indexed_name in zip(cat_cols, in_cols):
    si = StringIndexer(inputCol=i, outputCol=indexed_name)
    stages.append(si)

# A single encoder stage closes the list
ohe = OneHotEncoderEstimator(inputCols=in_cols, outputCols=out_cols)
stages.append(ohe)
stages                      # not echoed: only a cell's last expression is displayed
pipeline = Pipeline(stages=stages)
pipeline_mode = pipeline.fit(dataset=df)
df_coded1 = pipeline_mode.transform(dataset=df)
df_coded1.columns           # not echoed either
df_coded1.show(n=1)

# Keep the encoded vectors plus the x4, y1 and y2 columns
selected_columns = out_cols + ['x4', 'y1', 'y2']
rest = df_coded1.select(*selected_columns)
rest.show()
################################################
 	

+---+-----+---+---+---+---+-------+-------+-------+-------------+---------+-------------+
| x1|   x2| x3| x4| y1| y2|x1index|x2index|x3index|      x1trans|  x2trans|      x3trans|
+---+-----+---+---+---+---+-------+-------+-------+-------------+---------+-------------+
|  a|apple|  1|2.4|  1|yes|    2.0|    2.0|    1.0|(3,[2],[1.0])|(2,[],[])|(2,[1],[1.0])|
+---+-----+---+---+---+---+-------+-------+-------+-------------+---------+-------------+
only showing top 1 row

+-------------+-------------+-------------+---+---+---+
|      x1trans|      x2trans|      x3trans| x4| y1| y2|
+-------------+-------------+-------------+---+---+---+
|(3,[2],[1.0])|    (2,[],[])|(2,[1],[1.0])|2.4|  1|yes|
|(3,[2],[1.0])|(2,[0],[1.0])|(2,[1],[1.0])|2.5|  0| no|
|(3,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|3.5|  1| no|
|(3,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|1.4|  0|yes|
|(3,[0],[1.0])|(2,[1],[1.0])|(2,[0],[1.0])|2.1|  0|yes|
|    (3,[],[])|(2,[1],[1.0])|    (2,[],[])|1.5|  1|yes|
|(3,[1],[1.0])|    (2,