Task 1 - Install Spark, load required libraries, set environment variables, initiate Spark, load file

In [37]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [38]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"
import findspark
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder


In [39]:
#start Spark connection
sc = SparkSession.builder.master('local[*]').getOrCreate()

In [40]:
#upload dataset to Google Colab
#files.upload()

In [41]:
!ls

'data (1).csv'	 spark-2.4.8-bin-hadoop2.7
 data.csv	 spark-2.4.8-bin-hadoop2.7.tgz
 pipeline	 spark-2.4.8-bin-hadoop2.7.tgz.1
 sample_data


# Load Data

In [42]:
#read data
data = sc.read.csv('data.csv', inferSchema=True, header=True)
data.show()

+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

# Scrub Data

In [43]:
#check schema
data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [44]:
#check statistics
data.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [45]:
#check null values
def replace(column, value):
  return when(column != value, column).otherwise(lit(None))

#try it out on the 'Market Category' column, replace all occurence of string value 'N/A' with 'None'
data = data.withColumn('Market Category', replace(col('Market Category'), 'N/A'))
data.show()

+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

In [46]:
#count number of missing values
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [47]:
#'Market Category' contains more than 30% in null values, we willd drop it
data = data.drop('Market Category')

In [48]:
#drop null
data = data.na.drop()

In [49]:
print((data.count(), len(data.columns)))

(11812, 15)


# Create Random Forest Pipline

In [50]:
#assemble all numeric columns into one vector of features
assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders', 'Number of Doors', 'highway MPG', 'city mpg', 'Popularity'],
                            outputCol='Attributes')

#create a regressor to predict car price
regressor = RandomForestRegressor(featuresCol='Attributes',
                                  labelCol='MSRP')

#create pipeline
pipeline = Pipeline(stages=[assembler, regressor])

#save pipeline
pipeline.write().overwrite().save('pipeline')
!ls

'data (1).csv'	 spark-2.4.8-bin-hadoop2.7
 data.csv	 spark-2.4.8-bin-hadoop2.7.tgz
 pipeline	 spark-2.4.8-bin-hadoop2.7.tgz.1
 sample_data


# Create a Crossvalidation for Hyperparameter Tuning

In [51]:
#load pipeline
pipelineModel = Pipeline.load('pipeline')

In [52]:
#build paramgrid
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees, [100, 500]).build()

In [53]:
#build crossvalidator
crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP'), #MSRP is the column we want to predict
                          numFolds=10)

# Train & Test Model

In [54]:
#train test split
train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)

In [55]:
#fit
cvModel = crossval.fit(train_data)

In [56]:
#extract best model and view all the stages of the pipeline that our data went through
bestModel = cvModel.bestModel
for x in range(len(bestModel.stages)):
  print(bestModel.stages[x])

VectorAssembler_6a85cffe3df4
RandomForestRegressionModel (uid=RandomForestRegressor_e1b5a447ddb8) with 500 trees


In [57]:
#transform the test set (use cvModel as it knows to pick the best model to use)
pred = cvModel.transform(test_data)
pred.select('MSRP', 'prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980| 33118.51355871024|
|28030| 33111.69644092475|
|30030| 33111.69644092475|
|32700| 36944.58128887873|
|29350| 23777.19840488744|
|31890|27425.207432530147|
|34980|27425.207432530147|
| 2799| 4710.279495735422|
| 2827| 4702.093674582832|
| 3381| 5795.900571510275|
|24450| 26963.53600785278|
|21050|23335.799453528663|
| 2000| 5881.886615086319|
| 2181| 7579.665887380805|
| 2144| 5866.794024208272|
| 2265| 7609.066010258098|
|56780| 39508.53715538059|
|49440| 39481.70065393558|
|50640| 39481.70065393558|
|52640| 39481.70065393558|
+-----+------------------+
only showing top 20 rows



# Evaluate Model

In [58]:
#evaluate
eval = RegressionEvaluator(labelCol='MSRP')

#get rmse
rmse = eval.evaluate(pred)

#get mse
mse = eval.evaluate(pred, {eval.metricName:'mse'})

#get mae
mae = eval.evaluate(pred, {eval.metricName:'mae'})

#get r2
r2 = eval.evaluate(pred, {eval.metricName:'r2'})

#print
print('RMSE: %3f' %rmse)
print('MSE: %3f' %mse)
print('MAE: %3f' %mae)
print('R2: %3f' %r2)

RMSE: 16506.042010
MSE: 272449422.844313
MAE: 8304.383455
R2: 0.880353
