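STEP 1 — Install pinned libraries (run once, then restart the session)

After the install cell finishes, restart the session (Runtime → Restart session) before running STEP 1C. The running kernel already has the old NumPy loaded, and importing the newly installed version into the same process fails with `RuntimeError: empty_like method already has a different docstring`.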



In [None]:
# STEP 1A — install a stable stack (no NumPy 2.x conflicts)
# --force-reinstall replaces whatever versions the runtime already ships with;
# --no-deps installs only the packages listed here, not their dependencies.
# NOTE(review): with --no-deps, any dependency missing from the base runtime is
# not installed — confirm the image provides what gensim/node2vec need.
# Restart the session after this cell: NumPy is already imported in the running
# kernel, and loading a different version in the same process fails.
!pip -q install --force-reinstall --no-deps \
  "numpy==1.26.4" "pandas==2.2.2" "pyarrow==14.0.2" "fastparquet==2024.5.0" \
  "scikit-learn==1.5.1" "lightgbm==4.3.0" "xgboost==2.1.1" \
  "networkx==3.3" "gensim==4.3.3" "node2vec==0.4.6"


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.0/18.0 MB[0m [31m55.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.7/12.7 MB[0m [31m62.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m38.0/38.0 MB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m36.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.1/13.1 MB[0m [31m43.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m45.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.9/153.9 MB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

*Before running the second step, restart the session (Runtime → Restart session).*

STEP 1C — Verify imports and versions (single cell)

STEP 2 — Pick your parquet files (no paths to edit)

In [None]:
# STEP 1C — verify imports/versions
import numpy as np, pandas as pd, pyarrow as pa
import sklearn, lightgbm as lgb, xgboost as xgb
import networkx as nx
from node2vec import Node2Vec

# Expected: numpy 1.26.4, pandas 2.2.2, pyarrow 14.0.2, scikit-learn ~1.5.1,
#           lightgbm ~4.3.0, xgboost ~2.1.x, networkx 3.3.
# Labels are left-padded to 13 characters so the colons line up.
_version_report = [
    ("numpy", np.__version__),
    ("pandas", pd.__version__),
    ("pyarrow", pa.__version__),
    ("scikit-learn", sklearn.__version__),
    ("lightgbm", lgb.__version__),
    ("xgboost", xgb.__version__),
    ("networkx", nx.__version__),
    ("node2vec", "ok"),  # importing Node2Vec above succeeding is the check
]
for _label, _version in _version_report:
    print(f"{_label:<13}:", _version)


RuntimeError: empty_like method already has a different docstring

In [None]:
# STEP 2: upload once via picker → store with stable names in /content
from google.colab import files
from pathlib import Path
import shutil, os

# Stable destinations; later steps read the parquet inputs from these paths.
MERGED_DST = Path("/content/merged_trip_details_0.parquet")
TXN_DST    = Path("/content/transaction-0.parquet")

def detect_role(fname):
    """Classify an uploaded parquet file as "merged", "txn" or "unknown".

    The parquet schema is inspected first. If it cannot be read (missing file,
    pyarrow unavailable, not parquet), or if it matches neither expected column
    set, the decision falls back to keywords in the file's base name.
    """
    try:
        import pyarrow.parquet as pq
        columns = set(pq.ParquetFile(fname).schema.names)
    except Exception:
        columns = None  # best effort: fall through to the name heuristics
    if columns is not None:
        if columns >= {"trip_id", "trip_start_time", "fare_amount"}:
            return "merged"
        if columns >= {"transaction_id", "transaction_amount", "merchant_id"}:
            return "txn"

    base = os.path.basename(fname).lower()
    if "merged" in base:
        return "merged"
    if any(token in base for token in ("transact", "txn")):
        return "txn"
    return "unknown"

# Which stable destinations are still absent?
missing = []
if not MERGED_DST.exists(): missing.append("merged")
if not TXN_DST.exists():    missing.append("txn")

if not missing:
    print("✅ Found both parquet files in /content/. Reusing them for all next steps.")
    print("MERGED_PATH:", MERGED_DST)
    print("TXN_PATH   :", TXN_DST)
else:
    print("Missing:", missing, "→ pick your parquet file(s) in the dialog.")
    uploaded = files.upload()
    leftovers = []      # every uploaded file not moved in the first pass
    unclassified = []   # the subset of leftovers whose role could not be detected
    for fname in uploaded.keys():
        role = detect_role(fname)
        if role == "merged" and not MERGED_DST.exists():
            shutil.move(fname, MERGED_DST.as_posix()); print("✔", fname, "→", MERGED_DST)
        elif role == "txn" and not TXN_DST.exists():
            shutil.move(fname, TXN_DST.as_posix());    print("✔", fname, "→", TXN_DST)
        else:
            leftovers.append(fname); print("ℹ Skipped", fname, "(role:", role, ")")
            if role == "unknown":
                unclassified.append(fname)
    # Fallback: only when exactly one destination is still empty AND exactly one
    # file could not be classified is the assignment unambiguous. A file whose
    # role WAS detected (e.g. a second "merged" file) is never moved into the
    # other slot, and with several candidates we do not guess — the asserts
    # below then report what is missing.
    still_missing = [p for p in (MERGED_DST, TXN_DST) if not p.exists()]
    if len(still_missing) == 1 and len(unclassified) == 1:
        target = still_missing[0]
        lf = unclassified[0]
        shutil.move(lf, target.as_posix())
        print(f"↪ Assigned remaining {lf} → {target}")
    elif still_missing and unclassified:
        print("ℹ Could not decide where these files belong:", unclassified)

    assert MERGED_DST.exists(), "merged_trip_details_0.parquet still missing."
    assert TXN_DST.exists(),    "transaction-0.parquet still missing."
    print("\n✅ Ready. Stored at:")
    print("MERGED_PATH:", MERGED_DST)
    print("TXN_PATH   :", TXN_DST)


✅ Now run: STEP 3 (OPTIMIZED) — compact per-user transaction features

In [None]:
# STEP 3: build compact per-user txn features (low RAM)
import pandas as pd, numpy as np
from pathlib import Path

TXN_PATH = Path("/content/transaction-0.parquet")
assert TXN_PATH.exists(), f"Missing {TXN_PATH}. Run STEP 2 first."

# 1) Load only needed columns
# (column projection: only these seven columns are read into memory)
use_cols = [
    "user_id", "transaction_amount",
    "payment_method", "transaction_status", "merchant_category",
    "transaction_date", "transaction_time"
]
txn = pd.read_parquet(TXN_PATH, engine="pyarrow", columns=use_cols)

# 2) Lean dtypes + timestamp
# Text columns become pandas "string"; amounts are coerced to float32
# (unparseable values → NaN).
txn["user_id"]            = txn["user_id"].astype("string")
txn["payment_method"]     = txn["payment_method"].astype("string")
txn["transaction_status"] = txn["transaction_status"].astype("string")
txn["merchant_category"]  = txn["merchant_category"].astype("string")
txn["transaction_amount"] = pd.to_numeric(txn["transaction_amount"], errors="coerce").astype("float32")
# One timestamp per row parsed from "<date> <time>" text; failures become NaT.
# NOTE(review): assumes both columns render as parseable text when cast to
# string — confirm the source dtypes of transaction_date / transaction_time.
txn["txn_ts"] = pd.to_datetime(
    txn["transaction_date"].astype("string") + " " + txn["transaction_time"].astype("string"),
    errors="coerce"
)

# 3) Numeric aggregations
# `g` is reused in section 5 for the last-transaction timestamp.
g = txn.groupby("user_id", observed=True)
num_agg = g["transaction_amount"].agg(
    txn_count="size",
    txn_sum="sum",
    txn_mean="mean",
    txn_std="std",
    txn_min="min",
    txn_max="max",
).fillna(0.0).reset_index()  # NaN aggregates (e.g. std of a single transaction) → 0.0

# Downcast the aggregates to 32-bit dtypes to keep the feature table small.
num_agg = num_agg.astype({
    "txn_count":"int32","txn_sum":"float32","txn_mean":"float32",
    "txn_std":"float32","txn_min":"float32","txn_max":"float32"
})

# 4) Top-K fractional one-hots (keeps width small)
def frac_topk(df, user_col, cat_col, k=6, prefix=None):
    """Per-user share of rows falling in each of the k most frequent `cat_col` values.

    Values outside the global top-k (and missing values) are pooled into an
    "__other__" bucket. Returns one row per user with float32 columns named
    ``{prefix}_{value}`` in top-k frequency order, followed by
    ``{prefix}___other__``; `prefix` defaults to `cat_col`. If `cat_col` is not
    in `df`, only the unique users are returned.
    """
    if cat_col not in df.columns:
        return pd.DataFrame({user_col: df[user_col].unique()})
    label = prefix if prefix else cat_col
    top_values = df[cat_col].value_counts().nlargest(k).index.astype("string")

    bucketed = df[[user_col, cat_col]].copy()
    in_top = bucketed[cat_col].isin(top_values)
    bucketed[cat_col] = bucketed[cat_col].astype("string").where(in_top, "__other__")

    counts = bucketed.groupby([user_col, cat_col], observed=True).size().astype("int32")
    per_user_total = counts.groupby(level=0).sum()
    shares = (counts / per_user_total).rename("frac").reset_index()

    table = shares.pivot(index=user_col, columns=cat_col, values="frac").fillna(0.0)
    ordered = list(top_values) + ["__other__"]
    for value in ordered:
        if value not in table.columns:
            table[value] = 0.0  # category never seen for any user → all-zero column
    table = table[ordered]
    table.columns = [f"{label}_{value}" for value in table.columns]
    table = table.reset_index()
    return table.astype({col: "float32" for col in table.columns if col != user_col})

pm_frac = frac_topk(txn, "user_id", "payment_method",     k=6, prefix="pm")
st_frac = frac_topk(txn, "user_id", "transaction_status", k=6, prefix="status")
mc_frac = frac_topk(txn, "user_id", "merchant_category",  k=6, prefix="mc")

# 5) Days since last transaction
# Measured against the latest timestamp in the whole file, not against "today".
last_txn = g["txn_ts"].max().reset_index().rename(columns={"txn_ts":"last_txn_ts"})
global_max_ts = pd.to_datetime(txn["txn_ts"].max())
# Users with no parseable timestamp (NaT) get the sentinel value 9999.
last_txn["days_since_last_txn"] = ((global_max_ts - last_txn["last_txn_ts"]).dt.days).fillna(9999).astype("int32")
last_txn = last_txn.drop(columns=["last_txn_ts"])

# 6) Merge all features
# Left joins onto num_agg keep one row per user; remaining gaps are set to 0.0.
user_txn_features = (
    num_agg.merge(pm_frac, on="user_id", how="left")
           .merge(st_frac, on="user_id", how="left")
           .merge(mc_frac, on="user_id", how="left")
           .merge(last_txn, on="user_id", how="left")
           .fillna(0.0)
)

# 7) Save
# Full table as parquet (read by STEP 4); a 5-row CSV for quick inspection.
OUT_FULL   = "/content/user_txn_features.parquet"
OUT_SAMPLE = "/content/user_txn_features_sample.csv"
user_txn_features.to_parquet(OUT_FULL, index=False)
user_txn_features.head(5).to_csv(OUT_SAMPLE, index=False)

print("user_txn_features shape:", user_txn_features.shape)
print("Saved ->", OUT_FULL, "and", OUT_SAMPLE)
display(user_txn_features.head(5))


STEP 4 — Build the per-trip model table and write model_table_90d.*





In [None]:
# STEP 4: build per-trip table, merge user features, keep it numeric+lean (→ 90d)
import pandas as pd, numpy as np
import pyarrow.parquet as pq
from pathlib import Path

MERGED_PATH = Path("/content/merged_trip_details_0.parquet")
USERF_PATH  = Path("/content/user_txn_features.parquet")
assert MERGED_PATH.exists() and USERF_PATH.exists(), "Run Step 2/3 first."

# Columns we would like; only those actually present in the file are loaded.
need = [
    "trip_id","user_id","trip_date","trip_start_time","incident_flag","safety_score",
    "trip_duration","trip_distance","fare_amount","tip_amount","trip_rating","cancellation_flag",
    "day_of_week","route_type","payment_method","currency"
]
# Read only the parquet schema to list the columns. Loading the whole table
# just to inspect `.columns` would pull every column into RAM.
avail = pq.read_schema(MERGED_PATH.as_posix()).names
use_cols = [c for c in need if c in avail]
merged = pd.read_parquet(MERGED_PATH, engine="pyarrow", columns=use_cols)

merged["user_id"] = merged["user_id"].astype("string")
# Prefer the start timestamp; fall back to the trip date. Unparseable → NaT.
# NOTE(review): if trip_start_time holds only a clock time (no date), pd.to_datetime
# attaches the current date instead of the trip date — confirm the column format.
if "trip_start_time" in merged.columns:
    merged["trip_ts"] = pd.to_datetime(merged["trip_start_time"], errors="coerce")
else:
    merged["trip_ts"] = pd.to_datetime(merged["trip_date"], errors="coerce")
merged["trip_date_dt"] = merged["trip_ts"].dt.normalize()

for c in ["trip_duration","trip_distance","fare_amount","tip_amount","trip_rating","safety_score"]:
    if c in merged.columns:
        merged[c] = pd.to_numeric(merged[c], errors="coerce").astype("float32")

# Binary flags: coerce to numbers, treat missing/unparseable as 0, store as int8.
if "cancellation_flag" in merged.columns:
    merged["cancellation_flag"] = pd.to_numeric(merged["cancellation_flag"], errors="coerce").fillna(0).astype("int8")
# incident_flag is the training label. Coerce it the same way so a null cannot
# break the int8 cast; if the column is absent, default to all zeros.
if "incident_flag" in merged.columns:
    merged["incident_flag"] = pd.to_numeric(merged["incident_flag"], errors="coerce").fillna(0).astype("int8")
else:
    merged["incident_flag"] = np.zeros(len(merged), dtype="int8")

for c in ["day_of_week","route_type","payment_method","currency"]:
    if c in merged.columns:
        merged[c] = merged[c].astype("string")

def topk_onehot(df, col, k=6, prefix=None):
    """One-hot encode `col`, keeping its k most frequent values plus "__other__".

    Values outside the top-k (and missing values) map to "__other__". Columns
    are ``{prefix}_{value}`` in frequency order, then ``{prefix}___other__``, all
    int8 (absent categories become all-zero columns). `prefix` defaults to
    `col`. If `col` is missing, an empty frame on df's index is returned.
    """
    if col not in df.columns:
        return pd.DataFrame(index=df.index)
    label = prefix if prefix else col
    keep_values = df[col].value_counts().nlargest(k).index.astype("string")
    column = df[col]
    bucketed = column.where(column.isin(keep_values), "__other__")
    dummies = pd.get_dummies(bucketed, prefix=label)
    wanted = [f"{label}_{value}" for value in [*keep_values, "__other__"]]
    return dummies.reindex(columns=wanted, fill_value=0).astype("int8")

# One-hot the low-cardinality categoricals (top-6 values + "__other__" each).
oh = pd.concat([
    topk_onehot(merged, "day_of_week",     6, "dow"),
    topk_onehot(merged, "route_type",      6, "route"),
    topk_onehot(merged, "payment_method",  6, "pm_trip"),
], axis=1)

# Derived ratios: ±inf from zero denominators → NaN (set to 0 after the merge).
# NOTE(review): speed_kmph assumes trip_duration is in minutes and trip_distance in km.
if {"trip_duration","trip_distance"} <= set(merged.columns):
    merged["speed_kmph"] = (
        merged["trip_distance"] / (merged["trip_duration"] / 60.0)
    ).replace([np.inf, -np.inf], np.nan).clip(0, 160).astype("float32")

if {"fare_amount","trip_distance"} <= set(merged.columns):
    fk = (merged["fare_amount"] / merged["trip_distance"]).replace([np.inf,-np.inf], np.nan)
    merged["fare_per_km"] = fk.clip(0, fk.quantile(0.995)).astype("float32")  # trim extreme tail

if {"fare_amount","tip_amount"} <= set(merged.columns):
    merged["tip_pct"] = (
        merged["tip_amount"] / merged["fare_amount"]
    ).replace([np.inf,-np.inf], np.nan).clip(0, 1).astype("float32")

merged["hour"]    = merged["trip_ts"].dt.hour.astype("float32")
merged["weekday"] = merged["trip_ts"].dt.dayofweek.astype("float32")

keep_cols = [
    "trip_id","user_id","trip_date_dt","incident_flag",
    "trip_duration","trip_distance","fare_amount","tip_amount","trip_rating",
    "cancellation_flag","safety_score","speed_kmph","fare_per_km","tip_pct","hour","weekday"
]
keep_cols = [c for c in keep_cols if c in merged.columns]
base = pd.concat([merged[keep_cols], oh], axis=1)

userf = pd.read_parquet(USERF_PATH, engine="pyarrow")
userf["user_id"] = userf["user_id"].astype("string")
model_table = base.merge(userf, on="user_id", how="left")
# Fill gaps only in numeric/bool columns. A blanket fillna(0) would also try to
# write integer 0s into NaT trip dates and NA string ids, which can coerce
# those columns to object dtype or raise, and would corrupt the date used for
# the chronological split downstream.
fill_cols = model_table.select_dtypes(include=["number", "bool"]).columns
model_table = model_table.fillna({c: 0 for c in fill_cols})

id_cols   = ["trip_id","user_id","trip_date_dt","incident_flag"]
num_cols  = [c for c in model_table.columns if c not in id_cols]
# Cast each feature column at most once: bool → int8, any float → float32,
# int64 → int32. Already-narrow ints (int8 one-hots/flags) are left as they are.
for c in num_cols:
    dtype_name = model_table[c].dtype.name
    if dtype_name == "bool":
        model_table[c] = model_table[c].astype("int8")
    elif dtype_name.startswith("float"):
        model_table[c] = model_table[c].astype("float32")
    elif dtype_name == "int64":
        model_table[c] = model_table[c].astype("int32")

OUT_PARQ = "/content/model_table_90d.parquet"
SAMPLE_CSV = "/content/model_table_90d_sample.csv"
model_table.to_parquet(OUT_PARQ, index=False)
model_table.head(5).to_csv(SAMPLE_CSV, index=False)

# The table itself is not filtered; the 90-day cutoff is only reported here.
cutoff_90d = model_table["trip_date_dt"].max() - pd.Timedelta(days=90)
print("model_table_90d:", model_table.shape, "→", OUT_PARQ)
print("date range:", model_table["trip_date_dt"].min().date(), "→", model_table["trip_date_dt"].max().date())
print("90-day cutoff (for training/val split later):", cutoff_90d.date())
print("numeric feature count:", len(num_cols))
display(model_table.head(5))


STEP 5 (90-day) LightGBM

In [None]:
# STEP 5: 90-day LightGBM → probs → Nova Score → Decision Band
import pandas as pd, numpy as np
from pathlib import Path
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score
import lightgbm as lgb

PARQ = Path("/content/model_table_90d.parquet")
assert PARQ.exists(), "Run STEP 4 (90d) first."

df = pd.read_parquet(PARQ, engine="pyarrow")

# 1) Chronological 70/30 split (tz-safe)
# Rows with a NaT trip date satisfy neither comparison and are left out of both splits.
df["trip_date_dt"] = pd.to_datetime(df["trip_date_dt"], errors="coerce")
if str(df["trip_date_dt"].dtype).startswith("datetime64[ns,"):
    df["trip_date_naive"] = df["trip_date_dt"].dt.tz_localize(None)
else:
    df["trip_date_naive"] = df["trip_date_dt"]

cutoff = df["trip_date_naive"].quantile(0.70)
train_mask = df["trip_date_naive"] <= cutoff
val_mask   = df["trip_date_naive"] >  cutoff

# 2) Features / labels — every non-id column is used as a feature
id_cols  = ["trip_id","user_id","trip_date_dt","trip_date_naive","incident_flag"]
num_cols = [c for c in df.columns if c not in id_cols]
X_train  = df.loc[train_mask, num_cols].astype("float32")
y_train  = df.loc[train_mask, "incident_flag"].astype("int8")
X_val    = df.loc[val_mask,   num_cols].astype("float32")
y_val    = df.loc[val_mask,   "incident_flag"].astype("int8")

print(f"Train/Val sizes: {len(X_train)} / {len(X_val)} | features: {len(num_cols)}")

# Optional RAM cap: keep at most N_MAX training rows — positives capped at
# N_MAX // 8, the remainder filled with negatives.
N_MAX = 400_000
if len(X_train) > N_MAX:
    rng = np.random.default_rng(42)  # seeded → reproducible downsample
    pos_idx = y_train[y_train == 1].index
    neg_idx = y_train[y_train == 0].index
    n_pos = min(len(pos_idx), N_MAX // 8)
    # Clamp: requesting more negatives than exist raises with replace=False.
    n_neg = min(len(neg_idx), N_MAX - n_pos)
    pos_keep = rng.choice(pos_idx, n_pos, replace=False)
    neg_keep = rng.choice(neg_idx, n_neg, replace=False)
    keep = np.concatenate([pos_keep, neg_keep])
    X_train = X_train.loc[keep]
    y_train = y_train.loc[keep]
    print(f"Downsampled train → {len(X_train)} rows (pos={int(y_train.sum())})")

# 3) LightGBM params (class imbalance aware)
pos_rate = max(1e-6, y_train.mean())  # floor avoids division by zero with no positives
scale_pos_weight = (1 - pos_rate) / pos_rate
print("Pos rate (train):", float(pos_rate), "| scale_pos_weight:", float(scale_pos_weight))

lgb_params = dict(
    objective="binary", metric="auc",
    learning_rate=0.05, num_leaves=64, max_depth=-1, min_data_in_leaf=200,
    feature_fraction=0.8, bagging_fraction=0.8, bagging_freq=1,
    lambda_l1=0.0, lambda_l2=0.0,
    scale_pos_weight=scale_pos_weight,
    verbosity=-1,
)

train_set = lgb.Dataset(X_train, label=y_train, feature_name=[f"f_{i}" for i in range(X_train.shape[1])])
val_set   = lgb.Dataset(X_val,   label=y_val,   reference=train_set, feature_name=[f"f_{i}" for i in range(X_val.shape[1])])

gbm = lgb.train(
    lgb_params,
    train_set,
    num_boost_round=1500,
    valid_sets=[val_set],
    valid_names=["valid"],
    callbacks=[lgb.early_stopping(stopping_rounds=100, first_metric_only=True),
               lgb.log_evaluation(period=100)],
)

# 4) Metrics
p_val = gbm.predict(X_val, num_iteration=gbm.best_iteration)
auroc  = float(roc_auc_score(y_val, p_val))
prauc  = float(average_precision_score(y_val, p_val))

def f1_at(p, y, thr):
    """F1 of the predictions `p >= thr` against labels `y` (0.0 when undefined, as in 6C)."""
    return f1_score(y, (p >= thr).astype(int), zero_division=0)

grid = np.linspace(0.05, 0.50, 10)  # coarse threshold sweep
f1s = [f1_at(p_val, y_val, t) for t in grid]
best_t  = float(grid[int(np.argmax(f1s))])
best_f1 = float(max(f1s))
print({"AUROC": auroc, "PR_AUC": prauc, "best_F1": best_f1, "best_threshold": best_t})

# 5) Nova Score (sigmoid) + Decision Band
def nova_score_from_proba(p, beta0=-3.5, beta1=10.0):
    """Map an incident probability to a Nova Score in roughly [300, 900].

    The risk logit ``beta0 + beta1 * p`` goes through a sigmoid and is inverted,
    so higher risk gives a lower score:

        score = 300 + 600 * sigmoid(-(beta0 + beta1 * p))

    With the defaults: p=0 → ~882, p=0.35 → 600, p=1 → ~301. (Writing the
    logit as ``beta0 - beta1 * p`` would confine every score to ~300–318,
    i.e. always below the 600 decline threshold.)

    Works element-wise on scalars, numpy arrays and pandas Series.
    """
    s = 1.0 / (1.0 + np.exp(beta0 + beta1 * p))  # == sigmoid(-(beta0 + beta1 * p))
    return 300.0 + 600.0 * s  # ~[300,900]

def decision_band(score):
    """Return the decision-band label for a Nova Score.

    Bands are <600, 600–679, 680–739 and ≥740. A score that fails every
    ``<`` comparison (including NaN) falls into the top band.
    """
    for upper, label in (
        (600, "<600 (Decline + coaching)"),
        (680, "600–679 (Caution)"),
        (740, "680–739 (Manual review)"),
    ):
        if score < upper:
            return label
    return "≥740 (Approve)"

# Per-trip validation predictions → Nova Score → band
val_out = df.loc[val_mask, ["trip_id","user_id","incident_flag"]].copy()
val_out["proba"] = p_val.astype("float32")
val_out["nova_score"] = nova_score_from_proba(val_out["proba"]).astype("float32")
val_out["decision_band"] = [decision_band(s) for s in val_out["nova_score"]]

# 6) Per-user consolidation: mean predicted probability over each user's validation trips
user_scores = (
    val_out.groupby("user_id", as_index=False)
           .agg(proba_mean=("proba","mean"),
                trips=("proba","size"))
)
user_scores["nova_score"] = nova_score_from_proba(user_scores["proba_mean"]).astype("float32")
user_scores["decision_band"] = [decision_band(s) for s in user_scores["nova_score"]]

# 7) Save (seeded preview sample so reruns produce the same file)
val_out.sample(min(10, len(val_out)), random_state=42).to_csv("/content/val_preds_90d_sample.csv", index=False)
user_scores.to_csv("/content/user_scores_90d.csv", index=False)
print("Saved -> /content/val_preds_90d_sample.csv")
print("Saved -> /content/user_scores_90d.csv")

display(user_scores.head(10))


STEP 6A — Build a small user–merchant edge list (pruned, RAM-safe)

In [None]:
# STEP 6A: make a pruned bipartite edge list → /content/edges_pruned.parquet  (90d)
import pandas as pd, numpy as np
import pyarrow.parquet as pq
from pathlib import Path

TXN_PATH   = Path("/content/transaction-0.parquet")
MODEL_PARQ = Path("/content/model_table_90d.parquet")
assert TXN_PATH.exists() and MODEL_PARQ.exists(), "Run Steps 2–5 (90d) first."

# Load just what we need. Only the parquet schema is read to list the columns;
# loading the full table just for `.columns` would pull every column into RAM.
use_cols = ["user_id","merchant_id"]
txn_cols = pq.read_schema(TXN_PATH.as_posix()).names
if "merchant_id" not in txn_cols:
    raise RuntimeError("merchant_id not found in transactions.")
txn = pd.read_parquet(TXN_PATH, engine="pyarrow",
                      columns=[c for c in use_cols if c in txn_cols])

txn["user_id"] = txn["user_id"].astype("string")

# Keep only users that appear in the 90d modeling table (RAM saver)
users_90d = pd.read_parquet(MODEL_PARQ, engine="pyarrow",
                            columns=["user_id"])["user_id"].astype("string").unique()
txn = txn[txn["user_id"].isin(set(users_90d))]

# Prune to avoid huge graphs
MIN_USER_TXN  = 5        # minimum number of DISTINCT merchants per user (see `deg` below)
TOPK_PER_USER = 20       # keep each user's most frequent merchants only
EDGE_CAP      = 300_000  # hard cap for safety

# freq per (u,m) → edge weight w
pairs = (txn.groupby(["user_id","merchant_id"], observed=True)
           .size().rename("w").reset_index())

# drop low-degree users
deg = (pairs.groupby("user_id", observed=True)["merchant_id"]
            .nunique().rename("deg").reset_index())
good_users = set(deg.loc[deg["deg"] >= MIN_USER_TXN, "user_id"])
pairs = pairs[pairs["user_id"].isin(good_users)]

# top-K merchants per user (ties broken by order of appearance)
pairs["rn"] = pairs.groupby("user_id")["w"].rank(method="first", ascending=False)
pairs = pairs[pairs["rn"] <= TOPK_PER_USER].drop(columns="rn")

# cap total edges (seeded uniform sample)
if len(pairs) > EDGE_CAP:
    pairs = pairs.sample(EDGE_CAP, random_state=42).reset_index(drop=True)

pairs = pairs.reset_index(drop=True)
pairs.to_parquet("/content/edges_pruned.parquet", index=False)

print(f"Edges pruned: {len(pairs)}  | users: {pairs['user_id'].nunique()}  | merchants: {pairs['merchant_id'].nunique()}")
print("Saved -> /content/edges_pruned.parquet")


STEP 6B — make user embeddings (Node2Vec → SVD fallback), save to /content/user_n2v.parquet

In [None]:
# STEP 6B: build /content/user_n2v.parquet (Node2Vec or SVD fallback, RAM-safe)
import pandas as pd, numpy as np
from pathlib import Path
import gc

from sklearn.decomposition import TruncatedSVD
from scipy.sparse import coo_matrix
import networkx as nx

EDGES = Path("/content/edges_pruned.parquet")
assert EDGES.exists(), "Run STEP 6A first."

edges = pd.read_parquet(EDGES, engine="pyarrow")
edges["user_id"]     = edges["user_id"].astype("string")
edges["merchant_id"] = edges["merchant_id"].astype("string")
if "w" not in edges.columns:
    edges["w"] = 1.0
edges["w"] = edges["w"].astype("float32")
if edges.empty:
    # Both embedding paths need at least one edge; fail with a clear message.
    raise RuntimeError("edges_pruned.parquet has no edges; relax the pruning in STEP 6A.")

print(f"Edges for embedding: {len(edges)} | users={edges['user_id'].nunique()} | merchants={edges['merchant_id'].nunique()}")

EMB_DIM = 32   # small & safe for Colab Free
method_used = None
user_emb = None

# ---- Try Node2Vec (small params) ----
try:
    from node2vec import Node2Vec

    G = nx.Graph()
    # prefix with U_/M_ to avoid ID collisions. The three columns are selected
    # explicitly so extra columns in the file cannot break the (u, m, w) unpacking.
    G.add_edges_from(
        (f"U_{u}", f"M_{m}", {"weight": float(w)})
        for u, m, w in edges[["user_id", "merchant_id", "w"]].itertuples(index=False, name=None)
    )
    print(f"Graph: |V|={G.number_of_nodes()} |E|={G.number_of_edges()}")

    n2v = Node2Vec(
        G,
        dimensions=EMB_DIM,
        walk_length=8,
        num_walks=4,
        p=1, q=1,
        workers=1,                 # keep RAM/CPU low
        weight_key="weight",
        quiet=True,
        seed=42,
    )
    w2v = n2v.fit(window=5, min_count=1, batch_words=512, epochs=3)

    # Keep user nodes only; strip the "U_" prefix to recover the user_id.
    u_nodes = [n for n in G.nodes if str(n).startswith("U_")]
    rows = []
    zero = np.zeros(EMB_DIM, dtype=np.float32)
    for n in u_nodes:
        vec = w2v.wv[n] if n in w2v.wv else zero
        rows.append([n[2:]] + list(map(float, vec)))

    cols = ["user_id"] + [f"user_n2v_{i}" for i in range(EMB_DIM)]
    user_emb = pd.DataFrame(rows, columns=cols)
    method_used = "node2vec"

    del w2v, n2v, G
    gc.collect()

except Exception as e:
    # Deliberate best effort: any Node2Vec problem falls through to the SVD path.
    print("Node2Vec failed (falling back to SVD):", e)

# ---- SVD fallback (very RAM-friendly) ----
if user_emb is None:
    u_codes = edges["user_id"].astype("category")
    m_codes = edges["merchant_id"].astype("category")
    uid = u_codes.cat.codes.values
    mid = m_codes.cat.codes.values
    w   = edges["w"].values

    # Sparse user × merchant weight matrix
    n_u = int(uid.max()) + 1
    n_m = int(mid.max()) + 1
    X = coo_matrix((w, (uid, mid)), shape=(n_u, n_m), dtype=np.float32).tocsr()

    # TruncatedSVD cannot return more components than the matrix supports
    # (n_components must be <= n_features), so small graphs get fewer components,
    # zero-padded back to EMB_DIM so the output schema stays fixed.
    n_comp = max(1, min(EMB_DIM, n_u, n_m))
    svd = TruncatedSVD(n_components=n_comp, n_iter=7, random_state=42)
    U = svd.fit_transform(X).astype("float32")
    if U.shape[1] < EMB_DIM:
        U = np.hstack([U, np.zeros((U.shape[0], EMB_DIM - U.shape[1]), dtype=np.float32)])

    # map codes back to user_id strings
    code_to_user = pd.DataFrame({
        "user_id": u_codes.cat.categories.astype("string"),
        "uid": np.arange(len(u_codes.cat.categories))
    }).sort_values("uid").reset_index(drop=True)

    user_emb = pd.DataFrame(U, columns=[f"user_n2v_{i}" for i in range(EMB_DIM)])
    user_emb.insert(0, "user_id", code_to_user["user_id"].values)
    method_used = "svd_fallback"

    del X, svd, U
    gc.collect()

print(f"Embeddings ready: {user_emb.shape}  | method={method_used}")
user_emb.to_parquet("/content/user_n2v.parquet", index=False)
user_emb.head(10).to_csv("/content/user_n2v_sample.csv", index=False)
print("Saved -> /content/user_n2v.parquet and /content/user_n2v_sample.csv")


# STEP 6C-SAFE (part 1/2): prep + TRAIN on a stratified sample without full merge


In [None]:
# STEP 6C-SAFE (part 1/2): prep + TRAIN on a stratified sample without full merge
import pandas as pd, numpy as np, gc
from pathlib import Path
import lightgbm as lgb
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score

PARQ = Path("/content/model_table_90d.parquet")
EMB  = Path("/content/user_n2v.parquet")
assert PARQ.exists() and EMB.exists(), "Missing inputs. Run Steps 4–6B first."

# ---- 1) Load base table (no merge), get numeric columns ----
df  = pd.read_parquet(PARQ, engine="pyarrow")
df["user_id"] = df["user_id"].astype("string")
# A fresh RangeIndex makes the row positions from np.where below valid .loc labels.
df.reset_index(drop=True, inplace=True)

id_cols = ["trip_id","user_id","trip_date_dt","incident_flag"]
num_base = [c for c in df.columns if c not in id_cols and np.issubdtype(df[c].dtype, np.number)]

# ---- 2) Split (70/30 by time) ----
df["trip_date_dt"] = pd.to_datetime(df["trip_date_dt"], errors="coerce")
if str(df["trip_date_dt"].dtype).startswith("datetime64[ns,"):
    trip_naive = df["trip_date_dt"].dt.tz_localize(None)
else:
    trip_naive = df["trip_date_dt"]

cutoff = trip_naive.quantile(0.70)
train_mask = trip_naive <= cutoff
val_mask   = trip_naive >  cutoff

# ---- 3) Load embeddings & index for fast mapping ----
emb = pd.read_parquet(EMB, engine="pyarrow")
emb["user_id"] = emb["user_id"].astype("string")
emb = emb.set_index("user_id")
emb_cols = [c for c in emb.columns if c.startswith("user_n2v_")]
EMB_DIM = len(emb_cols)

# ---- 4) Stratified downsample BEFORE building features ----
N_MAX = 300_000  # adjust down if still tight (e.g., 200_000)
rng = np.random.default_rng(42)

y = df["incident_flag"].astype("int8")
pos_idx = np.where(train_mask & (y == 1))[0]
neg_idx = np.where(train_mask & (y == 0))[0]

if len(pos_idx) + len(neg_idx) <= N_MAX:
    # The training split already fits the budget: keep every row instead of
    # discarding positives beyond N_MAX // 8 for no memory benefit.
    n_pos, n_neg = len(pos_idx), len(neg_idx)
    keep = np.concatenate([pos_idx, neg_idx])
else:
    # Cap positives at N_MAX // 8, fill the rest with negatives (never more than exist).
    n_pos = min(len(pos_idx), max(1, N_MAX // 8))
    n_neg = min(len(neg_idx), max(1, N_MAX - n_pos))
    keep = np.concatenate([rng.choice(pos_idx, n_pos, replace=False),
                           rng.choice(neg_idx, n_neg, replace=False)])
rng.shuffle(keep)

print(f"Train downsample: pos={n_pos}, neg={n_neg}, total={len(keep)}")

# ---- 5) Build TRAIN features on the kept rows ONLY (map embeds column-wise) ----
X_train = df.loc[keep, num_base].astype("float32").copy()
uid_train = df.loc[keep, "user_id"]

# add embeddings via vectorized map per column (no big merge); unknown users → 0.0
for c in emb_cols:
    X_train[c] = uid_train.map(emb[c]).astype("float32").fillna(0.0)

y_train = y.iloc[keep].to_numpy()

# ---- 6) Build a compact VALIDATION sample for live metrics (keep full scoring for 6C part 2) ----
VAL_SAMPLE = 250_000  # small holdout for AUC; full validation scored streaming later
val_idx_all = np.where(val_mask)[0]
val_keep = rng.choice(val_idx_all, size=min(VAL_SAMPLE, len(val_idx_all)), replace=False)

X_val_small = df.loc[val_keep, num_base].astype("float32").copy()
uid_val_small = df.loc[val_keep, "user_id"]
for c in emb_cols:
    X_val_small[c] = uid_val_small.map(emb[c]).astype("float32").fillna(0.0)
y_val_small = y.iloc[val_keep].to_numpy()

# ---- 7) Train LightGBM ----
pos_rate = float(max(1e-6, y_train.mean()))
scale_pos_weight = (1 - pos_rate) / pos_rate
params = dict(objective="binary", metric="auc",
              learning_rate=0.05, num_leaves=64, min_data_in_leaf=200,
              feature_fraction=0.8, bagging_fraction=0.8, bagging_freq=1,
              scale_pos_weight=scale_pos_weight, verbosity=-1)

train_set = lgb.Dataset(X_train, label=y_train)
val_set   = lgb.Dataset(X_val_small, label=y_val_small, reference=train_set)

gbm = lgb.train(params, train_set, num_boost_round=1200,
                valid_sets=[val_set], valid_names=["valid"],
                callbacks=[lgb.early_stopping(100, first_metric_only=True),
                           lgb.log_evaluation(100)])

# quick metrics on sampled val
p_small = gbm.predict(X_val_small, num_iteration=gbm.best_iteration)
auroc = float(roc_auc_score(y_val_small, p_small))
prauc = float(average_precision_score(y_val_small, p_small))
grid = np.linspace(0.05, 0.50, 10)
best_t = float(grid[int(np.argmax([f1_score(y_val_small, (p_small>=t).astype(int), zero_division=0) for t in grid]))])
print({"AUROC_sample": auroc, "PR_AUC_sample": prauc, "best_thr_sample": best_t})

# Part 2 reuses these globals: df, df_ids, val_idx_all (full validation row
# positions), num_base, emb (indexed by user_id), emb_cols and gbm.
df_ids = df[["trip_id","user_id"]].copy()
del X_train, X_val_small, uid_train, uid_val_small
gc.collect()


# STEP 6C-SAFE (part 2/2): STREAM full validation to get per-user scores (low RAM)


In [None]:
# STEP 6C-SAFE (part 2/2): STREAM full validation to get per-user scores (low RAM)
# Scores every validation row in fixed-size chunks, keeps all probabilities for
# metrics, and accumulates per-user sums/counts so no full feature matrix is built.
import numpy as np, pandas as pd, gc
from sklearn.metrics import roc_auc_score, average_precision_score

# --- sanity checks (must have run 6C part-1) ---
# This cell reads in-memory globals from part 1 rather than files on disk.
needed = ["df", "df_ids", "val_idx_all", "num_base", "emb", "emb_cols", "gbm"]
for name in needed:
    assert name in globals(), f"Missing '{name}'. Please run 6C part-1 first."

# --- chunked scoring over full validation set ---
CHUNK = 200_000  # lower if RAM is tight (e.g., 120_000)
val_idx_all = np.asarray(val_idx_all)
n_val = len(val_idx_all)
print(f"Scoring full validation in chunks: {n_val} rows")

# arrays for metrics
# val_idx_all holds row positions; they work as .loc labels because part 1
# reset df's index to a RangeIndex.
y_full = df.loc[val_idx_all, "incident_flag"].astype("int8").to_numpy()
p_full = np.empty(n_val, dtype=np.float32)

# per-user aggregation
# Running sum and count of predicted probabilities per user_id; the mean is
# computed after the loop.
from collections import defaultdict
sum_d = defaultdict(float)
cnt_d = defaultdict(int)

sample_rows = []  # keep a tiny preview (up to 10 rows)

for a in range(0, n_val, CHUNK):
    b = min(n_val, a + CHUNK)
    idx = val_idx_all[a:b]

    # base numeric block
    Xb = df.loc[idx, num_base].astype("float32").copy()
    # user ids for this chunk
    uids = df.loc[idx, "user_id"]

    # add embeddings per column via fast map; fill missing with 0.0
    # (same construction and column order as the training matrix in part 1)
    for c in emb_cols:
        Xb[c] = uids.map(emb[c]).astype("float32").fillna(0.0)

    # predict
    pb = gbm.predict(Xb, num_iteration=gbm.best_iteration).astype("float32")
    p_full[a:b] = pb

    # aggregate per user
    # NOTE(review): per-row Python loop; a per-chunk groupby sum/count would be
    # faster on large validation sets.
    uids_arr = uids.astype("string").values
    for uid, pr in zip(uids_arr, pb):
        sum_d[uid] += float(pr)
        cnt_d[uid] += 1

    # collect a tiny preview of per-trip predictions
    if len(sample_rows) < 10:
        trips = df_ids.loc[idx, "trip_id"].astype("string").values
        need = 10 - len(sample_rows)
        sample_rows.extend(list(zip(trips[:need], uids_arr[:need], pb[:need])))

    # cleanup
    del Xb, uids, uids_arr, pb
    gc.collect()

# --- metrics on full validation ---
# NOTE(review): roc_auc_score raises if the validation labels contain a single class.
auroc  = float(roc_auc_score(y_full, p_full))
prauc  = float(average_precision_score(y_full, p_full))
print({"AUROC_full": auroc, "PR_AUC_full": prauc})

# --- per-user table (fixed dtype handling) ---
users = np.array(list(sum_d.keys()), dtype=object)           # keep as object; not numpy "string"
proba_mean = np.array([sum_d[u] / cnt_d[u] for u in users], dtype=np.float32)
trips = np.array([cnt_d[u] for u in users], dtype=np.int32)

user_scores = pd.DataFrame({
    "user_id": users,        # object now; cast to pandas string below
    "proba_mean": proba_mean,
    "trips": trips
})
user_scores["user_id"] = user_scores["user_id"].astype("string")

def nova_score_from_proba(p, beta0=-3.5, beta1=10.0):
    """Map an incident probability to a Nova Score in roughly [300, 900].

    The risk logit ``beta0 + beta1 * p`` goes through a sigmoid and is inverted,
    so higher risk gives a lower score:

        score = 300 + 600 * sigmoid(-(beta0 + beta1 * p))

    With the defaults: p=0 → ~882, p=0.35 → 600, p=1 → ~301. (Writing the
    logit as ``beta0 - beta1 * p`` would confine every score to ~300–318,
    i.e. always below the 600 decline threshold.)

    Works element-wise on scalars, numpy arrays and pandas Series.
    """
    s = 1.0 / (1.0 + np.exp(beta0 + beta1 * p))  # == sigmoid(-(beta0 + beta1 * p))
    return 300.0 + 600.0 * s

def decision_band(s):
    """Return the decision-band label for Nova Score `s`.

    Bands are <600, 600–679, 680–739 and ≥740. A score that fails every
    ``<`` comparison (including NaN) falls into the top band.
    """
    thresholds = (
        (600, "<600 (Decline + coaching)"),
        (680, "600–679 (Caution)"),
        (740, "680–739 (Manual review)"),
    )
    for upper, label in thresholds:
        if s < upper:
            return label
    return "≥740 (Approve)"

user_scores["nova_score"] = nova_score_from_proba(user_scores["proba_mean"]).astype("float32")
user_scores["decision_band"] = [decision_band(s) for s in user_scores["nova_score"]]

# --- save outputs ---
user_scores.to_csv("/content/user_scores_90d_n2v.csv", index=False)
print("Saved -> /content/user_scores_90d_n2v.csv")

# The per-trip preview is only written (and reported) when rows were collected.
if sample_rows:
    samp = pd.DataFrame(sample_rows, columns=["trip_id","user_id","proba"])
    samp["nova_score"] = nova_score_from_proba(samp["proba"]).astype("float32")
    samp["decision_band"] = [decision_band(s) for s in samp["nova_score"]]
    samp.to_csv("/content/val_preds_90d_n2v_sample.csv", index=False)
    print("Saved -> /content/val_preds_90d_n2v_sample.csv")

display(user_scores.head(10))


# STEP 6D — Calibrate Nova Score with a well-spread sigmoid, then banding (90d)
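*Re-maps the per-user mean probabilities from STEP 6C to Nova Scores with a wider sigmoid, then assigns the v2 decision bands.*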

In [None]:
# STEP 6D — Calibrate Nova Score with a well-spread sigmoid, then banding (90d)
# Reads:  /content/user_scores_90d_n2v.csv
# Writes: /content/user_scores_90d_n2v_cal.csv
#         /content/val_preds_90d_n2v_cal_sample.csv  (if preview file exists)
# Works only from the CSV files on disk, so it does not need the 6C kernel state.

import pandas as pd
import numpy as np
import os

USERS_PATH = "/content/user_scores_90d_n2v.csv"
assert os.path.exists(USERS_PATH), "Run STEP 6C first to create user_scores_90d_n2v.csv."

def nova_score_sigmoid(p, k=3.0, pivot=0.5):
    """Map probability ``p`` to a Nova Score in (300, 900) via a logistic curve.

    The curve is centred at ``pivot`` (score exactly 600 there) with slope ``k``.
    Scores fall as ``p`` rises. Works element-wise on arrays/Series.

    NOTE(review): with the defaults (k=3, pivot=0.5) and p in [0, 1] the score stays
    within about [409.5, 790.5], so the ">=800" band of decision_band_v2 cannot be
    reached. Confirm whether a larger k is intended.
    """
    margin = k * (pivot - p)                     # positive when p is below the pivot
    squashed = 1.0 / (1.0 + np.exp(-margin))     # logistic squash into (0, 1)
    return 300.0 + 600.0 * squashed

def decision_band_v2(s):
    """Return the v2 decision-band label for a calibrated Nova Score ``s``.

    Floors are tested from highest to lowest. A score below every floor, or NaN,
    lands in the decline band.
    """
    floors = (
        (800, "≥800 (Auto-approve, large limit)"),
        (700, "700–799 (Standard approve)"),
        (600, "600–699 (Manual review)"),
    )
    for floor, label in floors:
        if s >= floor:
            return label
    return "<600 (Decline + coaching)"

# ---- 1) Calibrate per-user outputs ----
users = pd.read_csv(USERS_PATH, dtype={"user_id":"string"})
if "proba_mean" not in users.columns:
    raise ValueError("Expected column 'proba_mean' not found in user scores.")

# Only nova_score / decision_band are re-derived; proba_mean and trips pass through unchanged.
users["nova_score"] = nova_score_sigmoid(users["proba_mean"].astype("float32")).astype("float32")
users["decision_band"] = [decision_band_v2(s) for s in users["nova_score"]]

OUT_USERS = "/content/user_scores_90d_n2v_cal.csv"
users.to_csv(OUT_USERS, index=False)
print(f"Saved → {OUT_USERS}")
print(users.head(10).to_string(index=False))

# ---- 2) (Optional) Calibrate tiny per-trip preview if it exists ----
SAMPLE_IN  = "/content/val_preds_90d_n2v_sample.csv"
SAMPLE_OUT = "/content/val_preds_90d_n2v_cal_sample.csv"
if os.path.exists(SAMPLE_IN):
    # Read identifiers as strings, as for `users` above. Otherwise numeric-looking IDs
    # are parsed as integers and lose leading zeros when written back out.
    samp = pd.read_csv(SAMPLE_IN, dtype={"trip_id": "string", "user_id": "string"})
    if "proba" in samp.columns:
        samp["nova_score"] = nova_score_sigmoid(samp["proba"].astype("float32")).astype("float32")
        samp["decision_band"] = [decision_band_v2(s) for s in samp["nova_score"]]
        samp.to_csv(SAMPLE_OUT, index=False)
        print(f"Saved → {SAMPLE_OUT}")
    else:
        print(f"Preview file found but missing 'proba' column: {SAMPLE_IN}")
else:
    print("No per-trip preview found to calibrate (this is fine).")


# STEP 7A-PREP (90d): Merge embeds, select numeric features, standardize, split, and SAVE to disk.


In [None]:
# STEP 7A-PREP (90d): Merge embeds, select numeric features, standardize, split, and SAVE to disk.
import numpy as np, pandas as pd, json, gc
from pathlib import Path

PARQ = Path("/content/model_table_90d.parquet")
EMB  = Path("/content/user_n2v.parquet")
assert PARQ.exists() and EMB.exists(), "Missing inputs. Make sure Steps 4–6B ran."

outdir = Path("/content/ft90"); outdir.mkdir(exist_ok=True)

# 1) Load + merge once
df  = pd.read_parquet(PARQ, engine="pyarrow")
emb = pd.read_parquet(EMB,  engine="pyarrow")

df["user_id"]  = df["user_id"].astype("string")
emb["user_id"] = emb["user_id"].astype("string")
df = df.merge(emb, on="user_id", how="left")
# Users without an embedding get NaN from the left join, so fill NUMERIC columns with 0.0.
# A frame-wide fillna(0.0) would also turn missing trip dates into 0. pd.to_datetime below
# maps that to 1970-01-01, silently pushing those rows into the training split.
# It would also touch non-numeric ID columns.
fill_cols = df.select_dtypes(include="number").columns
df[fill_cols] = df[fill_cols].fillna(0.0)

# 2) Chronological 70/30 split
df["trip_date_dt"] = pd.to_datetime(df["trip_date_dt"], errors="coerce")
# Drop the timezone if present. `.dt.tz` works for any resolution (ns/us/ms); a
# "datetime64[ns," dtype-string prefix check misses e.g. datetime64[us, UTC].
if df["trip_date_dt"].dt.tz is not None:
    df["trip_date_naive"] = df["trip_date_dt"].dt.tz_localize(None)
else:
    df["trip_date_naive"] = df["trip_date_dt"]

cutoff = df["trip_date_naive"].quantile(0.70)
train_mask = df["trip_date_naive"] <= cutoff
val_mask   = df["trip_date_naive"] >  cutoff
# NaT compares False both ways, so undated rows fall out of both splits; make that visible.
n_undated = int(df["trip_date_naive"].isna().sum())
if n_undated:
    print(f"Note: {n_undated} rows have no valid trip date and are excluded from both splits.")

# 3) Numeric-only features
id_cols = ["trip_id","user_id","trip_date_dt","trip_date_naive","incident_flag"]
num_cols = [c for c in df.columns if c not in id_cols and np.issubdtype(df[c].dtype, np.number)]
with open(outdir/"num_cols.json","w") as f:
    json.dump(num_cols, f)

X_train_df = df.loc[train_mask, num_cols].astype("float32")
y_train = df.loc[train_mask, "incident_flag"].astype("int8").to_numpy()
X_val_df = df.loc[val_mask,   num_cols].astype("float32")
y_val   = df.loc[val_mask,   "incident_flag"].astype("int8").to_numpy()

# 4) Standardize (fit on train only), save mu/sd for reuse; zero-variance columns get sd=1
mu = X_train_df.mean(axis=0).to_numpy(dtype="float32")
sd = X_train_df.std(axis=0).replace(0, 1.0).to_numpy(dtype="float32")

X_train = (X_train_df.to_numpy() - mu) / sd
X_val   = (X_val_df.to_numpy()   - mu) / sd

# 5) Persist arrays (will be memory-mapped later)
np.save(outdir/"X_train.npy", X_train)
np.save(outdir/"y_train.npy", y_train)
np.save(outdir/"X_val.npy",   X_val)
np.save(outdir/"y_val.npy",   y_val)
np.save(outdir/"mu.npy",      mu)
np.save(outdir/"sd.npy",      sd)

# Also persist identifiers (aligned to X_val order) for export step
val_ids = df.loc[val_mask, ["trip_id","user_id","incident_flag"]].reset_index(drop=True)
val_ids.to_parquet(outdir/"val_ids.parquet", index=False)

# Free RAM
del df, emb, X_train_df, X_val_df, X_train, X_val, y_train, y_val, val_ids
gc.collect()
print("PREP DONE → saved arrays in /content/ft90")


# STEP 7A-SPLIT-1 (90d): Create small shards from the big training arrays.


In [None]:
# STEP 7A-SPLIT-1 (90d): Create small shards from the big training arrays.
import numpy as np
from pathlib import Path

ftdir = Path("/content/ft90")
# Memory-mapped: rows are only read from disk when a slice is materialized below.
X_tr = np.load(ftdir / "X_train.npy", mmap_mode="r")
y_tr = np.load(ftdir / "y_train.npy", mmap_mode="r")

shard_rows = 50000            # lower this (e.g. 20000) if RAM is tight
ft_shards = ftdir / "shards"
ft_shards.mkdir(parents=True, exist_ok=True)

n, f = X_tr.shape
n_shards = -(-n // shard_rows)   # ceiling division
print(f"Train shape: {X_tr.shape}  -> writing {n_shards} shards (~{shard_rows} rows each)")

for si, a in enumerate(range(0, n, shard_rows)):
    b = min(n, a + shard_rows)
    # Copy each slice into a compact, C-contiguous float32 array before saving.
    X_part = np.asarray(X_tr[a:b], dtype=np.float32, order="C")
    y_part = np.asarray(y_tr[a:b], dtype=np.float32, order="C")
    np.save(ft_shards / f"X_{si:03d}.npy", X_part)
    np.save(ft_shards / f"y_{si:03d}.npy", y_part)
    print(f"  wrote shard {si+1}/{n_shards}: rows {a}:{b}")

print("DONE → shards at /content/ft90/shards")


# STEP 7A-SPLIT-2 (90d): Re-split existing shards into smaller *micro-shards* to reduce RAM


In [None]:
# STEP 7A-SPLIT-2 (90d): Re-split existing shards into smaller *micro-shards* to reduce RAM
import numpy as np, os, glob
from pathlib import Path

BASE = Path("/content/ft90")
IN_DIR  = BASE / "shards"        # from 7A-SPLIT-1
OUT_DIR = BASE / "micro_shards"  # new, smaller shards live here
OUT_DIR.mkdir(parents=True, exist_ok=True)

# tune this down if you still see OOM (e.g., 5000)
MICRO_ROWS = 10_000

in_X = sorted(glob.glob(str(IN_DIR/"X_*.npy")))
in_y = sorted(glob.glob(str(IN_DIR/"y_*.npy")))
assert in_X and in_y and len(in_X) == len(in_y), "Run 7A-SPLIT-1 first to create shards."

# Clear micro-shards from any previous run. The trainer globs every X_*.npy in OUT_DIR,
# so leftovers from a run with a different MICRO_ROWS would be trained on twice.
stale = list(OUT_DIR.glob("X_*.npy")) + list(OUT_DIR.glob("y_*.npy"))
for old in stale:
    old.unlink()
if stale:
    print(f"Removed {len(stale)} stale micro-shard files from {OUT_DIR}")

print(f"Re-splitting {len(in_X)} shards into micro-shards of {MICRO_ROWS} rows each…")

ms_written = 0
for Xp_path, yp_path in zip(in_X, in_y):
    # stem like "X_003" -> base tag "003"
    tag = Path(Xp_path).stem.split("X_")[1]
    # Sorted X/y lists must pair X_<tag> with y_<tag>; fail loudly instead of mixing labels.
    assert Path(yp_path).stem == f"y_{tag}", f"Shard mismatch: {Xp_path} vs {yp_path}"

    Xp = np.load(Xp_path, mmap_mode="r")
    yp = np.load(yp_path, mmap_mode="r")
    n = Xp.shape[0]

    for piece_idx, a in enumerate(range(0, n, MICRO_ROWS)):
        b = min(n, a + MICRO_ROWS)
        Xm = np.asarray(Xp[a:b], dtype=np.float32, order="C")
        ym = np.asarray(yp[a:b], dtype=np.float32, order="C")
        # name: X_003_000.npy , y_003_000.npy , etc.
        piece = f"{piece_idx:03d}"
        np.save(OUT_DIR/f"X_{tag}_{piece}.npy", Xm)
        np.save(OUT_DIR/f"y_{tag}_{piece}.npy", ym)
        ms_written += 1

print(f"Done. Wrote {ms_written} micro-shards to {OUT_DIR}")

# (Optional) delete the original larger shards to free space
DELETE_OLD = True
if DELETE_OLD:
    failed = []
    for p in in_X + in_y:
        try:
            os.remove(p)
        except OSError as exc:   # best effort, but report rather than silently ignore
            failed.append(f"{p}: {exc}")
    if failed:
        print(f"Could not remove {len(failed)} original shard file(s):")
        for msg in failed:
            print("  ", msg)
    else:
        print("Removed original shards in", IN_DIR)


# STEP 7A-TRAIN-MICRO (90d): Train FT-Transformer using micro-shards (extra RAM-safe) -- split 2 -- part 2



In [None]:
# STEP 7A-TRAIN-MICRO (90d): Train FT-Transformer using micro-shards (extra RAM-safe)
import glob
import json
import time
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
from sklearn.metrics import roc_auc_score
from torch.utils.data import DataLoader, IterableDataset

torch.set_num_threads(2)  # keep CPU thread usage modest
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

ftdir = Path("/content/ft90")
shards = ftdir / "micro_shards"   # micro-shards written by 7A-SPLIT-2
num_cols = json.loads((ftdir / "num_cols.json").read_text())

# Validation features stay memory-mapped (scored in chunks later); labels are loaded fully.
X_val = np.load(ftdir / "X_val.npy", mmap_mode="r")
y_val = np.load(ftdir / "y_val.npy")
print(f"Val shape: {X_val.shape}")

# ---- Iterable dataset over micro-shards ----
class ShardBatcher(IterableDataset):
    """Stream (X, y) mini-batches from micro-shard ``.npy`` pairs on disk.

    Each ``X_<tag>.npy`` is paired with ``y_<tag>.npy`` in the same directory. Both
    are opened memory-mapped, so only one mini-batch is materialized at a time.

    Args:
        shard_dir: directory holding the X_*.npy / y_*.npy micro-shards (Path or str).
        batch_size: rows per yielded batch (the last batch of a shard may be smaller).
        shuffle: shuffle shard order and row order within each shard.
        max_shards: optional cap on how many (sorted) shards are used.
        seed: seed for the shuffling RNG.

    The RNG is created once and advanced on every pass, so each epoch gets a new
    shuffle that is still reproducible via ``seed``. Previously a fresh
    ``default_rng(42)`` in ``__iter__`` replayed the identical order every epoch.
    Combined with a per-epoch batch cap, that trained on the same subset each epoch.
    The first epoch's order with seed=42 is unchanged.
    """

    def __init__(self, shard_dir: Path, batch_size=96, shuffle=True, max_shards=None, seed=42):
        self.dir = Path(shard_dir)
        self.batch = int(batch_size)
        self.shuffle = shuffle
        self.files_X = sorted(self.dir.glob("X_*.npy"))
        if not self.files_X:
            raise FileNotFoundError(f"No micro-shard files in {self.dir}. Run 7A-SPLIT-2 first.")
        if max_shards is not None:
            self.files_X = self.files_X[:int(max_shards)]
        self.rng = np.random.default_rng(seed)
        print(f"ShardBatcher: {len(self.files_X)} micro-shards, batch={self.batch}, shuffle={self.shuffle}")

    @staticmethod
    def _y_path(x_path: Path) -> Path:
        """Label file for an X shard: X_<tag>.npy -> y_<tag>.npy in the same folder."""
        return x_path.with_name("y_" + x_path.name[len("X_"):])

    def __iter__(self):
        shard_paths = list(self.files_X)
        if self.shuffle:
            self.rng.shuffle(shard_paths)
        total = len(shard_paths)
        for si, Xp_path in enumerate(shard_paths):
            yp_path = self._y_path(Xp_path)
            Xp = np.load(Xp_path, mmap_mode="r")
            yp = np.load(yp_path, mmap_mode="r")
            n = Xp.shape[0]
            if yp.shape[0] != n:
                raise ValueError(f"Row mismatch: {Xp_path.name} has {n} rows, {yp_path.name} has {yp.shape[0]}")
            order = np.arange(n)
            if self.shuffle:
                self.rng.shuffle(order)
            print(f"→ micro {si+1}/{total}: {Xp_path.name}  rows={n}")
            for s in range(0, n, self.batch):
                b = order[s:s + self.batch]
                xb = np.asarray(Xp[b], dtype=np.float32, order="C")
                yb = np.asarray(yp[b], dtype=np.float32, order="C")
                yield torch.from_numpy(xb), torch.from_numpy(yb)

# ---- Speed/RAM knobs (smaller than before) ----
# ---- Speed / RAM knobs ----
BATCH = 96                     # rows per micro-batch
ACCUM = 4                      # gradient-accumulation steps -> effective batch = BATCH * ACCUM
MAX_BATCHES_PER_EPOCH = 450    # hard cap on micro-batches per epoch
EPOCHS = 2
patience = 1                   # epochs without val-AUC gain before early stopping

# The dataset already yields whole batches, so the DataLoader must not re-batch (batch_size=None).
_train_ds = ShardBatcher(shards, batch_size=BATCH, shuffle=True)
train_loader = DataLoader(_train_ds, batch_size=None, num_workers=0, pin_memory=False)

# ---- Compact FT-Transformer ----
class FTTransformer(nn.Module):
    """Compact FT-Transformer over numeric tabular features.

    Feature j of each row becomes a token ``x_j * weight[j] + bias[j]`` of width
    ``d_model``. A learned [CLS] token is prepended and the sequence passes through a
    TransformerEncoder. A linear head on the layer-normalized [CLS] output yields one
    logit per row.
    """

    def __init__(self, n_features, d_model=48, n_layers=2, n_heads=4, dropout=0.1):
        super().__init__()
        # Per-feature tokenizer and [CLS] token (state_dict keys: weight, bias, cls).
        self.weight = nn.Parameter(torch.randn(n_features, d_model) * 0.02)
        self.bias = nn.Parameter(torch.zeros(n_features, d_model))
        self.cls = nn.Parameter(torch.zeros(1, 1, d_model))
        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=4 * d_model,
            dropout=dropout,
            batch_first=True,
            activation="gelu",
        )
        self.encoder = nn.TransformerEncoder(layer, num_layers=n_layers)
        self.norm = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, 1)

    def forward(self, x):
        # x: (batch, n_features) -> tokens: (batch, n_features, d_model)
        tokens = x.unsqueeze(-1) * self.weight + self.bias
        cls_tokens = self.cls.expand(tokens.size(0), 1, -1)
        sequence = torch.cat([cls_tokens, tokens], dim=1)
        encoded = self.encoder(sequence)
        pooled = self.norm(encoded[:, 0])          # [CLS] position
        return self.head(pooled).squeeze(-1)       # (batch,) logits

# Class weight for BCE: estimate the positive rate over ALL micro-shards. Label files are
# small and memory-mapped, so this is cheap. A single shard can be unrepresentative or even
# all-negative, which would push pos_weight to ~1e6.
y_files = sorted(glob.glob(str(shards/"y_*.npy")))
assert y_files, f"No label micro-shards in {shards}. Run 7A-SPLIT-2 first."
n_pos, n_rows = 0.0, 0
for y_path in y_files:
    y_arr = np.load(y_path, mmap_mode="r")
    n_pos += float(np.sum(y_arr, dtype=np.float64))
    n_rows += int(y_arr.shape[0])
pos_rate = max(1e-6, n_pos / max(1, n_rows))
pos_w = (1.0 - pos_rate) / pos_rate
print(f"Estimated pos_rate: {pos_rate:.6f}  pos_weight={pos_w:.2f}  (from {len(y_files)} micro-shards)")

n_features = X_val.shape[1]
model = FTTransformer(n_features).to(device)
criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([pos_w], device=device))
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)

# autocast helper
# On CUDA: real mixed precision via torch.amp.autocast plus a GradScaler.
# On CPU: a stand-in `autocast` that accepts any arguments and returns a do-nothing
# context manager, so call sites can always write `with autocast("cuda"):`.
# `scaler` is None there, and the training/eval code checks `if scaler:` before using it.
if device == "cuda":
    from torch.amp import autocast, GradScaler
    scaler = GradScaler("cuda")
else:
    class _NoOp:
        # Minimal context manager: enters with None and never suppresses exceptions.
        def __enter__(self): return None
        def __exit__(self, *a): return False
    def autocast(*a, **k): return _NoOp()
    scaler = None

def eval_val(model):
    """Score the validation set and return ``(auroc, probabilities)``.

    Reads the module-level memory-mapped ``X_val`` in chunks of ``CH`` rows to bound
    RAM, runs ``model`` under the ``autocast`` helper on ``device``, and computes
    ROC-AUC against the module-level ``y_val``.

    Returns:
        auc: float ROC-AUC.
        preds: float32 probabilities, one per ``X_val`` row, in row order.
    """
    model.eval()
    chunks = []
    CH = 2048   # smaller eval chunk = lower RAM
    with torch.no_grad():
        for s in range(0, X_val.shape[0], CH):
            xb = torch.from_numpy(np.asarray(X_val[s:s+CH], dtype=np.float32)).to(device)
            with autocast("cuda"):
                chunks.append(torch.sigmoid(model(xb)).float().cpu().numpy())
    preds = np.concatenate(chunks)   # concatenate once and reuse (was built twice)
    return float(roc_auc_score(y_val, preds)), preds

def _optimizer_step():
    """Clip gradients to norm 1.0, apply one (AMP-aware) optimizer step, then clear grads.

    Under a GradScaler the accumulated gradients are still multiplied by the loss
    scale. They must be unscaled first; otherwise the 1.0 clip threshold applies to
    scaled values and effectively shrinks every update.
    """
    if scaler:
        scaler.unscale_(optimizer)
    nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    if scaler:
        scaler.step(optimizer)
        scaler.update()
    else:
        optimizer.step()
    optimizer.zero_grad(set_to_none=True)


best_auc, bad = -1.0, 0
best_state = None
for epoch in range(1, EPOCHS+1):
    model.train(); t0 = time.time()
    running, step, accum, seen = 0.0, 0, 0, 0
    for xb, yb in train_loader:
        xb = xb.to(device=device, dtype=torch.float32)
        yb = yb.to(device=device, dtype=torch.float32)
        with autocast("cuda"):
            logits = model(xb)
            loss = criterion(logits, yb) / ACCUM   # scaled down for gradient accumulation
        if scaler: scaler.scale(loss).backward()
        else:      loss.backward()
        accum += 1
        if accum % ACCUM == 0:
            _optimizer_step()
        # Track the true per-sample loss: undo the 1/ACCUM scaling and weight by rows in this batch.
        running += float(loss.item()) * ACCUM * xb.size(0)
        seen += xb.size(0); step += 1
        if step % 100 == 0:
            print(f"  step {step}  (loss chunk avg ~ {running/max(1, seen):.4f})")
        if step >= MAX_BATCHES_PER_EPOCH:
            print(f"Reached MAX_BATCHES_PER_EPOCH={MAX_BATCHES_PER_EPOCH}; ending epoch early.")
            break

    # Apply gradients left over from a partial accumulation window.
    if accum % ACCUM != 0:
        _optimizer_step()

    auc, p_val = eval_val(model)
    print(f"[{epoch}] train_loss≈{running/max(1, seen):.4f}  val_auc={auc:.6f}  ({time.time()-t0:.1f}s)")

    if auc > best_auc + 1e-4:
        best_auc, bad = auc, 0
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        np.save(ftdir/"ft_val_proba.npy", p_val)
    else:
        bad += 1
        if bad >= patience:
            print("Early stopping."); break

# A NaN AUC never compares greater, so no checkpoint would exist; fail clearly instead of NameError.
if best_state is None:
    raise RuntimeError("No epoch produced an improving validation AUC (NaN?); nothing to save.")
model.load_state_dict(best_state)
torch.save(model.state_dict(), ftdir/"ft_model.pt")
print(f"TRAIN DONE → best val AUC: {best_auc:.6f}. Saved: /content/ft90/ft_model.pt and ft_val_proba.npy")


# STEP 7B (90d): evaluate FT on validation, per-trip preview, per-user FT scores


In [None]:
# STEP 7B (90d): evaluate FT on validation, per-trip preview, per-user FT scores
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.metrics import average_precision_score, roc_auc_score

FTDIR = Path("/content/ft90")
_needed = (FTDIR / "ft_val_proba.npy", FTDIR / "y_val.npy", FTDIR / "val_ids.parquet")
assert all(path.exists() for path in _needed), "Run 7A-TRAIN first."

p_val = np.load(FTDIR / "ft_val_proba.npy")
y_val = np.load(FTDIR / "y_val.npy")
val_ids = pd.read_parquet(FTDIR / "val_ids.parquet", engine="pyarrow")  # trip_id,user_id,incident_flag
assert len(p_val) == len(y_val) == len(val_ids), "Length mismatch; re-run 7A-PREP."

metrics = {
    "AUROC": float(roc_auc_score(y_val, p_val)),
    "PR_AUC": float(average_precision_score(y_val, p_val)),
}
print(metrics)

# Per-trip frame: the saved identifiers plus the FT probability as float32.
tmp = val_ids.assign(proba=p_val.astype("float32"))

def nova_score_from_proba(p, beta0=-3.5, beta1=10.0):
    """Map a predicted probability ``p`` to a Nova Score in (300, 900); higher p -> lower score.

    ``beta0 + beta1 * p`` is treated as a risk logit and the score is
    ``300 + 600 * sigmoid(-(beta0 + beta1 * p))``. With the defaults the score is
    ~882 at p = 0, 600 at p = 0.35 and ~301 at p = 1, so every band of
    ``decision_band`` is reachable.

    The previous form ``sigmoid(beta0 - beta1 * p)`` is at most sigmoid(-3.5) ~= 0.029
    for p >= 0. That capped every score at ~317.6, so all bands >= 600 could never fire.

    Args:
        p: scalar, NumPy array or pandas Series of probabilities (applied element-wise).
        beta0: intercept of the risk logit.
        beta1: slope of the risk logit with respect to p.

    Returns:
        Score(s) with the same shape/type as ``p``.
    """
    # 1 / (1 + exp(z)) == sigmoid(-z): the score falls as the risk logit rises.
    s = 1.0 / (1.0 + np.exp(beta0 + beta1 * p))
    return 300.0 + 600.0 * s

def decision_band(s):
    """Return the decision-band label for a Nova Score ``s``.

    The first upper bound that ``s`` falls below decides the band; scores at or
    above 740 are approved.
    NOTE(review): a NaN score fails every ``<`` test and is labelled Approve.
    """
    if not s < 740:
        if not s < 680 and not s < 600:
            return "≥740 (Approve)"
    if s < 600:
        return "<600 (Decline + coaching)"
    if s < 680:
        return "600–679 (Caution)"
    return "680–739 (Manual review)"

tmp["nova_score"] = nova_score_from_proba(tmp["proba"]).astype("float32")
tmp["decision_band"] = [decision_band(score) for score in tmp["nova_score"]]

# Tiny, seeded per-trip preview (at most 10 rows).
preview_csv = "/content/val_preds_90d_ft_sample.csv"
n_preview = min(10, len(tmp))
tmp.sample(n_preview, random_state=7).to_csv(preview_csv, index=False)
print(f"Saved → {preview_csv}")

# Per-user FT scores: mean trip probability and number of trips per user.
per_user = tmp.groupby("user_id", as_index=False)
ft_users = per_user.agg(proba_mean=("proba", "mean"), trips=("proba", "size"))
ft_users["nova_score"] = nova_score_from_proba(ft_users["proba_mean"]).astype("float32")
ft_users["decision_band"] = [decision_band(score) for score in ft_users["nova_score"]]

out_ft = "/content/ft90/user_scores_90d_ft.csv"
ft_users.to_csv(out_ft, index=False)
print(f"Saved → {out_ft}")
display(ft_users.head(10))


# STEP 7C (variant A) — user-level ensemble (FT + Node2Vec) for 90d; the next STEP 7C cell writes the same output file


In [None]:
# STEP 7C — user-level ensemble (FT + Node2Vec) for 90d

import numpy as np, pandas as pd
from pathlib import Path

FT_USERS      = Path("/content/ft90/user_scores_90d_ft.csv")     # from 7B
N2V_USERS     = Path("/content/user_scores_90d_n2v_cal.csv")     # from 6D (calibrated)
N2V_USERS_RAW = Path("/content/user_scores_90d_n2v.csv")         # from 6C (used if 6D was skipped)
OUT_PATH      = Path("/content/ensemble_user_scores_90d.csv")

assert FT_USERS.exists(), "Missing FT user scores. Run 7B first."
ft = pd.read_csv(FT_USERS, dtype={"user_id":"string"})

# Prefer calibrated N2V if available; else fall back to raw N2V.
# 6D only re-derives nova_score/decision_band, so both files carry the same
# proba_mean/trips, which are all the blend uses.
n2v_path = next((p for p in (N2V_USERS, N2V_USERS_RAW) if p.exists()), None)
use_n2v = n2v_path is not None
if use_n2v:
    print(f"Using N2V user scores from {n2v_path}")
    n2v = pd.read_csv(n2v_path, dtype={"user_id":"string"})
    # keep only what we need and rename proba to avoid collisions
    base_cols = {"user_id","proba_mean","trips"}
    missing = base_cols - set(n2v.columns)
    if missing:
        raise ValueError(f"N2V file missing columns: {missing}")
    n2v = n2v[["user_id","proba_mean","trips"]].rename(
        columns={"proba_mean":"proba_mean_n2v","trips":"trips_n2v"}
    )
    df = ft.merge(n2v, on="user_id", how="left")
else:
    print("No N2V user scores found; ensemble uses FT only.")
    df = ft.copy()
    df["proba_mean_n2v"] = np.nan
    df["trips_n2v"] = np.nan

# blend weights (adjust if you prefer)
W_FT, W_N2V = 0.6, 0.4

# final probability per user
df["proba_blend"] = np.where(
    df["proba_mean_n2v"].notna(),
    W_FT*df["proba_mean"].values + W_N2V*df["proba_mean_n2v"].values,
    df["proba_mean"].values,  # fallback to FT when N2V missing
)

# Nova score mapping (higher risk -> lower score), temperature tunes spread (keep modest here)
def nova_score_from_proba(p, temperature=0.35):
    """Map a (blended) probability to a 300–900 Nova Score; higher risk -> lower score.

    ``p`` is clipped to [1e-6, 1 - 1e-6], centred at 0.5 and divided by
    ``temperature`` (smaller = wider spread) before a logistic squash.
    """
    p_safe = np.clip(p, 1e-6, 1 - 1e-6)
    centred = (0.5 - p_safe) / temperature
    return 300.0 + 600.0 * (1.0 / (1.0 + np.exp(-centred)))

def band_from_score(s):
    """Map an ensemble Nova Score to its decision band (floors checked high to low).

    Scores below 600, or NaN, map to the decline band.
    """
    tiers = (
        (750, "≥750 (Auto-approve)"),
        (700, "700–749 (Standard approve)"),
        (600, "600–699 (Manual review)"),
    )
    return next((label for floor, label in tiers if s >= floor), "<600 (Decline + coaching)")

df["nova_score"] = nova_score_from_proba(df["proba_blend"].values)
df["decision_band"] = df["nova_score"].apply(band_from_score)

# Output schema is fixed; N2V columns are NaN for users/runs without N2V scores.
keep_cols = [
    "user_id", "proba_mean", "proba_mean_n2v", "proba_blend",
    "trips", "trips_n2v", "nova_score", "decision_band",
]
out = df.loc[:, keep_cols].copy()

out.to_csv(OUT_PATH, index=False)
print(f"Saved → {OUT_PATH}")
print(out.head(10).to_string(index=False))

band_counts = out["decision_band"].value_counts()
print("\nBand distribution (ensemble):")
print(band_counts.to_string())


# STEP 7C — user-level ensemble (FT + Node2Vec) for 90d


In [None]:
# STEP 7C — user-level ensemble (FT + Node2Vec) for 90d
import numpy as np, pandas as pd
from pathlib import Path

FT_USERS  = Path("/content/ft90/user_scores_90d_ft.csv")     # from 7B
N2V_USERS = Path("/content/user_scores_90d_n2v_cal.csv")     # from 6D
OUT_PATH  = Path("/content/ensemble_user_scores_90d.csv")

assert FT_USERS.exists(), "Missing FT user scores. Run 7B first."
ft = pd.read_csv(FT_USERS, dtype={"user_id":"string"})

# optional Node2Vec (calibrated) — if missing, we just pass through FT
use_n2v = N2V_USERS.exists()
if use_n2v:
    n2v = pd.read_csv(N2V_USERS, dtype={"user_id":"string"})
    # Validate up front, as the other ensemble cell does. A malformed file then gives a
    # clear message instead of a bare KeyError from the column selection below.
    missing = {"user_id","proba_mean","trips"} - set(n2v.columns)
    if missing:
        raise ValueError(f"N2V file missing columns: {missing}")
    n2v = n2v[["user_id","proba_mean","trips"]].rename(
        columns={"proba_mean":"proba_mean_n2v","trips":"trips_n2v"}
    )
    df = ft.merge(n2v, on="user_id", how="left")
else:
    df = ft.copy()
    df["proba_mean_n2v"] = np.nan

# blend weights (adjust if you prefer)
W_FT, W_N2V = 0.6, 0.4

# final probability per user: weighted FT/N2V blend, or FT alone when the user has no N2V score
df["proba_blend"] = np.where(
    df["proba_mean_n2v"].notna(),
    W_FT*df["proba_mean"].values + W_N2V*df["proba_mean_n2v"].values,
    df["proba_mean"].values,  # fallback to FT when no N2V
)

# Nova score mapping (higher risk -> lower score) 300–900
def nova_score_from_proba(p, temperature=0.35):
    """Nova score in 300–900 from a risk probability (higher risk -> lower score).

    ``temperature`` controls spread: the clipped probability's distance below 0.5
    is divided by it before the logistic squash.
    """
    clipped = np.clip(p, 1e-6, 1 - 1e-6)
    z = (0.5 - clipped) / temperature
    good_share = 1.0 / (1.0 + np.exp(-z))
    return 300.0 + 600.0 * good_share

def band_from_score(s):
    """Decision band for an ensemble Nova Score; anything under 600 (or NaN) is declined."""
    for floor, label in ((750, "≥750 (Auto-approve)"),
                         (700, "700–749 (Standard approve)"),
                         (600, "600–699 (Manual review)")):
        if s >= floor:
            return label
    return "<600 (Decline + coaching)"

df["nova_score"] = nova_score_from_proba(df["proba_blend"].values)
df["decision_band"] = df["nova_score"].apply(band_from_score)

# tidy output table; trips_n2v only exists when N2V scores were merged in
keep_cols = ["user_id", "proba_mean", "proba_mean_n2v", "proba_blend", "trips"]
if use_n2v:
    keep_cols.append("trips_n2v")
keep_cols += ["nova_score", "decision_band"]
out = df[keep_cols].copy()

out.to_csv(OUT_PATH, index=False)
print(f"Saved → {OUT_PATH}")
print(out.head(10).to_string(index=False))

print("\nBand distribution (ensemble):")
print(out["decision_band"].value_counts().to_string())


# STEP 7F-PREP-TCN — build weekly user sequences (90d) for a temporal model (TCN)


In [None]:
# STEP 7F-PREP-TCN — build weekly user sequences (90d) for a temporal model (TCN)
# Inputs: /content/model_table_90d.parquet
# Outputs under /content/tcn90/: X_train_seq.npy, y_train.npy, X_val_seq.npy, y_val.npy,
#                               users_train.csv, users_val.csv, feat_names.json

import numpy as np, pandas as pd, json, gc
from pathlib import Path
import pyarrow.parquet as pq

PARQ = Path("/content/model_table_90d.parquet")
assert PARQ.exists(), "Missing model_table_90d.parquet. Run STEP 4 (90d) first."

outdir = Path("/content/tcn90"); outdir.mkdir(exist_ok=True, parents=True)

# 1) Load minimal columns. Inspect the parquet schema for available names instead of
#    loading the entire table just to read its columns.
use_cols = [
    "user_id","trip_date_dt","incident_flag",
    "trip_duration","trip_distance","fare_amount","tip_amount","trip_rating",
    "cancellation_flag","safety_score","speed_kmph","fare_per_km","tip_pct"
]
cols_avail = set(pq.ParquetDataset(PARQ).schema.names)
df = pd.read_parquet(PARQ, engine="pyarrow", columns=[c for c in use_cols if c in cols_avail])

# types
df["user_id"] = df["user_id"].astype("string")
df["trip_date_dt"] = pd.to_datetime(df["trip_date_dt"], errors="coerce")
for c in df.columns:
    if c not in ("user_id","trip_date_dt","incident_flag") and not pd.api.types.is_numeric_dtype(df[c]):
        df[c] = pd.to_numeric(df[c], errors="coerce")
# Missing/unparseable flags count as "no incident" (a plain astype("int8") raises on NaN).
df["incident_flag"] = pd.to_numeric(df["incident_flag"], errors="coerce").fillna(0).astype("int8")

# 2) Chronological split (same 70/30 logic as FT); `.dt.tz` handles any datetime resolution
dt = df["trip_date_dt"]
df["trip_date_naive"] = dt.dt.tz_localize(None) if dt.dt.tz is not None else dt

cutoff = df["trip_date_naive"].quantile(0.70)
print("7F cutoff:", cutoff)
last_trip = df.groupby("user_id", observed=True)["trip_date_naive"].max().rename("last_trip").reset_index()
train_users = set(last_trip.loc[last_trip["last_trip"] <= cutoff, "user_id"].astype("string"))
val_users   = set(last_trip.loc[last_trip["last_trip"] >  cutoff, "user_id"].astype("string"))

# 3) Weekly buckets that START on Monday, fixed horizon of H weeks.
#    pandas "W-MON" periods END on Monday (start_time is a Tuesday); Monday-start weeks are "W-SUN".
H = 12
df["week_start"] = df["trip_date_naive"].dt.to_period("W-SUN").dt.start_time

# 4) Aggregate per user-week (keep compact, stable numerics)
agg = df.groupby(["user_id","week_start"], observed=True).agg(
    trips=("incident_flag","size"),
    incidents=("incident_flag","sum"),
    fare_sum=("fare_amount","sum"),
    fare_mean=("fare_amount","mean"),
    dist_mean=("trip_distance","mean"),
    dur_mean=("trip_duration","mean"),
    rating_mean=("trip_rating","mean"),
    safety_mean=("safety_score","mean"),
    tip_mean=("tip_amount","mean"),
    tip_pct_mean=("tip_pct","mean"),
    cancel_rate=("cancellation_flag","mean"),
    speed_mean=("speed_kmph","mean"),
    fpk_mean=("fare_per_km","mean"),
).reset_index()

# 5) Shared window: the last H consecutive week starts ending at the latest week seen.
#    Stepping exactly 7 days back from a real week_start keeps every window entry equal to
#    values in agg["week_start"]. The previous W-MON date_range produced Mondays (then shifted
#    a further week) that never matched the Tuesday period starts, so every sequence was all zeros.
global_last_week = agg["week_start"].max()
week_starts = pd.date_range(end=global_last_week, periods=H, freq="7D")

# 6) Build dense sequences: for each user, align to these H week_starts
feat_cols = ["trips","incidents","fare_sum","fare_mean","dist_mean","dur_mean",
             "rating_mean","safety_mean","tip_mean","tip_pct_mean","cancel_rate",
             "speed_mean","fpk_mean"]
D = len(feat_cols)

def build_seq(group_df):
    """(H, D) float32 matrix for one user; row i is week week_starts[i], missing weeks/values are 0."""
    weekly = group_df.set_index("week_start")[feat_cols].reindex(week_starts)
    return weekly.fillna(0.0).to_numpy(dtype=np.float32)

# Restrict to the window FIRST so sequences and labels cover the same H weeks, then split users.
agg["user_id"] = agg["user_id"].astype("string")
agg_win   = agg[agg["week_start"].isin(week_starts)]
agg_train = agg_win[agg_win["user_id"].isin(train_users)]
agg_val   = agg_win[agg_win["user_id"].isin(val_users)]
print(f"7F window users: train={agg_train['user_id'].nunique()} val={agg_val['user_id'].nunique()}")

# labels: ANY incident within the H-week window (group_df holds only window rows)
# NOTE(review): "incidents" is also an input feature, so the label can be read straight
# off X (label leakage). Consider predicting a later window or dropping it from feat_cols.
def labels_from_window(group_df):
    return int(group_df["incidents"].fillna(0).sum() > 0)

# Build sequences per user (train)
train_list, train_labels, train_uids = [], [], []
for uid, g in agg_train.groupby("user_id", observed=True):
    train_list.append(build_seq(g))
    train_labels.append(labels_from_window(g))
    train_uids.append(str(uid))

# Build sequences per user (val)
val_list, val_labels, val_uids = [], [], []
for uid, g in agg_val.groupby("user_id", observed=True):
    val_list.append(build_seq(g))
    val_labels.append(labels_from_window(g))
    val_uids.append(str(uid))

X_train = np.stack(train_list, axis=0) if train_list else np.zeros((0,H,D), np.float32)
X_val   = np.stack(val_list,   axis=0) if val_list   else np.zeros((0,H,D), np.float32)
y_train = np.asarray(train_labels, dtype=np.int8)
y_val   = np.asarray(val_labels,   dtype=np.int8)

# 7) Save arrays + user lists + feature names
np.save(outdir/"X_train_seq.npy", X_train)
np.save(outdir/"y_train.npy",     y_train)
np.save(outdir/"X_val_seq.npy",   X_val)
np.save(outdir/"y_val.npy",       y_val)

pd.DataFrame({"user_id": train_uids}).to_csv(outdir/"users_train.csv", index=False)
pd.DataFrame({"user_id": val_uids}).to_csv(outdir/"users_val.csv", index=False)
with open(outdir/"feat_names.json","w") as f:
    json.dump({"feat_cols": feat_cols, "H": H}, f)

gc.collect()
print("7F-PREP-TCN DONE")
print("Train seq:", X_train.shape, "pos:", int(y_train.sum()), "neg:", int((y_train==0).sum()))
print("Val   seq:", X_val.shape,   "pos:", int(y_val.sum()),   "neg:", int((y_val==0).sum()))
print("Saved to:", outdir)


# STEP 7F-PREP-TCN-FIX-v3 — derive train/val windows from the weeks actually present and auto-pick the label threshold (ensures users on both sides & both classes)


In [None]:
# STEP 7F-PREP-TCN-FIX-v3 — derive windows from actual weeks so we get users on both sides
# Inputs:  /content/model_table_90d.parquet
# Outputs: /content/tcn90/{X_train_seq.npy,y_train.npy,X_val_seq.npy,y_val.npy,
#                       users_train.csv,users_val.csv,feat_names.json,label_info.json}

import numpy as np, pandas as pd, json, gc
from pathlib import Path
import pyarrow.parquet as pq

PARQ = Path("/content/model_table_90d.parquet")
assert PARQ.exists(), "Missing model_table_90d.parquet. Run STEP 4 (90d) first."
outdir = Path("/content/tcn90"); outdir.mkdir(exist_ok=True, parents=True)

# 1) Load minimal columns. Read the parquet schema to learn which columns exist; the
#    previous approach loaded the entire table into RAM just to get its column names.
need_cols = [
    "user_id","trip_date_dt","incident_flag",
    "trip_duration","trip_distance","fare_amount","tip_amount","trip_rating",
    "cancellation_flag","safety_score","speed_kmph","fare_per_km","tip_pct"
]
cols_avail = pq.ParquetDataset(PARQ).schema.names
use_cols = [c for c in need_cols if c in cols_avail]
df = pd.read_parquet(PARQ, engine="pyarrow", columns=use_cols)

df["user_id"] = df["user_id"].astype("string")
df["trip_date_dt"] = pd.to_datetime(df["trip_date_dt"], errors="coerce")
# Missing/unparseable flags count as "no incident".
df["incident_flag"] = pd.to_numeric(df["incident_flag"], errors="coerce").fillna(0).astype("int8")
for c in df.columns:
    if c not in ("user_id","trip_date_dt","incident_flag"):
        df[c] = pd.to_numeric(df[c], errors="coerce")

# 2) Weeks + per-user-week aggregation (use actual weeks present)
# Strip any timezone. `.dt.tz` works for every resolution (ns/us/ms), unlike a
# "datetime64[ns," dtype-string prefix check that misses e.g. datetime64[us, UTC].
trip_dt = df["trip_date_dt"]
dt_naive = trip_dt.dt.tz_localize(None) if trip_dt.dt.tz is not None else trip_dt
# NOTE: "W-MON" periods END on Monday, so week_start is a Tuesday. The windows below are built
# from these same values, so alignment is unaffected. `.dt.start_time` is vectorized and maps NaT to NaT.
df["week_start"] = dt_naive.dt.to_period("W-MON").dt.start_time

agg = df.groupby(["user_id","week_start"], observed=True).agg(
    trips=("incident_flag","size"),
    incidents=("incident_flag","sum"),
    fare_sum=("fare_amount","sum"),
    fare_mean=("fare_amount","mean"),
    dist_mean=("trip_distance","mean"),
    dur_mean=("trip_duration","mean"),
    rating_mean=("trip_rating","mean"),
    safety_mean=("safety_score","mean"),
    tip_mean=("tip_amount","mean"),
    tip_pct_mean=("tip_pct","mean"),
    cancel_rate=("cancellation_flag","mean"),
    speed_mean=("speed_kmph","mean"),
    fpk_mean=("fare_per_km","mean"),
).reset_index()
agg["user_id"] = agg["user_id"].astype("string")

feat_cols = ["trips","incidents","fare_sum","fare_mean","dist_mean","dur_mean",
             "rating_mean","safety_mean","tip_mean","tip_pct_mean","cancel_rate",
             "speed_mean","fpk_mean"]
D = len(feat_cols)

# 3) Build windows from the actual unique weeks in data
weeks_all = np.array(sorted(agg["week_start"].unique()))
assert weeks_all.size > 0, "No weekly data found."

# choose up to 8 weeks for val (last weeks), and up to 8 weeks just before that for train
# H_val: at most 8 and at most half the available weeks; `or 1` keeps it >= 1 when only one week exists.
H_val = min(8, weeks_all.size // 2) or 1
val_weeks = weeks_all[-H_val:]
# Weeks strictly before the validation window (empty when val already covers every week).
before_val = weeks_all[: -H_val] if H_val < weeks_all.size else np.array([], dtype="datetime64[ns]")
H_train = min(8, before_val.size) or min(1, weeks_all.size)  # at least 1 if anything exists
# NOTE(review): if before_val is empty (only one week of data) train_weeks falls back to the
# first week(s) of weeks_all, which then overlap val_weeks, so the splits are not disjoint.
train_weeks = before_val[-H_train:] if before_val.size else weeks_all[:H_train]

print(f"[7F-FIX-v3] weeks_all={weeks_all.size} | H_train={H_train} | H_val={H_val}")
print(f"[7F-FIX-v3] train range: {train_weeks[0] if train_weeks.size else 'n/a'} → {train_weeks[-1] if train_weeks.size else 'n/a'}")
print(f"[7F-FIX-v3]   val range: {val_weeks[0] if val_weeks.size else 'n/a'} → {val_weeks[-1] if val_weeks.size else 'n/a'}")

# Rows of each split restricted to its own weeks; the same user may appear in both splits.
agg_t = agg[agg["week_start"].isin(train_weeks)].copy()
agg_v = agg[agg["week_start"].isin(val_weeks)].copy()
print(f"[7F-FIX-v3] train users={agg_t['user_id'].nunique()} | val users={agg_v['user_id'].nunique()}")

# 4) Build dense sequences aligned to their own windows
def build_seq_for_user(gdf, weeks_sorted):
    """Return a (len(weeks_sorted), D) float32 matrix of one user's weekly features.

    Row i holds the user's ``feat_cols`` for week ``weeks_sorted[i]``. Weeks with no
    activity stay all-zero, and missing feature values are filled with 0.
    """
    weekly = gdf.set_index("week_start")[feat_cols]
    out = np.zeros((len(weeks_sorted), D), dtype=np.float32)
    present = [(row, ws) for row, ws in enumerate(weeks_sorted) if ws in weekly.index]
    for row, ws in present:
        out[row, :] = weekly.loc[ws].fillna(0.0).to_numpy(dtype=np.float32)
    return out

def make_side(agg_side, weeks_sorted):
    """Build per-user sequences, incident counts and ids for one split.

    Returns:
        X: (n_users, len(weeks_sorted), D) float32 sequences.
        counts: float32 total incidents per user over ``agg_side`` rows.
        uids: user ids as str.
    All three follow the same ``groupby("user_id")`` order.
    """
    groups = list(agg_side.groupby("user_id", observed=True))
    uids = [str(uid) for uid, _ in groups]
    seqs = [build_seq_for_user(g, weeks_sorted) for _, g in groups]
    counts = [float(g["incidents"].fillna(0).sum()) for _, g in groups]
    if seqs:
        X = np.stack(seqs, axis=0).astype(np.float32)
    else:
        X = np.zeros((0, len(weeks_sorted), D), np.float32)
    return X, np.asarray(counts, dtype=np.float32), uids

# Each split's sequences are aligned to that split's own weeks (train_weeks vs val_weeks).
X_train, cnt_train, u_train = make_side(agg_t, list(train_weeks))
X_val,   cnt_val,   u_val   = make_side(agg_v,  list(val_weeks))

print(f"[7F-FIX-v3] X_train: {X_train.shape} | X_val: {X_val.shape}")

# 5) Pick incidents threshold k to create both classes (if possible)
def pick_k(counts):
    """Choose the incident-count cut-off k that defines a positive label.

    Scans k = 1..5 and returns the first one whose positive rate (counts >= k) lies in
    [0.05, 0.95], so both classes are present. If none qualifies, falls back to the
    ceiling of the 80th percentile of `counts` (at least 1). Empty input returns 1.
    """
    if counts.size == 0:
        return 1
    balanced = (k for k in range(1, 6) if 0.05 <= float((counts >= k).mean()) <= 0.95)
    chosen = next(balanced, None)
    if chosen is not None:
        return chosen
    return int(max(1, np.ceil(np.quantile(counts, 0.80))))

# One k for both windows, chosen on their pooled counts so train and val share a label definition.
n_counts = cnt_train.size + cnt_val.size
if n_counts:
    combined = np.concatenate([cnt_train, cnt_val])
else:
    combined = np.array([0.0])
k = pick_k(combined)
# Comparing an empty 1-D count array already yields an empty int8 label vector.
y_train = (cnt_train >= k).astype(np.int8)
y_val   = (cnt_val >= k).astype(np.int8)

print(f"[7F-FIX-v3] label k={k} | train pos={int(y_train.sum())}/{len(y_train)} | val pos={int(y_val.sum())}/{len(y_val)}")

# 6) Save — tensors and labels as .npy, user ID order as CSV, metadata as JSON.
for fname, arr in (("X_train_seq.npy", X_train), ("y_train.npy", y_train),
                   ("X_val_seq.npy", X_val), ("y_val.npy", y_val)):
    np.save(outdir/fname, arr)

for fname, ids in (("users_train.csv", u_train), ("users_val.csv", u_val)):
    pd.DataFrame({"user_id": ids}).to_csv(outdir/fname, index=False)

_prep_meta = {
    "feat_names.json": {"feat_cols": feat_cols, "H_train": int(H_train), "H_val": int(H_val)},
    "label_info.json": {
        "k_incidents": int(k),
        "train_weeks": [str(x) for x in train_weeks],
        "val_weeks":   [str(x) for x in val_weeks],
    },
}
for fname, payload in _prep_meta.items():
    with open(outdir/fname, "w") as f:
        json.dump(payload, f)

gc.collect()
print("7F-PREP-TCN-FIX-v3 DONE")


# STEP 7F-TCN-TRAIN — train a light TCN on the sequences from 7F-PREP-TCN-FIX-v3


In [None]:
# STEP 7F-TCN-TRAIN — train a light TCN on the sequences from 7F-PREP-TCN-FIX-v3

import numpy as np, pandas as pd, torch, torch.nn as nn, time
from pathlib import Path
from sklearn.metrics import roc_auc_score, average_precision_score

torch.set_num_threads(2)
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

# Sequences are memory-mapped (read lazily per batch); labels are small and loaded fully.
tcn_dir = Path("/content/tcn90")
X_train = np.load(tcn_dir/"X_train_seq.npy", mmap_mode="r")
y_train = np.load(tcn_dir/"y_train.npy")
X_val   = np.load(tcn_dir/"X_val_seq.npy",   mmap_mode="r")
y_val   = np.load(tcn_dir/"y_val.npy")
print("Train:", X_train.shape, "| Val:", X_val.shape)

H = X_val.shape[1] if X_val.ndim==3 else 0
D = X_val.shape[2] if X_val.ndim==3 else 0
assert H>0 and D>0, "Empty sequences. Re-run 7F-PREP-TCN-FIX-v3 if this triggers."
# The model is sized from X_val. Fail here with a clear message instead of on the first
# conv call (feature mismatch) or with silently misaligned labels.
assert X_train.ndim == 3 and X_train.shape[2] == D, \
    f"Feature dim mismatch: X_train {X_train.shape} vs X_val {X_val.shape}"
assert len(y_train) == X_train.shape[0] and len(y_val) == X_val.shape[0], \
    "Sequence/label count mismatch — re-run 7F-PREP-TCN-FIX-v3."

class SmallTCN(nn.Module):
    """Three dilated 1-D conv layers over time, global max-pool, then a linear logit head.

    Args:
        D: input feature count per time step.
        hidden: channels in every conv layer.
        k: kernel size; dilations are 1, 2, 4 with "same" padding, so the time length is kept.

    forward(x) takes (N, H, D) and returns one logit per sequence, shape (N,).
    Parameter names (net.0/2/4, head) are what tcn_model.pt stores.
    """

    def __init__(self, D, hidden=64, k=3):
        super().__init__()
        layers = []
        in_ch = D
        for dil in (1, 2, 4):
            layers.append(nn.Conv1d(in_ch, hidden, kernel_size=k, padding="same", dilation=dil))
            layers.append(nn.GELU())
            in_ch = hidden
        self.net = nn.Sequential(*layers)
        self.head = nn.Linear(hidden, 1)

    def forward(self, x):
        feats = self.net(x.transpose(1, 2))   # (N, D, H) in -> (N, hidden, H) out
        pooled = feats.amax(dim=-1)           # strongest activation anywhere in time
        return self.head(pooled).squeeze(-1)

# Seed weight init so reruns are reproducible (batch shuffling is already seeded).
torch.manual_seed(42)
model = SmallTCN(D).to(device)

# class-weighted BCE: pos_weight = n_neg / n_pos rebalances the positive class.
# On a single-class train set that ratio degenerates. All-positive gives 0, which zeroes
# every sample's loss so nothing is learned. Use an unweighted loss in that case.
pos_rate = float(y_train.mean()) if len(y_train) else 0.5
pos_w = (1.0 - pos_rate) / pos_rate if 0.0 < pos_rate < 1.0 else 1.0
crit  = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([pos_w], device=device))
opt   = torch.optim.AdamW(model.parameters(), lr=2e-3, weight_decay=1e-4)

BATCH, EPOCHS, PATIENCE = 256, 10, 2

# One generator shared by every call. Each epoch gets a new permutation, and the whole
# sequence of permutations is still reproducible from the seed. (Re-creating
# default_rng(42) inside batches() replayed the identical order every epoch.)
_batch_rng = np.random.default_rng(42)

def batches(X, y, bs=BATCH, shuffle=True, rng=None):
    """Yield (xb, yb) mini-batches as float32 tensors on `device`.

    Args:
        X: sequence array indexed by row; may be a memory-mapped .npy (only the selected
            rows are read per batch).
        y: labels aligned with X's rows.
        bs: rows per batch (default BATCH).
        shuffle: permute the row order before batching.
        rng: optional numpy Generator used for the permutation. Defaults to the module-level
            `_batch_rng`, so successive calls (epochs) see different orders.
    """
    n = X.shape[0]
    order = np.arange(n)
    if shuffle:
        (_batch_rng if rng is None else rng).shuffle(order)
    for s in range(0, n, bs):
        # Sorting within a batch keeps which rows it contains but reads the memmap forward.
        idx = np.sort(order[s:s+bs])
        xb = torch.from_numpy(np.asarray(X[idx], dtype=np.float32)).to(device)
        yb = torch.from_numpy(np.asarray(y[idx], dtype=np.float32)).to(device)
        yield xb, yb

def eval_val():
    """Score all validation sequences with the current `model` and compute ranking metrics.

    Reads the module-level `model`, `X_val`, `y_val` and `device`. X_val is processed in
    4096-row chunks, so a memory-mapped array is never moved to the device all at once.

    Returns:
        (auc, pr_auc, proba): AUROC and average precision as floats (NaN unless y_val has
        both classes), and the float32 sigmoid probability for each validation row.
    """
    model.eval(); preds = []; CH = 4096
    with torch.no_grad():
        for s in range(0, X_val.shape[0], CH):
            xb = torch.from_numpy(np.asarray(X_val[s:s+CH], dtype=np.float32)).to(device)
            preds.append(torch.sigmoid(model(xb)).float().cpu().numpy())
    # An empty validation set produces no chunks, and np.concatenate([]) would raise.
    p = np.concatenate(preds) if preds else np.zeros(0, dtype=np.float32)
    ok = (np.unique(y_val).size == 2)
    auc = float(roc_auc_score(y_val, p)) if ok else float("nan")
    pr  = float(average_precision_score(y_val, p)) if ok else float("nan")
    return auc, pr, p

# Model selection: val AUROC when defined, otherwise negative mean train loss.
# `best` starts at -inf because the loss-based score is unbounded below. The old -1.0
# sentinel rejected every epoch whenever loss > 1 on a single-class val set, so no
# checkpoint or tcn_val_proba.npy was written and a stale file from an earlier run got read.
best, bad, best_state, best_p = -np.inf, 0, None, None
for ep in range(1, EPOCHS+1):
    model.train(); t0 = time.time(); run = 0.0; seen = 0
    for xb, yb in batches(X_train, y_train):
        logits = model(xb); loss = crit(logits, yb)
        loss.backward(); nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step(); opt.zero_grad()
        run += float(loss.item())*xb.size(0); seen += xb.size(0)
    auc, pr, p_val = eval_val()
    print(f"[{ep}] train_loss≈{(run/max(1,seen)):.4f}  val_auc={auc:.6f}  pr_auc={pr:.6f}  time={(time.time()-t0):.1f}s")
    score = auc if not np.isnan(auc) else -(run/max(1,seen))
    # A non-finite `best` (the initial -inf, or NaN from a NaN-loss epoch) is always replaced.
    if not np.isfinite(best) or score > best + 1e-4:
        best, bad = score, 0
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        best_p = p_val
    else:
        bad += 1
        if bad >= PATIENCE:
            print("Early stopping."); break

assert best_p is not None, "No training epoch ran (EPOCHS < 1?)."
if best_state is not None:
    model.load_state_dict(best_state)
torch.save(model.state_dict(), tcn_dir/"tcn_model.pt")
# Validation probabilities of the selected epoch, written once after selection.
np.save(tcn_dir/"tcn_val_proba.npy", best_p)

# Save per-user validation predictions. user_id is read as text so IDs with leading
# zeros are not coerced to integers.
u_val = pd.read_csv(tcn_dir/"users_val.csv", dtype={"user_id": str})["user_id"].astype(str).values
p_val = np.asarray(best_p)
out = pd.DataFrame({"user_id": u_val, "proba_tcn": p_val, "y_true": y_val})
out.to_csv(tcn_dir/"user_scores_90d_tcn.csv", index=False)
print("Saved →", tcn_dir/"user_scores_90d_tcn.csv")


# STEP 7G — stack TCN + FT + N2V (user-level) and report metrics


In [None]:
# STEP 7G — stack TCN + FT + N2V (user-level) and report metrics

import numpy as np, pandas as pd
from pathlib import Path
from sklearn.metrics import roc_auc_score, average_precision_score
from sklearn.linear_model import LogisticRegression

def approx_any(mean_p, n):
    """Turn a mean per-trip probability into P(at least one incident over n trips).

    Uses 1 - (1 - p)^n, i.e. it treats a user's trips as independent with equal
    probability p. p is clipped to [0, 1]. Missing trip counts count as 1, and counts
    below 1 are raised to 1. NaN probabilities stay NaN.
    """
    p = np.clip(pd.Series(mean_p, dtype="float64").to_numpy(), 0, 1)
    trips = pd.Series(n).fillna(1).astype(int).to_numpy()
    trips = np.maximum(trips, 1)
    return 1.0 - (1.0 - p) ** trips

# ---- Inputs ----
# user_id is read as text everywhere so IDs such as "00123" are not coerced to ints.
tcn = pd.read_csv("/content/tcn90/user_scores_90d_tcn.csv", dtype={"user_id": str})  # user_id, proba_tcn, y_true
ft  = pd.read_csv("/content/ft90/user_scores_90d_ft.csv", dtype={"user_id": str})    # user_id, proba_mean, trips
n2v_path_cal = Path("/content/user_scores_90d_n2v_cal.csv")
n2v_path_raw = Path("/content/user_scores_90d_n2v.csv")
if n2v_path_cal.exists():
    n2v = pd.read_csv(n2v_path_cal, dtype={"user_id": str})
elif n2v_path_raw.exists():
    n2v = pd.read_csv(n2v_path_raw, dtype={"user_id": str})
else:
    n2v = None

# Typed empty frame for "no N2V scores available" (object user_id merges cleanly with str IDs).
_empty_n2v = pd.DataFrame({"user_id": pd.Series(dtype=str), "p_n2v": pd.Series(dtype=float)})

# ---- Build per-user "any incident" probabilities for FT / N2V ----
ft["user_id"] = ft["user_id"].astype(str)
ft["p_ft"] = approx_any(ft["proba_mean"], ft["trips"])

if n2v is not None:
    n2v["user_id"] = n2v["user_id"].astype(str)
    src_p = "proba_mean" if "proba_mean" in n2v.columns else ("proba" if "proba" in n2v.columns else None)
    src_n = "trips" if "trips" in n2v.columns else None
    if src_p is None:
        n2v = _empty_n2v
    else:
        if src_n is None:
            n2v["p_n2v"] = np.clip(n2v[src_p].astype(float), 0, 1)
        else:
            n2v["p_n2v"] = approx_any(n2v[src_p], n2v[src_n])
        n2v = n2v[["user_id","p_n2v"]]
else:
    n2v = _empty_n2v

# ---- Merge on validation users (from TCN file) ----
# Keep one row per user on the right-hand side so a duplicated ID cannot multiply validation rows.
tcn["user_id"] = tcn["user_id"].astype(str)
df = (tcn.merge(ft[["user_id","p_ft"]].drop_duplicates("user_id"), on="user_id", how="left")
         .merge(n2v.drop_duplicates("user_id"), on="user_id", how="left"))

# Decide which legs carry any signal BEFORE imputing. After fillna(0.0) every column is
# non-null, so a leg with no scores at all (e.g. no N2V file) would still be stacked and
# rank-averaged as a constant column.
legs = ["proba_tcn"] + [c for c in ("p_ft", "p_n2v") if df[c].notna().any()]
# NOTE(review): users missing from a leg are imputed as 0.0 (lowest risk) — confirm intended.
df[["p_ft","p_n2v"]] = df[["p_ft","p_n2v"]].fillna(0.0).clip(0,1)

# ---- Metrics per leg (if both classes exist) ----
y = df["y_true"].to_numpy(dtype=int)
multi_class = (np.unique(y).size == 2)

def report(name, p):
    """Print and return (AUROC, PR-AUC) of scores `p` against the module-level labels `y`.

    Both metrics are NaN when the module-level `multi_class` flag is False (single-class y).
    """
    if multi_class:
        au = float(roc_auc_score(y, p))
        pr = float(average_precision_score(y, p))
    else:
        au = pr = float("nan")
    print(f"{name:<12} AUROC={au:.6f}  PR_AUC={pr:.6f}")
    return au, pr

print("User-level metrics (validation users):")
for c in legs:
    report(c, df[c].to_numpy(float))

# ---- Logistic stack (if ≥2 legs AND both classes exist) ----
# The meta-model is fit on the very users it scores. In-sample predictions would overstate
# AUROC/PR-AUC, and the bias would leak into the fairness steps that read p_stack.
# Use stratified out-of-fold predictions whenever each class has at least 2 members.
from sklearn.model_selection import StratifiedKFold, cross_val_predict

p_stack = None
if len(legs) >= 2 and multi_class:
    X = df[legs].to_numpy(float)
    clf = LogisticRegression(solver="liblinear", class_weight="balanced",
                             C=0.5, max_iter=200, random_state=42)
    n_splits = int(min(5, np.bincount(y).min()))
    if n_splits >= 2:
        cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
        p_stack = cross_val_predict(clf, X, y, cv=cv, method="predict_proba")[:, 1]
        tag = "stack_oof"
    else:
        # Too few minority users for CV: fall back to the (optimistic) in-sample fit.
        clf.fit(X, y)
        p_stack = clf.predict_proba(X)[:, 1]
        tag = "stack_insample"
    report(tag + "(" + "+".join(legs) + ")", p_stack)

# ---- Rank-average (robust when calibrations differ) ----
# Average ranks give tied scores (e.g. the many 0.0 imputed for users missing from a leg)
# one shared rank. Argsort-of-argsort ordered them by row position instead.
# Each leg maps to [0, 1], with 1.0 = the riskiest user.
desc_rank = df[legs].rank(ascending=False, method="average").to_numpy(float)  # 1 = highest score
ranks = 1.0 - (desc_rank - 1.0) / max(1, len(df) - 1)   # max() guards the single-user case
p_rankavg = ranks.mean(axis=1)
report("rankavg", p_rankavg)

# ---- Save ----
out = df[["user_id","y_true"] + legs].copy()
if p_stack is not None:
    out["p_stack"] = p_stack
out["p_rankavg"] = p_rankavg
out_path = Path("/content/tcn90/user_scores_90d_stack_tcn_ft_n2v.csv")
out.to_csv(out_path, index=False)
print("Saved →", out_path)


# STEP 7H-ROBUST — ΔTPR with min-support + Laplace-smoothed TPR




In [None]:
# STEP 7H-ROBUST — ΔTPR with min-support + Laplace-smoothed TPR

import numpy as np, pandas as pd, json
from pathlib import Path
from sklearn.metrics import f1_score

STACK  = Path("/content/tcn90/user_scores_90d_stack_tcn_ft_n2v.csv")
MERGED = Path("/content/merged_trip_details_0.parquet")
assert STACK.exists() and MERGED.exists(), "Run 7G and Step 2 first."

# ---- 1) pick score column (prefer p_stack) ----
# user_id is read as text so IDs with leading zeros still join the parquet IDs below.
scores = pd.read_csv(STACK, dtype={"user_id": str})
scores["user_id"] = scores["user_id"].astype(str)
score_col = next((c for c in ["p_stack","p_rankavg","proba_tcn"] if c in scores.columns), None)
assert score_col is not None, f"No score column (p_stack/p_rankavg/proba_tcn) in {STACK}"

# ---- 2) validation weeks from 7F ----
with open("/content/tcn90/label_info.json") as f:
    info = json.load(f)
val_weeks = pd.to_datetime(info.get("val_weeks", []))

# ---- 3) choose grouping column & build majority group per user from MERGED ----
import pyarrow.parquet as pq

group_col = "route_type"   # change to "payment_method"/"currency"/"day_of_week" if you prefer
need_cols = ["user_id", group_col, "trip_start_time", "trip_date"]
# Only the schema is needed to see which columns exist. pd.read_parquet(MERGED).columns
# loaded the entire trip table into memory just for the names.
avail = pq.read_schema(MERGED).names
use = [c for c in need_cols if c in avail]
m = pd.read_parquet(MERGED, engine="pyarrow", columns=use)
m["user_id"] = m["user_id"].astype("string")

# timestamp → week filter
if "trip_start_time" in m.columns:
    ts = pd.to_datetime(m["trip_start_time"], errors="coerce")
elif "trip_date" in m.columns:
    ts = pd.to_datetime(m["trip_date"], errors="coerce")
else:
    ts = None
if ts is not None:
    # strip tz before to_period (avoids the tz-drop warning; matches the EO cells)
    if getattr(ts.dtype, "tz", None) is not None:
        ts = ts.dt.tz_localize(None)
    # Vectorized period start: same values as Period.start_time per row, and NaT stays NaT.
    # NOTE(review): 'W-MON' periods end on Monday, so start_time is the Tuesday that opens
    # each period — confirm this matches how week_start / val_weeks were built upstream.
    m["week_start"] = ts.dt.to_period("W-MON").dt.start_time
    if len(val_weeks) > 0:
        m = m[m["week_start"].isin(val_weeks)]
        if m.empty:
            print("⚠️ No trips fall in the validation weeks — every user will map to 'Unknown'. "
                  "Check that week_start uses the same weekly convention as label_info.json.")

if group_col not in m.columns:
    m[group_col] = "Unknown"
m[group_col] = m[group_col].astype("string").fillna("Unknown")

# majority group per user (Series.mode returns sorted values, so ties take the first)
mode_df = (m.groupby("user_id", observed=True)[group_col]
             .agg(lambda s: s.dropna().mode().iloc[0] if len(s.dropna()) else "Unknown")
             .rename("group").reset_index())
mode_df["user_id"] = mode_df["user_id"].astype(str)
fair = scores.merge(mode_df, on="user_id", how="left")
fair["group"] = fair["group"].fillna("Unknown").astype(str)

# ---- 4) global threshold chosen to maximize F1 ----
y = fair["y_true"].to_numpy(int)
p = fair[score_col].to_numpy(float)
grid = np.linspace(0.05, 0.95, 19)
thr  = grid[np.argmax([f1_score(y, (p>=t).astype(int), zero_division=0) for t in grid])]

# ---- 5) robust per-group TPRs: min-support + Laplace smoothing ----
MIN_USERS = 200   # tune if needed
MIN_POS   = 20

rows = []
for g, sub in fair.groupby("group"):
    yg   = sub["y_true"].to_numpy(int)
    pred = (sub[score_col].to_numpy(float) >= thr).astype(int)
    n    = len(sub)
    pos  = int((yg==1).sum())
    tp   = int(((pred==1) & (yg==1)).sum())
    # raw TPR and Laplace-smoothed TPR; (tp+1)/(pos+2) pulls small groups toward 0.5
    tpr_raw = (tp/pos) if pos>0 else np.nan
    tpr_sm  = (tp + 1) / (pos + 2) if pos>0 else np.nan
    rows.append({"group": g, "n_users": n, "positives": pos, "TPR_raw": tpr_raw, "TPR_sm": tpr_sm})
res = pd.DataFrame(rows).sort_values("n_users", ascending=False).reset_index(drop=True)

supported = res[(res["n_users"]>=MIN_USERS) & (res["positives"]>=MIN_POS)].copy()

def _tpr_spread(col):
    """max–min of `col` across supported groups; NaN unless at least two groups have a value.

    With a single supported group the old max–min returned 0.0, indistinguishable from parity.
    """
    vals = supported[col].dropna()
    return float(vals.max() - vals.min()) if len(vals) >= 2 else float("nan")

delta_raw = _tpr_spread("TPR_raw")
delta_sm  = _tpr_spread("TPR_sm")

out = Path(f"/content/tcn90/fairness_{group_col}_delta_tpr_robust.csv")
res.to_csv(out, index=False)
print(f"Using score: {score_col}, threshold={float(thr)}")
print(f"Min-support: users>={MIN_USERS}, positives>={MIN_POS}")
print(f"Supported groups: {len(supported)}")
print("ΔTPR_raw (supported):", delta_raw)
print("ΔTPR_sm  (supported):",  delta_sm)
print("Saved →", out)
print(res.head(10).to_string(index=False))


# STEP 7H-EO-THRESH-ROBUST — Equal Opportunity by route_type with min-support + per-group thresholds


In [None]:
# STEP 7H-EO-THRESH-ROBUST — Equal Opportunity by route_type with min-support + per-group thresholds

import numpy as np, pandas as pd, json
from pathlib import Path
from sklearn.metrics import f1_score

STACK  = Path("/content/tcn90/user_scores_90d_stack_tcn_ft_n2v.csv")
MERGED = Path("/content/merged_trip_details_0.parquet")
assert STACK.exists() and MERGED.exists(), "Need 7G stack + merged parquet."

# 1) Load stacked scores and pick score column (prefer p_stack).
#    user_id is read as text so IDs with leading zeros still join the parquet IDs below.
scores = pd.read_csv(STACK, dtype={"user_id": str})
scores["user_id"] = scores["user_id"].astype(str)
score_col = next((c for c in ["p_stack","p_rankavg","proba_tcn"] if c in scores.columns), None)
assert score_col is not None, f"No score column (p_stack/p_rankavg/proba_tcn) in {STACK}"

# 2) Validation weeks (align fairness window to your TCN val horizon)
with open("/content/tcn90/label_info.json") as f:
    info = json.load(f)
val_weeks = pd.to_datetime(info.get("val_weeks", []))

# 3) Build route_type per user from MERGED (majority during validation weeks)
import pyarrow.parquet as pq

need = ["user_id","route_type","trip_start_time","trip_date"]
# Only the schema is needed to see which columns exist; reading the whole table was wasteful.
avail = pq.read_schema(MERGED).names
use = [c for c in need if c in avail]
m = pd.read_parquet(MERGED, engine="pyarrow", columns=use)
m["user_id"] = m["user_id"].astype("string")

# timestamp → week filter
if "trip_start_time" in m.columns:
    ts = pd.to_datetime(m["trip_start_time"], errors="coerce")
elif "trip_date" in m.columns:
    ts = pd.to_datetime(m["trip_date"], errors="coerce")
else:
    ts = None
if ts is not None:
    # strip tz before to_period (avoid warning)
    if getattr(ts.dtype, "tz", None) is not None:
        ts = ts.dt.tz_localize(None)
    # Vectorized period start: same values as Period.start_time per row, and NaT stays NaT.
    # NOTE(review): 'W-MON' periods end on Monday, so start_time is a Tuesday — confirm this
    # matches how val_weeks were built, otherwise the isin filter drops every row.
    m["week_start"] = ts.dt.to_period("W-MON").dt.start_time
    if len(val_weeks) > 0:
        m = m[m["week_start"].isin(val_weeks)]
        if m.empty:
            print("⚠️ No trips fall in the validation weeks — every user will map to 'Unknown'.")

if "route_type" not in m.columns:
    m["route_type"] = "Unknown"
m["route_type"] = m["route_type"].astype("string").fillna("Unknown")

# Majority route_type per user (Series.mode returns sorted values, so ties take the first).
mode_df = (m.groupby("user_id", observed=True)["route_type"]
             .agg(lambda s: s.dropna().mode().iloc[0] if len(s.dropna()) else "Unknown")
             .rename("group").reset_index())
mode_df["user_id"] = mode_df["user_id"].astype(str)

fair = scores.merge(mode_df, on="user_id", how="left").fillna({"group":"Unknown"})
y = fair["y_true"].to_numpy(int)
p = fair[score_col].to_numpy(float)

# 4) Global threshold: the grid point with the best F1 across all validation users.
grid = np.linspace(0.05, 0.95, 19)
f1_curve = [f1_score(y, (p >= t).astype(int), zero_division=0) for t in grid]
thr_global = grid[np.argmax(f1_curve)]

# 5) Min-support: groups with fewer than MIN_USERS users or MIN_POS positives merge into 'Other'.
MIN_USERS, MIN_POS = 200, 20
stats0 = pd.DataFrame([
    {"group": grp, "n": len(members), "pos": int((members["y_true"].to_numpy(int) == 1).sum())}
    for grp, members in fair.groupby("group")
])
undersized = (stats0["n"] < MIN_USERS) | (stats0["pos"] < MIN_POS)
small = set(stats0.loc[undersized, "group"].tolist())
fair["group"] = fair["group"].apply(lambda g: "Other" if g in small else g)

# 6) Target TPR = median TPR among supported groups at global threshold
def tpr_at(y, pred):
    """True-positive rate of 0/1 predictions `pred` against labels `y`; NaN if `y` has no positives."""
    positives = (y == 1)
    n_pos = int(positives.sum())
    if n_pos == 0:
        return np.nan
    return int((positives & (pred == 1)).sum()) / n_pos

def _group_row(grp, members):
    """Size, positives and TPR at the global threshold for one group's users."""
    labels = members["y_true"].to_numpy(int)
    flagged = (members[score_col].to_numpy(float) >= thr_global).astype(int)
    return {"group": grp, "n": len(members), "pos": int((labels == 1).sum()),
            "tpr_global": tpr_at(labels, flagged)}

stats = pd.DataFrame([_group_row(grp, members) for grp, members in fair.groupby("group")])
stats = stats.sort_values("n", ascending=False)
is_supported = (stats["n"] >= MIN_USERS) & (stats["pos"] >= MIN_POS)
supported = stats[is_supported].copy()
# Target = median global-threshold TPR over supported groups (NaN if none has a TPR).
has_tpr = supported["tpr_global"].notna().any()
target_tpr = float(np.nanmedian(supported["tpr_global"])) if has_tpr else float("nan")

# 7) Per-group threshold to match target TPR (tie-break by in-group F1)
def best_thr_for_group(yg, pg, target, grid, thr_fallback):
    """Pick the grid threshold whose in-group TPR is closest to `target`.

    Args:
        yg: 0/1 labels for the group's users.
        pg: the same users' scores (compared with >= t).
        target: TPR to match; NaN means "no target".
        grid: candidate thresholds, scanned in order.
        thr_fallback: returned when there is no target, no users, or no positives.

    Returns:
        The threshold minimising |TPR - target|. Near-ties (within 1e-6) go to the higher
        in-group F1, and any remaining tie keeps the earliest grid value.
    """
    if np.isnan(target) or len(yg) == 0:
        return thr_fallback
    is_pos = (np.asarray(yg) == 1)
    pos = int(is_pos.sum())     # loop-invariant, so computed once
    if pos == 0:
        return thr_fallback
    best_t, best_gap, best_f1 = thr_fallback, 1e9, -1
    for t in grid:
        pred = (pg >= t).astype(int)
        tpr = int((is_pos & (pred == 1)).sum()) / pos   # pos > 0, so never NaN here
        gap = abs(tpr - target)
        f1 = f1_score(yg, pred, zero_division=0)
        if gap < best_gap - 1e-6 or (abs(gap - best_gap) <= 1e-6 and f1 > best_f1):
            best_t, best_gap, best_f1 = t, gap, f1
    return best_t

thr_map = {}
for g, sub in fair.groupby("group"):
    yg = sub["y_true"].to_numpy(int); pg = sub[score_col].to_numpy(float)
    thr_map[g] = best_thr_for_group(yg, pg, target_tpr, grid, thr_global)

# 8) Apply EO thresholds and measure ΔTPR after equalization.
#    Each row's threshold is looked up from its group, so predictions stay aligned with
#    `fair` row-for-row. The old positional writes via sub.index silently assumed a
#    0..n-1 RangeIndex.
row_thr = fair["group"].map(thr_map).to_numpy(float)
pred_equal = (fair[score_col].to_numpy(float) >= row_thr).astype(int)

eo_view = fair.assign(pred_equalized=pred_equal)
rows = []
for g, sub in eo_view.groupby("group"):
    yg = sub["y_true"].to_numpy(int); pe = sub["pred_equalized"].to_numpy(int)
    pos = int((yg==1).sum())
    tpr = (int(((pe==1)&(yg==1)).sum())/pos) if pos>0 else np.nan
    rows.append({"group": g, "n": len(sub), "pos": pos, "TPR_after_EO": tpr, "thr": thr_map[g]})
eq = pd.DataFrame(rows).sort_values("n", ascending=False)
delta_after = float(eq["TPR_after_EO"].max() - eq["TPR_after_EO"].min()) if eq["TPR_after_EO"].notna().any() else float("nan")

# 9) Save artifacts
Path("/content/tcn90").mkdir(parents=True, exist_ok=True)
pd.DataFrame({"group": list(thr_map.keys()), "threshold": list(thr_map.values())}) \
  .to_csv("/content/tcn90/eo_thresholds_by_group.csv", index=False)
out = fair[["user_id","group",score_col,"y_true"]].copy()
out["pred_equalized"] = pred_equal
out.to_csv("/content/tcn90/user_equalized_preds.csv", index=False)

print("Global thr:", float(thr_global), "| target TPR:", target_tpr)
print("ΔTPR after EO (max–min):", delta_after)
print("Saved → /content/tcn90/eo_thresholds_by_group.csv")
print("Saved → /content/tcn90/user_equalized_preds.csv")
print(eq.head(10).to_string(index=False))


# STEP 7H-EO-THRESH-ROBUST — Equal Opportunity by route_type with min-support + per-group thresholds (verbatim duplicate of the previous cell — re-running it overwrites the same outputs)


In [None]:
# STEP 7H-EO-THRESH-ROBUST — Equal Opportunity by route_type with min-support + per-group thresholds

import numpy as np, pandas as pd, json
from pathlib import Path
from sklearn.metrics import f1_score

# NOTE: this cell is a verbatim copy of the previous 7H-EO-THRESH-ROBUST cell; running it
# again recomputes and overwrites the same /content/tcn90 artifacts.
STACK  = Path("/content/tcn90/user_scores_90d_stack_tcn_ft_n2v.csv")
MERGED = Path("/content/merged_trip_details_0.parquet")
assert STACK.exists() and MERGED.exists(), "Need 7G stack + merged parquet."

# 1) Load stacked scores and pick score column (prefer p_stack).
#    user_id is read as text so IDs with leading zeros still join the parquet IDs below.
scores = pd.read_csv(STACK, dtype={"user_id": str})
scores["user_id"] = scores["user_id"].astype(str)
score_col = next((c for c in ["p_stack","p_rankavg","proba_tcn"] if c in scores.columns), None)
assert score_col is not None, f"No score column (p_stack/p_rankavg/proba_tcn) in {STACK}"

# 2) Validation weeks (align fairness window to your TCN val horizon)
with open("/content/tcn90/label_info.json") as f:
    info = json.load(f)
val_weeks = pd.to_datetime(info.get("val_weeks", []))

# 3) Build route_type per user from MERGED (majority during validation weeks)
import pyarrow.parquet as pq

need = ["user_id","route_type","trip_start_time","trip_date"]
# Only the schema is needed to see which columns exist; reading the whole table was wasteful.
avail = pq.read_schema(MERGED).names
use = [c for c in need if c in avail]
m = pd.read_parquet(MERGED, engine="pyarrow", columns=use)
m["user_id"] = m["user_id"].astype("string")

# timestamp → week filter
if "trip_start_time" in m.columns:
    ts = pd.to_datetime(m["trip_start_time"], errors="coerce")
elif "trip_date" in m.columns:
    ts = pd.to_datetime(m["trip_date"], errors="coerce")
else:
    ts = None
if ts is not None:
    # strip tz before to_period (avoid warning)
    if getattr(ts.dtype, "tz", None) is not None:
        ts = ts.dt.tz_localize(None)
    # Vectorized period start: same values as Period.start_time per row, and NaT stays NaT.
    # NOTE(review): 'W-MON' periods end on Monday, so start_time is a Tuesday — confirm this
    # matches how val_weeks were built, otherwise the isin filter drops every row.
    m["week_start"] = ts.dt.to_period("W-MON").dt.start_time
    if len(val_weeks) > 0:
        m = m[m["week_start"].isin(val_weeks)]
        if m.empty:
            print("⚠️ No trips fall in the validation weeks — every user will map to 'Unknown'.")

if "route_type" not in m.columns:
    m["route_type"] = "Unknown"
m["route_type"] = m["route_type"].astype("string").fillna("Unknown")

# Majority route_type per user (Series.mode returns sorted values, so ties take the first).
mode_df = (m.groupby("user_id", observed=True)["route_type"]
             .agg(lambda s: s.dropna().mode().iloc[0] if len(s.dropna()) else "Unknown")
             .rename("group").reset_index())
mode_df["user_id"] = mode_df["user_id"].astype(str)

fair = scores.merge(mode_df, on="user_id", how="left").fillna({"group":"Unknown"})
y = fair["y_true"].to_numpy(int)
p = fair[score_col].to_numpy(float)

# 4) Global threshold: the grid point with the best F1 across all validation users.
grid = np.linspace(0.05, 0.95, 19)
f1_curve = [f1_score(y, (p >= t).astype(int), zero_division=0) for t in grid]
thr_global = grid[np.argmax(f1_curve)]

# 5) Min-support: groups with fewer than MIN_USERS users or MIN_POS positives merge into 'Other'.
MIN_USERS, MIN_POS = 200, 20
stats0 = pd.DataFrame([
    {"group": grp, "n": len(members), "pos": int((members["y_true"].to_numpy(int) == 1).sum())}
    for grp, members in fair.groupby("group")
])
undersized = (stats0["n"] < MIN_USERS) | (stats0["pos"] < MIN_POS)
small = set(stats0.loc[undersized, "group"].tolist())
fair["group"] = fair["group"].apply(lambda g: "Other" if g in small else g)

# 6) Target TPR = median TPR among supported groups at global threshold
def tpr_at(y, pred):
    """True-positive rate of 0/1 predictions `pred` against labels `y`; NaN if `y` has no positives."""
    positives = (y == 1)
    n_pos = int(positives.sum())
    if n_pos == 0:
        return np.nan
    return int((positives & (pred == 1)).sum()) / n_pos

def _group_row(grp, members):
    """Size, positives and TPR at the global threshold for one group's users."""
    labels = members["y_true"].to_numpy(int)
    flagged = (members[score_col].to_numpy(float) >= thr_global).astype(int)
    return {"group": grp, "n": len(members), "pos": int((labels == 1).sum()),
            "tpr_global": tpr_at(labels, flagged)}

stats = pd.DataFrame([_group_row(grp, members) for grp, members in fair.groupby("group")])
stats = stats.sort_values("n", ascending=False)
is_supported = (stats["n"] >= MIN_USERS) & (stats["pos"] >= MIN_POS)
supported = stats[is_supported].copy()
# Target = median global-threshold TPR over supported groups (NaN if none has a TPR).
has_tpr = supported["tpr_global"].notna().any()
target_tpr = float(np.nanmedian(supported["tpr_global"])) if has_tpr else float("nan")

# 7) Per-group threshold to match target TPR (tie-break by in-group F1)
def best_thr_for_group(yg, pg, target, grid, thr_fallback):
    """Pick the grid threshold whose in-group TPR is closest to `target`.

    Args:
        yg: 0/1 labels for the group's users.
        pg: the same users' scores (compared with >= t).
        target: TPR to match; NaN means "no target".
        grid: candidate thresholds, scanned in order.
        thr_fallback: returned when there is no target, no users, or no positives.

    Returns:
        The threshold minimising |TPR - target|. Near-ties (within 1e-6) go to the higher
        in-group F1, and any remaining tie keeps the earliest grid value.
    """
    if np.isnan(target) or len(yg) == 0:
        return thr_fallback
    is_pos = (np.asarray(yg) == 1)
    pos = int(is_pos.sum())     # loop-invariant, so computed once
    if pos == 0:
        return thr_fallback
    best_t, best_gap, best_f1 = thr_fallback, 1e9, -1
    for t in grid:
        pred = (pg >= t).astype(int)
        tpr = int((is_pos & (pred == 1)).sum()) / pos   # pos > 0, so never NaN here
        gap = abs(tpr - target)
        f1 = f1_score(yg, pred, zero_division=0)
        if gap < best_gap - 1e-6 or (abs(gap - best_gap) <= 1e-6 and f1 > best_f1):
            best_t, best_gap, best_f1 = t, gap, f1
    return best_t

thr_map = {}
for g, sub in fair.groupby("group"):
    yg = sub["y_true"].to_numpy(int); pg = sub[score_col].to_numpy(float)
    thr_map[g] = best_thr_for_group(yg, pg, target_tpr, grid, thr_global)

# 8) Apply EO thresholds and measure ΔTPR after equalization.
#    Each row's threshold is looked up from its group, so predictions stay aligned with
#    `fair` row-for-row. The old positional writes via sub.index silently assumed a
#    0..n-1 RangeIndex.
row_thr = fair["group"].map(thr_map).to_numpy(float)
pred_equal = (fair[score_col].to_numpy(float) >= row_thr).astype(int)

eo_view = fair.assign(pred_equalized=pred_equal)
rows = []
for g, sub in eo_view.groupby("group"):
    yg = sub["y_true"].to_numpy(int); pe = sub["pred_equalized"].to_numpy(int)
    pos = int((yg==1).sum())
    tpr = (int(((pe==1)&(yg==1)).sum())/pos) if pos>0 else np.nan
    rows.append({"group": g, "n": len(sub), "pos": pos, "TPR_after_EO": tpr, "thr": thr_map[g]})
eq = pd.DataFrame(rows).sort_values("n", ascending=False)
delta_after = float(eq["TPR_after_EO"].max() - eq["TPR_after_EO"].min()) if eq["TPR_after_EO"].notna().any() else float("nan")

# 9) Save artifacts
Path("/content/tcn90").mkdir(parents=True, exist_ok=True)
pd.DataFrame({"group": list(thr_map.keys()), "threshold": list(thr_map.values())}) \
  .to_csv("/content/tcn90/eo_thresholds_by_group.csv", index=False)
out = fair[["user_id","group",score_col,"y_true"]].copy()
out["pred_equalized"] = pred_equal
out.to_csv("/content/tcn90/user_equalized_preds.csv", index=False)

print("Global thr:", float(thr_global), "| target TPR:", target_tpr)
print("ΔTPR after EO (max–min):", delta_after)
print("Saved → /content/tcn90/eo_thresholds_by_group.csv")
print("Saved → /content/tcn90/user_equalized_preds.csv")
print(eq.head(10).to_string(index=False))


# STEP 7D — robust percentile scaling of risk to Nova 350–900 (preserves ranking; fixed decision bands)


In [None]:
# STEP 7D — Robust percentile scaling to Nova 350–900 (rank-preserving)

import os, numpy as np, pandas as pd

# 1) Pick source (prefers ensemble 90d)
candidates = [
    "/content/ensemble_user_scores_90d.csv",   # from Step 7C
    "/content/ft90/user_scores_90d_ft.csv",
    "/content/user_scores_90d_n2v_cal.csv",
]
path = next((p for p in candidates if os.path.exists(p)), None)
assert path is not None, "No score file found. Run Step 7C (ensemble) or earlier scoring first."

df = pd.read_csv(path)

# 2) Get one risk column (higher = riskier); the first column present wins.
risk_col = next((c for c in ("proba_blend", "proba_mean", "risk_proba") if c in df.columns), None)
if risk_col is None:
    raise ValueError("Need one of: proba_blend / proba_mean / risk_proba")
risk = pd.to_numeric(df[risk_col], errors="coerce").to_numpy()

mask = np.isfinite(risk)
df = df.loc[mask].copy()
risk = risk[mask]
# np.quantile on an empty array raises an opaque error; fail with the actual cause.
if risk.size == 0:
    raise ValueError(f"No finite values in column {risk_col!r} of {path}")

# 3) Robust percentile scaling.
#    The 2nd–98th percentile range maps linearly onto [0, 1]. Values beyond it (2% in
#    each tail) are clipped, so roughly 2% of users sit exactly at each end of the scale.
p_lo, p_hi = np.quantile(risk, [0.02, 0.98])
z = (risk - p_lo) / max(1e-9, (p_hi - p_lo))
z = np.clip(z, 0.0, 1.0)

# Optional gentle curvature: z**gamma with gamma > 1 lowers interior z slightly (0 and 1
# are fixed points), nudging mid-range users toward higher Nova scores.
# Set gamma = 1.0 for a purely linear map.
gamma = 1.05
z_adj = z**gamma

# 4) Map to Nova 350–900 (higher risk => lower score)
low, high = 350.0, 900.0
df["nova_score"] = (high - (high - low) * z_adj).astype("float32")

# 5) Decision bands on 350–900 scale
edges  = [350, 500, 600, 650, 700, 750, 800, 850, 900]
labels = ["<500","500–599","600–649","650–699","700–749","750–799","800–849","≥850"]
# right=False makes every bin [lo, hi). With a top edge of 900, a score of exactly 900.0
# (every user at or below p_lo) got no band (NaN). Open the top edge so "≥850" includes 900.
band_edges = edges[:-1] + [np.inf]
df["decision_band"] = pd.cut(df["nova_score"], bins=band_edges, right=False,
                             labels=labels, include_lowest=True)

# Keep useful columns
keep = ["user_id","nova_score","decision_band"]
for extra in ["trips","risk_proba","proba_blend","proba_mean"]:
    if extra in df.columns: keep.append(extra)

OUT = "/content/nova_scores_90d_final_scaled_350_900.csv"
df[keep].to_csv(OUT, index=False)
print(f"Saved → {OUT}")
print(f"Robust bounds used: p_lo={p_lo:.4f}, p_hi={p_hi:.4f}  (gamma={gamma})")
print(df.head(10).to_string(index=False))


# STEP 7D-DIST-FIX — band distribution for 90d Nova 350–900 scores (prefers the scaled 350–900 file if present)


In [None]:
# STEP 7D-DIST-FIX (350–900) — robust band distribution

import numpy as np, pandas as pd, os

candidates = [
    "/content/nova_scores_90d_final_scaled_350_900.csv",  # ← new file
    "/content/nova_scores_90d_final_widespread.csv",
    "/content/nova_scores_90d_final.csv",
]
OUT = next((p for p in candidates if os.path.exists(p)), None)
assert OUT is not None, "Run Step 7D first."

df = pd.read_csv(OUT)
scores = pd.to_numeric(df["nova_score"], errors="coerce").dropna().to_numpy()

edges  = [350, 500, 600, 650, 700, 750, 800, 850, 900]
labels = ["<500","500–599","600–649","650–699","700–749","750–799","800–849","≥850"]

# np.histogram bins are [lo, hi) except the last, which is closed ([850, 900]).
# Scores outside [350, 900] (e.g. a fallback file on another scale) fall in no bin.
counts, _ = np.histogram(scores, bins=edges)
n_unbanded = int(scores.size - counts.sum())
if n_unbanded:
    print(f"⚠️ {n_unbanded} of {scores.size} scores in {OUT} lie outside "
          f"[{edges[0]}, {edges[-1]}] and are excluded from the distribution below.")
dist = pd.DataFrame({"nova_score": labels, "count": counts})
dist["percent"] = (dist["count"] / max(1, dist["count"].sum()) * 100).round(2)

print("Distribution (Nova 350–900):")
print(dist.to_string(index=False))
dist.to_csv("/content/nova_scores_90d_band_dist.csv", index=False)
print("\nSaved → /content/nova_scores_90d_band_dist.csv")


# STEP 8A — Export final 90d outputs to Google Drive (Nova 350–900 score file)


In [None]:
# STEP 8A — Export final 90d outputs to Google Drive (Nova 350–900)

from google.colab import drive
drive.mount('/content/drive')

import os, shutil, glob, zipfile

OUT_DIR = "/content/drive/MyDrive/NovaScore_Outputs_90d"
os.makedirs(OUT_DIR, exist_ok=True)

CANDIDATES = [
    # Final decision files (prefer new 350–900)
    "/content/nova_scores_90d_final_scaled_350_900.csv",   # ← NEW
    "/content/nova_scores_90d_final_widespread.csv",
    "/content/nova_scores_90d_final.csv",
    "/content/nova_scores_90d_band_dist.csv",

    # User-level scores / previews
    "/content/ensemble_user_scores_90d.csv",
    "/content/ft90/user_scores_90d_ft.csv",
    "/content/user_scores_90d_n2v.csv",
    "/content/user_scores_90d_n2v_cal.csv",
    "/content/val_preds_90d_ft_sample.csv",
    "/content/val_preds_90d_n2v_sample.csv",

    # FT artifacts (90d)
    "/content/ft90/ft_model.pt",
    "/content/ft90/ft_val_proba.npy",
    "/content/ft90/val_ids.parquet",
    "/content/ft90/num_cols.json",
    "/content/ft90/mu.npy",
    "/content/ft90/sd.npy",
]

# Also include any extra shaped/stretched files if present
CANDIDATES += glob.glob("/content/*90d*shaped*.csv")
CANDIDATES += glob.glob("/content/*stretched*90d*.csv")
# TCN, stacking and fairness outputs (steps 7F–7H) live in /content/tcn90 and were never
# exported. Add their CSV/JSON artifacts and the model weights, but not the large .npy tensors.
CANDIDATES += sorted(glob.glob("/content/tcn90/*.csv") + glob.glob("/content/tcn90/*.json"))
CANDIDATES += ["/content/tcn90/tcn_model.pt"]
# Drop repeats (a glob can re-match an explicit entry), keeping first-seen order.
CANDIDATES = list(dict.fromkeys(CANDIDATES))

# Files inside these run folders are mirrored into a same-named subfolder on Drive.
RUN_SUBDIRS = ("ft90", "tcn90")

copied, missing = [], []
for src in CANDIDATES:
    if not os.path.exists(src):
        missing.append(src)
        continue
    sub = next((d for d in RUN_SUBDIRS if f"/content/{d}/" in src), None)
    dst_dir = os.path.join(OUT_DIR, sub) if sub else OUT_DIR
    os.makedirs(dst_dir, exist_ok=True)
    dst = os.path.join(dst_dir, os.path.basename(src))
    shutil.copy2(src, dst); copied.append(dst)

# Optional: zip the entire ft90 folder too
FT_DIR = "/content/ft90"
if os.path.isdir(FT_DIR):
    zip_path = "/content/ft90_artifacts.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(FT_DIR):
            for f in files:
                full = os.path.join(root, f)
                rel  = os.path.relpath(full, "/content")
                zf.write(full, rel)
    shutil.copy2(zip_path, os.path.join(OUT_DIR, os.path.basename(zip_path)))
    copied.append(os.path.join(OUT_DIR, os.path.basename(zip_path)))

print("✅ Export complete.")
print(f"Saved to Drive folder: {OUT_DIR}\n")
print("Files copied:")
for p in copied: print("  -", p)

if missing:
    print("\n(Skipped — not found):")
    for m in missing: print("  -", m)


# STEP 8B — Probe runtime + Drive for 90d outputs, zip what exists, and download


In [None]:
# STEP 8B — Probe runtime + Drive for 90d outputs, zip what exists, and download

import os, zipfile, fnmatch
from google.colab import drive, files

try:
    drive.mount('/content/drive')
except Exception as exc:
    # Best-effort: carry on with the local /content files, but say why Drive is missing.
    print(f"(Drive not mounted — searching local files only: {exc})")

TARGET_NAMES = [
    "nova_scores_90d_final_scaled_350_900.csv",  # ← preferred 350–900 file
    "nova_scores_90d_final.csv",
    "nova_scores_90d_final_widespread.csv",
    "nova_scores_90d_band_dist.csv",
    "ensemble_user_scores_90d.csv",
    "user_scores_90d_ft.csv",
    "user_scores_90d_n2v.csv",
    "user_scores_90d_n2v_cal.csv",
    "val_preds_90d_ft_sample.csv",
    "val_preds_90d_n2v_sample.csv",
    "ft_model.pt", "ft_val_proba.npy", "val_ids.parquet"
]

SEARCH_ROOTS = ["/content", "/content/drive/MyDrive"]
SKIP_DIRS = {".Trash", ".config", ".ipynb_checkpoints", ".cache"}

found = {}
def scan_for_targets(root):
    """Record in `found` the first non-empty file for each TARGET_NAMES entry under `root`.

    Hidden directories and SKIP_DIRS are pruned. So is any *other* search root nested
    inside `root` (e.g. /content/drive/MyDrive under /content), because it gets its own
    pass. Without this the whole Drive was walked twice, and the first pass mixed Drive
    copies in with local runtime files. Names already in `found` are not replaced.
    """
    root_abs = os.path.abspath(root)
    nested_roots = {os.path.abspath(r) for r in SEARCH_ROOTS} - {root_abs}
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames[:] = [d for d in dirnames
                       if d not in SKIP_DIRS and not d.startswith(".")
                       and os.path.abspath(os.path.join(dirpath, d)) not in nested_roots]
        for fn in filenames:
            if fn in TARGET_NAMES and fn not in found:
                p = os.path.join(dirpath, fn)
                try:
                    if os.path.getsize(p) > 0:
                        found[fn] = p
                except OSError:
                    pass  # vanished or unreadable file: skip it, keep scanning

for r in SEARCH_ROOTS:
    if os.path.isdir(r):
        scan_for_targets(r)

if not found:
    print("⚠️ No exact-name matches; listing likely candidates...\n")
    candidates, PATTERNS = [], ["*scaled*350*900*csv", "*90d*csv", "*ensemble*90d*csv", "*val*_preds*90d*csv"]
    for r in SEARCH_ROOTS:
        for dirpath, dirnames, filenames in os.walk(r):
            dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS and not d.startswith(".")]
            for fn in filenames:
                fpath = os.path.join(dirpath, fn)
                try:
                    if os.path.getsize(fpath) == 0: continue
                except Exception:
                    continue
                for pat in PATTERNS:
                    if fnmatch.fnmatch(fn.lower(), pat.lower()):
                        candidates.append(fpath); break
    for p in sorted(candidates)[:100]: print(" ", p)
else:
    # Prefer 90d FT artifacts if present
    for name, desired in {
        "ft_model.pt": "/content/ft90/ft_model.pt",
        "ft_val_proba.npy": "/content/ft90/ft_val_proba.npy",
    }.items():
        if os.path.exists(desired): found[name] = desired

    bundle = "/content/NovaScore_90d_results_bundle.zip"
    with zipfile.ZipFile(bundle, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name, path in found.items():
            arcname = path.replace("/content/drive/MyDrive/", "MyDrive/").replace("/content/", "")
            zf.write(path, arcname)
    print("✅ Bundled files:")
    for k, v in found.items(): print(f"  - {k}: {v}")
    print("\nDownloading ZIP...")
    files.download(bundle)


# STEP 8C — Save Nova-score visuals (90d) to /content/plots_nova_90d


In [None]:
# STEP 8C — Save Nova-score visuals (90d) to /content/plots_nova_90d (Nova 350–900)
#
# Loads the first available 90d score file and ensures a numeric `nova_score`
# column clipped to [350, 900]. It then renders a histogram, per-band counts and
# an empirical CDF. Each figure is saved as PNG + SVG with one shared run
# timestamp, so a single run's outputs can be grouped (see STEP 8D).

import os, numpy as np, pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mpl
from datetime import datetime

candidates = [
    "/content/nova_scores_90d_final_scaled_350_900.csv",  # ← prefer this
    "/content/ensemble_user_scores_90d.csv",
    "/content/ft90/user_scores_90d_ft.csv",
    "/content/user_scores_90d_n2v_cal.csv",
    "/content/nova_scores_90d_final_widespread.csv",
    "/content/nova_scores_90d_final.csv",
]
PATH = next((p for p in candidates if os.path.exists(p)), None)
assert PATH, "No score file found."

df = pd.read_csv(PATH)
if "nova_score" not in df.columns:
    if "risk_proba" in df.columns:
        # NOTE(review): 900 - 600*p spans 300–900, so after the clip below every
        # p above ~0.917 collapses to 350. Confirm this matches the scaling used
        # to produce nova_scores_90d_final_scaled_350_900.csv.
        df["nova_score"] = 900 - 600 * pd.to_numeric(df["risk_proba"], errors="coerce")
    else:
        raise AssertionError("No 'nova_score' or 'risk_proba'.")
df["nova_score"] = pd.to_numeric(df["nova_score"], errors="coerce").clip(350, 900)
df = df.dropna(subset=["nova_score"]).copy()
assert len(df) > 0, f"No valid numeric nova_score values in {PATH}."
scores = df["nova_score"].to_numpy(dtype=float)

bg, fg, grid_c, accent = "#f6fbf6", "#1f2937", "#e3eee3", "#2f855a"
mpl.rcParams.update({
    "figure.facecolor": bg, "axes.facecolor": bg, "axes.edgecolor": "none",
    "axes.labelcolor": fg, "xtick.color": fg, "ytick.color": fg,
    "grid.color": grid_c, "grid.linestyle": "-", "grid.linewidth": 0.8,
    "axes.grid": True, "axes.grid.axis": "y", "font.size": 11,
})
greens = mpl.cm.Greens
THRESHOLDS = [500, 600, 650, 700, 750, 800, 850]  # band boundaries drawn as guide lines

out_dir = "/content/plots_nova_90d"; os.makedirs(out_dir, exist_ok=True)
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")


def _save_fig(fig, stem):
    """Save `fig` as <stem>_<stamp>.png and .svg in out_dir; return (png_path, svg_path)."""
    png = os.path.join(out_dir, f"{stem}_{stamp}.png")
    svg = os.path.join(out_dir, f"{stem}_{stamp}.svg")
    for path in (png, svg):
        fig.savefig(path, dpi=220, bbox_inches="tight")
    return png, svg


# Histogram
lo, hi = scores.min(), scores.max()
if lo == hi:
    # All scores equal: widen the range so linspace yields increasing edges and the value gets a bar.
    lo, hi = lo - 0.5, hi + 0.5
bins = np.linspace(lo, hi, 30)
fig, ax = plt.subplots(figsize=(9, 4.8))
counts, edges, patches = ax.hist(scores, bins=bins, edgecolor="white", linewidth=0.6)
# Shade each bar by its count (darker green = more users).
norm = mpl.colors.Normalize(vmin=max(1, counts.min()), vmax=max(1, counts.max()))
for c, p in zip(counts, patches): p.set_facecolor(greens(0.35 + 0.65 * norm(c)))
for thr in THRESHOLDS: ax.axvline(thr, color=accent, linestyle="--", linewidth=1, alpha=0.35)
ax.set_title(f"Nova score distribution (90d, {len(df):,} users)", color=fg, pad=10)
ax.set_xlabel("Nova score"); ax.set_ylabel("Users")
fig.tight_layout()
hist_png, hist_svg = _save_fig(fig, "nova_hist_90d")
plt.show(); plt.close(fig)

# Bands (350–900); np.histogram's last bin [850, 900] is closed, so 900 lands in "≥850".
labels = ["<500","500–599","600–649","650–699","700–749","750–799","800–849","≥850"]
edges_fixed = [350, 500, 600, 650, 700, 750, 800, 850, 900]
counts_b, _ = np.histogram(scores, bins=edges_fixed)
perc_b = (counts_b / max(1, counts_b.sum()) * 100).round(2)

fig, ax = plt.subplots(figsize=(9, 4.8))
colors = greens(np.linspace(0.45, 0.9, len(labels)))
bars = ax.bar(labels, counts_b, color=colors, edgecolor="white", linewidth=0.6)
for b, p in zip(bars, perc_b):
    y = b.get_height(); ax.text(b.get_x()+b.get_width()/2, y, f"{p:.1f}%", ha="center", va="bottom", fontsize=10, color=fg)
ax.set_title("Users per Nova band (90d)", color=fg, pad=10)
ax.set_xlabel("Band"); ax.set_ylabel("Users")
fig.tight_layout()
bands_png, bands_svg = _save_fig(fig, "nova_bands_90d")
plt.show(); plt.close(fig)

# CDF — empirical: the i-th smallest score (1-based) has cumulative share i/n,
# so the largest score reaches 1 and the smallest starts at 1/n (not 0).
fig, ax = plt.subplots(figsize=(9, 4.8))
xs = np.sort(scores); ys = np.arange(1, len(xs) + 1) / len(xs)
ax.plot(xs, ys, linewidth=2, color=accent); ax.fill_between(xs, ys, alpha=0.08, color=accent)
for thr in THRESHOLDS: ax.axvline(thr, color=accent, linestyle="--", linewidth=1, alpha=0.25)
ax.set_title("Cumulative distribution of Nova score (CDF, 90d)", color=fg, pad=10)
ax.set_xlabel("Nova score"); ax.set_ylabel("Cumulative share of users")
fig.tight_layout()
cdf_png, cdf_svg = _save_fig(fig, "nova_cdf_90d")
plt.show(); plt.close(fig)

print("Saved files:")
for p in [hist_png, hist_svg, bands_png, bands_svg, cdf_png, cdf_svg]:
    print(" -", p)


# STEP 8D — Zip the 90d plot images and download


In [None]:
# STEP 8D — Zip the latest 90d plot images and download
#
# STEP 8C stamps every file with its run time (nova_<kind>_90d_<YYYYmmdd-HHMMSS>.<ext>),
# so a hard-coded stamp goes stale on every re-run. This cell finds the newest
# stamp present in PLOTS_DIR and bundles the hist/bands/cdf PNG+SVG files for it.

import os, re, zipfile
from google.colab import files

PLOTS_DIR = "/content/plots_nova_90d"
PLOT_KINDS = ("hist", "bands", "cdf")  # archive order, matching STEP 8C's figure order
_PLOT_RE = re.compile(r"^nova_(hist|bands|cdf)_90d_(\d{8}-\d{6})\.(png|svg)$")


def _latest_plot_files(plots_dir):
    """Return paths of the plot files that share the newest timestamp in `plots_dir`.

    Files are ordered hist → bands → cdf, PNG before SVG. Returns [] when the
    directory is missing or contains no matching files.
    """
    if not os.path.isdir(plots_dir):
        return []
    by_stamp = {}
    for fn in os.listdir(plots_dir):
        m = _PLOT_RE.match(fn)
        if m:
            kind, run_stamp, ext = m.groups()
            by_stamp.setdefault(run_stamp, []).append((PLOT_KINDS.index(kind), ext, fn))
    if not by_stamp:
        return []
    # YYYYmmdd-HHMMSS stamps sort lexicographically in chronological order.
    newest = by_stamp[max(by_stamp)]
    return [os.path.join(plots_dir, fn) for _, _, fn in sorted(newest)]


FILES = _latest_plot_files(PLOTS_DIR)

zip_path = "/content/plots_nova_90d_selected.zip"
if os.path.exists(zip_path):
    os.remove(zip_path)

if not FILES:
    print(f"⚠️  No nova_*_90d_*.png/svg plots found in {PLOTS_DIR} — run STEP 8C first.")
else:
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in FILES:
            if os.path.exists(p) and os.path.getsize(p) > 0:
                zf.write(p, arcname=os.path.basename(p))
                print("✔ added:", p)
            else:
                print("⚠️  skipped (not found or empty):", p)

    print("\nCreated:", zip_path)
    files.download(zip_path)
