In [4]:
# this workbook demonstrates how to use PySparkML to do single variable binary regression
# the setup is a bit contrived, since this is a how-to demo
# The gapminder_data has an additional column, "Over_65", which is set to 1 or 0 
# based on whether the life expectancy in 2007 is above or below 65
# we then train a regression model to 
# - predict whether life expectancy is above or below 65
# - evaluate the model accuracy
# - find the decision threshold
# 
# we will do this with 1) a decision tree regressor, and 2) a random forest regressor
#
# this code parallels the continuous regression workbook example
# please see that example for more comments and explanations

In [5]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [6]:
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

In [7]:
sqlContext = SQLContext(sc)

In [8]:
df = sqlContext.read.format("csv").option("inferschema","true").option("header", "true").option("delimiter", ",").load("gapminder_all_binary.csv")

In [9]:
# split the data into a training set (train model) and testing set (evaluate model)
(trainingData, testData) = df.randomSplit([0.7, 0.3])

In [10]:
df.printSchema()

root
 |-- continent: string (nullable = true)
 |-- country: string (nullable = true)
 |-- gdpPercap_1952: double (nullable = true)
 |-- gdpPercap_1957: double (nullable = true)
 |-- gdpPercap_1962: double (nullable = true)
 |-- gdpPercap_1967: double (nullable = true)
 |-- gdpPercap_1972: double (nullable = true)
 |-- gdpPercap_1977: double (nullable = true)
 |-- gdpPercap_1982: double (nullable = true)
 |-- gdpPercap_1987: double (nullable = true)
 |-- gdpPercap_1992: double (nullable = true)
 |-- gdpPercap_1997: double (nullable = true)
 |-- gdpPercap_2002: double (nullable = true)
 |-- gdpPercap_2007: double (nullable = true)
 |-- lifeExp_1952: double (nullable = true)
 |-- lifeExp_1957: double (nullable = true)
 |-- lifeExp_1962: double (nullable = true)
 |-- lifeExp_1967: double (nullable = true)
 |-- lifeExp_1972: double (nullable = true)
 |-- lifeExp_1977: double (nullable = true)
 |-- lifeExp_1982: double (nullable = true)
 |-- lifeExp_1987: double (nullable = true)
 |-- lifeEx

In [11]:
from pyspark.ml.feature import VectorAssembler

In [25]:
vectorAssembler = VectorAssembler(inputCols = ['lifeExp_2007'], outputCol = 'features')
va_training = vectorAssembler.transform(trainingData)
va_training = va_training.select(['features', 'Over_65'])
va_training.show(10)

+--------+-------+
|features|Over_65|
+--------+-------+
|[72.301]|      1|
|[42.731]|      0|
|[56.728]|      0|
|[50.728]|      0|
| [49.58]|      0|
|[44.741]|      0|
|[50.651]|      0|
|[46.462]|      0|
|[48.328]|      0|
|[54.791]|      0|
+--------+-------+
only showing top 10 rows



In [26]:
from pyspark.ml.regression import DecisionTreeRegressor

In [27]:
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Over_65')

In [28]:
dt_model = dt.fit(va_training)

In [29]:
va_testing = vectorAssembler.transform(trainingData)
va_testing = va_testing.select(['features', 'Over_65'])
va_testing.show(10)

+--------+-------+
|features|Over_65|
+--------+-------+
|[72.301]|      1|
|[42.731]|      0|
|[56.728]|      0|
|[50.728]|      0|
| [49.58]|      0|
|[44.741]|      0|
|[50.651]|      0|
|[46.462]|      0|
|[48.328]|      0|
|[54.791]|      0|
+--------+-------+
only showing top 10 rows



In [30]:
dt_predictions = dt_model.transform(va_testing)

In [31]:
dt_predictions.show(100)

+--------+-------+------------------+
|features|Over_65|        prediction|
+--------+-------+------------------+
|[72.301]|      1|               1.0|
|[42.731]|      0|               0.0|
|[56.728]|      0|               0.0|
|[50.728]|      0|               0.0|
| [49.58]|      0|               0.0|
|[44.741]|      0|               0.0|
|[50.651]|      0|               0.0|
|[46.462]|      0|               0.0|
|[48.328]|      0|               0.0|
|[54.791]|      0|               0.0|
|[51.579]|      0|               0.0|
| [58.04]|      0|               0.0|
|[52.947]|      0|               0.0|
|[59.448]|      0|               0.0|
|[56.007]|      0|               0.0|
|[46.388]|      0|               0.0|
| [54.11]|      0|               0.0|
|[42.592]|      0|               0.0|
|[45.678]|      0|               0.0|
|[73.952]|      1|               1.0|
|[59.443]|      0|               0.0|
|[48.303]|      0|               0.0|
|[54.467]|      0|               0.0|
|[64.164]|  

In [32]:
from pyspark.ml.regression import RandomForestRegressor

In [33]:
rf = RandomForestRegressor(featuresCol ='features', labelCol = 'Over_65', numTrees=20)

In [34]:
rf_model = rf.fit(va_training)

In [35]:
rf_predictions = rf_model.transform(va_testing)

In [37]:
rf_predictions.show(100)

+--------+-------+------------------+
|features|Over_65|        prediction|
+--------+-------+------------------+
|[72.301]|      1|               1.0|
|[42.731]|      0|               0.0|
|[56.728]|      0|               0.0|
|[50.728]|      0|               0.0|
| [49.58]|      0|               0.0|
|[44.741]|      0|               0.0|
|[50.651]|      0|               0.0|
|[46.462]|      0|               0.0|
|[48.328]|      0|               0.0|
|[54.791]|      0|               0.0|
|[51.579]|      0|               0.0|
| [58.04]|      0|               0.0|
|[52.947]|      0|               0.0|
|[59.448]|      0|               0.0|
|[56.007]|      0|               0.0|
|[46.388]|      0|               0.0|
| [54.11]|      0|               0.0|
|[42.592]|      0|               0.0|
|[45.678]|      0|               0.0|
|[73.952]|      1|               1.0|
|[59.443]|      0|               0.0|
|[48.303]|      0|               0.0|
|[54.467]|      0|               0.0|
|[64.164]|  

In [40]:
evaluator = MulticlassClassificationEvaluator(
        labelCol="Over_65", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(rf_predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.122449


In [41]:
# find the cutoff value for predicting "Over_65"
#
# this part was a little surprising to me. Scikit-learn has a method for this: clf.tree_.threshold[0]
# 
# unfortunately, looks like there isn't a method for this in SparkML? 
# you can see the tree by printing .toDebugString on your model and 
# inspecting for where a prediction value around 0.5 occurs
# which appears to be around 63.925...
#
# not sure if this is really my only or best option, I've looked around 
# but haven't found a tidy api call for this...

In [42]:
rf_model.toDebugString

'RandomForestRegressionModel (uid=RandomForestRegressor_d3e9105f539c) with 20 trees\n  Tree 0 (weight 1.0):\n    If (feature 0 <= 66.1785)\n     If (feature 0 <= 60.182)\n      Predict: 0.0\n     Else (feature 0 > 60.182)\n      If (feature 0 <= 63.2415)\n       Predict: 0.3333333333333333\n      Else (feature 0 > 63.2415)\n       If (feature 0 <= 64.431)\n        Predict: 0.0\n       Else (feature 0 > 64.431)\n        Predict: 0.3333333333333333\n    Else (feature 0 > 66.1785)\n     If (feature 0 <= 76.2895)\n      Predict: 1.0\n     Else (feature 0 > 76.2895)\n      If (feature 0 <= 76.464)\n       Predict: 0.8333333333333334\n      Else (feature 0 > 76.464)\n       Predict: 1.0\n  Tree 1 (weight 1.0):\n    If (feature 0 <= 64.431)\n     Predict: 0.0\n    Else (feature 0 > 64.431)\n     If (feature 0 <= 66.1785)\n      Predict: 0.7142857142857143\n     Else (feature 0 > 66.1785)\n      If (feature 0 <= 76.2895)\n       Predict: 1.0\n      Else (feature 0 > 76.2895)\n       If (featur

In [43]:
# try logistic regression
from pyspark.ml.classification import LogisticRegression

In [44]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,featuresCol="features", labelCol="Over_65",)

In [45]:
lrModel = lr.fit(va_training)

In [46]:
lrModel.intercept

-2.926721349569394

In [47]:
lrModel.summary.objectiveHistory

[0.6517565611726542,
 0.6497899965720552,
 0.6496749082437917,
 0.6484195485613644,
 0.6449890163146406,
 0.6343521971926331,
 0.5982086773774155,
 0.5981287998891636,
 0.5981286764154597,
 0.5981282149744938,
 0.5981282134588519]

In [48]:
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))

Coefficients: [0.05272353574758242]
Intercept: -2.926721349569394
numIterations: 11
objectiveHistory: [0.6517565611726542, 0.6497899965720552, 0.6496749082437917, 0.6484195485613644, 0.6449890163146406, 0.6343521971926331, 0.5982086773774155, 0.5981287998891636, 0.5981286764154597, 0.5981282149744938, 0.5981282134588519]


In [49]:
vectorAssembler = VectorAssembler(inputCols = ['lifeExp_2007'], outputCol = 'features')
va_training = vectorAssembler.transform(trainingData)
v_traininga = va_training.select(['features', 'Over_65'])
va_training.show(50)

+---------+--------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+-------+--------+
|continent|             country|gdpPercap_1952|gdpPercap_1957|gdpPercap_1962|gdpPercap_1967|gdpPercap_1972|gdpPercap_1977|gdpPercap_1982|gdpPercap_1987|gdpPercap_1992|gdpPercap_1997|gdpPercap_2002|gdpPercap_2007|lifeExp_1952|lifeExp_1957|lifeExp_1962|lifeExp_1967|lifeExp_1972|lifeExp_1977|lifeExp_1982|lifeExp_1987|lifeExp_1992|lifeExp_1997|lifeExp_2002|lifeExp_2007|   pop_1952|pop_1957|pop_1962|pop_1967|pop_1972|pop_1977|pop_1982|pop_1987|pop_1992|pop_1997| pop_2002| pop_2007|Over_65|features|
+-

In [51]:
predictions = rf_model.transform(va_testing)

In [52]:
predictions.show(100)

+--------+-------+------------------+
|features|Over_65|        prediction|
+--------+-------+------------------+
|[72.301]|      1|               1.0|
|[42.731]|      0|               0.0|
|[56.728]|      0|               0.0|
|[50.728]|      0|               0.0|
| [49.58]|      0|               0.0|
|[44.741]|      0|               0.0|
|[50.651]|      0|               0.0|
|[46.462]|      0|               0.0|
|[48.328]|      0|               0.0|
|[54.791]|      0|               0.0|
|[51.579]|      0|               0.0|
| [58.04]|      0|               0.0|
|[52.947]|      0|               0.0|
|[59.448]|      0|               0.0|
|[56.007]|      0|               0.0|
|[46.388]|      0|               0.0|
| [54.11]|      0|               0.0|
|[42.592]|      0|               0.0|
|[45.678]|      0|               0.0|
|[73.952]|      1|               1.0|
|[59.443]|      0|               0.0|
|[48.303]|      0|               0.0|
|[54.467]|      0|               0.0|
|[64.164]|  