# XGBoost Testing

## Initial Setup

In [0]:
from pyspark.sql import Row,SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import time
from pyspark.sql.functions import *
import numpy as np
import random
import datetime
import pandas as pd
import xgboost as xgb
import mlflow.xgboost
import math
import itertools

from sparkdl.xgboost import XgboostRegressor,XgboostClassifier
from sklearn.model_selection import RandomizedSearchCV

In [0]:
# Azure Blob Storage location holding this team's data
# (container and storage account were created in https://portal.azure.com).
blob_container = "team06"
storage_account = "apatel"

# Databricks secret scope / key, created on a local machine with the Databricks CLI.
secret_scope = "team06"
secret_key = "team06"

# Base URL that all parquet reads/writes below are relative to.
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
spark.catalog.clearCache()
# `spark.task.cpus` (cores reserved per task) is a cluster-level Spark setting: configure it in
# the cluster's Spark config rather than from the notebook. The call that used to sit here,
# `spark.conf.set("spark.tasks.cpus", "spark.executor.cores")`, had no effect: the key is
# misspelled ("tasks" vs "task") and the value was the literal string "spark.executor.cores",
# not a CPU count.

In [0]:
# Joined + cleaned flight/weather dataset with the engineered features (v8, 2022-04-06).
df_imported = spark.read.format("parquet").load(f'{blob_url}/full_join_mattsFeats_anandFeats_cleaned_extraFeatures_upToOD_good_v8_4-6-22')

## Set Up Feature Lists

In [0]:
# copied from section `Flights Features to Keep`
# Each feature is listed exactly once: the previous list repeated seven of these names,
# which only added identical columns to the assembled feature vector.
Features_for_flights = ['DISTANCE', 'ORI_elevation', 'DEST_elevation',
                        'prior_dep_delayed', 'previous_DEP_DELAY_NEW',
                        'prior_arr_delayed', 'previous_ARR_DELAY_NEW', 'plane_is_here',
                        'avg_carrier_delay_24hrs', 'flights_sch_Today_ORIGIN', 'flights_sch_Today_DEST',
                        'avg_ori_airport_delay_24hrs', 'daytime', 'evening',
                        'new_england', 'mid_atlantic', 'south', 'midwest', 'southwest', 'west', 'pacific_islands',
                        'spring', 'winter', 'autumn',
                        'weekend_or_holiday', 'depDelayPageRank', 'arrDelayPageRank'
                        ]

Features_for_weather = ['OC1_0_wind_gust_spd_rate_imp', 'AA1_1_liquid_precip', 'AA3_1_liquid_precip', 'gd1_0_sky_coverage', 'au2_4_extreme_wind_weather', 'mv1_0_sand_dust_near', 'mv1_0_thunder_rain_near', 'aw1_mw1_0_smoke_haze_dust', 'aw1_mw1_0_fog', 'aw1_mw1_0_rain_drizzle', 'aw1_mw1_0_freezing_rain_drizzle', 'aw1_mw1_0_snow', 'aw1_mw1_0_hail_or_ice', 'aw1_mw1_0_thunderstorm', 'aw1_mw1_0_tornado', 'SLP_0_avg_station_press_imp', 'WND_3_wind_speed_imp', 'CIG_0_sky_ceiling_height_imp', 'VIS_0_visibility_dist_imp', 'TMP_0_air_temperature_imp', 'DEW_0_dew_pt_temp_imp', 'MA1_0_altimeter_set_rate_imp', 'MA1_2_station_pres_rate_imp', 'GD1_3_cloud_height_imp']

new_features_4_6 = ['avg_hourly_delay_24hr', 'avg_OD_dep_del15_24hr', 'avg_OD_delay_min_24hr', 'avg_OD_num_flights_24hr', 'depDelayPageRank_ordinal', 'arrDelayPageRank_ordinal', 'departed_for_current_aiport', 'time_inb_flight_min', 'avg_time_inb_flights_carrier_24hr', 'avg_time_inb_flights_origin_24hr', 'avg_time_inb_flights_dest_24hr', 'avg_ori_DEP_DELAY_NEW_24hr','airline_carrier_del', 'airline_carrier_del_min', 'avg_carrier_delay_over15_lastQ', 'airline_carrier_del_ordinal', 'airline_carrier_del_min_ordinal', 'origin_avg_DEP_DEL15', 'origin_avg_DEP_DELAY_NEW', 'avg_origin_delay_over15_lastQ', 'origin_del_ordinal', 'origin_del_min_ordinal', 'OD_avg_DEP_DEL15', 'OD_avg_DEP_DELAY_NEW', 'avg_OD_delay_over15_lastQ', 'OD_del_ordinal', 'OD_del_min_ordinal']

In [0]:
# dict.fromkeys keeps the first occurrence of each name in its original order, so a feature
# that appears more than once across the groups is assembled into the vector only once.
feature_list_full = list(dict.fromkeys(Features_for_flights + Features_for_weather + new_features_4_6))

In [0]:
# Show the assembled feature names for a quick visual check.
print(f"{feature_list_full}")

In [0]:
# Number of features fed to the VectorAssembler (shown as the cell output).
n_features = len(feature_list_full)
n_features

In [0]:
def rebalanceDF(trainingDF, desired_neg_to_pos_ratio, seed=12345):
    """Undersample the negative class (``DEP_DEL15 == 0``) of a training DataFrame.

    All positive rows (``DEP_DEL15 == 1``) are kept. Negative rows are sampled without
    replacement with fraction ``positives / negatives * desired_neg_to_pos_ratio``, so the
    expected number of kept negatives is ``desired_neg_to_pos_ratio`` times the positives.
    The fraction is capped at 1.0 (keep every negative), because sampling without
    replacement rejects fractions above 1. Rows with a null label match neither filter
    and are dropped.

    Args:
        trainingDF: Spark DataFrame with a 0/1 ``DEP_DEL15`` label column.
        desired_neg_to_pos_ratio: target ratio of sampled negatives to positives.
        seed: sampling seed (default matches the previously hard-coded 12345).

    Returns:
        Cached DataFrame with the positives followed by the sampled negatives.

    Raises:
        ValueError: if the DataFrame has no positive or no negative rows.
    """
    # A single aggregation + collect instead of two separate Spark jobs (one per class).
    class_counts = {}
    for row in trainingDF.groupby('DEP_DEL15').count().collect():
        label = row['DEP_DEL15']
        if label is not None:
            class_counts[float(label)] = row['count']
    positiveTrainCount = class_counts.get(1.0, 0)
    negativeTrainCount = class_counts.get(0.0, 0)
    if positiveTrainCount == 0 or negativeTrainCount == 0:
        raise ValueError(
            f"rebalanceDF needs both classes; got {positiveTrainCount} positive and "
            f"{negativeTrainCount} negative rows"
        )

    # undersampling the negative cases
    fraction = min(1.0, positiveTrainCount / negativeTrainCount * desired_neg_to_pos_ratio)
    negativeDF = trainingDF.filter(trainingDF['DEP_DEL15'] == 0).sample(False, fraction, seed=seed)
    positiveDF = trainingDF.filter(trainingDF['DEP_DEL15'] == 1)
    return positiveDF.union(negativeDF).cache()

## Manual XGBoost Pipeline (Initial Testing)

In [0]:
CV_name = 'fold_1'
# Train on 2015, validate on 2016, and hold out 2019 for final testing.
train_data, val_data, holdout_data = (
    df_imported.where(col('YEAR').cast('int') == year) for year in (2015, 2016, 2019)
)
save_name = f'xgboost_test_preds_4_7_{CV_name}'

print("Fold file will be saved as", save_name)

In [0]:
desired_neg_to_pos_ratio = 1.60  # target ratio of sampled negatives to positives in the training split

In [0]:
spark.catalog.clearCache()

# Target ratio of sampled negatives to positives when undersampling the training split.
desired_neg_to_pos_ratio = 1.6

prior_features = [
    'evening', 'prior_dep_delayed', 'previous_DEP_DELAY_NEW', 'plane_is_here',
    'previous_ARR_DELAY_NEW', 'avg_carrier_delay_24hrs', 'prior_arr_delayed',
    'avg_ori_airport_delay_24hrs', 'depDelayPageRank', 'arrDelayPageRank', 'daytime',
    'aw1_mw1_0_thunderstorm', 'autumn', 'WND_3_wind_speed_imp', 'AA1_1_liquid_precip',
    'GD1_3_cloud_height_imp',
]

extra_features_added = [
    'avg_hourly_delay_24hr', 'avg_OD_dep_del15_24hr', 'avg_OD_delay_min_24hr',
    'depDelayPageRank_ordinal', 'arrDelayPageRank_ordinal', 'departed_for_current_aiport',
    'time_inb_flight_min', 'avg_ori_DEP_DELAY_NEW_24hr', 'airline_carrier_del',
    'airline_carrier_del_min', 'avg_carrier_delay_over15_lastQ', 'airline_carrier_del_ordinal',
    'airline_carrier_del_min_ordinal', 'origin_avg_DEP_DEL15', 'origin_avg_DEP_DELAY_NEW',
    'avg_origin_delay_over15_lastQ', 'origin_del_ordinal', 'origin_del_min_ordinal',
    'OD_avg_DEP_DEL15', 'OD_avg_DEP_DELAY_NEW', 'avg_OD_delay_over15_lastQ', 'OD_del_ordinal',
    'OD_del_min_ordinal',
]

# NOTE(review): `feature_list` is built here, but the assembler below uses
# `feature_list_full`; confirm which feature set this experiment intended.
feature_list = prior_features + extra_features_added

# Only the training split is undersampled; validation and holdout keep their natural mix.
train = rebalanceDF(train_data, desired_neg_to_pos_ratio)
val, holdout = val_data, holdout_data

assembler = VectorAssembler(inputCols=feature_list_full, outputCol='vectorized_features')

assmb_train, assmb_val, assmb_holdout = (
    assembler.transform(frame) for frame in (train, val, holdout)
)

# No normalization step: the assembled features go to XGBoost as-is.

# XgboostClassifier expects `features` and `label` columns.
selected_train, selected_val, selected_holdout = (
    frame.select(['vectorized_features', 'DEP_DEL15'])
         .withColumnRenamed('vectorized_features', 'features')
         .withColumnRenamed('DEP_DEL15', 'label')
    for frame in (assmb_train, assmb_val, assmb_holdout)
)

In [0]:
# Best hyper-parameters for fold 1, found in earlier testing; the other folds use
# configurations drawn at random (see generate_random_configuration_honed below).
params = {
    'seed': 0,
    'eta': 0.21,                      # learning rate (shrinkage per boosting round)
    'gamma': 0.005,                   # minimum loss reduction required to make a split
    'max_depth': 8,
    'subsample': 0.565,               # fraction of training rows sampled per tree
    'lambda': 1.43,                   # L2 regularization on leaf weights
    'num_workers': 8,
    'objective': 'binary:logitraw',
    'base_score': 0.66,
    'eval_metric': 'aucpr',
    'early_stopping_round': 10,
    'verbosity': 0,
}

In [0]:
# from initial testing

# spark.catalog.clearCache()
# spark.conf.set("spark.tasks.cpus","spark.executor.cores")

# import xgboost as xgb
# import mlflow.xgboost

# from sparkdl.xgboost import XgboostRegressor,XgboostClassifier
# from sklearn.model_selection import RandomizedSearchCV

# xgbr = XgboostClassifier(params=params)

In [0]:
# The test worked fine! tuning follows.

# spark.conf.set("spark.tasks.cpus","spark.executor.cores")

# model = xgbr.fit(selected_train)

In [0]:
# from initial testing

# val_predictions = model.transform(selected_val)
# holdout_predictions = model.transform(selected_holdout)

In [0]:
# from initial testing

# spark.conf.set("spark.tasks.cpus","spark.executor.cores")

# val_predictions_trim = val_predictions.select(["label","prediction"])
# metrics = MulticlassMetrics(val_predictions_trim.rdd)

# print("Validation set: Accuracy:",metrics.accuracy, "Recall:",metrics.recall(label=1),"Precision:", metrics.precision(1), "F1 Score:",metrics.fMeasure(1.0,1.0))

# holdout_predictions_trim = holdout_predictions.select(["label","prediction"])
# metrics = MulticlassMetrics(holdout_predictions_trim.rdd)

# print("Test set: Accuracy:",metrics.accuracy, "Recall:",metrics.recall(label=1),"Precision:", metrics.precision(1), "F1 Score:",metrics.fMeasure(1.0,1.0))

In [0]:
# no longer used

# param_grid = {
# 'seed':[0],
# 'eta':[0.05,0.15,0.25,0.5],
# 'gamma':[0.01],
# 'max_depth':[4,6,8],
# 'subsample':[0.5,0.8,1],
# 'lambda':[0.8,1,1.5,2],
# 'num_parallel_tree':[1,2],
# 'num_workers':[4],
# 'objective':['binary:logitraw'],
# 'base_score':[0.5],
# 'eval_metric':['aucpr'],
# 'verbosity':0
# }

In [0]:
# these are not the original search parameters, which bracketed a larger space.
# these are centered around the "consensus" values from the first set of training to eliminate any outliers,
# to eliminate cases where a model perhaps did not stumble upon the best range of values the first time around

def generate_random_configuration_honed(rng=None):
    """Draw one random XGBoost hyper-parameter configuration from the narrowed ranges.

    Args:
        rng: optional ``random.Random`` instance to draw from. When omitted, the
            module-level ``random`` functions are used (the original behaviour).
            Passing a seeded instance makes the random search reproducible.

    Returns:
        dict of keyword arguments for ``XgboostClassifier``. ``max_depth`` is 4 or 5,
        and ``num_parallel_tree`` is 4-7. ``floor(uniform(a, b))`` only reaches ``b`` in a
        vanishingly rare floating-point rounding case.
    """
    rng = random if rng is None else rng
    # The dict literal is evaluated in order, so the draws happen in the same sequence as before.
    params = {'seed': 0,
              'eta': rng.uniform(0.15, 0.3),
              'gamma': rng.uniform(0.2, 0.35),
              'max_depth': math.floor(rng.uniform(4, 6)),
              'subsample': rng.uniform(0.5, 0.7),
              'lambda': rng.uniform(1.2, 2.5),
              'num_parallel_tree': math.floor(rng.uniform(4, 8)),
              'num_workers': 8,
              'objective': 'binary:logitraw',
              'base_score': rng.uniform(0.55, 0.7),
              'eval_metric': 'aucpr',
              'verbosity': 0
              }
    return params

In [0]:
CV_name = 'fold_1'
train_data = df_imported.where((col('YEAR').cast('int') == 2015))
val_data = df_imported.where((col('YEAR').cast('int') == 2016))
holdout_data = df_imported.where((col('YEAR').cast('int') == 2019))
save_name = 'xgboost_test_preds_4_9_'+CV_name
save_val_name = 'xgboost_val_preds_4_9_'+CV_name

train = train_data
val = val_data
holdout = holdout_data

# Only the training split is rebalanced; validation and holdout keep their natural class mix.
train = rebalanceDF(train, desired_neg_to_pos_ratio)

assembler = VectorAssembler().setInputCols(feature_list_full)\
                        .setOutputCol('vectorized_features')

assmb_train = assembler.transform(train)
assmb_val = assembler.transform(val)
assmb_holdout = assembler.transform(holdout)

selected_train = assmb_train.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_val = assmb_val.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_holdout = assmb_holdout.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

print("Fold file will be saved as",save_name)

iterations = 1
# F1 is never negative, so starting at -inf guarantees best_model is set after the first
# iteration (starting at 0 left it None when F1 was 0, crashing the holdout transform below).
best_score = float('-inf')
best_params = None
best_model = None


# re-generating based on best hyperparameters
# NOTE(review): 'seed' and 'random_state' both set a random seed; and 'early_stopping_round'
# is not XGBoost's `early_stopping_rounds` spelling. No validation set is passed to fit(), so
# early stopping is probably inactive either way. Confirm against the sparkdl.xgboost docs.
params = {
'seed':0,
'random_state':42,
'eta':0.21,
'gamma':0.005,
'max_depth':8,
'subsample':0.565,
'lambda':1.43,
'num_workers':8,
'objective':'binary:logitraw',
'num_parallel_tree':4,
'base_score':0.66,
'eval_metric':'aucpr',
'early_stopping_round':10,
'verbosity':0
}

for i in range(iterations):

    # re-generating based on best hyperparameters
    # params = generate_random_configuration_honed()
    print(params)

    # Not named `xgb`, which would shadow the `import xgboost as xgb` module alias.
    xgb_classifier = XgboostClassifier(**params)
    model = xgb_classifier.fit(selected_train)
    val_predictions = model.transform(selected_val)
    val_predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_val_name}")
    # MulticlassMetrics reads each row positionally as (prediction, label). Selecting
    # (label, prediction) silently swaps the reported precision and recall.
    val_predictions = val_predictions.select(["prediction","label"])
    metrics = MulticlassMetrics(val_predictions.rdd)
    f1_score = metrics.fMeasure(1.0,1.0)

    if f1_score > best_score:
        best_score = f1_score
        best_params = params
        best_model = model

    print("Validation set: Accuracy:",metrics.accuracy, "Recall:",metrics.recall(label=1),"Precision:", metrics.precision(1), "F1 Score:",f1_score)

print("For Fold 1:")
print("Best Results:", best_score)
print("Best params:", best_params)

best_mode1_fold_1 = best_model

predictions = best_model.transform(selected_holdout)

print("Best predictions saved as",save_name)
predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_name}")

In [0]:
CV_name = 'fold_2'
train_data = df_imported.where((col('YEAR').cast('int') == 2016))
val_data = df_imported.where((col('YEAR').cast('int') == 2017))
holdout_data = df_imported.where((col('YEAR').cast('int') == 2019))
save_name = 'xgboost_test_preds_4_9_'+CV_name
save_val_name = 'xgboost_val_preds_4_9_'+CV_name

train = train_data
val = val_data
holdout = holdout_data

# Only the training split is rebalanced; validation and holdout keep their natural class mix.
train = rebalanceDF(train, desired_neg_to_pos_ratio)

assembler = VectorAssembler().setInputCols(feature_list_full)\
                        .setOutputCol('vectorized_features')

assmb_train = assembler.transform(train)
assmb_val = assembler.transform(val)
assmb_holdout = assembler.transform(holdout)

# no normalization required

selected_train = assmb_train.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_val = assmb_val.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_holdout = assmb_holdout.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

iterations = 1
# Start below any achievable F1 so best_model is always set after the first iteration.
best_score = float('-inf')
best_params = None
best_model = None

print(save_name)

# customized for fold 2 based on some bracketing

# re-generating based on best hyperparameters
params = {
'seed':0,
'random_state':42,
'eta':0.196,
'gamma':0.288,
'max_depth':4,
'subsample':0.587,
'lambda':1.205,
'num_workers':8,
'num_parallel_tree':7,
'objective':'binary:logitraw',
'base_score':0.634,
'eval_metric':'aucpr',
'early_stopping_round':10,
'verbosity':0
}

for i in range(iterations):
    #params = generate_random_configuration_honed()
    print(params)

    # Not named `xgb`, which would shadow the `import xgboost as xgb` module alias.
    xgb_classifier = XgboostClassifier(**params)
    model = xgb_classifier.fit(selected_train)
    val_predictions = model.transform(selected_val)
    val_predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_val_name}")
    # MulticlassMetrics reads rows positionally as (prediction, label); the reverse
    # order would swap the reported precision and recall.
    val_predictions = val_predictions.select(["prediction","label"])
    metrics = MulticlassMetrics(val_predictions.rdd)
    f1_score = metrics.fMeasure(1.0,1.0)

    if f1_score > best_score:
        best_score = f1_score
        best_params = params
        best_model = model

    print("Validation set: Accuracy:",metrics.accuracy, "Recall:",metrics.recall(label=1),"Precision:", metrics.precision(1), "F1 Score:",f1_score)

print("For Fold 2:")
print("Best Results:", best_score)
print("Best params:", best_params)

best_mode1_fold_2 = best_model

predictions = best_model.transform(selected_holdout)

print("Best predictions saved as",save_name)
predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_name}")

In [0]:
CV_name = 'fold_3'
train_data = df_imported.where((col('YEAR').cast('int') == 2017))
val_data = df_imported.where((col('YEAR').cast('int') == 2018))
holdout_data = df_imported.where((col('YEAR').cast('int') == 2019))
save_name = 'xgboost_test_preds_4_9_'+CV_name
save_val_name = 'xgboost_val_preds_4_9_'+CV_name

train = train_data
val = val_data
holdout = holdout_data

# Only the training split is rebalanced; validation and holdout keep their natural class mix.
train = rebalanceDF(train, desired_neg_to_pos_ratio)

assembler = VectorAssembler().setInputCols(feature_list_full)\
                        .setOutputCol('vectorized_features')

assmb_train = assembler.transform(train)
assmb_val = assembler.transform(val)
assmb_holdout = assembler.transform(holdout)

# no normalization required

selected_train = assmb_train.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_val = assmb_val.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_holdout = assmb_holdout.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

iterations = 1
# Start below any achievable F1 so best_model is always set after the first iteration.
best_score = float('-inf')
best_params = None
best_model = None

print(save_name)

# re-generating based on best hyperparameters
params = {
'seed':0,
'random_state':42,
'eta':0.214,
'gamma':0.229,
'max_depth':4,
'subsample':0.659,
'lambda':2.235,
'num_workers':8,
'num_parallel_tree':5,
'objective':'binary:logitraw',
'base_score':0.562,
'eval_metric':'aucpr',
'early_stopping_round':10,
'verbosity':0
}

for i in range(iterations):
    # params = generate_random_configuration_honed()
    print(params)

    # Not named `xgb`, which would shadow the `import xgboost as xgb` module alias.
    xgb_classifier = XgboostClassifier(**params)
    model = xgb_classifier.fit(selected_train)
    val_predictions = model.transform(selected_val)
    val_predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_val_name}")
    # MulticlassMetrics reads rows positionally as (prediction, label); the reverse
    # order would swap the reported precision and recall.
    val_predictions = val_predictions.select(["prediction","label"])
    metrics = MulticlassMetrics(val_predictions.rdd)
    f1_score = metrics.fMeasure(1.0,1.0)

    if f1_score > best_score:
        best_score = f1_score
        best_params = params
        best_model = model

    print("Validation set: Accuracy:",metrics.accuracy, "Recall:",metrics.recall(label=1),"Precision:", metrics.precision(1), "F1 Score:",f1_score)

print("For Fold 3:")
print("Best Results:", best_score)
print("Best params:", best_params)

best_mode1_fold_3 = best_model

predictions = best_model.transform(selected_holdout)

print("Best predictions saved as",save_name)
predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_name}")

In [0]:
CV_name = 'fold_4'
train_data = df_imported.where(((col('YEAR').cast('int') == 2017) & (col('MONTH').cast('int') > 6)) | ((col('YEAR').cast('int') == 2018) & (col('MONTH').cast('int') < 7)))
val_data = df_imported.where(((col('YEAR').cast('int') == 2018) & (col('MONTH').cast('int') > 6)))
holdout_data = df_imported.where((col('YEAR').cast('int') == 2019))
save_name = 'xgboost_test_preds_4_9_'+CV_name
save_val_name = 'xgboost_val_preds_4_9_'+CV_name

train = train_data
val = val_data
holdout = holdout_data

# Only the training split is rebalanced; validation and holdout keep their natural class mix.
train = rebalanceDF(train, desired_neg_to_pos_ratio)

assembler = VectorAssembler().setInputCols(feature_list_full)\
                        .setOutputCol('vectorized_features')

assmb_train = assembler.transform(train)
assmb_val = assembler.transform(val)
assmb_holdout = assembler.transform(holdout)

# no normalization required

selected_train = assmb_train.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_val = assmb_val.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_holdout = assmb_holdout.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

iterations = 1
# Start below any achievable F1 so best_model is always set after the first iteration.
best_score = float('-inf')
best_params = None
best_model = None

print(save_name)

# re-generating based on best hyperparameters
params = {
'seed':0,
'random_state':42,
'eta':0.218,
'gamma':0.423,
'max_depth':4,
'subsample':0.522,
'lambda':2.28,
'num_workers':8,
'num_parallel_tree':6,
'objective':'binary:logitraw',
'base_score':0.595,
'eval_metric':'aucpr',
'early_stopping_round':10,
'verbosity':0
}

for i in range(iterations):
    # params = generate_random_configuration_honed()
    print(params)

    # Not named `xgb`, which would shadow the `import xgboost as xgb` module alias.
    xgb_classifier = XgboostClassifier(**params)
    model = xgb_classifier.fit(selected_train)
    val_predictions = model.transform(selected_val)
    val_predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_val_name}")
    # MulticlassMetrics reads rows positionally as (prediction, label); the reverse
    # order would swap the reported precision and recall.
    val_predictions = val_predictions.select(["prediction","label"])
    metrics = MulticlassMetrics(val_predictions.rdd)
    f1_score = metrics.fMeasure(1.0,1.0)

    if f1_score > best_score:
        best_score = f1_score
        best_params = params
        best_model = model

    print("Validation set: Accuracy:",metrics.accuracy, "Recall:",metrics.recall(label=1),"Precision:", metrics.precision(1), "F1 Score:",f1_score)

print("For Fold 4:")
print("Best Results:", best_score)
print("Best params:", best_params)

best_mode1_fold_4 = best_model

predictions = best_model.transform(selected_holdout)

print("Best predictions saved as",save_name)
predictions.write.mode('overwrite').parquet(f"{blob_url}/{save_name}")

In [0]:
CV_name = 'fold_5'
train_data = df_imported.where(((col('YEAR').cast('int') == 2016) & (col('MONTH').cast('int') > 6)) | ((col('YEAR').cast('int') == 2017) & (col('MONTH').cast('int') < 7)))
val_data = df_imported.where(((col('YEAR').cast('int') == 2017) & (col('MONTH').cast('int') > 6)) | ((col('YEAR').cast('int') == 2018) & (col('MONTH').cast('int') < 7)))
holdout_data = df_imported.where((col('YEAR').cast('int') == 2019))
save_name = 'xgboost_test_preds_4_9_'+CV_name
save_val_name = 'xgboost_val_preds_4_9_'+CV_name

train = train_data
val = val_data
holdout = holdout_data

# Only the training split is rebalanced; validation and holdout keep their natural class mix.
train = rebalanceDF(train, desired_neg_to_pos_ratio)

assembler = VectorAssembler().setInputCols(feature_list_full)\
                        .setOutputCol('vectorized_features')

assmb_train = assembler.transform(train)
assmb_val = assembler.transform(val)
assmb_holdout = assembler.transform(holdout)

# no normalization required

selected_train = assmb_train.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_val = assmb_val.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

selected_holdout = assmb_holdout.select(['vectorized_features','DEP_DEL15']).withColumnRenamed('vectorized_features','features').withColumnRenamed('DEP_DEL15','label')

iterations = 1
# Start below any achievable F1 so best_model is always set after the first iteration.
best_score = float('-inf')
best_params = None
best_model = None

print(save_name)

# Search ranges that were used for fold 5's random search (kept for reference):
# def generate_random_configuration_honed():
#     params = {'seed':0,
#                 'eta':random.uniform(0.25,0.45),
#                 'gamma':random.uniform(0.3,0.4),
#                 'max_depth':math.floor(random.uniform(3,5)),
#                 'subsample':random.uniform(0.6,1),
#                 'lambda':random.uniform(1,2),
#                 'num_parallel_tree':math.floor(random.uniform(1,8)),
#                 'num_workers':6,
#                 'objective':'binary:logitraw',
#                 'base_score':random.uniform(0.4,0.6),
#                 'eval_metric':'aucpr',
#                 'verbosity':0
#               }
#     return params

# re-generating based on best hyperparameters
params = {
'seed':0,
'random_state':42,
'eta':0.434,
'gamma':0.34,
'max_depth':4,
'subsample':0.995,
'lambda':1.711,
'num_workers':8,
'num_parallel_tree':7,
'objective':'binary:logitraw',
'base_score':0.416,
'eval_metric':'aucpr',
'early_stopping_round':10,
'verbosity':0
}

for i in range(iterations):
    # params = generate_random_configuration_honed()
    print(params)

    # Not named `xgb`, which would shadow the `import xgboost as xgb` module alias.
    xgb_classifier = XgboostClassifier(**params)
    model = xgb_classifier.fit(selected_train)
    val_predictions = model.transform(selected_val)
    val_predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_val_name}")
    # MulticlassMetrics reads rows positionally as (prediction, label); the reverse
    # order would swap the reported precision and recall.
    val_predictions = val_predictions.select(["prediction","label"])
    metrics = MulticlassMetrics(val_predictions.rdd)
    f1_score = metrics.fMeasure(1.0,1.0)

    if f1_score > best_score:
        best_score = f1_score
        best_params = params
        best_model = model

    print("Validation set: Accuracy:",metrics.accuracy, "Recall:",metrics.recall(label=1),"Precision:", metrics.precision(1), "F1 Score:",f1_score)

print("For Fold 5:")
print("Best Results:", best_score)
print("Best params:", best_params)

best_mode1_fold_5 = best_model

predictions = best_model.transform(selected_holdout)

print("Best predictions saved as",save_name)
# Overwrite like the other folds, so re-running the cell does not fail on an existing path.
predictions.write.mode("overwrite").parquet(f"{blob_url}/{save_name}")

# Thresholding

## Threshold Function and Determination

In [0]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    """Return element ``i`` of vector/sequence ``v`` as a float, or None if unavailable.

    This is wrapped in a Spark UDF below, where an exception would fail the job. A null
    vector (TypeError), an out-of-range index (IndexError) and a non-numeric element
    (ValueError) therefore all map to None (a null in the result column).
    """
    try:
        return float(v[i])
    except (IndexError, TypeError, ValueError):
        return None

# Spark UDF wrapper around ith_, producing a (nullable) double column.
ith = udf(ith_, returnType=DoubleType())

# Positive-class probability (element 1 of `probability`) for the current `predictions`
# frame. When the notebook runs top to bottom, that is the fold-5 holdout predictions.
output = predictions.withColumn("del_prob", ith("probability", lit(1)))
display(output)

In [0]:
# Coarse-to-fine search for the probability threshold that maximizes F1 on `output`.
# Each level scores `granularity` evenly spaced thresholds in
# [search_center - search_bounds, search_center + search_bounds]. It then re-centres on
# that level's best threshold and shrinks the half-width by a factor of 4.
search_center = 0.6
search_bounds = 0.2
granularity = 5
times_to_zoom = 3

# Loop-invariant projection, hoisted out of the threshold loop.
label_prob_df = output.select('label','del_prob')

for i in range(times_to_zoom):
    search_space = np.linspace(search_center - search_bounds, search_center + search_bounds, granularity)
    # F1 is never negative, so starting at -1 ensures best_thresh is always one of the
    # evaluated thresholds (otherwise an all-zero level would re-centre the search on -1).
    best_score = -1.0
    best_thresh = -1
    for threshold in search_space:
        test_df = (label_prob_df
                   .withColumn('prediction', when((col('del_prob') >= lit(threshold)), 1.0).otherwise(0.0))
                   # MulticlassMetrics reads rows positionally as (prediction, label).
                   .select('prediction','label'))
        test_df.cache()
        test_metrics = MulticlassMetrics(test_df.rdd)
        f1_score = test_metrics.fMeasure(1.0,1.0)
        # Release the cached frame once its metric is computed; otherwise one cached
        # DataFrame per threshold accumulates.
        test_df.unpersist()
        if f1_score > best_score:
            best_score = f1_score
            best_thresh = threshold
        print("threshold:",threshold,"f1 score:",f1_score)
    print("best score this level:", best_score, "at threshold", best_thresh)
    search_center = best_thresh
    search_bounds = search_bounds / 4

In [0]:
def find_optimal_threshold(df, search_center=0.6, search_bounds=0.2, granularity=5, times_to_zoom=3):
    """Find the probability threshold that maximizes F1 for the positive class.

    Runs a coarse-to-fine grid search over cut-offs applied to ``probability[1]``. At each
    of ``times_to_zoom`` levels, ``granularity`` evenly spaced thresholds in
    ``[search_center - search_bounds, search_center + search_bounds]`` are scored. The
    search then re-centres on the best threshold found so far and shrinks the half-width
    by a factor of 4.

    Args:
        df: predictions DataFrame with a ``label`` column and a vector ``probability``
            column (e.g. the output of ``model.transform``).
        search_center: centre of the first search window.
        search_bounds: half-width of the first search window.
        granularity: number of thresholds scored per level.
        times_to_zoom: number of refinement levels.

    Returns:
        The best threshold found, or -1 if no threshold was scored
        (``granularity`` or ``times_to_zoom`` is 0).
    """

    def ith_(v, i):
        """Return element ``i`` of ``v`` as a float, or None (null) if unavailable."""
        try:
            return float(v[i])
        except (IndexError, TypeError, ValueError):
            return None

    ith = udf(ith_, DoubleType())
    label_prob_df = df.withColumn("del_prob", ith("probability", lit(1))).select('label', 'del_prob')

    # Overall best across all levels (initialised once, not per threshold).
    best_score = -1.0
    best_thresh = -1
    for level in range(times_to_zoom):
        search_space = np.linspace(search_center - search_bounds, search_center + search_bounds, granularity)
        level_best_score = -1.0
        level_best_thresh = -1
        for threshold in search_space:
            test_df = (label_prob_df
                       .withColumn('prediction', when((col('del_prob') >= lit(threshold)), 1.0).otherwise(0.0))
                       # MulticlassMetrics reads rows positionally as (prediction, label).
                       .select('prediction', 'label'))
            test_df.cache()
            f1_score = MulticlassMetrics(test_df.rdd).fMeasure(1.0, 1.0)
            test_df.unpersist()
            if f1_score > level_best_score:
                level_best_score = f1_score
                level_best_thresh = threshold
            print("threshold:", threshold, "f1 score:", f1_score)
        print("best score this level:", level_best_score, "at threshold", level_best_thresh)
        if level_best_score > best_score:
            best_score = level_best_score
            best_thresh = level_best_thresh
        # Zoom in around the best threshold seen so far, not the last one evaluated.
        search_center = best_thresh
        search_bounds = search_bounds / 4

    print("overall best threshold:", best_thresh, "with f1 score", best_score)
    return best_thresh

In [0]:
# Tune each fold's threshold on that fold's validation predictions, rather than on the
# holdout predictions that the thresholds are later applied to.
# NOTE(review): these read the `_4_7_` validation files, whereas the fold-training cells above
# write `xgboost_val_preds_4_9_*`. Confirm which run these thresholds belong to.
fold_thresholds = []
for fold_number in range(1, 6):
    fold_val_preds = spark.read.parquet(f'{blob_url}/xgboost_val_preds_4_7_fold_{fold_number}')
    fold_val_preds.cache()
    fold_thresholds.append(find_optimal_threshold(fold_val_preds))
    fold_val_preds.unpersist()

preds_1_thresh, preds_2_thresh, preds_3_thresh, preds_4_thresh, preds_5_thresh = fold_thresholds


#predictions.write.parquet(f"{blob_url}/catboost_2019_predictions_untuned_rebalanced_100")

## Re-labeling on New Thresholds

In [0]:
def infer_new_labels(df, threshold):
    """Re-label predictions with a custom probability threshold and report F1 before/after.

    Args:
        df: predictions DataFrame with ``label``, ``prediction`` and a vector
            ``probability`` column (e.g. the output of ``model.transform``).
        threshold: rows with ``probability[1] >= threshold`` are labelled 1.0, others 0.0.

    Returns:
        Cached DataFrame with columns ``label`` and ``prediction`` (the re-inferred labels).
    """
    def ith_(v, i):
        """Return element ``i`` of ``v`` as a float, or None (null) if unavailable."""
        try:
            return float(v[i])
        except (IndexError, TypeError, ValueError):
            return None

    ith = udf(ith_, DoubleType())
    output = df.withColumn("del_prob",ith("probability", lit(1)))

    # MulticlassMetrics expects rows of exactly (prediction, label) as doubles. Passing the
    # full predictions frame (features, probability, ...) fails its two-column schema.
    orig_pairs = df.select(col('prediction').cast('double'), col('label').cast('double'))
    metrics = MulticlassMetrics(orig_pairs.rdd)
    orig_f1 = metrics.fMeasure(1.0,1.0)

    print("Original f1 score:",orig_f1)

    test_df = output.select('label','del_prob')
    test_df = test_df.withColumn('prediction', when((col('del_prob') >= lit(threshold)), 1.0).otherwise(0.0))
    test_df = test_df.select('label','prediction')
    test_df.cache()
    # Reorder for MulticlassMetrics; the returned frame keeps the (label, prediction) layout.
    test_metrics = MulticlassMetrics(test_df.select('prediction', 'label').rdd)
    new_f1 = test_metrics.fMeasure(1.0,1.0)

    print("New f1 score:", new_f1)

    return test_df

### Apply new labels and save out files

In [0]:
# Apply each fold's validation-tuned threshold to that fold's holdout (test) predictions and
# save the re-labelled (label, prediction) frames. Overwrite mode keeps this cell re-runnable,
# matching the fold-training cells (a plain .write.parquet fails if the path already exists).
reinf_thresholds = [preds_1_thresh, preds_2_thresh, preds_3_thresh, preds_4_thresh, preds_5_thresh]
for fold_number, fold_threshold in enumerate(reinf_thresholds, start=1):
    CV_name = f'fold_{fold_number}'
    save_name = 'xgboost_reinf_4_7_'+CV_name
    test_preds = spark.read.parquet(f'{blob_url}/xgboost_test_preds_4_7_{CV_name}')
    test_preds.cache()
    reinferred = infer_new_labels(df=test_preds, threshold=fold_threshold)
    test_preds.unpersist()
    reinferred.cache()
    reinferred.write.mode("overwrite").parquet(f"{blob_url}/{save_name}")
    reinferred.unpersist()

# Voting and Test Set Performance

In [0]:
# add index to each results column for joining

from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# read in files

preds_1_reinf = spark.read.parquet(f'{blob_url}/xgboost_reinf_4_7_fold_1')
preds_2_reinf = spark.read.parquet(f'{blob_url}/xgboost_reinf_4_7_fold_2')
preds_3_reinf = spark.read.parquet(f'{blob_url}/xgboost_reinf_4_7_fold_3')
preds_4_reinf = spark.read.parquet(f'{blob_url}/xgboost_reinf_4_7_fold_4')
preds_5_reinf = spark.read.parquet(f'{blob_url}/xgboost_reinf_4_7_fold_5')

# verify lengths: the next cell inner-joins the five folds on a positional
# index, which silently drops rows if the folds differ in length.
fold_row_counts = [
    preds_1_reinf.count(),
    preds_2_reinf.count(),
    preds_3_reinf.count(),
    preds_4_reinf.count(),
    preds_5_reinf.count(),
]
print("Rows per fold:", fold_row_counts)
assert len(set(fold_row_counts)) == 1, f"fold prediction sets differ in length: {fold_row_counts}"


def _with_positional_index(fold_df):
    """Append a 0-based `index` column in the order rows are currently read."""
    # NOTE(review): this index reflects whatever order Spark reads each parquet
    # dataset in; it only aligns rows across folds if all five datasets are read
    # back in the same row order. Joining on a real per-flight key would be
    # safer -- confirm. The window has no partitionBy, so Spark moves all rows
    # into a single partition to number them.
    return fold_df.withColumn('index', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)


preds_1_reinf = _with_positional_index(preds_1_reinf)
preds_2_reinf = _with_positional_index(preds_2_reinf)
preds_3_reinf = _with_positional_index(preds_3_reinf)
preds_4_reinf = _with_positional_index(preds_4_reinf)
preds_5_reinf = _with_positional_index(preds_5_reinf)

In [0]:
# Line up the five folds' predictions side by side on the positional `index`.
# Fold 1 contributes all of its columns (including `label`); folds 2-5 add only
# their `prediction`, renamed to pred_<fold>.
all_preds = preds_1_reinf.withColumnRenamed("prediction", "pred_1")
for fold_num, fold_reinf in enumerate([preds_2_reinf, preds_3_reinf, preds_4_reinf, preds_5_reinf], start=2):
    fold_pred_only = fold_reinf.select("index", col("prediction").alias(f"pred_{fold_num}"))
    all_preds = all_preds.join(fold_pred_only, ['index'])

all_preds.cache()

display(all_preds)

In [0]:
# Weighted vote across the five fold models: `weighted_pred` is the weighted sum
# of pred_1..pred_5, and `prediction` is 1 when that sum reaches VOTE_THRESHOLD.
# NOTE(review): the rationale for these specific weights is not recorded here -- document it.
FOLD_VOTE_WEIGHTS = {1: 0.10, 2: 0.15, 3: 0.2, 4: 0.35, 5: 0.2}
VOTE_THRESHOLD = 0.5
assert math.isclose(sum(FOLD_VOTE_WEIGHTS.values()), 1.0), "fold vote weights must sum to 1"

# Build the sum left-to-right in fold order (same expression tree as a written-out
# `w1*p1 + w2*p2 + ...`), so floating-point results are unchanged.
weighted_pred_expr = None
for fold_num, weight in FOLD_VOTE_WEIGHTS.items():
    term = lit(weight) * col(f'pred_{fold_num}')
    weighted_pred_expr = term if weighted_pred_expr is None else weighted_pred_expr + term

all_preds = all_preds.withColumn("weighted_pred", weighted_pred_expr)

# Stored as an int column (0/1).
all_preds = all_preds.withColumn("prediction", (col("weighted_pred") >= lit(VOTE_THRESHOLD)).cast('int'))

all_preds.cache()

display(all_preds)

In [0]:
# Persist the per-fold and weighted-vote predictions.
weighted_preds_path = f"{blob_url}/xgboost_all_folds_weighted_preds"
all_preds.write.parquet(weighted_preds_path)

In [0]:
# Score the weighted-vote predictions on the 2019 test set.
# MulticlassMetrics builds a DataFrame whose `prediction`/`label` fields are
# DoubleType and rejects Python ints, but the voting step casts `prediction` to
# int -- so cast both columns to double before handing the RDD over.
test_df = all_preds.select(
    col('prediction').cast('double').alias('prediction'),
    col('label').cast('double').alias('label'),
)

test_metrics = MulticlassMetrics(test_df.rdd)
# F-measure for label 1.0 with beta = 1.0, i.e. F1 of the positive class.
f1_score = test_metrics.fMeasure(1.0, 1.0)

print("Test set (2019) f1 score:", f1_score)
print("Hurray! (we hope)...")

### Looped inference 2019

For each of the 5 CV-fold models, run inference on the 2019 test data and collect the results in a single dataframe, keyed by a row ID (`_id`) so that the predictions from the different models stay aligned.

The result has 16 columns: an `_id`, plus the label, the prediction, and the raw probability vector for each of the 5 models (5 × 3 = 15).

The thresholding logic lives in the "Thresholding & Voting XGBoost" notebook: https://adb-731998097721284.4.azuredatabricks.net/?o=731998097721284#notebook/1858507102384751/command/1858507102384884

- Next step: apply thresholding to these predictions.

In [0]:
# Alias the fold models under `best_model_fold_N`; the source variables are spelled
# `best_mode1_fold_N` (digit 1 instead of the letter l).
(best_model_fold_1, best_model_fold_2, best_model_fold_3,
 best_model_fold_4, best_model_fold_5) = (best_mode1_fold_1, best_mode1_fold_2, best_mode1_fold_3,
                                          best_mode1_fold_4, best_mode1_fold_5)

In [0]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

In [0]:
# prepare test data for 2019: assemble the model features into one vector column,
# keep only (features, label), and tag every row with a 1-based `_id`.
test_data_2019 = holdout_data  # See cmd 26, line 4.
# Alternative source: df_imported.where((col('YEAR').cast('int') == 2019))

assembler = VectorAssembler(inputCols=feature_list_full, outputCol='vectorized_features')
assmb_test2019 = assembler.transform(test_data_2019)

selected_test2019 = assmb_test2019.select(
    col('vectorized_features').alias('features'),
    col('DEP_DEL15').alias('label'),
)

# row_number() starts at 1. NOTE(review): the window has no partitionBy, so all
# rows are moved into a single partition to be numbered.
id_window = Window.orderBy(monotonically_increasing_id())
selected_test2019_withID = selected_test2019.withColumn("_id", row_number().over(id_window)).cache()

In [0]:
# Number of 2019 hold-out flights to score (also triggers the cache above).
num_flights_test2019 = selected_test2019_withID.count()
print(f"{num_flights_test2019}")

In [0]:
# Spot-check one record: 0-based position 42, i.e. `_id` 43.
sample_position = 42
selected_test2019_withID.filter(selected_test2019_withID["_id"] == sample_position + 1).select("features", "label").show()

In [0]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    """Return element ``i`` of the vector-like ``v`` as a Python float.

    Written to back a UDF that pulls one entry out of a ``probability`` vector
    (see the commented-out usage below).

    Args:
        v: indexable container (e.g. a Spark ML vector), or None for a SQL NULL.
        i: position of the element to extract.

    Returns:
        ``float(v[i])``, or None when ``v`` is None or the element cannot be
        converted to float.
    """
    if v is None:
        # A NULL vector arrives as None; propagate NULL instead of raising TypeError.
        return None
    try:
        return float(v[i])
    except ValueError:
        return None

# ith = udf(ith_, DoubleType())

# output = predictions.withColumn("del_prob",ith("probability", lit(1)))
# display(output)

In [0]:
# Inspect the columns a fold model produces. Builds its own one-row input (the
# record with `_id` == 1) instead of relying on `test_record`, which is only
# assigned in later cells of this section.
probe_record = selected_test2019_withID.where(selected_test2019_withID["_id"] == 1).select("features", "label")
val_predictions_3 = best_model_fold_3.transform(probe_record)
val_predictions_3.columns

In [0]:
## One example: run all five fold models on the record with `_id` == 1 and join
## their outputs side by side; this seeds `val_preds_all_big`.

test_record = selected_test2019_withID.where(selected_test2019_withID["_id"] == 0+1).select("features", "label")

row_idx = 0  # 0-based position of the record; its `_id` is row_idx + 1


def _fold_prediction_columns(model, fold_num, record, row_id):
    """Apply one fold model to `record` and return label/prediction/probability
    renamed with a `_<fold_num>` suffix, plus a literal `_id` column = row_id."""
    return (model.transform(record)
            .select(["label", "prediction", "probability"])
            .withColumnRenamed("label", f"label_{fold_num}")
            .withColumnRenamed("prediction", f"prediction_{fold_num}")
            .withColumnRenamed("probability", f"probability_{fold_num}")
            .withColumn("_id", lit(row_id)))


# model 1-5
val_predictions_1, val_predictions_2, val_predictions_3, val_predictions_4, val_predictions_5 = [
    _fold_prediction_columns(fold_model, fold_num, test_record, row_idx + 1)
    for fold_num, fold_model in enumerate(
        [best_model_fold_1, best_model_fold_2, best_model_fold_3, best_model_fold_4, best_model_fold_5],
        start=1,
    )
]

# chain horizontal joins on the shared `_id`
val_preds_all = (val_predictions_1
                 .join(val_predictions_2, ['_id'])
                 .join(val_predictions_3, ['_id'])
                 .join(val_predictions_4, ['_id'])
                 .join(val_predictions_5, ['_id']))

val_preds_all_big = val_preds_all

display(val_preds_all_big)

_id,label_1,prediction_1,probability_1,label_2,prediction_2,probability_2,label_3,prediction_3,probability_3,label_4,prediction_4,probability_4,label_5,prediction_5,probability_5
1,0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7685880064964294, 0.23141199350357056))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7439590692520142, 0.25604090094566345))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7285400629043579, 0.2714599370956421))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6941097974777222, 0.30589020252227783))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7341816425323486, 0.26581838726997375))"


In [0]:
# Score the whole 2019 hold-out set with each of the five fold models.
#
# The previous version filtered the hold-out set once per flight and grew
# `val_preds_all_big` by one `union` per flight, i.e. a query plan with one branch
# per row, which does not scale to the full test set. Instead, each model
# transforms the full set once; the `_id` column travels with every row through
# `transform`, and the five outputs are aligned by joining on `_id`. This keeps
# the row correspondence the per-record loop was meant to guarantee.
# NOTE(review): this relies on `selected_test2019_withID` being cached so that
# `_id` is assigned once; if the cache were evicted and recomputed in a different
# row order, the ids could differ between the five transforms -- confirm.
# The result covers every `_id` (including 1), so it replaces, rather than
# extends, the single-record seed from the previous cell.
fold_models = [best_model_fold_1, best_model_fold_2, best_model_fold_3, best_model_fold_4, best_model_fold_5]

val_preds_all_big = None
for fold_num, fold_model in enumerate(fold_models, start=1):
    fold_preds = (fold_model.transform(selected_test2019_withID)
                  .select(
                      "_id",
                      col("label").alias(f"label_{fold_num}"),
                      col("prediction").alias(f"prediction_{fold_num}"),
                      col("probability").alias(f"probability_{fold_num}"),
                  ))
    # chain horizontal joins; output columns: _id, label_1, prediction_1, probability_1, ..., probability_5
    val_preds_all_big = fold_preds if val_preds_all_big is None else val_preds_all_big.join(fold_preds, ['_id'])

# Downstream cells read `val_preds_all`; point it at the complete result.
val_preds_all = val_preds_all_big
    

In [0]:
# Save all five models' 2019 predictions (one row per `_id`) before thresholding.
# Write `val_preds_all_big`, which holds every record; `val_preds_all` may hold
# only a single record's predictions.
val_preds_all_big.write.parquet(f"{blob_url}/xgboost_predictions_preThresholding_4_9")

In [0]:
# Show the full set of per-model predictions (`val_preds_all` may hold only one record).
display(val_preds_all_big)