In [11]:
# install imblearn to leverage SMOTE resample
!pip install imbalanced-learn

Collecting imbalanced-learn
  Obtaining dependency information for imbalanced-learn from https://files.pythonhosted.org/packages/a3/9e/fbe60a768502af54563dcb59ca7856f5a8833b3ad5ada658922e1ab09b7f/imbalanced_learn-0.11.0-py3-none-any.whl.metadata
  Downloading imbalanced_learn-0.11.0-py3-none-any.whl.metadata (8.3 kB)
Downloading imbalanced_learn-0.11.0-py3-none-any.whl (235 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.6/235.6 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: imbalanced-learn
Successfully installed imbalanced-learn-0.11.0


In [12]:
# import some relevant libraries
from IPython.display import display
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.types as T
import pyspark.sql.functions as F

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from datetime import datetime, timedelta, date
 
sns.set()

# Spark session & context
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

# Load Data
Load the underlying dataset and do some quick EDA

In [13]:
# dataset name
## NOTE replace this with new dataset
dataset_name = "OTPW_3M" #"OTPW_3M"

In [14]:
# ingest flights dataset
# NOTEO edit this to point to new data
df_flights = spark.read.format('parquet')\
                        .option("inferSchema", True)\
                        .load(f"../data/{dataset_name}").cache()

# Group data by year and count records for each year
year_counts = df_flights.groupBy('YEAR').count()
 
# Show the count of records for each year
year_counts.show()

+----+-------+
|YEAR|  count|
+----+-------+
|2015|1401363|
+----+-------+



In [15]:
# how many delays per total set
print(f"Number of Flights Delayed in 60M: {df_flights.filter('DEP_DEL15 = 1').count()}")
print(f"Total Number of Flights in 60M: {df_flights.count()}")
print(f"Percentage of Flights Delayed in 60M: {round(df_flights.filter('DEP_DEL15 = 1').count()/df_flights.count()*100,2)}%")
 
majority_class_proba = 1-round(df_flights.filter('DEP_DEL15 = 1').count()/df_flights.count())

Number of Flights Delayed in 60M: 277302
Total Number of Flights in 60M: 1401363
Percentage of Flights Delayed in 60M: 19.79%


# Select and Clean Some Numeric Features

In [16]:
## Weather features
numerics_features = ["HourlyPrecipitation", "HourlyDryBulbTemperature", 
                     "HourlyRelativeHumidity", "HourlySeaLevelPressure", 
                     "HourlyStationPressure", "HourlyVisibility",
                     "HourlyWindSpeed", "ELEVATION"]
 
## Cast Column to Float
for c in numerics_features:
    df_flights = df_flights.withColumn(c, F.col(c).cast("Float"))
 
# filter out Null values from our dataset (12M)
join_keys = ['FL_DATE', 'OP_UNIQUE_CARRIER', 'TAIL_NUM', 'OP_CARRIER_FL_NUM', 'sched_depart_date_time_UTC', 'DEP_DEL15']
df_flights_numeric_non_null = df_flights.select(join_keys + numerics_features).na.drop().cache()
print(f"Total Number of Flights (Non Null): {df_flights_numeric_non_null.count()}")

Total Number of Flights (Non Null): 1088005


# Features Vectorization

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer


# build a vector assembler for the input features
outputCol = "features"
va = VectorAssembler(inputCols=numerics_features, outputCol=outputCol)

# setting up a pipeline
pipeline_stages = [va]
feature_engineered_pipeline = Pipeline(stages=pipeline_stages)

## apply the pipeline to the train and test set
fitted_pipeline = feature_engineered_pipeline.fit(df_flights_numeric_non_null)
df_flights_numeric_non_null_VA = fitted_pipeline.transform(df_flights_numeric_non_null)
df_flights_numeric_non_null_VA.show(10)

+----------+-----------------+--------+-----------------+--------------------------+---------+-------------------+------------------------+----------------------+----------------------+---------------------+----------------+---------------+---------+--------------------+
|   FL_DATE|OP_UNIQUE_CARRIER|TAIL_NUM|OP_CARRIER_FL_NUM|sched_depart_date_time_UTC|DEP_DEL15|HourlyPrecipitation|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlySeaLevelPressure|HourlyStationPressure|HourlyVisibility|HourlyWindSpeed|ELEVATION|            features|
+----------+-----------------+--------+-----------------+--------------------------+---------+-------------------+------------------------+----------------------+----------------------+---------------------+----------------+---------------+---------+--------------------+
|2015-01-10|               AA|  N786AA|                2|       2015-01-10 17:00:00|      0.0|               0.01|                    58.0|                  81.0|                 29.97

# SMOTE Implementation

The idea behind SMOTE is to introduce some level of randomness when creating synthetic data for the minority class. The machniery of SMOTE is performing many batches of k-nearest neighbours on the miniory class of data and by that, generate a new set of datapoint that follows the outputs neighbors. Typically SMOTE is a more robust way to upsample the data compared to traditional resampling technique. For our experiment, we modified the implementation described i [SMOTE implementation in PySpark](https://medium.com/@hwangdb/smote-implementation-in-pyspark-76ec4ffa2f1d)  to upsample our data using Spark's Vectorized UDF. However, we're still facing challenges with long runtime (> 10 hours) when applying SMOTE upsampling on a 3M dataset. We were able to drill down on the long training time problem by linking it to the large amount of data being shuffle between all cluster nodes when a paritioned window function is applied. Noted for this study, this is an area of improvement that we wanted more time to deep dive into.

In [54]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors,VectorUDT
from imblearn.over_sampling import SMOTE
from pyspark.sql import DataFrame
from functools import reduce


In [26]:
# declare features and label to be use
features = 'features'
label = 'DEP_DEL15'

# copy the dataset so that we don't accidently overwrite the original
vectorized_sdf = df_flights_numeric_non_null_VA

In [19]:
# SMOTE configurations
smote_config = {
        "k": 3,              # number of neighbors
        "multiplier": 3,     # number of synthetic batches
        "seed": 234,         # random seeds
        "bucketLength": 3   # bucket length to control the probability of features being hashed into the same bucket
}

In [43]:
# split the data by mininum and maximum
# from earlier EDA, the minority class is 1, meaning the flight is delayed
cols = [features, label]
vectorized_sdf = vectorized_sdf.select(*cols).sample(fraction=0.10)
dataInput_min = vectorized_sdf.filter(f"{label} == '1'")
dataInput_maj = vectorized_sdf.filter(f"{label} == '0'")

# LSH, bucketed random projection
# Bucket Random Projection LSH calculates
brp = BucketedRandomProjectionLSH(inputCol=features, outputCol="hashes", seed=smote_config['seed'], bucketLength=smote_config['bucketLength'])

# Applies SMOTE only on existing minority instances    
model = brp.fit(dataInput_min)
model.transform(dataInput_min)

# here distance is calculated from brp's param inputCol
self_join_w_distance = model.approxSimilarityJoin(dataInput_min, dataInput_min, float("inf"), distCol="EuclideanDistance")
self_join_w_distance.show(10)

+--------------------+--------------------+------------------+
|            datasetA|            datasetB| EuclideanDistance|
+--------------------+--------------------+------------------+
|{[0.0,-24.0,73.0,...|{1.0, [0.0,0.0,46...|48.759756944252494|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,8.0,80...|246.92416954269333|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,9.0,84...|262.96065139070686|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,10.0,4...|47.970058376160665|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,10.0,4...|45.206304874015046|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,11.0,4...| 46.63065300088498|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,11.0,6...|127.36071675040209|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,11.0,7...|200.95830414827935|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,11.0,7...|186.09608861742507|
|{[0.0,-24.0,73.0,...|{1.0, [0.0,12.0,5...|183.10233072174813|
+--------------------+--------------------+------------------+
only showing top 10 rows



In [45]:
self_join_w_distance.count()

11863562

In [44]:
# remove self-comparison (distance 0)
self_join_w_distance = self_join_w_distance.filter(self_join_w_distance.EuclideanDistance > 0)

over_original_rows = Window.partitionBy("datasetA").orderBy("EuclideanDistance")

self_similarity_df = self_join_w_distance.withColumn("r_num", F.row_number().over(over_original_rows))

self_similarity_df.show(10)

+--------------------+--------------------+------------------+-----+
|            datasetA|            datasetB| EuclideanDistance|r_num|
+--------------------+--------------------+------------------+-----+
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,16.0,4...| 65.90818300739144|    1|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,0.0,69...|  77.5459902270549|    2|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,1.0,68...| 77.57315386629878|    3|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,74.0,1...|290.52299251928054|    4|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,34.0,6...|   357.25361257248|    5|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,36.0,6...|357.38525501022826|    6|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,46.0,4...| 359.1979482089619|    7|
|{[0.0,-9.0,61.0,3...|{1.0, [0.01999999...|359.40343807344306|    8|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,49.0,3...| 360.1249158261858|    9|
|{[0.0,-9.0,61.0,3...|{1.0, [0.0,52.0,4...| 360.3580927339047|   10|
+--------------------+--------------------+------------------+-----+
only showing top 10 rows



In [55]:
self_similarity_df_selected = self_similarity_df.filter(self_similarity_df.r_num <= smote_config["k"])
 
over_original_rows_no_order = Window.partitionBy('datasetA')

# list to store batches of synthetic data
res = [dataInput_min, dataInput_maj]

# two udf for vector add and subtract, subtraction include a random factor [0,1]
subtract_vector_udf = F.udf(lambda arr: random.uniform(0, 1)*(arr[0]-arr[1]), VectorUDT())
add_vector_udf = F.udf(lambda arr: arr[0]+arr[1], VectorUDT())


# apply SMOTE iterations
for i in range(smote_config['multiplier']):
    print("generating batch %s of synthetic instances"%i)
    # logic to randomly select neighbour: pick the largest random number generated row as the neighbour
    df_random_sel = self_similarity_df_selected.withColumn("rand", F.rand()).withColumn('max_rand', F.max('rand').over(over_original_rows_no_order))\
                        .where(F.col('rand') == F.col('max_rand')).drop(*['max_rand','rand','r_num'])
    # create synthetic feature numerical part
    df_vec_diff = df_random_sel.select('*', subtract_vector_udf(F.array('datasetA.features', 'datasetB.features')).alias('vec_diff'))
    df_vec_modified = df_vec_diff.select('*', add_vector_udf(F.array('datasetA.features', 'vec_diff')).alias('features'))
    # df_vec_diff = df_random_sel.withColumn('vec_diff', subtract_vector_udf(F.col('')))
    
    # this df_vec_modified is the synthetic minority instances,
    # df_vec_modified = df_vec_modified.drop(*['datasetA','datasetB','vec_diff','EuclideanDistance'])
    df_vec_modified = df_vec_modified.select(features)\
                                    .withColumn('DEP_DEL15', F.lit(1))
    res.append(df_vec_modified)

oversampled_df = reduce(DataFrame.unionByName, res)
oversampled_df = oversampled_df.select("*").orderBy(F.rand())
oversampled_df = oversampled_df.repartition(252).cache()
oversampled_df.show(10)

generating batch 0 of synthetic instances
generating batch 1 of synthetic instances
generating batch 2 of synthetic instances


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_282/496026766.py", line 9, in <lambda>
NameError: name 'random' is not defined


In [None]:
############################## spark smote oversampling ##########################
#for categorical columns, must take its stringIndexed form (smote should be after string indexing, default by frequency)
 
def smote(vectorized_sdf, features, label, smote_config):
    '''
    contains logic to perform smote oversampling, given a spark df with 2 classes
    inputs:
    * vectorized_sdf: cat cols are already stringindexed, num cols are assembled into 'features' vector
      df target col should be 'label'
    * smote_config: config obj containing smote parameters
    output:
    * oversampled_df: spark df after smote oversampling
    '''
    cols = [features, label]
    vectorized_sdf = vectorized_sdf.select(*cols)
    dataInput_min = vectorized_sdf.filter(f"{label} == '1'")
    dataInput_maj = vectorized_sdf.filter(f"{label} == '0'")
    
    # LSH, bucketed random projection
    brp = BucketedRandomProjectionLSH(inputCol="input_features", outputCol="hashes",seed=smote_config['seed'], bucketLength=smote_config['bucketLength'])
 
    # smote only applies on existing minority instances    
    model = brp.fit(dataInput_min)
    model.transform(dataInput_min)
 
    # here distance is calculated from brp's param inputCol
    self_join_w_distance = model.approxSimilarityJoin(dataInput_min, dataInput_min, float("inf"), distCol="EuclideanDistance")
 
    # remove self-comparison (distance 0)
    self_join_w_distance = self_join_w_distance.filter(self_join_w_distance.EuclideanDistance > 0)
 
    over_original_rows = Window.partitionBy("datasetA").orderBy("EuclideanDistance")
 
    self_similarity_df = self_join_w_distance.withColumn("r_num", F.row_number().over(over_original_rows))
 
    self_similarity_df_selected = self_similarity_df.filter(self_similarity_df.r_num <= smote_config["k"])
 
    over_original_rows_no_order = Window.partitionBy('datasetA')
 
    # list to store batches of synthetic data
    res = [dataInput_min, dataInput_maj]
    
    # two udf for vector add and subtract, subtraction include a random factor [0,1]
    subtract_vector_udf = F.pandas_udf(lambda arr: random.uniform(0, 1)*(arr[0]-arr[1]), VectorUDT())
    add_vector_udf = F.pandas_udf(lambda arr: arr[0]+arr[1], VectorUDT())
 
 
    # apply SMOTE iterations
    for i in range(smote_config['multiplier']):
        print("generating batch %s of synthetic instances"%i)
        # logic to randomly select neighbour: pick the largest random number generated row as the neighbour
        df_random_sel = self_similarity_df_selected.withColumn("rand", F.rand()).withColumn('max_rand', F.max('rand').over(over_original_rows_no_order))\
                            .where(F.col('rand') == F.col('max_rand')).drop(*['max_rand','rand','r_num'])
        # create synthetic feature numerical part
        df_vec_diff = df_random_sel.select('*', subtract_vector_udf(F.array('datasetA.input_features', 'datasetB.input_features')).alias('vec_diff'))
        df_vec_modified = df_vec_diff.select('*', add_vector_udf(F.array('datasetA.input_features', 'vec_diff')).alias('input_features'))
        # df_vec_diff = df_random_sel.withColumn('vec_diff', subtract_vector_udf(F.col('')))
        
        # this df_vec_modified is the synthetic minority instances,
        # df_vec_modified = df_vec_modified.drop(*['datasetA','datasetB','vec_diff','EuclideanDistance'])
        df_vec_modified = df_vec_modified.select(features)\
                                        .withColumn('DEP_DEL15', F.lit(1))
        res.append(df_vec_modified)
    
    oversampled_df = reduce(DataFrame.unionByName, res)
    oversampled_df = oversampled_df.select("*").orderBy(F.rand())
    oversampled_df = oversampled_df.repartition(252).cache()  # evenly distributed the shuffle data
    # # union synthetic instances with original full (both minority and majority) df
    # # oversampled_df = dfunion.union(vectorized_sdf.select(dfunion.columns))
    
    return oversampled_df