# Setting Up the Spark Environment

In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
import findspark

In [4]:
# NOTE(review): presumably makes the local Spark installation importable as
# `pyspark` before the pyspark imports below — confirm against the findspark docs.
findspark.init()

In [5]:
pip install matplotlib

Note: you may need to restart the kernel to use updated packages.


# Importing Libraries

In [6]:
import pandas as pd

In [7]:
import matplotlib.pyplot as plt

In [8]:
from pyspark.sql import SparkSession

In [9]:
from pyspark import SparkContext,SparkConf

# Spark Context and Session

In [10]:
from pyspark import SparkContext,SparkConf

In [11]:
## Creating (or reusing) a SparkSession on a local master using all cores.
## Note: despite the name `sc`, this object is a SparkSession, not a SparkContext.
sc=SparkSession.builder.master("local[*]").getOrCreate()
sc

# Data loading

In [12]:
# Read the CSV with its header row as column names and let Spark infer the
# column types, then preview the first five rows.
data = sc.read.csv(
    path="CarPricePrediction.csv",
    header=True,
    inferSchema=True,
)
data.show(5)

+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|   Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL|rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL|rear wheel drive|              2|  Luxury,Performance|     Compact|  Convertible

In [13]:
# Print each column's name, inferred type and nullability.
data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



# Statistical Analysis

In [14]:
# Summary statistics per column, flipped so each original column becomes a row.
data.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,09-Mar,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


# Data Cleaning

In [15]:
from pyspark.sql.functions import col,when,lit

In [16]:
def replace(column,value):
    """Return a column expression that nulls out entries equal to ``value``.

    Args:
        column: Spark Column expression to inspect.
        value: Placeholder value that should be turned into null.

    Returns:
        A Column that keeps ``column`` wherever it differs from ``value`` and
        yields null everywhere else.
    """
    keep_original = column != value
    null_literal = lit(None)
    return when(keep_original, column).otherwise(null_literal)

In [17]:
# Turn the "N/A" placeholder in Market Category into a real null.
data = data.withColumn(
    "Market Category",
    replace(data["Market Category"], "N/A"),
)

# Null Values

In [18]:
from pyspark.sql.functions import when,lit,count,isnan,col
# Count missing entries per column.
# NaN can only occur in floating-point columns, so isnan() is applied to
# float/double columns only; every other column is checked for null alone.
# This avoids implicitly casting string columns to double just to test for NaN.
float_cols = {name for name, dtype in data.dtypes if dtype in ("float", "double")}
data.select([
    count(
        when((isnan(col(c)) | col(c).isNull()) if c in float_cols else col(c).isNull(), c)
    ).alias(c)
    for c in data.columns
]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



# Dropping the null values

In [19]:
# Remove the Market Category column, then discard every row that still
# contains a null in any of the remaining columns.
data = data.drop("Market Category").dropna()

In [20]:
# Report the (row count, column count) of the cleaned data.
n_rows = data.count()
n_cols = len(data.columns)
print((n_rows, n_cols))

(11812, 15)


# Feature Vectors in Spark MLlib

In [21]:
from pyspark.ml.feature import VectorAssembler
# Pack the numeric predictor columns into one vector column for the regressor.
assembler = VectorAssembler(
    outputCol="Input Attributes",
    inputCols=[
        "Year",
        "Engine HP",
        "Engine Cylinders",
        "Number of Doors",
        "Popularity",
        "highway MPG",
        "city mpg",
    ],
)

# Model and Pipeline

In [22]:
from pyspark.ml.regression import RandomForestRegressor
# Random forest predicting MSRP from the assembled feature vector.
# The seed is fixed (matching the train/test split seed) so that the randomly
# built trees, and therefore the reported metrics, are reproducible across runs.
regressor = RandomForestRegressor(labelCol = "MSRP",featuresCol = 'Input Attributes',seed = 123)

# Pipeline

In [23]:
from pyspark.ml import Pipeline
# Chain the feature assembler and the regressor into one (still unfitted) pipeline.
pipeline = Pipeline().setStages([assembler, regressor])
# Persist the pipeline definition to disk, replacing any earlier copy.
pipeline.write().overwrite().save("pipeline_saved_model")

# Loading the Pipeline

In [24]:
from pyspark.ml.regression import LinearRegression


In [25]:
# Reload the pipeline definition saved earlier. Despite the variable name, this
# is loaded via Pipeline.load and is fitted later with .fit(), i.e. it is the
# unfitted Pipeline rather than a trained model.
pipelineModel = Pipeline.load('./pipeline_saved_model')

# Splitting the data

In [26]:
# 80/20 train/test split; the fixed seed keeps the split repeatable.
data_train, data_test = data.randomSplit(weights=[0.8, 0.2], seed=123)

# Training the Pipeline

In [30]:
# Fit the loaded pipeline on the training split; the fitted result is kept in `Model`.
Model = pipelineModel.fit(data_train)

# Prediction

In [31]:
# Apply the fitted pipeline to the held-out test split.
pred = Model.transform(data_test)

# Model Evaluation

In [34]:
from pyspark.ml.evaluation import RegressionEvaluator
# Named `evaluator` rather than `eval` so that Python's builtin eval() is not
# shadowed for the rest of the notebook.
evaluator = RegressionEvaluator(labelCol = 'MSRP')
# Score the test-set predictions with three metrics by overriding metricName per call.
rmse = evaluator.evaluate(pred, {evaluator.metricName:'rmse'})
r2 = evaluator.evaluate(pred, {evaluator.metricName:'r2'})
mae = evaluator.evaluate(pred, {evaluator.metricName:'mae'})

# Printing the Result

In [35]:
# Report the three evaluation metrics, each rounded to two decimals.
print("RMSE: %.2f\nMAE: %.2f\nR2: %.2f" % (rmse, mae, r2))

RMSE: 38413.23
MAE: 9696.86
R2: 0.75
