# ML Notebook for Banking Churn Model

## Initializing project and environment

In [None]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

// Stop the context currently bound to `sc` before a replacement is created.
sc.stop()

// Configure and start a new context on the local master; it is bound to `scl`.
val conf1 = new SparkConf().
  setAppName("spark_context").
  setMaster("local[*]")
val scl = new SparkContext(conf1)

<div class="alert alert-block alert-info"> Note: Only run the cell above the first time you run this notebook after creating it, or whenever you restart the kernel. If you see an error about "Only one SparkContext may be running in this JVM", that is expected.</div> 

## Importing Brunel and ML Libraries

In [None]:
%AddJar -magic https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar -f

In [None]:
//import libraries
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.apache.spark.sql.{SQLContext, SparkSession, Row}
import org.apache.spark.SparkFiles

import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.classification.{LogisticRegression, DecisionTreeClassifier}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.ibm.transformers.RenameColumn

import com.ibm.analytics.ngp.ingest.Sampling
//import com.ibm.analytics.ngp.pipeline._
import com.ibm.analytics.ngp.util._
import com.ibm.analytics.ngp.pipeline.evaluate.{Evaluator,MLProblemType}

import com.ibm.analytics.wml.{Learner, Target}
import com.ibm.analytics.wml.cads.CADSEstimator

# Loading Data from HortonWorks Connection

In [None]:
// TODO:  Insert "cust_summary_notebook_training" remote data set as Spark DataFrame
// TODO:  Change the automatically inserted code to use "scl" instead of "sc"



In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// TODO: Rename the dataframe in the statement below to match the dataframe automatically inserted above
val churnDataRaw = df1

// UDF turning the boolean CHURN flag into a numeric value: true -> 1.0, false -> 0.0.
val toDouble = udf((churn: Boolean) => if (churn) 1.0 else 0.0)

// Keep the listed columns, derive the numeric "label" column from CHURN,
// then drop the original CHURN column.
val churnData = churnDataRaw.
  select("AGE", "ACTIVITY", "EDUCATION", "GENDER", "STATE", "NEGTWEETS", "INCOME", "CHURN").
  withColumn("label", toDouble(churnDataRaw.col("CHURN"))).
  drop("CHURN")

// Preview the first five prepared rows.
churnData.show(5)

In [None]:
// Split sizes passed to Sampling.trainingSplit.
// NOTE(review): presumably percentages (they sum to 100) — confirm against the Sampling API.
val train = 70
val test = 15
val validate = 15

// Split the data into training data set, testing data set, and validation data set
val splits = Sampling.trainingSplit(churnData, train, test, validate)

val trainingDF = splits._1
val testDF = splits._2
val validationDF = splits._3

// Print a title followed by a five-row preview for each split, in split order.
Seq(
  "Training data set" -> trainingDF,
  "Testing data set" -> testDF,
  "Validation data set" -> validationDF
).foreach { case (title, frame) =>
  println(title)
  frame.show(5)
}

# Building and Evaluating LR model

In [None]:
// Feature definition

// GENDER, STATE and EDUCATION each go through a StringIndexer that writes a "*_code" column.
val genderIndexer = new StringIndexer().
  setInputCol("GENDER").
  setOutputCol("gender_code")
val stateIndexer = new StringIndexer().
  setInputCol("STATE").
  setOutputCol("state_code")
val educationIndexer = new StringIndexer().
  setInputCol("EDUCATION").
  setOutputCol("education_code")

// Input columns of the feature vector, in assembly order.
val featureColumns = Array(
  "AGE",
  "ACTIVITY",
  "education_code",
  "NEGTWEETS",
  "INCOME",
  "gender_code",
  "state_code"
)

// Combine the input columns into the single "features" vector column.
val featuresAssembler = new VectorAssembler().
  setInputCols(featureColumns).
  setOutputCol("features")

In [None]:
// Logistic Regression estimator: reads the "features" column, predicts the "label" column,
// and uses a regularization parameter of 0.01.
val lr = new LogisticRegression().
  setLabelCol("label").
  setFeaturesCol("features").
  setRegParam(0.01)

In [None]:
import org.apache.spark.ml.{Pipeline, PipelineStage}

// Pipeline stages in execution order: the three indexers, the feature assembler,
// and finally the logistic regression estimator.
val pipelineStages = Array[PipelineStage](
  genderIndexer,
  stateIndexer,
  educationIndexer,
  featuresAssembler,
  lr
)
val pipeline = new Pipeline().setStages(pipelineStages)

// Fit the whole pipeline on the training split.
val newModel = pipeline.fit(trainingDF)

In [None]:
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegressionModel}

// Extract the fitted LogisticRegressionModel from the pipeline model.
// The stage is looked up by type rather than by the hard-coded position 4, so adding,
// removing or reordering feature stages in the pipeline cannot select the wrong stage
// or fail with an opaque ClassCastException / ArrayIndexOutOfBoundsException.
val lrModel = newModel.stages.
  collectFirst { case model: LogisticRegressionModel => model }.
  getOrElse(sys.error("No LogisticRegressionModel stage found in the fitted pipeline"))

In [None]:
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint

// Run the test split through the fitted pipeline, then remove the generated
// prediction columns again before passing the frame to lrModel.evaluate.
val testDFWithPredictions = newModel.transform(testDF)
val predictionColumns = Seq("prediction", "rawPrediction", "probability")
val testData = testDFWithPredictions.drop(predictionColumns: _*)

// NOTE: despite its name, trainingSummary is computed on the test data.
val trainingSummary = lrModel.evaluate(testData)
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]

// The ROC curve and area under the ROC curve on test data
val rocOnTestData = binarySummary.roc
println(s"Area under ROC curve for the initial model: ${binarySummary.areaUnderROC}")

## Displaying the evaluation results - ROC curve with Brunel

In [None]:
%%brunel data('rocOnTestData') x(FPR) y(TPR) line tooltip(#all) axes(x:'False Positive Rate':grid, y:'True Positive Rate':grid) title('ROC')

#  Publish Locally - Use repository service to save model

In [None]:
// TODO:  Rename two values in code below to replace the label TODO_CHANGE_TO_TEAMNAME with your lab team's name

// DSX Local Machine Learning - Use Repository service to save model.

import com.ibm.analytics.ngp.repository.MLRepositoryClient
import com.ibm.analytics.ngp.repository.MLRepositoryArtifact

// Create the repository client and build an artifact from the fitted pipeline model,
// the training DataFrame and a display name.
val ml_repository_client = MLRepositoryClient()
val model_artifact = MLRepositoryArtifact(newModel, trainingDF, "TODO_CHANGE_TO_TEAMNAME Banking Churn Notebook Model(LR)")

// Add creator information for the model: extend the artifact metadata with "authorName",
// then produce a new artifact carrying the same model and the extended metadata.
val meta_with_author = model_artifact.meta.add("authorName", "TODO_CHANGE_TO_TEAMNAME");
val mutableArtifact = MLRepositoryArtifact.mutableModelArtifact(model_artifact);
val new_artifact = mutableArtifact.mutate(newModel, meta_with_author);

// Save the artifact to the repository.
// NOTE(review): .get presumably throws if the save failed — confirm the return type of models.save.
val saved_model = ml_repository_client.models.save(new_artifact).get

## Deploy Locally:  Create online deployment of published model in DSX Local ML Service

In [None]:
// TODO:  Rename one value in code below to include your lab team's name, look for TODO_CHANGE_TO_TEAMNAME

// DSX Local Machine Learning - Use Deployment service

import play.api.libs.json._
import scalaj.http.{Http, HttpOptions}

// Load the saved model version back from the repository and read its version href.
val model_version_href = saved_model.meta.prop("modelVersionHref").get
val loaded_model_artifact = ml_repository_client.models.version(model_version_href).get
val payload_artifactVersionHref = loaded_model_artifact.meta.prop("modelVersionHref").get

val payload_name = "TODO_CHANGE_TO_TEAMNAME Banking Churn Notebook Model (LR) Deployment"

// JSON body of the deployment request: the model version href and the deployment name.
val deployment_request = Map(
  "artifactVersionHref" -> payload_artifactVersionHref,
  "name" -> payload_name
)
val payload_data_online = Json.stringify(Json.toJson(deployment_request))

val service_path = "https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443"
val online_path = s"$service_path/v2/deployments"

// POST the request (10 s connect timeout, 50 s read timeout) and print the raw response.
val response_online = Http(online_path).
  postData(payload_data_online).
  header("Content-Type", "application/json").
  option(HttpOptions.connTimeout(10000)).
  option(HttpOptions.readTimeout(50000)).
  asString

print(response_online)

## Use DSX Local scoring service to classify data

In [None]:
// Parse the deployment response and read entity.predictionEndpoints.online;
// an empty string is used when that path is missing or is not a string.
val scoring_href_json: JsValue = Json.parse(response_online.body)
val scoring_href =
  (scoring_href_json \ "entity" \ "predictionEndpoints" \ "online").asOpt[String].getOrElse("")

print(scoring_href)

In [None]:
// Field names of the scoring request, in the same order as the values of each record.
val field_names = List("AGE", "ACTIVITY", "EDUCATION", "GENDER", "STATE", "NEGTWEETS", "INCOME")
val fields_json = Json.toJson(field_names.map(name => Json.toJson(name)))

// One sample record; values line up positionally with field_names.
val sample_record = List(
  Json.toJson(23),
  Json.toJson(3),
  Json.toJson("Masters degree"),
  Json.toJson("M"),
  Json.toJson("NY"),
  Json.toJson(7),
  Json.toJson(878657)
)
val record_json = Json.toJson(List(Json.toJson(sample_record)))

// Request body with the field names under "fields" and the list of records under "records".
val json_map = Json.toJson(Map("fields" -> fields_json, "records" -> record_json))
print(json_map)

In [None]:
// Serialize the scoring request, POST it to the scoring endpoint and print the raw response.
val payload_scoring = Json.stringify(json_map)
val response_scoring = Http(scoring_href).
  postData(payload_scoring).
  header("Content-Type", "application/json").
  option(HttpOptions.method("POST")).
  asString
print(response_scoring)

## *Optional - Remaining steps require WML service and credentials to Publish, Deploy, and Score using WML Cloud Service*  
*Code below is based on the following documentation and examples:*

https://apsportal.ibm.com/analytics/notebooks/c8652d2c-bfc9-4354-8168-f1c9f7f8dfc2/view?access_token=02a83fea8450a452c8de76af98dae078459d0f56810ddef4f4c62d5bc4fc72cf&cm_mc_uid=40662902271614933277870&cm_mc_sid_50200000=1503946544

## First step:  Save model to WML service in IBM Cloud

In [None]:
// TODO:  This lab step is optional, do not run this cell unless you have WML credentials and have updated the credentials below.

// WML client library
import com.ibm.analytics.ngp.repository.MLRepositoryClient
import com.ibm.analytics.ngp.repository.MLRepositoryArtifact

// Helper libraries

import scalaj.http.{Http, HttpOptions}
import scala.util.{Success, Failure}
import java.util.Base64
import java.nio.charset.StandardCharsets
import play.api.libs.json._

// Authenticate to Watson Machine Learning service on IBM Cloud.  These values come from "Service Credentials" tab in IBM Cloud WML service
val service_path = "https://ibm-watson-ml.mybluemix.net"

// Your Team's WML Creds - update these credentials with your personal WML credentials, **the values below are not valid**
// NOTE(review): these are placeholders; avoid saving or sharing the notebook with real credentials filled in.
val instance_id = "4de7exyz-6af1-4f1e-8aaf-561a6eb44xyz"
val username = "8bdcaxyz-b2c9-4579-bb95-ec552d050xyz"
val password = "8182bxyz-bba1-4154-919c-4bc6113f6xyz"

// Create a repository client for the cloud service URL and authorize it with the credentials above.
val client = MLRepositoryClient(service_path)
client.authorize(username, password)

// Save model: build an artifact from the fitted pipeline model, the training DataFrame and a display name.
// NOTE(review): .get presumably throws if the save failed — confirm the return type of models.save.
val model_artifact = MLRepositoryArtifact(newModel, trainingDF, "From DSX Local: Banking Churn Notebook Model(LR)")
val saved_model = client.models.save(model_artifact).get

## *(Optional - requires WML service and credentials)* Deploy in Cloud:  Create online deployment of published model in IBM Cloud WML Service

In [None]:
// TODO:  This lab step is optional, do not run this cell unless you have WML credentials and have updated the WML credentials in the cell above.

// Reload saved model
val model_version_href = saved_model.meta.prop("modelVersionHref").get
val loaded_model_artifact = client.models.version(model_version_href).get

// Get WML service instance token: call the identity endpoint with a Basic
// authorization header built from "username:password" (UTF-8, Base64-encoded).
val wml_credentials = s"$username:$password"
val wml_auth_header =
  s"Basic ${Base64.getEncoder.encodeToString(wml_credentials.getBytes(StandardCharsets.UTF_8))}"
val wml_url = s"$service_path/v3/identity/token"
val wml_response = Http(wml_url).header("Authorization", wml_auth_header).asString
val wmltoken_json: JsValue = Json.parse(wml_response.body)

// Falls back to an empty string when the response has no string "token" field.
val wmltoken = (wmltoken_json \ "token").asOpt[String].getOrElse("")
wmltoken

// Get published_models url from instance details
val endpoint_instance = s"$service_path/v3/wml_instances/$instance_id"
val wml_response_instance = Http(endpoint_instance).
  header("Content-Type", "application/json").
  header("Authorization", s"Bearer $wmltoken").
  option(HttpOptions.connTimeout(10000)).
  option(HttpOptions.readTimeout(50000)).
  asString
wml_response_instance

// Take the first "published_models" node found under "entity" and read its "url" string.
val published_models_json: JsValue = Json.parse(wml_response_instance.body)
val published_models_nodes = (published_models_json \ "entity") \\ "published_models"
val published_models_url = (published_models_nodes(0) \ "url").as[JsString].value
published_models_url

// Get list of published models
val wml_models = Http(published_models_url).
  header("Content-Type", "application/json").
  header("Authorization", "Bearer " + wmltoken).
  option(HttpOptions.connTimeout(10000)).
  option(HttpOptions.readTimeout(50000)).
  asString
wml_models

// Find the deployments URL of the model saved above. The response body is split on double
// quotes and scanned for a token that mentions both "deployments" and the saved model's uid.
// As before, the last matching token wins. The value is now computed as an immutable result
// instead of mutating a null-initialised var from inside `map`, and a missing match fails
// here with a descriptive error rather than passing null on to the next HTTP call.
val saved_model_uid = saved_model.uid.mkString
val deployment_endpoint: String = wml_models.body.
  split("\"").
  filter(token => token.contains("deployments") && token.contains(saved_model_uid)).
  lastOption.
  getOrElse(sys.error(s"No deployments endpoint found for model '$saved_model_uid' in the published models response"))
deployment_endpoint

// Create online deployment for published model
val payload_name = "From DSX Local:  Deployed Banking Churn Notebook Model(LR)"

// JSON body of the request: deployment type "online" plus the deployment name.
val deployment_request = Map("type" -> "online", "name" -> payload_name)
val payload_data_online = Json.stringify(Json.toJson(deployment_request))

// POST with the bearer token (50 s connect and read timeouts) and print the raw response.
val response_online = Http(deployment_endpoint).
  postData(payload_data_online).
  header("Content-Type", "application/json").
  header("Authorization", s"Bearer $wmltoken").
  option(HttpOptions.connTimeout(50000)).
  option(HttpOptions.readTimeout(50000)).
  asString
print(response_online)

## *(Optional - requires WML service and credentials)* Use IBM Cloud WML scoring service to classify data

In [None]:
// TODO:  This lab step is optional, do not run this cell unless you have WML credentials and have updated the WML credentials in the cell above.

// Parse the deployment response and read entity.scoring_url;
// an empty string is used when that path is missing or is not a string.
val scoring_href_json: JsValue = Json.parse(response_online.body)
val scoring_href = (scoring_href_json \ "entity" \ "scoring_url").asOpt[String].getOrElse("")

print(scoring_href)

In [None]:
// TODO:  This lab step is optional, do not run this cell unless you have WML credentials and have updated the WML credentials in the cell above.

// Field names of the scoring request, in the same order as the values of each record.
val field_names = List("AGE", "ACTIVITY", "EDUCATION", "GENDER", "STATE", "NEGTWEETS", "INCOME")
val fields_json = Json.toJson(field_names.map(name => Json.toJson(name)))

// One sample record; values line up positionally with field_names.
val sample_record = List(
  Json.toJson(23),
  Json.toJson(3),
  Json.toJson("Masters degree"),
  Json.toJson("M"),
  Json.toJson("NY"),
  Json.toJson(7),
  Json.toJson(878657)
)
val record_json = Json.toJson(List(Json.toJson(sample_record)))

// Request body with the field names under "fields" and the list of records under "values".
val json_map = Json.toJson(Map("fields" -> fields_json, "values" -> record_json))
print(json_map)

In [None]:
// TODO:  This lab step is optional, do not run this cell unless you have WML credentials and have updated the WML credentials in the cell above.

// Serialize the scoring request, POST it with the bearer token and print the raw response.
val payload_scoring = Json.stringify(json_map)
val response_scoring = Http(scoring_href).
  postData(payload_scoring).
  header("Content-Type", "application/json").
  header("Authorization", s"Bearer $wmltoken").
  option(HttpOptions.method("POST")).
  asString
print(response_scoring)

Developed by Alexander Petrov, Matt Walli, Analytics Technology Acceleration Team, IBM Analytics