Features of MLlib
Spark provides a dedicated package, MLlib, for machine learning tasks. Some of its features are listed below:

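MLlib provides an API similar to that of other machine learning packages, such as scikit-learn.
MLlib supports most common machine learning tasks, including classification, regression, clustering, and collaborative filtering.
It also provides feature transformers for text (for example Tokenizer, HashingTF, and Word2Vec), which support natural language processing; computer vision workloads generally rely on additional libraries.
It can be used in near-real-time applications, for example together with Spark's streaming APIs.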


Car Price Prediction Pipeline with Spark

This notebook demonstrates the process of creating a data pipeline using Apache Spark to predict car prices. The pipeline includes data preprocessing, feature engineering, and model training.


#Setup and Imports



In [1]:
!pip install pyspark
!pip install findspark
import findspark
findspark.init()



In [2]:
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, count, isnan, isnull, col
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator


In [3]:
# Create a SparkSession (the variable is named sc, but it is a SparkSession rather than a SparkContext)
sc = SparkSession.builder.master("local[*]").getOrCreate()

#Load and Inspect Data

In [4]:
data = sc.read.csv('data.csv', inferSchema = True, header = True)
data.show(5)

+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|   Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL|rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL|rear wheel drive|              2|  Luxury,Performance|     Compact|  Convertible

In [5]:
data.printSchema()


root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)



In [6]:
data.describe().toPandas().transpose()


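+-----------------+-----+------------------+------------------+----------------+----------------+
|summary          |count|mean              |stddev            |min             |max             |
+-----------------+-----+------------------+------------------+----------------+----------------+
|Make             |11914|                  |                  |Acura           |Volvo           |
|Model            |11914|745.5822222222222 |1490.8280590623795|1 Series        |xD              |
|Year             |11914|2010.384337753903 |7.5797398875957995|1990            |2017            |
|Engine Fuel Type |11911|                  |                  |diesel          |regular unleaded|
|Engine HP        |11845|249.38607007176023|109.19187025917194|55              |1001            |
|Engine Cylinders |11884|5.628828677213059 |1.78055934824622  |0               |16              |
|Transmission Type|11914|                  |                  |AUTOMATED_MANUAL|UNKNOWN         |
|Driven_Wheels    |11914|                  |                  |all wheel drive |rear wheel drive|
|Number of Doors  |11908|3.4360933825999327|0.8813153865835529|2               |4               |
+-----------------+-----+------------------+------------------+----------------+----------------+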


#Data Preprocessing

In [7]:
def replace(column, value):
    # Keep the column's value unless it equals `value`, in which case return null
    return when(column!=value,column).otherwise(lit(None))

In [8]:
data = data.withColumn("Market Category", replace(col("Market Category"),"N/A"))
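
The behaviour of the `replace` helper can be sketched in plain Python. This is an analogue for illustration, not Spark code: the function name `replace_with_none` and the sample values are assumptions. It also shows a subtlety of SQL semantics: a null compared with `!=` does not evaluate to true, so rows that are already null stay null.

```python
def replace_with_none(value, sentinel):
    """Plain-Python analogue of when(column != sentinel, column).otherwise(None)."""
    # In Spark SQL, NULL != sentinel evaluates to NULL rather than True,
    # so rows that are already null also take the otherwise() branch.
    if value is not None and value != sentinel:
        return value
    return None

# Illustrative sample values, not taken from the dataset
sample = ["Luxury", "N/A", None, "Crossover"]
cleaned = [replace_with_none(v, "N/A") for v in sample]
print(cleaned)
```

After this step the literal string "N/A" is counted as missing by the null checks that follow.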


In [9]:
data.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in data.columns]).show()


+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               3|       69|              30|                0|            0|              6|           3742|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+---------------+------------+-------------+-----------+--------+----------+----+
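
To put the null counts in perspective, the short calculation below converts them to percentages of the 11914 rows reported by `describe()`. The numbers are copied from the outputs above; the variable names are illustrative.

```python
# Null counts copied from the output above; total_rows is the count reported by describe()
total_rows = 11914
null_counts = {
    "Engine Fuel Type": 3,
    "Engine HP": 69,
    "Engine Cylinders": 30,
    "Number of Doors": 6,
    "Market Category": 3742,
}

# Share of rows with a missing value, per column
missing_pct = {name: 100.0 * n / total_rows for name, n in null_counts.items()}

for name, pct in sorted(missing_pct.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name}: {pct:.2f}%")
```

Market Category is missing in a far larger share of rows than any other column, which is a plausible reason to drop the column as a whole rather than the rows that lack it.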



In [10]:
# Drop the Market Category column, which is null in 3742 rows
data = data.drop("Market Category")


In [11]:
# Drop every row that still contains a null value
data = data.na.drop()

In [12]:
data.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in data.columns]).show()


+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+------------+-------------+-----------+--------+----------+----+
|Make|Model|Year|Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|Driven_Wheels|Number of Doors|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity|MSRP|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+------------+-------------+-----------+--------+----------+----+
|   0|    0|   0|               0|        0|               0|                0|            0|              0|           0|            0|          0|       0|         0|   0|
+----+-----+----+----------------+---------+----------------+-----------------+-------------+---------------+------------+-------------+-----------+--------+----------+----+



In [13]:
print((data.count(), len(data.columns)))


(11812, 15)
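
A quick sanity check on the cleaning step, using only the counts shown in the outputs above (variable names are illustrative):

```python
rows_before = 11914            # count reported by describe() before cleaning
rows_after = 11812             # row count printed above
null_cells = 3 + 69 + 30 + 6   # nulls in the columns that remain after dropping Market Category

rows_dropped = rows_before - rows_after

# Fewer dropped rows than null cells means some rows had nulls in more than one column
extra_nulls_in_shared_rows = null_cells - rows_dropped

print(rows_dropped, extra_nulls_in_shared_rows)
```

Fewer rows were dropped than there were null cells, so some rows must have had nulls in more than one column. Less than 1% of the rows were lost.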


#Feature Engineering



In [14]:
# Encode categorical columns: StringIndexer maps each label to a numeric index,
# and OneHotEncoder turns that index into a sparse binary vector
categorical_cols = ['Make', 'Model', 'Engine Fuel Type', 'Transmission Type', 'Driven_Wheels', 'Vehicle Size', 'Vehicle Style']
# The loop variable is named c so that it does not shadow the imported col function
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_ohe") for c in categorical_cols]
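
What the two encoding stages do can be sketched in plain Python. The sketch assumes Spark's defaults: StringIndexer orders labels by descending frequency, and OneHotEncoder uses `dropLast=True`, so the last category becomes the all-zeros vector. The function names and sample values are hypothetical, and the dense lists stand in for Spark's sparse vectors.

```python
from collections import Counter

def string_index(labels):
    """Map labels to indices by descending frequency (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense stand-in for the sparse vector; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector

# Hypothetical sample of a categorical column
sizes = ["Compact", "Midsize", "Compact", "Large", "Compact", "Midsize"]
mapping = string_index(sizes)
encoded = [one_hot(mapping[s], len(mapping)) for s in sizes]

for label, vector in zip(sizes, encoded):
    print(label, mapping[label], vector)
```

A column with many distinct values, such as Model, therefore adds one dimension per distinct label (minus one) to the feature vector.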


In [15]:
# Assemble features into a single vector column "Input_Attributes"
# (the loop variable is named c so that it does not shadow the imported col function)
assembler = VectorAssembler(
    inputCols=[c + "_ohe" for c in ['Make', 'Model', 'Engine Fuel Type', 'Transmission Type', 'Driven_Wheels', 'Vehicle Size', 'Vehicle Style']] +
              ["Engine HP", "Engine Cylinders", "Year", "highway MPG", "Number of Doors", "city mpg", "Popularity"],
    outputCol="Input_Attributes"
)

In [16]:
# Initialize RandomForestRegressor using the correct features column name
regressor_model = RandomForestRegressor(featuresCol='Input_Attributes', labelCol="MSRP")


#Pipeline Setup

In [17]:
# Define a pipeline with indexers, encoders, assembler, and regressor
pipeline = Pipeline(stages=indexers + encoders + [assembler, regressor_model])


#Cross-Validation and Parameter Grid

In [18]:
# Define the parameter grid for cross-validation
paramGrid = ParamGridBuilder() \
    .addGrid(regressor_model.numTrees, [100, 500]) \
    .build()


In [19]:
# Cross-validator with the pipeline
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(labelCol="MSRP"),
    numFolds=3
)
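
The cost of cross-validation grows with the size of the grid. The plain-Python sketch below expands a grid and counts the fits: one per combination of parameter map and fold, plus a final refit of the best map on all the data. The `maxDepth` values are an assumption added for illustration; the notebook's grid contains only `numTrees`.

```python
from itertools import product

# Hypothetical grid: numTrees comes from the notebook, maxDepth is an added assumption
grid_values = {"numTrees": [100, 500], "maxDepth": [5, 10]}
num_folds = 3

# Cartesian product of all parameter values, one dict per combination
names = list(grid_values)
param_maps = [dict(zip(names, combo)) for combo in product(*(grid_values[n] for n in names))]

# One fit per (parameter map, fold) pair, plus a final refit of the best map on all data
fits_for_sketch = len(param_maps) * num_folds + 1
fits_for_notebook_grid = 2 * num_folds + 1

print(param_maps)
print(fits_for_sketch, fits_for_notebook_grid)
```

With forests of 100 and 500 trees, each of those fits can be expensive, which is worth keeping in mind before enlarging the grid.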


#Model Training and Predictions

In [20]:
# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=133)
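
`randomSplit` samples rows individually rather than cutting at an exact row count, so the 0.8/0.2 weights are met only approximately. The following is a simplified plain-Python model of that behaviour. It is not Spark's actual implementation, and the function name is hypothetical.

```python
import random

def random_split(rows, train_weight, seed):
    """Assign each row to train or test with an independent uniform draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(11812))  # stand-in for the 11812 cleaned rows
train_rows, test_rows = random_split(rows, train_weight=0.8, seed=133)
train_share = len(train_rows) / len(rows)

print(len(train_rows), len(test_rows), round(train_share, 4))
```

Because the split is random, a rare category (for example a Model that occurs once) can end up only in the test set. StringIndexer's default `handleInvalid="error"` would then raise an error at transform time; setting `handleInvalid="keep"` is one option to consider.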


In [21]:
# Fit the pipeline without cross-validation to check for errors
pipeline_model = pipeline.fit(train_data)



In [22]:
# Make predictions on the test data
predictions = pipeline_model.transform(test_data)

In [23]:
# Show the predictions
predictions.select("Make", "Model", "Year", "Engine HP", "Engine Cylinders", "MSRP", "prediction").show(10, truncate=False)


+-----+----------+----+---------+----------------+-----+------------------+
|Make |Model     |Year|Engine HP|Engine Cylinders|MSRP |prediction        |
+-----+----------+----+---------+----------------+-----+------------------+
|Acura|CL        |2001|225      |6               |27980|41521.38437844447 |
|Acura|CL        |2002|225      |6               |28030|41521.38437844447 |
|Acura|CL        |2003|225      |6               |30350|41521.38437844447 |
|Acura|CL        |2003|260      |6               |32700|42437.081992262414|
|Acura|ILX       |2015|150      |4               |31750|31002.841043474007|
|Acura|ILX       |2016|201      |4               |29200|31371.494860135677|
|Acura|ILX       |2016|201      |4               |29900|31371.494860135677|
|Acura|ILX Hybrid|2014|111      |4               |34600|31002.841043474007|
|Acura|Integra   |2000|140      |4               |3222 |4217.49855388558  |
|Acura|Integra   |2000|170      |4               |3652 |4491.223286075199 |
+-----+----------+----+---------+----------------+-----+------------------+

In [24]:
predictions.select("MSRP", "prediction").show(20, truncate=False)


+-----+------------------+
|MSRP |prediction        |
+-----+------------------+
|27980|41521.38437844447 |
|28030|41521.38437844447 |
|30350|41521.38437844447 |
|32700|42437.081992262414|
|31750|31002.841043474007|
|29200|31371.494860135677|
|29900|31371.494860135677|
|34600|31002.841043474007|
|3222 |4217.49855388558  |
|3652 |4491.223286075199 |
|7398 |4491.223286075199 |
|21600|23125.920739890193|
|2000 |4084.0859483469008|
|2181 |4084.0859483469008|
|2384 |4084.0859483469008|
|2066 |3311.892742374511 |
|2066 |3311.892742374511 |
|50840|38999.62812123405 |
|42565|37188.36557152353 |
|46840|37188.36557152353 |
+-----+------------------+
only showing top 20 rows
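
The table gives a visual impression only. As a rough illustration, the sketch below computes the mean absolute error and root mean squared error over just the 20 rows shown above; the values are copied from that output. This is not an evaluation of the full test set.

```python
import math

# (MSRP, prediction) pairs copied from the 20 rows shown above
pairs = [
    (27980, 41521.38437844447), (28030, 41521.38437844447),
    (30350, 41521.38437844447), (32700, 42437.081992262414),
    (31750, 31002.841043474007), (29200, 31371.494860135677),
    (29900, 31371.494860135677), (34600, 31002.841043474007),
    (3222, 4217.49855388558), (3652, 4491.223286075199),
    (7398, 4491.223286075199), (21600, 23125.920739890193),
    (2000, 4084.0859483469008), (2181, 4084.0859483469008),
    (2384, 4084.0859483469008), (2066, 3311.892742374511),
    (2066, 3311.892742374511), (50840, 38999.62812123405),
    (42565, 37188.36557152353), (46840, 37188.36557152353),
]

errors = [prediction - actual for actual, prediction in pairs]
mae = sum(abs(e) for e in errors) / len(errors)
rmse = math.sqrt(sum(e * e for e in errors) / len(errors))

print(f"MAE over shown rows:  {mae:.2f}")
print(f"RMSE over shown rows: {rmse:.2f}")
```

For the full test set, `RegressionEvaluator(labelCol="MSRP", metricName="rmse").evaluate(predictions)` computes the corresponding metric in Spark.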



#Conclusion

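The pipeline trains a RandomForestRegressor and produces price predictions for the held-out test set. In the rows shown, many predictions are close to the MSRP, while others are far off (for example, the Acura CL rows are predicted at about 41,500 against MSRPs of about 28,000 to 30,000), so an error metric such as RMSE should be computed on the full test set before judging the model's quality.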