# Intro to Apache Spark

* [Intro to Spark slides](https://github.com/databricks/tech-talks/blob/master/2020-04-29%20%7C%20Intro%20to%20Apache%20Spark/Intro%20to%20Spark.pdf)
* What is a Spark DataFrame?
  * Read in the [NYT data set](https://github.com/nytimes/covid-19-data) 
* How to perform a distributed count?
* Transformations vs. Actions
* Spark SQL

[Spark docs](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

In [0]:
%fs ls databricks-datasets/COVID/covid-19-data/

## How do we represent this data?

![Unified Engine](https://files.training.databricks.com/images/105/unified-engine.png)


#### At first there were RDDs...
* **R**esilient: Fault-tolerant
* **D**istributed: Across multiple nodes
* **D**ataset: Collection of partitioned data

RDDs are immutable once created and keep track of their lineage to enable failure recovery.
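
To make immutability and lineage concrete, here is a minimal plain-Python sketch. It is an analogy only, not Spark's RDD implementation: the `ToyDataset` class and its methods are hypothetical names invented for this illustration.

```python
# Toy illustration only: plain Python, not Spark's RDD implementation.
class ToyDataset:
    """An immutable dataset that remembers how it was derived (its lineage)."""

    def __init__(self, source=None, parent=None, fn=None):
        self._source = tuple(source) if source is not None else None
        self._parent = parent  # the dataset this one was derived from
        self._fn = fn          # the function applied to the parent's elements

    def map(self, fn):
        # Returns a NEW dataset; the original is never modified.
        return ToyDataset(parent=self, fn=fn)

    def compute(self):
        # Rebuild the data from the source by replaying the lineage.
        if self._parent is None:
            return list(self._source)
        return [self._fn(x) for x in self._parent.compute()]


base = ToyDataset(source=[1, 2, 3])
doubled = base.map(lambda x: x * 2)
plus_one = doubled.map(lambda x: x + 1)

# If a computed result is lost, it can be recomputed from the lineage.
first_result = plus_one.compute()
recovered = plus_one.compute()
print(first_result, recovered)
```

Because each dataset only stores its parent and a function, losing a computed result is recoverable: replaying the chain from the source produces the same data again.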

#### ... and then there were DataFrames
* Higher-level APIs
* User friendly
* Optimizations and performance improvements

![RDD vs DataFrames](https://files.training.databricks.com/images/105/rdd-vs-dataframes.png)

### Create a DataFrame from the NYT COVID data

In [0]:
covid_df = spark.read.csv("dbfs:/databricks-datasets/COVID/covid-19-data/us-counties.csv")
covid_df.show()

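Let's look at the [Spark docs](https://spark.apache.org/docs/latest/index.html) to see which options we can pass to the CSV reader. By default the reader treats the header row as data and reads every column as a string, so we set `header` and `inferSchema`.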

In [0]:
covid_df = spark.read.csv("dbfs:/databricks-datasets/COVID/covid-19-data/us-counties.csv", header=True, inferSchema=True)
covid_df.show()

### How many records do we have?
* Instead of counting M&Ms, let's count the number of rows in the DataFrame

### What do we expect our Spark job to look like?
* How many stages?

In [0]:
covid_df.count()
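
A distributed count typically runs in two stages: each partition is counted on its own, then the partial counts are combined. The following plain-Python sketch mimics that shape; it is an analogy, not Spark internals, and the row values and partition count are made up for illustration.

```python
# Plain-Python sketch of a two-stage count; not Spark internals.
rows = [f"row-{i}" for i in range(10)]  # made-up rows
num_partitions = 3                      # assumed partition count

# Split the rows into partitions, as a cluster would spread them across workers.
partitions = [rows[i::num_partitions] for i in range(num_partitions)]

# Stage 1: each partition is counted independently (in parallel on a real cluster).
partial_counts = [len(p) for p in partitions]

# Stage 2: the partial counts are combined into one total.
total = sum(partial_counts)
print(partial_counts, total)
```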

### Let's write some Spark code!

* I want to look at only the information for the county I live in (Los Angeles)
* I want the most recent information at the top

In [0]:
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles")) 

**...nothing happened. Why?**

## Transformations vs Actions

There are two types of operations in Spark: transformations and actions.

Fundamental to Apache Spark are the notions that
* Transformations are **LAZY**
* Actions are **EAGER**

In [0]:
# same operations as above
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles")) 

Why isn't it showing me results? **Sort** and **filter** are transformations, which are lazily evaluated in Spark: they only describe the computation, and nothing runs until an action asks for a result.
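
The same lazy/eager split can be mimicked with Python generators. This is only an analogy for how Spark defers work: the `traced` helper and the sample values are hypothetical, introduced to record when a row is actually processed.

```python
# Analogy only: Python generators stand in for Spark's lazy transformations.
log = []

def traced(x):
    log.append(x)  # record when a row is actually processed
    return x

data = [3, 1, 2]

# "Transformations": building the pipeline does no work yet.
pipeline = (traced(x) for x in data)
pipeline = (x * 10 for x in pipeline)
work_before_action = list(log)  # snapshot of the work done so far

# "Action": consuming the pipeline triggers the computation.
result = list(pipeline)
print(work_before_action, result, log)
```

The snapshot taken before the action is empty, which mirrors the notebook cell above that returned without showing any rows.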

Laziness has a number of benefits:
* Spark is not forced to load all the data in the first step
  * Loading everything up front is technically impossible with **REALLY** large datasets.
* Operations are easier to parallelize
  * N different transformations can be applied to a single data element in one pass, on a single thread, on a single machine.
* Most importantly, it allows the framework to automatically apply various optimizations
  * This is also why we use DataFrames!
  
There's a lot Spark's **Catalyst** optimizer can do. Let's focus on only this situation. For more information, read [this blog!](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)
  
![Catalyst](https://files.training.databricks.com/images/105/catalyst-diagram.png)

In [0]:
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles") 
 .show())  #action!

### We can see the optimizations in action!
* Go to the Spark UI
* Click on the SQL query associated with your Spark job
* See the logical and physical plans!
  * The filter and sort have been swapped
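
Why is swapping the filter and the sort safe, and why does it help? A small plain-Python sketch under stated assumptions: the rows below are made-up sample records (not taken from the NYT data), and this shows only the idea, not Catalyst's actual rules.

```python
# Plain-Python sketch; Catalyst's actual rules are more general than this.
rows = [  # made-up sample rows
    {"date": "2020-04-01", "county": "Los Angeles"},
    {"date": "2020-04-03", "county": "Orange"},
    {"date": "2020-04-02", "county": "Los Angeles"},
    {"date": "2020-04-04", "county": "Kern"},
]

def is_la(row):
    return row["county"] == "Los Angeles"

def by_date(row):
    return row["date"]

# As written in the notebook: sort everything, then filter.
sort_then_filter = [r for r in sorted(rows, key=by_date, reverse=True) if is_la(r)]

# As optimized: filter first, so the sort only touches the matching rows.
la_rows = [r for r in rows if is_la(r)]
filter_then_sort = sorted(la_rows, key=by_date, reverse=True)

print(sort_then_filter == filter_then_sort, len(rows), len(la_rows))
```

Both orderings return the same rows, but the second one sorts 2 rows instead of 4. On a real dataset the filter usually discards far more, which is why the optimizer prefers to run it first.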

## Spark SQL

In [0]:
covid_df.createOrReplaceTempView("covid")

In [0]:
%sql

SELECT * 
FROM covid

-- keys = date, grouping = county, values = cases

In [0]:
%sql

SELECT * 
FROM covid 
WHERE county = "Los Angeles"

-- keys = date, grouping = county, values = cases, deaths

In [0]:
%sql

SELECT max(cases) AS max_cases, max(deaths) AS max_deaths, county 
FROM covid 
GROUP BY county 
ORDER BY max_cases DESC
LIMIT 10

### CHALLENGE: Provide a spatial view of the data.

The map view requires each state's two-letter abbreviation. Let's create another DataFrame and then join the two tables.

In [0]:
states_list = [("ALABAMA","AL"),("ALASKA","AK"),("ARIZONA","AZ"),("ARKANSAS","AR"),("CALIFORNIA","CA"),("COLORADO","CO"),("CONNECTICUT","CT"),("DELAWARE","DE"),("FLORIDA","FL"),("GEORGIA","GA"),("HAWAII","HI"),("IDAHO","ID"),("ILLINOIS","IL"),("INDIANA","IN"),("IOWA","IA"),("KANSAS","KS"),("KENTUCKY","KY"),("LOUISIANA","LA"),("MAINE","ME"),("MARYLAND","MD"),("MASSACHUSETTS","MA"),("MICHIGAN","MI"),("MINNESOTA","MN"),("MISSISSIPPI","MS"),("MISSOURI","MO"),("MONTANA","MT"),("NEBRASKA","NE"),("NEVADA","NV"),("NEW HAMPSHIRE","NH"),("NEW JERSEY","NJ"),("NEW MEXICO","NM"),("NEW YORK","NY"),("NORTH CAROLINA","NC"),("NORTH DAKOTA","ND"),("OHIO","OH"),("OKLAHOMA","OK"),("OREGON","OR"),("PENNSYLVANIA","PA"),("RHODE ISLAND","RI"),("SOUTH CAROLINA","SC"),("SOUTH DAKOTA","SD"),("TENNESSEE","TN"),("TEXAS","TX"),("UTAH","UT"),("VERMONT","VT"),("VIRGINIA","VA"),("WASHINGTON","WA"),("WEST VIRGINIA","WV"),("WISCONSIN","WI"),("WYOMING","WY")]

In [0]:
state_columns = ["state","state_code"]
states_df = spark.createDataFrame(data=states_list, schema = state_columns)
display(states_df)

In [0]:
states_df.createOrReplaceTempView("states")

In [0]:
%sql

SELECT *
FROM covid LEFT JOIN states on covid.state = states.state


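Whoops! The join finds no matches because the state names in the `states` table are all capital letters, while those in `covid` are mixed case, and string comparison is case sensitive.

This is an example of the _data munging_ that typically needs to occur before data sets can be joined.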
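
The mismatch is easy to reproduce in plain Python. In this minimal sketch a dictionary lookup stands in for the left join; the two sample states are illustrative, and a missing match comes back as `None`, much as the join produces NULL.

```python
# Plain-Python sketch of a left join on state name; sample rows are illustrative.
state_codes = {"CALIFORNIA": "CA", "NEW YORK": "NY"}  # uppercase keys, like `states`
covid_states = ["California", "New York"]             # mixed case, like `covid`

# Case-sensitive lookup: nothing matches, so every code comes back as None (NULL).
unmatched = [(s, state_codes.get(s)) for s in covid_states]

# Normalizing the key to uppercase first makes the lookup succeed.
matched = [(s, state_codes.get(s.upper())) for s in covid_states]

print(unmatched)
print(matched)
```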

Let's review the [SQL functions](https://spark.apache.org/docs/latest/api/sql/index.html) that are available.

In [0]:
from pyspark.sql.functions import upper

In [0]:
covid_upper_df = covid_df.select("date", "county", upper("state").alias("state"), "fips", "cases", "deaths")
covid_upper_df.createOrReplaceTempView("covid")
display(covid_upper_df)

Big data problems tend to appear when an analyst starts to JOIN two very large data sets together. The value of Spark shows itself in these scenarios.

In [0]:
%sql

SELECT *
FROM covid LEFT JOIN states on covid.state = states.state

In [0]:
%sql

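-- cases is cumulative, so total each state's counties on the most recent date only
SELECT state_code, sum(cases) AS total_cases
FROM covid LEFT JOIN states ON covid.state = states.state
WHERE state_code IS NOT NULL
  AND date = (SELECT max(date) FROM covid)
GROUP BY state_code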
