Task 1 - Install Spark, load required libraries, set environment variables, initiate Spark, load file

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
import findspark
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder


In [None]:
from google.colab import files 
files.upload()

{}

In [None]:
sc =SparkSession.builder.master("local").getOrCreate()

In [None]:
!ls

data.csv     spark-2.4.7-bin-hadoop2.7	    spark-2.4.7-bin-hadoop2.7.tgz.1
sample_data  spark-2.4.7-bin-hadoop2.7.tgz


In [None]:
df = sc.read.csv('data.csv' , inferSchema = True ,header = True)

In [None]:
df.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [None]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


Replace values of 'N/A' to None because nan values are expressed in string format 

In [None]:
def replace(column ,value ):
  return when(column != value ,column).otherwise(lit(None))
df = df.withColumn("Market category" , replace(col("Market Category") , "N/A"))  

In [None]:
df.select([count(when(isnan(c) | col(c).isNull() ,c)).alias(c) for c in df.columns]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [None]:
# drop column of market category 
# drop na rows 
# show the len and count of dataframe 
df = df.drop('Market category')
df = df.na.drop()
print (df.count() , len(df.columns))

11812 15


## Random forest Pipeline 

### Vector of numeric features 

In [None]:
assembler = VectorAssembler(inputCols=['Year' , 'Engine HP' , 'Engine Cylinders' ,
                                       'Number of Doors' , 'highway MPG' , 'city mpg' ,
                                       'Popularity']
                            ,outputCol= 'Attributes')
regressor =RandomForestRegressor(featuresCol= 'Attributes' , labelCol='MSRP')

pipeline =Pipeline(stages= [assembler ,regressor])
pipeline.write().overwrite().save("pipeline")

In [None]:
# Make sure that pipeline folder exist
!ls

data.csv  sample_data		     spark-2.4.7-bin-hadoop2.7.tgz
pipeline  spark-2.4.7-bin-hadoop2.7  spark-2.4.7-bin-hadoop2.7.tgz.1


## HyperParameter Tuning 

In [None]:
## Load pipeline 
pipelineModel = Pipeline.load("pipeline")

In [None]:
paramgrid = ParamGridBuilder()\
.addGrid(regressor.numTrees , [100,500])\
.build()

crossval = CrossValidator(estimator=pipelineModel , 
                          estimatorParamMaps = paramgrid ,
                          evaluator = RegressionEvaluator(labelCol='MSRP'),
                          numFolds= 3)

## splitting data into training and testing data 

---

---





In [None]:
train_data , test_data = df.randomSplit([0.8,0.2] , seed = 123)
cvModel = crossval.fit(train_data)

In [None]:
bestModel = cvModel.bestModel
for x in range(len(bestModel.stages)):
  print(bestModel.stages[x])

VectorAssembler_8a6bf3dff02c
RandomForestRegressionModel (uid=RandomForestRegressor_387e798c0ec4) with 100 trees


## Testing our model 

In [None]:
pred = cvModel.transform(test_data)
pred.select('MSRP' , 'prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980|33138.404083102156|
|28030| 33271.19296861404|
|30030| 33271.19296861404|
|32700|36842.402375035046|
|29350|23656.588891648218|
|31890|27863.327455186296|
|34980|27863.327455186296|
| 2799| 4912.678628412443|
| 2827| 5075.523903858093|
| 3381| 5685.350294688414|
|24450|26168.963067738583|
|21050|22178.078253991756|
| 2000| 4827.894962818531|
| 2181| 6261.377916348775|
| 2144| 4820.591928045666|
| 2265| 6333.294017194229|
|56780| 39869.53647074223|
|49440| 39869.53647074223|
|50640| 39869.53647074223|
|52640| 39869.53647074223|
+-----+------------------+
only showing top 20 rows



In [None]:
eval =RegressionEvaluator(labelCol="MSRP")
r2 = eval.evaluate(pred,{eval.metricName:"r2"})