<table style="border: none" align="left">
   <tr style="border: none">
      <th style="border: none"><font face="verdana" size="5" color="black"><b>Predict outdoor equipment purchase with IBM Watson Machine Learning</b></font></th>
      <th style="border: none"><img src="https://github.com/pmservice/customer-satisfaction-prediction/blob/master/app/static/images/ml_icon_gray.png?raw=true" alt="Watson Machine Learning icon" height="40" width="40"></th>
   </tr>
   <tr style="border: none">
       <th style="border: none"><img src="https://github.com/pmservice/wml-sample-models/blob/master/spark/product-line-prediction/images/products_graphics.png?raw=true" alt="Icon" width="800"> </th>
   </tr>
</table>

This notebook contains steps and code to get data from the IBM Data Science Experience Community, create a predictive model, and start scoring new data. It introduces commands for getting data, basic data cleaning and exploration, pipeline creation, model training, model persistence to the Watson Machine Learning repository, model deployment, and scoring.

Some familiarity with Scala is helpful. This notebook uses Scala 2.11 and Apache® Spark 2.0.

You will use a publicly available data set, **GoSales Transactions for Naive Bayes Model**, which details anonymous outdoor equipment purchases. Use the details of this data set to predict clients' interests in terms of product line, such as golf accessories, camping equipment, and others.

## Learning goals

The learning goals of this notebook are:

-  Load a CSV file into an Apache® Spark DataFrame.
-  Explore data.
-  Prepare data for training and evaluation.
-  Create an Apache® Spark machine learning pipeline.
-  Train and evaluate a model.
-  Persist a pipeline and model in the Watson Machine Learning repository.
-  Deploy a model for online scoring using the Watson Machine Learning API.
-  Score sample scoring data using the Watson Machine Learning API.


## Contents

This notebook contains the following parts:

1.	[Setup](#setup)
2.	[Load and explore data](#load)
3.	[Create an Apache® Spark machine learning model](#model)
4.	[Persist model](#persistence)
5.	[Deploy and score in a Cloud](#scoring)
6.	[Summary and next steps](#summary)

<a id="setup"></a>
## 1. Setup

Before you use the sample code in this notebook, you must perform the following setup tasks:

-  Create a [Watson Machine Learning Service](https://console.ng.bluemix.net/catalog/services/ibm-watson-machine-learning/) instance (a free plan is offered). 
-  Upload **GoSales Transactions** data as a data asset in IBM Data Science Experience.
-  Make sure that you are using a Spark 2.0 kernel.

### Create the GoSales Transactions Data Asset  

The GoSales data is a freely available data set on the Data Science Experience home page.

1.  Go to the [GoSales Transactions for Naive Bayes Model](https://apsportal.ibm.com/exchange-api/v1/entries/8044492073eb964f46597b4be06ff5ea/data?accessKey=9561295fa407698694b1e254d0099600) data card on the Data Science Experience **Community** page and open the card by double-clicking it.
2.  Click the link icon.
3.  Select the link, copy it by pressing Ctrl+C, and then click **Close**.
4.  In the following cell, replace the **link_to_data** variable value with the link.

<a id="load"></a>
## 2. Load and explore data

In this section you will load the data as an Apache® Spark DataFrame and perform a basic exploration.

Load the data into a Spark DataFrame: first use *wget* to download the file to the notebook's file system (GPFS), and then load it with the *read* method.

In [None]:
import scala.sys.process._

"wget https://apsportal.ibm.com/exchange-api/v1/entries/8044492073eb964f46597b4be06ff5ea/data?accessKey=9561295fa407698694b1e254d0099600".!

In [None]:
val filename = "data?accessKey=9561295fa407698694b1e254d0099600"

In [None]:
val df_data = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load(filename)

Explore the loaded data by using the following Apache® Spark DataFrame methods:
-  print schema
-  print the first 20 records (the default for `show()`)
-  count all records

In [None]:
df_data.printSchema()

As you can see, the data contains five fields. The PRODUCT_LINE field is the one we would like to predict (the label).

In [None]:
df_data.show()

In [None]:
println("Total number of records: " + df_data.count())

As you can see, the data set contains 60252 records.

<a id="model"></a>
## 3. Create an Apache® Spark machine learning model

In this section you will learn how to prepare data, create an Apache® Spark machine learning pipeline, and train a model.

### 3.1: Prepare data

In this subsection you will split your data into three data sets: train, test, and predict.

In [None]:
val splits = df_data.randomSplit(Array(0.8, 0.18, 0.02), seed = 24L)
val training_data = splits(0).cache()
val test_data = splits(1)
val prediction_data = splits(2)

println("Number of training records: " + training_data.count())
println("Number of testing records: " + test_data.count())
println("Number of prediction records: " + prediction_data.count())

As you can see, the data has been successfully split into three data sets:

-  The train data set, which is the largest group, is used for training.
-  The test data set is used for model evaluation, that is, to test the assumptions of the model.
-  The predict data set is used for prediction.
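
The split sizes printed above are approximate, because `randomSplit` assigns each row to a split at random according to the weights. As a rough sanity check, the following sketch in plain Scala (no Spark required) computes the expected size of each split from the weights and the record count reported earlier. The variable names are chosen for this illustration only.

```scala
// Weights passed to randomSplit in the cell above.
val splitWeights = Array(0.8, 0.18, 0.02)
// Record count reported earlier in this notebook.
val totalRecords = 60252L

// randomSplit normalizes the weights so that they sum to 1.
val normalizedWeights = splitWeights.map(_ / splitWeights.sum)

// Expected (not exact) size of each split, rounded to whole records.
val expectedSizes = normalizedWeights.map(w => math.round(w * totalRecords))

Seq("training", "test", "prediction").zip(expectedSizes).foreach {
  case (name, size) => println(s"Expected $name records: about $size")
}
```

The counts that Spark reports will usually differ from these values by a small amount.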

### 3.2: Create pipeline and train a model

In this section you will create an Apache® Spark machine learning pipeline and then train the model.

In the first step you need to import the Apache® Spark machine learning packages that will be needed in the subsequent steps.

In [None]:
// Spark ML libraries

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, IndexToString, VectorAssembler}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}
import org.apache.spark.sql.SparkSession

In the following step, convert all the string fields to numeric ones by using the StringIndexer transformer.

In [None]:
val stringIndexer_label = new StringIndexer().setInputCol("PRODUCT_LINE").setOutputCol("label").fit(df_data)
val stringIndexer_prof = new StringIndexer().setInputCol("PROFESSION").setOutputCol("PROFESSION_IX")
val stringIndexer_gend = new StringIndexer().setInputCol("GENDER").setOutputCol("GENDER_IX")
val stringIndexer_mar = new StringIndexer().setInputCol("MARITAL_STATUS").setOutputCol("MARITAL_STATUS_IX")
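
To see what a string indexer does conceptually, here is a minimal sketch in plain Scala. It assigns index 0.0 to the most frequent value, 1.0 to the next, and so on, which mirrors the default frequency-based ordering of `StringIndexer`. The sample values are hypothetical, and the alphabetical tie-breaking is an assumption of this sketch rather than documented Spark 2.0 behavior.

```scala
// Hypothetical PROFESSION values; not taken from the GoSales data.
val sampleProfessions = Seq("Sales", "Executive", "Sales", "Retired", "Executive", "Sales")

// Count occurrences, then order by descending frequency.
// Ties are broken alphabetically here; that is an assumption of this sketch.
val indexByValue: Map[String, Double] = sampleProfessions
  .groupBy(identity)
  .map { case (value, occurrences) => (value, occurrences.size) }
  .toSeq
  .sortBy { case (value, count) => (-count, value) }
  .map(_._1)
  .zipWithIndex
  .map { case (value, index) => (value, index.toDouble) }
  .toMap

// Replace each string with its index, as the transformer does for a column.
val indexedProfessions = sampleProfessions.map(indexByValue)
println(indexedProfessions)
```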

In the following step, create a feature vector by combining all features together.

In [None]:
val vectorAssembler_features = new VectorAssembler().setInputCols(Array("GENDER_IX", "AGE", "MARITAL_STATUS_IX", "PROFESSION_IX")).setOutputCol("features")
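
Conceptually, the assembler concatenates the listed columns, in order, into one numeric vector per row. A minimal sketch in plain Scala, using a hypothetical `IndexedRecord` case class and made-up index values:

```scala
// Hypothetical record after string indexing; the index values are made up.
case class IndexedRecord(genderIx: Double, age: Double, maritalStatusIx: Double, professionIx: Double)

// Concatenate the columns in the same order as the setInputCols call above.
def assemble(r: IndexedRecord): Array[Double] =
  Array(r.genderIx, r.age, r.maritalStatusIx, r.professionIx)

val sampleFeatures = assemble(IndexedRecord(genderIx = 0.0, age = 55.0, maritalStatusIx = 1.0, professionIx = 3.0))
println(sampleFeatures.mkString("[", ", ", "]"))
```

The order of the input columns matters: the model learns against vector positions, so scoring data must be assembled in the same order.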

Next, define estimators you want to use for classification. Random Forest is used in the following example.

In [None]:
val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(10)

Finally, convert the indexed labels back to the original labels.

In [None]:
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(stringIndexer_label.labels)
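
The label converter is essentially an array lookup: the predicted index selects an entry from the label array produced by the label indexer. A minimal sketch in plain Scala follows. The label order shown is hypothetical; the real order comes from `stringIndexer_label.labels`.

```scala
// Hypothetical label order; the real order comes from stringIndexer_label.labels.
val sampleLabels = Array("Camping Equipment", "Golf Equipment", "Mountaineering Equipment")

// A predicted index is a Double, so convert it to Int before the lookup.
def toLabel(prediction: Double): String = sampleLabels(prediction.toInt)

println(toLabel(2.0))
```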

Let's build the pipeline now. A pipeline consists of transformers and an estimator.

In [None]:
val pipeline_rf = new Pipeline().setStages(Array(stringIndexer_label, stringIndexer_prof, stringIndexer_gend, stringIndexer_mar, vectorAssembler_features, rf, labelConverter))

Now, you can train your Random Forest model by using the previously defined **pipeline** and **training data**.

In [None]:
training_data.printSchema()

In [None]:
val model_rf = pipeline_rf.fit(training_data)

You can check your **model accuracy** now. To evaluate the model, use **test data**.

In [None]:
val predictions = model_rf.transform(test_data)
val evaluatorRF = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluatorRF.evaluate(predictions)

println("Accuracy = " + accuracy)
println("Test Error = " + (1.0 - accuracy))
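
The accuracy metric is the fraction of records whose predicted index equals the label index, and the test error is its complement. The following plain Scala sketch computes both for a handful of hypothetical (label, prediction) pairs; it illustrates the metric and is not the evaluator's implementation.

```scala
// Hypothetical (label, prediction) pairs, as indexed doubles.
val labelAndPrediction = Seq((0.0, 0.0), (1.0, 1.0), (2.0, 0.0), (0.0, 0.0), (1.0, 2.0))

// Count the records where the prediction matches the label.
val correctCount = labelAndPrediction.count { case (label, prediction) => label == prediction }

val sketchAccuracy = correctCount.toDouble / labelAndPrediction.size
val sketchTestError = 1.0 - sketchAccuracy

println(s"Accuracy = $sketchAccuracy")
println(s"Test Error = $sketchTestError")
```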

You can now tune your model to achieve better accuracy. To keep this example simple, the tuning step is omitted.

<a id="persistence"></a>
## 4. Persist model

In this section you will learn how to store your pipeline and model in the Watson Machine Learning repository by using the Scala client libraries.

First, you must import client libraries.

**Note**: Apache® Spark 2.0 or higher is required.

In [None]:
// WML client library

import com.ibm.analytics.ngp.repository._

// Helper libraries

import scalaj.http.{Http, HttpOptions}
import scala.util.{Success, Failure}
import java.util.Base64
import java.nio.charset.StandardCharsets
import play.api.libs.json._

Authenticate to Watson Machine Learning service on Bluemix.

**Action**: Put authentication information from your instance of Watson Machine Learning service here.

In [None]:
val wml_service_path = "https://ibm-watson-ml.mybluemix.net"
val wml_instance_id = "***"
val wml_username = "***"
val wml_password = "***"

**Tip**: `wml_service_path`, `wml_username`, and `wml_password` can be found on the **Service Credentials** tab of the service instance created in Bluemix. If you cannot see the **instance_id** field in **Service Credentials**, generate new credentials by pressing the **New credential (+)** button.

In [None]:
val client = MLRepositoryClient(wml_service_path)
client.authorize(wml_username, wml_password)

Create model artifact (abstraction layer).

In [None]:
val model_artifact = MLRepositoryArtifact(model_rf, training_data, "WML Product Line Prediction Model")

**Tip**: The MLRepositoryArtifact method expects a trained model object, training data, and a model name. (It is this model name that is displayed by the Watson Machine Learning service).

### 4.1: Save pipeline and model

In this subsection you will learn how to save pipeline and model artifacts to your Watson Machine Learning instance.

In [None]:
val saved_model = client.models.save(model_artifact).get

Get saved model metadata from Watson Machine Learning.

**Tip**: Use *meta.availableProps* to get the list of available props.

In [None]:
saved_model.meta.availableProps

In [None]:
println("modelType: " + saved_model.meta.prop("modelType"))
println("trainingDataSchema: " + saved_model.meta.prop("trainingDataSchema"))
println("creationTime: " + saved_model.meta.prop("creationTime"))
println("modelVersionHref: " + saved_model.meta.prop("modelVersionHref"))
println("label: " + saved_model.meta.prop("label"))

**Tip**: **modelVersionHref** is the unique identifier of our model in the Watson Machine Learning repository.

### 4.2: Load model and make predictions

In this subsection you will learn how to load a saved model back from a specified instance of Watson Machine Learning.

In [None]:
val model_version_href = saved_model.meta.prop("modelVersionHref").get
val loaded_model_artifact = client.models.version(model_version_href).get

For example, you can print the model name to make sure that the model artifact has been loaded correctly.

In [None]:
loaded_model_artifact.name.mkString

As you can see the name is correct. 

In [None]:
// Bind the result of the match so that the new predictions are visible in later cells.
val predictions = loaded_model_artifact match {
    case SparkPipelineModelLoader(Success(model)) => model.transform(prediction_data)
    case SparkPipelineModelLoader(Failure(e)) => throw new RuntimeException("Loading failed.", e)
    case _ => throw new IllegalStateException(s"Unexpected artifact class: ${loaded_model_artifact.getClass}")
}
predictions.select("GENDER", "AGE", "MARITAL_STATUS", "PROFESSION", "predictedLabel").show()
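
The cell above matches on `Success` and `Failure`. In Scala, a `match` is an expression, so its result can be bound to a value instead of being assigned inside a branch. A minimal sketch of that pattern with `scala.util.Try`, using a hypothetical `load` function that parses a number in place of loading a model:

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical stand-in for loading a model: parsing a number that may fail.
def load(raw: String): Try[Int] = Try(raw.trim.toInt)

// The match is an expression, so every branch yields the function's result.
def describe(result: Try[Int]): String = result match {
  case Success(value) => s"loaded: $value"
  case Failure(e)     => s"loading failed: ${e.getClass.getSimpleName}"
}

println(describe(load("42")))
println(describe(load("not a number")))
```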

By tabulating a count, you can see which product line is the most popular.

In [None]:
predictions.select("predictedLabel").groupBy("predictedLabel").count().show()

You have now learned how to save a model to the Watson Machine Learning repository and load it back.

<a id="scoring"></a>
## 5. Deploy and score in a Cloud

In this section you will learn how to create online scoring and to score a new data record by using the Watson Machine Learning REST API. 
For more information about REST APIs, see the [Swagger Documentation](http://watson-ml-api.mybluemix.net/).

To work with the Watson Machine Learning REST API, you must generate an access token. To do that, you can use the following sample code:

In [None]:
// Get WML service instance token

val wml_auth_header = "Basic " + Base64.getEncoder.encodeToString((wml_username + ":" + wml_password).getBytes(StandardCharsets.UTF_8))
val wml_url = wml_service_path + "/v3/identity/token"
val wml_response = Http(wml_url).header("Authorization", wml_auth_header).asString
val wml_token_json:JsValue = Json.parse(wml_response.body)

// Falls back to an empty string if the response contains no "token" field.
val wml_token = (wml_token_json \ "token").asOpt[String].getOrElse("")
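
The token request above authenticates with an HTTP Basic header, which is the string `username:password` encoded as Base64 and prefixed with `Basic `. The following sketch isolates that step by using only the Java standard library. The `basicAuthHeader` helper and the credentials are placeholders for illustration.

```scala
import java.util.Base64
import java.nio.charset.StandardCharsets

// Hypothetical helper: builds the value of the Authorization header.
def basicAuthHeader(username: String, password: String): String = {
  val raw = s"$username:$password".getBytes(StandardCharsets.UTF_8)
  "Basic " + Base64.getEncoder.encodeToString(raw)
}

// Placeholder credentials; never hard-code real ones in a shared notebook.
val sampleHeader = basicAuthHeader("user", "pass")
println(sampleHeader)
```

Base64 is an encoding, not encryption, so send this header only over HTTPS.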

In [None]:
println(wml_token)

### 5.1: Create online scoring endpoint

Now you can create an online scoring endpoint. Execute the following sample code, which uses the publishedModelId value to create the scoring endpoint in the Bluemix repository.

#### Get published_models url from instance details

In [None]:
val endpoint_instance = wml_service_path + "/v3/wml_instances/" + wml_instance_id
val wml_response_instance = Http(endpoint_instance).header("Content-Type", "application/json").header("Authorization", "Bearer " + wml_token).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString

In [None]:
wml_response_instance

In [None]:
val published_models_json: JsValue = Json.parse(wml_response_instance.body)
val published_models_url = (((published_models_json \ "entity") \\ "published_models")(0) \ "url").as[JsString].value
published_models_url

#### Get list of published models

In [None]:
val wml_models = Http(published_models_url).header("Content-Type", "application/json").header("Authorization", "Bearer " + wml_token).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString
wml_models

In [None]:
var deployment_endpoint: String = _
// Keep the quoted string that contains both "deployments" and the saved model ID.
wml_models.body.split("\"").foreach { s => if (s.contains("deployments") && s.contains(saved_model.uid.mkString)) deployment_endpoint = s }
deployment_endpoint
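
The cell above finds the deployment URL by splitting the response body on double quotes and keeping the piece that mentions both `deployments` and the model ID. The following plain Scala sketch shows the same idea on a hypothetical response body. The body, the model ID, and the URL are made up for illustration and do not reflect the actual API response format.

```scala
// Hypothetical response body and model ID, for illustration only.
val sampleModelId = "abc-123"
val sampleBody =
  """{"resources":[{"metadata":{"guid":"abc-123"},"entity":{"deployments":{"url":"https://example.invalid/published_models/abc-123/deployments"}}}]}"""

// Splitting on double quotes isolates every JSON key and string value.
val tokens = sampleBody.split("\"")

// Keep the first token that mentions both "deployments" and the model ID.
val sampleEndpoint: Option[String] =
  tokens.find(s => s.contains("deployments") && s.contains(sampleModelId))

println(sampleEndpoint.getOrElse("no deployment URL found"))
```

Returning an `Option` makes the "not found" case explicit instead of leaving a variable unset. Parsing the body with a JSON library would be more robust than string splitting if the response structure is known.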

#### Create online deployment for published model

In [None]:
val payload_name = "Online scoring"
val payload_data_online = Json.stringify(Json.toJson(Map("type" -> "online", "name" -> payload_name)))

In [None]:
val response_online = Http(deployment_endpoint).postData(payload_data_online).header("Content-Type", "application/json").header("Authorization", "Bearer " + wml_token).option(HttpOptions.connTimeout(50000)).option(HttpOptions.readTimeout(50000)).asString

In [None]:
val scoring_url_json: JsValue = Json.parse(response_online.body)
// Falls back to an empty string if the response contains no scoring URL.
val scoring_url = (scoring_url_json \ "entity" \ "scoring_url").asOpt[String].getOrElse("")

In [None]:
print(scoring_url)

In [None]:
val payload_scoring = Json.stringify(Json.toJson(Map("fields" -> Json.toJson(List(Json.toJson("GENDER"), Json.toJson("AGE"), Json.toJson("MARITAL_STATUS"), Json.toJson("PROFESSION"))),
                                                    "values" -> Json.toJson(List(List(Json.toJson("M"), Json.toJson(55), Json.toJson("Single"), Json.toJson("Executive")))))))

In [None]:
payload_scoring
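
The payload has two keys: `fields` lists the column names, and `values` holds one array per record in the same column order. To make that shape explicit, the following sketch renders such a payload in plain Scala without a JSON library. The `jsonValue` and `scoringPayload` helpers are hypothetical; they handle only strings and numbers and do not escape special characters, so prefer a JSON library for real requests.

```scala
// Hypothetical helper: renders a single value as JSON (strings and numbers only).
def jsonValue(v: Any): String = v match {
  case s: String => "\"" + s + "\""
  case n: Int    => n.toString
  case d: Double => d.toString
  case other     => throw new IllegalArgumentException(s"Unsupported value: $other")
}

// Hypothetical helper: one array of field names, one array of values per record.
def scoringPayload(fields: Seq[String], rows: Seq[Seq[Any]]): String = {
  val fieldsJson = fields.map(jsonValue).mkString("[", ",", "]")
  val valuesJson = rows.map(_.map(jsonValue).mkString("[", ",", "]")).mkString("[", ",", "]")
  s"""{"fields":$fieldsJson,"values":$valuesJson}"""
}

val samplePayload = scoringPayload(
  Seq("GENDER", "AGE", "MARITAL_STATUS", "PROFESSION"),
  Seq(Seq("M", 55, "Single", "Executive"))
)
println(samplePayload)
```

To score several records in one request, add more inner sequences to the second argument.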

Now, you can send (POST) new scoring records (new data) for which you would like to get predictions. To do that, execute the following sample code: 

In [None]:
val response_scoring = Http(scoring_url).postData(payload_scoring).header("Content-Type", "application/json").header("Authorization", "Bearer " + wml_token).option(HttpOptions.method("POST")).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString

In [None]:
print(response_scoring)

As we can see, the model predicts that a 55-year-old single male executive is interested in Mountaineering Equipment (prediction: 2.0).

<a id="summary"></a>
## 6. Summary and next steps 

You successfully completed this notebook! You learned how to use Apache Spark machine learning as well as Watson Machine Learning for model creation and deployment. Check out our _[Online Documentation](https://console.ng.bluemix.net/docs/services/PredictiveModeling/pm_service_api_spark.html)_ for more samples, tutorials, documentation, how-tos, and blog posts. 
 
### Authors

**Umit Mert Cakmak** is a Data Scientist at IBM with a track record of developing enterprise-level applications that substantially increase clients' ability to turn data into actionable insights.

Copyright © 2017 IBM. This notebook and its source code are released under the terms of the MIT License.