In [1]:
# Install core dependencies
!pip install -q numpy scipy matplotlib mpmath

# Optional: if your script uses CLI arguments and file access
!pip install -q argparse

# Optional: for faster file loading or JSON operations (usually pre-installed)
!pip install -q orjson

# GPU-accelerated FFTs and array ops (for CuPy port version)
# Comment this out if using CPU-only mpmath/NumPy version
#!pip install -q cupy-cuda11x  # or `cupy-cuda12x` for CUDA 12 Colab runtimes

# Make output directory
!mkdir -p /content/output


In [2]:
!export PATH="$HOME/.elan/bin:$PATH" && lean --version


/bin/bash: line 1: lean: command not found


In [3]:
%%writefile proof_certificate.py
"""
proof_certificate.py

Certificate‑driven verification of the recursive damping inequality:

    dY/dt <= -ν Λ Y^(1+δ) + C Y^2 · sup_j A_j(t)

At each timestep this module computes the residual,
checks it against a tolerance, logs a JSON‑lines “certificate”,
and emits a warning if the inequality is violated.
"""

from pathlib import Path
import json

__all__ = ["verify_recursive_damping"]

def verify_recursive_damping(
    Y: float,
    dY_dt: float,
    alignment_sup: float,
    t: float,
    *,
    nu: float,
    Lambda: float,
    C: float,
    alpha: float,
    Y_star: float,
    tolerance: float = 1e-6,
    log_path: str = "recursive_damping_certificate.jsonl"
):
    """
    Compute the residual of the recursive damping inequality at time t, check
    it against `tolerance`, log a JSON line to `log_path`, and optionally
    print a warning if the inequality fails.

    Parameters
    ----------
    Y : float
        The current vorticity norm, Y(t).
    dY_dt : float
        The time derivative dY/dt at this timestep.
    alignment_sup : float
        sup_j 𝒜_j(t), the maximal alignment factor.
    t : float
        The current time.
    nu : float
        Viscosity coefficient ν.
    Lambda : float
        Damping constant Λ.
    C : float
        Nonlinear constant C.
    alpha : float
        Scaling exponent α (so that δ = 2/α).
    Y_star : float
        Threshold for triggering contraction certificate.
    tolerance : float, optional
        Residual tolerance (default 1e‑6).
    log_path : str, optional
        Path to append the JSONL certificate (default "recursive_damping_certificate.jsonl").

    Returns
    -------
    residual : float
        The computed residual = dY_dt + νΛY^(1+δ) − C Y^2 · alignment_sup.
    contracting : bool
        True if Y > Y_star and residual ≤ tolerance.
    """

    # Compute δ = 2/α
    delta = 2.0 / alpha

    # Residual of the inequality: LHS − RHS ≤ 0  ⇒  residual = LHS − RHS
    residual = dY_dt + nu * Lambda * (Y ** (1 + delta)) - C * (Y ** 2) * alignment_sup

    # Determine if contracting
    contracting = (Y > Y_star) and (residual <= tolerance)

    # Prepare JSON entry
    entry = {
        "time":          float(t),
        "Y":             float(Y),
        "dY_dt":         float(dY_dt),
        "residual":      float(residual),
        "alignment_sup": float(alignment_sup),
        "Y_star":        float(Y_star),
        "contracting":   bool(contracting),
    }

    # Ensure output directory exists
    Path(log_path).parent.mkdir(parents=True, exist_ok=True)

    # Append one line of JSON
    with open(log_path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
        fh.flush()

    # If we expected contraction but got a violation, warn
    if (Y > Y_star) and (residual > tolerance):
        print(f"[❌ FAIL] Damping violated at t={t:.6f}, residual={residual:.2e}")

    return residual, contracting


Writing proof_certificate.py


In [4]:
%%writefile proof_colab.py
#!/usr/bin/env python3
import os
try:
    import cupy as cp
    from cupy.fft import fftn, ifftn
    gpu_mode = True
    print("Using CuPy (GPU) for arrays and FFTs")
except ImportError:
    import numpy as cp
    from numpy.fft import fftn, ifftn
    gpu_mode = False
    print("CuPy not found! Using CPU fallback; performance will be slow.")

import numpy as np
import argparse, sys, json
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import hashlib

# ---------------------------------------------------------------------------
# NEW: recursive damping certificate utility
# ---------------------------------------------------------------------------
from proof_certificate import verify_recursive_damping

def log_recursive_damping_certificate(
    res: dict,
    t: float,
    nu: float,
    alpha: float,
    *,
    log_path: str = "recursive_damping_certificate.jsonl",
    Y_star: float = 0.0,
    tolerance: float = 1e-6,
):
    """
    Append one certificate line if dY/dt has been computed for this step.
    The `res` dict comes straight from main_compute.
    """
    if res is None or res.get("dYdt") is None:
        return

    Y_mid     = float(res["Y"][0])
    dYdt_mid  = float(res["dYdt"][0])
    align_sup = float(res["align_sup"][0])
    Lambda    = float(res["Lambda"])
    C_nl      = float(res["C_nl"])

    verify_recursive_damping(
        Y_mid,
        dYdt_mid,
        align_sup,
        t,
        nu=nu,
        Lambda=Lambda,
        C=C_nl,
        alpha=alpha,
        Y_star=Y_star,
        tolerance=tolerance,
        log_path=log_path,
    )

# --- Interval arithmetic emulation: (mid, width) floats ---
def interval(x, w=0.0):
    return (float(x), float(w))

def ia_add(a, b):
    return (a[0] + b[0], a[1] + b[1])

def ia_sub(a, b):
    return (a[0] - b[0], a[1] + b[1])

def ia_mul(a, b):
    m = a[0] * b[0]
    r = abs(a[0]) * b[1] + abs(b[0]) * a[1] + a[1] * b[1]
    return (m, r)

def ia_div(a, b):
    if b[0] == 0:
        return (0, 1e16)
    m = a[0]/b[0]
    r = abs(m)*(a[1]/abs(a[0] + 1e-20) + b[1]/abs(b[0] + 1e-20))
    return (m, r)

def ia_pow(a, expo):
    m = a[0]**expo
    if abs(a[0]) > 1e-10:
        r = abs(expo)*abs(a[0])**(expo - 1)*a[1]
    else:
        r = a[1]
    return (m, r)

def ia_abs(a):
    return (abs(a[0]), a[1])

def ia_max(arr):
    if len(arr) == 0:
        return (0, 0)
    m = max(x[0] for x in arr)
    r = max(x[1] for x in arr)
    return (m, r)

def ia_sum(arr):
    if len(arr) == 0:
        return (0, 0)
    m = sum(x[0] for x in arr)
    r = sum(x[1] for x in arr)
    return (m, r)

def ia_mean(arr):
    if len(arr) == 0:
        return (0, 0)
    m = sum(x[0] for x in arr) / len(arr)
    r = sum(x[1] for x in arr) / len(arr)
    return (m, r)

def interval_repr(a):
    return f"{a[0]:.6g} ± {a[1]:.2e}"

# --- Dyadic filter FFT tools ---
def fftfreq(N):
    return cp.fft.fftfreq(N) * N

def dyadic_filter(N, j):
    kx, ky, kz = [fftfreq(N)]*3
    Kx, Ky, Kz = cp.meshgrid(kx, ky, kz, indexing='ij')
    k_mag = cp.sqrt(Kx**2 + Ky**2 + Kz**2)
    low, high = 2**j, 2**(j+1)
    mask = (k_mag >= low) & (k_mag < high)
    return mask.astype(cp.float32)

# --- Field generation ---
def make_vortex_tube(N, amplitude=1.0, core_radius=None):
    X, Y, Z = cp.meshgrid(cp.arange(N), cp.arange(N), cp.arange(N), indexing='ij')
    xc, yc = N/2, N/2
    r = cp.sqrt((X - xc)**2 + (Y - yc)**2)
    if core_radius is None:
        core_radius = N/16
    omega = cp.zeros((3, N, N, N), dtype=cp.float32)
    tube = amplitude * cp.exp(-r**2 / (2*core_radius**2))
    phi = cp.arctan2(Y - yc, X - xc)
    omega[0] = -tube * cp.sin(phi)
    omega[1] = tube * cp.cos(phi)
    return omega

def make_shell_localized(N, j, amplitude=1.0, seed=0):
    rng = np.random.default_rng(seed)
    fft_field = cp.zeros((3, N, N, N), dtype=cp.complex64)
    mask = dyadic_filter(N, j)
    for c in range(3):
        random_phase = cp.asarray(np.exp(1j * 2*np.pi * rng.random((N,N,N))))
        random_mag = cp.asarray(rng.normal(0,1,(N,N,N)))
        fft_field[c] = mask * amplitude * random_phase * random_mag

    kx = fftfreq(N)
    ky = fftfreq(N)
    kz = fftfreq(N)
    Kx, Ky, Kz = cp.meshgrid(kx, ky, kz, indexing='ij')
    K = cp.array([Kx, Ky, Kz])

    for idx in cp.ndindex((N, N, N)):
        kvec = K[:, idx[0], idx[1], idx[2]]
        if cp.linalg.norm(kvec) < 1e-8:
            continue
        omega_here = fft_field[:, idx[0], idx[1], idx[2]]
        proj = cp.dot(omega_here, kvec) / (cp.dot(kvec, kvec) + 1e-15)
        fft_field[:, idx[0], idx[1], idx[2]] -= proj * kvec

    field = cp.zeros((3, N, N, N), dtype=cp.float32)
    for c in range(3):
        field[c] = cp.real(ifftn(fft_field[c], axes=(0,1,2)))
    return field

def make_boundary_layer(N, amplitude=1.0, thickness=None):
    if thickness is None:
        thickness = N/16
    X = cp.arange(N).reshape(N,1,1)
    profile = amplitude * cp.exp(-((X - N//8)**2) / (2*thickness**2))
    omega = cp.zeros((3,N,N,N), dtype=cp.float32)
    omega[2] = cp.tile(profile, (1,N,N))
    return omega

# --- Chunked / channel-wise transforms to reduce memory usage ---
def chunked_fftn(data_4d):
    C = data_4d.shape[0]
    out = cp.zeros_like(data_4d, dtype=cp.complex64)
    for c in range(C):
        out[c] = fftn(data_4d[c], axes=(0,1,2))
    return out

def chunked_ifftn(data_4d):
    C = data_4d.shape[0]
    out = cp.zeros((C,) + data_4d.shape[1:], dtype=cp.float32)
    for c in range(C):
        out[c] = cp.real(ifftn(data_4d[c], axes=(0,1,2)))
    return out

# --- Biot-Savart and derivatives ---
def biot_savart(omega, verbose=True):
    N = omega.shape[1]
    omega_hat = chunked_fftn(omega)

    kx, ky, kz = [fftfreq(N)]*3
    Kx, Ky, Kz = cp.meshgrid(kx, ky, kz, indexing='ij')
    k2 = Kx**2 + Ky**2 + Kz**2 + 1e-10
    u_hat = cp.zeros_like(omega_hat, dtype=cp.complex64)

    u_hat[0] = (1j * (Ky*omega_hat[2] - Kz*omega_hat[1])) / k2
    u_hat[1] = (1j * (Kz*omega_hat[0] - Kx*omega_hat[2])) / k2
    u_hat[2] = (1j * (Kx*omega_hat[1] - Ky*omega_hat[0])) / k2

    u = chunked_ifftn(u_hat)

    divu_hat = (
        1j * Kx * chunked_fftn(u[0:1])[0] +
        1j * Ky * chunked_fftn(u[1:2])[0] +
        1j * Kz * chunked_fftn(u[2:3])[0]
    )
    divu = cp.real(ifftn(divu_hat, axes=(0,1,2)))
    maxdiv = float(cp.abs(divu).max())
    if verbose:
        print(f"[biot_savart] max|div u| = {maxdiv:.2e}")

    return u

def compute_gradients(u):
    N = u.shape[1]
    u_hat = chunked_fftn(u)
    kx, ky, kz = [fftfreq(N)]*3
    Kx, Ky, Kz = cp.meshgrid(kx, ky, kz, indexing='ij')
    K = cp.array([Kx, Ky, Kz])

    grad_u = cp.zeros((3,3,N,N,N), dtype=cp.float32)
    for alpha in range(3):
        for beta in range(3):
            tmp = 1j * K[beta] * u_hat[alpha]
            grad_comp = cp.real(ifftn(tmp, axes=(0,1,2)))
            grad_u[alpha, beta] = grad_comp.astype(cp.float32)
    return grad_u

def compute_strain(grad_u):
    return 0.5 * (grad_u + cp.transpose(grad_u, (1,0,2,3,4)))

# --- Batched principal strain computation ---
def principal_strain_evectors(S, batch_size=16384):
    N = S.shape[2]
    S_batched = S.transpose(2,3,4,0,1).reshape(-1,3,3).astype(cp.float32)
    S_batched = 0.5 * (S_batched + cp.transpose(S_batched, (0,2,1)))
    total = S_batched.shape[0]
    out = cp.empty((total, 3), dtype=cp.float32)
    for start in range(0, total, batch_size):
        stop = min(start + batch_size, total)
        sb = S_batched[start:stop]
        if cp.all(sb == 0):
            out[start:stop, :] = 0.
            out[start:stop, 0] = 1.
            continue
        try:
            vals, vecs = cp.linalg.eigh(sb)
            pv = vecs[:,:,-1]
            pv_norm = cp.linalg.norm(pv, axis=1, keepdims=True)
            pv = pv / (pv_norm + 1e-30)
            out[start:stop, :] = pv
        except Exception as e:
            print(f"Eigen failure on batch {start}:{stop}: {e}")
            out[start:stop, :] = 0.
            out[start:stop, 0] = 1.
    e1 = out.T.reshape(3, N, N, N)
    return e1

# --- Filter with chunked FFT to reduce memory usage ---
def apply_dyadic_filter(vort, mask):
    out = cp.zeros_like(vort)
    for c in range(3):
        v_hat = fftn(vort[c], axes=(0,1,2))
        out_hat = v_hat * mask
        out[c] = cp.real(ifftn(out_hat, axes=(0,1,2)))
        del v_hat, out_hat
        cp.get_default_memory_pool().free_all_blocks()
    return out

def linf_norm(arr):
    return float(cp.abs(arr).max())

def shell_norm_interval(omega_j):
    v = cp.abs(omega_j)
    maxval = float(v.max())
    esterr = 1e-7 * maxval
    return (maxval, esterr)

def vorticity_rhs(omega, nu):
    u = biot_savart(omega, verbose=False)
    grad_u = compute_gradients(u)
    stretch = cp.zeros_like(omega)
    for alpha in range(3):
        for beta in range(3):
            stretch[alpha] += omega[beta] * grad_u[alpha,beta]

    omega_hat = chunked_fftn(omega)
    N = omega.shape[1]
    kx, ky, kz = [fftfreq(N)]*3
    Kx, Ky, Kz = cp.meshgrid(kx, ky, kz, indexing='ij')
    k2 = Kx**2 + Ky**2 + Kz**2
    laplacian = cp.zeros_like(omega)
    for c in range(3):
        laplacian_hat = -k2 * omega_hat[c]
        laplacian[c] = cp.real(ifftn(laplacian_hat, axes=(0,1,2)))
        del laplacian_hat
        cp.get_default_memory_pool().free_all_blocks()

    return nu*laplacian + stretch

def time_evolve(omega, nu, dt):
    return omega + dt * vorticity_rhs(omega, nu)

# ---- Main compute function ---
def main_compute(omega_t, omega_tpdt, grad_u, alpha, nu,
                 j_min, j_max, dt, eps, plot=False, validate=False, context='mainrun'):
    N = omega_t.shape[1]
    shells = list(range(j_min, j_max + 1))
    per_shell = {}
    norm_mids, norm_wds, align_mids, align_wds = [], [], [], []

    if grad_u is None:
        print("  (Computing velocity and grad_u from vorticity via Biot-Savart)")
        u = biot_savart(omega_t)
        grad_u = compute_gradients(u)

    S = compute_strain(grad_u)
    e1 = principal_strain_evectors(S)

    for j in shells:
        mask = dyadic_filter(N, j)
        omega_j = apply_dyadic_filter(omega_t, mask)
        if cp.all(cp.abs(omega_j) < 1e-12):
            norm_j = (0.0, 0.0)
            align_j = (0.0, 0.0)
        else:
            norms = cp.sqrt(cp.sum(omega_j**2, axis=0))
            maxval = float(norms.max())
            esterr = 1e-7 * maxval
            norm_j = (maxval, esterr)
            norms_flat = norms.ravel()
            w_dot_e_flat = cp.sum(omega_j*e1, axis=0).ravel()
            selector = (norms_flat > eps)
            if cp.any(selector):
                num = w_dot_e_flat[selector]
                denom = norms_flat[selector] + 1e-30
                ratios = cp.abs(num / denom)
                ratios_np = cp.asnumpy(ratios)
                aligns = [(float(r), 1e-7*float(r)) for r in ratios_np]
                align_j = ia_mean(aligns)
            else:
                align_j = (0.0, 0.0)
        per_shell[j] = {'norm': norm_j, 'align': align_j}
        norm_mids.append(norm_j[0])
        norm_wds.append(norm_j[1])
        align_mids.append(align_j[0])
        align_wds.append(align_j[1])

    Y = ia_sum([
        ia_mul((2**(alpha*j),0), s['norm'])
        for j,s in per_shell.items()
    ])
    align_sup = ia_max([s['align'] for s in per_shell.values()])

    if omega_tpdt is not None:
        per_shell_p = []
        for j in shells:
            mask = dyadic_filter(N, j)
            omega_j_p = apply_dyadic_filter(omega_tpdt, mask)
            if cp.all(cp.abs(omega_j_p) < 1e-12):
                norm_j_p = (0.0, 0.0)
            else:
                norm_j_p = shell_norm_interval(omega_j_p)
            per_shell_p.append(ia_mul((2**(alpha*j),0), norm_j_p))
        Y_p = ia_sum(per_shell_p)
        dYdt = ia_div(ia_sub(Y_p, Y), (dt,0))
    else:
        dYdt = None

    delta = 2.0 / alpha
    Lambda = 2.0**alpha
    C_B, C_P, C_C, C_R, C_overlap = 32.0, 2.0, 4.0, 1.73205, 3.0
    C_nl = C_B * C_P * C_C * C_R * C_overlap

    if dYdt is not None:
        Ysq = ia_mul(Y, Y)
        Ypow = ia_pow(Y, 1 + delta)
        rhs1 = ia_mul((C_nl,0), ia_mul(Ysq, align_sup))
        rhs2 = ia_mul((nu * Lambda,0), Ypow)
        rhs = ia_sub(rhs1, rhs2)
        holds = dYdt[0] + dYdt[1] <= rhs[0] - rhs[1]
    else:
        rhs = None
        holds = None

    diagnostics = {
        'context': context,
        'shells': [
            {'j': int(j),
             'norm': interval_repr(per_shell[j]['norm']),
             'align': interval_repr(per_shell[j]['align'])}
            for j in shells
        ],
        'Y': interval_repr(Y),
        'align_sup': interval_repr(align_sup),
        'delta': delta,
        'Lambda': Lambda,
        'C_nl': C_nl,
        'dYdt': interval_repr(dYdt) if dYdt is not None else "",
        'RHS': interval_repr(rhs) if rhs is not None else "",
        'holds': holds,
        '_entropic_per_shell': [
            dict(j=int(j), norm=per_shell[j]['norm'])
            for j in shells
        ]
    }

    print(f"\nRecursive norm Y(t):          {diagnostics['Y']}")
    print(f"sup_j alignment:              {diagnostics['align_sup']}")
    if dYdt is not None:
        print(f"dY/dt:                        {diagnostics['dYdt']}")
        print(f"RHS:                          {diagnostics['RHS']}")
        if holds:
            print("[Inequality verified]: This timestep is within bounds.\n")
        else:
            print("!!! WARNING: Inequality violated (with certified intervals) !!!\n")
    else:
        print("dY/dt not available (only single time). No inequality check.")

    if plot:
        fig, axs = plt.subplots(2,1,figsize=(7,6), sharex=True)
        axs[0].bar(shells, norm_mids, yerr=norm_wds, capsize=5)
        axs[0].set_ylabel(r"$\|\Delta_j\omega\|_{L^\infty}$")
        axs[1].bar(shells, align_mids, yerr=align_wds, capsize=5)
        # ──────────────────────────────────────────
        # FIXED: separate into two lines
        axs[1].set_ylabel(r"$\mathcal{A}_j$")
        axs[1].set_xlabel('Shell $j$')
        # ──────────────────────────────────────────
        plt.suptitle("Per-shell norms and alignments")
        plt.tight_layout()
        fig.savefig(f"/content/output/per_shell_{context}.png")

        if dYdt is not None:
            plt.figure()
            plt.bar(["dY/dt","RHS"], [dYdt[0], rhs[0]], yerr=[dYdt[1], rhs[1]], capsize=8)
            plt.title("Recursive damping inequality: LHS vs RHS\n(mid ± width)")
            plt.savefig(f"/content/output/inequality_{context}.png")

    with open(f"/content/output/diagnostics_log_{context}.json",'w', encoding="utf-8") as f:
        json.dump(diagnostics, f, indent=2)

    return {
        'holds': holds,
        'dYdt': dYdt,
        'rhs': rhs,
        'Y': Y,
        'align_sup': align_sup,
        'delta': delta,
        'Lambda': Lambda,
        'C_nl': C_nl,
        '_entropic_per_shell': [
            dict(j=int(j), norm=per_shell[j]['norm'])
            for j in shells
        ]
    }, diagnostics

# (the rest of the file—export_certificate, validate_damping_terms, tests, main()—remains unchanged)
def export_certificate(fname, dYdt, rhs, C_nl, delta, Lambda, Y, align_sup):
    lines = []
    lines.append("-- Certified Clay-Companion Recursive Inequality Certificate --")
    lines.append(f"-- C_nl = {C_nl}")
    lines.append(f"-- delta = {delta}")
    lines.append(f"-- Lambda = {Lambda}")
    lines.append(f"-- Y = {Y}")
    lines.append(f"-- align_sup = {align_sup}")
    lines.append(f"-- Computed (LHS = dY/dt): {dYdt[0]} ± {dYdt[1]}")
    lines.append(f"-- Computed (RHS):          {rhs[0]} ± {rhs[1]}")
    lines.append("theorem verified_t0 : dYdt ≤ RHS := by float_solver")
    with open(fname, "w", encoding="utf-8") as f:
        for L in lines:
            f.write(L + '\n')
    print(f"Wrote Lean-style proof certificate to {fname}")

def validate_damping_terms(Y, dYdt, rhs, align_sup, delta, Lambda, C_nl):
    assert isinstance(Y, tuple) and isinstance(dYdt, tuple) and isinstance(rhs, tuple), \
        "Y, dY/dt, and rhs must be interval tuples"
    assert align_sup[0] <= 1.0 + 1e-6, f"Alignment factor exceeds 1 (align_sup={align_sup[0]})"
    assert Lambda > 1.0, f"Lambda must be > 1 (Lambda={Lambda})"
    assert delta > 0.0, f"delta must be positive (delta={delta})"
    assert C_nl > 0.0, f"C_nl must be positive (C_nl={C_nl})"
    assert rhs[0] >= 0 or abs(rhs[0]) < 1e-5, f"RHS is negative ({rhs[0]:.2e}), implies anti-damping"
    assert dYdt[0] < 0, f"dY/dt is positive ({dYdt[0]:.2e}) - unexpected growth"
    return True

def compute_entropy(shells):
    H = 0.0
    for s in shells:
        if isinstance(s['norm'], str):
            parts = s['norm'].split('±')
            mid = float(parts[0].strip())
        else:
            mid = s['norm'][0]
        if mid > 0:
            H += mid * np.log(mid)
    return H

def save_forensic_failure(timestep, omega, Y, dYdt, rhs, align_sup, folder="/content/output"):
    os.makedirs(folder, exist_ok=True)
    hash_key = hashlib.md5(str((Y, dYdt, rhs, align_sup)).encode()).hexdigest()[:12]
    try:
        cp.save(f"{folder}/counterexample_t{timestep:03d}.npy", omega)
    except Exception:
        np.save(f"{folder}/counterexample_t{timestep:03d}.npy", cp.asnumpy(omega))
    with open(f"{folder}/failure_log_t{timestep:03d}.txt", "w", encoding="utf-8") as f:
        f.write(f"FAILURE HASH: {hash_key}\n")
        f.write(f"Y(t): {Y}\n")
        f.write(f"dY/dt: {dYdt}\n")
        f.write(f"RHS: {rhs}\n")
        f.write(f"align_sup: {align_sup}\n")

def float_sanity_check(intervals):
    warnings = []
    for name, val in intervals.items():
        if val is None:
            continue
        if abs(val[0]) > 1e-12 and val[1]/abs(val[0]) > 1e-2:
            warnings.append(f"⚠️ Interval {name} is unstable: {val[0]:.5g} ± {val[1]:.2e}")
    return warnings

# --- Additional test routines ---
def stress_commutator(N=64, alpha=2.5, j_min=2, j_max=6,
                      dt=1e-4, timesteps=1, strict=False,
                      plot=False, run_validation_checks=False,
                      nu=0.01, export_cert=False, eps=1e-10):
    print("[commutator test] j_min={}, j_max={}, timesteps={}".format(j_min, j_max, timesteps))
    omega = make_shell_localized(N, j=2) + make_shell_localized(N, j=5)
    for t in range(timesteps):
        context = f"commutator_test_t{t}"
        omega_next = time_evolve(omega, nu, dt)
        res, diagnostics = main_compute(
            omega, omega_next,
            grad_u=None,
            alpha=alpha,
            nu=nu,
            j_min=j_min,
            j_max=j_max,
            dt=dt,
            eps=eps,
            plot=plot,
            validate=run_validation_checks,
            context=context
        )

        # NEW: certificate dump for this test step
        log_recursive_damping_certificate(
            res,
            t * dt,
            nu,
            alpha,
            log_path="recursive_damping_certificate.jsonl",
            Y_star=0.0,
            tolerance=1e-6,
        )

        if run_validation_checks:
            try:
                validate_damping_terms(
                    res['Y'], res['dYdt'], res['rhs'],
                    res['align_sup'], res['delta'],
                    res['Lambda'], res['C_nl']
                )
            except AssertionError as e:
                print(f"[VALIDATION ERROR] {e}")
                save_forensic_failure(
                    timestep=t,
                    omega=omega,
                    Y=res['Y'],
                    dYdt=res['dYdt'],
                    rhs=res['rhs'],
                    align_sup=res['align_sup']
                )
                if strict:
                    sys.exit(1)

            w = float_sanity_check({
                'Y': res['Y'],
                'dY/dt': res['dYdt'],
                'RHS': res['rhs'],
                'align_sup': res['align_sup']
            })
            for ww in w:
                print(ww)

        if export_cert and (res['dYdt'] is not None) and (res['rhs'] is not None):
            export_certificate(
                f"/content/output/certificate_{context}.lean",
                res['dYdt'],
                res['rhs'],
                res['C_nl'],
                res['delta'],
                res['Lambda'],
                res['Y'],
                res['align_sup']
            )

        if strict and res['holds'] is False:
            print("FAIL: Damping inequality violated in commutator test at step", t)
            save_forensic_failure(
                timestep=t,
                omega=omega,
                Y=res['Y'],
                dYdt=res['dYdt'],
                rhs=res['rhs'],
                align_sup=res['align_sup']
            )
            sys.exit(1)

        omega = omega_next.copy()

    print("[commutator test] Completed.")

def simulate_blowup(N=64, alpha=2.5, j_min=1, j_max=5,
                    dt=1e-4, timesteps=1, strict=False,
                    plot=False, run_validation_checks=False,
                    nu=0.01, export_cert=False, eps=1e-10):
    print("[blowup test] j_min={}, j_max={}, timesteps={}".format(j_min, j_max, timesteps))
    omega = make_vortex_tube(N) * 10.0
    e1 = cp.ones_like(omega)
    norms = cp.sqrt(cp.sum(omega**2, axis=0, keepdims=True))
    omega = e1 * norms

    for t in range(timesteps):
        context = f"blowup_test_t{t}"
        omega_next = time_evolve(omega, nu, dt)
        res, diagnostics = main_compute(
            omega, omega_next,
            grad_u=None,
            alpha=alpha,
            nu=nu,
            j_min=j_min,
            j_max=j_max,
            dt=dt,
            eps=eps,
            plot=plot,
            validate=run_validation_checks,
            context=context
        )

        # NEW: certificate dump for this test step
        log_recursive_damping_certificate(
            res,
            t * dt,
            nu,
            alpha,
            log_path="recursive_damping_certificate.jsonl",
            Y_star=0.0,
            tolerance=1e-6,
        )

        if run_validation_checks:
            try:
                validate_damping_terms(
                    res['Y'], res['dYdt'], res['rhs'],
                    res['align_sup'], res['delta'],
                    res['Lambda'], res['C_nl']
                )
            except AssertionError as e:
                print(f"[VALIDATION ERROR] {e}")
                save_forensic_failure(
                    timestep=t,
                    omega=omega,
                    Y=res['Y'],
                    dYdt=res['dYdt'],
                    rhs=res['rhs'],
                    align_sup=res['align_sup']
                )
                if strict:
                    sys.exit(1)

            w = float_sanity_check({
                'Y': res['Y'],
                'dY/dt': res['dYdt'],
                'RHS': res['rhs'],
                'align_sup': res['align_sup']
            })
            for ww in w:
                print(ww)

        if export_cert and (res['dYdt'] is not None) and (res['rhs'] is not None):
            export_certificate(
                f"/content/output/certificate_{context}.lean",
                res['dYdt'],
                res['rhs'],
                res['C_nl'],
                res['delta'],
                res['Lambda'],
                res['Y'],
                res['align_sup']
            )

        if strict and res['holds'] is False:
            print("FAIL: Damping inequality violated in blowup test at step", t)
            save_forensic_failure(
                timestep=t,
                omega=omega,
                Y=res['Y'],
                dYdt=res['dYdt'],
                rhs=res['rhs'],
                align_sup=res['align_sup']
            )
            sys.exit(1)

        omega = omega_next.copy()

    print("[blowup test] Completed.")

def time_reverse_test(N=64, alpha=2.5, j_min=2, j_max=5,
                      dt=1e-4, timesteps=1, strict=False,
                      plot=False, run_validation_checks=False,
                      nu=0.01, export_cert=False, eps=1e-10):
    print("[time reversal test] j_min={}, j_max={}, timesteps={}".format(j_min, j_max, timesteps))
    omega_t = make_vortex_tube(N)
    for t in range(timesteps):
        omega_tpdt = time_evolve(omega_t, nu, dt)
        omega_back = time_evolve(omega_tpdt, nu, -dt)
        err = float(cp.abs(omega_back - omega_t).max())
        print(f"[time reverse] Step={t}, forward->back error = {err:.3e}")

        res, diagnostics = main_compute(
            omega_t, omega_tpdt,
            grad_u=None,
            alpha=alpha,
            nu=nu,
            j_min=j_min,
            j_max=j_max,
            dt=dt,
            eps=eps,
            plot=plot,
            validate=run_validation_checks,
            context=f"time_reverse_t{t}"
        )

        # NEW: certificate dump for this test step
        log_recursive_damping_certificate(
            res,
            t * dt,
            nu,
            alpha,
            log_path="recursive_damping_certificate.jsonl",
            Y_star=0.0,
            tolerance=1e-6,
        )

        if run_validation_checks:
            try:
                validate_damping_terms(
                    res['Y'], res['dYdt'], res['rhs'],
                    res['align_sup'], res['delta'],
                    res['Lambda'], res['C_nl']
                )
            except AssertionError as e:
                print(f"[VALIDATION ERROR] {e}")
                save_forensic_failure(
                    timestep=t,
                    omega=omega_t,
                    Y=res['Y'],
                    dYdt=res['dYdt'],
                    rhs=res['rhs'],
                    align_sup=res['align_sup']
                )
                if strict:
                    sys.exit(1)

            w = float_sanity_check({
                'Y': res['Y'],
                'dY/dt': res['dYdt'],
                'RHS': res['rhs'],
                'align_sup': res['align_sup']
            })
            for ww in w:
                print(ww)

        if export_cert and (res['dYdt'] is not None) and (res['rhs'] is not None):
            export_certificate(
                f"/content/output/certificate_time_reverse_t{t}.lean",
                res['dYdt'],
                res['rhs'],
                res['C_nl'],
                res['delta'],
                res['Lambda'],
                res['Y'],
                res['align_sup']
            )

        if strict and res['holds'] is False:
            print("FAIL: Damping inequality violated in time_reverse test at step", t)
            save_forensic_failure(
                timestep=t,
                omega=omega_t,
                Y=res['Y'],
                dYdt=res['dYdt'],
                rhs=res['rhs'],
                align_sup=res['align_sup']
            )
            sys.exit(1)

    print("[time reverse test] Completed.")

def main():
    parser = argparse.ArgumentParser(
        description="Recursive damping proof + additional tests."
    )

    # Base / shared arguments
    parser.add_argument('--N', type=int, default=512,
                        help="Grid size (default 512).")
    parser.add_argument('--alpha', type=float, default=2.5,
                        help="Scaling exponent alpha (default 2.5).")
    parser.add_argument('--nu', type=float, default=0.01,
                        help="Viscosity coefficient (default 0.01).")
    parser.add_argument('--eps', type=float, default=1e-10,
                        help="Threshold epsilon for ignoring near-zero shells (default 1e-10).")
    parser.add_argument('--plot', action='store_true',
                        help="Enable plotting per-shell metrics.")

    # NEW certificate-related CLI flags
    parser.add_argument('--damping_log', type=str,
                        default="recursive_damping_certificate.jsonl",
                        help="Where to append the damping certificate (JSONL).")
    parser.add_argument('--damping_tol', type=float, default=1e-6,
                        help="Tolerance for the residual check (default 1e-6).")
    parser.add_argument('--Y_star', type=float, default=0.0,
                        help="Contraction threshold Y* (default 0).")

    parser.add_argument('--export_cert', action='store_true',
                        help="Export Lean-style proof certificates.")
    parser.add_argument('--run_validation_checks', action='store_true',
                        help="Perform extra validation checks on intervals.")
    parser.add_argument('--input', type=str,
                        help="Path to omega_t.npy for the initial vorticity field.")
    parser.add_argument('--input2', type=str,
                        help="Path to omega_tpdt.npy for the next-timestep vorticity field.")
    parser.add_argument('--grad_u', type=str,
                        help="Optional path to grad_u.npy if precomputed.")

    parser.add_argument('--run_blowup_test', action='store_true',
                        help="Run the blowup test scenario after main run.")
    parser.add_argument('--run_commutator_test', action='store_true',
                        help="Run the commutator test scenario after main run.")
    parser.add_argument('--run_time_reverse_test', action='store_true',
                        help="Run the time-reverse test scenario after main run.")

    parser.add_argument('--j_min', type=int, default=6,
                        help="Minimum exponent j for the dyadic shell range (default 6).")
    parser.add_argument('--j_max', type=int, default=9,
                        help="Maximum exponent j for the dyadic shell range (default 9).")
    parser.add_argument('--dt', type=float, default=1e-4,
                        help="Time step for vorticity evolution (default 1e-4).")
    parser.add_argument('--timesteps', type=int, default=0,
                        help="Number of time steps to run and certify (default 0).")
    parser.add_argument('--strict', action='store_true',
                        help="Exit with error if any violation occurs in the inequality check.")

    args = parser.parse_args()

    run_blowup = args.run_blowup_test
    run_commutator = args.run_commutator_test
    run_time_reverse = args.run_time_reverse_test

    # 1) Load or generate initial field
    if args.input is not None:
        omega_t = cp.load(args.input)
        print(f"Loaded omega_t from {args.input}, shape {omega_t.shape}.")
    else:
        print("No --input provided. Generating a synthetic 'vortex tube' field.")
        omega_t = make_vortex_tube(args.N)

    # 2) Optionally load next-timestep field
    omega_tpdt = None
    if args.input2 is not None:
        omega_tpdt = cp.load(args.input2)
        print(f"Loaded omega_tpdt from {args.input2}, shape {omega_tpdt.shape}.")

    # 3) Optionally load grad_u if provided
    grad_u = None
    if args.grad_u is not None and os.path.isfile(args.grad_u):
        grad_u = cp.load(args.grad_u)
        print(f"Loaded grad_u from {args.grad_u}, shape {grad_u.shape}.")

    # 4) Multi-step evolution if timesteps > 0
    if args.timesteps > 0:
        print(f"Running {args.timesteps} time steps with dt={args.dt} and verifying at each step.")
        omega = omega_t.copy()
        for t in range(args.timesteps):
            context = f"timestep_{t:03d}"
            omega_next = time_evolve(omega, args.nu, args.dt)
            res, diagnostics = main_compute(
                omega, omega_next,
                grad_u,
                args.alpha,
                args.nu,
                j_min=args.j_min,
                j_max=args.j_max,
                dt=args.dt,
                eps=args.eps,
                plot=args.plot,
                validate=args.run_validation_checks,
                context=context
            )

            if args.run_validation_checks:
                try:
                    validate_damping_terms(
                        res['Y'], res['dYdt'], res['rhs'],
                        res['align_sup'], res['delta'],
                        res['Lambda'], res['C_nl']
                    )
                except AssertionError as e:
                    print(f"[VALIDATION ERROR] {e}")
                    save_forensic_failure(
                        timestep=t,
                        omega=omega,
                        Y=res['Y'],
                        dYdt=res['dYdt'],
                        rhs=res['rhs'],
                        align_sup=res['align_sup']
                    )
                    if args.strict:
                        sys.exit(1)

                warnings = float_sanity_check({
                    'Y': res['Y'],
                    'dY/dt': res['dYdt'],
                    'RHS': res['rhs'],
                    'align_sup': res['align_sup']
                })
                for w in warnings:
                    print(w)
                H = compute_entropy(res['_entropic_per_shell'])
                print(f"[entropy] H(t) = {H:.4f}")

            if args.export_cert and (res['dYdt'] is not None) and (res['rhs'] is not None):
                export_certificate(
                    f"/content/output/certificate_{context}.lean",
                    res['dYdt'],
                    res['rhs'],
                    res['C_nl'],
                    res['delta'],
                    res['Lambda'],
                    res['Y'],
                    res['align_sup']
                )

            if args.strict and res['holds'] is False:
                print(f"FAIL: Damping inequality violated at timestep {t}")
                save_forensic_failure(
                    timestep=t,
                    omega=omega,
                    Y=res['Y'],
                    dYdt=res['dYdt'],
                    rhs=res['rhs'],
                    align_sup=res['align_sup']
                )
                sys.exit(1)

            # NEW: write certificate line for this timestep
            log_recursive_damping_certificate(
                res,
                t * args.dt,
                args.nu,
                args.alpha,
                log_path=args.damping_log,
                Y_star=args.Y_star,
                tolerance=args.damping_tol,
            )

            omega = omega_next.copy()

        print(f"Timestep evolution complete for {args.timesteps} steps.")

    else:
        # 5) Single-step certification if timesteps == 0
        res, diagnostics = main_compute(
            omega_t, omega_tpdt,
            grad_u,
            args.alpha,
            args.nu,
            j_min=args.j_min,
            j_max=args.j_max,
            dt=args.dt,
            eps=args.eps,
            plot=args.plot,
            validate=args.run_validation_checks,
            context='mainrun'
        )

        if args.export_cert and (res['dYdt'] is not None) and (res['rhs'] is not None):
            export_certificate(
                "/content/output/certificate.lean",
                res['dYdt'],
                res['rhs'],
                res['C_nl'],
                res['delta'],
                res['Lambda'],
                res['Y'],
                res['align_sup']
            )

        if args.strict and res['holds'] is False:
            print("FAIL: Damping inequality violated for the single-step scenario.")
            save_forensic_failure(
                timestep=0,
                omega=omega_t,
                Y=res['Y'],
                dYdt=res['dYdt'],
                rhs=res['rhs'],
                align_sup=res['align_sup']
            )
            sys.exit(1)

        if args.run_validation_checks:
            try:
                validate_damping_terms(
                    res['Y'], res['dYdt'], res['rhs'],
                    res['align_sup'], res['delta'],
                    res['Lambda'], res['C_nl']
                )
            except AssertionError as e:
                print(f"[VALIDATION ERROR] {e}")
                save_forensic_failure(
                    timestep=0,
                    omega=omega_t,
                    Y=res['Y'],
                    dYdt=res['dYdt'],
                    rhs=res['rhs'],
                    align_sup=res['align_sup']
                )
                if args.strict:
                    sys.exit(1)

            warnings = float_sanity_check({
                'Y': res['Y'],
                'dY/dt': res['dYdt'],
                'RHS': res['rhs'],
                'align_sup': res['align_sup']
            })
            for w in warnings:
                print(w)
            H = compute_entropy(res['_entropic_per_shell'])
            print(f"[entropy] H(t) = {H:.4f}")

        # NEW: dump certificate for t = 0 (single-step mode)
        log_recursive_damping_certificate(
            res,
            0.0,
            args.nu,
            args.alpha,
            log_path=args.damping_log,
            Y_star=args.Y_star,
            tolerance=args.damping_tol,
        )

        print("Default evolution + certification completed successfully.")
        print("See /content/output/diagnostics_log_mainrun.json for details.")

    # Now run additional diagnostic tests (after the main run)
    if run_blowup:
        simulate_blowup(
            N=args.N, alpha=args.alpha, j_min=args.j_min, j_max=args.j_max,
            dt=args.dt, timesteps=1, strict=args.strict, plot=args.plot,
            run_validation_checks=args.run_validation_checks, nu=args.nu,
            export_cert=args.export_cert, eps=args.eps
        )

    if run_commutator:
        stress_commutator(
            N=args.N, alpha=args.alpha, j_min=args.j_min, j_max=args.j_max,
            dt=args.dt, timesteps=1, strict=args.strict, plot=args.plot,
            run_validation_checks=args.run_validation_checks, nu=args.nu,
            export_cert=args.export_cert, eps=args.eps
        )

    if run_time_reverse:
        time_reverse_test(
            N=args.N, alpha=args.alpha, j_min=args.j_min, j_max=args.j_max,
            dt=args.dt, timesteps=1, strict=args.strict, plot=args.plot,
            run_validation_checks=args.run_validation_checks, nu=args.nu,
            export_cert=args.export_cert, eps=args.eps
        )

    print("All requested runs/tests are complete. Exiting.")

if __name__ == "__main__":
    main()


Writing proof_colab.py


In [5]:
%%writefile recursive_ode_falsifier.py
#!/usr/bin/env python3
import glob
import json
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import csv
import re
import argparse
import sys
import os
import math

FALSIFIER_LOG = "/content/output/falsifier_log.txt"
SUMMARY_JSON = "/content/output/ode_sim_summary.json"
SUCCESS_CERT = "/content/output/ode_success_certificate.lean"
FAILURE_CERT = "/content/output/falsifier_certificate.lean"

def parse_interval_tuple(s):
    # ... (same as before) ...
    if isinstance(s, (int, float)):
        return float(s), 0.0
    if isinstance(s, (list, tuple)) and len(s) == 2:
        return float(s[0]), float(s[1])
    if isinstance(s, dict):
        if "mid" in s and "width" in s:
            return float(s["mid"]), float(s["width"])
    if isinstance(s, str):
        m = re.match(r"^\s*([-+0-9.eE]+)\s*±\s*([-+0-9.eE]+)\s*$", s)
        if m:
            return float(m.group(1)), float(m.group(2))
        try:
            return float(s), 0.0
        except:
            pass
    raise ValueError(f"Cannot parse interval from: {s}")

def falsify(reason, strict=False, do_lean_cert=False):
    os.makedirs(os.path.dirname(FALSIFIER_LOG), exist_ok=True)
    with open(FALSIFIER_LOG, "a") as f:
        f.write(reason.strip() + "\n")
    print("FALSIFIER:", reason.strip(), file=sys.stderr)
    if do_lean_cert:
        export_falsifier_certificate(reason)
    if strict:
        sys.exit(1)

def export_falsifier_certificate(reason, fname=FAILURE_CERT):
    with open(fname, "w") as f:
        f.write("-- Falsifier certificate: Antagonist claim\n")
        f.write("theorem proof_invalid : false := by\n")
        f.write(f"  -- {reason.strip()}\n")
        f.write("  contradiction\n")

def export_success_certificate(fname=SUCCESS_CERT):
    with open(fname, "w") as f:
        f.write("-- ODE simulation completed without overflow\n")
        f.write("theorem simulation_valid : True := by trivial\n")
    print(f"Wrote success certificate to {fname}")

def compute_entropy(per_shell):
    H = 0.0
    for s in per_shell:
        m, _ = parse_interval_tuple(s['norm'])
        if m > 0:
            H += m * np.log(m)
    return H

def load_logs():
    diagnostics = []
    shell_per_t = []
    files = sorted(glob.glob("/content/output/diagnostics_log_timestep_*.json"),
                  key=lambda f: int(re.search(r'_(\d+)\.json$', f).group(1)))
    if not files:
        raise RuntimeError("No log files found in /content/output/")
    prev_idx = -1
    for f in files:
        with open(f) as j:
            d = json.load(j)
        idx = int(re.search(r'_(\d+)\.json$', f).group(1))
        if idx <= prev_idx:
            falsify(f"Non-monotonic timestep index in {f}.", strict=False)
        prev_idx = idx
        diagnostics.append(d)
        shell_per_t.append(d.get('_entropic_per_shell', []))
    if len(files) > 1:
        idxs = [int(re.search(r'_(\d+)\.json$', f).group(1)) for f in files]
        dts = np.diff(idxs)
        dt = float(dts[0]) if np.all(dts == dts[0]) else float(np.median(dts))
    else:
        dt = 1.0
    return diagnostics, shell_per_t, dt

def main():
    parser = argparse.ArgumentParser(description="Robust recursive ODE falsifier")
    parser.add_argument("--strict", action="store_true", help="Fail hard on any ODE falsification")
    parser.add_argument("--lean_cert", action="store_true", help="Export Lean-style falsifier certificate")
    parser.add_argument("--safe", action="store_true", help="Enable safe mode with detailed logging and JSON summary")
    args, _ = parser.parse_known_args()

    diagnostics, shell_per_t, dt = load_logs()
    steps = len(diagnostics)
    t_arr = np.arange(steps) * dt

    # extract data
    Y_tuples = [parse_interval_tuple(d['Y']) for d in diagnostics]
    Y_mids = [y[0] for y in Y_tuples]
    dYdt_vals = [parse_interval_tuple(d['dYdt'])[0] if d['dYdt'] else np.nan for d in diagnostics]
    A_tuples = [parse_interval_tuple(d['align_sup']) for d in diagnostics]
    A_mids = [min(max(a[0],0.0),1.0) for a in A_tuples]  # clip A in [0,1]
    delta = float(diagnostics[0]['delta'])
    Lam = float(diagnostics[0]['Lambda'])
    C_nl = float(diagnostics[0]['C_nl'])
    nu = float(diagnostics[0].get('nu',0.01))

    # logs for safe mode
    sim_logs = []

    def ode_rhs(Y, A):
        # guard inputs
        A_clipped = float(np.clip(A, 0.0, 1.0))
        Y_safe = float(max(Y, 1e-8))
        try:
            term1 = C_nl * (Y_safe**2) * A_clipped
            term2 = nu * Lam * (Y_safe**(1.0 + delta))
        except OverflowError as e:
            raise OverflowError(f"Overflow: Y={Y_safe}, exponent={1+delta}") from e
        if not (np.isfinite(term1) and np.isfinite(term2)):
            raise ValueError(f"Non-finite term: term1={term1}, term2={term2}")
        return term1 - term2

    def sim_trajectory(Y0, A_series):
        Ys = [Y0]
        rhses = []
        for i, A in enumerate(A_series):
            t = i * dt
            Y = Ys[-1]
            try:
                rhs = ode_rhs(Y, A)
            except Exception as e:
                reason = f"Step {i}: error in RHS calculation: {e}"
                if args.safe:
                    sim_logs.append({"step": i, "Y": Y, "A": A, "error": str(e)})
                    raise
                else:
                    falsify(reason, strict=args.strict, do_lean_cert=args.lean_cert)
                    rhs = 0.0
            Y_next = Y + dt * rhs
            if not np.isfinite(Y_next):
                reason = f"Step {i}: Y_next is non-finite: {Y_next}"
                if args.safe:
                    sim_logs.append({"step": i, "Y": Y, "rhs": rhs, "Y_next": Y_next})
                    raise OverflowError(reason)
                else:
                    falsify(reason, strict=args.strict, do_lean_cert=args.lean_cert)
                    Y_next = 0.0
            rhses.append(rhs)
            if args.safe:
                residual = rhs - (Y_next - Y)/dt
                sim_logs.append({
                    "step": i,
                    "t": t,
                    "Y": Y,
                    "A": A,
                    "rhs": rhs,
                    "Y_next": Y_next,
                    "residual": residual
                })
            Ys.append(Y_next)
        return np.array(Ys), np.array(rhses)

    # run sim
    Y0 = Y_mids[0]
    success = True
    try:
        Ys_sim, rhs_vals = sim_trajectory(Y0, A_mids)
    except Exception as e:
        success = False

    # post-simulation checks
    if success:
        # sign check: no positive RHS for dissipativity
        if np.any(rhs_vals > 0):
            falsify("RHS > 0 detected (net stretching): simulation invalid",
                    strict=args.strict, do_lean_cert=args.lean_cert)
            success = False

    # save summary
    summary = {
        "status": "success" if success else "failure",
        "steps": steps,
        "max_Y": float(np.max(Ys_sim)) if success else None,
        "min_Y": float(np.min(Ys_sim)) if success else None,
        "max_rhs": float(np.max(rhs_vals)) if success else None,
        "safe_mode": args.safe
    }
    os.makedirs(os.path.dirname(SUMMARY_JSON), exist_ok=True)
    with open(SUMMARY_JSON, "w") as f:
        json.dump(summary, f, indent=2)
    print(f"Wrote simulation summary to {SUMMARY_JSON}")

    # export certificate
    if success:
        export_success_certificate()
        sys.exit(0)
    else:
        if args.lean_cert:
            export_falsifier_certificate("ODE simulation overflow or invalid")
        sys.exit(1)

if __name__ == "__main__":
    main()


Writing recursive_ode_falsifier.py


In [25]:
%%writefile proof_falsifier_engine.py
import os
import sys
import shutil
import subprocess
import time
import hashlib
import json
import numpy as np
import zipfile
import datetime
from pathlib import Path

try:
    import cupy as cp
except ImportError:
    cp = None

####### --- Synthetic Field Generators --- #######

def make_shell_localized(j=3, N=32, seed=42):
    # Shell-localized vorticity (Fourier bump at shell j)
    np.random.seed(seed)
    if cp: cp.random.seed(seed)
    omega = np.zeros((3, N, N, N), dtype=np.float64)
    grid = np.fft.fftfreq(N) * N
    Kx, Ky, Kz = np.meshgrid(grid, grid, grid, indexing='ij')
    K2 = Kx**2 + Ky**2 + Kz**2
    mask = (K2 >= 2**(2*j)) & (K2 < 2**(2*(j+1)))
    for c in range(3):
        noise = np.random.randn(N,N,N)
        fft = np.fft.fftn(noise)
        fft *= mask
        loc = np.fft.ifftn(fft).real
        omega[c] = loc / np.sqrt(np.mean(loc**2)+1e-12)
    return omega

def make_vortex_tube(N=32, amp=1.0):
    # Simple vortex tube along e1
    omega = np.zeros((3, N, N, N), dtype=np.float64)
    x = np.linspace(-1,1,N,endpoint=False)
    X,Y,Z = np.meshgrid(x,x,x,indexing='ij')
    r = np.sqrt(Y**2+Z**2)
    profile = amp * np.exp(-20*r**2)
    omega[0] = profile  # Aligned with e1
    return omega

def align_to_e1(omega):
    # Rotate max-energy direction to e1
    v = omega.reshape(3,-1).mean(axis=1)
    if np.linalg.norm(v) < 1e-8:
        return omega
    e1 = np.array([1,0,0.])
    axis = np.cross(v, e1)
    if np.linalg.norm(axis) < 1e-8:
        return omega
    axis = axis / np.linalg.norm(axis)
    theta = np.arccos(np.dot(v, e1)/(np.linalg.norm(v)*np.linalg.norm(e1)))
    from scipy.spatial.transform import Rotation as R
    Rmat = R.from_rotvec(axis*theta).as_matrix()
    return np.tensordot(Rmat, omega, axes=([1],[0]))

def combine_shells(j1=2, j2=6, N=32):
    # Blend two shell-localized fields
    w1 = make_shell_localized(j=j1,N=N,seed=10*j1)
    w2 = make_shell_localized(j=j2,N=N,seed=10*j2+1)
    return 0.5*w1 + 1.5*w2

def random_gaussian(scale=1e-2, N=32, seed=123):
    np.random.seed(seed)
    omega = scale * np.random.randn(3,N,N,N)
    return omega

def proof_evolve(omega_t, dt=1e-4):
    # Very simple Euler step for demonstration (add some noise)
    omega_tpdt = omega_t + dt * np.random.randn(*omega_t.shape) * 0.01
    return omega_tpdt

######## ---- Field Registry ---- ########

fields = [
  {"name": "shell_j3", "generator": make_shell_localized, "params": {"j":3, "seed":42}},
  {"name": "shell_j5", "generator": make_shell_localized, "params": {"j":5, "seed":43}},
  {"name": "aligned_blowup", "generator": make_vortex_tube, "post": "align_to_e1"},
  {"name": "adversarial_combo", "generator": combine_shells, "params": {"j1":2, "j2":6}},
  {"name": "random_noise", "generator": random_gaussian, "params": {"scale":1e-2, "seed":99}},
]

gen_post_map = {
    "align_to_e1": align_to_e1,
}

######## ---- Utility Functions ---- ########
def get_hash(arr):
    return hashlib.md5(arr.astype(np.float32).tobytes()).hexdigest()[:10]

def save_field(arr, fname):
    np.save(fname, arr)

def load_field(fname):
    return np.load(fname)

def get_git_hash():
    try:
        import subprocess
        git_hash = subprocess.check_output(["git", "rev-parse", "HEAD"], cwd="/content/", text=True).strip()
        return git_hash
    except:
        return None

def copy_if_exists(src, dst):
    try:
        if os.path.exists(src):
            shutil.copy(src, dst)
            return True
    except Exception as e:
        print(f"Failed to copy {src} to {dst}: {e}")
    return False

def safe_makedirs(path):
    os.makedirs(path, exist_ok=True)

def bundle_counterexamples(counterexample_paths, zip_out="/content/output/counterexample_bundle.zip"):
    with zipfile.ZipFile(zip_out, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        for cdir in counterexample_paths:
            for fname in Path(cdir).glob('*'):
                zf.write(fname, arcname=f"{Path(cdir).name}/{fname.name}")

######## ---- Metrics & Report ---- ########

summary = {
    "total_runs": 0,
    "proof_colab_failures": 0,
    "ode_falsifier_failures": 0,
    "field_failure_types": [],
    "counterexample_paths": [],
    "entropy_curves": [],
    "run_details": [],
}

######## ---- Main Adversarial Loop ---- ########

def main():
    N = 512
    output_dir = "/content/output/"
    cx_dir = os.path.join(output_dir, "counterexamples/")
    safe_makedirs(output_dir)
    safe_makedirs(cx_dir)
    git_hash = get_git_hash()
    timestamp = datetime.datetime.utcnow().isoformat() + "Z"
    global_run = 1

    for run_idx, field in enumerate(fields, 1):
        name = field['name']
        params = field.get('params', {})
        gen = field['generator']
        context = f"test_{name}_run{run_idx}"
        seed = params.get('seed', 1000+run_idx) if 'seed' in params else 1000+run_idx
        np.random.seed(seed)
        if cp: cp.random.seed(seed)
        run_info = {
            "index": run_idx,
            "context": context,
            "name": name,
            "params": params,
            "timestamp": timestamp,
            "git_hash": git_hash,
            "seed": seed,
        }
        # --- Generate vorticity field ---
        if 'params' in field:
            omega = gen(**params, N=N) if 'N' in gen.__code__.co_varnames else gen(**params)
        else:
            omega = gen(N=N)
        # Apply any postproc:
        if field.get('post'):
            omega = gen_post_map[field['post']](omega)
        omega_t = omega.astype(np.float32)
        save_field(omega_t, '/content/omega_t.npy')
        omega_tpdt = proof_evolve(omega_t)
        save_field(omega_tpdt, '/content/omega_tpdt.npy')
        cx_hash = get_hash(omega_t)
        context_full = f"{context}_{cx_hash}"
        run_info["context_full"] = context_full
        run_info["cx_hash"] = cx_hash
        # Save config:
        config_json = f"/content/output/{context_full}_config.json"
        with open(config_json,"w") as f:
            json.dump(run_info, f, indent=2)
        proof_log = os.path.join(output_dir, 'diagnostics_log.json')
        proof_cert = os.path.join(output_dir, 'proof_certificate.lean')
        falsifier_log = os.path.join(output_dir, 'falsifier_log.txt')
        falsifier_cert = os.path.join(output_dir, 'falsifier_certificate.lean')
        # --- RUN proof_colab.py ---
        proof_args = [
            sys.executable, "proof_colab.py",
            "--input", "omega_t.npy",
            "--input2", "omega_tpdt.npy",
            "--alpha", "2.5", "--plot", "--export_cert",
            "--run_validation_checks", "--strict",
            "--context", context_full
        ]
        summary["total_runs"] += 1
        proc1 = subprocess.run(proof_args, capture_output=True, cwd="/content/")
        proof_failed = proc1.returncode != 0
        ode_failed = False
        # --- RUN recursive_ode_falsifier.py ---
        if not proof_failed:
            ode_args = [sys.executable, "recursive_ode_falsifier.py", "--strict", "--lean_cert"]
            proc2 = subprocess.run(ode_args, capture_output=True, cwd="/content/")
            ode_failed = proc2.returncode != 0
        # --- Handle any failures ---
        failed = proof_failed or ode_failed
        # pass_fail tag for packaging:
        run_info["pass_fail"] = "FAIL" if failed else "PASS"

        cx_this_run_dir = ""
        if failed:
            fail_reason = []
            cxdir = os.path.join(cx_dir, context_full)
            safe_makedirs(cxdir)
            cx_this_run_dir = cxdir
            shutil.copy('/content/omega_t.npy', os.path.join(cxdir, f"{context_full}_omega_t.npy"))
            shutil.copy('/content/omega_tpdt.npy', os.path.join(cxdir, f"{context_full}_omega_tpdt.npy"))
            copy_if_exists(proof_log, os.path.join(cxdir, f"{context_full}_diagnostics_log.json"))
            copy_if_exists(proof_cert, os.path.join(cxdir, f"{context_full}_proof_certificate.lean"))
            copy_if_exists(falsifier_log, os.path.join(cxdir, f"{context_full}_falsifier_log.txt"))
            copy_if_exists(falsifier_cert, os.path.join(cxdir, f"{context_full}_falsifier_certificate.lean"))
            shutil.copy(config_json, os.path.join(cxdir, f"{context_full}_config.json"))
            with open(os.path.join(cxdir, f"{context_full}_proof_stdout.txt"),"w") as f: f.write(proc1.stdout.decode(errors='ignore'))
            with open(os.path.join(cxdir, f"{context_full}_proof_stderr.txt"),"w") as f: f.write(proc1.stderr.decode(errors='ignore'))
            if not proof_failed:
                with open(os.path.join(cxdir, f"{context_full}_ode_stdout.txt"),"w") as f: f.write(proc2.stdout.decode(errors='ignore'))
                with open(os.path.join(cxdir, f"{context_full}_ode_stderr.txt"),"w") as f: f.write(proc2.stderr.decode(errors='ignore'))
            summary["counterexample_paths"].append(cxdir)
            if proof_failed:
                summary["proof_colab_failures"] += 1
                fail_reason.append('proof_colab')
            if ode_failed:
                summary["ode_falsifier_failures"] += 1
                fail_reason.append('recursive_ode_falsifier')
            summary["field_failure_types"].append(f"{name} ({'/'.join(fail_reason)})")
        # Optionally collect entropy curve:
        try:
            diag_path = os.path.join(output_dir, 'diagnostics_log_timestep_000.json')
            if os.path.exists(diag_path):
                with open(diag_path) as f:
                    diag = json.load(f)
                if '_entropic_per_shell' in diag:
                    ent_arr = [float(s.get('norm',0)) for s in diag['_entropic_per_shell']]
                    summary["entropy_curves"].append({"context":context, "entropy": ent_arr})
        except Exception:
            pass
        # Save run details
        run_info.update({
            "proof_failed": proof_failed,
            "ode_failed": ode_failed,
            "counterexample_dir": cx_this_run_dir
        })
        summary["run_details"].append(run_info)
        result_tag = "[FAIL]" if failed else "[PASS]"
        print(f"{result_tag} {context_full} | proof_colab: {proc1.returncode} | ode: {('n/a' if proof_failed else proc2.returncode)}")
        sys.stdout.flush()

    # Final bundle zip
    if summary["counterexample_paths"]:
        bundle_counterexamples(summary["counterexample_paths"])

    # --- NEW: dump out a proof_summary.json for packaging:
    proof_summary_fn = os.path.join(output_dir, "proof_summary.json")
    with open(proof_summary_fn, "w") as f:
        json.dump({"summary": summary["run_details"]}, f, indent=2)
    print(f"Wrote summary to {proof_summary_fn}")

    # Print report
    print("------ Adversarial Falsification Test Report ------")
    print(f"Total runs: {summary['total_runs']}")
    print(f"proof_colab.py failures: {summary['proof_colab_failures']}")
    print(f"recursive_ode_falsifier.py failures: {summary['ode_falsifier_failures']}")
    print(f"Field types causing failure:\n  " + "\n  ".join(sorted(set(summary['field_failure_types']))))
    if summary["entropy_curves"]:
        print("Entropy curves available for analysis (first context shown):")
        print(summary["entropy_curves"][0])
    print(f"Counterexamples saved in: {cx_dir}")
    print(f"Bundle zip (if any): /content/output/counterexample_bundle.zip")
    print("--------------------------------------------------")

if __name__ == "__main__":
    main()


Overwriting proof_falsifier_engine.py


In [37]:
%%writefile certificate_packager.py
#!/usr/bin/env python3

import os
import sys
import json
import glob
import shutil
import hashlib
import argparse
from pathlib import Path
import zipfile
import datetime

############# Utility Functions ################

def sha256file(fn):
    h = hashlib.sha256()
    with open(fn, "rb") as f:
        while True:
            blk = f.read(65536)
            if not blk: break
            h.update(blk)
    return h.hexdigest()

def sha256_bytes(b):
    return hashlib.sha256(b).hexdigest()

def load_json(fn):
    with open(fn,"r") as f:
        return json.load(f)

def fail(msg):
    print(f"[certificate_packager.py] ERROR: {msg}", file=sys.stderr)
    sys.exit(2)

def lean_valid_certificate(lean_path):
    try:
        with open(lean_path,"r") as f:
            data = f.read()
        # patched: accept both falsifier and verified proof certificates
        return ("theorem proof_invalid" in data) or ("theorem verified_" in data)
    except Exception:
        return False

def find_file(prefix, exts, context, extra_match=None):
    """Find file with basename prefix, extension in list, containing context."""
    for e in exts:
        for match in glob.glob(f"**/*{prefix}*{context}*{e}", recursive=True):
            if extra_match and extra_match not in match: continue
            return match
    return None

def find_field_file(ref_dir, hashval, suffix="omega_t.npy"):
    cands = glob.glob(os.path.join(ref_dir, f"**/*{suffix}"), recursive=True)
    for fn in cands:
        try:
            if sha256file(fn) == hashval:
                return fn
        except: pass
    return None

def get_source_hash(path):
    # Take SHA1 of this script as generator source hash
    try:
        with open(path, "rb") as f:
            return hashlib.sha1(f.read()).hexdigest()
    except Exception:
        return "unknown"

def monotonic_decay(yt):
    if not yt or len(yt)<2: return False
    return all((a>=b) for a,b in zip(yt,yt[1:]))

############# Main Capsule Logic ###############

def package_capsules(
    proof_summary_json="/content/output/proof_summary.json",
    output_dir="/content/output/capsules/",
    verify_only=False,
    min_ris=3,
    sign_gpg=False,
):

    # -------------------------------------------------
    # Step 1: Load summary, enumerate valid runs
    # -------------------------------------------------
    if not os.path.exists(proof_summary_json):
        fail(f"proof_summary.json not found at {proof_summary_json}")
    with open(proof_summary_json,"r") as f:
        data = json.load(f)
        summary = data.get("summary", data)

    runlist = []
    for run in summary:
        # engine‐style runs use pass_fail=="PASS", dashboard‐style use verified==True
        is_pass = (run.get("pass_fail") == "PASS")
        is_verified = (run.get("verified") is True)
        if not (is_pass or is_verified):
            continue
        # drop all other overly‑strict engine filters here:
        runlist.append(run)

    if not runlist:
        fail("No valid runs found for packaging.")

    Path(output_dir).mkdir(parents=True, exist_ok=True)
    generator_src_hash = get_source_hash(__file__)
    capsules = []
    now = datetime.datetime.utcnow().isoformat()+"Z"

    for run in runlist:
        # determine context and certificate path
        if "file" in run:
            # dashboard‐style summary
            cert_path = run["file"]
            context = Path(cert_path).stem
        else:
            # engine‐style summary
            context = run['context']
            cert_path = None

        field_type = run.get("type")
        params = run.get("params",{})
        seed = params.get("seed","")
        git_commit = run.get("git_commit","")
        sha_reported = run.get("sha256") or run.get("hash_sha256")
        capsule_data = {
            "context": context,
            "field_type": field_type,
            "seed": seed,
            "params": params,
            "timestamp": now,
            "git_commit": git_commit,
            "sha256": sha_reported,
            "generator_source_hash": generator_src_hash,
            "files": {},
        }
        # ------- Locate and validate all relevant files -------
        # -- omega_t.npy (must match hash from config) [engine‐only]
        if cert_path is None:
            omega_t_fn = find_field_file("/content/output/", sha_reported, suffix="omega_t.npy")
            if not omega_t_fn:
                fail(f"Run {context}: omega_t.npy with matching hash not found.")
            if sha256file(omega_t_fn) != sha_reported:
                fail(f"Run {context}: hash mismatch for omega_t.npy.")
        else:
            omega_t_fn = None

        # -- omega_tpdt.npy (optional, but must exist if claimed)
        omega_tpdt_fn = None
        if cert_path is None:
            for cand in glob.glob(os.path.join(os.path.dirname(omega_t_fn),"*omega_tpdt.npy")):
                if os.path.exists(cand): omega_tpdt_fn = cand

        # -- Config JSON [engine‐only]
        if cert_path is None:
            config_fn = find_file(prefix="", exts=["_config.json"], context=context)
            if not config_fn:
                fail(f"Run {context}: config json missing.")
        else:
            config_fn = None

        # -- Lean certificate (proof or falsifier), must validate
        if cert_path:
            lean_fn = cert_path
        else:
            lean_fn = find_file(prefix="", exts=[".lean"], context=context)
        if not lean_fn or not lean_valid_certificate(lean_fn):
            fail(f"Run {context}: valid .lean certificate not found or invalid.")

        # -- Diagnostics, logs, preview images, CSVs
        if cert_path is None:
            diag_log = find_file(prefix="diagnostics_log", exts=[".json"], context=context)
            csv_fn   = find_file(prefix="", exts=[".csv"], context=context)
            falsifier_log = find_file(prefix="falsifier_log", exts=[".txt"], context=context)
            preview_png = find_file(prefix="", exts=[".png"], context=context)
        else:
            diag_log = csv_fn = falsifier_log = preview_png = None

        # ---- Gather all files to package
        fileset = {}
        if omega_t_fn:       fileset["omega_t.npy"]           = omega_t_fn
        if config_fn:        fileset["config.json"]            = config_fn
        fileset["certificate.lean"] = lean_fn
        if omega_tpdt_fn:    fileset["omega_tpdt.npy"]        = omega_tpdt_fn
        if diag_log:         fileset["diagnostics.json"]       = diag_log
        if csv_fn:           fileset["summary.csv"]            = csv_fn
        if falsifier_log:    fileset["falsifier_log.txt"]     = falsifier_log
        if preview_png:      fileset["preview.png"]           = preview_png

        # -- Compute all file hashes
        file_hashes = {name: sha256file(path) for name, path in fileset.items()}
        capsule_data["files"] = file_hashes
        # -- Capsule SHA: hash of (all .npy/.json/.lean data in sorted order)
        cap_bytes = b''
        for k in sorted(fileset.keys()):
            with open(fileset[k],"rb") as f:
                cap_bytes += f.read()
        capsule_sha = sha256_bytes(cap_bytes)
        capsule_data["capsule_sha256"] = capsule_sha

        # -- Archive into ZIP
        capsule_name = f"{context}_capsule.zip"
        capsule_fn = os.path.join(output_dir, capsule_name)
        with zipfile.ZipFile(capsule_fn,"w",compression=zipfile.ZIP_DEFLATED) as zf:
            for fname, src in fileset.items():
                zf.write(src, arcname=fname)
        # Save manifest
        manifest_fn = os.path.join(output_dir, f"{context}_capsule_manifest.json")
        with open(manifest_fn, "w") as mf:
            json.dump(capsule_data, mf, indent=2)
        # Add manifest into zip
        with zipfile.ZipFile(capsule_fn,"a",compression=zipfile.ZIP_DEFLATED) as zf:
            zf.write(manifest_fn,"capsule_manifest.json")
        # Optionally: GPG-sign zip (placeholder)
        if sign_gpg:
            sigtxt = f"signed-by-certificate-packager:{now}"
            sigfile = os.path.join(output_dir, f"{context}_capsule_signature.txt")
            with open(sigfile, "w") as f:
                f.write(sigtxt)
            with zipfile.ZipFile(capsule_fn,"a",compression=zipfile.ZIP_DEFLATED) as zf:
                zf.write(sigfile, "capsule_signature.txt")

        print(f"[OK] Capsule created: {capsule_fn}")
        capsules.append({
            "context": context,
            "capsule_zip": capsule_fn,
            "capsule_manifest": manifest_fn,
            "capsule_sha256": capsule_sha
        })

    # ----------- Summarize ---------------
    print(f"Packaged {len(capsules)} archive capsules into {output_dir}.")
    # Master manifest
    mani = {
        "capsules": capsules,
        "timestamp": now,
        "generator_source_hash": generator_src_hash
    }
    with open(os.path.join(output_dir,"capsule_manifest_master.json"),"w") as f:
        json.dump(mani, f, indent=2)
    print("Master manifest written.")

##########################################

def verify_capsules(capsule_dir="/content/output/capsules/"):
    print(f"Verifying capsules in {capsule_dir} ...")
    manifests = glob.glob(os.path.join(capsule_dir,"*_capsule_manifest.json"))
    zips = glob.glob(os.path.join(capsule_dir,"*_capsule.zip"))
    for mani in manifests:
        with open(mani,"r") as f:
            cap = json.load(f)
        capsule_sha = cap.get("capsule_sha256")
        context = cap.get("context")
        zipfile_path = os.path.join(capsule_dir, f"{context}_capsule.zip")
        if not os.path.exists(zipfile_path): fail(f"Capsule zip missing: {zipfile_path}")
        with zipfile.ZipFile(zipfile_path,"r") as zf:
            for fname, file_sha in cap["files"].items():
                try:
                    data = zf.read(fname)
                    if sha256_bytes(data) != file_sha:
                        fail(f"[{context}] Hash mismatch for {fname} inside {zipfile_path}")
                except KeyError:
                    fail(f"[{context}] File missing in archive: {fname}")
            mani_data = zf.read("capsule_manifest.json")
            if sha256_bytes(mani_data) != sha256_bytes(json.dumps(cap,sort_keys=True,indent=2).encode()):
                print(f"[WARN] Capsule manifest hash mismatch for {context}")
        cert_fname = None
        for fname in cap["files"]:
            if fname.endswith(".lean"):
                cert_fname = fname
        if cert_fname:
            with zipfile.ZipFile(zipfile_path,"r") as zf:
                cert_data = zf.read(cert_fname).decode(errors='ignore')
                if "theorem proof_invalid" not in cert_data and "theorem verified_" not in cert_data:
                    fail(f"[{context}] Lean certificate invalid in archive.")
    print("All capsules verified OK.")

##########################################

def main():
    parser = argparse.ArgumentParser(description="Pack and validate proof certificate capsules.")
    parser.add_argument("--summary", type=str, default="/content/output/proof_summary.json")
    parser.add_argument("--output_dir", type=str, default="/content/output/capsules/")
    parser.add_argument("--verify-only", action="store_true")
    parser.add_argument("--sign-gpg", action="store_true")
    parser.add_argument("--min_ris", type=int, default=3)
    args = parser.parse_args()
    if args.verify_only:
        verify_capsules(args.output_dir)
    else:
        package_capsules(
            proof_summary_json=args.summary,
            output_dir=args.output_dir,
            verify_only=False,
            min_ris=args.min_ris,
            sign_gpg=args.sign_gpg
        )

if __name__ == "__main__":
    main()


Overwriting certificate_packager.py


In [8]:
%%writefile lean_validator.py
#!/usr/bin/env python3

import os
import sys
import re
import csv
import json
import glob
import hashlib
import argparse
import subprocess
from pathlib import Path
from collections import defaultdict
from datetime import datetime

# =================== Helper Functions ===================

def sha256file(fn):
    h = hashlib.sha256()
    with open(fn, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b''):
            h.update(chunk)
    return h.hexdigest()

def md5file(fn):
    h = hashlib.md5()
    with open(fn, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b''):
            h.update(chunk)
    return h.hexdigest()

def get_lean_version():
    try:
        lean_version = subprocess.getoutput("lean --version").strip()
        if lean_version:
            return lean_version
    except Exception:
        pass
    return "Lean unavailable (syntax-only mode)"

def run_lean_check(lean_path):
    try:
        completed = subprocess.run(['lean', '--check', lean_path], capture_output=True, text=True, timeout=15)
        return completed.returncode, completed.stdout, completed.stderr
    except Exception as ex:
        return -99, "", f"EXCEPTION: {ex}"

def parse_theorem_header(contents):
    """
    Extract theorem name, type ("false" or "true"), and statement location.
    """
    header_pattern = re.compile(r'theorem\s+([A-Za-z0-9_]+)\s*:\s*([^:]+):=\s*by', re.S)
    m = header_pattern.search(contents)
    if not m:
        # fallback: more flexible for Lean 4 theorem syntax
        header_pattern2 = re.compile(r'theorem\s+([A-Za-z0-9_]+)\s*:\s*(.*?)\s*:=\s*by', re.S)
        m = header_pattern2.search(contents)
    if m:
        name = m.group(1).strip()
        thm_type = m.group(2).strip()
        is_false = "false" in thm_type
        is_true = "true" in thm_type
        start = m.end()
        return name, (is_false, is_true), start
    return None, (None, None), -1

def extract_proof_body(contents):
    """
    Everything after ':= by'
    """
    sp = contents.split(":= by",1)
    if len(sp)<2:
        return ""
    return sp[1].strip()

def count_proof_steps(body):
    return len([line for line in body.splitlines() if line.strip() and not line.strip().startswith("--")])

def classify_lean_error(stderr):
    if not stderr:
        return None
    if "unknown identifier" in stderr.lower():
        return "Unknown Identifier"
    if "syntax error" in stderr.lower():
        return "Syntax Error"
    if "contradiction" in stderr.lower():
        return "Contradiction"
    if "timeout" in stderr.lower():
        return "Timeout/Tactic Fail"
    if "error" in stderr.lower():
        return "General Error"
    return "Unknown"

def load_json_if_exists(path):
    if not os.path.exists(path): return None
    with open(path,"r") as f:
        return json.load(f)

def smart_str(x):
    if isinstance(x, str): return x
    if isinstance(x, (list, dict)): return json.dumps(x)
    return str(x)

# =================== Main Validation ====================

def main():
    parser = argparse.ArgumentParser(description="Lean certificate validator and proof auditor")
    parser.add_argument("--dir", type=str, required=True, help="Directory with .lean files")
    parser.add_argument("--strict", action="store_true", default=False)
    parser.add_argument("--check_duplicate_ast", action="store_true", default=False)
    args = parser.parse_args()

    root = args.dir
    lean_files = sorted(glob.glob(os.path.join(root, "**/*.lean"), recursive=True))

    lean_version = get_lean_version()
    print(f"Lean version: {lean_version}")

    results = []
    context_proofbody_map = dict()
    ast_hash_map = defaultdict(list)
    duplicates = []
    vacuous = []
    config_mismatches = []
    fail_compiles = []
    lean_hash_set = set()

    for lf in lean_files:
        d = {}
        d['lean_path'] = lf
        d['lean_sha256'] = sha256file(lf)
        d['lean_md5'] = md5file(lf)
        lean_hash_set.add(d['lean_sha256'])
        with open(lf,"r") as f:
            contents = f.read()
        d['lean_size_bytes'] = len(contents.encode())
        theorem_name, (is_false, is_true), thmbody_start = parse_theorem_header(contents)
        d['theorem_name'] = theorem_name
        d['proves_false'] = bool(is_false)
        d['proves_true'] = bool(is_true)
        proof_body = extract_proof_body(contents)
        d['proof_lines'] = count_proof_steps(proof_body)
        d['proof_body_ast_hash'] = hashlib.sha256(proof_body.encode()).hexdigest()
        if args.check_duplicate_ast:
            ast_hash_map[d['proof_body_ast_hash']].append((lf,theorem_name))
        d['lean_version'] = lean_version
        # Trivial/vacuous proof check:
        tauto_match = re.match(r'^\s*(contradiction|trivial)\s*$', proof_body,re.I)
        d['is_trivial'] = bool(tauto_match) or \
            (d['proves_true'] and d['proof_lines']<=1)
        # Associated config
        cfg_path = lf.replace(".lean", "_config.json")
        config = load_json_if_exists(cfg_path)
        d['config_path'] = cfg_path if config else None
        d['has_config'] = bool(config)
        context_hash = config.get("context_full") or config.get("context") if config else None
        d["context_config"] = context_hash
        d['config_field_type'] = config.get("type") if config else "unknown"
        d['config_seed'] = smart_str(config.get("params",{}).get("seed")) if config else ""
        d['config_hash'] = config.get("hash_sha256") or config.get("cx_hash") if config else None
        # RIS scoring
        RIS = 0
        # 1. Compiles under Lean
        rc, out, err = run_lean_check(lf)
        d['lean_compile_returncode'] = rc
        d['lean_stdout'] = out
        d['lean_stderr'] = err
        d['lean_stderr_hash'] = hashlib.sha256((err or "").encode()).hexdigest()
        d['lean_check_status'] = ("PASS" if rc==0 else "FAIL")
        if rc==0:
            RIS += 1
        else:
            fail_compiles.append(lf)
        # 2. Config exists and matches SHA (if possible)
        if config and (
            d['lean_sha256'] == d['config_hash'] or not d['config_hash']
        ):
            RIS += 1
        else:
            config_mismatches.append(lf)
        # 3. Theorem name includes context hash
        if theorem_name and context_hash and context_hash in theorem_name:
            RIS += 1
        else:
            config_mismatches.append(lf)
        # 4. Proof is not trivial/vacuous
        if not d['is_trivial'] and d['proves_false']:
            RIS += 1
        else:
            vacuous.append(lf)
        # 5. Unique proof body
        is_unique = (ast_hash_map[d['proof_body_ast_hash']]==[(lf, theorem_name)]) if args.check_duplicate_ast else True
        if is_unique:
            RIS += 1
        else:
            duplicates.append(lf)
        d['RIS'] = RIS

        # Redundancy check (populate later)
        context_proofbody_map[(context_hash or theorem_name)] = d['proof_body_ast_hash']
        # Collect theorem duplication
        results.append(d)

    # After scan: detect duplicate ASTs (ignore self)
    if args.check_duplicate_ast:
        for proof_hash, lst in ast_hash_map.items():
            if len(lst)>1:
                for lf, thm in lst:
                    for d in results:
                        if d['lean_path']==lf:
                            d['is_duplicate_ast'] = True
                duplicates.extend([x[0] for x in lst if x[0] not in duplicates])

    # ==== Export .csv ====
    summary_csv = os.path.join(root, "lean_validation_summary.csv")
    fieldnames = [
        "lean_path","lean_sha256","lean_md5","theorem_name",
        "proves_false","proves_true","proof_lines","proof_body_ast_hash",
        "lean_version","has_config","context_config","config_field_type",
        "config_seed","config_hash","lean_check_status","RIS"
    ]
    with open(summary_csv,"w",newline="") as f:
        w = csv.DictWriter(f,fieldnames=fieldnames)
        w.writeheader()
        for d in results:
            w.writerow({k:smart_str(d.get(k,"")) for k in fieldnames})

    # ==== Export .md report ====
    summary_md = os.path.join(root, "lean_validation_report.md")
    with open(summary_md,"w") as f:
        f.write(f"# Lean Certificate Validation Report\n")
        f.write(f"Audit time: {datetime.utcnow().isoformat()}Z\n")
        f.write(f"Target directory: `{root}`\n")
        f.write(f"Lean version: {lean_version}\n\n")
        f.write(f"Total certificates: {len(results)}\n\n")
        f.write("## Failures to compile:\n")
        for fn in fail_compiles:
            f.write(f"- {fn}\n")
        f.write("\n## Config mismatches or theorem/context inconsistency:\n")
        for fn in config_mismatches:
            f.write(f"- {fn}\n")
        f.write("\n## Vacuous/trivial certificates:\n")
        for fn in vacuous:
            f.write(f"- {fn}\n")
        if args.check_duplicate_ast:
            f.write("\n## Duplicate AST bodies detected (possible redundancy):\n")
            for fn in duplicates:
                f.write(f"- {fn}\n")
        f.write("\n## RIS Score Histogram\n")
        ris_hist = defaultdict(int)
        for d in results:
            ris_hist[d['RIS']] += 1
        for k in sorted(ris_hist):
            f.write(f"- RIS={k}: {ris_hist[k]}\n")
        f.write("\n## Certificates with RIS < 5\n")
        for d in results:
            if d['RIS']<5:
                f.write(f"- {d['lean_path']} (RIS={d['RIS']})\n")
        f.write("\n\nDetail per certificate is available in the CSV.")

    # ==== Export .json metadata ====
    summary_json = os.path.join(root, "lean_validator_metadata.json")
    for d in results:
        # Truncate large keys to keep JSON readable
        if 'lean_stdout' in d and len(d['lean_stdout'])>500: d['lean_stdout'] = d['lean_stdout'][:500] + "···"
        if 'lean_stderr' in d and len(d['lean_stderr'])>500: d['lean_stderr'] = d['lean_stderr'][:500] + "···"
    with open(summary_json, "w") as f:
        json.dump({"certificates": results, "lean_version": lean_version, "timestamp": datetime.utcnow().isoformat()}, f, indent=2)

    print(f"\nSummary:")
    print(f"- CSV: {summary_csv}")
    print(f"- Markdown report: {summary_md}")
    print(f"- Metadata: {summary_json}")
    if args.strict and (fail_compiles or duplicates or vacuous or config_mismatches):
        print("Strict mode: at least one error detected. Failing.")
        sys.exit(99)
    print("Done.")

if __name__ == "__main__":
    main()


Writing lean_validator.py


In [9]:
%%writefile generate_random_fields.py
#!/usr/bin/env python3
import os
import sys
import hashlib
import json
import time
import socket
import argparse
import numpy as np
from datetime import datetime
from functools import wraps
try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None

# ================= Registry & Decorators ====================

class FieldRegistry:
    def __init__(self):
        self.generators = {}
    def register(self, name):
        def wrapper(func):
            self.generators[name] = func
            return func
        return wrapper
    def generate(self, name, **params):
        if name not in self.generators:
            raise ValueError(f"Unknown field type: {name}")
        return self.generators[name](**params)

field_registry = FieldRegistry()

def validate_field(fn):
    """Decorator for generator functions to check output shape, finiteness, divergence."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        omega = fn(*args, **kwargs)
        if omega.shape[0] != 3 or len(omega.shape) != 4:
            raise ValueError(f"Field shape invalid: expected (3,N,N,N), got {omega.shape}")
        if not np.isfinite(omega).all():
            raise ValueError("Field contains inf/nan values")
        if np.linalg.norm(omega) < 1e-8:
            raise ValueError("Field norm too small")
        # Divergence-free test (optional unless forced)
        if kwargs.get('enforce_divergence', True):
            N = omega.shape[1]
            dx = 1.0/N
            div = (
                np.gradient(omega[0], dx, axis=0) +
                np.gradient(omega[1], dx, axis=1) +
                np.gradient(omega[2], dx, axis=2)
            )
            if np.abs(div).mean() >= 1e-3:
                raise ValueError("Field failed divergence test: mean(abs(div)) >= 1e-3")
        return omega
    return wrapper

# ============= Field Generators =========================

@field_registry.register("shell_random")
@validate_field
def shell_random(j=8, N=512, seed=42, amplitude=1.0, **_):
    """Divergence-free field with energy in shell |k| ~ 2^j."""
    np.random.seed(seed)
    F = np.zeros((3, N, N, N), dtype=np.complex64)
    freq = np.fft.fftfreq(N)*N
    Kx, Ky, Kz = np.meshgrid(freq, freq, freq, indexing='ij')
    K2 = Kx**2 + Ky**2 + Kz**2
    mask = (K2 >= 2**(2*j)) & (K2 < 2**(2*(j+1)))
    if not np.any(mask):
        max_freq = np.max(np.abs(freq))
        j_adj = int(np.floor(np.log2(max_freq)))
        mask = (K2 >= 2**(2*j_adj)) & (K2 < 2**(2*(j_adj+1)))
    idx = np.nonzero(mask)
    if idx[0].size > 0:
        kvec = np.vstack([Kx[idx], Ky[idx], Kz[idx]])  # shape (3, M)
        norm_k = np.linalg.norm(kvec, axis=0)
        valid = norm_k >= 1e-7
        if np.any(valid):
            kvec_valid = kvec[:, valid]
            norm_k_valid = norm_k[valid]
            k_unit = kvec_valid / norm_k_valid  # shape (3, M_valid)
            M_valid = k_unit.shape[1]
            u = np.random.randn(3, M_valid) + 1j * np.random.randn(3, M_valid)
            dot = np.sum(u * k_unit, axis=0)
            u = u - k_unit * dot
            indices = np.array(np.nonzero(mask)).T  # shape (M, 3)
            indices_valid = indices[valid]
            F[:, indices_valid[:,0], indices_valid[:,1], indices_valid[:,2]] = u
    norm_F = np.sqrt((np.abs(F)**2).sum())
    if norm_F == 0:
        raise ValueError("No energy in selected Fourier modes; adjust 'j' or 'N'.")
    F /= norm_F
    F *= amplitude
    omega = np.fft.ifftn(F, axes=(1,2,3)).real.astype(np.float32)
    return omega

@field_registry.register("multi_shell")
@validate_field
def multi_shell(j1=6, j2=9, N=512, seed=888, amplitude=1.0, **_):
    """Superpose two shells."""
    a = shell_random(j=j1, N=N, seed=seed, amplitude=amplitude/2, enforce_divergence=False)
    b = shell_random(j=j2, N=N, seed=seed+1, amplitude=amplitude/2, enforce_divergence=False)
    omega = a + b
    return omega

@field_registry.register("aligned_noise")
@validate_field
def aligned_noise(N=512, seed=123, amplitude=1.0, **_):
    np.random.seed(seed)
    omega = np.random.randn(3, N, N, N)
    v = omega.reshape(3,-1).mean(axis=1)
    if np.linalg.norm(v) > 1e-8:
        theta = np.arccos(np.dot(v, [1,0,0])/np.linalg.norm(v))
        if theta > 1e-5:
            axis = np.cross(v, [1,0,0])
            axis = axis/np.linalg.norm(axis)
            from scipy.spatial.transform import Rotation as R
            rot = R.from_rotvec(axis*theta)
            omega = np.tensordot(rot.as_matrix(), omega, axes=([1],[0]))
    omega *= amplitude / np.sqrt(np.mean(omega**2)+1e-12)
    return omega.astype(np.float32)

@field_registry.register("white_noise")
@validate_field
def white_noise(N=512, seed=77, amplitude=1.0, **_):
    np.random.seed(seed)
    omega = amplitude * np.random.randn(3, N, N, N).astype(np.float32)
    return omega

@field_registry.register("vortex_tube")
@validate_field
def vortex_tube(N=512, amplitude=1.0, **_):
    x = np.linspace(-1,1,N,endpoint=False)
    X,Y,Z = np.meshgrid(x,x,x,indexing='ij')
    r = np.sqrt(Y**2+Z**2)
    tube = amplitude * np.exp(-30*r**2)
    omega = np.zeros((3,N,N,N), dtype=np.float32)
    omega[0] = tube
    return omega

@field_registry.register("boundary_layer")
@validate_field
def boundary_layer(N=512, amplitude=1.0, **_):
    x = np.linspace(-1,1,N,endpoint=False)
    Z = np.meshgrid(x,x,x,indexing='ij')[2]
    layer = amplitude * np.exp(-200*Z**2)
    omega = np.zeros((3,N,N,N), dtype=np.float32)
    omega[1] = layer
    return omega

@field_registry.register("adversarial_mix")
@validate_field
def adversarial_mix(N=512, j=3, j2=6, seed=444, amplitude=1.0, **_):
    field1 = shell_random(j=j, N=N, seed=seed, amplitude=amplitude/2, enforce_divergence=False)
    field2 = white_noise(N=N, seed=seed+993, amplitude=amplitude/4, enforce_divergence=False)
    field3 = vortex_tube(N=N, amplitude=amplitude/4, enforce_divergence=False)
    omega = field1 + field2 + field3
    return omega

# ============ Evolution Stepper =======================

def proof_evolve(omega_t, dt=1e-4, magnitude=0.01, seed=2023):
    np.random.seed(hash(float(np.sum(omega_t))+dt+seed) % (2**32-1))
    noise = magnitude * np.random.randn(*omega_t.shape)
    return omega_t + dt*noise

# ========== Hashing & Metadata ========================

def get_sha256(arr):
    m = hashlib.sha256()
    m.update(arr.tobytes())
    return m.hexdigest()

def get_md5(arr):
    m = hashlib.md5()
    m.update(arr.astype(np.float32).tobytes())
    return m.hexdigest()

def get_src_hash(fn):
    co = fn.__code__
    code_bytes = co.co_code
    h = hashlib.sha1(code_bytes).hexdigest()
    return h

def get_git_commit():
    try:
        import subprocess
        commit = subprocess.check_output(['git', 'rev-parse', 'HEAD'], text=True).strip()
        return commit
    except Exception:
        return "unknown"

# ========== CLI & Main ===============================

def visualize_field(omega, out_fn="preview_slice.png"):
    import matplotlib.pyplot as plt
    N = omega.shape[1]
    mid = N//2
    fig, axs = plt.subplots(1, 3, figsize=(10, 3))
    for i, comp in enumerate(['x', 'y', 'z']):
        im = axs[i].imshow(omega[i, mid], cmap='RdBu', vmax=np.abs(omega[i, mid]).max())
        axs[i].set_title(f"$\\omega_{comp}$ (z={mid})")
        plt.colorbar(im, ax=axs[i])
    plt.tight_layout()
    plt.savefig(out_fn)
    plt.close(fig)

def main():
    parser = argparse.ArgumentParser(description="Generate reproducible random vorticity fields for proof engine.")
    parser.add_argument('--type', type=str, required=True, help="Field type")
    parser.add_argument('--j', type=int, help="Shell/Multi")
    parser.add_argument('--j1', type=int)
    parser.add_argument('--j2', type=int)
    parser.add_argument('--seed', type=int, default=42)
    parser.add_argument('--amplitude', type=float, default=1.0)
    parser.add_argument('--N', type=int, default=512)
    parser.add_argument('--out', type=str, default="/content/omega_t.npy")
    parser.add_argument('--visualize', action='store_true')
    parser.add_argument('--evolve', action='store_true')
    parser.add_argument('--export_config', action='store_true')
    args = parser.parse_args()

    params = {}
    argsvars = vars(args)
    for k in argsvars:
        v = argsvars[k]
        if k in {'visualize', 'evolve', 'out', 'export_config', 'type'} or v is None:
            continue
        params[k] = v

    ftype = args.type
    if ftype not in field_registry.generators:
        raise ValueError(f'Field type not recognized: {ftype}')
    fn = field_registry.generators[ftype]
    import inspect
    valid_args = inspect.getfullargspec(fn).args
    fn_params = {k: params[k] for k in params if k in valid_args}

    omega = field_registry.generate(ftype, **fn_params)
    assert omega.shape == (3, args.N, args.N, args.N)
    save_path = args.out
    np.save(save_path, omega)

    sha256 = get_sha256(omega)
    md5 = get_md5(omega)
    src_hash = get_src_hash(fn)
    git_commit = get_git_commit()
    ts = datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
    hostname = socket.gethostname()

    if args.visualize and plt is not None:
        out_img = os.path.splitext(save_path)[0] + "_preview.png"
        visualize_field(omega, out_img)

    if args.evolve:
        omega_tpdt = proof_evolve(omega)
        out_tpdt = os.path.splitext(save_path)[0] + "_tpdt.npy"
        np.save(out_tpdt, omega_tpdt)

    if args.export_config:
        config = {
            "type": ftype,
            "params": fn_params,
            "shape": list(omega.shape),
            "hash_sha256": sha256,
            "float_hash_md5": md5,
            "generator_source_hash": src_hash,
            "timestamp": ts,
            "git_commit": git_commit,
            "hostname": hostname
        }
        config_path = os.path.splitext(save_path)[0] + "_config.json"
        with open(config_path, "w") as f:
            json.dump(config, f, indent=2)
        print(f"Config saved to {config_path}")

    print(f"Field '{ftype}' written to {save_path}")
    print(f"SHA256: {sha256}")
    print(f"MD5(float32): {md5}")

if __name__ == "__main__":
    main()


Writing generate_random_fields.py


In [10]:
# Step 1: Generate a field with real energy in j=8 shell
!python3 generate_random_fields.py --type multi_shell --j1 6 --j2 9 --N 512 --seed 123 --evolve --visualize --export_config

fatal: not a git repository (or any of the parent directories): .git
Config saved to /content/omega_t_config.json
Field 'multi_shell' written to /content/omega_t.npy
SHA256: d6f05c97072c6b7b0849a620666174eaa363c4b5b1036b28e11a3601009e1626
MD5(float32): fc6cc4534d154f4c665970fe94e42916


In [11]:
!python proof_colab.py --input omega_t.npy --input2 omega_t_tpdt.npy --alpha 2.5 --j_min 6 --j_max 9 --timesteps 100 --strict --export_cert --plot --run_validation_checks


Using CuPy (GPU) for arrays and FFTs
Loaded omega_t from omega_t.npy, shape (3, 512, 512, 512).
Loaded omega_tpdt from omega_t_tpdt.npy, shape (3, 512, 512, 512).
Running 100 time steps with dt=0.0001 and verifying at each step.
  (Computing velocity and grad_u from vorticity via Biot-Savart)
[biot_savart] max|div u| = 5.12e-15

Recursive norm Y(t):          0.0103605 ± 1.04e-09
sup_j alignment:              0.500082 ± 5.00e-08
dY/dt:                        -11.7085 ± 1.96e-05
RHS:                          0.0713894 ± 2.14e-08
[Inequality verified]: This timestep is within bounds.

[entropy] H(t) = -0.0000
Wrote Lean-style proof certificate to /content/output/certificate_timestep_000.lean
  (Computing velocity and grad_u from vorticity via Biot-Savart)
[biot_savart] max|div u| = 5.01e-15

Recursive norm Y(t):          0.00939181 ± 9.39e-10
sup_j alignment:              0.500079 ± 5.00e-08
dY/dt:                        -10.1023 ± 1.78e-05
RHS:                          0.0586632 ± 1.76e-

In [83]:
!python3 proof_colab.py --input omega_t.npy --input2 omega_t_tpdt.npy --alpha 2.5 --j_min 6 --j_max 9 --timesteps 100 --strict --export_cert --run_validation_checks

Using CuPy (GPU) for arrays and FFTs
Loaded omega_t from omega_t.npy, shape (3, 512, 512, 512).
Loaded omega_tpdt from omega_t_tpdt.npy, shape (3, 512, 512, 512).
Running 100 time steps with dt=0.0001 and verifying at each step.
  (Computing velocity and grad_u from vorticity via Biot-Savart)
[biot_savart] max|div u| = 1.42e-08

Recursive norm Y(t):          55278 ± 5.53e-03
sup_j alignment:              0.500024 ± 5.00e-08
dY/dt:                        -7.5397e+07 ± 1.03e+02
RHS:                          2.03242e+12 ± 6.10e+05
[Inequality verified]: This timestep is within bounds.

[entropy] H(t) = -0.3404
Wrote Lean-style proof certificate to /content/output/certificate_timestep_000.lean
  (Computing velocity and grad_u from vorticity via Biot-Savart)
[biot_savart] max|div u| = 1.32e-08

Recursive norm Y(t):          50423.4 ± 5.04e-03
sup_j alignment:              0.500021 ± 5.00e-08
dY/dt:                        -6.71096e+07 ± 9.41e+01
RHS:                          1.69111e+12 ± 5.

In [11]:
!python3 proof_colab.py --run_blowup_test --strict
!python3 proof_colab.py --run_commutator_test --strict
!python3 proof_colab.py --run_time_reverse_test --strict

Using CuPy (GPU) for arrays and FFTs
No --input provided. Generating a synthetic 'vortex tube' field.
  (Computing velocity and grad_u from vorticity via Biot-Savart)
[biot_savart] max|div u| = 0.00e+00

Recursive norm Y(t):          347872 ± 3.48e-02
sup_j alignment:              0.411426 ± 4.11e-08
dY/dt not available (only single time). No inequality check.
Default evolution + certification completed successfully.
See /content/output/diagnostics_log_mainrun.json for details.
[blowup test] j_min=6, j_max=9, timesteps=1
  (Computing velocity and grad_u from vorticity via Biot-Savart)
[biot_savart] max|div u| = 9.26e-05

Recursive norm Y(t):          2.48765 ± 2.49e-07
sup_j alignment:              0.329752 ± 3.30e-08
dY/dt:                        -4629.68 ± 4.51e-03
RHS:                          2714.19 ± 8.14e-04
[Inequality verified]: This timestep is within bounds.

[blowup test] Completed.
All requested runs/tests are complete. Exiting.
Using CuPy (GPU) for arrays and FFTs
No --in

In [35]:
!python3 recursive_ode_falsifier.py --strict --lean_cert

FALSIFIER: Step 8: error in RHS calculation: Overflow: Y=6.612048636434717e+219, exponent=1.8


In [26]:
!python3 proof_falsifier_engine.py

fatal: not a git repository (or any of the parent directories): .git
[FAIL] test_shell_j3_run1_3052d555a8 | proof_colab: 2 | ode: n/a
[FAIL] test_shell_j5_run2_7034978480 | proof_colab: 2 | ode: n/a
[FAIL] test_aligned_blowup_run3_296c65368f | proof_colab: 2 | ode: n/a
[FAIL] test_adversarial_combo_run4_a6b94e165f | proof_colab: 2 | ode: n/a
[FAIL] test_random_noise_run5_cda5c21d99 | proof_colab: 2 | ode: n/a
Wrote summary to /content/output/proof_summary.json
------ Adversarial Falsification Test Report ------
Total runs: 5
proof_colab.py failures: 5
recursive_ode_falsifier.py failures: 0
Field types causing failure:
  adversarial_combo (proof_colab)
  aligned_blowup (proof_colab)
  random_noise (proof_colab)
  shell_j3 (proof_colab)
  shell_j5 (proof_colab)
Counterexamples saved in: /content/output/counterexamples/
Bundle zip (if any): /content/output/counterexample_bundle.zip
--------------------------------------------------


In [38]:
%%writefile proof_summary_dashboard.py
#!/usr/bin/env python3
import os
import sys
import re
import json
import argparse

def parse_args():
    parser = argparse.ArgumentParser(
        description="Summarize proof certificates into JSON"
    )
    parser.add_argument(
        "--input_dir", "-i",
        required=True,
        help="Path to folder containing certificate_timestep_*.lean"
    )
    parser.add_argument(
        "--timesteps", "-t",
        type=int,
        required=True,
        help="Number of timesteps to process (e.g. 100)"
    )
    parser.add_argument(
        "--output", "-o",
        required=True,
        help="Path to write proof_summary.json"
    )
    parser.add_argument(
        "--log_failed", action="store_true",
        help="Also write a JSON of failed/skipped files alongside the summary"
    )
    return parser.parse_args()

def main():
    args = parse_args()

    input_dir = args.input_dir
    if not input_dir.endswith(os.sep):
        input_dir += os.sep

    # regex patterns
    re_Y       = re.compile(r"--\s*Y\s*=\s*\(\s*([0-9eE.+-]+)")
    re_align   = re.compile(r"--\s*align_sup\s*=\s*\(\s*([0-9eE.+-]+)")
    re_dYdt    = re.compile(r"--\s*Computed\s*\(LHS\s*=\s*dY/dt\)\s*:\s*([0-9eE.+-]+)")
    re_rhs     = re.compile(r"--\s*Computed\s*\(RHS\)\s*:\s*([0-9eE.+-]+)")
    re_verified = re.compile(r"^theorem\s+\w+\s*:\s*dYdt\s*≤\s*RHS", re.MULTILINE)

    summary = []
    failed = []

    for i in range(args.timesteps):
        fname = f"certificate_timestep_{i:03}.lean"
        path = os.path.join(input_dir, fname)
        entry = {
            "file": path,
            "Y": None,
            "alignment": None,
            "dY_dt": None,
            "RHS": None,
            "verified": False
        }
        try:
            with open(path, "r") as f:
                contents = f.read()
            if not contents.strip():
                print(f"Warning: empty file {path}", file=sys.stderr)
                failed.append(fname)
            else:
                # parse Y
                m = re_Y.search(contents)
                if m:
                    try:
                        entry["Y"] = float(m.group(1))
                    except ValueError:
                        print(f"Warning: could not parse Y in {path}", file=sys.stderr)
                else:
                    print(f"Warning: Y not found in {path}", file=sys.stderr)

                # parse alignment
                m = re_align.search(contents)
                if m:
                    try:
                        entry["alignment"] = float(m.group(1))
                    except ValueError:
                        print(f"Warning: could not parse alignment in {path}", file=sys.stderr)
                else:
                    print(f"Warning: alignment not found in {path}", file=sys.stderr)

                # parse dY/dt
                m = re_dYdt.search(contents)
                if m:
                    try:
                        entry["dY_dt"] = float(m.group(1))
                    except ValueError:
                        print(f"Warning: could not parse dY/dt in {path}", file=sys.stderr)
                else:
                    print(f"Warning: dY/dt not found in {path}", file=sys.stderr)

                # parse RHS
                m = re_rhs.search(contents)
                if m:
                    try:
                        entry["RHS"] = float(m.group(1))
                    except ValueError:
                        print(f"Warning: could not parse RHS in {path}", file=sys.stderr)
                else:
                    print(f"Warning: RHS not found in {path}", file=sys.stderr)

                # parse verified theorem
                if re_verified.search(contents):
                    entry["verified"] = True

                # record failures if any metric missing
                if any(entry[k] is None for k in ["Y","alignment","dY_dt","RHS"]):
                    failed.append(fname)

        except FileNotFoundError:
            print(f"Warning: file not found {path}", file=sys.stderr)
            failed.append(fname)
        except Exception as e:
            print(f"Warning: error reading {path}: {e}", file=sys.stderr)
            failed.append(fname)

        summary.append(entry)

    # write summary JSON
    out_data = {"summary": summary}
    os.makedirs(os.path.dirname(args.output), exist_ok=True)
    with open(args.output, "w") as f:
        json.dump(out_data, f, indent=2)
    print(f"Wrote summary to {args.output}")

    # optionally write failed log
    if args.log_failed:
        failed_path = args.output.rstrip(".json") + "_failed.json"
        with open(failed_path, "w") as f:
            json.dump({"failed": failed}, f, indent=2)
        print(f"Wrote failed log to {failed_path}")

if __name__ == "__main__":
    main()


Overwriting proof_summary_dashboard.py


In [39]:
!python3 proof_summary_dashboard.py --input_dir /content/output/ --timesteps 100 --output /content/output/proof_summary.json


Wrote summary to /content/output/proof_summary.json


In [40]:
!python3 certificate_packager.py

[OK] Capsule created: /content/output/capsules/certificate_timestep_000_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_001_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_002_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_003_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_004_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_005_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_006_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_007_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_008_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_009_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_010_capsule.zip
[OK] Capsule created: /content/output/capsules/certificate_timestep_011_caps

In [42]:
!export PATH="$HOME/.elan/bin:$PATH" && python3 lean_validator.py --dir /content/output/capsules/ --strict --check_duplicate_ast


Lean version: /bin/sh: 1: lean: not found

Summary:
- CSV: /content/output/capsules/lean_validation_summary.csv
- Markdown report: /content/output/capsules/lean_validation_report.md
- Metadata: /content/output/capsules/lean_validator_metadata.json
Done.


In [90]:
!export PATH="$HOME/.elan/bin:$PATH" && python3 lean_validator.py --dir /content/output/counterexamples/ --strict --check_duplicate_ast


Lean version: Lean (version 4.18.0, x86_64-unknown-linux-gnu, commit 11ccbced7964, Release)

Summary:
- CSV: /content/output/counterexamples/lean_validation_summary.csv
- Markdown report: /content/output/counterexamples/lean_validation_report.md
- Metadata: /content/output/counterexamples/lean_validator_metadata.json
Strict mode: at least one error detected. Failing.


In [87]:
!cat /content/output/counterexamples/lean_validation_report.md


# Lean Certificate Validation Report
Audit time: 2025-04-18T00:07:59.493780Z
Target directory: `/content/output/counterexamples/`
Lean version: Lean (version 4.18.0, x86_64-unknown-linux-gnu, commit 11ccbced7964, Release)

Total certificates: 5

## Failures to compile:
- /content/output/counterexamples/test_adversarial_combo_run4_a6b94e165f/test_adversarial_combo_run4_a6b94e165f_falsifier_certificate.lean
- /content/output/counterexamples/test_aligned_blowup_run3_296c65368f/test_aligned_blowup_run3_296c65368f_falsifier_certificate.lean
- /content/output/counterexamples/test_random_noise_run5_cda5c21d99/test_random_noise_run5_cda5c21d99_falsifier_certificate.lean
- /content/output/counterexamples/test_shell_j3_run1_3052d555a8/test_shell_j3_run1_3052d555a8_falsifier_certificate.lean
- /content/output/counterexamples/test_shell_j5_run2_7034978480/test_shell_j5_run2_7034978480_falsifier_certificate.lean

## Config mismatches or theorem/context inconsistency:
- /content/output/counterexampl

In [34]:
!python3 certificate_packager.py

[certificate_packager.py] ERROR: No valid runs found for packaging.


In [89]:
!export PATH="$HOME/.elan/bin:$PATH" && python3 lean_validator.py --dir /content/output/ --strict --check_duplicate_ast

Lean version: Lean (version 4.18.0, x86_64-unknown-linux-gnu, commit 11ccbced7964, Release)

Summary:
- CSV: /content/output/lean_validation_summary.csv
- Markdown report: /content/output/lean_validation_report.md
- Metadata: /content/output/lean_validator_metadata.json
Strict mode: at least one error detected. Failing.


In [None]:
!zip -r navier_stokes_proof_output.zip /content/output/

  adding: content/output/ (stored 0%)
  adding: content/output/diagnostics_log_timestep_019.json (deflated 68%)
  adding: content/output/diagnostics_log_timestep_044.json (deflated 67%)
  adding: content/output/per_shell_timestep_046.png (deflated 22%)
  adding: content/output/per_shell_timestep_025.png (deflated 23%)
  adding: content/output/inequality_timestep_002.png (deflated 21%)
  adding: content/output/per_shell_timestep_021.png (deflated 23%)
  adding: content/output/certificate_timestep_089.lean (deflated 31%)
  adding: content/output/per_shell_timestep_092.png (deflated 23%)
  adding: content/output/per_shell_timestep_024.png (deflated 23%)
  adding: content/output/inequality_timestep_007.png (deflated 20%)
  adding: content/output/certificate_timestep_018.lean (deflated 31%)
  adding: content/output/diagnostics_log_timestep_072.json (deflated 67%)
  adding: content/output/certificate_timestep_095.lean (deflated 32%)
  adding: content/output/inequality_timestep_028.png (defla

In [None]:
!zip -r navier_stokes_proof_full_output.zip /content/

  adding: content/ (stored 0%)
  adding: content/.config/ (stored 0%)
  adding: content/.config/default_configs.db (deflated 98%)
  adding: content/.config/.last_update_check.json (deflated 22%)
  adding: content/.config/.last_survey_prompt.yaml (stored 0%)
  adding: content/.config/config_sentinel (stored 0%)
  adding: content/.config/gce (stored 0%)
  adding: content/.config/.last_opt_in_prompt.yaml (stored 0%)
  adding: content/.config/configurations/ (stored 0%)
  adding: content/.config/configurations/config_default (deflated 15%)
  adding: content/.config/hidden_gcloud_config_universe_descriptor_data_cache_configs.db (deflated 97%)
  adding: content/.config/logs/ (stored 0%)
  adding: content/.config/logs/2025.03.05/ (stored 0%)
  adding: content/.config/logs/2025.03.05/14.26.32.612701.log (deflated 58%)
  adding: content/.config/logs/2025.03.05/14.26.41.012425.log (deflated 57%)
  adding: content/.config/logs/2025.03.05/14.26.30.965194.log (deflated 86%)
  adding: content/.confi