In [None]:
#  Last amended: 30th June, 2021
#  Myfolder: /home/ashok/Documents/spark
# Ref: https://mingchen0919.github.io/learning-apache-spark/categorical-data.html
#      https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

# Extracting, transforming and selecting features
#  https://spark.apache.org/docs/latest/ml-features#extracting-transforming-and-selecting-features

# Objectives:
#            1. Dealing with categorical columns
#            2. Using StringIndexer, OneHotEncoder
#            3. Using VectorAssembler
#            4. Using StandardScaler
#            5. Using pipelines

In [None]:
## A. Create some data

# A small pandas frame used later to demonstrate how to use
#                  a) StringIndexer,
#                  b) OneHotEncoder.
#
# Column roles:
#   x1, x2, y2 : categorical columns holding strings
#   x3, y1     : categorical columns holding integers
#   x4, x5     : numerical columns

# 1.0
import pandas as pd

# 1.1 Eight sample rows, defined column by column
pdf = pd.DataFrame(
    data=dict(
        x1=['a', 'a', 'b', 'b', 'b', 'c', 'd', 'd'],
        x2=['apple', 'orange', 'orange', 'orange', 'peach', 'peach', 'apple', 'orange'],
        x3=[1, 1, 2, 2, 2, 4, 1, 2],
        x4=[2.4, 2.5, 3.5, 1.4, 2.1, 1.5, 3.0, 2.0],
        x5=[12.4, 22.5, 33.5, 11.4, 42.1, 11.5, 23.0, 32.0],
        y1=[1, 0, 1, 0, 0, 1, 1, 0],
        y2=['yes', 'no', 'no', 'yes', 'yes', 'yes', 'no', 'yes'],
    )
)

In [None]:
# 1.2 Convert the pandas frame into a Spark DataFrame.
#     The notebook never defines `spark` itself; it only exists when the
#     kernel was started through pyspark. getOrCreate() returns that active
#     session when there is one and builds a new one otherwise, so this cell
#     also works after "Restart & Run All" on a plain Jupyter kernel.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(pdf)
type(df)           # pyspark.sql.dataframe.DataFrame

In [None]:
# B. About DataFrame
# Ref: https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf
# 2.0
# Jupyter echoes only the LAST bare expression of a cell, so the
# intermediate results are printed explicitly; otherwise they are
# computed and silently discarded.
df.show(3)              # Show first three rows as a table
print(df.head(3))       # First three rows, returned to the driver
r = df.take(2)          # Fetch two rows once and reuse them below
print(r)                # The two rows
print(type(r))          # Container type returned by take()
print(r[0])             # First row
print(type(r[0]))       # pyspark.sql.types.Row
df.describe().show()    # Summary statistics

### C. StringIndexer

#### [Syntax](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html):
<i>class pyspark.ml.feature.StringIndexer(*, inputCol=None, outputCol=None, inputCols=None, outputCols=None, handleInvalid='error', stringOrderType='frequencyDesc')</i>

    A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.

StringIndexer maps a string column to an index column that will be treated as a categorical column by spark. <br>
The indices start with 0 and are ordered by label frequencies. If it is a numerical column, the column will first<br>
be cast to a string column and then indexed by StringIndexer.<br>
There are three steps to implement the StringIndexer<br>
-      Build the StringIndexer model: specify the input column and output column names.
-      Learn the StringIndexer model: fit the model with your data.
-      Execute the indexing: call the transform function to execute the indexing process.

In [None]:
# 3.0
from pyspark.ml.feature import StringIndexer

In [None]:
# 3.1
# Configure the indexer. Only column names are given at this point;
# the DataFrame itself is supplied later, when fit() is called.
# inputCol and outputCol each take a single column name (not a list):

string_indexer = StringIndexer(inputCol='x1', outputCol='indexed_x1')

In [None]:
# 3.2 Learn/fit the model on dataframe:
#     fit() is called on the indexer configured in 3.1; the fitted
#     result is kept in si_model and used for the transform step.

si_model = string_indexer.fit(df)

In [None]:
# 3.3 Transform the data to a new DataFrame:
#     df itself is left untouched; the result is bound to df_si.

df_si = si_model.transform(df)

In [None]:
# 3.4 Resulting df
#     Column x1 holds b three times, a and d twice each, and c once.
#     The indexer was built with default settings, and the documented default
#     ordering is stringOrderType='frequencyDesc', so the most frequent
#     value (b) is coded as 0.0.
#     Expected coding: b -> 0.0, a -> 1.0, d -> 2.0, c -> 3.0
#     NOTE(review): the a/d tie is presumably broken alphabetically, as
#     documented for Spark 3.x -- confirm against the Spark version in use.
#     The three rows displayed below are (a, a, b), i.e. (1.0, 1.0, 0.0).

df_si.show(3)

### D. OneHotEncoder

<i>class pyspark.ml.feature.OneHotEncoder(*, inputCols=None, outputCols=None, handleInvalid='error', dropLast=True, inputCol=None, outputCol=None)</i>

    A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].

    When handleInvalid is configured to ‘keep’, an extra “category” indicating invalid values is added as last category. So when dropLast is true, invalid values are encoded as all-zeros vector.



One-hot encoding maps a categorical feature, represented as a label index,<br>
to a binary vector with at most a single one-value indicating the presence of<br>
a specific feature value from among the set of all feature values. This encoding<br>
allows algorithms which expect continuous features, such as Logistic Regression,<br>
to use categorical features. For string type input data, it is common to encode<br>
categorical features using StringIndexer first.<br>
OneHotEncoder (named OneHotEncoderEstimator in Spark 2.3/2.4) can transform multiple columns, returning a <br>
one-hot-encoded output vector column for each input column. It is common to<br>
merge these vectors into a single feature vector using VectorAssembler.<br>


Each index is converted to a vector. However, in spark the vector is represented by a<br>
sparse vector, because a sparse vector can save a lot of memory.<br>
The process of using OneHotEncoder mirrors that of StringIndexer: the encoder is fitted on the data before it can transform it. <br>
There are three steps.<br>
-    i) Build the encoder: specify the input and output column names
-    ii) Fit the encoder on the data
-    iii) Execute the encoding by calling transform on the fitted model

In [None]:
# 4.0
from pyspark.ml.feature import OneHotEncoder

In [None]:
# 4.1 Configure the OneHotEncoder.
#     Only the input/output column names are specified here.
#     Both arguments are lists, so several columns can be encoded at once:

onehotencoder = OneHotEncoder(inputCols=['indexed_x1'],
                              outputCols=['onehotencoded_x1'])

In [None]:
# 4.2 Fit the encoder on df_si, then transform df_si to df_dummy
#     The encoder is fitted first; the fitted model does the transform.

model = onehotencoder.fit(df_si)
df_dummy = model.transform(df_si)

In [None]:
# 4.3 Resulting df
# x1 has four distinct values, so indexed_x1 takes the indices 0..3.
# With the documented default dropLast=True the last category is not given
# its own slot, so the vectors have length 3. They are printed in sparse form
# (length, [positions of non-zeros], [values]):
# (3,[0],[1.0])  => Vector length: 3, at position 0 the value is 1  =  1 0 0
# (3,[1],[1.0])  => Vector length: 3, at position 1 the value is 1  =  0 1 0
# (3,[2],[1.0])  => Vector length: 3, at position 2 the value is 1  =  0 0 1
# (3,[],[])      => Vector length: 3, no non-zero entry             =  0 0 0
#                   (this is the last category, index 3, which is dropped)
# Invalid values are encoded as an all-zeros vector only when
# handleInvalid='keep' is combined with dropLast=True; the encoder here
# uses the default handleInvalid='error'.

df_dummy.show()

### Multiple columns handling

#### StringIndexing multiple cols

In [None]:
## E. Process all categorical columns with Pipeline
#     A Pipeline is a sequence of stages. A stage is an object that provides either fit()
#      or transform(). When fitting a Pipeline, the stages get executed in order. The example below shows
#       how to use pipeline to process all categorical columns.

# 5. List all categorical columns:
#    NOTE(review): y1 and y2 look like target/label columns. Because they are
#    listed here they are indexed and one-hot encoded like the x columns --
#    confirm that this is intended.

categorical_cols = ['x1', 'x2', 'x3', 'y1', 'y2']

In [None]:
# 5.1 Output column names for the StringIndexer: 'indexed_<column>'

stg_out_cols = ["indexed_" + name for name in categorical_cols]
stg_out_cols

In [None]:
# 5.2 A single StringIndexer handles every categorical column in one go:
string_indexer = StringIndexer(inputCols=categorical_cols,
                               outputCols=stg_out_cols)

# 5.3 Fit the indexer on the DataFrame:
si_model = string_indexer.fit(df)

# 5.4 Produce a new DataFrame carrying the indexed columns and display it:
df_si = si_model.transform(df)
df_si.show()

#### OHE multiple columns
This will always follow StringIndexing

In [None]:
# 6.1 OneHotEncode all columns at one go
# 6.1.1 Output column names for the encoder: 'oheCoded_<column>'

ohe_out_cols = ["oheCoded_{}".format(name) for name in categorical_cols]
ohe_out_cols

In [None]:
# 6.2 Configure the encoder. The StringIndexer output columns are its
#     inputs; every input column gets its own encoded output column:
onehotencoder = OneHotEncoder(inputCols=stg_out_cols,
                              outputCols=ohe_out_cols)

# 6.3 Fit on df_si, then transform df_si into df_dummy
model = onehotencoder.fit(df_si)
df_dummy = model.transform(df_si)

In [None]:
# 6.4 Show transformed output, split into three groups of columns
#     (positions 0-6, 7-11, and 12 onward):

dummy_cols = df_dummy.columns
for col_group in (dummy_cols[:7], dummy_cols[7:12], dummy_cols[12:]):
    df_dummy.select(col_group).show()


### Pipeline simple


In [None]:
# 7.1
from pyspark.ml import Pipeline

In [None]:
# 7.2 Pipeline stages, executed in the order listed:
#       1) StringIndexer : categorical columns  -> indexed columns
#       2) OneHotEncoder : indexed columns      -> one-hot encoded columns

p = Pipeline(stages=[
    StringIndexer(inputCols=categorical_cols, outputCols=stg_out_cols),
    OneHotEncoder(inputCols=stg_out_cols, outputCols=ohe_out_cols),
])

In [None]:
# 7.3 Execute pipe
#     The pipeline is fitted on df and the fitted pipeline is applied
#     to the same df; the result is bound to dx.
dx = p.fit(df).transform(df)

In [None]:
# 7.4 Show transformed output, split into three groups of columns
#     (positions 0-6, 7-11, and 12 onward):

dx_cols = dx.columns
for col_group in (dx_cols[:7], dx_cols[7:12], dx_cols[12:]):
    dx.select(col_group).show()

### Cleaning up

In [None]:
# 8.0 Remove categorical columns and StringIndexer cols. Only keep OHE columns.
#     fc holds the columns of df that are NOT categorical.
#     A list comprehension is used instead of a set difference so that fc
#     keeps df's original column order. A set has arbitrary iteration order
#     (it can differ between kernel runs), which would make the layout of
#     any vector assembled from fc unpredictable.

fc = [c for c in df.columns if c not in categorical_cols]

# 8.1 Append OHE columns
fc + ohe_out_cols

In [None]:
# 8.2 Keep just these columns and none others, then display the result
dx = dx.select(fc + ohe_out_cols)
dx.show()

## VectorAssembler

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. In each row, the values of the input columns will be concatenated into a vector in the specified order.

Examples

Assume that we have a DataFrame with the columns id, hour, mobile, userFeatures, and clicked:

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeatures is a vector column that contains three user features. We want to combine hour, mobile, and userFeatures into a single feature vector called features and use it to predict clicked or not. If we set VectorAssembler’s input columns to hour, mobile, and userFeatures and output column to features, after transformation we should get the following DataFrame:

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]


In [None]:
# 9.0
from pyspark.ml.feature import VectorAssembler

In [None]:
# 9.1 Using VectorAssembler
#     Input cols are the non-categorical columns (fc) followed by the
#     OHE columns; the single output column is named 'features'.
#     NOTE(review): ohe_out_cols is derived from categorical_cols, which
#     lists y1 and y2. If those are the targets, their encodings end up
#     inside 'features' -- confirm that this is intended.

vc_demo = VectorAssembler(inputCols=fc + ohe_out_cols, outputCol='features')

# 9.2 The assembler is applied through transform() directly;
#     no fit step is involved:

df_trans_vc = vc_demo.transform(dx)
df_trans_vc.show()


In [None]:
# 9.3 The pipeline now gets a third stage. Stages run in the order listed:
#       1) StringIndexer   : categorical columns -> indexed columns
#       2) OneHotEncoder   : indexed columns     -> one-hot encoded columns
#       3) VectorAssembler : fc + OHE columns    -> single 'features' column

p = Pipeline(stages=[
    StringIndexer(inputCols=categorical_cols, outputCols=stg_out_cols),
    OneHotEncoder(inputCols=stg_out_cols, outputCols=ohe_out_cols),
    VectorAssembler(inputCols=fc + ohe_out_cols, outputCol='features'),
])

In [None]:
# 9.4 Execute pipe: fit the pipeline on df
model_pipe = p.fit(df)

# 9.4.1 Apply the fitted pipeline to df
df_trans = model_pipe.transform(df)                       

# 9.4.2 Display the result without truncating cell contents
df_trans.show(truncate = False)


In [None]:
# List the column names of the transformed DataFrame
df_trans.columns

In [None]:
# 9.4.3 Display only the assembled 'features' column
df_trans.select("features").show()

In [None]:
# 9.4.4 Reverse engineering
# Convert the 'features' Vector column back into ordinary DataFrame columns.
# Slightly complicated

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# 9.4.4.1 Vector column -> array column
re = df_trans.withColumn("myfeatures", vector_to_array("features"))
re.show()
print(re.dtypes)    # printed explicitly: a bare mid-cell expression is not displayed

# 9.4.4.2 Pull the leading array slots out as columns.
#         The assembler's inputCols were fc + ohe_out_cols, so the first slots
#         of the vector hold the fc columns (one slot per numeric column).
#         They are therefore named after fc; the categorical x1/x2 only occur
#         later in the vector, in one-hot-encoded form.
dt = re.select(
    *[col("myfeatures")[pos].alias(name) for pos, name in enumerate(fc[:2])]
)
dt.show()


### Standardization
Refer [here](https://spark.apache.org/docs/latest/ml-features#standardscaler)

In [None]:
# 10.0
from pyspark.ml.feature import StandardScaler

In [None]:
# 10.1 Create the scaler: it reads the 'features' column and writes
#      its result to a new 'scaledFeatures' column
normalizer = StandardScaler(inputCol="features", outputCol="scaledFeatures")

In [None]:
# 10.2 Normalize 'features' column
#      The scaler is fitted on df_trans; the fitted model then
#      transforms the same df_trans into df_ss.
ss_model = normalizer.fit(df_trans)
df_ss = ss_model.transform(df_trans)

In [None]:
# 10.3 Our data columns after scaling
df_ss.columns

In [None]:
# 10.4 Compare existing and transformed features:
#      first row of each column, shown untruncated

for vec_col in ('features', 'scaledFeatures'):
    df_ss.select(vec_col).show(1, truncate=False)

### Complete pipe

In [None]:
# 11.0 The complete pipeline. Stages run in the order listed:
#       1) StringIndexer   : categorical columns -> indexed columns
#       2) OneHotEncoder   : indexed columns     -> one-hot encoded columns
#       3) VectorAssembler : fc + OHE columns    -> single 'features' column
#       4) StandardScaler  : 'features'          -> 'scaledFeatures'

p = Pipeline(stages=[
    StringIndexer(inputCols=categorical_cols, outputCols=stg_out_cols),
    OneHotEncoder(inputCols=stg_out_cols, outputCols=ohe_out_cols),
    VectorAssembler(inputCols=fc + ohe_out_cols, outputCol='features'),
    StandardScaler(inputCol='features', outputCol='scaledFeatures'),
])

In [None]:
# 11.1 Execute pipe: fit the complete pipeline on df
model_pipe = p.fit(df)

# 11.2 Apply the fitted pipeline to df
df_trans = model_pipe.transform(df)           

In [None]:
# 11.3 Show transformed output, split into four groups of columns
#      (positions 0-6, 7-11, 12-16, and 17 onward):

trans_cols = df_trans.columns
for col_group in (trans_cols[:7], trans_cols[7:12],
                  trans_cols[12:17], trans_cols[17:]):
    df_trans.select(col_group).show()


In [None]:
################ I am done ##################

In [None]:
# 12 Reverse engineering
#    Convert the 'features' Vector column back into ordinary DataFrame columns.
#    Slightly complicated

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# 12.1 Vector column -> array column
re = df_trans.withColumn("myfeatures", vector_to_array("features"))
re.show()
print(re.dtypes)    # printed explicitly: a bare mid-cell expression is not displayed

# 12.2 Pull the leading array slots out as columns.
#      The assembler's inputCols were fc + ohe_out_cols, so the first slots
#      of the vector hold the fc columns (one slot per numeric column).
#      They are therefore named after fc; the categorical x1/x2 only occur
#      later in the vector, in one-hot-encoded form.
dt = re.select(
    *[col("myfeatures")[pos].alias(name) for pos, name in enumerate(fc[:2])]
)
dt.show()


In [None]:

#################