## Curve Fitting
Here we use a calibrated F-statistic to decide which model (flat/1-PL or 2-PL) fits each curve better, as follows:
- **2-PL model**: $y = \frac{1}{1 + 2^{B (\log_2 C - \log_2 x)}}$, where the upper and lower asymptotes are fixed at 1 and 0, respectively. The fitting function fits the 2-PL model from one initial guess (B = 1, C = median concentration) plus 6 additional random starts, using TRF as the minimizer (up to 8,000 evaluations), followed by a refinement with the Nelder-Mead optimizer (up to 20,000 evaluations). B is bounded to [0, 10] and C is bounded to [lowest concentration / 4, highest concentration × 4].
- **Flat (1-PL) model**: $y = c$, where $c$ is the average of the observed response (Inhibition Fraction) values, and the RSS is calculated as the sum of squared deviations from that mean.
- **p-value calculation**: the p-value is computed by rescaling the F-statistic to a standardized value $z$ (using a calibrated location and scale) and applying the survival function of the F-distribution:
    - $z = \frac{F - \text{loc}}{\text{scale}}$
        - $F$ is the F-statistic, calculated as $F = \frac{(\text{RSS}_{\text{flat}} - \text{RSS}_{\text{2pl}}) / 1}{\text{RSS}_{\text{2pl}} / (n - 2)}$, with $\text{RSS}_{\text{flat}}$ and $\text{RSS}_{\text{2pl}}$ as the residual sum of squares for the flat and 2-PL models, and $n$ as the number of data points.
    - $p = 1 - F_{\text{CDF}}(z; \text{df1}, \text{df2})$
        - $F_{\text{CDF}}(z; \text{df1}, \text{df2})$ is the cumulative distribution function of the F-distribution, evaluated at $z$.
    - If $p < 0.1$ and the 2-PL fit succeeded, the 2-PL model is used; otherwise the flat model is used.


In [None]:
"""
dose_response_fit_flat_vs_2pl.py
• Seven‑start TRF + Nelder–Mead
• Calibrated F‑test
• **Per‑experiment fits (no tech‑rep pooling)**
"""

import json, warnings, multiprocessing as mp
import numpy as np, pandas as pd
from   scipy.optimize import curve_fit, minimize
from   scipy.stats    import f as f_dist
from   tqdm.auto      import tqdm
warnings.filterwarnings("ignore", category=RuntimeWarning)

# ──────────────────────────────────────────────
# 0 ▸ load & add row‑level identifier
# ──────────────────────────────────────────────
# Read the cleaned HTS table; the reset 0-based index is kept as "Row_ID"
# so that every original row can be identified after reshaping.
clean_hts = pd.read_csv(
    "/projects/amp/asalehi/Dose/data/clean/full_clean_hts.csv"
).reset_index().rename(columns={"index": "Row_ID"})

# Calibration parameters for the F-test; the "_meta" entry (if any) is
# removed so that only the per-organism entries remain.
with open("/projects/amp/asalehi/Dose/f_calibration/null_f_params_full_1M.json") as f:
    null_params = json.load(f)
if "_meta" in null_params:
    del null_params["_meta"]

# Rows labelled GrowthControl / SterilityControl are excluded from fitting.
df = clean_hts.loc[
    ~clean_hts["Peptide_Name"].isin(["GrowthControl", "SterilityControl"])
].copy()

# Concentration labels, highest to lowest, and the matching response columns.
ABS_ORDER = ["128", "64", "32", "16", "8", "4", "2", "1", "0.5", "0.25"]
IF_COLUMNS = ["i" + conc + "_win" for conc in ABS_ORDER]

# Wide -> long: one row per (experiment, concentration) with a non-missing IF.
long_df = df.melt(
    id_vars=["Row_ID", "Peptide_Name", "Organism"],
    value_vars=IF_COLUMNS,
    var_name="Conc_lbl",
    value_name="IF",
).dropna(subset=["IF"])

# The numeric concentration is parsed out of the column label ("i0.5_win" -> 0.5).
long_df = long_df.assign(
    Concentration=long_df["Conc_lbl"].str.extract(r"i([0-9.]+)")[0].astype(float)
).drop(columns="Conc_lbl")

# group **per experiment**: each Row_ID forms its own group (no pooling of rows)
groups = long_df.groupby(["Row_ID", "Peptide_Name", "Organism"])

# ──────────────────────────────────────────────
# 1 ▸ model helpers  (unchanged)
# ──────────────────────────────────────────────
def two_pl(x, B, C):
    """Evaluate the 2-PL curve y = 1 / (1 + 2^(B * (log2(C) - log2(x)))).

    The asymptotes are fixed at 0 and 1, and the curve equals 0.5 at x == C.

    Parameters
    ----------
    x : float or array-like
        Concentration(s); passed to ``np.log2``.
    B : float
        Slope parameter.
    C : float
        Concentration at which the response is 0.5.
    """
    log_ratio = np.log2(C) - np.log2(x)
    denominator = 1 + np.exp2(B * log_ratio)
    return 1 / denominator

def fit_two_pl(x, y, rng, n_start=6):
    """Fit the 2-PL curve with multi-start TRF followed by a Nelder-Mead polish.

    Parameters
    ----------
    x : array-like
        Concentrations. ``two_pl`` takes ``log2`` of these values.
    y : array-like
        Observed responses, one per entry of ``x``.
    rng : numpy.random.Generator
        Generator used to draw the random starting points.
    n_start : int, default 6
        Number of random starts tried in addition to the deterministic
        start (B = 1, C = median of ``x``).

    Returns
    -------
    tuple
        ``(theta, rss)`` where ``theta`` holds ``(B, C)`` of the best fit and
        ``rss`` is its residual sum of squares, or ``(None, np.inf)`` when
        every TRF start failed.
    """
    # One definition of the box constraints, shared by both optimizers:
    # B in [0, 10], C in [min(x) / 4, max(x) * 4].
    lower = [0, np.min(x) / 4]
    upper = [10, np.max(x) * 4]
    bounds = (lower, upper)

    p0 = [1.0, np.median(x)]
    starts = [p0] + rng.uniform(*np.array(bounds), size=(n_start, 2)).tolist()

    def rss_of(theta):
        return np.sum((y - two_pl(x, *theta)) ** 2)

    best_rss, best_theta = np.inf, None
    for start in starts:
        try:
            theta, _ = curve_fit(two_pl, x, y, p0=start, bounds=bounds,
                                 method="trf", max_nfev=8000)
        except (RuntimeError, ValueError):
            # curve_fit's documented failures (no convergence / invalid
            # numerical input): skip this start and try the next one.
            # Anything else is a programming error and must surface.
            continue
        rss = rss_of(theta)
        if rss < best_rss:
            best_rss, best_theta = rss, theta
    if best_theta is None:
        return None, np.inf

    # Local refinement of the best TRF solution inside the same box.
    res = minimize(
        rss_of,
        best_theta,
        method="nelder-mead",
        bounds=list(zip(lower, upper)),
        options={"adaptive": True, "xatol": 1e-8, "fatol": 1e-8,
                 "maxfev": 20_000},
    )
    # Keep the polished parameters only if the polish converged and improved.
    if res.success and res.fun < best_rss:
        best_theta, best_rss = res.x, res.fun

    return best_theta, best_rss


# ──────────────────────────────────────────────
# 2 ▸ fit one experiment
# ──────────────────────────────────────────────
def fit_single_block(args):
    """Fit one experiment and pick the flat or the 2-PL model for it.

    Parameters
    ----------
    args : tuple
        ``(row_id, pep, org, block, nulls)`` where ``block`` provides the
        "Concentration" and "IF" columns of one experiment and ``nulls``
        maps an organism to its calibration entries ("sterility" or
        "growth", each holding a "flat_vs_2pl" dict with df1/df2/loc/scale).

    Returns
    -------
    dict or None
        ``None`` when the experiment has fewer than 3 rows; otherwise a
        record with the chosen model, its equation, plateaus, i50, slope,
        p-value and R².
    """
    row_id, pep, org, block, nulls = args
    if block.shape[0] < 3:
        return None

    conc = block["Concentration"].values
    resp = block["IF"].values
    n_obs = len(resp)

    # Flat model: the constant is the mean response.
    flat_level = resp.mean()
    flat_rss = ((resp - flat_level) ** 2).sum()

    # 2-PL model; the generator is seeded with Row_ID so reruns are repeatable.
    params, two_pl_rss = fit_two_pl(conc, resp, np.random.default_rng(row_id))

    # F-statistic for one extra parameter; stays 0 unless 2-PL improves the RSS.
    if two_pl_rss < flat_rss - 1e-10:
        f_stat = ((flat_rss - two_pl_rss) / 1) / (two_pl_rss / (n_obs - 2))
    else:
        f_stat = 0.0

    # Calibration entry for this organism: "sterility" is preferred over "growth".
    organism_nulls = nulls.get(org, {})
    null_key = next(
        (key for key in ("sterility", "growth") if key in organism_nulls),
        None,
    )
    if null_key is None:
        p_val = 1.0
    else:
        calib = organism_nulls[null_key]["flat_vs_2pl"]
        df1, df2 = calib["df1"], calib["df2"]
        loc, scale = calib["loc"], calib["scale"]
        z = (f_stat - loc) / (scale or 1e-6)
        p_val = f_dist.sf(z, df1, df2)

    ss_tot = ((resp - resp.mean()) ** 2).sum()

    if params is not None and p_val < 0.1:
        slope, i50 = params
        model = "2pl"
        eq = f"y = 1/(1+2^({slope:.3f}*(log2({i50:.3f})-log2(x))))"
        r2 = 1 - two_pl_rss / ss_tot
        fp, bp = 0.0, 1.0
    else:
        model = "flat"
        eq = f"y = {flat_level:.3f}"
        fp = bp = flat_level
        i50 = slope = np.nan
        # R² of the flat model is computed from its own RSS (nan if no variance).
        r2 = (1 - flat_rss / ss_tot) if ss_tot else np.nan

    return {
        "Row_ID": row_id,
        "Peptide_Name": pep,
        "Organism": org,
        "model": model,
        "equation": eq,
        "front_plateau": fp,
        "back_plateau": bp,
        "i50": i50,
        "slope": slope,
        "p_value": p_val,
        "r2": r2,
    }

# One job per experiment: the group key (Row_ID, Peptide_Name, Organism),
# that experiment's rows, and the calibration parameters.
jobs = [(*key, block, null_params) for key, block in groups]

# Fit all experiments in parallel; imap keeps the results in job order.
with mp.Pool(processes=min(120, mp.cpu_count())) as pool:
    results = list(
        tqdm(
            pool.imap(fit_single_block, jobs),
            total=len(jobs),
            desc="Fitting curves",
        )
    )

# Experiments that returned None (fewer than 3 points) are left out.
fit_df = pd.DataFrame([res for res in results if res is not None])

# ──────────────────────────────────────────────
# 3 ▸ merge back on Row_ID & save
# ──────────────────────────────────────────────
# Left join: every original row is kept, fitted or not.
out = pd.merge(
    clean_hts,
    fit_df,
    on=["Row_ID", "Peptide_Name", "Organism"],
    how="left",
)
out.to_csv(
    "/projects/amp/asalehi/Dose/data/clean/full_clean_hts_with_fits_no_pooling.csv",
    index=False,
)
print("✓ wrote full_clean_hts_with_fits_no_pooling.csv")


  from .autonotebook import tqdm as notebook_tqdm
Fitting curves: 100%|██████████| 18467/18467 [00:19<00:00, 938.58it/s] 


✓ wrote full_clean_hts_with_fits_no_pooling.csv
