In [2]:
## Updated Python module (OGTT removed, HOMA-IR Path 1 added)


from __future__ import annotations

from dataclasses import dataclass
from math import log
from typing import Dict, Literal, Optional

import numpy as np
import pandas as pd
from scipy.stats import norm


CACMethod = Literal["ln", "piecewise"]


def clamp_0_100(x: float | np.ndarray) -> float | np.ndarray:
    return np.clip(x, 0.0, 100.0)


def norm_score(x: float, mean: float, sd: float, direction: int) -> float:
    """
    Stata-compatible: 100*normal(direction * ((x - mean)/sd))
    """
    z = direction * ((x - mean) / sd)
    return float(clamp_0_100(100.0 * norm.cdf(z)))


def homa_ir(fasting_glucose_mgdl: float, fasting_insulin_uIUml: float) -> float:
    """
    HOMA-IR = (fasting_insulin_uIUml * fasting_glucose_mgdl) / 405
    """
    return (fasting_insulin_uIUml * fasting_glucose_mgdl) / 405.0


# -----------------------------
# Path 1: HOMA-IR piecewise map
# -----------------------------
HOMA_X = np.array([0.5, 1.0, 1.5, 2.0, 3.0, 5.0, 8.0], dtype=float)
HOMA_Y = np.array([98,  95,  89,  75,  50,  20,  5], dtype=float)

def homa_score(homa_value: float) -> float:
    """
    Path 1 spec:
      - clamp to endpoints
      - linear interpolation between anchors
      - final clamp to [0,100]
    """
    if homa_value <= HOMA_X[0]:
        return float(HOMA_Y[0])
    if homa_value >= HOMA_X[-1]:
        return float(HOMA_Y[-1])

    score = float(np.interp(homa_value, HOMA_X, HOMA_Y))
    return float(np.clip(score, 0.0, 100.0))


def cac_score(cac: float, method: CACMethod) -> float:
    """
    Mirrors your Stata lq_score CAC switch.
    """
    if method == "ln":
        # Stata: 100*normal( -(ln(cac+1) - ln(100)) )
        val = 100.0 * norm.cdf(-(log(cac + 1.0) - log(100.0)))
        return float(clamp_0_100(val))

    if method == "piecewise":
        # Stata piecewise 0–400
        if cac == 0:
            val = 100.0
        elif cac >= 400:
            val = 0.0
        elif cac <= 100:
            val = 100.0 - 0.2 * cac
        else:
            val = 50.0 - 0.1 * (cac - 100.0)
        return float(clamp_0_100(min(100.0, val)))

    raise ValueError("cac method must be 'ln' or 'piecewise'")


@dataclass(frozen=True)
class Param:
    mean: float
    sd: float
    direction: int  # +1 higher-better, -1 lower-better


@dataclass(frozen=True)
class LongevityScoreParams:
    """
    Parameters for z->CDF normalized variables.
    HOMA-IR is intentionally excluded from this class because we are using Path 1 (piecewise).
    """

    apob: Param = Param(mean=90.0, sd=25.0, direction=-1)
    vo2max: Param = Param(mean=36.0, sd=8.0, direction=+1)
    crp: Param = Param(mean=1.5, sd=0.8, direction=-1)
    bmi: Param = Param(mean=26.0, sd=5.0, direction=-1)
    packyrs: Param = Param(mean=2.0, sd=5.0, direction=-1)
    moca: Param = Param(mean=27.0, sd=2.0, direction=+1)
    mvpa: Param = Param(mean=150.0, sd=75.0, direction=+1)

    hrv: Param = Param(mean=35.0, sd=15.0, direction=+1)
    phq9: Param = Param(mean=4.0, sd=4.0, direction=-1)
    alt: Param = Param(mean=25.0, sd=12.0, direction=-1)
    egfr: Param = Param(mean=95.0, sd=15.0, direction=+1)
    bmd_t: Param = Param(mean=-0.5, sd=1.0, direction=+1)
    truage_delta: Param = Param(mean=2.0, sd=5.0, direction=-1)
    small_hdl: Param = Param(mean=25.0, sd=5.0, direction=+1)
    rem_pct: Param = Param(mean=20.0, sd=5.0, direction=+1)
    grip: Param = Param(mean=38.0, sd=10.0, direction=+1)
    swls: Param = Param(mean=24.0, sd=6.0, direction=+1)

    # rPDQS is explicit scaling (not z-based)
    rpdqs_max: float = 52.0

    # composite → 300–850
    lq_base: float = 300.0
    lq_slope: float = 5.5


def score_patient(
    x: Dict[str, float],
    params: LongevityScoreParams = LongevityScoreParams(),
    cac_method: CACMethod = "ln",
    remminutes: Optional[float] = None,
    tst: Optional[float] = None,
) -> Dict[str, float]:
    """
    Deterministic patient-level scoring (OGTT removed; HOMA-IR added via Path 1).

    Required keys in x:
      - Either: homa_ir
        OR: fasting_glucose_mgdl + fasting_insulin_uIUml
      - apob, vo2max, crp, bmi, packyrs, moca, mvpa,
        cac, hrv, phq9, alt, egfr, bmd_t, truage_delta, small_hdl,
        grip, swls, rpdqs,
      - plus: rem_pct OR provide remminutes+tst arguments
    """

    # --- REM % logic (matches Stata approach) ---
    rem_pct = x.get("rem_pct", None)
    if rem_pct is None:
        if remminutes is not None and tst is not None and tst > 0:
            rem_pct = 100.0 * remminutes / tst
        else:
            raise KeyError("Provide rem_pct in x, or pass remminutes and tst")

    # --- HOMA-IR logic ---
    homa_val = x.get("homa_ir", None)
    if homa_val is None:
        fg = x.get("fasting_glucose_mgdl", None)
        fi = x.get("fasting_insulin_uIUml", None)
        if fg is None or fi is None:
            raise KeyError("Provide homa_ir OR fasting_glucose_mgdl + fasting_insulin_uIUml")
        homa_val = homa_ir(fg, fi)

    subscores: Dict[str, float] = {}

    # Path 1 HOMA score (piecewise-linear)
    subscores["N_homa_ir"] = homa_score(float(homa_val))

    # z->CDF normalized variables
    subscores["N_apob"] = norm_score(x["apob"], params.apob.mean, params.apob.sd, params.apob.direction)
    subscores["N_vo2"] = norm_score(x["vo2max"], params.vo2max.mean, params.vo2max.sd, params.vo2max.direction)
    subscores["N_crp"] = norm_score(x["crp"], params.crp.mean, params.crp.sd, params.crp.direction)
    subscores["N_bmi"] = norm_score(x["bmi"], params.bmi.mean, params.bmi.sd, params.bmi.direction)
    subscores["N_packyrs"] = norm_score(x["packyrs"], params.packyrs.mean, params.packyrs.sd, params.packyrs.direction)
    subscores["N_moca"] = norm_score(x["moca"], params.moca.mean, params.moca.sd, params.moca.direction)
    subscores["N_mvpa"] = norm_score(x["mvpa"], params.mvpa.mean, params.mvpa.sd, params.mvpa.direction)

    subscores["N_cac"] = cac_score(x["cac"], method=cac_method)

    subscores["N_hrv"] = norm_score(x["hrv"], params.hrv.mean, params.hrv.sd, params.hrv.direction)
    subscores["N_phq9"] = norm_score(x["phq9"], params.phq9.mean, params.phq9.sd, params.phq9.direction)
    subscores["N_alt"] = norm_score(x["alt"], params.alt.mean, params.alt.sd, params.alt.direction)
    subscores["N_egfr"] = norm_score(x["egfr"], params.egfr.mean, params.egfr.sd, params.egfr.direction)
    subscores["N_bmd_t"] = norm_score(x["bmd_t"], params.bmd_t.mean, params.bmd_t.sd, params.bmd_t.direction)
    subscores["N_truage_delta"] = norm_score(x["truage_delta"], params.truage_delta.mean, params.truage_delta.sd, params.truage_delta.direction)
    subscores["N_small_hdl"] = norm_score(x["small_hdl"], params.small_hdl.mean, params.small_hdl.sd, params.small_hdl.direction)
    subscores["N_rem_pct"] = norm_score(rem_pct, params.rem_pct.mean, params.rem_pct.sd, params.rem_pct.direction)
    subscores["N_grip"] = norm_score(x["grip"], params.grip.mean, params.grip.sd, params.grip.direction)
    subscores["N_swls"] = norm_score(x["swls"], params.swls.mean, params.swls.sd, params.swls.direction)

    # rPDQS explicit linear scaling
    subscores["N_rpdqs"] = float(clamp_0_100((x["rpdqs"] / params.rpdqs_max) * 100.0))

    # Equal-weight composite of 20 components (OGTT removed, HOMA included)
    component_keys = [
        "N_homa_ir", "N_apob", "N_vo2", "N_crp", "N_bmi",
        "N_packyrs", "N_moca", "N_mvpa", "N_cac", "N_hrv",
        "N_phq9", "N_alt", "N_egfr", "N_bmd_t", "N_truage_delta",
        "N_small_hdl", "N_rem_pct", "N_grip", "N_swls", "N_rpdqs",
    ]

    composite = float(np.mean([subscores[k] for k in component_keys]))
    composite = float(clamp_0_100(composite))

    LQ = params.lq_base + params.lq_slope * composite
    LQ = float(np.clip(LQ, 300.0, 850.0))

    out = {"composite_0_100": composite, "LQ": LQ}
    out.update(subscores)
    return out


def score_dataframe(df: pd.DataFrame, params: LongevityScoreParams = LongevityScoreParams(), cac_method: CACMethod = "ln") -> pd.DataFrame:
    """
    Convenience wrapper for pandas usage.
    Expects columns matching the keys used in score_patient.
    """
    rows = []
    for _, r in df.iterrows():
        rows.append(score_patient(r.to_dict(), params=params, cac_method=cac_method))
    return pd.DataFrame(rows)


In [7]:
# validation in a simple case (healthy)

patient = {
    "homa_ir": 1.5,          # or provide fasting glucose + insulin instead
    "apob": 60,
    "vo2max": 55,
    "crp": 0.4,
    "bmi": 22.5,
    "packyrs": 0,
    "moca": 29,
    "mvpa": 300,
    "cac": 0,
    "hrv": 75,
    "phq9": 0,
    "alt": 18,
    "egfr": 110,
    "bmd_t": 1.0,
    "truage_delta": -5,
    "small_hdl": 40,
    "rem_pct": 28.57142857,
    "grip": 55,
    "swls": 33,
    "rpdqs": 48,
}

out = score_patient(patient, cac_method="ln")
out

for k in sorted(out.keys()):
    print(f"{k:15s} {out[k]:.4f}")

# here we are testing if the newly created homa_ir interpolation is correct. it should be 89.0

LQ              793.1360
N_alt           72.0166
N_apob          88.4930
N_bmd_t         93.3193
N_bmi           75.8036
N_cac           99.9998
N_crp           91.5434
N_egfr          84.1345
N_grip          95.5435
N_homa_ir       89.0000
N_hrv           99.6170
N_moca          84.1345
N_mvpa          97.7250
N_packyrs       65.5422
N_phq9          84.1345
N_rem_pct       95.6762
N_rpdqs         92.3077
N_small_hdl     99.8650
N_swls          93.3193
N_truage_delta  91.9243
N_vo2           99.1226
composite_0_100 89.6611


In [9]:
# test out if CAC switch works in sample healthy patient

out_ln = score_patient(patient, cac_method="ln")
out_pw = score_patient(patient, cac_method="piecewise")

print("N_cac ln:      ", out_ln["N_cac"])
print("N_cac piecewise", out_pw["N_cac"])
print("LQ ln:         ", out_ln["LQ"])
print("LQ piecewise:  ", out_pw["LQ"])


N_cac ln:       99.9997939356604
N_cac piecewise 100.0
LQ ln:          793.1359892499545
LQ piecewise:   793.1360459176478


In [10]:
# sanity check - make one "bad" patient and confirm monotonicity

patient_bad = patient.copy()
patient_bad.update({
    "homa_ir": 6.0,
    "apob": 160,
    "crp": 12,
    "bmi": 38,
    "mvpa": 20,
    "cac": 900,
    "phq9": 20,
})

out_bad = score_patient(patient_bad, cac_method="ln")

print("Good LQ:", out["LQ"])
print("Bad  LQ:", out_bad["LQ"])


Good LQ: 793.1359892499545
Bad  LQ: 626.7409600910273


In [11]:
# sanity check 

keys = ["N_homa_ir", "N_apob", "N_crp", "N_bmi", "N_mvpa", "N_cac", "N_phq9", "LQ", "composite_0_100"]
for k in keys:
    print(k, out[k], out_bad[k])



N_homa_ir 89.0 15.0
N_apob 88.49303297782917 0.2555130330427932
N_crp 91.54342776486644 1.1839137928519186e-37
N_bmi 75.8036347776927 0.8197535924596131
N_mvpa 97.72498680518208 4.15182196887791
N_cac 99.9997939356604 1.396261844331794
N_phq9 84.1344746068543 0.003167124183311986
LQ 793.1359892499545 626.7409600910273
composite_0_100 89.66108895453718 59.407447289277684


In [22]:
import pandas as pd
import io

# --- 1) Define the CSV text (as you already did) ---
csv_text = """id,homa_ir,apob,vo2max,crp,bmi,packyrs,moca,mvpa,cac,hrv,phq9,alt,egfr,bmd_t,truage_delta,small_hdl,rem_pct,grip,swls,rpdqs
A,1.5,60,55,0.4,22.5,0,29,300,0,75,0,18,110,1.0,-5,40,28.5714,55,33,48
B,2.5,120,32,3.0,32,5,26,90,50,30,8,45,85,-0.8,4,18,18,32,20,22
C,4.5,70,52,0.8,24,0,28,280,0,65,1,20,105,0.5,-2,35,26,50,30,40
D,1.4,85,40,1.2,26,0,27,160,900,40,3,25,95,-0.4,1,24,20,40,24,30
E,1.2,80,38,1.0,25,0,28,170,0,45,18,22,100,-0.2,0,26,12,38,10,28
F,6.0,160,20,12.0,38,30,22,20,1200,15,20,80,45,-2.5,10,12,8,18,8,10
"""

# --- 2) Read into pandas (this already works for you) ---
df = pd.read_csv(io.StringIO(csv_text))

# Optional sanity checks (recommended)
print(df.shape)
print(df.columns)
print(df.head())

# --- 3) Write a CLEAN CSV to disk (this is the key step) ---
df.to_csv(
    "lq_validation_v1.csv",
    index=False,
    encoding="utf-8",
    lineterminator="\n"
)


print("Saved lq_validation_v1.csv successfully")


(6, 21)
Index(['id', 'homa_ir', 'apob', 'vo2max', 'crp', 'bmi', 'packyrs', 'moca',
       'mvpa', 'cac', 'hrv', 'phq9', 'alt', 'egfr', 'bmd_t', 'truage_delta',
       'small_hdl', 'rem_pct', 'grip', 'swls', 'rpdqs'],
      dtype='object')
  id  homa_ir  apob  vo2max  crp   bmi  packyrs  moca  mvpa  cac  ...  phq9  \
0  A      1.5    60      55  0.4  22.5        0    29   300    0  ...     0   
1  B      2.5   120      32  3.0  32.0        5    26    90   50  ...     8   
2  C      4.5    70      52  0.8  24.0        0    28   280    0  ...     1   
3  D      1.4    85      40  1.2  26.0        0    27   160  900  ...     3   
4  E      1.2    80      38  1.0  25.0        0    28   170    0  ...    18   

   alt  egfr  bmd_t  truage_delta  small_hdl  rem_pct  grip  swls  rpdqs  
0   18   110    1.0            -5         40  28.5714    55    33     48  
1   45    85   -0.8             4         18  18.0000    32    20     22  
2   20   105    0.5            -2         35  26.0000    50  

In [23]:
out = score_dataframe(df, cac_method="ln")
out.to_csv("lq_validation_v1_expected_python_ln.csv", index=False)
