# Intro to Apache Spark

* [Intro to Spark slides](https://docs.google.com/presentation/d/125HFklgtvTbutLVfIilLevguw6z_v2RC4_MBR8dU0L0/edit?usp=sharing)
* What is a Spark DataFrame?
  * Read in the [NYT data set](https://github.com/nytimes/covid-19-data) 
* How to perform a distributed count?
* Transformations vs. Actions
* Spark SQL

[Spark docs](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

In [2]:
%fs ls databricks-datasets/COVID/covid-19-data/

path,name,size
dbfs:/databricks-datasets/COVID/covid-19-data/.git/,.git/,0
dbfs:/databricks-datasets/COVID/covid-19-data/LICENSE,LICENSE,1289
dbfs:/databricks-datasets/COVID/covid-19-data/NYT-readme.md,NYT-readme.md,1748
dbfs:/databricks-datasets/COVID/covid-19-data/README.md,README.md,11011
dbfs:/databricks-datasets/COVID/covid-19-data/us-counties.csv,us-counties.csv,2313707
dbfs:/databricks-datasets/COVID/covid-19-data/us-states.csv,us-states.csv,71682
dbfs:/databricks-datasets/COVID/covid-19-data/us.csv,us.csv,1592


## How do we represent this data?

![Unified Engine](https://files.training.databricks.com/images/105/unified-engine.png)


#### At first there were RDDs...
* **R**esilient: Fault-tolerant
* **D**istributed: Across multiple nodes
* **D**ataset: Collection of partitioned data

RDDs are immutable once created and keep track of their lineage to enable failure recovery.

#### ... and then there were DataFrames
* Higher-level APIs
* User friendly
* Optimizations and performance improvements

![RDD vs DataFrames](https://files.training.databricks.com/images/105/rdd-vs-dataframes.png)

### Create a DataFrame from the NYT COVID data

In [6]:
covid_df = spark.read.csv("databricks-datasets/COVID/covid-19-data/us-counties.csv", header=True, inferSchema=True)
covid_df.show()

### How many records do we have?
* Instead of counting M&Ms, let's count the number of rows in the DataFrame

### What do we expect our Spark job to look like?
* How many stages?

In [8]:
covid_df.count()
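
One way to picture the stage question: a distributed count typically runs in two stages, first counting the rows in each partition independently and then combining those partial counts into one total. The sketch below is plain Python, not Spark's actual implementation; `rows` and `split_into_partitions` are hypothetical stand-ins for a DataFrame and its partitioning.

```python
# Plain-Python sketch of a two-stage count; not Spark's actual implementation.
rows = list(range(1, 11))  # hypothetical stand-in for ten DataFrame rows


def split_into_partitions(data, num_partitions):
    """Deal rows round-robin into num_partitions lists (hypothetical helper)."""
    return [data[i::num_partitions] for i in range(num_partitions)]


partitions = split_into_partitions(rows, 3)

# Stage 1: each partition is counted independently (in Spark, by parallel tasks).
partial_counts = [len(p) for p in partitions]

# Stage 2: the partial counts are combined into a single total.
total = sum(partial_counts)

print(partial_counts)  # [4, 3, 3]
print(total)           # 10
```

Only the small partial counts need to move between stages, never the rows themselves.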

### Let's write some Spark code!

* I want to look at only the information for the county I live in (Los Angeles)
* I want the most recent information at the top

In [10]:
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles")) 

**...nothing happened. Why?**

## Transformations vs Actions

There are two types of operations in Spark: transformations and actions.

Fundamental to Apache Spark are the notions that
* Transformations are **LAZY**
* Actions are **EAGER**

In [13]:
# same operations as above
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles")) 

Why isn't it showing me results? **Sort** and **filter** are `transformations`, which Spark evaluates lazily: they only describe the computation, and nothing runs until an action asks for a result.

Laziness has a number of benefits:
* Not forced to load all data in the first step
  * Technically impossible with **REALLY** large datasets.
* Easier to parallelize operations 
  * N different transformations can be processed on a single data element, on a single thread, on a single machine. 
* Most importantly, it allows the framework to automatically apply various optimizations
  * This is also why we use DataFrames!
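
Lazy versus eager can be sketched with Python generators. This is only an analogy (Spark's engine works differently), and `read_rows` and its rows are hypothetical: building the filtered generator does no work, and the rows are only read once something consumes it.

```python
# Plain-Python analogy using generators; Spark's engine works differently.
log = []  # records when work actually happens


def read_rows():
    """Hypothetical data source that logs every row it reads."""
    for county in ["Cook", "Los Angeles", "Orange", "Los Angeles"]:
        log.append(f"read {county}")
        yield county


# "Transformation": building the generator does no work yet.
la_rows = (c for c in read_rows() if c == "Los Angeles")
work_before_action = len(log)

# "Action": consuming the generator triggers the reads and the filter.
result = list(la_rows)
work_after_action = len(log)

print(work_before_action)  # 0
print(result)              # ['Los Angeles', 'Los Angeles']
print(work_after_action)   # 4
```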
  
There's a lot Spark's **Catalyst** optimizer can do. Let's focus on only this situation. For more information, read [this blog!](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)
  
![Catalyst](https://files.training.databricks.com/images/105/catalyst-diagram.png)

In [15]:
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles") 
 .show())  #action!

### We can see the optimizations in action!
* Go to the Spark UI
* Click on the SQL query associated with your Spark job
* See the logical and physical plans!
  * The filter and sort have been swapped
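
Why is the swap safe? Filtering and then sorting gives the same rows in the same order as sorting and then filtering, but the sort has fewer rows to handle. The plain-Python sketch below illustrates this on a few sample rows; `newest_first` and `only_la` are hypothetical helpers, not Spark functions.

```python
# Plain-Python illustration of the reordering; not how Catalyst is implemented.
rows = [
    ("2020-01-25", "Cook"),
    ("2020-01-26", "Los Angeles"),
    ("2020-01-25", "Orange"),
    ("2020-01-27", "Los Angeles"),
]


def newest_first(data):
    """Sort rows by date, most recent first."""
    return sorted(data, key=lambda r: r[0], reverse=True)


def only_la(data):
    """Keep only the Los Angeles rows."""
    return [r for r in data if r[1] == "Los Angeles"]


as_written = only_la(newest_first(rows))  # sort 4 rows, then filter
optimized = newest_first(only_la(rows))   # filter first, then sort 2 rows

print(as_written == optimized)  # True
print(len(only_la(rows)))       # 2
```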

## Spark SQL

In [18]:
covid_df.createOrReplaceTempView("covid")

In [19]:
%sql

SELECT * 
FROM covid

-- keys = date, grouping = county, values = cases

date,county,state,fips,cases,deaths
2020-01-21T00:00:00.000+0000,Snohomish,Washington,53061.0,1,0
2020-01-22T00:00:00.000+0000,Snohomish,Washington,53061.0,1,0
2020-01-23T00:00:00.000+0000,Snohomish,Washington,53061.0,1,0
2020-01-24T00:00:00.000+0000,Cook,Illinois,17031.0,1,0
2020-01-24T00:00:00.000+0000,Snohomish,Washington,53061.0,1,0
2020-01-25T00:00:00.000+0000,Orange,California,6059.0,1,0
2020-01-25T00:00:00.000+0000,Cook,Illinois,17031.0,1,0
2020-01-25T00:00:00.000+0000,Snohomish,Washington,53061.0,1,0
2020-01-26T00:00:00.000+0000,Maricopa,Arizona,4013.0,1,0
2020-01-26T00:00:00.000+0000,Los Angeles,California,6037.0,1,0


In [20]:
%sql

SELECT * 
FROM covid 
WHERE county = 'Los Angeles'

-- keys = date, grouping = county, values = cases, deaths

date,county,state,fips,cases,deaths
2020-01-26T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-01-27T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-01-28T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-01-29T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-01-30T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-01-31T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-02-01T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-02-02T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-02-03T00:00:00.000+0000,Los Angeles,California,6037,1,0
2020-02-04T00:00:00.000+0000,Los Angeles,California,6037,1,0


In [21]:
%sql

SELECT max(cases) AS max_cases, max(deaths) AS max_deaths, county 
FROM covid 
GROUP BY county 
ORDER BY max_cases DESC
LIMIT 10

max_cases,max_deaths,county
110465,7690,New York City
25250,1217,Nassau
22462,617,Suffolk
20191,654,Westchester
16323,577,Cook
12209,820,Wayne
10426,550,Bergen
10047,360,Los Angeles
8335,192,Rockland
8242,277,Hudson


### Try your own analysis!
* Here's an idea to get you started
* There are many more examples [here](https://databricks.com/blog/2020/04/14/covid-19-datasets-now-available-on-databricks.html)

**This is census data taken from census.gov**
* It has enough information to construct a FIPS code that corresponds to the `fips` column in the NYT data

In [24]:
%sh wget https://web.archive.org/web/20160110113240/http://www.census.gov/popest/data/counties/totals/2014/files/CO-EST2014-alldata.csv && cp CO-EST2014-alldata.csv /dbfs/tmp

In [25]:
census_df = spark.read.csv("dbfs:/tmp/CO-EST2014-alldata.csv", header=True, inferSchema=True)
display(census_df)

SUMLEV,REGION,DIVISION,STATE,COUNTY,STNAME,CTYNAME,CENSUS2010POP,ESTIMATESBASE2010,POPESTIMATE2010,POPESTIMATE2011,POPESTIMATE2012,POPESTIMATE2013,POPESTIMATE2014,NPOPCHG_2010,NPOPCHG_2011,NPOPCHG_2012,NPOPCHG_2013,NPOPCHG_2014,BIRTHS2010,BIRTHS2011,BIRTHS2012,BIRTHS2013,BIRTHS2014,DEATHS2010,DEATHS2011,DEATHS2012,DEATHS2013,DEATHS2014,NATURALINC2010,NATURALINC2011,NATURALINC2012,NATURALINC2013,NATURALINC2014,INTERNATIONALMIG2010,INTERNATIONALMIG2011,INTERNATIONALMIG2012,INTERNATIONALMIG2013,INTERNATIONALMIG2014,DOMESTICMIG2010,DOMESTICMIG2011,DOMESTICMIG2012,DOMESTICMIG2013,DOMESTICMIG2014,NETMIG2010,NETMIG2011,NETMIG2012,NETMIG2013,NETMIG2014,RESIDUAL2010,RESIDUAL2011,RESIDUAL2012,RESIDUAL2013,RESIDUAL2014,GQESTIMATESBASE2010,GQESTIMATES2010,GQESTIMATES2011,GQESTIMATES2012,GQESTIMATES2013,GQESTIMATES2014,RBIRTH2011,RBIRTH2012,RBIRTH2013,RBIRTH2014,RDEATH2011,RDEATH2012,RDEATH2013,RDEATH2014,RNATURALINC2011,RNATURALINC2012,RNATURALINC2013,RNATURALINC2014,RINTERNATIONALMIG2011,RINTERNATIONALMIG2012,RINTERNATIONALMIG2013,RINTERNATIONALMIG2014,RDOMESTICMIG2011,RDOMESTICMIG2012,RDOMESTICMIG2013,RDOMESTICMIG2014,RNETMIG2011,RNETMIG2012,RNETMIG2013,RNETMIG2014
40,3,6,1,0,Alabama,Alabama,4779736,4780127,4785822,4801695,4817484,4833996,4849377,5695,15873,15789,16512,15381,14966,59691,59066,58036,58059,11097,48810,48380,49746,49793,3869,10881,10686,8290,8266,1352,4949,5626,5626,5606,489,-98,-810,1913,2034,1841,4851,4816,7539,7640,-15,141,287,683,-525,116185,116211,115491,115691,117165,118470,12.451816252,12.280881768,12.026342074,11.99148272,10.181989769,10.05907053,10.308470825,10.284226374,2.2698264837,2.2218112377,1.7178712488,1.7072563455,1.0323840886,1.1697463994,1.1658315616,1.1578610057,-0.020443249,-0.168413541,0.3964158865,0.4201015493,1.0119408393,1.001332858,1.5622474481,1.577962555
50,3,6,1,1,Alabama,Autauga County,54571,54571,54684,55275,55192,55136,55395,113,591,-83,-56,259,171,636,615,593,618,152,505,563,535,492,19,131,52,58,126,33,16,12,12,11,52,398,-155,-161,129,85,414,-143,-149,140,9,46,8,35,-7,455,455,455,455,455,455,11.567948053,11.134546969,10.749764339,11.182383223,9.1852417719,10.193089339,9.698353999,8.9024798473,2.3827062814,0.9414576299,1.0514103401,2.2799033755,0.2910175611,0.217259453,0.2175331738,0.1990391836,7.2390618321,-2.806267935,-2.918570082,2.3341867892,7.5300793932,-2.589008482,-2.701036908,2.5332259728
50,3,6,1,3,Alabama,Baldwin County,182265,182265,183216,186694,190561,195443,200111,951,3478,3867,4882,4668,548,2187,2093,2162,2169,537,1819,1884,1912,1947,11,368,209,250,222,67,247,269,284,283,851,2692,3364,4244,3779,918,2939,3633,4528,4062,22,171,25,104,384,2307,2307,2262,2244,2297,2296,11.824497851,11.095943062,11.201956456,10.966897061,9.8348246871,9.9879391923,9.9066330919,9.8444207365,1.9896731637,1.1080038701,1.2953233645,1.1224763243,1.3354599768,1.4260911055,1.4714873421,1.4309045036,14.55489173,17.834090999,21.989409436,19.107378512,15.890351707,19.260182105,23.460896778,20.538283016
50,3,6,1,5,Alabama,Barbour County,27457,27457,27336,27225,27169,26978,26887,-121,-111,-56,-191,-91,69,334,300,263,285,131,320,291,285,307,-62,14,9,-22,-22,2,4,-1,-1,1,-64,-130,-65,-190,-60,-62,-126,-66,-191,-59,3,1,1,22,-10,3193,3193,3382,3389,3390,3402,12.243177361,11.030628378,9.7142962676,10.582010582,11.729990286,10.699709527,10.526898997,11.398867539,0.513187075,0.3309188513,-0.81260273,-0.816856957,0.1466248786,-0.036768761,-0.036936488,0.0371298617,-4.765308554,-2.389969482,-7.017932665,-2.227791701,-4.618683675,-2.426738243,-7.054869152,-2.19066184
50,3,6,1,7,Alabama,Bibb County,22915,22919,22879,22740,22634,22504,22506,-40,-139,-106,-130,2,57,266,245,250,243,28,278,238,229,227,29,-12,7,21,16,2,1,0,3,3,-69,-128,-113,-152,-8,-67,-127,-113,-149,-5,-2,0,0,-2,-9,2224,2224,2224,2225,2224,2224,11.66180758,10.799136069,11.077141211,10.797600533,12.187904163,10.490589324,10.14666135,10.086647412,-0.526096583,0.3085467448,0.9304798618,0.7109531215,0.0438413819,0.0,0.1329256945,0.1333037103,-5.611696881,-4.980826024,-6.734901857,-0.355476561,-5.567855499,-4.980826024,-6.601976162,-0.22217285
50,3,6,1,9,Alabama,Blount County,57322,57322,57344,57694,57748,57720,57719,22,350,54,-28,-1,152,744,711,645,676,131,566,593,563,564,21,178,118,82,112,5,15,7,11,12,-3,102,-66,-99,-114,2,117,-59,-88,-102,-1,55,-5,-22,-11,489,489,489,489,489,489,12.934856308,12.317873911,11.171926421,11.711813165,9.8402267077,10.273557284,9.7516194963,9.7713944161,3.0946296007,2.0443166265,1.4203069249,1.9404187493,0.2607833933,0.1212730202,0.1905289777,0.2079020089,1.7733270745,-1.143431333,-1.7147608,-1.975069084,2.0341104678,-1.022158313,-1.524231822,-1.767167075
50,3,6,1,11,Alabama,Bullock County,10914,10915,10886,10623,10589,10605,10764,-29,-263,-34,16,159,39,169,122,129,132,52,132,119,92,111,-13,37,3,37,21,7,16,13,13,14,-23,-333,-50,-18,128,-16,-317,-37,-5,142,0,17,0,-16,-4,1690,1690,1690,1779,1716,1726,15.714352132,11.502922874,12.173256582,12.354345079,12.273931842,11.220064115,8.6817023686,10.388881089,3.4404202892,0.2828587592,3.4915542135,1.9654639899,1.4877493142,1.2257212898,1.2267622912,1.3103093266,-30.9637826,-4.714312653,-1.698593942,11.979970986,-29.47603329,-3.488591363,-0.47183165,13.290280313
50,3,6,1,13,Alabama,Butler County,20947,20946,20945,20676,20409,20289,20296,-1,-269,-267,-120,7,68,274,241,247,252,62,263,277,262,238,6,11,-36,-15,14,0,6,6,6,6,-3,-293,-240,-108,-1,-3,-287,-234,-102,5,-4,7,3,-3,-12,333,333,333,333,333,333,13.166430408,11.731775587,12.138188609,12.418381175,12.637851085,13.48423999,12.875325569,11.72847111,0.5285793229,-1.752464403,-0.73713696,0.6899100653,0.2883159943,0.2920774005,0.294854784,0.2956757423,-14.07943106,-11.68309602,-5.307386112,-0.04927929,-13.79111506,-11.39101862,-5.012531328,0.2463964519
50,3,6,1,15,Alabama,Calhoun County,118572,118586,118443,117760,117264,116547,115916,-143,-683,-496,-717,-631,323,1385,1355,1327,1292,311,1327,1361,1418,1401,12,58,-6,-91,-109,6,27,43,33,31,-157,-726,-534,-643,-471,-151,-699,-491,-610,-440,-4,-42,1,-16,-82,2933,2934,2878,2956,2809,2809,11.727200755,11.530737286,11.351048496,11.115747452,11.236097763,11.581795902,12.129454987,12.0535311,0.4911029919,-0.051058615,-0.778406491,-0.937783647,0.22861691,0.3659200762,0.2822792769,0.2667091107,-6.147254692,-4.544216761,-5.50016894,-4.052257779,-5.918637782,-4.178296685,-5.217889663,-3.785548668
50,3,6,1,17,Alabama,Chambers County,34215,34170,34111,34004,34087,34175,34076,-59,-107,83,88,-99,94,401,393,396,404,80,442,475,472,452,14,-41,-82,-76,-48,7,31,29,32,32,-76,-93,132,112,-88,-69,-62,161,144,-56,-4,-4,4,20,5,458,458,458,458,458,458,11.774205388,11.543375777,11.60235563,11.838654379,12.978051824,13.951917287,13.829070347,13.245227176,-1.203846436,-2.408541511,-2.226714717,-1.406572797,0.9102253542,0.851801266,0.9375640913,0.9377151983,-2.730676063,3.877164383,3.2814743195,-2.578716795,-1.820450708,4.7289656489,4.2190384108,-1.641001597


Let's tweak the DataFrame above to have a fips column that matches the NYT data. Here's the documentation on [user-defined functions (UDFs)](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html).

In [27]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

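def make_fips(state_code, county_code):
  # Pad the county code to three digits. The state code is left unpadded because
  # inferSchema reads the NYT fips column as a number, which drops leading zeros.
  if len(str(county_code)) == 1:
    return str(state_code) + "00" + str(county_code)
  elif len(str(county_code)) == 2:
    return str(state_code) + "0" + str(county_code)
  else:
    return str(state_code) + str(county_code)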

make_fips_udf = udf(make_fips, StringType())
  
census_df = census_df.withColumn("fips", make_fips_udf(census_df.STATE, census_df.COUNTY))
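
The padding logic can be checked without Spark. The sketch below uses a hypothetical compact variant, `make_fips_compact`, which pads the county code with `str.zfill` instead of branching on its length, and tries it on counties that appear in the outputs above.

```python
# Stand-alone check of the FIPS logic, runnable without Spark.
def make_fips_compact(state_code, county_code):
    """Hypothetical compact variant: pad the county code to three digits."""
    return str(state_code) + str(county_code).zfill(3)


print(make_fips_compact(1, 1))    # 1001  (Autauga County, Alabama)
print(make_fips_compact(6, 37))   # 6037  (Los Angeles County, California)
print(make_fips_compact(53, 61))  # 53061 (Snohomish County, Washington)
```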

Now that both the census and the COVID data have a matching `fips` column, let's join the two DataFrames.

In [29]:
covid_with_census = (covid_df
                     .na.drop(subset=["fips"])
                     .join(census_df.drop("COUNTY", "STATE"), on=['fips'], how='inner'))

What do the cases look like for the most populous counties?

In [31]:
display(covid_with_census.filter("POPESTIMATE2014 > 2000000").select("county", "cases", "date"))

county,cases,date
Cook,1,2020-01-24T00:00:00.000+0000
Orange,1,2020-01-25T00:00:00.000+0000
Cook,1,2020-01-25T00:00:00.000+0000
Maricopa,1,2020-01-26T00:00:00.000+0000
Los Angeles,1,2020-01-26T00:00:00.000+0000
Orange,1,2020-01-26T00:00:00.000+0000
Cook,1,2020-01-26T00:00:00.000+0000
Maricopa,1,2020-01-27T00:00:00.000+0000
Los Angeles,1,2020-01-27T00:00:00.000+0000
Orange,1,2020-01-27T00:00:00.000+0000


Since the NYT dataset has a new row for every day, with cumulative case counts increasing each day, let's grab only the most recent numbers for each county.
* I'm using the `col` function to refer to columns. It's equivalent to something like `df["column_name"]`

In [33]:
from pyspark.sql.functions import row_number, col
from pyspark.sql import Window

w = Window.partitionBy("fips").orderBy(col("date").desc())
current_covid_rates = (covid_with_census
                       .withColumn("row_num", row_number().over(w))
                       .filter(col("row_num") == 1)
                       .drop("row_num"))
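
The window logic amounts to "keep the row with the latest date for each `fips`". A plain-Python sketch of that idea follows, using a few rows from the outputs shown earlier. It is not how Spark executes the window, and `latest_by_fips` is a hypothetical name.

```python
# Plain-Python sketch of "latest row per key"; not how Spark runs the window.
rows = [
    {"fips": 6037, "date": "2020-01-26", "cases": 1},
    {"fips": 6037, "date": "2020-01-27", "cases": 1},
    {"fips": 17031, "date": "2020-01-24", "cases": 1},
    {"fips": 17031, "date": "2020-01-25", "cases": 1},
]

latest_by_fips = {}
for row in rows:
    current = latest_by_fips.get(row["fips"])
    # Keep the row with the greatest date, like row_number() == 1
    # after ordering each partition by date descending.
    if current is None or row["date"] > current["date"]:
        latest_by_fips[row["fips"]] = row

print(sorted((r["fips"], r["date"]) for r in latest_by_fips.values()))
# [(6037, '2020-01-27'), (17031, '2020-01-25')]
```

ISO-formatted date strings compare correctly as plain strings, which is why the `>` comparison works here.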

Which counties are hardest hit when cases are scaled by population?

In [35]:
current_covid_rates = (current_covid_rates
                       .withColumn("case_rates_percent", 100*(col("cases")/col("POPESTIMATE2014")))
                       .sort(col("case_rates_percent").desc()))

# Look at the top 10 counties
display(current_covid_rates.select("county", "case_rates_percent").limit(10))
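
As a quick sanity check on the formula, here is the same arithmetic for a single county. The numbers are hypothetical, chosen for easy arithmetic, and are not taken from either dataset.

```python
# Hypothetical numbers chosen for easy arithmetic; not from either dataset.
cases = 50
population = 10_000

# Same expression as the case_rates_percent column above.
case_rate_percent = 100 * (cases / population)
print(case_rate_percent)  # 0.5
```

Because the denominator is the 2014 population estimate, the resulting rates are approximate.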