In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [2]:
# Silence Python warnings for the rest of the notebook
import warnings


def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``: accepts any arguments and ignores them."""
    return None


# Ignore warnings raised through the normal filter machinery ...
warnings.filterwarnings('ignore')
# ... and also neutralise direct calls to warnings.warn
warnings.warn = warn


# Initialise findspark; this runs before the pyspark imports in the next cell
import findspark
findspark.init()

In [3]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

In [5]:
# Start (or reuse) the Spark session named "mpg"
spark = (
    SparkSession.builder
    .appName("mpg")
    .getOrCreate()
)

24/05/01 16:41:30 WARN Utils: Your hostname, javi-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/05/01 16:41:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/01 16:41:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
# Download data
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv

--2024-05-01 16:42:50--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 

198.23.119.245
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|198.23.119.245|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14354 (14K) [text/csv]
Saving to: ‘mpg-raw.csv’


2024-05-01 16:42:51 (111 MB/s) - ‘mpg-raw.csv’ saved [14354/14354]



In [9]:
# Read the downloaded CSV into a DataFrame: the first row is the header
# and column types are inferred from the data
df = spark.read.csv(
    path="mpg-raw.csv",
    header=True,
    inferSchema=True,
)

                                                                                

In [10]:
# Preview the first 20 rows (long cell values are truncated)
df.show(n=20, truncate=True)

                                                                                

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
|43.1|        4|       90.0|        48|  1985|      21.5|  78|European|
|41.5|        4|       98.0|        76|  2144|      14.7|  80|European|
|40.8|        4|       85.0|        65|  2110|      19.2|  80|Japanese|
|39.4|        4|       85.0|        70|  2070|      18.6|  78|Japanese|
|39.1|        4|       79.0|        58|  1755|      16.9|  81|Japanese|
|39.0|        4|       86.0|        64|  1875|      16.4|  81|Am

In [13]:
# Number of cars per origin, smallest group first
origin_counts = df.groupBy("Origin").count()
origin_counts.orderBy("count").show()



+--------+-----+
|  Origin|count|
+--------+-----+
|    null|    1|
|European|   68|
|Japanese|   79|
|American|  244|
+--------+-----+



                                                                                

In [15]:
# Clean the data in one chain:
#   1. drop duplicate rows
#   2. drop rows that contain nulls
#   3. rename "Engine Disp" so the column name has no space
df = (
    df.dropDuplicates()
    .dropna()
    .withColumnRenamed("Engine Disp", "Engine_Disp")
)

In [16]:
# Save df. Overwrite mode lets this cell be re-run: with the default save mode the
# write raises an error as soon as "mpg-cleaned.parquet" already exists.
df.write.mode("overwrite").parquet("mpg-cleaned.parquet")

                                                                                

# Machine Learning Pipeline

In [18]:
# Index the "Origin" column into a new "OriginIndex" column
indexer = StringIndexer(
    inputCol="Origin",
    outputCol="OriginIndex",
)

In [20]:
# Combine the predictor columns into a single vector column "features".
# NOTE: "OriginIndex" (the StringIndexer output) is not among these inputs.
assembler = VectorAssembler(
    inputCols=[
        'Cylinders',
        'Engine_Disp',
        'Horsepower',
        'Weight',
        'Accelerate',
        'Year',
    ],
    outputCol="features",
)

In [21]:
# Standard-scale the "features" vector and store the result in "scaledFeatures"
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
)

In [23]:
# Linear regression stage: predicts "MPG" from the scaled feature vector
lr = LinearRegression(
    featuresCol="scaledFeatures",
    labelCol="MPG",
)

In [24]:
# Build the pipeline; stages run in the order listed
pipeline = Pipeline(
    stages=[
        indexer,
        assembler,
        scaler,
        lr,
    ]
)

In [25]:
# 70/30 train/test split with a fixed seed
trainingData, testingData = df.randomSplit([0.7, 0.3], seed=42)

In [26]:
# Train every pipeline stage on the training split
pipelineModel = pipeline.fit(dataset=trainingData)

24/05/01 16:54:44 WARN Instrumentation: [f92d0e44] regParam is zero, which might cause numerical instability and overfitting.
24/05/01 16:54:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/01 16:54:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/05/01 16:54:55 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/05/01 16:54:55 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [28]:
# Evaluation: for each pipeline stage keep the part of its string form that
# precedes the first underscore, then report the first three and the label column
ps = [str(stage).split("_")[0] for stage in pipeline.getStages()]

for position in range(3):
    print(f"Pipeline Stage {position + 1} = ", ps[position])

print("Label column = ", lr.getLabelCol())

Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  VectorAssembler
Pipeline Stage 3 =  StandardScaler
Label column =  MPG


# Model Evaluation

In [30]:
# Apply the fitted pipeline to the held-out test split
predictions = pipelineModel.transform(dataset=testingData)

In [31]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean squared error of "prediction" against the true "MPG" values
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="MPG",
    metricName="mse",
)
mse = evaluator.evaluate(predictions)
print(mse)



12.845495281915143


                                                                                

In [32]:
# Mean absolute error of "prediction" against the true "MPG" values
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="MPG",
    metricName="mae",
)
mae = evaluator.evaluate(predictions)
print(mae)



2.7351744433622636


                                                                                

In [33]:
# R-squared of "prediction" against the true "MPG" values
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="MPG",
    metricName="r2",
)
r2 = evaluator.evaluate(predictions)
print(r2)



0.7841054136645005


                                                                                

In [34]:
# Summarise the three metrics, each rounded to two decimals
for label, value in (
    ("Mean Squared Error = ", mse),
    ("Mean Absolute Error = ", mae),
    ("R Squared = ", r2),
):
    print(label, round(value, 2))

# The last stage of the fitted pipeline is the regression model
lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept, 2))

Mean Squared Error =  12.85
Mean Absolute Error =  2.74
R Squared =  0.78
Intercept =  -11.2


# Model Persistence

In [36]:
# Persist the fitted pipeline. overwrite() lets this cell be re-run: without it,
# saving fails once the "mpg" directory already exists.
pipelineModel.write().overwrite().save("mpg")

In [37]:
# Reload the persisted pipeline model from the "mpg" path
loadedPipelineModel = PipelineModel.load(path="mpg")

                                                                                

In [38]:
# Re-score the test split with the reloaded model (rebinds `predictions`)
predictions = loadedPipelineModel.transform(dataset=testingData)

In [39]:
# Show the true MPG next to the model's prediction
mpg_vs_prediction = predictions.select("MPG", "prediction")
mpg_vs_prediction.show()

                                                                                

+----+------------------+
| MPG|        prediction|
+----+------------------+
|13.0|14.216408820347961|
|29.0|28.539638236877728|
|15.5|16.448557583581337|
|18.0|26.444046799197004|
|31.0|30.172810503024664|
|13.0| 18.49524319441184|
|24.2|25.688359513466175|
|21.1|28.101716196449743|
|29.0|30.060371706474555|
|16.0|20.361776906569986|
|30.0|24.974982487185294|
|36.0|  33.4581645949177|
|21.0| 23.75846184152998|
|25.0| 24.11874562883999|
|13.0|16.727808802940178|
|37.7| 33.19356080860416|
|13.0|12.011423967534562|
|16.5|15.902350377944924|
|14.0|12.540061376646488|
|20.6| 22.43542366094588|
+----+------------------+
only showing top 20 rows



In [40]:
# Inspect the reloaded pipeline: the last stage is the fitted regression and
# the stage at index 1 supplies the list of input columns
totalstages = len(loadedPipelineModel.stages)
loadedmodel = loadedPipelineModel.stages[-1]
inputcolumns = loadedPipelineModel.stages[1].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
# Pair each input column with its regression coefficient, in order
for column_name, coefficient in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {column_name} is {round(coefficient,4)}")

Number of stages in the pipeline =  4
Coefficient for Cylinders is -1.2482
Coefficient for Engine_Disp is 2.8022
Coefficient for Horsepower is -0.1045
Coefficient for Weight is -6.9009
Coefficient for Accelerate is 0.3259
Coefficient for Year is 2.6951


In [41]:
# Stop the Spark session created at the top of the notebook
spark.stop()