# Binary Classification with Spark ML

### In this notebook, we will explore Binary Classification using Spark ML. We will exploit Spark ML's high-level APIs built on top of DataFrames to create and tune machine learning pipelines. Spark ML Pipelines enable combining multiple algorithms into a single pipeline or workflow. We will heavily utilize Spark ML's feature transformers to convert, modify and scale the features that will be used to develop the machine learning model. Finally, we will evaluate and cross validate our model to demonstrate the process of determining a best fit model.

### In statistics, logistic regression, or logit regression, or logit model[1] is a regression model where the dependent variable (DV) is categorical. This notebook covers the case of a binary dependent variable, that is, where it can take only two values, "0" and "1", which represent outcomes such as pass/fail, win/lose, alive/dead or healthy/sick. Cases where the dependent variable has more than two outcome categories may be analysed in multinomial logistic regression, or, if the multiple categories are ordered, in ordinal logistic regression.[2] In the terminology of economics, logistic regression is an example of a qualitative response/discrete choice model.

### The binary classification demo uses the well-known Titanic dataset, which has been used for Kaggle competitions. There is no need to download the data manually, as it is downloaded directly within the notebook. The Kaggle dataset (training data only) can be found here:
https://www.kaggle.com/c/titanic/data


### The Titanic data set was chosen for this binary classification demonstration because it contains both text based and numeric features that are both continuous and categorical. This will give us the opportunity to explore and utilize a number of feature transformers available in Spark ML.
     
          
               
               
    


![IBM Logo](https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSzlUYaJ9xykGC-N5PijcV_eDBGCXy_pMn7sy6ymrVypmJ22q5ZmA)

## Table of contents

1. [Install needed libraries](#libraries)<br/>
2. [Get the Data](#getdata)<br/>
3. [Prepare and clean the data](#prepare)<br/>
    3.1 [Remove unneeded columns](#remove)<br/>
    3.2 [Drop rows with invalid numeric values](#drop1)<br/>
    3.3 [Drop rows with invalid object values](#drop2)<br/>
4. [Split the data into train and test sets](#split)<br/>
5. [Examine the data](#examine)<br/>
    5.1 [Sibling/Spouse](#sibsp)<br/>
    5.2 [Parents/Children](#parch)<br/>
    5.3 [Age](#age)<br/>
    5.4 [Fare](#fare)<br/>
    5.5 [Gender](#gender)<br/>
    5.6 [Class](#pclass)<br/>
    5.7 [Embarkation](#embarkation)<br/>
6. [Transform the data](#transform)<br/>
    6.1 [Gender and Embarkation](#stringindexer)<br/>
    6.2 [Age and Fare](#bucketizer)<br/>
7. [Build the Model](#build)<br/>
8. [Test the Model](#test)<br/>
9. [Tune the Model](#tune)<br/>
10. [Predict imaginary passenger](#predict)<br/>
11. [Random Forest](#randomforest)<br/>
12. [Summary](#summary)<br/>

<a id="libraries"></a>
## 1. Install pixiedust and sklearn libraries (upgrade if needed)

<a href="https://www.ibm.com/analytics/us/en/watson-data-platform/pixiedust/">PixieDust</a> is an open source add-on created by IBM for Jupyter Notebooks to make working with data simple.<br>
<a href="http://scikit-learn.org/stable/">sklearn</a> is an open-source machine-learning library for Python.

In [None]:
!pip install --user --upgrade pixiedust
# The PyPI package is named scikit-learn; it is still imported as sklearn
!pip install --user --upgrade scikit-learn
!pip install --user --upgrade seaborn
!pip install --user --upgrade statsmodels


## Please wait until all modules are installed. The asterisk between the brackets above will become a number when the processing is complete. 
### Note, you will need to Restart the kernel for the changes to take effect by clicking on the Kernel Menu item above, and selecting Restart. 

## Verify Spark version and existence of Spark

In [None]:
print('The spark version is {}.'.format(spark.version))

## Import required Spark libraries

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from sklearn.model_selection import train_test_split
import pixiedust
import pandas as pd
import seaborn as sns
import numpy as np

sns.set(style='white', context='notebook', palette='deep')

<a id="getdata"></a>
## 2. Download the data and examine.

-  pclass - Passenger class 
-  survived - Whether passenger survived or not (1=survived)
-  name - Passenger name
-  sex - Passenger sex 
-  age - Passenger age
-  sibsp - Number of passenger siblings/spouses
-  parch - Number of passenger parents/children
-  ticket - ticket number
-  fare - fare price
-  cabin - cabin
-  embarked - embarkation location
-  boat - lifeboat (if used)
-  body - body tag

## Read data in as a pandas DataFrame
### Source data is in CSV format and includes a header. We will use pandas to infer the schema/data types.

In [None]:
url = "https://raw.githubusercontent.com/bleonardb3/AA/master/Lab-4/data/titanic.csv"
LoadTitanicData = pd.read_csv(url)

<a id="prepare"></a>
## 3. Prepare and shape the data

PixieDust is an open-source IBM library which can be used to easily and flexibly display data.

Use PixieDust to examine the schema (click on the Schema line).   Try differing displays of the data using PixieDust.

For example, try showing a histogram of fare or age or pclass.    Change the renderer and see what happens.

<br>
 <div class="panel-group" id="accordion-1">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-1" href="#collapse1-1">
        Hint 1</a>
      </h4>
    </div>
    <div id="collapse1-1" class="panel-collapse collapse">
      <div class="panel-body">Execute the display(LoadTitanicData) cell below. Then select the Chart icon and select Histogram</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-1" href="#collapse1-2">
        Hint 2</a>
      </h4>
    </div>
    <div id="collapse1-2" class="panel-collapse collapse">
      <div class="panel-body">Select the Options button.   Drag the age (or fare or class) field to the values column.   Change number of rows to display to more than the number of rows read in (1400 will do)</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-1" href="#collapse1-3">
        Hint 3</a>
      </h4>
    </div>
    <div id="collapse1-3" class="panel-collapse collapse">
      <div class="panel-body">Change the renderer (dropdown on upper right) to seaborn</div>
    </div>
  </div>
</div> 

In [None]:
display(LoadTitanicData)

<a id="remove"></a>
## 3.1 Remove unneeded columns

We are certain we can't make use of the "boat", "body" and "homedest" columns, so let's remove them.   

Confirm that those columns have been removed by examining the schema and data in PixieDust.

In [None]:
# Use the columns keyword; the positional axis argument was removed in pandas 2.0
TitanicData = LoadTitanicData.drop(columns=["boat", "body", "homedest"])
display(TitanicData)

## Get a count of the total number of passengers and percentage of passenger survivors.

Note: our count here is only for passengers.   There were a large number of crew as well.

In [None]:
print('The total number of passengers is {}.'.format(len(TitanicData)))
print('The percentage of passengers who survived is {}.'.format(TitanicData['survived'].mean() * 100.0))

## Describe the data

Use the <a href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.describe.html">describe()</a> method to examine the data.   

Why aren't all the data fields represented?   Can you change the describe command to show all values?<br/>
    
Why are the percentile values for some columns showing as NaN?<br/>

<br>
 <div class="panel-group" id="accordion-2">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-2" href="#collapse2-1">
        Optional - change describe() to show all values</a>
      </h4>
    </div>
    <div id="collapse2-1" class="panel-collapse collapse">
      <div class="panel-body">describe(include='all')</div>
    </div>
  </div>
</div> 

In [None]:
TitanicData.describe()

<a id="drop1"></a>
## 3.2 Drop rows whose columns contain missing or invalid values

Note how we have 1309 values for most columns, but age (1046) and fare (1308) are missing values.

For this notebook, we will drop all rows which do not have valid values using <a href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.dropna.html">dropna()</a>, but will limit the check to the age and fare columns.<br/>

<br>
 <div class="panel-group" id="accordion-3">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse3-1">
        Hint 1</a>
      </h4>
    </div>
    <div id="collapse3-1" class="panel-collapse collapse">
      <div class="panel-body">Use the how and subset parameters</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse3-2">
        Hint 2</a>
      </h4>
    </div>
    <div id="collapse3-2" class="panel-collapse collapse">
      <div class="panel-body">how="any", subset=["age", "fare"]</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse3-3">
        Solution</a>
      </h4>
    </div>
    <div id="collapse3-3" class="panel-collapse collapse">
      <div class="panel-body">TitanicData_clean = TitanicData.dropna(how="any", subset=["age", "fare"])</div>
    </div>
  </div>
</div> 
What are some alternatives we might have employed instead where we could have kept the rows instead of dropping them?<br/>

In [None]:
TitanicData_clean = TitanicData.dropna(<include code here>)
TitanicData_clean.describe()

<a id="drop2"></a>
## 3.3 Examine all columns for null or invalid values

The values presented by describe() are only the numeric values.   We need to check all values (especially those we plan to use later such as those for gender).

Some values might be null, so let's replace any nulls with NaN [Not a Number] using the <a href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.fillna.html">fillna()</a> function.

In [None]:
TitanicData_clean = TitanicData_clean.fillna(np.nan)

# Check for Null values
TitanicData_clean.isnull().sum()

## Drop the rows with invalid embarked values

Also, the cabin column probably has too little data across all rows to be worth using. <br/>

In [None]:
del TitanicData_clean['cabin']
CleanedData = TitanicData_clean.dropna(how="any", subset=['embarked'])
CleanedData.isnull().sum()

<a id="split"></a>
## 4. Split the data into training (80%) and testing (20%) sets using <a href="http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html">train_test_split()</a>

Set random_state to 1 in order to make certain this is repeatable.
Set shuffle to True in order to randomize the data first (did you notice that the data was ordered by pclass?)
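
To make the roles of `shuffle`, `test_size` and `random_state` concrete, here is a minimal pure-Python sketch of a shuffled split. It is not scikit-learn's implementation; `shuffle_split` and the sample rows are hypothetical names used only for illustration.

```python
import random

def shuffle_split(rows, test_size=0.2, seed=1):
    """Shuffle a copy of rows, then hold out a test_size fraction (illustrative only)."""
    shuffled = list(rows)
    # A fixed seed makes the shuffle, and therefore the split, repeatable
    random.Random(seed).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_size))
    return shuffled[n_test:], shuffled[:n_test]

rows = list(range(10))  # stand-in for passenger records that arrive ordered by pclass
train_rows, test_rows = shuffle_split(rows)
print(len(train_rows), len(test_rows))
```

Without the shuffle, the held-out rows would all come from one end of the ordered data, so one passenger class would dominate the test set.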

<br>
 <div class="panel-group" id="accordion-4">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-4" href="#collapse4-1">
        Hint 1</a>
      </h4>
    </div>
    <div id="collapse4-1" class="panel-collapse collapse">
      <div class="panel-body">test_size, random_state and shuffle</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-4" href="#collapse4-2">
        Hint 2</a>
      </h4>
    </div>
    <div id="collapse4-2" class="panel-collapse collapse">
      <div class="panel-body">test_size should be set to the desired percentage (0.2)<br/>random_state should be set to a fixed value (i.e. 1)<br/>shuffle should be set to True</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-4" href="#collapse4-3">
        Solution</a>
      </h4>
    </div>
    <div id="collapse4-3" class="panel-collapse collapse">
      <div class="panel-body">train, test = train_test_split(CleanedData, test_size = 0.2, random_state=1, shuffle=True)</div>
    </div>
  </div>
</div> 

In [None]:
train, test = train_test_split(CleanedData,<insert code here>  )
train.head(5)

<a id="examine"></a>
## 5. Examine the data

Examine the data to determine which values we may want to include in the model and whether we need to perform any additional data shaping.

## Examine correlations between the numeric values in the data set to survived using a <a href="https://seaborn.pydata.org/generated/seaborn.heatmap.html">heatmap</a>.

We can see the closest correlation between fare and survived (if you think about it, this makes sense). This doesn't mean that the other values are not useful, but to determine this we need to explore the remaining features in detail.

To do this (as we did here) we will only look at the training data.   Why wouldn't we use the test data as well?<br/>

<br/>
<div class="panel-group" id="accordion-5">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-5" href="#collapse5-1">
        Hint 1</a>
      </h4>
    </div>
    <div id="collapse5-1" class="panel-collapse collapse">
      <div class="panel-body">"survived","pclass"</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-5" href="#collapse5-2">
        Hint 2</a>
      </h4>
    </div>
    <div id="collapse5-2" class="panel-collapse collapse">
      <div class="panel-body">"survived","pclass","sibsp","parch"</div>
    </div>
  </div>
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-5" href="#collapse5-3">
        Solution</a>
      </h4>
    </div>
    <div id="collapse5-3" class="panel-collapse collapse">
      <div class="panel-body">g = sns.heatmap(train[["survived","pclass","sibsp","parch","age","fare"]].corr(),annot=True, fmt = ".2f", cmap = "coolwarm")</div>
    </div>
  </div>
</div> 

In [None]:
g = sns.heatmap(train[<insert code here>].corr(), annot=True, fmt=".2f", cmap="coolwarm")


## We are going to do an in-depth data examination below --  if you are behind, you can skip to [Transform the data](#transform).

<a id="sibsp"></a>
## 5.1 Sibling/Spouse vs Survived using <a href="https://seaborn.pydata.org/generated/seaborn.catplot.html">catplot()</a>

It seems that passengers with many siblings/spouses had a lower chance of survival. Perhaps they refused to separate and chose to face their fate together.<br>
Passengers travelling alone (sibsp 0) or with one or two others (sibsp 1 or 2) had a better chance of survival.

In [None]:
# Explore SibSp feature vs Survived
# catplot() is the current name for factorplot(), and height replaces size (seaborn 0.9+)
g = sns.catplot(x="sibsp", y="survived", data=train, kind="bar", height=6,
                palette="muted")
g.despine(left=True)
g = g.set_ylabels("survival probability")

<a id="parch"></a>
## 5.2 Parents/Children vs Survived

Passengers in small families (parch 1 or 2) had a better chance of survival than those travelling alone (parch 0) or in medium (parch 3, 4) or large families (parch 5, 6); this appears to complement the findings for sibsp.<br>
Of interest is the large standard deviation in the survival of passengers with three (3) parents/children.

In [None]:
# Explore Parch feature vs Survived
# catplot() is the current name for factorplot(), and height replaces size (seaborn 0.9+)
g = sns.catplot(x="parch", y="survived", data=train, kind="bar", height=6,
                palette="muted")
g.despine(left=True)
g = g.set_ylabels("survival probability")

<a id="age"></a>
## 5.3 Age vs Survived using <a href="https://seaborn.pydata.org/generated/seaborn.FacetGrid.html">FacetGrid()</a>

Age distributions are not the same in the survived and not-survived subpopulations. Indeed, there is a peak corresponding to young passengers who survived. We also see that passengers between 60 and 80 had a lower chance of survival.
So, even if "age" is not strongly correlated with "survived", we can see that there are age categories of passengers with a higher or lower chance of survival. Young adults had a good chance of survival, perhaps because they could survive in the water longer (this did happen).

It seems that very young passengers had a better chance of survival [likely they got priority in the lifeboats].

In [None]:
# Explore Age vs Survived
g = sns.FacetGrid(train, col='survived')
g = g.map(sns.distplot, "age")

## When we superimpose the two densities, we clearly see a peak (between 0 and 5) corresponding to babies and very young children.

In [None]:
# Explore Age distribution
# fill=True replaces the deprecated shade=True (seaborn 0.11+)
g = sns.kdeplot(train["age"][(train["survived"] == 0) & (train["age"].notnull())], color="Red", fill=True)
g = sns.kdeplot(train["age"][(train["survived"] == 1) & (train["age"].notnull())], ax=g, color="Blue", fill=True)
g.set_xlabel("Age")
g.set_ylabel("Frequency")
g = g.legend(["Not Survived","Survived"])

<a id="fare"></a>
## 5.4 Fare distribution is very <a href="https://en.wikipedia.org/wiki/Skewness">skewed</a>. This can lead to extreme values being overweighted in the model, even if it is scaled.

In this case, it is better to transform it with a log function to reduce this skew.
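
As a rough illustration of why a log transform helps, the sketch below computes skewness before and after taking logs. The fares are made-up values, not rows from the dataset, and `skewness` is a hypothetical helper that uses the simple population formula, so its numbers will differ slightly from the bias-adjusted `skew()` in pandas.

```python
import math

def skewness(values):
    """Population skewness: third central moment divided by variance ** 1.5."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values) / n
    m3 = sum((v - mean) ** 3 for v in values) / n
    return m3 / m2 ** 1.5

# Made-up fares with a long right tail (an assumption for illustration)
fares = [7.25, 7.9, 8.05, 13.0, 26.0, 31.0, 71.3, 263.0, 512.3]

# Same rule as the notebook: log of positive fares, 0 otherwise
log_fares = [math.log(f) if f > 0 else 0.0 for f in fares]

print("skewness before log:", round(skewness(fares), 2))
print("skewness after log: ", round(skewness(log_fares), 2))
```

The log compresses the few very large fares far more than the many small ones, which pulls in the right tail.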

In [None]:
# Explore Fare distribution 
g = sns.distplot(train["fare"], color="m", label="Skewness : %.2f"%(train["fare"].skew()))
g = g.legend(loc="best")

## Map a log() function to all values of fare -- both train *and* test!

If we are going to modify the training data, we need to make certain the test data also reflects the same change.

In [None]:
# Apply log to Fare to reduce the skewness of its distribution
train["fare"] = train["fare"].map(lambda i: np.log(i) if i > 0 else 0)
test["fare"] = test["fare"].map(lambda i: np.log(i) if i > 0 else 0)

In [None]:
g = sns.distplot(train["fare"], color="b", label="Skewness : %.2f"%(train["fare"].skew()))
g = g.legend(loc="best")

<a id="gender"></a>
## 5.5 Survival rates by Gender

It is clear that males had a lower chance of survival than females, so gender likely plays an important role in the survival prediction.

Recall the phrase used during the evacuation: "Women and children first".

In [None]:
g = sns.barplot(x="sex",y="survived",data=train)
g = g.set_ylabel("Survival Probability")

In [None]:
train[["sex","survived"]].groupby('sex').mean()

<a id="pclass"></a>
## 5.6 Survival by Passenger Class

The passenger survival rate is not the same across all three (3) classes. First class passengers have more chance to survive than second class and third class passengers.
This trend extends to when we look at both male and female passengers by class.

In [None]:
# Explore Pclass vs Survived
g = sns.catplot(x="pclass", y="survived", data=train, kind="bar", height=6,
                palette="muted")
g.despine(left=True)
g = g.set_ylabels("survival probability")

In [None]:
# Explore Pclass vs Survived by Sex
g = sns.catplot(x="pclass", y="survived", hue="sex", data=train,
                height=6, kind="bar", palette="muted")
g.despine(left=True)
g = g.set_ylabels("survival probability")

<a id="embarkation"></a>
## 5.7 Survival by Embarkation Location

Passengers embarking at Cherbourg have a better chance of survival than Queenstown (Q) or Southampton (S).   Any thoughts as to why?

In [None]:
# Explore Embarked vs Survived 
g = sns.catplot(x="embarked", y="survived", data=train,
                height=6, kind="bar", palette="muted")
g.despine(left=True)
g = g.set_ylabels("survival probability")

## Examine Embarked by Class

It looks like first class was the most common class among passengers who embarked in Cherbourg (C).

In [None]:
# Explore Pclass vs Embarked 
g = sns.catplot(x="pclass", col="embarked", data=train,
                height=6, kind="count", palette="muted")
g.despine(left=True)
g = g.set_ylabels("Count")

<a id="transform"></a>
## 6. Transform the data

Certain data fields need to be transformed before building the model. This can be for several reasons, ranging from converting string values to numeric values to shaping data into different formats.

<a id="stringindexer"></a>
## 6.1 Use <a href="https://spark.apache.org/docs/latest/ml-features.html#stringindexer">StringIndexer</a> to transform gender and embarked values

StringIndexer is a transformer that encodes a string column to a column of indices. The indices are ordered by value frequencies, so the most frequent value gets index 0. If the input column is numeric, it is cast to string first. 

For the Titanic data set, we will index the Sex/Gender column as well as the Embarked column, which specifies at which  port the passenger boarded the ship.
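
The frequency-ordered indexing described above can be sketched in plain Python. This is only an illustration of the idea, not Spark's implementation: `fit_string_index` is a hypothetical helper, the sample values are made up, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (an assumption of this sketch)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

embarked = ["S", "C", "S", "Q", "S", "C"]  # made-up sample of embarkation ports
mapping = fit_string_index(embarked)

print(mapping)
print([mapping[v] for v in embarked])
```

Because the mapping is learned from the data it is fitted on, the same fitted mapping must be reused on the test set, which is what fitting the indexer inside a Pipeline takes care of.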

In [None]:
SexIndexer = StringIndexer(inputCol="sex", outputCol="SexIndex")
EmbarkedIndexer = StringIndexer(inputCol="embarked", outputCol="EmbarkedIndex")

<a id="bucketizer"></a>
## 6.2 <a href="https://spark.apache.org/docs/latest/ml-features.html#bucketizer">Bucketizer</a> is a transformer that transforms a column of continuous features to a column of feature buckets, where the buckets are specified by a splits parameter. 

For the Titanic data set, we will bucketize the Age and Fare features.

Important!   Note that for Fare the splits now correspond to the log values since we made that change.
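
To show how a list of splits turns a continuous value into a bucket index, here is a small pure-Python sketch using the age splits from the cell below. It mimics the documented behaviour (each bucket includes its lower bound and excludes its upper bound, except the last bucket, which includes both) but is not Spark's code; `bucketize` is a hypothetical helper and NaN handling is left out.

```python
from bisect import bisect_right

def bucketize(value, splits):
    """Return the index of the bucket [splits[i], splits[i+1]) that contains value."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError("value {} is outside the splits".format(value))
    if value == splits[-1]:
        # The last bucket also includes its upper bound
        return float(len(splits) - 2)
    return float(bisect_right(splits, value) - 1)

age_splits = [0.0, 6.0, 12.0, 18.0, 40.0, 65.0, 80.0, float("inf")]

for age in [5.0, 6.0, 40.0, 85.0]:
    print(age, "->", bucketize(age, age_splits))
```

With n + 1 split points there are n buckets, so the eight age splits produce bucket indices 0 through 6.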

<br/>
<div class="panel-group" id="accordion-6">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-6" href="#collapse6-1">
        Advanced Optional</a>
      </h4>
    </div>
    <div id="collapse6-1" class="panel-collapse collapse">
      <div class="panel-body">After completing the lab, note the prediction percentage then come back and change the values for either Bucketizer and re-run the kernel [Kernel->Restart and Run All].   Note the change in prediction accuracy.</div>
    </div>
  </div>
</div> 

In [None]:
AgeBucketSplits = [0.0, 6.0, 12.0, 18.0, 40.0, 65.0, 80.0, float("inf")]
AgeBucket = Bucketizer(splits=AgeBucketSplits, inputCol="age", outputCol="AgeBucket")

FareBucketSplits = [-float("inf"), 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, float("inf")]
FareBucket = Bucketizer(splits=FareBucketSplits, inputCol="fare", outputCol="FareBucket")

<a id="build"></a>
## 7. Building the Model

## <a href="https://spark.apache.org/docs/latest/ml-features.html#vectorassembler">VectorAssembler</a> is a transformer that combines a given list of columns in the order specified into a single vector column in order to train a model.

<br/>
<div class="panel-group" id="accordion-7">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-7" href="#collapse7-1">
        Advanced Optional</a>
      </h4>
    </div>
    <div id="collapse7-1" class="panel-collapse collapse">
      <div class="panel-body">After completing the lab, note the prediction percentage then come back and remove some of the values in the assembler (i.e. remove sibsp, pclass and parch or remove SexIndex) and re-run the kernel [Kernel->Restart and Run All].   Note the change in prediction accuracy.</div>
    </div>
  </div>
</div> 

In [None]:
assembler = VectorAssembler(inputCols= ["SexIndex", "EmbarkedIndex", "AgeBucket", "FareBucket", "sibsp", "pclass", "parch"], outputCol="features")

## <a href="https://spark.apache.org/docs/latest/ml-features.html#normalizer">Normalizer</a> is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm
### This normalization can help standardize your input data and improve the behavior of learning algorithms.
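
As a minimal sketch of what unit-norm scaling means, the function below divides one feature vector by its p-norm. The vector is a made-up example of an assembled row and `normalize` is a hypothetical helper, not Spark's implementation.

```python
def normalize(vector, p=1.0):
    """Scale a single feature vector so that its p-norm equals 1."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0:
        return list(vector)  # leave an all-zero vector unchanged
    return [x / norm for x in vector]

# Made-up assembled row: SexIndex, EmbarkedIndex, AgeBucket, FareBucket, sibsp, pclass, parch
features = [1.0, 0.0, 4.0, 3.0, 1.0, 3.0, 0.0]

l1 = normalize(features, p=1.0)  # absolute values sum to 1
l2 = normalize(features, p=2.0)  # squares sum to 1

print(l1)
print(l2)
```

Note that the scaling is applied per row (per passenger), not per column, so it changes the relative size of features within a row rather than standardizing each feature across passengers.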

In [None]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)

## <a href="https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression">Logistic regression</a> is a popular method to predict a binary response (Survived/Did Not Survive)
### It is a special case of Generalized Linear models that predicts the probability of an outcome.
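
The following pure-Python sketch shows how a fitted logistic regression turns features into a probability and then a label. The weights, intercept and feature values are hypothetical, chosen only for illustration, and the 0.5 cut-off is the usual default threshold.

```python
import math

def sigmoid(z):
    """Logistic function: maps any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(features, weights, intercept):
    """Probability of the positive class (survived = 1) for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

weights = [-2.0, 0.5]    # hypothetical coefficients
intercept = 1.0          # hypothetical intercept
passenger = [1.0, 0.0]   # hypothetical normalized features

p = predict_proba(passenger, weights, intercept)
label = 1.0 if p >= 0.5 else 0.0
print(round(p, 3), label)
```

Training the model means finding the weights and intercept; `regParam` and `elasticNetParam` control how strongly large weights are penalized during that search.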

<br/>
<div class="panel-group" id="accordion-7">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-7" href="#collapse7-2">
        Advanced Optional</a>
      </h4>
    </div>
    <div id="collapse7-2" class="panel-collapse collapse">
      <div class="panel-body">After completing the lab, note the prediction percentage then come back and change maxIter (say to 50 or 100) and re-run the kernel [Kernel->Restart and Run All].   Note the change in prediction accuracy.</div>
    </div>
  </div>
</div> 

In [None]:
lr = LogisticRegression(featuresCol="normFeatures", labelCol="survived", predictionCol="prediction", maxIter=10, regParam=0.3, elasticNetParam=0.8)

## A <a href="https://spark.apache.org/docs/latest/ml-pipeline.html">Pipeline</a> is a sequence of stages where each stage is either a Transformer or an Estimator
### These stages are run in order and the input DataFrame is transformed as it passes through each stage. 

### In machine learning, it is common to run a sequence of algorithms to process and learn from data.

We want to run the indexers and bucketizers first, then the assembler, normalizer -- and finally the logistic regression.

In [None]:
pipeline = Pipeline(stages=[SexIndexer, EmbarkedIndexer, AgeBucket, FareBucket, assembler, normalizer, lr])

## Convert the test and training sets from pandas DataFrames to Spark DataFrames.
### We need to do this because the Spark ML algorithms require Spark DataFrames.   They will not work with pandas DataFrames.
### Cache the resulting DataFrames - this is simply to improve performance.

In [None]:
train_df = spark.createDataFrame(train)
test_df = spark.createDataFrame(test)

train_df.cache()
test_df.cache()
print('The number of records in the training data set is {}.'.format(train_df.count()))
print('The number of rows labeled Not Survived in the training data set is {}.'.format(train_df.filter(train_df['survived'] == 0).count()))
print('The number of rows labeled Survived in the training data set is {}.'.format(train_df.filter(train_df['survived'] == 1).count()))
train_df.sample(False, 0.01, seed=0).show(5)
print('')

print('The number of records in the test data set is {}.'.format(test_df.count()))
print('The number of rows labeled Not Survived in the test data set is {}.'.format(test_df.filter(test_df['survived'] == 0).count()))
print('The number of rows labeled Survived in the test data set is {}.'.format(test_df.filter(test_df['survived'] == 1).count()))
test_df.sample(False, 0.1, seed=0).show(5)

## Fit the pipeline to the training data

In [None]:
model = pipeline.fit(train_df)

<a id="test"></a>
## 8. Make predictions on passengers in the Test data set
### Keep in mind that the model has not seen the data in the test data set

In [None]:
predictions = model.transform(test_df)

## Obtain a sample of the predictions and view using PixieDust

You can see the survived value on the far left side of the table and the prediction on the far right.

In [None]:
display(predictions.sample(False, 0.1, seed=0))

In [None]:
print('The number of predictions labeled Not Survived is {}.'.format(predictions.filter(predictions['prediction'] == 0).count()))
print('The number of predictions labeled Survived is {}.'.format(predictions.filter(predictions['prediction'] == 1).count()))

In [None]:
(predictions.filter("Survived = 0.0")
     .select("Sex", "Age", "Fare", "Embarked", "Pclass", "Parch", "SibSp", "Survived", "prediction")
     .sample(False, 0.1, seed=0).show(5))

(predictions.filter("Survived = 1.0")
     .select("Sex", "Age", "Fare", "Embarked", "Pclass", "Parch", "SibSp", "Survived", "prediction")
     .sample(False, 0.5, seed=0).show(5))

## Create an evaluator for the <a href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.html">binary classification</a> using area under the ROC Curve as the evaluation metric

### Receiver operating characteristic (ROC) is a graphical plot that illustrates the performance of a binary classifier system as its discrimination threshold is varied

The curve is created by plotting the true positive rate against the false positive rate at various threshold settings. The ROC curve is thus the sensitivity as a function of fall-out. The area under the ROC curve is useful for comparing and selecting the best machine learning model for a given data set. A model with an area under the ROC curve score near 1 has very good performance. A model with a score near 0.5 is about as good as flipping a coin.
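
One way to build intuition for the area under the ROC curve: it equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in pure Python. The labels and scores are made up, and `roc_auc` is a hypothetical helper rather than the evaluator Spark uses.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [0, 0, 1, 1]            # made-up true outcomes
scores = [0.1, 0.4, 0.35, 0.8]   # made-up predicted survival scores

print(roc_auc(labels, scores))
```

A model that gives every passenger the same score ranks no pair correctly or incorrectly, so every pair counts half and the result is 0.5, the coin-flip baseline mentioned above.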

In [None]:
evaluator = BinaryClassificationEvaluator().setLabelCol("survived").setMetricName("areaUnderROC")
print('Area under the ROC curve = {}.'.format(evaluator.evaluate(predictions)))

<a id="tune"></a>
## 9. Tune the Model

## Tune Hyperparameters
### Generate hyperparameter combinations by taking the cross product of some parameter values

Spark ML algorithms provide many hyperparameters for tuning models. These hyperparameters are distinct from the model parameters being optimized by Spark ML itself. Hyperparameter tuning is accomplished by choosing the set of hyperparameters that gives the best model performance on held-out validation data that the model was not trained with. All combinations of hyperparameters specified will be tried in order to find the one that leads to the model with the best evaluation result.
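
To see how quickly a cross product grows, this sketch enumerates the same values used in the parameter grid later in this notebook. The dictionary keys are just labels chosen for this illustration; the real grid is built with `ParamGridBuilder`.

```python
from itertools import product

# Same candidate values as the notebook's parameter grid; key names are illustrative labels
grid = {
    "regParam": [0.01, 0.1, 0.3],
    "elasticNetParam": [0.1, 0.5, 0.8],
    "normalizer_p": [1.0, 2.0],
}

names = list(grid)
combinations = [dict(zip(names, values)) for values in product(*grid.values())]

num_folds = 10  # the number of folds used by the cross validator in this notebook
print(len(combinations), "combinations")
print(len(combinations) * num_folds, "pipeline fits during cross-validation")
print(combinations[0])
```

Each extra value added to any one hyperparameter multiplies the total, which is why the cross-validation cell takes several minutes to run.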

## Build a <a href="https://spark.apache.org/docs/latest/ml-tuning.html">Parameter Grid</a> specifying what parameters and values will be evaluated in order to determine the best combination

In [None]:
paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 0.3])
                 .addGrid(lr.elasticNetParam, [0.1, 0.5, 0.8])
                 .addGrid(normalizer.p, [1.0, 2.0])
                 .build())

## Create a <a href="https://spark.apache.org/docs/latest/ml-tuning.html">cross validator</a> to tune the pipeline with the generated parameter grid
Spark ML provides for cross-validation for hyperparameter tuning. Cross-validation attempts to fit the underlying estimator with user-specified combinations of parameters, cross-evaluate the fitted models, and output the best one.
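
The fold mechanics can be sketched in a few lines of pure Python. This version assigns rows to folds round-robin for simplicity, whereas Spark assigns them randomly; `k_fold_indices` is a hypothetical helper for illustration only.

```python
def k_fold_indices(n_rows, k):
    """Yield (training, validation) index lists; each row is validated exactly once."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for i, validation in enumerate(folds):
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield training, validation

splits = list(k_fold_indices(n_rows=10, k=5))
for training, validation in splits:
    print("train on", training, "validate on", validation)
```

For each hyperparameter combination, the estimator is fitted once per fold and the evaluator's scores on the validation parts are averaged; the combination with the best average is then refitted on all of the training data.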

In [None]:
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(10)

## Cross-evaluate the ML Pipeline to find the best model
### using the area under the ROC evaluator and hyperparameters specified in the parameter grid

#### Note, this will take several minutes to go through all the combinations of hyperparameters and folds. 

In [None]:
cvModel = cv.fit(train_df)
print('Area under the ROC curve for best fitted model = {}.'.format(evaluator.evaluate(cvModel.transform(test_df))))

## Let's see what improvement we achieve by tuning the hyperparameters using cross-evaluation 

In [None]:
# Evaluate each model once and reuse the results
aucBaseline = evaluator.evaluate(predictions)
aucTuned = evaluator.evaluate(cvModel.transform(test_df))
print('Area under the ROC curve for non-tuned model = {}.'.format(aucBaseline))
print('Area under the ROC curve for best fitted model = {}.'.format(aucTuned))
print('Improvement = {0:0.2f}%'.format((aucTuned - aucBaseline) * 100 / aucBaseline))
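The improvement figure is a relative change with respect to the non-tuned model. A minimal sketch of the arithmetic follows; the function name and the two AUC values are hypothetical and chosen only to illustrate the calculation, not taken from an actual run.

```python
def relative_improvement(baseline, tuned):
    """Percentage change of `tuned` relative to `baseline`."""
    return (tuned - baseline) * 100 / baseline

# Hypothetical AUC values, used only to illustrate the arithmetic.
example_baseline_auc = 0.80
example_tuned_auc = 0.84

print('Improvement = {0:0.2f}%'.format(
    relative_improvement(example_baseline_auc, example_tuned_auc)))  # Improvement = 5.00%
```

The result is negative when the tuned model scores lower on the test set than the baseline, which can happen because the grid is selected on the training folds rather than on the test data.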

## Make improved predictions using the Cross-validated model
### Using the Test data set and DataFrame API

In [None]:
cvModel.transform(test_df).select("survived", "prediction").sample(False, 0.1, seed=0).show(10)

<a id="predict"></a>
## 10. Make a prediction on an imaginary passenger

## Define the imaginary passenger's features

In [None]:
SexValue = 'female'
AgeValue = 40.0

# Remember that fare values were log-transformed, so values should fall between -1 and 8
FareValue = 8.0

EmbarkedValue = 'C'
PclassValue = 2
SibSpValue = 1
ParchValue = 1

PredictionFeatures = (spark.createDataFrame([(SexValue, AgeValue, FareValue, EmbarkedValue, PclassValue, SibSpValue, ParchValue)],
    ['sex', 'age', 'fare', 'embarked', 'pclass', 'sibsp', 'parch']))
PredictionFeatures.show()

## Predict whether the imaginary person would have survived
### using the best fit model

In [None]:
SurvivedOrNotPrediction = cvModel.transform(PredictionFeatures)
SurvivedOrNotPrediction.select('rawPrediction', 'probability', 'prediction').show(1, False)

## Display Prediction Result

In [None]:
SurvivedOrNot = SurvivedOrNotPrediction.select("prediction").first()[0]
if SurvivedOrNot == 0.0:
    print("Did NOT Survive")
elif SurvivedOrNot == 1.0:
    print("Did Survive!!!")
else:
    print("Invalid Prediction")

<a id="randomforest"></a>
## 11. Let's take a quick look at applying the feature engineering performed above to a Random Forest Model
### Random forests are ensembles of decision trees. They combine many decision trees in order to reduce the risk of overfitting.
### We won't do any hyperparameter tuning in this example; we just show how to create and evaluate the model using the default hyperparameters
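The idea of combining many trees can be sketched as a simple majority vote. This is an illustration only: the function name and the votes are hypothetical, and Spark's random forest may aggregate differently (for example, by combining per-tree class probabilities rather than counting hard votes).

```python
from collections import Counter

def majority_vote(tree_predictions):
    """Return the label predicted by the most trees."""
    return Counter(tree_predictions).most_common(1)[0][0]

# Hypothetical votes from five trees for one passenger (1 = survived, 0 = did not).
votes = [1, 0, 1, 1, 0]
print(majority_vote(votes))  # 1
```

Because each tree sees a different sample of the data and features, individual mistakes tend to be outvoted, which is the intuition behind the reduced risk of overfitting.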

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer

# Index labels, adding metadata to the label column.
# Fit on the training set so the index covers every label seen during training.
labelIndexer = StringIndexer().setInputCol("survived").setOutputCol("indexedLabel").fit(train_df)

# Train a RandomForest model
rf = RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("features").setNumTrees(20)

# Convert indexed labels back to original labels.
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

# Create new Pipeline using the RandomForest model and all the same feature transformers used above for logistic regression
pipelineRF = Pipeline().setStages([labelIndexer, SexIndexer, EmbarkedIndexer, AgeBucket, FareBucket, assembler, normalizer, rf, labelConverter])

# Train model.
modelRF = pipelineRF.fit(train_df)

# Make predictions.
predictionsRF = modelRF.transform(test_df)

# Select example rows to display.
predictionsRF.select("predictedLabel", "survived", "features").show(10)

# Compare predictions with the indexed label (the column the forest was trained on) and compute test error
evaluatorRF = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
accuracyRF = evaluatorRF.evaluate(predictionsRF)
print("Test Error = %g" % (1.0 - accuracyRF))

rfModel = modelRF.stages[7]
print(rfModel)  # summary only
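The test error printed above is one minus the accuracy, that is, the fraction of test rows whose prediction differs from the label. A pure-Python sketch of that calculation, using a hypothetical helper and made-up toy values:

```python
def accuracy(predictions, labels):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)

# Toy values for illustration only; these are not results from the notebook.
toy_predictions = [0.0, 1.0, 1.0, 0.0, 1.0]
toy_labels = [0.0, 1.0, 0.0, 0.0, 1.0]

acc = accuracy(toy_predictions, toy_labels)
print("Test Error = %g" % (1.0 - acc))  # Test Error = 0.2
```

Note that accuracy and area under the ROC curve are different metrics, so this number is not directly comparable with the logistic regression results reported earlier.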

<a id="summary"></a>
## 12. Summary and next steps

You created a model that predicts survival probabilities for passengers on the Titanic. Along the way, you:

  - Loaded the data
  - Cleaned the data
  - Split the data into training and test sets
  - Examined data in the training set to determine which data to use and how it needed to be shaped
  - Created transformers to shape the data
  - Created a model using Pipeline
  - Tested the model
  - Tuned the model
  - Tested the model on an imaginary passenger
  - Built a second model using Random Forest
  

### Authors

Joel Patterson - IBM Corporation

Based in part on work by Rich Tarro (IBM) and Yassine Ghouzam, PhD

![IBM Logo](http://www-03.ibm.com/press/img/Large_IBM_Logo_TN.jpg)