# Watson Machine Learning (WML) Demo
This notebook demos the use of WML

In [4]:
import scala.sys.process._
// use wget to get the data from the DSX sample data
// todo: explore a better way to load the data
"wget https://apsportal.ibm.com/exchange-api/v1/entries/8044492073eb964f46597b4be06ff5ea/data?accessKey=4a0df0ee0ec2e4e1f7f8988aaddb26e7".!

--2017-10-03 01:27:53--  https://apsportal.ibm.com/exchange-api/v1/entries/8044492073eb964f46597b4be06ff5ea/data?accessKey=4a0df0ee0ec2e4e1f7f8988aaddb26e7
Resolving apsportal.ibm.com (apsportal.ibm.com)... 75.126.81.68
Connecting to apsportal.ibm.com (apsportal.ibm.com)|75.126.81.68|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/octet-stream]
Saving to: ‘data?accessKey=4a0df0ee0ec2e4e1f7f8988aaddb26e7’

     0K .......... .......... .......... .......... .......... 5.54M
    50K .......... .......... .......... .......... .......... 12.0M
   100K .......... .......... .......... .......... .......... 12.0M
   150K .......... .......... .......... .......... .......... 8.74M
   200K .......... .......... .......... .......... .......... 12.5M
   250K .......... .......... .......... .......... .......... 5.32M
   300K .......... .......... .......... .......... .......... 9.79M
   350K .......... .......... .......... .......... .....

0

In [5]:
val filename = "data?accessKey=4a0df0ee0ec2e4e1f7f8988aaddb26e7"

In [6]:
// create spark data frame form the data
val df_data = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load(filename)
df_data.take(5)

[Stage 2:>                                                          (0 + 1) / 2]

Array([Personal Accessories,M,27,Single,Professional], [Personal Accessories,F,39,Married,Other], [Mountaineering Equipment,F,39,Married,Other], [Personal Accessories,F,56,Unspecified,Hospitality], [Golf Equipment,M,45,Married,Retired])

In [7]:
// print the schema
df_data.printSchema()

root
 |-- PRODUCT_LINE: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- PROFESSION: string (nullable = true)



In [9]:
// show the data
df_data.show()

+--------------------+------+---+--------------+------------+
|        PRODUCT_LINE|GENDER|AGE|MARITAL_STATUS|  PROFESSION|
+--------------------+------+---+--------------+------------+
|Personal Accessories|     M| 27|        Single|Professional|
|Personal Accessories|     F| 39|       Married|       Other|
|Mountaineering Eq...|     F| 39|       Married|       Other|
|Personal Accessories|     F| 56|   Unspecified| Hospitality|
|      Golf Equipment|     M| 45|       Married|     Retired|
|      Golf Equipment|     M| 45|       Married|     Retired|
|   Camping Equipment|     F| 39|       Married|       Other|
|   Camping Equipment|     F| 49|       Married|       Other|
|  Outdoor Protection|     F| 49|       Married|       Other|
|      Golf Equipment|     M| 47|       Married|     Retired|
|      Golf Equipment|     M| 47|       Married|     Retired|
|Mountaineering Eq...|     M| 21|        Single|      Retail|
|Personal Accessories|     F| 66|       Married|       Other|
|   Camp

In [13]:
df_data.count()

60252

In [15]:
// split data into traning, testing (evaluation), prediction
val splits = df_data.randomSplit(Array(0.8, 0.18, 0.02), seed = 24L)
// cache the training data
val training_data = splits(0).cache()
val test_data = splits(1)
val prediction_data = splits(2)
println(s"Training data recod count = ${training_data.count()}")
println(s"Test data recod count = ${test_data.count()}")
println(s"Prediction data recod count = ${prediction_data.count()}")

Training data recod count = 48176
Test data recod count = 10860
Prediction data recod count = 1216


In [16]:
// load spark ML libraries
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, IndexToString, VectorAssembler}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}
import org.apache.spark.sql.SparkSession

In [17]:
// conver all string columns to integer which is faster to manipulate
val stringIndexer_label = new StringIndexer().setInputCol("PRODUCT_LINE").setOutputCol("label").fit(df_data)
val stringIndexer_prof = new StringIndexer().setInputCol("PROFESSION").setOutputCol("PROFESSION_IX")
val stringIndexer_gend = new StringIndexer().setInputCol("GENDER").setOutputCol("GENDER_IX")
val stringIndexer_mar = new StringIndexer().setInputCol("MARITAL_STATUS").setOutputCol("MARITAL_STATUS_IX")

In [18]:
// create a vector of all the features
val vectorAssembler_features = new VectorAssembler().setInputCols(Array("GENDER_IX", "AGE", "MARITAL_STATUS_IX", "PROFESSION_IX")).setOutputCol("features")

In [19]:
// create a random forest model with 10 tree (very low)
val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(10)

In [20]:
// index back integer to strings
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(stringIndexer_label.labels)

In [21]:
// create a pipeline that consists of the following stages: string to integer indexers for each column, feature assembly, RandomForest model, 
// label converter
val pipeline_rf = new Pipeline().setStages(Array(stringIndexer_label, 
                                                 stringIndexer_prof, 
                                                 stringIndexer_gend, 
                                                 stringIndexer_mar, 
                                                 vectorAssembler_features, 
                                                 rf, 
                                                 labelConverter))

In [22]:
// now train the RandomForest model
val model_rf = pipeline_rf.fit(training_data)

In [23]:
// evaluate the model using test data and the spark MultiClassClassificationEvaluator
// create predications
val predictions = model_rf.transform(test_data)
// compare predication with actual data
val evaluatorRF = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluatorRF.evaluate(predictions)
println("Accuracy = " + accuracy)
println("Test Error = " + (1.0 - accuracy))

Accuracy = 0.5736648250460405
Test Error = 0.42633517495395945


In [24]:
// WML client library

import com.ibm.analytics.ngp.repository._

// Helper libraries

import scalaj.http.{Http, HttpOptions}
import scala.util.{Success, Failure}
import java.util.Base64
import java.nio.charset.StandardCharsets
import play.api.libs.json._

In [26]:
// add my authentication information from my WML instance
val service_path = "https://ibm-watson-ml.mybluemix.net"
val instance_id = "5b6f632b-b665-4b48-945a-d07b1fcce911"
val username = "e2b996c5-aac8-4a3c-a165-e8c30577ef5f"
val password = "9fe79cd4-5753-48cc-8106-a6a83cfb8f96"

In [27]:
// point to my WML instance
val client = MLRepositoryClient(service_path)
client.authorize(username, password)

Success(())

In [28]:
// create a model artifacts: the model, training data and model name
val model_artifact = MLRepositoryArtifact(model_rf, training_data, "WML Product line predictor")

In [29]:
// save the model artifacts
val saved_model = client.models.save(model_artifact).get

In [30]:
// get meta data of the model
saved_model.meta.availableProps

Vector(trainingDataSchema, lastUpdated, label, inputDataSchema, modelVersionHref, prediction, modelType, version, modelHref, pipelineVersionHref, runtime, creationTime, probability)

In [31]:
// display model meta data
println("modelType: " + saved_model.meta.prop("modelType"))
println("trainingDataSchema: " + saved_model.meta.prop("trainingDataSchema"))
println("creationTime: " + saved_model.meta.prop("creationTime"))
println("modelVersionHref: " + saved_model.meta.prop("modelVersionHref"))
println("label: " + saved_model.meta.prop("label"))

modelType: Some(sparkml-model-2.0)
trainingDataSchema: Some({"type":"struct","fields":[{"name":"PRODUCT_LINE","type":"string","nullable":true,"metadata":{}},{"name":"GENDER","type":"string","nullable":true,"metadata":{}},{"name":"AGE","type":"integer","nullable":true,"metadata":{}},{"name":"MARITAL_STATUS","type":"string","nullable":true,"metadata":{}},{"name":"PROFESSION","type":"string","nullable":true,"metadata":{}}]})
creationTime: Some(2017-10-03T07:13:11.336Z)
modelVersionHref: Some(https://ibm-watson-ml.mybluemix.net/v2/artifacts/models/9fc98857-92b8-4be8-bc96-0606119859a9/versions/11ace650-5cdd-431a-ab7d-f9d1f585c02d)
label: Some(PRODUCT_LINE)


In [32]:
// load model back from WML
val model_version_href = saved_model.meta.prop("modelVersionHref").get
val loaded_model_artifact = client.models.version(model_version_href).get

In [33]:
// print the name of the model
loaded_model_artifact.name.mkString

WML Product line predictor

In [34]:
// show predications
loaded_model_artifact match {
        case SparkPipelineModelLoader(Success(model)) => {
          val predictions = model.transform(prediction_data)
        }
        case SparkPipelineModelLoader(Failure(e)) => "Loading failed."
        case _ => println(s"Unexpected artifact class: ${loaded_model_artifact.getClass}")
    }
predictions.select("GENDER", "AGE", "MARITAL_STATUS", "PROFESSION","predictedLabel").show()

+------+---+--------------+-----------+--------------------+
|GENDER|AGE|MARITAL_STATUS| PROFESSION|      predictedLabel|
+------+---+--------------+-----------+--------------------+
|     F| 18|        Single|      Other|Personal Accessories|
|     F| 18|        Single|     Retail|Personal Accessories|
|     F| 19|        Single|Hospitality|   Camping Equipment|
|     F| 19|        Single|Hospitality|   Camping Equipment|
|     F| 19|        Single|Hospitality|   Camping Equipment|
|     F| 19|        Single|Hospitality|   Camping Equipment|
|     F| 19|        Single|      Other|Personal Accessories|
|     F| 19|        Single|      Other|Personal Accessories|
|     F| 19|        Single|      Other|Personal Accessories|
|     F| 19|        Single|      Other|Personal Accessories|
|     F| 19|        Single|      Other|Personal Accessories|
|     F| 19|        Single|      Other|Personal Accessories|
|     F| 19|        Single|      Other|Personal Accessories|
|     F| 19|        Sing

In [35]:
// show count of the most popular products
predictions.select("predictedLabel").groupBy("predictedLabel").count().show()

|      predictedLabel|count|
+--------------------+-----+
|   Camping Equipment| 7238|
|      Golf Equipment|  677|
|Mountaineering Eq...|  192|
|Personal Accessories| 2753|
+--------------------+-----+



In [36]:
// to prepare for inoking the WML Rest API we need to generte a token
// Get WML service instance token

val wml_auth_header = "Basic " + Base64.getEncoder.encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8))
val wml_url = service_path + "/v3/identity/token"
val wml_response = Http(wml_url).header("Authorization", wml_auth_header).asString
val wmltoken_json: JsValue = Json.parse(wml_response.body)

val wmltoken = (wmltoken_json \ "token").asOpt[String] match {
    case Some(x) => x
    case None => ""
}

In [37]:
// show the token is not empty
wmltoken

eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRJZCI6IjViNmY2MzJiLWI2NjUtNGI0OC05NDVhLWQwN2IxZmNjZTkxMSIsImluc3RhbmNlSWQiOiI1YjZmNjMyYi1iNjY1LTRiNDgtOTQ1YS1kMDdiMWZjY2U5MTEiLCJwbGFuSWQiOiIzZjZhY2Y0My1lZGU4LTQxM2EtYWM2OS1mOGFmM2JiMGNiZmUiLCJyZWdpb24iOiJ1cy1zb3V0aCIsInVzZXJJZCI6ImUyYjk5NmM1LWFhYzgtNGEzYy1hMTY1LWU4YzMwNTc3ZWY1ZiIsImlzcyI6Imh0dHA6Ly8xMjkuNDEuMjI5LjE4ODo4MDgwL3YyL2lkZW50aXR5IiwiaWF0IjoxNTA3MDE1NDU3LCJleHAiOjE1MDcwNDQyNTd9.hV3vUwHD4c45xiIz9amujTjf9daiZl7KLrM_vKL1ltn1uRwBD9gy5s5MyzqvMNNCDBdStYM-ZIRAhOrr-QnwNP89p7fYI46PTXZ8Lsx6PJ1gVkXPwP2eqnP5zvWYct56DjMQWEXb4Yi6xYCe6pGxinIIXXjxkB6Gw7UbAcYGO3Th4xS96E1T8DUzLReji0TNp0q3BgoLh-EUnIkRCpoFMXmrCkCgQ1UqAczysTGEFDC4USHWf85E0kGvADnKXkznAHtnxF0lvcrHKxCfu_tjvkJLnEghpjSFYqTBk5b6iN8QIAp9s7KVtyltyPQoe9mlWw-4zrLJRRO9xaR20goG2g

In [38]:
// get end point instance
val endpoint_instance = service_path + "/v3/wml_instances/" + instance_id
val wml_response_instance = Http(endpoint_instance).header("Content-Type", "application/json").header("Authorization", 
        "Bearer " + wmltoken).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString

In [39]:
// display response instance
wml_response_instance

HttpResponse({"metadata":{"guid":"5b6f632b-b665-4b48-945a-d07b1fcce911","url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/5b6f632b-b665-4b48-945a-d07b1fcce911","created_at":"2017-10-03T06:07:26.785Z","modified_at":"2017-10-03T07:13:11.427Z"},"entity":{"source":"Bluemix","published_models":{"url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/5b6f632b-b665-4b48-945a-d07b1fcce911/published_models"},"usage":{"expiration_date":"2017-11-01T00:00:00.000Z","computation_time":{"limit":18000,"current":0},"model_count":{"limit":200,"current":1},"prediction_count":{"limit":5000,"current":0},"deployment_count":{"limit":5,"current":0}},"plan_id":"3f6acf43-ede8-413a-ac69-f8af3bb0cbfe","status":"Active","organization_guid":"80b37ee2-c6d0-4e5b...

In [40]:
// get published models
val published_models_json: JsValue = Json.parse(wml_response_instance.body)
val published_models_url = (((published_models_json \ "entity") \\ "published_models")(0) \ "url").as[JsString].value
published_models_url

https://ibm-watson-ml.mybluemix.net/v3/wml_instances/5b6f632b-b665-4b48-945a-d07b1fcce911/published_models

In [41]:
// display published models
val wml_models = Http(published_models_url).header("Content-Type", "application/json").header("Authorization", "Bearer " + wmltoken).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString
wml_models

HttpResponse({"count":1,"resources":[{"metadata":{"guid":"9fc98857-92b8-4be8-bc96-0606119859a9","url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/5b6f632b-b665-4b48-945a-d07b1fcce911/published_models/9fc98857-92b8-4be8-bc96-0606119859a9","created_at":"2017-10-03T07:13:11.336Z","modified_at":"2017-10-03T07:13:11.547Z"},"entity":{"runtime_environment":"spark-2.0","author":{},"name":"WML Product line predictor","label_col":"PRODUCT_LINE","training_data_schema":{"type":"struct","fields":[{"name":"PRODUCT_LINE","type":"string","nullable":true,"metadata":{}},{"name":"GENDER","type":"string","nullable":true,"metadata":{}},{"name":"AGE","type":"integer","nullable":true,"metadata":{}},{"name":"MARITAL_STATUS","type":"string","nullable":true,"m...

In [42]:
// get the deployment end point and sisplay it
var deployment_endpoint: String = _
wml_models.body.split("\"").map{ s => {if ((s contains "deployments") & (s contains saved_model.uid.mkString)) {deployment_endpoint = s}}}
deployment_endpoint

https://ibm-watson-ml.mybluemix.net/v3/wml_instances/5b6f632b-b665-4b48-945a-d07b1fcce911/published_models/9fc98857-92b8-4be8-bc96-0606119859a9/deployments

In [43]:
// craete online deploymnet
val payload_name = "Online scoring"
val payload_data_online = Json.stringify(Json.toJson(Map("type" -> "online", "name" -> payload_name)))

In [44]:
// send request
val response_online = Http(deployment_endpoint).postData(payload_data_online).header("Content-Type", 
 "application/json").header("Authorization", "Bearer " + wmltoken).option(HttpOptions.connTimeout(50000)).option(HttpOptions.readTimeout(50000)).asString

In [45]:
// parese response
val scoring_url_json: JsValue = Json.parse(response_online.body)
val scoring_url = (scoring_url_json \ "entity" \ "scoring_url").asOpt[String] match {
    case Some(x) => x
    case None => ""
}
println(scoring_url)

https://ibm-watson-ml.mybluemix.net/v3/wml_instances/5b6f632b-b665-4b48-945a-d07b1fcce911/published_models/9fc98857-92b8-4be8-bc96-0606119859a9/deployments/dca80765-3719-480f-80f2-63b952d220bb/online


In [46]:
// create payload for scoring with a random values for the features
val payload_scoring = Json.stringify(Json.toJson(Map("fields" -> Json.toJson(List(Json.toJson("GENDER"), 
    Json.toJson("AGE"), Json.toJson("MARITAL_STATUS"), Json.toJson("PROFESSION"))),
    "values" -> Json.toJson(List(List(Json.toJson("M"), Json.toJson(55), Json.toJson("Single"), Json.toJson("Executive")))))))
payload_scoring

{"fields":["GENDER","AGE","MARITAL_STATUS","PROFESSION"],"values":[["M",55,"Single","Executive"]]}

In [47]:
// post request
val response_scoring = Http(scoring_url).postData(payload_scoring).header("Content-Type", 
    "application/json").header("Authorization", "Bearer " + wmltoken).option(HttpOptions.method("POST")).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString

In [48]:
println(response_scoring)

HttpResponse({
  "fields": ["GENDER", "AGE", "MARITAL_STATUS", "PROFESSION", "prediction", "probability", "predictedLabel"],
  "values": [["M", 55, "Single", "Executive", 2.0, [0.22802353691317875, 0.23514743248158335, 0.28897028385330203, 0.2283936941791034, 0.01946505257283251], "Mountaineering Equipment"]]
},200,Map(Cache-Control -> Vector(private, no-cache, no-store, must-revalidate), Connection -> Vector(Keep-Alive), Content-Type -> Vector(application/json), Date -> Vector(Tue, 03 Oct 2017 07:43:49 GMT), Pragma -> Vector(no-cache), Server -> Vector(nginx/1.11.5), Status -> Vector(HTTP/1.1 200 OK), Transfer-Encoding -> Vector(chunked), X-Backside-Transport -> Vector(OK OK), X-Content-Type-Options -> Vector(nosniff), X-Frame-Options -> Vector(DENY), X-Global-Transaction-ID -> Vector(784822791), X-Xss-Protection -> Vector(1)))
