# EQ-Proof (improved)
A self-contained, runnable notebook for EQ-Proof: constraint validation, repair, attestation, and reporting.

Highlights:
- Offline-by-default security model with explicit overrides
- Robust input validation (optional JSON Schema)
- Deterministic attestation and environment metadata
- Documented, testable, and maintainable modules
- End-to-end demo loading a real JSON spec


## Configuration and utilities
Centralized tolerances, timing helpers, and simple logging.


In [None]:
# config.py / utils
from __future__ import annotations
import os, time, uuid, platform, json, hashlib
from typing import Dict, Any

class Config:
    """Global configuration for tolerances and runtime behavior."""
    EQUALITY_TOL: float = 1e-9
    SIMPLEX_SUM_TOL: float = 1e-9
    SIMPLEX_NEG_TOL: float = -1e-12
    ALTPROJ_ITERS: int = 200
    ALTPROJ_TOL: float = 1e-9
    SUM_SLACK_FRAC: float = 0.005
    SIMPLEX_SUM_SOFT: float = 1e-6
    MONOTONE_SLACK: float = -1e-12
    DEMO_KEY: bytes = b"DEMO_KEY"

def now_ms() -> int:
    """Return current time in milliseconds."""
    return int(time.time() * 1000)

def run_meta() -> Dict[str, Any]:
    """Return runtime metadata for attestation and reporting."""
    return {
        "python": platform.python_version(),
        "platform": platform.platform(),
        "time_ms": now_ms(),
        "run_id": str(uuid.uuid4()),
    }

def sha256_bytes(b: bytes) -> str:
    """Compute SHA-256 hex digest of bytes."""
    return hashlib.sha256(b).hexdigest()

def canonical_json(obj: Any) -> bytes:
    """Canonical JSON bytes for deterministic hashing/signing."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")

def log_step(report: Dict[str, Any], step: Dict[str, Any]) -> None:
    """Append a structured step to the report."""
    report.setdefault("steps", []).append(step)


## Offline enforcement
Disable outbound network unless explicitly allowed.


In [None]:
# no_net.py
import socket
import os

class _NoNetSocket(socket.socket):
    def connect(self, *a, **k):
        raise RuntimeError("Outbound network disabled by eq_proof.no_net")
    def connect_ex(self, *a, **k):
        raise RuntimeError("Outbound network disabled by eq_proof.no_net")

def enforce_offline():
    """Monkey-patch socket to disable outbound network unless EQPROOF_ALLOW_NET is set."""
    if os.environ.get("EQPROOF_ALLOW_NET", "0") not in ("1", "true", "True"):
        socket.socket = _NoNetSocket  # type: ignore
        def _deny(*a, **k):
            raise RuntimeError("Outbound network disabled by eq_proof.no_net")
        socket.create_connection = _deny  # type: ignore

enforce_offline()


## Spec and optional schema validation
Load a spec, optionally validate against a JSON Schema if `jsonschema` is available.


In [None]:
# spec.py
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import json

SCHEMA: Dict[str, Any] = {
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "EQ-PROOF Spec",
  "type": "object",
  "required": ["name","version","variables","constraints"],
  "properties": {
    "name": {"type": "string"},
    "version": {"type": "string"},
    "variables": {"type": "array", "items": {"type": "string"}},
    "constraints": {"type": "array", "items": {"type": "object"}},
    "probes": {"type": "array", "items": {"type": "object"}},
    "alternates": {"type": "array", "items": {"type": "string"}},
    "units": {"type": "object", "additionalProperties": {"type": "string"}}
  }
}

@dataclass
class Spec:
    """Structured representation of an EQ-Proof spec."""
    name: str
    version: str
    variables: List[str]
    constraints: List[Dict[str, Any]]
    probes: List[Dict[str, Any]] = field(default_factory=list)
    alternates: List[str] = field(default_factory=list)
    units: Dict[str, str] = field(default_factory=dict)

def validate_schema(obj: Dict[str, Any]) -> Optional[str]:
    """Validate spec against JSON Schema if jsonschema is available. Return error message or None."""
    try:
        import jsonschema  # optional
        jsonschema.validate(instance=obj, schema=SCHEMA)
        return None
    except ImportError:
        return None
    except Exception as e:
        return f"Schema validation failed: {e}"

def load_spec(path: str) -> Spec:
    """Load and validate a spec JSON file from disk."""
    with open(path, "r") as f:
        d = json.load(f)
    err = validate_schema(d)
    if err:
        raise ValueError(err)
    for k in ["name","version","variables","constraints"]:
        if k not in d:
            raise ValueError(f"Spec missing required field: {k}")
    return Spec(
        d["name"], d["version"], d["variables"], d["constraints"],
        d.get("probes",[]), d.get("alternates",[]), d.get("units",{})
    )

def spec_hash(spec: Spec) -> str:
    """Deterministic hash of the spec object."""
    return sha256_bytes(canonical_json(spec.__dict__))


## Units handling
Parse, convert, and coerce input values into spec units with detailed step logs.


In [None]:
# units.py
from typing import Tuple
import sympy as sp

BASE = {"":((0,0,0,0,0,0,0),1.0),"m":((1,0,0,0,0,0,0),1.0),"kg":((0,1,0,0,0,0,0),1.0),
        "s":((0,0,1,0,0,0,0),1.0),"A":((0,0,0,1,0,0,0),1.0),"K":((0,0,0,0,1,0,0),1.0),
        "mol":((0,0,0,0,0,1,0),1.0),"cd":((0,0,0,0,0,0,1),1.0)}
DER = {
    "Hz":((0,0,-1,0,0,0,0),1.0),
    "J": ((2,1,-2,0,0,0,0),1.0),
    "V": ((2,1,-3,-1,0,0,0),1.0),
    "Ω": ((2,1,-3,-2,0,0,0),1.0),
    "ohm":((2,1,-3,-2,0,0,0),1.0),
    "eV":((2,1,-2,0,0,0,0),1.602176634e-19),
}
PREFIX={"Y":1e24,"Z":1e21,"E":1e18,"P":1e15,"T":1e12,"G":1e9,"M":1e6,"k":1e3,"h":1e2,"da":1e1,
        "d":1e-1,"c":1e-2,"m":1e-3,"u":1e-6,"μ":1e-6,"n":1e-9,"p":1e-12,"f":1e-15,"a":1e-18,"z":1e-21,"y":1e-24}
CONST={"h":("J*s",6.62607015e-34),"hbar":("J*s",1.054571817e-34),"ħ":("J*s",1.054571817e-34),
       "c":("m/s",299792458.0),"k_B":("J/K",1.380649e-23),"q_e":("C",1.602176634e-19)}

def _mul(a,b): return tuple(x+y for x,y in zip(a,b))
def _div(a,b): return tuple(x-y for x,y in zip(a,b))
def _pow(a,p): return tuple(x*p for x in a)

def _atom(tok):
    if tok in BASE: return BASE[tok]
    if tok in DER: return DER[tok]
    if len(tok)>=2 and tok[:2] in PREFIX and tok[2:] in BASE: d,f=BASE[tok[2:]]; return d,f*PREFIX[tok[:2]]
    if len(tok)>=2 and tok[:2] in PREFIX and tok[2:] in DER: d,f=DER[tok[2:]]; return d,f*PREFIX[tok[:2]]
    if tok[0:1] in PREFIX and tok[1:] in BASE: d,f=BASE[tok[1:]]; return d,f*PREFIX[tok[0:1]]
    if tok[0:1] in PREFIX and tok[1:] in DER: d,f=DER[tok[1:]]; return d,f*PREFIX[tok[0:1]]
    raise ValueError(f"Unknown unit token: {tok}")

def parse_unit(u:str) -> Tuple[Tuple[int,...], float]:
    """Parse a unit expression into dimension exponents and scale factor."""
    if not u or u=="1": return BASE[""]
    u=u.replace(" ","")
    num,*den=u.split("/"); dim=BASE[""][0]; fac=1.0
    for tok in filter(None,num.split("*")):
        base, p = tok.split("^") if "^" in tok else (tok,"1"); p=int(p)
        d,f=_atom(base); dim=_mul(dim,_pow(d,p)); fac*=f**p
    if den:
        den="*".join(den)
        for tok in filter(None,den.split("*")):
            base,p = tok.split("^") if "^" in tok else (tok,"1"); p=int(p)
            d,f=_atom(base); dim=_div(dim,_pow(d,p)); fac/=f**p
    return dim, fac

def convert(val: float, from_u: str, to_u: str) -> float:
    """Convert numeric value between compatible units."""
    d1,f1=parse_unit(from_u); d2,f2=parse_unit(to_u)
    if d1!=d2: raise ValueError("Incompatible units")
    return (val*f1)/f2

def coerce_inputs_to_spec_units(values: dict, spec_units: Dict[str,str]):
    """Coerce input values to spec-declared units; returns (coerced_values, steps)."""
    steps=[]; out=dict(values)
    for k,u in spec_units.items():
        if isinstance(values.get(k), dict) and "value" in values[k] and "unit" in values[k]:
            v=float(values[k]["value"]); from_u=str(values[k]["unit"]); v2=convert(v, from_u, u)
            out[k]=v2; steps.append({"op":"unit_convert","var":k,"from":from_u,"to":u,"value_in":v,"value_out":v2})
    return out, steps


## Symbolic constraints
Residuals and solving with cached SymPy symbols and safe error messages.


In [None]:
# constraints.py
from typing import Optional
import sympy as sp
from typing import Dict

_SYMBOL_CACHE: Dict[str, sp.Symbol] = {}

def _symbol(name: str) -> sp.Symbol:
    if name not in _SYMBOL_CACHE:
        _SYMBOL_CACHE[name] = sp.symbols(name, real=True)
    return _SYMBOL_CACHE[name]

def _locals_from_values(values: Dict[str, float]):
    syms = {k: _symbol(k) for k in values}
    ctx = {"Eq": sp.Eq}
    ctx.update(syms)
    return ctx, syms

def equality_residual(expr_str: str, values: Dict[str, float]) -> float:
    """Compute residual (lhs - rhs) for a SymPy equality string like "Eq(x+y, cap)"."""
    locals_, syms = _locals_from_values(values)
    try:
        eq = sp.sympify(expr_str, locals=locals_)
        subs_map = {syms[k]: float(v) for k,v in values.items() if k in syms}
        res = sp.simplify(eq.lhs - eq.rhs).subs(subs_map)
        return float(sp.N(res))
    except Exception as e:
        raise ValueError(f"Residual evaluation failed for '{expr_str}': {e}")

def equality_solve_for(expr_str: str, target: str, values: Dict[str, float]) -> Optional[float]:
    """Solve equality for target variable, substituting other known values."""
    all_names = set(values) | {target}
    syms = {k: _symbol(k) for k in all_names}
    locals_ = {"Eq": sp.Eq, **syms}
    try:
        eq = sp.sympify(expr_str, locals=locals_)
        t = syms[target]
        sol = sp.solve(eq, t, dict=True)
        if not sol:
            return None
        subs_map = {syms[k]: float(v) for k,v in values.items() if k in syms}
        v = float(sp.N(sol[0][t].subs(subs_map)))
        return v
    except Exception:
        return None


## Repair algorithms
Simplex projection, bounds clipping, and isotonic regression with safeguards.


In [None]:
# repair.py
from typing import Tuple, List
from typing import Dict

def project_simplex(y: List[float]) -> List[float]:
    """Project a vector onto the probability simplex."""
    u = sorted(y, reverse=True)
    cssv, csum, rho = [], 0.0, -1
    for i, ui in enumerate(u):
        csum += ui; cssv.append(csum)
        t = (cssv[i] - 1.0) / (i + 1)
        if ui - t > 0: rho = i
    theta = (cssv[rho] - 1.0) / (rho + 1)
    return [max(0.0, yi - theta) for yi in y]

def clip_bounds(vals: Dict[str, float], b: Dict[str, Tuple[float, float]]):
    """Clip values to lower/upper bounds per variable."""
    out = dict(vals)
    for k,(lo,hi) in b.items():
        v = out.get(k,0.0)
        if lo is not None: v = max(lo, v)
        if hi is not None: v = min(hi, v)
        out[k]=v
    return out

def isotonic_increasing(seq: List[float]) -> List[float]:
    """Pool-adjacent-violators for monotone increasing sequence."""
    y = seq[:]; w=[1.0]*len(y); i=0
    while i < len(y)-1:
        if y[i] <= y[i+1]: i+=1; continue
        j=i
        while j>=0 and y[j] > y[j+1]:
            tot=w[j]+w[j+1]; avg=(w[j]*y[j]+w[j+1]*y[j+1])/tot
            y[j]=y[j+1]=avg; w[j]=w[j+1]=tot; j-=1
        i+=1
    return y


## Quadratic projection with equality and bounds
Alternating projections with rank checks and tolerances.


In [None]:
# qp.py
from typing import Optional
import numpy as np
from typing import List, Tuple, Dict

def project_linear_equality(x: np.ndarray, A: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Project x onto the affine set {y | A y = b} via normal equations."""
    At = A.T
    M = A @ At
    # Regularize if near-singular
    try:
        y = np.linalg.solve(M, (A @ x) - b)
    except np.linalg.LinAlgError:
        y, *_ = np.linalg.lstsq(M, (A @ x) - b, rcond=None)
    return x - At @ y

def alternating_proj_equality_bounds(x0: List[float],
                                     A: List[List[float]],
                                     b: List[float],
                                     bounds: Dict[int, Tuple[Optional[float], Optional[float]]],
                                     iters: int = 200, tol: float = 1e-9) -> List[float]:
    x = np.array(x0, dtype=float); A_m = np.array(A, dtype=float); b_v = np.array(b, dtype=float)
    for _ in range(iters):
        prev = x.copy()
        x = project_linear_equality(x, A_m, b_v)
        for idx,(lo,hi) in bounds.items():
            if lo is not None and x[idx] < lo: x[idx]=lo
            if hi is not None and x[idx] > hi: x[idx]=hi
        if np.linalg.norm(x-prev) <= tol: break
    return list(map(float, x))


## Diagnosis and repair pipeline
Coerce units, clip bounds, solve equalities, and record steps. Designed for clarity and auditability.


In [None]:
# diagnose.py
import numpy as np
import sympy as sp
from typing import Dict, Any, Tuple

def diagnose_and_repair(spec: Spec, values: Dict[str, float], *, spec_path: str = "", inputs_path: str = "", verbose: bool = True) -> Dict[str, Any]:
    """Run the repair pipeline and collect an auditable report."""
    started_ms = now_ms()
    report: Dict[str, Any] = {"violations": [], "steps": [], "meta": {"env": run_meta()}}

    # Units coercion
    coerced, unit_steps = coerce_inputs_to_spec_units(values, getattr(spec, "units", {}))
    report["steps"].extend(unit_steps)
    original = dict(coerced)
    repaired = dict(coerced)

    # Bounds clipping
    bounds: Dict[str, Tuple[float, float]] = {}
    for c in spec.constraints:
        if c.get("type") == "bounds":
            bounds[c["var"]] = (c.get("lower"), c.get("upper"))
    if bounds:
        before = {k: repaired.get(k) for k in bounds.keys()}
        repaired = clip_bounds(repaired, bounds)
        after = {k: repaired.get(k) for k in bounds.keys()}
        log_step(report, {"op": "bounds_clip", "before": before, "after": after})

    # Equality constraints solve
    for c in spec.constraints:
        if c.get("type") == "equality":
            expr = c["expr"]
            tol = float(c.get("tol", Config.EQUALITY_TOL))
            res = abs(equality_residual(expr, repaired))
            if res > tol:
                target = c.get("solve_for")
                if target:
                    new = equality_solve_for(expr, target, repaired)
                    if new is not None and np.isfinite(new):
                        before = repaired.get(target)
                        repaired[target] = float(new)
                        log_step(report, {"op": "equality_solve", "expr": expr, "target": target, "before": before, "after": float(new), "residual": res})

    report["meta"]["elapsed_ms"] = now_ms() - started_ms
    report["meta"]["spec_path"] = spec_path
    report["meta"]["inputs_path"] = inputs_path
    return {"original": original, "repaired": repaired, "report": report}


## Attestation and verification
Deterministic HMAC signatures with explicit key handling and environment metadata.


In [None]:
# attest.py / verify.py
import hmac
import hashlib
import time

def _load_secret() -> bytes:
    """Load HMAC key from env; fall back to demo key with a warning."""
    key = os.environ.get("EQPROOF_KEY")
    if key:
        return key.encode("utf-8")
    print("[WARN] Using DEMO_KEY for HMAC attestation. Set EQPROOF_KEY for production.")
    return Config.DEMO_KEY

def attest(spec: dict, proof: dict, *, spec_path: str = "", inputs_path: str = "") -> dict:
    """Create a deterministic HMAC attestation over the spec and proof."""
    payload = {
        "spec": spec,
        "proof": proof,
        "meta": {
            "spec_hash": sha256_bytes(canonical_json(spec)),
            "inputs_hash": sha256_bytes(canonical_json(proof.get("original", {}))),
            "engine_version": "0.2.0",
            "runtime_env": run_meta(),
            "spec_path": spec_path,
            "inputs_path": inputs_path
        },
        "ts": int(time.time())
    }
    msg = canonical_json(payload)
    sig = hmac.new(_load_secret(), msg, hashlib.sha256).hexdigest()
    payload["signature"] = sig
    payload["algo"] = "HMAC-SHA256"
    return payload

def _payload(att: dict) -> bytes:
    """Extract canonical payload for verification (sans signature)."""
    core = {k:v for k,v in att.items() if k not in ("signature","algo","pubkey")}
    return canonical_json(core)

def verify_hmac(att: dict, key: str = "DEMO_KEY") -> bool:
    """Verify HMAC with the provided key string."""
    msg=_payload(att); calc=hmac.new(key.encode("utf-8"), msg, hashlib.sha256).hexdigest()
    return calc == att.get("signature")


## Reporting and PDF export
Human-readable markdown with tables, plus optional PDF output via matplotlib.


In [None]:
# report.py / pdf.py
import datetime
from typing import List
try:
    import matplotlib.pyplot as plt
except Exception:
    plt = None

def render_markdown(spec_path: str, inputs_path: str, result: dict, attestation: dict, verbose: bool = True) -> str:
    """Render a Markdown summary report."""
    ts = datetime.datetime.utcnow().isoformat()+"Z"
    o=result["original"]; r=result["repaired"]; steps=result["report"].get("steps", [])
    rows = "\n".join([f"| {k} | {o.get(k)} | {r.get(k)} |" for k in sorted(set(o)|set(r))])
    md = [
        f"# EQ-PROOF Report",
        f"- Generated: {ts}",
        f"- Spec: `{spec_path}`",
        f"- Inputs: `{inputs_path}`",
        "\n## Original vs Repaired",
        "| Variable | Original | Repaired |",
        "|---|---:|---:|",
        rows
    ]
    if verbose:
        md.append("\n## Steps")
        md += (["- "+json.dumps(s) for s in steps] or ["- none"])
        md.append("\n## Attestation")
        md.append(f"- algorithm: {attestation.get('algo')}")
        md.append(f"- signature: `{attestation.get('signature')}`")
    return "\n".join(md)

def report_lines(spec_path: str, inputs_path: str, result: dict, attestation: dict) -> List[str]:
    """Plain-text lines suitable for PDF export."""
    lines=[f"Spec: {spec_path}", f"Inputs: {inputs_path}"]
    o=result["original"]; r=result["repaired"]; steps=result["report"].get("steps", [])
    lines.append("")
    lines.append("Original vs Repaired")
    for k in sorted(set(o)|set(r)):
        lines.append(f"- {k}: {o.get(k)} -> {r.get(k)}")
    lines.append("")
    lines.append("Steps:")
    lines += [json.dumps(s) for s in steps]
    lines.append("")
    lines.append(f"Attestation: {attestation.get('algo')} {attestation.get('signature')}")
    return lines

def save_text_pdf(lines: List[str], out_path: str, title: str = "EQ-PROOF Report", title_size: int = 16, body_size: int = 9) -> None:
    """Save a simple text-based PDF via matplotlib."""
    if plt is None:
        raise RuntimeError("matplotlib not available for PDF export")
    fig = plt.figure(figsize=(8.27, 11.69)); ax = fig.add_axes([0,0,1,1]); ax.axis('off')
    ax.text(0.05, 0.95, title, va='top', ha='left', fontsize=title_size, family='monospace')
    ax.text(0.05, 0.90, "\n".join(lines), va='top', ha='left', fontsize=body_size, family='monospace')
    fig.savefig(out_path, format='pdf', bbox_inches='tight'); plt.close(fig)


## End-to-end demo (load a real JSON spec)
Place a JSON file (e.g., `examples/budget_spec.json`) next to the notebook, then run this cell.


In [None]:
# Demo: load spec from JSON and run the pipeline
SPEC_PATH = "examples/budget_spec.json"  # change if needed
INPUTS_PATH = "examples/budget_inputs.json"  # optional; demo inputs inline if missing

try:
    spec = load_spec(SPEC_PATH)
except FileNotFoundError:
    # Fallback: build a tiny spec inline
    spec = Spec(
        name="DemoSpec", version="0.2", variables=["x","y","cap"],
        constraints=[
            {"type":"bounds","var":"x","lower":0,"upper":10},
            {"type":"equality","expr":"Eq(x+y,cap)","solve_for":"y"}
        ],
        probes=[], alternates=[], units={}
    )
    SPEC_PATH = "demo_inline.json"

try:
    with open(INPUTS_PATH, "r") as f:
        inputs = json.load(f)
except FileNotFoundError:
    inputs = {"x": 12, "y": 3, "cap": 14}
    INPUTS_PATH = "demo_inputs_inline.json"

result = diagnose_and_repair(spec, inputs, spec_path=SPEC_PATH, inputs_path=INPUTS_PATH, verbose=True)
att = attest(spec.__dict__, result, spec_path=SPEC_PATH, inputs_path=INPUTS_PATH)

print("\n=== Markdown report ===\n")
md = render_markdown(SPEC_PATH, INPUTS_PATH, result, att, verbose=True)
print(md)

print("\n=== HMAC verification ===")
print("Verified:", verify_hmac(att))

if plt:
    lines = report_lines(SPEC_PATH, INPUTS_PATH, result, att)
    save_text_pdf(lines, "eqproof_report.pdf")
    print("PDF saved: eqproof_report.pdf")
else:
    print("matplotlib unavailable: PDF export skipped")


## Quick tests
Lightweight checks for core functions.


In [None]:
# Tests
def _assert_close(a, b, tol=1e-9):
    assert abs(a-b) <= tol, f"{a} !~= {b} (tol={tol})"

# Units
dim_m, fac_m = parse_unit("m"); _assert_close(fac_m, 1.0)
dim_ms, fac_ms = parse_unit("m/s"); assert dim_ms != dim_m

# Constraints
_assert_close(equality_residual("Eq(x+y, cap)", {"x":1,"y":2,"cap":3}), 0.0)
sol = equality_solve_for("Eq(x+y, cap)", "y", {"x":1,"cap":3}); _assert_close(sol, 2.0)

# Simplex
proj = project_simplex([0.2, 0.2, 0.8]); _assert_close(sum(proj), 1.0)

# Attestation
att_ = attest(spec.__dict__, result, spec_path=SPEC_PATH, inputs_path=INPUTS_PATH)
assert isinstance(att_.get("signature"), str)
print("All quick tests passed.")
