1. Select relevant features
2. Encode numerical + categorical
3. Separate 2019 as test set + checkpoint
4. Class imbalance - undersampling
5. Scaling based on distributions (MinMax, Log, StandardScaler)
6. Handling outliers
7. Imputing nulls - mean/median
8. Checkpoint baseline (MinMax scaling, drop nulls, keep outliers)

## Setup

In [0]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, log1p, when

# Spark SQL API
from pyspark.sql import Window, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, concat, col

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType
from pyspark.ml.linalg import VectorUDT, SparseVector

import os
from functools import reduce
import numpy as np


In [0]:
# Configure access to the team's Azure Blob Storage container.
# Nothing is mounted in this cell: it defines the container/account names, builds the
# wasbs:// root URL, and registers a SAS token on the Spark session configuration.
blob_container  = "261project"       # The name of your container created in https://portal.azure.com
storage_account = "261teamderm"  # The name of your Storage account created in https://portal.azure.com
secret_scope = "261teamderm"           # The name of the scope created in your local computer using the Databricks CLI
secret_key = "261key"             # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  #points to the root of your team storage bucket
mids261_mount_path      = "/mnt/mids-w261" # the 261 course blob storage is mounted here.
# SAS Token: Grant the team limited access to Azure Storage resources.
# The token is fetched from the Databricks secret scope instead of being hard-coded.
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
  )

In [0]:
# Spark Context
sc = spark.sparkContext  # SparkContext that backs the active SparkSession
spark  # last expression of the cell, so the notebook echoes the session object

# Load data

In [0]:
# Show the URL of the team data directory.
# NOTE(review): this displays the path string itself, not the directory contents;
# if a file listing was intended, something like dbutils.fs.ls(...) is needed - confirm.
display(f'{team_blob_url}/data/')

'wasbs://261project@261teamderm.blob.core.windows.net/data/'

In [0]:
# Read the 5-year cross-validation folds and keep only rows that have a label
# test_df = spark.read.parquet(f"{team_blob_url}/data/5yr_test.parquet")
folds_df = (
    spark.read.parquet(f"{team_blob_url}/data/5yr_folds.parquet")
    .where(col("DEP_DEL15").isNotNull())
)

In [0]:
# Count rows per label so the majority class can be down-sampled to the minority size
pre_balance_class_counts = folds_df.groupBy("DEP_DEL15").count().collect()
pre_balance_class_dct = dict(
    (row["DEP_DEL15"], row["count"]) for row in pre_balance_class_counts
)

# Sampling fraction for the majority class: minority count / majority count
fraction = pre_balance_class_dct[1] / pre_balance_class_dct[0]

# Split by label (1 = minority class, 0 = majority class)
majority_class_df = folds_df.where(col("DEP_DEL15") == 0)
minority_class_df = folds_df.where(col("DEP_DEL15") == 1)

# Down-sample the majority class without replacement (fixed seed) and recombine.
# The sampled size is approximate, so the two classes end up close but not equal.
undersampled_majority_df = majority_class_df.sample(
    withReplacement=False, fraction=fraction, seed=42
)
balanced_df = minority_class_df.union(undersampled_majority_df)

# Recount per label to confirm the new balance
post_balance_class_counts = balanced_df.groupBy("DEP_DEL15").count().collect()

# Show results
display('Pre-balanced:', pre_balance_class_counts)
display('Post-balanced:', post_balance_class_counts)

'Pre-balanced:'

[Row(DEP_DEL15=0.0, count=20309828), Row(DEP_DEL15=1.0, count=4517628)]

'Post-balanced:'

[Row(DEP_DEL15=1.0, count=4517628), Row(DEP_DEL15=0.0, count=4519901)]

In [0]:
# Row counts per fold and split (train/val) after balancing, ordered by fold then split
display(balanced_df.groupBy("foldCol", "split").count().orderBy("foldCol", "split"))



foldCol,split,count
0,train,836453
0,val,205204
1,train,924106
1,val,229004
2,train,884833
2,val,236118
3,train,909500
3,val,226375
4,train,920293
4,val,232081


# Additional pre-processing

## Change specWeather to categorical (0, 1)

In [0]:
# Binarize specWeather in place: rows where the value is > 0 become 1, all other rows become 0
balanced_df = balanced_df.withColumn("specWeather", when(col("specWeather") > 0, 1).otherwise(0))

In [0]:
# Sanity check: list the distinct specWeather values after the conversion
display(balanced_df.select(col("specWeather")).distinct())

specWeather
1
0


## Set columns

In [0]:
# Columns to ignore (not used as features)
ignore_cols = ['ORIGIN', 'DEST', 'FL_DATE', 'CANCELLED', 'YEAR']

# Categorical features that still need one-hot encoding
need_ohe_cols = [
    'QUARTER',
    'MONTH',
    'DAY_OF_WEEK',
    'OP_UNIQUE_CARRIER',
    'DEP_TIME_BLK',
    'ARR_TIME_BLK',
    'DAY_OF_MONTH',
    'HIST_ARR_FLT_NUM',
    'HIST_DEP_FLT_NUM',
]

# Indicator features that are already 0/1 encoded
already_ohe_cols = [
    'ceiling_height_is_below_10000',
    'ceiling_height_is_between_10000_20000',
    'ceiling_height_is_above_20000',
    'isHoliday',
    'specWeather',
]

# Numerical features grouped by the scaler intended for them
standard_scalar_cols = ['wind_direction', 'temperature', 'sea_level_pressure']  # wind, temp and pressure
min_max_cols = [
    'CRS_ELAPSED_TIME', 'DISTANCE',  # time and distance
    'visibility', 'dew_point', 'wind_speed', 'gust_speed',  # vis, dew, weather and wind
    'origin_yr_flights', 'dest_yr_flights',  # origin/dest
    'HIST_ARR_DELAY', 'HIST_DEP_DELAY',  # lag columns for previous flights
]
unscaled_cols = ['pagerank']  # PageRank (which is null for yr1)

# Consolidated views of the groups above
categorical_cols = [*need_ohe_cols, *already_ohe_cols]
numerical_cols = [*standard_scalar_cols, *min_max_cols, *unscaled_cols]

# Remove the extra time columns from the one-hot list. This runs after the
# consolidation, so categorical_cols still contains those time columns.
need_ohe_cols = [
    name
    for name in need_ohe_cols
    if name not in ('QUARTER', 'MONTH', 'DAY_OF_WEEK', 'DAY_OF_MONTH')
]

label_col = 'DEP_DEL15'


## Drop columns, duplicates and checkpoint

In [0]:
# Drop the ignored columns, then remove exact duplicate rows
filtered_df = balanced_df.drop(*ignore_cols).dropDuplicates()

In [0]:
# Checkpoint the balanced, de-duplicated data; mode='overwrite' replaces any existing output at this path
filtered_df.write.parquet(f'{team_blob_url}/data/5yr_preprocessed.parquet', mode='overwrite')

# Remove outliers and checkpoint

In [0]:
# Remove outliers with the 1.5 * IQR rule on every numerical column.
#
# Q1/Q3 for all columns are computed in a single pass over the frame as it is
# BEFORE any outlier filtering. Recomputing them inside the loop on the
# progressively filtered frame made each column's bounds depend on the order of
# numerical_cols and ran a separate exact-quantile job per column.
all_quantiles = filtered_df.approxQuantile(numerical_cols, [0.25, 0.75], 0.0)

# Maps column name -> [Q1, Q3] as returned by approxQuantile (the quartiles,
# not the derived lower/upper bounds)
outlier_bounds = {}
for col_name, quantiles in zip(numerical_cols, all_quantiles):
    outlier_bounds[col_name] = quantiles

    # Skip columns for which both quartiles were not returned
    if len(quantiles) != 2:
        continue

    q1, q3 = quantiles
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    # Keep rows inside [lower_bound, upper_bound]; the filters are lazy and are
    # evaluated together when the frame is next materialized.
    # NOTE(review): a comparison against null is not true, so rows with a null in
    # this column are presumably removed here as well - confirm that is intended.
    filtered_df = filtered_df.filter((col(col_name) >= lower_bound) & (col(col_name) <= upper_bound))

In [0]:
# Checkpoint the outlier-filtered data; mode='overwrite' replaces any existing output at this path.
# NOTE(review): the file name starts with "1yr_" although this frame derives from the
# 5yr_folds input and the sibling checkpoints use a "5yr_" prefix - possibly a copy-paste
# leftover that would overwrite a 1-year checkpoint of the same name; confirm the intended path.
filtered_df.write.parquet(f'{team_blob_url}/data/1yr_preprocessed_no.parquet', mode='overwrite')

# Preprocess Test Sets

In [0]:
# Read the 5-year held-out test set from the team blob container
test_5yr = spark.read.parquet(f'{team_blob_url}/data/5yr_test.parquet')

In [0]:
# Clean the test set: keep rows that have a DEP_DEL15 label, binarize specWeather
# (> 0 becomes 1, otherwise 0) and drop the unused columns.
# No class balancing, de-duplication or outlier removal is applied here.
test_5yr = (
    test_5yr
    .where(col("DEP_DEL15").isNotNull())
    .withColumn("specWeather", F.when(col("specWeather") > 0, 1).otherwise(0))
    .drop('ORIGIN', 'DEST', 'FL_DATE', 'CANCELLED', 'YEAR')
)

In [0]:
# Persist the preprocessed test set; mode='overwrite' replaces any existing output at this path
test_5yr.write.parquet(f'{team_blob_url}/data/5yr_test_preprocess', mode='overwrite')