# Spark SQL Tutorial
- Author: Akira Takihara Wang (https://github.com/akiratwang)
- Tutorial Up-to-Date as of: April 2021  
- Usage: For MAST30034 students only  


In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Start the Spark context. `swan_spark_conf` is not defined anywhere in this
# notebook (presumably some hosted environments inject it into the kernel -
# TODO confirm), so look it up defensively: when it is absent we pass
# conf=None, which is the default, instead of raising NameError.
sc = SparkContext.getOrCreate(conf=globals().get('swan_spark_conf'))

# Create a Spark session (which will run Spark jobs); it reuses the context above.
spark = SparkSession.builder.getOrCreate()

# Apply settings to the session:
# - eager evaluation renders a DataFrame when it is the last expression of a cell
# - Arrow is enabled for PySpark execution
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

Let's create a schema using DDL formatted strings.

In [2]:
# DDL-formatted schema string: comma-separated `column_name` TYPE pairs.
# The two datetime columns are declared as STRING here; they are converted to
# timestamps further down in the notebook.
schema = """
`VendorID` INT, `tpep_pickup_datetime` STRING, `tpep_dropoff_datetime` STRING,
`passenger_count` INT, `trip_distance` DOUBLE, `pickup_longitude` DOUBLE, `pickup_latitude` DOUBLE,
`RateCodeID` INT, `store_and_fwd_flag` STRING, `dropoff_longitude` DOUBLE, `dropoff_latitude` DOUBLE,
`payment_type` INT, `fare_amount` DOUBLE, `extra` DOUBLE, `mta_tax` DOUBLE, `tip_amount` DOUBLE,
`tolls_amount` DOUBLE, `improvement_surcharge` DOUBLE, `total_amount` DOUBLE
"""

In [3]:
# Attach the DDL schema to the reader first, then load the CSV (its first line
# is a header row). The resulting schema is shown as the cell output.
sdf = spark.read.schema(schema).csv('../data/sample.csv', header=True)
sdf.schema

StructType(List(StructField(VendorID,IntegerType,true),StructField(tpep_pickup_datetime,StringType,true),StructField(tpep_dropoff_datetime,StringType,true),StructField(passenger_count,IntegerType,true),StructField(trip_distance,DoubleType,true),StructField(pickup_longitude,DoubleType,true),StructField(pickup_latitude,DoubleType,true),StructField(RateCodeID,IntegerType,true),StructField(store_and_fwd_flag,StringType,true),StructField(dropoff_longitude,DoubleType,true),StructField(dropoff_latitude,DoubleType,true),StructField(payment_type,IntegerType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),StructField(total_amount,DoubleType,true)))

- As you can see, this achieves the same result as using `StructType()`, but may be easier or more difficult depending on the number of columns you have. 
- My personal preference would be using `StructType()` for this dataset as you can use generator functions to simplify the allocation of dtypes.

In [4]:
# Preview the first five rows; the datetime columns are still plain strings here.
sdf.limit(5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,1/12/15 0:00,1/12/15 0:05,5,0.96,-73.97994232,40.76538086,1,N,-73.96630859,40.76308823,1,5.5,0.5,0.5,1.0,0.0,0.3,7.8
2,1/12/15 0:00,1/12/15 0:00,2,2.69,-73.97233582,40.76237869,1,N,-73.99362946,40.74599838,1,21.5,0.0,0.5,3.34,0.0,0.3,25.64
2,1/12/15 0:00,1/12/15 0:00,1,2.62,-73.96884918,40.76453018,1,N,-73.97454834,40.79164124,1,17.0,0.0,0.5,3.56,0.0,0.3,21.36
1,1/12/15 0:00,1/12/15 0:05,1,1.2,-73.99393463,40.74168396,1,N,-73.99766541,40.74746704,1,6.5,0.5,0.5,0.2,0.0,0.3,8.0
1,1/12/15 0:00,1/12/15 0:09,2,3.0,-73.98892212,40.72698975,1,N,-73.97559357,40.6968689,2,11.0,0.5,0.5,0.0,0.0,0.3,12.3


Now let's convert our datetime columns to `TIMESTAMP` (they are currently read in as `STRING`s in a `d/m/yy H:MM` layout, e.g. `1/12/15 0:05`).

In [5]:
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import *

In [6]:
# create UDF
from datetime import datetime

@F.udf("timestamp")
def format_dtime(dtime):
    """Parse a ``d/m/y H:M`` string (e.g. ``'1/12/15 0:05'``) into a datetime.

    Args:
        dtime: Text holding a day/month/year date and an hour:minute time,
            separated by whitespace. May be ``None`` or blank.

    Returns:
        The corresponding ``datetime``, or ``None`` when the input is ``None``
        or blank, so null cells stay null instead of failing the whole job.

    Raises:
        ValueError: if a non-blank value does not match the expected layout
            (raised inside the Python worker).
    """
    # Null/blank cells cannot be split; propagate them as null timestamps.
    if dtime is None or not dtime.strip():
        return None

    date, time = dtime.split()
    d, m, y = map(int, date.split('/'))

    # The year is abbreviated to two digits. Add the century arithmetically:
    # formatting it as f"20{y}" turns '05' (parsed as 5) into 205, and would
    # also mangle a year that is already written with four digits.
    if y < 100:
        y += 2000

    h, mins = map(int, time.split(':'))
    return datetime(y, m, d, h, mins)

In [7]:
# Dry run of the conversion: `sdf` itself is not reassigned, we only display
# the first five converted rows to check the result.
(
    sdf
    .withColumn("tpep_pickup_datetime", format_dtime(col("tpep_pickup_datetime")))
    .withColumn("tpep_dropoff_datetime", format_dtime(col("tpep_dropoff_datetime")))
    .limit(5)
)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-12-01 00:00:00,2015-12-01 00:05:00,5,0.96,-73.97994232,40.76538086,1,N,-73.96630859,40.76308823,1,5.5,0.5,0.5,1.0,0.0,0.3,7.8
2,2015-12-01 00:00:00,2015-12-01 00:00:00,2,2.69,-73.97233582,40.76237869,1,N,-73.99362946,40.74599838,1,21.5,0.0,0.5,3.34,0.0,0.3,25.64
2,2015-12-01 00:00:00,2015-12-01 00:00:00,1,2.62,-73.96884918,40.76453018,1,N,-73.97454834,40.79164124,1,17.0,0.0,0.5,3.56,0.0,0.3,21.36
1,2015-12-01 00:00:00,2015-12-01 00:05:00,1,1.2,-73.99393463,40.74168396,1,N,-73.99766541,40.74746704,1,6.5,0.5,0.5,0.2,0.0,0.3,8.0
1,2015-12-01 00:00:00,2015-12-01 00:09:00,2,3.0,-73.98892212,40.72698975,1,N,-73.97559357,40.6968689,2,11.0,0.5,0.5,0.0,0.0,0.3,12.3


- Conversion looks good to me, so let's keep it.
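- Remember, Spark DataFrames are immutable: `withColumn` returns a new DataFrame, so we have to assign the result back to keep the change.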

In [8]:
# Persist the conversion by reassigning `sdf`. Only columns that are still
# strings are converted, so re-running this cell is harmless: `format_dtime`
# expects text and would fail if it were handed the timestamps produced by a
# previous run of this cell.
for dt_col in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    if dict(sdf.dtypes)[dt_col] == "string":
        sdf = sdf.withColumn(dt_col, format_dtime(dt_col))

In [9]:
# Preview the reassigned DataFrame to confirm the datetime columns were converted.
sdf.limit(5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-12-01 00:00:00,2015-12-01 00:05:00,5,0.96,-73.97994232,40.76538086,1,N,-73.96630859,40.76308823,1,5.5,0.5,0.5,1.0,0.0,0.3,7.8
2,2015-12-01 00:00:00,2015-12-01 00:00:00,2,2.69,-73.97233582,40.76237869,1,N,-73.99362946,40.74599838,1,21.5,0.0,0.5,3.34,0.0,0.3,25.64
2,2015-12-01 00:00:00,2015-12-01 00:00:00,1,2.62,-73.96884918,40.76453018,1,N,-73.97454834,40.79164124,1,17.0,0.0,0.5,3.56,0.0,0.3,21.36
1,2015-12-01 00:00:00,2015-12-01 00:05:00,1,1.2,-73.99393463,40.74168396,1,N,-73.99766541,40.74746704,1,6.5,0.5,0.5,0.2,0.0,0.3,8.0
1,2015-12-01 00:00:00,2015-12-01 00:09:00,2,3.0,-73.98892212,40.72698975,1,N,-73.97559357,40.6968689,2,11.0,0.5,0.5,0.0,0.0,0.3,12.3


## Creating a SQL Table with an existing Spark DataFrame
The easiest method is to use `sdf.createOrReplaceTempView(TABLE_NAME)`


In [10]:
# Register `sdf` as a temporary view named `taxi_data` (replacing any existing
# view of that name) so it can be referenced from SQL queries.
sdf.createOrReplaceTempView("taxi_data")

Select all columns from our table where the Vendor is `VeriFone Inc.` (`VendorID = 2`), there is at least 1 passenger, and the trip distance is at least 1 mile.

In [11]:
# Query the `taxi_data` view: keep rows with VendorID 2, passenger_count >= 1
# and trip_distance >= 1, and return at most five of them.
sql_query = """
SELECT * 
FROM taxi_data
WHERE VendorID = 2
    AND passenger_count >= 1
    AND trip_distance >= 1
LIMIT 5
"""

# Run the query; the resulting DataFrame is the cell output.
spark.sql(sql_query)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-12-01 00:00:00,2015-12-01 00:00:00,2,2.69,-73.97233582,40.76237869,1,N,-73.99362946,40.74599838,1,21.5,0.0,0.5,3.34,0.0,0.3,25.64
2,2015-12-01 00:00:00,2015-12-01 00:00:00,1,2.62,-73.96884918,40.76453018,1,N,-73.97454834,40.79164124,1,17.0,0.0,0.5,3.56,0.0,0.3,21.36
2,2015-12-01 00:00:00,2015-12-01 00:08:00,2,1.91,-73.99420929,40.74610138,1,N,-74.00424957,40.72180939,1,8.0,0.5,0.5,1.86,0.0,0.3,11.16
2,2015-12-01 00:00:00,2015-12-01 00:17:00,1,4.5,-74.00675964,40.7189064,1,N,-73.98969269,40.77285385,1,16.5,0.5,0.5,3.56,0.0,0.3,21.36
2,2015-12-01 00:00:00,2015-12-01 00:10:00,2,1.42,-73.99963379,40.73477173,1,N,-73.98906708,40.72312164,1,8.5,0.5,0.5,2.45,0.0,0.3,12.25


To list all metadata:
- Databases: `spark.catalog.listDatabases()`
- Tables: `spark.catalog.listTables()`
- Columns of a table: `spark.catalog.listColumns(TABLE_NAME)`

In [12]:
# List the databases known to this session's catalog.
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/mnt/c/users/akira/documents/github/MAST30034/advanced_tutorials/spark-warehouse')]

In [13]:
# List the tables and views in the catalog; the temporary `taxi_data` view shows up here.
spark.catalog.listTables()

[Table(name='taxi_data', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

...and the alternative using DataFrame API if you want to.

In [14]:
# DataFrame API version of the SQL query: same three conditions combined with
# `&`, selecting every column and showing at most five rows.
(
    sdf
    .select(*sdf.columns)
    .where(
        (col('VendorID') == 2)
        & (col('passenger_count') >= 1)
        & (col('trip_distance') >= 1)
    )
    .limit(5)
)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-12-01 00:00:00,2015-12-01 00:00:00,2,2.69,-73.97233582,40.76237869,1,N,-73.99362946,40.74599838,1,21.5,0.0,0.5,3.34,0.0,0.3,25.64
2,2015-12-01 00:00:00,2015-12-01 00:00:00,1,2.62,-73.96884918,40.76453018,1,N,-73.97454834,40.79164124,1,17.0,0.0,0.5,3.56,0.0,0.3,21.36
2,2015-12-01 00:00:00,2015-12-01 00:08:00,2,1.91,-73.99420929,40.74610138,1,N,-74.00424957,40.72180939,1,8.0,0.5,0.5,1.86,0.0,0.3,11.16
2,2015-12-01 00:00:00,2015-12-01 00:17:00,1,4.5,-74.00675964,40.7189064,1,N,-73.98969269,40.77285385,1,16.5,0.5,0.5,3.56,0.0,0.3,21.36
2,2015-12-01 00:00:00,2015-12-01 00:10:00,2,1.42,-73.99963379,40.73477173,1,N,-73.98906708,40.72312164,1,8.5,0.5,0.5,2.45,0.0,0.3,12.25


## Creating SQL views directly from files
- If you don't have a Spark DataFrame yet, you can still create a SQL view directly from a file.

In [15]:
import os

# The parquet data lives in `data/` under the parent of the current working
# directory. Use os.path.dirname rather than splitting the path on '/' by hand,
# and normalise separators to '/' so the path embedded in the SQL string is
# also valid when the OS separator is not '/'. Stripping a trailing '/' keeps
# the result identical to the original when the parent is the filesystem root.
parent_dir = os.path.dirname(os.getcwd()).replace(os.sep, '/').rstrip('/')
parquet_path = f"{parent_dir}/data/aggregated_results.parquet/"

# Create (or replace) a temporary view backed directly by the parquet files.
sql_query = f"""
CREATE OR REPLACE TEMPORARY VIEW aggregation_parquet
USING parquet
OPTIONS (path 
    "{parquet_path}")
"""

spark.sql(sql_query)

In [16]:
# Read every row and column from the view backed by the parquet files.
spark.sql("SELECT * FROM aggregation_parquet")

passenger_count,avg_trip_amount
0,16.05009840158623
1,15.90999770852072
2,16.84797666441966
3,16.403699092777416
4,16.577589926570035
5,16.25151447266953
6,15.880795405728485
7,47.33983263598327
8,55.25762430939226
9,59.44349112426035
