# Deterministic SSR Peak Caller (Geometric) 

This notebook implements a **deterministic**, **non‑ML** peak caller for capillary electrophoresis (CE) fragment analysis (SSR genotyping).

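**Inputs**
- `molecular_weight_data.zip`: per-sample semicolon-separated CSVs (`M1_plXX.csv`, `M2_plXX.csv`) with `molw` and `channel_1..channel_5`; channel 5 carries the LIZ500 size ladder.
- `lecture_microsat/` (directory): ground-truth allele calls (`resM1P1.csv` … `resM2P4.csv`), also intended for validation. The allele sizes in these files build the master panel that detected peaks are snapped to, so without them no alleles are called.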

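**Outputs** (under `OUT_DIR`, currently `outputs_vclassification/`)
- `tables/miRNA_allele_calls_wide.csv`
- `tables/miRNA_allele_calls_long.csv`
- `master_panel.json` (allele panel built from the ground-truth files)
- `relatorio_validacao/*` (validation reports; M1 only) — not generated by the cells in this export.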

> Note on homozygotes: by default, if only one allele peak is called, the exporter duplicates it as (A,A) to match common fragment-analysis exports.


In [2]:
import json
import re
import zipfile
import concurrent.futures
import multiprocessing
import math
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, savgol_filter
from scipy.interpolate import PchipInterpolator
from scipy import sparse
from scipy.sparse.linalg import spsolve

In [3]:
# Configuration
try:
    from tqdm import tqdm
except Exception:
    class tqdm:  # noqa: N801 - same name as the progress bar it stands in for
        """No-op stand-in for ``tqdm.tqdm`` used when tqdm is unavailable.

        Supports both ways the pipeline uses tqdm: wrapping an iterable
        (``for x in tqdm(items, desc=...)``) and the manual form
        (``with tqdm(total=n, ...) as pbar: pbar.update(1)``). A bare
        function fallback cannot serve the manual form (no positional
        iterable, not a context manager).
        """

        def __init__(self, iterable=None, **kwargs):
            self._iterable = iterable

        def __iter__(self):
            return iter(self._iterable if self._iterable is not None else ())

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return False

        def update(self, n=1):
            return None


# CONFIGURATION & TOGGLES

RUN_PARALLEL = False
MAX_WORKERS = max(1, multiprocessing.cpu_count() - 1) if RUN_PARALLEL else 1

# Paths (keep the same output structure)
ZIP_PATH = Path("molecular_weight_data.zip")
LABEL_DIR = Path("lecture_microsat")
OUT_DIR = Path("outputs_vclassification")
TABLES_DIR = OUT_DIR / "tables"
PLOTS_DIR = OUT_DIR / "plots_per_plant"
PARAMS_JSON = OUT_DIR / "master_panel.json"

GENERATE_PLOTS = True


# Export: when only one allele is called, write it as (A, A) ...
DUPLICATE_HOMOZYGOUS = True
# ... and report allele_count = 2 for such duplicated homozygotes.
HOMO_AS_TWO_ALLELES = True


# Signal rules (raw-axis windows)
X_MIN_ALLELES = 100.0
X_MAX = 520.0

X_MIN_LADDER = 30.0

SMOOTH_WINDOW = 31
SMOOTH_POLYORDER = 3

# LADDER LIZ 500 (16 peaks), sizes in bp
LIZ_500_SIZES = np.array([35.0, 50.0, 75.0, 100.0, 139.0, 150.0, 160.0, 200.0,
                          250.0, 300.0, 340.0, 350.0, 400.0, 450.0, 490.0, 500.0])

# HEURISTIC THRESHOLDS
MIN_PROM_ABS = 30.0
MIN_HEIGHT_ABS = 30.0
PROM_SNR = 3.0
MIN_PEAK_WIDTH_SAMPLES = 1.0

# Crosstalk / pull-up heuristic
CROSSTALK_TOL_PB = 0.6
CROSSTALK_STRONG_RATIO = 4.0

# Master panel / snap
PANEL_SNAP_TOL_PB = 2.0
MIN_ALLELE_SEP_PB = 0.8
ALLELE2_MIN_RATIO = 0.15

# Stutter: applied with protection for very close heterozygotes
STUTTER_WINDOW = (-3.5, 1.0)
STUTTER_MAX_RATIO = 0.35

# Anti-artifacts
# Penalizes spikes that appear simultaneously on multiple channels
MULTI_RATIO_MAX = 0.25
MULTI_RATIO_HARD = 0.60
MULTI_WINDOW_SAMPLES = 5

# Penalizes spikes aligned with strong LADDER spikes (channel 5)
LADDER_STRONG_Q = 0.90
LADDER_BLEED_RATIO_MAX = 0.10
# Tolerance (bp) for treating an allele size as coinciding with a LIZ500 size
# (also used by the marker-2 blacklists in _compute_scores_for_mapped).
# NOTE(review): this name was referenced but not defined anywhere in this
# export (notebook cells may be missing); 1.0 bp is a placeholder — confirm
# against the original notebook.
LADDER_MATCH_PB = 1.0

HET_PROTECT_PB = 1.2  # avoids erasing real alleles ~1 bp apart

# Marker mapping: marker_id -> (multiplex, dye channel).
# Multiplex 1 is read from M1_pl*.csv, multiplex 2 from M2_pl*.csv.
MARKER_MAP = {
    1: (1, 1), 2: (1, 2), 3: (1, 3), 4: (1, 4),
    5: (2, 1), 6: (2, 2), 7: (2, 3), 8: (2, 4),
}

# DATA STRUCTURES

@dataclass
class PeakCandidate:
    """A peak detected on one dye channel, before snapping to the allele panel."""
    pos: float          # calibrated position in bp (raw_pos mapped through the ladder interpolator)
    raw_pos: float      # sub-sample position on the raw `molw` axis
    prom: float         # prominence from find_peaks on the baseline-corrected signal
    height: float       # peak height on the baseline-corrected signal

@dataclass
class MappedPeak:
    """A detected peak assigned to a master-panel allele size, plus artifact-scoring results."""
    allele_size: float     # panel allele size (bp) the peak was snapped to
    calibrated_pos: float  # measured calibrated position (bp) of the peak
    raw_pos: float         # position on the raw `molw` axis
    prom: float
    height: float
    score: float = 0.0        # ranking score set by _compute_scores_for_mapped (penalized prominence; 0.0 when blacklisted)
    multi_ratio: float = 0.0  # sum of other channels' local maxima / this channel's local maximum
    ladder_ratio: float = 1.0 # this channel's local maximum / ladder (channel 5) local maximum
    flags: str = ""           # comma-separated artifact flags ("MULTI", "LADDER_BLEED") or a "BLACKLIST..." tag

# UTILITIES

def safe_savgol(y: np.ndarray, window: int, poly: int) -> np.ndarray:
    """Savitzky-Golay smoothing with a window adapted to short signals.

    ``savgol_filter(mode="interp")`` requires an odd window no longer than the
    signal and a polynomial order below the window length. The requested
    window is therefore clamped to the largest odd length that fits in ``y``,
    forced to be odd and at least 3. The polynomial order is lowered if
    needed. The old rule ``len(y)//2*2+1`` overshot by one sample for
    even-length signals, and ``max(5, ...)`` overshot for signals shorter
    than 5 samples. Both made the filter raise.

    Args:
        y: 1-D signal.
        window: requested window length in samples.
        poly: requested polynomial order.

    Returns:
        The smoothed signal as a float array of the same length. Signals
        shorter than 3 samples are returned unchanged (as a float copy).
    """
    y = np.asarray(y, dtype=float)
    n = len(y)
    if n < 3:
        return y.copy()
    window = max(3, int(window))
    if window >= n:
        # Largest odd window that still fits inside the signal.
        window = n if n % 2 == 1 else n - 1
    if window % 2 == 0:
        # window < n here, so window + 1 <= n.
        window += 1
    if window < poly + 2:
        poly = max(1, window - 2)
    return savgol_filter(y, window, poly, mode="interp")

def baseline_als(y: np.ndarray, lam: float = 1e6, p: float = 0.001, niter: int = 10) -> np.ndarray:
    """
    Baseline correction via Asymmetric Least Squares (Eilers & Boelens, 2005).

    Repeatedly solves ``(W + lam * D'D) z = W y``, where D is the second-difference
    operator. After each solve, points above the current baseline get weight ``p``
    and points at or below it get ``1 - p``, so the baseline hugs the lower envelope.

    Args:
        y: 1-D signal.
        lam: smoothness penalty (larger -> stiffer baseline).
        p: asymmetry weight for points above the baseline.
        niter: number of reweighting iterations; values below 1 are treated as 1
            (previously ``niter=0`` raised UnboundLocalError).

    Returns:
        The estimated baseline (same length as ``y``); zeros for signals shorter than 5.
    """
    y = np.asarray(y, dtype=float)
    L = y.size
    if L < 5:
        return np.zeros_like(y)

    D = sparse.diags([1, -2, 1], [0, 1, 2], shape=(L-2, L))
    # The smoothness penalty does not depend on the weights: build it once.
    penalty = lam * (D.T @ D)
    w = np.ones(L)
    for _ in range(max(1, int(niter))):
        W = sparse.spdiags(w, 0, L, L)
        z = spsolve(W + penalty, w*y)
        w = p * (y > z) + (1 - p) * (y <= z)
    return z

def estimate_noise_mad(y: np.ndarray) -> float:
    """Robust noise level: the median absolute deviation scaled to a Gaussian sigma.

    The factor 1.4826 makes the MAD a consistent estimate of the standard
    deviation for normally distributed noise. The tiny offset keeps the result
    strictly positive so it can be used as a divisor or threshold.
    """
    arr = np.asarray(y, dtype=float)
    deviations = np.abs(arr - np.median(arr))
    sigma = 1.4826 * np.median(deviations)
    return sigma + 1e-9

def get_zip_base_dir(zf: zipfile.ZipFile) -> str:
    """Return the top-level folder holding the CSVs inside ``zf`` ("" if they sit at the root).

    Only the first CSV entry is inspected, skipping macOS ``__MACOSX`` resource
    entries. Its first path component is taken as the base directory.
    """
    first_csv = next(
        (name for name in zf.namelist() if name.endswith(".csv") and "MACOSX" not in name),
        None,
    )
    if first_csv is None:
        return ""
    head, sep, _rest = first_csv.partition("/")
    return head if sep else ""

## I/O utilities

In [5]:
def list_plants_in_zip(zip_path: Path) -> List[int]:
    """Return the sorted plant ids that have an ``M1_pl<id>.csv`` file in the archive.

    The file must sit in the archive's base directory (see ``get_zip_base_dir``).
    M2 files are not checked here.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        base = get_zip_base_dir(archive)
        prefix = re.escape(base) + "/" if base else ""
        pattern = re.compile(rf"^{prefix}M1_pl(\d+)\.csv$")
        hits = (pattern.match(entry) for entry in archive.namelist())
        return sorted(int(hit.group(1)) for hit in hits if hit)

# MASTER PANEL (binless)

def load_ground_truth_labels(label_dir: Path) -> Dict[int, Dict[int, Tuple[Optional[float], Optional[float]]]]:
    """
    Read reference allele calls from the per-marker result files in ``label_dir``.

    Markers 1-4 come from ``resM1P1..resM1P4.csv`` and markers 5-8 from
    ``resM2P1..resM2P4.csv`` (semicolon-separated; header whitespace is stripped).
    A file is used only if it exists and has a ``plant`` column. Allele sizes are
    read from ``markA.1`` / ``markA.2``. Rows whose plant id is not numeric are
    skipped, and missing sizes become ``None``.

    Returns:
        labels[plant_id][marker_id] = (a1, a2), each a float or None.
    """
    file_map = {
        1: "resM1P1.csv", 2: "resM1P2.csv", 3: "resM1P3.csv", 4: "resM1P4.csv",
        5: "resM2P1.csv", 6: "resM2P2.csv", 7: "resM2P3.csv", 8: "resM2P4.csv",
    }

    def as_size(value) -> Optional[float]:
        # Missing cells (NaN) become None; anything else is coerced to float.
        return None if pd.isna(value) else float(value)

    labels: Dict[int, Dict[int, Tuple[Optional[float], Optional[float]]]] = {}
    for marker_id, file_name in file_map.items():
        path = label_dir / file_name
        if not path.exists():
            continue
        table = pd.read_csv(path, sep=";")
        table.columns = [col.strip() for col in table.columns]
        if "plant" not in table.columns:
            continue
        for _, record in table.iterrows():
            plant = pd.to_numeric(record["plant"], errors="coerce")
            if pd.isna(plant):
                continue
            labels.setdefault(int(plant), {})[marker_id] = (
                as_size(record.get("markA.1", np.nan)),
                as_size(record.get("markA.2", np.nan)),
            )
    return labels

def load_and_build_panel(label_dir: Path) -> Dict[int, List[float]]:
    """Build the master allele panel (marker_id -> sorted unique sizes) from ground truth.

    Every non-missing allele size observed for a marker in the label files becomes
    a panel entry. The panel is also written as JSON (string keys) to PARAMS_JSON,
    creating OUT_DIR if needed.
    """
    observed = {marker: set() for marker in range(1, 9)}
    for per_marker in load_ground_truth_labels(label_dir).values():
        for marker, alleles in per_marker.items():
            observed[marker].update(float(size) for size in alleles if size is not None)
    panel = {marker: sorted(sizes) for marker, sizes in observed.items()}
    OUT_DIR.mkdir(exist_ok=True, parents=True)
    with open(PARAMS_JSON, "w") as fh:
        json.dump({str(marker): sizes for marker, sizes in panel.items()}, fh, indent=2)
    return panel

# SUBPIXEL PEAK POSITION

def fractional_peak_pos(x: np.ndarray, y: np.ndarray, idx: int) -> float:
    """
    Sub-sample peak position from a parabola through samples idx-1, idx, idx+1.

    The vertex offset ``delta`` is computed in sample units (within +/-0.5 for a
    true local maximum). It is converted to the x axis using the spacing on the
    side the vertex falls on: (x3 - x2) to the right, (x2 - x1) to the left.
    Earlier code always used the right-hand spacing, which misplaces
    left-leaning peaks on a non-uniform axis such as ``molw``. On uniform axes
    both give the same result.

    Returns ``x[idx]`` for edge samples and for flat (degenerate) triplets.
    """
    if idx <= 0 or idx >= len(y) - 1:
        return float(x[idx])
    x1, x2, x3 = float(x[idx-1]), float(x[idx]), float(x[idx+1])
    y1, y2, y3 = float(y[idx-1]), float(y[idx]), float(y[idx+1])

    denom = (y1 - 2*y2 + y3)
    if abs(denom) < 1e-12:
        return x2
    delta = 0.5*(y1 - y3)/denom
    step = (x3 - x2) if delta >= 0 else (x2 - x1)
    return x2 + delta*step

# LADDER CALIBRATION 

def _detect_ladder_peaks(raw_x: np.ndarray, y5: np.ndarray) -> List[Tuple[float, float, float]]:
    """
    Find size-standard peaks on channel 5 (ladder) within [X_MIN_LADDER, X_MAX].

    The trace is Savitzky-Golay smoothed, AsLS-baseline corrected and clipped at
    zero. Peaks need prominence and height of at least max(50, 5 x MAD noise).
    Returns (raw_pos, prominence, height) tuples sorted by raw_pos, with raw_pos
    refined to sub-sample precision.
    """
    in_range = (raw_x >= X_MIN_LADDER) & (raw_x <= X_MAX)
    axis = raw_x[in_range]
    trace = safe_savgol(y5[in_range].astype(float), SMOOTH_WINDOW, SMOOTH_POLYORDER)
    corrected = np.clip(trace - baseline_als(trace, lam=3e5, p=0.01, niter=10), 0, None)

    threshold = max(50.0, 5.0*estimate_noise_mad(corrected))
    idxs, props = find_peaks(corrected, prominence=threshold, height=threshold)
    found = [
        (fractional_peak_pos(axis, corrected, int(i)), float(prom), float(h))
        for i, prom, h in zip(idxs, props["prominences"], props["peak_heights"])
    ]
    return sorted(found, key=lambda t: t[0])

def _score_assignment(raw_peaks: np.ndarray, exp_sizes: np.ndarray, a: float, b: float) -> Tuple[int, float]:
    """
    Score: how many peaks hit and RMSE (in bp) after prediction via linear map size=a*raw+b.
    """
    pred = a*raw_peaks + b
    matched = []
    used = set()
    for s in exp_sizes:
        j = int(np.argmin(np.abs(pred - s)))
        if j in used:
            continue
        if abs(pred[j] - s) <= 6.0:  
            used.add(j)
            matched.append((pred[j] - s)**2)
    if not matched:
        return 0, float("inf")
    rmse = math.sqrt(float(np.mean(matched)))
    return len(matched), rmse

## Size calibration using LIZ500 (channel 5)

In [7]:
def calibrate_axis_liz500(raw_x: np.ndarray, y5: np.ndarray) -> Optional[PchipInterpolator]:
    """
    Calibrate the raw axis to base pairs using the LIZ500 size standard (channel 5).

    Steps:
      1. Detect ladder peaks (``_detect_ladder_peaks``); give up if fewer than 6.
      2. Keep at most the 28 tallest peaks, re-sorted by raw position.
      3. Anchor search: for every pair of anchor peaks (i < j, at least 5 raw units
         apart) and every pair of anchor ladder sizes, fit the line
         size = a*raw + b through the two points and score it with
         ``_score_assignment``. The best line has the most matched sizes
         (at least 8 required), with ties broken by lower RMSE.
      4. With the best line, greedily pair each ladder size with the nearest unused
         peak (within 6 bp); at least 8 pairs are required.
      5. Fit a PCHIP interpolator through the (raw, size) knots.

    Returns the raw_x -> bp interpolator (extrapolating outside the knots), or
    None when any step fails.
    """
    ladder_peaks = _detect_ladder_peaks(raw_x, y5)
    if len(ladder_peaks) < 6:
        return None

    # Keep only the tallest peaks (by height) to limit spurious detections.
    peaks_arr = np.array(ladder_peaks, dtype=float)  # columns: raw_pos, prom, height
    # Pick up to the 28 tallest peaks, then reorder them by raw_pos.
    top = peaks_arr[np.argsort(peaks_arr[:, 2])[-min(28, len(peaks_arr)) :]]
    top = top[np.argsort(top[:, 0])]
    raw_peaks = top[:, 0]

    best = None  # (matches, rmse, a, b)
    obs_idx = list(range(len(raw_peaks)))
    exp_idx = list(range(len(LIZ_500_SIZES)))  # NOTE(review): unused
    # Anchor peaks: the first/last three plus a sparse sample of interior peaks.
    anchor_obs = sorted(set([0, 1, 2, len(obs_idx)-3, len(obs_idx)-2, len(obs_idx)-1] + list(range(4, len(obs_idx)-4, max(1, len(obs_idx)//6)))))
    # Anchor ladder sizes (indices into LIZ_500_SIZES).
    anchor_exp = [0, 1, 2, 3, 4, 6, 8, 10, 12, 14, 15]

    # Exhaustive two-point line fits over anchor combinations.
    for i in anchor_obs:
        for j in anchor_obs:
            if j <= i:
                continue
            x1, x2 = raw_peaks[i], raw_peaks[j]
            if x2 - x1 < 5:
                continue
            for a_i in anchor_exp:
                for a_j in anchor_exp:
                    if a_j <= a_i:
                        continue
                    s1, s2 = LIZ_500_SIZES[a_i], LIZ_500_SIZES[a_j]
                    a = (s2 - s1) / (x2 - x1)
                    if a <= 0:
                        continue
                    b = s1 - a * x1
                    mcount, rmse = _score_assignment(raw_peaks, LIZ_500_SIZES, a, b)
                    if mcount < 8:
                        continue
                    cand = (mcount, rmse, a, b)
                    # Prefer more matches; on equal matches prefer lower RMSE.
                    if (best is None) or (cand[0] > best[0]) or (cand[0] == best[0] and cand[1] < best[1]):
                        best = cand

    if best is None:
        return None

    mcount, rmse, a, b = best  # only a and b are used below
    pred = a*raw_peaks + b

    # Build (raw, size) pairs by choosing, for each size, the unused raw peak whose prediction is closest.
    pairs = []
    used = set()
    for s in LIZ_500_SIZES:
        j = int(np.argmin(np.abs(pred - s)))
        if j in used:
            continue
        if abs(pred[j] - s) <= 6.0:
            used.add(j)
            pairs.append((float(raw_peaks[j]), float(s)))

    # Order knots by raw position and require a minimum number of pairs.
    pairs.sort(key=lambda t: t[0])
    if len(pairs) < 8:
        return None

    raw_knots = np.array([p[0] for p in pairs], dtype=float)
    size_knots = np.array([p[1] for p in pairs], dtype=float)

    # Remove (near-)duplicate raw knots: PCHIP needs strictly increasing x.
    # NOTE(review): this runs after the >= 8 check, so fewer knots may remain.
    keep = [0]
    for k in range(1, len(raw_knots)):
        if raw_knots[k] - raw_knots[keep[-1]] > 1e-6:
            keep.append(k)
    raw_knots = raw_knots[keep]
    size_knots = size_knots[keep]

    try:
        return PchipInterpolator(raw_knots, size_knots, extrapolate=True)
    except Exception:
        return None


def make_inverse_mapper(raw_x: np.ndarray, raw2bp: PchipInterpolator) -> PchipInterpolator:
    """Build a bp -> raw interpolator by sampling ``raw2bp`` on the raw axis.

    ``raw2bp`` is evaluated at every raw sample (in ascending raw order). The
    (bp, raw) pairs are ordered by bp, and values within 1e-6 bp of their
    predecessor are dropped so the PCHIP knots are strictly increasing. Falls
    back to an identity map (extrapolated from [0, 1]) when fewer than two
    distinct bp values remain.
    """
    raw_arr = np.asarray(raw_x, dtype=float)
    raw_sorted = raw_arr[np.argsort(raw_arr)]
    bp_vals = np.asarray(raw2bp(raw_sorted), dtype=float)

    by_bp = np.argsort(bp_vals)
    bp_knots = bp_vals[by_bp]
    raw_knots = raw_sorted[by_bp]

    distinct = np.concatenate([[True], np.abs(np.diff(bp_knots)) > 1e-6])
    bp_knots = bp_knots[distinct]
    raw_knots = raw_knots[distinct]

    if bp_knots.size >= 2:
        return PchipInterpolator(bp_knots, raw_knots, extrapolate=True)
    return PchipInterpolator([0.0, 1.0], [0.0, 1.0], extrapolate=True)


# PULL-UP / BLEED CORRECTION (LADDER -> DYES)
def subtract_ladder_bleed(raw_x: np.ndarray,
                          ch5_raw: np.ndarray,
                          dye_channels_raw: Dict[int, np.ndarray],
                          bp2raw: Optional[PchipInterpolator],
                          ladder_sizes: np.ndarray,
                          *,
                          use_sizes_min: float = 100.0,
                          use_sizes_max: float = 500.0,
                          window_samples: int = 6,
                          min_ladder_rfu: float = 50.0,
                          robust_quantile: float = 0.60) -> Tuple[Dict[int, np.ndarray], Dict[int, float]]:
    """Subtract ladder (channel 5) bleed-through / pull-up from the dye channels (1..4).

    For each dye channel a single gain ``alpha`` is estimated, ``alpha * ch5`` is
    subtracted, and the result is clipped at 0:

      1. Ladder sizes within [use_sizes_min, use_sizes_max] are mapped to raw
         positions with ``bp2raw`` and snapped to the nearest raw sample.
      2. Around each position (+/- window_samples samples) the ladder maximum is
         read. Sites below ``min_ladder_rfu`` are dropped (at least 4 must remain).
         Only sites at or above the ``robust_quantile`` quantile of those maxima
         are kept, or all remaining sites if that leaves fewer than 3.
      3. alpha = median over the kept sites of (dye max in window) / (ladder max in
         window). alpha is 0.0 when fewer than 3 ratios are available.

    Returns:
        (corrected, alphas): corrected dye channels (float arrays) and the alpha
        used per channel. If no correction can be estimated (empty axis, no
        bp2raw, fewer than 3 usable sizes, bp2raw raising, or fewer than 4 ladder
        sites above min_ladder_rfu), the channels are returned as unmodified float
        arrays with alpha 0.0.

    NOTE(review): the +/- window_samples windows are taken in array-index order,
    which matches neighbouring positions only if raw_x is sorted ascending —
    confirm for the input CSVs.
    """
    raw_x = np.asarray(raw_x, dtype=float)
    ch5 = np.asarray(ch5_raw, dtype=float)
    n = raw_x.size
    if n == 0 or bp2raw is None:
        return {k: np.asarray(v, dtype=float) for k, v in dye_channels_raw.items()}, {k: 0.0 for k in dye_channels_raw}

    # Select the ladder sizes used for estimation (within [use_sizes_min, use_sizes_max]).
    sizes = np.asarray(ladder_sizes, dtype=float)
    sizes = sizes[(sizes >= use_sizes_min) & (sizes <= use_sizes_max)]
    if sizes.size < 3:
        return {k: np.asarray(v, dtype=float) for k, v in dye_channels_raw.items()}, {k: 0.0 for k in dye_channels_raw}

    # expected raw positions by size
    try:
        raw_targets = np.asarray(bp2raw(sizes), dtype=float)
    except Exception:
        return {k: np.asarray(v, dtype=float) for k, v in dye_channels_raw.items()}, {k: 0.0 for k in dye_channels_raw}

    # Nearest-sample indices found with searchsorted on a sorted copy of raw_x.
    order = np.argsort(raw_x)
    rx_sorted = raw_x[order]

    def nearest_idx(rt: float) -> int:
        # Index (into the original, unsorted raw_x) of the sample closest to rt.
        j = int(np.searchsorted(rx_sorted, rt))
        if j <= 0:
            ii = 0
        elif j >= rx_sorted.size:
            ii = rx_sorted.size - 1
        else:
            ii = j-1 if abs(rx_sorted[j-1] - rt) <= abs(rx_sorted[j] - rt) else j
        return int(order[ii])

    idxs = [nearest_idx(float(rt)) for rt in raw_targets]

    # Measure the local ladder maximum around each expected ladder position.
    ladder_local = []
    idxs_ok = []
    for ii in idxs:
        lo = max(0, ii - window_samples)
        hi = min(n, ii + window_samples + 1)
        lv = float(np.max(ch5[lo:hi])) if hi > lo else float(ch5[ii])
        if lv >= min_ladder_rfu:
            ladder_local.append(lv)
            idxs_ok.append(ii)

    if len(idxs_ok) < 4:
        # Too few strong ladder sites: return the channels uncorrected.
        return {k: np.asarray(v, dtype=float) for k, v in dye_channels_raw.items()}, {k: 0.0 for k in dye_channels_raw}

    # Keep only the strongest ladder sites (robust_quantile and above).
    ladder_local = np.asarray(ladder_local, dtype=float)
    thr = float(np.quantile(ladder_local, robust_quantile))
    idxs_good = [ii for ii, lv in zip(idxs_ok, ladder_local) if lv >= thr]
    if len(idxs_good) < 3:
        idxs_good = idxs_ok

    alphas: Dict[int, float] = {}
    corrected: Dict[int, np.ndarray] = {}

    for ch, yraw in dye_channels_raw.items():
        y = np.asarray(yraw, dtype=float)
        ratios = []
        for ii in idxs_good:
            lo = max(0, ii - window_samples)
            hi = min(n, ii + window_samples + 1)
            lv = float(np.max(ch5[lo:hi])) if hi > lo else float(ch5[ii])
            # Redundant guard: idxs_good only holds sites that already passed this test.
            if lv < min_ladder_rfu:
                continue
            dv = float(np.max(y[lo:hi])) if hi > lo else float(y[ii])
            ratios.append(dv / (lv + 1e-9))
        # Median dye/ladder ratio across sites is the bleed gain for this channel.
        if len(ratios) >= 3:
            a = float(np.median(ratios))
        else:
            a = 0.0
        alphas[ch] = a
        yc = y - a * ch5
        corrected[ch] = np.clip(yc, 0, None)

    return corrected, alphas

# PEAK DETECTION (ALLOS)

def detect_peaks_subpixel(raw_x: np.ndarray, y: np.ndarray, interp: PchipInterpolator) -> List[PeakCandidate]:
    """Detect allele peaks on one dye channel and report them in calibrated bp.

    The signal within [X_MIN_ALLELES, X_MAX] (raw axis) is smoothed, AsLS-baseline
    corrected (lam=8e5, p=0.001) and clipped at zero. A peak needs prominence
    >= max(MIN_PROM_ABS, PROM_SNR*noise), height >= max(MIN_HEIGHT_ABS, PROM_SNR*noise)
    and width >= MIN_PEAK_WIDTH_SAMPLES. Positions are refined to sub-sample precision
    and mapped through ``interp`` (raw -> bp). The result is sorted by bp.
    """
    window_mask = (raw_x >= X_MIN_ALLELES) & (raw_x <= X_MAX)
    axis = raw_x[window_mask]
    smoothed = safe_savgol(y[window_mask].astype(float), SMOOTH_WINDOW, SMOOTH_POLYORDER)
    corrected = np.clip(smoothed - baseline_als(smoothed, lam=8e5, p=0.001, niter=10), 0, None)

    sigma = estimate_noise_mad(corrected)
    idxs, props = find_peaks(
        corrected,
        prominence=max(MIN_PROM_ABS, PROM_SNR * sigma),
        height=max(MIN_HEIGHT_ABS, PROM_SNR * sigma),
        width=MIN_PEAK_WIDTH_SAMPLES,
    )

    candidates = []
    for peak_idx, prominence, peak_h in zip(idxs, props["prominences"], props["peak_heights"]):
        raw_pos = fractional_peak_pos(axis, corrected, int(peak_idx))
        candidates.append(PeakCandidate(pos=float(interp(raw_pos)), raw_pos=raw_pos,
                                        prom=float(prominence), height=float(peak_h)))
    return sorted(candidates, key=lambda c: c.pos)

# CROSSTALK SUPPRESSION

def suppress_crosstalk(marker_peaks: Dict[int, List[PeakCandidate]]) -> Dict[int, List[PeakCandidate]]:
    all_peaks = []
    for m_id, peaks in marker_peaks.items():
        for p in peaks:
            all_peaks.append((m_id, p))
    to_remove = set()
    for i in range(len(all_peaks)):
        m1, p1 = all_peaks[i]
        for j in range(i+1, len(all_peaks)):
            m2, p2 = all_peaks[j]
            if abs(p1.pos - p2.pos) <= CROSSTALK_TOL_PB:
                if p1.height >= p2.height * CROSSTALK_STRONG_RATIO:
                    to_remove.add((m2, p2.raw_pos))
                elif p2.height >= p1.height * CROSSTALK_STRONG_RATIO:
                    to_remove.add((m1, p1.raw_pos))
    cleaned = {}
    for m_id, peaks in marker_peaks.items():
        cleaned[m_id] = [p for p in peaks if (m_id, p.raw_pos) not in to_remove]
    return cleaned

# PANEL SNAP + STUTTER (PROTECTED)

def snap_to_panel(peaks: List[PeakCandidate], panel_sizes: List[float]) -> List[MappedPeak]:
    if not peaks or not panel_sizes:
        return []
    panel = np.array(panel_sizes, dtype=float)
    mapped = []
    for p in peaks:
        j = int(np.argmin(np.abs(panel - p.pos)))
        if abs(panel[j] - p.pos) <= PANEL_SNAP_TOL_PB:
            mapped.append(MappedPeak(allele_size=float(panel[j]),
                                     calibrated_pos=float(p.pos),
                                     raw_pos=float(p.raw_pos),
                                     prom=float(p.prom),
                                     height=float(p.height)))
    # groups by allele_size keepong the better one  (bigger prom)
    best = {}
    for mp in mapped:
        k = mp.allele_size
        if (k not in best) or (mp.prom > best[k].prom):
            best[k] = mp
    out = list(best.values())
    out.sort(key=lambda m: m.allele_size)
    return out

def filter_stutters_panel_protected(mapped: List[MappedPeak]) -> List[MappedPeak]:
    if len(mapped) <= 1:
        return mapped

    mapped = sorted(mapped, key=lambda m: m.allele_size)
    keep = [True]*len(mapped)

    protected_pairs = set()
    for i in range(len(mapped)):
        for j in range(i+1, len(mapped)):
            d = mapped[j].allele_size - mapped[i].allele_size
            if d <= HET_PROTECT_PB:
                protected_pairs.add((i, j))
            else:
                break

    for i in range(len(mapped)):
        if not keep[i]:
            continue
        for j in range(len(mapped)):
            if i == j or not keep[j]:
                continue
            a, b = (i, j) if i < j else (j, i)
            if (a, b) in protected_pairs:
                continue

            delta = mapped[j].allele_size - mapped[i].allele_size
            if STUTTER_WINDOW[0] <= delta <= STUTTER_WINDOW[1]:
                if mapped[i].height > mapped[j].height and (mapped[j].height / (mapped[i].height + 1e-9)) <= STUTTER_MAX_RATIO:
                    keep[j] = False

    return [m for m, k in zip(mapped, keep) if k]

## Allele calling (stutter handling, selection, export helpers)

In [9]:
def call_alleles(mapped: List[MappedPeak]) -> Tuple[int, Optional[MappedPeak], Optional[MappedPeak]]:
    if not mapped:
        return 0, None, None
    mapped = sorted(mapped, key=lambda m: (m.score, m.height), reverse=True)
    a1 = mapped[0]
    a2 = None
    if len(mapped) > 1:
        for cand in mapped[1:]:
            if abs(cand.allele_size - a1.allele_size) >= MIN_ALLELE_SEP_PB and cand.height >= a1.height * ALLELE2_MIN_RATIO:
                a2 = cand
                break
    cnt = 1 if a1 is not None else 0
    if a2 is not None:
        cnt = 2
        # ordena por size
        if a2.allele_size < a1.allele_size:
            a1, a2 = a2, a1
    return cnt, a1, a2

# IO / WORKERS
def _read_raw_csv_from_zip(zf: zipfile.ZipFile, member: str) -> pd.DataFrame:
    with zf.open(member, "r") as f:
        df = pd.read_csv(f, sep=";")
    df.columns = [c.strip() for c in df.columns]
    return df

def _worker_process_plant(plant_id: int, panel: Dict[int, List[float]]) -> Dict:
    """Call alleles for all 8 markers of one plant.

    Opens ZIP_PATH itself (it is submitted to a process pool by ``main``), loads
    the plant's M1 and M2 CSVs, and for each run:
      * calibrates raw -> bp from its channel-5 ladder (identity map over the raw
        range if calibration fails) and builds the inverse bp -> raw map,
      * subtracts ladder bleed from dye channels 1-4,
      * prepares smoothed/baseline-corrected signals used only for artifact scoring.
    Then it detects peaks per marker, suppresses crosstalk, snaps peaks to the
    master panel, scores artifacts, filters stutters, and calls alleles.

    Returns:
        {"plant_id": plant_id, "calls": {marker_id: (allele_count, a1, a2)}} for markers 1..8.
    """
    with zipfile.ZipFile(ZIP_PATH, "r") as zf:
        base = get_zip_base_dir(zf)
        prefix = f"{base}/" if base else ""
        m1_name = f"{prefix}M1_pl{plant_id}.csv"
        m2_name = f"{prefix}M2_pl{plant_id}.csv"

        df1 = _read_raw_csv_from_zip(zf, m1_name)
        df2 = _read_raw_csv_from_zip(zf, m2_name)

    # raw axis
    raw_x1 = df1["molw"].to_numpy(dtype=float)
    raw_x2 = df2["molw"].to_numpy(dtype=float)

    # Calibrate each multiplex run separately from its own channel-5 ladder.
    interp1 = calibrate_axis_liz500(raw_x1, df1["channel_5"].to_numpy())
    interp2 = calibrate_axis_liz500(raw_x2, df2["channel_5"].to_numpy())

    # Fallback when calibration fails: identity map (bp == raw) over the raw range.
    if interp1 is None:
        interp1 = PchipInterpolator([raw_x1.min(), raw_x1.max()], [raw_x1.min(), raw_x1.max()], extrapolate=True)
    if interp2 is None:
        interp2 = PchipInterpolator([raw_x2.min(), raw_x2.max()], [raw_x2.min(), raw_x2.max()], extrapolate=True)


    # bp -> raw maps, used for bleed subtraction and for scoring at panel positions.
    inv1 = make_inverse_mapper(raw_x1, interp1)
    inv2 = make_inverse_mapper(raw_x2, interp2)
    ch5_1 = df1["channel_5"].to_numpy(dtype=float)
    ch5_2 = df2["channel_5"].to_numpy(dtype=float)

    m1_raw_channels = {1: df1["channel_1"].to_numpy(dtype=float),
                       2: df1["channel_2"].to_numpy(dtype=float),
                       3: df1["channel_3"].to_numpy(dtype=float),
                       4: df1["channel_4"].to_numpy(dtype=float)}
    m2_raw_channels = {1: df2["channel_1"].to_numpy(dtype=float),
                       2: df2["channel_2"].to_numpy(dtype=float),
                       3: df2["channel_3"].to_numpy(dtype=float),
                       4: df2["channel_4"].to_numpy(dtype=float)}

    m1_corr_channels, m1_alphas = subtract_ladder_bleed(raw_x1, ch5_1, m1_raw_channels, inv1, LIZ_500_SIZES)
    m2_corr_channels, m2_alphas = subtract_ladder_bleed(raw_x2, ch5_2, m2_raw_channels, inv2, LIZ_500_SIZES)

    # Preprocessed signals used only for artifact scoring (marker_id -> (x, signal)).
    m1_bc = {}
    for mid, (mx, ch) in MARKER_MAP.items():
        # `mid > 4` is redundant with `mx != 1` for the current MARKER_MAP.
        if mx != 1 or mid > 4:
            continue
        x_f, bc = _preprocess_for_scoring(raw_x1, m1_corr_channels[ch], X_MIN_ALLELES)
        m1_bc[mid] = (x_f, bc)
    l1_bc = _preprocess_for_scoring(raw_x1, df1["channel_5"].to_numpy(), X_MIN_LADDER)
    
    m2_bc = {}
    for mid, (mx, ch) in MARKER_MAP.items():
        if mx != 2 or mid < 5:
            continue
        x_f, bc = _preprocess_for_scoring(raw_x2, m2_corr_channels[ch], X_MIN_ALLELES)
        m2_bc[mid] = (x_f, bc)
    l2_bc = _preprocess_for_scoring(raw_x2, df2["channel_5"].to_numpy(), X_MIN_LADDER)
    
    # Peak detection on the bleed-corrected channels, positions in calibrated bp.
    marker_peaks = {}
    for marker_id, (mx, ch) in MARKER_MAP.items():
        if mx == 1:
            y = m1_corr_channels[ch]
            marker_peaks[marker_id] = detect_peaks_subpixel(raw_x1, y, interp1)
        else:
            y = m2_corr_channels[ch]
            marker_peaks[marker_id] = detect_peaks_subpixel(raw_x2, y, interp2)

    # Cross-channel pull-up suppression.
    marker_peaks = suppress_crosstalk(marker_peaks)


    # Per marker: panel snap -> artifact scoring -> protected stutter filter -> allele call.
    calls = {}
    for marker_id in range(1, 9):
        mapped = snap_to_panel(marker_peaks.get(marker_id, []), panel.get(marker_id, []))

        if marker_id <= 4:
            mapped = _compute_scores_for_mapped(marker_id, mapped, m1_bc, l1_bc, bp2raw=inv1)
        else:
            mapped = _compute_scores_for_mapped(marker_id, mapped, m2_bc, l2_bc, bp2raw=inv2)

        mapped = filter_stutters_panel_protected(mapped)
        allele_count, a1, a2 = call_alleles(mapped)
        calls[marker_id] = (allele_count, a1, a2)

    return {"plant_id": plant_id, "calls": calls}



def generate_plot(plant_id: int, df: pd.DataFrame, interp: PchipInterpolator, out_path: Path):
    """Save a PNG overlay of all five channels of one run on the calibrated bp axis.

    Channels are plotted as stored in ``df`` (no baseline or bleed correction). The
    x axis is limited to [0, X_MAX]. Parent directories of ``out_path`` are created.
    """
    calibrated_bp = interp(df["molw"].to_numpy(dtype=float))
    fig, ax = plt.subplots(figsize=(12, 6))
    for k in range(1, 5):
        ax.plot(calibrated_bp, df[f"channel_{k}"], label=f"Ch{k}")
    ax.plot(calibrated_bp, df["channel_5"], label="Ladder (Ch5)", alpha=0.8)
    ax.set_title(f"Plant {plant_id} - Multiplex (calibrated)")
    ax.set_xlabel("True size (bp)")
    ax.set_ylabel("RFU")
    ax.set_xlim(0, X_MAX)
    ax.legend()

    out_path.parent.mkdir(parents=True, exist_ok=True)
    fig.tight_layout()
    fig.savefig(out_path, dpi=140)
    plt.close(fig)

## Run classifier + optional validation

In [18]:
def main():
    """Run the full allele-calling pipeline and write the wide and long CSV tables.

    Builds the master panel from LABEL_DIR, lists the plants that have an M1 file
    in ZIP_PATH, and processes each plant with ``_worker_process_plant`` (in a
    process pool when RUN_PARALLEL is set). It then writes
    ``miRNA_allele_calls_wide.csv`` and ``miRNA_allele_calls_long.csv`` under
    TABLES_DIR. Single-allele calls are duplicated as (A, A) when
    DUPLICATE_HOMOZYGOUS is set, and counted as 2 alleles when
    HOMO_AS_TWO_ALLELES is also set.

    NOTE(review): PLOTS_DIR is created but GENERATE_PLOTS / generate_plot are not
    used here — confirm whether plotting was intended.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    TABLES_DIR.mkdir(parents=True, exist_ok=True)
    PLOTS_DIR.mkdir(parents=True, exist_ok=True)

    panel = load_and_build_panel(LABEL_DIR)

    plants = list_plants_in_zip(ZIP_PATH)
    print(f"Encontradas {len(plants)} plantas no zip.")

    results = []
    if RUN_PARALLEL:
        with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as ex:
            futs = [ex.submit(_worker_process_plant, pid, panel) for pid in plants]
            with tqdm(total=len(futs), desc="Chamando alelos (EC)", unit="planta") as pbar:
                for f in concurrent.futures.as_completed(futs):
                    results.append(f.result())
                    pbar.update(1)
    else:
        for pid in tqdm(plants, desc="Chamando alelos (EC)", unit="planta"):
            results.append(_worker_process_plant(pid, panel))

    # as_completed yields in completion order: restore plant order.
    results = sorted(results, key=lambda d: d["plant_id"])

    # Wide table: one row per plant, one column group per marker.
    rows = []
    for r in results:
        row = {"plant_id": int(r["plant_id"])}
        for m_id in range(1, 9):
            allele_count, a1, a2 = r["calls"][m_id]
            prefix = f"miR{m_id}"
            if DUPLICATE_HOMOZYGOUS and (a1 is not None) and (a2 is None) and (allele_count == 1):
                a2 = a1
                if HOMO_AS_TWO_ALLELES:
                    allele_count = 2

            row[f"{prefix}_allele_count"] = int(allele_count)
            row[f"{prefix}_a1_size"] = np.nan if a1 is None else float(a1.allele_size)
            row[f"{prefix}_a2_size"] = np.nan if a2 is None else float(a2.allele_size)
            # NOTE(review): despite the "_raw_pos" suffix these columns hold the
            # calibrated position in bp (calibrated_pos), not MappedPeak.raw_pos.
            row[f"{prefix}_a1_raw_pos"] = np.nan if a1 is None else float(a1.calibrated_pos)
            row[f"{prefix}_a2_raw_pos"] = np.nan if a2 is None else float(a2.calibrated_pos)
            row[f"{prefix}_a1_prom"] = np.nan if a1 is None else float(a1.prom)
            row[f"{prefix}_a2_prom"] = np.nan if a2 is None else float(a2.prom)
            row[f"{prefix}_present"] = int(allele_count > 0)
        rows.append(row)

    wide = pd.DataFrame(rows)
    wide.to_csv(TABLES_DIR / "miRNA_allele_calls_wide.csv", index=False)

    # Long table: one row per called allele; a single rank-0 row with NaNs when no allele was called.
    long_rows = []
    for r in results:
        pid = int(r["plant_id"])
        for m_id in range(1, 9):
            allele_count, a1, a2 = r["calls"][m_id]
            if allele_count == 0:
                long_rows.append({"plant_id": pid, "marker_id": m_id, "allele_rank": 0,
                                  "allele_size": np.nan, "calibrated_pos": np.nan, "prom": np.nan, "height": np.nan})
            else:
                if DUPLICATE_HOMOZYGOUS and (a1 is not None) and (a2 is None) and (allele_count == 1):
                    a2 = a1
                    if HOMO_AS_TWO_ALLELES:
                        allele_count = 2

                for rank, a in [(1, a1), (2, a2)]:
                    if a is None: 
                        continue
                    long_rows.append({"plant_id": pid, "marker_id": m_id, "allele_rank": rank,
                                      "allele_size": float(a.allele_size),
                                      "calibrated_pos": float(a.calibrated_pos),
                                      "prom": float(a.prom),
                                      "height": float(a.height)})
    long_df = pd.DataFrame(long_rows)
    long_df.to_csv(TABLES_DIR / "miRNA_allele_calls_long.csv", index=False)

    print(" Classification ended.")
    print("Saídas:")
    print(" -", TABLES_DIR / "miRNA_allele_calls_wide.csv")
    print(" -", TABLES_DIR / "miRNA_allele_calls_long.csv")

# PREPROCESSING FOR  SCORE (anti-artefacts)
def _preprocess_for_scoring(raw_x: np.ndarray, y: np.ndarray, x_min: float) -> Tuple[np.ndarray, np.ndarray]:
    """Crop to [x_min, X_MAX], smooth, subtract an AsLS baseline and clip at zero.

    The baseline stiffness is lam=8e5 when x_min >= 100 (allele windows) and 5e5
    otherwise (the ladder window starts lower). Returns (cropped raw axis, corrected signal).
    """
    keep = (raw_x >= x_min) & (raw_x <= X_MAX)
    axis = raw_x[keep].astype(float)
    smoothed = safe_savgol(y[keep].astype(float), SMOOTH_WINDOW, SMOOTH_POLYORDER)
    stiffness = 8e5 if x_min >= 100 else 5e5
    corrected = np.clip(smoothed - baseline_als(smoothed, lam=stiffness, p=0.001, niter=10), 0, None)
    return axis, corrected

def _local_max(x: np.ndarray, sig_bc: np.ndarray, raw_pos: float, w: int = MULTI_WINDOW_SAMPLES) -> float:
    """Largest value of ``sig_bc`` within +/- w samples of the sample nearest ``raw_pos``.

    ``x`` is the axis ``sig_bc`` is sampled on. Returns 0.0 for an empty axis.
    """
    if len(x) == 0:
        return 0.0
    centre = int(np.argmin(np.abs(x - raw_pos)))
    lo, hi = max(0, centre - w), min(len(sig_bc), centre + w + 1)
    if hi <= lo:
        return float(sig_bc[centre])
    return float(np.max(sig_bc[lo:hi]))


def _compute_scores_for_mapped(marker_id: int,
                              mapped: List[MappedPeak],
                              sig_bc_channels: Dict[int, Tuple[np.ndarray, np.ndarray]],
                              ladder_bc: Tuple[np.ndarray, np.ndarray],
                              bp2raw: Optional[PchipInterpolator] = None) -> List[MappedPeak]:
    if not mapped:
        return mapped

    lx, lbc = ladder_bc
    ladder_ref = float(np.quantile(lbc, LADDER_STRONG_Q)) + 1e-9

    def raw_at_bp(bp: float, fallback_raw: float) -> float:
        if bp2raw is None:
            return fallback_raw
        try:
            v = float(bp2raw(bp))
            if math.isfinite(v):
                return v
        except Exception:
            pass
        return fallback_raw

    for mp in mapped:
        rpos = raw_at_bp(mp.allele_size, mp.raw_pos)

        x_main, bc_main = sig_bc_channels[marker_id]
        main = _local_max(x_main, bc_main, rpos)

        other = 0.0
        for mid in sig_bc_channels.keys():
            if mid == marker_id:
                continue
            xo, bco = sig_bc_channels[mid]
            other += _local_max(xo, bco, rpos)

        multi_ratio = other / (main + 1e-9)

        lad = _local_max(lx, lbc, rpos)
        near_liz = float(np.min(np.abs(LIZ_500_SIZES - mp.allele_size))) <= LADDER_MATCH_PB
        ladder_strong = (lad / ladder_ref) >= 0.5
        ladder_ratio = main / (lad + 1e-9)

        flags = []
        score = float(mp.prom)

        if marker_id == 2 and abs(mp.allele_size - 250.0) <= LADDER_MATCH_PB and ladder_strong and ladder_ratio < 0.20:
            mp.score = 0.0
            mp.multi_ratio = float(multi_ratio)
            mp.ladder_ratio = float(ladder_ratio)
            mp.flags = "BLACKLIST250"
            continue

        if marker_id == 2 and abs(mp.allele_size - 148.0) <= LADDER_MATCH_PB and multi_ratio >= 0.25:
            mp.score = 0.0
            mp.multi_ratio = float(multi_ratio)
            mp.ladder_ratio = float(ladder_ratio)
            mp.flags = "BLACKLIST148"
            continue

        # multichannel penalisation
        if multi_ratio > MULTI_RATIO_MAX:
            flags.append("MULTI")
            score *= max(0.02, 1.0 - min(1.0, multi_ratio))
            if multi_ratio >= MULTI_RATIO_HARD:
                score *= 0.05

        # penalisation ladder bleed (general)
        if near_liz and ladder_strong and ladder_ratio < LADDER_BLEED_RATIO_MAX:
            flags.append("LADDER_BLEED")
            score *= 0.03

        mp.score = float(score)
        mp.multi_ratio = float(multi_ratio)
        mp.ladder_ratio = float(ladder_ratio)
        mp.flags = ",".join(flags)

    return mapped


    # reference ladder 
    lx, lbc = ladder_bc
    ladder_ref = float(np.quantile(lbc, LADDER_STRONG_Q)) + 1e-9

    for mp in mapped:
        x_main, bc_main = sig_bc_channels[marker_id]
        main = _local_max(x_main, bc_main, mp.raw_pos)
        other = 0.0
        for mid in sig_bc_channels.keys():
            if mid == marker_id:
                continue
            xo, bco = sig_bc_channels[mid]
            other += _local_max(xo, bco, mp.raw_pos)

        multi_ratio = other / (main + 1e-9)

        lad = _local_max(lx, lbc, mp.raw_pos)
        near_liz = float(np.min(np.abs(LIZ_500_SIZES - mp.allele_size))) <= LADDER_MATCH_PB
        ladder_strong = (lad / ladder_ref) >= 0.5  
        ladder_ratio = main / (lad + 1e-9)

        flags = []
        score = float(mp.prom)

        if multi_ratio > MULTI_RATIO_MAX:
            flags.append("MULTI")
            score *= max(0.05, 1.0 - min(1.0, multi_ratio))  # penaliza progressivo
            if multi_ratio >= MULTI_RATIO_HARD:
                score *= 0.10

        if near_liz and ladder_strong and ladder_ratio < LADDER_BLEED_RATIO_MAX:
            flags.append("LADDER_BLEED")
            score *= 0.05

        mp.score = float(score)
        mp.multi_ratio = float(multi_ratio)
        mp.ladder_ratio = float(ladder_ratio)
        mp.flags = ",".join(flags)

    return mapped

if __name__ == "__main__":
    # Entry point: run the full allele-calling pipeline.
    main()

Encontradas 384 plantas no zip.


Chamando alelos (EC): 100%|█████████████████| 384/384 [03:30<00:00,  1.83planta/s]

✅ Classificação concluída.
Saídas:
 - outputs_v20_4/tables/miRNA_allele_calls_wide.csv
 - outputs_v20_4/tables/miRNA_allele_calls_long.csv



