# In [0]:
# ============================================================
# RQ5 — NYC Tail Risk (Mean vs P90/P95 + Predictive Tail Proxy)
# Databricks Notebook (Python) — FULL CODE (copy/paste)
#
# What this produces:
# A) Overall NYC tail metrics: mean, P90, P95, P(response>8)
# B) Tail metrics by hour (table for dashboard/report)
# C) Predictive tail proxy using GBTRegressor:
#    compare actual P90 vs predicted P90 on test set
#
# Notes:
# - This is NOT true quantile regression; it is a tail proxy.
# - Works on Serverless (no persist/cache required).
# ============================================================

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline

# -----------------------------
# 0) Load the NYC modelling table
# -----------------------------
TABLE_NYC = "workspace.capstone_project.nyc_model_ready"

# Every metric below is computed on response_minutes, so rows where it is
# null are removed once here rather than in each query.
has_response_time = F.col("response_minutes").isNotNull()
df = spark.table(TABLE_NYC).where(has_response_time)

row_count = df.count()
print("NYC rows (non-null response_minutes):", row_count)

# Print Spark's summary statistics for the target column as a sanity check.
target_only = df.select("response_minutes")
target_only.summary().show()

# -----------------------------
# 1) Part A — Observed tail metrics across all NYC rows
# -----------------------------
# One aggregate row: mean, approximate P90 and P95, and the fraction of
# responses that took longer than 8 minutes.
overall_tail_sql = """
SELECT
  AVG(response_minutes) AS mean_minutes,
  percentile_approx(response_minutes, 0.90) AS p90_minutes,
  percentile_approx(response_minutes, 0.95) AS p95_minutes,
  AVG(CASE WHEN response_minutes > 8 THEN 1 ELSE 0 END) AS prob_over_8
FROM nyc
"""

# Register the filtered data under the view name the SQL reads from; the
# per-hour query later in the notebook uses the same "nyc" view.
df.createOrReplaceTempView("nyc")
tail_overall = spark.sql(overall_tail_sql)

overall_banner = "\n=== NYC Overall Tail Metrics ==="
print(overall_banner)
tail_overall.show(truncate=False)

# -----------------------------
# 2) Part B — The same tail metrics, broken down by hour
# -----------------------------
# Reads the "nyc" temp view and returns one row per distinct hour value,
# ordered by hour, with mean / approximate P90 / approximate P95 / share of
# responses over 8 minutes.
hourly_tail_sql = """
SELECT
  hour,
  AVG(response_minutes) AS mean_minutes,
  percentile_approx(response_minutes, 0.90) AS p90_minutes,
  percentile_approx(response_minutes, 0.95) AS p95_minutes,
  AVG(CASE WHEN response_minutes > 8 THEN 1 ELSE 0 END) AS prob_over_8
FROM nyc
GROUP BY hour
ORDER BY hour
"""
tail_by_hour = spark.sql(hourly_tail_sql)

hourly_banner = "\n=== NYC Tail Metrics by Hour ==="
print(hourly_banner)
# Rendered with the notebook's display() so it can be charted or exported.
display(tail_by_hour)

# -----------------------------
# 3) Part C — Predictive tail proxy
#    A GBTRegressor is fit to response_minutes on a training split.
#    The P90/P95 of its predictions on the test split are then placed
#    next to the P90/P95 of the actual test-split values.
# -----------------------------
feature_cols = ["hour", "day_of_week", "calls_past_30min"]  # regressor inputs

# Narrow to the model columns, then discard any row with a null in them.
df_model = df.select(*feature_cols, "response_minutes").dropna()

# Seeded 80/20 train/test split.
split_weights = [0.8, 0.2]
train_df, test_df = df_model.randomSplit(split_weights, seed=42)

# Packs the feature columns into the single vector column the regressor reads.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

gbt_params = dict(
    labelCol="response_minutes",
    featuresCol="features",
    maxIter=40,
    maxDepth=5,
    stepSize=0.05,
    seed=42,
)
gbt = GBTRegressor(**gbt_params)

pipeline = Pipeline(stages=[assembler, gbt])

print("\nTraining GBTRegressor tail proxy...")
model_tail = pipeline.fit(train_df)
pred = model_tail.transform(test_df)

# Make the scored test rows queryable from SQL.
pred.createOrReplaceTempView("preds")

# Actual vs predicted tail on the test split only. The predicted percentiles
# are taken over the model's point predictions (a proxy, not quantile
# regression).
tail_compare_sql = """
SELECT
  AVG(response_minutes) AS actual_mean_test,
  percentile_approx(response_minutes, 0.90) AS actual_p90_test,
  percentile_approx(response_minutes, 0.95) AS actual_p95_test,
  percentile_approx(prediction, 0.90) AS predicted_p90_test,
  percentile_approx(prediction, 0.95) AS predicted_p95_test
FROM preds
"""
tail_compare = spark.sql(tail_compare_sql)

compare_banner = "\n=== NYC Predictive Tail Comparison (Test Set) ==="
print(compare_banner)
tail_compare.show(truncate=False)

# -----------------------------
# 4) Optional: persist the fitted pipeline model
#    (leave disabled unless the environment allows writing to UC Volumes)
# -----------------------------
SAVE_MODEL = False  # flip to True to write the model out
if SAVE_MODEL:
    save_path = "/Volumes/workspace/capstone_project/models/rq5_tail_proxy_gbt_nyc"
    # overwrite() is requested so a model already at this path is replaced.
    model_writer = model_tail.write().overwrite()
    model_writer.save(save_path)
    print("Saved NYC RQ5 tail proxy model to:", save_path)

# -----------------------------
# 5) Optional: Build a single summary row for the report
# -----------------------------
# Combines the Part A aggregates with the Part C predicted percentiles into
# one compact row for export/reporting. Nothing is written to storage here;
# the row is only displayed.
#
# NOTE: mean_minutes / p90_minutes / p95_minutes / prob_over_8 come from
# tail_overall (all rows of the "nyc" view), whereas predicted_p90_test /
# predicted_p95_test come from tail_compare (test split only). For a
# like-for-like actual-vs-predicted check use the tail_compare table itself.
overall_row = tail_overall.first()
compare_row = tail_compare.first()


def _as_float(row, field):
    """Return row[field] as a float, or None if the row or value is missing.

    SQL aggregates over zero rows yield NULL (e.g. an empty test split), and
    float(None) would raise TypeError, so NULLs are passed through as None.
    """
    if row is None or row[field] is None:
        return None
    return float(row[field])


summary = [(
    "NYC",
    _as_float(overall_row, "mean_minutes"),
    _as_float(overall_row, "p90_minutes"),
    _as_float(overall_row, "p95_minutes"),
    _as_float(overall_row, "prob_over_8"),
    _as_float(compare_row, "predicted_p90_test"),
    _as_float(compare_row, "predicted_p95_test"),
)]

# Explicit schema (same names and types the inferred schema would have):
# type inference cannot determine a column type when its only value is None.
summary_schema = (
    "city string, mean_minutes double, p90_minutes double, "
    "p95_minutes double, prob_over_8 double, "
    "predicted_p90_test double, predicted_p95_test double"
)
summary_df = spark.createDataFrame(summary, schema=summary_schema)

print("\n=== NYC RQ5 Final Summary Row ===")
display(summary_df)
