In [None]:
# Cell 1: Imports + params
from datetime import timedelta
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Run parameters (tune here rather than inside later cells).
forecast_days: int = 30             # NOTE(review): not referenced in the cells shown; presumably a forecast horizon for a later step
horizon_valid_days: int = 14        # trailing validation window (days) for Cell 3's temporal split; also its min-date threshold
model_version: str = "gbt_risk_v2"  # NOTE(review): not referenced in the cells shown; presumably a tag for persisted outputs

In [None]:
# Cell 2: Base data hygiene (preview of the daily snapshot table)
#
# Reads the daily risk table by the same fully-qualified name Cell 3 uses for its
# fallback, so the preview and the training data cannot silently resolve to
# different schemas. Casts the numeric columns to double and keeps only rows
# with a 3-letter upper-case ISO3 code, a parseable date and a non-null
# risk_score.
# Note: Cell 3 rebuilds `base` from its own source table, so this cell is
# inspection only.
base = (
    spark.table("workspace.default.gold_country_risk_daily")
    .select("iso3", "as_of_date", "risk_score", "funding_gap_ratio", "in_need_ratio", "flood_area_pct")
    .withColumn("as_of_date", F.to_date("as_of_date"))
    .withColumn("risk_score", F.col("risk_score").cast("double"))
    .withColumn("funding_gap_ratio", F.col("funding_gap_ratio").cast("double"))
    .withColumn("in_need_ratio", F.col("in_need_ratio").cast("double"))
    .withColumn("flood_area_pct", F.col("flood_area_pct").cast("double"))
    .filter(F.col("iso3").rlike("^[A-Z]{3}$"))
    .filter(F.col("as_of_date").isNotNull())
    .filter(F.col("risk_score").isNotNull())
)
display(base.limit(20))
print("rows:", base.count())

In [None]:
# Cell 3: Feature engineering + robust split (auto lag/no-lag)
#
# Outputs consumed by Cell 4: `feature_cols`, `train_df`, `valid_df`.
# Also sets `source_table`, `base`, `feat`, `max_date`, `split_date`,
# `train_count`, `valid_count` for reporting.
#
# Steps:
#   1. Read the history table; if that fails, fall back to the daily table.
#   2. Clean / cast the same columns Cell 2 previews.
#   3. Features: calendar fields and exogenous ratios (null -> 0.0) always;
#      per-country lag / rolling features only when there are enough dates.
#   4. Split: trailing `horizon_valid_days` window as validation when date
#      coverage allows AND both sides are non-empty; otherwise a seeded 80/20
#      random split (split_date is then None).

# --- 1. Load ---------------------------------------------------------------
source_table = "workspace.default.gold_country_risk_history"
try:
    raw = spark.table(source_table)
except Exception as exc:
    # Deliberate best-effort fallback, but report why instead of hiding it.
    print(f"Could not read {source_table}; falling back to daily table:", str(exc)[:400])
    source_table = "workspace.default.gold_country_risk_daily"
    raw = spark.table(source_table)

# --- 2. Clean --------------------------------------------------------------
base = (
    raw
    .select(
        "iso3", "as_of_date", "risk_score",
        "flood_area_pct", "in_need_ratio", "funding_gap_ratio"
    )
    .withColumn("as_of_date", F.to_date("as_of_date"))
    .withColumn("risk_score", F.col("risk_score").cast("double"))
    .withColumn("flood_area_pct", F.col("flood_area_pct").cast("double"))
    .withColumn("in_need_ratio", F.col("in_need_ratio").cast("double"))
    .withColumn("funding_gap_ratio", F.col("funding_gap_ratio").cast("double"))
    .filter(F.col("iso3").rlike("^[A-Z]{3}$"))
    .filter(F.col("as_of_date").isNotNull())
    .filter(F.col("risk_score").isNotNull())
)

base_rows = base.count()
distinct_dates = base.select("as_of_date").distinct().count()
print("source_table:", source_table)
print("base rows:", base_rows)
print("distinct dates:", distinct_dates)
base.groupBy("iso3").agg(F.count("*").alias("n")).orderBy(F.col("n").asc()).show(10, False)

if base_rows == 0:
    raise ValueError("No base rows available in source table.")

# --- 3. Features -----------------------------------------------------------
# Need at least 8 dates to make lag7 and rolling stats meaningful.
use_lag_features = distinct_dates >= 8
print("use_lag_features:", use_lag_features)

exog_cols = ["funding_gap_ratio", "in_need_ratio", "flood_area_pct"]
calendar_cols = ["day_of_week", "month"]

# Features shared by both modes: calendar fields, plus exogenous ratios with
# missing values treated as 0.0.
feat = (
    base
    .withColumn("day_of_week", F.dayofweek("as_of_date").cast("double"))
    .withColumn("month", F.month("as_of_date").cast("double"))
    .withColumn("flood_area_pct", F.coalesce(F.col("flood_area_pct"), F.lit(0.0)))
    .withColumn("in_need_ratio", F.coalesce(F.col("in_need_ratio"), F.lit(0.0)))
    .withColumn("funding_gap_ratio", F.coalesce(F.col("funding_gap_ratio"), F.lit(0.0)))
)

if use_lag_features:
    # NOTE(review): lags/windows count ROWS per country, not calendar days, so
    # `risk_lag7` means "7 observations back". It equals 7 days only if each
    # country has one row per day with no gaps. Duplicate (iso3, as_of_date)
    # rows would make lag1 point at a same-day row; confirm the source is
    # unique on that key.
    w = Window.partitionBy("iso3").orderBy("as_of_date")
    prev7 = w.rowsBetween(-7, -1)  # the (up to) 7 rows before the current one
    feat = (
        feat
        .withColumn("risk_lag1", F.lag("risk_score", 1).over(w))
        .withColumn("risk_lag7", F.lag("risk_score", 7).over(w))
        .withColumn("risk_ma7", F.avg("risk_score").over(prev7))
        .withColumn("risk_std7", F.coalesce(F.stddev("risk_score").over(prev7), F.lit(0.0)))
        # Short histories: back-fill the longer lags from lag1.
        .withColumn("risk_lag7", F.coalesce(F.col("risk_lag7"), F.col("risk_lag1")))
        .withColumn("risk_ma7", F.coalesce(F.col("risk_ma7"), F.col("risk_lag1")))
        # Drops each country's first row (no previous observation).
        .filter(F.col("risk_lag1").isNotNull())
    )
    feature_cols = ["risk_lag1", "risk_lag7", "risk_ma7", "risk_std7"] + exog_cols + calendar_cols
else:
    # Fallback for short history (e.g., single-date snapshots).
    feature_cols = exog_cols + calendar_cols

feat_count = feat.count()
print("feat rows:", feat_count)
if feat_count == 0:
    raise ValueError("No usable rows after feature engineering.")

max_date = feat.agg(F.max("as_of_date").alias("mx")).first()["mx"]
if max_date is None:
    raise ValueError("No max_date found from features.")

# --- 4. Split --------------------------------------------------------------
# Prefer a temporal split: the trailing `horizon_valid_days` window is
# validation. `distinct_dates` is counted on `base`, but the lag filter above
# drops each country's first row. So the training side can still come out
# empty (e.g. all countries start on the oldest date). Check both sides and
# fall back rather than fail.
split_date = None
if distinct_dates > horizon_valid_days:
    split_date = max_date - timedelta(days=horizon_valid_days)
    train_df = feat.filter(F.col("as_of_date") <= F.lit(split_date))
    valid_df = feat.filter(F.col("as_of_date") > F.lit(split_date))
    train_count = train_df.count()
    valid_count = valid_df.count()
    if train_count == 0 or valid_count == 0:
        print(
            f"Temporal split at {split_date} left an empty side "
            f"(train={train_count}, valid={valid_count}); falling back to random split."
        )
        split_date = None  # report honestly that no temporal split was used

if split_date is None:
    # Seeded 80/20 split. Re-running it with the same seed is expected to
    # reproduce the same partition, so an empty side here is not retried.
    train_df, valid_df = feat.randomSplit([0.8, 0.2], seed=42)
    train_count = train_df.count()
    valid_count = valid_df.count()

if train_count == 0 or valid_count == 0:
    raise ValueError(f"Insufficient train/valid rows. train={train_count}, valid={valid_count}")

print("max_date:", max_date, "split_date:", split_date)
print("feature_cols:", feature_cols)
print("train rows:", train_count, "valid rows:", valid_count)

In [None]:
# Cell 4: Train ML model with fallback + evaluation
#
# Uses `feature_cols`, `train_df`, `valid_df` from Cell 3. The steps are:
#   - Fit VectorAssembler -> GBTRegressor.
#   - If the GBT fit raises, fall back to an elastic-net LinearRegression.
#   - Score `valid_df` and clamp predictions to [0, 1].
#   - Report RMSE / MAE.

min_train_rows = 30  # refuse to fit on fewer training rows than this

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    # Rows with null/NaN feature values are DROPPED, in training and in scoring.
    handleInvalid="skip"
)

gbt = GBTRegressor(
    labelCol="risk_score",
    featuresCol="features",
    predictionCol="prediction",
    maxIter=120,
    maxDepth=5,
    stepSize=0.05,   # learning-rate style control (smaller = slower, steadier updates)
    subsamplingRate=0.8,
    seed=42
)

lr = LinearRegression(
    labelCol="risk_score",
    featuresCol="features",
    predictionCol="prediction",
    maxIter=200,
    regParam=0.05,
    elasticNetParam=0.2
)

# Recounted here so this cell stands on its own after Cell 3.
train_count = train_df.count()
if train_count < min_train_rows:
    raise ValueError(f"Too few training rows for ML model. train rows={train_count}")

model_name = "gbt"
try:
    model = Pipeline(stages=[assembler, gbt]).fit(train_df)
except Exception as e:
    # Deliberate fallback: keep the run going with a simpler model, but show why.
    print("GBT fit failed; falling back to LinearRegression:", str(e)[:400])
    model = Pipeline(stages=[assembler, lr]).fit(train_df)
    model_name = "linear_regression"

valid_pred = model.transform(valid_df)

# handleInvalid="skip" silently drops validation rows with invalid features, so
# the metrics could otherwise cover fewer rows than valid_df without any signal.
valid_total = valid_df.count()
scored_count = valid_pred.count()
if scored_count == 0:
    raise ValueError(f"No validation rows left after feature assembly (valid rows={valid_total}).")
if scored_count < valid_total:
    print(f"Warning: {valid_total - scored_count} of {valid_total} validation rows skipped (invalid features).")

# Clamp predictions into risk range.
# NOTE(review): assumes risk_score lives in [0, 1]. Confirm against the gold
# table; clamping also affects the reported metrics.
valid_pred = valid_pred.withColumn(
    "prediction",
    F.when(F.col("prediction") < 0, F.lit(0.0))
     .when(F.col("prediction") > 1, F.lit(1.0))
     .otherwise(F.col("prediction"))
)

rmse = RegressionEvaluator(labelCol="risk_score", predictionCol="prediction", metricName="rmse").evaluate(valid_pred)
mae = RegressionEvaluator(labelCol="risk_score", predictionCol="prediction", metricName="mae").evaluate(valid_pred)

print("model_used:", model_name)
print("validation rmse:", round(rmse, 5), "mae:", round(mae, 5))

display(valid_pred.select("iso3", "as_of_date", "risk_score", "prediction").orderBy("iso3", "as_of_date"))