In [None]:
def display(*args, **kargs):
    """No-op stand-in for `display`: accepts any arguments, does nothing, returns None.

    NOTE(review): presumably this lets the notebook run where no real `display`
    helper is available -- confirm against the target environment.
    """
    return None

# Decision Trees
 
This lab covers decision trees and random forests, while introducing metadata, cross-validation, `StringIndexer`, and `PolynomialExpansion`.

#### Prepare the data

Load in the data from a parquet file.

In [None]:
# Directory that holds the lab's parquet datasets.
baseDir = '/mnt/ml-class/'
irisFourFeatures = sqlContext.read.parquet('{0}{1}'.format(baseDir, 'irisFourFeatures.parquet'))

# Peek at the first two rows, one repr per line.
print('\n'.join(repr(row) for row in irisFourFeatures.take(2)))

Convert the data from `SparseVector` to `DenseVector` types.

In [None]:
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors, VectorUDT, DenseVector

def _toDenseVector(vector):
    """Return a dense vector built from `vector.toArray()`."""
    return Vectors.dense(vector.toArray())

# Wrap the converter as a Spark SQL UDF whose result is a vector-typed column.
sparseToDense = udf(_toDenseVector, VectorUDT())
denseFeatures = sparseToDense('features').alias('features')
irisDense = irisFourFeatures.select(denseFeatures, 'label')

# Show the first two converted rows, one repr per line.
print('\n'.join(repr(row) for row in irisDense.take(2)))

Save the new format for use in another notebook (the write below is commented out; uncomment it to actually persist the data).

In [None]:
#irisDense.write.mode('overwrite').parquet('/tmp/irisDense.parquet')

Visualize the data.

In [None]:
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import numpy as np

def prepareSubplot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                gridWidth=1.0, subplots=(1, 1)):
    """Create a figure whose subplots all share the same grid-only styling.

    The current pyplot figure is closed first, so every call starts from a fresh figure.

    Args:
        xticks: Tick positions applied to the x-axis of every subplot.
        yticks: Tick positions applied to the y-axis of every subplot.
        figsize: (width, height) forwarded to `plt.subplots`.
        hideLabels: When True, the tick labels on both axes are blanked.
        gridColor: Color of the grid lines.
        gridWidth: Line width of the grid lines.
        subplots: (rows, columns) layout of the subplot grid.

    Returns:
        A `(fig, axList)` tuple.  `axList` is a single axes object when the layout
        contains exactly one subplot; otherwise it is the array of axes produced by
        `plt.subplots`.
    """
    plt.close()
    fig, axList = plt.subplots(subplots[0], subplots[1], figsize=figsize, facecolor='white',
                               edgecolor='white')
    # A 1x1 layout yields a bare axes object; wrap it so the styling loop can treat
    # every layout uniformly.
    if not isinstance(axList, np.ndarray):
        axList = np.array([axList])

    for ax in axList.flatten():
        ax.axes.tick_params(labelcolor='#999999', labelsize='10')
        for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
            axis.set_ticks_position('none')
            axis.set_ticks(ticks)
            axis.label.set_color('#999999')
            if hideLabels:
                axis.set_ticklabels([])
        ax.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
        # Use an explicit loop rather than map(): map is lazy on Python 3, so calling it
        # only for its side effects would silently leave the spines visible.
        for position in ['bottom', 'top', 'left', 'right']:
            ax.spines[position].set_visible(False)

    if axList.size == 1:
        axList = axList[0]  # Just return a single axes object for a regular plot
    return fig, axList

In [None]:
# Collect every row and split it into feature vectors and labels.
data = irisDense.collect()
features, labels = zip(*data)
# Transpose the feature vectors into one tuple per feature.
x1, y1, x2, y2 = zip(*features)

colorMap = 'Set1'  # was 'Set2', 'Set1', 'Dark2', 'winter'

tickPositions = np.arange(-1, 1.1, .2)
fig, axList = prepareSubplot(tickPositions, tickPositions, figsize=(11., 5.), subplots=(1, 2))
ax0, ax1 = axList

# One scatter panel per feature pair: sepal measurements on the left, petal on the right.
panels = [(ax0, x1, y1, 'Sepal Length', 'Sepal Width'),
          (ax1, x2, y2, 'Petal Length', 'Petal Width')]
for ax, xs, ys, xLabel, yLabel in panels:
    ax.scatter(xs, ys, s=14**2, c=labels, edgecolors='#444444', alpha=0.80, cmap=colorMap)
    ax.set_xlabel(xLabel)
    ax.set_ylabel(yLabel)

fig.tight_layout()

display(fig)

Split the data into train and test sets and visualize the datasets.

In [None]:
# Split the rows with weights 0.30 (test) and 0.70 (train); the fixed seed keeps the
# split repeatable across runs.
irisTest, irisTrain = irisDense.randomSplit([.30, .70], seed=1)
# Both splits are reused by many later cells, so cache them.
irisTest.cache()
irisTrain.cache()

print('Items in test dataset: {0}'.format(irisTest.count()))
print('Items in train dataset: {0}'.format(irisTrain.count()))

In [None]:
# Collect the training split and unzip it into one coordinate tuple per feature.
dataTrain = irisTrain.collect()
featuresTrain, labelsTrain = zip(*dataTrain)
x1Train, y1Train, x2Train, y2Train = zip(*featuresTrain)

# Same unpacking for the test split.
dataTest = irisTest.collect()
featuresTest, labelsTest = zip(*dataTest)
x1Test, y1Test, x2Test, y2Test = zip(*featuresTest)

sepalAxes = ('Sepal Length', 'Sepal Width')
petalAxes = ('Petal Length', 'Petal Width')

# Each entry is (x values, y values, labels, title, x-axis label, y-axis label).
trainPlot1 = (x1Train, y1Train, labelsTrain, 'Train Data') + sepalAxes
trainPlot2 = (x2Train, y2Train, labelsTrain, 'Train Data') + petalAxes
testPlot1 = (x1Test, y1Test, labelsTest, 'Test Data') + sepalAxes
testPlot2 = (x2Test, y2Test, labelsTest, 'Test Data') + petalAxes

# Order matches the 2x2 grid when flattened: sepal train/test, then petal train/test.
plotData = [trainPlot1, testPlot1, trainPlot2, testPlot2]

In [None]:
gridTicks = np.arange(-1, 1.1, .2)
fig, axList = prepareSubplot(gridTicks, gridTicks, figsize=(11.,10.), subplots=(2, 2))

# Pair each axes object with its (x, y, labels, title, x-label, y-label) description.
for ax, (xs, ys, pointLabels, title, xLabel, yLabel) in zip(axList.flatten(), plotData):
    ax.scatter(xs, ys, s=14**2, c=pointLabels, edgecolors='#444444', alpha=0.80, cmap=colorMap)
    ax.set_xlabel(xLabel)
    ax.set_ylabel(yLabel)
    ax.set_title(title, color='#999999')

    # Pad the limits slightly past the outermost ticks.
    ax.set_xlim((-1.1, 1.1))
    ax.set_ylim((-1.1, 1.1))

fig.tight_layout()

display(fig)

#### Update the metadata for decision trees and build a tree

We use `StringIndexer` on our labels in order to obtain a `DataFrame` that decision trees can work with.  Here are the [Python](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer) and [Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.StringIndexer) APIs for `StringIndexer`.
 
You'll need to set the input column to "label", the output column to "indexed", and fit and transform using the `irisTrain` `DataFrame`.

In [None]:
# ANSWER
from pyspark.ml.feature import StringIndexer

# Index the values in 'label' and write the result to a new 'indexed' column.
stringIndexer = StringIndexer(inputCol='label', outputCol='indexed')

# Fit the indexer on the training split, then apply it to that same split.
indexerModel = stringIndexer.fit(irisTrain)
irisTrainIndexed = indexerModel.transform(irisTrain)
display(irisTrainIndexed)

In [None]:
# TEST
from test_helper import Test
# The last of the first 50 'indexed' values is expected to be 2.0.
lastOfFifty = irisTrainIndexed.select('indexed').take(50)[-1]
Test.assertEquals(lastOfFifty[0], 2.0, 'incorrect values in indexed column')

# The third schema field must carry non-empty metadata.
indexedMetadata = irisTrainIndexed.schema.fields[2].metadata
Test.assertTrue(indexedMetadata != {}, 'indexed should have metadata')

We've updated the metadata for the field.  Now we know that the field takes on three values and is nominal.

In [None]:
# Compare the metadata of the second and third schema fields.
schemaFields = irisTrainIndexed.schema.fields
print(schemaFields[1].metadata)
print(schemaFields[2].metadata)

Let's build a decision tree to classify our data.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Print the built-in documentation for the classifier before configuring it.
help(DecisionTreeClassifier)

In [None]:
# Decision tree trained against the 'indexed' label column, using Gini impurity,
# a maximum depth of 5 and at most 10 bins.
dt = DecisionTreeClassifier(labelCol='indexed',
                            maxDepth=5,
                            maxBins=10,
                            impurity='gini')

In [None]:
# Describe two of the parameters set above, separated by a blank line.
print(dt.explainParam('impurity'))
print('\n' + dt.explainParam('maxBins'))

View all of the parameters to see if there is anything we'd like to update.

In [None]:
# Print the classifier's explanation of all of its parameters.
print(dt.explainParams())

Fit the model and display predictions on the test data.

In [None]:
dtModel = dt.fit(irisTrainIndexed)

# The test split needs the same 'indexed' column before the tree can score it.
irisTestIndexed = indexerModel.transform(irisTest)
predictionsTest = dtModel.transform(irisTestIndexed)
display(predictionsTest)

Next, we'll evaluate the results of the model.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Evaluator that scores predictions against the 'indexed' column with the 'precision' metric.
multiEval = MulticlassClassificationEvaluator(metricName='precision', labelCol='indexed')

print(multiEval.evaluate(predictionsTest))

View the decision tree model.

In [None]:
# Ask the wrapped Java model object for its debug string.
dtModelString = dtModel._java_obj.toDebugString()
print(dtModelString)

In [None]:
# Swap each 'feature N' placeholder in the debug string for the name of feature N.
featureNames = ['Sepal Length', 'Sepal Width', 'Petal Length', 'Petal Width']
readableModel = dtModelString
for position in range(len(featureNames)):
    placeholder = 'feature {0}'.format(position)
    readableModel = readableModel.replace(placeholder, featureNames[position])

print(readableModel)

#### Cross-validation

Let's go ahead and find the best cross-validated model.  A `CrossValidator` requires an estimator to build the models, an evaluator to compare the performance of the models, a parameter grid that specifies which estimator parameters to tune, and the number of folds to use.
 
There is a good example in the [ML Guide](http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-cross-validation), although it is only in Scala.  The Python code is very similar.
 
The estimator that we will use is a `Pipeline` that has `stringIndexer` and `dt`.
 
The evaluator will be `multiEval`.  You just need to make sure the metric is "precision".
 
We'll use `ParamGridBuilder` to build a grid with `dt.maxDepth` values of 2, 4, 6, and 10 (in that order).
 
Finally, we'll use 5 folds.

In [None]:
# ANSWER
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.pipeline import Pipeline

# Estimator under test: index the labels, then fit the decision tree.
cvPipeline = Pipeline(stages=[stringIndexer, dt])

multiEval.setMetricName('precision')

# Candidate tree depths to compare.
gridBuilder = ParamGridBuilder()
gridBuilder.addGrid(dt.maxDepth, [2, 4, 6, 10])
paramGrid = gridBuilder.build()

# 5-fold cross-validation over the depth grid, scored by multiEval.
cv = CrossValidator(estimator=cvPipeline,
                    evaluator=multiEval,
                    estimatorParamMaps=paramGrid,
                    numFolds=5)

cvModel = cv.fit(irisTrain)
predictions = cvModel.transform(irisTest)
print(multiEval.evaluate(predictions))

In [None]:
# TEST
# The cross-validated model's test metric should round to 0.98.
roundedScore = round(multiEval.evaluate(predictions), 2)
Test.assertEquals(roundedScore, .98, 'incorrect predictions')

What was our best model?

In [None]:
# The decision tree is the last stage of the best pipeline model.
bestStages = cvModel.bestModel.stages
bestDTModel = bestStages[-1]
print(bestDTModel)

Let's see more details on what parameters were used to build the best model.

In [None]:
# Go through the wrapped Java object's parent() to list its parameters.
print(bestDTModel._java_obj.parent().explainParams())

In [None]:
# Max depth reported by the best model's parent, i.e. the depth the search selected.
print(bestDTModel._java_obj.parent().getMaxDepth())

#### Random forest and `PolynomialExpansion`
 
Next, we'll build a random forest.  Since we only have a few features and random forests tend to work better with a lot of features, we'll expand our features using `PolynomialExpansion`.

In [None]:
from pyspark.ml.feature import PolynomialExpansion

# Expand 'features' into 'polyFeatures'; every other parameter keeps its default.
px = PolynomialExpansion(inputCol='features', outputCol='polyFeatures')

print(px.explainParams())

Next, we'll use the `RandomForestClassifier` to build our random forest model.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest that learns 'indexed' from the expanded 'polyFeatures' column.
rf = RandomForestClassifier(labelCol='indexed', featuresCol='polyFeatures')

print(rf.explainParams())

Let's set some params based on what we just read.

In [None]:
# Each setter returns the classifier itself, so the final call is also the cell's value.
rf.setMaxBins(10)
rf.setMaxDepth(2)
rf.setNumTrees(20)
rf.setSeed(0)

Next, we'll build a pipeline that includes the `StringIndexer`, `PolynomialExpansion`, and `RandomForestClassifier`.

In [None]:
# Stages run in order: index the labels, expand the features, fit the forest.
rfStages = [stringIndexer, px, rf]
rfPipeline = Pipeline(stages=rfStages)
rfModelPipeline = rfPipeline.fit(irisTrain)
rfPredictions = rfModelPipeline.transform(irisTest)

print(multiEval.evaluate(rfPredictions))

In [None]:
# Pass the random forest pipeline's test-set predictions to display().
display(rfPredictions)

So what exactly did `PolynomialExpansion` do?

All pairwise interactions of the four features (call them a, b, c, and d):
 
\\[ \begin{bmatrix} a \times a & b \times a & c \times a & d \times a \\\ a \times b & b \times b & c \times b & d \times b \\\ a \times c & b \times c & c \times c & d \times c \\\ a \times d & b \times d & c \times d & d \times d \end{bmatrix}  \\]
 
Remove the duplicates (for example, a × b and b × a are the same term):
 
\\[ \begin{bmatrix} a \times a \\\ a \times b & b \times b \\\ a \times c & b \times c & c \times c \\\ a \times d & b \times d & c \times d & d \times d \end{bmatrix}  \\]
 
Plus the original features:
 
\\[ \begin{bmatrix} a & b & c & d \end{bmatrix} \\]

Can we do better?  Let's build a grid of params and search using `CrossValidator`.

In [None]:
# Search over tree depth while pinning the forest size at 20 trees.
# baseOn needs a dict mapping param -> value; a set literal such as {param, value}
# would be unpacked in arbitrary order, so the param and its value could be swapped.
paramGridRand = (ParamGridBuilder()
                 .addGrid(rf.maxDepth, [2, 4, 8, 12])
                 .baseOn({rf.numTrees: 20})
                 .build())

# 2-fold cross-validation of the full random forest pipeline, scored by multiEval.
cvRand = (CrossValidator()
          .setEstimator(rfPipeline)
          .setEvaluator(multiEval)
          .setEstimatorParamMaps(paramGridRand)
          .setNumFolds(2))

cvModelRand = cvRand.fit(irisTrain)
predictionsRand = cvModelRand.transform(irisTest)
print(multiEval.evaluate(predictionsRand))
# Max depth reported by the parent of the best pipeline's final (forest) stage.
print(cvModelRand.bestModel.stages[-1]._java_obj.parent().getMaxDepth())

Finally, let's view the resulting model.

In [None]:
# Dump the debug string of the best pipeline's final (random forest) stage.
bestForestModel = cvModelRand.bestModel.stages[-1]
print(bestForestModel._java_obj.toDebugString())