<a href="https://colab.research.google.com/github/Juanreyna3/ML-Pipeline-with-PySpark/blob/main/ML%20Pipeline%20with%20PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Task 1 - Install Spark, load required libraries, set environment variables, initiate Spark, load file

In [None]:
# installs Java 8 (required to run Spark)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# downloads the Spark 3.2.1 binary (built for Hadoop 3.2); older releases are served from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

# unpacks the archive into /content/spark-3.2.1-bin-hadoop3.2
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

# installs findspark, which makes the Spark installation importable from Python
!pip install -q findspark

In [None]:
import os

#creates environment variables for JAVA_HOME and SPARK_HOME 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

#initiates findspark and imports all required libraries
import findspark
findspark.init()

from google.colab import files
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# starts a Spark session (the dataset is uploaded and read in the next cells)

# runs Spark locally on all available cores; note that sc holds a SparkSession here, not a SparkContext
sc = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# opens a file picker and uploads the dataset file (data1.csv) to the Colab virtual machine
files.upload()

Saving data1.csv to data1.csv


{'data1.csv': b'Make,Model,Engine,Engine Fuel Type,Engine HP,Engine Cylinders,Transmission Type,Driven_Wheels,Number of Doors,Market Category,Vehicle Size,Vehicle Style,Highway MPG,City mpg,Popularity,MSRP\r\nFIAT,124 Spider,2017,premium unleaded (recommended),160,4,MANUAL,rear wheel drive,2,Performance,Compact,Convertible,35,26,819,24995\r\nFIAT,124 Spider,2017,premium unleaded (recommended),160,4,MANUAL,rear wheel drive,2,Performance,Compact,Convertible,35,26,819,28195\r\nMercedes-Benz,190-Class,1991,regular unleaded,130,4,MANUAL,rear wheel drive,4,Luxury,Compact,Sedan,26,18,617,2000\r\nMercedes-Benz,190-Class,1991,regular unleaded,158,6,MANUAL,rear wheel drive,4,Luxury,Compact,Sedan,25,17,617,2000\r\nMercedes-Benz,190-Class,1992,regular unleaded,158,6,MANUAL,rear wheel drive,4,Luxury,Compact,Sedan,25,17,617,2000\r\nMercedes-Benz,190-Class,1992,regular unleaded,130,4,MANUAL,rear wheel drive,4,Luxury,Compact,Sedan,26,18,617,2000\r\nMercedes-Benz,190-Class,1993,regular unleaded,130,4,M

In [None]:
!ls

data1.csv    spark-3.2.1-bin-hadoop3.2
sample_data  spark-3.2.1-bin-hadoop3.2.tgz


In [None]:
#reads the file into Spark
data = sc.read.csv('data1.csv', inferSchema=True, header=True)
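With inferSchema=True, Spark scans the file and assigns each column a type instead of reading everything as strings. The plain-Python sketch below is a simplified illustration of that idea, not Spark's implementation, which also considers longs, decimals, timestamps and booleans. In the sketch, a column is typed as integer if every non-empty value parses as one, as double if every value parses as a number, and as string otherwise. The sample rows are copied from the upload preview above, and the helper names are hypothetical.

```python
import csv
import io

# Header and first two data rows of data1.csv, copied from the upload preview above
SAMPLE = """Make,Model,Engine,Engine Fuel Type,Engine HP,Engine Cylinders,Transmission Type,Driven_Wheels,Number of Doors,Market Category,Vehicle Size,Vehicle Style,Highway MPG,City mpg,Popularity,MSRP
FIAT,124 Spider,2017,premium unleaded (recommended),160,4,MANUAL,rear wheel drive,2,Performance,Compact,Convertible,35,26,819,24995
Mercedes-Benz,190-Class,1991,regular unleaded,130,4,MANUAL,rear wheel drive,4,Luxury,Compact,Sedan,26,18,617,2000
"""


def parses_as(value, cast):
    """Return True if `cast(value)` succeeds (e.g. cast=int or cast=float)."""
    try:
        cast(value)
        return True
    except ValueError:
        return False


def infer_type(values):
    """Return 'integer', 'double' or 'string' for a list of raw CSV strings."""
    present = [v for v in values if v != ""]  # empty fields count as null and are ignored
    if present and all(parses_as(v, int) for v in present):
        return "integer"
    if present and all(parses_as(v, float) for v in present):
        return "double"
    return "string"


def infer_schema(text):
    """Map each header name to the type inferred from its column of values."""
    rows = list(csv.reader(io.StringIO(text)))
    header, body = rows[0], rows[1:]
    return {name: infer_type([r[i] for r in body]) for i, name in enumerate(header)}


for name, kind in infer_schema(SAMPLE).items():
    print(f"{name}: {kind}")
```

On these two rows the sketch agrees with the schema printed below. For example, Engine (which holds the model year) comes out as integer, and Model comes out as string.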

Describing and Cleaning the Dataset:

In [None]:
data.printSchema()
data.describe().toPandas().transpose()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Engine: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- Highway MPG: integer (nullable = true)
 |-- City mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| Make | 11881 | | | Acura | Volvo |
| Model | 11881 | 767.8436781609196 | 1511.4508757153594 | 124 Spider | xD |
| Engine | 11881 | 2010.4031647167747 | 7.564386036663579 | 1990 | 2017 |
| Engine Fuel Type | 11878 | | | diesel | regular unleaded |
| Engine HP | 11812 | 249.46012529630883 | 109.29258300671904 | 55 | 1001 |
| Engine Cylinders | 11851 | 5.6279638849042275 | 1.7828695584153884 | 0 | 16 |
| Transmission Type | 11881 | | | AUTOMATED_MANUAL | UNKNOWN |
| Driven_Wheels | 11881 | | | all wheel drive | rear wheel drive |
| Number of Doors | 11875 | 3.437557894736842 | 0.8805368947909242 | 2 | 4 |


In [None]:
def replace(column, value):
  """Return `column` with every entry equal to `value` set to null."""
  return when(column != value, column).otherwise(lit(None))

# replaces every "N/A" string in Market Category with null, which Spark treats as a missing value
data = data.withColumn("Market Category", replace(col("Market Category"), "N/A"))
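The replace helper relies on SQL null semantics. When column != value is true, when() keeps the original value. In every other case, otherwise(lit(None)) supplies null. That includes rows that were already null, because comparing null with anything gives null, which when() does not treat as true. The plain-Python sketch below applies the same per-value rule. The function name is hypothetical.

```python
def replace_with_null(value, target="N/A"):
    """Mimic when(col != target, col).otherwise(None) for a single value."""
    # In SQL, NULL != 'N/A' evaluates to NULL (not true), so the
    # otherwise() branch is taken and the result stays null.
    if value is None:
        return None
    return value if value != target else None


categories = ["Performance", "N/A", None, "Luxury"]
print([replace_with_null(v) for v in categories])
# ['Performance', None, None, 'Luxury']
```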

In [None]:
# counts the null (or NaN) values in each column
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+----+-----+------+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Engine|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|Highway MPG|City mpg|Popularity|MSRP|
+----+-----+------+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|     0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+------+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
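The counting expression above works because count() skips nulls. For each row, when(condition, c) yields a non-null value if the condition holds and null otherwise. Because c is passed as a plain string rather than a Column, when() treats it as a literal. As a result, rows whose own value is null still produce something that count() counts. The plain-Python sketch below applies the same rule to a few toy rows: a value counts as missing if it is None or a floating-point NaN. The helper name is hypothetical.

```python
import math


def count_missing(rows, columns):
    """Return {column: number of rows whose value is None or NaN}."""
    def is_missing(v):
        return v is None or (isinstance(v, float) and math.isnan(v))

    return {c: sum(1 for row in rows if is_missing(row.get(c))) for c in columns}


rows = [
    {"Engine HP": 160, "Market Category": "Performance"},
    {"Engine HP": None, "Market Category": None},
    {"Engine HP": float("nan"), "Market Category": "Luxury"},
]
print(count_missing(rows, ["Engine HP", "Market Category"]))
# {'Engine HP': 2, 'Market Category': 1}
```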



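Market Category has 3,742 missing values (about 31% of the 11,881 rows), which is too many to fill in reliably, so the column is dropped. Each remaining column has at most 69 missing values, so the rows that contain any missing value are simply removed.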
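The arithmetic behind this decision can be checked in plain Python. The null counts below are copied from the table above, and the total row count comes from the describe() output. The 30% cut-off is the rule of thumb stated in this notebook, not a Spark setting, and the helper name is hypothetical.

```python
TOTAL_ROWS = 11881  # row count before cleaning, from describe()

# Non-zero null counts per column, copied from the table above
null_counts = {
    "Engine Fuel Type": 3,
    "Engine HP": 69,
    "Engine Cylinders": 30,
    "Number of Doors": 6,
    "Market Category": 3742,
}
THRESHOLD = 0.30  # drop a column when more than 30% of its values are missing


def columns_to_drop(counts, total, threshold):
    """Return the columns whose fraction of missing values exceeds `threshold`."""
    return sorted(c for c, n in counts.items() if n / total > threshold)


for name, n in null_counts.items():
    print(f"{name:18s} {n:5d}  {n / TOTAL_ROWS:.1%}")
print(columns_to_drop(null_counts, TOTAL_ROWS, THRESHOLD))
# ['Market Category']
```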

In [None]:
data = data.drop('Market Category')
data = data.na.drop()
print(data.count(), len(data.columns))

11779 15


Next, all numeric columns except the label (MSRP) are assembled into a single feature vector, and a random forest regressor is set up to predict car prices (MSRP) from those features.

In [None]:
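# numeric feature columns, named as in the schema above: the model year is
# stored in 'Engine', and the city fuel-economy column is spelled 'City mpg'
assembler = VectorAssembler(inputCols=['Engine', 'Engine HP', 'Engine Cylinders',
                                       'Number of Doors', 'Highway MPG',
                                       'City mpg', 'Popularity'],
                            outputCol='Attributes')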

Note that in the VectorAssembler above, inputCols lists the numeric columns used as features (every numeric column except the label, MSRP), and outputCol names the new column that will hold them, Attributes.

The assembler combines the values of these 7 columns into a single vector per row and stores that vector in the Attributes column.
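For a single row, the effect of the assembler can be sketched in plain Python. The column names follow the printed schema: the model year is stored in Engine, and the city column is spelled City mpg. The row is the first FIAT 124 Spider row from the upload preview. This is an illustration, not Spark's code. VectorAssembler actually produces a DenseVector or SparseVector rather than a Python list, and the helper name is hypothetical.

```python
# Feature columns as named in the printed schema (model year lives in "Engine")
INPUT_COLS = ["Engine", "Engine HP", "Engine Cylinders", "Number of Doors",
              "Highway MPG", "City mpg", "Popularity"]


def assemble(row, input_cols, output_col="Attributes"):
    """Return a copy of `row` with `output_col` holding the input values, in order, as floats."""
    missing = [c for c in input_cols if c not in row]
    if missing:
        # Spark also refuses to run when an input column does not exist
        raise KeyError(f"input columns not found: {missing}")
    assembled = dict(row)
    assembled[output_col] = [float(row[c]) for c in input_cols]
    return assembled


# First FIAT 124 Spider row from the upload preview (numeric columns plus the label)
fiat = {"Engine": 2017, "Engine HP": 160, "Engine Cylinders": 4, "Number of Doors": 2,
        "Highway MPG": 35, "City mpg": 26, "Popularity": 819, "MSRP": 24995}
print(assemble(fiat, INPUT_COLS)["Attributes"])
# [2017.0, 160.0, 4.0, 2.0, 35.0, 26.0, 819.0]
```

The label MSRP is left out of the vector on purpose. It stays a separate column for the regressor's labelCol.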

In [23]:
regressor = RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP')
pipeline = Pipeline(stages=[assembler, regressor])

# saves the (not yet fitted) pipeline to a folder named 'pipeline' on the Colab machine;
# overwrite() replaces any earlier save, so re-running the cell does not raise an error
pipeline.write().overwrite().save('pipeline')

This pipeline has two stages:
  *   Vector Assembler
  *   Random Forest Regressor
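A Pipeline is itself an estimator. Calling fit() runs the stages in order and fits each estimator (here the random forest) on the output of the stages before it. It returns a PipelineModel made only of transformers. The toy classes below are a plain-Python sketch of that control flow, not Spark's implementation. AddFeatures and MeanRegressor are hypothetical stand-ins for VectorAssembler and RandomForestRegressor; the "regressor" simply predicts the mean label.

```python
class AddFeatures:
    """Transformer stand-in for VectorAssembler: copies input values into a list."""
    def __init__(self, input_cols, output_col):
        self.input_cols, self.output_col = input_cols, output_col

    def transform(self, rows):
        return [{**r, self.output_col: [float(r[c]) for c in self.input_cols]} for r in rows]


class MeanModel:
    """Fitted model (a transformer) returned by MeanRegressor.fit()."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, "prediction": self.mean} for r in rows]


class MeanRegressor:
    """Estimator stand-in for RandomForestRegressor: always predicts the mean label."""
    def __init__(self, label_col):
        self.label_col = label_col

    def fit(self, rows):
        return MeanModel(sum(r[self.label_col] for r in rows) / len(rows))


class SimplePipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            if hasattr(stage, "fit"):        # estimator: learn from the data seen so far
                stage = stage.fit(rows)
            fitted.append(stage)
            if i < len(self.stages) - 1:     # pass transformed data on to the next stage
                rows = stage.transform(rows)
        return SimplePipelineModel(fitted)


# Engine HP and MSRP of the first FIAT and Mercedes-Benz rows in the preview
train = [{"Engine HP": 160, "MSRP": 24995}, {"Engine HP": 130, "MSRP": 2000}]
model = SimplePipeline([AddFeatures(["Engine HP"], "Attributes"), MeanRegressor("MSRP")]).fit(train)
print(model.transform(train)[0]["prediction"])  # 13497.5
```

This also explains why saving and loading the unfitted pipeline, as done above, stores only the stage definitions. No trees exist until fit() is called.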




In [24]:
# check: views the content of the directory to ensure the pipeline has been saved
!ls

data1.csv  sample_data		      spark-3.2.1-bin-hadoop3.2.tgz
pipeline   spark-3.2.1-bin-hadoop3.2


Cross Validation and Hyperparameter Tuning:

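Hyperparameter tuning is a crucial step in building a machine learning model. It searches for the hyperparameter values (settings fixed before training, such as the number of trees) that give the best-performing model.

Cross-validation is a resampling procedure used to evaluate machine learning models. The data is split into k folds, and each model is trained on k-1 folds and evaluated on the remaining fold. This rotates until every fold has been used for evaluation once, and the k scores are averaged.

This generally gives a less biased estimate of model performance than a single train/test split, which makes it a sound basis for choosing between hyperparameter settings.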
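The fold rotation described above can be sketched in plain Python by shuffling the row indices and dealing them into k folds. This only illustrates the idea. Spark's CrossValidator also splits rows randomly (controlled by its seed parameter), but its folds are only approximately equal in size. The function name and default seed are hypothetical.

```python
import random


def k_fold_indices(n_rows, k, seed=0):
    """Yield (train, test) index lists; each row index appears in exactly one test fold."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    folds = [indices[i::k] for i in range(k)]  # deal shuffled indices round-robin into k folds
    for i in range(k):
        test = sorted(folds[i])
        train = sorted(j for f, fold in enumerate(folds) if f != i for j in fold)
        yield train, test


# With k = 3, each model is trained on two thirds of the rows and scored on the last third
for train, test in k_fold_indices(n_rows=9, k=3):
    print(len(train), len(test), test)
```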

In [25]:
# loads the saved pipeline into a new object called pipelineModel
# (despite the name, this is an unfitted Pipeline estimator, not a fitted PipelineModel)
pipelineModel = Pipeline.load('pipeline')

In [27]:
# builds a grid of hyperparameter values to try for the random forest (here only numTrees)
paramGrid = ParamGridBuilder() \
  .addGrid(regressor.numTrees, [100,500]) \
  .build()


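Note that any of the random forest's hyperparameters could be added to the grid (e.g. number of trees, maximum depth, feature subset strategy).

To limit training time, only the number of trees is explored here.

The regressor object used in the grid is the random forest created in the previous task. Spark keeps each stage's uid when a pipeline is saved and loaded, so the grid's parameters still apply to the random forest stage inside the loaded pipeline.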
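ParamGridBuilder takes the Cartesian product of all value lists passed to addGrid(). CrossValidator then fits one model per combination per fold and finally refits the best combination on the full dataset. The plain-Python sketch below counts those fits, which shows why each added hyperparameter multiplies the training time. The maxDepth values are a hypothetical extension of the grid, not part of this notebook, and the helper names are likewise hypothetical.

```python
from itertools import product


def build_grid(options):
    """Expand {param: [values]} into a list of {param: value} dicts (Cartesian product)."""
    names = list(options)
    return [dict(zip(names, combo)) for combo in product(*(options[n] for n in names))]


def total_fits(grid, num_folds):
    """Models trained by k-fold cross-validation, plus one refit of the best setting."""
    return len(grid) * num_folds + 1


notebook_grid = build_grid({"numTrees": [100, 500]})
extended_grid = build_grid({"numTrees": [100, 500], "maxDepth": [5, 10]})  # hypothetical

print(len(notebook_grid), total_fits(notebook_grid, num_folds=3))   # 2 7
print(len(extended_grid), total_fits(extended_grid, num_folds=3))   # 4 13
```

Each of those fits is itself a forest of 100 or 500 trees, so the cost grows quickly even with a small grid.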

In [29]:
# creates cross validator using 3 folds and a regression evaluator (evaluator metrics are RMSE by default)
crossval = CrossValidator(estimator=pipelineModel, 
                          estimatorParamMaps = paramGrid, 
                          evaluator=RegressionEvaluator(labelCol='MSRP'), 
                          numFolds=3)

To create a cross validator object, you need your pipeline, the parameter grid, an evaluator and the number of folds.

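Note that RegressionEvaluator looks for a label column called 'label' by default. Here the label column is 'MSRP', because that is the column being predicted, so it is passed explicitly with labelCol='MSRP'. The evaluator's default prediction column, 'prediction', already matches the random forest's output.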
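By default the evaluator computes RMSE, the root mean squared error between the label and prediction columns: RMSE = sqrt(mean((label - prediction)^2)). Lower is better, so CrossValidator keeps the parameter setting with the lowest RMSE averaged over the folds. Below is a plain-Python version of the formula. It uses two MSRP labels from the upload preview and hypothetical predictions.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error, the default metric of RegressionEvaluator."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and equal in length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


msrp = [24995, 28195]        # labels from the first two rows of data1.csv
predicted = [25995, 27195]   # hypothetical model predictions
print(rmse(msrp, predicted))  # 1000.0
```

Because the errors are squared before averaging, RMSE is in the same units as MSRP (dollars) and penalizes large misses more heavily than small ones.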