In [None]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example: 'spark-3.2.2'
spark_version = 'spark-3.2.2'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.2.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 0 B/88.7 kB0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [C                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Connecting to ppa.launchpad.net (185.125.190.52)] [Wa0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                   

In [None]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Recall that the default shuffle partitions is 200.  We want to bring that down to a reasonable size for both our data and our Spark cluster
# 4 is reasonable for a free Colab 
spark.conf.set("spark.sql.shuffle.partitions", 4)

In [None]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/nflx-data-science-adv/week-5/DelayedFlights.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)

# Create a lookup table for the 500 cities. 
url_cities='https://2u-data-curriculum-team.s3.amazonaws.com/nflx-data-science-adv/week-5/cities500.txt'
spark.sparkContext.addFile(url_cities)
df_lookup_geo = spark.read.csv(SparkFiles.get("cities500.txt"), sep="\t", header=True)

# Create a lookup table for the airport codes. 
url_airportCodes ='https://2u-data-curriculum-team.s3.amazonaws.com/nflx-data-science-adv/week-5/airportCodes.csv'
spark.sparkContext.addFile(url_airportCodes)
df_lookup_codes = spark.read.csv(SparkFiles.get("airportCodes.csv"), sep=",", header=True)


In [None]:
# Look over the delayed flight data.
df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4| 2003.0|      1955| 2211.0|      2225|       

In [None]:
# Look over the data of the 500 cities.
df_lookup_geo.show()

+---------+-------------------+-------------------+--------------------+--------+---------+-------------+------------+------------+----+-----------+-----------+-----------+-----------+----------+---------+----+--------------+-----------------+
|geonameid|               name|          asciiname|      alternatenames|latitude|longitude|feature_class|feature_code|country_code| cc2|admin1_code|admin2_code|admin3_code|admin4_code|population|elevation| dem|      timezone|modification_date|
+---------+-------------------+-------------------+--------------------+--------+---------+-------------+------------+------------+----+-----------+-----------+-----------+-----------+----------+---------+----+--------------+-----------------+
|  3038999|             Soldeu|             Soldeu|                null|42.57688|  1.66769|            P|         PPL|          AD|null|         02|       null|       null|       null|       602|     null|1832|Europe/Andorra|       2017-11-06|
|  3039154|          El 

In [None]:
# Look over the airport codes.
df_lookup_codes.show()

+--------------+--------------------+-----------+
|          City|             country|airportCode|
+--------------+--------------------+-----------+
|       Aalborg|             Denmark|        AAL|
|      Aalesund|              Norway|        AES|
|        Aarhus|             Denmark|        AAR|
|Abbotsford, BC|              Canada|        YXX|
|Abbotsford, BC|              Canada|        YXX|
|      Aberdeen|            Scotland|        ABZ|
|  Aberdeen, SD|                 USA|        ABR|
|       Abidjan|         Ivory Coast|        ABJ|
|   Abilene, TX|                 USA|        ABI|
|     Abu Dhabi|United Arab Emirates|        AUH|
|         Abuja|             Nigeria|        ABV|
|      Acapulco|              Mexico|        ACA|
|         Accra|               Ghana|        ACC|
|         Adana|              Turkey|        ADA|
|   Addis Ababa|            Ethiopia|        ADD|
|Adelaide, S.A.|           Australia|        ADL|
|          Aden|               Yemen|        ADE|


In [None]:
# Filter the airport codes to only contain rows whose `country` equals `USA`
df_lookup_city_name=df_lookup_codes.filter("country='USA'")
df_lookup_city_name.show(5)

+------------+-------+-----------+
|        City|country|airportCode|
+------------+-------+-----------+
|Aberdeen, SD|    USA|        ABR|
| Abilene, TX|    USA|        ABI|
|   Akron, OH|    USA|        CAK|
| Alamosa, CO|    USA|        ALS|
|  Albany, GA|    USA|        ABY|
+------------+-------+-----------+
only showing top 5 rows



In [None]:
# Filter the latitude and longitude dataframe to only contain the 'name','latitude','longitude','admin1_code' fields and rows whose `country_code` equals `US`
df_lookup_geo=df_lookup_geo.select('name','latitude','longitude','admin1_code').filter("country_code='US'")
df_lookup_geo.show(5)

+--------------+--------+---------+-----------+
|          name|latitude|longitude|admin1_code|
+--------------+--------+---------+-----------+
|   Bay Minette|30.88296|-87.77305|         AL|
|          Edna|28.97859|-96.64609|         TX|
|Bayou La Batre|30.40352|-88.24852|         AL|
|     Henderson|32.15322|-94.79938|         TX|
|       Natalia|29.18968|-98.86253|         TX|
+--------------+--------+---------+-----------+
only showing top 5 rows



In [None]:
# Create temporary views for each of our DataFrames
df.createOrReplaceTempView('delayed')
df_lookup_city_name.createOrReplaceTempView('lookup_city')
df_lookup_geo.createOrReplaceTempView('lookup_geo')


In [None]:
# First, join the airport codes lookup table to the delayed flight DataFrame 
# and add the city of origin and destination like we did in the instructor demo.  

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [None]:
# Add the `origin_latitude` and `origin_longitude` fields by joining the `lookup_geo` view 
# to the `lookup_city` view and the delayed flight DataFrame.
# Note:  The two lookup views do not have matching columns, so we must be mindful what names are used when joining both views together.

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name and trim(split(b.City,',')[1])=geo.admin1_code
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Origin_latitude|Origin_longitude|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+--------+------+-------+---

In [None]:
# Finally, add the `dest_latitude` and `dest_longitude` fields by joining the `lookup_geo` view again as another alias, `geo_dest`.

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+-------------+--------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Origin_latitude|Origin_longitude|Dest|      Dest_City|Dest_latitude|Dest_longitude|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+-------

In [None]:
# Run the same query with a Broadcast hint for either table
 
start_time = time.time()

spark.sql("""
select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
""").show()
print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+-------------+--------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Origin_latitude|Origin_longitude|Dest|      Dest_City|Dest_latitude|Dest_longitude|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+-------

In [None]:
# Write a sql (we used a CTE here) that does some aggregations on the new data.  The purpose of this SQL is to add some processing time.
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin_City, avg(CarrierDelay) avgCarrierDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----------------+------------------+
|      Origin_City|   avgCarrierDelay|
+-----------------+------------------+
|  Albuquerque, NM| 18.31268436578171|
|       Austin, TX|25.133111480865225|
|     Amarillo, TX|              34.3|
|  Baton Rouge, LA| 40.45161290322581|
|    Allentown, PA|31.496688741721854|
|    Asheville, NC|31.566101694915254|
|  Bloomington, IL|21.365470852017935|
|    Anchorage, AK| 24.98989898989899|
|    Baltimore, MD|  15.7993145468393|
|      Atlanta, GA| 24.09317927692881|
|       Bangor, ME|34.068627450980394|
|      Augusta, GA|26.899313501144164|
|       Albany, GA|  26.4811320754717|
|      Bozeman, MT|            18.125|
|        Aspen, CO|               6.0|
|       Albany, NY|17.217179902755266|
|       Boston, MA|15.216697936210132|
|        Akron, OH|  20.0124716553288|
|Atlantic City, NJ|             106.3|
|   Birmingham, AL|19.541315345699832|
+-----------------+------------------+
only showing top 20 rows

--- 10.633858919143677 seconds ---


In [None]:
# Cache your largest temporary view
# Note: when we use SparkSQL to cache a table, the table is immediately cached (no lazy evaluation), when using Pyspark it will not be cached until an action is ran.
spark.sql("cache table delayed")

DataFrame[]

In [None]:
# Check that your table is cached 
spark.catalog.isCached("delayed")

True

In [None]:
# Run the same query again with the data cached. This should greatly improve the run time.  
# Keep in mind we are not working with particularly large data here so the improvements may not be dramatic.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))


+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|  Albuquerque, NM| 34.16695059625213|
|       Austin, TX| 44.39871382636656|
|     Amarillo, TX| 42.94736842105263|
|  Baton Rouge, LA| 63.06401137980085|
|    Allentown, PA|            50.192|
|    Asheville, NC| 58.58413461538461|
|  Bloomington, IL| 50.65745007680491|
|    Anchorage, AK| 36.90522875816993|
|    Baltimore, MD| 41.57686980609418|
|      Atlanta, GA| 41.65309675814609|
|       Bangor, ME| 53.30357142857143|
|      Augusta, GA|61.791666666666664|
|       Albany, GA|50.907534246575345|
|      Bozeman, MT| 44.65217391304348|
|        Aspen, CO|              52.8|
|       Albany, NY| 43.70686070686071|
|       Boston, MA| 50.85264194289465|
|        Akron, OH|53.393058918482645|
|Atlantic City, NJ| 64.64285714285714|
|   Birmingham, AL|46.458149779735685|
+-----------------+------------------+
only showing top 20 rows

--- 4.07237982749939 seconds ---


In [None]:
# you can even cache a large lookup table.
spark.sql("cache table lookup_geo")

DataFrame[]

In [None]:
# Run the same query again with the data cached. This should greatly improve the run time.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|  Albuquerque, NM| 34.16695059625213|
|       Austin, TX| 44.39871382636656|
|     Amarillo, TX| 42.94736842105263|
|  Baton Rouge, LA| 63.06401137980085|
|    Allentown, PA|            50.192|
|    Asheville, NC| 58.58413461538461|
|  Bloomington, IL| 50.65745007680491|
|    Anchorage, AK| 36.90522875816993|
|    Baltimore, MD| 41.57686980609418|
|      Atlanta, GA| 41.65309675814609|
|       Bangor, ME| 53.30357142857143|
|      Augusta, GA|61.791666666666664|
|       Albany, GA|50.907534246575345|
|      Bozeman, MT| 44.65217391304348|
|        Aspen, CO|              52.8|
|       Albany, NY| 43.70686070686071|
|       Boston, MA| 50.85264194289465|
|        Akron, OH|53.393058918482645|
|Atlantic City, NJ| 64.64285714285714|
|   Birmingham, AL|46.458149779735685|
+-----------------+------------------+
only showing top 20 rows

--- 2.1907601356506348 seconds ---


In [None]:
# Remember to uncache the table as soon as you are done.
spark.sql("uncache table delayed")
spark.sql("uncache table lookup_geo")

DataFrame[]

In [None]:
# Verify that the table is no longer cached
if spark.catalog.isCached("delayed") or spark.catalog.isCached("lookup_geo"):
  print("a table is till cached")
else:
  print("all clear")

all clear
