# Manipulating Data using Apache Spark

In this notebook, we get our hands dirty with the Spark DataFrame API to perform common data operations.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()



If you look in the `dataset` folder, you will see `flights.csv`, which contains a row for every flight that left Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 and 2015.

First, we create a DataFrame from `flights.csv` and then register it as a temporary view so that we can query the flights with SQL. Let's do it.

In [2]:
import os

MAIN_DIRECTORY = os.getcwd()

file_path = MAIN_DIRECTORY + "/dataset/flights.csv"

In [3]:
df_flights = spark.read.format("csv").option("header","true").option('inferSchema','true').load(file_path)

                                                                                

In [4]:
# A shorter, equivalent way to read the same CSV file into a DataFrame
df_flights = spark.read.csv(file_path, header=True, inferSchema=True)

In [5]:
df_flights.createOrReplaceTempView('flights')

### Use SQL to get the first five rows of the `flights` table, save the result to `flights5`, and show it.

In [6]:
query = 'SELECT * FROM flights LIMIT 5'
flights5 = spark.sql(query)
flights5.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+

### Write a query that counts the number of flights to each airport from SEA and PDX.

In [9]:
query = "SELECT origin, dest, count(*) FROM flights GROUP BY origin, dest"
flight_count = spark.sql(query)
flight_count.show()

+------+----+--------+
|origin|dest|count(1)|
+------+----+--------+
|   SEA| RNO|       8|
|   SEA| DTW|      98|
|   SEA| CLE|       2|
|   SEA| LAX|     450|
|   PDX| SEA|     144|
|   SEA| BLI|       5|
|   PDX| IAH|      57|
|   PDX| PHX|     209|
|   SEA| SLC|     225|
|   SEA| SBA|      23|
|   SEA| BWI|      29|
|   PDX| IAD|      23|
|   PDX| SFO|     305|
|   SEA| KOA|      40|
|   PDX| MCI|      15|
|   SEA| SJC|     213|
|   SEA| ABQ|      43|
|   SEA| SAT|      18|
|   PDX| ONT|      57|
|   SEA| LAS|     364|
+------+----+--------+
only showing top 20 rows
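To make the semantics of `GROUP BY origin, dest` with `count(*)` concrete, here is a plain-Python sketch. It is only an illustration: it uses `collections.Counter` over a handful of (origin, dest) pairs copied from sample rows printed in this notebook, not the full dataset, and `pair_counts` is a name introduced here.

```python
from collections import Counter

# (origin, dest) pairs copied from sample rows printed in this notebook;
# this is a small excerpt, not the full flights.csv.
pairs = [
    ("SEA", "LAX"), ("SEA", "HNL"), ("SEA", "SFO"), ("PDX", "SJC"),
    ("SEA", "BUR"), ("PDX", "DEN"), ("PDX", "OAK"), ("SEA", "SFO"),
    ("SEA", "SAN"), ("SEA", "ORD"), ("SEA", "LAX"), ("SEA", "PHX"),
    ("SEA", "LAS"), ("SEA", "ANC"), ("SEA", "SFO"), ("PDX", "SFO"),
    ("SEA", "SMF"),
]

# Counter plays the role of GROUP BY origin, dest + count(*):
# each distinct key becomes one group, and its value is the number of rows.
pair_counts = Counter(pairs)

for (origin, dest), n in sorted(pair_counts.items()):
    print(origin, dest, n)
```

As in the Spark output, each distinct (origin, dest) combination appears once, with its row count.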



### Create a DataFrame from `airports.csv`. This file contains information about airports all over the world.

In [10]:
file_path = MAIN_DIRECTORY + "/dataset/airports.csv"
df_airports = spark.read.csv(file_path, header=True, inferSchema = True)
df_airports.show(10)

+---+--------------------+----------+------------+----+---+---+
|faa|                name|       lat|         lon| alt| tz|dst|
+---+--------------------+----------+------------+----+---+---+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722| -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408| -88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912| -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722| -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593| -4|  A|
|0G6|Williams County A...|41.4673056| -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.8835647| -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|39.7948244| -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|48.0538086|-122.8106436| 108| -8|  A|
+---+--------------------+----------+------------+----+---+---+
only showing top 10 rows



In [11]:
df_airports.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



### Update the `flights` DataFrame to include a new column, `duration_hrs`, that contains the duration of each flight in hours.

In [12]:
df_flights = df_flights.withColumn("duration_hrs", df_flights.air_time/60)

In [13]:
df_flights.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute',
 'duration_hrs']

In [14]:
df_flights.select(['origin', 'dest', 'flight', 'air_time', 'duration_hrs']).show()

+------+----+------+--------+------------------+
|origin|dest|flight|air_time|      duration_hrs|
+------+----+------+--------+------------------+
|   SEA| LAX|  1780|     132|               2.2|
|   SEA| HNL|   851|     360|               6.0|
|   SEA| SFO|   755|     111|              1.85|
|   PDX| SJC|   344|      83|1.3833333333333333|
|   SEA| BUR|   522|     127|2.1166666666666667|
|   PDX| DEN|    48|     121|2.0166666666666666|
|   PDX| OAK|  1520|      90|               1.5|
|   SEA| SFO|   755|      98|1.6333333333333333|
|   SEA| SAN|   490|     135|              2.25|
|   SEA| ORD|    26|     198|               3.3|
|   SEA| LAX|   448|     130|2.1666666666666665|
|   SEA| PHX|   656|     154| 2.566666666666667|
|   SEA| LAS|   608|     127|2.1166666666666667|
|   SEA| ANC|   121|     183|              3.05|
|   SEA| SFO|   306|     129|              2.15|
|   PDX| SFO|  1458|      90|               1.5|
|   SEA| SMF|   368|      76|1.2666666666666666|
|   SEA| MDW|   827|

### Write a query using the `.filter()` method to find all the flights that flew over 1000 miles.

In [15]:
long_flights = df_flights.filter('distance > 1000')

In [16]:
long_flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

### Use the `.filter()` method to keep only flights from SEA to PDX. The query should return only the `tailnum`, `origin`, and `dest` columns.

In [17]:
# Solution 1

df_flights.filter(df_flights.origin == 'SEA').filter(df_flights.dest == 'PDX')\
.select('tailnum', 'origin', 'dest').show(5)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N810SK|   SEA| PDX|
| N822SK|   SEA| PDX|
| N586SW|   SEA| PDX|
| N223SW|   SEA| PDX|
| N580SW|   SEA| PDX|
+-------+------+----+
only showing top 5 rows



In [19]:
#Solution 2

df_flights.filter((df_flights.origin == 'SEA') & (df_flights.dest == 'PDX'))\
.select('tailnum', 'origin', 'dest').show(5)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N810SK|   SEA| PDX|
| N822SK|   SEA| PDX|
| N586SW|   SEA| PDX|
| N223SW|   SEA| PDX|
| N580SW|   SEA| PDX|
+-------+------+----+
only showing top 5 rows
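Solution 2 needs the parentheses around each comparison because Python's `&` operator binds more tightly than `==`. PySpark `Column` objects overload `&` and `==`, but they cannot change Python's precedence rules, so `df_flights.origin == 'SEA' & df_flights.dest == 'PDX'` does not mean what it appears to. The plain-Python sketch below (no Spark involved) shows how the same expression shape is parsed.

```python
import ast

# How Python parses the condition without parentheses: `&` binds tighter
# than `==`, so the expression becomes origin == ('SEA' & dest) == 'PDX'.
tree = ast.parse("origin == 'SEA' & dest == 'PDX'", mode="eval")
top = tree.body
print(type(top).__name__)                    # Compare: a chained comparison
print(type(top.comparators[0].op).__name__)  # BitAnd: the & sits inside it

# With plain integers the difference shows up in the result itself.
without_parens = 1 == 1 & 0 == 0   # parsed as 1 == (1 & 0) == 0
with_parens = (1 == 1) & (0 == 0)  # True & True
print(without_parens, with_parens)
```

Chaining two `.filter()` calls, as in Solution 1, sidesteps the issue entirely.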



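### Write a query that returns the `origin`, `dest`, and `tailnum` columns and the average speed in km per hour.

The `distance` column is in miles and `air_time` is in minutes, so `distance / (air_time / 60)` in the solutions below gives the average speed in miles per hour; multiply it by 1.609344 to get km per hour.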

In [20]:
#Solution 1

avg_speed = (df_flights.distance / (df_flights.air_time / 60)).alias('avg_speed')

In [21]:
df_flights.select('origin', 'dest', 'tailnum', avg_speed).show(5)

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows



In [22]:
#Solution 2

df_flights.selectExpr('origin', 'dest', 'tailnum', "distance / (air_time / 60) as avg_speed").show(5)

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows
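A quick arithmetic check of the miles-to-kilometres conversion, using the first row of the output above (SEA to LAX: 954 miles in 132 minutes of air time). This is a plain-Python sketch; `MILES_TO_KM` is a name introduced here for illustration.

```python
MILES_TO_KM = 1.609344  # 1 international mile = 1.609344 km (exact)

# First row of the sample output: distance in miles, air_time in minutes.
distance_miles = 954
air_time_min = 132

speed_mph = distance_miles / (air_time_min / 60)  # same formula as avg_speed
speed_kmh = speed_mph * MILES_TO_KM

print(round(speed_mph, 2), round(speed_kmh, 2))
```

In PySpark, the same factor could be folded into the column expression, for example `(df_flights.distance * 1.609344 / (df_flights.air_time / 60)).alias('avg_speed_kmh')`, where `avg_speed_kmh` is just an illustrative alias.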



### Find the shortest (in terms of distance) flight that left PDX by first filtering and then using the `.min()` method. Perform the filtering by referencing the column directly, not by passing a SQL string.

In [23]:
#Solution 1

df_flights.filter(df_flights.origin == 'PDX').groupBy(df_flights.origin).agg({'distance':'min'}).show()

+------+-------------+
|origin|min(distance)|
+------+-------------+
|   PDX|          106|
+------+-------------+



In [25]:
# Solution 2

# Note: this import shadows Python's built-in min() for the rest of the notebook.
from pyspark.sql.functions import min

df_flights.filter(df_flights.origin == 'PDX').groupBy(df_flights.origin) \
.agg(min('distance').alias('Shortest Flight Distance')).show()

+------+------------------------+
|origin|Shortest Flight Distance|
+------+------------------------+
|   PDX|                     106|
+------+------------------------+



In [29]:
# Solution 3

df_flights.filter(df_flights.origin == 'PDX').groupBy(df_flights.origin).min('distance').show()

+------+-------------+
|origin|min(distance)|
+------+-------------+
|   PDX|          106|
+------+-------------+



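### Find the longest (in terms of time) flight that left SEA by filtering and then using the `.max()` method. Perform the filtering by referencing the column directly, not by passing a SQL string.

If we run the following code as is, we do not get the longest flight time. `air_time` was read as a string column (the file contains `NA` values, so Spark could not infer a numeric type), and `max` on strings compares values lexicographically, which returns `NA`. We first need to cast the column to an integer type.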

In [30]:
# Note: this import shadows Python's built-in max() for the rest of the notebook.
from pyspark.sql.functions import max

df_flights.filter(df_flights.origin == 'SEA').groupBy(df_flights.origin) \
.agg(max('air_time').alias('Longest Flight Time')).show()

+------+-------------------+
|origin|Longest Flight Time|
+------+-------------------+
|   SEA|                 NA|
+------+-------------------+
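Why `NA` wins: the plain-Python sketch below mimics comparing numbers stored as strings, and what a cast to integer changes. The values are an illustrative sample, and `to_int_or_none` is a helper introduced only for this illustration; it roughly approximates Spark's non-ANSI cast, under which unparsable values become null.

```python
# A few air_time values as strings, the way they are stored when the CSV
# column also contains the text "NA" (illustrative sample, not the full data).
air_time_str = ["132", "360", "111", "83", "NA", "409"]

# Strings compare character by character, and "N" sorts after every digit,
# so "NA" is the largest string.
print(max(air_time_str))

# Even without "NA", string order is not numeric order: "83" > "409" because "8" > "4".
print(max(v for v in air_time_str if v != "NA"))


def to_int_or_none(value):
    """Rough stand-in for a non-ANSI cast to integer: unparsable text becomes None."""
    try:
        return int(value)
    except ValueError:
        return None


air_time_int = [to_int_or_none(v) for v in air_time_str]

# Like SQL MAX, skip nulls and compare the remaining values numerically.
print(max(v for v in air_time_int if v is not None))
```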



In [31]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [33]:
from pyspark.sql.functions import col

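# Cast air_time from string to integer. With ANSI mode off (the Spark 3.x default),
# values that cannot be parsed, such as 'NA', become null.
df_flights = df_flights.withColumn('air_time', col('air_time').cast('integer'))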

In [34]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [35]:
#Solution 1

df_flights.filter(df_flights.origin == 'SEA').groupBy(df_flights.origin) \
.agg(max('air_time').alias('Longest Flight Time')).show()

+------+-------------------+
|origin|Longest Flight Time|
+------+-------------------+
|   SEA|                409|
+------+-------------------+



In [36]:
#Solution 2

df_flights.filter(df_flights.origin == 'SEA').groupBy(df_flights.origin).max('air_time').show()

+------+-------------+
|origin|max(air_time)|
+------+-------------+
|   SEA|          409|
+------+-------------+



### Write a query that uses the `.avg()` method to get the average air time of Delta Air Lines flights (where the `carrier` column value is `"DL"`) that left SEA.

In [38]:
# First method
# df_flights.filter((df_flights.carrier == 'DL') & (df_flights.origin == 'SEA'))\
# .groupBy(df_flights.origin).avg('air_time').show()

# Second method
query = """
    SELECT AVG(air_time) AS avg_air_time
    FROM flights
    WHERE origin = 'SEA' AND carrier = 'DL'
"""

spark.sql(query).show()

+------------------+
|      avg_air_time|
+------------------+
|188.20689655172413|
+------------------+
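`AVG`, like the other SQL aggregate functions, ignores nulls: the denominator is the number of non-null values, not the number of rows. A plain-Python sketch with hypothetical air times, where `None` plays the role of a null produced from an unparsable `NA`:

```python
from statistics import mean

# Hypothetical air_time values (minutes); None stands for a null.
air_times = [132, None, 198, 154, None, 183]

# SQL AVG skips nulls, so only the four real values are averaged.
non_null = [v for v in air_times if v is not None]
avg_air_time = mean(non_null)

print(avg_air_time, len(non_null), len(air_times))
```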



### Write a query that uses the `.sum()` method to get the total number of hours all planes spent in the air by creating a column called `duration_hrs` from the column `air_time`.

In [41]:
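# First method
# query = "SELECT SUM(air_time / 60.0) AS duration_hrs FROM flights"
# spark.sql(query).show()

# Second method
# df_flights.groupBy().sum('duration_hrs').show()

# Third method: recompute duration_hrs from the integer air_time column,
# then sum it over the whole DataFrame (groupBy() with no keys forms one group).
df_duration_hrs = df_flights.withColumn("duration_hrs", col("air_time") / 60).groupBy().sum('duration_hrs')
df_duration_hrs.show()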

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+
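The long tail of digits in `25289.600000000126` is ordinary floating-point rounding error accumulated over many additions (values such as 132 / 60 = 2.2 have no exact binary representation). The plain-Python sketch below reproduces the effect with ten copies of `0.1` and shows `math.fsum`, an error-compensated sum, for comparison.

```python
import math

tenths = [0.1] * 10

naive_total = sum(tenths)        # 0.9999999999999999: rounding error accumulates
exact_total = math.fsum(tenths)  # 1.0: error-compensated summation

print(naive_total, exact_total)

# For display, rounding is usually enough.
print(round(naive_total, 2))
```

In PySpark, the aggregated column can likewise be rounded for display with `pyspark.sql.functions.round`.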



### Write a query that uses `tailnum` column to count the number of flights each plane made.

In [44]:
# first method
# query = "SELECT tailnum, COUNT(*) AS num_flights_each_plane_made FROM flights GROUP BY tailnum"
# spark.sql(query).show()

# second method
df_flights.groupBy(df_flights.tailnum).count().show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows



### Write a query that returns the average duration of flights from `PDX` and `SEA`.

In [51]:
from pyspark.sql.functions import avg

# df_flights.groupBy(df_flights.origin).avg('air_time').show()

df_flights.groupBy(df_flights.origin).agg(avg(df_flights.air_time)).show()

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In [47]:
query = "SELECT origin, AVG(air_time) AS avg_duration FROM flights \
        WHERE origin IN ('PDX', 'SEA') GROUP BY origin"
spark.sql(query).show()

+------+------------------+
|origin|      avg_duration|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



### Write a query that returns the average departure delay (`dep_delay`) in each month for each destination. Then import the `stddev()` function from `pyspark.sql.functions` and use it to calculate the standard deviation of `dep_delay`.

In [52]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [53]:
df_flights = df_flights.withColumn('dep_delay', col('dep_delay').cast('integer'))

In [54]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [56]:
grouped_by_month_dest = df_flights.groupBy('month', 'dest')

In [57]:
type(grouped_by_month_dest)

pyspark.sql.group.GroupedData
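`groupBy` returns a `GroupedData` object rather than a DataFrame: rows are only combined into a result when an aggregation such as `.avg()` or `.agg()` is applied. A rough plain-Python analogue is a dictionary that maps each `(month, dest)` key to its list of values, which is then reduced per key. The rows below are hypothetical.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical (month, dest, dep_delay) rows, for illustration only.
rows = [
    (1, "MSP", 10), (1, "MSP", 12), (1, "ABQ", -3),
    (4, "PHX", 5), (4, "PHX", 25), (4, "PHX", -6),
]

# Step 1, like groupBy('month', 'dest'): collect the values for each key.
groups = defaultdict(list)
for month, dest, dep_delay in rows:
    groups[(month, dest)].append(dep_delay)

# Step 2, like .avg('dep_delay'): reduce each group to a single value.
avg_delay = {key: mean(values) for key, values in groups.items()}

for (month, dest), value in sorted(avg_delay.items()):
    print(month, dest, value)
```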

In [59]:
grouped_by_month_dest.avg('dep_delay').orderBy('month').show(3)

+-----+----+--------------+
|month|dest|avg(dep_delay)|
+-----+----+--------------+
|    1| MSP|          11.2|
|    1| ABQ|          -3.0|
|    1| SLC|        6.5625|
+-----+----+--------------+
only showing top 3 rows



In [61]:
from pyspark.sql.functions import stddev

grouped_by_month_dest.agg(stddev('dep_delay')).show(3)

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    4| PHX|    15.003380033491737|
|    1| RDM|     8.830749846821778|
|    5| ONT|    18.895178691342874|
+-----+----+----------------------+
only showing top 3 rows
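The column header `stddev_samp(dep_delay)` shows that `stddev` computes the sample standard deviation, which divides the sum of squared deviations by n - 1; `pyspark.sql.functions.stddev_pop` is the population version, which divides by n. Python's `statistics` module draws the same distinction, as this sketch with hypothetical delays shows.

```python
import math
from statistics import pstdev, stdev

# A small hypothetical set of departure delays (minutes).
delays = [2, 4, 4, 4, 5, 5, 7, 9]

# Sample standard deviation (n - 1 in the denominator), like Spark's stddev.
sample_sd = stdev(delays)

# Population standard deviation (n in the denominator), like Spark's stddev_pop.
population_sd = pstdev(delays)

# The sample version computed by hand, for comparison.
mean_delay = sum(delays) / len(delays)
by_hand = math.sqrt(sum((d - mean_delay) ** 2 for d in delays) / (len(delays) - 1))

print(sample_sd, population_sd, by_hand)
```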



### Write a query that performs a left outer join on the flights and airports DataFrames.
- The flights and airports DataFrames are already in the workspace.
- First, examine the airports DataFrame by calling its `.show()` method.
- Note which key column lets you join these two DataFrames.
- Before joining, rename the `faa` column in `airports` to `dest`, and then register this DataFrame as a temporary view (table).
- Use `spark.sql` to perform a left outer join on these two tables.


In [62]:
df_airports.show(10)

+---+--------------------+----------+------------+----+---+---+
|faa|                name|       lat|         lon| alt| tz|dst|
+---+--------------------+----------+------------+----+---+---+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722| -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408| -88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912| -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722| -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593| -4|  A|
|0G6|Williams County A...|41.4673056| -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.8835647| -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|39.7948244| -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|48.0538086|-122.8106436| 108| -8|  A|
+---+--------------------+----------+------------+----+---+---+
only showing top 10 rows



In [63]:
df_airports = df_airports.withColumnRenamed("faa", "dest")

In [64]:
df_airports.show(5)

+----+--------------------+----------+-----------+----+---+---+
|dest|                name|       lat|        lon| alt| tz|dst|
+----+--------------------+----------+-----------+----+---+---+
| 04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
| 06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
| 06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
| 06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
| 09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+----+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



In [65]:
df_airports.createOrReplaceTempView('airports')
df_flights.createOrReplaceTempView('flights')

In [66]:
df_join = spark.sql('SELECT * FROM flights LEFT OUTER JOIN airports ON flights.dest = airports.dest')

In [67]:
df_join.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+----+--------------------+---------+-----------+----+---+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|dest|                name|      lat|        lon| alt| tz|dst|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+----+--------------------+---------+-----------+----+---+---+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2| LAX|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0| HNL|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
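A left outer join keeps every row of the left table (`flights`) and fills the right table's columns with null when no airport matches. The plain-Python sketch below illustrates that rule on trimmed-down, hypothetical rows; the `ZZZ` code and flight `9999` are made up to show an unmatched row, and `left_outer_join` is a helper introduced here. It assumes airport codes are unique on the right side, which is why a dictionary lookup is enough.

```python
# Hypothetical, trimmed-down rows.
flights = [
    {"flight": 1780, "dest": "LAX"},
    {"flight": 851, "dest": "HNL"},
    {"flight": 9999, "dest": "ZZZ"},  # hypothetical code with no airport row
]
airports = {  # keyed by airport code, assumed unique
    "LAX": {"name": "Los Angeles Intl"},
    "HNL": {"name": "Honolulu Intl"},
}


def left_outer_join(left_rows, right_by_key, key, right_columns):
    """Keep every left row; attach matching right columns, or None if there is no match."""
    joined = []
    for row in left_rows:
        match = right_by_key.get(row[key])
        extra = {col: (match[col] if match else None) for col in right_columns}
        joined.append({**row, **extra})
    return joined


joined = left_outer_join(flights, airports, "dest", ["name"])
for row in joined:
    print(row)
```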


### Rewrite the previous query using the DataFrame API's `.join` method.

In PySpark, we can use the `.join` method to perform joins. It takes three arguments:
- The first argument is the DataFrame to join with the one on which `.join` is called.
- The second argument, `on`, is either the name of the key column (or a list of names), which must then exist under the same name in both DataFrames, or a join condition such as `df_flights.dest == df_airports.dest`.
- The third argument, `how`, specifies the kind of join to perform.

To perform a left outer join, set `how` to `"leftouter"`. Joining on a column name keeps a single `dest` column in the result, whereas joining on a condition keeps both `dest` columns, as the two outputs below show.

In [68]:
df_flights.join(df_airports,'dest','leftouter').show(5)

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon|alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+---+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|               2.2|    Los Angeles Intl|33.942536|-118.408075|126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|               6.0|       Honolulu Intl|21.318681|-157.922428| 13|-10|  N|
| SFO|2014|    3|  9|    1443|

In [71]:
joined_df = df_flights.join(df_airports, df_flights.dest == df_airports.dest, "leftouter")
joined_df.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+----+--------------------+---------+-----------+----+---+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|dest|                name|      lat|        lon| alt| tz|dst|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+----+--------------------+---------+-----------+----+---+---+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2| LAX|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0| HNL|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
