In [15]:
# this workbook demonstrates how to use PySparkML to do single variable binary regression
# the setup is a bit contrived, since this is a how-to demo
# The gapminder_data has an additional column, "Over_65", which is set to 1 or 0 
# based on whether the life expectancy in 2007 is above or below 65
# we then train a regression model to predict whether life expectancy is above or below 65
# we will use this to predict the values used to train the model, as well as
# to see what cutoff value the model uses for this binary mapping
# (which in this contrived example should, of course, be close to 65)
# 
# we will do this with 1) a decision tree regressor, and 2) a random forest regressor
#
# this code parallels the continuous regression workbook example
# please see that example for more comments and explanations

In [16]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [17]:
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local) created by __init__ at <ipython-input-6-80d619339f4b>:2 

In [18]:
sqlContext = SQLContext(sc)

In [8]:
df = sqlContext.read.format("csv").option("inferschema","true").option("header", "true").option("delimiter", ",").load("gapminder_all_binary.csv")

In [9]:
df.printSchema()

root
 |-- continent: string (nullable = true)
 |-- country: string (nullable = true)
 |-- gdpPercap_1952: double (nullable = true)
 |-- gdpPercap_1957: double (nullable = true)
 |-- gdpPercap_1962: double (nullable = true)
 |-- gdpPercap_1967: double (nullable = true)
 |-- gdpPercap_1972: double (nullable = true)
 |-- gdpPercap_1977: double (nullable = true)
 |-- gdpPercap_1982: double (nullable = true)
 |-- gdpPercap_1987: double (nullable = true)
 |-- gdpPercap_1992: double (nullable = true)
 |-- gdpPercap_1997: double (nullable = true)
 |-- gdpPercap_2002: double (nullable = true)
 |-- gdpPercap_2007: double (nullable = true)
 |-- lifeExp_1952: double (nullable = true)
 |-- lifeExp_1957: double (nullable = true)
 |-- lifeExp_1962: double (nullable = true)
 |-- lifeExp_1967: double (nullable = true)
 |-- lifeExp_1972: double (nullable = true)
 |-- lifeExp_1977: double (nullable = true)
 |-- lifeExp_1982: double (nullable = true)
 |-- lifeExp_1987: double (nullable = true)
 |-- lifeEx

In [10]:
from pyspark.ml.feature import VectorAssembler

In [11]:
vectorAssembler = VectorAssembler(inputCols = ['lifeExp_2007'], outputCol = 'features')
va = vectorAssembler.transform(df)
va = va.select(['features', 'Over_65'])
va.show(10)

+--------+-------+
|features|Over_65|
+--------+-------+
|[72.301]|      1|
|[42.731]|      0|
|[56.728]|      0|
|[50.728]|      0|
|[52.295]|      0|
| [49.58]|      0|
| [50.43]|      0|
|[44.741]|      0|
|[50.651]|      0|
|[65.152]|      1|
+--------+-------+
only showing top 10 rows



In [12]:
from pyspark.ml.regression import DecisionTreeRegressor

In [13]:
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Over_65')

In [14]:
dt_model = dt.fit(va)

In [23]:
dt_predictions = dt_model.transform(va)

In [24]:
de_predictions.show(100)

+--------+-------+----------+
|features|Over_65|prediction|
+--------+-------+----------+
|[72.301]|      1|       1.0|
|[42.731]|      0|       0.0|
|[56.728]|      0|       0.0|
|[50.728]|      0|       0.0|
|[52.295]|      0|       0.0|
| [49.58]|      0|       0.0|
| [50.43]|      0|       0.0|
|[44.741]|      0|       0.0|
|[50.651]|      0|       0.0|
|[65.152]|      1|      0.25|
|[46.462]|      0|       0.0|
|[55.322]|      0|       0.0|
|[48.328]|      0|       0.0|
|[54.791]|      0|       0.0|
|[71.338]|      1|       1.0|
|[51.579]|      0|       0.0|
| [58.04]|      0|       0.0|
|[52.947]|      0|       0.0|
|[56.735]|      0|       0.0|
|[59.448]|      0|       0.0|
|[60.022]|      0|       0.0|
|[56.007]|      0|       0.0|
|[46.388]|      0|       0.0|
| [54.11]|      0|       0.0|
|[42.592]|      0|       0.0|
|[45.678]|      0|       0.0|
|[73.952]|      1|       1.0|
|[59.443]|      0|       0.0|
|[48.303]|      0|       0.0|
|[54.467]|      0|       0.0|
|[64.164]|

In [25]:
from pyspark.ml.regression import RandomForestRegressor

In [28]:
rf = RandomForestRegressor(featuresCol ='features', labelCol = 'Over_65', numTrees=20)

In [29]:
rf_model = rf.fit(va)

In [30]:
predictions = rf_model.transform(va)

In [31]:
predictions.show(100)

+--------+-------+-------------------+
|features|Over_65|         prediction|
+--------+-------+-------------------+
|[72.301]|      1|                1.0|
|[42.731]|      0|                0.0|
|[56.728]|      0|                0.0|
|[50.728]|      0|                0.0|
|[52.295]|      0|                0.0|
| [49.58]|      0|                0.0|
| [50.43]|      0|                0.0|
|[44.741]|      0|                0.0|
|[50.651]|      0|                0.0|
|[65.152]|      1|0.18958333333333333|
|[46.462]|      0|                0.0|
|[55.322]|      0|                0.0|
|[48.328]|      0|                0.0|
|[54.791]|      0|                0.0|
|[71.338]|      1|                1.0|
|[51.579]|      0|                0.0|
| [58.04]|      0|                0.0|
|[52.947]|      0|                0.0|
|[56.735]|      0|                0.0|
|[59.448]|      0|                0.0|
|[60.022]|      0|                0.0|
|[56.007]|      0|                0.0|
|[46.388]|      0|       

In [None]:
# find the cutoff value for predicting "Over_65"
#
# this part was a little surprising to me. Scikit-learn has a method for this: clf.tree_.threshold[0]
# 
# unfortunately, looks like there isn't a method for this in SparkML? 
# you can see the tree by printing .toDebugString on your model and 
# inspecting for where a prediction value around 0.5 occurs
# which appears to be around 63.925...
#
# not sure if this is really my only or best option, I've looked around 
# but haven't found a tidy api call for this...

In [37]:
rf_model.toDebugString

'RandomForestRegressionModel (uid=RandomForestRegressor_df344b7bd79e) with 20 trees\n  Tree 0 (weight 1.0):\n    If (feature 0 <= 65.3175)\n     Predict: 0.0\n    Else (feature 0 > 65.3175)\n     If (feature 0 <= 75.6375)\n      Predict: 1.0\n     Else (feature 0 > 75.6375)\n      If (feature 0 <= 76.4325)\n       Predict: 0.4\n      Else (feature 0 > 76.4325)\n       Predict: 1.0\n  Tree 1 (weight 1.0):\n    If (feature 0 <= 65.3175)\n     Predict: 0.0\n    Else (feature 0 > 65.3175)\n     Predict: 1.0\n  Tree 2 (weight 1.0):\n    If (feature 0 <= 65.3175)\n     If (feature 0 <= 60.468999999999994)\n      Predict: 0.0\n     Else (feature 0 > 60.468999999999994)\n      If (feature 0 <= 63.9235)\n       Predict: 0.5\n      Else (feature 0 > 63.9235)\n       Predict: 0.0\n    Else (feature 0 > 65.3175)\n     If (feature 0 <= 75.6375)\n      Predict: 1.0\n     Else (feature 0 > 75.6375)\n      If (feature 0 <= 76.4325)\n       Predict: 0.75\n      Else (feature 0 > 76.4325)\n       Predic

In [38]:
# try logistic regression
from pyspark.ml.classification import LogisticRegression

In [41]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,featuresCol="features", labelCol="Over_65",)

In [42]:
lrModel = lr.fit(va)

In [43]:
lrModel.intercept

-0.47969926995683415

In [44]:
lrModel.summary.objectiveHistory

[0.6569006935452156,
 0.6547050641904729,
 0.6545341290721645,
 0.6540710709709727,
 0.652286351873163,
 0.6406910757613997,
 0.63963401071486,
 0.6393084114199987,
 0.6360441600447867,
 0.6327485585421929,
 0.6303534937596784]

In [45]:
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))

Coefficients: [0.014396655808173855]
Intercept: -0.47969926995683415
numIterations: 11
objectiveHistory: [0.6569006935452156, 0.6547050641904729, 0.6545341290721645, 0.6540710709709727, 0.652286351873163, 0.6406910757613997, 0.63963401071486, 0.6393084114199987, 0.6360441600447867, 0.6327485585421929, 0.6303534937596784]


In [46]:
vectorAssembler = VectorAssembler(inputCols = ['lifeExp_2007'], outputCol = 'features')
va = vectorAssembler.transform(df)
va = va.select(['features', 'Over_65'])
va.show(50)

+--------+-------+
|features|Over_65|
+--------+-------+
|[72.301]|      1|
|[42.731]|      0|
|[56.728]|      0|
|[50.728]|      0|
|[52.295]|      0|
| [49.58]|      0|
| [50.43]|      0|
|[44.741]|      0|
|[50.651]|      0|
|[65.152]|      1|
|[46.462]|      0|
|[55.322]|      0|
|[48.328]|      0|
|[54.791]|      0|
|[71.338]|      1|
|[51.579]|      0|
| [58.04]|      0|
|[52.947]|      0|
|[56.735]|      0|
|[59.448]|      0|
|[60.022]|      0|
|[56.007]|      0|
|[46.388]|      0|
| [54.11]|      0|
|[42.592]|      0|
|[45.678]|      0|
|[73.952]|      1|
|[59.443]|      0|
|[48.303]|      0|
|[54.467]|      0|
|[64.164]|      0|
|[72.801]|      1|
|[71.164]|      1|
|[42.082]|      0|
|[52.906]|      0|
|[56.867]|      0|
|[46.859]|      0|
|[76.442]|      1|
|[46.242]|      0|
|[65.528]|      1|
|[63.062]|      0|
|[42.568]|      0|
|[48.159]|      0|
|[49.339]|      0|
|[58.556]|      0|
|[39.613]|      0|
|[52.517]|      0|
| [58.42]|      0|
|[73.923]|      1|
|[51.542]|  

In [47]:
predictions = rf_model.transform(va)

In [48]:
predictions.show(100)

+--------+-------+-------------------+
|features|Over_65|         prediction|
+--------+-------+-------------------+
|[72.301]|      1|                1.0|
|[42.731]|      0|                0.0|
|[56.728]|      0|                0.0|
|[50.728]|      0|                0.0|
|[52.295]|      0|                0.0|
| [49.58]|      0|                0.0|
| [50.43]|      0|                0.0|
|[44.741]|      0|                0.0|
|[50.651]|      0|                0.0|
|[65.152]|      1|0.18958333333333333|
|[46.462]|      0|                0.0|
|[55.322]|      0|                0.0|
|[48.328]|      0|                0.0|
|[54.791]|      0|                0.0|
|[71.338]|      1|                1.0|
|[51.579]|      0|                0.0|
| [58.04]|      0|                0.0|
|[52.947]|      0|                0.0|
|[56.735]|      0|                0.0|
|[59.448]|      0|                0.0|
|[60.022]|      0|                0.0|
|[56.007]|      0|                0.0|
|[46.388]|      0|       

In [49]:
lrModel

LogisticRegressionModel: uid = LogisticRegression_63ecc440f87d, numClasses = 2, numFeatures = 1