Links to other notebooks in the same folder:
<a href='http://pivotal.io/data-science'><img src='https://raw.githubusercontent.com/crawles/Logos/master/Pivotal_TealOnWhite.png' width='200px' align='right'></a>

<nav class="navbar navbar-light bg-faded">
    <ul class="nav navbar-nav">
        <li class="">
            <a class="nav-link" href="MLlib Example.ipynb">MLlib Example</a>
        </li>
        <li class="">
            <a class="nav-link">ML Example</a>
        </li>
    </ul>
</nav>


# Table of Contents
 <p><div class="lev1 toc-item"><a href="#Import-useful-libraries" data-toc-modified-id="Import-useful-libraries-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Import useful libraries</a></div><div class="lev1 toc-item"><a href="#Data" data-toc-modified-id="Data-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Data</a></div><div class="lev2 toc-item"><a href="#Spark-DataFrames" data-toc-modified-id="Spark-DataFrames-21"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Spark DataFrames</a></div><div class="lev1 toc-item"><a href="#Operations-on-DataFrames" data-toc-modified-id="Operations-on-DataFrames-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Operations on DataFrames</a></div><div class="lev1 toc-item"><a href="#Modelling" data-toc-modified-id="Modelling-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Modelling</a></div><div class="lev1 toc-item"><a href="#Pipeline" data-toc-modified-id="Pipeline-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Pipeline</a></div><div class="lev1 toc-item"><a href="#Extracting-the-Probability" data-toc-modified-id="Extracting-the-Probability-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Extracting the Probability</a></div>

# Import useful libraries

In [1]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
import getopt
import os
import sys
import urllib

from IPython.core.display import display, HTML
from IPython.core.magic import register_cell_magic, register_line_cell_magic,\
                               register_line_magic
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandas.io.sql as psql
import psycopg2
import seaborn as sns

from pyspark.ml.classification import LogisticRegression,\
                                      RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql.functions import col

In [2]:
# Changes logo to a Pivotal logo
jPrefs = urllib.urlopen("https://raw.githubusercontent.com/crawles/Logos/master/jupyterPrefs.js").read()
HTML('<script>{}</script>'.format(jPrefs))

In [3]:
# Set default cell width
display(HTML('<style>.container {width:80% !important;}</style>'))

# Set default matplotlib settings
plt.rcParams['figure.figsize'] = (10, 7)
plt.rcParams['lines.linewidth'] = 3
plt.rcParams['figure.titlesize'] = 26
plt.rcParams['axes.labelsize'] = 18
plt.rcParams['axes.titlesize'] = 22
plt.rcParams['xtick.labelsize'] = 14
plt.rcParams['ytick.labelsize'] = 14
plt.rcParams['legend.fontsize'] = 16

# Set seaborn colours
blue, green, red, purple, yellow, cyan = sns.color_palette('deep')

# Data

In [4]:
column_names = ["sex", "length", "diameter", "height", "whole weight", 
                "shucked weight", "viscera weight", "shell weight", "rings"]
abalone_df = pd.read_csv('abalone.csv', names=column_names)
abalone_df['sex'] = abalone_df['sex'].map({'F': 0, 'I': 1, 'M': 2})
abalone_df.head()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings
0,2,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,2,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,0,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,2,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,1,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


## Spark DataFrames
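Spark is moving towards DataFrames as opposed to RDDs. The `pyspark.ml` package used in this notebook is the DataFrame-based API, whereas `pyspark.mllib` operates on RDDs. Background reading: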
- <a href="https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html">https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html</a>
- <a href="https://www.quora.com/Why-are-there-two-ML-implementations-in-Spark-ML-and-MLlib-and-what-are-their-different-features">https://www.quora.com/Why-are-there-two-ML-implementations-in-Spark-ML-and-MLlib-and-what-are-their-different-features</a>

We will use the abalone data set for this example. We first take the Pandas DataFrame containing the abalone data and create a Spark DataFrame out of it. 

In [5]:
abalone_sdf = sqlContext.createDataFrame(abalone_df)

Under the RDD framework, the way to look at a subset of the data is to use the `take` function.

In [6]:
abalone_sdf.take(5)

[Row(sex=2, length=0.455, diameter=0.365, height=0.095, whole weight=0.514, shucked weight=0.2245, viscera weight=0.10099999999999999, shell weight=0.15, rings=15),
 Row(sex=2, length=0.35, diameter=0.265, height=0.09, whole weight=0.2255, shucked weight=0.0995, viscera weight=0.0485, shell weight=0.07, rings=7),
 Row(sex=0, length=0.53, diameter=0.42, height=0.135, whole weight=0.677, shucked weight=0.2565, viscera weight=0.1415, shell weight=0.21, rings=9),
 Row(sex=2, length=0.44, diameter=0.365, height=0.125, whole weight=0.516, shucked weight=0.2155, viscera weight=0.114, shell weight=0.155, rings=10),
 Row(sex=1, length=0.33, diameter=0.255, height=0.08, whole weight=0.205, shucked weight=0.0895, viscera weight=0.0395, shell weight=0.055, rings=7)]

However, this returns a list of `Row` objects, which is not very pretty. We can instead take a `limit` and then use the `toPandas` function to convert the Spark DataFrame to a Pandas DataFrame. Taking the `limit` first keeps the amount of data pulled onto the driver small.

In [7]:
# Convert to Pandas DataFrame
abalone_sdf.limit(5).toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings
0,2,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,2,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,0,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,2,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,1,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


# Operations on DataFrames

In [8]:
# Apply filters using SQL syntax. Cannot put a ';' at the end of the
# query or it will throw an error.
abalone_sdf.registerTempTable('abalone')
sql = '''
SELECT *
  FROM abalone
 WHERE length > 0.3
'''
sqlContext.sql(sql)\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings
0,2,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,2,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,0,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,2,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,1,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


In [9]:
# We can do the same thing in pure Spark.
abalone_sdf\
    .where(col('length') > 0.3)\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings
0,2,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,2,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,0,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,2,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,1,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


In [10]:
abalone_sdf\
    .select('sex', 'length')\
    .orderBy('length')\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length
0,1,0.075
1,1,0.11
2,1,0.13
3,1,0.13
4,1,0.135


In [11]:
abalone_sdf\
    .select(['sex', 'length'])\
    .orderBy('length')\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length
0,1,0.075
1,1,0.11
2,1,0.13
3,1,0.13
4,1,0.135


In [12]:
train_sdf, test_sdf = abalone_sdf.randomSplit([0.8, 0.2])
test_sdf\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings
0,0,0.325,0.26,0.09,0.1915,0.085,0.036,0.062,7
1,0,0.345,0.25,0.09,0.203,0.078,0.059,0.055,6
2,0,0.36,0.265,0.09,0.2065,0.078,0.057,0.06,8
3,0,0.37,0.275,0.1,0.2225,0.093,0.026,0.08,8
4,0,0.37,0.28,0.11,0.2305,0.0945,0.0465,0.075,10
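
The split sizes produced by `randomSplit` are only approximately 80/20, because each row is assigned at random, and the weights are normalized if they do not sum to 1. The following is a minimal plain-Python sketch of that idea, not Spark's actual implementation; `random_split` is a hypothetical helper written for illustration.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split according to normalized weights."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            # The last split catches any floating-point leftovers.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
```

Passing a fixed seed makes the split reproducible. Spark's `randomSplit` also accepts a `seed` argument, which is worth setting if the train/test split needs to be the same between runs.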


# Modelling

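Modelling requires two input columns: a label column and a features column (named `label` and `features` by default). We need to take all of our feature columns and group them together into a single vector column. We use the `VectorAssembler` transformer to achieve this. Note that `train_sdf.columns[1:]` selects every column except `sex`, which is the value we want to predict.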

In [13]:
assembler = VectorAssembler(inputCols=train_sdf.columns[1:],
                            outputCol='features'
                           )

train_assemble_sdf = assembler.transform(train_sdf)
test_assemble_sdf = assembler.transform(test_sdf)

train_assemble_sdf\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings,features
0,0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025,5,"[0.275, 0.195, 0.07, 0.08, 0.031, 0.0215, 0.02..."
1,0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048,7,"[0.305, 0.23, 0.08, 0.156, 0.0675, 0.0345, 0.0..."
2,0,0.345,0.26,0.09,0.207,0.0775,0.0435,0.0765,10,"[0.345, 0.26, 0.09, 0.207, 0.0775, 0.0435, 0.0..."
3,0,0.36,0.27,0.09,0.1885,0.0845,0.0385,0.055,5,"[0.36, 0.27, 0.09, 0.1885, 0.0845, 0.0385, 0.0..."
4,0,0.37,0.29,0.115,0.25,0.111,0.057,0.075,9,"[0.37, 0.29, 0.115, 0.25, 0.111, 0.057, 0.075,..."
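
To make the assembling step concrete, here is a plain-Python sketch of what happens to a single row. The `assemble` helper is hypothetical and only mimics the idea; the values are the first row of the abalone data shown earlier.

```python
def assemble(row, input_cols):
    """Collect the chosen columns of one row into a single list of floats."""
    return [float(row[c]) for c in input_cols]


columns = ["sex", "length", "diameter", "height", "whole weight",
           "shucked weight", "viscera weight", "shell weight", "rings"]
first_row = dict(zip(columns,
                     [2, 0.455, 0.365, 0.095, 0.514, 0.2245, 0.101, 0.15, 15]))

# columns[1:] drops 'sex' (the label) and keeps everything else,
# including 'rings'.
features = assemble(first_row, columns[1:])
```

The resulting vector has eight entries, one per feature column, which is what the truncated `features` column in the output above contains.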


Next, we need to convert our label using `StringIndexer`, which maps categorical values to numeric indices. The indices are ordered by frequency, so the most frequent value gets index 0.0.

In [14]:
string_indexer = StringIndexer(inputCol='sex', outputCol='sex_label')
si_model = string_indexer.fit(train_assemble_sdf)
train_index_sdf = si_model.transform(train_assemble_sdf)
test_index_sdf = si_model.transform(test_assemble_sdf)

train_index_sdf\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings,features,sex_label
0,0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025,5,"[0.275, 0.195, 0.07, 0.08, 0.031, 0.0215, 0.02...",2.0
1,0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048,7,"[0.305, 0.23, 0.08, 0.156, 0.0675, 0.0345, 0.0...",2.0
2,0,0.345,0.26,0.09,0.207,0.0775,0.0435,0.0765,10,"[0.345, 0.26, 0.09, 0.207, 0.0775, 0.0435, 0.0...",2.0
3,0,0.36,0.27,0.09,0.1885,0.0845,0.0385,0.055,5,"[0.36, 0.27, 0.09, 0.1885, 0.0845, 0.0385, 0.0...",2.0
4,0,0.37,0.29,0.115,0.25,0.111,0.057,0.075,9,"[0.37, 0.29, 0.115, 0.25, 0.111, 0.057, 0.075,...",2.0
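
The frequency ordering explains why `sex=0` ends up with `sex_label=2.0` above: it is the least frequent value in the training set. A rough plain-Python sketch of the idea follows; `fit_string_indexer` is a hypothetical helper and the toy data is made up for illustration.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: -counts[v])
    return {value: float(i) for i, value in enumerate(ordered)}


# Hypothetical toy column: 2 appears three times, 1 twice, 0 once.
sexes = [2, 2, 2, 1, 1, 0]
mapping = fit_string_indexer(sexes)
```

Because the mapping is learned from the training set in the `fit` step, the same fitted model must be used to transform the test set, as is done in the cell above.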


Finally, we can set up and run our model.

In [15]:
# Set up model
rf_model = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol='sex_label')
# Train the model
rf_clf = rf_model.fit(train_index_sdf)
rf_clf

RandomForestClassificationModel (uid=rfc_8399174cde78) with 3 trees

Now, we apply the model to the test set.

In [16]:
rf_results = rf_clf.transform(test_index_sdf)
rf_results\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings,features,sex_label,rawPrediction,probability,prediction
0,0,0.325,0.26,0.09,0.1915,0.085,0.036,0.062,7,"[0.325, 0.26, 0.09, 0.1915, 0.085, 0.036, 0.06...",2.0,"[0.450149354117, 2.38000046113, 0.169850184758]","[0.150049784706, 0.793333487042, 0.0566167282528]",1.0
1,0,0.345,0.25,0.09,0.203,0.078,0.059,0.055,6,"[0.345, 0.25, 0.09, 0.203, 0.078, 0.059, 0.055...",2.0,"[0.450149354117, 2.38000046113, 0.169850184758]","[0.150049784706, 0.793333487042, 0.0566167282528]",1.0
2,0,0.36,0.265,0.09,0.2065,0.078,0.057,0.06,8,"[0.36, 0.265, 0.09, 0.2065, 0.078, 0.057, 0.06...",2.0,"[0.450149354117, 2.38000046113, 0.169850184758]","[0.150049784706, 0.793333487042, 0.0566167282528]",1.0
3,0,0.37,0.275,0.1,0.2225,0.093,0.026,0.08,8,"[0.37, 0.275, 0.1, 0.2225, 0.093, 0.026, 0.08,...",2.0,"[0.450149354117, 2.38000046113, 0.169850184758]","[0.150049784706, 0.793333487042, 0.0566167282528]",1.0
4,0,0.37,0.28,0.11,0.2305,0.0945,0.0465,0.075,10,"[0.37, 0.28, 0.11, 0.2305, 0.0945, 0.0465, 0.0...",2.0,"[0.605302779606, 1.93192397624, 0.462773244154]","[0.201767593202, 0.643974658747, 0.154257748051]",1.0
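
The three prediction columns are related. For this random forest, the `rawPrediction` values in the output above sum to roughly 3 (the number of trees), and `probability` appears to be the same vector rescaled to sum to 1, with `prediction` being the index of the largest entry. The sketch below checks this against the first output row; it is plain Python and assumes that simple normalization is all that is going on.

```python
def normalize(raw):
    """Rescale a list of non-negative scores so that they sum to 1."""
    total = sum(raw)
    return [r / total for r in raw]


# rawPrediction from the first row of the output above.
raw = [0.450149354117, 2.38000046113, 0.169850184758]
prob = normalize(raw)

# The predicted class is the index with the highest probability.
prediction = float(prob.index(max(prob)))
```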


# Pipeline
In each of the steps above (assembling, indexing, and modelling), we had to save a different object, then fit and/or transform with it. We can circumvent this by using the `Pipeline` class, which allows us to specify a set of these objects as a series of stages, then fit the pipeline as a whole. Here, we will create a pipeline for assembling and indexing. We could also put the model into this pipeline, but if we did, the fitted model object that gives us other properties such as feature importances or regression coefficients would be less convenient to reach, since it would have to be pulled back out of the fitted pipeline's `stages`.

In [17]:
assembler = VectorAssembler(inputCols=train_sdf.columns[1:],
                            outputCol='features'
                           )
string_indexer = StringIndexer(inputCol='sex', outputCol='label')
# The indexer above now writes to 'label', so the classifier must read from it
rf_model = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol='label')

pre_process_pipeline = Pipeline(stages=[assembler, string_indexer])

We can apply the pipeline to the original training set, that is, the one without the features assembled or label indexed.

In [18]:
pre_process_model = pre_process_pipeline.fit(train_sdf)
pre_process_train_sdf = pre_process_model.transform(train_sdf)
pre_process_test_sdf = pre_process_model.transform(test_sdf)
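
To illustrate what fitting a pipeline does, here is a toy plain-Python sketch: each stage is either a transformer (only `transform`) or an estimator (`fit` returns a fitted transformer), and the pipeline feeds the output of each stage into the next. All of the `Toy*` classes are hypothetical stand-ins and are far simpler than Spark's own classes.

```python
from collections import Counter


class ToyAssembler(object):
    """Transformer: nothing to learn, so it only defines transform()."""
    def __init__(self, input_cols, output_col):
        self.input_cols, self.output_col = input_cols, output_col

    def transform(self, rows):
        return [dict(r, **{self.output_col: [r[c] for c in self.input_cols]})
                for r in rows]


class ToyIndexerModel(object):
    """Fitted transformer holding the learned value -> index mapping."""
    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col = input_col, output_col
        self.mapping = mapping

    def transform(self, rows):
        return [dict(r, **{self.output_col: self.mapping[r[self.input_col]]})
                for r in rows]


class ToyIndexer(object):
    """Estimator: fit() learns the mapping and returns a fitted model."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        counts = Counter(r[self.input_col] for r in rows)
        ordered = sorted(counts, key=lambda v: -counts[v])
        mapping = {v: float(i) for i, v in enumerate(ordered)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyPipelineModel(object):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline(object):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; transformers are used as-is.
            model = stage.fit(rows) if hasattr(stage, 'fit') else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)


train = [{'sex': 2, 'length': 0.455}, {'sex': 2, 'length': 0.35},
         {'sex': 0, 'length': 0.53}]
pipeline = ToyPipeline([ToyAssembler(['length'], 'features'),
                        ToyIndexer('sex', 'label')])
model = pipeline.fit(train)
out = model.transform([{'sex': 0, 'length': 0.44}])
```

The key point is that anything learned from the data (here, the index mapping) is learned once from the training rows and then reused unchanged on new rows.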

In [19]:
train_sdf\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings
0,0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025,5
1,0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048,7
2,0,0.345,0.26,0.09,0.207,0.0775,0.0435,0.0765,10
3,0,0.36,0.27,0.09,0.1885,0.0845,0.0385,0.055,5
4,0,0.37,0.29,0.115,0.25,0.111,0.057,0.075,9


In [20]:
pre_process_train_sdf\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings,features,label
0,0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025,5,"[0.275, 0.195, 0.07, 0.08, 0.031, 0.0215, 0.02...",2.0
1,0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048,7,"[0.305, 0.23, 0.08, 0.156, 0.0675, 0.0345, 0.0...",2.0
2,0,0.345,0.26,0.09,0.207,0.0775,0.0435,0.0765,10,"[0.345, 0.26, 0.09, 0.207, 0.0775, 0.0435, 0.0...",2.0
3,0,0.36,0.27,0.09,0.1885,0.0845,0.0385,0.055,5,"[0.36, 0.27, 0.09, 0.1885, 0.0845, 0.0385, 0.0...",2.0
4,0,0.37,0.29,0.115,0.25,0.111,0.057,0.075,9,"[0.37, 0.29, 0.115, 0.25, 0.111, 0.057, 0.075,...",2.0


In [21]:
lr_model = LogisticRegression(maxIter=1000, regParam=1.0)
lr_clf = lr_model.fit(pre_process_train_sdf)

In [22]:
lr_clf.coefficientMatrix

DenseMatrix(3, 8, [0.2687, 0.3461, 0.7352, 0.0898, 0.2073, 0.3757, 0.2823, 0.0102, ..., 0.4235, 0.5437, 1.2315, 0.0982, 0.1656, 0.5003, 0.3661, 0.0169], 1)
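
The coefficient matrix has one row per class (3) and one column per feature (8). A multinomial logistic regression turns a feature vector into one score per class by multiplying it with this matrix and adding an intercept per class. The sketch below shows that computation on a small made-up 2x3 example; it assumes the values are laid out row by row, and the `margins` helper and all numbers are hypothetical.

```python
def margins(values, num_rows, num_cols, intercepts, features):
    """Compute W.x + b for a matrix W whose values are stored row by row."""
    out = []
    for r in range(num_rows):
        row = values[r * num_cols:(r + 1) * num_cols]
        out.append(sum(w * x for w, x in zip(row, features)) + intercepts[r])
    return out


# Hypothetical 2-class, 3-feature example (not the fitted model above).
values = [1.0, 0.0, 2.0,
          0.0, 3.0, 1.0]
result = margins(values, 2, 3, [0.5, -0.5], [1.0, 2.0, 3.0])
```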

In [23]:
test_pred_sdf = lr_clf.transform(pre_process_test_sdf)
test_pred_sdf\
    .limit(5)\
    .toPandas()

Unnamed: 0,sex,length,diameter,height,whole weight,shucked weight,viscera weight,shell weight,rings,features,label,rawPrediction,probability,prediction
0,0,0.325,0.26,0.09,0.1915,0.085,0.036,0.062,7,"[0.325, 0.26, 0.09, 0.1915, 0.085, 0.036, 0.06...",2.0,"[-0.236427150554, 0.786129248705, -0.549702098...","[0.221663526142, 0.616289600271, 0.162046873587]",1.0
1,0,0.345,0.25,0.09,0.203,0.078,0.059,0.055,6,"[0.345, 0.25, 0.09, 0.203, 0.078, 0.059, 0.055...",2.0,"[-0.238440021138, 0.793063769722, -0.554623748...","[0.220545744487, 0.6186928353, 0.160761420214]",1.0
2,0,0.36,0.265,0.09,0.2065,0.078,0.057,0.06,8,"[0.36, 0.265, 0.09, 0.2065, 0.078, 0.057, 0.06...",2.0,"[-0.207896524299, 0.713098196191, -0.505201671...","[0.235039446423, 0.59036918451, 0.174591369068]",1.0
3,0,0.37,0.275,0.1,0.2225,0.093,0.026,0.08,8,"[0.37, 0.275, 0.1, 0.2225, 0.093, 0.026, 0.08,...",2.0,"[-0.195848572141, 0.683193983099, -0.487345410...","[0.240631211284, 0.579582532645, 0.179786256071]",1.0
4,0,0.37,0.28,0.11,0.2305,0.0945,0.0465,0.075,10,"[0.37, 0.28, 0.11, 0.2305, 0.0945, 0.0465, 0.0...",2.0,"[-0.159101073434, 0.588213190107, -0.429112116...","[0.258084962785, 0.54490044452, 0.197014592695]",1.0
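
For the logistic regression, the `probability` column in the output above is consistent with applying a softmax to `rawPrediction`. The sketch below checks this on the first output row in plain Python. The third raw value is truncated in the displayed output, so it is used here only to the digits shown and the comparison is approximate.

```python
import math


def softmax(scores):
    """Convert a list of real-valued scores into probabilities."""
    # Subtracting the maximum avoids overflow and does not change the result.
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


# rawPrediction from the first output row; last value is truncated there.
raw = [-0.236427150554, 0.786129248705, -0.549702098]
prob = softmax(raw)
prediction = float(prob.index(max(prob)))
```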


# Extracting the Probability
Here's a really annoying thing about Spark ML. The `rawPrediction` and `probability` columns come as vector types, not array types. Hence, we cannot index them directly.

In [24]:
lr_clf.transform(pre_process_test_sdf)

DataFrame[sex: bigint, length: double, diameter: double, height: double, whole weight: double, shucked weight: double, viscera weight: double, shell weight: double, rings: bigint, features: vector, label: double, rawPrediction: vector, probability: vector, prediction: double]

In [25]:
lr_clf\
    .transform(pre_process_test_sdf)\
    .select(col('probability')[0])

AnalysisException: u"Can't extract value from probability#410;"

In [26]:
lr_clf\
    .transform(pre_process_test_sdf)\
    .selectExpr('probability[0]')

AnalysisException: u"Can't extract value from probability#470;"

A way to get around this is to change the DataFrame into an RDD, convert the column into an array, then change the RDD back into a Spark DataFrame. From here, we can extract individual elements by index.

In [27]:
extracted_pred_sdf = lr_clf\
    .transform(pre_process_test_sdf)\
    .rdd\
    .map(lambda x: Row(probability=[float(i) for i in x.probability],
                       first_value=float(x.probability[0]),
                       array_length=int(len(x.probability))
                      )
        )\
    .toDF()
    
extracted_pred_sdf

DataFrame[array_length: bigint, first_value: double, probability: array<double>]

Now that our `probability` column is an array type, we can index it.

In [28]:
extracted_pred_sdf\
    .select('*', col('probability')[0])\
    .limit(5)\
    .toPandas()

Unnamed: 0,array_length,first_value,probability,probability[0]
0,3,0.221664,"[0.221663526142, 0.616289600271, 0.162046873587]",0.221664
1,3,0.220546,"[0.220545744487, 0.6186928353, 0.160761420214]",0.220546
2,3,0.235039,"[0.235039446423, 0.59036918451, 0.174591369068]",0.235039
3,3,0.240631,"[0.240631211284, 0.579582532645, 0.179786256071]",0.240631
4,3,0.258085,"[0.258084962785, 0.54490044452, 0.197014592695]",0.258085
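
An alternative that avoids the round trip through an RDD is a user-defined function that pulls one element out of the vector. The sketch below defines and checks the element-extraction logic in plain Python, with a list standing in for the Spark vector. The Spark wiring is shown only in comments and is an untested assumption about how it would be used here; `element_at` is a hypothetical helper name.

```python
def element_at(vector, index):
    """Return one element of a vector-like value as a plain Python float."""
    return float(vector[index])


# Plain-Python check; a list stands in for a Spark ML vector.
probs = [0.221663526142, 0.616289600271, 0.162046873587]
first = element_at(probs, 0)

# Assumed Spark wiring (not executed in this sketch):
#   from pyspark.sql.types import DoubleType
#   first_prob = F.udf(lambda v: element_at(v, 0), DoubleType())
#   lr_clf.transform(pre_process_test_sdf)\
#       .select(first_prob(col('probability')).alias('first_value'))
```

On Spark 3.0 and later, `pyspark.ml.functions.vector_to_array` is another option for converting a vector column into an array column that can be indexed directly.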
