Task 1 - Install Spark, load required libraries, set environment variables, initiate Spark, load file

In [2]:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

# MODULES, CONTEXT, AND PATHING
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder \
        .master("local") \
        .appName("mllib_classifier") \
        .getOrCreate()
sc = spark.sparkContext

In [27]:
# DATA IMPORT
df = spark.read.csv("data.csv", header=True, 
                    inferSchema=True)

In [28]:
df.show(5)

+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|   Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL|rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL|rear wheel drive|              2|  Luxury,Performance|     Compact|  Convertible

In [29]:
df.printSchema()


root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [30]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Make,11914,,,Acura,Volvo
Model,11914,745.5822222222222,1490.8280590623795,1 Series,xD
Year,11914,2010.384337753903,7.5797398875957995,1990,2017
Engine Fuel Type,11911,,,diesel,regular unleaded
Engine HP,11845,249.38607007176023,109.19187025917194,55,1001
Engine Cylinders,11884,5.628828677213059,1.78055934824622,0,16
Transmission Type,11914,,,AUTOMATED_MANUAL,UNKNOWN
Driven_Wheels,11914,,,all wheel drive,rear wheel drive
Number of Doors,11908,3.4360933825999327,0.8813153865835529,2,4


In [31]:
from pyspark.sql.functions import col, isnan

df.select([count(when(isnan(col(c)) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|              0|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [32]:
# Replace null string values with none

from pyspark.sql.functions import when, lit

def replace(column, value):
    return when(column != value, column).otherwise(lit(None))

for col, dtype in df.dtypes:
    if dtype == 'string':
        df = df.withColumn(col, replace(df[col], "None"))


In [33]:
from pyspark.sql.functions import col, isnan

df.select([count(when(isnan(col(c)) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|              0|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



In [25]:
df.dtypes

[('Make', 'string'),
 ('Model', 'string'),
 ('Year', 'int'),
 ('Engine Fuel Type', 'string'),
 ('Engine HP', 'int'),
 ('Engine Cylinders', 'int'),
 ('Transmission Type', 'string'),
 ('Driven_Wheels', 'string'),
 ('Number of Doors', 'int'),
 ('Market Category', 'string'),
 ('Vehicle Size', 'string'),
 ('Vehicle Style', 'string'),
 ('highway MPG', 'int'),
 ('city mpg', 'int'),
 ('Popularity', 'int'),
 ('MSRP', 'int')]

In [34]:
df = df.drop('Market Category')
df = df.na.drop()

In [37]:
print((df.count(), len(df.columns)))

(11812, 15)


In [41]:
# PIPELINE

assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 
                                       'Engine Cylinders', 
                                       'Number of Doors', 
                                       'highway MPG', 'city mpg', 
                                       'Popularity'], 
                                       outputCol='features')


regressor = RandomForestRegressor(featuresCol='features',
                                    labelCol='MSRP')


pipeline = Pipeline(stages=[assembler, regressor])

pipeline.write().overwrite().save("pipeline")

In [42]:
pipelineModel = Pipeline.load("pipeline")

In [43]:
paramGrid = ParamGridBuilder() \
    .addGrid(regressor.maxDepth, [2, 4, 6]) \
    .addGrid(regressor.maxBins, [20, 60]) \
    .addGrid(regressor.numTrees, [5, 20]) \
    .build()



In [45]:
crossval = CrossValidator(estimator=pipelineModel,
                            estimatorParamMaps=paramGrid,
                            evaluator=RegressionEvaluator(labelCol='MSRP'),
                            numFolds=3)

(trainingData, testData) = df.randomSplit([0.8, 0.2], seed=42)

cvModel = crossval.fit(trainingData)



In [52]:
bestModel = cvModel.bestModel

for x in range(len(bestModel.stages)):
    print(bestModel.stages[x])

VectorAssembler_be44e6a9edfb
RandomForestRegressionModel: uid=RandomForestRegressor_8fe897248263, numTrees=5, numFeatures=7


In [47]:
predictions = cvModel.transform(testData)

predictions.select("prediction", "MSRP", "features").show(5)



+------------------+-----+--------------------+
|        prediction| MSRP|            features|
+------------------+-----+--------------------+
|32968.902106967114|28030|[2002.0,225.0,6.0...|
|  39415.0326957185|30550|[2003.0,260.0,6.0...|
|  39415.0326957185|32700|[2003.0,260.0,6.0...|
|25801.847581796723|27900|[2016.0,201.0,4.0...|
|25801.847581796723|27990|[2017.0,201.0,4.0...|
+------------------+-----+--------------------+
only showing top 5 rows



In [53]:
evaluator = RegressionEvaluator(labelCol='MSRP')

rmse = evaluator.evaluate(predictions)

mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("Mean Squared Error (MSE) on test data = %g" % mse)
print("Mean Absolute Error (MAE) on test data = %g" % mae)
print("R-squared (R2) on test data = %g" % r2)




Root Mean Squared Error (RMSE) on test data = 23682.6
Mean Squared Error (MSE) on test data = 5.60867e+08
Mean Absolute Error (MAE) on test data = 8502.48
R-squared (R2) on test data = 0.839937


In [50]:
rfModel = cvModel.bestModel.stages[1]

print(rfModel)


RandomForestRegressionModel: uid=RandomForestRegressor_8fe897248263, numTrees=5, numFeatures=7


In [51]:
print(rfModel.featureImportances)

(7,[0,1,2,3,4,5,6],[0.0849967946475923,0.3042879149452976,0.30244955177869476,0.007127274313479691,0.08609987135683908,0.15066756174545018,0.0643710312126464])
