### Preparing Data for Apache Spark ML Model

### Introduction
Apache Spark is an open-source engine for large-scale data processing. PySpark lets you write Spark applications using Python APIs. The Spark ecosystem also includes libraries such as PySpark SQL for processing structured and semi-structured data, Streaming, MLlib for machine learning, and GraphX for graph computation.


Before we feed data into machine learning models in PySpark, we need to assemble individual variables into a single feature vector. This is done by using Spark's VectorAssembler. VectorAssembler takes a list of columns as input and combines them into a single vector column. It is useful for combining various raw as well as generated/transformed features into a single feature vector which then can be used for modeling. 

PySpark models consume data differently from other libraries such as scikit-learn. It is therefore important to know how to process data in the Spark framework and prepare it for modeling, which is the main purpose of this article.

While working with data, we may encounter both categorical and numerical variables. This article summarizes the steps required to handle both kinds and convert them into a single feature vector, which can then be used for modeling.

This post specifically covers:

    > Creating a sample Spark dataframe
    
    > Using VectorAssembler when only numerical features are available
    
    > Using VectorAssembler when both numerical and categorical features are available

#### Process flow 
The overall data processing flow in Spark starts with data ingestion, followed by converting categorical variables to numeric form and vectorizing them. Once both categorical and numerical variables are assembled into a single vector, the data can be split into train/test/validation sets and used for modeling.

### Import libraries

Let's import the required libraries, start a Spark session, and name the application VectAssembler.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('VectAssembler').getOrCreate()

#### Scenario 1:  Vector Assembler when only numerical features are available
Below we create a spark dataframe that consists of 3 numerical variables. 

In [2]:
customerdata = spark.createDataFrame(
    [
        (1, 29.99, 5),
        (2, 31.99, 3),
        (3, 24.99, 1),
        (4, 21.99, 3),
    ],["cust_id", 'monthly_payment', 'tenure_yrs']
)

In [3]:
customerdata.show(3)

+-------+---------------+----------+
|cust_id|monthly_payment|tenure_yrs|
+-------+---------------+----------+
|      1|          29.99|         5|
|      2|          31.99|         3|
|      3|          24.99|         1|
+-------+---------------+----------+
only showing top 3 rows



The VectorAssembler combines the columns listed in inputCols (here monthly_payment and tenure_yrs) into one column. Let's call it 'features' as shown below. This is an important step because the features column becomes the input for the machine learning model.

In [4]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["monthly_payment", "tenure_yrs"],
    outputCol="features")

outdata = assembler.transform(customerdata)

In [5]:
outdata.show(10, truncate=False)

+-------+---------------+----------+-----------+
|cust_id|monthly_payment|tenure_yrs|features   |
+-------+---------------+----------+-----------+
|1      |29.99          |5         |[29.99,5.0]|
|2      |31.99          |3         |[31.99,3.0]|
|3      |24.99          |1         |[24.99,1.0]|
|4      |21.99          |3         |[21.99,3.0]|
+-------+---------------+----------+-----------+
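
To make the assembling step concrete, here is a minimal plain-Python sketch of the idea. It is not Spark's implementation: `assemble_row` is a hypothetical helper, and the assumption is simply that each selected column is cast to float and appended in order, with list-valued columns flattened.

```python
def assemble_row(row, input_cols):
    """Concatenate the selected columns of one row into a flat list of floats.

    Hypothetical helper for illustration only; Spark's VectorAssembler
    works on whole DataFrame columns and returns ML vectors.
    """
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector-like columns are flattened element by element.
            features.extend(float(v) for v in value)
        else:
            # Scalar numeric columns are cast to float.
            features.append(float(value))
    return features


row = {"cust_id": 1, "monthly_payment": 29.99, "tenure_yrs": 5}
print(assemble_row(row, ["monthly_payment", "tenure_yrs"]))  # [29.99, 5.0]
```

The result matches the features value of the first row in the table above.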



#### Scenario 2: Vector Assembler when both numerical and categorical features are available
Below I have created a Spark dataframe consisting of 3 numerical variables (cust_id, monthly_payment and tenure_yrs) and 1 categorical variable (state). The state variable takes 3 values: MD, VA and WA.

In [6]:
customerdata1 = spark.createDataFrame(
    [
        (1, 29.99, 5, 'MD'),
        (2, 31.99, 3, 'MD'),
        (3, 24.99, 1, 'VA'),
        (4, 21.99, 3, 'WA'),
        (5, 22.00, 3, 'WA'),
        (6, 25.00, 7, 'WA'),
    ],
    ["cust_id", 'monthly_payment', 'tenure_yrs', 'state']
)

In [7]:
customerdata1.show(10, truncate=False)

+-------+---------------+----------+-----+
|cust_id|monthly_payment|tenure_yrs|state|
+-------+---------------+----------+-----+
|1      |29.99          |5         |MD   |
|2      |31.99          |3         |MD   |
|3      |24.99          |1         |VA   |
|4      |21.99          |3         |WA   |
|5      |22.0           |3         |WA   |
|6      |25.0           |7         |WA   |
+-------+---------------+----------+-----+



A little more work is involved when the data contains a categorical variable. The first step is to convert the categories to numbers, which is accomplished using StringIndexer, available in pyspark.ml.feature. StringIndexer assigns a unique numeric index to each category of the variable.

When StringIndexer is applied, the most frequent category gets index 0, followed by the next most frequent, and so on. Below, WA gets index 0 as it is the most frequent category in state, followed by MD (1) and VA (2).

Let's import required functions.

In [8]:
from pyspark.ml.feature import (StringIndexer, OneHotEncoder, VectorAssembler)

In [9]:
indexer = StringIndexer(inputCol='state', outputCol='stateNum')
indexd_data=indexer.fit(customerdata1).transform(customerdata1)
indexd_data.show(10, truncate=False)

+-------+---------------+----------+-----+--------+
|cust_id|monthly_payment|tenure_yrs|state|stateNum|
+-------+---------------+----------+-----+--------+
|1      |29.99          |5         |MD   |1.0     |
|2      |31.99          |3         |MD   |1.0     |
|3      |24.99          |1         |VA   |2.0     |
|4      |21.99          |3         |WA   |0.0     |
|5      |22.0           |3         |WA   |0.0     |
|6      |25.0           |7         |WA   |0.0     |
+-------+---------------+----------+-----+--------+



In [10]:
print('State distribution')
indexd_data.groupBy('state').count().show()
print('stateNum distribution')
indexd_data.groupBy('stateNum').count().show()

State distribution
+-----+-----+
|state|count|
+-----+-----+
|   VA|    1|
|   MD|    2|
|   WA|    3|
+-----+-----+

stateNum distribution
+--------+-----+
|stateNum|count|
+--------+-----+
|     0.0|    3|
|     1.0|    2|
|     2.0|    1|
+--------+-----+



As shown above, the stateNum variable is created, which holds a numerical representation of each state.
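
The frequency-based ordering can be reproduced in plain Python. The sketch below is only an illustration of the rule, not Spark code: `frequency_index` is a hypothetical helper, and breaking ties alphabetically is an assumption of this sketch (the sample data has no ties).

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically
    # (an assumption of this sketch, to keep the result deterministic).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


states = ["MD", "MD", "VA", "WA", "WA", "WA"]
mapping = frequency_index(states)
print(mapping)  # {'WA': 0.0, 'MD': 1.0, 'VA': 2.0}
```

The mapping agrees with the stateNum column: WA is 0.0, MD is 1.0 and VA is 2.0.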

Once StringIndexer has been applied, we can use OneHotEncoder to encode the indexed variable and finally use VectorAssembler to assemble all the numerical columns and the one-hot encoded vector together.

In [11]:
encoder = OneHotEncoder(inputCol='stateNum', outputCol = 'stateVec')
onehotdata = encoder.fit(indexd_data).transform(indexd_data)

In [12]:
onehotdata.show(10, truncate=False)

+-------+---------------+----------+-----+--------+-------------+
|cust_id|monthly_payment|tenure_yrs|state|stateNum|stateVec     |
+-------+---------------+----------+-----+--------+-------------+
|1      |29.99          |5         |MD   |1.0     |(2,[1],[1.0])|
|2      |31.99          |3         |MD   |1.0     |(2,[1],[1.0])|
|3      |24.99          |1         |VA   |2.0     |(2,[],[])    |
|4      |21.99          |3         |WA   |0.0     |(2,[0],[1.0])|
|5      |22.0           |3         |WA   |0.0     |(2,[0],[1.0])|
|6      |25.0           |7         |WA   |0.0     |(2,[0],[1.0])|
+-------+---------------+----------+-----+--------+-------------+



OneHotEncoder creates dummy variables for the categories. In our case state has 3 categories: MD, VA and WA. With OneHotEncoder, these 3 categories are represented by a vector with two elements. By default the encoder outputs a vector of length n-1, where n is the number of categories in the feature (3 in our case). The one-hot encoded vector looks a little different, as shown above in stateVec, because Spark prints it in sparse format: the vector size, the positions of the non-zero values, and the values themselves. Let's take a closer look at stateVec for MD, i.e. (2,[1],[1.0]). It says the vector has 2 elements (3 categories - 1), with the value 1.0 at position 1.

Furthermore, StringIndexer assigned stateNum 0 to WA (most frequent), 1 to MD (second most) and 2 to VA (least frequent). Consequently, in the one-hot vector, position 0 is the indicator for WA (1 for rows containing WA, 0 otherwise) and position 1 is the indicator for MD (1 for rows containing MD, 0 otherwise). Going back to the example (2,[1],[1.0]): the vector has 2 elements and, for MD, the value 1.0 sits at position 1. Similarly, WA is represented by (2,[0],[1.0]): 2 elements, with the value 1.0 at position 0. VA is represented by (2,[],[]), a vector of 2 elements that are all zero.
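
To read the sparse notation more easily, the triple can be expanded into an ordinary list. This is a small plain-Python sketch; `to_dense` is a hypothetical helper written for this article, not a Spark function.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a plain list."""
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


print(to_dense(2, [1], [1.0]))  # MD -> [0.0, 1.0]
print(to_dense(2, [0], [1.0]))  # WA -> [1.0, 0.0]
print(to_dense(2, [], []))      # VA -> [0.0, 0.0]
```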


Note: By default (dropLast=True), Spark's OneHotEncoder does not include the last category. For example, with 3 categories, an input value of 1 maps to the output vector [0.0,1.0], and an input value of 2 (the last category) maps to [0.0,0.0].
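
The drop-last rule can be written down as a short plain-Python sketch. `one_hot_drop_last` is a hypothetical helper that mimics the behaviour described in the note; it is not how Spark implements the encoder.

```python
def one_hot_drop_last(index, n_categories):
    """Encode a category index as a (size, indices, values) sparse triple,
    leaving the last category out so it becomes the all-zero vector."""
    i = int(index)
    if not 0 <= i < n_categories:
        raise ValueError(f"index {index} is outside 0..{n_categories - 1}")
    size = n_categories - 1
    if i == size:
        # Last category: no position is set.
        return (size, [], [])
    return (size, [i], [1.0])


print(one_hot_drop_last(0.0, 3))  # WA -> (2, [0], [1.0])
print(one_hot_drop_last(1.0, 3))  # MD -> (2, [1], [1.0])
print(one_hot_drop_last(2.0, 3))  # VA -> (2, [], [])
```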

#### Vector Assembler
Now that we have converted the categorical feature to numerical form, we need to assemble all the input columns, including the converted one, into a single vector called features. As shown below, the features column of the output contains every input listed in inputCols of VectorAssembler. For downstream steps such as the train/test split and modeling, only features is used as input. Note that cust_id is included here for illustration; an identifier column would normally be left out of the feature vector.

In [13]:
assembler1 = VectorAssembler(
    inputCols=["cust_id", "monthly_payment", "tenure_yrs", "stateVec"],
    outputCol="features")
outdata1 = assembler1.transform(onehotdata)
outdata1.show(10, truncate=False)

+-------+---------------+----------+-----+--------+-------------+-----------------------+
|cust_id|monthly_payment|tenure_yrs|state|stateNum|stateVec     |features               |
+-------+---------------+----------+-----+--------+-------------+-----------------------+
|1      |29.99          |5         |MD   |1.0     |(2,[1],[1.0])|[1.0,29.99,5.0,0.0,1.0]|
|2      |31.99          |3         |MD   |1.0     |(2,[1],[1.0])|[2.0,31.99,3.0,0.0,1.0]|
|3      |24.99          |1         |VA   |2.0     |(2,[],[])    |[3.0,24.99,1.0,0.0,0.0]|
|4      |21.99          |3         |WA   |0.0     |(2,[0],[1.0])|[4.0,21.99,3.0,1.0,0.0]|
|5      |22.0           |3         |WA   |0.0     |(2,[0],[1.0])|[5.0,22.0,3.0,1.0,0.0] |
|6      |25.0           |7         |WA   |0.0     |(2,[0],[1.0])|[6.0,25.0,7.0,1.0,0.0] |
+-------+---------------+----------+-----+--------+-------------+-----------------------+



### Summary
Modeling in PySpark requires preparing data with VectorAssembler, which combines all the numerical features and the vector-encoded categorical features into a single column. StringIndexer and OneHotEncoder, available in pyspark.ml.feature, are the key steps for converting a categorical variable into a vectorized form that can then be used for downstream modeling work.

Look out for my next article on ML models using PySpark.
