In [1]:
%pip -q install cvxpy ecos
# If you plan to use method='cp' for exact one-sided CIs:
# %pip -q install scipy

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/222.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━[0m [32m102.4/222.1 kB[0m [31m3.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m222.1/222.1 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# CHSH: S, certified lower CI, δ→ lower bound, and an LP upper bound (π = id)
# Colab tip: before running, install LP deps with:
#   %pip -q install cvxpy ecos

import numpy as np
from math import log, sqrt

try:
    import cvxpy as cp
    _HAS_CVXPY = True
except Exception:
    _HAS_CVXPY = False

try:
    from scipy.stats import beta as _beta_dist
    _HAS_SCIPY = True
except Exception:
    _HAS_SCIPY = False

CONTEXTS = ['00','01','10','11']              # +, +, +, -
SIGNS     = {'00': +1, '01': +1, '10': +1, '11': -1}
AB_VALUES = [-1, +1]                           # outcome alphabet

# ---------- Utilities ----------
def _to_pm1(x):
    # Accept {±1} or {0,1}. Raise on anything else.
    if x in (-1, +1): return int(x)
    if x in (0, 1):   return -1 if x == 0 else +1
    raise ValueError(f"Outcome must be in {{-1,+1}} or {{0,1}}; got {x!r}")

def raw_to_counts(raw):
    """
    raw: dict ctx -> list of (a,b), each in {±1} or {0,1}.
    Returns counts_dict: ctx -> 2x2 array with a_idx=0->-1,1->+1; same for b_idx.
    """
    counts = {}
    for c in CONTEXTS:
        pairs = raw.get(c, [])
        if len(pairs) == 0:
            print(f"[warn] Context {c} has 0 samples.")
        C = np.zeros((2,2), dtype=int)
        for a_raw, b_raw in pairs:
            a = _to_pm1(a_raw); b = _to_pm1(b_raw)
            ai = 0 if a == -1 else 1
            bi = 0 if b == -1 else 1
            C[ai, bi] += 1
        counts[c] = C
    return counts

def counts_to_probs(counts_dict):
    probs_dict, n_dict = {}, {}
    for c in CONTEXTS:
        C = np.array(counts_dict[c], dtype=np.float64)
        n = C.sum()
        if n <= 0:
            raise ValueError(f"Context {c} has zero total count.")
        probs_dict[c] = (C / n).astype(np.float64)
        n_dict[c] = int(n)
    return probs_dict, n_dict

def check_nonsignalling(probs_dict, tol=1e-6, verbose=True):
    """
    Quick no-signaling check: verify Alice's marginals P(a|x,y) don't depend on y, and Bob's don't depend on x.
    Returns max absolute deviation across all checks.
    """
    P = {c: np.array(probs_dict[c], dtype=float) for c in CONTEXTS}

    devs = []
    # Alice marginals
    for x in [0,1]:
        for a_idx in [0,1]:
            pa_y0 = P['00' if x==0 else '10'][a_idx,:].sum()
            pa_y1 = P['01' if x==0 else '11'][a_idx,:].sum()
            devs.append(abs(pa_y0 - pa_y1))
    # Bob marginals
    for y in [0,1]:
        for b_idx in [0,1]:
            pb_x0 = P['00' if y==0 else '01'][:,b_idx].sum()
            pb_x1 = P['10' if y==0 else '11'][:,b_idx].sum()
            devs.append(abs(pb_x0 - pb_x1))

    max_dev = float(max(devs)) if devs else 0.0

    # Optional diagnostics
    per_marginals = []
    for x in [0,1]:
        for a_idx in [0,1]:
            pa_y0 = P['00' if x==0 else '10'][a_idx,:].sum()
            pa_y1 = P['01' if x==0 else '11'][a_idx,:].sum()
            per_marginals.append((f"Alice x={x} a_idx={a_idx}", float(abs(pa_y0 - pa_y1))))
    for y in [0,1]:
        for b_idx in [0,1]:
            pb_x0 = P['00' if y==0 else '01'][:,b_idx].sum()
            pb_x1 = P['10' if y==0 else '11'][:,b_idx].sum()
            per_marginals.append((f"Bob y={y} b_idx={b_idx}", float(abs(pb_x0 - pb_x1))))

    if verbose:
        if max_dev > tol:
            print(f"[warn] Max no-signaling deviation = {max_dev:.3e} (tolerance {tol:.1e})")
            for label, dev in per_marginals:
                print(f"    {label}: deviation={dev:.3e}")
        else:
            print(f"[info] No-signaling check passed (max deviation {max_dev:.3e} ≤ {tol:.1e})")
    return max_dev

def ab_mean_from_counts(C):
    n   = C.sum()
    pos = C[0,0] + C[1,1]
    neg = C[0,1] + C[1,0]
    return (pos - neg) / n

def E_ab_from_probs(P):
    e = 0.0
    for ai, a in enumerate(AB_VALUES):
        for bi, b in enumerate(AB_VALUES):
            e += a * b * P[ai, bi]
    return e

def chsh_S_from_probs(probs_dict):
    E = {c: E_ab_from_probs(probs_dict[c]) for c in CONTEXTS}
    S = E['00'] + E['01'] + E['10'] - E['11']
    return S, E

# ---------- One-sided CIs ----------
def t_hoeffding(n, delta_per):
    # X in [-1,1] => one-sided Hoeffding: P(m-μ ≥ t) ≤ exp(-n t^2 / 2)
    return sqrt(2.0 * log(1.0 / delta_per) / n)

def t_empirical_bernstein(n, m, delta_per):
    """
    One-sided empirical Bernstein for X ∈ [-1,1], conservative constants:
      t = sqrt( 2 * s2 * log(2/δ) / n ) + (14/3) * log(2/δ) / (n - 1),
    where s2 is the unbiased sample variance. Falls back to Hoeffding if n <= 1.
    """
    if n <= 1:
        return t_hoeffding(n, delta_per)
    s2 = (n / (n - 1)) * (1.0 - m*m)
    t_val = sqrt(2.0 * s2 * log(2.0 / delta_per) / n) + (14.0 / 3.0) * log(2.0 / delta_per) / (n - 1)
    return min(1.0, t_val)

def cp_one_sided_bounds_for_mean_from_counts(C, delta_per, side):
    """
    Exact one-sided Clopper–Pearson bound for mean m = E[ab] with ab ∈ {±1}.
    'side' ∈ {'lower','upper'} for the mean m.
    Uses SciPy's Beta quantile. Raises if SciPy is unavailable.
    """
    if not _HAS_SCIPY:
        raise RuntimeError("SciPy not found; choose method='hoeffding' or 'bernstein', or install SciPy (Colab: %pip -q install scipy).")
    n = int(C.sum())
    if n == 0:
        raise ValueError("Zero n in Clopper–Pearson.")
    # ab ∈ {±1}. Let p = P(ab=+1). Then m = E[ab] = 2p - 1.
    pos = int(C[0,0] + C[1,1])  # count of events with ab=+1
    if side == 'lower':
        p_L = 0.0 if pos == 0 else _beta_dist.ppf(delta_per, pos, n - pos + 1)
        return 2.0 * p_L - 1.0
    elif side == 'upper':
        p_U = 1.0 if pos == n else _beta_dist.ppf(1.0 - delta_per, pos + 1, n - pos)
        return 2.0 * p_U - 1.0
    else:
        raise ValueError("side must be 'lower' or 'upper'")

def chsh_lower_confidence_bound(counts_dict, delta=0.05, method='hoeffding', min_n=1, verbose=True, multiple_correction='bonferroni'):
    """
    Returns: S_hat, S_low, E_hat_dict, t_dict
    - Union bound across 4 contexts (δ/4 each) with signs +,+,+,−.
    - method ∈ {'hoeffding','bernstein','cp'}  # 'cp' = exact Clopper–Pearson (one-sided, requires SciPy)
    - min_n: per-context minimum n (warns if small)
    - multiple_correction: 'bonferroni' or 'sidak'
    """
    probs, n = counts_to_probs(counts_dict)
    S_hat, E_hat = chsh_S_from_probs(probs)

    for c in CONTEXTS:
        if abs(E_hat[c]) > 1 + 1e-12:
            raise RuntimeError(f"E_hat[{c}]={E_hat[c]} out of range [-1,1].")
    if abs(S_hat) > 4 + 1e-12:
        raise RuntimeError(f"S_hat={S_hat} out of CHSH range [-4,4].")

    if multiple_correction == 'bonferroni':
        delta_per = float(delta) / 4.0
    elif multiple_correction == 'sidak':
        delta_per = 1.0 - (1.0 - float(delta))**(1.0/4.0)
    else:
        raise ValueError("multiple_correction must be 'bonferroni' or 'sidak'")
    if verbose:
        print(f"[info] δ_total={delta:.4g}, δ_per-context={delta_per:.4g} via {multiple_correction} (fixed sign pattern +,+,+,−)")
    for c in CONTEXTS:
        if n[c] < min_n:
            print(f"[warn] Context {c} has n={n[c]} < min_n={min_n}. CI will be very loose.")
        elif n[c] < 20:
            print(f"[note] Context {c} has n={n[c]} (<20). Consider collecting more samples for tighter CI.")

    t_dict, m_dict = {}, {}
    for c in CONTEXTS:
        C = np.array(counts_dict[c], dtype=float)
        m = ab_mean_from_counts(C)
        m_dict[c] = m
        if method == 'hoeffding':
            t_dict[c] = t_hoeffding(n[c], delta_per)
        elif method == 'bernstein':
            t_dict[c] = t_empirical_bernstein(n[c], m, delta_per)
        elif method == 'cp':
            if SIGNS[c] == +1:
                m_L = cp_one_sided_bounds_for_mean_from_counts(C, delta_per, side='lower')
                t_dict[c] = m - m_L
            else:
                m_U = cp_one_sided_bounds_for_mean_from_counts(C, delta_per, side='upper')
                t_dict[c] = m_U - m
        else:
            raise ValueError("method must be 'hoeffding', 'bernstein', or 'cp'")

    S_low = (
        (m_dict['00'] - t_dict['00'])
      + (m_dict['01'] - t_dict['01'])
      + (m_dict['10'] - t_dict['10'])
      - (m_dict['11'] + t_dict['11'])
    )

    return S_hat, S_low, E_hat, t_dict

def all_chsh_sign_patterns():
    """
    Return the 8 standard CHSH sign orientations:
    4 choices of which term is negative, times a global sign flip.
    Each pattern is a dict mapping context -> {+1,-1}.
    """
    patterns = []
    for neg in CONTEXTS:
        sp = {c: +1 for c in CONTEXTS}
        sp[neg] = -1
        patterns.append(sp)
    patterns += [{c: -v for c, v in sp.items()} for sp in patterns]
    return patterns

def chsh_lower_confidence_bound_scan_signs(counts_dict, delta=0.05, method='hoeffding', min_n=1, verbose=True, multiple_correction='bonferroni'):
    """
    Pick the CHSH sign pattern from the same data, with a proper ×8 multiple-testing correction.
    Returns: best_S_hat, S_low, E_hat_dict, t_dict, best_SIGNS (dict).
    """
    probs, n = counts_to_probs(counts_dict)
    E_hat = {c: E_ab_from_probs(probs[c]) for c in CONTEXTS}
    patterns = all_chsh_sign_patterns()

    best_S_hat, best_SIGNS = -1e18, None
    for SIG in patterns:
        S_hat = sum(SIG[c] * E_hat[c] for c in CONTEXTS)
        if S_hat > best_S_hat:
            best_S_hat, best_SIGNS = S_hat, SIG

    mtests = 4 * len(patterns)
    if multiple_correction == 'bonferroni':
        delta_per = float(delta) / mtests
    elif multiple_correction == 'sidak':
        delta_per = 1.0 - (1.0 - float(delta))**(1.0/mtests)
    else:
        raise ValueError("multiple_correction must be 'bonferroni' or 'sidak'")

    if verbose:
        print(f"[info] δ_total={delta:.4g}, δ_per-context={delta_per:.4g} across {mtests} tests (4 contexts × 8 patterns)")
        for c in CONTEXTS:
            if n[c] < min_n:
                print(f"[warn] Context {c} has n={n[c]} < min_n={min_n}. CI will be very loose.")
            elif n[c] < 20:
                print(f"[note] Context {c} has n={n[c]} (<20). Consider more samples.")

    t_dict, m_dict = {}, {}
    for c in CONTEXTS:
        C = np.array(counts_dict[c], dtype=float)
        m = ab_mean_from_counts(C)
        m_dict[c] = m
        if method == 'hoeffding':
            t = t_hoeffding(n[c], delta_per)
        elif method == 'bernstein':
            t = t_empirical_bernstein(n[c], m, delta_per)
        elif method == 'cp':
            if best_SIGNS[c] == +1:
                m_L = cp_one_sided_bounds_for_mean_from_counts(C, delta_per, side='lower')
                t = m - m_L
            else:
                m_U = cp_one_sided_bounds_for_mean_from_counts(C, delta_per, side='upper')
                t = m_U - m
        else:
            raise ValueError("method must be 'hoeffding', 'bernstein', or 'cp'")
        t_dict[c] = t

    S_low = sum((m_dict[c] - t_dict[c]) if best_SIGNS[c] == +1 else -(m_dict[c] + t_dict[c]) for c in CONTEXTS)
    return best_S_hat, S_low, E_hat, t_dict, best_SIGNS

def delta_min_lower_bound(S_low):
    # Certified δ→(K, Local) lower bound from CHSH
    return max(0.0, (S_low - 2.0) / 8.0)

# ---------- LP upper bound (π = id specialization) ----------
def enumerate_local_strategies():
    return [{'A0':A0,'A1':A1,'B0':B0,'B1':B1}
            for A0 in AB_VALUES for A1 in AB_VALUES
            for B0 in AB_VALUES for B1 in AB_VALUES]

def build_indicator_tensor(strategies):
    # M[ctx_idx, ab_idx, strat_idx] ∈ {0,1}
    ctx_pairs = [('0','0'), ('0','1'), ('1','0'), ('1','1')]
    ab_pairs  = [(-1,-1), (-1,+1), (+1,-1), (+1,+1)]
    M = np.zeros((4,4,len(strategies)), dtype=int)
    for ci, (xa, yb) in enumerate(ctx_pairs):
        for ai, (a,b) in enumerate(ab_pairs):
            for si, s in enumerate(strategies):
                Ax = s['A'+xa]; By = s['B'+yb]
                M[ci, ai, si] = 1 if (Ax == a and By == b) else 0
    return M

def deficiency_upper_bound_lp(probs_dict, solver='ECOS'):
    """
    Upper bound on δ→(K, Local) via π = id:
        δ→(K, Local) = inf_{L∈Local, π} sup_xy TV(K_xy, π L_xy)
    Fixing π = id yields the computable upper bound:
        inf_{L∈Local} sup_xy TV(K_xy, L_xy)
    """
    if not _HAS_CVXPY:
        raise RuntimeError("cvxpy not found. In Colab: %pip -q install cvxpy ecos")

    strategies = enumerate_local_strategies()
    M = build_indicator_tensor(strategies)
    lam = cp.Variable(len(strategies), nonneg=True)
    t   = cp.Variable((4,4),  nonneg=True)
    eta = cp.Variable(nonneg=True)

    ab_pairs = [(-1,-1), (-1,+1), (+1,-1), (+1,+1)]
    K = np.zeros((4,4), dtype=float)
    for ci, c in enumerate(CONTEXTS):
        for ai, (a,b) in enumerate(ab_pairs):
            a_idx = 0 if a == -1 else 1
            b_idx = 0 if b == -1 else 1
            K[ci, ai] = probs_dict[c][a_idx, b_idx]

    constraints = [cp.sum(lam) == 1]

    for ci in range(4):
        L_ci_ab = M[ci,:,:] @ lam  # shape (4,)
        for ai in range(4):
            constraints += [
                t[ci, ai] >= K[ci, ai] - L_ci_ab[ai],
                t[ci, ai] >= -(K[ci, ai] - L_ci_ab[ai]),
            ]
        constraints += [0.5 * cp.sum(t[ci, :]) <= eta]  # TV per context ≤ eta

    prob = cp.Problem(cp.Minimize(eta), constraints)
    prob.solve(solver=solver, verbose=False)
    if prob.status not in ("optimal", "optimal_inaccurate"):
        try:
            prob.solve(solver="SCS", verbose=False)
        except Exception:
            pass
    if prob.status not in ("optimal", "optimal_inaccurate"):
        raise RuntimeError(f"LP did not solve to optimality. Status: {prob.status}")
    return float(eta.value)

def deficiency_upper_bound_lp_with_pi(probs_dict, solver='ECOS'):
    """
    Lower bound via a single global joint post-processing π ∈ R^{4×4} (column-stochastic, shared across contexts):
        min η
        s.t.  for each context xy: (1/2) * || K_xy - π @ L_xy^{λ} ||_1 ≤ η
              λ ≥ 0, sum λ = 1
              π ≥ 0, colsum(π) = 1  (column-stochastic)
    Note: Because {π L | L ∈ Local, π column-stochastic} ⊇ Local, this optimizes over a superset of Local.
          Hence the optimum is a **lower bound** on δ→(K, Local) (optimistic). For certification, use the π = id upper bound.
    """
    if not _HAS_CVXPY:
        raise RuntimeError("cvxpy not found. In Colab: %pip -q install cvxpy ecos")

    strategies = enumerate_local_strategies()
    M = build_indicator_tensor(strategies)  # shape (4,4,16)

    lam = cp.Variable(len(strategies), nonneg=True)
    Pi  = cp.Variable((4,4), nonneg=True)   # column-stochastic post-processing
    t   = cp.Variable((4,4),  nonneg=True)
    eta = cp.Variable(nonneg=True)

    ab_pairs = [(-1,-1), (-1,+1), (+1,-1), (+1,+1)]
    K = np.zeros((4,4), dtype=float)
    for ci, c in enumerate(CONTEXTS):
        for ai, (a,b) in enumerate(ab_pairs):
            a_idx = 0 if a == -1 else 1
            b_idx = 0 if b == -1 else 1
            K[ci, ai] = probs_dict[c][a_idx, b_idx]

    constraints = [cp.sum(lam) == 1, cp.sum(Pi, axis=0) == np.ones(4)]  # column-stochastic π

    for ci in range(4):
        L_ci = M[ci,:,:] @ lam      # (4,)
        PiL  = Pi @ L_ci            # (4,)
        for ai in range(4):
            constraints += [
                t[ci, ai] >= K[ci, ai] - PiL[ai],
                t[ci, ai] >= -(K[ci, ai] - PiL[ai]),
            ]
        constraints += [0.5 * cp.sum(t[ci, :]) <= eta]

    prob = cp.Problem(cp.Minimize(eta), constraints)
    prob.solve(solver=solver, verbose=False)
    if prob.status not in ("optimal", "optimal_inaccurate"):
        try:
            prob.solve(solver="SCS", verbose=False)
        except Exception:
            pass
    if prob.status not in ("optimal", "optimal_inaccurate"):
        raise RuntimeError(f"LP with global π did not solve to optimality. Status: {prob.status}")
    return float(eta.value)

# ---------- Example usage ----------
if __name__ == "__main__":
    # Example counts (edit to your data): counts[a_idx,b_idx] with a_idx=0->-1,1->+1; b_idx likewise.
    counts_example = {
        '00': np.array([[120, 80],
                        [ 85,115]]),
        '01': np.array([[110, 90],
                        [ 75,125]]),
        '10': np.array([[115, 85],
                        [ 80,120]]),
        '11': np.array([[ 70,130],
                        [135, 65]]),
    }

    # 1) S and lower CI (choose method='hoeffding' or 'bernstein' or 'cp')
    S_hat, S_low, E_hat, t_ctx = chsh_lower_confidence_bound(
        counts_example, delta=0.05, method='hoeffding', min_n=1, verbose=True
    )
    print(f"S_hat = {S_hat:.4f}")
    print(f"S_low (95% one-sided, union bound) = {S_low:.4f}")
    print("Per-context E_hat and t:")
    for c in CONTEXTS:
        print(f"  {c}: E_hat={E_hat[c]:.4f}, t={t_ctx[c]:.4f}")

    # 1b) (optional) Pick CHSH sign pattern from data with ×8 correction
    print("\n[scan] Choosing CHSH sign pattern from data with ×8 correction:")
    best_S_hat, S_low_scan, E_hat_scan, t_ctx_scan, best_SIGNS = chsh_lower_confidence_bound_scan_signs(
        counts_example, delta=0.05, method='hoeffding', min_n=1, verbose=True
    )
    print(f"S_hat(best pattern) = {best_S_hat:.4f}")
    print(f"S_low(best pattern, 95% one-sided with ×8 correction) = {S_low_scan:.4f}")
    delta_min_scan = delta_min_lower_bound(S_low_scan)
    print(f"Certified lower bound on δ→(K, Local) [sign-scan corrected]: {delta_min_scan:.4f}")
    print(f"Chosen sign pattern: {best_SIGNS}")

    # 2) Certified deficiency lower bound
    delta_min = delta_min_lower_bound(S_low)
    print(f"Certified lower bound on δ→(K, Local): {delta_min:.4f}")

    # 3) LP upper bound (π = id)
    probs_dict, _ = counts_to_probs(counts_example)
    check_nonsignalling(probs_dict, tol=1e-6, verbose=True)
    if _HAS_CVXPY:
        eta_star = deficiency_upper_bound_lp(probs_dict, solver='ECOS')
        print(f"LP upper bound (π = id): {eta_star:.4f}")
        try:
            eta_star_pi = deficiency_upper_bound_lp_with_pi(probs_dict, solver='ECOS')
            print(f"Global-π lower bound (optimistic, joint post-processing): {eta_star_pi:.4f}")
            print(f"Certified bracket (lower ≤ δ ≤ upper_id): [{delta_min:.4f}, {eta_star:.4f}]")
            print(f"Certified bracket (data-chosen signs, ×8-corrected): [{delta_min_scan:.4f}, {eta_star:.4f}]")
        except Exception as e:
            print(f"[note] Global-π LP not run: {e}")
            print(f"Bracket: [{delta_min:.4f}, {eta_star:.4f}]")
    else:
        print("cvxpy not installed; skip LP. In Colab run: %pip -q install cvxpy ecos")

    print("\n[note] Sign pattern is precommitted (+,+,+,−). If you pick signs from data,")
    print("       apply an extra ×8 union bound (or lock the pattern before looking).")

[info] δ_total=0.05, δ_per-context=0.0125 via bonferroni (fixed sign pattern +,+,+,−)
S_hat = 0.8500
S_low (95% one-sided, union bound) = 0.2579
Per-context E_hat and t:
  00: E_hat=0.1750, t=0.1480
  01: E_hat=0.1750, t=0.1480
  10: E_hat=0.1750, t=0.1480
  11: E_hat=-0.3250, t=0.1480

[scan] Choosing CHSH sign pattern from data with ×8 correction:
[info] δ_total=0.05, δ_per-context=0.001563 across 32 tests (4 contexts × 8 patterns)
S_hat(best pattern) = 0.8500
S_low(best pattern, 95% one-sided with ×8 correction) = 0.1310
Certified lower bound on δ→(K, Local) [sign-scan corrected]: 0.0000
Chosen sign pattern: {'00': 1, '01': 1, '10': 1, '11': -1}
Certified lower bound on δ→(K, Local): 0.0000
[warn] Max no-signaling deviation = 5.000e-02 (tolerance 1.0e-06)
    Alice x=0 a_idx=0: deviation=0.000e+00
    Alice x=0 a_idx=1: deviation=0.000e+00
    Alice x=1 a_idx=0: deviation=0.000e+00
    Alice x=1 a_idx=1: deviation=0.000e+00
    Bob y=0 b_idx=0: deviation=2.500e-02
    Bob y=0 b_idx=