<a href="https://colab.research.google.com/github/FuadKhalit/Pyspark_Repo/blob/main/Car_PricePrediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Install the Dependencies**


In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

# **Import Library**

In [4]:
import os
# Point findspark/pyspark at the JDK and Spark distribution installed above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
import findspark
# Must run before any pyspark import: it makes SPARK_HOME's Python libs importable.
findspark.init()
from google.colab import files  # used only by the commented-out local-upload option
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [5]:
# Create (or reuse) a local Spark session using all available cores.
sc = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# **Load Dataset**

In [6]:
# Load the CSV from Google Drive. Mount Drive first when it is not already
# mounted; otherwise this cell fails on a fresh Colab runtime.
import os
file_path = '/content/drive/MyDrive/CarFeatureMSRP.csv'
if not os.path.isdir('/content/drive/MyDrive'):
    from google.colab import drive
    drive.mount('/content/drive')
car = sc.read.format('csv').option('header', True).option('inferSchema', True).load(file_path)
car.show()

# Alternative: upload the file directly from the local machine
#files.upload()
#car = sc.read.csv('CarFeatureMSRP.csv',inferSchema=True, header=True)

+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

# **Data Exploration**

In [7]:
# Print the inferred column types, then display summary statistics
# transposed so that each column becomes one row.
car.printSchema()
car.describe().toPandas().T

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [8]:
def replace(column, value):
  """Return `column` with every entry equal to `value` turned into null.

  Entries that are already null stay null.
  """
  return when(column == value, lit(None)).otherwise(column)

# Treat the placeholder string "N/A" in Market Category as a real null,
# so the missing-value count below includes it.
car = car.withColumn("Market Category", replace(col("Market Category"), "N/A"))

In [9]:
# For every column, count the rows whose value is null or NaN.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in car.columns
]
car.select(missing_counts).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [10]:
# Market Category has by far the most nulls, so drop the whole column;
# then drop any row that still has a null in a remaining column.
car = car.drop("Market Category").na.drop()
print((car.count(), len(car.columns)))

(11812, 15)


This prints the number of rows (11,812) and columns (15) that remain after dropping the Market Category column and every row with a missing value.

# **Random Forest Pipeline**

In [11]:
# Pack the numeric feature columns into one vector column ('Attributes')
# for the regressor. String columns (Make, Model, Transmission Type, ...)
# are not used by this model.
assembler = VectorAssembler(
    inputCols=['Year', 'Engine HP', 'Engine Cylinders',
               'Number of Doors', 'highway MPG', 'city mpg', 'Popularity'],
    outputCol='Attributes',
)

# Explicit seed: PySpark's default seed is hash() of the class name, and
# Python salts str hashes per process, so without it the forest (and the
# reported metrics) change between sessions.
regressor = RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP', seed=123)

pipeline = Pipeline(stages=[assembler, regressor])

# Save the unfitted pipeline to the folder 'pipeline', overwriting any earlier save.
pipeline.write().overwrite().save("pipeline")

In [12]:
# List the working directory to confirm that the saved 'pipeline' folder exists.
!ls

drive	  sample_data		     spark-2.4.7-bin-hadoop2.7.tgz
pipeline  spark-2.4.7-bin-hadoop2.7


In [16]:
# Reload the saved pipeline and set up 3-fold cross-validation over numTrees.
# NOTE: despite its name, `pipelineModel` is an unfitted Pipeline (an estimator).
pipelineModel = Pipeline.load("pipeline")
# Build the grid from the regressor stage of the *loaded* pipeline, so the
# tuned param belongs to the estimator that is actually being fitted.
loaded_regressor = pipelineModel.getStages()[-1]
paramgrid = ParamGridBuilder() \
  .addGrid(loaded_regressor.numTrees, [100, 500]) \
  .build()
crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps=paramgrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP', metricName='rmse'),
                          numFolds=3,
                          seed=123)  # fixed seed -> reproducible fold assignment

In [18]:
# Hold out 20% of the rows for final evaluation (fixed seed for a repeatable
# split), then run cross-validation on the remaining 80%.
train_data, test_data = car.randomSplit(weights=[0.8, 0.2], seed=123)
cvModel = crossval.fit(dataset=train_data)


In [19]:
# Print each stage of the best pipeline chosen by cross-validation;
# the random-forest stage reports the selected number of trees.
bestModel = cvModel.bestModel
for stage in bestModel.stages:
  print(stage)

VectorAssembler_e65f5a8341b9
RandomForestRegressionModel (uid=RandomForestRegressor_1e30b8d48b91) with 100 trees


In [20]:
# Score the held-out rows with the best model and show actual vs. predicted MSRP.
pred = cvModel.transform(test_data)
pred.select(['MSRP', 'prediction']).show(20)

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980|31337.212306144535|
|28030|31337.212306144535|
|30030|31337.212306144535|
|32700| 37053.33112287236|
|29350|23021.655422945656|
|31890|28174.999414337697|
|34980|28174.999414337697|
| 2799| 6178.003976382511|
| 2827| 5565.348544467466|
| 3381| 7165.369213043112|
|24450|27376.959152733936|
|21050| 22754.97989433297|
| 2000|  5130.32252284835|
| 2181| 9420.359314829693|
| 2144| 5240.749645039148|
| 2265| 9449.533218132121|
|56780|40631.630199128966|
|49440| 40702.10939543178|
|50640| 40702.10939543178|
|52640| 40702.10939543178|
+-----+------------------+
only showing top 20 rows



In [21]:
# Score the held-out predictions with several regression metrics.
# Named `evaluator`, not `eval`, so the Python builtin eval() is not shadowed.
evaluator = RegressionEvaluator(labelCol='MSRP', predictionCol='prediction')
rmse = evaluator.evaluate(pred, {evaluator.metricName: "rmse"})
mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
rsquare = evaluator.evaluate(pred, {evaluator.metricName: "r2"})

print("RMSE: %.3f" % rmse)
print("MSE: %.3f" % mse)
print("MAE: %.3f" % mae)
print("RSquare: %.3f" % rsquare)


RMSE: 16331.103
MSE: 266704919.950
MAE: 8203.649
RSquare: 0.883
