# Spark SQL Lab
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. This lab shows how to work with Spark SQL.

### Creating the DataFrame
The `SparkSession` class is the entry point for the DataFrame API. It exposes a `DataFrameReader`, named `read`, that creates a DataFrame from existing data in any supported format. In our application, we create a `SparkSession` and then create a DataFrame from a JSON file. The dataset used in this lab contains the results of the March 2016 Virginia Democratic presidential primary in Loudoun County. The file, `loudoun_d_primary_results_2016.json`, is located in `data/sql`. Each line of the file holds one JSON object with the following structure (pretty-printed here for readability):
```
{
  "district_type": "Congressional", 
  "last_name": "Clinton", 
  "candidate_ballot_order": "1", 
  "precinct_code": "###PROV", 
  "referendumId": "", 
  "total_votes": "9", 
  "candidate_name": "Hillary Clinton", 
  "locality_name": "LOUDOUN COUNTY", 
  "office_ballot_order": "1", 
  "party": "Democratic", 
  "election_name": "2016 March Democratic Presidential Primary", 
  "election_date": "2016-03-01 00:00:00.000", 
  "precinct_name": "## Provisional", 
  "null": [""], 
  "locality_code": "107",
  "negative_votes": "",
  "office_name": "President",
  "candidateId": "124209128",
  "DESCRIPTION": "10th District",
  "districtId": "1085224094",
  "referendum_title": "",
  "officeId": "933838092",
  "in_precinct": "## Provisional",
  "election_type": "Primary"
}
```

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("SparkSQL").getOrCreate()
import spark.implicits._

// Create a DataFrame based on the JSON results.
val fileName = "data/sql/loudoun_d_primary_results_2016.json"
val df = spark.read.json(fileName)
```

Now, print the inferred schema of the DataFrame and its first two rows.

```scala
// TODO: Replace <FILL IN> with appropriate code

// print the dataframe schema
df.printSchema()

// print the first two rows
df.take(2)
```

```
root
 |-- DESCRIPTION: string (nullable = true)
 |-- candidateId: string (nullable = true)
 |-- candidate_ballot_order: string (nullable = true)
 |-- candidate_name: string (nullable = true)
 |-- districtId: string (nullable = true)
 |-- district_type: string (nullable = true)
 |-- election_date: string (nullable = true)
 |-- election_name: string (nullable = true)
 |-- election_type: string (nullable = true)
 |-- in_precinct: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- locality_code: string (nullable = true)
 |-- locality_name: string (nullable = true)
 |-- negative_votes: string (nullable = true)
 |-- null: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- officeId: string (nullable = true)
 |-- office_ballot_order: string (nullable = true)
 |-- office_name: string (nullable = true)
 |-- party: string (nullable = true)
 |-- precinct_code: string (nullable = true)
 |-- precinct_name: string (nullable = true)
 |-- referendumId: stri
```

```
Array([10th District,124209128,1,Hillary Clinton,1085224094,Congressional,2016-03-01 00:00:00.000,2016 March Democratic Presidential Primary,Primary,## Provisional,Clinton,107,LOUDOUN COUNTY,,WrappedArray(),933838092,1,President,Democratic,###PROV,## Provisional,,,9], [10th District,1999936198,2,Martin J. O'Malley,1085224094,Congressional,2016-03-01 00:00:00.000,2016 March Democratic Presidential Primary,Primary,## Provisional,O'Malley,107,LOUDOUN COUNTY,,WrappedArray(),933838092,1,President,Democratic,###PROV,## Provisional,,,0])
```
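The schema above lists every column as `string`, including `total_votes`. The sketch below shows why, assuming Spark 2.2 or later (for reading JSON from a `Dataset[String]`). It reads two made-up JSON lines with a hypothetical `total_votes_raw` field. JSON schema inference types a quoted number as a string and an unquoted integer as a long. This is why the later steps cast the vote and precinct columns before doing arithmetic on them.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType}

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("JsonInference").getOrCreate()
import spark.implicits._

// Two hypothetical JSON Lines records: the same count, once quoted and once unquoted.
val jsonLines = Seq(
  """{"total_votes": "9", "total_votes_raw": 9}""",
  """{"total_votes": "0", "total_votes_raw": 0}"""
).toDS()

// spark.read.json also accepts a Dataset[String] holding one JSON object per element.
val inferred = spark.read.json(jsonLines)
inferred.printSchema()

// Quoted values are inferred as strings, unquoted integers as longs.
val quotedType = inferred.schema("total_votes").dataType
val unquotedType = inferred.schema("total_votes_raw").dataType
```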

### Transforming and Querying the DataFrame
Let's explore the data to find out which candidates were on the ballot, based on the distinct values in the `candidate_name` field. You should get the following result:
```
+------------------+
|    candidate_name|
+------------------+
|    Bernie Sanders|
|   Hillary Clinton|
|Martin J. O'Malley|
+------------------+
```

```scala
// TODO: Replace <FILL IN> with appropriate code

// get all distinct candidate names from the DataFrame
df.select("candidate_name").distinct().show()
```

```
+------------------+
|    candidate_name|
+------------------+
|    Bernie Sanders|
|   Hillary Clinton|
|Martin J. O'Malley|
+------------------+
```
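`show()` only prints the result. If you need the candidate names as ordinary Scala values, one option is to turn the single-column DataFrame into a typed `Dataset[String]` with `as[String]` and collect it. The sketch below uses a few made-up rows instead of the lab's file. Because `distinct()` does not guarantee any order, it sorts before collecting. Collecting is only sensible for small results like this one, since all rows are brought to the driver.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("DistinctNames").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the election data (one row per precinct and candidate).
val sample = Seq(
  ("Hillary Clinton", "101"),
  ("Bernie Sanders", "101"),
  ("Hillary Clinton", "102"),
  ("Bernie Sanders", "102")
).toDF("candidate_name", "precinct_code")

// distinct() has no ordering guarantee, so sort before collecting to the driver.
val names: Array[String] = sample
  .select("candidate_name")
  .distinct()
  .orderBy("candidate_name")
  .as[String]
  .collect()

println(names.mkString(", "))
```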



Next, let's see in which order the candidates were printed on the ballots, using the `candidate_ballot_order` field. In Virginia, every county uses the same ballot, so one sample per candidate is enough and we can safely discard the duplicates. Since we are going to reuse the resulting DataFrame, cache it with the `cache()` method. Here is the result:
```
+------------------+----------------------+
|    candidate_name|candidate_ballot_order|
+------------------+----------------------+
|   Hillary Clinton|                     1|
|Martin J. O'Malley|                     2|
|    Bernie Sanders|                     3|
+------------------+----------------------+
```

```scala
// TODO: Replace <FILL IN> with appropriate code

// get the ballot order and discard the many duplicates (all VA ballots are the same)
val orderDF = df.select(df("candidate_name"), df("candidate_ballot_order")).distinct().orderBy("candidate_ballot_order").cache()
orderDF.show()
```

```
+------------------+----------------------+
|    candidate_name|candidate_ballot_order|
+------------------+----------------------+
|   Hillary Clinton|                     1|
|Martin J. O'Malley|                     2|
|    Bernie Sanders|                     3|
+------------------+----------------------+
```
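Calling `cache()` is lazy. It only marks the DataFrame for caching, and the data is stored the first time an action such as `show()` or `count()` runs. The sketch below illustrates this on made-up rows and assumes Spark 2.1 or later, where `Dataset.storageLevel` is available. It also releases the cache with `unpersist()` once the data is no longer needed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("CacheSketch").getOrCreate()
import spark.implicits._

// Hypothetical ballot rows with one duplicate.
val ballot = Seq(
  ("Hillary Clinton", "1"),
  ("Bernie Sanders", "3"),
  ("Hillary Clinton", "1")
).toDF("candidate_name", "candidate_ballot_order")

// cache() only marks the plan for caching; nothing is computed yet.
val cachedDF = ballot.distinct().cache()

// The first action computes the result and fills the cache; later actions reuse it.
val rowCount = cachedDF.count()

// For Datasets, cache() is persist() with the default level MEMORY_AND_DISK.
val level = cachedDF.storageLevel

// Free the cached blocks once the DataFrame is no longer needed.
cachedDF.unpersist()
```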

The query above shows the ballot order as numbers, but we would like it to show descriptive English text instead. A reference lookup table for this is available in the file `friendly_orders.json` in `data/sql`. The file has the following structure:
```
{"candidate_ballot_order": "1", "friendly_name": "First on Ballot"}
{"candidate_ballot_order": "2", "friendly_name": "In Middle of Ballot"}
{"candidate_ballot_order": "3", "friendly_name": "Last on Ballot"}
```

We create a DataFrame from this reference data and join it with the ballot-order DataFrame, so that the output shows `friendly_name` instead of the numbers. You should get the following result:
```
+------------------+-------------------+
|    candidate_name|      friendly_name|
+------------------+-------------------+
|   Hillary Clinton|    First on Ballot|
|Martin J. O'Malley|In Middle of Ballot|
|    Bernie Sanders|     Last on Ballot|
+------------------+-------------------+
```

```scala
// TODO: Replace <FILL IN> with appropriate code

val orderFileName = "data/sql/friendly_orders.json"
val friendlyDF = spark.read.json(orderFileName)

// join on the shared column name so the result keeps a single candidate_ballot_order column
val joinedDF = orderDF.join(friendlyDF, Seq("candidate_ballot_order"), "inner")

// a join does not guarantee row order, so sort again by ballot order, then hide the numeric column
joinedDF.orderBy("candidate_ballot_order").select("candidate_name", "friendly_name").show()
```

```
+------------------+-------------------+
|    candidate_name|      friendly_name|
+------------------+-------------------+
|   Hillary Clinton|    First on Ballot|
|Martin J. O'Malley|In Middle of Ballot|
|    Bernie Sanders|     Last on Ballot|
+------------------+-------------------+
```
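When the join condition is written as a column expression, both inputs' `candidate_ballot_order` columns survive the join, so referring to that column by name afterwards is ambiguous. Passing the key as a sequence of column names (a "using" join) keeps a single copy of it. The sketch below compares the two forms on small hypothetical DataFrames modeled on `orderDF` and `friendlyDF`.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("JoinColumns").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for orderDF and friendlyDF.
val orders = Seq(("Hillary Clinton", "1"), ("Bernie Sanders", "3"))
  .toDF("candidate_name", "candidate_ballot_order")
val friendly = Seq(("1", "First on Ballot"), ("3", "Last on Ballot"))
  .toDF("candidate_ballot_order", "friendly_name")

// Expression join: the key column appears twice in the output.
val exprJoin = orders.join(
  friendly, orders("candidate_ballot_order") === friendly("candidate_ballot_order"), "inner")

// Using-column join: the key column appears once.
val usingJoin = orders.join(friendly, Seq("candidate_ballot_order"), "inner")

println(exprJoin.columns.mkString(", "))
println(usingJoin.columns.mkString(", "))
```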


Next, let's try an aggregate query. The `total_votes` column holds strings, so to count the total votes we first cast it to an integer type and then sum all of its values. To make the result easier to read, we give the cast column a name with the `alias` method. Here is the result:
```
+--------------------+
|sum(total_votes_int)|
+--------------------+
|               36149|
+--------------------+
```

```scala
// TODO: Replace <FILL IN> with appropriate code

// original data is string-based, so create an integer version of it and call it total_votes_int
val votesColumn = df("total_votes").cast("int").alias("total_votes_int")

// get the integer-based votes column and sum all values together
df.select(sum(votesColumn).alias("sum(total_votes_int)")).show()
```

```
+--------------------+
|sum(total_votes_int)|
+--------------------+
|               36149|
+--------------------+
```
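The same cast-then-sum pattern can be checked on a tiny, hypothetical DataFrame. The sketch below sums three string-typed vote counts and inspects the result type. Spark widens the sum of an `int` column to `bigint` (`LongType`), so the value has to be read back with `getLong`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("CastSum").getOrCreate()
import spark.implicits._

// Hypothetical string-typed vote counts, like the lab's total_votes column.
val votes = Seq("9", "0", "120").toDF("total_votes")

// Cast to int, then sum; the alias names the result column.
val totals = votes.select(sum(col("total_votes").cast("int")).alias("total"))
totals.show()

// The sum of an int column is returned as a bigint (LongType).
val totalType = totals.schema("total").dataType
val total = totals.first().getLong(0)
```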

Grouping this vote count by `candidate_name` follows a similar pattern. Use `orderBy()` to sort the results in descending order of votes, so that the output shows how many votes each candidate received:
```
+------------------+----------+
|    candidate_name|sum_column|
+------------------+----------+
|   Hillary Clinton|     21180|
|    Bernie Sanders|     14730|
|Martin J. O'Malley|       239|
+------------------+----------+
```

```scala
// TODO: Replace <FILL IN> with appropriate code

// get just the candidate names and votes.
val candidateDF = df.select(df("candidate_name"), votesColumn)

// group by candidate name and sum votes, and assign an alias "sum_column" to the sum so we can order
// on that column.
val groupedDF = candidateDF.groupBy("candidate_name").agg(sum("total_votes_int").alias("sum_column"))
val summaryDF = groupedDF.orderBy(desc("sum_column")).cache()
summaryDF.show()
```

```
+------------------+----------+
|    candidate_name|sum_column|
+------------------+----------+
|   Hillary Clinton|     21180|
|    Bernie Sanders|     14730|
|Martin J. O'Malley|       239|
+------------------+----------+
```
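Because Spark SQL can also act as a SQL query engine, the same grouping can be written as a SQL statement once the DataFrame is registered as a temporary view. The sketch below uses made-up rows and a hypothetical view name, `primary_results`. With the lab data you would call `createOrReplaceTempView` on `df` instead.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("SqlGroupBy").getOrCreate()
import spark.implicits._

// Hypothetical rows: candidate name and a string-typed vote count.
val results = Seq(
  ("Hillary Clinton", "9"),
  ("Bernie Sanders", "5"),
  ("Hillary Clinton", "3"),
  ("Martin J. O'Malley", "1")
).toDF("candidate_name", "total_votes")

// Make the DataFrame visible to SQL under a (hypothetical) view name.
results.createOrReplaceTempView("primary_results")

val summary = spark.sql(
  """SELECT candidate_name, SUM(CAST(total_votes AS INT)) AS sum_column
    |FROM primary_results
    |GROUP BY candidate_name
    |ORDER BY sum_column DESC""".stripMargin)

summary.show()

// Pull the small result back as Scala tuples.
val rows = summary.collect().map(r => (r.getString(0), r.getLong(1))).toSeq
```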

For our final exploration, we find out which physical precincts (polling stations) had the highest turnout. Virginia designates special virtual precincts for absentee and provisional ballots, which can skew our results, so we want to omit them from the query. A glance at the data shows that every physical precinct has a numeric `precinct_code`, while the codes of the provisional and absentee precincts start with "##" and are therefore not integers. We can cast the `precinct_code` column to an integer and then filter out the rows whose cast code is null. We expect to see the following result:
```
+-------------+----------+
|precinct_name|sum_column|
+-------------+----------+
| 314 - LEGACY|       652|
+-------------+----------+
```

```scala
// TODO: Replace <FILL IN> with appropriate code

// Casting a non-numeric code such as "###PROV" to int yields null
// (when ANSI mode is off, the default before Spark 4.0).
val precinctColumn = df("precinct_code").cast("int").alias("precinct_code_int")

// get the precinct name, integer-based code, and integer-based votes, then keep only non-null codes.
val pollingDF = df.select(df("precinct_name"), precinctColumn, votesColumn).filter($"precinct_code_int".isNotNull)

// group by precinct name and sum votes, and assign an alias "sum_column" to the sum so we can order on that
// column, and then show only the top row.
val groupedPollDF = pollingDF.groupBy("precinct_name").agg(sum("total_votes_int").alias("sum_column")).orderBy(desc("sum_column"))
groupedPollDF.limit(1).show()
```
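The cast-and-filter technique can be tried on a few hypothetical precinct rows. The sketch assumes non-ANSI cast behavior and sets `spark.sql.ansi.enabled` to `false` explicitly. With ANSI mode on (the default from Spark 4.0), casting a code like "###PROV" to `int` raises an error instead of returning null. The made-up provisional row carries the largest vote count, so it would come out on top if the filter were missing.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("PrecinctFilter").getOrCreate()
import spark.implicits._

// Assumption: non-ANSI casts, so invalid strings become null instead of failing.
spark.conf.set("spark.sql.ansi.enabled", "false")

// Hypothetical rows: precinct name, precinct code, and string-typed votes.
val precincts = Seq(
  ("101 - ALPHA", "101", "300"),
  ("101 - ALPHA", "101", "200"),
  ("102 - BETA", "102", "150"),
  ("## Provisional", "###PROV", "900")
).toDF("precinct_name", "precinct_code", "total_votes")

// Non-numeric codes cast to null and are dropped by the filter.
val physical = precincts
  .select(
    col("precinct_name"),
    col("precinct_code").cast("int").alias("precinct_code_int"),
    col("total_votes").cast("int").alias("total_votes_int"))
  .filter(col("precinct_code_int").isNotNull)

// Sum per precinct and keep only the busiest one.
val top = physical
  .groupBy("precinct_name")
  .agg(sum("total_votes_int").alias("sum_column"))
  .orderBy(desc("sum_column"))
  .limit(1)

top.show()
val topRow = top.first()
```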

### Saving the DataFrame
Every DataFrame exposes a `DataFrameWriter`, named `write`, that can be used to save it. Four write modes are available, and `error` is the default:
* `append`: add this data to the end of any data already at the target location.
* `overwrite`: erase any existing data at the target location and replace with this data.
* `ignore`: silently skip this command if any data already exists at the target location.
* `error`: throw an exception if any data already exists at the target location.

Here, we save one of our generated DataFrames as JSON data in the folder `target/json` with the `overwrite` mode. If you look in this directory after running the code, you will see one `part-*` JSON file per partition of the DataFrame, each holding one JSON object per row, along with a `_SUCCESS` marker file. Even a small DataFrame such as this one may be split across several part files.

```scala
// TODO: Replace <FILL IN> with appropriate code

summaryDF.write.mode("overwrite").json("target/json")
```
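To see how the write modes listed above behave, the sketch below writes a small DataFrame, built from the vote totals shown earlier, to a fresh temporary directory several times. The sub-folder name `summary_json` is hypothetical. `append` adds a second set of rows and `ignore` leaves the existing data alone. The default `error` mode throws an `AnalysisException` because the folder already exists.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Reuses the notebook's session if one is already running.
val spark = SparkSession.builder.master("local[*]").appName("SaveModes").getOrCreate()
import spark.implicits._

val summary = Seq(("Hillary Clinton", 21180L), ("Bernie Sanders", 14730L))
  .toDF("candidate_name", "sum_column")

// A fresh temporary directory with a hypothetical sub-folder name.
val target = Files.createTempDirectory("spark-save-modes").resolve("summary_json").toString

summary.write.mode("overwrite").json(target) // creates (or replaces) the folder
summary.write.mode("append").json(target)    // adds a second copy of the two rows
summary.write.mode("ignore").json(target)    // no-op: data already exists

// Reading the folder back skips the _SUCCESS marker and sees both appended copies.
val rowsAfterWrites = spark.read.json(target).count()

// The default mode (error, alias errorifexists) refuses to write to an existing path.
val errorRaised =
  try { summary.write.json(target); false }
  catch { case _: AnalysisException => true }
```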