In [None]:
import json
import uuid
from datetime import datetime
from snowflake.snowpark.context import get_active_session
session = get_active_session()


In [None]:
# === PARAMETERS ===
# Notebook-wide configuration. Later cells read these names directly, so change values here
# rather than re-assigning them further down.
RUN_ID = None  # set to a specific run_id string if you want; otherwise we'll auto-pick latest SUCCEEDED
# Scope labels. NOTE(review): neither label is referenced by any other cell visible in this notebook.
ASOF_SCOPE = "GLOBAL__WAPE_MICRO_OVERALL"
OVERRIDE_SCOPE = "PC_REASON__WAPE_MICRO_OVERALL__OVERRIDE5PCT"
# Scale passed as `eps` to signed_log1p / signed_expm1 in the backtest loop.
EPS = 100.0
# Written to FORECAST_MODEL_RUNS.max_horizon when candidates are registered.
MAX_HORIZON = 12
EVAL_ANCHORS = 12               # last 12 anchors (months) to backtest
OVERRIDE_MIN_REL_IMPROV = 0.05  # 5%
MAPE_EPSILON = 100              # per your decision
BIAS_MAX_ABS = 0.02             # 2% guardrail (tunable)


In [None]:
from snowflake.snowpark.functions import col
import pandas as pd
import uuid
from datetime import datetime

# Resolve the forecast run this notebook works on.
# Snowflake notebook usually provides `session`.
if RUN_ID is None:
    # No explicit run requested: fall back to the most recently triggered SUCCEEDED run.
    latest_run = session.sql("""
      select run_id
      from DB_BI_P_SANDBOX.SANDBOX.FORECAST_RUNS
      where status = 'SUCCEEDED'
      order by triggered_at desc
      limit 1
    """).to_pandas()
    if latest_run.empty:
        # Fail with an actionable message instead of an IndexError from .iloc[0].
        raise ValueError(
            "No SUCCEEDED run found in FORECAST_RUNS; set RUN_ID explicitly in the parameters cell."
        )
    RUN_ID = latest_run.iloc[0]["RUN_ID"]

# Look up the as-of fiscal month of the chosen run; it is interpolated into later SQL.
asof_lookup = session.sql(f"""
  select asof_fiscal_yyyymm
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_RUNS
  where run_id = '{RUN_ID}'
""").to_pandas()
if asof_lookup.empty:
    raise ValueError(f"run_id '{RUN_ID}' was not found in FORECAST_RUNS.")

asof = asof_lookup.iloc[0]["ASOF_FISCAL_YYYYMM"]
if pd.isna(asof):
    # A NULL here would be rendered as the text "None" inside the registration INSERT.
    raise ValueError(f"run_id '{RUN_ID}' has no asof_fiscal_yyyymm in FORECAST_RUNS.")

print("RUN_ID:", RUN_ID)
print("ASOF:", asof)


In [None]:
# Register the experiment row once. The MERGE below has no WHEN MATCHED branch, so an
# existing row with this experiment_id is left untouched on re-runs.
EXPERIMENT_ID = "EXP_GLOBAL_3A_V1"
# NOTE(review): `now` is not read anywhere in this cell; the backtest cell assigns its own `now`.
now = datetime.utcnow()

# Doubled braces in the f-string produce literal JSON braces; MAPE_EPSILON and
# OVERRIDE_MIN_REL_IMPROV are interpolated into the `tags` JSON.
session.sql(f"""
merge into DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_EXPERIMENTS t
using (select '{EXPERIMENT_ID}' experiment_id) s
on t.experiment_id = s.experiment_id
when not matched then insert (experiment_id, experiment_name, experiment_desc, created_by, created_at, tags)
values (
  '{EXPERIMENT_ID}',
  'Global candidates + per-series overrides (3A)',
  'Train global candidate models; choose global champion + per-series overrides by WAPE improvement.',
  current_user(), current_timestamp(),
  parse_json('{{"type":"3A","mape_epsilon":{MAPE_EPSILON},"override_min_rel_improv":{OVERRIDE_MIN_REL_IMPROV}}}')
);
""").collect()

def new_model_run_id():
    """Return a fresh random (version 4) UUID rendered as a string."""
    generated = uuid.uuid4()
    return str(generated)

# Candidate model specs; every entry gets its own FORECAST_MODEL_RUNS row below.
CANDIDATES = [
    # baseline: seasonal naive (computed in SQL; no sklearn needed)
    dict(name="SEASONAL_NAIVE_LAG12", family="baseline", params=dict(kind="seasonal_naive_lag12")),
    # ridge regression
    dict(name="RIDGE_OHE", family="ridge", params=dict(alpha=1.0, target_transform="signed_log1p")),
    # gradient boosting (sklearn)
    dict(name="GBR_OHE", family="gbr", params=dict(target_transform="signed_log1p")),
]

# model_runs pairs each candidate spec with the model_run_id registered for it.
model_runs = []
for candidate in CANDIDATES:
    run_uuid = new_model_run_id()
    model_runs.append((candidate, run_uuid))

    # Assemble the params JSON: candidate name first, then its own params, then the
    # notebook-level evaluation settings.
    family = candidate["family"]
    params_payload = {"candidate": candidate["name"]}
    params_payload.update(candidate["params"])
    params_payload["mape_epsilon"] = MAPE_EPSILON
    params_payload["eval_anchors"] = EVAL_ANCHORS
    params_json = json.dumps(params_payload)

    session.sql(f"""
      insert into DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_RUNS
      (model_run_id, run_id, asof_fiscal_yyyymm, experiment_id, model_scope, model_family, feature_set_id,
       target_name, max_horizon, params, training_env, status, started_at, updated_at)
      select
        '{run_uuid}', '{RUN_ID}', {asof}, '{EXPERIMENT_ID}', 'GLOBAL',
        '{family}', 'fs_v1', 'TOTAL_REVENUE', {MAX_HORIZON},
        parse_json('{params_json}'),
        parse_json('{{"runner":"snowflake_notebook"}}'),
        'STARTED', current_timestamp(), current_timestamp();
    """).collect()

print("Registered model_run_ids:")
for registered_candidate, registered_id in model_runs:
    print(registered_candidate["name"], registered_id)



In [None]:
import numpy as np
import json
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import Ridge
from sklearn.ensemble import GradientBoostingRegressor

# Pull the full modelling snapshot for this run into pandas.
ds = session.sql(f"""
  select *
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_DATASET_PC_REASON_H_SNAP
  where run_id = '{RUN_ID}'
""").to_pandas()

# ---- Column map (case-safe): lowercase name -> name as it appears in the frame ----
lower_cols = {}
for actual_name in ds.columns:
    lower_cols[actual_name.lower()] = actual_name

# ---- Required ID columns (use actual column names from the table) ----
required = [
    "roll_up_shop",
    "reason_group",
    "anchor_fiscal_yyyymm",
    "anchor_month_seq",
    "target_fiscal_yyyymm",
    "target_month_seq",
    "horizon",
    "run_id",
]
missing = [name for name in required if name not in lower_cols]
if missing:
    raise ValueError(f"Dataset snap missing required columns: {missing}")

id_cols = [lower_cols[name] for name in required]

# ---- Target column (explicit) ----
y_col = "Y_REVENUE"
if y_col not in ds.columns:
    raise ValueError(f"Target column '{y_col}' not found. Available columns: {ds.columns.tolist()}")

print("Using y_col:", y_col)
print("ID cols:", id_cols)

# ---- Feature selection ----
# Everything that is not an ID, the target, or a bookkeeping column is a candidate feature.
exclude_cols = set(id_cols) | {y_col, "BUILT_AT", "ROW_HASH"}

# Optional: exclude BUDGET_TARGET from features to prevent "cheating" during backtest
# if budget is not truly known at prediction time in your process. If budget is always known, remove this line.
# exclude_cols.add("BUDGET_TARGET")

feature_cols = [name for name in ds.columns if name not in exclude_cols]

# Split by dtype: object-typed columns are treated as categorical, the rest as numeric.
cat_cols, num_cols = [], []
for feature in feature_cols:
    bucket = cat_cols if ds[feature].dtype == "object" else num_cols
    bucket.append(feature)

# GLOBAL models need the series identifiers as categorical inputs even though they are ID columns.
for series_key in ("ROLL_UP_SHOP", "REASON_GROUP"):
    if series_key not in ds.columns or series_key in cat_cols:
        continue
    cat_cols.append(series_key)
    if series_key in num_cols:
        num_cols.remove(series_key)

print("Numeric features:", len(num_cols))
print("Categorical features:", len(cat_cols))
print("Example numeric cols:", num_cols[:10])
print("Example categorical cols:", cat_cols[:10])


In [None]:
# HORIZON is an ID column, but a single global model across horizons needs it as a numeric input.
num_cols += [] if "HORIZON" in num_cols else ["HORIZON"]

# Defensive null fill: 0 for numeric inputs, the "UNKNOWN" label for categorical ones.
for _fill_cols, _fill_value in ((num_cols, 0), (cat_cols, "UNKNOWN")):
    ds[_fill_cols] = ds[_fill_cols].fillna(_fill_value)

print(f"Numeric features (post): {len(num_cols)}")
print(f"Categorical features (post): {len(cat_cols)}")


In [None]:
import numpy as np

def signed_log1p(x, eps: float):
    """
    Sign-preserving log compression: sign(x) * log1p(|x| / eps).

    x is converted to a float ndarray; eps is the positive scale that sets
    how strongly magnitudes are compressed. Raises ValueError when eps is
    None or not positive.
    """
    if eps is None or eps <= 0:
        raise ValueError("eps must be > 0")

    values = np.asarray(x, dtype=float)
    compressed = np.log1p(np.abs(values) / eps)
    return np.sign(values) * compressed

def signed_expm1(y, eps: float):
    """
    Undo signed_log1p: sign(y) * eps * expm1(|y|).

    y is converted to a float ndarray. Raises ValueError when eps is None
    or not positive.
    """
    if eps is None or eps <= 0:
        raise ValueError("eps must be > 0")

    transformed = np.asarray(y, dtype=float)
    signs = np.sign(transformed)
    return signs * eps * np.expm1(np.abs(transformed))

def signed_log10_1p(x, eps: float):
    """
    Base-10 variant of the signed log transform (optional, not used yet):
      sign(x) * log10(1 + |x| / eps)

    x is converted to a float ndarray. Raises ValueError when eps is None
    or not positive.
    """
    if eps is None or eps <= 0:
        raise ValueError("eps must be > 0")

    values = np.asarray(x, dtype=float)
    scaled = np.abs(values) / eps
    return np.sign(values) * np.log10(1.0 + scaled)

def signed_pow10_m1(y, eps: float):
    """
    Undo signed_log10_1p: sign(y) * eps * (10 ** |y| - 1).

    y is converted to a float ndarray. Raises ValueError when eps is None
    or not positive.
    """
    if eps is None or eps <= 0:
        raise ValueError("eps must be > 0")

    transformed = np.asarray(y, dtype=float)
    growth = np.power(10.0, np.abs(transformed)) - 1.0
    return np.sign(transformed) * eps * growth


In [None]:
import pandas as pd
from datetime import datetime

# Column-name constants used by the backtest loop when slicing `ds` and building prediction rows.
anchor_seq_col = "ANCHOR_MONTH_SEQ"
target_seq_col = "TARGET_MONTH_SEQ"
pc_col = "ROLL_UP_SHOP"
rg_col = "REASON_GROUP"
h_col = "HORIZON"

y_col = "Y_REVENUE"  # explicit

# Distinct anchor sequence numbers in ascending order; the backtest uses the trailing EVAL_ANCHORS of them.
# NOTE(review): this slice requires EVAL_ANCHORS to be an int; the final Ridge cell rebinds it to a list.
anchors = sorted(ds[anchor_seq_col].unique())
eval_anchors = anchors[-EVAL_ANCHORS:]  # last 12 anchors

print("Anchors min/max:", min(anchors), max(anchors))
print("Eval anchors:", eval_anchors)

# Identify model_run_ids from earlier registration cell
MODEL_RUN_IDS = [mrid for (_, mrid) in model_runs]
print("MODEL_RUN_IDS:", MODEL_RUN_IDS)

# Clear prior predictions for these model_run_ids (rerunnable)
# The ids are rendered as a quoted, comma-separated list for the SQL IN (...) clause.
mrid_list_sql = ", ".join([f"'{x}'" for x in MODEL_RUN_IDS])
session.sql(f"""
delete from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
where model_run_id in ({mrid_list_sql});
""").collect()


In [None]:
from datetime import datetime
import pandas as pd
import numpy as np

# Rolling-origin backtest of the sklearn candidates.
# For each evaluation anchor the model is refit on rows whose target month is <= the anchor,
# scores the rows anchored at that month, and the predictions are collected in `pred_df`
# (one row per test row) for upload to FORECAST_MODEL_BACKTEST_PREDICTIONS.
now = datetime.utcnow()

# ---- Ensure eval_anchors are plain ints (avoid np.int8 etc) ----
eval_anchors = [int(x) for x in eval_anchors]

# ---- Ensure model_runs exists (don't rely on hidden state) ----
# Expected upstream vars (depending on your notebook):
# - candidates: list[dict] with {"name": ...}
# - model_run_ids: dict mapping name -> uuid
if "model_runs" not in globals() or model_runs is None:
    if "candidates" in globals() and "model_run_ids" in globals():
        model_runs = [(c, model_run_ids[c["name"]]) for c in candidates if c.get("name") in model_run_ids]
    elif "CANDIDATES" in globals() and "model_run_ids" in globals():
        model_runs = [(c, model_run_ids[c["name"]]) for c in CANDIDATES if c.get("name") in model_run_ids]
    else:
        raise NameError(
            "model_runs is not defined and could not be inferred. "
            "Define `model_runs = [(candidate_dict, model_run_id), ...]` in experiment_row_register_models."
        )

# Also keep a convenience list for later leaderboard cells
MODEL_RUN_IDS = [mrid for _, mrid in model_runs]

# Optional: only used if you want to prevent huge Ridge blowups in transformed space.
RIDGE_SLOG_CLIP_MARGIN = 0.25

# ---- Ensure make_model exists (no cell in this notebook defines it) ----
# A definition already present in the kernel wins; otherwise this fallback is installed so
# the notebook survives Restart & Run All.
if "make_model" not in globals():
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import GradientBoostingRegressor
    from sklearn.linear_model import Ridge
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import OneHotEncoder, StandardScaler

    def make_model(candidate_name):
        """
        Build an unfitted sklearn Pipeline for a registered candidate.

        The candidate spec is looked up by name in `model_runs`; `num_cols` and `cat_cols`
        are read at call time. Ridge gets standardized numeric inputs, GBR gets them
        unscaled; both get one-hot encoded categoricals.
        Raises ValueError for an unknown candidate name or an unsupported family.
        """
        spec = next((c for c, _ in model_runs if c["name"] == candidate_name), None)
        if spec is None:
            raise ValueError(f"Unknown candidate: {candidate_name}")

        family = spec["family"]
        params = spec.get("params", {})
        if family == "ridge":
            numeric_step = StandardScaler()
            estimator = Ridge(alpha=float(params.get("alpha", 1.0)))
        elif family == "gbr":
            numeric_step = "passthrough"
            estimator = GradientBoostingRegressor(random_state=0)
        else:
            raise ValueError(f"No sklearn model defined for family '{family}' ({candidate_name})")

        preprocess = ColumnTransformer(
            transformers=[
                ("num", numeric_step, list(num_cols)),
                ("cat", OneHotEncoder(handle_unknown="ignore"), list(cat_cols)),
            ],
            remainder="drop",
        )
        return Pipeline([("pre", preprocess), ("model", estimator)])

# Column order of the uploaded prediction rows.
PRED_COLUMNS = [
    "MODEL_RUN_ID", "ROLL_UP_SHOP", "REASON_GROUP",
    "ANCHOR_FISCAL_YYYYMM", "ANCHOR_MONTH_SEQ", "HORIZON",
    "TARGET_FISCAL_YYYYMM", "TARGET_MONTH_SEQ",
    "Y_TRUE", "Y_PRED", "Y_PRED_LO", "Y_PRED_HI",
    "CREATED_AT", "DETAILS",
]

pred_frames = []

for (cand, mrid) in model_runs:
    cname = cand["name"]
    if cname == "SEASONAL_NAIVE_LAG12":
        continue  # baseline handled in SQL

    pipe = make_model(cname)
    clip_ridge = cname.upper().startswith("RIDGE")

    for a in eval_anchors:
        # Train on rows whose targets would have been known at anchor a; score rows anchored at a.
        train = ds[ds[target_seq_col] <= a]
        test = ds[ds[anchor_seq_col] == a]

        # drop null targets defensively
        train = train[train[y_col].notna()]
        test = test[test[y_col].notna()]

        if train.empty or test.empty:
            continue

        X_train = train[num_cols + cat_cols]
        X_test = test[num_cols + cat_cols]

        # ---- Transform with explicit EPS from parameters ----
        y_train_t = signed_log1p(train[y_col].astype(float).to_numpy(), eps=EPS)

        pipe.fit(X_train, y_train_t)
        yhat_t = pipe.predict(X_test)

        # Optional guard for Ridge only: keep predictions within the training range
        # (plus a margin) in transformed space, so expm1 cannot explode on extrapolation.
        if clip_ridge:
            lo = np.nanmin(y_train_t) - RIDGE_SLOG_CLIP_MARGIN
            hi = np.nanmax(y_train_t) + RIDGE_SLOG_CLIP_MARGIN
            yhat_t = np.clip(yhat_t, lo, hi)

        yhat = signed_expm1(yhat_t, eps=EPS)

        # Build this anchor's prediction rows column-wise (replaces a per-row iterrows loop).
        pred_frames.append(pd.DataFrame({
            "MODEL_RUN_ID": mrid,
            "ROLL_UP_SHOP": test[pc_col].astype(str).to_numpy(),
            "REASON_GROUP": test[rg_col].astype(str).to_numpy(),
            "ANCHOR_FISCAL_YYYYMM": test["ANCHOR_FISCAL_YYYYMM"].astype("int64").to_numpy(),
            "ANCHOR_MONTH_SEQ": test[anchor_seq_col].astype("int64").to_numpy(),
            "HORIZON": test[h_col].astype("int64").to_numpy(),
            "TARGET_FISCAL_YYYYMM": test["TARGET_FISCAL_YYYYMM"].astype("int64").to_numpy(),
            "TARGET_MONTH_SEQ": test[target_seq_col].astype("int64").to_numpy(),
            "Y_TRUE": test[y_col].astype(float).to_numpy(),
            "Y_PRED": np.asarray(yhat, dtype=float),
            # One independent dict per row, as before.
            "DETAILS": [
                {"eps": float(EPS), "candidate": cname, "eval_anchor": int(a)}
                for _ in range(len(test))
            ],
        }))

if pred_frames:
    pred_df = pd.concat(pred_frames, ignore_index=True)
    # Constant columns are added after the concat so all-null columns never take part in it.
    pred_df["Y_PRED_LO"] = None
    pred_df["Y_PRED_HI"] = None
    pred_df["CREATED_AT"] = now
    pred_df = pred_df[PRED_COLUMNS]
else:
    pred_df = pd.DataFrame(columns=PRED_COLUMNS)

print("Python prediction rows:", len(pred_df))
pred_df.head()


In [None]:
# Sanity check: show the EPS value currently bound in the kernel.
print("EPS:", EPS)


In [None]:
# Append the Python-model predictions to the existing Snowflake table; nothing is written
# when the backtest produced no rows.
if len(pred_df):
    session.write_pandas(
        pred_df,
        "FORECAST_MODEL_BACKTEST_PREDICTIONS",
        database="DB_BI_P_SANDBOX",
        schema="SANDBOX",
        overwrite=False,
        auto_create_table=False,
    )


In [None]:
# Read the real column list of FORECAST_ACTUALS_PC_REASON_MTH_SNAP from the information
# schema so the month-sequence and revenue columns are looked up rather than assumed.
actuals_columns_frame = session.sql("""
select column_name
from DB_BI_P_SANDBOX.INFORMATION_SCHEMA.COLUMNS
where table_schema = 'SANDBOX'
  and table_name   = 'FORECAST_ACTUALS_PC_REASON_MTH_SNAP'
order by ordinal_position
""").to_pandas()
cols = actuals_columns_frame["COLUMN_NAME"].tolist()

print("ACTUALS_SNAP columns:", cols)

# The month sequence column must be called MONTH_SEQ.
if "MONTH_SEQ" not in cols:
    raise ValueError("Could not find MONTH_SEQ in FORECAST_ACTUALS_PC_REASON_MTH_SNAP. Paste the column list above.")
actuals_seq_col = "MONTH_SEQ"

# Revenue column: the first of these names that exists in the table wins.
actual_candidates = [
    "REVENUE", "ACTUAL_REVENUE", "Y_REVENUE", "TOTAL_REVENUE",
    "REVENUE_MTH", "ACTUALS_REVENUE", "ACTUAL"
]
actuals_y_col = None
for candidate_name in actual_candidates:
    if candidate_name in cols:
        actuals_y_col = candidate_name
        break

if actuals_y_col is None:
    raise ValueError(
        "Could not find an actuals revenue column in FORECAST_ACTUALS_PC_REASON_MTH_SNAP. "
        f"Columns are: {cols}"
    )

print("Using actuals_seq_col:", actuals_seq_col)
print("Using actuals_y_col:", actuals_y_col)


In [None]:
# Seasonal-naive baseline, computed in SQL: the prediction for a target month is the actual
# from 12 months earlier (target_month_seq - 12) for the same shop / reason group.
baseline_mrid = next(
    (mrid for (c, mrid) in model_runs if c["name"] == "SEASONAL_NAIVE_LAG12"),
    None,
)
if baseline_mrid is None:
    raise ValueError("SEASONAL_NAIVE_LAG12 is not present in model_runs; run the registration cell first.")

# Make the cell re-runnable: without this, every re-run appends a second copy of the
# baseline rows, which then duplicates rows in the metrics join on the naive errors.
session.sql(f"""
delete from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
where model_run_id = '{baseline_mrid}'
""").collect()

# Differences from the previous version of the INSERT:
# - y_true is cast with ::float. A bare ::number is NUMBER(38,0) and rounds to whole units,
#   while the Python models store float(y).
# - rows with a NULL target are skipped, matching the Python backtest, so per-anchor row
#   counts line up across models.
session.sql(f"""
insert into DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
(model_run_id, roll_up_shop, reason_group,
 anchor_fiscal_yyyymm, anchor_month_seq, horizon,
 target_fiscal_yyyymm, target_month_seq,
 y_true, y_pred, y_pred_lo, y_pred_hi,
 created_at, details)
with ds as (
  select *
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_DATASET_PC_REASON_H_SNAP
  where run_id = '{RUN_ID}'
),
eval_anchors as (
  select distinct ds.anchor_month_seq as anchor_month_seq
  from ds
  qualify dense_rank() over (order by ds.anchor_month_seq desc) <= {EVAL_ANCHORS}
),
base as (
  select
    ds.roll_up_shop,
    ds.reason_group,
    ds.anchor_fiscal_yyyymm,
    ds.anchor_month_seq,
    ds.horizon,
    ds.target_fiscal_yyyymm,
    ds.target_month_seq,
    ds.y_revenue::float as y_true
  from ds
  join eval_anchors ea
    on ea.anchor_month_seq = ds.anchor_month_seq
  where ds.y_revenue is not null
),
lag12 as (
  select
    a.roll_up_shop,
    a.reason_group,
    a.{actuals_seq_col} as month_seq,
    a.{actuals_y_col}   as y_lag12
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_ACTUALS_PC_REASON_MTH_SNAP a
  where a.run_id = '{RUN_ID}'
)
select
  '{baseline_mrid}',
  b.roll_up_shop, b.reason_group,
  b.anchor_fiscal_yyyymm, b.anchor_month_seq, b.horizon,
  b.target_fiscal_yyyymm, b.target_month_seq,
  b.y_true,
  l.y_lag12 as y_pred,
  null, null,
  current_timestamp(),
  parse_json('{{"baseline":"seasonal_naive_lag12"}}')
from base b
left join lag12 l
  on l.roll_up_shop = b.roll_up_shop
 and l.reason_group = b.reason_group
 and l.month_seq = (b.target_month_seq - 12);
""").collect()


In [None]:
# Show how many backtest predictions we have per model_run_id
# Per model run: row count, NULL prediction / NULL actual counts, and anchor / horizon ranges.
mrid_list_sql = ", ".join([f"'{x}'" for x in MODEL_RUN_IDS])

session.sql(f"""
select
  model_run_id,
  count(*) as rows_pred,
  count_if(y_pred is null) as null_preds,
  count_if(y_true is null) as null_true,
  min(anchor_month_seq) as min_anchor_seq,
  max(anchor_month_seq) as max_anchor_seq,
  min(horizon) as min_h,
  max(horizon) as max_h
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
where model_run_id in ({mrid_list_sql})
group by 1
order by 1
""").show()


In [None]:
# Horizon coverage by anchor for ONE model_run_id (pick any; they should match)
# MODEL_RUN_IDS follows the order of model_runs, so index 0 is the first registered candidate.
one_mrid = MODEL_RUN_IDS[0]

session.sql(f"""
select
  anchor_month_seq,
  count(*) as rowz,
  count(distinct horizon) as horizons_present,
  min(horizon) as min_h,
  max(horizon) as max_h,
  min(target_month_seq) as min_target_seq,
  max(target_month_seq) as max_target_seq
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
where model_run_id = '{one_mrid}'
group by 1
order by 1
""").show()


In [None]:
# Rebind one_mrid to the most recently started model run registered for RUN_ID.
# NOTE(review): no later cell visible in this notebook reads one_mrid after this assignment.
one_mrid = session.sql(f"""
  select model_run_id
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_RUNS
  where run_id = '{RUN_ID}'
  order by started_at desc
  limit 1
""").to_pandas().iloc[0,0]


In [None]:
# --- Inputs ---
# Metrics are computed on this fixed list of anchor sequence numbers.
# NOTE(review): the backtest cells score the last EVAL_ANCHORS anchors, but only anchor 48 is
# evaluated here — confirm that is intended.
fixed_eval_anchors = [48]
# Two renderings of the same list: a bare list for IN (...) and an array_construct(...) for the details JSON.
anchor_list_sql = ", ".join([str(x) for x in fixed_eval_anchors])
anchor_array_sql = "array_construct(" + ", ".join([str(x) for x in fixed_eval_anchors]) + ")"

mrid_list_sql = ", ".join([f"'{x}'" for x in MODEL_RUN_IDS])

# The seasonal-naive run supplies the scaling errors for MASE.
baseline_mrid = [mrid for (c, mrid) in model_runs if c["name"] == "SEASONAL_NAIVE_LAG12"][0]

# --- Clear existing metrics for reruns ---
session.sql(f"""
delete from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_METRICS
where model_run_id in ({mrid_list_sql})
""").collect()

# --- Compute + insert metrics (note: INSERT ... WITH ... SELECT ...) ---
# Structure of the statement:
#   p             predictions of all registered model runs at the fixed anchors
#   naive         absolute error of the seasonal-naive baseline per (series, anchor, horizon)
#   p3            p plus per-row errors and the matching naive error
#   micro_*       metrics pooled over all rows (overall / by horizon)
#   series_*      metrics per series, averaged in macro_agg (overall / by horizon)
# Each metric becomes one row in FORECAST_MODEL_METRICS, tagged MICRO or MACRO in `details`.
#
# Fix: abs_y is NULL whenever y_pred is NULL. Previously a row without a prediction added
# its actual to the WAPE/BIAS denominator while contributing nothing to the numerator, which
# understated the error of any model with missing predictions (for the baseline: every row
# without a lag-12 actual). All ratios are now computed over scored rows only.
# NOTE(review): MASE still divides avg(abs_err) by avg(abs_naive_err) over possibly different
# row sets when the baseline has gaps — confirm whether that matters for your comparison.
session.sql(f"""
insert into DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_METRICS
(model_run_id, metric_scope, metric_name, horizon, value, computed_at, details)

with p as (
  select *
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
  where model_run_id in ({mrid_list_sql})
    and anchor_month_seq in ({anchor_list_sql})
),
naive as (
  select
    roll_up_shop,
    reason_group,
    anchor_month_seq,
    horizon,
    abs(y_true - y_pred) as abs_naive_err
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
  where model_run_id = '{baseline_mrid}'
    and anchor_month_seq in ({anchor_list_sql})
),
p3 as (
  select
    p.*,
    (p.y_true - p.y_pred) as err,
    (p.y_pred - p.y_true) as err_signed,
    abs(p.y_true - p.y_pred) as abs_err,
    iff(p.y_pred is null, null, abs(p.y_true)) as abs_y,
    n.abs_naive_err
  from p
  left join naive n
    on n.roll_up_shop = p.roll_up_shop
   and n.reason_group = p.reason_group
   and n.anchor_month_seq = p.anchor_month_seq
   and n.horizon = p.horizon
),

-- MICRO
micro_overall as (
  select
    model_run_id,
    'OVERALL' as metric_scope,
    null::number as horizon,
    avg(abs_err) as mae,
    sqrt(avg(err*err)) as rmse,
    sum(abs_err) / nullif(sum(abs_y),0) as wape,
    avg(iff(abs_y >= {MAPE_EPSILON}, abs_err/abs_y, null)) as mape_eps,
    avg(abs_err) / nullif(avg(abs_naive_err),0) as mase,
    sum(err_signed) / nullif(sum(abs_y),0) as bias
  from p3
  group by 1
),
micro_by_h as (
  select
    model_run_id,
    'BY_HORIZON' as metric_scope,
    horizon,
    avg(abs_err) as mae,
    sqrt(avg(err*err)) as rmse,
    sum(abs_err) / nullif(sum(abs_y),0) as wape,
    avg(iff(abs_y >= {MAPE_EPSILON}, abs_err/abs_y, null)) as mape_eps,
    avg(abs_err) / nullif(avg(abs_naive_err),0) as mase,
    sum(err_signed) / nullif(sum(abs_y),0) as bias
  from p3
  group by 1,3
),

-- MACRO (per series, then average)
series_overall as (
  select
    model_run_id,
    roll_up_shop,
    reason_group,
    'OVERALL' as metric_scope,
    null::number as horizon,
    avg(abs_err) as mae,
    sqrt(avg(err*err)) as rmse,
    sum(abs_err) / nullif(sum(abs_y),0) as wape,
    avg(iff(abs_y >= {MAPE_EPSILON}, abs_err/abs_y, null)) as mape_eps,
    avg(abs_err) / nullif(avg(abs_naive_err),0) as mase,
    sum(err_signed) / nullif(sum(abs_y),0) as bias
  from p3
  group by 1,2,3
),
series_by_h as (
  select
    model_run_id,
    roll_up_shop,
    reason_group,
    'BY_HORIZON' as metric_scope,
    horizon,
    avg(abs_err) as mae,
    sqrt(avg(err*err)) as rmse,
    sum(abs_err) / nullif(sum(abs_y),0) as wape,
    avg(iff(abs_y >= {MAPE_EPSILON}, abs_err/abs_y, null)) as mape_eps,
    avg(abs_err) / nullif(avg(abs_naive_err),0) as mase,
    sum(err_signed) / nullif(sum(abs_y),0) as bias
  from p3
  group by 1,2,3,5
),
macro_agg as (
  select
    model_run_id,
    metric_scope,
    horizon,
    avg(mae) as mae,
    avg(rmse) as rmse,
    avg(wape) as wape,
    avg(mape_eps) as mape_eps,
    avg(mase) as mase,
    avg(bias) as bias
  from (
    select model_run_id, metric_scope, horizon, mae, rmse, wape, mape_eps, mase, bias from series_overall
    union all
    select model_run_id, metric_scope, horizon, mae, rmse, wape, mape_eps, mase, bias from series_by_h
  )
  group by 1,2,3
)

-- Emit rows into FORECAST_MODEL_METRICS
select
  model_run_id,
  metric_scope,
  metric_name,
  horizon,
  value,
  current_timestamp(),
  object_construct('series_agg','MICRO','eval_anchors',{anchor_array_sql},'mape_epsilon',{MAPE_EPSILON})
from (
  select model_run_id, metric_scope, horizon, 'MAE' as metric_name, mae as value from micro_overall
  union all select model_run_id, metric_scope, horizon, 'RMSE', rmse from micro_overall
  union all select model_run_id, metric_scope, horizon, 'WAPE', wape from micro_overall
  union all select model_run_id, metric_scope, horizon, 'MAPE_EPS', mape_eps from micro_overall
  union all select model_run_id, metric_scope, horizon, 'MASE', mase from micro_overall
  union all select model_run_id, metric_scope, horizon, 'BIAS', bias from micro_overall

  union all select model_run_id, metric_scope, horizon, 'MAE', mae from micro_by_h
  union all select model_run_id, metric_scope, horizon, 'RMSE', rmse from micro_by_h
  union all select model_run_id, metric_scope, horizon, 'WAPE', wape from micro_by_h
  union all select model_run_id, metric_scope, horizon, 'MAPE_EPS', mape_eps from micro_by_h
  union all select model_run_id, metric_scope, horizon, 'MASE', mase from micro_by_h
  union all select model_run_id, metric_scope, horizon, 'BIAS', bias from micro_by_h
)

union all

select
  model_run_id,
  metric_scope,
  metric_name,
  horizon,
  value,
  current_timestamp(),
  object_construct('series_agg','MACRO','eval_anchors',{anchor_array_sql},'mape_epsilon',{MAPE_EPSILON})
from (
  select model_run_id, metric_scope, horizon, 'MAE' as metric_name, mae as value from macro_agg
  union all select model_run_id, metric_scope, horizon, 'RMSE', rmse from macro_agg
  union all select model_run_id, metric_scope, horizon, 'WAPE', wape from macro_agg
  union all select model_run_id, metric_scope, horizon, 'MAPE_EPS', mape_eps from macro_agg
  union all select model_run_id, metric_scope, horizon, 'MASE', mase from macro_agg
  union all select model_run_id, metric_scope, horizon, 'BIAS', bias from macro_agg
)
""").collect()

print("Cell 11 done. Anchors:", fixed_eval_anchors)


In [None]:
# --- leaderboard (replace entire cell) ---
# Ranks the registered model runs by WAPE (MICRO aggregation, OVERALL scope), lowest first.
# Within each candidate name only the best-scoring run is kept; ties go to the most recently updated run.
mrid_list_sql = ", ".join([f"'{x}'" for x in MODEL_RUN_IDS])

session.sql(f"""
with s as (
  select
    m.model_run_id,
    r.params:"candidate"::string as candidate,
    m.value as wape_micro_overall,
    r.updated_at
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_METRICS m
  join DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_RUNS r
    on r.model_run_id = m.model_run_id
  where m.model_run_id in ({mrid_list_sql})
    and m.metric_scope = 'OVERALL'
    and m.metric_name  = 'WAPE'
    and m.details:"series_agg"::string = 'MICRO'
),
best as (
  select *
  from s
  qualify row_number() over (
    partition by candidate
    order by wape_micro_overall asc, updated_at desc
  ) = 1
)
select model_run_id, candidate, wape_micro_overall
from best
order by wape_micro_overall
""").show()


In [None]:
-- Ad-hoc lookup: list the RIDGE_OHE registrations of one forecast run, oldest first.
-- NOTE(review): the run_id is a pasted literal, not the notebook's RUN_ID; update it before inspecting another run.
-- NOTE(review): this reads created_at, while the registration INSERT lists started_at / updated_at — confirm the column exists.
select
  params:"candidate"::string as candidate,
  model_run_id,
  created_at
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_RUNS
where run_id = '6673357e-8195-4146-9e36-17209b6cca57'
  and params:"candidate"::string = 'RIDGE_OHE'
order by created_at;


In [None]:
# --- Inputs you already have ---
# RUN_ID, MODEL_RUN_IDS, MAPE_EPSILON, fixed_eval_anchors
fixed_eval_anchors = [48]
anchor_array_sql = "array_construct(" + ", ".join([str(x) for x in fixed_eval_anchors]) + ")"

# 1) Get ASOF_FISCAL_YYYYMM for this RUN_ID
asof_lookup = session.sql(f"""
select asof_fiscal_yyyymm
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_RUNS
where run_id = '{RUN_ID}'
""").to_pandas()
if asof_lookup.empty:
    raise ValueError(f"run_id '{RUN_ID}' was not found in FORECAST_RUNS.")
asof = asof_lookup.iloc[0]["ASOF_FISCAL_YYYYMM"]

# 2) Pick champion by WAPE / MICRO / OVERALL (fixed-grid anchors already embedded in metrics.details)
# The ORDER BY is on the outer query, next to the LIMIT: an ORDER BY inside a CTE does not
# define the order of the rows the outer SELECT returns, so LIMIT 1 could return any candidate.
# NULL metrics are excluded and model_run_id breaks ties so the pick is deterministic.
champ = session.sql(f"""
with scores as (
  select
    m.model_run_id,
    r.params:"candidate"::string as candidate,
    m.value as wape_micro_overall
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_METRICS m
  join DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_RUNS r
    on r.model_run_id = m.model_run_id
  where r.run_id = '{RUN_ID}'
    and m.metric_scope = 'OVERALL'
    and m.metric_name = 'WAPE'
    and m.details:"series_agg"::string = 'MICRO'
    and m.value is not null
)
select *
from scores
order by wape_micro_overall asc, model_run_id asc
limit 1
""").to_pandas()

if champ.empty:
    # Nothing to crown: metrics were not computed for this run, or every WAPE is NULL.
    raise ValueError(
        f"No non-null WAPE / MICRO / OVERALL metric found for run_id '{RUN_ID}'; "
        "run the metrics cell before selecting a champion."
    )

champ_mrid = champ.iloc[0]["MODEL_RUN_ID"]
champ_candidate = champ.iloc[0]["CANDIDATE"]
champ_wape = float(champ.iloc[0]["WAPE_MICRO_OVERALL"])

print("Champion:", champ_candidate, champ_mrid, "WAPE=", champ_wape, "ASOF=", asof)

# 3) Upsert into FORECAST_MODEL_CHAMPIONS using sentinel keys for GLOBAL scope
# The GLOBAL champion row is keyed by (asof, 'GLOBAL', '__ALL__', '__ALL__'); an existing row is overwritten.
session.sql(f"""
merge into DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_CHAMPIONS t
using (
  select
    {asof}::number as asof_fiscal_yyyymm,
    'GLOBAL'::string as champion_scope,
    '__ALL__'::string as roll_up_shop,
    '__ALL__'::string as reason_group,
    '{champ_mrid}'::string as model_run_id,
    'WAPE_MICRO_OVERALL'::string as selection_metric,
    object_construct(
      'series_agg','MICRO',
      'metric_scope','OVERALL',
      'metric','WAPE',
      'wape', {champ_wape},
      'eval_anchors', {anchor_array_sql},
      'mape_epsilon', {MAPE_EPSILON},
      'run_id', '{RUN_ID}',
      'candidate', '{champ_candidate}'
    ) as selection_logic,
    current_timestamp() as selected_at,
    current_user() as selected_by
) s
on  t.asof_fiscal_yyyymm = s.asof_fiscal_yyyymm
and t.champion_scope     = s.champion_scope
and t.roll_up_shop       = s.roll_up_shop
and t.reason_group       = s.reason_group
when matched then update set
  model_run_id      = s.model_run_id,
  selection_metric  = s.selection_metric,
  selection_logic   = s.selection_logic,
  selected_at       = s.selected_at,
  selected_by       = s.selected_by
when not matched then insert (
  asof_fiscal_yyyymm, champion_scope, roll_up_shop, reason_group,
  model_run_id, selection_metric, selection_logic, selected_at, selected_by
) values (
  s.asof_fiscal_yyyymm, s.champion_scope, s.roll_up_shop, s.reason_group,
  s.model_run_id, s.selection_metric, s.selection_logic, s.selected_at, s.selected_by
)
""").collect()

print("Champion upserted into FORECAST_MODEL_CHAMPIONS")


In [None]:
# Per-candidate summary at anchor 48: row count, actual / predicted ranges, and WAPE.
# NOTE(review): the anchor is hard-coded in the SQL rather than taken from fixed_eval_anchors.
mrid_list_sql = ", ".join([f"'{x}'" for x in MODEL_RUN_IDS])

session.sql(f"""
select
  r.params:"candidate"::string as candidate,
  p.model_run_id,
  count(*) as n,
  min(p.y_true) as y_true_min,
  avg(p.y_true) as y_true_avg,
  max(p.y_true) as y_true_max,
  min(p.y_pred) as y_pred_min,
  avg(p.y_pred) as y_pred_avg,
  max(p.y_pred) as y_pred_max,
  sum(abs(p.y_true - p.y_pred)) / nullif(sum(abs(p.y_true)),0) as wape
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS p
join DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_RUNS r
  on r.model_run_id = p.model_run_id
where p.model_run_id in ({mrid_list_sql})
  and p.anchor_month_seq in (48)
group by 1,2
order by wape
""").show()


In [None]:
# Find the Ridge model run for this forecast run.
# The registration cell inserts a new RIDGE_OHE row every time it runs, so a run_id can
# have several; take the most recently started one rather than an arbitrary `limit 1` row.
ridge_lookup = session.sql(f"""
select model_run_id
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_RUNS
where run_id = '{RUN_ID}'
  and params:"candidate"::string = 'RIDGE_OHE'
order by started_at desc
limit 1
""").to_pandas()
if ridge_lookup.empty:
    raise ValueError(f"No RIDGE_OHE model run is registered for run_id '{RUN_ID}'.")
ridge_mrid = ridge_lookup.iloc[0]["MODEL_RUN_ID"]

# Show the 30 Ridge predictions with the largest absolute error at anchor 48.
session.sql(f"""
select
  roll_up_shop,
  reason_group,
  anchor_fiscal_yyyymm,
  horizon,
  y_true,
  y_pred,
  (y_true - y_pred) as err,
  abs(y_true - y_pred) as abs_err
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
where model_run_id = '{ridge_mrid}'
  and anchor_month_seq in (48)
order by abs_err desc
limit 30
""").show()


In [None]:
# Scale diagnostic for the Ridge predictions at anchor 48: compares y_pred with y_true on the
# raw scale and with the signed-log transform of y_true. A much smaller error / higher
# correlation on the slog side would suggest y_pred was stored without the inverse transform.
# NOTE(review): this re-binds the notebook-wide EPS parameter (100.0 -> 100).
EPS = 100  # your epsilon

session.sql(f"""
with p as (
  select y_true, y_pred
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
  where model_run_id = '{ridge_mrid}'
    and anchor_month_seq in (48)
),
t as (
  select
    y_true,
    y_pred,
    -- signed log transform of y_true
    case
      when y_true >= 0 then ln(1 + abs(y_true)/{EPS})
      else -ln(1 + abs(y_true)/{EPS})
    end as y_true_slog
  from p
)
select
  avg(abs(y_pred - y_true)) as avg_abs_err_on_raw,
  avg(abs(y_pred - y_true_slog)) as avg_abs_err_on_slog,
  corr(y_pred, y_true) as corr_raw,
  corr(y_pred, y_true_slog) as corr_slog
from t
""").show()


In [None]:
# Use the Ridge model_run_id registered by this notebook session instead of a pasted literal.
# The previous literal was the same UUID the ad-hoc SQL cell filters on as a *run_id*, so it
# is unlikely to have been a valid model_run_id.
ridge_mrid = next((mrid for cand, mrid in model_runs if cand["name"] == "RIDGE_OHE"), None)
if ridge_mrid is None:
    raise ValueError("No RIDGE_OHE entry in model_runs; run the model registration cell first.")

# Row count and actual / predicted ranges of the Ridge predictions at anchor 48.
session.sql(f"""
select
  count(*) as n,
  min(y_true) as y_true_min,
  avg(y_true) as y_true_avg,
  max(y_true) as y_true_max,
  min(y_pred) as y_pred_min,
  avg(y_pred) as y_pred_avg,
  max(y_pred) as y_pred_max
from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
where model_run_id = '{ridge_mrid}'
  and anchor_month_seq = 48
""").show()


In [None]:
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge

# Refit Ridge on the raw revenue target (no signed-log transform), with imputed and
# standardized numeric inputs, and overwrite the stored Ridge predictions for a fixed anchor set.
#
# Cell-specific names are prefixed with ridge_/RIDGE_ so this cell no longer clobbers the
# notebook-wide EVAL_ANCHORS (an int used by the backtest and baseline cells), ds, id_cols,
# feature_cols, num_cols or cat_cols.
# NOTE(review): the registered params of this model run say target_transform=signed_log1p,
# while the rows written here come from an untransformed fit — confirm that is acceptable.
RIDGE_EVAL_ANCHORS = [48]
# Only rows whose target month was already known at the earliest evaluated anchor may be
# used for training (same rule as the backtest loop). Filtering on the anchor month instead
# lets earlier anchors contribute targets in the evaluated future, i.e. target leakage.
RIDGE_TRAIN_MAX_TARGET_SEQ = min(RIDGE_EVAL_ANCHORS)

# Use the Ridge model_run_id registered by this session instead of a pasted literal.
ridge_mrid = next((mrid for cand, mrid in model_runs if cand["name"] == "RIDGE_OHE"), None)
if ridge_mrid is None:
    raise ValueError("No RIDGE_OHE entry in model_runs; run the model registration cell first.")

# Load dataset snap for this RUN_ID
ridge_ds = session.sql(f"""
  select *
  from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_DATASET_PC_REASON_H_SNAP
  where run_id = '{RUN_ID}'
""").to_pandas()

# Target + IDs
y_col = "Y_REVENUE"
ridge_id_cols = ["RUN_ID","ROLL_UP_SHOP","REASON_GROUP","ANCHOR_FISCAL_YYYYMM","ANCHOR_MONTH_SEQ",
                 "TARGET_FISCAL_YYYYMM","TARGET_MONTH_SEQ","HORIZON"]

# Exclude IDs, the target, and the bookkeeping columns the main feature-selection cell excludes.
ridge_exclude = set(ridge_id_cols + [y_col, "BUILT_AT", "ROW_HASH"])
ridge_feature_cols = [c for c in ridge_ds.columns if c not in ridge_exclude]

# Split numeric vs categorical
ridge_cat_cols = [c for c in ridge_feature_cols if ridge_ds[c].dtype == "object"]
ridge_num_cols = [c for c in ridge_feature_cols if c not in ridge_cat_cols]

# Ensure series identifiers are categorical features
for c in ["ROLL_UP_SHOP","REASON_GROUP"]:
    if c not in ridge_cat_cols:
        ridge_cat_cols.append(c)

# Drop ID-like / key-like numeric columns that break linear models
DROP_NUM = set([
    "ASOF_FISCAL_YYYYMM",
    "ANCHOR_FISCAL_YYYYMM",
    "TARGET_FISCAL_YYYYMM",
    "ANCHOR_MONTH_SEQ",
    "TARGET_MONTH_SEQ",
    "ANCHOR_FISCAL_YEAR",
    "ANCHOR_FISCAL_MONTH",
])
ridge_num_cols = [c for c in ridge_num_cols if c not in DROP_NUM]

print("Ridge numeric cols:", len(ridge_num_cols))
print("Ridge categorical cols:", len(ridge_cat_cols))

# Train/test split. Rows without a target are dropped: Ridge cannot fit on NaN, and the
# backtest loop applies the same filter.
ridge_labeled = ridge_ds[ridge_ds[y_col].notna()]
train = ridge_labeled[ridge_labeled["TARGET_MONTH_SEQ"] <= RIDGE_TRAIN_MAX_TARGET_SEQ].copy()
test  = ridge_labeled[ridge_labeled["ANCHOR_MONTH_SEQ"].isin(RIDGE_EVAL_ANCHORS)].copy()
if train.empty or test.empty:
    raise ValueError(
        f"Empty train ({len(train)}) or test ({len(test)}) set for anchors {RIDGE_EVAL_ANCHORS}."
    )

X_train = train[ridge_num_cols + ridge_cat_cols]
y_train = train[y_col].astype(float)

X_test  = test[ridge_num_cols + ridge_cat_cols]
y_test  = test[y_col].astype(float)

# Pipeline: impute + scale numeric, OHE categorical
pre = ColumnTransformer(
    transformers=[
        ("num", Pipeline([
            ("imp", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]), ridge_num_cols),
        ("cat", OneHotEncoder(handle_unknown="ignore"), ridge_cat_cols),
    ],
    remainder="drop",
    sparse_threshold=0.3
)

model = Ridge(alpha=1.0, random_state=0)

pipe = Pipeline([("pre", pre), ("model", model)])
pipe.fit(X_train, y_train)

y_pred = pipe.predict(X_test)

# Build prediction rows (match table columns dynamically)
pred = test[ridge_id_cols].copy()
pred["MODEL_RUN_ID"] = ridge_mrid
pred["Y_TRUE"] = y_test.values
pred["Y_PRED"] = y_pred
# The predictions table's timestamp column is CREATED_AT (see the other inserts); the old
# BUILT_AT name was silently removed by the column filter below, leaving created_at NULL.
pred["CREATED_AT"] = np.datetime64("now")

# Align to actual prediction table columns
pred_cols = session.sql("""
select column_name
from DB_BI_P_SANDBOX.INFORMATION_SCHEMA.COLUMNS
where table_schema='SANDBOX'
  and table_name='FORECAST_MODEL_BACKTEST_PREDICTIONS'
order by ordinal_position
""").to_pandas()["COLUMN_NAME"].tolist()
pred_cols_set = set(pred_cols)

# keep only columns that exist in the table (case-sensitive to Snowflake output)
# our pandas cols are uppercase already; if not, upper them
pred.columns = [c.upper() for c in pred.columns]
pred = pred[[c for c in pred.columns if c in pred_cols_set]]

# Overwrite Ridge rows for these anchors (rerunnable)
session.sql(f"""
delete from DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS
where model_run_id = '{ridge_mrid}'
  and anchor_month_seq in ({",".join([str(x) for x in RIDGE_EVAL_ANCHORS])})
""").collect()

# `pred` holds a subset of the table's columns in its own order, so append by column name;
# the default positional matching would put values into the wrong columns.
session.create_dataframe(pred).write.mode("append").save_as_table(
    "DB_BI_P_SANDBOX.SANDBOX.FORECAST_MODEL_BACKTEST_PREDICTIONS",
    column_order="name",
)

print("Ridge predictions overwritten for anchors:", RIDGE_EVAL_ANCHORS)
