### Notebook Setup

In [0]:
#basic imports
from pyspark.sql.functions import col, count, when, isnan, isnull, percent_rank, monotonically_increasing_id
from pyspark.sql import functions as F
from pyspark.sql import types
from pyspark import StorageLevel

#for EDA/plots
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

#for feature creation
from pyspark.ml.feature import StandardScaler, VectorAssembler, StringIndexer, OneHotEncoder, ChiSqSelector, Bucketizer
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.functions import concat, substring, lit, udf
from pyspark.sql import DataFrame
from pyspark.sql import Window as W

#for modeling
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, NaiveBayes
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import itertools
#for evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from sklearn.metrics import classification_report
from sparkdl.xgboost import XgboostClassifier

In [0]:
# Azure Blob Storage location of the team data
# (container and storage account created in https://portal.azure.com).
blob_container = "team20fp"
storage_account = "w261fp"

# Databricks secret scope / key (created locally with the Databricks CLI)
# that hold the container's SAS token.
secret_scope = "team20scope"
secret_key = "team20key"

mount_path = "/mnt/mids-w261"
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"

# Register the SAS token with Spark so wasbs:// reads against this container authenticate.
spark.conf.set(
    f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope=secret_scope, key=secret_key),
)

In [0]:
# List the root of the team container to see which datasets are available.
display(
    dbutils.fs.ls(blob_url)
)

path,name,size
wasbs://team20fp@w261fp.blob.core.windows.net/airlines_weather_full/,airlines_weather_full/,0
wasbs://team20fp@w261fp.blob.core.windows.net/airport_gps.csv,airport_gps.csv,760437
wasbs://team20fp@w261fp.blob.core.windows.net/airport_utc.csv,airport_utc.csv,1127225
wasbs://team20fp@w261fp.blob.core.windows.net/airports_weatherStations_joined/,airports_weatherStations_joined/,0
wasbs://team20fp@w261fp.blob.core.windows.net/df_carrier_delayed_flights_vol/,df_carrier_delayed_flights_vol/,0
wasbs://team20fp@w261fp.blob.core.windows.net/df_carrier_flights_vol/,df_carrier_flights_vol/,0
wasbs://team20fp@w261fp.blob.core.windows.net/df_flight_delay_proportion/,df_flight_delay_proportion/,0
wasbs://team20fp@w261fp.blob.core.windows.net/df_flight_delay_proportion2/,df_flight_delay_proportion2/,0
wasbs://team20fp@w261fp.blob.core.windows.net/df_planes_by_carrier/,df_planes_by_carrier/,0
wasbs://team20fp@w261fp.blob.core.windows.net/feature_set_full_sorted/,feature_set_full_sorted/,0


### Import Data

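The cell below loads both the training set and the 2019 test set from `final_sets/`, renaming the target column `DEP_DEL15` to `label`. Both loads are already active, so there are no commented-out lines left to uncomment.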

In [0]:
# Read the prepared splits; the target DEP_DEL15 becomes the 'label' column used by the ML stages.
df_train = (spark.read
                 .parquet(f"{blob_url}/final_sets/train")
                 .withColumnRenamed('DEP_DEL15', 'label'))
df_test = (spark.read
                .parquet(f"{blob_url}/final_sets/test")
                .withColumnRenamed('DEP_DEL15', 'label'))

In [0]:
# Sanity check: number of distinct OP_CARRIER values in the training data.
(df_train
    .select('OP_CARRIER')
    .dropDuplicates()
    .count())

### Modeling Helper Functions

In [0]:
################# functions for prepping data ###################

def balance_trainset(train_df, up_or_down = 'up'):
    '''Balance the two classes of ``label`` in a training DataFrame by resampling.

    Parameters
    ----------
    train_df : DataFrame
        Spark DataFrame with a ``label`` column; rows with label 1 and label 0
        form the two classes (all other label values are dropped).
    up_or_down : {'up', 'down'}
        'up'   -- oversample the label-1 rows with replacement, using fraction
                  num_label0 / num_label1.
        'down' -- undersample the label-0 rows without replacement, using
                  fraction num_label1 / num_label0.

    Returns
    -------
    DataFrame
        Union of the label-1 and label-0 rows after resampling. Class sizes are
        only approximately equal because ``sample`` draws a fraction, not an
        exact count.

    Raises
    ------
    ValueError
        If ``up_or_down`` is not 'up' or 'down', or the class used as the
        divisor of the sampling fraction has no rows.
    '''
    if up_or_down not in ('up', 'down'):
        raise ValueError(f"up_or_down must be 'up' or 'down', got {up_or_down!r}")

    delay_df = train_df.filter(F.col('label') == 1)
    ontime_df = train_df.filter(F.col('label') == 0)
    num_delay = delay_df.count()
    num_ontime = ontime_df.count()

    if up_or_down == 'up':
        if num_delay == 0:
            raise ValueError('Cannot upsample: train_df has no rows with label == 1')
        pct = num_ontime / num_delay
        # With replacement so a fraction > 1 can repeat rows.
        delay_df = delay_df.sample(withReplacement = True, fraction = pct, seed = 1)
    else:
        if num_ontime == 0:
            raise ValueError('Cannot downsample: train_df has no rows with label == 0')
        pct = num_delay / num_ontime
        # NOTE(review): if label-1 rows outnumber label-0 rows, pct > 1 and Spark
        # rejects sampling without replacement -- 'down' assumes label 0 is the majority.
        ontime_df = ontime_df.sample(withReplacement = False, fraction = pct, seed = 1)

    train_balance = delay_df.union(ontime_df)
    print(f'Balancing factor: {round(pct,2)}')
    print(f'Num observations after balancing: {train_balance.count()}')
    return train_balance

################### functions for modeling ######################

def lr_pipeline(model, cts_features, cat_features, bucket_features = None, splits_array = None):
    ''' Pipeline to scale continuous features and encode categorical features.

    Stages, in order: assemble + standardize the continuous features, bucketize
    ``bucket_features`` (only if any are given), string-index and one-hot encode
    the categorical and bucketed columns, assemble everything into ``features``,
    then ``model``.

    Parameters
    ----------
    model : pyspark.ml estimator
        Final stage; reads the assembled ``features`` column.
    cts_features : list of str
        Continuous columns. They are assembled with ``handleInvalid='skip'``, which
        drops rows holding null/NaN in any of them -- when fitting AND when
        transforming (so scored DataFrames can lose rows).
    cat_features : list of str
        Categorical columns (string or numeric) to index and one-hot encode.
    bucket_features : list of str or None
        Continuous columns to discretize with ``Bucketizer`` before encoding.
        None or an empty list skips the bucketing stage.
    splits_array : list of list of float or None
        Split points per entry of ``bucket_features`` (same order). When None,
        the splits chosen for this notebook's nine ``bucket_vars`` are used.

    Returns
    -------
    Pipeline
        Unfitted pipeline.

    Raises
    ------
    ValueError
        If the number of split lists differs from the number of bucket features.
    '''
    cat_features = list(cat_features)
    bucket_features = list(bucket_features or [])

    # continuous features: assemble, then standardize (zero mean, unit std)
    vect_cts = VectorAssembler(inputCols = cts_features,
                               outputCol = 'cts_feats',
                               handleInvalid = 'skip')
    scaler = StandardScaler(inputCol = 'cts_feats',
                            outputCol = 'scaled_cts_feats',
                            withStd = True, withMean = True)
    stages = [vect_cts, scaler]

    if bucket_features:
        if splits_array is None:
            inf = float('inf')
            fpd_splits = [0, 10, 25, 50, 100, 250, 500, 1000, inf]
            pct_splits = [0, 0.20, 0.35, 0.55, 0.75, inf]
            splits_array = [list(fpd_splits),         # orig_fpd
                            list(fpd_splits),         # dest_fpd
                            [-inf, 4023, inf],        # vis
                            [-inf, -156, inf],        # air_tmp
                            [1, 2, 3, 4, 5, 7, inf],  # distance (group)
                            list(pct_splits),         # pct carrier del
                            list(pct_splits),         # pct route del
                            list(pct_splits),         # pct orig del
                            list(pct_splits)]         # pct dest del
        if len(splits_array) != len(bucket_features):
            raise ValueError(f'splits_array has {len(splits_array)} split lists but '
                             f'{len(bucket_features)} bucket_features were given')
        stages.append(Bucketizer(inputCols = bucket_features,
                                 splitsArray = splits_array,
                                 outputCols = [name + '_bucket' for name in bucket_features]))

    encoded = cat_features + bucket_features
    # StringIndexer also works on numeric categoricals (converted to string, then indexed);
    # 'keep' gives labels unseen during fit their own extra index instead of failing.
    indexer = StringIndexer(inputCols = cat_features + [name + '_bucket' for name in bucket_features],
                            outputCols = [name + '_idx' for name in encoded],
                            handleInvalid = 'keep')
    onehot_feats = OneHotEncoder(inputCols = [name + '_idx' for name in encoded],
                                 outputCols = [name + '_enc' for name in encoded])
    vect_cat = VectorAssembler(inputCols = [name + '_enc' for name in encoded],
                               outputCol = 'cat_feats')

    # combine scaled continuous and encoded categorical vectors
    combined_vect = VectorAssembler(inputCols = ['scaled_cts_feats', 'cat_feats'],
                                    outputCol = 'features')

    stages += [indexer, onehot_feats, vect_cat, combined_vect, model]
    return Pipeline(stages = stages)

# tree algo pipeline - no scaler, no feature selector
def tree_pipeline(model, cts_features, cat_features, bucket_features = None, splits_array = None):
    '''Pipeline for tree models: raw continuous features plus encoded categoricals.

    Same as the LR pipeline but without standardization. Stages, in order:
    assemble the continuous features, bucketize ``bucket_features`` (only if any
    are given), string-index and one-hot encode the categorical and bucketed
    columns, assemble everything into ``features``, then ``model``.

    Parameters
    ----------
    model : pyspark.ml estimator
        Final stage; reads the assembled ``features`` column.
    cts_features : list of str
        Continuous columns. They are assembled with ``handleInvalid='skip'``, which
        drops rows holding null/NaN in any of them -- when fitting AND when
        transforming (so scored DataFrames can lose rows).
    cat_features : list of str
        Categorical columns (string or numeric) to index and one-hot encode.
    bucket_features : list of str or None
        Continuous columns to discretize with ``Bucketizer`` before encoding.
        None or an empty list skips the bucketing stage.
    splits_array : list of list of float or None
        Split points per entry of ``bucket_features`` (same order). When None,
        the splits chosen for this notebook's nine ``bucket_vars`` are used.

    Returns
    -------
    Pipeline
        Unfitted pipeline.

    Raises
    ------
    ValueError
        If the number of split lists differs from the number of bucket features.
    '''
    cat_features = list(cat_features)
    bucket_features = list(bucket_features or [])

    # continuous features, left unscaled for tree models
    vect_cts = VectorAssembler(inputCols = cts_features,
                               outputCol = 'cts_feats',
                               handleInvalid = 'skip')
    stages = [vect_cts]

    if bucket_features:
        if splits_array is None:
            inf = float('inf')
            fpd_splits = [0, 10, 25, 50, 100, 250, 500, 1000, inf]
            pct_splits = [0, 0.20, 0.35, 0.55, 0.75, inf]
            splits_array = [list(fpd_splits),         # orig_fpd
                            list(fpd_splits),         # dest_fpd
                            [-inf, 4023, inf],        # vis
                            [-inf, -156, inf],        # air_tmp
                            [1, 2, 3, 4, 5, 7, inf],  # distance (group)
                            list(pct_splits),         # pct carrier del
                            list(pct_splits),         # pct route del
                            list(pct_splits),         # pct orig del
                            list(pct_splits)]         # pct dest del
        if len(splits_array) != len(bucket_features):
            raise ValueError(f'splits_array has {len(splits_array)} split lists but '
                             f'{len(bucket_features)} bucket_features were given')
        stages.append(Bucketizer(inputCols = bucket_features,
                                 splitsArray = splits_array,
                                 outputCols = [name + '_bucket' for name in bucket_features]))

    encoded = cat_features + bucket_features
    # 'keep' gives labels unseen during fit their own extra index instead of failing.
    indexer = StringIndexer(inputCols = cat_features + [name + '_bucket' for name in bucket_features],
                            outputCols = [name + '_idx' for name in encoded],
                            handleInvalid = 'keep')
    onehot_feats = OneHotEncoder(inputCols = [name + '_idx' for name in encoded],
                                 outputCols = [name + '_enc' for name in encoded])
    vect_cat = VectorAssembler(inputCols = [name + '_enc' for name in encoded],
                               outputCol = 'cat_feats')

    combined_vect = VectorAssembler(inputCols = ['cts_feats', 'cat_feats'],
                                    outputCol = 'features')

    stages += [indexer, onehot_feats, vect_cat, combined_vect, model]
    return Pipeline(stages = stages)

    
def gen_model_pipeline(model_type, param_dict, cts_features, cat_features, bucket_features = None):
    '''Build the estimator for ``model_type`` from ``param_dict`` and wrap it in a Pipeline.

    Parameters
    ----------
    model_type : {'lr', 'gbt', 'xgb'}
        Which estimator to build.
    param_dict : dict
        Maps ``model_type`` to a dict of hyperparameters for that estimator.
        'lr'  : requires 'regParam'.
        'gbt' : requires 'maxDepth', 'maxBins', 'maxIter', 'stepSize'.
        'xgb' : all keys optional -- each of max_depth, n_estimators, reg_lambda,
                reg_alpha, objective, base_score, gamma, scale_pos_weight,
                min_child_weight, learning_rate, max_bin, tree_method that is
                present is forwarded to XgboostClassifier; absent ones keep the
                library default; other keys are ignored.
    cts_features, cat_features, bucket_features :
        Column lists passed unchanged to lr_pipeline / tree_pipeline.

    Returns
    -------
    Pipeline
        ``lr_pipeline(...)`` for 'lr'; ``tree_pipeline(...)`` for 'gbt' and 'xgb'.

    Raises
    ------
    ValueError
        If ``model_type`` is not 'lr', 'gbt' or 'xgb'.
    KeyError
        If ``param_dict`` has no entry for ``model_type``, or a required
        'lr'/'gbt' hyperparameter is missing.
    '''
    supported = ('lr', 'gbt', 'xgb')
    if model_type not in supported:
        raise ValueError(f'Unsupported model_type {model_type!r}; expected one of {supported}')
    params = param_dict[model_type]

    if model_type == 'lr':
        lr = LogisticRegression(regParam = params['regParam'])
        return lr_pipeline(lr, cts_features, cat_features, bucket_features)

    if model_type == 'gbt':
        gbt = GBTClassifier(maxDepth = params['maxDepth'],
                            maxBins = params['maxBins'],
                            maxIter = params['maxIter'],
                            stepSize = params['stepSize'])
        return tree_pipeline(gbt, cts_features, cat_features, bucket_features)

    # 'xgb': forward only the hyperparameters actually supplied, so a params dict
    # without e.g. 'scale_pos_weight' does not raise KeyError, and 'tree_method'
    # reaches the estimator instead of being silently dropped.
    # ('booster' and 'max_delta_step' are intentionally not forwarded.)
    xgb_param_names = ('max_depth', 'n_estimators', 'reg_lambda', 'reg_alpha',
                       'objective', 'base_score', 'gamma', 'scale_pos_weight',
                       'min_child_weight', 'learning_rate', 'max_bin', 'tree_method')
    xgb_kwargs = {name: params[name] for name in xgb_param_names if name in params}
    xgb = XgboostClassifier(labelCol = 'label',
                            featuresCol = 'features',
                            # NOTE(review): 0.0 is declared the missing-value marker,
                            # presumably to match implicit zeros in sparse one-hot vectors.
                            missing = 0.0,
                            # NOTE(review): raw predictions are written to 'probability'; kept as-is.
                            rawPredictionCol = 'probability',
                            **xgb_kwargs)
    return tree_pipeline(xgb, cts_features, cat_features, bucket_features)




def fit_model(train_df, model_type, param_dict, cts_features, cat_features, bucket_features = None, balance_type = 'up', pipeline = None):
    ''' Balance train_df, build the model pipeline (unless one is supplied) and fit it.

    Parameters
    ----------
    train_df : DataFrame
        Training data with a ``label`` column.
    model_type, param_dict, cts_features, cat_features, bucket_features :
        Passed to ``gen_model_pipeline`` when ``pipeline`` is None.
    balance_type : {'up', 'down', 'weight', None}
        'up' / 'down' resample with ``balance_trainset``; 'weight' calls
        ``weight_classes`` (NOTE(review): not defined in the visible notebook --
        confirm it exists before using 'weight'); None trains on ``train_df`` as-is.
    pipeline : Pipeline or None
        Prebuilt pipeline to fit; when None one is generated.

    Returns
    -------
    PipelineModel
        The fitted pipeline.

    Raises
    ------
    ValueError
        If ``balance_type`` is not one of the values above.
    '''
    if balance_type not in ('up', 'down', 'weight', None):
        raise ValueError(f"balance_type must be 'up', 'down', 'weight' or None, got {balance_type!r}")

    # Only a DataFrame created here is persisted / unpersisted, so the caller's
    # train_df cache state is never touched.
    balanced_df = None
    if balance_type in ('up', 'down'):
        balanced_df = balance_trainset(train_df, up_or_down = balance_type)
    elif balance_type == 'weight':
        balanced_df = weight_classes(train_df)

    if balanced_df is not None:
        balanced_df = balanced_df.persist(StorageLevel.MEMORY_AND_DISK)

    try:
        if pipeline is None:
            pipeline = gen_model_pipeline(model_type, param_dict, cts_features, cat_features, bucket_features)
        return pipeline.fit(balanced_df if balanced_df is not None else train_df)
    finally:
        # release the cached balanced data even if fitting fails
        if balanced_df is not None:
            balanced_df.unpersist()


################# functions for evaluation ######################

def eval_p_r_f2(df, acc = True, label_col = 'label', prediction_col = 'prediction'):
    '''Precision, recall and F2 for the positive class (label 1.0) of a predictions DataFrame.

    Parameters
    ----------
    df : DataFrame
        Scored DataFrame holding ``prediction_col`` and ``label_col``.
    acc : bool
        When True, overall accuracy is appended to the returned tuple.
    label_col, prediction_col : str
        Column names; default to the ones the modeling pipeline produces.

    Returns
    -------
    tuple
        (precision, recall, f2) or (precision, recall, f2, accuracy) when ``acc``.
    '''
    # MulticlassMetrics expects (prediction, label) float pairs; cast explicitly so
    # an integer-typed label column does not break it.
    pred_rdd = (df.select(F.col(prediction_col).cast('double').alias('prediction'),
                          F.col(label_col).cast('double').alias('label'))
                  .rdd
                  .map(tuple))
    multi_metrics = MulticlassMetrics(pred_rdd)
    precision = multi_metrics.precision(1.0)
    recall = multi_metrics.recall(1.0)
    f2 = multi_metrics.fMeasure(1.0, 2.0)  # beta = 2 weights recall over precision
    if acc:
        return (precision, recall, f2, multi_metrics.accuracy)
    return (precision, recall, f2)
  
 


#### Define Featureset

In [0]:
# Continuous features (fed to the pipelines' cts_features argument).
cts_vars = [
    'PRE_FL_WINDOW',
    'CUMAVG_WND_DIR_WEEKLY',
    'CUMAVG_DEW_WEEKLY',
    'CUMAVG_VIS_WEEKLY',
    'CUMAVG_SLP_WEEKLY',
    'CUMAVG_WND_SPEED_WEEKLY',
    'CUMAVG_CEIL_HEIGHT_WEEKLY',
    'CUMAVG_AIR_TMP_WEEKLY',
    'CUMAVG_DEP_DELAY_WEEKLY',
    'CUMAVG_DEP_DEL15_WEEKLY',
    'CUMAVG_ARR_DELAY_WEEKLY',
    'CUMAVG_ARR_DEL15_WEEKLY',
]

# Categorical features (string-indexed, then one-hot encoded).
cat_vars = [
    'ORIGIN', 'YEAR', 'QUARTER', 'DEST', 'MONTH',
    'DAY_OF_WEEK', 'OP_CARRIER', 'PRIOR_ARR_DEL', 'PRIOR_DEP_DEL', 'ORIG_DEST',
]

# Continuous features that are bucketized before encoding; the order must match
# the pipelines' splitsArray entries.
bucket_vars = [
    'ORIG_FPD', 'DEST_FPD', 'VIS', 'AIR_TMP', 'DISTANCE_GROUP',
    'PCT_CARRIER_DEL', 'PCT_ROUTE_DEL', 'PCT_ORIG_DEL', 'PCT_DEST_DEL',
]


#### Define Model and Train

In [0]:
# XGBoost hyperparameters, keyed by model type for gen_model_pipeline.
# scale_pos_weight is passed explicitly so the XGBoost pipeline builder always finds it.
# 1 is the XGBoost default (no extra positive-class weighting), which fits a training
# set already balanced by down-sampling.
params = {'xgb': {'max_depth': 6,
                  'n_estimators': 150,
                  'reg_lambda': 1,
                  'reg_alpha': 0.2,
                  'tree_method': 'hist',
                  'objective': 'binary:logistic',
                  'base_score': 0.5,
                  'gamma': 0.05,
                  'min_child_weight': 1.5,
                  'scale_pos_weight': 1,
                  'max_bin': 50,
                  'learning_rate': 0.2}}

xgb_model_2 = fit_model(df_train,
                        'xgb',
                        params,
                        cts_features = cts_vars,
                        cat_features = cat_vars,
                        bucket_features = bucket_vars,
                        balance_type = 'down')

#### Evaluate

In [0]:
# Score the test split with the fitted pipeline, then compute label-1 precision,
# recall and F2 plus overall accuracy.
val_pred2 = xgb_model_2.transform(df_test)
(precision_2,
 recall_2,
 f2_2,
 score_2) = eval_p_r_f2(val_pred2, acc = True)

In [0]:
# One metric per line.
print(f'Precision: {precision_2}',
      f'Recall: {recall_2}',
      f'F2: {f2_2}',
      f'Accuracy: {score_2}',
      sep='\n')