# Model Experimentation

## Notebook Setup

In [0]:
%pip install timezonefinder
%pip install tzfpy

Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
# General 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import sys
from statistics import mean
import itertools
import mlflow.spark

# PySpark 
from pyspark.sql.functions import col,isnan,when,count
from pyspark.sql.functions import regexp_replace

# SQL Functions
from pyspark.sql import functions as f
from pyspark.sql.functions import monotonically_increasing_id, to_timestamp, to_utc_timestamp, to_date
from pyspark.sql.functions import isnan, when, count, col, isnull, percent_rank, first, dense_rank
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType, FloatType, DecimalType
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from functools import reduce
from pyspark.sql.functions import rand,col,when,concat,substring,lit,udf,lower,sum as ps_sum,count as ps_count,row_number
from pyspark.sql.window import *
from pyspark.sql import DataFrame
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.sql.functions import row_number

# ML
from pyspark.ml.stat import Correlation
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor#, MultilayerPerceptronRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator#, MulticlassRegressionEvaluator

# Misc 
from pandas.tseries.holiday import USFederalHolidayCalendar as calendar
from timezonefinder import TimezoneFinder
from tzfpy import get_tz



In [0]:
# Display and define where mids-w261 is located
data_BASE_DIR = "dbfs:/mnt/mids-w261/"
# display(dbutils.fs.ls(f"{data_BASE_DIR}"))

# Inspect the Mount's Final Project folder 
data_BASE_DIR = "dbfs:/mnt/mids-w261/datasets_final_project_2022/"
# display(dbutils.fs.ls(f"{data_BASE_DIR}"))

In [0]:
blob_container = "housestark" # The name of your container created in https://portal.azure.com
storage_account = "neilp" # The name of your Storage account created in https://portal.azure.com
secret_scope = "w261_s1g4" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "w261_s1g4_key" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
# df = spark.read.parquet(f"{blob_url}/df_main_3m")
df_full = spark.read.parquet(f"{blob_url}/df_main_fullClean")

## Helper Functions

In [0]:
def preModeling_dataEdit(df):
  '''
  Input: df that has already gone through the final join, cleaning, and feature engineering
  Output: df that includes null imputing and # and % of flights (by tail number) that were delayed and cancelled in the past 90 days --> these depend on window functions, as such they need to be done right after the data is split for modelling and not during feature engineering phase
  '''
  
  ### FINAL CLEANING 
  # Remove rows with null scheduled_departure_UTC because these are rows without a proper timezone (timezonefinder could not find)
  df = df.na.drop(subset=["scheduled_departure_UTC"])
  dropCols = ['TAXI_IN', 'TAXI_OUT']
  df = df.drop(*dropCols) 

  
  ### FINAL FEATURE ADDITIONS
  ## GET NUMBER & PERCENTAGE OF TIMES A PLANE (BY TAIL NUMBER) HAS BEEN DELAYED OR CANCELLED IN THE PAST 3 MONTHS (2 COLUMNS)
  # Make window function
  df = df.withColumn('roundedMonth', f.date_trunc('month', df.scheduled_departure_UTC))
  window_3m = Window().partitionBy('TAIL_NUM').orderBy(f.col('roundedMonth').cast('long')).rangeBetween(-(86400), 0) # changed to 1 day instead of 3 months 

  # Add in Columns
  # Number of flights delayed/cancelled
  df = df.withColumn('no_delays_last1d', when(df.TAIL_NUM.isNotNull(), f.sum('dep_delay_15').over(window_3m)).otherwise(-1)) \
         .withColumn('no_cancellation_last1d', when(df.TAIL_NUM.isNotNull(), f.sum('CANCELLED').over(window_3m)).otherwise(-1)) 
  # Percentage of flights delayed/cancelled
  df = df.withColumn('count_flights_last1d', when(df.TAIL_NUM.isNotNull(), f.count('TAIL_NUM').over(window_3m)).otherwise(-1)) 
  df = df.withColumn('perc_delays_last1d', when(df.count_flights_last1d != -1, (df.no_delays_last1d/ df.count_flights_last1d)).otherwise(-1.0)) \
         .withColumn('perc_cancellation_last1d', when(df.count_flights_last1d != -1, (df.no_cancellation_last1d/ df.count_flights_last1d)).otherwise(-1.0))     
  
  ### HANDLING NULLS
  ## Imputing Hourly Weather Data to the best of our ability (up to 3 hours back)
  window = Window.partitionBy(col("ORIGIN_AIRPORT_ID"))\
                     .orderBy(col("rounded_depTimestamp"))\
                     .rowsBetween(0,3)
  
  cols_to_fill  = ['origin_HourlyAltimeterSetting', 'origin_HourlyDewPointTemperature', 'origin_HourlyDryBulbTemperature', 'origin_HourlyPrecipitation', 'origin_HourlyPressureChange', 'origin_HourlyPressureTendency', 'origin_HourlyRelativeHumidity', 'origin_HourlySeaLevelPressure', 'origin_HourlyStationPressure', 'origin_HourlyVisibility', 'origin_HourlyWetBulbTemperature', 'origin_HourlyWindDirection', 'origin_HourlyWindGustSpeed', 'origin_HourlyWindSpeed', 'origin_HourlySkyConditions_SCT_cnt', 'origin_HourlySkyConditions_OVC_cnt', 'origin_HourlySkyConditions_FEW_cnt', 'origin_HourlySkyConditions_BKN_cnt', 'origin_HourlySkyConditions_VV_cnt', 'origin_HourlySkyConditions_SKC_cnt', 'origin_HourlySkyConditions_CLR_cnt', 'dest_HourlyAltimeterSetting', 'dest_HourlyDewPointTemperature', 'dest_HourlyDryBulbTemperature', 'dest_HourlyPrecipitation', 'dest_HourlyPressureChange', 'dest_HourlyPressureTendency', 'dest_HourlyRelativeHumidity', 'dest_HourlySeaLevelPressure', 'dest_HourlyStationPressure', 'dest_HourlyVisibility', 'dest_HourlyWetBulbTemperature', 'dest_HourlyWindDirection','dest_HourlyWindGustSpeed', 'dest_HourlyWindSpeed', 'dest_HourlySkyConditions_SCT_cnt', 'dest_HourlySkyConditions_OVC_cnt', 'dest_HourlySkyConditions_FEW_cnt', 'dest_HourlySkyConditions_BKN_cnt', 'dest_HourlySkyConditions_VV_cnt', 'dest_HourlySkyConditions_SKC_cnt', 'dest_HourlySkyConditions_CLR_cnt']

  
  for field in cols_to_fill:
      filled_column_start = first(df[field], ignorenulls=True).over(window)
      df = df.withColumn(field, filled_column_start)
  
  ## We are still left with some null values --> will deal with them now in accordance to the table in section VII of this notebook
  impute_minus1int = ['DEP_DELAY_NEW', 'holiday' ,'holiday_in2DayRange']
  df = df.na.fill(value = -1,subset = impute_minus1int)
  
  impute_minus9999int = ['DEP_DELAY']
  df = df.na.fill(value = -9999,subset = impute_minus9999int)
  
  impute_minus1fl = ['perc_delays_last1d', 'perc_cancellation_last1d']
  df = df.na.fill(value = -1.0,subset = impute_minus1fl)
  
  impute_minus9999int = ['elevation_ft']
  df = df.na.fill(value = -9999,subset = impute_minus9999int)
  
  impute_99int = [ 'origin_HourlyRelativeHumidity', 'dest_HourlyRelativeHumidity']
  df = df.na.fill(value = 99 ,subset = impute_99int)
  
  impute_99fl = ['origin_HourlyPrecipitation', 'dest_HourlyPrecipitation']
  df = df.na.fill(value = 99.0 ,subset = impute_99fl)
  
  impute_999int = ['origin_HourlyPressureTendency', 'dest_HourlyPressureTendency']
  df = df.na.fill(value = 999 ,subset = impute_999int)
  
  impute_999fl = ['origin_HourlyPressureChange', 'dest_HourlyPressureChange']
  df = df.na.fill(value = 999.0 ,subset = impute_999fl)
  
  impute_9999int = ['origin_HourlyDewPointTemperature', 'origin_HourlyDryBulbTemperature', 'origin_HourlyWetBulbTemperature', 'origin_HourlyWindGustSpeed', 'dest_HourlyDewPointTemperature', 'dest_HourlyDryBulbTemperature', 'dest_HourlyWetBulbTemperature', 'dest_HourlyWindGustSpeed']
  df = df.na.fill(value = 9999 ,subset = impute_9999int)
    
  impute_99999int = ['origin_HourlyWindDirection', 'origin_HourlyWindSpeed', 'dest_HourlyWindDirection', 'dest_HourlyWindSpeed']
  df = df.na.fill(value = 99999 ,subset = impute_99999int)
  
  impute_99999fl = ['origin_HourlyAltimeterSetting',  'dest_HourlyAltimeterSetting', 'origin_HourlySeaLevelPressure','dest_HourlySeaLevelPressure', 'origin_HourlyStationPressure', 'dest_HourlyStationPressure']
  df = df.na.fill(value = 99999.0 ,subset = impute_99999fl)
  
  impute_999999fl = ['origin_HourlyVisibility', 'dest_HourlyVisibility']
  df = df.na.fill(value = 999999.0 ,subset = impute_999999fl)
  
  impute_str = ['TAIL_NUM', 'type', 'origin_HourlySkyConditions', 'dest_HourlySkyConditions', 'local_timestamp', 'timezone']
  df = df.na.fill(value = 'no_data',subset = impute_str)
  
  imputed_cols  = cols_to_fill + ['perc_delays_last1d', 'perc_cancellation_last1d', 'elevation_ft']
  
  return df,imputed_cols

In [0]:
# Function to create pipeline
def create_pipeline(df, inputCols_cat, inputCols_cont):
  """Creates a feature engineering pipeline for modeling 
  Args:
    inputCols_cat (list): list of categorical input cols
    inputCols_cont (list): list of continuous input cols 
  
  Returns: 
    pipeline (Pipeline): MLlib pipeline with stages  
  """
  
  # String Indexer
  inputCols_categorical_indexed = [f'{i}_index' for i in inputCols_cat]
  string_indexer = StringIndexer(inputCols = inputCols_cat, 
                                 outputCols = inputCols_categorical_indexed).setHandleInvalid('keep')

  # One Hot Encoder  
  inputCols_categorical_encoded = [f'{i}_encoded' for i in inputCols_categorical_indexed]
  one_hot_encoder = OneHotEncoder(inputCols = inputCols_categorical_indexed, 
                                  outputCols = inputCols_categorical_encoded)

  # Vector Assembler (Categorical)
  assembler_cat = VectorAssembler(inputCols = inputCols_categorical_encoded, 
                              outputCol = 'features_cat').setHandleInvalid('keep')
  
  # Vector Assembler (Continuous)
  assembler_cont = VectorAssembler(inputCols = inputCols_cont, 
                              outputCol = 'features_cont').setHandleInvalid('keep')

  # Pipeline
  return Pipeline().setStages([string_indexer, one_hot_encoder, assembler_cat, assembler_cont])

In [0]:
def impute_and_scale_features(df):
  
  # Impute data 
  imputed_df, imputed_cols = preModeling_dataEdit(df)

  # Vector Assembler (Continuous) 
  assembler_cont = VectorAssembler(inputCols = ['features_cont'] + imputed_cols, 
                              outputCol = 'features_cont_all').setHandleInvalid('keep')

  # Standard Scaler 
  scaler = StandardScaler(inputCol = 'features_cont_all',
                          outputCol = 'features_scaled',
                          withMean = True, withStd = True)
  
  # Vector Assembler (Continuous + Categorical) 
  assembler_all = VectorAssembler(inputCols = ['features_scaled', 'features_cat'], 
                              outputCol = 'features_all').setHandleInvalid('keep')

  pipeline = Pipeline().setStages([assembler_cont, scaler, assembler_all])

  # Create features_scaled for all dfs
  pipeline_df = pipeline.fit(imputed_df).transform(imputed_df) 
  
  return  pipeline_df

In [0]:
def get_sampling(train_df, sampling):
  """Modifies the training data to under/over sample 
  Args:
    train_df (df): training data
    sampling (string): if none, no sampling is performed; if under, undersampling is performed; if over, oversampling is performed 
  Returns:
    train_df_sampled (df): modified training data 
  """
  train_df = train_df.filter(col('label') != 2)
  # No sampling 
  if sampling == 'none':
    return train_df
  
  # Undersampling
  elif sampling == 'under':
    no_delay = train_df.filter(col('label') == 0)
    delay = train_df.filter(col('label') == 1)
    
    class_ratio =  delay.count() / no_delay.count()
    no_delay_sample = no_delay.sample(withReplacement=True, fraction=class_ratio)
    train_df_sampled = delay.unionAll(no_delay_sample)
    
    return train_df_sampled
    
  # Oversampling
  elif sampling == 'over':
    no_delay = train_df.filter(col('label') == 0) #3000
    delay = train_df.filter(col('label') == 1) #700
    
    class_ratio = no_delay.count() / delay.count() #0.2
    delay_sample = delay.sample(withReplacement=True, fraction=class_ratio)
    train_df_sampled = no_delay.unionAll(delay_sample)
    
    return train_df_sampled

In [0]:
def get_model(model_type, params, feature_count=None):
  """Builds a model based on the given parameters
  Args:
    model_type (string): type of model to be built 
    params (dict): dictionary of parameters specific to the model_type
  Returns:
    model: MLlib model ready to be trained 
    ml_type (string): type of model (classification or regression)
  """
  # Logistic Regression
  if model_type == 'LogisticRegression':
    ml_type = 'c'
    model = LogisticRegression(featuresCol = 'features_all',
                               labelCol = 'label',
                               maxIter = params['maxIter'],
                               regParam = params['regParam'],
                               elasticNetParam = params['elasticNetParam'])

  # Linear Regression
  elif model_type == 'LinearRegression':
    ml_type = 'r'
    model = LinearRegression(featuresCol = 'features_all',
                             labelCol = 'DEP_DELAY_NEW',
                             maxIter = params['maxIter'],
                             regParam = params['regParam'],
                             elasticNetParam = params['elasticNetParam'])

  # Decision Tree Classifier
  elif model_type == 'DecisionTreeClassifier':
    ml_type = 'c'
    model = DecisionTreeClassifier(featuresCol = 'features_all',
                                   labelCol = 'label',
                                   maxDepth = params['maxDepth'],
                                   impurity = params['impurity'],
                                   maxBins = params['maxBins'],
                                   minInfoGain = params['minInfoGain'])
    
  # Decision Tree Regressor
  elif model_type == 'DecisionTreeRegressor':
    ml_type = 'r'
    model = DecisionTreeRegressor(featuresCol = 'features_all',
                                  labelCol = 'DEP_DELAY_NEW',
                                  maxDepth = params['maxDepth'],
                                  minInfoGain = params['minInfoGain'])

  # Random Forest Classifier
  elif model_type == 'RandomForestClassifier':
    ml_type = 'c'
    model = RandomForestClassifier(featuresCol = 'features_all',
                                   labelCol='label',
                                   numTrees= params['numTrees'], 
                                   maxDepth=params['maxDepth'], 
                                   impurity = params['impurity'],
                                   maxBins = params['maxBins'],
                                   minInfoGain = params['minInfoGain'])
  
  # Random Forest Regressor
  elif model_type == 'RandomForestRegressor':
    ml_type = 'r'
    model = RandomForestRegressor(featuresCol = 'features_all',
                                   labelCol='DEP_DELAY_NEW',
                                   numTrees= params['numTrees'], 
                                   maxDepth=params['maxDepth'],
                                   minInfoGain = params['minInfoGain'])

  # Gradient Boosted Tree Regressor 
  elif model_type == 'GBTRegressor':
    ml_type = 'r'
    model = GBTRegressor(featuresCol = 'features_all',
                         labelCol='DEP_DELAY_NEW',
                         maxIter= params['maxIter'], 
                         maxDepth=params['maxDepth'],
                         stepSize = params['stepSize'],
                         minInfoGain = params['minInfoGain'])

  # MLP NN Classifier 
  elif model_type == 'MultilayerPerceptronClassifier':
    ml_type = 'c'
    model = MultilayerPerceptronClassifier(featuresCol = 'features_all',
                         labelCol='label',
                         layers = [feature_count, 5, 4, 2],
                         maxIter= params['maxIter'], 
                         blockSize=params['blockSize'],
                         stepSize = params['stepSize'])

  
  return model, ml_type

In [0]:
def get_param_permutations(params):
  """Given a dictionary of parameters to test in a grid search, returns all possible permutations
  Args:
    params (dict): dictionary of parameters inputted by user
  Returns:
    param_list (list): list of dictionaries to pass to the model
  """
  param_list = []
  vals = params.values()

  # Loop through all permutations 
  for param_vals in list(itertools.product(*vals)):
    # Create a dictionary to hold each permutation of parameters 
    param_dict = {}
    # Loop over the different parameters 
    for i, key in enumerate(params.keys()):
      param_dict[key] = param_vals[i]
    # Add each dictionary to the parameter list 
    param_list.append(param_dict)
  return param_list 

In [0]:
def evaluate_model(predictions, ml_type):
  """Provides evaluation metrics for classification/regression models
  Args:
    predictions (df): dataframe of predicated and actual values 
    ml_type (string): type of model 
  Returns:
    classification: accuracy, precision, recall, f1score
    regression: r2, rmse, mse, mae
  """
  if ml_type == 'c':
    eval_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')
    eval_precision = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='precisionByLabel')
    eval_recall = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='recallByLabel')
    eval_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')

    accuracy = eval_accuracy.evaluate(predictions)
    precision = eval_precision.evaluate(predictions)
    recall = eval_recall.evaluate(predictions)
    f1score = eval_f1.evaluate(predictions)
    
    return accuracy, precision, recall, f1score
    
  elif ml_type == 'r':
    eval_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='r2')
    eval_rmse = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='rmse')
    eval_mse = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='mse')
    eval_mae = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='mae')
    
    r2 = eval_r2.evaluate(predictions)
    rmse = eval_rmse.evaluate(predictions)
    mse = eval_mse.evaluate(predictions)
    mae = eval_mae.evaluate(predictions)
    
    return r2, rmse, mse, mae

## Modeling Functions

In [0]:
def train_model_no_CV(train_df, val_df, model_type, params, train_metrics=False):
  """Splits the df into time series cross validation splits, trains a model, and provides evaluation metrics. Should be used for experimentation to determine best model parameters.
  Args:
    train_df (df): training data that has been through grid_search_test_train_split
    val_df (df): validation data that has been through grid_search_test_train_split
    model_type (string): indicates the type of model that will be trained 
    params (dict): a dictionary of parameters as keys and list of parameter values as values 
        - LogisticRegression: { 'maxIter': [10,20,30], 'regParam': [0.2,0.3,0.4], 'elasticNetParam': [0,0.8,0.9] }
        - LinearRegression: { 'maxIter': [10,20,30], 'regParam': [0.2,0.3,0.4], 'elasticNetParam': [0,0.8,0.9] }
        - DecisionTreeClassifier: { 'numClasses': [3], 'maxDepth': [2], 'impurity': ['gini'], 'maxBins': [32] }
        - DecisionTreeRegressor: { 'maxDepth': [1,2,3] }
  
  Returns: 
    results_df (df):  dataframe of parameters tested and the results from that iteration  
  feature_pipeline_model = (Pipeline()
     .setStages(...)  # Only feature extraction
     .fit(train_df))

    train_df_features = feature_pipeline_model.transform(train_df)
    layers = [
        train_df_features.schema["features"].metadata["ml_attr"]["num_attrs"],
        20, 10, 2
    ]
  """

  # ---------- Train Model ---------- #
  param_permutations = get_param_permutations(params)

  results_df = pd.DataFrame()
  for param in param_permutations:
    if model_type == 'MultilayerPerceptronClassifier':
      model, ml_type = get_model(model_type, param, train_df.schema["features_all"].metadata["ml_attr"]["num_attrs"])
    else:
      model, ml_type = get_model(model_type, param)
    trained_model  = model.fit(train_df)
    
    if train_metrics == True:
      training_predictions = trained_model.transform(train_df)
    predictions          = trained_model.transform(val_df)

    # ---------- Evaluate Model ---------- #
    iter_params = pd.DataFrame(param, index=[0])
    
    # Classification 
    if ml_type == 'c':
      if train_metrics == True:
        train_accuracy, train_precision, train_recall, train_f1score = evaluate_model(training_predictions, ml_type)
        train_iter_results = pd.DataFrame({'Train Accuracy': [train_accuracy], 'Train Precision': [train_precision], 'Train Recall': [train_recall], 'Train F1 Score': [train_f1score]})
      val_accuracy, val_precision, val_recall, val_f1score = evaluate_model(predictions, ml_type)
      val_iter_results = pd.DataFrame({'Val Accuracy': [val_accuracy], 'Val Precision': [val_precision], 'Val Recall': [val_recall], 'Val F1 Score': [val_f1score]})

    # Regression
    elif ml_type == 'r':
      if train_metrics == True:
        train_r2, train_rmse, train_mse, train_mae = evaluate_model(training_predictions, ml_type)
        train_iter_results = pd.DataFrame({'Train R2': [train_r2], 'Train RMSE': [train_rmse], 'Train MSE': [train_mse], 'Train MAE': [train_mae]})
      val_r2, val_rmse, val_mse, val_mae = evaluate_model(predictions, ml_type)
      val_iter_results = pd.DataFrame({'Val R2': [val_r2], 'Val RMSE': [val_rmse], 'Val MSE': [val_mse], 'Val MAE': [val_mae]})
      
    if train_metrics == True:
      iter_df = pd.concat([iter_params, train_iter_results, val_iter_results], axis=1)  
    else:
      iter_df = pd.concat([iter_params, val_iter_results], axis=1)
    results_df = pd.concat([results_df,iter_df], axis=0)
  
  return results_df

In [0]:
def train_model_CV(df, model_type, params, k=5, sampling='none'):
  """Splits the df into time series cross validation splits, trains a model, and provides evaluation metrics. This function shuold be used to evaluate performance metrics for various models. It should not be used for experimentation. (https://stats.stackexchange.com/questions/52274/how-to-choose-a-predictive-model-after-k-fold-cross-validation)
  Args:
    df (dataframe): dataframe to model on; requirements:
      - Has gone through create_pipeline function 
      - Has 'Year' column from 2015 - 2021
      - Has 'features' column (not scaled)
      - Has 'label' column
    model_type (string): indicates the type of model that will be trained 
    params (dict): a dictionary of parameters as keys and parameter values as values 
        - LogisticRegression: { 'maxIter': 10, 'regParam': 0.3, 'elasticNetParam': 0 }
        - LinearRegression: { 'maxIter': 10, 'regParam': 0.3, 'elasticNetParam': 0 }
        - DecisionTreeClassifier: { 'numClasses': 3, 'maxDepth': 2, 'impurity': 'gini', 'maxBins': 32 }
        - DecisionTreeRegressor: { 'maxDepth': 2 }
    k (int): number of folds to split data into
    sampling (string): if none, no sampling is performed; if under, undersampling is performed; if over, oversampling is performed
  
  Returns: 
    results_df (df):  dataframe validation results from each fold 
    saved_model (model): returns the model that had the best validation performance of each fold 
  """
  results_df = pd.DataFrame()

  # ---------- Split Data ---------- #
  df_ranked = df.withColumn("rank", row_number().over(Window.partitionBy().orderBy("scheduled_departure_UTC")))
  df_ranked = df_ranked.filter(col('Year') <= 2020).cache()
  fold_size = df_ranked.count() / k

  # Saved variables across folds
  results_df = pd.DataFrame()
  saved_model = None
  saved_train_df = None 
  lowest_mae = 1000000
  highest_f1 = 0
  
  for i in range(k):
   
    # Split the original dataframe into folds 
    fold_df = df_ranked.where(f"{i * fold_size} < rank").where(f" rank <= {(i+1) * fold_size}").drop("rank")
    # Split the fold into train and validation sets 
    fold_df_ranked = fold_df.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("scheduled_departure_UTC")))
    train = fold_df_ranked.where("rank <= 0.7").drop("rank")
    train = get_sampling(train, sampling)
    val   = fold_df_ranked.where("rank >  0.7").drop("rank")
    
    # ---------- Impute and Scale Features ---------- #
    train_df_full = impute_and_scale_features(train)
    val_df_full   = impute_and_scale_features(val)

    # ---------- Train Model ---------- #
    #model, ml_type = get_model(model_type, params)
    if model_type == 'MultilayerPerceptronClassifier':
      model, ml_type = get_model(model_type, params, train_df_full.schema["features_all"].metadata["ml_attr"]["num_attrs"])
    else:
      model, ml_type = get_model(model_type, params)
    trained_model  = model.fit(train_df_full)
    predictions    = trained_model.transform(val_df_full)
    
    # ---------- Evaluate Model ---------- #
    # Classification
    if ml_type == 'c':
      accuracy, precision, recall, f1score = evaluate_model(predictions, ml_type)
      iter_params = pd.DataFrame(params, index=[0])
      iter_results = pd.DataFrame({'Accuracy': [accuracy], 'Precision': [precision], 'Recall': [recall], 'F1 Score': [f1score]})
      iter_df = pd.concat([iter_params, iter_results], axis=1)  
      results_df = pd.concat([results_df,iter_df], axis=0)
      
      if f1score > highest_f1:
        highest_f1     = f1score
        saved_model    = trained_model
        saved_train_df = train_df_full
  
    # Regression
    elif ml_type == 'r':
      r2, rmse, mse, mae = evaluate_model(predictions, ml_type)
      iter_params = pd.DataFrame(params, index=[0])
      iter_results = pd.DataFrame({'R2': [r2], 'RMSE': [rmse], 'MSE': [mse], 'MAE': [mae]})
      iter_df = pd.concat([iter_params, iter_results], axis=1)  
      results_df = pd.concat([results_df,iter_df], axis=0)
    
      if mae < lowest_mae:
        lowest_mae     = mae
        saved_model    = trained_model
        saved_train_df = train_df_full
  
  # Clear memory 
  df_ranked.unpersist()
  
  return results_df, saved_model, saved_train_df

In [0]:
def grid_search_test_train_split(pipeline_df, sample_size, sampling='none'):
  """Splits the dataframe in train and test splits for grid search 
  Args:
    df (dataframe): dataframe to model on; requirements:
      - Has gone through create_pipeline function 
      - Has 'Year' column from 2015 - 2021
      - Has 'features' column (not scaled)
      - Has 'label' column
    sample_size (float): optional parameter to specify if you would like a subset of the data 
    sampling (string): if none, no sampling is performed; if under, undersampling is performed; if over, oversampling is performed 
  
  Returns: 
    results_df (df):  dataframe of parameters tested and the results from that iteration   
  """
  # ---------- Split Data ---------- #
  train = pipeline_df.filter(col('Year') <= 2019)
  train = get_sampling(train, sampling)
  val   = pipeline_df.filter(col('Year') == 2020)
#   test  = pipeline_df.filter(col('Year') == 2021)

  # ---------- Get Subset of Train & Val Data ---------- #
  if sample_size:
    train = train.sample(sample_size)
    val = val.sample(sample_size)
#     test = test.sample(sample_size)

  # ---------- Impute and Scale Features ---------- #
  train_df_full = impute_and_scale_features(train)
  val_df_full   = impute_and_scale_features(val)
#   test_df_full  = impute_and_scale_features(test)
  
  return train_df_full, val_df_full

In [0]:
inputCols_categorical = ['MONTH','DAY_OF_WEEK', 'holiday_in2DayRange', 'C19', 'OP_UNIQUE_CARRIER', 'type', 'DEP_TIME_BLK']
inputCols_continuous = ['DISTANCE']

pipeline = create_pipeline(df_full, inputCols_categorical, inputCols_continuous)
pipeline_df = pipeline.fit(df_full).transform(df_full)
pipeline_df = pipeline_df.filter(col('label') != 2)

train_10_none, val_10_none = grid_search_test_train_split(pipeline_df, 0.1, sampling='none')
train_10_none = train_10_none.cache()
val_10_none  = val_10_none.cache()

In [0]:
lin_reg_r_params = { 'maxIter': 20, 'regParam': 0.2, 'elasticNetParam': 0.8 }
lr_10, lr_type_10 = get_model('LinearRegression', lin_reg_r_params)

lr_trained_model_10  = lr_10.fit(train_10_none)
lr_predictions_10    = lr_trained_model_10.transform(val_10_none)

In [0]:
lr_10_eval_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='r2')
lr_10_eval_rmse = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='rmse')
lr_10_eval_mse = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='mse')
lr_10_eval_mae = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='mae')

r2_lr_10 = lr_10_eval_r2.evaluate(lr_predictions_10)
rmse_lr_10 = lr_10_eval_rmse.evaluate(lr_predictions_10)
mse_lr_10 = lr_10_eval_mse.evaluate(lr_predictions_10)
mae_lr_10 = lr_10_eval_mae.evaluate(lr_predictions_10)

lr_r_10_results = pd.DataFrame({'r2': [r2_lr_10], 'rmse': [rmse_lr_10], 'mse': [mse_lr_10], 'mae': [mae_lr_10]})
print(lr_r_10_results)

         r2       rmse          mse        mae
0  0.014964  35.315483  1247.183358  14.768823


In [0]:
mlp_r_params = { 'maxIter': 20, 'stepSize': 0.03, 'blockSize': 300, 'seed': 1234, 'solver': 'gd'}
model_10, ml_type_10 = get_model('MultilayerPerceptronClassifier', mlp_r_params, train_10_none.schema["features_all"].metadata["ml_attr"]["num_attrs"])
trained_model_10  = model_10.fit(train_10_none)
predictions_10    = trained_model_10.transform(val_10_none)

In [0]:
eval_accuracy_10 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')
eval_precision_10 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='precisionByLabel')
eval_recall_10 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='recallByLabel')
eval_f1_10 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')

accuracy_10 = eval_accuracy_10.evaluate(predictions_10)
precision_10 = eval_precision_10.evaluate(predictions_10)
recall_10 = eval_recall_10.evaluate(predictions_10)
f1score_10 = eval_f1_10.evaluate(predictions_10)

mlp_c_10_results = pd.DataFrame({'Accuracy': [accuracy_10], 'Precision': [precision_10], 'Recall': [recall_10], 'F1 Score': [f1score_10]})
print(mlp_c_10_results)

   Accuracy  Precision    Recall  F1 Score
0  0.881305   0.917491  0.988715  0.881305


In [0]:
def prepare_for_regression(df):
  df = df.drop(*['probability', 'rawPrediction'])
  df = df.withColumnRenamed("prediction","MLP_10_class_prediction")
  return df

In [0]:
new_train_10 = prepare_for_regression(predictions_10)

In [0]:
lin_reg_r_params = { 'maxIter': 20, 'regParam': 0.2, 'elasticNetParam': 0.8 }
lr_10, lr_type_10 = get_model('LinearRegression', lin_reg_r_params)

lr_trained_model_10  = lr_10.fit(new_train_10)
lr_predictions_10    = lr_trained_model_10.transform(val_10_none)

In [0]:
lr_10_eval_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='r2')
lr_10_eval_rmse = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='rmse')
lr_10_eval_mse = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='mse')
lr_10_eval_mae = RegressionEvaluator(predictionCol='prediction', labelCol='DEP_DELAY_NEW', metricName='mae')

r2_lr_10 = lr_10_eval_r2.evaluate(lr_predictions_10)
rmse_lr_10 = lr_10_eval_rmse.evaluate(lr_predictions_10)
mse_lr_10 = lr_10_eval_mse.evaluate(lr_predictions_10)
mae_lr_10 = lr_10_eval_mae.evaluate(lr_predictions_10)
lr_r_10_results = pd.DataFrame({'r2': [r2_lr_10], 'rmse': [rmse_lr_10], 'mse': [mse_lr_10], 'mae': [mae_lr_10]})
print(lr_r_10_results)

         r2       rmse          mse        mae
0  0.052012  34.644997  1200.275817  10.366102
