## Random Forest Classifier

#### Standard Imports

In [0]:
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer, Imputer
from pyspark.sql.functions import log1p, unix_timestamp, to_date, col, when, lit, row_number, hour, dayofweek, minute, count
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from imblearn.over_sampling import SMOTE
from functools import reduce
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, f1_score, precision_score, recall_score
import matplotlib.pyplot as plt
import warnings
import math
from pyspark.ml.feature import VectorAssembler, StandardScaler
import mlflow
import mlflow.spark
from itertools import product
warnings.filterwarnings("ignore", category=FutureWarning) #ignore

#### Setup Path

In [0]:
def data_set(time):
    if time == 3:
        return "_3m"
    elif time == 6:
        return "_6m"
    elif time == 12:
        return "_1y"
    elif time == 'all':
        return ""
    else:
        raise ValueError("time must be 3, 6, 12, or 'all'")

### Paths for Training Data

In [0]:
dbfs_path = "dbfs:/student-groups/Group_02_01"
splits_path = f"{dbfs_path}/fasw"
folds_input_path = f"{splits_path}/processed_rolling_windows"

window_dirs = dbutils.fs.ls(folds_input_path)
train_windows = [d.name for d in window_dirs if d.name.startswith("window_") and d.name.endswith("_train/")]
N_SPLITS = len(train_windows)

print(f"Loading folds from: {folds_input_path}")
print(f"Detected {N_SPLITS} rolling windows")

Loading folds from: dbfs:/student-groups/Group_02_01/fasw/processed_rolling_windows
Detected 2 rolling windows


In [0]:
sample_window_path = f"{folds_input_path}/window_2_train"
df_sample = spark.read.parquet(sample_window_path)

null_counts = (
    df_sample.select([
        F.sum(F.col(c).isNull().cast("int")).alias(c)
        for c in df_sample.columns
    ])
    .collect()[0]
    .asDict()
)

total_rows = df_sample.count()

print(f"{'Column':<50} {'Type':<20} {'Null %':>10}")
print("-" * 82)

for field in df_sample.schema.fields:
    col_name = field.name
    dtype = str(field.dataType)
    null_count = null_counts.get(col_name, 0)
    null_pct = (null_count / total_rows) * 100 if total_rows > 0 else 0
    print(f"{col_name:<50} {dtype:<20} {null_pct:>9.2f}%")

Column                                             Type                     Null %
----------------------------------------------------------------------------------
YEAR                                               IntegerType()             0.00%
QUARTER                                            IntegerType()             0.00%
MONTH                                              IntegerType()             0.00%
DAY_OF_MONTH                                       IntegerType()             0.00%
DAY_OF_WEEK                                        IntegerType()             0.00%
IS_US_HOLIDAY                                      DoubleType()              0.00%
OP_UNIQUE_CARRIER                                  StringType()              0.00%
TAIL_NUM                                           StringType()              0.00%
ORIGIN                                             StringType()              0.00%
ORIGIN_STATE_ABR                                   StringType()              0.00%
DEST

In [0]:
window_col_dict = {}
window_col_list = {}

for w in train_windows:
    path = f"{folds_input_path}/{w}"
    df = spark.read.parquet(path)
    
    # --- Column info ---
    cols = set(df.columns)
    col_list = df.columns
    window_col_dict[w] = cols
    window_col_list[w] = col_list
    print(f"{w}: {len(cols)} columns")
    print(f"  Full column list: {col_list[:10]}... (showing first 10)")
    
    # --- Class imbalance check ---
    label_counts = df.groupBy("ARR_DEL15").count().collect()
    counts_dict = {row['ARR_DEL15']: row['count'] for row in label_counts}
    num_pos = counts_dict.get(1, 0)
    num_neg = counts_dict.get(0, 0)
    total = num_pos + num_neg
    pos_pct = num_pos / total * 100 if total > 0 else 0
    neg_pct = num_neg / total * 100 if total > 0 else 0
    print(f"  Class distribution -> 1: {num_pos} ({pos_pct:.2f}%), 0: {num_neg} ({neg_pct:.2f}%)\n")

print("\n" + "="*80)

# Columns that appear in ALL windows
common_cols = set.intersection(*window_col_dict.values())
print(f"Columns present in ALL windows ({len(common_cols)}):")
print(sorted(list(common_cols)))

print("\n" + "="*80)

# Columns that differ between windows
all_cols = set.union(*window_col_dict.values())
print(f"Columns found in SOME but not all windows ({len(all_cols - common_cols)}):")
for col in sorted(list(all_cols - common_cols)):
    windows_with_col = [w for w, cols in window_col_dict.items() if col in cols]
    print(f"  {col}: present in {len(windows_with_col)}/{len(train_windows)} windows - {windows_with_col}")

print("\n" + "="*80)

# Detailed per-window missing columns
print("Per-window missing columns:\n")
for w, cols in window_col_dict.items():
    missing = all_cols - cols
    if missing:
        print(f"{w} is MISSING {len(missing)} columns:")
        print(sorted(list(missing)))
    else:
        print(f"{w} has all columns.")
    print("-"*50)

print(f"Total unique columns across all windows: {len(all_cols)}")
print(f"Common columns in ALL windows: {len(common_cols)}")
print(f"Inconsistent columns: {len(all_cols - common_cols)}")


window_1_train/: 101 columns
  Full column list: ['YEAR', 'QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'IS_US_HOLIDAY', 'OP_UNIQUE_CARRIER', 'TAIL_NUM', 'ORIGIN', 'ORIGIN_STATE_ABR']... (showing first 10)
  Class distribution -> 1: 2027479 (49.98%), 0: 2029369 (50.02%)

window_2_train/: 101 columns
  Full column list: ['YEAR', 'QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'IS_US_HOLIDAY', 'OP_UNIQUE_CARRIER', 'TAIL_NUM', 'ORIGIN', 'ORIGIN_STATE_ABR']... (showing first 10)
  Class distribution -> 1: 1993288 (49.98%), 0: 1995228 (50.02%)


Columns present in ALL windows (101):
['ARR_DEL15', 'CARRIER_DELAY', 'CRS_ARR_TIME_BLOCK', 'CRS_ARR_TIME_BLOCK_idx', 'CRS_ARR_TIME_BLOCK_ohe', 'CRS_DEP_DATETIME_UTC', 'CRS_DEP_TIME_BLOCK', 'CRS_DEP_TIME_BLOCK_idx', 'CRS_DEP_TIME_BLOCK_ohe', 'CRS_ELAPSED_TIME', 'CRS_ELAPSED_TIME_log', 'DAY_OF_MONTH', 'DAY_OF_MONTH_idx', 'DAY_OF_MONTH_ohe', 'DAY_OF_WEEK', 'DAY_OF_WEEK_idx', 'DAY_OF_WEEK_ohe', 'DEST', 'DEST_ELEVATION_FT', 'DEST_ELEVATION_FT_log

### Path for Testing Data

In [0]:
blind_test_set_path = f"dbfs:/student-groups/Group_02_01/fasw/processed_train_test/test"
blind_train_set_path = f"dbfs:/student-groups/Group_02_01/fasw/processed_train_test/train"

print(f"Blind Test set path: {blind_test_set_path}")
print(f"Blind Train set path: {blind_train_set_path}")

Blind Test set path: dbfs:/student-groups/Group_02_01/fasw/processed_train_test/test
Blind Train set path: dbfs:/student-groups/Group_02_01/fasw/processed_train_test/train


In [0]:
def class_imbalance_report(path, label_col="ARR_DEL15"):
    df = spark.read.parquet(path)
    total = df.count()
    counts = df.groupBy(label_col).count().toPandas().set_index(label_col)['count'].to_dict()
    num_pos = counts.get(1, 0)
    num_neg = counts.get(0, 0)
    pos_pct = num_pos / total * 100 if total > 0 else 0
    neg_pct = num_neg / total * 100 if total > 0 else 0
    print(f"Path: {path}")
    print(f"  Total rows: {total:,}")
    print(f"  {label_col}=1: {num_pos:,} ({pos_pct:.2f}%)")
    print(f"  {label_col}=0: {num_neg:,} ({neg_pct:.2f}%)")
    print("-" * 60)

class_imbalance_report(blind_train_set_path)
class_imbalance_report(blind_test_set_path)

Path: dbfs:/student-groups/Group_02_01/fasw/processed_train_test/train
  Total rows: 23,873,225
  ARR_DEL15=1: 4,405,402 (18.45%)
  ARR_DEL15=0: 19,467,823 (81.55%)
------------------------------------------------------------
Path: dbfs:/student-groups/Group_02_01/fasw/processed_train_test/test
  Total rows: 7,271,711
  ARR_DEL15=1: 1,387,904 (19.09%)
  ARR_DEL15=0: 5,883,807 (80.91%)
------------------------------------------------------------


In [0]:
def downsample(train_df, target_ratio=1.0, verbose=False):
    '''Downsamples train_df to balance classes'''
    delay_count = train_df.filter(F.col("ARR_DEL15") == 1).count()
    non_delay_count = train_df.filter(F.col("ARR_DEL15") == 0).count()
    
    keep_percent = (delay_count / non_delay_count) / target_ratio

    if keep_percent >= 1.0:
        print("Warning: Target ratio is already balanced or majority is smaller. No sampling applied.")
        return train_df
    
    train_delay = train_df.filter(F.col('ARR_DEL15') == 1)
    train_non_delay = train_df.filter(F.col('ARR_DEL15') == 0).sample(withReplacement=False, fraction=keep_percent, seed=42)
    train_downsampled = train_delay.union(train_non_delay)
    return train_downsampled

In [0]:
df_train_master = spark.read.parquet(blind_train_set_path)
df_train_master = downsample(df_train_master, target_ratio=1.0)

In [0]:
train_master_class_balance_check = df_train_master.groupBy("ARR_DEL15").count().orderBy("ARR_DEL15")
display(train_master_class_balance_check)

ARR_DEL15,count
0,4405998
1,4405402


In [0]:
df_test = spark.read.parquet(blind_test_set_path)
null_counts = (
    df_test.select([
        F.sum(F.col(c).isNull().cast("int")).alias(c)
        for c in df_test.columns
    ])
    .collect()[0]
    .asDict()
)

total_rows = df_test.count()

print(f"Test Set Analysis: {blind_test_set_path}")
print(f"Total rows: {total_rows:,}")
print(f"Total columns: {len(df_test.columns)}")
print()
print(f"{'Column':<50} {'Type':<20} {'Null %':>10}")
print("-" * 82)

for field in df_test.schema.fields:
    col_name = field.name
    dtype = str(field.dataType)
    null_count = null_counts.get(col_name, 0)
    null_pct = (null_count / total_rows) * 100 if total_rows > 0 else 0
    print(f"{col_name:<50} {dtype:<20} {null_pct:>9.2f}%")

Test Set Analysis: dbfs:/student-groups/Group_02_01/fasw/processed_train_test/test
Total rows: 7,271,711
Total columns: 101

Column                                             Type                     Null %
----------------------------------------------------------------------------------
YEAR                                               IntegerType()             0.00%
QUARTER                                            IntegerType()             0.00%
MONTH                                              IntegerType()             0.00%
DAY_OF_MONTH                                       IntegerType()             0.00%
DAY_OF_WEEK                                        IntegerType()             0.00%
IS_US_HOLIDAY                                      DoubleType()              0.00%
OP_UNIQUE_CARRIER                                  StringType()              0.00%
TAIL_NUM                                           StringType()              0.00%
ORIGIN                                       

In [0]:
test_cols = set(df_test.columns)
print(f"{'Columns in TEST data':<50} {len(test_cols):>10}")
print(f"{'Columns common to ALL training windows':<50} {len(common_cols):>10}")
print(f"{'Columns in ANY training window':<50} {len(all_cols):>10}")

# Compare test vs common training columns
common_test_train = test_cols & common_cols
only_in_test = test_cols - common_cols
only_in_train = common_cols - test_cols

print(f"Alighment Check Test & Training Data")
print(f"{'Columns in BOTH test and ALL train windows':<50} {len(common_test_train):>10}")
print(f"{'Columns ONLY in test (missing from ALL train)':<50} {len(only_in_test):>10}")
print(f"{'Columns ONLY in ALL train (missing from test)':<50} {len(only_in_train):>10}")

if only_in_train:
    print(f"\nCOLUMNS IN ALL TRAINING WINDOWS BUT MISSING FROM TEST ({len(only_in_train)}):")
    for col in sorted(only_in_train):
        print(f"  - {col}")

if only_in_test:
    print(f"\nCOLUMNS IN TEST BUT MISSING FROM ALL TRAINING WINDOWS ({len(only_in_test)}):")
    for col in sorted(only_in_test):
        print(f"  - {col}")

Columns in TEST data                                      101
Columns common to ALL training windows                    101
Columns in ANY training window                            101
Alighment Check Test & Training Data
Columns in BOTH test and ALL train windows                101
Columns ONLY in test (missing from ALL train)               0
Columns ONLY in ALL train (missing from test)               0


#### Summary of Nulls/Naan Values on all columns

In [0]:
leakage_cols = ['CARRIER_DELAY', 'LATE_AIRCRAFT_DELAY', 'NAS_DELAY', 'SECURITY_DELAY']
vector_cols_to_remove = ['num_vector', 'ohe_vector', 'log_scaled_features', 'log_unscaled_features', 'scaled_num_vector', 'full_unscaled_features']
ohe_cols = [c for c in df_test.columns if c.endswith('_ohe')]
log_cols = [c for c in df_test.columns if c.endswith('_log')]
metadata_cols = ['TAIL_NUM', 'CRS_DEP_DATETIME_UTC']
cat_string_cols = [
    'OP_UNIQUE_CARRIER', 'ORIGIN', 'DEST', 'ORIGIN_STATE_ABR', 'DEST_STATE_ABR',
    'ORIGIN_SIZE', 'DEST_SIZE', 'HourlyWindCardinalDirection',
    'CRS_DEP_TIME_BLOCK', 'CRS_ARR_TIME_BLOCK'
]
temporal_numeric_cols = ['YEAR']
cols_to_remove = leakage_cols + vector_cols_to_remove + ohe_cols + log_cols + metadata_cols + cat_string_cols + temporal_numeric_cols
cols_to_remove = [c for c in cols_to_remove if c in df_test.columns]
df_test_final = df_test.drop(*cols_to_remove)
feature_cols = [c for c in df_test_final.columns if c != 'ARR_DEL15']

print("\nREMAINING COLUMNS:")
for c in df_test_final.columns:
    print("  -", c)


REMAINING COLUMNS:
  - QUARTER
  - MONTH
  - DAY_OF_MONTH
  - DAY_OF_WEEK
  - IS_US_HOLIDAY
  - ARR_DEL15
  - CRS_ELAPSED_TIME
  - DISTANCE
  - ORIGIN_LAT
  - ORIGIN_LONG
  - ORIGIN_ELEVATION_FT
  - DEST_LAT
  - DEST_LON
  - DEST_ELEVATION_FT
  - overall_cloud_frac_0_1
  - lowest_cloud_ft
  - highest_cloud_ft
  - has_few
  - has_sct
  - has_bkn
  - has_ovc
  - HourlyAltimeterSetting
  - HourlyWindGustSpeed
  - HourlyWindSpeed
  - light
  - heavy
  - thunderstorm
  - rain_or_drizzle
  - freezing_conditions
  - snow
  - hail_or_ice
  - reduced_visibility
  - spatial_effects
  - unknown_precip
  - origin_pagerank
  - dest_pagerank
  - origin_out_degree
  - dest_in_degree
  - prev_flight_arr_delay_clean
  - actual_to_crs_time_to_next_flight_diff_mins_clean
  - crs_time_to_next_flight_diff_mins
  - OP_UNIQUE_CARRIER_idx
  - ORIGIN_idx
  - ORIGIN_STATE_ABR_idx
  - DEST_idx
  - DEST_STATE_ABR_idx
  - ORIGIN_SIZE_idx
  - DEST_SIZE_idx
  - HourlyWindCardinalDirection_idx
  - QUARTER_idx
  - MO

In [0]:
available_cols = set(df_test_final.columns)

numeric_cols_to_use = [
    'DISTANCE', 
    'ORIGIN_ELEVATION_FT', 
    'DEST_ELEVATION_FT', 
    'overall_cloud_frac_0_1', 
    'lowest_cloud_ft', 
    'highest_cloud_ft', 
    # 'has_few',                  # 0.0007% - REMOVED
    # 'has_sct',                  # 0.0006% - REMOVED
    # 'has_bkn',                  # 0.0008% - REMOVED
    'has_ovc', 
    'HourlyAltimeterSetting', 
    'HourlyWindGustSpeed', 
    'HourlyWindSpeed', 
    'light', 
    # 'heavy',                    # 0.0001% - REMOVED
    # 'thunderstorm',             # 0.0003% - REMOVED
    'rain_or_drizzle', 
    # 'freezing_conditions',      # 0.0005% - REMOVED
    'snow', 
    # 'hail_or_ice',              # 0.0000% - REMOVED
    'reduced_visibility', 
    # 'spatial_effects',          # 0.0000% - REMOVED
    # 'unknown_precip',           # 0.0000% - REMOVED
    'ORIGIN_LAT', 
    'ORIGIN_LONG', 
    'DEST_LAT', 
    'DEST_LON', 
    'MONTH', 
    'DAY_OF_MONTH', 
    'DAY_OF_WEEK', 
    'IS_US_HOLIDAY', 
    'origin_pagerank', 
    'dest_pagerank', 
    'prev_flight_arr_delay_clean', 
    'actual_to_crs_time_to_next_flight_diff_mins_clean'
]

cat_cols_to_use = [
    'OP_UNIQUE_CARRIER_idx', 
    'ORIGIN_idx', 
    'DEST_idx', 
    'ORIGIN_STATE_ABR_idx', 
    'DEST_STATE_ABR_idx', 
    'ORIGIN_SIZE_idx', 
    # 'DEST_SIZE_idx',            # 0.0005% - REMOVED
    'HourlyWindCardinalDirection_idx', 
    'CRS_DEP_TIME_BLOCK_idx', 
    'CRS_ARR_TIME_BLOCK_idx'
]

all_model_features = numeric_cols_to_use + cat_cols_to_use


print(f"FEATURE DEFINITIONS FOR MODEL")
print(f"Numeric columns: {len(numeric_cols_to_use)}")
print(f"Indexed categorical columns: {len(cat_cols_to_use)}")
print(f"Total model features: {len(all_model_features)}")

missing_from_test = [c for c in all_model_features if c not in available_cols]
present_in_test = [c for c in all_model_features if c in available_cols]

print(f"\nPresent in test: {len(present_in_test)}/{len(all_model_features)}")
print(f"Missing from test: {len(missing_from_test)}")

if missing_from_test:
    print(f"\n⚠️ Missing features:")
    for col in missing_from_test:
        print(f"  - {col}")

FEATURE DEFINITIONS FOR MODEL
Numeric columns: 26
Indexed categorical columns: 9
Total model features: 35

Present in test: 35/35
Missing from test: 0


In [0]:
%skip
print("="*80)
print("STEP 1: CORRELATION ANALYSIS - FIND REDUNDANT FEATURES")
print("="*80)

# Select only numeric features for correlation
numeric_features_present = [col for col in numeric_cols_to_use if col in available_cols]

# Convert to Pandas for correlation analysis (sample if dataset is large)
sample_size = min(100000, df_train_master.count())
df_sample = df_train_master.select(numeric_features_present + ['ARR_DEL15']).sample(
    fraction=sample_size/df_train_master.count(), 
    seed=42
).toPandas()

# Correlation with target
target_corr = df_sample[numeric_features_present].corrwith(df_sample['ARR_DEL15']).abs().sort_values(ascending=False)
print(target_corr)

# Find highly correlated feature pairs (redundant features)
corr_matrix = df_sample[numeric_features_present].corr().abs()
upper_triangle = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))

redundant_pairs = []
for column in upper_triangle.columns:
    high_corr = upper_triangle[column][upper_triangle[column] > 0.85]  # 0.85+ is highly redundant
    if len(high_corr) > 0:
        for idx in high_corr.index:
            redundant_pairs.append((column, idx, high_corr[idx]))

print(f"\nHighly correlated feature pairs (>0.85) - consider removing one from each pair:")
if redundant_pairs:
    for feat1, feat2, corr_val in redundant_pairs:
        print(f"  {feat1} <-> {feat2}: {corr_val:.3f}")
else:
    print("None")

In [0]:
%skip
print("ACTUAL FEATURE VARIABILITY:")
print("\n✅ GOOD VARIABILITY (Keep these):")
good_features = [
    "DISTANCE: 1,574 distinct values - GOOD",
    "prev_flight_arr_delay_clean: 1,446 distinct values - GOOD", 
    "actual_to_crs_time_to_next_flight_diff_mins_clean: 21,339 distinct values - EXCELLENT",
    "HourlyAltimeterSetting: 394 distinct values - GOOD",
    "ORIGIN/DEST coordinates: 354-355 distinct values - GOOD",
    "origin/dest_pagerank: 354-355 distinct values - GOOD",
    "ORIGIN/DEST_ELEVATION: 308 distinct values - ACCEPTABLE",
    "Cloud heights: 117-135 distinct values - ACCEPTABLE",
]
for feat in good_features:
    print(f"  • {feat}")

print("\n⚠️  LIMITED VARIABILITY (Consider consolidating):")
limited_features = [
    "HourlyWindSpeed: 58 distinct values - ACCEPTABLE but limited",
    "HourlyWindGustSpeed: 59 distinct values - ACCEPTABLE but limited",
    "DAY_OF_MONTH: 31 distinct values - ACCEPTABLE",
    "MONTH: 12 distinct values - ACCEPTABLE",
    "DAY_OF_WEEK: 7 distinct values - ACCEPTABLE",
    "overall_cloud_frac_0_1: 5 distinct values - LIMITED (but likely important)",
]
for feat in limited_features:
    print(f"  • {feat}")

print("\n❌ BINARY FEATURES (2 distinct values - 0/1 flags):")
binary_features = [
    'has_few', 'has_sct', 'has_bkn', 'has_ovc',
    'light', 'heavy', 'thunderstorm', 'rain_or_drizzle',
    'freezing_conditions', 'snow', 'hail_or_ice',
    'reduced_visibility', 'spatial_effects', 'unknown_precip',
    'IS_US_HOLIDAY'
]
print("These are binary indicators - NOT near-constant, just binary by design!")
print(f"  {len(binary_features)} binary features")

# The real question for binary features: what's the class balance?
print("\n" + "="*80)
print("BINARY FEATURE CLASS BALANCE ANALYSIS")
print("="*80)
print("Checking if any binary features are extremely imbalanced...\n")

imbalanced_features = []
for col in binary_features:
    if col in df_train_master.columns:
        # Get value counts
        counts = df_train_master.groupBy(col).count().collect()
        if len(counts) == 2:
            count_0 = [r['count'] for r in counts if r[col] == 0][0]
            count_1 = [r['count'] for r in counts if r[col] == 1][0]
            total = count_0 + count_1
            
            # Calculate minority class percentage
            minority_pct = min(count_0, count_1) / total * 100
            
            # Flag if minority class is < 1%
            if minority_pct < 1.0:
                imbalanced_features.append({
                    'feature': col,
                    'minority_pct': minority_pct,
                    'count_0': count_0,
                    'count_1': count_1
                })
                print(f"⚠️  {col}: {minority_pct:.3f}% minority class")
            elif minority_pct < 5.0:
                print(f"   {col}: {minority_pct:.2f}% minority class (somewhat rare)")

if imbalanced_features:
    print(f"\n🎯 HIGHLY IMBALANCED FEATURES (< 1% minority class): {len(imbalanced_features)}")
    print("These might not add much predictive power:")
    for feat in imbalanced_features:
        print(f"  • {feat['feature']}: {feat['minority_pct']:.3f}%")
else:
    print("\n✅ No extremely imbalanced binary features found")

## Cardinality Analysis for Categorical Columns

In [0]:
max_bins = 400

cols_to_check = cat_cols_to_use.copy()

print(f"Checking cardinality for {len(cols_to_check)} indexed categorical columns")

print(f"\nTest Data Cardinality")
test_cardinality = []
for col in cols_to_check:
    if col in df_test_final.columns:
        distinct_count = df_test_final.select(F.countDistinct(col)).first()[0]
        test_cardinality.append((col, distinct_count))
    else:
        print(f"Column not in test data: {col}")

print(f"\n{'Column':<40} {'Unique Values':>15} {'Exceeds maxBins':>20}")
print("-" * 77)
for col, count in sorted(test_cardinality, key=lambda x: x[1], reverse=True):
    exceeds = "⚠️ YES" if count > max_bins else "✓ OK"
    print(f"{col:<40} {count:>15,} {exceeds:>20}")

print(f"\nTraining Data Cardinality")
train_cardinality = []
for col in cols_to_check:
    if col in df_sample.columns:
        distinct_count = df_sample.select(F.countDistinct(col)).first()[0]
        train_cardinality.append((col, distinct_count))
    else:
        print(f"Column not in training data: {col}")

print(f"\n{'Column':<40} {'Unique Values':>15} {'Exceeds maxBins':>20}")
print("-" * 77)
for col, count in sorted(train_cardinality, key=lambda x: x[1], reverse=True):
    exceeds = "⚠️ YES" if count > max_bins else "✓ OK"
    print(f"{col:<40} {count:>15,} {exceeds:>20}")

Checking cardinality for 9 indexed categorical columns

Test Data Cardinality

Column                                     Unique Values      Exceeds maxBins
-----------------------------------------------------------------------------
ORIGIN_idx                                           347                 ✓ OK
DEST_idx                                             347                 ✓ OK
ORIGIN_STATE_ABR_idx                                  49                 ✓ OK
DEST_STATE_ABR_idx                                    49                 ✓ OK
CRS_ARR_TIME_BLOCK_idx                                25                 ✓ OK
CRS_DEP_TIME_BLOCK_idx                                24                 ✓ OK
OP_UNIQUE_CARRIER_idx                                 17                 ✓ OK
HourlyWindCardinalDirection_idx                       10                 ✓ OK
ORIGIN_SIZE_idx                                        3                 ✓ OK

Training Data Cardinality

Column                             

## Features Selection

In [0]:
print(f"\nUsing {len(numeric_cols_to_use)} numeric columns:")
print(numeric_cols_to_use)
print(f"\nUsing {len(cat_cols_to_use)} categorical columns:")
print(cat_cols_to_use)  


Using 26 numeric columns:
['DISTANCE', 'ORIGIN_ELEVATION_FT', 'DEST_ELEVATION_FT', 'overall_cloud_frac_0_1', 'lowest_cloud_ft', 'highest_cloud_ft', 'has_ovc', 'HourlyAltimeterSetting', 'HourlyWindGustSpeed', 'HourlyWindSpeed', 'light', 'rain_or_drizzle', 'snow', 'reduced_visibility', 'ORIGIN_LAT', 'ORIGIN_LONG', 'DEST_LAT', 'DEST_LON', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'IS_US_HOLIDAY', 'origin_pagerank', 'dest_pagerank', 'prev_flight_arr_delay_clean', 'actual_to_crs_time_to_next_flight_diff_mins_clean']

Using 9 categorical columns:
['OP_UNIQUE_CARRIER_idx', 'ORIGIN_idx', 'DEST_idx', 'ORIGIN_STATE_ABR_idx', 'DEST_STATE_ABR_idx', 'ORIGIN_SIZE_idx', 'HourlyWindCardinalDirection_idx', 'CRS_DEP_TIME_BLOCK_idx', 'CRS_ARR_TIME_BLOCK_idx']


#### Function for Model Building

In [0]:
def plot_pr(preds, title="Precision–Recall Curve"):
    # get (score, label)
    rows = preds.select("probability", "ARR_DEL15").collect()
    data = [(float(r["probability"][1]), float(r["ARR_DEL15"])) for r in rows]

    # sort descending by score
    data.sort(key=lambda x: -x[0])

    tp = 0
    fp = 0
    positives = sum(y for _, y in data)

    recalls = []
    precisions = []

    for score, label in data:
        if label == 1:
            tp += 1
        else:
            fp += 1

        precision = tp / (tp + fp)
        recall    = tp / positives

        precisions.append(precision)
        recalls.append(recall)

    plt.figure(figsize=(7, 5))
    plt.plot(recalls, precisions)
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title(title)
    plt.grid()
    plt.show()


In [0]:
def cv_eval(preds):
    """
    Input: preds = Spark DF with prediction, ARR_DEL15, probability
    Output: F2, PR-AUC, Precision, Recall, TP, FP, FN, TN
    """
    rdd_preds_m = preds.select(
        ['prediction', 'ARR_DEL15']
    ).rdd.map(lambda row: (float(row['prediction']), float(row['ARR_DEL15'])))

    rdd_preds_b = preds.select(
        ['probability', 'ARR_DEL15']
    ).rdd.map(lambda row: (float(row['probability'][1]), float(row['ARR_DEL15'])))

    metrics_m = MulticlassMetrics(rdd_preds_m)
    metrics_b = BinaryClassificationMetrics(rdd_preds_b)

    precision = round(metrics_m.precision(1.0), 4)
    recall = round(metrics_m.recall(1.0), 4)
    F2 = round(metrics_m.fMeasure(label=1.0, beta=2.0), 4)
    pr_auc = round(metrics_b.areaUnderPR, 4)

    cm = metrics_m.confusionMatrix().toArray()

    TN = int(cm[0, 0])
    FP = int(cm[0, 1])
    FN = int(cm[1, 0])
    TP = int(cm[1, 1])

    return F2, pr_auc, precision, recall, TP, FP, FN, TN

In [0]:
def get_rf_model(params, cat_cols_to_use, numeric_cols_to_use, use_weights=False):
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    
    final_assembler = VectorAssembler(
        inputCols=numeric_cols_to_use + cat_cols_to_use,  
        outputCol="features", 
        handleInvalid="keep"
    )
    
    rf_params = {
        'labelCol': "ARR_DEL15",
        'featuresCol': "features",
        'maxDepth': params['maxDepth'],
        'numTrees': params['numTrees'],
        'maxBins': params.get('maxBins', 400),
        'minInstancesPerNode': params.get('minInstancesPerNode', 1),
        'featureSubsetStrategy': params.get('featureSubsetStrategy', 'auto'),
        'seed': 42
    }
    
    if use_weights:
        rf_params['weightCol'] = 'weight'
    
    rf = RandomForestClassifier(**rf_params)
    pipeline = Pipeline(stages=[final_assembler, rf])
    return pipeline

In [0]:
def model_eval_on_folds(folds_input_path, param_grid, cat_cols_to_use, numeric_cols_to_use, 
                        N_SPLITS=N_SPLITS, plot_pr_curve_func=None, use_weights=False):
    """
    Evaluate Random Forest model on pre-processed rolling windows
    
    Args:
        folds_input_path: path to transform_rolling_window (already preprocessed)
        param_grid: dict with RF parameters
        cat_cols_to_use: filtered categorical columns
        numeric_cols_to_use: filtered numeric columns
        N_SPLITS: number of windows
        plot_pr_curve_func: function to plot PR curve (optional)
    
    Returns:
        Dictionary with metrics and best parameters
    """
    from pyspark.sql import functions as F
    from itertools import product
    import numpy as np
    
    # Generate parameter combinations
    parameter_names = list(param_grid.keys())
    parameter_values = list(param_grid.values())
    parameters = list(product(*parameter_values))
    
    best_score = 0
    best_parameters = None
    best_fold_predictions = []
    all_results = []
    
    print(f"\nEvaluating Random Forest Model")
    print(f"Number of windows: {N_SPLITS}")
    print(f"Parameter combinations to test: {len(parameters)}")
    print("="*80)
    
    # Loop through parameter combinations
    for p in parameters:
        param_print = {x[0]:x[1] for x in zip(parameter_names, p)}
        print(f"\nTesting Parameters: {param_print}")
        print("-"*80)
        
        scores_f2 = []
        scores_pr = []
        scores_precision = []
        scores_recall = []
        confusion_list = []
        fold_predictions = []
        
        # Loop through ALL windows
        for i in range(N_SPLITS):
            print(f"  Window {i+1}:")
            train_path = f"{folds_input_path}/window_{i+1}_train"
            val_path = f"{folds_input_path}/window_{i+1}_val"
            # Load preprocessed data
            train_df = spark.read.parquet(f"{folds_input_path}/window_{i+1}_train")
            val_df = spark.read.parquet(f"{folds_input_path}/window_{i+1}_val")
            print(f"    Train path: {train_path}")  # Added this line
            print(f"    Val path: {val_path}")   
            
            print(f"    Train samples: {train_df.count():,}")
            print(f"    Val samples: {val_df.count():,}")
            
            # Build and train model
            pipeline = get_rf_model(param_print, cat_cols_to_use, numeric_cols_to_use, use_weights=False)
            model = pipeline.fit(train_df)
            preds = model.transform(val_df)
            
            # Compute metrics
            f2, pr_auc, precision, recall, tp, fp, fn, tn = cv_eval(preds)
            scores_f2.append(f2)
            scores_pr.append(pr_auc)
            scores_precision.append(precision)
            scores_recall.append(recall)
            confusion_list.append({"TP": tp, "FP": fp, "FN": fn, "TN": tn})
            fold_predictions.append((i+1, preds, f2, pr_auc, precision, recall, tp, fp, fn, tn))
            
            print(f"    F2: {f2:.4f}, PR-AUC: {pr_auc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
            print(f"    TP={tp}, FP={fp}, FN={fn}, TN={tn}")
        
        # Calculate average scores for this parameter combination
        avg_f2 = np.mean(scores_f2)
        avg_pr = np.mean(scores_pr)
        avg_precision = np.mean(scores_precision)
        avg_recall = np.mean(scores_recall)
        
        print(f"\n  Average across {N_SPLITS} windows:")
        print(f"    F2: {avg_f2:.4f} (±{np.std(scores_f2):.4f})")
        print(f"    PR-AUC: {avg_pr:.4f} (±{np.std(scores_pr):.4f})")
        print(f"    Precision: {avg_precision:.4f} (±{np.std(scores_precision):.4f})")
        print(f"    Recall: {avg_recall:.4f} (±{np.std(scores_recall):.4f})")
        
        # Update best parameters
        if avg_f2 > best_score:
            best_score = avg_f2
            best_parameters = param_print
            best_fold_predictions = fold_predictions
            print(f"  ✓ NEW BEST F2: {best_score:.4f}")
        
        # Store results
        all_results.append({
            'parameters': param_print,
            'f2_scores': scores_f2,
            'pr_auc_scores': scores_pr,
            'precision_scores': scores_precision,
            'recall_scores': scores_recall,
            'confusion': confusion_list,
            'f2_mean': float(avg_f2),
            'f2_std': float(np.std(scores_f2)),
            'pr_auc_mean': float(avg_pr),
            'pr_auc_std': float(np.std(scores_pr)),
            'precision_mean': float(avg_precision),
            'precision_std': float(np.std(scores_precision)),
            'recall_mean': float(avg_recall),
            'recall_std': float(np.std(scores_recall))
        })
        
        print("="*80)
    
    # Plot PR curves for all windows of best parameter combination
    # if plot_pr_curve_func and best_fold_predictions:
    #     print(f"\nPlotting PR curves for best parameters: {best_parameters}")
    #     for fold_num, preds, f2, pr_auc, precision, recall, tp, fp, fn, tn in best_fold_predictions:
    #         plot_pr_curve_func(preds, title=f"Best Model - Window {fold_num} (F2={f2:.4f}, PR-AUC={pr_auc:.4f}, P={precision:.4f}, R={recall:.4f})")
    
    # Get best result
    best_result = [r for r in all_results if r['f2_mean'] == best_score][0]
    
    print("\n" + "="*80)
    print("MODEL EVALUATION COMPLETE")
    print("="*80)
    print(f"Best Parameters: {best_parameters}")
    print(f"Best F2: {best_result['f2_mean']:.4f} (±{best_result['f2_std']:.4f})")
    print(f"Best PR-AUC: {best_result['pr_auc_mean']:.4f} (±{best_result['pr_auc_std']:.4f})")
    print(f"Best Precision: {best_result['precision_mean']:.4f} (±{best_result['precision_std']:.4f})")
    print(f"Best Recall: {best_result['recall_mean']:.4f} (±{best_result['recall_std']:.4f})")
    print("="*80)
    
    return {
        'best_parameters': best_parameters,
        'best_f2_mean': best_result['f2_mean'],
        'best_f2_std': best_result['f2_std'],
        'best_pr_auc_mean': best_result['pr_auc_mean'],
        'best_pr_auc_std': best_result['pr_auc_std'],
        'best_precision_mean': best_result['precision_mean'],
        'best_precision_std': best_result['precision_std'],
        'best_recall_mean': best_result['recall_mean'],
        'best_recall_std': best_result['recall_std'],
        'best_f2_scores': best_result['f2_scores'],
        'best_pr_auc_scores': best_result['pr_auc_scores'],
        'best_precision_scores': best_result['precision_scores'],
        'best_recall_scores': best_result['recall_scores'],
        'best_confusion': best_result['confusion'],
        'all_results': all_results
    }

In [0]:
param_grid = {
  'maxDepth': [10],                  
  'numTrees': [20],               
  'maxBins': [400],                  
  'minInstancesPerNode': [10],        
  'featureSubsetStrategy': ['sqrt']
}


In [0]:
%skip
rf_results = model_eval_on_folds(
    folds_input_path=folds_input_path,
    param_grid=param_grid,
    cat_cols_to_use=cat_cols_to_use,
    numeric_cols_to_use=numeric_cols_to_use,
    N_SPLITS=N_SPLITS,
    plot_pr_curve_func=plot_pr
)

### Model w/ ML Flow

In [0]:
import mlflow
import mlflow.spark

experiment_name = "/Workspace/Shared/Team_2_1/rf/random_forest_basic"

try:
    experiment_id = mlflow.create_experiment(
        name=experiment_name,
        artifact_location="dbfs:/student-groups/Group_02_01/experiments/rf"
    )
except Exception as e:
    print(f"Experiment already exists or error creating: {e}")
    experiment = mlflow.get_experiment_by_name(experiment_name)
    if experiment:
        experiment_id = experiment.experiment_id
    else:
        print("ERROR: Experiment doesn't exist and couldn't be created.")
        print("Make sure /Workspace/Shared/Team_2_1/rf/ folder exists in Databricks workspace")
        raise e

mlflow.set_experiment(experiment_name)

Experiment already exists or error creating: RESOURCE_ALREADY_EXISTS: Node named 'random_forest_basic' already exists


<Experiment: artifact_location='dbfs:/student-groups/Group_02_01/experiments/rf', creation_time=1764631967974, experiment_id='3915201981340333', last_update_time=1764631976232, lifecycle_stage='active', name='/Shared/Team_2_1/rf/random_forest_basic', tags={'mlflow.experiment.sourceName': '/Shared/Team_2_1/rf/random_forest_basic',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'Team_2_1',
 'mlflow.ownerId': '839106675862014'}>

In [0]:
experiment_name_mlflow = "/Workspace/Shared/Team_2_1/rf/random_forest_grid_search"

try:
    experiment_id_mlflow = mlflow.create_experiment(
        name=experiment_name_mlflow,
        artifact_location="dbfs:/student-groups/Group_02_01/experiments/rf"
    )
except Exception as e:
    experiment_mlflow = mlflow.get_experiment_by_name(experiment_name_mlflow)
    if experiment_mlflow:
        experiment_id_mlflow = experiment_mlflow.experiment_id

mlflow.set_experiment(experiment_name_mlflow)

param_grid_mlflow = {
    'maxDepth': [5,8,10],
    'numTrees': [20],
    'maxBins': [400],
    'minInstancesPerNode': [5,10,20],
    'featureSubsetStrategy': ['sqrt']
}

parameter_names_mlflow = list(param_grid_mlflow.keys())
parameter_values_mlflow = list(param_grid_mlflow.values())
parameters_mlflow = list(product(*parameter_values_mlflow))

print(f"Total parameter combinations: {len(parameters_mlflow)}")
print(f"Total runs (combinations × windows): {len(parameters_mlflow) * N_SPLITS}")

best_f2_mlflow = 0
best_params_mlflow = None

for p in parameters_mlflow:
    param_dict_mlflow = {x[0]:x[1] for x in zip(parameter_names_mlflow, p)}
    print(f"\nTesting: {param_dict_mlflow}")
    
    f2_scores_mlflow = []
    pr_scores_mlflow = []
    precision_scores_mlflow = []
    recall_scores_mlflow = []
    val_f2_scores = []
    val_pr_scores = []
    val_precision_scores = []
    val_recall_scores = []
    
    for i in range(N_SPLITS):
        train_path_mlflow = f"{folds_input_path}/window_{i+1}_train"
        val_path_mlflow = f"{folds_input_path}/window_{i+1}_val"
        
        train_df_mlflow = spark.read.parquet(train_path_mlflow)
        val_df_mlflow = spark.read.parquet(val_path_mlflow)
        
        with mlflow.start_run(run_name=f"d{param_dict_mlflow['maxDepth']}_t{param_dict_mlflow['numTrees']}_w{i+1}"):
            pipeline_mlflow = get_rf_model(param_dict_mlflow, cat_cols_to_use, numeric_cols_to_use, use_weights=False)
            model_mlflow = pipeline_mlflow.fit(train_df_mlflow)
            preds_mlflow = model_mlflow.transform(val_df_mlflow)
            
            # Metrics for validation data
            val_f2, val_pr_auc, val_precision, val_recall, val_tp, val_fp, val_fn, val_tn = cv_eval(preds_mlflow)
            val_f2_scores.append(val_f2)
            val_pr_scores.append(val_pr_auc)
            val_precision_scores.append(val_precision)
            val_recall_scores.append(val_recall)
            
            # Metrics for training data
            train_preds = model_mlflow.transform(train_df_mlflow)
            train_f2, train_pr_auc, train_precision, train_recall, train_tp, train_fp, train_fn, train_tn = cv_eval(train_preds)
            f2_scores_mlflow.append(train_f2)
            pr_scores_mlflow.append(train_pr_auc)
            precision_scores_mlflow.append(train_precision)
            recall_scores_mlflow.append(train_recall)
            
            # Log parameters
            mlflow.log_param("maxDepth", param_dict_mlflow['maxDepth'])
            mlflow.log_param("numTrees", param_dict_mlflow['numTrees'])
            mlflow.log_param("maxBins", param_dict_mlflow['maxBins'])
            mlflow.log_param("minInstancesPerNode", param_dict_mlflow['minInstancesPerNode'])
            mlflow.log_param("featureSubsetStrategy", param_dict_mlflow['featureSubsetStrategy'])
            mlflow.log_param("window", i+1)
            
            # Log training metrics
            mlflow.log_metric("train_f2_score", train_f2)
            mlflow.log_metric("train_pr_auc", train_pr_auc)
            mlflow.log_metric("train_precision", train_precision)
            mlflow.log_metric("train_recall", train_recall)
            mlflow.log_metric("train_true_positives", train_tp)
            mlflow.log_metric("train_false_positives", train_fp)
            mlflow.log_metric("train_false_negatives", train_fn)
            mlflow.log_metric("train_true_negatives", train_tn)
            
            # Log validation metrics
            mlflow.log_metric("val_f2_score", val_f2)
            mlflow.log_metric("val_pr_auc", val_pr_auc)
            mlflow.log_metric("val_precision", val_precision)
            mlflow.log_metric("val_recall", val_recall)
            mlflow.log_metric("val_true_positives", val_tp)
            mlflow.log_metric("val_false_positives", val_fp)
            mlflow.log_metric("val_false_negatives", val_fn)
            mlflow.log_metric("val_true_negatives", val_tn)
            
            mlflow.spark.log_model(model_mlflow, artifact_path="rf_model")
            
            print(f"  Window {i+1}:")
            print(f"    TRAIN  - F2={train_f2:.4f}, PR-AUC={train_pr_auc:.4f}, Precision={train_precision:.4f}, Recall={train_recall:.4f}")
            print(f"    VAL    - F2={val_f2:.4f}, PR-AUC={val_pr_auc:.4f}, Precision={val_precision:.4f}, Recall={val_recall:.4f}")
    
    avg_f2_mlflow = np.mean(val_f2_scores)
    avg_pr_mlflow = np.mean(val_pr_scores)
    avg_precision_mlflow = np.mean(val_precision_scores)
    avg_recall_mlflow = np.mean(val_recall_scores)
    
    print(f"  Average: F2={avg_f2_mlflow:.4f} (±{np.std(val_f2_scores):.4f}), PR-AUC={avg_pr_mlflow:.4f}, Precision={avg_precision_mlflow:.4f}, Recall={avg_recall_mlflow:.4f}")
    
    if avg_f2_mlflow > best_f2_mlflow:
        best_f2_mlflow = avg_f2_mlflow
        best_params_mlflow = param_dict_mlflow
        print(f"  ✓ NEW BEST!")

print("\n" + "="*80)
print("GRID SEARCH COMPLETE")
print("="*80)
print(f"Best Parameters: {best_params_mlflow}")
print(f"Best F2: {best_f2_mlflow:.4f}")
print("="*80)

Total parameter combinations: 9
Total runs (combinations × windows): 18

Testing: {'maxDepth': 5, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 5, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6322, PR-AUC=0.6575, Precision=0.6371, Recall=0.6310
    VAL    - F2=0.5079, PR-AUC=0.3237, Precision=0.2971, Recall=0.6174




  Window 2:
    TRAIN  - F2=0.6322, PR-AUC=0.6702, Precision=0.6454, Recall=0.6290
    VAL    - F2=0.5048, PR-AUC=0.3248, Precision=0.3009, Recall=0.6078
  Average: F2=0.5064 (±0.0015), PR-AUC=0.3242, Precision=0.2990, Recall=0.6126
  ✓ NEW BEST!

Testing: {'maxDepth': 5, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 10, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6322, PR-AUC=0.6575, Precision=0.6371, Recall=0.6310
    VAL    - F2=0.5079, PR-AUC=0.3237, Precision=0.2971, Recall=0.6174




  Window 2:
    TRAIN  - F2=0.6322, PR-AUC=0.6702, Precision=0.6454, Recall=0.6290
    VAL    - F2=0.5048, PR-AUC=0.3248, Precision=0.3009, Recall=0.6078
  Average: F2=0.5064 (±0.0015), PR-AUC=0.3242, Precision=0.2990, Recall=0.6126

Testing: {'maxDepth': 5, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 20, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6322, PR-AUC=0.6575, Precision=0.6371, Recall=0.6310
    VAL    - F2=0.5079, PR-AUC=0.3237, Precision=0.2971, Recall=0.6174




  Window 2:
    TRAIN  - F2=0.6322, PR-AUC=0.6702, Precision=0.6454, Recall=0.6290
    VAL    - F2=0.5048, PR-AUC=0.3248, Precision=0.3009, Recall=0.6078
  Average: F2=0.5064 (±0.0015), PR-AUC=0.3242, Precision=0.2990, Recall=0.6126

Testing: {'maxDepth': 8, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 5, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6591, PR-AUC=0.6768, Precision=0.6454, Recall=0.6626
    VAL    - F2=0.5286, PR-AUC=0.3450, Precision=0.3039, Recall=0.6485




  Window 2:
    TRAIN  - F2=0.6649, PR-AUC=0.6888, Precision=0.6538, Recall=0.6677
    VAL    - F2=0.5251, PR-AUC=0.3403, Precision=0.3069, Recall=0.6386
  Average: F2=0.5269 (±0.0017), PR-AUC=0.3427, Precision=0.3054, Recall=0.6435
  ✓ NEW BEST!

Testing: {'maxDepth': 8, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 10, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6639, PR-AUC=0.6760, Precision=0.6440, Recall=0.6691
    VAL    - F2=0.5309, PR-AUC=0.3444, Precision=0.3026, Recall=0.6543




  Window 2:
    TRAIN  - F2=0.6638, PR-AUC=0.6892, Precision=0.6534, Recall=0.6664
    VAL    - F2=0.5242, PR-AUC=0.3417, Precision=0.3068, Recall=0.6370
  Average: F2=0.5275 (±0.0034), PR-AUC=0.3430, Precision=0.3047, Recall=0.6457
  ✓ NEW BEST!

Testing: {'maxDepth': 8, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 20, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6641, PR-AUC=0.6772, Precision=0.6436, Recall=0.6695
    VAL    - F2=0.5302, PR-AUC=0.3451, Precision=0.3020, Recall=0.6538




  Window 2:
    TRAIN  - F2=0.6650, PR-AUC=0.6885, Precision=0.6528, Recall=0.6681
    VAL    - F2=0.5248, PR-AUC=0.3409, Precision=0.3059, Recall=0.6392
  Average: F2=0.5275 (±0.0027), PR-AUC=0.3430, Precision=0.3039, Recall=0.6465

Testing: {'maxDepth': 10, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 5, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6700, PR-AUC=0.6892, Precision=0.6522, Recall=0.6746
    VAL    - F2=0.5354, PR-AUC=0.3558, Precision=0.3085, Recall=0.6560




  Window 2:
    TRAIN  - F2=0.6745, PR-AUC=0.7016, Precision=0.6611, Recall=0.6779
    VAL    - F2=0.5315, PR-AUC=0.3516, Precision=0.3109, Recall=0.6462
  Average: F2=0.5334 (±0.0020), PR-AUC=0.3537, Precision=0.3097, Recall=0.6511
  ✓ NEW BEST!

Testing: {'maxDepth': 10, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 10, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6729, PR-AUC=0.6897, Precision=0.6510, Recall=0.6786
    VAL    - F2=0.5375, PR-AUC=0.3572, Precision=0.3076, Recall=0.6610




  Window 2:
    TRAIN  - F2=0.6753, PR-AUC=0.7007, Precision=0.6605, Recall=0.6791
    VAL    - F2=0.5321, PR-AUC=0.3511, Precision=0.3105, Recall=0.6477
  Average: F2=0.5348 (±0.0027), PR-AUC=0.3542, Precision=0.3090, Recall=0.6543
  ✓ NEW BEST!

Testing: {'maxDepth': 10, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 20, 'featureSubsetStrategy': 'sqrt'}




  Window 1:
    TRAIN  - F2=0.6689, PR-AUC=0.6897, Precision=0.6521, Recall=0.6732
    VAL    - F2=0.5348, PR-AUC=0.3572, Precision=0.3085, Recall=0.6548




  Window 2:
    TRAIN  - F2=0.6754, PR-AUC=0.7009, Precision=0.6608, Recall=0.6791
    VAL    - F2=0.5320, PR-AUC=0.3514, Precision=0.3110, Recall=0.6470
  Average: F2=0.5334 (±0.0014), PR-AUC=0.3543, Precision=0.3097, Recall=0.6509

GRID SEARCH COMPLETE
Best Parameters: {'maxDepth': 10, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 10, 'featureSubsetStrategy': 'sqrt'}
Best F2: 0.5348


In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()
experiment_id_mlflow = mlflow.get_experiment_by_name("/Workspace/Shared/Team_2_1/rf/random_forest_grid_search").experiment_id

runs = client.search_runs(
    experiment_id_mlflow, 
    filter_string="attribute.start_time >= 1764974781390",
    order_by=["metrics.val_f2_score DESC"]
)

results = []
for run in runs:
    start_time = run.info.start_time
    end_time = run.info.end_time
    wall_time_sec = None
    if start_time and end_time:
        wall_time_sec = (end_time - start_time) / 1000.0  # ms to seconds
    results.append({
        "run_id": run.info.run_id,
        "maxDepth": run.data.params.get("maxDepth"),
        "numTrees": run.data.params.get("numTrees"),
        "maxBins": run.data.params.get("maxBins"),
        "minInstancesPerNode": run.data.params.get("minInstancesPerNode"),
        "featureSubsetStrategy": run.data.params.get("featureSubsetStrategy"),
        "window": run.data.params.get("window"),
        "train_f2_score": run.data.metrics.get("train_f2_score"),
        "train_pr_auc": run.data.metrics.get("train_pr_auc"),
        "train_precision": run.data.metrics.get("train_precision"),
        "train_recall": run.data.metrics.get("train_recall"),
        "train_true_positives": run.data.metrics.get("train_true_positives"),
        "train_false_positives": run.data.metrics.get("train_false_positives"),
        "train_false_negatives": run.data.metrics.get("train_false_negatives"),
        "train_true_negatives": run.data.metrics.get("train_true_negatives"),
        "val_f2_score": run.data.metrics.get("val_f2_score"),
        "val_pr_auc": run.data.metrics.get("val_pr_auc"),
        "val_precision": run.data.metrics.get("val_precision"),
        "val_recall": run.data.metrics.get("val_recall"),
        "val_true_positives": run.data.metrics.get("val_true_positives"),
        "val_false_positives": run.data.metrics.get("val_false_positives"),
        "val_false_negatives": run.data.metrics.get("val_false_negatives"),
        "val_true_negatives": run.data.metrics.get("val_true_negatives"),
        "wall_time_sec": wall_time_sec
    })

results_df = spark.createDataFrame(pd.DataFrame(results))
print(f"Total runs exported: {results_df.count()}")
display(results_df)

Total runs exported: 25


run_id,maxDepth,numTrees,maxBins,minInstancesPerNode,featureSubsetStrategy,window,train_f2_score,train_pr_auc,train_precision,train_recall,train_true_positives,train_false_positives,train_false_negatives,train_true_negatives,val_f2_score,val_pr_auc,val_precision,val_recall,val_true_positives,val_false_positives,val_false_negatives,val_true_negatives,wall_time_sec
13cd55668ef743608fd8b5957fb55a70,10.0,20.0,400.0,5.0,sqrt,1.0,0.6745,0.691,0.6501,0.6808,1380375.0,742936.0,647104.0,1286433.0,0.539,0.3578,0.3076,0.6638,683921.0,1539408.0,346464.0,3002226.0,131.091
e82dd769a9be45d2a5276b0bb7cfa494,10.0,20.0,400.0,10.0,sqrt,1.0,0.6729,0.6897,0.651,0.6786,1375938.0,737800.0,651541.0,1291569.0,0.5375,0.3572,0.3076,0.661,681119.0,1533284.0,349266.0,3008350.0,120.113
d68ab5ac105742ac9a364f9ff188a208,10.0,20.0,400.0,5.0,sqrt,1.0,0.67,0.6892,0.6522,0.6746,1367772.0,729329.0,659707.0,1300040.0,0.5354,0.3558,0.3085,0.656,675931.0,1514899.0,354454.0,3026735.0,119.443
619b54481cc247a7a6a454a53b8e07c4,10.0,20.0,400.0,20.0,sqrt,1.0,0.6689,0.6897,0.6521,0.6732,1364968.0,728125.0,662511.0,1301244.0,0.5348,0.3572,0.3085,0.6548,674734.0,1512493.0,355651.0,3029141.0,119.287
68257ba2a0f541baa00265dffe975508,10.0,20.0,400.0,5.0,sqrt,2.0,0.6777,0.7022,0.6613,0.682,1359390.0,696237.0,633898.0,1298991.0,0.5323,0.3501,0.3097,0.6489,874465.0,1948950.0,473073.0,3789677.0,220.107
909e9b223baf4edea7ffb75f714b6c27,10.0,20.0,400.0,10.0,sqrt,2.0,0.6753,0.7007,0.6605,0.6791,1353646.0,695867.0,639642.0,1299361.0,0.5321,0.3511,0.3105,0.6477,872838.0,1938398.0,474700.0,3800229.0,130.828
9b098e8663a64d91b4bffe257801313e,10.0,20.0,400.0,5.0,sqrt,2.0,0.6772,0.702,0.6605,0.6816,1358545.0,698340.0,634743.0,1296888.0,0.5321,0.3503,0.3091,0.6492,874768.0,1955257.0,472770.0,3783370.0,145.111
656e52ab333740d1b26371eaa55ff3b3,10.0,20.0,400.0,20.0,sqrt,2.0,0.6754,0.7009,0.6608,0.6791,1353639.0,694935.0,639649.0,1300293.0,0.532,0.3514,0.311,0.647,871863.0,1931769.0,475675.0,3806858.0,130.762
f99c0a4821b447caaaebe55394134a1e,10.0,20.0,400.0,5.0,sqrt,2.0,0.6745,0.7016,0.6611,0.6779,1351259.0,692809.0,642029.0,1302419.0,0.5315,0.3516,0.3109,0.6462,870743.0,1930320.0,476795.0,3808307.0,132.651
1e44450e1242446bb31a6cccad7a3162,8.0,20.0,400.0,10.0,sqrt,1.0,0.6639,0.676,0.644,0.6691,1356610.0,750024.0,670869.0,1279345.0,0.5309,0.3444,0.3026,0.6543,674144.0,1553841.0,356241.0,2987793.0,95.801


In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()
runs = client.search_runs(experiment_id_mlflow, order_by=["metrics.f2_score DESC"])

best_run_id = runs[0].info.run_id

# Load feature importance from the best run's model
model_uri = f"runs:/{best_run_id}/rf_model"
rf_model = mlflow.spark.load_model(model_uri)

# Extract feature importances and feature names
importances = rf_model.stages[-1].featureImportances
feature_names = rf_model.stages[0].getInputCols()

# Pair and sort features by importance
feature_importance = sorted(zip(feature_names, importances), key=lambda x: x[1], reverse=True)

print("Top features by importance:")
for name, score in feature_importance:
    print(f"{name}: {score:.4f}")

Top features by importance:
actual_to_crs_time_to_next_flight_diff_mins_clean: 0.2836
crs_time_to_next_flight_diff_mins: 0.1688
CRS_ARR_TIME_BLOCK_idx: 0.0804
CRS_DEP_TIME_BLOCK_idx: 0.0568
DEST_idx: 0.0346
MONTH: 0.0310
ORIGIN_idx: 0.0272
prev_flight_arr_delay_clean: 0.0233
OP_UNIQUE_CARRIER_idx: 0.0230
HourlyAltimeterSetting: 0.0178
origin_pagerank: 0.0164
DEST_LON: 0.0140
lowest_cloud_ft: 0.0130
ORIGIN_LONG: 0.0130
origin_out_degree: 0.0126
DAY_OF_MONTH: 0.0121
DEST_STATE_ABR_idx: 0.0119
ORIGIN_STATE_ABR_idx: 0.0116
DISTANCE: 0.0114
CRS_ELAPSED_TIME: 0.0114
DEST_ELEVATION_FT: 0.0113
dest_in_degree: 0.0104
dest_pagerank: 0.0097
snow: 0.0092
ORIGIN_LAT: 0.0090
has_ovc: 0.0082
ORIGIN_ELEVATION_FT: 0.0076
DEST_LAT: 0.0075
overall_cloud_frac_0_1: 0.0072
QUARTER: 0.0065
HourlyWindCardinalDirection_idx: 0.0062
highest_cloud_ft: 0.0059
DAY_OF_WEEK: 0.0056
HourlyWindSpeed: 0.0046
IS_US_HOLIDAY: 0.0043
reduced_visibility: 0.0031
light: 0.0022
rain_or_drizzle: 0.0019
HourlyWindGustSpeed: 0.001

### Blind Test Model Run

In [0]:
best_parameters = {
    'maxDepth': 10, 
    'numTrees': 20, 
    'maxBins': 400, 
    'minInstancesPerNode': 5, 
    'featureSubsetStrategy': 'sqrt'
}

columns_to_keep = list(set(numeric_cols_to_use + cat_cols_to_use + ['ARR_DEL15']))
train_full_df = df_train_master.select(columns_to_keep)
print(f"\nTotal training rows: {train_full_df.count():,}")

pipeline = get_rf_model(best_parameters, cat_cols_to_use, numeric_cols_to_use, use_weights=False)

print(f"Training model on downsampled data...")
model = pipeline.fit(train_full_df)

test_df = spark.read.parquet(blind_test_set_path).select(columns_to_keep)
print(f"Test samples: {test_df.count():,}")
print(f"Running predictions...")

test_pred = model.transform(test_df)

f2, pr_auc, precision, recall, tp, fp, fn, tn = cv_eval(test_pred)

print("\nBLIND TEST RESULTS:")
print("=" * 80)
print(f"F2 Score:  {f2:.4f}")
print(f"PR-AUC:    {pr_auc:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"\nConfusion Matrix:")
print(f"  TP={tp:,}, FP={fp:,}")
print(f"  FN={fn:,}, TN={tn:,}")
print("=" * 80)


Total training rows: 8,811,266
Training model on downsampled data...
Test samples: 7,271,711
Running predictions...

BLIND TEST RESULTS:
F2 Score:  0.5453
PR-AUC:    0.3662
Precision: 0.3102
Recall:    0.6728

Confusion Matrix:
  TP=933,786, FP=2,076,416
  FN=454,118, TN=3,807,391


#### Full_unscaled_features model

In [0]:
def get_rf_model_with_prebuilt_features(params, feature_col='full_unscaled_features', use_weights=False):
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml import Pipeline
    
    rf_params = {
        'labelCol': "ARR_DEL15",
        'featuresCol': feature_col,
        'maxDepth': params['maxDepth'],
        'numTrees': params['numTrees'],
        'maxBins': params.get('maxBins', 400),
        'minInstancesPerNode': params.get('minInstancesPerNode', 1),
        'featureSubsetStrategy': params.get('featureSubsetStrategy', 'auto'),
        'seed': 42
    }
    
    if use_weights:
        rf_params['weightCol'] = 'weight'
    
    rf = RandomForestClassifier(**rf_params)
    pipeline = Pipeline(stages=[rf])
    return pipeline


def model_eval_on_folds_prebuilt(folds_input_path, param_grid, feature_col='full_unscaled_features',
                                  N_SPLITS=N_SPLITS, use_weights=False):
    from pyspark.sql import functions as F
    from itertools import product
    import numpy as np
    
    parameter_names = list(param_grid.keys())
    parameter_values = list(param_grid.values())
    parameters = list(product(*parameter_values))
    
    best_score = 0
    best_parameters = None
    best_fold_predictions = []
    all_results = []
    
    print(f"\nEvaluating Random Forest Model with Pre-built Features")
    print(f"Feature column: {feature_col}")
    print(f"Number of windows: {N_SPLITS}")
    print(f"Parameter combinations to test: {len(parameters)}")
    print("="*80)
    
    for p in parameters:
        param_print = {x[0]:x[1] for x in zip(parameter_names, p)}
        print(f"\nTesting Parameters: {param_print}")
        print("-"*80)
        
        scores_f2 = []
        scores_pr = []
        scores_precision = []
        scores_recall = []
        confusion_list = []
        fold_predictions = []
        
        for i in range(N_SPLITS):
            print(f"  Window {i+1}:")
            train_path = f"{folds_input_path}/window_{i+1}_train"
            val_path = f"{folds_input_path}/window_{i+1}_val"
            
            train_df = spark.read.parquet(train_path)
            val_df = spark.read.parquet(val_path)
            
            print(f"    Train path: {train_path}")
            print(f"    Val path: {val_path}")
            print(f"    Train samples: {train_df.count():,}")
            print(f"    Val samples: {val_df.count():,}")
            
            pipeline = get_rf_model_with_prebuilt_features(param_print, feature_col, use_weights)
            model = pipeline.fit(train_df)
            preds = model.transform(val_df)
            
            f2, pr_auc, precision, recall, tp, fp, fn, tn = cv_eval(preds)
            scores_f2.append(f2)
            scores_pr.append(pr_auc)
            scores_precision.append(precision)
            scores_recall.append(recall)
            confusion_list.append({"TP": tp, "FP": fp, "FN": fn, "TN": tn})
            fold_predictions.append((i+1, preds, f2, pr_auc, precision, recall, tp, fp, fn, tn))
            
            print(f"    F2: {f2:.4f}, PR-AUC: {pr_auc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
            print(f"    TP={tp}, FP={fp}, FN={fn}, TN={tn}")
        
        avg_f2 = np.mean(scores_f2)
        avg_pr = np.mean(scores_pr)
        avg_precision = np.mean(scores_precision)
        avg_recall = np.mean(scores_recall)
        
        print(f"\n  Average across {N_SPLITS} windows:")
        print(f"    F2: {avg_f2:.4f} (±{np.std(scores_f2):.4f})")
        print(f"    PR-AUC: {avg_pr:.4f} (±{np.std(scores_pr):.4f})")
        print(f"    Precision: {avg_precision:.4f} (±{np.std(scores_precision):.4f})")
        print(f"    Recall: {avg_recall:.4f} (±{np.std(scores_recall):.4f})")
        
        if avg_f2 > best_score:
            best_score = avg_f2
            best_parameters = param_print
            best_fold_predictions = fold_predictions
            print(f"  ✓ NEW BEST F2: {best_score:.4f}")
        
        all_results.append({
            'parameters': param_print,
            'f2_scores': scores_f2,
            'pr_auc_scores': scores_pr,
            'precision_scores': scores_precision,
            'recall_scores': scores_recall,
            'confusion': confusion_list,
            'f2_mean': float(avg_f2),
            'f2_std': float(np.std(scores_f2)),
            'pr_auc_mean': float(avg_pr),
            'pr_auc_std': float(np.std(scores_pr)),
            'precision_mean': float(avg_precision),
            'precision_std': float(np.std(scores_precision)),
            'recall_mean': float(avg_recall),
            'recall_std': float(np.std(scores_recall))
        })
        
        print("="*80)
    
    best_result = [r for r in all_results if r['f2_mean'] == best_score][0]
    
    print("\n" + "="*80)
    print("MODEL EVALUATION COMPLETE")
    print("="*80)
    print(f"Feature column used: {feature_col}")
    print(f"Best Parameters: {best_parameters}")
    print(f"Best F2: {best_result['f2_mean']:.4f} (±{best_result['f2_std']:.4f})")
    print(f"Best PR-AUC: {best_result['pr_auc_mean']:.4f} (±{best_result['pr_auc_std']:.4f})")
    print(f"Best Precision: {best_result['precision_mean']:.4f} (±{best_result['precision_std']:.4f})")
    print(f"Best Recall: {best_result['recall_mean']:.4f} (±{best_result['recall_std']:.4f})")
    print("="*80)
    
    return {
        'feature_col': feature_col,
        'best_parameters': best_parameters,
        'best_f2_mean': best_result['f2_mean'],
        'best_f2_std': best_result['f2_std'],
        'best_pr_auc_mean': best_result['pr_auc_mean'],
        'best_pr_auc_std': best_result['pr_auc_std'],
        'best_precision_mean': best_result['precision_mean'],
        'best_precision_std': best_result['precision_std'],
        'best_recall_mean': best_result['recall_mean'],
        'best_recall_std': best_result['recall_std'],
        'best_f2_scores': best_result['f2_scores'],
        'best_pr_auc_scores': best_result['pr_auc_scores'],
        'best_precision_scores': best_result['precision_scores'],
        'best_recall_scores': best_result['recall_scores'],
        'best_confusion': best_result['confusion'],
        'all_results': all_results
    }

In [0]:
# ============================================================
# MLFLOW GRID SEARCH - PREBUILT FEATURES (full_unscaled_features)
# ============================================================

experiment_name_mlflow = "/Workspace/Shared/Team_2_1/rf/random_forest_full_unscaled_features"

try:
    experiment_id_mlflow = mlflow.create_experiment(
        name=experiment_name_mlflow,
        artifact_location="dbfs:/student-groups/Group_02_01/experiments/rf"
    )
except Exception as e:
    experiment_mlflow = mlflow.get_experiment_by_name(experiment_name_mlflow)
    if experiment_mlflow:
        experiment_id_mlflow = experiment_mlflow.experiment_id

mlflow.set_experiment(experiment_name_mlflow)

# Use your param_grid
param_grid_mlflow = param_grid  # Or define it explicitly if needed

parameter_names_mlflow = list(param_grid_mlflow.keys())
parameter_values_mlflow = list(param_grid_mlflow.values())
parameters_mlflow = list(product(*parameter_values_mlflow))

print(f"Total parameter combinations: {len(parameters_mlflow)}")
print(f"Total runs (combinations × windows): {len(parameters_mlflow) * N_SPLITS}")

best_f2_mlflow = 0
best_params_mlflow = None
all_results_mlflow = []

for p in parameters_mlflow:
    param_dict_mlflow = {x[0]:x[1] for x in zip(parameter_names_mlflow, p)}
    print(f"\nTesting: {param_dict_mlflow}")
    
    f2_scores_mlflow = []
    pr_scores_mlflow = []
    precision_scores_mlflow = []
    recall_scores_mlflow = []
    
    for i in range(N_SPLITS):
        train_path_mlflow = f"{folds_input_path}/window_{i+1}_train"
        val_path_mlflow = f"{folds_input_path}/window_{i+1}_val"
        
        # Select only the prebuilt feature column and target
        columns_to_keep = ['full_unscaled_features', 'ARR_DEL15']
        train_df_mlflow = spark.read.parquet(train_path_mlflow).select(columns_to_keep)
        val_df_mlflow = spark.read.parquet(val_path_mlflow).select(columns_to_keep)
        
        with mlflow.start_run(run_name=f"d{param_dict_mlflow['maxDepth']}_t{param_dict_mlflow['numTrees']}_w{i+1}"):
            # Use prebuilt features pipeline
            pipeline_mlflow = get_rf_model_with_prebuilt_features(
                param_dict_mlflow, 
                feature_col='full_unscaled_features', 
                use_weights=False
            )
            
            model_mlflow = pipeline_mlflow.fit(train_df_mlflow)
            preds_mlflow = model_mlflow.transform(val_df_mlflow)
            
            f2_mlflow, pr_auc_mlflow, precision_mlflow, recall_mlflow, tp_mlflow, fp_mlflow, fn_mlflow, tn_mlflow = cv_eval(preds_mlflow)
            
            f2_scores_mlflow.append(f2_mlflow)
            pr_scores_mlflow.append(pr_auc_mlflow)
            precision_scores_mlflow.append(precision_mlflow)
            recall_scores_mlflow.append(recall_mlflow)
            
            # Log parameters
            mlflow.log_param("maxDepth", param_dict_mlflow['maxDepth'])
            mlflow.log_param("numTrees", param_dict_mlflow['numTrees'])
            mlflow.log_param("maxBins", param_dict_mlflow['maxBins'])
            mlflow.log_param("minInstancesPerNode", param_dict_mlflow['minInstancesPerNode'])
            mlflow.log_param("featureSubsetStrategy", param_dict_mlflow['featureSubsetStrategy'])
            mlflow.log_param("feature_col", "full_unscaled_features")
            mlflow.log_param("window", i+1)
            
            # Log metrics
            mlflow.log_metric("f2_score", f2_mlflow)
            mlflow.log_metric("pr_auc", pr_auc_mlflow)
            mlflow.log_metric("precision", precision_mlflow)
            mlflow.log_metric("recall", recall_mlflow)
            mlflow.log_metric("true_positives", tp_mlflow)
            mlflow.log_metric("false_positives", fp_mlflow)
            mlflow.log_metric("false_negatives", fn_mlflow)
            mlflow.log_metric("true_negatives", tn_mlflow)
            
            # Log model
            mlflow.spark.log_model(model_mlflow, artifact_path="rf_model")
            
            print(f"  Window {i+1}: F2={f2_mlflow:.4f}, Recall={recall_mlflow:.4f}")
    
    # Calculate averages
    avg_f2_mlflow = np.mean(f2_scores_mlflow)
    avg_pr_mlflow = np.mean(pr_scores_mlflow)
    avg_precision_mlflow = np.mean(precision_scores_mlflow)
    avg_recall_mlflow = np.mean(recall_scores_mlflow)
    
    print(f"  Average: F2={avg_f2_mlflow:.4f} (±{np.std(f2_scores_mlflow):.4f}), Recall={avg_recall_mlflow:.4f}")
    
    # Store results
    all_results_mlflow.append({
        'parameters': param_dict_mlflow,
        'avg_f2': avg_f2_mlflow,
        'std_f2': np.std(f2_scores_mlflow),
        'avg_pr_auc': avg_pr_mlflow,
        'avg_precision': avg_precision_mlflow,
        'avg_recall': avg_recall_mlflow,
        'fold_f2_scores': f2_scores_mlflow
    })
    
    # Track best parameters
    if avg_f2_mlflow > best_f2_mlflow:
        best_f2_mlflow = avg_f2_mlflow
        best_params_mlflow = param_dict_mlflow
        print(f"  ✓ NEW BEST!")

print("\n" + "="*80)
print("GRID SEARCH COMPLETE - full_unscaled_features")
print("="*80)
print(f"Best Parameters: {best_params_mlflow}")
print(f"Best F2: {best_f2_mlflow:.4f}")
print("="*80)

# Create results dictionary matching your original format
rf_results_full_features = {
    'best_parameters': best_params_mlflow,
    'best_f2': best_f2_mlflow,
    'all_results': all_results_mlflow
}

print(f"\nResults stored in: rf_results_full_features")

Total parameter combinations: 1
Total runs (combinations × windows): 2

Testing: {'maxDepth': 10, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 10, 'featureSubsetStrategy': 'sqrt'}




  Window 1: F2=0.5294, Recall=0.6659




  Window 2: F2=0.5240, Recall=0.6550
  Average: F2=0.5267 (±0.0027), Recall=0.6604
  ✓ NEW BEST!

GRID SEARCH COMPLETE - full_unscaled_features
Best Parameters: {'maxDepth': 10, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 10, 'featureSubsetStrategy': 'sqrt'}
Best F2: 0.5267

Results stored in: rf_results_full_features


In [0]:
best_parameters = rf_results_full_features['best_parameters']

print(f"Best Parameters: {best_parameters}")
print(f"Feature column: full_unscaled_features")

pipeline_full_unscaled_features = get_rf_model_with_prebuilt_features(
    best_parameters, 
    feature_col='full_unscaled_features', 
    use_weights=True
)

columns_to_keep_full_unscaled_only = ['full_unscaled_features', 'ARR_DEL15']

# =======================
# USE DOWNSAMPLED TRAINING DATA
# =======================
train_full_df = df_train_master.select(columns_to_keep_full_unscaled_only)
print(f"\nTotal training rows: {train_full_df.count():,}")

print(f"Class distribution in training data:")
train_full_df.groupBy('ARR_DEL15').count().show()

# =======================
# TRAIN MODEL
# =======================
print(f"Training model on downsampled data...")
model_full_unscaled_features = pipeline_full_unscaled_features.fit(train_full_df)

# =======================
# TEST ON BLIND TEST SET
# =======================
test_df = spark.read.parquet(blind_test_set_path).select(columns_to_keep_full_unscaled_only)
print(f"Total test samples: {test_df.count():,}")
print(f"Running predictions...")

test_pred_full_unscaled_features = model_full_unscaled_features.transform(test_df)

f2, pr_auc, precision, recall, tp, fp, fn, tn = cv_eval(test_pred_full_unscaled_features)

print("\nBLIND TEST SET RESULTS (full_unscaled_features):")
print("=" * 80)
print(f"F2 Score:  {f2:.4f}")
print(f"PR-AUC:    {pr_auc:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"\nConfusion Matrix:")
print(f"  TP={tp:,}, FP={fp:,}")
print(f"  FN={fn:,}, TN={tn:,}")
print("=" * 80)

Best Parameters: {'maxDepth': 10, 'numTrees': 20, 'maxBins': 400, 'minInstancesPerNode': 10, 'featureSubsetStrategy': 'sqrt'}
Feature column: full_unscaled_features

Total training rows: 8,812,266
Class distribution in training data:
+---------+-------+
|ARR_DEL15|  count|
+---------+-------+
|        1|4405402|
|        0|4406864|
+---------+-------+

Training model on downsampled data...


Exception ignored in: <function JavaModelWrapper.__del__ at 0x7f04285827a0>
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/mllib/common.py", line 152, in __del__
    assert self._sc._gateway is not None
           ^^^^^^^^
AttributeError: 'BinaryClassificationMetrics' object has no attribute '_sc'


[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-8516020384389433>, line 27[0m
[1;32m     24[0m [38;5;66;03m# TRAIN MODEL[39;00m
[1;32m     26[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mTraining model on downsampled data...[39m[38;5;124m"[39m)
[0;32m---> 27[0m model_full_unscaled_features [38;5;241m=[39m pipeline_full_unscaled_features[38;5;241m.[39mfit(train_full_df)
[1;32m     30[0m [38;5;66;03m# TEST ON BLIND TEST SET[39;00m
[1;32m     32[0m test_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mparquet(blind_test_set_path)[38;5;241m.[39mselect(columns_to_keep_full_unscaled_only)

File [0;32m/databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_method[0;34m(self, *args, **kwargs)[0m
[1;32m   

In [0]:
best_parameters = rf_results_log_features['best_parameters']

print(f"Best Parameters: {best_parameters}")
print(f"Feature column: log_unscaled_features")

pipeline_log_unscaled_features = get_rf_model_with_prebuilt_features(
    best_parameters, 
    feature_col='log_unscaled_features', 
    use_weights=False
)

columns_to_keep_log_unscaled_only = ['log_unscaled_features', 'ARR_DEL15']

print(f"\nUsing downsampled training data")
train_full_df_log_unscaled_only = df_train_master.select(columns_to_keep_log_unscaled_only)

print(f"Total training samples: {train_full_df_log_unscaled_only.count():,}")
print(f"Training final model...")

model_log_unscaled_features = pipeline_log_unscaled_features.fit(train_full_df_log_unscaled_only)

test_df = spark.read.parquet(blind_test_set_path)
test_df_log_unscaled_only = test_df.select(columns_to_keep_log_unscaled_only)

print(f"Total test samples: {test_df_log_unscaled_only.count():,}")
print(f"Running predictions...")

test_pred_log_unscaled_features = model_log_unscaled_features.transform(test_df_log_unscaled_only)

f2, pr_auc, precision, recall, tp, fp, fn, tn = cv_eval(test_pred_log_unscaled_features)

print("\nBLIND TEST SET RESULTS (log_unscaled_features):")
print("=" * 80)
print(f"F2 Score:  {f2:.4f}")
print(f"PR-AUC:    {pr_auc:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"\nConfusion Matrix:")
print(f"  TP={tp:,}, FP={fp:,}")
print(f"  FN={fn:,}, TN={tn:,}")
print("=" * 80)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:434)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:473)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:750)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:510)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:616)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:643)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:49)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:293)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(Attr

In [0]:
print("\n" + "="*80)
print("COMPARISON: full_unscaled_features vs log_unscaled_features")
print("="*80)
print(f"full_unscaled_features - F2: {rf_results_full_features['best_f2_mean']:.4f}, Recall: {rf_results_full_features['best_recall_mean']:.4f}")
print(f"log_unscaled_features  - F2: {rf_results_log_features['best_f2_mean']:.4f}, Recall: {rf_results_log_features['best_recall_mean']:.4f}")
print("="*80)

## Archived Codes

In [0]:
%skip
rf_results_log_features = model_eval_on_folds_prebuilt(
    folds_input_path=folds_input_path,
    param_grid=param_grid,
    feature_col='log_unscaled_features',
    N_SPLITS=N_SPLITS,
    use_weights=False
)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:434)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:473)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:750)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:510)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:616)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:643)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:49)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:293)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(Attr

In [0]:
%skip
from pyspark.sql.functions import col, count, when

# Function to summarize null/NaN values for all columns
def summarize_nulls_all(df):
    """
    Summarize null/NaN values for all columns
    
    Args:
        df: Spark DataFrame
    
    Returns:
        DataFrame with null value statistics
    """
    null_stats = []
    total_rows = df.count()
    
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        null_percent = (null_count / total_rows) * 100 if total_rows > 0 else 0
        
        null_stats.append({
            'Column': column,
            'Null_Count': null_count,
            'Null_Percent': round(null_percent, 2),
            'Non_Null_Count': total_rows - null_count
        })
    
    return spark.createDataFrame(null_stats)

# Get null summary for all columns
null_summary = summarize_nulls_all(sample_fold)

# Sort by null percentage (descending) to see which columns have the most missing data
null_summary.orderBy(col('Null_Percent').desc()).show(truncate=False, n=len(sample_fold.columns))

# Optional: Filter columns with 0% nulls (completely non-null)
non_null_cols = null_summary.filter(col('Null_Percent') == 0).select('Column').rdd.flatMap(lambda x: x).collect()
print(f"\nColumns with NO null values ({len(non_null_cols)} columns):")
for col_name in sorted(non_null_cols):
    print(f"  - {col_name}")

In [0]:
%skip
cols_to_int_clean = [c for c in cols_to_int if c not in leakage_cols and c in cols_no_nulls]
cols_to_double_clean = [c for c in cols_to_double if c not in leakage_cols and c in cols_no_nulls]
binary_cols_clean = [c for c in binary_cols if c not in leakage_cols and c in cols_no_nulls]

numeric_cols_clean = cols_to_int_clean + cols_to_double_clean + binary_cols_clean

feature_eng_numeric = [
    "TAXI_OUT_log_imp", "TAXI_IN_log_imp", "CRS_ELAPSED_TIME_log_imp", 
    "AIR_TIME_log_imp", "DISTANCE_log_imp", "ORIGIN_ELEVATION_FT_log_imp", 
    "DEST_ELEVATION_FT_log_imp", "LowestBase_ft_agl_log_imp", "BaseRange_ft_agl_log_imp", 
    "Ceiling_ft_agl_log_imp", "HourlyStationPressure_log_imp", "HourlyVisibility_log_imp", 
    "HourlyPrecipitation_log_imp","is_us_holiday"
]
numeric_cols_clean.extend([c for c in feature_eng_numeric if c in cols_no_nulls and c not in numeric_cols_clean])

from pyspark.sql.functions import stddev

print(f"NUMERIC COLUMN VARIANCE ANALYSIS")
print(f"-" * 67)
print(f"Total numeric columns to assess: {len(numeric_cols_clean)}\n")

numeric_std = df_sample.select([stddev(c).alias(c) for c in numeric_cols_clean]).collect()[0].asDict()

variance_results = [(col, std_val) for col, std_val in numeric_std.items()]
variance_results.sort(key=lambda x: x[1] if x[1] is not None else 0)

print(f"{'Column':<50} {'Std Dev':>15}")
print("-" * 67)
for col, std_val in variance_results:
    if std_val is not None:
        print(f"{col:<50} {std_val:>15.6f}")
    else:
        print(f"{col:<50} {'None':>15}")

variance_threshold = 0.01
low_variance = [(col, std) for col, std in variance_results if std is not None and std <= variance_threshold]
high_variance = [(col, std) for col, std in variance_results if std is not None and std > variance_threshold]

print(f"\n" + "-" * 67)
print(f"RECOMMENDATIONS (variance threshold: {variance_threshold})")
print(f"-" * 67)

if low_variance:
    print(f"\nLOW VARIANCE (<={variance_threshold}) - Consider excluding:")
    for col, std in low_variance:
        print(f"  {col:<50} {std:>15.6f}")

print(f"\nHIGH VARIANCE (>{variance_threshold}) - Safe to use:")
print(f"  {len(high_variance)} columns")

print(f"\nTotal columns to exclude: {len(low_variance)}")
print(f"Total columns to keep: {len(high_variance)}")

In [0]:
%skip
rf_results_full_features = model_eval_on_folds_prebuilt(
    folds_input_path=folds_input_path,
    param_grid=param_grid,
    feature_col='full_unscaled_features',
    N_SPLITS=N_SPLITS,
    use_weights=False
)

In [0]:
%skip
print("=" * 80)
print("TRIPLE CHECK: FEATURE EXISTENCE ACROSS ALL WINDOWS + TEST SET")
print("=" * 80)

# All features to check
all_features_to_check = numeric_cols_to_use + cat_cols_to_use

print(f"\nChecking {len(all_features_to_check)} features:")
print(f"  Numeric: {len(numeric_cols_to_use)}")
print(f"  Categorical: {len(cat_cols_to_use)}")

# Check across all training windows
print(f"\nChecking across {N_SPLITS} training windows...")
all_windows_have_features = True

for i in range(1, N_SPLITS + 1):
    window_df = spark.read.parquet(f"{folds_input_path}/window_{i}_train")
    window_cols = set(window_df.columns)
    
    missing_in_window = [col for col in all_features_to_check if col not in window_cols]
    
    if missing_in_window:
        all_windows_have_features = False
        print(f"  ✗ Window {i}: MISSING {len(missing_in_window)} features: {missing_in_window}")
    else:
        print(f"  ✓ Window {i}: All features present")

# Check test set
print(f"\nChecking test set...")
df_test = spark.read.parquet(test_set_path)
test_cols = set(df_test.columns)

missing_in_test = [col for col in all_features_to_check if col not in test_cols]

if missing_in_test:
    print(f"  ✗ Test set: MISSING {len(missing_in_test)} features:")
    for col in missing_in_test:
        col_type = "numeric" if col in numeric_cols_to_use else "categorical"
        print(f"    - {col} ({col_type})")
else:
    print(f"  ✓ Test set: All features present")

# Final verdict
print("\n" + "=" * 80)
if all_windows_have_features and not missing_in_test:
    print("✅ SUCCESS: ALL FEATURES EXIST IN ALL WINDOWS AND TEST SET")
    print("=" * 80)
    print(f"Ready to train with {len(all_features_to_check)} features:")
    print(f"  - {len(numeric_cols_to_use)} numeric features")
    print(f"  - {len(cat_cols_to_use)} categorical features")
    print("\n🚀 You can now proceed with model training!")
else:
    print("❌ FAILURE: SOME FEATURES ARE MISSING")
    print("=" * 80)
    print("⚠️  You MUST fix the feature list before training!")
    
    if not all_windows_have_features:
        print("\n  Problem: Some features missing from training windows")
    if missing_in_test:
        print("\n  Problem: Some features missing from test set")
        print(f"\n  Missing from test: {missing_in_test}")
        print("\n  Solution: Remove these features from your feature lists:")
        print(f"    numeric_cols_to_use = [c for c in numeric_cols_to_use if c not in {missing_in_test}]")
        print(f"    cat_cols_to_use = [c for c in cat_cols_to_use if c not in {missing_in_test}]")

print("=" * 80)

In [0]:
%skip
test_final_cols = set(df_test_final_clean.columns)

# Collect columns from ALL training windows (clean, no artifacts)
window_clean_cols = {}
for w in train_windows:
    path = f"{folds_input_path}/{w}"
    df_window = spark.read.parquet(path)
    
    # Remove artifact columns
    artifact_cols = [c for c in df_window.columns if 
                     c.endswith('_ohe') or 
                     c.endswith('_idx') or 
                     c.endswith('_log') or 
                     c in ['features_num', 'features_num_scaled']]
    
    clean_cols = set([c for c in df_window.columns if c not in artifact_cols])
    window_clean_cols[w] = clean_cols
    print(f"{w}: {len(clean_cols)} clean columns")

# Find columns common to ALL windows
common_across_all_windows = set.intersection(*window_clean_cols.values())
print(f"\n{'='*80}")
print(f"Columns common to ALL {len(train_windows)} training windows: {len(common_across_all_windows)}")

# Compare test vs common training columns
common_test_train = test_final_cols & common_across_all_windows
only_in_test = test_final_cols - common_across_all_windows
only_in_all_windows = common_across_all_windows - test_final_cols

print(f"\nAlignment:")
print(f"  Columns in BOTH test and ALL windows: {len(common_test_train)}")
print(f"  Columns ONLY in test: {len(only_in_test)}")
print(f"  Columns ONLY in ALL windows: {len(only_in_all_windows)}")

if only_in_test:
    print(f"\n⚠️ Columns ONLY in test:")
    for col in sorted(only_in_test):
        print(f"  - {col}")

if only_in_all_windows:
    print(f"\n⚠️ Columns ONLY in ALL training windows:")
    for col in sorted(only_in_all_windows):
        print(f"  - {col}")

# Check model features specifically
print(f"\n{'='*80}")
print(f"MODEL FEATURES ACROSS ALL WINDOWS")
print(f"{'='*80}\n")

model_features_in_test = [c for c in all_model_features if c in test_final_cols]
model_features_in_all_windows = [c for c in all_model_features if c in common_across_all_windows]
model_features_common = [c for c in all_model_features if c in common_test_train]

print(f"Total model features required: {len(all_model_features)}")
print(f"  Present in test: {len(model_features_in_test)}")
print(f"  Present in ALL windows: {len(model_features_in_all_windows)}")
print(f"  Present in BOTH test AND all windows: {len(model_features_common)}")

missing_from_test = [c for c in all_model_features if c not in test_final_cols]
missing_from_windows = [c for c in all_model_features if c not in common_across_all_windows]

if missing_from_test:
    print(f"\n⚠️ Model features missing from test:")
    for col in missing_from_test:
        print(f"  - {col}")

if missing_from_windows:
    print(f"\n⚠️ Model features missing from some training windows:")
    for col in missing_from_windows:
        print(f"  - {col}")

# Final verdict
print(f"\n{'='*80}")
print(f"ULTIMATE FINAL VERDICT")
print(f"{'='*80}\n")

all_checks_passed = (
    len(only_in_test) == 0 and 
    len(only_in_all_windows) == 0 and 
    len(missing_from_test) == 0 and 
    len(missing_from_windows) == 0 and
    len(model_features_common) == len(all_model_features)
)

if all_checks_passed:
    print(f"✅✅✅ PERFECT ALIGNMENT!")
    print(f"\n📊 df_test_final_clean specifications:")
    print(f"   - Rows: {df_test_final_clean.count():,}")
    print(f"   - Columns: {len(df_test_final_clean.columns)}")
    print(f"   - Model features: {len(model_features_common)}")
    print(f"   - Nulls: 0")
    print(f"   - Aligned with ALL {len(train_windows)} training windows: ✅")
    print(f"\n🎯🎯🎯 READY FOR RANDOM FOREST PREDICTIONS!")
else:
    print(f"⚠️ Issues detected - review details above")
    print(f"\nSummary of issues:")
    if only_in_test:
        print(f"  - {len(only_in_test)} columns only in test")
    if only_in_all_windows:
        print(f"  - {len(only_in_all_windows)} columns only in training")
    if missing_from_test:
        print(f"  - {len(missing_from_test)} model features missing from test")
    if missing_from_windows:
        print(f"  - {len(missing_from_windows)} model features missing from windows")

In [0]:
%skip
from pyspark.sql.functions import col, count, when

# Function to summarize null/NaN values
def summarize_nulls(df, cols):
    null_stats = []
    total_rows = df.count()
    
    for column in cols:
        null_count = df.filter(col(column).isNull()).count()
        null_percent = (null_count / total_rows) * 100 if total_rows > 0 else 0
        
        null_stats.append({
            'Column': column,
            'Null_Count': null_count,
            'Null_Percent': round(null_percent, 2),
            'Non_Null_Count': total_rows - null_count
        })
    
    return spark.createDataFrame(null_stats)

# Get null summary
all_selected_cols = numeric_cols_to_use + cat_cols_ohe
null_summary = summarize_nulls(sample_fold, all_selected_cols)

# Sort by null percentage (descending)
print("=" * 80)
print("NULL VALUE SUMMARY FOR SELECTED FEATURES")
print("=" * 80)
null_summary.orderBy(col('Null_Percent').desc()).show(truncate=False, n=len(all_selected_cols))

# Summary statistics
print("\n" + "=" * 80)
print("SUMMARY STATISTICS")
print("=" * 80)
cols_with_nulls = null_summary.filter(col('Null_Percent') > 0).count()
cols_no_nulls = null_summary.filter(col('Null_Percent') == 0).count()

print(f"Total columns selected: {len(all_selected_cols)}")
print(f"Columns with NO nulls: {cols_no_nulls}")
print(f"Columns WITH nulls: {cols_with_nulls}")

if cols_with_nulls > 0:
    print("\nColumns with nulls:")
    null_summary.filter(col('Null_Percent') > 0).orderBy(col('Null_Percent').desc()).show(truncate=False)


In [0]:
%skip
# Filter both numeric and categorical columns to what's actually available
sample_fold = spark.read.parquet(f"{folds_input_path}/fold_1_train")
available_cols = set(sample_fold.columns)

# Filter numeric columns (exclude CANCELLED, DIVERTED)
numeric_cols_to_use = [col for col in numeric_cols_clean if col in available_cols]

# Filter categorical columns (only keep those with _ohe versions)
cat_cols_to_use = [col for col in cat_cols if f"{col}_ohe" in available_cols]

print(f"Using {len(numeric_cols_to_use)} numeric columns")
print(f"Using {len(cat_cols_to_use)} categorical columns")
print(f"Excluded categorical: DEP_TIME_BLK, ARR_TIME_BLK")

# Then in your grid search loop:
pipeline = get_rf_model(param_print, cat_cols_to_use, numeric_cols_to_use, use_weights=use_weights)

In [0]:
%skip
def get_rf_model(params, cat_cols, numeric_cols, use_weights=False):
    """
    Create Random Forest pipeline for ALREADY PREPROCESSED data
    Assumes: StringIndexer and OneHotEncoder already applied in scaled_smote_folds
    
    Args:
        params: dict with 'maxDepth', 'numTrees', 'maxBins'
        cat_cols: list of categorical column names (base names, _ohe will be added)
        numeric_cols: list of numeric column names (already scaled/cleaned)
        use_weights: bool, whether to use class weights (for SAMPLING_METHOD='weights')
    
    Returns:
        Pipeline with VectorAssembler + RandomForest only
    """
    from pyspark.ml.classification import RandomForestClassifier
    
    # Stage 1: VectorAssembler - combine numeric + categorical features
    # Assumes _ohe columns already exist from preprocessing
    final_assembler = VectorAssembler(
        inputCols=numeric_cols + [f"{col}_ohe" for col in cat_cols], 
        outputCol="features", 
        handleInvalid="keep"
    )
    
    # Stage 2: Random Forest classifier
    rf_params = {
        'labelCol': "ARR_DEL15",
        'featuresCol': "features",
        'maxDepth': params['maxDepth'],
        'numTrees': params['numTrees'],
        'maxBins': params.get('maxBins', 32),
        'seed': 42
    }
    
    # Add weight column if using class weights
    if use_weights:
        rf_params['weightCol'] = 'weight'
    
    rf = RandomForestClassifier(**rf_params)
    
    # Return pipeline with only 2 stages
    pipeline = Pipeline(stages=[final_assembler, rf])
    return pipeline

In [0]:
%skip
null_check = df_test_final.filter(
    F.col("DEST_LAT").isNull() | 
    F.col("DEST_LON").isNull() | 
    F.col("DEST_ELEVATION_FT").isNull() |
    F.col("DEST_SIZE").isNull()
)

print(f"Rows with destination nulls: {null_check.count()}")
print(f"Checking if all 4 columns are null together...")

# Check if they're all null together
all_dest_null = null_check.filter(
    F.col("DEST_LAT").isNull() & 
    F.col("DEST_LON").isNull() & 
    F.col("DEST_ELEVATION_FT").isNull() &
    F.col("DEST_SIZE").isNull()
).count()

print(f"Rows where all 4 DEST columns are null: {all_dest_null}")

# Option 1: Drop these rows (recommended if only 334 out of 1.5M)
print(f"\nOption 1: Drop rows with null DEST columns")
df_test_final_clean = df_test_final.filter(
    F.col("DEST_LAT").isNotNull() & 
    F.col("DEST_LON").isNotNull() & 
    F.col("DEST_ELEVATION_FT").isNotNull() &
    F.col("DEST_SIZE").isNotNull()
)

print(f"  Before: {df_test_final.count():,} rows")
print(f"  After: {df_test_final_clean.count():,} rows")
print(f"  Dropped: {df_test_final.count() - df_test_final_clean.count()} rows ({((df_test_final.count() - df_test_final_clean.count()) / df_test_final.count() * 100):.3f}%)")

final_null_check = (
    df_test_final_clean.select([
        F.sum(F.col(c).isNull().cast("int")).alias(c)
        for c in present_in_test
    ])
    .collect()[0]
    .asDict()
)

remaining_nulls = {c: n for c, n in final_null_check.items() if n > 0}

In [0]:
%skip
def parameter_sets(param_grid):
    """Generate all combinations of parameters from param_grid"""
    from itertools import product
    parameter_names = list(param_grid.keys())
    parameter_values = list(param_grid.values())
    parameters = list(product(*parameter_values))
    return parameter_names, parameters
  
param_grid = {
    'maxDepth': [5, 10],
    'numTrees': [50],
    'maxBins': [32]
}

In [0]:
%skip
#Leakage column
leakage_cols = ['ARR_DELAY','DEP_DELAY','CARRIER_DELAY','WEATHER_DELAY', 'NAS_DELAY','SECURITY_DELAY','LATE_AIRCRAFT_DELAY','DEP_DEL15','ACTUAL_ELAPSED_TIME','ARR_TIME','AIR_TIME','TAXI_IN','TAXI_OUT', 'WHEELS_OFF','WHEELS_ON']
cols_to_int_clean = [c for c in cols_to_int if c not in leakage_cols]
cols_to_double_clean = [c for c in cols_to_double if c not in leakage_cols]
binary_cols_clean = [c for c in binary_cols if c not in leakage_cols and c != 'ARR_DEL15']

# Combine clean numeric columns
numeric_cols_clean = cols_to_int_clean + cols_to_double_clean + binary_cols_clean

print(f"Original numeric columns: {len(cols_to_int + cols_to_double + binary_cols)}")
print(f"Clean numeric columns (after removing leakage): {len(numeric_cols_clean)}")
print(f"Removed columns: {leakage_cols}")

In [0]:
%skip
from pyspark.sql import functions as F

SAMPLING_METHOD = 'weights'
N_SPLITS = 4
metric = "f2"
best_score = 0
best_param_vals = None
best_parameters = None
best_model = None
best_predictions = None
parameter_names, parameters = parameter_sets(param_grid)

print(f'Number of folds: {N_SPLITS}')
print(f'Total parameter combinations to test: {len(parameters)}')
print("*"*10)

if len(parameters) == 1:
    print('Only 1 parameter')

# Loop through all parameter combinations
for p in parameters:
    param_print = {x[0]:x[1] for x in zip(parameter_names, p)}
    print(f"Parameters: {param_print}")  
    scores = []
  
    for i in range(N_SPLITS):
        # Load Folds (from scaled_smote_folds - already preprocessed)
        train_path = f"{folds_input_path}/fold_{i+1}_train"
        val_path = f"{folds_input_path}/fold_{i+1}_val"
        train_df = spark.read.parquet(train_path)
        dev_df = spark.read.parquet(val_path)
        

        # Determine if we're using weights
        use_weights = (SAMPLING_METHOD == 'weights')
        
        # Apply sampling (if needed - may already be in scaled_smote_folds)
        if SAMPLING_METHOD == 'down':
            train_df = downsample(train_df)
        elif SAMPLING_METHOD == 'up':
            train_df = upsample(train_df)
        elif SAMPLING_METHOD == 'weights':
            train_df = add_class_weights(train_df)
        
        train_df = train_df.cache()
        
        print(f'    TRAIN set for fold {i+1} count is {train_df.count():,} flights ({SAMPLING_METHOD})' if SAMPLING_METHOD else f'    TRAIN set for fold {i+1} count is {train_df.count():,} flights (no sampling)')
        print(f'    DEV set for fold {i+1} count is {dev_df.count():,} flights')
        
        # Get pipeline with weights support
        pipeline = get_rf_model(param_print, cat_cols_to_use, numeric_cols_to_use, use_weights=use_weights)
        
        # Fit and transform
        model = pipeline.fit(train_df)
        dev_pred = model.transform(dev_df)
        
        # Calculate metrics
        if metric == 'f2':
            score = cv_eval(dev_pred)[0]
        elif metric == 'pr':
            score = cv_eval(dev_pred)[1]
        
        scores.append(score)
        print(f'    Fold {i+1} {metric} score: {score:.2f}')
        print('------------------------------------------------------------')
        
        if best_param_vals == None:
            best_param_vals = p
        
        # Store best predictions for PR curve
        if score > best_score:
            best_predictions = dev_pred
    
    # Take average of all scores
    avg_score = np.average(scores)
    
    # Update best score
    if avg_score > best_score:
        previous_best = best_score
        best_score = avg_score
        best_parameters = param_print
        best_param_vals = p
        print(f'new best score of {best_score:.2f}')
    else:
        print(f'Result was not better, score was {avg_score:.2f} with best {metric} score {best_score:.2f}')
    print("************************************************************")

print('')
print('='*20)
print('Grid Search Done')
print('='*20)
print('Best parameter:')
print(best_parameters)
print(f'Best average {metric} score: {best_score:.2f}')
print(f'Sampling method: {SAMPLING_METHOD}')
print('='*80)

# Save results
dbutils.fs.mkdirs(chads_test_output)
results_df = spark.createDataFrame(
    [(str(best_parameters), float(best_score), SAMPLING_METHOD)], 
    ["best_params", "best_score", "sampling_method"]
)
results_df.write.mode("overwrite").parquet(f"{chads_test_output}/grid_search_results")
print(f"Results saved to: {chads_test_output}/grid_search_results")

# Plot PR curve for best model
if best_predictions is not None:
    print("\nPlotting PR curve for best model...")
    plot_pr_curve(best_predictions, title=f"PR Curve - Best Model (F2={best_score:.2f})")

In [0]:
%skip
def evaluate_baseline_on_folds(folds_input_path, N_SPLITS=4, plot_pr=plot_pr):
    from pyspark.sql import functions as F
    
    scores_f2 = []
    scores_pr = []

    for i in range(N_SPLITS):
        print(f"\nFold {i+1}")

        # Load data
        train_df = spark.read.parquet(f"{folds_input_path}/fold_{i+1}_train")
        val_df = spark.read.parquet(f"{folds_input_path}/fold_{i+1}_val")

        # Majority class baseline
        majority_class = (
            train_df.groupBy("ARR_DEL15")
            .count()
            .orderBy(F.desc("count"))
            .first()[0]
        )

        preds_baseline = (
            val_df
            .withColumn("prediction", F.lit(float(majority_class)))
            .withColumn("probability", prob_udf(F.lit(majority_class)))
        )

        # Compute metrics
        f2, pr_auc = cv_eval(preds_baseline)
        scores_f2.append(f2)
        scores_pr.append(pr_auc)

        print(f"Fold {i+1} — F2: {f2:.4f}, PR-AUC: {pr_auc:.4f}")

    return {
        "f2_scores": scores_f2,
        "pr_auc_scores": scores_pr,
        "f2_mean": float(np.mean(scores_f2)),
        "f2_std": float(np.std(scores_f2)),
        "pr_auc_mean": float(np.mean(scores_pr)),
        "pr_auc_std": float(np.std(scores_pr)),
    }


In [0]:
%skip
#Checkpoint location
dbfs_path = "dbfs:/student-groups/Group_02_01"

#RUN FOR 3 MONTH FOLDS
time = 3 #test on 3 months

# Define your existing paths
time_length = data_set(time)
splits_path = f"{dbfs_path}/splits{time_length}"
folds_input_path = f"{splits_path}/scaled_smote_folds" #use scaled_smote_folds

#run evaluation
baseline_results_3m = evaluate_baseline_on_folds(
    folds_input_path=folds_input_path,
    N_SPLITS=4,
    plot_pr=False   #  <- don't plot PR-AUC curves (save time)
)

baseline_results_3m


In [0]:
%skip
#RUN FOR 12 MONTH 
time = 12 #1 yr

# Define your existing paths
time_length = data_set(time)
splits_path = f"{dbfs_path}/splits{time_length}"
folds_input_path = f"{splits_path}/scaled_smote_folds" #use scaled_smote_folds

#run evaluation
baseline_results_1yr = evaluate_baseline_on_folds(
    folds_input_path=folds_input_path,
    N_SPLITS=4,
    plot_pr=True   #  <- enable PR-AUC curves
)

baseline_results_1yr


In [0]:
%skip
#Checkpoint location
dbfs_path = "dbfs:/student-groups/Group_02_01"

#RUN FOR 1 year
time = 12 #1 year

time_length = data_set(time)
splits_path = f"{dbfs_path}/splits{time_length}"

# Define your existing paths
folds_input_path = f"{splits_path}/scaled_smote_folds" #use scaled_smote_folds
N_SPLITS = 4  # number of folds
plot_pr = True #plot PR-AUC curve


In [0]:
%skip
from pyspark.sql import functions as F

scores_f2 = []
scores_pr = []

for i in range(N_SPLITS):
    print(f"\nFold {i+1}")
    
    train_path = f"{folds_input_path}/fold_{i+1}_train"
    val_path = f"{folds_input_path}/fold_{i+1}_val"
    
    train_df = spark.read.parquet(train_path)
    val_df = spark.read.parquet(val_path)
    
    # Majority class baseline
    majority_class = (
        train_df.groupBy("ARR_DEL15")
        .count()
        .orderBy(F.desc("count"))
        .first()[0]
    )
    
    print(f"Majority class (Fold {i+1}): {majority_class}")

    preds_baseline = (
        val_df
        .withColumn("prediction", F.lit(float(majority_class)))
        .withColumn("probability", prob_udf(F.lit(majority_class)))
    )

    # --- Compute BOTH metrics ---
    f2, pr_auc = cv_eval(preds_baseline)
    scores_f2.append(f2)
    scores_pr.append(pr_auc)

    print(f"F2 Score (Fold {i+1}): {f2:.4f}")
    print(f"PR-AUC (Fold {i+1}): {pr_auc:.4f}")

    # --- Plot only if needed ---
    if plot_pr:   
        plt.figure(figsize=(7, 5))
        plt.bar(["F2 Score", "PR-AUC"], [f2, pr_auc], color=["lightcoral", "skyblue"])
        plt.ylabel("Score")
        plt.title(f"Fold {i+1} — Baseline Metrics")
        plt.ylim(0, 1)
        plt.grid(True, axis='y')
        plt.show()

In [0]:
%skip
#mean of F2 score and PR-AUC
f2_mean = np.mean(scores_f2)
f2_std  = np.std(scores_f2)

pr_mean = np.mean(scores_pr)
pr_std  = np.std(scores_pr)

print(f"F2 Baseline: {f2_mean:.4f} ± {f2_std:.4f}")
print(f"PR-AUC Baseline: {pr_mean:.4f} ± {pr_std:.4f}")

In [0]:
%skip
# Add prediction column
preds_baseline = val_df.withColumn("prediction", F.lit(float(majority_class)))

# For compatibility with binary metrics, also create a probability vector
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# Create probability vector [P(0), P(1)]
def prob_vector(label):
    return Vectors.dense([1.0, 0.0]) if label == 0 else Vectors.dense([0.0, 1.0])

prob_udf = udf(prob_vector, VectorUDT())
preds_baseline = preds_baseline.withColumn("probability", prob_udf(F.lit(majority_class)))

pr_auc = cv_eval(preds_baseline)[1]
print(f"PR-AUC: {pr_auc:.4f}")

In [0]:
%skip
def run_model_cv(folds_input_path, N_SPLITS, pipeline, model, plot_pr=False):
    metrics_f2 = []
    metrics_pr = []

    for i in range(N_SPLITS):
        print(f"\n===== Fold {i+1} =====")

        train_df = spark.read.parquet(f"{folds_input_path}/fold_{i+1}_train")
        val_df = spark.read.parquet(f"{folds_input_path}/fold_{i+1}_val")

        # Fit pipeline + model
        pipeline_model = pipeline.fit(train_df)
        preds = pipeline_model.transform(val_df)

        # Evaluate
        f2, pr_auc = cv_eval(preds)
        metrics_f2.append(f2)
        metrics_pr.append(pr_auc)

        print(f"Model F2 (Fold {i+1}): {f2:.4f}")
        print(f"Model PR-AUC (Fold {i+1}): {pr_auc:.4f}")

        if plot_pr:
            plt.figure(figsize=(7,5))
            plt.bar(["F2", "PR-AUC"], [f2, pr_auc])
            plt.ylim(0, 1)
            plt.title(f"Fold {i+1} — Model Scores")
            plt.grid(True, axis='y')
            plt.show()

    return {
        "f2_scores": metrics_f2,
        "pr_auc_scores": metrics_pr,
        "f2_mean": float(np.mean(metrics_f2)),
        "f2_std": float(np.std(metrics_f2)),
        "pr_auc_mean": float(np.mean(metrics_pr)),
        "pr_auc_std": float(np.std(metrics_pr)),
    }

In [0]:
%skip
# Your preprocessing + model pipeline
pipeline = Pipeline(stages=indexers + encoders + [model])

# ---- RUN FOR 3 MONTHS ----
baseline_3m, model_3m = run_full_pipeline(
    time_window=3,
    model=model,
    pipeline=pipeline,
    dbfs_base=dbfs_base,
    plot_pr=True
)

# # ---- RUN FOR 1 YEAR ----
# baseline_1y, model_1y = run_full_pipeline(
#     time_window=12,
#     model=model,
#     pipeline=pipeline,
#     dbfs_base=dbfs_base,
#     plot_pr=True
# )

In [0]:
%skip
#run full pipeline
def run_full_pipeline(time_window, model, pipeline, dbfs_base, plot_pr=True):
    time_length = data_set(time_window)
    splits_path = f"{dbfs_base}/splits{time_length}"
    folds_input_path = f"{splits_path}/scaled_smote_folds"
    N_SPLITS = 4

    print(f"\n==== RUNNING BASELINE for {time_window}-month window ====")
    baseline_results = evaluate_baseline_on_folds(
        folds_input_path, 
        N_SPLITS=N_SPLITS,
        plot_pr=plot_pr
    )

    print("\n==== RUNNING MODEL CV ====")
    model_results = run_model_cv(
        folds_input_path,
        N_SPLITS=N_SPLITS,
        pipeline=pipeline,
        model=model,
        plot_pr=plot_pr
    )

    # Save results to DBFS as Parquet
    import pandas as pd
    results_df = pd.DataFrame([{
        "time_window": time_window,
        "baseline": baseline_results,
        "model": model_results
    }])
    spark_df = spark.createDataFrame(results_df)
    results_parquet_path = f"/student-groups/Group_02_01/splits{time_length}/results_{time_length}.parquet"
    spark_df.write.mode("overwrite").parquet(results_parquet_path)

    print(f"\nResults saved to: {results_parquet_path}")

    return baseline_results, model_results