# Getting Comfortable with Feature Engineering Using PySpark


It's time to get some practice with the different options for feature engineering in PySpark!

Check out these resources:
* https://spark.apache.org/docs/1.4.0/ml-features.html
* https://dhiraj-p-rai.medium.com/essentials-of-feature-engineering-in-pyspark-part-i-76a57680a85
* https://www.kaggle.com/code/dhirajrai87/feature-engineering-with-pyspark
* https://datascience.stackexchange.com/questions/45900/when-to-use-standard-scaler-and-when-normalizer

The common feature transformers available in PySpark include the following (an asterisk marks the ones used in this exercise):
* PolynomialExpansion*
* StringIndexer
* OneHotEncoder
* VectorIndexer
* Normalizer*
* StandardScaler* (do this one last!)
* Bucketizer*
* ElementwiseProduct
* VectorAssembler
  

# Exercise
Using the California housing dataset in Colab's `sample_data` folder, try FOUR methods (**PolynomialExpansion, Bucketizer, Normalizer, and StandardScaler**) that run successfully on the training set and are correctly applied to the test set.



# Start PySpark and Read Data from Colab

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder\
        .appName("FeatureEngineering_HW")\
        .getOrCreate()

In [None]:
# specify the directory
DIRECTORY = '/content/sample_data'

In [None]:
import os

# read the train data
train = spark.read.csv(
 path=os.path.join(DIRECTORY, "california_housing_train.csv"),
 sep=",",
 header=True,
 inferSchema=True,
 timestampFormat="yyyy-MM-dd", # used to tell spark the format of dateTime columns
)

# read the test data
test = spark.read.csv(
 path=os.path.join(DIRECTORY, "california_housing_test.csv"),
 sep=",",
 header=True,
 inferSchema=True,
 timestampFormat="yyyy-MM-dd", # used to tell spark the format of dateTime columns
)

In [None]:
# view first few rows
train.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|...

# VectorAssembler
This method combines all of the columns of interest into a single vector column. Spark ML estimators expect their input features in one vector column, so assembling is usually the first step. You can think of it as each row holding a list with ALL of the columns of interest inside it.
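As a rough mental model, the plain-Python sketch below (not Spark's actual implementation) shows what the assembler produces for each row: the values of the listed columns, in order, packed into one list. The `assemble` helper, its `skip_nulls` flag, and the second, made-up row are hypothetical. The flag loosely mirrors VectorAssembler's `handleInvalid` parameter, where `"skip"` drops rows containing nulls and the default `"error"` raises.

```python
from typing import Dict, List, Optional, Sequence

Row = Dict[str, Optional[float]]

def assemble(rows: Sequence[Row], input_cols: Sequence[str],
             skip_nulls: bool = True) -> List[List[float]]:
    """Pack the selected columns of each row into one feature list (hypothetical helper)."""
    vectors = []
    for row in rows:
        values = [row[col] for col in input_cols]
        if any(v is None for v in values):
            if skip_nulls:
                continue  # drop the row, similar in spirit to handleInvalid="skip"
            raise ValueError(f"null value in row {row}")  # similar to the default "error"
        vectors.append([float(v) for v in values])
    return vectors

rows = [
    {"longitude": -114.31, "latitude": 34.19, "median_income": 1.4936},  # first training row
    {"longitude": -114.5, "latitude": 34.0, "median_income": None},      # made-up row with a null
]
print(assemble(rows, ["longitude", "latitude", "median_income"]))
# [[-114.31, 34.19, 1.4936]]
```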

## Train

In [None]:
train.columns           #variables in training dataset

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [None]:
# Assign all continuous feature columns to a list
CONTINUOUS_COLUMNS = ['longitude',
                      'latitude',
                      'housing_median_age',
                      'total_rooms',
                      'total_bedrooms',
                      'population',
                      'households',
                      'median_income'] # note that we dropped the target variable here!
print(CONTINUOUS_COLUMNS)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']


In [None]:
#assigning target variable to a variable
TARGET_COLUMN = ['median_house_value']
print(TARGET_COLUMN)

['median_house_value']


In [None]:
from pyspark.ml.feature import VectorAssembler

# combine all continuous columns (CONTINUOUS_COLUMNS) into one vector column named "continuous_features"
continuous_features = VectorAssembler(inputCols=CONTINUOUS_COLUMNS, outputCol="continuous_features")

In [None]:
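# drop rows that have a null in ANY continuous column
# (VectorAssembler raises an error on nulls by default)
vector_df_train = train
vector_df_test = test
for x in CONTINUOUS_COLUMNS:
  vector_df_train = vector_df_train.where(~F.isnull(F.col(x)))
  vector_df_test = vector_df_test.where(~F.isnull(F.col(x)))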

In [None]:
#transform
vector_variable_train = continuous_features.transform(vector_df_train)
vector_variable_train.show() # see how all of the features are now in one column?

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value| continuous_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|[-114.31,34.19,15...|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|[-114.47,34.4,19....|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|[-114.56,33.69,17...|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|[-114.57,33.64,14...|

## Test

In [None]:
# now apply to test
vector_variable_test = continuous_features.transform(vector_df_test)
vector_variable_test.show() # see how all of the features are now in one column?

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value| continuous_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|[-122.05,37.37,27...|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|[-118.3,34.26,43....|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|[-117.81,33.78,27...|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|[-118.36,33.82,28...|

# Polynomial Expansion

Polynomial expansion is used to expand your features into a polynomial space, which is computed by an n-degree combination of original dimensions. This can help capture interactions between features that linear models can miss.
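A minimal plain-Python sketch of what degree-2 expansion computes: every product of the input features with total degree 1 up to `degree`, with no constant term. The `poly_expand` helper is hypothetical and only meant to show the arithmetic; Spark's PolynomialExpansion orders the terms differently (for input `(x, y)` it emits `x, x*x, y, x*y, y*y`). It also shows why the degree is usually kept small: the 8 continuous columns here already grow to 44 features at degree 2.

```python
from itertools import combinations_with_replacement
from math import comb, prod
from typing import List, Sequence

def poly_expand(features: Sequence[float], degree: int) -> List[float]:
    """All monomials of total degree 1..degree, without a constant term."""
    expanded = []
    for d in range(1, degree + 1):
        # each multiset of feature indices of size d is one monomial
        for idx in combinations_with_replacement(range(len(features)), d):
            expanded.append(prod(features[i] for i in idx))
    return expanded

print(poly_expand([2.0, 3.0], degree=2))  # [2.0, 3.0, 4.0, 6.0, 9.0]

# 8 continuous columns at degree 2 -> C(8 + 2, 2) - 1 output features
print(len(poly_expand([1.0] * 8, degree=2)), comb(8 + 2, 2) - 1)  # 44 44
```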

## Train

In [None]:
from pyspark.ml.feature import PolynomialExpansion

# Polynomial Expansion of the continuous features vector
polynomial_expansion = PolynomialExpansion(degree=2, inputCol="continuous_features", outputCol="poly_features")

# Applying Polynomial Expansion to the train dataset
poly_train = polynomial_expansion.transform(vector_variable_train)
poly_train.show(truncate=False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|continuous_features                                    |poly_features                                                                                                                                                             

## Test

In [None]:
# Applying Polynomial Expansion to the test dataset
poly_test = polynomial_expansion.transform(vector_variable_test)
poly_test.show(truncate = False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|continuous_features                                    |poly_features                                                                                                                                          

# Bucketizer

Bucketizer transforms a column of continuous features into a column of feature buckets, where the buckets are specified by users. It can be useful for dividing a column of continuous features into different categories based on feature values.
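The bucket rule can be sketched in plain Python with `bisect`, assuming Spark's documented semantics: a bucket defined by splits `x, y` holds values in `[x, y)`, except the last bucket, which also includes `y`, and the splits must be strictly increasing. The `bucket_index` helper is hypothetical. Run on four of the incomes shown in the outputs further down, it reproduces their `income_bucket` values.

```python
import bisect
import math
from typing import Sequence

def bucket_index(value: float, splits: Sequence[float]) -> float:
    """Bucket i covers [splits[i], splits[i+1]); the last bucket also includes its upper bound."""
    if not all(a < b for a, b in zip(splits, splits[1:])):
        raise ValueError("splits must be strictly increasing")
    if value == splits[-1]:
        return float(len(splits) - 2)  # upper edge belongs to the last bucket
    if not splits[0] <= value < splits[-1]:
        raise ValueError(f"{value} is outside the splits")  # also catches NaN
    # bisect_right returns the position of the first split strictly greater than value
    return float(bisect.bisect_right(splits, value) - 1)

splits = [-math.inf, 2.0, 4.0, 6.0, math.inf]
incomes = [1.4936, 3.1917, 5.7934, 6.6085]
print([bucket_index(v, splits) for v in incomes])  # [0.0, 1.0, 2.0, 3.0]
```

Returning a float matches the `0.0`, `1.0`, ... style of the `income_bucket` column.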

## Train

In [None]:
from pyspark.ml.feature import Bucketizer

# Defines the boundaries for bucketizing the data:
# bucket 0: < 2.0, bucket 1: [2.0, 4.0), bucket 2: [4.0, 6.0), bucket 3: >= 6.0
splits = [-float("inf"), 2.0, 4.0, 6.0, float("inf")]

# Creates an instance of Bucketizer.
# 'median_income' is the input column to be bucketized.
# 'income_bucket' is the output column where the bucket indices will be stored.
bucketizer = Bucketizer(splits=splits, inputCol="median_income", outputCol="income_bucket")

# Applying Bucketizer to the train dataset
bucketized_train = bucketizer.transform(train)
bucketized_train.show(truncate = False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|income_bucket|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------+
|-114.31  |34.19   |15.0              |5612.0     |1283.0        |1015.0    |472.0     |1.4936       |66900.0           |0.0          |
|-114.47  |34.4    |19.0              |7650.0     |1901.0        |1129.0    |463.0     |1.82         |80100.0           |0.0          |
|-114.56  |33.69   |17.0              |720.0      |174.0         |333.0     |117.0     |1.6509       |85700.0           |0.0          |
|-114.57  |33.64   |14.0              |1501.0     |337.0         |515.0     |226.0     |3.1917       |73400.0           |1.0          |
|-114.57  |33.57   |20.0              |1454.0     |...

## Test

In [None]:
# Applying Bucketizer to the test dataset
bucketized_test = bucketizer.transform(test)
bucketized_test.show(truncate = False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|income_bucket|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------+
|-122.05  |37.37   |27.0              |3885.0     |661.0         |1537.0    |606.0     |6.6085       |344700.0          |3.0          |
|-118.3   |34.26   |43.0              |1510.0     |310.0         |809.0     |277.0     |3.599        |176500.0          |1.0          |
|-117.81  |33.78   |27.0              |3589.0     |507.0         |1484.0    |495.0     |5.7934       |270500.0          |2.0          |
|-118.36  |33.82   |28.0              |67.0       |15.0          |49.0      |11.0      |6.1359       |330000.0          |3.0          |
|-119.67  |36.33   |19.0              |1241.0     |...

# Normalizer

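Normalizer is a Transformer that rescales each row's feature vector to have unit norm. Unlike StandardScaler, it works row-wise (per sample) rather than column-wise (per feature), so it does not standardize individual features. The parameter `p` chooses the norm: `p=1.0`, used below, makes the absolute values in each row sum to 1, while the default `p=2.0` gives each row unit Euclidean length. Row normalization is most useful when only the relative proportions within a sample matter, for example with cosine-similarity-based methods.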
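To see the row-wise behaviour, here is a plain-Python sketch of p-norm normalization (a hypothetical `p_normalize` helper, not Spark's code). Applied to the first training row's raw features, it also shows a practical caveat: with `p=1.0`, large count columns such as `total_rooms` take up most of the unit norm, and `median_income` becomes almost negligible.

```python
from typing import List, Sequence

def p_normalize(vector: Sequence[float], p: float = 2.0) -> List[float]:
    """Rescale ONE row so that its p-norm equals 1."""
    if p == float("inf"):
        norm = max(abs(x) for x in vector)  # max-norm
    else:
        norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0.0:
        return list(vector)  # this sketch leaves all-zero rows unchanged
    return [x / norm for x in vector]

print(p_normalize([3.0, -4.0], p=2.0))  # [0.6, -0.8]

# first training row: longitude, latitude, housing_median_age, total_rooms,
# total_bedrooms, population, households, median_income
first_row = [-114.31, 34.19, 15.0, 5612.0, 1283.0, 1015.0, 472.0, 1.4936]
l1 = p_normalize(first_row, p=1.0)
print(round(l1[3], 3), round(l1[7], 5))  # share of total_rooms vs median_income
```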

## Train

In [None]:
from pyspark.ml.feature import Normalizer

# Normalizing continuous features vector
normalizer = Normalizer(inputCol="continuous_features", outputCol="normalized_features", p=1.0)

# Applying Normalizer to the train dataset
normalized_train = normalizer.transform(vector_variable_train)
normalized_train.show(truncate = False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|continuous_features                                    |normalized_features                                                                                                                                                     |
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------

## Test

In [None]:
# Applying Normalizer to the test dataset
normalized_test = normalizer.transform(vector_variable_test)
normalized_test.show(truncate = False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|continuous_features                                    |normalized_features                                                                                                                                                    |
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------

# StandardScaler

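StandardScaler standardizes each feature (column) by removing the mean and scaling to unit standard deviation, using column summary statistics computed on the training set. The mean is only subtracted when `withMean=True`; with `withMean=False`, as in the code below (and Spark's default), it only divides by the standard deviation. Because the statistics come from `fit()` on the training data, the same fitted model must be reused to transform the test data.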
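The fit-on-train, apply-to-test pattern can be sketched in plain Python. The helpers `fit_scaler` and `transform` and the toy rows are hypothetical; the sketch assumes the corrected sample standard deviation (dividing by n - 1), which Spark's documentation states StandardScaler uses, and its `with_mean`/`with_std` defaults mirror Spark's `withMean=False`, `withStd=True`. Zero-variance columns are mapped to 0.0 in this sketch.

```python
import statistics
from typing import List, Sequence, Tuple

def fit_scaler(rows: Sequence[Sequence[float]]) -> Tuple[List[float], List[float]]:
    """Compute per-column mean and sample std (n - 1) from the TRAINING rows only."""
    columns = list(zip(*rows))
    means = [statistics.mean(col) for col in columns]
    stds = [statistics.stdev(col) for col in columns]
    return means, stds

def transform(rows: Sequence[Sequence[float]], means: Sequence[float], stds: Sequence[float],
              with_mean: bool = False, with_std: bool = True) -> List[List[float]]:
    """Apply the stored training statistics to any dataset (train or test)."""
    out = []
    for row in rows:
        new_row = []
        for x, m, s in zip(row, means, stds):
            if with_mean:
                x = x - m  # centre using the TRAIN mean
            if with_std:
                x = x / s if s != 0 else 0.0  # scale using the TRAIN std
            new_row.append(x)
        out.append(new_row)
    return out

train_rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]  # toy data
test_rows = [[4.0, 40.0]]

means, stds = fit_scaler(train_rows)                  # statistics from train only
print(transform(test_rows, means, stds))              # [[4.0, 4.0]]
print(transform(test_rows, means, stds, with_mean=True))  # [[2.0, 2.0]]
```

Fitting a second scaler on the test set would give that set different statistics, so train and test features would no longer be on the same scale.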

## Train

In [None]:
from pyspark.ml.feature import StandardScaler

# StandardScaler to scale continuous features vector
scaler = StandardScaler(inputCol="continuous_features", outputCol="scaled_features", withStd=True, withMean=False)

# Fit on train data
scalerModel = scaler.fit(vector_variable_train)

# Transforming train dataset
scaled_train = scalerModel.transform(vector_variable_train)
scaled_train.show(truncate = False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|continuous_features                                    |scaled_features                                                                                                                                                |
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------

## Test

In [None]:
# Transforming test dataset
scaled_test = scalerModel.transform(vector_variable_test)
scaled_test.show(truncate = False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|continuous_features                                    |scaled_features                                                                                                                                              |
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------

# Bullets


By doing this assignment, I learnt the following about PySpark:

*   First of all, I gained a clear understanding of feature engineering and of the different feature engineering tools available in PySpark.
*   Secondly, I learnt how to use important feature engineering tools like VectorAssembler, PolynomialExpansion, Bucketizer, Normalizer, and StandardScaler in PySpark.
*   Furthermore, this showed me how to prepare data for machine learning by transforming variables into the right format, which makes my analyses more effective and insightful.
*   Next, I learnt the differences between the various transformers and now know which ones suit specific situations.
*   Finally, this exercise gave me valuable hands-on experience with PySpark's feature engineering tools: how to set the parameters of a specific transformer, and how to fit it on the training data and apply it to the test data.
