# Machine Learning with Spark ML

### In this notebook, we will explore machine learning using Spark ML. We will exploit Spark ML's high-level APIs built on top of DataFrames to create and tune machine learning pipelines. Spark ML Pipelines enable combining multiple algorithms into a single pipeline or workflow. We will utilize Spark ML's feature transformers to convert, modify and scale the features that will be used to develop the machine learning model. Finally, we will evaluate and cross validate our model to demonstrate the process of determining a best fit model and load the results in the database.

### We are using machine learning to sort the highest priority records for an analyst to review.  We will use as a training set for the algorithm fake data that has been vetted by an analyst as high, medium or low.¶

### We will use generated travel data that has been examined for patterns of Human Trafficking from dashDB to do the machine learning.  

### Notebook Tips 
- Press SHIFT + ENTER keys to Execute a Code Cell. 
- In [ ] - code has not yet been run
- In [*] - code is running and not yet complete
- In [5] - code execution has completed




## Verify Spark version and existence of Spark

In [None]:
print('The spark version is {}.'.format(spark.version))

### Install pixiedust.  With this package we can do some nice visualizations.

In [None]:
!pip install --trusted-host pypi.python.org JayDeBeApi==0.2.0 --user
!pip install --trusted-host pypi.python.org --user --upgrade ibmdbpy
!pip install --trusted-host pypi.python.org --user --upgrade pixiedust

### Import the required libraries

In [None]:
# Imports for DashDB
import jaydebeapi
from ibmdbpy import IdaDataBase
from ibmdbpy import IdaDataFrame

#Imports for Spark
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.feature import Bucketizer
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import NaiveBayes, DecisionTreeClassifier
from pyspark.sql.functions import year
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Imports for pixiedust
from pixiedust.display import *


## Use the insert to code option to connect to the database.  

Click on the cell below, then on the notebook toolbar, click the box of 1's and 0's, find your database connection and click the Insert to code link under the connection name and then click on "Insert SparkSession DataFrame" to have the data from the FEMALE_HUMAN_TRAFFICKING database read into an Spark DataFrame.  

Please change the resulting ida_df_1 to be trafficking_df to maintain consistency with the remainder of the notebook

The @hidden_cell directive tells DSX not to export credentials when sharing.


## Step 1 - Connect to the database and read in our data

In [None]:
# Insert code from connection here. 
#Click on the "Insert to code", and then Insert SparkSession DataFrame
# Please change generated data frame name(possibly data_df_1) to trafficking_df to be consistent with the remainder of the notebook.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# @hidden_cell
# The following code is used to access your data and contains your credentials.
# You might want to remove those credentials before you share your notebook.

properties_b009ab6c6e134caa95dd4d1712bd1c40 = {
    'jdbcurl': 'jdbc:db2://bluemix05.bluforcloud.com:50000/BLUDB',
    'user': 'dash113964',
    'password': '_d33Mb_uWRaR'
}

trafficking_df = spark.read.jdbc(properties_b009ab6c6e134caa95dd4d1712bd1c40['jdbcurl'], table='DASH113964.FEMALE_HUMAN_TRAFFICKING', properties=properties_b009ab6c6e134caa95dd4d1712bd1c40)
trafficking_df.head()



## Step 2- Identify our labels and transform 

We will use the 'VETTING_LEVEL' column as a label for training the machine learning model.  This is where our analyst has marked the data as vetted.  

Spark ML requires that that the labels are data type Double, so we will cast the  column as Double (it was inferred as Integer when read into Spark).

withColumn() is a Spark SQL way to manipulate a dataframe.  Since an RDD is immutable, we create a new RDD each time we transform.  This code creates a new column VettingTemp and sets it to the values in "VETTING_LEVEL" cast to a Double.    It then drops column VETTING_LEVEL and renames column VettingTemp to VETTING_LEVEL.

In [None]:
DataWithLabels = (trafficking_df.withColumn("VettingTemp", trafficking_df["VETTING_LEVEL"]
    .cast("Double")).drop("VETTING_LEVEL").withColumnRenamed("VettingTemp", "VETTING_LEVEL"))

We want to use year of birth intead of date of birth in our learning.  

Another way to transform an rdd in Spark is using SQL Syntax.  Here, we will be adding a new field, BIRTH_YEAR to our vetting set.  We will also just select the fields we need.

In [None]:
DataWithLabels.createOrReplaceTempView("VettingData")
AllVettingData = sqlContext.sql ("SELECT UUID, VETTING_LEVEL, NAME, OCCUPATION, COUNTRIES_VISITED_COUNT, PASSPORT_COUNTRY_CODE, GENDER,year(BIRTH_DATE) as BIRTH_YEAR, 1 as Counter FROM VettingData")
FilteredVettingData = AllVettingData.filter("VETTING_LEVEL==100")
#AllVettingData.show()
FilteredVettingData.count()
FilteredVettingData.show()

Use pixiedust to visually explore the data.

In [None]:
display(AllVettingData)

Now, let's look at the data we have:

VETTING_LEVEL is in three different statuses:

    10 - HIGH
    
    20 - MEDIUM
    
    30 - LOW


Print the total number of vetting statuses 

In [None]:
print('The number of rows labeled high is {}.'.format(AllVettingData.filter(AllVettingData['VETTING_LEVEL'] == 10).count()))
print('The number of rows labeled medium is {}.'.format(AllVettingData.filter(AllVettingData['VETTING_LEVEL'] == 20).count()))
print('The number of rows labeled low is {}.'.format(AllVettingData.filter(AllVettingData['VETTING_LEVEL'] == 30).count()))
print('The number of unlabled rows is {}.'.format(AllVettingData.filter(AllVettingData['VETTING_LEVEL'] == 100).count()))

The majority of the data has not been labeled (VETTING_LABEL=100 means unvetted).  We can not use it for our training data, so filter it out.
Print the total number of rows.

In [None]:
LabeledVettingData=AllVettingData.filter("VETTING_LEVEL != 100")
LabeledVettingData.count()

## Step 3  - Feature Engineering.
### A feature is the elements of the data that we are using in our learning.  We need to transform each one of our features into a format that SparkML can use it.
More about the choices for feature engineering can be found here:
http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer


The first thing we will do is transorm our lables (VETTING_LEVEL) into a format that we can use in the algorithm, and then get back to 'human readable' from in the end. The ML models require that the labels are in a column called 'label'.    The converter helps us transform these back in the end.



In [None]:
labelIndexer = StringIndexer(inputCol="VETTING_LEVEL", outputCol="label", handleInvalid="error")
labelModel = labelIndexer.fit(LabeledVettingData)
converter = IndexToString(inputCol="prediction", outputCol="predCategory", labels=labelModel.labels)

Next, we will process all of the features we will use. While there are a variety of choices for transforming elements, we will treat each as a String using the StringIndexer.

StringIndexer is a transformer that encodes a string column to a column of indices. The indices are ordered by value frequencies, so the most frequent value gets index 0. If the input column is numeric, it is cast to string first.

For our vetting dataset, we are interested in all string based features so we will use the StringIndexer for them.  We need to use 'handleInvalid="skip"' because not all values have been validated in our vetting set.  That means the algorithms will skip these records.

In [None]:
occupationIndexer = StringIndexer(inputCol="OCCUPATION", outputCol="occupationIndex", handleInvalid="skip")
countryIndexer = StringIndexer(inputCol="PASSPORT_COUNTRY_CODE", outputCol="countryIndex", handleInvalid="skip")
genderIndexer = StringIndexer(inputCol="GENDER", outputCol="genderIndex", handleInvalid="skip")
yearOfBirthIndexer = StringIndexer(inputCol="BIRTH_YEAR", outputCol="birthYearIndex", handleInvalid="skip")

Now, put all of our features into a simple array using a VectorAssembler.

Note that COUNTRIIES_VISITED_COUNT is already a numeric, so we can just put that in the array as is.


In [None]:
vecAssembler = VectorAssembler(inputCols=["occupationIndex","countryIndex","genderIndex", "birthYearIndex", "COUNTRIES_VISITED_COUNT"], outputCol="features")

Normalizer will help us normalize the features into a standard frmat.  It can help us improve the behavior of the learning algorithms.


In [None]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)

## Step 4- Declare the model that we want to use

The model here is Naive Bayes.  It will output each prediction into a 'prediction' column.  Naive Bayes  is a probabistic model that learns based on previous decisions.  We will take a best guess at the paramater 'smoothing'- SparkML will help us tune it later!



In [None]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="label", predictionCol="prediction")

## Step 5 - Setup the Pipeline

The pipeline is the guts of the algorithm that strings all the work we've done together.

The stages are run in order and the input DataFrame is transformed as it passes through each stage.   First, comes the feature transformations, then the assembler to put them togather into one DF.  We pass that into the model. 

In machine learning, it is common to run a sequence of algorithms to process and learn from data, so this can get as complex as we want to make it!

In [None]:
pipeline = Pipeline(stages=[labelIndexer,occupationIndexer,countryIndexer, genderIndexer, yearOfBirthIndexer, vecAssembler, normalizer, nb, converter])

## Step 6 - Train the model

We will split it into training data which is marked and test data which will be used to test the efficiency of the algorithms.

It is common to split the split up the data randomly into 70% for training and 30% for testing.  If we were to use a bigger test set, we might use an 80% / 20% split.

In [None]:
train, test = LabeledVettingData.randomSplit([70.0,30.0], seed=1)
train.cache()
test.cache()
print('The number of records in the training data set is {}.'.format(train.count()))
print('The number of rows labeled high is {}.'.format(train.filter(train['VETTING_LEVEL'] == 10).count()))
print('The number of rows labeled medium is {}.'.format(train.filter(train['VETTING_LEVEL'] == 20).count()))
print('The number of rows labeled low is {}.'.format(train.filter(train['VETTING_LEVEL'] == 30).count()))
print('')

print('The number of records in the test data set is {}.'.format(test.count()))
print('The number of rows labeled high is {}.'.format(test.filter(test['VETTING_LEVEL'] == 10).count()))
print('The number of rows labeled medium is {}.'.format(test.filter(test['VETTING_LEVEL'] == 20).count()))
print('The number of rows labeled low is {}.'.format(test.filter(test['VETTING_LEVEL'] == 30).count()))

 Fit the pipeline to the training data.  This will run the data through the algorithm to train it based on our labled data.
 
<div class="panel-group" id="accordion-3">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse-3">
        Hint</a>
      </h4>
    </div>
    <div id="collapse-3" class="panel-collapse collapse">
      <div class="panel-body">Type (or copy) the following in the cell below: <br>
          model = pipeline.fit(train)<br>
      </div>
    </div>
  </div>

In [None]:
# Fit the pipeline to the training data assigning the result to a variable called 'model'.
model = pipeline.fit(train)

Make predictions on documents in the Test data set.  This will test the model based on the 30% data we have left in reserve.  Keep in mind that the model has not seen the data in the test data set.

<div class="panel-group" id="accordion-4">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-4" href="#collapse-4">
        Hint</a>
      </h4>
    </div>
    <div id="collapse-4" class="panel-collapse collapse">
      <div class="panel-body">Type (or copy) the following in the cell below: <br>
          predictions = model.transform(test)<br>
      </div>
    </div>
  </div>

In [None]:
# Make predictions on the test data assigning the result to a variable called 'predictions'.
predictions = model.transform(test)

## Step 7 -  Show and Evaluate Results

Note that we only got a small sample of the results back because we have a very small amount of training data. 

In [None]:
predictions.count()

SparkML has automated ways to look at result quality called Evaluators.  More information can be found here:
http://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

For simplicity here, we will use a a common evaluation method called Reciever Operator Characteristic.  This genenerally is used for binary classifiers, but we will use it because we only have 3 levels of prediction.

The curve is created by plotting the true positive rate against the false positive rate at various threshold settings. The ROC curve is thus the sensitivity as a function of fall-out. The area under the ROC curve is useful for comparing and selecting the best machine learning model for a given data set. A model with an area under the ROC curve score near 1 has very good performance. A model with a score near 0.5 is about as good as flipping a coin.

In [None]:
evaluator = BinaryClassificationEvaluator().setLabelCol("label").setMetricName("areaUnderROC")
print('Area under the ROC curve = {}.'.format(evaluator.evaluate(predictions)))

## Step 8 - Automatic algorithm Tuning - Also Called  Hyperparameter Tuning


Spark ML algorithms provide many hyperparameters for tuning models. These hyperparameters are distinct from the model parameters being optimized by Spark ML itself.  Hyperparameter tuning is accomplished by choosing the best set of parameters based on model performance on test data that the model was not trained with. All combinations of hyperparameters specified will be tried in order to find the one that leads to the model with the best evaluation result.


First we will build a paramater grid to tell SparkML what to change in its testing.  Note that we are changing all the paramaters we setup in our pipeline before - the 'smoothing' in our model, and the normalizer parameter.

In [None]:
paramGrid = (ParamGridBuilder().addGrid(nb.smoothing, [0.25, 0.5, 0.75])
                 .addGrid(normalizer.p, [1.0, 2.0]).build())

Now, create a cross validator to tune the pipeline with the generated parameter grid.  Cross-validation attempts to fit the underlying estimator with user-specified combinations of parameters, cross-evaluate the fitted models, and output the best one.  

In [None]:
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(10)

Next, we will run the models through the grid we set above.  It runs Cross-evaluate the ML Pipeline to find the best model.  Note that since runs the model several times, it takes a few minutes to run.

In [None]:
cvModel = cv.fit(train)
print('Area under the ROC curve for best fitted model = {}.'.format(evaluator.evaluate(cvModel.transform(test))))

Let's see what improvement we achieve by tuning the hyperparameters using cross-evaluation 

In [None]:
print('Area under the ROC curve for non-tuned model = {}.'.format(evaluator.evaluate(predictions)))
print('Area under the ROC curve for best fitted model = {}.'.format(evaluator.evaluate(cvModel.transform(test))))
print('Improvement = {0:0.2f}%'.format((evaluator.evaluate(cvModel.transform(test)) - evaluator.evaluate(predictions)) *100 / evaluator.evaluate(predictions)))

We did a bit better with the new params!  Let's use "cvModel" instead of "model" below, because SparkML told us it was the best result.

## Step 9 - Score the remaining records that were unscored, and load them into a new table in the database.

First, we want to only get the unvetted records.

In [None]:
NewVettingData=AllVettingData.filter("VETTING_LEVEL == 100")

Next, transform the new model with the new vetting records

In [None]:
newPreds = cvModel.transform(NewVettingData)

 Show the data we have predicted and some of the fields in the data.  

In [None]:
newPreds.select("UUID", "prediction", "predCategory", "probability", "NAME", "GENDER", "COUNTRIES_VISITED_COUNT", "PASSPORT_COUNTRY_CODE" ).show()

Remember that VETTING_LEVEL is in three different statuses:


10- HIGH

20- MEDIUM

30 - LOW


Let's print the total number of vetting statuses that we predicted.  The actual predicted data is low because we only have a few vetted records.  Remember that we had to 'skip' and features that were not in our trained data, so if we didn't have someone who was born in a certain year in our training data, we won't be able to predict a result.

In [None]:
print('The number of records in the unvetted data set is {}.'.format(newPreds.count()))
print('The number of rows labeled high is {}.'.format(newPreds.filter(newPreds['predCategory'] == 10).count()))
print('The number of rows labeled medium is {}.'.format(newPreds.filter(newPreds['predCategory'] == 20).count()))
print('The number of rows labeled low is {}.'.format(newPreds.filter(newPreds['predCategory'] == 30).count()))

Now, downselect all the values we need to join in our next lab to display the results, and write to the database.  We will only load the unique ID and the prediction into our new table in dashDB.  We'll call the table "FEMALE_TRAFFICKING_ML_RESULTS". First we need to obtain the credentials for the dashDB database. 

### Move cursor to below cell. Click on the 1s and 0s icon if the Files/Connections dialog is not open. Click on the Connections tab. Then click on Intert to code down arrow. Then click on Insert Credentials.  

In [None]:

# @hidden_cell
credentials_1 = {
  'host':'bluemix05.bluforcloud.com',
  'port':'50000',
  'user':'dash113964',
  'password':"""_d33Mb_uWRaR""",
  'database':'BLUDB'
}


In [None]:

valuesToWrite= newPreds.select("UUID",  "predCategory")
valuesToWrite.write.jdbc("jdbc:db2://bluemix05.bluforcloud.com:50000/BLUDB", "FEMALE_HUMAN_TRAFFICKING_ML_RESULTS",
                         properties = {"user" : credentials_1["user"], "password" : credentials_1["password"]},
                         mode="overwrite")