## Predict Customer Churn Use Case Implementation
The objective is to follow the CRISP-DM methodology to build a model to predict customer churn
![CRISP-DM](https://raw.githubusercontent.com/yfphoon/dsx_demo/master/crisp_dm.png)

### Step 1: Download the customer churn data

In [None]:
#Run once to install the wget package
!pip install wget

In [None]:
# download data from GitHub repository
import wget
url_churn='https://raw.githubusercontent.com/yfphoon/dsx_demo/master/data/customer_churn/churn.csv'
url_customer='https://raw.githubusercontent.com/yfphoon/dsx_demo/master/data/customer_churn/customer.csv'

#remove existing files before downloading
!rm -f churn.csv
!rm -f customer.csv

churnFilename=wget.download(url_churn)
customerFilename=wget.download(url_customer)

#list existing files
!ls -l churn.csv
!ls -l customer.csv

### Step 2: Read data into Spark DataFrames

Note: You want to reference the Spark DataFrame API to learn more about the supported operations, https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [None]:
churn= sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(churnFilename)
customer= sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(customerFilename)

### Step 3: Merge Files


In [None]:
data=customer.join(churn,customer['ID']==churn['ID']).select(customer['*'],churn['CHURN'])
data.toPandas().head()

### Step 4: Rename some columns
This step is not a requirement, it just makes some columns names simpler to type with no spaces

In [None]:
# withColumnRenamed renames an existing column in a SparkDataFrame and returns a new SparkDataFrame

data = data.withColumnRenamed("Est Income", "EstIncome").withColumnRenamed("Car Owner","CarOwner")
data.toPandas().head()

### Step 5: Data understanding

### Dataset Overview

In [None]:
df_pandas = data.toPandas()
print "There are " + str(len(df_pandas)) + " observations in the customer history dataset."
print "There are " + str(len(df_pandas.columns)) + " variables in the dataset."

print "\n******************Descriptive statistics*****************************\n"
print df_pandas.drop(['ID'], axis = 1).describe()


### Exploratory Data Analysis

The **Brunel** Visualization Language is a highly succinct and novel language that defines interactive data visualizations based on tabular data. The language is well suited for both data scientists and more aggressive business users. The system interprets the language and produces visualizations using the user's choice of existing lower-level visualization technologies typically used by application engineers such as RAVE or D3. 

More information about Brunel Visualization: https://github.com/Brunel-Visualization/Brunel/wiki

Try Brunel visualization here:  http://brunel.mybluemix.net/gallery_app/renderer

In [None]:
import brunel
df_pandas = data.toPandas()
%brunel data('df_pandas') stack bar x(Paymethod) y(#count) color(CHURN) bin(Paymethod) percent(#count) label(#count) tooltip(#all) | x(LongDistance) y(Usage) point color(Paymethod) tooltip(LongDistance, Usage) :: width=1100, height=400 

In [None]:
# Heat map
%brunel data('df_pandas') x(LocalBilltype) y(Dropped) color(#count:red) style('symbol:rect; size:100%; stroke:none') tooltip(Dropped,#count)

**PixieDust** is a Python Helper library for Spark IPython Notebooks. One of it's main features are visualizations. You'll notice that unlike other APIs which produce just output, PixieDust creates an interactive UI in which you can explore data.<br/>
More information about PixieDust: https://github.com/ibm-cds-labs/pixiedust?cm_mc_uid=78151411419314871783930&cm_mc_sid_50200000=1487962969

**If you haven't already installed it, uncomment and run the following cell to install the pixiedust Python library in your notebook environment. You only need to run it once**


In [None]:
#!pip install --user --upgrade pixiedust

In [None]:
from pixiedust.display import *
display(data)

### Interactive query with Spark SQL

In [None]:
# Spark SQL also allow you to use standard SQL
data.createOrReplaceTempView("data")
sql = """
SELECT c.*
FROM data c
WHERE c.EstIncome>90000

"""
spark.sql(sql).toPandas().head()

### Step 6: Build the Spark pipeline and the Random Forest model
"Pipeline" is an API in SparkML that's used for building models. A pipeline defines a sequence of transformers and estimators to perform tha analysis in stages.<br/>
Additional information on SparkML: https://spark.apache.org/docs/2.0.2/ml-guide.html

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# StringIndexer encodes a string column of labels to a column of label indices. 
SI1 = StringIndexer(inputCol='Gender', outputCol='GenderEncoded')
SI2 = StringIndexer(inputCol='Status',outputCol='StatusEncoded')
SI3 = StringIndexer(inputCol='CarOwner',outputCol='CarOwnerEncoded')
SI4 = StringIndexer(inputCol='Paymethod',outputCol='PaymethodEncoded')
SI5 = StringIndexer(inputCol='LocalBilltype',outputCol='LocalBilltypeEncoded')
SI6 = StringIndexer(inputCol='LongDistanceBilltype',outputCol='LongDistanceBilltypeEncoded')

# Apply OneHotEncoder so categorical features aren't given numeric importance
# One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. 
OH1 = OneHotEncoder(inputCol="GenderEncoded", outputCol="GenderEncoded"+"classVec")
OH2 = OneHotEncoder(inputCol="StatusEncoded", outputCol="StatusEncoded"+"classVec")
OH3 = OneHotEncoder(inputCol="CarOwnerEncoded", outputCol="CarOwnerEncoded"+"classVec")
OH4 = OneHotEncoder(inputCol="PaymethodEncoded", outputCol="PaymethodEncoded"+"classVec")
OH5 = OneHotEncoder(inputCol="LocalBilltypeEncoded", outputCol="LocalBilltypeEncoded"+"classVec")
OH6 = OneHotEncoder(inputCol="LongDistanceBilltypeEncoded", outputCol="LongDistanceBilltypeEncoded"+"classVec")


# Pipelines API requires that input variables are passed in  a vector
assembler = VectorAssembler(inputCols=["GenderEncodedclassVec", "StatusEncodedclassVec", "CarOwnerEncodedclassVec", "PaymethodEncodedclassVec", "LocalBilltypeEncodedclassVec", \
                                       "LongDistanceBilltypeEncodedclassVec", "Children", "EstIncome", "Age", "LongDistance", "International", "Local",\
                                      "Dropped","Usage","RatePlan"], outputCol="features")

In [None]:
# encode the label column
labelIndexer = StringIndexer(inputCol='CHURN', outputCol='label').fit(data)

In [None]:
# instantiate the algorithm, take the default settings
rf=RandomForestClassifier(labelCol="label", featuresCol="features")

In [None]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

In [None]:
# build the pipeline
pipeline = Pipeline(stages=[SI1,SI2,SI3,SI4,SI5,SI6, labelIndexer, OH1, OH2, OH3, OH4, OH5, OH6, assembler, rf, labelConverter])

In [None]:
# Split data into train and test datasets
(trainingData, testingData) = data.randomSplit([0.7, 0.3],seed=9)
trainingData.cache()
testingData.cache()

In [None]:
# Build model. The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages.
model = pipeline.fit(trainingData)

### Step 7: Score the test data set

In [None]:
result=model.transform(testingData)
result_display=result.select(result["ID"],result["CHURN"],result["Label"],result["predictedLabel"],result["prediction"],result["probability"])
result_display.toPandas().head(6)

### Step 8: Model Evaluation
Find accuracy of the models and the Area Under the ROC Curve 

In [None]:
print 'Model Accuracy = {:.2f}.'.format(result.filter(result.label == result.prediction).count() / float(result.count()))

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print 'Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(result))

###  Step 9:  Tune the model to find the best model

#### Build a Parameter Grid specifying the parameters to be evaluated to determine the best combination

In [None]:
# set different levels for the maxDepth
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder().addGrid(rf.maxDepth,[4,6,8]).build())

#### Create a cross validator to tune the pipeline with the generated parameter grid
Cross-validation attempts to fit the underlying estimator with user-specified combinations of parameters, cross-evaluate the fitted models, and output the best one.

In [None]:
# perform 3 fold cross validation
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

In [None]:
# train the model
cvModel = cv.fit(trainingData)

# pick the best model
best_rfModel = cvModel.bestModel

In [None]:
# score the test data set with the best model
cvresult=best_rfModel.transform(testingData)
cvresults_show=cvresult.select(cvresult["ID"],cvresult["CHURN"],cvresult["Label"],cvresult["predictedLabel"],cvresult["prediction"],cvresult["probability"])
cvresults_show.toPandas().head()

In [None]:

print 'Model Accuracy of the best fitted model = {:.2f}.'.format(cvresult.filter(cvresult.label == cvresult.prediction).count()/ float(cvresult.count()))
print 'Model Accuracy of the default model = {:.2f}.'.format(result.filter(result.label == result.prediction).count() / float(result.count()))
print '   '
print('Area under the ROC curve of best fitted model = {:.2f}.'.format(evaluator.evaluate(cvresult)))
print 'Area under the ROC curve of the default model = {:.2f}.'.format(evaluator.evaluate(result))

### Step 10: Save Model
Save the best model. 

A separate notebook has been created for "batch scoring deployment". This deployment notebook retrieves the saved model and applies it to a new dataset. The notebook can be scheduled to run via the Notebook scheduler or through the deployment interface in IBM WML.

In [None]:
# Overwrite any existing saved model in the specified path
best_rfModel.write().overwrite().save("PredictChurn.churnModel")

You have come to the end of this notebook

**Sidney Phoon**<br/>
**Elena Lowery**<br/>
**Rich Tarro**<br/>
**Mokhtar Kandil**<br/>
July, 2017