##### In this notebook we compare how to write the same query in Spark SQL and with the Spark DataFrame API, and we also explore Spark MLlib

## The Data

![img](http://training.databricks.com/databricks_guide/USDA_logo.png)

The first of the two datasets that we will be working with is the **Farmers Markets Directory and Geographic Data**. This dataset contains the longitude and latitude, state, address, name, and zip code of farmers markets in the United States. The raw data is published by the Department of Agriculture. The version of the data that is found in Databricks (and is used in this tutorial) was updated by the Department of Agriculture on Dec 01, 2015.

![img](http://training.databricks.com/databricks_guide/irs-logo.jpg)

The second dataset we will be working with is the **SOI Tax Stats - Individual Income Tax Statistics - ZIP Code Data (SOI)**. This study, provided by the IRS, gives detailed tabulations of individual income tax return data at the state and ZIP code level. This repository only has a sample of the data: the 2013 file, which includes adjusted gross income ("AGI"). The ZIP Code data shows selected income and tax items classified by state, ZIP code, and size of adjusted gross income. The data is based on individual income tax returns filed with the IRS and is available for Tax Years 1998, 2001, and 2004 through 2013. The data includes items such as:

- Number of returns, which approximates the number of households
- Number of personal exemptions, which approximates the population
- Adjusted gross income
- Wages and salaries
- Dividends before exclusion
- Interest received

You can learn more about the two datasets on data.gov:

- [Farmer's Market Data](http://catalog.data.gov/dataset/farmers-markets-geographic-data/resource/cca1cc8a-9670-4a27-a8c7-0c0180459bef)
- [Zip Code Data](http://catalog.data.gov/dataset/zip-code-data)

### Getting the Data

As a data scientist, your data is likely going to live in a place like S3 or Redshift. Apache Spark provides simple connectors to these data sources, and Databricks provides demonstrations of how to use them. Just search the Databricks guide (use the `?` at the top left) to see if your data source is available. For the purposes of this tutorial, our files are already available on S3 via `dbfs`, the Databricks file system. [While you're free to upload the CSVs made available on data.gov as a table](https://docs.databricks.com/user-guide/tables.html#creating-tables), you can also (more easily) access this data via the `/databricks-datasets` directory, a repository of public, Databricks-hosted datasets that is available on all Databricks accounts.

In [3]:
taxes2013 = sqlContext\
  .read.format("com.databricks.spark.csv")\
  .option("header", "true")\
  .load("dbfs:/databricks-datasets/data.gov/irs_zip_code_data/data-001/2013_soi_zipcode_agi.csv")

In [4]:
markets = sqlContext\
  .read.format("com.databricks.spark.csv")\
  .option("header", "true")\
  .load("dbfs:/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/market_data.csv")

In [5]:
display(taxes2013)

<img src='screenshots/Screen Shot 2017-11-18 at 11.36.37 PM.png'>

In [6]:
display(markets)

<img src="screenshots/Screen Shot 2017-11-18 at 11.40.16 PM.png">

Now that we've loaded in the data, let's go ahead and register the DataFrames as Spark SQL tables.
While this might seem unnecessary, it allows us to leverage SQL skills immediately to manipulate the data. Some people prefer working directly with DataFrames while others prefer working in Spark SQL. Whatever the case, both take advantage of the same Tungsten optimizations under the hood.

In [8]:
taxes2013.registerTempTable("taxes2013")
markets.registerTempTable("markets")

You can see that we are using the `registerTempTable` method call to create this table (`createOrReplaceTempView` is its replacement in Spark 2.0 and later). The lifetime of this temporary table is tied to the Spark SQL context that was used to create the DataFrame. This means that when you shut down the SQLContext associated with a cluster (for example, by shutting down the cluster), the temporary table disappears as well.

### Running SQL Commands

As we progress through the notebook, you'll notice that all SQL cells are prefaced with `%sql`. This tells the notebook environment that you'd like to execute a SQL command. You can do the same for Python and R with `%python` and `%r`.

To list the tables, simply execute `show tables`. The output also tells you whether each table is temporary.

In [10]:
%sql show tables

<img src="screenshots/Screen Shot 2017-11-18 at 11.40.29 PM.png">

We can see that there are a variety of columns that you might want to look into further; however, for the purpose of this analysis I'm only going to look at a very small subset. I'm also going to perform two small manipulations to this data:

- I'm going to do some simple type conversions and rename the columns to something a bit more semantic so that it's easier to talk about them going forward.
- I'm also going to shorten each zip code to four digits instead of five by dropping its last digit. This makes us look at the general area around a zip code as opposed to a very specific one. This is an imprecise process overall, but it works just fine for the purpose of this example.
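
To make the zip code shortening concrete, here is a minimal pure-Python sketch of what `int(zipcode / 10)` does to a five-digit zip code. The function name `shorten_zip` and the sample values are my own illustrations, not part of the dataset or of Spark.

```python
# Pure-Python sketch of the truncation performed by int(zipcode / 10).
# shorten_zip and the sample zip codes are hypothetical, for illustration only.

def shorten_zip(zipcode):
    """Drop the last digit of a zip code given as a string or an int."""
    return int(int(zipcode) / 10)

samples = ["90210", "90211", "00501", "99999", "00000"]
for z in samples:
    # Neighbouring zip codes collapse onto the same shortened value.
    print(z, "->", shorten_zip(z))
```

Two things are worth noticing: the result is an integer, so leading zeros are lost, and the values `00000` and `99999` become `0` and `9999`, which is presumably why the queries further down exclude `0000` and `9999`.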

In [12]:
%sql 
DROP TABLE IF EXISTS cleaned_taxes;

CREATE TABLE cleaned_taxes AS
SELECT state, int(zipcode / 10) as zipcode, 
  int(mars1) as single_returns, 
  int(mars2) as joint_returns, 
  int(numdep) as numdep, 
  double(A02650) as total_income_amount,
  double(A00300) as taxable_interest_amount,
  double(a01000) as net_capital_gains,
  double(a00900) as biz_net_income
FROM taxes2013

In [13]:
# Load this SQL table back in as a DataFrame
cleanedTaxes = sqlContext.table("cleaned_taxes")

In [14]:
display(cleanedTaxes.describe())

<img src="screenshots/Screen Shot 2017-11-18 at 11.40.39 PM.png">

Let's look at the set of zip codes with the lowest total capital gains and plot the results. Simple, expressive SQL gets us these results in a very straightforward manner, and the same can be done with the familiar DataFrame manipulations available in R and Python.

In [16]:
%sql
SELECT zipcode, SUM(net_capital_gains) AS cap_gains
FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
GROUP BY zipcode
ORDER BY cap_gains ASC
LIMIT 10

<img src = "screenshots/Screen Shot 2017-11-18 at 11.40.49 PM.png">

In [17]:
# The equivalent of this using Spark DataFrame would be the following
from pyspark.sql.functions import col, sum as sum_

cleanedTaxes\
  .filter(~col("zipcode").isin(0, 9999))\
  .groupBy("zipcode")\
  .agg(sum_("net_capital_gains").alias("cap_gains"))\
  .orderBy("cap_gains", ascending=True)\
  .show(10)
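
If it helps to see what both versions of the query compute, here is a small pure-Python sketch of the same filter, group, sum, order, and limit steps. The rows and the function name `lowest_cap_gains` are made up for illustration and are not taken from the IRS data.

```python
from collections import defaultdict

# Hypothetical (zipcode, net_capital_gains) rows, for illustration only.
rows = [(9021, 120.0), (9021, 30.0), (1000, 5.0),
        (0, 999.0), (9999, 50.0), (6060, 40.0)]

def lowest_cap_gains(rows, limit=10, excluded=(0, 9999)):
    totals = defaultdict(float)
    for zipcode, gains in rows:
        if zipcode in excluded:   # WHERE NOT (zipcode = 0000 OR zipcode = 9999)
            continue
        totals[zipcode] += gains  # GROUP BY zipcode with SUM(net_capital_gains)
    ordered = sorted(totals.items(), key=lambda kv: kv[1])  # ORDER BY cap_gains ASC
    return ordered[:limit]        # LIMIT

print(lowest_cap_gains(rows))
```

Spark does the same work, only distributed across the cluster and planned by its optimizer.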

Next, let's look at a combination of capital gains and business net income to see what we find. It's worth stressing again how easy it is to build up these queries iteratively with Spark SQL.
In the query below, I've built a combined metric that represents the total capital gains plus business net income by zip code. As we can see in the plot, this metric is weighted very strongly by capital gains.

In [19]:
%sql
SELECT zipcode, 
  SUM(biz_net_income) as business_net_income, 
  SUM(net_capital_gains) as capital_gains, 
  SUM(net_capital_gains) + SUM(biz_net_income) as capital_and_business_income
FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
GROUP BY zipcode
ORDER BY capital_and_business_income DESC
LIMIT 50

<img src="screenshots/Screen Shot 2017-11-18 at 11.44.23 PM.png">

In [20]:
# The equivalent of this using Spark DataFrame would be the following
from pyspark.sql.functions import col, sum as sum_

# Adding two columns does not need a UDF; plain column arithmetic
# stays inside Spark's optimizer and keeps the Double type
cleanedTaxes\
  .filter(~col("zipcode").isin(0, 9999))\
  .groupBy("zipcode")\
  .agg(sum_("biz_net_income").alias("business_net_income"),
       sum_("net_capital_gains").alias("capital_gains"))\
  .withColumn("capital_and_business_income", col("capital_gains") + col("business_net_income"))\
  .orderBy("capital_and_business_income", ascending=False)\
  .show(5)

<img src="screenshots/Screen Shot 2017-11-18 at 11.44.30 PM.png">

##### We can also get a peek at how the query will be executed by using the EXPLAIN keyword in SQL.

In [22]:
%sql
EXPLAIN 
  SELECT zipcode, 
    SUM(biz_net_income) as net_income, 
    SUM(net_capital_gains) as cap_gains, 
    SUM(net_capital_gains) + SUM(biz_net_income) as combo
  FROM cleaned_taxes 
  WHERE NOT (zipcode = 0000 OR zipcode = 9999)
  GROUP BY zipcode
  ORDER BY combo desc
  limit 50

<img src="screenshots/Screen Shot 2017-11-18 at 11.44.39 PM.png">

One thing that is great about Apache Spark is that, out of the box, it can store and access tables in memory. All we need to do is cache the data. We can do this directly in SQL with `CACHE TABLE`, in which case the table is cached eagerly (right away), or through the sqlContext with the `cacheTable` method, which caches lazily (the data is cached the first time it is computed).

In [24]:
sqlContext.cacheTable("cleaned_taxes")

Now that we've spent some time exploring the IRS data, let's take a moment to look at the farmers market data. We'll start off with a count of farmers markets per state. You'll notice that I'm not using SQL this time. While we could certainly query the SQL table, it's worth showing that the same operations are also available directly on a DataFrame.

In [26]:
display(markets.groupBy("State").count())

<img src="screenshots/Screen Shot 2017-11-18 at 11.44.47 PM.png">

## Using Spark MLlib

Let's go ahead and prep the data for use in Apache Spark MLlib. MLlib has some specific requirements about how inputs are structured. Firstly, input data has to be numeric unless you perform a transformation inside of a data pipeline. What this means for you as a user is that Apache Spark won't automatically convert strings to categories, for instance; the inputs must already be of a numeric type such as Double. Let's go ahead and prepare our data so that it meets those requirements, and join our input data with the target variable: the number of farmers markets in a given zip code.

In [29]:
cleanedTaxes = sqlContext.sql("SELECT * FROM cleaned_taxes")

summedTaxes = cleanedTaxes\
  .groupBy("zipcode")\
  .sum() 

cleanedMarkets = markets\
  .selectExpr("*", "int(zip / 10) as zipcode")\
  .groupBy("zipcode")\
  .count()\
  .selectExpr("double(count) as count", "zipcode as zip")
# selectExpr is short for Select Expression - equivalent to what we
# might be doing in SQL SELECT expression

joined = cleanedMarkets.join(summedTaxes, cleanedMarkets.zip== summedTaxes.zipcode, "outer")

Now that we've joined our tax data to our output variable, we have to do a final bit of cleanup before we can feed this data into Spark MLlib. For example, because we used an outer join, zip codes that appear in only one of the two tables have null values in the columns from the other table, as we'll see when we display the joined data.

In [31]:
display(joined)

<img src="screenshots/Screen Shot 2017-11-18 at 11.44.58 PM.png">

Currently Apache Spark MLlib doesn't allow null values as input (nor would it make sense to drop those rows), so we're going to replace them with zeros. Luckily, DataFrames make it easy to work with null data through the `.na` prefix, as you'll see below. You can see all of the null-handling functions in the API documentation. They should feel familiar if you've worked with pandas or R DataFrames.

In [33]:
prepped = joined.na.fill(0)
display(prepped)

<img src="screenshots/Screen Shot 2017-11-18 at 11.45.06 PM.png">
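
To see where the nulls come from and what filling them does, here is a small pure-Python sketch of an outer join followed by a null fill. The dictionaries and the helper names `outer_join` and `fill_nulls` are hypothetical stand-ins, and the sketch is simplified: in the real DataFrame the unmatched side's key column (`zip` or `zipcode`) is null as well.

```python
# Hypothetical per-zipcode values, keyed by shortened zip code.
market_counts = {9021: 3.0, 1000: 1.0}   # stands in for cleanedMarkets
tax_sums = {9021: 150.0, 6060: 40.0}     # stands in for one column of summedTaxes

def outer_join(left, right):
    """Full outer join on the keys; a missing side becomes None (null)."""
    keys = sorted(set(left) | set(right))
    return [(k, left.get(k), right.get(k)) for k in keys]

def fill_nulls(rows, value=0):
    """Replace every None with value, similar in spirit to na.fill(0)."""
    return [tuple(value if v is None else v for v in row) for row in rows]

joined_rows = outer_join(market_counts, tax_sums)
prepped_rows = fill_nulls(joined_rows)
print(joined_rows)
print(prepped_rows)
```

Filling with zero treats "no farmers markets found" and "no tax data found" as actual zeros, which is a modelling choice rather than a neutral cleanup step.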

Now that all of our data is prepped, we have to put all of it into one column of a vector type for Spark MLlib. This makes it easy to embed a prediction right in a DataFrame and also makes it very clear what is getting passed into the model and what isn't, without having to convert it to a numpy array or specify an R formula. It also makes it easy to add new features incrementally, simply by adding to the vector. In the case below, rather than listing the features explicitly, I'm going to define an exclusion list and just remove what is NOT a feature.

In [35]:
nonFeatureCols = ["zip", "zipcode", "count"]
featureCols = set(prepped.columns) - set(nonFeatureCols)

In [36]:
featureCols

<img src="screenshots/Screen Shot 2017-11-18 at 11.45.16 PM.png">
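
One caveat with the set difference: a Python set does not guarantee iteration order, so the position of each feature inside the assembled vector may differ between runs. The sketch below, which uses hypothetical column names in place of `prepped.columns`, shows an order-preserving alternative.

```python
# Hypothetical column names standing in for prepped.columns.
columns = ["count", "zip", "zipcode",
           "sum(single_returns)", "sum(joint_returns)", "sum(numdep)"]
non_feature_cols = ["zip", "zipcode", "count"]

# Set difference: the right members, but no guaranteed iteration order.
as_set = set(columns) - set(non_feature_cols)

# List comprehension: the same members, in the original column order.
as_list = [c for c in columns if c not in non_feature_cols]

print(set(as_list) == as_set)
print(as_list)
```

A stable order matters as soon as you want to map model coefficients or feature importances back to column names.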

Now I'm going to use the `VectorAssembler` in Apache Spark to assemble all of these columns into one single vector. To do this I have to set the input columns and the output column. Then I'll use that assembler to transform the prepped data into my final dataset.

In [38]:
from pyspark.ml.feature import VectorAssembler

assembler =  VectorAssembler(inputCols=list(featureCols),outputCol="features")
finalPrep = assembler.transform(prepped)

In [39]:
finalPrep.take(1)

<img src="screenshots/Screen Shot 2017-11-18 at 11.45.26 PM.png">

In [40]:
display(finalPrep.drop("zip").drop("zipcode").drop("features"))

<img src="screenshots/Screen Shot 2017-11-18 at 11.45.43 PM.png">

Now, in order to follow best practices, I'm going to perform a random 70-30 split of the dataset for training and testing purposes. The same method can be used to create a validation set as well; however, this tutorial omits doing so. It's worth noting that MLlib also supports hyperparameter tuning with cross validation and pipelines. All of this can be found in the Databricks Guide.

In [42]:
training, test = finalPrep.randomSplit((0.7, 0.3))

# Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count())
print(test.count())

<img src="screenshots/Screen Shot 2017-11-18 at 11.45.52 PM.png">
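
The two counts above will usually not be an exact 70/30 split. As far as I understand it, `randomSplit` decides for each row independently which split it lands in, so the weights are only met approximately. The pure-Python sketch below mimics that idea; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by drawing one uniform number per row."""
    total = float(sum(weights))
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point shortfall in the last bound
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))
```

Passing a seed makes the split repeatable, which is handy when comparing models across runs; `randomSplit` accepts a `seed` argument for the same purpose.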

Now we're going to get into the core of Apache Spark MLlib. At a high level, we're going to create an instance of a regressor or classifier, which will then be trained and return a Model type.

In [44]:
from pyspark.ml.regression import LinearRegression

lrModel =LinearRegression()\
  .setLabelCol("count")\
  .setFeaturesCol("features")\
  .setElasticNetParam(0.5)

print("Printing out the model Parameters:")
print("-"*20)
print(lrModel.explainParams())
print("-"*20)

<img src="screenshots/Screen Shot 2017-11-18 at 11.45.59 PM.png">
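
A note on `setElasticNetParam(0.5)`: as I read the Spark MLlib documentation, this parameter (alpha) only mixes the L1 and L2 penalties, while the overall strength comes from `regParam` (lambda), which defaults to 0.0. The cell above never sets `regParam`, so the 0.5 mix probably has no effect there. The sketch below computes the penalty term under that reading; the function name and the example weights are my own.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty = lambda * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)

w = [3.0, -4.0]  # hypothetical coefficient vector
no_regularization = elastic_net_penalty(w, reg_param=0.0, elastic_net_param=0.5)
some_regularization = elastic_net_penalty(w, reg_param=0.1, elastic_net_param=0.5)
print(no_regularization, some_regularization)
```

With `reg_param=0.0` the penalty vanishes regardless of the mix, so setting a non-zero `regParam` would be needed for the elastic net to matter.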

Now we can finally go about fitting our model! We're going to do this in a series of steps. First we'll fit it, then we'll use it to make predictions via the `transform` method. This is the same way you would make predictions with your model in the future; in this case, however, we're using it to evaluate how our model is doing. We'll compute regression metrics and print them out to get some idea of how the model performs.

In [46]:
from pyspark.ml.evaluation import RegressionEvaluator
lrFitted = lrModel.fit(training)

In [47]:
holdout = lrFitted\
  .transform(test)\
  .selectExpr("prediction as raw_prediction", \
    "double(round(prediction)) as prediction", \
    "count", \
    """CASE double(round(prediction)) = count 
  WHEN true then 1
  ELSE 0
END as equal""")
display(holdout)

<img src="screenshots/Screen Shot 2017-11-18 at 11.46.06 PM.png">
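
The `equal` column flags rows where the rounded prediction matches the actual count. Here is a pure-Python sketch of that logic with made-up numbers. It assumes that Spark SQL's `round` rounds halves away from zero, which is why the sketch avoids Python's built-in `round` (that one rounds halves to the nearest even number).

```python
from decimal import Decimal, ROUND_HALF_UP

def round_half_up(x):
    """Round to the nearest integer, with ties going away from zero."""
    return float(Decimal(str(x)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

def exact_match_flags(raw_predictions, counts):
    """1 where the rounded prediction equals the observed count, else 0."""
    return [1 if round_half_up(p) == c else 0
            for p, c in zip(raw_predictions, counts)]

raw = [0.4, 1.5, 2.5, 3.2]       # hypothetical raw predictions
counts = [0.0, 2.0, 2.0, 4.0]    # hypothetical observed market counts
flags = exact_match_flags(raw, counts)
print(flags, sum(flags) / len(flags))
```

Averaging the flags gives the share of exact matches, a quick sanity check to read alongside the regression metrics.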

In [48]:
# have to do a type conversion for RegressionMetrics
from pyspark.mllib.evaluation import RegressionMetrics

# RegressionMetrics expects an RDD of (prediction, observation) pairs of floats;
# in Spark 2.x a DataFrame has no map method, so go through .rdd first
rm1 = RegressionMetrics(holdout.select("prediction", "count").rdd.map(lambda row: (float(row[0]), float(row[1]))))

print("MSE: " + str(rm1.meanSquaredError))
print("MAE: " + str(rm1.meanAbsoluteError))
print("RMSE: " + str(rm1.rootMeanSquaredError))
print("R Squared: " + str(rm1.r2))
print("Explained Variance: " + str(rm1.explainedVariance) + "\n")

<img src="screenshots/Screen Shot 2017-11-18 at 11.46.13 PM.png">
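
For readers who want to check what these numbers mean, here is a pure-Python sketch of the textbook definitions of MSE, MAE, RMSE, and R squared on a tiny made-up example. The helper `regression_metrics` is hypothetical, and it leaves out explained variance because I'm not certain how `RegressionMetrics` defines it.

```python
import math

def regression_metrics(predictions, labels):
    """Textbook regression metrics; assumes the labels are not all identical."""
    n = len(labels)
    errors = [p - y for p, y in zip(predictions, labels)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    ss_total = sum((y - mean_label) ** 2 for y in labels)
    r2 = 1.0 - (mse * n) / ss_total  # 1 - SS_residual / SS_total
    return {"mse": mse, "mae": mae, "rmse": math.sqrt(mse), "r2": r2}

metrics = regression_metrics([1.0, 2.0, 4.0], [1.0, 3.0, 3.0])
print(metrics)
```

RMSE is the square root of MSE, so it is expressed in the same unit as the label (here, a number of markets).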

I found these results to be sub-optimal, so let's try exploring another way to train the model. Rather than training a single model with hard-coded parameters, let's train using a pipeline.

A pipeline gives us two nice benefits: it lets us chain the transformations we need to turn our raw data into prepared data for the model, and it provides a simple, straightforward way to try out a lot of different combinations of parameters. This process is called hyperparameter tuning or grid search. To review, grid search is where you list the exact parameter values that you would like to test, and MLlib automatically creates all the necessary combinations of them.

For example, below we'll set numTrees to 20 and 60 and maxDepth to 5 and 10. The parameter grid builder will automatically construct all the combinations of these two variables (along with any others that we might specify). Additionally, we're going to use cross validation to tune our hyperparameters, which helps us control overfitting of our model.

Lastly, we'll need to set up a Regression Evaluator that evaluates the candidate models based on some metric (the default is RMSE). The key takeaway is that cross validation will automatically optimize for our chosen metric by exploring the parameter grid that we set up, rather than us having to do it manually as we did above.

Now we can go about training our random forest!

Note: this might take a little while because of the number of combinations that we're trying and limitations in the workers available.
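
To get a feel for why this takes a while, the sketch below expands the same grid in pure Python and counts the model fits. The fold count of 3 is an assumption (I believe it is `CrossValidator`'s default `numFolds`), as is the one extra fit for retraining the best combination on the full training set; `expand_grid` is a hypothetical helper.

```python
from itertools import product

grid = {"maxDepth": [5, 10], "numTrees": [20, 60]}

def expand_grid(grid):
    """Return every combination of the listed parameter values."""
    names = sorted(grid)
    return [dict(zip(names, values))
            for values in product(*(grid[name] for name in names))]

combos = expand_grid(grid)
num_folds = 3                               # assumption: default numFolds
total_fits = len(combos) * num_folds + 1    # assumption: +1 final refit

for combo in combos:
    print(combo)
print(total_fits)
```

Every extra value in the grid multiplies the number of fits, so it pays to start with a small grid and widen it only where the metric is sensitive.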

In [50]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

rfModel = RandomForestRegressor().setLabelCol("count").setFeaturesCol("features")

paramGrid = ParamGridBuilder()\
  .addGrid(rfModel.maxDepth, [5, 10])\
  .addGrid(rfModel.numTrees, [20, 60])\
  .build()

pipeline = Pipeline(stages=[rfModel])

cv = CrossValidator() \
  .setEstimator(pipeline) \
  .setEstimatorParamMaps(paramGrid) \
  .setEvaluator(RegressionEvaluator().setLabelCol("count"))

# Fit the cross-validated pipeline to the training data.
model = cv.fit(training)

In [51]:
model.bestModel.stages[0]

In [52]:
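print("The Best Parameters:\n--------------------")
# The pipeline has a single stage: the fitted random forest
bestRf = model.bestModel.stages[0]
print(bestRf)
# extractParamMap is a method, so it has to be called to return the parameters
print(bestRf.extractParamMap())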

<img src= "screenshots/Screen Shot 2017-11-18 at 11.46.24 PM.png">

In [53]:
# Score the held-out test set with the best model found by cross validation
holdout2 = model.bestModel \
  .transform(test)\
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "count", 
    """CASE double(round(prediction)) = count 
  WHEN true then 1
  ELSE 0
END as equal""")
display(holdout2.limit(5))

<img src="screenshots/Screen Shot 2017-11-18 at 11.46.31 PM.png">

In [55]:
# have to do a type conversion for RegressionMetrics
from pyspark.mllib.evaluation import RegressionMetrics

# RegressionMetrics expects an RDD of (prediction, observation) pairs of floats;
# in Spark 2.x a DataFrame has no map method, so go through .rdd first
rm2 = RegressionMetrics(holdout2.select("prediction", "count").rdd.map(lambda row: (float(row[0]), float(row[1]))))

print("MSE: " + str(rm2.meanSquaredError))
print("MAE: " + str(rm2.meanAbsoluteError))
print("RMSE: " + str(rm2.rootMeanSquaredError))
print("R Squared: " + str(rm2.r2))
print("Explained Variance: " + str(rm2.explainedVariance) + "\n")

<img src="screenshots/Screen Shot 2017-11-18 at 11.46.38 PM.png">