<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo"  />
    </a>
</p>


## Create a machine learning pipeline for a regression project


## Objectives

- Part 1 ETL
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Part 2 Machine Learning Pipeline creation
  - Create a machine learning pipeline for prediction
- Part 3 Model evaluation
  - Evaluate the model using metrics
  - Print the intercept and the coefficients
- Part 4 Model Persistence
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

This project uses a modified version of the car mileage (Auto MPG) dataset. The original dataset is available at https://archive.ics.uci.edu/ml/datasets/auto+mpg



----


## Setup


We will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Importing Required Libraries


In [2]:
%pip install pyspark
%pip install findspark -q

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=07b76d3eb4296ca91b4a7161649d6a4a4e430e108a2add3d8b36d59cd1c7d3b9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
# Suppress warnings emitted by library code so the notebook output stays readable
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

## Part 1 - ETL


In [5]:
#Create SparkSession

spark = SparkSession.builder.getOrCreate()

Download the data file


In [6]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv

--2023-11-24 15:32:29--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14354 (14K) [text/csv]
Saving to: ‘mpg-raw.csv’


2023-11-24 15:32:29 (214 MB/s) - ‘mpg-raw.csv’ saved [14354/14354]



Load the dataset into a Spark DataFrame


In [7]:
# Load dataset
df = spark.read.csv("mpg-raw.csv", header=True, inferSchema=True)

In [8]:
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [9]:
df.groupBy('Origin').count().orderBy('count').show()

+--------+-----+
|  Origin|count|
+--------+-----+
|    NULL|    1|
|European|   70|
|Japanese|   88|
|American|  247|
+--------+-----+



In [10]:
rowcount1 = df.count()
print(rowcount1)

406


In [11]:
df = df.dropDuplicates()

In [12]:
rowcount2 = df.count()
print(rowcount2)

392


In [13]:
df = df.dropna()

In [14]:
rowcount3 = df.count()
print(rowcount3)

385
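
To make the two cleaning steps above concrete, here is a minimal plain-Python sketch of what `dropDuplicates()` and `dropna()` do conceptually: the first keeps one copy of each fully identical row, and the second discards any row that contains a null. The sample rows and the helper names `drop_duplicates` and `drop_nulls` are made up for illustration and are not part of PySpark. Spark performs the same operations in a distributed way and does not guarantee row order.

```python
# Conceptual sketch only: plain-Python stand-ins for DataFrame.dropDuplicates()
# and DataFrame.dropna(). The rows below are hypothetical, not from mpg-raw.csv.

def drop_duplicates(rows):
    """Keep the first occurrence of each fully identical row."""
    return list(dict.fromkeys(rows))


def drop_nulls(rows):
    """Discard any row that has None in at least one column."""
    return [row for row in rows if all(value is not None for value in row)]


# Columns: (MPG, Cylinders, Origin)
sample_rows = [
    (18.0, 8, "American"),
    (18.0, 8, "American"),  # exact duplicate of the first row
    (25.0, 4, None),        # missing Origin
    (31.0, 4, "Japanese"),
]

deduplicated = drop_duplicates(sample_rows)
cleaned = drop_nulls(deduplicated)

# Row counts before cleaning, after deduplication, and after dropping nulls
print(len(sample_rows), len(deduplicated), len(cleaned))
```

Printing the count after each step, as the notebook does with `rowcount1`, `rowcount2`, and `rowcount3`, is a cheap way to confirm how many rows each step removed.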


In [15]:
df = df.withColumnRenamed("Engine Disp","Engine_Disp")

In [16]:
df.write.mode("overwrite").parquet("mpg-cleaned.parquet")

In [17]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("Renamed column name = ", df.columns[2])

import os

print("mpg-cleaned.parquet exists :", os.path.isdir("mpg-cleaned.parquet"))

Part 1 - Evaluation
Total rows =  406
Total rows after dropping duplicate rows =  392
Total rows after dropping duplicate rows and rows with null values =  385
Renamed column name =  Engine_Disp
mpg-cleaned.parquet exists : True


## Part 2 - Machine Learning Pipeline creation


In [18]:
df = spark.read.parquet("mpg-cleaned.parquet")
print(df.count())

385


In [19]:
#show top 5 rows
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0|        4|      134.0|        96|  2702|      13.5|  75|Japanese|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|
|29.0|        4|       68.0|        49|  1867|      19.5|  73|European|
|22.4|        6|      231.0|       110|  3415|      15.8|  81|American|
|20.5|        6|      231.0|       105|  3425|      16.9|  77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [20]:
#print the schema of the dataframe
df.schema

StructType([StructField('MPG', DoubleType(), True), StructField('Cylinders', IntegerType(), True), StructField('Engine_Disp', DoubleType(), True), StructField('Horsepower', IntegerType(), True), StructField('Weight', IntegerType(), True), StructField('Accelerate', DoubleType(), True), StructField('Year', IntegerType(), True), StructField('Origin', StringType(), True)])

In [21]:
# Stage 1 - use StringIndexer to convert the string column "Origin" into the numeric column "OriginIndex"
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
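
As a rough illustration of what this stage computes: with its default settings, `StringIndexer` orders labels by descending frequency, so the most common label gets index 0.0. The plain-Python sketch below mimics that idea. The function name `fit_string_index` and the sample values are hypothetical, and the alphabetical tie-break is a choice made here to keep the sketch deterministic. Note that in this notebook `OriginIndex` is not listed in the assembler's `inputCols`, so the index is produced but does not feed the model.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to a float index: most frequent label -> 0.0, next -> 1.0, ..."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (sketch assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


# Hypothetical column values, not taken from the dataset
origins = ["American", "Japanese", "American", "European", "American", "Japanese"]

mapping = fit_string_index(origins)
indexed = [mapping[origin] for origin in origins]

print(mapping)
print(indexed)
```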

In [22]:
# Stage 2 - assemble the input columns 'Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year' into a single column "features"
assembler = VectorAssembler(inputCols=['Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year'], outputCol="features")

In [23]:
# Stage 3 - scale the "features" using standard scaler and store in "scaledFeatures" column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
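
For intuition, the sketch below shows the arithmetic behind standard scaling for a single feature. It assumes the `StandardScaler` defaults (`withStd=True`, `withMean=False`), under which each value is divided by the sample standard deviation of its column and is not centered. The function name `scale_to_unit_std` and the sample weights are hypothetical.

```python
from statistics import stdev


def scale_to_unit_std(column):
    """Divide every value by the column's sample standard deviation (n - 1 denominator)."""
    column_std = stdev(column)
    return [value / column_std for value in column]


# Hypothetical vehicle weights, not taken from the dataset
weights = [2000.0, 3000.0, 4000.0]
scaled_weights = scale_to_unit_std(weights)

print(scaled_weights)         # values are rescaled but not centered on zero
print(stdev(scaled_weights))  # the scaled column has unit standard deviation
```

Because no mean is subtracted in this configuration, the scaled values keep their sign and relative spacing; only the unit changes.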

In [24]:
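# Stage 4 - Create a LinearRegression stage to predict "MPG"
# Note: featuresCol is "features", so the model trains on the unscaled vector;
# the "scaledFeatures" column from Stage 3 is computed but not read by this stage.
lr = LinearRegression(featuresCol="features", labelCol="MPG")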

In [25]:
# Build a pipeline using the above four stages
pipeline = Pipeline(stages=[indexer,assembler, scaler, lr])

In [26]:
# Split the data into training and testing sets with a 70:30 split, using 42 as the seed
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
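
One detail worth keeping in mind: `randomSplit` assigns each row to a split at random according to the weights, so the resulting sizes are only approximately 70% and 30%. The plain-Python sketch below illustrates that behaviour. The helper `random_split` is hypothetical and uses Python's own random number generator, so it will not reproduce the rows that Spark selects for the same seed.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative upper bounds in [0, 1], one per split
    boundaries = []
    running = 0.0
    for weight in weights:
        running += weight / total
        boundaries.append(running)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(boundaries):
            # The last split also catches any floating-point rounding at the upper edge
            if draw < bound or index == len(boundaries) - 1:
                splits[index].append(row)
                break
    return splits


rows = list(range(385))  # stand-ins for the 385 cleaned rows
train, test = random_split(rows, [0.7, 0.3], seed=42)

print(len(train), len(test))
```

Fixing the seed makes the split repeatable from run to run, which is why the notebook passes `seed=42`.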

In [27]:
# Fit the pipeline using the training data
pipelineModel = pipeline.fit(trainingData)

In [28]:
print("Part 2 - Evaluation")
print("Total rows = ", df.count())
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  385
Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  VectorAssembler
Pipeline Stage 3 =  StandardScaler
Label column =  MPG


## Part 3 - Model Evaluation


In [29]:
# Make predictions on testing data
predictions = pipelineModel.transform(testingData)

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

12.226745835566769


In [31]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

2.845715113013043


In [32]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

0.801873739489645
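
The three metrics requested above (`mse`, `mae`, `r2`) follow the usual definitions. As a quick reference, here is a small plain-Python sketch of those formulas applied to made-up labels and predictions. The function names are hypothetical, and the numbers are illustrative rather than taken from the model.

```python
def mean_squared_error(labels, predictions):
    """Average of the squared differences between labels and predictions."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)


def mean_absolute_error(labels, predictions):
    """Average of the absolute differences between labels and predictions."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)


def r_squared(labels, predictions):
    """1 - (residual sum of squares / total sum of squares around the label mean)."""
    label_mean = sum(labels) / len(labels)
    residual_ss = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    total_ss = sum((y - label_mean) ** 2 for y in labels)
    return 1 - residual_ss / total_ss


# Hypothetical values for illustration only
labels = [10.0, 20.0, 30.0]
predictions = [12.0, 18.0, 33.0]

print(round(mean_squared_error(labels, predictions), 4))   # 5.6667
print(round(mean_absolute_error(labels, predictions), 4))  # 2.3333
print(round(r_squared(labels, predictions), 4))            # 0.915
```

MSE and MAE are expressed in squared and plain units of the label (MPG here), while R squared is unitless, which is why it is convenient for comparing models across datasets.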


In [33]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))

Part 3 - Evaluation
Mean Squared Error =  12.23
Mean Absolute Error =  2.85
R Squared =  0.8
Intercept =  -17.37


## Part 4 - Model Persistence


In [34]:
# Save the pipeline model
pipelineModel.write().overwrite().save("Practice_Project")

In [35]:
# Load the pipeline model
loadedPipelineModel = PipelineModel.load("Practice_Project")

In [36]:
# Use the loaded pipeline model for predictions
predictions = loadedPipelineModel.transform(testingData)

In [37]:
predictions.select("MPG","prediction").show()

+----+------------------+
| MPG|        prediction|
+----+------------------+
|10.0| 6.960764577507479|
|11.0| 8.545911819810382|
|12.0|10.226709705747357|
|12.0| 5.446415257216561|
|13.0|21.430212400588545|
|13.0| 17.43779207805767|
|13.0|11.245494102903642|
|13.0|14.180626433499196|
|13.0| 9.959082691690636|
|13.0|11.111417171061483|
|13.0| 13.17091781181795|
|13.0|10.889439874579814|
|13.0| 7.144536211558332|
|13.0| 4.279565485358152|
|13.0|  8.61119245028016|
|14.0|10.356052138541756|
|14.0| 16.05730844627147|
|14.0|12.327668542375246|
|14.0|10.787367112522794|
|14.0| 10.98393562815777|
+----+------------------+
only showing top 20 rows



In [38]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[1].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  4
Coefficient for Cylinders is 0.0716
Coefficient for Engine_Disp is 0.005
Coefficient for Horsepower is -0.0068
Coefficient for Weight is -0.0072
Coefficient for Accelerate is 0.0866
Coefficient for Year is 0.7911
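
To see how these numbers combine into a prediction, the sketch below applies the linear formula (intercept plus the sum of coefficient times feature value) by hand. It uses the rounded intercept and coefficients printed in this notebook and the first row shown in Part 2, so the result only approximates what the fitted model returns. This works directly on the raw column values because the regression stage was configured with `featuresCol="features"`. The variable names are hypothetical.

```python
# Rounded values copied from the notebook output above
intercept = -17.37
coefficients = {
    "Cylinders": 0.0716,
    "Engine_Disp": 0.005,
    "Horsepower": -0.0068,
    "Weight": -0.0072,
    "Accelerate": 0.0866,
    "Year": 0.7911,
}

# First row displayed in Part 2 (its actual MPG is 24.0)
row = {
    "Cylinders": 4,
    "Engine_Disp": 134.0,
    "Horsepower": 96,
    "Weight": 2702,
    "Accelerate": 13.5,
    "Year": 75,
}

# Linear model: prediction = intercept + sum(coefficient * feature value)
prediction = intercept + sum(coefficients[name] * row[name] for name in coefficients)

print(round(prediction, 2))
```

With the rounded parameters this comes out to roughly 23.98, close to the row's actual MPG of 24.0. Whether that row fell into the training or the testing split is not shown in the notebook, so treat it as an arithmetic check rather than an evaluation.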


In [39]:
spark.stop()