# Spline Test Data Generation

Generates synthetic and real curve CSV files consumed by `spline_test.ipynb`.
Run top-to-bottom to refresh datasets.

In [6]:
import numpy as np
from scipy.interpolate import UnivariateSpline, make_smoothing_spline, make_splrep
import warnings
from scipy.optimize import minimize_scalar
from pathlib import Path
import re
import zipfile
import xml.etree.ElementTree as ET
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Suppress every warning category for the rest of the session.
warnings.filterwarnings('ignore')
# Plotting constants; none of them is consumed in this part of the notebook.
# NOTE(review): presumably passed to Plotly figure layout / show() calls later on.
PLOTLY_TEMPLATE = 'plotly_white'
PLOTLY_CONFIG = {'displayModeBar': True}
# Colour per series label ('rough' and 'auto' intentionally or accidentally share blue).
COLORS = {'data': '#888888', 'true': '#000000', 'fixed': 'red', 'auto': 'blue', 'gcv': 'green', 'rough': 'blue'}

def _dash(style):
    return {'-': 'solid', '--': 'dash', '-.': 'dashdot', ':': 'dot'}.get(style, 'solid')

def _row_col(i, n_cols):
    return (i // n_cols + 1, i % n_cols + 1)

In [7]:
import sys
# Make the local ``growthcurves`` package importable when it has not been
# imported yet: look for ``src/growthcurves/models.py`` under the current
# working directory and then under its parent.
if 'growthcurves.models' not in sys.modules:
    if 'growthcurves' not in sys.modules:
        for _base in [Path.cwd(), Path.cwd().parent]:
            _src = _base / 'src'
            if (_src / 'growthcurves' / 'models.py').exists():
                # Prepend so this checkout wins over any other copy on sys.path.
                if str(_src) not in sys.path:
                    sys.path.insert(0, str(_src))
                # Stop at the first directory that contains the package.
                break
# If neither candidate exists this import relies on an installed package
# (and raises ImportError otherwise).
from growthcurves.models import phenom_gompertz_model, phenom_logistic_model, phenom_richards_model

def make_growth_curve(t, N0=0.05, Nmax=2.0, mu_max=0.5, lag=4.0, noise_sigma=0.05, seed=42, model='gompertz', richards_nu=1.2):
    """
    Build a noiseless growth curve and a noisy copy of it.

    The noiseless curve comes from one of the imported ``phenom_*`` models
    (``model`` is matched case-insensitively against gompertz / logistic /
    richards). The noisy copy multiplies it by ``exp(normal(0, noise_sigma))``
    drawn from a generator seeded with ``seed`` and is floored at 1e-10.

    Returns ``(N_true, N_noisy)`` as float arrays.
    Raises ``ValueError`` for an unknown ``model``.
    """
    t = np.asarray(t, dtype=float)
    rng = np.random.default_rng(seed)

    # Keep the start density positive and the plateau strictly above it.
    N0 = float(max(N0, 1e-10))
    Nmax = float(max(Nmax, N0 * 1.01))
    upper = max(Nmax, 1e-12)
    lower = max(N0, 1e-12)
    A = float(np.log(upper / lower))
    if not np.isfinite(A) or A <= 0:
        A = 1e-06

    model_key = str(model).strip().lower()
    if model_key not in ('gompertz', 'logistic', 'richards'):
        raise ValueError(f"Unknown model '{model}'. Use gompertz/logistic/richards.")

    # Only the Richards model takes a shape parameter.
    extra = {}
    if model_key == 'richards':
        extra['nu'] = float(np.clip(richards_nu, 0.05, 5.0))
    growth_fn = {
        'gompertz': phenom_gompertz_model,
        'logistic': phenom_logistic_model,
        'richards': phenom_richards_model,
    }[model_key]
    N_true = growth_fn(t=t, A=A, mu_max=float(mu_max), lam=float(lag), N0=N0, **extra)

    clean = np.asarray(N_true, dtype=float)
    noise_factor = np.exp(rng.normal(0, noise_sigma, size=len(t)))
    noisy = np.asarray(N_true, dtype=float) * noise_factor
    return (clean, np.clip(noisy, 1e-10, None))

In [8]:
def _second_diff_series(t, y):
    t = np.asarray(t, dtype=float)
    y = np.asarray(y, dtype=float)
    if len(t) < 4 or np.ptp(t) <= 0:
        return np.array([], dtype=float)
    order = np.argsort(t, kind='mergesort')
    t = t[order]
    y = y[order]
    dt = np.diff(t)
    if len(dt) < 2 or np.any(dt <= 0):
        return np.array([], dtype=float)
    d1 = np.diff(y) / dt
    dt2 = 0.5 * (dt[:-1] + dt[1:])
    if len(dt2) < 1 or np.any(dt2 <= 0):
        return np.array([], dtype=float)
    d2 = np.diff(d1) / dt2
    if len(d2) < 3:
        return np.array([], dtype=float)
    h = float(np.median(dt))
    return np.asarray(d2 * h ** 2, dtype=float)

def _trim_by_abs_deviation(vals, trim_q=0.8):
    vals = np.asarray(vals, dtype=float)
    vals = vals[np.isfinite(vals)]
    if len(vals) < 3:
        return vals
    if trim_q is None:
        return vals
    q = float(trim_q)
    if not np.isfinite(q) or q <= 0.0 or q >= 1.0:
        return vals
    abs_dev = np.abs(vals - np.median(vals))
    if len(abs_dev) < 5:
        return vals
    cutoff = float(np.quantile(abs_dev, q))
    core = vals[abs_dev <= cutoff]
    if len(core) >= 3:
        return core
    return vals

def _sigma_second_diff_mad(t, y, trim_q=0.8):
    """Noise estimate from the MAD of the (optionally trimmed) second-difference series.

    Returns ``1.4826 * MAD / sqrt(6)``, or NaN when fewer than 3 second
    differences are available before or after trimming.
    """
    series = _second_diff_series(t, y)
    if len(series) >= 3:
        series = _trim_by_abs_deviation(series, trim_q=trim_q)
    if len(series) < 3:
        return np.nan
    centre = np.median(series)
    spread = float(np.median(np.abs(series - centre)))
    return float(1.4826 * spread / np.sqrt(6.0))

def _sigma_second_diff_iqr(t, y):
    """Noise estimate from the interquartile range of the second-difference series.

    Returns ``IQR / 1.349 / sqrt(6)``, or NaN when fewer than 3 second
    differences exist or the IQR is not positive.
    """
    series = _second_diff_series(t, y)
    if len(series) < 3:
        return np.nan
    lower, upper = np.quantile(series, [0.25, 0.75])
    spread = float(upper - lower)
    if spread > 0:
        return float(spread / 1.349 / np.sqrt(6.0))
    return np.nan

def _sigma_second_diff_std(t, y):
    """Noise estimate from the sample standard deviation of the second-difference series.

    Returns ``std(ddof=1) / sqrt(6)``, or NaN when fewer than 3 second
    differences exist.
    """
    series = _second_diff_series(t, y)
    if len(series) < 3:
        return np.nan
    sample_sd = float(np.std(series, ddof=1))
    return float(sample_sd / np.sqrt(6.0))

def _sigma_first_diff_mad(t, y, trim_q=0.8):
    t = np.asarray(t, dtype=float)
    y = np.asarray(y, dtype=float)
    if len(t) < 3 or np.ptp(t) <= 0:
        return np.nan
    order = np.argsort(t, kind='mergesort')
    t = t[order]
    y = y[order]
    dt = np.diff(t)
    if len(dt) < 2 or np.any(dt <= 0):
        return np.nan
    dy = np.diff(y)
    slopes = dy / dt
    slope_med = float(np.median(slopes))
    resid = dy - slope_med * dt
    resid = _trim_by_abs_deviation(resid, trim_q=trim_q)
    if len(resid) < 2:
        return np.nan
    mad = float(np.median(np.abs(resid - np.median(resid))))
    return float(1.4826 * mad / np.sqrt(2.0))

def _estimate_sigma_log_noise(t, y, method='second_diff_mad', trim_mad=True, trim_q=0.8):
    method = str(method).strip().lower()
    trim_q_use = float(trim_q) if trim_mad else None
    if method == 'second_diff_mad':
        sigma = _sigma_second_diff_mad(t, y, trim_q=trim_q_use)
    elif method == 'first_diff_mad':
        sigma = _sigma_first_diff_mad(t, y, trim_q=trim_q_use)
    elif method == 'second_diff_iqr':
        sigma = _sigma_second_diff_iqr(t, y)
    elif method == 'second_diff_std':
        sigma = _sigma_second_diff_std(t, y)
    else:
        raise ValueError('Unknown sigma method. Use one of: second_diff_mad, first_diff_mad, second_diff_iqr, second_diff_std.')
    if not np.isfinite(sigma) or sigma < 1e-08:
        sigma = 0.0001
    return float(sigma)

def _robust_sigma_from_second_diff(t, y, trim_q=0.8):
    """
    Default noise estimate: trimmed second-difference MAD.

    Thin wrapper that calls ``_estimate_sigma_log_noise`` with
    ``method='second_diff_mad'`` and trimming switched on.
    """
    default_options = {'method': 'second_diff_mad', 'trim_mad': True, 'trim_q': trim_q}
    return _estimate_sigma_log_noise(t, y, **default_options)
# Default smoothing multiplier; a previously defined global ``AUTO_SMOOTH_MULT``
# overrides the built-in 5.0.
_SMOOTH_MULT = float(globals().get('AUTO_SMOOTH_MULT', 5.0))
# Default ``s`` used by fixed_s_spline().
FIXED_S = 0.55

def fixed_s_spline(t, N, s=FIXED_S, k=3, use_od_weights=False, weight_floor_q=0.15, weight_power=1.0):
    """Fit a ``UnivariateSpline`` with a fixed ``s`` to ln(N) over t.

    Samples are ordered by ``t`` (stable sort) and ``N`` is floored at 1e-12
    before the logarithm. When ``use_od_weights`` is truthy, weights from
    ``_od_weight_vector`` are passed to the fit. Returns the fitted spline.
    """
    times = np.asarray(t, dtype=float)
    density = np.asarray(N, dtype=float)
    idx = np.argsort(times, kind='mergesort')
    density_sorted = np.clip(density[idx], 1e-12, None)

    fit_options = {'k': k, 's': s}
    if bool(use_od_weights):
        fit_options['w'] = _od_weight_vector(density_sorted, floor_q=weight_floor_q, power=weight_power)
    return UnivariateSpline(times[idx], np.log(density_sorted), **fit_options)

def _od_weight_vector(N, floor_q=0.15, power=1.0):
    """

    Build OD-dependent spline weights to reduce low-OD log-noise leverage.

    """
    N = np.asarray(N, dtype=float)
    if len(N) == 0:
        return np.asarray([], dtype=float)
    q = float(np.clip(float(floor_q), 0.0, 0.49))
    od_floor = max(float(np.quantile(N, q)), 1e-06)
    N_eff = np.clip(N, od_floor, None)
    p = float(np.clip(float(power), 0.0, 3.0))
    w = (N_eff / float(np.median(N_eff))) ** p
    w = np.clip(w, 0.2, 5.0)
    w = w / np.sqrt(np.mean(w ** 2))
    return np.asarray(w, dtype=float)

def _auto_s_from_sigma(n, y, sigma, smooth_mult=None, clip_smoothing=True):
    n = int(max(int(n), 2))
    sigma = float(max(float(sigma), 1e-08))
    smooth_mult = float(_SMOOTH_MULT if smooth_mult is None else smooth_mult)
    if not np.isfinite(smooth_mult):
        smooth_mult = float(_SMOOTH_MULT)
    if clip_smoothing:
        smooth_mult_eff = float(np.clip(smooth_mult, 0.25, 30.0))
    else:
        smooth_mult_eff = float(max(smooth_mult, 0.0))
    raw_s = float(smooth_mult_eff * float(n) * sigma ** 2)
    s_min = 0.0
    s_max = max(1e-08, 0.8 * float(n) * float(np.var(y)))
    if clip_smoothing:
        s_opt = float(np.clip(raw_s, 0.0, s_max))
    else:
        s_opt = float(max(raw_s, 0.0))
    return (s_opt, raw_s, smooth_mult_eff, s_min, s_max)

def auto_smooth_spline_variant(t, N, k=3, smooth_mult=None, sigma_method='second_diff_mad', trim_mad=True, clip_smoothing=True, trim_q=0.8, use_od_weights=True, weight_floor_q=0.15, weight_power=1.0):
    """
    Fit a ``UnivariateSpline`` to ln(N) with a smoothing factor derived from estimated noise.

    Samples are ordered by ``t`` (stable sort); ``N`` is floored at 1e-12
    before the logarithm. The noise estimate comes from
    ``_estimate_sigma_log_noise`` and the smoothing factor from
    ``_auto_s_from_sigma``. Weights from ``_od_weight_vector`` are used when
    ``use_od_weights`` is truthy.

    Returns ``(spline, s_opt, sigma)``.
    """
    times = np.asarray(t, dtype=float)
    density = np.asarray(N, dtype=float)
    idx = np.argsort(times, kind='mergesort')
    t_sorted = times[idx]
    density_sorted = density[idx]
    log_density = np.log(np.clip(density, 1e-12, None))[idx]

    sigma = _estimate_sigma_log_noise(t_sorted, log_density, method=sigma_method, trim_mad=bool(trim_mad), trim_q=trim_q)
    s_opt = _auto_s_from_sigma(n=len(t_sorted), y=log_density, sigma=sigma, smooth_mult=smooth_mult, clip_smoothing=bool(clip_smoothing))[0]

    fit_options = {'k': k, 's': s_opt}
    if bool(use_od_weights):
        fit_options['w'] = _od_weight_vector(density_sorted, floor_q=weight_floor_q, power=weight_power)
    fitted = UnivariateSpline(t_sorted, log_density, **fit_options)
    return (fitted, float(s_opt), float(sigma))

def gcv_smooth_spline(t, N, use_od_weights=True, weight_floor_q=0.15, weight_power=1.0):
    """
    Fit ln(N) with ``make_smoothing_spline`` and ``lam=None``.

    Samples are ordered by ``t``, ``N`` is floored at 1e-12, and only the
    first sample of each repeated time is kept. Weights come from
    ``_od_weight_vector`` when ``use_od_weights`` is truthy, otherwise they
    are all ones.

    Returns ``(spline, s_equiv, sigma)`` where ``s_equiv`` is the weighted
    residual sum of squares of the fit and ``sigma`` the second-difference
    MAD noise estimate.

    Raises ``ValueError`` for fewer than 5 samples, or fewer than 5 unique
    times / a zero time span.
    """
    times = np.asarray(t, dtype=float)
    density = np.asarray(N, dtype=float)
    if len(times) < 5:
        raise ValueError('Need at least 5 points for GCV smoothing spline.')

    idx = np.argsort(times, kind='mergesort')
    density_sorted = np.clip(density[idx], 1e-12, None)
    t_unique, first_seen = np.unique(times[idx], return_index=True)
    density_unique = density_sorted[first_seen]
    if len(t_unique) < 5 or np.ptp(t_unique) <= 0:
        raise ValueError('Need >=5 unique time points with non-zero span.')

    log_density = np.log(density_unique)
    if bool(use_od_weights):
        weights = _od_weight_vector(density_unique, floor_q=weight_floor_q, power=weight_power)
    else:
        weights = np.ones_like(log_density, dtype=float)

    fitted = make_smoothing_spline(t_unique, log_density, w=weights, lam=None)
    misfit = log_density - np.asarray(fitted(t_unique), dtype=float)
    s_equiv = float(np.sum((weights * misfit) ** 2))
    sigma = _estimate_sigma_log_noise(t_unique, log_density, method='second_diff_mad', trim_mad=True, trim_q=0.8)
    return (fitted, float(max(s_equiv, 0.0)), float(sigma))

def growthrates_fit_spline(t, N, optgrid=200, use_od_weights=False, weight_floor_q=0.15, weight_power=1.0):
    """
    Fit ln(N) with ``make_smoothing_spline`` (``lam=None``) and locate the steepest slope.

    Data preparation: order by ``t``, floor ``N`` at 1e-12, keep the first
    sample of each repeated time. The spline derivative is evaluated on a
    grid of ``max(60, optgrid)`` points; the best grid point is then refined
    with a bounded scalar minimisation within two grid steps on either side.
    If the refinement fails or raises, the grid point is used.

    Returns ``(spline, s_equiv, sigma, t_opt, mu_opt)``; ``mu_opt`` is NaN
    when no finite derivative value exists, and ``t_opt`` is then the
    midpoint of the time range.

    Raises ``ValueError`` for fewer than 5 samples, or fewer than 5 unique
    times / a zero time span.
    """
    times = np.asarray(t, dtype=float)
    density = np.asarray(N, dtype=float)
    if len(times) < 5:
        raise ValueError('Need at least 5 points for growthrates-like spline.')

    idx = np.argsort(times, kind='mergesort')
    density_sorted = np.clip(density[idx], 1e-12, None)
    t_unique, first_seen = np.unique(times[idx], return_index=True)
    density_unique = density_sorted[first_seen]
    if len(t_unique) < 5 or np.ptp(t_unique) <= 0:
        raise ValueError('Need >=5 unique time points with non-zero span.')

    log_density = np.log(density_unique)
    if bool(use_od_weights):
        weights = _od_weight_vector(density_unique, floor_q=weight_floor_q, power=weight_power)
    else:
        weights = np.ones_like(log_density, dtype=float)

    fitted = make_smoothing_spline(t_unique, log_density, w=weights, lam=None)
    misfit = log_density - np.asarray(fitted(t_unique), dtype=float)
    s_equiv = float(np.sum((weights * misfit) ** 2))
    sigma = _estimate_sigma_log_noise(t_unique, log_density, method='second_diff_mad', trim_mad=True, trim_q=0.8)

    # Coarse search for the steepest slope on an even grid.
    grid = np.linspace(float(np.min(t_unique)), float(np.max(t_unique)), int(max(60, optgrid)))
    slope = fitted.derivative()
    slope_on_grid = np.asarray(slope(grid), dtype=float)

    if len(grid) < 2 or not np.any(np.isfinite(slope_on_grid)):
        t_opt = float(np.mean([np.min(t_unique), np.max(t_unique)]))
        mu_opt = np.nan
    else:
        t_start = float(grid[int(np.nanargmax(slope_on_grid))])
        step = float(grid[1] - grid[0])
        lower = float(max(np.min(t_unique), t_start - 2.0 * step))
        upper = float(min(np.max(t_unique), t_start + 2.0 * step))
        t_opt = t_start
        if upper > lower:
            # Local refinement; any failure keeps the grid estimate.
            try:
                result = minimize_scalar(lambda z: -float(slope(float(z))), bounds=(lower, upper), method='bounded')
                if bool(result.success):
                    t_opt = float(result.x)
            except Exception:
                t_opt = t_start
        mu_opt = float(slope(t_opt))

    mu_out = float(mu_opt) if np.isfinite(mu_opt) else np.nan
    return (fitted, float(max(s_equiv, 0.0)), float(sigma), float(t_opt), mu_out)

def scipy_fitpack_spline(t, N, use_od_weights=False, weight_floor_q=0.15, weight_power=1.0, s_target=None):
    """
    Fit ln(N) with ``scipy.interpolate.make_splrep`` (cubic, ``k=3``).

    Data preparation: order by ``t``, floor ``N`` at 1e-12, keep the first
    sample of each repeated time. Every sample gets weight ``1 / sigma``
    (``sigma`` from the second-difference MAD estimate, at least 1e-08),
    optionally multiplied by ``_od_weight_vector``. The smoothing target is
    the number of unique points unless ``s_target`` is given (negative
    values are raised to 0).

    Returns ``(spline, s_equiv, sigma)`` where ``s_equiv`` is the weighted
    residual sum of squares of the fit.

    Raises ``ValueError`` for fewer than 5 samples, or fewer than 5 unique
    times / a zero time span.
    """
    times = np.asarray(t, dtype=float)
    density = np.asarray(N, dtype=float)
    if len(times) < 5:
        raise ValueError('Need at least 5 points for FITPACK smoothing spline.')

    idx = np.argsort(times, kind='mergesort')
    density_sorted = np.clip(density[idx], 1e-12, None)
    t_unique, first_seen = np.unique(times[idx], return_index=True)
    density_unique = density_sorted[first_seen]
    if len(t_unique) < 5 or np.ptp(t_unique) <= 0:
        raise ValueError('Need >=5 unique time points with non-zero span.')

    log_density = np.log(density_unique)
    sigma = float(max(_estimate_sigma_log_noise(t_unique, log_density, method='second_diff_mad', trim_mad=True, trim_q=0.8), 1e-08))

    weights = np.full_like(log_density, 1.0 / sigma, dtype=float)
    if bool(use_od_weights):
        weights = weights * _od_weight_vector(density_unique, floor_q=weight_floor_q, power=weight_power)

    if s_target is None:
        s_use = float(len(t_unique))
    else:
        s_use = float(max(float(s_target), 0.0))

    fitted = make_splrep(t_unique, log_density, w=weights, s=s_use, k=3)
    misfit = log_density - np.asarray(fitted(t_unique), dtype=float)
    s_equiv = float(np.sum((weights * misfit) ** 2))
    return (fitted, float(max(s_equiv, 0.0)), float(sigma))

def auto_smooth_spline(t, N, k=3, smooth_mult=None):
    """
    Fit ln(N) with the default automatic-smoothing configuration.

    Delegates to ``auto_smooth_spline_variant`` with the second-difference
    MAD noise estimate (trimmed at 0.8), clipping enabled and OD-dependent
    weights switched on. Returns whatever that function returns.
    """
    preset = {
        'sigma_method': 'second_diff_mad',
        'trim_mad': True,
        'clip_smoothing': True,
        'trim_q': 0.8,
        'use_od_weights': True,
        'weight_floor_q': 0.15,
        'weight_power': 1.0,
    }
    return auto_smooth_spline_variant(t, N, k=k, smooth_mult=smooth_mult, **preset)
import hashlib
# In-memory store of fit results, keyed by _fit_cache_key(); it has no size limit.
_SPLINE_FIT_CACHE = {}
# Running hit/miss counters, updated by fit_spline_cached().
_SPLINE_FIT_CACHE_STATS = {'hits': 0, 'misses': 0}
# Caching is on unless a global ``SPLINE_FIT_CACHE_ENABLED`` was defined as falsy beforehand.
_SPLINE_FIT_CACHE_ENABLED = bool(globals().get('SPLINE_FIT_CACHE_ENABLED', True))

def _hash_array64(arr):
    a = np.ascontiguousarray(np.asarray(arr, dtype=np.float64))
    h = hashlib.blake2b(a.view(np.uint8), digest_size=16).hexdigest()
    return (h, int(a.size))

def _fit_cache_key(method_key, t, N, kwargs):
    """Build a hashable cache key from the method name, both data arrays and the keyword options.

    Options are reduced to sorted ``(str(name), repr(value))`` pairs, so the
    key does not depend on keyword order.
    """
    option_pairs = [(str(name), repr(value)) for name, value in kwargs.items()]
    option_pairs.sort()
    return (str(method_key), _hash_array64(t), _hash_array64(N), tuple(option_pairs))

def clear_spline_fit_cache():
    """Empty the fit cache and reset the hit/miss counters to zero."""
    _SPLINE_FIT_CACHE.clear()
    _SPLINE_FIT_CACHE_STATS.update(hits=0, misses=0)

def spline_fit_cache_stats():
    """Return a snapshot dict with the cache switch, entry count, hits and misses."""
    snapshot = {}
    snapshot['enabled'] = bool(_SPLINE_FIT_CACHE_ENABLED)
    snapshot['entries'] = int(len(_SPLINE_FIT_CACHE))
    snapshot['hits'] = int(_SPLINE_FIT_CACHE_STATS['hits'])
    snapshot['misses'] = int(_SPLINE_FIT_CACHE_STATS['misses'])
    return snapshot

def _fit_spline_uncached(method_key, t, N, **kwargs):
    mode = str(method_key).strip().lower()
    if mode == 'auto_default':
        return auto_smooth_spline(t, N, **kwargs)
    if mode == 'auto_variant':
        return auto_smooth_spline_variant(t, N, **kwargs)
    if mode == 'gcv':
        return gcv_smooth_spline(t, N, **kwargs)
    if mode == 'growthrates':
        return growthrates_fit_spline(t, N, **kwargs)
    if mode == 'fitpack':
        return scipy_fitpack_spline(t, N, **kwargs)
    raise ValueError(f'Unknown spline fit mode: {method_key}')

def fit_spline_cached(method_key, t, N, use_cache=None, **kwargs):
    """Fit via ``_fit_spline_uncached``, reusing a stored result for identical inputs.

    ``use_cache=None`` follows the module switch ``_SPLINE_FIT_CACHE_ENABLED``;
    any other value is interpreted as a boolean. With caching off the fit is
    always recomputed and the counters are left alone. A cache hit returns
    the stored object itself (not a copy).
    """
    if use_cache is None:
        caching = _SPLINE_FIT_CACHE_ENABLED
    else:
        caching = bool(use_cache)
    if not caching:
        return _fit_spline_uncached(method_key, t, N, **kwargs)

    key = _fit_cache_key(method_key, t, N, kwargs)
    if key not in _SPLINE_FIT_CACHE:
        # A miss is only recorded once the fit has succeeded.
        result = _fit_spline_uncached(method_key, t, N, **kwargs)
        _SPLINE_FIT_CACHE[key] = result
        _SPLINE_FIT_CACHE_STATS['misses'] += 1
        return result

    _SPLINE_FIT_CACHE_STATS['hits'] += 1
    return _SPLINE_FIT_CACHE[key]
# Cell-completion message listing the public fit functions defined above.
print('fixed_s_spline(), auto_smooth_spline_variant(), gcv_smooth_spline(), growthrates_fit_spline(), scipy_fitpack_spline(), and auto_smooth_spline() defined.')

fixed_s_spline(), auto_smooth_spline_variant(), gcv_smooth_spline(), growthrates_fit_spline(), scipy_fitpack_spline(), and auto_smooth_spline() defined.


In [9]:
# Directory scanned for ``*_data.xlsx`` workbooks by the loading code below.
# NOTE(review): absolute, user-specific path — the cell raises RuntimeError on
# any machine where this directory does not exist; consider making it configurable.
DATA_DIR = Path('/Users/sambra/Documents/GitHub/TheGrowthAnalysisApp/example_data')
# Minimum number of usable samples a well needs to be kept as a curve.
MIN_POINTS = 10
# Not used in this cell; read later (via globals()) as the default for EVAL_MIN_OD_INCREASE.
MIN_OD_INCREASE = 0.05
# Each setting below can be overridden by defining the same name before running the cell.
REAL_SMOOTH_MULT = float(globals().get('REAL_SMOOTH_MULT', _SMOOTH_MULT))
REAL_MU_EDGE_FRAC = float(globals().get('REAL_MU_EDGE_FRAC', 0.0))
REAL_MU_Q = float(globals().get('REAL_MU_Q', 1.0))

def _mu_max_from_spline(spline, t, edge_frac=0.0, q=1.0, n_eval=320):
    t = np.asarray(t, dtype=float)
    t0 = float(np.nanmin(t))
    t1 = float(np.nanmax(t))
    span = max(t1 - t0, 1e-08)
    e = float(np.clip(edge_frac, 0.0, 0.35))
    lo = t0 + e * span
    hi = t1 - e * span
    if hi <= lo:
        lo, hi = (t0, t1)
    te = np.linspace(lo, hi, int(max(80, n_eval)))
    mu = np.asarray(spline.derivative()(te), dtype=float)
    mu = mu[np.isfinite(mu)]
    if len(mu) == 0:
        return np.nan
    qq = float(np.clip(q, 0.8, 1.0))
    if qq >= 0.999999:
        return float(np.max(mu))
    return float(np.quantile(mu, qq))

def _excel_col_to_idx_auto(cell_ref):
    m = re.match('([A-Z]+)', str(cell_ref))
    if not m:
        return 0
    col = m.group(1)
    idx = 0
    for ch in col:
        idx = idx * 26 + (ord(ch) - ord('A') + 1)
    return idx - 1

def _parse_xlsx_first_sheet_auto(path):
    """Read the first worksheet of an .xlsx workbook into a dense grid of raw cell text.

    The workbook is opened with ``zipfile`` and its XML parts are parsed with
    ElementTree; no spreadsheet library is involved.

    Args:
        path: anything ``zipfile.ZipFile`` accepts (path or file object). Its
            ``name`` attribute, when present, is used in error messages.

    Returns:
        A list of rows covering row 0 up to the highest row present. Every
        row is a list padded with ``None`` to the widest column seen. Cell
        values are the raw text from the file (shared and inline strings
        resolved); nothing is converted to numbers here.

    Raises:
        ValueError: the workbook lists no sheet, or the first sheet's
            relationship cannot be resolved.
        KeyError / zipfile.BadZipFile / ET.ParseError: propagated from the
            zip and XML layers for missing or malformed parts.
    """
    ns_main = {'x': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
    ns_rel = {'r': 'http://schemas.openxmlformats.org/package/2006/relationships'}
    rel_id_attr = '{http://schemas.openxmlformats.org/officeDocument/2006/relationships}id'
    # Plain string paths have no ``.name``; fall back to the object itself.
    label = getattr(path, 'name', path)

    with zipfile.ZipFile(path) as zf:
        # Shared-string table: each <si> may hold several <t> runs (rich text).
        shared = []
        if 'xl/sharedStrings.xml' in zf.namelist():
            root_ss = ET.fromstring(zf.read('xl/sharedStrings.xml'))
            for si in root_ss.findall('x:si', ns_main):
                shared.append(''.join((t.text or '' for t in si.findall('.//x:t', ns_main))))

        wb_root = ET.fromstring(zf.read('xl/workbook.xml'))
        first_sheet = wb_root.find('x:sheets/x:sheet', ns_main)
        if first_sheet is None:
            raise ValueError(f'No sheets found in {label}')

        # Follow the sheet's relationship id to the worksheet part.
        rel_id = first_sheet.attrib.get(rel_id_attr)
        rels_root = ET.fromstring(zf.read('xl/_rels/workbook.xml.rels'))
        target = None
        for rel in rels_root.findall('r:Relationship', ns_rel):
            if rel.attrib.get('Id') == rel_id:
                target = rel.attrib.get('Target')
                break
        if target is None:
            raise ValueError(f'Could not resolve first sheet rel in {label}')
        # A target starting with '/' is relative to the package root, not to
        # 'xl/'; prefixing it again would look up 'xl/xl/...'.
        if target.startswith('/'):
            sheet_xml = target.lstrip('/')
        else:
            sheet_xml = 'xl/' + target
        root_sheet = ET.fromstring(zf.read(sheet_xml))

        rows_dict = {}
        max_col = 0
        next_row = 0
        for row in root_sheet.findall('x:sheetData/x:row', ns_main):
            # Row/cell references are optional; without one the element
            # follows the previous row/cell.
            row_ref = row.attrib.get('r')
            rnum = int(row_ref) - 1 if row_ref else next_row
            next_row = rnum + 1
            row_map = {}
            next_col = 0
            for c in row.findall('x:c', ns_main):
                ref = c.attrib.get('r')
                cidx = _excel_col_to_idx_auto(ref) if ref else next_col
                next_col = cidx + 1
                ctype = c.attrib.get('t')
                v_node = c.find('x:v', ns_main)
                is_node = c.find('x:is', ns_main)
                val = None
                if ctype == 's' and v_node is not None:
                    # Shared string: <v> holds an index into the table.
                    sid = int(v_node.text)
                    if 0 <= sid < len(shared):
                        val = shared[sid]
                elif ctype == 'inlineStr' and is_node is not None:
                    # Join every <t> run so rich-text inline strings are not lost.
                    val = ''.join((t.text or '' for t in is_node.findall('.//x:t', ns_main))) or None
                elif v_node is not None:
                    val = v_node.text
                row_map[cidx] = val
                max_col = max(max_col, cidx)
            rows_dict[rnum] = row_map

        # Densify: missing rows and cells become None.
        max_row = max(rows_dict.keys()) if rows_dict else 0
        rows = []
        for r in range(max_row + 1):
            dense = [None] * (max_col + 1)
            for cidx, v in rows_dict.get(r, {}).items():
                dense[cidx] = v
            rows.append(dense)
    return rows

def _to_float_auto(v):
    if v is None:
        return np.nan
    if isinstance(v, (int, float, np.floating)):
        return float(v)
    s = str(v).strip().replace(',', '.')
    try:
        return float(s)
    except Exception:
        return np.nan

def _extract_curves_auto(rows, file_name, min_points=10):
    if len(rows) < 3:
        return []
    header = rows[0]
    curves = []
    n_cols = len(header)
    for j in range(1, n_cols):
        well = header[j]
        if well is None or str(well).strip() == '':
            continue
        well = str(well).strip()
        t_vals, y_vals = ([], [])
        for r in range(1, len(rows)):
            t_raw = rows[r][0] if 0 < len(rows[r]) else None
            y_raw = rows[r][j] if j < len(rows[r]) else None
            t_f = _to_float_auto(t_raw)
            y_f = _to_float_auto(y_raw)
            if np.isfinite(t_f) and np.isfinite(y_f):
                t_vals.append(t_f)
                y_vals.append(y_f)
        if len(t_vals) < min_points:
            continue
        t = np.asarray(t_vals, dtype=float)
        y = np.asarray(y_vals, dtype=float)
        m = np.isfinite(t) & np.isfinite(y) & (y > 0)
        t, y = (t[m], y[m])
        if len(t) < min_points or np.ptp(t) <= 0:
            continue
        o = np.argsort(t, kind='mergesort')
        t, y = (t[o], y[o])
        if np.nanmax(t) > 96:
            t = t / 60.0
        uniq_t, uniq_idx = np.unique(t, return_index=True)
        t, y = (uniq_t, y[uniq_idx])
        if len(t) < min_points or np.ptp(t) <= 0:
            continue
        curves.append({'curve_id': f'{file_name}:{well}', 'file': file_name, 'well': well, 't': t, 'N': y})
    return curves
# Load every ``*_data.xlsx`` workbook in DATA_DIR and collect its usable curves
# into ``real_curves_auto``.
if not DATA_DIR.exists():
    raise RuntimeError(f'Real-data directory not found: {DATA_DIR}')
# Sorted so the curve order is the same on every run.
data_files = sorted(DATA_DIR.glob('*_data.xlsx'))
if not data_files:
    raise RuntimeError(f'No *_data.xlsx files found in {DATA_DIR}')
real_curves_auto = []
for fp in data_files:
    # Best effort: a workbook that fails to parse is reported and skipped
    # rather than aborting the whole load.
    try:
        rows = _parse_xlsx_first_sheet_auto(fp)
        real_curves_auto.extend(_extract_curves_auto(rows, fp.name, min_points=MIN_POINTS))
    except Exception as exc:
        print(f'Skipping {fp.name}: {exc}')
if not real_curves_auto:
    raise RuntimeError('No usable real curves extracted.')
print(f'Loaded {len(real_curves_auto)} real curves from {len(data_files)} file(s).')

Loaded 226 real curves from 3 file(s).


In [10]:
# Configuration for the synthetic-curve generator and evaluation set.
# Settings read through globals().get(...) can be overridden by defining the
# same name before this cell runs; the others are fixed here.
if 'real_curves_auto' not in globals() or len(real_curves_auto) == 0:
    raise RuntimeError('Run the real data loading cell first.')
# Short aliases for the real-data settings (same defaults as the loading cell).
rsm = float(globals().get('REAL_SMOOTH_MULT', _SMOOTH_MULT))
r_edge = float(globals().get('REAL_MU_EDGE_FRAC', 0.0))
r_q = float(globals().get('REAL_MU_Q', 1.0))
# Bounds of the mu_max sampling range (reported below as log-uniform, h^-1).
MU_MAX_LO = 0.05
MU_MAX_HI = 1.5
VARIETY_TAIL_PROB = float(globals().get('VARIETY_TAIL_PROB', 0.35))
# Growth models to draw from and their selection probabilities.
SYN_MODELS = list(globals().get('SYN_MODELS', ['gompertz', 'logistic', 'richards']))
SYN_MODEL_P = np.asarray(globals().get('SYN_MODEL_P', [0.45, 0.3, 0.25]), dtype=float)
if len(SYN_MODELS) != len(SYN_MODEL_P):
    raise RuntimeError('SYN_MODELS and SYN_MODEL_P must have same length.')
# Negative entries become 0; an all-zero vector falls back to uniform; then normalise to sum 1.
SYN_MODEL_P = np.clip(SYN_MODEL_P, 0.0, None)
if float(np.sum(SYN_MODEL_P)) <= 0:
    SYN_MODEL_P = np.ones(len(SYN_MODELS), dtype=float)
SYN_MODEL_P = SYN_MODEL_P / np.sum(SYN_MODEL_P)
# Ranges for the number of points and the time span of a synthetic curve.
SYN_NPTS_RANGE = (100, 1000)
SYN_TSPAN_RANGE = (24.0, 96.0)
if len(SYN_NPTS_RANGE) != 2 or len(SYN_TSPAN_RANGE) != 2:
    raise RuntimeError('SYN_NPTS_RANGE and SYN_TSPAN_RANGE must be length-2 tuples.')
# Normalise each range: order the bounds (low, high) and apply a lower limit.
_n_lo, _n_hi = (int(SYN_NPTS_RANGE[0]), int(SYN_NPTS_RANGE[1]))
if _n_hi < _n_lo:
    _n_lo, _n_hi = (_n_hi, _n_lo)
SYN_NPTS_RANGE = (max(10, _n_lo), max(10, _n_hi))
_t_lo, _t_hi = (float(SYN_TSPAN_RANGE[0]), float(SYN_TSPAN_RANGE[1]))
if _t_hi < _t_lo:
    _t_lo, _t_hi = (_t_hi, _t_lo)
SYN_TSPAN_RANGE = (max(1.0, _t_lo), max(1.0, _t_hi))
# Range of the Nmax/N0 fold factor.
SYN_OD_FOLD_RANGE = (0.0, 200.0)
if len(SYN_OD_FOLD_RANGE) != 2:
    raise RuntimeError('SYN_OD_FOLD_RANGE must be a length-2 tuple.')
_f_lo, _f_hi = (float(SYN_OD_FOLD_RANGE[0]), float(SYN_OD_FOLD_RANGE[1]))
if _f_hi < _f_lo:
    _f_lo, _f_hi = (_f_hi, _f_lo)
SYN_OD_FOLD_RANGE = (max(0.0, _f_lo), max(0.0, _f_hi))
# Synthetic noise = sampled real sigma * SYN_NOISE_MULT, clipped to SYN_NOISE_RANGE.
SYN_NOISE_MULT = float(globals().get('SYN_NOISE_MULT', 2.0))
SYN_NOISE_RANGE = tuple(globals().get('SYN_NOISE_RANGE', (0.0001, 0.8)))
if len(SYN_NOISE_RANGE) != 2:
    raise RuntimeError('SYN_NOISE_RANGE must be a length-2 tuple.')
_noise_lo, _noise_hi = (float(SYN_NOISE_RANGE[0]), float(SYN_NOISE_RANGE[1]))
if _noise_hi < _noise_lo:
    _noise_lo, _noise_hi = (_noise_hi, _noise_lo)
SYN_NOISE_RANGE = (max(1e-08, _noise_lo), max(1e-08, _noise_hi))
# Evaluation-set settings; none of them is consumed in this part of the notebook.
# EVAL_N is never allowed below 500.
EVAL_N = max(500, int(globals().get('EVAL_N', 500)))
EVAL_SEED = int(globals().get('EVAL_SEED', 20260224))
# NOTE(review): reads the global 'MIN_OD_INCREASE' (0.05 from the loading cell),
# not 'EVAL_MIN_OD_INCREASE' — confirm this is intended.
EVAL_MIN_OD_INCREASE = float(globals().get('MIN_OD_INCREASE', 0.0))
EVAL_POOL_MULT = int(globals().get('EVAL_POOL_MULT', 8))
EVAL_NPTS_BINS = np.asarray(globals().get('EVAL_NPTS_BINS', [220, 400, 600, 800]), dtype=float)
EVAL_TSPAN_BINS = np.asarray(globals().get('EVAL_TSPAN_BINS', [30.0, 40.0, 52.0, 64.0, 80.0]), dtype=float)
EVAL_STRESS_CASES_PER_SCENARIO = int(globals().get('EVAL_STRESS_CASES_PER_SCENARIO', 32))
EVAL_STRESS_FRACTION = float(globals().get('EVAL_STRESS_FRACTION', 0.4))
EVAL_STRESS_MAX_KEEP = int(globals().get('EVAL_STRESS_MAX_KEEP', 220))
EVAL_STRESS_TARGET = int(globals().get('EVAL_STRESS_TARGET', -1))

def _safe_quantile(arr, q, default=np.nan):
    arr = np.asarray(arr, dtype=float)
    arr = arr[np.isfinite(arr)]
    if len(arr) == 0:
        return float(default)
    return float(np.quantile(arr, q))

def _estimate_true_mu_from_curve(t, N_true, n_eval=420):
    """Approximate true mu_max = max d(ln N)/dt from noiseless curve."""
    t = np.asarray(t, dtype=float)
    N_true = np.asarray(N_true, dtype=float)
    if len(t) < 4 or np.ptp(t) <= 0:
        return np.nan
    order = np.argsort(t, kind='mergesort')
    t = t[order]
    N_true = np.clip(N_true[order], 1e-12, None)
    te = np.linspace(float(np.min(t)), float(np.max(t)), int(max(120, n_eval)))
    Ne = np.interp(te, t, N_true)
    y = np.log(np.clip(Ne, 1e-12, None))
    mu = np.gradient(y, te)
    mu = mu[np.isfinite(mu)]
    if len(mu) == 0:
        return np.nan
    return float(np.max(mu))

def _estimate_lag_fraction(t, mu):
    t = np.asarray(t, dtype=float)
    mu = np.asarray(mu, dtype=float)
    if len(t) < 3 or len(mu) != len(t):
        return 0.35
    peak = int(np.nanargmax(mu))
    if peak < 1:
        return 0.2
    mu_peak = float(np.nanmax(mu))
    if not np.isfinite(mu_peak) or mu_peak <= 0:
        return 0.35
    thr = 0.25 * mu_peak
    idx = np.where(mu[:peak + 1] >= thr)[0]
    if len(idx) == 0:
        return 0.35
    span = max(float(np.ptp(t)), 1e-08)
    return float(np.clip((t[int(idx[0])] - float(t[0])) / span, 0.02, 0.95))

def _describe_real_curve(rec):
    t = np.asarray(rec['t'], dtype=float)
    N = np.asarray(rec['N'], dtype=float)
    if len(t) < 10 or np.ptp(t) <= 0:
        return None
    y = np.log(np.clip(N, 1e-12, None))
    sigma = _robust_sigma_from_second_diff(t, y, trim_q=0.8)
    try:
        sp, s_opt, _ = fit_spline_cached('auto_default', t, N, smooth_mult=rsm)
    except Exception:
        return None
    t_dense = np.linspace(float(np.min(t)), float(np.max(t)), 320)
    mu = np.asarray(sp.derivative()(t_dense), dtype=float)
    mu_hat = float(np.nanmax(mu)) if np.any(np.isfinite(mu)) else np.nan
    lag_frac = _estimate_lag_fraction(t_dense, mu)
    od_p05 = float(np.percentile(N, 5))
    od_p95 = float(np.percentile(N, 95))
    od_fold = float(od_p95 / max(od_p05, 1e-08))
    return {'n_pts': int(len(t)), 't_span': float(np.ptp(t)), 'od_p05': od_p05, 'od_p95': od_p95, 'od_fold': od_fold, 'sigma': float(max(0.0001, sigma)), 'mu_hat': float(max(0.0001, mu_hat)) if np.isfinite(mu_hat) else np.nan, 'lag_frac': lag_frac, 't_template': np.asarray(t, dtype=float), 'rec': rec, 's_opt': float(s_opt)}
# Profile every real curve; keep those with a finite mu estimate and a
# p95/p05 ratio above 1.02.
real_desc = []
for rec in real_curves_auto:
    d = _describe_real_curve(rec)
    if d is not None and np.isfinite(d['mu_hat']) and (d['od_fold'] > 1.02):
        real_desc.append(d)
if len(real_desc) < 10:
    raise RuntimeError('Not enough describable real curves to build a synthetic profile.')
# Column-wise views of the profile, one array per descriptor.
arr_n = np.asarray([d['n_pts'] for d in real_desc], dtype=float)
arr_span = np.asarray([d['t_span'] for d in real_desc], dtype=float)
arr_sigma = np.asarray([d['sigma'] for d in real_desc], dtype=float)
arr_mu = np.asarray([d['mu_hat'] for d in real_desc], dtype=float)
arr_lag = np.asarray([d['lag_frac'] for d in real_desc], dtype=float)
arr_fold = np.asarray([d['od_fold'] for d in real_desc], dtype=float)
arr_od0 = np.asarray([d['od_p05'] for d in real_desc], dtype=float)
# Human-readable summary of the profile and of the generator settings.
print('Real-data profile used for synthetic generation')
print(f'  curves profiled: {len(real_desc)}')
print(f'  n points: median={np.median(arr_n):.1f}, q10={_safe_quantile(arr_n, 0.1):.1f}, q90={_safe_quantile(arr_n, 0.9):.1f}')
print(f'  t span (h): median={np.median(arr_span):.2f}, q10={_safe_quantile(arr_span, 0.1):.2f}, q90={_safe_quantile(arr_span, 0.9):.2f}')
print(f'  OD fold (p95/p05): median={np.median(arr_fold):.2f}, q10={_safe_quantile(arr_fold, 0.1):.2f}, q90={_safe_quantile(arr_fold, 0.9):.2f}')
print(f'  sigma proxy: median={np.median(arr_sigma):.4f}, q10={_safe_quantile(arr_sigma, 0.1):.4f}, q90={_safe_quantile(arr_sigma, 0.9):.4f}')
print(f'  synthetic test-set noise: sigma_proxy * {SYN_NOISE_MULT:.2f}, clipped to [{SYN_NOISE_RANGE[0]:.4f}, {SYN_NOISE_RANGE[1]:.4f}]')
print(f'  mu_max proxy (real, informational): median={np.median(arr_mu):.3f}, q10={_safe_quantile(arr_mu, 0.1):.3f}, q90={_safe_quantile(arr_mu, 0.9):.3f}')
print(f'  mu_max sampling target: log-uniform [{MU_MAX_LO}, {MU_MAX_HI}] h^-1')
print(f'  tail-case probability (variety boost): {VARIETY_TAIL_PROB:.2f}')
print('  synthetic model mix: ' + ', '.join([f'{m}:{p:.2f}' for m, p in zip(SYN_MODELS, SYN_MODEL_P)]))
print(f'  synthetic n_points range (uniform): [{SYN_NPTS_RANGE[0]}, {SYN_NPTS_RANGE[1]}], t_span range (uniform h): [{SYN_TSPAN_RANGE[0]:.1f}, {SYN_TSPAN_RANGE[1]:.1f}], od_fold range (uniform): [{SYN_OD_FOLD_RANGE[0]:.1f}, {SYN_OD_FOLD_RANGE[1]:.1f}]')

def _sample_real_like_curve(rng):
    """
    Draw one synthetic growth curve whose sampling grid mimics a real curve.

    A random entry of ``real_desc`` supplies the time-grid template. The grid
    is resampled to a random length, power-warped and jittered, and a growth
    model is then simulated on it through ``make_growth_curve``.

    The sequence of ``rng`` calls determines the generated data for a given
    seed, so the statements below must not be reordered.

    Parameters
    ----------
    rng : numpy random Generator
        Source of all random draws (``integers``, ``random``, ``uniform``,
        ``normal``, ``lognormal`` and ``choice`` are used).

    Returns
    -------
    tuple
        ``(t, N_true, N_obs, mu_true, noise_sigma, N0, Nmax, model_name)``.
    """
    # Pick a real curve and normalise its time grid to [0, 1].
    base = real_desc[int(rng.integers(0, len(real_desc)))]
    t_base = np.asarray(base['t_template'], dtype=float)
    t0 = float(np.min(t_base))
    t_norm_base = (t_base - t0) / max(float(np.ptp(t_base)), 1e-08)
    # "Tail" cases get wider jitter and warp ranges to add variety.
    tail_case = bool(rng.random() < VARIETY_TAIL_PROB)
    n0_jitter = 0.3 if tail_case else 0.18
    lag_jitter = 0.14 if tail_case else 0.06
    # Target length, hard-limited to 100..1000 points regardless of SYN_NPTS_RANGE.
    n_target = int(rng.integers(SYN_NPTS_RANGE[0], SYN_NPTS_RANGE[1] + 1))
    n_target = int(np.clip(n_target, 100, 1000))
    t_span = float(rng.uniform(SYN_TSPAN_RANGE[0], SYN_TSPAN_RANGE[1]))
    # Resample the template's spacing pattern onto n_target points.
    q_old = np.linspace(0.0, 1.0, len(t_norm_base))
    q_new = np.linspace(0.0, 1.0, n_target)
    t_norm = np.interp(q_new, q_old, t_norm_base)
    # Power warp: exponent > 1 pushes samples toward t=0, < 1 toward the end.
    warp = float(rng.uniform(0.75, 1.35) if tail_case else rng.uniform(0.9, 1.12))
    t_norm = np.clip(t_norm ** warp, 0.0, 1.0)
    if n_target > 6:
        # Small timing jitter on interior points; endpoints stay pinned to 0 and 1.
        jitter_sd = 0.01 if tail_case else 0.004
        jitter = rng.normal(0.0, jitter_sd, size=n_target)
        jitter[0] = 0.0
        jitter[-1] = 0.0
        t_norm = np.clip(t_norm + jitter, 0.0, 1.0)
        t_norm = np.sort(t_norm)
        t_norm[0] = 0.0
        t_norm[-1] = 1.0
    t = t_norm * t_span
    # Enforce a non-decreasing grid and rescale so it spans exactly [0, t_span].
    t = np.maximum.accumulate(t)
    if t[-1] > t[0]:
        t = (t - t[0]) / (t[-1] - t[0]) * t_span
    # Starting OD: a real 5th-percentile OD with lognormal jitter, clipped to [1e-4, 3].
    N0 = float(np.clip(rng.choice(arr_od0) * rng.lognormal(0.0, n0_jitter), 0.0001, 3.0))
    fold = float(rng.uniform(SYN_OD_FOLD_RANGE[0], SYN_OD_FOLD_RANGE[1]))
    # Nmax is kept at least 1% above N0 even when the drawn fold is below 1.01.
    Nmax = float(max(1.01 * N0, N0 * max(fold, 0.0)))
    # Lag as a fraction of the span: a real lag fraction plus Gaussian jitter.
    lag_frac = float(np.clip(rng.choice(arr_lag) + rng.normal(0.0, lag_jitter), 0.01, 0.96))
    lag = float(lag_frac * t_span)
    # Noise level: a real sigma proxy, falling back to the median (or 1e-4) if unusable.
    noise_sigma = float(rng.choice(arr_sigma))
    if not np.isfinite(noise_sigma) or noise_sigma <= 0:
        noise_sigma = float(np.nanmedian(arr_sigma)) if np.any(np.isfinite(arr_sigma)) else 0.0001
    noise_sigma = float(np.clip(noise_sigma * SYN_NOISE_MULT, SYN_NOISE_RANGE[0], SYN_NOISE_RANGE[1]))
    # Growth rate is drawn log-uniformly between MU_MAX_LO and MU_MAX_HI.
    mu_target = float(np.exp(rng.uniform(np.log(MU_MAX_LO), np.log(MU_MAX_HI))))
    model_name = str(rng.choice(SYN_MODELS, p=SYN_MODEL_P))
    # The shape parameter is only randomised for the Richards model.
    richards_nu = float(np.clip(rng.lognormal(0.0, 0.35), 0.35, 3.5)) if model_name == 'richards' else 1.0
    N_true, N_obs = make_growth_curve(t, N0=N0, Nmax=Nmax, mu_max=mu_target, lag=lag, noise_sigma=noise_sigma, seed=int(rng.integers(0, 1000000000)), model=model_name, richards_nu=richards_nu)
    # Reference growth rate is estimated from the noise-free curve; the sampled
    # target is only a fallback when that estimate is not a positive finite number.
    mu_true = _estimate_true_mu_from_curve(t, N_true)
    if not np.isfinite(mu_true) or mu_true <= 0:
        mu_true = float(max(1e-06, mu_target))
    return (t, N_true, N_obs, mu_true, noise_sigma, N0, Nmax, model_name)

def _bin_index(v, edges):
    """Return the ``np.digitize`` bin number of scalar *v* for bin *edges*."""
    bin_no = np.digitize(float(v), edges)
    return int(bin_no)

def _uniform_from_range(rng, lo_hi):
    """
    Draw a float uniformly from the two-element range *lo_hi*.

    The bounds may be given in either order. A degenerate range (bounds closer
    than 1e-12) returns the lower bound without consuming a draw from *rng*.
    """
    first, second = float(lo_hi[0]), float(lo_hi[1])
    lo, hi = (second, first) if second < first else (first, second)
    if abs(hi - lo) < 1e-12:
        return float(lo)
    return float(rng.uniform(lo, hi))

def _make_eval_record(t, N_true, N_obs, mu_true, noise_sigma, model_name, curve_id, is_stress=False, stress_tag='', target_fail='none'):
    """
    Package one curve as an evaluation record, or return None if it is unusable.

    A curve is rejected when it has fewer than 10 points, a non-positive time
    span, an observed OD increase that is non-finite or below
    ``EVAL_MIN_OD_INCREASE``, or no positive finite growth rate (the supplied
    *mu_true* is re-estimated from the noise-free curve before giving up).

    Returns a dict with the arrays (``t``, ``N_true``, ``N_obs``), scalar
    descriptors (``mu_true``, ``noise_sigma``, ``t_span``, ``n_pts``,
    ``od_fold``), and the labels ``curve_id``, ``model``, ``is_stress``,
    ``stress_tag`` and ``target_fail``.
    """
    t, N_true, N_obs = (np.asarray(a, dtype=float) for a in (t, N_true, N_obs))

    # Too short, or no time extent at all.
    if len(t) < 10:
        return None
    if np.ptp(t) <= 0:
        return None

    # The observed signal must actually rise by a minimum amount.
    od_rise = float(np.nanmax(N_obs) - np.nanmin(N_obs))
    if not np.isfinite(od_rise):
        return None
    if od_rise < EVAL_MIN_OD_INCREASE:
        return None

    def _unusable(rate):
        return not np.isfinite(rate) or rate <= 0

    rate = float(mu_true)
    if _unusable(rate):
        rate = _estimate_true_mu_from_curve(t, N_true)
    if _unusable(rate):
        return None

    record = {}
    record['curve_id'] = str(curve_id)
    record['t'] = t
    record['N_true'] = N_true
    record['N_obs'] = N_obs
    record['mu_true'] = float(max(1e-06, rate))
    record['noise_sigma'] = float(max(1e-08, noise_sigma))
    record['t_span'] = float(np.ptp(t))
    record['n_pts'] = int(len(t))
    record['model'] = str(model_name)
    # Robust dynamic range: 95th over 5th percentile of the observed signal.
    od_p95 = np.percentile(N_obs, 95)
    od_p05 = np.percentile(N_obs, 5)
    record['od_fold'] = float(od_p95 / max(od_p05, 1e-08))
    record['is_stress'] = bool(is_stress)
    record['stress_tag'] = str(stress_tag)
    record['target_fail'] = str(target_fail)
    return record

def _inject_sparse_spikes(N_obs, rng, spike_prob=0.0, spike_mult_range=(1.0, 1.0)):
    """
    Return a copy of *N_obs* with random upward multiplicative spikes.

    Each interior point is spiked independently with probability *spike_prob*;
    the first and last points are never touched. Spike factors are drawn
    log-uniformly from *spike_mult_range*, whose lower end is raised to at
    least 1.01. The result is always floored at 1e-10. Nothing is drawn from
    *rng* when *spike_prob* is not positive or the curve has fewer than 5 points.
    """
    floor = 1e-10
    values = np.array(N_obs, dtype=float, copy=True)
    prob = float(max(0.0, spike_prob))
    if prob <= 0 or len(values) < 5:
        return np.clip(values, floor, None)

    hit = rng.random(len(values)) < prob
    hit[0] = hit[-1] = False
    n_hits = int(np.count_nonzero(hit))
    if n_hits == 0:
        return np.clip(values, floor, None)

    lo = float(max(1.01, min(spike_mult_range)))
    hi = float(max(lo, max(spike_mult_range)))
    factors = np.exp(rng.uniform(np.log(lo), np.log(hi), size=n_hits))
    values[hit] *= factors
    return np.clip(values, floor, None)

def _inject_low_od_relative_log_noise(N_true, rng, sigma_low, sigma_high, decay_power=2.0, min_weight=0.0):
    """
    Add heteroscedastic log-noise: strongest at low OD, weakest at high OD.

    The per-point log-space sigma interpolates between the larger and the
    smaller of the two given sigmas (they are swapped if passed the wrong way
    round), weighted by ``(1 - y01) ** decay_power`` where ``y01`` is the
    log-OD rescaled to [0, 1]. *min_weight* sets a floor on that weight. A
    constant or single-point curve gets the larger sigma everywhere.

    Returns ``(N_obs, sigma_vec)``, with ``N_obs`` floored at 1e-10.
    """
    log_truth = np.log(np.clip(np.asarray(N_true, dtype=float), 1e-12, None))
    n = len(log_truth)

    first = float(max(1e-08, sigma_low))
    second = float(max(1e-08, sigma_high))
    sig_big, sig_small = (second, first) if first < second else (first, second)

    exponent = float(np.clip(decay_power, 0.2, 8.0))
    weight_floor = float(np.clip(min_weight, 0.0, 1.0))

    if n < 2 or np.ptp(log_truth) <= 1e-12:
        sigma_vec = np.full(n, sig_big, dtype=float)
    else:
        frac = (log_truth - float(np.min(log_truth))) / float(np.ptp(log_truth))
        weight = np.power(1.0 - np.clip(frac, 0.0, 1.0), exponent)
        weight = weight_floor + (1.0 - weight_floor) * weight
        sigma_vec = sig_small + (sig_big - sig_small) * weight

    log_noisy = log_truth + rng.normal(0.0, sigma_vec, size=n)
    noisy = np.clip(np.exp(log_noisy), 1e-10, None)
    return (noisy, np.asarray(sigma_vec, dtype=float))

def _generate_stress_pool(rng, n_per_scenario):
    """
    Build deliberately difficult curves, *n_per_scenario* per stress scenario.

    Each scenario is a dict of sampling ranges plus a ``tag`` and the
    ``target_fail`` label it is tagged with ('baseline' or 'auto'). Scenarios
    flagged ``hetero_low_od_log_noise`` replace the model's own noise with
    OD-dependent log-noise. Sparse spikes are applied to every curve according
    to the scenario's ``spike_prob``.

    The order of the ``rng`` draws fixes the output for a given seed; it is
    the same for every curve: n_pts, t_span, warp, jitter, N0, fold, mu_max,
    lag fraction, noise sigma, Richards nu (Richards only), hetero-noise
    parameters (hetero only), curve seed, then noise and spikes.

    Returns a list of records from ``_make_eval_record``; rejected curves are
    dropped, so the list may be shorter than requested. A non-positive
    *n_per_scenario* yields an empty list.
    """
    n_per_scenario = int(max(0, n_per_scenario))
    if n_per_scenario <= 0:
        return []

    scenarios = [
        {
            'tag': 'baseline_high_noise_spikes', 'target_fail': 'baseline', 'model': 'gompertz',
            'n_pts_range': (180, 420), 't_span_range': (24.0, 52.0),
            'n0_range': (0.01, 0.06), 'fold_range': (12.0, 80.0),
            'mu_range': (0.2, 0.55), 'lag_frac_range': (0.08, 0.35),
            'noise_range': (0.45, 0.8), 'warp_range': (0.55, 1.15),
            'spike_prob': 0.05, 'spike_mult_range': (2.5, 5.5),
            'richards_nu_range': (1.0, 1.0),
        },
        {
            'tag': 'baseline_dense_noisy', 'target_fail': 'baseline', 'model': 'logistic',
            'n_pts_range': (650, 1000), 't_span_range': (30.0, 80.0),
            'n0_range': (0.01, 0.08), 'fold_range': (8.0, 60.0),
            'mu_range': (0.12, 0.4), 'lag_frac_range': (0.15, 0.45),
            'noise_range': (0.3, 0.7), 'warp_range': (0.7, 1.5),
            'spike_prob': 0.02, 'spike_mult_range': (2.0, 4.0),
            'richards_nu_range': (1.0, 1.0),
        },
        {
            'tag': 'baseline_sparse_extreme_noise', 'target_fail': 'baseline', 'model': 'logistic',
            'n_pts_range': (100, 220), 't_span_range': (24.0, 60.0),
            'n0_range': (0.005, 0.05), 'fold_range': (6.0, 40.0),
            'mu_range': (0.08, 0.3), 'lag_frac_range': (0.1, 0.55),
            'noise_range': (0.55, 0.8), 'warp_range': (0.45, 1.25),
            'spike_prob': 0.1, 'spike_mult_range': (3.0, 8.0),
            'richards_nu_range': (1.0, 1.0),
        },
        {
            'tag': 'baseline_jagged_bursts', 'target_fail': 'baseline', 'model': 'gompertz',
            'n_pts_range': (220, 520), 't_span_range': (28.0, 90.0),
            'n0_range': (0.008, 0.07), 'fold_range': (10.0, 70.0),
            'mu_range': (0.15, 0.6), 'lag_frac_range': (0.05, 0.42),
            'noise_range': (0.4, 0.8), 'warp_range': (0.5, 1.6),
            'spike_prob': 0.12, 'spike_mult_range': (4.0, 10.0),
            'richards_nu_range': (1.0, 1.0),
        },
        {
            'tag': 'auto_low_od_hetero_dense', 'target_fail': 'auto', 'model': 'gompertz',
            'n_pts_range': (380, 1000), 't_span_range': (30.0, 96.0),
            'n0_range': (0.003, 0.03), 'fold_range': (20.0, 180.0),
            'mu_range': (0.2, 0.8), 'lag_frac_range': (0.08, 0.45),
            'noise_range': (0.01, 0.05),
            'noise_low_mult_range': (8.0, 18.0), 'noise_decay_power_range': (1.6, 3.5),
            'noise_min_weight': 0.05, 'hetero_low_od_log_noise': True,
            'warp_range': (0.7, 1.6),
            'spike_prob': 0.03, 'spike_mult_range': (1.8, 3.6),
            'richards_nu_range': (1.0, 1.0),
        },
        {
            'tag': 'auto_low_od_hetero_sparse', 'target_fail': 'auto', 'model': 'logistic',
            'n_pts_range': (120, 260), 't_span_range': (24.0, 84.0),
            'n0_range': (0.002, 0.025), 'fold_range': (10.0, 130.0),
            'mu_range': (0.12, 0.55), 'lag_frac_range': (0.1, 0.55),
            'noise_range': (0.01, 0.06),
            'noise_low_mult_range': (10.0, 24.0), 'noise_decay_power_range': (2.0, 4.5),
            'noise_min_weight': 0.02, 'hetero_low_od_log_noise': True,
            'warp_range': (0.55, 1.7),
            'spike_prob': 0.06, 'spike_mult_range': (2.0, 4.5),
            'richards_nu_range': (1.0, 1.0),
        },
        {
            'tag': 'auto_sharp_transition', 'target_fail': 'auto', 'model': 'richards',
            'n_pts_range': (350, 900), 't_span_range': (36.0, 96.0),
            'n0_range': (0.02, 0.1), 'fold_range': (30.0, 180.0),
            'mu_range': (0.9, 1.6), 'lag_frac_range': (0.35, 0.7),
            'noise_range': (0.02, 0.08), 'warp_range': (0.9, 2.0),
            'spike_prob': 0.0, 'spike_mult_range': (1.0, 1.0),
            'richards_nu_range': (0.35, 0.7),
        },
        {
            'tag': 'auto_late_fast_edge', 'target_fail': 'auto', 'model': 'gompertz',
            'n_pts_range': (250, 700), 't_span_range': (28.0, 84.0),
            'n0_range': (0.02, 0.1), 'fold_range': (20.0, 120.0),
            'mu_range': (0.8, 1.5), 'lag_frac_range': (0.72, 0.93),
            'noise_range': (0.02, 0.08), 'warp_range': (1.1, 2.2),
            'spike_prob': 0.0, 'spike_mult_range': (1.0, 1.0),
            'richards_nu_range': (1.0, 1.0),
        },
        {
            'tag': 'auto_ultra_sharp_richards', 'target_fail': 'auto', 'model': 'richards',
            'n_pts_range': (500, 1000), 't_span_range': (32.0, 96.0),
            'n0_range': (0.015, 0.08), 'fold_range': (50.0, 220.0),
            'mu_range': (1.2, 2.4), 'lag_frac_range': (0.45, 0.82),
            'noise_range': (0.01, 0.05), 'warp_range': (1.4, 3.2),
            'spike_prob': 0.0, 'spike_mult_range': (1.0, 1.0),
            'richards_nu_range': (0.2, 0.45),
        },
        {
            'tag': 'auto_terminal_takeoff', 'target_fail': 'auto', 'model': 'logistic',
            'n_pts_range': (280, 900), 't_span_range': (24.0, 72.0),
            'n0_range': (0.02, 0.1), 'fold_range': (15.0, 110.0),
            'mu_range': (1.0, 2.2), 'lag_frac_range': (0.88, 0.97),
            'noise_range': (0.01, 0.06), 'warp_range': (1.8, 3.5),
            'spike_prob': 0.0, 'spike_mult_range': (1.0, 1.0),
            'richards_nu_range': (1.0, 1.0),
        },
    ]

    pool = []
    for sc in scenarios:
        tag = sc['tag']
        fail_mode = sc['target_fail']
        model_name = str(sc['model'])
        is_hetero = bool(sc.get('hetero_low_od_log_noise', False))
        spike_prob = float(sc.get('spike_prob', 0.0))
        spike_range = tuple(sc.get('spike_mult_range', (1.0, 1.0)))

        for j in range(n_per_scenario):
            # --- time grid: power-warped, lightly jittered, endpoints pinned ---
            n_raw = _uniform_from_range(rng, sc['n_pts_range'])
            n_pts = int(np.clip(int(round(n_raw)), 100, 1000))
            span_raw = _uniform_from_range(rng, sc['t_span_range'])
            t_span = float(np.clip(span_raw, SYN_TSPAN_RANGE[0], SYN_TSPAN_RANGE[1]))
            grid = np.linspace(0.0, 1.0, n_pts)
            warp = _uniform_from_range(rng, sc['warp_range'])
            t = np.power(grid, warp) * t_span
            if n_pts > 8:
                jitter_sd = 0.002 if fail_mode == 'auto' else 0.004
                jitter = rng.normal(0.0, jitter_sd, size=n_pts)
                jitter[0] = 0.0
                jitter[-1] = 0.0
                t = np.sort(np.clip(t + jitter * t_span, 0.0, t_span))
                t[0] = 0.0
                t[-1] = t_span

            # --- growth parameters ---
            N0 = _uniform_from_range(rng, sc['n0_range'])
            fold = _uniform_from_range(rng, sc['fold_range'])
            Nmax = float(max(1.01 * N0, N0 * fold))
            mu_max = _uniform_from_range(rng, sc['mu_range'])
            lag_frac = _uniform_from_range(rng, sc['lag_frac_range'])
            lag = float(np.clip(lag_frac * t_span, 0.01 * t_span, 0.97 * t_span))
            noise_raw = _uniform_from_range(rng, sc['noise_range'])
            noise_sigma = float(np.clip(noise_raw, SYN_NOISE_RANGE[0], SYN_NOISE_RANGE[1]))
            noise_sigma_eff = float(noise_sigma)
            if model_name == 'richards':
                richards_nu = _uniform_from_range(rng, sc['richards_nu_range'])
            else:
                richards_nu = 1.0

            # --- hetero-noise parameters are drawn before the curve seed ---
            if is_hetero:
                low_mult = float(max(1.0, _uniform_from_range(rng, sc.get('noise_low_mult_range', (4.0, 10.0)))))
                sigma_low = float(np.clip(noise_sigma * low_mult, noise_sigma, SYN_NOISE_RANGE[1]))
                decay_raw = _uniform_from_range(rng, sc.get('noise_decay_power_range', (1.5, 3.5)))
                decay_power = float(np.clip(decay_raw, 0.2, 8.0))
                min_weight = float(np.clip(sc.get('noise_min_weight', 0.0), 0.0, 1.0))

            # Hetero scenarios simulate a noise-free curve and add their own noise.
            curve_seed = int(rng.integers(0, 1000000000))
            N_true, N_obs = make_growth_curve(
                t, N0=N0, Nmax=Nmax, mu_max=mu_max, lag=lag,
                noise_sigma=0.0 if is_hetero else noise_sigma,
                seed=curve_seed, model=model_name, richards_nu=richards_nu,
            )
            if is_hetero:
                N_obs, sigma_vec = _inject_low_od_relative_log_noise(
                    N_true, rng, sigma_low=sigma_low, sigma_high=noise_sigma,
                    decay_power=decay_power, min_weight=min_weight,
                )
                if len(sigma_vec) > 0:
                    # Report the RMS of the per-point sigmas as the curve's noise level.
                    noise_sigma_eff = float(np.sqrt(np.mean(np.square(sigma_vec))))

            N_obs = _inject_sparse_spikes(N_obs, rng, spike_prob=spike_prob, spike_mult_range=spike_range)
            mu_true = _estimate_true_mu_from_curve(t, N_true)
            rec = _make_eval_record(
                t, N_true, N_obs, mu_true, noise_sigma_eff, model_name,
                curve_id=f'stress_{tag}_{j:04d}',
                is_stress=True, stress_tag=tag, target_fail=fail_mode,
            )
            if rec is not None:
                pool.append(rec)
    return pool
# ---------------------------------------------------------------------------
# Build the evaluation set: a large pool of random "real-like" curves plus the
# stress pool, then a stratified pick of EVAL_N curves. Everything draws from
# the single generator below, so statement order fixes the result for a seed.
# ---------------------------------------------------------------------------
rng_eval = np.random.default_rng(EVAL_SEED)
cal_pool = []
attempts = 0
# Oversample: the pool is at least twice the requested evaluation-set size.
target_pool = EVAL_N * max(2, EVAL_POOL_MULT)
max_attempts = EVAL_N * max(200, EVAL_POOL_MULT * 80)
while len(cal_pool) < target_pool:
    attempts += 1
    if attempts > max_attempts:
        raise RuntimeError(f'Could not build enough usable curves: {len(cal_pool)} / {target_pool} after {attempts} attempts.')
    t, N_true, N_obs, mu_true, noise_sigma, N0, Nmax, model_name = _sample_real_like_curve(rng_eval)
    rec = _make_eval_record(t, N_true, N_obs, mu_true, noise_sigma, model_name, curve_id=f'random_{attempts:06d}', is_stress=False, stress_tag='', target_fail='none')
    # Rejected curves (None) still consume an attempt number.
    if rec is None:
        continue
    cal_pool.append(rec)
# Stress curves join the same pool, so they also take part in stratification.
stress_pool = _generate_stress_pool(rng_eval, EVAL_STRESS_CASES_PER_SCENARIO)
cal_pool.extend(stress_pool)
# Tally the stress pool by scenario tag and by expected failure mode (for the report).
stress_counts_pool = {}
stress_target_counts_pool = {}
for r in stress_pool:
    tag = str(r.get('stress_tag', 'stress'))
    stress_counts_pool[tag] = stress_counts_pool.get(tag, 0) + 1
    tgt = str(r.get('target_fail', 'unknown'))
    stress_target_counts_pool[tgt] = stress_target_counts_pool.get(tgt, 0) + 1
# NOTE(review): the hard floor of 250 can trip for small EVAL_N (below ~125 with
# a small pool multiplier) even though the loop above filled its target.
if len(cal_pool) < max(250, int(0.5 * EVAL_N)):
    raise RuntimeError(f'Only {len(cal_pool)} usable synthetic curves generated (target={EVAL_N}).')
# Stratify the pool by (n_pts bin, t_span bin).
strata = {}
for rec in cal_pool:
    k = (_bin_index(rec['n_pts'], EVAL_NPTS_BINS), _bin_index(rec['t_span'], EVAL_TSPAN_BINS))
    strata.setdefault(k, []).append(rec)
# Shuffle inside each stratum so the round-robin below picks at random.
for k in strata:
    rng_eval.shuffle(strata[k])
all_keys = sorted(strata.keys())
if len(all_keys) == 0:
    raise RuntimeError('No strata formed for evaluation-set selection.')
# Round-robin over the strata, one curve per stratum per pass, until EVAL_N
# curves are selected or every stratum is exhausted.
eval_curves = []
used_by_key = {k: 0 for k in all_keys}
while len(eval_curves) < EVAL_N:
    progressed = False
    for k in all_keys:
        i = used_by_key[k]
        bucket = strata[k]
        if i < len(bucket):
            eval_curves.append(bucket[i])
            used_by_key[k] += 1
            progressed = True
            if len(eval_curves) >= EVAL_N:
                break
    if not progressed:
        break
# Minimum number of stress curves to keep: an explicit target, or (when the
# target is negative) a fraction of EVAL_N; capped by what is available.
stress_target_keep = EVAL_STRESS_TARGET
if stress_target_keep < 0:
    stress_target_keep = int(round(EVAL_STRESS_FRACTION * EVAL_N))
stress_target_keep = int(np.clip(stress_target_keep, 0, min(EVAL_STRESS_MAX_KEEP, EVAL_N, len(stress_pool))))
if stress_target_keep > 0 and len(eval_curves) > 0:
    selected_ids = {str(r.get('curve_id', '')) for r in eval_curves}
    stress_now = [i for i, r in enumerate(eval_curves) if bool(r.get('is_stress', False))]
    need = max(0, stress_target_keep - len(stress_now))
    if need > 0:
        # Swap randomly chosen non-stress curves for not-yet-selected stress curves.
        # This ignores strata, and used_by_key is not updated by the swap.
        candidate_idxs = [i for i, r in enumerate(eval_curves) if not bool(r.get('is_stress', False))]
        rng_eval.shuffle(candidate_idxs)
        extra_stress = [r for r in stress_pool if str(r.get('curve_id', '')) not in selected_ids]
        rng_eval.shuffle(extra_stress)
        for rep_idx, new_rec in zip(candidate_idxs, extra_stress[:need]):
            eval_curves[rep_idx] = new_rec
            selected_ids.add(str(new_rec.get('curve_id', '')))
# Safety net: if the pool could not supply EVAL_N curves, generate fresh ones.
fill_attempts = 0
max_fill_attempts = EVAL_N * 200
while len(eval_curves) < EVAL_N:
    fill_attempts += 1
    attempts += 1
    if fill_attempts > max_fill_attempts:
        raise RuntimeError(f'Could not refill eval_curves to {EVAL_N}; current={len(eval_curves)} after {fill_attempts} refill attempts.')
    t, N_true, N_obs, mu_true, noise_sigma, N0, Nmax, model_name = _sample_real_like_curve(rng_eval)
    rec = _make_eval_record(t, N_true, N_obs, mu_true, noise_sigma, model_name, curve_id=f'refill_{attempts:06d}', is_stress=False, stress_tag='', target_fail='none')
    if rec is None:
        continue
    eval_curves.append(rec)
# NOTE(review): both loops above stop appending at EVAL_N, so this truncation
# looks unreachable; it is kept as a defensive guard.
if len(eval_curves) > EVAL_N:
    eval_curves = eval_curves[:EVAL_N]
if len(eval_curves) != EVAL_N:
    raise RuntimeError(f'Evaluation set size mismatch: got {len(eval_curves)}, expected {EVAL_N}.')
# ---------------------------------------------------------------------------
# Summary of the final evaluation set: descriptor vectors, a sanity check on
# curve length, composition tallies, and a printed report.
# ---------------------------------------------------------------------------
arr_mu_eval = np.asarray([r['mu_true'] for r in eval_curves], dtype=float)
arr_noise_eval = np.asarray([r['noise_sigma'] for r in eval_curves], dtype=float)
arr_span_eval = np.asarray([r['t_span'] for r in eval_curves], dtype=float)
arr_npts_eval = np.asarray([r['n_pts'] for r in eval_curves], dtype=float)
arr_fold_eval = np.asarray([r['od_fold'] for r in eval_curves], dtype=float)
# Both generators clip n_pts to 100..1000; anything outside means a logic error.
if np.min(arr_npts_eval) < 100 or np.max(arr_npts_eval) > 1000:
    raise RuntimeError(f'Generated n_pts out of bounds: [{np.min(arr_npts_eval):.0f}, {np.max(arr_npts_eval):.0f}] (expected 100..1000).')
# Tally the selected curves by growth model, stress scenario, expected failure
# mode and stratum.
model_counts_eval = {}
stress_counts_eval = {}
stress_target_counts_eval = {}
# Strata counts are recomputed from the curves actually selected. The
# round-robin counters in used_by_key go stale once stress curves are swapped
# in or refill curves are appended, so they would misreport the composition.
strata_counts_eval = {}
for r in eval_curves:
    model_counts_eval[r['model']] = model_counts_eval.get(r['model'], 0) + 1
    k = (_bin_index(r['n_pts'], EVAL_NPTS_BINS), _bin_index(r['t_span'], EVAL_TSPAN_BINS))
    strata_counts_eval[k] = strata_counts_eval.get(k, 0) + 1
    if bool(r.get('is_stress', False)):
        tag = str(r.get('stress_tag', 'stress'))
        stress_counts_eval[tag] = stress_counts_eval.get(tag, 0) + 1
        tgt = str(r.get('target_fail', 'unknown'))
        stress_target_counts_eval[tgt] = stress_target_counts_eval.get(tgt, 0) + 1
print(f'Evaluation set ready: n={len(eval_curves)} (attempts={attempts}, pool={len(cal_pool)})')
print(f'  mu_true range: q05={np.quantile(arr_mu_eval, 0.05):.3f}, med={np.median(arr_mu_eval):.3f}, q95={np.quantile(arr_mu_eval, 0.95):.3f}')
print(f'  noise_sigma range: q05={np.quantile(arr_noise_eval, 0.05):.4f}, med={np.median(arr_noise_eval):.4f}, q95={np.quantile(arr_noise_eval, 0.95):.4f}')
print(f'  n_pts/t_span/od_fold medians: n_pts={np.median(arr_npts_eval):.1f}, t_span={np.median(arr_span_eval):.2f}, od_fold={np.median(arr_fold_eval):.2f}')
print(f'  n_pts/t_span ranges: n_pts=[{np.min(arr_npts_eval):.0f}, {np.max(arr_npts_eval):.0f}], t_span=[{np.min(arr_span_eval):.2f}, {np.max(arr_span_eval):.2f}] h')
print('  model counts:', model_counts_eval)
print(f'  stress pool generated: {len(stress_pool)}')
if stress_counts_pool:
    print('  stress pool by scenario:', stress_counts_pool)
if stress_target_counts_pool:
    print('  stress pool by expected failure mode:', stress_target_counts_pool)
print(f'  stress curves in eval set: {sum(stress_counts_eval.values())}/{len(eval_curves)} (target >= {stress_target_keep})')
if stress_counts_eval:
    print('  stress eval by scenario:', stress_counts_eval)
if stress_target_counts_eval:
    print('  stress eval by expected failure mode:', stress_target_counts_eval)
print('  strata counts (npts_bin, tspan_bin -> n):')
# Include pool strata that ended up empty, plus any stratum only reached by
# refill curves, so the printed counts always sum to len(eval_curves).
for k in sorted(set(all_keys) | set(strata_counts_eval)):
    print(f'    {k}: {strata_counts_eval.get(k, 0)}')
# Descriptor vectors of the synthetic evaluation set, mirroring the real-data
# vectors (arr_n, arr_span, arr_sigma, arr_fold) for a side-by-side comparison.
arr_syn_n = np.asarray([len(curve['t']) for curve in eval_curves], dtype=float)
arr_syn_span, arr_syn_sigma, arr_syn_fold = (
    np.asarray([curve[key] for curve in eval_curves], dtype=float)
    for key in ('t_span', 'noise_sigma', 'od_fold')
)

# One panel per descriptor, laid out row-major on a 2x2 grid.
panel_specs = [
    ('n points', arr_n, arr_syn_n),
    ('time span (h)', arr_span, arr_syn_span),
    ('noise proxy sigma', arr_sigma, arr_syn_sigma),
    ('OD fold (p95/p05)', arr_fold, arr_syn_fold),
]
fig_dist = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=[title for title, _, _ in panel_specs],
    horizontal_spacing=0.12,
    vertical_spacing=0.16,
)
for panel_idx, (_, real_arr, syn_arr) in enumerate(panel_specs):
    grid_row, grid_col = divmod(panel_idx, 2)
    # Only the first panel contributes legend entries, to avoid duplicates.
    first_panel = panel_idx == 0
    for values, label, colour in ((real_arr, 'Real', '#8a8a8a'), (syn_arr, 'Synthetic', COLORS['auto'])):
        fig_dist.add_trace(
            go.Histogram(x=values, nbinsx=25, name=label, opacity=0.6, marker={'color': colour}, showlegend=first_panel),
            row=grid_row + 1,
            col=grid_col + 1,
        )
fig_dist.update_layout(
    title='Real vs synthetic descriptor distributions',
    barmode='overlay',
    width=1080,
    height=700,
    template=PLOTLY_TEMPLATE,
)
fig_dist.show(config=PLOTLY_CONFIG)

Real-data profile used for synthetic generation
  curves profiled: 208
  n points: median=281.0, q10=105.0, q90=694.0
  t span (h): median=138.60, q10=20.80, q90=138.98
  OD fold (p95/p05): median=3.27, q10=1.03, q90=19.19
  sigma proxy: median=0.0012, q10=0.0001, q90=0.0043
  synthetic test-set noise: sigma_proxy * 2.00, clipped to [0.0001, 0.8000]
  mu_max proxy (real, informational): median=0.279, q10=0.045, q90=6.477
  mu_max sampling target: log-uniform [0.05, 1.5] h^-1
  tail-case probability (variety boost): 0.35
  synthetic model mix: gompertz:0.45, logistic:0.30, richards:0.25
  synthetic n_points range (uniform): [100, 1000], t_span range (uniform h): [24.0, 96.0], od_fold range (uniform): [0.0, 200.0]
Evaluation set ready: n=500 (attempts=4061, pool=4320)
  mu_true range: q05=0.065, med=0.352, q95=1.585
  noise_sigma range: q05=0.0002, med=0.0076, q95=0.6814
  n_pts/t_span/od_fold medians: n_pts=492.5, t_span=52.77, od_fold=58.46
  n_pts/t_span ranges: n_pts=[101, 1000], t_s

In [11]:
import csv
from pathlib import Path

# Output directory for the generated datasets. The path is relative, so it is
# resolved against the current working directory; it is created on demand.
OUT_DIR = Path('exploratory notebooks/generated_data')
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Long-format CSVs (one row per sample point): the synthetic evaluation curves
# and the real curves.
EVAL_CURVES_CSV = OUT_DIR / 'spline_eval_curves.csv'
REAL_CURVES_CSV = OUT_DIR / 'spline_real_curves.csv'


def _as_float(v, default=float('nan')):
    """Coerce *v* to float; on any conversion error return ``float(default)``."""
    try:
        converted = float(v)
    except Exception:
        # Best-effort coercion for CSV export: unusable values become the default.
        converted = float(default)
    return converted


def _iter_eval_rows(curves):
    """Yield one CSV row dict per sample point of each evaluation curve."""
    for curve_idx, curve in enumerate(curves):
        times = np.asarray(curve['t'], dtype=float)
        observed = np.asarray(curve['N_obs'], dtype=float)
        truth = np.asarray(curve['N_true'], dtype=float)
        # Guard against ragged arrays: only export the common prefix.
        n_rows = int(min(len(times), len(observed), len(truth)))
        if n_rows == 0:
            continue
        # Per-curve metadata is repeated on every row of that curve.
        per_curve = {
            'curve_id': str(curve.get('curve_id', f'eval_{curve_idx:05d}')),
            'mu_true': _as_float(curve.get('mu_true', np.nan)),
            'model': str(curve.get('model', 'unknown')),
            'noise_sigma': _as_float(curve.get('noise_sigma', np.nan)),
            't_span': _as_float(curve.get('t_span', np.nan)),
            'n_pts': int(curve.get('n_pts', n_rows)),
            'od_fold': _as_float(curve.get('od_fold', np.nan)),
            'is_stress': int(bool(curve.get('is_stress', False))),
            'stress_tag': str(curve.get('stress_tag', '')),
            'target_fail': str(curve.get('target_fail', '')),
        }
        for point_idx in range(n_rows):
            yield {
                **per_curve,
                'point_idx': point_idx,
                't': float(times[point_idx]),
                'N_obs': float(observed[point_idx]),
                'N_true': float(truth[point_idx]),
            }


def _iter_real_rows(curves):
    """Yield one CSV row dict per sample point of each real curve."""
    for curve_idx, curve in enumerate(curves):
        times = np.asarray(curve['t'], dtype=float)
        values = np.asarray(curve['N'], dtype=float)
        n_rows = int(min(len(times), len(values)))
        if n_rows == 0:
            continue
        per_curve = {
            'curve_id': str(curve.get('curve_id', f'real_{curve_idx:05d}')),
            'file': str(curve.get('file', '')),
            'well': str(curve.get('well', '')),
        }
        for point_idx in range(n_rows):
            yield {
                **per_curve,
                'point_idx': point_idx,
                't': float(times[point_idx]),
                'N': float(values[point_idx]),
            }


# Synthetic evaluation curves, long format.
with EVAL_CURVES_CSV.open('w', newline='') as fh:
    fields = [
        'curve_id', 'point_idx', 't', 'N_obs', 'N_true', 'mu_true', 'model',
        'noise_sigma', 't_span', 'n_pts', 'od_fold', 'is_stress', 'stress_tag', 'target_fail'
    ]
    writer = csv.DictWriter(fh, fieldnames=fields)
    writer.writeheader()
    writer.writerows(_iter_eval_rows(eval_curves))

# Real curves, long format.
with REAL_CURVES_CSV.open('w', newline='') as fh:
    fields = ['curve_id', 'file', 'well', 'point_idx', 't', 'N']
    writer = csv.DictWriter(fh, fieldnames=fields)
    writer.writeheader()
    writer.writerows(_iter_real_rows(real_curves_auto))

print(f'Wrote eval curves: {EVAL_CURVES_CSV}')
print(f'Wrote real curves: {REAL_CURVES_CSV}')
print(f'eval_curves={len(eval_curves)}, real_curves_auto={len(real_curves_auto)}')


Wrote eval curves: exploratory notebooks/generated_data/spline_eval_curves.csv
Wrote real curves: exploratory notebooks/generated_data/spline_real_curves.csv
eval_curves=500, real_curves_auto=226
