In [1]:
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from preprocess import (
    load_tdms_first_channel, PreParams, prefilter_pipeline, time_axis,
    equi_ripple_highpass_fir, equi_ripple_lowpass_fir, fir_freq_response,
    detect_r_peaks, PeakParams, qc_metrics
)

TDMS_PATH = r"E:\ML algoritme tl anfaldsdetektion vha HRV\ePatch data from Aarhus to Lausanne\Patients ePatch data\Patient 5\recording 1\Patient 5_1.tdms"  # <-- SÆT DENNE
FS_OVERRIDE = None  # fx 512.0 hvis wf_increment mangler

P = PreParams(
    drop_start_s = 300.0,
    drop_end_s = 300.0,
    target_fs = 512.0,  # resample hvis ønsket
    hp_pass_hz = 30.0,  # baseline
    hp_stop_hz = 1.0,
    lp_pass_hz = 32.0,  # QRS-fokus
    lp_stop_hz = 50.0
)

In [2]:
out = load_tdms_first_channel(TDMS_PATH)
x = out["signal"]
x_raw = x[:1000000]
fs = out["fs"]
t0 = out["start_time"]

print(f"Signal: {len(x_raw)} samples, fs={fs:.2f} Hz, starttid={t0}")

Signal: 1000000 samples, fs=512.00 Hz, starttid=2016-10-12T09:05:02.000000


In [3]:
# eksempel-brug
from preprocess import prefilter_pipeline, PreParams
from rpeak_jeppesen import JeppesenRPeak, RPeakParams

# 1) preprocess
stages = prefilter_pipeline(x_raw, fs, P)
x_sm = stages["smoothed"]
fsr   = float(stages["_fs"][0])


In [4]:

# 2) R-peak
rp = RPeakParams(fs=fsr)


In [5]:
print(rp)

RPeakParams(fs=512.0, win_s=2.0, refractory_s=0.25, thigh_scale=0.75, thigh_peaks=8, tlow_scale_posmean=2.0, tlow_max_frac=0.4, theta_samples_cut=35.0, s2_high=12, s2_low=10, search_30samples_ref_512=30, rshort_n=8, rrlong_n=34, rmax_cap_s=1.2)


In [6]:
# det = JeppesenRPeak(rp).detect(x_sm)

# rpeaks = det["peaks_idx"]
# rr     = det["rr_s"]


In [7]:
import numpy as np
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple, Optional

# ---------- Utils ----------

def _clean_1d(x: np.ndarray) -> np.ndarray:
    x = np.asarray(x, float).ravel()
    bad = ~np.isfinite(x)
    if bad.any():
        idx = np.arange(x.size)
        x[bad] = np.interp(idx[bad], idx[~bad], x[~bad])
    return x

def _assert_1d(x: np.ndarray, name="x"):
    if x.ndim != 1:
        raise ValueError(f"{name} must be 1-D, got shape {x.shape}")

def _local_max_idx(x: np.ndarray, i0: int, radius: int) -> int:
    st = max(0, i0 - radius)
    en = min(x.size, i0 + radius + 1)
    if en <= st + 1:
        return i0
    return st + int(np.argmax(x[st:en]))

# ---------- Parametre ----------

@dataclass
class RPeakParams:
    fs: float                   # Hz (efter preprocess/resample)
    win_s: float = 2.0          # vindueslængde til Thigh/Tlow
    refractory_s: float = 0.25  # min RR
    thigh_scale: float = 0.75
    thigh_topN: int = 8         # antal top-lokalmaks til Thigh-estimat
    tlow_max_frac: float = 0.40
    # Variabilitet
    theta_cut_samples_512: float = 35.0  # svarer til ~68 ms ved 512 Hz
    s2_high: int = 12
    s2_low: int = 10
    # Searchpoint
    search_window_30samples_512: int = 30
    rshort_n: int = 8
    rrlong_n: int = 34
    rmax_cap_s: float = 1.2
    # Lokalisering
    localise_radius_s: float = 0.05  # ±50 ms

@dataclass
class RPeakDebugStats:
    steps: int = 0
    peaks_found: int = 0
    used_thigh: int = 0
    used_tlow: int = 0
    used_failsafe: int = 0
    overdue_count: int = 0
    refractory_skips: int = 0
    bound_clamps: int = 0

# ---------- Detektor ----------

class JeppesenRPeakDebug:
    """
    Debug-venlig Jeppesen-lignende R-peak detektor.
    - defensive checks
    - step_limit for at undgå kernel hang
    - debug_fn(event, **data) callback eller print(debug=True)
    """
    def __init__(self, p: RPeakParams):
        self.p = p
        self.stats = RPeakDebugStats()

    # --------- hjælpefunktioner ---------
    def _window_edges(self, N: int) -> List[Tuple[int, int]]:
        L = max(1, int(round(self.p.win_s * self.p.fs)))
        return [(i, min(N, i + L)) for i in range(0, N, L)]

    def _top_local_max_in_window(self, seg: np.ndarray, topN: int) -> float:
        # robust topN: brug simple "block maxima" via stride for O(N)
        if seg.size == 0:
            return 0.0
        # grov lokalmaks: sammenlign naboer
        idx = np.arange(1, seg.size-1)
        loc = idx[(seg[idx] >= seg[idx-1]) & (seg[idx] >= seg[idx+1])]
        vals = seg[loc] if loc.size else np.array([seg.max()])
        vals = np.sort(vals)
        vals = vals[-min(topN, vals.size):]
        return float(np.median(vals)) if vals.size else float(seg.max())

    def _thigh_series(self, x: np.ndarray) -> np.ndarray:
        thigh = np.zeros_like(x, float)
        for st, en in self._window_edges(x.size):
            T = self.p.thigh_scale * self._top_local_max_in_window(x[st:en], self.p.thigh_topN)
            thigh[st:en] = T
        return thigh

    def _variability_mode(self, rr_s: List[float]) -> str:
        if len(rr_s) < self.p.rrlong_n:
            return "low"
        rr = np.asarray(rr_s[-self.p.rrlong_n:], float)
        med = np.median(rr)
        dev = np.abs(rr - med)
        if dev.size >= 2:
            # fjern to største
            order = np.argsort(dev)
            mask = np.ones_like(dev, dtype=bool)
            mask[order[-2:]] = False
            base = dev[mask] if mask.any() else dev
        else:
            base = dev
        theta = float(np.mean(base)) if base.size else 0.0
        theta_samples = theta * self.p.fs
        cut = self.p.theta_cut_samples_512 * (self.p.fs / 512.0)
        return "high" if theta_samples > cut else "low"

    def _tlow_series(self, x: np.ndarray, thigh: np.ndarray, peaks_idx: List[int]) -> np.ndarray:
        tlow = np.zeros_like(x, float)
        peaks = np.asarray(peaks_idx, int)
        for wi, (st, en) in enumerate(self._window_edges(x.size)):
            # to foregående vinduer: [st2 : st)
            edges = self._window_edges(x.size)
            st2 = edges[max(0, wi-2)][0]
            seg = x[st2:st]
            pos = seg[seg > 0.0]
            mu_pos = float(pos.mean()) if pos.size else 0.0
            s1 = int(((peaks >= st2) & (peaks < st)).sum()) if peaks.size else 0
            mode = self._variability_mode(np.diff(peaks)/self.p.fs if peaks.size >= 2 else [])
            s2 = self.p.s2_high if mode == "high" else self.p.s2_low
            T = 2.0 * mu_pos * (s1 / max(1, s2))
            # clamp til 0.4*Thigh (brug lokal middel for robusthed)
            cap = self.p.tlow_max_frac * (float(thigh[st:en].mean()) if en > st else 0.0)
            tlow[st:en] = min(T, cap)
        return tlow

    def _rmax_samp(self, rr_s: List[float], mode: str) -> int:
        if not rr_s:
            return int(round(0.8 * self.p.fs))
        rr = np.asarray(rr_s, float)
        rr_long = np.median(rr[-self.p.rrlong_n:])
        if mode == "high":
            rr_short = np.median(rr[-self.p.rshort_n:])
            rr_temp = min(rr_short, rr_short)  # proxy for searchback
        else:
            rr_temp = rr_long
        return int(round(min(1.2 * rr_temp, self.p.rmax_cap_s) * self.p.fs))

    def _searchpoint(self, last_idx: int, rr_s: List[float]) -> int:
        fs = self.p.fs
        if not rr_s:
            return last_idx + int(0.8 * fs)
        rr_short = np.median(rr_s[-self.p.rshort_n:])
        rr_samples = int(round(rr_short * fs))
        thr350 = int(round(350 * fs / 512.0))
        thr467 = int(round(467 * fs / 512.0))
        if rr_samples < thr350:
            return last_idx + rr_samples
        elif rr_samples < thr467:
            return last_idx + thr350
        else:
            return last_idx + int(round(0.75 * rr_samples))

    # --------- main ----------
    def detect(self,
               x_in: np.ndarray,
               debug: bool = False,
               debug_fn: Optional[Callable[[str, Dict], None]] = None,
               step_limit: int = 5_000_000) -> Dict[str, np.ndarray]:

        x = _clean_1d(x_in)
        _assert_1d(x, "x")
        N = x.size
        if N < 10:
            return {"peaks_idx": np.array([], int), "rr_s": np.array([], float),
                    "Thigh": np.zeros(N), "Tlow": np.zeros(N), "stats": self.stats}

        fs = self.p.fs
        min_rr = int(round(self.p.refractory_s * fs))
        radius = max(1, int(round(self.p.localise_radius_s * fs)))
        thirty = int(round(self.p.search_window_30samples_512 * (fs / 512.0)))

        peaks: List[int] = []
        rr_s: List[float] = []

        # Forudberegn tærskler (kan recomputes løbende ved behov)
        thigh = self._thigh_series(x)
        tlow  = self._tlow_series(x, thigh, peaks)

        i = 0
        while i < N:
            self.stats.steps += 1
            if self.stats.steps > step_limit:
                raise RuntimeError(f"Step limit {step_limit} exceeded (possible infinite loop).")

            # refraktær
            if peaks and (i - peaks[-1]) < min_rr:
                self.stats.refractory_skips += 1
                i = peaks[-1] + min_rr
                continue

            mode = self._variability_mode(rr_s)
            rmax = self._rmax_samp(rr_s, mode)
            overdue = (not peaks) or ((i - peaks[-1]) >= rmax)
            if overdue:
                self.stats.overdue_count += 1

            # primær: søg omkring searchpoint ift Thigh
            found = False
            if not overdue and peaks:
                sp = self._searchpoint(peaks[-1], rr_s)
                # venstre/højre skæringer mod Thigh
                left_i  = max(0, sp - thirty)
                right_i = min(N - 1, sp + thirty)
                if right_i <= left_i:
                    self.stats.bound_clamps += 1
                cand = []
                try:
                    lv = np.where(x[left_i:sp] >= thigh[left_i:sp])[0]
                    if lv.size: cand.append(left_i + lv[-1])
                    rv = np.where(x[sp:right_i+1] >= thigh[sp:right_i+1])[0]
                    if rv.size: cand.append(sp + rv[0])
                except Exception as e:
                    if debug or debug_fn:
                        msg = {"err": str(e), "i": i, "sp": sp, "left_i": left_i, "right_i": right_i, "N": N}
                        (debug_fn or print)("thigh_search_error", msg)
                    cand = []

                if cand:
                    # begge indenfor 30 samples → vælg højeste lokalmaks
                    if len(cand) == 2 and abs(cand[1] - cand[0]) <= thirty:
                        c0 = _local_max_idx(x, cand[0], radius)
                        c1 = _local_max_idx(x, cand[1], radius)
                        ci = c0 if x[c0] >= x[c1] else c1
                    else:
                        # vælg bedste af (lokalmaks ved hver kandidat)
                        best, best_amp = None, -np.inf
                        for c in cand:
                            cc = _local_max_idx(x, c, radius)
                            amp = x[cc]
                            if amp > best_amp:
                                best_amp, best = amp, cc
                        ci = best

                    if ci is not None and ((not peaks) or (ci - peaks[-1] >= min_rr)):
                        peaks.append(ci)
                        if len(peaks) >= 2:
                            rr_s.append((peaks[-1] - peaks[-2]) / fs)
                        tlow = self._tlow_series(x, thigh, peaks)  # opdater
                        self.stats.peaks_found += 1
                        self.stats.used_thigh += 1
                        if debug or debug_fn:
                            (debug_fn or print)("peak_thigh", {"i": i, "ci": ci, "amp": float(x[ci])})
                        i = ci + min_rr
                        found = True

            if found:
                continue

            # sekundær: Tlow/searchback eller init
            if (peaks and (i - peaks[-1]) >= rmax) or not peaks:
                try:
                    rel = np.where(x[i:] >= tlow[i:])[0]
                except Exception as e:
                    if debug or debug_fn:
                        (debug_fn or print)("tlow_search_error", {"err": str(e), "i": i, "N": N})
                    rel = np.array([], int)

                if rel.size:
                    j = i + rel[0]
                    ci = _local_max_idx(x, j, radius)
                    if (not peaks) or (ci - peaks[-1] >= min_rr):
                        peaks.append(ci)
                        if len(peaks) >= 2:
                            rr_s.append((peaks[-1] - peaks[-2]) / fs)
                        tlow = self._tlow_series(x, thigh, peaks)
                        self.stats.peaks_found += 1
                        self.stats.used_tlow += 1
                        if debug or debug_fn:
                            (debug_fn or print)("peak_tlow", {"i": i, "ci": ci, "amp": float(x[ci])})
                        i = ci + min_rr
                        continue

                # failsafe: 2 s efter sidste peak
                if peaks and (i - peaks[-1]) >= int(round(2.0 * fs)):
                    last = peaks[-1]
                    # start ~min(RR_short, 1.2 s)
                    if rr_s:
                        rrs = min(np.median(rr_s[-self.p.rshort_n:]), self.p.rmax_cap_s)
                    else:
                        rrs = 0.8
                    j0 = last + int(round(rrs * fs))
                    j1 = last + int(round((self.p.refractory_s + 2.0) * fs))
                    j0 = max(0, j0); j1 = min(N, j1)
                    if j1 > j0:
                        ci = j0 + int(np.argmax(x[j0:j1]))
                        if (not peaks) or (ci - peaks[-1] >= min_rr):
                            peaks.append(ci)
                            if len(peaks) >= 2:
                                rr_s.append((peaks[-1] - peaks[-2]) / fs)
                            tlow = self._tlow_series(x, thigh, peaks)
                            self.stats.peaks_found += 1
                            self.stats.used_failsafe += 1
                            if debug or debug_fn:
                                (debug_fn or print)("peak_failsafe", {"i": i, "ci": ci, "amp": float(x[ci])})
                            i = ci + min_rr
                            continue

            # fremad
            i += 1

        return {
            "peaks_idx": np.asarray(peaks, int),
            "rr_s": np.asarray(rr_s, float),
            "Thigh": thigh,
            "Tlow": tlow,
            "stats": self.stats,
        }


In [8]:
# x_sm = stages["smoothed"]; fsr = float(stages["_fs"][0])
p = RPeakParams(fs=fsr)


In [9]:

def dbg(event, data):
    # skriv kort log for spæde fejl
    if event.endswith("_error"):
        print(event, data)
    # evt. logge hvert 1000. step eller kun peaks:
    if event.startswith("peak_"):
        print(event, data)


In [13]:

# det = JeppesenRPeakDebug(p).detect(x_sm, debug=False, debug_fn=dbg, step_limit=2_000_000)
det = JeppesenRPeak(p).detect(x_sm)
print(det["stats"])
print("Peaks:", det["peaks_idx"][:10], "...", det["peaks_idx"][-10:])


AttributeError: 'RPeakParams' object has no attribute 'thigh_peaks'