In [0]:
#Imports
import pyspark.sql.functions as F
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, MultilayerPerceptronClassifier
from pyspark.ml.feature import StringIndexerModel, OneHotEncoderModel, VectorAssembler
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder,  VectorAssembler, MinMaxScaler, Imputer, PCA
from pyspark.sql.functions import col, count, mean, stddev, min, max, corr, isnan, when, percentile_approx, regexp_replace, sum, to_date, unix_timestamp, date_format, rand, year
from pyspark.sql.window import Window
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark import SparkFiles
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math
from datetime import timedelta
import builtins
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np



In [0]:
display(dbutils.fs.ls("dbfs:/mnt/mids-w261/OTPW_60M_Backup/"))


In [0]:
df_otpw_60_months = spark.read.parquet("dbfs:/mnt/mids-w261/OTPW_60M_Backup/")
display(df_otpw_60_months)

# Data Cleaning and EDA

### Redefining clean dataset to create target variable and derived features

- added derived feature for rolling past 3 month number of delays per airline
- changed is holiday to be +- 3 days
- added back in the delay target since this doesnt exist in the 60 m data

In [0]:
def clean_dataset(df):
    # First, derive features (including DELAY_TARGET)
    df = derive_features(df)
    # Now, remove duplicate rows based on all columns including DELAY_TARGET
    df = df.dropDuplicates()
    # Continue with further transformations
    df = transform_features(df)
    df = create_time_based_feature(df)
    df = subset_columns(df)

    df = df.orderBy("FL_DATE")

    return df



def derive_features(df):

    #create DELAY_TARGET as target variable
    # Check if 'ARR_DELAY' exists in the DataFrame
    if "ARR_DELAY" in df.columns:
        df = df.withColumn(
            "DELAY_TARGET",
            when((col("CANCELLED") == 1) | (col("DIVERTED") == 1), 1)
            .when(col("ARR_DELAY") < 15, 0)
            .otherwise(1)
        )
    else:
        # Fallback: Assume flight is delayed if cancelled/diverted otherwise keep existing values
        df = df.withColumn(
            "DELAY_TARGET",
            when((col("CANCELLED") == 1) | (col("DIVERTED") == 1), 1)
            .otherwise(0)
        )

    #create HAS_WEATHER_TYPE if Hourly Present Weather Type is not null
    df = df.withColumn(
        "HAS_WEATHER_TYPE",
        when(col("HourlyPresentWeatherType").isNull(), 0)
        .otherwise(1)
    )

    # create IS_HOLIDAY
    # Step 1: Convert FL_DATE to Spark DateType
    df = df.withColumn("FL_DATE", to_date(col("FL_DATE"), "yyyy-MM-dd"))
    
    # Step 2: Generate US federal holidays for 2015 via Pandas
    cal = USFederalHolidayCalendar()
    holidays = cal.holidays(start="2015-01-01", end="2019-12-31")
    
    # Step 3: Expand each holiday to ±3 days and convert to string
    expanded_holiday_dates = set()
    for holiday in holidays:
        for offset in range(-3, 4):  # -3, -2, -1, 0, 1, 2, 3
            expanded_date = holiday + timedelta(days=offset)
            expanded_holiday_dates.add(expanded_date.strftime("%Y-%m-%d"))
        
    # Step 3: Broadcast the holiday list for efficient lookup
    holidays_broadcast = spark.sparkContext.broadcast(expanded_holiday_dates)
    
    # Step 4: Add 'IS_HOLIDAY' column.
    # Convert FL_DATE to "yyyy-MM-dd" string, then check if that string is in the broadcasted holiday set
    df = df.withColumn(
        "IS_HOLIDAY",
        date_format(col("FL_DATE"), "yyyy-MM-dd").isin(holidays_broadcast.value).cast("string")
    )

    #Step 5: add YEAR:
    df = df.withColumn("YEAR", year(col("FL_DATE")))
    
    return df



def transform_features(df):
    df =  df.withColumn(
            "HourlyPrecipitation",
            when(col("HourlyPrecipitation") == "T", 0)  # Replace 'T' with 0
            .otherwise(regexp_replace(col("HourlyPrecipitation"), "s", ""))  # Remove 's'
            .cast("float")
    )

    df = df.withColumn("HourlyWindDirection", col("HourlyWindDirection").cast("float"))
    df =  df.withColumn("HourlyPressureChange", col("HourlyPressureChange").cast("float"))


    df =  df.withColumn(
            "HourlyWindSpeed",
            when(col("HourlyWindSpeed") == "*", 0)  # Replace '*' with 0
            .otherwise(
                when(col("HourlyWindSpeed").contains("s"), regexp_replace(col("HourlyWindSpeed"), "s", ""))
                .otherwise(regexp_replace(col("HourlyWindSpeed"), "V", ""))
            )
            .cast("float")
    )


    df =  df.withColumn(
        "HourlyWetBulbTemperature",
        when(col("HourlyWetBulbTemperature") == "*", 0)  # Replace '*' with 0
        .when(col("HourlyWetBulbTemperature") == "T", 0)  # Replace 'T' with 0
        .otherwise(
            when(col("HourlyWetBulbTemperature").contains("s"), regexp_replace(col("HourlyWetBulbTemperature"), "s", ""))
            .otherwise(regexp_replace(col("HourlyWetBulbTemperature"), "V", ""))
        )
        .cast("float")  # Convert to float
    )

    df =  df.withColumn(
        "HourlyVisibility",
        when(col("HourlyVisibility") == "*", 0)  # Replace '*' with 0
        .when(col("HourlyVisibility") == "T", 0)  # Replace 'T' with 0
        .otherwise(
            when(col("HourlyVisibility").contains("s"), regexp_replace(col("HourlyVisibility"), "s", ""))
            .otherwise(regexp_replace(col("HourlyVisibility"), "V", ""))
        )
        .cast("float")  # Convert to float
    )

    df =  df.withColumn(
        "HourlyStationPressure",
        when(col("HourlyStationPressure") == "*", 0)  # Replace '*' with 0
        .when(col("HourlyStationPressure") == "T", 0)  # Replace 'T' with 0
        .otherwise(
            when(col("HourlyStationPressure").contains("s"), regexp_replace(col("HourlyStationPressure"), "s", ""))
            .otherwise(regexp_replace(col("HourlyStationPressure"), "V", ""))
        )
        .cast("float")  # Convert to float
    )

    df =  df.withColumn(
        "HourlyDewPointTemperature",
        when(col("HourlyDewPointTemperature") == "*", 0)  # Replace '*' with 0
        .when(col("HourlyDewPointTemperature") == "T", 0)  # Replace 'T' with 0
        .otherwise(
            when(col("HourlyDewPointTemperature").contains("s"), regexp_replace(col("HourlyDewPointTemperature"), "s", ""))
            .otherwise(regexp_replace(col("HourlyDewPointTemperature"), "V", ""))
        )
        .cast("float")  # Convert to float
    )

    df =  df.withColumn(
        "HourlyAltimeterSetting",
        when(col("HourlyAltimeterSetting") == "*", 0)  # Replace '*' with 0
        .when(col("HourlyAltimeterSetting") == "T", 0)  # Replace 'T' with 0
        .otherwise(
            when(col("HourlyAltimeterSetting").contains("s"), regexp_replace(col("HourlyAltimeterSetting"), "s", ""))
            .otherwise(regexp_replace(col("HourlyAltimeterSetting"), "V", ""))
        )
        .cast("float")  # Convert to float
    )

    df =  df.withColumn(
        "HourlyDryBulbTemperature",
        when(col("HourlyDryBulbTemperature") == "*", 0)  # Replace '*' with 0
        .when(col("HourlyDryBulbTemperature") == "T", 0)  # Replace 'T' with 0
        .otherwise(
            when(col("HourlyDryBulbTemperature").contains("s"), regexp_replace(col("HourlyDryBulbTemperature"), "s", ""))
            .otherwise(regexp_replace(col("HourlyDryBulbTemperature"), "V", ""))
        )
        .cast("float")  # Convert to float
    )

    df =  df.withColumn(
        "HourlyRelativeHumidity",
        when(col("HourlyRelativeHumidity") == "*", 0)  # Replace '*' with 0
        .when(col("HourlyRelativeHumidity") == "T", 0)  # Replace 'T' with 0
        .otherwise(
            when(col("HourlyRelativeHumidity").contains("s"), regexp_replace(col("HourlyRelativeHumidity"), "s", ""))
            .otherwise(regexp_replace(col("HourlyRelativeHumidity"), "V", ""))
        )
        .cast("float")  # Convert to float
    )

    df = df.withColumn("sched_depart_unix", unix_timestamp("sched_depart_date_time_UTC"))

    return df

def create_time_based_feature(df):
    # create a "year_month" column (e.g., "2017-05")
    df = df.withColumn("year_month", date_format(col("FL_DATE"), "yyyy-MM"))

    # aggregate count of DELAY_TARGET = 1 per carrier per month
    monthly_delay_counts = (
        df.filter(col("DELAY_TARGET") == 1)
        .groupBy("OP_UNIQUE_CARRIER", "year_month")
        .count()
        .withColumnRenamed("count", "monthly_delay_count")
    )

    # define a window of the last 3 months **ordered by year_month**
    window_spec = (
        Window
        .partitionBy("OP_UNIQUE_CARRIER")
        .orderBy("year_month")
        .rowsBetween(-3, -1)
    )

    # compute rolling sum of delay counts
    rolling_delays = monthly_delay_counts.withColumn(
        "rolling_3mo_delay_count",
        sum("monthly_delay_count").over(window_spec)
    )

    df_with_rolling = df.join(
        rolling_delays,
        on=["OP_UNIQUE_CARRIER", "year_month"],
        how="left"
    )

    return df_with_rolling

def subset_columns(df):
    columns_to_keep=["QUARTER",  "DAY_OF_MONTH",  "DAY_OF_WEEK",  "OP_UNIQUE_CARRIER",  "OP_CARRIER_FL_NUM",  "ORIGIN",  "DEST",  "CRS_DEP_TIME",  "DEP_TIME",  "DEP_DELAY",  "TAXI_OUT",  "WHEELS_OFF",  "WHEELS_ON",  "TAXI_IN",  "CRS_ARR_TIME",  "CANCELLED",  "DIVERTED",  "CRS_ELAPSED_TIME",  "AIR_TIME",  "DISTANCE",  "MONTH",  "origin_type",  "origin_region",  "origin_airport_lat",  "origin_airport_lon",  "dest_type",  "dest_region",  "dest_airport_lat",  "dest_airport_lon",  "sched_depart_unix",  "ELEVATION",  "HourlyAltimeterSetting",  "HourlyDewPointTemperature",  "HourlyDryBulbTemperature",  "HourlyPrecipitation",  "HourlyRelativeHumidity",  "HourlySkyConditions",  "HourlyStationPressure",  "HourlyVisibility",  "HourlyWetBulbTemperature",  "HourlyWindDirection",  "HourlyWindSpeed",  "DELAY_TARGET",  "HAS_WEATHER_TYPE",  "IS_HOLIDAY", "FL_DATE", "rolling_3mo_delay_count", "YEAR"]

    return df.select(columns_to_keep)


In [0]:
df_otpw_60_months = clean_dataset(df_otpw_60_months)

In [0]:
train_df = df_otpw_60_months

In [0]:
display(df_otpw_60_months)

In [0]:

@F.udf(returnType=VectorUDT())
def array_to_vector_udf(arr):
    return Vectors.dense(arr)

def time_cv_years(train_df, pipeline):

    # Each tuple is (training_upper_bound_year, validation_year).
    # For example, (2017, 2018) means:
    #   TRAIN: all rows with YEAR <= 2017
    #   VALID: all rows with YEAR == 2018
    year_folds = [
        (2015, 2016),
        #(2016, 2017),
        (2017, 2018),
        # Add more folds as needed.
    ]

    trainPosPrior = 0.5
    realPosPrior = 0.2

    # Containers for fold metrics
    all_precision = []
    all_recall = []
    all_auc = []
    all_f1 = []

    all_precision_calibrated = []
    all_recall_calibrated = []
    all_auc_calibrated  = []
    all_f1_calibrated = []

    # Set up the binary classification evaluator for AUC
    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="DELAY_TARGET",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )

    # Evaluator that will use the "calibrated" probability to measure AUC
    evaluator_auc_cal = BinaryClassificationEvaluator(
        labelCol="DELAY_TARGET",
        rawPredictionCol="prob_calibrated_vec",  # we'll create this column
        metricName="areaUnderROC"
    )

    # We'll filter subsets from the full DataFrame
    full_df = train_df


    for (train_y, valid_y) in year_folds:
        # TRAIN: YEAR <= train_y
        train_subset = full_df.filter(F.col("YEAR") <= train_y)

        # VALID: YEAR == valid_y
        valid_subset = full_df.filter(F.col("YEAR") == valid_y)

        # Fit the pipeline on the training subset
        pipeline_model = pipeline.fit(train_subset)

        # Predict => select minimal columns, then cache
        preds = pipeline_model.transform(valid_subset).select(
            "DELAY_TARGET", "prediction", "rawPrediction",
            "probability", "scaled_features"
        )

        preds.cache()

        # Compute AUC
        auc = evaluator_auc.evaluate(preds)

        # Build confusion matrix with pivot
        cm = (
            preds.groupBy("DELAY_TARGET", "prediction")
                 .count()
        )
        pivot_df = (
            cm.groupBy("DELAY_TARGET")
              .pivot("prediction", [0,1])
              .sum("count")
              .fillna(0)
        )
        # Convert pivot result to a dict
        dvals = { (r["DELAY_TARGET"], c): r[c] for r in pivot_df.collect() for c in ["0","1"] }
        def get_count(label, pred):
            return dvals.get((label, str(pred)), 0)
        tn = get_count(0,0)
        fp = get_count(0,1)
        fn = get_count(1,0)
        tp = get_count(1,1)

        # Calculate precision and recall (with safe division)
        precision = tp / float(tp + fp) if (tp + fp) else 0.0
        recall = tp / float(tp + fn) if (tp + fn) else 0.0

        f1 = 2.0 * precision * recall / (precision + recall) if (precision + recall) else 0.0

        # Collect metrics
        all_precision.append(precision)
        all_recall.append(recall)
        all_auc.append(auc)
        all_f1.append(f1)

        first_row_vector = preds.select("scaled_features").first()["scaled_features"]
        num_features = len(first_row_vector)
        num_rows = preds.count()

        print(f"Shape of feature matrix: ({num_rows}, {num_features})")
        print('Non-Calibrated:')
        print(f"Fold: Train <= Year {train_y}, Validate = Year {valid_y} "
              f"=> Precision={precision:.4f}, Recall={recall:.4f}, AUC={auc:.4f}")
        print(f"F1={f1:.4f}")


        #Calibrate
        # Extract p_raw = probability of positive class
        preds_cal = preds.withColumn("p_raw", vector_to_array(F.col("probability"))[1])

        preds_cal.cache()
        

        
        #Apply prior correction
        p_cal_expr = (
            (F.col("p_raw") * (realPosPrior / trainPosPrior))
            /
            (
                (F.col("p_raw") * (realPosPrior / trainPosPrior))
                + ((1 - F.col("p_raw")) * ((1 - realPosPrior) / (1 - trainPosPrior)))
            )
        )

        preds_cal = preds_cal.withColumn("p_calibrated", p_cal_expr)

        preds_cal = preds_cal.withColumn(
            "prob_calibrated_vec",
            F.array((1 - F.col("p_calibrated")), F.col("p_calibrated"))
        )

        preds_cal = preds_cal.withColumn(
            "prediction_cal",
            (F.col("p_calibrated") >= 0.3).cast("integer")
        )

        # **Convert** that array into a Spark ML vector:
        preds_cal = preds_cal.withColumn(
            "prob_calibrated_vec",  
            array_to_vector_udf(F.col("prob_calibrated_vec"))
        )


        #Compute AUC (calibrated)
        auc_cal = evaluator_auc_cal.evaluate(preds_cal)


        cm_cal = (
            preds_cal.groupBy("DELAY_TARGET", "prediction_cal")
                     .count()
        )
        pivot_cal = (
            cm_cal.groupBy("DELAY_TARGET")
                  .pivot("prediction_cal", [0,1])
                  .sum("count")
                  .fillna(0)
        )
        dvals_cal = { (r["DELAY_TARGET"], c): r[c] for r in pivot_cal.collect() for c in ["0","1"] }
        def get_cal(label, pred):
            return dvals_cal.get((label, str(pred)), 0)
        
        tn = get_cal(0,0)
        fp = get_cal(0,1)
        fn = get_cal(1,0)
        tp = get_cal(1,1)
        


        precision_cal = tp / float(tp + fp) if (tp + fp) else 0.0
        recall_cal = tp / float(tp + fn) if (tp + fn) else 0.0

        f1_cal = 2.0 * precision_cal * recall_cal / (precision_cal + recall_cal) if (precision_cal + recall_cal) else 0.0

        all_precision_calibrated.append(precision_cal)
        all_recall_calibrated.append(recall_cal)
        all_auc_calibrated.append(auc_cal)
        all_f1_calibrated.append(f1_cal)

        print('Calibrated:')
        print(f"Fold: Train <= {train_y}, Validate = {valid_y}")
        print(f"   => Shape: ({num_rows}, {num_features})")
        print(f"   => Precision_cal={precision_cal:.4f}, Recall_cal={recall_cal:.4f}, AUC_cal={auc_cal:.4f}")
        print(f"   => F1_cal={f1_cal:.4f}")
        print()
        
        preds_cal.unpersist()
        preds.unpersist()

    # Aggregate metrics across folds
    # Use builtins to prevent conflicts in pySpark SQL sum
    avg_precision = builtins.sum(all_precision) / len(all_precision)
    avg_recall = builtins.sum(all_recall) / len(all_recall)
    avg_auc = builtins.sum(all_auc) / len(all_auc)
    avg_f1 = builtins.sum(all_f1) / len(all_f1)

    print("\nTime-Series CV Results:")
    print(f"Avg Precision: {avg_precision:.4f}")
    print(f"Avg Recall:    {avg_recall:.4f}")
    print(f"Avg AUC:       {avg_auc:.4f}")
    print(f"Avg F1:        {avg_f1:.4f}")

    avg_precision_calibrated = builtins.sum(all_precision_calibrated) / len(all_precision_calibrated)
    avg_recall_calibrated = builtins.sum(all_recall_calibrated) / len(all_recall_calibrated)
    avg_auc_calibrated = builtins.sum(all_auc_calibrated) / len(all_auc_calibrated)
    avg_f1_calibrated = builtins.sum(all_f1_calibrated) / len(all_f1_calibrated)

    print("Calibrated:")
    print(f"Avg Precision_cal: {avg_precision_calibrated:.4f}")
    print(f"Avg Recall_cal:    {avg_recall_calibrated:.4f}")
    print(f"Avg AUC_cal:       {avg_auc_calibrated:.4f}")
    print(f"Avg F1_cal:        {avg_f1_calibrated:.4f}")


    return avg_precision, avg_recall, avg_auc, avg_f1, avg_precision_calibrated, avg_recall_calibrated, avg_auc_calibrated, avg_f1_calibrated



In [0]:
def time_cv_years_with_gridsearch(
    train_df,
    pipeline,
    paramGrid,
):

    best_params = None
    best_params_calibrated = None
    best_metrics = (0.0, 0.0, 0.0, 0.0)
    best_metrics_calibrated = (0.0, 0.0, 0.0, 0.0)
    best_auc = float("-inf")
    best_auc_calibrated = float("-inf")
    best_recall = float("-inf")
    best_recall_calibrated = float("-inf")

    for i, params in enumerate(paramGrid):
        # Make a copy of the pipeline with these param settings
        pipeline_with_params = pipeline.copy(params)

        # Evaluate with custom CV
        avg_precision, avg_recall, avg_auc, avg_f1, avg_precision_calibrated, avg_recall_calibrated, avg_auc_calibrated, avg_f1_calibrated = time_cv_years(
            train_df, pipeline_with_params
        )

        print(f"[GridSearch] Param Set {i+1}/{len(paramGrid)} Params: {params} => "
              f"AUC={avg_auc:.4f}, Precision={avg_precision:.4f}, Recall={avg_recall:.4f}, f1={avg_f1:.4f}, Params={params}")
        print()
        print(f"Calibrated: AUC={avg_auc_calibrated:.4f}, Precision={avg_precision_calibrated:.4f}, Recall={avg_recall_calibrated:.4f}, f1={avg_f1_calibrated:.4f}")

        # Decide if this set is better
        if avg_recall > best_recall:
            best_recall = avg_recall
            best_params = params
            best_metrics = (avg_precision, avg_recall, avg_auc, avg_f1)

        if avg_recall_calibrated > best_recall_calibrated:
            best_recall_calibrated = avg_recall_calibrated
            best_params_calibrated = params
            best_metrics_calibrated = (
                avg_precision_calibrated,
                avg_recall_calibrated,
                avg_auc_calibrated,
                avg_f1_calibrated
            )
    print(f"[GridSearch] Best Params: {best_params}, AUC={best_auc:.4f}, Precision={best_metrics[0]:.4f}, Recall={best_metrics[1]:.4f}")

    return best_params, best_metrics, best_params_calibrated, best_metrics_calibrated

In [0]:
def mlp_gridsearch(
    train_df,
    test_df,
    pipeline,
    paramGrid
    
):

    best_params = None
    best_params_calibrated = None
    best_metrics = (0.0, 0.0, 0.0, 0.0)
    best_metrics_calibrated = (0.0, 0.0, 0.0, 0.0)
    best_auc = float("-inf")
    best_auc_calibrated = float("-inf")

    for i, params in enumerate(paramGrid):
        # Make a copy of the pipeline with these param settings
        pipeline_with_params = pipeline.copy(params)

        fitted_model = pipeline_with_params.fit(train_df)

        precision, recall, auc, f1, precision_cal, recall_cal, auc_cal, f1_cal = evaluate_ml_model(fitted_model, test_df)


        print(f"[GridSearch] Param Set {i+1}/{len(paramGrid)} Params: {params} => "
              f"AUC={auc:.4f}, Precision={precision:.4f}, Recall={recall:.4f}, f1={f1:.4f}, Params={params}")
        print()
        print(f"Calibrated: AUC={auc_cal:.4f}, Precision={precision_cal:.4f}, Recall={recall_cal:.4f}, f1={f1_cal:.4f}")

        # Decide if this set is better
        if auc > best_auc:
            best_auc = auc
            best_params = params
            best_metrics = (precision, recall, auc, f1)

        if auc_cal > best_auc_calibrated:
            best_auc_calibrated = auc_cal
            best_params_calibrated = params
            best_metrics_calibrated = (
                precision_cal,
                recall_cal,
                auc_cal,
                f1_cal
            )
    print(f"[GridSearch] Best Params: {best_params}, AUC={best_auc:.4f}, Precision={best_metrics[0]:.4f}, Recall={best_metrics[1]:.4f}")

    return best_params, best_metrics, best_params_calibrated, best_metrics_calibrated

In [0]:
def evaluate_ml_model(fitted_model, test_df):
    
    trainPosPrior = 0.5
    realPosPrior = 0.2


    # Set up the binary classification evaluator for AUC
    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="DELAY_TARGET",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )

    # Evaluator that will use the "calibrated" probability to measure AUC
    evaluator_auc_cal = BinaryClassificationEvaluator(
        labelCol="DELAY_TARGET",
        rawPredictionCol="prob_calibrated_vec",  # we'll create this column
        metricName="areaUnderROC"
    )


    # Generate predictions
    preds = fitted_model.transform(test_df).select(
            "DELAY_TARGET", "prediction", "rawPrediction",
            "probability", "scaled_features"
        )
    
    preds.cache()

    # Compute AUC
    auc = evaluator_auc.evaluate(preds)

    # Compute true positives, false positives, and false negatives
    tp = preds.filter((F.col("prediction") == 1) & (F.col("DELAY_TARGET") == 1)).count()
    fp = preds.filter((F.col("prediction") == 1) & (F.col("DELAY_TARGET") == 0)).count()
    fn = preds.filter((F.col("prediction") == 0) & (F.col("DELAY_TARGET") == 1)).count()

    # Calculate precision and recall (with safe division)
    precision = tp / float(tp + fp) if (tp + fp) else 0.0
    recall = tp / float(tp + fn) if (tp + fn) else 0.0

    f1 = 2.0 * precision * recall / (precision + recall) if (precision + recall) else 0.0



    print('Non-Calibrated:')
    print(f"=> Precision={precision:.4f}, Recall={recall:.4f}, AUC={auc:.4f}")
    print(f"F1={f1:.4f}")


    #Calibrate
    # Extract p_raw = probability of positive class
    preds_cal = preds.withColumn("p_raw", vector_to_array(F.col("probability"))[1])
    

    
    #Apply prior correction
    p_cal_expr = (
        (F.col("p_raw") * (realPosPrior / trainPosPrior))
        /
        (
            (F.col("p_raw") * (realPosPrior / trainPosPrior))
            + ((1 - F.col("p_raw")) * ((1 - realPosPrior) / (1 - trainPosPrior)))
        )
    )

    preds_cal = preds_cal.withColumn("p_calibrated", p_cal_expr)

    preds_cal = preds_cal.withColumn(
        "prob_calibrated_vec",
        F.array((1 - F.col("p_calibrated")), F.col("p_calibrated"))
    )

    preds_cal = preds_cal.withColumn(
        "prediction_cal",
        (F.col("p_calibrated") >= 0.30).cast("integer")
    )

    # **Convert** that array into a Spark ML vector:
    preds_cal = preds_cal.withColumn(
        "prob_calibrated_vec",  
        array_to_vector_udf(F.col("prob_calibrated_vec"))
    )


    #Compute AUC (calibrated)
    auc_cal = evaluator_auc_cal.evaluate(preds_cal)

    # --------------------------
    # Compute precision, recall from "prediction_cal"
    tp = preds_cal.filter((F.col("prediction_cal") == 1) & (F.col("DELAY_TARGET") == 1)).count()
    fp = preds_cal.filter((F.col("prediction_cal") == 1) & (F.col("DELAY_TARGET") == 0)).count()
    fn = preds_cal.filter((F.col("prediction_cal") == 0) & (F.col("DELAY_TARGET") == 1)).count()

    precision_cal = tp / float(tp + fp) if (tp + fp) else 0.0
    recall_cal = tp / float(tp + fn) if (tp + fn) else 0.0

    f1_cal = 2.0 * precision_cal * recall_cal / (precision_cal + recall_cal) if (precision_cal + recall_cal) else 0.0
    print('Calibrated:')
    print(f"   => Precision_cal={precision_cal:.4f}, Recall_cal={recall_cal:.4f}, AUC_cal={auc_cal:.4f}")
    print(f"   => F1_cal={f1_cal:.4f}")
    print()

    preds.unpersist()

    return precision, recall, auc, f1, precision_cal, recall_cal, auc_cal, f1_cal


##To use the time_cv_years_with_gridsearch:
Create a paramGrid, MLP Example:
paramGrid = (

    ParamGridBuilder()
    .addGrid(yourmodel.maxIter, [50, 100])
    .addGrid(yourmodel.stepSize, [0.01, 0.1])
    .addGrid(yourmodel.layers, [
        [3, 5, 4, 2],   # 2 hidden layers of size 5 and 4
        [3, 8, 2],      # 1 hidden layer of size 8
    ])
    .build()
)

best_params, best_metrics, best_params_calibrated, best_metrics_calibrated = time_cv_years_with_gridsearch(

    train_df=train_df,
    pipeline=pipeline, #pass the pipeline with yur model
    paramGrid=paramGrid
)





#Baseline Model Pipeline


In [0]:
checkpoint_path = "/tmp/train_df_60_months"
train_df = spark.read.parquet(checkpoint_path)
train_df = train_df.orderBy(col("FL_DATE").asc())

In [0]:
display(train_df)

In [0]:
display(train_df.tail(5))

In [0]:
train_df.groupBy("DELAY_TARGET").count().show()

+------------+--------+
|DELAY_TARGET|   count|
+------------+--------+
|           1| 4814891|
|           0|19464430|
+------------+--------+



In [0]:
# Umesh - adding the code to load test_df
checkpoint_path_test = "/tmp/test_df_60_months"
test_df = spark.read.parquet(checkpoint_path_test)
test_df = test_df.dropna()
test_df = test_df.orderBy(col("FL_DATE").asc())

In [0]:
display(test_df)

In [0]:
(test_df.count(), len(test_df.columns))

(6009508, 48)

In [0]:
(train_df.count(), len(train_df.columns))

(24279321, 48)

In [0]:
ratio_df = train_df.groupBy("DELAY_TARGET").count()
ratio_df = ratio_df.withColumnRenamed("count", "count_delay_target")
total_count = ratio_df.agg({"count_delay_target": "sum"}).collect()[0][0]
ratio_df = ratio_df.withColumn("ratio", ratio_df["count_delay_target"] / total_count)
display(ratio_df)

DELAY_TARGET,count_delay_target,ratio
1,4814891,0.1983124239759423
0,19464430,0.8016875760240577


In [0]:


# Create baseline predictions (predict "on time" 0 with N% probability)
train_df = train_df.withColumn("baseline_prediction", when(rand() <= ratio_df.filter(col("DELAY_TARGET") == 0).collect()[0]["ratio"], 0).otherwise(1).cast("double")
)

# Prepare data for evaluation
assembler = VectorAssembler(inputCols=["baseline_prediction"], outputCol="feature_cols_3")
train_df = assembler.transform(train_df)

# Evaluate baseline model using AUC
evaluator = BinaryClassificationEvaluator(
    labelCol="DELAY_TARGET",
    rawPredictionCol="baseline_prediction",
    metricName="areaUnderROC"
)
auc = evaluator.evaluate(train_df)

# Calculate precision and recall
tp = train_df.filter((train_df.DELAY_TARGET == 1) & (train_df.baseline_prediction == 1)).count()
fp = train_df.filter((train_df.DELAY_TARGET == 0) & (train_df.baseline_prediction == 1)).count()
fn = train_df.filter((train_df.DELAY_TARGET == 1) & (train_df.baseline_prediction == 0)).count()





In [0]:
#Basline metrics
precision = round(tp / (tp + fp) if (tp + fp) > 0 else 0.0, 2)
recall = round(tp / (tp + fn) if (tp + fn) > 0 else 0.0, 2)
auc = round(auc, 2)
precision, recall, auc

(0.2, 0.2, 0.5)

##Absolute Baseline
precision = 0.2, recall = 0.2, auc = 0.5

In [0]:
def downsample(df):
    df = df.orderBy(col("FL_DATE").asc())
    df_minority = df.filter(df.DELAY_TARGET == 1)
    count_minority = df.filter(col("DELAY_TARGET") == 1).count()
    df_majority_downsampled = df.filter(col("DELAY_TARGET") == 0).sample(fraction=count_minority / df.filter(col("DELAY_TARGET") == 0).count(), seed=42)
    df_downsampled = df_majority_downsampled.union(df_minority)
    df_downsampled = df_downsampled.orderBy("FL_DATE")
    return df_downsampled


In [0]:
train_df = train_df.dropna()
train_df = downsample(train_df)
display(train_df)

In [0]:
train_df.groupBy("DELAY_TARGET").count().show()

+------------+-------+
|DELAY_TARGET|  count|
+------------+-------+
|           0|3733373|
|           1|3731522|
+------------+-------+



In [0]:
cols_to_cast = [
    "CRS_DEP_TIME", "CRS_ARR_TIME", "CRS_ELAPSED_TIME", "DISTANCE", "origin_airport_lat", "origin_airport_lon", 
    "dest_airport_lat", "dest_airport_lon", "ELEVATION",
    "HourlyAltimeterSetting", "HourlyDewPointTemperature", "HourlyPrecipitation",
    "HourlyDryBulbTemperature", "HourlyRelativeHumidity", "HourlyStationPressure",
    "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindDirection",
    "HourlyWindSpeed", "rolling_3mo_delay_count", "DAY_OF_MONTH", 
]

for col_name in cols_to_cast:
    train_df = train_df.withColumn(col_name, col(col_name).cast("float"))

for col_name in cols_to_cast:
    test_df = test_df.withColumn(col_name, col(col_name).cast("float"))


In [0]:
# Categorical columns
categorical_cols = [
  "OP_UNIQUE_CARRIER", "origin_type", "dest_type",
    "QUARTER", "IS_HOLIDAY", "YEAR", "MONTH", "HAS_WEATHER_TYPE", "DAY_OF_WEEK"
]

# Numerical columns (removed the duplicate "HourlyPrecipitation")
numerical_cols = [
    "CRS_DEP_TIME", "CRS_ARR_TIME", "CRS_ELAPSED_TIME", "DISTANCE", "origin_airport_lat", "origin_airport_lon", 
    "dest_airport_lat", "dest_airport_lon", "sched_depart_unix", "ELEVATION",
    "HourlyAltimeterSetting", "HourlyDewPointTemperature", "HourlyPrecipitation",
    "HourlyDryBulbTemperature", "HourlyRelativeHumidity", "HourlyStationPressure",
    "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindDirection",
    "HourlyWindSpeed", "rolling_3mo_delay_count", "DAY_OF_MONTH", 
]


# 1) StringIndex each categorical column
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
    for col in categorical_cols
]

# 2) One-Hot Encode each indexed column
encoders = [
    OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_ohe", handleInvalid="keep")
    for col in categorical_cols
]

# 3) VectorAssembler for all features (OHE + numeric)
feature_cols = [f"{col}_ohe" for col in categorical_cols] + numerical_cols
vector_assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="assembled_features",
    handleInvalid="keep"
)

# 4) MinMaxScaler on assembled_features -> scaled_features
scaler = MinMaxScaler(
    inputCol="assembled_features",
    outputCol="scaled_features"
)



In [0]:
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="DELAY_TARGET",
    predictionCol="prediction",         
    rawPredictionCol="rawPrediction",   
    probabilityCol="probability"    
)

baseline_pipeline = Pipeline(stages=indexers + encoders + [vector_assembler, scaler, lr])

## Training Data Results

In [0]:
avg_precision, avg_recall, avg_auc, avg_f1, avg_precision_calibrated, avg_recall_calibrated, avg_auc_calibrated, avg_f1_calibrated = time_cv_years(train_df, baseline_pipeline)



Downloading artifacts:   0%|          | 0/210 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696836, 79)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5961, Recall=0.5778, AUC=0.6468
F1=0.5868
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696836, 79)
   => Precision_cal=0.6657, Recall_cal=0.2351, AUC_cal=0.6468
   => F1_cal=0.3475





Downloading artifacts:   0%|          | 0/210 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2226090, 81)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6068, Recall=0.6425, AUC=0.6433
F1=0.6241
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2226090, 81)
   => Precision_cal=0.6813, Recall_cal=0.2863, AUC_cal=0.6433
   => F1_cal=0.4032


Time-Series CV Results:
Avg Precision: 0.6014
Avg Recall:    0.6101
Avg AUC:       0.6451
Avg F1:        0.6055
Calibrated:
Avg Precision_cal: 0.6735
Avg Recall_cal:    0.2607
Avg AUC_cal:       0.6451
Avg F1_cal:        0.3754


### Threshold of 0.4

Shape of feature matrix: (2223217, 82)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 
- Precision=0.6582
- Recall=0.6754
- AUC=0.7105
- F1=0.6667

Calibrated:
Fold: Train <= 2017, Validate = 2018
Shape: (2223217, 82)
- Precision_cal=0.8729
- Recall_cal=0.1499
- AUC_cal=0.7105
- F1_cal=0.2559


Time-Series CV Results:
- Avg Precision: 0.6779
- Avg Recall:    0.6630
- Avg AUC:       0.7343
- Avg F1:        0.6700
Calibrated:
- Avg Precision_cal: 0.9008
- Avg Recall_cal:    0.1599
- Avg AUC_cal:       0.7343
- Avg F1_cal:        0.2716

### (Final) Threshold of 0.3
Shape of feature matrix: (1697609, 80)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => 
- Precision=0.6976
- Recall=0.6495
- AUC=0.7575
- F1=0.6727

Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1697609, 80)
- Precision_cal=0.8359
- Recall_cal=0.3563
- AUC_cal=0.7575
- F1_cal=0.4996


Shape of feature matrix: (2223294, 82)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => 
- Precision=0.6568
- Recall=0.6784
- AUC=0.7100
- F1=0.6674

Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2223294, 82)
- Precision_cal=0.7727
- Recall_cal=0.3657
- AUC_cal=0.7100
- F1_cal=0.4964

### Overall Results

Time-Series CV Results:
- Avg Precision: 0.6772
- Avg Recall:    0.6640
- Avg AUC:       0.7338
- Avg F1:        0.6701

Calibrated:
- Avg Precision_cal: 0.8043
- Avg Recall_cal:    0.3610
- Avg AUC_cal:       0.7338
- Avg F1_cal:        0.4980

## Test Data Results

In [0]:
spark.catalog.clearCache()


In [0]:
baseline_test = baseline_pipeline.fit(train_df)





Downloading artifacts:   0%|          | 0/210 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
precision, recall, auc, f1, precision_cal, recall_cal, auc_cal, f1_cal = evaluate_ml_model(baseline_test, test_df)
precision, recall, auc, f1, precision_cal, recall_cal, auc_cal, f1_cal

Non-Calibrated:
=> Precision=0.2633, Recall=0.6461, AUC=0.6500
F1=0.3742
Calibrated:
   => Precision_cal=0.3367, Recall_cal=0.2872, AUC_cal=0.6500
   => F1_cal=0.3100



(0.2633433233527429,
 0.6461126586813776,
 0.649989166190065,
 0.3741785378482383,
 0.3367230178838238,
 0.28720234093257324,
 0.6499892675496601,
 0.309997462406631)

### Threshold At 0.4
Non-Calibrated:
=> Precision=0.2998, Recall=0.7073, AUC=0.7171
F1=0.4211
Calibrated:
   => Precision_cal=0.6132, Recall_cal=0.1676, AUC_cal=0.7171
   => F1_cal=0.2632

(0.2998188895446865,
 0.7073010133573002,
 0.7170892944373173,
 0.4211260323374925,
 0.6132276106374469,
 0.16759737764194246,
 0.7170893706232652,
 0.26324807987240256)

### (Final) Threshold At 0.30
Non-Calibrated:
- Precision=0.2998
- Recall=0.7073
- AUC=0.7171
- F1=0.4211 

Calibrated: 
- Precision_cal=0.6132
- Recall_cal=0.1676
- AUC_cal=0.7171
- F1_cal=0.2632

(0.29980609442716266,
 0.7076809898126404,
 0.7171843933165093,
 0.4211807316937796,
 0.43505609073574203,
 0.39625818619366826,
 0.7171841548987117,
 0.41475177845909095)
 0.41451334958181213)

In [0]:
# Extract pipeline stages
assembler = [s for s in baseline_test.stages if isinstance(s, VectorAssembler)][0]
ohe_models = [s for s in baseline_test.stages if isinstance(s, OneHotEncoderModel)]
indexer_models = [s for s in baseline_test.stages if isinstance(s, StringIndexerModel)]

# Build mappings
ohe_output_sizes = {}
ohe_input_output_map = {}
for ohe in ohe_models:
    out_col = ohe.getOutputCol()
    in_col = ohe.getInputCol()
    size = ohe.categorySizes[0]
    ohe_output_sizes[out_col] = size
    ohe_input_output_map[out_col] = in_col

indexer_label_dict = {
    indexer.getOutputCol(): indexer.labels for indexer in indexer_models
}

# Get coefficients
lr_model = baseline_test.stages[-1]
coefficients = lr_model.coefficients
intercept = lr_model.intercept

# Collect feature–coefficient pairs
feature_coeff_pairs = []
current_index = 0

for col in assembler.getInputCols():
    if col in numerical_cols:
        coef = coefficients[current_index]
        feature_coeff_pairs.append((col, coef))
        current_index += 1

    elif col in ohe_output_sizes:
        size = ohe_output_sizes[col]
        num_dims = size - 1  # default: dropLast=True

        index_col = ohe_input_output_map[col]
        labels = indexer_label_dict.get(index_col, [f"{col}_cat_{i}" for i in range(num_dims)])
        labels = labels[:num_dims]  # due to dropLast=True

        for i, label in enumerate(labels):
            coef = coefficients[current_index]
            feature_label = f"{col.replace('_ohe', '')} = {label}"
            feature_coeff_pairs.append((feature_label, coef))
            current_index += 1
    else:
        feature_coeff_pairs.append((f"UNKNOWN: {col}", coefficients[current_index]))
        current_index += 1

# Sort by absolute value of coefficient
sorted_features = sorted(feature_coeff_pairs, key=lambda x: abs(x[1]), reverse=True)

# Print results
print("=== Sorted Logistic Regression Coefficients (Absolute Basis) ===\n")
for feature, coef in sorted_features:
    print(f"{feature:40s} -> {coef:.6f}")

print(f"\nIntercept: {intercept:.6f}")

=== Sorted Logistic Regression Coefficients (Absolute Basis) ===

DAY_OF_MONTH                             -> 6.329973
HourlyWindDirection                      -> -2.430201
rolling_3mo_delay_count                  -> -1.496087
ELEVATION                                -> 1.268183
HourlyDryBulbTemperature                 -> -1.079132
HourlyAltimeterSetting                   -> 0.674307
HourlyDewPointTemperature                -> 0.627263
HourlyWindSpeed                          -> -0.553703
OP_UNIQUE_CARRIER = HA                   -> -0.505725
OP_UNIQUE_CARRIER = F9                   -> 0.430347
OP_UNIQUE_CARRIER = DL                   -> -0.429041
OP_UNIQUE_CARRIER = B6                   -> 0.362700
DAY_OF_WEEK = 1                          -> -0.319243
OP_UNIQUE_CARRIER = G4                   -> 0.310929
MONTH = 4                                -> 0.290975
MONTH = 1                                -> 0.290460
OP_UNIQUE_CARRIER = VX                   -> 0.257529
OP_UNIQUE_CARRIER = NK    

# Advanced Models

##MLP Single Layer

In [0]:
#Anshul
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
    for col in categorical_cols
]

In [0]:
# Create the vector assembler 
feature_cols = [f"{col}_index" for col in categorical_cols] + numerical_cols
vector_assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="assembled_features",
    handleInvalid="skip"
)

In [0]:
# MinMaxScaler - same as random forest
scaler = MinMaxScaler(
    inputCol="assembled_features",
    outputCol="scaled_features"
)

In [0]:
# Create a pipeline to determine the vector size
vector_size_pipeline = Pipeline(stages=indexers + [vector_assembler, scaler])
vector_size_model = vector_size_pipeline.fit(train_df)
vector_size_df = vector_size_model.transform(train_df)

# Get the vector size from the first row
first_row = vector_size_df.select("scaled_features").first()
vector_size = len(first_row["scaled_features"])
print(f"Input vector size: {vector_size}")



Downloading artifacts:   0%|          | 0/110 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Input vector size: 31


In [0]:

MLP = MultilayerPerceptronClassifier(
    featuresCol="scaled_features",
    labelCol="DELAY_TARGET",
    predictionCol="prediction",         
    rawPredictionCol="rawPrediction",   
    probabilityCol="probability" ,
    solver="l-bfgs",
    layers=[vector_size, 20, 2]
)

In [0]:
mlp_pipeline = Pipeline(stages=indexers + [vector_assembler, scaler, MLP])

In [0]:
layer_configs = [
    [vector_size, 10, 2],  # Input layer, hidden layer with 10 neurons, output layer
    [vector_size, 20, 2],  # Input layer, hidden layer with 20 neurons, output layer
    [vector_size, 30, 2]   # Input layer, hidden layer with 30 neurons, output layer
]

# Create parameter grid
mlp_paramGrid = (
    ParamGridBuilder()
    .addGrid(MLP.layers, layer_configs)
    .addGrid(MLP.maxIter, [25, 50, 100])
    .addGrid(MLP.stepSize, [0.01, 0.05])
    .build()
)

In [0]:
# Call grid search function
best_params, best_metrics, best_params_calibrated, best_metrics_calibrated = time_cv_years_with_gridsearch(
    train_df=train_df,
    pipeline=mlp_pipeline,
    paramGrid=mlp_paramGrid
)

Shape of feature matrix: (1696903, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5887, Recall=0.5671, AUC=0.6344
F1=0.5777
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696903, 31)
   => Precision_cal=0.6636, Recall_cal=0.1865, AUC_cal=0.6388
   => F1_cal=0.2911

Shape of feature matrix: (2225943, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6132, Recall=0.5563, AUC=0.6364
F1=0.5833
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2225943, 31)
   => Precision_cal=0.6979, Recall_cal=0.1761, AUC_cal=0.6343
   => F1_cal=0.2813


Time-Series CV Results:
Avg Precision: 0.6009
Avg Recall:    0.5617
Avg AUC:       0.6354
Avg F1:        0.5805
Calibrated:
Avg Precision_cal: 0.6807
Avg Recall_cal:    0.1813
Avg AUC_cal:       0.6365
Avg F1_cal:        0.2862
[GridSearch] Param Set 1/18 Params: {Param(parent='MultilayerPerceptronClassifier_e56fbf6947b6', name='layers', doc='Sizes of layers fr

In [0]:
print(best_params)
print("************")
print(best_metrics)
print("************")
print(best_params_calibrated)
print("************")
print(best_metrics_calibrated)
print("************")

{Param(parent='MultilayerPerceptronClassifier_e56fbf6947b6', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [31, 20, 2], Param(parent='MultilayerPerceptronClassifier_e56fbf6947b6', name='maxIter', doc='max number of iterations (>= 0).'): 100, Param(parent='MultilayerPerceptronClassifier_e56fbf6947b6', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.05}
************
(0.5760645418380439, 0.7054510213460385, 0.6392966767463464, 0.6320791488079622)
************
{Param(parent='MultilayerPerceptronClassifier_e56fbf6947b6', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [31, 10, 2], Param(parent='MultilayerPerceptronClassifier_e56fbf6947b6', name='maxIter', doc='max number of iteration

### CV Results:
{Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [32, 10, 2], Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='maxIter', doc='max number of iterations (>= 0).'): 100, Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.01}
************
Non Calibrated:
(precision, recall, auc, f1)

(0.8921949635310785, 0.768082360762984, 0.8868068885366374, 0.8242354873600939)
************
{Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [32, 10, 2], Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='maxIter', doc='max number of iterations (>= 0).'): 100, Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.01}
************
Calibrated:
- Precision: 0.9368102975566611
- Recall: 0.7069352376698033
- AUC: 0.8865981546775914
- F1: 0.8051884869202455

(precision, recall, auc, f1)
(0.9368102975566611, 0.7069352376698033, 0.8865981546775914, 0.8051884869202455)
************

In [0]:

# Create the final MLP with the best parameters
best_layers = best_params_calibrated.get('layers', layer_configs[0])
best_maxIter = best_params_calibrated.get('maxIter', 100)
best_stepSize = best_params_calibrated.get('stepSize', 0.01)

print(f"Best parameters: layers={best_layers}, maxIter={best_maxIter}, stepSize={best_stepSize}")


Best parameters: layers=[31, 10, 2], maxIter=100, stepSize=0.01


###Best Param:
Best parameters: layers=[31, 10, 2], maxIter=100, stepSize=0.01

In [0]:
# Create the final MLP model with best parameters
final_mlp = MultilayerPerceptronClassifier(
    featuresCol="scaled_features",
    labelCol="DELAY_TARGET",
    predictionCol="prediction",
    probabilityCol="probability",
    layers=best_layers,
    maxIter=best_maxIter,
    stepSize=best_stepSize,
    solver="l-bfgs"
)

In [0]:
# Create the final pipeline
final_mlp_pipeline = Pipeline(stages=indexers + [vector_assembler, scaler, final_mlp])

# Fit the final model on training data
final_model = final_mlp_pipeline.fit(train_df)

# Evaluate on test data
precision, recall, auc, f1, precision_cal, recall_cal, auc_cal, f1_cal = evaluate_ml_model(final_model, test_df)
print(f"Test metrics: Precision={precision}, Recall={recall}, AUC={auc}, F1={f1}")
print(f"Calibrated: Precision={precision_cal}, Recall={recall_cal}, AUC={auc_cal}, F1={f1_cal}")

Non-Calibrated:
=> Precision=0.2476, Recall=0.6788, AUC=0.6365
F1=0.3629
Calibrated:
   => Precision_cal=0.3266, Recall_cal=0.2879, AUC_cal=0.6386
   => F1_cal=0.3060

Test metrics: Precision=0.24762758683551225, Recall=0.6787672106344826, AUC=0.6365222999903057, F1=0.3628722588933425
Calibrated: Precision=0.3265543894866837, Recall=0.2878937592771078, AUC=0.6386191837076368, F1=0.30600782502773244


###Train Test results with best parm:

#### Train Results
Non-Calibrated:
- Precision=0.2476
- Recall=0.6788
- AUC=0.6365
- F1=0.3629

Calibrated:
- Precision_cal=0.3266,
- Recall_cal=0.2879
- AUC_cal=0.6386
- F1_cal=0.3060

#### Test Results

Non Calibrated: 
- Precision=0.24762758683551225
- Recall=0.6787672106344826
- AUC=0.6365222999903057
- F1=0.3628722588933425

Calibrated: 
- Precision=0.3265543894866837
- Recall=0.2878937592771078
- AUC=0.6386191837076368
- F1=0.30600782502773244

### Top 10 Features

In [0]:
# Extract the final MLP model from the fitted pipeline
final_mlp_model = final_model.stages[-1]

# Get the weights vector from the MLP model
weights_vector = final_mlp_model.weights

# Convert the weights to a NumPy array for easier manipulation
import numpy as np
weights_array = np.array(weights_vector.toArray())

# Determine sizes: input_size and hidden_size are obtained from best_layers.
input_size = best_layers[0]
hidden_size = best_layers[1]

# Extract weights for the first layer and reshape into a matrix
# The first layer contains input_size * hidden_size weights.
first_layer_weights = weights_array[: input_size * hidden_size]
first_layer_matrix = first_layer_weights.reshape((input_size, hidden_size))

# Calculate the aggregated importance for each input feature by summing
# the absolute weights across all neurons in the first hidden layer.
aggregated_importance = np.sum(np.abs(first_layer_matrix), axis=1)

# Sort features by aggregated importance in descending order
sorted_indices = np.argsort(aggregated_importance)[::-1]

# Retrieve feature names from the vector assembler
feature_names = vector_assembler.getInputCols()

# Get the top 10 feature indices and their corresponding names and scores
top_10_indices = sorted_indices[:10]
top_features = [(feature_names[i], aggregated_importance[i])
                for i in top_10_indices]

print("Top 10 features based on weight analysis:")
for feature, score in top_features:
    print(f"Feature: {feature}, Score: {score:.4f}")


Top 10 features based on weight analysis:
Feature: HourlyAltimeterSetting, Score: 10.9176
Feature: CRS_DEP_TIME, Score: 10.4867
Feature: HourlyRelativeHumidity, Score: 8.8356
Feature: origin_airport_lon, Score: 8.5671
Feature: CRS_ARR_TIME, Score: 7.1204
Feature: origin_airport_lat, Score: 6.8495
Feature: HourlyDewPointTemperature, Score: 6.8400
Feature: MONTH_index, Score: 6.7503
Feature: rolling_3mo_delay_count, Score: 6.7112
Feature: OP_UNIQUE_CARRIER_index, Score: 5.9972


## MLP 2 hidden layers

In [0]:
spark.catalog.clearCache()

In [0]:
mlp_2layers = MultilayerPerceptronClassifier(
    featuresCol="scaled_features",
    labelCol="DELAY_TARGET",
    predictionCol="prediction",
    probabilityCol="probability",
    solver="l-bfgs",
    layers=[vector_size, 20, 10, 2]  # Default values with 2 hidden layers (20 and 10 neurons)
)

In [0]:
# Create the full pipeline
mlp_2layer_pipeline = Pipeline(stages=indexers + [vector_assembler, scaler, mlp_2layers])

# Format: [input_size, first_hidden_layer, second_hidden_layer, output_layer]
layer_configs_2hidden = [
    [vector_size, 20, 10, 2],  # Medium-sized first layer, smaller second layer
    [vector_size, 30, 15, 2],  # Larger first layer, medium second layer
    [vector_size, 40, 20, 2]   # Even larger layers
]

In [0]:
# Create parameter grid for 2-hidden-layer MLP
mlp_2layer_paramGrid = (
    ParamGridBuilder()
    .addGrid(mlp_2layers.layers, layer_configs_2hidden)
    .addGrid(mlp_2layers.maxIter, [50, 100])
    .addGrid(mlp_2layers.stepSize, [0.01, 0.03])
    .build()
)

In [0]:
best_params_2layer, best_metrics_2layer, best_params_calibrated_2layer, best_metrics_calibrated_2layer = time_cv_years_with_gridsearch(
    train_df=train_df,
    pipeline=mlp_2layer_pipeline,
    paramGrid=mlp_2layer_paramGrid
)



# Get the best parameters
best_layers_2layer = best_params_calibrated_2layer.get('layers', [vector_size, 30, 15, 2])
best_maxIter_2layer = best_params_calibrated_2layer.get('maxIter', 100)
best_stepSize_2layer = best_params_calibrated_2layer.get('stepSize', 0.01)

print(f"Best parameters for 2-layer MLP: layers={best_layers_2layer}, maxIter={best_maxIter_2layer}, stepSize={best_stepSize_2layer}")


Shape of feature matrix: (1696952, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.6335, Recall=0.3232, AUC=0.6346
F1=0.4280
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696952, 31)
   => Precision_cal=0.7046, Recall_cal=0.0678, AUC_cal=0.6371
   => F1_cal=0.1237

Shape of feature matrix: (2226085, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6126, Recall=0.5665, AUC=0.6351
F1=0.5887
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2226085, 31)
   => Precision_cal=0.6982, Recall_cal=0.2011, AUC_cal=0.6356
   => F1_cal=0.3123


Time-Series CV Results:
Avg Precision: 0.6230
Avg Recall:    0.4449
Avg AUC:       0.6348
Avg F1:        0.5083
Calibrated:
Avg Precision_cal: 0.7014
Avg Recall_cal:    0.1345
Avg AUC_cal:       0.6364
Avg F1_cal:        0.2180
[GridSearch] Param Set 1/12 Params: {Param(parent='MultilayerPerceptronClassifier_0cb20715c5de', name='layers', doc='Sizes of layers fr

In [0]:
print(best_params_2layer)
print("************")
print(best_metrics_2layer)
print("************")
print(best_params_calibrated_2layer)
print("************")
print(best_metrics_calibrated_2layer)
print("************")

{Param(parent='MultilayerPerceptronClassifier_0cb20715c5de', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [31, 40, 20, 2], Param(parent='MultilayerPerceptronClassifier_0cb20715c5de', name='maxIter', doc='max number of iterations (>= 0).'): 100, Param(parent='MultilayerPerceptronClassifier_0cb20715c5de', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.01}
************
(0.5858998787299918, 0.6498049637588736, 0.6407055349265882, 0.6154909950307311)
************
{Param(parent='MultilayerPerceptronClassifier_0cb20715c5de', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [31, 20, 10, 2], Param(parent='MultilayerPerceptronClassifier_0cb20715c5de', name='maxIter', doc='max number of i

In [0]:
print(best_params)
print("************")
print(best_metrics)
print("************")
print(best_params_calibrated)
print("************")
print(best_metrics_calibrated)
print("************")

{Param(parent='MultilayerPerceptronClassifier_899632fbcbe5', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [31, 10, 2], Param(parent='MultilayerPerceptronClassifier_899632fbcbe5', name='maxIter', doc='max number of iterations (>= 0).'): 50, Param(parent='MultilayerPerceptronClassifier_899632fbcbe5', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.01}
************
(0.27267268560795327, 0.5491666110004051, 0.6420380220541615, 0.36440879691606537)
************
{Param(parent='MultilayerPerceptronClassifier_899632fbcbe5', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [31, 10, 2], Param(parent='MultilayerPerceptronClassifier_899632fbcbe5', name='maxIter', doc='max number of iteratio

###CV Results
{Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [32, 10, 2], Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='maxIter', doc='max number of iterations (>= 0).'): 100, Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.01}
************
Non Calibrated:
(precision, recall, auc, f1)

(0.8921949635310785, 0.768082360762984, 0.8868068885366374, 0.8242354873600939)
************
{Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.'): [32, 10, 2], Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='maxIter', doc='max number of iterations (>= 0).'): 100, Param(parent='MultilayerPerceptronClassifier_72eef61641c1', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.01}
************
Calibrated:
- Precision: 0.9368102975566611
- Recall: 0.7069352376698033
- AUC: 0.8865981546775914
- F1: 0.8051884869202455

(precision, recall, auc, f1)

(0.9368102975566611, 0.7069352376698033, 0.8865981546775914, 0.8051884869202455)
************

###Best Param:
layers=[32, 30, 15, 2], maxIter=100, stepSize=0.01

In [0]:

# Create the final 2-layer MLP model with best parameters
final_mlp_2layer = MultilayerPerceptronClassifier(
    featuresCol="scaled_features",
    labelCol="DELAY_TARGET",
    predictionCol="prediction",
    probabilityCol="probability",
    layers=best_layers_2layer,
    maxIter=best_maxIter_2layer,
    stepSize=best_stepSize_2layer,
    solver="l-bfgs"
)

In [0]:
# Create the final pipeline
final_mlp_2layer_pipeline = Pipeline(stages=indexers + [vector_assembler, scaler, final_mlp_2layer])

# Fit the final model on training data
final_model_2layer = final_mlp_2layer_pipeline.fit(train_df)

# Evaluate on test data
precision_2l, recall_2l, auc_2l, f1_2l, precision_cal_2l, recall_cal_2l, auc_cal_2l, f1_cal_2l = evaluate_ml_model(final_model_2layer, test_df)
print(f"Precision: {precision_2l}, Recall: {recall_2l}, AUC: {auc_2l}, F1: {f1_2l}")
print(f"Calibrated: Precision: {precision_cal_2l}, Recall: {recall_cal_2l}, AUC: {auc_cal_2l}, F1: {f1_cal_2l}")

Non-Calibrated:
=> Precision=0.2653, Recall=0.5855, AUC=0.6399
F1=0.3652
Calibrated:
   => Precision_cal=0.3489, Recall_cal=0.2330, AUC_cal=0.6420
   => F1_cal=0.2794

Precision: 0.2653317575791887, Recall: 0.5854786532176979, AUC: 0.6399170502337804, F1: 0.3651720244886232
Calibrated: Precision: 0.34889999246481196, Recall: 0.23297935374506704, AUC: 0.6420499272056805, F1: 0.2793929542114777


###Train Test Results with best params:

Non-Calibrated:
=> Precision=0.6481, Recall=0.6838, AUC=0.8634
F1=0.6655
Calibrated:
   => Precision_cal=0.7408, Recall_cal=0.6276, AUC_cal=0.8633
   => F1_cal=0.6795

#### Test Results

Non-Calibrated:
- Precision: 0.2653317575791887
- Recall: 0.5854786532176979
- AUC: 0.6399170502337804
- F1: 0.3651720244886232

Calibrated: 
- Precision: 0.34889999246481196
- Recall: 0.23297935374506704
- AUC: 0.6420499272056805
- F1: 0.2793929542114777

## Random Forest Model
Umesh

In [0]:
spark.catalog.clearCache()

In [0]:
# Categorical columns
categorical_cols = [
  "OP_UNIQUE_CARRIER", "origin_type", "dest_type",
    "QUARTER", "IS_HOLIDAY", "YEAR", "MONTH", "HAS_WEATHER_TYPE", "DAY_OF_WEEK"
]

# Numerical columns (removed the duplicate "HourlyPrecipitation")
numerical_cols = [
    "CRS_DEP_TIME", "CRS_ARR_TIME", "CRS_ELAPSED_TIME", "DISTANCE", "origin_airport_lat", "origin_airport_lon", 
    "dest_airport_lat", "dest_airport_lon", "sched_depart_unix", "ELEVATION",
    "HourlyAltimeterSetting", "HourlyDewPointTemperature", "HourlyPrecipitation",
    "HourlyDryBulbTemperature", "HourlyRelativeHumidity", "HourlyStationPressure",
    "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindDirection",
    "HourlyWindSpeed", "rolling_3mo_delay_count", "DAY_OF_MONTH", 
]


# 1) StringIndex each categorical column
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
    for col in categorical_cols
]


# 3) VectorAssembler for all features (OHE + numeric)
feature_cols = [f"{col}_index" for col in categorical_cols] + numerical_cols
vector_assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="assembled_features",
    handleInvalid="keep"
)

# 4) MinMaxScaler on assembled_features -> scaled_features
scaler = MinMaxScaler(
    inputCol="assembled_features",
    outputCol="scaled_features"
)



In [0]:
# Create a Random Forest Classifier   
rf = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="DELAY_TARGET",
    rawPredictionCol="rawPrediction",
    predictionCol="prediction",
	probabilityCol="probability",
	numTrees=10
)
# Create a pipeline
random_forest_pipeline = Pipeline(stages=indexers + [vector_assembler, scaler, rf])

# # Fit the pipeline to the training data
# model = random_forest_pipeline.fit(train_df)

# # Make predictions on test data
# predictions = model.transform(test_df)

# # Evaluate the model
# evaluator = BinaryClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
# accuracy = evaluator.evaluate(predictions)
# print("Accuracy:", accuracy)

In [0]:
# Tailor it for Random Forest
rf_paramGrid = (

ParamGridBuilder()
.addGrid(rf.numTrees, [10, 15, 20]) \
.addGrid(rf.maxDepth, [5, 8, 12]) \
.build()
)

In [0]:
# Tailor it for Random Forest
best_params, best_metrics, best_params_calibrated, best_metrics_calibrated = time_cv_years_with_gridsearch(
train_df=train_df,
pipeline=random_forest_pipeline, #pass the pipeline with yur model
paramGrid=rf_paramGrid)

Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696321, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5749, Recall=0.6297, AUC=0.6370
F1=0.6010
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696321, 31)
   => Precision_cal=0.7325, Recall_cal=0.0744, AUC_cal=0.6370
   => F1_cal=0.1351



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2224963, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6071, Recall=0.6276, AUC=0.6457
F1=0.6172
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2224963, 31)
   => Precision_cal=0.7794, Recall_cal=0.0841, AUC_cal=0.6457
   => F1_cal=0.1519


Time-Series CV Results:
Avg Precision: 0.5910
Avg Recall:    0.6287
Avg AUC:       0.6414
Avg F1:        0.6091
Calibrated:
Avg Precision_cal: 0.7560
Avg Recall_cal:    0.0793
Avg AUC_cal:       0.6414
Avg F1_cal:        0.1435
[GridSearch] Param Set 1/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 10, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5} => AUC=0.6414, Precision=0.5910, Recall=0.6287, f1=0.6091, Params={Param(parent='Ran

Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696226, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5794, Recall=0.6400, AUC=0.6447
F1=0.6082
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696226, 31)
   => Precision_cal=0.6917, Recall_cal=0.1626, AUC_cal=0.6447
   => F1_cal=0.2633



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2224970, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6245, Recall=0.5952, AUC=0.6555
F1=0.6095
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2224970, 31)
   => Precision_cal=0.7579, Recall_cal=0.1497, AUC_cal=0.6555
   => F1_cal=0.2500


Time-Series CV Results:
Avg Precision: 0.6020
Avg Recall:    0.6176
Avg AUC:       0.6501
Avg F1:        0.6089
Calibrated:
Avg Precision_cal: 0.7248
Avg Recall_cal:    0.1561
Avg AUC_cal:       0.6501
Avg F1_cal:        0.2567
[GridSearch] Param Set 2/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 10, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 8} => AUC=0.6501, Precision=0.6020, Recall=0.6176, f1=0.6089, Params={Param(parent='Ran



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696400, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5770, Recall=0.6699, AUC=0.6479
F1=0.6200
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696400, 31)
   => Precision_cal=0.6884, Recall_cal=0.1642, AUC_cal=0.6479
   => F1_cal=0.2652





Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2225011, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6337, Recall=0.5901, AUC=0.6650
F1=0.6111
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2225011, 31)
   => Precision_cal=0.7399, Recall_cal=0.2245, AUC_cal=0.6650
   => F1_cal=0.3444


Time-Series CV Results:
Avg Precision: 0.6054
Avg Recall:    0.6300
Avg AUC:       0.6564
Avg F1:        0.6156
Calibrated:
Avg Precision_cal: 0.7141
Avg Recall_cal:    0.1943
Avg AUC_cal:       0.6564
Avg F1_cal:        0.3048
[GridSearch] Param Set 3/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 10, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12} => AUC=0.6564, Precision=0.6054, Recall=0.6300, f1=0.6156, Params={Param(parent='Ra



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696205, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5687, Recall=0.6857, AUC=0.6423
F1=0.6217
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696205, 31)
   => Precision_cal=0.7228, Recall_cal=0.0738, AUC_cal=0.6423
   => F1_cal=0.1339





Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2224507, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6014, Recall=0.6323, AUC=0.6410
F1=0.6165
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2224507, 31)
   => Precision_cal=0.7863, Recall_cal=0.0660, AUC_cal=0.6410
   => F1_cal=0.1218


Time-Series CV Results:
Avg Precision: 0.5850
Avg Recall:    0.6590
Avg AUC:       0.6416
Avg F1:        0.6191
Calibrated:
Avg Precision_cal: 0.7545
Avg Recall_cal:    0.0699
Avg AUC_cal:       0.6416
Avg F1_cal:        0.1279
[GridSearch] Param Set 4/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 15, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5} => AUC=0.6416, Precision=0.5850, Recall=0.6590, f1=0.6191, Params={Param(parent='Ran



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696320, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5659, Recall=0.7244, AUC=0.6507
F1=0.6354
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696320, 31)
   => Precision_cal=0.7161, Recall_cal=0.1358, AUC_cal=0.6507
   => F1_cal=0.2284





Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2222778, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6209, Recall=0.6039, AUC=0.6559
F1=0.6123
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2222778, 31)
   => Precision_cal=0.7771, Recall_cal=0.1093, AUC_cal=0.6559
   => F1_cal=0.1916


Time-Series CV Results:
Avg Precision: 0.5934
Avg Recall:    0.6641
Avg AUC:       0.6533
Avg F1:        0.6238
Calibrated:
Avg Precision_cal: 0.7466
Avg Recall_cal:    0.1225
Avg AUC_cal:       0.6533
Avg F1_cal:        0.2100
[GridSearch] Param Set 5/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 15, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 8} => AUC=0.6533, Precision=0.5934, Recall=0.6641, f1=0.6238, Params={Param(parent='Ran



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696210, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5573, Recall=0.7543, AUC=0.6459
F1=0.6410
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696210, 31)
   => Precision_cal=0.6507, Recall_cal=0.2818, AUC_cal=0.6459
   => F1_cal=0.3933





Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2224394, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6386, Recall=0.5796, AUC=0.6667
F1=0.6077
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2224394, 31)
   => Precision_cal=0.7536, Recall_cal=0.2041, AUC_cal=0.6667
   => F1_cal=0.3212


Time-Series CV Results:
Avg Precision: 0.5979
Avg Recall:    0.6669
Avg AUC:       0.6563
Avg F1:        0.6243
Calibrated:
Avg Precision_cal: 0.7022
Avg Recall_cal:    0.2429
Avg AUC_cal:       0.6563
Avg F1_cal:        0.3572
[GridSearch] Param Set 6/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 15, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12} => AUC=0.6563, Precision=0.5979, Recall=0.6669, f1=0.6243, Params={Param(parent='Ra



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696358, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5779, Recall=0.6330, AUC=0.6442
F1=0.6042
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696358, 31)
   => Precision_cal=0.7381, Recall_cal=0.0658, AUC_cal=0.6442
   => F1_cal=0.1208





Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2224557, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6033, Recall=0.6168, AUC=0.6434
F1=0.6100
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2224557, 31)
   => Precision_cal=0.7735, Recall_cal=0.0953, AUC_cal=0.6434
   => F1_cal=0.1697


Time-Series CV Results:
Avg Precision: 0.5906
Avg Recall:    0.6249
Avg AUC:       0.6438
Avg F1:        0.6071
Calibrated:
Avg Precision_cal: 0.7558
Avg Recall_cal:    0.0805
Avg AUC_cal:       0.6438
Avg F1_cal:        0.1453
[GridSearch] Param Set 7/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 20, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5} => AUC=0.6438, Precision=0.5906, Recall=0.6249, f1=0.6071, Params={Param(parent='Ran



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696209, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5759, Recall=0.6776, AUC=0.6529
F1=0.6226
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696209, 31)
   => Precision_cal=0.7053, Recall_cal=0.1743, AUC_cal=0.6529
   => F1_cal=0.2796





Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2224867, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6271, Recall=0.5936, AUC=0.6572
F1=0.6099
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2224867, 31)
   => Precision_cal=0.7541, Recall_cal=0.1625, AUC_cal=0.6572
   => F1_cal=0.2674


Time-Series CV Results:
Avg Precision: 0.6015
Avg Recall:    0.6356
Avg AUC:       0.6551
Avg F1:        0.6163
Calibrated:
Avg Precision_cal: 0.7297
Avg Recall_cal:    0.1684
Avg AUC_cal:       0.6551
Avg F1_cal:        0.2735
[GridSearch] Param Set 8/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 20, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 8} => AUC=0.6551, Precision=0.6015, Recall=0.6356, f1=0.6163, Params={Param(parent='Ran



Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (1696247, 31)
Non-Calibrated:
Fold: Train <= Year 2015, Validate = Year 2016 => Precision=0.5789, Recall=0.6833, AUC=0.6563
F1=0.6268
Calibrated:
Fold: Train <= 2015, Validate = 2016
   => Shape: (1696247, 31)
   => Precision_cal=0.6979, Recall_cal=0.2079, AUC_cal=0.6563
   => F1_cal=0.3204





Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Shape of feature matrix: (2223856, 31)
Non-Calibrated:
Fold: Train <= Year 2017, Validate = Year 2018 => Precision=0.6356, Recall=0.5948, AUC=0.6683
F1=0.6145
Calibrated:
Fold: Train <= 2017, Validate = 2018
   => Shape: (2223856, 31)
   => Precision_cal=0.7512, Recall_cal=0.2164, AUC_cal=0.6683
   => F1_cal=0.3360


Time-Series CV Results:
Avg Precision: 0.6072
Avg Recall:    0.6390
Avg AUC:       0.6623
Avg F1:        0.6207
Calibrated:
Avg Precision_cal: 0.7245
Avg Recall_cal:    0.2121
Avg AUC_cal:       0.6623
Avg F1_cal:        0.3282
[GridSearch] Param Set 9/9 Params: {Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 20, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12} => AUC=0.6623, Precision=0.6072, Recall=0.6390, f1=0.6207, Params={Param(parent='Ra

In [0]:
print(best_params)
print("************")
print(best_metrics)
print("************")
print(best_params_calibrated)
print("************")
print(best_metrics_calibrated)
print("************")

{Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 15, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12}
************
(0.597948967274389, 0.6669227841282295, 0.6562678095825172, 0.6243239845704931)
************
{Param(parent='RandomForestClassifier_337537bc3d72', name='numTrees', doc='Number of trees to train (>= 1).'): 15, Param(parent='RandomForestClassifier_337537bc3d72', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12}
************
(0.7021529669023652, 0.24294670787691763, 0.6562682306156873, 0.3572393823242457)
************


###CV Results

{Param(parent='RandomForestClassifier_1427ed233d95', name='numTrees', doc='Number of trees to train (>= 1).'): 20, Param(parent='RandomForestClassifier_1427ed233d95', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12}
************
Non Calibrated:
(precision, recall, auc, f1)


(0.6985803483739265, 0.7382673746996642, 0.7884390604247213, 0.7149955312522794)
************
{Param(parent='RandomForestClassifier_1427ed233d95', name='numTrees', doc='Number of trees to train (>= 1).'): 20, Param(parent='RandomForestClassifier_1427ed233d95', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12}
************
Calibrated:
- Precision: 0.8634837406271338
- Recall: 0.3979052541561583
- AUC: 0.7884397820446747
- F1: 0.5441362696717309

(precision, recall, auc, f1)


(0.8634837406271338, 0.3979052541561583, 0.7884397820446747, 0.5441362696717309)
************

In [0]:
#get best params
best_numTrees = best_params_calibrated.get("numTrees", 20)
best_maxDepth = best_params_calibrated.get("maxDepth", 12)

print(f"best_numTrees: {best_numTrees}")
print(f"best_maxDepth: {best_maxDepth}")



best_numTrees: 20
best_maxDepth: 12


###Best Params:
best_numTrees: 20

best_maxDepth: 12

In [0]:
# Now we have the best parameters, let's train the model with the best parameters (Calibrated)
# Create a Random Forest Classifier   
rf_calibrated  = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="DELAY_TARGET",
    rawPredictionCol="rawPrediction",
    predictionCol="prediction",
	probabilityCol="probability",
	numTrees=best_numTrees,
    maxDepth=best_maxDepth
)
# Create a pipeline
random_forest_pipeline = Pipeline(stages=indexers + [vector_assembler, scaler, rf])

# Fit the pipeline to the training data
model_calibrated   = random_forest_pipeline.fit(train_df)




Downloading artifacts:   0%|          | 0/125 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
#Evaluate on test

precision, recall, auc, f1, precision_cal, recall_cal, auc_cal, f1_cal = evaluate_ml_model(model_calibrated, test_df)
precision, recall, auc, f1, precision_cal, recall_cal, auc_cal, f1_cal

Non-Calibrated:
=> Precision=0.2628, Recall=0.6380, AUC=0.6485
F1=0.3722
Calibrated:
   => Precision_cal=0.4545, Recall_cal=0.0970, AUC_cal=0.6485
   => F1_cal=0.1599



(0.2627831305841777,
 0.6379848061469433,
 0.648503943616904,
 0.3722415902875936,
 0.45452771669098846,
 0.09700417193327336,
 0.648506615716258,
 0.1598858948602641)

###Best Param Model train test results:

Non-Calibrated:
- Precision=0.2628
- Recall=0.6380
- AUC=0.6485
- F1=0.3722

Calibrated:
- Precision_cal=0.4545
- Recall_cal=0.0970
- AUC_cal=0.6485
- F1_cal=0.1599

(0.2709525407708272,
 0.6863658722151326,
 0.6875188157235367,
 0.3885281520805818,
 0.5237647141749489,
 0.12892566428646407,
 0.6875057139402567,
 0.20691806079322325)

In [0]:
# Extract pipeline stages
assembler = [s for s in model_calibrated.stages if isinstance(s, VectorAssembler)][0]
ohe_models = [s for s in model_calibrated.stages if isinstance(s, OneHotEncoderModel)]
indexer_models = [s for s in model_calibrated.stages if isinstance(s, StringIndexerModel)]

# Build mappings
ohe_output_sizes = {}
ohe_input_output_map = {}
for ohe in ohe_models:
    out_col = ohe.getOutputCol()
    in_col = ohe.getInputCol()
    size = ohe.categorySizes[0]
    ohe_output_sizes[out_col] = size
    ohe_input_output_map[out_col] = in_col

indexer_label_dict = {
    indexer.getOutputCol(): indexer.labels for indexer in indexer_models
}

# Get feature importances
rf_model = model_calibrated.stages[-1]
importances = rf_model.featureImportances

# Collect feature–importance pairs
feature_importance_pairs = []
current_index = 0

for col in assembler.getInputCols():
    if col in numerical_cols:
        importance = importances[current_index]
        feature_importance_pairs.append((col, importance))
        current_index += 1

    elif col in ohe_output_sizes:
        size = ohe_output_sizes[col]
        num_dims = size - 1  # default: dropLast=True

        index_col = ohe_input_output_map[col]
        labels = indexer_label_dict.get(index_col, [f"{col}_cat_{i}" for i in range(num_dims)])
        labels = labels[:num_dims]  # due to dropLast=True

        for i, label in enumerate(labels):
            importance = importances[current_index]
            feature_label = f"{col.replace('_ohe', '')} = {label}"
            feature_importance_pairs.append((feature_label, importance))
            current_index += 1
    else:
        feature_importance_pairs.append((f"UNKNOWN: {col}", importances[current_index]))
        current_index += 1

# Sort by importance
sorted_features = sorted(feature_importance_pairs, key=lambda x: x[1], reverse=True)

# Print results
print("=== Sorted Random Forest Feature Importances ===\n")
for feature, importance in sorted_features:
    print(f"{feature:40s} -> {importance:.6f}")

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:

eda = df_otpw_60_months.orderBy(col("FL_DATE").asc())

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
(eda.count(), len(eda.columns))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
# EDA of 60 months OTPW Training Dataset

# Data Dictionary of Flights Data

data_dictionary_otpw_60_months  = {
    "YEAR": "Year (int)",
    "QUARTER": "Quarter (1-4) (int)",
    "MONTH": "Month (int)",
    "DAY_OF_MONTH": "Day of Month (int)",
    "DAY_OF_WEEK": "Day of Week (int)",
    "FL_DATE": "Flight Date (yyyymmdd) (String)",
    "OP_UNIQUE_CARRIER": "Unique Carrier Code. When the same code has been used by multiple carriers, a numeric suffix is used for earlier users, for example, PA, PA(1), PA(2). Use this field for  across a range of years. (String)",
    "OP_CARRIER_FL_NUM": "Flight Number (int)",
    "ORIGIN": "Origin Airport (String)",
    "DEST": "Destination Airport (String)",
    "CRS_DEP_TIME": "CRS Departure Time (local time: hhmm) (int)",
    "DEP_TIME": "Actual Departure Time (local time: hhmm) (int)",
    "DEP_DELAY": "Difference in minutes between scheduled and actual departure time. Early departures show negative numbers. (double)",
    "TAXI_OUT": "Taxi Out Time, in Minutes (double)",
    "WHEELS_OFF": "Wheels Off Time (local time: hhmm)) (int)",
    "WHEELS_ON": "Wheels On Time (local time: hhmm) (int)",
    "TAXI_IN": "Taxi In Time, in Minutes (double)",
    "CRS_ARR_TIME": "CRS Arrival Time (local time: hhmm) (int)",
    "SCHED_DEPART_UNIX": "Scheduled Departure Time in Unix epoch (long)",
    "CANCELLED": "Cancelled Flight Indicator (1=Yes) (double)",
    "DIVERTED": "Diverted Flight Indicator (1=Yes) (double)",
    "CRS_ELAPSED_TIME": "CRS Elapsed Time of Flight, in Minutes (double)",
    "AIR_TIME": "Flight Time, in Minutes (double)",
    "DISTANCE": "Distance between airports (miles) (double)",
    "ORIGIN_TYPE": "Origin Airport Type (string)",
    "ORIGIN_REGION": "Origin Airport Region (string)",
    "ORIGIN_AIRPORT_LAT": "Origin Airport Latitude (double)",
    "ORIGIN_AIRPORT_LON": "Origin Airport Longitude (double)",
    "DEST_TYPE": "Destination Airport Type (string)",
    "DEST_REGION": "Destination Airport Region (string)",
    "DEST_AIRPORT_LAT": "Destination Airport Latitude (double)",
    "DEST_AIRPORT_LON": "Destination Airport Longitude (double)",
    "ELEVATION": "Airport Elevation (double)",
    "HourlyAltimeterSetting": "Hourly Altimeter Setting (float)",
    "HourlyDewPointTemperature": "Hourly Dew Point Temperature (float)",
    "HourlyDryBulbTemperature": "Hourly Dry Bulb Temperature (float)",
    "HourlyPrecipitation": "Hourly Precipitation (float)",
    "HourlyRelativeHumidity": "Hourly Relative Humidity (float)",
    "HourlySkyConditions": "Hourly Sky Conditions (string)",
    "HourlyStationPressure": "Hourly Station Pressure (float)",
    "HourlyVisibility": "Hourly Visibility (float)",
    "HourlyWetBulbTemperature": "Hourly Wet Bulb Temperature (float)",
    "HourlyWindDirection": "Hourly Wind Direction (float)",
    "HourlyWindSpeed": "Hourly Wind Speed (float)",
    "DELAY_TARGET": "Delay Target (0=No Delay, 1=Delay) (int)",
    "HAS_WEATHER_TYPE": "Has Weather Type (1=Yes, 0=No) (int)",
    "IS_HOLIDAY": "Is Holiday (1=Yes, 0=No) (string)",
    "rolling_3mo_delay_count": "Rolling 3 Month Delay Count (long)"
}

df_dict_otpw_60_months = pd.DataFrame(list(data_dictionary_otpw_60_months.items()), columns=["Feature Name", "Description"])
display(df_dict_otpw_60_months)

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:

# train_df EDA continued
eda.createOrReplaceTempView("otpw_60")


[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
# Cancelled Flights
spark.sql("SELECT count(*) FROM otpw_60 WHERE CANCELLED = 1").show()
# Diverted Flights
spark.sql("SELECT count(*) FROM otpw_60 WHERE Diverted = 1").show()
# Delayed Flights
spark.sql("SELECT count(*) FROM otpw_60 WHERE ARR_DELAY > 0").show()
#Total Flights
spark.sql("SELECT count(*) FROM otpw_60").show()
display(spark.sql("SELECT * FROM otpw_60"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
#Causes of Delay
display(spark.sql("SELECT sum(CARRIER_DELAY), sum(WEATHER_DELAY), sum(NAS_DELAY), sum(SECURITY_DELAY), sum(LATE_AIRCRAFT_DELAY) FROM otpw_60 Where ARR_DELAY > 0 "))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
display(spark.sql("SELECT count(*) FROM otpw_60 Where ARR_DELAY > 0 and (CARRIER_DELAY is null and WEATHER_DELAY is null and NAS_DELAY is null and SECURITY_DELAY is null and LATE_AIRCRAFT_DELAY is null  )"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
display(spark.sql("select CAST(ARR_DELAY AS double) as Flight_Delay_In_Minutes from otpw_60 where CANCELLED = 0"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
display(spark.sql("select count(*) from otpw_60 where CANCELLED = 0 and ARR_DELAY > 0"))
display(spark.sql("select count(*) from otpw_60 where CANCELLED = 0"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
display(spark.sql("SELECT fl_date, Avg(hourlyvisibility) FROM otpw_60 Where ARR_DELAY > 0 and WEATHER_DELAY is not null group by fl_date"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
display(spark.sql("SELECT fl_date, Avg(hourlystationpressure) FROM otpw_60 Where ARR_DELAY > 0 and WEATHER_DELAY is not null group by fl_date"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0

In [0]:
display(spark.sql("SELECT fl_date, Avg(HourlyPrecipitation) FROM otpw_60 Where ARR_DELAY > 0 and WEATHER_DELAY is not null group by fl_date"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5381617944348395>, line 20[0m
[1;32m     17[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(test_df)
[1;32m     19[0m [38;5;66;03m# Evaluate the model[39;00m
[0;32m---> 20[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mindexedLabel[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124maccuracy[39m[38;5;124m"[39m)
[1;32m     21[0m accuracy [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mAccuracy:[39m[38;5;124m"[39m, accuracy)

File [0;32m/databricks/spark/python/pyspark/__init__.py:120[0m, in [0