## NOTEBOOK 3: Airline Delays - Feature Engineering and Table Joins
This notebook contains our feature engineering work. 
We start by reading the raw data, with the additional information listed below, from our Delta tables (INSERT LINK):
1. The weather table includes an 'Airport_code' column to help join it with the airlines table.
2. The airlines table includes additional date and timestamp columns in UTC that will be used for feature engineering.

Next, we perform transformations, feature engineering, and feature selection.

Finally, we join our various datasets to produce a new dataset in which each record contains all the data needed by our models, and we save the joined tables in both Delta and Parquet formats for flexibility.

## Set Up and Imports

In [0]:
# package imports
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
import pandas as pd

from pyspark.sql.functions import col, size, lag, udf, monotonically_increasing_id
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import types

from pyspark.sql.window import Window
from datetime import datetime

In [0]:
# initialize the sql context
sqlContext = SQLContext(sc)

In [0]:
from pyspark.sql.functions import col, max

blob_container = "w261group5container" # The name of your container created in https://portal.azure.com
storage_account = "w261team5storage" # The name of your Storage account created in https://portal.azure.com
secret_scope = "w261_group_05" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "w261_group_05_key" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

## Read Airline data from source

In [0]:
# read in raw data
airlines = spark.read.option("header", "true").parquet("dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/20*.parquet")

#### STEP 1: Filter Out Cancelled and Diverted flight records

Since cancelled and diverted flights make up an insignificant share of the airlines dataset and would require further analysis to extract delay-related information, we exclude them to stay within the defined scope of this effort.

In [0]:
# REMOVE Cancelled and Diverted Flights
# keep only flights that are neither cancelled nor diverted (both conditions must hold)
airlines_wo_cancel_div = airlines.where((col("cancelled") != 1) & (col("diverted") != 1))

#### STEP 2: Imputation of target variable 'dep_del15'

We noticed records where the target variable 'dep_del15' is null but the scheduled and actual departure times are the same. We imputed such records with a target value of 0, indicating that the flight was NOT delayed.
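
The imputation rule can be traced record by record in plain Python. This is an illustrative sketch only: `impute_dep_del15` is a hypothetical helper, `None` stands in for Spark nulls, and the notebook itself applies the rule with `f.when` below:

```python
from typing import Optional


def impute_dep_del15(dep_del15: Optional[float],
                     crs_dep_time: Optional[int],
                     dep_time: Optional[int]) -> Optional[float]:
    """Apply the DEP_DEL15 imputation rule to a single flight record."""
    if dep_del15 is not None:
        # A known target value is kept unchanged
        return dep_del15
    if crs_dep_time is not None and crs_dep_time == dep_time:
        # Departed exactly on schedule, so the flight was not delayed
        return 0.0
    # Otherwise the target stays unknown; such records are filtered out in STEP 3
    return None


print(impute_dep_del15(None, 905, 905))  # 0.0
print(impute_dep_del15(None, 905, 930))  # None
```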

In [0]:
# Impute DEP_DEL15 as 0 where it is null but the scheduled and actual departure times match
airlines_target_null_fixed = airlines_wo_cancel_div.withColumn('DEP_DEL15', \
                            f.when((airlines_wo_cancel_div['DEP_DEL15'].isNull()) & \
                                 (airlines_wo_cancel_div['CRS_DEP_TIME'] == \
                                  airlines_wo_cancel_div['DEP_TIME']),0) \
                           .when((airlines_wo_cancel_div['DEP_DEL15'].isNull()) & \
                                 (airlines_wo_cancel_div['CRS_DEP_TIME'] != \
                                  airlines_wo_cancel_div['DEP_TIME']),None) \
                           .otherwise(airlines_wo_cancel_div['DEP_DEL15']))

#### STEP 3: Filter out records where the target variable is still null after imputation

In [0]:
# Keep records with NOT Null Target Variables
airlines_target_not_null = airlines_target_null_fixed.where(col('DEP_DEL15').isNotNull())

#### STEP 4: Identify a unique city name to look up the time zone for UTC conversion

Some origin and destination airports list multiple city names separated by '/'. We keep only the first city name to facilitate time zone identification.
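
A plain-Python sketch of the two splitting steps applied to a single name. `unique_city` is a hypothetical helper that mirrors the Spark logic (split on `,`, then keep the first `/`-separated city), and the sample names are illustrative:

```python
from typing import Optional


def unique_city(city_name: str) -> Optional[str]:
    """Keep only the first of several '/'-separated cities in a 'City, ST' name."""
    parts = city_name.split(',')
    if len(parts) < 2:
        # In Spark, getItem(1) is null here and concat() with a null input returns null
        return None
    first_city = parts[0].split('/')[0]
    # parts[1] keeps its leading space (' TX'), just like the Spark version
    return first_city + ',' + parts[1]


print(unique_city('Dallas/Fort Worth, TX'))  # Dallas, TX
print(unique_city('Chicago, IL'))            # Chicago, IL
```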

In [0]:
# Split cities to find unique in next step. Split by ','
airlines_split_cities = airlines_target_not_null.withColumn('DEST_CITIES', \
                                f.split(airlines_target_not_null['DEST_CITY_NAME'],',').getItem(0)) \
                                  .withColumn('DEST_STATE', \
                                f.split(airlines_target_not_null['DEST_CITY_NAME'],',').getItem(1)) \
                                  .withColumn('ORIGIN_CITIES', \
                                f.split(airlines_target_not_null['ORIGIN_CITY_NAME'],',').getItem(0)) \
                                  .withColumn('ORIGIN_STATE', \
                                f.split(airlines_target_not_null['ORIGIN_CITY_NAME'],',').getItem(1))

In [0]:
# Clean Cities to identify unique city, drop temporary columns. Split multiple cities by '/'
airlines_unique_cities = airlines_split_cities.withColumn('UNQ_DEST_CITY', \
                                f.split(airlines_split_cities['DEST_CITIES'],'/').getItem(0)) \
                                  .withColumn('UNQ_ORIGIN_CITY', \
                                f.split(airlines_split_cities['ORIGIN_CITIES'],'/').getItem(0)) \
                                  .withColumn('UNQ_DEST', \
                                f.concat(f.col('UNQ_DEST_CITY'), f.lit(','), f.col('DEST_STATE'))) \
                                  .withColumn('UNQ_ORIGIN', \
                                f.concat(f.col('UNQ_ORIGIN_CITY'), f.lit(','), f.col('ORIGIN_STATE'))) \
                                  .drop('DEST_CITIES','DEST_STATE','UNQ_DEST_CITY', \
                                        'ORIGIN_CITIES','ORIGIN_STATE','UNQ_ORIGIN_CITY')

#### STEP 5: Assign a unique ID to each flight record

In [0]:
airlines_flightID = airlines_unique_cities.withColumn("flightID", f.monotonically_increasing_id())

#### STEP 6: Keep selected columns

In [0]:
airline_cols = ['flightID','YEAR','QUARTER','MONTH','DAY_OF_WEEK','FL_DATE','OP_UNIQUE_CARRIER',
                'TAIL_NUM','ORIGIN_AIRPORT_ID','ORIGIN','ORIGIN_CITY_NAME','UNQ_ORIGIN',
                'DEST_AIRPORT_ID','DEST','DEST_CITY_NAME','UNQ_DEST','CRS_DEP_TIME', 'DEP_TIME',
                'DEP_DELAY','DEP_DEL15','TAXI_OUT','CRS_ARR_TIME','ARR_TIME','ARR_DELAY','ARR_DEL15',
                'CANCELLED','DIVERTED','DISTANCE', 'DISTANCE_GROUP','CARRIER_DELAY', 'WEATHER_DELAY',
                'NAS_DELAY','SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY']

In [0]:
airlines = airlines_flightID.select(*airline_cols)

#### STEP 7: Use external sources to look up airport details and time zones by IATA airport code

In [0]:
airport_codes = pd.read_csv('https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat',header=None)

In [0]:
airport_codes = airport_codes.rename(columns={0:'AirportID',1:'Name',2:'City',3:'Country',4:'IATA',5:'ICAO',6:'Latitude',7:'Longitude',8:'Altitude',9:'Timezone',10:'DST',11:'db_timezone',12:'Type',13:'source'})
airport_codes = spark.createDataFrame(airport_codes)

In [0]:
airlines_w_codes = airlines.join(airport_codes, airlines.ORIGIN == airport_codes.IATA)

In [0]:
# Get the time zone of each airport (IATA code -> IANA time zone name)
airport_tz = pd.read_csv('https://raw.githubusercontent.com/hroptatyr/dateutils/tzmaps/iata.tzmap',sep='\t',header=None)\
                 .rename(columns={0:'IATA',1:'timezone_airport'})

In [0]:
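airport_tz_df = spark.createDataFrame(airport_tz)
# Manually impute the time zone for XWA; named columns keep the union aligned with airport_tz_df
impute_tz_df = spark.createDataFrame([('XWA', 'America/Chicago')], ['IATA', 'timezone_airport'])
airport_tz_df_full = airport_tz_df.union(impute_tz_df)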

In [0]:
# Join timezone to airport for dest and origin
airlines_tz_origin = airlines_w_codes.join(airport_tz_df_full, airlines_w_codes.ORIGIN == \
                                           airport_tz_df_full.IATA)\
                                    .select('flightID',f.col('timezone_airport').alias('origin_tz'))

airlines_tz_dest = airlines_w_codes.join(airport_tz_df_full, airlines_w_codes.DEST == \
                                         airport_tz_df_full.IATA)\
                                  .withColumnRenamed('timezone_airport', 'dest_tz')

airlines_tz_both = airlines_tz_origin.join(airlines_tz_dest, on='flightID', how='inner')

In [0]:
#rename columns
airlines_w_codes = airlines_tz_both.withColumnRenamed("Latitude","Latitude_origin")\
                               .withColumnRenamed("Longitude","Longitude_origin")

In [0]:
#make copy of airport codes to be used for destinations
airport_codes_renamed = airport_codes.withColumnRenamed("Latitude","Latitude_dest")\
                                       .withColumnRenamed("Longitude","Longitude_dest")\
                                       .withColumnRenamed('Altitude','Altitude_dest')\
                                       .withColumnRenamed('IATA','IATA_dest')\
                                       .select('IATA_dest','Latitude_dest','Longitude_dest','Altitude_dest')

In [0]:
airlines_w_codes = airlines_w_codes.join(airport_codes_renamed, airlines_w_codes.DEST == airport_codes_renamed.IATA_dest)

#### STEP 8: Create UTC timestamp related columns for use in Feature Engineering
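
As a sanity check on the conversion logic, here is a plain-Python sketch for a single flight. For simplicity it assumes a fixed UTC offset (a hypothetical UTC-6 airport), whereas the notebook passes IANA zone names such as `America/Chicago` to `f.to_utc_timestamp`, which also accounts for daylight saving time. `to_utc` is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone


def to_utc(fl_date: str, hhmm: int, utc_offset_hours: float) -> datetime:
    """Parse FL_DATE plus an HHMM time (zero-padded to 4 digits) as local time
    at a fixed UTC offset and convert it to UTC."""
    local_tz = timezone(timedelta(hours=utc_offset_hours))
    local = datetime.strptime(fl_date + str(hhmm).zfill(4), '%Y-%m-%d%H%M')
    return local.replace(tzinfo=local_tz).astimezone(timezone.utc)


sch_dep_utc = to_utc('2015-01-01', 905, -6)  # 09:05 local at a UTC-6 airport
print(sch_dep_utc)                           # 2015-01-01 15:05:00+00:00
print(sch_dep_utc - timedelta(hours=2))      # 2015-01-01 13:05:00+00:00 (T-2 hours)
print(sch_dep_utc.hour)                      # 15
```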

In [0]:
# Helper: parse FL_DATE (yyyy-MM-dd) plus a zero-padded HHMM time column as a local timestamp
def local_ts(time_col):
  return f.to_timestamp(f.concat(f.col('FL_DATE'), f.lpad(f.col(time_col), 4, '0')),
                        format='yyyy-MM-ddHHmm')

# Convert key timestamps to UTC: departures use the origin time zone, arrivals the destination one
airlines_tz_both = airlines_w_codes \
  .withColumn('sch_departure_UTC', f.to_utc_timestamp(local_ts('CRS_DEP_TIME'), f.col('origin_tz'))) \
  .withColumn('sch_dep_hour_UTC', f.hour('sch_departure_UTC')) \
  .withColumn('act_departure_UTC', f.to_utc_timestamp(local_ts('DEP_TIME'), f.col('origin_tz'))) \
  .withColumn('sch_arrival_UTC', f.to_utc_timestamp(local_ts('CRS_ARR_TIME'), f.col('dest_tz'))) \
  .withColumn('act_arrival_UTC', f.to_utc_timestamp(local_ts('ARR_TIME'), f.col('dest_tz'))) \
  .withColumn('sch_dep_minus_2_UTC', f.to_utc_timestamp(
      (f.unix_timestamp(local_ts('CRS_DEP_TIME')) - 2*3600).cast('timestamp'), f.col('origin_tz'))) \
  .withColumn('sch_dep_minus_3_UTC', f.to_utc_timestamp(
      (f.unix_timestamp(local_ts('CRS_DEP_TIME')) - 3*3600).cast('timestamp'), f.col('origin_tz')))

In [0]:
airlines_tz_both = airlines_tz_both.withColumn('sch_dep_minus_2HOUR_UTC',\
                          f.hour(airlines_tz_both.sch_dep_minus_2_UTC))\
                                   .withColumn('sch_dep_minus_3HOUR_UTC',\
                          f.hour(airlines_tz_both.sch_dep_minus_3_UTC))

In [0]:
# Create temp view for feature engineering
airlines_tz_both.createOrReplaceTempView("airlines_tz_both")

#### STEP 9: Feature Engineering on Airlines Data
In this section we take the basic airlines data above and engineer a few additional columns we expect to be useful. We focus on two categories of delays: 'cascade delays' and 'root cause delays'. The `delays_by_airport` and `delays_by_airport_hour_carrier` (carrier delays, also grouped by airport) features attempt to capture root cause delays. The `CASCADE_DELAY` feature uses a window partitioned by `tail_num` to capture recent delays of the same physical airplane.

##### STEP 9.a.: ROOT CAUSE DELAYS

##### Aggregate various delay types by Airport

In [0]:
delays_by_airport_total = airlines_tz_both.select('origin', 'flightID','dep_delay','dep_del15','taxi_out',\
                                            'weather_delay','nas_delay','security_delay','late_aircraft_delay')\
                                     .groupby('origin')\
                                     .agg(f.coalesce(f.count('flightID'),f.lit(0)).alias('num_flights_airport'),
                                          f.coalesce(f.round(f.avg('dep_delay'),2),f.lit(0)).alias('avg_airport_dep_delay'),
                                          f.coalesce(f.round(f.avg('dep_del15'),2),f.lit(0)).alias('pct_airport_dep_del15'),
                                          f.coalesce(f.round(f.avg('taxi_out'),2),f.lit(0)).alias('avg_airport_taxi_time'),
                                          f.coalesce(f.round(f.avg('weather_delay'),2),f.lit(0)).alias('avg_airport_weather_delay'),
                                          f.coalesce(f.round(f.avg('nas_delay'),2),f.lit(0)).alias('avg_airport_nas_delay'),
                                          f.coalesce(f.round(f.avg('security_delay'),2), f.lit(0)).alias('avg_airport_security_delay'),
                                          f.coalesce(f.round(f.avg('late_aircraft_delay'),2),f.lit(0)).alias('avg_airport_late_aircraft_delay'))

In [0]:
delays_by_airport_total.createOrReplaceTempView("delays_by_airport_total")

##### Hourly Average by Airport
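
The `delays_by_airport` view further below expresses each hourly statistic relative to the airport's overall value, substituting 0.1 for a zero denominator. For example, an hour averaging 12 minutes of departure delay at an airport averaging 8 minutes overall yields 1.5. A minimal plain-Python sketch of that ratio, with `normalized_ratio` as a hypothetical helper:

```python
def normalized_ratio(hourly_value: float, airport_value: float,
                     zero_substitute: float = 0.1) -> float:
    """Divide an hourly statistic by the airport-wide statistic, replacing a
    zero denominator with `zero_substitute` (0.1 in the SQL view)."""
    denominator = zero_substitute if airport_value == 0 else airport_value
    return hourly_value / denominator


# An hour averaging 12 minutes of delay at an airport averaging 8 minutes
print(normalized_ratio(12.0, 8.0))  # 1.5
```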

In [0]:
delays_by_airport_by_hour = airlines_tz_both \
                              .select('origin', 'flightID','sch_dep_hour_UTC','dep_delay','dep_del15','taxi_out',\
                                     'weather_delay','nas_delay','security_delay','late_aircraft_delay')\
                              .groupby('origin','sch_dep_hour_UTC')\
                              .agg(f.coalesce(f.count('flightID'), f.lit(0)).alias('num_flights_airport_hour'),
                                   f.coalesce(f.round(f.avg('dep_delay'),2), f.lit(0)).alias('avg_airport_hour_dep_delay'),
                                   f.coalesce(f.round(f.avg('dep_del15'),2), f.lit(0)).alias('pct_airport_hour_dep_del15'),
                                   f.coalesce(f.round(f.avg('taxi_out'),2), f.lit(0)).alias('avg_airport_hour_taxi_time'),
                                   f.coalesce(f.round(f.avg('weather_delay'),2), f.lit(0)).alias('avg_airport_hour_weather_delay'),
                                   f.coalesce(f.round(f.avg('nas_delay'),2),f.lit(0)).alias('avg_airport_hour_nas_delay'),
                                   f.coalesce(f.round(f.avg('security_delay'),2), f.lit(0)).alias('avg_airport_hour_security_delay'),
                                   f.coalesce(f.round(f.avg('late_aircraft_delay'),2), f.lit(0)).alias('avg_airport_hour_late_aircraft_delay'))

In [0]:
delays_by_airport_by_hour.createOrReplaceTempView("delays_by_airport_by_hour")

In [0]:
# Express each airport's hourly statistics relative to its overall averages (0.1 replaces a zero overall value)
sqlContext.sql("""
DROP VIEW IF EXISTS delays_by_airport
""")
sqlContext.sql("""
CREATE TEMPORARY VIEW delays_by_airport
AS
SELECT
  a.origin,
  a.sch_dep_hour_UTC AS hour,
  a.num_flights_airport_hour / at.num_flights_airport AS num_flights,
  a.avg_airport_hour_dep_delay / IF(at.avg_airport_dep_delay == 0, 0.1, at.avg_airport_dep_delay) AS avg_dep_delay,
  a.pct_airport_hour_dep_del15 / IF(at.pct_airport_dep_del15 == 0, 0.1, at.pct_airport_dep_del15) AS pct_dep_del15,
  a.avg_airport_hour_taxi_time / IF(at.avg_airport_taxi_time == 0, 0.1, at.avg_airport_taxi_time) AS avg_taxi_time,
  a.avg_airport_hour_weather_delay / IF(at.avg_airport_weather_delay == 0, 0.1, at.avg_airport_weather_delay) AS avg_weather_delay,
  a.avg_airport_hour_nas_delay / IF(at.avg_airport_nas_delay == 0, 0.1, at.avg_airport_nas_delay) AS avg_nas_delay,
  a.avg_airport_hour_security_delay / IF(at.avg_airport_security_delay == 0, 0.1, at.avg_airport_security_delay) AS avg_security_delay,
  a.avg_airport_hour_late_aircraft_delay / IF(at.avg_airport_late_aircraft_delay == 0, 0.1, at.avg_airport_late_aircraft_delay) AS avg_late_aircraft_delay
FROM delays_by_airport_by_hour AS a
INNER JOIN delays_by_airport_total AS at ON
  a.origin = at.origin
""")

In [0]:
delays_by_airport = sqlContext.sql("""
SELECT * FROM delays_by_airport
""")

##### Hourly Average by Airline Carrier

Aggregate delays by airport, carrier, and hour

In [0]:
delays_by_airport_hour_carrier = airlines_tz_both \
                                     .select('flightID','origin', 'op_unique_carrier',\
                                             'sch_dep_hour_UTC','dep_delay','carrier_delay')\
                                     .groupby('origin','sch_dep_hour_UTC', 'op_unique_carrier')\
                                     .agg(f.count('flightID').alias('num_flights'),\
                                          f.coalesce(f.round(f.avg('dep_delay'),2),f.lit(0)).alias('avg_dep_delay'),\
                                          f.coalesce(f.round(f.avg('carrier_delay'),2),f.lit(0)).alias('avg_carrier_delay'))

In [0]:
delays_by_airport_hour_carrier.createOrReplaceTempView('delays_by_airport_hour_carrier')

##### STEP 9.b.: CASCADE DELAY

Cascade delay indicates whether one of the two most recent flights operated by the same aircraft within the last 7 hours was delayed, which may in turn delay the current flight. Since we need to predict two hours in advance of the upcoming flight, we must make our observations at T-2 hours, where T is the scheduled departure time.
We define this categorical feature using the following new features:
1. `dep_diff_1_before`: time, in seconds, between the current scheduled departure and the scheduled departure of the most recent previous flight
2. `dep_diff_2_before`: time, in seconds, between the current scheduled departure and the scheduled departure of the second most recent flight
3. `delayed_1_before`: indicator of whether the most recent previous flight was delayed (1) or not (0)
4. `delayed_2_before`: indicator of whether the second most recent flight was delayed (1) or not (0)
5. `CASCADE_DELAY`: indicator of whether one of the plane's two most recent flights was delayed, signaling a possible delay of the current flight
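
The look-back features can be sketched in plain Python for the flights of a single plane, mimicking `lag()` over a window partitioned by `tail_num` and ordered by scheduled departure. The sketch also computes the `arr_time_*` gaps (current scheduled departure minus a previous flight's scheduled arrival) used in STEP 11. `lag_features` and the sample flights are hypothetical, with times as epoch seconds:

```python
from typing import Dict, List, Optional, Tuple

# One flight of a single plane: (scheduled departure, scheduled arrival, dep_del15),
# with times as UTC epoch seconds
Flight = Tuple[int, int, int]


def lag_features(flights: List[Flight]) -> List[Dict[str, Optional[int]]]:
    """Compute the look-back features for every flight of one plane."""
    flights = sorted(flights)  # order by scheduled departure
    rows = []
    for i, (dep, _arr, _delayed) in enumerate(flights):
        prev1 = flights[i - 1] if i >= 1 else None  # lag 1
        prev2 = flights[i - 2] if i >= 2 else None  # lag 2
        rows.append({
            'dep_diff_1_before': dep - prev1[0] if prev1 else None,
            'dep_diff_2_before': dep - prev2[0] if prev2 else None,
            'delayed_1_before': prev1[2] if prev1 else None,
            'delayed_2_before': prev2[2] if prev2 else None,
            'arr_time_1_before': dep - prev1[1] if prev1 else None,
            'arr_time_2_before': dep - prev2[1] if prev2 else None,
        })
    return rows


H = 3600
for row in lag_features([(0, 2 * H, 1), (4 * H, 6 * H, 0), (8 * H, 10 * H, 0)]):
    print(row)
```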

In [0]:
airlines_tz_both.createOrReplaceTempView('airlines_temp')

In [0]:
plane_tracking = airlines_tz_both.orderBy("tail_num","fl_date", "sch_departure_UTC")

#### STEP 10: Define a window partitioned by the specific airplane (`tail_num`) and ordered by scheduled departure time

In [0]:
win = Window.partitionBy('tail_num').orderBy('sch_departure_UTC')

In [0]:
tail_tracking_diff = plane_tracking.withColumn('dep_diff_1_before', \
                                               col('sch_departure_UTC').cast('long')-lag('sch_departure_UTC', 1) \
                                               .over(win).cast('long')) \
                                    .withColumn('delayed_1_before', lag('dep_del15',1).over(win)) \
                                    .withColumn('dep_diff_2_before', \
                                               col('sch_departure_UTC').cast('long')-lag('sch_departure_UTC', 2) \
                                               .over(win).cast('long')) \
                                    .withColumn('delayed_2_before', lag('dep_del15',2).over(win)) \
                                    .withColumn('arr_time_1_before', \
                                               col('sch_departure_UTC').cast('long')-lag('sch_arrival_UTC', 1) \
                                               .over(win).cast('long')) \
                                    .withColumn('arr_time_2_before', \
                                               col('sch_departure_UTC').cast('long')-lag('sch_arrival_UTC', 2) \
                                               .over(win).cast('long'))

#### STEP 11: Define CASCADE DELAY:
Depending on the time gap between the current flight and the plane's most recent previous flight, we assign `CASCADE_DELAY` as follows:
1. If the most recent flight was scheduled to depart at least 2 hours before the current flight (so its delay status is known at T-2) and was scheduled to arrive within the last 5 hours, assign 1 if it was delayed and 0 if not.
2. If the most recent flight falls inside the 2-hour window, observe the plane's second most recent flight instead: if it was scheduled to arrive within the last 7 hours, assign 1 if it was delayed and 0 if not.
3. In all other cases, including a plane's first recorded flight, assign 0.

We ignore arrivals more than 5 hours earlier for the most recent flight, or more than 7 hours earlier for the second most recent flight, because flights from yesterday or earlier should not cause a delay of the current flight: the plane has had sufficient time to recover from any previous delay.
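
The rule can be written as a plain-Python function over one flight's look-back features. This is a sketch, not the notebook's code: `cascade_delay` is a hypothetical helper, it assumes the 2-hour window is exactly 7,200 seconds, and `None` stands in for Spark nulls (a null comparison falls through to the final 0):

```python
from typing import Optional

TWO_HOURS, FIVE_HOURS, SEVEN_HOURS = 2 * 3600, 5 * 3600, 7 * 3600


def cascade_delay(dep_diff_1_before: Optional[int],
                  arr_time_1_before: Optional[int],
                  arr_time_2_before: Optional[int],
                  delayed_1_before: Optional[int],
                  delayed_2_before: Optional[int]) -> Optional[int]:
    if dep_diff_1_before is None:
        # First recorded flight of this plane: no history
        return 0
    if dep_diff_1_before >= TWO_HOURS:
        # Most recent flight is observable at T-2: use it if it arrived within 5 hours
        if arr_time_1_before is not None and arr_time_1_before <= FIVE_HOURS:
            return delayed_1_before
        return 0
    # Most recent flight is inside the 2-hour window: fall back to the one before it
    if arr_time_2_before is not None and arr_time_2_before <= SEVEN_HOURS:
        return delayed_2_before
    return 0


H = 3600
print(cascade_delay(3 * H, 2 * H, 6 * H, 1, 0))  # 1 (recent flight delayed)
print(cascade_delay(1 * H, 0, 8 * H, 0, 1))      # 0 (2nd flight arrived too long ago)
```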

In [0]:
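# Thresholds in seconds: 2-hour prediction window, 5- and 7-hour look-back limits
TWO_HOURS, FIVE_HOURS, SEVEN_HOURS = 2*3600, 5*3600, 7*3600

tail_tracking_diff_NEW = tail_tracking_diff.withColumn(
    'CASCADE_DELAY',
    # most recent flight departed >= 2 hours earlier and arrived within the last 5 hours
    f.when((f.col('dep_diff_1_before') >= TWO_HOURS) & (f.col('arr_time_1_before') <= FIVE_HOURS),
           f.col('delayed_1_before'))
    # most recent flight departed >= 2 hours earlier but arrived more than 5 hours ago
     .when((f.col('dep_diff_1_before') >= TWO_HOURS) & (f.col('arr_time_1_before') > FIVE_HOURS), 0)
    # most recent flight is inside the 2-hour window: use the 2nd most recent flight
     .when((f.col('dep_diff_1_before') < TWO_HOURS) & (f.col('arr_time_2_before') <= SEVEN_HOURS),
           f.col('delayed_2_before'))
    # no usable recent history (e.g. first flight of the plane, or flights too long ago)
     .otherwise(0)) \
  .withColumn('ID', monotonically_increasing_id())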

In [0]:
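# Deduplicate on flightID: 'ID' is unique per row, so dropping duplicates on it would remove nothing
tail_tracking_diff_NEW_no_dups = tail_tracking_diff_NEW.dropDuplicates(['flightID'])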

#### STEP 12: Select final set of features and columns for Joins

In [0]:
airlines_FE = tail_tracking_diff_NEW_no_dups.select('year', 'quarter', 'month', 'day_of_week', 'fl_date', 'op_unique_carrier', 'tail_num', 'origin_airport_id', 'origin', 'origin_city_name', 'dest_airport_id', 'dest', 'dest_city_name', 'crs_dep_time', 'dep_time', 'dep_delay', 'dep_del15', 'cancelled', 'diverted', 'distance', 'distance_group', 'unq_dest', 'unq_origin', 'carrier_delay', 'weather_delay', 'nas_delay', 'security_delay', 'late_aircraft_delay', 'taxi_out', 'dest_tz', 'origin_tz', 'sch_dep_hour_UTC', 'sch_dep_minus_2_UTC', 'sch_dep_minus_2HOUR_UTC', 'sch_dep_minus_3_UTC', 'sch_dep_minus_3HOUR_UTC', 'sch_departure_UTC', 'flightID', 'dep_diff_1_before', 'dep_diff_2_before', 'delayed_1_before', 'delayed_2_before', 'CASCADE_DELAY')

## Read and process Weather Data

In [0]:
weather_raw = spark.read.option("header", "true").parquet('dbfs:/mnt/mids-w261/datasets_final_project/weather_data/weather20*.parquet')

#### STEP 13: Filter weather data to exclude Non-US data since project scope is limited to US only

Original raw weather data includes 626,994,336 records and locations all over the world.

In [0]:
weather_raw_US = weather_raw.filter(f.col('NAME').endswith('US'))

#### STEP 14: Exclude duplicate records for the multiple report types
Weather data includes 9 different report types. We exclude the report types `SOM`, `SOD`, and `CRN05` (filtered below) because we identified duplicate records for these report types. We excluded these report types rather than the others because they represent a minority share of the overall weather data.

In [0]:
weather_US_report_type = weather_raw_US.filter((f.col('REPORT_TYPE') != 'SOM  ') & 
                                         (f.col('REPORT_TYPE') != 'SOD  ') & 
                                         (f.col('REPORT_TYPE') != 'CRN05'))

#### STEP 15: Select relevant columns to exclude ones with 50% or more null records

As we observed during EDA, most of the weather data columns have no values or are null. In fact, most of the columns have 75% or more null records.

Assuming that a column with 50% or more nulls would not represent the feature well and would have to be largely synthesized through imputation, we chose to exclude such columns.
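
The 50% rule can be expressed as a small helper. `columns_below_null_threshold` and the sample records are hypothetical; on the full dataset the null shares would be computed with Spark aggregations, and the notebook applies the outcome as the hard-coded column list below:

```python
from typing import Dict, List, Optional


def columns_below_null_threshold(rows: List[Dict[str, Optional[str]]],
                                 threshold: float = 0.5) -> List[str]:
    """Return the columns whose share of null (None) values is below `threshold`."""
    if not rows:
        return []
    keep = []
    for column in rows[0].keys():
        null_share = sum(row.get(column) is None for row in rows) / len(rows)
        if null_share < threshold:
            keep.append(column)
    return keep


# Made-up records: WND is null in 1 of 3 rows, AA1 in 2 of 3 rows
sample = [
    {'WND': '318,1,N,0061,1', 'AA1': None},
    {'WND': '320,1,N,0057,1', 'AA1': None},
    {'WND': None, 'AA1': '01,0000,9,5'},
]
print(columns_below_null_threshold(sample))  # ['WND']
```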

In [0]:
weather_cols = ['STATION','DATE','REPORT_TYPE','CALL_SIGN','LATITUDE', 
                                      'LONGITUDE','ELEVATION','WND','CIG','VIS','TMP',
                                      'DEW', 'SLP']

weather_selective_columns = weather_US_report_type.select(*weather_cols)

#### STEP 16: Define a function to extract airport code for weather station using `CALL_SIGN`

In [0]:
def create_airport_code_stations(call_code):
  """
  Create an airport code from the call sign in the weather data.
  Call signs that start with 'K' correspond to weather stations at airports; characters 2-4
  of the call sign match the airport code (e.g. 'KSFO' -> 'SFO').
  If the call sign is missing, empty, or does not start with 'K', the airport code is blank.
  """
  try:
    if call_code[0] == 'K':
      airport_code = call_code[1:4]
    else:
      airport_code = ''
  except (TypeError, IndexError):
    # None call sign (TypeError) or empty string (IndexError)
    airport_code = ''
  return airport_code

In [0]:
# convert function to udf
create_airport_code_stations_udf = udf(create_airport_code_stations, types.StringType())
# add airport code to weather dataset
weather_airport_added = weather_selective_columns.withColumn("airport_code", create_airport_code_stations_udf('CALL_SIGN'))
# filter weather to records with valid airport code
weather_airport_added = weather_airport_added.where(col("airport_code") != '')

#### STEP 17: Define functions to parse key weather columns to extract important features

We noticed that most of the weather attributes contain more than one sub-feature, separated by ',' and concatenated into one column. We process each relevant column below using a dedicated parsing function.

#### Step 17.1. Wind (WND) Parsing: WIND-OBSERVATION 
    a. Split column on ','
    b. Wind_Direction_Angle: 
        i. Cast column as integer type
        ii. Replace '999' (i.e. Missing) with 'None'
    c. Wind_Direction_Quality:
        i. Cast column as integer type
    d. Wind_Type_Code:
        i. KEEP AS original data type, i.e. String
        ii. Replace '9' (i.e. Missing) with 'None'
    e. Wind_Speed_Rate:
        i. Cast column as integer type
        ii. Replace '9999' (i.e. Missing) with 'None'
    f. Wind_Speed_Quality:
        i. Cast column as integer type
    g. Drop rows where wind direction quality or wind speed quality is erroneous (code = 7)
    h. Drop original column after features extraction
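
The steps above can be traced in plain Python on a single raw value. This is an illustrative sketch: `to_int` and `parse_wnd` are hypothetical helpers, `'318,1,N,0061,1'` is an illustrative value, and `to_int` assumes Spark's default (non-ANSI) cast behavior, where non-numeric text becomes null. A null quality code also fails the Spark `!= 7` filter, so such rows are dropped:

```python
from typing import Dict, Optional


def to_int(text: str) -> Optional[int]:
    """Mimic Spark's default cast('integer'): non-numeric text becomes None."""
    try:
        return int(text)
    except ValueError:
        return None


def parse_wnd(value: str) -> Optional[Dict[str, object]]:
    """Parse one raw WND value; None means the row would be dropped."""
    angle, angle_q, type_code, speed, speed_q = value.split(',')
    dir_quality, speed_quality = to_int(angle_q), to_int(speed_q)
    # Erroneous (7) or non-numeric quality codes fail the filter
    if dir_quality in (None, 7) or speed_quality in (None, 7):
        return None
    dir_angle, speed_rate = to_int(angle), to_int(speed)
    return {
        'WND_dir_angle': None if dir_angle == 999 else dir_angle,
        'WND_dir_quality': dir_quality,
        'WND_type_code': None if type_code == '9' else type_code,  # stays a string
        'WND_speed_rate': None if speed_rate == 9999 else speed_rate,
        'WND_speed_quality': speed_quality,
    }


print(parse_wnd('318,1,N,0061,1'))
```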

In [0]:
def parse_wind_column(df, col_name = "WND", splitter = ',', missing = ''):
  """Function to parse wind column to extract features and assign proper data type.
  Missing-value codes are replaced with null via f.when, which keeps the integer type
  (udf() without a return type would turn the values into strings)."""

  # Split the column using the specified splitter
  splitted = f.split(df[col_name], splitter)

  # 1st item: Wind Direction Angle (999 = missing)
  dir_angle = splitted.getItem(0).cast('integer')
  df = df.withColumn(col_name+'_dir_angle', f.when(dir_angle != 999, dir_angle))

  # 2nd item: Wind Direction Quality
  df = df.withColumn(col_name+'_dir_quality', splitted.getItem(1).cast('integer'))

  # 3rd item: Wind Type Code, kept as a string, so the missing code is the string '9'
  type_code = splitted.getItem(2)
  df = df.withColumn(col_name+'_type_code', f.when(type_code != '9', type_code))

  # 4th item: Wind Speed Rate (9999 = missing)
  speed_rate = splitted.getItem(3).cast('integer')
  df = df.withColumn(col_name+'_speed_rate', f.when(speed_rate != 9999, speed_rate))

  # 5th item: Wind Speed Quality
  df = df.withColumn(col_name+'_speed_quality', splitted.getItem(4).cast('integer'))

  # Drop rows with erroneous wind quality codes (7)
  df = df.filter((df[col_name+'_dir_quality'] != 7) & (df[col_name+'_speed_quality'] != 7))

  # Drop the original column after feature extraction
  return df.drop(col_name)

#### Step 17.2. Ceiling (CIG) Parsing: SKY-CONDITION-OBSERVATION ceiling height dimension (the height above ground level (AGL) of the lowest cloud)
    a. Split column on ','
    b. CIG_ceiling_height:
        i. Cast column as integer type
        ii. Replace '99999' (i.e. Missing) with 'None'
    c. CIG_ceiling_quality:
        i. Cast column as integer type
    d. CIG_ceiling_deter:
        i. Replace '9' (i.e. Missing) with 'None'
    e. CIG_ceiling_visibility:
        i. Replace '9' (i.e. Missing) with 'None'
    f. Drop rows where the ceiling height quality is erroneous (code 3 or 7)
    g. Drop original column after features extraction
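
A plain-Python trace of these steps for one raw value, under the same assumptions as a sketch: `to_int` and `parse_cig` are hypothetical helpers, `'22000,1,9,N'` is an illustrative value, and `to_int` mimics Spark's default (non-ANSI) cast, where non-numeric text becomes null and then fails the quality filter:

```python
from typing import Dict, Optional


def to_int(text: str) -> Optional[int]:
    """Mimic Spark's default cast('integer'): non-numeric text becomes None."""
    try:
        return int(text)
    except ValueError:
        return None


def parse_cig(value: str) -> Optional[Dict[str, object]]:
    """Parse one raw CIG value; None means the row would be dropped."""
    height, height_q, determination, ceil_vis = value.split(',')
    quality = to_int(height_q)
    # Erroneous (3 or 7) or non-numeric quality codes fail the filter
    if quality in (None, 3, 7):
        return None
    ceil_height = to_int(height)
    return {
        'CIG_ceil_height': None if ceil_height == 99999 else ceil_height,
        'CIG_ceil_ht_quality': quality,
        'CIG_ceil_det': None if determination == '9' else determination,
        'CIG_ceil_vis': None if ceil_vis == '9' else ceil_vis,
    }


print(parse_cig('22000,1,9,N'))
```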

In [0]:
def parse_cig_column(df, col_name = "CIG", splitter = ",", missing = ""):
  """Function to parse CIG column to extract features and assign proper data type"""

  # Split the column using the specified splitter
  splitted = f.split(df[col_name], splitter)

  # 1st item: Ceiling Height (99999 = missing); f.when keeps the integer type
  ceil_height = splitted.getItem(0).cast('integer')
  df = df.withColumn(col_name+'_ceil_height', f.when(ceil_height != 99999, ceil_height))

  # 2nd item: Ceiling Height Quality
  df = df.withColumn(col_name+'_ceil_ht_quality', splitted.getItem(1).cast('integer'))

  # 3rd item: Ceiling Determination ('9' = missing)
  ceil_det = splitted.getItem(2)
  df = df.withColumn(col_name+'_ceil_det', f.when(ceil_det != '9', ceil_det))

  # 4th item: Ceiling Visibility ('9' = missing)
  ceil_vis = splitted.getItem(3)
  df = df.withColumn(col_name+'_ceil_vis', f.when(ceil_vis != '9', ceil_vis))

  # Drop rows with erroneous ceiling height quality codes (3 or 7)
  df = df.filter((df[col_name+'_ceil_ht_quality'] != 3) & (df[col_name+'_ceil_ht_quality'] != 7))

  # Drop the original column after feature extraction
  return df.drop(col_name)

#### Step 17.3. Visibility (VIS) Parsing:
    a. Split column on ','
    b. VIS_distance:
        i. Cast column as integer type
        ii. Replace '999999' (i.e. Missing) with 'None'
    c. VIS_distance_quality:
        i. Cast column as integer type
    d. VIS_variability:
        i. Replace '9' (i.e. Missing) with 'None'
    e. VIS_variability_quality:
        i. Cast column as integer type
    f. Drop rows where the visibility distance quality or visibility variability quality is erroneous (code 3 or 7)
    g. Drop original column after features extraction
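
The visibility steps traced in plain Python for one raw value. As before, this is a sketch: `to_int` and `parse_vis` are hypothetical helpers, `'016000,1,9,9'` is an illustrative value, and `to_int` mimics Spark's default (non-ANSI) cast:

```python
from typing import Dict, Optional


def to_int(text: str) -> Optional[int]:
    """Mimic Spark's default cast('integer'): non-numeric text becomes None."""
    try:
        return int(text)
    except ValueError:
        return None


def parse_vis(value: str) -> Optional[Dict[str, object]]:
    """Parse one raw VIS value; None means the row would be dropped."""
    dist, dist_q, variability, var_q = value.split(',')
    dist_quality, var_quality = to_int(dist_q), to_int(var_q)
    # Both quality codes must be numeric and not erroneous (3 or 7)
    if dist_quality in (None, 3, 7) or var_quality in (None, 3, 7):
        return None
    distance = to_int(dist)
    return {
        'VIS_dist': None if distance == 999999 else distance,
        'VIS_dist_quality': dist_quality,
        'VIS_var': None if variability == '9' else variability,
        'VIS_var_qual': var_quality,
    }


print(parse_vis('016000,1,9,9'))
```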

In [0]:
def parse_vis_column(df, col_name = "VIS", splitter = ",", missing = ""):
  """Function to parse Visibility column to extract features and assign proper data type"""

  # Split the column using the specified splitter
  splitted = f.split(df[col_name], splitter)

  # 1st item: Visibility Distance (999999 = missing); f.when keeps the integer type
  vis_dist = splitted.getItem(0).cast('integer')
  df = df.withColumn(col_name+'_dist', f.when(vis_dist != 999999, vis_dist))

  # 2nd item: Visibility Distance Quality
  df = df.withColumn(col_name+'_dist_quality', splitted.getItem(1).cast('integer'))

  # 3rd item: Visibility Variability ('9' = missing)
  vis_var = splitted.getItem(2)
  df = df.withColumn(col_name+'_var', f.when(vis_var != '9', vis_var))

  # 4th item: Visibility Variability Quality
  df = df.withColumn(col_name+'_var_qual', splitted.getItem(3).cast('integer'))

  # Drop rows with erroneous quality codes (3 or 7)
  df = df.filter((df[col_name+'_dist_quality'] != 3) & (df[col_name+'_dist_quality'] != 7))
  df = df.filter((df[col_name+'_var_qual'] != 3) & (df[col_name+'_var_qual'] != 7))

  # Drop the original column after feature extraction
  return df.drop(col_name)

#### Step 17.4. Temperature (TMP) Parsing:
    a. Split column on ','
    b. TMP_air_temp:
        i. Cast column as integer type
        ii. Replace '+9999' (i.e. Missing) with 'None'
    c. TMP_air_temp_qual:
        i. Keep as is (string type)
    d. Drop rows where the air temperature quality is erroneous (quality code 3 or 7)
    e. Drop the original column after feature extraction
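The TMP value is a signed string such as '+0056' or '-0012', and integer parsing handles the sign. The plain-Python sketch below shows that step together with an optional conversion to degrees Celsius, which this notebook does not perform. The conversion assumes the NOAA ISD convention that TMP is reported in tenths of a degree Celsius (scaling factor 10); check this against the ISD format document before relying on it. `parse_tmp_value` and `tenths_to_celsius` are hypothetical helpers.

```python
from typing import Optional, Tuple

MISSING_TEMP = 9999        # '+9999' after integer parsing
BAD_QUALITY = {"3", "7"}   # quality kept as a string, as in parse_temp_column


def parse_tmp_value(raw: str) -> Optional[Tuple[Optional[int], str]]:
    """Parse one raw TMP field such as '+0056,1'.

    Returns (temperature in raw units, quality code), or None for records
    whose quality code is erroneous.
    """
    value, quality = raw.split(",")
    if quality in BAD_QUALITY:
        return None
    temp = int(value)  # int() accepts a leading '+' or '-'
    return (None if temp == MISSING_TEMP else temp, quality)


def tenths_to_celsius(temp_tenths: Optional[int]) -> Optional[float]:
    """Optional unit conversion, assuming the ISD scaling factor of 10."""
    return None if temp_tenths is None else temp_tenths / 10


temp, quality = parse_tmp_value("-0012,1")
print(temp, quality, tenths_to_celsius(temp))  # -12 1 -1.2
```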

In [0]:
def parse_temp_column(df, col_name = "TMP", splitter = ","):
  """Parse the air temperature (TMP) column into typed features and drop erroneous records."""

  # Split the raw comma-separated field
  parts = f.split(df[col_name], splitter)

  # 1st item: air temperature; '+9999' (9999 after the integer cast) means missing
  temp = parts.getItem(0).cast('integer')
  df = df.withColumn(col_name+'_air_temp', f.when(temp == 9999, None).otherwise(temp))

  # 2nd item: air temperature quality code, kept as a string
  df = df.withColumn(col_name+'_air_temp_qual', parts.getItem(1))

  # Drop rows with erroneous quality codes ('3' and '7')
  df = df.filter(~df[col_name+'_air_temp_qual'].isin('3', '7'))

  return df.drop(col_name)

#### Step 17.5. Dew Point (DEW) Parsing:
    a. Split column on ','
    b. DEW_pt_temp:
        i. Cast column as integer type
        ii. Replace '9999' (i.e. Missing) with 'None'
    c. DEW_pt_temp_qual:
        i. Keep as is (string type)
    d. Drop rows where the dew point temperature quality is erroneous (quality code 3 or 7)
    e. Drop the original column after feature extraction

In [0]:
def parse_dew_column(df, col_name = "DEW", splitter = ","):
  """Parse the dew point (DEW) column into typed features and drop erroneous records."""

  # Split the raw comma-separated field
  parts = f.split(df[col_name], splitter)

  # 1st item: dew point temperature; '+9999' (9999 after the integer cast) means missing
  dew = parts.getItem(0).cast('integer')
  df = df.withColumn(col_name+'_pt_temp', f.when(dew == 9999, None).otherwise(dew))

  # 2nd item: dew point temperature quality code, kept as a string
  df = df.withColumn(col_name+'_pt_temp_qual', parts.getItem(1))

  # Drop rows with erroneous quality codes ('3' and '7')
  df = df.filter(~df[col_name+'_pt_temp_qual'].isin('3', '7'))

  return df.drop(col_name)

#### Step 17.6. Pressure (SLP: Sea Level Pressure) Parsing:
    a. Split column on ','
    b. SLP_pressure:
        i. Cast column as integer type
        ii. Replace '99999' (i.e. Missing) with 'None'
    c. SLP_pres_qual:
        i. Cast column as integer type
    d. Drop rows where the sea level pressure quality is erroneous (quality code 3 or 7)
    e. Drop the original column after feature extraction

In [0]:
def parse_slp_column(df, col_name = "SLP", splitter = ","):
  """Parse the sea level pressure (SLP) column into typed features and drop erroneous records."""

  # Split the raw comma-separated field
  parts = f.split(df[col_name], splitter)

  # 1st item: sea level pressure; 99999 means missing
  pressure = parts.getItem(0).cast('integer')
  df = df.withColumn(col_name+'_pressure', f.when(pressure == 99999, None).otherwise(pressure))

  # 2nd item: sea level pressure quality code
  df = df.withColumn(col_name+'_pres_qual', parts.getItem(1).cast('integer'))

  # Drop rows with erroneous quality codes (3 and 7)
  df = df.filter(~df[col_name+'_pres_qual'].isin(3, 7))

  return df.drop(col_name)

#### STEP 18: Apply the parsing functions to extract features from the weather data table

In [0]:
weather_parsed = parse_wind_column(weather_airport_added)
weather_parsed = parse_cig_column(weather_parsed)
weather_parsed = parse_vis_column(weather_parsed)
weather_parsed = parse_temp_column(weather_parsed)
weather_parsed = parse_dew_column(weather_parsed)
weather_parsed = parse_slp_column(weather_parsed)
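The cell above chains one parser per weather field, each taking and returning a table. On Spark 3.0 or later the same chain can also be written with `DataFrame.transform`, e.g. `weather_airport_added.transform(parse_wind_column).transform(parse_cig_column)`. The plain-Python sketch below shows the chaining idea on a single record using `functools.reduce`; `make_field_parser`, `apply_parsers`, and the sample values are hypothetical, and only the DEW and SLP sentinels from this notebook are modeled.

```python
from functools import reduce
from typing import Callable, Dict, Optional

Record = Dict[str, object]
Parser = Callable[[Optional[Record]], Optional[Record]]


def make_field_parser(raw_col: str, out_col: str, missing: int) -> Parser:
    """Build a parser for a 'value,quality' field such as DEW or SLP.

    The returned function removes raw_col, adds out_col (None for the
    missing sentinel) and rejects the record when the quality code is 3 or 7.
    """
    def parse(record: Optional[Record]) -> Optional[Record]:
        if record is None:  # an earlier parser already rejected this record
            return None
        value, quality = str(record.pop(raw_col)).split(",")
        if quality in {"3", "7"}:
            return None
        number = int(value)
        record[out_col] = None if number == missing else number
        return record
    return parse


# Hypothetical parser chain; sentinels follow parse_dew_column and parse_slp_column
PARSERS = [
    make_field_parser("DEW", "DEW_pt_temp", missing=9999),
    make_field_parser("SLP", "SLP_pressure", missing=99999),
]


def apply_parsers(record: Record) -> Optional[Record]:
    """Run every parser left to right on a copy of the record."""
    return reduce(lambda acc, parser: parser(acc), PARSERS, dict(record))


print(apply_parsers({"DEW": "-0028,1", "SLP": "10132,1"}))
print(apply_parsers({"DEW": "-0028,1", "SLP": "10132,7"}))  # None: SLP quality 7
```

As in the Spark version, a record rejected by any parser never reaches the later ones.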

#### STEP 19: Drop the quality columns after filtering erroneous records

The quality columns, along with CIG_ceil_det, are no longer needed once they have been used to filter out erroneous records, so we drop them.

In [0]:
weather_parsed = weather_parsed.drop('WND_dir_quality', 'WND_speed_quality', 'CIG_ceil_ht_quality', 'CIG_ceil_det', 'VIS_dist_quality', 'VIS_var_qual', 'TMP_air_temp_qual', 'DEW_pt_temp_qual', 'SLP_pres_qual')
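A missing comma in a `drop` list fails silently: Python concatenates adjacent string literals at compile time, and PySpark's `DataFrame.drop` is a no-op for column names that are not in the schema. The snippet below shows the concatenation in isolation; the tuples are only illustrations.

```python
# Without a comma, Python joins adjacent string literals into one string
cols = ('CIG_ceil_det' 'VIS_dist_quality', 'VIS_var_qual')

print(len(cols))  # 2
print(cols[0])    # CIG_ceil_detVIS_dist_quality

# With the comma, each name stays a separate element
fixed = ('CIG_ceil_det', 'VIS_dist_quality', 'VIS_var_qual')
print(len(fixed))  # 3
```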

#### STEP 20: Aggregate multiple records per weather station to one record per hour

The raw data contains many records per station per hour, because stations report multiple observations within each hour and multiple report types for the same timestamp. We therefore aggregate to one record per station per hour, truncating each timestamp to its hour (e.g., an observation at 10:53 falls in hour 10).

Numerical features are averaged (rounded to 2 decimals), and categorical features keep the first non-null value in each group.
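A plain-Python sketch of the aggregation rule for one numeric and one categorical feature. The `station` key stands in for the (LATITUDE, LONGITUDE) pair used in the Spark `groupBy`, and `aggregate_hourly` and the observations are hypothetical. One caveat: Spark's `first` depends on row order within each group, which is not guaranteed after a shuffle, so the Spark result for categorical features may not match this input-order version.

```python
from collections import defaultdict
from datetime import date, datetime
from statistics import mean


def aggregate_hourly(observations):
    """Collapse raw observations into one record per (station, date, hour).

    The numeric feature is averaged over non-null values (Spark's avg also
    ignores nulls) and rounded to 2 decimals; the categorical feature keeps
    the first non-null value in input order (like f.first(col, True)).
    """
    groups = defaultdict(list)
    for obs in observations:
        ts = obs["DATE"]
        groups[(obs["station"], ts.date(), ts.hour)].append(obs)  # truncate to the hour

    hourly = {}
    for key, rows in groups.items():
        temps = [r["TMP_air_temp"] for r in rows if r["TMP_air_temp"] is not None]
        first_var = next((r["VIS_var"] for r in rows if r["VIS_var"] is not None), None)
        hourly[key] = {
            "avg_TMP_air_temp": round(mean(temps), 2) if temps else None,
            "first_VIS_var": first_var,
        }
    return hourly


# Hypothetical observations for one station
observations = [
    {"station": "S1", "DATE": datetime(2015, 1, 1, 10, 5), "TMP_air_temp": 50, "VIS_var": None},
    {"station": "S1", "DATE": datetime(2015, 1, 1, 10, 53), "TMP_air_temp": 55, "VIS_var": "N"},
    {"station": "S1", "DATE": datetime(2015, 1, 1, 11, 2), "TMP_air_temp": None, "VIS_var": "V"},
]
hourly = aggregate_hourly(observations)
print(hourly[("S1", date(2015, 1, 1), 10)])  # {'avg_TMP_air_temp': 52.5, 'first_VIS_var': 'N'}
```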

In [0]:
weather_agg = weather_parsed.withColumn('DAY', f.date_format('DATE', 'dd/MM/yyyy'))\
                            .withColumn('hour', f.hour('DATE'))\
                            .groupBy('DAY', 'hour', 'LATITUDE', 'LONGITUDE')\
                            .agg(f.round(f.avg('WND_dir_angle'),2).alias('WND_dir_angle_avg'), 
                                f.round(f.avg('WND_speed_rate'),2).alias('WND_speed_rate_avg'),
                                f.first('WND_type_code', True).alias('first_WND_type_code'),
                                f.round(f.avg('CIG_ceil_height'),2).alias('avg_CIG_ceil_height'),
                                f.first('CIG_ceil_vis', True).alias('first_CIG_ceil_vis'),
                                f.round(f.avg('VIS_dist'),2).alias('avg_VIS_dist'),
                                f.first('VIS_var', True).alias('first_VIS_var'),
                                f.round(f.avg('TMP_air_temp'),2).alias('avg_TMP_air_temp'),
                                f.round(f.avg('DEW_pt_temp'),2).alias('avg_DEW_pt_temp'),
                                f.round(f.avg('SLP_pressure'),2).alias('avg_SLP_pressure'),
                                f.first('airport_code', True).alias('airport'))\
                             .withColumn('date', f.to_date('DAY', format='dd/MM/yyyy'))\
                             .withColumn("reportID", f.monotonically_increasing_id())

#### STEP 21: Reconstruct the UTC Timestamp

We restore the UTC timestamp that was lost during aggregation by concatenating the date with the hour, formatted as HHmm (e.g., hour 5 becomes 0500).
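A plain-Python sketch of the reconstruction in this step: zero-pad the hour to two digits, append '00' minutes, and parse the result together with the ISO date. `rebuild_utc_timestamp` is a hypothetical helper, and Python's `%Y-%m-%d%H%M` plays the role of Spark's `yyyy-MM-ddHHmm` pattern.

```python
from datetime import date, datetime


def rebuild_utc_timestamp(day: date, hour: int) -> datetime:
    """Recombine an aggregated (date, hour) pair into a timestamp.

    The hour becomes an 'HHmm' string (5 -> '0500', 14 -> '1400'), which is
    appended to the ISO date and parsed with a single format string.
    """
    long_hour = f"{hour:02d}00"
    return datetime.strptime(f"{day.isoformat()}{long_hour}", "%Y-%m-%d%H%M")


print(rebuild_utc_timestamp(date(2015, 1, 1), 5))   # 2015-01-01 05:00:00
print(rebuild_utc_timestamp(date(2015, 1, 1), 14))  # 2015-01-01 14:00:00
```

In plain Python, `datetime.combine(day, time(hour))` gives the same result without string handling; the string route is shown only because it mirrors the Spark cell.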

In [0]:
# Rebuild an HHmm string from the integer hour (e.g. 5 -> '0500', 14 -> '1400')
weather_time = weather_agg.withColumn('long_hour', f.concat(f.lpad(f.col('hour').cast('string'), 2, '0'), f.lit('00')))
# Combine it with the date (yyyy-MM-dd) and parse the result as a timestamp
weather_time = weather_time.withColumn('UTC_timestamp', f.to_timestamp(f.concat(f.col('date').cast('string'), f.col('long_hour')), format='yyyy-MM-ddHHmm'))

#### STEP 22: Null imputation

Many of the numerical weather features have a significant share of nulls, so we impute each null with the average of that feature over the trailing 7 days (the current hour included).
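The imputation in this step uses a range frame over the timestamp in seconds, so `rangeBetween(-days(7), 0)` spans the 7 days before each row plus the row itself (including rows that share its timestamp), and `avg` skips nulls. Below is a plain-Python sketch for a single series, for example one station's hourly values; `impute_trailing_mean` and the sample data are hypothetical. As in Spark, the averages are computed from the original values, so an imputed value is never reused to fill another gap.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Series = List[Tuple[datetime, Optional[float]]]


def impute_trailing_mean(series: Series, days: int = 7) -> List[Optional[float]]:
    """Fill each None with the mean of non-null values in [t - days, t].

    Non-null values are left unchanged; a gap with no observed value in
    its window stays None. Filled values are rounded to 2 decimals.
    """
    window = timedelta(days=days)
    filled = []
    for ts, value in series:
        if value is not None:
            filled.append(value)
            continue
        in_range = [v for t, v in series if ts - window <= t <= ts and v is not None]
        filled.append(round(sum(in_range) / len(in_range), 2) if in_range else None)
    return filled


t0 = datetime(2015, 1, 1)
series = [
    (t0, 10.0),
    (t0 + timedelta(days=1), None),
    (t0 + timedelta(days=2), 20.0),
    (t0 + timedelta(days=3), None),
    (t0 + timedelta(days=10), None),
]
print(impute_trailing_mean(series))  # [10.0, 10.0, 20.0, 15.0, None]
```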

In [0]:
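days = lambda i: i * 86400  # window bounds in seconds (timestamp ordered as epoch seconds)
# Trailing 7 days per station; without partitionBy, Spark pulls all rows into one partition
window_7days = Window.partitionBy('LATITUDE', 'LONGITUDE')\
                     .orderBy(f.col('UTC_timestamp').cast('long'))\
                     .rangeBetween(-days(7), 0)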

In [0]:
# Impute only the numeric (averaged) features. Averaging the categorical codes kept
# with f.first (wind type, ceiling and visibility flags) is not meaningful: letter codes average to null.
numeric_cols = ['WND_dir_angle_avg', 'WND_speed_rate_avg', 'avg_CIG_ceil_height', 'avg_VIS_dist',
                'avg_TMP_air_temp', 'avg_DEW_pt_temp', 'avg_SLP_pressure']

weather_impute = weather_time
for c in numeric_cols:
  # Keep observed values; fill nulls with the rounded trailing 7-day average
  weather_impute = weather_impute.withColumn(c, f.coalesce(f.col(c), f.round(f.avg(c).over(window_7days), 2)))

#### STEP 23: Join weather and airline data through temporary views with Spark SQL

#### Step 23.1. Create temporary views for the weather and airline data after data cleansing and feature engineering

In [0]:
airlines_FE.createOrReplaceTempView("airlines_feature_engineered")

In [0]:
weather_impute.createOrReplaceTempView("weather")

#### Step 23.2. Join the tables

We join the airline and weather data as follows:
    1. Join the weather table (at BOTH the 'Origin' and 'Destination' airports) with the feature-engineered airline table on
        a. Airport code
        b. Date (date of the scheduled departure minus 3 hours = weather report date)
        c. Hour (scheduled departure minus 3 hours = weather report hour)
    2. Join the delays-by-airport table (at BOTH the 'Origin' and 'Destination' airports) with the airline table on
        a. Airport code
        b. Hour (scheduled departure minus 3 hours = delays-by-airport hour)
    3. Join the delays-by-airport-by-hour-by-carrier table with the airline table on
        a. Origin airport code
        b. Hour (scheduled departure minus 3 hours = delays-by-airport-by-carrier hour)
        c. Carrier (op_unique_carrier)

All joins are inner joins, so a flight is kept only if every joined table has a matching record.
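A plain-Python sketch of how a flight is matched to a weather report under the rules above. We assume `sch_dep_minus_3_UTC` holds the scheduled UTC departure minus 3 hours and `sch_dep_minus_3HOUR_UTC` its hour, as the column names suggest; those columns are created earlier in the notebook. `weather_join_key`, `weather_by_key`, and the sample values are hypothetical. The point of the sketch is that the date is taken after the 3-hour shift, so a departure shortly after midnight UTC matches the previous day's weather.

```python
from datetime import date, datetime, timedelta


def weather_join_key(airport: str, sched_dep_utc: datetime):
    """Return the (airport, date, hour) key used to match a weather report.

    The 3-hour offset is applied before taking the date and the hour.
    """
    t = sched_dep_utc - timedelta(hours=3)
    return (airport, t.date(), t.hour)


# Hypothetical hourly weather records keyed the same way as the SQL join
weather_by_key = {
    ("ORD", date(2015, 1, 1), 22): {"avg_TMP_air_temp": -55.0},
}

key = weather_join_key("ORD", datetime(2015, 1, 2, 1, 30))
print(key)                      # ('ORD', datetime.date(2015, 1, 1), 22)
print(weather_by_key.get(key))  # {'avg_TMP_air_temp': -55.0}
# A flight with no matching key would be dropped by the INNER JOIN
print(weather_by_key.get(weather_join_key("ORD", datetime(2015, 1, 2, 5, 0))))  # None
```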

In [0]:
weather_airline_joined = sqlContext.sql("""
SELECT
  f.*,
  IFNULL(dao.num_flights, 0) AS flights_count_origin,
  IFNULL(dad.num_flights, 0) AS flights_count_dest,
  IFNULL(dao.avg_dep_delay, 0) AS avg_dep_delay_origin,
  IFNULL(dad.avg_dep_delay, 0) AS avg_dep_delay_dest,
  IFNULL(dao.pct_dep_del15, 0) AS pct_dep_delayed_origin,
  IFNULL(dad.pct_dep_del15, 0) AS pct_dep_delayed_dest,
  IFNULL(dao.avg_taxi_time, 0) AS avg_taxi_time_origin,
  IFNULL(dad.avg_taxi_time, 0) AS avg_taxi_time_dest,
  IFNULL(dao.avg_weather_delay, 0) AS avg_weather_delay_origin,
  IFNULL(dad.avg_weather_delay, 0) AS avg_weather_delay_dest,
  IFNULL(dao.avg_nas_delay, 0) AS avg_nas_delay_origin,
  IFNULL(dad.avg_nas_delay, 0) AS avg_nas_delay_dest,
  IFNULL(dao.avg_security_delay, 0) AS avg_sec_delay_origin,
  IFNULL(dad.avg_security_delay, 0) AS avg_sec_delay_dest,
  IFNULL(dao.avg_late_aircraft_delay, 0) AS avg_tail_delay_origin,
  IFNULL(dad.avg_late_aircraft_delay, 0) AS avg_tail_delay_dest,
  IFNULL(dco.num_flights, 0) AS flight_count_carrier,
  IFNULL(dco.avg_dep_delay, 0) AS avg_dep_delay_carrier,
  IFNULL(dco.avg_carrier_delay, 0) AS avg_carrier_delay_carrier,
  wo.WND_dir_angle_avg AS WND_dir_angle_avg_origin,
  wd.WND_dir_angle_avg AS WND_dir_angle_avg_dest,
  wo.first_WND_type_code AS first_WND_wind_obs_origin,
  wd.first_WND_type_code AS first_WND_wind_obs_dest,
  wo.WND_speed_rate_avg AS WND_speed_avg_origin,
  wd.WND_speed_rate_avg AS WND_speed_avg_dest,
  wo.avg_CIG_ceil_height AS CIG_avg_ceil_ht_origin,
  wd.avg_CIG_ceil_height AS CIG_avg_ceil_ht_dest,
  wo.first_CIG_ceil_vis AS first_CIG_ceil_vis_origin,
  wd.first_CIG_ceil_vis AS first_CIG_ceil_vis_dest,
  wo.avg_VIS_dist AS avg_VIS_dist_origin,
  wd.avg_VIS_dist AS avg_VIS_dist_dest,
  wo.first_VIS_var AS first_VIS_var_origin,
  wd.first_VIS_var AS first_VIS_var_dest,
  wo.avg_TMP_air_temp AS avg_TMP_air_temp_origin,
  wd.avg_TMP_air_temp AS avg_TMP_air_temp_dest,
  wo.avg_DEW_pt_temp AS avg_DEW_pt_temp_origin,
  wd.avg_DEW_pt_temp AS avg_DEW_pt_temp_dest,
  wo.avg_SLP_pressure AS avg_SLP_pressure_origin,
  wd.avg_SLP_pressure AS avg_SLP_pressure_dest
FROM airlines_feature_engineered AS f
INNER JOIN weather AS wo ON
  f.origin = wo.airport
  AND f.sch_dep_minus_3HOUR_UTC = wo.hour
  AND to_date(f.sch_dep_minus_3_UTC,'yyyy-MM-dd') = to_date(wo.UTC_timestamp,'yyyy-MM-dd')
INNER JOIN weather AS wd ON
  f.dest = wd.airport
  AND f.sch_dep_minus_3HOUR_UTC = wd.hour
  AND to_date(f.sch_dep_minus_3_UTC,'yyyy-MM-dd') = to_date(wd.UTC_timestamp,'yyyy-MM-dd')
INNER JOIN delays_by_airport AS dao ON
  f.origin = dao.origin
  AND f.sch_dep_minus_3HOUR_UTC = dao.hour
INNER JOIN delays_by_airport AS dad ON
  f.dest = dad.origin
  AND f.sch_dep_minus_3HOUR_UTC = dad.hour
INNER JOIN delays_by_airport_hour_carrier AS dco ON
  f.origin = dco.origin
  AND f.sch_dep_minus_3HOUR_UTC = dco.sch_dep_hour_UTC
  AND f.op_unique_carrier = dco.op_unique_carrier
""")

In [0]:
joins = weather_airline_joined.count()
joins

In [0]:
#weather_airline_joined.display()

#### STEP 24: Write the joined table as a Delta table for use in the models

In [0]:
# Overwrite so the cell can be re-run without a "table already exists" error
weather_airline_joined.write.format('delta').mode('overwrite').saveAsTable('w261_Summer21_Team05_JOINED_FULL')

#### STEP 25: Write the joined table as Parquet on cloud storage for use in the models

In [0]:
# Save the joined table as Parquet in the team's blob container
Team05_PARQUET_JOINED_FULL_path = f"{blob_url}/w261_Summer21_Team05_JOINED_FULL_Parquet"
# Remove any previous output at this path before writing
dbutils.fs.rm(Team05_PARQUET_JOINED_FULL_path, recurse=True)

# Write, overwriting any existing data
weather_airline_joined.write.format("parquet").mode("overwrite").save(Team05_PARQUET_JOINED_FULL_path)