This notebook replicates the material from Databricks' own tutorials using only open source libraries and tools.  
  
Below are the tools and endpoints used:

Livy endpoint: http://192.168.1.123:8998  
Livy Server: http://192.168.1.123:8998/ui  
Spark Master: http://192.168.1.123:8080/  
Spark Magic: https://github.com/jupyter-incubator/sparkmagic/blob/master/examples/Magics%20in%20IPython%20Kernel.ipynb  

Let's load sparkmagic so we can communicate with our Spark cluster (through Livy).

In [1]:
%load_ext sparkmagic.magics

Now let's configure our connection

In [2]:
%manage_spark



Added endpoint http://192.168.1.123:8998/
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,,spark,idle,,,✔


SparkSession available as 'spark'.


Let's read the data used in the tutorial

In [3]:
%%spark
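// Legacy (deprecated) entry point; in Spark 2.x the spark SparkSession already covers it
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Raw RDD of the markets file's lines; not used below (the DataFrame version is)
val marketsFile = sc.textFile("file:///home/churtado/notebooks/data/farmers_market.csv")

// header=true takes column names from the first row; without schema inference every column is read as a string
val taxes2013 = spark.read.format("csv").option("header", "true").load("file:///home/churtado/notebooks/data/zipcodes.csv")
taxes2013.createOrReplaceTempView("taxes2013")
val markets = spark.read.format("csv").option("header", "true").load("file:///home/churtado/notebooks/data/farmers_market.csv")
markets.createOrReplaceTempView("markets")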

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1a54232e
marketsFile: org.apache.spark.rdd.RDD[String] = file:///home/churtado/notebooks/data/farmers_market.csv MapPartitionsRDD[1] at textFile at <console>:26
taxes2013: org.apache.spark.sql.DataFrame = [STATEFIPS: string, STATE: string ... 112 more fields]
markets: org.apache.spark.sql.DataFrame = [FMID: string, MarketName: string ... 57 more fields]


We can use the SQL magic (%%spark -c sql) to query the data, starting with a list of the available tables

In [4]:
%%spark -c sql
SHOW TABLES

Unnamed: 0,database,tableName,isTemporary
0,default,cleaned_taxes,False
1,default,cleaned_taxes_state,False
2,default,income_state,False
3,,markets,True
4,,taxes2013,True


In [5]:
%%spark -c sql -q -o df_markets 
SELECT * FROM markets

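The -o df_markets flag copied the query result into a local pandas DataFrame (and -q suppressed its display), so we can now work with the data in pandas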

In [6]:
df_markets.head(3)

Unnamed: 0,FMID,MarketName,Website,Facebook,city,County,State,zip,Season1Date,Season1Time,...,Tofu,WildHarvested,updateTime,street,OtherMedia,Location,Twitter,Youtube,Season3Date,Season3Time
0,1018261,Caledonia Farmers Market Association - Danville,https://sites.google.com/site/caledoniafarmers...,https://www.facebook.com/Danville.VT.Farmers.M...,Danville,Caledonia,Vermont,5828.0,06/14/2017 to 08/30/2017,Wed: 9:00 AM-1:00 PM;,...,N,N,2017-06-20 22:43:57,,,,,,,
1,1018318,Stearns Homestead Farmers' Market,http://www.StearnsHomestead.com,StearnsHomesteadFarmersMarket,Parma,Cuyahoga,Ohio,,06/24/2017 to 09/30/2017,Sat: 9:00 AM-1:00 PM;,...,N,N,2017-06-21 17:15:01,6975 Ridge Road,,,,,,
2,1009364,106 S. Main Street Farmers Market,http://thetownofsixmile.wordpress.com/,,Six Mile,,South Carolina,29682.0,,,...,N,N,2013-01-01 00:00:00,106 S. Main Street,,,,,,


We can visualize the data as well

In [7]:
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_markets)


Now we're going to clean up the tax data with SQL: cast the columns we need to numeric types and give them readable names

In [8]:
%%spark -c sql
DROP TABLE IF EXISTS cleaned_taxes

In [9]:
%%spark -c sql -q -o df_cleaned_taxes

CREATE TABLE cleaned_taxes AS
SELECT state, int(zipcode / 10) as zipcode, 
  int(mars1) as single_returns, 
  int(mars2) as joint_returns, 
  int(numdep) as numdep, 
  double(A02650) as total_income_amount,
  double(A00300) as taxable_interest_amount,
  double(a01000) as net_capital_gains,
  double(a00900) as biz_net_income
FROM taxes2013
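Note that int(zipcode / 10) drops the last digit of each zip code, so rows are effectively grouped by a 4-digit zip prefix rather than by full zip code. Here's a minimal plain-Python sketch of that arithmetic. It assumes the zip codes arrive as strings (the CSV was read without schema inference) and that Spark casts them to doubles before dividing, which is its usual coercion for arithmetic on strings. zip_prefix is a hypothetical helper, not part of the notebook.

```python
def zip_prefix(zipcode: str) -> int:
    """Hypothetical helper mimicking Spark SQL's int(zipcode / 10) on a string zip code."""
    # The string is treated as a double before dividing (assumed to match Spark's cast);
    # int() then truncates toward zero, which drops the last digit.
    return int(float(zipcode) / 10)


# Several full zip codes collapse onto the same 4-digit prefix
for z in ["79240", "79245", "79249", "05828"]:
    print(z, "->", zip_prefix(z))
```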

Next we want the average income per state

In [10]:
%%spark -c sql
DROP TABLE IF EXISTS income_state

In [11]:
%%spark -c sql -q 

CREATE TABLE income_state AS
SELECT state, 
sum(total_income_amount) / sum(1) as total_income
FROM cleaned_taxes
group by state
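sum(total_income_amount) / sum(1) divides each state's total by its row count, so it's the mean amount per zip-code row (the same as avg(total_income_amount) as long as the column has no nulls). Here's a plain-Python sketch of the same GROUP BY, using made-up toy rows rather than the real data; income_by_state is a hypothetical helper.

```python
from collections import defaultdict


def income_by_state(rows):
    """Mirror SUM(total_income_amount) / SUM(1) ... GROUP BY state."""
    totals = defaultdict(float)  # SUM(total_income_amount) per state
    counts = defaultdict(int)    # SUM(1): one per row, i.e. the row count per state
    for state, amount in rows:
        totals[state] += amount
        counts[state] += 1
    return {state: totals[state] / counts[state] for state in totals}


# Made-up toy rows of (state, total_income_amount)
rows = [("TX", 100.0), ("TX", 300.0), ("VT", 50.0)]
print(income_by_state(rows))
```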

In [12]:
%%spark -c sql -q  -o df_income_state

select * from income_state

Let's try to plot income by state using plotly

In [13]:
import plotly
# Use your own plotly credentials here; avoid committing a real API key to a shared notebook
plotly.tools.set_credentials_file(username='churtado', api_key='YOUR_API_KEY')

In [14]:
import plotly.plotly as py
import pandas as pd

states = df_income_state["state"]
values = df_income_state["total_income"].astype(float)

data = [ dict(
        type='choropleth',
        autocolorscale = True,
        locations = states,
        z = values,
        locationmode = 'USA-states',
        marker = dict(
            line = dict (
                color = 'rgb(255,255,255)',
                width = 2
            ) ),
        colorbar = dict(
            title = "Total Avg Income")
        ) ]

layout = dict(
        title = 'Map of avg income',
        geo = dict(
            scope='usa',
            projection=dict( type='albers usa' ),
            showlakes = True,
            lakecolor = 'rgb(255, 255, 255)'),
             )
    
fig = dict( data=data, layout=layout )
py.iplot( fig, filename='d3-cloropleth-map' )

In [15]:
%%spark -c sql 

SELECT * FROM cleaned_taxes limit 5

Unnamed: 0,state,zipcode,single_returns,joint_returns,numdep,total_income_amount,taxable_interest_amount,net_capital_gains,biz_net_income
0,TX,7924,70,90,130,6226.0,25.0,0.0,273.0
1,TX,7924,0,60,60,4063.0,52.0,168.0,133.0
2,TX,7924,0,30,40,3347.0,0.0,0.0,0.0
3,TX,7924,0,30,0,5660.0,40.0,0.0,0.0
4,TX,7924,0,0,0,0.0,0.0,0.0,0.0


In [16]:
%%spark -c sql -q -o df_capital_gains

SELECT zipcode AS zipcode, SUM(net_capital_gains) AS cap_gains
FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
GROUP BY zipcode
ORDER BY cap_gains ASC
LIMIT 10

Let's look at the set of zip codes with the lowest total capital gains and plot the results.

In [17]:
import plotly.plotly as py
import plotly.graph_objs as go

trace1 = go.Bar(
    x=df_capital_gains["zipcode"],
    y=df_capital_gains["cap_gains"]
)
data = [trace1]
layout = go.Layout(
    xaxis=dict(
        showgrid=False,
        type='category',
        zeroline=False,
        showline=True,
        mirror='ticks',
        gridcolor='#bdbdbd',
        gridwidth=1,
        zerolinecolor='#969696',
        zerolinewidth=4,
        linecolor='#636363',
        #linewidth=6
    ),
    yaxis=dict(
        showgrid=False,
        zeroline=False,
        showline=True,
        mirror='ticks',
        gridcolor='#bdbdbd',
        gridwidth=1,
        zerolinecolor='#969696',
        zerolinewidth=1,
        linecolor='#636363',
        #linewidth=6
    )
)
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename='axes-lines')

Let's look at a combination of capital gains and business net income to see what we find.

In [18]:
%%spark -c sql -q -o df_capital_and_business_income

SELECT zipcode, 
  SUM(biz_net_income) as business_net_income, 
  SUM(net_capital_gains) as capital_gains, 
  SUM(net_capital_gains) + SUM(biz_net_income) as capital_and_business_income
FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
GROUP BY zipcode
ORDER BY capital_and_business_income DESC
LIMIT 50

In [21]:
import plotly.plotly as py
import plotly.graph_objs as go

trace1 = go.Bar(
    x=df_capital_and_business_income["zipcode"],
    y=df_capital_and_business_income["business_net_income"],
    name='Business Net Income'
)

trace2 = go.Bar(
    x=df_capital_and_business_income["zipcode"],
    y=df_capital_and_business_income["capital_gains"],
    name='Capital Gains'
)

trace3 = go.Bar(
    x=df_capital_and_business_income["zipcode"],
    y=df_capital_and_business_income["capital_and_business_income"],
    name='Capital and Business Income'
)

data = [trace1, trace2, trace3]
layout = go.Layout(
    barmode='group',
    xaxis=dict(
        showgrid=False,
        type='category',
        zeroline=False,
        showline=True,
        mirror='ticks',
        gridcolor='#bdbdbd',
        gridwidth=1,
        zerolinecolor='#969696',
        zerolinewidth=4,
        linecolor='#636363',
        #linewidth=6
    ),
    yaxis=dict(
        showgrid=False,
        zeroline=False,
        showline=True,
        mirror='ticks',
        gridcolor='#bdbdbd',
        gridwidth=1,
        zerolinecolor='#969696',
        zerolinewidth=1,
        linecolor='#636363',
        #linewidth=6
    )
)
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename='kpis')

To see how Spark will execute a query, we can look at its physical plan with EXPLAIN

In [56]:
%%spark -c sql -q -o df_explain_plan
EXPLAIN 
  SELECT zipcode, 
    SUM(biz_net_income) as net_income, 
    SUM(net_capital_gains) as cap_gains, 
    SUM(net_capital_gains) + SUM(biz_net_income) as combo
  FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
  GROUP BY zipcode
  ORDER BY combo desc
  limit 50

In [55]:
import sys

pd.set_option("display.max_colwidth", 10000)
plan = df_explain_plan["plan"].to_string().replace('\\n', '\n')
sys.stdout.write(plan)

0    == Physical Plan ==
TakeOrderedAndProject(limit=50, orderBy=[combo#809 DESC NULLS LAST], output=[zipcode#813,net_income#807,cap_gains#808,combo#809])
+- *(2) HashAggregate(keys=[zipcode#813], functions=[sum(biz_net_income#820), sum(net_capital_gains#819)])
   +- Exchange hashpartitioning(zipcode#813, 200)
      +- *(1) HashAggregate(keys=[zipcode#813], functions=[partial_sum(biz_net_income#820), partial_sum(net_capital_gains#819)])
         +- *(1) Filter ((isnotnull(zipcode#813) && NOT (zipcode#813 = 0)) && NOT (zipcode#813 = 9999))
            +- HiveTableScan [zipcode#813, net_capital_gains#819, biz_net_income#820], HiveTableRelation `default`.`cleaned_taxes`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [state#812, zipcode#813, single_returns#814, joint_returns#815, numdep#816, total_income_amount#817, taxable_interest_amount#818, net_capital_gains#819, biz_net_income#820]

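We can cache the table in memory. With SQL, CACHE TABLE caches eagerly (the data is materialized right away); with the API, cache() is lazy, so the data is only materialized the first time an action runs on it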

In [57]:
%%spark
spark.table("cleaned_taxes").cache()

res21: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [state: string, zipcode: int ... 7 more fields]


In [58]:
%%spark -c sql 

CACHE TABLE cleaned_taxes
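To make the eager/lazy difference concrete, here's a toy plain-Python analogy. ToyTable is entirely hypothetical and is not how Spark implements caching; it only counts how often the underlying source gets scanned under each approach.

```python
class ToyTable:
    """Hypothetical toy table that counts source scans (not Spark's implementation)."""

    def __init__(self, load):
        self._load = load      # function that "scans" the source data
        self._cache = None     # materialized rows, once cached
        self._marked = False   # has caching been requested?
        self.scans = 0         # number of times the source was scanned

    def _scan(self):
        self.scans += 1
        return list(self._load())

    def cache(self):
        """Lazy, like Dataset.cache(): only marks the table for caching."""
        self._marked = True
        return self

    def cache_table(self):
        """Eager, like SQL CACHE TABLE: marks the table and materializes it now."""
        self._marked = True
        if self._cache is None:
            self._cache = self._scan()
        return self

    def collect(self):
        """An action: served from the cache when caching was requested."""
        if not self._marked:
            return self._scan()
        if self._cache is None:  # first action after a lazy cache() fills the cache
            self._cache = self._scan()
        return self._cache


lazy = ToyTable(lambda: range(3)).cache()
print("lazy, before any action:", lazy.scans)
lazy.collect()
lazy.collect()
print("lazy, after two actions:", lazy.scans)

eager = ToyTable(lambda: range(3)).cache_table()
print("eager, right after caching:", eager.scans)
```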

If we query the table now, the query should be faster, since Spark reads the cached data from memory instead of scanning the Hive table

In [61]:
%%spark -c sql  -q 

SELECT zipcode, 
  SUM(biz_net_income) as net_income, 
  SUM(net_capital_gains) as cap_gains, 
  SUM(net_capital_gains) + SUM(biz_net_income) as combo
FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
GROUP BY zipcode
ORDER BY combo desc
limit 50

The explain plan is now different: the HiveTableScan has been replaced by an InMemoryTableScan over the cached InMemoryRelation

In [62]:
%%spark -c sql -q -o df_explain_plan2
EXPLAIN 
  SELECT zipcode, 
    SUM(biz_net_income) as net_income, 
    SUM(net_capital_gains) as cap_gains, 
    SUM(net_capital_gains) + SUM(biz_net_income) as combo
  FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
  GROUP BY zipcode
  ORDER BY combo desc
  limit 50

In [63]:
import sys

pd.set_option("display.max_colwidth", 10000)
plan = df_explain_plan2["plan"].to_string().replace('\\n', '\n')
sys.stdout.write(plan)

0    == Physical Plan ==
TakeOrderedAndProject(limit=50, orderBy=[combo#1183 DESC NULLS LAST], output=[zipcode#1188,net_income#1181,cap_gains#1182,combo#1183])
+- *(2) HashAggregate(keys=[zipcode#1188], functions=[sum(biz_net_income#1195), sum(net_capital_gains#1194)])
   +- Exchange hashpartitioning(zipcode#1188, 200)
      +- *(1) HashAggregate(keys=[zipcode#1188], functions=[partial_sum(biz_net_income#1195), partial_sum(net_capital_gains#1194)])
         +- *(1) Filter ((isnotnull(zipcode#1188) && NOT (zipcode#1188 = 0)) && NOT (zipcode#1188 = 9999))
            +- InMemoryTableScan [zipcode#1188, net_capital_gains#1194, biz_net_income#1195], [isnotnull(zipcode#1188), NOT (zipcode#1188 = 0), NOT (zipcode#1188 = 9999)]
                  +- InMemoryRelation [state#1187, zipcode#1188, single_returns#1189, joint_returns#1190, numdep#1191, total_income_amount#1192, taxable_interest_amount#1193, net_capital_gains#1194, biz_net_income#1195], true, 10000, StorageLevel(disk, memory, deserial

Now let's look at the farmers market data, counting the markets in each state

In [67]:
%%spark -c sql -q -o df_markets
SELECT state, count(*) as count FROM markets GROUP BY state

In [68]:
import plotly.plotly as py
import plotly.graph_objs as go

trace1 = go.Bar(
    x=df_markets["state"],
    y=df_markets["count"]
)
data = [trace1]
layout = go.Layout(
    xaxis=dict(
        showgrid=False,
        type='category',
        zeroline=False,
        showline=True,
        mirror='ticks',
        gridcolor='#bdbdbd',
        gridwidth=1,
        zerolinecolor='#969696',
        zerolinewidth=4,
        linecolor='#636363',
        #linewidth=6
    ),
    yaxis=dict(
        showgrid=False,
        zeroline=False,
        showline=True,
        mirror='ticks',
        gridcolor='#bdbdbd',
        gridwidth=1,
        zerolinecolor='#969696',
        zerolinewidth=1,
        linecolor='#636363',
        #linewidth=6
    )
)
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename='markets1')

We could explore more, but let's move on to some machine learning. The model needs numeric data, so we'll sum the tax data by zip code prefix, count the farmers markets per prefix, and join the two

In [70]:
%%spark

val cleanedTaxes = spark.sql("SELECT * FROM cleaned_taxes")

// sum() aggregates every numeric column, including zipcode itself, so the result also has a sum(zipcode) column
val summedTaxes = cleanedTaxes.groupBy("zipcode").sum()

// selectExpr takes SQL expressions as strings, like the SELECT clause of a query
val cleanedMarkets = markets.selectExpr("*", "int(zip / 10) as zipcode").groupBy("zipcode").count().selectExpr("double(count) as count", "zipcode as zip")

// a full outer join keeps zip prefixes that appear in only one of the two datasets
val joined = cleanedMarkets.join(summedTaxes, cleanedMarkets("zip") === summedTaxes("zipcode"), "outer")

cleanedTaxes: org.apache.spark.sql.DataFrame = [state: string, zipcode: int ... 7 more fields]
summedTaxes: org.apache.spark.sql.DataFrame = [zipcode: int, sum(zipcode): bigint ... 7 more fields]
cleanedMarkets: org.apache.spark.sql.DataFrame = [count: double, zip: int]
joined: org.apache.spark.sql.DataFrame = [count: double, zip: int ... 9 more fields]


In [73]:
%%spark

joined.createOrReplaceTempView("joined")

MLlib doesn't accept null values, and the outer join leaves nulls wherever a zip prefix appears in only one dataset, so we'll convert our nulls to zeros

In [77]:
%%spark
val prepped = joined.na.fill(0)
prepped.createOrReplaceTempView("prepped")

prepped: org.apache.spark.sql.DataFrame = [count: double, zip: int ... 9 more fields]


In [78]:
%%spark -c sql
select * from prepped limit 5

Unnamed: 0,count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(total_income_amount),sum(taxable_interest_amount),sum(net_capital_gains),sum(biz_net_income)
0,0.0,0,463,8334,930,840,980,82700.0,460.0,825.0,4409.0
1,2.0,496,496,17856,3290,3250,4150,357071.0,1540.0,5704.0,10969.0
2,0.0,0,833,9996,14000,9490,19940,1497553.0,7747.0,12788.0,32485.0
3,1.0,1342,1342,40260,4800,3680,5450,475283.0,2919.0,6476.0,13496.0
4,1.0,1580,1580,9480,4600,3940,4650,516516.0,4243.0,10827.0,22441.0
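Rows such as the ones above with zip 0 and count 0.0 come from the outer join: a zip prefix present in the tax data but not in the markets data gets nulls for the market columns, which na.fill(0) turns into zeros. Here's a small plain-Python sketch of that join-then-fill step, with hypothetical helper names and made-up toy inputs.

```python
def full_outer_join(left, right):
    """Join two dicts keyed by zip prefix; a key missing on one side gets None there."""
    keys = sorted(set(left) | set(right))
    return {k: (left.get(k), right.get(k)) for k in keys}


def fill_nulls(rows, value=0):
    """Rough analogue of DataFrame.na.fill(0): replace every None with a default."""
    return {k: tuple(value if v is None else v for v in pair) for k, pair in rows.items()}


# Made-up toy inputs: market counts and summed income per zip prefix
market_counts = {101: 2.0, 102: 1.0}
income_sums = {100: 500.0, 101: 750.0}

joined = full_outer_join(market_counts, income_sums)
prepped = fill_nulls(joined)
print(joined)
print(prepped)
```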


The data is ready. Next we'll convert it into a vector type, since that's how Spark ML expects its features. First we separate out the columns that aren't features: the two zip code columns and count, which is our label

In [79]:
%%spark

val nonFeatureCols = Array("zip", "zipcode", "count")
val featureCols = prepped.columns.diff(nonFeatureCols)

nonFeatureCols: Array[String] = Array(zip, zipcode, count)
featureCols: Array[String] = Array(sum(zipcode), sum(single_returns), sum(joint_returns), sum(numdep), sum(total_income_amount), sum(taxable_interest_amount), sum(net_capital_gains), sum(biz_net_income))


Now I'm going to use Spark's VectorAssembler to combine all of these columns into a single vector column. To do this I set the input columns and the output column, then use the assembler to transform the prepped data into my final dataset.

In [81]:
%%spark
import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")

val finalPrep = assembler.transform(prepped)

import org.apache.spark.ml.feature.VectorAssembler
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_e25e19a06a51
finalPrep: org.apache.spark.sql.DataFrame = [count: double, zip: int ... 10 more fields]
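The column selection and the vector assembly boil down to two small operations. Here's a plain-Python sketch of each, with hypothetical helper names. Scala's diff is a multiset difference, which behaves like a simple filter here because column names are unique. The real VectorAssembler produces a Spark ML Vector (possibly stored sparsely) rather than a Python list. The example row is a subset of columns from the prepped output above.

```python
def feature_columns(columns, non_feature_cols):
    """Like columns.diff(nonFeatureCols) for unique names: keep order, drop excluded columns."""
    excluded = set(non_feature_cols)
    return [c for c in columns if c not in excluded]


def assemble(row, input_cols):
    """Rough analogue of VectorAssembler: pack the chosen columns into one list of floats."""
    return [float(row[c]) for c in input_cols]


# A subset of the columns, and one row, from the prepped output above
columns = ["count", "zip", "zipcode", "sum(numdep)", "sum(net_capital_gains)"]
row = {"count": 2.0, "zip": 496, "zipcode": 496,
       "sum(numdep)": 4150, "sum(net_capital_gains)": 5704.0}

feature_cols = feature_columns(columns, ["zip", "zipcode", "count"])
print(feature_cols)
print(assemble(row, feature_cols))
```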


Split the data into a training set (70%) and a test set (30%)

In [82]:
%%spark

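// randomSplit gives a different split on each run; pass a seed, e.g. randomSplit(Array(0.7, 0.3), 42), to make it reproducible
val Array(training, test) = finalPrep.randomSplit(Array(0.7, 0.3))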

// Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

println(training.count())
println(test.count())

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [count: double, zip: int ... 10 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [count: double, zip: int ... 10 more fields]
res47: training.type = [count: double, zip: int ... 10 more fields]
res48: test.type = [count: double, zip: int ... 10 more fields]
4068
1740
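randomSplit doesn't cut the data at exactly 70/30. Each row is assigned to a split at random with those probabilities, which is why we got 4068 and 1740 rows (about 70% and 30% of 5808). Here's a plain-Python sketch of that idea, assuming a simple per-row uniform draw. Spark's actual sampling runs per partition, but each row still lands in exactly one split. random_split is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split at random, with probabilities proportional to weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:  # cumulative upper bounds, e.g. [0.7, 1.0] for (0.7, 0.3)
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw per row, in [0, 1)
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against a last bound of 0.999... from rounding
    return splits


training_set, test_set = random_split(range(5808), [0.7, 0.3], seed=42)
print(len(training_set), len(test_set))  # roughly 70/30, not exact
```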


Now let's do some machine learning: a linear regression that predicts count, the number of farmers markets in a zip code prefix

In [84]:
%%spark
import org.apache.spark.ml.regression.LinearRegression

val lrModel = new LinearRegression().setLabelCol("count").setFeaturesCol("features").setElasticNetParam(0.5)

println("Printing out the model Parameters:")
println("-"*20)
println(lrModel.explainParams)
println("-"*20)

import org.apache.spark.ml.regression.LinearRegression
lrModel: org.apache.spark.ml.regression.LinearRegression = linReg_557b5ccab242
Printing out the model Parameters:
--------------------
aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0, current: 0.5)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. (default: 1.35)
featuresCol: features column name (default: features, current: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label, current: count)
loss: The loss function to be optimized. Supported options: squaredError, huber. (Default squaredError) (default: squaredError)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regulariza
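For reference, Spark's ML documentation describes the elastic-net linear regression objective roughly as below, where λ is regParam and α is elasticNetParam. The intercept b is not penalized, and the official docs are the authority on the exact scaling.

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2 + \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

With α = 0.5 the penalty is an even blend of L1 and L2. However, the whole penalty is scaled by λ, and regParam defaults to 0.0 (its line is cut off in the output above). As configured here, elasticNetParam therefore has no effect and the fit is ordinary least squares. Setting a positive regParam, for example .setRegParam(0.1) (an arbitrary value), would turn the regularization on.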

The parameter list shows how the model is configured; now let's train it

In [86]:
%%spark 

import org.apache.spark.mllib.evaluation.RegressionMetrics
val lrFitted = lrModel.fit(training)

import org.apache.spark.mllib.evaluation.RegressionMetrics
lrFitted: org.apache.spark.ml.regression.LinearRegressionModel = linReg_557b5ccab242


Since we're predicting counts (you can't have half a farmers market, for example), I'm going to check equality by first rounding each prediction to the nearest whole number.

In [92]:
%%spark 
val holdout = lrFitted.transform(test).selectExpr("prediction as raw_prediction", "double(round(prediction)) as prediction", "count", """CASE double(round(prediction)) = count WHEN true then 1 ELSE 0 END as equal""")

holdout.createOrReplaceTempView("holdout")

holdout: org.apache.spark.sql.DataFrame = [raw_prediction: double, prediction: double ... 2 more fields]


In [93]:
%%spark -c sql
select * from holdout limit 5

Unnamed: 0,raw_prediction,prediction,count,equal
0,1.653628,2.0,0.0,0
1,1.226335,1.0,0.0,0
2,1.301371,1.0,0.0,0
3,0.091524,0.0,0.0,1
4,1.537867,2.0,1.0,0


Now let's see what proportion of the predictions was exactly correct.

In [98]:
%%spark
val proportion = holdout.selectExpr("sum(equal)/sum(1)")

proportion.createOrReplaceTempView("proportion")

proportion: org.apache.spark.sql.DataFrame = [(sum(equal) / sum(1)): double]


In [99]:
%%spark -c sql
select * from proportion limit 5

Unnamed: 0,(sum(equal) / sum(1))
0,0.223563
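Here's a plain-Python sketch of the rounding and the exact-match proportion, applied to the five holdout rows shown earlier. One subtlety: Spark SQL's round() rounds halves away from zero (HALF_UP), whereas Python's built-in round() rounds halves to even. The sketch therefore uses Decimal with ROUND_HALF_UP. spark_round and exact_match_proportion are hypothetical helpers.

```python
from decimal import Decimal, ROUND_HALF_UP


def spark_round(x):
    """Round to the nearest whole number, halves away from zero (like Spark SQL's round)."""
    return float(Decimal(repr(x)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))


def exact_match_proportion(pairs):
    """Share of (raw_prediction, count) pairs whose rounded prediction equals count."""
    equal = [1 if spark_round(pred) == count else 0 for pred, count in pairs]
    return sum(equal) / len(equal)  # same idea as sum(equal)/sum(1) in the SQL


# The five holdout rows shown above: (raw_prediction, count)
sample = [(1.653628, 0.0), (1.226335, 0.0), (1.301371, 0.0), (0.091524, 0.0), (1.537867, 1.0)]
print(exact_match_proportion(sample))
print(spark_round(2.5), round(2.5))  # Spark-style rounding vs Python's round-half-to-even
```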


Let's also calculate some regression metrics.

In [100]:
%%spark
// RegressionMetrics expects an RDD of (prediction, observation) pairs of Doubles, so convert the rows
val rm = new RegressionMetrics(
  holdout.select("prediction", "count").rdd.map(x =>
  (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

println("MSE: " + rm.meanSquaredError)
println("MAE: " + rm.meanAbsoluteError)
println("RMSE: " + rm.rootMeanSquaredError)
println("R Squared: " + rm.r2)
println("Explained Variance: " + rm.explainedVariance + "\n")

rm: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@62cbddcf
MSE: 3.1
MAE: 1.2494252873563219
RMSE: 1.760681686165901
R Squared: -0.08621070373921191
Explained Variance: 0.7688796406394507
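To see what each of these numbers measures, here's a plain-Python sketch of the metrics over (prediction, observation) pairs. The formulas follow our reading of the definitions in Spark's RegressionMetrics documentation (for example, explained variance is the mean of (prediction - mean observation)²), so treat it as an illustration rather than a reference implementation. Note that the notebook passes the rounded predictions, not the raw ones. regression_metrics is a hypothetical helper.

```python
import math


def regression_metrics(pairs):
    """Regression metrics over (prediction, observation) pairs."""
    n = len(pairs)
    y_mean = sum(y for _, y in pairs) / n
    ss_err = sum((y - p) ** 2 for p, y in pairs)       # squared errors
    ss_tot = sum((y - y_mean) ** 2 for _, y in pairs)  # spread of the observations
    mse = ss_err / n
    return {
        "MSE": mse,
        "MAE": sum(abs(y - p) for p, y in pairs) / n,
        "RMSE": math.sqrt(mse),     # the square root of MSE, not a squared quantity
        "R2": 1 - ss_err / ss_tot,  # undefined if every observation is identical
        "explained_variance": sum((p - y_mean) ** 2 for p, _ in pairs) / n,
    }


# Toy pairs: a model that predicts the mean everywhere scores R2 = 0 ...
print(regression_metrics([(2.0, 1.0), (2.0, 3.0), (2.0, 2.0)])["R2"])
# ... and one that does worse than the mean scores below 0
print(regression_metrics([(3.0, 1.0), (1.0, 3.0), (2.0, 2.0)])["R2"])
```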



These results are poor: a negative R Squared means the model does worse than simply predicting the mean count. So let's explore another way to train the model. Rather than fitting a single model with hard-coded parameters, let's train using a pipeline.  
  
A pipeline gives us some nice benefits. It lets us chain together the transformations needed to turn raw data into data prepared for the model, and, combined with a cross validator, it gives us a simple, straightforward way to try out many different combinations of parameters. This process is called hyperparameter tuning, and the approach used here is grid search: you set up the exact parameter values you'd like to test, and MLlib automatically builds all the combinations of them to test.  
  
For example, below we'll set numTrees to 20 and 60 and maxDepth to 5 and 10. The ParamGridBuilder automatically constructs all the combinations of these two variables (along with any others we might specify). We'll also use cross validation to tune the hyperparameters, which helps guard against overfitting the model.  
  
Lastly, we need a RegressionEvaluator that scores the candidate models on some metric (the default is RMSE). The key takeaway is that the cross validator automatically optimizes for our chosen metric by exploring the parameter grid we set up, rather than us having to tune the parameters by hand as we would have had to above.  
  
Now we can go about training our random forest!  
  
note: this might take a little while because of the number of combinations that we're trying and limitations in workers available.  
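The grid search and cross validation described above can be sketched in plain Python. param_grid builds the Cartesian product of parameter values, in the same order as the ParamGridBuilder output below, and cross_validate averages each candidate's RMSE over k folds and keeps the lowest. This is a simplified sketch under stated assumptions. The folds are taken round-robin instead of randomly. There's no final refit on the full training set, which Spark's CrossValidator does. fit_shrunken_mean is a hypothetical toy model standing in for the random forest.

```python
import itertools
import math


def param_grid(**grid):
    """Cartesian product of parameter values, like ParamGridBuilder.addGrid(...).build()."""
    names = list(grid)
    return [dict(zip(names, values)) for values in itertools.product(*grid.values())]


def rmse(pairs):
    """Root mean squared error over (prediction, label) pairs."""
    return math.sqrt(sum((y - p) ** 2 for p, y in pairs) / len(pairs))


def cross_validate(rows, grid, fit, k=3):
    """Pick the params with the lowest average RMSE across k folds.

    rows: list of (features, label) pairs.
    fit(train_rows, params) must return a predict(features) function.
    """
    folds = [rows[i::k] for i in range(k)]  # round-robin folds (Spark splits randomly)
    averages = []
    for params in grid:
        scores = []
        for i in range(k):
            held_out = folds[i]
            train = [r for j, fold in enumerate(folds) if j != i for r in fold]
            predict = fit(train, params)
            scores.append(rmse([(predict(x), y) for x, y in held_out]))
        averages.append(sum(scores) / k)
    best = min(range(len(grid)), key=lambda i: averages[i])  # lower RMSE is better
    return grid[best], averages


def fit_shrunken_mean(train, params):
    """Hypothetical toy model: always predicts shrink * (mean training label)."""
    mean = sum(y for _, y in train) / len(train)
    return lambda features: params["shrink"] * mean


print(param_grid(maxDepth=[5, 10], numTrees=[20, 60]))

rows = [(i, 2.0) for i in range(9)]  # toy data: every label is 2.0
best, averages = cross_validate(rows, param_grid(shrink=[0.0, 0.5, 1.0]), fit_shrunken_mean)
print(best, averages)
```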


In [102]:
%%spark
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

import org.apache.spark.ml.evaluation.RegressionEvaluator

import org.apache.spark.ml.{Pipeline, PipelineStage}

val rfModel = new RandomForestRegressor().setLabelCol("count").setFeaturesCol("features")

val paramGrid = new ParamGridBuilder().addGrid(rfModel.maxDepth, Array(5, 10)).addGrid(rfModel.numTrees, Array(20, 60)).build()
// Note: this grid has 4 parameter combinations, and the cross validator fits
// each one once per fold, so it can take a long time on a small cluster.
// Be patient for it to run!
// If you want it to run faster, remove some of
// the above parameters and it'll speed right up!

val steps:Array[PipelineStage] = Array(rfModel)

val pipeline = new Pipeline().setStages(steps)

// you can feel free to change the number of folds used in cross validation as well
// the estimator can also just be an individual model rather than a pipeline
val cv = new CrossValidator().setEstimator(pipeline).setEstimatorParamMaps(paramGrid).setEvaluator(new RegressionEvaluator().setLabelCol("count"))

val pipelineFitted = cv.fit(training)

import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}
rfModel: org.apache.spark.ml.regression.RandomForestRegressor = rfr_6eb6ad236606
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfr_6eb6ad236606-maxDepth: 5,
	rfr_6eb6ad236606-numTrees: 20
}, {
	rfr_6eb6ad236606-maxDepth: 5,
	rfr_6eb6ad236606-numTrees: 60
}, {
	rfr_6eb6ad236606-maxDepth: 10,
	rfr_6eb6ad236606-numTrees: 20
}, {
	rfr_6eb6ad236606-maxDepth: 10,
	rfr_6eb6ad236606-numTrees: 60
})
steps: Array[org.apache.spark.ml.PipelineStage] = Array(rfr_6eb6ad236606)
pipeline: org.apache.spark.ml.Pipeline = pipeline_1d5bc027eabe
cv: org.apache.spark.ml.tuning.CrossValidator = cv_03d853461efb
pipelineFitted: org.apache.spark.ml.tuning.CrossValidatorModel = cv_03d853461efb


Now we've trained our model! Let's take a look at which version performed best!

In [105]:
%%spark
println("The Best Parameters:\n--------------------")
println(pipelineFitted.bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel].stages(0))
pipelineFitted.bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel].stages(0).extractParamMap

The Best Parameters:
--------------------
RandomForestRegressionModel (uid=rfr_6eb6ad236606) with 20 trees
res100: org.apache.spark.ml.param.ParamMap =
{
	rfr_6eb6ad236606-cacheNodeIds: false,
	rfr_6eb6ad236606-checkpointInterval: 10,
	rfr_6eb6ad236606-featureSubsetStrategy: auto,
	rfr_6eb6ad236606-featuresCol: features,
	rfr_6eb6ad236606-impurity: variance,
	rfr_6eb6ad236606-labelCol: count,
	rfr_6eb6ad236606-maxBins: 32,
	rfr_6eb6ad236606-maxDepth: 10,
	rfr_6eb6ad236606-maxMemoryInMB: 256,
	rfr_6eb6ad236606-minInfoGain: 0.0,
	rfr_6eb6ad236606-minInstancesPerNode: 1,
	rfr_6eb6ad236606-numTrees: 20,
	rfr_6eb6ad236606-predictionCol: prediction,
	rfr_6eb6ad236606-seed: 235498149,
	rfr_6eb6ad236606-subsamplingRate: 1.0
}


Now let's take a look at our holdout set results.

In [106]:
%%spark
val holdout2 = pipelineFitted.bestModel.transform(test).selectExpr("prediction as raw_prediction", "double(round(prediction)) as prediction", "count", """CASE double(round(prediction)) = count WHEN true then 1 ELSE 0 END as equal""")
holdout2.createOrReplaceTempView("holdout2")

holdout2: org.apache.spark.sql.DataFrame = [raw_prediction: double, prediction: double ... 2 more fields]


In [107]:
%%spark -c sql
select * from holdout2 limit 5

Unnamed: 0,raw_prediction,prediction,count,equal
0,0.257969,0.0,0.0,1
1,1.023895,1.0,0.0,0
2,0.565895,1.0,0.0,0
3,2.083627,2.0,0.0,0
4,1.136485,1.0,1.0,1


As well as our regression metrics on the test set.

In [108]:
%%spark
// RegressionMetrics expects an RDD of (prediction, observation) pairs of Doubles, so convert the rows
val rm2 = new RegressionMetrics(holdout2.select("prediction", "count").rdd.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

println("MSE: " + rm2.meanSquaredError)
println("MAE: " + rm2.meanAbsoluteError)
println("RMSE: " + rm2.rootMeanSquaredError)
println("R Squared: " + rm2.r2)
println("Explained Variance: " + rm2.explainedVariance + "\n")

rm2: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@4c8974d8
MSE: 5.785632183908046
MAE: 1.1120689655172413
RMSE: 2.40533411066073
R Squared: -1.027230840664191
Explained Variance: 4.8692601400449185



In [109]:
%%spark
val proportion2 = holdout2.selectExpr("sum(equal)/sum(1)")
proportion2.createOrReplaceTempView("proportion2")

proportion2: org.apache.spark.sql.DataFrame = [(sum(equal) / sum(1)): double]


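Finally, we see an improvement in the "exactly right" proportion (from about 22% to about 37%), and MAE drops slightly (from 1.25 to 1.11). MSE, RMSE, and R Squared are all worse than the linear model's, though, so the random forest gets more predictions exactly right but makes some larger errors when it misses.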

In [110]:
%%spark -c sql
select * from proportion2 limit 5

Unnamed: 0,(sum(equal) / sum(1))
0,0.372989
