In [0]:
# Read the processed Silver flights table; every later cell starts from df_silver.
silver_table_name = "default.silver_flights_processed"
df_silver = spark.table(silver_table_name)

In [0]:
from pyspark.sql.functions import col, when, count, isnan

In [0]:
# Derive a categorical `arrival_status` label from `arrival_delay`.
#
# Branches are evaluated in order, first match wins:
#   missing delay (NULL or NaN) -> "Cancelled/Diverted"
#   delay >= 15                 -> "Delayed"
#   delay < 0                   -> "Early"
#   anything else (0 to 14)     -> "On-Time"
#
# NaN is checked explicitly because Spark orders NaN above every other numeric
# value: without the isnan() test a NaN delay would satisfy `>= 15` and be
# labelled "Delayed". The sample output shows arrival_delay as a floating-point
# column, which is the only kind of column that can hold NaN.
df_with_status = df_silver.withColumn(
    "arrival_status",
    when(
        col("arrival_delay").isNull() | isnan(col("arrival_delay")),
        "Cancelled/Diverted",
    )
    .when(col("arrival_delay") >= 15, "Delayed")
    .when(col("arrival_delay") < 0, "Early")
    .otherwise("On-Time")  # This covers 0 to 14 minutes
)

# Later cells keep reading `df_silver`, so rebind it to the labelled frame.
# withColumn replaces a same-named column, so re-running this cell is safe.
df_silver = df_with_status

In [0]:
# Preview the first 20 rows of the raw delay next to its derived category.
print("Sample of 'arrival_delay' and 'arrival_status':")
status_preview = df_silver.select("arrival_delay", "arrival_status")
status_preview.show(20)

# Row count for each arrival_status category.
print("\nCounts for each category:")
status_counts = df_silver.groupBy("arrival_status").count()
status_counts.show()

Sample of 'arrival_delay' and 'arrival_status':
+-------------+--------------+
|arrival_delay|arrival_status|
+-------------+--------------+
|        -14.0|         Early|
|         -5.0|         Early|
|          0.0|       On-Time|
|         24.0|       Delayed|
|        141.0|       Delayed|
|        -29.0|         Early|
|         23.0|       Delayed|
|        -11.0|         Early|
|         60.0|       Delayed|
|          1.0|       On-Time|
|        -32.0|         Early|
|          6.0|       On-Time|
|        -24.0|         Early|
|        -13.0|         Early|
|         35.0|       Delayed|
|        -26.0|         Early|
|         11.0|       On-Time|
|        -30.0|         Early|
|          6.0|       On-Time|
|        -20.0|         Early|
+-------------+--------------+
only showing top 20 rows

Counts for each category:
+------------------+-------+
|    arrival_status|  count|
+------------------+-------+
|             Early|1537789|
|           On-Time| 436398|
|          

In [0]:
# Split the columns by dtype. Only float/double columns get the extra isnan()
# check; every other column is checked for NULL only.
all_columns = df_silver.columns
float_type_names = ('float', 'double')
numeric_cols = [
    name for name, dtype in df_silver.dtypes
    if dtype in float_type_names
]
other_cols = [name for name in all_columns if name not in numeric_cols]


def _missing_count(name, include_nan):
    """Build an aggregate that counts the rows where column `name` is missing.

    `when(...)` yields a non-null value only for rows matching the condition,
    and `count` counts non-null values, so the result is the number of
    missing rows. The aggregate is aliased back to the column name.
    """
    is_missing = col(name).isNull()
    if include_nan:
        is_missing = is_missing | isnan(name)
    return count(when(is_missing, name)).alias(name)


# One aggregate per column: float/double columns first, then all the others
# (this is also the column order of the printed result).
numeric_expressions = [_missing_count(name, True) for name in numeric_cols]
other_expressions = [_missing_count(name, False) for name in other_cols]
all_expressions = [*numeric_expressions, *other_expressions]

# Evaluate every count in a single pass over df_silver and display the result.
print("Missing value counts per column (before ML pipeline):")
df_silver.select(*all_expressions).show()

Missing value counts per column (before ML pipeline):
+---------+-------------+----------------+--------+------------+------------+---------+-------------------+------------------------+------------+------------+-----------+------------+-----------+-----------+------------+------------+----------+----------+---------------+-----------------+------+-------+--------------+
|DEP_DELAY|arrival_delay|CRS_ELAPSED_TIME|DISTANCE|airline_name|airline_code|FL_NUMBER|origin_airport_code|destination_airport_code|CRS_DEP_TIME|CRS_ARR_TIME|flight_date|flight_month|flight_year|day_of_week|week_of_year|day_of_month|is_weekend|is_holiday|is_near_holiday|is_holiday_period|season|quarter|arrival_status|
+---------+-------------+----------------+--------+------------+------------+---------+-------------------+------------------------+------------+------------+-----------+------------+-----------+-----------+------------+------------+----------+----------+---------------+-----------------+------+-------+--

In [0]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# --- Prerequisite Check ---
# Columns this experiment reads from df_silver: the four categorical inputs,
# the two date parts used as numeric features, and the target label.
required_cols = [
    "airline_name", "airline_code", "origin_airport_code", "destination_airport_code",
    "flight_month", "flight_year", "arrival_status"
]
missing_cols = [c for c in required_cols if c not in df_silver.columns]

# Fail fast. Printing the problem and carrying on would leave
# `df_gold_ml_final` undefined, and the cells that write the Gold table would
# then fail later with an unrelated-looking error.
if missing_cols:
    raise ValueError(f"❌ ERROR: Your DataFrame is missing columns: {missing_cols}")

print("✅ All required columns found. Proceeding with ML pipeline.")

# --- 1. Define Pipeline Stages ---

# Stage 1: Index the STRING categorical features (one StringIndexer per column).
# handleInvalid="keep" is set so invalid values do not abort the stage.
categorical_cols_in = ["airline_name", "airline_code", "origin_airport_code", "destination_airport_code"]
categorical_cols_indexed = [f"{c}_index" for c in categorical_cols_in]

indexers = [
    StringIndexer(inputCol=in_col, outputCol=out_col, handleInvalid="keep")
    for in_col, out_col in zip(categorical_cols_in, categorical_cols_indexed)
]

# Stage 2: One-Hot Encode the INDEXED categorical features
categorical_cols_ohe = [f"{c}_ohe" for c in categorical_cols_in]

ohe_encoder = OneHotEncoder(
    inputCols=categorical_cols_indexed,
    outputCols=categorical_cols_ohe
)

# Stage 3: Index the TARGET LABEL ('arrival_status') into a numeric "label" column
label_indexer = StringIndexer(
    inputCol="arrival_status",
    outputCol="label"
)

# Stage 4: Assemble all features into one vector
# Our feature list is just the date columns + the OHE columns
numeric_feature_cols = [
    "flight_month",
    "flight_year"
]
feature_columns = numeric_feature_cols + categorical_cols_ohe

# NOTE: handleInvalid="skip" silently drops any row with an invalid/null
# feature, so the output can have fewer rows than df_silver.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="unscaled_features",
    handleInvalid="skip"  # Skips rows with any lingering nulls
)

# Stage 5: Scale the feature vector (StandardScaler with its default settings)
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

# --- 2. Create and Run the Pipeline ---
# Stage order matters: the encoder consumes the indexers' output columns and
# the assembler consumes the encoder's output columns.
pipeline = Pipeline(stages=[
    *indexers,    # Unpacks the list of indexers
    ohe_encoder,
    label_indexer,
    assembler,
    scaler
])

# NOTE(review): the saved output of this cell shows pipeline.fit raising a
# SparkException, but the message is truncated there, so the root cause is not
# visible from this notebook — re-run and inspect the full error.
print("Fitting the ML pipeline...")
pipeline_model = pipeline.fit(df_silver)
df_gold_ml = pipeline_model.transform(df_silver)

# --- 3. Final Table Selection ---
# Keep only the two columns a downstream model needs.
df_gold_ml_final = df_gold_ml.select("features", "label")



✅ All required columns found. Proceeding with ML pipeline.
Fitting the ML pipeline...


[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkException[0m                            Traceback (most recent call last)
File [0;32m<command-8141152329916539>, line 70[0m
[1;32m     61[0m pipeline [38;5;241m=[39m Pipeline(stages[38;5;241m=[39m[
[1;32m     62[0m     [38;5;241m*[39mindexers,    [38;5;66;03m# Unpacks the list of indexers[39;00m
[1;32m     63[0m     ohe_encoder,
[0;32m   (...)[0m
[1;32m     66[0m     scaler
[1;32m     67[0m ])
[1;32m     69[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mFitting the ML pipeline...[39m[38;5;124m"[39m)
[0;32m---> 70[0m pipeline_model [38;5;241m=[39m pipeline[38;5;241m.[39mfit(df_silver)
[1;32m     71[0m df_gold_ml [38;5;241m=[39m pipeline_model[38;5;241m.[39mtransform(df_silver)
[1;32m     73[0m [38;5;66;03m# --- 3. Final Table Selection ---[39;00m

File [0;32m/databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0

In [0]:
# This is the final DataFrame from your ML pipeline.
# A bare `assert df_gold_ml_final, "..."` cannot report a missing variable: the
# name lookup raises NameError before the assertion message is ever used (and
# asserts are stripped entirely under `python -O`). Check explicitly instead.
try:
    df_gold_ml_final
except NameError:
    raise NameError("The DataFrame 'df_gold_ml_final' does not exist.") from None
if df_gold_ml_final is None:
    raise ValueError("The DataFrame 'df_gold_ml_final' does not exist.")

# Define the paths for your new Gold ML table
DATABASE_NAME = "default"
if not DATABASE_NAME:
    raise ValueError("DATABASE_NAME is not defined.")

# Storage location of the Delta files.
GOLD_ML_PATH = "/Volumes/workspace/default/ds_capstone/gold/ml_features_experimental"
# Table name, derived from DATABASE_NAME so the two cannot drift apart.
GOLD_ML_TABLE_NAME = f"{DATABASE_NAME}.gold_ml_features_experimental"

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:139)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:139)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
def path_exists(path):
    """Return True if `path` can be listed with dbutils.fs.ls, else False.

    Any ordinary error raised by the listing (missing path, permission
    problem, ...) is reported as "does not exist". Only `Exception` is caught,
    so KeyboardInterrupt / SystemExit still propagate and a cancelled command
    is not mistaken for a missing path.
    """
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

def create_directory_if_not_exists(path):
    """Ensure `path` exists, creating it with dbutils.fs.mkdirs when needed.

    Prints one status line saying whether the directory was created or was
    already there. Returns nothing.
    """
    if path_exists(path):
        print(f"ℹ️  Directory already exists: {path}")
        return

    dbutils.fs.mkdirs(path)
    print(f"✅ Created directory: {path}")

def table_exists(table_name):
    """Return True if `spark.table(table_name)` succeeds, else False.

    Any ordinary error from the lookup is reported as "does not exist". Only
    `Exception` is caught, so KeyboardInterrupt / SystemExit still propagate
    and a cancelled command is not mistaken for a missing table.
    """
    try:
        spark.table(table_name)
        return True
    except Exception:
        return False

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:139)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:139)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# Check if Gold ML path exists and clean if needed.
# If the path holds something that cannot be read as a Delta table, it is
# removed recursively so the write that follows starts from an empty location.
print(f"\n📁 Checking Gold ML path: {GOLD_ML_PATH}")
if path_exists(GOLD_ML_PATH):
    print("⚠️  Path already exists. Checking if it's a valid Delta table...")
    try:
        # Try to read as Delta
        test_df = spark.read.format("delta").load(GOLD_ML_PATH)
        print(f"✅ Valid Delta table found with {test_df.count()} records")
        print("💡 Will overwrite existing table")
    except Exception as e:
        # Catch Exception only (not a bare except): a cancelled command or
        # interpreter exit must never be turned into a recursive delete.
        # The error is printed so the reason for the cleanup is visible.
        print("⚠️  Path exists but is not a valid Delta table")
        print(f"   Error: {str(e)}")
        print("🧹 Cleaning up old data...")
        dbutils.fs.rm(GOLD_ML_PATH, recurse=True)
        print("✅ Old data removed")
else:
    print("✅ Path is clear, ready to create new table")

# Create parent directory if needed (everything before the last "/" of the path)
gold_ml_parent = "/".join(GOLD_ML_PATH.split("/")[:-1])
create_directory_if_not_exists(gold_ml_parent)

# Write Gold ML Delta table.
# Only the write itself sits inside the try block: the retry path deletes
# GOLD_ML_PATH, so an unrelated failure (for example while counting rows) must
# not be able to trigger it after a successful write.
print("\n💾 Writing Gold ML Delta table...")
try:
    df_gold_ml_final.write.format("delta").mode("overwrite").save(GOLD_ML_PATH)
    print(f"✅ Delta table written to: {GOLD_ML_PATH}")
except Exception as e:
    print("❌ ERROR: Could not write Delta table")
    print(f"   Error: {str(e)}")
    print("\n💡 Trying to clean and retry...")
    try:
        # Best-effort recovery: clear the target location and write once more.
        dbutils.fs.rm(GOLD_ML_PATH, recurse=True)
        df_gold_ml_final.write.format("delta").mode("overwrite").save(GOLD_ML_PATH)
        print("✅ Successfully wrote Delta table after cleanup")
    except Exception as e2:
        print(f"❌ Still failed: {str(e2)}")
        raise

# Count what was actually stored by reading the table back, instead of calling
# df_gold_ml_final.count(), which would re-run the whole feature pipeline.
records_written = spark.read.format("delta").load(GOLD_ML_PATH).count()
print(f"✅ Records written: {records_written:,}")

# Registering Delta table.
# First attempt: drop any existing table, then read the Delta files back and
# save them under GOLD_ML_TABLE_NAME with saveAsTable. If that fails, fall back
# to a CREATE TABLE ... LOCATION statement pointing at GOLD_ML_PATH. If both
# fail, the data is still readable directly from the path.
print(f"\n📌 Registering Delta table as: {GOLD_ML_TABLE_NAME}")
try:
    # Ensure database exists
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
    print(f"✅ Database '{DATABASE_NAME}' ready")

    # Drop table if it exists (to avoid conflicts)
    spark.sql(f"DROP TABLE IF EXISTS {GOLD_ML_TABLE_NAME}")
    print("   Dropped existing table (if any)")

    # Create managed table (saveAsTable writes the rows again under the table)
    df_for_table = spark.read.format("delta").load(GOLD_ML_PATH)
    df_for_table.write.format("delta").mode("overwrite").saveAsTable(GOLD_ML_TABLE_NAME)

    print(f"✅ Table registered successfully as '{GOLD_ML_TABLE_NAME}'!")
except Exception as e:
    print("⚠️  Could not create table with saveAsTable, trying alternative method...")
    # Show why the first attempt failed instead of discarding the error.
    print(f"   Error: {str(e)}")
    try:
        # Alternative: Create external table with explicit LOCATION
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {GOLD_ML_TABLE_NAME}
            USING DELTA
            LOCATION '{GOLD_ML_PATH}'
        """)
        print("✅ Table registered with LOCATION clause!")
    except Exception as e2:
        # Deliberately not re-raised: registration is a convenience, the data
        # itself has already been written.
        print(f"⚠️  Table registration failed: {str(e2)}")
        print("💡 You can still access the data directly using:")
        print(f"   spark.read.format('delta').load('{GOLD_ML_PATH}')")

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:440)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:470)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:768)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:510)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:616)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:643)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:80)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:348)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59)
	at com.databricks.logging.AttributionContext$.withValue(Attr