In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorSlicer,VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from xgboost.spark import SparkXGBClassifier
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.ml.functions import vector_to_array

In [0]:
blob_container  = "w261projectteam33"       # The name of your container created in https://portal.azure.com
storage_account = "w261projectteam33"  # The name of your Storage account created in https://portal.azure.com
secret_scope    = "w261projectteam33_scope"          # The name of the scope created in your local computer using the Databricks CLI
secret_key      = "w261projectteam33"             # The name of the secret key created in your local computer using the Databricks CLI

team_blob_url   = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  #points to the root of your team storage bucket

In [0]:
# the 261 course blob storage is mounted here.
mids261_mount_path = "/mnt/mids-w261"

In [0]:
# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

#df_cleaned_row.write.parquet(f"{team_blob_url}/1yr_otpw_clean")

# see what's in the blob storage root folder 
#display(dbutils.fs.ls(f"{team_blob_url}"))
#        "test": f"{team_blob_url}/5yr_step7_holdout_pagerank_df"


Cross Validation data and model

In [0]:
# Define the paths for the cross-validation datasets 1yr_otpw_downsample_train_{fold}
cv_data_paths = {
    "fold_0": {"train": f"{team_blob_url}/cross_validation/5yr_step8_downsampled_no_pr_train_fold_fold_0",
               "validation": f"{team_blob_url}/cross_validation/5yr_step6_cv_validation_fold_0"},
    "fold_1": {"train": f"{team_blob_url}/cross_validation/5yr_step8_downsampled_no_pr_train_fold_fold_1",
               "validation": f"{team_blob_url}/cross_validation/5yr_step6_cv_validation_fold_1"},
    "fold_2": {"train": f"{team_blob_url}/cross_validation/5yr_step8_downsampled_no_pr_train_fold_fold_2",
               "validation": f"{team_blob_url}/cross_validation/5yr_step6_cv_validation_fold_2"},
    "fold_3": {"train": f"{team_blob_url}/cross_validation/5yr_step8_downsampled_no_pr_train_fold_fold_3",
               "validation": f"{team_blob_url}/cross_validation/5yr_step6_cv_validation_fold_3"},
    "fold_4": {"train": f"{team_blob_url}/cross_validation/5yr_step8_downsampled_no_pr_train_fold_fold_4",
               "validation": f"{team_blob_url}/cross_validation/5yr_step6_cv_validation_fold_4"}
}


In [0]:
from pyspark.sql.functions import col

# Define function to calculate precision and recall
def calculate_precision_recall(predictions):
    tp = predictions.filter((col("final_prediction") == 1) & (col("DEP_DEL15") == 1)).count()
    fp = predictions.filter((col("final_prediction") == 1) & (col("DEP_DEL15") == 0)).count()
    fn = predictions.filter((col("final_prediction") == 0) & (col("DEP_DEL15") == 1)).count()
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    return precision, recall

# Custom F-beta score function
def fbeta_score(predictions, beta=2.0):
    tp = predictions.filter(col("final_prediction") == 1).filter(col("DEP_DEL15") == 1).count()
    fp = predictions.filter(col("final_prediction") == 1).filter(col("DEP_DEL15") == 0).count()
    fn = predictions.filter(col("final_prediction") == 0).filter(col("DEP_DEL15") == 1).count()

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0

    fbeta = (1 + beta**2) * (precision * recall) / (beta**2 * precision + recall) if (beta**2 * precision + recall) > 0 else 0
    return fbeta


In [0]:
#Feature columns
numeric_columns = ['DEP_DELAY_lag_1','CRS_ELAPSED_TIME', 'dest_airport_lat','dest_airport_lon', 'origin_airport_lat','origin_airport_lon','ELEVATION','DISTANCE']
categorical_columns = ['DAY_OF_WEEK','DAY_OF_MONTH','QUARTER', 'TAIL_NUM','OP_CARRIER_FL_NUM']

#experimentation 1:

In [0]:
# Define the initial pipeline for preprocessing and XGBoost
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="skip") for col in categorical_columns]

categorical_assembler = VectorAssembler(inputCols=[col + "_index" for col in categorical_columns], outputCol="categorical_features")

# Combine All Features
final_assembler = VectorAssembler(inputCols=["categorical_features"] + numeric_columns, outputCol="features")

# Standardization
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# XGBoost Classifier
xgb_classifier = SparkXGBClassifier(
    features_col="scaled_features",  
    label_col="DEP_DEL15",
    prediction_col="xgb_prediction",  
    probability_col="xgb_probability",  
    raw_prediction_col="xgb_rawPrediction",  
    num_workers=3,  
    tree_method='hist',   
    max_depth=5,            
    eta=0.2,              
    num_round=100,          
    seed=101,                
    missing=0.0,           
    enable_sparse_data_optim=True  
)

# Random Forest Classifier
rf_classifier = RandomForestClassifier(
    featuresCol="scaled_features", 
    labelCol="DEP_DEL15", 
    predictionCol="rf_prediction",  
    probabilityCol="rf_probability",  
    rawPredictionCol="rf_rawPrediction",  
    numTrees=3, maxDepth=3, seed=123, maxBins=10000
)

# Assemble features for stacking
assembler_stacked = VectorAssembler(
    inputCols=["xgb_prob_class_1", "rf_prob_class_1"],
    outputCol="stacked_features"
)

# Meta model (Logistic Regression)
meta_model = LogisticRegression(
    featuresCol="stacked_features", 
    labelCol="DEP_DEL15", 
    predictionCol="final_prediction"  
)

# Define the preprocessing pipeline (without models)
preprocessing_pipeline = Pipeline(stages=indexers + [categorical_assembler, final_assembler, scaler])

# Define the final pipeline for stacking and meta-model
pipeline_stage_2 = Pipeline(stages=[assembler_stacked, meta_model])

# Evaluator setup
evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", predictionCol="final_prediction")

# Placeholder for storing results
results = []

In [0]:
# Cross-validation loop
for fold, paths in cv_data_paths.items():
    print(f"Starting fold {fold}...")

    # Load train and validation datasets for this fold
    train_df = spark.read.parquet(paths['train'])
    validation_df = spark.read.parquet(paths['validation'])
    print("Loaded train and validation datasets")

    # Convert string columns to numeric types
    for col_name in numeric_columns:
        train_df = train_df.withColumn(col_name, col(col_name).cast(DoubleType()))
        validation_df = validation_df.withColumn(col_name, col(col_name).cast(DoubleType()))
    print("Converted string columns to numeric types")

    # Preprocess the data (apply transformations except model predictions)
    pipeline_model_stage_1 = preprocessing_pipeline.fit(train_df)
    train_df_stage_1 = pipeline_model_stage_1.transform(train_df)
    validation_df_stage_1 = pipeline_model_stage_1.transform(validation_df)
    print("Preprocessed the data")

    # Run the XGBoost model on the preprocessed data
    xgb_model = xgb_classifier.fit(train_df_stage_1)
    train_df_stage_1 = xgb_model.transform(train_df_stage_1)
    validation_df_stage_1 = xgb_model.transform(validation_df_stage_1)
    print("Ran XGBoost on the preprocessed data")

    # Extract the class 1 probability from xgb_probability using vector_to_array
    train_df_stage_1 = train_df_stage_1.withColumn("xgb_prob_class_1", vector_to_array(col("xgb_probability"))[1])
    validation_df_stage_1 = validation_df_stage_1.withColumn("xgb_prob_class_1", vector_to_array(col("xgb_probability"))[1])
    print("Extracted XGBoost class 1 probabilities")

    # Run the RandomForestClassifier separately on the preprocessed data (not the XGBoost output)
    rf_model = rf_classifier.fit(train_df_stage_1)
    train_df_stage_1 = rf_model.transform(train_df_stage_1)
    validation_df_stage_1 = rf_model.transform(validation_df_stage_1)
    print("Ran RandomForest on the preprocessed data")

    # Extract the class 1 probability from rf_probability using vector_to_array
    train_df_stage_1 = train_df_stage_1.withColumn("rf_prob_class_1", vector_to_array(col("rf_probability"))[1])
    validation_df_stage_1 = validation_df_stage_1.withColumn("rf_prob_class_1", vector_to_array(col("rf_probability"))[1])
    print("Extracted RandomForest class 1 probabilities")

    # Stage 2: Fit and transform the final pipeline on the same DataFrame (containing both probabilities)
    pipeline_model_stage_2 = pipeline_stage_2.fit(train_df_stage_1)
    train_df_final = pipeline_model_stage_2.transform(train_df_stage_1)
    validation_df_final = pipeline_model_stage_2.transform(validation_df_stage_1)
    print("Fitted and transformed the final pipeline")

    # Calculate precision and recall using the custom function
    precision, recall = calculate_precision_recall(validation_df_final)
    
    # Calculate F2 score using the custom function
    f2_score = fbeta_score(validation_df_final, beta=2.0)

    # Store the metrics
    results.append({
        "fold": fold,
        "f2_score": f2_score,
        "precision": precision,
        "recall": recall
    })

    # Print the results for the current fold
    print(f"Fold {fold} results:")
    print(f"  F2 Score: {f2_score:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall: {recall:.4f}")
    print("\n")

# After all folds are completed, print the summary of results
print("Cross-validation complete.")
for result in results:
    print(f"Fold {result['fold']} - F2 Score: {result['f2_score']:.4f}, Precision: {result['precision']:.4f}, Recall: {result['recall']:.4f}")

Starting fold fold_0...
Loaded train and validation datasets
Converted string columns to numeric types


Downloading artifacts:   0%|          | 0/75 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Preprocessed the data


2024-08-10 23:25:26,684 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 5, 'tree_method': 'hist', 'eta': 0.2, 'num_round': 100, 'seed': 101, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': 0.0}
2024-08-10 23:26:10,444 INFO XGBoost-PySpark: _fit Finished xgboost training!


Ran XGBoost on the preprocessed data
Extracted XGBoost class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Ran RandomForest on the preprocessed data
Extracted RandomForest class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Fitted and transformed the final pipeline
Fold fold_0 results:
  F2 Score: 0.4062
  Precision: 0.3713
  Recall: 0.4159


Starting fold fold_1...
Loaded train and validation datasets
Converted string columns to numeric types


Downloading artifacts:   0%|          | 0/75 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Preprocessed the data


2024-08-10 23:42:27,057 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 5, 'tree_method': 'hist', 'eta': 0.2, 'num_round': 100, 'seed': 101, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': 0.0}
2024-08-10 23:43:05,058 INFO XGBoost-PySpark: _fit Finished xgboost training!


Ran XGBoost on the preprocessed data
Extracted XGBoost class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Ran RandomForest on the preprocessed data
Extracted RandomForest class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Fitted and transformed the final pipeline
Fold fold_1 results:
  F2 Score: 0.5188
  Precision: 0.2683
  Recall: 0.6767


Starting fold fold_2...
Loaded train and validation datasets
Converted string columns to numeric types


Downloading artifacts:   0%|          | 0/75 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Preprocessed the data


2024-08-10 23:58:14,362 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 5, 'tree_method': 'hist', 'eta': 0.2, 'num_round': 100, 'seed': 101, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': 0.0}
2024-08-10 23:58:46,308 INFO XGBoost-PySpark: _fit Finished xgboost training!


Ran XGBoost on the preprocessed data
Extracted XGBoost class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Ran RandomForest on the preprocessed data
Extracted RandomForest class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Fitted and transformed the final pipeline
Fold fold_2 results:
  F2 Score: 0.5745
  Precision: 0.3992
  Recall: 0.6453


Starting fold fold_3...
Loaded train and validation datasets
Converted string columns to numeric types


Downloading artifacts:   0%|          | 0/75 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Preprocessed the data


2024-08-11 00:14:30,754 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 5, 'tree_method': 'hist', 'eta': 0.2, 'num_round': 100, 'seed': 101, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': 0.0}
2024-08-11 00:15:11,276 INFO XGBoost-PySpark: _fit Finished xgboost training!


Ran XGBoost on the preprocessed data
Extracted XGBoost class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Ran RandomForest on the preprocessed data
Extracted RandomForest class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Fitted and transformed the final pipeline
Fold fold_3 results:
  F2 Score: 0.5616
  Precision: 0.3066
  Recall: 0.7091


Starting fold fold_4...
Loaded train and validation datasets
Converted string columns to numeric types


Downloading artifacts:   0%|          | 0/75 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Preprocessed the data


2024-08-11 00:28:45,482 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 5, 'tree_method': 'hist', 'eta': 0.2, 'num_round': 100, 'seed': 101, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': 0.0}
2024-08-11 00:29:19,969 INFO XGBoost-PySpark: _fit Finished xgboost training!


Ran XGBoost on the preprocessed data
Extracted XGBoost class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Ran RandomForest on the preprocessed data
Extracted RandomForest class 1 probabilities


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Fitted and transformed the final pipeline
Fold fold_4 results:
  F2 Score: 0.5243
  Precision: 0.3811
  Recall: 0.5786


Cross-validation complete.
Fold fold_0 - F2 Score: 0.4062, Precision: 0.3713, Recall: 0.4159
Fold fold_1 - F2 Score: 0.5188, Precision: 0.2683, Recall: 0.6767
Fold fold_2 - F2 Score: 0.5745, Precision: 0.3992, Recall: 0.6453
Fold fold_3 - F2 Score: 0.5616, Precision: 0.3066, Recall: 0.7091
Fold fold_4 - F2 Score: 0.5243, Precision: 0.3811, Recall: 0.5786


In [0]:
 # Calculate precision and recall using the custom function
precision, recall = calculate_precision_recall(validation_df_final)
    
# Calculate F2 score using the custom function
f2_score = fbeta_score(validation_df_final, beta=2.0)
print("Calculated precision{}, recall, and F2 score")


Calculated precision, recall, and F2 score


In [0]:
precision

0.37130790203096575

In [0]:
recall

0.4159467425551377

In [0]:
f2_score

0.4061804940067907

In [0]:
# Load the test dataset
test_path = f"{team_blob_url}/5yr_step5_holdout_df"
test_df = spark.read.parquet(test_path)
print("Loaded test dataset")

# Convert string columns to numeric types in test set
for col_name in numeric_columns:
    test_df = test_df.withColumn(col_name, col(col_name).cast(DoubleType()))
print("Converted string columns to numeric types in test set")

# Preprocess the test data using the trained preprocessing pipeline
test_df_stage_1 = pipeline_model_stage_1.transform(test_df)
print("Preprocessed the test data")

# Apply the XGBoost model to the preprocessed test data
test_df_stage_1 = xgb_model.transform(test_df_stage_1)
print("Applied XGBoost to the test data")

# Extract the class 1 probability from xgb_probability using vector_to_array
test_df_stage_1 = test_df_stage_1.withColumn("xgb_prob_class_1", vector_to_array(col("xgb_probability"))[1])
print("Extracted XGBoost class 1 probabilities for the test data")

# Apply the RandomForest model to the preprocessed test data
test_df_stage_1 = rf_model.transform(test_df_stage_1)
print("Applied RandomForest to the test data")

# Extract the class 1 probability from rf_probability using vector_to_array
test_df_stage_1 = test_df_stage_1.withColumn("rf_prob_class_1", vector_to_array(col("rf_probability"))[1])
print("Extracted RandomForest class 1 probabilities for the test data")

# Apply the final meta-model to the test data
test_df_final = pipeline_model_stage_2.transform(test_df_stage_1)
print("Applied the final pipeline to the test data")

# Calculate precision and recall using the custom function
precision, recall = calculate_precision_recall(test_df_final)

# Calculate F2 score using the custom function
f2_score = fbeta_score(test_df_final, beta=2.0)

# Print the evaluation metrics for the test set
print(f"Test Set Results:")
print(f"  F2 Score: {f2_score:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall: {recall:.4f}")


Loaded test dataset
Converted string columns to numeric types in test set
Preprocessed the test data
Applied XGBoost to the test data
Extracted XGBoost class 1 probabilities for the test data
Applied RandomForest to the test data
Extracted RandomForest class 1 probabilities for the test data
Applied the final pipeline to the test data
Test Set Results:
  F2 Score: 0.5505
  Precision: 0.3732
  Recall: 0.6247
