 # Skill Drill: Putting it ALL together
 ## Overview

 A VP at your company has written a SQL for a report that shows how much time is being lost due to flight delays by Airport and Carrier.  It is performing below expectations.  Using all of the optimizing skills you have learned in Unit 8, get this query to run as fast as possible.


---

---




 Hint: Initial query takes between 15-20 seconds.  Final query should be <2 seconds


In [10]:
# activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.2'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:10 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 ht

In [11]:
#import packages

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
import pandas as pd

# we are going to use this to time our queries.
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [12]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/DelayedFlights.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)
url_cities='https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/cities500.txt'
spark.sparkContext.addFile(url_cities)
df_lookup_geo = spark.read.csv(SparkFiles.get("cities500.txt"), sep="\t", header=True)

# we are going to do a lookup here as well so upload the airportCodes.csv file from you Resources directory 
df_lookup_city_name=spark.read.csv('/content/airportCodes.csv', sep=',', header=True)


In [14]:
#Create temporary views for each of our dataframes
# We are going to filter the data to US only as we create the Temp Views.

df.createOrReplaceTempView('delayed')
df_lookup_city_name.createOrReplaceTempView('lookup_city')
df_lookup_geo.createOrReplaceTempView('lookup_geo')

In [15]:
df_lookup_city_name.show()

+--------------+--------------------+-----------+
|          City|             country|airportCode|
+--------------+--------------------+-----------+
|       Aalborg|             Denmark|        AAL|
|      Aalesund|              Norway|        AES|
|        Aarhus|             Denmark|        AAR|
|Abbotsford, BC|              Canada|        YXX|
|Abbotsford, BC|              Canada|        YXX|
|      Aberdeen|            Scotland|        ABZ|
|  Aberdeen, SD|                 USA|        ABR|
|       Abidjan|         Ivory Coast|        ABJ|
|   Abilene, TX|                 USA|        ABI|
|     Abu Dhabi|United Arab Emirates|        AUH|
|         Abuja|             Nigeria|        ABV|
|      Acapulco|              Mexico|        ACA|
|         Accra|               Ghana|        ACC|
|         Adana|              Turkey|        ADA|
|   Addis Ababa|            Ethiopia|        ADD|
|Adelaide, S.A.|           Australia|        ADL|
|          Aden|               Yemen|        ADE|


In [16]:
# Here is the  initial query presented to you for optimization
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [17]:
#partition the largest table
df_1 = spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""")

In [18]:
# read the new parquet formatted data
df_1.show()

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [20]:
# create a view (same name as before so we don't have change our SQL)
df_1.write.parquet('parquet_title_basic',mode='overwrite')


In [22]:
par_df=spark.read.parquet('parquet_title_basic')
par_df.createOrReplaceTempView('parquet_all')

In [23]:
start_time = time.time()
spark.sql("""
select *
from parquet_all
""").show()
print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABQ|           XE|Albuquerque, NM|       35.08449|      -106.65114|     30.26715|     -97.74306|   95.0| 5.610169491525424|
|   ABQ|           WN|Albuquerque, NM|       35.08449|      -106.65114|     39.29038|     -76.61219|   96.0| 9.567164179104477|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   9

In [24]:
# run 2 after storing the data more appropriately and partitioning

# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [25]:
df_1.write.partitionBy("UniqueCarrier").mode("overwrite").parquet("delayed_partitioned")

In [26]:
par_df_p=spark.read.parquet('delayed_partitioned')

In [27]:
par_df_p.createOrReplaceTempView('p_delays_p')
start_time = time.time()
spark.sql("""
select *
from p_delays_p
""").show()
print("--- %s seconds ---" % (time.time() - start_time))

+------+---------------+---------------+----------------+-------------+--------------+-------+------------------+-------------+
|Origin|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|UniqueCarrier|
+------+---------------+---------------+----------------+-------------+--------------+-------+------------------+-------------+
|   ABQ|Albuquerque, NM|       35.08449|      -106.65114|     39.29038|     -76.61219|   96.0| 9.567164179104477|           WN|
|   ABQ|Albuquerque, NM|       35.08449|      -106.65114|     30.26715|     -97.74306|   95.0| 5.610169491525424|           XE|
|   ANC|  Anchorage, AK|       61.21806|      -149.90028|     60.79222|    -161.75583|   99.0|24.789473684210527|           AS|
|   ABQ|Albuquerque, NM|       35.08449|      -106.65114|       35.222|     -101.8313|   97.0|  8.59047619047619|           WN|
|   BWI|  Baltimore, MD|       39.29038|       -76.61219|     35.08449|    -106.65114|   94.0|19.3177570

In [29]:
# Recall that the default shuffle partitions is 200.  We want to bring that down to a reasonable size for both our data and our Spark cluster
# 4 is reasonable for a free Colab 
spark.conf.set("spark.sql.shuffle.partitions", 4)
df_1.show()

+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|  Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           OO|Allentown, PA|       40.60843|       -75.49018|     33.52066|     -86.80249|   11.0|              null|
|   ALB|           OH|   Albany, NY|       42.65258|       -73.75623|     42.35843|     -71.05977|   55.0|               0.0|
|   ALB|           WN|   Albany, NY|       42.65258|       -73.75623|     39.29038|     -76.61219|   99.0| 8.203647416413373|
|   ASE|           OO|    Aspen, CO|        39.1911|      -106.81754|       33.749|     -84.38798|   82.0|               6.0|
|   ATL|           9E|  Atlanta, GA|         33.749|       -84.38798|     33.52066|     -86.80249|   94.0|15.083333333

In [30]:
# Run 3 after setting the shuffle partitions to a more appropriate number
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|  Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           OO|Allentown, PA|       40.60843|       -75.49018|     33.52066|     -86.80249|   11.0|              null|
|   ALB|           OH|   Albany, NY|       42.65258|       -73.75623|     42.35843|     -71.05977|   55.0|               0.0|
|   ALB|           WN|   Albany, NY|       42.65258|       -73.75623|     39.29038|     -76.61219|   99.0| 8.203647416413373|
|   ASE|           OO|    Aspen, CO|        39.1911|      -106.81754|       33.749|     -84.38798|   82.0|               6.0|
|   ATL|           9E|  Atlanta, GA|         33.749|       -84.38798|     33.52066|     -86.80249|   94.0|15.083333333

In [31]:
# cache your largest temporary view
# Note: when we use SparkSQL to cache a table, the table is immediately cached (no lazy evaluation), when using Pyspark it will not be cached until an action is ran.
spark.sql("cache table delayed")

DataFrame[]

In [32]:
spark.sql("cache table lookup_city")

DataFrame[]

In [33]:
# check that your table is cached 
spark.catalog.isCached("delayed")

True

In [34]:
spark.catalog.isCached("lookup_city")

True

In [35]:
# Run 4 - after caching driver table
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|  Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           OO|Allentown, PA|       40.60843|       -75.49018|     33.52066|     -86.80249|   11.0|              null|
|   ALB|           OH|   Albany, NY|       42.65258|       -73.75623|     42.35843|     -71.05977|   55.0|               0.0|
|   ALB|           WN|   Albany, NY|       42.65258|       -73.75623|     39.29038|     -76.61219|   99.0| 8.203647416413373|
|   ASE|           OO|    Aspen, CO|        39.1911|      -106.81754|       33.749|     -84.38798|   82.0|               6.0|
|   ATL|           9E|  Atlanta, GA|         33.749|       -84.38798|     33.52066|     -86.80249|   94.0|15.083333333

In [37]:
# Run 5 - caching one of the lookups
#Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|  Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           OO|Allentown, PA|       40.60843|       -75.49018|     33.52066|     -86.80249|   11.0|              null|
|   ALB|           OH|   Albany, NY|       42.65258|       -73.75623|     42.35843|     -71.05977|   55.0|               0.0|
|   ALB|           WN|   Albany, NY|       42.65258|       -73.75623|     39.29038|     -76.61219|   99.0| 8.203647416413373|
|   ASE|           OO|    Aspen, CO|        39.1911|      -106.81754|       33.749|     -84.38798|   82.0|               6.0|
|   ATL|           9E|  Atlanta, GA|         33.749|       -84.38798|     33.52066|     -86.80249|   94.0|15.083333333

In [39]:
# run 6 - remove unnecesary columns from the SQL
# Note the runtime
start_time = time.time()

df_clean = df[['UniqueCarrier','DepDelay','Origin','CarrierDelay','Dest']]

print("--- %s seconds ---" % (time.time() - start_time))

--- 0.03547954559326172 seconds ---


In [40]:
usa_look_up_cities = spark.sql("""SELECT * from lookup_city where Country = 'USA' """)

In [41]:
# run 7 - filter the lookup tables in the SQL
# Note the runtime
start_time = time.time()

usa_look_up_geo = spark.sql("""SELECT name,
              latitude,
              longitude,
              admin1_code,
              country_code
              from lookup_geo where country_code = 'US' """)

print("--- %s seconds ---" % (time.time() - start_time))

--- 0.010618448257446289 seconds ---


In [42]:
usa_look_up_cities.createOrReplaceTempView('lookup_city')
usa_look_up_geo.createOrReplaceTempView('lookup_geo')
df_clean.createOrReplaceTempView('delayed')
spark.sql("cache table delayed")
spark.sql("cache table lookup_geo")

DataFrame[]

In [43]:
spark.sql(""" SELECT  * from delayed a
              inner join lookup_city b
               on a.Origin=b.airportCode
               """).show()

+-------------+--------+------+------------+----+---------------+-------+-----------+
|UniqueCarrier|DepDelay|Origin|CarrierDelay|Dest|           City|country|airportCode|
+-------------+--------+------+------------+----+---------------+-------+-----------+
|           WN|    78.0|   ABQ|         0.0| AMA|Albuquerque, NM|    USA|        ABQ|
|           WN|     9.0|   ABQ|        null| DAL|Albuquerque, NM|    USA|        ABQ|
|           WN|   108.0|   ABQ|         1.0| DAL|Albuquerque, NM|    USA|        ABQ|
|           WN|     7.0|   ABQ|        null| DAL|Albuquerque, NM|    USA|        ABQ|
|           WN|    30.0|   ABQ|        12.0| DAL|Albuquerque, NM|    USA|        ABQ|
|           WN|    27.0|   ABQ|         3.0| DAL|Albuquerque, NM|    USA|        ABQ|
|           WN|    72.0|   ABQ|         2.0| DEN|Albuquerque, NM|    USA|        ABQ|
|           WN|    15.0|   ABQ|        null| DEN|Albuquerque, NM|    USA|        ABQ|
|           WN|    46.0|   ABQ|         2.0| HOU|Albuq

In [45]:
#recreate the dataframes selecting only the columns you need, filtering the data before creating the view, then caching the views.
# columns needed are 'UniqueCarrier','DepDelay','Origin','CarrierDelay','Dest' from the main table
df_1.show
# filter the df_lookup_city data prior to creating the view to only contain USA data

# filter the df_lookup_geo data prior to creating the view to only contain US data and select only the columns you need to perform the lookup
# fields from geo ('name','latitude','longitude','admin1_code')

df_lookup_city_name.createOrReplaceTempView('lookup_city')
df_lookup_geo.createOrReplaceTempView('lookup_geo')
df.createOrReplaceTempView('delayed')
spark.sql("cache table delayed")
spark.sql("cache table lookup_geo")

DataFrame[]

In [47]:
# run 8 - filtered lookup dataframes
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.UniqueCarrier,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
b.City as Dest_City,
geo.latitude as Dest_latitude,
geo.longitude as Dest_longitude,
a.CarrierDelay
     from  delayed a 
     inner join lookup_city b
    on a.Origin=b.airportCode
    inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code))
    select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
  """).show()


print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           EV|  Allentown, PA|       40.60843|       -75.49018|     40.60843|     -75.49018|   96.0|  31.4527027027027|
|   ABE|           OH|  Allentown, PA|       40.60843|       -75.49018|     40.60843|     -75.49018|   50.0|33.666666666666664|
|   ABE|           OO|  Allentown, PA|       40.60843|       -75.49018|     40.60843|     -75.49018|   11.0|              null|
|   ABY|           EV|     Albany, GA|       31.57851|       -84.15574|     31.57851|     -84.15574|   97.0|  26.4811320754717|
|   AEX|           EV| Alexandria, LA|       31.31129|       -92.44514|     31.31129|     -92.44514|   9

In [48]:
# Remember to uncache the table as soon as you are done.
spark.sql("uncache table delayed")
spark.sql("uncache table lookup_geo")

DataFrame[]

In [49]:
#Verify that the table is no longer cached
if spark.catalog.isCached("delayed") or spark.catalog.isCached("lookup_geo"):
  print("a table is till cached")
else:
  print("all clear")

all clear
