# Use Spark to predict credit risk with `ibm-watson-machine-learning`

This notebook introduces commands for model persistence to the Watson Machine Learning repository, model deployment, and scoring.

Some familiarity with Python is helpful. This notebook uses Python 3.6 and Apache® Spark 2.4.

You will use the **German Credit Risk** dataset.


## Learning goals

The learning goals of this notebook are:

-  Load a CSV file into an Apache® Spark DataFrame.
-  Explore data.
-  Prepare data for training and evaluation.
-  Persist a pipeline and model in the Watson Machine Learning repository from tar.gz files.
-  Deploy a model for online scoring using the Watson Machine Learning API.
-  Score sample scoring data using the Watson Machine Learning API.
-  Explore the prediction results by tabulating the predicted labels.


## Contents

This notebook contains the following parts:

1.	[Set up](#setup)
2.	[Load and explore data](#load)
3.	[Persist model](#upload)
4.	[Predict locally](#visualization)
5.	[Deploy and score](#deploy)
6.	[Clean up](#cleanup)
7.	[Summary and next steps](#summary)

<a id="setup"></a>
## 1. Set up the environment

Before you use the sample code in this notebook, you must perform the following setup tasks:

-  Contact your Cloud Pak for Data administrator and ask for your account credentials.

### Connection to WML

Authenticate to the Watson Machine Learning service on IBM Cloud Pak for Data. You need to provide the platform `url`, your `username`, and your `password`.

In [None]:
username = 'PASTE YOUR USERNAME HERE'
password = 'PASTE YOUR PASSWORD HERE'
url = 'PASTE THE PLATFORM URL HERE'

In [2]:
wml_credentials = {
    "username": username,
    "password": password,
    "url": url,
    "instance_id": 'openshift',
    "version": '3.5'
}
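
Hardcoding a password in a notebook cell makes it easy to leak when the notebook is shared. As a minimal sketch, the same dictionary could be assembled from environment variables instead. The variable names `CPD_USERNAME`, `CPD_PASSWORD`, and `CPD_URL` and the helper `credentials_from_env` are assumptions for illustration; they are not part of the `ibm-watson-machine-learning` package.

```python
import os


def credentials_from_env(env=os.environ):
    """Build the credentials dictionary from environment variables.

    The variable names below are hypothetical; use whatever names
    your own environment defines.
    """
    required = ("CPD_USERNAME", "CPD_PASSWORD", "CPD_URL")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {
        "username": env["CPD_USERNAME"],
        "password": env["CPD_PASSWORD"],
        "url": env["CPD_URL"],
        # Same fixed values as in the cell above.
        "instance_id": "openshift",
        "version": "3.5",
    }
```

The result can be passed to `APIClient` in place of the hand-written `wml_credentials` dictionary.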

### Install and import the `ibm-watson-machine-learning` package
**Note:** `ibm-watson-machine-learning` documentation can be found <a href="http://ibm-wml-api-pyclient.mybluemix.net/" target="_blank" rel="noopener noreferrer">here</a>.

In [None]:
!pip install -U ibm-watson-machine-learning

In [3]:
from ibm_watson_machine_learning import APIClient

client = APIClient(wml_credentials)

### Working with spaces

First, you need to create a space that will be used for your work. If you do not have a space already, you can use `{PLATFORM_URL}/ml-runtime/spaces?context=icp4data` to create one.

- Click **New Deployment Space**
- Create an empty space
- Go to the space `Settings` tab
- Copy the `space_id` and paste it below
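
The `{PLATFORM_URL}` placeholder above is presumably the same platform URL that you stored in `url`. As a small sketch (the helper name `spaces_page_url` and the example host are hypothetical), the address of the spaces page can be built like this:

```python
def spaces_page_url(platform_url):
    """Return the deployment spaces page for a given platform URL."""
    # Strip a trailing slash so the path separator is not doubled.
    return platform_url.rstrip("/") + "/ml-runtime/spaces?context=icp4data"


print(spaces_page_url("https://cpd.example.com/"))
```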

**Tip**: You can also use the SDK to prepare the space for your work. More information can be found [here](https://github.com/IBM/watson-machine-learning-samples/blob/master/cpd3.5/notebooks/python_sdk/instance-management/Space%20management.ipynb).

**Action**: Assign the space ID below.

In [None]:
space_id = 'PASTE YOUR SPACE ID HERE'

You can use the `list` method to print all existing spaces.

In [None]:
client.spaces.list(limit=10)

To interact with all resources available in Watson Machine Learning, you need to set the **space** that you will be using.

In [4]:
client.set.default_space(space_id)

'SUCCESS'

### Test Spark

In [5]:
try:
    from pyspark.sql import SparkSession
except ImportError:
    # Only a failed import indicates a missing Spark runtime.
    print('Error: Spark runtime is missing. If you are using Watson Studio change the notebook runtime to Spark.')
    raise
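
If you prefer to check for Spark without triggering the import, the standard library can report whether a module is installed. This is an optional sketch; the helper name `module_available` is an assumption for illustration.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module can be found, without importing it."""
    return importlib.util.find_spec(name) is not None


if not module_available("pyspark"):
    print("Spark runtime is missing; switch the notebook runtime to Spark.")
```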

<a id="load"></a>
## 2. Load and explore data

In this section you will load the data as an Apache® Spark DataFrame and perform a basic exploration.
 

The CSV file for German Credit Risk is available in the same repository as this notebook. Load the file into an Apache® Spark DataFrame using the code below.

In [6]:
import os
from wget import download

sample_dir = 'spark_sample_model'
if not os.path.isdir(sample_dir):
    os.mkdir(sample_dir)
    
filename = os.path.join(sample_dir, 'credit_risk_training.csv')
if not os.path.isfile(filename):
    filename = download('https://github.com/IBM/watson-machine-learning-samples/raw/master/cpd3.5/data/credit_risk/credit_risk_training.csv', out=sample_dir)
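
The cell above relies on the third-party `wget` package. If it is not installed, the same download-if-missing logic can be sketched with the standard library alone. The helper name `ensure_file` and its `fetch` parameter are hypothetical, introduced here only for illustration.

```python
import os
from urllib.request import urlretrieve


def ensure_file(url, directory, fetch=urlretrieve):
    """Download url into directory unless the file is already there.

    Returns the local path. `fetch` is injectable so the logic can be
    exercised without network access.
    """
    os.makedirs(directory, exist_ok=True)
    # Use the last URL segment as the local file name.
    path = os.path.join(directory, url.rsplit("/", 1)[-1])
    if not os.path.isfile(path):
        fetch(url, path)
    return path
```

Calling `ensure_file(<csv url>, 'spark_sample_model')` with the URL from the cell above would return the same local path on every run and download only once.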

In [7]:
spark = SparkSession.builder.getOrCreate()

df_data = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load(filename)

Explore the loaded data by using the following Apache® Spark DataFrame methods:
-  print schema
-  print the first five records
-  count all records

In [8]:
df_data.printSchema()

root
 |-- CheckingStatus: string (nullable = true)
 |-- LoanDuration: integer (nullable = true)
 |-- CreditHistory: string (nullable = true)
 |-- LoanPurpose: string (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- ExistingSavings: string (nullable = true)
 |-- EmploymentDuration: string (nullable = true)
 |-- InstallmentPercent: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- OthersOnLoan: string (nullable = true)
 |-- CurrentResidenceDuration: integer (nullable = true)
 |-- OwnsProperty: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- InstallmentPlans: string (nullable = true)
 |-- Housing: string (nullable = true)
 |-- ExistingCreditsCount: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Dependents: integer (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- ForeignWorker: string (nullable = true)
 |-- Risk: string (nullable = true)



As you can see, the data contains 21 fields. The `Risk` field is the one we would like to predict (the label).

In [9]:
df_data.show(n=5, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 CheckingStatus           | 0_to_200               
 LoanDuration             | 31                     
 CreditHistory            | credits_paid_to_date   
 LoanPurpose              | other                  
 LoanAmount               | 1889                   
 ExistingSavings          | 100_to_500             
 EmploymentDuration       | less_1                 
 InstallmentPercent       | 3                      
 Sex                      | female                 
 OthersOnLoan             | none                   
 CurrentResidenceDuration | 3                      
 OwnsProperty             | savings_insurance      
 Age                      | 32                     
 InstallmentPlans         | none                   
 Housing                  | own                    
 ExistingCreditsCount     | 1                      
 Job                      | skilled                
 Dependents               | 1                      
 Telephone  

In [10]:
print("Number of records: " + str(df_data.count()))

Number of records: 5000


As you can see, the data set contains 5000 records.

### 2.1 Prepare data

In this subsection you will split your data into train, test, and predict datasets.

In [11]:
splitted_data = df_data.randomSplit([0.8, 0.18, 0.02], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
print("Number of prediction records : " + str(predict_data.count()))

Number of training records: 4016
Number of testing records : 881
Number of prediction records : 103


As you can see, our data has been successfully split into three datasets:

-  The train data set, which is the largest group, is used for training.
-  The test data set is used for model evaluation, that is, to test the assumptions of the model.
-  The predict data set is used for prediction.
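
`randomSplit` assigns each row to a split at random according to the weights, so the resulting sizes are close to, but not exactly, the requested proportions. The following sketch computes the nominal sizes for 5000 records so that you can compare them with the counts printed above (4016, 881, and 103). The helper name `nominal_split_sizes` is hypothetical.

```python
def nominal_split_sizes(total, weights):
    """Return the expected number of rows per split for the given weights."""
    # Normalize the weights so they sum to 1 before scaling.
    weight_sum = sum(weights)
    return [round(total * w / weight_sum) for w in weights]


print(nominal_split_sizes(5000, [0.8, 0.18, 0.02]))  # [4000, 900, 100]
```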

<a id="upload"></a>
## 3. Persist model

In this section you will learn how to store your pipeline and model in the Watson Machine Learning repository by using the Python client library.

**Note**: Apache® Spark 2.4 is required.

### 3.1: Save pipeline and model

In this subsection you will learn how to save pipeline and model artifacts to your Watson Machine Learning instance.

**Download pipeline and model archives**

In [12]:
import os
from wget import download

sample_dir = 'spark_sample_model'
if not os.path.isdir(sample_dir):
    os.mkdir(sample_dir)
    
pipeline_filename = os.path.join(sample_dir, 'credit_risk_spark_pipeline.tar.gz')
if not os.path.isfile(pipeline_filename):
    pipeline_filename = download('https://github.com/IBM/watson-machine-learning-samples/raw/master/cpd3.5/models/spark/credit-risk/model/credit_risk_spark_pipeline.tar.gz', out=sample_dir)
model_filename = os.path.join(sample_dir, 'credit_risk_spark_model.gz')
if not os.path.isfile(model_filename):
    model_filename = download('https://github.com/IBM/watson-machine-learning-samples/raw/master/cpd3.5/models/spark/credit-risk/model/credit_risk_spark_model.gz', out=sample_dir)

**Store pipeline and model**

To store your Spark model, you need to provide a training data reference. This allows the model schema to be read automatically.

In [13]:
training_data_references = [
                {
                    "type": "fs",
                    "connection": {},
                    "location": {},
                    "schema": {
                    "id": "training_schema",
                    "fields": [
                      {
                        "metadata": {},
                        "name": "CheckingStatus",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "LoanDuration",
                        "nullable": True,
                        "type": "integer"
                      },
                      {
                        "metadata": {},
                        "name": "CreditHistory",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "LoanPurpose",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "LoanAmount",
                        "nullable": True,
                        "type": "integer"
                      },
                      {
                        "metadata": {},
                        "name": "ExistingSavings",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "EmploymentDuration",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "InstallmentPercent",
                        "nullable": True,
                        "type": "integer"
                      },
                      {
                        "metadata": {},
                        "name": "Sex",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "OthersOnLoan",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "CurrentResidenceDuration",
                        "nullable": True,
                        "type": "integer"
                      },
                      {
                        "metadata": {},
                        "name": "OwnsProperty",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "Age",
                        "nullable": True,
                        "type": "integer"
                      },
                      {
                        "metadata": {},
                        "name": "InstallmentPlans",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "Housing",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "ExistingCreditsCount",
                        "nullable": True,
                        "type": "integer"
                      },
                      {
                        "metadata": {},
                        "name": "Job",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "Dependents",
                        "nullable": True,
                        "type": "integer"
                      },
                      {
                        "metadata": {},
                        "name": "Telephone",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "ForeignWorker",
                        "nullable": True,
                        "type": "string"
                      },
                      {
                        "metadata": {
                          "modeling_role": "target"
                        },
                        "name": "Risk",
                        "nullable": True,
                        "type": "string"
                      }
                    ]
                  }
                }
]
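
Because every entry in `fields` has the same shape, the schema could also be generated from a list of column names and types instead of being typed out by hand. This is a sketch only: the helper `build_schema_fields` is hypothetical, and the column list is abbreviated for illustration.

```python
def build_schema_fields(columns, target):
    """Build schema field entries from (name, type) pairs.

    The column named `target` gets the "modeling_role": "target" metadata,
    matching the hand-written schema above.
    """
    fields = []
    for name, dtype in columns:
        metadata = {"modeling_role": "target"} if name == target else {}
        fields.append({
            "metadata": metadata,
            "name": name,
            "nullable": True,
            "type": dtype,
        })
    return fields


# Abbreviated column list for illustration; the full data set has 21 columns.
example_columns = [("CheckingStatus", "string"), ("LoanDuration", "integer"), ("Risk", "string")]
example_fields = build_schema_fields(example_columns, target="Risk")
```

The resulting list has the same layout as the `"fields"` value in the cell above.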

In [14]:
published_model_details = client.repository.store_model(
    model=model_filename, 
    meta_props={
        client.repository.ModelMetaNames.NAME:'Credit Risk model',
        client.repository.ModelMetaNames.TYPE: "mllib_2.4",
        client.repository.ModelMetaNames.SOFTWARE_SPEC_UID: client.software_specifications.get_id_by_name('spark-mllib_2.4'),
        client.repository.ModelMetaNames.TRAINING_DATA_REFERENCES: training_data_references,
        client.repository.ModelMetaNames.LABEL_FIELD: "Risk",
    }, 
    training_data=train_data, 
    pipeline=pipeline_filename)

In [15]:
model_uid = client.repository.get_model_uid(published_model_details)
print(model_uid)

e96692a9-cc64-408d-9b1c-05da622c508a


In [None]:
client.repository.get_model_details(model_uid)

Get saved model metadata from Watson Machine Learning.

**Tip**: Use `client.repository.ModelMetaNames.show()` to get the list of available props.

In [16]:
client.repository.ModelMetaNames.show()

------------------------  ----  --------  ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
META_PROP NAME            TYPE  REQUIRED  SCHEMA
NAME                      str   Y
DESCRIPTION               str   N
INPUT_DATA_SCHEMA         list  N         {'id(required)': 'string', 'fields(required)': [{'name(required)': 'string', 'type(required)': 'string', 'nullable(optional)': 'string'}]}
TRAINING_DATA_REFERENCES  list  N         [{'name(optional)': 'string', 'type(required)': 'string', 'connection(required)': {'endpoint_url(required)': 'string', 'access_key_id(required)': 'string', 'secret_access_key(required)': 'string'},

### 3.2: Load model

In this subsection you will learn how to load a saved model back from the specified instance of Watson Machine Learning.

In [17]:
loaded_model = client.repository.load(model_uid)

You can print, for example, the model type to make sure that the model has been loaded correctly.

In [18]:
print(type(loaded_model))

<class 'pyspark.ml.pipeline.PipelineModel'>


<a id="visualization"></a>
## 4. Predict locally

In this section you will learn how to score data locally using the loaded model.

### 4.1: Make a local prediction using the previously loaded model and predict data

In this subsection you will score *predict_data* data set.

In [19]:
predictions = loaded_model.transform(predict_data)

Preview the results by calling the *show()* method on the predictions DataFrame.

In [20]:
predictions.show(5)

+--------------+------------+--------------------+-----------+----------+---------------+------------------+------------------+------+------------+------------------------+------------+---+----------------+-------+--------------------+----------+----------+---------+-------------+-------+-----------------+----------------+--------------+------------------+---------------------+------+---------------+---------------+-------------------+----------+------+------------+----------------+-----+--------------------+--------------------+--------------------+----------+--------------+
|CheckingStatus|LoanDuration|       CreditHistory|LoanPurpose|LoanAmount|ExistingSavings|EmploymentDuration|InstallmentPercent|   Sex|OthersOnLoan|CurrentResidenceDuration|OwnsProperty|Age|InstallmentPlans|Housing|ExistingCreditsCount|       Job|Dependents|Telephone|ForeignWorker|   Risk|CheckingStatus_IX|CreditHistory_IX|LoanPurpose_IX|ExistingSavings_IX|EmploymentDuration_IX|Sex_IX|OthersOnLoan_IX|OwnsProperty_I

By tabulating a count, you can see how many records were assigned to each predicted risk class.

In [21]:
predictions.select("predictedLabel").groupBy("predictedLabel").count().show(truncate=False)

+--------------+-----+
|predictedLabel|count|
+--------------+-----+
|No Risk       |82   |
|Risk          |21   |
+--------------+-----+
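
To express these counts as shares of the predict data set, divide each count by the total. A small sketch using the numbers printed above:

```python
counts = {"No Risk": 82, "Risk": 21}  # values copied from the output above
total = sum(counts.values())

# Fraction of the predict data set that falls into each class.
shares = {label: count / total for label, count in counts.items()}

for label, share in shares.items():
    print(f"{label}: {share:.1%}")
```

With these counts, roughly four out of five records in the predict data set are classified as `No Risk`.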



<a id="deploy"></a>
## 5. Deploy and score

In this section you will learn how to create an online deployment and score new data records using `ibm-watson-machine-learning`.

**Note:** You can also use the REST API to deploy and score.
For more information about REST APIs, see the [Swagger Documentation](https://watson-ml-v4-api.mybluemix.net/wml-restapi-cloud.html#/Deployments/deployments_create).

### 5.1: Create online scoring endpoint

Now you can create an online scoring endpoint. 

#### Create online deployment for published model

In [22]:
deployment_details = client.deployments.create(
    model_uid, 
    meta_props={
        client.deployments.ConfigurationMetaNames.NAME: "Credit Risk model deployment",
        client.deployments.ConfigurationMetaNames.ONLINE: {}
    }
)



#######################################################################################

Synchronous deployment creation for uid: 'e96692a9-cc64-408d-9b1c-05da622c508a' started

#######################################################################################


initializing.
ready


------------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_uid='3f22f536-f395-4d03-a08f-6311765a0e42'
------------------------------------------------------------------------------------------------




In [23]:
deployment_details

{'entity': {'asset': {'id': 'e96692a9-cc64-408d-9b1c-05da622c508a'},
  'custom': {},
  'deployed_asset_type': 'model',
  'hardware_spec': {'id': 'Not_Applicable', 'name': 'S', 'num_nodes': 1},
  'name': 'Credit Risk model deployment',
  'online': {},
  'space_id': '83b00166-9047-4159-b777-83dcb498e7ab',
  'status': {'online_url': {'url': 'https://wmlgmc-cpd-wmlgmc.apps.wmlautoai.cp.fyre.ibm.com/ml/v4/deployments/3f22f536-f395-4d03-a08f-6311765a0e42/predictions'},
   'state': 'ready'}},
 'metadata': {'created_at': '2020-12-08T12:12:57.722Z',
  'id': '3f22f536-f395-4d03-a08f-6311765a0e42',
  'modified_at': '2020-12-08T12:12:57.722Z',
  'name': 'Credit Risk model deployment',
  'owner': '1000330999',
  'space_id': '83b00166-9047-4159-b777-83dcb498e7ab'}}
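
The scoring endpoint is embedded in the returned details. As a sketch, it can be read with plain dictionary access. The nested keys below are taken from the output above; the helper name `scoring_url` and the placeholder URL are hypothetical.

```python
def scoring_url(details):
    """Return the online scoring URL from a deployment details dictionary."""
    return details["entity"]["status"]["online_url"]["url"]


# Trimmed copy of the structure shown above, with a placeholder URL.
example_details = {
    "entity": {
        "status": {
            "online_url": {"url": "https://cpd.example.com/ml/v4/deployments/example-id/predictions"},
            "state": "ready",
        }
    }
}
print(scoring_url(example_details))
```

Passing the real `deployment_details` to the helper would return the `predictions` URL shown in the output.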

<a id="score"></a>
Now, you can send new scoring records (new data) for which you would like to get predictions. To do that, execute the following sample code:

In [24]:
fields = ["CheckingStatus", "LoanDuration", "CreditHistory", "LoanPurpose", "LoanAmount", "ExistingSavings",
                  "EmploymentDuration", "InstallmentPercent", "Sex", "OthersOnLoan", "CurrentResidenceDuration",
                  "OwnsProperty", "Age", "InstallmentPlans", "Housing", "ExistingCreditsCount", "Job", "Dependents",
                  "Telephone", "ForeignWorker"]
values = [
    ["no_checking", 13, "credits_paid_to_date", "car_new", 1343, "100_to_500", "1_to_4", 2, "female", "none", 3,
     "savings_insurance", 46, "none", "own", 2, "skilled", 1, "none", "yes"],
    ["no_checking", 24, "prior_payments_delayed", "furniture", 4567, "500_to_1000", "1_to_4", 4, "male", "none",
     4, "savings_insurance", 36, "none", "free", 2, "management_self-employed", 1, "none", "yes"],
    ["0_to_200", 26, "all_credits_paid_back", "car_new", 863, "less_100", "less_1", 2, "female", "co-applicant",
     2, "real_estate", 38, "none", "own", 1, "skilled", 1, "none", "yes"],
    ["0_to_200", 14, "no_credits", "car_new", 2368, "less_100", "1_to_4", 3, "female", "none", 3, "real_estate",
     29, "none", "own", 1, "skilled", 1, "none", "yes"],
    ["0_to_200", 4, "no_credits", "car_new", 250, "less_100", "unemployed", 2, "female", "none", 3,
     "real_estate", 23, "none", "rent", 1, "management_self-employed", 1, "none", "yes"],
    ["no_checking", 17, "credits_paid_to_date", "car_new", 832, "100_to_500", "1_to_4", 2, "male", "none", 2,
     "real_estate", 42, "none", "own", 1, "skilled", 1, "none", "yes"],
    ["no_checking", 33, "outstanding_credit", "appliances", 5696, "unknown", "greater_7", 4, "male",
     "co-applicant", 4, "unknown", 54, "none", "free", 2, "skilled", 1, "yes", "yes"],
    ["0_to_200", 13, "prior_payments_delayed", "retraining", 1375, "100_to_500", "4_to_7", 3, "male", "none", 3,
     "real_estate", 37, "none", "own", 2, "management_self-employed", 1, "none", "yes"]
]

payload_scoring = {"input_data": [{"fields": fields, "values": values}]}
deployment_id = client.deployments.get_id(deployment_details)

client.deployments.score(deployment_id, payload_scoring)

{'predictions': [{'fields': ['CheckingStatus',
    'LoanDuration',
    'CreditHistory',
    'LoanPurpose',
    'LoanAmount',
    'ExistingSavings',
    'EmploymentDuration',
    'InstallmentPercent',
    'Sex',
    'OthersOnLoan',
    'CurrentResidenceDuration',
    'OwnsProperty',
    'Age',
    'InstallmentPlans',
    'Housing',
    'ExistingCreditsCount',
    'Job',
    'Dependents',
    'Telephone',
    'ForeignWorker',
    'CheckingStatus_IX',
    'CreditHistory_IX',
    'LoanPurpose_IX',
    'ExistingSavings_IX',
    'EmploymentDuration_IX',
    'Sex_IX',
    'OthersOnLoan_IX',
    'OwnsProperty_IX',
    'InstallmentPlans_IX',
    'Housing_IX',
    'Job_IX',
    'Telephone_IX',
    'ForeignWorker_IX',
    'features',
    'rawPrediction',
    'probability',
    'prediction',
    'predictedLabel'],
   'values': [['no_checking',
     13,
     'credits_paid_to_date',
     'car_new',
     1343,
     '100_to_500',
     '1_to_4',
     2,
     'female',
     'none',
     3,
     'savings
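
The response pairs one `fields` list with a list of `values` rows, so each row can be zipped into a dictionary to pull out the columns of interest. The sketch below uses a shortened, made-up response with the same layout. The helper name `rows_as_dicts` and the sample values are assumptions for illustration, not real scoring output.

```python
def rows_as_dicts(response):
    """Turn a scoring response into one dictionary per scored record."""
    records = []
    for prediction in response["predictions"]:
        for row in prediction["values"]:
            # Pair each column name with the value at the same position.
            records.append(dict(zip(prediction["fields"], row)))
    return records


# Made-up response with the same layout as the real one, trimmed to three fields.
sample_response = {
    "predictions": [{
        "fields": ["CheckingStatus", "prediction", "predictedLabel"],
        "values": [["no_checking", 0.0, "No Risk"],
                   ["0_to_200", 1.0, "Risk"]],
    }]
}

for record in rows_as_dicts(sample_response):
    print(record["CheckingStatus"], "->", record["predictedLabel"])
```

The same helper could be applied to the value returned by `client.deployments.score(...)` to list the `predictedLabel` for each submitted record.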

<a id="cleanup"></a>
## 6. Clean up 

If you want to clean up all created assets:
- experiments
- trainings
- pipelines
- model definitions
- models
- functions
- deployments

please follow this sample [notebook](https://github.com/IBM/watson-machine-learning-samples/blob/master/cpd3.5/notebooks/python_sdk/instance-management/Machine%20Learning%20artifacts%20management.ipynb).

<a id="summary"></a>
## 7. Summary and next steps

You successfully completed this notebook! You learned how to use Apache Spark machine learning as well as Watson Machine Learning for model creation and deployment.

Check out our [Online Documentation](https://dataplatform.cloud.ibm.com/docs/content/analyze-data/wml-setup.html) for more samples, tutorials, documentation, how-tos, and blog posts.

### Authors

**Amadeusz Masny**, Python Software Developer in Watson Machine Learning at IBM

Copyright © 2020-2025 IBM. This notebook and its source code are released under the terms of the MIT License.