<a href="https://colab.research.google.com/github/ROARMarketingConcepts/Machine-Learning-Projects/blob/master/Building_Machine_Learning_Pipelines_in_PySpark_MLlib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Task 1 - Install Spark, load required libraries, set environment variables, initiate Spark, load file

Mount the Google Drive that holds the input files.

In [1]:
from google.colab import drive
drive.mount('/gdrive')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [2]:
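# Install a headless Java 8 runtime (Spark 2.4.x runs on Java 8)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download Spark 2.4.8 prebuilt for Hadoop 2.7; older releases are served from archive.apache.org
# (-nc skips the download if the archive already exists, avoiding the duplicate .tgz.1/.tgz.2 files)
!wget -q -nc https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
# findspark adds the Spark installation to sys.path so that pyspark can be imported
!pip install -q findspark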

In [3]:
import os

# Point findspark at the Java 8 runtime and the Spark directory unpacked above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

import findspark
findspark.init()  # must run before any pyspark import

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [4]:
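# Start a local Spark session on all available cores (note: `sc` holds a SparkSession here, not a SparkContext)
sc=SparkSession.builder.master("local[*]").getOrCreate()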

In [5]:
!ls

pipeline     spark-2.4.8-bin-hadoop2.7	    spark-2.4.8-bin-hadoop2.7.tgz.1
sample_data  spark-2.4.8-bin-hadoop2.7.tgz  spark-2.4.8-bin-hadoop2.7.tgz.2


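Read `data.csv` from Google Drive into a Spark DataFrame, using the header row for column names and inferring column types from the data.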

In [6]:
data=sc.read.csv('/gdrive/MyDrive/Colab Notebooks/Building Machine Learning Pipelines in PySpark MLlib/data.csv',inferSchema=True,header=True)

Examine the DataFrame's schema and summary statistics.

In [7]:
data.printSchema()
data.describe().toPandas().transpose()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



summary            count  mean                stddev              min               max
Make               11914                                          Acura             Volvo
Model              11914  745.5822222222222   1490.8280590623795  1 Series          xD
Year               11914  2010.384337753903   7.5797398875957995  1990              2017
Engine Fuel Type   11911                                          diesel            regular unleaded
Engine HP          11845  249.38607007176023  109.19187025917194  55                1001
Engine Cylinders   11884  5.628828677213059   1.78055934824622    0                 16
Transmission Type  11914                                          AUTOMATED_MANUAL  UNKNOWN
Driven_Wheels      11914                                          all wheel drive   rear wheel drive
Number of Doors    11908  3.4360933825999327  0.8813153865835529  2                 4


Define a helper that replaces a given placeholder value with null, then use it to convert the `'N/A'` strings in `Market Category` into true null values.

In [8]:
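def replace(column, value):
  # Keep the cell when it differs from `value`; otherwise (including when the cell is already null) return null
  return when(column != value, column).otherwise(lit(None))

# Turn the literal string "N/A" in Market Category into a real null
data = data.withColumn("Market Category", replace(col("Market Category"), "N/A"))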
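The `when(...).otherwise(...)` expression follows SQL null semantics: comparing a null cell with `"N/A"` yields null, which `when` treats as "not true", so cells that are already null stay null. The plain-Python sketch below mirrors that row-by-row logic; the helper name `replace_value` and the sample category strings are hypothetical and are not part of PySpark.

```python
from typing import List, Optional


def replace_value(cell: Optional[str], value: str) -> Optional[str]:
    """Plain-Python mirror of when(column != value, column).otherwise(lit(None))."""
    if cell is None:
        # NULL != value evaluates to NULL in Spark SQL, so when() falls
        # through to otherwise(None) and the cell stays null.
        return None
    # Keep the cell unless it equals the placeholder value.
    return cell if cell != value else None


# Hypothetical sample values for the Market Category column
market_category: List[Optional[str]] = ["Luxury,Performance", "N/A", None, "Hatchback"]
cleaned = [replace_value(c, "N/A") for c in market_category]
print(cleaned)  # ['Luxury,Performance', None, None, 'Hatchback']
```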

Count the null (or NaN) values in each column.

In [9]:
data.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in data.columns]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
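The counting expression works because `when(condition, c)` without an `otherwise` returns null for rows where the condition is false, and `count` skips nulls, so each column's count equals its number of missing cells. A plain-Python sketch of the same tally over a list of dicts, assuming hypothetical rows and treating both `None` and float NaN as missing:

```python
import math
from typing import Any, Dict, List


def count_missing(rows: List[Dict[str, Any]], columns: List[str]) -> Dict[str, int]:
    """Count None or NaN per column, like count(when(isnan(c) | col(c).isNull(), c))."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            v = row.get(c)
            # None plays the role of SQL NULL; NaN can only appear in float values.
            if v is None or (isinstance(v, float) and math.isnan(v)):
                counts[c] += 1
    return counts


# Hypothetical rows
rows = [
    {"Engine HP": 335, "Market Category": None},
    {"Engine HP": None, "Market Category": "Luxury"},
    {"Engine HP": float("nan"), "Market Category": None},
]
print(count_missing(rows, ["Engine HP", "Market Category"]))
# {'Engine HP': 2, 'Market Category': 2}
```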



In [10]:
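# Market Category is null in 3,742 of 11,914 rows (about 31%), so drop the column rather than those rows
data=data.drop("Market Category")
# Then drop the remaining rows that contain any null value
data=data.na.drop()
print((data.count(),len(data.columns)))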

(11812, 15)
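The order of the two drops matters. `na.drop()` with its default `how="any"` removes every row that contains a null (Spark also treats NaN in numeric columns as missing here), so running it before dropping `Market Category` would also discard the 3,742 rows with a null `Market Category`. A small plain-Python sketch with hypothetical rows makes the difference concrete; the helper names are illustrative only.

```python
import math
from typing import Any, Dict, List

Row = Dict[str, Any]


def is_missing(v: Any) -> bool:
    return v is None or (isinstance(v, float) and math.isnan(v))


def drop_column(rows: List[Row], column: str) -> List[Row]:
    # Similar in spirit to DataFrame.drop(column)
    return [{k: v for k, v in row.items() if k != column} for row in rows]


def drop_rows_with_missing(rows: List[Row]) -> List[Row]:
    # Similar in spirit to DataFrame.na.drop() with how="any"
    return [row for row in rows if not any(is_missing(v) for v in row.values())]


# Hypothetical rows
rows = [
    {"Engine HP": 335, "Market Category": None},
    {"Engine HP": None, "Market Category": "Luxury"},
    {"Engine HP": 300, "Market Category": None},
    {"Engine HP": 250, "Market Category": "Hatchback"},
]
rows_first = drop_rows_with_missing(rows)
column_first = drop_rows_with_missing(drop_column(rows, "Market Category"))
print(len(rows_first), len(column_first))  # 1 3
```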


Assemble the numeric columns into a single feature-vector column, `Attributes`, for training a Random Forest regressor; the label column is `MSRP`. Chain both stages into a `Pipeline` and save it to disk.

In [11]:
assembler = VectorAssembler(inputCols=["Year","Engine HP","Engine Cylinders","Number of Doors","highway MPG",
                                       "city mpg","Popularity"],outputCol="Attributes")

regressor = RandomForestRegressor(featuresCol="Attributes",labelCol="MSRP")
pipeline = Pipeline(stages=[assembler,regressor])
pipeline.write().overwrite().save('pipeline')   # ensure that we overwrite old pipelines
!ls

pipeline     spark-2.4.8-bin-hadoop2.7	    spark-2.4.8-bin-hadoop2.7.tgz.1
sample_data  spark-2.4.8-bin-hadoop2.7.tgz  spark-2.4.8-bin-hadoop2.7.tgz.2
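`VectorAssembler` concatenates the listed input columns, in order, into one vector column, and by default it rejects null inputs (its `handleInvalid` parameter defaults to `"error"`), which is one reason the rows with nulls were removed first. The label `MSRP` is not part of the vector; the regressor reads it from its own column. A plain-Python sketch of that per-row step, using a hypothetical row and a hypothetical `assemble` helper:

```python
from typing import Any, Dict, List

# Same input columns, in the same order, as the assembler above
FEATURE_COLS = ["Year", "Engine HP", "Engine Cylinders", "Number of Doors",
                "highway MPG", "city mpg", "Popularity"]


def assemble(row: Dict[str, Any], input_cols: List[str]) -> List[float]:
    """Concatenate input columns into one dense feature list (per-row sketch)."""
    features = []
    for c in input_cols:
        value = row[c]
        if value is None:
            # Mirrors handleInvalid="error": a null input raises instead of being skipped.
            raise ValueError(f"null value in input column {c!r}")
        features.append(float(value))
    return features


# Hypothetical row; MSRP is the label and is deliberately left out of the vector
row = {"Year": 2011, "Engine HP": 335, "Engine Cylinders": 6, "Number of Doors": 2,
       "highway MPG": 26, "city mpg": 19, "Popularity": 3916, "MSRP": 46135}
print(assemble(row, FEATURE_COLS))
# [2011.0, 335.0, 6.0, 2.0, 26.0, 19.0, 3916.0]
```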


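Reload the saved pipeline, set up a grid of `numTrees` values (100 and 500) for the `RandomForestRegressor`, and wrap everything in a 3-fold `CrossValidator` that scores each candidate with RMSE, the `RegressionEvaluator` default.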

In [12]:
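# Reload the saved pipeline; it is an unfitted estimator, not yet a fitted PipelineModel
loaded_pipeline = Pipeline.load('pipeline')
# Candidate forest sizes; the loaded regressor stage keeps the uid of `regressor`, so `regressor.numTrees` still refers to it
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,500]).build()
crossval = CrossValidator(estimator=loaded_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="MSRP"),  # default metric: RMSE
                          numFolds=3)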
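`ParamGridBuilder` produces the Cartesian product of the candidate values (here just two settings of `numTrees`), and `CrossValidator` fits the estimator once per setting per fold, averages the evaluator's metric across folds, keeps the setting with the best average (the lowest, for RMSE), and refits it on the full training data. The sketch below reproduces that loop in plain Python under simplifying assumptions: a toy 1-D k-nearest-neighbour regressor stands in for the random forest, the grid is over a hypothetical `k` parameter, and folds come from a shuffled index list rather than Spark's own `rand(seed)`-based fold assignment.

```python
import itertools
import math
import random
from typing import Callable, Dict, List, Sequence, Tuple

Point = Tuple[float, float]  # (feature, label)


def param_grid(**grids: Sequence) -> List[Dict]:
    # Cartesian product of every parameter's candidate values
    names = list(grids)
    return [dict(zip(names, combo)) for combo in itertools.product(*grids.values())]


def fit_knn(train: List[Point], k: int) -> Callable[[float], float]:
    # Toy stand-in for the random forest: predict the mean label of the k nearest points
    def predict(x: float) -> float:
        nearest = sorted(train, key=lambda p: abs(p[0] - x))[:k]
        return sum(y for _, y in nearest) / len(nearest)
    return predict


def rmse(pairs: List[Tuple[float, float]]) -> float:
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))


def cross_validate(data: List[Point], grid: List[Dict], num_folds: int = 3, seed: int = 0):
    order = list(range(len(data)))
    random.Random(seed).shuffle(order)
    # Deal shuffled row indices round-robin into num_folds folds
    fold_of = {idx: pos % num_folds for pos, idx in enumerate(order)}
    avg_metrics = []
    for params in grid:
        scores = []
        for f in range(num_folds):
            train = [p for i, p in enumerate(data) if fold_of[i] != f]
            valid = [p for i, p in enumerate(data) if fold_of[i] == f]
            model = fit_knn(train, **params)
            scores.append(rmse([(y, model(x)) for x, y in valid]))
        avg_metrics.append(sum(scores) / num_folds)
    # RMSE is smaller-is-better, so choose the lowest average
    best = min(range(len(grid)), key=avg_metrics.__getitem__)
    best_model = fit_knn(data, **grid[best])  # refit on all training data
    return grid[best], avg_metrics, best_model


data = [(float(x), 2.0 * x) for x in range(30)]
best_params, avg_metrics, best_model = cross_validate(data, param_grid(k=[1, 25]))
print(best_params)  # {'k': 1}
```

A large `k` averages labels from far-away points, so its cross-validated RMSE is much worse on this linear toy data; the same selection logic is what picked 500 trees in the notebook.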

Split `data` into training (80%) and test (20%) sets, then run cross-validation on the training set.

In [13]:
train_data,test_data = data.randomSplit([0.8,0.2],seed=123)
cvModel = crossval.fit(train_data)
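`randomSplit` does not cut the DataFrame at an exact 80/20 boundary: it normalizes the weights into ranges on [0, 1) and sends each row to the split whose range contains a uniform random draw, so the split sizes only approximate the weights (and, in Spark, can also depend on how the data is partitioned). A plain-Python sketch of that per-row assignment; the function name `random_split` is hypothetical, and its random stream differs from Spark's, so the exact rows and counts will not match the notebook's.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    total = sum(weights)
    # Cumulative, normalized boundaries, e.g. [0.8, 0.2] -> [0.0, 0.8, 1.0]
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw per row, in [0, 1)
        for i in range(len(weights)):
            if bounds[i] <= u < bounds[i + 1]:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard: rounding may leave the last bound just below 1.0
    return splits


train_rows, test_rows = random_split(range(11812), [0.8, 0.2], seed=123)
# Sizes land near 80% / 20% of 11,812; the exact counts depend on the draws
print(len(train_rows), len(test_rows))
```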

Inspect the stages of the best model selected by cross-validation.

In [14]:
bestModel = cvModel.bestModel
# Print each fitted stage of the winning PipelineModel
for stage in bestModel.stages:
  print(stage)

VectorAssembler_8615bc391e9c
RandomForestRegressionModel (uid=RandomForestRegressor_50848606e716) with 500 trees


Now, make predictions on the held-out `test_data`.

In [15]:
pred = cvModel.transform(test_data)
pred.select("MSRP","prediction").show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980| 32848.65647370881|
|28030| 32997.62070188599|
|30030| 32997.62070188599|
|32700| 36688.02773571044|
|29350|23696.932387098073|
|31890| 27920.70804196848|
|34980| 27920.70804196848|
| 2799|5218.1119426641135|
| 2827| 5315.114328650283|
| 3381| 6201.761439022404|
|24450|26685.425872616986|
|21050| 22931.39894293414|
| 2000|5727.3993522661785|
| 2181| 8674.453524146527|
| 2144| 5870.595039946907|
| 2265| 8693.249128277552|
|56780| 39587.94834692646|
|49440| 39567.74681121242|
|50640| 39567.74681121242|
|52640| 39567.74681121242|
+-----+------------------+
only showing top 20 rows
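`cvModel.transform` delegates to the best model found during cross-validation, and a random forest regressor's prediction for a row is the average of its individual trees' predictions. Rows whose features land in the same leaf of every tree (for example, rows with identical feature values) therefore receive exactly the same prediction, which likely explains repeated values such as 39567.74681121242 above. A plain-Python sketch of the averaging step, with three hypothetical single-split "trees" standing in for fitted decision trees:

```python
from statistics import mean
from typing import Callable, Dict, List

Features = Dict[str, float]
Tree = Callable[[Features], float]


def forest_predict(trees: List[Tree], features: Features) -> float:
    # Random forest regression: the prediction is the mean of the trees' outputs
    return mean(tree(features) for tree in trees)


# Hypothetical one-split "trees" on Engine HP; real trees are deeper and use all features
trees: List[Tree] = [
    lambda f: 25000.0 if f["Engine HP"] < 200 else 45000.0,
    lambda f: 20000.0 if f["Engine HP"] < 250 else 50000.0,
    lambda f: 30000.0 if f["Engine HP"] < 300 else 40000.0,
]
print(forest_predict(trees, {"Engine HP": 350}))  # 45000.0
# Different inputs that fall in the same leaves get the same prediction
print(forest_predict(trees, {"Engine HP": 210}) == forest_predict(trees, {"Engine HP": 240}))  # True
```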



Now, evaluate the model on the test set with four standard regression metrics: RMSE, MSE, MAE, and R².

In [17]:
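# Named `evaluator` to avoid shadowing Python's built-in eval(); the default metricName is "rmse"
evaluator = RegressionEvaluator(labelCol="MSRP")

# Override metricName per call to compute the other metrics on the same predictions
rmse = evaluator.evaluate(pred)
mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred, {evaluator.metricName: "r2"})

print("RMSE: %.3f" % rmse)
print("MSE: %.3f" % mse)
print("MAE: %.3f" % mae)
print("R2: %.3f" % r2)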

RMSE: 16498.020
MSE: 272184662.478
MAE: 8329.209
R2: 0.880
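For reference, the four metrics reported above have simple closed forms: MSE is the mean squared residual, RMSE its square root, MAE the mean absolute residual, and R² is 1 minus the residual sum of squares divided by the total sum of squares around the mean label. The plain-Python sketch below computes them from lists of labels and predictions; the function name and sample numbers are hypothetical, and it is meant to show the definitions, not to replace `RegressionEvaluator`.

```python
import math
from typing import Dict, Sequence


def regression_metrics(labels: Sequence[float], preds: Sequence[float]) -> Dict[str, float]:
    n = len(labels)
    residuals = [y - p for y, p in zip(labels, preds)]
    ss_res = sum(r * r for r in residuals)           # residual sum of squares
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)  # total sum of squares
    mse = ss_res / n
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": sum(abs(r) for r in residuals) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }


m = regression_metrics([3.0, 5.0, 7.0, 9.0], [2.5, 5.0, 7.5, 10.0])
print({k: round(v, 4) for k, v in m.items()})
# {'rmse': 0.6124, 'mse': 0.375, 'mae': 0.5, 'r2': 0.925}
```

As a sanity check on the output above, the square root of the reported MSE (272184662.478) is about 16498.020, matching the reported RMSE. RMSE is always at least as large as MAE, and the gap here (16,498 versus 8,329) suggests that a relatively small number of large errors contributes heavily to the squared-error metrics.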
