# Predicting Flight Delays: Data Pipeline
This notebook outlines the data pipeline up to and including the final join. Specifically, it includes the following steps:
1. Notebook Setup
2. Reading in the full airlines, weather, weather stations, and airports datasets
3. Airlines data cleaning + checkpoint to blob storage
4. Weather data cleaning + checkpoint to blob storage
5. Data cleaning for the other datasets
6. Execution of the joins + checkpoint to blob storage

We checkpoint the cleaned data, and the output of the join, to blob storage to help Spark split up its work. Spark evaluates transformations lazily and only computes when an action (such as writing to parquet) is called. Writing intermediate results out and reading them back therefore keeps each stage's execution plan from growing across the whole pipeline.

### Time to Read/Write to Blob Tables:
| DataFrame           | Read Time             |
|---------------------|-----------------------|
|Cleaned Airlines     | 1.18 seconds          |
|Cleaned Weather      | 1.02 seconds          |
|Full Joined Dataset  | 0.88 seconds          |

| DataFrame           | Write Time            |
|---------------------|-----------------------|
|Cleaned Airlines     | 6.03 minutes          |
|Cleaned Weather      | 57.43 minutes         |
|Full Joined Dataset  | 11.52 minutes         |

### Shape of DataFrames Table:
| DataFrame                   | Rows           | Columns         |
|-----------------------------|----------------|-----------------|
|Airlines Raw                 | 74,177,433     | 108             |
|Airlines Cleaned             | 42,430,592     | 51              |
|Weather Raw                  | 898,983,399    | 123             |
|Weather Filtered (Source 7, US) | 223,810,507 | 35             |
|Weather Cleaned              | 116,336,620    | 36              |
|Weather Stations Raw + Clean | 5,004,169      | 12              |
|Airports Raw + Clean         | 74,043         | 18              |
|Full Joined Dataset          | 42,430,592     | 95              |

# Notebook Setup

In [0]:
%pip install timezonefinder
%pip install tzfpy

In [0]:
# General
import sys
from functools import reduce
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

# PySpark SQL
from pyspark.sql import DataFrame, Row, SQLContext
from pyspark.sql import functions as f
from pyspark.sql.functions import (
    col, concat, count, first, isnan, isnull, lit, lower,
    monotonically_increasing_id, percent_rank, rand, regexp_replace,
    row_number, substring, to_timestamp, to_utc_timestamp, udf, when,
)
from pyspark.sql.functions import count as ps_count, sum as ps_sum
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, IntegerType, NullType,
    ShortType, DateType, BooleanType, BinaryType, FloatType, DecimalType,
)
from pyspark.sql.window import Window, WindowSpec
from pyspark.streaming import StreamingContext

# ML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Misc
from pandas.tseries.holiday import USFederalHolidayCalendar as calendar
from timezonefinder import TimezoneFinder
from tzfpy import get_tz



In [0]:
# Display and define where mids-w261 is located
data_BASE_DIR = "dbfs:/mnt/mids-w261/"
# display(dbutils.fs.ls(f"{data_BASE_DIR}"))

# Inspect the Mount's Final Project folder 
data_BASE_DIR = "dbfs:/mnt/mids-w261/datasets_final_project_2022/"
# display(dbutils.fs.ls(f"{data_BASE_DIR}"))

In [0]:
blob_container = "housestark" # The name of your container created in https://portal.azure.com
storage_account = "neilp" # The name of your Storage account created in https://portal.azure.com
secret_scope = "w261_s1g4" # The name of the scope created on your local computer using the Databricks CLI
secret_key = "w261_s1g4_key" # The name of the secret key created on your local computer using the Databricks CLI
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

# Read in Full Data

In [0]:
# Set partitions
spark.conf.set("spark.sql.shuffle.partitions", 1000)
spark.conf.set("spark.sql.files.minPartitionNum", 1000)

## Read in parquet files to be dataframes
# Airlines dataset
df_airlinesRAW = spark.read.parquet(f"{data_BASE_DIR}parquet_airlines_data/*").repartition(1000, "FL_DATE").persist()

# Weather dataset
df_weatherRAW = spark.read.parquet(f"{data_BASE_DIR}parquet_weather_data/*").repartition(1000, "STATION").persist()

# Stations dataset 
df_stationsRAW = spark.read.parquet(f"{data_BASE_DIR}stations_data/*").persist()

# Airports dataset
df_airports = pd.read_csv("https://davidmegginson.github.io/ourairports-data/airports.csv").astype(str)
df_airportsRAW = spark.createDataFrame(df_airports).persist()

# Raw Data EDA

In [0]:
%sh du -h /dbfs/mnt/mids-w261/datasets_final_project_2022/

435M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/YEAR=2015
427M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/YEAR=2016
437M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/YEAR=2017
574M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/YEAR=2018
595M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/YEAR=2019
143M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/YEAR=2020
198M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/YEAR=2021
2.8G	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data
595M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_1y
96M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_3m
206M	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_6m
4.7G	/dbfs/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data/YEAR=2015
4.6G	/dbfs/mn

In [0]:
print('# Rows Airlines Raw:', df_airlinesRAW.count())
print('# Columns Airlines Raw:', len(df_airlinesRAW.columns))

print()

print('# Rows Weather Raw:', df_weatherRAW.count())
print('# Columns Weather Raw:', len(df_weatherRAW.columns))

print()

print('# Rows Weather Stations Raw:', df_stationsRAW.count())
print('# Columns Weather Stations Raw:', len(df_stationsRAW.columns))

print()

print('# Rows Airports Raw:', df_airportsRAW.count())
print('# Columns Airports Raw:', len(df_airportsRAW.columns))

# Rows Airlines Raw: 74177433
# Columns Airlines Raw: 108

# Rows Weather Raw: 898983399
# Columns Weather Raw: 123

# Rows Weather Stations Raw: 5004169
# Columns Weather Stations Raw: 12

# Rows Airports Raw: 74043
# Columns Airports Raw: 18


In [0]:
df_airlinesRAWNulls = df_airlinesRAW.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_airlinesRAW.columns])

display(df_airlinesRAWNulls)

QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,ARR_TIME_BLK,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,FIRST_DEP_TIME,TOTAL_ADD_GTIME,LONGEST_ADD_GTIME,DIV_AIRPORT_LANDINGS,DIV_REACHED_DEST,DIV_ACTUAL_ELAPSED_TIME,DIV_ARR_DELAY,DIV_DISTANCE,DIV1_AIRPORT,DIV1_AIRPORT_ID,DIV1_AIRPORT_SEQ_ID,DIV1_WHEELS_ON,DIV1_TOTAL_GTIME,DIV1_LONGEST_GTIME,DIV1_WHEELS_OFF,DIV1_TAIL_NUM,DIV2_AIRPORT,DIV2_AIRPORT_ID,DIV2_AIRPORT_SEQ_ID,DIV2_WHEELS_ON,DIV2_TOTAL_GTIME,DIV2_LONGEST_GTIME,DIV2_WHEELS_OFF,DIV2_TAIL_NUM,DIV3_AIRPORT,DIV3_AIRPORT_ID,DIV3_AIRPORT_SEQ_ID,DIV3_WHEELS_ON,DIV3_TOTAL_GTIME,DIV3_LONGEST_GTIME,DIV3_WHEELS_OFF,DIV3_TAIL_NUM,DIV4_AIRPORT,DIV4_AIRPORT_ID,DIV4_AIRPORT_SEQ_ID,DIV4_WHEELS_ON,DIV4_TOTAL_GTIME,DIV4_LONGEST_GTIME,DIV4_WHEELS_OFF,DIV4_TAIL_NUM,DIV5_AIRPORT,DIV5_AIRPORT_ID,DIV5_AIRPORT_SEQ_ID,DIV5_WHEELS_ON,DIV5_TOTAL_GTIME,DIV5_LONGEST_GTIME,DIV5_WHEELS_OFF,DIV5_TAIL_NUM
0,0,0,0,0,0,0,0,315314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1325132,1335235,1335235,1335235,1335235,0,1355826,1355816,1390121,1390121,0,1390099,1547237,1547237,1547237,1547237,0,0,72813445,0,334,1542041,1542041,0,0,0,61136954,61136954,61136954,61136954,61136954,73711726,73711743,73711744,95,73999356,74025552,74025493,73999380,73991099,73991099,73991099,73991102,73991099,73991099,74024559,74024558,74175859,74175848,74175848,74175859,74175859,74175859,74176789,74176789,74177419,74177419,74177419,74177419,74177419,74177419,74177431,74177431,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433,74177433


In [0]:
df_weatherRAWNulls = df_weatherRAW.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_weatherRAW.columns])

display(df_weatherRAWNulls)

STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,SOURCE,HourlyAltimeterSetting,HourlyDewPointTemperature,HourlyDryBulbTemperature,HourlyPrecipitation,HourlyPresentWeatherType,HourlyPressureChange,HourlyPressureTendency,HourlyRelativeHumidity,HourlySkyConditions,HourlySeaLevelPressure,HourlyStationPressure,HourlyVisibility,HourlyWetBulbTemperature,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed,Sunrise,Sunset,DailyAverageDewPointTemperature,DailyAverageDryBulbTemperature,DailyAverageRelativeHumidity,DailyAverageSeaLevelPressure,DailyAverageStationPressure,DailyAverageWetBulbTemperature,DailyAverageWindSpeed,DailyCoolingDegreeDays,DailyDepartureFromNormalAverageTemperature,DailyHeatingDegreeDays,DailyMaximumDryBulbTemperature,DailyMinimumDryBulbTemperature,DailyPeakWindDirection,DailyPeakWindSpeed,DailyPrecipitation,DailySnowDepth,DailySnowfall,DailySustainedWindDirection,DailySustainedWindSpeed,DailyWeather,MonthlyAverageRH,MonthlyDaysWithGT001Precip,MonthlyDaysWithGT010Precip,MonthlyDaysWithGT32Temp,MonthlyDaysWithGT90Temp,MonthlyDaysWithLT0Temp,MonthlyDaysWithLT32Temp,MonthlyDepartureFromNormalAverageTemperature,MonthlyDepartureFromNormalCoolingDegreeDays,MonthlyDepartureFromNormalHeatingDegreeDays,MonthlyDepartureFromNormalMaximumTemperature,MonthlyDepartureFromNormalMinimumTemperature,MonthlyDepartureFromNormalPrecipitation,MonthlyDewpointTemperature,MonthlyGreatestPrecip,MonthlyGreatestPrecipDate,MonthlyGreatestSnowDepth,MonthlyGreatestSnowDepthDate,MonthlyGreatestSnowfall,MonthlyGreatestSnowfallDate,MonthlyMaxSeaLevelPressureValue,MonthlyMaxSeaLevelPressureValueDate,MonthlyMaxSeaLevelPressureValueTime,MonthlyMaximumTemperature,MonthlyMeanTemperature,MonthlyMinSeaLevelPressureValue,MonthlyMinSeaLevelPressureValueDate,MonthlyMinSeaLevelPressureValueTime,MonthlyMinimumTemperature,MonthlySeaLevelPressure,MonthlyStationPressure,MonthlyTotalLiquidPrecipitation,MonthlyTotalSnowfall,MonthlyWetBulb,AWND,CDSD,CLDD,DSNW,HDSD,HTDD,NormalsCoolingDegreeDay,
NormalsHeatingDegreeDay,ShortDurationEndDate005,ShortDurationEndDate010,ShortDurationEndDate015,ShortDurationEndDate020,ShortDurationEndDate030,ShortDurationEndDate045,ShortDurationEndDate060,ShortDurationEndDate080,ShortDurationEndDate100,ShortDurationEndDate120,ShortDurationEndDate150,ShortDurationEndDate180,ShortDurationPrecipitationValue005,ShortDurationPrecipitationValue010,ShortDurationPrecipitationValue015,ShortDurationPrecipitationValue020,ShortDurationPrecipitationValue030,ShortDurationPrecipitationValue045,ShortDurationPrecipitationValue060,ShortDurationPrecipitationValue080,ShortDurationPrecipitationValue100,ShortDurationPrecipitationValue120,ShortDurationPrecipitationValue150,ShortDurationPrecipitationValue180,REM,BackupDirection,BackupDistance,BackupDistanceUnit,BackupElements,BackupElevation,BackupEquipment,BackupLatitude,BackupLongitude,BackupName,WindEquipmentChangeDate
0,0,6388837,6388837,6392201,6388837,0,0,434627564,153150719,19395099,784617131,804583926,641488319,631593536,153438316,461348098,546547716,416467448,315169298,425250751,125475322,827361398,116109419,894429194,894429189,897808216,895888250,897805650,897810050,896307781,897808216,896289555,895888250,896322047,895888250,895887189,895887392,896382039,896263884,895910577,897953870,897960452,896286212,896251846,896142133,898983399,898894406,898894409,898896466,898896583,898896466,898896466,898898672,898899326,898899326,898898663,898898671,898907535,898983399,898907085,898911264,898964464,898977509,898964191,898974905,898907610,898907608,898907608,898894687,898894699,898907678,898907624,898907624,898894696,898908702,898908422,898897940,898974923,898983399,898910427,898900504,898896572,898952915,898901042,898896572,898904450,898904424,898914601,898914601,898917385,898917388,898917383,898917384,898917384,898917390,898917385,898917384,898917382,898917464,898917385,898917409,898917385,898917388,898917383,898917384,898917385,898917390,898917385,898917384,898917382,898917464,228945826,886796590,774494573,884583526,884302245,887096998,884703937,887572779,887219591,884134608,850727231


In [0]:
# Before dropping unnecessary columns, derive YEAR from the date columns
df_airlinesRAW = df_airlinesRAW.withColumn("YEAR",f.year(to_timestamp("FL_DATE")))
df_weatherRAW = df_weatherRAW.withColumn("YEAR",f.year(to_timestamp("DATE")))

After an initial look at the data (phase I), we knew there were certain columns we would not use, so we removed them up front to speed up data processing. In the airlines dataset, we removed the arrival time and arrival delay columns (e.g. ARR_TIME, ARR_DELAY) and some flight summary data (e.g. AIR_TIME), because in practice we would not have access to data from the "future". Cases where a late-arriving aircraft delays the flight about to take off are still captured by the LATE_AIRCRAFT_DELAY variable. We also removed the detailed flight diversion columns (DIV*). A diversion can happen before take-off or while the plane is in the air, so diversions are not part of our modeling objective. In the weather dataset, we removed the daily, monthly, and short-duration columns (Daily*, Monthly*, ShortDuration*) because hourly weather data is more relevant in our context.
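
The weather columns kept in the next cell are listed by hand. As a minimal sketch, the same selection can be expressed as rules: drop the Daily*, Monthly*, ShortDuration* and Backup* groups, plus REM and WindEquipmentChangeDate. The helper name `weather_columns_to_keep` and the sample column list are hypothetical. Applied to the raw weather columns, these rules should pick the same set of columns as `df_weatherSelect1` (in a different order). In Spark, the returned list could be passed directly to `df.select(...)`.

```python
# Hypothetical rule-based alternative to a hand-written column list
DROP_PREFIXES = ("Daily", "Monthly", "ShortDuration", "Backup")
DROP_EXACT = {"REM", "WindEquipmentChangeDate"}

def weather_columns_to_keep(columns):
    """Return the columns that survive the drop rules, preserving input order."""
    return [c for c in columns
            if not c.startswith(DROP_PREFIXES) and c not in DROP_EXACT]

sample = ["STATION", "DATE", "HourlyWindSpeed", "DailyPrecipitation",
          "MonthlyMeanTemperature", "ShortDurationEndDate005", "AWND",
          "REM", "BackupName"]
print(weather_columns_to_keep(sample))  # ['STATION', 'DATE', 'HourlyWindSpeed', 'AWND']
```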

In [0]:
# Remove unnecessary columns

# Airlines Dataset
df_airlinesSelect1  = ['QUARTER','MONTH','DAY_OF_MONTH','DAY_OF_WEEK','FL_DATE','OP_UNIQUE_CARRIER','OP_CARRIER_AIRLINE_ID','OP_CARRIER','TAIL_NUM','OP_CARRIER_FL_NUM','ORIGIN_AIRPORT_ID','ORIGIN_AIRPORT_SEQ_ID','ORIGIN_CITY_MARKET_ID','ORIGIN','ORIGIN_CITY_NAME','ORIGIN_STATE_ABR','ORIGIN_STATE_FIPS','ORIGIN_STATE_NM','ORIGIN_WAC','DEST_AIRPORT_ID','DEST_AIRPORT_SEQ_ID', 'DEST_CITY_MARKET_ID','DEST','DEST_CITY_NAME', 'DEST_STATE_ABR', 'DEST_STATE_FIPS','DEST_STATE_NM','DEST_WAC','CRS_DEP_TIME','DEP_TIME','DEP_DELAY','DEP_DELAY_NEW','DEP_DEL15','DEP_DELAY_GROUP','DEP_TIME_BLK','TAXI_OUT','WHEELS_OFF','WHEELS_ON','TAXI_IN','CANCELLED','CANCELLATION_CODE', 'DIVERTED','FLIGHTS','DISTANCE','DISTANCE_GROUP','CARRIER_DELAY','WEATHER_DELAY','NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY','YEAR']
df_airlinesRaw = df_airlinesRAW.select(df_airlinesSelect1).persist()


# Weather Dataset 
df_weatherSelect1 = ['STATION', 'YEAR', 'DATE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'NAME', 'REPORT_TYPE', 'SOURCE', 'HourlyAltimeterSetting', 'HourlyDewPointTemperature', 'HourlyDryBulbTemperature', 'HourlyPrecipitation', 'HourlyPresentWeatherType', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyRelativeHumidity', 'HourlySkyConditions', 'HourlySeaLevelPressure', 'HourlyStationPressure', 'HourlyVisibility', 'HourlyWetBulbTemperature', 'HourlyWindDirection', 'HourlyWindGustSpeed', 'HourlyWindSpeed', 'Sunrise', 'Sunset', 'AWND', 'CDSD', 'CLDD', 'DSNW', 'HDSD', 'HTDD', 'NormalsCoolingDegreeDay', 'NormalsHeatingDegreeDay']
df_weatherRaw = df_weatherRAW.select(df_weatherSelect1).persist()

# Weather Stations Dataset 
df_stationsSub1 = df_stationsRAW.persist()
# Airports Dataset
df_airportsSub1 = df_airportsRAW.persist()

# Airlines Data Processing

In [0]:
# Let's take a look at shape of Raw Airlines dataframe post drop 
print('# Rows Airlines Raw:', df_airlinesRaw.count())
print('# Columns Airlines Raw:', len(df_airlinesRaw.columns))

# Rows Airlines Raw: 74177433
# Columns Airlines Raw: 51


For now, we only need to drop duplicate rows. Many flights appear to be recorded more than once, likely because the information comes from both the origin and the destination airport. Further cleaning and manipulation of this dataset for the join is done later.
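
Because the duplicate rows are identical in every column, it does not matter which copy Spark's `dropDuplicates()` keeps. The plain-Python sketch below shows the same idea on a few tuples: exact duplicates collapse to one row. The function name and the sample rows are made up for illustration.

```python
def drop_exact_duplicates(rows):
    """Keep the first occurrence of each fully identical row, preserving order.
    Plain-Python analogue of DataFrame.dropDuplicates() with no column subset."""
    return list(dict.fromkeys(rows))

# Hypothetical flight rows: (FL_DATE, carrier, flight number, origin, dest, CRS_DEP_TIME)
flights = [
    ("2019-01-05", "OO", 5022, "EAR", "DEN", 635),
    ("2019-01-05", "OO", 5022, "EAR", "DEN", 635),  # same flight reported twice
    ("2019-01-05", "OO", 5025, "EAR", "DEN", 1715),
]
deduped = drop_exact_duplicates(flights)
print(len(flights), "->", len(deduped))  # 3 -> 2
```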

In [0]:
#df_airlinesSub1Clean = df_airlinesRaw.dropDuplicates().persist() 
#df_airlinesSub1Clean.write.mode('overwrite').parquet(f"{blob_url}/dfa_Clean")
df_airlinesSub1Clean = spark.read.parquet(f"{blob_url}/dfa_Clean")


In [0]:
# Let's take a look at shape of Cleaned Airlines dataframe
print('# Rows Airlines Clean:', df_airlinesSub1Clean.count()) # 42.4 M
print('# Columns Airlines Clean:', len(df_airlinesSub1Clean.columns))

# Rows Airlines Clean: 42430592
# Columns Airlines Clean: 51


In [0]:
# Record this count so the last cell can compare the number of airports before and after the join
df_airlinesSub1Clean.createOrReplaceTempView('airport_count')
airport_count = sqlContext.sql(""" select COUNT(DISTINCT ORIGIN_AIRPORT_ID) as airport_count from airport_count """).persist()
display(airport_count)

airport_count
388


In [0]:
# Convert date and time to UTC for Airlines dataset

def convert_to_utc2(df, sub_hrs=0):
  '''
  Input: dataframe (airlines joined with airports), hours to subtract (optional; currently unused)
  Output: dataframe with a column of the UTC timestamp of the scheduled flight departure
  '''
  def format_time(ts):
    '''
    Input: scheduled departure time in HHMM form (e.g. 635)
    Output: departure time as 'HH:MM' (e.g. '06:35'), or None if it cannot be formatted
    '''
    if ts is None:
      return None
    # Left-pad to four digits so that 5 -> '0005', 45 -> '0045' and 635 -> '0635'
    ts = str(ts).zfill(4)
    if len(ts) != 4:
      return None
    return ts[:2] + ":" + ts[2:]
  
  format_timeUDF = udf(format_time, StringType())
  
  df = df.withColumn("CRS_DEP_TIME_mod", format_timeUDF("CRS_DEP_TIME"))
  
  df = df.withColumn('FL_DATE_2', f.to_date('FL_DATE'))
  
  # Local departure string ('yyyy-MM-ddTHH:mm'), parsed into a timestamp (still airport-local wall-clock time)
  df = df.withColumn("local_timezone", concat("FL_DATE_2",lit("T"),"CRS_DEP_TIME_mod"))
  df = df.withColumn("local_timestamp",to_timestamp("local_timezone"))
  
  def ret_tz(longitude, latitude):
    '''
    Input: longitude, latitude
    Output: IANA timezone name (e.g. 'America/Chicago'), or None if coordinates are missing
    '''
    if longitude is None or latitude is None:
      return None
    return get_tz(lng=float(longitude), lat=float(latitude))
  
  ret_tzUDF = udf(ret_tz, StringType())
  
  # Look up the origin airport's timezone from its coordinates (airports dataset columns)
  df = df.withColumn("timezone", ret_tzUDF("longitude_deg", "latitude_deg"))
  
  # Interpret the local timestamp in that timezone and convert it to UTC
  df = df.withColumn("scheduled_departure_UTC", to_utc_timestamp(df["local_timestamp"], df["timezone"]))
  
  return df
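
Here is a plain-Python sketch of what `convert_to_utc2` does for one row. It pads the HHMM departure time to four digits, builds a local datetime, and converts it to UTC. The sketch assumes fixed UTC offsets in place of the `tzfpy` lookup (America/Chicago is UTC-6 in January and UTC-5 in July). The helper names are hypothetical; the two sample flights are Kearney (EAR) rows from the join check further below.

```python
from datetime import datetime, timedelta, timezone

def format_hhmm(ts):
    """Turn an HHMM value such as 635 into '06:35'; returns None for missing or invalid values (e.g. 2400)."""
    if ts is None:
        return None
    s = str(ts).zfill(4)  # 5 -> '0005', 635 -> '0635'
    if len(s) != 4 or not s.isdigit():
        return None
    hh, mm = int(s[:2]), int(s[2:])
    if hh > 23 or mm > 59:
        return None
    return f"{hh:02d}:{mm:02d}"

def scheduled_departure_utc(fl_date, crs_dep_time, utc_offset_hours):
    """Combine date + local HH:MM, attach a fixed UTC offset (an assumption), and convert to UTC."""
    hhmm = format_hhmm(crs_dep_time)
    if hhmm is None:
        return None
    local = datetime.fromisoformat(f"{fl_date}T{hhmm}")
    local = local.replace(tzinfo=timezone(timedelta(hours=utc_offset_hours)))
    return local.astimezone(timezone.utc)

print(scheduled_departure_utc("2019-01-05", 635, -6))   # 2019-01-05 12:35:00+00:00
print(scheduled_departure_utc("2021-07-24", 1032, -5))  # 2021-07-24 15:32:00+00:00
```

A fixed offset is only valid for particular dates. Handling daylight saving time needs a timezone database, which is why the notebook uses `get_tz` together with Spark's `to_utc_timestamp`.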

# Weather Data Processing

In [0]:
# Let's take a look at shape of Raw  Weather dataframe
print('# Rows Weather Raw:', df_weatherRaw.count())
print('# Columns Weather Raw:', len(df_weatherRaw.columns))

# Rows Weather Raw: 898983399
# Columns Weather Raw: 35


To handle the different weather sources, we keep only source 7. This source brings in hourly weather information from the United States Air Force (USAF) and from the Automated Surface Observing System/Automated Weather Observing System (ASOS/AWOS, both regulated by the Federal Aviation Administration).

We also keep only weather stations in the United States and convert the DATE column to a timestamp, which we need later for the UTC conversion.
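
The two filters can be checked in plain Python with the same regular expression. `re.search` matches anywhere in the string, as Spark's `rlike` does. The station names and the helper `keep_us_source7` are hypothetical examples.

```python
import re

# Same pattern as the notebook, written as a raw string so "\s" reaches the regex engine intact
US_STATION = re.compile(r".*\sUS$")

def keep_us_source7(rows):
    """Keep rows whose SOURCE is the string '7' and whose NAME ends in whitespace + 'US'."""
    return [r for r in rows if r["SOURCE"] == "7" and US_STATION.search(r["NAME"])]

rows = [
    {"NAME": "KEARNEY MUNICIPAL AIRPORT, NE US", "SOURCE": "7"},
    {"NAME": "KEARNEY MUNICIPAL AIRPORT, NE US", "SOURCE": "4"},      # wrong source
    {"NAME": "TORONTO PEARSON INTERNATIONAL, ON CA", "SOURCE": "7"},  # not a US station
    {"NAME": "CAMPUS US ANNEX", "SOURCE": "7"},                      # 'US' not at the end
]
print([r["NAME"] for r in keep_us_source7(rows)])  # ['KEARNEY MUNICIPAL AIRPORT, NE US']
```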

In [0]:
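# Keep source 7 and US stations only, and parse DATE as a timestamp before running prep_weather
us_station_regex = r'.*\sUS$'  # raw string so "\s" is passed to the regex engine unchanged

df_weatherRaw = (df_weatherRaw
                 .filter(col("SOURCE") == "7")
                 .withColumn("DATE", to_timestamp("DATE"))
                 .filter(col("NAME").rlike(us_station_regex)))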


In [0]:
# Let's take a look at shape of our 1st Cleaned Weather dataframe
print('# Rows Weather Clean1:', df_weatherRaw.count()) # 223.8 M
print('# Columns Weather Clean1:', len(df_weatherRaw.columns))

# Rows Weather Clean1: 223810507
# Columns Weather Clean1: 35


Now that the weather dataset is smaller, we run the per-row calculations we need, specifically computing the UTC timestamp used for the join. Further cleaning is done in phase III.

In [0]:
# Function to prep weather dataset
def prep_weather(df):
    '''
    Input: dataframe
    Output: dataframe with DATE converted to a UTC timestamp (DATE_UTC)
    '''
    # Convert lat/long to numbers. DecimalType() defaults to precision 10, scale 0,
    # which would round coordinates to whole degrees, so cast to double instead.
    df = df.withColumn("LATITUDE", df.LATITUDE.cast(DoubleType()))
    df = df.withColumn("LONGITUDE", df.LONGITUDE.cast(DoubleType()))
  
    # UDF to look up the timezone name from the station coordinates
    def ret_tz(longitude, latitude):
      '''
      Input: longitude, latitude
      Output: IANA timezone name (e.g. 'America/Chicago'), or None if coordinates are missing
      '''
      if longitude is None or latitude is None:
          return None
      return get_tz(lng=float(longitude), lat=float(latitude))
    
    ret_tzUDF = udf(ret_tz, StringType())
    df = df.withColumn("timezone", ret_tzUDF("LONGITUDE", "LATITUDE"))
    
    # Interpret DATE as local time at the station and convert it to UTC
    df = df.withColumn("DATE_UTC", to_utc_timestamp(col("DATE"), col("timezone")))

    # Drop intermediary columns
    df = df.drop("timezone", "DATE")
        
    return df

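After running the function above, we truncate each UTC timestamp to the hour. This truncated timestamp (`rounded_DATE_UTC`) is used to join to the airlines dataset. We then remove duplicates based on station and truncated timestamp, leaving one observation per station per hour. Note that `dropDuplicates` does not guarantee which of the duplicate rows is kept.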
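
Below is a minimal plain-Python sketch of the truncate-then-deduplicate step, with a made-up station ID and readings. `truncate_to_hour` plays the role of `f.date_trunc("hour", ...)`. Spark's `dropDuplicates` makes no promise about which row survives; this sketch, by contrast, deterministically keeps the first row it sees for each (station, hour) key.

```python
from datetime import datetime

def truncate_to_hour(ts):
    """Zero out minutes and below, like date_trunc('hour', ts)."""
    return ts.replace(minute=0, second=0, microsecond=0)

def one_obs_per_station_hour(observations):
    """Keep the first (station, truncated_ts, value) seen for each (station, hour) key."""
    kept = {}
    for station, ts, value in observations:
        hour = truncate_to_hour(ts)
        kept.setdefault((station, hour), (station, hour, value))
    return list(kept.values())

# Hypothetical station and readings
obs = [
    ("72556099999", datetime(2019, 1, 5, 12, 15), 29.9),
    ("72556099999", datetime(2019, 1, 5, 12, 53), 29.8),  # same station-hour, dropped
    ("72556099999", datetime(2019, 1, 5, 13, 53), 29.7),
]
for row in one_obs_per_station_hour(obs):
    print(row)
```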

In [0]:
# Run Weather Prep Function
df_weatherRaw = prep_weather(df_weatherRaw)
# Truncate DATE_UTC to the hour; this is the key used to join to the airlines data
df_weatherSub1Clean2 = df_weatherRaw.withColumn("rounded_DATE_UTC", f.date_trunc("hour", df_weatherRaw.DATE_UTC)).persist()
# Keep one observation per station per hour
df_weatherSub1Clean = df_weatherSub1Clean2.dropDuplicates(['rounded_DATE_UTC', 'STATION'])

In [0]:
# Let's take a look at shape of our 2nd Cleaned Weather dataframe
print('# Rows Weather Clean:', df_weatherSub1Clean.count()) # 116.3 M
print('# Columns Weather Clean:', len(df_weatherSub1Clean.columns))

# Rows Weather Clean: 116336620
# Columns Weather Clean: 36


In [0]:
# Write clean weather data
#df_weatherSub1Clean.write.mode('overwrite').parquet(f"{blob_url}/dfw_Clean")
df_weatherSub1Clean = spark.read.parquet(f"{blob_url}/dfw_Clean")

# Other Data Processing
In the initial data exploration (the "EDA (I): Understanding the Data" section of the final notebook), we determined that the columns needed from the weather stations and airports datasets were already clean and could be used immediately to execute the joins. Below, we simply rename these dataframes for later use.

In [0]:
# Make clean df
df_stationsSub1Clean =  df_stationsSub1.persist()

In [0]:
# Make clean df
df_airportsSub1Clean = df_airportsSub1.persist()

In [0]:
df_airlinesRaw.unpersist()
df_weatherRaw.unpersist()

Out[12]: DataFrame[STATION: string, YEAR: int, DATE: string, LATITUDE: string, LONGITUDE: string, ELEVATION: string, NAME: string, REPORT_TYPE: string, SOURCE: string, HourlyAltimeterSetting: string, HourlyDewPointTemperature: string, HourlyDryBulbTemperature: string, HourlyPrecipitation: string, HourlyPresentWeatherType: string, HourlyPressureChange: string, HourlyPressureTendency: string, HourlyRelativeHumidity: string, HourlySkyConditions: string, HourlySeaLevelPressure: string, HourlyStationPressure: string, HourlyVisibility: string, HourlyWetBulbTemperature: string, HourlyWindDirection: string, HourlyWindGustSpeed: string, HourlyWindSpeed: string, Sunrise: string, Sunset: string, AWND: string, CDSD: string, CLDD: string, DSNW: string, HDSD: string, HTDD: string, NormalsCoolingDegreeDay: string, NormalsHeatingDegreeDay: string]

# Joins
The code below runs the full join across all years.
### Update on Joining:
We decided to build on our original approach to joining the airlines and weather data. Previously, we joined on weather station and the UTC time of a flight's scheduled departure. We still join each airport to its closest weather station, but we have changed which weather observations each airline record receives. As shown in the cells below, we now include weather data from 1 hour before and from 2 hours after the flight's scheduled departure time at the origin. Including data from 1 hour before departure allows for the time airport staff need to prepare for any delays. Including data from 2 hours after departure is meant to capture indicators that the destination is having bad weather. We assume that this information would be available in a real-life data pipeline.
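
The window can be sketched in plain Python. The sketch assumes the weather side is keyed by (station, UTC hour), as produced by the `rounded_DATE_UTC` column. It also assumes the flight-side timestamps are truncated to the hour before matching; that step is not shown in this section, so treat it as an assumption. The station key and observations are hypothetical, and `dict.get` returning `None` stands in for an unmatched row in a left join.

```python
from datetime import datetime, timedelta

def truncate_to_hour(ts):
    return ts.replace(minute=0, second=0, microsecond=0)

def weather_join_keys(scheduled_departure_utc):
    """Hour-truncated UTC keys 1 hour before and 2 hours after scheduled departure."""
    return {
        "minus_1hr": truncate_to_hour(scheduled_departure_utc - timedelta(hours=1)),
        "add_2hr": truncate_to_hour(scheduled_departure_utc + timedelta(hours=2)),
    }

# Hypothetical hourly weather at the station closest to the origin airport
weather = {
    ("STATION_A", datetime(2019, 1, 5, 11)): "obs at 11:00 UTC",
    ("STATION_A", datetime(2019, 1, 5, 14)): "obs at 14:00 UTC",
}
keys = weather_join_keys(datetime(2019, 1, 5, 12, 35))  # EAR -> DEN, 06:35 CST
joined = {name: weather.get(("STATION_A", hour)) for name, hour in keys.items()}
print(joined)  # {'minus_1hr': 'obs at 11:00 UTC', 'add_2hr': 'obs at 14:00 UTC'}
```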

### Execute the Join

In [0]:
# Airlines_airports join
df_airlinesSub1Clean.createOrReplaceTempView('airlines')
df_airportsSub1Clean.createOrReplaceTempView('airports')

df_airlines_airports = sqlContext.sql("""
  select al.*, ap.*
  from airlines as al 
    left join airports as ap
      on al.ORIGIN = ap.iata_code
 """).persist()

In [0]:
# Convert from local timestamp to UTC timestamp only for scheduled departure
df_airlines_airports = convert_to_utc2(df_airlines_airports).persist()

In [0]:
# Create the UTC timestamps at which weather is joined: 1 hour before and 2 hours after scheduled departure
df_airlines_airports = df_airlines_airports.withColumn("scheduled_departure_UTC_minus_1hr", col("scheduled_departure_UTC") - f.expr("INTERVAL 1 HOURS"))
df_airlines_airports = df_airlines_airports.withColumn("scheduled_departure_UTC_add_2hr", col("scheduled_departure_UTC") + f.expr("INTERVAL 2 HOURS"))

In [0]:
# Check if above cell worked
df_airlines_airports.createOrReplaceTempView('quickView')
quickview = sqlContext.sql(""" select * from quickView limit 50 """).persist()
display(quickview)

QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CANCELLED,CANCELLATION_CODE,DIVERTED,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,YEAR,id,ident,type,name,latitude_deg,longitude_deg,elevation_ft,continent,iso_country,iso_region,municipality,scheduled_service,gps_code,iata_code,local_code,home_link,wikipedia_link,keywords,CRS_DEP_TIME_mod,FL_DATE_2,local_timezone,local_timestamp,timezone,scheduled_departure_UTC,scheduled_departure_UTC_minus_1hr,scheduled_departure_UTC_add_2hr
1,1,5,6,2019-01-05,OO,20304,OO,N930SW,5022,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,635,621,-14.0,0.0,0.0,-1,0600-0659,14.0,635,630,8.0,0.0,,0.0,1.0,305.0,2,,,,,,2019,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,06:35,2019-01-05,2019-01-05T06:35,2019-01-05T06:35:00.000+0000,America/Chicago,2019-01-05T12:35:00.000+0000,2019-01-05T11:35:00.000+0000,2019-01-05T14:35:00.000+0000
3,7,24,6,2021-07-24 00:00:00,OO,20304,OO,N223JS,5059,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,13930,1393007,30977,ORD,"Chicago, IL",IL,17,Illinois,41,1032,2102,630.0,630.0,1.0,12,1000-1059,13.0,2115,2235,16.0,0.0,,0.0,1.0,583.0,3,627.0,0.0,0.0,0.0,0.0,2021,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,10:32,2021-07-24,2021-07-24T10:32,2021-07-24T10:32:00.000+0000,America/Chicago,2021-07-24T15:32:00.000+0000,2021-07-24T14:32:00.000+0000,2021-07-24T17:32:00.000+0000
3,7,24,6,2021-07-24 00:00:00,OO,20304,OO,N933EV,5072,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,630,1111,281.0,281.0,1.0,12,0600-0659,14.0,1125,1115,7.0,0.0,,0.0,1.0,305.0,2,271.0,0.0,0.0,0.0,0.0,2021,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,06:30,2021-07-24,2021-07-24T06:30,2021-07-24T06:30:00.000+0000,America/Chicago,2021-07-24T11:30:00.000+0000,2021-07-24T10:30:00.000+0000,2021-07-24T13:30:00.000+0000
3,7,24,6,2021-07-24 00:00:00,OO,20304,OO,N975SW,5176,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,1306,1444,98.0,98.0,1.0,6,1300-1359,18.0,1502,1455,9.0,0.0,,0.0,1.0,305.0,2,97.0,0.0,0.0,0.0,0.0,2021,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,13:06,2021-07-24,2021-07-24T13:06,2021-07-24T13:06:00.000+0000,America/Chicago,2021-07-24T18:06:00.000+0000,2021-07-24T17:06:00.000+0000,2021-07-24T20:06:00.000+0000
4,12,20,4,2018-12-20,OO,20304,OO,N901EV,5022,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,635,615,-20.0,0.0,0.0,-2,0600-0659,10.0,625,614,18.0,0.0,,0.0,1.0,305.0,2,,,,,,2018,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,06:35,2018-12-20,2018-12-20T06:35,2018-12-20T06:35:00.000+0000,America/Chicago,2018-12-20T12:35:00.000+0000,2018-12-20T11:35:00.000+0000,2018-12-20T14:35:00.000+0000
4,12,20,4,2018-12-20,OO,20304,OO,N909SW,5025,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,1715,1700,-15.0,0.0,0.0,-1,1700-1759,17.0,1717,1708,9.0,0.0,,0.0,1.0,305.0,2,,,,,,2018,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,17:15,2018-12-20,2018-12-20T17:15,2018-12-20T17:15:00.000+0000,America/Chicago,2018-12-20T23:15:00.000+0000,2018-12-20T22:15:00.000+0000,2018-12-21T01:15:00.000+0000
4,12,17,1,2018-12-17,OO,20304,OO,N945SW,5022,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,635,618,-17.0,0.0,0.0,-2,0600-0659,16.0,634,621,7.0,0.0,,0.0,1.0,305.0,2,,,,,,2018,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,06:35,2018-12-17,2018-12-17T06:35,2018-12-17T06:35:00.000+0000,America/Chicago,2018-12-17T12:35:00.000+0000,2018-12-17T11:35:00.000+0000,2018-12-17T14:35:00.000+0000
4,12,17,1,2018-12-17,OO,20304,OO,N962SW,5025,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,1714,1708,-6.0,0.0,0.0,-1,1700-1759,18.0,1726,1716,11.0,0.0,,0.0,1.0,305.0,2,,,,,,2018,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,17:14,2018-12-17,2018-12-17T17:14,2018-12-17T17:14:00.000+0000,America/Chicago,2018-12-17T23:14:00.000+0000,2018-12-17T22:14:00.000+0000,2018-12-18T01:14:00.000+0000
1,2,19,5,2021-02-19 00:00:00,OO,20304,OO,N593ML,5131,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,13930,1393007,30977,ORD,"Chicago, IL",IL,17,Illinois,41,800,748,-12.0,0.0,0.0,-1,0800-0859,20.0,808,934,12.0,0.0,,0.0,1.0,583.0,3,,,,,,2021,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,08:00,2021-02-19,2021-02-19T08:00,2021-02-19T08:00:00.000+0000,America/Chicago,2021-02-19T14:00:00.000+0000,2021-02-19T13:00:00.000+0000,2021-02-19T16:00:00.000+0000
1,2,19,5,2021-02-19 00:00:00,OO,20304,OO,N929EV,5091,11468,1146802,31468,EAR,"Kearney, NE",NE,31,Nebraska,65,11292,1129202,30325,DEN,"Denver, CO",CO,8,Colorado,82,1215,1204,-11.0,0.0,0.0,-1,1200-1259,11.0,1215,1209,9.0,0.0,,0.0,1.0,305.0,2,,,,,,2021,19716,KEAR,medium_airport,Kearney Regional Airport,40.727001,-99.006798,2131.0,,US,US-NE,Kearney,yes,KEAR,EAR,EAR,,https://en.wikipedia.org/wiki/Kearney_Municipal_Airport,,12:15,2021-02-19,2021-02-19T12:15,2021-02-19T12:15:00.000+0000,America/Chicago,2021-02-19T18:15:00.000+0000,2021-02-19T17:15:00.000+0000,2021-02-19T20:15:00.000+0000


In [0]:
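from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Create DF of the closest weather station for each call sign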
df_stationsSub1Clean.createOrReplaceTempView('stations')
df_closest_stations = sqlContext.sql("""
  WITH cte as (
    select 
      ROW_NUMBER() OVER (PARTITION BY neighbor_call ORDER BY distance_to_neighbor ASC) AS rank,
      station_id,
      neighbor_call
    from stations
  )
  select 
    station_id,
    neighbor_call
  from cte 
  where rank = 1 
""").persist()

# Map each origin to the ICAO call sign used to look up its closest weather station.
# A few origins are overridden manually; all others fall back to their gps_code.
ICAO_OVERRIDES = {
  'ISN': 'KISN',
  'PSE': 'TJMZ',
  'PPG': 'PHKO',
  'SJU': 'TJIG',
  'GUM': 'PASY',
}

def transform_to_ICAO(origin, gps_code):
  '''
  Arguments: origin (IATA airport code), gps_code (from the airports table)
  Return: ICAO call sign used to match the closest weather station
  '''
  return ICAO_OVERRIDES.get(origin, gps_code)

udf_iata_func = udf(transform_to_ICAO, StringType())

# Add the station call sign and the hour-truncated departure keys in one chain and
# persist only the final DataFrame. Persisting every intermediate step leaves earlier
# cached copies behind that a later unpersist() on the final DataFrame does not release.
df_airlines_airports = (
  df_airlines_airports
    .withColumn("closest_station_ICAO", udf_iata_func(f.col("ORIGIN"), f.col("gps_code")))
    .withColumn("rounded_depTimestamp", f.date_trunc("hour", f.col("scheduled_departure_UTC")))
    .withColumn("rounded_depTimestamp_minus_1hr", f.date_trunc("hour", f.col("scheduled_departure_UTC_minus_1hr")))
    .withColumn("rounded_depTimestamp_add_2hr", f.date_trunc("hour", f.col("scheduled_departure_UTC_add_2hr")))
    .persist()
)

# Join closest station for airlines_airports
df_airlines_airports.createOrReplaceTempView('airlines_airports')
df_closest_stations.createOrReplaceTempView('closest_stations')

df_airlines_airports_closest_station = sqlContext.sql("""
  select *
  from airlines_airports
    left join closest_stations
      on airlines_airports.closest_station_ICAO = closest_stations.neighbor_call
 """).persist()

At this stage we reviewed the columns and found many that we no longer need. Some hold information that is not relevant to our models (e.g. wikipedia_link). Others are superseded by a column with more accurate information (e.g. the United States Department of Transportation data dictionary recommends OP_UNIQUE_CARRIER over OP_CARRIER). We drop them now to speed up the join, since fewer columns means less data to shuffle.

In [0]:
# Drop columns that are no longer needed before the join
dropCols = [
  'name', 'YEAR', 'ident', 'latitude_deg', 'longitude_deg', 'gps_code', 'iata_code',
  'local_code', 'home_link', 'wikipedia_link', 'keywords', 'FL_DATE', 'OP_CARRIER_AIRLINE_ID',
  'OP_CARRIER', 'ORIGIN_AIRPORT_SEQ_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_WAC', 'ORIGIN_STATE_FIPS',
  'ORIGIN_STATE_NM', 'DEST_AIRPORT_SEQ_ID', 'DEST_CITY_MARKET_ID', 'DEST_STATE_FIPS',
  'DEST_STATE_NM', 'DEST_WAC', 'DEP_TIME', 'DEP_DEL15', 'DEP_DELAY_GROUP', 'WHEELS_OFF',
  'WHEELS_ON', 'CANCELLATION_CODE', 'FLIGHTS', 'id', 'continent', 'iso_country', 'iso_region',
  'municipality', 'CRS_DEP_TIME_mod', 'local_timezone', 'closest_station_ICAO', 'neighbor_call',
]

df_airlines_airports_closest_station = df_airlines_airports_closest_station.drop(*dropCols).persist()
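PySpark's `DataFrame.drop` is a no-op for names that are not in the schema, so a misspelled entry in `dropCols` would pass silently. Below is a minimal sketch of a guard against that. The helper `missing_columns` is hypothetical and works on the plain list returned by `df.columns`. It compares names case-insensitively, on the assumption that the session uses Spark's default `spark.sql.caseSensitive=false`.

```python
from typing import Iterable, List

def missing_columns(schema_columns: Iterable[str], to_drop: Iterable[str]) -> List[str]:
    """Return the names in `to_drop` that match no schema column (case-insensitive)."""
    present = {c.lower() for c in schema_columns}
    return [c for c in to_drop if c.lower() not in present]

# Hypothetical usage in the notebook (needs a live DataFrame):
#   missing = missing_columns(df_airlines_airports_closest_station.columns, dropCols)
#   assert not missing, f"Columns not found: {missing}"

schema = ["QUARTER", "ORIGIN", "wikipedia_link", "neighbor_call"]
print(missing_columns(schema, ["wikipedia_link", "NEIGHBOR_CALL", "wikipedia_lnk"]))
# ['wikipedia_lnk']
```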

In [0]:
# Join origin-station weather from 1 hour before scheduled departure (both sides rounded to the hour)
df_airlines_airports_closest_station.createOrReplaceTempView('airlines_airports_closest_stations')
df_weatherSub1Clean.createOrReplaceTempView('weather')

df_main_1 = sqlContext.sql("""
  select 
    aacs.*, 
    w.STATION as station, 
     w.YEAR as YEAR, 
     w.LATITUDE as LATITUDE,
     w.LONGITUDE as LONGITUDE,
     w.ELEVATION as ELEVATION,
     w.NAME as NAME,
     w.REPORT_TYPE as REPORT_TYPE,
     w.SOURCE as SOURCE,
     w.HourlyAltimeterSetting as origin_HourlyAltimeterSetting,
     w.HourlyDewPointTemperature as origin_HourlyDewPointTemperature,
     w.HourlyDryBulbTemperature as origin_HourlyDryBulbTemperature,
     w.HourlyPrecipitation as origin_HourlyPrecipitation,
     w.HourlyPresentWeatherType as origin_HourlyPresentWeatherType,
     w.HourlyPressureChange as origin_HourlyPressureChange,
     w.HourlyPressureTendency as origin_HourlyPressureTendency,
     w.HourlyRelativeHumidity as origin_HourlyRelativeHumidity,
     w.HourlySkyConditions as origin_HourlySkyConditions,
     w.HourlySeaLevelPressure as origin_HourlySeaLevelPressure,
     w.HourlyStationPressure as origin_HourlyStationPressure,
     w.HourlyVisibility as origin_HourlyVisibility,
     w.HourlyWetBulbTemperature as origin_HourlyWetBulbTemperature,
     w.HourlyWindDirection as origin_HourlyWindDirection,
     w.HourlyWindGustSpeed as origin_HourlyWindGustSpeed,
     w.HourlyWindSpeed as origin_HourlyWindSpeed,
     w.Sunrise as origin_Sunrise,
     w.Sunset as origin_Sunset,
     w.AWND as origin_AWND,
     w.CDSD as origin_CDSD,
     w.CLDD as origin_CLDD,
     w.DSNW as origin_DSNW,
     w.HDSD as origin_HDSD,
     w.HTDD as origin_HTDD,
     w.NormalsCoolingDegreeDay as origin_NormalsCoolingDegreeDay,
     w.NormalsHeatingDegreeDay as origin_NormalsHeatingDegreeDay
  from airlines_airports_closest_stations as aacs
    left join weather as w
      on aacs.station_id = w.STATION and aacs.rounded_depTimestamp_minus_1hr = w.rounded_DATE_UTC
""").persist()

In [0]:
# Check if above cell worked
df_main_1.createOrReplaceTempView('quickView')
quickview = sqlContext.sql(""" select * from quickView limit 50 """).persist()
display(quickview)

QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,CRS_DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_TIME_BLK,TAXI_OUT,TAXI_IN,CANCELLED,DIVERTED,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,type,elevation_ft,scheduled_service,FL_DATE_2,local_timestamp,timezone,scheduled_departure_UTC,scheduled_departure_UTC_minus_1hr,scheduled_departure_UTC_add_2hr,rounded_depTimestamp,rounded_depTimestamp_minus_1hr,rounded_depTimestamp_add_2hr,station_id,station,YEAR,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,SOURCE,origin_HourlyAltimeterSetting,origin_HourlyDewPointTemperature,origin_HourlyDryBulbTemperature,origin_HourlyPrecipitation,origin_HourlyPresentWeatherType,origin_HourlyPressureChange,origin_HourlyPressureTendency,origin_HourlyRelativeHumidity,origin_HourlySkyConditions,origin_HourlySeaLevelPressure,origin_HourlyStationPressure,origin_HourlyVisibility,origin_HourlyWetBulbTemperature,origin_HourlyWindDirection,origin_HourlyWindGustSpeed,origin_HourlyWindSpeed,origin_Sunrise,origin_Sunset,origin_AWND,origin_CDSD,origin_CLDD,origin_DSNW,origin_HDSD,origin_HTDD,origin_NormalsCoolingDegreeDay,origin_NormalsHeatingDegreeDay
3,7,14,7,UA,N73278,117,14955,SPN,"Saipan, TT",TT,12016,GUM,"Guam, TT",TT,915,-5.0,0.0,0900-0959,11.0,5.0,0.0,0.0,129.0,1,,,,,,medium_airport,215.0,yes,2019-07-14,2019-07-14T09:15:00.000+0000,Pacific/Saipan,2019-07-13T23:15:00.000+0000,2019-07-13T22:15:00.000+0000,2019-07-14T01:15:00.000+0000,2019-07-13T23:00:00.000+0000,2019-07-13T22:00:00.000+0000,2019-07-14T01:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,3,2,1,UA,N36280,117,14955,SPN,"Saipan, TT",TT,12016,GUM,"Guam, TT",TT,915,53.0,53.0,0900-0959,8.0,2.0,0.0,0.0,129.0,1,0.0,0.0,0.0,0.0,51.0,medium_airport,215.0,yes,2020-03-02,2020-03-02T09:15:00.000+0000,Pacific/Saipan,2020-03-01T23:15:00.000+0000,2020-03-01T22:15:00.000+0000,2020-03-02T01:15:00.000+0000,2020-03-01T23:00:00.000+0000,2020-03-01T22:00:00.000+0000,2020-03-02T01:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,9,10,4,OO,N986SW,5115,13832,OGS,"Ogdensburg, NY",NY,13930,ORD,"Chicago, IL",IL,1731,,,1700-1759,,,1.0,0.0,654.0,3,,,,,,small_airport,297.0,yes,2020-09-10,2020-09-10T17:31:00.000+0000,America/New_York,2020-09-10T21:31:00.000+0000,2020-09-10T20:31:00.000+0000,2020-09-10T23:31:00.000+0000,2020-09-10T21:00:00.000+0000,2020-09-10T20:00:00.000+0000,2020-09-10T23:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,12,13,7,OO,N877AS,5834,16869,XWA,"Williston, ND",ND,11292,DEN,"Denver, CO",CO,1539,-9.0,0.0,1500-1559,11.0,9.0,0.0,0.0,582.0,3,,,,,,medium_airport,2344.0,yes,2020-12-13,2020-12-13T15:39:00.000+0000,America/Chicago,2020-12-13T21:39:00.000+0000,2020-12-13T20:39:00.000+0000,2020-12-13T23:39:00.000+0000,2020-12-13T21:00:00.000+0000,2020-12-13T20:00:00.000+0000,2020-12-13T23:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,2,21,7,AS,N765AS,50,10754,BRW,"Barrow, AK",AK,10299,ANC,"Anchorage, AK",AK,1905,-13.0,0.0,1900-1959,3.0,3.0,0.0,0.0,725.0,3,,,,,,medium_airport,44.0,yes,2016-02-21,2016-02-21T19:05:00.000+0000,America/Anchorage,2016-02-22T04:05:00.000+0000,2016-02-22T03:05:00.000+0000,2016-02-22T06:05:00.000+0000,2016-02-22T04:00:00.000+0000,2016-02-22T03:00:00.000+0000,2016-02-22T06:00:00.000+0000,70026027502.0,70026027502.0,2016.0,71.0,-157.0,9.4,"BARROW AIRPORT, AK US",FM-15,7.0,29.84,-7.0,-4.0,T,BR:1 ||,,,86.0,OVC:08 11,29.83,29.83,1.0,-4.0,50.0,,15.0,,,,,,,,,,
1,3,14,4,AS,N611AS,55,10754,BRW,"Barrow, AK",AK,11630,FAI,"Fairbanks, AK",AK,1155,-20.0,0.0,1100-1159,9.0,4.0,0.0,0.0,503.0,3,,,,,,medium_airport,44.0,yes,2019-03-14,2019-03-14T11:55:00.000+0000,America/Anchorage,2019-03-14T19:55:00.000+0000,2019-03-14T18:55:00.000+0000,2019-03-14T21:55:00.000+0000,2019-03-14T19:00:00.000+0000,2019-03-14T18:00:00.000+0000,2019-03-14T21:00:00.000+0000,70026027502.0,70026027502.0,2019.0,71.0,-157.0,9.4,"BARROW AIRPORT, AK US",FM-15,7.0,29.91,-13.0,-8.0,0.00,BL:5 SN:03 ||,,,78.0,CLR:00,29.91,29.9,5.0,-9.0,50.0,,20.0,,,,,,,,,,
2,4,15,3,AS,N613AS,50,10754,BRW,"Barrow, AK",AK,10299,ANC,"Anchorage, AK",AK,1750,-27.0,0.0,1700-1759,17.0,3.0,0.0,0.0,725.0,3,,,,,,medium_airport,44.0,yes,2020-04-15,2020-04-15T17:50:00.000+0000,America/Anchorage,2020-04-16T01:50:00.000+0000,2020-04-16T00:50:00.000+0000,2020-04-16T03:50:00.000+0000,2020-04-16T01:00:00.000+0000,2020-04-16T00:00:00.000+0000,2020-04-16T03:00:00.000+0000,70026027502.0,70026027502.0,2020.0,71.0,-157.0,9.4,"BARROW AIRPORT, AK US",FM-16,7.0,29.88,21.0,23.0,,,,,92.0,FEW:02 6 BKN:07 18 OVC:08 50,,29.87,10.0,22.0,100.0,,13.0,,,,,,,,,,
2,5,28,5,AS,N615AS,50,10754,BRW,"Barrow, AK",AK,10299,ANC,"Anchorage, AK",AK,1440,18.0,18.0,1400-1459,8.0,4.0,0.0,0.0,725.0,3,0.0,0.0,3.0,18.0,0.0,medium_airport,44.0,yes,2021-05-28,2021-05-28T14:40:00.000+0000,America/Anchorage,2021-05-28T22:40:00.000+0000,2021-05-28T21:40:00.000+0000,2021-05-29T00:40:00.000+0000,2021-05-28T22:00:00.000+0000,2021-05-28T21:00:00.000+0000,2021-05-29T00:00:00.000+0000,70026027502.0,70026027502.0,2021.0,71.0,-157.0,8.1,"BARROW AIRPORT, AK US",FM-15,7.0,30.24,27.0,31.0,0.00,,,,85.0,BKN:07 10 BKN:07 14 OVC:08 70,30.24,30.23,10.0,30.0,360.0,,8.0,,,,,,,,,,
2,4,12,7,AS,N763AS,55,14709,SCC,"Deadhorse, AK",AK,10754,BRW,"Barrow, AK",AK,1013,-7.0,0.0,1000-1059,5.0,3.0,0.0,0.0,204.0,1,,,,,,medium_airport,65.0,yes,2015-04-12,2015-04-12T10:13:00.000+0000,America/Anchorage,2015-04-12T18:13:00.000+0000,2015-04-12T17:13:00.000+0000,2015-04-12T20:13:00.000+0000,2015-04-12T18:00:00.000+0000,2015-04-12T17:00:00.000+0000,2015-04-12T20:00:00.000+0000,70063727406.0,70063727406.0,2015.0,70.0,-148.0,18.6,"DEADHORSE, AK US",FM-15,7.0,29.61,2.0,7.0,T,-SN:03 |SN |,,,80.0,BKN:07 32 OVC:08 75,29.62,29.55,9.0,6.0,70.0,,10.0,,,,,,,,,,
3,7,17,5,AS,N763AS,51,14709,SCC,"Deadhorse, AK",AK,10299,ANC,"Anchorage, AK",AK,2148,-23.0,0.0,2100-2159,3.0,4.0,0.0,0.0,627.0,3,,,,,,medium_airport,65.0,yes,2015-07-17,2015-07-17T21:48:00.000+0000,America/Anchorage,2015-07-18T05:48:00.000+0000,2015-07-18T04:48:00.000+0000,2015-07-18T07:48:00.000+0000,2015-07-18T05:00:00.000+0000,2015-07-18T04:00:00.000+0000,2015-07-18T07:00:00.000+0000,70063727406.0,70063727406.0,2015.0,70.0,-148.0,18.6,"DEADHORSE, AK US",FM-15,7.0,29.57,42.0,47.0,T,,-0.03,3.0,83.0,SCT:04 110,29.57,29.51,10.0,45.0,30.0,,8.0,,,,,,,,,,


In [0]:
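# Join weather from 2 hours after scheduled departure (both sides rounded to the hour).
# Note: the join key is aacs.station_id, the station closest to the ORIGIN airport,
# so the dest_* columns hold origin-station weather at departure + 2 hrs,
# not weather at the destination airport.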
df_main_1.createOrReplaceTempView('df_main_1')

df_main = sqlContext.sql("""
  select 
    aacs.*, 
     w.HourlyAltimeterSetting as dest_HourlyAltimeterSetting,
     w.HourlyDewPointTemperature as dest_HourlyDewPointTemperature,
     w.HourlyDryBulbTemperature as dest_HourlyDryBulbTemperature,
     w.HourlyPrecipitation as dest_HourlyPrecipitation,
     w.HourlyPresentWeatherType as dest_HourlyPresentWeatherType,
     w.HourlyPressureChange as dest_HourlyPressureChange,
     w.HourlyPressureTendency as dest_HourlyPressureTendency,
     w.HourlyRelativeHumidity as dest_HourlyRelativeHumidity,
     w.HourlySkyConditions as dest_HourlySkyConditions,
     w.HourlySeaLevelPressure as dest_HourlySeaLevelPressure,
     w.HourlyStationPressure as dest_HourlyStationPressure,
     w.HourlyVisibility as dest_HourlyVisibility,
     w.HourlyWetBulbTemperature as dest_HourlyWetBulbTemperature,
     w.HourlyWindDirection as dest_HourlyWindDirection,
     w.HourlyWindGustSpeed as dest_HourlyWindGustSpeed,
     w.HourlyWindSpeed as dest_HourlyWindSpeed,
     w.Sunrise as dest_Sunrise,
     w.Sunset as dest_Sunset,
     w.AWND as dest_AWND,
     w.CDSD as dest_CDSD,
     w.CLDD as dest_CLDD,
     w.DSNW as dest_DSNW,
     w.HDSD as dest_HDSD,
     w.HTDD as dest_HTDD,
     w.NormalsCoolingDegreeDay as dest_NormalsCoolingDegreeDay,
     w.NormalsHeatingDegreeDay as dest_NormalsHeatingDegreeDay
  from df_main_1 as aacs
    left join weather as w
      on aacs.station_id = w.STATION and aacs.rounded_depTimestamp_add_2hr = w.rounded_DATE_UTC
""").persist()

After the join, we drop the columns we are certain we no longer need.

In [0]:
# drop post join columns
dropCols = ['station_id', 'STATION', 'YEAR', 'LATITUDE', 'LONGITUDE', 'NAME', 'ELEVATION', 'REPORT_TYPE']
df_main = df_main.drop(*dropCols).persist()

# Clean up memory and disk 
df_airlinesSub1Clean.unpersist()
df_weatherSub1Clean.unpersist()
df_stationsSub1Clean.unpersist()
df_airportsSub1Clean.unpersist()
df_closest_stations.unpersist()
df_airlines_airports.unpersist()
df_airlines_airports_closest_station.unpersist()

Out[30]: DataFrame[QUARTER: int, MONTH: int, DAY_OF_MONTH: int, DAY_OF_WEEK: int, OP_UNIQUE_CARRIER: string, TAIL_NUM: string, OP_CARRIER_FL_NUM: int, ORIGIN_AIRPORT_ID: int, ORIGIN: string, ORIGIN_CITY_NAME: string, ORIGIN_STATE_ABR: string, DEST_AIRPORT_ID: int, DEST: string, DEST_CITY_NAME: string, DEST_STATE_ABR: string, CRS_DEP_TIME: int, DEP_DELAY: double, DEP_DELAY_NEW: double, DEP_TIME_BLK: string, TAXI_OUT: double, TAXI_IN: double, CANCELLED: double, DIVERTED: double, DISTANCE: double, DISTANCE_GROUP: int, CARRIER_DELAY: double, WEATHER_DELAY: double, NAS_DELAY: double, SECURITY_DELAY: double, LATE_AIRCRAFT_DELAY: double, type: string, elevation_ft: string, scheduled_service: string, FL_DATE_2: date, local_timestamp: timestamp, timezone: string, scheduled_departure_UTC: timestamp, scheduled_departure_UTC_minus_1hr: timestamp, scheduled_departure_UTC_add_2hr: timestamp, rounded_depTimestamp: timestamp, rounded_depTimestamp_minus_1hr: timestamp, rounded_depTimestamp_add_2hr: tim

In [0]:
df_main.write.mode('overwrite').parquet(f"{blob_url}/df_main_fullJoin")

In [0]:
df_main = spark.read.parquet(f"{blob_url}/df_main_fullJoin")

### Investigate New 2015-2021 Join

In [0]:
print('# Rows:', df_main.count())
print('# Distinct Rows:', df_main.distinct().count())
print('# Columns:', len(df_main.columns))

# Rows: 42430592
# Distinct Rows: 42430592
# Columns: 95


In [0]:
display(df_main)

QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,CRS_DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_TIME_BLK,TAXI_OUT,TAXI_IN,CANCELLED,DIVERTED,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,type,elevation_ft,scheduled_service,FL_DATE_2,local_timestamp,timezone,scheduled_departure_UTC,scheduled_departure_UTC_minus_1hr,scheduled_departure_UTC_add_2hr,rounded_depTimestamp,rounded_depTimestamp_minus_1hr,rounded_depTimestamp_add_2hr,SOURCE,origin_HourlyAltimeterSetting,origin_HourlyDewPointTemperature,origin_HourlyDryBulbTemperature,origin_HourlyPrecipitation,origin_HourlyPresentWeatherType,origin_HourlyPressureChange,origin_HourlyPressureTendency,origin_HourlyRelativeHumidity,origin_HourlySkyConditions,origin_HourlySeaLevelPressure,origin_HourlyStationPressure,origin_HourlyVisibility,origin_HourlyWetBulbTemperature,origin_HourlyWindDirection,origin_HourlyWindGustSpeed,origin_HourlyWindSpeed,origin_Sunrise,origin_Sunset,origin_AWND,origin_CDSD,origin_CLDD,origin_DSNW,origin_HDSD,origin_HTDD,origin_NormalsCoolingDegreeDay,origin_NormalsHeatingDegreeDay,dest_HourlyAltimeterSetting,dest_HourlyDewPointTemperature,dest_HourlyDryBulbTemperature,dest_HourlyPrecipitation,dest_HourlyPresentWeatherType,dest_HourlyPressureChange,dest_HourlyPressureTendency,dest_HourlyRelativeHumidity,dest_HourlySkyConditions,dest_HourlySeaLevelPressure,dest_HourlyStationPressure,dest_HourlyVisibility,dest_HourlyWetBulbTemperature,dest_HourlyWindDirection,dest_HourlyWindGustSpeed,dest_HourlyWindSpeed,dest_Sunrise,dest_Sunset,dest_AWND,dest_CDSD,dest_CLDD,dest_DSNW,dest_HDSD,dest_HTDD,dest_NormalsCoolingDegreeDay,dest_NormalsHeatingDegreeDay
4,12,21,5,UA,N73278,117,14955,SPN,"Saipan, TT",TT,12016,GUM,"Guam, TT",TT,915,18.0,18.0,0900-0959,11.0,3.0,0.0,0.0,129.0,1,0.0,0.0,17.0,0.0,3.0,medium_airport,215.0,yes,2018-12-21,2018-12-21T09:15:00.000+0000,Pacific/Saipan,2018-12-20T23:15:00.000+0000,2018-12-20T22:15:00.000+0000,2018-12-21T01:15:00.000+0000,2018-12-20T23:00:00.000+0000,2018-12-20T22:00:00.000+0000,2018-12-21T01:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,1,25,6,UA,N39726,156,14955,SPN,"Saipan, TT",TT,12016,GUM,"Guam, TT",TT,1820,-10.0,0.0,1800-1859,8.0,3.0,0.0,0.0,129.0,1,,,,,,medium_airport,215.0,yes,2020-01-25,2020-01-25T18:20:00.000+0000,Pacific/Saipan,2020-01-25T08:20:00.000+0000,2020-01-25T07:20:00.000+0000,2020-01-25T10:20:00.000+0000,2020-01-25T08:00:00.000+0000,2020-01-25T07:00:00.000+0000,2020-01-25T10:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,12,21,1,OO,N433SW,5123,13832,OGS,"Ogdensburg, NY",NY,13930,ORD,"Chicago, IL",IL,1700,-12.0,0.0,1700-1759,7.0,20.0,0.0,0.0,654.0,3,,,,,,small_airport,297.0,yes,2020-12-21,2020-12-21T17:00:00.000+0000,America/New_York,2020-12-21T22:00:00.000+0000,2020-12-21T21:00:00.000+0000,2020-12-22T00:00:00.000+0000,2020-12-21T22:00:00.000+0000,2020-12-21T21:00:00.000+0000,2020-12-22T00:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,1,7,4,OO,N964SW,5226,16869,XWA,"Williston, ND",ND,11292,DEN,"Denver, CO",CO,915,-10.0,0.0,0900-0959,29.0,15.0,0.0,0.0,582.0,3,,,,,,medium_airport,2344.0,yes,2021-01-07,2021-01-07T09:15:00.000+0000,America/Chicago,2021-01-07T15:15:00.000+0000,2021-01-07T14:15:00.000+0000,2021-01-07T17:15:00.000+0000,2021-01-07T15:00:00.000+0000,2021-01-07T14:00:00.000+0000,2021-01-07T17:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,3,24,3,OO,N979SW,4655,16869,XWA,"Williston, ND",ND,11292,DEN,"Denver, CO",CO,745,18.0,18.0,0700-0759,50.0,7.0,0.0,0.0,582.0,3,41.0,0.0,0.0,0.0,0.0,medium_airport,2344.0,yes,2021-03-24,2021-03-24T07:45:00.000+0000,America/Chicago,2021-03-24T12:45:00.000+0000,2021-03-24T11:45:00.000+0000,2021-03-24T14:45:00.000+0000,2021-03-24T12:00:00.000+0000,2021-03-24T11:00:00.000+0000,2021-03-24T14:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,12,22,3,OO,N869AS,5834,16869,XWA,"Williston, ND",ND,11292,DEN,"Denver, CO",CO,1520,-2.0,0.0,1500-1559,52.0,8.0,0.0,0.0,582.0,3,,,,,,medium_airport,2344.0,yes,2021-12-22,2021-12-22T15:20:00.000+0000,America/Chicago,2021-12-22T21:20:00.000+0000,2021-12-22T20:20:00.000+0000,2021-12-22T23:20:00.000+0000,2021-12-22T21:00:00.000+0000,2021-12-22T20:00:00.000+0000,2021-12-22T23:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,12,22,3,OO,N917EV,4260,16869,XWA,"Williston, ND",ND,13487,MSP,"Minneapolis, MN",MN,1520,45.0,45.0,1500-1559,13.0,16.0,0.0,0.0,553.0,3,19.0,0.0,0.0,0.0,0.0,medium_airport,2344.0,yes,2021-12-22,2021-12-22T15:20:00.000+0000,America/Chicago,2021-12-22T21:20:00.000+0000,2021-12-22T20:20:00.000+0000,2021-12-22T23:20:00.000+0000,2021-12-22T21:00:00.000+0000,2021-12-22T20:00:00.000+0000,2021-12-22T23:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,12,23,4,UA,N77295,117,14955,SPN,"Saipan, TT",TT,12016,GUM,"Guam, TT",TT,720,-12.0,0.0,0700-0759,10.0,4.0,0.0,0.0,129.0,1,,,,,,medium_airport,215.0,yes,2021-12-23,2021-12-23T07:20:00.000+0000,Pacific/Saipan,2021-12-22T21:20:00.000+0000,2021-12-22T20:20:00.000+0000,2021-12-22T23:20:00.000+0000,2021-12-22T21:00:00.000+0000,2021-12-22T20:00:00.000+0000,2021-12-22T23:00:00.000+0000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,4,8,3,AS,N594AS,55,10754,BRW,"Barrow, AK",AK,11630,FAI,"Fairbanks, AK",AK,1148,0.0,0.0,1100-1159,13.0,3.0,0.0,0.0,503.0,3,,,,,,medium_airport,44.0,yes,2015-04-08,2015-04-08T11:48:00.000+0000,America/Anchorage,2015-04-08T19:48:00.000+0000,2015-04-08T18:48:00.000+0000,2015-04-08T21:48:00.000+0000,2015-04-08T19:00:00.000+0000,2015-04-08T18:00:00.000+0000,2015-04-08T21:00:00.000+0000,7.0,29.55,1.0,4.0,T,-SN:03 BL:5 SN:03 |s SN s |BLSN,,,88.0,OVC:08 14,29.56,29.54,0.25,3.0,060,34.0,28.0,,,,,,,,,,,29.53,1.0,5.0,0.01,-SN:03 BL:5 SN:03 |SN |BLSN,,,85.0,BKN:07 28 OVC:08 38,,29.52,0.75,4.0,060,,25.0,,,,,,,,,,
1,3,11,5,AS,N535AS,55,10754,BRW,"Barrow, AK",AK,11630,FAI,"Fairbanks, AK",AK,1209,43.0,43.0,1200-1259,4.0,3.0,0.0,0.0,503.0,3,37.0,0.0,0.0,0.0,0.0,medium_airport,44.0,yes,2016-03-11,2016-03-11T12:09:00.000+0000,America/Anchorage,2016-03-11T21:09:00.000+0000,2016-03-11T20:09:00.000+0000,2016-03-11T23:09:00.000+0000,2016-03-11T21:00:00.000+0000,2016-03-11T20:00:00.000+0000,2016-03-11T23:00:00.000+0000,7.0,29.91,-15.0,-11.0,0.00,BR:1 ||,-0.02,2.0,82.0,CLR:00,29.92,29.9,4.00,-11.0,050,,14.0,,,,,,,,,,,29.92,-12.0,-8.0,0.00,BR:1 ||,-0.01,0.0,82.0,CLR:00,29.93,29.91,3.00,-8.0,040,,15.0,,,,,,,,,,


As noted in cell 16, the joined data should contain 388 distinct origin airports, and the count below matches.

In [0]:
# Count distinct origin airports in the joined data
df_main.createOrReplaceTempView('new_airport_count_post_join')
new_airport_count_post_join = sqlContext.sql(""" select COUNT(DISTINCT ORIGIN_AIRPORT_ID) as new_airport_count_post_join from new_airport_count_post_join """).persist()
display(new_airport_count_post_join)

new_airport_count_post_join
388
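Matching counts show that no airport was lost, but if the counts ever diverge they do not say which airport is missing. Below is a minimal sketch of a more informative check that compares the sets of `ORIGIN_AIRPORT_ID` values before and after the join. The helper `compare_airport_sets` is hypothetical. In the notebook, its inputs could be collected with something like `[r[0] for r in df.select('ORIGIN_AIRPORT_ID').distinct().collect()]`, which is cheap here because there are only a few hundred airports. The sample IDs are taken from the rows displayed above.

```python
from typing import Dict, Iterable, Set

def compare_airport_sets(before: Iterable[int], after: Iterable[int]) -> Dict[str, Set[int]]:
    """Return airports lost or gained between two snapshots of ORIGIN_AIRPORT_ID."""
    before_set, after_set = set(before), set(after)
    return {"lost": before_set - after_set, "gained": after_set - before_set}

pre_join = [14955, 10754, 16869, 13832]
post_join = [14955, 10754, 16869]
print(compare_airport_sets(pre_join, post_join))
# {'lost': {13832}, 'gained': set()}
```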


# Next Steps
In Phase III we extend the cleaning and add more predictive features to the dataset (e.g. the percentage of flights missed by a plane in the last 3 months and a Covid-19 indicator). This work can be found <a href="https://adb-731998097721284.4.azuredatabricks.net/?o=731998097721284#notebook/1325974983871287/command/1325974983871304" target="_blank">here</a>.