# Multiclass Classification of Cover Type Using PySpark

In this notebook, the task is to classify geographical locations according to their predicted tree cover using Gradient Boosting and Random Forest classifiers. 

**Data**:
http://archive.ics.uci.edu/ml/datasets/Covertype 

This notebook was adapted from an assignment completed as part of the UCSD MicroMasters on edX.

In [1]:
#to time entire solution
import time
start_nb = time.time()

In [2]:
import os

os.environ["PYSPARK_PYTHON"]="python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"

from pyspark import SparkContext
sc=SparkContext()

In [3]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

#import os
import pickle
from os.path import exists

%config IPCompleter.greedy=True

In [4]:
#define a dictionary of cover types
CoverTypes={1.0: 'Spruce/Fir',
            2.0: 'Lodgepole Pine',
            3.0: 'Ponderosa Pine',
            4.0: 'Cottonwood/Willow',
            5.0: 'Aspen',
            6.0: 'Douglas-fir',
            7.0: 'Krummholz' }
print('Tree Cover Types:', CoverTypes)

Tree Cover Types: {1.0: 'Spruce/Fir', 2.0: 'Lodgepole Pine', 3.0: 'Ponderosa Pine', 4.0: 'Cottonwood/Willow', 5.0: 'Aspen', 6.0: 'Douglas-fir', 7.0: 'Krummholz'}


## Collecting data

In [5]:
#break up features made out of several binary features.
def get_columns(cols_txt):
    cols=[a.strip() for a in cols_txt.split(',')]
    colDict={a:[a] for a in cols}
    colDict['Soil_Type (40 binary columns)'] = ['ST_'+str(i) for i in range(40)]
    colDict['Wilderness_Area (4 binarycolumns)'] = ['WA_'+str(i) for i in range(4)]
    columns=[]
    for item in cols:
        columns = columns + colDict[item]
    return columns

In [6]:
#define feature names
cols_txt="""
Elevation, Aspect, Slope, Horizontal_Distance_To_Hydrology,
Vertical_Distance_To_Hydrology, Horizontal_Distance_To_Roadways,
Hillshade_9am, Hillshade_Noon, Hillshade_3pm,
Horizontal_Distance_To_Fire_Points, Wilderness_Area (4 binarycolumns), 
Soil_Type (40 binary columns), Cover_Type
"""
columns = get_columns(cols_txt)

In [7]:
#read the file into an RDD; on a real cluster the file must be accessible from every node, ideally via HDFS
path='covtype/covtype.data'
inputRDD=sc.textFile(path)

## Helper Functions

The function `label_RDD` takes an RDD of comma-separated text lines and returns an RDD of `LabeledPoint`s. In each `LabeledPoint`, the label is the last value on the line and the features are a `DenseVector` holding all the values that precede it.

In [8]:
def label_RDD(inputRDD):
    return inputRDD.map(lambda x:[float(i) for i in x.split(',')]).map(lambda s:LabeledPoint(s[-1], Vectors.dense(s[0:-1])))    
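
To make the parsing step concrete, here is a minimal pure-Python sketch of what the two `map` calls do to a single line, without Spark. The sample line is made up for illustration (it is not claimed to be a row of the dataset), and `parse_line` is a hypothetical helper name.

```python
def parse_line(line):
    """Split a comma-separated line into (label, features).

    Mirrors label_RDD: the last value is the label, the rest are features.
    """
    values = [float(v) for v in line.split(',')]
    return values[-1], values[:-1]

# Hypothetical row: 10 numeric features, 4 wilderness flags, 40 soil flags, label
sample_line = ','.join(['2596', '51', '3', '258', '0', '510', '221', '232', '148', '6279']
                       + ['1', '0', '0', '0']
                       + ['0'] * 40
                       + ['5'])
label, features = parse_line(sample_line)
print(label, len(features))
```

In the notebook itself the features are additionally wrapped in `Vectors.dense` and paired with the label in a `LabeledPoint`.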

In [9]:
Data = label_RDD(inputRDD)
Data.cache()

PythonRDD[2] at RDD at PythonRDD.scala:53

The function `count_examples` takes an RDD of labelled points and returns the number of examples in each class, sorted by count in descending order.

In [10]:
def count_examples(Data):
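    count = (Data.map(lambda labelPoint: (labelPoint.label, 1))  #one (label, 1) pair per example
                 .reduceByKey(lambda x, y: x + y)                 #sum the ones per label
                 .map(lambda x: (x[1], x[0]))                     #swap to (count, label) to sort by count
                 .sortByKey(ascending=False)
                 .map(lambda x: (x[1], x[0])))                    #swap back to (label, count)
    return count.collect()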
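
The same count-and-sort logic can be sketched on a plain Python list with `collections.Counter`. This is only an illustration on made-up labels, with `count_labels` as a hypothetical name; it is not how the notebook computes the counts.

```python
from collections import Counter

def count_labels(labels):
    """Return (label, count) pairs sorted by count, largest first."""
    return Counter(labels).most_common()

toy_labels = [2.0, 1.0, 2.0, 3.0, 2.0, 1.0]
print(count_labels(toy_labels))
```

The RDD version does the same work in a distributed way: `reduceByKey` plays the role of the counter and `sortByKey` the role of the final ordering.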

In [11]:
counts = count_examples(Data)

In [12]:
total=Data.count()
print('total data size=',total)
print('              type (label):   percent of total')
print('---------------------------------------------------------')
print('\n'.join(['%20s (%3.1f):\t%4.2f'%(CoverTypes[a[0]],a[0],100.0*a[1]/float(total)) for a in counts]))

total data size= 581012
              type (label):   percent of total
---------------------------------------------------------
      Lodgepole Pine (2.0):	48.76
          Spruce/Fir (1.0):	36.46
      Ponderosa Pine (3.0):	6.15
           Krummholz (7.0):	3.53
         Douglas-fir (6.0):	2.99
               Aspen (5.0):	1.63
   Cottonwood/Willow (4.0):	0.47


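The function `labels_to_binary` turns the problem into a binary one, because the `GradientBoostedTrees` implementation in MLlib supports only binary classification. The Covertype problem has 7 classes; to make it binary, label 2.0 (Lodgepole Pine, the most frequent class) is mapped to 1.0 and every other label is mapped to 0.0. This is a one-vs-rest relabelling, not a one-hot encoding.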

In [13]:
def labels_to_binary(Data):
    Data = Data.map(lambda x: LabeledPoint(1.0 if x.label == 2.0 else 0.0, x.features))
    return Data
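
As a sanity check on the relabelled problem, the class percentages printed earlier give a trivial baseline: a classifier that always predicts the more common binary class is wrong on every row of the other class. The sketch below works that out in plain Python; `to_binary` and `class_percent` are hypothetical names, and the percentages are copied from the class-count output.

```python
def to_binary(label, positive=2.0):
    """Map the chosen class to 1.0 and every other class to 0.0."""
    return 1.0 if label == positive else 0.0

# Percent of rows per original label, copied from the class-count output
class_percent = {2.0: 48.76, 1.0: 36.46, 3.0: 6.15, 7.0: 3.53,
                 6.0: 2.99, 5.0: 1.63, 4.0: 0.47}

positive_share = sum(p for label, p in class_percent.items()
                     if to_binary(label) == 1.0)
# Always predicting the more common binary class gives this error rate
baseline_error = min(positive_share, 100.0 - positive_share) / 100.0
print(round(baseline_error, 4))
```

A trained model is only useful if its test error is clearly below this baseline of roughly 0.49.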

In [14]:
Data = labels_to_binary(Data)

## Train / Test Split

In [15]:
trainingData, testData = Data.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 406045
Test Dataset Count: 174967
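
The counts are close to, but not exactly, a 70/30 split (406045 / 581012 ≈ 0.699). That is expected: `randomSplit` assigns rows at random according to the weights rather than cutting at an exact index. The sketch below imitates the idea in plain Python on a toy list; it is an illustration under that assumption (the helper `random_split` is hypothetical), not Spark's actual sampling code.

```python
import random

def random_split(rows, train_weight, seed):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(10000))
train, test = random_split(rows, train_weight=0.7, seed=2018)
print(len(train), len(test), len(train) / len(rows))
```

Fixing the seed makes the split reproducible, which is why the notebook passes `seed = 2018`.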


In [16]:
counts = count_examples(testData)

## Gradient Boosted Trees

### Main classes and methods

* `GradientBoostedTrees` is the class that implements the learning algorithm.
   * Its main method is `trainClassifier(trainingData)`, which takes a training set as input and returns an instance of `GradientBoostedTreesModel`.
   * The main parameters of `trainClassifier` are:
      * **data** – Training dataset: RDD of LabeledPoint. Labels should take values {0, 1}.
      * **categoricalFeaturesInfo** – Map storing arity of categorical features. E.g., an entry (n -> k) indicates that feature n is categorical with k categories indexed from 0: {0, 1, ..., k-1}.
      * **loss** – Loss function used for minimization during gradient boosting. Supported: {“logLoss” (default), “leastSquaresError”, “leastAbsoluteError”}.
      * **numIterations** – Number of iterations of boosting. (default: 100)
      * **learningRate** – Learning rate for shrinking the contribution of each estimator. The learning rate should be in the interval (0, 1]. (default: 0.1)
      * **maxDepth** – Maximum depth of the tree. E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 3)
      * **maxBins** – Maximum number of bins used for splitting features. DecisionTree requires maxBins >= max categories. (default: 32)
      
      
* `GradientBoostedTreesModel` represents the output of the boosting process: a linear combination of classification trees. The methods supported by this class are:
   * `save(sc, path)` : save the model to the given path; `sc` is the SparkContext.
   * `load(sc, path)` : the counterpart to `save`; loads a model from the given path.
   * `predict(X)` : predict on a single datapoint (the `.features` field of a `LabeledPoint`) or an RDD of datapoints.
   * `toDebugString()` : print the classifier in a human readable format.
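
To illustrate what "a linear combination of classification trees" means at prediction time, here is a toy sketch in plain Python: each tree returns a score, the scores are combined with per-tree weights, and the sign of the sum decides the class. The stumps, weights, and thresholds are invented for illustration, and the threshold-at-zero rule is an assumption about how the combination becomes a {0, 1} label, not code taken from MLlib.

```python
def stump(feature_index, threshold, left_score, right_score):
    """Build a depth-1 tree: one split on a single feature."""
    def predict(features):
        return left_score if features[feature_index] <= threshold else right_score
    return predict

# Invented ensemble of (weight, tree) pairs; the later trees are scaled down,
# in the spirit of a learning rate of 0.1
ensemble = [
    (1.0, stump(0, 3000.0, 0.8, -0.6)),   # split on feature 0 (elevation)
    (0.1, stump(2, 15.0, 0.5, -0.5)),     # split on feature 2 (slope)
    (0.1, stump(0, 2500.0, -0.9, 0.3)),   # another split on feature 0
]

def predict_boosted(features):
    """Weighted sum of tree scores, thresholded at zero."""
    total = sum(weight * tree(features) for weight, tree in ensemble)
    return 1.0 if total > 0 else 0.0

print(predict_boosted([2800.0, 51.0, 10.0]))
print(predict_boosted([3300.0, 51.0, 20.0]))
```

A smaller learning rate shrinks each later tree's contribution, which usually calls for more boosting iterations to reach the same fit.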

In [17]:
def Classify_GB(trainingData, testData, maxDepth):

    model = GradientBoostedTrees.trainClassifier(trainingData,
                                             categoricalFeaturesInfo={}, maxDepth=maxDepth)

    #evaluate model on test instances and compute test error
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testErr = labelsAndPredictions.filter(
        lambda lp: lp[0] != lp[1]).count() / float(testData.count())
    
    return testErr

In [18]:
testErr = Classify_GB(trainingData, testData, 3)
print(testErr)

0.2138917624466328
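
The test error returned by `Classify_GB` is the fraction of test rows whose prediction differs from the label. A plain-Python sketch of the same calculation on made-up values (`error_rate` is a hypothetical helper):

```python
def error_rate(labels, predictions):
    """Fraction of positions where the prediction differs from the label."""
    pairs = list(zip(labels, predictions))
    wrong = sum(1 for label, pred in pairs if label != pred)
    return wrong / float(len(pairs))

labels      = [1.0, 0.0, 1.0, 1.0, 0.0]
predictions = [1.0, 0.0, 0.0, 1.0, 1.0]
print(error_rate(labels, predictions))
```

Accuracy is one minus this value, so a test error of about 0.214 corresponds to an accuracy of about 78.6%.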


## Random Forests

**trainClassifier**`(data, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy='auto', impurity='gini', maxDepth=4, maxBins=32, seed=None)`   
Method to train a random forest model for binary or multiclass classification.

**Parameters:**  
* *data* – Training dataset: RDD of LabeledPoint. Labels should take values {0, 1, ..., numClasses-1}.  
* *numClasses* – number of classes for classification.  
* *categoricalFeaturesInfo* – Map storing arity of categorical features. E.g., an entry (n -> k) indicates that feature n is categorical with k categories indexed from 0: {0, 1, ..., k-1}.  
* *numTrees* – Number of trees in the random forest.  
* *featureSubsetStrategy* – Number of features to consider for splits at each node. Supported: “auto” (default), “all”, “sqrt”, “log2”, “onethird”. If “auto” is set, this parameter is set based on numTrees: if numTrees == 1, set to “all”; if numTrees > 1 (forest) set to “sqrt”.
* *impurity* – Criterion used for information gain calculation. Supported values: “gini” (recommended) or “entropy”.  
* *maxDepth* – Maximum depth of the tree. E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 4)  
* *maxBins* – maximum number of bins used for splitting features (default: 32)
* *seed* – Random seed for bootstrapping and choosing feature subsets.  

**Returns:**	
RandomForestModel that can be used for prediction
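
For a sense of scale, the sketch below estimates how many of the 54 features each `featureSubsetStrategy` would consider per split. The rounding (ceiling) is an assumption about the implementation and may differ between Spark versions, so treat the numbers as approximate; `features_per_split` is a hypothetical helper, not part of the MLlib API.

```python
import math

def features_per_split(strategy, num_features, num_trees):
    """Approximate number of candidate features per node for each strategy."""
    if strategy == "auto":
        # Per the parameter description: "all" for one tree, "sqrt" for a forest
        strategy = "all" if num_trees == 1 else "sqrt"
    if strategy == "all":
        return num_features
    if strategy == "sqrt":
        return math.ceil(math.sqrt(num_features))           # assumed rounding
    if strategy == "log2":
        return max(1, math.ceil(math.log2(num_features)))   # assumed rounding
    if strategy == "onethird":
        return math.ceil(num_features / 3.0)                # assumed rounding
    raise ValueError("unknown strategy: " + strategy)

# 54 features = 10 numeric + 4 wilderness-area flags + 40 soil-type flags
for name in ["auto", "all", "sqrt", "log2", "onethird"]:
    print(name, features_per_split(name, num_features=54, num_trees=10))
```

With `numTrees=10` and `featureSubsetStrategy="auto"`, each split therefore looks at only a small subset of the features, which is what decorrelates the trees in the forest.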

In [19]:
def Classify_RF(trainingData, testData, depth):    

    model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                         numTrees=10, featureSubsetStrategy="auto",
                                         impurity='gini', maxDepth=depth, maxBins=32)

    #evaluate model on test instances and compute test error
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testErr = labelsAndPredictions.filter(
        lambda lp: lp[0] != lp[1]).count() / float(testData.count())

    return testErr

In [20]:
testErr = Classify_RF(trainingData, testData, 6)
print(testErr)

0.2654614870232672


In [21]:
end_nb = time.time()
print("Total time taken: ", end_nb - start_nb)

Total time taken:  117.25918221473694
