In [0]:
import mlflow
from pyspark.sql import functions as F
from pyspark.sql.window import Window

mlflow.set_experiment("/Shared/air_quality_forecasting")

df = spark.table("workspace.air_quality.silver_air_quality")

base = (df
  .filter(F.col("pm25_ugm3").isNotNull() & (~F.isnan("pm25_ugm3")))
  .filter(F.col("pm25_ugm3") > 0)
  .filter(F.col("date").isNotNull() & F.col("city_code").isNotNull())
)

w = Window.partitionBy("city_code").orderBy("date")

ml_df = (base
  .withColumn("day_of_week", F.dayofweek("date").cast("double"))
  .withColumn("month", F.month("date").cast("double"))
  .withColumn("pm25_lag1", F.lag("pm25_ugm3", 1).over(w))
  .filter(F.col("pm25_lag1").isNotNull())
  .select(
      "pm25_ugm3",                       
      "pm25_lag1", "temp_c", "relative_humidity_pct", "wind_speed_ms",
      "day_of_week", "month"
  )
)

train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)
print("Train/Test:", train_df.count(), test_df.count())

Train/Test: 350701 87181


In [0]:
%sql
CREATE VOLUME IF NOT EXISTS workspace.air_quality.mlflow_artifacts;

SHOW VOLUMES IN workspace.air_quality;

database,volume_name
air_quality,data
air_quality,mlflow_artifacts


In [0]:
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
import mlflow
from mlflow.models import infer_signature
import pandas as pd
import numpy as np
import os

# Serverless compute environment variables
os.environ['SPARKML_TEMP_DFS_PATH'] = '/Volumes/workspace/air_quality/mlflow_artifacts/tmp'
os.environ['MLFLOW_DFS_TMP'] = '/Volumes/workspace/air_quality/mlflow_artifacts/tmp'

feature_cols = ["pm25_lag1", "temp_c", "relative_humidity_pct", "wind_speed_ms", "day_of_week", "month"]

print("🔧 CLEANING: Filter PM2.5 > 2 µg/m³ (remove sensor noise)")
train_df_clean = (train_df
                  .filter(F.col("pm25_ugm3") > 2.0)
                  .withColumn("pm25_log", F.log1p(F.col("pm25_ugm3"))))

test_df_clean = (test_df
                 .filter(F.col("pm25_ugm3") > 2.0)
                 .withColumn("pm25_log", F.log1p(F.col("pm25_ugm3"))))

print(f"✅ Clean dataset: Train {train_df_clean.count():,} | Test {test_df_clean.count():,} rows")

# Production pipeline
imputer = Imputer(inputCols=feature_cols, outputCols=[c + "_imp" for c in feature_cols]).setStrategy("median")
assembler = VectorAssembler(inputCols=[c + "_imp" for c in feature_cols], outputCol="features", handleInvalid="skip")
scaler = StandardScaler(inputCol="features", outputCol="features_scaled", withStd=True, withMean=True)

gbt = GBTRegressor(
    featuresCol="features_scaled", 
    labelCol="pm25_log",
    maxIter=20,
    maxDepth=4, 
    stepSize=0.2,
    seed=42
)

pipeline = Pipeline(stages=[imputer, assembler, scaler, gbt])

# Signature
sample_input = spark.createDataFrame([(45.2, 28.5, 65.0, 2.1, 3.0, 2.0)], feature_cols).toPandas()
sample_output = pd.DataFrame({"prediction": [3.88]})
signature = infer_signature(sample_input, sample_output)

print("🚀 Training serverless model...")

# Train & register
with mlflow.start_run(run_name="pm25_serverless_fixed_v1"):
    model = pipeline.fit(train_df_clean)
    
    pred_log = model.transform(test_df_clean)
    
    # Log-scale metrics
    log_metrics = {}
    for metric in ["rmse", "mae", "r2"]:
        score = RegressionEvaluator(labelCol="pm25_log", predictionCol="prediction", metricName=metric).evaluate(pred_log)
        mlflow.log_metric(f"log_{metric}", score)
        log_metrics[metric] = score
    
    # Original scale metrics - FIXED Column callable errors
    pred_orig = pred_log.withColumn("pred_pm25", F.exp(pred_log["prediction"]) - 1)
    
    rmse_orig = RegressionEvaluator(labelCol="pm25_ugm3", predictionCol="pred_pm25", metricName="rmse").evaluate(pred_orig)
    mae_orig = RegressionEvaluator(labelCol="pm25_ugm3", predictionCol="pred_pm25", metricName="mae").evaluate(pred_orig)
    
    # FIXED: Safe MAPE calculation using expr
    mape_orig = pred_orig.agg(
        F.expr("avg(abs((pm25_ugm3 - pred_pm25)/(pm25_ugm3 + 0.1))) * 100")
    ).collect()[0][0]
    
    mlflow.log_metric("rmse_pm25", rmse_orig)
    mlflow.log_metric("mae_pm25", mae_orig)
    mlflow.log_metric("mape_pm25", mape_orig)
    
    # Register model
    mlflow.spark.log_model(
        model, 
        "pm25_serverless_fixed",
        signature=signature,
        input_example=sample_input,
        registered_model_name="workspace.air_quality.pm25_forecast_serverless_fixed_v1"
    )
    
    print(f"\n🎯 PRODUCTION METRICS (PM2.5 > 2 µg/m³):")
    print(f"   Log RMSE:     {log_metrics['rmse']:.3f}")
    print(f"   PM2.5 RMSE:   {rmse_orig:.1f} µg/m³")
    print(f"   PM2.5 MAE:    {mae_orig:.1f} µg/m³")
    print(f"   PM2.5 MAPE:   {mape_orig:.1f}%")
    print(f"   R² (log):     {log_metrics['r2']:.3f}")
    print("✅ SERVERLESS MODEL REGISTERED!")
    
    # FIXED: Safe display (no callable column errors)
    display(pred_orig
        .select(
            F.col("pm25_ugm3").alias("actual_pm25"),
            F.round(F.col("pred_pm25"), 1).alias("predicted_pm25"),
            F.round(F.abs(F.col("pm25_ugm3") - F.col("pred_pm25")), 1).alias("abs_error"),
            F.round(
                F.when(F.col("pm25_ugm3") > 1, 
                       (F.abs(F.col("pm25_ugm3") - F.col("pred_pm25")) / F.col("pm25_ugm3")) * 100)
                .otherwise(None), 1
            ).alias("mape_%"),
            F.when(F.col("pm25_ugm3") > 100, F.lit("POOR"))
             .when(F.col("pm25_ugm3") > 50, F.lit("MODERATE"))
             .otherwise(F.lit("GOOD")).alias("aqi_category")
        )
        .filter(F.col("pm25_ugm3").between(2, 150))
        .orderBy("actual_pm25")
        .limit(20))

🔧 CLEANING: Filter PM2.5 > 2 µg/m³ (remove sensor noise)
✅ Clean dataset: Train 348,472 | Test 86,641 rows
🚀 Training serverless model...


Successfully registered model 'workspace.air_quality.pm25_forecast_serverless_fixed_v1'.
Created version '1' of model 'workspace.air_quality.pm25_forecast_serverless_fixed_v1'.



🎯 PRODUCTION METRICS (PM2.5 > 2 µg/m³):
   Log RMSE:     0.536
   PM2.5 RMSE:   56.4 µg/m³
   PM2.5 MAE:    28.4 µg/m³
   PM2.5 MAPE:   49.6%
   R² (log):     0.682
✅ SERVERLESS MODEL REGISTERED!


actual_pm25,predicted_pm25,abs_error,mape_%,aqi_category
2.03,11.1,9.0,445.4,GOOD
2.04,9.3,7.3,357.1,GOOD
2.04,26.3,24.3,1190.6,GOOD
2.04,9.1,7.1,346.3,GOOD
2.04,11.4,9.4,459.0,GOOD
2.04,10.9,8.9,436.0,GOOD
2.04,8.2,6.1,301.0,GOOD
2.04,13.8,11.7,575.5,GOOD
2.05,9.3,7.3,355.7,GOOD
2.06,9.2,7.1,344.8,GOOD
