# **Machine Learning Pipelines in PySpark MLlib**

Predict car prices in PySpark using a Random Forest model.

Dataset:



**Install libraries, packages and datasets.**

In [2]:
# Install PySpark

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=7de131152b83c0895a589f3c21d3399073cef8c906092971c1a69c9b91623b71
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [55]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Initialize Spark session.

In [6]:
sc=SparkSession.builder.master('local[*]').getOrCreate()

Import the dataset.

In [7]:
data=sc.read.csv('/content/drive/MyDrive/Colab Notebooks/Spark/ML Pipelines in PySpark MLlib/data.csv', inferSchema=True, header=True)

See the schema.

In [10]:
data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [16]:
# Check 10 rows of data.

data.show(10)

+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|   Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL|rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL|rear wheel drive|              2|  Luxury,Performance|     Compact|  Convertible

See the summary statistics.

In [9]:
# Use the toPandas & transpose functions to generate a Pandas transposed summary table.

data.describe().toPandas().transpose()

| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| summary | count | mean | stddev | min | max |
| Make | 11914 | | | Acura | Volvo |
| Model | 11914 | 745.5822222222222 | 1490.8280590623795 | 1 Series | xD |
| Year | 11914 | 2010.384337753903 | 7.5797398875957995 | 1990 | 2017 |
| Engine Fuel Type | 11911 | | | diesel | regular unleaded |
| Engine HP | 11845 | 249.38607007176023 | 109.19187025917194 | 55 | 1001 |
| Engine Cylinders | 11884 | 5.628828677213059 | 1.78055934824622 | 0 | 16 |
| Transmission Type | 11914 | | | AUTOMATED_MANUAL | UNKNOWN |
| Driven_Wheels | 11914 | | | all wheel drive | rear wheel drive |
| Number of Doors | 11908 | 3.4360933825999327 | 0.8813153865835529 | 2 | 4 |


Missing values in Market Category are stored as the string "N/A". We replace them with nulls so that PySpark treats them as missing.

In [11]:
# Replace string NA values with nulls.

def replace(column, value):
    # Keep the cell unless it equals the placeholder; otherwise return null.
    return when(column != value, column).otherwise(lit(None))

data=data.withColumn('Market Category', replace(col('Market Category'), "N/A"))
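
The `when(...).otherwise(...)` expression follows SQL's three-valued logic: comparing a null with `"N/A"` yields null rather than true, so cells that are already null fall through to `otherwise` and stay null. The plain-Python sketch below mimics that behavior for single cells, with `None` standing in for null. The helper `replace_with_null` and the sample cell values are illustrative only and are not part of PySpark or the notebook.

```python
def replace_with_null(cell, marker):
    """Mimic when(col != marker, col).otherwise(None) for a single cell."""
    # In SQL, `NULL != marker` evaluates to NULL, which `when` treats as not true.
    condition = None if cell is None else (cell != marker)
    return cell if condition is True else None

# Illustrative cells: a real category, the placeholder, and an existing null.
cells = ["Luxury,Performance", "N/A", None, "Luxury"]
cleaned = [replace_with_null(c, "N/A") for c in cells]
print(cleaned)  # ['Luxury,Performance', None, None, 'Luxury']
```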

Check how many null values there are in the dataset.

In [13]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+



Market Category has a large number of missing values, so we check what share of its values is missing.

In [35]:
# Total rows in the dataset.
data.count()

11914

In [32]:
# Total number of missing Market Category values.
data.select([count(when(col('Market Category').isNull(),True))]).show()


+--------------------------------------------------------+
|count(CASE WHEN (Market Category IS NULL) THEN true END)|
+--------------------------------------------------------+
|                                                    3742|
+--------------------------------------------------------+



In [47]:
# Fraction of missing values per column (0.31 means 31%).

amount_missing_df=data.select([(count(when(isnan(c) | col(c).isNull(), c))/count(lit(1))).alias(c) for c in data.columns])
amount_missing_df.show()

+----+-----+----+--------------------+--------------------+--------------------+-----------------+-------------+--------------------+-------------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|    Engine Fuel Type|           Engine HP|    Engine Cylinders|Transmission Type|Driven_Wheels|     Number of Doors|    Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+--------------------+--------------------+--------------------+-----------------+-------------+--------------------+-------------------+------------+-------------+-----------+--------+----------+----+
| 0.0|  0.0| 0.0|2.518045996306866E-4|0.005791505791505791|0.002518045996306866|              0.0|          0.0|5.036091992613732E-4|0.31408427060600974|         0.0|          0.0|        0.0|     0.0|       0.0| 0.0|
+----+-----+----+--------------------+--------------------+--------------------+-----------------+-------------+--------------------+-------------------+------------+-------------+-----------+--------+----------+----+
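
Each fraction in this table is a column's null count divided by the total row count. As a quick cross-check, the sketch below recomputes the non-zero fractions in plain Python from the numbers printed earlier: 11914 rows and the per-column null counts from the null-count table.

```python
# Null counts copied from the null-count table above.
null_counts = {
    "Engine Fuel Type": 3,
    "Engine HP": 69,
    "Engine Cylinders": 30,
    "Number of Doors": 6,
    "Market Category": 3742,
}
total_rows = 11914  # result of data.count()

# Fraction of missing values per column.
missing_fraction = {name: n / total_rows for name, n in null_counts.items()}

# Print the columns from most to least affected, formatted as percentages.
for name, frac in sorted(missing_fraction.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name}: {frac:.2%}")
```

Market Category comes out at roughly 31.4%, while every other column is below 1%.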

About 31% of the Market Category values are missing. Imputing that many values with the mode of a categorical column would add little information, so we drop the column and then remove the remaining rows that contain nulls.

In [50]:
data=data.drop('Market Category')
data=data.na.drop()

print((data.count(), len(data.columns)))

(11812, 15)


# **Create a Random Forest Pipeline**

In [54]:
assembler=VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders', 'Number of Doors', 'highway MPG', 'city mpg', 'Popularity'], 
                          outputCol='Attributes')
regressor=RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP')

pipeline=Pipeline(stages=[assembler, regressor])
pipeline.write().overwrite().save('pipeline')
!ls

drive  pipeline  sample_data
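
Conceptually, `VectorAssembler` concatenates the selected numeric columns of each row into a single feature vector, which the regressor then reads from the `Attributes` column. The plain-Python sketch below imitates that step for one row, using the first row shown by `data.show(10)`. The function `assemble` is a hypothetical stand-in, not a PySpark API. It raises on missing values, which mirrors the assembler's default `handleInvalid='error'` setting and is one reason to drop rows with nulls beforehand.

```python
FEATURE_COLS = ['Year', 'Engine HP', 'Engine Cylinders', 'Number of Doors',
                'highway MPG', 'city mpg', 'Popularity']

def assemble(row, input_cols):
    """Concatenate the chosen columns of one row into a list of floats."""
    vector = []
    for name in input_cols:
        value = row[name]
        if value is None:
            raise ValueError(f"null value in column {name!r}")
        vector.append(float(value))
    return vector

# First row from data.show(10); columns outside FEATURE_COLS are ignored.
row = {'Make': 'BMW', 'Model': '1 Series M', 'Year': 2011, 'Engine HP': 335,
       'Engine Cylinders': 6, 'Number of Doors': 2, 'highway MPG': 26,
       'city mpg': 19, 'Popularity': 3916, 'MSRP': 46135}

print(assemble(row, FEATURE_COLS))
# [2011.0, 335.0, 6.0, 2.0, 26.0, 19.0, 3916.0]
```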


# **Create a cross validator for hyperparameter tuning**

In [58]:
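# Reload the saved, unfitted pipeline. Its stages keep the uids of the originals,
# so the grid built on `regressor.numTrees` still applies to the loaded regressor.
loadedPipeline=Pipeline.load('pipeline')
paramGrid=ParamGridBuilder() \
          .addGrid(regressor.numTrees, [100, 500]) \
          .build()

# RegressionEvaluator defaults to RMSE, so candidate models are ranked by RMSE.
crossval=CrossValidator(estimator=loadedPipeline,
                        estimatorParamMaps=paramGrid,
                        evaluator=RegressionEvaluator(labelCol='MSRP'),
                        numFolds=3)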
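
`CrossValidator` trains one model per parameter combination per fold, then refits the best combination on the full training set. The plain-Python sketch below enumerates those training runs for the grid above (two `numTrees` values, three folds). The fold assignment by shuffling is a simplification for illustration and not Spark's exact procedure, and `k_fold_splits` is a hypothetical helper.

```python
import random
from itertools import product

def k_fold_splits(n_rows, num_folds, seed=123):
    """Return a list of (train_idx, valid_idx); each row is validated exactly once."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::num_folds] for i in range(num_folds)]
    splits = []
    for i, valid in enumerate(folds):
        train = [j for k, fold in enumerate(folds) if k != i for j in fold]
        splits.append((train, valid))
    return splits

param_grid = [{'numTrees': 100}, {'numTrees': 500}]
num_folds = 3
splits = k_fold_splits(n_rows=12, num_folds=num_folds)

# One training run per (parameter combination, fold) pair ...
runs = list(product(param_grid, range(num_folds)))
# ... plus one final refit of the best combination on all training rows.
total_fits = len(runs) + 1
print(total_fits)  # 7
```

Every extra grid value adds `numFolds` more fits, so a small grid keeps the tuning time manageable.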

# **Train model and generate predictions**

In [59]:
train_data, test_data=data.randomSplit([0.8,0.2], seed=123)
cvModel=crossval.fit(train_data)
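
`randomSplit` assigns rows to the splits by random sampling rather than by cutting at an exact position, so the 80/20 proportions hold only approximately, and the `seed` makes the assignment repeatable for the same input. A minimal plain-Python sketch of that idea follows; `random_split` is a hypothetical helper and not Spark's implementation.

```python
import random

def random_split(rows, train_weight, seed):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1000))
train, test = random_split(rows, train_weight=0.8, seed=123)

# Sizes land close to 800 and 200 but are usually not exact.
print(len(train), len(test))
```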

In [61]:
bestModel=cvModel.bestModel
for stage in bestModel.stages:
    print(stage)

VectorAssembler_72616384d573
RandomForestRegressionModel: uid=RandomForestRegressor_fbbf552edff1, numTrees=100, numFeatures=7


In [63]:
pred=cvModel.transform(test_data)
pred.select('MSRP', 'prediction').show()

+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|28030|31936.890663075996|
|30550| 37620.33025306393|
|29350| 28083.28165492313|
|27900| 27386.61754491371|
|34890| 27386.61754491371|
|32990| 27386.61754491371|
| 2827| 5281.016249484541|
| 3000| 5281.016249484541|
| 3086| 5487.674571093254|
| 3130| 5487.674571093254|
| 3012| 5286.568754407562|
| 3622|  6344.62306670122|
|22300|23705.812630705786|
|19400|22253.565282897704|
| 2042| 5682.278399153643|
| 2144| 5132.404322379251|
|49440|39801.327587435866|
|52640|39801.327587435866|
|47440| 39699.71096440099|
|58400| 39895.67628400238|
+-----+------------------+
only showing top 20 rows



# **Evaluate model performance**

In [65]:
# Performance metrics

# Named `evaluator` to avoid shadowing Python's built-in eval().
evaluator=RegressionEvaluator(labelCol='MSRP')
rmse=evaluator.evaluate(pred)  # the default metric is RMSE
mse=evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae=evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r2=evaluator.evaluate(pred, {evaluator.metricName: "r2"})

print("RMSE: %.3f" % rmse)
print("MSE: %.3f" % mse)
print("MAE: %.3f" % mae)
print("R2: %.3f" % r2)

RMSE: 36032.887
MSE: 1298368909.682
MAE: 9745.590
R2: 0.782
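
For reference, the four metrics can be written out directly. The sketch below implements them in plain Python and applies them to the first four rows of the prediction table above, with predictions rounded to two decimals, so its numbers describe only those rows and not the full test set. The function names are illustrative.

```python
from math import sqrt

def mse(y, y_hat):
    """Mean squared error."""
    return sum((a - b) ** 2 for a, b in zip(y, y_hat)) / len(y)

def rmse(y, y_hat):
    """Root mean squared error: the square root of MSE."""
    return sqrt(mse(y, y_hat))

def mae(y, y_hat):
    """Mean absolute error."""
    return sum(abs(a - b) for a, b in zip(y, y_hat)) / len(y)

def r2(y, y_hat):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y) / len(y)
    ss_res = sum((a - b) ** 2 for a, b in zip(y, y_hat))
    ss_tot = sum((a - mean_y) ** 2 for a in y)
    return 1 - ss_res / ss_tot

# First four rows of pred.select('MSRP', 'prediction').show()
msrp = [28030, 30550, 29350, 27900]
prediction = [31936.89, 37620.33, 28083.28, 27386.62]

print(f"RMSE: {rmse(msrp, prediction):.3f}")
print(f"MSE: {mse(msrp, prediction):.3f}")
print(f"MAE: {mae(msrp, prediction):.3f}")
print(f"R2: {r2(msrp, prediction):.3f}")
```

RMSE is the square root of MSE, which agrees with the reported values: the square root of 1298368909.682 is about 36032.887. RMSE can never be smaller than MAE, and the wide gap here (about 36033 versus 9746) suggests that a small number of very large errors, plausibly on expensive cars, dominates the squared-error metrics.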
