# S04 — Rule-based Detectors & Event Aggregation

Реализуем набор **детекторов безопасности** (WWO, ILS, UUT, HBR, HCT, GLD), 
агрегируем события по **edge/hex**, применяем **Empirical Bayes** сглаживание.

In [None]:
%run ./S00_setup.ipynb

In [None]:
# 1) Load the feature tables. Each table is optional: when its parquet file
#    does not exist the corresponding variable is set to None, and later cells
#    check for None before using it.
if POINT_FEATURES_PARQUET.exists():
    pt = pd.read_parquet(POINT_FEATURES_PARQUET)
else:
    pt = None

if EDGE_FEATURES_PARQUET.exists():
    edge_df = pd.read_parquet(EDGE_FEATURES_PARQUET)
else:
    edge_df = None

if HEX_FEATURES_PARQUET.exists():
    hex_df = pd.read_parquet(HEX_FEATURES_PARQUET)
else:
    hex_df = None

# The matched table is loaded the same way but is not part of the summary below.
if MATCHED_PARQUET.exists():
    mm = pd.read_parquet(MATCHED_PARQUET)
else:
    mm = None

# Summary: (rows, columns) of each loaded table, or None when it is absent.
print("Loaded point:", pt.shape if pt is not None else None)
print("Loaded edge:", edge_df.shape if edge_df is not None else None)
print("Loaded hex:", hex_df.shape if hex_df is not None else None)

In [None]:
# 2) Combine point features with edge context from the matched table (if both exist)
if pt is not None and mm is not None:
    cols = ["u","v","key","highway","oneway","edge_bearing","dist2edge","dist2node","length"]
    # Keep only the context columns the matched table really provides. Indexing
    # mm with a missing column raises KeyError, while the detectors further down
    # already treat several of these columns as optional (pt2.get(..., default)).
    cols = [c for c in cols if c in mm.columns]
    # Columns taken from mm replace any same-named columns in pt. The join is
    # index-aligned — assumes pt and mm share the same row index (TODO confirm).
    pt2 = pt.drop(columns=cols, errors="ignore").join(mm[cols], how="left")
else:
    # No matched table: work on a copy of the point features, or None if those
    # are missing too.
    pt2 = pt.copy() if pt is not None else None

In [None]:
# 3) Point-level detectors (reason flags).
#    Every detector writes a boolean column is_<REASON> into pt2. Optional
#    columns are read with pt2.get(name, default), so a missing column
#    degrades to a scalar default instead of raising.
if pt2 is not None:
    C = CONFIG

    # Wrong-Way (on one-way roads): one-way edge, heading deviates from the
    # edge bearing by more than 90, point close to the edge, small altitude residual.
    # `== True` is an element-wise comparison, so any non-True value yields False.
    pt2["is_WWO"] = (
        (pt2.get("oneway", False) == True)  # noqa: E712
        & (pt2["bearing_dev"] > 90.0)
        & (pt2.get("dist2edge", 0) < 5.0)
        & (pt2["alt_residual"].abs() < C["ALT_RESID_THR"])
    )

    # In-Lane Stop.
    # Approximation: take the edge free-flow value from edge_df (merge on u, v, key).
    if edge_df is not None and {"u","v","key"}.issubset(pt2.columns):
        join_cols = ["u","v","key"]
        # Drop a `freeflow` column left over from a previous run of this cell
        # (or already present in the point features). Otherwise the merge emits
        # freeflow_x / freeflow_y and the pt2["freeflow"] lookup below raises
        # KeyError. Note: merge returns a frame with a fresh RangeIndex.
        pt2 = pt2.drop(columns=["freeflow"], errors="ignore").merge(
            edge_df[join_cols + ["freeflow"]], on=join_cols, how="left"
        )
        # A stop counts only on edges whose free-flow value exceeds the threshold
        # and far enough from a node (i.e. not a stop at a junction).
        pt2["is_ILS"] = (
            (pt2["stop_flag"])
            & (pt2["freeflow"] > C["FREEFLOW_EDGE_MS"])
            & (pt2.get("dist2node", 0) > C["JUNCTION_SAFE_M"])
        )
    else:
        # No edge features to look up: keep the columns present, flag nothing.
        pt2["freeflow"] = np.nan
        pt2["is_ILS"] = False

    # Unsafe U-Turn — approximation: very large bearing deviation away from a
    # node (uses dist2node > 30).
    pt2["is_UUT"] = (pt2.get("dist2node", 0) > 30.0) & (pt2["bearing_dev"] > 140.0)

    # Harsh Braking — needs a segment/pair quantity: uses 'a_long' (expected
    # from S03) when available; otherwise no point is flagged.
    pt2["is_HBR"] = False
    if "a_long" in pt2.columns:
        pt2["is_HBR"] = pt2["a_long"] < C["HBR_THR"]

    # Harsh Cornering Turn — proxy: large bearing deviation at non-trivial speed.
    pt2["is_HCT"] = (pt2["bearing_dev"] > 45) & (pt2["spd_clip"] > 8.0)

    # Ghost-Lane Drift: point far from the matched edge while the altitude
    # residual is small.
    pt2["is_GLD"] = (
        (pt2.get("dist2edge", 0) > C["DIST2EDGE_OFFLANE_M"])
        & (pt2["alt_residual"].abs() < C["ALT_RESID_THR"])
    )
else:
    print("No point features available.")

In [None]:
# 4) Event aggregation by edge/hex
def agg_events(df, key_cols, reasons=None):
    """Count observations and flagged events per aggregation unit.

    Args:
        df: Point-level frame containing ``key_cols`` and, optionally, boolean
            ``is_<reason>`` flag columns and a ``randomized_id`` column.
        key_cols: Column name or list of column names identifying the unit
            (e.g. ``["u", "v", "key"]`` or ``["h3"]``).
        reasons: Reason codes to aggregate. Defaults to the module-level
            ``REASONS`` when omitted.

    Returns:
        A DataFrame with one row per unit and the columns ``key_cols``,
        ``n_obs`` (row count), one column per reason (sum of the matching
        ``is_<reason>`` flag, or 0 when that flag column is absent) and
        ``n_ids`` (number of distinct ``randomized_id`` values, or NaN when
        the column is absent).
    """
    if reasons is None:
        reasons = REASONS

    # Group once and reuse the grouper for every aggregate. pandas' default
    # dropna=True means rows with a missing key are left out of all counts.
    grouped = df.groupby(key_cols)
    n = grouped.size().rename("n_obs").to_frame()

    for reason in reasons:
        flag = f"is_{reason}"
        n[reason] = grouped[flag].sum() if flag in df.columns else 0

    # k-anonymity support: number of unique IDs per unit (if randomized_id exists)
    if "randomized_id" in df.columns:
        n["n_ids"] = grouped["randomized_id"].nunique()
    else:
        n["n_ids"] = np.nan

    return n.reset_index()

# Both results stay None unless the matching aggregation actually runs;
# the smoothing cell relies on that to decide what to process.
events_edge = None
events_hex = None

if pt2 is not None:
    # Edge level: only possible when the points carry the full (u, v, key) id.
    if {"u","v","key"} <= set(pt2.columns):
        events_edge = agg_events(pt2, ["u","v","key"])
        events_edge.to_parquet(EVENTS_EDGE_PARQUET, index=False)
        print("Saved:", EVENTS_EDGE_PARQUET)

    # Hex level: needs an h3 column with at least one non-missing value;
    # points without an h3 value are excluded before aggregating.
    if "h3" in pt2.columns and not pt2["h3"].isna().all():
        events_hex = agg_events(pt2.loc[pt2["h3"].notna()], ["h3"])
        events_hex.to_parquet(EVENTS_HEX_PARQUET, index=False)
        print("Saved:", EVENTS_HEX_PARQUET)

In [None]:
# 5) Empirical Bayes smoothing
def eb_table(n_df, unit_key):
    """Build a long-format table of smoothed event rates per unit and reason.

    For every reason in ``REASONS`` the prior pair ``CONFIG["EB_PRIORS"][reason]``
    is passed, together with the event counts and observation counts, to
    ``eb_posterior`` (presumably a Beta-Binomial posterior estimate — confirm
    against its definition).

    Args:
        n_df: Aggregated counts with an ``n_obs`` column, one column per
            reason and the ``unit_key`` column.
        unit_key: Name of the column in ``n_df`` that identifies the unit.

    Returns:
        A DataFrame with columns ``unit_key``, ``reason``, ``k``, ``n`` and
        ``p_hat``: the per-reason blocks stacked in ``REASONS`` order under a
        fresh 0..N-1 index.
    """

    def _reason_block(reason):
        # One block of rows (one per unit) for a single reason.
        prior_a, prior_b = CONFIG["EB_PRIORS"][reason]
        event_counts = n_df[reason].values
        obs_counts = n_df["n_obs"].values
        smoothed = eb_posterior(event_counts, obs_counts, prior_a, prior_b)
        return pd.DataFrame(
            {
                unit_key: n_df[unit_key],
                "reason": reason,
                "k": event_counts,
                "n": obs_counts,
                "p_hat": smoothed,
            }
        )

    return pd.concat([_reason_block(reason) for reason in REASONS], ignore_index=True)

if events_edge is not None:
    # Build one string identifier per edge BEFORE smoothing and use it as the
    # unit key. Attaching it afterwards with assign() aligns on the index:
    # events_edge has N rows while the smoothed table has len(REASONS) * N, so
    # only the first reason's rows would get a key and the rest would be NaN.
    edge_units = events_edge.assign(
        edge_key=events_edge["u"].astype(str)
        + "_" + events_edge["v"].astype(str)
        + "_" + events_edge["key"].astype(str)
    )
    eb_edge = eb_table(edge_units, "edge_key")
    # Re-attach `u` and restore the column set this file has always had
    # (u, reason, k, n, p_hat, edge_key). edge_key is unique per row of
    # events_edge (one row per (u, v, key) group), so the left merge keeps
    # the row count and order of eb_edge.
    eb_edge = eb_edge.merge(edge_units[["edge_key", "u"]], on="edge_key", how="left")
    eb_edge = eb_edge[["u", "reason", "k", "n", "p_hat", "edge_key"]]
    eb_edge.to_parquet(EB_EDGE_PARQUET, index=False)
    print("Saved:", EB_EDGE_PARQUET)

if events_hex is not None:
    # Hex cells are identified by the single h3 column, so no composite key is needed.
    eb_hex = eb_table(events_hex, "h3")
    eb_hex.to_parquet(EB_HEX_PARQUET, index=False)
    print("Saved:", EB_HEX_PARQUET)