## ML Pipeline in Spark

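This notebook performs ETL on the raw MPG (miles-per-gallon) car dataset with Spark, then trains and evaluates a linear regression pipeline that predicts MPG.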

In [1]:
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments and does nothing."""
    return None


# Silence Python-level warnings for the whole notebook: replace the warn
# function itself and also install a blanket "ignore" filter.
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

### Part 1: ETL

In [2]:
# Start the Spark session for this notebook (or reuse one that is already running).
spark = (
    SparkSession.builder
    .appName("MPG_Project")
    .getOrCreate()
)

24/06/10 15:24:21 WARN Utils: Your hostname, ubuntu-MS-7D15 resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface enp5s0)
24/06/10 15:24:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/06/10 15:24:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
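# Uncomment the next line to download the dataset if mpg-raw.csv is not already in the working directory.
#!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv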

In [4]:
# Load the raw MPG data: the first CSV row holds the column names and Spark
# infers each column's type from the values.
df = spark.read.csv(
    "mpg-raw.csv",
    header=True,
    inferSchema=True,
)

In [5]:
# Preview the first five rows to check the columns and parsed values.
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [6]:
# Replace the space in the "Engine Disp" column name with an underscore.
df = df.withColumnRenamed("Engine Disp","Engine_Disp")

In [7]:
# Number of rows per Origin value, shown ordered by the count column.
origin_counts = df.groupBy('Origin').count()
origin_counts.orderBy('count').show()

+--------+-----+
|  Origin|count|
+--------+-----+
|    null|    1|
|European|   70|
|Japanese|   88|
|American|  247|
+--------+-----+



#### Duplicates

In [8]:
# Baseline row count before any cleaning step is applied.
rowcount1 = df.count()
print(rowcount1)

406


In [9]:
# Remove duplicate rows; no column subset is given, so whole rows are compared.
df = df.dropDuplicates()

In [10]:
# Row count after de-duplication.
rowcount2 = df.count()
print(rowcount2)

392


#### Drop rows with null values

In [11]:
# Drop rows with missing values (dropna is called with its default arguments).
df=df.dropna()

In [12]:
# Row count after removing both duplicates and rows with nulls.
rowcount3 = df.count()
print(rowcount3)

385


In [13]:
# Persist the cleaned data as Parquet, replacing the output of any previous run.
cleaned_writer = df.write.mode("overwrite")
cleaned_writer.parquet("mpg-cleaned.parquet")

24/06/10 15:24:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/06/10 15:24:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/06/10 15:24:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/06/10 15:24:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/06/10 15:24:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/06/10 15:24:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/06/10 15:24:41 WARN MemoryManager: Total allocation exceeds 95.

In [14]:
import os

print("Part 1 - Evaluation")

# Row counts captured after each cleaning step, reported in the order taken.
for label, row_total in (
    ("Total rows = ", rowcount1),
    ("Total rows after dropping duplicate rows = ", rowcount2),
    ("Total rows after dropping duplicate rows and rows with null values = ", rowcount3),
):
    print(label, row_total)

# The third column (index 2) is the one that was renamed to Engine_Disp.
renamed_column = df.columns[2]
print("Renamed column name = ", renamed_column)

# The Parquet output is checked as a directory (isdir), not as a single file.
parquet_written = os.path.isdir("mpg-cleaned.parquet")
print("mpg-cleaned.parquet exists :", parquet_written)

Part 1 - Evaluation
Total rows =  406
Total rows after dropping duplicate rows =  392
Total rows after dropping duplicate rows and rows with null values =  385
Renamed column name =  Engine_Disp
mpg-cleaned.parquet exists : True


### Part 2: ML Pipeline

In [15]:
# Reload the cleaned dataset from the Parquet output written in Part 1 and
# record its row count for the Part 2 evaluation printout.
df = spark.read.parquet("mpg-cleaned.parquet")
rowcount4 = df.count()
print(rowcount4)

385


In [16]:
# Encode the categorical Origin strings as numeric indices in "OriginIndex".
indexer = StringIndexer(
    inputCol="Origin",
    outputCol="OriginIndex",
)

In [17]:
# Numeric predictors that are packed into the single "features" vector column.
# Note: "OriginIndex" is not listed here, so Origin is not among the model's inputs.
feature_columns = ['Cylinders', 'Engine_Disp', 'Horsepower', 'Weight', 'Accelerate', 'Year']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

#### Standard scaler

In [18]:
# Rescale the assembled "features" vector into "scaledFeatures" using
# StandardScaler's default settings.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
)

#### Linear regression estimator and pipeline

In [19]:
# Linear regression that predicts MPG from the scaled feature vector.
lr = LinearRegression(
    featuresCol="scaledFeatures",
    labelCol="MPG",
)

In [20]:
# Stages run in list order: index Origin, assemble the feature vector,
# scale it, then fit the regression on the scaled features.
pipeline_stages = [indexer, assembler, scaler, lr]
pipeline = Pipeline(stages=pipeline_stages)

#### Train/test split

In [21]:
# 70/30 train/test split; the fixed seed keeps the split repeatable across runs.
trainingData, testingData = df.randomSplit(weights=[0.7, 0.3], seed=42)

#### Fit the pipeline

In [22]:
# Fit every pipeline stage on the training split only; the result is the
# fitted model used for predictions and coefficient inspection below.
pipelineModel = pipeline.fit(trainingData)

24/06/10 15:24:43 WARN Instrumentation: [aaddc371] regParam is zero, which might cause numerical instability and overfitting.
24/06/10 15:24:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/06/10 15:24:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/06/10 15:24:43 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/06/10 15:24:43 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


In [23]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)

# Keep only the text before the first underscore of each stage's string form;
# in the printed output this is the stage's class name.
ps = [str(stage).split("_")[0] for stage in pipeline.getStages()]

# Only the first three stages are reported.
stage_labels = ("Pipeline Stage 1 = ", "Pipeline Stage 2 = ", "Pipeline Stage 3 = ")
for stage_position, stage_label in enumerate(stage_labels):
    print(stage_label, ps[stage_position])

label_column = lr.getLabelCol()
print("Label column = ", label_column)

Part 2 - Evaluation
Total rows =  385
Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  VectorAssembler
Pipeline Stage 3 =  StandardScaler
Label column =  MPG


### Part 3: Model evaluation

In [24]:
# Run the fitted pipeline on the held-out test split to obtain predictions.
predictions = pipelineModel.transform(testingData)

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

# One evaluator is reused for every metric instead of rebuilding an identical
# object three times; only the metric name changes between evaluations.
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG")

# Evaluate and print in the fixed order mse, mae, r2.
test_metrics = {}
for metric_name in ("mse", "mae", "r2"):
    test_metrics[metric_name] = evaluator.setMetricName(metric_name).evaluate(predictions)
    print(test_metrics[metric_name])

# Keep the individual names: the Part 3 evaluation cell reads mse, mae and r2.
mse = test_metrics["mse"]
mae = test_metrics["mae"]
r2 = test_metrics["r2"]

9.279864329372996
2.350907028306923
0.8170972243874175


In [26]:
print("Part 3 - Evaluation")

# Report each test-set metric rounded to two decimal places.
for metric_label, metric_value in (
    ("Mean Squared Error = ", mse),
    ("Mean Absolute Error = ", mae),
    ("R Squared = ", r2),
):
    print(metric_label, round(metric_value, 2))

# The last pipeline stage is the fitted linear regression model.
lrModel = pipelineModel.stages[-1]
intercept_rounded = round(lrModel.intercept, 2)

print("Intercept = ", intercept_rounded)


Part 3 - Evaluation
Mean Squared Error =  9.28
Mean Absolute Error =  2.35
R Squared =  0.82
Intercept =  -10.48


In [29]:
print("Part 4 - Evaluation")

# NOTE(review): despite its name, `loadedmodel` is taken from the in-memory
# pipelineModel (its last stage), not loaded from a saved model on disk.
loadedmodel = pipelineModel.stages[-1]
# Stage index 1 is the VectorAssembler, whose input columns name the features.
inputcolumns = pipelineModel.stages[1].getInputCols()
totalstages = len(pipelineModel.stages)

print("Number of stages in the pipeline = ", totalstages)

# Pair each feature name with its coefficient. The regression was fitted on
# the scaled feature vector, so the coefficients refer to scaled features.
for column_name, coefficient in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {column_name} is {round(coefficient,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  4
Coefficient for Cylinders is -0.5029
Coefficient for Engine_Disp is 0.7078
Coefficient for Horsepower is 0.1952
Coefficient for Weight is -6.077
Coefficient for Accelerate is 0.239
Coefficient for Year is 2.6966
