# In this Notebook we shall create a Machine Learning Model using Spark and Save the Model

In [None]:
!pip install --user watson_machine_learning_client==1.0.375
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")



#### The key reason we are using PySpark here is to show how to create a Machine Learning Model using large volume of data that may not fit within the memory of single Python process. So we need distributed computing infrastructure for those cases. We are using Spark here as a distributed computing Infrastructure. PySpark is a library that helps using Spark's capability using Python as programming language.

#### First, we import PySpark.

In [5]:
import os
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession\
    .builder\
    .appName("WOSM")\
    .getOrCreate()

#### Next, we read in a dataset that we will use to develop a Machine Learning model. Once we read in the data, it can potentially reside on multiple machines/VMs in a distributed fashion. 

#### We can read the data here in various ways. We are showing here two mechanisms - reading data from a CSV file and reading data from a Database.

#### If you are reading the data from CSV file, read the data you have stored in previous step using the same file name in the next cell. Otherwise skip the next cell.

In [7]:
cmergedDf = spark.read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/enhanced_customers_history.csv', header='true')
cmergedDf.show(5)

+---+------------+-------------+-----+-------+---------+-------------+--------------------+-----+--------+-----+------+------+--------+---------+--------+---------+
| ID|LONGDISTANCE|INTERNATIONAL|LOCAL|DROPPED|PAYMETHOD|LOCALBILLTYPE|LONGDISTANCEBILLTYPE|USAGE|RATEPLAN|CHURN|GENDER|STATUS|CHILDREN|ESTINCOME|CAROWNER|      AGE|
+---+------------+-------------+-----+-------+---------+-------------+--------------------+-----+--------+-----+------+------+--------+---------+--------+---------+
|  1|          23|            0|  206|      0|       CC|       Budget|      Intnl_discount|  229|       3|    T|     F|     S|       1|  38000.0|       N|24.393333|
|  6|          29|            0|   45|      0|       CH|    FreeLocal|            Standard|   75|       2|    F|     M|     M|       2|  29616.0|       N|49.426667|
|  8|          24|            0|   22|      0|       CC|    FreeLocal|            Standard|   47|       3|    F|     M|     M|       0|  19732.8|       N|50.673333|
| 11|     

#### If you are reading the data from the Database read the data from the same using the instruction provided in the Hands On Lab document. Otherwise skip the next 2 cells

In [8]:
# import dsx_core_utils, requests, os, io
# from pyspark.sql import SparkSession

# # Add asset from remote connection
# df83 = None
# dataSet = dsx_core_utils.get_remote_data_set_info('USER1009.CUSTOMER_MERGED_HISTORY_VIEW')
# dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
# sparkSession = SparkSession(sc).builder.getOrCreate()
# # Load JDBC data to Spark dataframe
# dbTableOrQuery = '"' + (dataSet['schema'] + '"."' if(len(dataSet['schema'].strip()) != 0) else '') + dataSet['table'] + '"'
# if (dataSet['query']):
#     dbTableOrQuery = "(" + dataSet['query'] + ") TBL"
# df83 = sparkSession.read.format("jdbc").option("url", dataSource['URL']).option("dbtable", dbTableOrQuery).option("user",dataSource['user']).option("password",dataSource['password']).load()
# df83.show(5)

#### In the previous cell in last line there would be show() command for the dataset that created. In the next cell change the name of the dataset to cmergedDf.

In [9]:
# Uncomment next line. Use the dataframe name used in the last line of previous cell (for the show() command) instead of df81. Then execute this cell.
#cmergedDf = df82

#### With this data we can create a Binary Classification model that can predict whether a person is a top KoL or not

In [10]:
cmergedDf.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LONGDISTANCE: string (nullable = true)
 |-- INTERNATIONAL: string (nullable = true)
 |-- LOCAL: string (nullable = true)
 |-- DROPPED: string (nullable = true)
 |-- PAYMETHOD: string (nullable = true)
 |-- LOCALBILLTYPE: string (nullable = true)
 |-- LONGDISTANCEBILLTYPE: string (nullable = true)
 |-- USAGE: string (nullable = true)
 |-- RATEPLAN: string (nullable = true)
 |-- CHURN: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- CHILDREN: string (nullable = true)
 |-- ESTINCOME: string (nullable = true)
 |-- CAROWNER: string (nullable = true)
 |-- AGE: string (nullable = true)



#### In next 2 lines we create a distributed temporary table/view out of the Dataset so that we can access the data easily using SQL syntax (through use of Spark SQL)

In [11]:
cmergedDf.createOrReplaceTempView("mergedt")

In [12]:
custDf = spark.sql("select CHURN, GENDER, STATUS, CAROWNER, PAYMETHOD, LOCALBILLTYPE, LONGDISTANCEBILLTYPE, "\
                   "ID, cast(CHILDREN as integer) as CHILDREN,"\
                   "cast(ESTINCOME as double) as ESTINCOME, cast(round(AGE,0) as double) as AGE, cast(DROPPED as integer) as DROPPED,"\
                   "cast(RATEPLAN as integer) as RATEPLAN, cast(ID as String) as str_id, "\
                   "cast(longdistance as double) as LONGDISTANCE, "\
                   "cast(international as double) as INTERNATIONAL, cast(local as double) as LOCAL, cast(usage as double) as USAGE, "\
                   "case when AGE < 1 then 'INFANT' when AGE < 18 then 'Child' else 'Adult' End as PHASE from mergedt").cache()

In [13]:
custDf.count()

1415

#### For developing the Binary Classification Model we want to use Spark ML Lib. Spark MLLib has implementations of various machine learning algorithms that can help creating model on Distributed datasets. Using Spark MLLib, one can create a model using dataset as big as needed for developing a real life Machine Learning model as long as the data is available in the Distributed dataset we created in previous step.

#### Next, we import components from Spark MLLib for developing the model using the data

#### We are importing Pipeline, as that helps building the model development steps as pipeline and the same can be applied to training dataset easily. Additionally we are also importing the VectorAssembler that helps in asembling the attributes (the ones we want to use as features of the model) as Vector.

In [14]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

In [15]:
# Prepare string variables so that they can be used by the decision tree algorithm
# StringIndexer encodes a string column of labels to a column of label indices
SI1 = StringIndexer(inputCol='GENDER', outputCol='GenderEncoded')
SI2 = StringIndexer(inputCol='STATUS',outputCol='StatusEncoded')
SI3 = StringIndexer(inputCol='CAROWNER',outputCol='CarOwnerEncoded')
SI4 = StringIndexer(inputCol='PAYMETHOD',outputCol='PaymethodEncoded')
SI5 = StringIndexer(inputCol='LOCALBILLTYPE',outputCol='LocalBilltypeEncoded')
SI6 = StringIndexer(inputCol='LONGDISTANCEBILLTYPE',outputCol='LongDistanceBilltypeEncoded')
SI7 = StringIndexer(inputCol='PHASE',outputCol='PhaseEncoded')
S18 = StringIndexer(inputCol='CHURN', outputCol='label')

# Pipelines API requires that input variables are passed in  a vector
assembler = VectorAssembler(inputCols=["GenderEncoded", "StatusEncoded", "CarOwnerEncoded", "PaymethodEncoded", "LocalBilltypeEncoded", \
                                       "LongDistanceBilltypeEncoded", "PhaseEncoded", "CHILDREN", "ESTINCOME", "AGE", "LONGDISTANCE", "INTERNATIONAL", "LOCAL",\
                                      "DROPPED","USAGE","RATEPLAN"], outputCol="features")

In [16]:
labelIndexer = S18.fit(custDf)

In [17]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

#### We will use the Random Forest algorithm for Binary Classification. One could also use another supervised learning algorithm such as Binary Treee or XG Boost.

In [18]:
# instantiate the algorithm, take the default settings
rf=RandomForestClassifier(labelCol="label", featuresCol="features")

#### Next, we define a pipeline with 2 stages. In first stage the features are assembled and in the next stage they are used to create a model using the Random Forest Classifier

In [19]:
pipeline = Pipeline(stages=[SI1,SI2,SI3,SI4,SI5,SI6,SI7,labelIndexer, assembler, rf, labelConverter])

In [20]:
# Split data into train and test datasets
train, test = custDf.randomSplit([0.7,0.3], seed=6)
train.cache()
test.cache()

DataFrame[CHURN: string, GENDER: string, STATUS: string, CAROWNER: string, PAYMETHOD: string, LOCALBILLTYPE: string, LONGDISTANCEBILLTYPE: string, ID: string, CHILDREN: int, ESTINCOME: double, AGE: double, DROPPED: int, RATEPLAN: int, str_id: string, LONGDISTANCE: double, INTERNATIONAL: double, LOCAL: double, USAGE: double, PHASE: string]

#### In next step the Model gets created when we apply the pipeline on the Training dataset. It returns the model.

In [21]:
# Build models
model = pipeline.fit(train)

#### Next, we test the model by using the Test dataset

In [22]:
model.transform(test)

DataFrame[CHURN: string, GENDER: string, STATUS: string, CAROWNER: string, PAYMETHOD: string, LOCALBILLTYPE: string, LONGDISTANCEBILLTYPE: string, ID: string, CHILDREN: int, ESTINCOME: double, AGE: double, DROPPED: int, RATEPLAN: int, str_id: string, LONGDISTANCE: double, INTERNATIONAL: double, LOCAL: double, USAGE: double, PHASE: string, GenderEncoded: double, StatusEncoded: double, CarOwnerEncoded: double, PaymethodEncoded: double, LocalBilltypeEncoded: double, LongDistanceBilltypeEncoded: double, PhaseEncoded: double, label: double, features: vector, rawPrediction: vector, probability: vector, prediction: double, predictedLabel: string]

In [23]:
results = model.transform(test)
results=results.select(results["ID"],results["CHURN"],results["label"],results["predictedLabel"],results["prediction"],results["probability"])
results.toPandas().head(6)

Unnamed: 0,ID,CHURN,label,predictedLabel,prediction,probability
0,2507,F,0.0,F,0.0,"[0.8653451539990605, 0.13465484600093944]"
1,3414,F,0.0,F,0.0,"[0.8127938153614898, 0.1872061846385103]"
2,587,F,0.0,F,0.0,"[0.8439980760753223, 0.15600192392467777]"
3,1872,F,0.0,F,0.0,"[0.7714502461131859, 0.22854975388681412]"
4,848,F,0.0,F,0.0,"[0.8088322794224062, 0.19116772057759382]"
5,1287,F,0.0,F,0.0,"[0.9253756926443769, 0.07462430735562309]"


In [24]:
print('Precision model = {:.2f}.'.format(results.filter(results.label == results.prediction).count() / float(results.count())))

Precision model = 0.89.


#### Next, we import  the BinaryClassificationEvaluator object from Spark ML Lib for evaluating the model

#### We evaluate the model using the area under the Receiver Operating Characteristic (ROC) curve.

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print('Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(results)))

Area under ROC curve = 0.88.


#### Now we save the Model using MMD infrastructure of Cloud Pak For Da. Please note the URL that is crated after the model is saved. Please use your name as a prefix to model's name.

In [23]:
from dsx_ml.ml import save

In [24]:
model_name = "Telco Churn Model 3"

In [25]:
save(name = model_name, model = model, algorithm_type = "Classification", test_data = test)

Using TensorFlow backend.


{'path': '/user-home/1001/DSX_Projects/custchurn/models/Telco Churn Model 3/2',
 'scoring_endpoint': 'https://dsxl-api/v3/project/score/Python36/spark-2.3/custchurn/Telco%20Churn%20Model%203/2'}

#### Save training data as it would be needed by Open Scale

In [32]:
jdbcuri = "jdbc:db2://10.187.215.33:31959/BLUDB"

properties = {
    "user": "user999",
    "password": "bGSgd%77k7VZ1**@",
    "driver": "com.ibm.db2.jcc.DB2Driver",
    "sslConnection":"false"
}

TABLE_NAME = "modeltrn2"

In [34]:
custDf.printSchema()

root
 |-- CHURN: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- CAROWNER: string (nullable = true)
 |-- PAYMETHOD: string (nullable = true)
 |-- LOCALBILLTYPE: string (nullable = true)
 |-- LONGDISTANCEBILLTYPE: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- CHILDREN: integer (nullable = true)
 |-- ESTINCOME: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DROPPED: integer (nullable = true)
 |-- RATEPLAN: integer (nullable = true)
 |-- str_id: string (nullable = true)
 |-- LONGDISTANCE: double (nullable = true)
 |-- INTERNATIONAL: double (nullable = true)
 |-- LOCAL: double (nullable = true)
 |-- USAGE: double (nullable = true)
 |-- PHASE: string (nullable = false)



In [33]:
custDf.write.jdbc(url=jdbcuri, table=TABLE_NAME, mode="append", properties=properties)

#### Next few cells need to be run if you want to directly deploy the model in WML. Please use your name as prefix to the name of the Model before storing for uniqueness

In [1]:
from watson_machine_learning_client import WatsonMachineLearningAPIClient



In [2]:
wml_credentials = {
    "url": "<the url of cp4d without the port>"",
    "username": "<your user id with administrator permission>",
    "password": ">your password",
    "instance_id": "icp"               
}

wml_client = WatsonMachineLearningAPIClient(wml_credentials)

In [3]:
model_props = {
    wml_client.repository.ModelMetaNames.NAME: "churn-model-in-cp4d-wml",
    wml_client.repository.ModelMetaNames.EVALUATION_METHOD: "binary",
}

In [26]:
published_model_details = wml_client.repository.store_model(model=model, meta_props=model_props, training_data=train, pipeline=pipeline)

In [33]:
#published_model_details

In [39]:
wml_client.repository.list_models()

------------------------------------  ------------------------  ------------------------  -----------------
GUID                                  NAME                      CREATED                   FRAMEWORK
89b02fc5-ecff-4c57-b3e6-cf437243a94f  churn-model-in-cp4d-wml   2019-09-19T04:27:51.086Z  mllib-2.3
461e6896-2d71-4251-914b-8ee8b62c68e5  churn-model-in-cp4d-wml   2019-09-17T19:01:04.539Z  mllib-2.3
49fa0aa9-30d3-459e-a840-1cc8242b512b  GermanCreditRiskModelICP  2019-09-17T16:32:02.642Z  mllib-2.3
75d7afec-7148-45f3-ad67-96ea6fb45680  HouseCreditRiskModelICP   2019-09-17T15:58:17.452Z  scikit-learn-0.19
64c9f2c0-1374-42a6-ae08-53e32ddc0a29  GolfModelICP              2019-09-17T15:51:35.223Z  scikit-learn-0.19
2ddc52dd-fbdd-4085-bcc0-e42c6a6fde86  DrugSelectionModelICP     2019-09-17T15:31:32.637Z  mllib-2.3
------------------------------------  ------------------------  ------------------------  -----------------
