# Exploring Spark SQL and CSV data

### Getting started

#### Spark
* Requires [Apache Spark](https://spark.apache.org/) version 1.3 or above.
* We will mainly be looking at aggregation with Spark SQL and the new [Spark SQL DataFrame API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).
* This offers a number of features similar to those provided by data frames in R or pandas.

#### Sample data
* This exercise uses a sample CSV file of the latest monthly "price paid data" for house sales from the UK Land Registry.
* This sample file is in the `data` directory (`data/ppd-monthly-sample.csv`).
* You can [download the latest version of the monthly PPD CSV file](http://publicdata.landregistry.gov.uk/market-trend-data/price-paid-data/a/pp-monthly-update-new-version.csv) (usually around 14MB) directly from the Land Registry website. 
* Other data formats are [also available for download](http://data.gov.uk/dataset/land-registry-monthly-price-paid-data).
* The CSV files do not contain a header row with the column names, but [the columns are defined on the Land Registry website](https://www.gov.uk/about-the-price-paid-data).

#### Data licence
* The UK Land Registry data is [published under specific licence conditions](https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads#when-using-or-publishing-our-price-paid-data)
* Attribution for UK Land Registry data:  Data produced by Land Registry © Crown copyright 2015.

### Read CSV data into Spark and create data frame

In [20]:
from pyspark.sql import SQLContext
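from pyspark import SparkConf, SparkContext

# The PySpark shell and notebook already provide a SparkContext as `sc` (seen in Spark 1.3.0).
# Uncomment the two lines below only if `sc` is not already defined.
#conf = SparkConf().setMaster("local[*]")
#sc = SparkContext(conf=conf)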

sqlContext = SQLContext(sc)

#### Load CSV file as an RDD and convert to a DataFrame

* Spark SQL does not currently provide a built-in function to load CSV, so we do this manually using sc.textFile().
* This creates an RDD, as usual.
* We then map the data rows to tuples ready for converting to a DataFrame.
* We also provide a set of field (column) definitions that will define the schema.
* These data rows and column definitions are then combined to create a DataFrame.

In [21]:
# Need to import StructField etc
from pyspark.sql.types import *

# Load the CSV file; each line becomes one element of the RDD.
lines = sc.textFile("data/ppd-monthly-sample.csv")
# Split each line on commas and strip the surrounding double quotes from every field.
# Note: this naive split assumes that no field contains an embedded comma.
data = lines.map(lambda l: [f.strip('"') for f in l.split(",")])
# Keep the first 15 fields as a tuple, converting the sale price (field 1) to a float.
data_tuples = data.map(lambda p: tuple([p[0], float(p[1])] + p[2:15]))

# Construct the field definitions for the schema
# - most fields will be StringType here
# - the sale price will be a DoubleType
fields = []
fields.append(StructField("transaction_uid", StringType(), True))
fields.append(StructField("sale_price", DoubleType(), True))
fields.append(StructField("date_of_transfer", StringType(), True))
fields.append(StructField("postcode", StringType(), True))
fields.append(StructField("property_type", StringType(), True))
fields.append(StructField("old_new", StringType(), True))
fields.append(StructField("duration", StringType(), True))
fields.append(StructField("paon", StringType(), True))
fields.append(StructField("saon", StringType(), True))
fields.append(StructField("street", StringType(), True))
fields.append(StructField("locality", StringType(), True))
fields.append(StructField("city", StringType(), True))
fields.append(StructField("district", StringType(), True))
fields.append(StructField("county", StringType(), True))
fields.append(StructField("record_status", StringType(), True))

schema = StructType(fields)

# Apply the schema to the RDD of tuples:
salesDf = sqlContext.createDataFrame(data_tuples, schema)
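
One caveat with the loading step above: `split(",")` also splits on commas that appear inside quoted fields (in an address, for example), which would shift the remaining columns. A minimal sketch of a more defensive parser, using Python's standard `csv` module, is shown below. The helper name `parse_line` and the sample record are hypothetical and not part of the original notebook; the sketch assumes Python 3, one record per line, and at least 15 fields per record, as in the schema above.

```python
import csv

def parse_line(line):
    """Parse one CSV line into a 15-field tuple, converting the price to a float."""
    # csv.reader handles the quoting, so commas inside quoted fields are preserved
    fields = next(csv.reader([line]))
    # Field 1 is the sale price; every other field stays a string
    return tuple([fields[0], float(fields[1])] + fields[2:15])

# A made-up record in the same 15-column layout (not real Land Registry data)
sample = ('"{ABC-123}","250000","2015-01-30 00:00","AB1 2CD","F","N","L",'
          '"FLAT 1, EXAMPLE HOUSE","","HIGH STREET","","LONDON","CAMDEN",'
          '"GREATER LONDON","A"')
record = parse_line(sample)
print(record[1], record[7])
```

In the notebook, this could presumably replace the two `map()` calls with `data_tuples = lines.map(parse_line)`.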

* Display the schema of the DataFrame:

In [22]:
salesDf.printSchema()

root
 |-- transaction_uid: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- date_of_transfer: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- old_new: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- paon: string (nullable = true)
 |-- saon: string (nullable = true)
 |-- street: string (nullable = true)
 |-- locality: string (nullable = true)
 |-- city: string (nullable = true)
 |-- district: string (nullable = true)
 |-- county: string (nullable = true)
 |-- record_status: string (nullable = true)



### Explore data frame with SQL

#### Use SQL to derive summary data
* We will fetch total number of sales, value of sales and average sale price by postcode.
* We will then use different approaches to derive the top N postcodes by number of sales, total value of sales and average sale price.

#### Remember that each of these operations will cause the RDD data to be materialised
* We will cache the initial data table and the aggregate data (sales_by_postcode).
* This should improve performance when we re-query the postcode data below.

In [23]:
# Register the DataFrame as a table.
salesDf.registerTempTable("sales")

# Cache it so we can run multiple queries
sqlContext.cacheTable("sales")


#### Use SQL to derive aggregate data from the DataFrame table
* SQL can be run over DataFrames that have been registered as a table.
* Bear in mind that the SQL is not parsed or validated until runtime.
* For example, fetch total sales value, count and average sale price by postcode:


In [24]:
# Fetch total sales value, count and average sale price by postcode.
sales_by_postcode = sqlContext.sql("""
    SELECT postcode,
           COUNT(*)        AS num_sales,
           SUM(sale_price) AS value_sales,
           AVG(sale_price) AS avg_price
    FROM   sales
    WHERE  postcode IS NOT NULL
    AND    postcode != ''
    GROUP BY postcode
""")
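
To make explicit what the query computes, here is a plain-Python sketch of the same aggregation over a few made-up (postcode, price) pairs. It only illustrates the GROUP BY semantics and may be handy for sanity-checking results on a small sample; the function name `summarise_by_postcode` and the sample values are hypothetical, and nothing here runs on Spark.

```python
from collections import defaultdict

def summarise_by_postcode(rows):
    """Return {postcode: (num_sales, value_sales, avg_price)} for (postcode, price) pairs."""
    prices = defaultdict(list)
    for postcode, price in rows:
        # Mirror the WHERE clause: skip NULL (None) and empty postcodes
        if postcode is None or postcode == "":
            continue
        prices[postcode].append(price)
    return {pc: (len(p), sum(p), sum(p) / len(p)) for pc, p in prices.items()}

# Made-up sample rows, not taken from the Land Registry file
rows = [("AB1 2CD", 200000.0), ("AB1 2CD", 300000.0), ("", 150000.0),
        (None, 99000.0), ("EF3 4GH", 500000.0)]
summary = summarise_by_postcode(rows)
print(summary["AB1 2CD"])
```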

#### Cache the aggregate data frame

In [25]:
sales_by_postcode.cache()

DataFrame[postcode: string, num_sales: bigint, value_sales: double, avg_price: double]

#### Set the value of N for your Top N queries

* We will be looking for the top N postcodes using various query methods below.

In [26]:
n = 5

### Top N postcodes by number of sales

#### Using an RDD (the old-fashioned way)

* Numbers are rounded here for convenience.

In [27]:
# Remember that in Python we need to explicitly request the RDD via .rdd
topSalesByCount = sales_by_postcode.rdd.sortBy(lambda data: data[1], ascending=False).take(n)

for spc in topSalesByCount:
  print ("{0}: count = {1}, total value = {2:7.0f}, average sale price: {3:7.0f}".format(spc.postcode, spc.num_sales, spc.value_sales, spc.avg_price))

E2 0SZ: count = 70, total value = 22301624, average sale price:  318595
N4 2GS: count = 67, total value = 31506180, average sale price:  470241
E3 3SU: count = 52, total value = 18758584, average sale price:  360742
SE3 9FJ: count = 45, total value = 14099895, average sale price:  313331
E2 0FG: count = 45, total value = 22665000, average sale price:  503667


#### Using the new DataFrame API

* In this case we use the show() function rather than print() to display the chosen records directly.
* Data values are left in the internal numeric format for now.

In [28]:
sales_by_postcode.orderBy(sales_by_postcode.num_sales.desc()).limit(n).show()

+--------+---------+-----------+------------------+
|postcode|num_sales|value_sales|         avg_price|
+--------+---------+-----------+------------------+
|  E2 0SZ|       70|2.2301624E7| 318594.6285714286|
|  N4 2GS|       67| 3.150618E7|470241.49253731343|
|  E3 3SU|       52|1.8758584E7|          360742.0|
|  E2 0FG|       45|   2.2665E7| 503666.6666666667|
| SE3 9FJ|       45|1.4099895E7|          313331.0|
+--------+---------+-----------+------------------+
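
Note that the two postcodes with 45 sales each (E2 0FG and SE3 9FJ) appear in a different order in the RDD and DataFrame results. Sorting on the count alone leaves the order of ties unspecified. A minimal plain-Python sketch of a deterministic ordering, using the postcode as a secondary key, is shown below; the `rows` list re-uses the counts printed above, and `top_n` is a hypothetical helper rather than anything from the notebook.

```python
def top_n(rows, n):
    """Return the n rows with the highest count, breaking ties by postcode (ascending)."""
    return sorted(rows, key=lambda r: (-r[1], r[0]))[:n]

# (postcode, num_sales) pairs copied from the output above, in arbitrary order
rows = [("SE3 9FJ", 45), ("E2 0SZ", 70), ("E2 0FG", 45), ("N4 2GS", 67), ("E3 3SU", 52)]
for postcode, num_sales in top_n(rows, 5):
    print("{0}: {1}".format(postcode, num_sales))
```

With the DataFrame API, the equivalent would presumably be to pass a second sort column to orderBy(), for example `orderBy(sales_by_postcode.num_sales.desc(), sales_by_postcode.postcode)`.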



### Top N postcodes by value of sales

#### Using an RDD (the old-fashioned way)

* Numbers are rounded here for convenience.

In [29]:
# Print the top N postcodes by value of sales
# Remember that in Python we need to explicitly request the RDD via .rdd
topSalesByValue = sales_by_postcode.rdd.sortBy(lambda data: data[2], ascending=False).take(n)

for spc in topSalesByValue:
  print ("{0}: count = {1}, total value = {2:7.0f}, average sale price: {3:7.0f}".format(spc.postcode, spc.num_sales, spc.value_sales, spc.avg_price))

SW1W 9AH: count = 8, total value = 36988375, average sale price: 4623547
N2 0BE: count = 1, total value = 33700000, average sale price: 33700000
WC2H 0DT: count = 8, total value = 31604200, average sale price: 3950525
N4 2GS: count = 67, total value = 31506180, average sale price:  470241
W14 8QA: count = 21, total value = 26459075, average sale price: 1259956


#### Using the new DataFrame API

* Data values are left in the internal numeric format for now.

In [30]:
sales_by_postcode.orderBy(sales_by_postcode.value_sales.desc()).limit(n).show()

+--------+---------+-----------+------------------+
|postcode|num_sales|value_sales|         avg_price|
+--------+---------+-----------+------------------+
|SW1W 9AH|        8|3.6988375E7|       4623546.875|
|  N2 0BE|        1|     3.37E7|            3.37E7|
|WC2H 0DT|        8|  3.16042E7|         3950525.0|
|  N4 2GS|       67| 3.150618E7|470241.49253731343|
| W14 8QA|       21|2.6459075E7|1259955.9523809524|
+--------+---------+-----------+------------------+
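
The scientific notation in the show() output can be hard to read. One option is to collect the limited result and format it locally. The sketch below shows only the formatting step in plain Python, with values copied from the table above; `format_row` is a hypothetical helper, and in the notebook the rows would presumably come from calling `collect()` on the limited DataFrame.

```python
def format_row(postcode, num_sales, value_sales, avg_price):
    """Format one aggregate row with thousands separators and no decimals."""
    return "{0:<9}{1:>5}{2:>14,.0f}{3:>14,.0f}".format(
        postcode, num_sales, value_sales, avg_price)

# Values copied from the first two rows of the table above
rows = [("SW1W 9AH", 8, 3.6988375E7, 4623546.875),
        ("N2 0BE", 1, 3.37E7, 3.37E7)]
for row in rows:
    print(format_row(*row))
```

Because the rows returned by `collect()` behave like tuples, unpacking them with `format_row(*row)` should work in the same way.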



### Top N postcodes by average sale price

#### Using an RDD (the old-fashioned way)

* Numbers are rounded here for convenience.

In [31]:
# Remember that in Python we need to explicitly request the RDD via .rdd
topSalesByAvgPrice = sales_by_postcode.rdd.sortBy(lambda data: data[3], ascending=False).take(n)

for spc in topSalesByAvgPrice:
  print ("{0}: count = {1}, total value = {2:7.0f}, average sale price: {3:7.0f}".format(spc.postcode, spc.num_sales, spc.value_sales, spc.avg_price))

N2 0BE: count = 1, total value = 33700000, average sale price: 33700000
SW10 9SJ: count = 1, total value = 16400000, average sale price: 16400000
SW3 2EB: count = 1, total value = 14650000, average sale price: 14650000
HP17 8EJ: count = 1, total value = 12850000, average sale price: 12850000
W1J 5BJ: count = 1, total value = 12600000, average sale price: 12600000


#### Using the new DataFrame API

* Data values are left in the internal numeric format for now.

In [32]:
sales_by_postcode.orderBy(sales_by_postcode.avg_price.desc()).limit(n).show()

+--------+---------+-----------+---------+
|postcode|num_sales|value_sales|avg_price|
+--------+---------+-----------+---------+
|  N2 0BE|        1|     3.37E7|   3.37E7|
|SW10 9SJ|        1|     1.64E7|   1.64E7|
| SW3 2EB|        1|    1.465E7|  1.465E7|
|HP17 8EJ|        1|    1.285E7|  1.285E7|
| W1J 5BJ|        1|     1.26E7|   1.26E7|
+--------+---------+-----------+---------+



### Do it all with the DataFrame API

* The [Spark SQL DataFrame API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame) also provides functionality allowing you to query and aggregate directly in the data frame without using any SQL.
* Some of the groupBy and aggregation functions feel a bit clunky compared to SQL.
* However, they appeared to run more quickly than the RDD versions, although this notebook does not measure the difference.

#### Group data by postcode
* First we group the data from the original DataFrame (salesDf) by postcode, excluding any empty or NULL postcodes.
* This is roughly equivalent to the SQL GROUP BY query we used above.

In [33]:
# Group the data by postcode
group_by_postcode = salesDf.filter("postcode IS NOT NULL").filter("postcode != ''").groupBy(salesDf.postcode) 

#### Count sales by postcode
* Now we apply count() to the grouped data to get the number of sales by postcode.
* Then, we sort the counts in descending order with the desc() function (imported from pyspark.sql.functions).
* The groupBy() function allows us to get at the count directly.
* Other aggregation functions require a bit more work (see below).

In [34]:
# Need to use some extra functions for sorting etc
from pyspark.sql import functions as F

num_sales_by_postcode = group_by_postcode.count().orderBy(F.desc("count"))
num_sales_by_postcode.limit(n).show()

+--------+-----+
|postcode|count|
+--------+-----+
|  E2 0SZ|   70|
|  N4 2GS|   67|
|  E3 3SU|   52|
|  E2 0FG|   45|
| SE3 9FJ|   45|
+--------+-----+



#### Top N postcodes by value of sales
* The Spark SQL agg() functions seem to generate artificial column names at runtime e.g. "SUM(sale_price#1033)".
* However, it does not seem possible to apply an alias() function to this column to get a sensible name.
* But when we apply the orderBy() function, we need to know the name of the column to sort by.
* So in this example we get the name of the aggregate column from the data frame at runtime.
* Not sure if there is a better way to do this, but it works for now.

In [35]:
# Get the sum of sales prices from the grouped data
sales_value_by_postcode = group_by_postcode.sum("sale_price") 
# Get the generated name of the SUM column so we can sort by it
agg_col = (sales_value_by_postcode.columns)[1]
# Apply the sort and take the N highest records
sales_value_by_postcode.orderBy(F.desc(agg_col)).limit(n).show()

+--------+---------------+
|postcode|sum(sale_price)|
+--------+---------------+
|SW1W 9AH|    3.6988375E7|
|  N2 0BE|         3.37E7|
|WC2H 0DT|      3.16042E7|
|  N4 2GS|     3.150618E7|
| W14 8QA|    2.6459075E7|
+--------+---------------+



#### Top N postcodes by average sale price


In [36]:
# Get the average of sales prices from the grouped data
avg_price_by_postcode = group_by_postcode.avg("sale_price") 
# Get the generated name of the average column so we can sort by it
agg_col = (avg_price_by_postcode.columns)[1]
# Apply the sort and take the N highest records
avg_price_by_postcode.orderBy(F.desc(agg_col)).limit(n).show()

+--------+---------------+
|postcode|avg(sale_price)|
+--------+---------------+
|  N2 0BE|         3.37E7|
|SW10 9SJ|         1.64E7|
| SW3 2EB|        1.465E7|
|HP17 8EJ|        1.285E7|
| W1J 5BJ|         1.26E7|
+--------+---------------+



### Cleanup
* Clear out the cached data.

In [37]:
sqlContext.clearCache()

### Conclusions

* The Spark SQL API already offers some useful tools for applying SQL queries to RDD data.
* We have only taken a brief look at the new DataFrame API here, but so far it looks even more useful.
* The data frame aggregation functions are slightly awkward, but seem to perform well.
* There seems to be plenty more functionality to explore in this rapidly growing corner of the Apache Spark project.