### Saving and loading a SparkML model


#### Objectives:

*   Create a simple Linear Regression Model
*   Save the SparkML model
*   Load the SparkML model
*   Make predictions using the loaded SparkML model


#### Install pyspark


In [1]:
!pip install pyspark
!pip install findspark



#### Import libraries


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

#### Creating the spark session and context


In [4]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Saving and Loading a SparkML Model").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/06 15:49:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Importing Spark ML libraries


In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

#### Create a DataFrame with sample data


In [6]:
# Create a simple dataset of infant height (cm) and weight (kg).

mydata = [[46,2.5],[51,3.4],[54,4.4],[57,5.1],[60,5.6],[61,6.1],[63,6.4]]

# Column names for the DataFrame
columns = ["height", "weight"]

# Create the DataFrame
mydf = spark.createDataFrame(mydata, columns)

# Show the DataFrame
mydf.show()

                                                                                

+------+------+
|height|weight|
+------+------+
|    46|   2.5|
|    51|   3.4|
|    54|   4.4|
|    57|   5.1|
|    60|   5.6|
|    61|   6.1|
|    63|   6.4|
+------+------+



#### Converting data frame columns into feature vectors

`VectorAssembler` is used to combine DataFrame columns into a single feature-vector column.
Here we use the infant's height ("height") as the only input feature and the weight ("weight") as the target label.


In [7]:
assembler = VectorAssembler(
    inputCols=["height"],
    outputCol="features")

data = assembler.transform(mydf).select('features','weight')

In [8]:
data.show()

+--------+------+
|features|weight|
+--------+------+
|  [46.0]|   2.5|
|  [51.0]|   3.4|
|  [54.0]|   4.4|
|  [57.0]|   5.1|
|  [60.0]|   5.6|
|  [61.0]|   6.1|
|  [63.0]|   6.4|
+--------+------+
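
To make the `features` column less opaque, the sketch below mimics the assembling step in plain Python. It is only an illustration of the idea, not how Spark implements `VectorAssembler`; the `assemble` helper and its parameter names are hypothetical.

```python
# Plain-Python illustration only; not how Spark implements VectorAssembler.
rows = [(46, 2.5), (51, 3.4), (54, 4.4), (57, 5.1), (60, 5.6), (61, 6.1), (63, 6.4)]
columns = ["height", "weight"]


def assemble(rows, columns, input_cols, label_col):
    """Return (features, label) pairs, with features as a list of floats."""
    feature_idx = [columns.index(name) for name in input_cols]
    label_idx = columns.index(label_col)
    return [([float(row[i]) for i in feature_idx], row[label_idx]) for row in rows]


assembled = assemble(rows, columns, input_cols=["height"], label_col="weight")

# Print the first few assembled rows: a one-element feature vector plus the label.
for features, weight in assembled[:3]:
    print(features, weight)
```

With more than one name in `input_cols`, each feature vector would simply carry one entry per input column, in the order given.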



#### Create and Train model

We create the model with the `LinearRegression` class and train it with the `fit()` method.


In [9]:
# Create a LR model
lr = LinearRegression(featuresCol='features', labelCol='weight', maxIter=100)
lr.setRegParam(0.1)
# Fit the model
lrModel = lr.fit(data)

24/02/06 15:50:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/02/06 15:50:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


#### Save the model


In [10]:
lrModel.save('infantheight3.model')
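
By default, `save()` fails if the target path already exists, so re-running this cell with the same name raises an error. One option is `lrModel.write().overwrite().save(path)`. Another, sketched below, is to delete the old directory first. This assumes Spark is running in local mode, so the model path is an ordinary directory on the local filesystem; `remove_saved_model` is a hypothetical helper, not part of PySpark.

```python
import os
import shutil


def remove_saved_model(path):
    """Delete a previously saved model directory, if one exists.

    Hypothetical helper for local-filesystem paths only.
    Returns True when a directory was removed, False otherwise.
    """
    if os.path.isdir(path):
        shutil.rmtree(path)
        return True
    return False
```

Calling `remove_saved_model('infantheight3.model')` before `lrModel.save('infantheight3.model')` would then let the cell be re-run without changing the file name.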

                                                                                

#### Load the model


In [11]:
# You need LinearRegressionModel to load the model
from pyspark.ml.regression import LinearRegressionModel

In [12]:
model = LinearRegressionModel.load('infantheight3.model')

#### Make Prediction


#### Predict the weight of an infant whose height is 70 cm.


In [13]:
# Wrap a single height (cm) in a one-row DataFrame, assemble it into a
# feature vector, and show the weight (kg) predicted by the loaded model.
def predict(height):
    assembler = VectorAssembler(inputCols=["height"], outputCol="features")
    input_df = spark.createDataFrame([[height]], ["height"])
    features_df = assembler.transform(input_df).select("features")
    predictions = model.transform(features_df)
    predictions.select("prediction").show()


In [14]:
predict(70)

+----------------+
|      prediction|
+----------------+
|7.86345471977588|
+----------------+
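
As a rough sanity check on the value above, the fit can be approximated by hand in plain Python. The sketch starts from the ordinary least-squares slope and then shrinks it to account for `regParam`. The shrinkage rule used here (dividing by `1 + reg_param / std_w`, with `std_w` the population standard deviation of the label) is an assumption, chosen because it agrees with the printed prediction; it is not taken from the Spark documentation. All names are illustrative.

```python
import math

heights = [46, 51, 54, 57, 60, 61, 63]          # cm
weights = [2.5, 3.4, 4.4, 5.1, 5.6, 6.1, 6.4]   # kg
reg_param = 0.1                                  # same value as lr.setRegParam(0.1)

n = len(heights)
mean_h = sum(heights) / n
mean_w = sum(weights) / n

# Ordinary least-squares slope: co-variation of height and weight
# divided by the variation of height.
s_hh = sum((h - mean_h) ** 2 for h in heights)
s_hw = sum((h - mean_h) * (w - mean_w) for h, w in zip(heights, weights))
ols_slope = s_hw / s_hh

# ASSUMPTION: the L2 penalty shrinks the slope by 1 / (1 + reg_param / std_w),
# where std_w is the population standard deviation of the label.
std_w = math.sqrt(sum((w - mean_w) ** 2 for w in weights) / n)
slope = ols_slope / (1 + reg_param / std_w)
intercept = mean_w - slope * mean_h


def predict_weight(height_cm):
    """Predicted weight (kg) for a given height (cm) under the assumptions above."""
    return intercept + slope * height_cm


print(predict_weight(70))
```

Without the shrinkage step (`slope = ols_slope`), the same code gives a somewhat higher prediction, which shows the effect of the regularization term on such a small dataset.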



### 2. Baby Weight prediction Model


#### Save the model as `babyweightprediction2.model`


In [15]:
lrModel.save('babyweightprediction2.model')

#### Load the model `babyweightprediction2.model`


In [16]:
model = LinearRegressionModel.load('babyweightprediction2.model')

#### Predict the weight of an infant whose height is 50 cm.


In [17]:
predict(50)

+----------------+
|      prediction|
+----------------+
|3.46668267111646|
+----------------+


