Task 1 - Install Spark, load required libraries, set environment variables, initiate Spark, load file

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
# Point Spark at the JDK and the Spark distribution installed by the setup cell above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# NOTE(review): assumes the tarball was extracted under /content (Colab's default working directory) - confirm.
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
import findspark
# Must run after the environment variables are set and before the pyspark imports below:
# findspark uses SPARK_HOME to make the pyspark package importable.
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# start from here
# Create (or reuse) a Spark session that runs locally; "local[*]" requests all available cores.
# Note: `sc` holds a SparkSession here, not a SparkContext.
spark_builder = SparkSession.builder.master("local[*]")
sc = spark_builder.getOrCreate()

In [None]:
# Prompt for data.csv only when it is not already in the working directory, so that
# re-running the notebook does not stop and wait for another upload.
# The return value (a dict of file names to raw file bytes) is stored in a variable so it
# is not echoed into the cell output.
if not os.path.exists('data.csv'):
    uploaded = files.upload()

In [None]:
# List the working directory to confirm data.csv is present before reading it.
!ls

data.csv  sample_data  spark-2.4.7-bin-hadoop2.7  spark-2.4.7-bin-hadoop2.7.tgz


In [None]:
# Read the uploaded CSV: the first row supplies the column names and Spark infers each column's type.
data = sc.read.csv(
    'data.csv',
    header=True,
    inferSchema=True,
)

Task 2 - Describe data, clean data

Note - If you are starting from this task, first run the cells of all previous tasks: select the first code cell below and choose Runtime > Run before.

In [None]:
# Show the column names and the types Spark inferred.
data.printSchema()
# Summary statistics (count, mean, stddev, min, max), converted to pandas and transposed
# so that each dataset column becomes one row of the displayed table.
summary_stats = data.describe().toPandas()
summary_stats.transpose()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [None]:
def replace(column, value):
    """Return `column` with every entry equal to `value` turned into null.

    Entries that differ from `value` are kept unchanged; everything else
    (including entries that are already null) comes out as null.
    """
    keep_original = column != value
    return when(keep_original, column).otherwise(lit(None))


# "N/A" is a placeholder string in Market Category; turn it into a real null so that the
# missing-value count below picks it up.
market_category = col("Market Category")
data = data.withColumn("Market Category", replace(market_category, "N/A"))

In [None]:
# For every column, count the rows whose value is NaN or null, and show the counts as one row.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(missing_counts).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [None]:
# Market Category is missing in 3742 rows (see the counts above), so drop the whole column
# rather than lose those rows; then drop the few remaining rows that contain any null.
data = data.drop('Market Category').na.drop()
# (rows, columns) left after cleaning.
remaining_shape = (data.count(), len(data.columns))
print(remaining_shape)

(11812, 15)


Task 3 - Create ML pipeline to predict price (column MSRP)

Note - If you are starting from this task, first run the cells of all previous tasks: select the first code cell below and choose Runtime > Run before.

In [None]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders', 'Number of Doors', 
                                       'highway MPG', 'city mpg', 'Popularity'], outputCol = 'Attributes')
regressor = RandomForestRegressor(featuresCol = 'Attributes', labelCol = 'MSRP')
pipeline = Pipeline(stages=[assembler, regressor])
pipeline.write().overwrite().save("pipeline")
!ls

data.csv  sample_data		     spark-2.4.7-bin-hadoop2.7.tgz
pipeline  spark-2.4.7-bin-hadoop2.7


Task 4 - Load pipeline, cross validation and hyperparameter tuning

Note - If you are starting from this task, first run the cells of all previous tasks: select the first code cell below and choose Runtime > Run before.

In [None]:
# NOTE: despite its name, `pipelineModel` is the unfitted Pipeline that was saved to disk,
# not a fitted PipelineModel. The name is kept so existing references keep working.
pipelineModel = Pipeline.load("pipeline")

# Take the numTrees param from the loaded pipeline's own regressor (its last stage), so the
# grid depends only on what was loaded from disk and not on a variable from another cell.
loaded_regressor = pipelineModel.getStages()[-1]
paramGrid = ParamGridBuilder() \
    .addGrid(loaded_regressor.numTrees, [100, 500]) \
    .build()
# The evaluator uses its default metric to rank the grid points.
# seed fixes how rows are assigned to folds, so the cross-validation result is repeatable.
crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP'),
                          numFolds=3,
                          seed=123)

Task 5 - Fit CV on training data, and predict test set

Note - If you are starting from this task, first run the cells of all previous tasks: select the first code cell below and choose Runtime > Run before.

In [None]:

# Hold out 20% of the rows for testing; the fixed seed keeps the split repeatable.
split_frames = data.randomSplit([0.8, 0.2], seed=123)
train_data, test_data = split_frames

# Fit the cross-validator on the training split only; the test split stays unseen.
cvModel = crossval.fit(train_data)

In [None]:
# The winning pipeline from cross-validation; print each of its fitted stages in order.
bestModel = cvModel.bestModel
for stage in bestModel.stages:
  print(stage)

VectorAssembler_968b6f5f17ac
RandomForestRegressionModel (uid=RandomForestRegressor_34882f084fb1) with 500 trees


In [None]:
# Predict prices for the held-out test set with the cross-validated model.
pred = cvModel.transform(test_data)

# Show the actual price (MSRP) next to the predicted price for the first rows.
comparison_cols = ["MSRP", "prediction"]
pred.select(*comparison_cols).show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980|33086.174570805124|
|28030| 33021.34976306812|
|30030| 33021.34976306812|
|32700| 37030.55473131784|
|29350|23906.975271233478|
|31890|27567.168327076528|
|34980|27567.168327076528|
| 2799| 4999.192826374668|
| 2827| 5248.435283769505|
| 3381| 6089.018933711963|
|24450|26572.770116488176|
|21050|23339.154862056403|
| 2000| 5838.813653980177|
| 2181| 7919.398370644249|
| 2144| 5871.385720763212|
| 2265| 7980.795155637073|
|56780| 39820.59078192502|
|49440|39786.572676804324|
|50640|39786.572676804324|
|52640|39786.572676804324|
+-----+------------------+
only showing top 20 rows



Task 6 - Evaluate model with RMSE, MSE, MAE and R2 metrics on test predictions

Note - If you are starting from this task, first run the cells of all previous tasks: select the first code cell below and choose Runtime > Run before.

In [None]:
# RegressionEvaluator is already imported in the notebook's import cell, so it is not
# imported again here. The evaluator is named `evaluator` rather than `eval` so that
# Python's built-in eval() is not shadowed for the rest of the session.
evaluator = RegressionEvaluator(labelCol="MSRP", predictionCol="prediction")

# Each metric is requested by name, including RMSE, instead of relying on the evaluator's
# default metric; the param-map argument overrides metricName for that single call.
rmse = evaluator.evaluate(pred, {evaluator.metricName: "rmse"})
mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred, {evaluator.metricName: "r2"})
# Root Mean Square Error
print("RMSE: %.3f" % rmse)
# Mean Square Error
print("MSE: %.3f" % mse)
# Mean Absolute Error
print("MAE: %.3f" % mae)
# r2 - coefficient of determination
print("r2: %.3f" % r2)



RMSE: 16643.009
MSE: 276989752.830
MAE: 8384.150
r2: 0.878
