## Customer Churn Model Scoring

### Step 1: Download new customer data



In [None]:
import wget
url_customer='https://raw.githubusercontent.com/DScienceAtScale/DSX/master/PredictCustomerChurn/Data/new_customer_churn_data.csv'

#remove existing files before downloading
!rm -f new_customer_churn_data.csv

customerFilename=wget.download(url_customer)

!ls -l new_customer_churn_data.csv
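
If the `wget` package or shell escapes are not available in your environment, the same remove-then-download step can be sketched with the standard library alone. The helper name `fetch_fresh` is hypothetical and is not part of the original notebook.

```python
import os
import urllib.request


def fetch_fresh(url, filename):
    """Delete any stale copy of `filename`, then download `url` to it."""
    # Equivalent of `rm -f`: do nothing if the file does not exist.
    if os.path.exists(filename):
        os.remove(filename)
    # urlretrieve returns (local_path, headers); only the path is needed here.
    local_path, _ = urllib.request.urlretrieve(url, filename)
    return local_path
```

With this helper, the download above could be written as `customerFilename = fetch_fresh(url_customer, 'new_customer_churn_data.csv')`.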

### Step 2: Read data into a DataFrame
Note: The new dataset does not contain the label column.

In [None]:
newData= sqlContext.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header", "true").option("inferSchema", "true").load(customerFilename)

In [None]:
newData = newData.withColumnRenamed("Est Income", "EstIncome").withColumnRenamed("Car Owner","CarOwner")
newData.toPandas().head()
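
The two renames above strip the spaces out of column names. If more columns needed the same treatment, the mapping could be derived instead of typed by hand. The sketch below is plain Python; `space_free_names` is a hypothetical helper, and the commented lines show one way it might be applied to `newData`.

```python
def space_free_names(columns):
    """Map each column name that contains spaces to the same name without them."""
    return {name: name.replace(" ", "") for name in columns if " " in name}


# Hypothetical use against the DataFrame from this step:
# for old, new in space_free_names(newData.columns).items():
#     newData = newData.withColumnRenamed(old, new)

renames = space_free_names(["ID", "Est Income", "Car Owner"])
print(renames)
```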

### Step 3: Load Saved Model
Load the saved model from Object Storage.

In [None]:
from pyspark.ml import PipelineModel
model1_loaded = PipelineModel.load("PredictChurn.churnModel")

### Step 4: Score the new data
Note: The scored output contains the predicted values and confidence scores.

In [None]:
results = model1_loaded.transform(newData)
results.toPandas().head(4)

### Step 5: Export the scores to a CSV file

In [None]:
#Select ID, prediction and probability fields from the results dataframe

r1=results.select(results["ID"],results["prediction"],results["probability"])
r1.show(5,False)

#### Decompose the probability column
The probability column contains a vector for each record, so its elements must be extracted into separate columns.

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

# Each UDF extracts one element of the probability vector as a plain Python float
udf_0 = udf(lambda vector: float(vector[0]), DoubleType())
udf_1 = udf(lambda vector: float(vector[1]), DoubleType())

r2 = (r1.select(r1["ID"], r1["prediction"],r1["probability"])
    .withColumn('probability_0', udf_0(r1.probability))
    .withColumn('probability_1', udf_1(r1.probability))
    .drop("probability"))

r2.show(10, False)
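
To make the row-level logic of `udf_0` and `udf_1` concrete, here is a plain-Python sketch that applies the same element extraction to two made-up rows. The IDs and probabilities are invented for illustration and are not output from the model.

```python
# Each row mimics (ID, prediction, probability); the values are invented.
rows = [
    (1, 0.0, [0.75, 0.25]),
    (2, 1.0, [0.25, 0.75]),
]


def split_probability(row):
    """Replace the probability vector with one float column per class."""
    row_id, prediction, probability = row
    return (row_id, prediction, float(probability[0]), float(probability[1]))


flat_rows = [split_probability(r) for r in rows]
print(flat_rows)
```

In the Spark version, the explicit `float(...)` is worth keeping: vector elements are typically NumPy floats, which a `DoubleType` UDF may fail to serialize if returned as-is.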

### Write the scores to a CSV file in local storage (saved in a local subdirectory called 'PredictChurn/')

In [None]:
# In Spark 2.0, the preferred way to write is the csv(path) method, which is equivalent to format("csv").save(path).
# However, due to a current glitch in this Spark environment, where two csv packages coexist,
# that call returns an error about duplicate sources for csv.
# r2.write.csv('SparkdayMelbourne.' + 'churn_scores.csv', mode='overwrite')

# Instead of format('csv'), pass the fully qualified class name to work around the problem mentioned above (two csv packages causing resolution issues)
r2.write.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat').save('PredictChurn/' + 'churn_scores.csv', mode='overwrite')

In [None]:
!ls PredictChurn/
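
Spark writes the output as a directory named `churn_scores.csv` that holds one `part-*` file per partition, not as a single file. If a single CSV is needed, the parts can be concatenated afterwards. The sketch below uses only the standard library; `merge_part_files` is a hypothetical helper, and it assumes the parts carry no header rows, as is the case for the write above.

```python
import glob
import os


def merge_part_files(output_dir, merged_path):
    """Concatenate the Spark part files in `output_dir` into one file."""
    # Sort so that part-00000, part-00001, ... are appended in order.
    part_paths = sorted(glob.glob(os.path.join(output_dir, "part-*")))
    with open(merged_path, "w") as merged:
        for part_path in part_paths:
            with open(part_path) as part:
                merged.write(part.read())
    return len(part_paths)
```

For example, `merge_part_files('PredictChurn/churn_scores.csv', 'churn_scores_merged.csv')` would produce a single file outside the output directory.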

### Verify that the csv file can be read back

In [None]:
# Same issue as above: read.csv('path') currently fails, so fall back to the older approach with the fully qualified csv class
#r3= spark.read.csv('SparkdayMelbourne.' + 'churn_scores.csv')
r3= spark.read.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat').load('PredictChurn/' + 'churn_scores.csv')

r3.select(r3["_c0"].alias("ID"), r3["_c1"].alias("prediction"), r3["_c2"].alias("probability_0"), r3["_c3"].alias("probability_1")).show(5, False)
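
Because the scores were written without a header row, the columns come back as `_c0` to `_c3`, and without `inferSchema` every value is read back as a string. The plain-Python sketch below shows the same naming step plus an explicit cast. The sample rows are invented, and treating `ID` as an integer is an assumption about the data.

```python
import csv
import io

COLUMN_NAMES = ["ID", "prediction", "probability_0", "probability_1"]


def parse_scores(lines):
    """Turn headerless CSV lines into dicts with named, typed fields."""
    records = []
    for values in csv.reader(lines):
        record = dict(zip(COLUMN_NAMES, values))
        record["ID"] = int(record["ID"])  # assumption: IDs are integers
        for name in COLUMN_NAMES[1:]:
            record[name] = float(record[name])
        records.append(record)
    return records


sample = io.StringIO("1,0.0,0.75,0.25\n2,1.0,0.25,0.75\n")  # invented rows
scores = parse_scores(sample)
print(scores[0])
```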

In [None]:
!rm -rf PredictChurn/

### Write the scores to a CSV file in Object Storage (more involved: credentials are required, so <span style="color:red"> individual customization </span> is needed for each user running this notebook)

#### Connect to Object Storage
To write the scores to Object Storage, specify the credentials for connecting to your instance of Object Storage. The easiest way to do that is:
- If you do not already have a file in Object Storage, load a file into it using the **Files** interface.
- Click on the blank cell provided right below this one.
- Choose "*Insert SparkSession DataFrame*" to generate the credentials and the code that connects to Object Storage.

![Load Files](https://raw.githubusercontent.com/DScienceAtScale/DSX/master/PredictCustomerChurn/Images/upload_files.png)

- Edit the generated code to comment out or remove the lines that read the file. The edited code cell should look like this:

![credentials](https://raw.githubusercontent.com/DScienceAtScale/DSX/master/PredictCustomerChurn/Images/generated_credentials.png)


## Once at least one file is available in the Object Storage container associated with this project, click the blank cell below this one, click the icon highlighted in the picture above (the icon with a '1001' pattern), and then choose "Insert SparkSession DataFrame".
## Code similar to what is displayed in the picture above should be inserted automatically into the blank cell.

### Make sure you replace the <span style="color:blue">XXXXXX</span> string below with the <span style="color:blue">name of the container</span> for your current project (by default the same name as the project)

In [None]:
from ingest.Connectors import Connectors

objectstoresaveOptions = {
        Connectors.BluemixObjectStorage.AUTH_URL          : credentials['auth_url'],
        Connectors.BluemixObjectStorage.USERID            : credentials['user_id'],
        Connectors.BluemixObjectStorage.PASSWORD          : credentials['password'],
        Connectors.BluemixObjectStorage.PROJECTID         : credentials['project_id'],
        Connectors.BluemixObjectStorage.REGION            : credentials['region'],
        Connectors.BluemixObjectStorage.TARGET_CONTAINER  : 'XXXXXX',
        Connectors.BluemixObjectStorage.TARGET_FILE_NAME  : 'churn_scores.csv',
        Connectors.BluemixObjectStorage.TARGET_WRITE_MODE : 'write'}


r2.write.format("com.ibm.spark.discover").options(**objectstoresaveOptions).save()
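
The options above read five entries from the `credentials` dictionary, so a missing or empty entry only surfaces as an error at write time. A quick check beforehand can make the failure easier to diagnose. This is a minimal sketch; `missing_credentials` is a hypothetical helper, and the example values are placeholders, not real credentials.

```python
REQUIRED_KEYS = ("auth_url", "user_id", "password", "project_id", "region")


def missing_credentials(credentials):
    """Return the required keys that are absent or empty in `credentials`."""
    return [key for key in REQUIRED_KEYS if not credentials.get(key)]


# Placeholder values only; real values come from the generated cell.
example = {"auth_url": "https://example.invalid", "user_id": "u", "password": "p"}
print(missing_credentials(example))
```

Running `missing_credentials(credentials)` before building `objectstoresaveOptions` should return an empty list when the generated cell has been executed.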

### Make sure you replace the <span style="color:blue">XXXXXX</span> string below with the <span style="color:blue">name of the container</span> for your current project (by default the same name as the project)

In [None]:
# `bmos` is assumed to be defined by the generated credentials cell above
r3 = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .load(bmos.url('XXXXXX', 'churn_scores.csv'))
r3.select(r3["_c0"].alias("ID"), r3["_c1"].alias("prediction"), r3["_c2"].alias("probability_0"), r3["_c3"].alias("probability_1")).show(5, False)

### Step 6: Schedule this notebook to run at a time and frequency of your choice
Click on the "clock" icon at the top right.

You have come to the end of this notebook.

**Sidney Phoon** <br/>
yfphoon@us.ibm.com<br/>
May 4th, 2017