# Binary Classification with Spark ML

### In this notebook, we will explore binary classification using Spark ML. We will exploit Spark ML's high-level APIs built on top of DataFrames to create and tune machine learning pipelines. Spark ML Pipelines enable combining multiple algorithms into a single pipeline or workflow. We will heavily utilize Spark ML's feature transformers to convert, modify and scale the features that will be used to develop the machine learning model. Finally, we will evaluate and cross-validate our model to demonstrate the process of determining a best-fit model.

### The binary classification demo will utilize the famous Titanic dataset, which has been used for Kaggle competitions and can be downloaded here. There is no need to download the data manually, as it is downloaded directly within the notebook.
https://www.kaggle.com/c/titanic/data


### The Titanic data set was chosen for this binary classification demonstration because it contains both text-based and numeric features, and the numeric features include both continuous and categorical ones. This will give us the opportunity to explore and utilize a number of feature transformers available in Spark ML.


![IBM Logo](https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSzlUYaJ9xykGC-N5PijcV_eDBGCXy_pMn7sy6ymrVypmJ22q5ZmA)

## Verify Spark version and existence of Spark and Spark SQL contexts

In [1]:
print('The spark version is {}.'.format(spark.version))

The spark version is 2.0.2.


## Import required Spark libraries

In [2]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Bucketizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

## Download Data

In [3]:
!rm -f Titanic.csv
!wget https://ibm.box.com/shared/static/crceca9g1ym3nl0hwaxa5c0j0m3e19l8.csv -O Titanic.csv -q
!ls -l Titanic.csv

-rw-r--r-- 1 s659-33fa456b216cab-a5f39cf201a0 users 61194 Feb 20 18:46 Titanic.csv


## Read data in as a DataFrame
### Source data is in CSV format and includes a header. We will ask Spark to infer the schema/data types.
### Drop unwanted columns, as well as rows with missing (null) Age or Embarked values.

In [4]:
loadTitanicData = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("Titanic.csv")
TitanicData = loadTitanicData.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin").dropna(how="any", subset=("Age", "Embarked"))

## We will use the 'Survived' column as a label for training the machine learning model
#### Spark ML requires that the labels be of data type Double, so we will cast the column as Double (it was inferred as Integer when read into Spark).

In [5]:
LabeledTitanicData = (TitanicData.withColumn("SurvivedTemp", TitanicData["Survived"]
    .cast("Double")).drop("Survived")
    .withColumnRenamed("SurvivedTemp", "Survived"))

## Show the labeled data

In [6]:
LabeledTitanicData.sample(False, 0.01, seed=0).show(5)

+------+------+----+-----+-----+-------+--------+--------+
|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Survived|
+------+------+----+-----+-----+-------+--------+--------+
|     1|female|19.0|    0|    2|26.2833|       S|     1.0|
|     1|  male|65.0|    0|    0|  26.55|       S|     0.0|
|     3|female|36.0|    1|    0|   17.4|       S|     1.0|
|     3|  male|24.5|    0|    0|   8.05|       S|     0.0|
|     1|  male|48.0|    1|    0|   52.0|       S|     1.0|
+------+------+----+-----+-----+-------+--------+--------+



In [7]:
print('The total number of rows is {}.'.format(LabeledTitanicData.count()))
print('The number of rows labeled Not Survived is {}.'.format(LabeledTitanicData.filter(LabeledTitanicData['Survived'] == 0).count()))
print('The number of rows labeled Survived is {}.'.format(LabeledTitanicData.filter(LabeledTitanicData['Survived'] == 1).count()))

The total number of rows is 712.
The number of rows labeled Not Survived is 424.
The number of rows labeled Survived is 288.


## Show the schema of the data

In [8]:
LabeledTitanicData.printSchema()

root
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Survived: double (nullable = true)



## StringIndexer

### StringIndexer is a transformer that encodes a string column to a column of indices. The indices are ordered by label frequency, so the most frequent value gets index 0. If the input column is numeric, it is cast to string first.

### For the Titanic data set, we will index the Sex/gender column as well as the Embarked column, which specifies the port at which the passenger boarded the ship.

In [9]:
SexIndexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
EmbarkedIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
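The frequency ordering can be illustrated in plain Python. This is a minimal sketch of the idea, not Spark's implementation: `string_index` is a hypothetical helper, and breaking ties alphabetically is our own assumption. In the Titanic data, male is the more frequent value of Sex and S the most frequent port, which is consistent with the SexIndex and EmbarkedIndex values that appear in the predictions output later in this notebook.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to an index, most frequent value first.

    Ties are broken alphabetically here; that rule is an assumption of
    this sketch, not documented Spark behavior.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(index) for index, value in enumerate(ordered)}


# A tiny hypothetical sample of the Sex and Embarked columns.
sex_index = string_index(["male", "female", "male", "male", "female"])
embarked_index = string_index(["S", "C", "S", "Q", "S", "C"])
print(sex_index)       # {'male': 0.0, 'female': 1.0}
print(embarked_index)  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
```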

## Bucketizer is a transformer that transforms a column of continuous features to a column of feature buckets, where the buckets are specified by a splits parameter

### For the Titanic data set, we will bucketize the Age and Fare features.

In [10]:
AgeBucketSplits = [0.0, 6.0, 12.0, 18.0, 40.0, 65.0, 80.0, float("inf")]
AgeBucket = Bucketizer(splits=AgeBucketSplits, inputCol="Age", outputCol="AgeBucket")

FareBucketSplits = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 80.0, 100.0, float("inf")]
FareBucket = Bucketizer(splits=FareBucketSplits, inputCol="Fare", outputCol="FareBucket")
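A plain-Python sketch of how a single value is mapped to a bucket, assuming the semantics described in the Spark ML documentation: bucket i covers [splits[i], splits[i+1]), the last bucket also includes its upper bound, and values outside the splits are treated as errors (raised here as `ValueError`). `bucketize` is a hypothetical helper. The example calls reproduce the AgeBucket and FareBucket values shown for the first two passengers in the predictions output later on.

```python
import bisect
import math

AGE_SPLITS = [0.0, 6.0, 12.0, 18.0, 40.0, 65.0, 80.0, float("inf")]
FARE_SPLITS = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 80.0, 100.0, float("inf")]


def bucketize(value, splits):
    """Return the bucket index of value for the given sorted splits.

    Bucket i covers [splits[i], splits[i + 1]); the last bucket also
    includes its upper bound. NaN or out-of-range values raise ValueError.
    """
    if math.isnan(value) or value < splits[0] or value > splits[-1]:
        raise ValueError("value {} is outside the splits".format(value))
    if value == splits[-1]:
        return float(len(splits) - 2)
    return float(bisect.bisect_right(splits, value) - 1)


print(bucketize(29.0, AGE_SPLITS), bucketize(10.5, FARE_SPLITS))    # 3.0 1.0
print(bucketize(51.0, AGE_SPLITS), bucketize(12.525, FARE_SPLITS))  # 4.0 1.0
```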

## VectorAssembler is a transformer that combines a given list of columns in the order specified into a single vector column in order to train a model.

In [11]:
assembler = VectorAssembler(inputCols= ["SexIndex", "EmbarkedIndex", "AgeBucket", "FareBucket", "SibSp", "Pclass", "Parch"], outputCol="features")
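The predictions output later in this notebook shows some `features` vectors in dense form (`[1.0,0.0,3.0,1.0,...`) and others in sparse form (`(7,[2,3,5],[4.0,1...`). A minimal sketch of why, assuming the assembler concatenates the columns in the order listed above and then keeps whichever representation is more compact. The `1.5 * (nnz + 1) < size` rule is our understanding of the heuristic behind Spark's `Vector.compressed` and should be treated as an assumption; `assemble` and `compressed` are hypothetical helpers.

```python
FEATURE_ORDER = ["SexIndex", "EmbarkedIndex", "AgeBucket", "FareBucket",
                 "SibSp", "Pclass", "Parch"]


def assemble(row):
    """Concatenate the feature columns, in FEATURE_ORDER, into one list."""
    return [float(row[name]) for name in FEATURE_ORDER]


def compressed(values):
    """Return a dense list or a (size, indices, values) sparse triple.

    Sparse is chosen when 1.5 * (nnz + 1) < size (assumed heuristic).
    """
    nonzero = [(i, v) for i, v in enumerate(values) if v != 0.0]
    if 1.5 * (len(nonzero) + 1.0) < len(values):
        return (len(values), [i for i, _ in nonzero], [v for _, v in nonzero])
    return values


# Indexed and bucketed values of the first two rows in the predictions output.
female_row = {"SexIndex": 1.0, "EmbarkedIndex": 0.0, "AgeBucket": 3.0,
              "FareBucket": 1.0, "SibSp": 0, "Pclass": 2, "Parch": 0}
male_row = {"SexIndex": 0.0, "EmbarkedIndex": 0.0, "AgeBucket": 4.0,
            "FareBucket": 1.0, "SibSp": 0, "Pclass": 2, "Parch": 0}
print(compressed(assemble(female_row)))  # [1.0, 0.0, 3.0, 1.0, 0.0, 2.0, 0.0]
print(compressed(assemble(male_row)))    # (7, [2, 3, 5], [4.0, 1.0, 2.0])
```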

## Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit p-norm (p = 1.0 here, so the absolute values in each vector sum to 1)
### This normalization can help standardize your input data and improve the behavior of learning algorithms.

In [12]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
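A minimal plain-Python sketch of p-norm normalization for finite p (Spark also supports p = infinity, which this sketch does not handle). `p_normalize` is a hypothetical helper, and leaving all-zero vectors unchanged is a choice made for this sketch. With p = 1.0, the dense vector `[1.0, 0.0, 3.0, 1.0, 0.0, 2.0, 0.0]` from the predictions output is divided by 7, which matches the `0.14285714285714...` shown in its `normFeatures` column.

```python
def p_normalize(values, p=1.0):
    """Scale a vector so its p-norm is 1; all-zero vectors are returned as-is."""
    norm = sum(abs(v) ** p for v in values) ** (1.0 / p)
    if norm == 0.0:
        return list(values)
    return [v / norm for v in values]


features = [1.0, 0.0, 3.0, 1.0, 0.0, 2.0, 0.0]
l1 = p_normalize(features, p=1.0)  # absolute values sum to 1
l2 = p_normalize(features, p=2.0)  # squared values sum to 1
print(l1[0])  # 0.14285714285714285
```

The parameter grid later in this notebook tries both `p = 1.0` and `p = 2.0`, which correspond to `l1` and `l2` here.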

## Logistic regression is a popular method to predict a binary response
### It is a special case of Generalized Linear models that predicts the probability of an outcome.

In [13]:
lr = LogisticRegression(featuresCol="normFeatures", labelCol="Survived", predictionCol="prediction", maxIter=10, regParam=0.01, elasticNetParam=0.8)
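The `regParam` and `elasticNetParam` arguments control an elastic-net penalty. As a sketch following the elastic-net form given in the Spark ML documentation (details such as Spark's internal feature standardization are omitted), with $\lambda$ = `regParam` and $\alpha$ = `elasticNetParam`, the model minimizes

$$
\min_{w,\,b}\; \frac{1}{n}\sum_{i=1}^{n}\log\left(1+e^{-y_i\,(w^\top x_i+b)}\right) \;+\; \lambda\left(\alpha\,\lVert w\rVert_1+\frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right),\qquad y_i\in\{-1,+1\}
$$

Setting $\alpha = 0$ gives a pure L2 (ridge) penalty and $\alpha = 1$ a pure L1 (lasso) penalty. With the values in the cell above, $\lambda = 0.01$ and $\alpha = 0.8$, the penalty is $0.01\,(0.8\,\lVert w\rVert_1 + 0.1\,\lVert w\rVert_2^2)$.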

## A Pipeline is a sequence of stages where each stage is either a Transformer or an Estimator
### These stages are run in order and the input DataFrame is transformed as it passes through each stage. 

### In machine learning, it is common to run a sequence of algorithms to process and learn from data.

In [14]:
pipeline = Pipeline(stages=[SexIndexer, EmbarkedIndexer, AgeBucket,FareBucket, assembler, normalizer, lr])
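To make the Transformer/Estimator distinction concrete, here is a toy pipeline over lists of dicts. Everything in it (`ToyPipeline`, `SquareFare`, `CenterFareSq`, and the rest) is hypothetical and only mimics the idea behind Spark's `Pipeline.fit`: each Estimator is fit on the output of the stages before it and replaced by its fitted model, and the data then flows through every stage in order. As a simplification, this sketch also transforms the data after the last Estimator, which Spark avoids.

```python
class Transformer:
    """A stage that maps a dataset (here: a list of dicts) to a new dataset."""
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    """A stage whose fit() learns from data and returns a Transformer."""
    def fit(self, rows):
        raise NotImplementedError


class ToyPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)   # an Estimator becomes a fitted model
            fitted.append(stage)
            rows = stage.transform(rows)  # this output feeds the next stage
        return ToyPipelineModel(fitted)


class ToyPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class SquareFare(Transformer):
    def transform(self, rows):
        return [dict(r, FareSq=r["Fare"] ** 2) for r in rows]


class CenterFareSq(Estimator):
    def fit(self, rows):
        mean = sum(r["FareSq"] for r in rows) / len(rows)
        return CenterFareSqModel(mean)


class CenterFareSqModel(Transformer):
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [dict(r, FareSqCentered=r["FareSq"] - self.mean) for r in rows]


model = ToyPipeline([SquareFare(), CenterFareSq()]).fit([{"Fare": 1.0}, {"Fare": 3.0}])
print(model.transform([{"Fare": 2.0}]))
# [{'Fare': 2.0, 'FareSq': 4.0, 'FareSqCentered': -1.0}]
```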

## Randomly split the source data into training and test data sets
### Approximately 90% training, 10% test (randomSplit normalizes the weights, and the resulting split sizes vary randomly)
### Cache the resulting DataFrames
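The split sizes reported in the next cell (628 and 84 rather than exactly 90% and 10% of 712) follow from the way a random split assigns rows. A minimal sketch of the idea, assuming one uniform draw per row compared against cumulative normalized weights. `random_split` is a hypothetical helper; Spark samples per partition with its own random number generator, so this sketch will not reproduce the exact rows or counts.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split using a single uniform draw per row.

    Weights are normalized to sum to 1 and turned into cumulative bounds,
    so [90.0, 10.0] behaves like [0.9, 0.1]. Split sizes are random.
    """
    total = float(sum(weights))
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(list(range(712)), [90.0, 10.0], seed=1)
print(len(train_rows), len(test_rows))
```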

In [15]:
train, test = LabeledTitanicData.randomSplit([90.0,10.0], seed=1)
train.cache()
test.cache()
print('The number of records in the training data set is {}.'.format(train.count()))
print('The number of rows labeled Not Survived in the training data set is {}.'.format(train.filter(train['Survived'] == 0).count()))
print('The number of rows labeled Survived in the training data set is {}.'.format(train.filter(train['Survived'] == 1).count()))
train.sample(False, 0.01, seed=0).show(5)
print('')

print('The number of records in the test data set is {}.'.format(test.count()))
print('The number of rows labeled Not Survived in the test data set is {}.'.format(test.filter(test['Survived'] == 0).count()))
print('The number of rows labeled Survived in the test data set is {}.'.format(test.filter(test['Survived'] == 1).count()))
test.sample(False, 0.1, seed=0).show(5)

The number of records in the training data set is 628.
The number of rows labeled Not Survived in the training data set is 365.
The number of rows labeled Survived in the training data set is 263.
+------+------+----+-----+-----+-------+--------+--------+
|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Survived|
+------+------+----+-----+-----+-------+--------+--------+
|     1|  male|36.0|    0|    0|26.2875|       S|     1.0|
|     3|female|22.0|    0|    0|   7.75|       Q|     1.0|
|     3|  male|16.0|    4|    1|39.6875|       S|     0.0|
|     3|  male|27.0|    0|    0|  6.975|       S|     1.0|
|     3|  male|31.0|    0|    0|  7.775|       S|     0.0|
+------+------+----+-----+-----+-------+--------+--------+


The number of records in the test data set is 84.
The number of rows labeled Not Survived in the test data set is 59.
The number of rows labeled Survived in the test data set is 25.

## Fit the pipeline to the training data

In [16]:
model = pipeline.fit(train)

## Make predictions on the test data set
### Keep in mind that the model has not seen the data in the test data set

In [17]:
predictions = model.transform(test)

## Show results

In [18]:
predictions.sample(False, 0.1, seed=0).show(5)

+------+------+----+-----+-----+------+--------+--------+--------+-------------+---------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|Pclass|   Sex| Age|SibSp|Parch|  Fare|Embarked|Survived|SexIndex|EmbarkedIndex|AgeBucket|FareBucket|            features|        normFeatures|       rawPrediction|         probability|prediction|
+------+------+----+-----+-----+------+--------+--------+--------+-------------+---------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|     2|female|29.0|    0|    0|  10.5|       S|     1.0|     1.0|          0.0|      3.0|       1.0|[1.0,0.0,3.0,1.0,...|[0.14285714285714...|[-1.4372675609811...|[0.19196883906346...|       1.0|
|     2|  male|51.0|    0|    0|12.525|       S|     0.0|     0.0|          0.0|      4.0|       1.0|(7,[2,3,5],[4.0,1...|(7,[2,3,5],[0.571...|[1.67924947176139...|[0.84280512322624...|       0.0|
|     3|female|

In [19]:
print('The number of predictions labeled Not Survived is {}.'.format(predictions.filter(predictions['prediction'] == 0).count()))
print('The number of predictions labeled Survived is {}.'.format(predictions.filter(predictions['prediction'] == 1).count()))

The number of predictions labeled Not Survived is 55.
The number of predictions labeled Survived is 29.


In [20]:
(predictions.filter("Survived = 0.0")
     .select("Sex", "Age", "Fare", "Embarked", "Pclass", "Parch", "SibSp", "Survived", "prediction")
     .sample(False, 0.1, seed=0).show(5))

(predictions.filter("Survived = 1.0")
     .select("Sex", "Age", "Fare", "Embarked", "Pclass", "Parch", "SibSp", "Survived", "prediction")
     .sample(False, 0.5, seed=0).show(5))

+------+----+------+--------+------+-----+-----+--------+----------+
|   Sex| Age|  Fare|Embarked|Pclass|Parch|SibSp|Survived|prediction|
+------+----+------+--------+------+-----+-----+--------+----------+
|female| 8.0|21.075|       S|     3|    1|    3|     0.0|       1.0|
|  male|20.5|  7.25|       S|     3|    0|    0|     0.0|       0.0|
|  male|26.0|7.8875|       S|     3|    0|    0|     0.0|       0.0|
|  male|30.0|  8.05|       S|     3|    0|    0|     0.0|       0.0|
|  male|33.0|   9.5|       S|     3|    0|    0|     0.0|       0.0|
+------+----+------+--------+------+-----+-----+--------+----------+
only showing top 5 rows

+------+----+--------+--------+------+-----+-----+--------+----------+
|   Sex| Age|    Fare|Embarked|Pclass|Parch|SibSp|Survived|prediction|
+------+----+--------+--------+------+-----+-----+--------+----------+
|female|23.0| 113.275|       C|     1|    0|    1|     1.0|       1.0|
|  male|35.0|512.3292|       C|     1|    0|    0|     1.0|       1.0|

## Create an evaluator for the binary classification using area under the ROC Curve as the evaluation metric

### Receiver operating characteristic (ROC) is a graphical plot that illustrates the performance of a binary classifier system as its discrimination threshold is varied

The curve is created by plotting the true positive rate against the false positive rate at various threshold settings. The ROC curve is thus the sensitivity as a function of fall-out. The area under the ROC curve is useful for comparing and selecting the best machine learning model for a given data set. A model with an area under the ROC curve score near 1 has very good performance. A model with a score near 0.5 is about as good as flipping a coin.
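An equivalent way to read the area under the ROC curve: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one (ties count as one half). This rank form gives the same value as the trapezoidal area under the ROC curve traced over all thresholds. The sketch below, with a hypothetical `area_under_roc` helper, compares every positive/negative pair, which is fine for small data. Note that `BinaryClassificationEvaluator` scores the `rawPrediction` column by default; since the logistic function is monotonic, ranking by `probability` would give the same AUC.

```python
def area_under_roc(scores, labels):
    """AUC as P(random positive outscores random negative), ties count 0.5."""
    positives = [s for s, y in zip(scores, labels) if y == 1.0]
    negatives = [s for s, y in zip(scores, labels) if y == 0.0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


print(area_under_roc([0.1, 0.4, 0.35, 0.8], [0.0, 0.0, 1.0, 1.0]))  # 0.75
print(area_under_roc([0.9, 0.8, 0.2, 0.1], [1.0, 1.0, 0.0, 0.0]))   # 1.0
```

A classifier that assigns the same score to everyone gets exactly 0.5, the coin-flip baseline described above.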

In [21]:
evaluator = BinaryClassificationEvaluator().setLabelCol("Survived").setMetricName("areaUnderROC")
print('Area under the ROC curve = {}.'.format(evaluator.evaluate(predictions)))

Area under the ROC curve = 0.8389830508474577.


## Tune Hyperparameters
### Generate hyperparameter combinations by taking the cross product of some parameter values

Spark ML algorithms provide many hyperparameters for tuning models. These hyperparameters are distinct from the model parameters being optimized by Spark ML itself. Hyperparameter tuning is accomplished by choosing the best set of parameters based on model performance on held-out data that the model was not trained on. All combinations of hyperparameters specified will be tried in order to find the one that leads to the model with the best evaluation result.

## Build a Parameter Grid specifying what parameters and values will be evaluated in order to determine the best combination

In [22]:
paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.1, 0.3])
                 .addGrid(lr.elasticNetParam, [0.0, 1.0])
                 .addGrid(normalizer.p, [1.0, 2.0])
                 .build())
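The grid above is the cross product of 3 × 2 × 2 values, i.e. 12 parameter combinations. A plain-Python sketch of that cross product with `itertools.product`; the string keys are hypothetical stand-ins for the `lr.regParam`, `lr.elasticNetParam` and `normalizer.p` Param objects that `ParamGridBuilder` actually uses.

```python
import itertools

grid = {
    "lr.regParam": [0.0, 0.1, 0.3],
    "lr.elasticNetParam": [0.0, 1.0],
    "normalizer.p": [1.0, 2.0],
}
names = list(grid)
param_maps = [dict(zip(names, combo)) for combo in itertools.product(*grid.values())]
print(len(param_maps))  # 12
print(param_maps[0])    # {'lr.regParam': 0.0, 'lr.elasticNetParam': 0.0, 'normalizer.p': 1.0}
```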

## Create a cross validator to tune the pipeline with the generated parameter grid
Spark ML supports cross-validation for hyperparameter tuning. Cross-validation fits the underlying estimator with each user-specified combination of parameters, cross-evaluates the fitted models, and outputs the best one.

In [23]:
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(10)
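With 12 parameter maps and 10 folds, the cross validator trains 12 × 10 = 120 models, and then (to our understanding) refits the best parameter map once more on the full input data. The sketch below shows a generic k-fold split in plain Python; `k_fold_indices` is a hypothetical helper. It deals shuffled rows round-robin into equal-sized folds, whereas Spark assigns rows to folds by random sampling, so Spark's fold sizes vary somewhat.

```python
import random


def k_fold_indices(n_rows, num_folds, seed=0):
    """Yield (training, validation) index lists for each of num_folds folds.

    Rows are shuffled once and dealt round-robin into folds, so every row
    is used for validation exactly once.
    """
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    folds = [indices[k::num_folds] for k in range(num_folds)]
    for k in range(num_folds):
        validation = sorted(folds[k])
        training = sorted(i for j, fold in enumerate(folds) if j != k for i in fold)
        yield training, validation


num_param_maps = 3 * 2 * 2  # regParam x elasticNetParam x normalizer.p
num_folds = 10
splits = list(k_fold_indices(712, num_folds))
training, validation = splits[0]
print(len(training), len(validation))  # 640 72
print("fits during cross-validation:", num_param_maps * num_folds)  # 120
```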

## Cross-evaluate the ML Pipeline to find the best model
### using the area under the ROC evaluator and hyperparameters specified in the parameter grid

In [24]:
cvModel = cv.fit(LabeledTitanicData)
print('Area under the ROC curve for best fitted model = {}.'.format(evaluator.evaluate(cvModel.transform(LabeledTitanicData))))

Area under the ROC curve for best fitted model = 0.8548955057651987.


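## Let's see what improvement we achieve by tuning the hyperparameters using cross-validation
### Note that the two scores are computed on different data: the non-tuned score comes from the held-out test set, while the tuned score is computed on LabeledTitanicData, the full data set (test rows included) that `cv.fit` was trained on. The improvement shown is therefore likely optimistic.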

In [25]:
# Evaluate each model once and reuse the results
aucNonTuned = evaluator.evaluate(predictions)
aucTuned = evaluator.evaluate(cvModel.transform(LabeledTitanicData))
print('Area under the ROC curve for non-tuned model = {}.'.format(aucNonTuned))
print('Area under the ROC curve for best fitted model = {}.'.format(aucTuned))
print('Improvement = {0:0.2f}%'.format((aucTuned - aucNonTuned) * 100 / aucNonTuned))

Area under the ROC curve for non-tuned model = 0.8389830508474577.
Area under the ROC curve for best fitted model = 0.8548955057651987.
Improvement = 1.90%


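## Make predictions using the cross-validated model
### Using the test data set and the DataFrame API. Because `cv.fit` was run on all of LabeledTitanicData, these test rows were also part of the cross-validated model's training data, so they are not a true hold-out set here.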

In [26]:
cvModel.transform(test).select("Survived", "prediction").sample(False, 0.1, seed=0).show(10)

+--------+----------+
|Survived|prediction|
+--------+----------+
|     1.0|       1.0|
|     0.0|       0.0|
|     1.0|       1.0|
|     0.0|       1.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
+--------+----------+



### Like above, but now using SQL

In [27]:
# register the predictions as a temporary view so they can be queried with SQL
cvModel.transform(test).createOrReplaceTempView("cvModelPredictions")
spark.sql("select Survived, prediction from cvModelPredictions").sample(False, 0.1, seed=0).show(10)

+--------+----------+
|Survived|prediction|
+--------+----------+
|     1.0|       1.0|
|     0.0|       0.0|
|     1.0|       1.0|
|     0.0|       1.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
+--------+----------+



## Make a prediction on an imaginary passenger

## Define the imaginary passenger's features

In [28]:
SexValue = 'female'
AgeValue = 40.0
FareValue = 15.0
EmbarkedValue = 'C'
PclassValue = 2
SibSpValue = 1
ParchValue = 1

PredictionFeatures = (spark.createDataFrame([(SexValue, AgeValue, FareValue, EmbarkedValue, PclassValue, SibSpValue, ParchValue)],
    ['Sex', 'Age', 'Fare', 'Embarked', 'Pclass', 'SibSp', 'Parch']))
PredictionFeatures.show()

+------+----+----+--------+------+-----+-----+
|   Sex| Age|Fare|Embarked|Pclass|SibSp|Parch|
+------+----+----+--------+------+-----+-----+
|female|40.0|15.0|       C|     2|    1|    1|
+------+----+----+--------+------+-----+-----+



## Predict whether the imaginary person would have survived
### using the best fit model

In [29]:
SurvivedOrNotPrediction = cvModel.transform(PredictionFeatures)
SurvivedOrNotPrediction.select('rawPrediction', 'probability', 'prediction').show(1, False)

+------------------------------------------+---------------------------------------+----------+
|rawPrediction                             |probability                            |prediction|
+------------------------------------------+---------------------------------------+----------+
|[0.19683342469048395,-0.19683342469048395]|[0.5490500943973472,0.4509499056026528]|0.0       |
+------------------------------------------+---------------------------------------+----------+
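The `probability` column can be recovered from `rawPrediction`. A minimal sketch, assuming (as we understand Spark's binary logistic regression) that the logistic function is applied to each `rawPrediction` entry and that class 1.0 is predicted when its probability exceeds the default threshold of 0.5. `raw_to_probability` and `predict` are hypothetical helpers; the input values are copied from the output above.

```python
import math


def raw_to_probability(raw_prediction):
    """Apply the logistic (sigmoid) function to each rawPrediction entry."""
    return [1.0 / (1.0 + math.exp(-r)) for r in raw_prediction]


def predict(probability, threshold=0.5):
    """Predict class 1.0 when the class-1 probability exceeds the threshold."""
    return 1.0 if probability[1] > threshold else 0.0


# rawPrediction for the imaginary passenger, copied from the output above.
raw = [0.19683342469048395, -0.19683342469048395]
probs = raw_to_probability(raw)
prediction = predict(probs)
print(probs)       # roughly [0.549, 0.451], matching the probability column
print(prediction)  # 0.0, i.e. "Did NOT Survive"
```

The same relationship holds for the first row of the earlier predictions output, where a raw score of about -1.437 corresponds to a probability of about 0.192.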



## Display Prediction Result

In [30]:
SurvivedOrNot = SurvivedOrNotPrediction.select("prediction").first()[0]
if SurvivedOrNot == 0.0:
    print("Did NOT Survive")
elif SurvivedOrNot == 1.0:
    print("Did Survive!!!")
else:
    print("Invalid Prediction")

Did NOT Survive


## Let's take a quick look at applying the feature engineering performed above to a Random Forest Model
### Random forests are ensembles of decision trees. They combine many decision trees in order to reduce the risk of overfitting.
### We won't do any hyperparameter tuning in this example; we just show how to create and evaluate the model using all default hyperparameters

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer().setInputCol("Survived").setOutputCol("indexedLabel").fit(LabeledTitanicData)

# Train a RandomForest model
rf = RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("features").setNumTrees(20)

# Convert indexed labels back to original labels.
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

# Create new Pipeline using the RandomForest model and all the same feature transformers used above for logistic regression
pipelineRF = Pipeline().setStages([labelIndexer, SexIndexer, EmbarkedIndexer, AgeBucket, FareBucket, assembler, normalizer, rf, labelConverter])

# Train model.
modelRF = pipelineRF.fit(train)

# Make predictions.
predictionsRF = modelRF.transform(test)

# Select example rows to display.
predictionsRF.select("predictedLabel", "Survived", "features").show(10)

# Select (prediction, true label) and compute test error.
# "prediction" is in the label indexer's index space, so compare it with "indexedLabel".
evaluatorRF = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
accuracyRF = evaluatorRF.evaluate(predictionsRF)
print("Test Error = %g" % (1.0 - accuracyRF))

rfModel = modelRF.stages[7]
print(rfModel)  # summary only
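How do 20 trees produce one prediction? To our understanding, Spark's random forest classifier combines its trees by soft voting: each tree contributes the normalized class distribution of the leaf the example falls into, the contributions are summed, and the most probable class wins. The sketch below illustrates that idea with a hypothetical `combine_trees` helper and made-up leaf class counts; it is not Spark's implementation.

```python
def combine_trees(tree_class_counts):
    """Soft voting: sum each tree's normalized class distribution.

    Returns (probabilities, prediction), where prediction is the index of
    the most probable class as a float, like Spark's prediction column.
    """
    num_classes = len(tree_class_counts[0])
    votes = [0.0] * num_classes
    for counts in tree_class_counts:
        total = float(sum(counts))
        if total == 0.0:
            continue  # an empty leaf contributes nothing
        for c in range(num_classes):
            votes[c] += counts[c] / total
    vote_sum = sum(votes)
    probabilities = [v / vote_sum for v in votes]
    prediction = max(range(num_classes), key=lambda c: probabilities[c])
    return probabilities, float(prediction)


# Hypothetical leaf class counts [class 0, class 1] from three trees.
trees = [[8, 2], [3, 7], [6, 4]]
probabilities, prediction = combine_trees(trees)
print(probabilities, prediction)
```

Averaging distributions rather than counting hard votes lets a confident tree outweigh an uncertain one, and it yields a probability column as a by-product.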

![IBM Logo](http://www-03.ibm.com/press/img/Large_IBM_Logo_TN.jpg)

Rich Tarro  
Big Data Architect, IBM Corporation  
email: rtarro@us.ibm.com

February 17, 2017