# Essentials of Feature Engineering in PySpark - Part I

Data preprocessing in Spark

The most commonly used data preprocessing techniques in Spark are as follows:

1) VectorAssembler

2) Bucketing

3) Scaling and normalization

   a) StandardScaler

   b) MinMaxScaler

   c) MaxAbsScaler

   d) ElementwiseProduct

   e) Normalizer

4) Working with categorical features

   a) StringIndexer

   b) Converting indexed values back to text

   c) Indexing in vectors

   d) One-hot encoding

5) Text data transformers

   a) Tokenizing text

   b) Removing common words

   c) Creating word combinations

   d) Converting words into numerical representations

   e) TF-IDF

   f) Word2Vec

6) Feature manipulation

7) PCA

In [6]:
# Initializing a Spark session ("local" runs Spark on this machine with a single worker thread)
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Feature Engineering").getOrCreate()

Downloading the datasets
For the purpose of this demonstration we will be using four different datasets:

1) retail-data/by-day

2) simple-ml-integers

3) simple-ml

4) simple-ml-scaling

The datasets can be downloaded from this link https://github.com/databricks/Spark-The-Definitive-Guide

In [7]:
# An annotated version of the read performed in the next cell (kept commented out).
# Wrapping the chain in parentheses lets every step carry its own comment; a comment placed
# after a backslash line continuation ("\ # ...") is a SyntaxError in Python.
# sales = (spark.read.format("csv")                  # the format of the file(s) we intend to read
#          .option("header", "true")                 # treat the first row as the header of the DataFrame
#          .option("inferSchema", "true")            # let Spark infer the schema (column types)
#          .load("/data/retail-data/by-day/*.csv")   # the path to our CSV file(s)
#          .coalesce(5)                              # reduce the DataFrame to 5 partitions
#          .where("Description IS NOT NULL"))        # keep only rows whose Description is not null

In [8]:
# Let us begin by reading the "retail-data/by-day" CSV files and saving them into a Spark dataframe named 'sales'
sales = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r"data/retail-data/by-day/*.csv").coalesce(5).where("Description IS NOT NULL")

In [9]:
# Adjust the paths below to wherever you downloaded the book's data folder (here: ./data, as in the previous cell)
# Let us read the parquet files in "simple-ml-integers" and make a Spark dataframe named 'fakeIntDF'
fakeIntDF=spark.read.parquet("data/simple-ml-integers")
# Let us read the JSON files in "simple-ml" and make a Spark dataframe named 'simpleDF'
simpleDF=spark.read.json("data/simple-ml")
# Let us read the parquet files in "simple-ml-scaling" and make a Spark dataframe named 'scaleDF'
scaleDF=spark.read.parquet("data/simple-ml-scaling")

In [10]:
sales.cache()
sales.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [11]:
type(sales)

pyspark.sql.dataframe.DataFrame

# VectorAssembler

The VectorAssembler concatenates a set of input columns into a single vector column, which can then be passed to an estimator (ML algorithm); Spark ML estimators generally expect all of their features in one vector column. To demonstrate the VectorAssembler we will use the 'fakeIntDF' DataFrame created in the previous steps.

In [12]:
# Let us see what kind of data we have in 'fakeIntDF'

In [13]:
fakeIntDF.cache()
fakeIntDF.show()

+----+----+----+
|int1|int2|int3|
+----+----+----+
|   7|   8|   9|
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [14]:
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
# Once the VectorAssembler is imported, we create an object of it. Here I will name the object assembler
# The output above shows that 'fakeIntDF' has three features: int1, int2 and int3. The assembler combines them into a single column named features
assembler = VectorAssembler(inputCols=["int1", "int2", "int3"],outputCol="features")
# Now let us use the transform method to transform our dataset
assembler.transform(fakeIntDF).show()

+----+----+----+-------------+
|int1|int2|int3|     features|
+----+----+----+-------------+
|   7|   8|   9|[7.0,8.0,9.0]|
|   1|   2|   3|[1.0,2.0,3.0]|
|   4|   5|   6|[4.0,5.0,6.0]|
+----+----+----+-------------+
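To make the transformation above explicit, here is a minimal plain-Python sketch of what the assembler computes for each row. It is an illustration, not Spark's implementation: rows are modelled as dicts and vectors as lists, and the helper name `assemble` is hypothetical. As with VectorAssembler, numeric and boolean values become doubles, and vector-valued columns are flattened into the result in the order given.

```python
def assemble(row, input_cols):
    """Concatenate the input columns of one row into a single flat list of floats."""
    out = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # vector-valued columns are flattened into the output
            out.extend(float(v) for v in value)
        else:
            # numeric and boolean columns contribute one float each
            out.append(float(value))
    return out

# The three rows of fakeIntDF shown above, modelled as plain dicts
fake_int_rows = [
    {"int1": 7, "int2": 8, "int3": 9},
    {"int1": 1, "int2": 2, "int3": 3},
    {"int1": 4, "int2": 5, "int3": 6},
]
features = [assemble(r, ["int1", "int2", "int3"]) for r in fake_int_rows]
print(features)  # [[7.0, 8.0, 9.0], [1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
```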



# Bucketing
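Bucketing is the most straightforward approach for converting continuous variables into categorical ones. Let us understand it with an example straight away.

In PySpark, bucketing can be easily accomplished using the Bucketizer class.

First, we define the bucket borders (called splits). A list of n + 1 borders defines n buckets: a value falls into bucket i if it lies in [borders[i], borders[i+1]), and the last bucket also includes its upper border. For example, the list
bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
defines four buckets. Values outside the outermost borders cause an error, which is why the example below uses -inf and +inf as its outer borders.

Next, let us create an object of the Bucketizer class and apply its transform method to our target DataFrame "dataFrame".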

In [15]:
# Let us create a sample dataframe for demo purpose

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

In [16]:
from pyspark.ml.feature import Bucketizer
bucketBorders=[-float("inf"), -0.5, 0.0, 0.5, float("inf")]

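# No output column is set, so Spark generates a name of the form <uid>__output (see the header below)
bucketer=Bucketizer().setSplits(bucketBorders).setInputCol("features")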
bucketer.transform(dataFrame).show()

+--------+---------------------------------------+
|features|Bucketizer_433da22e7cb656b7f278__output|
+--------+---------------------------------------+
|  -999.9|                                    0.0|
|    -0.5|                                    1.0|
|    -0.3|                                    1.0|
|     0.0|                                    2.0|
|     0.2|                                    2.0|
|   999.9|                                    3.0|
+--------+---------------------------------------+
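The bucket numbers above follow the [lower, upper) rule. A minimal plain-Python sketch of that rule, built on the standard `bisect` module, reproduces them. The helper `bucketize` is hypothetical, NaN handling (Bucketizer's `handleInvalid` option) is deliberately left out, and values outside the borders raise an error, as they do in Spark.

```python
import bisect
import math

def bucketize(value, splits):
    """Return the bucket index of value: bucket i covers [splits[i], splits[i+1])."""
    if math.isnan(value):
        raise ValueError("NaN values are not handled in this sketch")
    if value == splits[-1]:
        # the last bucket also includes its upper border
        return float(len(splits) - 2)
    i = bisect.bisect_right(splits, value) - 1
    if i < 0 or i >= len(splits) - 1:
        raise ValueError(f"{value} lies outside the splits {splits}")
    return float(i)

bucket_borders = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [-999.9, -0.5, -0.3, 0.0, 0.2, 999.9]
result = [bucketize(x, bucket_borders) for x in data]
print(result)  # [0.0, 1.0, 1.0, 2.0, 2.0, 3.0]
```

With the finite borders [-1.0, 5.0, 10.0, 250.0, 600.0], the value 600.0 lands in the last bucket (3.0), while 700.0 raises an error.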



# Scaling and normalization

Scaling and normalization is another common task that we come across while handling continuous variables. It is not always imperative to scale and normalize the features. However, it is highly recommended before applying an ML algorithm that is sensitive to the magnitude of its inputs (for example, distance-based or gradient-based methods); otherwise features with large ranges can dominate features with small ranges.

Spark ML provides the "StandardScaler" class for scaling features. By default (withStd=True, withMean=False) it divides each feature by its standard deviation but does not subtract the mean; set withMean=True to also center the features on zero.

In [17]:
scaleDF.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  1|[3.0,10.1,3.0]|
+---+--------------+



In [18]:
from pyspark.ml.feature import StandardScaler
# Let us create an object of StandardScaler class
Scalerizer=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
Scalerizer.fit(scaleDF).transform(scaleDF).show()

+---+--------------+--------------------+
| id|      features|     Scaled_features|
+---+--------------+--------------------+
|  0|[1.0,0.1,-1.0]|[1.19522860933439...|
|  1| [2.0,1.1,1.0]|[2.39045721866878...|
|  0|[1.0,0.1,-1.0]|[1.19522860933439...|
|  1| [2.0,1.1,1.0]|[2.39045721866878...|
|  1|[3.0,10.1,3.0]|[3.58568582800318...|
+---+--------------+--------------------+
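The numbers above can be checked by hand. With its defaults, StandardScaler divides each feature by its sample standard deviation (the n - 1 formula) without subtracting the mean: the first feature takes the values 1, 2, 1, 2, 3, whose sample standard deviation is sqrt(0.7) ≈ 0.8367, and 1.0 / 0.8367 ≈ 1.1952. The plain-Python sketch below reproduces this; the helper `standard_scale` and its keyword arguments are hypothetical stand-ins for the withMean/withStd parameters, not Spark's implementation.

```python
import statistics

# Rows of the 'features' column of scaleDF, copied from the output above
rows = [
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [3.0, 10.1, 3.0],
]

def standard_scale(rows, with_mean=False, with_std=True):
    """Scale each feature; the defaults mirror StandardScaler's withMean=False, withStd=True."""
    columns = list(zip(*rows))
    means = [statistics.mean(column) for column in columns]
    # statistics.stdev is the sample standard deviation (divides by n - 1)
    stds = [statistics.stdev(column) for column in columns]
    scaled = []
    for row in rows:
        out = []
        for x, mean, std in zip(row, means, stds):
            if with_mean:
                x = x - mean  # optional centering
            if with_std:
                # a constant feature (std 0) is mapped to 0.0 in this sketch
                x = x / std if std != 0 else 0.0
            out.append(x)
        scaled.append(out)
    return scaled

scaled = standard_scale(rows)
print(scaled[0][0], scaled[4][0])  # approx. 1.1952 and 3.5857, as in the output above
```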



# MinMaxScaler

The StandardScaler rescales each feature by its standard deviation. Sometimes, however, we need to scale values into a given range (i.e. between a min and a max). For such tasks Spark ML provides the MinMaxScaler.

The MinMaxScaler is used just like the StandardScaler (fit, then transform), but it rescales each feature linearly so that the feature's smallest value in the data maps to min and its largest maps to max. We can set min and max ourselves; they default to 0 and 1.

For the sake of illustration, let us scale the features into the range 5 to 10.

In [19]:
from pyspark.ml.feature import MinMaxScaler
# Let us create an object of MinMaxScaler class
MinMaxScalerizer=MinMaxScaler().setMin(5).setMax(10).setInputCol("features").setOutputCol("MinMax_Scaled_features")
MinMaxScalerizer.fit(scaleDF).transform(scaleDF).show()

+---+--------------+----------------------+
| id|      features|MinMax_Scaled_features|
+---+--------------+----------------------+
|  0|[1.0,0.1,-1.0]|         [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|         [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|         [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|         [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|      [10.0,10.0,10.0]|
+---+--------------+----------------------+
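Per feature, MinMaxScaler computes (x - E_min) / (E_max - E_min) * (max - min) + min, where E_min and E_max are the smallest and largest values of that feature in the data; a constant feature is mapped to 0.5 * (max + min). For the first feature (values 1 to 3), the value 2.0 becomes 0.5 * 5 + 5 = 7.5, as in the output above. A plain-Python sketch of the formula, using a hypothetical helper `min_max_scale`:

```python
# Rows of the 'features' column of scaleDF, copied from the output above
rows = [
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [3.0, 10.1, 3.0],
]

def min_max_scale(rows, new_min=0.0, new_max=1.0):
    """Rescale each feature linearly so that its data range maps onto [new_min, new_max]."""
    columns = list(zip(*rows))
    e_mins = [min(column) for column in columns]
    e_maxs = [max(column) for column in columns]
    scaled = []
    for row in rows:
        out = []
        for x, e_min, e_max in zip(row, e_mins, e_maxs):
            if e_max == e_min:
                out.append(0.5 * (new_max + new_min))  # constant feature
            else:
                out.append((x - e_min) / (e_max - e_min) * (new_max - new_min) + new_min)
        scaled.append(out)
    return scaled

scaled = min_max_scale(rows, new_min=5.0, new_max=10.0)
print(scaled[1])  # [7.5, 5.5, 7.5]
```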



# MaxAbsScaler

Sometimes we need to scale features into the range [-1, 1]. The MaxAbsScaler does exactly this by dividing each feature by its maximum absolute value. Because it does not shift or center the data, it preserves sparsity.

In [20]:
from pyspark.ml.feature import MaxAbsScaler
# Let us create an object of MaxAbsScaler class
MaxAbsScalerizer=MaxAbsScaler().setInputCol("features").setOutputCol("MaxAbs_Scaled_features")
MaxAbsScalerizer.fit(scaleDF).transform(scaleDF).show()

+---+--------------+----------------------+
| id|      features|MaxAbs_Scaled_features|
+---+--------------+----------------------+
|  0|[1.0,0.1,-1.0]|  [0.33333333333333...|
|  1| [2.0,1.1,1.0]|  [0.66666666666666...|
|  0|[1.0,0.1,-1.0]|  [0.33333333333333...|
|  1| [2.0,1.1,1.0]|  [0.66666666666666...|
|  1|[3.0,10.1,3.0]|         [1.0,1.0,1.0]|
+---+--------------+----------------------+
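The maximum absolute values of the three features are 3.0, 10.1 and 3.0, which is why the last row becomes [1.0, 1.0, 1.0] and the first value of the first row becomes 1.0 / 3.0. A minimal plain-Python sketch of the division, with a hypothetical helper `max_abs_scale` (treating an all-zero feature by leaving it unchanged is a choice of this sketch):

```python
# Rows of the 'features' column of scaleDF, copied from the output above
rows = [
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [3.0, 10.1, 3.0],
]

def max_abs_scale(rows):
    """Divide every feature by its largest absolute value."""
    columns = list(zip(*rows))
    max_abs = [max(abs(x) for x in column) for column in columns]
    # an all-zero feature is left unchanged here (divided by 1.0)
    divisors = [m if m != 0 else 1.0 for m in max_abs]
    return [[x / d for x, d in zip(row, divisors)] for row in rows]

scaled = max_abs_scale(rows)
print(scaled[4])  # [1.0, 1.0, 1.0]
```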



# ElementwiseProduct

What differentiates ElementwiseProduct from the scalers above is that it learns nothing from the data: every feature is multiplied by a fixed factor that we supply as a scaling vector. It is therefore a plain transformer with no fit step.

The code snippet below multiplies feature #1 by 10, feature #2 by 0.1 and feature #3 by -1.

For example, the features [10, 20, 30] scaled by [10, 0.1, -1] become [100, 2.0, -30].
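The same arithmetic in plain Python is just a position-by-position multiplication (a Hadamard product). The helper `elementwise_product` below is hypothetical; the second call also shows why Spark prints 0.010000000... for 0.1 * 0.1: the result is ordinary floating-point rounding, not an error in the transformer.

```python
def elementwise_product(vector, scaling_vec):
    """Multiply each feature by the factor at the same position."""
    if len(vector) != len(scaling_vec):
        raise ValueError("vector and scaling vector must have the same length")
    return [x * w for x, w in zip(vector, scaling_vec)]

scaling_vec = [10.0, 0.1, -1.0]
print(elementwise_product([10.0, 20.0, 30.0], scaling_vec))  # [100.0, 2.0, -30.0]
print(elementwise_product([1.0, 0.1, -1.0], scaling_vec))    # [10.0, 0.010000000000000002, 1.0]
```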

In [22]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Let us define a scaling vector 

ScalebyVector=Vectors.dense([10,0.1,-1])

# Let us create an object of the ElementwiseProduct class (no fit step is needed)
ScalingUp=ElementwiseProduct().setScalingVec(ScalebyVector).setInputCol("features").setOutputCol("ElementWiseProduct")
# Let us transform
ScalingUp.transform(scaleDF).show()

+---+--------------+--------------------+
| id|      features|  ElementWiseProduct|
+---+--------------+--------------------+
|  0|[1.0,0.1,-1.0]|[10.0,0.010000000...|
|  1| [2.0,1.1,1.0]|[20.0,0.110000000...|
|  0|[1.0,0.1,-1.0]|[10.0,0.010000000...|
|  1| [2.0,1.1,1.0]|[20.0,0.110000000...|
|  1|[3.0,10.1,3.0]|    [30.0,1.01,-3.0]|
+---+--------------+--------------------+



# Normalizer

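The Normalizer rescales each row's feature vector to unit length: every vector is divided by its own p-norm. Unlike the scalers above, it works on each row independently and needs no fit step. The Normalizer takes a parameter "p" from the user which selects the norm (p = 2 by default).

For example, p = 1 gives the Manhattan (L1) norm, the sum of absolute values, and p = 2 gives the Euclidean (L2) norm, the square root of the sum of squares. For the first row [1.0, 0.1, -1.0] the L1 norm is 2.1, so its first element becomes 1.0 / 2.1 ≈ 0.476, as the output below shows.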

In [23]:
from pyspark.ml.feature import Normalizer
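# Two Normalizer objects: p=1 (Manhattan / L1 norm) and p=2 (Euclidean / L2 norm)
# Despite the column names, the outputs are the normalized vectors, not distances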
ManhattanDistance=Normalizer().setP(1).setInputCol("features").setOutputCol("Manhattan Distance")
EuclideanDistance=Normalizer().setP(2).setInputCol("features").setOutputCol("Euclidean Distance")
# Let us transform
ManhattanDistance.transform(scaleDF).show()

+---+--------------+--------------------+
| id|      features|  Manhattan Distance|
+---+--------------+--------------------+
|  0|[1.0,0.1,-1.0]|[0.47619047619047...|
|  1| [2.0,1.1,1.0]|[0.48780487804878...|
|  0|[1.0,0.1,-1.0]|[0.47619047619047...|
|  1| [2.0,1.1,1.0]|[0.48780487804878...|
|  1|[3.0,10.1,3.0]|[0.18633540372670...|
+---+--------------+--------------------+



In [24]:
EuclideanDistance.transform(scaleDF).show()

+---+--------------+--------------------+
| id|      features|  Euclidean Distance|
+---+--------------+--------------------+
|  0|[1.0,0.1,-1.0]|[0.70534561585859...|
|  1| [2.0,1.1,1.0]|[0.80257235390512...|
|  0|[1.0,0.1,-1.0]|[0.70534561585859...|
|  1| [2.0,1.1,1.0]|[0.80257235390512...|
|  1|[3.0,10.1,3.0]|[0.27384986857909...|
+---+--------------+--------------------+
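A minimal plain-Python sketch of p-norm normalization reproduces the first entries of both outputs above (about 0.47619 for p = 1 and 0.70535 for p = 2). The helper `normalize` is hypothetical; it also accepts p = math.inf (the largest absolute value), and returning a zero vector unchanged is a choice of this sketch.

```python
import math

def normalize(vector, p=2.0):
    """Divide a vector by its p-norm; p=math.inf uses the largest absolute value."""
    if p == math.inf:
        norm = max(abs(x) for x in vector)
    else:
        norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0.0:
        return list(vector)  # a zero vector is returned unchanged here
    return [x / norm for x in vector]

row = [1.0, 0.1, -1.0]    # first row of scaleDF
l1 = normalize(row, p=1)  # Manhattan (L1) norm is 2.1
l2 = normalize(row, p=2)  # Euclidean (L2) norm is sqrt(2.01)
print(l1[0], l2[0])
```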



# Working with Categorical features

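Most ML algorithms require categorical features to be converted into numerical ones.

Spark's StringIndexer maps each distinct string to a numerical index. By default (stringOrderType="frequencyDesc") the most frequent string gets index 0.0, the next most frequent 1.0, and so on.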
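To see how those indices are chosen, here is a minimal plain-Python sketch of the default ordering, applied to the 'lab' values of the 20 rows of simpleDF displayed in the next cell (only those rows are visible, so this merely illustrates the rule). The helper `fit_string_indexer` is hypothetical, and breaking ties alphabetically follows what the documentation of recent Spark versions describes; older versions may order ties differently. Other orderings can be chosen in Spark through the `stringOrderType` parameter (`frequencyDesc`, `frequencyAsc`, `alphabetDesc`, `alphabetAsc`).

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # sort by descending count; equal counts fall back to alphabetical order
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

# 'lab' values of the 20 rows of simpleDF shown in the next cell
labs = ["good", "bad", "bad", "good", "good", "bad", "good", "bad", "bad", "bad",
        "good", "good", "bad", "bad", "good", "good", "bad", "good", "bad", "bad"]
mapping = fit_string_indexer(labs)
print(mapping)  # {'bad': 0.0, 'good': 1.0}
```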


In [25]:
simpleDF.show()

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red|good|    45| 38.97187133755819|
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



Let us apply the StringIndexer to the categorical column "lab" of the "simpleDF" DataFrame.

In [26]:
from pyspark.ml.feature import StringIndexer
# Let us create an object of the class StringIndexer
lblindexer=StringIndexer().setInputCol("lab").setOutputCol("LabelIndexed")
# Let us transform
idxRes=lblindexer.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+------------+
|color| lab|value1|            value2|LabelIndexed|
+-----+----+------+------------------+------------+
|green|good|     1|14.386294994851129|         1.0|
| blue| bad|     8|14.386294994851129|         0.0|
| blue| bad|    12|14.386294994851129|         0.0|
|green|good|    15| 38.97187133755819|         1.0|
|green|good|    12|14.386294994851129|         1.0|
|green| bad|    16|14.386294994851129|         0.0|
|  red|good|    35|14.386294994851129|         1.0|
|  red| bad|     1| 38.97187133755819|         0.0|
|  red| bad|     2|14.386294994851129|         0.0|
|  red| bad|    16|14.386294994851129|         0.0|
|  red|good|    45| 38.97187133755819|         1.0|
|green|good|     1|14.386294994851129|         1.0|
| blue| bad|     8|14.386294994851129|         0.0|
| blue| bad|    12|14.386294994851129|         0.0|
|green|good|    15| 38.97187133755819|         1.0|
|green|good|    12|14.386294994851129|         1.0|
|green| bad|

# IndexToString

Sometimes we come across situations where it is necessary to convert the indexed values back to text. For this, Spark ML provides the IndexToString class. It needs no fit step: by default it reads the labels from the metadata that StringIndexer attaches to the indexed column. To demonstrate IndexToString, let us use the "LabelIndexed" column of the "idxRes" DataFrame created in the previous code snippet.

The LabelIndexed column maps 1.0 --> good and 0.0 --> bad. Now let us try to reverse this.
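Conceptually, IndexToString only needs the ordered list of labels: the label at position i belongs to index i. A fitted StringIndexerModel exposes this list as its `labels` attribute, and IndexToString also accepts an explicit list through `setLabels`. The plain-Python sketch below, with a hypothetical helper `index_to_string`, shows the lookup for the mapping just described.

```python
def index_to_string(indices, labels):
    """Map numeric indices back to text; labels[i] is the label for index i."""
    return [labels[int(i)] for i in indices]

labels = ["bad", "good"]  # index 0.0 -> "bad", index 1.0 -> "good"
print(index_to_string([1.0, 0.0, 0.0, 1.0], labels))  # ['good', 'bad', 'bad', 'good']
```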

In [27]:
from pyspark.ml.feature import IndexToString
LabelReverse=IndexToString().setInputCol("LabelIndexed").setOutputCol("ReverseIndex")
LabelReverse.transform(idxRes).show()

+-----+----+------+------------------+------------+------------+
|color| lab|value1|            value2|LabelIndexed|ReverseIndex|
+-----+----+------+------------------+------------+------------+
|green|good|     1|14.386294994851129|         1.0|        good|
| blue| bad|     8|14.386294994851129|         0.0|         bad|
| blue| bad|    12|14.386294994851129|         0.0|         bad|
|green|good|    15| 38.97187133755819|         1.0|        good|
|green|good|    12|14.386294994851129|         1.0|        good|
|green| bad|    16|14.386294994851129|         0.0|         bad|
|  red|good|    35|14.386294994851129|         1.0|        good|
|  red| bad|     1| 38.97187133755819|         0.0|         bad|
|  red| bad|     2|14.386294994851129|         0.0|         bad|
|  red| bad|    16|14.386294994851129|         0.0|         bad|
|  red|good|    45| 38.97187133755819|         1.0|        good|
|green|good|     1|14.386294994851129|         1.0|        good|
| blue| bad|     8|14.386

# Indexing within Vectors

Spark offers yet another class named "VectorIndexer". It inspects a column of feature vectors, decides which features are categorical (those with at most maxCategories distinct values) and replaces their values with zero-based category indices. Features with more distinct values are treated as continuous and left unchanged.

For the purpose of illustration let us first create a new DataFrame with features in the form of Vectors.

In [28]:
from pyspark.ml.linalg import Vectors
dataln=spark.createDataFrame([(Vectors.dense(1,2,3),1),(Vectors.dense(2,5,6),2),(Vectors.dense(1,8,9),3)]).toDF("features","labels")
dataln.show()

+-------------+------+
|     features|labels|
+-------------+------+
|[1.0,2.0,3.0]|     1|
|[2.0,5.0,6.0]|     2|
|[1.0,8.0,9.0]|     3|
+-------------+------+



In [29]:
from pyspark.ml.feature import VectorIndexer
VecInd=VectorIndexer().setInputCol("features").setMaxCategories(2).setOutputCol("indexed")
VecInd.fit(dataln).transform(dataln).show()

+-------------+------+-------------+
|     features|labels|      indexed|
+-------------+------+-------------+
|[1.0,2.0,3.0]|     1|[0.0,2.0,3.0]|
|[2.0,5.0,6.0]|     2|[1.0,5.0,6.0]|
|[1.0,8.0,9.0]|     3|[0.0,8.0,9.0]|
+-------------+------+-------------+
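The plain-Python sketch below mimics this on the three vectors above with maxCategories = 2: only the first feature has at most two distinct values (1.0 and 2.0), so only it is re-indexed. The helper names are hypothetical, and the ordering rule (distinct values sorted ascending, with 0.0 kept at index 0 when present so that sparse vectors stay sparse) is our assumption about Spark's behaviour, not something shown in this post.

```python
def fit_vector_indexer(vectors, max_categories):
    """Return {feature_index: {value: category_index}} for the categorical features."""
    category_maps = {}
    for j, column in enumerate(zip(*vectors)):
        distinct = sorted(set(column))
        if len(distinct) > max_categories:
            continue  # too many distinct values: treated as continuous
        if 0.0 in distinct:
            # assumption: 0.0 is kept at index 0 when present
            distinct.remove(0.0)
            distinct.insert(0, 0.0)
        category_maps[j] = {value: float(k) for k, value in enumerate(distinct)}
    return category_maps

def transform_vector(vector, category_maps):
    """Replace categorical values by their index; unseen values raise KeyError here."""
    return [category_maps[j][x] if j in category_maps else x for j, x in enumerate(vector)]

vectors = [[1.0, 2.0, 3.0], [2.0, 5.0, 6.0], [1.0, 8.0, 9.0]]
maps = fit_vector_indexer(vectors, max_categories=2)
indexed = [transform_vector(v, maps) for v in vectors]
print(indexed)  # [[0.0, 2.0, 3.0], [1.0, 5.0, 6.0], [0.0, 8.0, 9.0]]
```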



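# One-hot encoding

One-hot encoding is one of the most common transformations performed during preprocessing. It turns a category index into a binary vector with a single 1.0, so that an algorithm does not read an artificial order into the indices (for example, that an index of 2.0 is "greater" than 0.0). Let us look at an example straight away.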
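Before running the example, a minimal plain-Python sketch of the idea may help. Spark's OneHotEncoder drops the last category by default (dropLast=True), because the full set of indicator columns always sums to one and is therefore linearly dependent; the last category is then encoded as an all-zero vector. Spark prints these vectors in sparse form, e.g. (2,[1],[1.0]) is a vector of size 2 whose only non-zero entry is 1.0 at position 1. The helper `one_hot` is hypothetical and returns dense lists; the color indices used (red = 0.0, green = 1.0, blue = 2.0) are those produced by the StringIndexer in the example that follows.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return the one-hot vector for a category index as a dense list."""
    if not 0 <= int(index) < num_categories:
        raise ValueError(f"index {index} is not a valid category")
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0  # the dropped last category stays all zeros
    return vector

for color, idx in [("red", 0.0), ("green", 1.0), ("blue", 2.0)]:
    print(color, one_hot(idx, 3))
# red [1.0, 0.0]
# green [0.0, 1.0]
# blue [0.0, 0.0]
```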

In [30]:
simpleDF.show()

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red|good|    45| 38.97187133755819|
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



In [36]:
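# Let us encode the "color" feature in the "simpleDF"
from pyspark.ml.feature import StringIndexer,OneHotEncoder
# Step 1: map each color string to a numerical index
SI=StringIndexer().setInputCol('color').setOutputCol('StrIndexed')
ColorIdx=SI.fit(simpleDF).transform(simpleDF)
# Step 2: one-hot encode the index. Since Spark 3.0 OneHotEncoder is an Estimator and must be fit first;
# in Spark 2.x it was a plain Transformer and ohe.transform(ColorIdx) worked directly
ohe=OneHotEncoder().setInputCol('StrIndexed').setOutputCol("oheIndexed")
ohe.fit(ColorIdx).transform(ColorIdx).show()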

+-----+----+------+------------------+----------+-------------+
|color| lab|value1|            value2|StrIndexed|   oheIndexed|
+-----+----+------+------------------+----------+-------------+
|green|good|     1|14.386294994851129|       1.0|(2,[1],[1.0])|
| blue| bad|     8|14.386294994851129|       2.0|    (2,[],[])|
| blue| bad|    12|14.386294994851129|       2.0|    (2,[],[])|
|green|good|    15| 38.97187133755819|       1.0|(2,[1],[1.0])|
|green|good|    12|14.386294994851129|       1.0|(2,[1],[1.0])|
|green| bad|    16|14.386294994851129|       1.0|(2,[1],[1.0])|
|  red|good|    35|14.386294994851129|       0.0|(2,[0],[1.0])|
|  red| bad|     1| 38.97187133755819|       0.0|(2,[0],[1.0])|
|  red| bad|     2|14.386294994851129|       0.0|(2,[0],[1.0])|
|  red| bad|    16|14.386294994851129|       0.0|(2,[0],[1.0])|
|  red|good|    45| 38.97187133755819|       0.0|(2,[0],[1.0])|
|green|good|     1|14.386294994851129|       1.0|(2,[1],[1.0])|
| blue| bad|     8|14.386294994851129|  

# Conclusion

In this humble blog post I have tried to cover some basic and widely used data preprocessing transformations offered by Spark ML, and I have demonstrated each of them with an example. However, Spark offers plenty of other tools to aid the feature engineering task; I will try to cover some of them in my next blog post.

# Bibliography
Chambers, B., & Zaharia, M. (2018). *Spark: The Definitive Guide*. O'Reilly Media, Inc.