# AIM-3 Spark SQL Demo

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

## Start a SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("AIM-3 Spark SQL Demo") \
    .getOrCreate()

## Import data from CSVs (could also use Hive, Parquet,...) and cache it

In [3]:
donations = spark.read.csv('data/Donations.csv', mode = "DROPMALFORMED", inferSchema = True, header = True).cache()
donations.printSchema()

root
 |-- Project ID: string (nullable = true)
 |-- Donation ID: string (nullable = true)
 |-- Donor ID: string (nullable = true)
 |-- Donation Included Optional Donation: string (nullable = true)
 |-- Donation Amount: double (nullable = true)
 |-- Donor Cart Sequence: integer (nullable = true)
 |-- Donation Received Date: timestamp (nullable = true)



In [4]:
donors = spark.read.csv('data/Donors.csv', mode = "DROPMALFORMED", inferSchema = True, header = True).cache()
donors.printSchema()

root
 |-- Donor ID: string (nullable = true)
 |-- Donor City: string (nullable = true)
 |-- Donor State: string (nullable = true)
 |-- Donor Is Teacher: string (nullable = true)
 |-- Donor Zip: string (nullable = true)



In [5]:
donations.show(5)

+--------------------+--------------------+--------------------+-----------------------------------+---------------+-------------------+----------------------+
|          Project ID|         Donation ID|            Donor ID|Donation Included Optional Donation|Donation Amount|Donor Cart Sequence|Donation Received Date|
+--------------------+--------------------+--------------------+-----------------------------------+---------------+-------------------+----------------------+
|000009891526c0ade...|68872912085866622...|1f4b5b6e68445c6c4...|                                 No|         178.37|                 11|   2016-08-23 13:15:57|
|000009891526c0ade...|dcf1071da3aa3561f...|4aaab6d244bf35996...|                                Yes|           25.0|                  2|   2016-06-06 20:05:23|
|000009891526c0ade...|18a234b9d1e538c43...|0b0765dc9c759adc4...|                                Yes|           20.0|                  3|   2016-06-06 14:08:46|
|000009891526c0ade...|38d2744bf9138b0b5.

## Register DataFrames as tables to use them in SQL queries

In [6]:
donations.createOrReplaceTempView("donations")
donors.createOrReplaceTempView("donors")

## Analyze data using SQL queries

In [7]:
results = spark.sql("""
SELECT sum(`Donation Amount`) AS `Total Donations`
FROM donations 
WHERE `Donation Received Date` BETWEEN '2016-09-01' AND '2016-09-30'
""")
# Alternatively, use DataFrames API to get results
# results = donations \
#.where("`Donation Received Date` BETWEEN '2016-09-01' AND '2016-09-30'") \
#.agg(sum("Donation Amount"))
results.show()

+-----------------+
|  Total Donations|
+-----------------+
|7343436.779999997|
+-----------------+



In [8]:
donations_by_state = spark.sql("""
SELECT `Donor State`,
sum(`Donation Amount`) AS `Total Donations`
FROM donations 
JOIN donors ON donations.`Donor ID` == donors.`Donor ID`
WHERE `Donation Received Date` BETWEEN '2016-09-01' AND '2016-12-31'
GROUP BY `Donor State`
ORDER BY 2 DESC
""")
donations_by_state.show(10)

+--------------+------------------+
|   Donor State|   Total Donations|
+--------------+------------------+
|    California|        4351350.57|
|      New York|2300683.9300000006|
|         Texas|1593148.5500000007|
|      Illinois|1434985.6100000003|
|       Florida|1183645.6500000004|
|North Carolina|1120992.6000000003|
| Massachusetts|1007399.6199999998|
|    Washington|         808259.33|
|  Pennsylvania| 799274.9200000005|
|       Georgia| 694610.0200000006|
+--------------+------------------+
only showing top 10 rows



## Use a  lambda function as a User Defined Function in SQL

In [9]:
spark.udf.register("DtoEU", lambda dollars: (dollars * 0.86))
donations_by_state.createOrReplaceTempView("donations_by_state")

spark.sql("""
SELECT `Donor State`,
DtoEU(`Total Donations`) AS `Total Donations in Euro`
FROM donations_by_state""").show(10)

+--------------+-----------------------+
|   Donor State|Total Donations in Euro|
+--------------+-----------------------+
|    California|     3742161.4902000003|
|      New York|     1978588.1798000005|
|         Texas|     1370107.7530000007|
|      Illinois|     1234087.6246000002|
|       Florida|     1017935.2590000003|
|North Carolina|      964053.6360000003|
| Massachusetts|      866363.6731999997|
|    Washington|            695103.0238|
|  Pennsylvania|      687376.4312000005|
|       Georgia|      597364.6172000006|
+--------------+-----------------------+
only showing top 10 rows



## CSVs can also be saved as partitioned Hive tables

In [10]:
donations.select(col("Project ID").alias("project_id"), 
                 col("Donation ID").alias("donation_id"),
                 col("Donor ID").alias("donor_id"),
                 col("Donation Included Optional Donation").alias("donation_included_optional_donation"),
                 col("Donation Amount").alias("donation_amount"),
                 col("Donor Cart Sequence").alias("donor_cart_sequence"),
                 col("Donation Received Date").alias("donation_received_date")) \
.write.bucketBy(10, "donation_received_date").saveAsTable("donations_hive")

In [11]:
donors.select(col("Donor ID").alias("donor_id"),
              col("Donor City").alias("donor_city"),
              col("Donor State").alias("donor_state"),
              col("Donor Is Teacher").alias("donor_is_teacher"),
              col("Donor Zip").alias("donor_zip")) \
.write.partitionBy("donor_state").saveAsTable("donors_hive")

In [12]:
donations_by_state = spark.sql("""
SELECT `donor_state`,
sum(`donation_amount`) AS `Total Donations`
FROM donations_hive 
JOIN donors_hive ON donations_hive.donor_id == donors_hive.donor_id
WHERE donation_received_date BETWEEN '2016-09-01' AND '2016-12-31'
GROUP BY donor_state
ORDER BY 2 DESC
""")
donations_by_state.show(10)

+--------------+------------------+
|   donor_state|   Total Donations|
+--------------+------------------+
|    California| 4351350.569999999|
|      New York|2300683.9300000006|
|         Texas|1593148.5500000007|
|      Illinois|1434985.6100000003|
|       Florida|1183645.6500000006|
|North Carolina|1120992.6000000003|
| Massachusetts|        1007399.62|
|    Washington| 808259.3299999998|
|  Pennsylvania| 799274.9200000005|
|       Georgia| 694610.0200000006|
+--------------+------------------+
only showing top 10 rows



## Hive tables can be analyzed to collect statistics

In [13]:
spark.sql("""ANALYZE TABLE donations_hive COMPUTE STATISTICS""");
spark.sql("""ANALYZE TABLE donors_hive COMPUTE STATISTICS""");

## Use EXPLAIN to see query optimization at work

In [14]:
spark.sql("""
SELECT sum(`donation_amount`) AS `Total Donations`
FROM donations_hive 
WHERE `donation_received_date` BETWEEN '2016-09-01' AND '2016-09-30'
""").explain(True)

== Parsed Logical Plan ==
'Project ['sum('donation_amount) AS Total Donations#523]
+- 'Filter (('donation_received_date >= 2016-09-01) && ('donation_received_date <= 2016-09-30))
   +- 'UnresolvedRelation `donations_hive`

== Analyzed Logical Plan ==
Total Donations: double
Aggregate [sum(donation_amount#528) AS Total Donations#523]
+- Filter ((cast(donation_received_date#530 as string) >= 2016-09-01) && (cast(donation_received_date#530 as string) <= 2016-09-30))
   +- SubqueryAlias donations_hive
      +- Relation[project_id#524,donation_id#525,donor_id#526,donation_included_optional_donation#527,donation_amount#528,donor_cart_sequence#529,donation_received_date#530] parquet

== Optimized Logical Plan ==
Aggregate [sum(donation_amount#528) AS Total Donations#523]
+- Project [donation_amount#528]
   +- Filter ((isnotnull(donation_received_date#530) && (cast(donation_received_date#530 as string) >= 2016-09-01)) && (cast(donation_received_date#530 as string) <= 2016-09-30))
      +- Rela

In [15]:
spark.sql("""
SELECT `Donor State`,
sum(`Donation Amount`) AS `Total Donations`
FROM donations 
JOIN donors ON donations.`Donor ID` == donors.`Donor ID`
WHERE `Donation Received Date` BETWEEN '2016-09-01' AND '2016-12-31'
GROUP BY `Donor State`
""").explain(True)

== Parsed Logical Plan ==
'Aggregate ['Donor State], ['Donor State, 'sum('Donation Amount) AS Total Donations#535]
+- 'Filter (('Donation Received Date >= 2016-09-01) && ('Donation Received Date <= 2016-12-31))
   +- 'Join Inner, ('donations.Donor ID = 'donors.Donor ID)
      :- 'UnresolvedRelation `donations`
      +- 'UnresolvedRelation `donors`

== Analyzed Logical Plan ==
Donor State: string, Total Donations: double
Aggregate [Donor State#78], [Donor State#78, sum(Donation Amount#14) AS Total Donations#535]
+- Filter ((cast(Donation Received Date#16 as string) >= 2016-09-01) && (cast(Donation Received Date#16 as string) <= 2016-12-31))
   +- Join Inner, (Donor ID#12 = Donor ID#76)
      :- SubqueryAlias donations
      :  +- Relation[Project ID#10,Donation ID#11,Donor ID#12,Donation Included Optional Donation#13,Donation Amount#14,Donor Cart Sequence#15,Donation Received Date#16] csv
      +- SubqueryAlias donors
         +- Relation[Donor ID#76,Donor City#77,Donor State#78,Donor I