<table style="border: none" align="left">
   <tr style="border: none">
          <th style="border: none"><font face="verdana" size="5" color="black"><b>Lab: Build a model with data stored in a Bluemix dashDB service</b></font></th>
      <th style="border: none"><img src="https://github.com/pmservice/customer-satisfaction-prediction/blob/master/app/static/images/ml_icon_gray.png?raw=true" alt="Watson Machine Learning icon" height="40" width="40"></th>
   </tr>
</table>

### Step 1: Connect to dashDB and load the CUSTOMER table

#### Important: Before running the cells, supply the connection details for your own dashDB instance (they are used to load the CUSTOMER and CHURN tables).

In [None]:
# Import the required API and instantiate Spark Context
import os
from getpass import getpass

from ingest.Connectors import Connectors
from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.master("local").appName("spark session example").getOrCreate()

# Connection details for your dashDB instance. Set the DASHDB_* environment
# variables (or edit the non-secret fallback values) to point at your own instance.
# The password is never stored in the notebook: it is read from DASHDB_PASSWORD,
# or prompted for interactively when that variable is not set.
# NOTE(review): a password used to be hardcoded in this cell; treat it as leaked and rotate it.
DASHDB_HOST = os.environ.get('DASHDB_HOST', 'dashdb-entry-yp-dal09-09.services.dal.bluemix.net')
DASHDB_DATABASE = os.environ.get('DASHDB_DATABASE', 'BLUDB')
DASHDB_USERNAME = os.environ.get('DASHDB_USERNAME', 'dash9737')
DASHDB_SCHEMA = os.environ.get('DASHDB_SCHEMA', 'DASH9737')
DASHDB_PASSWORD = os.environ.get('DASHDB_PASSWORD') or getpass('dashDB password: ')

customerTable = { Connectors.DASHDB.HOST              : DASHDB_HOST,
                  Connectors.DASHDB.DATABASE          : DASHDB_DATABASE,
                  Connectors.DASHDB.USERNAME          : DASHDB_USERNAME,
                  Connectors.DASHDB.PASSWORD          : DASHDB_PASSWORD,
                  Connectors.DASHDB.SOURCE_TABLE_NAME : DASHDB_SCHEMA + '.CUSTOMER'}

customer = sparkSession.read.format("com.ibm.spark.discover").options(**customerTable).load()
customer.printSchema()
customer.show()

### Step 2: Connect to dashDB and load the CHURN table

In [None]:
# Reuse the connection settings from customerTable so the credentials are defined
# in exactly one place; only the source table changes. The CHURN table is taken to
# live in the same schema as the CUSTOMER table.
churnTable = dict(customerTable)
customer_schema = customerTable[Connectors.DASHDB.SOURCE_TABLE_NAME].rpartition('.')[0]
churnTable[Connectors.DASHDB.SOURCE_TABLE_NAME] = (customer_schema + '.CHURN') if customer_schema else 'CHURN'

customer_churn = sparkSession.read.format("com.ibm.spark.discover").options(**churnTable).load()
customer_churn.printSchema()
customer_churn.show()

### Step 3: Merge the CUSTOMER and CHURN tables

In [None]:
# Join the two tables on customer ID (Spark's default inner join), keeping every
# CUSTOMER column plus the CHURN column from the churn table.
id_matches = customer['ID'] == customer_churn['ID']
merged = customer.join(customer_churn, id_matches) \
                 .select(customer['*'], customer_churn['CHURN'])

### Step 4: Rename some columns
This step cleans up the column names by separating words with underscores (for example, `LONGDISTANCE` becomes `LONG_DISTANCE`).

In [None]:
# Original run-together column names and their underscore-separated replacements.
# withColumnRenamed is a no-op for a column that is not present, so re-running
# this cell is harmless.
COLUMN_RENAMES = [("LONGDISTANCE", "LONG_DISTANCE"),
                  ("PAYMETHOD", "PAY_METHOD"),
                  ("LOCALBILLTYPE", "LOCAL_BILL_TYPE"),
                  ("LONGDISTANCEBILLTYPE", "LONG_DISTANCE_BILLTYPE")]
for old_name, new_name in COLUMN_RENAMES:
    merged = merged.withColumnRenamed(old_name, new_name)

# Preview a few rows. limit() runs in Spark, so only 5 rows are collected to the
# driver instead of converting the entire joined table to pandas first.
merged.limit(5).toPandas()

In [None]:
# Print the schema of the merged table to confirm the renamed columns and their types.
merged.printSchema()

### Step 5: Build the Spark pipeline and the Random Forest model
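A Spark ML `Pipeline` chains data-preparation stages (here, string indexers and a vector assembler) and a learning algorithm into a single workflow that is fit as one model.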
Additional information on SparkML: https://spark.apache.org/docs/2.0.2/ml-guide.html

In [None]:
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# String (categorical) columns that must be converted to numeric indices before
# the random forest can use them as features.
CATEGORICAL_COLS = ['GENDER', 'STATUS', 'CAR_OWNER', 'PAY_METHOD',
                    'LOCAL_BILL_TYPE', 'LONG_DISTANCE_BILLTYPE']
# Columns passed into the feature vector as-is.
NUMERIC_COLS = ['CHILDREN', 'EST_INCOME', 'AGE', 'LONG_DISTANCE', 'INTERNATIONAL',
                'LOCAL', 'DROPPED', 'USAGE', 'RATEPLAN']

# One StringIndexer per categorical column, writing <COLUMN>_ENCODED.
feature_indexers = [StringIndexer(inputCol=name, outputCol=name + '_ENCODED')
                    for name in CATEGORICAL_COLS]
# The target: CHURN values are indexed into the numeric 'label' column.
label_indexer = StringIndexer(inputCol='CHURN', outputCol='label')

# Pipelines API requires that input variables are passed in a single vector column.
assembler = VectorAssembler(inputCols=[name + '_ENCODED' for name in CATEGORICAL_COLS] + NUMERIC_COLS,
                            outputCol="features")

# instantiate the algorithm, take the default settings
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Stage order: feature indexers, label indexer, assembler, classifier.
pipeline = Pipeline(stages=feature_indexers + [label_indexer, assembler, rf])

In [None]:
# Split data into train and test datasets: 80% of rows for training, 20% held out
# for evaluation. The fixed seed makes the split repeatable for the same input data
# and partitioning.
SPLIT_WEIGHTS = [0.8, 0.2]
SPLIT_SEED = 6
train, test = merged.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)

In [None]:
# Build models: fit the whole pipeline (string indexers, vector assembler and
# random forest) on the training split only; `model` is the fitted pipeline.
model = pipeline.fit(train)

### Step 6: Score the test data set

In [None]:
# Apply the fitted pipeline to the held-out test split. The output adds the
# pipeline's derived columns, including 'label' and 'prediction', used below.
results = model.transform(test)

### Step 7: Evaluate the model

In [None]:
# Accuracy: the share of test rows whose predicted label equals the true label.
# (This is accuracy, not precision; precision would only consider rows predicted positive.)
# print(...) with a single argument works under both Python 2 and Python 3.
n_scored = results.count()
n_correct = results.filter(results.label == results.prediction).count()
if n_scored:
    print('Accuracy = {:.2f}.'.format(n_correct / float(n_scored)))
else:
    print('Accuracy: the test set is empty, nothing to score.')

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model.
# Area under ROC must be computed from the classifier's continuous scores
# ("rawPrediction"), not from the hard "prediction" column: hard labels give only
# one threshold, so the ROC "curve" collapses to a single point and the metric is
# not a true AUC.
# NOTE(review): a binary evaluator assumes CHURN has exactly two distinct values; confirm.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
print('Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(results)))

You have come to the end of this notebook.


**Sidney Phoon**
<br/>
yfphoon@us.ibm.com
<br/>
April 25, 2017