In [1]:
#!/usr/bin/env python3
"""
Trimmed-harmonic time-to-steady test (CW, −ω)
────────────────────────────────────────────
• Plateau rule fixed: 10-s window, ±5×10⁻⁴ rad
• Parameter grid: p1,p2∈[0,2] (10×10), a,b∈{1…5}  → 2 500 sets
• 3 initial δ₀ per zero-interval (quartile points)
• User controls TRIM_EPS: integration stops ε rad before the target zero
"""

# ----------- USER-EDITABLE TRIM DISTANCE -----------------
# Distance (in rad) by which run() pulls the quadrature limit back from the
# target zero δ* of f(δ): the estimate integrates 1/|f(δ)|, which diverges
# at the zero itself, so the integral is stopped TRIM_EPS short of it.
TRIM_EPS = 5e-4      # rad; other values to try: 1e-3, 2e-4, …
# ---------------------------------------------------------

import itertools, numpy as np, pandas as pd, warnings
from math import sin, cos, pi
from scipy.integrate import solve_ivp, quad, IntegrationWarning
from multiprocessing import Pool, cpu_count
from tqdm import tqdm

# ----- plateau rule (fixed) -----
# A trajectory counts as steady once δ stays within TOL_PLATEAU of the value
# at the start of a window for WIN consecutive samples.
WINDOW_SEC, DT = 10, 0.001          # window length [s], sampling step [s]
# round() before int(): a float ratio can land just below the integer
# (e.g. 0.3/0.1 == 2.9999999999999996) and plain truncation would then
# silently shorten the window by one sample.
WIN           = int(round(WINDOW_SEC/DT))
TOL_PLATEAU   = 5e-4                # rad

# ----- integration mesh -----
T_MAX = 100.0
# linspace pins the last sample to exactly T_MAX.  np.arange with a
# non-integer step may emit one extra sample beyond the stop value, and
# solve_ivp refuses any t_eval entry that lies outside t_span.
TIME_EVAL = np.linspace(0.0, T_MAX, int(round(T_MAX/DT)) + 1)

# ----- parameter grid: 10×10×5×5 -----
p1_vals = np.linspace(0, 2, 10)
p2_vals = np.linspace(0, 2, 10)
a_vals  = [1, 2, 3, 4, 5]
b_vals  = [1, 2, 3, 4, 5]
# Cartesian product, one (p1, p2, a, b) tuple per worker task.
GRID = list(itertools.product(p1_vals, p2_vals, a_vals, b_vals))

# ----- ODE and f(δ) -----
def single_cell_ode(t, y, p1, p2, a, b):
    """Right-hand side of the two-angle system, in solve_ivp's ``f(t, y)`` form.

    Parameters
    ----------
    t : float
        Time. Not used: the right-hand side depends on the state only.
    y : sequence of two floats
        State ``(θ, ψ)``.
    p1 : float
        Weight of the δ-dependent term in the θ equation.
    p2 : float
        Weight of the ``sin(2δ)`` term in the ψ equation.
    a, b : float
        Parameters of the θ equation's δ-dependent term; the term vanishes
        when ``a == b``.

    Returns
    -------
    list of float
        ``[dθ/dt, dψ/dt]`` with ``δ = ψ − θ``.
    """
    theta, psi = y
    lag = psi - theta
    sin_lag = sin(lag)
    cos_lag = cos(lag)
    sin_2lag = sin(2*lag)

    numerator = (a*b)*(a**2 - b**2)*sin_2lag
    denominator = 2*((a*sin_lag)**2 + (b*cos_lag)**2)**1.5

    dtheta_dt = -1.0 - p1*numerator/denominator   # constant −1 drive plus coupling
    dpsi_dt = -p2*sin_2lag
    return [dtheta_dt, dpsi_dt]

def f_delta(δ, p1, p2, a, b):
    """Rate of change of the angle difference: ``dψ/dt − dθ/dt`` as a function of δ.

    Uses the same two expressions as ``single_cell_ode`` but written with
    NumPy functions, so ``δ`` may be a scalar or an array.

    Parameters
    ----------
    δ : float or ndarray
        Angle difference ``ψ − θ`` in radians.
    p1, p2, a, b : float
        Same parameters as in ``single_cell_ode``.

    Returns
    -------
    float or ndarray
        ``−p2·sin(2δ) − (−1 − p1·num/denom)``, elementwise for array input.
    """
    sin_d = np.sin(δ)
    cos_d = np.cos(δ)
    sin_2d = np.sin(2*δ)

    numerator = (a*b)*(a**2 - b**2)*sin_2d
    denominator = 2*((a*sin_d)**2 + (b*cos_d)**2)**1.5

    theta_rate = -1.0 - p1*numerator/denominator
    psi_rate = -p2*sin_2d
    return psi_rate - theta_rate

# ----- zeros in one π-period -----
def zeros_one_period(p1,p2,a,b,n=20000):
    """Return the sign-changing zeros of ``f_delta`` that delimit one π-period.

    ``f_delta`` is sampled on ``n`` equally spaced points of [-π, π].  Every
    strict sign change between neighbouring samples is refined with up to 30
    bisection steps.  Starting from the smallest root found, the roots inside
    one window of length π are returned.

    Parameters
    ----------
    p1, p2, a, b : float
        Parameters forwarded to ``f_delta``.
    n : int, optional
        Number of sample points used to bracket the zeros.

    Returns
    -------
    list of float
        Ascending ``[start, ..., end]`` where ``start`` is the smallest root
        and ``end`` lies within 1e-8 of ``start + π`` (``start + π`` itself is
        appended when no refined root is that close).  Empty when no sign
        change is found.

    Notes
    -----
    Zeros at which ``f_delta`` touches zero without changing sign are not
    detected, because bracketing relies on a strict sign change.
    """
    grid = np.linspace(-pi, pi, n)
    values = f_delta(grid, p1, p2, a, b)

    roots = []
    for i in np.flatnonzero(values[:-1]*values[1:] < 0):
        lo, hi = grid[i], grid[i+1]
        # f(lo) is carried along instead of being re-evaluated at every step.
        f_lo = f_delta(lo, p1, p2, a, b)
        for _ in range(30):
            mid = 0.5*(lo+hi)
            f_mid = f_delta(mid, p1, p2, a, b)
            if f_mid == 0:
                # Exact hit: stop here.  Treating a zero product as "no sign
                # change" would push lo forward and drift towards hi.
                lo = hi = mid
                break
            if f_lo*f_mid < 0:
                hi = mid
            else:
                lo, f_lo = mid, f_mid
        roots.append(0.5*(lo+hi))

    if not roots:
        return []
    roots.sort()

    start = roots[0]
    end = start + pi
    period = [start] + [z for z in roots if start < z < end]
    # Close the period with start+π unless the last refined root already
    # coincides with it.  The len() test covers the case of no interior root,
    # which previously raised IndexError on per[-1].
    if len(period) == 1 or abs(end - period[-1]) > 1e-8:
        period.append(end)
    return period

# ----- integrate until plateau -----
def to_steady(δ0,p1,p2,a,b):
    """Integrate one trajectory and report when δ = ψ − θ first plateaus.

    The system ``single_cell_ode`` is started at ``θ0 = π/3``,
    ``ψ0 = π/3 + δ0`` and integrated over ``[0, T_MAX]`` with output on
    ``TIME_EVAL``.  The sampled δ is then scanned for the first window of
    ``WIN`` consecutive samples in which every value stays strictly within
    ``TOL_PLATEAU`` of the window's first sample.

    Parameters
    ----------
    δ0 : float
        Initial angle difference in radians.
    p1, p2, a, b : float
        Parameters forwarded to ``single_cell_ode``.

    Returns
    -------
    tuple
        ``(δ_plateau, t_plateau)``: the first sample of the plateau window and
        its time stamp, or ``(None, None)`` when the solver reports failure or
        no window satisfies the rule.
    """
    def rhs(t, y):
        return single_cell_ode(t, y, p1, p2, a, b)

    θ0 = pi/3
    ψ0 = pi/3 + δ0
    sol = solve_ivp(rhs, (0, T_MAX), [θ0, ψ0], t_eval=TIME_EVAL,
                    rtol=1e-6, atol=1e-9)
    if not sol.success:
        return None, None

    δ = sol.y[1] - sol.y[0]
    # +1 so that the final full window (start index len(δ)-WIN) is examined
    # too; without it a plateau beginning there was never detected.
    for i in range(len(δ) - WIN + 1):
        win = δ[i:i+WIN]
        if np.max(np.abs(win - win[0])) < TOL_PLATEAU:
            return win[0], sol.t[i]
    return None, None

# ----- worker -----
def run(params):
    """Worker: compare estimated and simulated settling times for one parameter set.

    For every pair of consecutive zeros returned by ``zeros_one_period`` three
    initial values δ0 (at 25 %, 50 % and 75 % of the interval) are tried.  The
    sign of ``f_delta(δ0)`` selects the zero δ* the trajectory moves towards
    (right end if positive, left end otherwise).  The estimate ``T_est`` is
    the integral of ``1/|f_delta|`` from δ0 up to ``TRIM_EPS`` short of δ*;
    ``T_num`` is the plateau time reported by ``to_steady``.

    Parameters
    ----------
    params : tuple
        ``(p1, p2, a, b)``.

    Returns
    -------
    list of dict
        One ``{"T_num": ..., "T_est": ...}`` entry per usable start value;
        start values with ``f_delta(δ0) == 0``, with an empty trimmed
        interval, or without a detected plateau are skipped.
    """
    p1, p2, a, b = params
    zeros = zeros_one_period(p1, p2, a, b)
    if len(zeros) < 2:
        return []

    def inverse_rate(x):
        return 1/abs(f_delta(x, p1, p2, a, b))

    records = []
    for left, right in zip(zeros[:-1], zeros[1:]):
        for frac in (0.25, 0.50, 0.75):
            δ0 = left + frac*(right - left)
            rate = f_delta(δ0, p1, p2, a, b)
            if rate == 0:
                continue
            δ_star = right if rate > 0 else left

            # Integration limits, stopping TRIM_EPS before the target zero.
            if δ_star > δ0:
                lower, upper = δ0, δ_star - TRIM_EPS
            else:
                lower, upper = δ_star + TRIM_EPS, δ0
            if upper <= lower:
                continue  # trimmed interval is empty

            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=IntegrationWarning)
                T_est, _ = quad(inverse_rate, lower, upper, limit=200)

            steady, T_num = to_steady(δ0, p1, p2, a, b)
            if steady is None:
                continue
            records.append({"T_num": T_num, "T_est": T_est})
    return records

# ----- main -----
def main():
    """Evaluate the whole parameter grid in parallel, save and summarise it.

    Every entry of ``GRID`` is handed to ``run`` in a process pool with one
    worker per CPU.  The collected ``T_num`` / ``T_est`` pairs are written to
    ``trim_harmonic_validation.xlsx`` and summary statistics of
    ``T_num − T_est`` are printed.  When no trajectory produced a record,
    a message is printed and nothing is written.
    """
    with Pool(cpu_count()) as pool:
        rows=[]
        for out in tqdm(pool.imap_unordered(run, GRID),
                        total=len(GRID), desc="Running"):
            rows.extend(out)

    if not rows:
        # An empty frame has no "T_num"/"T_est" columns and N would be 0 in
        # the percentage lines below, so stop before touching either.
        print("\nNo trajectories recorded; nothing to save or summarise.")
        return

    df=pd.DataFrame(rows)
    diff=df["T_num"]-df["T_est"]
    # Differences smaller than 1e-6 s in magnitude count as ties.
    N=len(df); n_gt=(diff>1e-6).sum(); n_lt=(diff<-1e-6).sum()
    med_abs=diff.abs().median(); q1,q3=diff.abs().quantile([0.25,0.75])
    med_rel=(diff.abs()/df["T_num"]).median()
    # Share of trajectories whose estimate is within a factor 2 of T_num.
    within2=((df["T_est"]/df["T_num"]).between(0.5,2)).mean()

    df.to_excel("trim_harmonic_validation.xlsx", index=False)
    print(f"\nSaved → trim_harmonic_validation.xlsx  ({N} trajectories)")
    print(f"Trim ε = {TRIM_EPS:.1e} rad")
    print(f"T_num > T_est : {n_gt}  ({100*n_gt/N:.2f} %)")
    print(f"T_num < T_est : {n_lt}  ({100*n_lt/N:.2f} %)")
    print(f"Median |Δt|    = {med_abs:.4f} s  (IQR {q1:.4f}–{q3:.4f})")
    print(f"Median rel err = {100*med_rel:.2f} %")
    print(f"Within factor-2= {100*within2:.2f} %")

if __name__ == "__main__":
    main()


Running: 100%|██████████| 2500/2500 [00:53<00:00, 47.12it/s]



Saved → trim_harmonic_validation.xlsx  (11130 trajectories)
Trim ε = 5.0e-04 rad
T_num > T_est : 11058  (99.35 %)
T_num < T_est : 72  (0.65 %)
Median |Δt|    = 0.0053 s  (IQR 0.0031–0.0086)
Median rel err = 0.24 %
Within factor-2= 100.00 %
