## Model persistence using SparkML


## Objectives

In this Jupyter notebook, I learned to:

 - Save a trained model.
 - Load a saved model.
 - Make predictions using the loaded model.


In [4]:
#!pip install pyspark==3.1.2 -q
#!pip install findspark -q

### Importing Required Libraries

In [5]:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# findspark locates the local Spark installation and makes pyspark importable

import findspark
findspark.init()

# Import functions/classes for SparkML

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

# Import functions/classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator


# Examples


## Task 1 - Create a model


In [6]:
#Create SparkSession
spark = SparkSession.builder.appName("Model Persistence").getOrCreate()

24/07/20 15:03:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Download the data file


In [7]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv

--2024-07-20 15:04:01--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13891 (14K) [text/csv]
Saving to: ‘mpg.csv.7’


2024-07-20 15:04:01 (37.3 MB/s) - ‘mpg.csv.7’ saved [13891/13891]
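By default wget does not overwrite an existing file. It saves each new copy with a numeric suffix, which is why the log reports `mpg.csv.7`, while `spark.read.csv("mpg.csv", ...)` keeps reading the first copy. A minimal sketch of a download step that only fetches the file when it is missing, using Python's standard `urllib`. The helper name `download_if_missing` is hypothetical, not part of the course material.

```python
import os
import urllib.request


def download_if_missing(url: str, filename: str) -> str:
    """Download url to filename unless filename already exists.

    download_if_missing is a hypothetical helper, not part of the course code.
    """
    if not os.path.exists(filename):
        # urlretrieve writes the response body to filename
        urllib.request.urlretrieve(url, filename)
    return filename


# Usage (commented out so the block also runs offline):
# download_if_missing(
#     "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv",
#     "mpg.csv",
# )
```

From a shell cell, `wget -nc <url>` (no-clobber) similarly skips the download when the file is already present.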



Load the dataset into a Spark DataFrame


In [8]:
# Load mpg dataset
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)

                                                                                

Print the schema of the dataset


In [9]:
mpg_data.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)



Show 5 rows from the dataset


In [10]:
mpg_data.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
|18.0|        6|      199.0|        97|  2774|      15.5|  70|American|
|16.0|        8|      304.0|       150|  3433|      12.0|  70|American|
|14.0|        8|      455.0|       225|  3086|      10.0|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



Use a VectorAssembler to combine the input columns into a single vector column named "features"

In [11]:
# Prepare feature vector
assembler = VectorAssembler(inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year"], outputCol="features")
mpg_transformed_data = assembler.transform(mpg_data)

Display the assembled "features" and the label column "MPG"


In [12]:
mpg_transformed_data.select("features","MPG").show()

+--------------------+----+
|            features| MPG|
+--------------------+----+
|[8.0,390.0,190.0,...|15.0|
|[6.0,199.0,90.0,2...|21.0|
|[6.0,199.0,97.0,2...|18.0|
|[8.0,304.0,150.0,...|16.0|
|[8.0,455.0,225.0,...|14.0|
|[8.0,350.0,165.0,...|15.0|
|[8.0,307.0,130.0,...|18.0|
|[8.0,454.0,220.0,...|14.0|
|[8.0,400.0,150.0,...|15.0|
|[8.0,307.0,200.0,...|10.0|
|[8.0,383.0,170.0,...|15.0|
|[8.0,318.0,210.0,...|11.0|
|[8.0,360.0,215.0,...|10.0|
|[8.0,429.0,198.0,...|15.0|
|[6.0,200.0,85.0,2...|21.0|
|[8.0,302.0,140.0,...|17.0|
|[8.0,304.0,193.0,...| 9.0|
|[8.0,340.0,160.0,...|14.0|
|[6.0,198.0,95.0,2...|22.0|
|[8.0,440.0,215.0,...|14.0|
+--------------------+----+
only showing top 20 rows



Split the data: 70% for training and 30% for testing.

In [13]:
# Split data into training and testing sets
(training_data, testing_data) = mpg_transformed_data.randomSplit([0.7, 0.3])

Create a linear regression (LR) model and train it through a pipeline on the training data set


In [14]:
# Train linear regression model
lr = LinearRegression(labelCol="MPG", featuresCol="features")
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(training_data)

24/07/20 15:04:18 WARN util.Instrumentation: [c7ba6b89] regParam is zero, which might cause numerical instability and overfitting.
24/07/20 15:04:19 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/07/20 15:04:19 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/07/20 15:04:19 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/07/20 15:04:19 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


## Task 2 - Save the model


Create a folder where the model will be saved


In [15]:
!mkdir model_storage
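Re-running `!mkdir model_storage` prints a "File exists" error. Spark's model writer creates the target path itself when it saves, so the folder is optional here. If you do want to create it explicitly, a re-runnable alternative in plain Python is:

```python
import os

# exist_ok=True makes this safe to run repeatedly (no error if the folder exists)
os.makedirs("model_storage", exist_ok=True)
```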

In [16]:
# Persist the model to "./model_storage/"; overwrite() is needed because the folder already exists
model.write().overwrite().save("./model_storage/")

                                                                                

## Task 3 - Load the model


In [17]:
from pyspark.ml.pipeline import PipelineModel
# Load persisted model
loaded_model = PipelineModel.load("./model_storage/")

                                                                                

## Task 4 - Predict using the loaded model


In [18]:
# Make predictions on test data
predictions = loaded_model.transform(testing_data)

Display the first five predictions made by the loaded model.

In [20]:
predictions.select("prediction").show(5)

+------------------+
|        prediction|
+------------------+
| 7.234177089859596|
| 9.314203398749019|
| 8.173030384978109|
| 7.884910389645313|
|10.354628406314081|
+------------------+
only showing top 5 rows



Stop Spark Session


In [21]:
spark.stop()

# Exercises


### Exercise 1 - Create a model


Create a Spark session with the app name "Model Persistence Exercise"


In [22]:
spark = SparkSession.builder.appName("Model Persistence Exercise").getOrCreate()

Download the data set


In [23]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/diamonds.csv


--2024-07-20 15:07:34--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/diamonds.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3192561 (3.0M) [text/csv]
Saving to: ‘diamonds.csv.4’


2024-07-20 15:07:35 (46.0 MB/s) - ‘diamonds.csv.4’ saved [3192561/3192561]



Load the dataset into a Spark DataFrame


In [24]:
diamond_data = spark.read.csv("diamonds.csv", header=True, inferSchema=True)

                                                                                

Display sample data from the dataset


In [25]:
diamond_data.show(5)

+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|  s|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



Assemble the columns carat, depth and table into a single column named "features"


In [26]:
assembler = VectorAssembler(inputCols=["carat", "depth", "table"], outputCol="features")
diamond_transformed_data = assembler.transform(diamond_data)

Display the assembled "features" and the label column "price"


In [27]:
diamond_transformed_data.select("features","price").show()

+----------------+-----+
|        features|price|
+----------------+-----+
|[0.23,61.5,55.0]|  326|
|[0.21,59.8,61.0]|  326|
|[0.23,56.9,65.0]|  327|
|[0.29,62.4,58.0]|  334|
|[0.31,63.3,58.0]|  335|
|[0.24,62.8,57.0]|  336|
|[0.24,62.3,57.0]|  336|
|[0.26,61.9,55.0]|  337|
|[0.22,65.1,61.0]|  337|
|[0.23,59.4,61.0]|  338|
| [0.3,64.0,55.0]|  339|
|[0.23,62.8,56.0]|  340|
|[0.22,60.4,61.0]|  342|
|[0.31,62.2,54.0]|  344|
| [0.2,60.2,62.0]|  345|
|[0.32,60.9,58.0]|  345|
| [0.3,62.0,54.0]|  348|
| [0.3,63.4,54.0]|  351|
| [0.3,63.8,56.0]|  351|
| [0.3,62.7,59.0]|  351|
+----------------+-----+
only showing top 20 rows



Split the dataset into training and testing sets in the ratio of 70:30.


In [28]:
(training_data, testing_data) = diamond_transformed_data.randomSplit([0.7, 0.3])

Create a linear regression (LR) model and train it through a pipeline on the training data set


In [29]:
# Train linear regression model
lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(training_data)

24/07/20 15:09:54 WARN util.Instrumentation: [2335c4f3] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

### Exercise 2 - Save the model


Create a folder named "diamond_model" where the model will be saved


In [30]:
!mkdir diamond_model

Persist the model to the folder "diamond_model"


In [31]:
model.write().overwrite().save("./diamond_model/")

                                                                                

### Exercise 3 - Load the model


Load the model from the folder "diamond_model"


In [33]:
from pyspark.ml.pipeline import PipelineModel

# Load persisted model
loaded_model = PipelineModel.load("./diamond_model/")

### Exercise 4 - Predict using the loaded model


Make predictions on test data


In [34]:
predictions = loaded_model.transform(testing_data)

In [35]:
predictions.select("prediction").show(5)

+-------------------+
|         prediction|
+-------------------+
|-232.78128172173456|
| -628.8574074118533|
| -568.5269159254276|
| -1509.789380066617|
|-58.978698883740435|
+-------------------+
only showing top 5 rows



Stop Spark Session


In [36]:
spark.stop()