In [20]:
import pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import functions as func
from pyspark.sql.types import StringType,FloatType
import nltk
from pyspark.sql import SparkSession


In [22]:
# reading in flight data csv
spark = SparkSession.builder.master('local[*]').appName('FlightAnalysis').getOrCreate()
combinedFlightsDF = spark.read.options(header = True, inferSchema = True).csv("Combined_Flights_2022.csv")

                                                                                

In [23]:
# drops unnecessary columns
drop_cols = ("FlightDate","Origin","Dest","Diverted","DepDelayMinutes","ArrDelayMinutes","IATA_Code_Operating_Airline","DestAirportSeqID","DestCityMarketID","DOT_ID_Operating_Airline","DOT_ID_Marketing_Airline","IATA_Code_Marketing_Airline","Operated_or_Branded_Code_Share_Partners","CRSDepTime","DepDelay","AirTime","CRSElapsedTime", "Distance", "Marketing_Airline_Network", "Flight_Number_Marketing_Airline", "Operating_Airline", "Tail_Number", "Flight_Number_Operating_Airline", "OriginAirportID",  "OriginAirportSeqID", "OriginCityMarketID",  "OriginState",  "OriginStateFips", "OriginWac", "DestAirportID", "DestState", "DestStateFips", "DestStateName", "DestWac", "DepartureDelayGroups", "DepTimeBlk", "TaxiOut", "WheelsOff", "WheelsOn", "TaxiIn", "CRSArrTime", "ArrDelay", "ArrivalDelayGroups", "ArrTimeBlk", "DistanceGroup", "DivAirportLandings")
combinedFlightsDF = combinedFlightsDF.drop(*drop_cols)

In [70]:
combinedFlightsDF.show(5)

+---------+---------+-------+-------+-----------------+----+-------+-----+----------+---------+---------------+---------------+------------+--------+--------+
|  Airline|Cancelled|DepTime|ArrTime|ActualElapsedTime|Year|Quarter|Month|DayofMonth|DayOfWeek| OriginCityName|OriginStateName|DestCityName|DepDel15|ArrDel15|
+---------+---------+-------+-------+-----------------+----+-------+-----+----------+---------+---------------+---------------+------------+--------+--------+
|Envoy Air|    false| 1209.0| 1350.0|            101.0|2019|      2|    4|         1|        1|Little Rock, AR|       Arkansas| Chicago, IL|     0.0|     0.0|
|Envoy Air|    false| 1200.0| 1348.0|            108.0|2019|      2|    4|         2|        2|Little Rock, AR|       Arkansas| Chicago, IL|     0.0|     0.0|
|Envoy Air|    false| 1203.0| 1342.0|             99.0|2019|      2|    4|         3|        3|Little Rock, AR|       Arkansas| Chicago, IL|     0.0|     0.0|
|Envoy Air|    false| 1435.0| 1621.0|         

In [24]:
searchBarDropCols = ("Cancelled","Quarter","DayOfWeek")
searchBarDF = combinedFlightsDF.drop(*searchBarDropCols)

In [6]:
searchBarDF.show(5)

+---------+-------+-------+-----------------+----+-----+----------+---------------+------------+--------+--------+
|  Airline|DepTime|ArrTime|ActualElapsedTime|Year|Month|DayofMonth| OriginCityName|DestCityName|DepDel15|ArrDel15|
+---------+-------+-------+-----------------+----+-----+----------+---------------+------------+--------+--------+
|Envoy Air| 1209.0| 1350.0|            101.0|2019|    4|         1|Little Rock, AR| Chicago, IL|     0.0|     0.0|
|Envoy Air| 1200.0| 1348.0|            108.0|2019|    4|         2|Little Rock, AR| Chicago, IL|     0.0|     0.0|
|Envoy Air| 1203.0| 1342.0|             99.0|2019|    4|         3|Little Rock, AR| Chicago, IL|     0.0|     0.0|
|Envoy Air| 1435.0| 1621.0|            106.0|2019|    4|         4|Little Rock, AR| Chicago, IL|     1.0|     1.0|
|Envoy Air| 1216.0| 1410.0|            114.0|2019|    4|         5|Little Rock, AR| Chicago, IL|     0.0|     0.0|
+---------+-------+-------+-----------------+----+-----+----------+-------------

In [25]:
#Creates csv with specified data (DONT RUN UNLESS NEEDED)
combinedFlightsDF.write.option("header",True).csv("2022FlightsSearchBar")

                                                                                

In [82]:
#dont need to run
# Find # of flights per year
yearCount = combinedFlightsDF.groupBy("Year").count()
yearCount.show()



+----+-------+
|Year|  count|
+----+-------+
|2019|8091684|
|2018|5689521|
|2022|4078327|
|2020|5022397|
|2021|6311871|
+----+-------+



                                                                                

In [26]:
# creating new df to analyze delayed flights by airline, orgin city, and month/year
drop_cols = ("DestCityName", "DayOfMonth", "DepTime", "ArrTime", "ActualElapsedTime")
delayDF = combinedFlightsDF.drop(*drop_cols)
delayDF.show(5)

+--------------------+---------+----+-------+-----+---------+--------------------+---------------+--------+--------+
|             Airline|Cancelled|Year|Quarter|Month|DayOfWeek|      OriginCityName|OriginStateName|DepDel15|ArrDel15|
+--------------------+---------+----+-------+-----+---------+--------------------+---------------+--------+--------+
|Commutair Aka Cha...|    false|2022|      2|    4|        1|  Grand Junction, CO|       Colorado|     0.0|     0.0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|Harlingen/San Ben...|          Texas|     0.0|     0.0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Durango, CO|       Colorado|     0.0|     0.0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Houston, TX|          Texas|     0.0|     0.0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Durango, CO|       Colorado|     0.0|     0.0|
+--------------------+---------+----+-------+-----+---------+---

In [None]:
delayDF.columns

In [27]:
# uses bitwise or operator to find overall delayed flights
delayDF = delayDF.withColumn("ifDelayed", func.when((delayDF.ArrDel15 > 0) | (delayDF.DepDel15 > 0), 1).otherwise(0))
delayDF.show(10)

+--------------------+---------+----+-------+-----+---------+--------------------+---------------+--------+--------+---------+
|             Airline|Cancelled|Year|Quarter|Month|DayOfWeek|      OriginCityName|OriginStateName|DepDel15|ArrDel15|ifDelayed|
+--------------------+---------+----+-------+-----+---------+--------------------+---------------+--------+--------+---------+
|Commutair Aka Cha...|    false|2022|      2|    4|        1|  Grand Junction, CO|       Colorado|     0.0|     0.0|        0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|Harlingen/San Ben...|          Texas|     0.0|     0.0|        0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Durango, CO|       Colorado|     0.0|     0.0|        0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Houston, TX|          Texas|     0.0|     0.0|        0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Durango, CO|       Colorado|     0.0|    

In [28]:
#create column making cancelled into bool (integer) value
delayDF = delayDF.withColumn('ifCancelled', func.when(delayDF.Cancelled == 'false', 0).otherwise(1))
delayDF.show(10)

+--------------------+---------+----+-------+-----+---------+--------------------+---------------+--------+--------+---------+-----------+
|             Airline|Cancelled|Year|Quarter|Month|DayOfWeek|      OriginCityName|OriginStateName|DepDel15|ArrDel15|ifDelayed|ifCancelled|
+--------------------+---------+----+-------+-----+---------+--------------------+---------------+--------+--------+---------+-----------+
|Commutair Aka Cha...|    false|2022|      2|    4|        1|  Grand Junction, CO|       Colorado|     0.0|     0.0|        0|          0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|Harlingen/San Ben...|          Texas|     0.0|     0.0|        0|          0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Durango, CO|       Colorado|     0.0|     0.0|        0|          0|
|Commutair Aka Cha...|    false|2022|      2|    4|        1|         Houston, TX|          Texas|     0.0|     0.0|        0|          0|
|Commutair Aka Cha...|    f

In [37]:
# groups delayed flights by airline
numDelayedByAirline = delayDF.groupBy("Airline").agg({"ifDelayed" : "sum"})
numDelayedByAirline = numDelayedByAirline.withColumnRenamed("sum(ifDelayed)", "2022sumDelayedByAirline") #CHANGED
numDelayedByAirline.show(30,truncate=False)

# to check if delayed flight per airline add up to original amount
# sumdf = numDelayed.agg({"sum(ifDelayed)" : "sum"})
# sumdf.show()



+-----------------------------------------+-----------------------+
|Airline                                  |2022sumDelayedByAirline|
+-----------------------------------------+-----------------------+
|GoJet Airlines, LLC d/b/a United Express |9614                   |
|Endeavor Air Inc.                        |25708                  |
|Allegiant Air                            |27412                  |
|SkyWest Airlines Inc.                    |85982                  |
|Air Wisconsin Airlines Corp              |7243                   |
|Frontier Airlines Inc.                   |31234                  |
|Southwest Airlines Co.                   |230921                 |
|JetBlue Airways                          |59127                  |
|Commutair Aka Champlain Enterprises, Inc.|9541                   |
|Capital Cargo International              |8914                   |
|Envoy Air                                |29116                  |
|Hawaiian Airlines Inc.                   |8577 

                                                                                

In [34]:
#total flights for each of the 28 airlines
airlineRank = combinedFlightsDF.groupBy("Airline").count() #28 airlines
airlineRank = airlineRank.withColumnRenamed("count", "2022flightCountByAirline") #CHANGED

In [15]:
airlineRank.show()



+--------------------+--------------------+
|             Airline|flightCountByAirline|
+--------------------+--------------------+
|   Endeavor Air Inc.|              998224|
|United Air Lines ...|             2354538|
|    Compass Airlines|              154985|
|         Comair Inc.|              957220|
|Southwest Airline...|             5474339|
|ExpressJet Airlin...|              353669|
|     JetBlue Airways|             1106079|
|Empire Airlines Inc.|               23122|
|           Envoy Air|             1072778|
|Capital Cargo Int...|              392011|
|Hawaiian Airlines...|              310782|
|  Mesa Airlines Inc.|              749216|
|American Airlines...|             3134117|
|   Republic Airlines|             1283704|
|    Spirit Air Lines|              836694|
|GoJet Airlines, L...|              276486|
|       Allegiant Air|              489400|
|SkyWest Airlines ...|             3159683|
|         Horizon Air|              471153|
|Air Wisconsin Air...|          

                                                                                

In [39]:
#1 add a new column to show ratio of delayed flights for each airline
numDelayedByAirline = numDelayedByAirline\
    .join(airlineRank, "Airline")\
    .withColumn("2022delayedRatio", ((func.col("2022SumDelayedByAirline") / func.col("2022flightCountByAirline")) * 100))\
    .drop("count")
#CHANGE

In [40]:
#1
numDelayedByAirline.show(10)

                                                                                

+--------------------+-----------------------+------------------------+------------------+
|             Airline|2022sumDelayedByAirline|2022flightCountByAirline|  2022delayedRatio|
+--------------------+-----------------------+------------------------+------------------+
|GoJet Airlines, L...|                   9614|                   34793|27.631994941511223|
|   Endeavor Air Inc.|                  25708|                  142624|18.025016827462416|
|       Allegiant Air|                  27412|                   73504| 37.29320853286896|
|SkyWest Airlines ...|                  85982|                  440807|19.505588613610943|
|Air Wisconsin Air...|                   7243|                   40300|17.972704714640198|
|Frontier Airlines...|                  31234|                   86557|36.084892036461525|
|Southwest Airline...|                 230921|                  731925|31.549817262697683|
|     JetBlue Airways|                  59127|                  156793| 37.71022941075176|

In [41]:
#Creates csv with specified data (DONT RUN UNLESS NEEDED)
combinedFlightsDF.write.option("header",True).csv("2022numDelayedByAirline")

                                                                                

In [46]:
#total flights for each of the origin cities
cityRank = combinedFlightsDF.groupBy("OriginCityName").count() 
cityRank = cityRank.withColumnRenamed("count", "2022totalFlightsByCity") #CHANGE

In [47]:
# groups delayed flights by city
numDelayedByCity = delayDF.groupBy("OriginCityName").agg({"ifDelayed" : "sum"})
numDelayedByCity = numDelayedByCity.withColumnRenamed("sum(ifDelayed)", "2022sumDelayedByCity") #CHANGE
numDelayedByCity.show(30,truncate=False)



+----------------------------+--------------------+
|OriginCityName              |2022sumDelayedByCity|
+----------------------------+--------------------+
|Gainesville, FL             |443                 |
|Richmond, VA                |2987                |
|Ontario, CA                 |2742                |
|Tucson, AZ                  |1618                |
|Pago Pago, TT               |16                  |
|Myrtle Beach, SC            |2149                |
|Medford, OR                 |729                 |
|Palm Springs, CA            |1865                |
|Durango, CO                 |479                 |
|St. Cloud, MN               |34                  |
|Mobile, AL                  |617                 |
|Corpus Christi, TX          |468                 |
|Huntsville, AL              |1066                |
|Pensacola, FL               |1510                |
|Fort Myers, FL              |7534                |
|Columbus, GA                |230                 |
|Springfield

                                                                                

In [49]:
#2 add a new column to show ratio of delayed flights for each airline
numDelayedByCity = numDelayedByCity\
    .join(cityRank, "OriginCityName")\
    .withColumn("2022delayedRatio", ((func.col("2022SumDelayedByCity") / func.col("2022totalFlightsByCity")) * 100))\
    .drop("count")
#CHANGE

In [50]:
#2
numDelayedByCity.show(5)

                                                                                

+---------------+--------------------+----------------------+------------------+
| OriginCityName|2022sumDelayedByCity|2022totalFlightsByCity|  2022delayedRatio|
+---------------+--------------------+----------------------+------------------+
|Gainesville, FL|                 443|                  2193| 20.20063839489284|
|   Richmond, VA|                2987|                 13977|21.370823495743007|
|    Ontario, CA|                2742|                 13418| 20.43523624981368|
|     Tucson, AZ|                1618|                  9546|16.949507647182067|
|  Pago Pago, TT|                  16|                    25|              64.0|
+---------------+--------------------+----------------------+------------------+
only showing top 5 rows



In [51]:
#Creates csv with specified data (DONT RUN UNLESS NEEDED)
combinedFlightsDF.write.option("header",True).csv("2022numDelayedByCity") #CHANGE

                                                                                

In [58]:
#Count total # of flights by Quarter and Origin State
timeLocationCancelledDF = delayDF.groupBy("Quarter","OriginStateName").count()
timeLocationCancelledDF = timeLocationCancelledDF.withColumnRenamed("count", "2022totalFlights") #CHANGE
timeLocationCancelledDF.show()




+-------+-------------------+----------------+
|Quarter|    OriginStateName|2022totalFlights|
+-------+-------------------+----------------+
|      2|               Ohio|           24548|
|      2|      Massachusetts|           36848|
|      2|         New Mexico|            6278|
|      2|             Hawaii|           30792|
|      2|            Vermont|            2535|
|      2|             Nevada|           47811|
|      2|          Louisiana|           17803|
|      2|             Kansas|            3905|
|      2|           Oklahoma|            9872|
|      2|            Wyoming|            1750|
|      2|         California|          187240|
|      2|              Texas|          187306|
|      2|U.S. Virgin Islands|            1846|
|      2|            Florida|          151702|
|      2|           Arkansas|            7017|
|      2|        Puerto Rico|            8771|
|      2|     South Carolina|           18536|
|      2|        Connecticut|            5999|
|      2|    

                                                                                

In [59]:
#Count cancelled flight based on quarter and origin state name
timeLocationFlightsDF = delayDF.groupBy("Quarter","OriginStateName").agg({"ifCancelled" : "sum"})
timeLocationFlightsDF = timeLocationFlightsDF.withColumnRenamed("sum(ifCancelled)", "2022cancelledFlights") #CHANGE
timeLocationFlightsDF.show()



+-------+-------------------+--------------------+
|Quarter|    OriginStateName|2022cancelledFlights|
+-------+-------------------+--------------------+
|      2|               Ohio|                 742|
|      2|      Massachusetts|                1277|
|      2|         New Mexico|                  65|
|      2|             Hawaii|                 233|
|      2|            Vermont|                 114|
|      2|             Nevada|                 763|
|      2|          Louisiana|                 358|
|      2|             Kansas|                  50|
|      2|           Oklahoma|                 141|
|      2|            Wyoming|                  46|
|      2|         California|                2432|
|      2|              Texas|                3482|
|      2|U.S. Virgin Islands|                  29|
|      2|            Florida|                5402|
|      2|           Arkansas|                  89|
|      2|        Puerto Rico|                 272|
|      2|     South Carolina|  

                                                                                

In [60]:
#join previous dataframes
timeLocationDF = timeLocationFlightsDF.join(timeLocationCancelledDF, ["Quarter", "OriginStateName"])

In [61]:
#calculate ration of cancelled flights / total flights based on quarter + origin state
timeLocationDF = timeLocationDF.withColumn("2022timeLocationCancellationRatio", ((func.col("2022cancelledFlights") / func.col("2022totalFlights")) * 100))
#CHANGE

In [62]:
# Cancelation based on Time + Location
timeLocationDF.show()

                                                                                

+-------+-------------------+--------------------+----------------+---------------------------------+
|Quarter|    OriginStateName|2022cancelledFlights|2022totalFlights|2022timeLocationCancellationRatio|
+-------+-------------------+--------------------+----------------+---------------------------------+
|      2|               Ohio|                 742|           24548|                3.022649503014502|
|      2|      Massachusetts|                1277|           36848|               3.4655883630047764|
|      2|         New Mexico|                  65|            6278|               1.0353615801210576|
|      2|             Hawaii|                 233|           30792|                0.756690049363471|
|      2|            Vermont|                 114|            2535|                4.497041420118343|
|      2|             Nevada|                 763|           47811|               1.5958670598816171|
|      2|          Louisiana|                 358|           17803|               

In [63]:
#3 Creates csv with specified data (DONT RUN UNLESS NEEDED)
combinedFlightsDF.write.option("header",True).csv("2022timeLocationCancellation") #CHANGE

                                                                                

In [65]:
#4 delayed flights
delayedFlights = delayDF.groupBy("ifDelayed").count()
delayedFlights = delayedFlights.withColumnRenamed("count", "2022count") #CHANGE
delayedFlights.show() #1 - Delayed, 0 - Not delayed



+---------+---------+
|ifDelayed|2022count|
+---------+---------+
|        1|  1032521|
|        0|  3045797|
+---------+---------+



                                                                                

In [66]:
#Creates csv with specified data (DONT RUN UNLESS NEEDED)
combinedFlightsDF.write.option("header",True).csv("2022delayedFlights") #CHANGE

                                                                                

In [67]:
#5 cancelled flights
cancelledFlights = combinedFlightsDF.groupBy("Cancelled").count()
cancelledFlights = cancelledFlights.withColumnRenamed("count", "2022count") #CHANGE
cancelledFlights.show()



+---------+---------+
|Cancelled|2022count|
+---------+---------+
|     true|   123192|
|    false|  3955126|
+---------+---------+



                                                                                

In [68]:
#Creates csv with specified data (DONT RUN UNLESS NEEDED)
combinedFlightsDF.write.option("header",True).csv("2022cancelledFlights") #CHANGE

                                                                                

In [69]:
#categorize flights by date
dayOfWeekRank = combinedFlightsDF.groupBy("DayOfWeek").count()
dayOfWeekRank = dayOfWeekRank.withColumnRenamed("count", "2022totalFlightsOnDay") #CHANGE

In [70]:
dayOfWeekRank.show()



+---------+---------------------+
|DayOfWeek|2022totalFlightsOnDay|
+---------+---------------------+
|        1|               600370|
|        6|               535618|
|        3|               566951|
|        5|               605385|
|        4|               603939|
|        7|               609250|
|        2|               556805|
+---------+---------------------+



                                                                                

In [71]:
# group delayed flights by day of the week
delayedFlightByDay = delayDF.groupBy("DayOfWeek").agg({"ifDelayed" : "sum"})
delayedFlightByDay = delayedFlightByDay.withColumnRenamed("sum(ifDelayed)", "2022sumDelayedByWeekDay") #TODO NO RENAMING
delayedFlightByDay.show()



+---------+-----------------------+
|DayOfWeek|2022sumDelayedByWeekDay|
+---------+-----------------------+
|        1|                 150474|
|        6|                 141345|
|        3|                 123488|
|        5|                 174882|
|        4|                 156528|
|        7|                 167959|
|        2|                 117845|
+---------+-----------------------+



                                                                                

In [72]:
# group cancelled flights by day of the week
cancelledFlightByDay = delayDF.groupBy("DayOfWeek").agg({"ifCancelled" : "sum"})
cancelledFlightByDay = cancelledFlightByDay.withColumnRenamed("sum(ifCancelled)", "2022sumCancelledByWeekDay") 
cancelledFlightByDay.show()



+---------+-------------------------+
|DayOfWeek|2022sumCancelledByWeekDay|
+---------+-------------------------+
|        1|                    16622|
|        6|                    19799|
|        3|                    12902|
|        5|                    21914|
|        4|                    22793|
|        7|                    20669|
|        2|                     8493|
+---------+-------------------------+



                                                                                

In [73]:
analysisByWeekDay = dayOfWeekRank.join(delayedFlightByDay, ["DayOfWeek"]).join(cancelledFlightByDay, ["DayOfWeek"])

In [75]:
analysisByWeekDay = analysisByWeekDay.withColumn("2022CancelledRatio", (( func.col("2022sumCancelledByWeekDay") / func.col("2022totalFlightsOnDay")) * 100))

In [76]:
analysisByWeekDay = analysisByWeekDay.withColumn("2022DelayedRatio", ((func.col("2022sumDelayedByWeekDay") / func.col("2022totalFlightsOnDay")) * 100))

In [77]:
analysisByWeekDay = analysisByWeekDay.withColumn("2022Delayed+CancelledRatio", (((func.col("2022sumDelayedByWeekDay") + func.col("2022sumCancelledByWeekDay")) / func.col("2022totalFlightsOnDay")) * 100))

In [78]:
#6 Days of the week analysis
analysisByWeekDay.show()

                                                                                

+---------+---------------------+-----------------------+-------------------------+------------------+------------------+--------------------------+
|DayOfWeek|2022totalFlightsOnDay|2022sumDelayedByWeekDay|2022sumCancelledByWeekDay|2022CancelledRatio|  2022DelayedRatio|2022Delayed+CancelledRatio|
+---------+---------------------+-----------------------+-------------------------+------------------+------------------+--------------------------+
|        1|               600370|                 150474|                    16622| 2.768626013958059| 25.06354414777554|          27.8321701617336|
|        6|               535618|                 141345|                    19799|  3.69647771359439| 26.38914300863675|        30.085620722231145|
|        3|               566951|                 123488|                    12902| 2.275681672666597|21.781071027302183|         24.05675269996878|
|        5|               605385|                 174882|                    21914|3.6198452224617395|28.8

In [79]:
#Creates csv with specified data (DONT RUN UNLESS NEEDED)
combinedFlightsDF.write.option("header",True).csv("2022analysisByWeekDay")

                                                                                