## Predicting Mortgage Default in Loan Marketplace

In this notebook you will learn how to build a predictive model with the Spark machine learning API (SparkML) and deploy it for scoring in Machine Learning (ML).

This notebook walks you through these steps:

    - Build a model with the SparkML API
    - Save the model in the ML repository
    - Create a deployment in ML
    - Test the model (via UI)

### Step 1: Install

In [None]:
# Check Python version. Make sure it is Python 3.6.x. 
import platform
print(platform.python_version())
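
The cell above only prints the version. As a minimal sketch, assuming the lab targets Python 3.6.x as the comment states, the check can also be made explicit so that a mismatch is flagged right away. `is_supported_python` is a hypothetical helper, not part of any library.

```python
import sys

def is_supported_python(version_info=sys.version_info, required=(3, 6)):
    """Return True when the interpreter's major.minor matches `required`."""
    return tuple(version_info[:2]) == tuple(required)

if not is_supported_python():
    # Only a warning: the rest of the notebook may still run on other versions.
    found = ".".join(str(part) for part in sys.version_info[:3])
    print("Warning: this lab was written for Python 3.6.x, found " + found)
```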

In [None]:
# Uninstall the older Watson Machine Learning client 
!pip uninstall watson-machine-learning-client -y

# Install the WML client 
!pip install watson-machine-learning-client-V4

# Verify the WML client version
!pip list | grep watson

In [None]:
!pip install findspark
!pip install pyspark==2.3.3

### Action: restart the kernel!

### Step 2: Authenticate

In [None]:
WML_CREDENTIALS = {
   "instance_id": "openshift",
   "url" : "https://zen-cpd-zen.apps.testcluster.demo.ibmcloud.com",
   "username":"admin",
   "password": "passw0rd",
   "version": "2.5.0"
}
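
Hard-coding a password in a notebook makes it easy to leak when the notebook is shared. One possible alternative, sketched here under the assumption that the values are available as environment variables, builds the same dictionary at run time. The `WML_*` variable names and the `load_wml_credentials` helper are hypothetical.

```python
import os

def load_wml_credentials(env=os.environ):
    """Build the WML credentials dict from environment variables.

    The WML_* names are made up for this sketch; use whatever names
    your environment actually provides.
    """
    return {
        "instance_id": "openshift",
        "url": env.get("WML_URL", ""),
        "username": env.get("WML_USERNAME", ""),
        "password": env.get("WML_PASSWORD", ""),
        "version": env.get("WML_VERSION", "2.5.0"),
    }

# Usage: WML_CREDENTIALS = load_wml_credentials()
```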

In [None]:
# Enter the values for your database connection, found under Data Virtualization
dsn_url = "jdbc:db2://dv-server.zen.svc.cluster.local:32051/bigsql"   # e.g. "jdbc:db2://dv-server.zen.local:32051/bigsql"
dsn_uid = "user1022"                                                  # e.g. "user999"
dsn_pwd = "sw?#@lT_674MfPI5"                                          # e.g. "7dBZ3jWt9xN6$o0JiX!m"

### Step 3: Load data (update the *dsn* values above with your Data Virtualization credentials)

In [None]:
import jaydebeapi, sys
import pandas as pd
import findspark
import pyspark
import json
from pyspark import SparkConf, SparkContext

In [None]:
connection_string=dsn_url
if (sys.version_info >= (3,0)):
    conn = jaydebeapi.connect("com.ibm.db2.jcc.DB2Driver", connection_string, [dsn_uid, dsn_pwd])
else:
    conn = jaydebeapi.connect("com.ibm.db2.jcc.DB2Driver", [connection_string, dsn_uid, dsn_pwd])

In [None]:
# Replace USER999 with the schema that owns the view in your environment
pdf = pd.read_sql("select * from USER999.MORTGAGE_JOIN_VIEW", con=conn)
pdf.head(5)

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)
sdf.show()

In [None]:
sdf.printSchema()

### Step 4: Build the Spark pipeline and the Random Forest model

In [None]:
# Split data into train and test datasets
train, test = sdf.randomSplit([0.8,0.2], seed=11)

MODEL_NAME = "MORTGAGE PREDICTION MODEL"
DEPLOYMENT_NAME = "MORTGAGE PREDICTION"

print("Number of records for training: " + str(train.count()))
print("Number of records for evaluation: " + str(test.count()))

sdf.printSchema()

The code below creates a Random Forest Classifier with Spark, setting up string indexers for the categorical features and the label column. Finally, this notebook creates a pipeline including the indexers and the model, and does an initial Area Under ROC evaluation of the model.
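
To make the role of the string indexers concrete, here is a minimal pure-Python sketch of the idea: each distinct string is mapped to a numeric index, with the most frequent value getting index 0. The sample values and the alphabetical tie-breaking are assumptions of this sketch, and `fit_string_index` is a hypothetical helper rather than a Spark API.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first.

    Ties are broken alphabetically to keep this sketch deterministic;
    that rule is an assumption of the example.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

# Made-up values standing in for a categorical column such as APPLIED_ONLINE.
sample = ["YES", "NO", "NO", "YES", "NO"]
mapping = fit_string_index(sample)
encoded = [mapping[value] for value in sample]
print(mapping)
print(encoded)
```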


In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model

# Prepare string variables so that they can be used by the Random Forest algorithm
# StringIndexer encodes a string column of labels to a column of label indices
SI1 = StringIndexer(inputCol='APPLIED_ONLINE',outputCol='AppliedOnlineEncoded')
SI2 = StringIndexer(inputCol='RESIDENCE',outputCol='ResidenceEncoded')

In [None]:
labelIndexer = StringIndexer(inputCol='MORTGAGE_DEFAULT', outputCol='label').fit(sdf)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# The Pipelines API requires that input variables are passed in a single vector column
assembler = VectorAssembler(inputCols=["AppliedOnlineEncoded", "ResidenceEncoded", "INCOME", "YRS_CURRENT_ADD", \
                                       "YRS_CURRENT_EMP", "NO_OF_CARDS", "CARD_DEBT", "CURRENT_LOANS", "LOAN_AMOUNT", \
                                       "SALE_PRICE", "LOCATION"], outputCol="features")

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Instantiate the algorithm with default settings.
# labelCol is not set because it defaults to "label", the column produced by labelIndexer.
rf=RandomForestClassifier(featuresCol="features")

pipeline = Pipeline(stages=[SI1,SI2,labelIndexer, assembler, rf, labelConverter])

# Build models
model = pipeline.fit(train)

In [None]:
predictions = model.transform(test)
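# Evaluate on the continuous rawPrediction scores rather than the hard 0/1 prediction column,
# so that the ROC and PR curves are built from more than one threshold
evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName='areaUnderROC')
area_under_curve = evaluatorDT.evaluate(predictions)

evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName='areaUnderPR')
area_under_PR = evaluatorDT.evaluate(predictions)

# areaUnderROC is the evaluator's default metric; areaUnderPR is requested explicitly
print("areaUnderROC = %g" % area_under_curve, "areaUnderPR = %g" % area_under_PR)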
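
Area under ROC measures how well a score ranks positive cases above negative ones, so the result depends on whether the evaluator receives continuous scores or hard 0/1 predictions. The following pure-Python sketch, using made-up labels and scores, illustrates that difference. `area_under_roc` is a hypothetical helper written for this illustration, not the Spark implementation.

```python
def area_under_roc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative.

    A tie between a positive and a negative counts as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [1, 1, 0, 0]
probabilities = [0.9, 0.6, 0.55, 0.1]          # made-up model scores
hard_predictions = [1.0 if p >= 0.5 else 0.0 for p in probabilities]

auc_scores = area_under_roc(labels, probabilities)
auc_hard = area_under_roc(labels, hard_predictions)
print("AUC from scores:", auc_scores, "AUC from hard predictions:", auc_hard)
```

In this toy data the scores rank every positive above every negative, while thresholding at 0.5 discards part of that ranking information.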

### Step 5: Score the test data set

In [None]:
results = model.transform(test)
results = results.select("MORTGAGE_DEFAULT", "label", "predictedLabel", "prediction", "probability")
results.toPandas().head(6)

### Step 6: Model Evaluation

In [None]:
accuracy=results.filter(results.label == results.prediction).count() / float(results.count())
print('Accuracy = {:.2f}.'.format(accuracy))
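
Accuracy alone can look good when one class dominates, which is common for default data. As a rough sketch with made-up labels and predictions, the counts behind the accuracy figure can be broken out as follows. `confusion_counts` is a hypothetical helper, and treating index `1.0` as the positive class is an assumption, since the actual index depends on how the label indexer ordered the labels.

```python
def confusion_counts(labels, predictions, positive=1.0):
    """Count true/false positives and negatives for one positive class."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for label, prediction in zip(labels, predictions):
        if prediction == positive:
            counts["tp" if label == positive else "fp"] += 1
        else:
            counts["fn" if label == positive else "tn"] += 1
    return counts

# Made-up indexed labels and predictions.
labels      = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0]
predictions = [1.0, 0.0, 0.0, 0.0, 0.0, 0.0]

counts = confusion_counts(labels, predictions)
accuracy = (counts["tp"] + counts["tn"]) / len(labels)
recall = counts["tp"] / (counts["tp"] + counts["fn"])
print(counts, "accuracy = {:.2f}".format(accuracy), "recall = {:.2f}".format(recall))
```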

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

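# Evaluate the model on the probability vector rather than the hard 0/1 prediction column,
# so that the ROC curve is built from more than one threshold
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="label", metricName="areaUnderROC")
print('Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(results)))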

### Step 7: Set default space

In [None]:
from watson_machine_learning_client import WatsonMachineLearningAPIClient
import json
client = WatsonMachineLearningAPIClient(WML_CREDENTIALS)
client.repository.list_spaces()

### Action: Find the GUID for the space named 'MortgageDeploymentSpace' in the output above, then replace 'GUID' in the following cell with it.

In [None]:
# Example: client.set.default_space('b49e13e8-ec68-408d-84a1-957e28c154b1')
client.set.default_space('GUID')

### Step 8: Remove existing model and deployment

In [None]:
deployment_details = client.deployments.get_details()
for deployment in deployment_details['resources']:
    deployment_id = deployment['metadata']['guid']
    model_id = deployment['entity']['asset']['href'].split('/')[3].split('?')[0]
    if deployment['entity']['name'] == DEPLOYMENT_NAME:
        print('Deleting deployment id', deployment_id)
        client.deployments.delete(deployment_id)
        print('Deleting model id', model_id)
        client.repository.delete(model_id)
       
client.repository.list_models()
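
The cell above recovers the model id by splitting the asset `href` on `/` and `?`. The sketch below isolates that parsing on a made-up `href`, so the shape it expects is visible. The sample string is an assumption about the format, and both helper names are hypothetical.

```python
from urllib.parse import urlparse

def model_id_from_href(href):
    """Same parsing as the cell above: path segment at index 3, query string removed."""
    return href.split('/')[3].split('?')[0]

def model_id_from_href_last_segment(href):
    """Alternative that takes the last path segment, whatever the prefix depth."""
    return urlparse(href).path.rstrip('/').split('/')[-1]

# Hypothetical href, shaped the way the index-based parsing expects.
sample_href = "/v4/models/1234-abcd?space_id=5678-efgh"
print(model_id_from_href(sample_href))
print(model_id_from_href_last_segment(sample_href))
```

The index-based version fails if the path prefix has a different number of segments; the second variant only assumes that the id is the final segment.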

### Step 9: Save Model in ML repository

In [None]:
wml_models = client.repository.get_model_details()
model_id = None

for model_in in wml_models['resources']:
    if MODEL_NAME == model_in['entity']['name']:
        model_id = model_in['metadata']['guid']
        break

if model_id is None:
    print("Storing model ...")
    meta_props = {
        client.repository.ModelMetaNames.NAME: MODEL_NAME,
        client.repository.ModelMetaNames.TYPE: "mllib_2.3",
        client.repository.ModelMetaNames.RUNTIME_UID: "spark-mllib_2.3"
    }

    model_artifact = client.repository.store_model( model=model,
                                                pipeline=pipeline,
                                                meta_props=meta_props,
                                                training_data=train
                                                )
    model_id = client.repository.get_model_uid(model_artifact)
    print("Done")

In [None]:
model_id
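
Steps 9 and 10 both scan a list of resources for an entry with a given name and read its GUID. A small helper, sketched here against plain dictionaries shaped like the `['resources']` entries used in this notebook, captures that lookup. `find_guid_by_name` is a hypothetical name and the sample data is made up.

```python
def find_guid_by_name(resources, name):
    """Return the guid of the first resource whose entity name matches, else None."""
    for resource in resources:
        if resource['entity']['name'] == name:
            return resource['metadata']['guid']
    return None

# Made-up resources mimicking the structure read in the cells above.
sample_resources = [
    {"metadata": {"guid": "aaa-111"}, "entity": {"name": "OTHER MODEL"}},
    {"metadata": {"guid": "bbb-222"}, "entity": {"name": "MORTGAGE PREDICTION MODEL"}},
]
print(find_guid_by_name(sample_resources, "MORTGAGE PREDICTION MODEL"))
```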

### Step 10: Create Deployment

In [None]:
wml_deployments = client.deployments.get_details()
deployment_uid = None
for deployment in wml_deployments['resources']:
    if DEPLOYMENT_NAME == deployment['entity']['name']:
        deployment_uid = deployment['metadata']['guid']
        break

if deployment_uid is None:
    print("Deploying model...")        
    meta_props = {
        client.deployments.ConfigurationMetaNames.NAME: DEPLOYMENT_NAME,
        client.deployments.ConfigurationMetaNames.ONLINE: {}
    }
    deployment = client.deployments.create(artifact_uid=model_id, meta_props=meta_props)
    deployment_uid = client.deployments.get_uid(deployment)

print("Model id: {}".format(model_id))
print("Deployment id: {}".format(deployment_uid))

In [None]:
# Write the test data to a .csv so that we can later use it for Evaluation
writeCSV=test.toPandas()
writeCSV.to_csv('./MortgagePredictionModelEval.csv', sep=',', index=False)

### Step 11: Test Saved Model with Test UI

1. Save the notebook
2. Test the deployed model from Analyze > Analytics deployment

### Summary

You have finished this hands-on lab. In this notebook you created a model using the SparkML API, deployed it in the Machine Learning service for online (real-time) scoring, and tested it using a test client.

Created by Sanjit Chakraborty sanjitc@us.ibm.com Nov 8, 2019.