<ul>
    <li> Run SQl queries on Dataframes</li>
    <li> Work with Spark databases & Tables</li>
    <li> Work with user defined functions (UDF)</li>
    <li> Perform operations on multiple datasets</li>
    <li> Perform window operations</li>
</ul>


In [2]:
# Running SQL Queries on DataFrames

from pyspark.sql.types import *
from pyspark.sql import SparkSession
import findspark

findspark.init()
findspark.find()

'/usr/local/spark'

In [3]:
spark = (
    SparkSession.builder.appName("SparkSQLApp")
    .master("local[4]")
    .config("apark.dynamicAllocation.enabled", "false")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

sc = spark.sparkContext
sc

In [4]:
# Create the schema for YellowTaxi Data

taxi_schema = StructType(
    [
        StructField("VendorId", IntegerType(), True),
        StructField("tpep_pickup_datetime", TimestampType(), True),
        StructField("tpep_dropoff_datetime", TimestampType(), True),
        StructField("passenger_count", DoubleType(), True),
        StructField("trip_distance", DoubleType(), True),
        StructField("RatecodeID", DoubleType(), True),
        StructField("store_and_fwd_flag", StringType(), True),
        StructField("PULocationID", IntegerType(), True),
        StructField("DOLocationID", IntegerType(), True),
        StructField("payment_type", IntegerType(), True),
        StructField("fare_amount", DoubleType(), True),
        StructField("extra", DoubleType(), True),
        StructField("mta_tax", DoubleType(), True),
        StructField("tip_amount", DoubleType(), True),
        StructField("tolls_amount", DoubleType(), True),
        StructField("improvement_surcharge", IntegerType(), True),
        StructField("total_amount", IntegerType(), True),
        StructField("congestion_surcharge", DoubleType(), True),
        StructField("airport_fee", DoubleType(), True),
    ]
)

In [5]:
yellowTaxiDF = (
    spark.read.option("header", "true")
    .schema(taxi_schema)
    .csv("data/csvs/YellowTaxis_202210.csv")
)

yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: integer (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [11]:
yellowTaxiDF.createOrReplaceTempView("YellowTaxis")
outputDF = spark.sql("SELECT * FROM YellowTaxis WHERE PULocationID = 171")
outputDF.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorId|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-10-01 08:17:23|  2022-10-01 09:08:50|            1.0|          9.4|      99.0|                 N|         171|         263|           1|       35.2|  0.0|    0.5|       0.

In [14]:
greenTaxiDF = (
    spark.read.option("header", "true")
    .schema(taxi_schema)
    .csv("data/csvs/GreenTaxis_202210.csv")
)

greenTaxiDF.createOrReplaceTempView("GreenTaxis")

In [15]:
greenTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: integer (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



### Merge query for yellow and green taxis data

In [16]:
outputDF = spark.sql(
    """
SELECT 'Yellow'                 AS TaxiType,
        tpep_pickup_datetime   AS PickupTime,
        tpep_dropoff_datetime  AS DropTime,
        PULocationID           AS PickupLocationId,
        DOLocationID           AS DropLocationId
FROM YellowTaxis

UNION ALL

SELECT 'Green'                 AS TaxiType,
        tpep_pickup_datetime   AS PickupTime,
        tpep_dropoff_datetime  AS DropTime,
        PULocationID           AS PickupLocationId,
        DOLocationID           AS DropLocationId
FROM GreenTaxis
"""
)

outputDF.show()

+--------+-------------------+-------------------+----------------+--------------+
|TaxiType|         PickupTime|           DropTime|PickupLocationId|DropLocationId|
+--------+-------------------+-------------------+----------------+--------------+
|  Yellow|2022-10-01 00:03:41|2022-10-01 00:18:39|             249|           107|
|  Yellow|2022-10-01 00:14:30|2022-10-01 00:19:48|             151|           238|
|  Yellow|2022-10-01 00:27:13|2022-10-01 00:37:41|             238|           166|
|  Yellow|2022-10-01 00:32:53|2022-10-01 00:38:55|             142|           239|
|  Yellow|2022-10-01 00:44:55|2022-10-01 00:50:21|             238|           166|
|  Yellow|2022-10-01 00:22:52|2022-10-01 00:52:14|             186|            41|
|  Yellow|2022-10-01 00:33:19|2022-10-01 00:44:51|             162|           145|
|  Yellow|2022-10-01 00:02:42|2022-10-01 00:50:01|             100|            22|
|  Yellow|2022-10-01 00:06:35|2022-10-01 00:24:38|             138|           112|
|  Y

In [7]:
taxiZonesSchema = "LocationID INT, Borough STRING, Zone STRING, ServiceZone STRING"

taxiZonesDF = spark.read.schema(taxiZonesSchema).csv("data/csvs/TaxiZones.csv")


# Create a global temp view:
taxiZonesDF.createOrReplaceGlobalTempView("TaxiZones")

taxiZonesDF.show()

+----------+-------------+--------------------+-----------+
|LocationID|      Borough|                Zone|ServiceZone|
+----------+-------------+--------------------+-----------+
|         1|          EWR|      Newark Airport|        EWR|
|         2|       Queens|         Jamaica Bay|  Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|  Boro Zone|
|         4|    Manhattan|       Alphabet City|Yellow Zone|
|         5|Staten Island|       Arden Heights|  Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|  Boro Zone|
|         7|       Queens|             Astoria|  Boro Zone|
|         8|       Queens|        Astoria Park|  Boro Zone|
|         9|       Queens|          Auburndale|  Boro Zone|
|        10|       Queens|        Baisley Park|  Boro Zone|
|        11|     Brooklyn|          Bath Beach|  Boro Zone|
|        12|    Manhattan|        Battery Park|Yellow Zone|
|        13|    Manhattan|   Battery Park City|Yellow Zone|
|        14|     Brooklyn|           Bay

#### Create a report

In [17]:
spark.sql(
    """
    SELECT Borough, TaxiType, Count(*) AS totaltrips
    FROM global_temp.TaxiZones
    LEFT JOIN
    (
    SELECT 'Yellow' AS TaxiType, PULocationID FROM YellowTaxis
    UNION ALL
    SELECT 'Green' AS TaxiType, PULocationID FROM GreenTaxis
    ) AllTaxis

    ON AllTaxis.PULocationID = TaxiZones.LocationID

    GROUP BY Borough, TaxiType
    ORDER BY Borough, TaxiType
"""
).show()

+-------------+--------+----------+
|      Borough|TaxiType|totaltrips|
+-------------+--------+----------+
|        Bronx|    NULL|         1|
|        Bronx|  Yellow|       766|
|     Brooklyn|    NULL|         1|
|     Brooklyn|  Yellow|      3268|
|          EWR|  Yellow|       177|
|    Manhattan|    NULL|         3|
|    Manhattan|  Yellow|    491632|
|       Queens|    NULL|         2|
|       Queens|  Yellow|     53025|
|Staten Island|    NULL|        10|
|Staten Island|  Yellow|        60|
|      Unknown|  Yellow|      8002|
+-------------+--------+----------+

