In [None]:
#imports

In [None]:
# plotting 
import matplotlib.pyplot as plt
import seaborn as sns

# pyspark / sparksql
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType, FloatType
from pyspark.sql import SQLContext
from pyspark.sql.window import Window

# models
import math
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder, FeatureHasher, Normalizer
from pyspark.ml.classification import LogisticRegression, GBTClassifier, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.regression import LinearRegression, GBTRegressor, DecisionTreeRegressor
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, TrainValidationSplit, ParamGridBuilder, _parallelFitTasks, TrainValidationSplitModel

sqlContext = SQLContext(sc)
team_folder = "dbfs:/mnt/mids-w261/team17"
dbutils.fs.ls(team_folder)

In [None]:
#start spark session

In [None]:
spark = SparkSession.builder.appName("generic_name").getOrCreate()
spark

In [None]:
#adding columns - all sql functions available in "f" library of spark including lag / lead

In [None]:
def get_last_tail_num_flight_delay(airlines_with_tz):
  # window spec to get the last flight for the tail number
  tailnum_window_spec = \
    Window \
      .partitionBy([f.col("tail_num")]) \
      .orderBy(f.col("scheduled_flight_datetime_UTC")) \
      .rowsBetween(-1, -1)

  # create columns for:
  # 1. tail_num_delay_lag_tem = the delay value for the last flight that had the same tail number
  # 2. tail_num_dep_time_lag = when was the last flight departure that had the same tail number, in UTC
  # 3. tail_num_from = where did the last flight from this tail number come from
  # 4. tail_num_to = what was destination of the last flight from this tail_num
  # 5. tail_num_expected_elapsed = what is the scheduled elapsed time for the previous flight
  airlines_with_tz = airlines_with_tz \
    .withColumn("tail_num_delay_lag_temp", f.lag("dep_delay", 1).over(tailnum_window_spec)) \
    .withColumn("tail_num_dep_time_lag", f.lag("scheduled_flight_datetime_UTC", 1).over(tailnum_window_spec)) \
    .withColumn("tail_num_from", f.lag("origin", 1).over(tailnum_window_spec)) \
    .withColumn("tail_num_to", f.lag("dest", 1).over(tailnum_window_spec)) \
    .withColumn("tail_num_expected_elapsed", f.lag("crs_elapsed_time", 1).over(tailnum_window_spec))

  # if the last departure was between 2 and 8 hrs ago, record the previous delay, else 0
  airlines_with_tz = airlines_with_tz.withColumn("tail_num_delay_lag",
    f.when((f.col("scheduled_flight_datetime_UTC").cast(LongType()) - 
            f.col("tail_num_dep_time_lag").cast(LongType())).between(2*3600, 8*3600), 
           f.col("tail_num_delay_lag_temp")) \
    .otherwise(0))
  
  # if the new expected arrival time with the observed delay is within two hours of the current flight departure time, indicate number of min else 0
  # last destination must also match current origin
  airlines_with_tz = airlines_with_tz.withColumn("tail_num_arrival_to_next_departure_delta",
      (f.col("scheduled_flight_datetime_UTC").cast(LongType()) - 
      (f.col("tail_num_dep_time_lag").cast(LongType()) + f.col("tail_num_expected_elapsed")*60 + f.col("tail_num_delay_lag")*60)) / 60)
  
  airlines_with_tz = airlines_with_tz.withColumn("tail_num_arrival_to_next_departure_delta",
    f.when((f.col("tail_num_arrival_to_next_departure_delta") > 120) &
           (f.col("tail_num_to") == f.col("origin")), 120) \
    .when(f.col("tail_num_arrival_to_next_departure_delta").isNull(), 120) \
    .otherwise(f.col("tail_num_arrival_to_next_departure_delta")))
  
  # tail num arriving less than two hours before next departure indicator
  airlines_with_tz = airlines_with_tz.withColumn("tail_num_arrive_less_than_two_hours_indicator",
    f.when(f.col("tail_num_arrival_to_next_departure_delta") < 120, 1).otherwise(0))
  
  airlines_with_tz = airlines_with_tz.drop("tail_num_delay_lag_temp", "tail_num_dep_time_lag", "tail_num_to", 
                                           "tail_num_expected_elapsed", "tail_num_delay_lag")
  return airlines_with_tz.fillna({'tail_num_from':'NA'})

In [None]:
#add complex struct types schema udf

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as f

# parse wind fields
wind_schema = StructType([
    StructField("direction", FloatType(), False),
    StructField("dir_obs_quality", StringType(), False),
    StructField("type", StringType(), False),
    StructField("speed", FloatType(), False),
    StructField("speed_obs_quality", StringType(), False)
])

def wind(s):
    wind = s.split(',') if s else [float('nan'), None, None, float('nan'), None]
    wind[0], wind[3] = float(wind[0]), float(wind[3])
    return wind
  
# parse sky ceiling
# cavok = ceiling and visibility okay
ceiling_schema = StructType([
    StructField("height", FloatType(), False),
    StructField("obs_quality", StringType(), False),
    StructField("determination", StringType(), False),
    StructField("cavok", StringType(), False)
]) 

def sky_ceiling(s):
    sky = s.split(',') if s else [float('nan'), None, None, None]
    sky[0] = float(sky[0])
    return sky
  
# parse visibility
visibility_schema = StructType([
    StructField("visibility", FloatType(), False),
    StructField("vis_obs_quality", StringType(), False),
    StructField("variability", StringType(), False),
    StructField("var_obs_quality", StringType(), False)
]) 

def visibility(s):
    vis = s.split(',') if s else [float('nan'), None, None, None]
    vis[0] = float(vis[0])
    return vis

# just return the first measurement value as a float for [temperature, dew temp, pressure]
def first_value(s, sep=',', t=float):
    return t(s.split(sep)[0])

# precipitation
precipitation_schema = StructType([
    StructField("period", FloatType(), False),
    StructField("depth", FloatType(), False),
    StructField("condition", StringType(), False),
    StructField("precip_quality", StringType(), False)
]) 
def precipitation(s):
    precip = s.split(',') if s else [-1, -1, '-1', '-1']
    precip[0], precip[1] = float(precip[0]), float(precip[1])
    return precip
  
# snow
snow_schema = StructType([
    StructField("depth", FloatType(), False),
    StructField("condition", StringType(), False),
    StructField("snow_quality", StringType(), False),
    StructField("water_depth", StringType(), False)
]) 
def snow(s):
    snow = s.split(',')[:4] if s else [-1, '-1', '-1', -1]
    snow[0], snow[3] = float(snow[0]), float(snow[3])
    return snow

wind_udf = udf(wind, wind_schema)
sky_udf = udf(sky_ceiling, ceiling_schema)
vis_udf = udf(visibility, visibility_schema)
precip_udf = udf(precipitation, precipitation_schema)
snow_udf = udf(snow, snow_schema)
first_value_udf = udf(first_value, FloatType())

In [None]:
#manual sql in spark instance --> must create temp table

In [None]:
airline_features_full.createOrReplaceTempView("airlineFeaturesFullTeam17")
weather_features_full.createOrReplaceTempView("weatherFeaturesFullTeam17")
weather_feature_names = [f"o.{c} as origin_{c}" for c in weather_features_full.columns]
weather_feature_names.extend([f"d.{c} as dest_{c}" for c in weather_features_full.columns])

joined_full = spark.sql("""
  SELECT a.*, {weather_columns}
  FROM airlineFeaturesFullTeam17 a
  JOIN weatherFeaturesFullTeam17 o
    ON ('K'||a.origin) = o.call_sign
    AND a.scheduled_flight_hour_UTC = o.join_datetime
  JOIN weatherFeaturesFullTeam17 d
    ON ('K'||a.dest) = d.call_sign
    AND a.scheduled_flight_hour_UTC = d.join_datetime;
""".format(weather_columns=", ".join(weather_feature_names)))

display(joined_full)

In [None]:
#saving to disk

In [None]:
joined_full \
.write \
.mode('Overwrite') \
.parquet(team_folder + "/joinedFeaturesFull.parquet")

In [None]:
#kmeans clustering ++ union by name function --> must have same column names

In [None]:
from sklearn.cluster import KMeans, DBSCAN
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

joined_features_full = spark.read.option("header", "true").parquet(team_folder + "/joinedFeaturesFull.parquet")

origin_lat_long = joined_features_full.select(f.col("origin").alias("airport_code"), f.col("origin_latitude").alias("latitude"), f.col("origin_longitude").alias("longitude")).distinct()
dest_lat_long = joined_features_full.select(f.col("dest").alias("airport_code"), f.col("dest_latitude").alias("latitude"), f.col("dest_longitude").alias("longitude")).distinct()

airport_lat_long = origin_lat_long.unionByName(dest_lat_long).toPandas()

airport_lat_long['kmeans_cluster_label'] = KMeans(n_clusters=6, random_state=0).fit_predict(airport_lat_long[['latitude', 'longitude']])

sns.scatterplot(data=airport_lat_long, x='longitude', y='latitude', hue='kmeans_cluster_label', palette="deep")
plt.title("Airports Clustered by Lat, Long")
plt.show()


In [None]:
# udf and add / drop columns

In [None]:
def split_by_date_and_region(data, datecolumn='FL_DATE', region="origin_region", splits=[0.8,0.1,0.1], drop_date=True):
  def hashAndAssign(x, splits):
    hash_val = hash(x)
    perc = (hash_val % 1000) / 1000.0
    for i, split in enumerate(splits):
      if perc < split:
        return i
      perc -= split
    return i
  hash_and_assign_udf = f.udf(lambda x: hashAndAssign(x, splits), IntegerType())
  
  data = data.withColumn('data_group', hash_and_assign_udf(f.concat_ws('_',f.col(datecolumn),f.col(region))))
  if drop_date:
    data = data.drop(datecolumn)
  return data


def split_data_by_column(data, col_name="data_group"):
  train = data.filter(f.col(col_name) == 0).drop(col_name)
  valid = data.filter(f.col(col_name) == 1).drop(col_name)
  test = data.filter(f.col(col_name) == 2).drop(col_name)
  return train, valid, test

In [None]:
# drop na / onehot assembler / vector assembler / pipeline

In [None]:
def categorical_feature_transformations_linear(onehot_cols, hash_cols):
  # hash encode categorical features with high cardinality
  hasher = FeatureHasher(num_features=400, inputCols=hash_cols, outputCol="hashFeatures")
  
  # one hot encode features with low cardinality
  onehotIndexers = []
  onehotInputs = []
  onehotFeatures = []
  for c in onehot_cols:
    indexer = StringIndexer(inputCol=c, outputCol=f"{c}_indexed")
    onehotIndexers.append(indexer)
    onehotInputs.append(indexer.getOutputCol())
    onehotFeatures.append(f"{c}_onehot")
  one_hot = OneHotEncoder(inputCols=onehotInputs, outputCols=onehotFeatures)
  
  # return output columns and transformation objects
  return ["hashFeatures"] + onehotFeatures, onehotIndexers + [one_hot, hasher]

def categorical_feature_transformations_trees(cols):
  indexers = []
  features = []
  # simply index all categories
  for c in cols:
    indexer = StringIndexer(inputCol=c, outputCol=f"{c}_indexed")
    indexers.append(indexer)
    features.append(f"{c}_indexed")
  
  # return output columns and transformation objects
  return features, indexers

def feature_transformation_pipeline(df, numerical_cols, onehot_cols, hash_cols, output, transform_categorical=True):
  pipeline_stages = []
  # normalize numerical columns
  num_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numericalFeatures")
  normalizer = Normalizer(inputCol="numericalFeatures", outputCol="normFeatures")
  pipeline_stages.extend([num_assembler, normalizer])
  
  if transform_categorical: 
    # transform_categorical should be true if model requires vectorization of categorical feature, e.g. Linear Regression
    catFeatureCols, catStages = categorical_feature_transformations_linear(onehot_cols, hash_cols)
  else:
    # if tree based model, don't need to transform categorical features, just index
    catFeatureCols, catStages = categorical_feature_transformations_trees(onehot_cols + hash_cols)
    
  pipeline_stages.extend(catStages)
  
  # combine all feature transformations into a single feature vector
  assembler = VectorAssembler(inputCols=["normFeatures"] + catFeatureCols, outputCol="features")
  pipeline_stages.append(assembler)
  pipeline = Pipeline(stages=pipeline_stages)
  return pipeline

def full_model_pipeline():
  raise NotImplementedError


#function to convert df into consumable format for MLlib
def data_converter(df, numerical_cols, onehot_cols, hash_cols, output, transform_categorical=True, splits=[0.8,0.1,0.1], seed=0):
  pipe = feature_transformation_pipeline(df, numerical_cols, onehot_cols, hash_cols, output, transform_categorical=transform_categorical)
  
  tmp = df[numerical_cols + onehot_cols + hash_cols + [output, "FL_DATE"]]
  tmp = tmp.dropna()
  for c in onehot_cols + hash_cols:
    tmp = tmp.withColumn(c, f.col(c).cast(StringType()))
  data = split_by_date_and_region(tmp, 
                                  datecolumn="FL_DATE", 
                                  region="origin_region" if "origin_region" in tmp.columns else "origin",
                                  splits=splits)
  return data, pipe


def balance_classes(dataset, output_col, ratio=3.0, seed=2020):
  ones = dataset.where(f.col(output_col)==1).sample(True, ratio, seed=seed)
  zeroes = dataset.where(f.col(output_col)==0)
  train_final = zeroes.union(ones).orderBy(f.rand())
  return train_final

In [None]:
# hasattr method confusion matrix, precision / recall

In [None]:
def sigmoid(logit):
  return 1 / (1 + math.exp(-logit))
sigmoid_udf = f.udf(sigmoid, FloatType())
      
def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None
extract_prob_udf = udf(lambda v: ith_(v, 1), DoubleType())

def format_predictions(predictions_df, output_col='DEP_DEL15'):
  #if regression based model, transform prediction with sigmoid and create label
  if not hasattr(predictions_df, 'probability'): 
    print("regression")
    predictions_df = predictions_df \
      .withColumn("label", (f.col(output_col) >= 15).cast(FloatType())) \
      .withColumn("delay_prob", sigmoid_udf(f.col("prediction") - 14.5))
  else:
    print("classification")
    predictions_df = predictions_df \
      .withColumn("label", (f.col(output_col)).cast(FloatType())) \
      .withColumn("delay_prob", extract_prob_udf("probability"))
  return predictions_df

from collections import namedtuple
ConfusionMatrix = namedtuple('ConfusionMatrix', ['TP', 'FP', 'TN', 'FN'])

def accuracy(confusion):
  return (confusion.TP + confusion.TN) / sum(confusion)

def accuracy_of_common_class_prediction(confusion):
  pos_class_prediction_accuracy = (confusion.TP + confusion.FN) / sum(confusion)
  return max(pos_class_prediction_accuracy, 1-pos_class_prediction_accuracy)

def precision(confusion):
  try:
    return (confusion.TP) / (confusion.TP + confusion.FP)
  except ZeroDivisionError:
    return float('nan')

def recall(confusion):
  return (confusion.TP) / (confusion.TP + confusion.FN)

def f1_score(confusion):
  prec = precision(confusion)
  rec = recall(confusion)
  return 2 * (prec * rec) / (prec + rec)

def categorize_prediction(pred, true):
  # [TP, FP, TN, FN]
  confusion = [0, 0, 0, 0]
  if pred == 1 and true == 1:
    confusion[0] = 1
  elif pred == 1 and true == 0:
    confusion[1] = 1
  elif pred == 0 and true == 0:
    confusion[2] = 1
  else:
    confusion[3] = 1
  return confusion

confusion_udf = f.udf(categorize_prediction, IntegerType())

def calc_metrics_at_thresh(df, prediction='probability', outcome='label', thresh=0.5, verbose=True):
  df = df.withColumn('prediction_at_thresh', (f.col(prediction) >= thresh).cast(IntegerType()))
  
  confusion = df.select(["prediction_at_thresh", outcome]).rdd \
    .map(lambda pred_truth: categorize_prediction(pred_truth[0], pred_truth[1])) \
    .reduce(lambda x, y: [sum(z) for z in zip(x, y)])
  confusion_matrix = ConfusionMatrix(*confusion)
  print(confusion_matrix)
  
  acc = accuracy(confusion_matrix)
  prec = precision(confusion_matrix)
  rec = recall(confusion_matrix)
  f1 = f1_score(confusion_matrix)
  
  if verbose:
    print("predicting frequent class would yield {:.4f} accuaracy".format(accuracy_of_common_class_prediction(confusion_matrix)))
    print(confusion_matrix)
    print("accuracy: {:.4f}".format(acc))
    print("precision: {:.4f}".format(prec))
    print("recall: {:.4f}".format(rec))
    print("f1: {:.4f}".format(f1))
  metrics = {'confusion_matrix': confusion_matrix, 'accuracy': acc, 'precision': prec, 'recall': rec, 'f1': f1}
  return metrics

def confusion_by_thresh(df, prediction='probability', outcome='label'):
  _min, _max = df.select(f.min(f.col(prediction), f.max(f.col(prediction)))).collect()[0]
  
  confusion_matrices = df.select(prediction, outcome).rdd \
    .flatMap(lambda x: [(thresh, categorize_prediction(x[0]>=thresh, x[1])) for thresh in np.arange(_min, _max, 0.01)]) \
    .reduceByKey(lambda x, y: [sum(z) for z in zip(x, y)]) \
    .mapValues(lambda x: ConfusionMatrix(*x)) \
    .collect()
  
  confusion_df = pd.DataFrame(confusion_matrices, columns=["threshold", "confusion"])
  confusion_df['accuracy'] = confusion_df["confusion"].apply(lambda x: accuracy(x))
  confusion_df['precision'] = confusion_df["confusion"].apply(lambda x: precision(x))
  confusion_df['recall'] = confusion_df["confusion"].apply(lambda x: recall(x))
  confusion_df['f1'] = confusion_df["confusion"].apply(lambda x: f1_score(x))
  return confusion_df

def plot_pr_curve(confusion_by_thresh):
  prec_rec = confusion_by_thresh[['precision', 'recall']].dropna().sort_values('recall')
  sns.lineplot(data=prec_rec, x='recall', y='precision')
  plt.title('Precision-Recall')
  plt.show()

def plot_predictions_by_label(predictions, prediction='probability', outcome='label', fractions={0: 0.005, 1: 0.001}):
  sampled = predictions.sampleBy(outcome, fractions=fractions, seed=0).select(probability, outcome).toPandas()
  sns.distplot(sampled[prediction][sampled[outcome]==0], label="Label=0")
  sns.distplot(sampled[prediction][sampled[outcome]==1], label="Label=1")
  plt.xlabel("predictions")
  plt.legend()
  plt.show()

In [None]:
#classification metrics

In [None]:
#calculate classification metrics
metrics = BinaryClassificationMetrics(predictions.select(["delay_prob", "label"]).rdd)
print("PR AUC: {}".format(metrics.areaUnderPR))
print("ROC AUC: {}".format(metrics.areaUnderROC))

In [None]:
# rename column

In [None]:
tvs_rf_model = tvs_rf.fit(data.withColumnRenamed('DEP_DEL15', 'label'))

In [None]:
#create massive sql query function

In [None]:
def create_weather_features(weather_parsed, where_clause=""):
  weather_parsed.createOrReplaceTempView("weather_parsed")
  weatherFeatures = spark.sql("""
    SELECT 
      date_trunc('HOUR', DATE) + INTERVAL 2 hours as join_datetime,
      date,
      trim(call_sign) as call_sign,

      -- wind
      wind_parsed.speed as wind_speed, 
      wind_parsed.speed - lag(wind_parsed.speed, 1) over (partition by call_sign order by date) as wind_speed_minus_1,
      wind_parsed.speed - lag(wind_parsed.speed, 2) over (partition by call_sign order by date) as wind_speed_minus_2,
      wind_parsed.speed - max(wind_parsed.speed) over (partition by call_sign order by date rows 2 preceding) as diff_to_max_wind,

      -- visibility
      visibility_parsed.visibility as visibility, 
      visibility_parsed.visibility - lag(visibility_parsed.visibility, 1) over (partition by call_sign order by date) as visibility_minus_1,
      visibility_parsed.visibility - lag(visibility_parsed.visibility, 2) over (partition by call_sign order by date) as visibility_minus_2,
      visibility_parsed.variability as visibility_variability,

      -- sky_ceiling
      sky_ceiling_parsed.height as ceiling, 
      sky_ceiling_parsed.height - lag(sky_ceiling_parsed.height, 1) over (partition by call_sign order by date) as ceiling_minus_1,
      sky_ceiling_parsed.height - lag(sky_ceiling_parsed.height, 2) over (partition by call_sign order by date) as ceiling_minus_2,
      sky_ceiling_parsed.cavok,

      -- snow
      snow_parsed.condition as snow_condition,
      snow_parsed.depth as snow_depth, 
      snow_parsed.depth - lag(snow_parsed.depth, 1) over (partition by call_sign order by date) as snow_depth_minus_1,
      snow_parsed.depth - lag(snow_parsed.depth, 2) over (partition by call_sign order by date) as snow_depth_minus_2,

      -- precipitation
      aa1_parsed.period as precip_period,
      aa1_parsed.depth as precip_depth,
      case when aa4_parsed.depth > 0 then aa4_parsed.depth
           when aa3_parsed.depth > 0 then aa3_parsed.depth
           when aa2_parsed.depth > 0 then aa2_parsed.depth
           else aa1_parsed.depth end as max_precip_depth,
      case when aa4_parsed.depth > 0 then aa4_parsed.period
           when aa3_parsed.depth > 0 then aa3_parsed.period
           when aa2_parsed.depth > 0 then aa2_parsed.period
           else aa1_parsed.period end as max_precip_period,

      -- pressure
      pressure,
      pressure - lag(pressure, 1) over (partition by call_sign order by date) as pressure_minus_1,
      pressure - lag(pressure, 2) over (partition by call_sign order by date) as pressure_minus_2,

      temperature,
      dew_temperature,
      latitude, 
      longitude, 
      elevation

    FROM weather_parsed
    {where_clause}
    ORDER BY call_sign, date;
    """.format(where_clause=where_clause))
  return weatherFeatures