In the previous file, we learned how to read JSON into a Spark DataFrame, as well as some basic techniques for interacting with DataFrames. In this file, we'll learn how to use Spark's SQL interface to query and interact with the data. We'll continue to work with the 2010 U.S. Census data set. Later on, we'll add other files to demonstrate how to take advantage of SQL to work with multiple data sets.

Before we can write and run SQL queries, we need to tell Spark to treat the DataFrame as a SQL table. Spark internally maintains a virtual database within the SQLContext object. This object, which we enter as **sqlCtx**, has methods for registering temporary tables.

To register a DataFrame as a table, call the [registerTempTable() method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable) on that DataFrame object. This method requires one string parameter, name, that we use to set the table name for reference in our SQL queries.

In [1]:
import pyspark
from pyspark.sql import SQLContext

In [2]:
sc = pyspark.SparkContext()

In [3]:
sqlCtx = SQLContext(sc)

In [4]:
df = sqlCtx.read.json("census_2010.json")

In [6]:
# Use the registerTempTable() method to register the DataFrame df 
# as a table named census2010.

df.registerTempTable("census2010")

In [7]:
#  run the SQLContext method tableNames to return the list of tables.

tables = sqlCtx.tableNames()

In [8]:
print(tables)

['censue_2010', 'census2010']


Now that we've registered the table within **sqlCtx**, we can start writing and running SQL queries. With Spark SQL, we represent our query as a string and pass it into the **sql()** method within the SQLContext object. The [sql() method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.sql) requires a single parameter, the query string. Spark will return the query results as a DataFrame object. This means we'll have to use **show()** to display the results, due to lazy loading.

While SQLite requires that queries end with a semi-colon, Spark SQL will actually throw an error if we include it. Other than this difference in syntax, Spark's flavor of SQL is identical to SQLite, and all the queries we've written for the SQL will work here as well.

In [11]:
# SQL query that returns the age column from the table census2010

sqlCtx.sql("select age from census2010").show(5)

+---+
|age|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



In the previous file, we used DataFrame methods to find all of the rows where **age** was greater than 5. If we only wanted to retrieve data from the **males** and **females** columns where that criteria were true, we'd need to chain additional operations to the Spark DataFrame. To return the results in descending order instead of ascending order, we'd have to chain another method. The DataFrame methods are quick and powerful for simple queries, but chaining them can be cumbersome for more advanced queries.

SQL shines at expressing complex logic in a more compact manner. Let's brush up on SQL by writing a query that expresses more specific criteria.

In [13]:
# Write and run a SQL query that returns:
# The males and females columns (in that order) where age > 5 and age < 15

query = """select males,females from census2010 
           where age >5 and age < 15"""
sqlCtx.sql(query).show(10)

+-------+-------+
|  males|females|
+-------+-------+
|2093905|2007781|
|2097080|2010281|
|2101670|2013771|
|2108014|2018603|
|2114217|2023289|
|2118390|2026352|
|2132030|2037286|
|2159943|2060100|
|2195773|2089651|
+-------+-------+



Because the results of SQL queries are DataFrame objects, we can combine the best aspects of both DataFrames and SQL to enhance our workflow. For example, we can write a SQL query that quickly returns a subset of our data as a DataFrame.

Use [describe() method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe) to calculate summary statistics for the DataFrame

In [14]:
query = 'select males,females from census2010'
sqlCtx.sql(query).describe().show()

+-------+------------------+-----------------+
|summary|             males|          females|
+-------+------------------+-----------------+
|  count|               101|              101|
|   mean|1520095.3168316833|1571460.287128713|
| stddev|  818587.208016823|748671.0493484351|
|    min|              4612|            25673|
|    max|           2285990|          2331572|
+-------+------------------+-----------------+



One of the most powerful use cases in SQL is joining tables. Spark SQL takes this a step further by enabling us to run join queries across data from multiple file types. Spark will read any of the file types and formats it supports into DataFrame objects and we can register each of these as tables within the SQLContext object to use for querying.

Most data science organizations use a variety of file formats and data storage mechanisms. Spark SQL was built with the industry use cases in mind and enables data professionals to use one common query language, SQL, to interact with lots of different data sources. We'll explore joins in Spark SQL further, but first let's introduce the other datasets we'll be using:

* census_1980.json - 1980 U.S. Census data
* census_1990.json - 1990 U.S. Census data
* census_2000.json - 2000 U.S. Census data

Then use the method tableNames() to list the tables within the SQLContext object, assign to tables, and finally print tables.

In [15]:
df_1980 = sqlCtx.read.json("census_1980.json")
df_1990 = sqlCtx.read.json("census_1990.json")
df_2000 = sqlCtx.read.json("census_2000.json")
df_1980.registerTempTable("census1980")
df_1990.registerTempTable("census1990")
df_2000.registerTempTable("census2000")
tables = sqlCtx.tableNames()
print(tables)

['censue_2010', 'census1980', 'census1990', 'census2000', 'census2010']


Now that we have a table for each dataset, we can write join queries to compare values across them. Since we're working with Census data, let's use the age column as the joining column.

Write a query that returns a DataFrame with the total columns for the tables census2010 and census2000 (in that order).

In [18]:
query = """select c10.total, c00.total from census2010 c10
        inner join census2000 c00
        on c10.age=c00.age"""
sqlCtx.sql(query).show(7)

+-------+-------+
|  total|  total|
+-------+-------+
|4079669|3733034|
|4085341|3825896|
|4089295|3904845|
|4092221|3970865|
|4094802|4024943|
|4097728|4068061|
|4101686|4101204|
+-------+-------+
only showing top 7 rows



The functions and operators from SQLite that we've used in the past are available for us to use in Spark SQL:

* COUNT()
* AVG()
* SUM()
* AND
* OR

Write a query that calculates the sums of the total column from each of the tables, in the following order:

* census2010,
* census2000,
* census1990.

We'll need to perform two inner joins for this query (all datasets have the same values for age, which makes things convenient for joining).

In [20]:
query = """
 select sum(c10.total), sum(c00.total), sum(c90.total)
 from census2010 c10
 inner join census2000 c00
 on c10.age=c00.age
 inner join census1990 c90
 on c10.age=c90.age
"""
sqlCtx.sql(query).show()

+----------+----------+----------+
|sum(total)|sum(total)|sum(total)|
+----------+----------+----------+
| 312247116| 284594395| 254506647|
+----------+----------+----------+

