# Binary Classification Example

The Spark MLlib Pipelines API provides a higher-level API built on top of DataFrames for constructing ML pipelines.
You can read more about the Pipelines API in the [MLlib programming guide](https://spark.apache.org/docs/latest/ml-guide.html).

**Binary Classification** is the task of predicting a binary label.
For example: Is an email spam or not spam? Should this ad be shown to this user or not? Will it rain tomorrow or not?
This notebook illustrates algorithms for making these types of predictions.

## Dataset Review

The Adult dataset is publicly available at the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult).
The data is derived from census data and contains information about 48,842 individuals and their annual income.
You can use this information to predict whether an individual earns **<=50K or >50K** a year.
The dataset consists of both numeric and categorical variables.

Attribute Information:

- age: continuous
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt: continuous
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
- education-num: continuous
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex: Female, Male
- capital-gain: continuous
- capital-loss: continuous
- hours-per-week: continuous
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...

Target/Label: <=50K, >50K

## Load Data

The Adult dataset is available in `databricks-datasets`. Read in the data with Spark's CSV data source, using an explicit schema to give the columns descriptive names and types.

In [0]:
%fs ls databricks-datasets/adult/adult.data

path,name,size
dbfs:/databricks-datasets/adult/adult.data,adult.data,3974305


In [0]:
%fs head databricks-datasets/adult/adult.data

In [0]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

schema = StructType([
  StructField("age", DoubleType(), False),
  StructField("workclass", StringType(), False),
  StructField("fnlwgt", DoubleType(), False),
  StructField("education", StringType(), False),
  StructField("education_num", DoubleType(), False),
  StructField("marital_status", StringType(), False),
  StructField("occupation", StringType(), False),
  StructField("relationship", StringType(), False),
  StructField("race", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("capital_gain", DoubleType(), False),
  StructField("capital_loss", DoubleType(), False),
  StructField("hours_per_week", DoubleType(), False),
  StructField("native_country", StringType(), False),
  StructField("income", StringType(), False)
])

dataset = spark.read.format("csv").schema(schema).load("/databricks-datasets/adult/adult.data")
cols = dataset.columns

In [0]:
display(dataset)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
39.0,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K
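To make the schema concrete, here is a minimal plain-Python sketch (no Spark required) of what it does to one raw line: split on commas and cast each field to its declared type. It assumes a line shaped like the first row shown above; `SCHEMA` and `parse_row` are hypothetical names used only for illustration, not part of Spark.

```python
import csv
import io

# (name, Python type) pairs mirroring the Spark schema above:
# DoubleType -> float, StringType -> str. Illustration only.
SCHEMA = [
    ("age", float), ("workclass", str), ("fnlwgt", float),
    ("education", str), ("education_num", float), ("marital_status", str),
    ("occupation", str), ("relationship", str), ("race", str), ("sex", str),
    ("capital_gain", float), ("capital_loss", float),
    ("hours_per_week", float), ("native_country", str), ("income", str),
]


def parse_row(line):
    """Split one CSV line and cast each field to its schema type."""
    fields = next(csv.reader(io.StringIO(line)))
    if len(fields) != len(SCHEMA):
        raise ValueError(f"expected {len(SCHEMA)} fields, got {len(fields)}")
    # strip() removes any padding spaces around the separators before casting
    return {name: cast(value.strip()) for (name, cast), value in zip(SCHEMA, fields)}


row = parse_row(
    "39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,"
    "Not-in-family,White,Male,2174,0,40,United-States,<=50K"
)
print(row["age"], row["occupation"], row["income"])  # 39.0 Adm-clerical <=50K
```

Spark's CSV reader does the real parsing; the sketch only shows how each column name is paired with a type.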


## Preprocess Data

To use algorithms like Logistic Regression, you must first convert the categorical variables in the dataset into numeric variables.
There are two common ways to do this.

* Category Indexing

  This assigns each category a numeric index from {0, 1, 2, ..., numCategories - 1}.
  It introduces an implicit ordering among the categories, so it is best suited to ordinal variables (e.g., Poor: 0, Average: 1, Good: 2).

* One-Hot Encoding

  This converts each category into a binary vector with at most one nonzero value. By default, Spark drops the last category, which is then encoded as all zeros (e.g., Blue: [1, 0], Green: [0, 1], Red: [0, 0]).
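Both encodings can be sketched in plain Python. `string_index` and `one_hot` are hypothetical helpers, not Spark APIs: the first orders categories by descending frequency (the default ordering of `StringIndexer`; breaking ties alphabetically is an assumption of this sketch), and the second drops the last category the way Spark's encoder does by default.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to an index, most frequent first (ties: alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a 0/1 list; with drop_last, the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


colors = ["Blue", "Blue", "Blue", "Green", "Green", "Red"]
mapping = string_index(colors)  # {'Blue': 0.0, 'Green': 1.0, 'Red': 2.0}
encoded = {c: one_hot(i, len(mapping)) for c, i in mapping.items()}
print(encoded)  # {'Blue': [1.0, 0.0], 'Green': [0.0, 1.0], 'Red': [0.0, 0.0]}
```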

This notebook uses a combination of [StringIndexer] and, depending on your Spark version, either [OneHotEncoderEstimator] (before Spark 3.0) or [OneHotEncoder] (Spark 3.0 and later) to convert the categorical variables.
Both encoders output [SparseVector] columns.
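A sparse vector stores only its length plus the indices and values of its nonzero entries, which is the same `length`/`indices`/`values` layout that appears when the `features` column is displayed later in this notebook. A minimal sketch with hypothetical helper names (these are not the `pyspark.ml.linalg` API):

```python
def to_sparse(dense):
    """Return (size, indices, values) for the nonzero entries of a dense list."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


def to_dense(size, indices, values):
    """Rebuild the dense list from its sparse representation."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# A hypothetical 8-slot one-hot vector with a single active category
dense = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]
sparse = to_sparse(dense)
print(sparse)                      # (8, [3], [1.0])
print(to_dense(*sparse) == dense)  # True
```

For one-hot data, almost every entry is zero, so storing three short fields instead of the full vector saves a lot of memory.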

Because there are several stages of feature transformations, use a [Pipeline] to chain them together.
This keeps the code simple.

[StringIndexer]: http://spark.apache.org/docs/latest/ml-features.html#stringindexer
[OneHotEncoderEstimator]: https://spark.apache.org/docs/2.4.5/api/python/pyspark.ml.html?highlight=one%20hot%20encoder#pyspark.ml.feature.OneHotEncoderEstimator
[SparseVector]: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.linalg.SparseVector.html#pyspark.ml.linalg.SparseVector
[Pipeline]: https://spark.apache.org/docs/latest/ml-pipeline.html#ml-pipelines
[OneHotEncoder]: https://spark.apache.org/docs/latest/ml-features.html#onehotencoder
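Conceptually, `Pipeline.fit()` walks the stages in order: each estimator is fit on the output of the stages before it, and the resulting `PipelineModel` replays all the fitted transformers in `transform()`. The sketch below imitates that flow with hypothetical toy classes (not Spark's); for simplicity its indexer orders labels alphabetically rather than by frequency.

```python
class IndexerStage:
    """Hypothetical estimator: learns a string -> index map in fit()."""

    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        labels = sorted({r[self.input_col] for r in rows})
        mapping = {v: float(i) for i, v in enumerate(labels)}
        return IndexerModel(self.input_col, self.output_col, mapping)


class IndexerModel:
    """Hypothetical fitted transformer: appends the index column to each row."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        # Build new dicts so the input rows are left unchanged
        return [{**r, self.output_col: self.mapping[r[self.input_col]]} for r in rows]


class MiniPipeline:
    """Fits stages in order, feeding each fitted stage's output to the next stage."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        models = []
        for stage in self.stages:
            model = stage.fit(rows)
            rows = model.transform(rows)
            models.append(model)
        return MiniPipelineModel(models)


class MiniPipelineModel:
    """Applies every fitted stage in sequence."""

    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows


data = [{"sex": "Male", "race": "White"}, {"sex": "Female", "race": "Black"}]
pipeline_model = MiniPipeline([IndexerStage("sex", "sexIndex"),
                               IndexerStage("race", "raceIndex")]).fit(data)
print(pipeline_model.transform(data))  # each row gains sexIndex and raceIndex
```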

In [0]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

from distutils.version import LooseVersion

categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
stages = [] # stages in Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
        from pyspark.ml.feature import OneHotEncoderEstimator
        encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    else:
        from pyspark.ml.feature import OneHotEncoder
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [0]:
print(pyspark.__version__)

3.1.2


In [0]:
LooseVersion(pyspark.__version__)

Out[5]: LooseVersion ('3.1.2')

The code above indexes each categorical column with `StringIndexer`
and then converts the indexed categories into one-hot encoded vectors.
When the pipeline runs, each stage appends its output column (for example, `workclassIndex` and `workclassclassVec`) to every row.

Use `StringIndexer` again to convert the `income` labels into label indices.

In [0]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]

Use a `VectorAssembler` to combine all the feature columns into a single vector column.
This includes both the numeric columns and the one-hot encoded binary vector columns in the dataset.
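`VectorAssembler` concatenates its input columns in the order given, so here the one-hot blocks come first and the six numeric columns come last. This matches the displayed `features` vectors, where the numeric values sit at indices 94 to 99. A plain-Python sketch of the concatenation, using a hypothetical row with short vectors (`assemble` is an illustrative helper, not a Spark API):

```python
def assemble(row, input_cols):
    """Concatenate list-valued and scalar columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, list):
            features.extend(value)         # e.g. a one-hot vector
        else:
            features.append(float(value))  # e.g. a numeric column
    return features


# Hypothetical row: two one-hot columns followed by two numeric columns
row = {"sexclassVec": [1.0], "raceclassVec": [1.0, 0.0, 0.0, 0.0],
       "age": 39.0, "hours_per_week": 40.0}
print(assemble(row, ["sexclassVec", "raceclassVec", "age", "hours_per_week"]))
# [1.0, 1.0, 0.0, 0.0, 0.0, 39.0, 40.0]
```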

In [0]:
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Run the stages as a Pipeline. This puts the data through all of the feature transformations in a single call.

In [0]:
from pyspark.ml.classification import LogisticRegression
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

In [0]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
display(lrModel, preppedDataDF, "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.999999999999998
0.0,0.0294117647058823,0.999999999999998
0.0,0.0588235294117647,0.9990711228185274
0.0,0.088235294117647,0.998799388244511
0.0,0.1176470588235294,0.9980956262870608
0.0,0.1470588235294117,0.9924488810180736
0.0,0.1764705882352941,0.9857871527075162
0.0,0.2058823529411764,0.9780074377264764
0.0,0.2352941176470588,0.9625272519799214
0.0,0.2647058823529412,0.8880880030405891


In [0]:
display(lrModel, preppedDataDF)

fitted values,residuals
-2.393182935397028,-0.0836940110653732
-1.1223113099359612,-0.2455828110147658
-0.0301224372871701,-0.4924699600417892
-2.347613929426214,0.9127443830058678
-1.1213359560759493,-0.2457635615100061
-5.306296240000036,-0.0049357815848263
-2.493045750440344,-0.0763471390182114
-5.16303436772777,-0.0056917225156873
-0.5959524881147853,-0.3552702456863094
0.7473567948921191,0.3213975145479291


In [0]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

label,features,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
0.0,"Map(vectorType -> sparse, length -> 100, indices -> List(4, 10, 24, 32, 44, 48, 52, 53, 94, 95, 96, 97, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 39.0, 77516.0, 13.0, 2174.0, 40.0))",39.0,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
0.0,"Map(vectorType -> sparse, length -> 100, indices -> List(1, 10, 23, 31, 43, 48, 52, 53, 94, 95, 96, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 50.0, 83311.0, 13.0, 13.0))",50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
0.0,"Map(vectorType -> sparse, length -> 100, indices -> List(0, 8, 25, 38, 44, 48, 52, 53, 94, 95, 96, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 38.0, 215646.0, 9.0, 40.0))",38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
0.0,"Map(vectorType -> sparse, length -> 100, indices -> List(0, 13, 23, 38, 43, 49, 52, 53, 94, 95, 96, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 53.0, 234721.0, 7.0, 40.0))",53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
0.0,"Map(vectorType -> sparse, length -> 100, indices -> List(0, 10, 23, 29, 47, 49, 62, 94, 95, 96, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 28.0, 338409.0, 13.0, 40.0))",28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
0.0,"Map(vectorType -> sparse, length -> 100, indices -> List(0, 11, 23, 31, 47, 48, 53, 94, 95, 96, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.0, 284582.0, 14.0, 40.0))",37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
0.0,"Map(vectorType -> sparse, length -> 100, indices -> List(0, 18, 28, 34, 44, 49, 64, 94, 95, 96, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 49.0, 160187.0, 5.0, 16.0))",49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
1.0,"Map(vectorType -> sparse, length -> 100, indices -> List(1, 8, 23, 31, 43, 48, 52, 53, 94, 95, 96, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 52.0, 209642.0, 9.0, 45.0))",52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
1.0,"Map(vectorType -> sparse, length -> 100, indices -> List(0, 11, 24, 29, 44, 48, 53, 94, 95, 96, 97, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 31.0, 45781.0, 14.0, 14084.0, 50.0))",31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
1.0,"Map(vectorType -> sparse, length -> 100, indices -> List(0, 10, 23, 31, 43, 48, 52, 53, 94, 95, 96, 97, 99), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 42.0, 159449.0, 13.0, 5178.0, 40.0))",42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K


In [0]:
# Randomly split data into training and test sets; set a seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

## Fit and Evaluate Models

Now, try out some of the binary classification algorithms available in the Pipelines API.

Of these algorithms, the following also support multiclass classification in the Python API:
- Decision Tree Classifier
- Random Forest Classifier

These are the general steps to build the models:
- Create an initial model using the training set
- Tune parameters with `ParamGridBuilder` and 5-fold cross-validation
- Evaluate the best model obtained from cross-validation on the test set

Use the `BinaryClassificationEvaluator` to evaluate the models, which uses [areaUnderROC] as the default metric.

[areaUnderROC]: https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve
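Area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The pairwise sketch below computes that quantity directly and is for intuition only (`area_under_roc` is a hypothetical helper); Spark instead integrates the ROC curve built from score thresholds, so its value can differ slightly on large data.

```python
def area_under_roc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative.

    A tie between a positive and a negative score counts as half a win.
    O(P * N) pairwise version, for illustration only.
    """
    pos = [s for s, y in zip(scores, labels) if y == 1.0]
    neg = [s for s, y in zip(scores, labels) if y == 0.0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [0.0, 0.0, 1.0, 1.0]
scores = [0.1, 0.4, 0.35, 0.8]
print(area_under_roc(labels, scores))  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect separation of the two classes.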

## Logistic Regression

You can read more about [Logistic Regression] in the [classification and regression] section of the MLlib Programming Guide.
In the Pipelines API, you can perform elastic-net regularization with Logistic Regression, as well as with other linear methods.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Logistic Regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
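With `regParam` as λ and `elasticNetParam` as α, the MLlib guide describes the elastic-net penalty as α(λ‖w‖₁) + (1 − α)(λ/2)‖w‖₂², so α = 0 gives ridge (L2) regularization and α = 1 gives lasso (L1). A minimal sketch that evaluates this penalty for a hypothetical weight vector (`elastic_net_penalty` is an illustrative helper, not a Spark API):

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (a * ||w||_1 + (1 - a) / 2 * ||w||_2^2), with a = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    a = elastic_net_param
    return reg_param * (a * l1 + (1.0 - a) / 2.0 * l2_squared)


# Hypothetical coefficient vector; an intercept would not be penalized, so it is left out
w = [0.5, -2.0, 0.0]
for a in (0.0, 0.5, 1.0):  # ridge, mixed, lasso
    print(a, elastic_net_penalty(w, reg_param=0.1, elastic_net_param=a))
```

The L1 term pushes some weights to exactly zero, while the L2 term shrinks all weights smoothly; `elasticNetParam` trades one off against the other.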

In [0]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [0]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [0]:
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.16304404160706326, 0.8369559583929367))",36.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7011865325539305, 0.2988134674460695))",32.0,Prof-specialty
0.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.4980113187669919, 0.5019886812330081))",33.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.68126165186417, 0.31873834813583))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6108620507115975, 0.3891379492884025))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6096341652357697, 0.39036583476423026))",50.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6002598383118382, 0.39974016168816184))",51.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5998980983235114, 0.4001019016764886))",60.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7478554516614605, 0.2521445483385395))",34.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9885496846452969, 0.011450315354703089))",20.0,Prof-specialty
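For a binary model, the `probability` column holds [P(label = 0), P(label = 1)], and logistic regression obtains P(label = 1) by applying the sigmoid function to the linear margin. With the default threshold of 0.5, the prediction is 1.0 when P(label = 1) exceeds 0.5, which is why the third row above (about 0.502) is predicted as 1.0 even though its label is 0.0. A small sketch with hypothetical helper names:

```python
import math


def sigmoid(margin):
    """Logistic function mapping a linear margin (log-odds) to P(label = 1)."""
    return 1.0 / (1.0 + math.exp(-margin))


def predict(prob_class1, threshold=0.5):
    """Binary decision rule: predict 1.0 when P(label = 1) exceeds the threshold."""
    return 1.0 if prob_class1 > threshold else 0.0


# Class-1 probabilities copied from the third and fourth rows of the output above
print(predict(0.5019886812330081))  # 1.0
print(predict(0.31873834813583))    # 0.0
print(sigmoid(0.0))                 # 0.5
```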


Use `BinaryClassificationEvaluator` to evaluate the model.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Note that the default metric for `BinaryClassificationEvaluator` is `areaUnderROC`.

In [0]:
evaluator.getMetricName()

The evaluator supports two metrics: `areaUnderROC` and `areaUnderPR`.
To switch to `areaUnderPR`, call `evaluator.setMetricName("areaUnderPR")`.

Now, tune the model using `ParamGridBuilder` and `CrossValidator`.

You can use `explainParams()` to print a list of all parameters and their definitions.

In [0]:
print(lr.explainParams())

Using three values for `regParam`, three values for `elasticNetParam`, and three values for `maxIter`,
the grid includes 3 x 3 x 3 = 27 parameter settings for `CrossValidator`.
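`ParamGridBuilder` takes the Cartesian product of the candidate values, and `CrossValidator` fits one model per parameter setting per fold, then refits the best setting on the full training set. The arithmetic for the grid in the cell below can be sketched with `itertools.product` (the dictionary here is only a stand-in for the real `ParamGridBuilder`):

```python
from itertools import product

# Same candidate values as the ParamGridBuilder cell below
grid = {
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
}
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]
num_folds = 5

print(len(param_maps))                  # 27
print(len(param_maps) * num_folds + 1)  # 136 fits: 135 during CV plus 1 final refit
print(param_maps[0])  # {'regParam': 0.01, 'elasticNetParam': 0.0, 'maxIter': 1}
```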

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# This can take a while: 27 parameter settings x 5 folds = 135 models are trained and evaluated, plus a final refit
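The sketch below shows, under simplifying assumptions, what `CrossValidator` does with the grid: split the rows into k folds; for every parameter setting, train on k - 1 folds and score on the held-out fold; average the k scores; and keep the setting with the best average. Spark then refits that setting on all training data, which is omitted here. The round-robin fold assignment and `toy_fit_and_score` are hypothetical stand-ins; Spark assigns folds with its own seeded random split.

```python
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def cross_validate(rows, param_maps, fit_and_score, k=5, seed=0):
    """Return (best_params, avg_metrics), picking the param map with the highest mean score."""
    folds = k_fold_indices(len(rows), k, seed)
    avg_metrics = []
    for params in param_maps:
        scores = []
        for test_idx in folds:
            held_out = set(test_idx)
            train = [r for j, r in enumerate(rows) if j not in held_out]
            test = [rows[j] for j in test_idx]
            scores.append(fit_and_score(params, train, test))
        avg_metrics.append(sum(scores) / k)
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    return param_maps[best], avg_metrics


def toy_fit_and_score(params, train, test):
    """Hypothetical stand-in for fit + evaluate: a fixed threshold rule scored by accuracy.

    A real estimator would learn from `train`; this toy rule ignores it.
    """
    t = params["threshold"]
    return sum((x > t) == (y == 1) for x, y in test) / len(test)


rows = [(x, 1 if x > 5 else 0) for x in range(20)]
grid = [{"threshold": 2}, {"threshold": 5}, {"threshold": 8}]
best, metrics = cross_validate(rows, grid, toy_fit_and_score, k=5)
print(best)  # {'threshold': 5}
```

In Spark, the averaged scores are available afterwards as `cvModel.avgMetrics`, in the same order as the parameter grid.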

In [0]:
# Use the test set to measure the model's performance on new data
predictions = cvModel.transform(testData)

In [0]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

You can also access the best model's intercept and feature weights (coefficients).

In [0]:
print('Model Intercept: ', cvModel.bestModel.intercept)

In [0]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy floats to Python floats, one tuple per row
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])  # use the SparkSession instead of the legacy sqlContext
display(weightsDF)

Feature Weight
-0.281603275704831
-0.6264483359096494
-0.4360275569860984
-0.5064247711709583
-0.506326689118052
-0.0049481441717509
0.0708698962396303
-2.66978938102928
-0.5593567014148134
-0.2239437895813485


In [0]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.2329641926839113, 0.7670358073160887))",36.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6552066745246206, 0.34479332547537944))",32.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5391022452506109, 0.4608977547493891))",33.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6373416944642422, 0.36265830553575784))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6016034358099771, 0.3983965641900229))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5947767904063128, 0.4052232095936872))",50.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5899376739565122, 0.4100623260434878))",51.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5976373681713757, 0.4023626318286243))",60.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6907455609652081, 0.3092544390347919))",34.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.957840027974627, 0.04215997202537303))",20.0,Prof-specialty


## Decision Trees

You can read more about [Decision Trees](http://spark.apache.org/docs/latest/mllib-decision-tree.html) in the Spark MLlib Programming Guide.
The Decision Trees algorithm is popular because it handles categorical
data and works out of the box with multiclass classification tasks.

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

You can extract the number of nodes in the decision tree as well as the
tree depth of the model.

In [0]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

In [0]:
display(dtModel)

treeNode
"{""index"":5,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":23,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":7792.0,""categories"":null,""feature"":97,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":20.5,""categories"":null,""feature"":94,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":12.5,""categories"":null,""feature"":96,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":3368.0,""categories"":null,""feature"":97,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":8,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [0]:
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

In [0]:
predictions.printSchema()

In [0]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",36.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",32.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",33.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",50.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",51.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",60.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",34.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7010492093985518, 0.2989507906014482))",20.0,Prof-specialty


Evaluate the Decision Tree model with
`BinaryClassificationEvaluator`.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Spark supports two impurity measures for decision tree classification: Gini impurity and entropy. The default is `gini`; to change it, call `dt.setImpurity("entropy")` on the estimator before fitting.
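Both measures score how mixed the class labels are at a node (0 means the node is pure), and the tree chooses the split that most reduces impurity. A minimal sketch of the two formulas on hypothetical class counts (`gini` and `entropy` are illustrative helpers, not Spark APIs):

```python
import math


def gini(counts):
    """Gini impurity: 1 - sum(p_i^2) over the class proportions p_i."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)


def entropy(counts):
    """Entropy in bits: sum(p_i * log2(1 / p_i)), skipping empty classes."""
    total = sum(counts)
    return sum((c / total) * math.log2(total / c) for c in counts if c > 0)


for counts in ([50, 50], [90, 10], [100, 0]):  # class counts at a hypothetical node
    print(counts, round(gini(counts), 3), round(entropy(counts), 3))
```

For two classes, Gini peaks at 0.5 and entropy at 1.0 when the classes are evenly split; both fall to 0 for a pure node, so in practice they usually pick similar splits.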

In [0]:
dt.getImpurity()

Now tune the model using `ParamGridBuilder` and `CrossValidator`.

With four values for `maxDepth` and three values for `maxBins`, the grid has 4 x 3 = 12 parameter settings for `CrossValidator`.

In [0]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~5 minutes

In [0]:
print("numNodes = ", cvModel.bestModel.numNodes)
print("depth = ", cvModel.bestModel.depth)

In [0]:
# Use the test set to measure the model's performance on new data
predictions = cvModel.transform(testData)

In [0]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [0]:
# View the best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5089285714285714, 0.49107142857142855))",36.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8206106870229007, 0.17938931297709923))",32.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8206106870229007, 0.17938931297709923))",33.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6779661016949152, 0.3220338983050847))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6779661016949152, 0.3220338983050847))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6779661016949152, 0.3220338983050847))",50.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6779661016949152, 0.3220338983050847))",51.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9274193548387096, 0.07258064516129033))",60.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6779661016949152, 0.3220338983050847))",34.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9145077720207254, 0.08549222797927461))",20.0,Prof-specialty


## Random Forest

Random forests use an ensemble of decision trees to improve model accuracy.
You can read more about [Random Forest] in the [classification and regression] section of the MLlib Programming Guide.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Random Forest]: https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forests
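Each tree in the forest produces its own class-probability vector, and the forest combines them. This sketch assumes that Spark's `RandomForestClassifier` sums the per-tree probability vectors and normalizes the result, which amounts to averaging. It illustrates that rule on hypothetical tree outputs and is not Spark's code.

```python
def forest_probability(tree_probs):
    """Average per-tree class-probability vectors into one ensemble probability vector."""
    n_trees = len(tree_probs)
    n_classes = len(tree_probs[0])
    return [sum(p[c] for p in tree_probs) / n_trees for c in range(n_classes)]


def forest_predict(tree_probs):
    """Predict the class with the highest averaged probability."""
    avg = forest_probability(tree_probs)
    return float(max(range(len(avg)), key=lambda c: avg[c]))


# Hypothetical class-probability outputs of three trees for one row
trees = [[0.9, 0.1], [0.4, 0.6], [0.8, 0.2]]
print(forest_probability(trees))  # approximately [0.7, 0.3]
print(forest_predict(trees))      # 0.0
```

Averaging over many trees smooths out the errors of individual trees, which is why the forest usually generalizes better than a single deep tree.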

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [0]:
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

In [0]:
predictions.printSchema()

In [0]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.537048668949202, 0.4629513310507979))",36.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6776796783146757, 0.3223203216853244))",32.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6524471509421086, 0.3475528490578914))",33.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6783793074615304, 0.3216206925384696))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6524471509421086, 0.3475528490578914))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6783793074615304, 0.3216206925384696))",50.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6783793074615304, 0.3216206925384696))",51.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6854267291327509, 0.3145732708672492))",60.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6783793074615304, 0.3216206925384696))",34.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8385976167514594, 0.16140238324854073))",20.0,Prof-specialty


Evaluate the Random Forest model with `BinaryClassificationEvaluator`.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Now tune the model with `ParamGridBuilder` and `CrossValidator`.

With three values for `maxDepth`, two values for `maxBins`, and two values for `numTrees`,
the grid has 3 x 2 x 2 = 12 parameter settings for `CrossValidator`.

In [0]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations. This can take about 6 minutes: 12 settings x 5 folds = 60 forests, some with 20 trees each
cvModel = cv.fit(trainingData)

In [0]:
# Use the test set to measure the model's performance on new data
predictions = cvModel.transform(testData)

In [0]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [0]:
# View the best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5094807638277546, 0.49051923617224535))",36.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.661701148411863, 0.338298851588137))",32.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6475646101435547, 0.3524353898564453))",33.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.661701148411863, 0.338298851588137))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6409016328210144, 0.3590983671789857))",39.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.661701148411863, 0.338298851588137))",50.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.661701148411863, 0.338298851588137))",51.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6847313095078987, 0.31526869049210127))",60.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6680692793701579, 0.331930720629842))",34.0,Prof-specialty
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8630135705355219, 0.1369864294644781))",20.0,Prof-specialty


## Make Predictions
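As Random Forest gives the best areaUnderROC value, use its `bestModel` for deployment (`cvModel` still holds the Random Forest cross-validation result, since it was assigned last),
and use it to generate predictions on new data.
Instead of new data, this example generates predictions on the entire dataset. Because the entire dataset includes the training rows, the resulting areaUnderROC is optimistic.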

In [0]:
bestModel = cvModel.bestModel

In [0]:
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)

In [0]:
# Evaluate best model
evaluator.evaluate(finalPredictions)

The queries below show predictions grouped by occupation and by age. First, register the predictions as a temporary view so that they can be queried with SQL.

In [0]:
finalPredictions.createOrReplaceTempView("finalPredictions")

In an operational environment, analysts may use a similar machine learning pipeline to obtain predictions on new data, organize them into a table, and use that table for analysis or lead targeting.

In [0]:
%sql
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation


occupation,prediction,count
?,0.0,1773
?,1.0,70
Adm-clerical,1.0,175
Adm-clerical,0.0,3595
Armed-Forces,1.0,1
Armed-Forces,0.0,8
Craft-repair,0.0,3921
Craft-repair,1.0,178
Exec-managerial,0.0,2577
Exec-managerial,1.0,1489


In [0]:
%sql
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age

age,prediction,count
17.0,1.0,1
17.0,0.0,394
18.0,0.0,550
19.0,0.0,712
20.0,0.0,753
21.0,1.0,1
21.0,0.0,719
22.0,1.0,4
22.0,0.0,761
23.0,0.0,874
