*** SPARK SQL ***

## The Data

![img](http://training.databricks.com/databricks_guide/USDA_logo.png)

The first of the two datasets that we will be working with is the **Farmers Markets Directory and Geographic Data**. This dataset contains information on the longitude and latitude, state, address, name, and zip code of farmers markets in the United States. The raw data is published by the Department of Agriculture. The version on the data that is found in Databricks (and is used in this tutorial) was updated by the Department of Agriculture on Dec 01, 2015.

![img](http://training.databricks.com/databricks_guide/irs-logo.jpg)

The second dataset we will be working with is the **SOI Tax Stats - Individual Income Tax Statistics - ZIP Code Data (SOI)**. This study provides detailed tabulations of individual income tax return data at the state and ZIP code level and is provided by the IRS. This repository only has a sample of the data: 2013 and includes "AGI". The ZIP Code data shows selected income and tax items classified by State, ZIP Code, and size of adjusted gross income. Data is based on individual income tax returns filed with the IRS and is available for Tax Years 1998, 2001, 2004 through 2013.


In [0]:
# Read The data
taxes2013 = (spark.read
  .option("header", "true")
  .csv("dbfs:/databricks-datasets/data.gov/irs_zip_code_data/data-001/2013_soi_zipcode_agi.csv"))

markets = (spark.read
  .option("header", "true")
  .csv("dbfs:/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/market_data.csv"))

In [0]:
# Register spark SQL tables

taxes2013.createOrReplaceTempView("taxes2013")
markets.createOrReplaceTempView("markets")

In [0]:
%sql 
DROP TABLE IF EXISTS cleaned_taxes;

CREATE TABLE cleaned_taxes AS
SELECT state, int(zipcode / 10) as zipcode, 
  int(mars1) as single_returns, 
  int(mars2) as joint_returns, 
  int(numdep) as numdep, 
  double(A02650) as total_income_amount,
  double(A00300) as taxable_interest_amount,
  double(a01000) as net_capital_gains,
  double(a00900) as biz_net_income
FROM taxes2013

In [0]:
sqlContext.cacheTable("cleaned_taxes")

# Convert back to a dataset from a table
cleanedTaxes = spark.sql("SELECT * FROM cleaned_taxes")

summedTaxes = cleanedTaxes.groupBy("zipcode").sum() # because of AGI, where groups income groups are broken out 

cleanedMarkets = (markets
  .selectExpr("*", "int(zip / 10) as zipcode")
  .groupBy("zipcode")
  .count()
  .selectExpr("double(count) as count", "zipcode as zip"))
#  selectExpr is short for Select Expression - equivalent to what we
#  might be doing in SQL SELECT expression

joined = (cleanedMarkets.join(summedTaxes, cleanedMarkets.zip == summedTaxes.zipcode, "outer"))

In [0]:
display(cleanedMarkets)

count,zip
5.0,4900.0
2.0,7240.0
8.0,4818.0
1.0,9852.0
2.0,5300.0
5.0,2122.0
2.0,9900.0
1.0,8592.0
1.0,1580.0
1.0,3175.0


In [0]:
display(joined)

count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(total_income_amount),sum(taxable_interest_amount),sum(net_capital_gains),sum(biz_net_income)
,,463.0,8334.0,930.0,840.0,980.0,82700.0,460.0,825.0,4409.0
2.0,496.0,496.0,17856.0,3290.0,3250.0,4150.0,357071.0,1540.0,5704.0,10969.0
,,833.0,9996.0,14000.0,9490.0,19940.0,1497553.0,7747.0,12788.0,32485.0
1.0,1342.0,1342.0,40260.0,4800.0,3680.0,5450.0,475283.0,2919.0,6476.0,13496.0
1.0,1580.0,1580.0,9480.0,4600.0,3940.0,4650.0,516516.0,4243.0,10827.0,22441.0
5.0,2122.0,2122.0,127320.0,94310.0,46450.0,120950.0,8381305.0,32598.0,71136.0,202711.0
,,2366.0,99372.0,31870.0,23150.0,43120.0,3395514.0,15435.0,40289.0,46188.0
,,2659.0,31908.0,370.0,450.0,570.0,45116.0,97.0,-20.0,596.0
,,2866.0,85980.0,1320.0,1720.0,2230.0,147989.0,774.0,2556.0,3534.0
1.0,3175.0,3175.0,57150.0,4250.0,4440.0,9490.0,495720.0,4784.0,7824.0,14127.0


deal with na values

In [0]:
prepped = joined.na.fill(0)
display(prepped)

count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(total_income_amount),sum(taxable_interest_amount),sum(net_capital_gains),sum(biz_net_income)
0.0,0,463,8334,930,840,980,82700.0,460.0,825.0,4409.0
2.0,496,496,17856,3290,3250,4150,357071.0,1540.0,5704.0,10969.0
0.0,0,833,9996,14000,9490,19940,1497553.0,7747.0,12788.0,32485.0
1.0,1342,1342,40260,4800,3680,5450,475283.0,2919.0,6476.0,13496.0
1.0,1580,1580,9480,4600,3940,4650,516516.0,4243.0,10827.0,22441.0
5.0,2122,2122,127320,94310,46450,120950,8381305.0,32598.0,71136.0,202711.0
0.0,0,2366,99372,31870,23150,43120,3395514.0,15435.0,40289.0,46188.0
0.0,0,2659,31908,370,450,570,45116.0,97.0,-20.0,596.0
0.0,0,2866,85980,1320,1720,2230,147989.0,774.0,2556.0,3534.0
1.0,3175,3175,57150,4250,4440,9490,495720.0,4784.0,7824.0,14127.0


In [0]:
display(prepped)

count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(total_income_amount),sum(taxable_interest_amount),sum(net_capital_gains),sum(biz_net_income)
0.0,0,463,8334,930,840,980,82700.0,460.0,825.0,4409.0
2.0,496,496,17856,3290,3250,4150,357071.0,1540.0,5704.0,10969.0
0.0,0,833,9996,14000,9490,19940,1497553.0,7747.0,12788.0,32485.0
1.0,1342,1342,40260,4800,3680,5450,475283.0,2919.0,6476.0,13496.0
1.0,1580,1580,9480,4600,3940,4650,516516.0,4243.0,10827.0,22441.0
5.0,2122,2122,127320,94310,46450,120950,8381305.0,32598.0,71136.0,202711.0
0.0,0,2366,99372,31870,23150,43120,3395514.0,15435.0,40289.0,46188.0
0.0,0,2659,31908,370,450,570,45116.0,97.0,-20.0,596.0
0.0,0,2866,85980,1320,1720,2230,147989.0,774.0,2556.0,3534.0
1.0,3175,3175,57150,4250,4440,9490,495720.0,4784.0,7824.0,14127.0


Now that all of our data is prepped. We're going to have to put all of it into one column of a vector type for Spark MLLib. This makes it easy to embed a prediction right in a DataFrame and also makes it very clear as to what is getting passed into the model and what isn't without having to convert it to a numpy array or specify an R formula. This also makes it easy to incrementally add new features, simply by adding to the vector. In the below case rather than specifically adding them in.

In [0]:
nonFeatureCols = ["zip", "zipcode", "count"]
featureCols = [item for item in prepped.columns if item not in nonFeatureCols]

In [0]:
# VectorAssembler Assembles all of these columns into one single vector. To do this, set the input columns and output column. Then that assembler will be used to transform the prepped data to the final dataset.
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(prepped)

Now split the dataset 70-30 for training and testing purposes.A validation set can be created as well, we are omitting it here. It's worth noting that MLLib also supports performing hyperparameter tuning with cross validation and pipelines. All this can be found in [the Databrick's Guide](https://docs.databricks.com).

In [0]:
training, test = finalPrep.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count()) # Why execute count here??
print(test.count())

# Apache Spark MLLib

At a high level, we're going to create an instance of a `regressor` or `classifier`, that in turn will then be trained and return a `Model` type. Whenever you access Spark MLLib you should be sure to import/train on the name of the algorithm you want as opposed to the `Model` type. For example:

You should import:

`org.apache.spark.ml.regression.LinearRegression`

as opposed to:

`org.apache.spark.ml.regression.LinearRegressionModel`

In the below example, we're going to use linear regression.

The linear regression that is available in Spark MLLib supports an elastic net parameter allowing you to set a threshold of how much you would like to mix l1 and l2 regularization, for [more information on Elastic net regularization see Wikipedia](https://en.wikipedia.org/wiki/Elastic_net_regularization).

As we saw above, we had to perform some preparation of the data before inputting it into the model. We've got to do the same with the model itself. We'll set our hyper parameters, print them out and then finally we can train it! The `explainParams` is a great way to ensure that you're taking advantage of all the different hyperparameters that you have available.

In [0]:
from pyspark.ml.regression import LinearRegression

lrModel = (LinearRegression()
  .setLabelCol("count")
  .setFeaturesCol("features")
  .setElasticNetParam(0.5))

print("Printing out the model Parameters:")
print("-"*20)
print(lrModel.explainParams())
print("-"*20)

Now finally we can go about fitting our model! You'll see that we're going to do this in a series of steps. First we'll fit it, then we'll use it to make predictions via the `transform` method. This is the same way you would make predictions with your model in the future however in this case we're using it to evaluate how our model is doing. We'll be using regression metrics to get some idea of how our model is performing, we'll then print out those values to be able to evaluate how it performs.

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
lrFitted = lrModel.fit(training)

In [0]:
%fs ls /databricks-datasets/songs/data-001/

path,name,size
dbfs:/databricks-datasets/songs/data-001/header.txt,header.txt,377
dbfs:/databricks-datasets/songs/data-001/part-00000,part-00000,52837
dbfs:/databricks-datasets/songs/data-001/part-00001,part-00001,52469
dbfs:/databricks-datasets/songs/data-001/part-00002,part-00002,51778
dbfs:/databricks-datasets/songs/data-001/part-00003,part-00003,50551
dbfs:/databricks-datasets/songs/data-001/part-00004,part-00004,53449
dbfs:/databricks-datasets/songs/data-001/part-00005,part-00005,53301
dbfs:/databricks-datasets/songs/data-001/part-00006,part-00006,54184
dbfs:/databricks-datasets/songs/data-001/part-00007,part-00007,50924
dbfs:/databricks-datasets/songs/data-001/part-00008,part-00008,52533


Now you'll see that since we're working with exact numbers (you can't have 1/2 a farmer's market for example), I'm going to check equality by first rounding the value to the nearest digital value.

In [0]:
holdout = (lrFitted
  .transform(test)
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "count", 
    """CASE double(round(prediction)) = count 
  WHEN true then 1
  ELSE 0
END as equal"""))

display(holdout)

raw_prediction,prediction,count,equal
1.6547100704051516,2.0,0.0,0
1.634397496405839,2.0,0.0,0
0.2863834118047603,0.0,0.0,1
1.4981691136975046,1.0,1.0,1
0.6075548815127711,1.0,2.0,0
1.4302627778456922,1.0,2.0,0
0.687945202836624,1.0,2.0,0
1.651330441053325,2.0,1.0,0
1.228177051347341,1.0,1.0,1
1.1949249860965696,1.0,1.0,1


Now let's see what proportion was exactly correct.

In [0]:
display(holdout.selectExpr("sum(equal)/sum(1)"))

(sum(equal) / sum(1))
0.2089994079336885


In [0]:
# have to do a type conversion for RegressionMetrics
rm = RegressionMetrics(holdout.select("prediction", "count").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm.meanSquaredError)
print("MAE: ", rm.meanAbsoluteError)
print("RMSE Squared: ", rm.rootMeanSquaredError)
print("R Squared: ", rm.r2)
print("Explained Variance: ", rm.explainedVariance, "\n")

These results appear to be sub-optimal, so let's try exploring another way to train the model. Rather than training on a single model with hard-coded parameters, let's train using a [pipeline](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.Pipeline). 

A pipeline is going to give us some nice benefits in that it will allow us to use a couple of transformations we need in order to transform our raw data into the prepared data for the model but also it provides a simple, straightforward way to try out a lot of different combinations of parameters. This is a process called [hyperparameter tuning](https://en.wikipedia.org/wiki/Hyperparameter_optimization) or grid search. To review, grid search is where you set up the exact parameters that you would like to test and MLLib will automatically create all the necessary combinations of these to test.

For example, below we'll set `numTrees` to 20 and 60 and `maxDepth` to 5 and 10. The parameter grid builder will automatically construct all the combinations of these two variable (along with the other ones that we might specify too). Additionally we're also going to use [cross validation](https://en.wikipedia.org/wiki/Cross-validation_(statistics)) to tune our hyperparameters, this will allow us to attempt to try to control [overfitting](https://en.wikipedia.org/wiki/Overfitting) of our model.

Lastly we'll need to set up a [Regression Evaluator](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.RegressionEvaluator) that will evaluate the models that we choose based on some metric (the default is RMSE). The key take away is that the pipeline will automatically optimize for our given metric choice by exploring the parameter grid that we set up rather than us having to do it manually like we would have had to do above.

Now we can go about training our random forest! 

*note: this might take a little while because of the number of combinations that we're trying and limitations in workers available.*

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
  .setLabelCol("count")
  .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [5, 10])
  .addGrid(rfModel.numTrees, [20, 60])
  .build())
# Note, that this parameter grid will take a long time
# to run in the community edition due to limited number
# of workers available! Be patient for it to run!
# If you want it to run faster, remove some of
# the above parameters and it'll speed right up!

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator() # you can feel free to change the number of folds used in cross validation as well
  .setEstimator(pipeline) # the estimator can also just be an individual model rather than a pipeline
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("count")))

pipelineFitted = cv.fit(training)

Now we've trained our model! Let's take a look at which version performed best!

In [0]:
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

As well as our regression metrics on the test set.

In [0]:
%fs


In [0]:
pipelineFitted.bestModel

In [0]:
holdout2 = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "count", 
    """CASE double(round(prediction)) = count 
  WHEN true then 1
  ELSE 0
END as equal"""))
  
display(holdout2)

raw_prediction,prediction,count,equal
0.1856122458574,0.0,0.0,1
0.2291585851628172,0.0,0.0,1
2.370338573362336,2.0,0.0,0
1.2738122164564702,1.0,1.0,1
2.3919763787678687,2.0,2.0,1
0.8689785710785228,1.0,2.0,0
1.6913496178618903,2.0,2.0,1
1.5865049076144575,2.0,1.0,0
0.8300741629889465,1.0,1.0,1
0.9659730722002748,1.0,1.0,1


In [0]:
rm2 = RegressionMetrics(holdout2.select("prediction", "count").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm2.meanSquaredError)
print("MAE: ", rm2.meanAbsoluteError)
print("RMSE Squared: ", rm2.rootMeanSquaredError)
print("R Squared: ", rm2.r2)
print("Explained Variance: ", rm2.explainedVariance, "\n")

Finally we'll see an improvement in our "exactly right" proportion as well!

In [0]:
display(holdout2.selectExpr("sum(equal)/sum(1)"))

(sum(equal) / sum(1))
0.3771462403789224
