# Analysis & Prediction: Customer Likelihood to buy aPhone
- - -
- Build Machine Learning Model of Customer Likelihood to buy aPhone
- Publish & Deploy Model Locally or to Cloud-based Watson Machine Learning Service in Bluemix

## Initialize project and environment

In [1]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

// The repository client (MLRepositoryClient) does not load remote models correctly unless
// Spark runs in local mode, so stop the kernel-provided context `sc` and start a local one.
sc.stop()
val conf1 = new SparkConf()
  .setMaster("local[*]")
  .setAppName("spark_context")
val scl = new SparkContext(conf1)

<div class="alert alert-block alert-info"> Note: Run the cell above only the first time you run this notebook after creating it, or after restarting the kernel. If you receive the error "Only one SparkContext may be running in this JVM", that is expected: restart the kernel and rerun. When re-running this notebook, select this cell and choose "Run All Below".</div> 

# Importing Brunel and ML Libraries

In [2]:
%AddJar -magic https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar -f

Starting download from https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar
Finished download of spark-kernel-brunel-all-2.3.jar


In [3]:
//import libraries
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.apache.spark.sql.{SQLContext, SparkSession, Row}
import org.apache.spark.SparkFiles

import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.classification.{LogisticRegression, DecisionTreeClassifier}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}

# Loading Data from CSV File

In [4]:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(scl)

// Load the customer data set from the local file system.
//  - header:        first line supplies the column names
//  - inferSchema:   Spark infers numeric/string column types from the data
//  - DROPMALFORMED: rows that cannot be parsed are silently discarded
val phoneSales = sqlContext.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .csv("../datasets/aPhone-Customers.csv")
phoneSales.show(5)


+------+--------+----------+----------+------+--------+------------+------------+------+
|Gender|AgeGroup| Education|Profession|Income|Switcher|LastPurchase|Annual_Spend|APhone|
+------+--------+----------+----------+------+--------+------------+------------+------+
|     F|   45-54|   Masters|    Lawyer|114073|       0|           3|      1211.0|   1.0|
|     M|   35-44|   Masters| Performer|186464|       0|           5|       382.0|   0.0|
|     M|   55-64|   Masters|    Doctor|237237|       0|           4|      1440.0|   1.0|
|     F|   18-24|HighSchool| Secretary| 18800|       1|           1|       546.0|   0.0|
|     F|   55-64| Bachelors|     Nurse| 61742|       0|           4|       741.0|   0.0|
+------+--------+----------+----------+------+--------+------------+------------+------+
only showing top 5 rows



In [5]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._ 

// Convert the APHONE purchase flag into the Double "label" column the classifier expects:
// 1 -> 1.0; 0, any other value, or null -> 0.0. Because the UDF parameter is an Integer,
// the numeric APHONE column is cast to an integer before the comparison
// (NOTE(review): a fractional value would be truncated by that cast).
val toDouble = udf { x: Integer => if (x != null && x == 1) 1.0 else 0.0 }

// Indicator for a textual phone column ("A-phone" -> 1.0, else 0.0). It is not used below.
// Calling equals on the literal keeps it null-safe; the previous phone.equals(...) threw an
// NPE on null values.
val aphoneIndicator = udf { phone: String => if ("A-phone".equals(phone)) 1.0 else 0.0 }

// Keep the modelling columns, derive "label" from APHONE, then drop the raw flag.
// The upper-case names rely on Spark's default case-insensitive column resolution
// (the CSV header uses "Gender", "AgeGroup", ...).
val phoneSalesData = phoneSales
  .select("GENDER", "AGEGROUP", "EDUCATION", "PROFESSION", "INCOME", "SWITCHER", "LASTPURCHASE", "ANNUAL_SPEND", "APHONE")
  .withColumn("label", toDouble(col("APHONE")))
  .drop("APHONE")

phoneSalesData.show(5)
phoneSalesData.printSchema()
phoneSalesData.count()


+------+--------+----------+----------+------+--------+------------+------------+-----+
|GENDER|AGEGROUP| EDUCATION|PROFESSION|INCOME|SWITCHER|LASTPURCHASE|ANNUAL_SPEND|label|
+------+--------+----------+----------+------+--------+------------+------------+-----+
|     F|   45-54|   Masters|    Lawyer|114073|       0|           3|      1211.0|  1.0|
|     M|   35-44|   Masters| Performer|186464|       0|           5|       382.0|  0.0|
|     M|   55-64|   Masters|    Doctor|237237|       0|           4|      1440.0|  1.0|
|     F|   18-24|HighSchool| Secretary| 18800|       1|           1|       546.0|  0.0|
|     F|   55-64| Bachelors|     Nurse| 61742|       0|           4|       741.0|  0.0|
+------+--------+----------+----------+------+--------+------------+------------+-----+
only showing top 5 rows

root
 |-- GENDER: string (nullable = true)
 |-- AGEGROUP: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- PROFESSION: string (nullable = true)
 |-- INCOME: inte

8301

# Splitting into training and testing data sets

In [6]:
// Randomly split the labelled data into a training set (80%) and a test set (20%).
// There is no separate validation set. The fixed seed makes the split reproducible.
val train = 0.80
val test = 0.20

val splits = phoneSalesData.randomSplit(Array(train, test), seed = 17L)
val (trainingDF, testDF) = (splits(0), splits(1))

println("Training data set")
trainingDF.show(5)

println("Testing data set")
testDF.show(5)
println(testDF.count())

Training data set
+------+--------+---------+----------+------+--------+------------+------------+-----+
|GENDER|AGEGROUP|EDUCATION|PROFESSION|INCOME|SWITCHER|LASTPURCHASE|ANNUAL_SPEND|label|
+------+--------+---------+----------+------+--------+------------+------------+-----+
|     F|   18-24|Associate| Architect| 47131|       0|           2|      1417.0|  1.0|
|     F|   18-24|Associate|     Clerk|  8041|       0|           5|       573.0|  0.0|
|     F|   18-24|Associate|     Clerk| 10998|       1|           2|       746.0|  0.0|
|     F|   18-24|Associate|     Clerk| 14131|       1|           2|       381.0|  0.0|
|     F|   18-24|Associate|     Clerk| 21128|       0|           4|       582.0|  0.0|
+------+--------+---------+----------+------+--------+------------+------------+-----+
only showing top 5 rows

Testing data set
+------+--------+---------+----------+------+--------+------------+------------+-----+
|GENDER|AGEGROUP|EDUCATION|PROFESSION|INCOME|SWITCHER|LASTPURCHASE|ANN

# Building Features

In [7]:
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer, VectorAssembler}

// Feature definition.
// Each categorical string column is mapped by a StringIndexer to a numeric *_CODE column.
def categoryIndexer(inputCol: String, outputCol: String): StringIndexer =
  new StringIndexer().setInputCol(inputCol).setOutputCol(outputCol)

val genderIndexer = categoryIndexer("GENDER", "GENDER_CODE")
val ageGroupIndexer = categoryIndexer("AGEGROUP", "AGE_GROUP_CODE")
val professionIndexer = categoryIndexer("PROFESSION", "PROFESSION_CODE")
val educationIndexer = categoryIndexer("EDUCATION", "EDUCATION_CODE")

// NOTE(review): the *_CODE indices go straight into the assembler, so logistic regression
// treats the (frequency-ordered) category indices as ordered numeric values. One-hot encoding
// may suit nominal features like PROFESSION better. StringIndexer's default handleInvalid
// ("error") also fails on category values not seen during fit — confirm scoring inputs.
val featureColumns = Array(
  "INCOME",
  "GENDER_CODE",
  "AGE_GROUP_CODE",
  "PROFESSION_CODE",
  "EDUCATION_CODE",
  "ANNUAL_SPEND",
  "SWITCHER",
  "LASTPURCHASE")

// Combine the numeric and indexed columns into the single "features" vector column.
val featuresAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")

# Assembling a pipeline with logistic regression model

In [8]:
import org.apache.spark.ml.classification.{LogisticRegression, DecisionTreeClassifier}

// Logistic-regression classifier: reads the "features" vector and the "label" column.
// regParam = 0.01 with the default elasticNetParam (0.0) applies a small L2 penalty.
val lr = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setRegParam(0.01)


In [9]:
import org.apache.spark.ml.{Pipeline, PipelineStage}


// Chain the four categorical indexers, the feature assembler and the classifier into one
// pipeline, then fit it on the training split. Stage order matters: the indexers must run
// before the assembler that consumes their *_CODE outputs. The classifier is the last stage
// (index 5).
val pipelineStages: Array[PipelineStage] = Array(
  genderIndexer,
  ageGroupIndexer,
  professionIndexer,
  educationIndexer,
  featuresAssembler,
  lr)

val pipeline = new Pipeline().setStages(pipelineStages)
val newModel = pipeline.fit(trainingDF)


# Using DataFrame-based API for assessing performance

In [10]:
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegressionModel}

// Take the fitted LogisticRegressionModel (pipeline stage 5) and its training-set summary.
val lrModel = newModel.stages(5).asInstanceOf[LogisticRegressionModel]
val trainingSummary = lrModel.summary

// The label is binary (0.0 / 1.0), so the summary is viewed as a binary-classification one.
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]

// ROC curve (kept in `roc` for plotting) and the area under it, on the training data.
val roc = binarySummary.roc
println(s"areaUnderROC: ${binarySummary.areaUnderROC}")

// Find the probability threshold that maximizes F-measure on the training data.
val fMeasure = binarySummary.fMeasureByThreshold
fMeasure.show(5)
val maxFMeasure = fMeasure.agg(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure
  .filter(col("F-Measure") === maxFMeasure)
  .select("threshold")
  .head()
  .getDouble(0)
println(s"best threshold:${bestThreshold}")

// NOTE: Spark ML setters mutate and return the same instance. `updatedModel`, `lrModel` and
// the stage inside `newModel` are therefore one object. Every later transform/evaluate/publish
// of newModel uses this tuned threshold instead of the default 0.5.
val updatedModel = lrModel.setThreshold(bestThreshold)

areaUnderROC: 0.9629843009364488
+------------------+-------------------+
|         threshold|          F-Measure|
+------------------+-------------------+
|0.9884018841317078| 0.0684326710816777|
|0.9730702953731861|0.12886048988285412|
|0.9663727171800365|0.18415637860082304|
|0.9588561238477399|0.23582089552238805|
| 0.952242262751719|0.27745664739884396|
+------------------+-------------------+
only showing top 5 rows

best threshold:0.32415690502632283


## ROC curve based on training data

In [11]:
%%brunel data('roc') x(FPR) y(TPR) line tooltip(#all) axes(x:'False Positive Rate':grid, y:'True Positive Rate':grid) title('ROC') 

# Using RDD-based API for assessing performance

In [12]:
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint


// Score the held-out test split with the full fitted pipeline and peek at the model outputs.
val testDFWithPredictions = newModel.transform(testDF)
testDFWithPredictions.select("features", "rawPrediction", "probability", "prediction").show(5)

// (prediction, label) pairs as an RDD, the input shape of the RDD-based evaluation API.
val predictionAndLabels = testDFWithPredictions
  .select("prediction", "label")
  .rdd
  .map(row => (row.getAs[Double](0), row.getAs[Double](1)))

println("Prediction and the original labels")
predictionAndLabels.take(5).foreach(println)


+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[14219.0,1.0,5.0,...|[2.96268184396044...|[0.95085945671341...|       0.0|
|[45306.0,1.0,5.0,...|[-0.2671103472660...|[0.43361663763690...|       1.0|
|[15592.0,1.0,5.0,...|[1.59499556345547...|[0.83131778347299...|       0.0|
|[9890.0,1.0,5.0,8...|[1.36018434894123...|[0.79578965761543...|       0.0|
|[10138.0,1.0,5.0,...|[2.63844486284414...|[0.93329521417407...|       0.0|
+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

Prediction and the original labels
(0.0,0.0)
(1.0,1.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)


## ROC curve based on test data

In [13]:
// Evaluate the fitted logistic-regression stage on the held-out test split.
// The output columns of the earlier newModel.transform are dropped first, because evaluate()
// runs the model's transform again and would otherwise clash with the existing
// prediction/rawPrediction/probability columns.
val testData = testDFWithPredictions.drop("prediction", "rawPrediction", "probability")
// This summary is computed on TEST data (previously misleadingly named `trainingSummary`).
val testSummary = lrModel.evaluate(testData)
val binarySummary = testSummary.asInstanceOf[BinaryLogisticRegressionSummary]

// The receiver-operating characteristic curve (kept for plotting) and area under the ROC curve
val rocOnTestData = binarySummary.roc
println(s"areaUnderROC: ${binarySummary.areaUnderROC}")

// The threshold that maximizes F-Measure on the test data. It is reported only, not applied
// to the model.
val fMeasure = binarySummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where(fMeasure("F-Measure") === maxFMeasure).
  select("threshold").head().getDouble(0)
println(s"best threshold:${bestThreshold}")

areaUnderROC: 0.9597124600638979
best threashold:0.2589596552451625


In [14]:
%%brunel data('rocOnTestData') x(FPR) y(TPR) line tooltip(#all) axes(x:'False Positive Rate':grid, y:'True Positive Rate':grid) title('ROC') 

## Metrics (RDD-based API)

In [15]:
// Score the test split again and collect (prediction, label) pairs for MulticlassMetrics.
// NOTE(review): despite the `dt` prefix, these predictions come from the logistic-regression
// pipeline (newModel), not from a decision tree.
val dtTestDFWithPredictions = newModel.transform(testDF)

val dtPredictionAndLabels = dtTestDFWithPredictions
  .select("prediction", "label")
  .rdd
  .map(row => (row.getAs[Double](0), row.getAs[Double](1)))


In [16]:
import org.apache.spark.mllib.evaluation.MulticlassMetrics

// RDD-based multiclass metrics over the (prediction, label) pairs.
val dtMetrics = new MulticlassMetrics(dtPredictionAndLabels)

// Confusion matrix: rows are actual labels, columns are predicted labels, both in ascending
// label order.
println("Confusion matrix:")
println(dtMetrics.confusionMatrix)

Confusion matrix:
1173.0  79.0   
49.0    351.0  


In [17]:
// Overall accuracy of the model on the test split.
val dtAccuracy = dtMetrics.accuracy
println("Statistics for the model")
println(s"Accuracy = $dtAccuracy")

// Per-label metrics. Each metric is printed for every label before moving on to the next
// metric. Explicit lambdas select the per-label overloads (e.g. precision(label)).
val dtLabels = dtMetrics.labels
val perLabelMetrics = Seq[(String, Double => Double)](
  ("Precision", label => dtMetrics.precision(label)),
  ("Recall", label => dtMetrics.recall(label)),
  ("FPR", label => dtMetrics.falsePositiveRate(label)),
  ("F1-Score", label => dtMetrics.fMeasure(label)))

for ((metricName, metric) <- perLabelMetrics; label <- dtLabels) {
  println(s"${metricName}($label) = " + metric(label))
}

// Label-frequency-weighted averages across all labels.
println(s"Weighted precision: ${dtMetrics.weightedPrecision}")
println(s"Weighted recall: ${dtMetrics.weightedRecall}")
println(s"Weighted F1 score: ${dtMetrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${dtMetrics.weightedFalsePositiveRate}")

Statistics for the model
Accuracy = 0.9225181598062954
Precision(0.0) = 0.9599018003273322
Precision(1.0) = 0.8162790697674419
Recall(0.0) = 0.9369009584664537
Recall(1.0) = 0.8775
FPR(0.0) = 0.1225
FPR(1.0) = 0.06309904153354633
F1-Score(0.0) = 0.9482619240097009
F1-Score(1.0) = 0.8457831325301205
Weighted precision: 0.9251263207728793
Weighted recall: 0.9225181598062955
Weighted F1 score: 0.9234486573076233
Weighted false positive rate: 0.10811720133984173


# Publish the model and create online deployments of it in both DSX Local and the WML service in Bluemix Cloud

## Publish to Cloud:  Save model to WML service in Bluemix Cloud
*Publish and Deploy code based on the following documentation and examples:*

https://apsportal.ibm.com/analytics/notebooks/c8652d2c-bfc9-4354-8168-f1c9f7f8dfc2/view?access_token=02a83fea8450a452c8de76af98dae078459d0f56810ddef4f4c62d5bc4fc72cf&cm_mc_uid=40662902271614933277870&cm_mc_sid_50200000=1503946544

In [18]:
// WML client library

import com.ibm.analytics.ngp.repository._

// Helper libraries

import scalaj.http.{Http, HttpOptions}
import scala.util.{Success, Failure}
import java.util.Base64
import java.nio.charset.StandardCharsets
import play.api.libs.json._

// Authenticate to the Watson Machine Learning service on Bluemix.
// The values come from the "Service Credentials" tab of the Bluemix WML service.
// Supply them through the WML_INSTANCE_ID / WML_USERNAME / WML_PASSWORD environment variables
// so secrets are not saved in the notebook. The "***" placeholders remain as fallbacks.
val service_path = "https://ibm-watson-ml.mybluemix.net"
val instance_id = sys.env.getOrElse("WML_INSTANCE_ID", "***")
val username = sys.env.getOrElse("WML_USERNAME", "***")
val password = sys.env.getOrElse("WML_PASSWORD", "***")

val client = MLRepositoryClient(service_path)
client.authorize(username, password)

// Publish the fitted pipeline, passing the training DataFrame along with it.
// `.get` unwraps the save result and raises if the save failed
// (presumably a Try, given the Success/Failure import above — confirm).
val model_artifact = MLRepositoryArtifact(newModel, trainingDF, "From DSX Local: aPhone notebook-based ML Model")
val saved_model = client.models.save(model_artifact).get


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


<div class="alert alert-block alert-info"> Note: The SLF4J class-loading warnings printed by the logger can be safely ignored.</div> 

## Deploy in Cloud:  Create online deployment of published model in WML Bluemix Service

In [19]:
// Reload the model that was just published to the cloud repository.
val model_version_href = saved_model.meta.prop("modelVersionHref").get
val loaded_model_artifact = client.models.version(model_version_href).get

// Exchange the service credentials (HTTP Basic auth) for a WML bearer token.
val wml_auth_header = "Basic " + Base64.getEncoder.encodeToString(s"$username:$password".getBytes(StandardCharsets.UTF_8))
val wml_url = service_path + "/v3/identity/token"
val wml_response = Http(wml_url).header("Authorization", wml_auth_header).asString
val wmltoken_json: JsValue = Json.parse(wml_response.body)

// Fail fast if no token came back. Continuing with an empty token only surfaces later as an
// opaque authorization failure. (The token is deliberately not echoed into the cell output.)
val wmltoken: String = (wmltoken_json \ "token").asOpt[String].getOrElse(
  sys.error(s"Could not obtain a WML token (HTTP ${wml_response.code}): ${wml_response.body}"))

// Look up the published_models URL from the instance details.
val endpoint_instance = service_path + "/v3/wml_instances/" + instance_id
val wml_response_instance = Http(endpoint_instance)
  .header("Content-Type", "application/json")
  .header("Authorization", "Bearer " + wmltoken)
  .option(HttpOptions.connTimeout(10000))
  .option(HttpOptions.readTimeout(50000))
  .asString
val published_models_json: JsValue = Json.parse(wml_response_instance.body)
val published_models_url = (((published_models_json \ "entity") \\ "published_models")(0) \ "url").as[JsString].value

// List the published models.
val wml_models = Http(published_models_url)
  .header("Content-Type", "application/json")
  .header("Authorization", "Bearer " + wmltoken)
  .option(HttpOptions.connTimeout(10000))
  .option(HttpOptions.readTimeout(50000))
  .asString

// Pick the deployments URL that belongs to the model saved above: the last quoted token in
// the response that mentions both "deployments" and the model uid.
// An empty uid would make `contains` match ANY deployments URL, so it is rejected.
// NOTE(review): this scans the raw body split on '"' rather than parsing the JSON; a
// structured lookup would be sturdier.
val modelUid = saved_model.uid.mkString
require(modelUid.nonEmpty, "Saved model has no uid; cannot locate its deployments endpoint")
val deployment_endpoint: String = wml_models.body.split("\"")
  .filter(s => s.contains("deployments") && s.contains(modelUid))
  .lastOption
  .getOrElse(sys.error(s"No deployments URL found for model $modelUid in: ${wml_models.body}"))

// Create an online deployment for the published model.
val payload_name = "From DSX Local:  Deployed aPhone notebook-based ML Model"
val payload_data_online = Json.stringify(Json.toJson(Map("type" -> "online", "name" -> payload_name)))
val response_online = Http(deployment_endpoint)
  .postData(payload_data_online)
  .header("Content-Type", "application/json")
  .header("Authorization", "Bearer " + wmltoken)
  .option(HttpOptions.connTimeout(50000))
  .option(HttpOptions.readTimeout(50000))
  .asString
print(response_online)


HttpResponse({"metadata":{"guid":"2be0035b-771b-4279-bea6-6798a2ea3f07","url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/fc65ecd2-a080-4f94-a7b8-40541f3fbc5c/published_models/7611f2d1-93c7-4825-8ab3-4be080eeee56/deployments/2be0035b-771b-4279-bea6-6798a2ea3f07","created_at":"2017-09-28T18:17:37.981Z","modified_at":"2017-09-28T18:17:39.206Z"},"entity":{"runtime_environment":"spark-2.0","name":"From DSX Local:  Deployed aPhone notebook-based ML Model","scoring_url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/fc65ecd2-a080-4f94-a7b8-40541f3fbc5c/published_models/7611f2d1-93c7-4825-8ab3-4be080eeee56/deployments/2be0035b-771b-4279-bea6-6798a2ea3f07/online","published_model":{"author":{},"name":"From DSX Local: aPhone notebook-based ML Model","url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/fc65ecd2-a080-4f94-a7b8-40541f3fbc5c/published_models/7611f2d1-93c7-4825-8ab3-4be080eeee56","guid":"7611f2d1-93c7-4825-8ab3-4be080eeee56","created_at":"2017-09-28T18:17:37.

# Publish Locally - Save model to DSX Local ML service

In [20]:

//DSX Local Machine Learning - Use Repository service to save model.

import com.ibm.analytics.ngp.repository.MLRepositoryClient
import com.ibm.analytics.ngp.repository.MLRepositoryArtifact

// Repository client for the DSX Local ML service (constructed with no service URL).
val dsxl_repository_client = MLRepositoryClient()
val model_artifact = MLRepositoryArtifact(
  newModel,
  trainingDF,
  "From DSX Local: aPhone notebook-based ML Model")

// Attach creator (author) information: extend the artifact's metadata, wrap the artifact in
// its mutable form, and produce an artifact carrying the same pipeline plus that metadata.
val meta_with_author = model_artifact.meta.add("authorName", "Demo User")
val mutableArtifact = MLRepositoryArtifact.mutableModelArtifact(model_artifact)
val new_artifact = mutableArtifact.mutate(newModel, meta_with_author)

// Save the author-tagged artifact. `.get` unwraps the save result
// (presumably raising if the save failed — confirm the return type).
val saved_model = dsxl_repository_client.models.save(new_artifact).get


## Deploy Locally:  Create online deployment of published model in DSX Local ML Service

In [21]:
//DSX Local Machine Learning - Use Deployment service

import play.api.libs.json._
import scalaj.http.{Http, HttpOptions}

// Resolve the version of the model just saved to the DSX Local repository.
val model_version_href = saved_model.meta.prop("modelVersionHref").get
val loaded_model_artifact = dsxl_repository_client.models.version(model_version_href).get

// Deployment request body: which artifact version to deploy, plus a display name.
val payload_artifactVersionHref = loaded_model_artifact.meta.prop("modelVersionHref").get
val payload_name = "From DSX Local: Deployed aPhone notebook-based ML Model"
val payload_data_online = Json.stringify(Json.toJson(Map(
  "artifactVersionHref" -> payload_artifactVersionHref,
  "name" -> payload_name)))

// In-cluster endpoint of the DSX Local deployment service (this rebinds `service_path`).
val service_path = "https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443"
val online_path = s"$service_path/v2/deployments"

val response_online = Http(online_path)
  .postData(payload_data_online)
  .header("Content-Type", "application/json")
  .option(HttpOptions.connTimeout(10000))
  .option(HttpOptions.readTimeout(50000))
  .asString

print(response_online)

HttpResponse({"metadata":{"guid":"81f751ec-d70e-49fc-b211-10ae34779eab","href":"https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443/v2/deployments/81f751ec-d70e-49fc-b211-10ae34779eab","createdAt":"2017-09-28T18:18:09.589Z"},"entity":{"name":"From DSX Local: Deployed aPhone notebook-based ML Model","artifactVersion":{"guid":"1c587273-6375-4117-a934-fd8f53e04dc9","href":"https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443/v2/artifacts/models/f363d532-8ea9-43aa-aa9f-0644bdebc368/versions/1c587273-6375-4117-a934-fd8f53e04dc9"},"predictionEndpoints":{"online":"https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443/v2/scoring/online/81f751ec-d70e-49fc-b211-10ae34779eab","stream":"https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443/v2/deployments/81f751ec-d70e-49fc-b211-10ae34779eab/streams","batch":"https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443/v2/deployments/81f751ec-d70e-49fc-b211-10ae34779eab/batch_jo

Developed by Analytics Technology Acceleration Team, IBM Analytics