# <center>IBM WolfPack</center>

<img style="float: center;" src="https://github.com/team-wolfpack/Predicting-Customer-Churn-with-Watson-Data-Platform/blob/master/Documents/Team%20%23WolfPack-01.png?raw=true">

# <center><span style="color:blue">Predicting Customer Churn with Watson Data Platform </span></center>

*Adapted from "Predict Customer Churn Use Case Implementation" by Sidney Phoon | https://github.com/SidneyPhoon/IntroToNotebooksLab*

## Table of contents

1. [Step 1: Download the customer churn data](#download)<br/>
2. [Step 2: Read data into Spark DataFrames](#getdata)<br/>
3. [Step 3: Merge Files](#merge)<br/>
4. [Step 4: Rename some columns](#rename)<br/>
5. [Step 5: Data understanding](#dataunderstanding)<br/>
    5.1 [Dataset overview](#overview)<br/>
    5.2 [Exploratory data analysis](#eda)<br/>
    5.3 [Interactive query with Spark SQL](#sparksql)<br/>
6. [Step 6: Introduction to Spark Pipelines (optional)](#intropipeline)<br/>
    6.1 [StringIndexer](#stringindexer)<br/>
    6.2 [IndexToString](#indextostring)<br/>
    6.3 [OneHotEncoder](#onehotencoder)<br/>
    6.4 [Bucketizer](#bucketizer)<br/>
    6.5 [VectorAssembler](#vectorassembler)<br/>
    6.6 [Normalizer](#normalizer)<br/>
7. [Step 7: Applying Spark pipeline concepts to customer churn data](#applypipelineconcepts)<br/>
8. [Step 8: Creating a Spark ML pipeline](#createpipeline)<br/>
9. [Step 9: Score the test dataset](#scoretestdata)<br/>
10. [Step 10: Model evaluation](#evaluate)<br/>
11. [Step 11: Execute inline invocation of the Random Forests Model](#execute)<br/>

<a id="download"></a>
# <span style="color:#fa04d9"> Step 1: Download the customer churn data</span>

In [1]:
# Run once to install the wget package
!pip install wget



In [2]:
# Download data from GitHub repository
import wget
url_churn='https://raw.githubusercontent.com/team-wolfpack/Predicting-Customer-Churn-with-Watson-Data-Platform/master/Data/churn.csv'
url_customer='https://raw.githubusercontent.com/team-wolfpack/Predicting-Customer-Churn-with-Watson-Data-Platform/master/Data/customer.csv'
    
#remove existing files before downloading
!rm -f churn.csv
!rm -f customer.csv

churnFilename=wget.download(url_churn)
customerFilename=wget.download(url_customer)

#list existing files
!ls -l churn.csv
!ls -l customer.csv

-rw------- 1 s6a7-9b57efaba520fc-a7e94bb08b86 users 20077 Feb  2 15:00 churn.csv
-rw------- 1 s6a7-9b57efaba520fc-a7e94bb08b86 users 279539 Feb  2 15:00 customer.csv


<a id="getdata"></a>
# <span style="color:#fa04d9">Step 2: Read data into Spark DataFrames</span>

Note: Refer to the Spark DataFrame API documentation to learn more about the supported operations: https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

churn_df= spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option("inferSchema", "true")\
  .load("churn.csv")

customer_df = spark.read\
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("customer.csv")

#### <span style="color:blue">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Take a look at the first 5 rows of the newly loaded Spark DataFrames.</span>

In [4]:
customer_df.show(5)

+---+------+------+--------+----------+---------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+
| ID|Gender|Status|Children|Est Income|Car Owner|      Age|LongDistance|International| Local|Dropped|Paymethod|LocalBilltype|LongDistanceBilltype| Usage|RatePlan|
+---+------+------+--------+----------+---------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+
|  1|     F|     S|     1.0|   38000.0|        N|24.393333|       23.56|          0.0|206.08|    0.0|       CC|       Budget|      Intnl_discount|229.64|     3.0|
|  6|     M|     M|     2.0|   29616.0|        N|49.426667|       29.78|          0.0|  45.5|    0.0|       CH|    FreeLocal|            Standard| 75.29|     2.0|
|  8|     M|     M|     0.0|   19732.8|        N|50.673333|       24.81|          0.0| 22.44|    0.0|       CC|    FreeLocal|            Standard| 47.25|     3.0|
| 11|     M|     S|   

In [5]:
# show the schema
customer_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Children: double (nullable = true)
 |-- Est Income: double (nullable = true)
 |-- Car Owner: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- LongDistance: double (nullable = true)
 |-- International: double (nullable = true)
 |-- Local: double (nullable = true)
 |-- Dropped: double (nullable = true)
 |-- Paymethod: string (nullable = true)
 |-- LocalBilltype: string (nullable = true)
 |-- LongDistanceBilltype: string (nullable = true)
 |-- Usage: double (nullable = true)
 |-- RatePlan: double (nullable = true)



In [6]:
churn_df.show(5)

+---+-----+
| ID|CHURN|
+---+-----+
|  1|    T|
|  6|    F|
|  8|    F|
| 11|    F|
| 14|    F|
+---+-----+
only showing top 5 rows



In [7]:
churn_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CHURN: string (nullable = true)



<a id="merge"></a>
# <span style="color:#fa04d9">Step 3: Merge Files </span>


In [8]:
data_df = customer_df.join(churn_df,customer_df['ID'] == churn_df['ID']).select(customer_df['*'], churn_df['CHURN'])

In [9]:
data_df.show(5)

+---+------+------+--------+----------+---------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+-----+
| ID|Gender|Status|Children|Est Income|Car Owner|      Age|LongDistance|International| Local|Dropped|Paymethod|LocalBilltype|LongDistanceBilltype| Usage|RatePlan|CHURN|
+---+------+------+--------+----------+---------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+-----+
|  1|     F|     S|     1.0|   38000.0|        N|24.393333|       23.56|          0.0|206.08|    0.0|       CC|       Budget|      Intnl_discount|229.64|     3.0|    T|
|  6|     M|     M|     2.0|   29616.0|        N|49.426667|       29.78|          0.0|  45.5|    0.0|       CH|    FreeLocal|            Standard| 75.29|     2.0|    F|
|  8|     M|     M|     0.0|   19732.8|        N|50.673333|       24.81|          0.0| 22.44|    0.0|       CC|    FreeLocal|            Standard| 47.25|  

<a id="rename"></a>
# <span style="color:#fa04d9">Step 4: Rename some columns </span>
This step is not required; it just removes the spaces from some column names so that they are simpler to type.

In [10]:
# withColumnRenamed renames an existing column in a SparkDataFrame and returns a new SparkDataFrame
data_df = data_df.withColumnRenamed("Est Income", "EstIncome").withColumnRenamed("Car Owner","CarOwner")
data_df.toPandas().head()

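|   | ID | Gender | Status | Children | EstIncome | CarOwner | Age | LongDistance | International | Local | Dropped | Paymethod | LocalBilltype | LongDistanceBilltype | Usage | RatePlan | CHURN |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | F | S | 1 | 38000.0 | N | 24.393333 | 23.56 | 0 | 206.08 | 0 | CC | Budget | Intnl_discount | 229.64 | 3 | T |
| 1 | 6 | M | M | 2 | 29616.0 | N | 49.426667 | 29.78 | 0 | 45.5 | 0 | CH | FreeLocal | Standard | 75.29 | 2 | F |
| 2 | 8 | M | M | 0 | 19732.8 | N | 50.673333 | 24.81 | 0 | 22.44 | 0 | CC | FreeLocal | Standard | 47.25 | 3 | F |
| 3 | 11 | M | S | 2 | 96.33 | N | 56.473333 | 26.13 | 0 | 32.88 | 1 | CC | Budget | Standard | 59.01 | 1 | F |
| 4 | 14 | F | M | 2 | 52004.8 | N | 25.14 | 5.03 | 0 | 23.11 | 0 | CH | Budget | Intnl_discount | 28.14 | 1 | F |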


<a id="dataunderstanding"></a>
# <span style="color:#fa04d9">Step 5: Data understanding </span>

<a id="overview"></a>
### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Dataset Overview
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;The Pandas library has a powerful set of commands to analyze data. As an example, check the use of "describe" below.

In [11]:
# Convert the Spark DataFrame to a pandas DataFrame for local analysis
df_pandas = data_df.toPandas()
# print() with a single argument behaves the same on Python 2 and Python 3
print("There are " + str(len(df_pandas)) + " observations in the customer history dataset.")
print("There are " + str(len(df_pandas.columns)) + " variables in the dataset.")

print("\n******************Descriptive statistics*****************************\n")
print(df_pandas.drop(['ID'], axis = 1).describe())


There are 2066 observations in the customer history dataset.
There are 17 variables in the dataset.

******************Descriptive statistics*****************************

          Children      EstIncome          Age  LongDistance  International  \
count  2066.000000    2066.000000  2066.000000   2066.000000    2066.000000   
mean      1.146176   51514.070465    42.783982     16.122076       1.191104   
std       0.843105   30805.652721    14.894693      9.874795       2.602010   
min       0.000000      96.330000    12.326667      0.000000       0.000000   
25%       0.000000   21021.600000    30.356667      8.090000       0.000000   
50%       1.000000   55860.000000    45.526667     16.140000       0.000000   
75%       2.000000   78000.000000    54.013333     22.990000       0.000000   
max       2.000000  120000.000000    77.000000     59.000000       9.700000   

             Local      Dropped        Usage     RatePlan  
count  2066.000000  2066.000000  2066.000000  2066.00000

<a id="eda"></a>
### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Exploratory Data Analysis

The **Brunel** Visualization library provides a highly succinct and novel language that defines interactive data visualizations based on tabular data. The language is well suited for both data scientists and more aggressive business users. The system interprets the language and produces visualizations using the user's choice of existing lower-level visualization technologies typically used by application engineers such as RAVE or D3. 

More information about Brunel Visualization: https://github.com/Brunel-Visualization/Brunel/wiki

Try Brunel visualization here:  http://brunel.mybluemix.net/gallery_app/renderer

In [12]:
import brunel
df_pandas = data_df.toPandas()
%brunel data('df_pandas') stack bar x(Paymethod) y(#count) color(CHURN) bin(Paymethod) percent(#count) label(#count) tooltip(#all) | x(LongDistance) y(Usage) point color(Paymethod) tooltip(LongDistance, Usage) :: width=1100, height=400 


**PixieDust** is a Python helper library for Spark IPython notebooks. One of its main features is visualization. You'll notice that unlike other APIs, which produce static output, PixieDust creates an interactive UI in which you can explore data.<br/>
More information about PixieDust: https://github.com/ibm-cds-labs/pixiedust

**If you haven't already installed it, run the following cell to install the pixiedust Python library in your notebook environment. You only need to run it once.**


In [13]:
!pip install --user --upgrade pixiedust

Requirement already up-to-date: pixiedust in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s6a7-9b57efaba520fc-a7e94bb08b86/.local/lib/python2.7/site-packages
Requirement already up-to-date: mpld3 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s6a7-9b57efaba520fc-a7e94bb08b86/.local/lib/python2.7/site-packages (from pixiedust)
Requirement already up-to-date: lxml in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s6a7-9b57efaba520fc-a7e94bb08b86/.local/lib/python2.7/site-packages (from pixiedust)
Requirement already up-to-date: astunparse in /usr/local/src/bluemix_jupyter_bundle.v78/notebook/lib/python2.7/site-packages (from pixiedust)
Requirement already up-to-date: geojson in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s6a7-9b57efaba520fc-a7e94bb08b86/.local/lib/python2.7/site-packages (from pixiedust)
Requirement already up-to-date: markdown in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s6a7-9b57efaba520fc-a7e94bb08b86/.local/lib/python2.7/site-packages (from pixiedust)
Require

In [None]:
from pixiedust.display import *
display(data_df)

Pixiedust database opened successfully



<a id="sparksql"></a>
### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Interactive query with Spark SQL

In [15]:
# Spark SQL also allows you to use standard SQL
data_df.createOrReplaceTempView("data_df")
sql = """
SELECT c.*
FROM data_df c
WHERE c.EstIncome>50000

"""
spark.sql(sql).toPandas().head()

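|   | ID | Gender | Status | Children | EstIncome | CarOwner | Age | LongDistance | International | Local | Dropped | Paymethod | LocalBilltype | LongDistanceBilltype | Usage | RatePlan | CHURN |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 14 | F | M | 2 | 52004.8 | N | 25.14 | 5.03 | 0 | 23.11 | 0 | CH | Budget | Intnl_discount | 28.14 | 1 | F |
| 1 | 17 | M | M | 2 | 53010.8 | N | 18.84 | 12.45 | 0 | 46.42 | 4 | CC | FreeLocal | Standard | 58.87 | 1 | F |
| 2 | 18 | M | M | 1 | 75004.5 | N | 64.8 | 26.52 | 0 | 32.19 | 0 | CC | Budget | Intnl_discount | 58.72 | 1 | F |
| 3 | 22 | M | S | 1 | 57626.9 | Y | 43.906667 | 9.38 | 0 | 38.96 | 0 | CC | Budget | Standard | 48.35 | 2 | F |
| 4 | 35 | F | S | 0 | 78851.3 | N | 48.373333 | 0.37 | 0 | 28.66 | 0 | CC | FreeLocal | Standard | 29.04 | 4 | T |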


<a id="intropipeline"></a>
# <span style="color:#fa04d9">Step 6: Introduction to Spark Pipelines (optional; if you are already familiar with these concepts, skip to Step 7)</span>

### The section after this one builds a SparkML Pipeline made of Transformers and Estimators. If you are not familiar with the terms "Transformer", "Estimator" and "Pipeline", use this section to learn them. If you already know these concepts, skip directly to Step 7.


## <span style="color:green">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;In this section, you will get familiar with a few important Spark ML concepts:</span>
### <span style="color:green">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;* Discovering some Estimators and Transformers, and what they do.</span>
### <span style="color:green">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;* Introduction to the notion of a Spark Machine Learning Pipeline.</span>

<a id="stringindexer"></a>
## <span style="color:green">Getting familiar with the SparkML Estimator: <a href="https://spark.apache.org/docs/latest/ml-features.html#stringindexer">StringIndexer</a> </span>

### StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.<br><br> Note that StringIndexer is an estimator, not a transformer. StringIndexer needs to scan the data it is given as input, to find the most frequent string and assign to it label 0, and then label 1 to the next most frequent string and so on. It will then produce a StringIndexerModel which is a transformer which can be applied to the input data using the "transform" method.
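The frequency-based ordering can be illustrated without Spark. The following is a minimal plain-Python sketch, not the Spark implementation: `fit_string_indexer` is a hypothetical helper, and the alphabetical tie-break is an assumption made only for this sketch. It uses the same six category values as the collapsed example below.

```python
from collections import Counter


def fit_string_indexer(values):
    """Return a label -> index mapping, most frequent label first.

    Hypothetical helper that mimics the idea behind StringIndexer.fit();
    ties are broken alphabetically, which is an assumption of this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


categories = ["a", "b", "c", "a", "a", "c"]

# "fit" step: scan the data once to learn the mapping
mapping = fit_string_indexer(categories)

# "transform" step: apply the learned mapping to every value
indexed = [mapping[value] for value in categories]

print(mapping)
print(indexed)
```

Here "a" occurs three times, "c" twice and "b" once, so they receive the indices 0, 1 and 2 respectively.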

<div class="panel-group" id="accordion-10">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-10" href="#collapse1-10">
        Click on this link to expand this cell, then copy and paste the code which will appear in a new cell just below, and execute that new cell to see how StringIndexer works. (You may subsequently delete that new cell and proceed with this notebook).</a>
      </h4>
    </div>
    <div id="collapse1-10" class="panel-collapse collapse">
      <div class="panel-body">
from pyspark.ml.feature import StringIndexer<br>
<br>
df = spark.createDataFrame( <br>
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")], <br>
    ["id", "category"]) <br>
<br>
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") <br>
indexed = indexer.fit(df).transform(df) <br>
indexed.show()
      </div>
    </div>
  </div>
</div>

<a id="indextostring"></a>
## <span style="color:green">Getting familiar with the SparkML Transformer: <a href="https://spark.apache.org/docs/latest/ml-features.html#indextostring">IndexToString</a> </span>

### Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices and retrieve the original labels from the column of predicted indices with IndexToString. However, you are free to supply your own labels.
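As a rough illustration of the reverse mapping, here is a plain-Python sketch. It is not the Spark API: `index_to_string` is a hypothetical helper, and the label order `["F", "T"]` is assumed for the example.

```python
def index_to_string(indices, labels):
    """Map each numeric index back to the label stored at that position."""
    return [labels[int(index)] for index in indices]


# Assumed label order for this sketch: index 0 -> "F", index 1 -> "T"
labels = ["F", "T"]

# Hypothetical model output, one predicted index per row
predictions = [1.0, 0.0, 0.0, 1.0]

predicted_labels = index_to_string(predictions, labels)
print(predicted_labels)
```

The list of labels plays the role of the `labels` parameter that IndexToString accepts when you supply your own labels.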

<a id="onehotencoder"></a>
## <span style="color:green">Getting familiar with the SparkML Transformer: <a href="https://spark.apache.org/docs/latest/ml-features.html#onehotencoder">OneHotEncoder</a> </span>

### One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. OneHotEncoder is a transformer.
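The "at most a single one-value" wording reflects that Spark's encoder drops the last category by default, so that category is encoded as an all-zero vector. A minimal plain-Python sketch of the idea follows; `one_hot` is a hypothetical helper that returns a dense list, whereas Spark produces a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a list of 0.0/1.0 values.

    With drop_last=True the last category has no slot of its own and is
    represented by an all-zero vector.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    position = int(index)
    if position < size:
        vector[position] = 1.0
    return vector


# Three category indices, such as the Paymethod indices used later in this notebook
first = one_hot(0.0, 3)
last = one_hot(2.0, 3)
last_kept = one_hot(2.0, 3, drop_last=False)

print(first, last, last_kept)
```

Dropping the last category avoids a redundant column, since its value is implied when all other positions are zero.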

<a id="bucketizer"></a>
## <span style="color:green">Getting familiar with the SparkML Transformer: <a href="https://spark.apache.org/docs/latest/ml-features.html#bucketizer">Bucketizer</a> </span>

### Bucketizer transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users. It takes a `splits` parameter that lists the boundaries of the buckets, so n+1 split points define n buckets. Bucketizing data is also referred to as "binning".
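A minimal plain-Python sketch of binning with user-specified boundaries is shown below. The `bucketize` helper and the age boundaries are hypothetical and chosen only for illustration; each bucket covers a half-open range [lower, upper), except the last one, which also includes its upper boundary.

```python
from bisect import bisect_right


def bucketize(value, splits):
    """Return the index of the bucket that contains value.

    splits must be sorted in increasing order; n+1 split points define n buckets.
    """
    if value < splits[0] or value > splits[-1]:
        raise ValueError("value %r is outside the split range" % (value,))
    if value == splits[-1]:
        # The last bucket also includes its upper boundary
        return float(len(splits) - 2)
    return float(bisect_right(splits, value) - 1)


# Hypothetical boundaries: [0, 18), [18, 35), [35, 60), [60, inf]
splits = [0.0, 18.0, 35.0, 60.0, float("inf")]
ages = [12.3, 24.4, 49.4, 77.0]

buckets = [bucketize(age, splits) for age in ages]
print(buckets)
```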

<a id="vectorassembler"></a>
## <span style="color:green">Getting familiar with the SparkML Transformer: <a href="https://spark.apache.org/docs/latest/ml-features.html#vectorassembler">VectorAssembler</a> </span>

### VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.
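Conceptually, the assembler concatenates the selected column values of each row into one flat vector, in the order the columns are listed. The sketch below shows this in plain Python; `assemble` is a hypothetical helper, and the row values are taken from the first customer shown earlier in this notebook.

```python
def assemble(row, input_cols):
    """Concatenate the listed columns of one row into a single flat list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # A column that already holds a vector is flattened into the result
            features.extend(float(x) for x in value)
        else:
            features.append(float(value))
    return features


row = {"PaymethodEncoded": 0.0, "Children": 1.0, "EstIncome": 38000.0, "Age": 24.393333}
input_cols = ["PaymethodEncoded", "Children", "EstIncome", "Age"]

features = assemble(row, input_cols)
print(features)
```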

<a id="normalizer"></a>
## <span style="color:green">Getting familiar with the SparkML Transformer: <a href="https://spark.apache.org/docs/latest/ml-features.html#normalizer">Normalizer</a> </span>

### Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes parameter p, which specifies the p-norm used for normalization. (p=2 by default.) This normalization can help standardize your input data and improve the behavior of learning algorithms.
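To make the p-norm concrete, here is a small plain-Python sketch that divides each component of a vector by the vector's p-norm. `normalize` is a hypothetical helper, not the Spark class, and the example vectors are made up.

```python
def normalize(vector, p=2.0):
    """Scale vector so that its p-norm equals 1 (zero vectors are returned unchanged)."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0:
        return list(vector)
    return [x / norm for x in vector]


# p=2 (the default): Euclidean norm of [3, 4] is 5
unit_l2 = normalize([3.0, 4.0])

# p=1: sum of absolute values of [1, 3] is 4
unit_l1 = normalize([1.0, 3.0], p=1.0)

print(unit_l2, unit_l1)
```

Note that the normalization is applied per row (per vector), not per column.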

## <span style="color:green">Several other Estimators and Transformers are described in the <a href="https://spark.apache.org/docs/latest/ml-features.html">Apache Spark documentation</a>.</span>

<a id="applypipelineconcepts"></a>
# <span style="color:#fa04d9">Step 7: Applying the concepts described above to our customer churn dataset</span>
* **a) Defining a StringIndexer Estimator for the input columns Gender, Status, CarOwner, Paymethod, LocalBilltype, LongDistanceBilltype.**<br>
* **b) Inspecting what StringIndexer actually does on one of those columns.**<br>
* **c) Defining a VectorAssembler that groups the encoded columns and the numeric columns into one input vector for the model.**<br>
* **d) Defining a StringIndexer Estimator for the target label column "CHURN", to encode the T/F values as 1/0.**<br>
* **e) Defining an IndexToString Transformer to convert the 0/1 predictions of our model back to T/F values.**<br>
* **f) Defining the Random Forest and Decision Tree estimators, which will be trained on the training data to produce the models that perform the predictions.**

In [16]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier

### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;a) Defining a StringIndexer for the String columns in our dataset.

In [17]:
### In this dataset, we will encode columns Gender, Status, CarOwner, Paymethod, LocalBilltype and LongDistanceBilltype
# StringIndexer encodes a string column of labels to a column of label indices. 
SI1 = StringIndexer(inputCol='Gender', outputCol='GenderEncoded')
SI2 = StringIndexer(inputCol='Status',outputCol='StatusEncoded')
SI3 = StringIndexer(inputCol='CarOwner',outputCol='CarOwnerEncoded')
SI4 = StringIndexer(inputCol='Paymethod',outputCol='PaymethodEncoded')
SI5 = StringIndexer(inputCol='LocalBilltype',outputCol='LocalBilltypeEncoded')
SI6 = StringIndexer(inputCol='LongDistanceBilltype',outputCol='LongDistanceBilltypeEncoded')

### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;b) Let's see what StringIndexer is actually doing.

In [18]:
indexed = SI4.fit(data_df).transform(data_df)
indexed.show(10)

+---+------+------+--------+---------+--------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+-----+----------------+
| ID|Gender|Status|Children|EstIncome|CarOwner|      Age|LongDistance|International| Local|Dropped|Paymethod|LocalBilltype|LongDistanceBilltype| Usage|RatePlan|CHURN|PaymethodEncoded|
+---+------+------+--------+---------+--------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+-----+----------------+
|  1|     F|     S|     1.0|  38000.0|       N|24.393333|       23.56|          0.0|206.08|    0.0|       CC|       Budget|      Intnl_discount|229.64|     3.0|    T|             0.0|
|  6|     M|     M|     2.0|  29616.0|       N|49.426667|       29.78|          0.0|  45.5|    0.0|       CH|    FreeLocal|            Standard| 75.29|     2.0|    F|             2.0|
|  8|     M|     M|     0.0|  19732.8|       N|50.673333|       24.81|          

In [19]:
# Use Spark SQL to count the rows for each Paymethod value and its encoded index
indexed.createOrReplaceTempView("indexed")
sql = """
SELECT distinct Paymethod, PaymethodEncoded, count(1)
FROM indexed c
group by c.Paymethod, PaymethodEncoded
order by 3 desc
"""
spark.sql(sql).toPandas().head()

|   | Paymethod | PaymethodEncoded | count(1) |
|---|---|---|---|
| 0 | CC | 0 | 1237 |
| 1 | Auto | 1 | 452 |
| 2 | CH | 2 | 377 |


### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;c) Define a Vector Assembler for all the columns of interest to be passed into the chosen machine learning model (columns which are encoded as well as those kept as is).

In [20]:
# Pipelines API requires that input variables are passed in  a vector
assembler = VectorAssembler(inputCols=["GenderEncoded", "StatusEncoded", "CarOwnerEncoded", "PaymethodEncoded", \
                                       "LocalBilltypeEncoded", "LongDistanceBilltypeEncoded", "Children", "EstIncome", "Age", \
                                       "LongDistance", "International", "Local", "Dropped","Usage","RatePlan"], outputCol="features")

### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;d) Defining a StringIndexer for the label column of our model (the CHURN column). StringIndexer assigns index 0 to the most frequent label, so here F is converted to 0 and T to 1.

In [21]:
# encode the label column
labelIndexer = StringIndexer(inputCol='CHURN', outputCol='label').fit(data_df)

### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;e) Defining an IndexToString transformer to bring the labels back to T and F once the predictions are done. The model will produce a column named "prediction" which will contain 0 or 1. We will convert it back to T and F in a column named "predictedLabel."

In [22]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

### &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;f) Defining a Random Forest estimator, and a Decision Tree (CART) estimator. These are two very popular tree based classifiers.

In [23]:
# instantiate the algorithms, take the default settings
# Random Forests
rf=RandomForestClassifier(labelCol="label", featuresCol="features")
# Classification and Regression Trees (CART)
dt=DecisionTreeClassifier(labelCol="label", featuresCol="features")

<a id="createpipeline"></a>
# <span style="color:#fa04d9">Step 8: Creating a Spark ML pipeline</span>
* **All the individual components of the pipeline have been defined in the section above. Notice how we will now "group" them into a pipeline object.**

### In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:
* Split each document’s text into words. 
* Convert each document’s words into a numerical feature vector.  
* Learn a prediction model using the feature vectors and labels.<br>

### MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. 

We will now build the Spark pipeline including the operations defined in Step 7 above.
"Pipeline" is an API in SparkML. A pipeline defines a sequence of transformers and estimators to perform the analysis in stages.
Additional information on SparkML is available online, including at this link: https://spark.apache.org/docs/2.0.2/ml-guide.html
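To show how a pipeline treats Estimators and Transformers differently, here is a toy plain-Python sketch. It is not the SparkML implementation and it simplifies the real behavior: every class name below (`MeanCenterer`, `Scaler`, `ToyPipeline`, and so on) is hypothetical, and the "data" is just a list of numbers.

```python
class MeanCenterer(object):
    """Toy estimator: fit() learns the mean and returns a fitted model."""

    def fit(self, data):
        return MeanCenterModel(sum(data) / float(len(data)))


class MeanCenterModel(object):
    """Toy fitted model (a transformer): subtracts the learned mean."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class Scaler(object):
    """Toy transformer: has no fit() step, only transform()."""

    def __init__(self, factor):
        self.factor = factor

    def transform(self, data):
        return [x * self.factor for x in data]


class ToyPipelineModel(object):
    """Result of fitting: a chain made only of transformers."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyPipeline(object):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; transformers are used as they are
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            # The output of each stage becomes the input of the next one
            data = model.transform(data)
            fitted.append(model)
        return ToyPipelineModel(fitted)


pipeline = ToyPipeline(stages=[MeanCenterer(), Scaler(10.0)])
model = pipeline.fit([1.0, 2.0, 3.0])   # learns mean = 2.0 from the training data
scored = model.transform([4.0, 6.0])    # reuses the learned mean on new data
print(scored)
```

The key point is that the mean learned during `fit` is reused unchanged when the fitted model transforms new data, which is the same reason the churn pipeline is fitted on the training set only.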

In [24]:
# build the pipeline
# Random Forests
rfPipeline = Pipeline(stages=[SI1,SI2,SI3,SI4,SI5,SI6, labelIndexer, assembler, rf, labelConverter])

# Classification and Regression Trees (CART)
dtPipeline = Pipeline(stages=[SI1,SI2,SI3,SI4,SI5,SI6, labelIndexer, assembler, dt, labelConverter])

### Split the data into Training and Testing sets (this is a standard best practice in data science)

In [25]:
# Split data into train and test datasets
(trainingData, testingData) = data_df.randomSplit([0.7, 0.3],seed=9)
trainingData.cache()
testingData.cache()

DataFrame[ID: int, Gender: string, Status: string, Children: double, EstIncome: double, CarOwner: string, Age: double, LongDistance: double, International: double, Local: double, Dropped: double, Paymethod: string, LocalBilltype: string, LongDistanceBilltype: string, Usage: double, RatePlan: double, CHURN: string]

### Build the model by fitting the whole pipeline using the training data set. <br><br>Note that the pipeline interface will correctly call fit+transform or just transform alone for each stage of the pipeline, depending on whether the current stage is an Estimator (such as StringIndexer) or a Transformer

In [26]:
# Build Random Forests model. The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages.
rfModel =rfPipeline.fit(trainingData)

In [27]:
# Build Decision Trees model. The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages.
dtModel =dtPipeline.fit(trainingData)

<a id="scoretestdata"></a>
# <span style="color:#fa04d9">Step 9: Score the test data set </span>

In [28]:
# Random Forests
rfResult=rfModel.transform(testingData)
rfResultDisplay=rfResult.select(rfResult["ID"],rfResult["CHURN"],rfResult["Label"],rfResult["predictedLabel"],rfResult["prediction"],rfResult["probability"])
rfResultDisplay.toPandas().head(6)

|   | ID | CHURN | Label | predictedLabel | prediction | probability |
|---|---|---|---|---|---|---|
| 0 | 1 | T | 1 | T | 1 | [0.0390066830438, 0.960993316956] |
| 1 | 18 | F | 0 | F | 0 | [0.604568153513, 0.395431846487] |
| 2 | 22 | F | 0 | F | 0 | [0.743416529489, 0.256583470511] |
| 3 | 23 | F | 0 | F | 0 | [0.92480094066, 0.0751990593396] |
| 4 | 29 | T | 1 | T | 1 | [0.134435036135, 0.865564963865] |
| 5 | 40 | T | 1 | T | 1 | [0.222010772111, 0.777989227889] |


In [29]:
# Decision Trees
dtResult=dtModel.transform(testingData)
dtResultDisplay=dtResult.select(dtResult["ID"],dtResult["CHURN"],dtResult["Label"],dtResult["predictedLabel"],dtResult["prediction"],dtResult["probability"])
dtResultDisplay.toPandas().head(6)

|   | ID | CHURN | Label | predictedLabel | prediction | probability |
|---|---|---|---|---|---|---|
| 0 | 1 | T | 1 | T | 1 | [0.051724137931, 0.948275862069] |
| 1 | 18 | F | 0 | F | 0 | [0.859154929577, 0.140845070423] |
| 2 | 22 | F | 0 | F | 0 | [0.920731707317, 0.0792682926829] |
| 3 | 23 | F | 0 | F | 0 | [0.976666666667, 0.0233333333333] |
| 4 | 29 | T | 1 | T | 1 | [0.27802690583, 0.72197309417] |
| 5 | 40 | T | 1 | T | 1 | [0.0, 1.0] |


<a id="evaluate"></a>
# <span style="color:#fa04d9">Step 10: Model Evaluation </span>
**Find the accuracy of the model and the Area Under the ROC Curve**
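Accuracy is simply the fraction of rows whose predicted index equals the true label index. The plain-Python sketch below shows the same calculation that the Spark cells in this step perform with `filter` and `count`; the `accuracy` helper and the two lists are made up for illustration and are not results from the churn model.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / float(len(labels))


# Hypothetical label and prediction indices for eight rows
labels = [1, 0, 0, 0, 1, 1, 0, 1]
predictions = [1, 0, 0, 1, 1, 0, 0, 1]

score = accuracy(labels, predictions)
print("Accuracy = {:.2f}".format(score))
```

Six of the eight hypothetical rows match, so the sketch reports 0.75.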

In [30]:
# Random Forests
print(' Random Forests Model Accuracy = {:.2f}.'.format(rfResult.filter(rfResult.label == rfResult.prediction).count() / float(rfResult.count())))

 Random Forests Model Accuracy = 0.92.


In [31]:
# Decision Trees
print(' Decision Trees Model Accuracy = {:.2f}.'.format(dtResult.filter(dtResult.label == dtResult.prediction).count() / float(dtResult.count())))

 Decision Trees Model Accuracy = 0.89.


### Create an evaluator for the binary classification using area under the ROC Curve as the evaluation metric
Receiver operating characteristic (ROC) is a graphical plot that illustrates the performance of a binary classifier system as its discrimination threshold is varied.

Additional reading on this metric can be found online, for example on Wikipedia: https://en.wikipedia.org/wiki/Receiver_operating_characteristic
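The area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The plain-Python sketch below computes it that way. The `area_under_roc` helper, the labels and the scores are all hypothetical; this is not how Spark computes the metric internally. The sketch also compares continuous scores with hard 0/1 predictions, since the evaluator cell below passes the `prediction` column as `rawPredictionCol`.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical true labels and predicted churn probabilities
labels = [1, 0, 1, 0, 1, 0]
scores = [0.9, 0.8, 0.7, 0.3, 0.6, 0.2]

# Hard predictions obtained by thresholding the scores at 0.5
hard_predictions = [1 if s >= 0.5 else 0 for s in scores]

auc_from_scores = area_under_roc(labels, scores)
auc_from_hard = area_under_roc(labels, hard_predictions)
print("AUC from scores: {:.3f}".format(auc_from_scores))
print("AUC from hard predictions: {:.3f}".format(auc_from_hard))
```

The two values differ because hard predictions collapse the ranking into two groups, so the ROC curve is built from a single threshold instead of many.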


In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note: rawPredictionCol is set to the hard 0/1 "prediction" column here, so the ROC curve
# is built from a single threshold. Passing the model's "rawPrediction" column instead would
# use the underlying scores and would generally change the values printed below.

# Evaluate Random Forests model
rfEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print('Random Forests: Area under ROC curve = {:.3f}.'.format(rfEvaluator.evaluate(rfResult)))

# Evaluate Decision Trees model
dtEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print('Decision Trees: Area under ROC curve = {:.3f}.'.format(dtEvaluator.evaluate(dtResult)))


Random Forests: Area under ROC curve = 0.911.
Decision Trees: Area under ROC curve = 0.891.


<a id="execute"></a>
# <span style="color:#fa04d9">Step 11: Execute an inline invocation of the Random Forest Model.</span>

### Let us now make a prediction for a customer whose attributes we make up ourselves

In [None]:
Gender = 'F'
Status = 'M'
CarOwner = 'N'
Paymethod = 'CC'
LocalBilltype = 'Budget'
LongDistanceBilltype = 'Standard'
Children = 1
EstIncome = 45000
Age = 30
LongDistance = 30
International = 0
Local = 100
Dropped = 0
Usage = 150
RatePlan = 2

Features = (spark.createDataFrame([(Gender, Status, CarOwner, Paymethod, LocalBilltype, LongDistanceBilltype, Children, EstIncome, Age, LongDistance, \
                                              International, Local, Dropped, Usage, RatePlan)],
    ['Gender', 'Status', 'CarOwner', 'Paymethod', 'LocalBilltype', 'LongDistanceBilltype', 'Children', 'EstIncome', 'Age', 'LongDistance', \
     'International', 'Local', 'Dropped', 'Usage', 'RatePlan']))
Features.show()

In [None]:
ChurnPrediction = rfModel.transform(Features)
ChurnPrediction.select('rawPrediction', 'probability', 'prediction').show(1, False)

### Mini Exercise: Change the number of children and/or the EstIncome in the cell that defines the customer attributes, rerun both cells, and observe the impact on the prediction:
* It seems that a customer with fewer than 3 children is predicted to churn, while a customer with 3 or more children is not.
* The rule above holds for lower incomes. With higher incomes, churn is less likely (if we change the income to 145,000, the model no longer seems to predict churn, regardless of the number of children).

## End of Spark Lab Exercise