In [4]:
import numpy as np
import pandas as pd
from numpy.linalg import cholesky
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import HistGradientBoostingRegressor  # xgboost-ish
from sklearn.neural_network import MLPRegressor
from sklearn.linear_model import ElasticNet
import matplotlib.pyplot as plt

Matplotlib is building the font cache; this may take a moment.


In [5]:
# =========================================================
# 0. Global config
# =========================================================

RNG = np.random.default_rng(2025)

N_REPS_NULL = 3          # repetitions under H0 (N1/N2)
N_REPS_ALT = 3           # repetitions under H1 (I1/I2/I4/IMR)
N_PERM = 100              # permutations for p-value
ALPHA = 0.05

GRID_2D = 25             # 25x25 grid for (xj, xk)
GRID_1D = 50             # 50 pts for 1D PD/ALE

# # sample sizes to try (subset of pdf: {200, 1000, 5000})
N_LIST = [200]
# dimension setting p ∈ {0.2n, n} just to keep runtime ok
P_MODE = ["0.2n"]

# correlation levels
RHO_LIST = [0.3]

In [None]:
# =========================================================
# Custom Transformer: Fourier Features
# =========================================================
class FourierFeats(BaseEstimator, TransformerMixin):
    """
    Add sine/cosine basis features to help linear models capture
    nonlinear relationships like sin(pi * x) and cos(pi * x).
    Each input x_j becomes [x_j, 2*sin(pi*x_j), 2*cos(pi*x_j)].
    """
    def __init__(self, use_cos=True, scale=2.0):
        self.use_cos = use_cos
        self.scale = scale

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        s = self.scale * np.sin(np.pi * X)
        if self.use_cos:
            c = self.scale * np.cos(np.pi * X)
            return np.hstack([X, s, c])
        return np.hstack([X, s])

In [None]:
# =========================================================
# Custom Transformer: Fourier Features
# =========================================================
class FourierFeats(BaseEstimator, TransformerMixin):
    """
    Add sine/cosine basis features to help linear models capture
    nonlinear relationships like sin(pi * x) and cos(pi * x).
    Each input x_j becomes [x_j, 2*sin(pi*x_j), 2*cos(pi*x_j)].
    """
    def __init__(self, use_cos=True, scale=2.0):
        self.use_cos = use_cos
        self.scale = scale

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        s = self.scale * np.sin(np.pi * X)
        if self.use_cos:
            c = self.scale * np.cos(np.pi * X)
            return np.hstack([X, s, c])
        return np.hstack([X, s])

In [None]:
# =========================================================
# model factory
# =========================================================

def make_models(random_state=0):
    return {
        "Random Forest": RandomForestRegressor(
            n_estimators=800,
            max_depth=None,
            min_samples_leaf=1,
            random_state=random_state,
            n_jobs=-1
        ),
        "HistGB": HistGradientBoostingRegressor(
            max_depth=10,
            learning_rate=0.05,
            random_state=random_state
        ),
        "DNN": Pipeline([
            ("scaler", StandardScaler()),
            ("mlp", MLPRegressor(
                hidden_layer_sizes=(512, 256, 128),
                activation="tanh",
                solver="adam",
                learning_rate_init=0.002,
                batch_size=64, 
                max_iter=8000,
                early_stopping=True,
                validation_fraction=0.2,
                n_iter_no_change=50,
                tol=1e-6, 
                random_state=random_state))
        ]),
        "Elastic Net": Pipeline([
            ("scaler", StandardScaler()),
            ("fourier", FourierFeats(use_cos=True, scale=2.0)),
            ("enet", ElasticNet(
            alpha=1e-4,
            l1_ratio=0.3,
            max_iter=50000,
            random_state=random_state
            ))
        ]),
    }

In [None]:
# =========================================================
# Oracle R^2* and R-panel metrics
# =========================================================
def oracle_r2_star(f: np.ndarray, y: np.ndarray, eps: np.ndarray | None = None) -> float:
    """
    R2* = 1 - Var(eps)/Var(y); eps defaults to y - f.
    """
    if eps is None:
        eps = y - f
    vy = np.var(y)
    return np.nan if vy == 0 else 1.0 - (np.var(eps) / vy)


def _corr2(a: np.ndarray, b: np.ndarray) -> float:
    """
    Squared Pearson correlation with NaN-guard.
    """
    sa, sb = np.std(a), np.std(b)
    if sa == 0 or sb == 0:
        return np.nan
    return float(np.corrcoef(a, b)[0, 1] ** 2)


def r_panel_metrics(
    y_tr: np.ndarray,
    yhat_tr: np.ndarray,
    y_te: np.ndarray,
    yhat_te: np.ndarray,
    f_te: np.ndarray,
) -> dict:
    """
    Collect R^2 (train/test) and a few helpful correlation-based diagnostics.
    """
    return {
        "R2_train": r2_score(y_tr, yhat_tr),
        "R2_test": r2_score(y_te, yhat_te),
        "corr2_yhat_y": _corr2(yhat_te, y_te),
        "corr2_yhat_f": _corr2(yhat_te, f_te),
        "corr2_f_y": _corr2(f_te, y_te),
    }

In [None]:
# =========================================================
# 1. Utilities
# =========================================================

def make_block_corr_matrix(p, rho=0.3, block_size=10):
    """
    Build a block correlation matrix Sigma (p x p) with rho inside blocks,
    identity across blocks. PDF: "block correlation (block size 10–20), ρ in {0.1,...}" :contentReference[oaicite:8]{index=8}
    """
    Sigma = np.eye(p)
    for start in range(0, p, block_size):
        end = min(start + block_size, p)
        for i in range(start, end):
            for j in range(start, end):
                if i != j:
                    Sigma[i, j] = rho
    return Sigma

def sample_correlated_normal(n, p, rho=0.3):
    Sigma = make_block_corr_matrix(p, rho=rho)
    L = cholesky(Sigma)
    Z = RNG.standard_normal(size=(n, p))
    X = Z @ L.T
    return X

In [None]:
# =========================================================
# 2. DGPs
# =========================================================

def dgp_N1_additive(X, causal_ratio=0.10, max_causal=6):
    """
    Smooth additive, no interaction.
    Only a small causal subset has signal.
    This makes N1 learnable at moderate n.
    """
    n, p = X.shape
    c = min(max(2, int(np.ceil(causal_ratio * p))), max_causal)
    idx = np.arange(c)  # or RNG.choice(p, size=c, replace=False) if you want random
    a = RNG.uniform(1.0, 3.0, size=c)
    phi = 2 * np.sin(np.pi * X[:, idx])
    f = (a * phi).sum(axis=1)
    return f

def dgp_N2_mixed(X):
    # N2: x1 binary, x2 3-level categorical + smooth terms
    n, p = X.shape
    x1 = (X[:, 0] > 0).astype(float)
    # 3-level categorical from X[:,1]
    x2_raw = X[:, 1]
    x2 = np.digitize(x2_raw, np.quantile(x2_raw, [1/3, 2/3]))  # 0,1,2
    alpha1 = 1.0
    alpha2B = 0.5
    alpha2C = 0.8
    f = alpha1 * x1 \
        + alpha2B * (x2 == 1).astype(float) \
        + alpha2C * (x2 == 2).astype(float)
    if p > 2:
        a = RNG.uniform(0.3, 1.0, size=p-2)
        phi = 2 * np.sin(np.pi * X[:, 2:])
        f += (a * phi).sum(axis=1)
    return f

def dgp_I1_linear_times_linear(X, beta=1.0, alpha=1.0):
    # I1: f = α(x1 + x2) + β x1 x2
    x1 = X[:, 0]
    x2 = X[:, 1]
    f = alpha * (x1 + x2) + beta * (x1 * x2)
    return f

def dgp_I2_smooth_interaction(X, beta=1.0, alpha=1.0):
    # I2: f = α{sin(pi x1) + cos(pi x2)} + β sin(x1 x2)
    x1 = X[:, 0]
    x2 = X[:, 1]
    f = alpha * (np.sin(np.pi * x1) + np.cos(np.pi * x2)) + beta * np.sin(x1 * x2)
    return f

def dgp_I4_local_bump(X, beta=1.0, alpha=1.0):
    # I4: f = α(x1 + x2) + β exp{-3(x1^2 + x2^2)}
    x1 = X[:, 0]
    x2 = X[:, 1]
    f = alpha * (x1 + x2) + beta * np.exp(-3 * (x1**2 + x2**2))
    return f

def dgp_IMR_style(X, beta_ratio=0.5):
    """
    fm + fint, and scale to target IMR = R2_int / R2_m :contentReference[oaicite:5]{index=5}
    """
    x1 = X[:, 0]
    x2 = X[:, 1]
    f_main = np.sin(np.pi * x1) + np.cos(np.pi * x2)
    f_int = np.sin(x1 * x2)
    var_m = np.var(f_main)
    var_i = np.var(f_int)
    if var_m == 0 or var_i == 0 or beta_ratio == 0:
        return f_main
    scale = np.sqrt(beta_ratio * var_m / var_i)
    return f_main + scale * f_int

In [None]:
# =========================================================
# 3. Noise
# =========================================================

def add_noise_to_target(f, target_R2=0.8):
    var_f = np.var(f)
    if var_f <= 0:
        eps = RNG.normal(0, 1, len(f))
        return f + eps
    noise_var = var_f * (1 - target_R2) / max(target_R2, 1e-12)
    eps = RNG.normal(0, np.sqrt(noise_var), len(f))
    return f + eps

In [None]:
# =========================================================
# 4. PD: 1D & 2D
# =========================================================

def partial_dependence_2d(model, X_bg, j, k, grid_2d=25):
    """
    2D PD: f_hat_jk(xj, xk) = E_{X- (j,k)}[f(xj, xk, X_- (j,k))]
    We'll do brute-force: for each grid pair, replace j,k in background and average.
    """
    xj_vals = np.linspace(np.percentile(X_bg[:, j], 1),
                          np.percentile(X_bg[:, j], 99),
                          grid_2d)
    xk_vals = np.linspace(np.percentile(X_bg[:, k], 1),
                          np.percentile(X_bg[:, k], 99),
                          grid_2d)
    pd_surface = np.zeros((grid_2d, grid_2d))
    for a, xj in enumerate(xj_vals):
        for b, xk in enumerate(xk_vals):
            X_tmp = X_bg.copy()
            X_tmp[:, j] = xj
            X_tmp[:, k] = xk
            y_pred = model.predict(X_tmp)
            pd_surface[a, b] = y_pred.mean()
    return xj_vals, xk_vals, pd_surface

def partial_dependence_1d(model, X_bg, j, grid_1d=50):
    xj_vals = np.linspace(np.percentile(X_bg[:, j], 1),
                          np.percentile(X_bg[:, j], 99),
                          grid_1d)
    pd_1d = np.zeros(grid_1d)
    for a, xj in enumerate(xj_vals):
        X_tmp = X_bg.copy()
        X_tmp[:, j] = xj
        y_pred = model.predict(X_tmp)
        pd_1d[a] = y_pred.mean()
    return xj_vals, pd_1d

In [None]:
# =========================================================
# 5. ALE: 1D & 2D
# =========================================================

def ale_1d(model, X_bg, j, grid_1d=50):
    xj_sorted = np.sort(X_bg[:, j])
    z = np.quantile(xj_sorted, np.linspace(0, 1, grid_1d))
    effects = np.zeros(grid_1d - 1)
    for l in range(grid_1d - 1):
        lower, upper = z[l], z[l+1]
        if l < grid_1d - 2:
            mask = (X_bg[:, j] >= lower) & (X_bg[:, j] < upper)
        else:
            mask = (X_bg[:, j] >= lower) & (X_bg[:, j] <= upper)
        idx = np.where(mask)[0]
        if idx.size == 0:
            effects[l] = 0.0
            continue
        X_low = X_bg[idx, :].copy()
        X_up  = X_bg[idx, :].copy()
        X_low[:, j] = lower
        X_up[:, j]  = upper
        pred_low = model.predict(X_low)
        pred_up  = model.predict(X_up)
        effects[l] = (pred_up - pred_low).mean()
    ale = np.zeros(grid_1d)
    ale[1:] = np.cumsum(effects)
    ale = ale - ale.mean()
    return z, ale

def ale_2d(model, X_bg, j, k, grid_2d=25):
    zj = np.quantile(X_bg[:, j], np.linspace(0, 1, grid_2d))
    zk = np.quantile(X_bg[:, k], np.linspace(0, 1, grid_2d))
    ale_surf = np.zeros((grid_2d, grid_2d))
    for b in range(grid_2d - 1):
        k_low, k_up = zk[b], zk[b+1]
        if b < grid_2d - 2:
            mask_k = (X_bg[:, k] >= k_low) & (X_bg[:, k] < k_up)
        else:
            mask_k = (X_bg[:, k] >= k_low) & (X_bg[:, k] <= k_up)
        idx_k = np.where(mask_k)[0]
        if idx_k.size == 0:
            continue
        X_k = X_bg[idx_k, :]
        local_j = np.zeros(grid_2d - 1)
        for a in range(grid_2d - 1):
            j_low, j_up = zj[a], zj[a+1]
            if a < grid_2d - 2:
                mask_j = (X_k[:, j] >= j_low) & (X_k[:, j] < j_up)
            else:
                mask_j = (X_k[:, j] >= j_low) & (X_k[:, j] <= j_up)
            idx_j = np.where(mask_j)[0]
            if idx_j.size == 0:
                local_j[a] = 0.0
                continue
            X_low = X_k[idx_j, :].copy()
            X_up  = X_k[idx_j, :].copy()
            X_low[:, j] = j_low
            X_up[:, j]  = j_up
            pred_low = model.predict(X_low)
            pred_up  = model.predict(X_up)
            local_j[a] = (pred_up - pred_low).mean()
        ale_line = np.zeros(grid_2d)
        ale_line[1:] = np.cumsum(local_j)
        ale_line = ale_line - ale_line.mean()
        ale_surf[:, b] = ale_line
    ale_surf = ale_surf - ale_surf.mean()
    return zj, zk, ale_surf

In [None]:
# =========================================================
# 6. H-statistic
# =========================================================

def h_statistic_from_pd(pd_2d, pd_j, pd_k, fbar):
    """
    Implements formula:
    H^2_{jk} = sum[(f_jk - f_j - f_k + fbar)^2] / sum[(f_jk - fbar)^2]  in [0,1]
    """
    # need to broadcast 1D PDs to 2D
    # pd_2d: (A, B)
    A, B = pd_2d.shape
    pd_j_2d = np.repeat(pd_j.reshape(A, 1), B, axis=1)
    pd_k_2d = np.repeat(pd_k.reshape(1, B), A, axis=0)
    numer = (pd_2d - pd_j_2d - pd_k_2d + fbar)**2
    denom = (pd_2d - fbar)**2
    if denom.sum() == 0:
        return 0.0
    return float(numer.sum() / denom.sum())

def compute_H_for_pair(model, X_bg, j, k, grid_2d=25, grid_1d=50, method="pd"):
    # This function was incorrectly returning None because the main logic was inside an 'if' block that was skipped.
    # The grid_1d parameter is also effectively ignored for consistency with grid_2d in H-statistic calculation.
    # use model prediction average as fbar
    fbar = model.predict(X_bg).mean()
    if method == "pd":
        xj_vals, xk_vals, pd_2d = partial_dependence_2d(model, X_bg, j, k, grid_2d)
        _, pd_j = partial_dependence_1d(model, X_bg, j, grid_2d)  # use same length
        _, pd_k = partial_dependence_1d(model, X_bg, k, grid_2d)
        return h_statistic_from_pd(pd_2d, pd_j, pd_k, fbar)
    elif method == "ale":
        xj_vals, xk_vals, ale_2d_surf = ale_2d(model, X_bg, j, k, grid_2d)
        _, ale_j = ale_1d(model, X_bg, j, grid_2d)
        _, ale_k = ale_1d(model, X_bg, k, grid_2d)
        return h_statistic_from_pd(ale_2d_surf, ale_j, ale_k, fbar)
    else:
        raise ValueError("method must be 'pd' or 'ale'")

In [None]:
# =========================================================
# 7. Permutation-based p-value
# =========================================================

def permute_feature_within_quantile_bins(X_bg, k, n_bins=5, rng=None):
    if rng is None:
        rng = RNG
    X_new = X_bg.copy()
    xk = X_bg[:, k]
    qs = np.quantile(xk, np.linspace(0, 1, n_bins + 1))
    for b in range(n_bins):
        if b == n_bins - 1:
            mask = (xk >= qs[b]) & (xk <= qs[b+1])
        else:
            mask = (xk >= qs[b]) & (xk < qs[b+1])
        idx = np.where(mask)[0]
        if idx.size > 1:
            shuffled = idx.copy()
            rng.shuffle(shuffled)
            X_new[idx, k] = X_bg[shuffled, k]
    return X_new

def permutation_pvalue_for_pair(model, X_bg, j, k, observed_H,
                                B=100, n_bins=5, method="pd"):
    """
    Permute x_k within quantile bins of f_k(x_k) (approximation),
    recompute H each time, p = (1 + # perm >= obs) / (1 + B)
    """
    # simple version: permute feature k completely; you can improve to "within bins"
    H_perm = []
    for _ in range(B):
        X_perm = permute_feature_within_quantile_bins(X_bg, k, n_bins=n_bins, rng=RNG)
        H_b = compute_H_for_pair(model, X_perm, j, k, GRID_2D, GRID_1D, method=method)
        H_perm.append(H_b)
    H_perm = np.array(H_perm)
    pval = (1 + np.sum(H_perm >= observed_H)) / (1 + B)
    return pval, H_perm

In [None]:
# =========================================================
# 8. One experiment: fit models, output PD-H & ALE-H
# =========================================================

def run_one_experiment(X, y, pair=(0, 1), random_state=0):
    """
    Train all models on (X, y), compute H and permutation p-value for the target pair.
    Returns dict of: model -> (H, pval, r2)
    """
    X_train, X_tmp, y_train, y_tmp = train_test_split(
        X, y, test_size=0.4, random_state=random_state
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_tmp, y_tmp, test_size=0.5, random_state=random_state
    )

    models = make_models(random_state)
    results = {}

    # background sample for PD (m = min(2000, n)) per PDF
    m = min(2000, X_train.shape[0])
    bg_idx = RNG.choice(X_train.shape[0], size=m, replace=False)
    X_bg = X_train[bg_idx, :]

    for name, model in models.items():
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        r2 = r2_score(y_test, y_pred)

        # PD-H
        H_pd = compute_H_for_pair(model, X_bg, pair[0], pair[1],
                                  GRID_2D, GRID_1D, method="pd")
        pval_pd, _ = permutation_pvalue_for_pair(
            model, X_bg, pair[0], pair[1], H_pd,
            B=N_PERM, n_bins=5, method="pd"
        )

        # ALE-H
        H_ale = compute_H_for_pair(model, X_bg, pair[0], pair[1],
                                   GRID_2D, GRID_1D, method="ale")
        pval_ale, _ = permutation_pvalue_for_pair(
            model, X_bg, pair[0], pair[1], H_ale,
            B=N_PERM, n_bins=5, method="ale"
        )

        results[name] = {
            "H_pd": H_pd,
            "pval_pd": pval_pd,
            "H_ale": H_ale,
            "pval_ale": pval_ale,
            "r2": r2
        }

    return results

In [None]:
# =========================================================
# 9. Simulation: Type I (N1, N2)
# =========================================================

def simulate_type1():
    """
    Under H0 (no interaction), p-values should be ~Uniform(0,1),
    so Type-I error ≈ 0.05 at alpha=0.05.
    """
    records = []
    for n in N_LIST:
        for p_mode in P_MODE:
            p = max(5, int(0.2 * n)) if p_mode == "0.2n" else n
            for rho in RHO_LIST:
                for dgp_name in ["N1", "N2"]:
                    for rep in range(N_REPS_NULL):
                        X = sample_correlated_normal(n, p, rho=rho)
                        f = (
                            dgp_N1_additive(X)
                            if dgp_name == "N1"
                            else dgp_N2_mixed(X)
                        )
                        y = add_noise_to_target(f, target_R2=0.8)

                        res = run_one_experiment(X, y, pair=(0, 1), random_state=rep)
                        for model_name, d in res.items():
                            records.append({
                                "n": n,
                                "p": p,
                                "rho": rho,
                                "dgp": dgp_name,
                                "model": model_name,
                                "H_pd": d["H_pd"],
                                "pval_pd": d["pval_pd"],
                                "H_ale": d["H_ale"],
                                "pval_ale": d["pval_ale"],
                                "reject_pd": int(d["pval_pd"] < ALPHA),
                                "reject_ale": int(d["pval_ale"] < ALPHA),
                            })
    return pd.DataFrame(records)

In [None]:
# =========================================================
# 10. Simulation: Power (I1/I2/I4/IMR)
# =========================================================

def simulate_power():
    """
    Under H1 (interaction present), we want power = P(reject) to be high.
    We'll vary beta to create different interaction strengths.
    """
    records = []
    for n in N_LIST:
        for p_mode in P_MODE:
            p = max(5, int(0.2 * n)) if p_mode == "0.2n" else n
            for rho in RHO_LIST:
                for dgp_name in ["I1", "I2", "I4", "IMR"]:
                    for strength in [0.25, 0.5, 1.0]:
                        for rep in range(N_REPS_ALT):
                            X = sample_correlated_normal(n, p, rho=rho)
                            if dgp_name == "I1":
                                f = dgp_I1_linear_times_linear(X, beta=strength, alpha=1.0)
                            elif dgp_name == "I2":
                                f = dgp_I2_smooth_interaction(X, beta=strength, alpha=1.0)
                            elif dgp_name == "I4":
                                f = dgp_I4_local_bump(X, beta=strength, alpha=1.0)
                            else:  # IMR
                                f = dgp_IMR_style(X, beta_ratio=strength)

                            y = add_noise_to_target(f, target_R2=0.8)
                            res = run_one_experiment(X, y, pair=(0, 1), random_state=rep)

                            for model_name, d in res.items():
                                records.append({
                                    "n": n,
                                    "p": p,
                                    "rho": rho,
                                    "dgp": dgp_name,
                                    "strength": strength,
                                    "model": model_name,
                                    "H_pd": d["H_pd"],
                                    "pval_pd": d["pval_pd"],
                                    "H_ale": d["H_ale"],
                                    "pval_ale": d["pval_ale"],
                                    "reject_pd": int(d["pval_pd"] < ALPHA),
                                    "reject_ale": int(d["pval_ale"] < ALPHA),
                                })
    return pd.DataFrame(records)

In [None]:
# =========================================================
# 11. Summaries (PD & ALE)
# =========================================================

def make_type1_table(df_type1):
    """Aggregate rejection rates by DGP × model for PD and ALE."""
    tbl_pd = (
        df_type1.groupby(["dgp", "model"])["reject_pd"]
        .mean().reset_index().rename(columns={"reject_pd": "type1_pd"})
    )
    tbl_ale = (
        df_type1.groupby(["dgp", "model"])["reject_ale"]
        .mean().reset_index().rename(columns={"reject_ale": "type1_ale"})
    )
    return pd.merge(tbl_pd, tbl_ale, on=["dgp", "model"])


def make_power_table(df_power):
    """Aggregate power by DGP × strength × model for PD and ALE."""
    tbl_pd = (
        df_power.groupby(["dgp", "strength", "model"])["reject_pd"]
        .mean().reset_index().rename(columns={"reject_pd": "power_pd"})
    )
    tbl_ale = (
        df_power.groupby(["dgp", "strength", "model"])["reject_ale"]
        .mean().reset_index().rename(columns={"reject_ale": "power_ale"})
    )
    return pd.merge(tbl_pd, tbl_ale, on=["dgp", "strength", "model"])

In [None]:
# =========================================================
# Main: build the R^2 table
# =========================================================

def build_r2_table(
    n_list=(200,),
    p_mode="0.2n",
    rho_list=(0.1, 0.3),
    sims_per_setting=50,
    target_R2=0.8,
    dgp_grid=(("N1", 0.0), ("I1", 0.5), ("I1", 1.0)),  # (DGP, beta)
    seed=123,
) -> pd.DataFrame:
    """
    For each (n, p, rho, DGP, beta) and each simulation:
      1) Generate X, f, y (one y per simulation).
      2) Train/test split once per simulation.
      3) Fit multiple models on the same split and record:
         R2_train, R2_test, R2_star, and room = R2_star - R2_test.
    Return a long-form dataframe (one row per model per simulation).
    """
    rng = np.random.default_rng(seed)
    rows = []

    for n in n_list:
        p = 10
        # p = max(5, int(0.2 * n)) if p_mode == "0.2n" else n

        for rho in rho_list:
            for (dgp, beta) in dgp_grid:
                for sim in range(1, sims_per_setting + 1):
                    # one y per simulation
                    X = sample_correlated_normal(n, p, rho=rho)

                    if dgp == "N1":
                        f = dgp_N1_additive(X)
                    else:  # I1
                        f = dgp_I1_linear_times_linear(X, beta=beta, alpha=1.0)

                    y = add_noise_to_target(f, target_R2=0.8)

                    # single split per simulation
                    X_tr, X_te, y_tr, y_te, f_tr, f_te = train_test_split(
                        X, y, f, test_size=0.30, random_state=sim
                    )

                    R2_star = oracle_r2_star(f, y)  # should be close to target_R2

                    models = make_models(random_state=sim)
                    for name, mdl in models.items():
                        mdl.fit(X_tr, y_tr)
                        yhat_tr = mdl.predict(X_tr)
                        yhat_te = mdl.predict(X_te)

                        panel = r_panel_metrics(y_tr, yhat_tr, y_te, yhat_te, f_te)

                        rows.append(
                            {
                                "sim": sim,
                                "n": n,
                                "p": p,
                                "rho": rho,
                                "DGP": dgp,
                                "beta": beta,
                                "model": name,
                                "R2_star": R2_star,
                                **panel,
                                "room": (
                                    np.nan
                                    if np.isnan(R2_star)
                                    else max(0.0, R2_star - panel["R2_test"])
                                ),
                            }
                        )

    return pd.DataFrame(rows)
# -----------------------------
# Example run
# -----------------------------
if __name__ == "__main__":
    df_r2 = build_r2_table(
        n_list=(500,),
        rho_list=(0.1,),
        sims_per_setting=10,
        target_R2=0.8, 
        dgp_grid=(("N1", 0.0), ("I1", 0.5), ("I1", 1.0)),
        seed=2025,
    )

    # --- show first few raw results
    print("\n=== Raw R² table (first 10 rows) ===")
    print(df_r2.head(10).to_string(index=False))

    # --- show median & IQR summary
    summary = (
        df_r2.groupby(["DGP", "beta", "n", "rho", "model"])
        .agg(
            R2_test_median=("R2_test", "median"),
            R2_test_IQR_low=("R2_test", lambda x: np.nanpercentile(x, 25)),
            R2_test_IQR_high=("R2_test", lambda x: np.nanpercentile(x, 75)),
            R2_star_median=("R2_star", "median"),
            room_median=("room", "median"),
        )
        .reset_index()
    )

    print("\n=== Summary: median and IQR of R² across simulations ===")
    # limit output width for readability
    pd.set_option("display.width", 120)
    pd.set_option("display.max_columns", None)
    print(summary.to_string(index=False))
# ============================
# QUICK (Type I & Power)
# ============================

def main_quick():
    print("=== QUICK TEST: H0 (N1 only) ===")
    global N_PERM
    N_PERM = 5  # very few perms for debugging speed

    pair = (0, 1)
    n = 200
    p = max(5, int(0.2 * n))
    rho = 0.3

    # --- H0 (N1) ---
    X = sample_correlated_normal(n, p, rho=rho)
    f = dgp_N1_additive(X)
    y = add_noise_to_target(f, target_R2=0.8)
    res_null = run_one_experiment(X, y, pair=pair, random_state=0)

    rows = []
    for model_name, d in res_null.items():
        rows.append({
            "setting": "H0-N1",
            "model": model_name,
            "H_pd": d["H_pd"],
            "pval_pd": d["pval_pd"],
            "H_ale": d["H_ale"],
            "pval_ale": d["pval_ale"],
        })

    print("\n=== QUICK TEST: H1 (I1, beta=1.0) ===")
    # --- H1 (I1) ---
    X = sample_correlated_normal(n, p, rho=rho)
    f = dgp_I1_linear_times_linear(X, beta=1.0, alpha=1.0)
    y = add_noise_to_target(f, target_R2=0.8)
    res_alt = run_one_experiment(X, y, pair=pair, random_state=1)

    for model_name, d in res_alt.items():
        rows.append({
            "setting": "H1-I1-b1.0",
            "model": model_name,
            "H_pd": d["H_pd"],
            "pval_pd": d["pval_pd"],
            "H_ale": d["H_ale"],
            "pval_ale": d["pval_ale"],
        })

    df = pd.DataFrame(rows)
    print("\n--- QUICK RESULTS ---")
    print(df.to_string(index=False))


if __name__ == "__main__":
    main_quick()


=== Raw R² table (first 10 rows) ===
 sim   n  p  rho DGP  beta         model  R2_star  R2_train  R2_test  corr2_yhat_y  corr2_yhat_f  corr2_f_y     room
   1 500 10  0.1  N1   0.0 Random Forest 0.798643  0.953164 0.608986      0.628878      0.838780   0.786912 0.189657
   1 500 10  0.1  N1   0.0        HistGB 0.798643  0.900477 0.595067      0.613838      0.776611   0.786912 0.203576
   1 500 10  0.1  N1   0.0           DNN 0.798643  0.015606 0.014987      0.019495      0.023414   0.786912 0.783657
   1 500 10  0.1  N1   0.0   Elastic Net 0.798643  0.809765 0.755114      0.771832      0.980865   0.786912 0.043529
   2 500 10  0.1  N1   0.0 Random Forest 0.803708  0.942733 0.581013      0.599288      0.787128   0.800167 0.222696
   2 500 10  0.1  N1   0.0        HistGB 0.803708  0.897425 0.608444      0.614773      0.785939   0.800167 0.195264
   2 500 10  0.1  N1   0.0           DNN 0.803708  0.545615 0.078078      0.102004      0.131892   0.800167 0.725630
   2 500 10  0.1  N1   0.0