In [6]:
import os
import numpy as np
import re

# --- GT mapping for conditions ---
CONDITION_GT = {
    'A': {'name': "Healthy rest", 'RR': 14,  'HR': 80},
    'B': {'name': "Healthy sleeping", 'RR': 12, 'HR': 60},
    'C': {'name': "Healthy agitated", 'RR': 20, 'HR': 110},
    'D': {'name': "Apnea", 'RR': 0, 'HR': 80},
    'E': {'name': "Asthma", 'RR': 22, 'HR': 90},
    'F': {'name': "Asthma attack", 'RR': 33, 'HR': 130},
    'G': {'name': "pneumothorax", 'RR': 30, 'HR': 120},
    'H': {'name': "Bradypnea", 'RR': 6, 'HR': 50},
    'I': {'name': "Cardiac arrest", 'RR': 16, 'HR': 160},
    'J': {'name': "ACS Chest Pain", 'RR': 18, 'HR': 104},
    'K': {'name': "Tachynpea Hyperventilation", 'RR': 30, 'HR': 125}
}

# --- 1. Filename Metadata Parser ---
def parse_iwr6843_filename(filename):
    basename = os.path.basename(filename)
    pattern = (
        r'(?P<id>\d+)_.*_Man\d+_'
        r'Cond(?P<cond_a>[A-Z])_Cond(?P<cond_b>[A-Z])_Cond(?P<cond_c>[A-Z])_'
        r'ADC(?P<adc>\d+)_Chirp(?P<chirp>\d+)_SR(?P<sr>\d+)_RE(?P<re>\d+)_FR(?P<fr>\d+)_Gain(?P<gain>\d+)_FS(?P<fs>\d+)_IWR'
    )
    m = re.match(pattern, basename)
    if m:
        params = {k: (int(v) if v.isdigit() else v) for k, v in m.groupdict().items()}
        return params
    else:
        raise ValueError(f"Filename {basename} does not match expected pattern")

# --- 2. Data Decoder Generalized ---
def decode_iwr6843_data(filepath, params, num_rx=4, header_bytes=0):
    num_adc_samples = int(params['adc'])
    num_chirps      = int(params['chirp'])

    with open(filepath, 'rb') as f:
        f.seek(header_bytes)
        raw = np.fromfile(f, dtype=np.int16)

    # Ensure I/Q pairing
    if raw.size % 2 != 0:
        raw = raw[:-1]

    raw = raw.reshape(-1, 2)
    complex_data = raw[:, 0].astype(np.float32) + 1j * raw[:, 1].astype(np.float32)

    samp_pf    = num_rx * num_chirps * num_adc_samples
    num_frames = complex_data.size // samp_pf
    if num_frames == 0:
        print(f"Warning: No frames found for {filepath}. Check ADC/Chirp parameters.")
        return None

    complex_data = complex_data[: num_frames * samp_pf]

    adc = np.empty((num_frames, num_rx, num_chirps, num_adc_samples), dtype=np.complex64)
    for fr in range(num_frames):
        base = fr * samp_pf
        for rx in range(num_rx):
            st = base + rx * (num_chirps * num_adc_samples)
            en = st + (num_chirps * num_adc_samples)
            adc[fr, rx] = complex_data[st:en].reshape(num_chirps, num_adc_samples)
    return adc


# --- 3. RD Spectrogram Generator ---
def generate_rd_spectrogram(adc_tensor):
    """
    adc_tensor: (F, RX, C_chirps, S_samples)
    returns rd: (F, RX, C_doppler, S_range)
    """
    if adc_tensor is None:
        return None
    range_fft = np.fft.fft(adc_tensor, axis=-1)        # range (samples -> bins)
    rd = np.fft.fftshift(np.fft.fft(range_fft, axis=2), axes=2)  # doppler (chirps -> bins)
    return rd

# --- 4. Simple Mask-Based Separation (Spatial Slicing) ---
def mannequin_mask(rd, mannequin_idx, num_mannequins=3):
    """
    Slice RD by RANGE bins into equal contiguous blocks.
    rd: (F, RX, C_doppler, S_range)
    """
    if rd is None:
        return None
    num_range_bins = rd.shape[-1]
    bins_per_man = num_range_bins // num_mannequins
    start = mannequin_idx * bins_per_man
    end   = (mannequin_idx + 1) * bins_per_man if mannequin_idx < num_mannequins - 1 else num_range_bins

    mask = np.zeros_like(rd, dtype=np.float32)
    mask[..., start:end] = 1.0
    return rd * mask

# --- 5. HR/RR Estimation (FFT on slow time) ---
def estimate_hr_rr(rd_masked, params):
    """
    Estimate RR/HR from slow-time (frame) variation of RD energy.

    rd_masked: shape (F, RX, C, S)
    params['fr']: frame period in ms (from filename, e.g., FR40 => 40 ms)
    """
    if rd_masked is None:
        return None, None

    # Collapse RX, Doppler, Range -> 1D time series over frames
    # axes: (F, RX, C, S) -> mean over (1,2,3) => (F,)
    time_series = np.abs(rd_masked).mean(axis=(1, 2, 3))

    if time_series.size < 4:
        return None, None

    # Sample spacing in seconds between frames
    dt = float(params['fr']) / 1000.0

    # One-sided spectrum (non-negative freqs)
    ts_fft = np.fft.rfft(time_series)
    freqs  = np.fft.rfftfreq(time_series.size, d=dt)

    # Physiological bands (Hz)
    rr_band = (0.10, 1.00)   # 6–60 bpm
    hr_band = (1.00, 3.00)   # 60–180 bpm

    rr_mask = (freqs >= rr_band[0]) & (freqs <= rr_band[1])
    hr_mask = (freqs >= hr_band[0]) & (freqs <= hr_band[1])

    RR = None
    HR = None

    if np.any(rr_mask):
        rr_idx  = np.argmax(np.abs(ts_fft[rr_mask]))
        rr_peak = freqs[rr_mask][rr_idx]
        RR = rr_peak * 60.0 if rr_peak > 0 else None

    if np.any(hr_mask):
        hr_idx  = np.argmax(np.abs(ts_fft[hr_mask]))
        hr_peak = freqs[hr_mask][hr_idx]
        HR = hr_peak * 60.0 if hr_peak > 0 else None

    return RR, HR

# --- 6. Metrics ---
def compute_metrics(results):
    if not results:
        return {"RR_MAE": None, "RR_RMSE": None, "HR_MAE": None, "HR_RMSE": None}
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    rr_est = [r['estimated_RR'] for r in results if r['gt_RR'] is not None and r['estimated_RR'] is not None]
    rr_gt  = [r['gt_RR'] for r in results if r['gt_RR'] is not None and r['estimated_RR'] is not None]
    hr_est = [r['estimated_HR'] for r in results if r['gt_HR'] is not None and r['estimated_HR'] is not None]
    hr_gt  = [r['gt_HR'] for r in results if r['gt_HR'] is not None and r['estimated_HR'] is not None]

    metrics = {
        "RR_MAE": mean_absolute_error(rr_gt, rr_est) if rr_gt else None,
        "RR_RMSE": np.sqrt(mean_squared_error(rr_gt, rr_est)) if rr_gt else None,
        "HR_MAE": mean_absolute_error(hr_gt, hr_est) if hr_gt else None,
        "HR_RMSE": np.sqrt(mean_squared_error(hr_gt, hr_est)) if hr_gt else None,
    }
    return metrics

# --- 7. Pipeline Runner ---
def run_pipeline(folder):
    files = [f for f in os.listdir(folder) if f.endswith('.bin') and '_IWR' in f]
    print(f"Found {len(files)} files in {folder}: {files}")
    if not files:
        print("WARNING: No radar files found. Check your folder path and file naming.")
    results = []
    for fname in files:
        fpath = os.path.join(folder, fname)
        try:
            params = parse_iwr6843_filename(fpath)
        except Exception as e:
            print(f"Skipping file {fname}: {e}")
            continue
        adc = decode_iwr6843_data(fpath, params)
        rd = generate_rd_spectrogram(adc)
        for mi in range(3):  # mannequins
            cond_key = f'cond_{chr(97+mi)}'
            cond_code = params[cond_key]
            gt = CONDITION_GT.get(cond_code)
            rd_mask = mannequin_mask(rd, mi)
            RR, HR = estimate_hr_rr(rd_mask, params)
            rr_err = abs(RR - gt['RR']) if (RR is not None and gt and gt.get('RR') is not None) else None
            hr_err = abs(HR - gt['HR']) if (HR is not None and gt and gt.get('HR') is not None) else None
            results.append({
                'file': fname,
                'mannequin': mi,
                'condition': cond_code,
                'condition_name': gt['name'] if gt else None,
                'estimated_RR': RR,
                'estimated_HR': HR,
                'gt_RR': gt['RR'] if gt else None,
                'gt_HR': gt['HR'] if gt else None,
                'RR_err': rr_err,
                'HR_err': hr_err
            })
    metrics = compute_metrics(results)
    print("\n--- Validation Metrics ---")
    for k, v in metrics.items():
        print(f"{k}: {v:.2f}" if v is not None else f"{k}: N/A")
    return results, metrics

# --- Optional: save CSV ---
def save_results_csv(results, out_path="results.csv"):
    import csv
    if not results:
        print("No results to save.")
        return
    keys = results[0].keys()
    with open(out_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=keys)
        writer.writeheader()
        writer.writerows(results)

# --- Example usage ---
if __name__ == "__main__":
    folder = "SubsetG"
    res, metrics = run_pipeline(folder)
    save_results_csv(res)
    for r in res:
        print(r)

Found 26 files in SubsetG: ['272_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp64_SR1000_RE80_FR40_Gain40_FS60_IWR_0.bin', '273_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp64_SR1000_RE80_FR40_Gain40_FS30_IWR_0.bin', '259_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp128_SR1000_RE80_FR24_Gain30_FS30_IWR_0.bin', '274_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp64_SR1000_RE80_FR40_Gain30_FS60_IWR_0.bin', '271_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp128_SR1000_RE60_FR60_Gain30_FS30_IWR_0.bin', '254_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp128_SR1000_RE60_FR40_Gain30_FS60_IWR_0.bin', '251_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp128_SR1000_RE80_FR40_Gain30_FS30_IWR_0.bin', '262_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp128_SR1000_RE60_FR24_Gain30_FS60_IWR_0.bin', '260_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp128_SR1000_RE60_FR24_Gain40_FS60_IWR_0.bin', '258_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC128_Chirp

In [7]:
import os
import numpy as np
import re

# --- GT mapping for conditions ---
CONDITION_GT = {
    'A': {'name': "Healthy rest", 'RR': 14,  'HR': 80},
    'B': {'name': "Healthy sleeping", 'RR': 12, 'HR': 60},
    'C': {'name': "Healthy agitated", 'RR': 20, 'HR': 110},
    'D': {'name': "Apnea", 'RR': 0, 'HR': 80},
    'E': {'name': "Asthma", 'RR': 22, 'HR': 90},
    'F': {'name': "Asthma attack", 'RR': 33, 'HR': 130},
    'G': {'name': "pneumothorax", 'RR': 30, 'HR': 120},
    'H': {'name': "Bradypnea", 'RR': 6, 'HR': 50},
    'I': {'name': "Cardiac arrest", 'RR': 16, 'HR': 160},
    'J': {'name': "ACS Chest Pain", 'RR': 18, 'HR': 104},
    'K': {'name': "Tachynpea Hyperventilation", 'RR': 30, 'HR': 125}
}

# --- 1. Filename Metadata Parser ---
def parse_iwr6843_filename(filename):
    basename = os.path.basename(filename)
    pattern = (
        r'(?P<id>\d+)_.*_Man\d+_'
        r'Cond(?P<cond_a>[A-Z])_Cond(?P<cond_b>[A-Z])_Cond(?P<cond_c>[A-Z])_'
        r'ADC(?P<adc>\d+)_Chirp(?P<chirp>\d+)_SR(?P<sr>\d+)_RE(?P<re>\d+)_FR(?P<fr>\d+)_Gain(?P<gain>\d+)_FS(?P<fs>\d+)_IWR'
    )
    m = re.match(pattern, basename)
    if m:
        params = {k: (int(v) if v.isdigit() else v) for k, v in m.groupdict().items()}
        return params
    else:
        raise ValueError(f"Filename {basename} does not match expected pattern")

# --- 2. Data Decoder Generalized ---
def decode_iwr6843_data(filepath, params, num_rx=4, header_bytes=0):
    num_adc_samples = int(params['adc'])
    num_chirps      = int(params['chirp'])

    with open(filepath, 'rb') as f:
        f.seek(header_bytes)
        raw = np.fromfile(f, dtype=np.int16)

    # Ensure I/Q pairing
    if raw.size % 2 != 0:
        raw = raw[:-1]

    raw = raw.reshape(-1, 2)
    complex_data = raw[:, 0].astype(np.float32) + 1j * raw[:, 1].astype(np.float32)

    samp_pf    = num_rx * num_chirps * num_adc_samples
    num_frames = complex_data.size // samp_pf
    if num_frames == 0:
        print(f"Warning: No frames found for {filepath}. Check ADC/Chirp parameters.")
        return None

    complex_data = complex_data[: num_frames * samp_pf]

    adc = np.empty((num_frames, num_rx, num_chirps, num_adc_samples), dtype=np.complex64)
    for fr in range(num_frames):
        base = fr * samp_pf
        for rx in range(num_rx):
            st = base + rx * (num_chirps * num_adc_samples)
            en = st + (num_chirps * num_adc_samples)
            adc[fr, rx] = complex_data[st:en].reshape(num_chirps, num_adc_samples)
    return adc

# --- 5. HR/RR Estimation (FFT on slow time) ---
def estimate_hr_rr(rd_masked, params):
    """


    return RR, HR"""""

# --- 6. Metrics ---
def compute_metrics(results):
    if not results:
        return {"RR_MAE": None, "RR_RMSE": None, "HR_MAE": None, "HR_RMSE": None}
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    rr_est = [r['estimated_RR'] for r in results if r['gt_RR'] is not None and r['estimated_RR'] is not None]
    rr_gt  = [r['gt_RR'] for r in results if r['gt_RR'] is not None and r['estimated_RR'] is not None]
    hr_est = [r['estimated_HR'] for r in results if r['gt_HR'] is not None and r['estimated_HR'] is not None]
    hr_gt  = [r['gt_HR'] for r in results if r['gt_HR'] is not None and r['estimated_HR'] is not None]

    metrics = {
        "RR_MAE": mean_absolute_error(rr_gt, rr_est) if rr_gt else None,
        "RR_RMSE": np.sqrt(mean_squared_error(rr_gt, rr_est)) if rr_gt else None,
        "HR_MAE": mean_absolute_error(hr_gt, hr_est) if hr_gt else None,
        "HR_RMSE": np.sqrt(mean_squared_error(hr_gt, hr_est)) if hr_gt else None,
    }
    return metrics

# --- 7. Pipeline Runner ---
def run_pipeline(folder):
    files = [f for f in os.listdir(folder) if f.endswith('.bin') and '_IWR' in f]
    print(f"Found {len(files)} files in {folder}: {files}")
    if not files:
        print("WARNING: No radar files found. Check your folder path and file naming.")
    results = []
    for fname in files:
        fpath = os.path.join(folder, fname)
        try:
            params = parse_iwr6843_filename(fpath)
        except Exception as e:
            print(f"Skipping file {fname}: {e}")
            continue
        adc = decode_iwr6843_data(fpath, params)
        rd = generate_rd_spectrogram(adc)
        for mi in range(3):  # mannequins
            cond_key = f'cond_{chr(97+mi)}'
            cond_code = params[cond_key]
            gt = CONDITION_GT.get(cond_code)
            rd_mask = mannequin_mask(rd, mi)
            RR, HR = estimate_hr_rr(rd_mask, params)
            rr_err = abs(RR - gt['RR']) if (RR is not None and gt and gt.get('RR') is not None) else None
            hr_err = abs(HR - gt['HR']) if (HR is not None and gt and gt.get('HR') is not None) else None
            results.append({
                'file': fname,
                'mannequin': mi,
                'condition': cond_code,
                'condition_name': gt['name'] if gt else None,
                'estimated_RR': RR,
                'estimated_HR': HR,
                'gt_RR': gt['RR'] if gt else None,
                'gt_HR': gt['HR'] if gt else None,
                'RR_err': rr_err,
                'HR_err': hr_err
            })
    metrics = compute_metrics(results)
    print("\n--- Validation Metrics ---")
    for k, v in metrics.items():
        print(f"{k}: {v:.2f}" if v is not None else f"{k}: N/A")
    return results, metrics


# --- Example usage ---
if __name__ == "__main__":
    folder = "SubsetJ"
    res, metrics = run_pipeline(folder)
    save_results_csv(res)
    for r in res:
        print(r)

Found 25 files in SubsetJ: ['333_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE60_FR24_Gain40_FS30_IWR.bin', '329_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE80_FR24_Gain40_FS30_IWR.bin', '339_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE80_FR60_Gain30_FS30_IWR.bin', '340_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE60_FR60_Gain40_FS60_IWR.bin', '323_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE80_FR40_Gain30_FS30_IWR.bin', '344_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp64_SR1000_RE80_FR40_Gain40_FS60_IWR.bin', '325_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE60_FR40_Gain40_FS30_IWR.bin', '341_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE60_FR60_Gain40_FS30_IWR.bin', '330_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE80_FR24_Gain30_FS60_IWR.bin', '328_TopFront_4m_0deg_Man3_CondF_CondM_CondC_ADC64_Chirp128_SR1000_RE80_FR24_Gain4