# Spark SQL

## Overview

In the previous mission, we learned how to read JSON into a Spark DataFrame, as well as some basic techniques for interacting with DataFrames. In this mission, we'll learn how to use Spark's SQL interface to query and interact with the data. We'll continue to work with the 2010 U.S. Census data set in this mission. Later on, we'll add other files to demonstrate how to take advantage of SQL to work with multiple data sets.

## Register the DataFrame as a Table

Before we can write and run SQL queries, we need to tell Spark to treat the DataFrame as a SQL table. Spark internally maintains a virtual database within the SQLContext object. This object, which we instantiate as `sqlCtx`, has methods for registering temporary tables.

To register a DataFrame as a table, call the [`registerTempTable()` method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable) on that DataFrame object. This method requires one string parameter, `name`, that we use to set the table name for reference in our SQL queries.

* Use the [`registerTempTable()` method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable) to register the DataFrame `df` as a table named `census2010`.
* Then, call the SQLContext method `tableNames()` to return the list of tables.
  * Assign the resulting list to `tables`, and use the `print` function to display it.

In [1]:
import pyspark

In [2]:
sc = pyspark.SparkContext()

In [3]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("data/census_2010.json")

In [5]:
df.registerTempTable('census2010')


In [6]:
tables = sqlCtx.tableNames()
print(tables)

['census2010']


## Querying

Now that we've registered the table within `sqlCtx`, we can start writing and running SQL queries. With Spark SQL, we represent our query as a string and pass it into the `sql()` method of the SQLContext object. The [`sql()` method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.sql) requires a single parameter, the query string. Spark returns the query results as a DataFrame object. Because DataFrames are evaluated lazily, you'll have to call `show()` to display the results.

While SQLite requires that queries end with a semicolon, Spark SQL will actually throw an error if you include one. Apart from this difference, the Spark SQL we use in this mission follows the same syntax as SQLite, and the queries you've written for the [SQL course](https://www.dataquest.io/section/databases-sql) should work here as well.
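If you're reusing queries written for SQLite, one way to handle the semicolon difference is to strip it before handing the string to `sql()`. The sketch below assumes a small helper, `strip_trailing_semicolon()`, which is our own hypothetical function and not part of Spark:

```python
def strip_trailing_semicolon(query):
    """Return the query without trailing whitespace or a trailing semicolon.

    Hypothetical helper for illustration; it is not part of the Spark API.
    """
    return query.rstrip().rstrip(';').rstrip()


query = strip_trailing_semicolon('select age from census2010;')
print(query)

# In a session where sqlCtx exists, the cleaned string can then be run with:
# sqlCtx.sql(query).show()
```

The helper only touches the end of the string, so semicolons inside string literals in the query are left alone.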

* Write a SQL query that returns the `age` column from the table `census2010`, and use the `show()` method to display the first 20 results.

In [7]:
query = 'select age from census2010'
sqlCtx.sql(query).show()

+---+
|age|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



## Filtering

In the previous mission, we used DataFrame methods to find all of the rows where `age` was greater than `5`. If we only wanted to retrieve data from the `males` and `females` columns where that criterion was true, we'd need to chain additional operations onto the Spark DataFrame. To return the results in descending order instead of ascending order, we'd have to chain yet another method. The DataFrame methods are quick and powerful for simple queries, but chaining them can become cumbersome for more advanced queries.

SQL shines at expressing complex logic in a more compact manner. Let's brush up on SQL by writing a query that expresses more specific criteria.

Write and run a SQL query that returns:
* The `males` and `females` columns (in that order) where `age` > 5 and `age` < 15

In [8]:
query = '''select males, females
           from census2010
           where age > 5 and age < 15
'''
sqlCtx.sql(query).show()

+-------+-------+
|  males|females|
+-------+-------+
|2093905|2007781|
|2097080|2010281|
|2101670|2013771|
|2108014|2018603|
|2114217|2023289|
|2118390|2026352|
|2132030|2037286|
|2159943|2060100|
|2195773|2089651|
+-------+-------+



## Mixing Functionality

Because the results of SQL queries are DataFrame objects, **we can combine the best aspects of both DataFrames and SQL to enhance our workflow**. For example, we can write a SQL query that quickly returns a subset of our data as a DataFrame.

* Write a SQL query that returns a DataFrame containing the `males` and `females` columns from the `census2010` table.
* Use the [`describe()` method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe) to calculate summary statistics for the DataFrame and the `show()` method to display the results.

In [10]:
query = '''
        select males, females
        from census2010
'''

sqlCtx.sql(query).describe().show()

+-------+------------------+-----------------+
|summary|             males|          females|
+-------+------------------+-----------------+
|  count|               101|              101|
|   mean|1520095.3168316833|1571460.287128713|
| stddev|  818587.208016823|748671.0493484351|
|    min|              4612|            25673|
|    max|           2285990|          2331572|
+-------+------------------+-----------------+



## Multiple Tables

One of the most powerful use cases in SQL is **joining tables**. Spark SQL takes this a step further by enabling you to run join queries across data from multiple file types. Spark will read any of the file types and formats it supports into DataFrame objects, and we can register each of these as a table within the SQLContext object to use for querying.

As we mentioned briefly in the previous mission, **most data science organizations use a variety of file formats and data storage mechanisms**. Spark SQL was built with these industry use cases in mind, and it enables data professionals to use one common query language, SQL, to interact with many different data sources. We'll explore joins in Spark SQL further, but first let's introduce the other datasets we'll be using:

* `census_1980.json` - 1980 U.S. Census data
* `census_1990.json` - 1990 U.S. Census data
* `census_2000.json` - 2000 U.S. Census data

Read these additional datasets into DataFrame objects, and then use the `registerTempTable()` method to register each one as a table within the SQLContext:
* `census_1980.json` as `census1980`,
* `census_1990.json` as `census1990`,
* `census_2000.json` as `census2000`.

Then use the method `tableNames()` to list the tables within the SQLContext object, assign to `tables`, and finally print `tables`.

In [11]:
# Code from the earlier cells, repeated here for reference

#from pyspark.sql import SQLContext
#sqlCtx = SQLContext(sc)
#df = sqlCtx.read.json("data/census_2010.json")
#df.registerTempTable('census2010')

In [12]:
df = sqlCtx.read.json('data/census_1980.json')
df.registerTempTable('census1980')

df = sqlCtx.read.json('data/census_1990.json')
df.registerTempTable('census1990')

df = sqlCtx.read.json('data/census_2000.json')
df.registerTempTable('census2000')

In [13]:
tables = sqlCtx.tableNames()
print(tables)

['census1980', 'census1990', 'census2000', 'census2010']
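
The three read-and-register steps above follow the same pattern, so they could also be written as a loop. The sketch below assumes the files live in a `data` directory and follow the `census_<year>.json` naming used in this mission; `register_census_tables()` is a hypothetical helper of our own, not a Spark method:

```python
def register_census_tables(sql_ctx, years, data_dir='data'):
    """Read each census JSON file and register it as a table named census<year>.

    Hypothetical helper: sql_ctx is expected to be a SQLContext such as sqlCtx.
    Returns the list of table names it registered.
    """
    names = []
    for year in years:
        path = '{}/census_{}.json'.format(data_dir, year)
        name = 'census{}'.format(year)
        # Same two calls as in the cells above, applied once per file.
        sql_ctx.read.json(path).registerTempTable(name)
        names.append(name)
    return names


# In the notebook session, where sqlCtx was created earlier:
# register_census_tables(sqlCtx, [1980, 1990, 2000])
```

This avoids reassigning `df` for every file, which also means the `census2010` DataFrame we loaded first is not overwritten.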


## Joins

Now that we have a table for each dataset, we can write join queries to compare values across them. Since we're working with Census data, let's use the `age` column as the joining column.

* Write a query that returns a DataFrame with the `total` columns for the tables `census2010` and `census2000` (in that order).
* Then, run the query and use the `show()` method to display the first 20 results.

In [17]:
query = '''
        select census2010.total, census2000.total
        from census2010
        inner join census2000
        on census2010.age = census2000.age
'''
sqlCtx.sql(query).show(20)

+-------+-------+
|  total|  total|
+-------+-------+
|4079669|3733034|
|4085341|3825896|
|4089295|3904845|
|4092221|3970865|
|4094802|4024943|
|4097728|4068061|
|4101686|4101204|
|4107361|4125360|
|4115441|4141510|
|4126617|4150640|
|4137506|4152174|
|4144742|4145530|
|4169316|4139512|
|4220043|4138230|
|4285424|4137982|
|4347028|4133932|
|4410804|4130632|
|4451147|4111244|
|4454165|4068058|
|4432260|4011192|
+-------+-------+
only showing top 20 rows
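
Both columns in the output above are named `total`, which makes it hard to tell which table each one came from. Column aliases (`as`) are one way to address this. The sketch below demonstrates the aliased query using Python's built-in `sqlite3` module rather than Spark, with made-up toy rows instead of real Census figures, so that it runs without a Spark session; the alias names `total_2010` and `total_2000` are our own choice:

```python
import sqlite3

# In-memory SQLite database standing in for the Spark tables.
conn = sqlite3.connect(':memory:')
conn.execute('create table census2010 (age integer, total integer)')
conn.execute('create table census2000 (age integer, total integer)')

# Made-up toy rows, NOT real Census figures.
conn.executemany('insert into census2010 values (?, ?)', [(0, 10), (1, 20)])
conn.executemany('insert into census2000 values (?, ?)', [(0, 7), (1, 9)])

query = '''
        select census2010.total as total_2010,
               census2000.total as total_2000
        from census2010
        inner join census2000
        on census2010.age = census2000.age
        order by census2010.age
'''

cursor = conn.execute(query)
columns = [col[0] for col in cursor.description]
rows = cursor.fetchall()
print(columns)
print(rows)
```

The same query string should also work when passed to `sqlCtx.sql(query)`, in which case the resulting DataFrame's columns would carry the alias names.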



## SQL Functions

The functions and operators from SQLite that we've used in the past are available for us to use in Spark SQL:

* COUNT()
* AVG()
* SUM()
* AND
* OR

Write a query that calculates the sums of the `total` column from each of the tables, in the following order:
* `census2010`,
* `census2000`,
* `census1990`.

You'll need to perform two inner joins for this query (all datasets have the same values for `age`, which makes things convenient for joining).

In [20]:
query = '''
        select sum(census2010.total),
                sum(census2000.total),
                sum(census1990.total)
        from census2010
        inner join census2000 
        on census2010.age=census2000.age
        inner join census1990
        on census2010.age=census1990.age
'''
sqlCtx.sql(query).show()

+----------+----------+----------+
|sum(total)|sum(total)|sum(total)|
+----------+----------+----------+
| 312247116| 284594395| 254506647|
+----------+----------+----------+
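
The query above repeats the same pattern for each table: one `sum()` in the `select` clause and one `inner join` on `age`. As a rough sketch, that pattern can be generated from a list of table names. `build_sum_query()` is a hypothetical helper of our own (not part of Spark), and it also adds an alias to each sum so the output columns are distinguishable:

```python
def build_sum_query(tables, column='total', key='age'):
    """Build a query that sums `column` in every table, joining all tables on `key`.

    Hypothetical helper for illustration. The first table is the join base;
    each remaining table is inner-joined to it on the key column.
    """
    first = tables[0]
    select_parts = [
        'sum({t}.{c}) as {t}_{c}'.format(t=table, c=column)
        for table in tables
    ]
    join_parts = [
        'inner join {t} on {f}.{k} = {t}.{k}'.format(t=table, f=first, k=key)
        for table in tables[1:]
    ]
    lines = ['select ' + ', '.join(select_parts), 'from ' + first] + join_parts
    return '\n'.join(lines)


query = build_sum_query(['census2010', 'census2000', 'census1990'])
print(query)

# In the notebook session:
# sqlCtx.sql(query).show()
```

Because the tables are combined with inner joins, the sums only cover ages present in every table; that matches the full totals here only because all of the datasets share the same `age` values.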



In [21]:
sc.stop()