# Introduction - Customer Churn Prediction notebook
In this notebook, we illustrate how to train a churn prediction model using PySpark. After training the model, you step through the instructions to deploy it with Watson Machine Learning.

This notebook is a variation of the original notebook referenced in this GitHub repository: https://github.com/elenalowery/cpd4_demo/blob/master/assets/jupyterlab/Predict_Customer_Churn_CPD4.ipynb

## Package installation

In [None]:
import warnings
warnings.filterwarnings('ignore')
import time

In [None]:
# install required Python modules
!pip install --upgrade pyspark==3.0.3 --no-cache-dir | tail -n 1
!pip install lime --no-cache-dir | tail -n 1
!pip install scipy --no-cache-dir | tail -n 1

# Model building and deployment <a name="model"></a>

In this section you will learn how to train a Spark MLlib model and then deploy it as a web service using the Watson Machine Learning service.

## Load the training data 

- Click in the next cell; this is where the code to import the training dataset will be inserted.
- Click the **Find and add data** icon in the top right, find the data set you want to import into this notebook (for example, CUSTOMER_DATA_ready), click the **Insert to code** drop-down, and select **pandas DataFrame (depr...)**.

<font color='red'>Do NOT select the **pandas DataFrame** option; select the **pandas DataFrame (depr...)** option instead.</font>

In [None]:
# Click here to insert code to import the training dataset
# Import dataset into a pandas DataFrame
## Sample inserted code (Note that the name of your dataframe may be different)
##from ibm_watson_studio_lib import access_project_or_space
##wslib = access_project_or_space()

##import pandas as pd

##df_data_1 = pd.read_csv(wslib.mount.get_data_path('CUSTOMER_DATA_ready'))
##df_data_1.head()
from ibm_watson_studio_lib import access_project_or_space
wslib = access_project_or_space()

import pandas as pd

df_data_1 = pd.read_csv(wslib.mount.get_data_path('CUSTOMER_DATA_ready'))
df_data_1.head()


In [None]:
# Create a PySpark DataFrame from the pandas DataFrame
from pyspark.sql import SparkSession

# Provide the pandas DataFrame from the previous cell (its name has the form df_data_<some_number>)
pandasDFname = df_data_1
spark = SparkSession.builder.getOrCreate()
sparkDF = spark.createDataFrame(pandasDFname)
sparkDF.head()

## Explore data

In [None]:
sparkDF.printSchema()

In [None]:
print("Number of records: " + str(sparkDF.count()))

## Create a model

In [None]:
spark_df = sparkDF
# Split the labeled data into a training set and a test set
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24)

# Provide a target name for your churn model
MODEL_NAME = "Churn Model"
# Provide a target name for your churn model deployment
DEPLOYMENT_NAME = "Churn Deployment"

print("Number of records for training: " + str(train_data.count()))
print("Number of records for evaluation: " + str(test_data.count()))

spark_df.printSchema()
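
`randomSplit([0.8, 0.2], 24)` assigns each row to a split independently at random, so the training and evaluation counts printed above are only approximately 80% and 20% of the records. The pure-Python sketch below imitates that behaviour with a hypothetical `random_split` helper built on the standard `random` module. It is an illustration under that assumption, not Spark's sampler: the seed-to-row mapping and the exact counts will differ from what Spark produces.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing one uniform number per row."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # Put the row in the first split whose cumulative bound exceeds the draw;
        # the last split catches any floating-point rounding at the top end
        for i, ub in enumerate(bounds):
            if u < ub or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=24)
print(len(train), len(test))  # close to 800 and 200, but not necessarily exact
```

Because the assignment is per row, the same seed always reproduces the same split for the same input, which is what makes the notebook's split repeatable.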

The cells below create a Random Forest classifier with Spark, setting up string indexers for the categorical features and for the label column. They then build a pipeline containing the indexers, a vector assembler, the classifier, and a label converter, and run an initial evaluation of the model using the area under the ROC curve (and under the precision-recall curve).
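By default, `StringIndexer` orders string values by descending frequency, so the most frequent value gets index 0.0, and `IndexToString` maps indices back through the fitted `labels` list. The pure-Python sketch below mimics both steps on a toy `CHURN` column; the helper names are hypothetical, and breaking frequency ties alphabetically is an assumption about Spark's tie handling. It also shows why converting predictions back through the fitted labels is safer than assuming that index 1.0 always means `'T'`.

```python
from collections import Counter

def fit_string_indexer(values):
    """Return labels ordered by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    return sorted(counts, key=lambda v: (-counts[v], v))

def index_values(values, labels):
    """Replace each string with the float index of its position in labels."""
    lookup = {label: float(i) for i, label in enumerate(labels)}
    return [lookup[v] for v in values]

def index_to_string(indices, labels):
    """Inverse mapping, analogous to IndexToString."""
    return [labels[int(i)] for i in indices]

churn = ["F", "T", "F", "F", "T"]
labels = fit_string_indexer(churn)
print(labels)                               # ['F', 'T']
print(index_values(churn, labels))          # [0.0, 1.0, 0.0, 0.0, 1.0]
print(index_to_string([1.0, 0.0], labels))  # ['T', 'F']
```

If `'T'` were the majority value in the data, it would receive index 0.0 instead, and any hard-coded `1.0 -> 'T'` mapping would silently invert the labels.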

In [None]:
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# Use every column except the label as a feature
features = [x for x in spark_df.columns if x != 'CHURN']
# Specify the categorical features
categorical_features = ['PAYMETHOD', 'LOCALBILLTYPE', 'LONGDISTANCEBILLTYPE', 'GENDER', 'STATUS', 'CAROWNER']
# Index the categorical features so each string value is replaced with a numeric index
categorical_num_features = [x + '_IX' for x in categorical_features]
si_list = [StringIndexer(inputCol=x, outputCol=y) for x, y in zip(categorical_features, categorical_num_features)]
# Combine the indexed categorical columns and the remaining columns into a single feature vector
va_features = VectorAssembler(inputCols=categorical_num_features + [x for x in features if x not in categorical_features], outputCol="features")

In [None]:
# Index the label column
si_label = StringIndexer(inputCol="CHURN", outputCol="label").fit(spark_df)
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_label.labels)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Train a Random Forest classifier
classifier = RandomForestClassifier(featuresCol="features")
pipeline = Pipeline(stages= si_list + [si_label, va_features, classifier, label_converter])

model = pipeline.fit(train_data)

In [None]:
predictions = model.transform(test_data)
# Use the continuous rawPrediction scores rather than the 0/1 predictions,
# so the evaluator can rank the rows when computing the curves
evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName='areaUnderROC')
area_under_curve = evaluatorDT.evaluate(predictions)

evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName='areaUnderPR')
area_under_PR = evaluatorDT.evaluate(predictions)
# The default metric of BinaryClassificationEvaluator is areaUnderROC
print("areaUnderROC = %g" % area_under_curve, "areaUnderPR = %g" % area_under_PR)
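
`BinaryClassificationEvaluator` ranks rows by the score in `rawPredictionCol`. If that column holds the hard 0/1 `prediction`, there are only two distinct scores and the ROC curve collapses to a single operating point, whereas the continuous `rawPrediction` (or `probability`) column gives a full ranking. The pure-Python sketch below computes ROC AUC as the probability that a randomly chosen positive scores higher than a randomly chosen negative (ties count one half) and compares both kinds of score on made-up toy data. The `roc_auc` helper is hypothetical and is not Spark's implementation.

```python
def roc_auc(labels, scores):
    """Probability that a random positive outranks a random negative; ties count 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Toy data, made up for illustration: 1 = churn, 0 = no churn
y = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.7, 0.4, 0.6, 0.3, 0.1]            # continuous scores
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]  # thresholded 0/1 predictions

print(round(roc_auc(y, scores), 3))  # 0.889
print(round(roc_auc(y, hard), 3))    # 0.667
```

The same model looks noticeably worse when only its thresholded predictions are scored, because all ordering information within each predicted class is lost.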

In [None]:
# Extra code: compute more metrics by exporting the predictions to pandas
from sklearn.metrics import classification_report
# Use the predictedLabel column produced by IndexToString, so the 'T'/'F' mapping
# does not depend on the index order chosen by StringIndexer
pred_pdf = predictions.select('CHURN', 'predictedLabel').toPandas()
y_test = pred_pdf['CHURN']
y_pred = pred_pdf['predictedLabel']
print(classification_report(y_test, y_pred, labels=['T', 'F']))
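
For reference, the per-class precision, recall, and F1 that `classification_report` prints can be computed by hand from the true and predicted `'T'`/`'F'` labels. The `per_class_metrics` helper below is a hypothetical pure-Python sketch run on made-up toy labels; it returns 0.0 when a denominator is zero, and it is meant to explain the report rather than replace scikit-learn.

```python
def per_class_metrics(y_true, y_pred, label):
    """Precision, recall and F1 for a single class label."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != label and p == label)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == label and p != label)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Toy labels, made up for illustration
y_true = ["T", "T", "F", "F", "F"]
y_pred = ["T", "F", "F", "F", "T"]
for label in ["T", "F"]:
    p, r, f = per_class_metrics(y_true, y_pred, label)
    print(f"{label}: precision={p:.2f} recall={r:.2f} f1={f:.2f}")
```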

## Publish the model

In this section, the notebook uses Watson Machine Learning to save the model (including the pipeline) to a WML deployment space. Existing deployments and models with the same names are deleted first, so the notebook can be run again from a clean state for another demo.
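The delete helpers further down find models and deployments by matching `metadata.name` in the `resources` list returned by `get_details()`, and the notebook deletes deployments before models. A minimal pure-Python sketch of the matching step follows; `ids_with_name` is a hypothetical helper, and the sample list is made up and trimmed to the `metadata.id` and `metadata.name` keys that the notebook already reads.

```python
def ids_with_name(resources, name):
    """Return the IDs of all resources whose metadata name equals `name`."""
    return [r["metadata"]["id"] for r in resources
            if r["metadata"].get("name") == name]

# Hypothetical response fragment, trimmed to the keys the notebook uses
resources = [
    {"metadata": {"id": "a1", "name": "Churn Model"}},
    {"metadata": {"id": "b2", "name": "Other Model"}},
    {"metadata": {"id": "c3", "name": "Churn Model"}},
]
print(ids_with_name(resources, "Churn Model"))  # ['a1', 'c3']
```

Each returned ID would then be passed to `wml_client.deployments.delete` or `wml_client.repository.delete`, as the notebook's helpers do. Every match is collected, not just the first, so duplicates left over from earlier runs are also removed.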

In [None]:
import os
cpdtoken=os.environ['USER_ACCESS_TOKEN']
wml_credentials = {
"token": cpdtoken,
"instance_id" : "openshift",
"url": os.environ['RUNTIME_ENV_APSX_URL'],
"version": "4.0"
}

from ibm_watson_machine_learning import APIClient
wml_client = APIClient(wml_credentials)

In [None]:
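def getSpaceIDwml(wml_client, space_name):
    """Return the ID of the deployment space named space_name, or -1 if it does not exist."""
    spaces = wml_client.spaces.get_details()['resources']
    try:
        space = next(item for item in spaces if item['entity']['name'] == space_name)
        spaceID = space['metadata']['id']
    except StopIteration:
        # No deployment space with this name was found
        spaceID = -1
    return spaceID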

In [None]:
def createSpacewml(wml_client,space_name):
    spaces = wml_client.spaces.get_details()['resources']
    for space in spaces:
        if space['entity']['name'] == space_name:
            print("Deployment space with name", space_name, "already exists")
            return space['metadata']['id']
    print("\nCreating a new deployment space -",space_name)
    # create the space
    space_meta_data = {
        wml_client.spaces.ConfigurationMetaNames.NAME : space_name
    }

    stored_space_details = wml_client.spaces.store(space_meta_data)
    space_id = stored_space_details['metadata']['id']
    i=0
    while(True):
        stored_space_details=wml_client.spaces.get_details(space_id)
        status=stored_space_details['entity']['status']['state']
        print("i: ", i, " status: ", status)
        if status == 'active':
            break
        time.sleep(1)
        i = i+1
    return space_id
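
`createSpacewml` polls the new space once per second until its state is `'active'`, with no upper bound on the wait. A bounded variant could look like the sketch below. `wait_for_state` is a hypothetical helper that accepts any zero-argument function returning the current state, for example `lambda: wml_client.spaces.get_details(space_id)['entity']['status']['state']`, and raises `TimeoutError` if the target state is not reached in time. The default timeout and interval are assumptions, and the intermediate `"pending"` state in the demo is made up.

```python
import time

def wait_for_state(get_state, target="active", timeout=120.0, interval=1.0):
    """Poll get_state() until it returns target or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == target:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"state is still {state!r} after {timeout} s")
        time.sleep(interval)

# Stand-in for the WML status lookup: becomes active on the third call
states = iter(["pending", "pending", "active"])
print(wait_for_state(lambda: next(states), interval=0.01))  # active
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.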

In [None]:
# Associate Watson Machine Learning with a specific space
space_name='churnUATspace'
space_id=getSpaceIDwml(wml_client,space_name)
if space_id == -1:
    space_id = createSpacewml(wml_client,space_name)
print('space id: ', space_id)
wml_client.set.default_space(space_id)

In [None]:
software_spec_uid = wml_client.software_specifications.get_id_by_name("spark-mllib_3.0")
print("Software Specification ID: {}".format(software_spec_uid))
# Metadata describing the model to store in the repository
model_props = {
    wml_client.repository.ModelMetaNames.NAME: MODEL_NAME,
    wml_client.repository.ModelMetaNames.TYPE: "mllib_3.0",
    wml_client.repository.ModelMetaNames.SOFTWARE_SPEC_UID: software_spec_uid,
    #wml_client.repository.ModelMetaNames.TRAINING_DATA_REFERENCES: training_data_references,
    wml_client.repository.ModelMetaNames.LABEL_FIELD: "CHURN",
}

In [None]:
def deleteExistingModelsSameName(wml_client,model_name):
    stored_models=wml_client.repository.get_model_details()
    stored_models_details = stored_models['resources']
    for m in stored_models_details:
        m_name = m['metadata']['name']
        if m_name == model_name:
            model_id = m['metadata']['id']
            print("Deleting model with id: ", model_id, " and name: ", m_name)
            wml_client.repository.delete(model_id)
    return 'Success'

In [None]:
def deleteExistingDeploymentsSameName(wml_client,deployment_name):
    stored_deployments=wml_client.deployments.get_details()
    stored_deployment_details = stored_deployments['resources']
    for d in stored_deployment_details:
        d_name = d['metadata']['name']
        if d_name == deployment_name:
            deployment_id = d['metadata']['id']
            print("Deleting deployment with id: ", deployment_id, " and name: ", d_name)
            wml_client.deployments.delete(deployment_id)
    return 'Success'

In [None]:
# Delete Existing Deployments with same name
deleteExistingDeploymentsSameName(wml_client,DEPLOYMENT_NAME)

In [None]:
# Delete existing models with same name
deleteExistingModelsSameName(wml_client,MODEL_NAME)

In [None]:
print("Storing model ...")
published_model_details = wml_client.repository.store_model(
    model=model, 
    meta_props=model_props, 
    training_data=train_data, 
    pipeline=pipeline)

model_uid = wml_client.repository.get_model_id(published_model_details)
print("Done")
print("Model ID: {}".format(model_uid))

In [None]:
wml_client.repository.list_models()

In [None]:
wml_client.deployments.list()

## Deploy the model

This section deploys the model as a RESTful web service in Watson Machine Learning. The deployed model has a scoring URL that you can use to send data to the model for predictions.
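Besides `wml_client.deployments.score`, the scoring URL can be called directly over HTTP. The sketch below only *builds* the request with Python's standard `urllib`. It assumes a bearer token (such as the `USER_ACCESS_TOKEN` used earlier) and the `{"input_data": [...]}` payload format used in the sample scoring cell. The `version` query parameter and its date value are assumptions based on the WML v4 REST API convention. The helper name, example URL, and token are hypothetical. Check the API documentation for your Cloud Pak for Data release before relying on any of them.

```python
import json
import urllib.parse
import urllib.request

def build_scoring_request(scoring_url, token, payload, version="2020-09-01"):
    """Build (but do not send) a POST request for a WML online deployment."""
    # Assumption: the endpoint expects a `version` query parameter
    query = urllib.parse.urlencode({"version": version})
    sep = "&" if "?" in scoring_url else "?"
    return urllib.request.Request(
        scoring_url + sep + query,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": "Bearer " + token,
            "Content-Type": "application/json",
        },
        method="POST",
    )

# Hypothetical values for illustration only
req = build_scoring_request(
    "https://cpd.example.com/ml/v4/deployments/1234/predictions",
    "my-token",
    {"input_data": [{"fields": ["AGE"], "values": [[45]]}]},
)
print(req.get_method(), req.full_url)
# To send it: response_json = json.load(urllib.request.urlopen(req))
```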

In [None]:
deployment_details = wml_client.deployments.create(
    model_uid, 
    meta_props={
        wml_client.deployments.ConfigurationMetaNames.NAME: "{}".format(DEPLOYMENT_NAME),
        wml_client.deployments.ConfigurationMetaNames.ONLINE: {}
    }
)
scoring_url = wml_client.deployments.get_scoring_href(deployment_details)
deployment_uid=wml_client.deployments.get_uid(deployment_details)

print("Scoring URL: " + scoring_url)
print("Model id: {}".format(model_uid))
print("Deployment id: {}".format(deployment_uid))

## Sample scoring

In [None]:
fields = ["ID","LONGDISTANCE","INTERNATIONAL","LOCAL","DROPPED","PAYMETHOD","LOCALBILLTYPE","LONGDISTANCEBILLTYPE","USAGE",\
            "RATEPLAN","GENDER","STATUS","CHILDREN","ESTINCOME","CAROWNER","AGE"]
values = [[1,28,0,60,0,"Auto","FreeLocal","Standard",89,4,"F","M",1,23000,"N",45]]
scoring_payload = {"input_data": [{"fields": fields, "values": values}]}
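
The payload pairs a single `fields` list with rows of `values` in the same column order, so a value can easily end up in the wrong column. The hypothetical `records_to_payload` helper below builds the same structure from dictionaries keyed by column name and raises a `KeyError` if a record is missing a field. It is a sketch, not part of the WML client.

```python
def records_to_payload(records, fields):
    """Build a WML-style scoring payload from a list of dicts keyed by field name."""
    values = [[record[f] for f in fields] for record in records]
    return {"input_data": [{"fields": list(fields), "values": values}]}

fields = ["ID", "LONGDISTANCE", "INTERNATIONAL", "LOCAL", "DROPPED", "PAYMETHOD",
          "LOCALBILLTYPE", "LONGDISTANCEBILLTYPE", "USAGE", "RATEPLAN", "GENDER",
          "STATUS", "CHILDREN", "ESTINCOME", "CAROWNER", "AGE"]
customer = {"ID": 1, "LONGDISTANCE": 28, "INTERNATIONAL": 0, "LOCAL": 60, "DROPPED": 0,
            "PAYMETHOD": "Auto", "LOCALBILLTYPE": "FreeLocal",
            "LONGDISTANCEBILLTYPE": "Standard", "USAGE": 89, "RATEPLAN": 4,
            "GENDER": "F", "STATUS": "M", "CHILDREN": 1, "ESTINCOME": 23000,
            "CAROWNER": "N", "AGE": 45}
payload = records_to_payload([customer], fields)
print(payload["input_data"][0]["values"])
```

Rows of a pandas DataFrame could be supplied in this form with `df.to_dict(orient="records")`.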

In [None]:
scoring_response = wml_client.deployments.score(deployment_uid, scoring_payload)
scoring_response