# X–θ / NIST Bell Test (trial-based, no coincidence windows)

Goal:

1. Load a NIST HDF5 run (trial-structured).
2. Build θ from the _settings path_ (turning-angle holonomy proxy).
3. Decode outcomes (prefer ±1 from detector bitmasks if present).
4. Compute baseline Bell metric (CHSH S) and then S(θ).
5. Run drift-preserving permutation: blockwise + setting-stratified + joint-outcome shuffle.
6. No-signalling check per θ bin.

Notes:

- This is CPU work. GPU is not needed.
- Start with a slice (1–3 million trials). Then scale up.


In [None]:
import os, math
import numpy as np
import pandas as pd
import h5py
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, Any, Optional, Tuple

try:
    from tqdm import tqdm
except Exception:
    tqdm = lambda x, **kwargs: x

## Data: NIST "processed_compressed" HDF5

You need a "real run" HDF5 (often ~GB). Training/small files may not include per-trial outcomes.

You can:

- Download into ./data/
- OR set HDF5_PATH to a file you already have.


In [43]:
# Replace ONLY the URL + local filename


@dataclass
class CFG:
    # Path to your local file:
    HDF5_PATH: str = "./data_nist/nist_run.build.hdf5"

    # Work on a slice while debugging:
    START: int = 0
    MAX_TRIALS: int = 3_000_000  # None = full file (careful)

    # Theta definition:
    THETA_MODE: str = "turning_angle_settings_path"  # (look-up only, we implement it)
    THETA_BINS: int = 12
    THETA_WRAP_2PI: bool = True

    # Permutation test:
    BLOCK_SIZE: int = 20_000
    PERM_ROUNDS: int = 200
    SEED: int = 123

    # Binning:
    MIN_VALID_PER_BIN: int = 20_000


CFG = CFG()
os.makedirs(os.path.dirname(CFG.HDF5_PATH), exist_ok=True)
print("Using HDF5_PATH:", CFG.HDF5_PATH)


# Optional downloader (run only if you mean it)
def download_with_resume(url: str, out_path: str, chunk_size: int = 8 * 1024 * 1024):
    import requests

    headers = {}
    mode = "wb"
    if os.path.exists(out_path):
        existing = os.path.getsize(out_path)
        headers["Range"] = f"bytes={existing}-"
        mode = "ab"
        print(f"Resuming at byte {existing:,}")
    else:
        print("Starting fresh download")

    with requests.get(url, stream=True, headers=headers, timeout=60) as r:
        r.raise_for_status()
        with open(out_path, mode) as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)


# Example usage:
URL = "https://s3.amazonaws.com/nist-belltestdata/belldata/processed_compressed/hdf5/2015_09_18/03_31_CH_pockel_100kHz.run4.afterTimingfix2_training.dat.compressed.build.hdf5"
if not os.path.exists(CFG.HDF5_PATH):
    download_with_resume(URL, CFG.HDF5_PATH)


# @dataclass(frozen=True)
# class Config:
#     # Pick a better non-training file when possible:
#     # Example alternate filename you can try (edit as needed):
#     # "...afterTimingfix2_afterfixingModeLocking.dat.compressed.build.hdf5"
#     nist_hdf5_url: str = (
#         "https://s3.amazonaws.com/nist-belltestdata/belldata/processed_compressed/hdf5/"
#         "2015_09_18/03_31_CH_pockel_100kHz.run4.afterTimingfix2_training.dat.compressed.build.hdf5"
#     )

#     data_dir: str = "./data_nist"
#     local_name: str = "nist_run.hdf5"

#     # Work on a contiguous slice for speed; set None for full run
#     start_idx: int = 0
#     max_trials: Optional[int] = 3_000_000

#     # Theta
#     theta_bins: int = 12

#     # Drift-preserving permutation
#     perm_rounds: int = 200
#     block_size: int = 20000


# CFG_NEW = Config(
#     nist_hdf5_url=(
#         "https://s3.amazonaws.com/nist-belltestdata/belldata/processed_compressed/hdf5/"
#         "2015_09_18/03_43_CH_pockel_100kHz.run4.afterTimingfix2_afterfixingModeLocking.dat.compressed.build.hdf5"
#     ),
#     local_name="nist_afterfixingModeLocking.hdf5",
#     start_idx=0,
#     max_trials=3_000_000,  # keep a slice while developing; set None later
#     theta_bins=12,
#     perm_rounds=200,
#     block_size=20000,
# )

# os.makedirs(CFG_NEW.data_dir, exist_ok=True)
# hdf5_path = os.path.join(CFG_NEW.data_dir, CFG_NEW.local_name)

# if not os.path.exists(hdf5_path):
#     print("Downloading:", CFG_NEW.nist_hdf5_url)
#     download_with_resume(CFG_NEW.nist_hdf5_url, hdf5_path)
# else:
#     print("Already exists:", hdf5_path)

# print("HDF5:", hdf5_path)

Using HDF5_PATH: ./data_nist/nist_run.build.hdf5
Starting fresh download


## Step 1: Inspect the HDF5 structure

We need to locate:

- settings arrays: alice/settings, bob/settings
- per-trial outcome arrays (ideally bitmasks) aligned with settings length


In [None]:
def list_hdf5_datasets(hdf5_path: str) -> pd.DataFrame:
    rows = []
    with h5py.File(hdf5_path, "r") as h5:

        def visit(name, obj):
            if isinstance(obj, h5py.Dataset):
                shape = obj.shape
                rows.append(
                    {
                        "path": name,
                        "ndim": len(shape) if shape is not None else 0,
                        "n0": int(shape[0]) if shape and len(shape) > 0 else 0,
                        "dtype": str(obj.dtype),
                    }
                )

        h5.visititems(visit)
    return pd.DataFrame(rows)


df_ds = list_hdf5_datasets(CFG.HDF5_PATH)
df_ds["path_l"] = df_ds["path"].str.lower()
display(df_ds.sort_values("n0", ascending=False).head(40))

Unnamed: 0,path,ndim,n0,dtype,path_l
4,alice/settings,1,28526907,uint8,alice/settings
9,bob/settings,1,28526894,uint8,bob/settings
6,bob/clicks,1,363,uint16,bob/clicks
1,alice/clicks,1,363,uint16,alice/clicks
8,bob/phase,1,15,float64,bob/phase
7,bob/laserPulseNumber,1,15,uint16,bob/laserpulsenumber
2,alice/laserPulseNumber,1,11,uint16,alice/laserpulsenumber
3,alice/phase,1,11,float64,alice/phase
5,bob/badSyncInfo,2,4,int64,bob/badsyncinfo
0,alice/badSyncInfo,2,4,int64,alice/badsyncinfo


## Step 2: Choose candidate paths

We’ll try:

- settings: "alice/settings" and "bob/settings" if present
- outcomes: per-trial int arrays whose length matches settings (or very close)

If auto-detect picks wrong, you will set these manually.


In [None]:
def pick_settings_paths(df: pd.DataFrame) -> Tuple[str, str]:
    # Prefer exact canonical names
    if (df["path"] == "alice/settings").any() and (df["path"] == "bob/settings").any():
        return "alice/settings", "bob/settings"

    # Else: pick two largest 1D datasets containing "settings"
    cand = df[(df["ndim"] == 1) & (df["path_l"].str.contains("settings"))].sort_values(
        "n0", ascending=False
    )
    if len(cand) < 2:
        raise ValueError("Could not find two settings datasets. Inspect df_ds.")
    # heuristic: pick one that contains alice and one bob if possible
    a = cand[cand["path_l"].str.contains("alice")]
    b = cand[cand["path_l"].str.contains("bob")]
    if len(a) and len(b):
        return a.iloc[0]["path"], b.iloc[0]["path"]
    return cand.iloc[0]["path"], cand.iloc[1]["path"]


A_SET_PATH, B_SET_PATH = pick_settings_paths(df_ds)
print("A_SET_PATH:", A_SET_PATH)
print("B_SET_PATH:", B_SET_PATH)

# Determine common length (settings are sometimes off by a few samples)
a_n = int(df_ds.loc[df_ds["path"] == A_SET_PATH, "n0"].iloc[0])
b_n = int(df_ds.loc[df_ds["path"] == B_SET_PATH, "n0"].iloc[0])
N_common = min(a_n, b_n)
print("a_n, b_n, N_common:", a_n, b_n, N_common)

# Candidate per-trial int arrays of length N_common (exclude settings)
per_trial = df_ds[
    (df_ds["ndim"] == 1)
    & (df_ds["n0"] == N_common)
    & (df_ds["dtype"].str.contains("int|uint", case=False, regex=True))
    & (~df_ds["path_l"].str.contains("settings"))
].copy()

display(per_trial.sort_values("path").head(50))

# You can manually set these if auto-pick fails:
A_MASK_PATH = None
B_MASK_PATH = None

# Try to pick obvious "alice" and "bob" candidates
a_like = per_trial[per_trial["path_l"].str.contains("alice")]
b_like = per_trial[per_trial["path_l"].str.contains("bob")]
if len(a_like):
    A_MASK_PATH = a_like.iloc[0]["path"]
if len(b_like):
    B_MASK_PATH = b_like.iloc[0]["path"]

print("Auto A_MASK_PATH:", A_MASK_PATH)
print("Auto B_MASK_PATH:", B_MASK_PATH)
print(
    "If None/wrong: choose from per_trial table and set A_MASK_PATH/B_MASK_PATH manually."
)

A_SET_PATH: alice/settings
B_SET_PATH: bob/settings
a_n, b_n, N_common: 28526907 28526894 28526894


Unnamed: 0,path,ndim,n0,dtype,path_l


Auto A_MASK_PATH: None
Auto B_MASK_PATH: None
If None/wrong: choose from per_trial table and set A_MASK_PATH/B_MASK_PATH manually.


## Step 3: Load settings + outcomes (aligned) + slice

Important bug fix:

- alice/settings and bob/settings can be different lengths.
- We truncate everything to N_common first, then slice, then apply validity mask.


In [None]:
def remap_two_state_to_01(x: np.ndarray) -> np.ndarray:
    """
    Map the two most frequent values -> {0,1}. Everything else -> -1.
    Works even if source values are {1,2} or {0,1}.
    """
    x = np.asarray(x)
    vals, cnt = np.unique(x, return_counts=True)
    if len(vals) < 2:
        return np.full_like(x, -1, dtype=np.int8)
    order = np.argsort(cnt)[::-1]
    v0, v1 = vals[order[0]], vals[order[1]]
    out = np.full(x.shape, -1, dtype=np.int8)
    out[x == v0] = 0
    out[x == v1] = 1
    return out


def load_aligned_slice(
    hdf5_path: str,
    a_set_path: str,
    b_set_path: str,
    a_mask_path: Optional[str],
    b_mask_path: Optional[str],
    start: int,
    max_trials: Optional[int],
) -> Dict[str, np.ndarray]:
    with h5py.File(hdf5_path, "r") as h5:
        a_raw_full = h5[a_set_path][:]
        b_raw_full = h5[b_set_path][:]

        N_common = min(len(a_raw_full), len(b_raw_full))
        a_raw_full = a_raw_full[:N_common]
        b_raw_full = b_raw_full[:N_common]

        # Slice bounds
        end = N_common if max_trials is None else min(N_common, start + int(max_trials))
        a_raw = a_raw_full[start:end]
        b_raw = b_raw_full[start:end]

        a = remap_two_state_to_01(a_raw)
        b = remap_two_state_to_01(b_raw)

        ok = (a >= 0) & (b >= 0)
        a = a[ok].astype(np.int8)
        b = b[ok].astype(np.int8)

        out = {"a_set": a, "b_set": b}

        if a_mask_path and b_mask_path:
            a_mask = h5[a_mask_path][start:end][ok].astype(np.uint32)
            b_mask = h5[b_mask_path][start:end][ok].astype(np.uint32)
            out["a_mask"] = a_mask
            out["b_mask"] = b_mask

    return out


data = load_aligned_slice(
    CFG.HDF5_PATH,
    A_SET_PATH,
    B_SET_PATH,
    A_MASK_PATH,
    B_MASK_PATH,
    CFG.START,
    CFG.MAX_TRIALS,
)

a_set = data["a_set"]
b_set = data["b_set"]
print("Loaded slice trials:", len(a_set))
print("a unique:", np.unique(a_set), "b unique:", np.unique(b_set))

a_mask = data.get("a_mask", None)
b_mask = data.get("b_mask", None)
print("Have masks:", a_mask is not None and b_mask is not None)
if a_mask is not None:
    print("Alice mask min/max:", int(a_mask.min()), int(a_mask.max()))
    print("Bob   mask min/max:", int(b_mask.min()), int(b_mask.max()))

Loaded slice trials: 2999999
a unique: [0 1] b unique: [0 1]
Have masks: False


## Step 4: Compute θ using turning-angle on settings path

This is your `turning_angle_settings_path` operationalization:

Map (a,b) to square corners:
(0,0)->(-1,-1), (0,1)->(-1,+1), (1,0)->(+1,-1), (1,1)->(+1,+1)

Then for each triple of consecutive settings-points, compute signed turning angle and accumulate.


In [None]:
def compute_theta_turning(
    a: np.ndarray, b: np.ndarray, wrap_2pi: bool = True
) -> np.ndarray:
    """
    Vectorized turning-angle accumulator on square corners.
    """
    n = len(a)
    theta = np.zeros(n, dtype=np.float64)
    if n < 3:
        return theta

    # Map to corners
    x = np.where(a == 0, -1, +1).astype(np.int16)
    y = np.where(b == 0, -1, +1).astype(np.int16)

    v1x = x[1:-1] - x[:-2]
    v1y = y[1:-1] - y[:-2]
    v2x = x[2:] - x[1:-1]
    v2y = y[2:] - y[1:-1]

    cross = (v1x * v2y - v1y * v2x).astype(np.int32)
    dot = (v1x * v2x + v1y * v2y).astype(np.int32)

    dtheta = np.arctan2(cross, dot).astype(np.float64)
    csum = np.cumsum(dtheta)
    theta[2:] = np.mod(csum, 2 * np.pi) if wrap_2pi else csum
    theta[0] = 0.0
    theta[1] = theta[2]
    return theta


theta = compute_theta_turning(a_set, b_set, wrap_2pi=CFG.THETA_WRAP_2PI)

edges = np.linspace(0, 2 * np.pi, CFG.THETA_BINS + 1)
theta_bin = np.clip(np.digitize(theta, edges) - 1, 0, CFG.THETA_BINS - 1).astype(
    np.int16
)

print("theta min/max:", float(theta.min()), float(theta.max()))
print("theta bin counts:", np.bincount(theta_bin, minlength=CFG.THETA_BINS))

theta min/max: 0.0 6.283185307179585
theta bin counts: [154184 375970 222140 152997 374177 222761 154947 374508 219141 153172
 373266 222736]


In [23]:
def list_hdf5_datasets(hdf5_path: str) -> pd.DataFrame:
    rows = []
    with h5py.File(hdf5_path, "r") as h5:

        def walk(g, prefix=""):
            for k in g.keys():
                obj = g[k]
                path = f"{prefix}/{k}" if prefix else k
                if isinstance(obj, h5py.Dataset):
                    rows.append(
                        {
                            "path": path,
                            "shape": obj.shape,
                            "dtype": str(obj.dtype),
                            "ndim": obj.ndim,
                            "n": int(obj.shape[0]) if obj.ndim >= 1 else 1,
                        }
                    )
                elif isinstance(obj, h5py.Group):
                    walk(obj, path)

        walk(h5)
    df = pd.DataFrame(rows)
    return df.sort_values(["n", "path"], ascending=[False, True])


df_ds = list_hdf5_datasets(hdf5_path)
display(df_ds.head(80))
print("Total datasets:", len(df_ds))


def assert_outcome_info_present(hdf5_path: str) -> None:
    df = list_hdf5_datasets(hdf5_path)

    # Heuristic 1: any dataset same length as big settings besides settings itself
    # (click masks usually match N_trials)
    big_settings = df[df["path"].str.lower().str.contains("settings")].sort_values(
        "n", ascending=False
    )
    N_trials = int(big_settings.iloc[0]["n"]) if len(big_settings) else None

    # Per-trial integer masks (same length)
    per_trial_int = df[
        (df["ndim"] == 1)
        & (df["n"] == N_trials)
        & (df["dtype"].str.contains("int|uint", case=False, regex=True))
        & (~df["path"].str.lower().str.contains("settings"))
    ]

    # Heuristic 2: event lists (large, and names suggest sync/delay/tag/click)
    low = df["path"].str.lower()
    event_lists = df[
        (df["ndim"] == 1)
        & (df["n"] >= 200_000)
        & (low.str.contains("sync|delay|tag|time|click|detector|pulse"))
    ]

    print("N_trials guess:", N_trials)
    print("Per-trial int datasets (non-settings):", len(per_trial_int))
    print("Event-list clicky datasets:", len(event_lists))

    if len(per_trial_int) == 0 and len(event_lists) == 0:
        display(df.sort_values("n", ascending=False).head(50))
        raise RuntimeError(
            "This HDF5 file appears to contain NO outcome information "
            "(no per-trial masks and no click event lists). "
            "Switch to a different non-training run from NIST processed_compressed/hdf5."
        )


assert_outcome_info_present(hdf5_path)

Unnamed: 0,path,shape,dtype,ndim,n
1,alice/clicks,"(107109596,)",uint16,1,107109596
4,alice/settings,"(107109596,)",uint8,1,107109596
6,bob/clicks,"(107109596,)",uint16,1,107109596
9,bob/settings,"(107109596,)",uint8,1,107109596
2,alice/laserPulseNumber,"(5084060,)",uint16,1,5084060
3,alice/phase,"(5084060,)",float64,1,5084060
7,bob/laserPulseNumber,"(4339158,)",uint16,1,4339158
8,bob/phase,"(4339158,)",float64,1,4339158
0,alice/badSyncInfo,"(4, 10)",int64,2,4
5,bob/badSyncInfo,"(4, 8)",int64,2,4


Total datasets: 16
N_trials guess: 107109596
Per-trial int datasets (non-settings): 2
Event-list clicky datasets: 4


In [None]:
df = df_ds.copy()
df["path_l"] = df["path"].str.lower()

A_SET_PATH, B_SET_PATH = "alice/settings", "bob/settings"
# print(df["path"].astype(str))
# print()

a_n = int(df.loc[df["path"] == A_SET_PATH, "n0"].iloc[0])
b_n = int(df.loc[df["path"] == B_SET_PATH, "n0"].iloc[0])
N_common = min(a_n, b_n)
print("a_n, b_n, N_common:", a_n, b_n, N_common)

TOL = 5000

near = df[df["n0"].between(N_common - TOL, N_common + TOL)].copy()
print("Datasets near settings length:", len(near))
display(near.sort_values("path")[["path", "n0", "dtype"]].head(200))

Top-level keys: ['alice', 'bob', 'config']
alice/badSyncInfo (4, 479804) int64
alice/clicks (363,) uint16
alice/laserPulseNumber (11,) uint16
alice/phase (11,) float64
alice/settings (28526907,) uint8
bob/badSyncInfo (4, 479791) int64
bob/clicks (363,) uint16
bob/laserPulseNumber (15,) uint16
bob/phase (15,) float64
bob/settings (28526894,) uint8
config/alice/bitoffset () int64
config/alice/pk () int64
config/alice/radius () int64
config/bob/bitoffset () int64
config/bob/pk () int64
config/bob/radius () int64
1               alice/clicks
4             alice/settings
6                 bob/clicks
9               bob/settings
2     alice/laserPulseNumber
3                alice/phase
7       bob/laserPulseNumber
8                  bob/phase
0          alice/badSyncInfo
5            bob/badSyncInfo
10    config/alice/bitoffset
11           config/alice/pk
12       config/alice/radius
13      config/bob/bitoffset
14             config/bob/pk
15         config/bob/radius
Name: path, dtype: ob

KeyError: 'n0'

In [None]:
df = df_ds.copy()

print("Total datasets:", len(df))
print("Columns:", df.columns.tolist())
display(df.head(10))

# Make sure path is string + lowercase helper
df["path"] = df["path"].astype(str)
df["path_l"] = df["path"].str.lower()


import ast
import numpy as np
import pandas as pd


def to_shape_tuple(x):
    """Convert whatever 'shape' representation is into a tuple."""
    if isinstance(x, tuple):
        return x
    if isinstance(x, list):
        return tuple(x)
    if isinstance(x, np.ndarray):
        return tuple(x.tolist())
    if pd.isna(x):
        return tuple()
    if isinstance(x, str):
        # common case: "(123,)" or "[123]" etc.
        try:
            v = ast.literal_eval(x)
            if isinstance(v, (list, tuple)):
                return tuple(v)
            if isinstance(v, (int, np.integer)):
                return (int(v),)
        except Exception:
            pass
    # last resort
    return (int(x),) if str(x).strip().isdigit() else tuple()


if "n0" not in df.columns:
    # Try the most common column names people use for shape/dims
    shape_col = next(
        (c for c in ["shape", "dims", "dim", "sizes"] if c in df.columns), None
    )

    if shape_col is not None:
        df["shape_t"] = df[shape_col].apply(to_shape_tuple)
        df["n0"] = df["shape_t"].apply(lambda t: int(t[0]) if len(t) else 0)
        print(f"Created n0 from '{shape_col}'")
    else:
        # Try a direct dim0/len0 style column
        dim0_col = next(
            (
                c
                for c in ["shape0", "dim0", "len0", "n", "size0", "length0"]
                if c in df.columns
            ),
            None,
        )
        if dim0_col is None:
            raise KeyError(
                "No 'n0' and no recognizable shape column found. Check df.columns printed above."
            )
        df["n0"] = pd.to_numeric(df[dim0_col], errors="coerce").fillna(0).astype(int)
        print(f"Created n0 from '{dim0_col}'")

display(df[["path", "ndim", "dtype", "n0"]].head(16))


# Safer dtype string column (avoids .str issues if dtype isn't already a string)
df["dtype_s"] = df["dtype"].astype(str)
df["ndim_n"] = pd.to_numeric(df["ndim"], errors="coerce")

A_SET_PATH, B_SET_PATH = "alice/settings", "bob/settings"

# Use path_l so case can't bite you
a_row = df.loc[df["path_l"] == A_SET_PATH]
b_row = df.loc[df["path_l"] == B_SET_PATH]

print("alice/settings rows:", len(a_row))
print("bob/settings rows:", len(b_row))
display(a_row)
display(b_row)

a_n = int(a_row["n0"].iloc[0])
b_n = int(b_row["n0"].iloc[0])
N_common = min(a_n, b_n)
print("a_n:", a_n, "b_n:", b_n, "N_common:", N_common)

TOL = 5000  # allow small mismatch

cand = df[
    (df["ndim_n"] == 1)
    & (~df["path_l"].str.contains("settings"))
    & (df["dtype_s"].str.contains(r"int|uint|bool", case=False, regex=True))
    & (df["n0"].between(N_common - TOL, N_common + TOL))
].copy()

print("Per-trial candidates:", len(cand))
display(cand.sort_values(["n0", "path"], ascending=[False, True]).head(80))


A_SET_PATH, B_SET_PATH = "alice/settings", "bob/settings"
print(df["path"] == A_SET_PATH)

a_n = int(df.loc[df["path"] == A_SET_PATH, "n0"].iloc[0])
b_n = int(df.loc[df["path"] == B_SET_PATH, "n0"].iloc[0])
N_common = min(a_n, b_n)
print("N_common:", N_common)

TOL = 5000  # allow small mismatch

cand = df[
    (df["ndim"] == 1)
    & (~df["path_l"].str.contains("settings"))
    & (df["dtype"].str.contains("int|uint|bool", case=False, regex=True))
    & (df["n0"].between(N_common - TOL, N_common + TOL))
].copy()

print("Per-trial candidates:", len(cand))
display(cand.sort_values(["n0", "path"], ascending=[False, True]).head(80))

Total datasets: 16
Columns: ['path', 'shape', 'dtype', 'ndim', 'n']


Unnamed: 0,path,shape,dtype,ndim,n
1,alice/clicks,"(107109596,)",uint16,1,107109596
4,alice/settings,"(107109596,)",uint8,1,107109596
6,bob/clicks,"(107109596,)",uint16,1,107109596
9,bob/settings,"(107109596,)",uint8,1,107109596
2,alice/laserPulseNumber,"(5084060,)",uint16,1,5084060
3,alice/phase,"(5084060,)",float64,1,5084060
7,bob/laserPulseNumber,"(4339158,)",uint16,1,4339158
8,bob/phase,"(4339158,)",float64,1,4339158
0,alice/badSyncInfo,"(4, 10)",int64,2,4
5,bob/badSyncInfo,"(4, 8)",int64,2,4


Created n0 from 'shape'


Unnamed: 0,path,ndim,dtype,n0
1,alice/clicks,1,uint16,107109596
4,alice/settings,1,uint8,107109596
6,bob/clicks,1,uint16,107109596
9,bob/settings,1,uint8,107109596
2,alice/laserPulseNumber,1,uint16,5084060
3,alice/phase,1,float64,5084060
7,bob/laserPulseNumber,1,uint16,4339158
8,bob/phase,1,float64,4339158
0,alice/badSyncInfo,2,int64,4
5,bob/badSyncInfo,2,int64,4


alice/settings rows: 1
bob/settings rows: 1


Unnamed: 0,path,shape,dtype,ndim,n,path_l,shape_t,n0,dtype_s,ndim_n
4,alice/settings,"(107109596,)",uint8,1,107109596,alice/settings,"(107109596,)",107109596,uint8,1


Unnamed: 0,path,shape,dtype,ndim,n,path_l,shape_t,n0,dtype_s,ndim_n
9,bob/settings,"(107109596,)",uint8,1,107109596,bob/settings,"(107109596,)",107109596,uint8,1


a_n: 107109596 b_n: 107109596 N_common: 107109596
Per-trial candidates: 2


Unnamed: 0,path,shape,dtype,ndim,n,path_l,shape_t,n0,dtype_s,ndim_n
1,alice/clicks,"(107109596,)",uint16,1,107109596,alice/clicks,"(107109596,)",107109596,uint16,1
6,bob/clicks,"(107109596,)",uint16,1,107109596,bob/clicks,"(107109596,)",107109596,uint16,1


1     False
4      True
6     False
9     False
2     False
3     False
7     False
8     False
0     False
5     False
10    False
11    False
12    False
13    False
14    False
15    False
Name: path, dtype: bool
N_common: 107109596
Per-trial candidates: 2


Unnamed: 0,path,shape,dtype,ndim,n,path_l,shape_t,n0,dtype_s,ndim_n
1,alice/clicks,"(107109596,)",uint16,1,107109596,alice/clicks,"(107109596,)",107109596,uint16,1
6,bob/clicks,"(107109596,)",uint16,1,107109596,bob/clicks,"(107109596,)",107109596,uint16,1


## Step 5: Decode outcomes

Best case: masks encode which detector fired (bitmask).
We keep only single-click trials (popcount == 1) and map the two dominant bits to ±1.

Fallback: if you can’t decode ±1, you can still do an Eberhard-style detection test using A_det,B_det in {0,1}.
But CHSH needs ±1.


In [None]:
def load_aligned_slice_settings_and_masks(
    hdf5_path: str,
    a_set_path: str,
    b_set_path: str,
    a_mask_path: str,
    b_mask_path: str,
    start: int,
    max_trials: Optional[int],
):
    with h5py.File(hdf5_path, "r") as h5:
        aS = h5[a_set_path]
        bS = h5[b_set_path]
        aM = h5[a_mask_path]
        bM = h5[b_mask_path]

        # Critical: truncate everything to common length FIRST
        N0 = min(aS.shape[0], bS.shape[0], aM.shape[0], bM.shape[0])

        end = N0 if max_trials is None else min(N0, start + int(max_trials))

        a_raw = aS[start:end]
        b_raw = bS[start:end]

        a = remap_two_state_to_01(a_raw)
        b = remap_two_state_to_01(b_raw)

        ok = (a >= 0) & (b >= 0)

        a = a[ok].astype(np.int8)
        b = b[ok].astype(np.int8)

        a_mask = np.asarray(aM[start:end])[ok].astype(np.uint32)
        b_mask = np.asarray(bM[start:end])[ok].astype(np.uint32)

    return a, b, a_mask, b_mask


a_set, b_set, a_mask, b_mask = load_aligned_slice_settings_and_masks(
    CFG.HDF5_PATH,
    A_SET_PATH,
    B_SET_PATH,
    A_MASK_PATH,
    B_MASK_PATH,
    CFG.START,
    CFG.MAX_TRIALS,
)

print("Loaded:", len(a_set))
print("Mask stats A min/max:", int(a_mask.min()), int(a_mask.max()))
print("Mask stats B min/max:", int(b_mask.min()), int(b_mask.max()))
print("Zero frac A,B:", float(np.mean(a_mask == 0)), float(np.mean(b_mask == 0)))


def popcount_u32(x: np.ndarray) -> np.ndarray:
    x = x.astype(np.uint32)
    x = x - ((x >> 1) & np.uint32(0x55555555))
    x = (x & np.uint32(0x33333333)) + ((x >> 2) & np.uint32(0x33333333))
    x = (x + (x >> 4)) & np.uint32(0x0F0F0F0F)
    x = x + (x >> 8)
    x = x + (x >> 16)
    return (x & np.uint32(0x3F)).astype(np.uint8)


def decode_pm1_from_mask(
    mask: np.ndarray, sample_for_mapping: int = 500_000
) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Return x in {+1,-1,0}. 0 = undecoded/invalid.
    """
    mask = mask.astype(np.uint32)
    pc = popcount_u32(mask)
    single = pc == 1
    m_single = mask[single]
    if len(m_single) == 0:
        raise RuntimeError(
            "No single-click trials found (pc==1). Try another mask/run."
        )

    # isolate lowbit for single-click bitmask
    bit = (m_single & (~m_single + 1)).astype(np.uint32)

    s = min(len(bit), sample_for_mapping)
    bits, counts = np.unique(bit[:s], return_counts=True)
    order = np.argsort(counts)[::-1]
    if len(order) < 2:
        raise RuntimeError(
            "Could not find 2 dominant detector bits; need another mapping."
        )

    bit_plus = bits[order[0]]
    bit_minus = bits[order[1]]

    out = np.zeros(len(mask), dtype=np.int8)
    out[single & (mask == bit_plus)] = +1
    out[single & (mask == bit_minus)] = -1

    info = {
        "bit_plus": int(bit_plus),
        "bit_minus": int(bit_minus),
        "plus_count_sample": int(counts[order[0]]),
        "minus_count_sample": int(counts[order[1]]),
        "single_click_frac": float(np.mean(single)),
        "decoded_frac": float(np.mean(out != 0)),
    }
    return out, info


def detection_from_mask(mask: np.ndarray) -> np.ndarray:
    """
    A_det in {0,1}: 1 if any click (mask != 0), else 0.
    """
    return (mask.astype(np.uint32) != 0).astype(np.int8)


if a_mask is None or b_mask is None:
    raise RuntimeError(
        "No per-trial masks loaded. Choose A_MASK_PATH/B_MASK_PATH or switch run file."
    )

x, infoA = decode_pm1_from_mask(a_mask)
y, infoB = decode_pm1_from_mask(b_mask)
print("Alice decode:", infoA)
print("Bob decode:", infoB)

valid_pm1 = (x != 0) & (y != 0)
print("Valid decoded (both sides):", int(valid_pm1.sum()), " / ", len(x))

A_det = detection_from_mask(a_mask)
B_det = detection_from_mask(b_mask)
print("Detection rates:", float(A_det.mean()), float(B_det.mean()))

TypeError: Accessing a group is done with bytes or str, not <class 'NoneType'>

## Step 6: Baseline metrics

1. CHSH S (requires decoded ±1)
2. Eberhard-style J (can use detection A_det,B_det in {0,1})

If baseline CHSH does NOT violate (or is NaN), your ±1 decoding is wrong for this run/mask.


In [None]:
def chsh_S_pm1(a_set, b_set, x, y, signs=(+1, +1, +1, -1)) -> Dict[str, Any]:
    """
    E_ab = mean(x*y | a,b, valid_pm1)
    S = E00 + E01 + E10 - E11 by default
    """
    E, N = {}, {}
    for a in [0, 1]:
        for b in [0, 1]:
            m = (a_set == a) & (b_set == b) & (x != 0) & (y != 0)
            N[(a, b)] = int(m.sum())
            E[(a, b)] = float(np.mean(x[m] * y[m])) if N[(a, b)] else np.nan
    s00, s01, s10, s11 = signs
    S = s00 * E[(0, 0)] + s01 * E[(0, 1)] + s10 * E[(1, 0)] + s11 * E[(1, 1)]
    # make JSON-friendly keys
    E2 = {f"{k[0]}{k[1]}": v for k, v in E.items()}
    N2 = {f"{k[0]}{k[1]}": v for k, v in N.items()}
    return {"E": E2, "N": N2, "S": float(S)}


baseline_S = chsh_S_pm1(a_set, b_set, x, y)
print("Baseline CHSH:", baseline_S)


def eberhard_J(
    a_set: np.ndarray, b_set: np.ndarray, A: np.ndarray, B: np.ndarray
) -> float:
    """
    One common "J" form (detection-loophole style) using events in {0,1}.
    This is NOT the only convention, but is a valid consistent statistic.
    """

    def probs(a, b):
        m = (a_set == a) & (b_set == b)
        n = int(m.sum())
        if n == 0:
            return (np.nan, np.nan, np.nan)
        Am = A[m]
        Bm = B[m]
        p11 = float(np.mean((Am == 1) & (Bm == 1)))
        p10 = float(np.mean((Am == 1) & (Bm == 0)))
        p01 = float(np.mean((Am == 0) & (Bm == 1)))
        return p11, p10, p01

    p11_00, _, _ = probs(0, 0)
    _, p10_01, _ = probs(0, 1)
    _, _, p01_10 = probs(1, 0)
    p11_11, _, _ = probs(1, 1)

    return float(p11_00 - p10_01 - p01_10 - p11_11)


baseline_J = eberhard_J(a_set, b_set, A_det, B_det)
print(f"Baseline J (slice) = {baseline_J:.12e}")

## Step 7: Bin by θ and compute S(θ) and J(θ)


In [None]:
def binned_S(
    a_set, b_set, x, y, theta_bin, bins, min_valid_per_bin=20000
) -> pd.DataFrame:
    rows = []
    for k in range(bins):
        m = theta_bin == k
        sub = chsh_S_pm1(a_set[m], b_set[m], x[m], y[m])
        n_valid = sum(sub["N"].values())
        rows.append(
            {
                "bin": k,
                "n_valid": int(n_valid),
                "theta_center": float((k + 0.5) * 2 * np.pi / bins),
                "S": float(sub["S"]) if n_valid >= min_valid_per_bin else np.nan,
            }
        )
    return pd.DataFrame(rows)


def binned_J(
    a_set, b_set, A, B, theta_bin, bins, min_trials_per_bin=5000
) -> pd.DataFrame:
    rows = []
    for k in range(bins):
        m = theta_bin == k
        n = int(m.sum())
        if n < min_trials_per_bin:
            rows.append(
                {
                    "bin": k,
                    "n": n,
                    "theta_center": float((k + 0.5) * 2 * np.pi / bins),
                    "J": np.nan,
                }
            )
        else:
            Jk = eberhard_J(a_set[m], b_set[m], A[m], B[m])
            rows.append(
                {
                    "bin": k,
                    "n": n,
                    "theta_center": float((k + 0.5) * 2 * np.pi / bins),
                    "J": float(Jk),
                }
            )
    return pd.DataFrame(rows)


bs = binned_S(
    a_set,
    b_set,
    x,
    y,
    theta_bin,
    CFG.THETA_BINS,
    min_valid_per_bin=CFG.MIN_VALID_PER_BIN,
)
display(bs)

plt.figure()
plt.plot(bs["theta_center"], bs["S"], marker="o")
plt.xlabel("theta (bin center)")
plt.ylabel("CHSH S (binned)")
plt.title("S(theta)")
plt.grid(True, alpha=0.3)
plt.show()

bj = binned_J(
    a_set, b_set, A_det, B_det, theta_bin, CFG.THETA_BINS, min_trials_per_bin=5000
)
display(bj)

plt.figure()
plt.plot(bj["theta_center"], bj["J"], marker="o")
plt.ticklabel_format(axis="y", style="sci", scilimits=(0, 0))
plt.xlabel("theta (bin center)")
plt.ylabel("Eberhard J (binned)")
plt.title("J(theta)")
plt.grid(True, alpha=0.3)
plt.show()

## Step 8: Modulation statistic Δχ²

We fit:

- flat: y=c
- sine: y=c + A\*sin(theta + phi)

Statistic: Δχ² = SSE_flat - SSE_sine (bigger = more modulation)


In [None]:
def fit_delta_chi2(theta_centers: np.ndarray, y: np.ndarray) -> Dict[str, Any]:
    ok = np.isfinite(y)
    x = theta_centers[ok].astype(np.float64)
    y = y[ok].astype(np.float64)
    if len(y) < 6:
        return {"delta_chi2": np.nan, "bins_used": int(len(y))}

    c0 = float(np.mean(y))
    sse_flat = float(np.sum((y - c0) ** 2))

    # robust fit without scipy: scan phi, solve linear least squares for (c,A)
    phis = np.linspace(-np.pi, np.pi, 181)
    best = None
    for phi in phis:
        s = np.sin(x + phi)
        M = np.column_stack([np.ones_like(s), s])
        coef, *_ = np.linalg.lstsq(M, y, rcond=None)
        c, A = float(coef[0]), float(coef[1])
        yhat = c + A * s
        sse = float(np.sum((y - yhat) ** 2))
        if best is None or sse < best["sse"]:
            best = {"c": c, "A": A, "phi": float(phi), "sse": sse}

    return {
        "delta_chi2": float(sse_flat - best["sse"]),
        "c": float(best["c"]),
        "A": float(best["A"]),
        "phi": float(best["phi"]),
        "sse_flat": float(sse_flat),
        "sse_sine": float(best["sse"]),
        "bins_used": int(len(y)),
    }


obs_S = fit_delta_chi2(bs["theta_center"].to_numpy(), bs["S"].to_numpy())
obs_J = fit_delta_chi2(bj["theta_center"].to_numpy(), bj["J"].to_numpy())
print("Observed modulation (S):", obs_S)
print("Observed modulation (J):", obs_J)

## Step 9: Drift-preserving permutation test

Null design (reviewer-proof):

- keep settings sequence fixed => θ fixed
- preserve drift by shuffling within time blocks
- preserve setting-conditioned statistics by shuffling within (a,b) strata
- for CHSH: shuffle the JOINT (x,y) code so correlations are broken but marginals preserved


In [None]:
def perm_test_delta_chi2_S(
    a_set,
    b_set,
    x,
    y,
    theta_bin,
    bins: int,
    block_size: int,
    perm_rounds: int,
    min_valid_per_bin: int,
    seed: int = 123,
) -> Dict[str, Any]:
    rng = np.random.default_rng(seed)
    n = len(a_set)

    # observed
    bs0 = binned_S(
        a_set, b_set, x, y, theta_bin, bins, min_valid_per_bin=min_valid_per_bin
    )
    obs = fit_delta_chi2(bs0["theta_center"].to_numpy(), bs0["S"].to_numpy())

    # stratum id 0..3
    sp = ((a_set.astype(np.int8) << 1) | b_set.astype(np.int8)).astype(np.int8)

    dec = (x != 0) & (y != 0)
    code = np.zeros(n, dtype=np.int8)
    # 4 states based on sign bits
    code[dec] = (
        ((x[dec] == 1).astype(np.int8) << 1) | ((y[dec] == 1).astype(np.int8))
    ).astype(np.int8)

    null_deltas = np.empty(perm_rounds, dtype=np.float64)

    for r in tqdm(range(perm_rounds), desc="Perm S"):
        code_p = code.copy()
        for start in range(0, n, block_size):
            end = min(n, start + block_size)
            idx_block = np.arange(start, end)
            for s in range(4):
                idx = idx_block[(sp[idx_block] == s) & dec[idx_block]]
                if len(idx) > 2:
                    code_p[idx] = rng.permutation(code_p[idx])

        x_p = x.copy()
        y_p = y.copy()
        x_p[dec] = np.where(((code_p[dec] >> 1) & 1) == 1, +1, -1).astype(np.int8)
        y_p[dec] = np.where((code_p[dec] & 1) == 1, +1, -1).astype(np.int8)

        bs_r = binned_S(
            a_set, b_set, x_p, y_p, theta_bin, bins, min_valid_per_bin=min_valid_per_bin
        )
        fit_r = fit_delta_chi2(bs_r["theta_center"].to_numpy(), bs_r["S"].to_numpy())
        null_deltas[r] = fit_r["delta_chi2"]

    p = float(np.mean(null_deltas >= obs["delta_chi2"]))
    return {
        "observed": obs,
        "null_deltas": null_deltas,
        "p_value": p,
        "bs_observed": bs0,
    }


def perm_test_delta_chi2_J(
    a_set,
    b_set,
    A_det,
    B_det,
    theta_bin,
    bins: int,
    block_size: int,
    perm_rounds: int,
    min_trials_per_bin: int,
    seed: int = 123,
) -> Dict[str, Any]:
    rng = np.random.default_rng(seed)
    n = len(a_set)

    bj0 = binned_J(
        a_set,
        b_set,
        A_det,
        B_det,
        theta_bin,
        bins,
        min_trials_per_bin=min_trials_per_bin,
    )
    obs = fit_delta_chi2(bj0["theta_center"].to_numpy(), bj0["J"].to_numpy())

    sp = ((a_set.astype(np.int8) << 1) | b_set.astype(np.int8)).astype(np.int8)

    # joint detection code 0..3 for (A_det,B_det)
    code = ((A_det.astype(np.int8) << 1) | B_det.astype(np.int8)).astype(np.int8)
    null_deltas = np.empty(perm_rounds, dtype=np.float64)

    for r in tqdm(range(perm_rounds), desc="Perm J"):
        code_p = code.copy()
        for start in range(0, n, block_size):
            end = min(n, start + block_size)
            idx_block = np.arange(start, end)
            for s in range(4):
                idx = idx_block[(sp[idx_block] == s)]
                if len(idx) > 2:
                    code_p[idx] = rng.permutation(code_p[idx])

        A_p = ((code_p >> 1) & 1).astype(np.int8)
        B_p = (code_p & 1).astype(np.int8)

        bj_r = binned_J(
            a_set,
            b_set,
            A_p,
            B_p,
            theta_bin,
            bins,
            min_trials_per_bin=min_trials_per_bin,
        )
        fit_r = fit_delta_chi2(bj_r["theta_center"].to_numpy(), bj_r["J"].to_numpy())
        null_deltas[r] = fit_r["delta_chi2"]

    p = float(np.mean(null_deltas >= obs["delta_chi2"]))
    return {
        "observed": obs,
        "null_deltas": null_deltas,
        "p_value": p,
        "bj_observed": bj0,
    }

## Step 10: Run permutation tests + plot null distributions


In [None]:
ptS = perm_test_delta_chi2_S(
    a_set,
    b_set,
    x,
    y,
    theta_bin,
    bins=CFG.THETA_BINS,
    block_size=CFG.BLOCK_SIZE,
    perm_rounds=CFG.PERM_ROUNDS,
    min_valid_per_bin=CFG.MIN_VALID_PER_BIN,
    seed=CFG.SEED,
)
print("Permutation p-value (S):", ptS["p_value"])
print("Observed fit (S):", ptS["observed"])

plt.figure()
plt.hist(ptS["null_deltas"][np.isfinite(ptS["null_deltas"])], bins=30)
plt.axvline(ptS["observed"]["delta_chi2"], linewidth=2)
plt.xlabel("delta_chi2 null (S)")
plt.ylabel("count")
plt.title(f"Null distribution (S), p={ptS['p_value']:.4f}")
plt.grid(True, alpha=0.3)
plt.show()

ptJ = perm_test_delta_chi2_J(
    a_set,
    b_set,
    A_det,
    B_det,
    theta_bin,
    bins=CFG.THETA_BINS,
    block_size=CFG.BLOCK_SIZE,
    perm_rounds=CFG.PERM_ROUNDS,
    min_trials_per_bin=5000,
    seed=CFG.SEED,
)
print("Permutation p-value (J):", ptJ["p_value"])
print("Observed fit (J):", ptJ["observed"])

plt.figure()
plt.hist(ptJ["null_deltas"][np.isfinite(ptJ["null_deltas"])], bins=30)
plt.axvline(ptJ["observed"]["delta_chi2"], linewidth=2)
plt.xlabel("delta_chi2 null (J)")
plt.ylabel("count")
plt.title(f"Null distribution (J), p={ptJ['p_value']:.4f}")
plt.grid(True, alpha=0.3)
plt.show()

## Step 11: No-signalling check (must pass)

Within each θ bin:

- Alice marginal P(x=+1 | a, b=0) ≈ P(x=+1 | a, b=1)
- Bob marginal P(y=+1 | b, a=0) ≈ P(y=+1 | b, a=1)

If this fails systematically, you likely have an artifact/selection bias.


In [None]:
def nosignalling_check(a_set, b_set, x, y, theta_bin, bins) -> pd.DataFrame:
    rows = []
    for k in range(bins):
        base = (theta_bin == k) & (x != 0) & (y != 0)

        # Alice marginal vs Bob setting
        for a in [0, 1]:
            m0 = base & (a_set == a) & (b_set == 0)
            m1 = base & (a_set == a) & (b_set == 1)
            if m0.sum() and m1.sum():
                p0 = float(np.mean(x[m0] == 1))
                p1 = float(np.mean(x[m1] == 1))
                rows.append(
                    {
                        "bin": k,
                        "side": "Alice",
                        "fixed": f"a={a}",
                        "p_plus_remote0": p0,
                        "p_plus_remote1": p1,
                        "diff": p0 - p1,
                    }
                )

        # Bob marginal vs Alice setting
        for b in [0, 1]:
            m0 = base & (b_set == b) & (a_set == 0)
            m1 = base & (b_set == b) & (a_set == 1)
            if m0.sum() and m1.sum():
                p0 = float(np.mean(y[m0] == 1))
                p1 = float(np.mean(y[m1] == 1))
                rows.append(
                    {
                        "bin": k,
                        "side": "Bob",
                        "fixed": f"b={b}",
                        "p_plus_remote0": p0,
                        "p_plus_remote1": p1,
                        "diff": p0 - p1,
                    }
                )

    return pd.DataFrame(rows)


ns = nosignalling_check(a_set, b_set, x, y, theta_bin, CFG.THETA_BINS)
display(ns)
print("Median |diff|:", float(np.median(np.abs(ns["diff"].to_numpy()))))

## If your results look "too flat" or p ~ 1.0, the usual causes are:

1. Wrong outcome decoding:

   - Your ±1 mapping is picking the wrong bits.
   - Symptom: baseline CHSH S ~ 0, or S doesn't violate, or S(theta) looks constant.

2. File/run doesn’t contain usable per-trial outcomes (only settings):

   - Symptom: no per-trial arrays matching settings length.

3. You tested a detection-only statistic (J) on a dataset where it’s not informative:
   - J(theta) can drift around a constant negative baseline and still show no modulation.

What to do next:

- First make sure baseline CHSH S violates on your decoded ±1 outcomes.
- Only then do S(theta) + permutation.
- If baseline CHSH doesn't violate, switch to another run file OR fix detector-bit mapping.
