### **Hybrid screening system: 3-day screening as an example**

In [None]:
import pandas as pd
import numpy as np


# Convert time difference to days
long_3_df["time_difference_days"] = (
    pd.to_timedelta(long_3_df["time_difference"]).dt.total_seconds() / 86400
)


# mark very high temporal risk among High symptoms
rubric_df["is_veryhigh"] = (
    (rubric_df["symptom_risk_category"] == "High") &
    (rubric_df["stroke_risk_score"] >= 0.85)
)



In [None]:

long_3_df = long_3_df.merge(
    rubric_df[["symptom_id", "symptom_risk_category", "is_veryhigh"]],
    left_on="classifications",
    right_on="symptom_id",
    how="left"
)


In [None]:
import pandas as pd

def compute_3day_screening_features_full(
    df: pd.DataFrame,
    person_col: str = "user_id",
    days_col: str = "time_difference_days",
    category_col: str = "symptom_risk_category",
    veryhigh_col: str = "is_veryhigh",
    outcome_col: str = "stroke_event",
    window_days: int = 3,   # <-- 3-day window by default
) -> pd.DataFrame:
    """
    Build 3-day features for ALL person in df:
      - Counts of High / Moderate / Moderate-low / Low / VeryHigh in 0–3 days
      - Person-level stroke_event from the full df
      - 3-day screening alert using a simple conservative rule (editable)
    """

    # -----------------------------
    # 0) Person-level outcome table
    # -----------------------------
    if outcome_col not in df.columns:
        raise KeyError(f"'{outcome_col}' not found in df.")

    person_outcome = (
        df.groupby(person_col)[outcome_col]
          .max()
          .reset_index()
    )

    # -----------------------------
    # 1) Restrict to 0–3 day window (for counts only)
    # -----------------------------
    window = df[(df[days_col] >= 0) & (df[days_col] <= window_days)].copy()

    if window.empty:
        counts = person_outcome.copy()
        for col in [
            "High_count_3d", "Moderate_count_3d",
            "ModerateLow_count_3d", "Low_count_3d",
            "VeryHigh_count_3d", "total_symptoms_3d",
            "screening_alert_3d"
        ]:
            counts[col] = 0
        return counts

    # -----------------------------
    # 2) Category counts: (person, category) → count
    # -----------------------------
    cat_counts = (
        pd.crosstab(window[person_col], window[category_col])
          .rename_axis(index=person_col)
          .reset_index()
    )

    for cat in ["High", "Moderate", "Moderate-low", "Low"]:
        if cat not in cat_counts.columns:
            cat_counts[cat] = 0

    # -----------------------------
    # 3) Very-High counts
    # -----------------------------
    if veryhigh_col in window.columns:
        vh = (
            window[window[veryhigh_col] == True]
            .groupby(person_col)
            .size()
            .rename("VeryHigh_count_3d")
        )
        cat_counts = cat_counts.merge(vh, on=person_col, how="left")
        cat_counts["VeryHigh_count_3d"] = cat_counts["VeryHigh_count_3d"].fillna(0).astype(int)
    else:
        cat_counts["VeryHigh_count_3d"] = 0

    # -----------------------------
    # 4) Rename category columns
    # -----------------------------
    counts = cat_counts.rename(columns={
        "High": "High_count_3d",
        "Moderate": "Moderate_count_3d",
        "Moderate-low": "ModerateLow_count_3d",
        "Low": "Low_count_3d",
    })

    # -----------------------------
    # 5) Merge with ALL person (including no 0–3d messages)
    # -----------------------------
    full = person_outcome.merge(counts, on=person_col, how="left")

    for col in [
        "High_count_3d", "Moderate_count_3d",
        "ModerateLow_count_3d", "Low_count_3d",
        "VeryHigh_count_3d"
    ]:
        if col not in full.columns:
            full[col] = 0
        full[col] = full[col].fillna(0).astype(int)

    full["total_symptoms_3d"] = (
        full["High_count_3d"]
        + full["Moderate_count_3d"]
        + full["ModerateLow_count_3d"]
        + full["Low_count_3d"]
    )

    # -----------------------------
    # 6) Apply your 3-day rule (conservative; adjust if desired)
    # -----------------------------
    def rule_3d(row):
        vh = row["VeryHigh_count_3d"]
        h  = row["High_count_3d"]
        m  = row["Moderate_count_3d"]

        # Very-High Alert
        if vh >= 1:
            return "Very-High Alert (3d)"

        # High-Risk Alert (tight rule for short window)
        if (h >= 1) or ((h + m) >= 2):
            return "High-Risk Alert (3d)"

        # Moderate-Risk Alert
        if m >= 1:
            return "Moderate-Risk Alert (3d)"

        return "No Alert (3d)"

    full["screening_alert_3d"] = full.apply(rule_3d, axis=1)

    return full


In [None]:
eval_3_df = compute_3day_screening_features_full(long_3_df)

eval_3_df["stroke_event"].value_counts()


In [None]:
# Logistic regression rule

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import pandas as pd

def fit_logistic_3day(eval_3_df: pd.DataFrame):
    """
    Fit a logistic regression model for 3-day stroke risk using
    symptom-risk counts and return:
      - fitted model
      - coefficient table (coef + odds ratios + intercept)
      - (X_test, y_test, y_prob) on the held-out set
      - ROC AUC on the test set
    """

    features_3 = [
        "VeryHigh_count_3d",
        "High_count_3d",
        "Moderate_count_3d",
        "ModerateLow_count_3d",
        "Low_count_3d",
    ]

    X = eval_3_df[features_3].astype(float).values
    y = eval_3_df["stroke_event"].astype(int).values

    # Check columns
    missing = [f for f in features_3 + ["stroke_event"] if f not in eval_3_df.columns]
    if missing:
        raise KeyError(f"Missing required columns in eval_3_df: {missing}")

    X = eval_3_df[features_3].astype(float).values
    y = eval_3_df["stroke_event"].astype(int).values

    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=0.3,
        stratify=y,
        random_state=42,
    )

    clf = LogisticRegression(max_iter=2000)
    clf.fit(X_train, y_train)

    y_prob = clf.predict_proba(X_test)[:, 1]
    auc_roc = roc_auc_score(y_test, y_prob)

    # Build coefficient table, including intercept
    coef_table = pd.DataFrame({
        "feature": ["intercept"] + features_3,
        "coef": np.concatenate([[clf.intercept_[0]], clf.coef_[0]]),
        "odds_ratio": np.exp(np.concatenate([[clf.intercept_[0]], clf.coef_[0]])),
    })

    return clf, coef_table, (X_test, y_test, y_prob), auc_roc


In [None]:
clf, coef_table, (X_test, y_test, y_prob), auc_roc = fit_logistic_3day(eval_3_df)

print(coef_table)
print("ROC AUC (3d):", auc_roc)


In [None]:
# Logistic regression derived threshold optimization

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, average_precision_score

def add_logistic_score_3d(eval_3_df: pd.DataFrame) -> pd.DataFrame:
    df = eval_3_df.copy()

    for col in [
        "VeryHigh_count_3d",
        "High_count_3d",
        "Moderate_count_3d",
        "ModerateLow_count_3d",
        "Low_count_3d",
    ]:
        if col not in df.columns:
            df[col] = 0

    vh = df["VeryHigh_count_3d"].astype(float)
    h  = df["High_count_3d"].astype(float)
    m  = df["Moderate_count_3d"].astype(float)
    ml = df["ModerateLow_count_3d"].astype(float)
    l  = df["Low_count_3d"].astype(float)

    logit_3 = (
        a
        + b * vh
        + c * h
        + d * m
        + e * ml
        + f
        * l
    )
    p_3 = 1.0 / (1.0 + np.exp(-logit_3))

    df["logit_3"] = logit_3
    df["p_3"] = p_3
    return df




def threshold_scan(y_true, y_prob, thresholds=None, prevalence=0.10):
    """
    Return a DataFrame of tn/fp/fn/tp, sensitivity, specificity, precision, npv, f1, accuracy
    plus prevalence-adjusted PPV/NPV (Bayes) using a fixed prevalence.
    """
    if thresholds is None:
        thresholds = np.linspace(0, 1, 101)

    rows = []
    y_true = np.asarray(y_true).astype(int)
    y_prob = np.asarray(y_prob).astype(float)

    pi = float(prevalence)

    for thr in thresholds:
        y_hat = (y_prob >= thr).astype(int)

        tp = ((y_true == 1) & (y_hat == 1)).sum()
        tn = ((y_true == 0) & (y_hat == 0)).sum()
        fp = ((y_true == 0) & (y_hat == 1)).sum()
        fn = ((y_true == 1) & (y_hat == 0)).sum()

        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else np.nan
        specificity = tn / (tn + fp) if (tn + fp) > 0 else np.nan
        precision   = tp / (tp + fp) if (tp + fp) > 0 else np.nan
        npv         = tn / (tn + fn) if (tn + fn) > 0 else np.nan

        # Bayes prevalence-adjusted PPV/NPV
        if np.isnan(sensitivity) or np.isnan(specificity):
            ppv_adj = np.nan
            npv_adj = np.nan
        else:
            denom_ppv = sensitivity * pi + (1 - specificity) * (1 - pi)
            ppv_adj = (sensitivity * pi) / denom_ppv if denom_ppv > 0 else np.nan

            denom_npv = (1 - sensitivity) * pi + specificity * (1 - pi)
            npv_adj = (specificity * (1 - pi)) / denom_npv if denom_npv > 0 else np.nan

        f1 = (
            2 * precision * sensitivity / (precision + sensitivity)
            if not np.isnan(precision) and not np.isnan(sensitivity) and (precision + sensitivity) > 0
            else np.nan
        )
        accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else np.nan

        rows.append({
            "tn": tn, "fp": fp, "fn": fn, "tp": tp,
            "sensitivity": sensitivity,
            "specificity": specificity,
            "precision": precision,
            "npv": npv,
            "ppv_adj_prev_0.10": ppv_adj,
            "npv_adj_prev_0.10": npv_adj,
            "f1": f1,
            "accuracy": accuracy,
            "threshold": thr,
        })

    return pd.DataFrame(rows)



# --- Run on your eval_3_df ---
eval_3_scored = add_logistic_score_3d(eval_3_df)

y_true_3 = eval_3_scored["stroke_event"].values
y_prob_3 = eval_3_scored["p_3"].values

thr_df_3 = threshold_scan(y_true_3, y_prob_3, prevalence=0.10)

# Example: best F1
best_f1_row_3 = thr_df_3.loc[thr_df_3["f1"].idxmax()]
print("Best-F1 3d threshold row:")
print(best_f1_row_3)



# AUCs for reporting
auc_roc_3 = roc_auc_score(y_true_3, y_prob_3)
auc_pr_3  = average_precision_score(y_true_3, y_prob_3)

print("ROC AUC (3d):", auc_roc_3)
print("PR AUC (3d):", auc_pr_3)


In [None]:
# choose a specificity constraint
SPEC_MIN = 0.90

# restrict to high-specificity operating points
thr_df_3_high_spec = thr_df_3[thr_df_3["specificity"] >= SPEC_MIN]

# guard against empty result
if thr_df_3_high_spec.empty:
    raise ValueError("No thresholds meet the specificity requirement.")

# best F1 under high specificity
best_f1_high_spec_3 = thr_df_3_high_spec.loc[
    thr_df_3_high_spec["f1"].idxmax()
]

print("Best F1 with specificity ≥", SPEC_MIN)
print(best_f1_high_spec_3)


In [None]:
# symptom rule

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix

def apply_symptom_rule_3d(
    df: pd.DataFrame,
    vh_thresh: int,
    high_thresh: int,
    mod_thresh: int,
    hm_combo_thresh: int,
) -> np.ndarray:
    """
    3-day symptom rule with tunable integer thresholds.

    Screen-positive if ANY are true:
      1) VeryHigh_count_3d >= vh_thresh
      2) High_count_3d >= high_thresh
      3) High_count_3d >= 1 AND Moderate_count_3d >= mod_thresh
      4) (High_count_3d + Moderate_count_3d) >= hm_combo_thresh
    """
    needed = ["VeryHigh_count_3d", "High_count_3d", "Moderate_count_3d"]
    missing = [c for c in needed if c not in df.columns]
    if missing:
        raise KeyError(f"Missing required columns for 3d rule: {missing}")

    vh = df["VeryHigh_count_3d"]
    h  = df["High_count_3d"]
    m  = df["Moderate_count_3d"]

    cond_vh = vh >= vh_thresh
    cond_h  = h >= high_thresh
    cond_hm_pair = (h >= 1) & (m >= mod_thresh)
    cond_hm_sum  = (h + m) >= hm_combo_thresh

    return (cond_vh | cond_h | cond_hm_pair | cond_hm_sum).astype(int).values


def grid_search_symptom_rule_3d(
    eval_3_df: pd.DataFrame,
    vh_values=[1],
    high_values=range(0, 4),
    mod_values=range(0, 4),
    hm_combo_values=range(1, 6),
    prevalence: float | None = 0.10,
) -> pd.DataFrame:
    """
    Grid search over thresholds for the 3-day symptom rule.
    Returns tn/fp/fn/tp + metrics (+ optional prevalence-adjusted PPV/NPV).
    """
    if "stroke_event" not in eval_3_df.columns:
        raise KeyError("Missing required column 'stroke_event' in eval_3_df.")

    y_true = eval_3_df["stroke_event"].astype(int).values
    pi = None if prevalence is None else float(prevalence)

    rows = []
    for vh_t in vh_values:
        for h_t in high_values:
            for m_t in mod_values:
                for hm_t in hm_combo_values:
                    y_pred = apply_symptom_rule_3d(
                        eval_3_df,
                        vh_thresh=vh_t,
                        high_thresh=h_t,
                        mod_thresh=m_t,
                        hm_combo_thresh=hm_t,
                    )

                    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

                    sens = tp / (tp + fn) if (tp + fn) > 0 else np.nan
                    spec = tn / (tn + fp) if (tn + fp) > 0 else np.nan
                    prec = tp / (tp + fp) if (tp + fp) > 0 else np.nan
                    npv  = tn / (tn + fn) if (tn + fn) > 0 else np.nan
                    f1   = (
                        2 * prec * sens / (prec + sens)
                        if not np.isnan(prec) and not np.isnan(sens) and (prec + sens) > 0
                        else np.nan
                    )
                    acc  = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else np.nan

                    out = {
                        "vh_thresh": vh_t,
                        "high_thresh": h_t,
                        "mod_thresh": m_t,
                        "hm_combo_thresh": hm_t,
                        "tn": tn, "fp": fp, "fn": fn, "tp": tp,
                        "sensitivity": sens,
                        "specificity": spec,
                        "precision": prec,
                        "npv": npv,
                        "f1": f1,
                        "accuracy": acc,
                    }

                    #  prevalence-adjusted PPV/NPV
                    if pi is not None and not (np.isnan(sens) or np.isnan(spec)):
                        denom_ppv = sens * pi + (1 - spec) * (1 - pi)
                        denom_npv = (1 - sens) * pi + spec * (1 - pi)
                        out["ppv_adj"] = (sens * pi) / denom_ppv if denom_ppv > 0 else np.nan
                        out["npv_adj"] = (spec * (1 - pi)) / denom_npv if denom_npv > 0 else np.nan
                        out["assumed_prevalence"] = pi
                    else:
                        out["ppv_adj"] = np.nan
                        out["npv_adj"] = np.nan
                        out["assumed_prevalence"] = pi

                    rows.append(out)

    return pd.DataFrame(rows)


In [None]:
grid_3d = grid_search_symptom_rule_3d(eval_3_df, prevalence=0.10)
best_3 = grid_3d.loc[grid_3d["f1"].idxmax()]
best_3


In [None]:
# a grid search for the optimal symptom based rule (3-day)

import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix

def apply_symptom_rule_3d(
    df: pd.DataFrame,
    vh_thresh: int,
    high_thresh: int,
    mod_thresh: int,
    hm_combo_thresh: int,
) -> np.ndarray:
    vh = df["VeryHigh_count_3d"]
    h  = df["High_count_3d"]
    m  = df["Moderate_count_3d"]

    cond_vh = vh >= vh_thresh
    cond_h  = h >= high_thresh
    cond_hm_pair = (h >= 1) & (m >= mod_thresh)
    cond_hm_sum  = (h + m) >= hm_combo_thresh

    flag = cond_vh | cond_h | cond_hm_pair | cond_hm_sum
    return flag.astype(int).values


def grid_search_symptom_rule_3d(
    eval_3_df: pd.DataFrame,
    vh_values = [1],
    high_values = range(0, 4),
    mod_values = range(0, 4),
    hm_combo_values = range(1, 6),
    prevalence: float = 0.10,
) -> pd.DataFrame:
    """
    Returns a DataFrame with thresholds + tn/fp/fn/tp + metrics + F1
    plus prevalence-adjusted PPV/NPV using a fixed prevalence.
    """
    y_true = eval_3_df["stroke_event"].astype(int).values
    pi = float(prevalence)

    rows = []
    for vh_t in vh_values:
        for h_t in high_values:
            for m_t in mod_values:
                for hm_t in hm_combo_values:
                    y_pred = apply_symptom_rule_3d(
                        eval_3_df,
                        vh_thresh=vh_t,
                        high_thresh=h_t,
                        mod_thresh=m_t,
                        hm_combo_thresh=hm_t,
                    )

                    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

                    sens = tp / (tp + fn) if (tp + fn) > 0 else np.nan
                    spec = tn / (tn + fp) if (tn + fp) > 0 else np.nan
                    prec = tp / (tp + fp) if (tp + fp) > 0 else np.nan
                    npv  = tn / (tn + fn) if (tn + fn) > 0 else np.nan

                    # --- prevalence-adjusted PPV/NPV (Bayes) ---
                    if np.isnan(sens) or np.isnan(spec):
                        ppv_adj = np.nan
                        npv_adj = np.nan
                    else:
                        denom_ppv = sens * pi + (1 - spec) * (1 - pi)
                        ppv_adj = (sens * pi) / denom_ppv if denom_ppv > 0 else np.nan

                        denom_npv = (1 - sens) * pi + spec * (1 - pi)
                        npv_adj = (spec * (1 - pi)) / denom_npv if denom_npv > 0 else np.nan

                    f1 = (
                        2 * prec * sens / (prec + sens)
                        if not np.isnan(prec) and not np.isnan(sens) and (prec + sens) > 0
                        else np.nan
                    )
                    acc = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else np.nan

                    rows.append({
                        "vh_thresh": vh_t,
                        "high_thresh": h_t,
                        "mod_thresh": m_t,
                        "hm_combo_thresh": hm_t,
                        "tn": tn, "fp": fp, "fn": fn, "tp": tp,
                        "sensitivity": sens,
                        "specificity": spec,
                        "precision": prec,
                        "npv": npv,
                        "ppv_adj_prev_0.10": ppv_adj,
                        "npv_adj_prev_0.10": npv_adj,
                        "f1": f1,
                        "accuracy": acc,
                    })

    return pd.DataFrame(rows)


In [None]:
# Run grid search
grid_3d = grid_search_symptom_rule_3d(eval_3_df)

# Sort by F1 (descending)
grid_3d_sorted = grid_3d.sort_values("f1", ascending=False)

# Top 100 combinations by F1
grid_3d_sorted.head(100)

In [None]:
# high specificity

best_f1_high_spec = (
    grid_3d
    .query("specificity >= 0.9")
    .sort_values("f1", ascending=False)
    .iloc[0]
)

best_f1_high_spec


In [None]:
def symptom_rule_3d(
    vh_count: int,
    high_count: int,
    mod_count: int,
    vh_thresh: int,
    high_thresh: int,
    mod_thresh: int,
    hm_combo_thresh: int,
) -> int:
    """
    Row-wise 3-day symptom rule (used in two-stage screening).
    """

    vh = int(vh_count)
    h  = int(high_count)
    m  = int(mod_count)

    if vh >= vh_thresh:
        return 1
    if h >= high_thresh:
        return 1
    if h >= 1 and m >= mod_thresh:
        return 1
    if (h + m) >= hm_combo_thresh:
        return 1

    return 0


In [None]:
# apply cleaned OR hybrid rule

import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix

def apply_3day_hybrid_rule(
    eval_3_scored: pd.DataFrame,
    prob_threshold: float,
    vh_thresh: int,
    high_thresh: int,
    mod_thresh: int,
    hm_combo_thresh: int,
) -> pd.DataFrame:

    df = eval_3_scored.copy()

    if "p_3" not in df.columns:
        raise KeyError("Missing 'p_3'. Run add_logistic_score_3d first (or create p_3).")

    # ensure numeric counts, NaN -> 0
    for c in ["VeryHigh_count_3d", "High_count_3d", "Moderate_count_3d"]:
        if c not in df.columns:
            df[c] = 0

    df[["VeryHigh_count_3d", "High_count_3d", "Moderate_count_3d"]] = (
        df[["VeryHigh_count_3d", "High_count_3d", "Moderate_count_3d"]]
          .apply(pd.to_numeric, errors="coerce")
          .fillna(0)
    )

    # logistic
    df["logistic_flag_3d"] = (df["p_3"].astype(float) >= float(prob_threshold)).astype(int)

    # symptom rule (row-wise)
    df["symptom_flag_3d"] = df.apply(
        lambda r: symptom_rule_3d(
            vh_count=r["VeryHigh_count_3d"],
            high_count=r["High_count_3d"],
            mod_count=r["Moderate_count_3d"],
            vh_thresh=vh_thresh,
            high_thresh=high_thresh,
            mod_thresh=mod_thresh,
            hm_combo_thresh=hm_combo_thresh,
        ),
        axis=1,
    ).astype(int)

    # OR hybrid
    df["hybrid_flag_3d"] = ((df["symptom_flag_3d"] == 1) | (df["logistic_flag_3d"] == 1)).astype(int)

    def label_3(r):
        if r["symptom_flag_3d"] == 1 and r["logistic_flag_3d"] == 1:
            return "High-Risk Alert (3d, symptom+logistic)"
        elif r["symptom_flag_3d"] == 1:
            return "High-Risk Alert (3d, symptom-rule)"
        elif r["logistic_flag_3d"] == 1:
            return "High-Risk Alert (3d, logistic)"
        else:
            return "No Alert (3d)"

    df["hybrid_alert_3d"] = df.apply(label_3, axis=1)
    return df



def compute_metrics(y_true, y_pred, prevalence=0.10):
    """
    Computes standard metrics + deployment-style (prevalence-adjusted) PPV/NPV.

    prevalence: assumed real-world prevalence π (e.g., 0.10 = 10%)
    """
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)

    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    n = tn + fp + fn + tp

    prev_emp = (tp + fn) / n if n > 0 else np.nan
    sens = tp / (tp + fn) if (tp + fn) > 0 else np.nan
    spec = tn / (tn + fp) if (tn + fp) > 0 else np.nan
    ppv  = tp / (tp + fp) if (tp + fp) > 0 else np.nan
    npv  = tn / (tn + fn) if (tn + fn) > 0 else np.nan
    f1   = (2 * tp) / (2 * tp + fp + fn) if (2 * tp + fp + fn) > 0 else np.nan
    acc  = (tp + tn) / n if n > 0 else np.nan

    # --- deployment-style PPV/NPV (Bayes with fixed prevalence π) ---
    pi = float(prevalence)
    if np.isnan(sens) or np.isnan(spec):
        ppv_adj = np.nan
        npv_adj = np.nan
    else:
        denom_ppv = sens * pi + (1 - spec) * (1 - pi)
        ppv_adj = (sens * pi) / denom_ppv if denom_ppv > 0 else np.nan

        denom_npv = (1 - sens) * pi + spec * (1 - pi)
        npv_adj = (spec * (1 - pi)) / denom_npv if denom_npv > 0 else np.nan

    return {
        "n": n,
        "prevalence_empirical": prev_emp,
        "tn": tn, "fp": fp, "fn": fn, "tp": tp,
        "sensitivity": sens,
        "specificity": spec,
        "precision": ppv,
        "npv": npv,
        "ppv_adj_prev_0.10": ppv_adj,
        "npv_adj_prev_0.10": npv_adj,
        "f1": f1,
        "accuracy": acc,
    }

def scan_hybrid_3d_thresholds(
    eval_3_scored: pd.DataFrame,
    vh_thresh: int,
    high_thresh: int,
    mod_thresh: int,
    hm_combo_thresh: int,
    thresholds=None,
    prevalence: float = 0.10,
) -> pd.DataFrame:

    if thresholds is None:
        thresholds = np.linspace(0, 1, 101)

    # compute symptom flag ONCE (speed)
    base = apply_3day_hybrid_rule(
        eval_3_scored,
        prob_threshold=0.0,
        vh_thresh=vh_thresh,
        high_thresh=high_thresh,
        mod_thresh=mod_thresh,
        hm_combo_thresh=hm_combo_thresh,
    )

    y_true = base["stroke_event"].astype(int).values
    rows = []

    for thr in thresholds:
        logistic_flag = (base["p_3"].astype(float) >= float(thr)).astype(int).values
        hybrid_flag = ((base["symptom_flag_3d"].values == 1) | (logistic_flag == 1)).astype(int)

        met = compute_metrics(y_true, hybrid_flag, prevalence=prevalence)
        met["threshold"] = thr
        rows.append(met)

    return pd.DataFrame(rows)


In [None]:
hyb_scan_3 = scan_hybrid_3d_thresholds(
    eval_3_scored,
    vh_thresh=vh_best,
    high_thresh=h_best,
    mod_thresh=m_best,
    hm_combo_thresh=hm_best,
)

hyb_scan_3.head()


In [None]:
# high specificity

screening_3 = (
    hyb_scan_3
    .query("specificity >= 0.9")
    .sort_values("sensitivity", ascending=False)
)

screening_3.head(5)


In [None]:
# alert burden

hyb_scan_3.assign(
    alert_rate=lambda d: (d["tp"] + d["fp"]) / d["n"]
)[["threshold","sensitivity","specificity","ppv_adj_prev_0.10","alert_rate"]].head(50)
