In [0]:
import pyspark
from pyspark.sql.types import StringType, BooleanType, IntegerType
import pyspark.sql.functions as F

import airporttime
from datetime import datetime, timedelta

import numpy as np

In [0]:
from pyspark.sql import SQLContext
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import udf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml.feature import Bucketizer
from pyspark.ml import Pipeline
from sklearn.metrics import confusion_matrix

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier

#### Configure access to Azure Blob Storage, used to cache large intermediate datasets for quick access

In [0]:
# Azure Blob Storage resources (created in https://portal.azure.com).
blob_container = "w261-scrr"     # container name
storage_account = "midsw261rv"   # storage account name

# Databricks secret scope/key (created locally with the Databricks CLI) holding the SAS token.
secret_scope = "w261scrr"
secret_key = "w261scrrkey"

# Base URL for every parquet read/write below, plus the DBFS mount point.
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
# Register the SAS token read from the Databricks secret scope so Spark can
# access wasbs:// paths in this container.
spark.conf.set(f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
               dbutils.secrets.get(scope=secret_scope, key=secret_key))

In [0]:
%run "./libs/weather_aggregation"

In [0]:
%run "./libs/time_based_features"

In [0]:
%run "./libs/transform"

In [0]:
%run "./libs/model_helper_functions"

In [0]:
%run "./libs/custom_cv"

#### Import joined data

In [0]:
df_train = spark.read.format("parquet").load(f"{blob_url}/join_full_0329")  # raw joined training data (used by the commented-out "run once" transform cells)

In [0]:
df_test = spark.read.format("parquet").load(f"{blob_url}/test_full_join_0404")  # raw joined test data (used by the commented-out "run once" transform cell)

### Cross Validation

In [0]:
# Transform the data and save it - run this once

# trainsplits, valsplits = Split4year5Fold(df_train)

# for i, val_train in enumerate(trainsplits):
  
#   df_train_split = aggregate_weather_reports(val_train)
#   df_val_split = aggregate_weather_reports(valsplits[i])
  
#   df_train_split = get_transformed_df(df_train_split)
#   df_val_split = get_transformed_df(df_val_split)
  
#   df_train_split = add_previous_flight_delay_indicator(df_train_split)
#   df_val_split = add_previous_flight_delay_indicator(df_val_split)
  
#   df_train_split.write.parquet(f"{blob_url}/cv_train_0407_split"+str(i))
#   df_val_split.write.parquet(f"{blob_url}/cv_val_0407_split"+str(i))
  
  
  

In [0]:
# Load the five pre-computed CV folds (written once by the transform cell above).
# Part of the main flow.

df_train_split, df_val_split = [], []

for fold in range(5):
  df_train_split.append(spark.read.parquet(f"{blob_url}/cv_train_0407_split{fold}"))
  df_val_split.append(spark.read.parquet(f"{blob_url}/cv_val_0407_split{fold}"))



In [0]:
def preprocess(df):
  """Fill missing weather values, target-encode airports/routes and normalise dtypes.

  Steps:
    1. Missing CIG_CeilingHeightDim_median / VIS_Horizontal_median -> 999999
       (a large sentinel; presumably "no ceiling / unlimited visibility").
       Missing rain, snow, wind and sea-level-pressure aggregates -> 0.
    2. Build an ORIGIN-DEST route column and add target mean encodings of
       ORIGIN, DEST and the route (`<col>_mean_encoding` columns).
    3. Strip ':' from CRS_DEP_TIME and cast it to int (e.g. "14:10" -> 1410).
       Cast DAY_OF_WEEK and MONTH to strings so the dtype-based column split
       treats them as categorical.
    4. Drop ORIGIN, DEST and ORIGIN_DEST_COMBO (only their encodings remain).

  NOTE(review): `target_mean_encoding` only receives `df`, so the encodings are
  presumably computed from `df`'s own DEP_DEL15 labels. When this is applied to
  a validation/test frame, that frame's labels would leak into its features.
  Encodings should be fit on the training data and joined onto the others.
  Confirm against the `target_mean_encoding` definition.

  Args:
    df: Spark DataFrame holding the columns referenced below.

  Returns:
    A new Spark DataFrame.
  """
  sentinel_fill_cols = ['CIG_CeilingHeightDim_median', 'VIS_Horizontal_median']
  zero_fill_cols = ['AA_RainDepth', 'AA_RainDuration', 'AL_SnowAccumDuration_mean', 'AL_SnowAccumDepth',
                    'AJ1_SnowDepth_mean', 'AJ1_SnowEqWaterDepth', 'WND_Speed_mean', 'SLP_Value_mean']

  df = df.fillna(999999, subset=sentinel_fill_cols)
  df = df.fillna(0, subset=zero_fill_cols)

  # Route identifier such as "JFK-LAX". F.col is used explicitly rather than a bare
  # `col`, which is never imported in this notebook and would only resolve if a
  # %run library happened to define it.
  df = df.withColumn("ORIGIN_DEST_COMBO", F.concat(F.col("ORIGIN"), F.lit('-'), F.col("DEST")))

  df = target_mean_encoding(df, col=['ORIGIN', 'DEST', 'ORIGIN_DEST_COMBO'], target='DEP_DEL15')

  return (df
          .withColumn("CRS_DEP_TIME", F.regexp_replace(F.col("CRS_DEP_TIME"), "[:]", "").cast(IntegerType()))
          .withColumn("DAY_OF_WEEK", F.col("DAY_OF_WEEK").cast(StringType()))
          .withColumn("MONTH", F.col("MONTH").cast(StringType()))
          .drop('ORIGIN', 'DEST', 'ORIGIN_DEST_COMBO'))

In [0]:
# test with hubs

# def add_hubs(df):
  
#   df = df.withColumn("UA_HUB_ORIG", F.when((df.OP_UNIQUE_CARRIER == "UA") & ((df.ORIGIN == "ORD") | (df.ORIGIN == "DEN") | (df.ORIGIN == "IAH") | (df.ORIGIN == "LAX") | \
#                                                    (df.ORIGIN == "EWR") | (df.ORIGIN == "SFO") | (df.ORIGIN == "IAD")), 1)
#                                      .otherwise(0))
  
#   df = df.withColumn("DL_HUB_ORIG", F.when((df.OP_UNIQUE_CARRIER == "DL") & ((df.ORIGIN == "ATL") | (df.ORIGIN == "BOS") | (df.ORIGIN == "DTW") | (df.ORIGIN == "LAX") | \
#                                                    (df.ORIGIN == "MSP") | (df.ORIGIN == "JFK") | (df.ORIGIN == "LGA") | (df.ORIGIN == "SLC") | (df.ORIGIN == "SEA")), 1)
#                                      .otherwise(0))

#   df = df.withColumn("AA_HUB_ORIG", F.when((df.OP_UNIQUE_CARRIER == "AA") & ((df.ORIGIN == "DFW") | (df.ORIGIN == "CLT") | (df.ORIGIN == "ORD") | (df.ORIGIN == "LAX") | \
#                                                    (df.ORIGIN == "MIA") | (df.ORIGIN == "JFK") | (df.ORIGIN == "LGA") | (df.ORIGIN == "PHL") | (df.ORIGIN == "PHX") | (df.ORIGIN == "DCA")), 1)
#                                      .otherwise(0))
  
#   df = df.withColumn("WN_HUB_ORIG", F.when((df.OP_UNIQUE_CARRIER == "WN") & ((df.ORIGIN == "ATL") | (df.ORIGIN == "BWI") | (df.ORIGIN == "MDW") | (df.ORIGIN == "DAL") | \
#                                                    (df.ORIGIN == "DEN") | (df.ORIGIN == "HOU") | (df.ORIGIN == "LAS") | (df.ORIGIN == "LAX") | (df.ORIGIN == "OAK") | (df.ORIGIN == "MCO") | (df.ORIGIN == "PHX")), 1)
#                                      .otherwise(0))
  
#   df = df.withColumn("AS_HUB_ORIG", F.when((df.OP_UNIQUE_CARRIER == "AS") & ((df.ORIGIN == "SEA") | (df.ORIGIN == "ANC") | (df.ORIGIN == "LAX") | (df.ORIGIN == "PDX") | \
#                                                    (df.ORIGIN == "SFO")), 1)
#                                      .otherwise(0))

#   return df




In [0]:
# Columns used for training. Editing this list is how feature subsets are chosen
# and scored. It covers flights + weather + time-based attributes.
selected_cols = [
  # label and flight attributes
  'DEP_DEL15', 'CRS_DEP_TIME', 'OP_UNIQUE_CARRIER', 'DAY_OF_WEEK', 'DISTANCE',
  'DISTANCE_GROUP', 'MONTH', 'ORIGIN', 'DEST',
  # weather aggregates
  'CIG_CeilingHeightDim_median', 'VIS_Horizontal_median', 'AA_RainDepth', 'AA_RainDuration',
  'AL_SnowAccumDuration_mean', 'AL_SnowAccumDepth', 'AJ1_SnowDepth_mean', 'AJ1_SnowEqWaterDepth',
  'WND_Speed_mean', 'SLP_Value_mean',
  # identifiers / timestamp: carried through, excluded from the features below
  'OP_CARRIER_FL_NUM', 'TAIL_NUM', 'TIMESTAMP_UTC',
  # previous-flight delay indicator
  'PREV_DEP_DEL15',
]

# Preprocess the first CV training fold only to discover the post-preprocessing schema.
df_temp = preprocess(df_train_split[0].select(*selected_cols))

labelCol = ['DEP_DEL15']

# String columns become categorical features; everything else is numeric.
categoricalColumns = [name for name, dtype in df_temp.dtypes if dtype == 'string']
numericCols = [name for name, dtype in df_temp.dtypes if dtype != 'string']

# Identifiers, the label and the timestamp are not model features.
categoricalColumns.remove('OP_CARRIER_FL_NUM')
categoricalColumns.remove('TAIL_NUM')
numericCols.remove(labelCol[0])
numericCols.remove('TIMESTAMP_UTC')

In [0]:
# Preview the preprocessed first CV training fold.
display(df_temp)

DEP_DEL15,CRS_DEP_TIME,OP_UNIQUE_CARRIER,DAY_OF_WEEK,DISTANCE,DISTANCE_GROUP,MONTH,CIG_CeilingHeightDim_median,VIS_Horizontal_median,AA_RainDepth,AA_RainDuration,AL_SnowAccumDuration_mean,AL_SnowAccumDepth,AJ1_SnowDepth_mean,AJ1_SnowEqWaterDepth,WND_Speed_mean,SLP_Value_mean,OP_CARRIER_FL_NUM,TAIL_NUM,TIMESTAMP_UTC,PREV_DEP_DEL15,ORIGIN_mean_encoding,DEST_mean_encoding,ORIGIN_DEST_COMBO_mean_encoding
0.0,1410,US,4,992.0,4,1,1263.92,10298.53,2.39,3.19,0.0,0,0.0,0.0,9.859999656677246,10205.1904296875,1790,N109UW,2015-01-01T19:10:00.000+0000,0.0,0.2089608241092899,0.2022562904926924,0.1948424068767908
1.0,1750,US,4,920.0,4,1,22000.0,16078.99,0.0,1.0,0.0,0,0.0,0.0,77.12999725341797,10230.6796875,1883,N109UW,2015-01-01T22:50:00.000+0000,0.0,0.1932316258770857,0.2041274635071824,0.1746522411128284
0.0,655,US,5,507.0,3,1,5439.91,16083.32,0.0,1.0,0.0,0,0.0,0.0,25.11000061035156,10231.7998046875,1960,N109UW,2015-01-02T11:55:00.000+0000,0.0,0.1943438256658595,0.1364126327266419,0.1243611584327086
0.0,734,US,6,541.0,3,1,1324.45,13512.06,12.53,4.32,0.0,0,0.0,0.0,11.779999732971191,10275.919921875,2042,N109UW,2015-01-03T12:34:00.000+0000,0.0,0.1771598071931776,0.2292914007500493,0.251937984496124
0.0,1020,US,6,541.0,3,1,14816.61,16089.95,0.0,6.44,0.0,0,0.0,0.0,25.479999542236328,10323.2099609375,887,N109UW,2015-01-03T15:20:00.000+0000,0.0,0.211280756434029,0.1364126327266419,0.202525497814473
1.0,1315,US,6,1475.0,6,1,362.0,13925.49,20.22,1.53,0.0,0,0.0,0.0,10.9399995803833,10272.009765625,756,N109UW,2015-01-03T18:15:00.000+0000,0.0,0.1771598071931776,0.2244701836022457,0.1382252559726962
1.0,1840,US,6,1475.0,6,1,212.53,8580.53,0.0,0.0,0.0,0,0.0,0.0,36.2549991607666,10168.4150390625,2039,N109UW,2015-01-03T22:40:00.000+0000,1.0,0.1991600465516369,0.1364126327266419,0.1615646258503401
1.0,2220,US,6,930.0,4,1,212.53,8580.53,2.61,1.93,0.0,0,0.0,0.0,24.0,10247.169921875,2039,N109UW,2015-01-04T03:20:00.000+0000,1.0,0.1771598071931776,0.1543094247565943,0.1641113003975014
0.0,510,US,7,930.0,4,1,765.76,4780.21,0.0,1.05,0.0,0,8.0,778.22,91.72000122070312,10157.3798828125,823,N109UW,2015-01-04T11:10:00.000+0000,1.0,0.151653351328994,0.1364126327266419,0.1047239612976664
0.0,940,US,7,361.0,2,1,61.0,1205.38,20.27,5.57,0.0,0,0.0,0.0,8.239999771118164,10204.01953125,1960,N109UW,2015-01-04T14:40:00.000+0000,0.0,0.1771598071931776,0.2029393824973133,0.1887138145840967


In [0]:


# Cross-validate the logistic-regression pipeline over the pre-computed folds.
# Report per-fold and average precision / recall / F-beta(0.5).
#
# NOTE(review): `preprocess` calls target_mean_encoding(df, ..., target='DEP_DEL15')
# on whatever frame it receives. The validation fold is therefore presumably encoded
# with its own labels (label leakage), which would inflate these scores. Confirm,
# and if so fit the encodings on the training fold only.

fold_metrics = []  # one [precision, recall, fmeasure] row per fold

for i, cv_train in enumerate(df_train_split):

  cv_train = preprocess(cv_train.select(*selected_cols))
  cv_val = preprocess(df_val_split[i].select(*selected_cols))

  # undersampling (disabled)
  # cv_train = undersampling(cv_train)

  # Fit the feature pipeline on the training fold only, then apply it to both folds.
  pipeline = getRegressionPipeline(categoricalColumns, numericCols, labelCol)
  pipelineModel = pipeline.fit(cv_train)

  val_ml_train = pipelineModel.transform(cv_train)
  val_ml_test = pipelineModel.transform(cv_val)

  cols = cv_train.columns
  selectedCols = ['features'] + cols

  train = val_ml_train.select(selectedCols)
  test = val_ml_test.select(selectedCols)

  print("############################")
  print("Validation Set {:d}".format(i+1))
  print("Training Dataset Count: " + str(train.count()))
  print("Test Dataset Count: " + str(test.count()))

  model, pred = execLinearModel(train, test, iter=20)

  precision, recall, fmeasure = getMetrics(pred)

  print("Precision is {:.3f}".format(precision))
  print("Recall is {:.3f}".format(recall))
  print("F beta(0.5) score is {:.3f}".format(fmeasure))

  fold_metrics.append([precision, recall, fmeasure])

# Build the (n_folds, 3) float array once. The previous version seeded an
# int-typed array and np.append-ed a copy each fold.
metricsArray = np.array(fold_metrics, dtype=float).reshape(-1, 3)
avgArray = metricsArray.mean(axis=0)

print("############################")
print("Average of Cross validation")
print("Average Precision is {:.3f}".format(avgArray[0]))
print("Average Recall is {:.3f}".format(avgArray[1]))
print("Average F beta(0.5) score is {:.3f}".format(avgArray[2]))

  

### Run the model on test data

In [0]:
# Transform the training & test data and save it - run this once
  
# df_train_upd = aggregate_weather_reports(df_train)
# df_test_upd = aggregate_weather_reports(df_test)
  
# df_train_upd = get_transformed_df(df_train_upd)
# df_test_upd = get_transformed_df(df_test_upd)
  
# df_train_upd = add_previous_flight_delay_indicator(df_train_upd)
# df_test_upd = add_previous_flight_delay_indicator(df_test_upd)
  
# df_train_upd.write.parquet(f"{blob_url}/train_agg_0404")
# df_test_upd.write.parquet(f"{blob_url}/test_agg_0404")

In [0]:
# Weather-aggregated train/test frames written once by the cell above (main flow).

df_train_main = spark.read.format("parquet").load(f"{blob_url}/train_agg_0404")
df_test_main = spark.read.format("parquet").load(f"{blob_url}/test_agg_0404")

### Custom cross-validation (grid search over LogisticRegression parameters; currently disabled)

In [0]:
# cv_train = df_train_main.select(*selected_cols)

# cv_train = preprocess_dos(cv_train)

# pipeline = getRegressionPipeline(categoricalColumns, numericCols, labelCol)

# pipelineModel = pipeline.fit(cv_train) 

# val_ml_train = pipelineModel.transform(cv_train)

# val_ml_train = val_ml_train.withColumn("MONTH", val_ml_train.MONTH.cast(IntegerType()))
# val_ml_train = val_ml_train.withColumn("YEAR", val_ml_train.YEAR.cast(IntegerType()))


# cols = cv_train.columns
# selectedCols = ['features'] + cols
  
# train = val_ml_train.select(selectedCols)
# train = train.withColumnRenamed('DEP_DEL15', 'label')
  
# lr = LogisticRegression(labelCol="label", featuresCol="features")

# grid = ParamGridBuilder()\
#             .addGrid(lr.regParam, [0.1,1,10])\
#             .addGrid(lr.maxIter, [5,10,20])\
#             .build()

# evaluator = BinaryClassificationEvaluator()

# predictions = customGridsearchCV(train, estimator=lr, grid=grid, evaluator=evaluator)

# display(predictions)

In [0]:
def preprocess_dos(df):
  """Preprocess the full train/test frames for the final model.

  Its body was an exact copy of `preprocess` (same fills, ORIGIN-DEST route
  column, target mean encoding, casts and drops). It now delegates to
  `preprocess`, defined in an earlier cell, so the two cannot drift apart. The
  name is kept because later cells call it.

  NOTE(review): the target mean encoding inside `preprocess` is fit on the frame
  passed in. Calling this on the test frame therefore presumably encodes it with
  its own labels. Confirm, and if so fit the encodings on the training frame only.

  Args:
    df: Spark DataFrame with the columns `preprocess` expects.

  Returns:
    The preprocessed Spark DataFrame.
  """
  return preprocess(df)

In [0]:
# Columns for the final model: the cross-validation set plus YEAR.
# Covers flights + weather + time-based attributes.
selected_cols = [
  # label and flight attributes
  'DEP_DEL15', 'CRS_DEP_TIME', 'OP_UNIQUE_CARRIER', 'DAY_OF_WEEK', 'DISTANCE',
  'DISTANCE_GROUP', 'MONTH', 'YEAR', 'ORIGIN', 'DEST',
  # weather aggregates
  'CIG_CeilingHeightDim_median', 'VIS_Horizontal_median', 'AA_RainDepth', 'AA_RainDuration',
  'AL_SnowAccumDuration_mean', 'AL_SnowAccumDepth', 'AJ1_SnowDepth_mean', 'AJ1_SnowEqWaterDepth',
  'WND_Speed_mean', 'SLP_Value_mean',
  # identifiers / timestamp: carried through, excluded from the features below
  'OP_CARRIER_FL_NUM', 'TAIL_NUM', 'TIMESTAMP_UTC',
  # previous-flight delay indicator
  'PREV_DEP_DEL15',
]

# Preprocess once (lazily) only to discover the post-preprocessing schema.
df_temp2 = preprocess_dos(df_train_main.select(*selected_cols))

# Get numerical, categorical values and label ready for pipeline
labelCol = ['DEP_DEL15']

categoricalColumns = [name for name, dtype in df_temp2.dtypes if dtype == 'string']
numericCols = [name for name, dtype in df_temp2.dtypes if dtype != 'string']

# Identifiers, the label and the timestamp are not model features.
categoricalColumns.remove('OP_CARRIER_FL_NUM')
categoricalColumns.remove('TAIL_NUM')
numericCols.remove(labelCol[0])
numericCols.remove('TIMESTAMP_UTC')

In [0]:
# Final model: fit on all training data and evaluate once on the held-out test set.
#
# The preprocessed frames get new names instead of overwriting df_train_main /
# df_test_main. Preprocessing drops ORIGIN and DEST, so after an overwrite a
# re-run of this cell would fail in select(*selected_cols).
#
# NOTE(review): preprocess_dos target-mean-encodes using the DEP_DEL15 labels of
# the frame it receives. The test set is therefore presumably encoded with its own
# labels, which would inflate the final scores. Fit the encodings on the training
# frame only.

df_train_prep = preprocess_dos(df_train_main.select(*selected_cols))
df_test_prep = preprocess_dos(df_test_main.select(*selected_cols))

# undersampling (disabled)
# df_train_prep = undersampling(df_train_prep)

# Fit the feature pipeline on the training data only, then apply it to both sets.
pipeline = getRegressionPipeline(categoricalColumns, numericCols, labelCol)

pipelineModel = pipeline.fit(df_train_prep)

ml_train = pipelineModel.transform(df_train_prep)
ml_test = pipelineModel.transform(df_test_prep)

cols = df_train_prep.columns
selectedCols = ['features'] + cols

train_all = ml_train.select(selectedCols)
test_all = ml_test.select(selectedCols)

print("############################")

model, pred = execLinearModel(train_all, test_all, iter=20)

precision, recall, fmeasure = getMetrics(pred)

print("Final test scores")
print("Precision is {:.3f}".format(precision))
print("Recall is {:.3f}".format(recall))
print("F beta(0.5) score is {:.3f}".format(fmeasure))

In [0]:
# Persist test-set predictions. The default save mode is "errorifexists", so re-running fails if the path already exists.
pred.write.format("parquet").save(f"{blob_url}/lr_test_0410")

### Analyze errors

In [0]:
%run "./libs/error_analysis"

In [0]:
# Compare average feature values across TP / TN / FP / FN prediction groups (output shown below).
analyze_errors(pred)

PRED_GROUP,avg(DISTANCE),avg(CIG_CeilingHeightDim_median),avg(CRS_DEP_TIME),avg(VIS_Horizontal_median),avg(WND_Speed_mean)
TP,843.1379388649191,10244.190668704992,1631.7239394963196,14268.70745387751,40.98120574528572
TN,792.6368175403345,12674.413193342876,1294.1300056500847,15009.10957174036,33.61618904525877
FN,820.0505912297701,11297.236294662258,1452.7310437780202,14508.526454400817,35.57967096743533
FP,839.0495266389655,11479.684976545512,1253.7146279854603,14830.057540705751,39.48762411583894


PRED_GROUP,avg(PREV_DEP_DEL15)
TP,0.9969039218835676
TN,0.0363169708361514
FN,0.1589596549923925
FP,0.9861688218341628
