# IN - Feature Engineering and encoding notebook version

In [0]:
from pyspark.sql.functions import col, udf, expr, pandas_udf, to_date, date_trunc, concat_ws, collect_list, struct, broadcast, row_number
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType, DateType, TimestampType, ArrayType, FloatType, LongType
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix
import numpy as np
from pyspark.sql import Window
from pyspark.sql.functions import date_add, to_timestamp, PandasUDFType
from pyspark.sql.functions import col, unix_timestamp, from_unixtime
import pandas as pd
from prophet import Prophet
# import tensorflow
# import xgboost
# import graphframes
from pyspark.ml.feature import Imputer
from sklearn.preprocessing import LabelEncoder
# import numpy as np
import math
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


print("Welcome to the W261 final project!") 

Welcome to the W261 final project!


# Connect to team storage blob

In [0]:
# The following blob storage is accessible to team members only (read and write)
# access key is valid til TTL
# after that you will need to create a new SAS key and authenticate access again via DataBrick command line
blob_container  = "team-3-2-project261"       # The name of your container created in https://portal.azure.com
storage_account = "fall2024team32"  # The name of your Storage account created in https://portal.azure.com
secret_scope    = "scope_team_3_2"           # The name of the scope created in your local computer using the Databricks CLI
secret_key      = "key_team_3_2"             # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url   = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  #points to the root of your team storage bucket


# the 261 course blob storage is mounted here.
mids261_mount_path      = "/mnt/mids-w261"

# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
# IN check the listing in team blob
display(dbutils.fs.ls(f"{team_blob_url}"))

path,name,size,modificationTime
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_cancelled_data/,12m_cancelled_data/,0,1731754303000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_diverted_data/,12m_diverted_data/,0,1731754311000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_duplicates_removed/,12m_duplicates_removed/,0,1733298522000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_final_data/,12m_final_data/,0,1731755226000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_final_data_deduped/,12m_final_data_deduped/,0,1731805979000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_mean_year_month_destination/,12m_mean_year_month_destination/,0,1731754740000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_mean_year_month_origin/,12m_mean_year_month_origin/,0,1731754736000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_null_summary/,12m_null_summary/,0,1731754630000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_sorted_data/,12m_sorted_data/,0,1731754292000
wasbs://team-3-2-project261@fall2024team32.blob.core.windows.net/12m_summary_statistics/,12m_summary_statistics/,0,1731756196000


In [0]:
# Deleting some blob content to make room
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_1_train_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_1_val_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_2_train_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_2_val_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_3_train_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_3_val_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_4_train_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_4_val_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_5_train_NA/', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/IN_NA_fold_5_val_NA/', recurse=True)

dbutils.fs.rm(f'{team_blob_url}/60m_take_3_sorted_data/', recurse=True)

dbutils.fs.rm(f'{team_blob_url}/20241117_cols_to_be_assembled_lr_12m//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241117_cols_to_be_assembled_lr_12m_backup//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241117_data_encoded_scaled_12m//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241117_data_encoded_scaled_12m_backup//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241117_data_encoded_scaled_ordered_12m//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241117_data_encoded_scaled_ordered_12m_backup//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_1_train//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_1_train_10pctval//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_1_val//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_1_val_10pctval//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_2_train//', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_2_val//', recurse=True)

dbutils.fs.rm(f'{team_blob_url}/20241118_fold_2_val_10pctval///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_3_val///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_3_val_10pctval///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_4_train///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_4_train_10pctval///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_5_train///', recurse=True)

dbutils.fs.rm(f'{team_blob_url}/20241118_fold_5_train_10pctval///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_test_10pctval////', recurse=True)

dbutils.fs.rm(f'{team_blob_url}/20241118_fold_2_train_10pctval///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_3_train///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_3_train_10pctval///', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_4_val///', recurse=True)


dbutils.fs.rm(f'{team_blob_url}/20241118_fold_4_val_10pctval////', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_5_val////', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_fold_5_val_10pctval////', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/20241118_test////', recurse=True)


dbutils.fs.rm(f'{team_blob_url}/TP/////', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/TP2/////', recurse=True)
dbutils.fs.rm(f'{team_blob_url}/phase1/////', recurse=True)


True


# Know your mount
Here is the mounting for this class, your source for the original data! Remember, you only have Read access, not Write! Also, become familiar with `dbutils` the equivalent of `gcp` in DataProc

In [0]:
data_BASE_DIR = "dbfs:/mnt/mids-w261/"
display(dbutils.fs.ls(f"{data_BASE_DIR}"))

path,name,size,modificationTime
dbfs:/mnt/mids-w261/2014_flights/,2014_flights/,0,1731598242000
dbfs:/mnt/mids-w261/HW5/,HW5/,0,0
dbfs:/mnt/mids-w261/OTPW_12M/,OTPW_12M/,0,1721794777000
dbfs:/mnt/mids-w261/OTPW_12M_2015.parquet/,OTPW_12M_2015.parquet/,0,1721930011000
dbfs:/mnt/mids-w261/OTPW_12M_2015_cleaned_sorted.parquet/,OTPW_12M_2015_cleaned_sorted.parquet/,0,1731788138000
dbfs:/mnt/mids-w261/OTPW_12M_2015_sorted.parquet/,OTPW_12M_2015_sorted.parquet/,0,1731652365000
dbfs:/mnt/mids-w261/OTPW_1D_CSV/,OTPW_1D_CSV/,0,0
dbfs:/mnt/mids-w261/OTPW_36M/,OTPW_36M/,0,0
dbfs:/mnt/mids-w261/OTPW_3M/,OTPW_3M/,0,1721832995000
dbfs:/mnt/mids-w261/OTPW_3M_2015.csv,OTPW_3M_2015.csv,1500620247,1679772070000


# Functions

In [0]:
def connect_to_team_blob_and_start_spark():
  # The following blob storage is accessible to team members only (read and write)
  # access key is valid til TTL
  # after that you will need to create a new SAS key and authenticate access again via DataBrick command line
  blob_container  = "team-3-2-project261"       # The name of your container created in https://portal.azure.com
  storage_account = "fall2024team32"  # The name of your Storage account created in https://portal.azure.com
  secret_scope    = "scope_team_3_2"           # The name of the scope created in your local computer using the Databricks CLI
  secret_key      = "key_team_3_2"             # The name of the secret key created in your local computer using the Databricks CLI
  team_blob_url   = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  #points to the root of your team storage bucket


  # the 261 course blob storage is mounted here.
  mids261_mount_path      = "/mnt/mids-w261"

  # SAS Token: Grant the team limited access to Azure Storage resources
  spark.conf.set(
    f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope = secret_scope, key = secret_key)
  )

  return team_blob_url, mids261_mount_path

def make_departure_related_features_from_other_flights_simplified(input_df, tgt_feat_names):
    input_df = input_df.withColumn("two_hours_prior_depart_UTC", to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
    input_df = input_df.withColumn("two_hours_prior_depart_UTC_EPOCH", unix_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
    input_df = input_df.orderBy('two_hours_prior_depart_UTC_EPOCH')

    # IN: an approximation of the avg of the departure delay and taxi out time of the other flights, over the most recent 2 hour window, which is available at 2hr before departure
    # IN: so this is calculated by ordering records by 2hr ahead of departure time, then look for instances 4 to 2 hours ahead of current record. Before they are the ones
    # IN: most likely have already departed by then, because their 2hr ahead of departure time is 2.5hrs ahead of the time of prediction (2hr ahead of current record's departure)
    # IN: this is less accurate than using join with explicit condition, but hopefully faster by avoiding join

    window_spec = Window.partitionBy('ORIGIN').orderBy('two_hours_prior_depart_UTC_EPOCH').rangeBetween(-int(5.5*3600), -int(2.5*3600))

    # calculate the avg
    for col_name in tgt_feat_names:
        input_df = input_df.withColumn(f'Avg_{col_name}_other_flights', F.avg(col_name).over(window_spec))
        input_df = input_df.withColumn(f'Last_{col_name}_other_flights', F.last(col_name).over(window_spec))    
    
    input_df = input_df.drop('two_hours_prior_depart_UTC_EPOCH')

    return input_df

def make_departure_related_features_from_other_flights_4_2hr_prior_dep(input_df, tgt_feat_names):
    input_df = input_df.withColumn("two_hours_prior_depart_UTC", to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
    input_df = input_df.withColumn('row_id', F.monotonically_increasing_id())
    input_df_sel = input_df.select(['row_id', 'two_hours_prior_depart_UTC', 'ORIGIN', 'four_hours_prior_depart_UTC', 'Actual_departure_time_UTC']+tgt_feat_names)

    df_joined = broadcast(input_df_sel).alias('df0').join(input_df_sel.alias('df_cond'),
                                                    (F.col('df_cond.Actual_departure_time_UTC') > F.col('df0.four_hours_prior_depart_UTC'))&
                                                    (F.col('df_cond.Actual_departure_time_UTC') < F.col('df0.two_hours_prior_depart_UTC'))&
                                                    (F.col('df_cond.ORIGIN') == F.col('df0.ORIGIN')),
                                                    how='left')
    
    # calculate the avg
    agg_exprs_avg = [F.avg(f'df_cond.{col}').alias(f'Avg_{col}_other_flights') for col in tgt_feat_names]

    df_avg = df_joined.groupBy('df0.row_id').agg(*agg_exprs_avg)\
                                                 .select(['row_id']+[f'Avg_{col}_other_flights' for col in tgt_feat_names])
    ''''''
    # calculate the last - 1 - the native collect list does not work well if we want to sort by time; turn to applyINPandas
    def process_group(pdf):
        pdf_sorted = pdf.sort_values('Actual_departure_time_UTC')
        result_dic = {'row_id': pdf_sorted['row_id'].iloc[0]}

        for col_name in tgt_feat_names:
            result_dic[f'Last_{col_name}_other_flights'] = pdf_sorted[f'{col_name}'].iloc[-1]
        return pd.DataFrame(result_dic, index=[0])

    schema_spec = StructType([StructField('row_id', LongType(), True)] + 
                    [StructField(f'Last_{col_name}_other_flights', DoubleType(), True) for col_name in tgt_feat_names])
    
    #IN: note in side applyInPandas, can't refer df0.row_id ect, need to use explicit column names, so select first
    # df_last = df_joined.select(['df0.row_id', 'df_cond.Actual_departure_time_UTC']+
    #                            [f'df_cond.{col}' for col in tgt_feat_names])\
    #                                .groupBy('df0.row_id').applyInPandas(process_group, schema=schema_spec)\
    #                                              .select(['row_id']+[f'Last_{col}_other_flights' for col in tgt_feat_names])
    ''''''

    # join to original data
    # input_df = (input_df).join(broadcast(df_avg), on='row_id', how='left')\
    #     .join(broadcast(df_last), on='row_id', how='left').drop('row_id')
    
    # IN: if skip last flight metrics, only join the average
    input_df = (input_df).join(broadcast(df_avg), on='row_id', how='left').drop('row_id')

    return input_df

def make_arrival_related_features_from_other_flights_4_2hr_prior_dep(input_df, tgt_feat_names):
    input_df = input_df.withColumn('row_id', F.monotonically_increasing_id())
    input_df_sel = input_df.select(['row_id', 'two_hours_prior_depart_UTC', 'ORIGIN', 'DEST', 'four_hours_prior_depart_UTC','Actual_arrival_time_UTC']+tgt_feat_names)

    df_joined = (broadcast(input_df_sel).alias('df0')).join(input_df_sel.alias('df_cond'),
                                                    (F.col('df_cond.Actual_arrival_time_UTC') > F.col('df0.four_hours_prior_depart_UTC'))&
                                                    (F.col('df_cond.Actual_arrival_time_UTC') < F.col('df0.two_hours_prior_depart_UTC'))&
                                                    (F.col('df_cond.DEST') == F.col('df0.ORIGIN')),
                                                    how='left')
    # calculate the avg
    agg_exprs_avg = [F.avg(f'df_cond.{col}').alias(f'Avg_{col}_other_flights') for col in tgt_feat_names]

    df_avg = df_joined.groupBy('df0.row_id').agg(*agg_exprs_avg)\
                                                 .select(['row_id']+[f'Avg_{col}_other_flights' for col in tgt_feat_names])

    # calculate the last
    def process_group(pdf):
        pdf_sorted = pdf.sort_values('Actual_arrival_time_UTC')
        result_dic = {'row_id': pdf_sorted['row_id'].iloc[0]}

        for col_name in tgt_feat_names:
            result_dic[f'Last_{col_name}_other_flights'] = pdf_sorted[f'{col_name}'].iloc[-1]
        return pd.DataFrame(result_dic, index=[0])

    schema_spec = StructType([StructField('row_id', LongType(), True)] + 
                    [StructField(f'Last_{col_name}_other_flights', DoubleType(), True) for col_name in tgt_feat_names])
    
    #IN: note inside applyInPandas, can't refer df0.row_id ect, need to use explicit column names, so select first
    # df_last = df_joined.select(['df0.row_id', 'df_cond.Actual_arrival_time_UTC']+
    #                            [f'df_cond.{col}' for col in tgt_feat_names])\
    #                                .groupBy('df0.row_id').applyInPandas(process_group, schema=schema_spec)\
    #                                              .select(['row_id']+[f'Last_{col}_other_flights' for col in tgt_feat_names])

    # join to original data
    # input_df = (input_df).join(broadcast(df_avg), on='row_id', how='left')\
    #     .join(broadcast(df_last), on='row_id', how='left').drop('row_id')
    
    # IN: if skip last flight metrics, only join the average
    input_df = (input_df).join(broadcast(df_avg), on='row_id', how='left').drop('row_id')

    return input_df
  
def apply_rolling_forecast_weekly(pdf):
    res = pd.DataFrame(columns=['origin_type', 'ds', 'yhat'])
    group_name = pdf.iloc[0]['origin_type']
    # pdf = pdf.groupby('ds').agg(F.mean('y').alias('y')).cache()
    # rolling_windows = pdf['ds'].rolling(window=30)

    for i, row in pdf.iterrows():
        # Define the rolling window (last 30 days)
        current_week = row["ds"]
        window_data = pdf[
            (pdf["ds"] <= current_week) & 
            (pdf["ds"] > (current_week - pd.Timedelta(weeks=52)))
        ]
        
        try:
            ts_mod = Prophet(interval_width=0.95, growth='linear',
                            daily_seasonality=False,
                            weekly_seasonality=False,
                            yearly_seasonality=True,
                            seasonality_mode='multiplicative')
            ts_mod.fit(window_data)
            curr_1delta = ts_mod.make_future_dataframe(periods=1, freq='W-MON', include_history=False)
            forecast = ts_mod.predict(curr_1delta)
        except Exception as e:
            print(e)
            forecast = pd.DataFrame(columns=['ds', 'yhat'])
      
        res = pd.concat([res,  \
            pd.DataFrame({'origin_type': [str(group_name)]}).\
            join(forecast[['ds', 'yhat']].iloc[-1:])])
    
    return res

def add_prophet_features_weekly(input_df, new_features, tgt_feat_name):
    groupby_name = 'origin_type'
    df_used = input_df.withColumn('year_month_week', date_trunc('week', col('FL_DATE'))) \
        .select('year_month_week',groupby_name, tgt_feat_name).orderBy('year_month_week')\
            .groupBy(groupby_name, 'year_month_week').agg(F.mean(tgt_feat_name).alias(f'Weekly_avg_{tgt_feat_name}')).cache()
    df_new = df_used.withColumn('ds', col('year_month_week')).withColumn('y', col(f'Weekly_avg_{tgt_feat_name}'))\
        .groupBy(groupby_name).applyInPandas(apply_rolling_forecast_weekly, schema=f'{groupby_name} string, ds timestamp, yhat double').cache()

    output_df = (input_df.withColumn('year_month_week', date_trunc('week', col('FL_DATE'))))\
    .join(broadcast(df_new.select(col(groupby_name), col('ds').alias('year_month_week'), col('yhat').alias(f'Weekly_avg_{tgt_feat_name}_hat'))), \
        on=[groupby_name,'year_month_week'], how='left').cache()

    new_features.append(f'Weekly_avg_{tgt_feat_name}_hat')

    return output_df, new_features, df_used, df_new


def apply_rolling_forecast_monthly(pdf):
    res = pd.DataFrame(columns=['origin_type', 'ds', 'yhat'])
    group_name = pdf.iloc[0]['origin_type']
    # pdf = pdf.groupby('ds').agg(F.mean('y').alias('y')).cache()
    # rolling_windows = pdf['ds'].rolling(window=30)

    for i, row in pdf.iterrows():
        # Define the rolling window (last 30 days)
        current_month = row["ds"]
        window_data = pdf[
            (pdf["ds"] <= current_month) & 
            (pdf["ds"] > (current_month - pd.DateOffset(months=12)))
        ]

        try:
            ts_mod = Prophet(interval_width=0.95, growth='linear',
                            daily_seasonality=False,
                            weekly_seasonality=False,
                            yearly_seasonality=True,
                            seasonality_mode='multiplicative')
            ts_mod.fit(window_data)
            curr_1delta = ts_mod.make_future_dataframe(periods=1, freq='MS', include_history=False)  #IN: 'MS' is to make the prediction dates as the start of the month, default: end of month
            forecast = ts_mod.predict(curr_1delta)
        except Exception as e:
            print(e)
            forecast = pd.DataFrame(columns=['ds', 'yhat'])
        res = pd.concat([res,  \
            pd.DataFrame({'origin_type': [str(group_name)]}).\
            join(forecast[['ds', 'yhat']].iloc[-1:])])
    
    return res


def add_prophet_features_monthly(input_df, new_features, tgt_feat_name):

    groupby_name = 'origin_type'
    df_used = input_df.withColumn('year_month', date_trunc('month', col('FL_DATE'))) \
        .select('year_month',groupby_name, tgt_feat_name).orderBy('year_month')\
            .groupBy(groupby_name, 'year_month').agg(F.mean(tgt_feat_name).alias(f'Monthly_avg_{tgt_feat_name}')).cache()

    df_new = df_used.withColumn('ds', col('year_month')).withColumn('y', col(f'Monthly_avg_{tgt_feat_name}'))\
        .groupBy(groupby_name).applyInPandas(apply_rolling_forecast_monthly, schema=f'{groupby_name} string, ds timestamp, yhat double').cache()

    output_df = (input_df.withColumn('year_month', date_trunc('month', col('FL_DATE'))))\
    .join(broadcast(df_new.select(col(groupby_name), col('ds').alias('year_month'), col('yhat').alias(f'Monthly_avg_{tgt_feat_name}_hat'))), \
        on=[groupby_name,'year_month'], how='left').cache()

    new_features.append(f'Monthly_avg_{tgt_feat_name}_hat')

    return output_df, new_features, df_used, df_new


def make_degree_centrality_features_scheduled(input_df):
    input_df = input_df.withColumn("two_hours_prior_depart_UTC", to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
    input_df = input_df.withColumn("two_hours_prior_depart_UTC_EPOCH", unix_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
    input_df = input_df.orderBy('two_hours_prior_depart_UTC_EPOCH')
    # input_df = input_df.withColumn('row_id', F.monotonically_increasing_id())

    # input_df_sel = input_df.select(['two_hours_prior_depart_UTC_EPOCH', 'ORIGIN']+tgt_feat_names)

    # IN: an approximation of the number of flights departing from current records' origin airport, and the number of flights arriving in current records' destination ariport; here we simply use the number of flights scheduled to depart and arrive in the past 3 hours preceeding the current records' 2hr ahead departure time (prediction time) as approximation. Given this information is by schedule so it is already knonw even though such event may not have happened  

    window_spec_origin = Window.partitionBy('ORIGIN').orderBy('two_hours_prior_depart_UTC_EPOCH').rangeBetween(-int(3*3600), -1)
    window_spec_dest = Window.partitionBy('DEST').orderBy('two_hours_prior_depart_UTC_EPOCH').rangeBetween(-int(3*3600), -1)

    # calculate the avg
    # input_df = input_df.withColumn(f'In_degree_same_origin_past_3hr', F.count('two_hours_prior_depart_UTC_EPOCH').over(window_spec_origin))
    # input_df = input_df.withColumn(f'Out_degree_same_dest_past_3hr', F.count('two_hours_prior_depart_UTC_EPOCH').over(window_spec_dest))    

    input_df = input_df.withColumn(f'Out_degree_same_origin_past_3hr', F.count('two_hours_prior_depart_UTC_EPOCH').over(window_spec_origin))
    input_df = input_df.withColumn(f'In_degree_same_dest_past_3hr', F.count('two_hours_prior_depart_UTC_EPOCH').over(window_spec_dest))  
    
    input_df = input_df.drop('two_hours_prior_depart_UTC_EPOCH')

    return input_df

def create_binary_representation_for_origin_dest(input_df):
    ## Convert origin and dest to binary representation - From MM
    def create_pandas_airport_codes_df(spark_df):
        # Print number of unique values in the origin_icao and dest_icao columns
        # print(f"Unique Origin airport codes: {spark_df.select('ORIGIN').distinct().count()}")
        # print(f"Unique Destination airport codes: {spark_df.select('DEST').distinct().count()}")

        pandas_orig_df  = spark_df.select("ORIGIN").distinct().toPandas()
        pandas_dest_df = spark_df.select("DEST").distinct().toPandas()

        # Create a superset of airport codes from origin and destination airports
        pandas_icao_df = pd.merge(pandas_orig_df, pandas_dest_df, left_on='ORIGIN', right_on='DEST', how='outer')
        if pandas_icao_df['ORIGIN'].count() < pandas_icao_df['DEST'].count():
            pandas_icao_df.drop('ORIGIN', axis=1, inplace=True)
            pandas_icao_df.rename(columns={'DEST':'icao'}, inplace=True)
        else:
            pandas_icao_df.drop('DEST', axis=1, inplace=True)
            pandas_icao_df.rename(columns={'ORIGIN':'icao'}, inplace=True)

        del pandas_orig_df
        del pandas_dest_df
        
        return pandas_icao_df

    def int_to_bits_array(num, embed_size):
        import numpy as np
        """
        Converts an integer to a list of bits, limited by the specified embedding size
        Note: The bit array is always zero padded to the specified embedding size
        """
        # start from 2nd field because the bit array is prefixed with 0b
        return np.array([int(bit) for bit in bin(num)[2:].zfill(embed_size)])

    def create_binary_embeddings_for_airport_codes(pandas_icao_df):
        le = LabelEncoder()
        pandas_icao_df['le_index'] = le.fit_transform(pandas_icao_df['icao'])
        # unique_indices = pandas_icao_df["le_index"].values.astype(np.int32)
        unique_indices = pandas_icao_df["le_index"].values.astype(int)

        # Create embedding layer
        vocab_size = len(unique_indices)
        embedding_dim = math.floor(math.log2(vocab_size)) + 1

        embeddings = []
        for i in range(vocab_size):
            embeddings.append(int_to_bits_array(unique_indices[i], embedding_dim))

        embeddings = np.array(embeddings)
        return embeddings, embeddings.shape[1]
    
    # EMBEDDING_DIMENSION = airport_code_embeddings.shape[1]

    def create_spark_airport_embeddings_df(pandas_icao_df, embeddings):
        # Drop the le_index column, because we don't need it anymore
        pandas_icao_df.drop('le_index', axis=1, inplace=True)

        # Create copies of pandas_icao_df - one each for origin and destination
        pandas_origin_icao_df = pandas_icao_df.copy()
        pandas_dest_icao_df = pandas_icao_df.copy()

        embeddings_orig_df = pd.DataFrame(embeddings, columns=[f'origin_icao_embedding_{i}' for i in range(embeddings.shape[1])])
        pandas_origin_icao_df.rename(columns={'icao':'ORIGIN'}, inplace=True)
        pandas_origin_icao_df = pd.concat([pandas_origin_icao_df, embeddings_orig_df], axis=1)

        embeddings_dest_df = pd.DataFrame(embeddings, columns=[f'dest_icao_embedding_{i}' for i in range(embeddings.shape[1])])
        pandas_dest_icao_df.rename(columns={'icao':'DEST'}, inplace=True)
        pandas_dest_icao_df = pd.concat([pandas_dest_icao_df, embeddings_dest_df], axis=1)

        airport_origin_emb_df = spark.createDataFrame(pandas_origin_icao_df)
        airport_dest_emb_df = spark.createDataFrame(pandas_dest_icao_df)

        return airport_origin_emb_df, airport_dest_emb_df

    def assemble_airport_code_columns(spark_flights_df, embed_size):
        # IN: fill na with zero before assemble
        col_names = [f'origin_icao_embedding_{i}' for i in range(embed_size)] + [f'dest_icao_embedding_{i}' for i in range(embed_size)]
        for col_name in col_names:
            spark_flights_df = spark_flights_df.fillna({col_name: 0})

        cols_to_be_assembled = [f'origin_icao_embedding_{i}' for i in range(embed_size)] 
        assembler = VectorAssembler(inputCols=cols_to_be_assembled, outputCol='origin_code_vector')
        spark_flights_df = assembler.transform(spark_flights_df).drop(*cols_to_be_assembled)
        # spark_flights_df = assembler.transform(spark_flights_df)

        cols_to_be_assembled = [f'dest_icao_embedding_{i}' for i in range(embed_size)]
        assembler = VectorAssembler(inputCols=cols_to_be_assembled, outputCol='dest_code_vector')
        spark_flights_df = assembler.transform(spark_flights_df).drop(*cols_to_be_assembled)
        # spark_flights_df = assembler.transform(spark_flights_df)
        
        return spark_flights_df

    def create_embedding_columns_in_flights_df(spark_flights_df, airport_origin_emb_df, airport_dest_emb_df, EMBEDDING_DIMENSION):

        # Perform an outer join of origin airport embeddings
        spark_flights_df = (spark_flights_df).join(broadcast(airport_origin_emb_df), 
                    spark_flights_df.ORIGIN == airport_origin_emb_df.ORIGIN,
                    how="left").drop(airport_origin_emb_df.ORIGIN).cache()

        # Perform an outer join of destination airport embeddings
        spark_flights_df = (spark_flights_df).join(broadcast(airport_dest_emb_df), 
                    spark_flights_df.DEST == airport_dest_emb_df.DEST,
                    how="left").drop(airport_dest_emb_df.DEST).cache()
        
        # Put in assembled columns
        spark_flights_df = assemble_airport_code_columns(spark_flights_df, EMBEDDING_DIMENSION).cache()

        return spark_flights_df
    
    ## put it together
    pandas_icao_df = create_pandas_airport_codes_df(input_df)
    airport_code_embeddings, EMBEDDING_DIMENSION = create_binary_embeddings_for_airport_codes(pandas_icao_df)
    airport_origin_emb_df, airport_dest_emb_df = create_spark_airport_embeddings_df(pandas_icao_df, airport_code_embeddings)
    input_df = create_embedding_columns_in_flights_df(input_df, airport_origin_emb_df, airport_dest_emb_df, EMBEDDING_DIMENSION)

    return input_df

def make_flight_tail_related_features(input_df, tgt_feat_names):
    input_df = input_df.withColumn("two_hours_prior_depart_UTC", 
                                    to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))

    '''method 1 (groupby, applyInPandas)'''
    def process_group_shift(pdf):
        pdf_sorted = pdf.sort_values('two_hours_prior_depart_UTC')

        pdf_used = pd.concat([pdf_sorted[['TAIL_NUM','two_hours_prior_depart_UTC']], 
                                pdf_sorted[tgt_feat_names+['Actual_departure_time_UTC']].shift(1)], axis=1)
        pdf_used['calculated_delay'] = (pdf_used['two_hours_prior_depart_UTC'].diff()
                                        -pd.Timedelta(hours=2)).dt.total_seconds()/60.0

        pdf_used[[f'Lag_same_flight_{col_name}' for col_name in tgt_feat_names]] \
            = pdf_used[tgt_feat_names].where(
                (pdf_used['two_hours_prior_depart_UTC']>pdf_used['Actual_departure_time_UTC']),\
                (pdf_used['calculated_delay']), axis=0)

        return pdf_used[['TAIL_NUM','two_hours_prior_depart_UTC']+[f'Lag_same_flight_{col_name}' for col_name in tgt_feat_names]]

    schema_spec = StructType([StructField('TAIL_NUM', StringType(), True)]+
                            [StructField('two_hours_prior_depart_UTC', TimestampType(), True)] + 
                    [StructField(f'Lag_same_flight_{col_name}', DoubleType(), True) for col_name in tgt_feat_names])
                                
    tail_df = input_df.select(['TAIL_NUM','two_hours_prior_depart_UTC','Actual_departure_time_UTC']+tgt_feat_names).\
                            groupBy('TAIL_NUM').applyInPandas(process_group_shift, schema_spec)

    
    # join to original data
    input_df = input_df.join((tail_df), on=['TAIL_NUM', 'two_hours_prior_depart_UTC'], how='left')

    return input_df

def create_additional_features(data_col_cleaned_fe, new_features, team_blob_url):
    # collecting new features:
    # new_features = []

    # Derive actual arrival and actual departure UTC
    data_col_cleaned_fe = data_col_cleaned_fe.withColumn(
        "Actual_departure_time_UTC",
        to_timestamp(
            from_unixtime(
                unix_timestamp(col("sched_depart_date_time_UTC"))
                + col("DEP_DELAY_NEW") * 60
            )
        ),
    ).cache()

    data_col_cleaned_fe = data_col_cleaned_fe.withColumn(
        "Actual_arrival_time_UTC",
        to_timestamp(
            from_unixtime(
                unix_timestamp(col("Actual_departure_time_UTC"))
                + col("ACTUAL_ELAPSED_TIME") * 60
            )
        ),
    ).cache()

    # Derive features that's related to the airplane iteself
    print ('Derive features based on the flight itself..')    
    tgt_feat_names_tail =['DEP_DELAY_NEW']
    data_col_cleaned_fe = make_flight_tail_related_features(data_col_cleaned_fe, tgt_feat_names_tail).cache()
    ## if load from blob
    # data_col_cleaned_fe = spark.read.parquet(f"{team_blob_url}/data_cleaned_ordered_with_lag_feature_{data_months_label}_{output_version}")
    new_features.extend([f'Lag_same_flight_{col}' for col in tgt_feat_names_tail])

    # # Checkpoint the data with lag features before splitting
    # data_col_cleaned_fe.write.mode("overwrite").parquet(
    #   f"{team_blob_url}/data_cleaned_with_lag_feature_{data_months_label}_{output_version}")
    # print(f"saved data with lag feature")

    # Derive features from other flights departure status
    # print ('Derive features from other flights departure status (full)..')
    # tgt_feat_names_dep = ["DEP_DELAY_NEW", "TAXI_OUT"]
    # data_col_cleaned_fe = (
    #     make_departure_related_features_from_other_flights_4_2hr_prior_dep(
    #         data_col_cleaned_fe, tgt_feat_names_dep
    #     )
    # ).cache()
    # new_features.extend([f"Avg_{col}_other_flights" for col in tgt_feat_names_dep])
    # # new_features.extend([f"Last_{col}_other_flights" for col in tgt_feat_names_dep])

    print (f'Total number of rows after lag based features: {data_col_cleaned_fe.count()}')

    print ('Derive features from other flights departure status (simpflied)..')
    tgt_feat_names_dep = ["DEP_DELAY_NEW", "TAXI_OUT"]
    data_col_cleaned_fe = (
        make_departure_related_features_from_other_flights_simplified(
            data_col_cleaned_fe, tgt_feat_names_dep
        )
    ).cache()

    new_features.extend([f"Avg_{col}_other_flights" for col in tgt_feat_names_dep])
    new_features.extend([f"Last_{col}_other_flights" for col in tgt_feat_names_dep])

    print (f'Total number of rows after departure based features: {data_col_cleaned_fe.count()}')

    # # Checkpoint the data with departure features before splitting
    # data_col_cleaned_fe.write.mode("overwrite").parquet(
    #   f"{team_blob_url}/data_cleaned_with_departure_features_{data_months_label}_{output_version}")
    # print(f"saved data with departure related feature")

    # Derive features from other flights' arrival status - disabled for >60m
    # print ('Derive features from other flights arrival status..')
    # # tgt_feat_names_arr =['ARR_DELAY_NEW', 'TAXI_IN', 'Actual_vs_CRS_elapsed_time']
    # tgt_feat_names_arr = ["ARR_DELAY_NEW", "TAXI_IN"]
    # data_col_cleaned_fe = (
    #     make_arrival_related_features_from_other_flights_4_2hr_prior_dep(
    #         data_col_cleaned_fe, tgt_feat_names_arr
    #     )
    # ).cache()
    # new_features.extend([f"Avg_{col}_other_flights" for col in tgt_feat_names_arr])
    # # new_features.extend([f"Last_{col}_other_flights" for col in tgt_feat_names_arr])
    # # new_features.remove("Actual_vs_CRS_elapsed_time")

    # # # Checkpoint the data with arrival features before splitting
    # # data_col_cleaned_fe.write.mode("overwrite").parquet(
    # #   f"{team_blob_url}/data_cleaned_with_arrival_related_features_{data_months_label}_{output_version}")
    # # print(f"saved data with arrival related feature")
    
    # # Derive rolling window prophet forecast for a given type of airport - weekly - not for 60m
    # print ('Derive features from rolling weekly prophet forecast..')
    # tgt_feat_name = 'DEP_DELAY_NEW'
    # data_col_cleaned_fe, new_features, df_used, df_new = \
    #     add_prophet_features_weekly(data_col_cleaned_fe.drop(f'Weekly_avg_{tgt_feat_name}_hat'), new_features, tgt_feat_name)
    # new_features.append(f'Weekly_avg_{tgt_feat_name}_hat')

    # Derive rolling window prophet forecast for a given type of airport - monthly - only for 60m
    print ('Derive features from rolling monthly prophet forecast..')
    tgt_feat_name = 'DEP_DELAY_NEW'
    data_col_cleaned_fe, new_features, df_used, df_new = \
        add_prophet_features_monthly(data_col_cleaned_fe.drop('Monthly_avg_DEP_DELAY_NEW_hat'), new_features, tgt_feat_name)
    
    print (f'Total number of rows after prophet based features: {data_col_cleaned_fe.count()}')

    # # Checkpoint the data with prophet features before splitting
    # data_col_cleaned_fe.write.mode("overwrite").parquet(
    #   f"{team_blob_url}/data_cleaned_with_prophet_related_features_{data_months_label}_{output_version}")
    # print(f"saved data with prophet related feature")

    # Convert ORIGIN and DEST to binary representation
    print ('Convert ORIGIN and DEST to binary representation..')
    data_col_cleaned_fe = create_binary_representation_for_origin_dest(data_col_cleaned_fe).cache()
    new_features.extend(['origin_code_vector', 'dest_code_vector'])

    print (f'Total number of rows after ORIGIN and DEST binary vector: {data_col_cleaned_fe.count()}')

    # # Checkpoint the data with ORIGIN and DEST binary representation features before splitting
    # data_col_cleaned_fe.write.mode("overwrite").parquet(
    #   f"{team_blob_url}/data_cleaned_with_origin_desk_vector_features_{data_months_label}_{output_version}")
    # print(f"saved data with origin and desk vector related feature")

    # Create degree_centrality based on schedule
    print ('Create degree_centrality based on scheduled flight in past three hours..')
    data_col_cleaned_fe = make_degree_centrality_features_scheduled(data_col_cleaned_fe).cache()
    # new_features.extend(['In_degree_same_origin_past_3hr', 'Out_degree_same_dest_past_3hr'])
    new_features.extend(['Out_degree_same_origin_past_3hr', 'In_degree_same_dest_past_3hr'])

    print (f'Total number of rows after degree_centrality in past three hours: {data_col_cleaned_fe.count()}')

    return data_col_cleaned_fe, new_features


def label_encoding_with_order(input_df, col_name, order_label_list):
    mapping_label = {label: i for i, label in enumerate(order_label_list)}

    # create udf to map the labels according to the order
    label_udf = udf(lambda x: mapping_label.get(x), IntegerType())

    # Skip StringIndexer, create columns with revised name and intended label indexing
    encoded_df2 = input_df.withColumn(col_name+'_Encoded', label_udf(col(col_name)))

    # check results
    encoded_df2.groupBy(col_name).agg(F.last_value(col_name+'_Encoded')).show()
    
    return encoded_df2


def one_hot_encoding(col_list_for_one_hot, input_df_to_fit, input_df_to_transform_list):
  stages = []

  for col in col_list_for_one_hot:
    # step 1: StringIndexer
    indexer = StringIndexer(inputCol=col, outputCol=col+'_Indexed', handleInvalid='keep')
    # step 2: OneHot Encoder
    encoder = OneHotEncoder(inputCol=col+'_Indexed', outputCol = col+'_Encoded')
    # step 3: add stages to the pipeline
    stages += [indexer, encoder]

  pipeline = Pipeline(stages=stages)

  output_df_list = []
  for input_df_to_transform in input_df_to_transform_list:
    print ('input_df_to_transform.count(): ', input_df_to_transform.count())
    df_encoded = pipeline.fit(input_df_to_fit).transform(input_df_to_transform)
    print ('df_encoded.count(): ', df_encoded.count())
    df_encoded = df_encoded.drop(*[col+'_Indexed' for col in col_list_for_one_hot]).cache()
    print ('df_encoded.count(): ', df_encoded.count())
    output_df_list.append(df_encoded)
    
    # check results
    df_encoded.select(*[col+'_Encoded' for col in col_list_for_one_hot]).show(10)

  return output_df_list


def scaling(cols_to_be_scaled, input_df_to_fit, input_df_to_transform_list, scaling_flag):
    # fill na first
    ## fill with zero
    # input_df_filled = input_df.fillna(dict(zip(cols_to_be_scaled, [0]*len(cols_to_be_scaled))))

    ## fill with mean
    imputer = Imputer(
    inputCols=cols_to_be_scaled,
    outputCols=cols_to_be_scaled  # Use the same names to overwrite
    ).setStrategy("mean")

    # Step 2: Fit the imputer model and transform the DataFrame
    input_df_to_fit_filled = imputer.fit(input_df_to_fit).transform(input_df_to_fit)

    assembler = VectorAssembler(inputCols=cols_to_be_scaled, outputCol='features_to_be_scaled', handleInvalid='keep')
    df_vector_to_fit = assembler.transform(input_df_to_fit_filled)

    if scaling_flag == 'standard':
        scaler = StandardScaler(inputCol='features_to_be_scaled', outputCol = 'scaled_features').fit(df_vector_to_fit)

    elif scaling_flag == 'minmax':
        scaler = MinMaxScaler(inputCol='features_to_be_scaled', outputCol = 'scaled_features').fit(df_vector_to_fit)
        # df_scaled = scaler.fit(df_vector_to_fit).transform(df_vector_to_transform)

    output_df_list = []
    for input_df_to_transform in input_df_to_transform_list:
        input_df_to_transform_filled = imputer.fit(input_df_to_transform).transform(input_df_to_transform)
        df_vector_to_transform = assembler.transform(input_df_to_transform_filled)
        
        df_scaled = scaler.transform(df_vector_to_transform)
        # check results
        df_scaled.select('features_to_be_scaled', 'scaled_features').show(5)
        output_df_list.append(df_scaled.drop('features_to_be_scaled').cache())

    return output_df_list
  
def split_train_val_test(input_df, split_ratio, data_months_label):
    if data_months_label is None or data_months_label == '3m':
        '''if using limit, subtract'''
        # input_df = input_df.withColumn("two_hours_prior_depart_UTC", to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
        # # input_df = input_df.orderBy('two_hours_prior_depart_UTC').cache()
        # total_rows = input_df.count()
        # train_val_limit = int(total_rows*split_ratio)

        # train_val_df = input_df.orderBy('two_hours_prior_depart_UTC').limit(train_val_limit).cache()
        # test_df = input_df.subtract(train_val_df).cache() 

        '''if using row_id'''
        input_df = input_df.withColumn("two_hours_prior_depart_UTC", 
                                       to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
        input_df = input_df.withColumn('row_id', row_number().over(Window.partitionBy().orderBy('two_hours_prior_depart_UTC')))
        total_rows = input_df.count()
        train_val_limit = int(total_rows*split_ratio)
        
        train_val_df = input_df.where(col('row_id') <= train_val_limit).cache()
        test_df = input_df.where(col('row_id')>train_val_limit).cache()
    
    elif data_months_label == '12m':
        train_val_df, test_df = split_train_val_test_12m(input_df, split_ratio)
    
    elif data_months_label == '60m':
        train_val_df, test_df = split_train_val_test_60m(input_df, split_ratio)

    return train_val_df, test_df

def split_train_val_test_12m(input_df, split_ratio):
    input_df = input_df.withColumn("two_hours_prior_depart_UTC", 
                                   to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
    input_df = input_df.orderBy('two_hours_prior_depart_UTC').cache()
    # total_rows = input_df.count()
    # train_val_limit = int(total_rows*split_ratio)

    train_val_df = input_df.filter(col('QUARTER').isin([1,2,3])).cache()
    test_df = input_df.filter(col('QUARTER').isin([4])).cache()    

    return train_val_df, test_df
  
def split_train_val_test_60m(input_df, split_ratio):
    input_df = input_df.withColumn("two_hours_prior_depart_UTC", 
                                   to_timestamp("two_hours_prior_depart_UTC", "yyyy-MM-dd HH:mm:ss"))
    input_df = input_df.orderBy('two_hours_prior_depart_UTC').cache()
    # total_rows = input_df.count()
    # train_val_limit = int(total_rows*split_ratio)

    train_val_df = input_df.filter(col('YEAR')<=2018).cache()
    test_df = input_df.filter(col('YEAR')>2018).cache()    

    return train_val_df, test_df

def execute_main(input_path, data_months_label, split_ratio, output_version):
    # connect to the blob and start spark
    team_blob_url, mids261_mount_path = connect_to_team_blob_and_start_spark()

    # read in pre processing data
    # data_col_cleaned = (spark.read.parquet(f"{team_blob_url}/12m_final_data/"))
    # data_col_cleaned = spark.read.parquet(f"{team_blob_url}/12m_final_data_deduped/")
    data_col_cleaned = spark.read.parquet(f"{team_blob_url}/{input_path}/")
    print(f"data_col_cleaned rows: {data_col_cleaned.count()}")

    # list of features to be used in analytical stages
    tgt_var = ["DEP_DELAY_GROUP"]  #'DEP_DELAY_GROUP' will be regrouped later
    key_time_features = [
        "two_hours_prior_depart_UTC",
        # "sched_depart_date_time", # IN: doesn't exist in 60m data
        "sched_depart_date_time_UTC",
        "four_hours_prior_depart_UTC",
        "FL_DATE",
    ]

    important_num_features = [
        "DEP_DELAY_NEW",
        "TAXI_OUT",
        "TAXI_IN",
        "ARR_DELAY_NEW",
        "CRS_ELAPSED_TIME",
        "ACTUAL_ELAPSED_TIME",
        "DISTANCE",
        "ELEVATION",
        "HourlyAltimeterSetting",
        # "HourlyDewPointTemperature", #IN: highly correlated with drytemp, drop
        "HourlyDryBulbTemperature",
        "HourlyPrecipitation",
        "HourlyRelativeHumidity",
        "HourlySeaLevelPressure",
        "HourlyStationPressure",
        "HourlyVisibility",
        # "HourlyWetBulbTemperature", #IN: highly correlated with drytemp, drop
        # NOTE: Not found in 12m previously, but maybe Irene's new dataset will work.
        "HourlyWindDirection",
        "HourlyWindSpeed",
    ] # 17 in total

    # IN: before fixing the original data, let's not include CANCELLED and DIVERTED as features
    # IN: when we have <1 year data, exclude 'YEAR', for 3m data, exclude 'QUARTER'
    important_cat_features = [
        "MONTH",
        "DAY_OF_MONTH",
        "DAY_OF_WEEK",
        "DEP_TIME_BLK",
        "ARR_TIME_BLK",
        "origin_type",
        "dest_type",
        "OP_UNIQUE_CARRIER",
        "ORIGIN",
        "DEST",
        'TAIL_NUM',
        # NOTE: Added for 12m
        "QUARTER",
        # NOTE: Added for >12m
        'YEAR'
    ]

    other_relevant_cat_features = [
        "DEP_TIMESTAMP_ACTUAL_UTC",
        "ARR_TIMESTAMP_ACTUAL_UTC",
        "DEP_DEL15",
        "ARR_DEL15",
        "ORIGIN_CITY_NAME",
        "ORIGIN_STATE_ABR",
        "DEST_CITY_NAME",
        "DEST_STATE_ABR",
    ]

    features_not_directly_available = [
        "DEP_DELAY_NEW",
        "ARR_DELAY_NEW",
        "TAXI_OUT",
        "TAXI_IN",
        "CANCELLED",
        "DIVERTED",
        "ACTUAL_ELAPSED_TIME",
        "Actual_vs_scheduled_elapsed_time",
    ]

    # new_features = []

    new_features = [
        "DEP_DELAY_COUNT_3600sec",
        "DEP_DELAY_COUNT_7200sec",
        "ARR_DELAY_COUNT_3600sec",
        "ARR_DELAY_COUNT_7200sec",
        "CANCELLED_DELAY_COUNT_3600sec",
        "CANCELLED_DELAY_COUNT_7200sec",
        # 'InDegreeOrigin', 
        # 'OutDegreeOrigin', 
        # 'TotalDegreeOrigin'
    ] # IN: only for 12month onwards (maybe Rahul's new 3m has it all already)

    # select most relevant features to proceed with analysis
    included_cols = (
        tgt_var
        + key_time_features
        + important_num_features
        + important_cat_features
        + new_features
    )

    data_col_cleaned_fe = data_col_cleaned.select(*included_cols)
    # print(f"After data_col_cleaned.select - data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}")

    # NOTE: Not found in 12m - included since take-3
    data_col_cleaned_fe = data_col_cleaned_fe.withColumn('HourlyWindDirection', col('HourlyWindDirection').cast('float'))
    # IN: this needs to be fixed at pre-processing level

    # create additional features
    data_col_cleaned_fe, new_features = create_additional_features(
        data_col_cleaned_fe,
        new_features,
        team_blob_url
    )
    print(
        f"After create_additional_features - data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}"
    )

    # rebucketize target variable
    data_col_cleaned_fe = data_col_cleaned_fe.withColumn(
        "DEP_DELAY_GROUP_NEW",
        when(data_col_cleaned["DEP_DELAY_GROUP"] < 0, 0)
        .when(data_col_cleaned["DEP_DELAY_GROUP"] == 0, 1)
        .otherwise(2),
    )
    # print(
    #     f"After rebucketize target variable - data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}"
    # )


    # Order the dataframe for later splitting (note: will still need to order right before the split to make sure)
    data_col_cleaned_fe = data_col_cleaned_fe.orderBy('two_hours_prior_depart_UTC').cache()

    # Checkpoint the data with all relevant features before splitting
    data_col_cleaned_fe.write.mode("overwrite").parquet(
      f"{team_blob_url}/data_cleaned_ordered_with_feature_engineerings_{data_months_label}_{output_version}")
    print(f"After saving to blob, data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}")

    '''if already have full data with features'''
    # data_col_cleaned_fe = spark.read.parquet(f"{team_blob_url}/data_cleaned_ordered_with_feature_engineerings_{data_months_label}_{output_version}")

    # print(f"data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}")

    # new_features.extend(['Lag_same_flight_DEP_DELAY_NEW',
    #     'Avg_DEP_DELAY_NEW_other_flights',
    #     'Avg_TAXI_OUT_other_flights',
    #     'Last_DEP_DELAY_NEW_other_flights',
    #     'Last_TAXI_OUT_other_flights',
    #     'Avg_ARR_DELAY_NEW_other_flights',
    #     'Avg_TAXI_IN_other_flights',
    #     'Last_ARR_DELAY_NEW_other_flights',
    #     'Last_TAXI_IN_other_flights',
    #     'Weekly_avg_DEP_DELAY_NEW_hat',
    #     'origin_code_vector',
    #     'dest_code_vector'])

    ''''''

    # Splitting data into train_val_df, test_df
    train_val_df, test_df = split_train_val_test(data_col_cleaned_fe, split_ratio, data_months_label)

    print(
        f"After data splitting - train_val_df rows: {train_val_df.count()}"
    )
    print(
        f"After data splitting - test_df rows: {test_df.count()}"
    )

    # Encoding
    ## Label Encoding
    encoded_cols = []

    # define the desired order
    order_label = ["small_airport", "medium_airport", "large_airport"]

    print ('Label encoding train_val..')
    train_val_df = label_encoding_with_order(train_val_df, 'origin_type', 
                                                    order_label).cache()

    train_val_df = label_encoding_with_order(train_val_df, 'dest_type', 
                                                    order_label).cache() 

    # process test
    print ('Label encoding test..')
    test_df = label_encoding_with_order(test_df, 'origin_type', 
                                                    order_label).cache()

    test_df = label_encoding_with_order(test_df, 'dest_type', 
                                                    order_label).cache() 

    print(
        f"After label_encoding_with_order for both origin_type/dest_type - train_val_df rows: {train_val_df.count()}"
    )
    print(
        f"After label_encoding_with_order for both origin_type/dest_type - test_df rows: {test_df.count()}"
    )

    encoded_cols.append("origin_type_Encoded")
    encoded_cols.append("dest_type_Encoded")

    ## One-hot Encoding
    # IN: for 3m data, Year AND QUARTER can't be included
    # IN: BEFORE FIXING THE DATA, do not include 'CANCELLED'and 'DIVERTED' as a column
    col_for_one_hot = [
        "MONTH",
        "DAY_OF_MONTH",
        "DAY_OF_WEEK",
        "OP_UNIQUE_CARRIER",
        "DEP_TIME_BLK",
        "ARR_TIME_BLK",
        # NOTE: Added for 12m
        "QUARTER",
        # NOTE: Added for >12m
        'YEAR'
    ]

    print ('One hot encoding train_val and test_df using train_val encoder..')
    train_val_df, test_df = one_hot_encoding(col_for_one_hot, train_val_df, 
                                         [train_val_df, 
                                          test_df])

    print(
        f"After one_hot_encoding - train_val_df rows: {train_val_df.count()}"
    )
    print(
        f"After one_hot_encoding - test_df rows: {test_df.count()}"
    )

    encoded_cols.extend([col + "_Endoded" for col in col_for_one_hot])

    # Scaling numerical and label encoded variables
    scaling_flag = 'minmax'  #IN: 'minmax' or 'standard'

    # scaled_cols = list(
    #     set(important_num_features).union(set(new_features))
    #     - {"ACTUAL_ELAPSED_TIME", "CRS_ELAPSED_TIME"}
    #     - set(features_not_directly_available)
    #     - set(['origin_code_vector', 'dest_code_vector'])
    # ) + ["origin_type_Encoded", "dest_type_Encoded"]

    scaled_cols = sorted(list(set(important_num_features).union(set(new_features))- {'ACTUAL_ELAPSED_TIME','CRS_ELAPSED_TIME'} - set(features_not_directly_available)
                   -set(['origin_code_vector', 'dest_code_vector']))+ ['origin_type_Encoded', 'dest_type_Encoded'])
    print ('scaled_cols are: ')
    print (scaled_cols) 
    print ()
    # IN:using sorted() to keep the order of columns more consistent and understandable (11 original, 19 new, 2 label encoding - should be 32 in total)

    print ('scaling train_val_df and test_df using train_val scaler..')
    train_val_df, test_df = scaling(scaled_cols, train_val_df, 
                                    [train_val_df, test_df], scaling_flag)
    
    print(
        f"After standard_scaling - train_val_df rows: {train_val_df.count()}"
    )

    print(
        f"After standard_scaling - test_df rows: {test_df.count()}"
    )

    # Order the train_val_df and test_df given the encoding may shuffle the order
    train_val_df  = train_val_df.orderBy('two_hours_prior_depart_UTC').cache()
    test_df  = test_df.orderBy('two_hours_prior_depart_UTC').cache()

    print(f"End: train_val_df rows: {train_val_df.count()}")
    print(f"End: test_df rows: {test_df.count()}")

    # checkpoint the feature selected and encoded, scaled data to blob
    train_val_df.write.mode("overwrite").parquet(
        f"{team_blob_url}/train_val_data_encoded_scaled_ordered_{data_months_label}_{output_version}"
    )

    test_df.write.mode("overwrite").parquet(
      f"{team_blob_url}/test_data_encoded_scaled_ordered_{data_months_label}_{output_version}"
    )

    # save scaled_cols too
    spark.createDataFrame(pd.DataFrame(scaled_cols)).write.mode('overwrite').parquet(
        f'{team_blob_url}/scaled_cols_{data_months_label}_{output_version}')
    
    # DONE on 20241116
    # data_col_cleaned_fe.write.mode("overwrite").parquet(f"{team_blob_url}/data_encoded_scaled_12m")

    return (
        train_val_df, 
        test_df,
        data_col_cleaned_fe,
        encoded_cols,
        scaled_cols,
        new_features,
        important_cat_features,
        important_num_features,
        tgt_var,
        key_time_features,
        other_relevant_cat_features,
    )


# Option one - Combined Execution - in main

In [0]:
output_version = 'v4'
split_ratio = 0.75
input_path = '60m_take_4_final'  # '60m_take_4_final', 12m_take_5_final', 12m_take_4_final', 12m_take_3_final_data' , '12m_take_2_final_data'
data_months_label = '60m'
# spark.catalog.clearCache() # IN: note this should only clear cache for the current spark session instead of the entire cluster

(   train_val_df, 
    test_df,
    data_col_cleaned_fe,
    encoded_cols,
    scaled_cols,
    new_features,
    important_cat_features,
    important_num_features,
    tgt_var,
    key_time_features,
    other_relevant_cat_features,
) = execute_main(input_path,
                    data_months_label,
                    split_ratio,
                    output_version)

[Row(MONTH=6), Row(MONTH=3), Row(MONTH=5), Row(MONTH=4), Row(MONTH=8), Row(MONTH=7), Row(MONTH=2), Row(MONTH=9), Row(MONTH=1)]
[Row(MONTH=12), Row(MONTH=10), Row(MONTH=11)]


# Option two - Separate execution

In [0]:
## set up
output_version = 'v4' # 'v3', 'v2'
split_ratio = 0.75
input_path = '60m_take_4_final'  # '60m_take_4_final', 12m_take_5_final', 12m_take_4_final', 12m_take_3_final_data' , '12m_take_2_final_data'
data_months_label = '60m'
spark.catalog.clearCache() # IN: note this should only clear cache for the current spark session instead of the entire cluster

## 1. Feature engineering

In [0]:
team_blob_url, mids261_mount_path = connect_to_team_blob_and_start_spark()

# read in pre processing data
# data_col_cleaned = (spark.read.parquet(f"{team_blob_url}/12m_final_data/"))
# data_col_cleaned = spark.read.parquet(f"{team_blob_url}/12m_final_data_deduped/")
data_col_cleaned = spark.read.parquet(f"{team_blob_url}/{input_path}/")
print(f"data_col_cleaned rows: {data_col_cleaned.count()}")

# list of features to be used in analytical stages
tgt_var = ["DEP_DELAY_GROUP"]  #'DEP_DELAY_GROUP' will be regrouped later
key_time_features = [
    "two_hours_prior_depart_UTC",
    # "sched_depart_date_time", # IN: doesn't exist in 60m data
    "sched_depart_date_time_UTC",
    "four_hours_prior_depart_UTC",
    "FL_DATE",
]

important_num_features = [
    "DEP_DELAY_NEW",
    "TAXI_OUT",
    "TAXI_IN",
    "ARR_DELAY_NEW",
    "CRS_ELAPSED_TIME",
    "ACTUAL_ELAPSED_TIME",
    "DISTANCE",
    "ELEVATION",
    "HourlyAltimeterSetting",
    # "HourlyDewPointTemperature", #IN: highly correlated with drytemp, drop
    "HourlyDryBulbTemperature",
    "HourlyPrecipitation",
    "HourlyRelativeHumidity",
    "HourlySeaLevelPressure",
    "HourlyStationPressure",
    "HourlyVisibility",
    # "HourlyWetBulbTemperature", #IN: highly correlated with drytemp, drop
    # NOTE: Not found in 12m previously, but maybe Irene's new dataset will work.
    "HourlyWindDirection",
    "HourlyWindSpeed",
] # 17 in total

# IN: before fixing the original data, let's not include CANCELLED and DIVERTED as features
# IN: when we have <1 year data, exclude 'YEAR', for 3m data, exclude 'QUARTER'
important_cat_features = [
    "MONTH",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "DEP_TIME_BLK",
    "ARR_TIME_BLK",
    "origin_type",
    "dest_type",
    "OP_UNIQUE_CARRIER",
    "ORIGIN",
    "DEST",
    'TAIL_NUM',
    # NOTE: Added for 12m
    "QUARTER",
    # NOTE: Added for >12m
    'YEAR'
]

other_relevant_cat_features = [
    "DEP_TIMESTAMP_ACTUAL_UTC",
    "ARR_TIMESTAMP_ACTUAL_UTC",
    "DEP_DEL15",
    "ARR_DEL15",
    "ORIGIN_CITY_NAME",
    "ORIGIN_STATE_ABR",
    "DEST_CITY_NAME",
    "DEST_STATE_ABR",
]

features_not_directly_available = [
    "DEP_DELAY_NEW",
    "ARR_DELAY_NEW",
    "TAXI_OUT",
    "TAXI_IN",
    "CANCELLED",
    "DIVERTED",
    "ACTUAL_ELAPSED_TIME",
    "Actual_vs_scheduled_elapsed_time",
]

# new_features = []

new_features = [
    "DEP_DELAY_COUNT_3600sec",
    "DEP_DELAY_COUNT_7200sec",
    "ARR_DELAY_COUNT_3600sec",
    "ARR_DELAY_COUNT_7200sec",
    "CANCELLED_DELAY_COUNT_3600sec",
    "CANCELLED_DELAY_COUNT_7200sec",
    # 'InDegreeOrigin', 
    # 'OutDegreeOrigin', 
    # 'TotalDegreeOrigin'
] # IN: only for 12month onwards (maybe Rahul's new 3m has it all already)

# select most relevant features to proceed with analysis
included_cols = (
    tgt_var
    + key_time_features
    + important_num_features
    + important_cat_features
    + new_features
)

data_col_cleaned_fe = data_col_cleaned.select(*included_cols)
# print(f"After data_col_cleaned.select - data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}")

# NOTE: Not found in 12m - included since take-3
data_col_cleaned_fe = data_col_cleaned_fe.withColumn('HourlyWindDirection', col('HourlyWindDirection').cast('float'))
# IN: this needs to be fixed at pre-processing level

# create additional features
data_col_cleaned_fe, new_features = create_additional_features(
    data_col_cleaned_fe,
    new_features,
    team_blob_url
)
print(
    f"After create_additional_features - data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}"
)

# rebucketize target variable
data_col_cleaned_fe = data_col_cleaned_fe.withColumn(
    "DEP_DELAY_GROUP_NEW",
    when(data_col_cleaned["DEP_DELAY_GROUP"] < 0, 0)
    .when(data_col_cleaned["DEP_DELAY_GROUP"] == 0, 1)
    .otherwise(2),
)
print(
    f"After rebucketize target variable - data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}"
)


# Order the dataframe for later splitting (note: will still need to order right before the split to make sure)
data_col_cleaned_fe = data_col_cleaned_fe.orderBy('two_hours_prior_depart_UTC').cache()

# Checkpoint the data with all relevant features before splitting
data_col_cleaned_fe.write.mode("overwrite").parquet(
    f"{team_blob_url}/data_cleaned_ordered_with_feature_engineerings_{data_months_label}_{output_version}")
print(f"After saving to blob, data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}")



data_col_cleaned rows: 31099553
Derive features based on the flight itself..
Total number of rows after lag based features: 31105479
Derive features from other flights departure status (simpflied)..
Total number of rows after departure based features: 31105479
Derive features from rolling monthly prophet forecast..
Total number of rows after prophet based features: 31105479
Convert ORIGIN and DEST to binary representation..
Total number of rows after ORIGIN and DEST binary vector: 31105479
Create degree_centrality based on scheduled flight in past three hours..
Total number of rows after degree_centrality in past three hours: 31105479
After create_additional_features - data_col_cleaned_fe rows: 31105479
After rebucketize target variable - data_col_cleaned_fe rows: 31105479
After saving to blob, data_col_cleaned_fe rows: 31105479


In [0]:
new_features

['DEP_DELAY_COUNT_3600sec',
 'DEP_DELAY_COUNT_7200sec',
 'ARR_DELAY_COUNT_3600sec',
 'ARR_DELAY_COUNT_7200sec',
 'CANCELLED_DELAY_COUNT_3600sec',
 'CANCELLED_DELAY_COUNT_7200sec',
 'Lag_same_flight_DEP_DELAY_NEW',
 'Avg_DEP_DELAY_NEW_other_flights',
 'Avg_TAXI_OUT_other_flights',
 'Last_DEP_DELAY_NEW_other_flights',
 'Last_TAXI_OUT_other_flights',
 'Monthly_avg_DEP_DELAY_NEW_hat',
 'origin_code_vector',
 'dest_code_vector',
 'Out_degree_same_origin_past_3hr',
 'In_degree_same_dest_past_3hr']

In [0]:
# How it should be - for reference
new_features = ['DEP_DELAY_COUNT_3600sec',
 'DEP_DELAY_COUNT_7200sec',
 'ARR_DELAY_COUNT_3600sec',
 'ARR_DELAY_COUNT_7200sec',
 'CANCELLED_DELAY_COUNT_3600sec',
 'CANCELLED_DELAY_COUNT_7200sec',
 'Lag_same_flight_DEP_DELAY_NEW',
 'Avg_DEP_DELAY_NEW_other_flights',
 'Avg_TAXI_OUT_other_flights',
 'Last_DEP_DELAY_NEW_other_flights',
 'Last_TAXI_OUT_other_flights',
 'Monthly_avg_DEP_DELAY_NEW_hat',
 'origin_code_vector',
 'dest_code_vector',
 'Out_degree_same_origin_past_3hr', # corrected from 'In_degree_same_origin_past_3hr'
 'In_degree_same_dest_past_3hr'] # corrected from 'Out_degree_same_dest_past_3hr'

In [0]:
display(data_col_cleaned_fe.dtypes)

_1,_2
origin_type,string
year_month,timestamp
TAIL_NUM,string
two_hours_prior_depart_UTC,timestamp
DEP_DELAY_GROUP,int
sched_depart_date_time_UTC,timestamp
four_hours_prior_depart_UTC,timestamp
FL_DATE,date
DEP_DELAY_NEW,double
TAXI_OUT,double


In [0]:
data_col_cleaned_fe.count()

31105479

## 2. Split data into train_val and test

In [0]:
'''if already have full data with features'''
# data_col_cleaned_fe = spark.read.parquet(f"{team_blob_url}/data_cleaned_ordered_with_feature_engineerings_{data_months_label}_{output_version}")

# print(f"data_col_cleaned_fe rows: {data_col_cleaned_fe.count()}")

# new_features.extend(['Lag_same_flight_DEP_DELAY_NEW',
#     'Avg_DEP_DELAY_NEW_other_flights',
#     'Avg_TAXI_OUT_other_flights',
#     'Last_DEP_DELAY_NEW_other_flights',
#     'Last_TAXI_OUT_other_flights',
#     'Avg_ARR_DELAY_NEW_other_flights',
#     'Avg_TAXI_IN_other_flights',
#     'Last_ARR_DELAY_NEW_other_flights',
#     'Last_TAXI_IN_other_flights',
#     'Weekly_avg_DEP_DELAY_NEW_hat',
#     'origin_code_vector',
#     'dest_code_vector'])

''''''

# Splitting data into train_val_df, test_df
train_val_df, test_df = split_train_val_test(data_col_cleaned_fe, split_ratio, data_months_label)

print(
    f"After data splitting - train_val_df rows: {train_val_df.count()}"
)
print(
    f"After data splitting - test_df rows: {test_df.count()}"
)

After data splitting - train_val_df rows: 23863442
After data splitting - test_df rows: 7242037


In [0]:
# spark.catalog.clearCache()

## 3 Encoding

In [0]:
# Encoding
## Label Encoding
encoded_cols = []

# define the desired order
order_label = ["small_airport", "medium_airport", "large_airport"]

print ('Label encoding train_val..')
train_val_df = label_encoding_with_order(train_val_df, 'origin_type', 
                                                order_label).cache()

train_val_df = label_encoding_with_order(train_val_df, 'dest_type', 
                                                order_label).cache() 

# process test
print ('Label encoding test..')
test_df = label_encoding_with_order(test_df, 'origin_type', 
                                                order_label).cache()

test_df = label_encoding_with_order(test_df, 'dest_type', 
                                                order_label).cache() 

print(
    f"After label_encoding_with_order for both origin_type/dest_type - train_val_df rows: {train_val_df.count()}"
)
print(
    f"After label_encoding_with_order for both origin_type/dest_type - test_df rows: {test_df.count()}"
)

encoded_cols.append("origin_type_Encoded")
encoded_cols.append("dest_type_Encoded")

## One-hot Encoding
# IN: for 3m data, Year AND QUARTER can't be included
# IN: BEFORE FIXING THE DATA, do not include 'CANCELLED'and 'DIVERTED' as a column
col_for_one_hot = [
    "MONTH",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "OP_UNIQUE_CARRIER",
    "DEP_TIME_BLK",
    "ARR_TIME_BLK",
    # NOTE: Added for 12m
    "QUARTER",
    # NOTE: Added for >12m
    'YEAR'
]

print ('One hot encoding train_val and test_df using train_val encoder..')
train_val_df, test_df = one_hot_encoding(col_for_one_hot, train_val_df, 
                                      [train_val_df, 
                                      test_df])

print(
    f"After one_hot_encoding - train_val_df rows: {train_val_df.count()}"
)
print(
    f"After one_hot_encoding - test_df rows: {test_df.count()}"
)

encoded_cols.extend([col + "_Endoded" for col in col_for_one_hot])

# Scaling numerical and label encoded variables
scaling_flag = 'minmax'  #IN: 'minmax' or 'standard'

# scaled_cols = list(
#     set(important_num_features).union(set(new_features))
#     - {"ACTUAL_ELAPSED_TIME", "CRS_ELAPSED_TIME"}
#     - set(features_not_directly_available)
#     - set(['origin_code_vector', 'dest_code_vector'])
# ) + ["origin_type_Encoded", "dest_type_Encoded"]

scaled_cols = sorted(list(set(important_num_features).union(set(new_features))- {'ACTUAL_ELAPSED_TIME','CRS_ELAPSED_TIME'} - set(features_not_directly_available)
                -set(['origin_code_vector', 'dest_code_vector']))+ ['origin_type_Encoded', 'dest_type_Encoded'])
print ('scaled_cols are: ')
print (scaled_cols) 
print ()
# IN:using sorted() to keep the order of columns more consistent and understandable (11 original, 19 new, 2 label encoding - should be 32 in total)

print ('scaling train_val_df and test_df using train_val scaler..')
train_val_df, test_df = scaling(scaled_cols, train_val_df, 
                                [train_val_df, test_df], scaling_flag)

print(
    f"After standard_scaling - train_val_df rows: {train_val_df.count()}"
)

print(
    f"After standard_scaling - test_df rows: {test_df.count()}"
)

# Order the train_val_df and test_df given the encoding may shuffle the order
train_val_df  = train_val_df.orderBy('two_hours_prior_depart_UTC').cache()
test_df  = test_df.orderBy('two_hours_prior_depart_UTC').cache()

print(f"End: train_val_df rows: {train_val_df.count()}")
print(f"End: test_df rows: {test_df.count()}")

# checkpoint the feature selected and encoded, scaled data to blob
train_val_df.write.mode("overwrite").parquet(
    f"{team_blob_url}/train_val_data_encoded_scaled_ordered_{data_months_label}_{output_version}"
)

test_df.write.mode("overwrite").parquet(
  f"{team_blob_url}/test_data_encoded_scaled_ordered_{data_months_label}_{output_version}"
)

# save scaled_cols too
spark.createDataFrame(pd.DataFrame(scaled_cols)).write.mode('overwrite').parquet(
    f'{team_blob_url}/scaled_cols_{data_months_label}_{output_version}')


Label encoding train_val..
+--------------+-------------------------------+
|   origin_type|last_value(origin_type_Encoded)|
+--------------+-------------------------------+
| large_airport|                              2|
|medium_airport|                              1|
| small_airport|                              0|
+--------------+-------------------------------+

+--------------+-----------------------------+
|     dest_type|last_value(dest_type_Encoded)|
+--------------+-----------------------------+
| large_airport|                            2|
|medium_airport|                            1|
| small_airport|                            0|
+--------------+-----------------------------+

Label encoding test..
+--------------+-------------------------------+
|   origin_type|last_value(origin_type_Encoded)|
+--------------+-------------------------------+
| large_airport|                              2|
|medium_airport|                              1|
| small_airport|                

Downloading artifacts:   0%|          | 0/165 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

df_encoded.count():  23863442
df_encoded.count():  23863442
+---------------+--------------------+-------------------+-------------------------+--------------------+--------------------+---------------+-------------+
|  MONTH_Encoded|DAY_OF_MONTH_Encoded|DAY_OF_WEEK_Encoded|OP_UNIQUE_CARRIER_Encoded|DEP_TIME_BLK_Encoded|ARR_TIME_BLK_Encoded|QUARTER_Encoded| YEAR_Encoded|
+---------------+--------------------+-------------------+-------------------------+--------------------+--------------------+---------------+-------------+
|(12,[10],[1.0])|     (31,[23],[1.0])|      (7,[1],[1.0])|           (19,[8],[1.0])|     (19,[16],[1.0])|     (19,[17],[1.0])|  (4,[3],[1.0])|(4,[1],[1.0])|
|(12,[10],[1.0])|     (31,[23],[1.0])|      (7,[1],[1.0])|           (19,[8],[1.0])|     (19,[16],[1.0])|     (19,[17],[1.0])|  (4,[3],[1.0])|(4,[1],[1.0])|
|(12,[10],[1.0])|     (31,[23],[1.0])|      (7,[1],[1.0])|           (19,[8],[1.0])|     (19,[16],[1.0])|     (19,[17],[1.0])|  (4,[3],[1.0])|(4,[1],[1.0])

Downloading artifacts:   0%|          | 0/165 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

df_encoded.count():  7242037
df_encoded.count():  7242037
+---------------+--------------------+-------------------+-------------------------+--------------------+--------------------+---------------+------------+
|  MONTH_Encoded|DAY_OF_MONTH_Encoded|DAY_OF_WEEK_Encoded|OP_UNIQUE_CARRIER_Encoded|DEP_TIME_BLK_Encoded|ARR_TIME_BLK_Encoded|QUARTER_Encoded|YEAR_Encoded|
+---------------+--------------------+-------------------+-------------------------+--------------------+--------------------+---------------+------------+
|(12,[10],[1.0])|     (31,[23],[1.0])|      (7,[4],[1.0])|           (19,[8],[1.0])|     (19,[16],[1.0])|     (19,[17],[1.0])|  (4,[3],[1.0])|   (4,[],[])|
|(12,[10],[1.0])|     (31,[23],[1.0])|      (7,[4],[1.0])|           (19,[8],[1.0])|     (19,[16],[1.0])|     (19,[18],[1.0])|  (4,[3],[1.0])|   (4,[],[])|
|(12,[10],[1.0])|     (31,[23],[1.0])|      (7,[4],[1.0])|           (19,[2],[1.0])|     (19,[16],[1.0])|     (19,[16],[1.0])|  (4,[3],[1.0])|   (4,[],[])|
|(12,[

In [0]:
scaled_cols

['ARR_DELAY_COUNT_3600sec',
 'ARR_DELAY_COUNT_7200sec',
 'Avg_DEP_DELAY_NEW_other_flights',
 'Avg_TAXI_OUT_other_flights',
 'CANCELLED_DELAY_COUNT_3600sec',
 'CANCELLED_DELAY_COUNT_7200sec',
 'DEP_DELAY_COUNT_3600sec',
 'DEP_DELAY_COUNT_7200sec',
 'DISTANCE',
 'ELEVATION',
 'HourlyAltimeterSetting',
 'HourlyDryBulbTemperature',
 'HourlyPrecipitation',
 'HourlyRelativeHumidity',
 'HourlySeaLevelPressure',
 'HourlyStationPressure',
 'HourlyVisibility',
 'HourlyWindDirection',
 'HourlyWindSpeed',
 'In_degree_same_dest_past_3hr',
 'Lag_same_flight_DEP_DELAY_NEW',
 'Last_DEP_DELAY_NEW_other_flights',
 'Last_TAXI_OUT_other_flights',
 'Monthly_avg_DEP_DELAY_NEW_hat',
 'Out_degree_same_origin_past_3hr',
 'dest_type_Encoded',
 'origin_type_Encoded']

In [0]:
# replicate the potential features to use
cols_to_be_assembled_mod = ['scaled_features'] + list({x for x in train_val_df.columns if '_Encoded' in x} - {'origin_type_Encoded', 'dest_type_Encoded'} - {'ORIGIN_Encoded', 'DEST_Encoded'})\
    + ['origin_code_vector', 'dest_code_vector']

In [0]:
# for reference
cols_to_be_assembled_mod

['scaled_features',
 'DEP_TIME_BLK_Encoded',
 'DAY_OF_WEEK_Encoded',
 'MONTH_Encoded',
 'QUARTER_Encoded',
 'OP_UNIQUE_CARRIER_Encoded',
 'YEAR_Encoded',
 'ARR_TIME_BLK_Encoded',
 'DAY_OF_MONTH_Encoded',
 'origin_code_vector',
 'dest_code_vector']

In [0]:
# Total mod features for reference
all_mod_features = (scaled_cols + cols_to_be_assembled_mod)
all_mod_features.remove('scaled_features')
all_mod_features

['ARR_DELAY_COUNT_3600sec',
 'ARR_DELAY_COUNT_7200sec',
 'Avg_DEP_DELAY_NEW_other_flights',
 'Avg_TAXI_OUT_other_flights',
 'CANCELLED_DELAY_COUNT_3600sec',
 'CANCELLED_DELAY_COUNT_7200sec',
 'DEP_DELAY_COUNT_3600sec',
 'DEP_DELAY_COUNT_7200sec',
 'DISTANCE',
 'ELEVATION',
 'HourlyAltimeterSetting',
 'HourlyDryBulbTemperature',
 'HourlyPrecipitation',
 'HourlyRelativeHumidity',
 'HourlySeaLevelPressure',
 'HourlyStationPressure',
 'HourlyVisibility',
 'HourlyWindDirection',
 'HourlyWindSpeed',
 'In_degree_same_dest_past_3hr',
 'Lag_same_flight_DEP_DELAY_NEW',
 'Last_DEP_DELAY_NEW_other_flights',
 'Last_TAXI_OUT_other_flights',
 'Monthly_avg_DEP_DELAY_NEW_hat',
 'Out_degree_same_origin_past_3hr',
 'dest_type_Encoded',
 'origin_type_Encoded',
 'DEP_TIME_BLK_Encoded',
 'DAY_OF_WEEK_Encoded',
 'MONTH_Encoded',
 'QUARTER_Encoded',
 'OP_UNIQUE_CARRIER_Encoded',
 'YEAR_Encoded',
 'ARR_TIME_BLK_Encoded',
 'DAY_OF_MONTH_Encoded',
 'origin_code_vector',
 'dest_code_vector']