In [3]:
# Replication script for "LATE: Only Compliers Move" tests
# Jupyter-friendly, no CLI. Run cells top-to-bottom.
#
# This script reproduces the simulation tables in the paper:
#  - Test A: Uniform CDF identity (Cramér–von Mises T2)
#  - Test B: GMM-style moment test over indicator basis (identity weighting by default)
#
# It also saves CSVs
#  - late_lumpyITT_tests_simulations.csv
#  - late_lumpyITT_tests_simulations_additional.csv
#
# Notes:
#  - This block implements the RCT (no-covariates) design used for the main tables.
#  - CDF validity is enforced using isotonic regression and clipping to [0,1].
#  - Critical values are computed from null Monte Carlo and then used to assess size/power.
#  - For a robust covariance GMM J-test, see the optional bootstrap W estimator stub below.

#%pip install --upgrade --force-reinstall numpy pandas

import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Tuple, Dict, List, Optional
from sklearn.isotonic import IsotonicRegression

# -----------------------
# Reproducibility
# -----------------------
GLOBAL_SEED = 20250910
rng_global = np.random.default_rng(GLOBAL_SEED)

# -----------------------
# DGP specification (RCT)
# -----------------------
@dataclass
class DGPSpec:
    n: int
    pC: float
    pA: float
    tau_mean: float = 0.5
    tau_sd: float = 0.4
    alt: str = "null"    # "null", "excl_small", "excl_big", "defiers_small", "defiers_big", "excl_all_small"
    gamma: float = 0.0   # magnitude for exclusion violation (direct effect)
    pD: float = 0.0      # defier share if needed

def simulate_dataset(spec: DGPSpec, rng: Optional[np.random.Generator]=None) -> Dict[str, np.ndarray]:
    """
    Principal-strata simulation with RCT assignment.
    Types: C (compliers), A (always), N (never), D (defiers, only in alts).
    Exclusion violation (excl_*): add gamma*Z to Y for noncompliers (or all types in excl_all_*).
    """
    if rng is None:
        rng = rng_global
    
    n = spec.n
    pC, pA = spec.pC, spec.pA
    pN = max(1.0 - pC - pA - spec.pD, 0.0)
    pD = spec.pD

    # Draw types
    probs = [pC, pA, pN, pD] if pD > 0 else [pC, pA, pN, 0.0]
    types = rng.choice(['C','A','N','D'], size=n, p=probs)

    # Assignment
    Z = rng.binomial(1, 0.5, size=n)

    # Potential treatment by type (monotonic unless defiers present)
    D0 = np.zeros(n)
    D1 = np.zeros(n)
    D0[types=='C'] = 0; D1[types=='C'] = 1
    D0[types=='A'] = 1; D1[types=='A'] = 1
    D0[types=='N'] = 0; D1[types=='N'] = 0
    D0[types=='D'] = 1; D1[types=='D'] = 0  # defiers violate monotonicity

    # Heterogeneous complier effects
    tau = rng.normal(spec.tau_mean, spec.tau_sd, size=n)
    # Baseline potential outcome
    Y0 = rng.normal(0.0, 1.0, size=n)
    Y1 = Y0 + tau

    # Observed treatment and outcome via exclusion (base)
    D = np.where(Z==1, D1, D0)
    Y = np.where(D==1, Y1, Y0)

    # Exclusion violation variants
    if spec.alt.startswith("excl"):
        if spec.alt == "excl_all_small":
            Y = Y + spec.gamma * Z
        else:
            # direct effect for noncompliers
            nonC_mask = (types!='C')
            Y = Y + spec.gamma * Z * nonC_mask.astype(float)

    return {"Z": Z.astype(int), "D": D.astype(int), "Y": Y.astype(float)}

# -----------------------
# CDF utilities
# -----------------------
def isotonic_cdf_enforce(grid: np.ndarray, values: np.ndarray) -> np.ndarray:
    """
    Enforce monotonicity and [0,1] range for a function measured on a sorted grid
    using isotonic regression, then clip to [0,1].
    """
    ir = IsotonicRegression(increasing=True, out_of_bounds="clip")
    fitted = ir.fit_transform(grid, values)
    return np.clip(fitted, 0.0, 1.0)

def compute_weighted_cdfs_rct(Z: np.ndarray, D: np.ndarray, Y: np.ndarray,
                              grid: Optional[np.ndarray]=None) -> Dict[str, np.ndarray]:
    """
    Compute empirical CDFs FY1, FY0 and Abadie-weighted complier CDFs F1C, F0C
    in an RCT without covariates. Enforce CDF validity via isotonic regression.
    """
    n = len(Y)
    # Grid
    if grid is None:
        qs = np.linspace(0.01, 0.99, 99)
        grid = np.quantile(Y, qs)
    grid = np.asarray(grid, dtype=float)
    # First-stage
    e = 0.5
    p1 = D[Z==1].mean() if (Z==1).any() else 0.0
    p0 = D[Z==0].mean() if (Z==0).any() else 0.0
    pC_hat = p1 - p0
    if pC_hat <= 1e-10:
        pC_hat = 1e-10

    # Empirical CDFs
    FY1 = np.array([ (Y[Z==1] <= t).mean() if (Z==1).any() else 0.0 for t in grid ])
    FY0 = np.array([ (Y[Z==0] <= t).mean() if (Z==0).any() else 0.0 for t in grid ])
    FY1 = isotonic_cdf_enforce(grid, FY1)
    FY0 = isotonic_cdf_enforce(grid, FY0)

    # Weights for complier marginals
    w1 = (Z/e) * (D - p0)            # targets Y(1)|C
    w0 = ((1-Z)/(1-e)) * (p1 - D)    # targets Y(0)|C

    # Weighted CDFs for compliers
    num1 = np.array([ ((Y<=t).astype(float) * w1).sum() for t in grid ])
    num0 = np.array([ ((Y<=t).astype(float) * w0).sum() for t in grid ])
    F1C = num1 / (pC_hat * n)
    F0C = num0 / (pC_hat * n)

    # Enforce CDF validity
    F1C = isotonic_cdf_enforce(grid, F1C)
    F0C = isotonic_cdf_enforce(grid, F0C)

    # Identity difference
    diff = (FY1 - FY0) - pC_hat*(F1C - F0C)

    return {
        "grid": grid, "FY1": FY1, "FY0": FY0,
        "F1C": F1C, "F0C": F0C, "diff": diff, "pC_hat": pC_hat
    }

# -----------------------
# Test statistics
# -----------------------
def stat_T2(diff: np.ndarray, grid: Optional[np.ndarray]=None) -> float:
    """
    Cramér–von Mises style integrated squared error over the grid.
    Uses an unweighted mean across grid points.
    """
    return float(np.mean(diff**2))

def stat_GMM_identity(Y: np.ndarray, Z: np.ndarray, stats: Dict[str,np.ndarray], J: int=10) -> float:
    """
    Moment vector m at J indicator cutpoints with identity weighting:
    m_j = [FY1(t_j) - FY0(t_j)] - pC_hat * [F1C(t_j) - F0C(t_j)]
    Returns ||m||^2. This matches the simple GMM used in the main replication.
    """
    qs = np.linspace(0.1, 0.9, J)
    tjs = np.quantile(Y, qs)
    grid = stats["grid"]
    # nearest indices on grid
    idx = np.searchsorted(grid, tjs, side="left")
    idx = np.clip(idx, 0, len(grid)-1)
    # empirical CDFs at t_j
    FY1_t = np.array([ (Y[Z==1] <= t).mean() for t in tjs ])
    FY0_t = np.array([ (Y[Z==0] <= t).mean() for t in tjs ])
    m = (FY1_t - FY0_t) - stats["pC_hat"] * (stats["F1C"][idx] - stats["F0C"][idx])
    return float(np.dot(m, m))

# -----------------------
# Simulation runner
# -----------------------
def run_null_draws(spec: DGPSpec, R: int=300, n_grid: int=101, rng: Optional[np.random.Generator]=None):
    T2_vals, TJ_vals = [], []
    if rng is None:
        rng = rng_global
    for r in range(R):
        dat = simulate_dataset(spec, rng)
        Z, D, Y = dat["Z"], dat["D"], dat["Y"]
        grid = np.quantile(Y, np.linspace(0.01,0.99,n_grid-2))
        stats = compute_weighted_cdfs_rct(Z,D,Y,grid=grid)
        T2_vals.append(stat_T2(stats["diff"], grid))
        TJ_vals.append(stat_GMM_identity(Y,Z,stats,J=10))
    return np.array(T2_vals), np.array(TJ_vals)

def run_eval_draws(spec: DGPSpec, crit_T2: float, crit_TJ: float,
                   R: int=250, n_grid: int=101, rng: Optional[np.random.Generator]=None):
    T2_vals, TJ_vals = [], []
    rejs_T2, rejs_TJ = 0, 0
    if rng is None:
        rng = rng_global
    for r in range(R):
        dat = simulate_dataset(spec, rng)
        Z, D, Y = dat["Z"], dat["D"], dat["Y"]
        grid = np.quantile(Y, np.linspace(0.01,0.99,n_grid-2))
        stats = compute_weighted_cdfs_rct(Z,D,Y,grid=grid)
        T2 = stat_T2(stats["diff"], grid)
        TJ = stat_GMM_identity(Y,Z,stats,J=10)
        T2_vals.append(T2); TJ_vals.append(TJ)
        rejs_T2 += int(T2 > crit_T2)
        rejs_TJ += int(TJ > crit_TJ)
    out = {
        "T2_mean": float(np.mean(T2_vals)),
        "T2_sd": float(np.std(T2_vals)),
        "T2_reject_5pct": float(rejs_T2 / R),
        "TJ_mean": float(np.mean(TJ_vals)),
        "TJ_sd": float(np.std(TJ_vals)),
        "TJ_reject_5pct": float(rejs_TJ / R),
    }
    return out

def replicate_main_tables() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Reproduce the main and additional tables from the paper.
    Returns two dataframes and saves them to CSV.
    """
    # Scenarios
    scenarios = [
        ("Null,strong,pC=0.3", DGPSpec(n=2000, pC=0.30, pA=0.10, alt="null")),
        ("Null,weak,pC=0.1",   DGPSpec(n=2000, pC=0.10, pA=0.10, alt="null")),
        ("Excl small (γ=0.2)", DGPSpec(n=2000, pC=0.30, pA=0.10, alt="excl_small", gamma=0.2)),
        ("Excl big (γ=0.5)",   DGPSpec(n=2000, pC=0.30, pA=0.10, alt="excl_big", gamma=0.5)),
        ("Defiers 5%",         DGPSpec(n=2000, pC=0.30, pA=0.10, alt="defiers_small", pD=0.05)),
        ("Defiers 10%",        DGPSpec(n=2000, pC=0.30, pA=0.10, alt="defiers_big", pD=0.10)),
    ]
    # Null criticals
    T2_null_strong, TJ_null_strong = run_null_draws(scenarios[0][1], R=400)
    T2_null_weak,   TJ_null_weak   = run_null_draws(scenarios[1][1], R=400)
    crit_T2_strong = float(np.quantile(T2_null_strong, 0.95))
    crit_TJ_strong = float(np.quantile(TJ_null_strong, 0.95))
    crit_T2_weak   = float(np.quantile(T2_null_weak,   0.95))
    crit_TJ_weak   = float(np.quantile(TJ_null_weak,   0.95))

    # Evaluate scenarios
    rows = []
    for name, spec in scenarios:
        if "weak" in name:
            crit_T2, crit_TJ = crit_T2_weak, crit_TJ_weak
        else:
            crit_T2, crit_TJ = crit_T2_strong, crit_TJ_strong
        res = run_eval_draws(spec, crit_T2, crit_TJ, R=250)
        rows.append({
            "Scenario": name,
            "n": spec.n, "pC": spec.pC, "pA": spec.pA, "Alt": spec.alt,
            **res
        })
    df_main = pd.DataFrame(rows)
    csv_main = "late_lumpyITT_tests_simulations.csv"
    df_main.to_csv(csv_main, index=False)

    # Additional sensitivity scenarios
    more = [
        ("Excl small (γ=0.2), n=1000", DGPSpec(n=1000, pC=0.30, pA=0.10, alt="excl_small", gamma=0.2)),
        ("Excl small (γ=0.2), n=4000", DGPSpec(n=4000, pC=0.30, pA=0.10, alt="excl_small", gamma=0.2)),
        ("Defiers 5%, pC=0.1",         DGPSpec(n=2000, pC=0.10, pA=0.10, alt="defiers_small", pD=0.05)),
        ("Defiers 5%, pC=0.5",         DGPSpec(n=2000, pC=0.50, pA=0.10, alt="defiers_small", pD=0.05)),
    ]
    rows2 = []
    for name, spec in more:
        # compute matching null criticals for each (n,pC)
        T2n, TJn = run_null_draws(DGPSpec(n=spec.n, pC=spec.pC, pA=spec.pA, alt="null"), R=300)
        crit_T2, crit_TJ = float(np.quantile(T2n,0.95)), float(np.quantile(TJn,0.95))
        res = run_eval_draws(spec, crit_T2, crit_TJ, R=250)
        rows2.append({
            "Scenario": name,
            "n": spec.n, "pC": spec.pC, "pA": spec.pA, "Alt": spec.alt,
            **res
        })
    df_more = pd.DataFrame(rows2)
    csv_more = "late_lumpyITT_tests_simulations_additional.csv"
    df_more.to_csv(csv_more, index=False)

    print("Saved:")
    print(f"- {csv_main}")
    print(f"- {csv_more}")
    return df_main, df_more

# -----------------------
# Optional: robust covariance GMM (bootstrap W)
# -----------------------
def gmm_bootstrap_covariance(Y: np.ndarray, Z: np.ndarray, stats: Dict[str,np.ndarray],
                             J: int=10, B: int=200, rng: Optional[np.random.Generator]=None) -> Tuple[np.ndarray, np.ndarray, float]:
    """
    Optional helper to estimate a covariance matrix for the GMM moment vector via a simple nonparametric bootstrap.
    Returns (m_vector, W_hat, T_J) where T_J = n m' W^{-1} m.
    This adds runtime. Not used in main replication tables.
    """
    if rng is None:
        rng = rng_global
    qs = np.linspace(0.1, 0.9, J)
    tjs = np.quantile(Y, qs)
    grid = stats["grid"]
    idx = np.searchsorted(grid, tjs, side="left")
    idx = np.clip(idx, 0, len(grid)-1)
    FY1_t = np.array([ (Y[Z==1] <= t).mean() for t in tjs ])
    FY0_t = np.array([ (Y[Z==0] <= t).mean() for t in tjs ])
    m = (FY1_t - FY0_t) - stats["pC_hat"] * (stats["F1C"][idx] - stats["F0C"][idx])
    # bootstrap covariance
    n = len(Y)
    Ms = []
    for b in range(B):
        idx_b = rng.integers(0, n, size=n)
        Yb, Zb = Y[idx_b], Z[idx_b]
        FY1_t_b = np.array([ (Yb[Zb==1] <= t).mean() for t in tjs ])
        FY0_t_b = np.array([ (Yb[Zb==0] <= t).mean() for t in tjs ])
        # reuse complier CDFs from full sample for speed; for more accuracy recompute stats on bootstrap sample
        mb = (FY1_t_b - FY0_t_b) - stats["pC_hat"] * (stats["F1C"][idx] - stats["F0C"][idx])
        Ms.append(mb)
    Ms = np.vstack(Ms)
    W = np.cov(Ms.T, bias=False)
    # Regularize if needed
    eps = 1e-10
    W += eps * np.eye(J)
    TJ = float(n * m.T @ np.linalg.inv(W) @ m)
    return m, W, TJ

# -----------------------
# Run a quick smoke test (lightweight) and then run the full replication
# -----------------------
if __name__ == "__main__":
    # Smoke test with fewer reps (adjust as needed)
    df_main, df_more = replicate_main_tables()
    print("\n=== Main table preview ===")
    print(df_main.to_string(index=False))
    print("\n=== Additional table preview ===")
    print(df_more.to_string(index=False))

Saved:
- late_lumpyITT_tests_simulations.csv
- late_lumpyITT_tests_simulations_additional.csv

=== Main table preview ===
          Scenario    n  pC  pA           Alt  T2_mean    T2_sd  T2_reject_5pct  TJ_mean    TJ_sd  TJ_reject_5pct
Null,strong,pC=0.3 2000 0.3 0.1          null 0.000583 0.000507           0.052 0.006407 0.005677           0.064
  Null,weak,pC=0.1 2000 0.1 0.1          null 0.000769 0.000708           0.064 0.008560 0.007940           0.068
Excl small (γ=0.2) 2000 0.3 0.1    excl_small 0.000733 0.000593           0.084 0.008009 0.006667           0.084
  Excl big (γ=0.5) 2000 0.3 0.1      excl_big 0.006999 0.002107           1.000 0.077788 0.023616           1.000
        Defiers 5% 2000 0.3 0.1 defiers_small 0.000877 0.000727           0.140 0.009674 0.008192           0.160
       Defiers 10% 2000 0.3 0.1   defiers_big 0.001403 0.000948           0.360 0.015509 0.010574           0.380

=== Additional table preview ===
                  Scenario    n  pC  pA       