# Import a Spark MLlib model into IBM Watson Machine Learning

Importing a model into Watson Machine Learning means storing a trained model in your Watson Machine Learning repository and then deploying the stored model. This notebook demonstrates importing a Spark MLlib PipelineModel object.

See also: <a href="https://dataplatform.cloud.ibm.com/docs/content/analyze-data/ml-import-spark-mllib.html" target="_blank" rel="noopener noreferrer">Importing a Spark MLlib model</a>

This notebook runs on Spark Python 3.5.


### Notebook sections

[Step 0: Build and train a model, and then save the model and training data](#step0)

[Step 1: Store the model in your Watson Machine Learning repository](#step1)

[Step 2: Deploy the stored model in your Watson Machine Learning service](#step2)


## <a id="step0"></a> Step 0: Build, train, and save a model

**About the sample model**

The sample model built here is a logistic regression model that predicts whether a customer will purchase a tent from a fictional outdoor equipment store, based on the customer's characteristics.

The data used to train the model is the "GoSales.csv" training data in the IBM Watson Studio community: <a href="https://dataplatform.cloud.ibm.com/exchange/public/entry/view/aa07a773f71cf1172a349f33e2028e4e" target="_blank" rel="noopener noreferrer">GoSales sample data</a>.

### Get and prepare training data

In [None]:
!pip install wget # needed to download sample training data

In [25]:
# Download sample training data to notebook working directory
import wget
training_data_url = 'https://dataplatform.cloud.ibm.com/data/exchange-api/v1/entries/aa07a773f71cf1172a349f33e2028e4e/data?accessKey=e98b7315f84e5448aa94c633ca66ea83'
filename = wget.download( training_data_url )
print( filename )

GoSales (1).csv


In [26]:
# Read sample data into a Spark DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load( filename )
df.printSchema()

root
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- PROFESSION: string (nullable = true)
 |-- IS_TENT: boolean (nullable = true)
 |-- PRODUCT_LINE: string (nullable = true)
 |-- PURCHASE_AMOUNT: double (nullable = true)



In [27]:
# Select columns of interest
from pyspark.sql.types import IntegerType
training_data = df.select( "GENDER", "AGE", "MARITAL_STATUS", "PROFESSION", df.IS_TENT.cast( IntegerType() ) )
training_data.printSchema()

root
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- PROFESSION: string (nullable = true)
 |-- IS_TENT: integer (nullable = true)



### Build a PipelineModel

In [28]:
# Create indexers for string columns
from pyspark.ml.feature import StringIndexer
indexer_GENDER         = StringIndexer( inputCol="GENDER",         outputCol="GENDER_index"         )
indexer_MARITAL_STATUS = StringIndexer( inputCol="MARITAL_STATUS", outputCol="MARITAL_STATUS_index" )
indexer_PROFESSION     = StringIndexer( inputCol="PROFESSION",     outputCol="PROFESSION_index"     )
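
To make the indexing step concrete, here is a minimal plain-Python sketch of what a fitted StringIndexer does under its default ordering, where the most frequent label receives index 0.0. The function name `fit_string_index` and the sample values are hypothetical, and breaking ties alphabetically is an assumption of this sketch (tie order can depend on the Spark version).

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically
    # (an assumption of this sketch, not a guarantee for every Spark version)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical sample column, not taken from GoSales.csv
sample_gender = ["M", "F", "M", "M", "F"]
gender_index = fit_string_index(sample_gender)
print(gender_index)
```

Because the mapping is learned from the training data, the same string can map to a different index if the model is retrained on data with different label frequencies.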

In [29]:
# Create an assembler that generates the feature vector column
from pyspark.ml.feature import VectorAssembler
feature_vector_assembler = VectorAssembler( inputCols=[ "GENDER_index", "AGE", "MARITAL_STATUS_index", "PROFESSION_index" ],  outputCol="features" )
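
As a rough illustration of the assembler's effect, the following plain-Python sketch concatenates the input columns into one list of floats, in the order given by `inputCols`. The helper name `assemble_features` is hypothetical; the row values are the indexed values for the record that is scored in Step 2 of this notebook.

```python
def assemble_features(row, input_cols):
    """Return the feature values as floats, in the order given by input_cols."""
    return [float(row[col]) for col in input_cols]

input_cols = ["GENDER_index", "AGE", "MARITAL_STATUS_index", "PROFESSION_index"]
# Indexed values for the record that is scored later in this notebook
row = {"GENDER_index": 0.0, "AGE": 27, "MARITAL_STATUS_index": 1.0, "PROFESSION_index": 1.0}
features = assemble_features(row, input_cols)
print(features)
```

The position of each value in the vector follows the order of `inputCols`, so changing that order changes what each model coefficient refers to.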

In [30]:
# Create a logistic regression model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression( featuresCol='features', labelCol='IS_TENT' )

In [33]:
# Create a Pipeline that chains the indexers, the assembler, and the logistic regression
from pyspark.ml import Pipeline
pipeline_org = Pipeline( stages=[ indexer_GENDER, indexer_MARITAL_STATUS, indexer_PROFESSION, feature_vector_assembler, lr ] )

### Train and evaluate the model

In [36]:
# Split the training data into a training set and a test set
train_org, test = training_data.randomSplit( [ 0.75, 0.25 ], seed = 2019 )
print( "Train count: " + str( train_org.count() ) )
print( "Test count: "  + str( test.count()  ) )

Train count: 45186
Test count: 15066
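
The split sizes only approximate the requested weights, because `randomSplit` assigns rows to the splits at random rather than cutting at an exact row count. A quick arithmetic check on the counts printed above (plain Python, no Spark needed) shows how close the split came to 75%:

```python
train_count = 45186  # from the output above
test_count = 15066   # from the output above

total = train_count + test_count
train_fraction = train_count / total

print("Total rows: " + str(total))
print("Train fraction: " + str(round(train_fraction, 4)))
```

Fixing the `seed` argument, as the cell above does, is what makes the split repeatable from run to run on the same data.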


In [37]:
# Fit the Pipeline on the training set to produce a PipelineModel
pipeline_model_org = pipeline_org.fit( train_org )

In [38]:
# Evaluate the model performance
predictions_org = pipeline_model_org.transform( test )
correct_false_org = predictions_org.filter( "IS_TENT == 0 AND prediction == 0.0" )
correct_true_org = predictions_org.filter( "IS_TENT == 1 AND prediction != 0.0" )
print( "Success rate: " + str( round( 100 * ( ( correct_false_org.count() + correct_true_org.count() ) / predictions_org.count() ) ) ) + "%" )

Success rate: 78%


### Save the training data, the Pipeline, and the PipelineModel

To import your Spark MLlib PipelineModel into Watson Machine Learning, you need three things:
- The training set (Spark DataFrame)
- The PipelineModel object
- The Pipeline object (which you can get from the PipelineModel object)

In this section of the notebook, the training data and the PipelineModel object are saved to the notebook working directory.

In [39]:
pipeline_model_org.save( "tent-prediction-model" )
train_org.write.save( "training-data.parquet" )

In [52]:
# Just for interest, see what is in the directory
print( "tent-prediction-model:" )
!ls -l tent-prediction-model
print( "\ntent-prediction-model/metadata:" )
!ls -l tent-prediction-model/metadata
print( "\ntent-prediction-model/stages:" )
!ls -l tent-prediction-model/stages

tent-prediction-model:
total 8
drwxr-xr-x 2 spark spark 4096 Feb 26 19:43 metadata
drwxr-xr-x 7 spark spark 4096 Feb 26 19:43 stages

tent-prediction-model/metadata:
total 4
-rw-r--r-- 1 spark spark   0 Feb 26 19:43 _SUCCESS
-rw-r--r-- 1 spark spark 357 Feb 26 19:43 part-00000

tent-prediction-model/stages:
total 20
drwxr-xr-x 4 spark spark 4096 Feb 26 19:43 0_StringIndexer_4feba54772c2c0d34b7e
drwxr-xr-x 4 spark spark 4096 Feb 26 19:43 1_StringIndexer_47dfa36cc7a7491a90e5
drwxr-xr-x 4 spark spark 4096 Feb 26 19:43 2_StringIndexer_45a280a3ea48946c1033
drwxr-xr-x 3 spark spark 4096 Feb 26 19:43 3_VectorAssembler_4f7b9d9845f504d26247
drwxr-xr-x 4 spark spark 4096 Feb 26 19:43 4_LogisticRegression_46eaabcb01ad31bfa677


## <a id="step1"></a> Step 1: Store the model in your Watson Machine Learning repository

This section of the notebook demonstrates calling the <a href="https://wml-api-pyclient.mybluemix.net/index.html?highlight=store_model#client.Repository.store_model" target="_blank" rel="noopener noreferrer">store_model</a> function, passing the in-memory PipelineModel object.

In [48]:
# Load the saved PipelineModel and training data back into memory
from pyspark.ml import Pipeline, PipelineModel
pipeline_model = PipelineModel.load( "tent-prediction-model" )
# Rebuild the Pipeline object from the stages of the loaded PipelineModel
pipeline = Pipeline( stages = pipeline_model.stages )
train = spark.read.load( "training-data.parquet" )

In [None]:
!pip install watson_machine_learning_client # Needed to work with the Watson Machine Learning Python client

Paste your Watson Machine Learning credentials in the following cell.

See: <a href="https://dataplatform.cloud.ibm.com/docs/content/analyze-data/ml-get-wml-credentials.html" target="_blank" rel="noopener noreferrer">Looking up credentials</a>

In [50]:
# Create a Watson Machine Learning client instance
from watson_machine_learning_client import WatsonMachineLearningAPIClient
wml_credentials = {
    "instance_id": "",
    "password": "",
    "url": "",
    "username": ""
}
client = WatsonMachineLearningAPIClient( wml_credentials )
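
If you would rather not paste credentials directly into a notebook cell, one option is to read them from environment variables. The following is a minimal sketch of that idea; the helper name `load_wml_credentials` and the `WML_*` variable names are hypothetical, not something the Watson Machine Learning client defines.

```python
import os

def load_wml_credentials(env=os.environ):
    """Build the credentials dictionary from environment variables."""
    # The WML_* variable names are hypothetical; use whatever names you set
    keys = ["instance_id", "password", "url", "username"]
    missing = [k for k in keys if not env.get("WML_" + k.upper())]
    if missing:
        raise ValueError("Missing credentials: " + ", ".join(missing))
    return {k: env["WML_" + k.upper()] for k in keys}

# Example with a plain dictionary standing in for the environment
example_env = {
    "WML_INSTANCE_ID": "my-instance-id",
    "WML_PASSWORD": "my-password",
    "WML_URL": "https://example.com",
    "WML_USERNAME": "my-username",
}
wml_credentials_example = load_wml_credentials(example_env)
print(sorted(wml_credentials_example.keys()))
```

The resulting dictionary has the same four keys as `wml_credentials` above, so it could be passed to the client constructor in the same way.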

In [51]:
# Store the PipelineModel in the Watson Machine Learning repository
model_details = client.repository.store_model( pipeline_model, 'Spark MLlib model', training_data=train, pipeline=pipeline )

## <a id="step2"></a> Step 2: Deploy the stored model in your Watson Machine Learning service

In [53]:
# Deploy the stored model as an online web service deployment
model_id = model_details["metadata"]["guid"]
deployment_details = client.deployments.create( artifact_uid=model_id, name="Spark MLlib model deployment" )



#######################################################################################

Synchronous deployment creation for uid: '87d108a2-ef70-4c15-844a-b6576ee5ca43' started

#######################################################################################


INITIALIZING
DEPLOY_IN_PROGRESS.........
DEPLOY_SUCCESS


------------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_uid='603efbe3-0e1a-4fa1-8af7-9d2ebf2a91e7'
------------------------------------------------------------------------------------------------




In [55]:
# Test the deployment
model_endpoint_url = client.deployments.get_scoring_url( deployment_details )
payload = { "fields" : [ "GENDER", "AGE", "MARITAL_STATUS", "PROFESSION" ], "values" : [ [ "M", 27, "Single", "Professional" ] ] }
client.deployments.score( model_endpoint_url, payload )

{'fields': ['GENDER',
  'AGE',
  'MARITAL_STATUS',
  'PROFESSION',
  'GENDER_index',
  'MARITAL_STATUS_index',
  'PROFESSION_index',
  'features',
  'rawPrediction',
  'probability',
  'prediction'],
 'values': [['M',
   27,
   'Single',
   'Professional',
   0.0,
   1.0,
   1.0,
   [0.0, 27.0, 1.0, 1.0],
   [0.16773330636208073, -0.16773330636208073],
   [0.541835288108435, 0.458164711891565],
   0.0]]}
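
To help read this response, the sketch below pairs each field name with its value and then reconstructs the `probability` column from `rawPrediction`. It assumes a two-class logistic regression with the default decision threshold of 0.5, in which case the class 1 probability is the logistic (sigmoid) function of the class 1 raw score. The values are copied from the output above, trimmed to the three columns used here.

```python
import math

# Scoring response copied from the output above, trimmed to the columns used here
response = {
    "fields": ["rawPrediction", "probability", "prediction"],
    "values": [[
        [0.16773330636208073, -0.16773330636208073],
        [0.541835288108435, 0.458164711891565],
        0.0,
    ]],
}

# Pair each field name with its value for the first (and only) scored row
result = dict(zip(response["fields"], response["values"][0]))

# Class 1 probability is the sigmoid of the class 1 raw score
raw_class_1 = result["rawPrediction"][1]
prob_class_1 = 1.0 / (1.0 + math.exp(-raw_class_1))
prob_class_0 = 1.0 - prob_class_1

# With a 0.5 threshold, the predicted class is the one with the higher probability
predicted = float(result["probability"].index(max(result["probability"])))
print(prob_class_0, prob_class_1, predicted)
```

Here the model gives the customer roughly a 46% chance of buying a tent, which is below the threshold, so the prediction is 0.0.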

In [56]:
# Testing the model locally gets the same results
test_df = spark.createDataFrame( [ ( "M", 27, "Single", "Professional" ) ], [ "GENDER", "AGE", "MARITAL_STATUS", "PROFESSION" ] )
pipeline_model.transform( test_df ).show()

+------+---+--------------+------------+------------+--------------------+----------------+------------------+--------------------+--------------------+----------+
|GENDER|AGE|MARITAL_STATUS|  PROFESSION|GENDER_index|MARITAL_STATUS_index|PROFESSION_index|          features|       rawPrediction|         probability|prediction|
+------+---+--------------+------------+------------+--------------------+----------------+------------------+--------------------+--------------------+----------+
|     M| 27|        Single|Professional|         0.0|                 1.0|             1.0|[0.0,27.0,1.0,1.0]|[0.16773330636208...|[0.54183528810843...|       0.0|
+------+---+--------------+------------+------------+--------------------+----------------+------------------+--------------------+--------------------+----------+



## Summary
In this notebook, you imported a Spark MLlib PipelineModel into Watson Machine Learning using the Watson Machine Learning Python client.

### <a id="authors"></a>Authors

**Sarah Packowski** is a member of the IBM Watson Studio Content Design team in Canada.


<hr>
Copyright &copy; IBM Corp. 2019. This notebook and its source code are released under the terms of the MIT License.
