<a href="https://colab.research.google.com/github/Dey313/ResEthiq/blob/main/F_AI_(updated).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
!pip -q install pandas numpy scipy scikit-learn reportlab matplotlib

In [12]:
from google.colab import files
uploaded = files.upload()
csv_path = next(iter(uploaded.keys()))
csv_path


Saving 2025_06_09_morphometric data.xlsx to 2025_06_09_morphometric data (1).xlsx


'2025_06_09_morphometric data (1).xlsx'

In [13]:
import io, re, math, json, hashlib
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List, Tuple

import numpy as np
import pandas as pd
from scipy.stats import chisquare, ks_2samp, skew, kurtosis
from scipy.stats import entropy as scipy_entropy

from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.decomposition import TruncatedSVD
from sklearn.ensemble import IsolationForest
from sklearn.metrics.pairwise import cosine_similarity

from reportlab.lib.pagesizes import A4
from reportlab.lib.units import cm
from reportlab.lib import colors
from reportlab.pdfgen import canvas


# =========================================================
# 0) Robust CSV loader (fixes UnicodeDecodeError)
# =========================================================
def load_csv_robust(path: str, nrows: int | None = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    encodings = ["utf-8", "utf-8-sig", "cp1252", "iso-8859-1", "latin1", "utf-16", "utf-16le", "utf-16be"]
    errors_mode = ["strict", "replace"]  # strict first; replace as fallback
    delimiter_candidates = [",", ";", "\t", "|"]

    last_err = None

    for enc in encodings:
        for err in errors_mode:
            for sep in delimiter_candidates:
                try:
                    df = pd.read_csv(
                        path,
                        encoding=enc,
                        encoding_errors=err,  # pandas >= 2.0
                        sep=sep,
                        engine="python",      # tolerant parser
                        nrows=nrows
                    )
                    # sanity check: "real" CSV usually has >=2 columns
                    if df.shape[1] >= 2:
                        return df, {"encoding": enc, "encoding_errors": err, "sep": sep}
                except Exception as e:
                    last_err = e

    # If we got here, maybe it's not a CSV at all (e.g., Excel zip)
    with open(path, "rb") as f:
        sig = f.read(8)
    raise RuntimeError(
        f"Could not parse as CSV with common encodings/separators. "
        f"First 8 bytes: {sig}. Last error: {last_err}"
    )


# =========================================================
# 1) Config
# =========================================================
HEALTHCARE_KEYWORDS = [
    "patient", "mrn", "ehr", "icd", "diagnosis", "diag", "drug", "med", "dose",
    "lab", "test", "hba1c", "bp", "systolic", "diastolic", "spo2", "hr", "heart_rate",
    "temperature", "temp", "admission", "discharge", "encounter", "visit", "hospital", "clinic"
]
AI_KEYWORDS = [
    "prompt", "completion", "token", "logprob", "embedding", "model", "train", "eval",
    "benchmark", "label", "annotation", "dataset", "loss", "accuracy", "f1", "roc",
    "latency", "inference", "input", "output"
]

MAX_ROWS_FOR_HEAVY = 300_000
SAMPLE_ROWS = 120_000

THRESH = {
    "dup_row_pct_warn": 0.02,
    "dup_row_pct_fail": 0.10,
    "missing_overall_warn": 0.10,
    "missing_overall_fail": 0.30,
    "high_corr_warn": 10,
    "high_corr_fail": 30,
    "benford_p_warn": 0.05,
    "benford_p_fail": 0.01,
    "rounding_integer_warn": 0.85,
    "rounding_integer_fail": 0.95,
    "entropy_low_warn": 1.0,
    "entropy_low_fail": 0.6,
    "isoforest_outlier_warn": 0.08,
    "isoforest_outlier_fail": 0.15,
}


# =========================================================
# 2) Utilities
# =========================================================
def sha256_bytes(b: bytes) -> str:
    return hashlib.sha256(b).hexdigest()

def stable_dataset_fingerprint(df: pd.DataFrame) -> str:
    schema = "|".join([f"{c}:{str(t)}" for c, t in zip(df.columns, df.dtypes)])
    sample = df.head(500).to_csv(index=False).encode("utf-8", errors="ignore")
    return sha256_bytes(schema.encode("utf-8", errors="ignore") + b"\n" + sample)

def safe_sample(df: pd.DataFrame, max_rows: int = MAX_ROWS_FOR_HEAVY) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    meta = {"sampled": False, "original_rows": int(len(df)), "used_rows": int(len(df))}
    if len(df) > max_rows:
        df2 = df.sample(n=min(SAMPLE_ROWS, len(df)), random_state=42)
        meta["sampled"] = True
        meta["used_rows"] = int(len(df2))
        return df2, meta
    return df, meta

def to_numeric_clean(series: pd.Series) -> pd.Series:
    s = pd.to_numeric(series, errors="coerce")
    s = s.replace([np.inf, -np.inf], np.nan).dropna()
    return s

def top_keywords_score(df: pd.DataFrame, keywords: List[str], max_cells: int = 50_000) -> int:
    blob = " ".join([str(c).lower() for c in df.columns])
    obj_cols = df.select_dtypes(include=["object", "string"]).columns.tolist()
    taken = 0
    for c in obj_cols[:10]:
        col = df[c].astype(str).head(500)
        for v in col:
            blob += " " + str(v).lower()
            taken += 1
            if taken >= max_cells:
                break
        if taken >= max_cells:
            break
    score = 0
    for kw in keywords:
        if kw in blob:
            score += 1
    return score

def severity_from_threshold(value: float, warn: float, fail: float, higher_is_worse: bool = True) -> str:
    if higher_is_worse:
        if value >= fail: return "FAIL"
        if value >= warn: return "WARN"
        return "INFO"
    else:
        if value <= fail: return "FAIL"
        if value <= warn: return "WARN"
        return "INFO"


# =========================================================
# 3) Data structures
# =========================================================
@dataclass
class Finding:
    name: str
    severity: str
    metric: Dict[str, Any]
    rationale: str

@dataclass
class Invariant:
    name: str
    status: str  # PASS/FAIL/UNKNOWN
    details: Dict[str, Any]
    rationale: str

@dataclass
class DomainRouting:
    p_healthcare: float
    p_ai: float
    chosen: str
    evidence: Dict[str, Any]

@dataclass
class AISignals:
    applicable: bool
    outlier_rate: float
    centroid_cosine: float
    notes: List[str]
    details: Dict[str, Any]

@dataclass
class RealityStress:
    applicable: bool
    rri: float
    collapse_curve: List[Tuple[float, float]]
    assumption_load: List[str]
    notes: List[str]
    details: Dict[str, Any]


# =========================================================
# 4) Router (Healthcare vs AI company)
# =========================================================
def domain_router(df: pd.DataFrame) -> DomainRouting:
    hc = top_keywords_score(df, HEALTHCARE_KEYWORDS)
    ai = top_keywords_score(df, AI_KEYWORDS)

    cols = [str(c).lower() for c in df.columns]
    has_patient_id = any(re.search(r"\b(mrn|patient_id|patientid|uhid)\b", c) for c in cols)
    has_icd = any("icd" in c for c in cols)
    has_prompt = any("prompt" in c for c in cols)
    has_label = any(re.search(r"\b(label|target|class|y)\b", c) for c in cols)

    hc += 2 if has_patient_id else 0
    hc += 2 if has_icd else 0
    ai += 2 if has_prompt else 0
    ai += 1 if has_label else 0

    total = hc + ai + 2
    p_hc = (hc + 1) / total
    p_ai = (ai + 1) / total
    chosen = "Healthcare" if p_hc >= p_ai else "AI Company"

    return DomainRouting(
        p_healthcare=float(p_hc),
        p_ai=float(p_ai),
        chosen=chosen,
        evidence={
            "keyword_hits": {"healthcare": hc, "ai": ai},
            "structural_hints": {"has_patient_id": has_patient_id, "has_icd": has_icd, "has_prompt": has_prompt, "has_label": has_label},
        }
    )


# =========================================================
# 5) Forensic statistics core
# =========================================================
def duplicate_rows(df: pd.DataFrame) -> Finding:
    total = len(df)
    dup = int(df.duplicated().sum())
    rate = dup / total if total else 0.0
    sev = severity_from_threshold(rate, THRESH["dup_row_pct_warn"], THRESH["dup_row_pct_fail"], True)
    return Finding("Duplicate rows", sev, {"duplicate_rows": dup, "rows": int(total), "duplicate_rate": float(rate)},
                   "High duplication can indicate templating/copying, synthetic repetition, or pipeline errors.")

def missingness(df: pd.DataFrame) -> Finding:
    overall = float(df.isna().mean().mean())
    top = df.isna().mean().sort_values(ascending=False).head(12)
    sev = severity_from_threshold(overall, THRESH["missing_overall_warn"], THRESH["missing_overall_fail"], True)
    return Finding("Missingness profile", sev,
                   {"overall_missing_rate": overall, "top_missing": [(k, float(v)) for k, v in top.items()]},
                   "Excess missingness can hide manipulation or indicate broken collection pipelines.")

def benford_first_digit(series: pd.Series) -> Dict[str, Any]:
    s = to_numeric_clean(series)
    s = s[s != 0].abs()
    if len(s) < 250:
        return {"applicable": False, "reason": "Too few numeric values (n<250).", "n": int(len(s))}
    first = s.astype(str).str.replace(r"[^0-9.]", "", regex=True)
    first = first.str.lstrip("0").str.replace(".", "", regex=False)
    first = first[first.str.len() > 0].str[0]
    first = pd.to_numeric(first, errors="coerce").dropna().astype(int)
    first = first[first.between(1, 9)]
    if len(first) < 250:
        return {"applicable": False, "reason": "Too few usable first digits.", "n": int(len(first))}
    counts = np.array([(first == d).sum() for d in range(1, 10)], dtype=float)
    exp = np.array([math.log10(1 + 1/d) for d in range(1, 10)], dtype=float) * counts.sum()
    chi, p = chisquare(counts, f_exp=exp)
    return {"applicable": True, "n": int(counts.sum()), "chi2": float(chi), "p_value": float(p)}

def benford(df: pd.DataFrame) -> Finding:
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if not num_cols:
        return Finding("Benford test", "INFO", {"applicable": False, "reason": "No numeric columns."},
                       "Benford requires naturally occurring numbers; not applicable here.")
    col = num_cols[0]
    res = benford_first_digit(df[col])
    if not res.get("applicable"):
        return Finding(f"Benford test on '{col}'", "INFO", res, "Benford not applicable or insufficient data.")
    p = res["p_value"]
    sev = "FAIL" if p <= THRESH["benford_p_fail"] else ("WARN" if p <= THRESH["benford_p_warn"] else "INFO")
    return Finding(f"Benford test on '{col}'", sev, res,
                   "Low p suggests deviation (can be benign due to bounded scales; may also suggest fabrication).")

def rounding(series: pd.Series) -> Dict[str, Any]:
    s = to_numeric_clean(series)
    if len(s) < 200:
        return {"applicable": False, "n": int(len(s))}
    frac = np.abs(s - np.round(s))
    pct_int = float((frac < 1e-12).mean())
    pct_1dp = float((np.abs(s * 10 - np.round(s * 10)) < 1e-12).mean())
    pct_2dp = float((np.abs(s * 100 - np.round(s * 100)) < 1e-12).mean())
    return {"applicable": True, "n": int(len(s)), "pct_integer": pct_int, "pct_1dp": pct_1dp, "pct_2dp": pct_2dp}

def rounding_f(df: pd.DataFrame) -> Finding:
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if not num_cols:
        return Finding("Rounding artifacts", "INFO", {"applicable": False, "reason": "No numeric columns."},
                       "Rounding checks need numeric columns.")
    col = num_cols[0]
    res = rounding(df[col])
    if not res.get("applicable"):
        return Finding(f"Rounding artifacts on '{col}'", "INFO", res, "Not enough numeric values.")
    sev = "FAIL" if res["pct_integer"] >= THRESH["rounding_integer_fail"] else ("WARN" if res["pct_integer"] >= THRESH["rounding_integer_warn"] else "INFO")
    return Finding(f"Rounding artifacts on '{col}'", sev, res,
                   "Extremely high rounding may indicate templating/synthetic generation (or true instrument precision).")

def entropy_f(df: pd.DataFrame) -> Finding:
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if num_cols:
        col = num_cols[0]
        s = to_numeric_clean(df[col])
        if len(s) < 200:
            return Finding("Entropy profile", "INFO", {"applicable": False, "n": int(len(s))}, "Too few values.")
        bins = np.histogram_bin_edges(s, bins="auto")
        hist, _ = np.histogram(s, bins=bins)
        p = hist / max(hist.sum(), 1)
        h = float(scipy_entropy(p + 1e-12))
        sev = severity_from_threshold(h, THRESH["entropy_low_warn"], THRESH["entropy_low_fail"], higher_is_worse=False)
        return Finding(f"Entropy on '{col}' (binned)", sev, {"entropy": h, "bins": int(len(hist)), "n": int(len(s))},
                       "Very low entropy can indicate over-smoothing/mode collapse; interpret with context.")
    obj_cols = df.select_dtypes(include=["object", "string"]).columns.tolist()
    if obj_cols:
        col = obj_cols[0]
        s = df[col].dropna().astype(str)
        if len(s) < 200:
            return Finding("Entropy profile", "INFO", {"applicable": False, "n": int(len(s))}, "Too few values.")
        vc = s.value_counts().head(300)
        p = vc.values / vc.values.sum()
        h = float(scipy_entropy(p + 1e-12))
        return Finding(f"Entropy on '{col}' (categorical)", "INFO", {"entropy": h, "k": int(len(vc)), "n": int(len(s))},
                       "Categorical entropy is contextual.")
    return Finding("Entropy profile", "INFO", {"applicable": False}, "No usable columns for entropy.")

def corr_anoms(df: pd.DataFrame) -> Finding:
    num = df.select_dtypes(include=[np.number])
    if num.shape[1] < 3:
        return Finding("Correlation structure", "INFO", {"applicable": False, "reason": "Too few numeric cols."},
                       "Need >=3 numeric cols.")
    corr = num.corr().abs()
    upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
    high = upper.stack()
    high = high[high >= 0.98].sort_values(ascending=False)
    pairs = [(a, b, float(v)) for (a, b), v in high.head(30).items()]
    n = len(pairs)
    sev = "FAIL" if n >= THRESH["high_corr_fail"] else ("WARN" if n >= THRESH["high_corr_warn"] else "INFO")
    return Finding("Near-perfect correlations (>=0.98)", sev, {"count": n, "pairs_top": pairs[:15]},
                   "Excess near-perfect correlations can indicate leakage/derived cols or synthetic construction.")

def split_shift(df: pd.DataFrame) -> Finding:
    if len(df) < 600:
        return Finding("Split-half KS shift", "INFO", {"applicable": False, "reason": "Too few rows."},
                       "Need >=600 rows.")
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if len(num_cols) < 2:
        return Finding("Split-half KS shift", "INFO", {"applicable": False, "reason": "Too few numeric cols."},
                       "Need >=2 numeric cols.")
    mid = len(df)//2
    a, b = df.iloc[:mid], df.iloc[mid:]
    shifts=[]
    for c in num_cols[:20]:
        sa, sb = to_numeric_clean(a[c]), to_numeric_clean(b[c])
        if len(sa) < 150 or len(sb) < 150:
            continue
        stat, p = ks_2samp(sa.values, sb.values)
        shifts.append((c, float(stat), float(p)))
    low_p = [x for x in shifts if x[2] < 0.01]
    rate = len(low_p)/max(1, len(shifts))
    sev = "FAIL" if rate >= 0.50 else ("WARN" if rate >= 0.25 else "INFO")
    return Finding("Split-half distribution shift (KS)", sev,
                   {"tested_cols": len(shifts), "low_p_cols": len(low_p), "rate": float(rate), "examples": low_p[:10]},
                   "Widespread shifts can indicate stitching/backfill/regime changes.")

def moments(df: pd.DataFrame) -> Finding:
    num = df.select_dtypes(include=[np.number])
    if num.shape[1] < 2:
        return Finding("Moment fingerprints", "INFO", {"applicable": False, "reason": "Too few numeric cols."},
                       "Need >=2 numeric cols.")
    stats=[]
    for c in num.columns[:25]:
        s = to_numeric_clean(num[c])
        if len(s) < 200:
            continue
        stats.append((c, float(skew(s)), float(kurtosis(s, fisher=True))))
    if not stats:
        return Finding("Moment fingerprints", "INFO", {"applicable": False, "reason": "Not enough data per col."},
                       "Not enough data.")
    near0 = sum(1 for _,_,k in stats if abs(k) < 0.25)
    rate = near0/len(stats)
    sev = "WARN" if rate > 0.65 else "INFO"
    return Finding("Moment fingerprints (skew/kurtosis)", sev,
                   {"cols": len(stats), "near_zero_kurtosis_rate": float(rate), "examples": stats[:10]},
                   "Many near-zero kurtosis columns can indicate over-smoothing; interpret with context.")

def forensic_core(df: pd.DataFrame) -> List[Finding]:
    return [duplicate_rows(df), missingness(df), benford(df), rounding_f(df), entropy_f(df),
            corr_anoms(df), split_shift(df), moments(df)]


# =========================================================
# 6) Formal invariants
# =========================================================
def inv_unique_columns(df: pd.DataFrame) -> Invariant:
    ok = len(set(df.columns)) == len(df.columns)
    return Invariant("Schema: unique column names", "PASS" if ok else "FAIL",
                     {"unique": ok}, "Duplicate columns cause ambiguity and can mask tampering.")

def inv_parseable_time(df: pd.DataFrame) -> Invariant:
    time_cols = [c for c in df.columns if any(k in str(c).lower() for k in ("time","timestamp","date"))]
    if not time_cols:
        return Invariant("Temporal: timestamp parseability", "UNKNOWN",
                         {"reason":"No timestamp-like columns"}, "Needs a timestamp column.")
    c = time_cols[0]
    parsed = pd.to_datetime(df[c], errors="coerce", utc=True)
    rate = float(parsed.notna().mean())
    status = "PASS" if rate >= 0.90 else ("FAIL" if rate < 0.75 else "UNKNOWN")
    return Invariant(f"Temporal: '{c}' parseability", status, {"parseable_rate": rate, "column": c},
                     "Low parseability indicates inconsistent timestamps or schema misuse.")

def inv_id_dup(df: pd.DataFrame) -> Invariant:
    id_cols = [c for c in df.columns if re.search(r"\b(id|uuid|mrn|patient_id|record_id)\b", str(c).lower())]
    if not id_cols:
        return Invariant("Identity: ID duplication sanity", "UNKNOWN",
                         {"reason":"No ID-like columns"}, "Needs an ID column.")
    c = id_cols[0]
    s = df[c].dropna().astype(str)
    if len(s) < 100:
        return Invariant(f"Identity: '{c}' duplication sanity", "UNKNOWN",
                         {"reason":"Too few IDs", "column": c}, "Not enough IDs.")
    dup = float(s.duplicated().mean())
    status = "PASS" if dup <= 0.05 else ("FAIL" if dup >= 0.20 else "UNKNOWN")
    return Invariant(f"Identity: '{c}' duplication rate", status, {"dup_rate": dup, "column": c},
                     "Very high duplication suggests broken entity identity or fabricated records.")

def inv_time_order(df: pd.DataFrame) -> Invariant:
    lower = {str(c).lower(): c for c in df.columns}
    start_keys = ["start","admission","admit","begin"]
    end_keys = ["end","discharge","release","finish"]
    start_col = next((lower[k] for k in lower if any(s in k for s in start_keys)), None)
    end_col = next((lower[k] for k in lower if any(e in k for e in end_keys)), None)
    if not start_col or not end_col:
        return Invariant("Temporal: no time-travel ordering", "UNKNOWN",
                         {"reason":"No obvious start/end pair"}, "Needs start/end timestamps.")
    a = pd.to_datetime(df[start_col], errors="coerce", utc=True)
    b = pd.to_datetime(df[end_col], errors="coerce", utc=True)
    m = a.notna() & b.notna()
    if m.mean() < 0.30:
        return Invariant("Temporal: no time-travel ordering", "UNKNOWN",
                         {"reason":"Too few paired timestamps"}, "Not enough pairs.")
    violations = int((b[m] < a[m]).sum())
    rate = float(violations / max(1, int(m.sum())))
    status = "PASS" if rate == 0.0 else ("FAIL" if rate > 0.02 else "UNKNOWN")
    return Invariant(f"Temporal ordering: '{end_col}' >= '{start_col}'", status,
                     {"violations": violations, "paired": int(m.sum()), "violation_rate": rate},
                     "End-before-start violations can indicate tampering or merge errors.")

def invariants(df: pd.DataFrame) -> List[Invariant]:
    return [inv_unique_columns(df), inv_parseable_time(df), inv_id_dup(df), inv_time_order(df)]


# =========================================================
# 7) AI integrity engine (embedding + anomaly + disagreement)
# =========================================================
def build_embedding_pipe(df: pd.DataFrame) -> Pipeline:
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in df.columns if c not in num_cols]
    pre = ColumnTransformer(
        transformers=[
            ("num", Pipeline([("scaler", StandardScaler(with_mean=False))]), num_cols),
            ("cat", OneHotEncoder(handle_unknown="ignore", min_frequency=5), cat_cols),
        ],
        remainder="drop",
        sparse_threshold=0.3,
    )
    return Pipeline([("pre", pre), ("svd", TruncatedSVD(n_components=32, random_state=42))])

def ai_engine(df: pd.DataFrame) -> AISignals:
    if len(df) < 800:
        return AISignals(False, 0.0, 1.0, ["Not enough rows for AI model (n<800)."], {})
    try:
        dfx = df.copy()
        if len(dfx) > 60_000:
            dfx = dfx.sample(n=60_000, random_state=42)
        pipe = build_embedding_pipe(dfx)
        Z = pipe.fit_transform(dfx)

        iso = IsolationForest(n_estimators=200, contamination="auto", random_state=42, n_jobs=-1)
        pred = iso.fit_predict(Z)
        out_rate = float((pred == -1).mean())

        idx = np.arange(Z.shape[0])
        np.random.shuffle(idx)
        a = Z[idx[:len(idx)//2]]
        b = Z[idx[len(idx)//2:]]
        ca = a.mean(axis=0, keepdims=True)
        cb = b.mean(axis=0, keepdims=True)
        cos = float(cosine_similarity(ca, cb)[0,0])

        notes=[]
        sev_out = severity_from_threshold(out_rate, THRESH["isoforest_outlier_warn"], THRESH["isoforest_outlier_fail"], True)
        if sev_out != "INFO":
            notes.append(f"Outlier rate elevated: {out_rate:.2%} (IsolationForest in latent space).")
        if cos < 0.985:
            notes.append(f"Centroid disagreement across splits: cosine={cos:.4f} (mixing/instability).")

        return AISignals(True, out_rate, cos, notes, {"used_rows": int(len(dfx)), "latent_dim": int(Z.shape[1])})
    except Exception as e:
        return AISignals(False, 0.0, 1.0, [f"AI engine error: {e}"], {})


# =========================================================
# 8) Industry packs
# =========================================================
def healthcare_pack(df: pd.DataFrame) -> List[Finding]:
    cols = {str(c).lower(): c for c in df.columns}
    out=[]
    def bounds(key, lo, hi, label):
        if key in cols:
            c = cols[key]
            s = to_numeric_clean(df[c])
            if len(s) < 200: return
            bad = int(((s < lo) | (s > hi)).sum())
            rate = bad / max(1, len(s))
            sev = "FAIL" if rate > 0.05 else ("WARN" if rate > 0.01 else "INFO")
            out.append(Finding(f"Healthcare bounds: {c}", sev,
                              {"column": c, "bounds":[lo,hi], "violations": bad, "n": int(len(s)), "violation_rate": float(rate)},
                              f"{label} plausibility bounds. Violations can indicate unit errors/corruption/fabrication."))
    bounds("age", 0, 120, "Age")
    bounds("spo2", 50, 100, "SpO₂")
    bounds("hr", 20, 250, "Heart rate")
    bounds("heart_rate", 20, 250, "Heart rate")
    bounds("systolic", 50, 250, "Systolic BP")
    bounds("diastolic", 20, 150, "Diastolic BP")
    bounds("temp", 30, 45, "Temperature")
    bounds("temperature", 30, 45, "Temperature")
    return out

def ai_pack(df: pd.DataFrame) -> List[Finding]:
    out=[]
    label_cols = [c for c in df.columns if str(c).lower() in ("label","target","y","class")]
    if label_cols:
        c = label_cols[0]
        vc = df[c].value_counts(dropna=False)
        total = int(vc.sum())
        top_pct = float(vc.iloc[0]/total) if total else 0.0
        sev = "WARN" if top_pct > 0.90 else "INFO"
        out.append(Finding(f"AI pack: label imbalance '{c}'", sev,
                           {"column": c, "top_class_pct": top_pct, "top_classes": vc.head(10).to_dict()},
                           "Extreme imbalance may indicate bias, leakage, or synthetic oversampling."))
    text_cols = [c for c in df.columns if any(k in str(c).lower() for k in ("text","prompt","query","content"))]
    if text_cols:
        c = text_cols[0]
        s = df[c].dropna().astype(str)
        if len(s)>0:
            dup = int(s.duplicated().sum())
            rate = float(dup/len(s))
            sev = "FAIL" if rate > 0.25 else ("WARN" if rate > 0.08 else "INFO")
            out.append(Finding(f"AI pack: text duplication '{c}'", sev,
                               {"column": c, "duplicate_count": dup, "n": int(len(s)), "duplicate_rate": rate},
                               "High duplication suggests templating/scraping artifacts or synthetic regeneration."))
    return out


# =========================================================
# 9) Reality Stress Testing (collapse curve + RRI)
# =========================================================
def reality_stress(df: pd.DataFrame) -> RealityStress:
    num = df.select_dtypes(include=[np.number])
    if num.shape[1] < 3 or len(df) < 1500:
        return RealityStress(False, 0.70, [], ["Insufficient numeric structure for v1 stress testing."],
                            ["RST not applicable; using default RRI=0.70."], {})

    dfx = df
    if len(dfx) > 80_000:
        dfx = dfx.sample(n=80_000, random_state=42)
    num = dfx.select_dtypes(include=[np.number]).copy()
    cols = list(num.columns[:12])
    base = num[cols]

    def ks_fail_rate(test_df: pd.DataFrame, p_thresh: float = 0.01) -> float:
        fails=0; tested=0
        for c in cols:
            a = to_numeric_clean(base[c])
            b = to_numeric_clean(test_df[c])
            if len(a)<500 or len(b)<500: continue
            _, p = ks_2samp(a.values, b.values)
            tested += 1
            if p < p_thresh: fails += 1
        return fails / max(1, tested)

    curve=[]
    for s in [0.0, 0.1, 0.2, 0.35, 0.5]:
        test = num[cols].copy()
        if s>0:
            frac = max(0.5, 1.0 - s)
            test = test.sample(frac=frac, random_state=int(s*1000))
            for c in cols:
                col = to_numeric_clean(test[c])
                if len(col) < 500: continue
                sd = float(np.nanstd(col.values))
                if sd <= 0: continue
                noise = np.random.normal(0.0, sd*(s*0.15), size=len(test))
                test.loc[test.index, c] = pd.to_numeric(test[c], errors="coerce").values + noise
        fr = ks_fail_rate(test, 0.01)
        curve.append((float(s), float(fr)))

    xs = np.array([x for x,_ in curve]); ys = np.array([y for _,y in curve])
    auc = float(np.trapz(ys, xs) / max(1e-9, xs.max()))
    rri = float(np.clip(1.0 - auc, 0.0, 1.0))

    notes = ["Reality robustness high under v1 stress tests." if rri>=0.75 else
             "Reality robustness moderate; review collapse modes." if rri>=0.60 else
             "Reality robustness low; fingerprints collapse quickly under perturbation."]

    return RealityStress(True, rri, curve,
                        ["Distribution stability under subsampling", "Noise-floor realism under perturbation", "Multi-column coherence under stress (KS)"],
                        notes, {"cols_used": cols, "used_rows": int(len(dfx))})


# =========================================================
# 10) Scoring + verdict
# =========================================================
def score_findings(fs: List[Finding]) -> float:
    score=100.0
    for f in fs:
        if f.severity=="WARN": score -= 8
        elif f.severity=="FAIL": score -= 18
    return float(np.clip(score,0,100))

def score_invariants(inv: List[Invariant]) -> float:
    score=100.0
    for x in inv:
        if x.status=="FAIL": score -= 20
        elif x.status=="UNKNOWN": score -= 6
    return float(np.clip(score,0,100))

def score_ai(ai: AISignals) -> float:
    if not ai.applicable:
        return 75.0
    sc = 100.0
    sc -= 60.0 * float(np.clip(ai.outlier_rate, 0, 0.25))
    if ai.centroid_cosine < 0.985:
        sc -= 12.0
    return float(np.clip(sc,0,100))

def verdict(scores: Dict[str,float]) -> str:
    if scores["statistical_integrity"] < 60 or scores["invariant_integrity"] < 60 or scores["reality_robustness"] < 55:
        return "COMPROMISED / DO NOT USE (high risk)"
    if scores["statistical_integrity"] < 75 or scores["invariant_integrity"] < 75 or scores["reality_robustness"] < 70:
        return "CONDITIONAL (use only with controls + provenance)"
    return "ACCEPTABLE (MVP checks passed; enable provenance for high-stakes use)"

def recommendations(scores: Dict[str,float], routing: DomainRouting) -> List[str]:
    rec=[]
    if scores["invariant_integrity"] < 80: rec.append("Fix schema/ID/timestamp invariant issues before relying on this dataset.")
    if scores["statistical_integrity"] < 80: rec.append("Investigate forensic flags (missingness/duplication/Benford/rounding/correlation) and rerun.")
    if scores["reality_robustness"] < 75: rec.append("Dataset is brittle under stress tests; request collection/process documentation and run deeper checks.")
    rec.append("Next step: enable Proof-of-Provenance (signed receipts + version diffs) to prove what existed when and detect tampering.")
    rec.append("Healthcare: validate units and enforce physiologic constraints upstream." if routing.chosen=="Healthcare"
               else "AI companies: lock eval sets by hash and run leakage/duplication controls.")
    return rec


# =========================================================
# 11) PDF report
# =========================================================
def generate_pdf(report: Dict[str, Any], out_path: str = "resethiq_integrity_report.pdf") -> str:
    c = canvas.Canvas(out_path, pagesize=A4)
    w,h = A4

    def header():
        c.setFont("Helvetica-Bold", 16)
        c.setFillColor(colors.black)
        c.drawString(2*cm, h-2*cm, "Resethiq Integrity Report (Colab MVP)")
        c.setFont("Helvetica", 9)
        c.setFillColor(colors.grey)
        meta = report["meta"]
        c.drawString(2*cm, h-2.6*cm,
                     f"Generated: {meta['generated_at']} | Fingerprint: {meta['fingerprint'][:16]}… | Rows used: {meta['used_rows']}/{meta['rows']}")
        c.setStrokeColor(colors.lightgrey)
        c.line(2*cm, h-2.85*cm, w-2*cm, h-2.85*cm)

    def ensure(y, need=2*cm):
        if y < need:
            c.showPage()
            header()
            return h - 3.3*cm
        return y

    def section(y, title, lines):
        y = ensure(y)
        c.setFont("Helvetica-Bold", 12)
        c.setFillColor(colors.black)
        c.drawString(2*cm, y, title)
        y -= 0.6*cm
        c.setFont("Helvetica", 10)
        for ln in lines:
            y = ensure(y)
            c.drawString(2.2*cm, y, str(ln)[:115])
            y -= 0.45*cm
        return y - 0.3*cm

    header()
    y = h - 3.5*cm

    scores = report["scores"]
    routing = report["routing"]
    y = section(y, "1) Executive Summary", [
        f"Industry routing: {routing['chosen']} (p_healthcare={routing['p_healthcare']:.2f}, p_ai={routing['p_ai']:.2f})",
        f"Verdict: {report['verdict']}",
        f"Statistical integrity: {scores['statistical_integrity']:.1f}/100",
        f"Invariant integrity: {scores['invariant_integrity']:.1f}/100",
        f"AI integrity: {scores['ai_integrity']:.1f}/100",
        f"Reality robustness: {scores['reality_robustness']:.1f}/100",
        f"CSV parse: {report.get('read_meta',{})}",
    ])

    y = section(y, "2) Forensic Findings (Core)", [
        f"[{f['severity']}] {f['name']} | {json.dumps(f['metric'])[:90]}..." for f in report["forensic_findings"][:24]
    ] or ["No forensic findings."])

    y = section(y, "3) Formal Invariants", [
        f"[{i['status']}] {i['name']} | {json.dumps(i['details'])[:90]}..." for i in report["invariants"][:24]
    ] or ["No invariants."])

    ai = report["ai_signals"]
    ai_lines = [
        f"Applicable: {ai['applicable']}",
        f"Outlier rate: {ai.get('outlier_rate',0):.2%}",
        f"Centroid cosine: {ai.get('centroid_cosine',1.0):.4f}",
    ] + [f"- {n}" for n in ai.get("notes",[])[:10]]
    y = section(y, "4) AI Integrity Signals", ai_lines)

    rst = report["reality"]
    rst_lines = [f"Applicable: {rst['applicable']}", f"RRI: {rst['rri']:.3f}"]
    if rst.get("collapse_curve"):
        rst_lines.append("Collapse curve (stress -> KS fail-rate):")
        rst_lines += [f"  • stress={x:.2f} -> fail_rate={fr:.2f}" for x,fr in rst["collapse_curve"]]
    rst_lines += [f"- {n}" for n in rst.get("notes",[])[:8]]
    y = section(y, "5) Reality Stress Testing (CDRM-lite)", rst_lines)

    y = section(y, "6) Industry Pack Findings", [
        f"[{f['severity']}] {f['name']} | {json.dumps(f['metric'])[:90]}..." for f in report["industry_pack_findings"][:18]
    ] or ["No industry pack findings."])

    y = section(y, "7) Recommendations", [f"- {r}" for r in report["recommendations"][:14]])

    c.showPage()
    c.save()
    return out_path


# =========================================================
# 12) Orchestrator (read -> run -> report)
# =========================================================
def run_resethiq_mvp_from_path(path: str) -> Dict[str, Any]:
    df_raw, read_meta = load_csv_robust(path)
    df, samp_meta = safe_sample(df_raw)

    meta = {
        "generated_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC"),
        "fingerprint": stable_dataset_fingerprint(df),
        "rows": int(len(df_raw)),
        "cols": int(df_raw.shape[1]),
        "used_rows": int(len(df)),
        "sampled": bool(samp_meta["sampled"]),
    }

    routing = domain_router(df)
    core = forensic_core(df)
    invs = invariants(df)
    ai = ai_engine(df)
    rst = reality_stress(df)

    pack_findings = healthcare_pack(df) if routing.chosen == "Healthcare" else ai_pack(df)

    scores = {
        "statistical_integrity": score_findings(core + pack_findings),
        "invariant_integrity": score_invariants(invs),
        "ai_integrity": score_ai(ai),
        "reality_robustness": float(np.clip(100.0 * rst.rri, 0, 100)),
    }

    rep = {
        "meta": meta,
        "read_meta": read_meta,
        "routing": asdict(routing),
        "forensic_findings": [asdict(x) for x in core],
        "invariants": [asdict(x) for x in invs],
        "ai_signals": asdict(ai),
        "reality": asdict(rst),
        "industry_pack_findings": [asdict(x) for x in pack_findings],
        "scores": scores,
    }

    rep["verdict"] = verdict(scores)
    rep["recommendations"] = recommendations(scores, routing)
    return rep


In [14]:
report = run_resethiq_mvp_from_path(csv_path)
report["verdict"], report["scores"], report["read_meta"]


  "generated_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC"),


('ACCEPTABLE (MVP checks passed; enable provenance for high-stakes use)',
 {'statistical_integrity': 100.0,
  'invariant_integrity': 82.0,
  'ai_integrity': 75.0,
  'reality_robustness': 70.0},
 {'encoding': 'utf-16le', 'encoding_errors': 'replace', 'sep': ','})

In [15]:
pdf_path = generate_pdf(report, out_path="resethiq_integrity_report.pdf")
pdf_path


'resethiq_integrity_report.pdf'

In [16]:
from google.colab import files
files.download(pdf_path)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [17]:
print("=== ROUTING ===")
print(report["routing"])
print("\n=== VERDICT ===")
print(report["verdict"])
print("\n=== SCORES ===")
print(report["scores"])

print("\n=== FORENSIC FLAGS (WARN/FAIL) ===")
for f in report["forensic_findings"]:
    if f["severity"] in ("WARN", "FAIL"):
        print(f"[{f['severity']}] {f['name']} -> {f['metric']}")

print("\n=== INDUSTRY PACK FLAGS (WARN/FAIL) ===")
for f in report["industry_pack_findings"]:
    if f["severity"] in ("WARN", "FAIL"):
        print(f"[{f['severity']}] {f['name']} -> {f['metric']}")

print("\n=== INVARIANTS (FAIL/UNKNOWN) ===")
for inv in report["invariants"]:
    if inv["status"] in ("FAIL", "UNKNOWN"):
        print(f"[{inv['status']}] {inv['name']} -> {inv['details']}")


=== ROUTING ===
{'p_healthcare': 0.5, 'p_ai': 0.5, 'chosen': 'Healthcare', 'evidence': {'keyword_hits': {'healthcare': 0, 'ai': 0}, 'structural_hints': {'has_patient_id': False, 'has_icd': False, 'has_prompt': False, 'has_label': False}}}

=== VERDICT ===
ACCEPTABLE (MVP checks passed; enable provenance for high-stakes use)

=== SCORES ===
{'statistical_integrity': 100.0, 'invariant_integrity': 82.0, 'ai_integrity': 75.0, 'reality_robustness': 70.0}

=== FORENSIC FLAGS (WARN/FAIL) ===

=== INDUSTRY PACK FLAGS (WARN/FAIL) ===

=== INVARIANTS (FAIL/UNKNOWN) ===
[UNKNOWN] Temporal: timestamp parseability -> {'reason': 'No timestamp-like columns'}
[UNKNOWN] Identity: ID duplication sanity -> {'reason': 'No ID-like columns'}
[UNKNOWN] Temporal: no time-travel ordering -> {'reason': 'No obvious start/end pair'}


In [18]:
from google.colab import files
files.download(pdf_path)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>