# Introduction to PySpark with Jupyter

## Import the module

This is already installed in the docker container, so simply import it here.

## Create a Spark Context

This should only be done once per session.  Guarding the creation with the "try" block ensures that we will only create the context the first time the cell is executed.

In [1]:
import pyspark

# Create a Spark context for local work
try:
    sc
except:
    sc = pyspark.SparkContext('local[*]')


## Prove the module is available

Create a simple example and execute it in order to demonstrate that the module working correctly and the context is configured correctly.

In [2]:
# Prove that Spark is installed and working correctly
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[391, 630, 325, 456, 673]

### Use the Forest CoverType dataset

The _Covtype_ data set is available online from: https://archive.ics.uci.edu/ml/machine-learning-databases/covtype/
The file _covtype.data.gz_ includes the type data and _covtype.info_ includes the metadata.  This is makde available from UC Irvine Machine Learning Repository and was originally provided by Colorado State University.

This dataset has also been used in a *Kaggle* competition (https://www.kaggle.com/c/forest-cover-type-prediction).

First lets make sure that the file is available locally:

In [3]:
import urllib.request

url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/covtype/covtype.data.gz'
localfile = 'data/covtype.data.gz'
localfile, headers = urllib.request.urlretrieve(url, localfile)

Now that we have the data locally, we can grab it through an RDD.  Use a simple textfile RDD since this is an ASCII CSV file:

In [4]:
rawData = sc.textFile(localfile)

Now we can transform the raw data into a sequence of LabeledPoints (from MLLib):

In [5]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Extract the dataset features and target.
def ingest(line):
    # Simple numeric features (some are one-hot encoded).
    # Last field is the label (training target).
    fields = [float(f) for f in line.split(',')]
    features = Vectors.dense(fields[0:len(fields)-1])
    
    # Subtract 1 from the label to satisfy the '0' based
    # DecisionTree model.
    label    = fields[-1] - 1
    return LabeledPoint(label,features)

pointdata = rawData.map(ingest)

Now that we have our data in a format we can train a model with, lets look at a few entries to see if it is as expected:

In [6]:
pointdata.take(5)

[LabeledPoint(4.0, [2596.0,51.0,3.0,258.0,0.0,510.0,221.0,232.0,148.0,6279.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(4.0, [2590.0,56.0,2.0,212.0,-6.0,390.0,220.0,235.0,151.0,6225.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(1.0, [2804.0,139.0,9.0,268.0,65.0,3180.0,234.0,238.0,135.0,6121.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(1.0, [2785.0,155.0,18.0,242.0,118.0,3090.0,238.0,238.0,122.0,6211.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.

and we see that we have a sequence of LabeledPoint objects, each with a label (numeric in this case, since we ensured that the values would be converted to float values).  There is a DenseMatrix element included for each point, with 55 (float) values corresponding to the features.

The first 10 features are numeric.  The next two are categorical and have been encoded as one-hot codes.  The _Wilderness Areas_ feature takes 4 columns, and the _Soil Type_ takes up 40 columns.

The _Cover Type_ factor is used as the label is a numerically encoded categorical factor.  It is important to ensure that this encoded value is not treated as a number - no ordering in the feature is implied.  Each value stands alone and simply indicates a category and not a relationship with any other feature.

### Data Partitioning

At this point, we can partition the data into a training set, a validation set, and a testing set.  We will use the training set to train the model, and the validation set to measure how our training is performing.  We can adjust model hyperparameters to adjust training results as measured by the validation dataset without issue.

We will only run predictions against the testing dataset and no information from testing will be used during training or validation.  This keeps our confidence in the test performance measurements high.

For this example we can simply split the data randomly into the 3 sets of data, with 80% used for training, and 10% used for each of validation and testing:

In [7]:
trainData, cvData, testData = pointdata.randomSplit([0.8,0.1,0.1])
trainData.cache()
cvData.cache()
testData.cache()

print(trainData.count(),cvData.count(),testData.count())

464813 58010 58189


we can see that the data partitioning has separated the original dataset into suitable size segments that we can use for our different purposes.

### Train a Decision Tree Model

At this point we can go ahead and train the model.  Here we are using the Spark MLLib DecisionTree (http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.tree.DecisionTree) and DecisionTreeModel (http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.tree.DecisionTreeModel) for this purpose.  Read more about these and the hyperparameters available from the documentation.

In [8]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

model = DecisionTree.trainClassifier( trainData, 7, {}, "gini", 4, 100)

At this point we have a trained model and we would like to evaluate its performance.  We do this with the validation data.  If we find that we want to increase the performance, we can adjust hyperparameters or even select a different model type depending on the outcome from this evaluation.

Using the MLLib MulticlassMetrics object allows us to very simply evaluate the prediction performance of the model.  Here we perform the predictions, then form a labelled set of results that can be evaluated.

In [9]:
from pyspark.mllib.evaluation import MulticlassMetrics
cvPredictions = model.predict( cvData.map(lambda x: x.features))
cvLabelsAndPredictions = cvData.map(lambda x: x.label).zip(cvPredictions)

cvMetrics = MulticlassMetrics(cvLabelsAndPredictions)


And now we can extract some performance results from the metrics object:

In [10]:
print('                 precision: ',cvMetrics.precision(),'\n',
      '                   recall: ',cvMetrics.recall(),'\n',
      '                 fMeasure: ',cvMetrics.fMeasure(),'\n',
      '        weightedPrecision: ',cvMetrics.weightedPrecision,'\n',
      '           weightedRecall: ',cvMetrics.weightedRecall,'\n',
      'weightedFalsePositiveRate: ',cvMetrics.weightedFalsePositiveRate,'\n',
      ' weightedTruePositiveRate: ',cvMetrics.weightedTruePositiveRate,'\n',
      '\nConfusion Matrix:'
     )

import numpy as np
print(cvMetrics.confusionMatrix().toArray().astype(int))

                 precision:  0.7023789001896225 
                    recall:  0.7023789001896225 
                  fMeasure:  0.7023789001896225 
         weightedPrecision:  0.743841763253158 
            weightedRecall:  0.7023789001896226 
 weightedFalsePositiveRate:  0.18384149482164294 
  weightedTruePositiveRate:  0.7023789001896226 
 
Confusion Matrix:
[[14341  5586     0     0     0  1153]
 [ 6438 22322   433     0   516    30]
 [   11   356  3010   159  1144     0]
 [    0    17    61   126    31     0]
 [    1     9    12     0    56     0]
 [  328    34     0     0     0   890]]


Once we are satisfied with the models predictive performance (hint - the performance above is not very satisfying!), we can then evaluate the model on the test data, which we expect will give slightly lower performance than the validation data since some work flows include iteration over the validation data more than once.

In [11]:
testPredictions = model.predict( testData.map(lambda x: x.features))
testLabelsAndPredictions = testData.map(lambda x: x.label).zip(testPredictions)

testMetrics = MulticlassMetrics(testLabelsAndPredictions)


In [12]:
print('                 precision: ',testMetrics.precision(),'\n',
      '                   recall: ',testMetrics.recall(),'\n',
      '                 fMeasure: ',testMetrics.fMeasure(),'\n',
      '        weightedPrecision: ',testMetrics.weightedPrecision,'\n',
      '           weightedRecall: ',testMetrics.weightedRecall,'\n',
      'weightedFalsePositiveRate: ',testMetrics.weightedFalsePositiveRate,'\n',
      ' weightedTruePositiveRate: ',testMetrics.weightedTruePositiveRate,'\n',
      '\nConfusion Matrix:'
     )

print(testMetrics.confusionMatrix().toArray().astype(int))

                 precision:  0.7043083744350307 
                    recall:  0.7043083744350307 
                  fMeasure:  0.7043083744350307 
         weightedPrecision:  0.7452877071581182 
            weightedRecall:  0.7043083744350307 
 weightedFalsePositiveRate:  0.18281720128587403 
  weightedTruePositiveRate:  0.7043083744350307 
 
Confusion Matrix:
[[14309  5654     0     0     0  1121]
 [ 6285 22506   468     0   529    43]
 [   10   338  3142   150  1127     0]
 [    0    18    65   106    28     0]
 [    0     4    10     0    61     0]
 [  343    37     0     0     0   859]]
