## Global Settings

In [0]:
from pyspark.sql.functions import col, substring, split, when, lit, max as pyspark_max, countDistinct, count, mean, sum as pyspark_sum, expr, to_utc_timestamp, to_timestamp, concat, length
from pyspark.sql import SQLContext, Window 
from pyspark.sql.types import IntegerType, StringType, BooleanType, DateType, DoubleType, TimestampType
import pandas as pd
from gcmap import GCMapper, Gradient
import matplotlib.pyplot as plt
from pandas.tseries.holiday import USFederalHolidayCalendar
from datetime import datetime
from pyspark.sql import functions as f

blob_container = "w261team07container" # The name of your container created in https://portal.azure.com
storage_account = "w261team07storage" # The name of your Storage account created in https://portal.azure.com
secret_scope = "w261team07" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "w261team07-key" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)


## Load Dataset

In [0]:
# Inspect the Mount's Final Project folder
display(dbutils.fs.ls("/mnt/mids-w261/datasets_final_project/"))

path,name,size
dbfs:/mnt/mids-w261/datasets_final_project/airlines/,airlines/,0
dbfs:/mnt/mids-w261/datasets_final_project/airlines_data/,airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/,parquet_airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_3m/,parquet_airlines_data_3m/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_6m/,parquet_airlines_data_6m/,0
dbfs:/mnt/mids-w261/datasets_final_project/stations_data/,stations_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data/,weather_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_6_hr/,weather_data_6_hr/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_single/,weather_data_single/,0


In [0]:
# data = spark.read.parquet(f"{blob_url}/joined_eda/*")
# data = spark.read.parquet(f"{blob_url}/full_join_2015_v0/*")
# data = spark.read.parquet(f"{blob_url}/full_join_with_aggs_v0/*")
data = spark.read.parquet(f"{blob_url}/model_features_v6/*")


In [0]:
n = data.count()
print("The number of rows are {}".format(n))

## On-The-Fly Feature Engineering

In [0]:
display(data)

dep_is_delayed,canceled,planned_departure_utc,origin_state,origin_city,origin_ICAO,origin_altitude,origin_wnd_type,origin_wnd_speed,origin_cig_cloud_agl,origin_cig_cavok,origin_vis_dist,origin_vis_var,origin_tmp_c,origin_dew_c,origin_slp_p,dest_state,dest_city,dest_ICAO,dest_altitude,carrier,year,quarter,month,day_of_month,day_of_week,dep_hour,arr_hour,planned_duration,flight_distance,distance_group,pct_delayed_from_origin,mean_delay_from_origin,pct_delayed_to_dest,mean_delay_to_dest,pct_delayed_for_route,mean_delay_for_route,pct_delayed_from_state,mean_delay_from_state,pct_delayed_to_state,mean_delay_to_state,oa_avg_del_ind,da_avg_del_ind,carrier_avg_del_ind,poten_for_del,prev_fl_del,nas_window_del_ind,weather_window_del_ind,carrier_window_del_ind,security_window_del_ind,late_ac_window_del_ind,holiday,origin_wnd_type_null,origin_wnd_speed_null,origin_cig_cloud_agl_null,origin_cig_cavok_null,origin_vis_dist_null,origin_vis_var_null,origin_tmp_c_null,origin_dew_c_null,origin_slp_p_null
False,False,2015-01-01T11:55:00.000+0000,TX,"Tyler, TX",KTYR,544.0,N,26.0,1829.0,N,16093.0,N,28.0,6.0,10271.0,TX,"Dallas/Fort Worth, TX",KDFW,607.0,EV,2015,1,1,1,4,0001-0559,0600-0659,50.0,102.0,1,18.21929101401484,6.020610057708161,19.162027181922383,10.410166585889812,19.258821210329195,7.10408042578356,20.916771440066903,10.34018189654612,19.216577192318606,9.440727036672566,-1,0,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
True,True,2015-01-01T12:00:00.000+0000,AR,"Texarkana, AR",KTXK,390.0,N,15.0,1829.0,N,16093.0,N,28.0,-33.0,10283.0,TX,"Dallas/Fort Worth, TX",KDFW,607.0,MQ,2015,1,1,1,4,0600-0659,0700-0759,60.0,181.0,1,17.222844344904814,6.135274356103023,19.162027181922383,10.410166585889812,17.222844344904814,6.135274356103023,17.32794640698919,8.768813120414329,19.216577192318606,9.440727036672566,-1,0,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T14:05:00.000+0000,WA,"Seattle, WA",KSEA,433.0,N,21.0,12479.802773912874,N,16000.0,9,-11.0,-61.0,10291.0,CA,"San Francisco, CA",KSFO,13.0,AS,2015,1,1,1,4,0600-0659,0800-0859,125.0,679.0,3,16.129214141680922,6.7084862252597,25.83841151810567,15.370273550337396,25.206611570247933,13.775118395394188,15.786905806766386,6.527221454727693,20.968720611599743,10.329385058496566,-1,1,-1,-1,False,-1,-1,-1,-1,-1,holiday,False,False,True,False,False,False,False,False,False
True,False,2015-01-01T11:25:00.000+0000,MD,"Baltimore, MD",KBWI,146.0,C,0.0,22000.0,N,16093.0,N,-50.0,-106.0,10230.0,MN,"Minneapolis, MN",KMSP,841.0,DL,2015,1,1,1,4,0600-0659,0800-0859,171.0,936.0,4,22.143887325064405,10.49078919886618,15.40571797423415,8.17815405516324,16.659359929855327,9.059945205479451,22.150829214760854,10.500544681798484,15.51499295740704,8.257378442592778,-1,0,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T11:40:00.000+0000,FL,"Tampa, FL",KTPA,26.0,N,26.0,12479.802773912874,N,16000.0,9,150.0,122.0,10235.0,NY,"New York, NY",KLGA,21.0,DL,2015,1,1,1,4,0600-0659,0900-0959,154.0,1010.0,5,19.180487096900244,9.805819308381478,26.945589705147427,15.464645187869255,24.84897301651228,16.708014498590416,20.73620691585407,11.050036683785766,24.916361845579637,14.3535372305469,-1,-1,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,True,False,False,False,False,False,False
False,False,2015-01-01T12:00:00.000+0000,VA,"Washington, DC",KDCA,15.0,N,15.0,22000.0,N,16093.0,N,-17.0,-111.0,10233.0,FL,"Orlando, FL",KMCO,96.0,B6,2015,1,1,1,4,0700-0759,0900-0959,143.0,759.0,4,19.100854206337097,9.091498787465724,20.67967910868919,10.94146988993651,18.246962301381245,9.279156714092844,18.7980526903322,9.91242879918872,20.265895898289283,10.53228037685095,-1,1,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
True,False,2015-01-01T14:10:00.000+0000,AZ,"Phoenix, AZ",KPHX,1135.0,N,15.0,12479.802773912874,N,16000.0,9,22.0,-11.0,10179.0,CO,"Denver, CO",KDEN,5431.0,UA,2015,1,1,1,4,0700-0759,0800-0859,107.0,602.0,3,18.533780662728223,8.261370432499453,18.306390573988093,8.998358497534271,19.636372935099477,9.068275967882167,18.17005811626455,8.07152729970168,18.532731624832557,9.249279629313008,-1,0,-1,0,True,-1,-1,-1,-1,-1,holiday,False,False,True,False,False,False,False,False,False
False,False,2015-01-01T13:05:00.000+0000,CT,"Hartford, CT",KBDL,173.0,N,57.0,22000.0,N,16093.0,N,-50.0,-156.0,10181.0,NC,"Charlotte, NC",KCLT,748.0,US,2015,1,1,1,4,0800-0859,1000-1059,135.0,644.0,3,16.38343246530381,7.35613083785202,16.557974220386658,7.017735308741668,12.895498552749777,4.016069468010779,16.48220308458037,7.34015437378756,17.903528650048557,7.991099841105581,0,0,-1,-1,False,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T16:15:00.000+0000,CA,"Los Angeles, CA",KLAX,125.0,N,31.0,22000.0,N,16093.0,N,28.0,-44.0,10195.0,NY,"New York, NY",KJFK,13.0,B6,2015,1,1,1,4,0800-0859,1600-1659,316.0,2475.0,10,21.48681135009189,10.485671790061964,23.91429521143748,14.871565892935887,21.578725595767597,12.60806529644022,20.23576491502932,9.533154260269752,24.916361845579637,14.3535372305469,0,0,0,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T13:30:00.000+0000,VA,"Washington, DC",KIAD,312.0,N,26.0,22000.0,N,16093.0,N,-67.0,-117.0,10232.0,CA,"Los Angeles, CA",KLAX,125.0,UA,2015,1,1,1,4,0800-0859,1100-1159,340.0,2288.0,10,17.31865794909882,10.568394953516025,19.793827590958823,9.899910379928064,15.583452513374649,7.657767890248424,18.7980526903322,9.91242879918872,20.968720611599743,10.329385058496566,0,0,0,-1,True,1,0,0,0,0,holiday,False,False,False,False,False,False,False,False,False


In [0]:
data.dtypes

In [0]:
# null check
from pyspark.sql.functions import isnan, when, count, col
if False:
  display(data.select([(100 * count(when(isnan(c) | col(c).isNull(), c))/n).alias(c) for c in data.columns if c != "planned_departure_utc"]))


## Helper Functions

In [0]:
from pyspark.sql.functions import percent_rank, to_timestamp
from pyspark.sql import Window
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.types import *

# write model to storage
def write_model_to_storage(list_dic, model_class_path, mod_name =''):
  if len(list_dic) == 0:
    raise Exception("Cannot insert empty object into storage")
    
  # add timestamp as key so we can differentiate models of the same type by time
  list_dic_new = []
  now = datetime.now()
  for d in list_dic:
    assert("train" in d.keys())
    assert("test" in d.keys())
    d["timestamp"] = now
    list_dic_new.append(d)
  
  schema = StructType([ \
    StructField("timestamp", TimestampType(), True), \
    StructField("train", StringType(), True), \
    StructField("test", StringType(), True), \
    StructField("val", StringType(), True)])
  
  todf = []
  for d in list_dic_new:
    todf.append((d["timestamp"], d["train"], d["test"], None))
    
  df = spark.createDataFrame(data = todf, schema = schema)
  
  # default model name is based on timestamp - to generate unique name
  if mod_name == '':
    mod_name = str(now).replace(' ', '').replace(':', '').replace('-', '').split('.')[0]
  
  df.write.mode('overwrite').parquet(f"{blob_url}/{model_class_path}/{mod_name}")

def read_model_from_storage(model_path):
  return spark.read.parquet(f"{blob_url}/{model_path}/*")
  
def get_numeric_features(df):
  return [t[0] for t in df.dtypes if t[1] == 'int' or t[1] == 'double']

def get_categorical_features(df):
  return [t[0] for t in df.dtypes if t[1] == 'string']

def get_datetime_features(df):
  return  [t[0] for t in df.dtypes if t[1] == 'timestamp']

def get_boolean_features(df):
  return  [t[0] for t in df.dtypes if t[1] == 'boolean']

def assert_no_other_features_exist(df):
  numeric_features = get_numeric_features(df)
  categorical_features = get_categorical_features(df)
  dt_features = get_datetime_features(df)
  boolean_features = get_boolean_features(df)
  other_features = [t[0] for t in df.dtypes if t[0] not in numeric_features + categorical_features + dt_features + boolean_features]
  assert len(other_features) == 0

def pretty_print_list(elements):
  print("#########################")
  for e in elements:
    print(e)
  print("#########################")
  
def get_feature_dtype(df, colName):
  for t in df.dtypes:
    if t[0] == colName:
      return t[1]
  return None
  
def set_feature_dtype(df, colNames, dtype='string'):
  for colName in colNames:
    currentType = get_feature_dtype(df, colName)
    if currentType == None:      
      raise Exception("Colname is not valid: {}".format(colName))
    
    # preserve existing type
    if currentType == dtype:
      continue
      
    
    # implicit conversion from bool/str to int is not allowed, for some reason - this problem only appears with "dep_is_delayed"
    # we get back nulls for each row if we do a straight conversion to int
    # special case to convert dep_is_delayed to int (needed to be in this form for ML models to work)
    if (currentType == 'string' and colName == "dep_is_delayed") and dtype == 'int':
      
      def convert_to_int(value):
        return 1 if value == "true" else 0
        
      udf_convert = F.udf(convert_to_int, IntegerType())
    
      df = df.withColumn(colName + "_tmp", udf_convert(colName))
      df = df.drop(df.dep_is_delayed)
      df = df.withColumnRenamed(colName + "_tmp", colName)
    
    
    elif dtype == 'string':
      df = df.withColumn(colName, col(colName).cast(StringType()))
    elif dtype == 'int':
      df = df.withColumn(colName, col(colName).cast(IntegerType()))
    elif dtype == 'double':
      df = df.withColumn(colName, col(colName).cast(DoubleType()))
    elif dtype == 'boolean':
      df = df.withColumn(colName, col(colName).cast(BooleanType()))
    elif dtype == 'timestamp':
      df = df.withColumn(colName, to_timestamp(colName))
    else:
      raise Exception("Unsupported data type")
  
  return df

def get_df_for_model(df, splits, index, datatype="train"):
  start_date, end_date = get_dates_from_splits(splits, index, dtype = datatype)
  if verbose:
    print("In method: get_df_for_model - getting back data for data type '{}'. Start date is: {} and End date is: {}".format(datatype, start_date, end_date))
  return get_df(df, start_date, end_date, True)
  
# gets df between 2 given dates
def get_df(df, start_date, end_date, raise_empty=True):
  # assumes that we have access to planned_departure_utc 
  all_columns = [t[0] for t in df.dtypes]
  if "planned_departure_utc" not in all_columns:
    raise Exception("We cannot slice the data by time because we are missing planned_departure_utc")
  
  df = df.filter((col('planned_departure_utc') >= start_date) & (col('planned_departure_utc') <= end_date))
  
  if df.count() == 0 and raise_empty:
    raise Exception("Found 0 records, raising an error as this is not expected")
  
  if verbose:
    print("In method: get_df - getting back data with Start date: {} and End date: {}. Returning {} results".format(start_date, end_date, df.count()))
    
  return df

# contract format depends on function get_timeseries_train_test_splits
def get_dates_from_splits(splits, index, dtype="train"):
  if index >= len(splits):
    raise Exception("Index out of bounds")
    
  split = splits[index]
    
  if dtype == "train":
    # 1st 2 dates are training
    return (split[0], split[1])
  if dtype == "test":
    # next pair is test
    return (split[2], split[3])
  if dtype == "val":
    # last pair is val
    return (split[4], split[5])
  
  # by default return all
  return split

# get rolling or non-rolling time series splits of data
def get_timeseries_train_test_splits(df, rolling=False, roll_months=3, start_year=2015, start_month=1, end_year=2016, end_month=6, train_test_ratio=2, test_months=1):
  if start_year < 2015 or start_year > 2019:
    raise Exception("Invalid date range")
  
  if start_month < 1 or start_month > 12:
    raise Exception("Invalid date range")
  
  if end_month < 1 or end_month > 12:
    raise Exception("Invalid date range")
    
  if start_year > end_year:
    raise Exception("Start year cannot be larger than end year")
  
  if train_test_ratio <= 1 or int(train_test_ratio) != train_test_ratio:
    raise Exception("train_test_ratio must be > 1 and must be int")
  
  assert(test_months >=1 and train_test_ratio > 1 and roll_months >=1)
  
  # assert that we have values for the year and month
  assert(data.filter(data.year.isNull()).count() == 0)
  assert(data.filter(data.month.isNull()).count() == 0)
  
  # format months to 2 numbers - needed for date time parsing
  if start_month <= 9:
    start_month = "0" + str(start_month)
    
  if end_month <= 9:
    end_month = "0" + str(end_month)
  
  
  global_start = "{}-{}-01T00:00:00.000+0000".format(start_year, start_month)
  # why 28? consider february
  global_end = "{}-{}-28T00:00:00.000+0000".format(end_year, end_month)
  
  global_start = datetime.strptime(global_start, '%Y-%m-%dT%H:%M:%S.%f+0000')
  global_end = datetime.strptime(global_end, '%Y-%m-%dT%H:%M:%S.%f+0000')
  
  # check for sufficient data
  # train data is ratio x num months used for testing, hence (test_months * train_test_ratio)
  # validation set and test set have same number of months always, hence (2 * test_months)
  if (global_end - global_start).days < 30 * ((2 * test_months) + (test_months * train_test_ratio)):
    raise Exception("Insufficient data to train on. Please increase date range")
  
  df = df.filter((col('year') >= start_year) & (col('month') >= start_month))
  df = df.filter((col('year') <= end_year) & (col('month') <= end_month))
  
  # create result object - a list of tuple objects
  # tuple object is of the form of dates: (train_start, train_end, test_start, test_end, val_start, val_end)
  result = []
  
  # train is between start (T0) and X days after start, say (T1)
  temp_start_train = global_start
  temp_end_train = global_start + timedelta(days=(test_months * train_test_ratio * 30))

  while (global_end-temp_end_train).days > 0:
    # test is between T1 and Y days after T1, say T2
    temp_start_test = temp_end_train + timedelta(days=1) 
    temp_end_test = temp_start_test + timedelta(days=(test_months * 30))

    # validation is between T2 and Y days after T2, say T3
    temp_start_val = temp_end_test + timedelta(days=1)
    temp_end_val = temp_start_val + timedelta(days=(test_months * 30))

    # add these dates to our result
    result.append((temp_start_train, temp_end_train, temp_start_test, temp_end_test, temp_start_val, temp_end_val))

    # reset new date for ending point for train data and repeat till we reach global end date
    temp_end_train = temp_end_val
    
    # if rolling is enabled, we just roll the train start date by the rolling months
    # and adjust the end train date as well
    if rolling:
      temp_start_train = temp_start_train + timedelta(days=30 * roll_months)
      temp_end_train = temp_start_train + timedelta(days=(test_months * train_test_ratio * 30))
  
  if verbose:
    print("There are {} splits formed based on the date ranges given".format(len(result)))
    print("Date ranges are: start: {} and end: {} with rolling set to {} and rolling window months set to {} months".format(global_start, global_end, rolling, roll_months))
    print("Note that the train_test_ratio is {} and test_months is {}, so training data will have {} month(s) size and test/val data will have {} month(s) size".format(train_test_ratio, test_months, train_test_ratio * test_months, test_months))
    print("Here is a sample split that follows the following format: (train_start, train_end, test_start, test_end, val_start, val_end)")
    print(pretty_print_list(result[0]))
  
  return result

def get_best_param_dic_metrics(best_model, displayKeys=False):
  # https://stackoverflow.com/questions/36697304/how-to-extract-model-hyper-parameters-from-spark-ml-in-pyspark
  parameter_dict = best_model.stages[-1].extractParamMap()
  dic = dict()
  for x, y in parameter_dict.items():
    dic[x.name] = y
    if displayKeys:
      print("Parameter available: {}".format(x.name))

  return dic

## Feature Engineering (DataType Transformation, Data Prep, ML Algorithms)

In [0]:
def get_std_features(data):
  # Notes
  # div_reached_dest is going to be full of nulls, so dropping - we should consider making the default as "-1" - so it doesn't make us drop rows (dropna)
  numeric_features = get_numeric_features(data)
  categorical_features = get_categorical_features(data)
  dt_features = get_datetime_features(data)
  bool_features = get_boolean_features(data)
  assert_no_other_features_exist(data)
  # no features are null in our model as we dropped/mean imputed/pre-processed them in the data processing stage
  # so drop their null indicator variables as they provide no value
  cols_to_drop = ['index_id', 'origin_utc_offset', 'dest_utc_offset', 'origin_latitude', 
                  'origin_longitude', 'dest_latitude', 'dest_longitude', 'dt', 'planned_dep_time', 
                  'actual_dep_time', 'actual_arr_time', 'div_reached_dest', 
                  'time_at_prediction_utc', 'oa_avg_del2_4hr', 'da_avg_del2_4hr', 'carrier_avg_del2_4hr']\
                  + [x for x in dt_features if x != 'planned_departure_utc'] + [x for x in numeric_features + categorical_features if x.endswith('_null')]
  
  
  # there are some special snowflakes we need to handle here
  # dep_is_delayed, origin_altitude and dest_altitude are strings, they should be numeric
  # so we remove them from the categorical and add them to numeric
  numeric_features = numeric_features + ['origin_altitude', 'dest_altitude', 'dep_is_delayed']
  numeric_features = list(set(numeric_features))
  
  try:
    categorical_features.remove('origin_altitude')
    categorical_features.remove('dest_altitude')
    categorical_features.remove('dep_is_delayed')
  except:
    # dont error if these were not in categorical features
    pass
    
  # likewise, there are some indicator variables that are numeric (int), but need to be string (categorical)
  ind_vars = [x for x in numeric_features if x.endswith("_null") or x.endswith("_ind")]
  for x in ind_vars:
    try:
      numeric_features.remove(x)
    except:
      # dont error if these were not in numeric
      pass
    
  categorical_features = categorical_features + ind_vars
  categorical_features = list(set(categorical_features))
  
  bool_features = [x for x in bool_features if x not in cols_to_drop]
  dt_features = [x for x in dt_features if x not in cols_to_drop]
  categorical_features = [x for x in categorical_features if x not in cols_to_drop]
  numeric_features = [x for x in numeric_features if x not in cols_to_drop]
  all_cols = numeric_features + categorical_features + dt_features + bool_features
  cols_to_consider = [x for x in all_cols if x not in cols_to_drop] 
  
  if verbose:  
    print("There are {} total columns out of which there are {} columns to consider in the model".format(len(all_cols), len(cols_to_consider)))
    print("There are {} categorical features".format(len(categorical_features)))
    print("There are {} numeric features".format(len(numeric_features)))
    print("There are {} date features".format(len(dt_features)))
    print("There are {} bool features".format(len(bool_features)))
    
  return all_cols, cols_to_consider, cols_to_drop, numeric_features, categorical_features, dt_features, bool_features

def add_required_cols(cols):
  # every model must contain the label and the timestamp var
  if 'planned_departure_utc' not in cols:
    cols.append('planned_departure_utc')
  if 'dep_is_delayed' not in cols:
    cols.append('dep_is_delayed')

  return list(set(cols))
  

def get_std_desired_numeric(df, hypothesis=1, custom_cols_to_drop=[]):
  all_cols, cols_to_consider, cols_to_drop, numeric_features, categorical_features, dt_features, bool_features = get_std_features(data)

  if hypothesis == 1:
    # all numeric features in the df
    desired_numeric = [x for x in numeric_features if x in df.columns]
  elif hypothesis == 2:
    # includes mandatory features like origin temp and flight dist (or planned duration) + percentage features
    desired_numeric = ['pct_delayed_from_origin', 'pct_delayed_to_dest', 'pct_delayed_for_route', 'pct_delayed_from_state', 'pct_delayed_to_state', 'flight_distance', 'origin_tmp_c']
  elif hypothesis == 3:
    # includes mandatory features like origin temp and flight dist (or planned duration) + weather features
    desired_numeric = ['origin_altitude', 'origin_wnd_speed', 'origin_cig_cloud_agl', 'origin_vis_dist', 'origin_tmp_c', 'origin_dew_c', 'origin_slp_p',
                       'dest_altitude', 'planned_duration']
  elif hypothesis == 4:
    # includes mandatory features like origin temp and flight dist (or planned duration) + computed columns
    desired_numeric = ['flight_distance', 'origin_tmp_c', 'pct_delayed_from_origin', 'mean_delay_from_origin', 'pct_delayed_to_dest', 
                       'mean_delay_to_dest', 'pct_delayed_for_route', 'mean_delay_for_route', 'pct_delayed_from_state', 'mean_delay_from_state', 
                       'pct_delayed_to_state', 'mean_delay_to_state']
  else:
    raise Exception("Invalid hypothesis number!")

  # drop any columns that are a no-no in the model
  desired_numeric = [x for x in desired_numeric if x not in cols_to_drop + custom_cols_to_drop]
  
  # we must convert dep_is_delayed to numeric
  desired_numeric = list(set(desired_numeric + ['dep_is_delayed']))
  
  # confirm no duplicates
  assert(len(desired_numeric) == len(set(desired_numeric)))

  # confirm data actually has these features
  # also confirm that the desired_numeric is part of the "registered" numeric features to choose from
  all_cols = [t[0] for t in df.dtypes]
  for dn in desired_numeric:
    if dn not in all_cols:
      raise Exception("Unknown feature found: {}".format(dn))
    if dn not in numeric_features:
      raise Exception("Feature: {} is not a registered numeric feature".format(dn))
    
  # ensure that the desired numeric columns are indeed converted to numeric
  # for example, this will ensure that dep_is_delayed is converted to int
  to_convert = get_std_to_convert_numeric(df, desired_numeric)
  df = set_feature_dtype(df, to_convert, dtype='int')

  return df, list(set(desired_numeric))

def get_std_desired_categorical(df, hypothesis=1, custom_cols_to_drop=[]):
  all_cols, cols_to_consider, cols_to_drop, numeric_features, categorical_features, dt_features, bool_features = get_std_features(data)

  if hypothesis == 1:
    # all categorical features in df
    desired_categorical = [x for x in categorical_features if x in df.columns]
  elif hypothesis == 2:
    # includes mandatory features = time related + origin/dest/dist + carrier + holiday + computed score (potential for delay)
    desired_categorical = ['month', 'day_of_month', 'day_of_week', 'dep_hour', 'arr_hour', 'origin_ICAO', 'dest_ICAO', 'carrier', 'distance_group', 'holiday', 'poten_for_del']
  elif hypothesis == 3:
    # includes mandatory features = time related + origin/dest/dist + carrier + holiday + computed score (potential for delay) + weather related 
    desired_categorical = ['month', 'day_of_month', 'day_of_week', 'dep_hour', 'arr_hour', 'origin_ICAO', 'dest_ICAO', 'carrier', 'distance_group', 
                           'holiday', 'poten_for_del', 'canceled', 'origin_cig_cavok', 'origin_wnd_type', 'origin_vis_var', 'origin_city', 'dest_city']
  elif hypothesis == 4:
    # includes mandatory features = time related + origin/dest/dist + carrier + holiday + computed score (potential for delay) + computed indicators
    desired_categorical = ['month', 'day_of_month', 'day_of_week', 'dep_hour', 'arr_hour', 'origin_ICAO', 'dest_ICAO', 'carrier', 'holiday',
                           'weather_window_del_ind', 'carrier_window_del_ind', 'security_window_del_ind', 'late_ac_window_del_ind', 'nas_window_del_ind',
                           'oa_avg_del_ind', 'da_avg_del_ind', 'carrier_avg_del_ind', 'poten_for_del', 'prev_fl_del']
  else:
    raise Exception("Invalid hypothesis number!")

  # drop any columns that are a no-no in the model and drop dep_is_delayed since it has to be numeric (int)
  desired_categorical = [x for x in desired_categorical if x not in cols_to_drop + custom_cols_to_drop + ['dep_is_delayed']]

  # confirm no duplicates
  assert(len(desired_categorical) == len(set(desired_categorical)))

  # confirm data actually has these features
  # also confirm that the desired_categorical is part of the "registered" categorical features to choose from
  all_cols = [t[0] for t in df.dtypes]
  for dc in desired_categorical:
    if dc not in all_cols:
      raise Exception("Unknown feature found: {}".format(dc))
    if dc not in categorical_features:
      raise Exception("Feature: {} is not a registered categorical feature".format(dc))
  
  # ensure the vars are converted to strings
  df = set_feature_dtype(df, desired_categorical, dtype='string')
  
  return df, list(set(desired_categorical))

def get_std_desired_numeric_int(df, desired_numeric):  
  return [x for x in desired_numeric if get_feature_dtype(df, x) == 'int']

def get_std_desired_numeric_double(df, desired_numeric):
  return [x for x in desired_numeric if get_feature_dtype(df, x) == 'double']

def get_std_to_convert_numeric(df, desired_numeric):
  desired_numeric_int = get_std_desired_numeric_int(df, desired_numeric)
  desired_numeric_double =  get_std_desired_numeric_double(df, desired_numeric)

  to_convert_numeric = [x for x in desired_numeric if x not in desired_numeric_int + desired_numeric_double]
  if verbose:
    print("These columns need to be converted to numeric type: {}".format(to_convert_numeric))
    
  return to_convert_numeric

def get_proportion_labels(df):
  if verbose:
    print("In method - get_proportion_labels - displaying proportion of labeled class")
    print(display(df.groupby('dep_is_delayed').count()))
  
  positive = df.filter(df.dep_is_delayed == 1).count()
  negative = df.filter(df.dep_is_delayed == 0).count()
  total = negative + positive
  if total == 0:
    raise Exception("No records found!")
  
  if positive == 0:
    raise Exception("No positive records found!")
  
  if negative == 0:
    raise Exception("No negative records found!")
    
  # there is a risk that the positive/negative classes are so imbalanced that they are non existent in the df
  # so we should guard against that case in order to avoid throwing div by 0
  np = -1 if positive == 0 else 1.0 * negative/positive
  pn = -1 if negative == 0 else 1.0 * positive/negative
  
  return 1.0 * positive/total, 1.0 * negative/total, pn, np

def downsample(df, min_major_class_ratio, alpha=0.99):
  if min_major_class_ratio == -1:
    # assign default value to reduce the majority class by half
    min_major_class_ratio = 0.5
    print("In method downsample: Warning - reset min_major_class_ratio to default: {}".format(min_major_class_ratio))
    
  if verbose:
    print("Starting to downsample, negative class has {} rows and positive class has {} rows".format(df.filter(df.dep_is_delayed == 0).count(), df.filter(df.dep_is_delayed == 1).count()))
    
  negative = df.filter(df.dep_is_delayed == 0).sample(False, min_major_class_ratio * alpha, seed=2021)
  positive = df.filter(df.dep_is_delayed == 1)
  
  new_df = positive.union(negative).cache()
  if verbose:
    negative = new_df.filter(new_df.dep_is_delayed ==0).count()
    positive = new_df.filter(new_df.dep_is_delayed ==1).count()
    print("After downsampling, negative class has {} rows and positive class has {} rows".format(negative, positive))
  
  return new_df

#### Logit Specific Functions

In [0]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.types import StringType,BooleanType,DateType
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.feature import IndexToString, StringIndexer, OneHotEncoder, VectorAssembler, Bucketizer, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

def get_train_test_finalset_for_logit_2(train, test, custom_payload, drop_na = True, set_handle_invalid="keep"):
  if custom_payload == None:
    raise Exception("Custom payload cannot be null as it contains feature selection info")
  
  categorical_features = custom_payload["categorical_features"]
  numeric_features = custom_payload["numeric_features"]
  
  stages = []
  for feature in categorical_features:
    # string index categorical features:
    indexer = StringIndexer(inputCol=feature, outputCol = feature+'_index')
    indexer.setHandleInvalid(set_handle_invalid)
    # one-hot the categorical features:
    one_hot_encoder = OneHotEncoder(inputCols=[indexer.getOutputCol()], outputCols=[feature+'_Indicator'])
    stages += [indexer, one_hot_encoder]
  
  # convert_label
  label_stringIdx = StringIndexer(inputCol = 'dep_is_delayed', outputCol = 'label')
  stages += [label_stringIdx]
  
  # convert numerical features
  vector_assembler = VectorAssembler(inputCols = numeric_features, outputCol="numeric_vec")
  vector_assembler.setHandleInvalid(set_handle_invalid)
  scaler = StandardScaler(inputCol="numeric_vec", outputCol="scaled_features_1")
  
  stages += [vector_assembler, scaler]
  
  # feature assembler
  assemblerInputs = [feature + "_Indicator" for feature in categorical_features] + ['scaled_features_1']
  assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="scaled_features")
  stages += [assembler]
  pipeline = Pipeline(stages = stages)
  pipelineModel = pipeline.fit(train)
  
  # comb_features = categorical_features + numeric_features + ['dep_is_delayed']
  
  # transforming the data using pipeline
  df_train = pipelineModel.transform(train)
  selectedCols = ['label', 'scaled_features'] # + comb_features
  df_train = df_train.select(selectedCols)
  df_test = pipelineModel.transform(test)
  df_test = df_test.select(selectedCols)
  
  if verbose:
    print("Training Dataset Count Before Dropping NA: " + str(df_train.count()))
    print("Test Dataset Count Before Dropping NA: " + str(df_test.count()))
    # display(training_set)
  
  if drop_na:  
    df_train = df_train.dropna()
    df_test = df_test.dropna()
  else:
    print("Drop NA is set to false, will not drop any rows...")
  
  if verbose and drop_na:
    print("Training Dataset Count After Dropping NA: " + str(df_train.count()))
    print("Test Dataset Count After Dropping NA: " + str(df_test.count()))
    
  # convert label to integer type, so we can compute performance metrics easily
  df_train = df_train.withColumn('label', df_train['label'].cast(IntegerType()))  
  df_test = df_test.withColumn('label', df_test['label'].cast(IntegerType()))
  
  return df_train, df_test

def get_train_test_finalset_for_logit(train, test, custom_payload, drop_na = True, set_handle_invalid="keep"):
  
  if custom_payload == None:
    raise Exception("Custom payload cannot be null as it contains feature selection info")
    
  categorical_features = custom_payload["categorical_features"]
  numeric_features = custom_payload["numeric_features"]
  
  # form a string indexer and change name of dep_is_delayed to "label" - used in std naming conventions in models  
  # https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels
  labelIndexer = StringIndexer(inputCol="dep_is_delayed", outputCol="label").setHandleInvalid(set_handle_invalid).fit(train)
  train = labelIndexer.transform(train)
  test = labelIndexer.transform(test)

  # create index for each categorical feature
  categorical_index = [i + "_Index" for i in categorical_features]
  stringIndexer = StringIndexer(inputCols=categorical_features, outputCols=categorical_index).setHandleInvalid(set_handle_invalid).fit(train)
  train = stringIndexer.transform(train)
  test = stringIndexer.transform(test)

  # create indicator feature for each categorical variable and do one hot encoding, encode only train data
  list_encoders = [i + "_Indicator" for i in categorical_features]
  encoder = OneHotEncoder(inputCols=categorical_index, outputCols=list_encoders).setHandleInvalid(set_handle_invalid).fit(train)
  train_one_hot = encoder.transform(train)
  test_one_hot = encoder.transform(test)

  # retain only encoded categorical columns, numeric features and label 
  train_one_hot = train_one_hot.select(["label"] + categorical_index + list_encoders + numeric_features) 
  test_one_hot = test_one_hot.select(["label"] + categorical_index + list_encoders + numeric_features)

  if verbose:
    print("Training Dataset Count Before Dropping NA: " + str(train_one_hot.count()))
    print("Test Dataset Count Before Dropping NA: " + str(test_one_hot.count()))
    # display(training_set)
  
  if drop_na:  
    training_set = train_one_hot.dropna()
    test_set = test_one_hot.dropna()
  else:
    print("Drop NA is set to false, will not drop any rows...")
    training_set = train_one_hot
    test_set = test_one_hot
    
  # convert label to integer type, so we can compute performance metrics easily
  training_set = training_set.withColumn('label', training_set['label'].cast(IntegerType()))  
  test_set = test_set.withColumn('label', test_set['label'].cast(IntegerType()))

  if verbose and drop_na:
    print("Training Dataset Count After Dropping NA: " + str(training_set.count()))
    print("Test Dataset Count After Dropping NA: " + str(test_set.count()))
    # display(training_set)
  
  return training_set, test_set

def get_logit_pipeline(training_set, set_handle_invalid="keep", grid_search_mode=True):
  
  # get features only
  features_only = training_set.columns
  features_only.remove("label")

  # Combine training input columns into a single vector column, "features" is the default column name for sklearn/pyspark feature df
  # so we preserve that default name
  assembler = VectorAssembler(inputCols=features_only,outputCol="features").setHandleInvalid(set_handle_invalid)

  # Scale features so we can actually use them in logit
  # StandardScaler standardizes features by removing the mean and scaling to unit variance.
  standardscaler = StandardScaler().setInputCol("features").setOutputCol("scaled_features")
  
  # use scaled features in logit, with output column as "label"
  lr = LogisticRegression(featuresCol = 'scaled_features', labelCol = 'label', maxIter=10)

  # for ML Lib pipeline, build a pipeline that will assemble the features into a single vector, perform scaling, and do optionally logit
  if grid_search_mode:
    pipeline = Pipeline(stages=[assembler, standardscaler, lr])
  else:
    pipeline = Pipeline(stages=[assembler, standardscaler])
    
  return lr, pipeline


def model_train_logit_grid_search(training_set, test_set, pipeline, lr, ts_split):
  # grid search is broken - fails with the following error mode
  # https://stackoverflow.com/questions/58827795/requirement-failed-nothing-has-been-added-to-this-summarizer
  # this error mode seems specific to the data it is training on - which is non deterministic based on our train-test size
  # so we don't want to take a dependency on this method
  # moreover - its unclear whether the "numFolds" param should be 1 or > 1 
  # if we make it > 1 then we don't preserve the ordering of the time series data, which is important
  result = {}
  
  # form param grid for searching across multiple params to find best model
  paramGrid = ParamGridBuilder() \
    .addGrid(lr.threshold, [0.01, 0.1, 0.2, 0.3]) \
    .addGrid(lr.maxIter, [2, 5, 10]) \
    .addGrid(lr.regParam, [0.1, 0.2]) \
    .build()
  
  # set up cross validator with the pipeline, choose num cross == 1
  # TODO: clarify on what numFolds should be
  crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(),
                          numFolds = 1)
  
  # https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.BinaryClassificationEvaluator.html#pyspark.ml.evaluation.BinaryClassificationEvaluator.metricName
  # https://stats.stackexchange.com/questions/99916/interpretation-of-the-area-under-the-pr-curve
  evaluator_aupr = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderPR")
  evaluator_auroc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
  
  # fit the model
  cvModel = crossval.fit(training_set)
  
  # return best model from all our models we trained on
  best_model = cvModel.bestModel
  best_param_dic = get_best_param_dic_metrics(best_model, False)

  # review performance on training data 
  train_model = cvModel.transform(training_set)
  aupr = evaluator_aupr.evaluate(train_model)
  auroc = evaluator_auroc.evaluate(train_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(train_model)
  result["train"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, best_param_dic)
  
  # review performance on test data 
  test_model = cvModel.transform(test_set)
  aupr = evaluator_aupr.evaluate(test_model)
  auroc = evaluator_auroc.evaluate(test_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(test_model)
  result["test"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, best_param_dic)
  
  return result

def model_train_logit(training_set, test_set, pipeline, lr, ts_split, custom_payload):
  result = {}
  if custom_payload == None:
    raise Exception("Custom payload cannot be none as it contains hyper-param information")
  
  if pipeline != None:
    # regular logit path
    pipelineModel = pipeline.fit(training_set)
    df_train = pipelineModel.transform(training_set)
    df_train = df_train.select(['label', 'scaled_features'])

    pipelineModel = pipeline.fit(test_set)
    df_test = pipelineModel.transform(test_set)
    df_test = df_test.select(['label', 'scaled_features'])
  else:
    # we've already fit the pipeline (logit_alt)
    df_train = training_set
    df_test = test_set
    
  # hyper param setting
  lr.threshold = custom_payload["threshold"] if "threshold" in custom_payload.keys() else 0.5
  lr.maxIter = custom_payload["maxIter"] if "maxIter" in custom_payload.keys() else 10
  lr.regParam = custom_payload["regParam"] if "regParam" in custom_payload.keys() else 0.5
  
  print("Starting training of Logit model with parameters - threshold: {}, max iterations: {}, regParam: {}"\
        .format(lr.threshold, lr.maxIter, lr.regParam))
  
  lrModel = lr.fit(df_train)
  
  # set up evaluators
  evaluator_aupr = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderPR")
  evaluator_auroc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
  
  # review performance on training data 
  train_model = lrModel.transform(df_train)
  aupr = evaluator_aupr.evaluate(train_model)
  auroc = evaluator_auroc.evaluate(train_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(train_model)
  result["train"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, lrModel.summary)
  
  # review performance on test data 
  test_model = lrModel.transform(df_test)
  aupr = evaluator_aupr.evaluate(test_model)
  auroc = evaluator_auroc.evaluate(test_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(test_model)
  result["test"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, lrModel.summary)
  
  return result

#### Random Forest Specific Functions

In [0]:
from pyspark.sql.functions import col, isnan, substring, split, when, lit, max as pyspark_max, countDistinct, count, mean, sum as pyspark_sum, expr, to_utc_timestamp, to_timestamp, concat, length
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType, StringType, BooleanType, DateType, DoubleType
import pandas as pd
from gcmap import GCMapper, Gradient
import matplotlib.pyplot as plt
from pandas.tseries.holiday import USFederalHolidayCalendar
from pyspark.sql.types import *
from pyspark.ml.feature import IndexToString, StringIndexer, OneHotEncoder, VectorAssembler, Bucketizer, StandardScaler, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
import seaborn as sns
from pyspark.ml.feature import QuantileDiscretizer
  
def get_staged_data_for_trees(train, test, custom_payload):
  stages = []
  if custom_payload == None:
    raise Exception("Custom payload cannot be none as it contains feature selection information")
  
  categorical_features = custom_payload["categorical_features"]
  numeric_features = custom_payload["numeric_features"] 
  num_buckets = custom_payload["num_buckets"] if "num_buckets" in custom_payload.keys() else 3
  quantize_numeric = custom_payload["quantize_numeric"] if "quantize_numeric" in custom_payload.keys() else False
  
  for cat_feat in categorical_features:
    # string indexing categorical features 
    stringIndexer = StringIndexer(inputCol=cat_feat, outputCol=cat_feat + "_Index").setHandleInvalid("keep")
    # one hot encode categorical features
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[cat_feat + "_One_Hot"])
    # add to stages
    stages += [stringIndexer, encoder]
  
  # create indexer for label class
  labelIndexer = StringIndexer(inputCol="dep_is_delayed", outputCol="label").setHandleInvalid("keep")
  stages += [labelIndexer]
  
  print ("Quantizing numeric features is set to {}. If set to true, we will use buckets = {}".format(quantize_numeric, num_buckets))
  if quantize_numeric:
    for num_feat in numeric_features:
      # bin numeric features 
      num_bin = QuantileDiscretizer(numBuckets=num_buckets, 
                                    inputCol=num_feat, outputCol=num_feat + "_Binned").setHandleInvalid("keep")
      stages += [num_bin]

    # create vector assembler combining features into 1 vector (combining binner and categorical features)
    assemblerInputs = [c + "_One_Hot" for c in categorical_features] + [n + "_Binned" for n in numeric_features]
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
    stages += [assembler]
  else:
    num_assembler = VectorAssembler(inputCols=numeric_features, outputCol="numeric_vec").setHandleInvalid("skip")
    stages += [num_assembler]
    assemblerInputs = [f + "_One_Hot" for f in categorical_features] + ["numeric_vec"]
    # create vector assembler combining categorical and numeric_vec
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    stages += [assembler]
    
  # notice no need for scaling in the case of RFs
  # ensure that the model train eval functions for trees have feature column called "features" and not "scaled_features"
  # it must match the output column from the processing phase here
  pipeline = Pipeline().setStages(stages)
  pipelineModel = pipeline.fit(train)
  df_train = pipelineModel.transform(train)
  
  # features_comb = categorical_features + numeric_features + ["dep_is_delayed"]
  selectedcols = ["label", "features"] # + features_comb
  df_train = df_train.select(selectedcols)
  df_test = pipelineModel.transform(test)
  df_test = df_test.select(selectedcols)
  
  return df_train, df_test

def model_train_rf(train, test, ts_split, custom_payload):
  if custom_payload == None:
    raise Exception("Custom payload cannot be none as it contains hyper-param information")
    
  result = {}
  # convert label to integer type, so we can find performance metrics easily
  train = train.withColumn('label', train['label'].cast(IntegerType()))  
  test = test.withColumn('label', test['label'].cast(IntegerType()))
  
  # create an initial RandomForest model
  rf = RandomForestClassifier(labelCol="label", featuresCol="features")
  
  # hyper param setting
  rf.maxBins = custom_payload["maxBins"] if "maxBins" in custom_payload.keys() else 32
  rf.numTrees = custom_payload["numTrees"] if "numTrees" in custom_payload.keys() else 20
  rf.minInstancesPerNode = custom_payload["minInstancesPerNode"] if "minInstancesPerNode" in custom_payload.keys() else 10
  rf.minInfoGain = custom_payload["minInfoGain"] if "minInfoGain" in custom_payload.keys() else 0.001
  print("Starting training of random forest model with parameters - max bins: {}, num trees: {}, minInstancesPerNode: {}, minInfoGain: {}"\
        .format(rf.maxBins, rf.numTrees, rf.minInstancesPerNode, rf.minInfoGain))
  
  # train model with training data
  rfModel = rf.fit(train)

  # make predictions on test data 
  rf_predictions = rfModel.transform(test)
  
  # set up evaluators
  evaluator_aupr = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderPR")
  evaluator_auroc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
  
  # review performance on training data 
  train_model = rfModel.transform(train)
  aupr = evaluator_aupr.evaluate(train_model)
  auroc = evaluator_auroc.evaluate(train_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(train_model)
  result["train"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, rfModel.summary)
  
  # review performance on test data 
  test_model = rfModel.transform(test)
  aupr = evaluator_aupr.evaluate(test_model)
  auroc = evaluator_auroc.evaluate(test_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(test_model)
  result["test"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, rfModel.summary)
  
  return result

#### Gradient Boosted Trees Specific Functions

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

def model_train_gbt(train, test, ts_split, custom_payload):
  if custom_payload == None:
    raise Exception("Custom payload cannot be none as it contains hyper-param information")
    
  result = {}
  # convert label to integer type, so we can find performance metrics easily
  train = train.withColumn('label', train['label'].cast(IntegerType()))  
  test = test.withColumn('label', test['label'].cast(IntegerType()))
  
  # create an initial GBT model
  gbt = GBTClassifier(labelCol="label", featuresCol="features")
  
  # hyper param setting
  gbt.maxBins = custom_payload["maxBins"] if "maxBins" in custom_payload.keys() else 32
  gbt.maxDepth = custom_payload["maxDepth"] if "maxDepth" in custom_payload.keys() else 10
  gbt.minInstancesPerNode = custom_payload["minInstancesPerNode"] if "minInstancesPerNode" in custom_payload.keys() else 10
  gbt.minInfoGain = custom_payload["minInfoGain"] if "minInfoGain" in custom_payload.keys() else 0.001
  gbt.maxIter = custom_payload["maxIter"] if "maxIter" in custom_payload.keys() else 10
  gbt.stepSize = custom_payload["stepSize"] if "stepSize" in custom_payload.keys() else 0.2
  print("Starting training of GBT model with parameters - max bins: {}, max depth: {}, minInstancesPerNode: {}, minInfoGain: {}, max iterations: {}, step size: {}"\
        .format(gbt.maxBins, gbt.maxDepth, gbt.minInstancesPerNode, gbt.minInfoGain, gbt.maxIter, gbt.stepSize))
  
  # train model with training data
  gbtModel = gbt.fit(train)

  # make predictions on test data
  gbt_predictions = gbtModel.transform(test)
  
  # set up evaluators
  evaluator_aupr = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderPR")
  evaluator_auroc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
  
  # review performance on training data 
  train_model = gbtModel.transform(train)
  aupr = evaluator_aupr.evaluate(train_model)
  auroc = evaluator_auroc.evaluate(train_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(train_model)
  result["train"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, gbtModel) # no summary object exists in GBT
  
  # review performance on test data 
  test_model = gbtModel.transform(test)
  aupr = evaluator_aupr.evaluate(test_model)
  auroc = evaluator_auroc.evaluate(test_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(test_model)
  result["test"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, gbtModel) # no summary object exists in GBT
  
  return result

#### SVM Specific Functions

In [0]:
from pyspark.ml.classification import LinearSVC

def model_train_svm(training_set, test_set, pipeline, ts_split, custom_payload):
  result = {}
  if custom_payload == None:
    raise Exception("Custom payload cannot be none as it contains hyper-param information")
  
  if pipeline != None:
    # regular svm implementation
    pipelineModel = pipeline.fit(training_set)
    df_train = pipelineModel.transform(training_set)
    df_train = df_train.select(['label', 'scaled_features'])

    pipelineModel = pipeline.fit(test_set)
    df_test = pipelineModel.transform(test_set)
    df_test = df_test.select(['label', 'scaled_features'])
  else:
    # svm_alt implementation
    df_train = training_set
    df_test = test_set
  
  svc = LinearSVC(featuresCol='scaled_features')
  
  # hyper param setting
  svc.maxIter = custom_payload["maxIter"] if "maxIter" in custom_payload.keys() else 40
  svc.regParam = custom_payload["regParam"] if "regParam" in custom_payload.keys() else 0.2
  svc.aggregationDepth = custom_payload["aggregationDepth"] if "aggregationDepth" in custom_payload.keys() else 2
  svc.tol = custom_payload["tol"] if "tol" in custom_payload.keys() else 1e-05
  svc.threshold = custom_payload["threshold"] if "threshold" in custom_payload.keys() else 0.0001
  
  print("Starting training of SVM model with parameters - aggregationDepth: {}, max iterations: {}, L2regParam: {}, convergence tolerance: {}, threshold: {}"\
        .format(svc.aggregationDepth, svc.maxIter, svc.regParam, svc.tol, svc.threshold))
  
  svcModel = svc.fit(df_train)
  
  # set up evaluators
  evaluator_aupr = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderPR")
  evaluator_auroc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
  
  # review performance on training data 
  train_model = svcModel.transform(df_train)
  aupr = evaluator_aupr.evaluate(train_model)
  auroc = evaluator_auroc.evaluate(train_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(train_model)
  result["train"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, svcModel.extractParamMap())
  
  # review performance on test data 
  test_model = svcModel.transform(df_test)
  aupr = evaluator_aupr.evaluate(test_model)
  auroc = evaluator_auroc.evaluate(test_model)
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score = compute_classification_metrics(test_model)
  result["test"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, svcModel.extractParamMap())
  
  return result

## Model Training and Evaluation - Apply Pipeline To Data & Train & Collect Metrics

In [0]:
from statistics import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

def compute_classification_metrics(df):
  # assumes df has columns called label and prediction
  true_positive = df[(df.label == 1) & (df.prediction == 1)].count()
  true_negative = df[(df.label == 0) & (df.prediction == 0)].count()
  false_positive = df[(df.label == 0) & (df.prediction == 1)].count()
  false_negative = df[(df.label == 1) & (df.prediction == 0)].count()
  accuracy = ((true_positive + true_negative)/df.count())
  
  if (true_positive + false_negative == 0.0):
    recall = 0.0
    precision = float(true_positive) / (true_positive + false_positive)
    
  elif (true_positive + false_positive == 0.0):
    recall = float(true_positive) / (true_positive + false_negative)
    precision = 0.0
    
  else:
    recall = float(true_positive) / (true_positive + false_negative)
    precision = float(true_positive) / (true_positive + false_positive)

  if(precision + recall == 0):
    f1_score = 0
    
  else:
    f1_score = 2 * ((precision * recall)/(precision + recall))
    
  return true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score

def get_classification_metrics(dic, with_display=True, display_train_metrics=False):
  '''
  assumes every model follows a contract of having a result dictionary
  with key = "train" and key = "test", and optionally, key = "val"
  
  also assumes that the dictionary payload follows the format
  result["key"] = (true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, modelsummary)
  '''
  if "train" not in dic.keys() or "test" not in dic.keys():
    raise Exception("Result object does not have the right keys")
  
  contains_val = "val" in dic.keys()
  result = {"train": dict(), "test": dict()}
  if contains_val:
    result["val"] = dict()
  
  if not with_display:
    display_train_metrics = False
  
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, modelsummary = dic["train"]
  # TODO: format this to human readable form
  ts_split_str = str(ts_split)
  
  tmp = {"true_positive": true_positive, "true_negative": true_negative, "false_positive": false_positive, "false_negative": false_negative,
         "accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1_score, "aupr": aupr, "auroc": auroc, "ts_split": ts_split_str, "summary": modelsummary}
  
  if with_display:
    # enter new line for neatness
    print()
    
  # set the temp dictionary to result
  result["train"] = tmp
  
  str_ts = "Metrics for Split - (Train: {}-{}), (Test: {}-{}), (Val: {}-{})".format(ts_split[0].strftime("%b %d %Y"), \
  ts_split[1].strftime("%b %d %Y"), ts_split[2].strftime("%b %d %Y"), ts_split[3].strftime("%b %d %Y"), ts_split[4].strftime("%b %d %Y"), ts_split[5].strftime("%b %d %Y"))
  
  num = 150
  
  if with_display and display_train_metrics:
    print("#" * num)
    print("Training Data " + str_ts)
    print("Accuracy: {}".format(result["train"]["accuracy"]))
    print("Precision: {}".format(result["train"]["precision"]))
    print("Recall: {}".format(result["train"]["recall"]))
    print("F1 Score: {}".format(result["train"]["f1_score"]))
    print("Area under PR curve: {}".format(result["train"]["aupr"]))
    print("Area under ROC curve: {}".format(result["train"]["auroc"]))
    print("#" * num)
  
  # do the same for test and val
  true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, modelsummary = dic["test"]
  # TODO: format this to human readable form
  ts_split_str = str(ts_split)
  
  tmp = {"true_positive": true_positive, "true_negative": true_negative, "false_positive": false_positive, "false_negative": false_negative,
         "accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1_score, "aupr": aupr, "auroc": auroc, "ts_split": ts_split_str, "summary": modelsummary}
  
  # set the temp dictionary to result
  result["test"] = tmp
  
  if with_display:
    print("#" * num)
    print("Test Data " + str_ts)
    print("Accuracy: {}".format(result["test"]["accuracy"]))
    print("Precision: {}".format(result["test"]["precision"]))
    print("Recall: {}".format(result["test"]["recall"]))
    print("F1 Score: {}".format(result["test"]["f1_score"]))
    print("Area under PR curve: {}".format(result["test"]["aupr"]))
    print("Area under ROC curve: {}".format(result["test"]["auroc"]))
    print("#" * num)
    
  if contains_val:
    true_positive, true_negative, false_positive, false_negative, accuracy, precision, recall, f1_score, aupr, auroc, ts_split, modelsummary = dic["val"]
    # TODO: format this to human readable form
    ts_split_str = str(ts_split)
    
    tmp = {"true_positive": true_positive, "true_negative": true_negative, "false_positive": false_positive, "false_negative": false_negative,
         "accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1_score, "aupr": aupr, "auroc": auroc, "ts_split": ts_split_str, "summary": modelsummary}
    
    # set the temp dictionary to result
    result["val"] = tmp
  
    if with_display:
      print("#" * num)
      print("Validation Data " + str_ts)
      print("Accuracy: {}".format(result["val"]["accuracy"]))
      print("Precision: {}".format(result["val"]["precision"]))
      print("Recall: {}".format(result["val"]["recall"]))
      print("F1 Score: {}".format(result["val"]["f1_score"]))
      print("Area under PR curve: {}".format(result["val"]["aupr"]))
      print("Area under ROC curve: {}".format(result["val"]["auroc"]))
      print("#" * num)
  
  return result


def get_classification_metrics_for_storage_ingestion(list_dic):
  # sadly, we cannot store model summary into the dataframe, thus we return everything except that
  # assumes we have output from get_classification_metrics() (in list form) as the input here
  for dic in list_dic:
    dic["train"].pop("summary", None)
    dic["test"].pop("summary", None)
    if "val" in dic.keys():
      dic["val"].pop("summary", None)
  
  return list_dic

def get_aggregated_classification_metrcs(list_dic, dtype="test", with_display=True):
  '''
  gets summary stats (avg, min, percentiles etc.) for the list of models 
  has a dependency on the key naming defined in get_classification_metrics()
  '''
  metric_type = ["accuracy", "precision", "recall", "f1_score", "aupr", "auroc"]
  summary_type = ["mean", "min", "max", "median"]
  
  # for some reason math.min, math.max don't work and throw a "type" error
  # same goes for statistics.mean and statistics.median
  # this shows up sometimes in the RF models get_aggregated_classification_metrcs() calc
  # so we go old school for now :)
  def get_max(list_nums):
    maxi = -1
    for n in list_nums:
      if n > maxi:
        maxi = n
    return float(maxi)
  
  def get_min(list_nums):
    mini = 1000
    for n in list_nums:
      if n < mini:
        mini = n
    return float(mini)
  
  def get_mean(list_nums):
    meanval = 0
    for n in list_nums:
      meanval += n
    return 1.0 * meanval/len(list_nums)
  
  def get_median(list_nums):
    list_nums.sort()
    n = len(list_nums)
    if n % 2 == 0:
      median1 = list_nums[n//2]
      median2 = list_nums[n//2 - 1]
      median = (median1 + median2)/2
    else:
      median = list_nums[n//2]
      
    return float(median)
    
  todf = []
  for s in summary_type:
    metrics = []
    for m in metric_type:
      if s == "mean":
        # print("For summary type: {} and metric type: {}, value is {}".format(s, m, get_mean([dic[dtype][m] for dic in list_dic])))
        metrics.append(get_mean([dic[dtype][m] for dic in list_dic]))
      elif s == "min":
        # print("For summary type: {} and metric type: {}, value is {}".format(s, m, get_min([dic[dtype][m] for dic in list_dic])))
        metrics.append(get_min([dic[dtype][m] for dic in list_dic]))
      elif s == "max":        
        # print("For summary type: {} and metric type: {}, value is {}".format(s, m, get_max([dic[dtype][m] for dic in list_dic])))
        metrics.append(get_max([dic[dtype][m] for dic in list_dic]))
      elif s == "median":
        # print("For summary type: {} and metric type: {}, value is {}".format(s, m, get_median([dic[dtype][m] for dic in list_dic])))
        metrics.append(get_median([dic[dtype][m] for dic in list_dic]))
        
    todf.append(tuple(metrics))   
  
  schema = StructType([ \
    StructField("accuracy", DoubleType(), True), \
    StructField("precision", DoubleType(), True), \
    StructField("recall", DoubleType(), True), \
    StructField("f1_score", DoubleType(), True), \
    StructField("AUPR", DoubleType(), True), \
    StructField("AUROC", DoubleType(), True) \
  ])
  
  df = spark.createDataFrame(data = todf, schema = schema)
  if with_display:
    print("Displaying aggregated metrics - rows are in order: {}".format(summary_type))
    display(df)
    
  return df
    

In [0]:
# for each train, test split - apply pipeline and perform model training
##### THIS IS THE MAIN METHOD FOR TRAINING AND PLUGGING IN ALL OTHER MODELS #####

def model_train_and_eval(data, splits, max_iter=1, model="logit", collect_metrics = True, rebalance_downsample=True, custom_payload=None):
  '''
  Main method for running models and returning results
  custom_payload is referring to a dictionary that each model can unpack to access model specific values (reg params, special columns, feature slection etc.) 
  '''
  
  # list that holds the metrics results for the specified model
  # the metrics returned per model may be different, 
  # look into each model's specific return format to extract relevant metric
  collect_metrics_result = []
  
  for i in range(len(splits)):
    if i > max_iter-1:
      break
    
    train = get_df_for_model(data, splits, index=i, datatype="train")
    test = get_df_for_model(data, splits, index=i, datatype="test")
    #val = get_df_for_model(data, splits, index=i, datatype="val")
    
    # drop planned_departure_utc and index id before sending off to the model
    # we kept planned_departure_utc up till now as that's needed for data time filtering
    # we kept index_id because its the index and may help spark in retrieving rows quicker
    cols = [t[0] for t in train.dtypes if t[0] != 'planned_departure_utc' or t[0] != 'index_id']
    train = train.select(cols).cache()
    test = test.select(cols).cache()
    
    # need to pass down split dates info to the models as they need this for result object
    split = get_dates_from_splits(splits, index=i, dtype="all")
    
    # finally, downsample the majority class (dep_is_delayed == false) if need be
    # we downsample because we have tons of data fortunately - otherwise, we would have up sampled
    if rebalance_downsample:
      print("Down-sampling the training data to have more balanced classes...")
      pt, nt, pn, np = get_proportion_labels(train)
      train = downsample(train, pn)
      
    print("Starting training iteration: {} for model: '{}' with collect_metrics: {}".format(i+1, model, collect_metrics))
    
    if model == "logit":
      training_set, test_set = get_train_test_finalset_for_logit(train, test, custom_payload)
      lr, pipeline = get_logit_pipeline(training_set, grid_search_mode=False)
      result = model_train_logit(training_set, test_set, pipeline, lr, split, custom_payload)
    
    # a different function for processing the data is applied in this mode
    elif model == "logit_alt":
      training_set, test_set = get_train_test_finalset_for_logit_2(train, test, custom_payload)
      lr = LogisticRegression(featuresCol = 'scaled_features', labelCol = 'label')
      result = model_train_logit(training_set, test_set, None, lr, split, custom_payload)
      
    # do not use gs version of logit - there is a bug - see func defn
    elif model == "logit_gs":
      training_set, test_set = get_train_test_finalset_for_logit(train, test, custom_payload)
      lr, pipeline = get_logit_pipeline(training_set, grid_search_mode=True)
      result = model_train_logit_grid_search(training_set, test_set, pipeline, lr, split)
    
    elif model == "rf":
      train, test = get_staged_data_for_trees(train, test, custom_payload)
      result = model_train_rf(train, test, split, custom_payload)
    
    elif model == "gbt":
      train, test = get_staged_data_for_trees(train, test, custom_payload)
      result = model_train_gbt(train, test, split, custom_payload)
    
    elif model == "svm":
      # reuses logit pre-processing
      training_set, test_set = get_train_test_finalset_for_logit(train, test, custom_payload)
      lr, pipeline = get_logit_pipeline(training_set, grid_search_mode=False)
      result = model_train_svm(training_set, test_set, pipeline, split, custom_payload)
    
    # a different function for processing the data is applied in this mode
    elif model == "svm_alt":
      # reuses logit pre-processing
      training_set, test_set = get_train_test_finalset_for_logit_2(train, test, custom_payload)
      result = model_train_svm(training_set, test_set, None, split, custom_payload)
      
    else:
      raise Exception("Model name not found - given name is {}".format(model))
      
    if collect_metrics:
      collect_metrics_result.append(result)
    
  return collect_metrics_result
      

## Driver Program - Logistic Regression

In [0]:
verbose = True
splits = get_timeseries_train_test_splits(data, train_test_ratio=2, test_months=4, start_year=2015, end_year=2019)

In [0]:
print(get_dates_from_splits(splits, 0, dtype="train"))
print(get_dates_from_splits(splits, 0, dtype="test"))

In [0]:
print(get_dates_from_splits(splits, 1, dtype="train"))
print(get_dates_from_splits(splits, 1, dtype="test"))
print(len(splits))

In [0]:
print(get_dates_from_splits(splits, 5, dtype="train"))
print(get_dates_from_splits(splits, 5, dtype="test"))

In [0]:
iters = len(splits) - 1

In [0]:
display(data.groupBy("year", "month").count())

year,month,count
2017,7,508950
2016,7,502383
2015,11,467903
2017,4,468215
2018,1,568494
2016,8,498276
2018,3,610541
2018,6,624661
2017,1,449906
2019,4,611814


In [0]:
###### WARNING: DO NOT MODIFY THE "data" OBJECT ######
###### IT IS SHARED AMONGST OTHER DRIVER PROGRAMS ######

# verbose logging / debug mode
# can be changed per driver program
verbose = True

if verbose:
  print("Total number of rows in original dataset are {}".format(n))

# all_cols, cols_to_consider, cols_to_drop, numeric_features, categorical_features, dt_features, bool_features = get_std_features(data)  


def get_values_from_hypothesis(hypothesis=1, custom_cols_to_drop=[]):
  ##### HYPOTHESIS ######
  data_, desired_numeric_h = get_std_desired_numeric(data, hypothesis= hypothesis, custom_cols_to_drop= custom_cols_to_drop)
  data_, desired_categorical_h = get_std_desired_categorical(data_, hypothesis= hypothesis, custom_cols_to_drop= custom_cols_to_drop)

  # assert label is numeric - this is because its needed for the classification metrics
  assert(get_feature_dtype(data_, 'dep_is_delayed') == 'int')
  cols_to_consider_h = list(set(desired_numeric_h + desired_categorical_h)) 
  
  # we added dep_is_delayed to desired_numeric_h as we wanted to convert it to numeric
  # however, it should not be part of the features list as it is the output var
  # we will later add this col to cols_to_consider so its still part of our dataset
  try:
    desired_numeric_h.remove('dep_is_delayed')
    desired_categorical_h.remove('dep_is_delayed')
  except:
    pass
  
  # ensure label and planned_departure_utc are present in cols_to_consider
  cols_to_consider_h = add_required_cols(cols_to_consider_h)
  # +2 in assert comes from adding planned_departure_utc and label (dep_is_delayed)
  assert(len(cols_to_consider_h) == len(desired_numeric_h) + len(desired_categorical_h) + 2)
  
  # create custom payload object
  custom_payload = {"categorical_features": desired_categorical_h, "numeric_features": desired_numeric_h}
  
  return desired_categorical_h, desired_numeric_h, cols_to_consider_h, data_.select(cols_to_consider_h).cache(), custom_payload

desired_categorical_logit, desired_numeric_logit, cols_to_consider_logit, data_logit, custom_payload_logit = get_values_from_hypothesis(3)

#### COMMON ####  
if verbose:
  print("Finally, there are {} categorical features and {} numeric features".format(len(desired_categorical_logit), len(desired_numeric_logit)))
  print("data_logit has {} rows".format(data_logit.count()))
  display(data_logit)


distance_group,origin_altitude,month,dest_city,day_of_month,dep_is_delayed,origin_wnd_speed,dest_altitude,holiday,arr_hour,origin_vis_dist,canceled,dep_hour,origin_tmp_c,planned_duration,poten_for_del,origin_dew_c,origin_city,origin_cig_cloud_agl,planned_departure_utc,origin_wnd_type,carrier,origin_slp_p,origin_ICAO,origin_vis_var,day_of_week,origin_cig_cavok,dest_ICAO
1,544.0,1,"Dallas/Fort Worth, TX",1,0,26.0,607.0,holiday,0600-0659,16093.0,False,0001-0559,28.0,50.0,-1,6.0,"Tyler, TX",1829.0,2015-01-01T11:55:00.000+0000,N,EV,10271.0,KTYR,N,4,N,KDFW
1,390.0,1,"Dallas/Fort Worth, TX",1,1,15.0,607.0,holiday,0700-0759,16093.0,True,0600-0659,28.0,60.0,-1,-33.0,"Texarkana, AR",1829.0,2015-01-01T12:00:00.000+0000,N,MQ,10283.0,KTXK,N,4,N,KDFW
3,433.0,1,"San Francisco, CA",1,0,21.0,13.0,holiday,0800-0859,16000.0,False,0600-0659,-11.0,125.0,-1,-61.0,"Seattle, WA",12479.802773912874,2015-01-01T14:05:00.000+0000,N,AS,10291.0,KSEA,9,4,N,KSFO
4,146.0,1,"Minneapolis, MN",1,1,0.0,841.0,holiday,0800-0859,16093.0,False,0600-0659,-50.0,171.0,-1,-106.0,"Baltimore, MD",22000.0,2015-01-01T11:25:00.000+0000,C,DL,10230.0,KBWI,N,4,N,KMSP
5,26.0,1,"New York, NY",1,0,26.0,21.0,holiday,0900-0959,16000.0,False,0600-0659,150.0,154.0,-1,122.0,"Tampa, FL",12479.802773912874,2015-01-01T11:40:00.000+0000,N,DL,10235.0,KTPA,9,4,N,KLGA
4,15.0,1,"Orlando, FL",1,0,15.0,96.0,holiday,0900-0959,16093.0,False,0700-0759,-17.0,143.0,-1,-111.0,"Washington, DC",22000.0,2015-01-01T12:00:00.000+0000,N,B6,10233.0,KDCA,N,4,N,KMCO
3,1135.0,1,"Denver, CO",1,1,15.0,5431.0,holiday,0800-0859,16000.0,False,0700-0759,22.0,107.0,0,-11.0,"Phoenix, AZ",12479.802773912874,2015-01-01T14:10:00.000+0000,N,UA,10179.0,KPHX,9,4,N,KDEN
3,173.0,1,"Charlotte, NC",1,0,57.0,748.0,holiday,1000-1059,16093.0,False,0800-0859,-50.0,135.0,-1,-156.0,"Hartford, CT",22000.0,2015-01-01T13:05:00.000+0000,N,US,10181.0,KBDL,N,4,N,KCLT
10,125.0,1,"New York, NY",1,0,31.0,13.0,holiday,1600-1659,16093.0,False,0800-0859,28.0,316.0,-1,-44.0,"Los Angeles, CA",22000.0,2015-01-01T16:15:00.000+0000,N,B6,10195.0,KLAX,N,4,N,KJFK
10,312.0,1,"Los Angeles, CA",1,0,26.0,125.0,holiday,1100-1159,16093.0,False,0800-0859,-67.0,340.0,-1,-117.0,"Washington, DC",22000.0,2015-01-01T13:30:00.000+0000,N,UA,10232.0,KIAD,N,4,N,KLAX


#### Display Result and Write Models to Storage

In [0]:
# perform actual training with logit model, get back list of dictionaries (each dic has train, test, val keys)
logit_results = model_train_and_eval(data_logit, splits, max_iter=iters, model = "logit", collect_metrics = True, custom_payload = custom_payload_logit)

storage_logit_results = []
for lrdic in logit_results:
  # get back well formed metrics dictionary for each time-iteration of the model
  metrics = get_classification_metrics(lrdic, with_display=True, display_train_metrics=True)
  storage_logit_results.append(metrics)
  
print("Displaying Aggregated Results for Logistic Regression")
get_aggregated_classification_metrcs(storage_logit_results, dtype="test", with_display=True)
  

dep_is_delayed,count
1,832781
0,3035505


dep_is_delayed,count
1,1436750
0,6124956


dep_is_delayed,count
1,2137454
0,9188348


dep_is_delayed,count
1,2937183
0,12179903


dep_is_delayed,count
1,3667991
0,15567444


accuracy,precision,recall,f1_score,AUPR,AUROC
0.7885611046205176,0.8473590082324842,0.8992376687734284,0.8687468612025804,0.8977079354316062,0.6948428663679481
0.6771480712312008,0.820948696197354,0.7033174402016774,0.7715798182501077,0.8806951577941912,0.6769067414491771
0.8569309174607835,0.8726603605248985,0.9999854955752044,0.9223398243790988,0.919985475224125,0.7059493452694364
0.8174623806098632,0.8545173088502819,0.9597994335724788,0.8967353797174727,0.8893888859243281,0.6950482504591197


In [0]:
print("Writing results to storage")
# get back formatted dictionary list that is compatible to write to storage
storage_logit_results = get_classification_metrics_for_storage_ingestion(storage_logit_results)
write_model_to_storage(storage_logit_results, "logit_h3yd_master")

display(read_model_from_storage("logit_h3yd_master"))

timestamp,train,test,val
2021-08-05T05:35:08.519+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=1409469, f1_score=0.6515062301394831, false_negative=742788, false_positive=727985, recall=0.6492290517461621, precision=0.653799439219625, accuracy=0.6543457513100344, aupr=0.6934739285324251, true_positive=1374799, auroc=0.7147134974527742}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=45205, f1_score=0.8967353797174727, false_negative=17570, false_positive=313861, recall=0.9879378286718568, precision=0.820948696197354, accuracy=0.8174623806098632, aupr=0.8893888859243281, true_positive=1439050, auroc=0.6946038395098397}",
2021-08-05T05:35:08.519+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=933776, f1_score=0.6546931780446235, false_negative=485925, false_positive=502974, recall=0.6586140542044374, precision=0.6508187093092518, accuracy=0.6542479229156345, aupr=0.6929841292454043, true_positive=937464, auroc=0.7152587671980959}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=150950, f1_score=0.8389212514617311, false_negative=234171, false_positive=256620, recall=0.8451481458459251, precision=0.8327854431720071, accuracy=0.7443525249557765, aupr=0.8844658874392344, true_positive=1278055, auroc=0.6950482504591197}",
2021-08-05T05:35:08.519+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=550359, f1_score=0.6561844253093274, false_negative=284162, false_positive=282422, recall=0.6554923105833288, precision=0.6568780031466599, accuracy=0.6581934537311416, aupr=0.7011240368681892, true_positive=540673, auroc=0.7212082929263111}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=13705, f1_score=0.9223398243790987, false_negative=23, false_positive=267006, recall=0.9999854955752044, precision=0.8558832324178796, accuracy=0.8569309174607835, aupr=0.9140042707761531, true_positive=1585700, auroc=0.6769067414491771}",
2021-08-05T05:35:08.519+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=1950994, f1_score=0.6503771210351889, false_negative=1032351, false_positive=986189, recall=0.6452182609233561, precision=0.655619141671247, accuracy=0.6547735651922874, aupr=0.6929865884773682, true_positive=1877469, auroc=0.7145275267567962}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=58286, f1_score=0.9141580322044911, false_negative=62640, false_positive=218232, recall=0.9597994335724788, precision=0.8726603605248985, accuracy=0.8469116288449642, aupr=0.919985475224125, true_positive=1495547, auroc=0.7017061551521674}",
2021-08-05T05:35:08.519+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=2411047, f1_score=0.6523286497152486, false_negative=1266180, false_positive=1256944, recall=0.6514995088382959, precision=0.6531599037193014, accuracy=0.6544240450971578, aupr=0.6919895497590318, true_positive=2367043, auroc=0.7149592178641584}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=324660, f1_score=0.7715798182501077, false_negative=566305, false_positive=228560, recall=0.7033174402016774, precision=0.8545173088502819, accuracy=0.6771480712312008, aupr=0.8806951577941912, true_positive=1342486, auroc=0.7059493452694364}",


## Driver Program - Random Forests

In [0]:
###### WARNING: DO NOT MODIFY THE "data" OBJECT ######
###### IT IS SHARED AMONGST OTHER DRIVER PROGRAMS ######

# verbose logging / debug mode
# can be changed per driver program
verbose = True

if verbose:
  print("Total number of rows in original dataset are {}".format(n))

desired_categorical_rf, desired_numeric_rf, cols_to_consider_rf, data_rf, custom_payload_rf = get_values_from_hypothesis(3)

#### COMMON ####  
if verbose:
  print("Finally, there are {} categorical features and {} numeric features".format(len(desired_categorical_rf), len(desired_numeric_rf)))
  print("data_rf has {} rows".format(data_rf.count()))
  display(data_rf)


distance_group,origin_altitude,month,dest_city,day_of_month,dep_is_delayed,origin_wnd_speed,dest_altitude,holiday,arr_hour,origin_vis_dist,canceled,dep_hour,origin_tmp_c,planned_duration,poten_for_del,origin_dew_c,origin_city,origin_cig_cloud_agl,planned_departure_utc,origin_wnd_type,carrier,origin_slp_p,origin_ICAO,origin_vis_var,day_of_week,origin_cig_cavok,dest_ICAO
1,544.0,1,"Dallas/Fort Worth, TX",1,0,26.0,607.0,holiday,0600-0659,16093.0,False,0001-0559,28.0,50.0,-1,6.0,"Tyler, TX",1829.0,2015-01-01T11:55:00.000+0000,N,EV,10271.0,KTYR,N,4,N,KDFW
1,390.0,1,"Dallas/Fort Worth, TX",1,1,15.0,607.0,holiday,0700-0759,16093.0,True,0600-0659,28.0,60.0,-1,-33.0,"Texarkana, AR",1829.0,2015-01-01T12:00:00.000+0000,N,MQ,10283.0,KTXK,N,4,N,KDFW
3,433.0,1,"San Francisco, CA",1,0,21.0,13.0,holiday,0800-0859,16000.0,False,0600-0659,-11.0,125.0,-1,-61.0,"Seattle, WA",12479.802773912874,2015-01-01T14:05:00.000+0000,N,AS,10291.0,KSEA,9,4,N,KSFO
4,146.0,1,"Minneapolis, MN",1,1,0.0,841.0,holiday,0800-0859,16093.0,False,0600-0659,-50.0,171.0,-1,-106.0,"Baltimore, MD",22000.0,2015-01-01T11:25:00.000+0000,C,DL,10230.0,KBWI,N,4,N,KMSP
5,26.0,1,"New York, NY",1,0,26.0,21.0,holiday,0900-0959,16000.0,False,0600-0659,150.0,154.0,-1,122.0,"Tampa, FL",12479.802773912874,2015-01-01T11:40:00.000+0000,N,DL,10235.0,KTPA,9,4,N,KLGA
4,15.0,1,"Orlando, FL",1,0,15.0,96.0,holiday,0900-0959,16093.0,False,0700-0759,-17.0,143.0,-1,-111.0,"Washington, DC",22000.0,2015-01-01T12:00:00.000+0000,N,B6,10233.0,KDCA,N,4,N,KMCO
3,1135.0,1,"Denver, CO",1,1,15.0,5431.0,holiday,0800-0859,16000.0,False,0700-0759,22.0,107.0,0,-11.0,"Phoenix, AZ",12479.802773912874,2015-01-01T14:10:00.000+0000,N,UA,10179.0,KPHX,9,4,N,KDEN
3,173.0,1,"Charlotte, NC",1,0,57.0,748.0,holiday,1000-1059,16093.0,False,0800-0859,-50.0,135.0,-1,-156.0,"Hartford, CT",22000.0,2015-01-01T13:05:00.000+0000,N,US,10181.0,KBDL,N,4,N,KCLT
10,125.0,1,"New York, NY",1,0,31.0,13.0,holiday,1600-1659,16093.0,False,0800-0859,28.0,316.0,-1,-44.0,"Los Angeles, CA",22000.0,2015-01-01T16:15:00.000+0000,N,B6,10195.0,KLAX,N,4,N,KJFK
10,312.0,1,"Los Angeles, CA",1,0,26.0,125.0,holiday,1100-1159,16093.0,False,0800-0859,-67.0,340.0,-1,-117.0,"Washington, DC",22000.0,2015-01-01T13:30:00.000+0000,N,UA,10232.0,KIAD,N,4,N,KLAX


#### Display Result and Write Models to Storage

In [0]:
# perform actual training with RF model
rf_results = model_train_and_eval(data_rf, splits, max_iter=iters, model = "rf", collect_metrics = True, custom_payload = custom_payload_rf)

storage_rf_results = []
for rfdic in rf_results:
  # get back well formed metrics dictionary for each time-iteration of the model
  metrics = get_classification_metrics(rfdic, with_display=True, display_train_metrics=True)
  storage_rf_results.append(metrics)  

print("Displaying Aggregated Results for Random Forests")
get_aggregated_classification_metrcs(storage_rf_results, dtype="test", with_display=True)

dep_is_delayed,count
1,832781
0,3035505


dep_is_delayed,count
1,1436750
0,6124956


dep_is_delayed,count
1,2137454
0,9188348


dep_is_delayed,count
1,2937183
0,12179903


dep_is_delayed,count
1,3667991
0,15567444


accuracy,precision,recall,f1_score,AUPR,AUROC
0.6421419620802289,0.8746464345157715,0.6505635010127331,0.7446926141900234,0.8901594735094122,0.6737299380052443
0.5894376277479482,0.8531277941675711,0.5599890492558652,0.6824167074963253,0.8755330732730122,0.6540642889267646
0.7015754576348786,0.8906726266877525,0.7403861025666367,0.8082131369137546,0.9110550864467144,0.6918165651353102
0.644663650974752,0.8733537534936007,0.654324124537469,0.7406167853324469,0.8790202879475779,0.671894474330754


In [0]:
print("Writing results to storage")
# get back formatted dictionary list that is compatible to write to storage
storage_rf_results = get_classification_metrics_for_storage_ingestion(storage_rf_results)
write_model_to_storage(storage_rf_results, "rf_h3yd_master")

display(read_model_from_storage("rf_h3yd_master"))

timestamp,train,test,val
2021-08-05T06:49:41.768+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=1401061, f1_score=0.6140927937904707, false_negative=852994, false_positive=736393, recall=0.5971858535210124, precision=0.6319849314288056, accuracy=0.6264696391879655, aupr=0.657886462399067, true_positive=1264593, auroc=0.6795846856822266}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=222447, f1_score=0.7145544981826543, false_negative=570969, false_positive=136619, recall=0.6080178770029245, precision=0.8663572246079803, accuracy=0.6102916473443095, aupr=0.8778852683158707, true_positive=885651, auroc=0.6651459574158645}",
2021-08-05T06:49:41.768+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=485152, f1_score=0.6494808548674319, false_negative=260983, false_positive=347629, recall=0.6835936884346566, precision=0.6186108103185914, accuracy=0.632838968735823, aupr=0.6722835145817345, true_positive=563852, auroc=0.6907214655227792}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=146388, f1_score=0.7776619430249357, false_negative=491415, false_positive=134323, recall=0.6901003517007699, precision=0.8906726266877525, accuracy=0.6647414266992564, aupr=0.9073036515638856, true_positive=1094308, auroc=0.6540642889267646}",
2021-08-05T06:49:41.768+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=2112363, f1_score=0.6429893900900336, false_negative=1174601, false_positive=1555628, recall=0.676705503625844, precision=0.6124735629320546, accuracy=0.6260582144284499, aupr=0.6653358633687415, true_positive=2458622, auroc=0.6840743209727}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=338201, f1_score=0.7406167853324469, false_negative=659823, false_positive=215019, recall=0.654324124537469, precision=0.8531277941675711, accuracy=0.644663650974752, aupr=0.8755330732730122, true_positive=1248968, auroc=0.6918165651353102}",
2021-08-05T06:49:41.768+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=1605055, f1_score=0.6446437209682769, false_negative=892236, false_positive=1332128, recall=0.6933707239623069, precision=0.6023156617643547, accuracy=0.61957194138604, aupr=0.656671564959506, true_positive=2017584, auroc=0.6757292936233243}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=133524, f1_score=0.8082131369137546, false_negative=404527, false_positive=142994, recall=0.7403861025666367, precision=0.8897207736219531, accuracy=0.7015754576348786, aupr=0.9110550864467145, true_positive=1153660, auroc=0.671894474330754}",
2021-08-05T06:49:41.768+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=962658, f1_score=0.6123626380658153, false_negative=586033, false_positive=474092, recall=0.5882833153832157, precision=0.638497294593457, accuracy=0.6293449374313625, aupr=0.6639286518806238, true_positive=837356, auroc=0.6859627169551191}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=284770, f1_score=0.6824167074963253, false_negative=665396, false_positive=122800, recall=0.5599890492558652, precision=0.8733537534936007, accuracy=0.5894376277479482, aupr=0.8790202879475779, true_positive=846830, auroc=0.6857284042175282}",


## Driver Program - GBT

In [0]:
###### WARNING: DO NOT MODIFY THE "data" OBJECT ######
###### IT IS SHARED AMONGST OTHER DRIVER PROGRAMS ######

# verbose logging / debug mode
# can be changed per driver program
verbose = True

if verbose:
  print("Total number of rows in original dataset are {}".format(n))

desired_categorical_gbt, desired_numeric_gbt, cols_to_consider_gbt, data_gbt, custom_payload_gbt = get_values_from_hypothesis(3)
    
#### COMMON ####  
if verbose:
  print("Finally, there are {} categorical features and {} numeric features".format(len(desired_categorical_gbt), len(desired_numeric_gbt)))
  print("data_gbt has {} rows".format(data_gbt.count()))
  display(data_gbt)


distance_group,origin_altitude,month,dest_city,day_of_month,dep_is_delayed,origin_wnd_speed,dest_altitude,holiday,arr_hour,origin_vis_dist,canceled,dep_hour,origin_tmp_c,planned_duration,poten_for_del,origin_dew_c,origin_city,origin_cig_cloud_agl,planned_departure_utc,origin_wnd_type,carrier,origin_slp_p,origin_ICAO,origin_vis_var,day_of_week,origin_cig_cavok,dest_ICAO
1,544.0,1,"Dallas/Fort Worth, TX",1,0,26.0,607.0,holiday,0600-0659,16093.0,False,0001-0559,28.0,50.0,-1,6.0,"Tyler, TX",1829.0,2015-01-01T11:55:00.000+0000,N,EV,10271.0,KTYR,N,4,N,KDFW
1,390.0,1,"Dallas/Fort Worth, TX",1,1,15.0,607.0,holiday,0700-0759,16093.0,True,0600-0659,28.0,60.0,-1,-33.0,"Texarkana, AR",1829.0,2015-01-01T12:00:00.000+0000,N,MQ,10283.0,KTXK,N,4,N,KDFW
3,433.0,1,"San Francisco, CA",1,0,21.0,13.0,holiday,0800-0859,16000.0,False,0600-0659,-11.0,125.0,-1,-61.0,"Seattle, WA",12479.802773912874,2015-01-01T14:05:00.000+0000,N,AS,10291.0,KSEA,9,4,N,KSFO
4,146.0,1,"Minneapolis, MN",1,1,0.0,841.0,holiday,0800-0859,16093.0,False,0600-0659,-50.0,171.0,-1,-106.0,"Baltimore, MD",22000.0,2015-01-01T11:25:00.000+0000,C,DL,10230.0,KBWI,N,4,N,KMSP
5,26.0,1,"New York, NY",1,0,26.0,21.0,holiday,0900-0959,16000.0,False,0600-0659,150.0,154.0,-1,122.0,"Tampa, FL",12479.802773912874,2015-01-01T11:40:00.000+0000,N,DL,10235.0,KTPA,9,4,N,KLGA
4,15.0,1,"Orlando, FL",1,0,15.0,96.0,holiday,0900-0959,16093.0,False,0700-0759,-17.0,143.0,-1,-111.0,"Washington, DC",22000.0,2015-01-01T12:00:00.000+0000,N,B6,10233.0,KDCA,N,4,N,KMCO
3,1135.0,1,"Denver, CO",1,1,15.0,5431.0,holiday,0800-0859,16000.0,False,0700-0759,22.0,107.0,0,-11.0,"Phoenix, AZ",12479.802773912874,2015-01-01T14:10:00.000+0000,N,UA,10179.0,KPHX,9,4,N,KDEN
3,173.0,1,"Charlotte, NC",1,0,57.0,748.0,holiday,1000-1059,16093.0,False,0800-0859,-50.0,135.0,-1,-156.0,"Hartford, CT",22000.0,2015-01-01T13:05:00.000+0000,N,US,10181.0,KBDL,N,4,N,KCLT
10,125.0,1,"New York, NY",1,0,31.0,13.0,holiday,1600-1659,16093.0,False,0800-0859,28.0,316.0,-1,-44.0,"Los Angeles, CA",22000.0,2015-01-01T16:15:00.000+0000,N,B6,10195.0,KLAX,N,4,N,KJFK
10,312.0,1,"Los Angeles, CA",1,0,26.0,125.0,holiday,1100-1159,16093.0,False,0800-0859,-67.0,340.0,-1,-117.0,"Washington, DC",22000.0,2015-01-01T13:30:00.000+0000,N,UA,10232.0,KIAD,N,4,N,KLAX


#### Display Result and Write Models to Storage

In [0]:
# perform actual training with GBT model
gbt_results = model_train_and_eval(data_gbt, splits, max_iter=iters, model = "gbt", collect_metrics = True, custom_payload = custom_payload_gbt)

storage_gbt_results = []
for gbtdic in gbt_results:
  # get back well formed metrics dictionary for each time-iteration of the model
  metrics = get_classification_metrics(gbtdic, with_display=True, display_train_metrics=True)
  storage_gbt_results.append(metrics)  

print("Displaying Aggregated Results for GBT")
get_aggregated_classification_metrcs(storage_gbt_results, dtype="test", with_display=True)

dep_is_delayed,count
1,832781
0,3035505


dep_is_delayed,count
1,1436750
0,6124956


dep_is_delayed,count
1,2137454
0,9188348


dep_is_delayed,count
1,2937183
0,12179903


dep_is_delayed,count
1,3667991
0,15567444


accuracy,precision,recall,f1_score,AUPR,AUROC
0.6243590857940666,0.8926150594181455,0.6083066960628207,0.7218282104025474,0.8998058997109881,0.6998663632027615
0.5872337477523654,0.8801532653610803,0.5432785840211715,0.674107480366953,0.8847121402786078,0.6774585336946328
0.696234544518056,0.9040510943342772,0.71902666368029,0.8008202779338374,0.9210147978440892,0.7160095574738256
0.6129347254976907,0.8897880585408542,0.5990972250827258,0.7129250689207114,0.8914687464457473,0.7048790498148595


In [0]:
print("Writing results to storage")
# get back formatted dictionary list that is compatible to write to storage
storage_gbt_results = get_classification_metrics_for_storage_ingestion(storage_gbt_results)
write_model_to_storage(storage_gbt_results, "gbt_h3yd_master")

display(read_model_from_storage("gbt_h3yd_master"))

timestamp,train,test,val
2021-08-05T09:47:09.608+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=1465612, f1_score=0.6392143098099315, false_negative=807283, false_positive=671842, recall=0.6187722157342296, precision=0.66105322211381, accuracy=0.6523829030084551, aupr=0.6919481753701244, true_positive=1310304, auroc=0.7136525166774328}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=240240, f1_score=0.7129250689207114, false_negative=583963, false_positive=118826, recall=0.5990972250827258, precision=0.8801532653610803, accuracy=0.6129347254976907, aupr=0.8914687464457473, true_positive=872657, auroc=0.6939800072881317}",
2021-08-05T09:47:09.608+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=570154, f1_score=0.646100789646511, false_negative=305882, false_positive=262627, recall=0.629159771348209, precision=0.6639793751119527, accuracy=0.6570321473730949, aupr=0.6980700933552023, true_positive=518953, auroc=0.7208665305761502}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=173687, f1_score=0.7466471687984747, false_negative=577320, false_positive=107024, recall=0.6359263250895648, precision=0.9040510943342773, accuracy=0.6333414414868139, aupr=0.9135132335325578, true_positive=1008403, auroc=0.6774585336946328}",
2021-08-05T09:47:09.608+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=972434, f1_score=0.643019064320381, false_negative=528881, false_positive=464316, recall=0.6284353750099235, precision=0.6582957027547349, accuracy=0.6527451987473336, aupr=0.6920180143289207, true_positive=894508, auroc=0.7148296571376977}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=305809, f1_score=0.6746410559927606, false_negative=690666, false_positive=101761, recall=0.5432785840211715, precision=0.8897880585408542, accuracy=0.5872337477523654, aupr=0.888320580453939, true_positive=821560, auroc=0.7070046677423582}",
2021-08-05T09:47:09.608+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=2024308, f1_score=0.6378713036661575, false_negative=1119690, false_positive=912875, recall=0.6152030022475617, precision=0.6622740246503429, accuracy=0.6523749004404479, aupr=0.6913470205470011, true_positive=1790130, auroc=0.7129935004866408}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=157007, f1_score=0.8008202779338374, false_negative=437809, false_positive=119511, recall=0.71902666368029, precision=0.9036115329678706, accuracy=0.696234544518056, aupr=0.9210147978440891, true_positive=1120378, auroc=0.7048790498148595}",
2021-08-05T09:47:09.608+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=2487171, f1_score=0.6412089914242255, false_negative=1361492, false_positive=1180820, recall=0.625266051657165, precision=0.6579862252577876, accuracy=0.6517959889958026, aupr=0.6899947678243077, true_positive=2271731, auroc=0.712958537389161}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=418863, f1_score=0.674107480366953, false_negative=870018, false_positive=134357, recall=0.544204682440351, precision=0.8854713458866451, accuracy=0.5920509697154075, aupr=0.8847121402786078, true_positive=1038773, auroc=0.7160095574738256}",


## Driver Program - SVM

In [0]:
###### WARNING: DO NOT MODIFY THE "data" OBJECT ######
###### IT IS SHARED AMONGST OTHER DRIVER PROGRAMS ######

# verbose logging / debug mode
# can be changed per driver program
verbose = True

if verbose:
  print("Total number of rows in original data set are {}".format(data.count()))

desired_categorical_svm, desired_numeric_svm, cols_to_consider_svm, data_svm, custom_payload_svm = get_values_from_hypothesis(3)

#### COMMON ####  
if verbose:
  print("Finally, there are {} categorical features and {} numeric features".format(len(desired_categorical_svm), len(desired_numeric_svm)))
  print("data_gbt has {} rows".format(data_svm.count()))
  display(data_svm)


distance_group,origin_altitude,month,dest_city,day_of_month,dep_is_delayed,origin_wnd_speed,dest_altitude,holiday,arr_hour,origin_vis_dist,canceled,dep_hour,origin_tmp_c,planned_duration,poten_for_del,origin_dew_c,origin_city,origin_cig_cloud_agl,planned_departure_utc,origin_wnd_type,carrier,origin_slp_p,origin_ICAO,origin_vis_var,day_of_week,origin_cig_cavok,dest_ICAO
1,544.0,1,"Dallas/Fort Worth, TX",1,0,26.0,607.0,holiday,0600-0659,16093.0,False,0001-0559,28.0,50.0,-1,6.0,"Tyler, TX",1829.0,2015-01-01T11:55:00.000+0000,N,EV,10271.0,KTYR,N,4,N,KDFW
1,390.0,1,"Dallas/Fort Worth, TX",1,1,15.0,607.0,holiday,0700-0759,16093.0,True,0600-0659,28.0,60.0,-1,-33.0,"Texarkana, AR",1829.0,2015-01-01T12:00:00.000+0000,N,MQ,10283.0,KTXK,N,4,N,KDFW
3,433.0,1,"San Francisco, CA",1,0,21.0,13.0,holiday,0800-0859,16000.0,False,0600-0659,-11.0,125.0,-1,-61.0,"Seattle, WA",12479.802773912874,2015-01-01T14:05:00.000+0000,N,AS,10291.0,KSEA,9,4,N,KSFO
4,146.0,1,"Minneapolis, MN",1,1,0.0,841.0,holiday,0800-0859,16093.0,False,0600-0659,-50.0,171.0,-1,-106.0,"Baltimore, MD",22000.0,2015-01-01T11:25:00.000+0000,C,DL,10230.0,KBWI,N,4,N,KMSP
5,26.0,1,"New York, NY",1,0,26.0,21.0,holiday,0900-0959,16000.0,False,0600-0659,150.0,154.0,-1,122.0,"Tampa, FL",12479.802773912874,2015-01-01T11:40:00.000+0000,N,DL,10235.0,KTPA,9,4,N,KLGA
4,15.0,1,"Orlando, FL",1,0,15.0,96.0,holiday,0900-0959,16093.0,False,0700-0759,-17.0,143.0,-1,-111.0,"Washington, DC",22000.0,2015-01-01T12:00:00.000+0000,N,B6,10233.0,KDCA,N,4,N,KMCO
3,1135.0,1,"Denver, CO",1,1,15.0,5431.0,holiday,0800-0859,16000.0,False,0700-0759,22.0,107.0,0,-11.0,"Phoenix, AZ",12479.802773912874,2015-01-01T14:10:00.000+0000,N,UA,10179.0,KPHX,9,4,N,KDEN
3,173.0,1,"Charlotte, NC",1,0,57.0,748.0,holiday,1000-1059,16093.0,False,0800-0859,-50.0,135.0,-1,-156.0,"Hartford, CT",22000.0,2015-01-01T13:05:00.000+0000,N,US,10181.0,KBDL,N,4,N,KCLT
10,125.0,1,"New York, NY",1,0,31.0,13.0,holiday,1600-1659,16093.0,False,0800-0859,28.0,316.0,-1,-44.0,"Los Angeles, CA",22000.0,2015-01-01T16:15:00.000+0000,N,B6,10195.0,KLAX,N,4,N,KJFK
10,312.0,1,"Los Angeles, CA",1,0,26.0,125.0,holiday,1100-1159,16093.0,False,0800-0859,-67.0,340.0,-1,-117.0,"Washington, DC",22000.0,2015-01-01T13:30:00.000+0000,N,UA,10232.0,KIAD,N,4,N,KLAX


#### Display Result and Write Models to Storage

In [0]:
# perform actual training with SVM model
svm_results = model_train_and_eval(data_svm, splits, max_iter=iters, model = "svm", collect_metrics = True, custom_payload = custom_payload_svm)

storage_svm_results = []
for svmdic in svm_results:
  # get back well formed metrics dictionary for each time-iteration of the model
  metrics = get_classification_metrics(svmdic, with_display=True, display_train_metrics=True)
  storage_svm_results.append(metrics)  

print("Displaying Aggregated Results for SVM")
get_aggregated_classification_metrcs(storage_svm_results, dtype="test", with_display=True)

dep_is_delayed,count
1,832781
0,3035505


dep_is_delayed,count
1,1436750
0,6124956


dep_is_delayed,count
1,2137454
0,9188348


dep_is_delayed,count
1,2937183
0,12179903


dep_is_delayed,count
1,3667991
0,15567444


accuracy,precision,recall,f1_score,AUPR,AUROC
0.5549157208453064,0.7029061943855269,0.5289111995645079,0.5984284218612483,0.8939389942725955,0.6846407710188338
0.1507152375995051,0.0,0.0,0.0,0.8836901184486058,0.6491754446646514
0.7372502858391993,0.8932659616689248,0.7930193356595068,0.8368266297289932,0.9109066686998832,0.715381546588414
0.6515244328043188,0.872010369405665,0.6535259941305069,0.747122726379292,0.8860854480916317,0.6877085661874162


In [0]:
print("Writing results to storage")
# get back formatted dictionary list that is compatible to write to storage
storage_svm_results = get_classification_metrics_for_storage_ingestion(storage_svm_results)
write_model_to_storage(storage_svm_results, "svm_h3yd_master")

display(read_model_from_storage("svm_h3yd_master"))

timestamp,train,test,val
2021-08-05T11:20:57.483+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=2503326, f1_score=0.6290642030392454, false_negative=1431676, false_positive=1164665, recall=0.6059487678020314, precision=0.6540131756407499, accuracy=0.6443959867496009, aupr=0.6838361085875146, true_positive=2201547, auroc=0.7075288180007724}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2018, 4, 23, 0, 0), datetime.datetime(2018, 4, 24, 0, 0), datetime.datetime(2018, 8, 22, 0, 0), datetime.datetime(2018, 8, 23, 0, 0), datetime.datetime(2018, 12, 21, 0, 0)), true_negative=444033, f1_score=0.6233736377270682, false_negative=994996, false_positive=109187, recall=0.4787297299704368, precision=0.8932659616689248, accuracy=0.5515117519783624, aupr=0.8836901184486058, true_positive=913795, auroc=0.715381546588414}",
2021-08-05T11:20:57.483+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=562758, f1_score=0.6286299050062126, false_negative=322957, false_positive=270023, recall=0.6084586614292555, precision=0.6501844148407633, accuracy=0.6422693796391926, aupr=0.6891013053441075, true_positive=501878, auroc=0.7102854282924579}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2015, 8, 29, 0, 0), datetime.datetime(2015, 8, 30, 0, 0), datetime.datetime(2015, 12, 28, 0, 0), datetime.datetime(2015, 12, 29, 0, 0), datetime.datetime(2016, 4, 27, 0, 0)), true_negative=118520, f1_score=0.8368266297289932, false_negative=328214, false_positive=162191, recall=0.7930193356595068, precision=0.8857568500387406, accuracy=0.7372502858391993, aupr=0.9109066686998832, true_positive=1257509, auroc=0.6693353302018442}",
2021-08-05T11:20:57.483+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=1459517, f1_score=0.6290362288349629, false_negative=834923, false_positive=677937, recall=0.6057196233259837, precision=0.6542198030093833, accuracy=0.6444546597788364, aupr=0.6855291328929233, true_positive=1282664, auroc=0.7076465383707599}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 12, 25, 0, 0), datetime.datetime(2016, 12, 26, 0, 0), datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 26, 0, 0), datetime.datetime(2017, 8, 24, 0, 0)), true_negative=193442, f1_score=0.7848191154708886, false_negative=408901, false_positive=165624, recall=0.7192809380620889, precision=0.863497790814304, accuracy=0.6835768960051463, aupr=0.8857620286804232, true_positive=1047719, auroc=0.6877085661874162}",
2021-08-05T11:20:57.483+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=969876, f1_score=0.6381871245179385, false_negative=537554, false_positive=466874, recall=0.6223421706926217, precision=0.6548599883640901, accuracy=0.6488184665150889, aupr=0.6876616737022478, true_positive=885835, auroc=0.7108143790515088}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2016, 4, 27, 0, 0), datetime.datetime(2016, 4, 28, 0, 0), datetime.datetime(2016, 8, 26, 0, 0), datetime.datetime(2016, 8, 27, 0, 0), datetime.datetime(2016, 12, 25, 0, 0)), true_negative=262515, f1_score=0.747122726379292, false_negative=523947, false_positive=145055, recall=0.6535259941305069, precision=0.872010369405665, accuracy=0.6515244328043188, aupr=0.8860854480916317, true_positive=988279, auroc=0.7016029674518429}",
2021-08-05T11:20:57.483+0000,"{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=2937183, f1_score=0, false_negative=2909820, false_positive=0, recall=0.0, precision=0.0, accuracy=0.5023399167060458, aupr=0.6388040846010784, true_positive=0, auroc=0.6547577222919375}","{ts_split=(datetime.datetime(2015, 1, 1, 0, 0), datetime.datetime(2017, 8, 24, 0, 0), datetime.datetime(2017, 8, 25, 0, 0), datetime.datetime(2017, 12, 23, 0, 0), datetime.datetime(2017, 12, 24, 0, 0), datetime.datetime(2018, 4, 23, 0, 0)), true_negative=276518, f1_score=0, false_negative=1558187, false_positive=0, recall=0.0, precision=0.0, accuracy=0.1507152375995051, aupr=0.9032507074424331, true_positive=0, auroc=0.6491754446646514}",


## Resources and Links

- https://docs.databricks.com/applications/machine-learning/automl-hyperparam-tuning/mllib-mlflow-integration.html
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html
- https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.dropna.html
- https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html
- https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression
- https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa
- https://medium.com/swlh/logistic-regression-with-pyspark-60295d41221
- https://medium.com/@soumyachess1496/cross-validation-in-time-series-566ae4981ce4
- https://medium.com/@haoyunlai/smote-implementation-in-pyspark-76ec4ffa2f1d
- https://docs.databricks.com/applications/machine-learning/train-model/mllib/index.html
- https://github.com/MingChen0919/learning-apache-spark/blob/master/notebooks/06-machine-learning/classification/random-forest-classification.ipynb
- https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier

In [0]:
display(data)

dep_is_delayed,canceled,planned_departure_utc,origin_state,origin_city,origin_ICAO,origin_altitude,origin_wnd_type,origin_wnd_speed,origin_cig_cloud_agl,origin_cig_cavok,origin_vis_dist,origin_vis_var,origin_tmp_c,origin_dew_c,origin_slp_p,dest_state,dest_city,dest_ICAO,dest_altitude,carrier,year,quarter,month,day_of_month,day_of_week,dep_hour,arr_hour,planned_duration,flight_distance,distance_group,pct_delayed_from_origin,mean_delay_from_origin,pct_delayed_to_dest,mean_delay_to_dest,pct_delayed_for_route,mean_delay_for_route,pct_delayed_from_state,mean_delay_from_state,pct_delayed_to_state,mean_delay_to_state,oa_avg_del_ind,da_avg_del_ind,carrier_avg_del_ind,poten_for_del,prev_fl_del,nas_window_del_ind,weather_window_del_ind,carrier_window_del_ind,security_window_del_ind,late_ac_window_del_ind,holiday,origin_wnd_type_null,origin_wnd_speed_null,origin_cig_cloud_agl_null,origin_cig_cavok_null,origin_vis_dist_null,origin_vis_var_null,origin_tmp_c_null,origin_dew_c_null,origin_slp_p_null
False,False,2015-01-01T11:55:00.000+0000,TX,"Tyler, TX",KTYR,544.0,N,26.0,1829.0,N,16093.0,N,28.0,6.0,10271.0,TX,"Dallas/Fort Worth, TX",KDFW,607.0,EV,2015,1,1,1,4,0001-0559,0600-0659,50.0,102.0,1,18.21929101401484,6.020610057708161,19.162027181922383,10.410166585889812,19.258821210329195,7.10408042578356,20.916771440066903,10.34018189654612,19.216577192318606,9.440727036672566,-1,0,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
True,True,2015-01-01T12:00:00.000+0000,AR,"Texarkana, AR",KTXK,390.0,N,15.0,1829.0,N,16093.0,N,28.0,-33.0,10283.0,TX,"Dallas/Fort Worth, TX",KDFW,607.0,MQ,2015,1,1,1,4,0600-0659,0700-0759,60.0,181.0,1,17.222844344904814,6.135274356103023,19.162027181922383,10.410166585889812,17.222844344904814,6.135274356103023,17.32794640698919,8.768813120414329,19.216577192318606,9.440727036672566,-1,0,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T14:05:00.000+0000,WA,"Seattle, WA",KSEA,433.0,N,21.0,12479.802773912874,N,16000.0,9,-11.0,-61.0,10291.0,CA,"San Francisco, CA",KSFO,13.0,AS,2015,1,1,1,4,0600-0659,0800-0859,125.0,679.0,3,16.129214141680922,6.7084862252597,25.83841151810567,15.370273550337396,25.206611570247933,13.775118395394188,15.786905806766386,6.527221454727693,20.968720611599743,10.329385058496566,-1,1,-1,-1,False,-1,-1,-1,-1,-1,holiday,False,False,True,False,False,False,False,False,False
True,False,2015-01-01T11:25:00.000+0000,MD,"Baltimore, MD",KBWI,146.0,C,0.0,22000.0,N,16093.0,N,-50.0,-106.0,10230.0,MN,"Minneapolis, MN",KMSP,841.0,DL,2015,1,1,1,4,0600-0659,0800-0859,171.0,936.0,4,22.143887325064405,10.49078919886618,15.40571797423415,8.17815405516324,16.659359929855327,9.059945205479451,22.150829214760854,10.500544681798484,15.51499295740704,8.257378442592778,-1,0,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T11:40:00.000+0000,FL,"Tampa, FL",KTPA,26.0,N,26.0,12479.802773912874,N,16000.0,9,150.0,122.0,10235.0,NY,"New York, NY",KLGA,21.0,DL,2015,1,1,1,4,0600-0659,0900-0959,154.0,1010.0,5,19.180487096900244,9.805819308381478,26.945589705147427,15.464645187869255,24.84897301651228,16.708014498590416,20.73620691585407,11.050036683785766,24.916361845579637,14.3535372305469,-1,-1,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,True,False,False,False,False,False,False
False,False,2015-01-01T12:00:00.000+0000,VA,"Washington, DC",KDCA,15.0,N,15.0,22000.0,N,16093.0,N,-17.0,-111.0,10233.0,FL,"Orlando, FL",KMCO,96.0,B6,2015,1,1,1,4,0700-0759,0900-0959,143.0,759.0,4,19.100854206337097,9.091498787465724,20.67967910868919,10.94146988993651,18.246962301381245,9.279156714092844,18.7980526903322,9.91242879918872,20.265895898289283,10.53228037685095,-1,1,-1,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
True,False,2015-01-01T14:10:00.000+0000,AZ,"Phoenix, AZ",KPHX,1135.0,N,15.0,12479.802773912874,N,16000.0,9,22.0,-11.0,10179.0,CO,"Denver, CO",KDEN,5431.0,UA,2015,1,1,1,4,0700-0759,0800-0859,107.0,602.0,3,18.533780662728223,8.261370432499453,18.306390573988093,8.998358497534271,19.636372935099477,9.068275967882167,18.17005811626455,8.07152729970168,18.532731624832557,9.249279629313008,-1,0,-1,0,True,-1,-1,-1,-1,-1,holiday,False,False,True,False,False,False,False,False,False
False,False,2015-01-01T13:05:00.000+0000,CT,"Hartford, CT",KBDL,173.0,N,57.0,22000.0,N,16093.0,N,-50.0,-156.0,10181.0,NC,"Charlotte, NC",KCLT,748.0,US,2015,1,1,1,4,0800-0859,1000-1059,135.0,644.0,3,16.38343246530381,7.35613083785202,16.557974220386658,7.017735308741668,12.895498552749777,4.016069468010779,16.48220308458037,7.34015437378756,17.903528650048557,7.991099841105581,0,0,-1,-1,False,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T16:15:00.000+0000,CA,"Los Angeles, CA",KLAX,125.0,N,31.0,22000.0,N,16093.0,N,28.0,-44.0,10195.0,NY,"New York, NY",KJFK,13.0,B6,2015,1,1,1,4,0800-0859,1600-1659,316.0,2475.0,10,21.48681135009189,10.485671790061964,23.91429521143748,14.871565892935887,21.578725595767597,12.60806529644022,20.23576491502932,9.533154260269752,24.916361845579637,14.3535372305469,0,0,0,-1,True,-1,-1,-1,-1,-1,holiday,False,False,False,False,False,False,False,False,False
False,False,2015-01-01T13:30:00.000+0000,VA,"Washington, DC",KIAD,312.0,N,26.0,22000.0,N,16093.0,N,-67.0,-117.0,10232.0,CA,"Los Angeles, CA",KLAX,125.0,UA,2015,1,1,1,4,0800-0859,1100-1159,340.0,2288.0,10,17.31865794909882,10.568394953516025,19.793827590958823,9.899910379928064,15.583452513374649,7.657767890248424,18.7980526903322,9.91242879918872,20.968720611599743,10.329385058496566,0,0,0,-1,True,1,0,0,0,0,holiday,False,False,False,False,False,False,False,False,False


In [0]:
data.dtypes