In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=8f7c4669db5cbf2a083523cba24a2a4c49c1412ab74a9d4ee79dbb3838e97f66
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import os
import time
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Lift the pandas display limits so wide/long frames render without truncation.
for _option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(_option, None)

In [None]:
# Get the active Spark session, or create one named 'flight_delay' if none exists.
spark = (
    SparkSession.builder
    .appName('flight_delay')
    .getOrCreate()
)

In [None]:
# Print the full path of every file found under the Kaggle input directory.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(root, name))

In [None]:
file_path = 'flights.csv'
file_path1 = 'airlines.csv'
file_path2 = 'airports.csv'

# All three files are read the same way: the first line is the header and
# Spark infers the column types from the data.
_csv_options = {'inferSchema': True, 'header': True}

flights = spark.read.csv(file_path, **_csv_options)
airlines = spark.read.csv(file_path1, **_csv_options)
airports = spark.read.csv(file_path2, **_csv_options)

In [None]:
# Display the first two rows of the flights DataFrame as a text table.
flights.show(2)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+-

In [None]:
# Preview two rows as a pandas frame. Limit on the Spark side first so that
# only those rows are collected to the driver, not the whole flights table.
flights.limit(2).toPandas()

Unnamed: 0,YEAR_,MONTH_,DAY_,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5.0,2354.0,-11.0,21.0,15.0,205.0,194.0,169.0,1448.0,404.0,4.0,430.0,408.0,-22.0,0.0,0.0,,,,,,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10.0,2.0,-8.0,12.0,14.0,280.0,279.0,263.0,2330.0,737.0,4.0,750.0,741.0,-9.0,0.0,0.0,,,,,,


In [None]:
# Print the column names, inferred types and nullability of the flights DataFrame.
flights.printSchema()

root
 |-- YEAR_: integer (nullable = true)
 |-- MONTH_: integer (nullable = true)
 |-- DAY_: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (n

In [None]:
# Drop the five delay-cause breakdown columns from flights.
_delay_cause_columns = [
    'AIR_SYSTEM_DELAY',
    'SECURITY_DELAY',
    'AIRLINE_DELAY',
    'LATE_AIRCRAFT_DELAY',
    'WEATHER_DELAY',
]
flights = flights.drop(*_delay_cause_columns)

In [None]:
# Drop the auto-named columns _c7 .. _c33 plus four flight-delay column names.
# drop() silently ignores names that are not present in the DataFrame.
_unnamed_columns = ['_c%d' % idx for idx in range(7, 34)]
_flight_columns = ['CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY']
airports = airports.drop(*_unnamed_columns, *_flight_columns)

In [None]:
# Spark SQL queries run against named views, so register the DataFrame as a
# temporary view first.
flights.createOrReplaceTempView("flights")

# Count flights leaving SEA per year and destination airport.
# Note: spark.sql() returns a Spark DataFrame, not a pandas one.
query = """
    select YEAR_, DESTINATION_AIRPORT, count(DESTINATION_AIRPORT) as dest_count
    from flights
    where ORIGIN_AIRPORT = 'SEA'
    group by 1, 2
    order by 1 desc
"""

flight_query = spark.sql(query)
flight_query.show(5)

+-----+-------------------+----------+
|YEAR_|DESTINATION_AIRPORT|dest_count|
+-----+-------------------+----------+
| 2015|                JFK|        16|
| 2015|                JNU|         8|
| 2015|                SAT|         3|
| 2015|                PHX|        58|
| 2015|                MSY|         4|
+-----+-------------------+----------+
only showing top 5 rows



In [None]:
# Register the two lookup DataFrames as temporary views under their own names.
for _view_name, _frame in (("airlines", airlines), ("airports", airports)):
    _frame.createOrReplaceTempView(_view_name)

In [None]:
# List the tables and temporary views registered in the session catalog.
spark.catalog.listTables()

[Table(name='airlines', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='airports', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# Same check through SQL: list the views known to this session.
spark.sql("SHOW views").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         |airlines|       true|
|         |airports|       true|
|         | flights|       true|
+---------+--------+-----------+



In [None]:
# Get a Spark DataFrame back from the registered "flights" view using spark.table,
# then display its first five rows.
temp_table = spark.table("flights")
temp_table.show(5)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+
| 2015|     1|   1|          4|     AS| 

In [None]:
# Save the flights DataFrame (loaded from the CSV) in parquet format, replacing
# any previous output. The writer returns None, so there is nothing to assign.
flights.write.parquet('flights.parquet', mode = 'overwrite')

In [None]:
# Read the 'flights.parquet' dataset into a new Spark DataFrame;
# the bare name on the last line displays its column names and types.
flight_df = spark.read.parquet('flights.parquet')
flight_df

DataFrame[YEAR_: int, MONTH_: int, DAY_: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: int, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: int, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string]

In [None]:
# Register the parquet-backed DataFrame as the 'flights' temporary view.
# createOrReplaceTempView replaces any existing view with the same name.
flight_df.createOrReplaceTempView('flights')

In [None]:
# Run a SQL query against the 'flights' view to count the distinct origin airports.
origin_dest_count = spark.sql(
    'select count(distinct origin_airport) as origin_airport from flights'
).collect()[0]
# collect()[0] is a Row: read the aliased field explicitly instead of relying on
# Row behaving like a one-element tuple inside the % format.
print('Count of origin airport: %d' % origin_dest_count['origin_airport'])

Count of origin airport: 312


In [None]:
# Derive duration_hrs by dividing AIR_TIME by 60.
flights = flights.withColumn("duration_hrs", F.col("AIR_TIME") / 60)

# Check the new column.
flights.select("duration_hrs").show(5)

+-----------------+
|     duration_hrs|
+-----------------+
|2.816666666666667|
|4.383333333333334|
|4.433333333333334|
|              4.3|
|3.316666666666667|
+-----------------+
only showing top 5 rows



In [None]:
# Add cancellation_reasons: CANCELLATION_REASON converted to lowercase with F.lower.
flights = flights.withColumn(
    "cancellation_reasons",
    F.lower(F.col('CANCELLATION_REASON')),
)
flights.select("cancellation_reasons").show(5)

+--------------------+
|cancellation_reasons|
+--------------------+
|                NULL|
|                NULL|
|                NULL|
|                NULL|
|                NULL|
+--------------------+
only showing top 5 rows



In [None]:
# Build a DataFrame holding one row per distinct origin airport.
flights_df = flights.select(flights["origin_airport"]).distinct()

# Report how many rows and how many partitions that DataFrame has.
_row_total = flights_df.count()
print('\nThere are %d rows in the flights_df dataframe.\n' % _row_total)
_partition_total = flights_df.rdd.getNumPartitions()
print('\nThere are %d partition in the flights_df dataframe.\n' % _partition_total)

# Attach a row ID generated by monotonically_increasing_id().
flights_df = flights_df.withColumn('row_id', F.monotonically_increasing_id())

# Show the two rows with the highest row IDs.
flights_df.orderBy(F.col('row_id').desc()).show(2)


There are 313 rows in the flights_df dataframe.


There are 1 partition in the flights_df dataframe.

+--------------+------+
|origin_airport|row_id|
+--------------+------+
|          NULL|   312|
|           ASE|   311|
+--------------+------+
only showing top 2 rows



In [None]:
# Keep only the flights whose distance is greater than 2000.
long_flight = flights.filter(F.col("distance") > 2000)
long_flight.show(2)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+-----------------+--------------------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|     duration_hrs|cancellation_reasons|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--

In [None]:
# Use selectExpr() to pick the route/aircraft columns and compute avg_speed as
# distance divided by air time in hours, rounded to two decimals.
_speed_exprs = [
    "origin_airport",
    "destination_airport",
    "tail_number",
    "airline",
    "round(distance/(air_time/60),2) as avg_speed",
]
avg_speed = flights.selectExpr(*_speed_exprs)
avg_speed.show(5)

+--------------+-------------------+-----------+-------+---------+
|origin_airport|destination_airport|tail_number|airline|avg_speed|
+--------------+-------------------+-----------+-------+---------+
|           ANC|                SEA|     N407AS|     AS|   514.08|
|           LAX|                PBI|     N3KUAA|     AA|   531.56|
|           SFO|                CLT|     N171US|     US|   517.89|
|           LAX|                MIA|     N3HYAA|     AA|   544.65|
|           SEA|                ANC|     N527AS|     AS|   436.58|
+--------------+-------------------+-----------+-------+---------+
only showing top 5 rows



In [None]:
# .contains(): flights whose airline code contains 'AA', sorted by origin and
# then destination airport, both ascending.
(
    flights
    .filter(F.col('airline').contains('AA'))
    .orderBy(['origin_airport', 'destination_airport'], ascending=[True, True])
    .show(2)
)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+------------------+--------------------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|      duration_hrs|cancellation_reasons|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+

In [None]:
# ~ negates a column condition: flights whose airline code does NOT contain 'AA'.
_not_aa = ~flights['airline'].contains('AA')
flights.filter(_not_aa).show(2)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+-----------------+--------------------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|     duration_hrs|cancellation_reasons|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--

In [None]:
# .like() takes a SQL LIKE pattern: '%A%' matches airline codes containing an 'A'.
flights.filter(F.col('airline').like('%A%')).show(2)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+-----------------+--------------------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|     duration_hrs|cancellation_reasons|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--

In [None]:
# .isin(): keep flights departing from any of the listed airports.
_west_coast_origins = ['PDX', 'LAX', 'SEA']
flights.filter(F.col('origin_airport').isin(_west_coast_origins)).show(2)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+-----------------+--------------------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|     duration_hrs|cancellation_reasons|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--

In [None]:
# F.when() with a single branch: "yes" where AIRLINE is 'OO'.
# With no otherwise(), rows that do not match are left NULL.
flights.select(
    flights.ORIGIN_AIRPORT,
    flights.DESTINATION_AIRPORT,
    flights.AIRLINE,
    F.when(F.col("AIRLINE") == 'OO', "yes").alias("airline_status"),
).show(5)

+--------------+-------------------+-------+--------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|AIRLINE|airline_status|
+--------------+-------------------+-------+--------------+
|           ANC|                SEA|     AS|          NULL|
|           LAX|                PBI|     AA|          NULL|
|           SFO|                CLT|     US|          NULL|
|           LAX|                MIA|     AA|          NULL|
|           SEA|                ANC|     AS|          NULL|
+--------------+-------------------+-------+--------------+
only showing top 5 rows



In [None]:
# Chained when(): "yes" where AIRLINE is 'OO', "no" where it is any other value.
flights.select(
    flights.ORIGIN_AIRPORT,
    flights.DESTINATION_AIRPORT,
    flights.AIRLINE,
    F.when(F.col("AIRLINE") == 'OO', "yes")
     .when(F.col("AIRLINE") != 'OO', "no")
     .alias("airline_status"),
).show(5)

+--------------+-------------------+-------+--------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|AIRLINE|airline_status|
+--------------+-------------------+-------+--------------+
|           ANC|                SEA|     AS|            no|
|           LAX|                PBI|     AA|            no|
|           SFO|                CLT|     US|            no|
|           LAX|                MIA|     AA|            no|
|           SEA|                ANC|     AS|            no|
+--------------+-------------------+-------+--------------+
only showing top 5 rows



In [None]:
# otherwise() supplies the fallback value: "yes" for airline 'OO', "no" for
# everything else. Sorting descending on the flag lists the "yes" rows first.
(
    flights
    .select(
        flights.ORIGIN_AIRPORT,
        flights.DESTINATION_AIRPORT,
        flights.AIRLINE,
        F.when(F.col("AIRLINE") == 'OO', "yes").otherwise("no").alias("airline_status"),
    )
    .orderBy("airline_status", ascending=False)
    .show(5)
)

+--------------+-------------------+-------+--------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|AIRLINE|airline_status|
+--------------+-------------------+-------+--------------+
|           PSC|                SLC|     OO|           yes|
|           ONT|                SFO|     OO|           yes|
|           MTJ|                DEN|     OO|           yes|
|           BJI|                MSP|     OO|           yes|
|           ICT|                ORD|     OO|           yes|
+--------------+-------------------+-------+--------------+
only showing top 5 rows



In [None]:
# Shortest flight out of PDX by distance: an empty groupby() aggregates over
# all of the filtered rows.
(
    flights
    .filter(F.col("origin_airport") == 'PDX')
    .groupby()
    .min("distance")
    .show(5)
)

+-------------+
|min(distance)|
+-------------+
|          129|
+-------------+



In [None]:
# Longest flight out of SEA by distance.
(
    flights
    .filter(F.col("origin_airport") == 'SEA')
    .groupby()
    .max("distance")
    .show()
)

+-------------+
|max(distance)|
+-------------+
|         2724|
+-------------+



In [None]:
# Average air time for each origin airport.
flights.groupby("origin_airport").agg(F.avg("air_time")).show(5)

+--------------+------------------+
|origin_airport|     avg(air_time)|
+--------------+------------------+
|           PSE|184.58333333333334|
|           INL|40.833333333333336|
|           MSY|103.88169014084507|
|           PPG|             299.0|
|           GEG| 86.12359550561797|
+--------------+------------------+
only showing top 5 rows



In [None]:
# Average air time for each (origin, destination) pair.
_route_keys = ["origin_airport", "destination_airport"]
flights.groupby(*_route_keys).agg(F.avg("air_time")).show(5)

+--------------+-------------------+------------------+
|origin_airport|destination_airport|     avg(air_time)|
+--------------+-------------------+------------------+
|           BQN|                MCO|           150.875|
|           PHL|                MCO|129.22916666666666|
|           MCI|                IAH|            111.75|
|           SPI|                ORD|36.333333333333336|
|           SNA|                PHX| 54.02777777777778|
+--------------+-------------------+------------------+
only showing top 5 rows



In [None]:
# The same aggregation written with the dict form of agg() ({column: function}).
(
    flights
    .groupby("origin_airport", "destination_airport")
    .agg({"air_time": 'mean'})
    .show(5)
)

+--------------+-------------------+------------------+
|origin_airport|destination_airport|     avg(air_time)|
+--------------+-------------------+------------------+
|           BQN|                MCO|           150.875|
|           PHL|                MCO|129.22916666666666|
|           MCI|                IAH|            111.75|
|           SPI|                ORD|36.333333333333336|
|           SNA|                PHX| 54.02777777777778|
+--------------+-------------------+------------------+
only showing top 5 rows



In [None]:
# rename IATA_CODE column in airports table to origin_airport (the key used for the join with flights)
airports = airports.withColumnRenamed("IATA_CODE", "origin_airport")

In [None]:
# Join each flight to the details of its origin airport.
# Spark resolves column names case-insensitively by default, so keeping both
# join keys would leave two columns that answer to "origin_airport" and make
# later references to that name ambiguous. Drop the copy coming from airports.
flight_airports = flights.join(
    airports,
    flights["ORIGIN_AIRPORT"] == airports["origin_airport"],
).drop(airports["origin_airport"])
flight_airports.show(2)

+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+-----------------+--------------------+--------------+--------------------+-----------+-----+-------+--------+----------+
|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|     duration_hrs|cancellation_reasons|origin_airport|             AIRPORT|       CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+-----+------+----+-----------+-------+-------------+-----------+--------------+-------------------+------------------

In [None]:
# rename IATA_CODE column in airports table to airline
airlines = airlines.withColumnRenamed("IATA_CODE", "airline")

# join flight_airports dataframe to airports dataframe
flight_airports_airlines = flight_airports.join(airlines, on = "airline", how = "leftouter")
print(flight_airports_airlines.show(2))

+-------+-----+------+----+-----------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+-----------------+--------------------+--------------+--------------------+-----------+-----+-------+--------+----------+--------------------+
|AIRLINE|YEAR_|MONTH_|DAY_|DAY_OF_WEEK|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|     duration_hrs|cancellation_reasons|origin_airport|             AIRPORT|       CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|             AIRLINE|
+-------+-----+------+----+-----------+-------------+-----------+-----------

In [None]:
start_time = time.time()

# Mark the de-duplicated flights_df for caching, then time a first count().
flights_df = flights_df.distinct().cache()
_first_count = flights_df.count()
_first_elapsed = time.time() - start_time
print("Counting %d rows took %f seconds" % (_first_count, _first_elapsed))


# Time a second count() on the same DataFrame for comparison.
start_time = time.time()
_second_count = flights_df.count()
_second_elapsed = time.time() - start_time
print("Counting %d rows took %f seconds" % (_second_count, _second_elapsed))

Counting 313 rows took 10.186875 seconds
Counting 313 rows took 3.011356 seconds


In [None]:
# The is_cached attribute tells whether a DataFrame is currently marked as cached.
print('Is flights_df cached? : %s' % flights_df.is_cached)

Is flights_df cached? : True


In [None]:
# unpersist() removes the DataFrame from the cache; check the flag again afterwards.
flights_df.unpersist()

print('Is flights_df cached? : %s' % (flights_df.is_cached,))

Is flights_df cached? : False
