# Machine Learning with Spark ML

### In this notebook, we will explore machine learning using Spark ML. We will use Spark ML's high-level APIs, built on top of DataFrames, to create and tune machine learning pipelines. Spark ML Pipelines combine multiple algorithms into a single pipeline or workflow. We will use Spark ML's feature transformers to convert, modify, and scale the features used to develop the machine learning model. Finally, we will evaluate and cross-validate our model to demonstrate how to determine a best-fit model, and load the results into the database.

### We will load generated travel data from dashDB that has been examined for patterns of human trafficking, and use it for the machine learning.



## Verify Spark version and existence of Spark

In [1]:
print('The spark version is {}.'.format(spark.version))

The spark version is 2.0.2.


### Import the required libraries

In [2]:
# Imports for DashDB
# these are no longer needed
#import jaydebeapi
#from ibmdbpy import IdaDataBase
#from ibmdbpy import IdaDataFrame

#Imports for Spark
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.feature import Bucketizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import NaiveBayes, DecisionTreeClassifier
from pyspark.sql import SQLContext
from pyspark.sql.functions import year


## Insert the database connection credentials like we did in lab 1

Click the cell below. Then, on the notebook toolbar, click the box of 1's and 0's, find your database connection, and click the Insert to code link under the connection name to add a credentials_1 dictionary to the notebook. If you don't have any connections listed, refer to the PDF file detailing how to add a data source.

Connecting to dashDB requires the following information, which is provided by the inserted credentials dictionary:

    Database name
    Host DNS name or IP address
    Host port
    Connection protocol
    User ID
    User password

The information in credentials_1 will be used to build a connection string in a subsequent step. Note: the credentials may be named credentials_2, etc. If so, simply rename the dictionary to credentials_1.


In [3]:

# @hidden_cell


In [4]:
sqlContext=SQLContext(sc)
trafficking_df = sqlContext.read.jdbc(credentials_1["jdbcurl"],
                                     'FEMALE_TRAFFICKING',
                                      properties = {"user" : credentials_1["username"], "password" : credentials_1["password"]})


### Show the first several rows of the data

In [5]:
trafficking_df.show(5)

+-------------+--------------------+------+---+----------+-------------+------------------+--------------------+--------------------+-----------+---------------+----------------+---------------------+--------------------+-----------------------+----------------------------+--------------------+----------------------------+----------------------+------------------------------+----------------------+------------------------------+------------------------+--------------------+
|VETTING_LEVEL|                NAME|GENDER|AGE|BIRTH_DATE|BIRTH_COUNTRY|BIRTH_COUNTRY_CODE|          OCCUPATION|             ADDRESS|        SSN|PASSPORT_NUMBER|PASSPORT_COUNTRY|PASSPORT_COUNTRY_CODE|   COUNTRIES_VISITED|COUNTRIES_VISITED_COUNT|ARRIVAL_AIRPORT_COUNTRY_CODE|ARRIVAL_AIRPORT_IATA|ARRIVAL_AIRPORT_MUNICIPALITY|ARRIVAL_AIRPORT_REGION|DEPARTURE_AIRPORT_COUNTRY_CODE|DEPARTURE_AIRPORT_IATA|DEPARTURE_AIRPORT_MUNICIPALITY|DEPARTURE_AIRPORT_REGION|                UUID|
+-------------+--------------------+------

## We will use the 'VETTING_LEVEL' column as a label for training the machine learning model. This is where our analyst has marked the data as vetted.
#### Spark ML requires that labels be of data type Double, so we will cast the column to Double (it was inferred as Integer when read into Spark).
#### withColumn() is a Spark SQL way to manipulate a DataFrame. Since a DataFrame is immutable, each transformation returns a new DataFrame.

This code creates a new column VettingTemp set to the values in "VETTING_LEVEL" cast to Double. It then drops the VETTING_LEVEL column and renames VettingTemp to VETTING_LEVEL.

In [6]:
DataWithLabels = (trafficking_df.withColumn("VettingTemp", trafficking_df["VETTING_LEVEL"]
    .cast("Double")).drop("VETTING_LEVEL")
    .withColumnRenamed("VettingTemp", "VETTING_LEVEL"))

## We want to use year of birth instead of date of birth in our learning.
### Another way to transform a DataFrame in Spark is with SQL syntax. Here, we add a new field, BIRTH_YEAR, to our vetting set and select only the fields we need.

In [7]:
DataWithLabels.createOrReplaceTempView("VettingData")
AllVettingData = sqlContext.sql ("SELECT UUID, VETTING_LEVEL, NAME, OCCUPATION, COUNTRIES_VISITED_COUNT, PASSPORT_COUNTRY_CODE, GENDER, year(BIRTH_DATE) as BIRTH_YEAR FROM VettingData")
AllVettingData.show()

+--------------------+-------------+--------------------+--------------------+-----------------------+---------------------+------+----------+
|                UUID|VETTING_LEVEL|                NAME|          OCCUPATION|COUNTRIES_VISITED_COUNT|PASSPORT_COUNTRY_CODE|GENDER|BIRTH_YEAR|
+--------------------+-------------+--------------------+--------------------+-----------------------+---------------------+------+----------+
|55cb1f8b-526f-4c0...|        100.0|Kelli Rachel Shaffer|Industrial/produc...|                      2|                   GH|     F|      1991|
|b7ca10cd-20af-403...|        100.0|Sandt Christina J...|Nutritional thera...|                      2|                   GH|     F|      1995|
|812f53d5-5589-42d...|        100.0|Cathie Chelsea Pi...|Geophysicist/fiel...|                      4|                   GH|     F|      1992|
|ff62334d-0b17-4a3...|        100.0|   Kristi Stacie Day|    Doctor, hospital|                      1|                   GH|     F|      1994|

## Note that the majority of the data has not been labeled (VETTING_LEVEL = 100 means unvetted). We cannot use it as training data, so we filter it out.

In [8]:
LabeledVettingData=AllVettingData.filter("VETTING_LEVEL != 100")


## VETTING_LEVEL has three different statuses:
#### 10 - HIGH
#### 20 - MEDIUM
#### 30 - LOW


### Print the number of rows for each vetting status

In [9]:
print('The number of rows labeled high is {}.'.format(LabeledVettingData.filter(LabeledVettingData['VETTING_LEVEL'] == 10).count()))
print('The number of rows labeled medium is {}.'.format(LabeledVettingData.filter(LabeledVettingData['VETTING_LEVEL'] == 20).count()))
print('The number of rows labeled low is {}.'.format(LabeledVettingData.filter(LabeledVettingData['VETTING_LEVEL'] == 30).count()))
print('The number of unlabeled rows is {}.'.format(LabeledVettingData.filter(LabeledVettingData['VETTING_LEVEL'] == 100).count()))

The number of rows labeled high is 42.
The number of rows labeled medium is 40.
The number of rows labeled low is 96.
The number of unlabeled rows is 0.


### Count the number of vetted records

In [10]:
LabeledVettingData.count()

178

### Even though our labels are already doubles, we index them anyway so that we can map our predictions back to string-based statuses later. StringIndexer() converts each vetting status to a double index. The ML models will require the labels to be in a column called "label".


In [11]:
labelIndexer = StringIndexer(inputCol="VETTING_LEVEL", outputCol="label", handleInvalid="error")

# We need to fit our data into a model in order to get a mapping of the double to the original vetting status later
labelModel=labelIndexer.fit(LabeledVettingData)
converter = IndexToString(inputCol="prediction", outputCol="predCategory", labels=labelModel.labels)


## Now, we will need to index the data for machine learning. Spark ML has several indexers we can choose from. Each takes an input column and an output column that we will use in our pipeline later.


### StringIndexer
#### StringIndexer is a transformer that encodes a string column to a column of indices. The indices are ordered by value frequencies, so the most frequent value gets index 0. If the input column is numeric, it is cast to string first.
#### For our vetting dataset, we are interested in all string-based features, so we will use the StringIndexer for them.
#### We need to use 'handleInvalid="skip"' because not all values have been validated in our vetting set. With this setting, records containing a value the indexer did not see during fitting are skipped.




In [12]:
occupationIndexer = StringIndexer(inputCol="OCCUPATION", outputCol="occupationIndex", handleInvalid="skip")
countryIndexer = StringIndexer(inputCol="PASSPORT_COUNTRY_CODE", outputCol="countryIndex", handleInvalid="skip")
genderIndexer = StringIndexer(inputCol="GENDER", outputCol="genderIndex", handleInvalid="skip")
yearOfBirthIndexer = StringIndexer(inputCol="BIRTH_YEAR", outputCol="birthYearIndex", handleInvalid="skip")
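To make the indexing behavior concrete, here is a minimal plain-Python sketch of frequency-ordered indexing with skip-on-unseen handling. It is an illustration only, not Spark's implementation: the helper names `fit_string_index` and `transform_skip_invalid` are hypothetical, the country codes are made-up sample values, and the alphabetical tie-break is an assumption chosen to keep the result deterministic.

```python
from collections import Counter


def fit_string_index(values):
    """Return the distinct values as strings, most frequent first.

    Ties are broken alphabetically here (an assumption for determinism).
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [label for label, _ in ordered]


def transform_skip_invalid(values, labels):
    """Map each value to its index as a float, dropping values unseen during fitting."""
    lookup = {label: float(i) for i, label in enumerate(labels)}
    return [lookup[str(v)] for v in values if str(v) in lookup]


# "GH" appears most often, so it receives index 0.
labels = fit_string_index(["GH", "US", "GH", "FR", "GH", "US"])
# "BR" was never seen during fitting, so that record is skipped.
indexed = transform_skip_invalid(["US", "GH", "BR"], labels)
print(labels, indexed)
```

The skipped record is why a scored data set can contain fewer rows than the input when the indexers use 'handleInvalid="skip"'.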


### A VectorAssembler combines all of our features into a single vector column. COUNTRIES_VISITED_COUNT is already numeric, so we can put it in the vector as is.
### The converter takes the double values of the predictions and converts them back to our labels when we view them later.

In [13]:
converter = IndexToString(inputCol="prediction", outputCol="predCategory", labels=labelModel.labels)
vecAssembler = VectorAssembler(inputCols=["occupationIndex","countryIndex","genderIndex", "birthYearIndex", "COUNTRIES_VISITED_COUNT"], outputCol="features")


## Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm
### This normalization can help standardize your input data and improve the behavior of learning algorithms.

In [14]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
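As a rough illustration of what unit-norm scaling means, the sketch below divides a vector by its p-norm in plain Python. The `normalize` helper is hypothetical and stands in for the idea only; it is not the Spark `Normalizer` class.

```python
def normalize(vector, p=1.0):
    """Scale a vector so that its p-norm equals 1 (zero vectors are returned unchanged)."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0.0:
        return list(vector)
    return [x / norm for x in vector]


# With p=1 the absolute values of the components sum to 1.
l1 = normalize([3.0, 4.0], p=1.0)
# With p=2 the vector has Euclidean length 1.
l2 = normalize([3.0, 4.0], p=2.0)
print(l1, l2)
```

The choice of p matters later in the notebook, where both 1.0 and 2.0 are tried as hyperparameter values.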

## Declare the model that we want to use

### The model here is Naive Bayes. It will output each prediction into a 'prediction' column. Naive Bayes is a probabilistic model that learns from previously labeled examples.



In [15]:
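# Train on the indexed "label" column produced by labelIndexer so that prediction indices line up with the labels used by the IndexToString converter
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="label", predictionCol="prediction")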
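The `smoothing` parameter corresponds to additive (Laplace) smoothing. The plain-Python sketch below shows the standard formula for one class of a multinomial model, where each feature's likelihood is (feature total + smoothing) / (sum of totals + smoothing × number of features). The helper name and the sample totals are hypothetical, and the sketch is not a substitute for the Spark estimator.

```python
def smoothed_likelihoods(feature_totals, smoothing=1.0):
    """Estimate per-feature likelihoods for one class with additive smoothing."""
    denominator = sum(feature_totals) + smoothing * len(feature_totals)
    return [(total + smoothing) / denominator for total in feature_totals]


# The second feature was never observed for this class, yet smoothing
# keeps its likelihood above zero.
theta = smoothed_likelihoods([3.0, 0.0, 1.0], smoothing=1.0)
print(theta)
```

Without smoothing, a feature with a zero total would force the whole class probability to zero for any record where that feature is non-zero.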


## A Pipeline is a sequence of stages where each stage is either a Transformer or an Estimator
### These stages are run in order, and the input DataFrame is transformed as it passes through each stage. The pipeline strings everything together. First come the feature transformations, then the assembler that puts them together into one features vector. We pass that into the model.

### In machine learning, it is common to run a sequence of algorithms to process and learn from data.

In [16]:
pipeline = Pipeline(stages=[labelIndexer,occupationIndexer,countryIndexer, genderIndexer, yearOfBirthIndexer, vecAssembler, normalizer, nb, converter])

## Now, train the data
### We split up the data randomly into 90% for training and 10% for testing.

In [17]:
train, test = LabeledVettingData.randomSplit([90.0,10.0], seed=1)
train.cache()
test.cache()
print('The number of records in the training data set is {}.'.format(train.count()))
print('The number of rows labeled high is {}.'.format(train.filter(train['VETTING_LEVEL'] == 10).count()))
print('The number of rows labeled medium is {}.'.format(train.filter(train['VETTING_LEVEL'] == 20).count()))
print('The number of rows labeled low is {}.'.format(train.filter(train['VETTING_LEVEL'] == 30).count()))
print('')

print('The number of records in the test data set is {}.'.format(test.count()))
print('The number of rows labeled high is {}.'.format(test.filter(test['VETTING_LEVEL'] == 10).count()))
print('The number of rows labeled medium is {}.'.format(test.filter(test['VETTING_LEVEL'] == 20).count()))
print('The number of rows labeled low is {}.'.format(test.filter(test['VETTING_LEVEL'] == 30).count()))


The number of records in the training data set is 159.
The number of rows labeled high is 35.
The number of rows labeled medium is 39.
The number of rows labeled low is 85.

The number of records in the test data set is 19.
The number of rows labeled high is 7.
The number of rows labeled medium is 1.
The number of rows labeled low is 11.


## Fit the pipeline to the training data
<div class="panel-group" id="accordion-3">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse-3">
        Hint</a>
      </h4>
    </div>
    <div id="collapse-3" class="panel-collapse collapse">
      <div class="panel-body">Type (or copy) the following in the cell below: <br>
          model = pipeline.fit(train)<br>
      </div>
    </div>
  </div>
</div>

In [18]:
# Fit the pipeline to the training data assigning the result to a variable called 'model'.


## Make predictions on the records in the test data set
### Keep in mind that the model has not seen the data in the test data set
<div class="panel-group" id="accordion-4">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-4" href="#collapse-4">
        Hint</a>
      </h4>
    </div>
    <div id="collapse-4" class="panel-collapse collapse">
      <div class="panel-body">Type (or copy) the following in the cell below: <br>
          predictions = model.transform(test)<br>
      </div>
    </div>
  </div>
</div>

In [19]:
# Make predictions on the test data assigning the result to a variable called 'predictions'.



## Show Results
### Note that we only got a small sample of the results back because we have a very small amount of training data.

In [20]:
predictions.count()

2

## Create an evaluator for the binary classification using area under the ROC Curve as the evaluation metric

### Receiver operating characteristic (ROC) is a graphical plot that illustrates the performance of a binary classifier system as its discrimination threshold is varied

The curve is created by plotting the true positive rate against the false positive rate at various threshold settings. The ROC curve is thus the sensitivity as a function of fall-out. The area under the ROC curve is useful for comparing and selecting the best machine learning model for a given data set. A model with an area under the ROC curve score near 1 has very good performance. A model with a score near 0.5 is about as good as flipping a coin.
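The area under the ROC curve can also be read as the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one. The plain-Python sketch below computes it that way for a binary problem. The `area_under_roc` helper and the sample scores are hypothetical, labels are assumed to be 0 or 1, and this is not how the Spark evaluator is implemented internally.

```python
def area_under_roc(labels, scores):
    """Compute AUC as the fraction of positive/negative pairs ranked correctly.

    Tied scores count as half a correct pair.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    correct = 0.0
    for pos in positives:
        for neg in negatives:
            if pos > neg:
                correct += 1.0
            elif pos == neg:
                correct += 0.5
    return correct / (len(positives) * len(negatives))


# Every positive outranks every negative: a perfect score.
perfect = area_under_roc([1, 1, 0, 0], [0.9, 0.8, 0.3, 0.1])
# One positive is ranked below one negative: three of four pairs are correct.
mixed = area_under_roc([1, 0, 1, 0], [0.9, 0.8, 0.3, 0.1])
print(perfect, mixed)
```

With very few scored records, as in this notebook, a score of exactly 1.0 can result from only a handful of pairs, so it should be read with caution.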

In [21]:
evaluator = BinaryClassificationEvaluator().setLabelCol("VETTING_LEVEL").setMetricName("areaUnderROC")
print('Area under the ROC curve = {}.'.format(evaluator.evaluate(predictions)))

Area under the ROC curve = 1.0.


## Tune Hyperparameters
### Generate hyperparameter combinations by taking the cross product of some parameter values

Spark ML algorithms provide many hyperparameters for tuning models. These hyperparameters are distinct from the model parameters being optimized by Spark ML itself. Hyperparameter tuning is accomplished by choosing the best set of parameters based on model performance on test data that the model was not trained with. All combinations of hyperparameters specified will be tried in order to find the one that leads to the model with the best evaluation result.

## Build a Parameter Grid specifying what parameters and values will be evaluated in order to determine the best combination

In [22]:
paramGrid = (ParamGridBuilder().addGrid(nb.smoothing, [0.25, 0.5, 0.75])
                 .addGrid(normalizer.p, [1.0, 2.0])
                 .build())
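The grid built above is the cross product of the listed values. As a sketch of that idea in plain Python, `itertools.product` enumerates the same six combinations; the dictionary keys are illustrative names, not Spark `Param` objects.

```python
from itertools import product

smoothing_values = [0.25, 0.5, 0.75]
p_values = [1.0, 2.0]

# Each combination pairs one smoothing value with one norm order.
grid = [{"smoothing": s, "p": p} for s, p in product(smoothing_values, p_values)]

for combination in grid:
    print(combination)
```

Three smoothing values and two norm orders give six candidate settings, and each one is fitted once per fold during cross-validation.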

## Create a cross validator to tune the pipeline with the generated parameter grid
Spark ML provides cross-validation for hyperparameter tuning. Cross-validation attempts to fit the underlying estimator with user-specified combinations of parameters, cross-evaluate the fitted models, and output the best one. Note that since it runs the model several times, it takes a few seconds.

In [23]:
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(10)
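To show what ten folds mean for a data set of this size, the plain-Python sketch below partitions 178 row indices into ten folds and builds the training and validation split for each. The `k_fold_indices` helper is hypothetical, and the round-robin assignment is a simplification chosen for determinism; Spark assigns rows to folds randomly.

```python
def k_fold_indices(n_rows, num_folds):
    """Return a list of (training_indices, validation_indices) pairs, one per fold."""
    # Round-robin assignment of row indices to folds.
    folds = [list(range(start, n_rows, num_folds)) for start in range(num_folds)]
    splits = []
    for held_out in range(num_folds):
        validation = folds[held_out]
        training = [i for f, fold in enumerate(folds) if f != held_out for i in fold]
        splits.append((training, validation))
    return splits


splits = k_fold_indices(178, 10)
for training, validation in splits:
    print(len(training), len(validation))
```

Each row is used for validation exactly once, and with six parameter combinations the pipeline is fitted sixty times in total, which is why the cross-validation cell takes longer than a single fit.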

## Cross-evaluate the ML Pipeline to find the best model
### using the area under the ROC evaluator and hyperparameters specified in the parameter grid

In [24]:
cvModel = cv.fit(LabeledVettingData)
print('Area under the ROC curve for best fitted model = {}.'.format(evaluator.evaluate(cvModel.transform(LabeledVettingData))))

Area under the ROC curve for best fitted model = 1.0.


## Let's see what improvement we achieve by tuning the hyperparameters using cross-evaluation 

In [25]:
print('Area under the ROC curve for non-tuned model = {}.'.format(evaluator.evaluate(predictions)))
print('Area under the ROC curve for best fitted model = {}.'.format(evaluator.evaluate(cvModel.transform(LabeledVettingData))))
print('Improvement = {0:0.2f}%'.format((evaluator.evaluate(cvModel.transform(LabeledVettingData)) - evaluator.evaluate(predictions)) *100 / evaluator.evaluate(predictions)))

Area under the ROC curve for non-tuned model = 1.0.
Area under the ROC curve for best fitted model = 1.0.
Improvement = 0.00%


### We didn't do any better, so we keep the original model. If the tuned model were better, we would use "cvModel" instead of "model" below.

## Now, we want to score the remaining records that were unscored, and load them into a new table in the database.
### First, in order to run 'new' data through the machine learning algorithms, we need to remove the VETTING_LEVEL field from the data frame.

In [26]:
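# Remember, the "AllVettingData" data frame has all the formatted data.
# drop() returns a new DataFrame; because the result is not assigned, AllVettingData itself is unchanged.
AllVettingData.drop("VETTING_LEVEL")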

DataFrame[UUID: string, NAME: string, OCCUPATION: string, COUNTRIES_VISITED_COUNT: int, PASSPORT_COUNTRY_CODE: string, GENDER: string, BIRTH_YEAR: int]

## Do the actual transformation on the unvetted data.
<div class="panel-group" id="accordion-3">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse-3">
        Hint</a>
      </h4>
    </div>
    <div id="collapse-3" class="panel-collapse collapse">
      <div class="panel-body">Type (or copy) the following in the cell below: <br>
         newPreds = model.transform(AllVettingData)<br>
      </div>
    </div>
  </div>
</div>

In [27]:
# Transform the unvetted data with the model, assigning the result to a variable called 'newPreds'.


## Show the data we have predicted and some of the fields in the data.  

In [28]:
newPreds.select("UUID", "prediction", "predCategory", "probability", "NAME", "GENDER", "COUNTRIES_VISITED_COUNT", "PASSPORT_COUNTRY_CODE" ).show()

+--------------------+----------+------------+--------------------+--------------------+------+-----------------------+---------------------+
|                UUID|prediction|predCategory|         probability|                NAME|GENDER|COUNTRIES_VISITED_COUNT|PASSPORT_COUNTRY_CODE|
+--------------------+----------+------------+--------------------+--------------------+------+-----------------------+---------------------+
|dc34c030-e0fb-403...|       1.0|        10.0|[0.20180601907557...|    Tina Lori Keller|     F|                      7|                   GH|
|c57dc633-9338-4e0...|       1.0|        10.0|[0.38309263872194...|        Laura Booker|     F|                      4|                   GH|
|81a59282-cd5d-4e7...|       1.0|        10.0|[0.12434049166133...|      Rebecca Wilson|     F|                      2|                   GH|
|77bbd090-ab53-48b...|       1.0|        10.0|[0.27907699814084...|      Maureen Holmes|     F|                      3|                   GH|
|6260a

## Remember that VETTING_LEVEL has three different statuses:
#### 10 - HIGH
#### 20 - MEDIUM
#### 30 - LOW


### Print the number of rows predicted for each vetting status. The number of predicted rows is low because we only have a few vetted records.

In [29]:
# Count the predictions in newPreds; predCategory holds the original vetting status as a string
print('The number of records in the scored data set is {}.'.format(newPreds.count()))
print('The number of rows predicted high is {}.'.format(newPreds.filter(newPreds['predCategory'] == '10.0').count()))
print('The number of rows predicted medium is {}.'.format(newPreds.filter(newPreds['predCategory'] == '20.0').count()))
print('The number of rows predicted low is {}.'.format(newPreds.filter(newPreds['predCategory'] == '30.0').count()))


The number of records in the scored data set is 321.


### Select the values we need to join in our next lab to display the results, and write them to the database.

In [30]:
valuesToWrite= newPreds.select("UUID",  "predCategory")

valuesToWrite.write.jdbc(credentials_1["jdbcurl"], "FEMALE_TRAFFICKING_ML_RESULTS", properties = {"user" : credentials_1["username"], "password" : credentials_1["password"]}, mode="overwrite")
