#INDEX
##Dataset import and conversion to table
1)[Load Data](#notebook/2590900347992364/command/2590900347992366)  
2)[Export to Table](#notebook/2590900347992364/command/2590900347992369)
##Data Aggregation and preparation
1) [Data Documentation](#notebook/1041232494344796/command/1041232494344797)  
2) [Load data and prepare data](#notebook/1041232494344796/command/300252100231806)  
3) [Export data to tables](#notebook/1041232494344796/command/300252100231807)  
##Script to get airports coordinates
[Script](#notebook/1041232494344773/command/300252100231796)
##Data analysis
1) [Load data](#notebook/1041232494344861/command/1041232494344867)  
2) [Plot with the number of flights per airport](#notebook/1041232494344861/command/1041232494344881)  
3) [Plot DataFrame with the highest and lowest numbers of flights per month](#notebook/1041232494344861/command/300252100231802)  
4) [Plot the number of canceled flights, delayed flights and average delay per state](#notebook/1041232494344861/command/1041232494344884)  
5) [Plot the rate of canceled flights and delayed flights per state](#notebook/1041232494344861/command/2740486504779300)  
6) [Network from the routes](#notebook/1041232494344861/command/1041232494344908)  
7) [Map the network for the 350 routes with the most flights between 2017 and 2020](#notebook/1041232494344861/command/1041232494344988)
##App to find the shortest path between two airports
1) [Load Data](#notebook/1646276694320946/command/1646276694320998)  
2) [Shortest path between two airports](#notebook/1646276694320946/command/1646276694320982)

##1) DATA DOCUMENTATION

FL_DATE = Flight Date (yyyy-mm-dd)  
OP_UNIQUE_CARRIER = Carrier ID  
ORIGIN_AIRPORT_ID = Origin Airport ID. An identification number assigned by US DOT to identify a unique airport.  
ORIGIN = Origin Airport  
ORIGIN_CITY_NAME = Origin Airport city name  
ORIGIN_STATE_ABR = Origin Airport state abbreviation  
ORIGIN_STATE_NM = Origin Airport state name  
DEST_AIRPORT_ID = Destination Airport ID. An identification number assigned by US DOT to identify a unique airport.  
DEST = Destination Airport  
DEST_CITY_NAME = Destination Airport city name  
DEST_STATE_ABR = Destination Airport state abbreviation  
DEST_STATE_NM = Destination Airport state name  
CRS_DEP_TIME = Scheduled departure time (local time: hhmm)  
DEP_TIME = Actual departure time (local time: hhmm)  
DEP_DELAY = Departure delay in minutes (DEP_TIME - CRS_DEP_TIME); early departures show negative numbers.  
WHEELS_OFF = Wheels Off Time (local time: hhmm)  
WHEELS_ON = Wheels On Time (local time: hhmm)  
CRS_ARR_TIME = Scheduled arrival time (local time: hhmm)  
ARR_TIME = Actual arrival time (local time: hhmm)  
ARR_DELAY = Arrival delay in minutes (ARR_TIME - CRS_ARR_TIME); early arrivals show negative numbers.  
CANCELLED = Cancelled Flight Indicator (1=Yes)  
CANCELLATION_CODE = Specifies the Reason for Cancellation  
CRS_ELAPSED_TIME = Scheduled Elapsed Time of Flight, in Minutes  
ACTUAL_ELAPSED_TIME = Actual Elapsed Time of Flight, in Minutes  
AIR_TIME = Flight Time, in Minutes  
DISTANCE = Distance between airports (miles)  
CARRIER_DELAY = Carrier Delay, in Minutes  
WEATHER_DELAY = Weather Delay, in Minutes  
NAS_DELAY = National Air System Delay, in Minutes  
SECURITY_DELAY = Security Delay, in Minutes  
LATE_AIRCRAFT_DELAY = Late Aircraft Delay, in Minutes
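
The time columns hold clock values in hhmm form while the delay columns are in minutes, so subtracting two hhmm values directly does not always give the delay. The sketch below is an illustration only: the helper names `hhmm_to_minutes` and `delay_minutes` are hypothetical, and it assumes that the true delay lies within 12 hours of the scheduled time so that flights crossing midnight can be handled.

```python
def hhmm_to_minutes(hhmm):
    """Convert an hhmm clock value (e.g. 1233 or 1233.0) to minutes after midnight."""
    hhmm = int(hhmm)
    return (hhmm // 100) * 60 + hhmm % 100


def delay_minutes(scheduled_hhmm, actual_hhmm):
    """Signed delay in minutes, assuming the true delay is within +/- 12 hours."""
    diff = hhmm_to_minutes(actual_hhmm) - hhmm_to_minutes(scheduled_hhmm)
    # Wrap differences that cross midnight into the range [-720, 720)
    return (diff + 720) % 1440 - 720


# Values taken from sample rows shown later in this notebook
print(delay_minutes(1130, 1233))  # TPA -> BWI departure, DEP_DELAY is 63.0
print(delay_minutes(11, 2354))    # FLL -> DTW arrival, ARR_DELAY is -17.0
```

Both calls reproduce the delay stored in the dataset (63 and -17), whereas the raw hhmm differences would be 103 and 2343.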

In [0]:
from pyspark.sql.types import DateType
from datetime import datetime
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql import SparkSession

## 2) Load data and prepare data

In [0]:
df = spark.table("complete_dataset")
display(df.select("*"))

FL_DATE,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_NM,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_NM,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,WHEELS_OFF,WHEELS_ON,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c31
2019-07-01,NK,10158,ACY,"Atlantic City, NJ",NJ,New Jersey,13577,MYR,"Myrtle Beach, SC",SC,South Carolina,725,717.0,-8.0,729.0,831.0,850,838.0,-12.0,0.0,,85.0,81.0,62.0,466.0,,,,,,
2019-07-01,NK,11697,FLL,"Fort Lauderdale, FL",FL,Florida,11433,DTW,"Detroit, MI",MI,Michigan,2110,2104.0,-6.0,2118.0,2345.0,11,2354.0,-17.0,0.0,,181.0,170.0,147.0,1127.0,,,,,,
2019-07-01,NK,10821,BWI,"Baltimore, MD",MD,Maryland,13487,MSP,"Minneapolis, MN",MN,Minnesota,1150,1156.0,6.0,1204.0,1316.0,1328,1319.0,-9.0,0.0,,158.0,143.0,132.0,936.0,,,,,,
2019-07-01,NK,13487,MSP,"Minneapolis, MN",MN,Minnesota,11292,DEN,"Denver, CO",CO,Colorado,1425,1415.0,-10.0,1427.0,1504.0,1533,1514.0,-19.0,0.0,,128.0,119.0,97.0,680.0,,,,,,
2019-07-01,NK,11292,DEN,"Denver, CO",CO,Colorado,13487,MSP,"Minneapolis, MN",MN,Minnesota,1626,1623.0,-3.0,1637.0,1913.0,1924,1918.0,-6.0,0.0,,118.0,115.0,96.0,680.0,,,,,,
2019-07-01,NK,15304,TPA,"Tampa, FL",FL,Florida,10821,BWI,"Baltimore, MD",MD,Maryland,1130,1233.0,63.0,1245.0,1434.0,1349,1446.0,57.0,0.0,,139.0,133.0,109.0,842.0,8.0,0.0,49.0,0.0,0.0,
2019-07-01,NK,10821,BWI,"Baltimore, MD",MD,Maryland,15304,TPA,"Tampa, FL",FL,Florida,820,923.0,63.0,939.0,1133.0,1040,1139.0,59.0,0.0,,140.0,136.0,114.0,842.0,0.0,0.0,59.0,0.0,0.0,
2019-07-01,NK,13204,MCO,"Orlando, FL",FL,Florida,10874,CAK,"Akron, OH",OH,Ohio,958,959.0,1.0,1009.0,1201.0,1219,1208.0,-11.0,0.0,,141.0,129.0,112.0,861.0,,,,,,
2019-07-01,NK,10874,CAK,"Akron, OH",OH,Ohio,13204,MCO,"Orlando, FL",FL,Florida,1310,1315.0,5.0,1323.0,1518.0,1532,1528.0,-4.0,0.0,,142.0,133.0,115.0,861.0,,,,,,
2019-07-01,NK,11433,DTW,"Detroit, MI",MI,Michigan,11697,FLL,"Fort Lauderdale, FL",FL,Florida,1911,1914.0,3.0,1923.0,2148.0,2206,2154.0,-12.0,0.0,,175.0,160.0,145.0,1127.0,,,,,,


In [0]:
#Number of Rows
df.count()

In [0]:
#Convert FL_DATE to DateType, sort the data by FL_DATE and drop the unnamed trailing column _c31
df = df.withColumn("FL_DATE", df["FL_DATE"].cast(DateType()))\
       .sort('FL_DATE').drop('_c31')

display(df.select("*"))

FL_DATE,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_NM,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_NM,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,WHEELS_OFF,WHEELS_ON,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
2017-01-01,AA,11298,DFW,"Dallas/Fort Worth, TX",TX,Texas,12266,IAH,"Houston, TX",TX,Texas,2025,2218.0,113.0,2231.0,2315.0,2136,2321.0,105.0,0.0,,71.0,63.0,44.0,224.0,10.0,0.0,0.0,0.0,95.0
2017-01-01,EV,10397,ATL,"Atlanta, GA",GA,Georgia,13577,MYR,"Myrtle Beach, SC",SC,South Carolina,940,936.0,-4.0,951.0,1036.0,1058,1040.0,-18.0,0.0,,78.0,64.0,45.0,317.0,,,,,
2017-01-01,AA,14027,PBI,"West Palm Beach/Palm Beach, FL",FL,Florida,13930,ORD,"Chicago, IL",IL,Illinois,1335,1331.0,-4.0,1346.0,1522.0,1550,1534.0,-16.0,0.0,,195.0,183.0,156.0,1144.0,,,,,
2017-01-01,AS,13871,OMA,"Omaha, NE",NE,Nebraska,14747,SEA,"Seattle, WA",WA,Washington,1710,,,,,1845,,,1.0,B,215.0,,,1368.0,,,,,
2017-01-01,EV,11057,CLT,"Charlotte, NC",NC,North Carolina,13487,MSP,"Minneapolis, MN",MN,Minnesota,1240,1315.0,35.0,1330.0,1452.0,1427,1455.0,28.0,0.0,,167.0,160.0,142.0,930.0,0.0,0.0,0.0,0.0,28.0
2017-01-01,AA,14107,PHX,"Phoenix, AZ",AZ,Arizona,12339,IND,"Indianapolis, IN",IN,Indiana,1955,2008.0,13.0,2024.0,112.0,112,117.0,5.0,0.0,,197.0,189.0,168.0,1488.0,,,,,
2017-01-01,AA,12266,IAH,"Houston, TX",TX,Texas,11298,DFW,"Dallas/Fort Worth, TX",TX,Texas,2000,2021.0,21.0,2034.0,2119.0,2115,2125.0,10.0,0.0,,75.0,64.0,45.0,224.0,,,,,
2017-01-01,AS,14908,SNA,"Santa Ana, CA",CA,California,14747,SEA,"Seattle, WA",WA,Washington,1310,,,,,1559,,,1.0,B,169.0,,,978.0,,,,,
2017-01-01,AS,14747,SEA,"Seattle, WA",WA,Washington,12266,IAH,"Houston, TX",TX,Texas,1030,1047.0,17.0,1135.0,1725.0,1645,1728.0,43.0,0.0,,255.0,281.0,230.0,1874.0,0.0,17.0,26.0,0.0,0.0
2017-01-01,EV,11298,DFW,"Dallas/Fort Worth, TX",TX,Texas,14952,SPI,"Springfield, IL",IL,Illinois,1330,1330.0,0.0,1336.0,1509.0,1514,1512.0,-2.0,0.0,,104.0,102.0,93.0,630.0,,,,,


In [0]:
# Count missing values
def count_missings(spark_df, sort=True):
    """
    Counts the number of nulls and NaNs in each non-string, non-date column.
    """
    df = spark_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c, c_type) in spark_df.dtypes if c_type not in ('string', 'date')]).toPandas()

    # The aggregate always yields exactly one row, so check the counts rather than len(df)
    if df.to_numpy().sum() == 0:
        print("There are no missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count", ascending=False)

    return df

count_missings(df)

Unnamed: 0,count
LATE_AIRCRAFT_DELAY,20795102
CARRIER_DELAY,20795102
SECURITY_DELAY,20795102
NAS_DELAY,20795102
WEATHER_DELAY,20795102
ARR_DELAY,674835
ACTUAL_ELAPSED_TIME,672237
AIR_TIME,672237
WHEELS_ON,623407
ARR_TIME,623404
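
`count_missings` tests each column with both `F.isnan` and `F.isnull` because a numeric column can hold either kind of missing marker. The plain-Python sketch below illustrates the same distinction without Spark; the function `count_missing` and the three rows are hypothetical and are not taken from the dataset.

```python
import math


def count_missing(rows, columns):
    """Count values that are None (like a Spark null) or a float NaN, per column."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts


# Hypothetical rows, for illustration only
rows = [
    {"ARR_DELAY": -12.0, "AIR_TIME": 62.0},
    {"ARR_DELAY": None, "AIR_TIME": float("nan")},
    {"ARR_DELAY": 57.0, "AIR_TIME": None},
]
print(count_missing(rows, ["ARR_DELAY", "AIR_TIME"]))
# {'ARR_DELAY': 1, 'AIR_TIME': 2}
```

Checking only for `None` would miss the NaN in `AIR_TIME`, which is why the two tests are combined.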


In [0]:
#When DEP_TIME is null, we assume it is the same as CRS_DEP_TIME.
#When ARR_TIME is null, we assume it is the same as CRS_ARR_TIME.
#When DEP_DELAY is null, we assume there is no delay (0).
#When ARR_DELAY is null, we assume there is no delay (0).

df = df.withColumn('DEP_TIME', F.coalesce('DEP_TIME', 'CRS_DEP_TIME'))\
      .withColumn('ARR_TIME', F.coalesce('ARR_TIME', 'CRS_ARR_TIME'))\
      .fillna(0, 'DEP_DELAY').fillna(0, 'ARR_DELAY')

count_missings(df)

Unnamed: 0,count
LATE_AIRCRAFT_DELAY,20795102
SECURITY_DELAY,20795102
NAS_DELAY,20795102
WEATHER_DELAY,20795102
CARRIER_DELAY,20795102
AIR_TIME,672237
ACTUAL_ELAPSED_TIME,672237
WHEELS_ON,623407
WHEELS_OFF,612478
CRS_ELAPSED_TIME,157


In [0]:
#Function to count rows with a condition
cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

In [0]:
#Aggregate df and convert it to a pandas DataFrame
flights_data = df.groupBy('ORIGIN_STATE_ABR','ORIGIN_AIRPORT_ID','FL_DATE').agg(
    F.count('FL_DATE').alias('TOTAL_FLIGHTS'),
    cnt_cond(F.col('CANCELLED') == 1).alias('CANCELLED'),
    cnt_cond(F.col('DEP_DELAY') > 0).alias('TOTAL_DEP_DELAY'),
    F.round(F.avg('DEP_DELAY'), 2).alias('AVG_DELAY'),
    cnt_cond((F.col('DEP_DELAY') < 15) & (F.col('DEP_DELAY') > 0)).alias('DEP_DELAY_LESS15'),
    cnt_cond(F.col('DEP_DELAY') >= 15).alias('DEP_DELAY_MORE15')).toPandas()
flights_data.head()

Unnamed: 0,ORIGIN_STATE_ABR,ORIGIN_AIRPORT_ID,FL_DATE,TOTAL_FLIGHTS,CANCELLED,TOTAL_DEP_DELAY,AVG_DELAY,DEP_DELAY_LESS15,DEP_DELAY_MORE15
0,PA,14100,2017-01-01,174,1,45,3.79,23,22
1,TX,11298,2017-01-01,476,6,210,17.51,83,127
2,FL,11697,2017-01-01,264,2,113,16.61,46,67
3,VA,11278,2017-01-01,158,0,31,2.18,15,16
4,IL,13930,2017-01-01,560,2,204,7.21,103,101
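
The `cnt_cond` helper counts rows by summing 1 for every row that meets a condition and 0 otherwise. The sketch below shows the same pattern in plain Python and checks a property that follows from the conditions used above: every flight with a positive delay falls into exactly one of the two buckets, so `TOTAL_DEP_DELAY` should equal `DEP_DELAY_LESS15 + DEP_DELAY_MORE15`. The name `count_if` and the list `delays` are hypothetical; the tuples in `sample` are copied from the five output rows above.

```python
def count_if(values, predicate):
    """Plain-Python analogue of cnt_cond: add 1 per matching value, 0 otherwise."""
    return sum(1 if predicate(v) else 0 for v in values)


# Hypothetical departure delays (minutes) for one airport on one day
delays = [-8.0, 0.0, 6.0, 14.0, 15.0, 63.0]

total_delayed = count_if(delays, lambda d: d > 0)   # delayed at all
less_15 = count_if(delays, lambda d: 0 < d < 15)    # delayed by under 15 minutes
more_15 = count_if(delays, lambda d: d >= 15)       # delayed by 15 minutes or more
print(total_delayed, less_15, more_15)  # 4 2 2

# (TOTAL_DEP_DELAY, DEP_DELAY_LESS15, DEP_DELAY_MORE15) from the output above
sample = [(45, 23, 22), (210, 83, 127), (113, 46, 67), (31, 15, 16), (204, 103, 101)]
print(all(total == low + high for total, low, high in sample))  # True
```

Note that a delay of exactly 15 minutes is counted in `DEP_DELAY_MORE15`, since that condition uses `>=`.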


In [0]:
#Convert FL_DATE to datetime
flights_data['FL_DATE'] = pd.to_datetime(flights_data['FL_DATE'])

#Create 2 columns that store the week day (Monday=0, Sunday=6) and the week day name
flights_data['FL_WEEKDAY'] = flights_data['FL_DATE'].dt.dayofweek
flights_data['FL_WEEKDAY_NAME'] = flights_data['FL_DATE'].dt.day_name()

flights_data.head()

Unnamed: 0,ORIGIN_STATE_ABR,ORIGIN_AIRPORT_ID,FL_DATE,TOTAL_FLIGHTS,CANCELLED,TOTAL_DEP_DELAY,AVG_DELAY,DEP_DELAY_LESS15,DEP_DELAY_MORE15,FL_WEEKDAY,FL_WEEKDAY_NAME
0,PA,14100,2017-01-01,174,1,45,3.79,23,22,6,Sunday
1,TX,11298,2017-01-01,476,6,210,17.51,83,127,6,Sunday
2,FL,11697,2017-01-01,264,2,113,16.61,46,67,6,Sunday
3,VA,11278,2017-01-01,158,0,31,2.18,15,16,6,Sunday
4,IL,13930,2017-01-01,560,2,204,7.21,103,101,6,Sunday
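
The Monday=0 numbering used by pandas `dt.dayofweek` matches `date.weekday()` in the Python standard library, which gives a quick way to confirm the value 6 shown for 2017-01-01. This is only a cross-check sketch, not part of the notebook's pipeline.

```python
from datetime import date

d = date(2017, 1, 1)
print(d.weekday())       # 6, with Monday=0 and Sunday=6
print(d.isoweekday())    # 7, with Monday=1 and Sunday=7
print(d.strftime("%A"))  # weekday name; "Sunday" under the default C locale
```

If the weekday were computed in Spark instead, `pyspark.sql.functions.dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), so the values would not be directly comparable with `FL_WEEKDAY`.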


##3) Export data to tables

In [0]:
flights_data = spark.createDataFrame(flights_data)

In [0]:
network_data = df.select('ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID','DISTANCE')\
                 .groupBy('ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID', 'DISTANCE').agg(
                 F.count('DISTANCE').alias('TOTAL_FLIGHTS'))

display(network_data)

ORIGIN_AIRPORT_ID,DEST_AIRPORT_ID,DISTANCE,TOTAL_FLIGHTS
11298,11618,1372.0,11211
11630,14747,1533.0,6608
10397,10423,813.0,13793
12889,13303,2174.0,4345
13930,10721,867.0,25784
12478,14492,427.0,9064
10792,11697,1166.0,2658
14771,14908,372.0,14372
14570,14771,192.0,5642
13204,13230,851.0,318
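
Each row of `network_data` is a directed edge (origin, destination) weighted by distance, which is the shape a route network or shortest-path search needs. The sketch below builds an adjacency list from the ten rows displayed above and runs Dijkstra's algorithm on it. It is a minimal illustration under stated assumptions: the choice of Dijkstra, the function `shortest_distance`, and the use of `DISTANCE` as the edge weight are assumptions, not the implementation used in the shortest-path notebook listed in the index.

```python
import heapq
from collections import defaultdict

# The ten (ORIGIN_AIRPORT_ID, DEST_AIRPORT_ID, DISTANCE) rows displayed above
edges = [
    (11298, 11618, 1372.0), (11630, 14747, 1533.0), (10397, 10423, 813.0),
    (12889, 13303, 2174.0), (13930, 10721, 867.0), (12478, 14492, 427.0),
    (10792, 11697, 1166.0), (14771, 14908, 372.0), (14570, 14771, 192.0),
    (13204, 13230, 851.0),
]

graph = defaultdict(list)
for origin, dest, miles in edges:
    graph[origin].append((dest, miles))  # directed edge: origin -> dest


def shortest_distance(graph, source, target):
    """Dijkstra's algorithm; returns total miles, or None if target is unreachable."""
    best = {source: 0.0}
    queue = [(0.0, source)]
    while queue:
        dist, node = heapq.heappop(queue)
        if node == target:
            return dist
        if dist > best.get(node, float("inf")):
            continue  # stale queue entry, a shorter route was already found
        for neighbour, miles in graph.get(node, []):
            candidate = dist + miles
            if candidate < best.get(neighbour, float("inf")):
                best[neighbour] = candidate
                heapq.heappush(queue, (candidate, neighbour))
    return None


print(shortest_distance(graph, 14570, 14908))  # 564.0, via airport 14771
print(shortest_distance(graph, 14908, 14570))  # None, no return edge in this sample
```

Because the edges are directed, a route that exists in one direction is not assumed to exist in the other unless the table contains the reverse row.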


In [0]:
flights_data.write.format("csv").saveAsTable('flights_data_agg')
network_data.write.format("csv").saveAsTable('network_data')