# Lab 3: Binary Classification with Spark ML

### In this notebook, we will explore Binary Classification using Spark ML. We will exploit Spark ML's high-level APIs built on top of DataFrames to create and tune machine learning pipelines. Spark ML Pipelines enable combining multiple algorithms into a single pipeline or workflow. We will heavily utilize Spark ML's feature transformers to convert, modify and scale the features that will be used to develop the machine learning model. Finally, we will evaluate and cross validate our model to demonstrate the process of determining a best fit model.

### The binary classification demo uses the famous Titanic dataset, which has been used for Kaggle competitions and is described at the link below. There is no need to download the data manually, because the notebook downloads it directly.
https://www.kaggle.com/c/titanic/data


### The Titanic data set was chosen for this binary classification demonstration because it contains both text based and numeric features that are both continuous and categorical. This will give us the opportunity to explore and utilize a number of feature transformers available in Spark ML.
     
          

![IBM Logo](https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSzlUYaJ9xykGC-N5PijcV_eDBGCXy_pMn7sy6ymrVypmJ22q5ZmA)

## Table of contents

1. [Import required Spark libraries](#libraries)<br/>
2. [Get the Data](#getdata)<br/>
3. [Prepare and clean the data](#prepare)<br/>
    3.1 [Remove unneeded columns](#remove)<br/>
4. [Transform the data](#transform)<br/>
    4.1 [Gender and Embarkation](#stringindexer)<br/>
    4.2 [Age and Fare](#bucketizer)<br/>
5. [Build the Model](#build)<br/>
6. [Split the data into train and test sets](#split)<br/>
7. [Test the Model](#test)<br/>
8. [Tune the Model](#tune)<br/>
9. [Predict imaginary passenger](#predict)<br/>
10. [Random Forest](#randomforest)<br/>
11. [Save Model](#savemodel)<br/>
12. [Summary](#summary)<br/>

## Verify Spark version and existence of Spark and Spark SQL contexts

In [None]:
print('The spark version is {}.'.format(spark.version))

<a id="libraries"></a>
## 1 - Import required Spark libraries 


#### After executing this block, you should see a message saying that the `Pixiedust database opened successfully`.

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Bucketizer
# Use the DataFrame-based pyspark.ml package, which is what Spark ML pipelines expect
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

#import pixiedust display module
from pixiedust.display import *

<a id="getdata"></a>
## 2 - Download Data

In [None]:
!rm -f Titanic.csv
!wget https://ibm.box.com/shared/static/crceca9g1ym3nl0hwaxa5c0j0m3e19l8.csv -O Titanic.csv -q
!ls -l Titanic.csv

### Read data in as a DataFrame
### Source data is in CSV format and includes a header. We will ask Spark to infer the schema/data types.

In [None]:
loadTitanicData = sqlContext.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header", "true").option("inferSchema", "true").load("Titanic.csv")

<a id="prepare"></a>
## 3. Prepare and shape the data

PixieDust is an open-source IBM library which can be used to easily and flexibly `display` data.

Use PixieDust to examine the schema (click on the Schema line).   Try differing displays of the data using PixieDust.

For example, try showing a histogram of `fare` or `age` or `pclass`.    Change the renderer and see what happens.
<br>
 <div class="panel-group" id="accordion-3">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse1-3">
        Hint 1</a>
      </h4>
    </div>
    <div id="collapse1-3" class="panel-collapse collapse">
      <div class="panel-body">After executing the display(loadTitanicData) code cell below, select the Chart icon and select Histogram</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse2-3">
        Hint 2</a>
      </h4>
    </div>
    <div id="collapse2-3" class="panel-collapse collapse">
      <div class="panel-body">After executing the display(loadTitanicData) code cell below, select the Options button. Drag the age (or fare or class) field to the values column. Change the number of rows to display to more than the number of rows read in (1400 will do)</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse3-3">
        Hint 3</a>
      </h4>
    </div>
    <div id="collapse3-3" class="panel-collapse collapse">
      <div class="panel-body">After executing the display(loadTitanicData) code cell below, change the renderer (dropdown on upper right) to something else, such as `seaborn` or `bokeh`</div>
    </div>
  </div>
</div> 

In [None]:
display(loadTitanicData)

<a id="remove"></a>
## 3.1 - Drop unwanted columns and rows with null or invalid data.

In [None]:
loadTitanicData = loadTitanicData.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin").dropna(how="any", subset=("Age", "Embarked"))

##  We will use the 'Survived' column as a label for training the machine learning model
#### Spark ML requires that the labels have data type Double, so we will cast the column to Double (it was inferred as Integer when read into Spark).

In [None]:
LabeledTitanicData = (loadTitanicData.withColumn("SurvivedTemp", loadTitanicData["Survived"]
    .cast("Double")).drop("Survived")
    .withColumnRenamed("SurvivedTemp", "Survived"))

## Show the labeled data

In [None]:
LabeledTitanicData.sample(False, 0.01, seed=0).show(5)

## Print some record counts

In [None]:
print('The total number of rows is {}.'.format(LabeledTitanicData.count()))
print('The number of rows labeled Not Survived is {}.'.format(LabeledTitanicData.filter(LabeledTitanicData['Survived'] == 0).count()))
print('The number of rows labeled Survived is {}.'.format(LabeledTitanicData.filter(LabeledTitanicData['Survived'] == 1).count()))

## Show the schema of the labeled data

In [None]:
LabeledTitanicData.printSchema()

<a id="transform"></a>
## 4. Transform the data

Certain data fields need to be transformed before building the model. The reasons range from needing to convert String values to numeric values to shaping data into different formats.

<a id="stringindexer"></a>
## 4.1 Use <a href="https://spark.apache.org/docs/latest/ml-features.html#stringindexer">StringIndexer</a> to transform gender and embarked values

StringIndexer is a transformer that encodes a string column to a column of indices. The indices are ordered by value frequencies, so the most frequent value gets index 0. If the input column is numeric, it is cast to string first. 

For the Titanic data set, we will index the Sex/Gender column as well as the Embarked column, which specifies the port at which the passenger boarded the ship.

In [None]:
SexIndexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
EmbarkedIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
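
The frequency ordering described above can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark's implementation: the helper `frequency_index`, the alphabetical tie-break, and the sample values are assumptions made for illustration only.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct string to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically so the
    # result is deterministic (an assumption made for this sketch).
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Hypothetical sample of an Embarked column (not taken from the real data).
sample_embarked = ["S", "C", "S", "Q", "S", "C"]
embarked_index = frequency_index(sample_embarked)
print(embarked_index)
```

In this sample, `S` occurs most often and therefore receives index 0.0, followed by `C` and then `Q`.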

<a id="bucketizer"></a>
## 4.2 Use <a href="https://spark.apache.org/docs/latest/ml-features.html#bucketizer">Bucketizer</a> to transform age and fare values

Bucketizer is a transformer that converts a column of continuous features into a column of feature buckets, where the bucket boundaries are specified by a `splits` parameter.

For the Titanic data set, we will bucketize the Age and Fare features.

<br/>
<div class="panel-group" id="accordion-42">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-42" href="#collapse1-42">
        Advanced Optional</a>
      </h4>
    </div>
    <div id="collapse1-42" class="panel-collapse collapse">
      <div class="panel-body">After completing the lab, note the prediction percentage then come back and change the values for either Bucketizer and re-run the kernel [Kernel->Restart and Run All].   Note the change in prediction accuracy.</div>
    </div>
  </div>
</div> 

In [None]:
AgeBucketSplits = [0.0, 6.0, 12.0, 18.0, 40.0, 65.0, 80.0, float("inf")]
AgeBucket = Bucketizer(splits=AgeBucketSplits, inputCol="Age", outputCol="AgeBucket")

FareBucketSplits = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 80.0, 100.0, float("inf")]
FareBucket = Bucketizer(splits=FareBucketSplits, inputCol="Fare", outputCol="FareBucket")
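
To see how split points turn a continuous value into a bucket index, here is a minimal plain-Python sketch that reuses the age splits from the cell above. The helper `bucket_of` is hypothetical and only approximates the behaviour: each bucket is treated as half-open, `[lower, upper)`, with the top boundary folded into the last bucket.

```python
from bisect import bisect_right

# Same boundaries as AgeBucketSplits in the lab cell above.
age_splits = [0.0, 6.0, 12.0, 18.0, 40.0, 65.0, 80.0, float("inf")]

def bucket_of(value, splits):
    """Return the index of the bucket [splits[i], splits[i + 1]) that holds value."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError("value lies outside the split range")
    # bisect_right counts the splits that are <= value; cap the result so
    # the top boundary itself falls into the last bucket.
    return float(min(bisect_right(splits, value) - 1, len(splits) - 2))

for age in (4.0, 6.0, 39.9, 40.0, 71.0):
    print(age, "->", bucket_of(age, age_splits))
```

Note that eight split points define seven buckets, and that a value equal to a boundary (for example 40.0) belongs to the bucket that starts at that boundary.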

<a id="build"></a>
## 5. Building the Model

## <a href="https://spark.apache.org/docs/latest/ml-features.html#vectorassembler">VectorAssembler</a> is a transformer that combines a given list of columns in the order specified into a single vector column in order to train a model.

<br/>
<div class="panel-group" id="accordion-5">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-5" href="#collapse1-5">
        Advanced Optional</a>
      </h4>
    </div>
    <div id="collapse1-5" class="panel-collapse collapse">
      <div class="panel-body">After completing the lab, note the prediction percentage, then come back and remove some of the values in the assembler (e.g. remove SibSp, Pclass and Parch, or remove SexIndex) and re-run the kernel [Kernel->Restart and Run All]. Note the change in prediction accuracy.</div>
    </div>
  </div>
</div> 

In [None]:
assembler = VectorAssembler(inputCols= ["SexIndex", "EmbarkedIndex", "AgeBucket", "FareBucket", "SibSp", "Pclass", "Parch"], outputCol="features")

## Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm
### This normalization can help standardize your input data and improve the behavior of learning algorithms.

In [None]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
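
As a rough illustration of what unit-norm scaling with the `p` parameter means, the plain-Python sketch below divides a vector by its p-norm. The helper `p_normalize` and the feature values are hypothetical and are not part of the lab data.

```python
def p_normalize(vector, p=1.0):
    """Scale a vector so that its p-norm equals 1 (zero vectors are left unchanged)."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    return [x / norm for x in vector] if norm > 0 else list(vector)

# Hypothetical assembled features, in the assembler's column order:
# [SexIndex, EmbarkedIndex, AgeBucket, FareBucket, SibSp, Pclass, Parch]
features = [1.0, 0.0, 4.0, 1.0, 1.0, 2.0, 1.0]

norm_features = p_normalize(features, p=1.0)  # absolute values sum to 1
l2_features = p_normalize(features, p=2.0)    # squares sum to 1
print(norm_features)
```

With `p=1.0`, as in the cell above, each entry becomes its share of the sum of absolute values; with `p=2.0` the vector is scaled to unit Euclidean length.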

## Logistic regression is a popular method to predict a binary response
### It is a special case of Generalized Linear models that predicts the probability of an outcome.

In [None]:
lr = LogisticRegression(featuresCol="normFeatures", labelCol="Survived", predictionCol="prediction", maxIter=10, regParam=0.1, elasticNetParam=0.8)
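
The two ideas behind the cell above, a probability obtained from a linear score and a regularization penalty controlled by `regParam` and `elasticNetParam`, can be sketched in plain Python. The weights and feature values below are hypothetical, and the penalty follows the elastic-net form given in the Spark documentation, a blend of L1 and L2 terms; treat this as an illustration rather than Spark's optimizer.

```python
import math

def predict_probability(weights, intercept, features):
    """Logistic function applied to the linear score w . x + b."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-margin))

def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) * 0.5 * l2_squared)

# Hypothetical model: two weights and no intercept.
weights = [2.0, -1.0]
probability = predict_probability(weights, 0.0, [0.5, 1.0])
penalty = elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.8)
print(probability, penalty)
```

Setting `elasticNetParam` to 0.0 leaves only the L2 term, and setting it to 1.0 leaves only the L1 term, which is why the tuning grid later in the lab tries both extremes.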

## A Pipeline is a sequence of stages where each stage is either a Transformer or an Estimator
### These stages are run in order and the input DataFrame is transformed as it passes through each stage. 

### In machine learning, it is common to run a sequence of algorithms to process and learn from data.

In [None]:
pipeline = Pipeline(stages=[SexIndexer, EmbarkedIndexer, AgeBucket,FareBucket, assembler, normalizer, lr])
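
The difference between a Transformer stage and an Estimator stage can be sketched with a toy pipeline in plain Python. All class and function names below are hypothetical stand-ins, not Spark classes: a transformer only has `transform()`, while an estimator has `fit()`, which returns a fitted transformer that is then used for the remaining stages.

```python
class AddOne:
    """A transformer stage: it only has transform()."""
    def transform(self, rows):
        return [r + 1.0 for r in rows]

class Shift:
    """The fitted transformer produced by MeanCenter.fit()."""
    def __init__(self, offset):
        self.offset = offset
    def transform(self, rows):
        return [r + self.offset for r in rows]

class MeanCenter:
    """An estimator stage: fit() learns the mean and returns a transformer."""
    def fit(self, rows):
        return Shift(-sum(rows) / len(rows))

def fit_pipeline(stages, rows):
    """Run the stages in order, fitting estimators on the data seen so far."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):
            stage = stage.fit(rows)
        rows = stage.transform(rows)
        fitted.append(stage)
    return fitted

def transform_pipeline(fitted_stages, rows):
    """Apply the already fitted stages to new data."""
    for stage in fitted_stages:
        rows = stage.transform(rows)
    return rows

fitted = fit_pipeline([AddOne(), MeanCenter()], [1.0, 2.0, 3.0])
print(transform_pipeline(fitted, [10.0]))
```

The fitted pipeline remembers what it learned from the training rows (here, the mean) and applies it unchanged to new rows, which mirrors how the fitted model in this lab is later applied to the test set.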

<a id="split"></a>
## 6 - Split the data into training (90%) and testing (10%) sets using <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit">randomSplit()</a>

Set seed to 1 in order to make certain this is repeatable.
<br>
 <div class="panel-group" id="accordion-6">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-6" href="#collapse1-6">
        Hint 1</a>
      </h4>
    </div>
    <div id="collapse1-6" class="panel-collapse collapse">
      <div class="panel-body">train, test = LabeledTitanicData.randomSplit()</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-6" href="#collapse2-6">
        Hint 2</a>
      </h4>
    </div>
    <div id="collapse2-6" class="panel-collapse collapse">
      <div class="panel-body">train, test = LabeledTitanicData.randomSplit([??,??], seed=??)</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-6" href="#collapse3-6">
        Solution</a>
      </h4>
    </div>
    <div id="collapse3-6" class="panel-collapse collapse">
      <div class="panel-body">train, test = LabeledTitanicData.randomSplit([90.0,10.0], seed=1)</div>
    </div>
  </div>
</div> 

In [None]:
# Exercise: complete the call below using randomSplit (see the hints above).
train, test = LabeledTitanicData.
train.cache()
test.cache()
print('The number of records in the training data set is {}.'.format(train.count()))
print('The number of rows labeled Not Survived in the training data set is {}.'.format(train.filter(train['Survived'] == 0).count()))
print('The number of rows labeled Survived in the training data set is {}.'.format(train.filter(train['Survived'] == 1).count()))
train.sample(False, 0.01, seed=0).show(5)
print('')

print('The number of records in the test data set is {}.'.format(test.count()))
# Filter on the test DataFrame's own column (the original referenced train['Survived'])
print('The number of rows labeled Not Survived in the test data set is {}.'.format(test.filter(test['Survived'] == 0).count()))
print('The number of rows labeled Survived in the test data set is {}.'.format(test.filter(test['Survived'] == 1).count()))
test.sample(False, 0.1, seed=0).show(5)
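
Two properties of a weighted random split are worth seeing in isolation: the weights are relative (so `[90.0, 10.0]` behaves like `[0.9, 0.1]`), and a fixed seed makes the split repeatable. The plain-Python sketch below illustrates both; the helper `random_split` is hypothetical and does not reproduce Spark's sampling, so the resulting sizes are only approximately 90% and 10%.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one part with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [90.0, 10.0] -> [0.9, 1.0]
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last part also catches any floating-point rounding gap.
            if draw < bound or i == len(bounds) - 1:
                parts[i].append(row)
                break
    return parts

train_rows, test_rows = random_split(list(range(1000)), [90.0, 10.0], seed=1)
print(len(train_rows), len(test_rows))
```

Running the function again with the same seed returns the same two lists, which is the reason the lab asks for a fixed seed.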

## Fit the pipeline to the training data

In [None]:
model = pipeline.fit(train)

<a id="test"></a>
## 7 - Make predictions on passengers in the Test data set
### Keep in mind that the model has not seen the data in the test data set

In [None]:
predictions = model.transform(test)

## Show results

In [None]:
predictions.sample(False, 0.1, seed=0).show(5)

In [None]:
print('The number of predictions labeled Not Survived is {}.'.format(predictions.filter(predictions['prediction'] == 0).count()))
print('The number of predictions labeled Survived is {}.'.format(predictions.filter(predictions['prediction'] == 1).count()))

In [None]:
(predictions.filter("Survived = 0.0")
     .select("Sex", "Age", "Fare", "Embarked", "Pclass", "Parch", "SibSp", "Survived", "prediction")
     .sample(False, 0.1, seed=0).show(5))

(predictions.filter("Survived = 1.0")
     .select("Sex", "Age", "Fare", "Embarked", "Pclass", "Parch", "SibSp", "Survived", "prediction")
     .sample(False, 0.5, seed=0).show(5))

## Create an evaluator for the binary classification using area under the ROC Curve as the evaluation metric

### Receiver operating characteristic (ROC) is a graphical plot that illustrates the performance of a binary classifier system as its discrimination threshold is varied

The curve is created by plotting the true positive rate against the false positive rate at various threshold settings. The ROC curve is thus the sensitivity as a function of fall-out. The area under the ROC curve is useful for comparing and selecting the best machine learning model for a given data set. A model with an area under the ROC curve score near 1 has very good performance. A model with a score near 0.5 is about as good as flipping a coin.

In [None]:
evaluator = BinaryClassificationEvaluator().setLabelCol("Survived").setMetricName("areaUnderROC")
print('Area under the ROC curve = {}.'.format(evaluator.evaluate(predictions)))
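
One way to build intuition for the area under the ROC curve is that it equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it that way on a tiny set of hypothetical labels and scores; it is an illustration of the metric, not the method Spark uses internally.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1.0]
    negatives = [s for y, s in zip(labels, scores) if y == 0.0]
    wins = 0.0
    for pos in positives:
        for neg in negatives:
            if pos > neg:
                wins += 1.0
            elif pos == neg:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical labels (1.0 = survived) and predicted survival probabilities.
labels = [1.0, 0.0, 1.0, 0.0, 1.0]
scores = [0.9, 0.8, 0.7, 0.3, 0.6]
auc = area_under_roc(labels, scores)
print(auc)
```

Here four of the six positive/negative pairs are ordered correctly, so the score is 4/6; a perfect ranking would give 1.0 and constant scores would give 0.5.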

<a id="tune"></a>
## 8 - Tune Hyperparameters
### Generate hyperparameter combinations by taking the cross product of some parameter values

Spark ML algorithms provide many hyperparameters for tuning models. These hyperparameters are distinct from the model parameters being optimized by Spark ML itself. Hyperparameter tuning is accomplished by choosing the best set of hyperparameters based on model performance on held-out data that the model was not trained on. All combinations of the specified hyperparameter values will be tried in order to find the one that leads to the model with the best evaluation result.

## Build a Parameter Grid specifying what parameters and values will be evaluated in order to determine the best combination

In [None]:
paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.1, 0.3])
                 .addGrid(lr.elasticNetParam, [0.0, 0.8, 1.0])
                 .addGrid(normalizer.p, [1.0, 2.0])
                 .build())
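
The grid above is a cross product of the listed values. As a quick plain-Python check of how many combinations that produces, the sketch below rebuilds the same grid with `itertools.product`; the dictionary keys are just labels chosen for this illustration.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder cell above.
grid_values = {
    "regParam": [0.0, 0.1, 0.3],
    "elasticNetParam": [0.0, 0.8, 1.0],
    "p": [1.0, 2.0],
}

names = list(grid_values)
# One dictionary per combination: 3 * 3 * 2 combinations in total.
param_maps = [dict(zip(names, combo)) for combo in product(*grid_values.values())]

print(len(param_maps))
print(param_maps[0])
```

Every additional value in any list multiplies the number of models to train, so grids grow quickly.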

## Create a cross validator to tune the pipeline with the generated parameter grid
Spark ML provides for cross-validation for hyperparameter tuning. Cross-validation attempts to fit the underlying estimator with user-specified combinations of parameters, cross-evaluate the fitted models, and output the best one.

In [None]:
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(10)
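
To make the cost of this step concrete, the plain-Python sketch below shows how k-fold cross-validation partitions row indices and how many model fits the search implies for 18 hyperparameter combinations and 10 folds. The round-robin fold assignment in `k_fold_indices` is an assumption made for simplicity; Spark assigns rows to folds randomly.

```python
def k_fold_indices(n_rows, num_folds):
    """Yield (training, validation) index lists, holding out one fold at a time."""
    folds = [list(range(i, n_rows, num_folds)) for i in range(num_folds)]
    for held_out in range(num_folds):
        validation = folds[held_out]
        training = [idx for f, fold in enumerate(folds) if f != held_out
                    for idx in fold]
        yield training, validation

num_param_maps = 18   # size of the parameter grid built earlier in the lab
num_folds = 10        # matches setNumFolds(10)

# Every combination is trained once per fold during the search.
fits_during_search = num_param_maps * num_folds
print(fits_during_search)

for training, validation in k_fold_indices(20, num_folds):
    print(len(training), len(validation))
```

Each row is used for validation exactly once and for training in the other nine folds. After the search, the best combination is fitted once more on the full training data, so expect this cell's `fit` call to take noticeably longer than a single pipeline fit.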

## Cross-validate the ML Pipeline to find the best model
### using the area under the ROC evaluator and hyperparameters specified in the parameter grid

In [None]:
cvModel = cv.fit(train)
print('Area under the ROC curve for best fitted model = {}.'.format(evaluator.evaluate(cvModel.transform(test))))

## Let's see what improvement we achieve by tuning the hyperparameters using cross-validation

In [None]:
# Evaluate each model once and reuse the results
baselineAUC = evaluator.evaluate(predictions)
tunedAUC = evaluator.evaluate(cvModel.transform(test))
print('Area under the ROC curve for non-tuned model = {}.'.format(baselineAUC))
print('Area under the ROC curve for best fitted model = {}.'.format(tunedAUC))
print('Improvement = {0:0.2f}%'.format((tunedAUC - baselineAUC) * 100 / baselineAUC))

## Make improved predictions using the Cross-validated model
### Using the Test data set and DataFrame API

In [None]:
cvModel.transform(test).select("Survived", "prediction").sample(False, 0.1, seed=0).show(10)

### Like above, but now using SQL

In [None]:
# create temporary table
cvModel.transform(test).createOrReplaceTempView("cvModelPredictions")
spark.sql("select Survived, prediction from cvModelPredictions").sample(False, 0.1, seed=0).show(10)

<a id="predict"></a>
## 9 - Make a prediction on an imaginary passenger

## Define the imaginary passenger's features

In [None]:
SexValue = 'female'
AgeValue = 40.0
FareValue = 15.0
EmbarkedValue = 'C'
PclassValue = 2
SibSpValue = 1
ParchValue = 1

PredictionFeatures = (spark.createDataFrame([(SexValue, AgeValue, FareValue, EmbarkedValue, PclassValue, SibSpValue, ParchValue)],
    ['Sex', 'Age', 'Fare', 'Embarked', 'Pclass', 'SibSp', 'Parch']))
PredictionFeatures.show()

## Predict whether the imaginary person would have survived
### using the best fit model

In [None]:
SurvivedOrNotPrediction = cvModel.transform(PredictionFeatures)
SurvivedOrNotPrediction.select('rawPrediction', 'probability', 'prediction').show(1, False)

## Display Prediction Result

In [None]:
SurvivedOrNot = SurvivedOrNotPrediction.select("prediction").first()[0]
if SurvivedOrNot == 0.0:
    print("Did NOT Survive")
elif(SurvivedOrNot == 1.0):
    print("Did Survive!!!")
else:
    print("Invalid Prediction")

<a id="randomforest"></a>
## 10 - Let's take a quick look at applying the feature engineering performed above to a Random Forest Model
### Random forests are ensembles of decision trees. They combine many decision trees in order to reduce the risk of overfitting.
### We won't do any hyperparameter tuning in this example, but just show how to create and evaluate the model using all default hyperparameters

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer().setInputCol("Survived").setOutputCol("indexedLabel").fit(LabeledTitanicData)

# Train a RandomForest model
rf = RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("features").setNumTrees(20)

# Convert indexed labels back to original labels.
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

# Create new Pipeline using the RandomForest model and all the same feature transformers used above for logistic regression
pipelineRF = Pipeline().setStages([labelIndexer, SexIndexer, EmbarkedIndexer, AgeBucket, FareBucket, assembler, normalizer, rf, labelConverter])

# Train model.
modelRF = pipelineRF.fit(train)

# Make predictions.
predictionsRF = modelRF.transform(test)

# Select example rows to display.
predictionsRF.select("predictedLabel", "Survived", "features").show(10)

# Select (prediction, true label) and compute test error.
# Compare against indexedLabel, because prediction is expressed in the same index space.
evaluatorRF = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
accuracyRF = evaluatorRF.evaluate(predictionsRF)
print("Accuracy = %g" % accuracyRF)
print("Test Error = %g" % (1.0 - accuracyRF))

rfModel = modelRF.stages[7]
print(rfModel)  # summary only
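
The idea of combining many trees can be sketched with a simple majority vote in plain Python. The three "trees" below are hypothetical one-rule stand-ins invented for illustration, not rules learned from the Titanic data, and plain voting is a simplification of how a real random forest combines its trees.

```python
from collections import Counter

def majority_vote(tree_predictions):
    """Return the class predicted by the largest number of trees."""
    return Counter(tree_predictions).most_common(1)[0][0]

# Three hypothetical one-rule "trees" (1.0 = survived, 0.0 = did not survive).
def tree_sex(passenger):
    return 1.0 if passenger["Sex"] == "female" else 0.0

def tree_class(passenger):
    return 1.0 if passenger["Pclass"] <= 2 else 0.0

def tree_age(passenger):
    return 1.0 if passenger["Age"] < 12.0 else 0.0

forest = [tree_sex, tree_class, tree_age]

passenger = {"Sex": "female", "Pclass": 2, "Age": 40.0}
votes = [tree(passenger) for tree in forest]
forest_prediction = majority_vote(votes)
print(votes, "->", forest_prediction)
```

An individual tree can be wrong for a given passenger (here the age rule votes against survival), but the ensemble prediction depends on the combined vote, which is what makes the forest less sensitive to any single tree.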

<a id="savemodel"></a>
## 11 - Save Model

This section demonstrates the Watson Machine Learning API, which lets you programmatically save the trained model to a model repository. The model can then be deployed for use in a production application.

### Persist model to the Watson Machine Learning Repository
### A Watson Machine Learning service should already have been created. You will need to obtain the credentials of the service in order to save the model. 

1. Click on Services and then click on Watson Services. <img alt="Service Credentials" src="https://raw.githubusercontent.com/bleonardb3/SparkPOT/master/Lab-3/images/Services-Watson%20Services.png">
1. Click on Machine Learning (or whatever you named the Machine Learning service) <img alt="Service Credentials" src="https://raw.githubusercontent.com/bleonardb3/SparkPOT/master/Lab-3/images/ClickonMachineLearning.png">
1. Click on Service Credentials in the left panel. <img alt="Service Credentials" src="https://raw.githubusercontent.com/bleonardb3/DSX/master/Lab-2/images/MLServiceCredentials.png">
1. Click on New Credential. <img alt="New Credentials" src="https://raw.githubusercontent.com/bleonardb3/DSX/master/Lab-2/images/MLNewCredential.png"> 
1. On the Add new credential popup, click on Add. <img alt="Add Credential" src="https://raw.githubusercontent.com/bleonardb3/DSX/master/Lab-2/images/MLAddCredential.png"> 
1. In the Credentials-1 row, click on the down arrow next to View Credentials. <img alt="View Credentials" src="https://raw.githubusercontent.com/bleonardb3/DSX/master/Lab-2/images/MLViewCredentials.png"> 
1. Copy the credentials shown into the appropriate places in the notebook cell below. 

In [None]:
# @hidden_cell
# The following code contains the credentials for the Machine Learning service.
# You might want to remove those credentials before you share your notebook.

wml_credentials= {
  "url": "",
  "username": "",
  "password": "",
  "instance_id": ""
}


In [None]:
from repository_v3.mlrepositoryclient import MLRepositoryClient
from repository_v3.mlrepositoryartifact import MLRepositoryArtifact
from repository_v3.mlrepository import MetaProps, MetaNames
import json

In [None]:
ml_repository_client = MLRepositoryClient(wml_credentials['url'])
ml_repository_client.authorize(wml_credentials['username'], wml_credentials['password'])

In [None]:
meta_props=MetaProps({
    MetaNames.EVALUATION_METHOD: "binary",
    MetaNames.EVALUATION_METRICS: json.dumps([{
        "name": "areaUnderROC",
        "value":evaluator.evaluate(cvModel.transform(test)),
        "threshold":0.8
    }])
})

## After the cell below is executed, a model entry "Titanic Notebook Model" will appear in the Project Model section. 

In [None]:
pipeline_artifact = MLRepositoryArtifact(pipeline, name="pipeline")
model_artifact = MLRepositoryArtifact(model, training_data=train, pipeline_artifact=pipeline_artifact, meta_props=meta_props,name="Titanic Notebook Model")
saved_model = ml_repository_client.models.save(model_artifact)


<a id="summary"></a>
![IBM Logo](http://www-03.ibm.com/press/img/Large_IBM_Logo_TN.jpg)

You created a predictive model that predicts survival probabilities for passengers on the Titanic. Along the way, you:

  - Loaded the data
  - Cleaned the data
  - Created transformers to shape the data
  - Created a model using Pipeline
  - Split the data into training and test sets
  - Tested the model
  - Tuned the model
  - Tested the model on an imaginary passenger
  - Built a second model using Random Forest
  - Saved the model to the model repository
  